/** * BladeX Commercial License Agreement * Copyright (c) 2018-2099, https://bladex.cn. All rights reserved. *

* Use of this software is governed by the Commercial License Agreement * obtained after purchasing a license from BladeX. *

* 1. This software is for development use only under a valid license * from BladeX. *

* 2. Redistribution of this software's source code to any third party * without a commercial license is strictly prohibited. *

* 3. Licensees may copyright their own code but cannot use segments * from this software for such purposes. Copyright of this software * remains with BladeX. *

* Using this software signifies agreement to this License, and the software * must not be used for illegal purposes. *

* THIS SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY. The author is * not liable for any claims arising from secondary or illegal development. *

* Author: Chill Zhuang (bladejava@qq.com) */ package org.springblade.job.service.impl; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import lombok.AllArgsConstructor; import org.springblade.core.log.exception.ServiceException; import org.springblade.core.mp.base.BaseServiceImpl; import org.springblade.core.powerjob.constant.PowerJobConstant; import org.springblade.core.tool.jackson.JsonUtil; import org.springblade.core.tool.utils.ConvertUtil; import org.springblade.core.tool.utils.DateUtil; import org.springblade.core.tool.utils.Func; import org.springblade.core.tool.utils.StringPool; import org.springblade.job.pojo.dto.JobDTO; import org.springblade.job.pojo.entity.JobInfo; import org.springblade.job.pojo.entity.JobServer; import org.springblade.job.mapper.JobInfoMapper; import org.springblade.job.service.IJobInfoService; import org.springblade.job.service.IJobServerService; import org.springblade.job.pojo.vo.JobInfoVO; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import tech.powerjob.client.PowerJobClient; import tech.powerjob.common.enums.DispatchStrategy; import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.ProcessorType; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.model.AlarmConfig; import tech.powerjob.common.model.LifeCycle; import tech.powerjob.common.model.LogConfig; import tech.powerjob.common.request.http.SaveJobInfoRequest; import tech.powerjob.common.response.JobInfoDTO; import tech.powerjob.common.response.ResultDTO; import java.math.BigDecimal; import java.util.*; import java.util.stream.Collectors; /** * 任务信息表 服务实现类 * * @author BladeX */ @Service @AllArgsConstructor public class JobInfoServiceImpl extends BaseServiceImpl implements IJobInfoService { private final IJobServerService jobServerService; @Override public IPage selectJobInfoPage(IPage page, JobInfoVO jobInfo) { return page.setRecords(baseMapper.selectJobInfoPage(page, jobInfo)); } @Override @Transactional(rollbackFor = Exception.class) public Boolean submitAndSync(JobInfo jobInfo) { //获取应用分组服务端信息 JobServer jobServer = jobServerService.getById(jobInfo.getJobServerId()); //构建Job客户端 PowerJobClient client = new PowerJobClient(jobServer.getJobServerUrl(), jobServer.getJobAppName(), jobServer.getJobAppPassword()); SaveJobInfoRequest request = convertToServer(jobInfo); //获取上传结果 ResultDTO result = client.saveJob(request); if (result.isSuccess()) { jobInfo.setJobId(result.getData()); return this.saveOrUpdate(jobInfo); } else { throw new ServiceException(result.getMessage()); } } @Override @Transactional(rollbackFor = Exception.class) public Boolean removeAndSync(List ids) { ids.forEach(id -> { JobDTO jobDTO = JobData(id); if (Func.isNotEmpty(jobDTO)) { JobInfo jobInfo = jobDTO.getJobInfo(); PowerJobClient powerJobClient = jobDTO.getPowerJobClient(); //删除服务数据 ResultDTO result = powerJobClient.deleteJob(jobInfo.getJobId()); if (result.isSuccess()) { this.removeById(id); } else { throw new ServiceException(result.getMessage()); } } }); return true; } @Override public Boolean changeServerJob(Long id, Integer enable) { JobDTO jobDTO = JobData(id); if (Func.isNotEmpty(jobDTO)) { JobInfo jobInfo = jobDTO.getJobInfo(); PowerJobClient powerJobClient = jobDTO.getPowerJobClient(); //更换服务端状态 ResultDTO result = (enable == PowerJobConstant.JOB_ENABLED) ? powerJobClient.enableJob(jobInfo.getJobId()) : powerJobClient.disableJob(jobInfo.getJobId()); //删除客户端数据 if (result.isSuccess()) { return this.update(Wrappers.update().lambda().set(JobInfo::getEnable, enable).eq(JobInfo::getId, id)); } else { throw new ServiceException(result.getMessage()); } } return false; } @Override public Boolean runServerJob(Long id) { JobDTO jobDTO = JobData(id); if (Func.isNotEmpty(jobDTO)) { JobInfo jobInfo = jobDTO.getJobInfo(); PowerJobClient powerJobClient = jobDTO.getPowerJobClient(); ResultDTO result = powerJobClient.runJob(jobInfo.getJobId()); return result.isSuccess(); } return false; } @Override @Transactional(rollbackFor = Exception.class) public Boolean sync() { //任务信息列表 List jobInfos = this.list(); //任务服务列表 List jobServers = jobServerService.list(); //按应用分组 Map> jobGroups = jobInfos.stream().collect(Collectors.groupingBy(JobInfo::getJobServerId)); //处理服务端数据下载 jobServers.forEach(jobServer -> { //构建Job客户端 PowerJobClient client = new PowerJobClient(jobServer.getJobServerUrl(), jobServer.getJobAppName(), jobServer.getJobAppPassword()); //从服务端获取数据 List serverInfoList = Optional.ofNullable(client.fetchAllJob()) .filter(ResultDTO::isSuccess) .map(ResultDTO::getData) .orElseGet(ArrayList::new); //获取客户端数据 List localInfoList = jobGroups.get(jobServer.getId()); //处理需要从服务端下载的数据 List jobInfoDTOList = serverInfoList.stream() .filter(serverData -> serverData.getStatus() != PowerJobConstant.JOB_DELETED) .filter(serverData -> Func.isEmpty(localInfoList) || localInfoList.stream().noneMatch(localData -> Func.equalsSafe(localData.getJobId(), serverData.getId()))) .collect(Collectors.toList()); List dataToDownload = convertToLocalList(jobInfoDTOList, jobServer.getId()); //调用本地Service保存数据 this.saveBatch(dataToDownload); }); //处理客户端数据上传 jobGroups.forEach((jobServerId, localInfoList) -> { //获取应用分组服务端信息 JobServer jobServer = jobServers.stream().filter(js -> Func.equalsSafe(js.getId(), jobServerId)) .findFirst().orElseThrow(() -> new ServiceException(PowerJobConstant.JOB_SYNC_ALERT)); //构建Job客户端 PowerJobClient client = new PowerJobClient(jobServer.getJobServerUrl(), jobServer.getJobAppName(), jobServer.getJobAppPassword()); //处理需要上传到服务端的数据 localInfoList.forEach(localData -> { //转换数据格式 SaveJobInfoRequest data = convertToServer(localData); //调用OpenAPI接口上传数据 ResultDTO saveResult = client.saveJob(data); if (saveResult.isSuccess()) { //更新服务端JobId至客户端 this.update(Wrappers.update().lambda().set(JobInfo::getJobId, saveResult.getData()).eq(JobInfo::getId, localData.getId())); } else { throw new ServiceException(saveResult.getMessage()); } }); }); return true; } /** * 获取Job数据集合 * * @param jobInfoId 服务信息ID * @return PowerJobClient */ public JobDTO JobData(Long jobInfoId) { //构建DTO类 JobDTO jobDTO = new JobDTO(); //获取任务信息 JobInfo jobInfo = this.getById(jobInfoId); jobDTO.setJobInfo(jobInfo); if (Func.isEmpty(jobInfo.getJobId())) { throw new ServiceException(PowerJobConstant.JOB_SYNC_ALERT); } if (Func.isNotEmpty(jobInfo.getJobServerId())) { //获取应用分组服务端信息 JobServer jobServer = jobServerService.getById(jobInfo.getJobServerId()); jobDTO.setJobServer(jobServer); //构建Job客户端 PowerJobClient powerJobClient = new PowerJobClient(jobServer.getJobServerUrl(), jobServer.getJobAppName(), jobServer.getJobAppPassword()); jobDTO.setPowerJobClient(powerJobClient); return jobDTO; } return null; } /** * 服务端Job列表转换 * * @param jobInfoList 本地任务信息列表 * @return List */ public List convertToServerList(List jobInfoList) { return jobInfoList.stream().map(this::convertToServer).collect(Collectors.toList()); } /** * 本地Job列表转换 * * @param jobInfoDTOList 服务端任务信息列表 * @return List */ public List convertToLocalList(List jobInfoDTOList, Long jobServerId) { return jobInfoDTOList.stream().map(jobInfoDTO -> convertToLocal(jobInfoDTO, jobServerId)).collect(Collectors.toList()); } /** * 服务端Job单个转换 * * @param jobInfo 本地任务信息 * @return SaveJobInfoRequest */ public SaveJobInfoRequest convertToServer(JobInfo jobInfo) { SaveJobInfoRequest saveJobInfoRequest = new SaveJobInfoRequest(); if (Func.toLong(jobInfo.getJobId()) > 0L) { saveJobInfoRequest.setId(jobInfo.getJobId()); } saveJobInfoRequest.setJobName(jobInfo.getJobName()); saveJobInfoRequest.setJobDescription(jobInfo.getJobDescription()); saveJobInfoRequest.setJobParams(jobInfo.getJobParams()); saveJobInfoRequest.setTimeExpressionType(TimeExpressionType.of(jobInfo.getTimeExpressionType())); saveJobInfoRequest.setTimeExpression(jobInfo.getTimeExpression()); saveJobInfoRequest.setExecuteType(ExecuteType.of(jobInfo.getExecuteType())); saveJobInfoRequest.setProcessorType(ProcessorType.of(jobInfo.getProcessorType())); saveJobInfoRequest.setProcessorInfo(jobInfo.getProcessorInfo()); saveJobInfoRequest.setMaxInstanceNum(jobInfo.getMaxInstanceNum()); saveJobInfoRequest.setConcurrency(jobInfo.getConcurrency()); saveJobInfoRequest.setInstanceTimeLimit(jobInfo.getInstanceTimeLimit()); saveJobInfoRequest.setInstanceRetryNum(jobInfo.getInstanceRetryNum()); saveJobInfoRequest.setTaskRetryNum(jobInfo.getTaskRetryNum()); saveJobInfoRequest.setMinCpuCores(jobInfo.getMinCpuCores().doubleValue()); saveJobInfoRequest.setMinMemorySpace(jobInfo.getMinMemorySpace().doubleValue()); saveJobInfoRequest.setMinDiskSpace(jobInfo.getMinDiskSpace().doubleValue()); saveJobInfoRequest.setDesignatedWorkers(jobInfo.getDesignatedWorkers()); saveJobInfoRequest.setMaxWorkerCount(jobInfo.getMaxWorkerCount()); saveJobInfoRequest.setNotifyUserIds(Func.toLongList(jobInfo.getNotifyUserIds())); saveJobInfoRequest.setEnable(jobInfo.getEnable() == 1); saveJobInfoRequest.setDispatchStrategy(DispatchStrategy.of(jobInfo.getDispatchStrategy())); saveJobInfoRequest.setAlarmConfig(new AlarmConfig(jobInfo.getAlertThreshold(), jobInfo.getStatisticWindowLen(), jobInfo.getSilenceWindowLen())); saveJobInfoRequest.setLogConfig(new LogConfig().setLevel(jobInfo.getLogLevel()).setType(jobInfo.getLogType())); if (Func.isNotEmpty(jobInfo.getLifecycle())) { LifeCycle lifeCycle = new LifeCycle(); String[] lifeCycleArr = Func.toStrArray(jobInfo.getLifecycle()); lifeCycle.setStart(DateUtil.parse(lifeCycleArr[0], DateUtil.PATTERN_DATETIME).getTime()); lifeCycle.setEnd(DateUtil.parse(lifeCycleArr[1], DateUtil.PATTERN_DATETIME).getTime()); saveJobInfoRequest.setLifeCycle(lifeCycle); } saveJobInfoRequest.setExtra(jobInfo.getExtra()); return saveJobInfoRequest; } /** * 本地Job单个转换 * * @param jobInfoDTO 服务端任务信息 * @return SaveJobInfoRequest */ public JobInfo convertToLocal(JobInfoDTO jobInfoDTO, Long jobServerId) { JobInfo jobInfo = new JobInfo(); jobInfo.setJobServerId(jobServerId); jobInfo.setJobId(jobInfoDTO.getId()); jobInfo.setJobName(jobInfoDTO.getJobName()); jobInfo.setJobDescription(jobInfoDTO.getJobDescription()); jobInfo.setJobParams(jobInfoDTO.getJobParams()); jobInfo.setTimeExpressionType(jobInfoDTO.getTimeExpressionType()); jobInfo.setTimeExpression(jobInfoDTO.getTimeExpression()); jobInfo.setExecuteType(jobInfoDTO.getExecuteType()); jobInfo.setProcessorType(jobInfoDTO.getProcessorType()); jobInfo.setProcessorInfo(jobInfoDTO.getProcessorInfo()); jobInfo.setMaxInstanceNum(jobInfoDTO.getMaxInstanceNum()); jobInfo.setConcurrency(jobInfoDTO.getConcurrency()); jobInfo.setInstanceTimeLimit(jobInfoDTO.getInstanceTimeLimit()); jobInfo.setInstanceRetryNum(jobInfoDTO.getInstanceRetryNum()); jobInfo.setTaskRetryNum(jobInfoDTO.getTaskRetryNum()); jobInfo.setMinCpuCores(ConvertUtil.convert(jobInfoDTO.getMinCpuCores(), BigDecimal.class)); jobInfo.setMinMemorySpace(ConvertUtil.convert(jobInfoDTO.getMinMemorySpace(), BigDecimal.class)); jobInfo.setMinDiskSpace(ConvertUtil.convert(jobInfoDTO.getMinDiskSpace(), BigDecimal.class)); jobInfo.setDesignatedWorkers(jobInfoDTO.getDesignatedWorkers()); jobInfo.setMaxWorkerCount(jobInfoDTO.getMaxWorkerCount()); jobInfo.setNotifyUserIds(jobInfoDTO.getNotifyUserIds()); jobInfo.setEnable(jobInfoDTO.getStatus()); jobInfo.setDispatchStrategy(jobInfoDTO.getDispatchStrategy()); if (Func.isNotEmpty(jobInfoDTO.getLifecycle()) && !Func.equalsSafe(jobInfoDTO.getLifecycle(), StringPool.EMPTY_JSON)) { LifeCycle lifeCycle = JsonUtil.parse(jobInfoDTO.getLifecycle(), LifeCycle.class); String start = DateUtil.format(new Date(lifeCycle.getStart()), DateUtil.PATTERN_DATETIME); String end = DateUtil.format(new Date(lifeCycle.getEnd()), DateUtil.PATTERN_DATETIME); jobInfo.setLifecycle(start + StringPool.COMMA + end); } if (Func.isNotEmpty(jobInfoDTO.getAlarmConfig())) { jobInfo.setAlertThreshold(jobInfoDTO.getAlarmConfig().getAlertThreshold()); jobInfo.setStatisticWindowLen(jobInfoDTO.getAlarmConfig().getStatisticWindowLen()); jobInfo.setSilenceWindowLen(jobInfoDTO.getAlarmConfig().getSilenceWindowLen()); } if (Func.isNotEmpty(jobInfoDTO.getLogConfig())) { jobInfo.setLogType(jobInfoDTO.getLogConfig().getType()); jobInfo.setLogLevel(jobInfoDTO.getLogConfig().getLevel()); } jobInfo.setExtra(jobInfoDTO.getExtra()); return jobInfo; } }