| | |
| | | import java.util.HashMap; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Random; |
| | | import java.util.stream.Collectors; |
| | | |
| | | import org.apache.iotdb.isession.pool.SessionDataSetWrapper; |
| | |
| | | import org.springframework.stereotype.Service; |
| | | |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.qianwen.core.tool.utils.ObjectUtil; |
| | | import com.qianwen.mdc.collect.config.IotDBSessionConfig; |
| | | import com.qianwen.mdc.collect.constants.IOTDBConstant; |
| | | import com.qianwen.mdc.collect.domain.TelemetryData; |
| | | import com.qianwen.mdc.collect.domain.TelemetryDataItem; |
| | | import com.qianwen.mdc.collect.utils.redis.RedisUtil; |
| | | import com.qianwen.mdc.collect.mqtt.MqttMessageSender; |
| | | import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO; |
| | | |
| | | /** |
| | | * 采集数据处理入库 |
| | |
| | | public class CollectDataService { |
| | | private static final Logger log = LoggerFactory.getLogger(CollectDataService.class); |
| | | |
| | | private String DB_PREFIX = "root.f2."; |
| | | private static final Map<Integer, String> PROCESS_PARAM_MAP = new HashMap<>(); |
| | | @Autowired |
| | | private IotDBSessionConfig iotdbConfig; |
| | | @Autowired |
| | | private IotDBCommonService iotDBCommonService; |
| | | @Autowired |
| | | private MqttMessageSender mqttMessageSender; |
| | | |
| | | |
| | | /** |
| | | * 实时数据topic,要与mdc里面得相同 |
| | | */ |
| | | public static final String WOCKSTATION_REALTIMEDATA_TOPIC = "mdc/realtimedata"; |
| | | |
| | | private static String TEMPLATE_NAME = "process_param"; |
| | | |
| | |
| | | public void handleCollectData(List<TelemetryData> telemetryDataList) { |
| | | |
| | | for (TelemetryData dt : telemetryDataList) { |
| | | |
| | | handleOneWorkstation(dt); |
| | | |
| | | sendRealtimeDataMsg(dt); |
| | | } |
| | | |
| | | } |
| | | |
| | | void sendRealtimeDataMsg(TelemetryData dt) { |
| | | if(ObjectUtil.isEmpty(dt.getDataItems())){ |
| | | return; |
| | | } |
| | | |
| | | List<TypedTelemetryData> dataList= new ArrayList<>(); |
| | | String propertyName; |
| | | for (TelemetryDataItem dataItem : dt.getDataItems()) { |
| | | |
| | | for (Map<String, String> point : dataItem.getDataPoints()) { |
| | | |
| | | String[] keys = point.keySet().toArray(new String[0]); |
| | | for(int i=0;i<keys.length;i++) { |
| | | TypedTelemetryData tpData = new TypedTelemetryData(); |
| | | propertyName = keys[i]; |
| | | tpData.setTime(dataItem.getTime()); |
| | | tpData.setName(propertyName); |
| | | tpData.setValue(point.get(propertyName)); |
| | | |
| | | dataList.add(tpData); |
| | | } |
| | | |
| | | } |
| | | |
| | | } |
| | | |
| | | //发送mqtt消息,通知mdc消息来了 |
| | | for(TypedTelemetryData item : dataList) { |
| | | JSONObject json = new JSONObject(); |
| | | json.put("workstationId",dt.getWorkstationId()); |
| | | json.put("name", item.getName()); |
| | | json.put("value", item.getValue()); |
| | | json.put("time", item.getTime()); |
| | | |
| | | mqttMessageSender.sendMessage(WOCKSTATION_REALTIMEDATA_TOPIC, json.toJSONString()); |
| | | } |
| | | |
| | | } |
| | | /** |
| | | * 处理一个工位的数据解析入库 |
| | | * @param dt |
| | | */ |
| | | void handleOneWorkstation(TelemetryData dt) { |
| | | String deviceId;// = DB_PREFIX+TEMPLATE_NAME + "_" + dt.getWorkstationId(); |
| | | |
| | | // 挂载模板 |
| | | //iotDBCommonService.setTemmplateIsNotSet(TEMPLATE_NAME, deviceId); |
| | | |
| | | List<MeasurementSchema> schemas = new ArrayList<>(); |
| | | String deviceId; |
| | | |
| | | //long workstationId = dpVo.getWorkstationId(); |
| | | |
| | | List<MeasurementSchema> schemas = new ArrayList<>(); |
| | | schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64)); |
| | | schemas.add(new MeasurementSchema("n", TSDataType.TEXT)); |
| | | schemas.add(new MeasurementSchema("v", TSDataType.TEXT)); |
| | | |
| | | |
| | | int rowIndex = 0; |
| | | |
| | | Map<String, List<TypedTelemetryData>> processParamsMap = parseTelemetryToTypedMapList(dt); |
| | | |
| | | |
| | | String[] nameArr = processParamsMap.keySet().toArray(new String[0]); |
| | | String name; |
| | |
| | | iotDBCommonService.setTemmplateIfNotSet(TEMPLATE_NAME, deviceId); |
| | | Tablet tablet = new Tablet(deviceId, schemas); |
| | | for(TypedTelemetryData tdata : typeList) { |
| | | |
| | | rowIndex = tablet.rowSize++; |
| | | tablet.addTimestamp(rowIndex, tdata.getTime()); |
| | | tablet.addValue("workstation_id",rowIndex,dt.getWorkstationId()); |
| | | tablet.addValue("n",rowIndex,tdata.getName()); |
| | | tablet.addValue("v",rowIndex,tdata.getValue()); |
| | | if(!tdata.getName().equals("Alarm")) { |
| | | tablet.addValue("v",rowIndex,tdata.getValue()); |
| | | }else { |
| | | //告警信息根据原版需要处理一下,格式为json对象:{"timestamp":1718839644476,"code":1000,"msg":"EMERGENCY STOP","alarmtype":15,"level":""} |
| | | tablet.addValue("v",rowIndex,formatAlarmMsg(tdata.getTime(),tdata.getValue())); |
| | | } |
| | | } |
| | | |
| | | try { |
| | | iotdbConfig.getSessionPool().insertAlignedTablet(tablet); |
| | | |
| | | updateLastParam(dt.getWorkstationId(),typeList); |
| | | //updateLastParam(dt.getWorkstationId(),typeList); |
| | | } catch (Exception e) { |
| | | log.error("IOTDB入库失败",e); |
| | | e.printStackTrace(); |
| | | }finally { |
| | | //iotdbConfig.getSessionPool().clo1se(); |
| | | } |
| | | } |
| | | |
| | | |
| | | } |
| | | |
| | | /** |
| | | * 将报警信息格式化未json对象(原版是jsonobj或者json数组),格式为 {"timestamp":1718839644476,"code":1000,"msg":"EMERGENCY STOP","alarmtype":15,"level":""} |
| | | * @param collectAlarmValue |
| | | * @return |
| | | */ |
| | | String formatAlarmMsg(long time,String collectAlarmValue){ |
| | | JSONObject alarmObj = new JSONObject(); |
| | | alarmObj.put("timestamp", time); |
| | | alarmObj.put("code", "00"); |
| | | alarmObj.put("msg", collectAlarmValue); |
| | | alarmObj.put("alarmtype", 0); |
| | | alarmObj.put("level", ""); |
| | | return alarmObj.toJSONString(); |
| | | } |
| | | |
| | | /** |
| | |
| | | schemas.add(new MeasurementSchema("param_json", TSDataType.TEXT)); |
| | | |
| | | Tablet tablet = new Tablet("root.f2.last_process_param", schemas); |
| | | for(TypedTelemetryData tdata: typeList) { |
| | | |
| | | } |
| | | |
| | | |
| | | |
| | | String sql = "select update_time,workstation_id,param_json from root.f2.last_process_param where workstation_id="+workstationId; |
| | | SessionDataSetWrapper dsw = iotdbConfig.getSessionPool().executeQueryStatement(sql); |
| | | |
| | | if(dsw.hasNext()) { |
| | | RowRecord rec = dsw.next(); |
| | | long time = rec.getTimestamp(); |
| | | |
| | | String paramJsonStr = rec.getFields().get(2).getStringValue(); |
| | | |
| | | tablet.rowSize = 1; |
| | | tablet.addTimestamp(0, time); |
| | | tablet.addValue("update_time", 0, updateTime); |
| | | tablet.addValue("workstation_id", 0, workstationId); |
| | | JSONObject paramObj = JSONObject.parseObject(paramJsonStr); |
| | | for(TypedTelemetryData tdata: typeList) { |
| | | try(SessionDataSetWrapper dsw = iotdbConfig.getSessionPool().executeQueryStatement(sql)){ |
| | | if(dsw.hasNext()) { |
| | | RowRecord rec = dsw.next(); |
| | | long time = rec.getTimestamp(); |
| | | |
| | | if(paramObj.containsKey(tdata.getName())) { |
| | | JSONObject itemObj = paramObj.getJSONObject(tdata.getName()); |
| | | itemObj.put("value", tdata.getValue()); |
| | | itemObj.put("time", tdata.getTime());//采集时间 |
| | | paramObj.put(tdata.getName(), itemObj); |
| | | }else { |
| | | String paramJsonStr = rec.getFields().get(2).getStringValue(); |
| | | |
| | | tablet.rowSize = 1; |
| | | tablet.addTimestamp(0, time); |
| | | tablet.addValue("update_time", 0, updateTime); |
| | | tablet.addValue("workstation_id", 0, workstationId); |
| | | JSONObject paramObj = JSONObject.parseObject(paramJsonStr); |
| | | for(TypedTelemetryData tdata: typeList) { |
| | | |
| | | if(paramObj.containsKey(tdata.getName())) { |
| | | JSONObject itemObj = paramObj.getJSONObject(tdata.getName()); |
| | | itemObj.put("value", tdata.getValue()); |
| | | itemObj.put("time", tdata.getTime());//采集时间 |
| | | paramObj.put(tdata.getName(), itemObj); |
| | | }else { |
| | | JSONObject itemObj = new JSONObject(); |
| | | itemObj.put("value", tdata.getValue()); |
| | | itemObj.put("time", tdata.getTime());//采集时间 |
| | | paramObj.put(tdata.getName(), itemObj); |
| | | |
| | | } |
| | | } |
| | | tablet.addValue("param_json", 0, paramObj.toJSONString()); |
| | | |
| | | this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet); |
| | | |
| | | }else { |
| | | //没数据,新加入一条 |
| | | tablet.rowSize = 1; |
| | | |
| | | tablet.addTimestamp(0, updateTime); |
| | | tablet.addValue("update_time", 0, updateTime); |
| | | tablet.addValue("workstation_id", 0, workstationId); |
| | | |
| | | JSONObject paramObj = new JSONObject(); |
| | | for(TypedTelemetryData tdata: typeList) { |
| | | JSONObject itemObj = new JSONObject(); |
| | | itemObj.put("value", tdata.getValue()); |
| | | itemObj.put("time", tdata.getTime());//采集时间 |
| | | paramObj.put(tdata.getName(), itemObj); |
| | | |
| | | } |
| | | |
| | | tablet.addValue("param_json", 0,paramObj.toJSONString()); |
| | | this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet); |
| | | } |
| | | tablet.addValue("param_json", 0, paramObj.toJSONString()); |
| | | |
| | | this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet); |
| | | |
| | | }else { |
| | | //没数据,新加入一条 |
| | | tablet.rowSize = 1; |
| | | |
| | | tablet.addTimestamp(0, updateTime); |
| | | tablet.addValue("update_time", 0, updateTime); |
| | | tablet.addValue("workstation_id", 0, workstationId); |
| | | |
| | | JSONObject paramObj = new JSONObject(); |
| | | for(TypedTelemetryData tdata: typeList) { |
| | | JSONObject itemObj = new JSONObject(); |
| | | itemObj.put("value", tdata.getValue()); |
| | | itemObj.put("time", tdata.getTime());//采集时间 |
| | | paramObj.put(tdata.getName(), itemObj); |
| | | } |
| | | |
| | | tablet.addValue("param_json", 0,paramObj.toJSONString()); |
| | | this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet); |
| | | } |
| | | |
| | | //dsw.close(); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | String generateDeviceId(long workstationId,String propertyName) { |
| | | return DB_PREFIX+TEMPLATE_NAME + "_" + workstationId+"_"+propertyName; |
| | | return IOTDBConstant.DB_PREFIX+TEMPLATE_NAME + "_" + workstationId+"_"+propertyName; |
| | | } |
| | | } |
| | | |