构建基于 Fastify 与 ASP.NET Core 的异构 MLOps 推理服务


我们的技术栈从来不是单一的。模型训练团队主力使用 Python 和 MLflow,而核心业务后端则由一个稳定的 ASP.NET Core 集群承载。最近,一个新的需求是将模型推理能力暴露为高性能、低延迟的 API,前端和移动端团队则更偏爱 Node.js 生态的轻量级 BFF (Backend for Frontend)。强行统一技术栈会扼杀团队效率,而各自为政又会导致 MLOps 流程割裂,模型上线周期变得不可控。

痛点很明确:我们需要一个方案,能将 MLflow 作为模型治理的单一事实来源,允许 .NET 服务安全地管理和注册模型,同时由一个高性能的 Node.js 服务作为流量入口,动态加载并提供推理服务。整个体系必须是云原生的,遵循 OCI 标准,并且可以轻松部署在 DigitalOcean 这样的平台上。

初步构想是一个双服务架构:

  1. Model Publisher Service (.NET Core): 一个内部服务,负责与 MLflow 通信。它的职责不是训练,而是将已经训练好、验证过的模型文件(比如 ONNX 格式)及其元数据注册到 MLflow 的 Model Registry 中,并负责更新模型的阶段(Staging, Production)。这是一个典型的控制平面组件。
  2. Inference Gateway (Fastify): 一个公开的、高性能的 API 网关。它不直接与训练过程耦合,而是定期轮询 MLflow Model Registry,拉取标记为 “Production” 的最新模型版本。模型工件被下载到本地并加载到内存中,然后通过一个 /predict 接口提供服务。这是一个数据平面组件。

这个架构的核心是解耦。模型生命周期管理和在线推理服务被彻底分开,它们之间唯一的契约就是 MLflow。

graph TD
    subgraph "控制平面 (Internal)"
        A[ASP.NET Core Publisher] -- "1. Register Model & Set Stage" --> B(MLflow Server);
        B -- "Stores Metadata" --> C[PostgreSQL];
        B -- "Stores Artifact" --> D[Artifact Store e.g., DO Spaces];
    end

    subgraph "数据平面 (Public Facing)"
        E[User Request] --> F{Fastify Inference Gateway};
        F -- "3. Serve Prediction" --> E;
        F -- "Loads Model into Memory" --> G[Loaded ONNX Model];
        F -- "2. Poll for 'Production' Model" --> B;
    end

    style F fill:#f9f,stroke:#333,stroke-width:2px
    style A fill:#ccf,stroke:#333,stroke-width:2px

第一部分: ASP.NET Core 模型发布服务

选择 ASP.NET Core 是因为它在处理强类型业务逻辑、与企业级系统集成方面的成熟度。这个服务的功能很简单:提供一个内部 API,接收模型文件,然后将其推送到 MLflow。

在真实项目中,这个服务可能会订阅一个消息队列,当模型训练流水线完成后自动触发注册流程。为了聚焦核心,我们用一个简单的 Minimal API 来实现。

首先,我们需要一个能与 MLflow API 交互的客户端。虽然没有官方的 .NET MLflow 客户端,但其 REST API 是开放的,我们可以自己封装一个。这里我们只实现创建实验、运行和注册模型的必要部分。

Program.cs - Minimal API 设置:

// Program.cs
using Microsoft.AspNetCore.Mvc;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
using System.Text.Json.Nodes;

var builder = WebApplication.CreateBuilder(args);

// 从配置中读取 MLflow 服务地址
var mlflowTrackingUri = builder.Configuration["MLFLOW_TRACKING_URI"] ?? 
    throw new InvalidOperationException("MLFLOW_TRACKING_URI is not configured.");

builder.Services.AddHttpClient("MLflowClient", client =>
{
    client.BaseAddress = new Uri(mlflowTrackingUri);
    client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
});

var app = builder.Build();

