使用 Tonic gRPC 和 SSE 构建 Puppet 节点的自动化凭证轮换控制平面


配置管理节点(例如 Puppet Agent)的凭证轮换一直是个棘手的运维问题。传统的基于 Cron 的脚本缺乏即时性、可观测性和可靠的错误处理。在一次安全审计后,我们团队被要求构建一个能够按需、自动化、并且全程可观测的凭证轮换机制,以应对潜在的凭证泄露事件,要求响应时间必须在分钟级别。

最初的构想是利用现有的 Puppet 体系,通过修改 Hiera 数据来分发新凭证。但这依然依赖 Puppet Agent 的下一次轮询,无法满足即时性的要求。我们需要一个主动推送的控制平面。这个控制平面必须是高性能、类型安全且内存安全的,因为它将成为核心的基础设施组件。Rust 配合 Tonic gRPC 框架成为了我们的首选。

同时,操作过程必须对 SRE 团队完全透明。轮换任务的每一步——连接节点、备份旧配置、更新文件、重启服务——都应该实时地反馈到操作界面上。轮询 API 来获取状态是一种低效的方式,而 WebSocket 对于这种单向的数据流来说又显得过于笨重。Server-Sent Events (SSE) 以其简单、基于标准 HTTP 的特性,完美契合了这个场景。

最终的架构围绕一个核心的 Rust 服务展开:它通过 Tonic 提供 gRPC 接口,接收来自 GitHub Actions 的轮换指令;执行任务时,将详细的日志和状态变更实时推送到一个内部状态总线;另一个 HTTP 端点则订阅这些状态,并通过 SSE 将其流式传输给前端观察界面。

定义控制平面通信契约

在动手编写任何代码之前,定义清晰的 API 契约是至关重要的一步。在真实项目中,这能确保前后端、服务与客户端之间的关注点分离。我们使用 Protocol Buffers (Protobuf) 来定义我们的 gRPC 服务。

这个服务需要一个核心方法 RotateCredential。该请求需要包含目标节点标识、新的凭证信息。考虑到轮换过程可能耗时较长且包含多个步骤,响应不应该是一个简单的成功/失败消息,而是一个包含详细日志和状态更新的流。

proto/rotation.proto:

syntax = "proto3";

package rotation_service;

// 控制平面服务定义
service RotationController {
  // 执行凭证轮换的核心 RPC
  // 接收一个轮换请求,并以流的形式返回操作日志和状态
  rpc RotateCredential(RotateCredentialRequest) returns (stream RotateCredentialResponse);
}

// 凭证轮换请求
message RotateCredentialRequest {
  // 目标节点的 FQDN 或唯一标识符
  string node_id = 1;

  // 要轮换的新凭证内容。在生产环境中,这应该是一个指向
  // Vault 或其他密钥管理系统地址的引用,而非明文。
  // 为简化示例,这里使用 string。
  string new_credential_payload = 2;

  // 请求的唯一ID,用于追踪和日志关联
  string request_id = 3;
}

// 轮换过程中的流式响应
message RotateCredentialResponse {
  // 响应时间戳,使用 Unix epoch (nanoseconds)
  int64 timestamp_nanos = 1;
  
  // 响应的具体内容
  oneof content {
    // 状态更新消息
    StatusUpdate status_update = 2;
    // 实时日志条目
    LogEntry log_entry = 3;
    // 最终结果
    FinalResult final_result = 4;
  }
}

// 状态更新
message StatusUpdate {
  // 当前执行的步骤
  RotationStep current_step = 1;
  // 步骤的描述信息
  string description = 2;
}

// 实时日志条目
message LogEntry {
  // 日志级别
  LogLevel level = 1;
  // 日志消息
  string message = 2;
}

// 最终结果
message FinalResult {
  // 操作是否成功
  bool success = 1;
  // 如果失败,包含错误信息
  string error_message = 2;
  // 操作的总耗时(毫秒)
  int64 duration_ms = 3;
}

// 定义轮换过程中的各个步骤
enum RotationStep {
  UNKNOWN = 0;
  VALIDATING_REQUEST = 1;
  CONNECTING_TO_NODE = 2;
  BACKING_UP_CONFIG = 3;
  UPDATING_CREDENTIAL = 4;
  RESTARTING_SERVICE = 5;
  VERIFYING_STATUS = 6;
  COMPLETED = 7;
}

// 日志级别
enum LogLevel {
  INFO = 0;
  WARN = 1;
  ERROR = 2;
  DEBUG = 3;
}

