From 7ef593e1e3c35aaeecf9318f0b3941230d3ed002 Mon Sep 17 00:00:00 2001
From: yangys <y_ys79@sina.com>
Date: 星期三, 09 十月 2024 11:22:54 +0800
Subject: [PATCH] 增加在数据点计算规则后数据点名称加_n的适配

---
 collect/pom.xml                                                                                  |   11 --
 collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateAggregateNoFeedbackService.java |   86 ++++++++++++--------
 collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java                    |    5 -
 collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java                     |    2 
 collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java                    |   69 +++++++++++++++--
 collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java                 |    9 ++
 collect/src/main/resources/application-dev.yml                                                   |    2 
 collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/DeviceStateMapper.xml            |    4 
 collect/src/main/java/com/qianwen/mdc/collect/job/FeedbackDealJob.java                           |    2 
 collect/src/main/java/com/qianwen/mdc/collect/handler/DeviceStatusDataHandler.java               |    4 
 10 files changed, 130 insertions(+), 64 deletions(-)

diff --git a/collect/pom.xml b/collect/pom.xml
index 90da6a3..682b8a1 100644
--- a/collect/pom.xml
+++ b/collect/pom.xml
@@ -165,27 +165,22 @@
     </dependencies>
 
     <build>
+    	<finalName>collect-api</finalName>
         <plugins>
             <plugin>
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot-maven-plugin</artifactId>
-                <!--<configuration>
+                <configuration>
 					<mainClass>com.qianwen.mdc.collect.MdcTansApplication</mainClass>
                     <layout>ZIP</layout>
                     <includes>
-                         鎵撳寘鏃跺寘鍚牳蹇冩ā鍧椾緷璧栧寘锛岄渶鎵嬪姩鎸囧畾 
                         <include>
                             <groupId>nothing</groupId>
                             <artifactId>nothing</artifactId>
                         </include>
                         
-                        <include>
-                            <groupId>com.xxx</groupId>
-                            <artifactId>common</artifactId>
-                        </include>
-                      
                     </includes>
-                </configuration>-->
+                </configuration>
             </plugin>
             
             <plugin>
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java b/collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java
index 70c0011..18d1bc8 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java
@@ -41,7 +41,7 @@
     public void testRec2() {
     	//鏁版嵁鏍煎紡锛歿"174":[{"values":{"d1":12},"ts":"1721978780449"}]} 174鏄簲鐢╥d
     	//澶氭潯鏍煎紡锛歿"174":[{"values":{"DeviceStatus":2},"ts":"1722478128278"},{"values":{"SpindleSpeed":22},"ts":"1722478128281"}]}
-    	String payload = "{\"174\":[{\"values\":{\"DeviceStatus\":2,\"Output\":38},\"ts\":\""+System.currentTimeMillis()+"\"}]}";
+    	String payload = "{\"174\":[{\"values\":{\"DeviceStatus_n\":2,\"Output\":38},\"ts\":\""+System.currentTimeMillis()+"\"}]}";
     	//payload = "{\"174\":[{\"values\":{\"Output\":11},\"ts\":\"1722478128278\"},{\"values\":{\"SpindleSpeed\":22},\"ts\":\"1722478128281\"}]}";
     	recService.handle(payload);
     }
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/handler/DeviceStatusDataHandler.java b/collect/src/main/java/com/qianwen/mdc/collect/handler/DeviceStatusDataHandler.java
index c4ab080..31cfb59 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/handler/DeviceStatusDataHandler.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/handler/DeviceStatusDataHandler.java
@@ -70,9 +70,11 @@
     }
     
     int translateStatus(String statusVal) {
+    	
     	int oriStatus = Integer.valueOf(statusVal);
     	
     	int result = oriStatus;
+    	/*
     	//瑗块棬瀛�828d,   cnc_run_status: 杩愯鐘舵��(0锛歊ESET锛�1锛歋TOP锛�2锛欻OLD锛�3锛歋TART锛�4锛歋PENDLE_CW_CCW锛�5锛歄THER)
     	switch(oriStatus) {
 	    	case 3://START
@@ -94,7 +96,7 @@
     	log.info("statusconvert,ori={},result={}",oriStatus,result);
     	if(result == 0) {
     		result = 2;//
-    	}
+    	}*/
     	return result;
     }
 }
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/job/FeedbackDealJob.java b/collect/src/main/java/com/qianwen/mdc/collect/job/FeedbackDealJob.java
index c7c229b..8fb660a 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/job/FeedbackDealJob.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/job/FeedbackDealJob.java
@@ -6,8 +6,6 @@
 import javax.annotation.Resource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import com.qianwen.mdc.collect.service.DeviceStateFixPointService;
 import com.qianwen.mdc.collect.service.feedback.WorkstationFeedbackService;
 
 import org.springframework.stereotype.Component;
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
index f219f26..6e86e38 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
@@ -4,7 +4,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.stream.Collectors;
 
 import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
