在Consul Connect服务网格中构建和测试一个强制mTLS的JavaScript CQRS系统


在一个典型的微服务架构中,服务间的通信默认是明文的。这是一个长期被忽视但却致命的安全隐患。传统的解决方案是手动管理TLS证书,但这在服务数量增多时会迅速演变成一场运维灾难。我们的目标是为一套基于JavaScript的CQRS(命令查询职责分离)系统强制启用mTLS,但前提是不能让应用层代码感知到证书、加密或服务发现的复杂性,并且,这套安全强化后的架构必须是高度可测试的。

整个系统的架构图如下所示:

graph TD
    subgraph "安全网络边界 (Consul Connect Service Mesh)"
        C[Command Service] -- localhost --> CP[Sidecar Proxy]
        QP[Sidecar Proxy] -- localhost --> Q[Query Service]
        CP -- mTLS加密通讯 --> QP
    end

    User[客户端] -- HTTP POST --> C
    User -- HTTP GET --> Q

    subgraph "数据存储 (In-Memory)"
        DB[(Store)]
    end

    Q -- 读 --> DB
    Q -- 内部写 --> DB

这里的核心是,Command ServiceQuery Service 自身只监听 127.0.0.1。它们无法被外部直接访问,也无法直接访问其他服务。所有的出入流量都必须经过本地的Consul Connect Sidecar Proxy。Proxy负责服务发现、建立mTLS连接并转发流量。应用代码只需向一个本地端口(由Consul动态分配或静态配置)发送请求,即可安全地与目标服务通信。

第一步:构建基础CQRS服务

我们需要两个独立的服务:command-service 用于处理写操作,query-service 用于处理读操作。为简化模型,数据同步将通过command-service直接调用query-service的内部更新接口来完成,而非引入消息队列。在真实项目中,这里应该替换为Kafka或RabbitMQ。

Query Service (query-service/index.js)

这个服务有两个职责:对外提供数据查询接口,以及对内提供数据更新接口。

// query-service/index.js
const express = require('express');
const app = express();
app.use(express.json());

// 在生产环境中,这应该是一个真正的数据库。
// 为了演示,我们使用一个简单的内存存储。
const inventoryStore = new Map();

const QUERY_PORT = process.env.QUERY_PORT || 7001;
const INTERNAL_PORT = process.env.INTERNAL_PORT || 7002;

// 1. 公开的查询API服务器
const queryApp = express();
queryApp.get('/items/:id', (req, res) => {
    const { id } = req.params;
    if (inventoryStore.has(id)) {
        console.log(`[Query API] Fetched item ${id}`);
        res.status(200).json(inventoryStore.get(id));
    } else {
        console.log(`[Query API] Item ${id} not found`);
        res.status(404).send('Not Found');
    }
});
queryApp.listen(QUERY_PORT, '0.0.0.0', () => {
    console.log(`Query service's public API listening on 0.0.0.0:${QUERY_PORT}`);
});

// 2. 内部数据同步API服务器
// 这里的关键是:它只监听本地回环地址。
// 这意味着除了本机上的进程(比如Consul Sidecar Proxy),没有其他机器可以访问它。
const internalApp = express();
internalApp.use(express.json());
internalApp.put('/internal/items/:id', (req, res) => {
    const { id } = req.params;
    const { name, quantity } = req.body;

    if (!name || typeof quantity !== 'number') {
        console.error(`[Internal API] Invalid data for item ${id}`);
        return res.status(400).send('Invalid data');
    }

    const item = { id, name, quantity, lastUpdated: new Date().toISOString() };
    inventoryStore.set(id, item);
    console.log(`[Internal API] Item ${id} updated/created:`, item);
    res.status(200).json(item);
});
internalApp.listen(INTERNAL_PORT, '127.0.0.1', () => {
    console.log(`Query service's internal API listening on 127.0.0.1:${INTERNAL_PORT}`);
});

Command Service (command-service/index.js)

command-service 接收写命令,处理后通过Consul Connect代理调用query-service的内部接口来同步状态。

// command-service/index.js
const express = require('express');
const axios = require('axios');
const app = express();
app.use(express.json());

const COMMAND_PORT = process.env.COMMAND_PORT || 6001;

// 这是Consul Connect为query-service-internal上游配置的本地代理端口。
// 应用代码不关心目标服务的实际地址和端口,只与本地代理通信。
const QUERY_SERVICE_UPSTREAM_PORT = 9001;
const queryServiceInternalClient = axios.create({
    baseURL: `http://127.0.0.1:${QUERY_SERVICE_UPSTREAM_PORT}`,
});


