构建基于RabbitMQ与OpenTelemetry的可观测异步SSG构建管道


我们维护的一个大型文档中心遇到了瓶颈。它由超过一万个Markdown文件构成,通过Next.js的SSG(静态站点生成)功能构建。最初的方案是简单地通过GitLab的webhook触发一个bash脚本,git pull然后npm run build。这个方案在项目初期运行良好,但随着文档规模和更新频率的增加,问题开始集中爆发:

  1. 构建耦合与阻塞: Webhook直接触发构建,如果短时间内有多次内容提交,会产生多个构建进程,互相争抢CPU和内存资源,导致服务器宕机或构建失败。
  2. 缺乏状态反馈: 触发构建后,编辑人员无法知道构建的状态——是正在排队、正在进行、成功了还是失败了。唯一的反馈途径是去检查网站是否更新,或者去翻阅服务器上杂乱的日志。
  3. 排障困难: 构建失败是常有的事,可能是某个Markdown文件格式错误,也可能是依赖安装失败。定位问题需要SSH到服务器,手动翻阅大量的stdout日志,效率极低。

这个混乱的局面必须终结。我们需要一个解耦的、有状态的、可观测的异步构建系统。

初步构想与技术选型

核心思路是将“触发”与“执行”解耦。Webhook不再直接执行构建,而是发送一个“构建请求”到消息队列。一个或多个独立的Worker进程消费队列中的消息,并执行实际的构建任务。同时,需要一个Web界面来可视化整个流程。

  • 消息队列: RabbitMQ
    我们选择了RabbitMQ而非Redis Stream或Kafka。原因是它成熟、稳定,并且提供了关键特性:消息持久化、ACK机制和死信队列(Dead-Letter Exchange)。这保证了即使Worker崩溃,构建请求也不会丢失。对于构建失败的任务,可以自动转入死信队列,方便后续的人工介入或自动重试。这对于一个生产系统至关重要。

  • 构建Worker: Node.js
    由于我们的SSG工具是Next.js,使用Node.js来编写Worker是最自然的选择。它可以方便地通过child_process模块来调用npm脚本,并能精细地控制子进程的stdio流。

  • 状态与监控面板: Ant Design & React
    我们需要一个内部仪表盘来展示构建队列、历史记录和实时日志。Ant Design提供了构建此类企业级中后台应用所需的所有高质量组件,如Table、Tag、Badge、Spin等,能极大地提升开发效率。

  • 可观测性: OpenTelemetry
    这是整个方案的灵魂。我们需要追踪一个构建请求从被API接收,到进入RabbitMQ,再到被Worker消费,直至构建完成或失败的全过程。OpenTelemetry作为厂商中立的开放标准,提供了分布式追踪(Tracing)、指标(Metrics)和日志(Logs)的统一解决方案,能完美地将这些孤立的组件串联起来,形成完整的调用链。

系统架构

整体流程被设计为:

graph TD
    subgraph "触发源"
        A[内容编辑] -- "保存内容" --> B{CMS/Git}
    end

    B -- "Webhook" --> C[API Gateway]

    subgraph "任务分发 (Producer)"
        C -- "发布构建任务" --> D[RabbitMQ Exchange: build.tasks]
    end

    subgraph "构建执行 (Consumer)"
        E[Build Worker] -- "消费任务" --> F[执行SSG构建]
        F -- "成功/失败" --> G[部署/归档]
    end
    
    subgraph "状态反馈"
        E -- "更新状态" --> H[RabbitMQ Exchange: build.status]
        H --> I[Status Backend]
        I --> J[PostgreSQL]
    end

    subgraph "监控与可观测性"
        C -- "Trace/Metrics" --> K((Observability Backend))
        E -- "Trace/Metrics/Logs" --> K
        L[Dashboard Frontend] -- "查询状态" --> I
    end

    D -- "路由" --> M[Queue: build.queue]
    M -- "pull" --> E
    
    L[AntD Dashboard]
    
    style K fill:#f9f,stroke:#333,stroke-width:2px

核心实现:从代码看细节

1. 可观测性初始化 (Tracer Provider)

