为 Fastify 构建由 Apollo 动态驱动的熔断器中间件


一个下游服务的网络抖动,直接导致我们核心交易链路的三个 Pod 全部雪崩。复盘时,原因清晰得令人沮丧:对该下游服务的调用,虽然包裹在 try-catch 中,却没有设置超时,更没有熔断机制。请求在 TCP 层面挂起,最终耗尽了 Fastify 服务器的事件循环,使其无法响应新的健康检查,然后被 Kubernetes 无情地重启。硬编码超时和熔断阈值是第一反应,但这治标不治本。下次抖动时,也许 2 秒的超时依然太长,或者失败率阈值设得太高。在紧急情况下,调整这些参数需要修改代码、CI/CD、重新部署,这个响应周期对于线上故障是致命的。

我们的痛点很明确:需要一套能够实时、动态调整服务韧性策略的机制,而不需要重新部署应用。

初步构想与技术选型

核心构想是创建一个 Fastify 中间件(或者说,一个 Plugin),它能实现熔断器逻辑,并且其所有关键参数——超时时间、失败阈值、重置周期——都由外部配置中心动态管理。当我们在配置中心修改参数并发布后,运行中的 Fastify 实例必须“热加载”这些新配置,并立刻应用到熔断器上。

技术栈选型:

  1. Web 框架: Fastify。选择它是因为其极致的性能和强大的插件化架构。我们需要一个轻量且可扩展的基座来承载我们的中间件,Fastify 的 onRequestpreHandler 钩子为实现请求拦截提供了完美的切入点。
  2. 配置中心: Apollo。Apollo 不仅仅是一个配置中心,它的核心优势在于配置的实时推送能力。客户端通过长轮询与服务端保持连接,一旦配置变更,客户端能近乎实时地收到通知。这正是实现“动态驱动”的关键。
  3. **熔断器实现: opossum**。这是一个功能强大、经过生产验证的 Node.js 熔断器库。它原生支持 async/await,提供了丰富的事件钩子(如 open, close, halfOpen),便于我们集成日志和监控。自己造轮子在时间和稳定性上都不划算,opossum 是一个务实的选择。

整体架构流程如下:

graph TD
    subgraph Apollo 配置中心
        A[熔断器配置: timeout, errorThresholdPercentage]
    end

    subgraph Fastify 应用实例
        B(Apollo Client) -- 长轮询 --> A
        B -- 监听到配置变更 --> C{配置管理器}
        C -- 更新配置 --> D[动态熔断器中间件]
        D -- 包裹 --> E[业务逻辑 Route]
        E -- 调用 --> F[下游服务]
    end

    G[用户请求] --> D

    style A fill:#f9f,stroke:#333,stroke-width:2px
    style B fill:#ccf,stroke:#333,stroke-width:2px

步骤化实现:从静态到动态

1. 项目基础结构搭建

我们先建立一个标准的 Fastify 项目。

# 初始化项目
mkdir fastify-dynamic-breaker
cd fastify-dynamic-breaker
npm init -y
npm install fastify pino-pretty opossum @apollo/ctrip-apollo

# 目录结构
# .
# ├── src
# │   ├── plugins
# │   │   └── dynamic-breaker.js  # 我们的核心中间件
# │   ├── routes
# │   │   └── external-service.js # 模拟调用下游服务的路由
# │   └── app.js                  # Fastify 应用入口
# └── package.json

2. 静态熔断器的初步集成

在进入动态化之前,先实现一个静态配置的熔断器中间件,以验证 opossum 和 Fastify 插件机制能正常协同工作。

src/plugins/dynamic-breaker.js 的第一个版本可能如下:

// src/plugins/dynamic-breaker.js

const fp = require('fastify-plugin');
const Opossum = require('opossum');

/**
 * 一个简单的静态熔断器插件
 * @param {import('fastify').FastifyInstance} fastify
 * @param {object} opts
 */
async function staticBreakerPlugin(fastify, opts) {
  // 硬编码的静态配置
  const breakerOptions = {
    timeout: 3000, // 3秒超时
    errorThresholdPercentage: 50, // 失败率达到50%时打开熔断器
    resetTimeout: 10000 // 10秒后进入半开状态
  };

  const breaker = new Opossum(async (request, reply) => {
    // 这里是需要被保护的业务逻辑
    // 在实际插件中,这里会是 next() 或者 route handler
    // 但为简化,我们暂时留空
  }, breakerOptions);

  // 附加到 fastify 实例,以便在路由中使用
  fastify.decorate('breaker', breaker);

  // 监听熔断器状态变化,用于日志和监控
  breaker.on('open', () => fastify.log.warn('Breaker is open.'));
  breaker.on('halfOpen', () => fastify.log.info('Breaker is halfOpen. Probing with next request.'));
  breaker.on('close', () => fastify.log.info('Breaker is closed.'));
}

