15. 命令查询职责分离 - CQRS

15. 命令查询职责分离 - CQRS

概念

CQRS是一种与领域驱动设计和事件溯源相关的架构模式, 它的全称是Command Query Responsibility Segregation, 又叫命令查询职责分离, Greg Young在2010年创造了这个术语, 它是基于Bertrand Meyer 的 CQS (Command-Query Separation 命令查询分离原则) 设计模式。

CQRS认为不论业务多复杂在最终实现的时候, 无非是读写操作, 因此建议将应用程序分为两个方面, 即Command(命令)和Query(查询)

  • 命令端:

    • 关注各种业务如何处理, 更新状态进行持久化
    • 不返回任何结果 (void)
  • 查询端:

    • 查询, 并从不修改数据库

CQRS的三种实现

单一数据库的CQRS

命令与读取操作的是同一个数据库, 命令端通过ORM框架将实体保存到数据库中, 查询端通过数据访问层获取数据 (数据访问层通过ORM框架或者存储过程获取数据)

双数据库的CQRS

命令与读取操作的是不同的数据库, 命令端通过ORM框架将实体保存到 写库 (Write Db), 并将本地改动推送到 读库 (Read Db), 查询端通过数据访问层访问 读库 (Read Db), 使用这种模式可以带来以下好处:

  • 查询更简单
    • 读操作不需要任何的完整性校验, 也不需要外键约束, 可以减少锁争用, 我们可以针对查询端单独优化, 还可以使用刚好包含每个模板需要的数据的数据库视图,使得查询变得更快更简单
  • 提升查询端的使用体验
    • 由于这种架构将读写彻底分离,由于一般系统是读操作远远大于写操作, 这给我们的系统带来了巨大的性能提升, 极大的提升了客户的使用体验
  • 关注点分离
    • 读写分离的模型可以使得关注点分离, 使得读模型会变得相对简单

事件溯源 (Event Sourcing) CQRS

通过事件溯源实现的CQRS中会将应用程序的改变都以事件的方式存储起来, 使用这种模式可以带来以下好处:

  • 事件存储中了完整的审计跟踪, 后续出现问题时方便跟踪
  • 可以在任何的时间点重建实体的状态, 它将有助于排查问题并修复问题
  • 提升查询端的使用体验
    • 查询端与命令端可以是完全不同的数据源, 查询端可以针对查询条件做针对应的优化, 或者使用像ESRedis等用来存储数据, 提升查询效率
  • 独立缩放
    • 命令端与查询端可以被独立缩放, 减少锁争用

当然事情有利自然也有弊, CQRS的使用固然会带来很多好处, 但同样它也会给项目带来复杂度的提升, 并且双数据库模式、事件溯源模式CQRS, 使用的是最终一致性, 这些都是我们在选择技术方案时必须要考虑的

设计

上述文章中我们了解到了CQRS其本质上是一种读写分离的设计思想, 它并不是强制性的规定必须要怎样去做, 这点与之前的IEvent (进程内事件)、IIntegrationEvent (跨进程事件)不同, 它并不是强制性的, 根据CQRS的设计模式我们将事件分成CommandQuery

  • 由于Query (查询) 是需要有返回值的, 因此我们在继承IEvent的同时, 还额外增加了一个Result属性用以存储结果, 我们希望将查询的结果保存到Result中, 但它不是强制性的, 我们并没有强制性要求必须要将结果保存到Result中。

  • 由于Command (命令) 是没有返回值的, 因此我们并没有额外新增Result属性, 我们认为命令会更新数据, 那就需要用到工作单元, 因此Command除了继承IEvent之外, 还继承了ITransaction,这方便了我们在Handler中的可以通过@event.UnitOfWork来管理工作单元, 而不需要通过构造函数来获取

MasaFramework 并没有要求必须使用 Event Sourcing 模式 或者 双数据库模式 的CQRS, 具体使用哪种实现, 它取决于业务的决策者

下面就就来看看MasaFramework提供的CQRS是如何使用的

入门

  1. 新建ASP.NET Core 空项目Assignment.CqrsDemo,并安装Masa.Contrib.Dispatcher.EventsMasa.Contrib.Dispatcher.IntegrationEventsMasa.Contrib.Dispatcher.IntegrationEvents.DaprMasa.Contrib.ReadWriteSplitting.CqrsMasa.Contrib.Development.DaprStarter.AspNetCore
1
2
3
4
5
6
7
8
9
dotnet new web -o Assignment.CqrsDemo
cd Assignment.CqrsDemo

dotnet add package Masa.Contrib.Dispatcher.Events --version 0.7.0-preview.9 //使用进程内事件总线
dotnet add package Masa.Contrib.Dispatcher.IntegrationEvents --version 0.7.0-preview.9 //使用跨进程事件总线
dotnet add package Masa.Contrib.Dispatcher.IntegrationEvents.Dapr --version 0.7.0-preview.9 //使用Dapr提供pubsub能力
dotnet add package Masa.Contrib.ReadWriteSplitting.Cqrs --version 0.7.0-preview.9 //使用CQRS

dotnet add package Masa.Contrib.Development.DaprStarter.AspNetCore --version 0.7.0-preview.9 //开发环境下协助 Dapr Sidecar, 用于通过Dapr发布集成事件
  1. 注册跨进程事件总线、进程内事件总线, 修改类Program.cs

示例中未真实使用DB, 不再使用发件箱模式, 只需要使用集成事件提供的PubSub能力即可

1
2
3
4
5
builder.Services.AddIntegrationEventBus(dispatcherOptions =>
{
dispatcherOptions.UseDapr();//使用 Dapr 提供的PubSub能力
dispatcherOptions.UseEventBus();//使用进程内事件总线
});
  1. 注册Dapr Starter 协助管理Dapr Sidecar (开发环境使用)
