yangys
2024-11-13 04d53749b21921c9bceebe120d170c2ee6e533af
增加离线检查定时任务的逻辑
已修改16个文件
已添加6个文件
589 ■■■■ 文件已修改
collect/src/main/java/com/qianwen/mdc/collect/cache/WorkstationCache.java 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/config/MqttProperties.java 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/constants/IOTDBConstant.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/controller/CalController.java 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/controller/CollectTestController.java 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/dto/CalendarShiftInfoDTO.java 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/job/DeviceOfflineStatusCheckJob.java 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/mapper/iotdb/ProcessParamMapper.java 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/service/CalendarService.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/service/DeviceOfflineCheckService.java 100 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateFixPointService.java 51 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/service/FactoryDataService.java 106 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/service/IotDBCommonService.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/service/PackedDataService.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/service/ProcessParamService.java 103 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/vo/FactoryDataVO.java 79 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/resources/application-dev.yml 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/resources/application.yml 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/ProcessParamMapper.xml 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/cache/WorkstationCache.java
@@ -107,12 +107,7 @@
        }
        return result;
    }
    /*
    public static Boolean clearWorkStationCache() {
        String redisKey = "posting:workstation".concat("::").concat(WORKSTATION_ALL);
        return bladeRedis.del(redisKey);
    }
    */
    /**
     * 获取指定日期的日历代码
     * @param workstationId
@@ -135,15 +130,15 @@
        return calendarCode;
    }
    
    public GlobalWcsOfRps getWorkstationWcsSetting(Long workstationId, String code) {
    public GlobalWcsOfRps getWorkstationWcsSetting(Long workstationId, String deviceStatusCode) {
        String redisKey = COLLECT_WORKSTATION.concat("::").concat(WORKSTATION_ID).concat(workstationId.toString()
            .concat(WCS_SETTING));
        
        //GlobalWcsOfRps wcsSetting = (GlobalWcsOfRps)redisUtil.hGet(redisKey, code);
        GlobalWcsOfRps wcsSetting = (GlobalWcsOfRps)redisUtil.hget(redisKey, code);
        GlobalWcsOfRps wcsSetting = (GlobalWcsOfRps)redisUtil.hget(redisKey, deviceStatusCode);
        if (wcsSetting == null) {
          wcsSetting = globalWcsOfRpsMapper.selectOne(Wrappers.<GlobalWcsOfRps>lambdaQuery()
              .eq(GlobalWcsOfRps::getCode, code)
              .eq(GlobalWcsOfRps::getCode, deviceStatusCode)
              .isNull(GlobalWcsOfRps::getPrecondition));
          if(wcsSetting == null) {
              wcsSetting = new GlobalWcsOfRps();
@@ -153,7 +148,7 @@
          //wcsSetting = Func.isNotEmpty(wcsSetting) ? wcsSetting : GlobalWcsOfRps.builder().rps(0).isPlan(0).build();
          //bladeRedis.hSet(redisKey, code, wcsSetting);
          //bladeRedis.expire(redisKey, Duration.ofDays(1L));
          redisUtil.hset(redisKey, code, wcsSetting, Duration.ofDays(1L).getSeconds());
          redisUtil.hset(redisKey, deviceStatusCode, wcsSetting, Duration.ofDays(1L).getSeconds());
        } 
        return wcsSetting;
      }
collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java
@@ -41,6 +41,10 @@
    @Value("${mqtt.password:}")
    private String mqttPassword;
    @Value("${mqtt.timeout:1000}")
    private int timeout;
    @Autowired
    private IOTMqttReceiveService recService;
    @Autowired
@@ -73,6 +77,7 @@
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { mqttHost});//"tcp://82.156.1.83:1884" 
        options.setConnectionTimeout(timeout);
        
        if(ObjectUtil.isNotEmpty(mqttUserName)) {
            options.setUserName(mqttUserName);
collect/src/main/java/com/qianwen/mdc/collect/config/MqttProperties.java
对比新文件
@@ -0,0 +1,19 @@
package com.qianwen.mdc.collect.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Component
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {
    private String host;
    private String username;
    private String password;
    private String connectionTimeout;
    private int keepalive;
    private String dataReceiveTopic;
    private int timeout;
    private boolean cleansession;
}
collect/src/main/java/com/qianwen/mdc/collect/constants/IOTDBConstant.java
@@ -36,4 +36,9 @@
     * 告警模板名称 alarm
     */
    public static final String TEMPLATE_ALARM = "alarm";
    /**
     * 过程参数表模板
     */
    public static final String TEMPLATE_PROCESS_PARAM = "process_param";
}
collect/src/main/java/com/qianwen/mdc/collect/controller/CalController.java
@@ -23,6 +23,7 @@
import com.qianwen.mdc.collect.mapper.iotdb.ProcessParamMapper;
import com.qianwen.mdc.collect.mapper.mgr.CalendarMapper;
import com.qianwen.mdc.collect.mqtt.MqttMessageSender;
import com.qianwen.mdc.collect.service.DeviceOfflineCheckService;
import com.qianwen.mdc.collect.service.DeviceStateFixPointService;
import com.qianwen.mdc.collect.service.IOTMqttReceiveService;
import com.qianwen.mdc.collect.service.IotDBCommonService;
@@ -37,7 +38,8 @@
public class CalController {
    @Autowired
    private TimeSliceCache timeSliceCache;
    @Autowired
    DeviceOfflineCheckService offService;
    String calCode= "1";
    