module.exports = fp(staticBreakerPlugin, {
  name: 'static-breaker',
  fastify: '4.x'
});

这种方式的问题显而易见:breakerOptions 是写死的。

3. 引入 Apollo Client

现在,我们引入 Apollo 来管理配置。假设我们在 Apollo 上创建了一个名为 my-fastify-app 的应用,并在 default namespace 的 application 格式下配置了如下内容:

# my-fastify-app.default.application
breaker.timeout=2000
breaker.errorThresholdPercentage=50
breaker.resetTimeout=15000

我们需要一个模块来初始化 Apollo Client 并拉取这些配置。

// src/config/apollo-client.js
const { ApolloClient } = require('@apollo/ctrip-apollo');

const client = new ApolloClient({
  appId: 'my-fastify-app',
  configServerUrl: 'http://apollo-config-server-url:8080', // 替换为你的Apollo Config Service地址
  clusterName: 'default',
  namespaceNames: ['application'],
  logger: console // 在生产中替换为真实logger
});

async function getBreakerConfig() {
  try {
    const timeout = await client.getConfig('breaker.timeout', 'application');
    const errorThresholdPercentage = await client.getConfig('breaker.errorThresholdPercentage', 'application');
    const resetTimeout = await client.getConfig('breaker.resetTimeout', 'application');

    // 注意:Apollo返回的是字符串,需要类型转换
    return {
      timeout: parseInt(timeout.value, 10) || 3000,
      errorThresholdPercentage: parseInt(errorThresholdPercentage.value, 10) || 50,
      resetTimeout: parseInt(resetTimeout.value, 10) || 10000
    };
  } catch (error) {
    console.error('Failed to fetch initial config from Apollo', error);
    // 返回一个安全的默认值
    return {
      timeout: 3000,
      errorThresholdPercentage: 50,
      resetTimeout: 10000
    };
  }
}

module.exports = {
  apolloClient: client,
  getBreakerConfig
};

4. 核心:实现动态熔断器中间件

这是最关键的一步。我们需要将 Apollo 的动态更新能力与 opossum 实例结合起来。这里的挑战是:opossum 实例一旦创建,其配置就不能直接修改。天真的做法是每次配置变更时都创建一个新的 Opossum 实例,但这会导致状态丢失(例如,失败计数器会被重置)。

一个更健壮的方案是创建一个代理或包装器,它内部持有一个 Opossum 实例。当配置变更时,我们创建一个新的实例,并将所有后续请求导向这个新实例,同时优雅地处理旧实例。但在我们的场景里,一个更简单的策略是可行的,并且对业务影响更小:只更新那些 opossum 允许动态修改的属性。幸运的是,opossumtimeouterrorThresholdPercentageresetTimeout 等关键属性是可以在运行时直接修改的。

src/plugins/dynamic-breaker.js 的最终版本:

// src/plugins/dynamic-breaker.js

const fp = require('fastify-plugin');
const Opossum = require('opossum');
const { apolloClient, getBreakerConfig } = require('../config/apollo-client');

/**
 * @typedef {object} BreakerOptions
 * @property {number} timeout
 * @property {number} errorThresholdPercentage
 * @property {number} resetTimeout
 */

/**
 * 动态熔断器插件
 * @param {import('fastify').FastifyInstance} fastify
 */
