11. 事件总线 - 进程内事件总线

11. 事件总线 - 进程内事件总线

事件总线是一种事件发布/订阅架构,通过解耦发布者和订阅者将业务解耦,这里的事件可以理解为消息,本文中统一称为事件,它有以下优点:

  • 松耦合
  • 横切关注点
  • 可测试性
  • 事件驱动

事件总线类型

Masa Framework提供了两种事件总线,它们分别是:

  • 进程内事件总线
    • 服务的发布与订阅需要在同一个进程中
  • 跨进程事件总线
    • 跨进程的消息传递,如微服务发布和订阅跨进程事件,服务的发布与订阅一定不在同一个进程

接下来我们会用一个转账的例子来说明进程内事件总线如何使用

快速入门

  1. 新建ASP.NET Core 空项目Assignment.InProcessEventBus,并安装Masa.Contrib.Dispatcher.Events

    1
    2
    3
    dotnet new web -o Assignment.InProcessEventBus
    cd Assignment.InProcessEventBus
    dotnet add package Masa.Contrib.Dispatcher.Events --version 0.5.0-preview.7
  2. 注册事件总线,修改类Program

    1
    builder.Services.AddEventBus();
  3. 新建转账事件TransferEvent

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    /// <summary>
    /// 本地事件需继承 IEvent
    /// </summary>
    public record TransferEvent : Event
    {
    public string Account { get; set; } = default!;

    public string ReceiveAccount { get; set; } = default!;

    public decimal Money{ get; set; }
    }

    由于Event继承了IEvent,因此我们使用Event即可

  4. 新建转账处理程序TransferHandler(订阅转账事件)

  • 订阅消息有两种方式:

    1
    2
    3
    4
    5
    6
    7
    8
    public class TransferHandler
    {
    [EventHandler]
    public Task TransferAsync(TransferEvent @event)
    {
    //todo: 处理转账业务
    }
    }
    1. 在方法上方增加特性[EventHandler]
    1
    2
    3
    4
    5
    6
    7
    8
    public class TransferHandler : IEventHandler<TransferEvent>
    {
    public Task HandleAsync(TransferEvent @event)
    {
    //todo: 处理转账业务
    return Task.CompletedTask;
    }
    }
    1. 继承IEventHandler<TEvent> (不支持Cancel) 或ISagaEventHandler<TEvent>(支持Cancel)
  1. 发送转账事件,修改Program

    1
    2
    app.MapPost("/transfer", async (TransferEvent @event, IEventBus eventBus)
    => await eventBus.PublishAsync(@event));

进阶

除了上述功能外,进程内事件总线还支持

  • Handler编排:通过事件层层推进满足顺序执行的场景
  • 支持Saga:当服务无法与本地事务结合时,支持取消动作
  • 支持中间件:类似.Net Middleware,支持对Event自定义Handler

Handler编排

以转账业务为例,可以将Handler分为三步:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
[EventHandler(1)]
public Task CheckAccountState(TransferEvent @event)
{
// todo: 第一步: 检查当前余额状态,账户余额
return Task.CompletedTask;
}

[EventHandler(2)]
public Task FreezeMoneyAsync(TransferEvent @event)
{
//todo: 第二步: 冻结余额
return Task.CompletedTask;
}

[EventHandler(3)]
public Task TransferAsync(TransferEvent @event)
{
try
{
//todo: 发起转账请求
}
catch (Exception ex)
{
//todo: 发起转账请求异常则解冻余额
}
return Task.CompletedTask;
}

支持Saga

由于事件总线默认支持Saga,因此我们可以将转账业务拆解为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[EventHandler(3, FailureLevels.ThrowAndCancel)]
public Task TransferAsync(TransferEvent @event)
{
//todo: 发起转账请求
return Task.CompletedTask;
}

/// <summary>
/// IsCancel为true,代表当前Handler为Cancel,默认IsCancel为false
/// </summary>
/// <param name="event"></param>
/// <returns></returns>
[EventHandler(3, IsCancel = true)]
public Task UnfreezeMoneyAsync(TransferEvent @event)
{
//todo: 发起转账请求异常, 解冻余额
return Task.CompletedTask;
}