    @GetMapping("/gett")
@@ -58,4 +60,10 @@
        
        return "settOK,targetDate="+targetDate;
    }
    @GetMapping("/offline")
    public Object off(long wid) {
        offService.handleWorkstationOffline(wid);
        return "1";
    }
}
collect/src/main/java/com/qianwen/mdc/collect/controller/CollectTestController.java
@@ -53,9 +53,9 @@
    
    @GetMapping("/last")
    public Object last() {
        List<ProcessParam> list1= mapper.mylist(1656819188967653378L);
        System.out.print(list1);
        return list1;
        //ProcessParam last= mapper.lastParamInDuration(1656819188967653378L,300000);
        //System.out.print(list1);
        return "dd";
    }
    
    @DS("iotdb")
collect/src/main/java/com/qianwen/mdc/collect/dto/CalendarShiftInfoDTO.java
@@ -1,7 +1,9 @@
package com.qianwen.mdc.collect.dto;
import java.util.Date;
/**
 * 生产日历的班制信息
 */
public class CalendarShiftInfoDTO {
    private Long workstationId;
    private String code;
collect/src/main/java/com/qianwen/mdc/collect/job/DeviceOfflineStatusCheckJob.java
对比新文件
@@ -0,0 +1,30 @@
package com.qianwen.mdc.collect.job;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import com.qianwen.mdc.collect.service.DeviceOfflineCheckService;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
@Component
public class DeviceOfflineStatusCheckJob {
    private static final Logger log = LoggerFactory.getLogger(DeviceOfflineStatusCheckJob.class);
    @Resource
    private DeviceOfflineCheckService offlineCheckService;
    @XxlJob("offlineCheckJobHandler")
    public ReturnT<String> offlineCheckJobHandler(String param) throws Exception {
        XxlJobLogger.log("定时监控工位是否离线",param);
        offlineCheckService.checkOffline();
        log.info("离线状态处理结束");
        XxlJobLogger.log("定时监控工位是否离线完成", new Object[0]);
        return ReturnT.SUCCESS;
    }
}
collect/src/main/java/com/qianwen/mdc/collect/mapper/iotdb/ProcessParamMapper.java
@@ -1,7 +1,5 @@
package com.qianwen.mdc.collect.mapper.iotdb;
import java.util.List;
import org.apache.ibatis.annotations.Param;
import com.baomidou.dynamic.datasource.annotation.DS;
@@ -10,7 +8,12 @@
@DS("iotdb")
public interface ProcessParamMapper {
    List<ProcessParam> mylist(@Param("workstationId") Long workstationId);
    /**
     * 查询工位在最近duration毫秒内最后一个参数
     * @param workstationId
     * @param duration
     * @return
     */
    ProcessParam lastParamByWorstationId(@Param("workstationId") Long workstationId);
}
collect/src/main/java/com/qianwen/mdc/collect/service/CalendarService.java
@@ -110,6 +110,6 @@
            shiftEndTime = endTime;
        }
        CalendarShiftTimeSlicesDTO calendarShiftTimeSlicesDTO = CalendarShiftTimeSlicesDTO.builder().shiftTimeType(productionCalendarDaytime.getShiftType().toString()).endTime(LocalDateTimeUtils.LocalDateTimeToDate(shiftEndTime)).startTime(LocalDateTimeUtils.LocalDateTimeToDate(shiftStartTime)).shiftIndex(productionCalendarDaytime.getShiftIndex()).shiftTimeType(productionCalendarDaytime.getShiftType().toString()).factoryDate(LocalDateTimeUtils.formatTimeLocalDate(productionCalendarDaytime.getCalendarDate(), "yyyy-MM-dd")).factoryMonth(productionCalendarDaytime.getMonth()).factoryWeek(productionCalendarDaytime.getWeek()).factoryYear(productionCalendarDaytime.getYear()).build();
        timeSlicesDTOMap.put(Integer.valueOf(minutesOfDay), calendarShiftTimeSlicesDTO);
        timeSlicesDTOMap.put(minutesOfDay, calendarShiftTimeSlicesDTO);
    }
}
collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
@@ -140,7 +140,6 @@
            Tablet tablet = new Tablet(deviceId, schemas);
            for(TypedTelemetryData tdata : typeList) {
                
                rowIndex = tablet.rowSize++;
                tablet.addTimestamp(rowIndex, tdata.getTime());
                tablet.addValue("workstation_id",rowIndex,dt.getWorkstationId());
collect/src/main/java/com/qianwen/mdc/collect/service/DeviceOfflineCheckService.java
对比新文件
@@ -0,0 +1,100 @@
package com.qianwen.mdc.collect.service;
import java.util.Arrays;
import java.util.Date;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import com.qianwen.mdc.collect.cache.WorkstationCache;
import com.qianwen.mdc.collect.dto.WorkstationDTO;
import com.qianwen.mdc.collect.entity.iotdb.DeviceState;
import com.qianwen.mdc.collect.entity.iotdb.ProcessParam;
import com.qianwen.mdc.collect.enums.FeedbackTimePointEnum;
import com.qianwen.mdc.collect.mapper.iotdb.ProcessParamMapper;
import com.qianwen.mdc.collect.vo.FactoryDataVO;
import cn.hutool.core.util.ObjectUtil;
/**
 * è®¾å¤‡ç¦»çº¿å¤„理服务,超过一定时间无采集数据判定为离线,插入离线状态数据
 */
@Service
public class DeviceOfflineCheckService{
    private static final Logger log = LoggerFactory.getLogger(DeviceOfflineCheckService.class);
    /**
     * ç¦»çº¿åˆ¤å®šæ—¶é•¿ï¼Œé»˜è®¤5分钟
     */
    @Value("${offlineConfigDuration:3000000}")
    private long offlineConfigDuration;
    public static final String OFFLINE_VALUE = "4";//离线状态值
    @Autowired
    private ProcessParamService paramService;
    @Autowired
    private WorkstationCache workstationCache;
    @Autowired
    private ProcessParamMapper processParamMapper;
    @Autowired
    private FactoryDataService factoryDataService;
    /**
     * ä¿å­˜çŠ¶æ€å›ºå®šç‚¹æ•°æ®(state_{workstationId})
     * @param workstationIdList
     */
    public void checkOffline() {
        //将数据按照工位id分组
        Map<String, WorkstationDTO> workstationsMap = workstationCache.getWorkstations();
        if (ObjectUtil.isEmpty(workstationsMap)) {
            log.info("缓存无设备数据,退出");
            return;
        }
        Set<String> workstationIds = workstationsMap.keySet();
        log.info("离线检查工位总数:{}",workstationsMap.size());
        for(String workstationId :workstationIds) {
            log.info("开始检查工位{}的离线状态",workstationId);
            Long wid = Long.parseLong(workstationId);
            handleWorkstationOffline(wid);
        }
    }
    /**
     * æ£€æŸ¥ä¸€ä¸ªå·¥ä½æ˜¯å¦ç¦»çº¿ï¼Œç¦»çº¿åˆ™å¡«å……离线状态数据
     * @param workstationId
     */
    public void handleWorkstationOffline(long workstationId) {
        ProcessParam lastParam = processParamMapper.lastParamByWorstationId(workstationId);
        long nowMills = System.currentTimeMillis();
        long onlineRange = nowMills - offlineConfigDuration;
        if(lastParam != null && lastParam.getTime()> onlineRange) {
            //有数据,且在判定时间内-》在线
            return;
        }
        //TODO è¿™é‡Œä¸€ç›´ç¦»çº¿æ˜¯ä»€ä¹ˆæ•°æ®
        //时间段内无参数,说明设备没采集数据判定为离线,插入状态,然后发送realTime消息给mdc
        ProcessParam statusParam = new ProcessParam();
        statusParam.setTime(nowMills);
        statusParam.setN("DeviceStatus");
        statusParam.setV(OFFLINE_VALUE);
        statusParam.setWorkstationId(workstationId);
        if(lastParam == null || !lastParam.getN().equals("DeviceStatus")) {
            //之前无任何采集的数据(或者上一条不是状态数据),新加一条离线状态数据
            paramService.insertProcessParam(statusParam);
        }
        //通知mdc更新实时状态
        paramService.sendRealtimeDataMsg(statusParam);
    }
}
collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateFixPointService.java
@@ -289,29 +289,36 @@
        return default24HourPointDTOList;
    }
    /**
     * 填充设备状态的班次信息
     * @param calendarShiftList
     * @param default24HourPointDTOList
     * @param state
     */
    private void packCalendarShiftInfoForTimePoint(List<CalendarShiftInfoDTO> calendarShiftList, List<DeviceState> default24HourPointDTOList, DeviceState state) {
        if (Func.isNotEmpty(calendarShiftList)) {
            CalendarShiftInfoDTO relatedShift = calendarShiftList.stream().filter(item -> {
                return item.getStartTime().getTime() <= state.getTime().longValue() && item.getEndTime().getTime() > state.getTime().longValue();
            }).findFirst().orElse(null);
            if (Func.isNotEmpty(relatedShift)) {
                state.setShiftIndex(relatedShift.getShiftIndex());
                state.setShiftTimeType(relatedShift.getShiftTimeType());
                state.setFactoryYear(relatedShift.getFactoryYear());
                state.setFactoryMonth(relatedShift.getFactoryMonth());
                state.setFactoryWeek(relatedShift.getFactoryWeek());
                String factoryDate = relatedShift.getFactoryDate();
                String[] split = Func.split(factoryDate, "-");
                state.setFactoryDate(Integer.valueOf(String.join("", split)));
                state.setIsDeleted(Boolean.FALSE);
                default24HourPointDTOList.add(state);
                return;
            }
            log.warn("工位{} æ—¥åކ{} æœªæ‰¾åˆ°æ•´ç‚¹ç­æ¬¡ä¿¡æ¯", state.getWorkstationId(), state.getCalendarCode());
            return;
        if (Func.isEmpty(calendarShiftList)) {
            log.error("工位{} æ—¥åކ{} æ— æ—¥æœŸ:[{}]的班次信息", new Object[]{state.getWorkstationId(), state.getCalendarCode(), state.getTime()});
            return;
        }
        CalendarShiftInfoDTO relatedShift = calendarShiftList.stream().filter(item -> {
            return item.getStartTime().getTime() <= state.getTime() && item.getEndTime().getTime() > state.getTime();
        }).findFirst().orElse(null);
        if (Func.isNotEmpty(relatedShift)) {
            state.setShiftIndex(relatedShift.getShiftIndex());
            state.setShiftTimeType(relatedShift.getShiftTimeType());
            state.setFactoryYear(relatedShift.getFactoryYear());
            state.setFactoryMonth(relatedShift.getFactoryMonth());
            state.setFactoryWeek(relatedShift.getFactoryWeek());
            String factoryDate = relatedShift.getFactoryDate();
            String[] split = Func.split(factoryDate, "-");
            state.setFactoryDate(Integer.valueOf(String.join("", split)));
            state.setIsDeleted(false);
            default24HourPointDTOList.add(state);
        }else {
            log.warn("工位{} æ—¥åކ{} æœªæ‰¾åˆ°å›ºå®šç‚¹ç­æ¬¡ä¿¡æ¯", state.getWorkstationId(), state.getCalendarCode());
        }
        log.error("工位{} æ—¥åކ{} æ— è‡ªç„¶å¤©{}班次信息", new Object[]{state.getWorkstationId(), state.getCalendarCode(), state.getTime()});
    }
    //不一定能用上
@@ -325,7 +332,7 @@
            workstationState.setShiftIndex(CommonConstant.DEFAULT_SHIFT_INDEX);
            workstationState.setShiftTimeType(CommonConstant.DEFAULT_SHIFT_TYPE);
            workstationState.setFeedbackPointType(FeedbackTimePointEnum.NO_FEED_BACK_POINT.getValue());
            workstationState.setIsDeleted(Boolean.FALSE);
            workstationState.setIsDeleted(false);
        }
    }
}
collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java
@@ -1,10 +1,6 @@
package com.qianwen.mdc.collect.service;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -18,24 +14,9 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.google.common.collect.Lists;
import com.qianwen.core.tool.utils.Func;
import com.qianwen.core.tool.utils.SpringUtil;
import com.qianwen.mdc.collect.cache.WorkstationCache;
import com.qianwen.mdc.collect.config.IotDBSessionConfig;
import com.qianwen.mdc.collect.constants.CommonConstant;
import com.qianwen.mdc.collect.constants.IOTDBConstant;
import com.qianwen.mdc.collect.dto.CalendarShiftInfoDTO;
import com.qianwen.mdc.collect.dto.WorkstationDTO;
import com.qianwen.mdc.collect.entity.iotdb.DeviceState;
import com.qianwen.mdc.collect.enums.FeedbackTimePointEnum;
import com.qianwen.mdc.collect.mapper.mgr.CalendarMapper;
import com.qianwen.mdc.collect.utils.LocalDateTimeUtils;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.ObjectUtil;
/**
 * 设备状态普通服务
 */
