From 859d6321b1b1c606de09e9b6a6286aaeace638fe Mon Sep 17 00:00:00 2001
From: yangys <y_ys79@sina.com>
Date: 星期三, 16 十月 2024 21:03:52 +0800
Subject: [PATCH] 接收采集数据,增加了使用配置数据点限制,配置了的数据点才会保存。未配置则丢弃

---
 collect/src/main/java/com/qianwen/mdc/collect/service/PackedDataService.java              |    5 
 collect/src/main/java/com/qianwen/mdc/collect/service/WorkstationDatapointsService.java   |   63 +++++++
 collect/src/main/java/com/qianwen/mdc/collect/handler/AlarmDataHandler.java               |   59 +++---
 collect/src/test/java/com/qianwen/mdc/collect/service/IOTMqttReceiveServiceTest.java      |   14 +
 collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java              |    2 
 collect/src/main/java/com/qianwen/mdc/collect/domain/TelemetryData.java                   |   13 +
 collect/src/main/java/com/qianwen/mdc/collect/vo/WorkstationDatapointsVO.java             |  143 +++++++++++++++++
 collect/src/main/resources/application-dev.yml                                            |    4 
 collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java                      |   31 ++-
 collect/src/main/java/com/qianwen/mdc/collect/mapper/mgr/WorkstationDatapointsMapper.java |   11 +
 collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java             |   18 +-
 collect/src/main/java/com/qianwen/mdc/collect/runner/InitRunner.java                      |    8 
 collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java          |   42 +++-
 collect/src/main/java/com/qianwen/mdc/collect/entity/mgr/WorkstationDatapoints.java       |   65 ++++++++
 14 files changed, 413 insertions(+), 65 deletions(-)

diff --git a/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java b/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java
index df16c29..21800ca 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java
@@ -21,9 +21,11 @@
 import org.springframework.messaging.MessageChannel;
 import org.springframework.messaging.MessageHandler;
 
+import com.alibaba.fastjson.JSONObject;
 import com.qianwen.mdc.collect.service.DeviceStateFixPointService;
 import com.qianwen.mdc.collect.service.IOTMqttReceiveService;
 import com.qianwen.mdc.collect.service.WorkstationAppMappingService;
+import com.qianwen.mdc.collect.service.WorkstationDatapointsService;
 import com.qianwen.mdc.collect.service.feedback.WorkstationFeedbackService;
 
 import cn.hutool.core.date.DateTime;
@@ -40,19 +42,22 @@
 
     @Value("${mqtt.password:}")
     private String mqttPassword;
-
 	@Autowired
 	private IOTMqttReceiveService recService;
 	@Autowired
     private DeviceStateFixPointService stateFixPointService;
-	
 	@Autowired
     private WorkstationFeedbackService workstationFeedbackService;
-	
 	@Autowired
     private WorkstationAppMappingService workstationAppMappingService;
 	
-	public static final String COLLECT_DATA_TOPIC = "forward/test";
+	@Autowired
+    private WorkstationDatapointsService dpService;
+	/**
+	 * 鎺ユ敹鏁版嵁鐨刴qtt topic锛屽湪IOT骞冲彴閰嶇疆鐨�
+	 */
+	@Value("${mqtt.dataReceiveTopic:}")
+	public String COLLECT_DATA_TOPIC;
 	
 	/**
 	 * 鍙嶉鍒涘缓鐨則opic锛坢dc涓級锛屾湰搴旂敤鎺ユ敹骞跺鐞�
@@ -61,7 +66,10 @@
 	
 	public static final String WOCKSTATION_CREATE_TOPIC = "mdc/workstation-create";
 	
-	private final String WORKSTATION_APP_MAPPING_CHANGED_TOPIC = "mdc/workstation_app_mapping_changed";
+	/**
+	 * 宸ヤ綅鏁版嵁鐐瑰彉鍖�
+	 */
+	private final String WORKSTATION_DATAPOINT_CHANGED_TOPIC = "mdc/workstation_datapoint_changed";
 	
 	@Bean
     public MqttPahoClientFactory mqttClientFactory() {
@@ -93,7 +101,7 @@
     	String clientId = "spring-boot-mqtt-client-inbound"+r.nextInt(1000);
         MqttPahoMessageDrivenChannelAdapter adapter =
                 new MqttPahoMessageDrivenChannelAdapter(clientId,
-                        mqttClientFactory(), COLLECT_DATA_TOPIC, FEEDBACK_TOPIC,WOCKSTATION_CREATE_TOPIC,WORKSTATION_APP_MAPPING_CHANGED_TOPIC);//鏈�鍚庝竴涓弬鏁板厑璁稿涓猼opic鍙傛暟
+                        mqttClientFactory(), COLLECT_DATA_TOPIC, FEEDBACK_TOPIC,WOCKSTATION_CREATE_TOPIC,WORKSTATION_DATAPOINT_CHANGED_TOPIC);//鏈�鍚庝竴涓弬鏁板厑璁稿涓猼opic鍙傛暟
         adapter.setCompletionTimeout(5000);
         adapter.setConverter(new DefaultPahoMessageConverter());
         adapter.setQos(1);
