A Time-Series Event Processing Engine Built on BDD Specifications and Nacos Dynamic Configuration


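In our project, the monitoring and alerting rules were originally hardcoded. Every time the business needed to adjust a threshold, add an alerting dimension, or change a time window, it meant a full modify-test-release cycle. That was not only slow but also risky. Complex rules driven by time-series data are especially painful: a rule such as "alert when service A's P99 latency over the past 5 minutes exceeds 200 ms and its 1-minute average CPU usage is above 80%" is hard to read and hard to maintain when written as if-else chains.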

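We needed an approach that decouples rule definitions from the execution engine, lets non-developers (such as SREs or data analysts) define rules in an approachable form, and loads and updates those rules dynamically without restarting the service.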

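The initial idea was to describe rules in a natural-language-style syntax, store the rule text in a configuration center, and have the backend service listen for changes and apply them on the fly. That led naturally to several technology decisions.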

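First, the rule definition language. Gherkin, the syntax used in Behavior-Driven Development (BDD), is an ideal choice. Its Given-When-Then structure is practically made for describing scenarios and rules, and it is highly readable.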

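Second, dynamic rule management. This is exactly what a configuration center such as Nacos or Apollo is for. We can store the entire content of a Gherkin .feature file as one configuration item in Nacos, and the service gets hot reloading by listening for changes to that item.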

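Third, data storage. Our monitoring data is classic time-series data: high volume, frequent writes, and queries that mostly aggregate over time ranges. TimescaleDB, a PostgreSQL extension, is purpose-built for this workload; its hypertables and optimized time-aggregation functions are the key features.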

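Next, the data access layer. JPA is popular, but for workloads that need fine-grained SQL tuning, especially ones that use TimescaleDB-specific functions, MyBatis gives more direct control. It lets us write native SQL while keeping the convenience of object mapping.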

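Finally, real-time presentation of results. When a rule fires, the resulting alert has to be pushed to the frontend in real time. A single-page application built on WebSocket and Redux state management is a mature pattern for this kind of data-flow-driven UI update.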

What follows is the complete build-out of the engine.

1. Data Model and Database Preparation

Everything starts with time-series events. Assuming we are monitoring server metrics, we can define a MonitoringEvent class:

// src/main/java/com/example/engine/model/MonitoringEvent.java
package com.example.engine.model;

import java.time.Instant;

public class MonitoringEvent {
    private Instant timestamp;
    private String hostname;
    private String metricName;
    private double metricValue;

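    // Getters used by MyBatis when binding #{timestamp}, #{hostname}, etc. (setters omitted for brevity)
    public Instant getTimestamp() { return timestamp; }
    public String getHostname() { return hostname; }
    public String getMetricName() { return metricName; }
    public double getMetricValue() { return metricValue; }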
    
    public MonitoringEvent(Instant timestamp, String hostname, String metricName, double metricValue) {
        this.timestamp = timestamp;
        this.hostname = hostname;
        this.metricName = metricName;
        this.metricValue = metricValue;
    }

    @Override
    public String toString() {
        return "MonitoringEvent{" +
                "timestamp=" + timestamp +
                ", hostname='" + hostname + '\'' +
                ", metricName='" + metricName + '\'' +
                ", metricValue=" + metricValue +
                '}';
    }
}

In TimescaleDB we create a hypertable for these events. The hypertable is TimescaleDB's core concept: it automatically partitions the data by time.

-- DDL for monitoring_events table
CREATE TABLE monitoring_events (
    "timestamp"   TIMESTAMPTZ       NOT NULL,
    hostname      TEXT              NOT NULL,
    metric_name   TEXT              NOT NULL,
    metric_value  DOUBLE PRECISION  NOT NULL
);

-- Convert the table to a hypertable, partitioned by the 'timestamp' column.
-- This is the magic of TimescaleDB.
SELECT create_hypertable('monitoring_events', 'timestamp');

-- Create indexes for efficient querying
CREATE INDEX ON monitoring_events (hostname, metric_name, "timestamp" DESC);
CREATE INDEX ON monitoring_events (metric_name, "timestamp" DESC);

-- Table for storing triggered alerts
CREATE TABLE triggered_alerts (
    id            SERIAL PRIMARY KEY,
    rule_name     TEXT NOT NULL,
    "timestamp"   TIMESTAMPTZ NOT NULL,
    hostname      TEXT,
    details       JSONB
);

2. Defining and Dynamically Loading BDD Rules

We define alerting rules in Gherkin. For example, here is a rule for high CPU usage:

# high-cpu-usage.feature

Feature: High CPU Usage Alerting
  As an SRE, I want to be alerted when CPU usage is critically high.

  Scenario: CPU usage exceeds 90% for 1 minute
    Given the metric is "cpu.usage.percent" for host "server-01"
    When the average value over the last "1" minute is greater than "90.0"
    Then an alert named "HighCpuUsage" should be triggered with details

The entire content of this .feature file is stored in a single Nacos configuration item. Assume its dataId is rules.cpu.high-usage and its group is MONITORING_RULES.

Next, we need a service that loads these rules from Nacos and listens for changes.

// src/main/java/com/example/engine/config/NacosRuleLoader.java
package com.example.engine.config;

import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import jakarta.annotation.PostConstruct;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

@Component
public class NacosRuleLoader {

    private static final Logger logger = LoggerFactory.getLogger(NacosRuleLoader.class);
    private static final String NACOS_SERVER_ADDR = "localhost:8848";
    private static final String RULES_GROUP = "MONITORING_RULES";
    private static final String CPU_RULE_DATA_ID = "rules.cpu.high-usage";

    // A map to store the raw Gherkin feature strings, keyed by dataId
    private final ConcurrentHashMap<String, String> ruleFeatures = new ConcurrentHashMap<>();
    private ConfigService configService;

    @PostConstruct
    public void init() {
        try {
            Properties properties = new Properties();
            properties.put("serverAddr", NACOS_SERVER_ADDR);
            configService = NacosFactory.createConfigService(properties);

            // Initial load
            loadAndListen(CPU_RULE_DATA_ID);
            // ... load other rules if any

        } catch (NacosException e) {
            logger.error("Failed to connect to Nacos or load initial rules", e);
            // In a real project, this might trigger a health check failure.
            throw new RuntimeException("Cannot initialize Nacos Rule Loader", e);
        }
    }

    private void loadAndListen(String dataId) throws NacosException {
        // Get initial configuration
        String initialConfig = configService.getConfig(dataId, RULES_GROUP, 5000);
        if (initialConfig != null && !initialConfig.isEmpty()) {
            logger.info("Initial load of rule [{}]:\n{}", dataId, initialConfig);
            ruleFeatures.put(dataId, initialConfig);
        } else {
            logger.warn("Rule [{}] is not configured in Nacos.", dataId);
        }

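        // Add a listener for dynamic updates.
        // Create the executor once: Nacos calls getExecutor() for every notification, so
        // returning a new executor each time would leak a thread per config change.
        Executor listenerExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "NacosRuleListener-" + dataId));
        configService.addListener(dataId, RULES_GROUP, new Listener() {
            @Override
            public Executor getExecutor() {
                return listenerExecutor;
            }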

            @Override
            public void receiveConfigInfo(String configInfo) {
                logger.info("Detected rule change for [{}], reloading.", dataId);
                if (configInfo != null && !configInfo.isEmpty()) {
                    ruleFeatures.put(dataId, configInfo);
                    logger.debug("New rule content for [{}]:\n{}", dataId, configInfo);
                } else {
                    logger.warn("Rule [{}] was removed from Nacos, deactivating.", dataId);
                    ruleFeatures.remove(dataId);
                }
                // Here you would typically notify the rule engine to rebuild its execution plan
            }
        });
    }

    public ConcurrentHashMap<String, String> getRuleFeatures() {
        return ruleFeatures;
    }
}
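Because the listener callback is the only moving part, the reload logic can be exercised without a running Nacos server. The sketch below replaces ConfigService with a hypothetical in-memory FakeConfigSource that pushes updates to its listeners on a single dedicated executor, and the listener mirrors the put-or-remove behaviour of receiveConfigInfo above. FakeConfigSource and its methods are illustrative names, not Nacos API.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical stand-in for a config center: publish() notifies listeners asynchronously.
public final class FakeConfigSource implements AutoCloseable {
    private final List<BiConsumer<String, String>> listeners = new CopyOnWriteArrayList<>();
    // One executor for all notifications, created once; a single thread keeps updates in order
    private final ExecutorService notifier = Executors.newSingleThreadExecutor();

    public void addListener(BiConsumer<String, String> listener) {
        listeners.add(listener);
    }

    /** Delivers (dataId, content) to every listener on the notifier thread. */
    public void publish(String dataId, String content) {
        for (BiConsumer<String, String> listener : listeners) {
            notifier.execute(() -> listener.accept(dataId, content));
        }
    }

    /** Waits for pending notifications, then stops the notifier thread. */
    @Override
    public void close() {
        notifier.shutdown();
        try {
            notifier.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        Map<String, String> ruleFeatures = new ConcurrentHashMap<>();
        FakeConfigSource source = new FakeConfigSource();
        // Same semantics as receiveConfigInfo: non-empty content replaces, empty content removes
        source.addListener((dataId, content) -> {
            if (content != null && !content.isEmpty()) {
                ruleFeatures.put(dataId, content);
            } else {
                ruleFeatures.remove(dataId);
            }
        });
        source.publish("rules.cpu.high-usage", "Feature: High CPU Usage Alerting");
        source.publish("rules.cpu.high-usage", "");
        source.close();
        System.out.println(ruleFeatures.isEmpty()); // prints true
    }
}
```

Keeping the rule map behind a listener like this also makes it easy to unit-test what happens when a rule is deleted in Nacos.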

3. Implementing the Core Rule Engine

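The challenge here is that we are not running Cucumber as a test: we are treating Gherkin text as executable logic in production. The cucumber-core library can parse the Gherkin for us, but we have to glue the execution together ourselves.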
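Cucumber does the step matching for us, but the idea behind that glue is small enough to sketch. The hypothetical StepMatcher below turns a Cucumber-expression-like pattern containing {string} into a regular expression, strips the Given/When/Then keyword from a step line, and passes the captured quoted arguments to a handler. It deliberately supports only {string}; real Cucumber expressions also cover {int}, {float}, optional text and more.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, simplified step matcher: supports only the {string} parameter type.
public final class StepMatcher {
    private final Map<Pattern, Consumer<List<String>>> steps = new LinkedHashMap<>();

    /** Registers a pattern such as: the metric is {string} for host {string} */
    public void register(String expression, Consumer<List<String>> handler) {
        StringBuilder regex = new StringBuilder("^");
        String[] literals = expression.split("\\{string\\}", -1); // -1 keeps a trailing empty literal
        for (int i = 0; i < literals.length; i++) {
            regex.append(Pattern.quote(literals[i]));
            if (i < literals.length - 1) {
                regex.append("\"([^\"]*)\""); // a double-quoted argument, captured without the quotes
            }
        }
        steps.put(Pattern.compile(regex.append("$").toString()), handler);
    }

    /** Strips the step keyword and runs the first matching handler; false means "undefined step". */
    public boolean run(String stepLine) {
        String text = stepLine.trim().replaceFirst("^(Given|When|Then|And|But) ", "");
        for (Map.Entry<Pattern, Consumer<List<String>>> entry : steps.entrySet()) {
            Matcher m = entry.getKey().matcher(text);
            if (m.matches()) {
                List<String> args = new ArrayList<>();
                for (int g = 1; g <= m.groupCount(); g++) {
                    args.add(m.group(g));
                }
                entry.getValue().accept(args);
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        StepMatcher matcher = new StepMatcher();
        matcher.register("the metric is {string} for host {string}",
                stepArgs -> System.out.println("metric=" + stepArgs.get(0) + ", host=" + stepArgs.get(1)));
        matcher.run("Given the metric is \"cpu.usage.percent\" for host \"server-01\"");
    }
}
```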

First comes the MyBatis mapper, which queries aggregated data from TimescaleDB.

// src/main/java/com/example/engine/mapper/EventMapper.java
package com.example.engine.mapper;

import com.example.engine.model.MonitoringEvent;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;

import java.time.Instant;
import java.util.Optional;

@Mapper
public interface EventMapper {

    @Insert("INSERT INTO monitoring_events(\"timestamp\", hostname, metric_name, metric_value) " +
            "VALUES (#{timestamp}, #{hostname}, #{metricName}, #{metricValue})")
    void save(MonitoringEvent event);

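    /**
     * Plain range aggregate: the average of one metric on one host since startTime.
     * The time predicate lets TimescaleDB skip hypertable chunks outside the range. Writing the query
     * as native SQL in MyBatis also leaves room for TimescaleDB-specific functions such as time_bucket.
     * avg() returns NULL when no rows match, so callers must handle the empty case.
     */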
    @Select("SELECT avg(metric_value) " +
            "FROM monitoring_events " +
            "WHERE hostname = #{hostname} " +
            "AND metric_name = #{metricName} " +
            "AND \"timestamp\" > #{startTime}")
    Optional<Double> getAverageMetricValueSince(
            @Param("hostname") String hostname,
            @Param("metricName") String metricName,
            @Param("startTime") Instant startTime);
}
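The query above returns a single average. If a rule instead needs a per-interval series (say one average per minute), TimescaleDB's time_bucket('1 minute', "timestamp") combined with GROUP BY is the natural tool. The in-memory sketch below shows what that grouping computes, which can be handy for checking mapper results in tests. Aligning buckets to the Unix epoch is an assumption of the sketch; it lines up with time_bucket for sub-day widths such as 1 minute, but check TimescaleDB's documentation on the origin parameter before relying on it for wider buckets. TimeBuckets and Sample are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// In-memory illustration of: SELECT time_bucket(width, ts) AS bucket, avg(v) ... GROUP BY bucket
public final class TimeBuckets {
    public record Sample(Instant timestamp, double value) {}

    /** Floors a timestamp to the start of its bucket (buckets aligned to the Unix epoch). */
    public static Instant bucketStart(Instant ts, Duration width) {
        long w = width.toMillis();
        // floorDiv keeps pre-1970 timestamps in the correct (earlier) bucket
        return Instant.ofEpochMilli(Math.floorDiv(ts.toEpochMilli(), w) * w);
    }

    /** Average value per bucket, ordered by bucket start. */
    public static Map<Instant, Double> averagePerBucket(List<Sample> samples, Duration width) {
        return samples.stream().collect(Collectors.groupingBy(
                s -> bucketStart(s.timestamp(), width),
                TreeMap::new,
                Collectors.averagingDouble(Sample::value)));
    }

    public static void main(String[] args) {
        List<Sample> samples = List.of(
                new Sample(Instant.parse("2024-05-01T10:00:05Z"), 80.0),
                new Sample(Instant.parse("2024-05-01T10:00:40Z"), 100.0),
                new Sample(Instant.parse("2024-05-01T10:01:10Z"), 95.0));
        // prints {2024-05-01T10:00:00Z=90.0, 2024-05-01T10:01:00Z=95.0}
        System.out.println(averagePerBucket(samples, Duration.ofMinutes(1)));
    }
}
```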

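Now for the core of the engine. We define the step definitions that correspond to the Gherkin steps; these methods become the atomic operations that rules are built from.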

// src/main/java/com/example/engine/rules/RuleStepDefinitions.java
package com.example.engine.rules;

import com.example.engine.mapper.EventMapper;
import io.cucumber.java.en.Given;
import io.cucumber.java.en.Then;
import io.cucumber.java.en.When;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.fail;

// JUnit's fail() is used for its semantics, but this isn't a test: a failed assertion throws,
// which halts the rule evaluation chain. junit-jupiter-api must therefore be on the runtime
// classpath, not just in test scope.

@Component
public class RuleStepDefinitions {

    private static final Logger logger = LoggerFactory.getLogger(RuleStepDefinitions.class);

    private final EventMapper eventMapper;
    
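    // A context to hold state during the evaluation of a single scenario.
    // Public so that RuleEngineService, which lives in another package, can read the outcome of a run.
    public final ThreadLocal<Map<String, Object>> scenarioContext = ThreadLocal.withInitial(HashMap::new);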

    public RuleStepDefinitions(EventMapper eventMapper) {
        this.eventMapper = eventMapper;
    }

    public void clearScenarioContext() {
        scenarioContext.get().clear();
    }
    
    @Given("the metric is {string} for host {string}")
    public void the_metric_is_for_host(String metricName, String hostname) {
        scenarioContext.get().put("metricName", metricName);
        scenarioContext.get().put("hostname", hostname);
    }

    @When("the average value over the last {string} minute is greater than {string}")
    public void the_average_value_is_greater_than(String minutes, String thresholdStr) {
        String metricName = (String) scenarioContext.get().get("metricName");
        String hostname = (String) scenarioContext.get().get("hostname");
        long durationMinutes = Long.parseLong(minutes);
        double threshold = Double.parseDouble(thresholdStr);

        Instant startTime = Instant.now().minus(Duration.ofMinutes(durationMinutes));

        // The core logic: query the database using our MyBatis mapper
        Optional<Double> average = eventMapper.getAverageMetricValueSince(hostname, metricName, startTime);

        if (average.isEmpty()) {
            logger.warn("No data found for metric '{}' on host '{}' in the last {} minutes.", metricName, hostname, durationMinutes);
            fail("Condition not met: No data available.");
        }
        
        double actualAverage = average.get();
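        // SLF4J only substitutes plain {} placeholders; a Python-style {:.2f} would be printed literally.
        logger.debug("Evaluating rule for {} on {}: avg over {}m is {}, threshold is {}",
                metricName, hostname, durationMinutes,
                String.format("%.2f", actualAverage), String.format("%.2f", threshold));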

        if (actualAverage <= threshold) {
            fail(String.format("Condition not met: Average %.2f is not greater than threshold %.2f", actualAverage, threshold));
        }
        
        // If the condition is met, store the result for the 'Then' step
        scenarioContext.get().put("isTriggered", true);
        scenarioContext.get().put("actualValue", actualAverage);
    }

    @Then("an alert named {string} should be triggered with details")
    public void an_alert_should_be_triggered(String alertName) {
        if (!scenarioContext.get().getOrDefault("isTriggered", false).equals(true)) {
            // This 'Then' step shouldn't even be reached if the 'When' step failed,
            // but it's good practice for robustness.
            return;
        }

        // The rule passed. We mark it as a success in the context.
        Map<String, Object> details = new HashMap<>();
        details.put("metricName", scenarioContext.get().get("metricName"));
        details.put("hostname", scenarioContext.get().get("hostname"));
        details.put("actualValue", scenarioContext.get().get("actualValue"));

        scenarioContext.get().put("triggeredAlertName", alertName);
        scenarioContext.get().put("triggeredAlertDetails", details);
        logger.info("Rule matched! Alert '{}' is ready to be triggered for host '{}'.", alertName, details.get("hostname"));
    }
}
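These step definitions rely on one property of scenario execution: once a step throws, the remaining steps are skipped, so the Then step (and therefore the alert) only runs when every earlier condition held. The stdlib-only sketch below reproduces that control flow with a hypothetical ScenarioRunner, using a plain map per run in place of the ThreadLocal context.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical mini-runner showing Cucumber's step semantics: the first failing step stops the scenario.
public final class ScenarioRunner {
    public enum Outcome { PASSED, FAILED }

    public static Outcome run(List<Consumer<Map<String, Object>>> steps, Map<String, Object> context) {
        for (Consumer<Map<String, Object>> step : steps) {
            try {
                step.accept(context);
            } catch (AssertionError | RuntimeException e) {
                // Later steps are skipped, so a 'Then' step after a failed 'When' never runs
                return Outcome.FAILED;
            }
        }
        return Outcome.PASSED;
    }

    public static void main(String[] args) {
        for (double avg : new double[] {95.0, 42.0}) {
            Map<String, Object> ctx = new HashMap<>();
            Outcome outcome = run(List.of(
                    c -> c.put("hostname", "server-01"),                                        // Given
                    c -> { if (avg <= 90.0) throw new AssertionError("not above threshold"); }, // When
                    c -> c.put("triggeredAlertName", "HighCpuUsage")),                          // Then
                    ctx);
            System.out.println(avg + " -> " + outcome + ", alert=" + ctx.get("triggeredAlertName"));
        }
    }
}
```

Running it prints a PASSED outcome with the alert name for 95.0, and a FAILED outcome with a null alert for 42.0.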

Finally, RuleEngineService ties everything together. It fetches rules from NacosRuleLoader and executes them with cucumber-core's Runtime.

// src/main/java/com/example/engine/service/RuleEngineService.java
package com.example.engine.service;

import com.example.engine.config.NacosRuleLoader;
import com.example.engine.rules.RuleStepDefinitions;
import io.cucumber.core.backend.ObjectFactory;
import io.cucumber.core.eventbus.EventBus;
import io.cucumber.core.feature.FeatureParser;
import io.cucumber.core.feature.GluePath;
import io.cucumber.core.options.RuntimeOptions;
import io.cucumber.core.options.RuntimeOptionsBuilder;
import io.cucumber.core.runtime.BackendServiceLoader;
import io.cucumber.core.runtime.FeaturePathFeatureSupplier;
import io.cucumber.core.runtime.Runtime;
import io.cucumber.core.runtime.TimeServiceEventBus;
import io.cucumber.plugin.ConcurrentEventListener;
import io.cucumber.plugin.event.EventPublisher;
import io.cucumber.plugin.event.Status;
import io.cucumber.plugin.event.TestStepFinished;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Clock;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.function.Supplier;

@Service
public class RuleEngineService {
    
    private static final Logger logger = LoggerFactory.getLogger(RuleEngineService.class);

    private final NacosRuleLoader ruleLoader;
    private final RuleStepDefinitions stepDefinitions;

    public RuleEngineService(NacosRuleLoader ruleLoader, RuleStepDefinitions stepDefinitions) {
        this.ruleLoader = ruleLoader;
        this.stepDefinitions = stepDefinitions;
    }

    public void evaluateAllRules() {
        logger.info("Starting rule evaluation cycle...");
        for (Map.Entry<String, String> entry : ruleLoader.getRuleFeatures().entrySet()) {
            String ruleId = entry.getKey();
            String featureContent = entry.getValue();
            try {
                evaluateRule(ruleId, featureContent);
            } catch (Exception e) {
                // The key is to isolate failures. One failing rule shouldn't stop others.
                logger.error("Critical error during evaluation of rule [{}].", ruleId, e);
            }
        }
    }
    
    private void evaluateRule(String ruleId, String featureContent) throws IOException {
        // Cucumber's Runtime reads features from a URI, so we write the feature content to a temp file.
        // This is a known limitation when using the standard runtime.
        Path tempFeatureFile = Files.createTempFile("rule-" + ruleId + "-", ".feature");
        try {
            Files.writeString(tempFeatureFile, featureContent, StandardCharsets.UTF_8);

            // NOTE: the io.cucumber.core.* classes used below are Cucumber internals, not public API.
            // The constructor signatures are an assumption based on Cucumber 7.x; verify them
            // against the version on your classpath.

            // 1. Set up the Cucumber runtime environment
            EventBus bus = new TimeServiceEventBus(Clock.systemUTC(), UUID::randomUUID);
            Supplier<ClassLoader> classLoader = () -> Thread.currentThread().getContextClassLoader();

            // Hand Cucumber the Spring-managed step-definition bean instead of letting it create one.
            ObjectFactory objectFactory = new ObjectFactory() {
                @Override public void start() { }
                @Override public void stop() { }
                @Override public boolean addClass(Class<?> glueClass) { return true; }
                @Override public <T> T getInstance(Class<T> glueClass) {
                    if (glueClass.isInstance(stepDefinitions)) {
                        return glueClass.cast(stepDefinitions);
                    }
                    throw new IllegalArgumentException("Unexpected glue class: " + glueClass);
                }
            };

            // 2. Wire up the components. The glue package tells the Java backend where the
            //    @Given/@When/@Then methods live; without it no step would match.
            RuntimeOptions options = new RuntimeOptionsBuilder()
                    .addGlue(GluePath.parse("com.example.engine.rules"))
                    .build();
            var backendSupplier = new BackendServiceLoader(classLoader, () -> objectFactory);
            var featureSupplier = new FeaturePathFeatureSupplier(classLoader,
                    () -> Collections.singletonList(tempFeatureFile.toUri()),
                    new FeatureParser(bus::generateId));

            // We use a custom plugin to capture the result of the 'Then' step.
            ResultCapturePlugin resultCapturePlugin = new ResultCapturePlugin();

            Runtime runtime = Runtime.builder()
                    .withRuntimeOptions(options)
                    .withEventBus(bus)
                    .withBackendSupplier(backendSupplier)
                    .withFeatureSupplier(featureSupplier)
                    .withAdditionalPlugins(resultCapturePlugin)
                    .build();

            // 3. Execute. Before each run, clear state left over in the step definitions.
            stepDefinitions.clearScenarioContext();
            runtime.run();

            // 4. Act on whichever alert the rule produced, instead of one hardcoded alert name
            String alertName = resultCapturePlugin.getTriggeredAlertName();
            if (alertName != null) {
                logger.info("Action: persisting and dispatching alert '{}' from rule [{}]. Details: {}",
                        alertName, ruleId, resultCapturePlugin.getTriggeredAlertDetails());
                // Here, you would save the alert to the 'triggered_alerts' table
                // and push it to the frontend via WebSocket.
            }
        } finally {
            // Delete immediately; deleteOnExit() would pile up files in a long-running service.
            Files.deleteIfExists(tempFeatureFile);
        }
    }

    // A custom plugin to intercept results without polluting stdout.
    // It implements ConcurrentEventListener because Plugin is only a marker interface
    // and cannot receive an EventPublisher.
    private class ResultCapturePlugin implements ConcurrentEventListener {
        private String triggeredAlertName;
        private Map<String, Object> triggeredAlertDetails;

        @Override
        public void setEventPublisher(EventPublisher publisher) {
            // Register once, here only; also registering in a constructor would handle every event twice.
            publisher.registerHandlerFor(TestStepFinished.class, this::handleTestStepFinished);
        }

        @SuppressWarnings("unchecked")
        private void handleTestStepFinished(TestStepFinished event) {
            // Only the 'Then' step writes these keys, so after any PASSED step they are either set or still null.
            if (event.getResult().getStatus() == Status.PASSED) {
                Map<String, Object> context = stepDefinitions.scenarioContext.get();
                triggeredAlertName = (String) context.get("triggeredAlertName");
                triggeredAlertDetails = (Map<String, Object>) context.get("triggeredAlertDetails");
            }
        }
        
        public String getTriggeredAlertName() { return triggeredAlertName; }
        public Map<String, Object> getTriggeredAlertDetails() { return triggeredAlertDetails; }
    }
}
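The section does not show what calls evaluateAllRules(). One plausible driver, assumed here, is a fixed-rate schedule; in a Spring application @Scheduled(fixedRate = ...) would be the usual choice. The stdlib sketch below uses a ScheduledExecutorService and keeps the same per-rule isolation. That isolation matters twice with scheduleAtFixedRate: a throwing rule must not stop the other rules, and an exception escaping the task would suppress all subsequent executions. EvaluationLoop and RuleEvaluator are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical driver: evaluates every rule on a fixed schedule, isolating per-rule failures.
public final class EvaluationLoop implements AutoCloseable {

    @FunctionalInterface
    public interface RuleEvaluator {
        void evaluate(String ruleId, String featureContent) throws Exception;
    }

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    /** The rules map is re-read on every cycle, so hot-reloaded rules are picked up automatically. */
    public void start(Map<String, String> rules, RuleEvaluator evaluator, long periodMillis) {
        scheduler.scheduleAtFixedRate(() -> {
            for (Map.Entry<String, String> e : rules.entrySet()) {
                try {
                    evaluator.evaluate(e.getKey(), e.getValue());
                } catch (Exception ex) {
                    // Contain the failure: letting it escape would cancel every future run
                    System.err.println("Rule " + e.getKey() + " failed: " + ex.getMessage());
                }
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```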

The diagram below shows the overall data flow and processing:

graph TD
    subgraph "数据源"
        A[Monitoring Agents] -- Pushes Metrics --> B(Event Stream / Kafka);
    end

    subgraph "后端处理引擎"
        B -- Consumes --> C{RuleEngineService};
        D[Nacos] -- Stores Rules --> E(NacosRuleLoader);
        E -- Provides Rules --> C;
        C -- Evaluates Rules --> F(RuleStepDefinitions);
        F -- Queries Data --> G(MyBatis: EventMapper);
        G -- SQL Query --> H[TimescaleDB];
        H -- Returns Aggregates --> G;
        F -- Triggers Alert --> I{Alert Handler};
    end

    subgraph "数据持久化"
        C -- Saves Raw Events --> G;
        I -- Saves Alerts --> J[triggered_alerts Table];
    end

    subgraph "前端展现"
        K[React/Redux App] -- WebSocket Conn --> L(WebSocket Server);
        I -- Publishes Alert --> L;
        L -- Pushes to Client --> K;
    end

    style D fill:#f9f,stroke:#333,stroke-width:2px
    style H fill:#ccf,stroke:#333,stroke-width:2px

4. Real-Time Frontend Display

On the frontend, we manage state with Redux. When the WebSocket server pushes a new alert, a Redux action is dispatched.

// src/store/alertsSlice.js

import { createSlice } from '@reduxjs/toolkit';

const initialState = {
  items: [], // Array of alert objects
  status: 'idle', // 'idle' | 'connected' | 'disconnected'
};

const alertsSlice = createSlice({
  name: 'alerts',
  initialState,
  reducers: {
    // Action to be dispatched when WebSocket connection is established
    connectionEstablished(state) {
      state.status = 'connected';
    },
    // Action to be dispatched when a new alert is received from WebSocket
    alertReceived(state, action) {
      // Keep only the latest 100 alerts to prevent memory issues
      state.items.unshift(action.payload);
      if (state.items.length > 100) {
        state.items.pop();
      }
    },
  },
});

export const { connectionEstablished, alertReceived } = alertsSlice.actions;

export default alertsSlice.reducer;

The React component connects to the WebSocket and dispatches the alertReceived action whenever a message arrives.

// src/components/AlertDashboard.js

import React, { useEffect } from 'react';
import { useDispatch, useSelector } from 'react-redux';
import { alertReceived, connectionEstablished } from '../store/alertsSlice';

const WEBSOCKET_URL = 'ws://localhost:8080/ws/alerts';

export const AlertDashboard = () => {
  const dispatch = useDispatch();
  const alerts = useSelector(state => state.alerts.items);
  const connectionStatus = useSelector(state => state.alerts.status);

  useEffect(() => {
    const ws = new WebSocket(WEBSOCKET_URL);

    ws.onopen = () => {
      console.log('WebSocket connection established.');
      dispatch(connectionEstablished());
    };

    ws.onmessage = (event) => {
      try {
        const alertData = JSON.parse(event.data);
        console.log('Received alert:', alertData);
        dispatch(alertReceived(alertData));
      } catch (error) {
        console.error('Failed to parse incoming alert message:', error);
      }
    };

    ws.onerror = (error) => {
      console.error('WebSocket error:', error);
    };

    // Cleanup on component unmount
    return () => {
      ws.close();
    };
  }, [dispatch]);

  return (
    <div>
      <h1>Real-time Alerts ({connectionStatus})</h1>
      {alerts.map(alert => (
        <div key={alert.id} className="alert-item">
          <strong>{alert.ruleName}</strong> on {alert.hostname} at {new Date(alert.timestamp).toLocaleTimeString()}
          <pre>{JSON.stringify(alert.details, null, 2)}</pre>
        </div>
      ))}
    </div>
  );
};

Limitations and Future Iterations

The current design successfully decouples rule definition from execution and makes rules dynamic. It does, however, have limitations.

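First, performance. On every evaluation, writing the Gherkin text to a temporary file and having the Cucumber Runtime parse it costs IO and CPU. Because that cost is paid for every rule on every evaluation cycle, it can become a bottleneck when there are many rules, or when evaluation must keep pace with very high throughput (thousands of events per second). One optimization path is to "compile" the Gherkin once at load time into a more efficient intermediate representation, or even to generate bytecode directly.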
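One concrete form of that load-time compilation, sketched under assumptions: since the example rule follows a fixed three-step template, its feature text can be parsed once, when Nacos delivers it, into an immutable CompiledRule. Each evaluation cycle then only needs one average query and one comparison, with no temp file and no Cucumber runtime. The regexes are tied to the step wording in high-cpu-usage.feature, and CompiledRule and the AverageSource callback (which the real service could back with EventMapper) are hypothetical.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical pre-compiled form of the three-step CPU rule template.
public record CompiledRule(String metric, String host, Duration window, double threshold, String alertName) {

    // Step wording copied from high-cpu-usage.feature; other wordings would need their own patterns
    private static final Pattern GIVEN =
            Pattern.compile("Given the metric is \"([^\"]+)\" for host \"([^\"]+)\"");
    private static final Pattern WHEN =
            Pattern.compile("When the average value over the last \"(\\d+)\" minute is greater than \"([0-9.]+)\"");
    private static final Pattern THEN =
            Pattern.compile("Then an alert named \"([^\"]+)\" should be triggered with details");

    /** Supplies the average of a metric over a window (empty when there is no data). */
    @FunctionalInterface
    public interface AverageSource {
        Optional<Double> average(String host, String metric, Duration window);
    }

    /** Parses the feature text once; throws if any of the three steps is missing. */
    public static CompiledRule compile(String featureText) {
        Matcher g = find(GIVEN, featureText);
        Matcher w = find(WHEN, featureText);
        Matcher t = find(THEN, featureText);
        return new CompiledRule(g.group(1), g.group(2),
                Duration.ofMinutes(Long.parseLong(w.group(1))),
                Double.parseDouble(w.group(2)), t.group(1));
    }

    private static Matcher find(Pattern p, String text) {
        Matcher m = p.matcher(text);
        if (!m.find()) {
            throw new IllegalArgumentException("Step not found: " + p.pattern());
        }
        return m;
    }

    /** Returns the alert name if the rule fires; no data means no alert, as in the When step. */
    public Optional<String> evaluate(AverageSource source) {
        return source.average(host, metric, window)
                .filter(avg -> avg > threshold)
                .map(avg -> alertName);
    }
}
```

The trade-off is flexibility: every new step wording needs a matching pattern, which is the work Cucumber's step definitions otherwise do at run time.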

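Second, rule complexity. Gherkin is well suited to linear rules evaluated against a snapshot of state. For complex event processing (CEP) that must track state across multiple events, such as "detect a successful login after 5 failed logins", this model falls short. That calls for a genuine stateful stream-processing engine such as Apache Flink. Our approach is better suited to stateless rules or simple windowed aggregations.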
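To see why the login example needs state: whether a success matters depends on what came before it, so the detector must remember a per-user failure count across events rather than evaluate one aggregate snapshot. A minimal stdlib sketch, assuming the rule means "a success that follows at least 5 consecutive failures for the same user" (the exact semantics are an assumption). In Flink, this per-user counter is roughly what keyed state would hold. BruteForceDetector is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical detector: flags a successful login that follows >= threshold consecutive failures.
public final class BruteForceDetector {
    private final int threshold;
    private final Map<String, Integer> consecutiveFailures = new HashMap<>(); // per-user state

    public BruteForceDetector(int threshold) {
        this.threshold = threshold;
    }

    /** Feeds one login event; returns true when this event completes the suspicious pattern. */
    public boolean onLogin(String user, boolean success) {
        if (!success) {
            consecutiveFailures.merge(user, 1, Integer::sum);
            return false;
        }
        int failuresBefore = consecutiveFailures.getOrDefault(user, 0);
        consecutiveFailures.remove(user); // a success resets the streak
        return failuresBefore >= threshold;
    }
}
```

Note that the answer for the sixth event changes depending on the five events before it, which is exactly what a single windowed query in the Given-When-Then template cannot express.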

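Finally, testing and validating the rules. The rules themselves are readable, but how do we make sure their logic is correct before pushing them to the production Nacos? We need a "meta-testing" process around the rules, for example a sandbox that replays historical data to verify a new rule's behavior, so that a faulty rule cannot cause an alert storm or silently suppress alerts.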
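A sandbox replay can start very small: feed historical events in timestamp order through the candidate rule and record when it would have fired, before the rule ever reaches Nacos. The sketch below replays a sliding-window "average greater than threshold" rule over an in-memory event list. RuleReplay, Event, and the evaluate-on-every-event policy are assumptions; a real harness would read the history from TimescaleDB.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical replay harness for a "window average > threshold" rule.
public final class RuleReplay {
    public record Event(Instant timestamp, double value) {}

    /** Returns the timestamps at which the rule would have fired; history must be sorted by time. */
    public static List<Instant> replay(List<Event> history, Duration window, double threshold) {
        if (window.isZero() || window.isNegative()) {
            throw new IllegalArgumentException("window must be positive");
        }
        Deque<Event> inWindow = new ArrayDeque<>();
        double sum = 0;
        List<Instant> firings = new ArrayList<>();
        for (Event e : history) {
            inWindow.addLast(e);
            sum += e.value();
            // Evict events at or before (now - window), mirroring the mapper's "timestamp > startTime"
            Instant cutoff = e.timestamp().minus(window);
            while (!inWindow.peekFirst().timestamp().isAfter(cutoff)) {
                sum -= inWindow.removeFirst().value();
            }
            if (sum / inWindow.size() > threshold) {
                firings.add(e.timestamp());
            }
        }
        return firings;
    }
}
```

An empty result on a period known to contain an incident points to a rule that would stay silent, while a long list of firings points to an alert storm.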