// 端点: 注册一个新模型
// 在实际场景中,模型文件可能来自 S3/Blob 存储的 URL 或 CI/CD 的产物
app.MapPost("/register-model", async (
    [FromServices] IHttpClientFactory httpClientFactory,
    [FromQuery] string modelName,
    [FromQuery] string modelFilePath) =>
{
    if (string.IsNullOrEmpty(modelName) || string.IsNullOrEmpty(modelFilePath) || !File.Exists(modelFilePath))
    {
        return Results.BadRequest("Invalid model name or file path.");
    }
    
    var client = httpClientFactory.CreateClient("MLflowClient");
    var experimentName = "DotNetModelPublishers";

    try
    {
        // 步骤 1: 确保实验存在,如果不存在则创建
        var experimentId = await GetOrCreateExperimentAsync(client, experimentName);

        // 步骤 2: 创建一个新的 Run
        var runId = await CreateRunAsync(client, experimentId);

        // 步骤 3: 将模型文件作为工件上传 (MLflow REST API v2.0)
        // 注意:这是一个简化的实现。在生产中,MLflow 的工件存储通常是 S3、Azure Blob 等。
        // 这里假设 MLflow server 和此服务可以访问同一个文件系统路径,或使用代理上传。
        // 一个更健壮的方法是使用 MLflow 客户端库或直接与云存储SDK交互。
        // 为了演示,我们仅记录路径。真实上传逻辑会更复杂。
        await LogArtifactAsync(client, runId, modelFilePath);

        // 步骤 4: 创建注册模型 (如果不存在)
        await CreateRegisteredModelAsync(client, modelName);

        // 步骤 5: 从 Run 创建一个新的模型版本
        var modelVersion = await CreateModelVersionAsync(client, modelName, runId, "model");

        return Results.Ok(new {
            Message = "Model registered successfully.",
            RunId = runId,
            ModelName = modelName,
            Version = modelVersion
        });
    }
    catch (HttpRequestException ex)
    {
        // 关键的错误处理:提供上下文信息
        return Results.Problem($"Failed to communicate with MLflow. Status: {ex.StatusCode}. Message: {ex.Message}");
    }
    catch (Exception ex)
    {
        // 捕获其他所有异常,防止敏感信息泄露
        app.Logger.LogError(ex, "An unexpected error occurred during model registration.");
        return Results.Problem("An internal server error occurred.");
    }
});

// 将模型版本设置为特定阶段, 例如 "Production"
app.MapPut("/set-model-stage", async (
    [FromServices] IHttpClientFactory httpClientFactory,
    [FromQuery] string modelName,
    [FromQuery] int version,
    [FromQuery] string stage) =>
{
    var client = httpClientFactory.CreateClient("MLflowClient");
    var payload = new
    {
        name = modelName,
        version = version.ToString(),
        stage,
        archive_existing_versions = true // 将此阶段以前的版本归档
    };
    var jsonPayload = JsonSerializer.Serialize(payload);
    var content = new StringContent(jsonPayload, Encoding.UTF8, "application/json");

    var response = await client.PostAsync("/api/2.0/mlflow/model-versions/transition-stage", content);
    if (!response.IsSuccessStatusCode)
    {
        var error = await response.Content.ReadAsStringAsync();
        return Results.Problem($"Failed to set model stage. MLflow response: {error}");
    }

    return Results.Ok($"Model {modelName} version {version} transitioned to {stage}.");
});

app.Run();


// --- MLflow REST API 交互的辅助方法 ---

