yangys
2024-10-09 7ef593e1e3c35aaeecf9318f0b3941230d3ed002
增加在数据点计算规则后数据点名称加_n的适配
已修改10个文件
194 ■■■■■ 文件已修改
collect/pom.xml 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/handler/DeviceStatusDataHandler.java 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/job/FeedbackDealJob.java 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateAggregateNoFeedbackService.java 86 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java 69 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java 9 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/resources/application-dev.yml 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/DeviceStateMapper.xml 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collect/pom.xml
@@ -165,27 +165,22 @@
    </dependencies>
    <build>
        <finalName>collect-api</finalName>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <!--<configuration>
                <configuration>
                    <mainClass>com.qianwen.mdc.collect.MdcTansApplication</mainClass>
                    <layout>ZIP</layout>
                    <includes>
                         打包时包含核心模块依赖包,需手动指定
                        <include>
                            <groupId>nothing</groupId>
                            <artifactId>nothing</artifactId>
                        </include>
                        
                        <include>
                            <groupId>com.xxx</groupId>
                            <artifactId>common</artifactId>
                        </include>
                    </includes>
                </configuration>-->
                </configuration>
            </plugin>
            
            <plugin>
collect/src/main/java/com/qianwen/mdc/collect/controller/MqttController.java
@@ -41,7 +41,7 @@
    public void testRec2() {
        //数据格式:{"174":[{"values":{"d1":12},"ts":"1721978780449"}]} 174是应用id
        //多条格式:{"174":[{"values":{"DeviceStatus":2},"ts":"1722478128278"},{"values":{"SpindleSpeed":22},"ts":"1722478128281"}]}
        String payload = "{\"174\":[{\"values\":{\"DeviceStatus\":2,\"Output\":38},\"ts\":\""+System.currentTimeMillis()+"\"}]}";
        String payload = "{\"174\":[{\"values\":{\"DeviceStatus_n\":2,\"Output\":38},\"ts\":\""+System.currentTimeMillis()+"\"}]}";
        //payload = "{\"174\":[{\"values\":{\"Output\":11},\"ts\":\"1722478128278\"},{\"values\":{\"SpindleSpeed\":22},\"ts\":\"1722478128281\"}]}";
        recService.handle(payload);
    }