@@ -26,9 +25,6 @@
 import com.qianwen.mdc.collect.domain.TelemetryData;
 import com.qianwen.mdc.collect.domain.TelemetryDataItem;
 import com.qianwen.mdc.collect.mqtt.MqttMessageSender;
-import com.qianwen.mdc.collect.utils.redis.RedisUtil;
-
-import cn.hutool.json.JSONUtil;
 
 /**
  * 閲囬泦鏁版嵁澶勭悊鍏ュ簱
@@ -129,7 +125,6 @@
 		int rowIndex = 0;
 		
 		Map<String, List<TypedTelemetryData>> processParamsMap = parseTelemetryToTypedMapList(dt);
-		
 		
 		String[] nameArr = processParamsMap.keySet().toArray(new String[0]);
 		String name;
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateAggregateNoFeedbackService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateAggregateNoFeedbackService.java
index 875e9a1..ef1a33a 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateAggregateNoFeedbackService.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateAggregateNoFeedbackService.java
@@ -39,6 +39,8 @@
 	private IotDBSessionConfig iotdbConfig;
     @Autowired
 	private IotDBCommonService iotDBCommonService;
+    
+    private static final int MAX_COUNT = 1000;
 
     public List<AggregateState> stateAggregateForSpecialTimeRange(Long workstationId, StateAggregateTimeDTO timeRange, List<DeviceState> effectiveStateList) {
     	//鎸塼imeRange鏌ヨ鏃堕棿鍖洪棿鍐呯殑鐘舵�佹暟鎹紝闄や簡宸插垹闄ょ殑锛屽叾浠栨暟鎹兘鏌ュ嚭鏉ヤ簡
@@ -62,14 +64,6 @@
      * @param effectTimeRangeList
      */
     public void handlerAggregateState(List<AggregateState> result, Long workstationId, StateAggregateTimeDTO timeRange) {
-    	/*
-        if (Func.isNotEmpty(result)) {
-            Map<String, List> stringListMap = CommonUtil.groupList(getFinallyAggregateStateList(result, workstationId, effectTimeRangeList), CommonConstant.MAX_RECORDS_FOR_SQL_LENGTH.intValue());
-            stringListMap.forEach(k, v -> {
-                this.workstationAggregateStateMapper.batchSave(workstationId, v);
-            });
-        }*/
-    	
     	if(result.isEmpty()) {
     		return;
     	}
@@ -93,7 +87,7 @@
 		schemas.add(new MeasurementSchema("rps", TSDataType.INT32));
 		//schemas.add(new MeasurementSchema("is_sync", TSDataType.BOOLEAN));
 		schemas.add(new MeasurementSchema("is_plan", TSDataType.INT32));//TODO 杩欎釜灞炴�у簲璇ユ槸GlobalWcsOfRps涓殑鍊硷紝濡備綍濉啓锛�
-		//schemas.add(new MeasurementSchema("feedback_id", TSDataType.INT64));
+		
 		schemas.add(new MeasurementSchema("is_deleted", TSDataType.BOOLEAN));
 		schemas.add(new MeasurementSchema("employee_id", TSDataType.INT64));
 		
@@ -104,41 +98,63 @@
 		
 		Tablet tablet = new Tablet(deviceId, schemas);
 		tablet.rowSize = aggStates.size();
-	
+		
 		AggregateState aggState;
+		int tblIndex = 0;
+		
 		for(int i=0;i<aggStates.size();i++) {
 			aggState = aggStates.get(i);
-			tablet.addTimestamp(i, aggState.getTime());
-			tablet.addValue("workstation_id", i, aggState.getWorkstationId());
-			tablet.addValue("value_collect", i, aggState.getValueCollect());
-			tablet.addValue("end_time", i, aggState.getEndTime());
-			tablet.addValue("duration_collect", i, aggState.getDurationCollect());
-			tablet.addValue("calendar_code", i, aggState.getCalendarCode());
-			tablet.addValue("factory_year", i, aggState.getFactoryYear());
-			tablet.addValue("factory_month", i, aggState.getFactoryMonth());
-			tablet.addValue("factory_week", i, aggState.getFactoryWeek());
-			tablet.addValue("factory_date", i, aggState.getFactoryDate());
-			tablet.addValue("shift_index", i, aggState.getShiftIndex());
-			tablet.addValue("shift_time_type", i, aggState.getShiftTimeType());
-			tablet.addValue("wcs", i, aggState.getWcs());
-			tablet.addValue("rps", i, aggState.getRps());
+			tablet.addTimestamp(tblIndex, aggState.getTime());
+			tablet.addValue("workstation_id", tblIndex, aggState.getWorkstationId());
+			tablet.addValue("value_collect", tblIndex, aggState.getValueCollect());
+			tablet.addValue("end_time", tblIndex, aggState.getEndTime());
+			tablet.addValue("duration_collect", tblIndex, aggState.getDurationCollect());
+			tablet.addValue("calendar_code", tblIndex, aggState.getCalendarCode());
+			tablet.addValue("factory_year", tblIndex, aggState.getFactoryYear());
+			tablet.addValue("factory_month", tblIndex, aggState.getFactoryMonth());
+			tablet.addValue("factory_week", tblIndex, aggState.getFactoryWeek());
+			tablet.addValue("factory_date", tblIndex, aggState.getFactoryDate());
+			tablet.addValue("shift_index", tblIndex, aggState.getShiftIndex());
+			tablet.addValue("shift_time_type", tblIndex, aggState.getShiftTimeType());
+			tablet.addValue("wcs", tblIndex, aggState.getWcs());
+			tablet.addValue("rps", tblIndex, aggState.getRps());
 			
-			tablet.addValue("is_plan", i, aggState.getIsPlan());
-			//tablet.addValue("feedback_id", i, aggState.getFeedbackId());
-			tablet.addValue("is_deleted", i, aggState.getIsDeleted());
-			tablet.addValue("employee_id", i, aggState.getEmployeeId());
+			tablet.addValue("is_plan", tblIndex, aggState.getIsPlan());
+			tablet.addValue("is_deleted", tblIndex, aggState.getIsDeleted());
+			tablet.addValue("employee_id", tblIndex, aggState.getEmployeeId());
+			
+			tblIndex++;
+			if(tblIndex >= MAX_COUNT) {
+				try {
+					//姣忎釜宸ヤ綅鎵归噺鎻掑叆涓�娆℃暟鎹�
+					this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
+					log.info("淇濆瓨鑱氬悎鐘舵�佸畬鎴恡blIndex={}",tblIndex);
+					tablet.reset();
+					tblIndex = 0;
+				} catch (Exception e) {
+					log.error("淇濆瓨鍥哄畾鐐规暟鎹紓甯�",e);
+				} 
+			}
 			
 		}
-		try {
-			this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
-			log.info("淇濆瓨鑱氬悎鐘舵�佸畬鎴�");
-		} catch (Exception e) {
-			log.error("淇濆瓨鑱氬悎鐘舵�佹暟鎹紓甯�",e);
-		} 
+		
+		if(tblIndex > 0) {
+			try {
+				//姣忎釜宸ヤ綅鎵归噺鎻掑叆涓�娆℃暟鎹�
+				this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
+				log.info("淇濆瓨鑱氬悎鐘舵�佸畬鎴恌inaltblIndex={}",tblIndex);
+				tablet.reset();
+				tblIndex = 0;
+			} catch (Exception e) {
+				log.error("淇濆瓨鍥哄畾鐐规暟鎹紓甯�",e);
+			} 
+			
+		}
+		
 			
     }
     
