| | |
| | | import org.springframework.stereotype.Service; |
| | | |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.qianwen.core.tool.utils.ObjectUtil; |
| | | import com.qianwen.mdc.collect.config.IotDBSessionConfig; |
| | | import com.qianwen.mdc.collect.constants.IOTDBConstant; |
| | | import com.qianwen.mdc.collect.domain.TelemetryData; |
| | | import com.qianwen.mdc.collect.domain.TelemetryDataItem; |
| | | import com.qianwen.mdc.collect.mqtt.MqttMessageSender; |
| | | import com.qianwen.mdc.collect.utils.redis.RedisUtil; |
| | | |
| | | import cn.hutool.json.JSONUtil; |
| | | |
| | | /** |
| | | * 采集数据处理入库 |
| | |
| | | private IotDBSessionConfig iotdbConfig; |
| | | @Autowired |
| | | private IotDBCommonService iotDBCommonService; |
| | | @Autowired |
| | | private MqttMessageSender mqttMessageSender; |
| | | |
| | | /** |
| | | * 实时数据topic,要与mdc里面得相同 |
| | | */ |
| | | public static final String WOCKSTATION_REALTIMEDATA_TOPIC = "mdc/realtimedata"; |
| | | |
| | | private static String TEMPLATE_NAME = "process_param"; |
| | | |
| | |
| | | |
| | | for (TelemetryData dt : telemetryDataList) { |
| | | handleOneWorkstation(dt); |
| | | |
| | | sendRealtimeDataMsg(dt); |
| | | } |
| | | |
| | | } |
| | | |
| | | void sendRealtimeDataMsg(TelemetryData dt) { |
| | | if(ObjectUtil.isEmpty(dt.getDataItems())){ |
| | | return; |
| | | } |
| | | |
| | | List<TypedTelemetryData> dataList= new ArrayList<>(); |
| | | String propertyName; |
| | | for (TelemetryDataItem dataItem : dt.getDataItems()) { |
| | | |
| | | for (Map<String, String> point : dataItem.getDataPoints()) { |
| | | |
| | | String[] keys = point.keySet().toArray(new String[0]); |
| | | for(int i=0;i<keys.length;i++) { |
| | | TypedTelemetryData tpData = new TypedTelemetryData(); |
| | | propertyName = keys[i]; |
| | | tpData.setTime(dataItem.getTime()); |
| | | tpData.setName(propertyName); |
| | | tpData.setValue(point.get(propertyName)); |
| | | |
| | | dataList.add(tpData); |
| | | } |
| | | |
| | | } |
| | | |
| | | } |
| | | |
| | | //发送mqtt消息,通知mdc消息来了 |
| | | for(TypedTelemetryData item : dataList) { |
| | | JSONObject json = new JSONObject(); |
| | | json.put("workstationId",dt.getWorkstationId()); |
| | | json.put("name", item.getName()); |
| | | json.put("value", item.getValue()); |
| | | json.put("time", item.getTime()); |
| | | |
| | | mqttMessageSender.sendMessage(WOCKSTATION_REALTIMEDATA_TOPIC, json.toJSONString()); |
| | | } |
| | | |
| | | } |
| | | /** |
| | | * 处理一个工位的数据解析入库 |
| | | * @param dt |
| | | */ |
| | | void handleOneWorkstation(TelemetryData dt) { |
| | | String deviceId;// = DB_PREFIX+TEMPLATE_NAME + "_" + dt.getWorkstationId(); |
| | | |
| | | // 挂载模板 |
| | | //iotDBCommonService.setTemmplateIsNotSet(TEMPLATE_NAME, deviceId); |
| | | |
| | | List<MeasurementSchema> schemas = new ArrayList<>(); |
| | | |
| | |
| | | try { |
| | | iotdbConfig.getSessionPool().insertAlignedTablet(tablet); |
| | | |
| | | updateLastParam(dt.getWorkstationId(),typeList); |
| | | //updateLastParam(dt.getWorkstationId(),typeList); |
| | | } catch (Exception e) { |
| | | log.error("IOTDB入库失败",e); |
| | | e.printStackTrace(); |
| | | }finally { |
| | | //iotdbConfig.getSessionPool().clo1se(); |
| | | } |
| | | } |
| | | |
| | |
| | | schemas.add(new MeasurementSchema("param_json", TSDataType.TEXT)); |
| | | |
| | | Tablet tablet = new Tablet("root.f2.last_process_param", schemas); |
| | | for(TypedTelemetryData tdata: typeList) { |
| | | |
| | | } |
| | | |
| | | |
| | | |
| | | String sql = "select update_time,workstation_id,param_json from root.f2.last_process_param where workstation_id="+workstationId; |
| | | SessionDataSetWrapper dsw = iotdbConfig.getSessionPool().executeQueryStatement(sql); |
| | | |
| | | if(dsw.hasNext()) { |
| | | RowRecord rec = dsw.next(); |
| | | long time = rec.getTimestamp(); |
| | | |
| | | String paramJsonStr = rec.getFields().get(2).getStringValue(); |
| | | |
| | | tablet.rowSize = 1; |
| | | tablet.addTimestamp(0, time); |
| | | tablet.addValue("update_time", 0, updateTime); |
| | | tablet.addValue("workstation_id", 0, workstationId); |
| | | JSONObject paramObj = JSONObject.parseObject(paramJsonStr); |
| | | for(TypedTelemetryData tdata: typeList) { |
| | | try(SessionDataSetWrapper dsw = iotdbConfig.getSessionPool().executeQueryStatement(sql)){ |
| | | if(dsw.hasNext()) { |
| | | RowRecord rec = dsw.next(); |
| | | long time = rec.getTimestamp(); |
| | | |
| | | if(paramObj.containsKey(tdata.getName())) { |
| | | JSONObject itemObj = paramObj.getJSONObject(tdata.getName()); |
| | | itemObj.put("value", tdata.getValue()); |
| | | itemObj.put("time", tdata.getTime());//采集时间 |
| | | paramObj.put(tdata.getName(), itemObj); |
| | | }else { |
| | | String paramJsonStr = rec.getFields().get(2).getStringValue(); |
| | | |
| | | tablet.rowSize = 1; |
| | | tablet.addTimestamp(0, time); |
| | | tablet.addValue("update_time", 0, updateTime); |
| | | tablet.addValue("workstation_id", 0, workstationId); |
| | | JSONObject paramObj = JSONObject.parseObject(paramJsonStr); |
| | | for(TypedTelemetryData tdata: typeList) { |
| | | |
| | | if(paramObj.containsKey(tdata.getName())) { |
| | | JSONObject itemObj = paramObj.getJSONObject(tdata.getName()); |
| | | itemObj.put("value", tdata.getValue()); |
| | | itemObj.put("time", tdata.getTime());//采集时间 |
| | | paramObj.put(tdata.getName(), itemObj); |
| | | }else { |
| | | JSONObject itemObj = new JSONObject(); |
| | | itemObj.put("value", tdata.getValue()); |
| | | itemObj.put("time", tdata.getTime());//采集时间 |
| | | paramObj.put(tdata.getName(), itemObj); |
| | | |
| | | } |
| | | } |
| | | tablet.addValue("param_json", 0, paramObj.toJSONString()); |
| | | |
| | | this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet); |
| | | |
| | | }else { |
| | | //没数据,新加入一条 |
| | | tablet.rowSize = 1; |
| | | |
| | | tablet.addTimestamp(0, updateTime); |
| | | tablet.addValue("update_time", 0, updateTime); |
| | | tablet.addValue("workstation_id", 0, workstationId); |
| | | |
| | | JSONObject paramObj = new JSONObject(); |
| | | for(TypedTelemetryData tdata: typeList) { |
| | | JSONObject itemObj = new JSONObject(); |
| | | itemObj.put("value", tdata.getValue()); |
| | | itemObj.put("time", tdata.getTime());//采集时间 |
| | | paramObj.put(tdata.getName(), itemObj); |
| | | |
| | | } |
| | | |
| | | tablet.addValue("param_json", 0,paramObj.toJSONString()); |
| | | this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet); |
| | | } |
| | | tablet.addValue("param_json", 0, paramObj.toJSONString()); |
| | | |
| | | this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet); |
| | | |
| | | }else { |
| | | //没数据,新加入一条 |
| | | tablet.rowSize = 1; |
| | | |
| | | tablet.addTimestamp(0, updateTime); |
| | | tablet.addValue("update_time", 0, updateTime); |
| | | tablet.addValue("workstation_id", 0, workstationId); |
| | | |
| | | JSONObject paramObj = new JSONObject(); |
| | | for(TypedTelemetryData tdata: typeList) { |
| | | JSONObject itemObj = new JSONObject(); |
| | | itemObj.put("value", tdata.getValue()); |
| | | itemObj.put("time", tdata.getTime());//采集时间 |
| | | paramObj.put(tdata.getName(), itemObj); |
| | | } |
| | | |
| | | tablet.addValue("param_json", 0,paramObj.toJSONString()); |
| | | this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet); |
| | | } |
| | | |
| | | //dsw.close(); |
| | | } |
| | | |
| | | /** |