首页 星云 工具 资源 星选 资讯 热门工具
:

PDF转图片 完全免费 小红书视频下载 无水印 抖音视频下载 无水印 数字星空

Elasticjob执行job幂等

编程知识
2024年07月25日 09:43

ElasticJob的幂等机制,是指作业分片执行的幂等,他需要做到以下两点:

  • 同一个分片在当前作业实例上不会被重复执行

  • 一个作业分片不能同时在多个作业实例上执行

如何实现幂等

场景模拟:存在任务A执行周期为10s一次。正常情况下任务处理耗时3-5s。但是某一时刻因为数据量突然增大或者因为数据库压力,导致任务耗时超过了10s。在该过程中,任务每10s调度一次,如果没有幂等,那么会存在一个任务同时多个调度的情况,处理相同的数据。

ElasticJob任务执行:com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor#execute()

public final void execute() {
        ...
          // 获取当前作业服务器的分片上下文
        ShardingContexts shardingContexts = jobFacade.getShardingContexts();
        //是否允许可以发送作业事件
        if (shardingContexts.isAllowSendJobEvent()) {
            // 发布作业状态追踪事件
            jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName));
        }
        // 在一个调度任务触发后如果上一次任务还未执行,则需要设置该分片状态为mirefire,表示错失了一次任务执行
        if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
                        "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName, 
                        shardingContexts.getShardingItemParameters().keySet()));
            }
            return;
        }
        ...
    }

接下来主要看一下jobFacade.misfireIfRunning的实现逻辑

 public boolean misfireIfHasRunningItems(Collection<Integer> items) {
       // 没有分片正在运行,返回false,此次任务调度正常进行,否则设置mirefire
       if (!this.hasRunningItems(items)) {
            return false;
        } else {
            this.setMisfire(items);
            return true;
        }
    }

如果存在未完成调度的分片,则调用setMisfire(items)方法。如何判断是否有未完成调度的分片呢,看看hasRunningItems(items)的实现逻辑。

public boolean hasRunningItems(Collection<Integer> items) {
        LiteJobConfiguration jobConfig = this.configService.load(true);
        if (null != jobConfig && jobConfig.isMonitorExecution()) {
            Iterator i$ = items.iterator();

            int each;
            do {
                if (!i$.hasNext()) {
                    return false;
                }

                each = (Integer)i$.next();
            } while(!this.jobNodeStorage.isJobNodeExisted(ShardingNode.getRunningNode(each)));
            // ShardingNode.getRunningNode(each)
          /*
            public static String getRunningNode(int item) {
                return String.format("sharding/%s/running", item);
            }*/
            return true;
        } else {
            return false;
        }
    }

在Elasticjob开启monitorExecution的机制下,分片任务开始时会创建sharding/分片/running节点,任务完成后删除该节点。所以上述代码中,可以看出来,可以通过是否存在该分片的节点来判断是否有分片正在运行。

同时,调用setMisfire(items)方法的时候,根据代码判断,setMisfire(items)方法为分配给该实例下的所有分片创建持久节点/shading/{item}/misfire节点,只要分配给该实例的任何一分片未执行完毕,则在该实例下的所有分片都增加misfire节点,然后忽略本次任务触发执行,等待任务结束后再执行。

    public void setMisfire(Collection<Integer> items) {
        Iterator i$ = items.iterator();

        while(i$.hasNext()) {
            int each = (Integer)i$.next();
            this.jobNodeStorage.createJobNodeIfNeeded(ShardingNode.getMisfireNode(each));
        }
        // ShardingNode.getMisfireNode(each)
        /**
            static String getMisfireNode(int item) {
                return String.format("sharding/%s/misfire", item);
            }
        */

    }

在该执行方法中(com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor#execute())

//执行job        
execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);

//  如果存在Misfire节点,则清除该节点
while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {    
  // 清除Misfire节点
  jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
  execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
 }

总结:在下一个调度周期到达之后,只要发现这个分片的任何一个分片正在执行,则为该实例分片的所有分片都设置为misfire,等任务执行完毕后,再统一执行下一次任务调度。

