collect/pom.xml
@@ -164,7 +164,10 @@ <artifactId>xxl-job-core</artifactId> <version>2.1.2</version> </dependency> <dependency> <groupId>com.github.ben-manes.caffeine</groupId> <artifactId>caffeine</artifactId> </dependency> <dependency> <groupId>org.jmockit</groupId> collect/src/main/java/com/qianwen/mdc/collect/constants/IOTDBConstant.java
@@ -46,4 +46,9 @@ * é¶ä»¶å·æ¨¡æ¿åç§° state */ public static final String TEMPLATE_PROG_NAME = "prog_name"; /** * 设å¤ç¶ææ°æ®ç¹åç§° DeviceStatus */ public static final String DEVICE_STATUS_KEY = "DeviceStatus"; } collect/src/main/java/com/qianwen/mdc/collect/controller/InnerCacheTestController.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,64 @@ package com.qianwen.mdc.collect.controller; import java.util.List; import java.util.concurrent.TimeUnit; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.qianwen.mdc.collect.service.DeviceStatusCheckCfgLoader; import com.qianwen.mdc.collect.vo.HistoryDeviceState; @RestController("innercache") @RequestMapping("innercache") public class InnerCacheTestController { Cache<String, String> cache; @Autowired private DeviceStatusCheckCfgLoader cfgLoader; @GetMapping("/get") public Object gett() { return cache.getIfPresent("key1"); } @GetMapping("/set") public Object sett() { cache = Caffeine.newBuilder() // æå¤§100ï¼è¶ è¿åæ¾å¼èç .maximumSize(100).expireAfterWrite(20, TimeUnit.SECONDS) // 10 åéè¿æ //.expireAfterAccess(20, TimeUnit.SECONDS) .build(); // åç¼å䏿¾å ¥æ°æ® cache.put("key1", "value1"); return cache.getIfPresent("key1"); } /** * * @param wid å·¥ä½id * @return */ @GetMapping("/getc") public Object getc(long wid) { return cfgLoader.getHistoryList(wid); } @GetMapping("/setc") public Object setc(long wid) { List<HistoryDeviceState> statusHistoryList = cfgLoader.getHistoryList(wid); HistoryDeviceState his = new HistoryDeviceState(); his.setTime(999); his.setDeviceStatus(1000); his.setWorkstationId(wid); statusHistoryList.add(his); return cfgLoader.getHistoryList(wid); } } collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java
@@ -70,7 +70,7 @@ } @GetMapping("/tpluse") public Object tpluse(String tplname,String path) throws StatementExecutionException, IoTDBConnectionException { return iotService.isTemplateSetOnPath(tplname, path); return "";//iotService.isTemplateSetOnXXPath(tplname, path); } collect/src/main/java/com/qianwen/mdc/collect/entity/mgr/DeviceStatusCheckCfg.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,29 @@ package com.qianwen.mdc.collect.entity.mgr; import com.baomidou.mybatisplus.annotation.TableName; import com.qianwen.core.tenant.mp.TenantEntity; @TableName("device_status_check_cfg") public class DeviceStatusCheckCfg extends TenantEntity { /** * */ private static final long serialVersionUID = 1L; private String checkConfig; public String getCheckConfig() { return checkConfig; } public void setCheckConfig(String checkConfig) { this.checkConfig = checkConfig; } public static long getSerialversionuid() { return serialVersionUID; } @Override public String toString() { return "DeviceStatusCheckCfg [checkConfig=" + checkConfig + "]"; } } collect/src/main/java/com/qianwen/mdc/collect/mapper/iotdb/DeviceStateMapper.java
@@ -6,6 +6,7 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.qianwen.mdc.collect.dto.StateAggregateTimeDTO; import com.qianwen.mdc.collect.entity.iotdb.DeviceState; import com.qianwen.mdc.collect.vo.HistoryDeviceState; import java.time.LocalDateTime; import java.util.Date; collect/src/main/java/com/qianwen/mdc/collect/mapper/iotdb/ProcessParamMapper.java
@@ -1,5 +1,7 @@ package com.qianwen.mdc.collect.mapper.iotdb; import java.util.List; import org.apache.ibatis.annotations.Param; import com.baomidou.dynamic.datasource.annotation.DS; @@ -16,4 +18,5 @@ */ ProcessParam lastParamByWorstationId(@Param("workstationId") Long workstationId); List<ProcessParam> lastParamListByWorstationId(@Param("item")String item,@Param("workstationId") Long workstationId,@Param("count") int count); } collect/src/main/java/com/qianwen/mdc/collect/mapper/mgr/DeviceStatusCheckCfgMapaper.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,12 @@ package com.qianwen.mdc.collect.mapper.mgr; import org.apache.ibatis.annotations.Param; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.qianwen.mdc.collect.entity.mgr.DeviceStatusCheckCfg; import com.qianwen.mdc.collect.entity.mgr.PartWorkingHour;
/**
 * MyBatis-Plus mapper for {@link DeviceStatusCheckCfg} rows.
 *
 * <p>Declares no methods of its own, so only the operations inherited from {@link BaseMapper}
 * are available.
 * NOTE(review): the Param and PartWorkingHour imports are not used by this interface.
 */
public interface DeviceStatusCheckCfgMapaper extends BaseMapper<DeviceStatusCheckCfg> { } collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStatusCheckCfgLoader.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,173 @@ package com.qianwen.mdc.collect.service; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.qianwen.mdc.collect.cache.WorkstationCache; import com.qianwen.mdc.collect.constants.IOTDBConstant; import com.qianwen.mdc.collect.dto.WorkstationDTO; import com.qianwen.mdc.collect.entity.iotdb.ProcessParam; import com.qianwen.mdc.collect.entity.mgr.DeviceStatusCheckCfg; import com.qianwen.mdc.collect.mapper.iotdb.ProcessParamMapper; import com.qianwen.mdc.collect.mapper.mgr.DeviceStatusCheckCfgMapaper; import com.qianwen.mdc.collect.utils.FixSizeLinkedList; import com.qianwen.mdc.collect.vo.DeviceStateCheckConfigVO; import com.qianwen.mdc.collect.vo.HistoryDeviceState; /** * 设å¤ç¶ææ ¸å®é ç½®å è½½å¨ */ @Service public class DeviceStatusCheckCfgLoader { private static final Logger logger = LoggerFactory.getLogger(DeviceStatusCheckCfgLoader.class); @Autowired private DeviceStatusCheckCfgMapaper cfgMapper; @Autowired private WorkstationCache workstationCache; @Autowired private ProcessParamMapper processParamMapper; //public static Map<Long,List<HistoryDeviceStatus>> DEVICE_STTUS_MAP = new HashMap<>(); public static final String CFG_KEY = "check_config"; public static final String CFG_VO_KEY = "check_config_vo"; public static final String EMPTY_CONFIG = "{\"workstationCodes\":\"\",\"interval\":3000}"; /** * ç¶æåå²å表æå¤§è®°å½æ° */ public static final int HISTORY_SIZE = 20; /** * ç¼åç§æ° */ public static final int CACHE_DURATION_SEC = 120;//2mins /** * é ç½®çç¼å */ Cache<String, String> cfgCache; /** * é ç½®VOçç¼å */ Cache<String, 
DeviceStateCheckConfigVO> cfgVOCache; /** * ç¶æåå²ç¼å */ Cache<Long,FixSizeLinkedList<HistoryDeviceState>> historyCache; public DeviceStatusCheckCfgLoader(){ cfgCache = Caffeine.newBuilder() // æå¤§sizeï¼è¶ è¿åæ¾å¼èç .maximumSize(1).expireAfterWrite(CACHE_DURATION_SEC, TimeUnit.SECONDS) .build(); cfgVOCache = Caffeine.newBuilder() .maximumSize(1).expireAfterWrite(CACHE_DURATION_SEC, TimeUnit.SECONDS) .build(); historyCache = Caffeine.newBuilder() .maximumSize(50000)//æå¤50000æ¡æ°æ®å¨å (50000个workstationId) .expireAfterWrite(CACHE_DURATION_SEC, TimeUnit.SECONDS) .build(); } //ä¸»æ¹æ³ public FixSizeLinkedList<HistoryDeviceState> getHistoryList(long workstationId) { FixSizeLinkedList<HistoryDeviceState> hisList = historyCache.getIfPresent(workstationId); if(hisList == null) { //å¯è½ç¼åè¿æäºï¼ä¹å¯è½æ ¹æ¬æ²¡æï¼ï¼ï¼ //String checkCfg = loadCfgFromCache(); //String checkCfg = getCfgFromDb(); //loadAllHistory(checkCfg); loadHistoryByWorkstation(workstationId); hisList = historyCache.getIfPresent(workstationId); //if(hisList == null) { //å¦æè¿æ¯nullï¼å说æå¨æ°æ®åºä¸æ²¡æé ç½®è¿ä¸ªè®¾å¤ã为äºé¿å éå¤ä»æ°æ®åºå è½½é ç½®ï¼æ¾å ¥ä¸ä¸ªç©ºlist // historyCache.put(workstationId, new FixSizeLinkedList<HistoryDeviceState>(HISTORY_SIZE)); //} } return hisList; } void loadHistoryByWorkstation(long workstationId) { historyCache.put(workstationId, queryStatusHistory(workstationId)); } FixSizeLinkedList<HistoryDeviceState> queryStatusHistory(long workstationId){ FixSizeLinkedList<HistoryDeviceState> list = new FixSizeLinkedList<>(HISTORY_SIZE); List <ProcessParam> stateList = processParamMapper.lastParamListByWorstationId(IOTDBConstant.DEVICE_STATUS_KEY,workstationId,HISTORY_SIZE); ProcessParam state; for(int i=stateList.size()-1;i>-1;i--) { state = stateList.get(i); HistoryDeviceState his = new HistoryDeviceState(); his.setTime(state.getTime()); his.setDeviceStatus(Integer.parseInt(state.getV())); his.setWorkstationId(workstationId); list.add(his); } return list; } public 
DeviceStateCheckConfigVO getCheckConfigVOFromCache() { DeviceStateCheckConfigVO vo = cfgVOCache.getIfPresent(CFG_VO_KEY); if(vo == null) { logger.info("configvo expired reload"); vo = new DeviceStateCheckConfigVO(); String checkConfig = loadCfgFromCache(); JSONObject cfgJson = JSONObject.parseObject(checkConfig); vo.setWorkstationCodes(cfgJson.getString("workstationCodes")); vo.setInterval(cfgJson.getLongValue("interval")); Map<String,WorkstationDTO> wsMap = workstationCache.getWorkstations(); vo.init(wsMap); cfgVOCache.put(CFG_VO_KEY, vo); } logger.info("checkconfig load vo:{}",vo); return vo; } /** * ä»ç¼åå è½½é ç½® * @return */ public String loadCfgFromCache() { String checkConfig = cfgCache.getIfPresent(CFG_KEY); if(checkConfig == null) { checkConfig = getCfgFromDb(); } if(checkConfig == null) { checkConfig = EMPTY_CONFIG; } return checkConfig; } String getCfgFromDb() { List<DeviceStatusCheckCfg> cfgs = cfgMapper.selectList(Wrappers.emptyWrapper()); logger.info("checkconfig load from db:{}",cfgs); if(!cfgs.isEmpty()) { return cfgs.get(0).getCheckConfig(); }else { return null; } } } collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java
@@ -2,6 +2,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; import org.apache.commons.lang3.StringUtils; @@ -12,9 +13,13 @@ import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.qianwen.mdc.collect.constants.IOTDBConstant; import com.qianwen.mdc.collect.domain.TelemetryData; import com.qianwen.mdc.collect.domain.TelemetryDataItem; import com.qianwen.mdc.collect.enums.DefaultWcsEnum; import com.qianwen.mdc.collect.utils.redis.RedisUtil; import com.qianwen.mdc.collect.vo.DeviceStateCheckConfigVO; import com.qianwen.mdc.collect.vo.HistoryDeviceState; import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO; @Service @@ -28,7 +33,8 @@ private PackedDataService packedDataService; @Autowired private WorkstationDatapointsService dpService; @Autowired private DeviceStatusCheckCfgLoader cfgLoader; /** * å¤çæ¶å°çæ¶æ¯,对åºTelemetryDataPostingConsumer * @param payload @@ -37,11 +43,75 @@ //è§£ææ¶æ¯ List<TelemetryData> teleList = parsePayload(payload); checkDeviceStatus(teleList); collectDataService.handleCollectData(teleList); packedDataService.handle(teleList); } /** * æ£æ¥è®¾å¤ç¶æï¼æ°æ®ï¼å¦ææ¯éªçç¶æ(è¿è¡,å¾ æºæ¥å忢ï¼ï¼è§åæ¯å¨æ°æ®è¡¨å®ä¹ç * @param teleList */ void checkDeviceStatus(List<TelemetryData> teleList){ DeviceStateCheckConfigVO chkConfigVO = cfgLoader.getCheckConfigVOFromCache(); long interval = cfgLoader.getCheckConfigVOFromCache().getInterval(); int wcs; for(TelemetryData tdata:teleList) {//tdata.getWorkstationId() for(TelemetryDataItem dataItem:tdata.getDataItems()) { List<Map<String,String>> points = dataItem.getDataPoints(); for(Map<String,String> pt : points) { if(pt.containsKey(IOTDBConstant.DEVICE_STATUS_KEY)){ String val = pt.get(IOTDBConstant.DEVICE_STATUS_KEY); wcs = Integer.parseInt(val); //!!!codeèå´æ£æ¥ computeDeviceStatus(tdata.getWorkstationId(),wcs,dataItem.getTime(),pt,chkConfigVO); } } } } //di.getTime() //tdata.getWorkstationId(); } /** * æ 
¸å®å¹¶è®¡ç®ç¶æå¼ï¼å¦ææç¯éªçæ åµï¼åå¤å®ä¸ºå¾ æº * @param workstationId * @param wcs ç¶æå¼ * @param pointTime æ°æ®ç¹çæ¶é´æ³ * @param point æ°æ®ç¹keyåval * @param interval æ£æ¥çæ¶é´è·¨åº¦ï¼æ¯«ç§ */ void computeDeviceStatus(long workstationId,int wcs,long pointTime,Map<String,String> point,DeviceStateCheckConfigVO chkConfigVO) { long interval = chkConfigVO.getInterval(); if(wcs != DefaultWcsEnum.RUNNING.getCode() || !chkConfigVO.containsWorkstation(workstationId)) {//ä» é对è¿è¡ç¶æçæè¿è¡ç¶ææ ¸å® return; } List<HistoryDeviceState> statusHistoryList = cfgLoader.getHistoryList(workstationId);// DEVICE_STTUS_MAP.get(workstationId);//使¶åå§åå 鍿°æ®ï¼ï¼ //pointTime-interval 3ç§å long count = statusHistoryList.stream().filter(his -> { return his.getTime()>(pointTime-interval) && his.getDeviceStatus() != DefaultWcsEnum.RUNNING.getCode(); }).count(); log.info("checkstatus count={}",count); if(count > 0) { //ç¯éªçï¼å¤å®ä¸ºå¾ æºï¼è¿éä¿®æ¹è®¾å¤ç¶æçæ°æ® point.put(IOTDBConstant.DEVICE_STATUS_KEY, DefaultWcsEnum.STANDBY.getCode()+""); } HistoryDeviceState his = new HistoryDeviceState(); his.setTime(pointTime); his.setDeviceStatus(wcs); his.setWorkstationId(workstationId); statusHistoryList.add(his); } /** * è§£ææ¶æ¯ä½å符串æªåè¡¨ï¼æ¯ä¸ªå表项æ¯ä¸ä¸ªå·¥ä½çæ°æ® * @param payload @@ -62,7 +132,7 @@ //è·å工使°æ®ç¹é ç½®,åªä¿åé ç½®å¥½çæ°æ®ç¹ï¼æ²¡æé ç½®çééæ°æ®æå¼ã final WorkstationDatapointsVO dpVo = dpService.getDatapointsByAppIdFromCache(appId); //final WorkstationDatapointsVO dpVo = dpService.getDataPointByAppId(appId); if(dpVo == null) { //工使²¡æå®ä¹è¿æ°æ®ç¹æè appIdä¸å¹é log.warn("appId={}æªæ¾å°æ°æ®ç¹å®ä¹è®°å½ï¼ä¸¢å¼æ°æ®",appId); collect/src/main/java/com/qianwen/mdc/collect/service/IotDBCommonService.java
@@ -1,6 +1,7 @@ package com.qianwen.mdc.collect.service; import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; @@ -9,6 +10,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.qianwen.mdc.collect.config.IotDBSessionConfig; @@ -23,12 +26,27 @@ private IotDBSessionConfig iotdbCfg; /** * ç¼åç§æ° */ public static final int CACHE_DURATION_SEC = 300;//5mins /** * æ¨¡æ¿æ¯å¦è®¾ç½®çç¼å */ Cache<String, Boolean> templateSetCache; public IotDBCommonService() { templateSetCache = Caffeine.newBuilder() // æå¤§sizeï¼è¶ è¿åæ¾å¼èç,å¤§æ¦æ¯ä¸ªæ°æ®ç¹é½åºè¯¥æä¸ä¸ªç¼åï¼å¤å ä¸äºå ¶ä»ç±»åç表ï¼å ¥state_xxxï¼ .maximumSize(5000).expireAfterWrite(CACHE_DURATION_SEC, TimeUnit.SECONDS) .build(); } /** * æ¨¡æ¿æ¯å¦æè½½å°æå®è·¯å¾äº * @param template 模æ¿åç§° * @param path 设å¤è·¯å¾ * @return */ public boolean isTemplateSetOnPath(String template,String path) { private boolean isTemplateSetOnPath(String template,String path) { List<String> pathlist; try { pathlist = iotdbCfg.getSessionPool().showPathsTemplateSetOn(template); @@ -42,14 +60,28 @@ } /** * æ£æ¥å¨ç¼åä¸è®¾å¤idæ¯å¦è¢«è®¾ç½®è¿ * @param path * @return */ private boolean isTemplateSetInCache(String path) { return templateSetCache.getIfPresent(path) != null;//ä¸ç¨èètrue/false,ækeyå°±æ¯è®¾ç½®è¿ } /** * 妿deviceIdæå®çè·¯å¾æ²¡ææè½½æ¨¡æ¿åæ§è¡æè½½ * @param template 模æ¿åç§° * @param deviceId 设å¤è·¯å¾ */ public void setTemmplateIfNotSet(String template,String deviceId) { if(isTemplateSetInCache(deviceId)) {//ç¼åä¸è®°å½è¿ï¼è¯´æå·²ç»è®¾ç½®æ¨¡æ¿äºï¼ä¸åç»§ç»æ§è¡åç»æ¥éª¤ return; } if(!isTemplateSetOnPath(template, deviceId)) { try { iotdbCfg.getSessionPool().setSchemaTemplate(template, deviceId); //å¨ç¼åä¸è®°å½è¯¥è®¾å¤pathå·²ç»è¢«é ç½®è¿äº templateSetCache.put(deviceId, 
true); } catch (Exception e) { logger.error("为deviceId设置模æ¿é误,template="+template+",deviceId="+deviceId,e); } collect/src/main/java/com/qianwen/mdc/collect/service/ProcessParamService.java
@@ -59,7 +59,7 @@ public void insertProcessParam(ProcessParam param) { String deviceId = generateDeviceId(param.getWorkstationId(),param.getN()); iotDBCommonService.isTemplateSetOnPath(IOTDBConstant.TEMPLATE_PROCESS_PARAM, deviceId); iotDBCommonService.setTemmplateIfNotSet(IOTDBConstant.TEMPLATE_PROCESS_PARAM, deviceId); Tablet tablet = new Tablet(deviceId, schemas); int rowIndex = tablet.rowSize++; collect/src/main/java/com/qianwen/mdc/collect/utils/FixSizeLinkedList.java
/**
 * A {@link LinkedList} that never holds more than a fixed number of elements.
 *
 * <p>Whenever an insertion makes the list longer than its capacity, elements are removed from the
 * head until it fits again. Appending therefore keeps the most recently added elements, while an
 * element inserted at the head of a full list is dropped again immediately.
 *
 * <p>Every insertion path of {@code LinkedList} is covered: {@code add}, {@code addFirst},
 * {@code addLast}, {@code addAll} and the queue/deque methods that delegate to them
 * ({@code offer}, {@code offerFirst}, {@code offerLast}, {@code push}).
 *
 * <p>Like {@code LinkedList}, this class is not synchronized.
 *
 * @param <T> element type
 */
public class FixSizeLinkedList<T> extends LinkedList<T> {

    private static final long serialVersionUID = 1L;

    /** Maximum number of elements retained; always at least 1. */
    private final int capacity;

    /**
     * Creates an empty list with the given capacity.
     *
     * @param capacity maximum number of elements to retain
     * @throws IllegalArgumentException if {@code capacity} is smaller than 1
     */
    public FixSizeLinkedList(int capacity) {
        super();
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be >= 1, got " + capacity);
        }
        this.capacity = capacity;
    }

    /** Inserts at {@code index}, then evicts from the head if the list became too long. */
    @Override
    public void add(int index, T element) {
        super.add(index, element);
        trimToCapacity();
    }

    /** Appends {@code t}, evicting the oldest element when the list is already full. */
    @Override
    public boolean add(T t) {
        boolean changed = super.add(t);
        trimToCapacity();
        return changed;
    }

    /** Inserts at the head; on a full list the head is evicted again right away. */
    @Override
    public void addFirst(T t) {
        super.addFirst(t);
        trimToCapacity();
    }

    /** Appends {@code t}, evicting the oldest element when the list is already full. */
    @Override
    public void addLast(T t) {
        super.addLast(t);
        trimToCapacity();
    }

    /** Appends all elements of {@code c}, keeping only the newest {@code capacity} elements. */
    @Override
    public boolean addAll(java.util.Collection<? extends T> c) {
        boolean changed = super.addAll(c);
        trimToCapacity();
        return changed;
    }

    /** Inserts all elements of {@code c} at {@code index}, then evicts from the head as needed. */
    @Override
    public boolean addAll(int index, java.util.Collection<? extends T> c) {
        boolean changed = super.addAll(index, c);
        trimToCapacity();
        return changed;
    }

    /** Removes head elements until the size is within the capacity. */
    private void trimToCapacity() {
        while (size() > capacity) {
            super.removeFirst();
        }
    }
}
// Diff listing continues with: collect/src/main/java/com/qianwen/mdc/collect/vo/DeviceStateCheckConfigVO.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,60 @@ package com.qianwen.mdc.collect.vo; import java.util.ArrayList; import java.util.List; import java.util.Map; import org.apache.commons.lang3.StringUtils; import com.qianwen.mdc.collect.dto.WorkstationDTO; /* * ç¨äºæ£æ¥ç¶ææ ¸å®çé ç½®ï¼é ç½®çè§£æç»æ */ public class DeviceStateCheckConfigVO { private long interval; private String workstationCodes; private List<Long> workstationIds = new ArrayList<>(); public long getInterval() { return interval; } public void setInterval(long interval) { this.interval = interval; } public String getWorkstationCodes() { return workstationCodes; } public void setWorkstationCodes(String workstationCodes) { this.workstationCodes = workstationCodes; } @Override public String toString() { return "DeviceStateCheckConfigVO [interval=" + interval + ", workstationCodes=" + workstationCodes + "]"; } /** * åå§åï¼å°å·¥ä½ä»£ç åå§å为对åºçå·¥ä½idéåï¼ä¾å¤ææ¯å¦éè¦æ ¸å®ç¶æ * @param wsMap */ public void init(Map<String, WorkstationDTO> wsMap) { String[] codeArr = StringUtils.split(workstationCodes, ","); for(int i=0;i<codeArr.length;i++) { for (Map.Entry<String,WorkstationDTO> entry : wsMap.entrySet()) { if(StringUtils.equals(codeArr[i], entry.getValue().getCode())) { this.workstationIds.add(Long.parseLong(entry.getKey())); break; } } } } /** * é ç½®éæ¯å¦å å«ç¹å®çå·¥ä½ * @param workstationId * @return */ public boolean containsWorkstation(Long workstationId) { return workstationIds.contains(workstationId); } } collect/src/main/java/com/qianwen/mdc/collect/vo/HistoryDeviceState.java
/**
 * One entry of a workstation's device-status history: which workstation reported which status
 * value at which time.
 */
public class HistoryDeviceState {

    /** Id of the workstation the sample belongs to. */
    private long workstationId;

    /** Timestamp of the sample. */
    private long time;

    /** Reported device-status code. */
    private int deviceStatus;

    public long getWorkstationId() {
        return this.workstationId;
    }

    public void setWorkstationId(long workstationId) {
        this.workstationId = workstationId;
    }

    public long getTime() {
        return this.time;
    }

    public void setTime(long time) {
        this.time = time;
    }

    public int getDeviceStatus() {
        return this.deviceStatus;
    }

    public void setDeviceStatus(int deviceStatus) {
        this.deviceStatus = deviceStatus;
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append("HistoryDeviceState [time=").append(time);
        text.append(", workstationId=").append(workstationId);
        text.append(", deviceStatus=").append(deviceStatus);
        text.append("]");
        return text.toString();
    }
}
// Diff listing continues with: collect/src/main/resources/application-dev.yml
@@ -23,7 +23,7 @@ #å¿è·³æ¶é´ keepalive: 10 connectionTimeout: 3000 #è¿æ¥è¶ æ¶æ¶é´ dataReceiveTopic: forward/testxx #ä»iot平尿¥æ¶mqttééæ°æ®çtopic forward/test dataReceiveTopic: forward/test #ä»iot平尿¥æ¶mqttééæ°æ®çtopic forward/test # mysql datasource: type: mysql collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/DeviceStateMapper.xml
@@ -193,6 +193,6 @@ select <include refid="all_columns" /> FROM root.f2.state_${workstationId} where is_deleted=false and wcs=2 and time >=#{startSearchTime} order by time asc limit 1 </select> </mapper> collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/ProcessParamMapper.xml
@@ -15,6 +15,10 @@ <select id="lastParamByWorstationId" resultType="com.qianwen.mdc.collect.entity.iotdb.ProcessParam"> select n as n,v as v,workstation_id as workstationId from root.f2.process_param_${workstationId}_* order by time desc limit 1 align by device </select> <select id="lastParamListByWorstationId" resultType="com.qianwen.mdc.collect.entity.iotdb.ProcessParam"> select n as n,v as v,workstation_id as workstationId from root.f2.process_param_${workstationId}_${item} order by time desc limit ${count} align by device </select> </mapper> collect/src/main/resources/logback.xml
@@ -1,7 +1,7 @@ <?xml version="1.0" encoding="UTF-8"?> <configuration debug="false"> <!--å®ä¹æ¥å¿æä»¶çåå¨å°å å¿å¨ LogBack çé ç½®ä¸ä½¿ç¨ç¸å¯¹è·¯å¾--> <property name="LOG_HOME" value="/var/collect/logs" /> <property name="LOG_HOME" value="/var/log/collect" /> <!-- æ§å¶å°è¾åº --> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> collect/src/test/java/com/qianwen/mdc/collect/utils/FixSizeLinkedListTest.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,27 @@ package com.qianwen.mdc.collect.utils; import static org.junit.Assert.assertEquals; import org.junit.jupiter.api.Test; public class FixSizeLinkedListTest { @Test public void testAdd() { FixSizeLinkedList<Integer> l = new FixSizeLinkedList<>(2); l.add(1); l.add(2); assertEquals(l.size(),2); l.add(3); assertEquals(l.size(),2); System.out.print(l); assertEquals(1,l.stream().filter(i -> i>2).count()); assertEquals(Integer.valueOf(3),l.stream().filter(i -> i>2).findFirst().get()); } }