在 Flask 推荐系统中实现基于 Saga 的分布式事务与 Sentry 异常治理


一个推荐系统的反馈闭环,看似简单,实则暗流涌动。当一个用户点击或收藏了我们推荐的内容,后台需要触发一系列操作:更新用户画像服务、调整物品特征向量、记录交互日志用于模型重训练,甚至可能要调用一个外部的积分服务。

# 一个极其脆弱的实现
def process_feedback(user_id, item_id, interaction_type):
    # 第一步:更新用户画像
    user_profile_service.update_user_preferences(user_id, item_id)
    
    # 第二步:更新物品热度
    item_feature_service.increment_item_hot_score(item_id)
    
    # 第三步:触发模型训练任务
    # 如果这一步因为网络抖动或服务过载而失败...
    # 前两步的操作已经无法撤回,数据进入了不一致状态
    model_retraining_service.trigger_new_task(user_id, item_id)

    return {"status": "success"}

这段代码在真实项目中是灾难的开始。如果model_retraining_service调用失败,用户画像和物品热度已经被修改。数据不一致性就此产生,这种不一致会像毒药一样在系统中蔓延,导致模型效果劣化,推荐质量下降。

传统的数据库教学会告诉我们,用事务来保证原子性。在分布式环境下,这对应着两阶段提交协议(Two-Phase Commit, 2PC)。但 2PC 在微服务架构中几乎是一个被禁用的词。它要求所有参与者在事务期间锁定资源,整个过程是同步阻塞的。协调者的单点故障风险、网络分区导致的全局阻塞,都让它在追求高可用的互联网服务中显得格格不入。我们的推荐系统反馈处理是一个典型的长周期业务流程,跨越多个独立的服务,使用 2PC 会导致整个系统的吞吐量急剧下降,是不可接受的。

我们需要一种更务实的方案。这个方案必须能保证最终一致性,同时允许服务保持独立和高可用。Saga 模式是这个问题的答案。它将一个长事务拆分为多个本地事务,每个本地事务都有一个对应的补偿(Compensation)操作。如果任何一个本地事务失败,Saga 会反向调用已经成功执行的本地事务的补偿操作,从而回滚整个业务流程。

为了让这个流程更直观,我们可以把它想象成一个简化的 Kanban 看板。一个反馈处理任务从 “待处理” 开始,依次流经 “用户画像已更新”、”物品特征已更新”,最终到达 “模型任务已触发” 的完成状态。如果在哪一列卡住了,我们就得把它安全地移回初始状态。

构想与服务设计

我们将构建一个基于 Flask 的 Saga 编排器(Orchestrator)来管理这个流程。编排器负责按顺序调用各个服务,并记录流程状态。如果中途失败,它将负责执行补偿逻辑。

首先,定义我们需要交互的微服务接口。在真实项目中,这些会是独立的 RPC 或 RESTful 服务。为了聚焦核心逻辑,我们用 Python 类来模拟它们。关键在于,这些模拟服务必须能被控制,以便于我们测试成功和失败的场景。

# services.py

import random
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- 模拟可能失败的外部服务 ---

class UserProfileService:
    """模拟用户画像服务"""
    def update_preferences(self, saga_id: str, user_id: str, item_id: str):
        logging.info(f"[Saga:{saga_id}] [UserProfileService] 更新用户 {user_id} 对物品 {item_id} 的偏好...")
        # 模拟操作耗时
        # random.choice([True, False]) # 可用于模拟随机失败
        return True

    def compensate_update_preferences(self, saga_id: str, user_id: str, item_id: str):
        logging.warning(f"[Saga:{saga_id}] [UserProfileService] [补偿] 撤销用户 {user_id} 对物品 {item_id} 偏好的更新。")
        # 补偿逻辑通常不会失败,但必须设计为幂等
        return True

class ItemFeatureService:
    """模拟物品特征服务"""
    def increment_hot_score(self, saga_id: str, item_id: str, fail_this_step: bool = False):
        logging.info(f"[Saga:{saga_id}] [ItemFeatureService] 增加物品 {item_id} 的热度分...")
        if fail_this_step:
            logging.error(f"[Saga:{saga_id}] [ItemFeatureService] 操作失败!模拟服务故障。")
            raise ConnectionError("无法连接到物品特征服务")
        return True

    def compensate_increment_hot_score(self, saga_id: str, item_id: str):
        logging.warning(f"[Saga:{saga_id}] [ItemFeatureService] [补偿] 减少物品 {item_id} 的热度分。")
        return True

