yangys
2024-11-03 d187cd0fa46d01ec293e2aba1a1e54fdfab2ec80
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package com.qianwen.mdc.collect.service;
 
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
 
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
 
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.qianwen.mdc.collect.domain.TelemetryData;
import com.qianwen.mdc.collect.domain.TelemetryDataItem;
import com.qianwen.mdc.collect.utils.redis.RedisUtil;
import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO;
 
@Service
public class IOTMqttReceiveService {
    public static final String NEWDP_SUFFIX = "_n";//计算规则使用之后新数据点的结尾
    private static final Logger log = LoggerFactory.getLogger(IOTMqttReceiveService.class);
    @Autowired
    private CollectDataService collectDataService;
    
    @Autowired
    private PackedDataService packedDataService;
    @Autowired
    private WorkstationDatapointsService dpService;
    
    /**
     * 处理收到的消息,对应TelemetryDataPostingConsumer
     * @param payload
     */
    public void handle(String payload) {
        //解析消息
        List<TelemetryData> teleList = parsePayload(payload);
        
        collectDataService.handleCollectData(teleList);
        
        packedDataService.handle(teleList);
    }
    
    /**
     * 解析消息体字符串未列表,每个列表项是一个工位的数据
     * @param payload
     * @return
     */
    List<TelemetryData> parsePayload(String payload){
        List<TelemetryData> dtList = new ArrayList<TelemetryData> ();
        //多条 数据格式:{"174":[{"values":{"output":11},"ts":"1722478128278"},{"values":{"spindleSpeed":22},"ts":"1722478128281"}]}   174是应用id
        //解析消息 name,value形式,如n=output,v=11
        JSONObject jsonObj = JSONObject.parseObject(payload);
        
        Set<String> keySet = jsonObj.keySet();
        String[] keys = keySet.toArray(new String[] {});
        
        
        for(String key : keys) {
            String appId = key;//iot系统中的应用id,本应用中应该用表去对应
            
            //获取工位数据点配置,只保存配置好的数据点,没有配置的采集数据抛弃。
            final WorkstationDatapointsVO dpVo = dpService.getDatapointsByAppIdFromCache(appId);
            //final WorkstationDatapointsVO dpVo = dpService.getDataPointByAppId(appId);
            if(dpVo == null) {
                //工位没有定义过数据点或者appId不匹配
                log.warn("appId={}未找到数据点定义记录,丢弃数据",appId);
                continue;
            }
            
            TelemetryData tdata = new TelemetryData();
            //tdata.setWorkstationId(workstationId);
            tdata.setAppId(appId);
            tdata.setWorkstationId(dpVo.getWorkstationId());
            
            JSONArray dtArr = jsonObj.getJSONArray(appId);
            for(int i=0;i<dtArr.size();i++) {
                
                TelemetryDataItem tdataItem = new TelemetryDataItem();
                JSONObject jsonDataItem = dtArr.getJSONObject(i);//每一项中dataItem还有values对象和ts时间戳2个属性
                
                long time = jsonDataItem.getLong("ts");
                tdataItem.setTime(time);
                JSONObject values = jsonDataItem.getJSONObject("values");
                
                Set<String> valueKeySet = values.keySet();
                valueKeySet.forEach(valueKey ->{
                    if(!dpVo.containsDataPoint(valueKey)) {
                        //如果不存在该数据点配置,该数据直接忽略
                        return;
                    }
                    
                    String oriValueKey = valueKey;//由于使用计算规则的采集点名称会后面增加一个"_n",所以这个oriValueKey代表没有增加"_n"的
                    if(StringUtils.endsWith(valueKey, NEWDP_SUFFIX)) {
                        oriValueKey = StringUtils.removeEnd(valueKey, NEWDP_SUFFIX);
                    }
                    
                    tdataItem.addPoint(oriValueKey,values.getString(valueKey));//使用原始配置点保持保存数据
                });
                
                tdata.addItem(tdataItem);
            }
            
            dtList.add(tdata);
        }
        
        return dtList;
    }
    
}