使用Terraform构建集成OIDC、Fluentd与Solr的Celery分布式任务可观测性管道


我们的一个核心业务严重依赖Celery进行异步任务处理,随着系统复杂度的攀升,这套体系逐渐变成了一个难以捉摸的“黑盒”。当任务失败时,追溯其根源就像大海捞针;性能瓶颈隐藏在成千上万个执行节点中,无法定位;更关键的是,审计需求要求我们能明确追踪到每个任务链的触发源头,而我们现有的日志系统只是一堆散乱的、无关联的文本。问题的核心是缺乏一个集中的、结构化的、带有身份上下文的可观测性系统。

初步的构想很简单:将所有Celery worker的日志集中起来。但这远远不够。单纯的日志聚合只会制造一个更大的信息垃圾场。我们需要的是一个能够回答以下问题的系统:

  1. 对于一个特定的业务流程(可能跨越多个任务),其完整的执行链路是怎样的?
  2. 哪个任务最耗时?在哪个worker上执行失败了?
  3. 一个任务是由哪个用户或服务触发的?其权限上下文是什么?

为了解决这个痛点,我们设计并实现了一套基于Terraform自动化部署的、集成了OIDC身份认证、Fluentd日志收集、以及Solr作为索引后端的分布式任务可观测性管道。

# 架构选型与决策

这个技术栈的选择并非偶然,而是经过了审慎的考量。

  • Terraform: 在生产环境中,任何基础设施的变更都必须是可追溯、可复现的。手动配置虚拟机、安装软件的方式是灾难的开始。Terraform让我们能用代码来定义和管理整个可观测性平台,从Solr集群到Fluentd的配置,确保了环境的一致性和自动化部署。
  • Fluentd: 它的插件生态系统是无与伦比的。我们需要解析Celery输出的JSON日志,需要从日志中提取字段,需要添加额外的元数据(如trace_id),最后还需要可靠地将数据发送到Solr。Fluentd的in_tail, filter_parser, filter_record_transformer, 和 out_solr 插件组合完美地满足了这些需求。
  • Apache Solr: 团队内部已经有维护Solr集群的经验,复用现有技术栈能降低认知和运维成本。更重要的是,Solr强大的Schema定义能力让我们能够强制实施结构化日志的规范。相比于schema-on-read的方案,我们更倾向于schema-on-write,这能在数据入口处就保证质量,为后续的精确查询和聚合分析打下坚实基础。
  • Celery: 这是我们要观测的对象,它的信号机制和自定义Task基类的能力为我们注入追踪逻辑提供了入口。
  • OpenID Connect (OIDC): 这是整个方案的画龙点睛之笔。我们不仅需要追踪任务,还需要追踪“谁”触发了任务。通过引入OIDC,我们可以为触发任务的服务或用户颁发JWT。这个JWT不仅用于认证,其Claims中还携带了身份信息、会话ID等关键审计上下文。我们将这些信息注入到每一条日志中,实现了端到端的身份追踪。

下面是整个数据流的架构图:

graph TD
    subgraph "Celery Worker Node"
        A[Celery Task] -- writes structured log --> B(File Log: /var/log/celery.log)
    end

    subgraph "Observability Pipeline"
        C[Fluentd Agent] -- tails log --> B
        C -- parses & enriches --> D{Fluentd Filters}
        D -- adds trace_id, jwt_claims --> D
        D -- sends to --> E[SolrCloud Cluster]
    end

    subgraph "Triggering Service"
        F[API Service] -- obtains OIDC token --> G[Identity Provider]
        F -- calls task with token --> A
    end

    subgraph "Analysis & Auditing"
        H[Auditor/Developer] -- queries logs --> E
    end

    A --> |propagates context| A

# 基础设施即代码:用Terraform定义一切

我们首先从基础设施层开始。一个可靠的系统必须建立在稳固、自动化的基石之上。

1. 定义SolrCloud集群

