| | |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | import java.util.Optional; |
| | | import java.util.Set; |
| | | |
| | | import org.apache.commons.lang3.StringUtils; |
| | |
| | | import com.qianwen.mdc.collect.domain.TelemetryData; |
| | | import com.qianwen.mdc.collect.domain.TelemetryDataItem; |
| | | import com.qianwen.mdc.collect.utils.redis.RedisUtil; |
| | | import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO; |
| | | |
| | | @Service |
| | | public class IOTMqttReceiveService { |
| | |
| | | @Autowired |
| | | private PackedDataService packedDataService; |
| | | @Autowired |
| | | private RedisUtil redisUtil; |
| | | private RedisUtil redisUtil; |
| | | @Autowired |
| | | private WorkstationDatapointsService dpService; |
| | | /** |
| | | * 处理收到的消息,对应TelemetryDataPostingConsumer |
| | | * @param payload |
| | | */ |
| | | public void handle(String payload) { |
| | | //System.out.println("Received message122: " + payload); |
| | | //解析消息 |
| | | List<TelemetryData> teleList = parsePayload(payload); |
| | | |
| | |
| | | */ |
| | | List<TelemetryData> parsePayload(String payload){ |
| | | List<TelemetryData> dtList = new ArrayList<TelemetryData> (); |
| | | //数据格式:{"174":[{"values":{"d1":12},"ts":"1721978780449"}]} 174是应用id |
| | | //多条格式:{"174":[{"values":{"output":11},"ts":"1722478128278"},{"values":{"spindleSpeed":22},"ts":"1722478128281"}]} |
| | | //多条 数据格式:{"174":[{"values":{"output":11},"ts":"1722478128278"},{"values":{"spindleSpeed":22},"ts":"1722478128281"}]} 174是应用id |
| | | //解析消息 name,value形式,如n=output,v=11 |
| | | JSONObject jsonObj = JSONObject.parseObject(payload); |
| | | |
| | | Set<String> keySet = jsonObj.keySet(); |
| | | String[] keys = keySet.toArray(new String[] {}); |
| | | |
| | | //WorkstationDatapointsVO dpVo; |
| | | |
| | | final String NEWDP_SUFFIX = "_n";//计算规则使用之后新数据点的结尾 |
| | | for(String key : keys) { |
| | | String appId = key;//iot系统中的应用id,本应用中应该用表去对应 |
| | | long workstationId = getWorkstationIdByAppId(appId); |
| | | |
| | | |
| | | //TODO 获取工位数据点配置,只保存配置好的数据点,没有配置的采集数据抛弃。 |
| | | final WorkstationDatapointsVO dpVo = dpService.getDatapointsByAppIdFromCache(appId); |
| | | if(dpVo == null) { |
| | | //工位没有定义过数据点或者appId不匹配 |
| | | log.warn("appId={}未找到数据点定义记录,丢弃数据",appId); |
| | | continue; |
| | | } |
| | | |
| | | TelemetryData tdata = new TelemetryData(); |
| | | tdata.setWorkstationId(workstationId); |
| | | |
| | | //tdata.setWorkstationId(workstationId); |
| | | tdata.setAppId(appId); |
| | | tdata.setWorkstationId(dpVo.getWorkstationId()); |
| | | |
| | | JSONArray dtArr = jsonObj.getJSONArray(appId); |
| | | for(int i=0;i<dtArr.size();i++) { |
| | |
| | | if(StringUtils.endsWith(valueKey, NEWDP_SUFFIX)) { |
| | | oriValueKey = StringUtils.removeEnd(valueKey, NEWDP_SUFFIX); |
| | | } |
| | | if(!dpVo.containsDataPoint(oriValueKey)) { |
| | | //如果不存在该数据点配置,该数据直接忽略 |
| | | return; |
| | | } |
| | | tdataItem.addPoint(oriValueKey,values.getString(valueKey));//使用原始配置点保持保存数据 |
| | | }); |
| | | |
| | |
| | | * @param appId |
| | | * @return |
| | | */ |
| | | public long getWorkstationIdByAppId(String appId) { |
| | | /* |
| | | public Long getWorkstationIdByAppId(String appId) { |
| | | |
| | | Object wid = redisUtil.hget("workstation-appid-map", appId); |
| | | String workstationId = String.valueOf(Optional.ofNullable(wid).orElse(appId)); |
| | | |
| | | |
| | | String workstationId = String.valueOf(Optional.ofNullable(wid).orElse(StringUtils.EMPTY)); |
| | | if(ObjectUtil.isEmpty(workstationId)) { |
| | | return null; |
| | | } |
| | | return Long.parseLong(workstationId); |
| | | } |
| | | */ |
| | | } |