package com.qianwen.mdc.collect.service;
|
|
import java.util.ArrayList;
|
import java.util.List;
|
import java.util.Set;
|
|
import org.apache.commons.lang3.StringUtils;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Service;
|
|
import com.alibaba.fastjson.JSONArray;
|
import com.alibaba.fastjson.JSONObject;
|
import com.qianwen.mdc.collect.domain.TelemetryData;
|
import com.qianwen.mdc.collect.domain.TelemetryDataItem;
|
import com.qianwen.mdc.collect.utils.redis.RedisUtil;
|
import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO;
|
|
@Service
|
public class IOTMqttReceiveService {
|
public static final String NEWDP_SUFFIX = "_n";//计算规则使用之后新数据点的结尾
|
private static final Logger log = LoggerFactory.getLogger(IOTMqttReceiveService.class);
|
@Autowired
|
private CollectDataService collectDataService;
|
|
@Autowired
|
private PackedDataService packedDataService;
|
@Autowired
|
private WorkstationDatapointsService dpService;
|
|
/**
|
* 处理收到的消息,对应TelemetryDataPostingConsumer
|
* @param payload
|
*/
|
public void handle(String payload) {
|
//解析消息
|
List<TelemetryData> teleList = parsePayload(payload);
|
|
collectDataService.handleCollectData(teleList);
|
|
packedDataService.handle(teleList);
|
}
|
|
/**
|
* 解析消息体字符串未列表,每个列表项是一个工位的数据
|
* @param payload
|
* @return
|
*/
|
List<TelemetryData> parsePayload(String payload){
|
List<TelemetryData> dtList = new ArrayList<TelemetryData> ();
|
//多条 数据格式:{"174":[{"values":{"output":11},"ts":"1722478128278"},{"values":{"spindleSpeed":22},"ts":"1722478128281"}]} 174是应用id
|
//解析消息 name,value形式,如n=output,v=11
|
JSONObject jsonObj = JSONObject.parseObject(payload);
|
|
Set<String> keySet = jsonObj.keySet();
|
String[] keys = keySet.toArray(new String[] {});
|
|
|
for(String key : keys) {
|
String appId = key;//iot系统中的应用id,本应用中应该用表去对应
|
|
//获取工位数据点配置,只保存配置好的数据点,没有配置的采集数据抛弃。
|
final WorkstationDatapointsVO dpVo = dpService.getDatapointsByAppIdFromCache(appId);
|
//final WorkstationDatapointsVO dpVo = dpService.getDataPointByAppId(appId);
|
if(dpVo == null) {
|
//工位没有定义过数据点或者appId不匹配
|
log.warn("appId={}未找到数据点定义记录,丢弃数据",appId);
|
continue;
|
}
|
|
TelemetryData tdata = new TelemetryData();
|
//tdata.setWorkstationId(workstationId);
|
tdata.setAppId(appId);
|
tdata.setWorkstationId(dpVo.getWorkstationId());
|
|
JSONArray dtArr = jsonObj.getJSONArray(appId);
|
for(int i=0;i<dtArr.size();i++) {
|
|
TelemetryDataItem tdataItem = new TelemetryDataItem();
|
JSONObject jsonDataItem = dtArr.getJSONObject(i);//每一项中dataItem还有values对象和ts时间戳2个属性
|
|
long time = jsonDataItem.getLong("ts");
|
tdataItem.setTime(time);
|
JSONObject values = jsonDataItem.getJSONObject("values");
|
|
Set<String> valueKeySet = values.keySet();
|
valueKeySet.forEach(valueKey ->{
|
if(!dpVo.containsDataPoint(valueKey)) {
|
//如果不存在该数据点配置,该数据直接忽略
|
return;
|
}
|
|
String oriValueKey = valueKey;//由于使用计算规则的采集点名称会后面增加一个"_n",所以这个oriValueKey代表没有增加"_n"的
|
if(StringUtils.endsWith(valueKey, NEWDP_SUFFIX)) {
|
oriValueKey = StringUtils.removeEnd(valueKey, NEWDP_SUFFIX);
|
}
|
|
tdataItem.addPoint(oriValueKey,values.getString(valueKey));//使用原始配置点保持保存数据
|
});
|
|
tdata.addItem(tdataItem);
|
}
|
|
dtList.add(tdata);
|
}
|
|
return dtList;
|
}
|
|
}
|