package com.qianwen.mdc.collect.service; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; import org.apache.iotdb.isession.pool.SessionDataSetWrapper; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.RowRecord; import org.apache.iotdb.tsfile.write.record.Tablet; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.alibaba.fastjson.JSONObject; import com.qianwen.core.tool.utils.ObjectUtil; import com.qianwen.mdc.collect.config.IotDBSessionConfig; import com.qianwen.mdc.collect.constants.IOTDBConstant; import com.qianwen.mdc.collect.domain.TelemetryData; import com.qianwen.mdc.collect.domain.TelemetryDataItem; import com.qianwen.mdc.collect.entity.iotdb.ProcessParam; import com.qianwen.mdc.collect.mqtt.MqttMessageSender; import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO; /** * 采集数据处理入库 */ @Service public class ProcessParamService { private static final Logger log = LoggerFactory.getLogger(ProcessParamService.class); private static final Map PROCESS_PARAM_MAP = new HashMap<>(); @Autowired private IotDBSessionConfig iotdbConfig; @Autowired private IotDBCommonService iotDBCommonService; @Autowired private MqttMessageSender mqttMessageSender; public static List schemas = new ArrayList<>(); static { schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64)); schemas.add(new MeasurementSchema("n", TSDataType.TEXT)); schemas.add(new MeasurementSchema("v", TSDataType.TEXT)); } /** * 实时数据topic,要与mdc里面得相同 */ public static final String WOCKSTATION_REALTIMEDATA_TOPIC = "mdc/realtimedata"; public void insertProcessParam(ProcessParam param) { String deviceId = generateDeviceId(param.getWorkstationId(),param.getN()); iotDBCommonService.isTemplateSetOnPath(IOTDBConstant.TEMPLATE_PROCESS_PARAM, deviceId); Tablet tablet = new Tablet(deviceId, schemas); int rowIndex = tablet.rowSize++; tablet.addTimestamp(rowIndex, param.getTime()); tablet.addValue("workstation_id",rowIndex,param.getWorkstationId()); tablet.addValue("n",rowIndex,param.getN()); tablet.addValue("v",rowIndex,param.getV()); try { iotdbConfig.getSessionPool().insertAlignedTablet(tablet); //updateLastParam(dt.getWorkstationId(),typeList); } catch (Exception e) { log.error("IOTDB入库失败",e); }finally { //iotdbConfig.getSessionPool().clo1se(); } } /** * 发送实时数据消息 * @param param */ public void sendRealtimeDataMsg(ProcessParam param) { //发送mqtt消息,通知mdc消息来了 JSONObject json = new JSONObject(); json.put("workstationId",param.getWorkstationId()); json.put("name", param.getN()); json.put("value", param.getV()); json.put("time", param.getTime()); mqttMessageSender.sendMessage(WOCKSTATION_REALTIMEDATA_TOPIC, json.toJSONString()); } String generateDeviceId(long workstationId,String propertyName) { return IOTDBConstant.DB_PREFIX+IOTDBConstant.TEMPLATE_PROCESS_PARAM+"_" + workstationId+"_"+propertyName; } }