collect/src/main/java/com/qianwen/mdc/collect/service/FactoryDataService.java
对比新文件
@@ -0,0 +1,106 @@
package com.qianwen.mdc.collect.service;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.google.common.collect.Sets;
import com.qianwen.mdc.collect.cache.TimeSliceCache;
import com.qianwen.mdc.collect.cache.WorkstationCache;
import com.qianwen.mdc.collect.constants.CommonConstant;
import com.qianwen.mdc.collect.domain.TelemetryData;
import com.qianwen.mdc.collect.domain.TelemetryDataItem;
import com.qianwen.mdc.collect.dto.CacheBuildDTO;
import com.qianwen.mdc.collect.dto.CalendarShiftTimeSlicesDTO;
import com.qianwen.mdc.collect.dto.PackedTelemetryData;
import com.qianwen.mdc.collect.handler.PackedTelemetryDataHandlerSelector;
import com.qianwen.mdc.collect.handler.TelemetryDataHandler;
import com.qianwen.mdc.collect.utils.LocalDateTimeUtils;
import com.qianwen.mdc.collect.vo.FactoryDataVO;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.ObjectUtil;
/**
 * å·¥åŽ‚æ•°æ®èŽ·å–
 */
@Service
public class FactoryDataService {
    private static final Logger log = LoggerFactory.getLogger(FactoryDataService.class);
    private static final Map<Integer, String> PROCESS_PARAM_MAP = new HashMap<>();
    @Autowired
    private WorkstationCache workstationCache;
    @Autowired
    private TimeSliceCache timeSliceCache;
    /**
     * å¡«å……生产日历code,以及factoryyear
     *
     * @param pdata
     */
    public FactoryDataVO getFactoryData(long workstationId,long dataCollectTime) {
        //原来在TelemetryPropertyWrapper.packWorkstationCalendarInfo中完成
        FactoryDataVO result = new FactoryDataVO();
        Date collectTime = new Date(dataCollectTime);
        String calendarCode = workstationCache.getWorkstationCalendarCodeForDate(workstationId, DateUtil.formatDate(DateUtil.date(collectTime)));
        if (ObjectUtil.isNotEmpty(calendarCode)) {
            result.setCalendarCode(calendarCode);
        } else {
            //telemetryData.setCalendarCode("#default#");
            result.setCalendarCode("#default#");
        }
        // TelemetryPropertyWrapper中packWorkstationCalendarInfo
        //------------start
        boolean isDefaultCalendar = true;
        if (!"#default#".equals(result.getCalendarCode())) {
            CalendarShiftTimeSlicesDTO calendarShiftTimeSlicesDTO = timeSliceCache.getTimeSliceShift(result.getCalendarCode(), collectTime);//从redis中获得日历的时间切片
            if (ObjectUtil.isEmpty(calendarShiftTimeSlicesDTO)) {//如果没有时间切片,则使用TimeSliceCache.build(cacheBuildDTO);构建
                LocalDate targetDate = Instant.ofEpochMilli(dataCollectTime).atZone(ZoneOffset.systemDefault()).toLocalDate();
                CacheBuildDTO cacheBuildDTO = CacheBuildDTO.builder().tenantIds(Sets.newHashSet(new String[]{"000000"})).calendarCode(calendarCode).targetDate(targetDate).build();
                timeSliceCache.build(cacheBuildDTO);
                calendarShiftTimeSlicesDTO = timeSliceCache.getTimeSliceShift(result.getCalendarCode(), collectTime);
            }
            if (ObjectUtil.isNotEmpty(calendarShiftTimeSlicesDTO)) {
                result.setShiftIndex(calendarShiftTimeSlicesDTO.getShiftIndex());
                result.setShiftTimeType(Integer.valueOf(calendarShiftTimeSlicesDTO.getShiftTimeType()));
                result.setFactoryDate(Integer.valueOf(calendarShiftTimeSlicesDTO.getFactoryDate().replaceAll("-", "")));
                result.setFactoryWeek(calendarShiftTimeSlicesDTO.getFactoryWeek());
                result.setFactoryMonth(calendarShiftTimeSlicesDTO.getFactoryMonth());
                result.setFactoryYear(calendarShiftTimeSlicesDTO.getFactoryYear());
                isDefaultCalendar = false;
            }
        }
        //无日历切片,使用采集时间填充factoryYear,month,date,week几个属性
        if (isDefaultCalendar) {
            log.info("无日历切片");
            LocalDate localDate = Instant.ofEpochMilli(dataCollectTime).atZone(ZoneOffset.systemDefault()).toLocalDate();
            result.setFactoryDate(Integer.valueOf(DatePattern.PURE_DATE_FORMAT.format(collectTime)));
            result.setFactoryYear(DateUtil.year(collectTime));
            result.setFactoryWeek(LocalDateTimeUtils.getWeek(localDate));
            result.setFactoryMonth(DateUtil.month(collectTime) + 1);
            result.setShiftIndex(CommonConstant.DEFAULT_SHIFT_INDEX);
            result.setShiftTimeType(CommonConstant.DEFAULT_SHIFT_TYPE);
        }
        return result;
    }
}
collect/src/main/java/com/qianwen/mdc/collect/service/IotDBCommonService.java
@@ -51,7 +51,7 @@
             try {
                 iotdbCfg.getSessionPool().setSchemaTemplate(template, deviceId);
             } catch (Exception e) {
                 logger.error("获取模板使用错误,template="+template+",deviceId="+deviceId,e);
                 logger.error("为deviceId设置模板错误,template="+template+",deviceId="+deviceId,e);
            }
         }
    }
