12. 事件总线 - 跨进程事件总线

12. 事件总线 - 跨进程事件总线

跨进程事件总线允许在微服务或应用程序之间发布和接收消息,服务的发布与订阅不在同一个进程中

提供程序

跨进程事件总线系统提供了一个可以被开箱即用的程序

快速入门

  1. 新建ASP.NET Core 空项目Assignment.IntegrationEventBus,并安装Masa.Contrib.Dispatcher.IntegrationEvents.DaprMasa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFMasa.Contrib.Data.EntityFrameworkCore.SqliteMasa.Contrib.Data.UoW.EFMasa.Utils.Development.Dapr.AspNetCoreMicrosoft.EntityFrameworkCore.Design

    1
    2
    3
    4
    5
    6
    7
    8
    9
    dotnet new web -o Assignment.IntegrationEventBus
    cd Assignment.IntegrationEventBus

    dotnet add package Masa.Contrib.Dispatcher.IntegrationEvents.Dapr --version 0.5.0-rc.2 //跨进程事件
    dotnet add package Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EF --version 0.5.0-rc.2 //本地消息表
    dotnet add package Masa.Contrib.Data.EntityFrameworkCore.Sqlite --version 0.5.0-rc.2 //使用EfCore.Sqlite
    dotnet add package Masa.Contrib.Data.UoW.EF --version 0.5.0-rc.2 //使用UoW
    dotnet add package Masa.Utils.Development.Dapr.AspNetCore --version 0.5.0-rc.2 //开发环境使用Dapr Sidecar
    dotnet add package Microsoft.EntityFrameworkCore.Design --version 6.0.6 //方便后续通过CodeFirst迁移数据库
  2. 新建用户上下文类UserDbContext,并继承MasaDbContext

    1
    2
    3
    4
    5
    6
    public class UserDbContext : MasaDbContext
    {
    public UserDbContext(MasaDbContextOptions options) : base(options)
    {
    }
    }
  3. 注册跨进程事件总线,修改类Program

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31

    #region 启动dapr sidecar

    if (builder.Environment.IsDevelopment())
    {
    builder.Services.AddDaprStarter(option =>
    {
    option.AppPort = 5061;
    option.DaprGrpcPort = 5032;
    option.DaprHttpPort = 5031;
    }, false);//开发环境使用Dapr Starter管理Sidecar
    }

    #endregion

    builder.Services.AddIntegrationEventBus(option =>
    {
    option.UseDapr();
    option.UseUoW<UserDbContext>(optionBuilder => optionBuilder.UseSqlite($"Data Source=./Db/{Guid.NewGuid():N}.db;"));
    option.UseEventLog<UserDbContext>();
    });

    var app = builder.Build();

    app.UseRouting();

    app.UseCloudEvents();
    app.UseEndpoints(endpoints =>
    {
    endpoints.MapSubscribeHandler();
    });

    借助Dapr发布跨进程事件需要Dapr Sidecar支持,开发环境如何轻松使用dapr Sidecar

  4. 迁移数据库,修改类Program

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    #region 迁移数据库

    using (var scope =app.Services.CreateScope())
    {
    var services = scope.ServiceProvider;
    var context = services.GetRequiredService<UserDbContext>();
    context.Database.Migrate();
    context.Database.EnsureCreated();
    }

    #endregion
  5. 新增用户注册事件RegisterUserEvent

    1
    2
    3
    4
    5
    6
    7
    8
    public record RegisterUserEvent : IntegrationEvent
    {
    public override string Topic { get; set; } = nameof(RegisterUserEvent);

    public string Account { get; set; }

    public string Mobile { get; set; }
    }
  6. 打开Assignment.IntegrationEventBus所在文件夹,打开cmd或Powershell执行

    1
    dotnet ef migrations add init //创建迁移
  7. 发送跨进程事件,修改Program

    1
    2
    3
    4
    5
    6
    7
    8
    9
    app.MapPost("/register", async (IIntegrationEventBus eventBus) =>
    {
    //todo: 模拟注册用户并发布注册用户事件
    await eventBus.PublishAsync(new RegisterUserEvent()
    {
    Account = "Tom",
    Mobile = "19999999999"
    });
    });
  8. 订阅事件,修改Program

    1
    2
    3
    4
    app.MapPost("/IntegrationEvent/RegisterUser", [Topic("pubsub", nameof(RegisterUserEvent))](RegisterUserEvent @event) =>
    {
    Console.WriteLine($"注册用户成功: {@event.Account}");
    });

    订阅事件暂时未抽象,因此使用的是Dapr原生的订阅方式,后续我们会支持Bind,届时不会由于更换实现而导致订阅方式的改变

尽管跨进程事件目前仅支持了Dapr,但这不代表你与RabbitMqKafka等无缘,发布/订阅是Dapr抽象出的能力,实现发布订阅的组件有很多种,RabbitMqKafka是其中一种实现,如果你想深入了解他们之间的关系,可以参考:

  1. 手把手教你学Dapr
  2. PubSub代理

如何快速接入其它实现?

那会有小伙伴问了,我现在没有使用Dapr,未来一段时间暂时也还不希望接入Dapr,那我应该怎么做呢?

比如说我希望接入RabbitMq,那我可以怎么做呢?

  1. 新建类库Masa.Contrib.Dispatcher.IntegrationEvents.RabbitMq,添加Masa.Contrib.Dispatcher.IntegrationEvents项目引用,并安装RabbitMQ.Client

    1
    dotnet add RabbitMQ.Client //使用RabbitMq
  2. 新增类Publisher,并实现IPublisher

    1
    2
    3
    4
    5
    6
    7
    8
    public class Publisher : IPublisher
    {
    public async Task PublishAsync<T>(string topicName, T @event, CancellationToken stoppingToken = default) where T : IIntegrationEvent
    {
    //todo: 发送消息到RabbitMq
    throw new NotImplementedException();
    }
    }
  3. 新建类DispatcherOptionsExtensions,将自定义Publisher注册到服务集合

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public static class DispatcherOptionsExtensions
    {
    public static DispatcherOptions UseRabbitMq(this Masa.Contrib.Dispatcher.IntegrationEvents.Options.DispatcherOptions options)
    {
    //todo: 注册RabbitMq信息
    dispatcherOptions.Services.TryAddSingleton<IPublisher, Publisher>();
    return dispatcherOptions;
    }
    }
  4. 如何使用自定义实现RabbitMq

    1
    2
    3
    4
    5
    6
    builder.Services.AddIntegrationEventBus(option =>
    {
    option.UseRabbitMq();//修改为使用RabbitMq
    option.UseUoW<UserDbContext>(optionBuilder => optionBuilder.UseSqlite($"Data Source=./Db/{Guid.NewGuid():N}.db;"));
    option.UseEventLog<UserDbContext>();
    });

本章源码

Assignment12

https://github.com/zhenlei520/MasaFramework.Practice

开源地址

MASA.BuildingBlocks:https://github.com/masastack/MASA.BuildingBlocks

MASA.Contrib:https://github.com/masastack/MASA.Contrib

MASA.Utils:https://github.com/masastack/MASA.Utils

MASA.EShop:https://github.com/masalabs/MASA.EShop

MASA.Blazor:https://github.com/BlazorComponent/MASA.Blazor

如果你对我们的 MASA Framework 感兴趣,无论是代码贡献、使用、提 Issue,欢迎联系我们

16373211753064.png

作者

MASA

发布于

2022-06-28

更新于

2022-09-26

许可协议