这个 Protobuf 定义非常明确:RotateCredential 是一个服务器流式 RPC。客户端发送一个请求,服务器会持续不断地发回 RotateCredentialResponse 消息,直到任务完成。这种设计为构建一个实时、响应式的用户界面提供了坚实的基础。

构建 Tonic gRPC 服务核心

接下来是实现这个 gRPC 服务的 Rust 代码。我们将使用 tonic 作为 gRPC 框架,tokio 作为异步运行时,tracing 进行结构化日志记录,以及 thiserror 来进行规范的错误处理。

首先,项目结构:

.
├── Cargo.toml
├── build.rs
├── proto
│   └── rotation.proto
└── src
    ├── bin
    │   └── server.rs
    ├── error.rs
    └── lib.rs

build.rs 负责在编译时从 .proto 文件生成 Rust 代码:

// build.rs
fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::configure()
        .compile(&["proto/rotation.proto"], &["proto/"])?;
    Ok(())
}

我们的 server.rs 将包含服务的全部实现。在真实项目中,业务逻辑会进一步拆分,但为了演示,我们将其集中。一个关键的设计是使用 tokio::sync::mpsc channel 将 RPC 的处理逻辑与流式响应的发送解耦。这使得代码更清晰,也更容易进行单元测试。

src/bin/server.rs:

use std::pin::Pin;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use futures::Stream;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status, transport::Server};
use tracing::{error, info, instrument, Level};
use tracing_subscriber::FmtSubscriber;

// 由 build.rs 从 .proto 文件生成的代码
mod rotation_service {
    tonic::include_proto!("rotation_service");
}

use rotation_service::{
    rotation_controller_server::{RotationController, RotationControllerServer},
    LogEntry, LogLevel, RotateCredentialRequest, RotateCredentialResponse, RotationStep, StatusUpdate, FinalResult,
};

// 服务实现结构体
#[derive(Debug, Default)]
pub struct RotationControllerService {}

// 定义一个类型别名,方便使用
type RotationStream = Pin<Box<dyn Stream<Item = Result<RotateCredentialResponse, Status>> + Send>>;

#[tonic::async_trait]
impl RotationController for RotationControllerService {
    type RotateCredentialStream = RotationStream;

    #[instrument(skip(self, request), fields(request_id = %request.get_ref().request_id, node_id = %request.get_ref().node_id))]
    async fn rotate_credential(
        &self,
        request: Request<RotateCredentialRequest>,
    ) -> Result<Response<Self::RotateCredentialStream>, Status> {
        let req = request.into_inner();
        info!("Received credential rotation request.");

        // 创建一个多生产者、单消费者的通道来发送流式响应
        // 缓冲区大小设为 128,对于日志流来说足够了
        let (tx, rx) = mpsc::channel(128);

        // 派生一个异步任务来执行实际的轮换逻辑
        // 这可以防止长时间运行的任务阻塞 gRPC 服务线程
        tokio::spawn(async move {
            let start_time = std::time::Instant::now();
            
            // 模拟轮换过程
            // 在真实世界中,这里会包含 SSH 连接、文件操作等复杂的 I/O
            let result = Self::perform_rotation_logic(req, tx.clone()).await;

            // 任务结束后,发送最终结果
            let final_result = match result {
                Ok(_) => {
                    info!("Rotation process completed successfully.");
                    FinalResult {
                        success: true,
                        error_message: String::new(),
                        duration_ms: start_time.elapsed().as_millis() as i64,
                    }
                }
                Err(e) => {
                    error!("Rotation process failed: {}", e);
                    FinalResult {
                        success: false,
                        error_message: e.to_string(),
                        duration_ms: start_time.elapsed().as_millis() as i64,
                    }
                }
            };

            let response = RotateCredentialResponse {
                timestamp_nanos: current_timestamp_nanos(),
                content: Some(rotation_service::rotate_credential_response::Content::FinalResult(final_result)),
            };
            
            // 发送最终结果消息。如果发送失败(例如客户端已断开连接),
            // 我们记录一个警告即可,因为任务本身已经结束。
            if let Err(e) = tx.send(Ok(response)).await {
                tracing::warn!("Failed to send final result to client: {}", e);
            }
        });

        // 立即返回一个包含接收端的响应流
        let output_stream = ReceiverStream::new(rx);
        Ok(Response::new(Box::pin(output_stream) as Self::RotateCredentialStream))
    }
}