collect/src/main/java/com/qianwen/mdc/collect/service/PackedDataService.java
@@ -47,8 +47,6 @@
    private WorkstationCache workstationCache;
    @Autowired
    private TimeSliceCache timeSliceCache;
    @Autowired
    private WorkstationDatapointsService dpService;
    static {
        PROCESS_PARAM_MAP.put(1, "STATE");
@@ -130,9 +128,6 @@
            //telemetryData.setCalendarCode("#default#");
            pdata.setCalendarCode("#default#");
        }
        //pdata.setShiftIndex(1);//临时
        //pdata.setShiftTimeType(1);//临时
        
        // TelemetryPropertyWrapper中packWorkstationCalendarInfo
        //------------start
collect/src/main/java/com/qianwen/mdc/collect/service/ProcessParamService.java
对比新文件
@@ -0,0 +1,103 @@
package com.qianwen.mdc.collect.service;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSONObject;
import com.qianwen.core.tool.utils.ObjectUtil;
import com.qianwen.mdc.collect.config.IotDBSessionConfig;
import com.qianwen.mdc.collect.constants.IOTDBConstant;
import com.qianwen.mdc.collect.domain.TelemetryData;
import com.qianwen.mdc.collect.domain.TelemetryDataItem;
import com.qianwen.mdc.collect.entity.iotdb.ProcessParam;
import com.qianwen.mdc.collect.mqtt.MqttMessageSender;
import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO;
/**
 * é‡‡é›†æ•°æ®å¤„理入库
 */
