12. 事件总线 - 跨进程事件总线
概述
跨进程事件总线允许发布和订阅跨服务传输的消息, 服务的发布与订阅不在同一个进程中
在Masa Framework中, 跨进程总线事件提供了一个可以被开箱即用的程序
- IntegrationEvents: 提供了发件箱模式
- IntegrationEvents.Dapr: 借助Dapr实现了消息的发布
- EventLogs.EFCore: 基于EFCore实现的集成事件日志的提供者, 提供消息的记录与状态更新、失败日志重试、删除过期的日志记录等
入门
跨进程事件与Dapr
并不是强绑定的, Masa Framework使用了Dapr
提供的pub/sub的能力, 如果你不想使用它, 你也可以更换为其它实现, 但目前Masa Framwork中仅提供了Dapr
的实现
- 新建ASP.NET Core 空项目
Assignment.IntegrationEventBus
,并安装Masa.Contrib.Dispatcher.IntegrationEvents.Dapr
、Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore
、Masa.Contrib.Data.EFCore.Sqlite
、Masa.Contrib.Data.UoW.EFCore
、Masa.Contrib.Development.DaprStarter.AspNetCore
、Microsoft.EntityFrameworkCore.Design
1 | dotnet new web -o Assignment.IntegrationEventBus |
- 新建用户上下文类
UserDbContext
,并继承MasaDbContext
1 | public class UserDbContext : MasaDbContext |
- 注册
DaprStarter
, 协助管理Dapr Sidecar
, 修改Program.cs
1 | if (builder.Environment.IsDevelopment()) |
通过
Dapr
发布集成事件需要运行Dapr
, 线上环境可通过Kubernetes
来运行, 开发环境可借助Dapr Starter运行Dapr
, 因此仅需要在开发环境使用它
- 注册跨进程事件总线,修改类
Program
1 | builder.Services.AddIntegrationEventBus(option => |
- 新增用户注册事件的集成事件
RegisterUserEvent
1 | public record RegisterUserEvent : IntegrationEvent |
- 打开
Assignment.IntegrationEventBus
所在文件夹,打开cmd或Powershell执行
1 | dotnet ef migrations add init //创建迁移 |
- 发送跨进程事件,修改
Program
1 | app.MapPost("/register", async (IIntegrationEventBus eventBus) => |
- 订阅事件,修改
Program
1 | app.MapPost("/IntegrationEvent/RegisterUser", [Topic("pubsub", nameof(RegisterUserEvent))](RegisterUserEvent @event) => |
订阅事件暂时未抽象,目前使用的是
Dapr
原生的订阅方式,后续我们会支持Bind,届时不会由于更换pubsub的实现而导致订阅方式的改变
尽管跨进程事件目前仅支持了Dapr
,但这不代表你与RabbitMq
、Kafka
等无缘,发布/订阅是Dapr
抽象出的能力,实现发布订阅的组件有很多种,RabbitMq
、Kafka
是其中一种实现,如果你想深入了解他们之间的关系,可以参考:
源码解读
首先我们先要知道的基础知识点:
- IIntegrationEvent: 集成事件接口, 继承 IEvent (本地事件接口)、ITopic (订阅接口, 发布订阅的主题)、ITransaction (事务接口)
- IIntegrationEventBus: 集成事件总线接口、用于提供发送集成事件的功能
- IIntegrationEventLogService: 集成事件日志服务的接口 (提供保存本地日志、修改状态为进行中、成功、失败、删除过期日志、获取等待重试日志列表的功能)
- IntegrationEventLog: 集成事件日志, 提供本地消息表的模型
- IHasConcurrencyStamp: 并发标记接口 (实现此接口的类会自动为
RowVersion
赋值)
Masa.Contrib.Dispatcher.IntegrationEvents
提供了集成事件接口的实现类, 并支持了发件箱模式, 其中:
- IPublisher: 集成事件的发送者
- IProcessingServer: 后台服务接口
- IProcessor: 处理程序接口 (后台处理程序中会获取所有的程序程序)
- DeleteLocalQueueExpiresProcessor: 删除过期程序 (从本地队列删除)
- DeletePublishedExpireEventProcessor: 删除已过期的发布成功的本地消息程序 (从Db删除)
- RetryByLocalQueueProcessor: 重试本地消息记录 (从本地队列中获取, 条件: 发送状态为失败或进行中且重试次数小于最大重试次数且重试间隔大于最小重试间隔)
- RetryByDataProcessor: 重试本地消息记录 (从Db获取, 条件: 发送状态为失败或进行中且重试次数小于最大重试次数且重试间隔大于最小重试间隔, 且不在本地重试队列中)
- IntegrationEventBus: IIntegrationEvent的实现
在Masa.Contrib.Dispatcher.IntegrationEvents
中仅提供了发件箱的功能, 但集成事件的发布是由 IPublisher
的实现类来提供, 由Db获取本地消息表的功能是由IIntegrationEventLogService
的实现类来提供, 它们分别属于Masa.Contrib.Dispatcher.IntegrationEvents.Dapr
、Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore
的功能, 这也是为什么使用集成事件需要引用包
Masa.Contrib.Dispatcher.IntegrationEvents
Masa.Contrib.Dispatcher.IntegrationEvents.Dapr
Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore
如何快速接入其它实现
那会有小伙伴问了, 我现在没有使用Dapr
, 未来一段时间暂时也还不希望接入Dapr
, 我想自己接入, 以实现集成事件的发布可以吗?
当然是可以的, 如果你希望自行实现集成事件, 那么这个时候你会遇到两种情况
接入方支持发件箱模式
以社区用的较多的库CAP为例, 由于它本身已经完成了发件箱模式, 我们不需要再处理本地消息表, 也无需考虑本地消息记录的管理, 那我们可以这样做
- 新建类库
Masa.Contrib.Dispatcher.IntegrationEvents.Cap
, 添加Masa.BuildingBlocks.Dispatcher.IntegrationEvents
的引用, 并安装DotNetCore.CAP
1 | dotnet add package DotNetCore.CAP |
- 新增类
IntegrationEventBus
, 并实现IIntegrationEventBus
1 | public class IntegrationEventBus : IIntegrationEventBus |
CAP已支持本地事务, 使用当前
IUnitOfWork
提供的事务, 确保数据的原子性
- 新建类
ServiceCollectionExtensions
, 将自定义Publisher
注册到服务集合
1 | public static class ServiceCollectionExtensions |
已经实现发件箱模式的可以直接使用, 而不需要引用
Masa.Contrib.Dispatcher.IntegrationEvents
Masa.Contrib.Dispatcher.IntegrationEvents.Dapr
Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore
以上未经过实际验证, 感兴趣的可以尝试下, 欢迎随时提
pr
接入方不支持发件箱模式
我希望直接接入RabbitMq
, 但我自己没有做发件箱模式, 那我可以怎么做呢?
由于Masa.Contrib.Dispatcher.IntegrationEvents
已提供发件箱模式, 如果仅仅希望更换一个发布事件的实现者, 那我们仅需要实现IPublisher
即可
- 新建类库
Masa.Contrib.Dispatcher.IntegrationEvents.RabbitMq
, 添加Masa.Contrib.Dispatcher.IntegrationEvents
项目引用, 并安装RabbitMQ.Client
1 | dotnet add package RabbitMQ.Client //使用RabbitMq |
- 新增类
Publisher
,并实现IPublisher
1 | public class Publisher : IPublisher |
- 新建类
DispatcherOptionsExtensions
, 将自定义Publisher
注册到服务集合
1 | public static class DispatcherOptionsExtensions |
- 如何使用自定义实现
RabbitMq
1 | builder.Services.AddIntegrationEventBus(option => |
本章源码
Assignment12
https://github.com/zhenlei520/MasaFramework.Practice
开源地址
MASA.Framework:https://github.com/masastack/MASA.Framework
MASA.EShop:https://github.com/masalabs/MASA.EShop
MASA.Blazor:https://github.com/BlazorComponent/MASA.Blazor
如果你对我们的 MASA Framework 感兴趣,无论是代码贡献、使用、提 Issue,欢迎联系我们