package com.qianwen.mdc.collect.service; import java.util.ArrayList; import java.util.List; import java.util.Set; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.qianwen.mdc.collect.domain.TelemetryData; import com.qianwen.mdc.collect.domain.TelemetryDataItem; import com.qianwen.mdc.collect.utils.redis.RedisUtil; import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO; @Service public class IOTMqttReceiveService { private static final Logger log = LoggerFactory.getLogger(IOTMqttReceiveService.class); @Autowired private CollectDataService collectDataService; @Autowired private PackedDataService packedDataService; @Autowired private WorkstationDatapointsService dpService; /** * 处理收到的消息,对应TelemetryDataPostingConsumer * @param payload */ public void handle(String payload) { //解析消息 List teleList = parsePayload(payload); collectDataService.handleCollectData(teleList); packedDataService.handle(teleList); } /** * 解析消息体字符串未列表,每个列表项是一个工位的数据 * @param payload * @return */ List parsePayload(String payload){ List dtList = new ArrayList (); //多条 数据格式:{"174":[{"values":{"output":11},"ts":"1722478128278"},{"values":{"spindleSpeed":22},"ts":"1722478128281"}]} 174是应用id //解析消息 name,value形式,如n=output,v=11 JSONObject jsonObj = JSONObject.parseObject(payload); Set keySet = jsonObj.keySet(); String[] keys = keySet.toArray(new String[] {}); //WorkstationDatapointsVO dpVo; final String NEWDP_SUFFIX = "_n";//计算规则使用之后新数据点的结尾 for(String key : keys) { String appId = key;//iot系统中的应用id,本应用中应该用表去对应 //TODO 获取工位数据点配置,只保存配置好的数据点,没有配置的采集数据抛弃。 final WorkstationDatapointsVO dpVo = dpService.getDatapointsByAppIdFromCache(appId); if(dpVo == null) { //工位没有定义过数据点或者appId不匹配 log.warn("appId={}未找到数据点定义记录,丢弃数据",appId); continue; } TelemetryData tdata = new TelemetryData(); //tdata.setWorkstationId(workstationId); tdata.setAppId(appId); tdata.setWorkstationId(dpVo.getWorkstationId()); JSONArray dtArr = jsonObj.getJSONArray(appId); for(int i=0;i valueKeySet = values.keySet(); valueKeySet.forEach(valueKey ->{ String oriValueKey = valueKey;;//由于使用计算规则的采集点名称会后面增加一个"_n",所以这个oriValueKey代表没有增加"_n"的 if(StringUtils.endsWith(valueKey, NEWDP_SUFFIX)) { oriValueKey = StringUtils.removeEnd(valueKey, NEWDP_SUFFIX); } if(!dpVo.containsDataPoint(oriValueKey)) { //如果不存在该数据点配置,该数据直接忽略 return; } tdataItem.addPoint(oriValueKey,values.getString(valueKey));//使用原始配置点保持保存数据 }); tdata.addItem(tdataItem); } dtList.add(tdata); } return dtList; } }