async Task<string> GetOrCreateExperimentAsync(HttpClient client, string name)
{
    var response = await client.GetAsync($"/api/2.0/mlflow/experiments/get-by-name?experiment_name={name}");
    if (response.IsSuccessStatusCode)
    {
        var json = await response.Content.ReadFromJsonAsync<JsonObject>();
        return json?["experiment"]?["experiment_id"]?.GetValue<string>() ?? throw new InvalidOperationException("Experiment ID not found.");
    }

    // 不存在则创建
    var createPayload = new { name };
    var createResponse = await client.PostAsJsonAsync("/api/2.0/mlflow/experiments/create", createPayload);
    createResponse.EnsureSuccessStatusCode();
    var createJson = await createResponse.Content.ReadFromJsonAsync<JsonObject>();
    return createJson?["experiment_id"]?.GetValue<string>() ?? throw new InvalidOperationException("Failed to create experiment.");
}

async Task<string> CreateRunAsync(HttpClient client, string experimentId)
{
    var payload = new { experiment_id = experimentId };
    var response = await client.PostAsJsonAsync("/api/2.0/mlflow/runs/create", payload);
    response.EnsureSuccessStatusCode();
    var json = await response.Content.ReadFromJsonAsync<JsonObject>();
    return json?["run"]?["info"]?["run_id"]?.GetValue<string>() ?? throw new InvalidOperationException("Run ID not found.");
}

async Task LogArtifactAsync(HttpClient client, string runId, string localPath)
{
    // 这是一个简化版。MLflow API 的 log-artifact 需要 multipart/form-data
    // 并且通常不直接上传文件内容,而是由客户端库处理。
    // 在 REST API 层面,我们通常记录一个参数指向工件位置。
    var payload = new
    {
        run_id = runId,
        key = "model_path",
        value = localPath
    };
    var response = await client.PostAsJsonAsync("/api/2.0/mlflow/runs/log-parameter", payload);
    response.EnsureSuccessStatusCode();
    
    // 真正的工件上传会更复杂,需要与 MLflow 的工件存储后端(如 S3)集成。
    // 为了让 Fastify 服务能下载,我们必须确保工件被放置在一个可访问的位置。
    // 在 DigitalOcean 场景下,我们会上传到 DO Spaces,然后在这里记录 Spaces 的 URI。
}

async Task CreateRegisteredModelAsync(HttpClient client, string name)
{
    var getResponse = await client.GetAsync($"/api/2.0/mlflow/registered-models/get?name={name}");
    if (getResponse.IsSuccessStatusCode) return; // 已存在

    var payload = new { name };
    var createResponse = await client.PostAsJsonAsync("/api/2.0/mlflow/registered-models/create", payload);
    createResponse.EnsureSuccessStatusCode();
}

async Task<int> CreateModelVersionAsync(HttpClient client, string name, string runId, string sourceSubPath)
{
    // 这里的 source 应该是工件存储中的相对路径
    // 例如 s3://my-bucket/experiment_id/run_id/artifacts/model
    // 我们简化为 run-relative path
    var payload = new
    {
        name,
        source = $"runs:/{runId}/{sourceSubPath}",
        run_id = runId
    };
    var response = await client.PostAsJsonAsync("/api/2.0/mlflow/model-versions/create", payload);
    response.EnsureSuccessStatusCode();
    var json = await response.Content.ReadFromJsonAsync<JsonObject>();
    var versionString = json?["model_version"]?["version"]?.GetValue<string>() ?? throw new InvalidOperationException("Model version not found.");
    return int.Parse(versionString);
}

这个服务的 Dockerfile 采用多阶段构建,以保证生产镜像的最小化和安全性。

# Dockerfile for ASP.NET Core Publisher
FROM mcr.microsoft.com/dotnet/sdk:7.0 AS build
WORKDIR /src
COPY *.csproj .
RUN dotnet restore
COPY . .
RUN dotnet publish -c Release -o /app/publish --no-restore

FROM mcr.microsoft.com/dotnet/aspnet:7.0 AS final
WORKDIR /app
COPY --from=build /app/publish .
# 生产环境中,端口应该通过环境变量配置
ENV ASPNETCORE_URLS=http://+:5001
# 配置 MLflow Tracking Server 的地址
ENV MLFLOW_TRACKING_URI="http://mlflow:5000"
ENTRYPOINT ["dotnet", "PublisherService.dll"]