app.post('/items/:id', async (req, res) => {
    const { id } = req.params;
    const { name, quantity } = req.body;

    // 业务逻辑校验
    if (!name || typeof quantity !== 'number' || quantity < 0) {
        console.error(`[Command API] Invalid command for item ${id}:`, req.body);
        return res.status(400).json({ error: 'Invalid item data' });
    }

    const command = { id, name, quantity };
    console.log(`[Command API] Processing command for item ${id}`);

    try {
        // 通过本地的Consul Connect代理将状态同步到Query Service
        const response = await queryServiceInternalClient.put(`/internal/items/${id}`, { name, quantity });
        console.log(`[Command API] Successfully synced state for item ${id} to query service.`);
        res.status(202).json(response.data);
    } catch (error) {
        console.error(`[Command API] Failed to sync state for item ${id}. Error: ${error.message}`, {
            // 在生产环境中,避免记录过多敏感信息
            code: error.code,
            address: error.address,
            port: error.port,
        });
        res.status(500).json({ error: 'Failed to process command due to internal error' });
    }
});


app.listen(COMMAND_PORT, '0.0.0.0', () => {
    console.log(`Command service API listening on 0.0.0.0:${COMMAND_PORT}`);
});

这里的核心在于queryServiceInternalClientbaseURL。它指向127.0.0.1:9001,而不是query-service的真实IP和端口。9001这个端口将由command-service的sidecar代理监听,所有发往这个端口的请求都会被代理加密并通过mTLS隧道转发给query-service的sidecar代理,最终到达query-service的内部7002端口。

第二步:配置Consul服务定义

为了让Consul管理这些服务并为其启用Connect,我们需要创建服务定义文件。

Query Service 定义 (consul-config/query-service.hcl)

// consul-config/query-service.hcl
service {
  name = "query-service"
  port = 7001
  
  // 健康检查
  check {
    id = "query-api-check"
    name = "Query API health check"
    http = "http://localhost:7001/health" // 假设有一个/health端点
    interval = "10s"
    timeout = "1s"
  }
}

// 这是内部数据同步服务,它本身不被外部发现
// 而是作为 "query-service" 的一个 sidecar 服务存在
service {
  name = "query-service-internal"
  port = 7002
  
  // 关键:启用Consul Connect
  connect {
    sidecar_service {}
  }

  // 这里的地址必须是127.0.0.1,因为服务只监听本地
  address = "127.0.0.1"
}

Command Service 定义 (consul-config/command-service.hcl)

// consul-config/command-service.hcl
service {
  name = "command-service"
  port = 6001

  connect {
    sidecar_service {
      // 关键:定义上游依赖
      // 这会告诉本地的sidecar proxy监听一个端口,并将流量转发到 "query-service-internal"
      proxy {
        upstreams {
          destination_name = "query-service-internal"
          local_bind_port  = 9001 // 这就是代码中使用的端口
        }
      }
    }
  }

  check {
    id = "command-api-check"
    name = "Command API health check"
    http = "http://localhost:6001/health"
    interval = "10s"
    timeout = "1s"
  }
}

第三步:运行与验证

  1. 启动Consul:

    # 以开发模式启动Consul
    consul agent -dev -client 0.0.0.0
  2. 注册服务:

    # 在另一个终端
    consul services register consul-config/query-service.hcl
    consul services register consul-config/command-service.hcl
  3. 启动服务及其Sidecar:

    • Query Service:

      # 启动Query Service应用
      node query-service/index.js &
      # 启动它的Sidecar Proxy
      consul connect proxy -sidecar-for query-service-internal
    • Command Service:

      # 启动Command Service应用
      node command-service/index.js &
      # 启动它的Sidecar Proxy
      consul connect proxy -sidecar-for command-service

      此时,所有组件都已运行。

  4. 功能验证:

    • 发送命令:

      curl -X POST http://localhost:6001/items/item-001 \
           -H "Content-Type: application/json" \
           -d '{"name": "Super Widget", "quantity": 100}'

      你应该能看到command-servicequery-service的日志,显示命令处理和数据同步成功。

    • 查询数据:

      curl http://localhost:7001/items/item-001

      这应该会返回刚才创建的物品信息。

    • 安全验证: 尝试直接访问query-service的内部端口是失败的,因为它只监听127.0.0.1。这就是服务网格提供的网络隔离。

第四步:棘手的测试问题

现在进入了最核心的环节:如何用Jest为command-service编写集成测试?command-service依赖于127.0.0.1:9001上的query-service代理。在标准的Jest测试环境中,既没有Consul运行,也没有Sidecar Proxy。直接运行测试必然失败。

一个常见的错误是修改应用代码,在测试环境下改变baseURL去连接一个mock服务。这破坏了测试的保真度,因为你测试的代码和生产运行的代码不完全一样。我们的目标是,在不修改任何应用代码的前提下,让测试通过

解决方案是创建一个测试套件,它在运行测试前,在127.0.0.1:9001端口上启动一个模拟query-service-internal行为的HTTP服务器。

创建测试文件 (command-service/command.test.js)

// command-service/command.test.js
const axios = require('axios');
const http = require('http');

