FastAPI 后端实现
后端核心在 examples/durable-mini/backend/app/engine.py。它把 Restate 的 context action 思想压缩成两个方法:run 和 sleep,并通过 business_orders 表展示 workflow 如何驱动业务状态。
API 层
POST /api/orders 创建 invocation。它读取 Idempotency-Key,如果 key 已存在,直接返回旧 invocation。
@app.post("/api/orders", response_model=InvocationOut)
def create_order(payload: OrderRequest, idempotency_key: str | None = Header(default=None)):
key = idempotency_key or payload.idempotency_key
return create_or_get_invocation(key, payload.model_dump())Worker loop
后台 worker 周期扫描可执行 invocation:
while True:
ids = list_runnable_invocation_ids()
for invocation_id in ids:
run_invocation_once(invocation_id)
await asyncio.sleep(1)教学版没有做复杂调度,只要状态是 QUEUED、RETRYABLE 或 WAITING_TIMER,worker 都会尝试执行。timer 未到期时,ctx.sleep 会再次暂停。
ctx.run
def run(self, name: str, fn: Callable[[], JsonDict]) -> JsonDict:
step_index = self._next_step_index()
existing = self._find_step(step_index)
if existing and existing.status == "COMPLETED":
self.event(f"replay step {step_index}: {name}")
return existing.result
result = fn()
self._append_step(step_index, name, "COMPLETED", result)
return result这就是重放跳过的核心。
业务表更新放在哪里
订单业务代码在 examples/durable-mini/backend/app/handlers.py。每个 durable step 不只返回一个结果,也会更新 business_orders:
def charge_payment(ctx, order):
business_order = _get_order(ctx, order["order_id"])
payment_id = _now_id("pay")
business_order.payment_id = payment_id
business_order.status = "PAID"
ctx.record_event("business order PAID")
return {"payment_id": payment_id, "order_id": business_order.order_id}注意这里没有直接 commit()。教学版让业务表更新、runtime event 和 journal entry 通过同一个 SQLAlchemy session 一起提交。这样读者能观察到:
| durable step | 业务表变化 |
|---|---|
create-order | 插入订单,status=CREATED |
charge-payment | 写入 payment_id,status=PAID |
reserve-inventory | 写入 reservation_id,status=INVENTORY_RESERVED |
mark-settlement-waiting | status=WAITING_SETTLEMENT |
send-receipt | 写入 receipt_id,status=COMPLETED |
重放时,已经完成的 durable step 会直接返回 Journal 里的结果,因此不会再次更新订单表或重新生成支付单号。
ctx.sleep
def sleep(self, name: str, seconds: int) -> None:
step_index = self._next_step_index()
existing = self._find_step(step_index)
if existing is None:
fire_at = utcnow() + timedelta(seconds=seconds)
self._append_step(step_index, name, "PENDING_TIMER", {"fire_at": fire_at.isoformat()})
raise SuspendExecution()
if utcnow() < parse(existing.result["fire_at"]):
raise SuspendExecution()
existing.status = "COMPLETED"
self.session.commit()注意它不是阻塞等待,而是把等待意图写入 Journal,然后让 invocation 进入 WAITING_TIMER。
故障注入
业务 handler 支持 crash_after:
maybe_crash(ctx, payload, "charge-payment")它只在第一次 attempt 触发。这样你可以观察第二次 attempt 如何 replay 已有步骤。
生产化缺口
教学版足以解释原理,但离生产系统还有距离:
| 缺口 | 生产化方向 |
|---|---|
| 多 worker 竞争 | 行级锁、租约、epoch fencing |
| 外部副作用窗口 | 外部 idempotency key、outbox、补偿 |
| 日志与物化视图未分离 | append-only log + materializer |
| 无服务协议 | 独立 SDK、双向流、action ack |
| 无分区 | key hash、partition processor、跨分区投递 |