diff --git a/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/entity/BillDto.java b/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/entity/BillDto.java new file mode 100644 index 000000000..2661e3465 --- /dev/null +++ b/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/entity/BillDto.java @@ -0,0 +1,30 @@ +package org.dromara.job.entity; + +import lombok.Data; + +import java.math.BigDecimal; + +@Data +public class BillDto { + + /** + * 账单ID + */ + private Long billId; + + /** + * 账单渠道 + */ + private String billChannel; + + /** + * 账单日期 + */ + private String billDate; + + /** + * 账单金额 + */ + private BigDecimal billAmount; + +} diff --git a/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/snailjob/AlipayBillTask.java b/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/snailjob/AlipayBillTask.java new file mode 100644 index 000000000..b8ad8cc39 --- /dev/null +++ b/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/snailjob/AlipayBillTask.java @@ -0,0 +1,42 @@ +package org.dromara.job.snailjob; + +import cn.hutool.core.date.DateUtil; +import cn.hutool.core.util.StrUtil; +import com.aizuda.snailjob.client.job.core.annotation.JobExecutor; +import com.aizuda.snailjob.client.job.core.dto.JobArgs; +import com.aizuda.snailjob.client.model.ExecuteResult; +import com.aizuda.snailjob.common.log.SnailJobLog; +import org.dromara.common.json.utils.JsonUtils; +import org.dromara.job.entity.BillDto; +import org.springframework.stereotype.Component; + +import java.math.BigDecimal; + +/** + * DAG工作流任务-模拟支付宝账单任务 + * + * + * @author 老马 + */ +@Component +@JobExecutor(name = "alipayBillTask") +public class AlipayBillTask { + + public ExecuteResult jobExecute(JobArgs jobArgs) throws InterruptedException { + BillDto billDto = new BillDto(); + billDto.setBillId(23456789L); + billDto.setBillChannel("alipay"); + // 设置清算日期 + String settlementDate = (String) jobArgs.getWfContext().get("settlementDate"); + if (StrUtil.equals(settlementDate, "sysdate")) { + settlementDate = DateUtil.today(); + } + billDto.setBillDate(settlementDate); + billDto.setBillAmount(new BigDecimal("2345.67")); + // 把billDto对象放入上下文进行传递 + jobArgs.appendContext("alipay", JsonUtils.toJsonString(billDto)); + SnailJobLog.REMOTE.info("上下文: {}", jobArgs.getWfContext()); + return ExecuteResult.success(billDto); + } + +} diff --git a/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/snailjob/SummaryBillTask.java b/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/snailjob/SummaryBillTask.java new file mode 100644 index 000000000..bff15f97e --- /dev/null +++ b/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/snailjob/SummaryBillTask.java @@ -0,0 +1,45 @@ +package org.dromara.job.snailjob; + +import cn.hutool.core.util.StrUtil; +import com.aizuda.snailjob.client.job.core.annotation.JobExecutor; +import com.aizuda.snailjob.client.job.core.dto.JobArgs; +import com.aizuda.snailjob.client.model.ExecuteResult; +import com.aizuda.snailjob.common.log.SnailJobLog; +import org.dromara.common.json.utils.JsonUtils; +import org.dromara.job.entity.BillDto; +import org.springframework.stereotype.Component; + +import java.math.BigDecimal; + +/** + * DAG工作流任务-模拟汇总账单任务 + * + * + * @author 老马 + */ +@Component +@JobExecutor(name = "summaryBillTask") +public class SummaryBillTask { + + public ExecuteResult jobExecute(JobArgs jobArgs) throws InterruptedException { + // 获得微信账单 + BigDecimal wechatAmount = BigDecimal.valueOf(0); + String wechat = (String) jobArgs.getWfContext("wechat"); + if (StrUtil.isNotBlank(wechat)) { + BillDto wechatBillDto = JsonUtils.parseObject(wechat, BillDto.class); + wechatAmount = wechatBillDto.getBillAmount(); + } + // 获得支付宝账单 + BigDecimal alipayAmount = BigDecimal.valueOf(0); + String alipay = (String) jobArgs.getWfContext("alipay"); + if (StrUtil.isNotBlank(alipay)) { + BillDto alipayBillDto = JsonUtils.parseObject(alipay, BillDto.class); + alipayAmount = alipayBillDto.getBillAmount(); + } + // 汇总账单 + BigDecimal totalAmount = wechatAmount.add(alipayAmount); + SnailJobLog.REMOTE.info("总金额: {}", totalAmount); + return ExecuteResult.success(totalAmount); + } + +} diff --git a/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/snailjob/TestAnnoJobExecutor.java b/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/snailjob/TestAnnoJobExecutor.java index 5bea9daf3..e5339f5b4 100644 --- a/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/snailjob/TestAnnoJobExecutor.java +++ b/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/snailjob/TestAnnoJobExecutor.java @@ -8,8 +8,10 @@ import com.aizuda.snailjob.common.log.SnailJobLog; import org.springframework.stereotype.Component; /** - * @author opensnail - * @date 2024-05-17 + * 正常任务 + * + * + * @author 老马 */ @Component @JobExecutor(name = "testJobExecutor") diff --git a/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/snailjob/TestBroadcastJob.java b/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/snailjob/TestBroadcastJob.java new file mode 100644 index 000000000..d77e72e6c --- /dev/null +++ b/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/snailjob/TestBroadcastJob.java @@ -0,0 +1,37 @@ +package org.dromara.job.snailjob; + +import cn.hutool.core.util.RandomUtil; +import com.aizuda.snailjob.client.job.core.annotation.JobExecutor; +import com.aizuda.snailjob.client.job.core.dto.JobArgs; +import com.aizuda.snailjob.client.model.ExecuteResult; +import com.aizuda.snailjob.common.log.SnailJobLog; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +/** + * 广播任务 + * + * + * @author 老马 + */ +@Slf4j +@Component +@JobExecutor(name = "testBroadcastJob") +public class TestBroadcastJob { + + @Value("${snail-job.port}") + private int clientPort; + + public ExecuteResult jobExecute(JobArgs jobArgs) { + int randomInt = RandomUtil.randomInt(100); + log.info("随机数: {}", randomInt); + SnailJobLog.REMOTE.info("随机数: {},客户端端口:{}", randomInt, clientPort); + if (randomInt < 50) { + throw new RuntimeException("随机数小于50,收集日志任务执行失败"); + } + // 获得jobArgs 中传入的相加的两个数 + return ExecuteResult.success("随机数大于50,收集日志任务执行成功"); + } + +} diff --git a/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/snailjob/TestMapJobAnnotation.java b/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/snailjob/TestMapJobAnnotation.java new file mode 100644 index 000000000..6589ed1c5 --- /dev/null +++ b/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/snailjob/TestMapJobAnnotation.java @@ -0,0 +1,53 @@ +package org.dromara.job.snailjob; + +import cn.hutool.core.thread.ThreadUtil; +import cn.hutool.extra.spring.SpringUtil; +import com.aizuda.snailjob.client.job.core.MapHandler; +import com.aizuda.snailjob.client.job.core.annotation.JobExecutor; +import com.aizuda.snailjob.client.job.core.annotation.MapExecutor; +import com.aizuda.snailjob.client.job.core.dto.MapArgs; +import com.aizuda.snailjob.client.model.ExecuteResult; +import com.aizuda.snailjob.common.log.SnailJobLog; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Map任务 动态分配 只分片不关注结果 + * + * + * @author 老马 + */ +@Component +@JobExecutor(name = "testMapJobAnnotation") +public class TestMapJobAnnotation { + + @MapExecutor + public ExecuteResult doJobMapExecute(MapArgs mapArgs, MapHandler mapHandler) { + // 生成1~200数值并分片 + int partitionSize = 50; + List> partition = IntStream.rangeClosed(1, 200) + .boxed() + .collect(Collectors.groupingBy(i -> (i - 1) / partitionSize)) + .values() + .stream() + .toList(); + SnailJobLog.REMOTE.info("端口:{}完成分配任务", SpringUtil.getProperty("server.port")); + return mapHandler.doMap(partition, "doCalc"); + } + + @MapExecutor(taskName = "doCalc") + public ExecuteResult doCalc(MapArgs mapArgs) { + List sourceList = (List) mapArgs.getMapResult(); + // 遍历sourceList的每一个元素,计算出一个累加值partitionTotal + int partitionTotal = sourceList.stream().mapToInt(i -> i).sum(); + // 打印日志到服务器 + ThreadUtil.sleep(3, TimeUnit.SECONDS); + SnailJobLog.REMOTE.info("端口:{},partitionTotal:{}", SpringUtil.getProperty("server.port"), partitionTotal); + return ExecuteResult.success(partitionTotal); + } + +} diff --git a/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/snailjob/TestMapReduceAnnotation1.java b/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/snailjob/TestMapReduceAnnotation1.java new file mode 100644 index 000000000..4ae2fa80a --- /dev/null +++ b/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/snailjob/TestMapReduceAnnotation1.java @@ -0,0 +1,60 @@ +package org.dromara.job.snailjob; + +import cn.hutool.core.thread.ThreadUtil; +import cn.hutool.extra.spring.SpringUtil; +import com.aizuda.snailjob.client.job.core.MapHandler; +import com.aizuda.snailjob.client.job.core.annotation.JobExecutor; +import com.aizuda.snailjob.client.job.core.annotation.MapExecutor; +import com.aizuda.snailjob.client.job.core.annotation.ReduceExecutor; +import com.aizuda.snailjob.client.job.core.dto.MapArgs; +import com.aizuda.snailjob.client.job.core.dto.ReduceArgs; +import com.aizuda.snailjob.client.model.ExecuteResult; +import com.aizuda.snailjob.common.log.SnailJobLog; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * MapReduce任务 动态分配 分片后合并结果 + * + * + * @author 老马 + */ +@Component +@JobExecutor(name = "testMapReduceAnnotation1") +public class TestMapReduceAnnotation1 { + + @MapExecutor + public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) { + int partitionSize = 50; + List> partition = IntStream.rangeClosed(1, 200) + .boxed() + .collect(Collectors.groupingBy(i -> (i - 1) / partitionSize)) + .values() + .stream() + .toList(); + SnailJobLog.REMOTE.info("端口:{}完成分配任务", SpringUtil.getProperty("server.port")); + return mapHandler.doMap(partition, "doCalc"); + } + + @MapExecutor(taskName = "doCalc") + public ExecuteResult doCalc(MapArgs mapArgs) { + List sourceList = (List) mapArgs.getMapResult(); + // 遍历sourceList的每一个元素,计算出一个累加值partitionTotal + int partitionTotal = sourceList.stream().mapToInt(i -> i).sum(); + // 打印日志到服务器 + ThreadUtil.sleep(3, TimeUnit.SECONDS); + SnailJobLog.REMOTE.info("端口:{},partitionTotal:{}", SpringUtil.getProperty("server.port"), partitionTotal); + return ExecuteResult.success(partitionTotal); + } + + @ReduceExecutor + public ExecuteResult reduceExecute(ReduceArgs reduceArgs) { + int reduceTotal = reduceArgs.getMapResult().stream().mapToInt(i -> Integer.parseInt((String) i)).sum(); + SnailJobLog.REMOTE.info("端口:{},reduceTotal:{}", SpringUtil.getProperty("server.port"), reduceTotal); + return ExecuteResult.success(reduceTotal); + } +} diff --git a/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/snailjob/TestStaticShardingJob.java b/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/snailjob/TestStaticShardingJob.java new file mode 100644 index 000000000..07a1bc566 --- /dev/null +++ b/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/snailjob/TestStaticShardingJob.java @@ -0,0 +1,36 @@ +package org.dromara.job.snailjob; + +import com.aizuda.snailjob.client.job.core.annotation.JobExecutor; +import com.aizuda.snailjob.client.job.core.dto.JobArgs; +import com.aizuda.snailjob.client.model.ExecuteResult; +import com.aizuda.snailjob.common.log.SnailJobLog; +import org.springframework.stereotype.Component; + +/** + * 静态分片 根据服务端任务参数分片 + * + * + * @author 老马 + */ +@Component +@JobExecutor(name = "testStaticShardingJob") +public class TestStaticShardingJob { + + public ExecuteResult jobExecute(JobArgs jobArgs) { + String jobParams = String.valueOf(jobArgs.getJobParams()); + SnailJobLog.LOCAL.info("开始执行分片任务,参数:{}", jobParams); + // 获得jobArgs 中传入的开始id和结束id + String[] split = jobParams.split(","); + Long fromId = Long.parseLong(split[0]); + Long toId = Long.parseLong(split[1]); + // 模拟数据库操作,对范围id,进行加密处理 + try { + SnailJobLog.REMOTE.info("开始对id范围:{}进行加密处理", fromId + "-" + toId); + Thread.sleep(3000); + SnailJobLog.REMOTE.info("对id范围:{}进行加密处理完成", fromId + "-" + toId); + } catch (InterruptedException e) { + return ExecuteResult.failure("任务执行失败"); + } + return ExecuteResult.success("执行分片任务完成"); + } +} diff --git a/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/snailjob/WechatBillTask.java b/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/snailjob/WechatBillTask.java new file mode 100644 index 000000000..d8caf1af0 --- /dev/null +++ b/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/snailjob/WechatBillTask.java @@ -0,0 +1,43 @@ +package org.dromara.job.snailjob; + +import cn.hutool.core.date.DateUtil; +import cn.hutool.core.util.StrUtil; +import com.aizuda.snailjob.client.job.core.annotation.JobExecutor; +import com.aizuda.snailjob.client.job.core.dto.JobArgs; +import com.aizuda.snailjob.client.model.ExecuteResult; +import com.aizuda.snailjob.common.log.SnailJobLog; +import org.dromara.common.json.utils.JsonUtils; +import org.dromara.job.entity.BillDto; +import org.springframework.stereotype.Component; + +import java.math.BigDecimal; + +/** + * DAG工作流任务-模拟微信账单任务 + * + * + * @author 老马 + */ +@Component +@JobExecutor(name = "wechatBillTask") +public class WechatBillTask { + + public ExecuteResult jobExecute(JobArgs jobArgs) throws InterruptedException { + BillDto billDto = new BillDto(); + billDto.setBillId(123456789L); + billDto.setBillChannel("wechat"); + // 从上下文中获得清算日期并设置,如果上下文中清算日期 + // 是sysdate设置为当前日期;否则取管理页面设置的值 + String settlementDate = (String) jobArgs.getWfContext().get("settlementDate"); + if (StrUtil.equals(settlementDate, "sysdate")) { + settlementDate = DateUtil.today(); + } + billDto.setBillDate(settlementDate); + billDto.setBillAmount(new BigDecimal("1234.56")); + // 把billDto对象放入上下文进行传递 + jobArgs.appendContext("wechat", JsonUtils.toJsonString(billDto)); + SnailJobLog.REMOTE.info("上下文: {}", jobArgs.getWfContext()); + return ExecuteResult.success(billDto); + } + +}