首页 星云 工具 资源 星选 资讯 热门工具
:

PDF转图片 完全免费 小红书视频下载 无水印 抖音视频下载 无水印 数字星空

.NET 高性能缓冲队列实现 BufferQueue

编程知识
2024年07月29日 19:32

目录

前言

BufferQueue 是一个用 .NET 编写的高性能的缓冲队列实现,支持多线程并发操作。

项目是从 mocha 项目中独立出来的一个组件,经过修改以提供更通用的缓冲队列功能。

目前支持的缓冲区类型为内存缓冲区,后续会考虑支持更多类型的缓冲区。

适用场景

生产者和消费者之间的速度不一致,需要并发批量处理数据的场景。

功能说明

  1. 支持创建多个 Topic,每个 Topic 可以有多种数据类型。每一对 Topic 和数据类型对应一个独立的缓冲区。

BufferQueue

  1. 支持创建多个 Consumer Group,每个 Consumer Group 的消费进度都是独立的。支持多个 Consumer Group 并发消费同一个 Topic。

  2. 支持同一个 Consumer Group 创建多个 Consumer,以负载均衡的方式消费数据。

  3. 支持数据的批量消费,可以一次性获取多条数据。

  4. 支持 pull 模式和 push 模式两种消费模式。

  5. pull 模式下和 push 模式下都支持 auto commit 和 manual commit 两种提交方式。auto commit 模式下,消费者在收到数据后自动提交消费进度,如果消费失败不会重试。manual commit 模式下,消费者需要手动提交消费进度,如果消费失败只要不提交进度就可以重试。

需要注意的是,当前版本出于简化实现的考虑,暂不支持消费者的动态扩容和缩容,需要在创建消费者时指定消费者数量。

使用示例

安装 Nuget 包:

dotnet add package BufferQueue

项目基于 Microsoft.Extensions.DependencyInjection,使用时需要先注册服务。

BufferQueue 支持两种消费模式:pull 模式和 push 模式。


builder.Services.AddBufferQueue(options =>
{
    options.UseMemory(bufferOptions =>
        {
            // 每一对 Topic 和数据类型对应一个独立的缓冲区,可以设置 partitionNumber
            bufferOptions.AddTopic<Foo>("topic-foo1", partitionNumber: 6);
            bufferOptions.AddTopic<Foo>("topic-foo2", partitionNumber: 4);
            bufferOptions.AddTopic<Bar>("topic-bar", partitionNumber: 8);
        })
        // 添加 push 模式的消费者
        // 扫描指定程序集中的标记了 BufferPushCustomerAttribute 的类,
        // 注册为 push 模式的消费者
        .AddPushCustomers(typeof(Program).Assembly);
});

// 在 HostedService 中使用 pull模式 消费数据
builder.Services.AddHostedService<Foo1PullConsumerHostService>();

pull 模式的消费者示例:

public class Foo1PullConsumerHostService(
    IBufferQueue bufferQueue,
    ILogger<Foo1PullConsumerHostService> logger) : IHostedService
{
    private readonly CancellationTokenSource _cancellationTokenSource = new();

    public Task StartAsync(CancellationToken cancellationToken)
    {
        var token = CancellationTokenSource
            .CreateLinkedTokenSource(cancellationToken, _cancellationTokenSource.Token)
            .Token;

        var consumers = bufferQueue.CreatePullConsumers<Foo>(
            new BufferPullConsumerOptions
            {
                TopicName = "topic-foo1", GroupName = "group-foo1", AutoCommit = true, BatchSize = 100,
            }, consumerNumber: 4);

        foreach (var consumer in consumers)
        {
            _ = ConsumeAsync(consumer, token);
        }

        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        _cancellationTokenSource.Cancel();
        return Task.CompletedTask;
    }

    private async Task ConsumeAsync(IBufferPullConsumer<Foo> consumer, CancellationToken cancellationToken)
    {
        await foreach (var buffer in consumer.ConsumeAsync(cancellationToken))
        {
            foreach (var foo in buffer)
            {
                // Process the foo
                logger.LogInformation("Foo1PullConsumerHostService.ConsumeAsync: {Foo}", foo);
            }
        }
    }
}

push 模式的消费者示例:

通过 BufferPushCustomer 特性注册 push 模式的消费者。

push consumer 会被注册到 DI 容器中,可以通过构造函数注入其他服务,可以通过设置 ServiceLifetime 来控制 consumer 的生命周期。