// 生产代码中的硬编码端口,测试必须遵守这个约定
const QUERY_SERVICE_UPSTREAM_PORT = 9001; 
const COMMAND_SERVICE_BASE_URL = 'http://localhost:6001'; // 假设command-service本身也在本地运行以供测试

// 模拟下游服务 (Query Service Internal)
let mockQueryServer;
let lastReceivedData;

// 在所有测试开始前,启动一个模拟服务器监听在sidecar proxy应该在的端口上
beforeAll((done) => {
    mockQueryServer = http.createServer((req, res) => {
        let body = '';
        req.on('data', chunk => {
            body += chunk.toString();
        });
        req.on('end', () => {
            lastReceivedData = {
                method: req.method,
                url: req.url,
                headers: req.headers,
                body: JSON.parse(body),
            };
            
            // 模拟成功的响应
            if (req.method === 'PUT' && req.url.startsWith('/internal/items/')) {
                res.writeHead(200, { 'Content-Type': 'application/json' });
                res.end(JSON.stringify({ ...JSON.parse(body), status: 'mocked_ok' }));
            } else {
                res.writeHead(404);
                res.end();
            }
        });
    }).listen(QUERY_SERVICE_UPSTREAM_PORT, '127.0.0.1', done);
});

// 所有测试结束后,关闭模拟服务器
afterAll((done) => {
    mockQueryServer.close(done);
});

// 每个测试前重置状态
beforeEach(() => {
    lastReceivedData = null;
});

describe('Command Service Integration with Proxied Query Service', () => {
    it('should send a valid command to the query service via the local upstream port', async () => {
        const itemId = 'item-test-01';
        const payload = { name: 'Test Item', quantity: 50 };

        // 这里的axios实例是测试专用的,用于触发command-service的API
        const commandApiClient = axios.create({ baseURL: COMMAND_SERVICE_BASE_URL });
        
        // 触发被测系统 (command-service)
        // 注意:这里需要确保command-service实例正在运行
        // 在一个完整的测试框架中,我们会在beforeAll中也启动command-service
        // 为简化,我们假设它已手动启动
        const response = await commandApiClient.post(`/items/${itemId}`, payload);

        // 断言1: command-service本身返回了正确的状态码和数据
        expect(response.status).toBe(202);
        expect(response.data.status).toBe('mocked_ok');

        // 断言2: 我们的模拟下游服务器收到了正确的请求
        expect(lastReceivedData).not.toBeNull();
        expect(lastReceivedData.method).toBe('PUT');
        expect(lastReceivedData.url).toBe(`/internal/items/${itemId}`);
        expect(lastReceivedData.body.name).toBe(payload.name);
        expect(lastReceivedData.body.quantity).toBe(payload.quantity);
    });

    it('should return 500 if the downstream query service fails', async () => {
        // 重新配置模拟服务器以返回错误
        mockQueryServer.close();
        mockQueryServer = http.createServer((req, res) => {
            res.writeHead(500, { 'Content-Type': 'application/json' });
            res.end(JSON.stringify({ error: 'mocked internal server error' }));
        }).listen(QUERY_SERVICE_UPSTREAM_PORT, '127.0.0.1');

        const itemId = 'item-fail-01';
        const payload = { name: 'Failing Item', quantity: 10 };
        const commandApiClient = axios.create({ baseURL: COMMAND_SERVICE_BASE_URL });
        
        // 使用try-catch或.catch()来测试失败的Promise
        await expect(commandApiClient.post(`/items/${itemId}`, payload)).rejects.toThrow('Request failed with status code 500');
    });
});

这个测试方案的精妙之处在于,它完全模拟了Consul Connect为应用提供的环境。command-service的代码一行未改,它依然盲目地向127.0.0.1:9001发送请求,而我们的测试工具恰好在这个地址伪装成了它所期望的上游服务。这使得测试既能验证command-service的业务逻辑,又能验证它与下游服务(的代理)的集成方式是否正确。

方案的局限性与展望

当前实现虽然解决了核心的mTLS和可测试性问题,但在生产环境中仍有几个需要演进的地方。

首先,服务间的数据同步机制过于简单。使用HTTP直连进行同步存在可靠性问题,如果query-service暂时不可用,写操作的数据就会丢失。一个健壮的系统应当引入消息队列(如Kafka)或事件溯源(Event Sourcing)模式,实现最终一致性,并提供持久化和重试能力。

其次,我们的集成测试模拟了下游服务,但没有验证mTLS本身。它验证的是command-service是否正确地与它认为的下游代理通信。要进行更深层次的端到端测试,可能需要借助Testcontainers这类工具,在测试执行期间动态启动一个包含Consul和sidecar的Docker环境。这会显著增加测试的复杂度和执行时间,需要在测试覆盖率和执行效率之间做出权衡。

最后,Consul Connect的能力远不止mTLS。它还支持基于服务意图(Intentions)的七层授权。下一步的演进将是定义精细的访问策略,例如只允许command-service调用query-service-internalPUT /internal/items/*端点,从而实现真正的零信任网络。


  目录