package com.qianwen.mdc.collect.service;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.alibaba.fastjson.JSONObject;
import com.qianwen.core.tool.utils.ObjectUtil;
import com.qianwen.mdc.collect.config.IotDBSessionConfig;
import com.qianwen.mdc.collect.constants.IOTDBConstant;
import com.qianwen.mdc.collect.domain.TelemetryData;
import com.qianwen.mdc.collect.domain.TelemetryDataItem;
import com.qianwen.mdc.collect.mqtt.MqttMessageSender;
import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO;

/**
 * Processes collected (telemetry) data and stores it in IoTDB.
 *
 * NOTE(review): this copy of the file looks damaged. Declarations that would
 * normally carry generic type arguments are raw ({@code Map}, {@code List},
 * {@code Map>}), and three index loops stop at {@code for(int i=0;i} with the
 * following code spliced in. It looks as if everything between a '<' and the
 * next '>' was removed. The affected spots are marked below; restore them from
 * version control rather than reconstructing them by hand.
 */
@Service
public class CollectDataService {
    private static final Logger log = LoggerFactory.getLogger(CollectDataService.class);
    // Numeric code -> parameter category name; populated in the static initializer below.
    // NOTE(review): raw Map, type arguments appear to have been stripped.
    private static final Map PROCESS_PARAM_MAP = new HashMap<>();
    @Autowired
    private IotDBSessionConfig iotdbConfig;
    @Autowired
    private IotDBCommonService iotDBCommonService;
    @Autowired
    private MqttMessageSender mqttMessageSender;
    /**
     * Real-time data topic; must be the same as the one used in mdc.
     */
    public static final String WOCKSTATION_REALTIMEDATA_TOPIC = "mdc/realtimedata";
    // Name used both as the IoTDB template name and as part of generated device ids.
    private static String TEMPLATE_NAME = "process_param";
    static {
        // Several codes share a category: 2 and 5 -> OUTPUT; 3, 6 and 7 -> ALARM.
        PROCESS_PARAM_MAP.put(1, "STATE");
        PROCESS_PARAM_MAP.put(2, "OUTPUT");
        PROCESS_PARAM_MAP.put(3, "ALARM");
        PROCESS_PARAM_MAP.put(4, "PROGRAMNUM");
        PROCESS_PARAM_MAP.put(5, "OUTPUT");
        PROCESS_PARAM_MAP.put(6, "ALARM");
        PROCESS_PARAM_MAP.put(7, "ALARM");
    }

    /**
     * Handles a batch of telemetry records: for each record, first stores it
     * via {@code handleOneWorkstation} and then calls {@code sendRealtimeDataMsg}.
     *
     * @param telemetryDataList the records to process
     */
    // NOTE(review): raw List parameter, element type appears to have been stripped
    // (the loop below iterates it as TelemetryData).
    public void handleCollectData(List telemetryDataList) {
        for (TelemetryData dt : telemetryDataList) {
            handleOneWorkstation(dt);
            sendRealtimeDataMsg(dt);
        }
    }

