yangys
2025-04-13 03fc7dc5d536f08e711f7ddb79f720c2e1ded16b
增加状态核实;增加模板是否set的缓存优化
已修改12个文件
已添加8个文件
570 ■■■■■ 文件已修改
collect/pom.xml 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/constants/IOTDBConstant.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/controller/InnerCacheTestController.java 64 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/entity/mgr/DeviceStatusCheckCfg.java 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/mapper/iotdb/DeviceStateMapper.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/mapper/iotdb/ProcessParamMapper.java 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/mapper/mgr/DeviceStatusCheckCfgMapaper.java 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStatusCheckCfgLoader.java 173 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java 74 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/service/IotDBCommonService.java 34 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/service/ProcessParamService.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/utils/FixSizeLinkedList.java 33 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/vo/DeviceStateCheckConfigVO.java 60 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/vo/HistoryDeviceState.java 36 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/resources/application-dev.yml 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/DeviceStateMapper.xml 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/ProcessParamMapper.xml 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/resources/logback.xml 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/test/java/com/qianwen/mdc/collect/utils/FixSizeLinkedListTest.java 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/pom.xml
@@ -164,7 +164,10 @@
            <artifactId>xxl-job-core</artifactId>
            <version>2.1.2</version>
        </dependency>
        <dependency>
            <groupId>com.github.ben-manes.caffeine</groupId>
            <artifactId>caffeine</artifactId>
        </dependency>
        <dependency>
           <groupId>org.jmockit</groupId>
collect/src/main/java/com/qianwen/mdc/collect/constants/IOTDBConstant.java
@@ -46,4 +46,9 @@
     * é›¶ä»¶å·æ¨¡æ¿åç§° state
     */
    public static final String TEMPLATE_PROG_NAME = "prog_name";
    /**
     * Name of the device-status data point: DeviceStatus
     */
    public static final String DEVICE_STATUS_KEY = "DeviceStatus";
}
collect/src/main/java/com/qianwen/mdc/collect/controller/InnerCacheTestController.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,64 @@
package com.qianwen.mdc.collect.controller;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.qianwen.mdc.collect.service.DeviceStatusCheckCfgLoader;
import com.qianwen.mdc.collect.vo.HistoryDeviceState;
@RestController("innercache")
@RequestMapping("innercache")
public class InnerCacheTestController {
    Cache<String, String> cache;
    @Autowired
    private DeviceStatusCheckCfgLoader cfgLoader;
    @GetMapping("/get")
    public Object gett() {
        return cache.getIfPresent("key1");
    }
    @GetMapping("/set")
    public Object sett() {
        cache = Caffeine.newBuilder()
                // æœ€å¤§100,超过则放弃老的
                .maximumSize(100).expireAfterWrite(20, TimeUnit.SECONDS)
                // 10 åˆ†é’Ÿè¿‡æœŸ
                //.expireAfterAccess(20, TimeUnit.SECONDS)
                .build();
        // å‘缓存中放入数据
        cache.put("key1", "value1");
        return cache.getIfPresent("key1");
    }
    /**
     *
     * @param wid å·¥ä½id
     * @return
     */
    @GetMapping("/getc")
    public Object getc(long wid) {
        return cfgLoader.getHistoryList(wid);
    }
    @GetMapping("/setc")
    public Object setc(long wid) {
        List<HistoryDeviceState> statusHistoryList = cfgLoader.getHistoryList(wid);
        HistoryDeviceState his = new HistoryDeviceState();
        his.setTime(999);
        his.setDeviceStatus(1000);
        his.setWorkstationId(wid);
        statusHistoryList.add(his);
        return cfgLoader.getHistoryList(wid);
    }
}
collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java
@@ -70,7 +70,7 @@
    }
    @GetMapping("/tpluse")
    public Object tpluse(String tplname,String path) throws StatementExecutionException, IoTDBConnectionException {
        return iotService.isTemplateSetOnPath(tplname, path);
        return "";//iotService.isTemplateSetOnXXPath(tplname, path);
        
    }
    
