首页 星云 工具 资源 星选 资讯 热门工具
:

PDF转图片 完全免费 小红书视频下载 无水印 抖音视频下载 无水印 数字星空

事务发件箱模式在 .NET 云原生开发中的应用(基于Aspire)

编程知识
2024年09月07日 14:17

原文:Transactional Outbox in .NET Cloud Native Development via Aspire
作者:Oleksii Nikiforov

总览

这篇文章提供了使用 AspireDotNetCore.CAPAzure Service BusAzure SQLBicepazd 实现 Outbox 模式的示例。

源代码: https://github.com/NikiforovAll/cap-aspire

发件箱模式简介

发件箱模式是分布式系统领域中的一个重要组件。随着现代软件开发朝着更加分布式和解耦的架构发展,确保可靠的消息传递变得越来越重要。

在分布式系统中,不同的组件需要相互通信,通常通过异步消息传递。发件箱模式提供了一种可靠的方法来处理这些消息。它确保即使系统在执行本地事务之后但在发送消息之前发生故障,消息也不会丢失。相反,它会暂时存储在“发件箱”中,并在系统恢复时检索和发送。

通过使用发件箱模式,我们可以确保系统的所有组件以可靠的方式接收必要的消息,从而确保整个系统的完整性和一致性。

在没有发件箱模式的分布式系统中,有几种场景可能会出错,从而导致数据不一致或消息丢失。以下是几个示例:

  1. 事务提交和消息发送不是原子的:在通常情况下,Service 可能首先将事务提交到其数据库,然后向消息代理(Broker)发送消息。如果服务在事务提交之后但在消息发送之前崩溃,则消息将丢失。其他服务将不知道已提交到数据库的更改。
  2. 消息发送失败:即使服务没有崩溃,由于网络问题或消息代理问题,消息发送也可能失败。如果不重试消息发送操作,消息将丢失。
  3. 重复消息:如果服务在失败后重试消息发送操作,如果第一次发送实际上成功但确认丢失,则最终可能会多次发送同一条消息。如果消息使用者不是幂等的,这可能会导致重复处理。
  4. 顺序问题:如果单个事务发送了多条消息,并且这些发送不是原子性的,则这些消息可能会无序接收。如果消息的顺序很重要,这可能会导致不正确的处理。

发件箱模式通过确保事务提交和消息发送操作是原子的,并提供即使在出现故障时也能可靠地发送消息的机制来解决这些问题。

下面是一个序列图,说明了没有发件箱模式的系统所存在的的问题:

幂等消费者在发件箱模式中扮演着重要的角色。在分布式系统的背景下,幂等性是指系统无论执行特定操作多少次都能产生相同结果的能力。这对于确保分布式环境中的数据一致性和可靠性至关重要。

然而,这可能会导致同一条消息被多次发送,尤其是在系统在发送消息后但在发件箱中将消息标记为已发送之前发生故障的情况下。这就是幂等消费者发挥作用的地方。

幂等消费者旨在妥善处理重复消息。它们确保消除多次接收同一条消息的副作用。这通常通过跟踪所有已处理消息的 ID 来实现。当收到一条消息时,消费者会检查它是否已经处理了一条具有相同 ID 的消息。如果已经处理过,它就会忽略该消息。

下面是一个序列图,说明了发件箱模式如何解决问题:

发件箱模式的实现

现在您已经了解了发件箱模式的重要性和好处,让我们深入研究一下实现它需要什么:

发件箱模式的实现涉及以下步骤:

  1. 创建发件箱表:第一步是在数据库中创建发件箱表。此表将存储所有需要发送的消息。每条消息都应具有唯一的 ID 和一个指示消息是否已发送的状态字段。
  2. 修改应用程序代码:下一步是修改应用程序代码。每当您的应用程序需要将消息作为事务的一部分发送时,它都应将该消息作为同一事务的一部分添加到发件箱表中。
  3. 实现发件箱发布器:发件箱发布器是一个单独的组件,它会轮询发件箱表中未发送的消息。当它发现未发送的消息时,它会发送该消息并将发件箱表中该消息的状态更新为“已发送”。