第二部分: Fastify 高性能推理网关

Fastify 被选中的理由是其极低的开销和极高的性能,非常适合作为直接面向用户的 API 网关。它的任务是:

  1. 启动时,或按固定周期,向 MLflow 查询名为 prod-model 且阶段为 Production 的最新模型版本。
  2. 获取该版本工件的 URI 并下载模型文件(例如,一个 model.onnx 文件)。
  3. 使用 onnxruntime-node 库将模型加载到内存中。
  4. 提供一个 /predict 端点,使用加载的模型进行推理。
  5. 所有步骤都必须有健壮的日志和错误处理。

gateway.js:

// gateway.js
'use strict'

const Fastify = require('fastify')
const axios = require('axios')
const { InferenceSession } = require('onnxruntime-node')
const fs = require('fs/promises')
const path = require('path')

const fastify = Fastify({
  logger: {
    level: 'info',
    transport: {
      target: 'pino-pretty'
    }
  }
})

const MLFLOW_TRACKING_URI = process.env.MLFLOW_TRACKING_URI || 'http://localhost:5000'
const MODEL_NAME = process.env.MODEL_NAME || 'prod-model'
const POLLING_INTERVAL_MS = process.env.POLLING_INTERVAL_MS || 60000 // 60秒轮询一次

// 全局变量,用于持有当前活动的推理会话
let currentModelSession = null
let currentModelVersion = null

const mlflowClient = axios.create({
  baseURL: MLFLOW_TRACKING_URI,
  timeout: 10000,
})

/**
 * 核心逻辑:从 MLflow 加载最新的生产模型
 */
async function loadLatestProductionModel() {
  fastify.log.info(`Checking for new production model version of '${MODEL_NAME}'...`)
  try {
    // 步骤 1: 获取 "Production" 阶段的最新模型版本
    const response = await mlflowClient.get('/api/2.0/mlflow/model-versions/get-latest-versions', {
      params: {
        name: MODEL_NAME,
        stages: 'Production'
      }
    })

    if (!response.data.model_versions || response.data.model_versions.length === 0) {
      fastify.log.warn(`No model version found in 'Production' stage for '${MODEL_NAME}'. Gateway is not ready.`)
      return
    }

    const latestVersion = response.data.model_versions[0]
    if (latestVersion.version === currentModelVersion) {
      fastify.log.info(`Model version ${currentModelVersion} is already the latest. No update needed.`)
      return
    }
    
    fastify.log.info(`New production model found. Version: ${latestVersion.version}. Status: ${latestVersion.status}.`)

    // 步骤 2: 下载模型工件
    // MLflow 的 source 字段格式通常是 `s3://...` 或 `runs:/...`
    // get-download-uri API 会将其解析为可下载的 HTTP URL。
    // 这是一个关键的生产实践,避免了服务直接访问底层存储。
    const downloadUriResponse = await mlflowClient.get('/api/2.0/mlflow/model-versions/get-download-uri', {
        params: {
            name: MODEL_NAME,
            version: latestVersion.version
        }
    })

    const artifactUri = downloadUriResponse.data.artifact_uri;
    fastify.log.info(`Artifact URI resolved to: ${artifactUri}`);

    // 这里假设 artifactUri 是一个可直接 GET 的 URL。
    // 在真实环境中,这个URL可能是指向 DigitalOcean Spaces 的预签名 URL。
    // 为了本地测试,我们可能需要模拟这个行为,或者直接从文件系统读取。
    // 我们假设 MLflow server 代理了工件的下载
    const modelDownloadUrl = `${MLFLOW_TRACKING_URI}/get-artifact?path=${encodeURIComponent(artifactUri)}&run_uuid=${latestVersion.run_id}`;
    
    const modelResponse = await axios.get(modelDownloadUrl, { responseType: 'arraybuffer' });
    
    const modelDir = path.join(__dirname, 'models')
    await fs.mkdir(modelDir, { recursive: true })
    const modelPath = path.join(modelDir, `model_v${latestVersion.version}.onnx`)
    await fs.writeFile(modelPath, modelResponse.data)

    // 步骤 3: 原子化地替换推理会话
    fastify.log.info(`Loading new model from ${modelPath} into ONNX Runtime...`)
    const newSession = await InferenceSession.create(modelPath)
    
    // 这里的切换必须是原子的,以避免在切换过程中处理请求
    const oldSession = currentModelSession
    currentModelSession = newSession
    currentModelVersion = latestVersion.version
    fastify.log.info(`Successfully switched to model version ${currentModelVersion}. Gateway is now serving the new model.`)
    
    // 清理旧模型文件
    if(oldSession) {
        // 在生产中,可以保留几个旧版本以备回滚
        const oldModelPath = path.join(modelDir, `model_v${oldSession.version}.onnx`);
        // await fs.unlink(oldModelPath).catch(err => fastify.log.error(err, 'Failed to clean up old model file.'));
    }

  } catch (error) {
    // 这里的坑在于,如果MLflow挂掉或网络抖动,不能让整个服务崩溃。
    // 必须记录错误并继续使用旧模型提供服务。
    fastify.log.error(error, 'Failed to update model from MLflow. Continuing with the existing model if available.')
  }
}

