首页 星云 工具 资源 星选 资讯 热门工具
:

PDF转图片 完全免费 小红书视频下载 无水印 抖音视频下载 无水印 数字星空

服务重启了,如何保证线程池中的数据不丢失?

编程知识
2024年08月30日 10:39

大家好,我是苏三,又跟大家见面了。

前言

最近有位小伙伴在我的技术群里,问了我一个问题:服务down机了,线程池中如何保证不丢失数据?

这个问题挺有意思的,今天通过这篇文章,拿出来跟大家一起探讨一下。

1 什么是线程池?

之前没有线程池的时候,我们在代码中,创建一个线程有两种方式:

  1. 继承Thread类
  2. 实现Runnable接口

虽说通过这两种方式创建一个线程,非常方便。

但也带来了下面的问题:

  1. 创建和销毁一个线程,都是比较耗时,频繁的创建和销毁线程,非常影响系统的性能。
  2. 无限制的创建线程,会导致内存不足。
  3. 有新任务过来时,必须要先创建好线程才能执行,不能直接复用线程。

为了解决上面的这些问题,Java中引入了:线程池

它相当于一个存放线程的池子。

使用线程池带来了下面3个好处:

  1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度。当任务到达时,可以直接使用已有空闲的线程,不需要的等到线程创建就能立即执行。
  3. 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性。而如果我们使用线程池,可以对线程进行统一的分配、管理和监控。

2 线程池原理

先看看线程池的构造器:

public ThreadPoolExecutor(
    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler)
  • corePoolSize:核心线程数,线程池维护的最少线程数。
  • maximumPoolSize:最大线程数,线程池允许创建的最大线程数。
  • keepAliveTime:线程存活时间,当线程数超过核心线程数时,多余的空闲线程的存活时间。
  • unit:时间单位。
  • workQueue:任务队列,用于保存等待执行的任务。
  • threadFactory:线程工厂,用于创建新线程。
  • handler:拒绝策略,当任务无法执行时的处理策略。

线程池的核心流程图如下:

线程池的工作过程如下:

  1. 线程池初始化:根据corePoolSize初始化核心线程。
  2. 任务提交:当任务提交到线程池时,根据当前线程数判断:
  • 若当前线程数小于corePoolSize,创建新的线程执行任务。
  • 若当前线程数大于或等于corePoolSize,任务被加入workQueue队列。
  1. 任务处理:当有空闲线程时,从workQueue中取出任务执行。
  2. 线程扩展:若队列已满且当前线程数小于maximumPoolSize,创建新的线程处理任务。
  3. 线程回收:当线程空闲时间超过keepAliveTime,多余的线程会被回收,直到线程数不超过corePoolSize。
  4. 拒绝策略:若队列已满且当前线程数达到maximumPoolSize,则根据拒绝策略处理新任务。

说白了在线程池中,多余的任务会被放到workQueue任务队列中。

这个任务队列的数据保存在内存中。

这样就会出现一些问题。

接下来,看看线程池有哪些问题。

3 线程池有哪些问题?

在JDK中为了方便大家创建线程池,专门提供了Executors这个工具类。

3.1 队列过大

Executors.newFixedThreadPool,它可以创建固定线程数量的线程池,任务队列使用的是LinkedBlockingQueue,默认最大容量是Integer.MAX_VALUE。

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, 
                               nThreads,
                                     0L, 
                  TimeUnit.MILLISECONDS,
     new LinkedBlockingQueue<Runnable>(),
                          threadFactory);
}

如果向newFixedThreadPool线程池中提交的任务太多,可能会导致LinkedBlockingQueue非常大,从而出现OOM问题。

3.2 线程太多

Executors.newCachedThreadPool,它可以创建可缓冲的线程池,最大线程数量是Integer.MAX_VALUE,任务队列使用的是SynchronousQueue。

