package com.qianwen.mdc.collect.service;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.qianwen.mdc.collect.domain.TelemetryData;
import com.qianwen.mdc.collect.domain.TelemetryDataItem;
import com.qianwen.mdc.collect.utils.redis.RedisUtil;

/**
 * Spring service that parses a raw telemetry message payload (a JSON string) into
 * {@link TelemetryData} entries and passes the parsed list on to
 * {@code CollectDataService} and {@code PackedDataService}.
 *
 * <p>NOTE(review): the collections in this class are declared as raw {@code List}/{@code Set};
 * the generic type arguments appear to be missing — confirm against the original file.
 */
@Service
public class IOTMqttReceiveService {
	// Class logger; not used by any code visible in this file.
	private static final Logger log = LoggerFactory.getLogger(IOTMqttReceiveService.class);

	@Autowired
	private CollectDataService collectDataService;

	@Autowired
	private PackedDataService packedDataService;

	// Used to read the "workstation-appid-map" hash (see getWorkstationIdByAppId).
	@Autowired
	private RedisUtil redisUtil;

	/**
	 * Handles a received message; corresponds to TelemetryDataPostingConsumer.
	 *
	 * <p>The payload is parsed with {@link #parsePayload(String)} and the resulting list is
	 * passed first to {@code collectDataService.handleCollectData} and then to
	 * {@code packedDataService.handle}.
	 *
	 * @param payload the message body as a JSON string
	 */
	public void handle(String payload) {
		//System.out.println("Received message122: " + payload);
		// Parse the message.
		List teleList = parsePayload(payload);
		collectDataService.handleCollectData(teleList);
		packedDataService.handle(teleList);
	}

	/**
	 * Parses the message body string into a list; each list entry holds the data of one
	 * workstation.
	 *
	 * <p>Every top-level key of the JSON object is treated as an application id and is
	 * translated to a workstation id via {@link #getWorkstationIdByAppId(String)}.
	 *
	 * @param payload the message body as a JSON string
	 * @return the list of {@link TelemetryData} built from the payload, one per top-level key
	 */
	List parsePayload(String payload){
		List dtList = new ArrayList ();
		// Data format: {"174":[{"values":{"d1":12},"ts":"1721978780449"}]}  (174 is the application id)
		// Multi-entry format: {"174":[{"values":{"output":11},"ts":"1722478128278"},{"values":{"spindleSpeed":22},"ts":"1722478128281"}]}
		// Parse the message into name/value form, e.g. n=output, v=11.
		JSONObject jsonObj = JSONObject.parseObject(payload);
		Set keySet = jsonObj.keySet();
		String[] keys = keySet.toArray(new String[] {});
		final String NEWDP_SUFFIX = "_n";// Suffix carried by the new data point name once a calculation rule has been applied.
		for(String key : keys) {
			String appId = key;// Application id in the IoT system; in this application it should be mapped through a table.
			long workstationId = getWorkstationIdByAppId(appId);
			TelemetryData tdata = new TelemetryData();
			tdata.setWorkstationId(workstationId);
			JSONArray dtArr = jsonObj.getJSONArray(appId);
			// NOTE(review): the next line is reproduced exactly as it appears in the reviewed source.
			// The text between "i" and "valueKeySet" looks truncated — presumably the loop condition,
			// the creation of tdataItem and the lookup of the "values" object were lost. Restore it
			// from version control; do not guess the TelemetryDataItem API.
			for(int i=0;i valueKeySet = values.keySet();
				valueKeySet.forEach(valueKey ->{
					String oriValueKey = valueKey;;// Collection points that use a calculation rule get "_n" appended to their name, so oriValueKey is the name without that "_n".
					if(StringUtils.endsWith(valueKey, NEWDP_SUFFIX)) {
						oriValueKey = StringUtils.removeEnd(valueKey, NEWDP_SUFFIX);
					}
					tdataItem.addPoint(oriValueKey,values.getString(valueKey));// Store the value under the originally configured point name.
				});
				tdata.addItem(tdataItem);
			}
			dtList.add(tdata);
		}
		return dtList;
	}

	/**
	 * Looks up the id that corresponds to {@code appId} in the cached mapping table.
	 *
	 * <p>Reads field {@code appId} of the {@code "workstation-appid-map"} hash through
	 * {@code redisUtil.hget}. When that lookup returns {@code null}, the {@code appId} itself
	 * is used as the id. The chosen value is converted to a string and parsed as a
	 * {@code long}.
	 *
	 * @param appId the application id taken from the payload
	 * @return the mapped workstation id, or {@code appId} parsed as a long when no mapping is found
	 * @throws NumberFormatException if the chosen value is not a parsable {@code long}
	 */
	public long getWorkstationIdByAppId(String appId) {
		Object wid = redisUtil.hget("workstation-appid-map", appId);
		// Fall back to the appId itself when the hash has no entry for it.
		String workstationId = String.valueOf(Optional.ofNullable(wid).orElse(appId));
		return Long.parseLong(workstationId);
	}
}