BufferPushCustomerAttribute 中的 concurrency 参数用于设置 push consumer 的消费并发数,对应 pull consumer 的 consumerNumber。


[BufferPushCustomer(
    topicName: "topic-foo2",
    groupName: "group-foo2",
    batchSize: 100,
    serviceLifetime: ServiceLifetime.Singleton,
    concurrency: 2)]
public class Foo2PushConsumer(ILogger<Foo2PushConsumer> logger) : IBufferAutoCommitPushConsumer<Foo>
{
    public Task ConsumeAsync(IEnumerable<Foo> buffer, CancellationToken cancellationToken)
    {
        foreach (var foo in buffer)
        {
            logger.LogInformation("Foo2PushConsumer.ConsumeAsync: {Foo}", foo);
        }

        return Task.CompletedTask;
    }
}
[BufferPushCustomer(
    "topic-bar",
    "group-bar",
    100,
    ServiceLifetime.Scoped,
    2)]
public class BarPushConsumer(ILogger<BarPushConsumer> logger) : IBufferManualCommitPushConsumer<Bar>
{
    public async Task ConsumeAsync(IEnumerable<Bar> buffer, IBufferConsumerCommitter committer,
        CancellationToken cancellationToken)
    {
        foreach (var bar in buffer)
        {
            logger.LogInformation("BarPushConsumer.ConsumeAsync: {Bar}", bar);
        }

        var commitTask = committer.CommitAsync();
        if (!commitTask.IsCompletedSuccessfully)
        {
            await commitTask.AsTask();
        }
    }
}

BufferQueue 内部设计概述

Topic 的隔离

BufferQueue 有以下的特性:

  • 同一个数据类型 下的 不同 Topic 的 BufferQueue 互不干扰。

  • 同一个 Topic 下的 不同数据类型 的 BufferQueue 互不干扰。

BufferQueue

这个特性是通过以下两层接口设计实现的:

  • IBufferQueue:根据 TopicName类型参数 T 将请求转发给具体的 IBufferQueue<T> 实现(借助 KeyedService 实现),其中参数 T 代表 Buffer 所承载的数据实体的类型。

  • IBufferQueue<T>:具体的 BufferQueue 实现,负责管理 Topic 下的数据。属于 Buffer 模块的内部实现,不对外暴露。

IBufferQueue

Partition 的设计

为了保证消费速度,BufferQueue 将数据划分为多个 Partition,每个 Partition 都是一个独立的队列,每个 Partition 都有一个对应的消费者线程。

Producer 以轮询的方式往每个 Partition 中写入数据。
Consumer 最多不允许超过 Partition 的数量,Partition 按平均分配到组内每个 Customer 上。
当一个 Consumer 被分配了多个 Partition 时,以轮训的方式进行消费。
每个 Partition 上会记录不同消费组的消费进度,不同组之间的消费进度互不干扰。

Partition

对并发的支持

Producer 支持并发写入。

Consumer 消费时是绑定 Partition 的,为保证能正确管理 Partition 的消费进度,Consumer 不支持并发消费。

如果要增加消费速度,需创建多个 Consumer。

Partition 的动态扩容

Partition 的基本组成单元是 Segment,Segment 代表保存数据的数组,多个 Segment 通过链表的形式组合成一个 Partition。

当一个 Segment 写满后,通过在其后面追加一个 Segment 实现扩容。

Segment 中用于保存数据的数组的每一个元素称为 Slot,每个 Slot 都有一个Partition 内唯一的自增 Offset。

Segment

Segment 的回收机制

每次在 Partition 中新增 Segment 时,会从头判断此前的 Segment 是否已经被所有消费组消费完,回收最后一个消费完的 Segment 作为新的 Segment 追加到 Partition 末尾使用。

SegmentRecycle

Benchmark

测试环境:Apple M2 Max 64GB

写入性能测试

与 BlockingCollection 对比并发,并发线程数为 CPU 逻辑核心数 12, partitionNumber 为 1 和 12。

测试结果
Benchmark

在并发写入时,BufferQueue 的写入性能明显优于 BlockingCollection。

消费性能测试

pull 模式 consumer 与 BlockingCollection 对比并发读取性能,并发线程数为 CPU 逻辑核心数 12,partitionNumber 为 12。

测试结果
Benchmark

在批量消费时,随着批量大小的增加,BufferQueue 的消费性能优势更加明显。