public static ExecutorService newCachedThreadPool() {
  return new ThreadPoolExecutor(0, 
                Integer.MAX_VALUE,
                               60L, 
                  TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
}

如果向newCachedThreadPool线程池中提交的任务太多,可能会导致创建大量的线程,也会出现OOM问题。

3.3 数据丢失

如果线程池在执行过程中,服务突然被重启了,可能会导致线程池中的数据丢失。

上面的OOM问题,我们在日常开发中,可以通过自定义线程池的方式解决。

比如创建这样的线程池:

new ThreadPoolExecutor(8, 
                       10,
                       30L, 
     TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<Runnable>(300),
            threadFactory);

自定义了一个最大线程数量和任务队列都在可控范围内线程池。

这样做基本上不会出现OOM问题。

但线程池的数据丢失问题,光靠自身的功能很难解决。

4 如何保证数据不丢失?

线程池中的数据,是保存到内存中的,一旦遇到服务器重启了,数据就会丢失。

之前的系统流程是这样的:

用户请求过来之后,先处理业务逻辑1,它是系统的核心功能。

然后再将任务提交到线程池,由它处理业务逻辑2,它是系统的非核心功能。

但如果线程池在处理的过程中,服务down机了,此时,业务逻辑2的数据就会丢失。

那么,如何保证数据不丢失呢?

答:需要提前做持久化

我们优化的系统流程如下:

用户请求过来之后,先处理业务逻辑1,紧接着向DB中写入一条任务数据,状态是:待执行。

处理业务逻辑1和向DB写任务数据,可以在同一个事务中,方便出现异常时回滚。

然后有一个专门的定时任务,每个一段时间,按添加时间升序,分页查询状态是待执行的任务。

最早的任务,最先被查出来。

然后将查出的任务提交到线程池中,由它处理业务逻辑2。

处理成功之后,修改任务的待执行状态为:已执行。

需要注意的是:业务逻辑2的处理过程,要做幂等性设计,同一个请求允许被执行多次,其结果不会有影响。

如果此时,线程池在处理的过程中,服务down机了,业务逻辑2的数据会丢失。

但此时DB中保存了任务的数据,并且丢失那些任务的状态还是:待执行。

在下一次定时任务周期开始执行时,又会将那些任务数据重新查询出来,重新提交到线程池中。

业务逻辑2丢失的数据,又自动回来了。

如果要考虑失败的情况,还需要在任务表中增加一个失败次数字段。

在定时任务的线程池中执行业务逻辑2失败了,在下定时任务执行时可以自动重试。

但不可能无限制的一直重试下去。

当失败超过了一定的次数,可以将任务状态改成:失败。

这样后续可以人工处理。

 

最后说一句(求关注,别白嫖我)
如果这篇文章对您有所帮助,或者有所启发的话,帮忙扫描下发二维码关注一下,您的支持是我坚持写作最大的动力。

求一键三连:点赞、转发、在看。
关注公众号:【苏三说技术】,在公众号中回复:面试、代码神器、开发手册、时间管理有超赞的粉丝福利,另外回复:加群,可以跟很多BAT大厂的前辈交流和学习。

From:https://www.cnblogs.com/12lisu/p/18388411
本文地址: http://www.shuzixingkong.net/article/1579
0评论
提交 加载更多评论
其他文章 工作 6 年,@Transactional 注解用的一塌糊涂
接手新项目一言难尽,别的不说单单就一个 @Transactional 注解用的一塌糊涂,五花八门的用法,很大部分还失效无法回滚。 有意识的在涉及事务相关方法上加@Transactional注解,是个好习惯。不过,很多同学只是下意识地添加这个注解,一旦功能正常运行,很少有人会深入验证异常情况下事务是否
工作 6 年,@Transactional 注解用的一塌糊涂
好多kafka难题啊,看看其中的化解之道
前面已经分享过几篇面试了,这是一篇关于更加面向项目和技术的面经详解,第一次遇见问那么多kafka的问题,看看这个粉丝是怎么回答的。
.NET 开源报表神器 Seal-Report
前言 Seal-Report 是一款.NET 开源报表工具,拥有 1.4K Star。它提供了一个完整的框架,使用 C# 编写,最新的版本采用的是 .NET 8.0 。 它能够高效地从各种数据库或 NoSQL 数据源生成日常报表,并支持执行复杂的报表任务。 其简单易用的安装过程和直观的设计界面,我们
.NET 开源报表神器 Seal-Report .NET 开源报表神器 Seal-Report .NET 开源报表神器 Seal-Report
LOTO示波器统计曲线和故障分析pass/fail测试
LOTO示波器统计曲线和故障分析pass/fail测试 虚拟示波器可以应用在工业自动化检测中,除了常规的检测波形和测量值参数以外,由多个行业客户定制和验证的统计曲线和故障分析(pass/fail)功能也为工业自动化检测带来极大的便利。 (一)故障分析(pass/fail)的基础:统计曲线功能 在信号
LOTO示波器统计曲线和故障分析pass/fail测试 LOTO示波器统计曲线和故障分析pass/fail测试 LOTO示波器统计曲线和故障分析pass/fail测试
投屏协议
AirPlay协议是苹果开发、广泛应用于iPhone、iPad和Mac设备,可以通过WiFi将iPhone、iPad等iOS设备上的图片、音频、视频通过无线的方式传输到支持AirPlay 设备。即移动终端显示什么电视大屏就显示什么。随着AirPlay协议逐步普及,国内越来越多网络机顶盒,智能电视都集
POA:已开源,蚂蚁集团提出同时预训练多种尺寸网络的自监督范式 | ECCV 2024
论文提出一种新颖的POA自监督学习范式,通过弹性分支设计允许同时对多种尺寸的模型进行预训练。POA可以直接从预训练teacher生成不同尺寸的模型,并且这些模型可以直接用于下游任务而无需额外的预训练。这个优势显著提高了部署灵活性,并有助于预训练的模型在各种视觉任务中取得SOTA结果。 来源:晓飞的算
POA:已开源,蚂蚁集团提出同时预训练多种尺寸网络的自监督范式 | ECCV 2024 POA:已开源,蚂蚁集团提出同时预训练多种尺寸网络的自监督范式 | ECCV 2024 POA:已开源,蚂蚁集团提出同时预训练多种尺寸网络的自监督范式 | ECCV 2024
[WPF]数据绑定时为何会出现StringFormat失效
在数据绑定过程中,我们经常会使用StringFormat对要显示的数据进行格式化,以便获得更为直观的展示效果,但在某些情况下格式化操作并未生效,例如 Button的 Content属性以及ToolTip属性绑定数据进行StringFormat时是无效的。首先回顾一下StringFormat的基本用法
[WPF]数据绑定时为何会出现StringFormat失效 [WPF]数据绑定时为何会出现StringFormat失效
折腾 Quickwit,Rust 编写的分布式搜索引擎 - 可观测性之分布式追踪
概述 分布式追踪是一种跟踪应用程序请求流经不同服务(如前端、后端、数据库等)的过程。它是一个强大的工具,可以帮助您了解应用程序的工作原理并调试性能问题。 Quickwit 是一个用于索引和搜索非结构化数据的云原生引擎,这使其非常适合用作追踪数据的后端。 此外,Quickwit 本地支持 OpenTe
折腾 Quickwit,Rust 编写的分布式搜索引擎 - 可观测性之分布式追踪 折腾 Quickwit,Rust 编写的分布式搜索引擎 - 可观测性之分布式追踪 折腾 Quickwit,Rust 编写的分布式搜索引擎 - 可观测性之分布式追踪