利用编排式Saga保障Flutter应用驱动下主数据与搜索引擎的最终一致性


在一个典型的移动应用驱动的微服务架构中,我们很快就会遇到一个棘手的问题:如何维护跨多个服务的数据一致性。假设我们正在构建一个体验预订平台,用户通过Flutter应用预订活动。这个操作至少会触及两个服务:BookingService,负责将预订记录写入主数据库(如PostgreSQL);以及SearchService,负责更新Elasticsearch索引,以便其他用户能看到最新的可预订名额。

如果BookingService成功写入数据库,但在调用SearchService时网络发生抖动或SearchService暂时不可用,会发生什么?主数据和搜索数据立刻出现不一致。用户可能会在搜索结果中看到一个已经被预订的名额,导致糟糕的用户体验。传统的两阶段提交(2PC)在微服务架构中因其同步阻塞和对资源的高锁定性而基本不被采用。这正是Saga模式的用武之地。

我们最初的构想是在BookingService中加入重试逻辑,并在失败后记录日志,由人工或定时任务补偿。但这种方式非常脆弱。如果BookingService在写入数据库后、调用SearchService前发生崩溃,这次更新就永远丢失了。我们需要一个更可靠、可追踪的机制。

最终,我们决定采用编排式Saga(Orchestration-based Saga)。它引入了一个中心化的协调器——SagaOrchestrator,来管理整个分布式事务的流程。这种方式相比于编舞式Saga(Choreography-based Saga),其状态和失败逻辑都集中管理,对于调试和理解复杂流程更为直观。在真实项目中,清晰的故障排查路径往往比极致的解耦更有价值。

架构设计与流程

我们的目标是实现一个从Flutter客户端发起的预订流程,该流程能原子性地完成数据库写入和搜索引擎索引更新。

sequenceDiagram
    participant FlutterApp as Flutter客户端
    participant Gateway as API网关
    participant Orchestrator as Saga编排器
    participant BookingSvc as 预订服务 (Postgres)
    participant SearchSvc as 搜索服务 (Elasticsearch)

    FlutterApp->>+Gateway: POST /saga/bookings (发起预订)
    Gateway->>+Orchestrator: startBookingSaga(bookingDetails)
    Orchestrator->>Orchestrator: 创建Saga实例, state=PENDING
    Orchestrator->>+BookingSvc: POST /bookings (创建预订)
    BookingSvc->>-Orchestrator: 201 Created (bookingId)
    Orchestrator->>Orchestrator: Saga state=BOOKING_CREATED, 记录bookingId
    Orchestrator->>+SearchSvc: POST /indexes (更新索引)
    Note right of SearchSvc: 假设此处调用失败
    SearchSvc-->>-Orchestrator: 500 Internal Server Error
    Orchestrator->>Orchestrator: Saga state=COMPENSATING
    Orchestrator->>+BookingSvc: DELETE /bookings/{bookingId} (补偿操作)
    BookingSvc->>-Orchestrator: 204 No Content
    Orchestrator->>Orchestrator: Saga state=FAILED
    Orchestrator-->>-Gateway: 202 Accepted (sagaId, status=FAILED)
    Gateway-->>-FlutterApp: 返回Saga状态

这个流程图清晰地展示了失败路径。当SearchService调用失败时,编排器会启动补偿逻辑,调用BookingService的删除接口来回滚已经完成的操作,从而保证系统的最终一致性。

核心实现:Saga编排器

我们使用Node.js和TypeScript构建一个简单的内存Saga编排器。在生产环境中,Saga的状态必须持久化到数据库(如Redis或Postgres)中,以防编排器自身崩溃导致状态丢失。

saga.orchestrator.ts

import { randomUUID } from 'crypto';
import axios from 'axios';

// 在真实项目中,这个状态会存储在Redis或数据库中
const sagaStore: Map<string, SagaInstance> = new Map();

// 服务地址配置,从环境变量读取
const BOOKING_SERVICE_URL = process.env.BOOKING_SERVICE_URL || 'http://localhost:3001';
const SEARCH_SERVICE_URL = process.env.SEARCH_SERVICE_URL || 'http://localhost:3002';

enum SagaState {
  PENDING,
  CREATING_BOOKING,
  UPDATING_INDEX,
  COMPENSATING_BOOKING,
  SUCCEEDED,
  FAILED,
}