impl RotationControllerService {
    // 实际执行轮换的业务逻辑
    async fn perform_rotation_logic(
        req: RotateCredentialRequest,
        tx: mpsc::Sender<Result<RotateCredentialResponse, Status>>,
    ) -> Result<(), anyhow::Error> {
        // 一个辅助函数,用于发送流式消息,简化代码
        macro_rules! send_update {
            ($content:expr) => {
                let response = RotateCredentialResponse {
                    timestamp_nanos: current_timestamp_nanos(),
                    content: Some($content),
                };
                if tx.send(Ok(response)).await.is_err() {
                    // 如果发送失败,意味着客户端已经断开连接,我们可以提前终止任务
                    return Err(anyhow::anyhow!("Client disconnected"));
                }
            };
        }
        
        // 1. 验证请求
        send_update!(rotation_service::rotate_credential_response::Content::StatusUpdate(StatusUpdate {
            current_step: RotationStep::ValidatingRequest as i32,
            description: "Validating request parameters...".to_string(),
        }));
        tokio::time::sleep(Duration::from_millis(100)).await;
        if req.node_id.is_empty() || req.new_credential_payload.is_empty() {
            return Err(anyhow::anyhow!("Invalid request: node_id and new_credential_payload cannot be empty."));
        }
        send_update!(rotation_service::rotate_credential_response::Content::LogEntry(LogEntry {
            level: LogLevel::Info as i32,
            message: format!("Request for node '{}' validated.", req.node_id),
        }));

        // 2. 连接到节点 (模拟)
        send_update!(rotation_service::rotate_credential_response::Content::StatusUpdate(StatusUpdate {
            current_step: RotationStep::ConnectingToNode as i32,
            description: format!("Connecting to node {}...", req.node_id),
        }));
        tokio::time::sleep(Duration::from_secs(1)).await; // 模拟网络延迟
        send_update!(rotation_service::rotate_credential_response::Content::LogEntry(LogEntry {
            level: LogLevel::Info as i32,
            message: "Connection established.".to_string(),
        }));

        // 3. 备份配置 (模拟)
        send_update!(rotation_service::rotate_credential_response::Content::StatusUpdate(StatusUpdate {
            current_step: RotationStep::BackingUpConfig as i32,
            description: "Backing up existing puppet.conf...".to_string(),
        }));
        tokio::time::sleep(Duration::from_millis(500)).await;
        send_update!(rotation_service::rotate_credential_response::Content::LogEntry(LogEntry {
            level: LogLevel::Info as i32,
            message: "Backup created at /etc/puppetlabs/puppet/puppet.conf.bak".to_string(),
        }));
        
        // ... 此处可以添加更多步骤的模拟实现 ...

        // 4. 更新凭证 (模拟)
        send_update!(rotation_service::rotate_credential_response::Content::StatusUpdate(StatusUpdate {
            current_step: RotationStep::UpdatingCredential as i32,
            description: "Writing new credential to configuration file...".to_string(),
        }));
        tokio::time::sleep(Duration::from_millis(200)).await;
        send_update!(rotation_service::rotate_credential_response::Content::LogEntry(LogEntry {
            level: LogLevel::Info as i32,
            message: "Credential updated successfully.".to_string(),
        }));

        // 5. 重启 Puppet 服务 (模拟)
        send_update!(rotation_service::rotate_credential_response::Content::StatusUpdate(StatusUpdate {
            current_step: RotationStep::RestartingService as i32,
            description: "Restarting puppet agent service...".to_string(),
        }));
        tokio::time::sleep(Duration::from_secs(2)).await;
        send_update!(rotation_service::rotate_credential_response::Content::LogEntry(LogEntry {
            level: LogLevel::Info as i32,
            message: "Service puppet restart signal sent.".to_string(),
        }));

        Ok(())
    }
}

fn current_timestamp_nanos() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos() as i64
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 初始化 tracing 日志系统
    let subscriber = FmtSubscriber::builder()
        .with_max_level(Level::INFO)
        .finish();
    tracing::subscriber::set_global_default(subscriber)
        .expect("setting default subscriber failed");

    let addr = "0.0.0.0:50051".parse()?;
    let rotation_service = RotationControllerService::default();

    info!("RotationController gRPC server listening on {}", addr);

    Server::builder()
        .add_service(RotationControllerServer::new(rotation_service))
        .serve(addr)
        .await?;

    Ok(())
}

这段代码的核心在于 rotate_credential 方法。它立即返回一个数据流,同时将实际的、可能耗时很长的操作 perform_rotation_logic 派生到后台任务中。后台任务通过一个 mpsc channel 将进度更新发送给 gRPC 运行时,后者再将这些更新转发给客户端。这种模式确保了服务的高吞吐量,不会因为单个请求的阻塞而影响其他请求。

集成 SSE 实现实时观测

gRPC 服务已经能够流式返回进度,但 gRPC 通常用于服务间通信。为了给人类操作员提供一个简单的 Web UI,我们需要将这些 gRPC 流转换为浏览器友好的 SSE 流。

