package com.qianwen.mdc.collect.service;
|
|
import java.util.ArrayList;
|
import java.util.HashMap;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.Random;
|
import java.util.stream.Collectors;
|
|
import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
|
import org.apache.iotdb.rpc.IoTDBConnectionException;
|
import org.apache.iotdb.rpc.StatementExecutionException;
|
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
|
import org.apache.iotdb.tsfile.read.common.RowRecord;
|
import org.apache.iotdb.tsfile.write.record.Tablet;
|
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Service;
|
|
import com.alibaba.fastjson.JSONObject;
|
import com.qianwen.core.tool.utils.ObjectUtil;
|
import com.qianwen.mdc.collect.config.IotDBSessionConfig;
|
import com.qianwen.mdc.collect.constants.IOTDBConstant;
|
import com.qianwen.mdc.collect.domain.TelemetryData;
|
import com.qianwen.mdc.collect.domain.TelemetryDataItem;
|
import com.qianwen.mdc.collect.mqtt.MqttMessageSender;
|
import com.qianwen.mdc.collect.utils.redis.RedisUtil;
|
|
import cn.hutool.json.JSONUtil;
|
|
/**
|
* 采集数据处理入库
|
*/
|
@Service
|
public class CollectDataService {
|
private static final Logger log = LoggerFactory.getLogger(CollectDataService.class);
|
|
//private String DB_PREFIX = "root.f2.";
|
private static final Map<Integer, String> PROCESS_PARAM_MAP = new HashMap<>();
|
@Autowired
|
private IotDBSessionConfig iotdbConfig;
|
@Autowired
|
private IotDBCommonService iotDBCommonService;
|
@Autowired
|
private MqttMessageSender mqttMessageSender;
|
|
/**
|
* 实时数据topic,要与mdc里面得相同
|
*/
|
public static final String WOCKSTATION_REALTIMEDATA_TOPIC = "mdc/realtimedata";
|
|
private static String TEMPLATE_NAME = "process_param";
|
|
static {
|
PROCESS_PARAM_MAP.put(1, "STATE");
|
PROCESS_PARAM_MAP.put(2, "OUTPUT");
|
PROCESS_PARAM_MAP.put(3, "ALARM");
|
PROCESS_PARAM_MAP.put(4, "PROGRAMNUM");
|
PROCESS_PARAM_MAP.put(5, "OUTPUT");
|
PROCESS_PARAM_MAP.put(6, "ALARM");
|
PROCESS_PARAM_MAP.put(7, "ALARM");
|
}
|
|
|
public void handleCollectData(List<TelemetryData> telemetryDataList) {
|
|
for (TelemetryData dt : telemetryDataList) {
|
handleOneWorkstation(dt);
|
|
sendRealtimeDataMsg(dt);
|
}
|
|
}
|
|
void sendRealtimeDataMsg(TelemetryData dt) {
|
if(ObjectUtil.isEmpty(dt.getDataItems())){
|
return;
|
}
|
|
List<TypedTelemetryData> dataList= new ArrayList<>();
|
String propertyName;
|
for (TelemetryDataItem dataItem : dt.getDataItems()) {
|
|
for (Map<String, String> point : dataItem.getDataPoints()) {
|
|
String[] keys = point.keySet().toArray(new String[0]);
|
for(int i=0;i<keys.length;i++) {
|
TypedTelemetryData tpData = new TypedTelemetryData();
|
propertyName = keys[i];
|
tpData.setTime(dataItem.getTime());
|
tpData.setName(propertyName);
|
tpData.setValue(point.get(propertyName));
|
|
dataList.add(tpData);
|
}
|
|
}
|
|
}
|
|
//发送mqtt消息,通知mdc消息来了
|
for(TypedTelemetryData item : dataList) {
|
JSONObject json = new JSONObject();
|
json.put("workstationId",dt.getWorkstationId());
|
json.put("name", item.getName());
|
json.put("value", item.getValue());
|
json.put("time", item.getTime());
|
|
mqttMessageSender.sendMessage(WOCKSTATION_REALTIMEDATA_TOPIC, json.toJSONString());
|
}
|
|
}
|
/**
|
* 处理一个工位的数据解析入库
|
* @param dt
|
*/
|
void handleOneWorkstation(TelemetryData dt) {
|
String deviceId;// = DB_PREFIX+TEMPLATE_NAME + "_" + dt.getWorkstationId();
|
|
List<MeasurementSchema> schemas = new ArrayList<>();
|
|
schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64));
|
schemas.add(new MeasurementSchema("n", TSDataType.TEXT));
|
schemas.add(new MeasurementSchema("v", TSDataType.TEXT));
|
|
|
int rowIndex = 0;
|
|
Map<String, List<TypedTelemetryData>> processParamsMap = parseTelemetryToTypedMapList(dt);
|
|
|
String[] nameArr = processParamsMap.keySet().toArray(new String[0]);
|
String name;
|
for(int i=0;i<nameArr.length;i++) {
|
name = nameArr[i];
|
List<TypedTelemetryData> typeList = processParamsMap.get(name);
|
|
deviceId = generateDeviceId(dt.getWorkstationId(),name);
|
//System.out.println("deivcdid="+deviceId);
|
iotDBCommonService.setTemmplateIfNotSet(TEMPLATE_NAME, deviceId);
|
Tablet tablet = new Tablet(deviceId, schemas);
|
for(TypedTelemetryData tdata : typeList) {
|
rowIndex = tablet.rowSize++;
|
tablet.addTimestamp(rowIndex, tdata.getTime());
|
tablet.addValue("workstation_id",rowIndex,dt.getWorkstationId());
|
tablet.addValue("n",rowIndex,tdata.getName());
|
if(!tdata.getName().equals("Alarm")) {
|
tablet.addValue("v",rowIndex,tdata.getValue());
|
}else {
|
//告警信息根据原版需要处理一下,格式未json对象:{"timestamp":1718839644476,"code":1000,"msg":"EMERGENCY STOP","alarmtype":15,"level":""}
|
tablet.addValue("v",rowIndex,formatAlarmMsg(tdata.getTime(),tdata.getValue()));
|
}
|
}
|
|
try {
|
iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
|
|
//updateLastParam(dt.getWorkstationId(),typeList);
|
} catch (Exception e) {
|
log.error("IOTDB入库失败",e);
|
e.printStackTrace();
|
}finally {
|
//iotdbConfig.getSessionPool().clo1se();
|
}
|
}
|
|
|
}
|
|
/**
|
* 将报警信息格式化未json对象(原版是jsonobj或者json数组),格式为 {"timestamp":1718839644476,"code":1000,"msg":"EMERGENCY STOP","alarmtype":15,"level":""}
|
* @param collectAlarmValue
|
* @return
|
*/
|
String formatAlarmMsg(long time,String collectAlarmValue){
|
JSONObject alarmObj = new JSONObject();
|
alarmObj.put("timestamp", time);
|
alarmObj.put("code", "00");
|
alarmObj.put("msg", collectAlarmValue);
|
alarmObj.put("alarmtype", 0);
|
alarmObj.put("level", "");
|
return alarmObj.toJSONString();
|
}
|
|
/**
|
* 填充最新参数的数据
|
* @param typeList
|
* @throws StatementExecutionException
|
* @throws IoTDBConnectionException
|
*/
|
void updateLastParam(long workstationId,List<TypedTelemetryData> typeList) throws IoTDBConnectionException, StatementExecutionException {
|
if(typeList.isEmpty()){
|
return;
|
}
|
long updateTime = typeList.get(0).getTime();
|
List<MeasurementSchema> schemas = new ArrayList<>();
|
|
schemas.add(new MeasurementSchema("update_time", TSDataType.INT64));
|
schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64));
|
schemas.add(new MeasurementSchema("param_json", TSDataType.TEXT));
|
|
Tablet tablet = new Tablet("root.f2.last_process_param", schemas);
|
|
|
String sql = "select update_time,workstation_id,param_json from root.f2.last_process_param where workstation_id="+workstationId;
|
|
try(SessionDataSetWrapper dsw = iotdbConfig.getSessionPool().executeQueryStatement(sql)){
|
if(dsw.hasNext()) {
|
RowRecord rec = dsw.next();
|
long time = rec.getTimestamp();
|
|
String paramJsonStr = rec.getFields().get(2).getStringValue();
|
|
tablet.rowSize = 1;
|
tablet.addTimestamp(0, time);
|
tablet.addValue("update_time", 0, updateTime);
|
tablet.addValue("workstation_id", 0, workstationId);
|
JSONObject paramObj = JSONObject.parseObject(paramJsonStr);
|
for(TypedTelemetryData tdata: typeList) {
|
|
if(paramObj.containsKey(tdata.getName())) {
|
JSONObject itemObj = paramObj.getJSONObject(tdata.getName());
|
itemObj.put("value", tdata.getValue());
|
itemObj.put("time", tdata.getTime());//采集时间
|
paramObj.put(tdata.getName(), itemObj);
|
}else {
|
JSONObject itemObj = new JSONObject();
|
itemObj.put("value", tdata.getValue());
|
itemObj.put("time", tdata.getTime());//采集时间
|
paramObj.put(tdata.getName(), itemObj);
|
|
}
|
}
|
tablet.addValue("param_json", 0, paramObj.toJSONString());
|
|
this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
|
|
}else {
|
//没数据,新加入一条
|
tablet.rowSize = 1;
|
|
tablet.addTimestamp(0, updateTime);
|
tablet.addValue("update_time", 0, updateTime);
|
tablet.addValue("workstation_id", 0, workstationId);
|
|
JSONObject paramObj = new JSONObject();
|
for(TypedTelemetryData tdata: typeList) {
|
JSONObject itemObj = new JSONObject();
|
itemObj.put("value", tdata.getValue());
|
itemObj.put("time", tdata.getTime());//采集时间
|
paramObj.put(tdata.getName(), itemObj);
|
}
|
|
tablet.addValue("param_json", 0,paramObj.toJSONString());
|
this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
|
}
|
}
|
//dsw.close();
|
}
|
|
/**
|
* 解析数据,形成name - value对类型的数据列表,并用name进行分组
|
* @param dt
|
* @return 按name分组后的数据
|
*/
|
Map<String, List<TypedTelemetryData>> parseTelemetryToTypedMapList(TelemetryData dt){
|
List<TypedTelemetryData> list = new ArrayList<>();
|
|
String propertyName;
|
for (TelemetryDataItem dataItem : dt.getDataItems()) {
|
|
for (Map<String, String> point : dataItem.getDataPoints()) {
|
|
String[] keys = point.keySet().toArray(new String[0]);
|
for(int i=0;i<keys.length;i++) {
|
TypedTelemetryData tpData = new TypedTelemetryData();
|
propertyName = keys[i];
|
tpData.setTime(dataItem.getTime());
|
tpData.setName(propertyName);
|
tpData.setValue(point.get(propertyName));
|
|
list.add(tpData);
|
}
|
|
}
|
|
}
|
|
Map<String, List<TypedTelemetryData>> map = list.stream().collect(Collectors.groupingBy(TypedTelemetryData::getName));
|
return map;
|
}
|
|
String generateDeviceId(long workstationId,String propertyName) {
|
return IOTDBConstant.DB_PREFIX+TEMPLATE_NAME + "_" + workstationId+"_"+propertyName;
|
}
|
}
|
|
/*
|
* 分类的数据
|
*/
|
class TypedTelemetryData{
|
private long time;
|
private String name;
|
private String value;
|
public long getTime() {
|
return time;
|
}
|
public void setTime(long time) {
|
this.time = time;
|
}
|
public String getName() {
|
return name;
|
}
|
public void setName(String name) {
|
this.name = name;
|
}
|
public String getValue() {
|
return value;
|
}
|
public void setValue(String value) {
|
this.value = value;
|
}
|
@Override
|
public String toString() {
|
return "TypedTelemetryData [time=" + time + ", name=" + name + ", value=" + value + "]";
|
}
|
|
|
}
|