在所有服务(API Gateway, Build Worker)的入口,我们都需要初始化OpenTelemetry SDK。这是实现全链路追踪的基石。

common/tracer.js

// common/tracer.js
// 这个模块负责初始化 OpenTelemetry SDK
const { NodeSDK } = require('@opentelemetry/sdk-node');
const { OTLPTraceExporter } = require('@opentelemetry/exporter-trace-otlp-http');
const { Resource } = require('@opentelemetry/resources');
const { SemanticResourceAttributes } = require('@opentelemetry/semantic-conventions');
const { getNodeAutoInstrumentations } = require('@opentelemetry/auto-instrumentations-node');
const { AmqplibInstrumentation } = require('opentelemetry-instrumentation-amqplib');

// 生产环境中,这些配置应该来自环境变量
const OTLP_EXPORTER_URL = process.env.OTLP_EXPORTER_URL || 'http://localhost:4318/v1/traces';
const SERVICE_NAME = process.env.SERVICE_NAME || 'default-service';

// 创建一个 OTLP Exporter 实例
const traceExporter = new OTLPTraceExporter({
  url: OTLP_EXPORTER_URL,
});

const sdk = new NodeSDK({
  resource: new Resource({
    [SemanticResourceAttributes.SERVICE_NAME]: SERVICE_NAME,
  }),
  traceExporter,
  instrumentations: [
    // 自动对 Node.js 常用模块进行埋点
    getNodeAutoInstrumentations({
      '@opentelemetry/instrumentation-fs': {
        enabled: false, // 文件系统操作太频繁,通常需要关闭以减少噪音
      },
    }),
    // 专门为 amqplib (RabbitMQ 客户端) 提供的埋点库
    new AmqplibInstrumentation({
        // hook an amqplib method to extract propagation context
        consumeHook: (span, consumeMessage) => {
            span.setAttribute('messaging.rabbitmq.routing_key', consumeMessage.fields.routingKey);
        },
    }),
  ],
});

// 优雅地关闭 SDK
process.on('SIGTERM', () => {
  sdk.shutdown()
    .then(() => console.log('Tracing terminated'))
    .catch((error) => console.log('Error terminating tracing', error))
    .finally(() => process.exit(0));
});

// 启动 SDK
sdk.start();

console.log(`OpenTelemetry SDK started for service: ${SERVICE_NAME}`);

2. API Gateway: 接收请求并发布任务

这是一个简单的Express服务器,它接收webhook,创建一个顶级的Trace Span,然后将任务发布到RabbitMQ。关键在于它如何将Trace Context注入到RabbitMQ消息中。AmqplibInstrumentation会自动处理这个过程。

api-gateway/server.js

// 设置服务名,必须在 tracer 之前
process.env.SERVICE_NAME = 'api-gateway';
require('../common/tracer'); // 初始化 OTel

const express = require('express');
const amqp = require('amqplib');
const { v4: uuidv4 } = require('uuid');
const opentelemetry = require('@opentelemetry/api');

const app = express();
app.use(express.json());

const RABBITMQ_URL = process.env.RABBITMQ_URL || 'amqp://localhost';
const TASK_EXCHANGE = 'build.tasks';
const TASK_QUEUE = 'build.queue';
const TASK_ROUTING_KEY = 'new.build';

let channel;

async function connectRabbitMQ() {
  try {
    const connection = await amqp.connect(RABBITMQ_URL);
    channel = await connection.createChannel();
    await channel.assertExchange(TASK_EXCHANGE, 'direct', { durable: true });
    await channel.assertQueue(TASK_QUEUE, { 
        durable: true,
        // 配置死信队列,这是生产环境的关键
        deadLetterExchange: 'build.tasks.dlx',
        deadLetterRoutingKey: 'failed.build'
    });
    await channel.bindQueue(TASK_QUEUE, TASK_EXCHANGE, TASK_ROUTING_KEY);
    console.log('RabbitMQ connected and configured.');
  } catch (error) {
    console.error('Failed to connect to RabbitMQ', error);
    process.exit(1);
  }
}