async function dynamicBreakerPlugin(fastify, opts) {
  // 1. 从 Apollo 获取初始配置
  const initialConfig = await getBreakerConfig();
  fastify.log.info({ initialConfig }, 'Initialized breaker with config from Apollo');

  // 2. 创建一个通用的 action 函数,它会执行被包裹的路由处理器
  // 这个 action 必须是异步函数
  const action = async (handler, request, reply) => {
    return handler(request, reply);
  };
  
  // 3. 创建 Opossum 实例
  // 注意:这里的 action (第一个参数) 是一个占位符,实际的业务逻辑在路由处理器中。
  // 我们通过 Opossum 的 fire 方法传递真正的业务逻辑。
  const breaker = new Opossum(action, initialConfig);
  
  // 4. 监听 Apollo 配置变更
  apolloClient.onChange(changeEvent => {
    const changes = changeEvent.changes;
    let configUpdated = false;

    // 我们只关心与熔断器相关的配置项
    const newConfig = {
      timeout: changes.get('breaker.timeout'),
      errorThresholdPercentage: changes.get('breaker.errorThresholdPercentage'),
      resetTimeout: changes.get('breaker.resetTimeout'),
    };

    if (newConfig.timeout && newConfig.timeout.newValue) {
      const val = parseInt(newConfig.timeout.newValue, 10);
      breaker.options.timeout = val;
      configUpdated = true;
      fastify.log.warn(`Breaker config hot-reloaded: timeout changed from ${newConfig.timeout.oldValue} to ${val}`);
    }

    if (newConfig.errorThresholdPercentage && newConfig.errorThresholdPercentage.newValue) {
      const val = parseInt(newConfig.errorThresholdPercentage.newValue, 10);
      breaker.options.errorThresholdPercentage = val;
      configUpdated = true;
      fastify.log.warn(`Breaker config hot-reloaded: errorThresholdPercentage changed from ${newConfig.errorThresholdPercentage.oldValue} to ${val}`);
    }
    
    if (newConfig.resetTimeout && newConfig.resetTimeout.newValue) {
      const val = parseInt(newConfig.resetTimeout.newValue, 10);
      breaker.options.resetTimeout = val;
      configUpdated = true;
      fastify.log.warn(`Breaker config hot-reloaded: resetTimeout changed from ${newConfig.resetTimeout.oldValue} to ${val}`);
    }

    if (configUpdated) {
        fastify.log.info({ newOptions: breaker.options }, 'Breaker options updated successfully.');
    }
  });

  // 5. 监听熔断器状态,这是可观测性的关键
  breaker.on('open', () => fastify.log.error({ status: 'OPEN' }, 'Circuit breaker has opened. Downstream service is likely unavailable.'));
  breaker.on('halfOpen', () => fastify.log.warn({ status: 'HALF_OPEN' }, 'Circuit breaker is half-open. Attempting a probe request.'));
  breaker.on('close', () => fastify.log.info({ status: 'CLOSE' }, 'Circuit breaker has closed. Service has recovered.'));
  breaker.on('failure', (result, error) => fastify.log.warn({ err: error, result }, 'Protected call failed.'));
  
  // 6. 暴露一个 decorator,让路由可以方便地使用熔断器
  // 这提供了一个 clean API
  fastify.decorate('withBreaker', (handler) => {
    return async (request, reply) => {
      try {
        // 使用 breaker.fire() 来执行真正的业务逻辑 (handler)
        // 将 request 和 reply 作为参数传递给 action
        return await breaker.fire(handler, request, reply);
      } catch (err) {
        // Opossum 在熔断打开时会抛出 'EOPENBREAKER' 错误
        // 在超时或业务逻辑失败时会抛出原始错误
        if (err.code === 'EOPENBREAKER') {
          fastify.log.error('Request rejected because circuit breaker is open.');
          reply.code(503).send({ error: 'Service Unavailable', message: 'Downstream service is temporarily unavailable.' });
        } else {
          // 其他错误,可能是业务逻辑自身的错误或超时
          fastify.log.error({ err }, 'An error occurred within the circuit breaker execution.');
          reply.code(500).send({ error: 'Internal Server Error', message: err.message });
        }
        // return reply; // 返回 reply 以确保 Fastify 的请求生命周期正确结束
      }
    };
  });
}

module.exports = fp(dynamicBreakerPlugin, {
  name: 'dynamic-breaker',
  fastify: '4.x'
});

这个实现的精髓在于:

  • 我们只创建了一个 Opossum 实例,并在生命周期内复用它。
  • Apollo 的 onChange 回调直接修改这个实例的 options 属性。这种方式是线程安全的,因为 Node.js 的事件循环模型保证了在任一时刻只有一个回调在执行。
  • 我们提供了一个 fastify.decorate('withBreaker', ...) 装饰器,这使得在路由中应用熔断器变得极其简单和声明式。

5. 应用到业务路由

现在我们创建一个模拟的业务路由来使用这个动态熔断器。

// src/routes/external-service.js

// 模拟一个不稳定的下游服务
let requestCount = 0;
async function callFlakyService() {
  requestCount++;
  return new Promise((resolve, reject) => {
    // 模拟每 3 次请求中就有 2 次失败
    if (requestCount % 3 !== 1) {
      setTimeout(() => reject(new Error(`Flaky service failed on request #${requestCount}`)), 200);
    } else {
      // 模拟成功
      setTimeout(() => resolve({ status: 'ok', requestNum: requestCount }), 150);
    }
  });
}

/**
 * @param {import('fastify').FastifyInstance} fastify
 */