// 推理端点
fastify.post('/predict', async (request, reply) => {
  if (!currentModelSession) {
    // 服务尚未就绪,返回 503 Service Unavailable
    reply.code(503).send({ error: 'Model is not loaded yet. Please try again later.' })
    return
  }

  // 输入验证
  if (!request.body || !request.body.data) {
    reply.code(400).send({ error: 'Invalid input. Expecting a JSON object with a "data" field.' })
    return
  }

  try {
    // 假设模型输入名为 "input", 类型为 float32 tensor
    const inputTensor = new ort.Tensor('float32', request.body.data, [1, request.body.data.length])
    const feeds = { input: inputTensor } // 'input' 必须与ONNX模型的输入名匹配

    const results = await currentModelSession.run(feeds)
    const outputTensor = results.output // 'output' 必须与ONNX模型的输出名匹配

    reply.send({
      modelVersion: currentModelVersion,
      prediction: Array.from(outputTensor.data)
    })
  } catch (error) {
    fastify.log.error(error, 'Inference failed.')
    reply.code(500).send({ error: 'An error occurred during inference.' })
  }
})

// 启动服务器并开始轮询
const start = async () => {
  try {
    await fastify.listen({ port: 3000, host: '0.0.0.0' })
    // 首次立即加载模型
    await loadLatestProductionModel()
    // 之后定期轮询
    setInterval(loadLatestProductionModel, POLLING_INTERVAL_MS)
  } catch (err) {
    fastify.log.error(err)
    process.exit(1)
  }
}

start()

对应的 Dockerfile:

# Dockerfile for Fastify Gateway
FROM node:18-alpine AS base
WORKDIR /usr/src/app

# onnxruntime-node 可能需要一些系统依赖
RUN apk add --no-cache libc6-compat

COPY package*.json ./
RUN npm install --omit=dev --production

COPY . .

ENV NODE_ENV=production
ENV MLFLOW_TRACKING_URI="http://mlflow:5000"
ENV MODEL_NAME="prod-model"

EXPOSE 3000
CMD [ "node", "gateway.js" ]

第三部分: 本地编排与部署到 DigitalOcean

为了在本地模拟完整的流程,docker-compose.yml 是必不可少的。它将启动 MLflow 服务器(带有一个 Postgres 后端和本地文件作为工件存储),以及我们的两个自定义服务。

docker-compose.yml:

version: '3.8'