collect/src/main/java/com/qianwen/mdc/collect/handler/DeviceStatusDataHandler.java
@@ -70,9 +70,11 @@
    }
    
    int translateStatus(String statusVal) {
        int oriStatus = Integer.valueOf(statusVal);
        
        int result = oriStatus;
        /*
        //西门子828d,   cnc_run_status: 运行状态(0:RESET,1:STOP,2:HOLD,3:START,4:SPENDLE_CW_CCW,5:OTHER)
        switch(oriStatus) {
            case 3://START
@@ -94,7 +96,7 @@
        log.info("statusconvert,ori={},result={}",oriStatus,result);
        if(result == 0) {
            result = 2;//
        }
        }*/
        return result;
    }
}
collect/src/main/java/com/qianwen/mdc/collect/job/FeedbackDealJob.java
@@ -6,8 +6,6 @@
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.qianwen.mdc.collect.service.DeviceStateFixPointService;
import com.qianwen.mdc.collect.service.feedback.WorkstationFeedbackService;
import org.springframework.stereotype.Component;
collect/src/main/java/com/qianwen/mdc/collect/service/CollectDataService.java
@@ -4,7 +4,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;
import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
@@ -26,9 +25,6 @@
import com.qianwen.mdc.collect.domain.TelemetryData;
import com.qianwen.mdc.collect.domain.TelemetryDataItem;
import com.qianwen.mdc.collect.mqtt.MqttMessageSender;
import com.qianwen.mdc.collect.utils.redis.RedisUtil;
import cn.hutool.json.JSONUtil;
/**
 * 采集数据处理入库
@@ -129,7 +125,6 @@
        int rowIndex = 0;
        
        Map<String, List<TypedTelemetryData>> processParamsMap = parseTelemetryToTypedMapList(dt);
        
        String[] nameArr = processParamsMap.keySet().toArray(new String[0]);
        String name;
collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateAggregateNoFeedbackService.java
@@ -39,6 +39,8 @@
    private IotDBSessionConfig iotdbConfig;
    @Autowired
    private IotDBCommonService iotDBCommonService;
    private static final int MAX_COUNT = 1000;
    public List<AggregateState> stateAggregateForSpecialTimeRange(Long workstationId, StateAggregateTimeDTO timeRange, List<DeviceState> effectiveStateList) {
        //按timeRange查询时间区间内的状态数据,除了已删除的,其他数据都查出来了
@@ -62,14 +64,6 @@
     * @param effectTimeRangeList
     */
    public void handlerAggregateState(List<AggregateState> result, Long workstationId, StateAggregateTimeDTO timeRange) {
        /*
        if (Func.isNotEmpty(result)) {
            Map<String, List> stringListMap = CommonUtil.groupList(getFinallyAggregateStateList(result, workstationId, effectTimeRangeList), CommonConstant.MAX_RECORDS_FOR_SQL_LENGTH.intValue());
            stringListMap.forEach(k, v -> {
                this.workstationAggregateStateMapper.batchSave(workstationId, v);
            });
        }*/
        if(result.isEmpty()) {
            return;
        }
@@ -93,7 +87,7 @@
        schemas.add(new MeasurementSchema("rps", TSDataType.INT32));
        //schemas.add(new MeasurementSchema("is_sync", TSDataType.BOOLEAN));
        schemas.add(new MeasurementSchema("is_plan", TSDataType.INT32));//TODO 这个属性应该是GlobalWcsOfRps中的值,如何填写?
        //schemas.add(new MeasurementSchema("feedback_id", TSDataType.INT64));
        schemas.add(new MeasurementSchema("is_deleted", TSDataType.BOOLEAN));
        schemas.add(new MeasurementSchema("employee_id", TSDataType.INT64));
        
@@ -104,41 +98,63 @@
        
        Tablet tablet = new Tablet(deviceId, schemas);
        tablet.rowSize = aggStates.size();
        AggregateState aggState;
        int tblIndex = 0;
        for(int i=0;i<aggStates.size();i++) {
            aggState = aggStates.get(i);
            tablet.addTimestamp(i, aggState.getTime());
            tablet.addValue("workstation_id", i, aggState.getWorkstationId());
            tablet.addValue("value_collect", i, aggState.getValueCollect());
            tablet.addValue("end_time", i, aggState.getEndTime());
            tablet.addValue("duration_collect", i, aggState.getDurationCollect());
            tablet.addValue("calendar_code", i, aggState.getCalendarCode());
            tablet.addValue("factory_year", i, aggState.getFactoryYear());
            tablet.addValue("factory_month", i, aggState.getFactoryMonth());
            tablet.addValue("factory_week", i, aggState.getFactoryWeek());
            tablet.addValue("factory_date", i, aggState.getFactoryDate());
            tablet.addValue("shift_index", i, aggState.getShiftIndex());
            tablet.addValue("shift_time_type", i, aggState.getShiftTimeType());
            tablet.addValue("wcs", i, aggState.getWcs());
            tablet.addValue("rps", i, aggState.getRps());
            tablet.addTimestamp(tblIndex, aggState.getTime());
            tablet.addValue("workstation_id", tblIndex, aggState.getWorkstationId());
            tablet.addValue("value_collect", tblIndex, aggState.getValueCollect());
            tablet.addValue("end_time", tblIndex, aggState.getEndTime());
            tablet.addValue("duration_collect", tblIndex, aggState.getDurationCollect());
            tablet.addValue("calendar_code", tblIndex, aggState.getCalendarCode());
            tablet.addValue("factory_year", tblIndex, aggState.getFactoryYear());
            tablet.addValue("factory_month", tblIndex, aggState.getFactoryMonth());
            tablet.addValue("factory_week", tblIndex, aggState.getFactoryWeek());
            tablet.addValue("factory_date", tblIndex, aggState.getFactoryDate());
            tablet.addValue("shift_index", tblIndex, aggState.getShiftIndex());
            tablet.addValue("shift_time_type", tblIndex, aggState.getShiftTimeType());
            tablet.addValue("wcs", tblIndex, aggState.getWcs());
            tablet.addValue("rps", tblIndex, aggState.getRps());
            
            tablet.addValue("is_plan", i, aggState.getIsPlan());
            //tablet.addValue("feedback_id", i, aggState.getFeedbackId());
            tablet.addValue("is_deleted", i, aggState.getIsDeleted());
            tablet.addValue("employee_id", i, aggState.getEmployeeId());
            tablet.addValue("is_plan", tblIndex, aggState.getIsPlan());
            tablet.addValue("is_deleted", tblIndex, aggState.getIsDeleted());
            tablet.addValue("employee_id", tblIndex, aggState.getEmployeeId());
            tblIndex++;
            if(tblIndex >= MAX_COUNT) {
                try {
                    //每个工位批量插入一次数据
                    this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
                    log.info("保存聚合状态完成tblIndex={}",tblIndex);
                    tablet.reset();
                    tblIndex = 0;
                } catch (Exception e) {
                    log.error("保存固定点数据异常",e);
                }
            }
            
        }
        try {
            this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
            log.info("保存聚合状态完成");
        } catch (Exception e) {
            log.error("保存聚合状态数据异常",e);
        }
        if(tblIndex > 0) {
            try {
                //每个工位批量插入一次数据
                this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
                log.info("保存聚合状态完成finaltblIndex={}",tblIndex);
                tablet.reset();
                tblIndex = 0;
            } catch (Exception e) {
                log.error("保存固定点数据异常",e);
            }
        }
            
    }
    
    private List<AggregateState> getFinallyAggregateStateList(List<AggregateState> result, Long workstationId, StateAggregateTimeDTO timeRange) {
        /*
        List<StateAggregateTimeDTO> effectTimeRangeList2 = effectTimeRangeList.stream().filter(x -> {
collect/src/main/java/com/qianwen/mdc/collect/service/DeviceStateService.java
@@ -94,9 +94,68 @@
            
            Tablet tablet = new Tablet(deviceId, schemas);
            
            states = entry.getValue();
            tablet.rowSize = states.size();
            DeviceState state;
            final int MAX_COUNT = 1000;
            //int currentIdx = 0;
            int tblIndex = 0;
            for(int i=0; i < states.size(); i++) {
                state = states.get(i);
                tablet.addTimestamp(tblIndex, state.getTime());
                tablet.addValue("workstation_id", tblIndex, state.getWorkstationId());
                tablet.addValue("value_collect", tblIndex, state.getValueCollect());
                tablet.addValue("calendar_code", tblIndex, state.getCalendarCode());
                tablet.addValue("factory_year", tblIndex, state.getFactoryYear());
                tablet.addValue("factory_month", tblIndex, state.getFactoryMonth());
                tablet.addValue("factory_week", tblIndex, state.getFactoryWeek());
                tablet.addValue("factory_date", tblIndex, state.getFactoryDate());
                tablet.addValue("shift_index", tblIndex, state.getShiftIndex());
                tablet.addValue("shift_time_type", tblIndex, state.getShiftTimeType());
                tablet.addValue("wcs", tblIndex, state.getWcs());
                tablet.addValue("rps", tblIndex, state.getRps());
                tablet.addValue("is_fix_point", tblIndex, state.getIsFixPoint());
                tablet.addValue("is_sync", tblIndex, state.getIsSync());
                tablet.addValue("is_plan", tblIndex, state.getIsPlan());
                tablet.addValue("feedback_point_type", tblIndex, state.getFeedbackPointType());
                tablet.addValue("feedback_id", tblIndex, state.getFeedbackId());
                tablet.addValue("is_deleted", tblIndex, state.getIsDeleted());
                tablet.addValue("employee_id", tblIndex, state.getEmployeeId());
                tblIndex++;
                if(tblIndex >= MAX_COUNT) {
                    try {
                        //每个工位批量插入一次数据
                        this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
                        log.info("保存设备状态完成");
                        tablet.reset();
                        tblIndex = 0;
                    } catch (Exception e) {
                        log.error("保存固定点数据异常",e);
                    }
                }
            }
            if(tblIndex > 0) {
                try {
                    //每个工位批量插入一次数据
                    this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
                    log.info("保存设备状态完成2");
                    tablet.reset();
                    tblIndex = 0;
                } catch (Exception e) {
                    log.error("保存固定点数据异常",e);
                }
            }
            /*
            for(int i=0;i<states.size();i++) {
                state = states.get(i);
                tablet.addTimestamp(i, state.getTime());
@@ -119,14 +178,8 @@
                tablet.addValue("feedback_id", i, state.getFeedbackId());
                tablet.addValue("is_deleted", i, state.getIsDeleted());
                tablet.addValue("employee_id", i, state.getEmployeeId());
            }
            try {
                //每个工位批量插入一次数据
                this.iotdbConfig.getSessionPool().insertAlignedTablet(tablet);
            } catch (Exception e) {
                log.error("保存固定点数据异常",e);
            }
            }*/
            
        }
    }
