python-cqrs:借助 Outbox、Sagas 与 Mediator 模式构建弹性事件驱动系统

深度解析

python-cqrs 库针对分布式系统中的一个基础架构难题:在保持服务间松耦合的同时,维护一致性边界。与仅分离读写路径的简单 CQRS 实现不同,该框架将生产系统所需的关键基础设施模式——事务性 Outbox、Saga 编排以及事件中介——整合为一个有机整体。

架构设计理念

传统 CQRS 实现往往在生产环境中失效,因为它们忽视了“双写问题”。当服务更新数据库后尝试向消息代理发布事件时,任一操作都可能独立失败。如果数据库提交成功而代理发布失败,事件就会丢失——其他服务收不到通知,系统状态出现分歧。

python-cqrs 通过将事务性 Outbox 模式作为一等公民来解决此问题。事件与领域状态变更写入同一数据库事务,然后异步转发到 Kafka。这保证了最终一致性:只要事务提交,事件就存在且会被发布。

该框架的架构围绕三大支柱构建:

1. 基于中介的请求处理EventMediator 将请求发起者与处理器解耦。命令与查询流经中央中介,由 DI 容器解析处理器。这不仅是抽象本身的价值——它使框架能注入横切关注点(日志、指标、Outbox 持久化),而不污染领域逻辑。

2. 双重事件类型DomainEventNotificationEvent 的区分在架构上意义重大。领域事件表示有界上下文内发生的事情——它们是领域模型的一部分。通知事件是发布到外部系统的集成事件。这种分离让内部事件流可独立于外部契约演进。

3. 带补偿的 Saga 编排:跨服务的分布式事务需要协调。编排式 Saga 实现维护 Saga 状态、跟踪参与者响应,并在失败时执行补偿动作。关键设计决策在于:Saga 状态与业务数据一起持久化在数据库中,使 Saga 恢复具备防崩溃能力。

v5.0.0 破坏性变更:脱离 Pydantic

v5.0.0 从基于 Pydantic 的默认实现迁移到基于 dataclass 的实现,反映了一次面向生产环境的加固决策。Pydantic 的验证开销——尽管对输入清理有价值——在处理器之间传递内部构造的事件时变成不必要的成本。通过使 Pydantic 变为可选,框架允许:

  • 在热点路径上零开销地构造事件
  • 对无需验证的服务减小依赖树
  • 自定义序列化策略,而无需与 Pydantic 的模型体系对抗

IRequestIResponse 接口定义契约:to_dict()from_dict() 与类型注解。你可以用 Pydantic、dataclasses、attrs 或原生类实现这些——中介并不关心。

事件映射机制重构

重新设计的事件到处理器映射机制使用运行时解析的注册表模式。当你绑定命令到处理器时:

python
def commands_mapper(mapper: cqrs.RequestMap) -> None:
    mapper.bind(CreateOrderCommand, CreateOrderHandler)

框架将映射存储为 (request_type, handler_class)。在派发时,中介:

  1. 查找请求类型对应的处理器类
  2. 从 DI 容器解析处理器实例(支持构造器注入)
  3. 用请求调用处理器

这种间接层使 StreamingRequestMediatorCORRequestHandler 模式无需改动客户端代码即可实现——绑定保持不变,但中介在内部处理流式或责任链派发。

实现

面向生产的启动配置

bootstrap 模块提供合理默认值,但生产部署需显式配置。以下是包含 Kafka、Outbox 与 Saga 支持的完整设置:

python
import di
import cqrs
from cqrs.requests import bootstrap
from cqrs.outbox import Outbox
from cqrs.events import Event
from cqrs.saga import SagaOrchestrator

class OrderCreatedEvent(cqrs.NotificationEvent):
    event_type: str = "order.created"
    order_id: str
    customer_id: str
    total: float

class CreateOrderCommand(cqrs.Request):
    order_id: str
    customer_id: str
    items: list[dict]

class CreateOrderHandler(cqrs.RequestHandler[CreateOrderCommand, None]):
    def __init__(
        self,
        order_repository: OrderRepository,
        outbox: Outbox,
    ) -> None:
        self._orders = order_repository
        self._outbox = outbox
        self._events: list[Event] = []

    @property
    def events(self) -> list[Event]:
        return self._events

    async def handle(self, request: CreateOrderCommand) -> None:
        # 领域逻辑
        order = await self._orders.create(
            order_id=request.order_id,
            customer_id=request.customer_id,
            items=request.items,
        )
        
        # 创建供外部系统使用的通知事件
        event = OrderCreatedEvent(
            order_id=order.id,
            customer_id=order.customer_id,
            total=order.total,
        )
        
        # 在同一事务边界内加入 Outbox
        self._events.append(event)
        
        # Outbox 持久化由中介中间件完成