class ModelTrainingService:
    """模拟模型训练服务"""
    def trigger_new_task(self, saga_id: str, user_id: str, item_id: str):
        logging.info(f"[Saga:{saga_id}] [ModelTrainingService] 为用户 {user_id} 和物品 {item_id} 触发新的模型训练任务...")
        return True
    
    def compensate_trigger_new_task(self, saga_id: str, user_id: str, item_id: str):
        # 某些操作可能没有补偿逻辑,或者补偿逻辑是取消一个已经提交的任务
        logging.warning(f"[Saga:{saga_id}] [ModelTrainingService] [补偿] 尝试取消已触发的训练任务。")
        return True

# 服务实例,在真实项目中会通过服务发现获取
user_profile_service = UserProfileService()
item_feature_service = ItemFeatureService()
model_training_service = ModelTrainingService()

这里的关键点是每个服务的正向操作(update_preferences)都有一个对应的补偿操作(compensate_update_preferences)。补偿操作必须是幂等的,即多次执行和一次执行的效果相同。这是保证回滚过程健壮性的基础。

Saga 编排器的核心实现

编排器是 Saga 模式的核心。它是一个状态机,驱动整个业务流程,并在出现故障时执行恢复逻辑。

# saga.py

import uuid
import logging
from typing import List, Dict, Any, Callable

# 引入 Sentry SDK 用于更丰富的错误捕获
import sentry_sdk

from services import (
    user_profile_service,
    item_feature_service,
    model_training_service
)

class SagaOrchestrator:
    """
    一个简单的基于内存的 Saga 编排器
    在生产环境中,状态应持久化到 Redis 或数据库中
    """

    def __init__(self, saga_id: str, payload: Dict[str, Any]):
        self.saga_id = saga_id
        self.payload = payload
        self.steps = self._define_steps()
        # 记录已成功完成的步骤及其补偿操作
        self.completed_steps: List[Dict[str, Any]] = []

    def _define_steps(self) -> List[Dict[str, Callable]]:
        """
        定义 Saga 的所有步骤及其补偿操作
        这是一个核心部分,清晰地定义了业务流程
        """
        return [
            {
                "name": "update_user_profile",
                "action": lambda: user_profile_service.update_preferences(
                    self.saga_id, self.payload['user_id'], self.payload['item_id']
                ),
                "compensation": lambda: user_profile_service.compensate_update_preferences(
                    self.saga_id, self.payload['user_id'], self.payload['item_id']
                )
            },
            {
                "name": "update_item_features",
                "action": lambda: item_feature_service.increment_hot_score(
                    self.saga_id, self.payload['item_id'], self.payload.get('fail_at_this_step', False)
                ),
                "compensation": lambda: item_feature_service.compensate_increment_hot_score(
                    self.saga_id, self.payload['item_id']
                )
            },
            {
                "name": "trigger_model_training",
                "action": lambda: model_training_service.trigger_new_task(
                    self.saga_id, self.payload['user_id'], self.payload['item_id']
                ),
                "compensation": lambda: model_training_service.compensate_trigger_new_task(
                    self.saga_id, self.payload['user_id'], self.payload['item_id']
                )
            }
        ]

    def execute(self):
        """执行 Saga 流程"""
        try:
            for step in self.steps:
                logging.info(f"[Saga:{self.saga_id}] executing step: {step['name']}")
                step['action']()
                # 只有成功后才记录到已完成列表
                self.completed_steps.append(step)
            
            logging.info(f"[Saga:{self.saga_id}] execution completed successfully.")
            return True
        except Exception as e:
            # 这里的异常捕获是整个模式的关键
            logging.error(f"[Saga:{self.saga_id}] execution failed at step: {step['name']}. Reason: {e}")
            
            # --- Sentry 集成:捕获带有丰富上下文的异常 ---
            with sentry_sdk.push_scope() as scope:
                scope.set_tag("saga_id", self.saga_id)
                scope.set_tag("failed_step", step['name'])
                scope.set_context("saga_payload", self.payload)
                scope.set_context("completed_steps", [s['name'] for s in self.completed_steps])
                sentry_sdk.capture_exception(e)
            
            self._compensate()
            return False

    def _compensate(self):
        """执行补偿逻辑"""
        logging.warning(f"[Saga:{self.saga_id}] starting compensation process...")
        # 从后往前,对已完成的步骤执行补偿
        for step in reversed(self.completed_steps):
            try:
                logging.warning(f"[Saga:{self.saga_id}] compensating step: {step['name']}")
                step['compensation']()
            except Exception as e:
                # 补偿失败是一个严重问题,需要人工介入
                logging.critical(f"[Saga:{self.saga_id}] FATAL: Compensation failed for step: {step['name']}. Reason: {e}")
                # 再次上报 Sentry,但使用更严重的级别
                with sentry_sdk.push_scope() as scope:
                    scope.set_level("fatal")
                    scope.set_tag("saga_id", self.saga_id)
                    scope.set_tag("compensation_failed_step", step['name'])
                    sentry_sdk.capture_exception(e)
                # 此时可能需要将此 Saga 标记为“需要人工处理”
                # 在此中断补偿流程,防止造成更严重的问题
                break
        logging.warning(f"[Saga:{self.saga_id}] compensation process finished.")