services:
  # MLflow Tracking Server
  mlflow:
    image: ghcr.io/mlflow/mlflow:v2.7.1
    ports:
      - "5000:5000"
    environment:
      - MLFLOW_BACKEND_STORE_URI=postgresql://mlflow_user:mlflow_pwd@db:5432/mlflow_db
      # 本地测试时,工件存储在容器内的 /mlruns 目录
      # 在生产中,这应该是一个 S3 兼容的 URI, 如 DigitalOcean Spaces
      - MLFLOW_ARTIFACTS_DESTINATION=/mlruns 
    command: >
      mlflow server
      --host 0.0.0.0
      --port 5000
      --backend-store-uri ${MLFLOW_BACKEND_STORE_URI}
      --artifacts-destination ${MLFLOW_ARTIFACTS_DESTINATION}
    depends_on:
      db:
        condition: service_healthy
    networks:
      - mlo-network

  # .NET Model Publisher Service
  publisher:
    build:
      context: ./publisher-service # .NET 项目的路径
      dockerfile: Dockerfile
    ports:
      - "5001:5001"
    environment:
      - MLFLOW_TRACKING_URI=http://mlflow:5000
    depends_on:
      - mlflow
    networks:
      - mlo-network

  # Fastify Inference Gateway
  gateway:
    build:
      context: ./gateway-service # Fastify 项目的路径
      dockerfile: Dockerfile
    ports:
      - "3000:3000"
    environment:
      - MLFLOW_TRACKING_URI=http://mlflow:5000
      - MODEL_NAME=prod-model
    depends_on:
      - mlflow
    networks:
      - mlo-network

  # PostgreSQL for MLflow Backend
  db:
    image: postgres:14
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_DB=mlflow_db
      - POSTGRES_USER=mlflow_user
      - POSTGRES_PASSWORD=mlflow_pwd
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U mlflow_user -d mlflow_db"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - mlo-network

networks:
  mlo-network:
    driver: bridge

部署到 DigitalOcean 的路径很清晰:

  1. OCI 镜像: 将两个服务的 Docker 镜像构建并推送到 DigitalOcean Container Registry (DOCR)。
  2. MLflow 服务: 在生产中,MLflow Server 本身也应该容器化部署。它的后端数据库可以使用 DigitalOcean Managed PostgreSQL,工件存储使用 DigitalOcean Spaces,这只需要修改 MLflow 服务的启动环境变量即可。
  3. 应用部署:
    • 简单方案 (App Platform): 可以将 publishergateway 作为两个独立的组件部署到 DigitalOcean App Platform。App Platform 会处理 HTTPS、负载均衡和自动伸缩。
    • 灵活方案 (Kubernetes): 对于更复杂的场景,可以在 DigitalOcean Kubernetes (DOKS) 上部署。这需要为每个服务编写 Deployment 和 Service 的 YAML 文件,但提供了对网络策略、资源限制和滚动更新更精细的控制。

方案局限性与未来迭代路径

这套架构解决了异构技术栈下的 MLOps 流程统一问题,但并非没有缺点。

首先,模型加载机制是基于轮询的。对于需要模型近实时更新的场景,60秒的延迟可能过长。一个优化方向是利用 MLflow 的 Webhooks,当模型阶段发生变化时,主动通知 Inference Gateway 进行更新,从而实现事件驱动的模型加载。

其次,将模型加载到网关服务的内存中,只适用于中小型模型。对于数十 GB 的大模型,这种方式会耗尽网关实例的内存。届时,网关的角色需要退化为纯粹的请求路由,将推理请求转发给一个专门的、可水平扩展的推理服务集群(如 NVIDIA Triton Inference Server 或 KServe),而网关本身只负责从 MLflow 获取目标推理服务的地址。

最后,当前的方案没有内置 A/B 测试或金丝雀发布的能力。Gateway 可以被扩展,根据请求头或用户百分比,将流量路由到不同版本的模型(例如,同时加载 Production 版和 Staging 版),这是实现更复杂模型部署策略的关键一步。


  目录