首页 星云 工具 资源 星选 资讯 热门工具
:

PDF转图片 完全免费 小红书视频下载 无水印 抖音视频下载 无水印 数字星空

简单设计一个JAVA并行处理工具类

编程知识
2024年08月06日 09:05

在工作中,我们肯定遇到过一个接口要处理N多事项导致接口响应速度很慢的情况,通常我们会综合使用两种方式来提升接口响应速度

  1. 优化查询SQL,提升查询效率
  2. 开启多线程并发处理业务数据

这里讨论第二种方案:使用多线程并发处理业务数据,最后处理完成以后,拼装起来返回给前端,每个人的实现方案都不一样,我在工作的这几年也经历了几种写法。

一、几种常见的并行处理写法

方法一:Future写法

其代码形式如下

@Test
public void test1() {
    //定义线程池
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 30,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(10),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.DiscardPolicy());
    //异步执行
    Future<String> getUserName = threadPoolExecutor.submit(() -> {
        //do something...
        return "kdyzm";
    });
    //异步执行
    Future<Integer> getUserAge = threadPoolExecutor.submit(() -> {
        //do something...
        return 12;
    });
    //拼装回调结果
    try {
        UserInfo user = new UserInfo();
        user.setName(getUserName.get());
        user.setAge(getUserAge.get());
        log.info(JsonUtils.toPrettyString(user));
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

@Data
static class UserInfo {
    private String name;
    private Integer age;
}

多几个submit一起执行,最后集中get获取最终结果。

这种方式任务一旦多了,就会显得代码很乱,一堆的变量名会让代码可读性很差。

方法二:CompletableFuture.allOf写法

其代码形式如下

@Test
public void test2() {
    try {
        UserInfo userInfo = new UserInfo();
        
        CompletableFuture.allOf(
            	//异步执行
                CompletableFuture.runAsync(() -> {
                    userInfo.setName("kdyzm");
                }),
            	//异步执行
                CompletableFuture.runAsync(() -> {
                    userInfo.setAge(12);
                })
        //同步返回
        ).get();

        log.info(JsonUtils.toPrettyString(userInfo));
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

@Data
static class UserInfo {
    private String name;
    private Integer age;
}

这种方法使用了CompletableFuture的API,通过将多个异步任务收集起来统一调度最后通过一个get方法同步到主线程。比直接使用Future简化了些。

方法三:CompletableFuture::join写法

其代码形式如下

@Test
public void test3(){
    UserInfo userInfo = new UserInfo();
    Arrays.asList(
			//异步执行
            CompletableFuture.supplyAsync(()->{
                return "kdyzm";
            //回调执行
            }).thenAccept(name->{
                userInfo.setName(name);
            }),

        	//异步执行
            CompletableFuture.supplyAsync(()->{
                return 12;
            //回调执行
            }).thenAccept(age->{
                userInfo.setAge(age);
            })
        
        //等待所有线程执行完毕
    ).forEach(CompletableFuture::join);

    log.info(JsonUtils.toPrettyString(userInfo));

}

@Data
static class UserInfo {
    private String name;
    private Integer age;
}

这种写法和上面的写法相比具有更高的可读性,但是它也有缺点:thenAccept只能接收一个返回值,如果想处理多个值,则没有办法,只能使用方法2。

总结

几种写法中第二、三种写法比较常见,使用起来也更加方便,两者各有优缺点:方法2能处理多个返回值,方法3可读性更高。但是无论是方法2还是方法3,它们的使用总是要记住相关的API,使用起来总不是很顺手,可读性虽然方法3更强一些,但是总还是差点意思。此时我就有了自己设计一个简单的并行处理工具类的想法,既要易用,还要可读性高。

二、并行处理工具类设计

1、设计模式选型

因为平时比较喜欢链式调用的API,所以一开始一开始设计,我就想用建造者模式来实现这个工具类。关于建造者模式,详情可以看我之前的文章:设计模式(六):建造者模式 。建造者模式在实际应用中的特点就是链式调用,无论是StringBuilder还是lombok的@Data注解,都使用了建造者模式。

2、第一版代码

仿照方法三,我开发了第一版代码

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * @author kdyzm
 */
@Slf4j
public class ConcurrentWorker {

    private List<Task> workers = new ArrayList<>();

    public static ConcurrentWorker runner() {
        return new ConcurrentWorker();
    }

    public <R> ConcurrentWorker addTask(Consumer<? super R> action, Supplier<R> value) {
        Task<R> worker = new Task<>(action, value);
        this.workers.add(worker);
        return this;
    }

    public void run() {
        workers.forEach(item -> {
            CompletableFuture completableFuture = CompletableFuture.supplyAsync(item.getValue());
            item.setCompletableFuture(completableFuture);
        });
        workers
                .stream()
                .map(
                        item -> {
                            return item.completableFuture.thenAccept(item.getAction());
                        }
                )
                .forEach(CompletableFuture::join);
    }

    @Data
    public static class Task<R> {
        private Consumer<? super R> action;
        private Supplier<R> value;
        private CompletableFuture<R> completableFuture;

        public Task(Consumer<? super R> action, Supplier<R> value) {
            this.action = action;
            this.value = value;
        }
    }
}

这段代码一共不到60行,使用了Lambda表达式和函数式编程相关的API对方法三进行改造,最终使用效果如下

@Test
    public void test() {

        UserInfo userInfo = new UserInfo();

        ConcurrentWorker.runner()
            	//添加任务
                .addTask(userInfo::setName, () -> {
                    //延迟1000毫秒打印线程执行情况
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info(Thread.currentThread().getName()+"-name");
                    return "张三";
                })
            	//添加任务
                .addTask(userInfo::setAge, () -> {
                    //延迟1000毫秒打印线程执行情况
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info(Thread.currentThread().getName()+"-age");
                    return 13;
                })
            	//执行任务
                .run();
        log.info(JsonUtils.toPrettyString(userInfo));
    }

    @Data
    static class UserInfo {
        private String name;
        private Integer age;
        private String sex;
    }

它的使用方式就是

ConcurrentWorker.runner()
                .addTask(setter function, return_value function )
    			.addTask(setter function, return_value function)
    			.run()

可以看到易用性够了,可读性也很好,但是它的缺点和方法三一样,都只能接收一个参数,毕竟它是根据方法3封装的,接下来改造代码让它支持多参数处理。

3、第二版代码

已知,第一版代码已经支持了如下形式的功能

ConcurrentWorker.runner()
                .addTask(setter function, return_value function )
    			.addTask(setter function, return_value function)
    			.run()

现在我想添加以下形式的重载方法

.addTask(handle function)

没错,就一个参数,在这个方法中可以任意设置对象值。最终使用的效果如下

@Test
public void test() {

    UserInfo userInfo = new UserInfo();

    ConcurrentWorker.runner()
            .addTask(userInfo::setName, () -> {
                try {
                    Thread.sleep(1000);
                    log.info(Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info(Thread.currentThread().getName()+"-name");
                return "张三";
            })
            .addTask(userInfo::setAge, () -> {
                try {
                    Thread.sleep(1000);
                    log.info(Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info(Thread.currentThread().getName()+"-age");
                return 13;
            })
        	//新方法:处理任意多属性值填充
            .addTask(()->{
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info(Thread.currentThread().getName()+"-sex");
                userInfo.setSex("男");
            })
            .run();
    log.info(JsonUtils.toPrettyString(userInfo));
}

@Data
static class UserInfo {
    private String name;
    private Integer age;
    private String sex;
}

完整工具类方法如下

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * @author kdyzm
 */
@Slf4j
public class ConcurrentWorker {

    private List<Task> workers = new ArrayList<>();

    public static ConcurrentWorker runner() {
        return new ConcurrentWorker();
    }

    public <R> ConcurrentWorker addTask(Consumer<? super R> action, Supplier<R> value) {
        Task<R> worker = new Task<>(action, value);
        this.workers.add(worker);
        return this;
    }

    public <R> ConcurrentWorker addTask(Runnable runnable) {
        Task<R> worker = new Task<>(runnable);
        this.workers.add(worker);
        return this;
    }

    public void run() {
        workers.forEach(item -> {
            int taskType = item.getTaskType();
            CompletableFuture completableFuture = null;
            switch (taskType) {
                case TaskType.RETURN_VALUE:
                    completableFuture = CompletableFuture.supplyAsync(item.getValue());
                    break;
                case TaskType.VOID_RETURN:
                    completableFuture = CompletableFuture.runAsync(item.getRunnable());
                    break;
                default:
                    break;
            }
            item.setCompletableFuture(completableFuture);
        });
        workers
                .stream()
                .map(
                        item -> {
                            int taskType = item.getTaskType();
                            switch (taskType) {
                                case TaskType.RETURN_VALUE:
                                    return item.completableFuture.thenAccept(item.getAction());
                                default:
                                    return item.completableFuture.thenAccept(temp->{
                                        //空
                                    });
                            }
                        }
                )
                .forEach(CompletableFuture::join);
    }

    @Data
    public static class Task<R> {
        private Consumer<? super R> action;
        private Supplier<R> value;
        private CompletableFuture<R> completableFuture;
        private Runnable runnable;
        private int taskType;

        public Task(Consumer<? super R> action, Supplier<R> value) {
            this.action = action;
            this.value = value;
            this.taskType = TaskType.RETURN_VALUE;
        }


        public Task(Runnable runnable) {
            this.runnable = runnable;
            this.taskType = TaskType.VOID_RETURN;
        }
    }


    public static class TaskType {

        /**
         * 有返回值的
         */
        public static final int RETURN_VALUE = 1;

        /**
         * 没有返回值的
         */
        public static final int VOID_RETURN = 2;
    }
}

我将任务类型分为两种,并使用TaskType类封装成常量值:1表示任务执行回调有返回值;2表示任务执行没有返回值,属性填充将在任务执行过程中完成,该类型任务使用Runnable接口实现。

4、工具类jar包

相关代码我已经打包成jar包上传到maven中央仓库,可以通过引入以下maven依赖使用ConcurrentWorker工具类

<dependency>
    <groupId>cn.kdyzm</groupId>
    <artifactId>kdyzm-util</artifactId>
    <version>0.0.2</version>
</dependency>


最后,欢迎关注我的博客:https://blog.kdyzm.cn

END.

From:https://www.cnblogs.com/kuangdaoyizhimei/p/18344600
本文地址: http://shuzixingkong.net/article/831
0评论
提交 加载更多评论
其他文章 .NET 开源权限认证项目 MiniAuth上线
前言 在Web应用项目中权限认证是个绕不开的话题,传统方法复杂又耗时。MiniAuth推出专为.NET开发者设计的简单、实用的权限认证项目。 MiniAuth,作为ASP.NET Core的插件,让我们快速轻松实现用户登录、权限检查等功能。它支持多种认证方式,如JWT、Cookie,且易于集成到现有
.NET 开源权限认证项目 MiniAuth上线 .NET 开源权限认证项目 MiniAuth上线 .NET 开源权限认证项目 MiniAuth上线
SemanticKernel/C#:实现接口,接入本地嵌入模型
前言 本文通过Codeblaze.SemanticKernel这个项目,学习如何实现ITextEmbeddingGenerationService接口,接入本地嵌入模型。 项目地址:https://github.com/BLaZeKiLL/Codeblaze.SemanticKernel 实践 Se
SemanticKernel/C#:实现接口,接入本地嵌入模型 SemanticKernel/C#:实现接口,接入本地嵌入模型 SemanticKernel/C#:实现接口,接入本地嵌入模型
《最新出炉》系列初窥篇-Python+Playwright自动化测试-64 - Canvas和SVG元素推拽
1.简介 今天宏哥分享的在实际测试工作中很少遇到,比较生僻,如果突然遇到我们可能会脑大、懵逼,一时之间不知道怎么办?所以宏哥这里提供一种思路供大家学习和参考。 2.SVG简介 svg也是html5新增的一个标签,它跟canvas很相似。都可以实现绘图、动画。但是svg绘制出来的都是矢量图,不像can
《最新出炉》系列初窥篇-Python+Playwright自动化测试-64 - Canvas和SVG元素推拽 《最新出炉》系列初窥篇-Python+Playwright自动化测试-64 - Canvas和SVG元素推拽 《最新出炉》系列初窥篇-Python+Playwright自动化测试-64 - Canvas和SVG元素推拽
随时随地与 LLMs 聊天的开源项目「GitHub 热点速览」
众所周知,本地运行 LLMs 需要下载模型(体积大),并且还比较吃硬件配置。近日 GitHub 推出了 GitHub Models 服务,让开发者可以在 GitHub 上免费测试 Llama、Phi 3、Mistral 和 GPT-4o 等大模型。但是,目前该服务仍处于公测阶段,类似早期的 Copi
随时随地与 LLMs 聊天的开源项目「GitHub 热点速览」 随时随地与 LLMs 聊天的开源项目「GitHub 热点速览」 随时随地与 LLMs 聊天的开源项目「GitHub 热点速览」
神秘 Arco 样式出现,祭出 Webpack 解决预期外的引用问题
神秘 Arco 样式出现,祭出 Webpack 解决预期外的引用问题 Webpack是现代化的静态资源模块化管理和打包工具,其能够通过插件配置处理和打包多种文件格式,生成优化后的静态资源,核心原理是将各种资源文件视为模块,通过配置文件定义模块间的依赖关系和处理规则,从而实现模块化开发。Webpack
ComfyUI插件:efficiency-nodes-comfyui节点
前言: 学习ComfyUI是一场持久战, efficiency-nodes-comfyui是提高工作流创造效率的工具,包含效率节点整合工作流中的基础功能,比如Efficient Loader节点相当于Load Checkpoint+Clip set layer+Load VAE等等的合集,并且该插件
ComfyUI插件:efficiency-nodes-comfyui节点 ComfyUI插件:efficiency-nodes-comfyui节点 ComfyUI插件:efficiency-nodes-comfyui节点
Spring Boot 中使用 JSON Schema 来校验复杂JSON数据
JSON是我们编写API时候用于数据传递的常用格式,那么你是否知道JSON Schema呢? 在数据交换领域,JSON Schema 以其强大的标准化能力,为定义和规范 JSON 数据的结构与规则提供了有力支持。通过一系列精心设计的关键字,JSON Schema 能够详尽地描述数据的各项属性。然而,
Spring Boot 中使用 JSON Schema 来校验复杂JSON数据
深度解读KubeEdge架构设计与边缘AI实践探索
摘要:解读业界首个云原生边缘计算框架KubeEdge的架构设计,如何实现边云协同AI,将AI能力无缝下沉至边缘,让AI赋能边侧各行各业,构建智能、高效、自治的边缘计算新时代,共同探索智能边缘的新篇章。 本文分享自华为云社区《DTSE Tech Talk | 第63期:KubeEdge架构设计与边缘A
深度解读KubeEdge架构设计与边缘AI实践探索 深度解读KubeEdge架构设计与边缘AI实践探索 深度解读KubeEdge架构设计与边缘AI实践探索