package com.qianwen.mdc.collect.service;
|
|
import java.util.ArrayList;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.Set;
|
|
import org.apache.commons.lang3.StringUtils;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Service;
|
|
import com.alibaba.fastjson.JSONArray;
|
import com.alibaba.fastjson.JSONObject;
|
import com.qianwen.mdc.collect.constants.IOTDBConstant;
|
import com.qianwen.mdc.collect.domain.TelemetryData;
|
import com.qianwen.mdc.collect.domain.TelemetryDataItem;
|
import com.qianwen.mdc.collect.enums.DefaultWcsEnum;
|
import com.qianwen.mdc.collect.utils.redis.RedisUtil;
|
import com.qianwen.mdc.collect.vo.DeviceStateCheckConfigVO;
|
import com.qianwen.mdc.collect.vo.HistoryDeviceState;
|
import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO;
|
|
@Service
|
public class IOTMqttReceiveService {
|
public static final String NEWDP_SUFFIX = "_n";//计算规则使用之后新数据点的结尾
|
private static final Logger log = LoggerFactory.getLogger(IOTMqttReceiveService.class);
|
@Autowired
|
private CollectDataService collectDataService;
|
|
@Autowired
|
private PackedDataService packedDataService;
|
@Autowired
|
private WorkstationDatapointsService dpService;
|
@Autowired
|
private DeviceStatusCheckCfgLoader cfgLoader;
|
/**
|
* 处理收到的消息,对应TelemetryDataPostingConsumer
|
* @param payload
|
*/
|
public void handle(String payload) {
|
//解析消息
|
List<TelemetryData> teleList = parsePayload(payload);
|
|
|
checkDeviceStatus(teleList);
|
|
collectDataService.handleCollectData(teleList);
|
|
packedDataService.handle(teleList);
|
}
|
|
/**
|
* 检查设备状态,数据,如果是闪烁状态(运行,待机来回切换),规则是在数据表定义的
|
* @param teleList
|
*/
|
void checkDeviceStatus(List<TelemetryData> teleList){
|
DeviceStateCheckConfigVO chkConfigVO = cfgLoader.getCheckConfigVOFromCache();
|
//long interval = cfgLoader.getCheckConfigVOFromCache().getInterval();
|
int wcs;
|
|
for(TelemetryData tdata:teleList) {//tdata.getWorkstationId()
|
for(TelemetryDataItem dataItem:tdata.getDataItems()) {
|
List<Map<String,String>> points = dataItem.getDataPoints();
|
for(Map<String,String> pt : points) {
|
if(pt.containsKey(IOTDBConstant.DEVICE_STATUS_KEY)){
|
String val = pt.get(IOTDBConstant.DEVICE_STATUS_KEY);
|
wcs = Integer.parseInt(val);
|
//!!!code范围检查
|
computeDeviceStatus(tdata.getWorkstationId(),wcs,dataItem.getTime(),pt,chkConfigVO);
|
}
|
}
|
}
|
}
|
//di.getTime()
|
//tdata.getWorkstationId();
|
|
|
}
|
|
/**
|
* 核实并计算状态值,如果有灯闪烁情况,则判定为待机
|
* @param workstationId
|
* @param wcs 状态值
|
* @param pointTime 数据点的时间戳
|
* @param point 数据点key和val
|
* @param interval 检查的时间跨度,毫秒
|
*/
|
void computeDeviceStatus(long workstationId,int wcs,long pointTime,Map<String,String> point,DeviceStateCheckConfigVO chkConfigVO) {
|
|
long interval = chkConfigVO.getInterval();
|
if(wcs != DefaultWcsEnum.RUNNING.getCode() || !chkConfigVO.containsWorkstation(workstationId)) {//仅针对运行状态的才进行状态核实
|
return;
|
}
|
|
List<HistoryDeviceState> statusHistoryList = cfgLoader.getHistoryList(workstationId);// DEVICE_STTUS_MAP.get(workstationId);//何时初始化内部数据??
|
|
//pointTime-interval 3秒内
|
long count = statusHistoryList.stream().filter(his -> {
|
return his.getTime()>(pointTime-interval) && his.getDeviceStatus() != DefaultWcsEnum.RUNNING.getCode();
|
}).count();
|
log.info("checkstatus count={}",count);
|
if(count > 0) {
|
//灯闪烁,判定为待机,这里修改设备状态的数据
|
point.put(IOTDBConstant.DEVICE_STATUS_KEY, DefaultWcsEnum.STANDBY.getCode()+"");
|
}
|
HistoryDeviceState his = new HistoryDeviceState();
|
his.setTime(pointTime);
|
his.setDeviceStatus(wcs);
|
his.setWorkstationId(workstationId);
|
statusHistoryList.add(his);
|
|
}
|
/**
|
* 解析消息体字符串未列表,每个列表项是一个工位的数据
|
* @param payload
|
* @return
|
*/
|
List<TelemetryData> parsePayload(String payload){
|
List<TelemetryData> dtList = new ArrayList<TelemetryData> ();
|
//多条 数据格式:{"174":[{"values":{"output":11},"ts":"1722478128278"},{"values":{"spindleSpeed":22},"ts":"1722478128281"}]} 174是应用id
|
//解析消息 name,value形式,如n=output,v=11
|
JSONObject jsonObj = JSONObject.parseObject(payload);
|
|
Set<String> keySet = jsonObj.keySet();
|
String[] keys = keySet.toArray(new String[] {});
|
|
|
for(String key : keys) {
|
String appId = key;//iot系统中的应用id,本应用中应该用表去对应
|
|
//获取工位数据点配置,只保存配置好的数据点,没有配置的采集数据抛弃。
|
final WorkstationDatapointsVO dpVo = dpService.getDatapointsByAppIdFromCache(appId);
|
|
if(dpVo == null) {
|
//工位没有定义过数据点或者appId不匹配
|
log.warn("appId={}未找到数据点定义记录,丢弃数据",appId);
|
continue;
|
}
|
|
TelemetryData tdata = new TelemetryData();
|
//tdata.setWorkstationId(workstationId);
|
tdata.setAppId(appId);
|
tdata.setWorkstationId(dpVo.getWorkstationId());
|
|
JSONArray dtArr = jsonObj.getJSONArray(appId);
|
for(int i=0;i<dtArr.size();i++) {
|
|
TelemetryDataItem tdataItem = new TelemetryDataItem();
|
JSONObject jsonDataItem = dtArr.getJSONObject(i);//每一项中dataItem还有values对象和ts时间戳2个属性
|
|
long time = jsonDataItem.getLong("ts");
|
tdataItem.setTime(time);
|
JSONObject values = jsonDataItem.getJSONObject("values");
|
|
Set<String> valueKeySet = values.keySet();
|
valueKeySet.forEach(valueKey ->{
|
if(!dpVo.containsDataPoint(valueKey)) {
|
//如果不存在该数据点配置,该数据直接忽略
|
return;
|
}
|
|
String oriValueKey = valueKey;//由于使用计算规则的采集点名称会后面增加一个"_n",所以这个oriValueKey代表没有增加"_n"的
|
if(StringUtils.endsWith(valueKey, NEWDP_SUFFIX)) {
|
oriValueKey = StringUtils.removeEnd(valueKey, NEWDP_SUFFIX);
|
}
|
|
tdataItem.addPoint(oriValueKey,values.getString(valueKey));//使用原始配置点保持保存数据
|
});
|
|
tdata.addItem(tdataItem);
|
}
|
|
dtList.add(tdata);
|
}
|
|
return dtList;
|
}
|
|
}
|