在 Azure Functions 中构建基于 Saga 模式与读写分离的事务性工作流


在无服务器(Serverless)架构中处理跨多个服务的复杂业务事务,是一个无法回避的挑战。一个典型的场景是:用户发起一个订单请求,系统需要依次完成库存扣减、优惠券核销、支付网关调用、以及物流单创建。这些操作分布在不同限界上下文中,任何一步失败,都需要回滚已成功的操作。传统的单体应用可以通过数据库的 ACID 事务来保证原子性,但在 FaaS (Function as a Service) 模型下,函数是短暂且无状态的,依赖长连接的数据库事务变得不切实际且极易出错。

直接暴露一个长时间运行的 HTTP-triggered Function 来按顺序调用各个服务是一种反模式。它不仅会轻易超出 Azure Functions 消费计划的默认超时限制(5 分钟,最长可调至 10 分钟),还会造成调用方长时间的同步等待;一旦函数实例崩溃,整个事务的状态会丢失,无法恢复。

为了解决这个问题,我们需要一个能够编排分布式事务、具备容错和补偿能力的架构。同时,对于订单这类系统,写入(创建订单)的并发量远低于读取(查询订单状态),对读写性能的要求也截然不同。这自然引出了两种架构模式的结合:使用 Saga 模式来管理分布式事务的生命周期,并采用读写分离(CQRS 的一种简化形式)来优化查询性能。

方案权衡:编排式 Saga vs. 协同式 Saga

在选择 Saga 实现方式时,主要有两种流派:

  1. 协同式 (Choreography): 每个服务完成自己的任务后,发布一个事件。其他服务订阅这些事件并触发自己的后续操作。这种方式松耦合,服务间没有直接调用。但其缺点也十分明显:整个业务流程是隐式的,散落在各个服务的事件订阅逻辑中。当流程复杂时,很难追踪一个完整事务的当前状态,也难以定位问题。增加或修改一个步骤,可能需要改动多个服务。补偿逻辑也变得异常复杂,一个失败事件可能需要触发一连串的补偿事件。

  2. 编排式 (Orchestration): 引入一个中心化的编排器(Orchestrator)。由编排器负责驱动整个业务流程,向各个参与方服务发送命令(Command),并根据执行结果(成功或失败事件)决定下一步是继续推进还是执行补偿操作。这种方式将复杂的业务流程逻辑集中管理,使得事务状态可见、可追踪。虽然引入了编排器这个“中心节点”,但在无服务器环境中,这个编排器本身可以是一个高可用的、由事件驱动的函数,从而避免了单点故障。

在真实项目中,对于有明确生命周期和状态转换的复杂业务流程,编排式 Saga 的可维护性和可观测性优势远大于其带来的耦合。因此,我们选择基于编排器的方式来构建我们的工作流。

最终架构:基于 Azure Functions 与 Cosmos DB 的实现

我们的目标架构如下:

graph TD
    subgraph "客户端"
        A[Client App]
    end

    subgraph "写入路径 (Write Path)"
        A -- 1. 发起订单 (HTTP POST) --> B(OrderOrchestrator<br/>Azure Function - HTTP Trigger)
        B -- 2. 保存初始状态 --> C{OrderSagaState<br/>Cosmos DB - Write Model}
        B -- 3. 发送扣减库存命令 --> D[CommandBus<br/>Azure Service Bus Queue]
        D -- 4. 触发 --> E(DeductStockFunc<br/>Azure Function - ServiceBus Trigger)
        E -- 5. 执行业务逻辑 --> F[Inventory Service]
        E -- 6. 发送成功/失败事件 --> G[EventBus<br/>Azure Service Bus Topic]
        G -- 7. 触发 --> B
        B -- 8. 更新Saga状态, 发送下一步命令 --> C & D
    end

    subgraph "读取路径 (Read Path)"
        C -- 9. 变更流 (Change Feed) --> H(OrderProjector<br/>Azure Function - CosmosDB Trigger)
        H -- 10. 更新物化视图 --> I{OrderReadModel<br/>Cosmos DB - Read Model}
        J(GetOrderFunc<br/>Azure Function - HTTP Trigger) -- 11. 查询 --> I
        A -- 12. 查询订单状态 (HTTP GET) --> J
    end

这个架构的核心组件包括:

  • OrderOrchestrator: 一个 HTTP 触发的 Azure Function,作为 Saga 编排器。它接收初始请求,管理整个订单流程的状态,并向命令总线发送指令。
  • OrderSagaState (Write Model): Cosmos DB 中的一个容器,用于持久化 Saga 的状态。它只服务于写入路径和状态管理,其结构专为流程状态转换而设计。
  • CommandBus / EventBus: 使用 Azure Service Bus 实现。命令总线(Queue)用于点对点发送指令,事件总线(Topic)用于发布/订阅模式的状态变更通知。
  • Participant Functions (e.g., DeductStockFunc): 多个由 Service Bus 触发的函数,每个函数负责执行一个具体的业务步骤。
  • OrderProjector: 一个由 OrderSagaState 容器的 Cosmos DB Change Feed 触发的函数。这是实现读写分离的关键。它监听写入模型的任何变化,并将其转换为一个或多个为查询优化的读取模型。
  • OrderReadModel (Read Model): Cosmos DB 中的另一个容器,存储为查询而优化的、非规范化的数据(物化视图)。
  • GetOrderFunc: 一个简单的 HTTP 触发函数,直接从 OrderReadModel 中快速读取数据,提供给客户端。

代码实现:构建可落地的骨架

我们将使用 TypeScript 进行开发,并利用 Rome 作为代码格式化和检查工具,以保证在包含多个函数应用的 monorepo 项目中的代码一致性。

1. 项目结构与工具链配置

一个典型的 monorepo 结构可能如下:

/
├── rome.json
├── packages/
│   ├── functions/
│   │   ├── order-orchestrator/
│   │   │   ├── function.json
│   │   │   └── index.ts
│   │   ├── deduct-stock/
│   │   │   └── ...
│   │   ├── order-projector/
│   │   │   └── ...
│   │   └── get-order/
│   │       └── ...
│   ├── shared/
│   │   ├── types.ts
│   │   └── cosmos-db-client.ts
├── package.json
└── host.json

rome.json 配置非常简单,用于统一整个项目代码风格:

{
  "$schema": "https://docs.rome.tools/schemas/12.1.3/schema.json",
  "organizeImports": {
    "enabled": true
  },
  "linter": {
    "enabled": true,
    "rules": {
      "recommended": true
    }
  },
  "formatter": {
    "enabled": true,
    "indentStyle": "space",
    "indentSize": 2
  }
}

在根 package.json 的 scripts 中添加 "rome": "rome format --write . && rome check ." 即可一键格式化和检查。

2. Saga 编排器 (order-orchestrator)

这是系统的核心。它既要处理初始的 HTTP 请求,也要处理后续来自事件总线的事件。
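编排器与后续各函数都从 shared/types 导入类型,但原文未给出其定义。下面按本文代码实际用到的字段,补一份最小的类型草图(items 的具体字段结构原文未给出,此处为假设):

```typescript
// packages/shared/types.ts —— 最小类型草图,字段名取自本文各函数的实际用法

export enum SagaStatus {
  PENDING = "PENDING",
  COMPLETED = "COMPLETED",
  FAILED = "FAILED",
}

export enum OrderSagaStep {
  DEDUCT_STOCK = "DEDUCT_STOCK",
  PROCESS_PAYMENT = "PROCESS_PAYMENT",
  CREATE_SHIPMENT = "CREATE_SHIPMENT",
}

export interface SagaHistoryEntry {
  step: string; // "START" 或某个 OrderSagaStep 的值
  status: "SUCCESS" | "FAILURE";
  timestamp: string;
  details?: unknown;
}

export interface OrderSagaState {
  id: string; // Cosmos DB 文档 id,与 orderId 相同
  orderId: string; // 同时作为分区键
  userId: string;
  items: { sku: string; price: number; quantity: number }[]; // 结构为假设
  status: SagaStatus;
  currentStep: OrderSagaStep | null;
  history: SagaHistoryEntry[];
  failureReason: string | null;
}

export interface OrderReadModel {
  id: string;
  orderId: string;
  userId: string;
  status: SagaStatus;
  itemCount: number;
  totalPrice: number;
  lastUpdated: string;
  failureReason: string | null;
}
```

注意写入模型与读取模型是两个独立的接口:读取模型不保留 history 与 items 明细,只存查询所需的聚合字段。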

// packages/functions/order-orchestrator/index.ts
import { app, HttpRequest, HttpResponseInit, InvocationContext } from "@azure/functions";
import { ServiceBusClient } from "@azure/service-bus";
import { CosmosClient } from "@azure/cosmos";
import { OrderSagaState, SagaStatus, OrderSagaStep } from "../../shared/types";
import { v4 as uuidv4 } from "uuid";

// 从环境变量获取连接字符串,这是最佳实践
// (此处用非空断言,假定这些应用设置已在部署时配置;严格模式下 process.env 的值类型为 string | undefined)
const COSMOS_ENDPOINT = process.env.COSMOS_DB_ENDPOINT!;
const COSMOS_KEY = process.env.COSMOS_DB_KEY!;
const SERVICE_BUS_CONNECTION_STRING = process.env.SERVICE_BUS_CONNECTION_STRING!;

const cosmosClient = new CosmosClient({ endpoint: COSMOS_ENDPOINT, key: COSMOS_KEY });
const database = cosmosClient.database("OrdersDB");
const writeContainer = database.container("OrderSagaState");

const sbClient = new ServiceBusClient(SERVICE_BUS_CONNECTION_STRING);

// HTTP Trigger to start the saga
async function startOrderSaga(request: HttpRequest, context: InvocationContext): Promise<HttpResponseInit> {
    context.log("HTTP trigger function processed a request to start a new order saga.");
    const orderData = await request.json() as { userId: string, items: any[] };

    if (!orderData || !orderData.userId) {
        return { status: 400, body: "Invalid order data." };
    }

    const orderId = uuidv4();
    const initialState: OrderSagaState = {
        id: orderId,
        orderId: orderId,
        userId: orderData.userId,
        items: orderData.items,
        status: SagaStatus.PENDING,
        currentStep: OrderSagaStep.DEDUCT_STOCK,
        history: [{ step: "START", status: "SUCCESS", timestamp: new Date().toISOString() }],
        failureReason: null,
    };

    try {
        await writeContainer.items.create(initialState);
        context.log(`Created initial saga state for orderId: ${orderId}`);

        // Send the first command
        const commandSender = sbClient.createSender("deduct-stock-command");
        await commandSender.sendMessages({
            body: { orderId: orderId, items: orderData.items },
            correlationId: orderId,
        });
        await commandSender.close();

        return { status: 202, body: `Order process started with ID: ${orderId}` };
    } catch (error) {
        context.error("Failed to start saga", error);
        return { status: 500, body: "Internal server error." };
    }
}


// Service Bus Trigger to handle events from participant functions
async function advanceOrderSaga(message: any, context: InvocationContext): Promise<void> {
    const { orderId, step, status, details } = message;
    context.log(`Received event for orderId: ${orderId}, step: ${step}, status: ${status}`);

    const { resource: sagaState } = await writeContainer.item(orderId, orderId).read<OrderSagaState>();

    if (!sagaState || sagaState.status === SagaStatus.COMPLETED || sagaState.status === SagaStatus.FAILED) {
        context.log(`Saga for order ${orderId} is already completed or failed. Ignoring event.`);
        return;
    }

    // Update history
    sagaState.history.push({ step, status, timestamp: new Date().toISOString(), details });

    if (status === "SUCCESS") {
        await handleSuccess(sagaState, context);
    } else {
        await handleFailure(sagaState, details, context);
    }
}

async function handleSuccess(sagaState: OrderSagaState, context: InvocationContext) {
    let nextStep: OrderSagaStep | null = null;
    let nextCommandQueue: string | null = null;
    let commandPayload: object | null = null;

    // State machine logic
    switch (sagaState.currentStep) {
        case OrderSagaStep.DEDUCT_STOCK:
            nextStep = OrderSagaStep.PROCESS_PAYMENT;
            nextCommandQueue = "process-payment-command";
            commandPayload = { orderId: sagaState.orderId, amount: 100 }; // Amount should be calculated
            break;
        case OrderSagaStep.PROCESS_PAYMENT:
            nextStep = OrderSagaStep.CREATE_SHIPMENT;
            nextCommandQueue = "create-shipment-command";
            commandPayload = { orderId: sagaState.orderId, address: "..." };
            break;
        case OrderSagaStep.CREATE_SHIPMENT:
            sagaState.status = SagaStatus.COMPLETED;
            sagaState.currentStep = null;
            context.log(`Saga for order ${sagaState.orderId} completed successfully.`);
            break;
    }

    if (nextStep && nextCommandQueue && commandPayload) {
        sagaState.currentStep = nextStep;
        const commandSender = sbClient.createSender(nextCommandQueue);
        await commandSender.sendMessages({
            body: commandPayload,
            correlationId: sagaState.orderId,
        });
        await commandSender.close();
    }
    
    await writeContainer.item(sagaState.id, sagaState.orderId).replace(sagaState);
}

async function handleFailure(sagaState: OrderSagaState, reason: string, context: InvocationContext) {
    sagaState.status = SagaStatus.FAILED;
    sagaState.failureReason = reason;

    // Start compensation
    const lastSuccessfulStep = sagaState.history.filter(h => h.status === "SUCCESS").pop();
    if (!lastSuccessfulStep) {
        context.log(`No steps to compensate for order ${sagaState.orderId}.`);
        await writeContainer.item(sagaState.id, sagaState.orderId).replace(sagaState);
        return;
    }

    // A real implementation would have a reverse workflow
    context.log(`Initiating compensation for order ${sagaState.orderId} from step ${lastSuccessfulStep.step}`);
    
    // Example: send a 'restore-stock' command
    if (lastSuccessfulStep.step === OrderSagaStep.DEDUCT_STOCK) {
        const compensationSender = sbClient.createSender("restore-stock-command");
        await compensationSender.sendMessages({
            body: { orderId: sagaState.orderId, items: sagaState.items },
            correlationId: sagaState.orderId,
        });
        await compensationSender.close();
    }
    
    await writeContainer.item(sagaState.id, sagaState.orderId).replace(sagaState);
}


app.http("startOrderSaga", {
    methods: ["POST"],
    authLevel: "function",
    handler: startOrderSaga,
});

app.serviceBusTopic("advanceOrderSaga", {
    connection: "SERVICE_BUS_CONNECTION_STRING",
    topicName: "order-events",
    subscriptionName: "saga-orchestrator-subscription",
    handler: advanceOrderSaga,
});

这里的坑在于:编排器函数必须是幂等的。由于消息队列至少一次(At-Least-Once)的传递保证,同一个事件可能被多次处理。通过检查 sagaState.status,我们可以防止已经完成或失败的事务被重复处理。
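更进一步,仅凭状态检查防不住两个事件并发读取、修改、写回同一份 Saga 状态时的更新丢失。Cosmos DB 的每个文档都带有系统维护的 _etag,可以配合 replace 的 IfMatch 访问条件做乐观并发控制。下面是一个独立可运行的示意(为便于演示,用一个与 SDK 调用形状一致的最小接口代替真实的 Container;重试次数等参数为假设值):

```typescript
// 与 @azure/cosmos 的 container.item(id, pk).replace/read 调用形状一致的最小接口;
// 真实代码中直接传入 SDK 的 Container 实例即可。
interface ItemOps {
  replace(doc: any, options?: any): Promise<any>;
  read(): Promise<{ resource?: any }>;
}
interface ContainerLike {
  item(id: string, partitionKey: string): ItemOps;
}

// 带乐观并发控制的状态更新草图:
// 替换时携带 IfMatch 条件,仅当服务端 _etag 与本地一致时才生效;
// 若其他事件已先行更新,Cosmos DB 返回 412 (Precondition Failed),
// 此时重读最新版本并重试(真实实现需在重读后重新套用业务变更,此处从略)。
async function replaceWithEtag(
  container: ContainerLike,
  doc: { id: string; orderId: string; _etag?: string },
  maxRetries = 3,
): Promise<boolean> {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      await container.item(doc.id, doc.orderId).replace(doc, {
        accessCondition: { type: "IfMatch", condition: doc._etag ?? "" },
      });
      return true;
    } catch (error: any) {
      if (error.code !== 412) throw error; // 非并发冲突,直接抛出
      const { resource } = await container.item(doc.id, doc.orderId).read();
      doc = { ...doc, _etag: resource?._etag };
    }
  }
  return false; // 重试耗尽,交由上层(如 Service Bus 重投递)处理
}
```

把编排器里所有 replace 调用换成这种写法后,即使同一事件被并发投递两次,也只有一次状态变更会生效。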

3. 参与者函数 (deduct-stock)

这是一个简单的、职责单一的函数,它只负责处理扣减库存的命令,然后发布一个结果事件。

// packages/functions/deduct-stock/index.ts
import { app, InvocationContext } from "@azure/functions";
import { ServiceBusClient } from "@azure/service-bus";
import { OrderSagaStep } from "../../shared/types";

const SERVICE_BUS_CONNECTION_STRING = process.env.SERVICE_BUS_CONNECTION_STRING!; // 非空断言:假定应用设置已配置
const sbClient = new ServiceBusClient(SERVICE_BUS_CONNECTION_STRING);

async function deductStock(message: any, context: InvocationContext): Promise<void> {
    const { orderId, items } = message;
    context.log(`Processing deduct-stock command for orderId: ${orderId}`);

    const eventSender = sbClient.createSender("order-events");
    let event;

    try {
        // --- Actual business logic would be here ---
        // For example, call an inventory database or service
        const stockAvailable = true; // Simulate success
        // -------------------------------------------

        if (stockAvailable) {
            context.log(`Stock deducted successfully for orderId: ${orderId}`);
            event = {
                body: {
                    orderId,
                    step: OrderSagaStep.DEDUCT_STOCK,
                    status: "SUCCESS",
                    details: { itemsDeducted: items.length }
                },
                correlationId: orderId,
            };
        } else {
            throw new Error("Insufficient stock");
        }
    } catch (error: any) { // 显式 any:严格模式下 catch 变量默认为 unknown
        context.error(`Failed to deduct stock for orderId: ${orderId}`, error);
        event = {
            body: {
                orderId,
                step: OrderSagaStep.DEDUCT_STOCK,
                status: "FAILURE",
                details: error.message,
            },
            correlationId: orderId,
        };
    }
    
    await eventSender.sendMessages(event);
    await eventSender.close();
}

app.serviceBusQueue("deductStock", {
    connection: "SERVICE_BUS_CONNECTION_STRING",
    queueName: "deduct-stock-command",
    handler: deductStock,
});

4. 投影函数 (order-projector)

这是实现读写分离的核心。它利用了 Cosmos DB 的 Change Feed 特性,这是一个持久化的、按顺序排列的文档变更日志。

// packages/functions/order-projector/index.ts
import { app, InvocationContext } from "@azure/functions";
import { CosmosClient } from "@azure/cosmos";
import { OrderSagaState, OrderReadModel } from "../../shared/types";

const COSMOS_ENDPOINT = process.env.COSMOS_DB_ENDPOINT!; // 非空断言:假定应用设置已配置
const COSMOS_KEY = process.env.COSMOS_DB_KEY!;

const cosmosClient = new CosmosClient({ endpoint: COSMOS_ENDPOINT, key: COSMOS_KEY });
const database = cosmosClient.database("OrdersDB");
const readContainer = database.container("OrderReadModel");

async function projectOrderState(documents: OrderSagaState[], context: InvocationContext): Promise<void> {
    context.log(`Cosmos DB trigger function processed ${documents.length} documents.`);
    
    for (const doc of documents) {
        // 一个常见的错误是直接复制整个写入模型。
        // 读取模型应该只包含查询所需的最小字段,并且可能是非规范化的。
        const readModel: OrderReadModel = {
            id: doc.orderId,
            orderId: doc.orderId,
            userId: doc.userId,
            status: doc.status,
            itemCount: doc.items.length,
            // 假设我们需要一个总价,可以在此计算并存储,避免查询时再计算
            totalPrice: doc.items.reduce((acc, item) => acc + item.price, 0),
            lastUpdated: new Date().toISOString(),
            failureReason: doc.failureReason,
        };

        try {
            // Upsert operation: creates the item if it doesn't exist, or updates it if it does.
            // This makes the projector resilient to re-processing events from the Change Feed.
            await readContainer.items.upsert(readModel);
            context.log(`Upserted read model for orderId: ${doc.orderId}`);
        } catch (error) {
            // 在真实项目中,这里需要有重试和死信队列逻辑。
            // 如果投影失败,会导致读写数据不一致。
            context.error(`Failed to upsert read model for orderId: ${doc.orderId}`, error);
        }
    }
}

app.cosmosDB("projectOrderState", {
    connection: "COSMOS_DB_CONNECTION_STRING",
    databaseName: "OrdersDB",
    containerName: "OrderSagaState",
    createLeaseContainerIfNotExists: true,
    leaseContainerName: "leases-projector", // Lease container tracks the processor's progress
    handler: projectOrderState,
});

5. 查询函数 (get-order)

这个函数极其简单和高效。它只从为查询优化的读取模型中进行一次点查(Point Read),这是 Cosmos DB 中成本最低、速度最快的操作之一。

// packages/functions/get-order/index.ts
import { app, HttpRequest, HttpResponseInit, InvocationContext } from "@azure/functions";
import { CosmosClient } from "@azure/cosmos";
import { OrderReadModel } from "../../shared/types";

const COSMOS_ENDPOINT = process.env.COSMOS_DB_ENDPOINT!; // 非空断言:假定应用设置已配置
const COSMOS_KEY = process.env.COSMOS_DB_KEY!;

const cosmosClient = new CosmosClient({ endpoint: COSMOS_ENDPOINT, key: COSMOS_KEY });
const database = cosmosClient.database("OrdersDB");
const readContainer = database.container("OrderReadModel");

async function getOrder(request: HttpRequest, context: InvocationContext): Promise<HttpResponseInit> {
    const orderId = request.params.orderId;
    if (!orderId) {
        return { status: 400, body: "Please provide an orderId." };
    }
    
    try {
        // Point read using partition key and id for maximum performance.
        // Assuming orderId is also the partition key for the read model.
        const { resource: order } = await readContainer.item(orderId, orderId).read<OrderReadModel>();

        if (order) {
            return { status: 200, jsonBody: order };
        } else {
            return { status: 404, body: "Order not found." };
        }
    } catch (error: any) { // 显式 any:严格模式下 catch 变量默认为 unknown
        // Cosmos DB throws an error for 404s in some SDK versions/methods.
        if (error.code === 404) {
             return { status: 404, body: "Order not found." };
        }
        context.error(`Error fetching order ${orderId}`, error);
        return { status: 500, body: "Internal server error." };
    }
}

app.http("getOrder", {
    methods: ["GET"],
    authLevel: "function",
    route: "orders/{orderId}",
    handler: getOrder,
});

架构的局限性与未来展望

这套架构解决了无服务器环境下的分布式事务和读性能问题,但并非银弹。它的主要局限性在于最终一致性。从 OrderSagaState 的变更到 OrderReadModel 的更新之间存在延迟,通常是秒级。客户端在创建订单后立即查询,可能会看到旧的状态,甚至404。业务上必须能够容忍这种延迟。一种缓解策略是在发起订单的 startOrderSaga 响应中,返回一个初始状态的快照,或者前端采用轮询或 WebSocket 等方式等待读取模型更新的通知。
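上面提到的前端轮询策略,可以封装为一个带次数上限的简单重试。下面是一个示意草图(接口路径沿用 get-order 的 orders/{orderId} 路由;轮询间隔与次数为假设值,fetchFn 参数化以便脱离浏览器环境演示):

```typescript
// 轮询读取端点,直到订单进入终态 (COMPLETED/FAILED) 或超过尝试次数。
// 404 视为"投影尚未追上写入",继续等待而不是报错。
async function pollOrderStatus(
  orderId: string,
  fetchFn: (url: string) => Promise<{ status: number; body?: { status: string } }>,
  intervalMs = 1000,
  maxAttempts = 10,
): Promise<{ status: string } | null> {
  for (let i = 0; i < maxAttempts; i++) {
    const res = await fetchFn(`/api/orders/${orderId}`);
    if (res.status === 200 && res.body) {
      if (res.body.status === "COMPLETED" || res.body.status === "FAILED") {
        return res.body; // 终态,停止轮询
      }
    }
    await new Promise((r) => setTimeout(r, intervalMs));
  }
  return null; // 超时:读取模型仍未收敛,交由 UI 提示用户稍后刷新
}
```

对延迟更敏感的场景,可以把轮询换成 SignalR/WebSocket 推送,由 OrderProjector 在更新读取模型后发出通知。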

其次,系统的复杂性显著增加。调试一个跨越多个函数和消息队列的分布式流程比调试单体应用要困难得多。必须建立完善的可观测性体系,包括使用关联ID(Correlation ID)贯穿整个调用链的结构化日志、分布式追踪(如 Azure Monitor Application Insights)以及业务状态的健康度量指标。
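以关联 ID 贯穿调用链的结构化日志,可以收敛为一个小工具函数:输出单行 JSON 交给 context.log,便于在 Application Insights 中按 correlationId 聚合整条链路。字段命名为示意:

```typescript
// 结构化日志条目:每条日志都携带 correlationId(即本文中的 orderId),
// 使同一事务在编排器、参与者函数、投影函数之间的日志可以串联检索。
interface LogEntry {
  timestamp: string;
  correlationId: string;
  component: string; // 产生日志的函数名,如 "OrderOrchestrator"
  message: string;
  data?: Record<string, unknown>;
}

function structuredLog(
  correlationId: string,
  component: string,
  message: string,
  data?: Record<string, unknown>,
): string {
  const entry: LogEntry = {
    timestamp: new Date().toISOString(),
    correlationId,
    component,
    message,
    ...(data ? { data } : {}),
  };
  return JSON.stringify(entry); // 单行 JSON,便于日志系统按字段查询
}
```

在各函数中以 context.log(structuredLog(orderId, "DeductStockFunc", "stock deducted")) 的方式调用,即可在查询时用 correlationId 还原完整事务轨迹。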

未来的优化路径可以探索使用 Azure Durable Functions 来替代手写的编排器逻辑。Durable Functions 提供了更高级的抽象(如 Orchestrator Functions 和 Activity Functions)来管理状态和流程控制,能进一步简化编排器的代码。然而,理解底层的 Saga 和事件驱动原理,仍然是设计和排查 Durable Functions 应用问题的基础。对于需要跨云或与外部非 Azure 服务集成的场景,当前基于 Service Bus 和 Cosmos DB Change Feed 的手动编排模式提供了更高的灵活性。