我们使用AWS EC2来部署SolrCloud,并用Zookeeper进行协调。为了简化,这里展示一个基础的、非生产强化的Terraform配置。在真实项目中,我们会使用更复杂的模块,包含Auto Scaling Group、ELB和更精细的安全组规则。

solr-cluster.tf:

# 声明变量,增加复用性
variable "instance_count" {
  description = "Number of Solr instances"
  type        = number
  default     = 3
}

variable "instance_type" {
  description = "EC2 instance type for Solr nodes"
  type        = string
  default     = "t3.large"
}

variable "ami_id" {
  description = "AMI for Solr nodes (pre-baked with Solr & Java)"
  type        = string
  # 实际项目中应使用 Packer 构建的自定义 AMI
  default     = "ami-0c55b159cbfafe1f0"
}

# 安全组,仅允许内部流量和SSH
resource "aws_security_group" "solr_sg" {
  name        = "solr-cluster-sg"
  description = "Allow traffic within the Solr cluster"

  ingress {
    from_port   = 22
    to_port     = 22
    protocol    = "tcp"
    cidr_blocks = ["YOUR_BASTION_IP/32"] # 堡垒机IP
  }

  ingress {
    from_port   = 8983 // Solr port
    to_port     = 8983
    protocol    = "tcp"
    self        = true # 允许组内互访
  }

  ingress {
    from_port   = 2181 // Zookeeper port
    to_port     = 2181
    protocol    = "tcp"
    self        = true
  }
  
  # 允许 Fluentd 节点的流量进入
  # 实际项目中应使用 security_group_id
  ingress {
    from_port = 8983
    to_port = 8983
    protocol = "tcp"
    cidr_blocks = ["FLUENTD_AGENT_SUBNET/24"]
  }

  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }
}

# 创建多个Solr实例
resource "aws_instance" "solr_node" {
  count         = var.instance_count
  ami           = var.ami_id
  instance_type = var.instance_type
  security_groups = [aws_security_group.solr_sg.name]

  tags = {
    Name = "solr-node-${count.index}"
    Project = "Observability"
  }

  # User data 脚本用于启动 Solr,并指向 Zookeeper ensemble
  # 在真实项目中,Zookeeper 也应由 Terraform 管理
  user_data = <<-EOF
              #!/bin/bash
              /opt/solr/bin/solr start -c -p 8983 -z "zk1:2181,zk2:2181,zk3:2181"
              EOF
}

这里的坑在于,user_data过于简单。生产环境的启动脚本需要处理更复杂的逻辑,例如等待Zookeeper可用、设置JVM参数等。此外,Zookeeper集群本身也应该通过Terraform进行管理。

2. Solr Schema定义

在Solr中创建collection之前,我们必须定义一个严格的Schema。这是保证数据质量的关键。我们将schema定义文件schema.xml通过配置管理工具(如Ansible)或在AMI构建时分发到Solr节点。

celery_logs_schema.xml:

<?xml version="1.0" encoding="UTF-8" ?>
<schema name="celery-logs" version="1.6">
    <!-- 关键字段定义 -->
    <field name="id" type="string" indexed="true" stored="true" required="true" multiValued="false" />
    <field name="timestamp" type="pdate" indexed="true" stored="true" default="NOW" />
    <field name="trace_id" type="string" indexed="true" stored="true" />
    <field name="task_id" type="string" indexed="true" stored="true" />
    <field name="task_name" type="string" indexed="true" stored="true" />
    <field name="worker_hostname" type="string" indexed="true" stored="true" />
    <field name="log_level" type="string" indexed="true" stored="true" />
    <field name="message" type="text_general" indexed="true" stored="true" />
    <field name="exception" type="text_general" indexed="true" stored="true" />

    <!-- 动态字段,用于存储所有JWT Claims -->
    <!-- 这样我们就不需要为每个claim预定义字段 -->
    <dynamicField name="jwt_claim_*" type="string" indexed="true" stored="true"/>

    <!-- 确保 trace_id, task_id 和 hostname 能被快速过滤 -->
    <copyField source="trace_id" dest="trace_id_str"/>
    <copyField source="task_id" dest="task_id_str"/>
    <copyField source="worker_hostname" dest="hostname_str"/>

    <fieldType name="pdate" class="solr.DatePointField" docValues="true"/>
    <fieldType name="string" class="solr.StrField" sortMissingLast="true" docValues="true" />
    <fieldType name="text_general" class="solr.TextField" positionIncrementGap="100">
      <analyzer>
        <tokenizer class="solr.StandardTokenizerFactory"/>
        <filter class="solr.LowerCaseFilterFactory"/>
      </analyzer>
    </fieldType>

    <uniqueKey>id</uniqueKey>
