package com.qianwen.mdc.collect.service; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.qianwen.mdc.collect.constants.IOTDBConstant; import com.qianwen.mdc.collect.domain.TelemetryData; import com.qianwen.mdc.collect.domain.TelemetryDataItem; import com.qianwen.mdc.collect.enums.DefaultWcsEnum; import com.qianwen.mdc.collect.utils.redis.RedisUtil; import com.qianwen.mdc.collect.vo.DeviceStateCheckConfigVO; import com.qianwen.mdc.collect.vo.HistoryDeviceState; import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO; @Service public class IOTMqttReceiveService { public static final String NEWDP_SUFFIX = "_n";//计算规则使用之后新数据点的结尾 private static final Logger log = LoggerFactory.getLogger(IOTMqttReceiveService.class); @Autowired private CollectDataService collectDataService; @Autowired private PackedDataService packedDataService; @Autowired private WorkstationDatapointsService dpService; @Autowired private DeviceStatusCheckCfgLoader cfgLoader; /** * 处理收到的消息,对应TelemetryDataPostingConsumer * @param payload */ public void handle(String payload) { //解析消息 List teleList = parsePayload(payload); checkDeviceStatus(teleList); collectDataService.handleCollectData(teleList); packedDataService.handle(teleList); } /** * 检查设备状态,数据,如果是闪烁状态(运行,待机来回切换),规则是在数据表定义的 * @param teleList */ void checkDeviceStatus(List teleList){ DeviceStateCheckConfigVO chkConfigVO = cfgLoader.getCheckConfigVOFromCache(); //long interval = cfgLoader.getCheckConfigVOFromCache().getInterval(); int wcs; for(TelemetryData tdata:teleList) {//tdata.getWorkstationId() for(TelemetryDataItem dataItem:tdata.getDataItems()) { List> points = dataItem.getDataPoints(); for(Map pt : points) { if(pt.containsKey(IOTDBConstant.DEVICE_STATUS_KEY)){ String val = pt.get(IOTDBConstant.DEVICE_STATUS_KEY); wcs = Integer.parseInt(val); //!!!code范围检查 computeDeviceStatus(tdata.getWorkstationId(),wcs,dataItem.getTime(),pt,chkConfigVO); } } } } //di.getTime() //tdata.getWorkstationId(); } /** * 核实并计算状态值,如果有灯闪烁情况,则判定为待机 * @param workstationId * @param wcs 状态值 * @param pointTime 数据点的时间戳 * @param point 数据点key和val * @param interval 检查的时间跨度,毫秒 */ void computeDeviceStatus(long workstationId,int wcs,long pointTime,Map point,DeviceStateCheckConfigVO chkConfigVO) { long interval = chkConfigVO.getInterval(); if(wcs != DefaultWcsEnum.RUNNING.getCode() || !chkConfigVO.containsWorkstation(workstationId)) {//仅针对运行状态的才进行状态核实 return; } List statusHistoryList = cfgLoader.getHistoryList(workstationId);// DEVICE_STTUS_MAP.get(workstationId);//何时初始化内部数据?? //pointTime-interval 3秒内 long count = statusHistoryList.stream().filter(his -> { return his.getTime()>(pointTime-interval) && his.getDeviceStatus() != DefaultWcsEnum.RUNNING.getCode(); }).count(); log.info("checkstatus count={}",count); if(count > 0) { //灯闪烁,判定为待机,这里修改设备状态的数据 point.put(IOTDBConstant.DEVICE_STATUS_KEY, DefaultWcsEnum.STANDBY.getCode()+""); } HistoryDeviceState his = new HistoryDeviceState(); his.setTime(pointTime); his.setDeviceStatus(wcs); his.setWorkstationId(workstationId); statusHistoryList.add(his); } /** * 解析消息体字符串未列表,每个列表项是一个工位的数据 * @param payload * @return */ List parsePayload(String payload){ List dtList = new ArrayList (); //多条 数据格式:{"174":[{"values":{"output":11},"ts":"1722478128278"},{"values":{"spindleSpeed":22},"ts":"1722478128281"}]} 174是应用id //解析消息 name,value形式,如n=output,v=11 JSONObject jsonObj = JSONObject.parseObject(payload); Set keySet = jsonObj.keySet(); String[] keys = keySet.toArray(new String[] {}); for(String key : keys) { String appId = key;//iot系统中的应用id,本应用中应该用表去对应 //获取工位数据点配置,只保存配置好的数据点,没有配置的采集数据抛弃。 final WorkstationDatapointsVO dpVo = dpService.getDatapointsByAppIdFromCache(appId); if(dpVo == null) { //工位没有定义过数据点或者appId不匹配 log.warn("appId={}未找到数据点定义记录,丢弃数据",appId); continue; } TelemetryData tdata = new TelemetryData(); //tdata.setWorkstationId(workstationId); tdata.setAppId(appId); tdata.setWorkstationId(dpVo.getWorkstationId()); JSONArray dtArr = jsonObj.getJSONArray(appId); for(int i=0;i valueKeySet = values.keySet(); valueKeySet.forEach(valueKey ->{ if(!dpVo.containsDataPoint(valueKey)) { //如果不存在该数据点配置,该数据直接忽略 return; } String oriValueKey = valueKey;//由于使用计算规则的采集点名称会后面增加一个"_n",所以这个oriValueKey代表没有增加"_n"的 if(StringUtils.endsWith(valueKey, NEWDP_SUFFIX)) { oriValueKey = StringUtils.removeEnd(valueKey, NEWDP_SUFFIX); } tdataItem.addPoint(oriValueKey,values.getString(valueKey));//使用原始配置点保持保存数据 }); tdata.addItem(tdataItem); } dtList.add(tdata); } return dtList; } }