DotNetCore.CAP 简介

幸运的是,有一个名为 DotNetCore.CAP 的 .NET 库可以为我们简化 Outbox 模式的实现。

DotNetCore.CAP 是一个开源库,它提供了一组 API,允许开发人员轻松地将消息作为数据库事务的一部分发送,将其存储在发件箱中,并确保即使在出现故障的情况下也能可靠地将它们传递给所有感兴趣的消费者。

该库还支持幂等消费者,这对于确保分布式环境中的数据一致性和可靠性至关重要。这意味着即使同一条消息被传递多次,也可以消除接收同一条消息的副作用。

通过使用DotNetCore.CAP,开发人员可以专注于其应用程序的业务逻辑,而库则负责确保分布式系统中可靠消息传递的复杂性。

例子

此代码演示了如何在 ASP.NET Core 应用程序中使用 CAP 库进行事件发布和处理。

在生产者中:

  • 定义了“/send”端点的路由处理程序。
  • 它启动一个事务,执行一个 SQL 命令来获取当前服务器时间,并将该时间的消息发布到“test.show.time”主题。
  • 消息发布延迟500毫秒。
  • 如果所有操作都成功,则事务被提交并返回响应。
// Producer/Program.cs  
app.MapGet("/send", async (  
	SqlConnection connection,  
	ICapPublisher capPublisher,  
	TimeProvider timeProvider) =>  
{  
	var startTime = timeProvider.GetTimestamp();  
	using var transaction = connection  
	.BeginTransaction(capPublisher, autoCommit: false);

	var command = connection.CreateCommand();
	command.Transaction = (SqlTransaction)transaction;
	command.CommandText = "SELECT GETDATE()";
	var serverTime = await command.ExecuteScalarAsync();

	await capPublisher.PublishDelayAsync(
		TimeSpan.FromMilliseconds(500),
		"test.show.time",
		new MyMessage(serverTime?.ToString()!));

	transaction.Commit();

	return TypedResults.Ok(new
	{
		Status = "Published",
		Duration = timeProvider.GetElapsedTime(startTime)
	});
});  

💡注意,BeginTransaction 扩展方法是定义在 DotNetCore.CAP.SqlServer中的,负责发件箱表的管理。

public static IDbTransaction BeginTransaction(  
	this IDbConnection dbConnection,  
	ICapPublisher publisher,  
	bool autoCommit = false)  

在消费者方面:

  • 定义一个实现ICapSubscribe的类SampleSubscriber
  • 它有一个HandleAsync的方法,将 CapSubscribe 指定为“test.show.time”作为主题的处理程序。
  • 当收到有关此主题的消息时,它会在 300 毫秒的延迟后记录消息内容。
// Consumer/Program.cs  
public class SampleSubscriber(  
	TimeProvider timeProvider,  
	ILogger <samplesubscriber>logger) : ICapSubscribe  
{  
	public record MyMessage(string CreatedAt);
	
	[CapSubscribe("test.show.time")]
	public async Task HandleAsync(MyMessage message)
	{
		await Task.Delay(TimeSpan.FromMilliseconds(300), timeProvider);

		logger.LogInformation("Message received: {CreatedAt}", message.CreatedAt);
	}
}  

添加Aspire

为了完整地演示该演示,我们需要设置一些真实的基础设施组件 - 消息代理和数据库。

.NET Aspire 提供了一套精选的 NuGet 包(组件),专门用于促进云原生应用程序的集成。每个组件都通过自动配置或标准化配置模式提供必要的云原生功能。

添加消息代理:

.NET Aspire 服务总线组件处理以下问题以将您的应用连接到 Azure 服务总线。它添加 ServiceBusClient 到 DI 容器以连接到 Azure 服务总线。

dotnet add package Aspire.Azure.Messaging.ServiceBus --prerelease  

添加数据库:

.NET Aspire 提供了两个内置配置选项来简化 Azure 上的 SQL Server 部署:

  1. 使用 Azure 容器应用预配容器化 SQL Server 数据库
  2. 提供一个 Azure SQL 数据库实例(我们将使用这个)