@Service
public class ProcessParamService {
    private static final Logger log = LoggerFactory.getLogger(ProcessParamService.class);
    private static final Map<Integer, String> PROCESS_PARAM_MAP = new HashMap<>();
    @Autowired
    private IotDBSessionConfig iotdbConfig;
    @Autowired
    private IotDBCommonService iotDBCommonService;
    @Autowired
    private MqttMessageSender mqttMessageSender;
    public static List<MeasurementSchema> schemas = new ArrayList<>();
    static {
        schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64));
        schemas.add(new MeasurementSchema("n", TSDataType.TEXT));
        schemas.add(new MeasurementSchema("v", TSDataType.TEXT));
    }
    /**
     * å®žæ—¶æ•°æ®topic,要与mdc里面得相同
     */
    public static final String WOCKSTATION_REALTIMEDATA_TOPIC = "mdc/realtimedata";
    public void insertProcessParam(ProcessParam param) {
        String deviceId = generateDeviceId(param.getWorkstationId(),param.getN());
        iotDBCommonService.isTemplateSetOnPath(IOTDBConstant.TEMPLATE_PROCESS_PARAM, deviceId);
        Tablet tablet = new Tablet(deviceId, schemas);
        int rowIndex = tablet.rowSize++;
        tablet.addTimestamp(rowIndex, param.getTime());
        tablet.addValue("workstation_id",rowIndex,param.getWorkstationId());
        tablet.addValue("n",rowIndex,param.getN());
        tablet.addValue("v",rowIndex,param.getV());
        try {
            iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
            //updateLastParam(dt.getWorkstationId(),typeList);
        } catch (Exception e) {
            log.error("IOTDB入库失败",e);
        }finally {
            //iotdbConfig.getSessionPool().clo1se();
        }
    }
    /**
     * å‘送实时数据消息
     * @param param
     */
    public void sendRealtimeDataMsg(ProcessParam param) {
        //发送mqtt消息,通知mdc消息来了
        JSONObject json = new JSONObject();
        json.put("workstationId",param.getWorkstationId());
        json.put("name", param.getN());
        json.put("value", param.getV());
        json.put("time", param.getTime());
        mqttMessageSender.sendMessage(WOCKSTATION_REALTIMEDATA_TOPIC, json.toJSONString());
    }
    String generateDeviceId(long workstationId,String propertyName) {
        return IOTDBConstant.DB_PREFIX+IOTDBConstant.TEMPLATE_PROCESS_PARAM+"_" + workstationId+"_"+propertyName;
    }
}
collect/src/main/java/com/qianwen/mdc/collect/vo/FactoryDataVO.java
对比新文件
@@ -0,0 +1,79 @@
package com.qianwen.mdc.collect.vo;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import cn.hutool.core.util.ObjectUtil;
import io.swagger.annotations.ApiModelProperty;
/**
 * Value object carrying factory-calendar data: calendar code, factory
 * year/month/week/date and shift information.
 */
