首页 星云 工具 资源 星选 资讯 热门工具
:

PDF转图片 完全免费 小红书视频下载 无水印 抖音视频下载 无水印 数字星空

说说XXLJob分片任务实现原理?

编程知识
2024年07月17日 17:01

XXL Job 是一个开源的分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展的分布式任务调度框架。

这两天咱们开发的 AI Cloud 项目中,也使用到了 XXL Job 来执行分布式任务的调度,可以看出它的部署和使用虽然步骤很多,但用起来还是很简单的。

因为其本身为 Spring Boot 项目,所有对于 Java 程序员来说很友好,而且它还提供中文控制台,所以这也是他能在国内分布式任务调度系统这块一直流行的原因,如下图所示:
image.png

那么接下来咱们就来聊聊,XXL Job 的路由策略,以及路由策略中分片任务的执行原理。

1.路由策略

XXL Job 的路由策略主要作用是在任务执行器集群环境中,决定如何选择合适的执行器来执行任务。

XXL Job 路由策略包含以下几个:
image.png
其中:

  1. 第一个:选取执行器管理的注册地址列表中的第一个执行器来执行任务;
  2. 最后一个:选取执行器管理的注册地址列表中的最后一个执行器来执行任务;
  3. 轮询:依次选取执行器管理的注册地址列表中的执行器,周而复始。为了应对多个定时任务同时触发带来的数据一致性问题,XXL-JOB 使用一个静态的同步 Map 来存储每个任务的 jobId 和其对应的计数。每次计数增加后,对执行器地址列表的数量取余,将结果作为索引来获取对应的执行器地址。如果超过 24 小时没有触发调用该任务,会清空 Map 以释放一定空间;
  4. 随机:从执行器管理的注册地址列表中随机选取一个执行器来执行任务;
  5. 一致性 HASH:实现一致性 HASH 负载均衡算法;
  6. 最不经常使用:选择最近最少被调度的执行器执行任务(通过次数维度选取任务);
  7. 最近最久未使用:选择距离上次被调度时间最长的执行器执行任务(通过时间维度选取任务),有助于平衡各执行器的工作负载;
  8. 故障转移:在任务路由策略选择“故障转移”的情况下,如果执行器集群中的某一台机器出现故障,将会自动 Failover 切换到一台正常的执行器发送调度请求;
  9. 忙碌转移:当任务分配到某个执行器时,如果该执行器正处于忙碌状态(可能正在执行其他任务或资源紧张),则会尝试将任务转移到其他相对空闲的执行器上执行;
  10. 分片广播:选取执行器管理的注册地址列表中的所有地址,每个地址都执行一次任务。这种方式类似于 MQ 的广播模式,可以将任务广播到集群中的所有执行器上执行。此策略适用于需要在多个执行器上同时执行相同任务的场景,例如数据同步或分布式计算等。

也就是说在这些路由策略中,最复杂的就是分片广播了。

2.分片任务实现

所谓的分片广播也就是分片(执行)任务,它是将一个大任务划分为多个子任务并行执行,以提高效率。

假设,我们现在要使用分片任务执行一个大数据的查询与处理,此时的实现代码如下:

import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.log.XxlJobLogger;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class ShardingBroadcastJob {

    @XxlJob("shardingBroadcastTask") 
    public void execute(String param) {
        // 获取分片参数:分片总数和分片序列号
        int shardIndex = XxlJobHelper.getShardIndex(); 
        int shardTotal = XxlJobHelper.getShardTotal(); 

        XxlJobLogger.log("当前节点的 index={}, 总结点数={}, 参数={}", shardIndex, shardTotal, param);

        // 模拟获取数据列表
        List<String> dataList = getDataList(); 

        // 执行分片逻辑
        shardingExecute(dataList, shardIndex, shardTotal);
    }

    public List<String> getDataList() {
        // 这里可以根据实际情况从数据库或其他数据源获取数据列表
        // 为了示例简单,直接返回一个固定的列表
        return List.of("data1", "data2", "data3", "data4", "data5", "data6", "data7", "data8", "data9", "data10");
    }

    public void shardingExecute(List<String> dataList, int shardIndex, int shardTotal) {
        XxlJobLogger.log("开始执行分片任务,当前分片={}, 总分片数={}", shardIndex, shardTotal);

        // 计算当前分片应处理的数据范围
        int start = (shardIndex * dataList.size()) / shardTotal;
        int end = ((shardIndex + 1) * dataList.size()) / shardTotal;

        // 处理当前分片的数据
        for (int i = start; i < end; i++) {
            String data = dataList.get(i);
            XxlJobLogger.log("处理数据: {}", data);
            // 在此处添加具体的数据处理逻辑
        }

        XxlJobLogger.log("分片任务执行完成");
    }
}

在上述代码中,在execute方法中,通过 XxlJobHelper.getShardIndex() 获取当前分片序号,通过 XxlJobHelper.getShardTotal() 获取总分片数。然后模拟获取了一个数据列表 dataList,接下来使用 shardingExecute 方法执行分片逻辑。

在 shardingExecute 方法中,根据分片序号和总分片数计算出当前分片应处理的数据范围,然后遍历该范围内的数据并进行处理(此处仅打印数据,实际应用中可添加具体的数据处理逻辑)。

在实际使用时,需要将任务部署到 XXL Job 执行器集群中,并在调度中心配置相应的任务,选择分片广播的路由策略。这样,当调度中心触发任务时,所有执行器都会执行该任务,并根据分片参数处理相应的数据分片,这样就能提升程序整体的执行效率了。

3.执行原理

了解了 XXL Job 的代码实现就能明白其运行原理,它的实现原理如下:

  1. 任务配置与分发:在 XXL Job 的调度中心,用户通过 Web 界面创建一个分片广播类型的任务,并设置相应的参数,如分片总数(shardingTotalCount)。当调度触发时,调度中心会将此任务广播至所有注册的执行器。
  2. 分片参数传递:每个执行器在接收到广播的任务时,会自动获得分片参数,包括分片总数和当前执行器应该处理的分片序号(shardingItem)。这些参数由 XXL Job 框架自动注入,使得执行器能够知道它应当处理哪个数据分片。
  3. 分片逻辑执行:实际的分片逻辑需要在执行器的任务处理器代码中实现,开发者需根据分片序号和总数,决定处理哪些数据。这通常涉及对数据源的分片访问,如数据库查询时使用分页查询或者 ID 取模等方法来确定每个执行器处理的数据范围。然后各个执行器并行处理各自分片的数据,互不影响。
  4. 结果汇总:由于是广播任务,每个执行器处理的是全量数据的一个子集,因此不存在汇总操作,每个执行器独立完成自己的处理逻辑。如果需要最终汇总结果,需要额外的逻辑来收集和整合各个执行器的输出。

课后思考

在分片任务时,如果其中某台机器掉电了导致结果一直未能正常返回,XXL Job 会如何处理?XXL Job 怎么保证任务只会被执行一次的?

本文已收录到我的面试小站 www.javacn.site,其中包含的内容有:Redis、JVM、并发、并发、MySQL、Spring、Spring MVC、Spring Boot、Spring Cloud、MyBatis、设计模式、消息队列等模块。