@@ -123,10 +131,15 @@
         		logger.info("宸ヤ綅鍒涘缓鎺ユ敹娑堟伅={}",workstationId);
         		stateFixPointService.deviceStateFixPoint(DateTime.now(), Arrays.asList(workstationId));
         		recService.handle((String)message.getPayload());
-        	}else if(WORKSTATION_APP_MAPPING_CHANGED_TOPIC.equals(topic)) {
-        		String workstationId = (String)message.getPayload();
-        		logger.info("宸ヤ綅appId鏄犲皠鍙樺寲娑堟伅={}",workstationId);
+        	}else if(WORKSTATION_DATAPOINT_CHANGED_TOPIC.equals(topic)) {
+        		String payload = (String)message.getPayload();
+        		logger.info("宸ヤ綅appId鏄犲皠鍙樺寲娑堟伅={}",payload);
         		workstationAppMappingService.saveToCache();
+        		//娓呴櫎璇ュ伐浣嶇殑鏁版嵁鐐圭紦瀛�
+        		JSONObject payloadObj = JSONObject.parseObject(payload);
+        		//payloadObj.getLong("workstationId");
+        		
+        		dpService.datapointsCacheEvict(payloadObj.getString("appId"));
         	} else {//璁㈤槄浜嗗嚑涓猼opic灏变細鎺ユ敹鍒板嚑涓紝鍏朵粬鐨勪笉浼氳繘鏉�
         		logger.warn("topic={},msg={},鏃犲搴旂殑澶勭悊鍣�",topic,message.getPayload());
         	}
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java b/collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java
index 18d1bc8..1638041 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java
@@ -41,7 +41,7 @@
     public void testRec2() {
     	//鏁版嵁鏍煎紡锛歿"174":[{"values":{"d1":12},"ts":"1721978780449"}]} 174鏄簲鐢╥d
     	//澶氭潯鏍煎紡锛歿"174":[{"values":{"DeviceStatus":2},"ts":"1722478128278"},{"values":{"SpindleSpeed":22},"ts":"1722478128281"}]}
-    	String payload = "{\"174\":[{\"values\":{\"DeviceStatus_n\":2,\"Output\":38},\"ts\":\""+System.currentTimeMillis()+"\"}]}";
+    	String payload = "{\"182\":[{\"values\":{\"DeviceStatus_n\":2,\"Output\":38},\"ts\":\""+System.currentTimeMillis()+"\"}]}";
     	//payload = "{\"174\":[{\"values\":{\"Output\":11},\"ts\":\"1722478128278\"},{\"values\":{\"SpindleSpeed\":22},\"ts\":\"1722478128281\"}]}";
     	recService.handle(payload);
     }
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/domain/TelemetryData.java b/collect/src/main/java/com/qianwen/mdc/collect/domain/TelemetryData.java
index e3ad96e..6d0d3ec 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/domain/TelemetryData.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/domain/TelemetryData.java
@@ -10,6 +10,10 @@
  */
 public class TelemetryData {
 	private long workstationId;
+	/**
+	 * iotdb鐨刟ppId
+	 */
+	private String appId;
 	private List<TelemetryDataItem> dataItems = new ArrayList<>();
 
 	public void addItem(TelemetryDataItem item) {
@@ -35,9 +39,16 @@
 	public void setDataItems(List<TelemetryDataItem> dataItems) {
 		this.dataItems = dataItems;
 	}
+	
+	public String getAppId() {
+		return appId;
+	}
+	public void setAppId(String appId) {
+		this.appId = appId;
+	}
 	@Override
 	public String toString() {
-		return "TelemetryData [workstationId=" + workstationId + ", dataItems=" + dataItems + "]";
+		return "TelemetryData [workstationId=" + workstationId + ", appId=" + appId + ", dataItems=" + dataItems + "]";
 	}
 	
 	
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/entity/mgr/WorkstationDatapoints.java b/collect/src/main/java/com/qianwen/mdc/collect/entity/mgr/WorkstationDatapoints.java
new file mode 100644
index 0000000..0685661
--- /dev/null
+++ b/collect/src/main/java/com/qianwen/mdc/collect/entity/mgr/WorkstationDatapoints.java
@@ -0,0 +1,65 @@
+package com.qianwen.mdc.collect.entity.mgr;
+
+import com.baomidou.mybatisplus.annotation.TableName;
+import com.qianwen.core.mp.base.BaseEntity;
+
+import io.swagger.annotations.ApiModelProperty;
+
+
+/**
+ * 宸ヤ綅閲囬泦鏁版嵁鐐�
+ */
+@TableName("workstation_datapoints")
+public class WorkstationDatapoints  extends BaseEntity {
+    private static final long serialVersionUID = 1;
+    
+    @ApiModelProperty("妯℃澘绫诲瀷")
+    private Integer type;
+    
+    @ApiModelProperty("鐐逛綅閰嶇疆(json鏁扮粍)")
+    private String dpConfig;
+    
+    /**
+     * 宸ヤ綅id
+     */
+    private long workstationId;
+   
+    /**
+     * IOT骞冲彴appId
+     */
+    private String appId;
+    
+	public Integer getType() {
+		return type;
+	}
+
+	public void setType(Integer type) {
+		this.type = type;
+	}
+
+	public String getDpConfig() {
+		return dpConfig;
+	}
+
+	public void setDpConfig(String dpConfig) {
+		this.dpConfig = dpConfig;
+	}
+
+	public long getWorkstationId() {
+		return workstationId;
+	}
+
+	public void setWorkstationId(long workstationId) {
+		this.workstationId = workstationId;
+	}
+
+	public String getAppId() {
+		return appId;
+	}
+
+	public void setAppId(String appId) {
+		this.appId = appId;
+	}
+
+    
+}
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/handler/AlarmDataHandler.java b/collect/src/main/java/com/qianwen/mdc/collect/handler/AlarmDataHandler.java
index bd0c2f2..de40a7b 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/handler/AlarmDataHandler.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/handler/AlarmDataHandler.java
@@ -1,12 +1,16 @@
 package com.qianwen.mdc.collect.handler;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
 import com.qianwen.mdc.collect.dto.PackedTelemetryData;
 import com.qianwen.mdc.collect.entity.iotdb.Alarm;
 import com.qianwen.mdc.collect.service.AlarmService;
@@ -20,39 +24,38 @@
     @Override
     public void handleData(PackedTelemetryData data) {
     	
-    	Alarm alarm =  new Alarm();//Objects.requireNonNull(BeanUtil.copy(data, Alarm.class));
-    	alarm.setTime(data.getTime());
-    	alarm.setWorkstationId(data.getWorkstationId());
-    	alarm.setCalendarCode(data.getCalendarCode());
-    	alarm.setFactoryDate(data.getFactoryDate());
-    	alarm.setFactoryMonth(data.getFactoryMonth());
-    	alarm.setFactoryWeek(data.getFactoryWeek());
-    	alarm.setFactoryYear(data.getFactoryYear());
-    	alarm.setShiftIndex(data.getShiftIndex());
-    	alarm.setShiftTimeType(data.getShiftTimeType());
-    	fileAlarmDetail(alarm,data);
-    	alarmService.saveAlarms(Arrays.asList(alarm));
+    	alarmService.saveAlarms(parseAlarm(data));
         
-        log.info("鎶ヨ鏁版嵁澶勭悊瀹屾垚:鍛婅鏁版嵁{} ",alarm);
+        log.info("鎶ヨ鏁版嵁澶勭悊瀹屾垚:鍛婅鏁版嵁{} ",data);
         
     }
     
-    /**
-     * 濉厖鍛婅鐨勮缁嗕俊鎭紝绾у埆锛屼俊鎭紝浠g爜
-     * @param alarm
-     */
-    void fileAlarmDetail(Alarm alarm,PackedTelemetryData data){
-    	alarm.setCode(this.parseCode(data.getValue()));
-    	alarm.setMessage(data.getValue());
+    List<Alarm> parseAlarm(PackedTelemetryData data){
+    	List<Alarm> alarmList = new ArrayList<>();
+    	//828d json鏁扮粍鏍煎紡[{"alarmNo":"8084"}]
+    	JSONArray alarmArr = JSONArray.parseArray(data.getValue());
+    	
+    	for(int i=0;i<alarmArr.size();i++) {
+    		Alarm alarm = new Alarm();
+    		alarm.setTime(data.getTime());
+        	alarm.setWorkstationId(data.getWorkstationId());
+        	alarm.setCalendarCode(data.getCalendarCode());
+        	alarm.setFactoryDate(data.getFactoryDate());
+        	alarm.setFactoryMonth(data.getFactoryMonth());
+        	alarm.setFactoryWeek(data.getFactoryWeek());
+        	alarm.setFactoryYear(data.getFactoryYear());
+        	alarm.setShiftIndex(data.getShiftIndex());
+        	alarm.setShiftTimeType(data.getShiftTimeType());
+        	
+        	JSONObject alarmJson = alarmArr.getJSONObject(i);
+        	alarm.setCode(alarmJson.getString("alarmNo"));
+        	
+        	alarm.setMessage("");//鏆傛椂娌℃湁锛岄渶瑕佸弽鏌�
+        	
+        	alarmList.add(alarm);
+    	}
+    	return alarmList;
     }
     
     
-    /**
-     * 瑙f瀽鎶ヨ浠g爜
-     * @param collectText
-     * @return
-     */
-    String parseCode(String collectText) {
-    	return "0000";
-    }
 }
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/mapper/mgr/WorkstationDatapointsMapper.java b/collect/src/main/java/com/qianwen/mdc/collect/mapper/mgr/WorkstationDatapointsMapper.java
new file mode 100644
index 0000000..b7b128e
--- /dev/null
+++ b/collect/src/main/java/com/qianwen/mdc/collect/mapper/mgr/WorkstationDatapointsMapper.java
@@ -0,0 +1,11 @@
+package com.qianwen.mdc.collect.mapper.mgr;
+
+import com.qianwen.core.mp.mapper.BladeMapper;
+import com.qianwen.mdc.collect.entity.mgr.WorkstationDatapoints;
+
+/**
+ * 宸ヤ綅鏁版嵁鐐筸apper
+ */
+public interface WorkstationDatapointsMapper extends BladeMapper<WorkstationDatapoints> {
+
+}
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/runner/InitRunner.java b/collect/src/main/java/com/qianwen/mdc/collect/runner/InitRunner.java
index 4bc3d32..e97e613 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/runner/InitRunner.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/runner/InitRunner.java
@@ -30,13 +30,13 @@
     private TimeSliceCache timeSliceCache;
     @Autowired
     private DeviceStateMapper deviceStateMapper;
-    @Autowired
-    private WorkstationAppMappingService mappingService;;
+    //@Autowired
+    //private WorkstationAppMappingService mappingService;;
     
     @Override
     public void run(ApplicationArguments args) throws Exception {
     	
-    	mappingService.saveToCache();
+    	//mappingService.saveToCache();
     	
     	//鐢熸垚鏃堕棿鍒囩墖
         CacheBuildDTO cacheBuildDTO = CacheBuildDTO.builder().tenantIds(Sets.newHashSet(new String[]{"000000"})).targetDate(LocalDate.now()).build();
@@ -48,7 +48,7 @@
     //@RedisLock("posting:lock:initStateFixPoint")
     public void checkNeedStateFixPoint() {
         DateTime dateTime = DateTime.now();
-        log.info("杩涘叆绋嬪簭鍚姩鏍¢獙鏄惁瀛樺湪宸ヤ綅鎵撹繃鍥哄畾鐐�....... ");
+        log.info("绋嬪簭鍚姩鏍¢獙鏄惁瀛樺湪宸ヤ綅鎵撹繃鍥哄畾鐐�....... ");
        
         Long count = deviceStateMapper.fixPointCountByDate(Integer.valueOf(DatePattern.PURE_DATE_FORMAT.format(dateTime)));
         /*
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
index 6e86e38..f599308 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
@@ -25,6 +25,7 @@
 import com.qianwen.mdc.collect.domain.TelemetryData;
 import com.qianwen.mdc.collect.domain.TelemetryDataItem;
 import com.qianwen.mdc.collect.mqtt.MqttMessageSender;
+import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO;
 
 /**
  * 閲囬泦鏁版嵁澶勭悊鍏ュ簱
@@ -33,7 +34,6 @@
 public class CollectDataService {
 	private static final Logger log = LoggerFactory.getLogger(CollectDataService.class);
 	
-	//private String DB_PREFIX = "root.f2.";
 	private static final Map<Integer, String> PROCESS_PARAM_MAP = new HashMap<>();
 	@Autowired
 	private IotDBSessionConfig iotdbConfig;
@@ -41,6 +41,7 @@
 	private IotDBCommonService iotDBCommonService;
 	@Autowired
 	private MqttMessageSender mqttMessageSender;
+	
 	
 	/**
 	 * 瀹炴椂鏁版嵁topic锛岃涓巑dc閲岄潰寰楃浉鍚�
@@ -63,6 +64,7 @@
 	public void handleCollectData(List<TelemetryData> telemetryDataList) {
 
 		for (TelemetryData dt : telemetryDataList) {
+			
 			handleOneWorkstation(dt);
 			
 			sendRealtimeDataMsg(dt);
@@ -113,14 +115,14 @@
 	 * @param dt
 	 */
 	void handleOneWorkstation(TelemetryData dt) {
-		String deviceId;// = DB_PREFIX+TEMPLATE_NAME + "_" + dt.getWorkstationId();
-
-		List<MeasurementSchema> schemas = new ArrayList<>();
+		String deviceId;
 		
+		//long workstationId = dpVo.getWorkstationId();
+		
+		List<MeasurementSchema> schemas = new ArrayList<>();
 		schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64));
 		schemas.add(new MeasurementSchema("n", TSDataType.TEXT));
 		schemas.add(new MeasurementSchema("v", TSDataType.TEXT));
-		
 		
 		int rowIndex = 0;
 		
@@ -137,6 +139,8 @@
 			iotDBCommonService.setTemmplateIfNotSet(TEMPLATE_NAME, deviceId);
 			Tablet tablet = new Tablet(deviceId, schemas);
 			for(TypedTelemetryData tdata : typeList) {
+				
+				
 				rowIndex = tablet.rowSize++;
 				tablet.addTimestamp(rowIndex, tdata.getTime());
 				tablet.addValue("workstation_id",rowIndex,dt.getWorkstationId());
@@ -144,18 +148,16 @@
 				if(!tdata.getName().equals("Alarm")) {
 					tablet.addValue("v",rowIndex,tdata.getValue());
 				}else {
-					//鍛婅淇℃伅鏍规嵁鍘熺増闇�瑕佸鐞嗕竴涓嬶紝鏍煎紡鏈猨son瀵硅薄锛歿"timestamp":1718839644476,"code":1000,"msg":"EMERGENCY STOP","alarmtype":15,"level":""}
+					//鍛婅淇℃伅鏍规嵁鍘熺増闇�瑕佸鐞嗕竴涓嬶紝鏍煎紡涓簀son瀵硅薄锛歿"timestamp":1718839644476,"code":1000,"msg":"EMERGENCY STOP","alarmtype":15,"level":""}
 					tablet.addValue("v",rowIndex,formatAlarmMsg(tdata.getTime(),tdata.getValue()));
 				}
 			}
 			
 			try {
 				iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
-				
 				//updateLastParam(dt.getWorkstationId(),typeList);
 			} catch (Exception e) {
 				log.error("IOTDB鍏ュ簱澶辫触",e);
-				e.printStackTrace();
 			}finally {
 				//iotdbConfig.getSessionPool().clo1se();
 			}
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java
index 63e7ee7..3246a2f 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java
@@ -2,7 +2,6 @@
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
 import java.util.Set;
 
 import org.apache.commons.lang3.StringUtils;
@@ -16,6 +15,7 @@
 import com.qianwen.mdc.collect.domain.TelemetryData;
 import com.qianwen.mdc.collect.domain.TelemetryDataItem;
 import com.qianwen.mdc.collect.utils.redis.RedisUtil;
+import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO;
 
 @Service
 public class IOTMqttReceiveService {
@@ -26,13 +26,14 @@
 	@Autowired
 	private PackedDataService packedDataService;
 	@Autowired
-	private  RedisUtil redisUtil;
+	private RedisUtil redisUtil;
+	@Autowired
+    private WorkstationDatapointsService dpService;
 	/**
 	 * 澶勭悊鏀跺埌鐨勬秷鎭�,瀵瑰簲TelemetryDataPostingConsumer
 	 * @param payload
 	 */
 	public void handle(String payload) {
-        //System.out.println("Received message122: " + payload);
         //瑙f瀽娑堟伅
         List<TelemetryData> teleList = parsePayload(payload);
         
@@ -48,22 +49,32 @@
 	 */
 	List<TelemetryData> parsePayload(String payload){
 		List<TelemetryData> dtList = new ArrayList<TelemetryData> ();
-		//鏁版嵁鏍煎紡锛歿"174":[{"values":{"d1":12},"ts":"1721978780449"}]} 174鏄簲鐢╥d
-    	//澶氭潯鏍煎紡锛歿"174":[{"values":{"output":11},"ts":"1722478128278"},{"values":{"spindleSpeed":22},"ts":"1722478128281"}]}
+    	//澶氭潯 鏁版嵁鏍煎紡锛歿"174":[{"values":{"output":11},"ts":"1722478128278"},{"values":{"spindleSpeed":22},"ts":"1722478128281"}]}   174鏄簲鐢╥d
 		//瑙f瀽娑堟伅 name,value褰㈠紡锛屽n=output,v=11
         JSONObject jsonObj = JSONObject.parseObject(payload);
         
         Set<String> keySet = jsonObj.keySet();
         String[] keys = keySet.toArray(new String[] {});
         
+        //WorkstationDatapointsVO dpVo;
+        
         final String NEWDP_SUFFIX = "_n";//璁$畻瑙勫垯浣跨敤涔嬪悗鏂版暟鎹偣鐨勭粨灏�
         for(String key : keys) {
         	String appId = key;//iot绯荤粺涓殑搴旂敤id锛屾湰搴旂敤涓簲璇ョ敤琛ㄥ幓瀵瑰簲
-        	long workstationId = getWorkstationIdByAppId(appId);
+        	
+        	
+        	//TODO 鑾峰彇宸ヤ綅鏁版嵁鐐归厤缃�,鍙繚瀛橀厤缃ソ鐨勬暟鎹偣锛屾病鏈夐厤缃殑閲囬泦鏁版嵁鎶涘純銆�
+        	final WorkstationDatapointsVO dpVo = dpService.getDatapointsByAppIdFromCache(appId);
+        	if(dpVo == null) {
+        		//宸ヤ綅娌℃湁瀹氫箟杩囨暟鎹偣鎴栬�卆ppId涓嶅尮閰�
+        		log.warn("appId={}鏈壘鍒版暟鎹偣瀹氫箟璁板綍锛屼涪寮冩暟鎹�",appId);
+        		continue;
+        	}
         	
         	TelemetryData tdata = new TelemetryData();
-        	tdata.setWorkstationId(workstationId);
-        	
+        	//tdata.setWorkstationId(workstationId);
+        	tdata.setAppId(appId);
+        	tdata.setWorkstationId(dpVo.getWorkstationId());
         	
         	JSONArray dtArr = jsonObj.getJSONArray(appId);
         	for(int i=0;i<dtArr.size();i++) {
@@ -81,6 +92,10 @@
 	        		if(StringUtils.endsWith(valueKey, NEWDP_SUFFIX)) {
 	        			oriValueKey = StringUtils.removeEnd(valueKey, NEWDP_SUFFIX);
 	        		}
+	        		if(!dpVo.containsDataPoint(oriValueKey)) {
+	    				//濡傛灉涓嶅瓨鍦ㄨ鏁版嵁鐐归厤缃紝璇ユ暟鎹洿鎺ュ拷鐣�
+	    				return;
+	    			}
 	        		tdataItem.addPoint(oriValueKey,values.getString(valueKey));//浣跨敤鍘熷閰嶇疆鐐逛繚鎸佷繚瀛樻暟鎹�
 	        	});
 	        	
@@ -98,11 +113,16 @@
 	 * @param appId
 	 * @return
 	 */
-	public long getWorkstationIdByAppId(String appId) {
+	/*
+	public Long getWorkstationIdByAppId(String appId) {
 
 		Object wid = redisUtil.hget("workstation-appid-map", appId);
-		String workstationId = String.valueOf(Optional.ofNullable(wid).orElse(appId));
-	
+		
+		String workstationId = String.valueOf(Optional.ofNullable(wid).orElse(StringUtils.EMPTY));
+		if(ObjectUtil.isEmpty(workstationId)) {
+			return null;
+		}
 		return Long.parseLong(workstationId);
 	}
+	*/
 }
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/PackedDataService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/PackedDataService.java
index 99392cd..2e5d115 100644
--- a/collect/src/main/java/com/qianwen/mdc/collect/service/PackedDataService.java
+++ b/collect/src/main/java/com/qianwen/mdc/collect/service/PackedDataService.java
@@ -47,6 +47,8 @@
 	private WorkstationCache workstationCache;
 	@Autowired
 	private TimeSliceCache timeSliceCache;
+	@Autowired
+	private WorkstationDatapointsService dpService;
 
 	static {
 		PROCESS_PARAM_MAP.put(1, "STATE");
@@ -91,12 +93,13 @@
 					String[] keys = map.keySet().toArray(new String[0]);
 					for (int j = 0; j < keys.length; j++) {
 						//TODO: 杩欓噷锛屽師绯荤粺杩涜浜嗚繃婊ゃ�俉orkstationCollectDataServiceImpl.handlerWorkstationCollectData閲�
+						
 						PackedTelemetryData pkData = new PackedTelemetryData();
 						pkData.setWorkstationId(tdata.getWorkstationId());
 						pkData.setValue(map.get(keys[j]));
 						pkData.setTime(item.getTime());
 
-						pkData.setName(keys[j]);//鍙傛暟鍚嶇О
+						pkData.setName(keys[j]);//鏁版嵁鐐瑰悕绉�
 								
 						fillByCalendar(pkData);
 						
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/WorkstationDatapointsService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/WorkstationDatapointsService.java
new file mode 100644
index 0000000..7762b5b
--- /dev/null
+++ b/collect/src/main/java/com/qianwen/mdc/collect/service/WorkstationDatapointsService.java
@@ -0,0 +1,63 @@
+package com.qianwen.mdc.collect.service;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.cache.annotation.Cacheable;
+import org.springframework.stereotype.Service;
+
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import com.qianwen.core.mp.base.BaseServiceImpl;
+import com.qianwen.mdc.collect.entity.mgr.WorkstationDatapoints;
+import com.qianwen.mdc.collect.mapper.mgr.WorkstationDatapointsMapper;
+import com.qianwen.mdc.collect.vo.WorkstationDatapointsVO;
+/**
+ * 宸ヤ綅鏁版嵁鐐规湇鍔★紝涓昏鐢ㄤ簬鑾峰彇鏁版嵁鐐瑰畾涔夈�傚畾涔夊湪smartman搴旂敤
+ */
+@Service
+public class WorkstationDatapointsService extends BaseServiceImpl<WorkstationDatapointsMapper, WorkstationDatapoints> {
+	private Logger log = LoggerFactory.getLogger(this.getClass());
+	
+	
+	/**
+	 * 鑾峰彇宸ヤ綅鏁版嵁鐐归厤缃� ,浠庣紦瀛�
+	 * @param workstationId
+	 * @return
+	 */
+	@Cacheable(value = "collect:datapoint" ,key = "#appId")
+	public WorkstationDatapointsVO getDatapointsByAppIdFromCache(String appId) {
+		return this.getDataPointByAppId(appId);
+	}
+	
+	/*
+	@CachePut(value = "collect:datapoint" ,key = "#appId")
+	public WorkstationDatapointsVO datapointsCachePut(String appId) {
+		return this.getDataPointByAppId(appId);
+	}
+	*/
+	
+	@CacheEvict(value = "collect:datapoint" ,key = "#appId")
+	public void datapointsCacheEvict(String appId) {
+		
+	}
+	/**
+	 * 浠庢暟鎹簱鏌ヨ鏁版嵁鐐瑰畾涔�
+	 * @param workstationId 宸ヤ綅id
+	 * @return
+	 */
+	WorkstationDatapointsVO getDataPointByAppId(String appId) {
+		log.info("appid={}", appId);
+		WorkstationDatapoints dp = baseMapper.selectOne(Wrappers.<WorkstationDatapoints>lambdaQuery()
+				.eq(WorkstationDatapoints::getAppId, appId));
+
+		log.info("dp={}", dp);
+
+		WorkstationDatapointsVO dpVO = null;
+
+		if (dp != null) {
+			dpVO = new WorkstationDatapointsVO(dp.getWorkstationId(), dp.getAppId(), dp.getDpConfig());
+		}
+		return dpVO;
+	}
+
+}
diff --git a/collect/src/main/java/com/qianwen/mdc/collect/vo/WorkstationDatapointsVO.java b/collect/src/main/java/com/qianwen/mdc/collect/vo/WorkstationDatapointsVO.java
new file mode 100644
index 0000000..7404419
--- /dev/null
+++ b/collect/src/main/java/com/qianwen/mdc/collect/vo/WorkstationDatapointsVO.java
@@ -0,0 +1,143 @@
+package com.qianwen.mdc.collect.vo;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+
+import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.json.JSONUtil;
+import io.swagger.annotations.ApiModelProperty;
+
+
+/**
+ * 宸ヤ綅閲囬泦鏁版嵁鐐筕O
+ */
+public class WorkstationDatapointsVO implements Serializable{
+    
+    /**
+	 * 搴忓垪鍖栵紝搴斾负闇�瑕乻pring缂撳瓨
+	 */
+	private static final long serialVersionUID = 6558493027948435061L;
+
+	@ApiModelProperty("鐐逛綅琛ㄥご(json鏁扮粍)")
+    private String dpHead;
+    
+    @ApiModelProperty("鐐逛綅閰嶇疆(json鏁扮粍)")
+    private String dpConfig;
+    
+    /**
+     * 宸ヤ綅id
+     */
+    private long workstationId;
+   
+    /**
+     * IOT骞冲彴appId
+     */
+    private String appId;
+    
+    //private List<DataPoint> points = null;
+    private List<String> points = new ArrayList<>();
+
+	public WorkstationDatapointsVO(long workstationId, String appId,String dpConfig) {
+		super();
+		this.dpConfig = dpConfig;
+		this.workstationId = workstationId;
+		this.appId = appId;
+		
+		initPoints();
+	}
+	void initPoints() {
+		if(ObjectUtil.isEmpty(dpConfig)) {
+			return;
+		}
+		
+		JSONArray ptArr = JSONArray.parseArray(dpConfig);
+		
+		points = new ArrayList<>();
+		JSONObject ptObj;
+		for(int i=0;i<ptArr.size();i++) {
+			ptObj = ptArr.getJSONObject(i);
+			
+			//DataPoint dp = new DataPoint();
+			//dp.setDpName(ptObj.getString("dpName"));
+			
+			points.add(ptObj.getString("dpName"));
+		}
+	}
+
+	public String getDpConfig() {
+		return dpConfig;
+	}
+
+	public void setDpConfig(String dpConfig) {
+		this.dpConfig = dpConfig;
+	}
+
+	public long getWorkstationId() {
+		return workstationId;
+	}
+
+	public void setWorkstationId(long workstationId) {
+		this.workstationId = workstationId;
+	}
+
+	public String getDpHead() {
+		return dpHead;
+	}
+
+	public void setDpHead(String dpHead) {
+		this.dpHead = dpHead;
+	}
+
+	public String getAppId() {
+		return appId;
+	}
+
+	public void setAppId(String appId) {
+		this.appId = appId;
+	}
+
+	
+	/**
+	 * 鍒ゆ柇閲囬泦鍙橀噺鍚嶇О鏄惁鍦ㄦ暟鎹偣閰嶇疆绉�
+	 * @param dpName
+	 * @return
+	 */
+	public boolean containsDataPoint(String dpName) {
+		if(ObjectUtil.isEmpty(points)) {
+			return false;
+		}
+		
+		return points.contains(dpName);
+		/*
+		for(String dpn : points) {
+			if(StringUtils.equals(dpn, dpName)) {
+				return true;
+			}
+		}
+		return false;
+		*/
+	}
+
+    
+}
+/*
+class DataPoint{
+	private String dpName;
+
+	public String getDpName() {
+		return dpName;
+	}
+
+	public void setDpName(String dpName) {
+		this.dpName = dpName;
+	}
+	
+	
+}
+*/
diff --git a/collect/src/main/resources/application-dev.yml b/collect/src/main/resources/application-dev.yml
index dda4930..afd9ca6 100644
--- a/collect/src/main/resources/application-dev.yml
+++ b/collect/src/main/resources/application-dev.yml
@@ -23,7 +23,7 @@
   #蹇冭烦鏃堕棿
   keepalive: 10
   connectionTimeout: 3000 #杩炴帴瓒呮椂鏃堕棿
-
+  dataReceiveTopic: forward/test #浠巌ot骞冲彴鎺ユ敹mqtt閲囬泦鏁版嵁鐨則opic
   # mysql
 datasource:
   type: mysql
@@ -35,7 +35,7 @@
 #iotdb 浠ュ強鍏秊dbc涓�璧烽厤缃�
 iotdb:
   driver: org.apache.iotdb.jdbc.IoTDBDriver
-  host: 120.46.212.231
+  host: localhost #120.46.212.231
   port: 6667
   maxSize: 100
   username: root
diff --git a/collect/src/test/java/com/qianwen/mdc/collect/service/IOTMqttReceiveServiceTest.java b/collect/src/test/java/com/qianwen/mdc/collect/service/IOTMqttReceiveServiceTest.java
new file mode 100644
index 0000000..cd44a41
--- /dev/null
+++ b/collect/src/test/java/com/qianwen/mdc/collect/service/IOTMqttReceiveServiceTest.java
@@ -0,0 +1,14 @@
+package com.qianwen.mdc.collect.service;
+
+import static org.junit.Assert.assertNull;
+
+import org.junit.jupiter.api.Test;
+
+public class IOTMqttReceiveServiceTest {
+
+	@Test
+	public void t() {
+		
+		//System.out.println(a == null);
+	}
+}

--
Gitblit v1.9.3