From 9a9b747962cc00801d8cce4137d1e123d556a79b Mon Sep 17 00:00:00 2001 From: yangys <y_ys79@sina.com> Date: 星期六, 02 十一月 2024 16:59:59 +0800 Subject: [PATCH] 修复iotdb大批插入数据问题 --- collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateFixPointService.java | 2 collect/src/main/java/com/qianwen/mdc/collect/job/DeviceStatusAggregateJob.java | 16 +++-- collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateAggregateNoFeedbackService.java | 16 +++-- collect/src/main/java/com/qianwen/mdc/collect/utils/redis/RedisUtil.java | 6 +- collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java | 38 ++---------- collect/src/main/java/com/qianwen/mdc/collect/cache/WorkstationCache.java | 38 ++++++++---- collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java | 19 ------ collect/src/main/resources/application-dev.yml | 6 +- collect/src/main/java/com/qianwen/mdc/collect/handler/DeviceStatusDataHandler.java | 6 - 9 files changed, 58 insertions(+), 89 deletions(-) diff --git a/collect/src/main/java/com/qianwen/mdc/collect/cache/WorkstationCache.java b/collect/src/main/java/com/qianwen/mdc/collect/cache/WorkstationCache.java index 766f62b..dc91d11 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/cache/WorkstationCache.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/cache/WorkstationCache.java @@ -8,6 +8,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +47,7 @@ private EmployeeOnOffWorkMapper employeeOnOffWorkMapper; - public Map<Long, WorkstationDTO> getWorkstations() { + public Map<String, WorkstationDTO> getWorkstations() { String redisKey = COLLECT_WORKSTATION.concat("::").concat(WORKSTATION_ALL); /*Map<String, WorkstationDTO> map = bladeRedis.hGetAll(redisKey); @@ -56,7 +57,7 @@ } return map; */ - Map<Long, WorkstationDTO> map = convertMap(redisUtil.hmget(redisKey)); + Map<String, WorkstationDTO> map = convertMap(redisUtil.hmget(redisKey)); if (ObjectUtil.isEmpty(map)) { map = setWorkstations(); @@ -66,28 +67,37 @@ } - private Map<Long, WorkstationDTO> setWorkstations() { + private Map<String, WorkstationDTO> setWorkstations() { List<Workstation> list = workstationService.list(); String redisKey = COLLECT_WORKSTATION.concat("::").concat(WORKSTATION_ALL); - + /* list.forEach(ws -> { - /* - WorkStationDTO workStationDTO = WorkstationConvert.INSTANCE.convertDTO(workStation); - bladeRedis.hSet(redisKey, workStation.getId(), workStationDTO); - */ + + //WorkStationDTO workStationDTO = WorkstationConvert.INSTANCE.convertDTO(workStation); + //bladeRedis.hSet(redisKey, workStation.getId(), workStationDTO); + WorkstationDTO dto = new WorkstationDTO(); dto.setCalendarCode(ws.getCalendarCode()); dto.setCode(ws.getCode()); dto.setId(ws.getId()); dto.setName(ws.getName()); redisUtil.hset(redisKey, ws.getId(), dto); - }); - - //bladeRedis.expire(redisKey, 259200L); - redisUtil.expire(redisKey, 259200L); - //return bladeRedis.hGetAll(redisKey); + });*/ + //Map<String, String> map = str.collect(Collectors.toMap(p -> p[0], p -> p[1])); + Map<String,WorkstationDTO> mp = list.stream().collect(Collectors.toMap(ws -> String.valueOf(ws.getId()), ws->{ + WorkstationDTO dto = new WorkstationDTO(); + dto.setCalendarCode(ws.getCalendarCode()); + dto.setCode(ws.getCode()); + dto.setId(ws.getId()); + dto.setName(ws.getName()); + return dto; + })); + redisUtil.hmset(redisKey, mp); - return convertMap(redisUtil.hmget(redisKey)); + redisUtil.expire(redisKey, 259200L); + + + return (Map<String, WorkstationDTO>)redisUtil.hmget(redisKey); } static <K,V> Map<K,V> convertMap(Map<?,?> map){ diff --git a/collect/src/main/java/com/qianwen/mdc/collect/handler/DeviceStatusDataHandler.java b/collect/src/main/java/com/qianwen/mdc/collect/handler/DeviceStatusDataHandler.java index 31cfb59..a3e16c3 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/handler/DeviceStatusDataHandler.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/handler/DeviceStatusDataHandler.java @@ -20,10 +20,7 @@ private static final Logger log = LoggerFactory.getLogger(DeviceStatusDataHandler.class); @Autowired private WorkstationCache workstationCache; - //@Autowired - //private IotDBSessionConfig iotdbCfg; - //@Autowired - //private IotDBCommonService iotDBCommonService; + @Autowired private DeviceStateService deviceStateService; @Override @@ -53,7 +50,6 @@ state.setShiftTimeType(data.getShiftTimeType()); fillWorkStationCondition(data, state); - //insertState(state); deviceStateService.saveDeviceStates(Arrays.asList(state)); log.info("璁惧鐘舵�佷繚瀛樺畬鎴�"); diff --git a/collect/src/main/java/com/qianwen/mdc/collect/job/DeviceStatusAggregateJob.java b/collect/src/main/java/com/qianwen/mdc/collect/job/DeviceStatusAggregateJob.java index f11cedb..bdb5dd8 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/job/DeviceStatusAggregateJob.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/job/DeviceStatusAggregateJob.java @@ -29,14 +29,18 @@ public ReturnT<String> workStationAggregateJobHandler(String param) throws Exception { XxlJobLogger.log("XXL-JOB, 瀹氭椂璁$畻宸ヤ綅鐨勭姸鎬侊紝浜ч噺绛変俊鎭�,寮�濮嬪彂閫�.....", new Object[0]); - Map<Long, WorkstationDTO> workstations = workstationCache.getWorkstations(); - if (ObjectUtil.isNotEmpty(workstations)) { + Map<String, WorkstationDTO> workstationsMap = workstationCache.getWorkstations(); + if (ObjectUtil.isNotEmpty(workstationsMap)) { - Set<Long> workStationIds = workstations.keySet(); - log.info("鑱氬悎鐘舵�佸伐浣嶆�绘暟:{}",workStationIds.size()); - for(Long workstationId :workStationIds) { + Set<String> workstationIds = workstationsMap.keySet(); + + log.info("鑱氬悎鐘舵�佸伐浣嶆�绘暟:{}",workstationsMap.size()); + + + for(String workstationId :workstationIds) { log.info("寮�濮嬭仛鍚堝伐浣峽}鐨勭姸鎬�",workstationId); - deviceStateAggregateService.stateAggregate(workstationId); + Long wid = Long.parseLong(workstationId); + deviceStateAggregateService.stateAggregate(wid); } } log.info("鑱氬悎鐘舵�佹暣浣撶粨鏉�"); diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateAggregateNoFeedbackService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateAggregateNoFeedbackService.java index ef1a33a..fd4bdf7 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateAggregateNoFeedbackService.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateAggregateNoFeedbackService.java @@ -97,13 +97,13 @@ iotDBCommonService.setTemmplateIfNotSet(IOTDBConstant.TEMPLATE_AGGREGATESTATE, deviceId);//鎸傝浇妯℃澘 Tablet tablet = new Tablet(deviceId, schemas); - tablet.rowSize = aggStates.size(); - + AggregateState aggState; - int tblIndex = 0; + int tblIndex = -1; for(int i=0;i<aggStates.size();i++) { aggState = aggStates.get(i); + tblIndex = tablet.rowSize++; tablet.addTimestamp(tblIndex, aggState.getTime()); tablet.addValue("workstation_id", tblIndex, aggState.getWorkstationId()); tablet.addValue("value_collect", tblIndex, aggState.getValueCollect()); @@ -123,12 +123,16 @@ tablet.addValue("is_deleted", tblIndex, aggState.getIsDeleted()); tablet.addValue("employee_id", tblIndex, aggState.getEmployeeId()); - tblIndex++; + if(aggState.getWorkstationId() == 1656819337286631426L) { + System.out.println("laile"+aggState.getWorkstationId()); + } + //tblIndex++; if(tblIndex >= MAX_COUNT) { try { //姣忎釜宸ヤ綅鎵归噺鎻掑叆涓�娆℃暟鎹� this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet); log.info("淇濆瓨鑱氬悎鐘舵�佸畬鎴恡blIndex={}",tblIndex); + //tablet.rowSize = 0; tablet.reset(); tblIndex = 0; } catch (Exception e) { @@ -138,13 +142,13 @@ } - if(tblIndex > 0) { + if(tablet.rowSize > 0) { try { //姣忎釜宸ヤ綅鎵归噺鎻掑叆涓�娆℃暟鎹� this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet); log.info("淇濆瓨鑱氬悎鐘舵�佸畬鎴恌inaltblIndex={}",tblIndex); tablet.reset(); - tblIndex = 0; + tblIndex = -1; } catch (Exception e) { log.error("淇濆瓨鍥哄畾鐐规暟鎹紓甯�",e); } diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateFixPointService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateFixPointService.java index 9301b0c..ca31fd4 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateFixPointService.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateFixPointService.java @@ -58,7 +58,7 @@ public void deviceStateFixPoint(DateTime dateTime, List<String> includeWorkstationIds) { List<DeviceState> result; - Map<Long, WorkstationDTO> workStations = workstationCache.getWorkstations(); + Map<String, WorkstationDTO> workStations = workstationCache.getWorkstations(); if (ObjectUtil.isEmpty(workStations)) { return; } diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java index 15eaad5..a59fce6 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java @@ -94,9 +94,7 @@ Tablet tablet = new Tablet(deviceId, schemas); - states = entry.getValue(); - tablet.rowSize = states.size(); DeviceState state; @@ -104,9 +102,10 @@ //int currentIdx = 0; - int tblIndex = 0; + int tblIndex = -1; for(int i=0; i < states.size(); i++) { state = states.get(i); + tblIndex = tablet.rowSize++; tablet.addTimestamp(tblIndex, state.getTime()); tablet.addValue("workstation_id", tblIndex, state.getWorkstationId()); tablet.addValue("value_collect", tblIndex, state.getValueCollect()); @@ -128,7 +127,7 @@ tablet.addValue("is_deleted", tblIndex, state.getIsDeleted()); tablet.addValue("employee_id", tblIndex, state.getEmployeeId()); - tblIndex++; + //tblIndex++; if(tblIndex >= MAX_COUNT) { try { @@ -136,49 +135,24 @@ this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet); log.info("淇濆瓨璁惧鐘舵�佸畬鎴�"); tablet.reset(); - tblIndex = 0; + tblIndex = -1; } catch (Exception e) { log.error("淇濆瓨鍥哄畾鐐规暟鎹紓甯�",e); } } } - if(tblIndex > 0) { + if(tablet.rowSize > 0) { try { //姣忎釜宸ヤ綅鎵归噺鎻掑叆涓�娆℃暟鎹� this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet); log.info("淇濆瓨璁惧鐘舵�佸畬鎴�2"); tablet.reset(); - tblIndex = 0; + //tblIndex = -1; } catch (Exception e) { log.error("淇濆瓨鍥哄畾鐐规暟鎹紓甯�",e); } } - - /* - for(int i=0;i<states.size();i++) { - state = states.get(i); - tablet.addTimestamp(i, state.getTime()); - tablet.addValue("workstation_id", i, state.getWorkstationId()); - tablet.addValue("value_collect", i, state.getValueCollect()); - - tablet.addValue("calendar_code", i, state.getCalendarCode()); - tablet.addValue("factory_year", i, state.getFactoryYear()); - tablet.addValue("factory_month", i, state.getFactoryMonth()); - tablet.addValue("factory_week", i, state.getFactoryWeek()); - tablet.addValue("factory_date", i, state.getFactoryDate()); - tablet.addValue("shift_index", i, state.getShiftIndex()); - tablet.addValue("shift_time_type", i, state.getShiftTimeType()); - tablet.addValue("wcs", i, state.getWcs()); - tablet.addValue("rps", i, state.getRps()); - tablet.addValue("is_fix_point", i, state.getIsFixPoint()); - tablet.addValue("is_sync", i, state.getIsSync()); - tablet.addValue("is_plan", i, state.getIsPlan()); - tablet.addValue("feedback_point_type", i, state.getFeedbackPointType()); - tablet.addValue("feedback_id", i, state.getFeedbackId()); - tablet.addValue("is_deleted", i, state.getIsDeleted()); - tablet.addValue("employee_id", i, state.getEmployeeId()); - }*/ } diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java index 3246a2f..be25cb6 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java @@ -26,8 +26,6 @@ @Autowired private PackedDataService packedDataService; @Autowired - private RedisUtil redisUtil; - @Autowired private WorkstationDatapointsService dpService; /** * 澶勭悊鏀跺埌鐨勬秷鎭�,瀵瑰簲TelemetryDataPostingConsumer @@ -108,21 +106,4 @@ return dtList; } - /** - * 鏍规嵁瀵瑰簲琛ㄧ紦瀛橈紝鑾峰彇appId瀵瑰簲鐨刬d - * @param appId - * @return - */ - /* - public Long getWorkstationIdByAppId(String appId) { - - Object wid = redisUtil.hget("workstation-appid-map", appId); - - String workstationId = String.valueOf(Optional.ofNullable(wid).orElse(StringUtils.EMPTY)); - if(ObjectUtil.isEmpty(workstationId)) { - return null; - } - return Long.parseLong(workstationId); - } - */ } diff --git a/collect/src/main/java/com/qianwen/mdc/collect/utils/redis/RedisUtil.java b/collect/src/main/java/com/qianwen/mdc/collect/utils/redis/RedisUtil.java index b8a9082..beb3c98 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/utils/redis/RedisUtil.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/utils/redis/RedisUtil.java @@ -170,7 +170,7 @@ * @param key 閿� * @return 瀵瑰簲鐨勫涓敭鍊� */ - public Map<Object, Object> hmget(String key) { + public Map<?, ?> hmget(String key) { return redisTemplate.opsForHash().entries(key); } @@ -180,7 +180,7 @@ * @param map 瀵瑰簲澶氫釜閿�� * @return true 鎴愬姛 false 澶辫触 */ - public boolean hmset(String key, Map<String,Object> map) { + public boolean hmset(String key, Map<?,?> map) { try { redisTemplate.opsForHash().putAll(key, map); return true; @@ -230,7 +230,7 @@ * @param value 鍊� * @return true 鎴愬姛 false澶辫触 */ - public boolean hset(String key, Object item, Object value) { + public <HK, HV> boolean hset(String key, HK item, HV value) { try { redisTemplate.opsForHash().put(key, item, value); return true; diff --git a/collect/src/main/resources/application-dev.yml b/collect/src/main/resources/application-dev.yml index b5986ce..d47b0e7 100644 --- a/collect/src/main/resources/application-dev.yml +++ b/collect/src/main/resources/application-dev.yml @@ -1,7 +1,7 @@ spring: redis: - database: 0 - host: localhost + database: 5 + host: 120.46.212.231 port: 6379 password: root timeout: 3000 @@ -23,7 +23,7 @@ #蹇冭烦鏃堕棿 keepalive: 10 connectionTimeout: 3000 #杩炴帴瓒呮椂鏃堕棿 - dataReceiveTopic: forward/testxxx #浠巌ot骞冲彴鎺ユ敹mqtt閲囬泦鏁版嵁鐨則opic forward/test + dataReceiveTopic: forward/test #浠巌ot骞冲彴鎺ユ敹mqtt閲囬泦鏁版嵁鐨則opic forward/test # mysql datasource: type: mysql -- Gitblit v1.9.3