From ba9557b5edf7f31ab8bce0ae57aaaaefd6459bb4 Mon Sep 17 00:00:00 2001 From: yangys <y_ys79@sina.com> Date: 星期二, 08 十月 2024 14:06:54 +0800 Subject: [PATCH] 处理iot链接超时问题 --- collect/src/main/java/com/qianwen/mdc/collect/job/DeviceStatusAggregateJob.java | 6 collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java | 4 collect/src/main/resources/application-dev.yml | 12 - collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java | 5 collect/src/main/java/com/qianwen/mdc/collect/handler/DeviceStatusDataHandler.java | 96 +++++---------- collect/src/main/java/com/qianwen/mdc/collect/controller/JobTestController.java | 23 --- collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java | 150 ++++++++++++++++-------- collect/src/main/java/com/qianwen/mdc/collect/runner/InitRunner.java | 13 + collect/src/main/java/com/qianwen/mdc/collect/mapper/iotdb/DeviceStateMapper.java | 2 collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateAggregateService.java | 2 collect/src/main/java/com/qianwen/mdc/collect/controller/StateController.java | 19 --- collect/src/main/java/com/qianwen/mdc/collect/service/OutputAggregateService.java | 7 - collect/src/main/resources/logback.xml | 3 collect/src/main/java/com/qianwen/mdc/collect/service/IotDBCommonService.java | 3 14 files changed, 160 insertions(+), 185 deletions(-) diff --git a/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java b/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java index c48f38f..ec769d0 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/config/MqttConfig.java @@ -81,8 +81,11 @@ */ @Bean public MessageProducer inbound() { + java.util.Random r = new java.util.Random(); + + String clientId = "spring-boot-mqtt-client-inbound"+r.nextInt(1000); MqttPahoMessageDrivenChannelAdapter adapter = - new MqttPahoMessageDrivenChannelAdapter("spring-boot-mqtt-client-inbound", + new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory(), COLLECT_DATA_TOPIC, FEEDBACK_TOPIC,WOCKSTATION_CREATE_TOPIC);//鏈�鍚庝竴涓弬鏁板厑璁稿涓猼opic鍙傛暟 adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); diff --git a/collect/src/main/java/com/qianwen/mdc/collect/controller/JobTestController.java b/collect/src/main/java/com/qianwen/mdc/collect/controller/JobTestController.java index 34b881e..cfb57a4 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/controller/JobTestController.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/controller/JobTestController.java @@ -1,8 +1,5 @@ package com.qianwen.mdc.collect.controller; import java.time.LocalDate; -import java.time.format.DateTimeFormatter; -import java.util.Arrays; -import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -10,30 +7,10 @@ import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; -import com.baomidou.dynamic.datasource.annotation.DS; import com.google.common.collect.Sets; import com.qianwen.core.tool.utils.Func; import com.qianwen.mdc.collect.cache.TimeSliceCache; -import com.qianwen.mdc.collect.config.IotDBSessionConfig; import com.qianwen.mdc.collect.dto.CacheBuildDTO; -import com.qianwen.mdc.collect.dto.CalendarShiftInfoDTO; -import com.qianwen.mdc.collect.entity.iotdb.Output; -import com.qianwen.mdc.collect.entity.iotdb.ProcessParam; -import com.qianwen.mdc.collect.mapper.iotdb.OutputMapper; -import com.qianwen.mdc.collect.mapper.iotdb.ProcessParamMapper; -import com.qianwen.mdc.collect.mapper.mgr.CalendarMapper; -import com.qianwen.mdc.collect.mqtt.MqttMessageSender; -import com.qianwen.mdc.collect.service.DeviceStateAggregateService; -import com.qianwen.mdc.collect.service.DeviceStateFixPointService; -import com.qianwen.mdc.collect.service.IOTMqttReceiveService; -import com.qianwen.mdc.collect.service.IotDBCommonService; -import com.qianwen.mdc.collect.service.OutputAggregateService; -import com.qianwen.mdc.collect.utils.redis.RedisUtil; -import com.xxl.job.core.log.XxlJobLogger; - -import cn.hutool.core.date.DateField; -import cn.hutool.core.date.DateTime; -import cn.hutool.core.date.DateUtil; @RestController public class JobTestController { diff --git a/collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java b/collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java index 9427915..70c0011 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java @@ -40,8 +40,8 @@ @GetMapping("/rec2") public void testRec2() { //鏁版嵁鏍煎紡锛歿"174":[{"values":{"d1":12},"ts":"1721978780449"}]} 174鏄簲鐢╥d - //澶氭潯鏍煎紡锛歿"174":[{"values":{"DeviceStatus":2},"ts":"1722478128278"},{"values":{"spindleSpeed":22},"ts":"1722478128281"}]} - String payload = "{\"174\":[{\"values\":{\"DeviceStatus\":2,\"Output\":38},\"ts\":\"1725247557768\"}]}"; + //澶氭潯鏍煎紡锛歿"174":[{"values":{"DeviceStatus":2},"ts":"1722478128278"},{"values":{"SpindleSpeed":22},"ts":"1722478128281"}]} + String payload = "{\"174\":[{\"values\":{\"DeviceStatus\":2,\"Output\":38},\"ts\":\""+System.currentTimeMillis()+"\"}]}"; //payload = "{\"174\":[{\"values\":{\"Output\":11},\"ts\":\"1722478128278\"},{\"values\":{\"SpindleSpeed\":22},\"ts\":\"1722478128281\"}]}"; recService.handle(payload); } diff --git a/collect/src/main/java/com/qianwen/mdc/collect/controller/StateController.java b/collect/src/main/java/com/qianwen/mdc/collect/controller/StateController.java index 5dc0049..6a48f03 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/controller/StateController.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/controller/StateController.java @@ -1,34 +1,15 @@ package com.qianwen.mdc.collect.controller; -import java.util.Arrays; import java.util.List; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; -import com.baomidou.dynamic.datasource.annotation.DS; -import com.qianwen.mdc.collect.config.IotDBSessionConfig; -import com.qianwen.mdc.collect.dto.CalendarShiftInfoDTO; import com.qianwen.mdc.collect.dto.StateAggregateTimeDTO; import com.qianwen.mdc.collect.entity.iotdb.DeviceState; -import com.qianwen.mdc.collect.entity.iotdb.Output; -import com.qianwen.mdc.collect.entity.iotdb.ProcessParam; import com.qianwen.mdc.collect.mapper.iotdb.DeviceStateMapper; -import com.qianwen.mdc.collect.mapper.iotdb.OutputMapper; -import com.qianwen.mdc.collect.mapper.iotdb.ProcessParamMapper; -import com.qianwen.mdc.collect.mapper.mgr.CalendarMapper; -import com.qianwen.mdc.collect.mqtt.MqttMessageSender; import com.qianwen.mdc.collect.service.DeviceStateAggregateService; -import com.qianwen.mdc.collect.service.DeviceStateFixPointService; -import com.qianwen.mdc.collect.service.IOTMqttReceiveService; -import com.qianwen.mdc.collect.service.IotDBCommonService; -import com.qianwen.mdc.collect.service.OutputAggregateService; -import com.qianwen.mdc.collect.utils.redis.RedisUtil; - -import cn.hutool.core.date.DateField; -import cn.hutool.core.date.DateTime; -import cn.hutool.core.date.DateUtil; @RestController public class StateController { diff --git a/collect/src/main/java/com/qianwen/mdc/collect/handler/DeviceStatusDataHandler.java b/collect/src/main/java/com/qianwen/mdc/collect/handler/DeviceStatusDataHandler.java index 7089893..c4ab080 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/handler/DeviceStatusDataHandler.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/handler/DeviceStatusDataHandler.java @@ -44,7 +44,8 @@ state.setFeedbackPointType(FeedbackTimePointEnum.NO_FEED_BACK_POINT.getValue()); //WorkstationState propertyData = (WorkstationState) Objects.requireNonNull(BeanUtil.copy(entity, WorkstationState.class)); - state.setValueCollect(Integer.valueOf(data.getValue())); + state.setValueCollect(translateStatus(data.getValue())); + state.setWcs(state.getValueCollect()); state.setWorkstationId(data.getWorkstationId()); @@ -55,7 +56,7 @@ //insertState(state); deviceStateService.saveDeviceStates(Arrays.asList(state)); - log.info("鐘舵�佽仛鍚堣仛鍚堝畬鎴�:鏁版嵁"); + log.info("璁惧鐘舵�佷繚瀛樺畬鎴�"); } private void fillWorkStationCondition(PackedTelemetryData data, DeviceState state) { @@ -67,66 +68,33 @@ //log.info("鑾峰彇鍖呰宸ュ喌浠ュ強缁╂晥淇℃伅" + JsonUtil.toJson(workstationState)); } - /* - void insertState(DeviceState state){ - String deviceId = IOTDBConstant.DB_PREFIX+IOTDBConstant.TEMPLATE_STATE+"_"+state.getWorkstationId(); - try { - iotDBCommonService.setTemmplateIfNotSet(IOTDBConstant.TEMPLATE_STATE, deviceId); - - List<MeasurementSchema> schemas = new ArrayList<>(); - - schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64)); - schemas.add(new MeasurementSchema("value_collect", TSDataType.INT32)); - schemas.add(new MeasurementSchema("calendar_code", TSDataType.TEXT)); - schemas.add(new MeasurementSchema("factory_year", TSDataType.INT32)); - schemas.add(new MeasurementSchema("factory_month", TSDataType.INT32)); - schemas.add(new MeasurementSchema("factory_week", TSDataType.INT32)); - schemas.add(new MeasurementSchema("factory_date", TSDataType.INT32)); - schemas.add(new MeasurementSchema("shift_index", TSDataType.INT32)); - schemas.add(new MeasurementSchema("shift_time_type", TSDataType.INT32)); - schemas.add(new MeasurementSchema("wcs", TSDataType.INT32)); - schemas.add(new MeasurementSchema("rps", TSDataType.INT32)); - - schemas.add(new MeasurementSchema("is_fix_point", TSDataType.BOOLEAN)); - schemas.add(new MeasurementSchema("is_sync", TSDataType.BOOLEAN)); - - schemas.add(new MeasurementSchema("is_plan", TSDataType.INT32)); - schemas.add(new MeasurementSchema("feedback_point_type", TSDataType.INT32)); - schemas.add(new MeasurementSchema("feedback_id", TSDataType.INT64)); - schemas.add(new MeasurementSchema("is_deleted", TSDataType.BOOLEAN)); - schemas.add(new MeasurementSchema("employee_id", TSDataType.INT64)); - - - Tablet tablet = new Tablet(deviceId, schemas); - int rowIndex = tablet.rowSize++; - - tablet.timestamps[rowIndex] = state.getTime(); - tablet.addValue("workstation_id", rowIndex, state.getWorkstationId()); - tablet.addValue("value_collect", rowIndex, state.getValueCollect()); - tablet.addValue("calendar_code", rowIndex, state.getCalendarCode()); - tablet.addValue("factory_year", rowIndex, state.getFactoryYear()); - tablet.addValue("factory_month", rowIndex, state.getFactoryMonth()); - tablet.addValue("factory_week", rowIndex, state.getFactoryWeek()); - tablet.addValue("factory_date", rowIndex, state.getFactoryDate()); - tablet.addValue("shift_index", rowIndex, state.getShiftIndex());//TODO null - tablet.addValue("shift_time_type", rowIndex, state.getShiftTimeType());//TODO null - tablet.addValue("wcs", rowIndex, state.getWcs()); - tablet.addValue("rps", rowIndex, state.getRps()); - - tablet.addValue("is_fix_point", rowIndex,state.getIsFixPoint()); - tablet.addValue("is_sync", rowIndex, state.getIsSync()); - - tablet.addValue("is_plan", rowIndex, state.getIsPlan()); - tablet.addValue("feedback_point_type", rowIndex, state.getFeedbackPointType()); - tablet.addValue("feedback_id", rowIndex, state.getFeedbackId()); - tablet.addValue("is_deleted", rowIndex, state.getIsDeleted()); - tablet.addValue("employee_id", rowIndex, state.getEmployeeId()); - - iotdbCfg.getSessionPool().insertAlignedTablet(tablet); - - log.info("鐘舵�佹眹鎬诲畬鎴�"); - } catch (Exception e) { - log.error("鑱氬悎浜ч噺IODDB鍏ュ簱澶辫触", e); - } - }*/ + + int translateStatus(String statusVal) { + int oriStatus = Integer.valueOf(statusVal); + + int result = oriStatus; + //瑗块棬瀛�828d, cnc_run_status: 杩愯鐘舵��(0锛歊ESET锛�1锛歋TOP锛�2锛欻OLD锛�3锛歋TART锛�4锛歋PENDLE_CW_CCW锛�5锛歄THER) + switch(oriStatus) { + case 3://START + result = 2; + break; + case 0://锛宺eset + case 2://hold + result = 3;//3寰呮満 + break; + case 4:// SPENDLE_CW_CCW + result = 2; + break; + case 5://鍏朵粬 + result = oriStatus; + break; + default: + result = oriStatus; + } + log.info("statusconvert,ori={},result={}",oriStatus,result); + if(result == 0) { + result = 2;// + } + return result; + } } diff --git a/collect/src/main/java/com/qianwen/mdc/collect/job/DeviceStatusAggregateJob.java b/collect/src/main/java/com/qianwen/mdc/collect/job/DeviceStatusAggregateJob.java index 99e115c..f11cedb 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/job/DeviceStatusAggregateJob.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/job/DeviceStatusAggregateJob.java @@ -29,15 +29,17 @@ public ReturnT<String> workStationAggregateJobHandler(String param) throws Exception { XxlJobLogger.log("XXL-JOB, 瀹氭椂璁$畻宸ヤ綅鐨勭姸鎬侊紝浜ч噺绛変俊鎭�,寮�濮嬪彂閫�.....", new Object[0]); - Map<Long, WorkstationDTO> workstations = workstationCache.getWorkstations(); if (ObjectUtil.isNotEmpty(workstations)) { + Set<Long> workStationIds = workstations.keySet(); + log.info("鑱氬悎鐘舵�佸伐浣嶆�绘暟:{}",workStationIds.size()); for(Long workstationId :workStationIds) { + log.info("寮�濮嬭仛鍚堝伐浣峽}鐨勭姸鎬�",workstationId); deviceStateAggregateService.stateAggregate(workstationId); } } - + log.info("鑱氬悎鐘舵�佹暣浣撶粨鏉�"); XxlJobLogger.log("XXL-JOB, 瀹氭椂璁$畻宸ヤ綅鐨勭姸鎬侊紝浜ч噺绛変俊鎭�,鍙戦�佺粨鏉�", new Object[0]); return ReturnT.SUCCESS; } diff --git a/collect/src/main/java/com/qianwen/mdc/collect/mapper/iotdb/DeviceStateMapper.java b/collect/src/main/java/com/qianwen/mdc/collect/mapper/iotdb/DeviceStateMapper.java index 22bb57e..7f28a5c 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/mapper/iotdb/DeviceStateMapper.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/mapper/iotdb/DeviceStateMapper.java @@ -56,7 +56,7 @@ * @param factoryDate * @return */ - long fixPointCountByDate(int factoryDate); + Long fixPointCountByDate(int factoryDate); /** * 鏍规嵁宸ヤ綅id,鑾峰彇 灏忎簬鏃堕棿鎴� 鐨勯潪鍙嶉鐐规暟鎹� diff --git a/collect/src/main/java/com/qianwen/mdc/collect/runner/InitRunner.java b/collect/src/main/java/com/qianwen/mdc/collect/runner/InitRunner.java index f72c9b3..4bc3d32 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/runner/InitRunner.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/runner/InitRunner.java @@ -15,6 +15,7 @@ import com.qianwen.mdc.collect.dto.CacheBuildDTO; import com.qianwen.mdc.collect.mapper.iotdb.DeviceStateMapper; import com.qianwen.mdc.collect.service.DeviceStateFixPointService; +import com.qianwen.mdc.collect.service.WorkstationAppMappingService; import cn.hutool.core.date.DatePattern; import cn.hutool.core.date.DateTime; @@ -29,9 +30,14 @@ private TimeSliceCache timeSliceCache; @Autowired private DeviceStateMapper deviceStateMapper; - + @Autowired + private WorkstationAppMappingService mappingService;; + @Override public void run(ApplicationArguments args) throws Exception { + + mappingService.saveToCache(); + //鐢熸垚鏃堕棿鍒囩墖 CacheBuildDTO cacheBuildDTO = CacheBuildDTO.builder().tenantIds(Sets.newHashSet(new String[]{"000000"})).targetDate(LocalDate.now()).build(); timeSliceCache.build(cacheBuildDTO); @@ -44,14 +50,15 @@ DateTime dateTime = DateTime.now(); log.info("杩涘叆绋嬪簭鍚姩鏍¢獙鏄惁瀛樺湪宸ヤ綅鎵撹繃鍥哄畾鐐�....... "); - long count = deviceStateMapper.fixPointCountByDate(Integer.valueOf(DatePattern.PURE_DATE_FORMAT.format(dateTime))); + Long count = deviceStateMapper.fixPointCountByDate(Integer.valueOf(DatePattern.PURE_DATE_FORMAT.format(dateTime))); /* Long result = this.workstationStateMapper.selectCount(Wrappers.<WorkstationState>lambdaQuery() .eq(WorkstationState::getFactoryDate, Integer.valueOf(DatePattern.PURE_DATE_FORMAT.format(dateTime))) .eq(WorkstationState::getIsFixPoint, Boolean.TRUE)); */ //Long result = 1L; - if (count <= 0) { + if(count == null || count == 0) { + //if (count <= 0) { log.info("璁惧鏈墦杩囬敋鐐�,鍚姩鏃舵墦鐐�....... "); //this.workStationStateFixPointService.workStationStateFixPoint(dateTime, null); stateFixPointService.deviceStateFixPoint(dateTime, null); diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java index 8c2d04b..f219f26 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java @@ -20,11 +20,15 @@ import org.springframework.stereotype.Service; import com.alibaba.fastjson.JSONObject; +import com.qianwen.core.tool.utils.ObjectUtil; import com.qianwen.mdc.collect.config.IotDBSessionConfig; import com.qianwen.mdc.collect.constants.IOTDBConstant; import com.qianwen.mdc.collect.domain.TelemetryData; import com.qianwen.mdc.collect.domain.TelemetryDataItem; +import com.qianwen.mdc.collect.mqtt.MqttMessageSender; import com.qianwen.mdc.collect.utils.redis.RedisUtil; + +import cn.hutool.json.JSONUtil; /** * 閲囬泦鏁版嵁澶勭悊鍏ュ簱 @@ -39,6 +43,13 @@ private IotDBSessionConfig iotdbConfig; @Autowired private IotDBCommonService iotDBCommonService; + @Autowired + private MqttMessageSender mqttMessageSender; + + /** + * 瀹炴椂鏁版嵁topic锛岃涓巑dc閲岄潰寰楃浉鍚� + */ + public static final String WOCKSTATION_REALTIMEDATA_TOPIC = "mdc/realtimedata"; private static String TEMPLATE_NAME = "process_param"; @@ -57,19 +68,56 @@ for (TelemetryData dt : telemetryDataList) { handleOneWorkstation(dt); + + sendRealtimeDataMsg(dt); } } + void sendRealtimeDataMsg(TelemetryData dt) { + if(ObjectUtil.isEmpty(dt.getDataItems())){ + return; + } + + List<TypedTelemetryData> dataList= new ArrayList<>(); + String propertyName; + for (TelemetryDataItem dataItem : dt.getDataItems()) { + + for (Map<String, String> point : dataItem.getDataPoints()) { + + String[] keys = point.keySet().toArray(new String[0]); + for(int i=0;i<keys.length;i++) { + TypedTelemetryData tpData = new TypedTelemetryData(); + propertyName = keys[i]; + tpData.setTime(dataItem.getTime()); + tpData.setName(propertyName); + tpData.setValue(point.get(propertyName)); + + dataList.add(tpData); + } + + } + + } + + //鍙戦�乵qtt娑堟伅锛岄�氱煡mdc娑堟伅鏉ヤ簡 + for(TypedTelemetryData item : dataList) { + JSONObject json = new JSONObject(); + json.put("workstationId",dt.getWorkstationId()); + json.put("name", item.getName()); + json.put("value", item.getValue()); + json.put("time", item.getTime()); + + mqttMessageSender.sendMessage(WOCKSTATION_REALTIMEDATA_TOPIC, json.toJSONString()); + } + + } /** * 澶勭悊涓�涓伐浣嶇殑鏁版嵁瑙f瀽鍏ュ簱 * @param dt */ void handleOneWorkstation(TelemetryData dt) { String deviceId;// = DB_PREFIX+TEMPLATE_NAME + "_" + dt.getWorkstationId(); - - // 鎸傝浇妯℃澘 - //iotDBCommonService.setTemmplateIsNotSet(TEMPLATE_NAME, deviceId); List<MeasurementSchema> schemas = new ArrayList<>(); @@ -109,10 +157,12 @@ try { iotdbConfig.getSessionPool().insertAlignedTablet(tablet); - updateLastParam(dt.getWorkstationId(),typeList); + //updateLastParam(dt.getWorkstationId(),typeList); } catch (Exception e) { log.error("IOTDB鍏ュ簱澶辫触",e); e.printStackTrace(); + }finally { + //iotdbConfig.getSessionPool().clo1se(); } } @@ -152,64 +202,62 @@ schemas.add(new MeasurementSchema("param_json", TSDataType.TEXT)); Tablet tablet = new Tablet("root.f2.last_process_param", schemas); - for(TypedTelemetryData tdata: typeList) { - - } - + String sql = "select update_time,workstation_id,param_json from root.f2.last_process_param where workstation_id="+workstationId; - SessionDataSetWrapper dsw = iotdbConfig.getSessionPool().executeQueryStatement(sql); - if(dsw.hasNext()) { - RowRecord rec = dsw.next(); - long time = rec.getTimestamp(); - - String paramJsonStr = rec.getFields().get(2).getStringValue(); - - tablet.rowSize = 1; - tablet.addTimestamp(0, time); - tablet.addValue("update_time", 0, updateTime); - tablet.addValue("workstation_id", 0, workstationId); - JSONObject paramObj = JSONObject.parseObject(paramJsonStr); - for(TypedTelemetryData tdata: typeList) { + try(SessionDataSetWrapper dsw = iotdbConfig.getSessionPool().executeQueryStatement(sql)){ + if(dsw.hasNext()) { + RowRecord rec = dsw.next(); + long time = rec.getTimestamp(); - if(paramObj.containsKey(tdata.getName())) { - JSONObject itemObj = paramObj.getJSONObject(tdata.getName()); - itemObj.put("value", tdata.getValue()); - itemObj.put("time", tdata.getTime());//閲囬泦鏃堕棿 - paramObj.put(tdata.getName(), itemObj); - }else { + String paramJsonStr = rec.getFields().get(2).getStringValue(); + + tablet.rowSize = 1; + tablet.addTimestamp(0, time); + tablet.addValue("update_time", 0, updateTime); + tablet.addValue("workstation_id", 0, workstationId); + JSONObject paramObj = JSONObject.parseObject(paramJsonStr); + for(TypedTelemetryData tdata: typeList) { + + if(paramObj.containsKey(tdata.getName())) { + JSONObject itemObj = paramObj.getJSONObject(tdata.getName()); + itemObj.put("value", tdata.getValue()); + itemObj.put("time", tdata.getTime());//閲囬泦鏃堕棿 + paramObj.put(tdata.getName(), itemObj); + }else { + JSONObject itemObj = new JSONObject(); + itemObj.put("value", tdata.getValue()); + itemObj.put("time", tdata.getTime());//閲囬泦鏃堕棿 + paramObj.put(tdata.getName(), itemObj); + + } + } + tablet.addValue("param_json", 0, paramObj.toJSONString()); + + this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet); + + }else { + //娌℃暟鎹紝鏂板姞鍏ヤ竴鏉� + tablet.rowSize = 1; + + tablet.addTimestamp(0, updateTime); + tablet.addValue("update_time", 0, updateTime); + tablet.addValue("workstation_id", 0, workstationId); + + JSONObject paramObj = new JSONObject(); + for(TypedTelemetryData tdata: typeList) { JSONObject itemObj = new JSONObject(); itemObj.put("value", tdata.getValue()); itemObj.put("time", tdata.getTime());//閲囬泦鏃堕棿 paramObj.put(tdata.getName(), itemObj); - } + + tablet.addValue("param_json", 0,paramObj.toJSONString()); + this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet); } - tablet.addValue("param_json", 0, paramObj.toJSONString()); - - this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet); - - }else { - //娌℃暟鎹紝鏂板姞鍏ヤ竴鏉� - tablet.rowSize = 1; - - tablet.addTimestamp(0, updateTime); - tablet.addValue("update_time", 0, updateTime); - tablet.addValue("workstation_id", 0, workstationId); - - JSONObject paramObj = new JSONObject(); - for(TypedTelemetryData tdata: typeList) { - JSONObject itemObj = new JSONObject(); - itemObj.put("value", tdata.getValue()); - itemObj.put("time", tdata.getTime());//閲囬泦鏃堕棿 - paramObj.put(tdata.getName(), itemObj); - } - - tablet.addValue("param_json", 0,paramObj.toJSONString()); - this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet); } - + //dsw.close(); } /** diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateAggregateService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateAggregateService.java index 460af15..a53d294 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateAggregateService.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateAggregateService.java @@ -98,6 +98,8 @@ ////浣跨敤workstationWcsFeedbackMapper鏇存柊鍙嶉寰楃姸鎬�,鎴戜滑涓嶉渶瑕� //IWorkstationFeedbackService workstationFeedbackService = SpringUtil.getBean(IWorkstationFeedbackService.class); workstationFeedbackService.handlerFeedbackComplete(workstationId); + + log.info("鑱氬悎鐘舵�佸畬鎴�:宸ヤ綅{}",workstationId); } diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/IotDBCommonService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/IotDBCommonService.java index 09ff961..fe8229d 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/service/IotDBCommonService.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/service/IotDBCommonService.java @@ -32,6 +32,7 @@ List<String> pathlist; try { pathlist = iotdbCfg.getSessionPool().showPathsTemplateSetOn(template); + //logger.info("pathlist"+pathlist); return pathlist.contains(path); } catch (StatementExecutionException|IoTDBConnectionException e) { logger.error("鑾峰彇妯℃澘浣跨敤閿欒",e); @@ -50,7 +51,7 @@ try { iotdbCfg.getSessionPool().setSchemaTemplate(template, deviceId); } catch (Exception e) { - e.printStackTrace(); + logger.error("鑾峰彇妯℃澘浣跨敤閿欒,template="+template+",deviceId="+deviceId,e); } } } diff --git a/collect/src/main/java/com/qianwen/mdc/collect/service/OutputAggregateService.java b/collect/src/main/java/com/qianwen/mdc/collect/service/OutputAggregateService.java index 7ebfa0b..2acc044 100644 --- a/collect/src/main/java/com/qianwen/mdc/collect/service/OutputAggregateService.java +++ b/collect/src/main/java/com/qianwen/mdc/collect/service/OutputAggregateService.java @@ -1,17 +1,12 @@ package com.qianwen.mdc.collect.service; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; -import org.apache.iotdb.isession.SessionDataSet; -import org.apache.iotdb.isession.pool.SessionDataSetWrapper; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.read.common.RowRecord; import org.apache.iotdb.tsfile.write.record.Tablet; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.slf4j.Logger; @@ -20,12 +15,10 @@ import org.springframework.stereotype.Service; import com.baomidou.dynamic.datasource.annotation.DS; -import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.qianwen.mdc.collect.config.IotDBSessionConfig; import com.qianwen.mdc.collect.constants.IOTDBConstant; import com.qianwen.mdc.collect.entity.iotdb.AggregateOutput; import com.qianwen.mdc.collect.entity.iotdb.Output; -import com.qianwen.mdc.collect.enums.WorkstationParamTypeEnum; import com.qianwen.mdc.collect.mapper.iotdb.OutputMapper; @DS("iotdb") diff --git a/collect/src/main/resources/application-dev.yml b/collect/src/main/resources/application-dev.yml index d326144..ba09542 100644 --- a/collect/src/main/resources/application-dev.yml +++ b/collect/src/main/resources/application-dev.yml @@ -24,14 +24,6 @@ keepalive: 10 connectionTimeout: 3000 #杩炴帴瓒呮椂鏃堕棿 -#绗笁鏂圭櫥闄� -social: - enabled: true - domain: http://127.0.0.1:1888 - - # rocketmq - rocketmq-name-server: 192.168.3.107:9876 - # mysql datasource: type: mysql @@ -45,7 +37,7 @@ driver: org.apache.iotdb.jdbc.IoTDBDriver host: 127.0.0.1 port: 6667 - maxSize: 300 + maxSize: 100 username: root password: root @@ -58,7 +50,7 @@ executor: appname: qwmdc-collect ip: - port: 38801 + port: 8802 ### xxl-job log path logpath: /qwlogs/xxl-job/jobhandler ### xxl-job log remain days diff --git a/collect/src/main/resources/logback.xml b/collect/src/main/resources/logback.xml index c252420..939d369 100644 --- a/collect/src/main/resources/logback.xml +++ b/collect/src/main/resources/logback.xml @@ -13,7 +13,7 @@ <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <!--鏃ュ織鏂囦欢杈撳嚭鐨勬枃浠跺悕--> - <FileNamePattern>${LOG_HOME}/qwcollect.log.%d{yyyy-MM-dd}.log</FileNamePattern> + <FileNamePattern>${LOG_HOME}/qwcollect.%d{yyyy-MM-dd}.log</FileNamePattern> <!--鏃ュ織鏂囦欢淇濈暀澶╂暟--> <MaxHistory>30</MaxHistory> </rollingPolicy> @@ -30,5 +30,6 @@ <!-- 鏃ュ織杈撳嚭绾у埆 --> <root level="INFO"> <appender-ref ref="STDOUT" /> + <appender-ref ref="FILE" /> </root> </configuration> -- Gitblit v1.9.3