-
+    
     private List<AggregateState> getFinallyAggregateStateList(List<AggregateState> result, Long workstationId, StateAggregateTimeDTO timeRange) {
     	/*
         List<StateAggregateTimeDTO> effectTimeRangeList2 = effectTimeRangeList.stream().filter(x -> {
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java
index 95725fb..15eaad5 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java
@@ -94,9 +94,68 @@
 			
 			Tablet tablet = new Tablet(deviceId, schemas);
 			
+			
 			states = entry.getValue();
 			tablet.rowSize = states.size();
 			DeviceState state;
+			
+			
+			final int MAX_COUNT = 1000;
+			//int currentIdx = 0;
+		
+				
+			int tblIndex = 0;
+			for(int i=0; i < states.size(); i++) {
+				state = states.get(i);
+				tablet.addTimestamp(tblIndex, state.getTime());
+				tablet.addValue("workstation_id", tblIndex, state.getWorkstationId());
+				tablet.addValue("value_collect", tblIndex, state.getValueCollect());
+				
+				tablet.addValue("calendar_code", tblIndex, state.getCalendarCode());
+				tablet.addValue("factory_year", tblIndex, state.getFactoryYear());
+				tablet.addValue("factory_month", tblIndex, state.getFactoryMonth());
+				tablet.addValue("factory_week", tblIndex, state.getFactoryWeek());
+				tablet.addValue("factory_date", tblIndex, state.getFactoryDate());
+				tablet.addValue("shift_index", tblIndex, state.getShiftIndex());
+				tablet.addValue("shift_time_type", tblIndex, state.getShiftTimeType());
+				tablet.addValue("wcs", tblIndex, state.getWcs());
+				tablet.addValue("rps", tblIndex, state.getRps());
+				tablet.addValue("is_fix_point", tblIndex, state.getIsFixPoint());
+				tablet.addValue("is_sync", tblIndex, state.getIsSync());
+				tablet.addValue("is_plan", tblIndex, state.getIsPlan());
+				tablet.addValue("feedback_point_type", tblIndex, state.getFeedbackPointType());
+				tablet.addValue("feedback_id", tblIndex, state.getFeedbackId());
+				tablet.addValue("is_deleted", tblIndex, state.getIsDeleted());
+				tablet.addValue("employee_id", tblIndex, state.getEmployeeId());
+				
+				tblIndex++;
+				
+				if(tblIndex >= MAX_COUNT) {
+					try {
+						//姣忎釜宸ヤ綅鎵归噺鎻掑叆涓�娆℃暟鎹�
+						this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
+						log.info("淇濆瓨璁惧鐘舵�佸畬鎴�");
+						tablet.reset();
+						tblIndex = 0;
+					} catch (Exception e) {
+						log.error("淇濆瓨鍥哄畾鐐规暟鎹紓甯�",e);
+					} 
+				}
+			}
+			
+			if(tblIndex > 0) {
+				try {
+					//姣忎釜宸ヤ綅鎵归噺鎻掑叆涓�娆℃暟鎹�
+					this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
+					log.info("淇濆瓨璁惧鐘舵�佸畬鎴�2");
+					tablet.reset();
+					tblIndex = 0;
+				} catch (Exception e) {
+					log.error("淇濆瓨鍥哄畾鐐规暟鎹紓甯�",e);
+				} 
+			}
+			
+			/*
 			for(int i=0;i<states.size();i++) {
 				state = states.get(i);
 				tablet.addTimestamp(i, state.getTime());
@@ -119,14 +178,8 @@
 				tablet.addValue("feedback_id", i, state.getFeedbackId());
 				tablet.addValue("is_deleted", i, state.getIsDeleted());
 				tablet.addValue("employee_id", i, state.getEmployeeId());
-				
-			}
-			try {
-				//姣忎釜宸ヤ綅鎵归噺鎻掑叆涓�娆℃暟鎹�
-				this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
-			} catch (Exception e) {
-				log.error("淇濆瓨鍥哄畾鐐规暟鎹紓甯�",e);
-			} 
+			}*/
+			
 			
 		}
     }
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java
index 5b16ccd..63e7ee7 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java
@@ -5,6 +5,7 @@
 import java.util.Optional;
 import java.util.Set;
 
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -55,12 +56,14 @@
         Set<String> keySet = jsonObj.keySet();
         String[] keys = keySet.toArray(new String[] {});
         