    /**
     * Builds and sends the real-time data message for one telemetry record.
     * Returns immediately when the record has no data items.
     *
     * NOTE(review): only the beginning of this method survives in this copy;
     * see the marker inside the innermost loop.
     *
     * @param dt the telemetry record
     */
    void sendRealtimeDataMsg(TelemetryData dt) {
        if(ObjectUtil.isEmpty(dt.getDataItems())){
            return;
        }
        List dataList= new ArrayList<>();
        String propertyName;
        for (TelemetryDataItem dataItem : dt.getDataItems()) {
            for (Map point : dataItem.getDataPoints()) {
                String[] keys = point.keySet().toArray(new String[0]);
                // NOTE(review): TRUNCATED. The text after "i" is missing: the loop
                // condition and body, the rest of sendRealtimeDataMsg, and the
                // beginning of handleOneWorkstation (presumably its signature and
                // the declaration of deviceId, which is assigned further down).
                // The statement below is the spliced remainder of a
                // "schemas" list declaration.
                for(int i=0;i schemas = new ArrayList<>();
        // From here on the code belongs to handleOneWorkstation (its header is in
        // the missing span above; the name is taken from the call in handleCollectData).
        // Measurements written per row: workstation id, parameter name "n", value "v".
        schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64));
        schemas.add(new MeasurementSchema("n", TSDataType.TEXT));
        schemas.add(new MeasurementSchema("v", TSDataType.TEXT));
        int rowIndex = 0;
        // NOTE(review): "Map>" is what is left of a nested generic type.
        Map> processParamsMap = parseTelemetryToTypedMapList(dt);
        String[] nameArr = processParamsMap.keySet().toArray(new String[0]);
        String name;
        // NOTE(review): TRUNCATED. The loop condition and the start of the body are
        // missing (presumably the assignment of "name" and the declared type of
        // "typeList"); confirm against version control.
        for(int i=0;i typeList = processParamsMap.get(name);
            // One device (and therefore one tablet) per workstation/parameter-name pair.
            deviceId = generateDeviceId(dt.getWorkstationId(),name);
            //System.out.println("deivcdid="+deviceId);
            iotDBCommonService.setTemmplateIfNotSet(TEMPLATE_NAME, deviceId);
            Tablet tablet = new Tablet(deviceId, schemas);
            for(TypedTelemetryData tdata : typeList) {
                // Reserve the next row of the tablet and fill it.
                rowIndex = tablet.rowSize++;
                tablet.addTimestamp(rowIndex, tdata.getTime());
                tablet.addValue("workstation_id",rowIndex,dt.getWorkstationId());
                tablet.addValue("n",rowIndex,tdata.getName());
                // NOTE(review): case-sensitive comparison against "Alarm", while
                // PROCESS_PARAM_MAP holds "ALARM". Which spelling the names actually
                // use depends on the parsing code that is missing from this copy.
                if(!tdata.getName().equals("Alarm")) {
                    tablet.addValue("v",rowIndex,tdata.getValue());
                }else {
                    // Alarm info needs to be processed as in the original version; the format is a JSON object:
                    // {"timestamp":1718839644476,"code":1000,"msg":"EMERGENCY STOP","alarmtype":15,"level":""}
                    tablet.addValue("v",rowIndex,formatAlarmMsg(tdata.getTime(),tdata.getValue()));
                }
            }
            try {
                iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
                //updateLastParam(dt.getWorkstationId(),typeList);
            } catch (Exception e) {
                // A failed insert is logged and the loop moves on to the next name.
                log.error("IOTDB入库失败",e);
            }finally {
                //iotdbConfig.getSessionPool().clo1se();
            }
        }
    }

    /**
     * Formats an alarm message as a JSON object (the original version used a
     * JSON object or a JSON array), in the form
     * {"timestamp":1718839644476,"code":1000,"msg":"EMERGENCY STOP","alarmtype":15,"level":""}.
     *
     * In this implementation "code" is always "00", "alarmtype" is always 0 and
     * "level" is always the empty string; only "timestamp" and "msg" vary.
     *
     * @param time value stored under "timestamp"
     * @param collectAlarmValue the collected alarm value, stored under "msg"
     * @return the JSON string of the alarm object
     */
    String formatAlarmMsg(long time,String collectAlarmValue){
        JSONObject alarmObj = new JSONObject();
        alarmObj.put("timestamp", time);
        alarmObj.put("code", "00");
        alarmObj.put("msg", collectAlarmValue);
        alarmObj.put("alarmtype", 0);
        alarmObj.put("level", "");
        return alarmObj.toJSONString();
    }

    /**
     * Fills in the latest-parameter data for a workstation in
     * {@code root.f2.last_process_param}. If a row already exists for the
     * workstation, its JSON is merged with the new values and written back at
     * the existing row's timestamp; otherwise a new row is inserted at the time
     * of the first list element. Does nothing when the list is empty.
     *
     * The only call visible in this file is commented out (in the insert
     * try-block of handleOneWorkstation).
     *
     * @param workstationId id of the workstation whose row is updated
     * @param typeList the new values; the first element's time is used as update_time
     * @throws StatementExecutionException
     * @throws IoTDBConnectionException
     */
    // NOTE(review): raw List parameter, element type appears to have been stripped
    // (elements are used as TypedTelemetryData below).
    void updateLastParam(long workstationId,List typeList) throws IoTDBConnectionException, StatementExecutionException {
        if(typeList.isEmpty()){
            return;
        }
        long updateTime = typeList.get(0).getTime();
        List schemas = new ArrayList<>();
        schemas.add(new MeasurementSchema("update_time", TSDataType.INT64));
        schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64));
        schemas.add(new MeasurementSchema("param_json", TSDataType.TEXT));
        Tablet tablet = new Tablet("root.f2.last_process_param", schemas);
        String sql = "select update_time,workstation_id,param_json from root.f2.last_process_param where workstation_id="+workstationId;
        try(SessionDataSetWrapper dsw = iotdbConfig.getSessionPool().executeQueryStatement(sql)){
            if(dsw.hasNext()) {
                // Existing row: keep its timestamp so the write targets the same row.
                RowRecord rec = dsw.next();
                long time = rec.getTimestamp();
                // Index 2 is the third selected column, param_json.
                String paramJsonStr = rec.getFields().get(2).getStringValue();
                tablet.rowSize = 1;
                tablet.addTimestamp(0, time);
                tablet.addValue("update_time", 0, updateTime);
                tablet.addValue("workstation_id", 0, workstationId);
                JSONObject paramObj = JSONObject.parseObject(paramJsonStr);
                for(TypedTelemetryData tdata: typeList) {
                    if(paramObj.containsKey(tdata.getName())) {
                        // Known parameter: overwrite value/time on the existing entry.
                        JSONObject itemObj = paramObj.getJSONObject(tdata.getName());
                        itemObj.put("value", tdata.getValue());
                        itemObj.put("time", tdata.getTime());// collection time
                        paramObj.put(tdata.getName(), itemObj);
                    }else {
                        // New parameter: add a fresh entry.
                        JSONObject itemObj = new JSONObject();
                        itemObj.put("value", tdata.getValue());
                        itemObj.put("time", tdata.getTime());// collection time
                        paramObj.put(tdata.getName(), itemObj);
                    }
                }
                tablet.addValue("param_json", 0, paramObj.toJSONString());
                this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
            }else {
                // No existing data; insert a new row.
                tablet.rowSize = 1;
                tablet.addTimestamp(0, updateTime);
                tablet.addValue("update_time", 0, updateTime);
                tablet.addValue("workstation_id", 0, workstationId);
                JSONObject paramObj = new JSONObject();
                for(TypedTelemetryData tdata: typeList) {
                    JSONObject itemObj = new JSONObject();
                    itemObj.put("value", tdata.getValue());
                    itemObj.put("time", tdata.getTime());// collection time
                    paramObj.put(tdata.getName(), itemObj);
                }
                tablet.addValue("param_json", 0,paramObj.toJSONString());
                this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
            }
        }
        //dsw.close();
    }

    /**
     * Parses the telemetry data into a list of name/value typed entries and
     * groups them by name.
     *
     * NOTE(review): the code that builds each entry is missing from this copy;
     * see the marker inside the innermost loop.
     *
     * @param dt the telemetry record to parse
     * @return the entries grouped by name
     */
    // NOTE(review): "Map>" is what is left of a nested generic return type.
    Map> parseTelemetryToTypedMapList(TelemetryData dt){
        List list = new ArrayList<>();
        String propertyName;
        for (TelemetryDataItem dataItem : dt.getDataItems()) {
            for (Map point : dataItem.getDataPoints()) {
                String[] keys = point.keySet().toArray(new String[0]);
                // NOTE(review): TRUNCATED. The loop condition, the loop body that fills
                // "list", the closing braces of the three loops and the declared type
                // of "map" are missing. The statement below is the spliced remainder.
                for(int i=0;i> map = list.stream().collect(Collectors.groupingBy(TypedTelemetryData::getName));
        return map;
    }

    /**
     * Builds the IoTDB device id for one workstation/property pair as
     * {@code IOTDBConstant.DB_PREFIX + TEMPLATE_NAME + "_" + workstationId + "_" + propertyName}.
     *
     * @param workstationId the workstation id
     * @param propertyName the property (parameter) name
     * @return the device id
     */
    String generateDeviceId(long workstationId,String propertyName) {
        return IOTDBConstant.DB_PREFIX+TEMPLATE_NAME + "_" + workstationId+"_"+propertyName;
    }
}

/*
 * Categorized data: one telemetry value with its time and parameter name.
 */
class TypedTelemetryData{
    // Timestamp of the value; written as the tablet row timestamp by CollectDataService.
    private long time;
    // Parameter name; used as the grouping key and written to measurement "n".
    private String name;
    // Value as text; written to measurement "v".
    private String value;

    public long getTime() {
        return time;
    }
    public void setTime(long time) {
        this.time = time;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getValue() {
        return value;
    }
    public void setValue(String value) {
        this.value = value;
    }
    @Override
    public String toString() {
        return "TypedTelemetryData [time=" + time + ", name=" + name + ", value=" + value + "]";
    }
}