dotnet add package Aspire.Microsoft.Data.SqlClient --prerelease  

以下是我们如何根据已安装的组件设置 Aspire Host:

// CapExample.AppHost/Program.cs  
var builder = DistributedApplication.CreateBuilder(args);

var sqlServer = builder.ExecutionContext.IsPublishMode  
	? builder.AddSqlServer("sqlserver").PublishAsAzureSqlDatabase().AddDatabase("sqldb")  
	: builder.AddConnectionString("sqldb");

var serviceBus = builder.ExecutionContext.IsPublishMode  
	? builder.AddAzureServiceBus("serviceBus")  
	: builder.AddConnectionString("serviceBus");

builder.AddProject&lt;Projects.Consumer&gt;("consumer")  
	.WithReference(sqlServer)  
	.WithReference(serviceBus);

builder.AddProject&lt;Projects.Producer&gt;("producer")  
	.WithReference(sqlServer)  
	.WithReference(serviceBus);

builder.Build().Run();  

这个思路是在开发时使用连接字符串,并在发布时配置 Azure 资源。

Aspire 提供灵活的配置系统,让我们无需更改源代码即可在本地开发并部署到云中。我们可以使用由 Aspire Components 管理的连接字符串,这些连接字符串可以在本地开发和云部署环境之间轻松切换。这使我们能够在不同的部署场景之间无缝过渡,而无需修改源代码。

下面您可以找到如何基于 Aspire 组件进行配置 DotNetCore.CAP

// Consumer/Program.cs  
// Producer/Program.cs  
var builder = WebApplication.CreateBuilder(args);

builder.AddServiceDefaults();  
builder.AddAzureServiceBus("serviceBus");  
builder.AddSqlServerClient("sqldb");

builder.Services.AddCap(x =>  
{  
	var dbConnectionString = builder.Configuration.GetConnectionString("sqldb")!;  
	var serviceBusConnection = builder.Configuration.GetConnectionString("serviceBus")!;

	x.UseAzureServiceBus(serviceBusConnection);
	x.UseSqlServer(x => x.ConnectionString = dbConnectionString);
});  

提供基础设施

Azure 开发人员 CLI ( azd) 已得到增强,可支持 .NET Aspire 应用程序的部署。azd init工作流为 .NET Aspire 项目提供定制支持。我在开发此应用程序时使用了这种方法,事实证明它非常好。它提高了生产力。

azd目标是 .NET Aspire 应用程序时,它会使用指定的命令启动( AppHost dotnet run --project AppHost.csproj -- --publisher manifest),从而生成 Aspire 清单文件。

azd provision命令逻辑会询问清单文件,以仅在内存中生成 Bicep 文件(默认情况下)。

更多信息请参阅 https://learn.microsoft.com/en-us/dotnet/aspire/deployment/azure/aca-deployment-azd-in-depth?tabs=linux#how-azure-developer-cli-integration-works

就我个人而言,我发现显式生成 Bicep 文件更方便。这可以通过执行以下命令来实现:

❯ azd infra synth  

以下是将要配置的内容的可视化:

为了配置资源,我们需要运行下一个命令:

❯ azd provision  

这里创建的资源组- rg-cap-dev

配置本地开发

要检索 Azure 服务总线的连接字符串:

#!/bin/bash
resourceGroup="rg-cap-dev"
namespace=$(
    az servicebus namespace list \
        --resource-group $resourceGroup \
        --output tsv \
        --query '[0].name'
)
azbConnectionString=$(
    az servicebus namespace authorization-rule keys list \
        --namespace-name "$namespace" \
        --name RootManageSharedAccessKey \
        --resource-group $resourceGroup \
        --output tsv \
        --query 'primaryConnectionString'
)

dotnet user-secrets --project ./src/CapExample.AppHost \
    set ConnectionStrings:serviceBus $azbConnectionString

要检索 Azure SQL 数据库的连接字符串:

#!/bin/bash

# read server address
if [ -f .azure/cap-dev/.env ]
then
    export $(cat .azure/cap-dev/.env | sed 's/#.*//g' | xargs)
fi