+        final String NEWDP_SUFFIX = "_n";//璁$畻瑙勫垯浣跨敤涔嬪悗鏂版暟鎹偣鐨勭粨灏�
         for(String key : keys) {
         	String appId = key;//iot绯荤粺涓殑搴旂敤id锛屾湰搴旂敤涓簲璇ョ敤琛ㄥ幓瀵瑰簲
         	long workstationId = getWorkstationIdByAppId(appId);
         	
         	TelemetryData tdata = new TelemetryData();
         	tdata.setWorkstationId(workstationId);
+        	
         	
         	JSONArray dtArr = jsonObj.getJSONArray(appId);
         	for(int i=0;i<dtArr.size();i++) {
@@ -74,7 +77,11 @@
 	        	
 	        	Set<String> valueKeySet = values.keySet();
 	        	valueKeySet.forEach(valueKey ->{
-	        		tdataItem.addPoint(valueKey,values.getString(valueKey));
+	        		String oriValueKey = valueKey;;//鐢变簬浣跨敤璁$畻瑙勫垯鐨勯噰闆嗙偣鍚嶇О浼氬悗闈㈠鍔犱竴涓�"_n",鎵�浠ヨ繖涓猳riValueKey浠h〃娌℃湁澧炲姞"_n"鐨�
+	        		if(StringUtils.endsWith(valueKey, NEWDP_SUFFIX)) {
+	        			oriValueKey = StringUtils.removeEnd(valueKey, NEWDP_SUFFIX);
+	        		}
+	        		tdataItem.addPoint(oriValueKey,values.getString(valueKey));//浣跨敤鍘熷閰嶇疆鐐逛繚鎸佷繚瀛樻暟鎹�
 	        	});
 	        	
 	        	tdata.addItem(tdataItem);
diff --git a/collect/src/main/resources/application-dev.yml b/collect/src/main/resources/application-dev.yml
index ba09542..5f73d6b 100644
--- a/collect/src/main/resources/application-dev.yml
+++ b/collect/src/main/resources/application-dev.yml
@@ -35,7 +35,7 @@
 #iotdb 浠ュ強鍏秊dbc涓�璧烽厤缃�
 iotdb:
   driver: org.apache.iotdb.jdbc.IoTDBDriver
-  host: 127.0.0.1
+  host: 120.46.212.231
   port: 6667
   maxSize: 100
   username: root
diff --git a/collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/DeviceStateMapper.xml b/collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/DeviceStateMapper.xml
index 0c357b7..668b060 100644
--- a/collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/DeviceStateMapper.xml
+++ b/collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/DeviceStateMapper.xml
@@ -73,13 +73,13 @@
 	</sql>
 	<!--  resultType="com.qianwen.mdc.collect.entity.iotdb.DeviceState" -->
 	<select id="lastSyncedNoFeedbackPointState" resultMap="BaseResultMap">
-        select <include refid="all_columns" /> FROM root.f2.aggregate_state_${workstationId}
+        select <include refid="all_columns" /> FROM root.f2.state_${workstationId}
         where is_sync=true and is_fix_point=false and feedback_point_type=0 and is_deleted=false
         order by time desc limit 1
     </select>
     
     <select id="firstNotSyncedNofeedbackPointState" resultMap="BaseResultMap">
-        select <include refid="all_columns" /> FROM root.f2.aggregate_state_${workstationId}
+        select <include refid="all_columns" /> FROM root.f2.state_${workstationId}
         where is_sync=false and is_fix_point=false and feedback_point_type=0 and is_deleted=false
         order by time asc limit 1
     </select>

--
Gitblit v1.9.3