yangys
2025-05-09 e0d3490c2c785a9e94b3ef5128be97da2978956f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
package com.qianwen.mdc.collect.service;
 
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
 
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
 
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.qianwen.mdc.collect.constants.IOTDBConstant;
import com.qianwen.mdc.collect.domain.TelemetryData;
import com.qianwen.mdc.collect.domain.TelemetryDataItem;
import com.qianwen.mdc.collect.enums.DefaultWcsEnum;
import com.qianwen.mdc.collect.utils.redis.RedisUtil;
import com.qianwen.mdc.collect.vo.DeviceStateCheckConfigVO;
import com.qianwen.mdc.collect.vo.HistoryDeviceState;
import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO;
 
@Service
public class IOTMqttReceiveService {
    public static final String NEWDP_SUFFIX = "_n";//计算规则使用之后新数据点的结尾
    private static final Logger log = LoggerFactory.getLogger(IOTMqttReceiveService.class);
    @Autowired
    private CollectDataService collectDataService;
    
    @Autowired
    private PackedDataService packedDataService;
    @Autowired
    private WorkstationDatapointsService dpService;
    @Autowired
    private DeviceStatusCheckCfgLoader cfgLoader;
    /**
     * 处理收到的消息,对应TelemetryDataPostingConsumer
     * @param payload
     */
    public void handle(String payload) {
        //解析消息
        List<TelemetryData> teleList = parsePayload(payload);
        
        
        checkDeviceStatus(teleList);
        
        collectDataService.handleCollectData(teleList);
        
        packedDataService.handle(teleList);
    }
    
    /**
     * 检查设备状态,数据,如果是闪烁状态(运行,待机来回切换),规则是在数据表定义的
     * @param teleList
     */
    void checkDeviceStatus(List<TelemetryData> teleList){
        DeviceStateCheckConfigVO chkConfigVO = cfgLoader.getCheckConfigVOFromCache();
        //long interval = cfgLoader.getCheckConfigVOFromCache().getInterval();
        int wcs;
        
        for(TelemetryData tdata:teleList) {//tdata.getWorkstationId()
            for(TelemetryDataItem dataItem:tdata.getDataItems()) {
                List<Map<String,String>> points = dataItem.getDataPoints();
                for(Map<String,String> pt : points) {
                    if(pt.containsKey(IOTDBConstant.DEVICE_STATUS_KEY)){
                        String val = pt.get(IOTDBConstant.DEVICE_STATUS_KEY);
                        wcs = Integer.parseInt(val);
                        //!!!code范围检查
                        computeDeviceStatus(tdata.getWorkstationId(),wcs,dataItem.getTime(),pt,chkConfigVO);
                    }
                }
            }
        }
        //di.getTime()
        //tdata.getWorkstationId();
        
        
    }
    
    /**
     * 核实并计算状态值,如果有灯闪烁情况,则判定为待机
     * @param workstationId
     * @param wcs 状态值
     * @param pointTime 数据点的时间戳
     * @param point 数据点key和val
     * @param interval 检查的时间跨度,毫秒
     */
    void computeDeviceStatus(long workstationId,int wcs,long pointTime,Map<String,String> point,DeviceStateCheckConfigVO chkConfigVO) {
        
        long interval = chkConfigVO.getInterval();
        if(wcs != DefaultWcsEnum.RUNNING.getCode() || !chkConfigVO.containsWorkstation(workstationId)) {//仅针对运行状态的才进行状态核实
            return;
        }
        
        List<HistoryDeviceState> statusHistoryList = cfgLoader.getHistoryList(workstationId);// DEVICE_STTUS_MAP.get(workstationId);//何时初始化内部数据??
        
        //pointTime-interval 3秒内
        long count = statusHistoryList.stream().filter(his -> {
            return his.getTime()>(pointTime-interval) && his.getDeviceStatus() != DefaultWcsEnum.RUNNING.getCode();
        }).count();
        log.info("checkstatus count={}",count);
        if(count > 0) {
            //灯闪烁,判定为待机,这里修改设备状态的数据
            point.put(IOTDBConstant.DEVICE_STATUS_KEY, DefaultWcsEnum.STANDBY.getCode()+"");
        }
        HistoryDeviceState his = new HistoryDeviceState();
        his.setTime(pointTime);
        his.setDeviceStatus(wcs);
        his.setWorkstationId(workstationId);
        statusHistoryList.add(his);
        
    }
    /**
     * 解析消息体字符串未列表,每个列表项是一个工位的数据
     * @param payload
     * @return
     */
    List<TelemetryData> parsePayload(String payload){
        List<TelemetryData> dtList = new ArrayList<TelemetryData> ();
        //多条 数据格式:{"174":[{"values":{"output":11},"ts":"1722478128278"},{"values":{"spindleSpeed":22},"ts":"1722478128281"}]}   174是应用id
        //解析消息 name,value形式,如n=output,v=11
        JSONObject jsonObj = JSONObject.parseObject(payload);
        
        Set<String> keySet = jsonObj.keySet();
        String[] keys = keySet.toArray(new String[] {});
        
        
        for(String key : keys) {
            String appId = key;//iot系统中的应用id,本应用中应该用表去对应
            
            //获取工位数据点配置,只保存配置好的数据点,没有配置的采集数据抛弃。
            final WorkstationDatapointsVO dpVo = dpService.getDatapointsByAppIdFromCache(appId);
            
            if(dpVo == null) {
                //工位没有定义过数据点或者appId不匹配
                log.warn("appId={}未找到数据点定义记录,丢弃数据",appId);
                continue;
            }
            
            TelemetryData tdata = new TelemetryData();
            //tdata.setWorkstationId(workstationId);
            tdata.setAppId(appId);
            tdata.setWorkstationId(dpVo.getWorkstationId());
            
            JSONArray dtArr = jsonObj.getJSONArray(appId);
            for(int i=0;i<dtArr.size();i++) {
                
                TelemetryDataItem tdataItem = new TelemetryDataItem();
                JSONObject jsonDataItem = dtArr.getJSONObject(i);//每一项中dataItem还有values对象和ts时间戳2个属性
                
                long time = jsonDataItem.getLong("ts");
                tdataItem.setTime(time);
                JSONObject values = jsonDataItem.getJSONObject("values");
                
                Set<String> valueKeySet = values.keySet();
                valueKeySet.forEach(valueKey ->{
                    if(!dpVo.containsDataPoint(valueKey)) {
                        //如果不存在该数据点配置,该数据直接忽略
                        return;
                    }
                    
                    String oriValueKey = valueKey;//由于使用计算规则的采集点名称会后面增加一个"_n",所以这个oriValueKey代表没有增加"_n"的
                    if(StringUtils.endsWith(valueKey, NEWDP_SUFFIX)) {
                        oriValueKey = StringUtils.removeEnd(valueKey, NEWDP_SUFFIX);
                    }
                    
                    tdataItem.addPoint(oriValueKey,values.getString(valueKey));//使用原始配置点保持保存数据
                });
                
                tdata.addItem(tdataItem);
            }
            
            dtList.add(tdata);
        }
        
        return dtList;
    }
    
}