add 新增 一大堆snailjob的demo案例(感谢 老马)
This commit is contained in:
parent
71dddee146
commit
c171817d6a
@ -0,0 +1,30 @@
|
||||
package org.dromara.job.entity;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
|
||||
@Data
|
||||
public class BillDto {
|
||||
|
||||
/**
|
||||
* 账单ID
|
||||
*/
|
||||
private Long billId;
|
||||
|
||||
/**
|
||||
* 账单渠道
|
||||
*/
|
||||
private String billChannel;
|
||||
|
||||
/**
|
||||
* 账单日期
|
||||
*/
|
||||
private String billDate;
|
||||
|
||||
/**
|
||||
* 账单金额
|
||||
*/
|
||||
private BigDecimal billAmount;
|
||||
|
||||
}
|
@ -0,0 +1,42 @@
|
||||
package org.dromara.job.snailjob;
|
||||
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.aizuda.snailjob.client.job.core.annotation.JobExecutor;
|
||||
import com.aizuda.snailjob.client.job.core.dto.JobArgs;
|
||||
import com.aizuda.snailjob.client.model.ExecuteResult;
|
||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||
import org.dromara.common.json.utils.JsonUtils;
|
||||
import org.dromara.job.entity.BillDto;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
|
||||
/**
|
||||
* DAG工作流任务-模拟支付宝账单任务
|
||||
* <a href="https://juejin.cn/post/7487860254114644019"></a>
|
||||
*
|
||||
* @author 老马
|
||||
*/
|
||||
@Component
|
||||
@JobExecutor(name = "alipayBillTask")
|
||||
public class AlipayBillTask {
|
||||
|
||||
public ExecuteResult jobExecute(JobArgs jobArgs) throws InterruptedException {
|
||||
BillDto billDto = new BillDto();
|
||||
billDto.setBillId(23456789L);
|
||||
billDto.setBillChannel("alipay");
|
||||
// 设置清算日期
|
||||
String settlementDate = (String) jobArgs.getWfContext().get("settlementDate");
|
||||
if (StrUtil.equals(settlementDate, "sysdate")) {
|
||||
settlementDate = DateUtil.today();
|
||||
}
|
||||
billDto.setBillDate(settlementDate);
|
||||
billDto.setBillAmount(new BigDecimal("2345.67"));
|
||||
// 把billDto对象放入上下文进行传递
|
||||
jobArgs.appendContext("alipay", JsonUtils.toJsonString(billDto));
|
||||
SnailJobLog.REMOTE.info("上下文: {}", jobArgs.getWfContext());
|
||||
return ExecuteResult.success(billDto);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,45 @@
|
||||
package org.dromara.job.snailjob;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.aizuda.snailjob.client.job.core.annotation.JobExecutor;
|
||||
import com.aizuda.snailjob.client.job.core.dto.JobArgs;
|
||||
import com.aizuda.snailjob.client.model.ExecuteResult;
|
||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||
import org.dromara.common.json.utils.JsonUtils;
|
||||
import org.dromara.job.entity.BillDto;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
|
||||
/**
|
||||
* DAG工作流任务-模拟汇总账单任务
|
||||
* <a href="https://juejin.cn/post/7487860254114644019"></a>
|
||||
*
|
||||
* @author 老马
|
||||
*/
|
||||
@Component
|
||||
@JobExecutor(name = "summaryBillTask")
|
||||
public class SummaryBillTask {
|
||||
|
||||
public ExecuteResult jobExecute(JobArgs jobArgs) throws InterruptedException {
|
||||
// 获得微信账单
|
||||
BigDecimal wechatAmount = BigDecimal.valueOf(0);
|
||||
String wechat = (String) jobArgs.getWfContext("wechat");
|
||||
if (StrUtil.isNotBlank(wechat)) {
|
||||
BillDto wechatBillDto = JsonUtils.parseObject(wechat, BillDto.class);
|
||||
wechatAmount = wechatBillDto.getBillAmount();
|
||||
}
|
||||
// 获得支付宝账单
|
||||
BigDecimal alipayAmount = BigDecimal.valueOf(0);
|
||||
String alipay = (String) jobArgs.getWfContext("alipay");
|
||||
if (StrUtil.isNotBlank(alipay)) {
|
||||
BillDto alipayBillDto = JsonUtils.parseObject(alipay, BillDto.class);
|
||||
alipayAmount = alipayBillDto.getBillAmount();
|
||||
}
|
||||
// 汇总账单
|
||||
BigDecimal totalAmount = wechatAmount.add(alipayAmount);
|
||||
SnailJobLog.REMOTE.info("总金额: {}", totalAmount);
|
||||
return ExecuteResult.success(totalAmount);
|
||||
}
|
||||
|
||||
}
|
@ -8,8 +8,10 @@ import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author opensnail
|
||||
* @date 2024-05-17
|
||||
* 正常任务
|
||||
* <a href="https://juejin.cn/post/7418074037392293914"></a>
|
||||
*
|
||||
* @author 老马
|
||||
*/
|
||||
@Component
|
||||
@JobExecutor(name = "testJobExecutor")
|
||||
|
@ -0,0 +1,37 @@
|
||||
package org.dromara.job.snailjob;
|
||||
|
||||
import cn.hutool.core.util.RandomUtil;
|
||||
import com.aizuda.snailjob.client.job.core.annotation.JobExecutor;
|
||||
import com.aizuda.snailjob.client.job.core.dto.JobArgs;
|
||||
import com.aizuda.snailjob.client.model.ExecuteResult;
|
||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 广播任务
|
||||
* <a href="https://juejin.cn/post/7422948006150438950"></a>
|
||||
*
|
||||
* @author 老马
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@JobExecutor(name = "testBroadcastJob")
|
||||
public class TestBroadcastJob {
|
||||
|
||||
@Value("${snail-job.port}")
|
||||
private int clientPort;
|
||||
|
||||
public ExecuteResult jobExecute(JobArgs jobArgs) {
|
||||
int randomInt = RandomUtil.randomInt(100);
|
||||
log.info("随机数: {}", randomInt);
|
||||
SnailJobLog.REMOTE.info("随机数: {},客户端端口:{}", randomInt, clientPort);
|
||||
if (randomInt < 50) {
|
||||
throw new RuntimeException("随机数小于50,收集日志任务执行失败");
|
||||
}
|
||||
// 获得jobArgs 中传入的相加的两个数
|
||||
return ExecuteResult.success("随机数大于50,收集日志任务执行成功");
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,53 @@
|
||||
package org.dromara.job.snailjob;
|
||||
|
||||
import cn.hutool.core.thread.ThreadUtil;
|
||||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import com.aizuda.snailjob.client.job.core.MapHandler;
|
||||
import com.aizuda.snailjob.client.job.core.annotation.JobExecutor;
|
||||
import com.aizuda.snailjob.client.job.core.annotation.MapExecutor;
|
||||
import com.aizuda.snailjob.client.job.core.dto.MapArgs;
|
||||
import com.aizuda.snailjob.client.model.ExecuteResult;
|
||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
/**
|
||||
* Map任务 动态分配 只分片不关注结果
|
||||
* <a href="https://juejin.cn/post/7446362500478894106"></a>
|
||||
*
|
||||
* @author 老马
|
||||
*/
|
||||
@Component
|
||||
@JobExecutor(name = "testMapJobAnnotation")
|
||||
public class TestMapJobAnnotation {
|
||||
|
||||
@MapExecutor
|
||||
public ExecuteResult doJobMapExecute(MapArgs mapArgs, MapHandler mapHandler) {
|
||||
// 生成1~200数值并分片
|
||||
int partitionSize = 50;
|
||||
List<List<Integer>> partition = IntStream.rangeClosed(1, 200)
|
||||
.boxed()
|
||||
.collect(Collectors.groupingBy(i -> (i - 1) / partitionSize))
|
||||
.values()
|
||||
.stream()
|
||||
.toList();
|
||||
SnailJobLog.REMOTE.info("端口:{}完成分配任务", SpringUtil.getProperty("server.port"));
|
||||
return mapHandler.doMap(partition, "doCalc");
|
||||
}
|
||||
|
||||
@MapExecutor(taskName = "doCalc")
|
||||
public ExecuteResult doCalc(MapArgs mapArgs) {
|
||||
List<Integer> sourceList = (List<Integer>) mapArgs.getMapResult();
|
||||
// 遍历sourceList的每一个元素,计算出一个累加值partitionTotal
|
||||
int partitionTotal = sourceList.stream().mapToInt(i -> i).sum();
|
||||
// 打印日志到服务器
|
||||
ThreadUtil.sleep(3, TimeUnit.SECONDS);
|
||||
SnailJobLog.REMOTE.info("端口:{},partitionTotal:{}", SpringUtil.getProperty("server.port"), partitionTotal);
|
||||
return ExecuteResult.success(partitionTotal);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,60 @@
|
||||
package org.dromara.job.snailjob;
|
||||
|
||||
import cn.hutool.core.thread.ThreadUtil;
|
||||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import com.aizuda.snailjob.client.job.core.MapHandler;
|
||||
import com.aizuda.snailjob.client.job.core.annotation.JobExecutor;
|
||||
import com.aizuda.snailjob.client.job.core.annotation.MapExecutor;
|
||||
import com.aizuda.snailjob.client.job.core.annotation.ReduceExecutor;
|
||||
import com.aizuda.snailjob.client.job.core.dto.MapArgs;
|
||||
import com.aizuda.snailjob.client.job.core.dto.ReduceArgs;
|
||||
import com.aizuda.snailjob.client.model.ExecuteResult;
|
||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
/**
|
||||
* MapReduce任务 动态分配 分片后合并结果
|
||||
* <a href="https://juejin.cn/post/7448551286506913802"></a>
|
||||
*
|
||||
* @author 老马
|
||||
*/
|
||||
@Component
|
||||
@JobExecutor(name = "testMapReduceAnnotation1")
|
||||
public class TestMapReduceAnnotation1 {
|
||||
|
||||
@MapExecutor
|
||||
public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) {
|
||||
int partitionSize = 50;
|
||||
List<List<Integer>> partition = IntStream.rangeClosed(1, 200)
|
||||
.boxed()
|
||||
.collect(Collectors.groupingBy(i -> (i - 1) / partitionSize))
|
||||
.values()
|
||||
.stream()
|
||||
.toList();
|
||||
SnailJobLog.REMOTE.info("端口:{}完成分配任务", SpringUtil.getProperty("server.port"));
|
||||
return mapHandler.doMap(partition, "doCalc");
|
||||
}
|
||||
|
||||
@MapExecutor(taskName = "doCalc")
|
||||
public ExecuteResult doCalc(MapArgs mapArgs) {
|
||||
List<Integer> sourceList = (List<Integer>) mapArgs.getMapResult();
|
||||
// 遍历sourceList的每一个元素,计算出一个累加值partitionTotal
|
||||
int partitionTotal = sourceList.stream().mapToInt(i -> i).sum();
|
||||
// 打印日志到服务器
|
||||
ThreadUtil.sleep(3, TimeUnit.SECONDS);
|
||||
SnailJobLog.REMOTE.info("端口:{},partitionTotal:{}", SpringUtil.getProperty("server.port"), partitionTotal);
|
||||
return ExecuteResult.success(partitionTotal);
|
||||
}
|
||||
|
||||
@ReduceExecutor
|
||||
public ExecuteResult reduceExecute(ReduceArgs reduceArgs) {
|
||||
int reduceTotal = reduceArgs.getMapResult().stream().mapToInt(i -> Integer.parseInt((String) i)).sum();
|
||||
SnailJobLog.REMOTE.info("端口:{},reduceTotal:{}", SpringUtil.getProperty("server.port"), reduceTotal);
|
||||
return ExecuteResult.success(reduceTotal);
|
||||
}
|
||||
}
|
@ -0,0 +1,36 @@
|
||||
package org.dromara.job.snailjob;
|
||||
|
||||
import com.aizuda.snailjob.client.job.core.annotation.JobExecutor;
|
||||
import com.aizuda.snailjob.client.job.core.dto.JobArgs;
|
||||
import com.aizuda.snailjob.client.model.ExecuteResult;
|
||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 静态分片 根据服务端任务参数分片
|
||||
* <a href="https://juejin.cn/post/7426232375703896101"></a>
|
||||
*
|
||||
* @author 老马
|
||||
*/
|
||||
@Component
|
||||
@JobExecutor(name = "testStaticShardingJob")
|
||||
public class TestStaticShardingJob {
|
||||
|
||||
public ExecuteResult jobExecute(JobArgs jobArgs) {
|
||||
String jobParams = String.valueOf(jobArgs.getJobParams());
|
||||
SnailJobLog.LOCAL.info("开始执行分片任务,参数:{}", jobParams);
|
||||
// 获得jobArgs 中传入的开始id和结束id
|
||||
String[] split = jobParams.split(",");
|
||||
Long fromId = Long.parseLong(split[0]);
|
||||
Long toId = Long.parseLong(split[1]);
|
||||
// 模拟数据库操作,对范围id,进行加密处理
|
||||
try {
|
||||
SnailJobLog.REMOTE.info("开始对id范围:{}进行加密处理", fromId + "-" + toId);
|
||||
Thread.sleep(3000);
|
||||
SnailJobLog.REMOTE.info("对id范围:{}进行加密处理完成", fromId + "-" + toId);
|
||||
} catch (InterruptedException e) {
|
||||
return ExecuteResult.failure("任务执行失败");
|
||||
}
|
||||
return ExecuteResult.success("执行分片任务完成");
|
||||
}
|
||||
}
|
@ -0,0 +1,43 @@
|
||||
package org.dromara.job.snailjob;
|
||||
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.aizuda.snailjob.client.job.core.annotation.JobExecutor;
|
||||
import com.aizuda.snailjob.client.job.core.dto.JobArgs;
|
||||
import com.aizuda.snailjob.client.model.ExecuteResult;
|
||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||
import org.dromara.common.json.utils.JsonUtils;
|
||||
import org.dromara.job.entity.BillDto;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
|
||||
/**
|
||||
* DAG工作流任务-模拟微信账单任务
|
||||
* <a href="https://juejin.cn/post/7487860254114644019"></a>
|
||||
*
|
||||
* @author 老马
|
||||
*/
|
||||
@Component
|
||||
@JobExecutor(name = "wechatBillTask")
|
||||
public class WechatBillTask {
|
||||
|
||||
public ExecuteResult jobExecute(JobArgs jobArgs) throws InterruptedException {
|
||||
BillDto billDto = new BillDto();
|
||||
billDto.setBillId(123456789L);
|
||||
billDto.setBillChannel("wechat");
|
||||
// 从上下文中获得清算日期并设置,如果上下文中清算日期
|
||||
// 是sysdate设置为当前日期;否则取管理页面设置的值
|
||||
String settlementDate = (String) jobArgs.getWfContext().get("settlementDate");
|
||||
if (StrUtil.equals(settlementDate, "sysdate")) {
|
||||
settlementDate = DateUtil.today();
|
||||
}
|
||||
billDto.setBillDate(settlementDate);
|
||||
billDto.setBillAmount(new BigDecimal("1234.56"));
|
||||
// 把billDto对象放入上下文进行传递
|
||||
jobArgs.appendContext("wechat", JsonUtils.toJsonString(billDto));
|
||||
SnailJobLog.REMOTE.info("上下文: {}", jobArgs.getWfContext());
|
||||
return ExecuteResult.success(billDto);
|
||||
}
|
||||
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user