app.post('/v1/build', async (req, res) => {
  const tracer = opentelemetry.trace.getTracer('api-gateway-tracer');
  // 创建一个新的 Span 作为整个构建流程的根 Span
  await tracer.startActiveSpan('api-receive-build-request', async (span) => {
    try {
      const { repositoryUrl, branch } = req.body;
      if (!repositoryUrl || !branch) {
        span.setStatus({ code: opentelemetry.SpanStatusCode.ERROR, message: 'Invalid payload' });
        return res.status(400).json({ error: 'repositoryUrl and branch are required' });
      }

      const buildId = uuidv4();
      const task = { buildId, repositoryUrl, branch, triggerTime: new Date().toISOString() };
      
      span.setAttributes({
        'build.id': buildId,
        'git.repo_url': repositoryUrl,
        'git.branch': branch,
      });

      const messageBuffer = Buffer.from(JSON.stringify(task));

      // OpenTelemetry 的 amqplib instrumentation 会自动注入 trace context 到 headers
      channel.publish(TASK_EXCHANGE, TASK_ROUTING_KEY, messageBuffer, {
        persistent: true, // 消息持久化
        messageId: buildId,
      });

      span.addEvent('Task published to RabbitMQ');
      console.log(`[${buildId}] Build task published for ${repositoryUrl}`);
      res.status(202).json({ message: 'Build task accepted', buildId });
      span.end();

    } catch (error) {
      console.error('Error processing build request:', error);
      span.recordException(error);
      span.setStatus({ code: opentelemetry.SpanStatusCode.ERROR, message: error.message });
      res.status(500).json({ error: 'Internal server error' });
      span.end();
    }
  });
});

const PORT = process.env.PORT || 3000;
app.listen(PORT, async () => {
  await connectRabbitMQ();
  console.log(`API Gateway listening on port ${PORT}`);
});

3. Build Worker: 核心执行者

Worker是系统的核心,它负责消费消息、执行耗时的构建任务,并处理各种成功和失败的情况。

build-worker/worker.js

// 设置服务名
process.env.SERVICE_NAME = 'build-worker';
require('../common/tracer'); // 初始化 OTel

const amqp = require('amqplib');
const { exec } = require('child_process');
const fs = require('fs/promises');
const path = require('path');
const opentelemetry = require('@opentelemetry/api');

const RABBITMQ_URL = process.env.RABBITMQ_URL || 'amqp://localhost';
const TASK_QUEUE = 'build.queue';
const WORKSPACE_DIR = path.join(__dirname, 'workspace');

let channel;

async function executeCommand(command, cwd, span) {
  return new Promise((resolve, reject) => {
    // 记录命令执行的子 Span
    const tracer = opentelemetry.trace.getTracer('command-executor-tracer');
    tracer.startActiveSpan(`exec: ${command.split(' ')[0]}`, (childSpan) => {
        childSpan.setAttribute('command', command);
        childSpan.setAttribute('cwd', cwd);

        const proc = exec(command, { cwd });
        let stdout = '';
        let stderr = '';

        proc.stdout.on('data', (data) => {
            stdout += data.toString();
            // 可以在这里将实时日志发送到 WebSocket 或其他地方
        });

        proc.stderr.on('data', (data) => {
            stderr += data.toString();
        });

        proc.on('close', (code) => {
            childSpan.setAttribute('exit_code', code);
            // 将标准输出和错误作为 event 附加到 span
            childSpan.addEvent('Command Output', { stdout, stderr });
            if (code === 0) {
                console.log(`Command succeeded: ${command}`);
                childSpan.setStatus({ code: opentelemetry.SpanStatusCode.OK });
                childSpan.end();
                resolve(stdout);
            } else {
                console.error(`Command failed with code ${code}: ${command}`);
                const error = new Error(`Command failed: ${stderr}`);
                childSpan.recordException(error);
                childSpan.setStatus({ code: opentelemetry.SpanStatusCode.ERROR, message: stderr.slice(0, 1024) });
                childSpan.end();
                reject(error);
            }
        });
    });
  });
}