支持中间件

进程内事件总线还支持中间件,允许像俄罗斯套娃一样(.Net Middleware)做横切关注点的相关的事情,例如:

增加FluentValidation,并新增转账事件参数验证

  1. 选中类库Assignment.InProcessEventBus,并安装FluentValidation.AspNetCore

    1
    dotnet add package FluentValidation.AspNetCore --version 11.1.0 新增FluentValidation验证
  2. 新增验证中间件ValidatorMiddleware

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    public class ValidatorMiddleware<TEvent> : Middleware<TEvent>
    where TEvent : notnull, IEvent
    {
    private readonly ILogger<ValidatorMiddleware<TEvent>> _logger;
    private readonly IEnumerable<IValidator<TEvent>> _validators;

    public ValidatorMiddleware(IEnumerable<IValidator<TEvent>> validators, ILogger<ValidatorMiddleware<TEvent>> logger)
    {
    _validators = validators;
    _logger = logger;
    }

    public override async Task HandleAsync(TEvent action, EventHandlerDelegate next)
    {
    var typeName = action.GetType().FullName;

    _logger.LogInformation("----- Validating command {CommandType}", typeName);

    var failures = _validators
    .Select(v => v.Validate(action))
    .SelectMany(result => result.Errors)
    .Where(error => error != null)
    .ToList();

    if (failures.Any())
    {
    _logger.LogWarning("Validation errors - {CommandType} - Command: {@Command} - Errors: {@ValidationErrors}", typeName, action, failures);

    throw new ValidationException(failures.Select(x=>x.ErrorMessage).FirstOrDefault());//根据需要抛出异常信息
    }

    await next();
    }
    }
  3. 修改类Program

    1
    2
    3
    4
    5
    builder.Services.AddEventBus(eventBuilder => eventBuilder.UseMiddleware(typeof(ValidatorMiddleware<>)))
    .AddFluentValidation(options =>
    {
    options.RegisterValidatorsFromAssemblyContaining<Program>();
    })
  4. 新增转账验证

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public class TransferEventValidator : AbstractValidator<TransferEvent>
    {
    public TransferEventValidator()
    {
    RuleFor(t => t.Money).GreaterThan(0).WithMessage("转账金额必须大于0");
    RuleFor(t => t.Account).Must(account => !string.IsNullOrEmpty(account)).WithMessage("转账账户错误");
    RuleFor(t => t.ReceiveAccount).Must(account => !string.IsNullOrEmpty(account)).WithMessage("到账账户错误");
    }
    }

    后续如果希望增加其它事件验证,则重复步骤4操作并更换验证类即可

FluentValidation并不是必须的,它只是参数验证的一种,通过FluentValidation可以将参数验证与业务分开,使得Handler中业务更聚焦,当然由于事件总线支持编排,你也可以新增一个Handler,如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/// <summary>
/// 检查参数信息
/// </summary>
/// <param name="event"></param>
/// <returns></returns>
/// <exception cref="ArgumentException"></exception>
[EventHandler(0)]
public Task CheckTransferEventParameter(TransferEvent @event)
{
if (@event.Money <= 0)
throw new ArgumentException("转账金额必须大于0");
if (string.IsNullOrEmpty(@event.Account))
throw new ArgumentException("转账账户错误");
if (string.IsNullOrEmpty(@event.ReceiveAccount))
throw new ArgumentException("到账账户错误");

return Task.CompletedTask;
}

扩展

进程内事件总线除了拥有以上的功能之外,还支持与UoW结合使用,并提供基于EFCore的实现,并默认启用事务

性能测试

我们与市面上使用较多的MeidatR作了对比,结果如下图所示:

BenchmarkDotNet=v0.13.1, OS=Windows 10.0.19043.1023 (21H1/May2021Update)
11th Gen Intel Core i7-11700 2.50GHz, 1 CPU, 16 logical and 8 physical cores
.NET SDK=7.0.100-preview.4.22252.9
[Host] : .NET 6.0.6 (6.0.622.26707), X64 RyuJIT DEBUG
Job-MHJZJL : .NET 6.0.6 (6.0.622.26707), X64 RyuJIT

Runtime=.NET 6.0 IterationCount=100 RunStrategy=ColdStart

Method Mean Error StdDev Median Min Max
AddShoppingCartByEventBusAsync 124.80 us 346.93 us 1,022.94 us 8.650 us 6.500 us 10,202.4 us
AddShoppingCartByMediatRAsync 110.57 us 306.47 us 903.64 us 7.500 us 5.300 us 9,000.1 us

根据性能测试我们发现,EventBus与MediatR性能差距很小,但EventBus提供的功能却要强大的多

常见问题

  1. FailureLevels是什么?有什么区别?

    1
    2
    3
    4
    失败级别
    Throw:发生异常后,依次执行Order小于当前Handler的Order的取消动作,比如:Handler顺序为1、2、3,CancelHandler为1、2、3,如果执行Handler3异常,则依次执行2、1
    ThrowAndCancel:发生异常后,依次执行Order小于等于当前Handler的Order的取消动作,比如:Handler顺序为1、2、3,CancelHandler为1、2、3,如果执行Handler3异常,则依次执行3、2、1
    Ignore:发生异常后,忽略当前异常(不执行取消动作),继续执行其他Handler
  2. EventBus结合UoW、以及Repository来使用时,什么时候执行SaveChange?

    1
    依次执行所有的Handler与自定义中间件后,会自动执行SaveChange
  3. 如果在EventHandler中手动调用UoW的SaveChange方法保存,那框架还会自动保存吗?

    1
    在EventHandler中手动调用了UoW的SaveChange方法保存,且之后并未再使用IRepository提供的Add、Update、Delete操作,则在EventHandler执行结束后不会二次执行SaveChange操作,但如果在手动调用UoW的SaveChange方法保存后又继续使用IRepository提供的Add、Update、Delete操作,则框架会再次调用SaveChange操作以确保数据保存成功
  4. 通过EventBus发布事件,Handler出错,但数据依然保存到数据库中,事务并未回滚

    1
    2
    3
    4
    1. 检查自定义事件或继承类,确保已经实现ITransaction
    2. 确认已使用UoW
    3. 确认UnitOfWork的UseTransaction属性为false
    4. 确认UnitOfWork的DisableRollbackOnFailure属性为true
  5. 按照文档通过EventBus发布事件,Handler没有执行,也没有任何错误?

    1
    EventBus.PublishAsync(@event)是异步方法,需要使用await,如果是同步方法发送事件,则需要转换为同步调用,如EventBus.PublishAsync(@event).ConfigureAwait(false).GetAwaiter().GetResult();

总结

上面说了这么多,我们再来看一下进程内事件总线的执行流程:

EventBus.png

结合上图我们可以看到,从客户端发起请求到服务端接受请求后,再通过EventBus发送事件,然后

  1. 执行事务中间件(默认)
  2. 自定义中间件(根据中间件的添加顺序依次执行)
  3. 根据Order从小到大的顺序执行Handler,如果一旦Handler出现异常,则根据FailureLevels以及Order针对应的执行HandlerCancelHandler

本章源码

Assignment11

https://github.com/zhenlei520/MasaFramework.Practice

开源地址

MASA.BuildingBlocks:https://github.com/masastack/MASA.BuildingBlocks

MASA.Contrib:https://github.com/masastack/MASA.Contrib

MASA.Utils:https://github.com/masastack/MASA.Utils

MASA.EShop:https://github.com/masalabs/MASA.EShop

MASA.Blazor:https://github.com/BlazorComponent/MASA.Blazor

如果你对我们的 MASA Framework 感兴趣,无论是代码贡献、使用、提 Issue,欢迎联系我们

16373211753064.png

作者

MASA

发布于

2022-06-27

更新于

2022-09-26

许可协议