public class FactoryDataVO implements Serializable{

    /**
     * Serialization id; the class is serializable because it is meant to be cached by Spring.
     */
    private static final long serialVersionUID = 6558493027948435061L;

    /** Code of the production calendar. */
    private String calendarCode;
    /** Factory year. */
    private Integer factoryYear;
    /** Factory month. */
    private Integer factoryMonth;
    /** Factory week. */
    private Integer factoryWeek;
    /** Factory date. */
    private Integer factoryDate;
    /** Index of the shift. */
    private Integer shiftIndex;
    /** Type of the shift time. */
    private Integer shiftTimeType;

    /** @return the production calendar code */
    public String getCalendarCode() {
        return this.calendarCode;
    }

    /** @param calendarCode the production calendar code */
    public void setCalendarCode(String calendarCode) {
        this.calendarCode = calendarCode;
    }

    /** @return the factory year */
    public Integer getFactoryYear() {
        return this.factoryYear;
    }

    /** @param factoryYear the factory year */
    public void setFactoryYear(Integer factoryYear) {
        this.factoryYear = factoryYear;
    }

    /** @return the factory month */
    public Integer getFactoryMonth() {
        return this.factoryMonth;
    }

    /** @param factoryMonth the factory month */
    public void setFactoryMonth(Integer factoryMonth) {
        this.factoryMonth = factoryMonth;
    }