</schema>

我们使用了动态字段 jwt_claim_*,这是一个非常实用的技巧。它允许我们将JWT中所有Claims(如sub, iss, aud等)自动索引为jwt_claim_sub, jwt_claim_iss等字段,而无需预先在schema中声明,极大地增强了灵活性。

# 改造应用层:让Celery说“结构化语言”

仅仅有基础设施是不够的,我们必须改造Celery应用,让它产生我们需要的结构化数据。

1. 输出JSON格式日志

第一步是让Celery的日志记录器停止输出纯文本,改为输出JSON。我们通过自定义Python的logging.Formatter来实现。

json_formatter.py:

import logging
import json
from datetime import datetime

class JsonFormatter(logging.Formatter):
    """
    自定义 Formatter,将日志记录输出为JSON格式。
    """
    def format(self, record):
        log_record = {
            "timestamp": self.formatTime(record, self.datefmt),
            "log_level": record.levelname,
            "message": record.getMessage(),
            "task_id": getattr(record, "task_id", "N/A"),
            "task_name": getattr(record, "task_name", "N/A"),
            "trace_id": getattr(record, "trace_id", "N/A"),
        }

        if record.exc_info:
            log_record["exception"] = self.formatException(record.exc_info)

        # 添加来自JWT的上下文
        if hasattr(record, "jwt_claims"):
            for key, value in record.jwt_claims.items():
                log_record[f"jwt_claim_{key}"] = value
        
        return json.dumps(log_record)

# 在Celery配置中应用这个Formatter
# celery_config.py
# from celery_app.json_formatter import JsonFormatter
#
# handler = logging.FileHandler('/var/log/celery/app.log')
# handler.setFormatter(JsonFormatter())
#
# # 获取 celery.task logger 并添加 handler
# task_logger = logging.getLogger('celery.task')
# task_logger.addHandler(handler)
# task_logger.setLevel(logging.INFO)

这段代码的核心是format方法,它将LogRecord对象转换成一个字典,然后序列化为JSON。注意我们为task_id, trace_id等预留了位置。

2. 注入追踪与身份上下文

如何将trace_id和OIDC上下文注入到日志中?我们通过创建一个自定义的Celery Task基类来优雅地解决这个问题。所有业务任务都将继承自这个基类。

context_task.py:

import uuid
import jwt
from celery import Task
from celery.signals import before_task_publish, task_prerun
from logging import getLogger
from threading import local

# 使用 ThreadLocal 来安全地在单个线程(即单个任务执行)中存储上下文
_task_context = local()
_task_context.trace_id = None
_task_context.jwt_claims = {}

logger = getLogger(__name__)

# 一个简化的OIDC公钥获取器,生产环境应有缓存和错误处理
def get_oidc_public_key(issuer_url, kid):
    # In a real scenario, this would fetch keys from the JWKS endpoint
    # and cache them.
    # For this example, we'll use a placeholder.
    # e.g., requests.get(f"{issuer_url}/.well-known/jwks.json")
    return "YOUR_OIDC_PUBLIC_KEY_PEM_STRING"

@before_task_publish.connect
def propagate_trace_context(sender=None, headers=None, body=None, **kwargs):
    """
    在任务发布前,将当前上下文注入到任务头中。
    """
    if _task_context.trace_id:
        headers.setdefault('trace_id', _task_context.trace_id)
    # JWT token 也通过 headers 传递
    if 'oidc_token' in headers:
         # No need to do anything, it's already there
         pass

