| | |
| | | import java.util.HashMap; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Set; |
| | | import java.util.Random; |
| | | import java.util.stream.Collectors; |
| | | |
| | | import org.apache.commons.lang3.tuple.Pair; |
| | | import org.apache.iotdb.isession.pool.SessionDataSetWrapper; |
| | | import org.apache.iotdb.rpc.IoTDBConnectionException; |
| | | import org.apache.iotdb.rpc.StatementExecutionException; |
| | | import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; |
| | | import org.apache.iotdb.tsfile.read.common.RowRecord; |
| | | import org.apache.iotdb.tsfile.write.record.Tablet; |
| | | import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; |
| | | import org.slf4j.Logger; |
| | |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.stereotype.Service; |
| | | |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.qianwen.mdc.collect.config.IotDBSessionConfig; |
| | | import com.qianwen.mdc.collect.domain.TelemetryData; |
| | | import com.qianwen.mdc.collect.domain.TelemetryDataItem; |
| | | import com.qianwen.mdc.collect.entity.iotdb.CollectData; |
| | | import com.qianwen.mdc.collect.utils.redis.RedisUtil; |
| | | |
| | | /** |
| | |
| | | private IotDBSessionConfig iotdbConfig; |
| | | @Autowired |
| | | private IotDBCommonService iotDBCommonService; |
| | | @Autowired |
| | | private RedisUtil redisUtil; |
| | | |
| | | private static String TEMPLATE_NAME = "process_param"; |
| | | |
| | |
| | | PROCESS_PARAM_MAP.put(7, "ALARM"); |
| | | } |
| | | |
| | | // TelemetryDataMessage telemetryDataMessage |
| | | |
| | | public void handleCollectData(List<TelemetryData> telemetryDataList) { |
| | | |
| | | for (TelemetryData dt : telemetryDataList) { |
| | |
| | | List<TypedTelemetryData> typeList = processParamsMap.get(name); |
| | | |
| | | deviceId = generateDeviceId(dt.getWorkstationId(),name); |
| | | System.out.println("deivcdid="+deviceId); |
| | | iotDBCommonService.setTemmplateIsNotSet(TEMPLATE_NAME, deviceId); |
| | | //System.out.println("deivcdid="+deviceId); |
| | | iotDBCommonService.setTemmplateIfNotSet(TEMPLATE_NAME, deviceId); |
| | | Tablet tablet = new Tablet(deviceId, schemas); |
| | | for(TypedTelemetryData tdata : typeList) { |
| | | rowIndex = tablet.rowSize++; |
| | |
| | | |
| | | try { |
| | | iotdbConfig.getSessionPool().insertAlignedTablet(tablet); |
| | | } catch (Exception e) { |
| | | log.error("IOTDB入库失败",e); |
| | | e.printStackTrace(); |
| | | } |
| | | //System.out.println(typeList); |
| | | } |
| | | /* |
| | | for (TelemetryDataItem dataItem : dt.getDataItems()) { |
| | | |
| | | for (Map<String, String> point : dataItem.getDataPoints()) { |
| | | |
| | | String[] keys = point.keySet().toArray(new String[0]); |
| | | for(int i=0;i<keys.length;i++) { |
| | | rowIndex = tablet.rowSize++; |
| | | tablet.addTimestamp(rowIndex, dataItem.getTime()); |
| | | tablet.addValue("workstation_id",rowIndex,new Long(dt.getWorkstationId())); |
| | | propertyName = keys[i]; |
| | | deviceId = generateDeviceId(dt.getWorkstationId(),propertyName);//DB_PREFIX+TEMPLATE_NAME + "_" + dt.getWorkstationId()+"_"+propertyName; |
| | | tablet.addValue("n",rowIndex,propertyName); |
| | | tablet.addValue("v",rowIndex,point.get(propertyName)); |
| | | } |
| | | |
| | | } |
| | | |
| | | try { |
| | | iotdbConfig.getSessionPool().insertAlignedTablet(tablet); |
| | | |
| | | updateLastParam(dt.getWorkstationId(),typeList); |
| | | } catch (Exception e) { |
| | | log.error("IOTDB入库失败",e); |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | */ |
| | | |
| | | |
| | | } |
| | | |
| | | /** |
| | | * 填充最新参数的数据 |
| | | * @param typeList |
| | | * @throws StatementExecutionException |
| | | * @throws IoTDBConnectionException |
| | | */ |
| | | void updateLastParam(long workstationId,List<TypedTelemetryData> typeList) throws IoTDBConnectionException, StatementExecutionException { |
| | | if(typeList.isEmpty()){ |
| | | return; |
| | | } |
| | | long updateTime = typeList.get(0).getTime(); |
| | | List<MeasurementSchema> schemas = new ArrayList<>(); |
| | | |
| | | schemas.add(new MeasurementSchema("update_time", TSDataType.INT64)); |
| | | schemas.add(new MeasurementSchema("workstation_id", TSDataType.INT64)); |
| | | schemas.add(new MeasurementSchema("param_json", TSDataType.TEXT)); |
| | | |
| | | Tablet tablet = new Tablet("root.f2.last_process_param", schemas); |
| | | for(TypedTelemetryData tdata: typeList) { |
| | | |
| | | } |
| | | |
| | | |
| | | String sql = "select update_time,workstation_id,param_json from root.f2.last_process_param where workstation_id="+workstationId; |
| | | SessionDataSetWrapper dsw = iotdbConfig.getSessionPool().executeQueryStatement(sql); |
| | | |
| | | if(dsw.hasNext()) { |
| | | RowRecord rec = dsw.next(); |
| | | long time = rec.getTimestamp(); |
| | | |
| | | String paramJsonStr = rec.getFields().get(2).getStringValue(); |
| | | |
| | | tablet.rowSize = 1; |
| | | tablet.addTimestamp(0, time); |
| | | tablet.addValue("update_time", 0, updateTime); |
| | | tablet.addValue("workstation_id", 0, workstationId); |
| | | JSONObject paramObj = JSONObject.parseObject(paramJsonStr); |
| | | for(TypedTelemetryData tdata: typeList) { |
| | | |
| | | if(paramObj.containsKey(tdata.getName())) { |
| | | JSONObject itemObj = paramObj.getJSONObject(tdata.getName()); |
| | | itemObj.put("value", tdata.getValue()); |
| | | itemObj.put("time", tdata.getTime());//采集时间 |
| | | paramObj.put(tdata.getName(), itemObj); |
| | | }else { |
| | | JSONObject itemObj = new JSONObject(); |
| | | itemObj.put("value", tdata.getValue()); |
| | | itemObj.put("time", tdata.getTime());//采集时间 |
| | | paramObj.put(tdata.getName(), itemObj); |
| | | |
| | | } |
| | | } |
| | | tablet.addValue("param_json", 0, paramObj.toJSONString()); |
| | | |
| | | this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet); |
| | | |
| | | }else { |
| | | //没数据,新加入一条 |
| | | tablet.rowSize = 1; |
| | | |
| | | tablet.addTimestamp(0, updateTime); |
| | | tablet.addValue("update_time", 0, updateTime); |
| | | tablet.addValue("workstation_id", 0, workstationId); |
| | | |
| | | JSONObject paramObj = new JSONObject(); |
| | | for(TypedTelemetryData tdata: typeList) { |
| | | JSONObject itemObj = new JSONObject(); |
| | | itemObj.put("value", tdata.getValue()); |
| | | itemObj.put("time", tdata.getTime());//采集时间 |
| | | paramObj.put(tdata.getName(), itemObj); |
| | | } |
| | | |
| | | tablet.addValue("param_json", 0,paramObj.toJSONString()); |
| | | this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet); |
| | | } |
| | | |
| | | } |
| | | |
| | | /** |
| | | * 解析数据,形成name - value对类型的数据列表,并用name进行分组 |
| | | * @param dt |
| | | * @return 按name分组后的数据 |
| | | */ |
| | | Map<String, List<TypedTelemetryData>> parseTelemetryToTypedMapList(TelemetryData dt){ |
| | | List<TypedTelemetryData> list = new ArrayList<>(); |
| | | |