1
2
if (builder.Environment.IsDevelopment())
builder.Services.AddDaprStarter();
  1. 新增加添加商品方法, 修改类Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
app.MapPost("/goods/add", async (AddGoodsCommand command, IEventBus eventBus) =>
{
await eventBus.PublishAsync(command);
});

/// <summary>
/// 添加商品参数, 用于接受商品参数
/// </summary>
public record AddGoodsCommand : Command
{
public string Name { get; set; }

public string Cover { get; set; }

public decimal Price { get; set; }

public int Count { get; set; }
}
  1. 新增加查询商品的方法, 修改类Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
app.MapGet("/goods/{id}", async (Guid id, IEventBus eventBus) =>
{
var query = new GoodsItemQuery(id);
await eventBus.PublishAsync(query);
return query.Result;
});

/// <summary>
/// 用于接收查询商品信息参数
/// </summary>
public record GoodsItemQuery : Query<GoodsItemDto>
{
public Guid Id { get; set; } = default!;

public override GoodsItemDto Result { get; set; }

public GoodsItemQuery(Guid id)
{
Id = id;
}
}

/// <summary>
/// 用于返回商品信息
/// </summary>
public class GoodsItemDto
{
public Guid Id { get; set; }

public string Name { get; set; }

public string Cover { get; set; }

public decimal Price { get; set; }

public int Count { get; set; }

public DateTime DateTime { get; set; }
}
  1. 新增Command处理程序, 添加类CommandHandler.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class CommandHandler
{
/// <summary>
/// 将商品添加到Db,并发送跨进程事件
/// </summary>
/// <param name="command"></param>
/// <param name="integrationEventBus"></param>
[EventHandler]
public async Task AddGoods(AddGoodsCommand command, IIntegrationEventBus integrationEventBus)
{
//todo: 模拟添加商品到db并发送添加商品集成事件

var goodsId = Guid.NewGuid(); //模拟添加到db后并获取商品id
await integrationEventBus.PublishAsync(new AddGoodsIntegrationEvent(goodsId, command.Name, command.Cover, command.Price,
command.Count));
}
}

/// <summary>
/// 跨进程事件, 发送添加商品事件
/// </summary>
/// <param name="Id"></param>
/// <param name="Name"></param>
/// <param name="Cover"></param>
/// <param name="Price"></param>
/// <param name="Count"></param>
public record AddGoodsIntegrationEvent(Guid Id, string Name, string Cover, decimal Price, int Count) : IntegrationEvent
{
public Guid Id { get; set; } = Id;

public string Name { get; set; } = Name;

public string Cover { get; set; } = Cover;

public decimal Price { get; set; } = Price;

public int Count { get; set; } = Count;

public override string Topic { get; set; } = nameof(AddGoodsIntegrationEvent);
}
  1. 新增Query处理程序, 添加类QueryHandler.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class QueryHandler
{
/// <summary>
/// 从缓存查询商品信息
/// </summary>
/// <param name="query"></param>
/// <returns></returns>
[EventHandler]
public Task GetGoods(GoodsItemQuery query)
{
//todo: 模拟从cache获取商品
var goods = new GoodsItemDto();

query.Result = goods;
return Task.CompletedTask;
}
}
  1. 新增添加商品的跨进程事件的处理服务, 修改Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
app.MapPost(
"/integration/goods/add",
[Topic("pubsub", nameof(AddGoodsIntegrationEvent))]
(AddGoodsIntegrationEvent @event, ILogger<Program> logger) =>
{
//todo: 模拟添加商品到缓存
logger.LogInformation("添加商品到缓存, {Event}", @event);
});

// 使用 dapr 来订阅跨进程事件
app.UseRouting();
app.UseCloudEvents();
app.UseEndpoints(endpoint =>
{
endpoint.MapSubscribeHandler();
});

流水账式的服务会使得Program.cs变得十分臃肿, 可以通过Masa Framework提供的MinimalAPIs来简化Program.cs 点击查看详情

总结

我们上面的例子是通过事件总线来完成解耦以及数据模型的同步, 使用的双数据库模式, 但读库使用的是 缓存数据库, 在Command端做商品的添加操作, 在Query端只做查询, 且两端分别使用各自的数据源, 两者业务互不影响, 并且由于缓存数据库性能更强, 它将最大限度的提升性能, 使得我们有更好的使用体验。

Masa Framework中仅仅是通过ICommandIQuery将读写分开, 但这并没有硬性要求, 事实上你使用IEvent也是可以的, CQRS只是一种设计模式, 这点我们要清楚, 它只是告诉我们要按照一个什么样的标准去做, 但具体怎么来做, 取决于业务的决策者, 除此之外, 后续Masa Framework还会增加对Event Sourcing事件溯源)的支持, 通过事件重放, 允许我们随时重建到对象的任何状态

本章源码

Assignment15

https://github.com/zhenlei520/MasaFramework.Practice

CQRS架构项目:https://github.com/masalabs/MASA.EShop/tree/main/src/Services/Masa.EShop.Services.Catalog

参考

开源地址

MASA.Framework:https://github.com/masastack/MASA.Framework

MASA.EShop:https://github.com/masalabs/MASA.EShop

MASA.Blazor:https://github.com/BlazorComponent/MASA.Blazor

如果你对我们的 MASA Framework 感兴趣, 无论是代码贡献、使用、提 Issue, 欢迎联系我们

16373211753064.png

作者

MASA

发布于

2022-11-23

更新于

2023-05-26

许可协议