1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package com.qianwen.mdc.collect.service;
 
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
 
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
 
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.qianwen.mdc.collect.domain.TelemetryData;
import com.qianwen.mdc.collect.domain.TelemetryDataItem;
import com.qianwen.mdc.collect.utils.redis.RedisUtil;
import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO;
 
@Service
public class IOTMqttReceiveService {
    private static final Logger log = LoggerFactory.getLogger(IOTMqttReceiveService.class);
    @Autowired
    private CollectDataService collectDataService;
    
    @Autowired
    private PackedDataService packedDataService;
    @Autowired
    private RedisUtil redisUtil;
    @Autowired
    private WorkstationDatapointsService dpService;
    /**
     * 处理收到的消息,对应TelemetryDataPostingConsumer
     * @param payload
     */
    public void handle(String payload) {
        //解析消息
        List<TelemetryData> teleList = parsePayload(payload);
        
        collectDataService.handleCollectData(teleList);
        
        packedDataService.handle(teleList);
    }
    
    /**
     * 解析消息体字符串未列表,每个列表项是一个工位的数据
     * @param payload
     * @return
     */
    List<TelemetryData> parsePayload(String payload){
        List<TelemetryData> dtList = new ArrayList<TelemetryData> ();
        //多条 数据格式:{"174":[{"values":{"output":11},"ts":"1722478128278"},{"values":{"spindleSpeed":22},"ts":"1722478128281"}]}   174是应用id
        //解析消息 name,value形式,如n=output,v=11
        JSONObject jsonObj = JSONObject.parseObject(payload);
        
        Set<String> keySet = jsonObj.keySet();
        String[] keys = keySet.toArray(new String[] {});
        
        //WorkstationDatapointsVO dpVo;
        
        final String NEWDP_SUFFIX = "_n";//计算规则使用之后新数据点的结尾
        for(String key : keys) {
            String appId = key;//iot系统中的应用id,本应用中应该用表去对应
            
            
            //TODO 获取工位数据点配置,只保存配置好的数据点,没有配置的采集数据抛弃。
            final WorkstationDatapointsVO dpVo = dpService.getDatapointsByAppIdFromCache(appId);
            if(dpVo == null) {
                //工位没有定义过数据点或者appId不匹配
                log.warn("appId={}未找到数据点定义记录,丢弃数据",appId);
                continue;
            }
            
            TelemetryData tdata = new TelemetryData();
            //tdata.setWorkstationId(workstationId);
            tdata.setAppId(appId);
            tdata.setWorkstationId(dpVo.getWorkstationId());
            
            JSONArray dtArr = jsonObj.getJSONArray(appId);
            for(int i=0;i<dtArr.size();i++) {
                
                TelemetryDataItem tdataItem = new TelemetryDataItem();
                JSONObject jsonDataItem = dtArr.getJSONObject(i);//每一项中dataItem还有values对象和ts时间戳2个属性
                
                long time = jsonDataItem.getLong("ts");
                tdataItem.setTime(time);
                JSONObject values = jsonDataItem.getJSONObject("values");
                
                Set<String> valueKeySet = values.keySet();
                valueKeySet.forEach(valueKey ->{
                    String oriValueKey = valueKey;;//由于使用计算规则的采集点名称会后面增加一个"_n",所以这个oriValueKey代表没有增加"_n"的
                    if(StringUtils.endsWith(valueKey, NEWDP_SUFFIX)) {
                        oriValueKey = StringUtils.removeEnd(valueKey, NEWDP_SUFFIX);
                    }
                    if(!dpVo.containsDataPoint(oriValueKey)) {
                        //如果不存在该数据点配置,该数据直接忽略
                        return;
                    }
                    tdataItem.addPoint(oriValueKey,values.getString(valueKey));//使用原始配置点保持保存数据
                });
                
                tdata.addItem(tdataItem);
            }
            
            dtList.add(tdata);
        }
        
        return dtList;
    }
    
    /**
     * 根据对应表缓存,获取appId对应的id
     * @param appId
     * @return
     */
    /*
    public Long getWorkstationIdByAppId(String appId) {
 
        Object wid = redisUtil.hget("workstation-appid-map", appId);
        
        String workstationId = String.valueOf(Optional.ofNullable(wid).orElse(StringUtils.EMPTY));
        if(ObjectUtil.isEmpty(workstationId)) {
            return null;
        }
        return Long.parseLong(workstationId);
    }
    */
}