collect/src/main/java/com/qianwen/mdc/collect/entity/mgr/DeviceStatusCheckCfg.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,29 @@
package com.qianwen.mdc.collect.entity.mgr;
import com.baomidou.mybatisplus.annotation.TableName;
import com.qianwen.core.tenant.mp.TenantEntity;
@TableName("device_status_check_cfg")
public class DeviceStatusCheckCfg  extends TenantEntity {
    /**
     *
     */
    private static final long serialVersionUID = 1L;
    private String checkConfig;
    public String getCheckConfig() {
        return checkConfig;
    }
    public void setCheckConfig(String checkConfig) {
        this.checkConfig = checkConfig;
    }
    public static long getSerialversionuid() {
        return serialVersionUID;
    }
    @Override
    public String toString() {
        return "DeviceStatusCheckCfg [checkConfig=" + checkConfig + "]";
    }
}
collect/src/main/java/com/qianwen/mdc/collect/mapper/iotdb/DeviceStateMapper.java
@@ -6,6 +6,7 @@
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.qianwen.mdc.collect.dto.StateAggregateTimeDTO;
import com.qianwen.mdc.collect.entity.iotdb.DeviceState;
import com.qianwen.mdc.collect.vo.HistoryDeviceState;
import java.time.LocalDateTime;
import java.util.Date;
collect/src/main/java/com/qianwen/mdc/collect/mapper/iotdb/ProcessParamMapper.java
@@ -1,5 +1,7 @@
package com.qianwen.mdc.collect.mapper.iotdb;
import java.util.List;
import org.apache.ibatis.annotations.Param;
import com.baomidou.dynamic.datasource.annotation.DS;
@@ -16,4 +18,5 @@
     */
    ProcessParam lastParamByWorstationId(@Param("workstationId") Long workstationId);
    /**
     * Returns the most recent rows of one data point of a workstation.
     * According to the mapper XML the rows are ordered by time descending
     * (newest first) and limited to {@code count}.
     *
     * @param item name of the data point; forms the last segment of the queried series name
     * @param workstationId id of the workstation; also part of the queried series name
     * @param count maximum number of rows to return
     * @return the matching rows, newest first
     */
    List<ProcessParam> lastParamListByWorstationId(@Param("item")String item,@Param("workstationId") Long workstationId,@Param("count") int count);
}
collect/src/main/java/com/qianwen/mdc/collect/mapper/mgr/DeviceStatusCheckCfgMapaper.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,12 @@
package com.qianwen.mdc.collect.mapper.mgr;
import org.apache.ibatis.annotations.Param;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.qianwen.mdc.collect.entity.mgr.DeviceStatusCheckCfg;
import com.qianwen.mdc.collect.entity.mgr.PartWorkingHour;
/**
 * MyBatis-Plus mapper for {@link DeviceStatusCheckCfg}. It declares no methods of
 * its own; everything available comes from {@link BaseMapper}.
 */
public interface DeviceStatusCheckCfgMapaper extends BaseMapper<DeviceStatusCheckCfg> {
}
collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStatusCheckCfgLoader.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,173 @@
package com.qianwen.mdc.collect.service;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.qianwen.mdc.collect.cache.WorkstationCache;
import com.qianwen.mdc.collect.constants.IOTDBConstant;
import com.qianwen.mdc.collect.dto.WorkstationDTO;
import com.qianwen.mdc.collect.entity.iotdb.ProcessParam;
import com.qianwen.mdc.collect.entity.mgr.DeviceStatusCheckCfg;
import com.qianwen.mdc.collect.mapper.iotdb.ProcessParamMapper;
import com.qianwen.mdc.collect.mapper.mgr.DeviceStatusCheckCfgMapaper;
import com.qianwen.mdc.collect.utils.FixSizeLinkedList;
import com.qianwen.mdc.collect.vo.DeviceStateCheckConfigVO;
import com.qianwen.mdc.collect.vo.HistoryDeviceState;
/**
 * è®¾å¤‡çŠ¶æ€æ ¸å®žé…ç½®åŠ è½½å™¨
 */