# read server password
db_password=$(jq -r '.inputs.sql.password' .azure/cap-dev/config.json)

dotnet user-secrets --project ./src/CapExample.AppHost set ConnectionStrings:sqldb \
    "Server=$SQLSERVER_SQLSERVERFQDN;Initial Catalog=sqldb;Persist Security Info=False;User ID=CloudSA;Password=$db_password;MultipleActiveResultSets=False;Encrypt=True;TrustServerCertificate=False;Connection Timeout=30;"

要检查机密是否存储成功:

❯ dotnet user-secrets --project ./src/CapExample.AppHost list

# ConnectionStrings:sqldb = Server=sqlserver-gopucer6dsl5q.database.windows.net;Initial Catalog=sqldb;Persist Security Info=False;User ID=CloudSA;Password=<your-password>;MultipleActiveResultSets=False;Encrypt=True;TrustServerCertificate=False;Connection Timeout=30;
# ConnectionStrings:serviceBus = Endpoint=sb://servicebus-gopucer6dsl5q.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<your-key>

演示

您可能知道,Aspire 具有开箱即用的 OpenTelemetry 支持,您可以在 ServiceDefaults/Extensions.cs 中对其进行配置。

DotNetCore.CAP 有一个支持 OpenTelemetry 功能的包。

OpenTelemetry 中的分布式跨度通过提供一种跨系统不同组件跟踪消息流的方法,帮助您跟踪消息代理上的操作。

当使用像 DotNetCore.CAP 这样的消息代理时,消息通常通过中间代理从生产者发送到消费者。此过程中的每个步骤都可以被视为一个跨度,它代表一个工作单元或一个操作。

通过使用 OpenTelemetry 检测代码,您可以创建捕获有关消息流每个步骤的信息的跨度。这些跨度可以包括详细信息,例如所花费的时间、遇到的任何错误以及其他上下文信息。

借助分布式跨度,您可以直观地看到消息在系统中移动的整个过程,从生产者到代理,最后到消费者。这让您能够深入了解消息处理管道的性能和行为。

通过分析分布式跨度,您可以识别消息处理流程中的瓶颈、延迟问题和潜在错误。这些信息对于排除分布式系统故障和优化其性能非常有价值。

以下是DotNetCore.CAP 安装和配置 OpenTelemetry 所需要做的事情 :

安装 NuGet 包:

❯ dotnet add ./src/CapExample.ServiceDefaults/ package DotNetCore.Cap.OpenTelemetry  

调整默认配置:

builder.Services.AddOpenTelemetry()
    .WithMetrics(metrics =>
    {
        metrics
            .AddAspNetCoreInstrumentation()
            .AddHttpClientInstrumentation()
            .AddProcessInstrumentation()
            .AddRuntimeInstrumentation();
    })
    .WithTracing(tracing =>
    {
        if (builder.Environment.IsDevelopment())
        {
            tracing.SetSampler(new AlwaysOnSampler());
        }

        tracing
            .AddAspNetCoreInstrumentation()
            .AddGrpcClientInstrumentation()
            .AddHttpClientInstrumentation()
            .AddCapInstrumentation(); // <-- add this code
    });

本地

现在,让我们运行演示:

❯ dotnet run --project ./src/CapExample.AppHost
# Building...
# Restore complete (0.6s)
#   CapExample.ServiceDefaults succeeded (0.2s) → src\CapExample.ServiceDefaults\bin\Debug\net9.0\CapExample.ServiceDefaults.dll
#   Consumer succeeded (0.3s) → src\Consumer\bin\Debug\net9.0\Consumer.dll
#   Producer succeeded (0.5s) → src\Producer\bin\Debug\net9.0\Producer.dll
#   CapExample.AppHost succeeded (0.2s) → src\CapExample.AppHost\bin\Debug\net9.0\CapExample.AppHost.dll

