yangys
2024-10-09 7ef593e1e3c35aaeecf9318f0b3941230d3ed002
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package com.qianwen.mdc.collect.service;
 
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
 
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
 
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.qianwen.mdc.collect.domain.TelemetryData;
import com.qianwen.mdc.collect.domain.TelemetryDataItem;
import com.qianwen.mdc.collect.utils.redis.RedisUtil;
 
@Service
public class IOTMqttReceiveService {
    private static final Logger log = LoggerFactory.getLogger(IOTMqttReceiveService.class);
    @Autowired
    private CollectDataService collectDataService;
    
    @Autowired
    private PackedDataService packedDataService;
    @Autowired
    private  RedisUtil redisUtil;
    /**
     * 处理收到的消息,对应TelemetryDataPostingConsumer
     * @param payload
     */
    public void handle(String payload) {
        //System.out.println("Received message122: " + payload);
        //解析消息
        List<TelemetryData> teleList = parsePayload(payload);
        
        collectDataService.handleCollectData(teleList);
        
        packedDataService.handle(teleList);
    }
    
    /**
     * 解析消息体字符串未列表,每个列表项是一个工位的数据
     * @param payload
     * @return
     */
    List<TelemetryData> parsePayload(String payload){
        List<TelemetryData> dtList = new ArrayList<TelemetryData> ();
        //数据格式:{"174":[{"values":{"d1":12},"ts":"1721978780449"}]} 174是应用id
        //多条格式:{"174":[{"values":{"output":11},"ts":"1722478128278"},{"values":{"spindleSpeed":22},"ts":"1722478128281"}]}
        //解析消息 name,value形式,如n=output,v=11
        JSONObject jsonObj = JSONObject.parseObject(payload);
        
        Set<String> keySet = jsonObj.keySet();
        String[] keys = keySet.toArray(new String[] {});
        
        final String NEWDP_SUFFIX = "_n";//计算规则使用之后新数据点的结尾
        for(String key : keys) {
            String appId = key;//iot系统中的应用id,本应用中应该用表去对应
            long workstationId = getWorkstationIdByAppId(appId);
            
            TelemetryData tdata = new TelemetryData();
            tdata.setWorkstationId(workstationId);
            
            
            JSONArray dtArr = jsonObj.getJSONArray(appId);
            for(int i=0;i<dtArr.size();i++) {
                
                TelemetryDataItem tdataItem = new TelemetryDataItem();
                JSONObject jsonDataItem = dtArr.getJSONObject(i);//每一项中dataItem还有values对象和ts时间戳2个属性
                
                long time = jsonDataItem.getLong("ts");
                tdataItem.setTime(time);
                JSONObject values = jsonDataItem.getJSONObject("values");
                
                Set<String> valueKeySet = values.keySet();
                valueKeySet.forEach(valueKey ->{
                    String oriValueKey = valueKey;;//由于使用计算规则的采集点名称会后面增加一个"_n",所以这个oriValueKey代表没有增加"_n"的
                    if(StringUtils.endsWith(valueKey, NEWDP_SUFFIX)) {
                        oriValueKey = StringUtils.removeEnd(valueKey, NEWDP_SUFFIX);
                    }
                    tdataItem.addPoint(oriValueKey,values.getString(valueKey));//使用原始配置点保持保存数据
                });
                
                tdata.addItem(tdataItem);
            }
            
            dtList.add(tdata);
        }
        
        return dtList;
    }
    
    /**
     * 根据对应表缓存,获取appId对应的id
     * @param appId
     * @return
     */
    public long getWorkstationIdByAppId(String appId) {
 
        Object wid = redisUtil.hget("workstation-appid-map", appId);
        String workstationId = String.valueOf(Optional.ofNullable(wid).orElse(appId));
    
        return Long.parseLong(workstationId);
    }
}