From:https://www.cnblogs.com/lijchah/p/18322500
本文地址: http://shuzixingkong.net/article/407
0评论
提交 加载更多评论
其他文章 Django+Bootstrip 卡片模板设计 经典精品
下面是一个完整的卡片模板代码,包含所有元素,并使用Django的模板语言来处理状态字段的条件渲染。同时还包括示例视图和URL配置。 完整的卡片模板 &lt;div class=&quot;card&quot;&gt; &lt;!-- 卡片图片 --&gt; &lt;img src=&quot;{{
Django+Bootstrip 卡片模板设计 经典精品
使用 @Audited 增强Spring Boot 应用程序的数据审计能力
介绍 在Spring Boot开发的动态世界中,确保数据完整性和跟踪变化是至关重要的。实现这一目标的一个强大工具是@Audited注解。本文深入探讨了该注解的复杂性、其目的、实现步骤以及如何利用其功能进行有效的实体审计。 理解@Audited Spring Boot中的@Audited注解用于审计实
.NET 轻量级 命令行工具 CSharpRepl
前言 当我们需要快速测试代码片段时,常见的做法是启动Visual Studio或使用在线代码编辑器。然而,Visual Studio的启动可能较为缓慢且占用较多系统资源,而在线编辑器则可能遇到语法支持局限或网络延迟问题。 为解决这个问题,给大家推荐一款轻量级的本地C#执行工具——CSharpRepl
.NET 轻量级 命令行工具 CSharpRepl .NET 轻量级 命令行工具 CSharpRepl .NET 轻量级 命令行工具 CSharpRepl
学了十几种编程语言后,我终于悟了!
我为什么要学这么多编程语言?是怎么学习的?学了这么多语言对我有哪些好处和坏处?现在我们到底应不应该学多门语言呢?
学了十几种编程语言后,我终于悟了! 学了十几种编程语言后,我终于悟了! 学了十几种编程语言后,我终于悟了!
常回家看看之largebin_attack
常回家看看之largebin_attack 先简单介绍一下什么是largebin largebin 是 glibc 的 malloc 实现中用于管理大块内存的一种数据结构。在 glibc 的内存分配中,largebin 是 bin 系列的一部分,用于存储大小超过某个阈值的空闲内存块。largebin
常回家看看之largebin_attack 常回家看看之largebin_attack 常回家看看之largebin_attack
胜未来:国内大模型+Agent应用案例精选,以及主流Agent框架开源项目推荐
智胜未来:国内大模型+Agent应用案例精选,以及主流Agent框架开源项目推荐 Agent是以大模型为核心的智能体,通过与用户对话的形式,来完成各种任务,它很像一个“人”。如果和人做类比,它应该具备以下能力: Agent的各个要素各个子模块: 1.对话式Agent 1.1 月之暗面(Moonsho
胜未来:国内大模型+Agent应用案例精选,以及主流Agent框架开源项目推荐 胜未来:国内大模型+Agent应用案例精选,以及主流Agent框架开源项目推荐 胜未来:国内大模型+Agent应用案例精选,以及主流Agent框架开源项目推荐
manim边学边做--Table
表格是一种常见的数据展示形式,manim提供了Table模块专门用于显示表格形式的数据。表格Table和上一节介绍的矩阵Matrix都是用来显示二维数据的,不过,Table的表现力更强,比如,它可以显示表头信息,列名信息等等。 Table模块也细分了多个对象: 通用Table:显示任何内容 Deci
manim边学边做--Table manim边学边做--Table manim边学边做--Table
.NET TCP、UDP、Socket、WebSocket
做.NET应用开发肯定会用到网络通信,而进程间通信是客户端开发使用频率较高的场景。 进程间通信方式主要有命名管道、消息队列、共享内存、Socket通信,个人使用最多的是Sokcet相关。 而Socket也有很多使用方式,Socket、WebSocket、TcpClient、UdpClient,是不是