我们可以在同一个 Rust 应用中集成一个 Web 框架(如 axum),并添加一个 SSE 端点。这里的挑战在于,如何将特定轮换任务的 gRPC 流与特定的 SSE 连接关联起来。一个常见的错误是尝试在内存中维护复杂的映射关系,这会导致状态管理混乱。

一个更稳健的模式是使用一个发布-订阅(Pub/Sub)系统。当 gRPC 服务处理任务时,它不仅将更新发送回 gRPC 客户端,还将其发布到一个全局或任务特定的主题上。SSE 端点则订阅这些主题,并将消息转发给 Web 客户端。

为了简化,我们将使用 tokio::sync::broadcast channel 作为内存中的 Pub/Sub 实现。在生产环境中,这可以被替换为 Redis Pub/Sub 或 NATS,以支持服务的水平扩展。

graph TD
    subgraph GitHub
        A[GitHub Action] -- gRPC Call --> B
    end

    subgraph Rust Control Plane
        B(Tonic gRPC Server)
        B -- Spawns Task --> C{Rotation Logic}
        C -- gRPC Stream --> B
        C -- Publishes Events --> D(Broadcast Channel)
        E(Axum SSE Endpoint) -- Subscribes to --> D
    end
    
    subgraph SRE Operator
        F[Web UI] -- HTTP GET --> E
        E -- SSE Stream --> F
    end

    subgraph Managed Nodes
        C -- SSH/API --> G[Puppet Node]
    end

我们需要修改 gRPC 服务,让它在处理任务时将事件发布到 broadcast channel。然后,创建一个 axum 服务来暴露 /events/{request_id} 端点。

这是一个概念性的实现,需要对项目结构进行调整以同时容纳 Tonic 和 Axum。这里只展示关键逻辑。

1. 引入全局状态和 Broadcast Channel
我们需要一个能在多个请求间共享的状态,比如一个 DashMap 来存储每个 request_id 对应的 broadcast::Sender

// 在 main.rs 或 lib.rs
use dashmap::DashMap;
use std::sync::Arc;
use tokio::sync::broadcast;

// 代表广播的消息,需要 Clone
#[derive(Clone, Debug)]
pub enum RotationEvent {
    Update(RotateCredentialResponse),
    Completed,
}

// 共享的应用状态
pub type AppState = Arc<DashMap<String, broadcast::Sender<RotationEvent>>>;

2. 修改 gRPC 服务以发布事件
perform_rotation_logic 需要接收 AppStaterequest_id,并在每次发送更新时也向 broadcast channel 发布。

// 在 gRPC 实现中
async fn perform_rotation_logic(
    //...
    app_state: AppState,
    request_id: String,
) -> Result<(), anyhow::Error> {
    // 在任务开始时,为这个 request_id 创建一个新的 broadcast channel
    let (b_tx, _) = broadcast::channel(128);
    app_state.insert(request_id.clone(), b_tx.clone());
    
    // ... 
    
    macro_rules! send_and_broadcast_update {
        ($content:expr) => {
            let response = RotateCredentialResponse { /* ... */ };
            // 发送回 gRPC 客户端
            if grpc_tx.send(Ok(response.clone())).await.is_err() { /* ... */ }
            // 广播给 SSE 订阅者
            let _ = b_tx.send(RotationEvent::Update(response));
        };
    }

    // ... 在逻辑的每个步骤使用 send_and_broadcast_update!

    // 任务结束后,发送完成信号并从 AppState 中移除 channel
    let _ = b_tx.send(RotationEvent::Completed);
    app_state.remove(&request_id);
    
    Ok(())
}

3. 创建 Axum SSE 端点

// 在 web_server.rs 或 main.rs 中
use axum::{
    extract::{Path, State},
    response::sse::{Event, KeepAlive, Sse},
    routing::get,
    Router,
};
use futures::stream::Stream;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt as _;

