python-cqrs:借助 Outbox、Sagas 与 Mediator 模式构建弹性事件驱动系统
深度解析
python-cqrs 库针对分布式系统中的一个基础架构难题:在保持服务间松耦合的同时,维护一致性边界。与仅分离读写路径的简单 CQRS 实现不同,该框架将生产系统所需的关键基础设施模式——事务性 Outbox、Saga 编排以及事件中介——整合为一个有机整体。
架构设计理念
传统 CQRS 实现往往在生产环境中失效,因为它们忽视了“双写问题”。当服务更新数据库后尝试向消息代理发布事件时,任一操作都可能独立失败。如果数据库提交成功而代理发布失败,事件就会丢失——其他服务收不到通知,系统状态出现分歧。
python-cqrs 通过将事务性 Outbox 模式作为一等公民来解决此问题。事件与领域状态变更写入同一数据库事务,然后异步转发到 Kafka。这保证了最终一致性:只要事务提交,事件就存在且会被发布。
该框架的架构围绕三大支柱构建:
1. 基于中介的请求处理:EventMediator 将请求发起者与处理器解耦。命令与查询流经中央中介,由 DI 容器解析处理器。这不仅是抽象本身的价值——它使框架能注入横切关注点(日志、指标、Outbox 持久化),而不污染领域逻辑。
2. 双重事件类型:DomainEvent 与 NotificationEvent 的区分在架构上意义重大。领域事件表示有界上下文内发生的事情——它们是领域模型的一部分。通知事件是发布到外部系统的集成事件。这种分离让内部事件流可独立于外部契约演进。
3. 带补偿的 Saga 编排:跨服务的分布式事务需要协调。编排式 Saga 实现维护 Saga 状态、跟踪参与者响应,并在失败时执行补偿动作。关键设计决策在于:Saga 状态与业务数据一起持久化在数据库中,使 Saga 恢复具备防崩溃能力。
v5.0.0 破坏性变更:脱离 Pydantic
v5.0.0 从基于 Pydantic 的默认实现迁移到基于 dataclass 的实现,反映了一次面向生产环境的加固决策。Pydantic 的验证开销——尽管对输入清理有价值——在处理器之间传递内部构造的事件时变成不必要的成本。通过使 Pydantic 变为可选,框架允许:
- 在热点路径上零开销地构造事件
- 对无需验证的服务减小依赖树
- 自定义序列化策略,而无需与 Pydantic 的模型体系对抗
IRequest 与 IResponse 接口定义契约:to_dict()、from_dict() 与类型注解。你可以用 Pydantic、dataclasses、attrs 或原生类实现这些——中介并不关心。
事件映射机制重构
重新设计的事件到处理器映射机制使用运行时解析的注册表模式。当你绑定命令到处理器时:
def commands_mapper(mapper: cqrs.RequestMap) -> None:
mapper.bind(CreateOrderCommand, CreateOrderHandler)框架将映射存储为 (request_type, handler_class)。在派发时,中介:
- 查找请求类型对应的处理器类
- 从 DI 容器解析处理器实例(支持构造器注入)
- 用请求调用处理器
这种间接层使 StreamingRequestMediator 与 CORRequestHandler 模式无需改动客户端代码即可实现——绑定保持不变,但中介在内部处理流式或责任链派发。
实现
面向生产的启动配置
bootstrap 模块提供合理默认值,但生产部署需显式配置。以下是包含 Kafka、Outbox 与 Saga 支持的完整设置:
import di
import cqrs
from cqrs.requests import bootstrap
from cqrs.outbox import Outbox
from cqrs.events import Event
from cqrs.saga import SagaOrchestrator
class OrderCreatedEvent(cqrs.NotificationEvent):
event_type: str = "order.created"
order_id: str
customer_id: str
total: float
class CreateOrderCommand(cqrs.Request):
order_id: str
customer_id: str
items: list[dict]
class CreateOrderHandler(cqrs.RequestHandler[CreateOrderCommand, None]):
def __init__(
self,
order_repository: OrderRepository,
outbox: Outbox,
) -> None:
self._orders = order_repository
self._outbox = outbox
self._events: list[Event] = []
@property
def events(self) -> list[Event]:
return self._events
async def handle(self, request: CreateOrderCommand) -> None:
# 领域逻辑
order = await self._orders.create(
order_id=request.order_id,
customer_id=request.customer_id,
items=request.items,
)
# 创建供外部系统使用的通知事件
event = OrderCreatedEvent(
order_id=order.id,
customer_id=order.customer_id,
total=order.total,
)
# 在同一事务边界内加入 Outbox
self._events.append(event)
# Outbox 持久化由中介中间件完成此处关键模式:self。_events 在处理期间累积事件。中介的 Outbox 中间件在处理器完成后拦截这些事件并原子化持久化。
基于 PostgreSQL 的事务性 Outbox
生产使用时,实现持久化的 Outbox 仓库:
import asyncpg
from cqrs.outbox import OutboxRepository, OutboxMessage
class PostgresOutboxRepository(OutboxRepository):
def __init__(self, pool: asyncpg.Pool) -> None:
self._pool = pool
async def add(self, message: OutboxMessage) -> None:
# 在领域变更同一事务中调用
async with self._pool.acquire() as conn:
await conn.execute(
"""
INSERT INTO outbox (id, event_type, payload, created_at, status)
VALUES ($1, $2, $3, $4, 'pending')
""",
message.id,
message.event_type,
message.payload,
message.created_at,
)
async def get_pending_batch(self, batch_size: int = 100) -> list[OutboxMessage]:
async with self._pool.acquire() as conn:
rows = await conn.fetch(
"""
UPDATE outbox SET status = 'processing'
WHERE id IN (
SELECT id FROM outbox
WHERE status = 'pending'
ORDER BY created_at
LIMIT $1
FOR UPDATE SKIP LOCKED
)
RETURNING id, event_type, payload, created_at
""",
batch_size,
)
return [OutboxMessage(**row) for row in rows]
async def mark_processed(self, message_ids: list[str]) -> None:
async with self._pool.acquire() as conn:
await conn.execute(
"UPDATE outbox SET status = 'processed' WHERE id = ANY($1)",
message_ids,
)FOR UPDATE SKIP LOCKED 子句对水平扩展至关重要。多个 Outbox 处理器实例可从同一表拉取而无争用——每个实例仅锁定自己处理的行。
分布式事务的 Saga 编排
Saga 实现处理协调复杂性。以“创建订单并支付与库存”为例:
from cqrs.saga import Saga, SagaStep, SagaOrchestrator
from dataclasses import dataclass
@dataclass
class CreateOrderSagaState:
order_id: str
customer_id: str
payment_id: str | None = None
inventory_reservation_id: str | None = None
status: str = "started"
class CreateOrderSaga(Saga[CreateOrderSagaState]):
def __init__(self, saga_id: str, state: CreateOrderSagaState):
super().__init__(saga_id, state)
def define_steps(self) -> list[SagaStep]:
return [
SagaStep(
name="create_payment",
action_command=CreatePaymentCommand(
order_id=self.state.order_id,
amount=self.state.total,
),
compensation_command=CancelPaymentCommand(
payment_id=self.state.payment_id,
),
condition=lambda: self.state.payment_id is None,
),
SagaStep(
name="reserve_inventory",
action_command=ReserveInventoryCommand(
order_id=self.state.order_id,
items=self.state.items,
),
compensation_command=ReleaseInventoryCommand(
reservation_id=self.state.inventory_reservation_id,
),
condition=lambda: self.state.inventory_reservation_id is None,
),
SagaStep(
name="confirm_order",
action_command=ConfirmOrderCommand(
order_id=self.state.order_id,
),
# 无补偿——最终步骤
),
]Saga 编排器处理状态机:
saga_orchestrator = SagaOrchestrator(
saga_repository=saga_repo,
mediator=mediator,
circuit_breaker=aiobreaker.CircuitBreaker(
fail_max=5,
reset_timeout=60,
),
)
# 启动 Saga
saga = CreateOrderSaga(
saga_id=str(uuid.uuid4()),
state=CreateOrderSagaState(
order_id="ord-123",
customer_id="cust-456",
),
)
await saga_orchestrator.start(saga)断路器集成(aiobreaker)防止级联故障。连续 5 次步骤失败后,电路打开,Saga 进入补偿模式而不会反复冲击失效服务。
带并发控制的并行事件处理
框架支持可配置限制的并行事件处理:
from cqrs.events import EventMediator, EventHandler
mediator = EventMediator(
event_handlers_registry=handlers,
max_concurrent_handlers=10, # 同时处理最多 10 个事件
)这对高吞吐场景至关重要。如果从 Kafka 每秒处理 10,000 个事件但处理器是同步的,会产生积压。使用 max_concurrent_handlers=10 与异步处理器,可在不压垮系统的前提下饱和 I/O 密集型操作。
面向大容量事件流的 Protobuf 序列化
对每日交换数百万事件的服务,JSON 序列化会成为瓶颈。框架在接口层面支持 Protobuf:
from cqrs.protobuf import ProtobufSerializable
class OrderCreatedEvent(cqrs.NotificationEvent, ProtobufSerializable):
event_type: str = "order.created"
def to_protobuf(self) -> bytes:
proto_event = order_pb2.OrderCreated(
order_id=self.order_id,
customer_id=self.customer_id,
total=self.total,
)
return proto_event.SerializeToString()
@classmethod
def from_protobuf(cls, data: bytes) -> "OrderCreatedEvent":
proto_event = order_pb2.OrderCreated()
proto_event.ParseFromString(data)
return cls(
order_id=proto_event.order_id,
customer_id=proto_event.customer_id,
total=proto_event.total,
)基准测试中,Protobuf 序列化比 JSON 快 3-5 倍,且在典型事件结构上体积减少 60-80%。
陷阱与权衡
仅靠 Outbox 无法解决双写问题
事务性 Outbox 模式要求领域变更与 Outbox 写入在同一数据库事务中完成。如果你的架构为扩展而对写入与事件使用不同数据库(一种合理模式),则又引入了双写问题。
解决方案:使用事务日志尾随(CDC)或确保 Outbox 表与领域实体位于同一数据库。框架支持两种模式,但必须谨慎选择。
Saga 补偿需具备幂等性
补偿命令可能多次执行。如果 CancelPaymentCommand 运行两次,支付服务须优雅处理。框架提供 Saga 状态跟踪,但下游服务必须实现幂等性:
class PaymentService:
async def cancel_payment(self, payment_id: str) -> None:
# 行动前检查当前状态
payment = await self._repo.get(payment_id)
if payment.status == "cancelled":
return # 已取消,无需操作
if payment.status == "completed":
# 启动退款流程
await self._refunds.create(payment_id=payment_id, amount=payment.amount)
await self._repo.update_status(payment_id, "cancelled")事件版本化且不破坏消费者
NotificationEvent 上的 event_type 字段应包含版本后缀:
class OrderCreatedV1(cqrs.NotificationEvent):
event_type: str = "order.created.v1"
order_id: str
customer_id: str
class OrderCreatedV2(cqrs.NotificationEvent):
event_type: str = "order.created.v2"
order_id: str
customer_id: str
shipping_method: str # 新字段消费者订阅特定版本。需要变更事件结构时,向新版本主题发布,同时保留旧版以保证向后兼容。在定义的迁移窗口(通常生产系统为 90 天)内废弃旧版本。
事件累积导致内存泄漏
RequestHandler。events 属性为 Outbox 中间件累积事件。如果在长生命周期对象中处理请求(不推荐,但在 DI 容器配置错误时常见),事件会无限累积。
正确模式:处理器应按请求实例化,或在处理后被中介清空事件列表。bootstrap 负责此事,但自定义中介配置必须显式实现事件收集。
Kafka 消费延迟与背压
从 Kafka 消费事件时,若处理器耗时超过消息到达速率,延迟会迅速累积。框架不会自动暂停消费——你必须实现背压:
from aiokafka import AIOKafkaConsumer
consumer = AIOKafkaConsumer(
"order-events",
bootstrap_servers="kafka:9092",
enable_auto_commit=False, # 手动提交以实现精确一次语义
max_poll_records=100, # 限制批次大小
)
async def process_with_backpressure():
async for batch in consumer:
# 处理批次
for record in batch:
await event_mediator.handle(record.value)
# 仅在成功处理后提交
await consumer.commit()进阶考量
多区域部署与事件顺序
事务性 Outbox 保证单数据库实例内的顺序。在多区域部署且各区域有独立数据库时,跨区域发布的事件可能乱序。
策略:在每个事件中纳入单调递增序列号,由数据库事务生成:
-- PostgreSQL 示例
CREATE SEQUENCE event_sequence;
-- Outbox 插入时:
INSERT INTO outbox (id, event_type, payload, sequence_num)
VALUES (gen_random_uuid(), $1, $2, nextval('event_sequence'));消费者可通过可配置窗口缓冲来检测并重排乱序事件。
事务日志尾随 vs 轮询 Outbox
框架支持两种 Outbox 处理模式:
轮询式:后台任务每 N 秒查询 Outbox 表:
from cqrs.outbox import OutboxProcessor
processor = OutboxProcessor(
repository=outbox_repo,
producer=kafka_producer,
poll_interval=1.0, # 1 秒
batch_size=100,
)
await processor.start()CDC 式:使用 Debezium 等工具尾随数据库事务日志:
# Debezium 连接器配置
connector.class: io.debezium.connector.postgresql.PostgresConnector
database.hostname: postgres
database.port: 5432
database.user: debezium
database.password: ${DEBEZIUM_PASSWORD}
table.include.list: public.outbox
database.server.name: order-serviceCDC 延迟更低(亚秒级)且不消耗数据库性能,但增加运维复杂度。轮询更简单但产生查询负载,通常延迟 1-5 秒。
监控与可观测性集成
中介管道是插入分布式追踪的理想位置:
from opentelemetry import trace
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
class TracingMiddleware:
def __init__(self, tracer: trace.Tracer):
self._tracer = tracer
async def __call__(self, request: cqrs.Request, handler: Callable) -> None:
# 提取或创建追踪上下文
propagator = TraceContextTextMapPropagator()
ctx = propagator.extract(request.metadata)
with self._tracer.start_as_current_span(
f"{request.__class__.__name__}.handle",
context=ctx,
) as span:
span.set_attribute("request.id", request.request_id)
await handler(request)需跟踪的关键指标:
- Outbox 待处理数量(>1000 时告警)
- Saga 完成率(<95% 时告警)
- 事件处理延迟(p99 应 <500ms)
- 各主题消费延迟(>10,000 条消息时告警)
处理毒消息
无效或不可处理的事件会使消费者无限阻塞。实现死信队列模式:
class ResilientEventHandler(EventHandler):
def __init__(self, dlq_producer: KafkaProducer):
self._dlq = dlq_producer
async def handle(self, event: Event) -> None:
try:
await self._process(event)
except (ValidationError, DeserializationError) as e:
# 不可重试——发送至 DLQ
await self._dlq.send(
topic=f"{event.event_type}.dlq",
value={
"event": event.to_dict(),
"error": str(e),
"timestamp": datetime.utcnow().isoformat(),
},
)
except TransientError:
# 可重试——抛出异常触发消费者重试
raise中介的扩展
若所有请求流经单一中介实例,中介本身可能成为瓶颈。框架设计支持水平扩展:
- 无状态处理器:确保处理器在请求间不保持内存状态。
- 外部化 Outbox:Outbox 仓库必须共享(基于数据库)。
- 分区 Kafka 主题:使用基于聚合 ID 的分区键以保持每实体的顺序。
对处理 >100K 请求/秒的服务,考虑:
- 负载均衡后的多个中介实例
- 分区命令队列(如
orders。commands。partition-0、orders。commands。partition-1) - 针对 I/O 密集与 CPU 密集工作负载的异步处理器池与专用事件循环
用于横切关注点的责任链模式
CORRequestHandler 支持通过处理器管道处理请求:
from cqrs.requests import CORRequestHandler
class LoggingCORHandler(CORRequestHandler):
async def handle(self, request: Request) -> Response | None:
logger.info(f"Processing {request.__class__.__name__}")
result = await self._next.handle(request) if self._next else None
logger.info(f"Completed {request.__class__.__name__}")
return result
class AuthorizationCORHandler(CORRequestHandler):
def __init__(self, next_handler: Handler | None, auth_service: AuthService):
super().__init__(next_handler)
self._auth = auth_service
async def handle(self, request: Request) -> Response | None:
if hasattr(request, 'user_id'):
if not await self._auth.can_execute(request.user_id, request.__class__):
raise UnauthorizedError(f"User cannot execute {request.__class__.__name__}")
return await self._next.handle(request) if self._next else None
# 链:Logging -> Auth -> ActualHandler
handler = LoggingCORHandler(
AuthorizationCORHandler(
CreateOrderHandler(order_repo, outbox),
auth_service,
),
)此模式将横切关注点与领域逻辑分离,但要注意性能开销——链中每个处理器增加一个函数调用和潜在的异步上下文切换。对延迟要求严格的路径,建议在中介层面使用内联中间件。
核心要点
- 事务性 Outbox 模式要求领域变更与事件写入在同一数据库事务中完成;使用不同数据库会重新引入该模式旨在解决的双写问题。
- Saga 补偿命令必须幂等——在恢复期间可能多次执行,下游服务须优雅地处理重复补偿而不造成数据损坏。
- 对大容量事件流(>100K 事件/秒),Protobuf 序列化相比 JSON 提供 3-5 倍吞吐提升及 60-80% 体积缩减,但需要版本化与消费者协作。
- v5.0.0 迁移至基于 dataclass 的默认值消除了热点路径上的 Pydantic 验证开销——对外部输入验证可用 Pydantic,但内部事件传播推荐 dataclasses。
- 基于 Kafka 的系统消费延迟需显式背压处理;框架不会自动暂停消费,因此要实现批次大小限制与手动提交策略,以防处理高峰时内存耗尽。