package com.qianwen.mdc.collect.service;
|
|
import java.util.ArrayList;
|
import java.util.HashMap;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.stream.Collectors;
|
|
import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
|
import org.apache.iotdb.rpc.IoTDBConnectionException;
|
import org.apache.iotdb.rpc.StatementExecutionException;
|
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
|
import org.apache.iotdb.tsfile.read.common.RowRecord;
|
import org.apache.iotdb.tsfile.write.record.Tablet;
|
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Service;
|
|
import com.alibaba.fastjson.JSONObject;
|
import com.qianwen.core.tool.utils.ObjectUtil;
|
import com.qianwen.mdc.collect.config.IotDBSessionConfig;
|
import com.qianwen.mdc.collect.constants.IOTDBConstant;
|
import com.qianwen.mdc.collect.domain.TelemetryData;
|
import com.qianwen.mdc.collect.domain.TelemetryDataItem;
|
import com.qianwen.mdc.collect.entity.iotdb.ProcessParam;
|
import com.qianwen.mdc.collect.mqtt.MqttMessageSender;
|
import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO;
|
|
/**
|
* 采集数据处理入库
|
*/
|
@Service
|
public class ProcessParamService {
|
private static final Logger log = LoggerFactory.getLogger(ProcessParamService.class);
|
|
private static final Map<Integer, String> PROCESS_PARAM_MAP = new HashMap<>();
|
@Autowired
|
private IotDBSessionConfig iotdbConfig;
|
@Autowired
|
private IotDBCommonService iotDBCommonService;
|
@Autowired
|
private MqttMessageSender mqttMessageSender;
|
|
|
public static List<MeasurementSchema> schemas = new ArrayList<>();
|
static {
|
schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64));
|
schemas.add(new MeasurementSchema("n", TSDataType.TEXT));
|
schemas.add(new MeasurementSchema("v", TSDataType.TEXT));
|
}
|
/**
|
* 实时数据topic,要与mdc里面得相同
|
*/
|
public static final String WOCKSTATION_REALTIMEDATA_TOPIC = "mdc/realtimedata";
|
|
|
public void insertProcessParam(ProcessParam param) {
|
String deviceId = generateDeviceId(param.getWorkstationId(),param.getN());
|
|
iotDBCommonService.isTemplateSetOnPath(IOTDBConstant.TEMPLATE_PROCESS_PARAM, deviceId);
|
|
Tablet tablet = new Tablet(deviceId, schemas);
|
int rowIndex = tablet.rowSize++;
|
tablet.addTimestamp(rowIndex, param.getTime());
|
tablet.addValue("workstation_id",rowIndex,param.getWorkstationId());
|
tablet.addValue("n",rowIndex,param.getN());
|
tablet.addValue("v",rowIndex,param.getV());
|
try {
|
iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
|
//updateLastParam(dt.getWorkstationId(),typeList);
|
} catch (Exception e) {
|
log.error("IOTDB入库失败",e);
|
}finally {
|
//iotdbConfig.getSessionPool().clo1se();
|
}
|
}
|
|
|
/**
|
* 发送实时数据消息
|
* @param param
|
*/
|
public void sendRealtimeDataMsg(ProcessParam param) {
|
|
//发送mqtt消息,通知mdc消息来了
|
|
JSONObject json = new JSONObject();
|
json.put("workstationId",param.getWorkstationId());
|
json.put("name", param.getN());
|
json.put("value", param.getV());
|
json.put("time", param.getTime());
|
|
mqttMessageSender.sendMessage(WOCKSTATION_REALTIMEDATA_TOPIC, json.toJSONString());
|
|
}
|
|
|
String generateDeviceId(long workstationId,String propertyName) {
|
return IOTDBConstant.DB_PREFIX+IOTDBConstant.TEMPLATE_PROCESS_PARAM+"_" + workstationId+"_"+propertyName;
|
}
|
}
|