collect/src/main/java/com/qianwen/mdc/collect/service/IOTMqttReceiveService.java
@@ -5,6 +5,7 @@
import java.util.Optional;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -55,12 +56,14 @@
        Set<String> keySet = jsonObj.keySet();
        String[] keys = keySet.toArray(new String[] {});
        
        final String NEWDP_SUFFIX = "_n";//计算规则使用之后新数据点的结尾
        for(String key : keys) {
            String appId = key;//iot系统中的应用id,本应用中应该用表去对应
            long workstationId = getWorkstationIdByAppId(appId);
            
            TelemetryData tdata = new TelemetryData();
            tdata.setWorkstationId(workstationId);
            
            JSONArray dtArr = jsonObj.getJSONArray(appId);
            for(int i=0;i<dtArr.size();i++) {
@@ -74,7 +77,11 @@
                
                Set<String> valueKeySet = values.keySet();
                valueKeySet.forEach(valueKey ->{
                    tdataItem.addPoint(valueKey,values.getString(valueKey));
                    String oriValueKey = valueKey;;//由于使用计算规则的采集点名称会后面增加一个"_n",所以这个oriValueKey代表没有增加"_n"的
                    if(StringUtils.endsWith(valueKey, NEWDP_SUFFIX)) {
                        oriValueKey = StringUtils.removeEnd(valueKey, NEWDP_SUFFIX);
                    }
                    tdataItem.addPoint(oriValueKey,values.getString(valueKey));//使用原始配置点保持保存数据
                });
                
                tdata.addItem(tdataItem);
collect/src/main/resources/application-dev.yml
@@ -35,7 +35,7 @@
#iotdb 以及其jdbc一起配置
iotdb:
  driver: org.apache.iotdb.jdbc.IoTDBDriver
  host: 127.0.0.1
  host: 120.46.212.231
  port: 6667
  maxSize: 100
  username: root
collect/src/main/resources/com/qianwen/mdc/collect/mapper/iotdb/DeviceStateMapper.xml
@@ -73,13 +73,13 @@
    </sql>
    <!--  resultType="com.qianwen.mdc.collect.entity.iotdb.DeviceState" -->
    <select id="lastSyncedNoFeedbackPointState" resultMap="BaseResultMap">
        select <include refid="all_columns" /> FROM root.f2.aggregate_state_${workstationId}
        select <include refid="all_columns" /> FROM root.f2.state_${workstationId}
        where is_sync=true and is_fix_point=false and feedback_point_type=0 and is_deleted=false
        order by time desc limit 1
    </select>
    
    <select id="firstNotSyncedNofeedbackPointState" resultMap="BaseResultMap">
        select <include refid="all_columns" /> FROM root.f2.aggregate_state_${workstationId}
        select <include refid="all_columns" /> FROM root.f2.state_${workstationId}
        where is_sync=false and is_fix_point=false and feedback_point_type=0 and is_deleted=false
        order by time asc limit 1
    </select>