yangys
2024-11-02 9a9b747962cc00801d8cce4137d1e123d556a79b
修复iotdb大批插入数据问题
已修改9个文件
147 ■■■■■ 文件已修改
collect/src/main/java/com/qianwen/mdc/collect/cache/WorkstationCache.java 38 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/handler/DeviceStatusDataHandler.java 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/job/DeviceStatusAggregateJob.java 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateAggregateNoFeedbackService.java 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateFixPointService.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java 38 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/utils/redis/RedisUtil.java 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/resources/application-dev.yml 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/cache/WorkstationCache.java
@@ -8,6 +8,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +47,7 @@
    private EmployeeOnOffWorkMapper employeeOnOffWorkMapper;
    
    
    public Map<Long, WorkstationDTO> getWorkstations() {
    public Map<String, WorkstationDTO> getWorkstations() {
        String redisKey = COLLECT_WORKSTATION.concat("::").concat(WORKSTATION_ALL);
        /*Map<String, WorkstationDTO> map = bladeRedis.hGetAll(redisKey);
       
@@ -56,7 +57,7 @@
        }
        return map;
        */
        Map<Long, WorkstationDTO> map = convertMap(redisUtil.hmget(redisKey));
        Map<String, WorkstationDTO> map = convertMap(redisUtil.hmget(redisKey));
       
        if (ObjectUtil.isEmpty(map)) {
            map = setWorkstations();
@@ -66,28 +67,37 @@
        
    }
    
    private Map<Long, WorkstationDTO> setWorkstations() {
    private Map<String, WorkstationDTO> setWorkstations() {
        List<Workstation> list = workstationService.list();
        String redisKey = COLLECT_WORKSTATION.concat("::").concat(WORKSTATION_ALL);
        /*
        list.forEach(ws -> {
            /*
            WorkStationDTO workStationDTO = WorkstationConvert.INSTANCE.convertDTO(workStation);
            bladeRedis.hSet(redisKey, workStation.getId(), workStationDTO);
            */
            //WorkStationDTO workStationDTO = WorkstationConvert.INSTANCE.convertDTO(workStation);
            //bladeRedis.hSet(redisKey, workStation.getId(), workStationDTO);
            WorkstationDTO dto = new WorkstationDTO();
            dto.setCalendarCode(ws.getCalendarCode());
            dto.setCode(ws.getCode());
            dto.setId(ws.getId());
            dto.setName(ws.getName());
            redisUtil.hset(redisKey, ws.getId(), dto);
        });
        //bladeRedis.expire(redisKey, 259200L);
        redisUtil.expire(redisKey, 259200L);
        //return bladeRedis.hGetAll(redisKey);
        });*/
        //Map<String, String> map = str.collect(Collectors.toMap(p -> p[0], p -> p[1]));
        Map<String,WorkstationDTO> mp = list.stream().collect(Collectors.toMap(ws -> String.valueOf(ws.getId()), ws->{
            WorkstationDTO dto = new WorkstationDTO();
            dto.setCalendarCode(ws.getCalendarCode());
            dto.setCode(ws.getCode());
            dto.setId(ws.getId());
            dto.setName(ws.getName());
            return dto;
        }));
        redisUtil.hmset(redisKey, mp);
        
        return convertMap(redisUtil.hmget(redisKey));
        redisUtil.expire(redisKey, 259200L);
        return (Map<String, WorkstationDTO>)redisUtil.hmget(redisKey);
    }
    
    static <K,V> Map<K,V> convertMap(Map<?,?> map){
collect/src/main/java/com/qianwen/mdc/collect/handler/DeviceStatusDataHandler.java
@@ -20,10 +20,7 @@
    private static final Logger log = LoggerFactory.getLogger(DeviceStatusDataHandler.class);
    @Autowired
    private WorkstationCache workstationCache;
    //@Autowired
    //private IotDBSessionConfig iotdbCfg;
    //@Autowired
    //private IotDBCommonService iotDBCommonService;
    @Autowired
    private DeviceStateService deviceStateService;
    @Override
@@ -53,7 +50,6 @@
        state.setShiftTimeType(data.getShiftTimeType());
        fillWorkStationCondition(data, state);
       
        //insertState(state);
        deviceStateService.saveDeviceStates(Arrays.asList(state));
        
        log.info("设备状态保存完成");
collect/src/main/java/com/qianwen/mdc/collect/job/DeviceStatusAggregateJob.java
@@ -29,14 +29,18 @@
    public ReturnT<String> workStationAggregateJobHandler(String param) throws Exception {
        XxlJobLogger.log("XXL-JOB, 定时计算工位的状态,产量等信息,开始发送.....", new Object[0]);
        
        Map<Long, WorkstationDTO> workstations = workstationCache.getWorkstations();
        if (ObjectUtil.isNotEmpty(workstations)) {
        Map<String, WorkstationDTO> workstationsMap = workstationCache.getWorkstations();
        if (ObjectUtil.isNotEmpty(workstationsMap)) {
            
            Set<Long> workStationIds = workstations.keySet();
            log.info("聚合状态工位总数:{}",workStationIds.size());
            for(Long workstationId :workStationIds) {
            Set<String> workstationIds = workstationsMap.keySet();
            log.info("聚合状态工位总数:{}",workstationsMap.size());
            for(String workstationId :workstationIds) {
                log.info("开始聚合工位{}的状态",workstationId);
                deviceStateAggregateService.stateAggregate(workstationId);
                Long wid = Long.parseLong(workstationId);
                deviceStateAggregateService.stateAggregate(wid);
            }
        }
        log.info("聚合状态整体结束");
collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateAggregateNoFeedbackService.java
@@ -97,13 +97,13 @@
        iotDBCommonService.setTemmplateIfNotSet(IOTDBConstant.TEMPLATE_AGGREGATESTATE, deviceId);//挂载模板
        
        Tablet tablet = new Tablet(deviceId, schemas);
        tablet.rowSize = aggStates.size();
        AggregateState aggState;
        int tblIndex = 0;
        int tblIndex = -1;
        
        for(int i=0;i<aggStates.size();i++) {
            aggState = aggStates.get(i);
            tblIndex = tablet.rowSize++;
            tablet.addTimestamp(tblIndex, aggState.getTime());
            tablet.addValue("workstation_id", tblIndex, aggState.getWorkstationId());
            tablet.addValue("value_collect", tblIndex, aggState.getValueCollect());
@@ -123,12 +123,16 @@
            tablet.addValue("is_deleted", tblIndex, aggState.getIsDeleted());
            tablet.addValue("employee_id", tblIndex, aggState.getEmployeeId());
            
            tblIndex++;
            if(aggState.getWorkstationId() == 1656819337286631426L) {
                System.out.println("laile"+aggState.getWorkstationId());
            }
            //tblIndex++;
            if(tblIndex >= MAX_COUNT) {
                try {
                    //每个工位批量插入一次数据
                    this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
                    log.info("保存聚合状态完成tblIndex={}",tblIndex);
                    //tablet.rowSize = 0;
                    tablet.reset();
                    tblIndex = 0;
                } catch (Exception e) {
@@ -138,13 +142,13 @@
            
        }
        
        if(tblIndex > 0) {
        if(tablet.rowSize > 0) {
            try {
                //每个工位批量插入一次数据
                this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
                log.info("保存聚合状态完成finaltblIndex={}",tblIndex);
                tablet.reset();
                tblIndex = 0;
                tblIndex = -1;
            } catch (Exception e) {
                log.error("保存固定点数据异常",e);
            } 
collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateFixPointService.java
@@ -58,7 +58,7 @@
    public void deviceStateFixPoint(DateTime dateTime, List<String> includeWorkstationIds) {
        List<DeviceState> result;
        
        Map<Long, WorkstationDTO> workStations = workstationCache.getWorkstations();
        Map<String, WorkstationDTO> workStations = workstationCache.getWorkstations();
        if (ObjectUtil.isEmpty(workStations)) {
            return;
        }
collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java
@@ -94,9 +94,7 @@
            
            Tablet tablet = new Tablet(deviceId, schemas);
            
            states = entry.getValue();
            tablet.rowSize = states.size();
            DeviceState state;
            
            
@@ -104,9 +102,10 @@
            //int currentIdx = 0;
        
                
            int tblIndex = 0;
            int tblIndex = -1;
            for(int i=0; i < states.size(); i++) {
                state = states.get(i);
                tblIndex = tablet.rowSize++;
                tablet.addTimestamp(tblIndex, state.getTime());
                tablet.addValue("workstation_id", tblIndex, state.getWorkstationId());
                tablet.addValue("value_collect", tblIndex, state.getValueCollect());
@@ -128,7 +127,7 @@
                tablet.addValue("is_deleted", tblIndex, state.getIsDeleted());
                tablet.addValue("employee_id", tblIndex, state.getEmployeeId());
                
                tblIndex++;
                //tblIndex++;
                
                if(tblIndex >= MAX_COUNT) {
                    try {
@@ -136,49 +135,24 @@
                        this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
                        log.info("保存设备状态完成");
                        tablet.reset();
                        tblIndex = 0;
                        tblIndex = -1;
                    } catch (Exception e) {
                        log.error("保存固定点数据异常",e);
                    } 
                }
            }
            
            if(tblIndex > 0) {
            if(tablet.rowSize > 0) {
                try {
                    //每个工位批量插入一次数据
                    this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
                    log.info("保存设备状态完成2");
                    tablet.reset();
                    tblIndex = 0;
                    //tblIndex = -1;
                } catch (Exception e) {
                    log.error("保存固定点数据异常",e);
                } 
            }
            /*
            for(int i=0;i<states.size();i++) {
                state = states.get(i);
                tablet.addTimestamp(i, state.getTime());
                tablet.addValue("workstation_id", i, state.getWorkstationId());
                tablet.addValue("value_collect", i, state.getValueCollect());
                tablet.addValue("calendar_code", i, state.getCalendarCode());
                tablet.addValue("factory_year", i, state.getFactoryYear());
                tablet.addValue("factory_month", i, state.getFactoryMonth());
                tablet.addValue("factory_week", i, state.getFactoryWeek());
                tablet.addValue("factory_date", i, state.getFactoryDate());
                tablet.addValue("shift_index", i, state.getShiftIndex());
                tablet.addValue("shift_time_type", i, state.getShiftTimeType());
                tablet.addValue("wcs", i, state.getWcs());
                tablet.addValue("rps", i, state.getRps());
                tablet.addValue("is_fix_point", i, state.getIsFixPoint());
                tablet.addValue("is_sync", i, state.getIsSync());
                tablet.addValue("is_plan", i, state.getIsPlan());
                tablet.addValue("feedback_point_type", i, state.getFeedbackPointType());
                tablet.addValue("feedback_id", i, state.getFeedbackId());
                tablet.addValue("is_deleted", i, state.getIsDeleted());
                tablet.addValue("employee_id", i, state.getEmployeeId());
            }*/
            
            
        }
collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java
@@ -26,8 +26,6 @@
    @Autowired
    private PackedDataService packedDataService;
    @Autowired
    private RedisUtil redisUtil;
    @Autowired
    private WorkstationDatapointsService dpService;
    /**
     * 处理收到的消息,对应TelemetryDataPostingConsumer
@@ -108,21 +106,4 @@
        return dtList;
    }
    
    /**
     * 根据对应表缓存,获取appId对应的id
     * @param appId
     * @return
     */
    /*
    public Long getWorkstationIdByAppId(String appId) {
        Object wid = redisUtil.hget("workstation-appid-map", appId);
        String workstationId = String.valueOf(Optional.ofNullable(wid).orElse(StringUtils.EMPTY));
        if(ObjectUtil.isEmpty(workstationId)) {
            return null;
        }
        return Long.parseLong(workstationId);
    }
    */
}
collect/src/main/java/com/qianwen/mdc/collect/utils/redis/RedisUtil.java
@@ -170,7 +170,7 @@
     * @param key 键
     * @return 对应的多个键值
     */
    public Map<Object, Object> hmget(String key) {
    public Map<?, ?> hmget(String key) {
        return redisTemplate.opsForHash().entries(key);
    }
@@ -180,7 +180,7 @@
     * @param map 对应多个键值
     * @return true 成功 false 失败
     */
    public boolean hmset(String key, Map<String,Object> map) {
    public boolean hmset(String key, Map<?,?> map) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            return true;
@@ -230,7 +230,7 @@
     * @param value 值
     * @return true 成功 false失败
     */
    public boolean hset(String key, Object item, Object value) {
    public <HK, HV> boolean hset(String key, HK item, HV value) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            return true;
collect/src/main/resources/application-dev.yml
@@ -1,7 +1,7 @@
spring:
  redis:
    database: 0
    host: localhost
    database: 5
    host: 120.46.212.231
    port: 6379
    password: root
    timeout: 3000
@@ -23,7 +23,7 @@
  #心跳时间
  keepalive: 10
  connectionTimeout: 3000 #连接超时时间
  dataReceiveTopic: forward/testxxx #从iot平台接收mqtt采集数据的topic forward/test
  dataReceiveTopic: forward/test #从iot平台接收mqtt采集数据的topic forward/test
  # mysql
datasource:
  type: mysql