From:https://www.cnblogs.com/eventhorizon/p/18331018
本文地址: http://www.shuzixingkong.net/article/569
0评论
提交 加载更多评论
其他文章 软件测试必备 - 14个接口与自动化测试练习网站
随着互联网和移动应用的快速发展,接口和自动化测试的重要性日益凸显。越来越多的企业开始重视API测试,因为它不仅能提升开发效率,还能确保系统的稳定性和安全性。这些练习网站为测试人员提供了宝贵的资源,帮助他们掌握必要的技能,应对日益复杂的测试需求。 在软件测试的世界里,接口与自动化测试是提升效率和确保质
软件测试必备 - 14个接口与自动化测试练习网站 软件测试必备 - 14个接口与自动化测试练习网站 软件测试必备 - 14个接口与自动化测试练习网站
C++命名空间、标准输入输出、引用
1、简述C++中命名空间的作用。 答:避免重复定义全局变量的问题。 2、定义两个命名空间A 和 B 分别在A中和B中定义变量value。在main函数中将两个空间的value打印出来。 #include &quot;iostream&quot; using namespace std; namesp
安全可信,Solon v2.8.6 发布
Java “纯血国产”应用开发框架:更快、更小、更简单。 并发高 2~ 3 倍;内存省 50%;调试重启可快至 10 倍;打包最多缩小 90%;同时支持 java8 ~ java22, graalvm native image 运行时。
从DDPM到DDIM(四) 预测噪声与后处理
从DDPM到DDIM(四) 预测噪声与后处理 前情回顾 下图展示了DDPM的双向马尔可夫模型。 训练目标。最大化证据下界等价于最小化以下损失函数: \[\boldsymbol{\theta}^*=\underset{\boldsymbol{\theta}}{\operatorname{argmin}
从DDPM到DDIM(四) 预测噪声与后处理
.NET周刊【7月第4期 2024-07-28】
国内文章 .NET 高性能缓冲队列实现 BufferQueue https://mp.weixin.qq.com/s/fUhJpyPqwcmb3whuV3CDyg BufferQueue 是一个用 .NET 编写的高性能的缓冲队列实现,支持多线程并发操作。 项目地址:https://github.c
.NET周刊【7月第4期 2024-07-28】 .NET周刊【7月第4期 2024-07-28】
LLM并行训练7-混合并行总结
概述 根据前面的系列文章, 对预训练大模型里用到的主要并行加速技术做了一系列拆分. 但是在实际的训练里往往是多种并行混合训练. 我们要怎么配置这些并行策略才能让训练框架尽可能的减少通信瓶颈, 提升GPU计算利用率呢? 这里的变量太多了, 以最简单的3D并行为例: 硬件层面有: 单台机器的卡数/卡间带
LLM并行训练7-混合并行总结 LLM并行训练7-混合并行总结 LLM并行训练7-混合并行总结
如何在Arch Linux上构建Raspberry Pi虚拟环境
如何在Linux上构建Raspberry Pi虚拟环境 ​ 下面我们来讲讲如何使用QEMU来仿照树莓派环境。这里首先先分成两大类。第一类是跑比较老的,安全性较低的老树莓派,主要指代的是22年4月份发布之前的版本,这个版本当中,树莓派镜像自己内部就配置了一份默认的账户密码。对于之后的版本则不配备这种默
如何在Arch Linux上构建Raspberry Pi虚拟环境 如何在Arch Linux上构建Raspberry Pi虚拟环境 如何在Arch Linux上构建Raspberry Pi虚拟环境
痞子衡嵌入式:MCUXpresso IDE下在线联合调试i.MXRT1170双核工程的三种方法
大家好,我是痞子衡,是正经搞技术的痞子。今天痞子衡给大家分享的是MCUXpresso IDE下在线联合调试i.MXRT1170双核工程的三种方法。 两年前痞子衡写过一篇《i.MXRT1170下在线联合调试双核工程的三种方法(IAR篇)》,那篇文章详细介绍了 IAR 下调试 RT1170 双核工程的几
痞子衡嵌入式:MCUXpresso IDE下在线联合调试i.MXRT1170双核工程的三种方法 痞子衡嵌入式:MCUXpresso IDE下在线联合调试i.MXRT1170双核工程的三种方法 痞子衡嵌入式:MCUXpresso IDE下在线联合调试i.MXRT1170双核工程的三种方法