diff --git a/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/processors/BroadcastProcessorDemo.java b/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/processors/BroadcastProcessorDemo.java deleted file mode 100644 index 2b4c28a1f..000000000 --- a/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/processors/BroadcastProcessorDemo.java +++ /dev/null @@ -1,56 +0,0 @@ -package org.dromara.job.processors; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; -import tech.powerjob.common.utils.NetUtils; -import tech.powerjob.worker.core.processor.ProcessResult; -import tech.powerjob.worker.core.processor.TaskContext; -import tech.powerjob.worker.core.processor.TaskResult; -import tech.powerjob.worker.core.processor.sdk.BroadcastProcessor; -import tech.powerjob.worker.log.OmsLogger; - -import java.util.List; - -/** - * 广播处理器 示例 - * - * @author tjq - * @since 2020/4/17 - */ -@Slf4j -@Component -public class BroadcastProcessorDemo implements BroadcastProcessor { - - @Override - public ProcessResult preProcess(TaskContext context) { - System.out.println("===== BroadcastProcessorDemo#preProcess ======"); - context.getOmsLogger().info("BroadcastProcessorDemo#preProcess, current host: {}", NetUtils.getLocalHost()); - if ("rootFailed".equals(context.getJobParams())) { - return new ProcessResult(false, "console need failed"); - } else { - return new ProcessResult(true); - } - } - - @Override - public ProcessResult process(TaskContext taskContext) throws Exception { - OmsLogger logger = taskContext.getOmsLogger(); - System.out.println("===== BroadcastProcessorDemo#process ======"); - logger.info("BroadcastProcessorDemo#process, current host: {}", NetUtils.getLocalHost()); - long sleepTime = 1000; - try { - sleepTime = Long.parseLong(taskContext.getJobParams()); - } catch (Exception e) { - logger.warn("[BroadcastProcessor] parse sleep time failed!", e); - } - Thread.sleep(Math.max(sleepTime, 1000)); - return new ProcessResult(true); - } - - @Override - public ProcessResult postProcess(TaskContext context, List taskResults) { - System.out.println("===== BroadcastProcessorDemo#postProcess ======"); - context.getOmsLogger().info("BroadcastProcessorDemo#postProcess, current host: {}, taskResult: {}", NetUtils.getLocalHost(), taskResults); - return new ProcessResult(true, "success"); - } -} diff --git a/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/processors/LogTestProcessor.java b/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/processors/LogTestProcessor.java deleted file mode 100644 index 2a1000fd5..000000000 --- a/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/processors/LogTestProcessor.java +++ /dev/null @@ -1,41 +0,0 @@ -package org.dromara.job.processors; - -import com.alibaba.fastjson.JSONObject; -import org.springframework.stereotype.Component; -import tech.powerjob.official.processors.util.CommonUtils; -import tech.powerjob.worker.core.processor.ProcessResult; -import tech.powerjob.worker.core.processor.TaskContext; -import tech.powerjob.worker.core.processor.sdk.BasicProcessor; -import tech.powerjob.worker.log.OmsLogger; - -import java.util.Date; -import java.util.Optional; - -/** - * LogTestProcessor - * - * @author tjq - * @since 2022/9/18 - */ -@Component -public class LogTestProcessor implements BasicProcessor { - - @Override - public ProcessResult process(TaskContext context) throws Exception { - - final OmsLogger omsLogger = context.getOmsLogger(); - final String parseParams = CommonUtils.parseParams(context); - final JSONObject config = Optional.ofNullable(JSONObject.parseObject(parseParams)).orElse(new JSONObject()); - - final long loopTimes = Optional.ofNullable(config.getLong("loopTimes")).orElse(1000L); - - for (int i = 0; i < loopTimes; i++) { - omsLogger.debug("[DEBUG] one DEBUG log in {}", new Date()); - omsLogger.info("[INFO] one INFO log in {}", new Date()); - omsLogger.warn("[WARN] one WARN log in {}", new Date()); - omsLogger.error("[ERROR] one ERROR log in {}", new Date()); - } - - return new ProcessResult(true); - } -} diff --git a/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/processors/MapProcessorDemo.java b/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/processors/MapProcessorDemo.java deleted file mode 100644 index 720d333e2..000000000 --- a/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/processors/MapProcessorDemo.java +++ /dev/null @@ -1,93 +0,0 @@ -package org.dromara.job.processors; - -import com.google.common.collect.Lists; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.NoArgsConstructor; -import org.dromara.common.json.utils.JsonUtils; -import org.springframework.stereotype.Component; -import tech.powerjob.worker.core.processor.ProcessResult; -import tech.powerjob.worker.core.processor.TaskContext; -import tech.powerjob.worker.core.processor.sdk.MapProcessor; - -import java.util.List; -import java.util.concurrent.ThreadLocalRandom; - -/** - * Map处理器 示例 - * - * @author tjq - * @since 2020/4/18 - */ -@Component -public class MapProcessorDemo implements MapProcessor { - - - /** - * 每一批发送任务大小 - */ - private static final int BATCH_SIZE = 100; - /** - * 发送的批次 - */ - private static final int BATCH_NUM = 5; - - @Override - public ProcessResult process(TaskContext context) throws Exception { - - log.info("============== MapProcessorDemo#process =============="); - log.info("isRootTask:{}", isRootTask()); - log.info("taskContext:{}", JsonUtils.toJsonString(context)); - - if (isRootTask()) { - log.info("==== MAP ===="); - List subTasks = Lists.newLinkedList(); - for (int j = 0; j < BATCH_NUM; j++) { - SubTask subTask = new SubTask(); - subTask.siteId = j; - subTask.itemIds = Lists.newLinkedList(); - subTasks.add(subTask); - for (int i = 0; i < BATCH_SIZE; i++) { - subTask.itemIds.add(i + j * 100); - } - } - map(subTasks, "MAP_TEST_TASK"); - return new ProcessResult(true, "map successfully"); - } else { - - log.info("==== PROCESS ===="); - SubTask subTask = (SubTask) context.getSubTask(); - for (Integer itemId : subTask.getItemIds()) { - if (Thread.interrupted()) { - // 任务被中断 - log.info("job has been stop! so stop to process subTask: {} => {}", subTask.getSiteId(), itemId); - break; - } - log.info("processing subTask: {} => {}", subTask.getSiteId(), itemId); - int max = Integer.MAX_VALUE >> 7; - for (int i = 0; ; i++) { - // 模拟耗时操作 - if (i > max) { - break; - } - } - } - // 测试在 Map 任务中追加上下文 - context.getWorkflowContext().appendData2WfContext("Yasuo", "A sword's poor company for a long road."); - boolean b = ThreadLocalRandom.current().nextBoolean(); - if (context.getCurrentRetryTimes() >= 1) { - // 重试的话一定会成功 - b = true; - } - return new ProcessResult(b, "RESULT:" + b); - } - } - - @Getter - @NoArgsConstructor - @AllArgsConstructor - public static class SubTask { - private Integer siteId; - private List itemIds; - } -} diff --git a/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/processors/MapReduceProcessorDemo.java b/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/processors/MapReduceProcessorDemo.java deleted file mode 100644 index 149885465..000000000 --- a/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/processors/MapReduceProcessorDemo.java +++ /dev/null @@ -1,93 +0,0 @@ -package org.dromara.job.processors; - -import cn.hutool.core.lang.Dict; -import com.google.common.collect.Lists; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.ToString; -import lombok.extern.slf4j.Slf4j; -import org.dromara.common.json.utils.JsonUtils; -import org.springframework.stereotype.Component; -import tech.powerjob.worker.core.processor.ProcessResult; -import tech.powerjob.worker.core.processor.TaskContext; -import tech.powerjob.worker.core.processor.TaskResult; -import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor; -import tech.powerjob.worker.log.OmsLogger; - -import java.util.List; -import java.util.concurrent.ThreadLocalRandom; - -/** - * MapReduce 处理器示例 - * 控制台参数:{"batchSize": 100, "batchNum": 2} - * - * @author tjq - * @since 2020/4/17 - */ -@Slf4j -@Component -public class MapReduceProcessorDemo implements MapReduceProcessor { - - @Override - public ProcessResult process(TaskContext context) throws Exception { - - OmsLogger omsLogger = context.getOmsLogger(); - - log.info("============== TestMapReduceProcessor#process =============="); - log.info("isRootTask:{}", isRootTask()); - log.info("taskContext:{}", JsonUtils.toJsonString(context)); - - // 根据控制台参数获取MR批次及子任务大小 - final Dict jobParams = JsonUtils.parseMap(context.getJobParams()); - - Integer batchSize = (Integer) jobParams.getOrDefault("batchSize", 100); - Integer batchNum = (Integer) jobParams.getOrDefault("batchNum", 10); - - if (isRootTask()) { - log.info("==== MAP ===="); - omsLogger.info("[DemoMRProcessor] start root task~"); - List subTasks = Lists.newLinkedList(); - for (int j = 0; j < batchNum; j++) { - for (int i = 0; i < batchSize; i++) { - int x = j * batchSize + i; - subTasks.add(new TestSubTask("name" + x, x)); - } - map(subTasks, "MAP_TEST_TASK"); - subTasks.clear(); - } - omsLogger.info("[DemoMRProcessor] map success~"); - return new ProcessResult(true, "MAP_SUCCESS"); - } else { - log.info("==== NORMAL_PROCESS ===="); - omsLogger.info("[DemoMRProcessor] process subTask: {}.", JsonUtils.toJsonString(context.getSubTask())); - log.info("subTask: {}", JsonUtils.toJsonString(context.getSubTask())); - Thread.sleep(1000); - if (context.getCurrentRetryTimes() == 0) { - return new ProcessResult(false, "FIRST_FAILED"); - } else { - return new ProcessResult(true, "PROCESS_SUCCESS"); - } - } - } - - @Override - public ProcessResult reduce(TaskContext context, List taskResults) { - log.info("================ MapReduceProcessorDemo#reduce ================"); - log.info("TaskContext: {}", JsonUtils.toJsonString(context)); - log.info("List: {}", JsonUtils.toJsonString(taskResults)); - context.getOmsLogger().info("MapReduce job finished, result is {}.", taskResults); - - boolean success = ThreadLocalRandom.current().nextBoolean(); - return new ProcessResult(success, context + ": " + success); - } - - @Getter - @ToString - @NoArgsConstructor - @AllArgsConstructor - public static class TestSubTask { - private String name; - private int age; - } -} diff --git a/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/processors/SimpleProcessor.java b/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/processors/SimpleProcessor.java deleted file mode 100644 index 3342cfe55..000000000 --- a/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/processors/SimpleProcessor.java +++ /dev/null @@ -1,35 +0,0 @@ -package org.dromara.job.processors; - -import org.springframework.stereotype.Component; -import tech.powerjob.worker.core.processor.ProcessResult; -import tech.powerjob.worker.core.processor.TaskContext; -import tech.powerjob.worker.core.processor.sdk.BasicProcessor; -import tech.powerjob.worker.log.OmsLogger; - -import java.util.Optional; - -/** - * @author Echo009 - * @since 2022/4/27 - */ -@Component -public class SimpleProcessor implements BasicProcessor { - - @Override - public ProcessResult process(TaskContext context) throws Exception { - - OmsLogger logger = context.getOmsLogger(); - - String jobParams = Optional.ofNullable(context.getJobParams()).orElse("S"); - logger.info("Current context:{}", context.getWorkflowContext()); - logger.info("Current job params:{}", jobParams); - - // 测试中文问题 #581 - if (jobParams.contains("CN")) { - return new ProcessResult(true, "任务成功啦!!!"); - } - - return jobParams.contains("F") ? new ProcessResult(false) : new ProcessResult(true, "yeah!"); - - } -} diff --git a/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/processors/StandaloneProcessorDemo.java b/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/processors/StandaloneProcessorDemo.java deleted file mode 100644 index ea8eff330..000000000 --- a/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/processors/StandaloneProcessorDemo.java +++ /dev/null @@ -1,51 +0,0 @@ -package org.dromara.job.processors; - -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; -import org.springframework.stereotype.Component; -import tech.powerjob.worker.core.processor.ProcessResult; -import tech.powerjob.worker.core.processor.TaskContext; -import tech.powerjob.worker.core.processor.sdk.BasicProcessor; -import tech.powerjob.worker.log.OmsLogger; - -import java.util.Collections; - -/** - * 单机处理器 示例 - * - * @author tjq - * @since 2020/4/17 - */ -@Slf4j -@Component -public class StandaloneProcessorDemo implements BasicProcessor { - - @Override - public ProcessResult process(TaskContext context) throws Exception { - OmsLogger omsLogger = context.getOmsLogger(); - omsLogger.info("StandaloneProcessorDemo start process,context is {}.", context); - omsLogger.info("Notice! If you want this job process failed, your jobParams need to be 'failed'"); - omsLogger.info("Let's test the exception~"); - // 测试异常日志 - try { - Collections.emptyList().add("277"); - } catch (Exception e) { - omsLogger.error("oh~it seems that we have an exception~", e); - } - log.info("================ StandaloneProcessorDemo#process ================"); - log.info("jobParam:{}", context.getJobParams()); - log.info("instanceParams:{}", context.getInstanceParams()); - String param; - // 解析参数,非处于工作流中时,优先取实例参数(允许动态[instanceParams]覆盖静态参数[jobParams]) - if (context.getWorkflowContext() == null) { - param = StringUtils.isBlank(context.getInstanceParams()) ? context.getJobParams() : context.getInstanceParams(); - } else { - param = context.getJobParams(); - } - // 根据参数判断是否成功 - boolean success = !"failed".equals(param); - omsLogger.info("StandaloneProcessorDemo finished process,success: {}", success); - omsLogger.info("anyway, we finished the job successfully~Congratulations!"); - return new ProcessResult(success, context + ": " + success); - } -} diff --git a/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/processors/TimeoutProcessor.java b/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/processors/TimeoutProcessor.java deleted file mode 100644 index 4b5899f90..000000000 --- a/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/processors/TimeoutProcessor.java +++ /dev/null @@ -1,25 +0,0 @@ -package org.dromara.job.processors; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; -import tech.powerjob.worker.core.processor.ProcessResult; -import tech.powerjob.worker.core.processor.TaskContext; -import tech.powerjob.worker.core.processor.sdk.BasicProcessor; - -/** - * 测试超时任务(可中断) - * - * @author tjq - * @since 2020/4/20 - */ -@Component -@Slf4j -public class TimeoutProcessor implements BasicProcessor { - @Override - public ProcessResult process(TaskContext context) throws Exception { - long sleepTime = Long.parseLong(context.getJobParams()); - log.info("TaskInstance({}) will sleep {} ms", context.getInstanceId(), sleepTime); - Thread.sleep(Long.parseLong(context.getJobParams())); - return new ProcessResult(true, "impossible~~~~QAQ~"); - } -} diff --git a/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/workflow/WorkflowStandaloneProcessor.java b/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/workflow/WorkflowStandaloneProcessor.java deleted file mode 100644 index 51187e3d8..000000000 --- a/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/workflow/WorkflowStandaloneProcessor.java +++ /dev/null @@ -1,36 +0,0 @@ -package org.dromara.job.workflow; - -import com.alibaba.fastjson.JSON; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; -import tech.powerjob.worker.core.processor.ProcessResult; -import tech.powerjob.worker.core.processor.TaskContext; -import tech.powerjob.worker.core.processor.sdk.BasicProcessor; -import tech.powerjob.worker.log.OmsLogger; - -import java.util.Map; - -/** - * 工作流测试 - * - * @author tjq - * @since 2020/6/2 - */ -@Component -@Slf4j -public class WorkflowStandaloneProcessor implements BasicProcessor { - - @Override - public ProcessResult process(TaskContext context) throws Exception { - OmsLogger logger = context.getOmsLogger(); - logger.info("current jobParams: {}", context.getJobParams()); - logger.info("current context: {}", context.getWorkflowContext()); - log.info("jobParams:{}", context.getJobParams()); - log.info("currentContext:{}", JSON.toJSONString(context)); - - // 尝试获取上游任务 - Map workflowContext = context.getWorkflowContext().fetchWorkflowContext(); - log.info("工作流上下文数据:{}", workflowContext); - return new ProcessResult(true, context.getJobId() + " process successfully."); - } -}