# Build succeeded in 2.6s
# info: Aspire.Hosting.DistributedApplication[0]
#       Aspire version: 9.0.0-preview.2.24162.2+eaca163f7737934020f1102a9d11fdf790cccdc0
# info: Aspire.Hosting.DistributedApplication[0]
#       Distributed application starting.
# info: Aspire.Hosting.DistributedApplication[0]
#       Application host directory is: C:\Users\Oleksii_Nikiforov\dev\cap-aspire\src\CapExample.AppHost
# info: Aspire.Hosting.DistributedApplication[0]
#       Now listening on: http://localhost:15118
# info: Aspire.Hosting.DistributedApplication[0]
#       Distributed application started. Press Ctrl+C to shut down.

让我们生成一些负载:

❯ curl -s http://localhost:5288/send | jq
# {
#   "status": "Published",
#   "duration": "00:00:00.5255861"
# }

导航到 Aspire Dashboard 查看跟踪:

这是第一个Request,如您所见,我们需要一些时间来与 Azure 服务总线建立初始连接:

后续请求花费的时间更少:

💡我建议您深入研究源代码和跟踪示例,以增强您对Outbox Pattern工作原理的理解。

Azure

让我们通过运行azd deploy部署到 Azure 容器应用

在初始配置(azd init)期间,我指定了生产者的公共地址。我们可以利用它进行开发测试:

让我们生成一些负载并查看 Azure 服务总线的指标。

❯ curl -s https://producer.mangoforest-17799c26.eastus.azurecontainerapps.io/send | jq
# {
#   "status": "Published",
#   "duration": "00:00:00.0128251"
# }

发件箱表

在后台 DotNetCore.CAP 创建两个表来管理发件箱。

发件箱模式中,Published 和 Received 表用于管理需要发布或已被消息系统接收的消息。让我们仔细看看每个表的用途:

Published 表:负责存储需要发布到外部消息系统的消息。当应用程序生成需要发送的消息时,该消息将存储在已发布表中。此表充当缓冲区或队列,确保如果消息系统暂时不可用或在发布过程中出现任何故障,消息不会丢失。通过使用已发布表,应用程序可以继续生成消息,而不会受到消息系统可用性或性能的阻碍。Published 表中的消息可以由单独的组件或后台进程异步处理,这些组件或后台进程负责将它们发布到外部消息系统。一旦消息成功发布,就可以将其从Published 表中删除。

Received 表:用于跟踪应用程序从外部消息传递系统接收到的消息。当应用程序收到消息时,它会将有关该消息的必要信息存储在Received 表中。此信息可以包括消息内容、元数据和任何其他相关详细信息。Received 表允许应用程序保留已处理的消息记录,从而使其能够处理重复消息。

清理

完成开发后,您可以删除资源组 rg-cap-dev

❯ azd down
# Deleting all resources and deployed code on Azure (azd down)
# Local application code is not deleted when running 'azd down'.
#   Resource group(s) to be deleted:
#     • rg-cap-dev: https://portal.azure.com/#@/resource/subscriptions/0b252e02-9c7a-4d73-8fbc-633c5d111ebc/resourceGroups/rg-cap-dev/overview
# ? Total resources to delete: 10, are you sure you want to continue? Yes
# Deleting your resources can take some time.
#   (✓) Done: Deleting resource group: rg-cap-dev

# SUCCESS: Your application was removed from Azure in 10 minutes 53 seconds.

总结

在本文中,我们探讨了发件箱模式,这是分布式系统领域中的关键组件。我们已经了解了它如何确保分布式系统中可靠的消息传递,从而维护整个系统的完整性和一致性。

我们还研究了 .NET 库 DotNetCore.CAP 如何简化Outbox Pattern的实现,使开发人员能够专注于其应用程序的业务逻辑,而库则负责确保可靠消息传递的复杂性。

最后,我们看到了 .NET Aspire 如何提供精选的 NuGet 包套件,以促进云原生应用程序的集成,并通过自动配置或标准化配置模式提供必要的云原生功能。

总之,Outbox Pattern、DotNetCore.CAP 和 .NET Aspire 的组合提供了一套强大的工具集,用于在 .NET 中构建可靠的云原生应用程序。通过理解和利用这些工具,开发人员可以构建强大、可扩展且可应对现代分布式世界挑战的应用程序。

参考

