我们的技术栈从来不是单一的。模型训练团队主力使用 Python 和 MLflow,而核心业务后端则由一个稳定的 ASP.NET Core 集群承载。最近,一个新的需求是将模型推理能力暴露为高性能、低延迟的 API,前端和移动端团队则更偏爱 Node.js 生态的轻量级 BFF (Backend for Frontend)。强行统一技术栈会扼杀团队效率,而各自为政又会导致 MLOps 流程割裂,模型上线周期变得不可控。
痛点很明确:我们需要一个方案,能将 MLflow 作为模型治理的单一事实来源,允许 .NET 服务安全地管理和注册模型,同时由一个高性能的 Node.js 服务作为流量入口,动态加载并提供推理服务。整个体系必须是云原生的,遵循 OCI 标准,并且可以轻松部署在 DigitalOcean 这样的平台上。
初步构想是一个双服务架构:
- Model Publisher Service (.NET Core): 一个内部服务,负责与 MLflow 通信。它的职责不是训练,而是将已经训练好、验证过的模型文件(比如 ONNX 格式)及其元数据注册到 MLflow 的 Model Registry 中,并负责更新模型的阶段(Staging, Production)。这是一个典型的控制平面组件。
- Inference Gateway (Fastify): 一个公开的、高性能的 API 网关。它不直接与训练过程耦合,而是定期轮询 MLflow Model Registry,拉取标记为 “Production” 的最新模型版本。模型工件被下载到本地并加载到内存中,然后通过一个
/predict接口提供服务。这是一个数据平面组件。
这个架构的核心是解耦。模型生命周期管理和在线推理服务被彻底分开,它们之间唯一的契约就是 MLflow。
graph TD
subgraph "控制平面 (Internal)"
A[ASP.NET Core Publisher] -- "1. Register Model & Set Stage" --> B(MLflow Server);
B -- "Stores Metadata" --> C[PostgreSQL];
B -- "Stores Artifact" --> D[Artifact Store e.g., DO Spaces];
end
subgraph "数据平面 (Public Facing)"
E[User Request] --> F{Fastify Inference Gateway};
F -- "3. Serve Prediction" --> E;
F -- "Loads Model into Memory" --> G[Loaded ONNX Model];
F -- "2. Poll for 'Production' Model" --> B;
end
style F fill:#f9f,stroke:#333,stroke-width:2px
style A fill:#ccf,stroke:#333,stroke-width:2px
第一部分: ASP.NET Core 模型发布服务
选择 ASP.NET Core 是因为它在处理强类型业务逻辑、与企业级系统集成方面的成熟度。这个服务的功能很简单:提供一个内部 API,接收模型文件,然后将其推送到 MLflow。
在真实项目中,这个服务可能会订阅一个消息队列,当模型训练流水线完成后自动触发注册流程。为了聚焦核心,我们用一个简单的 Minimal API 来实现。
首先,我们需要一个能与 MLflow API 交互的客户端。虽然没有官方的 .NET MLflow 客户端,但其 REST API 是开放的,我们可以自己封装一个。这里我们只实现创建实验、运行和注册模型的必要部分。
Program.cs - Minimal API 设置:
// Program.cs
using Microsoft.AspNetCore.Mvc;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
using System.Text.Json.Nodes;
var builder = WebApplication.CreateBuilder(args);
// 从配置中读取 MLflow 服务地址
var mlflowTrackingUri = builder.Configuration["MLFLOW_TRACKING_URI"] ??
throw new InvalidOperationException("MLFLOW_TRACKING_URI is not configured.");
builder.Services.AddHttpClient("MLflowClient", client =>
{
client.BaseAddress = new Uri(mlflowTrackingUri);
client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
});
var app = builder.Build();
// 端点: 注册一个新模型
// 在实际场景中,模型文件可能来自 S3/Blob 存储的 URL 或 CI/CD 的产物
app.MapPost("/register-model", async (
[FromServices] IHttpClientFactory httpClientFactory,
[FromQuery] string modelName,
[FromQuery] string modelFilePath) =>
{
if (string.IsNullOrEmpty(modelName) || string.IsNullOrEmpty(modelFilePath) || !File.Exists(modelFilePath))
{
return Results.BadRequest("Invalid model name or file path.");
}
var client = httpClientFactory.CreateClient("MLflowClient");
var experimentName = "DotNetModelPublishers";
try
{
// 步骤 1: 确保实验存在,如果不存在则创建
var experimentId = await GetOrCreateExperimentAsync(client, experimentName);
// 步骤 2: 创建一个新的 Run
var runId = await CreateRunAsync(client, experimentId);
// 步骤 3: 将模型文件作为工件上传 (MLflow REST API v2.0)
// 注意:这是一个简化的实现。在生产中,MLflow 的工件存储通常是 S3、Azure Blob 等。
// 这里假设 MLflow server 和此服务可以访问同一个文件系统路径,或使用代理上传。
// 一个更健壮的方法是使用 MLflow 客户端库或直接与云存储SDK交互。
// 为了演示,我们仅记录路径。真实上传逻辑会更复杂。
await LogArtifactAsync(client, runId, modelFilePath);
// 步骤 4: 创建注册模型 (如果不存在)
await CreateRegisteredModelAsync(client, modelName);
// 步骤 5: 从 Run 创建一个新的模型版本
var modelVersion = await CreateModelVersionAsync(client, modelName, runId, "model");
return Results.Ok(new {
Message = "Model registered successfully.",
RunId = runId,
ModelName = modelName,
Version = modelVersion
});
}
catch (HttpRequestException ex)
{
// 关键的错误处理:提供上下文信息
return Results.Problem($"Failed to communicate with MLflow. Status: {ex.StatusCode}. Message: {ex.Message}");
}
catch (Exception ex)
{
// 捕获其他所有异常,防止敏感信息泄露
app.Logger.LogError(ex, "An unexpected error occurred during model registration.");
return Results.Problem("An internal server error occurred.");
}
});
// 将模型版本设置为特定阶段, 例如 "Production"
app.MapPut("/set-model-stage", async (
[FromServices] IHttpClientFactory httpClientFactory,
[FromQuery] string modelName,
[FromQuery] int version,
[FromQuery] string stage) =>
{
var client = httpClientFactory.CreateClient("MLflowClient");
var payload = new
{
name = modelName,
version = version.ToString(),
stage,
archive_existing_versions = true // 将此阶段以前的版本归档
};
var jsonPayload = JsonSerializer.Serialize(payload);
var content = new StringContent(jsonPayload, Encoding.UTF8, "application/json");
var response = await client.PostAsync("/api/2.0/mlflow/model-versions/transition-stage", content);
if (!response.IsSuccessStatusCode)
{
var error = await response.Content.ReadAsStringAsync();
return Results.Problem($"Failed to set model stage. MLflow response: {error}");
}
return Results.Ok($"Model {modelName} version {version} transitioned to {stage}.");
});
app.Run();
// --- MLflow REST API 交互的辅助方法 ---
async Task<string> GetOrCreateExperimentAsync(HttpClient client, string name)
{
var response = await client.GetAsync($"/api/2.0/mlflow/experiments/get-by-name?experiment_name={name}");
if (response.IsSuccessStatusCode)
{
var json = await response.Content.ReadFromJsonAsync<JsonObject>();
return json?["experiment"]?["experiment_id"]?.GetValue<string>() ?? throw new InvalidOperationException("Experiment ID not found.");
}
// 不存在则创建
var createPayload = new { name };
var createResponse = await client.PostAsJsonAsync("/api/2.0/mlflow/experiments/create", createPayload);
createResponse.EnsureSuccessStatusCode();
var createJson = await createResponse.Content.ReadFromJsonAsync<JsonObject>();
return createJson?["experiment_id"]?.GetValue<string>() ?? throw new InvalidOperationException("Failed to create experiment.");
}
async Task<string> CreateRunAsync(HttpClient client, string experimentId)
{
var payload = new { experiment_id = experimentId };
var response = await client.PostAsJsonAsync("/api/2.0/mlflow/runs/create", payload);
response.EnsureSuccessStatusCode();
var json = await response.Content.ReadFromJsonAsync<JsonObject>();
return json?["run"]?["info"]?["run_id"]?.GetValue<string>() ?? throw new InvalidOperationException("Run ID not found.");
}
async Task LogArtifactAsync(HttpClient client, string runId, string localPath)
{
// 这是一个简化版。MLflow API 的 log-artifact 需要 multipart/form-data
// 并且通常不直接上传文件内容,而是由客户端库处理。
// 在 REST API 层面,我们通常记录一个参数指向工件位置。
var payload = new
{
run_id = runId,
key = "model_path",
value = localPath
};
var response = await client.PostAsJsonAsync("/api/2.0/mlflow/runs/log-parameter", payload);
response.EnsureSuccessStatusCode();
// 真正的工件上传会更复杂,需要与 MLflow 的工件存储后端(如 S3)集成。
// 为了让 Fastify 服务能下载,我们必须确保工件被放置在一个可访问的位置。
// 在 DigitalOcean 场景下,我们会上传到 DO Spaces,然后在这里记录 Spaces 的 URI。
}
async Task CreateRegisteredModelAsync(HttpClient client, string name)
{
var getResponse = await client.GetAsync($"/api/2.0/mlflow/registered-models/get?name={name}");
if (getResponse.IsSuccessStatusCode) return; // 已存在
var payload = new { name };
var createResponse = await client.PostAsJsonAsync("/api/2.0/mlflow/registered-models/create", payload);
createResponse.EnsureSuccessStatusCode();
}
async Task<int> CreateModelVersionAsync(HttpClient client, string name, string runId, string sourceSubPath)
{
// 这里的 source 应该是工件存储中的相对路径
// 例如 s3://my-bucket/experiment_id/run_id/artifacts/model
// 我们简化为 run-relative path
var payload = new
{
name,
source = $"runs:/{runId}/{sourceSubPath}",
run_id = runId
};
var response = await client.PostAsJsonAsync("/api/2.0/mlflow/model-versions/create", payload);
response.EnsureSuccessStatusCode();
var json = await response.Content.ReadFromJsonAsync<JsonObject>();
var versionString = json?["model_version"]?["version"]?.GetValue<string>() ?? throw new InvalidOperationException("Model version not found.");
return int.Parse(versionString);
}
这个服务的 Dockerfile 采用多阶段构建,以保证生产镜像的最小化和安全性。
# Dockerfile for ASP.NET Core Publisher
FROM mcr.microsoft.com/dotnet/sdk:7.0 AS build
WORKDIR /src
COPY *.csproj .
RUN dotnet restore
COPY . .
RUN dotnet publish -c Release -o /app/publish --no-restore
FROM mcr.microsoft.com/dotnet/aspnet:7.0 AS final
WORKDIR /app
COPY /app/publish .
# 生产环境中,端口应该通过环境变量配置
ENV ASPNETCORE_URLS=http://+:5001
# 配置 MLflow Tracking Server 的地址
ENV MLFLOW_TRACKING_URI="http://mlflow:5000"
ENTRYPOINT ["dotnet", "PublisherService.dll"]
第二部分: Fastify 高性能推理网关
Fastify 被选中的理由是其极低的开销和极高的性能,非常适合作为直接面向用户的 API 网关。它的任务是:
- 启动时,或按固定周期,向 MLflow 查询名为
prod-model且阶段为Production的最新模型版本。 - 获取该版本工件的 URI 并下载模型文件(例如,一个
model.onnx文件)。 - 使用
onnxruntime-node库将模型加载到内存中。 - 提供一个
/predict端点,使用加载的模型进行推理。 - 所有步骤都必须有健壮的日志和错误处理。
gateway.js:
// gateway.js
'use strict'
const Fastify = require('fastify')
const axios = require('axios')
const { InferenceSession } = require('onnxruntime-node')
const fs = require('fs/promises')
const path = require('path')
const fastify = Fastify({
logger: {
level: 'info',
transport: {
target: 'pino-pretty'
}
}
})
const MLFLOW_TRACKING_URI = process.env.MLFLOW_TRACKING_URI || 'http://localhost:5000'
const MODEL_NAME = process.env.MODEL_NAME || 'prod-model'
const POLLING_INTERVAL_MS = process.env.POLLING_INTERVAL_MS || 60000 // 60秒轮询一次
// 全局变量,用于持有当前活动的推理会话
let currentModelSession = null
let currentModelVersion = null
const mlflowClient = axios.create({
baseURL: MLFLOW_TRACKING_URI,
timeout: 10000,
})
/**
* 核心逻辑:从 MLflow 加载最新的生产模型
*/
async function loadLatestProductionModel() {
fastify.log.info(`Checking for new production model version of '${MODEL_NAME}'...`)
try {
// 步骤 1: 获取 "Production" 阶段的最新模型版本
const response = await mlflowClient.get('/api/2.0/mlflow/model-versions/get-latest-versions', {
params: {
name: MODEL_NAME,
stages: 'Production'
}
})
if (!response.data.model_versions || response.data.model_versions.length === 0) {
fastify.log.warn(`No model version found in 'Production' stage for '${MODEL_NAME}'. Gateway is not ready.`)
return
}
const latestVersion = response.data.model_versions[0]
if (latestVersion.version === currentModelVersion) {
fastify.log.info(`Model version ${currentModelVersion} is already the latest. No update needed.`)
return
}
fastify.log.info(`New production model found. Version: ${latestVersion.version}. Status: ${latestVersion.status}.`)
// 步骤 2: 下载模型工件
// MLflow 的 source 字段格式通常是 `s3://...` 或 `runs:/...`
// get-download-uri API 会将其解析为可下载的 HTTP URL。
// 这是一个关键的生产实践,避免了服务直接访问底层存储。
const downloadUriResponse = await mlflowClient.get('/api/2.0/mlflow/model-versions/get-download-uri', {
params: {
name: MODEL_NAME,
version: latestVersion.version
}
})
const artifactUri = downloadUriResponse.data.artifact_uri;
fastify.log.info(`Artifact URI resolved to: ${artifactUri}`);
// 这里假设 artifactUri 是一个可直接 GET 的 URL。
// 在真实环境中,这个URL可能是指向 DigitalOcean Spaces 的预签名 URL。
// 为了本地测试,我们可能需要模拟这个行为,或者直接从文件系统读取。
// 我们假设 MLflow server 代理了工件的下载
const modelDownloadUrl = `${MLFLOW_TRACKING_URI}/get-artifact?path=${encodeURIComponent(artifactUri)}&run_uuid=${latestVersion.run_id}`;
const modelResponse = await axios.get(modelDownloadUrl, { responseType: 'arraybuffer' });
const modelDir = path.join(__dirname, 'models')
await fs.mkdir(modelDir, { recursive: true })
const modelPath = path.join(modelDir, `model_v${latestVersion.version}.onnx`)
await fs.writeFile(modelPath, modelResponse.data)
// 步骤 3: 原子化地替换推理会话
fastify.log.info(`Loading new model from ${modelPath} into ONNX Runtime...`)
const newSession = await InferenceSession.create(modelPath)
// 这里的切换必须是原子的,以避免在切换过程中处理请求
const oldSession = currentModelSession
currentModelSession = newSession
currentModelVersion = latestVersion.version
fastify.log.info(`Successfully switched to model version ${currentModelVersion}. Gateway is now serving the new model.`)
// 清理旧模型文件
if(oldSession) {
// 在生产中,可以保留几个旧版本以备回滚
const oldModelPath = path.join(modelDir, `model_v${oldSession.version}.onnx`);
// await fs.unlink(oldModelPath).catch(err => fastify.log.error(err, 'Failed to clean up old model file.'));
}
} catch (error) {
// 这里的坑在于,如果MLflow挂掉或网络抖动,不能让整个服务崩溃。
// 必须记录错误并继续使用旧模型提供服务。
fastify.log.error(error, 'Failed to update model from MLflow. Continuing with the existing model if available.')
}
}
// 推理端点
fastify.post('/predict', async (request, reply) => {
if (!currentModelSession) {
// 服务尚未就绪,返回 503 Service Unavailable
reply.code(503).send({ error: 'Model is not loaded yet. Please try again later.' })
return
}
// 输入验证
if (!request.body || !request.body.data) {
reply.code(400).send({ error: 'Invalid input. Expecting a JSON object with a "data" field.' })
return
}
try {
// 假设模型输入名为 "input", 类型为 float32 tensor
const inputTensor = new ort.Tensor('float32', request.body.data, [1, request.body.data.length])
const feeds = { input: inputTensor } // 'input' 必须与ONNX模型的输入名匹配
const results = await currentModelSession.run(feeds)
const outputTensor = results.output // 'output' 必须与ONNX模型的输出名匹配
reply.send({
modelVersion: currentModelVersion,
prediction: Array.from(outputTensor.data)
})
} catch (error) {
fastify.log.error(error, 'Inference failed.')
reply.code(500).send({ error: 'An error occurred during inference.' })
}
})
// 启动服务器并开始轮询
const start = async () => {
try {
await fastify.listen({ port: 3000, host: '0.0.0.0' })
// 首次立即加载模型
await loadLatestProductionModel()
// 之后定期轮询
setInterval(loadLatestProductionModel, POLLING_INTERVAL_MS)
} catch (err) {
fastify.log.error(err)
process.exit(1)
}
}
start()
对应的 Dockerfile:
# Dockerfile for Fastify Gateway
FROM node:18-alpine AS base
WORKDIR /usr/src/app
# onnxruntime-node 可能需要一些系统依赖
RUN apk add --no-cache libc6-compat
COPY package*.json ./
RUN npm install --omit=dev --production
COPY . .
ENV NODE_ENV=production
ENV MLFLOW_TRACKING_URI="http://mlflow:5000"
ENV MODEL_NAME="prod-model"
EXPOSE 3000
CMD [ "node", "gateway.js" ]
第三部分: 本地编排与部署到 DigitalOcean
为了在本地模拟完整的流程,docker-compose.yml 是必不可少的。它将启动 MLflow 服务器(带有一个 Postgres 后端和本地文件作为工件存储),以及我们的两个自定义服务。
docker-compose.yml:
version: '3.8'
services:
# MLflow Tracking Server
mlflow:
image: ghcr.io/mlflow/mlflow:v2.7.1
ports:
- "5000:5000"
environment:
- MLFLOW_BACKEND_STORE_URI=postgresql://mlflow_user:mlflow_pwd@db:5432/mlflow_db
# 本地测试时,工件存储在容器内的 /mlruns 目录
# 在生产中,这应该是一个 S3 兼容的 URI, 如 DigitalOcean Spaces
- MLFLOW_ARTIFACTS_DESTINATION=/mlruns
command: >
mlflow server
--host 0.0.0.0
--port 5000
--backend-store-uri ${MLFLOW_BACKEND_STORE_URI}
--artifacts-destination ${MLFLOW_ARTIFACTS_DESTINATION}
depends_on:
db:
condition: service_healthy
networks:
- mlo-network
# .NET Model Publisher Service
publisher:
build:
context: ./publisher-service # .NET 项目的路径
dockerfile: Dockerfile
ports:
- "5001:5001"
environment:
- MLFLOW_TRACKING_URI=http://mlflow:5000
depends_on:
- mlflow
networks:
- mlo-network
# Fastify Inference Gateway
gateway:
build:
context: ./gateway-service # Fastify 项目的路径
dockerfile: Dockerfile
ports:
- "3000:3000"
environment:
- MLFLOW_TRACKING_URI=http://mlflow:5000
- MODEL_NAME=prod-model
depends_on:
- mlflow
networks:
- mlo-network
# PostgreSQL for MLflow Backend
db:
image: postgres:14
ports:
- "5432:5432"
environment:
- POSTGRES_DB=mlflow_db
- POSTGRES_USER=mlflow_user
- POSTGRES_PASSWORD=mlflow_pwd
healthcheck:
test: ["CMD-SHELL", "pg_isready -U mlflow_user -d mlflow_db"]
interval: 10s
timeout: 5s
retries: 5
networks:
- mlo-network
networks:
mlo-network:
driver: bridge
部署到 DigitalOcean 的路径很清晰:
- OCI 镜像: 将两个服务的 Docker 镜像构建并推送到 DigitalOcean Container Registry (DOCR)。
- MLflow 服务: 在生产中,MLflow Server 本身也应该容器化部署。它的后端数据库可以使用 DigitalOcean Managed PostgreSQL,工件存储使用 DigitalOcean Spaces,这只需要修改 MLflow 服务的启动环境变量即可。
- 应用部署:
- 简单方案 (App Platform): 可以将
publisher和gateway作为两个独立的组件部署到 DigitalOcean App Platform。App Platform 会处理 HTTPS、负载均衡和自动伸缩。 - 灵活方案 (Kubernetes): 对于更复杂的场景,可以在 DigitalOcean Kubernetes (DOKS) 上部署。这需要为每个服务编写 Deployment 和 Service 的 YAML 文件,但提供了对网络策略、资源限制和滚动更新更精细的控制。
- 简单方案 (App Platform): 可以将
方案局限性与未来迭代路径
这套架构解决了异构技术栈下的 MLOps 流程统一问题,但并非没有缺点。
首先,模型加载机制是基于轮询的。对于需要模型近实时更新的场景,60秒的延迟可能过长。一个优化方向是利用 MLflow 的 Webhooks,当模型阶段发生变化时,主动通知 Inference Gateway 进行更新,从而实现事件驱动的模型加载。
其次,将模型加载到网关服务的内存中,只适用于中小型模型。对于数十 GB 的大模型,这种方式会耗尽网关实例的内存。届时,网关的角色需要退化为纯粹的请求路由,将推理请求转发给一个专门的、可水平扩展的推理服务集群(如 NVIDIA Triton Inference Server 或 KServe),而网关本身只负责从 MLflow 获取目标推理服务的地址。
最后,当前的方案没有内置 A/B 测试或金丝雀发布的能力。Gateway 可以被扩展,根据请求头或用户百分比,将流量路由到不同版本的模型(例如,同时加载 Production 版和 Staging 版),这是实现更复杂模型部署策略的关键一步。