From 9a9b747962cc00801d8cce4137d1e123d556a79b Mon Sep 17 00:00:00 2001
From: yangys <y_ys79@sina.com>
Date: 星期六, 02 十一月 2024 16:59:59 +0800
Subject: [PATCH] 修复iotdb大批插入数据问题

---
 collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateFixPointService.java            |    2 
 collect/src/main/java/com/qianwen/mdc/collect/job/DeviceStatusAggregateJob.java                  |   16 +++--
 collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateAggregateNoFeedbackService.java |   16 +++--
 collect/src/main/java/com/qianwen/mdc/collect/utils/redis/RedisUtil.java                         |    6 +-
 collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java                    |   38 ++----------
 collect/src/main/java/com/qianwen/mdc/collect/cache/WorkstationCache.java                        |   38 ++++++++----
 collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java                 |   19 ------
 collect/src/main/resources/application-dev.yml                                                   |    6 +-
 collect/src/main/java/com/qianwen/mdc/collect/handler/DeviceStatusDataHandler.java               |    6 -
 9 files changed, 58 insertions(+), 89 deletions(-)

diff --git a/collect/src/main/java/com/qianwen/mdc/collect/cache/WorkstationCache.java b/collect/src/main/java/com/qianwen/mdc/collect/cache/WorkstationCache.java
index 766f62b..dc91d11 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/cache/WorkstationCache.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/cache/WorkstationCache.java
@@ -8,6 +8,7 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,7 +47,7 @@
     private EmployeeOnOffWorkMapper employeeOnOffWorkMapper;
     
     
-    public Map<Long, WorkstationDTO> getWorkstations() {
+    public Map<String, WorkstationDTO> getWorkstations() {
         String redisKey = COLLECT_WORKSTATION.concat("::").concat(WORKSTATION_ALL);
         /*Map<String, WorkstationDTO> map = bladeRedis.hGetAll(redisKey);
        
@@ -56,7 +57,7 @@
         }
         return map;
         */
-        Map<Long, WorkstationDTO> map = convertMap(redisUtil.hmget(redisKey));
+        Map<String, WorkstationDTO> map = convertMap(redisUtil.hmget(redisKey));
        
         if (ObjectUtil.isEmpty(map)) {
             map = setWorkstations();
@@ -66,28 +67,37 @@
         
     }
     
-    private Map<Long, WorkstationDTO> setWorkstations() {
+    private Map<String, WorkstationDTO> setWorkstations() {
         List<Workstation> list = workstationService.list();
         String redisKey = COLLECT_WORKSTATION.concat("::").concat(WORKSTATION_ALL);
-       
+        /*
         list.forEach(ws -> {
-        	/*
-            WorkStationDTO workStationDTO = WorkstationConvert.INSTANCE.convertDTO(workStation);
-            bladeRedis.hSet(redisKey, workStation.getId(), workStationDTO);
-            */
+        	
+            //WorkStationDTO workStationDTO = WorkstationConvert.INSTANCE.convertDTO(workStation);
+            //bladeRedis.hSet(redisKey, workStation.getId(), workStationDTO);
+           
         	WorkstationDTO dto = new WorkstationDTO();
         	dto.setCalendarCode(ws.getCalendarCode());
         	dto.setCode(ws.getCode());
         	dto.setId(ws.getId());
         	dto.setName(ws.getName());
             redisUtil.hset(redisKey, ws.getId(), dto);
-        });
-       
-        //bladeRedis.expire(redisKey, 259200L);
-        redisUtil.expire(redisKey, 259200L);
-        //return bladeRedis.hGetAll(redisKey);
+        });*/
+        //Map<String, String> map = str.collect(Collectors.toMap(p -> p[0], p -> p[1]));
+        Map<String,WorkstationDTO> mp = list.stream().collect(Collectors.toMap(ws -> String.valueOf(ws.getId()), ws->{
+        	WorkstationDTO dto = new WorkstationDTO();
+        	dto.setCalendarCode(ws.getCalendarCode());
+        	dto.setCode(ws.getCode());
+        	dto.setId(ws.getId());
+        	dto.setName(ws.getName());
+        	return dto;
+        }));
+        redisUtil.hmset(redisKey, mp);
         