https://learn.microsoft.com/en-us/azure/developer/azure-developer-cli/
https://cap.dotnetcore.xyz/
https://learn.microsoft.com/en-us/dotnet/aspire/deployment/azure/aca-deployment-azd-in-deep

From:https://www.cnblogs.com/savorboard/p/18401708/aspire-cap
本文地址: http://www.shuzixingkong.net/article/1808
0评论
提交 加载更多评论
其他文章 HashMap深入讲解
HashMap是Java中最常用的集合类框架,也是Java语言中非常典型的数据结构, 而HashSet和HashMap者在Java里有着相同的实现,前者仅仅是对后者做了一层包装,也就是说HashSet里面有一个HashMap(适配器模式)。因此了解HashMap源码也就了解HashSet了 介绍 K
HashMap深入讲解
真人模特失业?AI虚拟试衣一键成图,IDM-VTON下载介绍
在电商行业竞争尤为激烈的当下,除了打价格战外,如何有效的控制成本,是每个从业者都在思考的问题 IDM-VTON是一个AI虚拟换装工具,旨在帮助服装商家解决约拍模特导致的高昂成本问题,只需一张服装图片,就可以生成各种身穿该服装的模特,大大简化了传统的产品展示过程 IDM-VTON最新中文版: 百度网盘
真人模特失业?AI虚拟试衣一键成图,IDM-VTON下载介绍 真人模特失业?AI虚拟试衣一键成图,IDM-VTON下载介绍 真人模特失业?AI虚拟试衣一键成图,IDM-VTON下载介绍
[C#基础1/21] C#概述
Notion原笔记 1. C# 简介 1.1 C# 定义 C# 在继承 C 和 C++ 强大功能的同时去掉了一些它们的复杂特性,使其成为 C 语言家族中的一种高效强大的编程语言 1.2 C# 用途 用于捕获、分析和处理数据的业务应用程序 可从 Web 浏览器访问的动态 Web 应用程序 2D 和 3
manim边学边做--角度标记
manim中绘制一个角度其实就是绘制两条直线,本篇介绍的不是绘制角度,而是绘制角度标记。 对于锐角和钝角,角度标记是一个弧,弧的度数与角的度数一样; 对于直角,角度标记是一个垂直的拐角。 manim中关于角度标记的模型主要有3个: Angle:根据两条直线绘制角度标记 RightAngle:根据两条
manim边学边做--角度标记 manim边学边做--角度标记 manim边学边做--角度标记
设计模式之模板方法模式(三分钟学会一个设计模式)
模板方法模式(Template Method Pattern)也称之为模板模式(Template Pattern),是设计模式中最简单的模式之一。 先来看定义:定义一个操作中算法的骨架(模板),将一些步骤延迟到子类中,模板方法使得子类可以不改变算法的结构即可重新定义算法某些特定的步骤。这个定义还是有
设计模式之模板方法模式(三分钟学会一个设计模式) 设计模式之模板方法模式(三分钟学会一个设计模式)
跳跃表
概述 跳跃表(SkipList)是链表加多级索引组成的数据结构。链表的数据结构的查询复条度是 O(N)。为了提高查询效率,可以在链表上加多级索引来实现快速查询。跳跃表不仅能提高搜索性能。也能提高插入和删除操作的性能。索引的层数也叫作跳跃表的高度 查找 在跳跃表的结构中会首先从顶层开始查找,当顶层不存
跳跃表 跳跃表 跳跃表
Java是值传递还是引用传递,又是怎么体现的
关于Java是值传递还是引用传递,可以从代码层面来实现一下拿到结果 执行下面的代码: public static void main(String[] args) { int num = 10; String name = &quot;Tom&quot;; modify(num, name); Sy
五子棋AI:实现逻辑与相关背景探讨(下)
前文回顾 在上篇文章中,我们约定了一种衡量格子价值的方式,如下表。 综合价值排序 己方价值 敌方价值 对应的奖励数值 1 Lv1 ? \(2^{20}\) 2 ? Lv1 \(2^{16}\) 3 Lv2 ? \(2^{12}\) 4 ? Lv2 \(2^{8}\) 5 Lv3 ? \(2^{4}\