XXL Job 是一个开源的分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展的分布式任务调度框架。
这两天咱们开发的 AI Cloud 项目中,也使用到了 XXL Job 来执行分布式任务的调度,可以看出它的部署和使用虽然步骤很多,但用起来还是很简单的。
因为其本身为 Spring Boot 项目,所有对于 Java 程序员来说很友好,而且它还提供中文控制台,所以这也是他能在国内分布式任务调度系统这块一直流行的原因,如下图所示:
那么接下来咱们就来聊聊,XXL Job 的路由策略,以及路由策略中分片任务的执行原理。
XXL Job 的路由策略主要作用是在任务执行器集群环境中,决定如何选择合适的执行器来执行任务。
XXL Job 路由策略包含以下几个:
其中:
也就是说在这些路由策略中,最复杂的就是分片广播了。
所谓的分片广播也就是分片(执行)任务,它是将一个大任务划分为多个子任务并行执行,以提高效率。
假设,我们现在要使用分片任务执行一个大数据的查询与处理,此时的实现代码如下:
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.log.XxlJobLogger;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class ShardingBroadcastJob {
@XxlJob("shardingBroadcastTask")
public void execute(String param) {
// 获取分片参数:分片总数和分片序列号
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
XxlJobLogger.log("当前节点的 index={}, 总结点数={}, 参数={}", shardIndex, shardTotal, param);
// 模拟获取数据列表
List<String> dataList = getDataList();
// 执行分片逻辑
shardingExecute(dataList, shardIndex, shardTotal);
}
public List<String> getDataList() {
// 这里可以根据实际情况从数据库或其他数据源获取数据列表
// 为了示例简单,直接返回一个固定的列表
return List.of("data1", "data2", "data3", "data4", "data5", "data6", "data7", "data8", "data9", "data10");
}
public void shardingExecute(List<String> dataList, int shardIndex, int shardTotal) {
XxlJobLogger.log("开始执行分片任务,当前分片={}, 总分片数={}", shardIndex, shardTotal);
// 计算当前分片应处理的数据范围
int start = (shardIndex * dataList.size()) / shardTotal;
int end = ((shardIndex + 1) * dataList.size()) / shardTotal;
// 处理当前分片的数据
for (int i = start; i < end; i++) {
String data = dataList.get(i);
XxlJobLogger.log("处理数据: {}", data);
// 在此处添加具体的数据处理逻辑
}
XxlJobLogger.log("分片任务执行完成");
}
}
在上述代码中,在execute方法中,通过 XxlJobHelper.getShardIndex() 获取当前分片序号,通过 XxlJobHelper.getShardTotal() 获取总分片数。然后模拟获取了一个数据列表 dataList,接下来使用 shardingExecute 方法执行分片逻辑。
在 shardingExecute 方法中,根据分片序号和总分片数计算出当前分片应处理的数据范围,然后遍历该范围内的数据并进行处理(此处仅打印数据,实际应用中可添加具体的数据处理逻辑)。
在实际使用时,需要将任务部署到 XXL Job 执行器集群中,并在调度中心配置相应的任务,选择分片广播的路由策略。这样,当调度中心触发任务时,所有执行器都会执行该任务,并根据分片参数处理相应的数据分片,这样就能提升程序整体的执行效率了。
了解了 XXL Job 的代码实现就能明白其运行原理,它的实现原理如下:
在分片任务时,如果其中某台机器掉电了导致结果一直未能正常返回,XXL Job 会如何处理?XXL Job 怎么保证任务只会被执行一次的?
本文已收录到我的面试小站 www.javacn.site,其中包含的内容有:Redis、JVM、并发、并发、MySQL、Spring、Spring MVC、Spring Boot、Spring Cloud、MyBatis、设计模式、消息队列等模块。