接收采集数据,增加了使用配置数据点限制,配置了的数据点才会保存。未配置则丢弃
已修改9个文件
已添加5个文件
456 ■■■■ 文件已修改
collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java 31 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/domain/TelemetryData.java 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/entity/mgr/WorkstationDatapoints.java 65 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/handler/AlarmDataHandler.java 43 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/mapper/mgr/WorkstationDatapointsMapper.java 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/runner/InitRunner.java 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java 38 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/service/PackedDataService.java 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/service/WorkstationDatapointsService.java 63 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/vo/WorkstationDatapointsVO.java 143 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/resources/application-dev.yml 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/test/java/com/qianwen/mdc/collect/service/IOTMqttReceiveServiceTest.java 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java
@@ -21,9 +21,11 @@
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import com.alibaba.fastjson.JSONObject;
import com.qianwen.mdc.collect.service.DeviceStateFixPointService;
import com.qianwen.mdc.collect.service.IOTMqttReceiveService;
import com.qianwen.mdc.collect.service.WorkstationAppMappingService;
import com.qianwen.mdc.collect.service.WorkstationDatapointsService;
import com.qianwen.mdc.collect.service.feedback.WorkstationFeedbackService;
import cn.hutool.core.date.DateTime;
@@ -40,19 +42,22 @@
    @Value("${mqtt.password:}")
    private String mqttPassword;
    @Autowired
    private IOTMqttReceiveService recService;
    @Autowired
    private DeviceStateFixPointService stateFixPointService;
    @Autowired
    private WorkstationFeedbackService workstationFeedbackService;
    @Autowired
    private WorkstationAppMappingService workstationAppMappingService;
    
    public static final String COLLECT_DATA_TOPIC = "forward/test";
    @Autowired
    private WorkstationDatapointsService dpService;
    /**
     * 接收数据的mqtt topic,在IOT平台配置的
     */
    @Value("${mqtt.dataReceiveTopic:}")
    public String COLLECT_DATA_TOPIC;
    
    /**
     * åé¦ˆåˆ›å»ºçš„topic(mdc中),本应用接收并处理
@@ -61,7 +66,10 @@
    
    public static final String WOCKSTATION_CREATE_TOPIC = "mdc/workstation-create";
    
    private final String WORKSTATION_APP_MAPPING_CHANGED_TOPIC = "mdc/workstation_app_mapping_changed";
    /**
     * 工位数据点变化
     */
    private final String WORKSTATION_DATAPOINT_CHANGED_TOPIC = "mdc/workstation_datapoint_changed";
    
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
@@ -93,7 +101,7 @@
        String clientId = "spring-boot-mqtt-client-inbound"+r.nextInt(1000);
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(clientId,
                        mqttClientFactory(), COLLECT_DATA_TOPIC, FEEDBACK_TOPIC,WOCKSTATION_CREATE_TOPIC,WORKSTATION_APP_MAPPING_CHANGED_TOPIC);//最后一个参数允许多个topic参数
                        mqttClientFactory(), COLLECT_DATA_TOPIC, FEEDBACK_TOPIC,WOCKSTATION_CREATE_TOPIC,WORKSTATION_DATAPOINT_CHANGED_TOPIC);//最后一个参数允许多个topic参数
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
@@ -123,10 +131,15 @@
                logger.info("工位创建接收消息={}",workstationId);
                stateFixPointService.deviceStateFixPoint(DateTime.now(), Arrays.asList(workstationId));
                recService.handle((String)message.getPayload());
            }else if(WORKSTATION_APP_MAPPING_CHANGED_TOPIC.equals(topic)) {
                String workstationId = (String)message.getPayload();
                logger.info("工位appId映射变化消息={}",workstationId);
            }else if(WORKSTATION_DATAPOINT_CHANGED_TOPIC.equals(topic)) {
                String payload = (String)message.getPayload();
                logger.info("工位appId映射变化消息={}",payload);
                workstationAppMappingService.saveToCache();
                //清除该工位的数据点缓存
                JSONObject payloadObj = JSONObject.parseObject(payload);
                //payloadObj.getLong("workstationId");
                dpService.datapointsCacheEvict(payloadObj.getString("appId"));
            } else {//订阅了几个topic就会接收到几个,其他的不会进来
                logger.warn("topic={},msg={},无对应的处理器",topic,message.getPayload());
            }
collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java
@@ -41,7 +41,7 @@
    public void testRec2() {
        //数据格式:{"174":[{"values":{"d1":12},"ts":"1721978780449"}]} 174是应用id
        //多条格式:{"174":[{"values":{"DeviceStatus":2},"ts":"1722478128278"},{"values":{"SpindleSpeed":22},"ts":"1722478128281"}]}
        String payload = "{\"174\":[{\"values\":{\"DeviceStatus_n\":2,\"Output\":38},\"ts\":\""+System.currentTimeMillis()+"\"}]}";
        String payload = "{\"182\":[{\"values\":{\"DeviceStatus_n\":2,\"Output\":38},\"ts\":\""+System.currentTimeMillis()+"\"}]}";
        //payload = "{\"174\":[{\"values\":{\"Output\":11},\"ts\":\"1722478128278\"},{\"values\":{\"SpindleSpeed\":22},\"ts\":\"1722478128281\"}]}";
        recService.handle(payload);
    }
collect/src/main/java/com/qianwen/mdc/collect/domain/TelemetryData.java
@@ -10,6 +10,10 @@
 */
public class TelemetryData {
    private long workstationId;
    /**
     * iotdb的appId
     */
    private String appId;
    private List<TelemetryDataItem> dataItems = new ArrayList<>();
    public void addItem(TelemetryDataItem item) {
@@ -35,9 +39,16 @@
    public void setDataItems(List<TelemetryDataItem> dataItems) {
        this.dataItems = dataItems;
    }
    public String getAppId() {
        return appId;
    }
    public void setAppId(String appId) {
        this.appId = appId;
    }
    @Override
    public String toString() {
        return "TelemetryData [workstationId=" + workstationId + ", dataItems=" + dataItems + "]";
        return "TelemetryData [workstationId=" + workstationId + ", appId=" + appId + ", dataItems=" + dataItems + "]";
    }
    
    
collect/src/main/java/com/qianwen/mdc/collect/entity/mgr/WorkstationDatapoints.java
对比新文件
@@ -0,0 +1,65 @@
package com.qianwen.mdc.collect.entity.mgr;
import com.baomidou.mybatisplus.annotation.TableName;
import com.qianwen.core.mp.base.BaseEntity;
import io.swagger.annotations.ApiModelProperty;
/**
 * å·¥ä½é‡‡é›†æ•°æ®ç‚¹
 */
@TableName("workstation_datapoints")
public class WorkstationDatapoints  extends BaseEntity {
    private static final long serialVersionUID = 1;
    @ApiModelProperty("模板类型")
    private Integer type;
    @ApiModelProperty("点位配置(json数组)")
    private String dpConfig;
    /**
     * å·¥ä½id
     */
    private long workstationId;
    /**
     * IOT平台appId
     */
    private String appId;
    public Integer getType() {
        return type;
    }
    public void setType(Integer type) {
        this.type = type;
    }
    public String getDpConfig() {
        return dpConfig;
    }
    public void setDpConfig(String dpConfig) {
        this.dpConfig = dpConfig;
    }
    public long getWorkstationId() {
        return workstationId;
    }
    public void setWorkstationId(long workstationId) {
        this.workstationId = workstationId;
    }
    public String getAppId() {
        return appId;
    }
    public void setAppId(String appId) {
        this.appId = appId;
    }
}
collect/src/main/java/com/qianwen/mdc/collect/handler/AlarmDataHandler.java
@@ -1,12 +1,16 @@
package com.qianwen.mdc.collect.handler;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.qianwen.mdc.collect.dto.PackedTelemetryData;
import com.qianwen.mdc.collect.entity.iotdb.Alarm;
import com.qianwen.mdc.collect.service.AlarmService;
@@ -20,7 +24,19 @@
    @Override
    public void handleData(PackedTelemetryData data) {
        
        Alarm alarm =  new Alarm();//Objects.requireNonNull(BeanUtil.copy(data, Alarm.class));
        alarmService.saveAlarms(parseAlarm(data));
        log.info("报警数据处理完成:告警数据{} ",data);
    }
    List<Alarm> parseAlarm(PackedTelemetryData data){
        List<Alarm> alarmList = new ArrayList<>();
        //828d json数组格式[{"alarmNo":"8084"}]
        JSONArray alarmArr = JSONArray.parseArray(data.getValue());
        for(int i=0;i<alarmArr.size();i++) {
            Alarm alarm = new Alarm();
        alarm.setTime(data.getTime());
        alarm.setWorkstationId(data.getWorkstationId());
        alarm.setCalendarCode(data.getCalendarCode());
@@ -30,29 +46,16 @@
        alarm.setFactoryYear(data.getFactoryYear());
        alarm.setShiftIndex(data.getShiftIndex());
        alarm.setShiftTimeType(data.getShiftTimeType());
        fileAlarmDetail(alarm,data);
        alarmService.saveAlarms(Arrays.asList(alarm));
        
        log.info("报警数据处理完成:告警数据{} ",alarm);
            JSONObject alarmJson = alarmArr.getJSONObject(i);
            alarm.setCode(alarmJson.getString("alarmNo"));
        
            alarm.setMessage("");//暂时没有,需要反查
            alarmList.add(alarm);
    }
    /**
     * 填充告警的详细信息,级别,信息,代码
     * @param alarm
     */
    void fileAlarmDetail(Alarm alarm,PackedTelemetryData data){
        alarm.setCode(this.parseCode(data.getValue()));
        alarm.setMessage(data.getValue());
        return alarmList;
    }
    
    
    /**
     * 解析报警代码
     * @param collectText
     * @return
     */
    String parseCode(String collectText) {
        return "0000";
    }
}
collect/src/main/java/com/qianwen/mdc/collect/mapper/mgr/WorkstationDatapointsMapper.java
对比新文件
@@ -0,0 +1,11 @@
package com.qianwen.mdc.collect.mapper.mgr;
import com.qianwen.core.mp.mapper.BladeMapper;
import com.qianwen.mdc.collect.entity.mgr.WorkstationDatapoints;
/**
 * Mapper for {@link WorkstationDatapoints} (workstation data point) records.
 * Declares no methods of its own; all operations are inherited from {@link BladeMapper}.
 */
public interface WorkstationDatapointsMapper extends BladeMapper<WorkstationDatapoints> {
}
collect/src/main/java/com/qianwen/mdc/collect/runner/InitRunner.java
@@ -30,13 +30,13 @@
    private TimeSliceCache timeSliceCache;
    @Autowired
    private DeviceStateMapper deviceStateMapper;
    @Autowired
    private WorkstationAppMappingService mappingService;;
    //@Autowired
    //private WorkstationAppMappingService mappingService;;
    
    @Override
    public void run(ApplicationArguments args) throws Exception {
        
        mappingService.saveToCache();
        //mappingService.saveToCache();
        
        //生成时间切片
        CacheBuildDTO cacheBuildDTO = CacheBuildDTO.builder().tenantIds(Sets.newHashSet(new String[]{"000000"})).targetDate(LocalDate.now()).build();
@@ -48,7 +48,7 @@
    //@RedisLock("posting:lock:initStateFixPoint")
    public void checkNeedStateFixPoint() {
        DateTime dateTime = DateTime.now();
        log.info("进入程序启动校验是否存在工位打过固定点....... ");
        log.info("程序启动校验是否存在工位打过固定点....... ");
       
        Long count = deviceStateMapper.fixPointCountByDate(Integer.valueOf(DatePattern.PURE_DATE_FORMAT.format(dateTime)));
        /*
collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
@@ -25,6 +25,7 @@
import com.qianwen.mdc.collect.domain.TelemetryData;
import com.qianwen.mdc.collect.domain.TelemetryDataItem;
import com.qianwen.mdc.collect.mqtt.MqttMessageSender;
import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO;
/**
 * 采集数据处理入库
@@ -33,7 +34,6 @@
public class CollectDataService {
    private static final Logger log = LoggerFactory.getLogger(CollectDataService.class);
    
    //private String DB_PREFIX = "root.f2.";
    private static final Map<Integer, String> PROCESS_PARAM_MAP = new HashMap<>();
    @Autowired
    private IotDBSessionConfig iotdbConfig;
@@ -41,6 +41,7 @@
    private IotDBCommonService iotDBCommonService;
    @Autowired
    private MqttMessageSender mqttMessageSender;
    
    /**
     * 实时数据topic,要与mdc里面的相同
@@ -63,6 +64,7 @@
    public void handleCollectData(List<TelemetryData> telemetryDataList) {
        for (TelemetryData dt : telemetryDataList) {
            handleOneWorkstation(dt);
            
            sendRealtimeDataMsg(dt);
@@ -113,14 +115,14 @@
     * @param dt
     */
    void handleOneWorkstation(TelemetryData dt) {
        String deviceId;// = DB_PREFIX+TEMPLATE_NAME + "_" + dt.getWorkstationId();
        String deviceId;
        //long workstationId = dpVo.getWorkstationId();
        List<MeasurementSchema> schemas = new ArrayList<>();
        schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64));
        schemas.add(new MeasurementSchema("n", TSDataType.TEXT));
        schemas.add(new MeasurementSchema("v", TSDataType.TEXT));
        
        int rowIndex = 0;
        
@@ -137,6 +139,8 @@
            iotDBCommonService.setTemmplateIfNotSet(TEMPLATE_NAME, deviceId);
            Tablet tablet = new Tablet(deviceId, schemas);
            for(TypedTelemetryData tdata : typeList) {
                rowIndex = tablet.rowSize++;
                tablet.addTimestamp(rowIndex, tdata.getTime());
                tablet.addValue("workstation_id",rowIndex,dt.getWorkstationId());
@@ -144,18 +148,16 @@
                if(!tdata.getName().equals("Alarm")) {
                    tablet.addValue("v",rowIndex,tdata.getValue());
                }else {
                    //告警信息根据原版需要处理一下,格式未json对象:{"timestamp":1718839644476,"code":1000,"msg":"EMERGENCY STOP","alarmtype":15,"level":""}
                    //告警信息根据原版需要处理一下,格式为json对象:{"timestamp":1718839644476,"code":1000,"msg":"EMERGENCY STOP","alarmtype":15,"level":""}
                    tablet.addValue("v",rowIndex,formatAlarmMsg(tdata.getTime(),tdata.getValue()));
                }
            }
            
            try {
                iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
                //updateLastParam(dt.getWorkstationId(),typeList);
            } catch (Exception e) {
                log.error("IOTDB入库失败",e);
                e.printStackTrace();
            }finally {
                //iotdbConfig.getSessionPool().clo1se();
            }
collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java
@@ -2,7 +2,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
@@ -16,6 +15,7 @@
import com.qianwen.mdc.collect.domain.TelemetryData;
import com.qianwen.mdc.collect.domain.TelemetryDataItem;
import com.qianwen.mdc.collect.utils.redis.RedisUtil;
import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO;
@Service
public class IOTMqttReceiveService {
@@ -27,12 +27,13 @@
    private PackedDataService packedDataService;
    @Autowired
    private  RedisUtil redisUtil;
    @Autowired
    private WorkstationDatapointsService dpService;
    /**
     * 处理收到的消息,对应TelemetryDataPostingConsumer
     * @param payload
     */
    public void handle(String payload) {
        //System.out.println("Received message122: " + payload);
        //解析消息
        List<TelemetryData> teleList = parsePayload(payload);
        
@@ -48,22 +49,32 @@
     */
    List<TelemetryData> parsePayload(String payload){
        List<TelemetryData> dtList = new ArrayList<TelemetryData> ();
        //数据格式:{"174":[{"values":{"d1":12},"ts":"1721978780449"}]} 174是应用id
        //多条格式:{"174":[{"values":{"output":11},"ts":"1722478128278"},{"values":{"spindleSpeed":22},"ts":"1722478128281"}]}
        //多条 数据格式:{"174":[{"values":{"output":11},"ts":"1722478128278"},{"values":{"spindleSpeed":22},"ts":"1722478128281"}]}   174是应用id
        //解析消息 name,value形式,如n=output,v=11
        JSONObject jsonObj = JSONObject.parseObject(payload);
        
        Set<String> keySet = jsonObj.keySet();
        String[] keys = keySet.toArray(new String[] {});
        
        //WorkstationDatapointsVO dpVo;
        final String NEWDP_SUFFIX = "_n";//计算规则使用之后新数据点的结尾
        for(String key : keys) {
            String appId = key;//iot系统中的应用id,本应用中应该用表去对应
            long workstationId = getWorkstationIdByAppId(appId);
            //TODO 获取工位数据点配置,只保存配置好的数据点,没有配置的采集数据抛弃。
            final WorkstationDatapointsVO dpVo = dpService.getDatapointsByAppIdFromCache(appId);
            if(dpVo == null) {
                //工位没有定义过数据点或者appId不匹配
                log.warn("appId={}未找到数据点定义记录,丢弃数据",appId);
                continue;
            }
            
            TelemetryData tdata = new TelemetryData();
            tdata.setWorkstationId(workstationId);
            //tdata.setWorkstationId(workstationId);
            tdata.setAppId(appId);
            tdata.setWorkstationId(dpVo.getWorkstationId());
            
            JSONArray dtArr = jsonObj.getJSONArray(appId);
            for(int i=0;i<dtArr.size();i++) {
@@ -80,6 +91,10 @@
                    String oriValueKey = valueKey;;//由于使用计算规则的采集点名称会后面增加一个"_n",所以这个oriValueKey代表没有增加"_n"的
                    if(StringUtils.endsWith(valueKey, NEWDP_SUFFIX)) {
                        oriValueKey = StringUtils.removeEnd(valueKey, NEWDP_SUFFIX);
                    }
                    if(!dpVo.containsDataPoint(oriValueKey)) {
                        //如果不存在该数据点配置,该数据直接忽略
                        return;
                    }
                    tdataItem.addPoint(oriValueKey,values.getString(valueKey));//使用原始配置点保持保存数据
                });
@@ -98,11 +113,16 @@
     * @param appId
     * @return
     */
    public long getWorkstationIdByAppId(String appId) {
    /*
    public Long getWorkstationIdByAppId(String appId) {
        Object wid = redisUtil.hget("workstation-appid-map", appId);
        String workstationId = String.valueOf(Optional.ofNullable(wid).orElse(appId));
    
        String workstationId = String.valueOf(Optional.ofNullable(wid).orElse(StringUtils.EMPTY));
        if(ObjectUtil.isEmpty(workstationId)) {
            return null;
        }
        return Long.parseLong(workstationId);
    }
    */
}
collect/src/main/java/com/qianwen/mdc/collect/service/PackedDataService.java
@@ -47,6 +47,8 @@
    private WorkstationCache workstationCache;
    @Autowired
    private TimeSliceCache timeSliceCache;
    @Autowired
    private WorkstationDatapointsService dpService;
    static {
        PROCESS_PARAM_MAP.put(1, "STATE");
@@ -91,12 +93,13 @@
                    String[] keys = map.keySet().toArray(new String[0]);
                    for (int j = 0; j < keys.length; j++) {
                        //TODO: 这里,原系统进行了过滤。WorkstationCollectDataServiceImpl.handlerWorkstationCollectData里
                        PackedTelemetryData pkData = new PackedTelemetryData();
                        pkData.setWorkstationId(tdata.getWorkstationId());
                        pkData.setValue(map.get(keys[j]));
                        pkData.setTime(item.getTime());
                        pkData.setName(keys[j]);//参数名称
                        pkData.setName(keys[j]);//数据点名称
                                
                        fillByCalendar(pkData);
                        
collect/src/main/java/com/qianwen/mdc/collect/service/WorkstationDatapointsService.java
对比新文件
@@ -0,0 +1,63 @@
package com.qianwen.mdc.collect.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.qianwen.core.mp.base.BaseServiceImpl;
import com.qianwen.mdc.collect.entity.mgr.WorkstationDatapoints;
import com.qianwen.mdc.collect.mapper.mgr.WorkstationDatapointsMapper;
import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO;
/**
 * å·¥ä½æ•°æ®ç‚¹æœåŠ¡ï¼Œä¸»è¦ç”¨äºŽèŽ·å–æ•°æ®ç‚¹å®šä¹‰ã€‚å®šä¹‰åœ¨smartman应用
 */
@Service
public class WorkstationDatapointsService extends BaseServiceImpl<WorkstationDatapointsMapper, WorkstationDatapoints> {
    private Logger log = LoggerFactory.getLogger(this.getClass());
    /**
     * èŽ·å–å·¥ä½æ•°æ®ç‚¹é…ç½® ,从缓存
     * @param workstationId
     * @return
     */
    @Cacheable(value = "collect:datapoint" ,key = "#appId")
    public WorkstationDatapointsVO getDatapointsByAppIdFromCache(String appId) {
        return this.getDataPointByAppId(appId);
    }
    /*
    @CachePut(value = "collect:datapoint" ,key = "#appId")
    public WorkstationDatapointsVO datapointsCachePut(String appId) {
        return this.getDataPointByAppId(appId);
    }
    */
    @CacheEvict(value = "collect:datapoint" ,key = "#appId")
    public void datapointsCacheEvict(String appId) {
    }
    /**
     * ä»Žæ•°æ®åº“查询数据点定义
     * @param workstationId å·¥ä½id
     * @return
     */
    WorkstationDatapointsVO getDataPointByAppId(String appId) {
        log.info("appid={}", appId);
        WorkstationDatapoints dp = baseMapper.selectOne(Wrappers.<WorkstationDatapoints>lambdaQuery()
                .eq(WorkstationDatapoints::getAppId, appId));
        log.info("dp={}", dp);
        WorkstationDatapointsVO dpVO = null;
        if (dp != null) {
            dpVO = new WorkstationDatapointsVO(dp.getWorkstationId(), dp.getAppId(), dp.getDpConfig());
        }
        return dpVO;
    }
}
collect/src/main/java/com/qianwen/mdc/collect/vo/WorkstationDatapointsVO.java
对比新文件
@@ -0,0 +1,143 @@
package com.qianwen.mdc.collect.vo;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.json.JSONUtil;
import io.swagger.annotations.ApiModelProperty;
/**
 * å·¥ä½é‡‡é›†æ•°æ®ç‚¹VO
 */
public class WorkstationDatapointsVO implements Serializable{
    /**
     * åºåˆ—化,应为需要spring缓存
     */
    private static final long serialVersionUID = 6558493027948435061L;
    @ApiModelProperty("点位表头(json数组)")
    private String dpHead;
    @ApiModelProperty("点位配置(json数组)")
    private String dpConfig;
    /**
     * å·¥ä½id
     */
    private long workstationId;
    /**
     * IOT平台appId
     */
    private String appId;
    //private List<DataPoint> points = null;
    private List<String> points = new ArrayList<>();
    public WorkstationDatapointsVO(long workstationId, String appId,String dpConfig) {
        super();
        this.dpConfig = dpConfig;
        this.workstationId = workstationId;
        this.appId = appId;
        initPoints();
    }
    void initPoints() {
        if(ObjectUtil.isEmpty(dpConfig)) {
            return;
        }
        JSONArray ptArr = JSONArray.parseArray(dpConfig);
        points = new ArrayList<>();
        JSONObject ptObj;
        for(int i=0;i<ptArr.size();i++) {
            ptObj = ptArr.getJSONObject(i);
            //DataPoint dp = new DataPoint();
            //dp.setDpName(ptObj.getString("dpName"));
            points.add(ptObj.getString("dpName"));
        }
    }
    public String getDpConfig() {
        return dpConfig;
    }
    public void setDpConfig(String dpConfig) {
        this.dpConfig = dpConfig;
    }
    public long getWorkstationId() {
        return workstationId;
    }
    public void setWorkstationId(long workstationId) {
        this.workstationId = workstationId;
    }
    public String getDpHead() {
        return dpHead;
    }
    public void setDpHead(String dpHead) {
        this.dpHead = dpHead;
    }
    public String getAppId() {
        return appId;
    }
    public void setAppId(String appId) {
        this.appId = appId;
    }
    /**
     * åˆ¤æ–­é‡‡é›†å˜é‡åç§°æ˜¯å¦åœ¨æ•°æ®ç‚¹é…ç½®ç§
     * @param dpName
     * @return
     */
    public boolean containsDataPoint(String dpName) {
        if(ObjectUtil.isEmpty(points)) {
            return false;
        }
        return points.contains(dpName);
        /*
        for(String dpn : points) {
            if(StringUtils.equals(dpn, dpName)) {
                return true;
            }
        }
        return false;
        */
    }
}
/*
class DataPoint{
    private String dpName;
    public String getDpName() {
        return dpName;
    }
    public void setDpName(String dpName) {
        this.dpName = dpName;
    }
}
*/
collect/src/main/resources/application-dev.yml
@@ -23,7 +23,7 @@
  #心跳时间
  keepalive: 10
  connectionTimeout: 3000 #连接超时时间
  dataReceiveTopic: forward/test #从iot平台接收mqtt采集数据的topic
  # mysql
datasource:
  type: mysql
@@ -35,7 +35,7 @@
#iotdb 以及其jdbc一起配置
iotdb:
  driver: org.apache.iotdb.jdbc.IoTDBDriver
  host: 120.46.212.231
  host: localhost #120.46.212.231
  port: 6667
  maxSize: 100
  username: root
collect/src/test/java/com/qianwen/mdc/collect/service/IOTMqttReceiveServiceTest.java
对比新文件
@@ -0,0 +1,14 @@
package com.qianwen.mdc.collect.service;
import static org.junit.Assert.assertNull;
import org.junit.jupiter.api.Test;
/**
 * Test class for {@link IOTMqttReceiveService}; currently only a placeholder.
 */
public class IOTMqttReceiveServiceTest {
    // Placeholder test: the body holds no assertions yet, so it always passes.
    @Test
    public void t() {
        //System.out.println(a == null);
    }
}