interface SagaContext {
  bookingDetails: any;
  bookingId?: string;
  error?: string;
}

interface SagaInstance {
  id: string;
  state: SagaState;
  context: SagaContext;
}

// 定义Saga的步骤
const sagaDefinition = {
  [SagaState.CREATING_BOOKING]: {
    async execute(context: SagaContext) {
      console.log('[SAGA] Executing: Create Booking');
      const response = await axios.post(`${BOOKING_SERVICE_URL}/bookings`, context.bookingDetails);
      return { bookingId: response.data.id };
    },
    onSuccess: SagaState.UPDATING_INDEX,
    onFailure: SagaState.FAILED, // 第一步失败,直接结束
  },
  [SagaState.UPDATING_INDEX]: {
    async execute(context: SagaContext) {
      console.log('[SAGA] Executing: Update Search Index');
      // 这里的 payload 应该更复杂,包含需要索引的数据
      await axios.post(`${SEARCH_SERVICE_URL}/indexes`, {
        bookingId: context.bookingId,
        ...context.bookingDetails,
      });
      return {};
    },
    onSuccess: SagaState.SUCCEEDED,
    onFailure: SagaState.COMPENSATING_BOOKING, // 失败时触发补偿
  },
  [SagaState.COMPENSATING_BOOKING]: {
    async execute(context: SagaContext) {
      console.log(`[SAGA] Compensating: Delete Booking ${context.bookingId}`);
      if (!context.bookingId) {
        throw new Error('Cannot compensate booking without bookingId');
      }
      await axios.delete(`${BOOKING_SERVICE_URL}/bookings/${context.bookingId}`);
      return {};
    },
    onSuccess: SagaState.FAILED, // 补偿成功后,Saga最终状态为失败
    onFailure: SagaState.FAILED, // 补偿失败需要告警和人工介入
  },
};

class SagaOrchestrator {
  public start(bookingDetails: any): SagaInstance {
    const sagaId = randomUUID();
    const instance: SagaInstance = {
      id: sagaId,
      state: SagaState.PENDING,
      context: { bookingDetails },
    };
    sagaStore.set(sagaId, instance);
    console.log(`[SAGA] Started Saga with ID: ${sagaId}`);
    this.transition(sagaId, SagaState.CREATING_BOOKING);
    return instance;
  }

  public getStatus(sagaId: string): SagaInstance | undefined {
    return sagaStore.get(sagaId);
  }

  private async transition(sagaId: string, nextState: SagaState) {
    const instance = sagaStore.get(sagaId);
    if (!instance) {
        console.error(`[SAGA] Saga instance not found: ${sagaId}`);
        return;
    }

    instance.state = nextState;
    sagaStore.set(sagaId, instance);
    console.log(`[SAGA] Transitioned ${sagaId} to state: ${SagaState[nextState]}`);

    const step = sagaDefinition[nextState];
    if (step) {
      try {
        const result = await step.execute(instance.context);
        // 更新上下文,为后续步骤提供数据
        instance.context = { ...instance.context, ...result };
        this.transition(sagaId, step.onSuccess);
      } catch (error: any) {
        console.error(`[SAGA] Step ${SagaState[nextState]} failed for ${sagaId}:`, error.message);
        instance.context.error = error.message;
        // 这里的坑在于:如果补偿操作本身也失败了,系统将处于一个不一致的“脏”状态。
        // 生产级系统需要为补偿操作设计重试和告警机制。
        this.transition(sagaId, step.onFailure);
      }
    }
  }
}

export const orchestrator = new SagaOrchestrator();

这份代码实现了一个简单的状态机。transition函数是核心,它驱动Saga从一个状态转移到下一个。每个步骤定义了执行体(execute)、成功转移状态(onSuccess)和失败转移状态(onFailure)。注意,UPDATING_INDEX步骤失败后会转移到COMPENSATING_BOOKING,这就是Saga的回滚逻辑。

参与者服务:保证幂等性

Saga编排器可能会因为网络问题重试对参与者服务的调用。因此,参与者服务的接口必须是幂等的。

booking.service.ts

// 使用 Express.js 模拟 BookingService
import express from 'express';
const app = express();
app.use(express.json());

// 模拟数据库
const bookingsDB = new Map<string, any>();