@Service
public class DeviceStatusCheckCfgLoader {
    private static final Logger logger = LoggerFactory.getLogger(DeviceStatusCheckCfgLoader.class);
    @Autowired
    private DeviceStatusCheckCfgMapaper cfgMapper;
    @Autowired
    private WorkstationCache workstationCache;
    @Autowired
    private ProcessParamMapper processParamMapper;
    //public static Map<Long,List<HistoryDeviceStatus>> DEVICE_STTUS_MAP = new HashMap<>();
    public static final String CFG_KEY = "check_config";
    public static final String CFG_VO_KEY = "check_config_vo";
    public static final String EMPTY_CONFIG = "{\"workstationCodes\":\"\",\"interval\":3000}";
    /**
     * çŠ¶æ€åŽ†å²åˆ—è¡¨æœ€å¤§è®°å½•æ•°
     */
    public static final int HISTORY_SIZE = 20;
    /**
     * ç¼“存秒数
     */
    public static final int CACHE_DURATION_SEC = 120;//2mins
    /**
     * é…ç½®çš„缓存
     */
    Cache<String, String> cfgCache;
    /**
     * é…ç½®VO的缓存
     */
    Cache<String, DeviceStateCheckConfigVO> cfgVOCache;
    /**
     * çŠ¶æ€åŽ†å²ç¼“å­˜
     */
    Cache<Long,FixSizeLinkedList<HistoryDeviceState>> historyCache;
    public DeviceStatusCheckCfgLoader(){
        cfgCache = Caffeine.newBuilder()
                // æœ€å¤§size,超过则放弃老的
                .maximumSize(1).expireAfterWrite(CACHE_DURATION_SEC, TimeUnit.SECONDS)
                .build();
        cfgVOCache = Caffeine.newBuilder()
                .maximumSize(1).expireAfterWrite(CACHE_DURATION_SEC, TimeUnit.SECONDS)
                .build();
        historyCache = Caffeine.newBuilder()
                .maximumSize(50000)//最多50000条数据在内(50000个workstationId)
                .expireAfterWrite(CACHE_DURATION_SEC, TimeUnit.SECONDS)
                .build();
    }
    //主方法
    public FixSizeLinkedList<HistoryDeviceState> getHistoryList(long workstationId) {
        FixSizeLinkedList<HistoryDeviceState> hisList = historyCache.getIfPresent(workstationId);
        if(hisList == null) {
            //可能缓存过期了,也可能根本没有!!!
            //String checkCfg = loadCfgFromCache();
            //String checkCfg = getCfgFromDb();
            //loadAllHistory(checkCfg);
            loadHistoryByWorkstation(workstationId);
            hisList = historyCache.getIfPresent(workstationId);
            //if(hisList == null) {
                //如果还是null,则说明在数据库中没有配置这个设备。为了避免重复从数据库加载配置,放入一个空list
            //    historyCache.put(workstationId, new FixSizeLinkedList<HistoryDeviceState>(HISTORY_SIZE));
            //}
        }
        return hisList;
    }
    void loadHistoryByWorkstation(long workstationId) {
        historyCache.put(workstationId, queryStatusHistory(workstationId));
    }
    FixSizeLinkedList<HistoryDeviceState> queryStatusHistory(long workstationId){
        FixSizeLinkedList<HistoryDeviceState> list = new FixSizeLinkedList<>(HISTORY_SIZE);
        List <ProcessParam> stateList = processParamMapper.lastParamListByWorstationId(IOTDBConstant.DEVICE_STATUS_KEY,workstationId,HISTORY_SIZE);
        ProcessParam state;
        for(int i=stateList.size()-1;i>-1;i--) {
            state = stateList.get(i);
            HistoryDeviceState his = new HistoryDeviceState();
            his.setTime(state.getTime());
            his.setDeviceStatus(Integer.parseInt(state.getV()));
            his.setWorkstationId(workstationId);
            list.add(his);
        }
        return list;
    }
    public DeviceStateCheckConfigVO getCheckConfigVOFromCache() {
        DeviceStateCheckConfigVO vo = cfgVOCache.getIfPresent(CFG_VO_KEY);
        if(vo == null) {
            logger.info("configvo expired reload");
            vo = new DeviceStateCheckConfigVO();
            String checkConfig = loadCfgFromCache();
            JSONObject cfgJson = JSONObject.parseObject(checkConfig);
            vo.setWorkstationCodes(cfgJson.getString("workstationCodes"));
            vo.setInterval(cfgJson.getLongValue("interval"));
            Map<String,WorkstationDTO> wsMap = workstationCache.getWorkstations();
            vo.init(wsMap);
            cfgVOCache.put(CFG_VO_KEY, vo);
        }
        logger.info("checkconfig load vo:{}",vo);
        return vo;
    }
    /**
     * ä»Žç¼“存加载配置
     * @return
     */
    public String loadCfgFromCache() {
        String checkConfig = cfgCache.getIfPresent(CFG_KEY);
        if(checkConfig == null) {
            checkConfig = getCfgFromDb();
        }
        if(checkConfig == null) {
            checkConfig = EMPTY_CONFIG;
        }
        return checkConfig;
    }
    String getCfgFromDb() {
        List<DeviceStatusCheckCfg> cfgs = cfgMapper.selectList(Wrappers.emptyWrapper());
        logger.info("checkconfig load from db:{}",cfgs);
        if(!cfgs.isEmpty()) {
            return cfgs.get(0).getCheckConfig();
        }else {
            return null;
        }
    }
}
collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java
@@ -2,6 +2,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
@@ -12,9 +13,13 @@
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.qianwen.mdc.collect.constants.IOTDBConstant;
import com.qianwen.mdc.collect.domain.TelemetryData;
import com.qianwen.mdc.collect.domain.TelemetryDataItem;
import com.qianwen.mdc.collect.enums.DefaultWcsEnum;
import com.qianwen.mdc.collect.utils.redis.RedisUtil;
import com.qianwen.mdc.collect.vo.DeviceStateCheckConfigVO;
import com.qianwen.mdc.collect.vo.HistoryDeviceState;
import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO;
@Service
@@ -28,7 +33,8 @@
    private PackedDataService packedDataService;
    @Autowired
    private WorkstationDatapointsService dpService;
    @Autowired
    private DeviceStatusCheckCfgLoader cfgLoader;
    /**
     * Handles a received message; corresponds to TelemetryDataPostingConsumer
     * @param payload
@@ -37,11 +43,75 @@
        //解析消息
        List<TelemetryData> teleList = parsePayload(payload);
        
        checkDeviceStatus(teleList);
        collectDataService.handleCollectData(teleList);
        
        packedDataService.handle(teleList);
    }
    
    /**
     * æ£€æŸ¥è®¾å¤‡çŠ¶æ€ï¼Œæ•°æ®ï¼Œå¦‚æžœæ˜¯é—ªçƒçŠ¶æ€(运行,待机来回切换),规则是在数据表定义的
     * @param teleList
     */
    void checkDeviceStatus(List<TelemetryData> teleList){
        DeviceStateCheckConfigVO chkConfigVO = cfgLoader.getCheckConfigVOFromCache();
        long interval = cfgLoader.getCheckConfigVOFromCache().getInterval();
        int wcs;
        for(TelemetryData tdata:teleList) {//tdata.getWorkstationId()
            for(TelemetryDataItem dataItem:tdata.getDataItems()) {
                List<Map<String,String>> points = dataItem.getDataPoints();
                for(Map<String,String> pt : points) {
                    if(pt.containsKey(IOTDBConstant.DEVICE_STATUS_KEY)){
                        String val = pt.get(IOTDBConstant.DEVICE_STATUS_KEY);
                        wcs = Integer.parseInt(val);
                        //!!!code范围检查
                        computeDeviceStatus(tdata.getWorkstationId(),wcs,dataItem.getTime(),pt,chkConfigVO);
                    }
                }
            }
        }
        //di.getTime()
        //tdata.getWorkstationId();
    }
    /**
     * æ ¸å®žå¹¶è®¡ç®—状态值,如果有灯闪烁情况,则判定为待机
     * @param workstationId
     * @param wcs çŠ¶æ€å€¼
     * @param pointTime æ•°æ®ç‚¹çš„æ—¶é—´æˆ³
     * @param point æ•°æ®ç‚¹key和val
     * @param interval æ£€æŸ¥çš„æ—¶é—´è·¨åº¦ï¼Œæ¯«ç§’
     */
    void computeDeviceStatus(long workstationId,int wcs,long pointTime,Map<String,String> point,DeviceStateCheckConfigVO chkConfigVO) {
        long interval = chkConfigVO.getInterval();
        if(wcs != DefaultWcsEnum.RUNNING.getCode() || !chkConfigVO.containsWorkstation(workstationId)) {//仅针对运行状态的才进行状态核实
            return;
        }
        List<HistoryDeviceState> statusHistoryList = cfgLoader.getHistoryList(workstationId);// DEVICE_STTUS_MAP.get(workstationId);//何时初始化内部数据??
        //pointTime-interval 3秒内
        long count = statusHistoryList.stream().filter(his -> {
            return his.getTime()>(pointTime-interval) && his.getDeviceStatus() != DefaultWcsEnum.RUNNING.getCode();
        }).count();
        log.info("checkstatus count={}",count);
        if(count > 0) {
            //灯闪烁,判定为待机,这里修改设备状态的数据
            point.put(IOTDBConstant.DEVICE_STATUS_KEY, DefaultWcsEnum.STANDBY.getCode()+"");
        }
        HistoryDeviceState his = new HistoryDeviceState();
        his.setTime(pointTime);
        his.setDeviceStatus(wcs);
        his.setWorkstationId(workstationId);
        statusHistoryList.add(his);
    }
    /**
     * Parses the message body string into a list; each list item holds the data of one workstation
     * @param payload
@@ -62,7 +132,7 @@
            
            //获取工位数据点配置,只保存配置好的数据点,没有配置的采集数据抛弃。
            final WorkstationDatapointsVO dpVo = dpService.getDatapointsByAppIdFromCache(appId);
            //final WorkstationDatapointsVO dpVo = dpService.getDataPointByAppId(appId);
            if(dpVo == null) {
                //工位没有定义过数据点或者appId不匹配
                log.warn("appId={}未找到数据点定义记录,丢弃数据",appId);
collect/src/main/java/com/qianwen/mdc/collect/service/IotDBCommonService.java
@@ -1,6 +1,7 @@
package com.qianwen.mdc.collect.service;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
@@ -9,6 +10,8 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.qianwen.mdc.collect.config.IotDBSessionConfig;
@@ -23,12 +26,27 @@
    private IotDBSessionConfig iotdbCfg;
    /**
     * Cache lifetime in seconds
     */
    public static final int CACHE_DURATION_SEC = 300;//5mins
    /**
     * Cache recording whether the template has already been set
     */
    Cache<String, Boolean> templateSetCache;
    public IotDBCommonService() {
        templateSetCache = Caffeine.newBuilder()
         // æœ€å¤§size,超过则放弃老的,大概每个数据点都应该有一个缓存,外加一些其他类型的表(入state_xxx)
         .maximumSize(5000).expireAfterWrite(CACHE_DURATION_SEC, TimeUnit.SECONDS)
         .build();
    }
    /**
     * Checks whether the template is mounted on the given path
     * @param template template name
     * @param path device path
     * @return
     */
    public boolean isTemplateSetOnPath(String template,String path) {
    private boolean isTemplateSetOnPath(String template,String path) {
        List<String> pathlist;
        try {
            pathlist = iotdbCfg.getSessionPool().showPathsTemplateSetOn(template);
@@ -42,14 +60,28 @@
    }
    
    /**
     * æ£€æŸ¥åœ¨ç¼“存中设备id是否被设置过
     * @param path
     * @return
     */
    private boolean isTemplateSetInCache(String path) {
        return templateSetCache.getIfPresent(path) != null;//不用考虑true/false,有key就是设置过
    }
    /**
     * Mounts the template on the path given by deviceId if no template is mounted there yet
     * @param template template name
     * @param deviceId device path
     */
    public void setTemmplateIfNotSet(String template,String deviceId) {
        if(isTemplateSetInCache(deviceId)) {//缓存中记录过,说明已经设置模板了,不再继续执行后续步骤
            return;
        }
        if(!isTemplateSetOnPath(template, deviceId)) {
             try {
                 iotdbCfg.getSessionPool().setSchemaTemplate(template, deviceId);
                 //在缓存中记录该设备path已经被配置过了
                 templateSetCache.put(deviceId, true);
             } catch (Exception e) {
                 logger.error("为deviceId设置模板错误,template="+template+",deviceId="+deviceId,e);
            }
collect/src/main/java/com/qianwen/mdc/collect/service/ProcessParamService.java
@@ -59,7 +59,7 @@
    public void insertProcessParam(ProcessParam param) {
        String deviceId = generateDeviceId(param.getWorkstationId(),param.getN());
        
        iotDBCommonService.isTemplateSetOnPath(IOTDBConstant.TEMPLATE_PROCESS_PARAM, deviceId);
        iotDBCommonService.setTemmplateIfNotSet(IOTDBConstant.TEMPLATE_PROCESS_PARAM, deviceId);
        
        Tablet tablet = new Tablet(deviceId, schemas);
        int rowIndex = tablet.rowSize++;
collect/src/main/java/com/qianwen/mdc/collect/utils/FixSizeLinkedList.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,33 @@
package com.qianwen.mdc.collect.utils;
import java.util.LinkedList;
/**
 * A {@link LinkedList} that retains at most {@code capacity} elements. Whenever an
 * insertion through {@link #add(Object)}, {@link #add(int, Object)} or
 * {@link #addLast(Object)} (and the methods delegating to them, such as
 * {@code offer} and {@code offerLast}) makes the list too long, elements are evicted
 * from the head until the bound holds again.
 * <p>
 * Other insertion paths ({@code addAll}, {@code addFirst}, {@code push}, ...) are not
 * bounded. Like {@link LinkedList}, this class is not thread-safe.
 *
 * @param <T> element type
 */
public class FixSizeLinkedList<T> extends LinkedList<T> {

    private static final long serialVersionUID = 1L;

    /** Maximum number of retained elements; always at least 1. */
    private final int capacity;

    /**
     * Creates an empty list with the given bound.
     *
     * @param capacity maximum number of elements to retain, at least 1
     * @throws IllegalArgumentException if {@code capacity} is smaller than 1; such a
     *         list could never hold an element (previously the first {@code add}
     *         failed with a {@code NoSuchElementException})
     */
    public FixSizeLinkedList(int capacity) {
        super();
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be >= 1, but was " + capacity);
        }
        this.capacity = capacity;
    }

    /**
     * Inserts the element at the given position, then evicts from the head if the
     * list grew beyond its capacity. Note that inserting at index 0 into a full list
     * therefore evicts the element that was just inserted.
     */
    @Override
    public void add(int index, T element) {
        super.add(index, element);
        trimToCapacity();
    }

    /**
     * Appends the element, evicting the oldest (head) element when the list is full.
     *
     * @return {@code true}, as specified by {@link java.util.Collection#add}
     */
    @Override
    public boolean add(T t) {
        boolean added = super.add(t);
        trimToCapacity();
        return added;
    }

    /**
     * Appends the element, evicting the oldest (head) element when the list is full.
     */
    @Override
    public void addLast(T t) {
        super.addLast(t);
        trimToCapacity();
    }

    /** Removes head elements until the size is within the capacity. */
    private void trimToCapacity() {
        while (size() > capacity) {
            removeFirst();
        }
    }
}
collect/src/main/java/com/qianwen/mdc/collect/vo/DeviceStateCheckConfigVO.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,60 @@
package com.qianwen.mdc.collect.vo;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import com.qianwen.mdc.collect.dto.WorkstationDTO;
/*
 * ç”¨äºŽæ£€æŸ¥çŠ¶æ€æ ¸å®žçš„é…ç½®ï¼Œé…ç½®çš„è§£æžç»“æžœ
 */
public class DeviceStateCheckConfigVO {
    private long interval;
    private String workstationCodes;
    private List<Long> workstationIds = new ArrayList<>();
    public long getInterval() {
        return interval;
    }
    public void setInterval(long interval) {
        this.interval = interval;
    }
    public String getWorkstationCodes() {
        return workstationCodes;
    }
    public void setWorkstationCodes(String workstationCodes) {
        this.workstationCodes = workstationCodes;
    }
    @Override
    public String toString() {
        return "DeviceStateCheckConfigVO [interval=" + interval + ", workstationCodes=" + workstationCodes + "]";
    }
    /**
     * åˆå§‹åŒ–,将工位代码初始化为对应的工位id集合,供判断是否需要核实状态
     * @param wsMap
     */
    public void init(Map<String, WorkstationDTO> wsMap) {
        String[] codeArr = StringUtils.split(workstationCodes, ",");
        for(int i=0;i<codeArr.length;i++) {
            for (Map.Entry<String,WorkstationDTO> entry : wsMap.entrySet()) {
                if(StringUtils.equals(codeArr[i], entry.getValue().getCode())) {
                    this.workstationIds.add(Long.parseLong(entry.getKey()));
                    break;
                }
            }
        }
    }
    /**
     * é…ç½®é‡Œæ˜¯å¦åŒ…含特定的工位
     * @param workstationId
     * @return
     */
    public boolean containsWorkstation(Long workstationId) {
        return workstationIds.contains(workstationId);
    }
}
collect/src/main/java/com/qianwen/mdc/collect/vo/HistoryDeviceState.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,36 @@
package com.qianwen.mdc.collect.vo;
/**
 * One entry of a device's status history: which workstation reported which status
 * at which time.
 */
public class HistoryDeviceState {
    /** Timestamp of the entry. */
    private long time;
    /** Id of the workstation the entry belongs to. */
    private long workstationId;
    /** Status value of the device. */
    private int deviceStatus;

    /** @return the timestamp of the entry */
    public long getTime() {
        return this.time;
    }

    /** @return the id of the workstation */
    public long getWorkstationId() {
        return this.workstationId;
    }

    /** @return the status value */
    public int getDeviceStatus() {
        return this.deviceStatus;
    }

    /** @param time the timestamp of the entry */
    public void setTime(long time) {
        this.time = time;
    }

    /** @param workstationId the id of the workstation */
    public void setWorkstationId(long workstationId) {
        this.workstationId = workstationId;
    }

    /** @param deviceStatus the status value */
    public void setDeviceStatus(int deviceStatus) {
        this.deviceStatus = deviceStatus;
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("HistoryDeviceState [time=");
        text.append(time);
        text.append(", workstationId=").append(workstationId);
        text.append(", deviceStatus=").append(deviceStatus);
        text.append("]");
        return text.toString();
    }
}
collect/src/main/resources/application-dev.yml
@@ -23,7 +23,7 @@
  #心跳时间
  keepalive: 10
  connectionTimeout: 3000 #连接超时时间
  dataReceiveTopic: forward/testxx #从iot平台接收mqtt采集数据的topic forward/test
  dataReceiveTopic: forward/test #从iot平台接收mqtt采集数据的topic forward/test
  # mysql
datasource:
  type: mysql
collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/DeviceStateMapper.xml
@@ -193,6 +193,6 @@
        select <include refid="all_columns" /> FROM root.f2.state_${workstationId}
        where is_deleted=false and wcs=2 and time &gt;=#{startSearchTime} order by time asc limit 1
    </select>
</mapper>
collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/ProcessParamMapper.xml
@@ -15,6 +15,10 @@
    <select id="lastParamByWorstationId" resultType="com.qianwen.mdc.collect.entity.iotdb.ProcessParam">
            select  n as n,v as v,workstation_id as workstationId from root.f2.process_param_${workstationId}_* order by time desc limit 1 align by device
    </select>
    <!-- Latest rows of one data point (item) of a workstation, newest first, at most
         count rows. workstationId, item and count are inserted with ${} (raw text
         substitution, not bound parameters), so only trusted values may be passed. -->
    <select id="lastParamListByWorstationId" resultType="com.qianwen.mdc.collect.entity.iotdb.ProcessParam">
            select  n as n,v as v,workstation_id as workstationId from root.f2.process_param_${workstationId}_${item} order by time desc limit ${count} align by device
    </select>
</mapper>
collect/src/main/resources/logback.xml
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false">
    <!-- Location where log files are stored; do not use relative paths in the LogBack configuration -->
    <property name="LOG_HOME" value="/var/collect/logs" />
    <property name="LOG_HOME" value="/var/log/collect" />
    <!-- Console output -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
collect/src/test/java/com/qianwen/mdc/collect/utils/FixSizeLinkedListTest.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,27 @@
package com.qianwen.mdc.collect.utils;
import static org.junit.Assert.assertEquals;
import org.junit.jupiter.api.Test;
public class FixSizeLinkedListTest {
    @Test
    public void testAdd() {
        FixSizeLinkedList<Integer> l = new FixSizeLinkedList<>(2);
        l.add(1);
        l.add(2);
        assertEquals(l.size(),2);
        l.add(3);
        assertEquals(l.size(),2);
        System.out.print(l);
        assertEquals(1,l.stream().filter(i -> i>2).count());
        assertEquals(Integer.valueOf(3),l.stream().filter(i -> i>2).findFirst().get());
    }
}