async function externalServiceRoutes(fastify, opts) {
  const handler = async (request, reply) => {
    fastify.log.info('Calling flaky service...');
    const result = await callFlakyService();
    return { data: result };
  };

  fastify.get('/external', {
    // 使用我们的装饰器来包裹路由处理器
    handler: fastify.withBreaker(handler)
  });

  // 提供一个端点来查看当前熔断器的状态和配置
  fastify.get('/breaker-status', (request, reply) => {
    // fastify.breaker 是不可访问的了,因为我们没有直接 decorate 它
    // 更好的方式是在插件内部暴露状态
    // 为简单起见,我们可以在插件中添加一个状态获取函数
    // 这里暂时省略,但生产环境必须要有
    reply.send({ message: "Status endpoint needs implementation" });
  });
}

module.exports = externalServiceRoutes;

6. 组装并启动应用

最后,在 app.js 中把所有部分串联起来。

// src/app.js

const fastify = require('fastify');
const dynamicBreakerPlugin = require('./plugins/dynamic-breaker');
const externalServiceRoutes = require('./routes/external-service');

async function buildApp() {
  const app = fastify({
    logger: {
      transport: {
        target: 'pino-pretty'
      }
    }
  });

  // 注册我们的核心插件
  await app.register(dynamicBreakerPlugin);
  
  // 注册业务路由
  await app.register(externalServiceRoutes, { prefix: '/api' });

  app.get('/', (req, reply) => {
    reply.send({ health: 'ok' });
  });

  return app;
}

async function start() {
  try {
    const app = await buildApp();
    await app.listen({ port: 3000, host: '0.0.0.0' });
    app.log.info('Server started successfully.');
  } catch (err) {
    console.error(err);
    process.exit(1);
  }
}

start();

最终成果与验证

现在,启动应用 node src/app.js

  1. 触发熔断: 连续多次请求 http://localhost:3000/api/external

    curl http://localhost:3000/api/external # 第一次成功
    curl http://localhost:3000/api/external # 第二次失败
    curl http://localhost:3000/api/external # 第三次失败

    当失败率超过 errorThresholdPercentage (默认50%) 时,你会看到日志中打印出 Circuit breaker has opened。此时再访问该接口,会立即收到 503 Service Unavailable 响应,且日志会显示 Request rejected because circuit breaker is open。这证明熔断器已打开,保护了我们的应用。

  2. 动态调整配置:

    • 熔断器打开后,默认需要 resetTimeout (15000ms) 才会进入 HALF_OPEN 状态。
    • 现在,登录 Apollo 管理界面,将 breaker.resetTimeout 的值从 15000 修改为 5000,然后发布。
    • 几乎在发布的同时,Fastify 应用的控制台会打印出日志:Breaker config hot-reloaded: resetTimeout changed from 15000 to 5000
    • 等待 5 秒(而不是原来的 15 秒)后,再次请求 api/external。你会看到日志打印 Circuit breaker is half-open,熔断器会尝试发送一次真实请求。如果这次请求成功,熔断器会关闭。

这个简单的验证流程证明了我们已经成功构建了一个由 Apollo 动态驱动、可实时调整策略的熔断器中间件。

局限性与未来迭代方向

尽管此方案解决了最初的痛点,但在生产环境中仍有几个值得考量的局限性和优化点:

  1. 熔断器粒度: 当前实现是全局单例的。所有使用 withBreaker 的路由共享同一个熔断器实例。在真实场景中,我们可能需要更细的粒度,例如为每个下游服务或每条路由创建独立的熔断器实例。这可以通过改造插件,使其接受一个 keyname 参数,并维护一个熔断器实例池来实现。Apollo 的配置也可以相应地按服务命名,如 breaker.downstream-A.timeout

  2. 状态可见性: 我们缺乏一个简单的方式从外部查询熔断器的当前状态(打开、关闭、失败率等)。一个好的实践是插件可以提供一个内部接口或注册一个专门的 status 路由,用于暴露这些 metrics,以便集成到 Prometheus 等监控系统中。

  3. 配置更新的原子性: 当前代码逐个更新配置项。如果一次发布同时修改了 timeouterrorThresholdPercentage,理论上存在一个极小的时间窗口,熔断器的配置处于一个中间状态。对于熔断器这种场景,影响不大。但对于更敏感的配置,可能需要设计一种机制来批量、原子地应用一组配置变更。

  4. 与服务网格的对比: 像 Istio 这样的服务网格在基础设施层提供了强大的熔断、重试和超时控制,无需应用代码介入。我们的应用层熔断器与之相比,优势在于能够获取更丰富的应用上下文(例如,可以根据特定的用户、租户或请求体内容来决定是否执行熔断逻辑),但缺点是与业务代码存在耦合,且需要为不同技术栈的应用单独实现。技术选型时需要权衡这种控制力与透明性之间的利弊。


  目录