15. 命令查询职责分离 - CQRS

15. 命令查询职责分离 - CQRS

介绍

CQRS是一种与领域驱动设计和事件溯源相关的架构模式,它的全称是Command Query Responsibility Segregation,又叫命令查询职责分离,Greg Young在2010年创造了这个术语,它是基于Bertrand Meyer 的 CQS(Command-Query Separation 命令查询分离原则)设计模式。

CQRS认为不论业务多复杂在最终实现的时候,无非是读写操作,因此建议将应用程序分为两个方面,即Command(命令)和Query(查询)

  • 命令端:
  1. 非幂等
  2. 关注各种业务如何处理,最后将数据更新写入并进行持久化
  3. 不返回任何结果(void)
  • 查询端:
  1. 幂等,仅仅是查询操作,并不会修改数据
  2. 返回结果(通过查询条件对应返回数据,在数据没有发生变更的情况下针对相同的条件,返回的数据也应该是一致的,针对这种特性,我们可以通过数据缓存提高系统性能,提高系统的QPS (Queries Per Second意思是“每秒查询率”)

为什么是Masa.Contrib.ReadWriteSpliting.Cqrs?

由于CQRS是设计模式,其本质上是一种读写分离的设计思想,与之前提到的IEventIIntegrationEvent不同,它并不是强制性的,基于CQRS我们将事件分成CommandQuery

  • 由于Query是需要有返回值的,因此我们在集成IEvent的同时,还额外增加了一个Result属性用以存储结果,我们希望将查询的结果保存到Result中,但它不是强制性的,我们并没有强制性要求必须要将结果保存到Result中。

  • 由于Command是没有返回值的,因此我们并没有额外新增Result属性,但命令不是幂等的,我们认为命令会更新数据,那就需要用到工作单元,因此Command还继承了ITransaction,这方便了我们在Handler中的可以通过@event.UnitOfWork来管理工作单元,而不需要通过构造函数来获取

下面就就来看看如何在项目中使用CQRS

入门

  1. 新建ASP.NET Core 空项目Assignment.CqrsDemo,并安装Masa.Contrib.Dispatcher.EventsMasa.Contrib.ReadWriteSpliting.CqrsMasa.Contrib.Dispatcher.IntegrationEvents.DaprMasa.Contrib.ReadWriteSpliting.CqrsMasa.Utils.Development.Dapr.AspNetCore

    1
    2
    3
    4
    5
    6
    7
    8
    9
    dotnet new web -o Assignment.CqrsDemo
    cd Assignment.CqrsDemo

    dotnet add package Masa.Contrib.Dispatcher.Events --version 0.5.0-preview.7
    dotnet add package Masa.Contrib.ReadWriteSpliting.Cqrs --version 0.5.0-preview.7
    dotnet add package Masa.Contrib.Dispatcher.IntegrationEvents --version 0.5.0-preview.7 // 使用跨进程事件总线
    dotnet add package Masa.Contrib.Dispatcher.IntegrationEvents.Dapr --version 0.5.0-preview.7 // 使用Dapr提供pubsub能力

    dotnet add package Masa.Utils.Development.Dapr.AspNetCore --version 0.5.0-rc.1 // 开发环境下管理dapr sidecar
  2. 注册跨进程事件总线、进程内事件总线,修改Program

    1
    2
    3
    4
    5
    6
    builder.Services.AddIntegrationEventBus(dispatcherOptions =>
    {
    //不使用本地消息表
    dispatcherOptions.UseDapr();
    dispatcherOptions.UseEventBus();
    });
  3. 使用Dapr Starter管理Dapr Sidecar(开发环境使用)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    if (builder.Environment.IsDevelopment())
    {
    builder.Services.AddDaprStarter(opt =>
    {
    opt.DaprHttpPort = 7100;
    opt.DaprGrpcPort = 7101;
    opt.AppPort = 5072;
    });
    }
  4. 新增加添加商品方法,修改Program

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    app.MapPost("/goods/add", async (AddGoodsCommand command, IEventBus eventBus) =>
    {
    await eventBus.PublishAsync(command);
    });

    /// <summary>
    /// 添加商品参数,用于接受商品参数
    /// </summary>
    public record AddGoodsCommand : Command
    {
    public string Name { get; set; }

    public string Cover { get; set; }

    public decimal Price { get; set; }

    public int Count { get; set; }
    }
  5. 新增加查询商品的方法,修改Program

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    app.MapGet("/goods/{id}", async (Guid id, IEventBus eventBus) =>
    {
    var query = new GoodsItemQuery(id);
    await eventBus.PublishAsync(query);
    return query.Result;
    });

    /// <summary>
    /// 用于接收查询商品信息参数
    /// </summary>
    public record GoodsItemQuery : Query<GoodsItemDto>
    {
    public Guid Id { get; set; } = default!;

    public override GoodsItemDto Result { get; set; }

    public GoodsItemQuery(Guid id)
    {
    Id = id;
    }
    }

    /// <summary>
    /// 用于返回商品信息
    /// </summary>
    public class GoodsItemDto
    {
    public Guid Id { get; set; }

    public string Name { get; set; }

    public string Cover { get; set; }

    public decimal Price { get; set; }

    public int Count { get; set; }

    public DateTime DateTime { get; set; }
    }
  6. 新增添加商品、查询商品处理程序,添加类GoodsHandler

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    public class GoodsHandler
    {
    private readonly IIntegrationEventBus _integrationEventBus;

    public GoodsHandler(IIntegrationEventBus integrationEventBus)
    {
    _integrationEventBus = integrationEventBus;
    }

    /// <summary>
    /// 将商品添加到Db,并发送跨进程事件,用于处理读数据
    /// </summary>
    /// <param name="command"></param>
    [EventHandler]
    public async Task AddGoods(AddGoodsCommand command)
    {
    //todo: 模拟添加商品到db并发送添加商品集成事件

    var goodsId = Guid.NewGuid(); //模拟添加到db后并获取商品id
    await _integrationEventBus.PublishAsync(new AddGoodsIntegrationEvent(goodsId, command.Name, command.Cover, command.Price,
    command.Count));
    }

    /// <summary>
    /// 从缓存查询商品信息
    /// </summary>
    /// <param name="query"></param>
    /// <returns></returns>
    [EventHandler]
    public Task GetGoods(GoodsItemQuery query)
    {
    //todo: 模拟从cache获取商品
    var goods = new GoodsItemDto();

    query.Result = goods;
    return Task.CompletedTask;
    }
    }

    /// <summary>
    /// 跨进程事件,发送添加商品事件,用于处理读数据的问题
    /// </summary>
    /// <param name="Id"></param>
    /// <param name="Name"></param>
    /// <param name="Cover"></param>
    /// <param name="Price"></param>
    /// <param name="Count"></param>
    public record AddGoodsIntegrationEvent(Guid Id, string Name, string Cover, decimal Price, int Count) : IntegrationEvent
    {
    public Guid Id { get; set; } = Id;

    public string Name { get; set; } = Name;

    public string Cover { get; set; } = Cover;

    public decimal Price { get; set; } = Price;

    public int Count { get; set; } = Count;

    public override string Topic { get; set; } = nameof(AddGoodsIntegrationEvent);
    }
  7. 新增添加商品的跨进程事件的处理服务,修改Program

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    app.MapPost("/integration/goods/add", (AddGoodsIntegrationEvent @event, ILogger<Program> logger) =>
    {
    //todo: 模拟添加商品到缓存
    logger.LogInformation("添加商品到缓存");
    }).WithTopic("pubsub", nameof(AddGoodsIntegrationEvent));

    // 使用 dapr 来订阅跨进程事件
    app.UseRouting();
    app.UseCloudEvents();
    app.UseEndpoints(endpoint =>
    {
    endpoint.MapSubscribeHandler();
    });

流水账式的服务会使得Program变得十分臃肿,可以通过Masa版的MinimalAPIs来简化Program 点击查看详情

总结

我们上面的例子是通过事件总线来完成解耦以及数据模型的同步,在Command端做商品的添加操作,在Query端只做查询,且两端分别使用各自的数据源,两者业务互不影响。

但与基于传统的CRUD的应用程序不同的是,CQRS可以通过使用单独的查询模型、更新模型来简化设计和实现,但它们也可以使用一个模型,这个不是强制性的,CQRS只是一种设计模式,而不是必须要遵守的,但按照CQRS的模式来设计可以最大限度的提升性能以及扩展性,比如Command操作的是 RDBMS(Relational Database Management System 关系型数据库),而Query查询的是Redis或者Es都是没有问题的。

除此之外,后续会增加对Event Sourcing事件溯源)的支持,通过事件重放,允许我们随时重建到对象的任何状态。

本章源码

Assignment15

https://github.com/zhenlei520/MasaFramework.Practice

CQRS架构项目:https://github.com/masalabs/MASA.EShop/tree/main/src/Services/Masa.EShop.Services.Catalog

开源地址

MASA.BuildingBlocks:https://github.com/masastack/MASA.BuildingBlocks

MASA.Contrib:https://github.com/masastack/MASA.Contrib

MASA.Utils:https://github.com/masastack/MASA.Utils

MASA.EShop:https://github.com/masalabs/MASA.EShop

MASA.Blazor:https://github.com/BlazorComponent/MASA.Blazor

如果你对我们的 MASA Framework 感兴趣,无论是代码贡献、使用、提 Issue,欢迎联系我们

16373211753064.png

作者

MASA

发布于

2022-07-15

更新于

2022-09-26

许可协议