| | |
| | | import com.qianwen.mdc.collect.domain.TelemetryData; |
| | | import com.qianwen.mdc.collect.domain.TelemetryDataItem; |
| | | import com.qianwen.mdc.collect.mqtt.MqttMessageSender; |
| | | import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO; |
| | | |
| | | /** |
| | | * 采集数据处理入库 |
| | |
| | | public class CollectDataService { |
| | | private static final Logger log = LoggerFactory.getLogger(CollectDataService.class); |
| | | |
| | | //private String DB_PREFIX = "root.f2."; |
| | | private static final Map<Integer, String> PROCESS_PARAM_MAP = new HashMap<>(); |
| | | @Autowired |
| | | private IotDBSessionConfig iotdbConfig; |
| | |
| | | private IotDBCommonService iotDBCommonService; |
| | | @Autowired |
| | | private MqttMessageSender mqttMessageSender; |
| | | |
| | | |
| | | /** |
| | | * 实时数据topic,要与mdc里面得相同 |
| | |
| | | public void handleCollectData(List<TelemetryData> telemetryDataList) { |
| | | |
| | | for (TelemetryData dt : telemetryDataList) { |
| | | |
| | | handleOneWorkstation(dt); |
| | | |
| | | sendRealtimeDataMsg(dt); |
| | |
| | | * @param dt |
| | | */ |
| | | void handleOneWorkstation(TelemetryData dt) { |
| | | String deviceId;// = DB_PREFIX+TEMPLATE_NAME + "_" + dt.getWorkstationId(); |
| | | |
| | | List<MeasurementSchema> schemas = new ArrayList<>(); |
| | | String deviceId; |
| | | |
| | | //long workstationId = dpVo.getWorkstationId(); |
| | | |
| | | List<MeasurementSchema> schemas = new ArrayList<>(); |
| | | schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64)); |
| | | schemas.add(new MeasurementSchema("n", TSDataType.TEXT)); |
| | | schemas.add(new MeasurementSchema("v", TSDataType.TEXT)); |
| | | |
| | | |
| | | int rowIndex = 0; |
| | | |
| | |
| | | iotDBCommonService.setTemmplateIfNotSet(TEMPLATE_NAME, deviceId); |
| | | Tablet tablet = new Tablet(deviceId, schemas); |
| | | for(TypedTelemetryData tdata : typeList) { |
| | | |
| | | |
| | | rowIndex = tablet.rowSize++; |
| | | tablet.addTimestamp(rowIndex, tdata.getTime()); |
| | | tablet.addValue("workstation_id",rowIndex,dt.getWorkstationId()); |
| | |
| | | if(!tdata.getName().equals("Alarm")) { |
| | | tablet.addValue("v",rowIndex,tdata.getValue()); |
| | | }else { |
| | | //告警信息根据原版需要处理一下,格式未json对象:{"timestamp":1718839644476,"code":1000,"msg":"EMERGENCY STOP","alarmtype":15,"level":""} |
| | | //告警信息根据原版需要处理一下,格式为json对象:{"timestamp":1718839644476,"code":1000,"msg":"EMERGENCY STOP","alarmtype":15,"level":""} |
| | | tablet.addValue("v",rowIndex,formatAlarmMsg(tdata.getTime(),tdata.getValue())); |
| | | } |
| | | } |
| | | |
| | | try { |
| | | iotdbConfig.getSessionPool().insertAlignedTablet(tablet); |
| | | |
| | | //updateLastParam(dt.getWorkstationId(),typeList); |
| | | } catch (Exception e) { |
| | | log.error("IOTDB入库失败",e); |
| | | e.printStackTrace(); |
| | | }finally { |
| | | //iotdbConfig.getSessionPool().clo1se(); |
| | | } |