我们的一个核心业务严重依赖Celery进行异步任务处理,随着系统复杂度的攀升,这套体系逐渐变成了一个难以捉摸的“黑盒”。当任务失败时,追溯其根源就像大海捞针;性能瓶颈隐藏在成千上万个执行节点中,无法定位;更关键的是,审计需求要求我们能明确追踪到每个任务链的触发源头,而我们现有的日志系统只是一堆散乱的、无关联的文本。问题的核心是缺乏一个集中的、结构化的、带有身份上下文的可观测性系统。
初步的构想很简单:将所有Celery worker的日志集中起来。但这远远不够。单纯的日志聚合只会制造一个更大的信息垃圾场。我们需要的是一个能够回答以下问题的系统:
- 对于一个特定的业务流程(可能跨越多个任务),其完整的执行链路是怎样的?
- 哪个任务最耗时?在哪个worker上执行失败了?
- 一个任务是由哪个用户或服务触发的?其权限上下文是什么?
为了解决这个痛点,我们设计并实现了一套基于Terraform自动化部署的、集成了OIDC身份认证、Fluentd日志收集、以及Solr作为索引后端的分布式任务可观测性管道。
# 架构选型与决策
这个技术栈的选择并非偶然,而是经过了审慎的考量。
- Terraform: 在生产环境中,任何基础设施的变更都必须是可追溯、可复现的。手动配置虚拟机、安装软件的方式是灾难的开始。Terraform让我们能用代码来定义和管理整个可观测性平台,从Solr集群到Fluentd的配置,确保了环境的一致性和自动化部署。
- Fluentd: 它的插件生态系统是无与伦比的。我们需要解析Celery输出的JSON日志,需要从日志中提取字段,需要添加额外的元数据(如
trace_id),最后还需要可靠地将数据发送到Solr。Fluentd的in_tail,filter_parser,filter_record_transformer, 和out_solr插件组合完美地满足了这些需求。 - Apache Solr: 团队内部已经有维护Solr集群的经验,复用现有技术栈能降低认知和运维成本。更重要的是,Solr强大的Schema定义能力让我们能够强制实施结构化日志的规范。相比于schema-on-read的方案,我们更倾向于schema-on-write,这能在数据入口处就保证质量,为后续的精确查询和聚合分析打下坚实基础。
- Celery: 这是我们要观测的对象,它的信号机制和自定义Task基类的能力为我们注入追踪逻辑提供了入口。
- OpenID Connect (OIDC): 这是整个方案的画龙点睛之笔。我们不仅需要追踪任务,还需要追踪“谁”触发了任务。通过引入OIDC,我们可以为触发任务的服务或用户颁发JWT。这个JWT不仅用于认证,其Claims中还携带了身份信息、会话ID等关键审计上下文。我们将这些信息注入到每一条日志中,实现了端到端的身份追踪。
下面是整个数据流的架构图:
graph TD
subgraph "Celery Worker Node"
A[Celery Task] -- writes structured log --> B(File Log: /var/log/celery.log)
end
subgraph "Observability Pipeline"
C[Fluentd Agent] -- tails log --> B
C -- parses & enriches --> D{Fluentd Filters}
D -- adds trace_id, jwt_claims --> D
D -- sends to --> E[SolrCloud Cluster]
end
subgraph "Triggering Service"
F[API Service] -- obtains OIDC token --> G[Identity Provider]
F -- calls task with token --> A
end
subgraph "Analysis & Auditing"
H[Auditor/Developer] -- queries logs --> E
end
A --> |propagates context| A
# 基础设施即代码:用Terraform定义一切
我们首先从基础设施层开始。一个可靠的系统必须建立在稳固、自动化的基石之上。
1. 定义SolrCloud集群
我们使用AWS EC2来部署SolrCloud,并用Zookeeper进行协调。为了简化,这里展示一个基础的、非生产强化的Terraform配置。在真实项目中,我们会使用更复杂的模块,包含Auto Scaling Group、ELB和更精细的安全组规则。
solr-cluster.tf:
# 声明变量,增加复用性
variable "instance_count" {
description = "Number of Solr instances"
type = number
default = 3
}
variable "instance_type" {
description = "EC2 instance type for Solr nodes"
type = string
default = "t3.large"
}
variable "ami_id" {
description = "AMI for Solr nodes (pre-baked with Solr & Java)"
type = string
# 实际项目中应使用 Packer 构建的自定义 AMI
default = "ami-0c55b159cbfafe1f0"
}
# 安全组,仅允许内部流量和SSH
resource "aws_security_group" "solr_sg" {
name = "solr-cluster-sg"
description = "Allow traffic within the Solr cluster"
ingress {
from_port = 22
to_port = 22
protocol = "tcp"
cidr_blocks = ["YOUR_BASTION_IP/32"] # 堡垒机IP
}
ingress {
from_port = 8983 // Solr port
to_port = 8983
protocol = "tcp"
self = true # 允许组内互访
}
ingress {
from_port = 2181 // Zookeeper port
to_port = 2181
protocol = "tcp"
self = true
}
# 允许 Fluentd 节点的流量进入
# 实际项目中应使用 security_group_id
ingress {
from_port = 8983
to_port = 8983
protocol = "tcp"
cidr_blocks = ["FLUENTD_AGENT_SUBNET/24"]
}
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
}
# 创建多个Solr实例
resource "aws_instance" "solr_node" {
count = var.instance_count
ami = var.ami_id
instance_type = var.instance_type
security_groups = [aws_security_group.solr_sg.name]
tags = {
Name = "solr-node-${count.index}"
Project = "Observability"
}
# User data 脚本用于启动 Solr,并指向 Zookeeper ensemble
# 在真实项目中,Zookeeper 也应由 Terraform 管理
user_data = <<-EOF
#!/bin/bash
/opt/solr/bin/solr start -c -p 8983 -z "zk1:2181,zk2:2181,zk3:2181"
EOF
}
这里的坑在于,user_data过于简单。生产环境的启动脚本需要处理更复杂的逻辑,例如等待Zookeeper可用、设置JVM参数等。此外,Zookeeper集群本身也应该通过Terraform进行管理。
2. Solr Schema定义
在Solr中创建collection之前,我们必须定义一个严格的Schema。这是保证数据质量的关键。我们将schema定义文件schema.xml通过配置管理工具(如Ansible)或在AMI构建时分发到Solr节点。
celery_logs_schema.xml:
<?xml version="1.0" encoding="UTF-8" ?>
<schema name="celery-logs" version="1.6">
<!-- 关键字段定义 -->
<field name="id" type="string" indexed="true" stored="true" required="true" multiValued="false" />
<field name="timestamp" type="pdate" indexed="true" stored="true" default="NOW" />
<field name="trace_id" type="string" indexed="true" stored="true" />
<field name="task_id" type="string" indexed="true" stored="true" />
<field name="task_name" type="string" indexed="true" stored="true" />
<field name="worker_hostname" type="string" indexed="true" stored="true" />
<field name="log_level" type="string" indexed="true" stored="true" />
<field name="message" type="text_general" indexed="true" stored="true" />
<field name="exception" type="text_general" indexed="true" stored="true" />
<!-- 动态字段,用于存储所有JWT Claims -->
<!-- 这样我们就不需要为每个claim预定义字段 -->
<dynamicField name="jwt_claim_*" type="string" indexed="true" stored="true"/>
<!-- 确保 trace_id, task_id 和 hostname 能被快速过滤 -->
<copyField source="trace_id" dest="trace_id_str"/>
<copyField source="task_id" dest="task_id_str"/>
<copyField source="worker_hostname" dest="hostname_str"/>
<fieldType name="pdate" class="solr.DatePointField" docValues="true"/>
<fieldType name="string" class="solr.StrField" sortMissingLast="true" docValues="true" />
<fieldType name="text_general" class="solr.TextField" positionIncrementGap="100">
<analyzer>
<tokenizer class="solr.StandardTokenizerFactory"/>
<filter class="solr.LowerCaseFilterFactory"/>
</analyzer>
</fieldType>
<uniqueKey>id</uniqueKey>
</schema>
我们使用了动态字段 jwt_claim_*,这是一个非常实用的技巧。它允许我们将JWT中所有Claims(如sub, iss, aud等)自动索引为jwt_claim_sub, jwt_claim_iss等字段,而无需预先在schema中声明,极大地增强了灵活性。
# 改造应用层:让Celery说“结构化语言”
仅仅有基础设施是不够的,我们必须改造Celery应用,让它产生我们需要的结构化数据。
1. 输出JSON格式日志
第一步是让Celery的日志记录器停止输出纯文本,改为输出JSON。我们通过自定义Python的logging.Formatter来实现。
json_formatter.py:
import logging
import json
from datetime import datetime
class JsonFormatter(logging.Formatter):
"""
自定义 Formatter,将日志记录输出为JSON格式。
"""
def format(self, record):
log_record = {
"timestamp": self.formatTime(record, self.datefmt),
"log_level": record.levelname,
"message": record.getMessage(),
"task_id": getattr(record, "task_id", "N/A"),
"task_name": getattr(record, "task_name", "N/A"),
"trace_id": getattr(record, "trace_id", "N/A"),
}
if record.exc_info:
log_record["exception"] = self.formatException(record.exc_info)
# 添加来自JWT的上下文
if hasattr(record, "jwt_claims"):
for key, value in record.jwt_claims.items():
log_record[f"jwt_claim_{key}"] = value
return json.dumps(log_record)
# 在Celery配置中应用这个Formatter
# celery_config.py
# from celery_app.json_formatter import JsonFormatter
#
# handler = logging.FileHandler('/var/log/celery/app.log')
# handler.setFormatter(JsonFormatter())
#
# # 获取 celery.task logger 并添加 handler
# task_logger = logging.getLogger('celery.task')
# task_logger.addHandler(handler)
# task_logger.setLevel(logging.INFO)
这段代码的核心是format方法,它将LogRecord对象转换成一个字典,然后序列化为JSON。注意我们为task_id, trace_id等预留了位置。
2. 注入追踪与身份上下文
如何将trace_id和OIDC上下文注入到日志中?我们通过创建一个自定义的Celery Task基类来优雅地解决这个问题。所有业务任务都将继承自这个基类。
context_task.py:
import uuid
import jwt
from celery import Task
from celery.signals import before_task_publish, task_prerun
from logging import getLogger
from threading import local
# 使用 ThreadLocal 来安全地在单个线程(即单个任务执行)中存储上下文
_task_context = local()
_task_context.trace_id = None
_task_context.jwt_claims = {}
logger = getLogger(__name__)
# 一个简化的OIDC公钥获取器,生产环境应有缓存和错误处理
def get_oidc_public_key(issuer_url, kid):
# In a real scenario, this would fetch keys from the JWKS endpoint
# and cache them.
# For this example, we'll use a placeholder.
# e.g., requests.get(f"{issuer_url}/.well-known/jwks.json")
return "YOUR_OIDC_PUBLIC_KEY_PEM_STRING"
@before_task_publish.connect
def propagate_trace_context(sender=None, headers=None, body=None, **kwargs):
"""
在任务发布前,将当前上下文注入到任务头中。
"""
if _task_context.trace_id:
headers.setdefault('trace_id', _task_context.trace_id)
# JWT token 也通过 headers 传递
if 'oidc_token' in headers:
# No need to do anything, it's already there
pass
@task_prerun.connect
def load_trace_context(sender=None, task_id=None, task=None, args=None, kwargs=None, **opts):
"""
在任务执行前,从任务请求头中加载上下文。
"""
request = task.request
trace_id = request.headers.get('trace_id') if request.headers else None
_task_context.trace_id = trace_id or str(uuid.uuid4())
_task_context.jwt_claims = {}
oidc_token = request.headers.get('oidc_token') if request.headers else None
if oidc_token:
try:
# 这里的坑:每次都验证JWT会带来性能开销。
# 优化方案:可以在网关层或任务入口处验证一次,后续任务仅透传解码后的claims。
# 但为了审计的完整性,保留每个任务的验证逻辑更安全。
header = jwt.get_unverified_header(oidc_token)
public_key = get_oidc_public_key("YOUR_ISSUER_URL", header['kid'])
# 验证签名、签发者、受众等
claims = jwt.decode(
oidc_token,
public_key,
algorithms=["RS256"],
issuer="YOUR_ISSUER_URL",
audience="YOUR_AUDIENCE"
)
_task_context.jwt_claims = claims
except jwt.PyJWTError as e:
logger.error(f"OIDC token validation failed for task {task.name}: {e}")
# 根据安全策略决定是否中止任务
# raise SecurityException("Invalid OIDC token")
class AuditableTask(Task):
"""
我们所有业务Task的基类,自动处理日志上下文。
"""
def __call__(self, *args, **kwargs):
# 将上下文绑定到 logger record factory
old_factory = getLogger().getLogRecordFactory()
def record_factory(*args, **kwargs):
record = old_factory(*args, **kwargs)
record.task_id = self.request.id
record.task_name = self.name
record.trace_id = _task_context.trace_id
record.jwt_claims = _task_context.jwt_claims
return record
getLogger().setLogRecordFactory(record_factory)
try:
result = super().__call__(*args, **kwargs)
finally:
# 恢复原始 factory,避免影响其他非任务代码
getLogger().setLogRecordFactory(old_factory)
return result
# 示例任务
# from celery_app import app
# from celery_app.context_task import AuditableTask
# @app.task(base=AuditableTask, bind=True)
# def process_order(self, order_id, oidc_token=None):
# logger = getLogger(__name__)
# logger.info(f"Processing order {order_id}")
# # ... 业务逻辑 ...
# if some_condition:
# logger.warning("Potential issue detected.")
# # 调用子任务,上下文会自动传播
# another_task.apply_async(args=[...], headers={'oidc_token': oidc_token})
这段代码是整个方案的核心。
-
_task_context使用threading.local()来存储当前任务的上下文,保证了线程安全。 -
before_task_publish信号确保当一个任务调用另一个任务时,trace_id和oidc_token会被自动放入消息头,实现上下文传播。 -
task_prerun信号在任务执行前,从消息头中提取这些信息,并解码OIDC token,填充到_task_context中。如果这是一个任务链的起点,它会生成一个新的trace_id。 -
AuditableTask基类重写了__call__方法,通过setLogRecordFactory这个高级技巧,动态地将上下文信息注入到该任务生命周期内产生的所有LogRecord对象中。这是比使用LoggerAdapter或extra参数更彻底、更无侵入性的方法。
# 数据管道:Fluentd的粘合艺术
现在,Celery正在生成我们想要的JSON日志,下一步就是用Fluentd捕获、处理并发送它们。
fluent.conf:
# Source: 监听Celery日志文件
<source>
@type tail
path /var/log/celery/app.log
pos_file /var/log/td-agent/celery-app.log.pos
tag celery.app
<parse>
@type json
</parse>
</source>
# Filter: 添加 worker 主机名
<filter celery.app>
@type record_transformer
<record>
worker_hostname "#{ENV['HOSTNAME']}"
</record>
</filter>
# Match: 发送到Solr
<match celery.app>
@type solr
# Solr连接信息
url http://solr-node-0.internal:8983/solr
# 目标 collection
collection celery_logs
# Schema字段定义
# Fluentd 插件会自动将 record 的 key 映射到 Solr 字段
# 我们需要确保字段名一致
defined_fields ["id", "timestamp", "trace_id", "task_id", "task_name", "worker_hostname", "log_level", "message", "exception"]
# 唯一ID,防止重复
unique_key_field id
# 使用 record 中的 timestamp
time_field timestamp
time_format %Y-%m-%dT%H:%M:%S,%L%z
# 缓冲区配置,对于生产环境至关重要
<buffer>
@type file
path /var/log/td-agent/buffer/solr
flush_interval 10s
retry_max_interval 300
retry_forever true
chunk_limit_size 2M
</buffer>
</match>
这个配置非常直观:
-
source插件tail持续监控日志文件,并使用json解析器将每一行转换为一个事件记录。 -
filter插件record_transformer在每个记录上添加了worker_hostname字段。我们通过环境变量获取主机名,这在容器化环境中尤其方便。 -
match插件solr负责将数据发送到Solr。这里的buffer配置是生产环境的关键,它能在Solr暂时不可用时将日志缓存在本地磁盘,并在恢复后重试,保证了数据的可靠性。一个常见的错误是忽略缓冲区配置,这会导致网络抖动或后端故障时丢失大量日志。
# 最终成果:可审计、可追踪的任务系统
部署完这套系统后,我们的运维和审计能力得到了质的飞跃。当需要调查一个失败的订单处理流程时,我们不再是无头苍蝇。
全链路追踪:我们只需获取到该流程的
trace_id,就可以在Solr中执行查询:
q=trace_id:"f47ac10b-58cc-4372-a567-0e02b2c3d479"&sort=timestamp asc
这将返回该trace_id下所有任务的所有日志,按时间排序,清晰地展示了整个执行链条。性能瓶颈分析:我们可以通过聚合查询来分析哪些任务平均耗时最长:
q=*:*&json.facet={ tasks:{ type:terms, field:task_name, facet:{ avg_duration:"avg(duration_ms)" } } }
(假设我们也在日志中记录了任务执行时间duration_ms)安全审计:当需要确定某个敏感操作是由谁触发时,我们可以查询JWT的
sub(subject) claim:
q=task_name:"delete_user_data" AND jwt_claim_sub:"user-id-123"
这能精确定位到由user-id-123触发的所有delete_user_data任务的日志,为合规性和安全审计提供了强有力的证据。
# 局限性与未来展望
尽管这套方案解决了我们眼前的核心痛点,但它并非完美无缺。当前的实现存在一些局限性,也是我们下一步迭代的方向。
首先,Solr集群的管理和扩容相对复杂。随着日志量的指数级增长,我们需要更自动化的sharding策略和更精细的性能调优。探索将日志数据冷热分离,将旧日志归档到成本更低的存储(如S3)是一个必须考虑的成本优化方向。
其次,OIDC token在每个任务中都进行一次完整的公钥获取和验证,在高吞吐量场景下可能会成为性能瓶颈。一个可行的优化路径是引入一个轻量级的内部服务或sidecar,专门负责缓存JWKS并进行token验证,业务任务只与这个可信的服务交互。
最后,当前的方案主要解决了日志和身份追踪的问题。为了构建更全面的可观测性体系,我们应当引入分布式追踪标准,如OpenTelemetry。将trace_id替换为W3C Trace Context标准的traceparent,并将日志与Metrics、Traces关联起来,才能真正实现可观测性的“三位一体”,让我们对这个复杂的分布式系统有更深刻的洞察。