-        return convertMap(redisUtil.hmget(redisKey));
+        redisUtil.expire(redisKey, 259200L);
+        
+        
+        return (Map<String, WorkstationDTO>)redisUtil.hmget(redisKey);
     }
     
     static <K,V> Map<K,V> convertMap(Map<?,?> map){
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/handler/DeviceStatusDataHandler.java b/collect/src/main/java/com/qianwen/mdc/collect/handler/DeviceStatusDataHandler.java
index 31cfb59..a3e16c3 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/handler/DeviceStatusDataHandler.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/handler/DeviceStatusDataHandler.java
@@ -20,10 +20,7 @@
     private static final Logger log = LoggerFactory.getLogger(DeviceStatusDataHandler.class);
     @Autowired
     private WorkstationCache workstationCache;
-    //@Autowired
-	//private IotDBSessionConfig iotdbCfg;
-    //@Autowired
-    //private IotDBCommonService iotDBCommonService;
+
     @Autowired
     private DeviceStateService deviceStateService;
     @Override
@@ -53,7 +50,6 @@
     	state.setShiftTimeType(data.getShiftTimeType());
         fillWorkStationCondition(data, state);
        
-        //insertState(state);
         deviceStateService.saveDeviceStates(Arrays.asList(state));
         
         log.info("璁惧鐘舵�佷繚瀛樺畬鎴�");
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/job/DeviceStatusAggregateJob.java b/collect/src/main/java/com/qianwen/mdc/collect/job/DeviceStatusAggregateJob.java
index f11cedb..bdb5dd8 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/job/DeviceStatusAggregateJob.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/job/DeviceStatusAggregateJob.java
@@ -29,14 +29,18 @@
     public ReturnT<String> workStationAggregateJobHandler(String param) throws Exception {
         XxlJobLogger.log("XXL-JOB, 瀹氭椂璁$畻宸ヤ綅鐨勭姸鎬侊紝浜ч噺绛変俊鎭�,寮�濮嬪彂閫�.....", new Object[0]);
         
-        Map<Long, WorkstationDTO> workstations = workstationCache.getWorkstations();
-        if (ObjectUtil.isNotEmpty(workstations)) {
+        Map<String, WorkstationDTO> workstationsMap = workstationCache.getWorkstations();
+        if (ObjectUtil.isNotEmpty(workstationsMap)) {
         	
-            Set<Long> workStationIds = workstations.keySet();
-            log.info("鑱氬悎鐘舵�佸伐浣嶆�绘暟:{}",workStationIds.size());
-            for(Long workstationId :workStationIds) {
+            Set<String> workstationIds = workstationsMap.keySet();
+            
+            log.info("鑱氬悎鐘舵�佸伐浣嶆�绘暟:{}",workstationsMap.size());
+        	
+            
+            for(String workstationId :workstationIds) {
             	log.info("寮�濮嬭仛鍚堝伐浣峽}鐨勭姸鎬�",workstationId);
-            	deviceStateAggregateService.stateAggregate(workstationId);
+            	Long wid = Long.parseLong(workstationId);
+            	deviceStateAggregateService.stateAggregate(wid);
             }
         }
         log.info("鑱氬悎鐘舵�佹暣浣撶粨鏉�");
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateAggregateNoFeedbackService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateAggregateNoFeedbackService.java
index ef1a33a..fd4bdf7 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateAggregateNoFeedbackService.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateAggregateNoFeedbackService.java
@@ -97,13 +97,13 @@
 		iotDBCommonService.setTemmplateIfNotSet(IOTDBConstant.TEMPLATE_AGGREGATESTATE, deviceId);//鎸傝浇妯℃澘
 		
 		Tablet tablet = new Tablet(deviceId, schemas);
-		tablet.rowSize = aggStates.size();
-		
+	
 		AggregateState aggState;
-		int tblIndex = 0;
+		int tblIndex = -1;
 		
 		for(int i=0;i<aggStates.size();i++) {
 			aggState = aggStates.get(i);
+			tblIndex = tablet.rowSize++;
 			tablet.addTimestamp(tblIndex, aggState.getTime());
 			tablet.addValue("workstation_id", tblIndex, aggState.getWorkstationId());
 			tablet.addValue("value_collect", tblIndex, aggState.getValueCollect());
@@ -123,12 +123,16 @@
 			tablet.addValue("is_deleted", tblIndex, aggState.getIsDeleted());
 			tablet.addValue("employee_id", tblIndex, aggState.getEmployeeId());
 			
-			tblIndex++;
+			if(aggState.getWorkstationId() == 1656819337286631426L) {
+				System.out.println("laile"+aggState.getWorkstationId());
+			}
+			//tblIndex++;
 			if(tblIndex >= MAX_COUNT) {
 				try {
 					//姣忎釜宸ヤ綅鎵归噺鎻掑叆涓�娆℃暟鎹�
 					this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
 					log.info("淇濆瓨鑱氬悎鐘舵�佸畬鎴恡blIndex={}",tblIndex);
+					//tablet.rowSize = 0;
 					tablet.reset();
 					tblIndex = 0;
 				} catch (Exception e) {
@@ -138,13 +142,13 @@
 			
 		}
 		
-		if(tblIndex > 0) {
+		if(tablet.rowSize > 0) {
 			try {
 				//姣忎釜宸ヤ綅鎵归噺鎻掑叆涓�娆℃暟鎹�
 				this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
 				log.info("淇濆瓨鑱氬悎鐘舵�佸畬鎴恌inaltblIndex={}",tblIndex);
 				tablet.reset();
-				tblIndex = 0;
+				tblIndex = -1;
 			} catch (Exception e) {
 				log.error("淇濆瓨鍥哄畾鐐规暟鎹紓甯�",e);
 			} 
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateFixPointService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateFixPointService.java
index 9301b0c..ca31fd4 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateFixPointService.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateFixPointService.java
@@ -58,7 +58,7 @@
     public void deviceStateFixPoint(DateTime dateTime, List<String> includeWorkstationIds) {
         List<DeviceState> result;
         
-        Map<Long, WorkstationDTO> workStations = workstationCache.getWorkstations();
+        Map<String, WorkstationDTO> workStations = workstationCache.getWorkstations();
         if (ObjectUtil.isEmpty(workStations)) {
             return;
         }
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java
index 15eaad5..a59fce6 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java
@@ -94,9 +94,7 @@
 			
 			Tablet tablet = new Tablet(deviceId, schemas);
 			
-			
 			states = entry.getValue();
-			tablet.rowSize = states.size();
 			DeviceState state;
 			
 			
@@ -104,9 +102,10 @@
 			//int currentIdx = 0;
 		
 				
-			int tblIndex = 0;
+			int tblIndex = -1;
 			for(int i=0; i < states.size(); i++) {
 				state = states.get(i);
+				tblIndex = tablet.rowSize++;
 				tablet.addTimestamp(tblIndex, state.getTime());
 				tablet.addValue("workstation_id", tblIndex, state.getWorkstationId());
 				tablet.addValue("value_collect", tblIndex, state.getValueCollect());
@@ -128,7 +127,7 @@
 				tablet.addValue("is_deleted", tblIndex, state.getIsDeleted());
 				tablet.addValue("employee_id", tblIndex, state.getEmployeeId());
 				
-				tblIndex++;
+				//tblIndex++;
 				
 				if(tblIndex >= MAX_COUNT) {
 					try {
@@ -136,49 +135,24 @@
 						this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
 						log.info("淇濆瓨璁惧鐘舵�佸畬鎴�");
 						tablet.reset();
-						tblIndex = 0;
+						tblIndex = -1;
 					} catch (Exception e) {
 						log.error("淇濆瓨鍥哄畾鐐规暟鎹紓甯�",e);
 					} 
 				}
 			}
 			
-			if(tblIndex > 0) {
+			if(tablet.rowSize > 0) {
 				try {
 					//姣忎釜宸ヤ綅鎵归噺鎻掑叆涓�娆℃暟鎹�
 					this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
 					log.info("淇濆瓨璁惧鐘舵�佸畬鎴�2");
 					tablet.reset();
-					tblIndex = 0;
+					//tblIndex = -1;
 				} catch (Exception e) {
 					log.error("淇濆瓨鍥哄畾鐐规暟鎹紓甯�",e);
 				} 
 			}
-			
-			/*
-			for(int i=0;i<states.size();i++) {
-				state = states.get(i);
-				tablet.addTimestamp(i, state.getTime());
-				tablet.addValue("workstation_id", i, state.getWorkstationId());
-				tablet.addValue("value_collect", i, state.getValueCollect());
-				
-				tablet.addValue("calendar_code", i, state.getCalendarCode());
-				tablet.addValue("factory_year", i, state.getFactoryYear());
-				tablet.addValue("factory_month", i, state.getFactoryMonth());
-				tablet.addValue("factory_week", i, state.getFactoryWeek());
-				tablet.addValue("factory_date", i, state.getFactoryDate());
-				tablet.addValue("shift_index", i, state.getShiftIndex());
-				tablet.addValue("shift_time_type", i, state.getShiftTimeType());
-				tablet.addValue("wcs", i, state.getWcs());
-				tablet.addValue("rps", i, state.getRps());
-				tablet.addValue("is_fix_point", i, state.getIsFixPoint());
-				tablet.addValue("is_sync", i, state.getIsSync());
-				tablet.addValue("is_plan", i, state.getIsPlan());
-				tablet.addValue("feedback_point_type", i, state.getFeedbackPointType());
-				tablet.addValue("feedback_id", i, state.getFeedbackId());
-				tablet.addValue("is_deleted", i, state.getIsDeleted());
-				tablet.addValue("employee_id", i, state.getEmployeeId());
-			}*/
 			
 			
 		}
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java
index 3246a2f..be25cb6 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java
@@ -26,8 +26,6 @@
 	@Autowired
 	private PackedDataService packedDataService;
 	@Autowired
-	private RedisUtil redisUtil;
-	@Autowired
     private WorkstationDatapointsService dpService;
 	/**
 	 * 澶勭悊鏀跺埌鐨勬秷鎭�,瀵瑰簲TelemetryDataPostingConsumer
@@ -108,21 +106,4 @@
         return dtList;
 	}
 	
-	/**
-	 * 鏍规嵁瀵瑰簲琛ㄧ紦瀛橈紝鑾峰彇appId瀵瑰簲鐨刬d
-	 * @param appId
-	 * @return
-	 */
-	/*
-	public Long getWorkstationIdByAppId(String appId) {
-
-		Object wid = redisUtil.hget("workstation-appid-map", appId);
-		
-		String workstationId = String.valueOf(Optional.ofNullable(wid).orElse(StringUtils.EMPTY));
-		if(ObjectUtil.isEmpty(workstationId)) {
-			return null;
-		}
-		return Long.parseLong(workstationId);
-	}
-	*/
 }
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/utils/redis/RedisUtil.java b/collect/src/main/java/com/qianwen/mdc/collect/utils/redis/RedisUtil.java
index b8a9082..beb3c98 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/utils/redis/RedisUtil.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/utils/redis/RedisUtil.java
@@ -170,7 +170,7 @@
      * @param key 閿�
      * @return 瀵瑰簲鐨勫涓敭鍊�
      */
-    public Map<Object, Object> hmget(String key) {
+    public Map<?, ?> hmget(String key) {
         return redisTemplate.opsForHash().entries(key);
     }
 
@@ -180,7 +180,7 @@
      * @param map 瀵瑰簲澶氫釜閿��
      * @return true 鎴愬姛 false 澶辫触
      */
-    public boolean hmset(String key, Map<String,Object> map) {
+    public boolean hmset(String key, Map<?,?> map) {
         try {
             redisTemplate.opsForHash().putAll(key, map);
             return true;
@@ -230,7 +230,7 @@
      * @param value 鍊�
      * @return true 鎴愬姛 false澶辫触
      */
-    public boolean hset(String key, Object item, Object value) {
+    public <HK, HV> boolean hset(String key, HK item, HV value) {
         try {
             redisTemplate.opsForHash().put(key, item, value);
             return true;
diff --git a/collect/src/main/resources/application-dev.yml b/collect/src/main/resources/application-dev.yml
index b5986ce..d47b0e7 100644
--- a/collect/src/main/resources/application-dev.yml
+++ b/collect/src/main/resources/application-dev.yml
@@ -1,7 +1,7 @@
 spring:
   redis:
-    database: 0
-    host: localhost
+    database: 5
+    host: 120.46.212.231
     port: 6379
     password: root
     timeout: 3000
@@ -23,7 +23,7 @@
   #蹇冭烦鏃堕棿
   keepalive: 10
   connectionTimeout: 3000 #杩炴帴瓒呮椂鏃堕棿
-  dataReceiveTopic: forward/testxxx #浠巌ot骞冲彴鎺ユ敹mqtt閲囬泦鏁版嵁鐨則opic forward/test
+  dataReceiveTopic: forward/test #浠巌ot骞冲彴鎺ユ敹mqtt閲囬泦鏁版嵁鐨則opic forward/test
   # mysql
 datasource:
   type: mysql

--
Gitblit v1.9.3