@task_prerun.connect
def load_trace_context(sender=None, task_id=None, task=None, args=None, kwargs=None, **opts):
    """
    在任务执行前,从任务请求头中加载上下文。
    """
    request = task.request
    trace_id = request.headers.get('trace_id') if request.headers else None
    
    _task_context.trace_id = trace_id or str(uuid.uuid4())
    _task_context.jwt_claims = {}

    oidc_token = request.headers.get('oidc_token') if request.headers else None
    if oidc_token:
        try:
            # 这里的坑:每次都验证JWT会带来性能开销。
            # 优化方案:可以在网关层或任务入口处验证一次,后续任务仅透传解码后的claims。
            # 但为了审计的完整性,保留每个任务的验证逻辑更安全。
            header = jwt.get_unverified_header(oidc_token)
            public_key = get_oidc_public_key("YOUR_ISSUER_URL", header['kid'])
            
            # 验证签名、签发者、受众等
            claims = jwt.decode(
                oidc_token,
                public_key,
                algorithms=["RS256"],
                issuer="YOUR_ISSUER_URL",
                audience="YOUR_AUDIENCE"
            )
            _task_context.jwt_claims = claims
        except jwt.PyJWTError as e:
            logger.error(f"OIDC token validation failed for task {task.name}: {e}")
            # 根据安全策略决定是否中止任务
            # raise SecurityException("Invalid OIDC token")


class AuditableTask(Task):
    """
    我们所有业务Task的基类,自动处理日志上下文。
    """
    def __call__(self, *args, **kwargs):
        # 将上下文绑定到 logger record factory
        old_factory = getLogger().getLogRecordFactory()

        def record_factory(*args, **kwargs):
            record = old_factory(*args, **kwargs)
            record.task_id = self.request.id
            record.task_name = self.name
            record.trace_id = _task_context.trace_id
            record.jwt_claims = _task_context.jwt_claims
            return record
        
        getLogger().setLogRecordFactory(record_factory)

        try:
            result = super().__call__(*args, **kwargs)
        finally:
            # 恢复原始 factory,避免影响其他非任务代码
            getLogger().setLogRecordFactory(old_factory)
        
        return result

# 示例任务
# from celery_app import app
# from celery_app.context_task import AuditableTask

# @app.task(base=AuditableTask, bind=True)
# def process_order(self, order_id, oidc_token=None):
#     logger = getLogger(__name__)
#     logger.info(f"Processing order {order_id}")
#     # ... 业务逻辑 ...
#     if some_condition:
#         logger.warning("Potential issue detected.")
#     # 调用子任务,上下文会自动传播
#     another_task.apply_async(args=[...], headers={'oidc_token': oidc_token})

这段代码是整个方案的核心。

  1. _task_context 使用 threading.local() 来存储当前任务的上下文,保证了线程安全。
  2. before_task_publish 信号确保当一个任务调用另一个任务时,trace_idoidc_token 会被自动放入消息头,实现上下文传播。
  3. task_prerun 信号在任务执行前,从消息头中提取这些信息,并解码OIDC token,填充到_task_context中。如果这是一个任务链的起点,它会生成一个新的trace_id
  4. AuditableTask 基类重写了 __call__ 方法,通过 setLogRecordFactory 这个高级技巧,动态地将上下文信息注入到该任务生命周期内产生的所有LogRecord对象中。这是比使用LoggerAdapterextra参数更彻底、更无侵入性的方法。

# 数据管道:Fluentd的粘合艺术

现在,Celery正在生成我们想要的JSON日志,下一步就是用Fluentd捕获、处理并发送它们。

fluent.conf:

# Source: 监听Celery日志文件
<source>
  @type tail
  path /var/log/celery/app.log
  pos_file /var/log/td-agent/celery-app.log.pos
  tag celery.app
  <parse>
    @type json
  </parse>
</source>