// POST /bookings
// 一个常见的错误是没有处理重复请求。
// 这里的实现是简化的,真实项目需要基于请求ID或业务唯一标识来确保幂等。
app.post('/bookings', (req, res) => {
  const bookingId = `bk_${Date.now()}`;
  console.log(`[BookingService] Creating booking ${bookingId}`);
  bookingsDB.set(bookingId, req.body);
  res.status(201).json({ id: bookingId, status: 'created' });
});

// DELETE /bookings/:id
// 删除操作天然具有幂等性,删除一个不存在的资源不应该报错。
app.delete('/bookings/:id', (req, res) => {
  const { id } = req.params;
  if (bookingsDB.has(id)) {
    console.log(`[BookingService] Deleting booking ${id}`);
    bookingsDB.delete(id);
  } else {
    // 即使资源不存在,也应该返回成功,这是幂等性的关键。
    console.log(`[BookingService] Booking ${id} not found, compensation considered successful.`);
  }
  res.status(204).send();
});

app.listen(3001, () => console.log('BookingService listening on port 3001'));

SearchService的实现类似,但为了测试失败场景,我们让它有一定概率失败。

search.service.ts

import express from 'express';
const app = express();
app.use(express.json());

// 模拟Elasticsearch索引
const searchIndex = new Map<string, any>();

app.post('/indexes', (req, res) => {
  // 模拟服务不稳定的情况
  if (Math.random() < 0.5) {
    console.error('[SearchService] Failed to update index due to a transient error.');
    return res.status(500).json({ error: 'Failed to connect to search cluster' });
  }
  
  const { bookingId } = req.body;
  console.log(`[SearchService] Indexing document for booking ${bookingId}`);
  searchIndex.set(bookingId, req.body);
  res.status(200).json({ status: 'indexed' });
});

app.listen(3002, () => console.log('SearchService listening on port 3002'));

Flutter客户端:处理异步事务状态

Saga本质上是异步的。客户端发起一个Saga后,不能同步等待结果,因为整个流程可能需要几秒钟甚至更长时间。客户端的角色是:

  1. 发起Saga,并获得一个唯一的sagaId
  2. 使用sagaId轮询编排器,获取Saga的最终状态。
  3. 根据状态更新UI,向用户提供明确的反馈。

我们使用Riverpod进行状态管理。

saga_service.dart

import 'dart:async';
import 'package:dio/dio.dart';

// 你的API网关或编排器地址
const String _baseUrl = 'http://10.0.2.2:3000';

enum SagaStatus { pending, succeeded, failed, unknown }

class SagaService {
  final Dio _dio = Dio();

  // 1. 发起Saga
  Future<String> startBookingSaga(Map<String, dynamic> bookingDetails) async {
    try {
      final response = await _dio.post('$_baseUrl/saga/bookings', data: bookingDetails);
      if (response.statusCode == 202 && response.data['sagaId'] != null) {
        return response.data['sagaId'];
      }
      throw Exception('Failed to start Saga: Invalid response');
    } catch (e) {
      // 这里的错误处理至关重要,如果Saga启动失败,需要立即通知用户。
      print('Error starting saga: $e');
      rethrow;
    }
  }

  // 2. 查询Saga状态
  Future<SagaStatus> getSagaStatus(String sagaId) async {
    try {
      final response = await _dio.get('$_baseUrl/saga/status/$sagaId');
      final statusString = response.data['status'];
      switch (statusString) {
        case 'SUCCEEDED':
          return SagaStatus.succeeded;
        case 'FAILED':
          return SagaStatus.failed;
        default:
          return SagaStatus.pending;
      }
    } catch (e) {
      print('Error getting saga status: $e');
      return SagaStatus.unknown;
    }
  }
}

booking_provider.dart

import 'dart:async';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'saga_service.dart'; // 假设SagaService在另一个文件

// 提供SagaService实例
final sagaServiceProvider = Provider((ref) => SagaService());

// 用于追踪特定Saga状态的Family Provider
final sagaStatusProvider = FutureProvider.family<SagaStatus, String>((ref, sagaId) async {
  final sagaService = ref.watch(sagaServiceProvider);
  
  // 使用轮询来检查状态。在生产应用中,WebSocket或SSE是更好的选择。
  while (true) {
    final status = await sagaService.getSagaStatus(sagaId);
    if (status == SagaStatus.succeeded || status == SagaStatus.failed) {
      return status;
    }
    // 等待2秒再查询,避免过于频繁的请求
    await Future.delayed(const Duration(seconds: 2));
  }
});