    /** @return the factory week */
    public Integer getFactoryWeek() {
        return this.factoryWeek;
    }

    /** @param factoryWeek the factory week */
    public void setFactoryWeek(Integer factoryWeek) {
        this.factoryWeek = factoryWeek;
    }

    /** @return the factory date */
    public Integer getFactoryDate() {
        return this.factoryDate;
    }

    /** @param factoryDate the factory date */
    public void setFactoryDate(Integer factoryDate) {
        this.factoryDate = factoryDate;
    }

    /** @return the shift index */
    public Integer getShiftIndex() {
        return this.shiftIndex;
    }

    /** @param shiftIndex the shift index */
    public void setShiftIndex(Integer shiftIndex) {
        this.shiftIndex = shiftIndex;
    }

    /** @return the shift time type */
    public Integer getShiftTimeType() {
        return this.shiftTimeType;
    }

    /** @param shiftTimeType the shift time type */
    public void setShiftTimeType(Integer shiftTimeType) {
        this.shiftTimeType = shiftTimeType;
    }
}
collect/src/main/resources/application-dev.yml
@@ -1,6 +1,6 @@
spring:
  redis:
    database: 5
    database: 2
    host: 120.46.212.231
    port: 6379
    password: root
@@ -18,8 +18,8 @@
  #MQTT-当前客户端的唯一标识
  clientid: mqtt_publish
  default_topic: TEST #当前客户端的默认主题(大多数时候没什么用)
  #发送超时时间
  mqtt.timeout: 1000
  #连接超时时间
  timeout: 3000
  #心跳时间
  keepalive: 10
  connectionTimeout: 3000 #连接超时时间
