From 7ef593e1e3c35aaeecf9318f0b3941230d3ed002 Mon Sep 17 00:00:00 2001 From: yangys <y_ys79@sina.com> Date: 星期三, 09 十月 2024 11:22:54 +0800 Subject: [PATCH] 增加在数据点计算规则后数据点名称加_n的适配 --- collect/pom.xml | 11 -- collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateAggregateNoFeedbackService.java | 86 ++++++++++++-------- collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java | 5 - collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java | 2 collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java | 69 +++++++++++++++-- collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java | 9 ++ collect/src/main/resources/application-dev.yml | 2 collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/DeviceStateMapper.xml | 4 collect/src/main/java/com/qianwen/mdc/collect/job/FeedbackDealJob.java | 2 collect/src/main/java/com/qianwen/mdc/collect/handler/DeviceStatusDataHandler.java | 4 10 files changed, 130 insertions(+), 64 deletions(-) diff --git a/collect/pom.xml b/collect/pom.xml index 90da6a3..682b8a1 100644 --- a/collect/pom.xml +++ b/collect/pom.xml @@ -165,27 +165,22 @@ </dependencies> <build> + <finalName>collect-api</finalName> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> - <!--<configuration> + <configuration> <mainClass>com.qianwen.mdc.collect.MdcTansApplication</mainClass> <layout>ZIP</layout> <includes> - 鎵撳寘鏃跺寘鍚牳蹇冩ā鍧椾緷璧栧寘锛岄渶鎵嬪姩鎸囧畾 <include> <groupId>nothing</groupId> <artifactId>nothing</artifactId> </include> - <include> - <groupId>com.xxx</groupId> - <artifactId>common</artifactId> - </include> - </includes> - </configuration>--> + </configuration> </plugin> <plugin> diff --git a/collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java b/collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java index 70c0011..18d1bc8 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java @@ -41,7 +41,7 @@ public void testRec2() { //鏁版嵁鏍煎紡锛歿"174":[{"values":{"d1":12},"ts":"1721978780449"}]} 174鏄簲鐢╥d //澶氭潯鏍煎紡锛歿"174":[{"values":{"DeviceStatus":2},"ts":"1722478128278"},{"values":{"SpindleSpeed":22},"ts":"1722478128281"}]} - String payload = "{\"174\":[{\"values\":{\"DeviceStatus\":2,\"Output\":38},\"ts\":\""+System.currentTimeMillis()+"\"}]}"; + String payload = "{\"174\":[{\"values\":{\"DeviceStatus_n\":2,\"Output\":38},\"ts\":\""+System.currentTimeMillis()+"\"}]}"; //payload = "{\"174\":[{\"values\":{\"Output\":11},\"ts\":\"1722478128278\"},{\"values\":{\"SpindleSpeed\":22},\"ts\":\"1722478128281\"}]}"; recService.handle(payload); } diff --git a/collect/src/main/java/com/qianwen/mdc/collect/handler/DeviceStatusDataHandler.java b/collect/src/main/java/com/qianwen/mdc/collect/handler/DeviceStatusDataHandler.java index c4ab080..31cfb59 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/handler/DeviceStatusDataHandler.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/handler/DeviceStatusDataHandler.java @@ -70,9 +70,11 @@ } int translateStatus(String statusVal) { + int oriStatus = Integer.valueOf(statusVal); int result = oriStatus; + /* //瑗块棬瀛�828d, cnc_run_status: 杩愯鐘舵��(0锛歊ESET锛�1锛歋TOP锛�2锛欻OLD锛�3锛歋TART锛�4锛歋PENDLE_CW_CCW锛�5锛歄THER) switch(oriStatus) { case 3://START @@ -94,7 +96,7 @@ log.info("statusconvert,ori={},result={}",oriStatus,result); if(result == 0) { result = 2;// - } + }*/ return result; } } diff --git a/collect/src/main/java/com/qianwen/mdc/collect/job/FeedbackDealJob.java b/collect/src/main/java/com/qianwen/mdc/collect/job/FeedbackDealJob.java index c7c229b..8fb660a 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/job/FeedbackDealJob.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/job/FeedbackDealJob.java @@ -6,8 +6,6 @@ import javax.annotation.Resource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import com.qianwen.mdc.collect.service.DeviceStateFixPointService; import com.qianwen.mdc.collect.service.feedback.WorkstationFeedbackService; import org.springframework.stereotype.Component; diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java index f219f26..6e86e38 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java @@ -4,7 +4,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.stream.Collectors; import org.apache.iotdb.isession.pool.SessionDataSetWrapper; @@ -26,9 +25,6 @@ import com.qianwen.mdc.collect.domain.TelemetryData; import com.qianwen.mdc.collect.domain.TelemetryDataItem; import com.qianwen.mdc.collect.mqtt.MqttMessageSender; -import com.qianwen.mdc.collect.utils.redis.RedisUtil; - -import cn.hutool.json.JSONUtil; /** * 閲囬泦鏁版嵁澶勭悊鍏ュ簱 @@ -129,7 +125,6 @@ int rowIndex = 0; Map<String, List<TypedTelemetryData>> processParamsMap = parseTelemetryToTypedMapList(dt); - String[] nameArr = processParamsMap.keySet().toArray(new String[0]); String name; diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateAggregateNoFeedbackService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateAggregateNoFeedbackService.java index 875e9a1..ef1a33a 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateAggregateNoFeedbackService.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateAggregateNoFeedbackService.java @@ -39,6 +39,8 @@ private IotDBSessionConfig iotdbConfig; @Autowired private IotDBCommonService iotDBCommonService; + + private static final int MAX_COUNT = 1000; public List<AggregateState> stateAggregateForSpecialTimeRange(Long workstationId, StateAggregateTimeDTO timeRange, List<DeviceState> effectiveStateList) { //鎸塼imeRange鏌ヨ鏃堕棿鍖洪棿鍐呯殑鐘舵�佹暟鎹紝闄や簡宸插垹闄ょ殑锛屽叾浠栨暟鎹兘鏌ュ嚭鏉ヤ簡 @@ -62,14 +64,6 @@ * @param effectTimeRangeList */ public void handlerAggregateState(List<AggregateState> result, Long workstationId, StateAggregateTimeDTO timeRange) { - /* - if (Func.isNotEmpty(result)) { - Map<String, List> stringListMap = CommonUtil.groupList(getFinallyAggregateStateList(result, workstationId, effectTimeRangeList), CommonConstant.MAX_RECORDS_FOR_SQL_LENGTH.intValue()); - stringListMap.forEach(k, v -> { - this.workstationAggregateStateMapper.batchSave(workstationId, v); - }); - }*/ - if(result.isEmpty()) { return; } @@ -93,7 +87,7 @@ schemas.add(new MeasurementSchema("rps", TSDataType.INT32)); //schemas.add(new MeasurementSchema("is_sync", TSDataType.BOOLEAN)); schemas.add(new MeasurementSchema("is_plan", TSDataType.INT32));//TODO 杩欎釜灞炴�у簲璇ユ槸GlobalWcsOfRps涓殑鍊硷紝濡備綍濉啓锛� - //schemas.add(new MeasurementSchema("feedback_id", TSDataType.INT64)); + schemas.add(new MeasurementSchema("is_deleted", TSDataType.BOOLEAN)); schemas.add(new MeasurementSchema("employee_id", TSDataType.INT64)); @@ -104,41 +98,63 @@ Tablet tablet = new Tablet(deviceId, schemas); tablet.rowSize = aggStates.size(); - + AggregateState aggState; + int tblIndex = 0; + for(int i=0;i<aggStates.size();i++) { aggState = aggStates.get(i); - tablet.addTimestamp(i, aggState.getTime()); - tablet.addValue("workstation_id", i, aggState.getWorkstationId()); - tablet.addValue("value_collect", i, aggState.getValueCollect()); - tablet.addValue("end_time", i, aggState.getEndTime()); - tablet.addValue("duration_collect", i, aggState.getDurationCollect()); - tablet.addValue("calendar_code", i, aggState.getCalendarCode()); - tablet.addValue("factory_year", i, aggState.getFactoryYear()); - tablet.addValue("factory_month", i, aggState.getFactoryMonth()); - tablet.addValue("factory_week", i, aggState.getFactoryWeek()); - tablet.addValue("factory_date", i, aggState.getFactoryDate()); - tablet.addValue("shift_index", i, aggState.getShiftIndex()); - tablet.addValue("shift_time_type", i, aggState.getShiftTimeType()); - tablet.addValue("wcs", i, aggState.getWcs()); - tablet.addValue("rps", i, aggState.getRps()); + tablet.addTimestamp(tblIndex, aggState.getTime()); + tablet.addValue("workstation_id", tblIndex, aggState.getWorkstationId()); + tablet.addValue("value_collect", tblIndex, aggState.getValueCollect()); + tablet.addValue("end_time", tblIndex, aggState.getEndTime()); + tablet.addValue("duration_collect", tblIndex, aggState.getDurationCollect()); + tablet.addValue("calendar_code", tblIndex, aggState.getCalendarCode()); + tablet.addValue("factory_year", tblIndex, aggState.getFactoryYear()); + tablet.addValue("factory_month", tblIndex, aggState.getFactoryMonth()); + tablet.addValue("factory_week", tblIndex, aggState.getFactoryWeek()); + tablet.addValue("factory_date", tblIndex, aggState.getFactoryDate()); + tablet.addValue("shift_index", tblIndex, aggState.getShiftIndex()); + tablet.addValue("shift_time_type", tblIndex, aggState.getShiftTimeType()); + tablet.addValue("wcs", tblIndex, aggState.getWcs()); + tablet.addValue("rps", tblIndex, aggState.getRps()); - tablet.addValue("is_plan", i, aggState.getIsPlan()); - //tablet.addValue("feedback_id", i, aggState.getFeedbackId()); - tablet.addValue("is_deleted", i, aggState.getIsDeleted()); - tablet.addValue("employee_id", i, aggState.getEmployeeId()); + tablet.addValue("is_plan", tblIndex, aggState.getIsPlan()); + tablet.addValue("is_deleted", tblIndex, aggState.getIsDeleted()); + tablet.addValue("employee_id", tblIndex, aggState.getEmployeeId()); + + tblIndex++; + if(tblIndex >= MAX_COUNT) { + try { + //姣忎釜宸ヤ綅鎵归噺鎻掑叆涓�娆℃暟鎹� + this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet); + log.info("淇濆瓨鑱氬悎鐘舵�佸畬鎴恡blIndex={}",tblIndex); + tablet.reset(); + tblIndex = 0; + } catch (Exception e) { + log.error("淇濆瓨鍥哄畾鐐规暟鎹紓甯�",e); + } + } } - try { - this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet); - log.info("淇濆瓨鑱氬悎鐘舵�佸畬鎴�"); - } catch (Exception e) { - log.error("淇濆瓨鑱氬悎鐘舵�佹暟鎹紓甯�",e); - } + + if(tblIndex > 0) { + try { + //姣忎釜宸ヤ綅鎵归噺鎻掑叆涓�娆℃暟鎹� + this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet); + log.info("淇濆瓨鑱氬悎鐘舵�佸畬鎴恌inaltblIndex={}",tblIndex); + tablet.reset(); + tblIndex = 0; + } catch (Exception e) { + log.error("淇濆瓨鍥哄畾鐐规暟鎹紓甯�",e); + } + + } + } - + private List<AggregateState> getFinallyAggregateStateList(List<AggregateState> result, Long workstationId, StateAggregateTimeDTO timeRange) { /* List<StateAggregateTimeDTO> effectTimeRangeList2 = effectTimeRangeList.stream().filter(x -> { diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java index 95725fb..15eaad5 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java @@ -94,9 +94,68 @@ Tablet tablet = new Tablet(deviceId, schemas); + states = entry.getValue(); tablet.rowSize = states.size(); DeviceState state; + + + final int MAX_COUNT = 1000; + //int currentIdx = 0; + + + int tblIndex = 0; + for(int i=0; i < states.size(); i++) { + state = states.get(i); + tablet.addTimestamp(tblIndex, state.getTime()); + tablet.addValue("workstation_id", tblIndex, state.getWorkstationId()); + tablet.addValue("value_collect", tblIndex, state.getValueCollect()); + + tablet.addValue("calendar_code", tblIndex, state.getCalendarCode()); + tablet.addValue("factory_year", tblIndex, state.getFactoryYear()); + tablet.addValue("factory_month", tblIndex, state.getFactoryMonth()); + tablet.addValue("factory_week", tblIndex, state.getFactoryWeek()); + tablet.addValue("factory_date", tblIndex, state.getFactoryDate()); + tablet.addValue("shift_index", tblIndex, state.getShiftIndex()); + tablet.addValue("shift_time_type", tblIndex, state.getShiftTimeType()); + tablet.addValue("wcs", tblIndex, state.getWcs()); + tablet.addValue("rps", tblIndex, state.getRps()); + tablet.addValue("is_fix_point", tblIndex, state.getIsFixPoint()); + tablet.addValue("is_sync", tblIndex, state.getIsSync()); + tablet.addValue("is_plan", tblIndex, state.getIsPlan()); + tablet.addValue("feedback_point_type", tblIndex, state.getFeedbackPointType()); + tablet.addValue("feedback_id", tblIndex, state.getFeedbackId()); + tablet.addValue("is_deleted", tblIndex, state.getIsDeleted()); + tablet.addValue("employee_id", tblIndex, state.getEmployeeId()); + + tblIndex++; + + if(tblIndex >= MAX_COUNT) { + try { + //姣忎釜宸ヤ綅鎵归噺鎻掑叆涓�娆℃暟鎹� + this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet); + log.info("淇濆瓨璁惧鐘舵�佸畬鎴�"); + tablet.reset(); + tblIndex = 0; + } catch (Exception e) { + log.error("淇濆瓨鍥哄畾鐐规暟鎹紓甯�",e); + } + } + } + + if(tblIndex > 0) { + try { + //姣忎釜宸ヤ綅鎵归噺鎻掑叆涓�娆℃暟鎹� + this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet); + log.info("淇濆瓨璁惧鐘舵�佸畬鎴�2"); + tablet.reset(); + tblIndex = 0; + } catch (Exception e) { + log.error("淇濆瓨鍥哄畾鐐规暟鎹紓甯�",e); + } + } + + /* for(int i=0;i<states.size();i++) { state = states.get(i); tablet.addTimestamp(i, state.getTime()); @@ -119,14 +178,8 @@ tablet.addValue("feedback_id", i, state.getFeedbackId()); tablet.addValue("is_deleted", i, state.getIsDeleted()); tablet.addValue("employee_id", i, state.getEmployeeId()); - - } - try { - //姣忎釜宸ヤ綅鎵归噺鎻掑叆涓�娆℃暟鎹� - this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet); - } catch (Exception e) { - log.error("淇濆瓨鍥哄畾鐐规暟鎹紓甯�",e); - } + }*/ + } } diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java index 5b16ccd..63e7ee7 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java @@ -5,6 +5,7 @@ import java.util.Optional; import java.util.Set; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -55,12 +56,14 @@ Set<String> keySet = jsonObj.keySet(); String[] keys = keySet.toArray(new String[] {}); + final String NEWDP_SUFFIX = "_n";//璁$畻瑙勫垯浣跨敤涔嬪悗鏂版暟鎹偣鐨勭粨灏� for(String key : keys) { String appId = key;//iot绯荤粺涓殑搴旂敤id锛屾湰搴旂敤涓簲璇ョ敤琛ㄥ幓瀵瑰簲 long workstationId = getWorkstationIdByAppId(appId); TelemetryData tdata = new TelemetryData(); tdata.setWorkstationId(workstationId); + JSONArray dtArr = jsonObj.getJSONArray(appId); for(int i=0;i<dtArr.size();i++) { @@ -74,7 +77,11 @@ Set<String> valueKeySet = values.keySet(); valueKeySet.forEach(valueKey ->{ - tdataItem.addPoint(valueKey,values.getString(valueKey)); + String oriValueKey = valueKey;;//鐢变簬浣跨敤璁$畻瑙勫垯鐨勯噰闆嗙偣鍚嶇О浼氬悗闈㈠鍔犱竴涓�"_n",鎵�浠ヨ繖涓猳riValueKey浠h〃娌℃湁澧炲姞"_n"鐨� + if(StringUtils.endsWith(valueKey, NEWDP_SUFFIX)) { + oriValueKey = StringUtils.removeEnd(valueKey, NEWDP_SUFFIX); + } + tdataItem.addPoint(oriValueKey,values.getString(valueKey));//浣跨敤鍘熷閰嶇疆鐐逛繚鎸佷繚瀛樻暟鎹� }); tdata.addItem(tdataItem); diff --git a/collect/src/main/resources/application-dev.yml b/collect/src/main/resources/application-dev.yml index ba09542..5f73d6b 100644 --- a/collect/src/main/resources/application-dev.yml +++ b/collect/src/main/resources/application-dev.yml @@ -35,7 +35,7 @@ #iotdb 浠ュ強鍏秊dbc涓�璧烽厤缃� iotdb: driver: org.apache.iotdb.jdbc.IoTDBDriver - host: 127.0.0.1 + host: 120.46.212.231 port: 6667 maxSize: 100 username: root diff --git a/collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/DeviceStateMapper.xml b/collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/DeviceStateMapper.xml index 0c357b7..668b060 100644 --- a/collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/DeviceStateMapper.xml +++ b/collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/DeviceStateMapper.xml @@ -73,13 +73,13 @@ </sql> <!-- resultType="com.qianwen.mdc.collect.entity.iotdb.DeviceState" --> <select id="lastSyncedNoFeedbackPointState" resultMap="BaseResultMap"> - select <include refid="all_columns" /> FROM root.f2.aggregate_state_${workstationId} + select <include refid="all_columns" /> FROM root.f2.state_${workstationId} where is_sync=true and is_fix_point=false and feedback_point_type=0 and is_deleted=false order by time desc limit 1 </select> <select id="firstNotSyncedNofeedbackPointState" resultMap="BaseResultMap"> - select <include refid="all_columns" /> FROM root.f2.aggregate_state_${workstationId} + select <include refid="all_columns" /> FROM root.f2.state_${workstationId} where is_sync=false and is_fix_point=false and feedback_point_type=0 and is_deleted=false order by time asc limit 1 </select> -- Gitblit v1.9.3