async function handleBuildTask(msg) {
  const taskContent = msg.content.toString();
  const task = JSON.parse(taskContent);
  const { buildId, repositoryUrl, branch } = task;

  console.log(`[${buildId}] Received build task.`);

  // 从消息中恢复 Trace Context,amqplib instrumentation 会自动处理
  // 我们只需要创建一个新的子 Span
  const tracer = opentelemetry.trace.getTracer('build-worker-tracer');
  await tracer.startActiveSpan(`process-build-task:${buildId}`, async (parentSpan) => {
    parentSpan.setAttributes({
        'build.id': buildId,
        'messaging.message_id': msg.properties.messageId
    });

    const repoDir = path.join(WORKSPACE_DIR, buildId);

    try {
        await fs.mkdir(repoDir, { recursive: true });
        parentSpan.addEvent('Workspace created');

        // 详细的步骤,每个步骤都是一个子 Span,便于分析耗时
        await executeCommand(`git clone --branch ${branch} --depth 1 ${repositoryUrl} .`, repoDir, parentSpan);
        await executeCommand('npm install', repoDir, parentSpan);
        await executeCommand('npm run build', repoDir, parentSpan); // 假设 `next build && next export`
        
        // 此处应有部署逻辑
        parentSpan.addEvent('Build Succeeded. Deployment logic placeholder.');
        
        console.log(`[${buildId}] Build finished successfully.`);
        parentSpan.setStatus({ code: opentelemetry.SpanStatusCode.OK });
        channel.ack(msg); // 任务处理成功,确认消息
    } catch (error) {
        console.error(`[${buildId}] Build failed:`, error.message);
        parentSpan.recordException(error);
        parentSpan.setStatus({ code: opentelemetry.SpanStatusCode.ERROR, message: error.message });
        
        // 这里的坑在于:如果直接 nack 并 requeue=true,失败的任务会无限循环。
        // 正确的做法是 nack 且 requeue=false,让消息进入死信队列。
        channel.nack(msg, false, false); 
    } finally {
        parentSpan.end();
        // 清理工作区
        await fs.rm(repoDir, { recursive: true, force: true }).catch(err => console.error(`Failed to cleanup workspace ${repoDir}`, err));
    }
  });
}

async function startWorker() {
  try {
    await fs.mkdir(WORKSPACE_DIR, { recursive: true });
    const connection = await amqp.connect(RABBITMQ_URL);
    channel = await connection.createChannel();
    
    // 设置 prefetch 为 1,确保一个 worker 一次只处理一个任务
    // 这是保证资源不被耗尽的关键配置
    await channel.prefetch(1);
    
    console.log('Worker is waiting for tasks...');
    channel.consume(TASK_QUEUE, handleBuildTask, { noAck: false });
  } catch (error) {
    console.error('Worker failed to start', error);
    process.exit(1);
  }
}

startWorker();

这个Worker有几个关键的设计点:

  1. prefetch(1): 确保Worker同一时间只从队列中取一个任务。这能防止一个Worker因内存耗尽而崩溃,尤其是在构建这种资源密集型任务中。
  2. 原子化操作追踪: 每个shell命令的执行都被包裹在一个独立的Span中,这使得我们可以在Jaeger或类似工具中清晰地看到git clone, npm install, npm run build各自的耗时。
  3. 正确的错误处理与ACK/NACK: 任务成功后调用channel.ack(msg),消息才会被从队列中删除。失败后调用channel.nack(msg, false, false),消息会被拒绝且不会重新入队,而是根据队列配置进入死信队列,这避免了“毒丸消息”(poison pill message)导致Worker无限重启。

4. Ant Design 监控面板

前端部分通过轮询一个后端API(Status Backend)来获取构建任务的最新状态。这个后端服务会消费build.status交换机中的消息并存入数据库。这里我们只展示前端核心组件的代码,以说明其如何与系统集成。

dashboard/src/components/BuildQueue.jsx

import React, { useState, useEffect } from 'react';
import { Table, Tag, Spin, Tooltip } from 'antd';
import axios from 'axios';

// 模拟API地址
const API_URL = '/api/v1/builds';