这个 SagaOrchestrator 是我们系统的骨架。_define_steps 方法清晰地定义了业务流程,这是代码可读性和可维护性的关键。execute 方法是正向执行引擎,而 _compensate 则是逆向的回滚引擎。

请注意 Sentry 的集成方式。我们不只是简单地调用 capture_exception。在真实项目中,一个 Sentry 错误报告如果缺乏上下文,就几乎是无用的。我们通过 set_tagset_context 注入了 saga_id、失败的步骤名、业务载荷等关键信息。当运维或开发人员在 Sentry 后台看到这个错误时,他们能立刻知道是哪个分布式事务、在哪一步、因为什么数据而失败,极大地缩短了故障排查时间。

集成到 Flask 应用

现在,我们将这个 Saga 编排器集成到一个 Flask API 端点中。这个端点将接收前端的反馈请求,并启动 Saga 流程。

# app.py

import os
import uuid
import logging
from flask import Flask, request, jsonify

import sentry_sdk
from sentry_sdk.integrations.flask import FlaskIntegration

from saga import SagaOrchestrator

# --- Sentry 初始化 ---
# 在生产环境中,DSN 应该通过环境变量配置
SENTRY_DSN = os.environ.get("SENTRY_DSN")
if SENTRY_DSN:
    sentry_sdk.init(
        dsn=SENTRY_DSN,
        integrations=[FlaskIntegration()],
        # 设置采样率以控制性能影响
        traces_sample_rate=1.0,
        # 开启 PII 以便在上下文里看到 payload,请注意数据合规性
        send_default_pii=True
    )
    logging.info("Sentry SDK initialized.")
else:
    logging.warning("SENTRY_DSN not found. Sentry SDK not initialized.")


app = Flask(__name__)

@app.route('/v1/recommendation/feedback', methods=['POST'])
def process_recommendation_feedback():
    """
    接收推荐反馈并启动 Saga 流程
    """
    data = request.get_json()
    if not data or 'user_id' not in data or 'item_id' not in data:
        return jsonify({"error": "Missing user_id or item_id"}), 400

    # 为每个事务生成一个唯一的 ID
    saga_id = str(uuid.uuid4())
    
    # 业务载荷,这里可以包含用于模拟失败的标志
    payload = {
        "user_id": data["user_id"],
        "item_id": data["item_id"],
        "interaction_type": data.get("interaction_type", "click"),
        "fail_at_this_step": data.get("force_fail_item_service", False) # 用于测试
    }

    # 将 saga_id 添加到 Sentry scope,这样后续该请求内的所有事件都会关联此 ID
    sentry_sdk.set_tag("entry_saga_id", saga_id)

    logging.info(f"Received feedback, starting Saga {saga_id} with payload: {payload}")

    orchestrator = SagaOrchestrator(saga_id=saga_id, payload=payload)
    success = orchestrator.execute()

    if success:
        return jsonify({"message": "Feedback processed successfully", "saga_id": saga_id}), 200
    else:
        # 即使失败,也返回一个明确的响应
        # 客户端不需要知道内部发生了回滚,但服务端必须记录
        return jsonify({
            "error": "Failed to process feedback, state has been rolled back",
            "saga_id": saga_id
        }), 500

if __name__ == '__main__':
    # 为了方便测试,在 debug 模式下运行
    app.run(host='0.0.0.0', port=5001, debug=True)

测试与验证

现在是检验成果的时候。我们可以使用 curl 来模拟 API 请求。

1. 成功场景

curl -X POST http://127.0.0.1:5001/v1/recommendation/feedback \
-H "Content-Type: application/json" \
-d '{
    "user_id": "user-123",
    "item_id": "item-abc"
}'

服务器日志会清晰地显示每个步骤的执行过程:

INFO:werkzeug:127.0.0.1 - - [27/Oct/2023 10:35:00] "POST /v1/recommendation/feedback HTTP/1.1" 200 -
INFO:app:Received feedback, starting Saga a1b2c3d4... with payload: {'user_id': 'user-123', ...}
INFO:saga:[Saga:a1b2c3d4...] executing step: update_user_profile
INFO:services:[Saga:a1b2c3d4...] [UserProfileService] 更新用户 user-123 对物品 item-abc 的偏好...
INFO:saga:[Saga:a1b2c3d4...] executing step: update_item_features
INFO:services:[Saga:a1b2c3d4...] [ItemFeatureService] 增加物品 item-abc 的热度分...
INFO:saga:[Saga:a1b2c3d4...] executing step: trigger_model_training
INFO:services:[Saga:a1b2c3d4...] [ModelTrainingService] 为用户 user-123 和物品 item-abc 触发新的模型训练任务...
INFO:saga:[Saga:a1b2c3d4...] execution completed successfully.

2. 失败与补偿场景

现在,我们利用在 payload 中埋下的 force_fail_item_service 标志来模拟 ItemFeatureService 的失败。

curl -X POST http://127.0.0.1:5001/v1/recommendation/feedback \
-H "Content-Type: application/json" \
-d '{
    "user_id": "user-456",
    "item_id": "item-xyz",
    "force_fail_item_service": true
}'

这次,服务器日志会展示一个完全不同的故事:

INFO:werkzeug:127.0.0.1 - - [27/Oct/2023 10:40:00] "POST /v1/recommendation/feedback HTTP/1.1" 500 -
INFO:app:Received feedback, starting Saga e5f6g7h8... with payload: {'user_id': 'user-456', ..., 'force_fail_item_service': True}
INFO:saga:[Saga:e5f6g7h8...] executing step: update_user_profile
INFO:services:[Saga:e5f6g7h8...] [UserProfileService] 更新用户 user-456 对物品 item-xyz 的偏好...
INFO:saga:[Saga:e5f6g7h8...] executing step: update_item_features
INFO:services:[Saga:e5f6g7h8...] [ItemFeatureService] 增加物品 item-xyz 的热度分...
ERROR:services:[Saga:e5f6g7h8...] [ItemFeatureService] 操作失败!模拟服务故障。
ERROR:saga:[Saga:e5f6g7h8...] execution failed at step: update_item_features. Reason: 无法连接到物品特征服务
WARNING:saga:[Saga:e5f6g7h8...] starting compensation process...
WARNING:saga:[Saga:e5f6g7h8...] compensating step: update_user_profile
WARNING:services:[Saga:e5f6g7h8...] [UserProfileService] [补偿] 撤销用户 user-456 对物品 item-xyz 偏好的更新。
WARNING:saga:[Saga:e5f6g7h8...] compensation process finished.

日志清楚地显示,在 update_item_features 步骤失败后,补偿流程启动,并成功调用了 update_user_profile 的补偿操作。数据最终恢复到了一致状态。同时,在 Sentry 后台,我们会看到一个包含所有关键上下文的错误报告,这对于事后复盘和修复至关重要。

方案的局限性与未来迭代

我们实现的这个 Saga 编排器是基于内存的,这在生产环境中是不可接受的。如果编排器服务在执行 Saga 流程的中途崩溃,整个事务的状态就会丢失,可能导致某些补偿操作永远不会被执行。

一个生产级的 Saga 实现必须将 Saga 的状态(当前执行到哪一步,已完成的步骤有哪些)持久化到可靠的存储中,如 Redis、PostgreSQL 或专用的工作流引擎(如 Cadence 或 Temporal)。每次执行步骤前后都更新持久化状态,这样即使编排器重启,也能从中断的地方恢复,继续执行或进行补偿。

此外,Saga 模式牺牲了隔离性(Isolation)。在事务补偿完成之前,系统可能会处于一个中间状态(例如,用户画像更新了,但物品特征还没更新)。其他服务可能会观察到这种不一致的中间状态。这需要业务层面进行容忍设计,或者引入其他机制如“语义锁”来防止业务逻辑在不完整的事务数据上进行操作。

最后,补偿逻辑的设计本身就是一项挑战。不是所有操作都容易补偿。例如,如果一个步骤是发送邮件,补偿操作可能无法“撤回”邮件。这种情况下,补偿操作可能是发送一封致歉邮件。设计健壮且业务合理的补偿逻辑,是成功实施 Saga 模式的艺术所在。


  目录