# Filter: 添加 worker 主机名
<filter celery.app>
  @type record_transformer
  <record>
    worker_hostname "#{ENV['HOSTNAME']}"
  </record>
</filter>

# Match: 发送到Solr
<match celery.app>
  @type solr
  
  # Solr连接信息
  url http://solr-node-0.internal:8983/solr
  
  # 目标 collection
  collection celery_logs

  # Schema字段定义
  # Fluentd 插件会自动将 record 的 key 映射到 Solr 字段
  # 我们需要确保字段名一致
  defined_fields ["id", "timestamp", "trace_id", "task_id", "task_name", "worker_hostname", "log_level", "message", "exception"]

  # 唯一ID,防止重复
  unique_key_field id
  
  # 使用 record 中的 timestamp
  time_field timestamp
  time_format %Y-%m-%dT%H:%M:%S,%L%z

  # 缓冲区配置,对于生产环境至关重要
  <buffer>
    @type file
    path /var/log/td-agent/buffer/solr
    flush_interval 10s
    retry_max_interval 300
    retry_forever true
    chunk_limit_size 2M
  </buffer>
</match>

这个配置非常直观:

  1. source 插件 tail 持续监控日志文件,并使用 json 解析器将每一行转换为一个事件记录。
  2. filter 插件 record_transformer 在每个记录上添加了 worker_hostname 字段。我们通过环境变量获取主机名,这在容器化环境中尤其方便。
  3. match 插件 solr 负责将数据发送到Solr。这里的buffer配置是生产环境的关键,它能在Solr暂时不可用时将日志缓存在本地磁盘,并在恢复后重试,保证了数据的可靠性。一个常见的错误是忽略缓冲区配置,这会导致网络抖动或后端故障时丢失大量日志。

# 最终成果:可审计、可追踪的任务系统

部署完这套系统后,我们的运维和审计能力得到了质的飞跃。当需要调查一个失败的订单处理流程时,我们不再是无头苍蝇。

  1. 全链路追踪:我们只需获取到该流程的trace_id,就可以在Solr中执行查询:
    q=trace_id:"f47ac10b-58cc-4372-a567-0e02b2c3d479"&sort=timestamp asc
    这将返回该trace_id下所有任务的所有日志,按时间排序,清晰地展示了整个执行链条。

  2. 性能瓶颈分析:我们可以通过聚合查询来分析哪些任务平均耗时最长:
    q=*:*&json.facet={ tasks:{ type:terms, field:task_name, facet:{ avg_duration:"avg(duration_ms)" } } }
    (假设我们也在日志中记录了任务执行时间 duration_ms)

  3. 安全审计:当需要确定某个敏感操作是由谁触发时,我们可以查询JWT的sub(subject) claim:
    q=task_name:"delete_user_data" AND jwt_claim_sub:"user-id-123"
    这能精确定位到由user-id-123触发的所有delete_user_data任务的日志,为合规性和安全审计提供了强有力的证据。

# 局限性与未来展望

尽管这套方案解决了我们眼前的核心痛点,但它并非完美无缺。当前的实现存在一些局限性,也是我们下一步迭代的方向。

首先,Solr集群的管理和扩容相对复杂。随着日志量的指数级增长,我们需要更自动化的sharding策略和更精细的性能调优。探索将日志数据冷热分离,将旧日志归档到成本更低的存储(如S3)是一个必须考虑的成本优化方向。

其次,OIDC token在每个任务中都进行一次完整的公钥获取和验证,在高吞吐量场景下可能会成为性能瓶颈。一个可行的优化路径是引入一个轻量级的内部服务或sidecar,专门负责缓存JWKS并进行token验证,业务任务只与这个可信的服务交互。

最后,当前的方案主要解决了日志和身份追踪的问题。为了构建更全面的可观测性体系,我们应当引入分布式追踪标准,如OpenTelemetry。将trace_id替换为W3C Trace Context标准的traceparent,并将日志与Metrics、Traces关联起来,才能真正实现可观测性的“三位一体”,让我们对这个复杂的分布式系统有更深刻的洞察。


  目录