const columns = [
  {
    title: 'Build ID',
    dataIndex: 'buildId',
    key: 'buildId',
    render: (text) => <code>{text.substring(0, 8)}</code>,
  },
  {
    title: 'Repository',
    dataIndex: 'repositoryUrl',
    key: 'repositoryUrl',
  },
  {
    title: 'Branch',
    dataIndex: 'branch',
    key: 'branch',
    render: (branch) => <Tag color="blue">{branch}</Tag>,
  },
  {
    title: 'Status',
    dataIndex: 'status',
    key: 'status',
    render: (status) => {
      let color;
      switch (status) {
        case 'SUCCEEDED':
          color = 'green';
          break;
        case 'FAILED':
          color = 'red';
          break;
        case 'IN_PROGRESS':
          color = 'processing';
          break;
        case 'QUEUED':
          color = 'gold';
          break;
        default:
          color = 'default';
      }
      return <Tag color={color}>{status.toUpperCase()}</Tag>;
    },
  },
  {
    title: 'Trigger Time',
    dataIndex: 'triggerTime',
    key: 'triggerTime',
    render: (ts) => new Date(ts).toLocaleString(),
  },
  {
    title: 'Trace ID',
    dataIndex: 'traceId',
    key: 'traceId',
    render: (traceId) => (
      <Tooltip title="Click to view trace in Jaeger (conceptual)">
        {/* 在真实项目中,这里会是一个链接 */}
        <a href={`http://localhost:16686/trace/${traceId}`} target="_blank" rel="noopener noreferrer">
          {traceId ? `${traceId.substring(0, 12)}...` : 'N/A'}
        </a>
      </Tooltip>
    ),
  },
];

const BuildQueue = () => {
  const [builds, setBuilds] = useState([]);
  const [loading, setLoading] = useState(true);

  useEffect(() => {
    const fetchData = async () => {
      try {
        setLoading(true);
        const response = await axios.get(API_URL);
        // Status Backend 需要在存储状态时,同时记录下 Trace ID
        setBuilds(response.data);
      } catch (error) {
        console.error('Failed to fetch build data:', error);
      } finally {
        setLoading(false);
      }
    };

    fetchData();
    const intervalId = setInterval(fetchData, 5000); // 每5秒轮询一次

    return () => clearInterval(intervalId);
  }, []);

  return (
    <Spin spinning={loading}>
      <Table 
        columns={columns} 
        dataSource={builds} 
        rowKey="buildId"
        title={() => <h2>SSG Build Pipeline Status</h2>}
      />
    </Spin>
  );
};

export default BuildQueue;

这个面板的核心价值在于将业务状态(构建ID、状态)与底层系统的可观测性数据(Trace ID)关联起来。当一个构建失败时,运维人员不再需要登录服务器,而是可以直接在面板上点击Trace ID,跳转到Jaeger UI,查看整个分布式调用链,清晰地看到是哪个环节、哪行代码出了问题,以及详细的错误日志。这正是可观测性驱动开发(Observability-Driven Development)的威力。

局限性与未来展望

当前这套系统虽然解决了最初的痛点,但它依然有其局限性。

首先,Build Worker是单体的。虽然可以通过部署多个Worker实例来提高并发处理能力,但并没有精细化的资源调度。如果某个构建任务需要异常多的内存,它仍然可能影响到同一台机器上的其他Worker。一个更优的方案是使用Kubernetes,将每个构建任务作为一个Job来运行,实现真正的资源隔离和弹性伸缩。

其次,日志处理还比较粗糙。目前日志只是打印到stdout,并通过OpenTelemetry Span的event来记录。对于大型构建,日志量可能非常大,一个更健壮的方案是将实时日志流式传输到集中式日志平台(如ELK Stack或Loki),并在监控面板上提供实时日志查看功能。

最后,对死信队列的处理目前是缺失的。失败的任务被移动到死信队列后,需要有一套机制来处理它们——是自动重试N次,还是通知相关人员手动处理。这需要根据具体的业务需求来设计更复杂的重试和告警策略。

尽管存在这些可优化的点,这个架构已经将一个脆弱、不透明的脚本转变为一个健壮、可观测的自动化管道,为后续的平台化演进奠定了坚实的基础。


  目录