collect/src/main/resources/application.yml
@@ -46,6 +46,8 @@
  mapper-locations: classpath:mapper/*.xml
  type-aliases-package: com.qianwen.mdc.collect.entity.iotdb,com.qianwen.mdc.collect.entity.mgr
  #com.qianwen.mdc.collect.entity.iotdb
#离线判定时长(毫秒),超过这个时长无采集数据判定为离线
offlineConfigDuration: 3000000
wfg:
# 请一定注意!WorkerIdBitLength + SeqBitLength + DataCenterIdBitLength <= 22
  # 1表示雪花漂移算法,2表示传统雪花算法
collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/ProcessParamMapper.xml
@@ -12,12 +12,8 @@
    </resultMap>
    <!-- n,v,workstation_id -->
    <select id="mylist" resultType="com.qianwen.mdc.collect.entity.iotdb.ProcessParam"
            parameterType="java.lang.Long">
            select  n,v,workstation_id from root.f2.process_param*
             where workstation_id=#{workstationId}
             order by time asc limit 1 align by device
    <select id="lastParamByWorstationId" resultType="com.qianwen.mdc.collect.entity.iotdb.ProcessParam">
            select  n as n,v as v,workstation_id as workstationId from root.f2.process_param_${workstationId}_* order by time desc limit 1 align by device
    </select>
</mapper>