async fn sse_handler(
    Path(request_id): Path<String>,
    State(app_state): State<AppState>,
) -> Sse<impl Stream<Item = Result<Event, axum::Error>>> {
    
    let stream = async_stream::stream! {
        // 等待并获取对应 request_id 的 broadcast receiver
        // 在真实项目中,这里需要更复杂的逻辑,例如超时或在 channel 不存在时立即返回错误
        let mut rx = loop {
            if let Some(tx) = app_state.get(&request_id) {
                break tx.subscribe();
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
        };

        let mut broadcast_stream = BroadcastStream::new(rx);

        while let Some(Ok(event)) = broadcast_stream.next().await {
            match event {
                RotationEvent::Update(resp) => {
                    // 将 gRPC 响应序列化为 JSON 字符串
                    let json_data = serde_json::to_string(&resp).unwrap_or_default();
                    yield Ok(Event::default().data(json_data));
                },
                RotationEvent::Completed => {
                    yield Ok(Event::default().event("completed").data("task finished"));
                    break; // 结束 SSE 流
                }
            }
        }
    };
    
    Sse::new(stream).keep_alive(KeepAlive::default())
}

// 在 main 函数中启动 axum 服务
// let app = Router::new().route("/events/:request_id", get(sse_handler)).with_state(app_state);
// axum::Server::bind(&"0.0.0.0:3000".parse().unwrap()).serve(app.into_make_service()).await.unwrap();

这个 SSE 端点通过路径参数 request_id 获取任务标识,然后从共享状态 AppState 中找到对应的 broadcast::Receiver。一旦订阅成功,它就会将接收到的 RotationEvent 转换成 SSE Event 并发送给客户端,直到收到 Completed 信号。

使用 GitHub Actions 作为触发器

最后一步是创建一个 GitHub Actions 工作流,作为 SRE 团队与我们控制平面交互的入口。这提供了一个带审计日志、权限控制且易于使用的界面。

.github/workflows/rotate-credential.yml:

name: "Manual Credential Rotation"

on:
  workflow_dispatch:
    inputs:
      node_id:
        description: 'The FQDN of the target Puppet node'
        required: true
        type: string
      secret_name:
        description: 'The name of the GitHub Secret containing the new credential'
        required: true
        type: string

jobs:
  rotate:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v3

      - name: Install gRPCurl
        run: |
          sudo apt-get update && sudo apt-get install -y grpcurl

      - name: Generate Request ID
        id: request_id
        run: echo "uuid=$(uuidgen)" >> $GITHUB_OUTPUT

      - name: Execute Rotation via gRPC
        env:
          # 从 GitHub Secrets 中获取凭证
          NEW_CREDENTIAL: ${{ secrets[inputs.secret_name] }}
          # gRPC 服务器地址,应配置在 GitHub 环境或仓库变量中
          GRPC_SERVER_ADDRESS: "grpc.internal.mycorp.com:50051"
        run: |
          echo "Starting rotation for node: ${{ inputs.node_id }}"
          echo "Request ID: ${{ steps.request_id.outputs.uuid }}"
          echo "You can monitor the progress at: https://ops-dashboard.internal.mycorp.com/rotation/${{ steps.request_id.outputs.uuid }}"

          # 使用 grpcurl 调用服务
          # -plaintext: 假设内部网络,禁用 TLS。生产环境必须使用 TLS。
          # -d: 指定请求体,使用 JSON 格式
          grpcurl -plaintext \
            -d '{
              "node_id": "${{ inputs.node_id }}",
              "new_credential_payload": "${NEW_CREDENTIAL}",
              "request_id": "${{ steps.request_id.outputs.uuid }}"
            }' \
            ${GRPC_SERVER_ADDRESS} \
            rotation_service.RotationController/RotateCredential

这个工作流使用 workflow_dispatch 允许手动触发。它接收 node_id 和一个 GitHub Secret 的名称作为输入。然后,它使用 grpcurl 这个方便的命令行工具来调用我们的 gRPC 服务。一个重要的细节是,它会打印出一个指向我们(虚构的)前端观察页面的链接,SRE 可以点击这个链接实时观看轮换进度。

当前方案的局限性与未来展望

我们构建的这个系统,虽然在功能上满足了初步要求,但在生产环境中推广前仍有几个需要解决的局限性。

首先,控制平面与 Puppet 节点的交互方式目前是“推”模型(例如通过 SSH 执行命令)。这要求控制平面对所有节点有网络可达性和凭证。在大规模或网络隔离的环境中,这会成为一个瓶颈和安全风险。一个更优的架构是采用“拉”模型,在每个 Puppet 节点上部署一个轻量级 gRPC客户端(Agent),由它主动连接到控制平面并监听指令。

其次,使用内存中的 broadcast channel 作为事件总线,使得控制平面服务本身成为了一个有状态的单点。如果服务重启,所有进行中的任务状态都会丢失。将其替换为外部的持久化消息队列(如 Redis Streams 或 NATS JetStream)是扩展到高可用部署的必要步骤。

最后,安全性需要进一步加强。gRPC 服务与 GitHub Actions 之间、以及与节点之间的通信都必须强制启用 mTLS。凭证本身不应直接通过 GitHub Actions 的环境变量传递,而应由控制平面服务使用自身的身份,在运行时从 Vault 等密钥管理系统中拉取。


  目录