From:https://www.cnblogs.com/vipstone/p/18308025
本文地址: http://shuzixingkong.net/article/84
0评论
提交 加载更多评论
其他文章 (开源)都进来!简单易懂、功能强大的权限+可视化流程管理系统
1、预览地址:http://139.155.137.144:9012 2、qq群:801913255 一、前言 随着网络的发展,企业对于信息系统数据的保密工作愈发重视,不同身份、角色对于数据的访问权限都应该大相径庭。 列如 1、不同登录人员对一个数据列表的可见度是不一样的,如数据列、数据行、数据按钮
(开源)都进来!简单易懂、功能强大的权限+可视化流程管理系统 (开源)都进来!简单易懂、功能强大的权限+可视化流程管理系统 (开源)都进来!简单易懂、功能强大的权限+可视化流程管理系统
表格集算表高性能原理:揭秘纯前端百万行数据秒级响应的魔法
最新技术资源(建议收藏) https://www.grapecity.com.cn/resources/ 集算表 (Table Sheet)是一个具备高性能渲染、数据绑定功能、公式计算能力的数据表格,通过全新构建的关系型数据管理器结合结构化公式,在高性能表格的基础上提供排序、筛选、样式、行列冻结、自
表格集算表高性能原理:揭秘纯前端百万行数据秒级响应的魔法 表格集算表高性能原理:揭秘纯前端百万行数据秒级响应的魔法 表格集算表高性能原理:揭秘纯前端百万行数据秒级响应的魔法
Netcode for Entities如何添加自定义序列化,让GhostField支持任意类型?以int3为例(1.2.3版本)
一句话省流:很麻烦也很抽象,能用内置支持的类型就尽量用。 首先看文档。官方文档里一开头就列出了所有内置的支持的类型:Ghost Type Templates 其中Entity类型需要特别注意一下:在同步这个类型的时候,如果是刚刚Instantiate的Ghost(也就是GhostId尚未生效,上一篇
Java21的虚拟线程Virtual Thread初体验
我们之前使用的是操作系统平台的线程,就称之为“系统线程”吧。虚拟线程是JDK维护的,原理跟WebFlux的底层实现差不多,都是工作线程分离。 要使用虚拟线程,需要使用JDK21以上,包括21。 虚拟线程可以创建很多很多 系统线程不能轻易创建太多,我们一直被教导创建线程是很重的活动。 for (int
Java21的虚拟线程Virtual Thread初体验
开启GitLab的邮件通知功能以及一些外观配置
前言 维护GitLab的同事离职了 刚好又有新实习生需要申请账号 只能我来出手了 其实之前安装了 GitLab 之后一直还是用得比较粗糙的 属于是勉强能用的水平,有些配置都还没改好 这次把邮件功能、域名、外观啥的配置好了,写篇文章记录一下 目录结构 先来回顾一下 GitLab 的目录结构 我们的 G
开启GitLab的邮件通知功能以及一些外观配置
解码 xsync 的 map 实现
解码 xsync 的 map 实现 最近在寻找 Go 的并发 map 库的时候,翻到一个 github 宝藏库,xsync (https://github.com/puzpuzpuz/xsync) 。这个库提供了一些支持并发的数据结构,计数器Counter,哈希 Map,队列Queue。我着重看了下
解码 xsync 的 map 实现 解码 xsync 的 map 实现 解码 xsync 的 map 实现
为视觉语言多模态模型进行偏好优化
为视觉语言多模态模型进行偏好优化 训练模型使得它能够理解并预测人类偏好是一项比较复杂的任务。诸如 SFT (Supervised finetuning) 的传统的方法一般都需要耗费较大成本,因为这些算法需要对数据打上特定的标签。而偏好优化 (Preference Optimization) 作为一种
为视觉语言多模态模型进行偏好优化 为视觉语言多模态模型进行偏好优化 为视觉语言多模态模型进行偏好优化
设计模式-C#实现简单工厂模式
前言 上一篇文章写了如何使用RabbitMQ做个简单的发送邮件项目,然后评论也是比较多,也是准备去学习一下如何确保RabbitMQ的消息可靠性,但是由于时间原因,先来说说设计模式中的简单工厂模式吧! 在了解简单工厂模式之前,我们要知道C#是一款面向对象的高级程序语言。它有3大特性,封装、继承、多态。
设计模式-C#实现简单工厂模式