此处关键模式:self。_events 在处理期间累积事件。中介的 Outbox 中间件在处理器完成后拦截这些事件并原子化持久化。

基于 PostgreSQL 的事务性 Outbox

生产使用时,实现持久化的 Outbox 仓库:

python
import asyncpg
from cqrs.outbox import OutboxRepository, OutboxMessage

class PostgresOutboxRepository(OutboxRepository):
    def __init__(self, pool: asyncpg.Pool) -> None:
        self._pool = pool
    
    async def add(self, message: OutboxMessage) -> None:
        # 在领域变更同一事务中调用
        async with self._pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO outbox (id, event_type, payload, created_at, status)
                VALUES ($1, $2, $3, $4, 'pending')
                """,
                message.id,
                message.event_type,
                message.payload,
                message.created_at,
            )
    
    async def get_pending_batch(self, batch_size: int = 100) -> list[OutboxMessage]:
        async with self._pool.acquire() as conn:
            rows = await conn.fetch(
                """
                UPDATE outbox SET status = 'processing'
                WHERE id IN (
                    SELECT id FROM outbox 
                    WHERE status = 'pending'
                    ORDER BY created_at
                    LIMIT $1
                    FOR UPDATE SKIP LOCKED
                )
                RETURNING id, event_type, payload, created_at
                """,
                batch_size,
            )
            return [OutboxMessage(**row) for row in rows]
    
    async def mark_processed(self, message_ids: list[str]) -> None:
        async with self._pool.acquire() as conn:
            await conn.execute(
                "UPDATE outbox SET status = 'processed' WHERE id = ANY($1)",
                message_ids,
            )

FOR UPDATE SKIP LOCKED 子句对水平扩展至关重要。多个 Outbox 处理器实例可从同一表拉取而无争用——每个实例仅锁定自己处理的行。

分布式事务的 Saga 编排

Saga 实现处理协调复杂性。以“创建订单并支付与库存”为例:

python
from cqrs.saga import Saga, SagaStep, SagaOrchestrator
from dataclasses import dataclass

@dataclass
class CreateOrderSagaState:
    order_id: str
    customer_id: str
    payment_id: str | None = None
    inventory_reservation_id: str | None = None
    status: str = "started"

class CreateOrderSaga(Saga[CreateOrderSagaState]):
    def __init__(self, saga_id: str, state: CreateOrderSagaState):
        super().__init__(saga_id, state)
    
    def define_steps(self) -> list[SagaStep]:
        return [
            SagaStep(
                name="create_payment",
                action_command=CreatePaymentCommand(
                    order_id=self.state.order_id,
                    amount=self.state.total,
                ),
                compensation_command=CancelPaymentCommand(
                    payment_id=self.state.payment_id,
                ),
                condition=lambda: self.state.payment_id is None,
            ),
            SagaStep(
                name="reserve_inventory",
                action_command=ReserveInventoryCommand(
                    order_id=self.state.order_id,
                    items=self.state.items,
                ),
                compensation_command=ReleaseInventoryCommand(
                    reservation_id=self.state.inventory_reservation_id,
                ),
                condition=lambda: self.state.inventory_reservation_id is None,
            ),
            SagaStep(
                name="confirm_order",
                action_command=ConfirmOrderCommand(
                    order_id=self.state.order_id,
                ),
                # 无补偿——最终步骤
            ),
        ]

Saga 编排器处理状态机:

python
saga_orchestrator = SagaOrchestrator(
    saga_repository=saga_repo,
    mediator=mediator,
    circuit_breaker=aiobreaker.CircuitBreaker(
        fail_max=5,
        reset_timeout=60,
    ),
)

# 启动 Saga
saga = CreateOrderSaga(
    saga_id=str(uuid.uuid4()),
    state=CreateOrderSagaState(
        order_id="ord-123",
        customer_id="cust-456",
    ),
)
await saga_orchestrator.start(saga)

断路器集成(aiobreaker)防止级联故障。连续 5 次步骤失败后,电路打开,Saga 进入补偿模式而不会反复冲击失效服务。

带并发控制的并行事件处理

框架支持可配置限制的并行事件处理:

python
from cqrs.events import EventMediator, EventHandler

mediator = EventMediator(
    event_handlers_registry=handlers,
    max_concurrent_handlers=10,  # 同时处理最多 10 个事件
)

这对高吞吐场景至关重要。如果从 Kafka 每秒处理 10,000 个事件但处理器是同步的,会产生积压。使用 max_concurrent_handlers=10 与异步处理器,可在不压垮系统的前提下饱和 I/O 密集型操作。

面向大容量事件流的 Protobuf 序列化

对每日交换数百万事件的服务,JSON 序列化会成为瓶颈。框架在接口层面支持 Protobuf:

python
from cqrs.protobuf import ProtobufSerializable

class OrderCreatedEvent(cqrs.NotificationEvent, ProtobufSerializable):
    event_type: str = "order.created"
    
    def to_protobuf(self) -> bytes:
        proto_event = order_pb2.OrderCreated(
            order_id=self.order_id,
            customer_id=self.customer_id,
            total=self.total,
        )
        return proto_event.SerializeToString()
    
    @classmethod
    def from_protobuf(cls, data: bytes) -> "OrderCreatedEvent":
        proto_event = order_pb2.OrderCreated()
        proto_event.ParseFromString(data)
        return cls(
            order_id=proto_event.order_id,
            customer_id=proto_event.customer_id,
            total=proto_event.total,
        )

基准测试中,Protobuf 序列化比 JSON 快 3-5 倍,且在典型事件结构上体积减少 60-80%。

陷阱与权衡

仅靠 Outbox 无法解决双写问题

事务性 Outbox 模式要求领域变更与 Outbox 写入在同一数据库事务中完成。如果你的架构为扩展而对写入与事件使用不同数据库(一种合理模式),则又引入了双写问题。

解决方案:使用事务日志尾随(CDC)或确保 Outbox 表与领域实体位于同一数据库。框架支持两种模式,但必须谨慎选择。

Saga 补偿需具备幂等性

补偿命令可能多次执行。如果 CancelPaymentCommand 运行两次,支付服务须优雅处理。框架提供 Saga 状态跟踪,但下游服务必须实现幂等性:

python
class PaymentService:
    async def cancel_payment(self, payment_id: str) -> None:
        # 行动前检查当前状态
        payment = await self._repo.get(payment_id)
        if payment.status == "cancelled":
            return  # 已取消,无需操作
        
        if payment.status == "completed":
            # 启动退款流程
            await self._refunds.create(payment_id=payment_id, amount=payment.amount)
        
        await self._repo.update_status(payment_id, "cancelled")

事件版本化且不破坏消费者

NotificationEvent 上的 event_type 字段应包含版本后缀:

python
class OrderCreatedV1(cqrs.NotificationEvent):
    event_type: str = "order.created.v1"
    order_id: str
    customer_id: str

class OrderCreatedV2(cqrs.NotificationEvent):
    event_type: str = "order.created.v2"
    order_id: str
    customer_id: str
    shipping_method: str  # 新字段

消费者订阅特定版本。需要变更事件结构时,向新版本主题发布,同时保留旧版以保证向后兼容。在定义的迁移窗口(通常生产系统为 90 天)内废弃旧版本。

事件累积导致内存泄漏

RequestHandler。events 属性为 Outbox 中间件累积事件。如果在长生命周期对象中处理请求(不推荐,但在 DI 容器配置错误时常见),事件会无限累积。

正确模式:处理器应按请求实例化,或在处理后被中介清空事件列表。bootstrap 负责此事,但自定义中介配置必须显式实现事件收集。

Kafka 消费延迟与背压

从 Kafka 消费事件时,若处理器耗时超过消息到达速率,延迟会迅速累积。框架不会自动暂停消费——你必须实现背压:

python
from aiokafka import AIOKafkaConsumer

consumer = AIOKafkaConsumer(
    "order-events",
    bootstrap_servers="kafka:9092",
    enable_auto_commit=False,  # 手动提交以实现精确一次语义
    max_poll_records=100,  # 限制批次大小
)

async def process_with_backpressure():
    async for batch in consumer:
        # 处理批次
        for record in batch:
            await event_mediator.handle(record.value)
        
        # 仅在成功处理后提交
        await consumer.commit()

进阶考量

多区域部署与事件顺序

事务性 Outbox 保证单数据库实例内的顺序。在多区域部署且各区域有独立数据库时,跨区域发布的事件可能乱序。

策略:在每个事件中纳入单调递增序列号,由数据库事务生成:

sql
-- PostgreSQL 示例
CREATE SEQUENCE event_sequence;

-- Outbox 插入时:
INSERT INTO outbox (id, event_type, payload, sequence_num)
VALUES (gen_random_uuid(), $1, $2, nextval('event_sequence'));

消费者可通过可配置窗口缓冲来检测并重排乱序事件。

事务日志尾随 vs 轮询 Outbox

框架支持两种 Outbox 处理模式:

轮询式:后台任务每 N 秒查询 Outbox 表:

python
from cqrs.outbox import OutboxProcessor

processor = OutboxProcessor(
    repository=outbox_repo,
    producer=kafka_producer,
    poll_interval=1.0,  # 1 秒
    batch_size=100,
)
await processor.start()

CDC 式:使用 Debezium 等工具尾随数据库事务日志:

yaml
# Debezium 连接器配置
connector.class: io.debezium.connector.postgresql.PostgresConnector
database.hostname: postgres
database.port: 5432
database.user: debezium
database.password: ${DEBEZIUM_PASSWORD}
table.include.list: public.outbox
database.server.name: order-service

CDC 延迟更低(亚秒级)且不消耗数据库性能,但增加运维复杂度。轮询更简单但产生查询负载,通常延迟 1-5 秒。

监控与可观测性集成

中介管道是插入分布式追踪的理想位置:

python
from opentelemetry import trace
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

class TracingMiddleware:
    def __init__(self, tracer: trace.Tracer):
        self._tracer = tracer
    
    async def __call__(self, request: cqrs.Request, handler: Callable) -> None:
        # 提取或创建追踪上下文
        propagator = TraceContextTextMapPropagator()
        ctx = propagator.extract(request.metadata)
        
        with self._tracer.start_as_current_span(
            f"{request.__class__.__name__}.handle",
            context=ctx,
        ) as span:
            span.set_attribute("request.id", request.request_id)
            await handler(request)

需跟踪的关键指标:

  • Outbox 待处理数量(>1000 时告警)
  • Saga 完成率(<95% 时告警)
  • 事件处理延迟(p99 应 <500ms)
  • 各主题消费延迟(>10,000 条消息时告警)

处理毒消息

无效或不可处理的事件会使消费者无限阻塞。实现死信队列模式:

python
class ResilientEventHandler(EventHandler):
    def __init__(self, dlq_producer: KafkaProducer):
        self._dlq = dlq_producer
    
    async def handle(self, event: Event) -> None:
        try:
            await self._process(event)
        except (ValidationError, DeserializationError) as e:
            # 不可重试——发送至 DLQ
            await self._dlq.send(
                topic=f"{event.event_type}.dlq",
                value={
                    "event": event.to_dict(),
                    "error": str(e),
                    "timestamp": datetime.utcnow().isoformat(),
                },
            )
        except TransientError:
            # 可重试——抛出异常触发消费者重试
            raise

中介的扩展

若所有请求流经单一中介实例,中介本身可能成为瓶颈。框架设计支持水平扩展:

  1. 无状态处理器:确保处理器在请求间不保持内存状态。
  2. 外部化 Outbox:Outbox 仓库必须共享(基于数据库)。
  3. 分区 Kafka 主题:使用基于聚合 ID 的分区键以保持每实体的顺序。

对处理 >100K 请求/秒的服务,考虑:

  • 负载均衡后的多个中介实例
  • 分区命令队列(如 orders。commands。partition-0orders。commands。partition-1
  • 针对 I/O 密集与 CPU 密集工作负载的异步处理器池与专用事件循环

用于横切关注点的责任链模式

CORRequestHandler 支持通过处理器管道处理请求:

python
from cqrs.requests import CORRequestHandler

class LoggingCORHandler(CORRequestHandler):
    async def handle(self, request: Request) -> Response | None:
        logger.info(f"Processing {request.__class__.__name__}")
        result = await self._next.handle(request) if self._next else None
        logger.info(f"Completed {request.__class__.__name__}")
        return result

class AuthorizationCORHandler(CORRequestHandler):
    def __init__(self, next_handler: Handler | None, auth_service: AuthService):
        super().__init__(next_handler)
        self._auth = auth_service
    
    async def handle(self, request: Request) -> Response | None:
        if hasattr(request, 'user_id'):
            if not await self._auth.can_execute(request.user_id, request.__class__):
                raise UnauthorizedError(f"User cannot execute {request.__class__.__name__}")
        return await self._next.handle(request) if self._next else None

# 链:Logging -> Auth -> ActualHandler
handler = LoggingCORHandler(
    AuthorizationCORHandler(
        CreateOrderHandler(order_repo, outbox),
        auth_service,
    ),
)

此模式将横切关注点与领域逻辑分离,但要注意性能开销——链中每个处理器增加一个函数调用和潜在的异步上下文切换。对延迟要求严格的路径,建议在中介层面使用内联中间件。


核心要点

  1. 事务性 Outbox 模式要求领域变更与事件写入在同一数据库事务中完成;使用不同数据库会重新引入该模式旨在解决的双写问题。
  2. Saga 补偿命令必须幂等——在恢复期间可能多次执行,下游服务须优雅地处理重复补偿而不造成数据损坏。
  3. 对大容量事件流(>100K 事件/秒),Protobuf 序列化相比 JSON 提供 3-5 倍吞吐提升及 60-80% 体积缩减,但需要版本化与消费者协作。
  4. v5.0.0 迁移至基于 dataclass 的默认值消除了热点路径上的 Pydantic 验证开销——对外部输入验证可用 Pydantic,但内部事件传播推荐 dataclasses。
  5. 基于 Kafka 的系统消费延迟需显式背压处理;框架不会自动暂停消费,因此要实现批次大小限制与手动提交策略,以防处理高峰时内存耗尽。