// 预订页面的状态
class BookingNotifier extends StateNotifier<AsyncValue<String?>> {
  BookingNotifier(this.ref) : super(const AsyncValue.data(null));

  final Ref ref;

  Future<void> createBooking(Map<String, dynamic> details) async {
    state = const AsyncValue.loading();
    try {
      final sagaService = ref.read(sagaServiceProvider);
      final sagaId = await sagaService.startBookingSaga(details);
      state = AsyncValue.data(sagaId);
    } catch (e, st) {
      state = AsyncValue.error(e, st);
    }
  }
}

final bookingNotifierProvider = StateNotifierProvider<BookingNotifier, AsyncValue<String?>>(
  (ref) => BookingNotifier(ref),
);

booking_screen.dart (UI部分)

import 'package:flutter/material.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'booking_provider.dart';
import 'saga_service.dart';

class BookingScreen extends ConsumerWidget {
  const BookingScreen({super.key});

  
  Widget build(BuildContext context, WidgetRef ref) {
    // 监听预订操作的发起状态
    ref.listen<AsyncValue<String?>>(bookingNotifierProvider, (_, state) {
      state.whenOrNull(
        error: (err, st) => ScaffoldMessenger.of(context).showSnackBar(
          SnackBar(content: Text('Failed to initiate booking: $err')),
        ),
      );
    });

    final bookingState = ref.watch(bookingNotifierProvider);
    final sagaId = bookingState.value;

    return Scaffold(
      appBar: AppBar(title: const Text('Saga Pattern in Flutter')),
      body: Center(
        child: Column(
          mainAxisAlignment: MainAxisAlignment.center,
          children: [
            if (sagaId == null)
              ElevatedButton(
                onPressed: bookingState.isLoading
                    ? null
                    : () {
                        // 模拟预订数据
                        final bookingDetails = {'userId': 'user123', 'activityId': 'act456'};
                        ref.read(bookingNotifierProvider.notifier).createBooking(bookingDetails);
                      },
                child: const Text('Create Booking'),
              )
            else
              // 一旦Saga启动,我们就监听它的最终状态
              Consumer(builder: (context, ref, _) {
                final sagaStatus = ref.watch(sagaStatusProvider(sagaId));
                return sagaStatus.when(
                  data: (status) {
                    if (status == SagaStatus.succeeded) {
                      return const Column(children: [Icon(Icons.check_circle, color: Colors.green, size: 48), Text('Booking Successful!')]);
                    }
                    if (status == SagaStatus.failed) {
                      return const Column(children: [Icon(Icons.error, color: Colors.red, size: 48), Text('Booking Failed. Please try again.')]);
                    }
                    // 这段逻辑不应该被触及,因为provider会持续运行直到成功或失败
                    return const Text('Unexpected Status');
                  },
                  loading: () => const Column(children: [CircularProgressIndicator(), SizedBox(height: 16), Text('Processing booking...')]),
                  error: (err, st) => Text('Error polling status: $err'),
                );
              }),
          ],
        ),
      ),
    );
  }
}

通过FutureProvider.family,我们可以为每个sagaId创建一个独立的、可缓存的状态流。UI组件订阅这个Provider,便能响应式地展示”处理中”、”成功”或”失败”的状态,即使用户离开再返回这个页面(只要sagaId被持久化),也能看到正确的最终结果。

方案局限与未来路径

当前这套实现虽然验证了核心思想,但在生产环境中仍存在不足。首先,内存中的Saga编排器是单点故障,且无法水平扩展。一个健壮的实现需要将Saga实例的状态持久化到高可用的数据库中,并确保状态转换的事务性。

其次,Flutter客户端使用轮询来获取Saga状态,这会产生不必要的网络流量并带来延迟。在真实项目中,应采用WebSocket或Server-Sent Events (SSE) 等服务端推送技术,由编排器在Saga终态时主动通知客户端,以获得更好的实时性和效率。

最后,补偿逻辑的可靠性是Saga模式的基石。如果补偿操作自身失败(例如,BookingService的数据库宕机),整个系统将陷入不一致状态。因此,生产级的Saga实现必须对补偿操作设计重试机制,并在多次失败后触发明确的告警,以便人工介入处理。对于一些关键业务,甚至可能需要引入更复杂的策略,如记录失败的补偿任务到死信队列中进行后续处理。


  目录