配置管理节点(例如 Puppet Agent)的凭证轮换一直是个棘手的运维问题。传统的基于 Cron 的脚本缺乏即时性、可观测性和可靠的错误处理。在一次安全审计后,我们团队被要求构建一个能够按需、自动化、并且全程可观测的凭证轮换机制,以应对潜在的凭证泄露事件,要求响应时间必须在分钟级别。
最初的构想是利用现有的 Puppet 体系,通过修改 Hiera 数据来分发新凭证。但这依然依赖 Puppet Agent 的下一次轮询,无法满足即时性的要求。我们需要一个主动推送的控制平面。这个控制平面必须是高性能、类型安全且内存安全的,因为它将成为核心的基础设施组件。Rust 配合 Tonic gRPC 框架成为了我们的首选。
同时,操作过程必须对 SRE 团队完全透明。轮换任务的每一步——连接节点、备份旧配置、更新文件、重启服务——都应该实时地反馈到操作界面上。轮询 API 来获取状态是一种低效的方式,而 WebSocket 对于这种单向的数据流来说又显得过于笨重。Server-Sent Events (SSE) 以其简单、基于标准 HTTP 的特性,完美契合了这个场景。
最终的架构围绕一个核心的 Rust 服务展开:它通过 Tonic 提供 gRPC 接口,接收来自 GitHub Actions 的轮换指令;执行任务时,将详细的日志和状态变更实时推送到一个内部状态总线;另一个 HTTP 端点则订阅这些状态,并通过 SSE 将其流式传输给前端观察界面。
定义控制平面通信契约
在动手编写任何代码之前,定义清晰的 API 契约是至关重要的一步。在真实项目中,这能确保前后端、服务与客户端之间的关注点分离。我们使用 Protocol Buffers (Protobuf) 来定义我们的 gRPC 服务。
这个服务需要一个核心方法 RotateCredential。该请求需要包含目标节点标识、新的凭证信息。考虑到轮换过程可能耗时较长且包含多个步骤,响应不应该是一个简单的成功/失败消息,而是一个包含详细日志和状态更新的流。
proto/rotation.proto:
syntax = "proto3";
package rotation_service;
// 控制平面服务定义
service RotationController {
// 执行凭证轮换的核心 RPC
// 接收一个轮换请求,并以流的形式返回操作日志和状态
rpc RotateCredential(RotateCredentialRequest) returns (stream RotateCredentialResponse);
}
// 凭证轮换请求
message RotateCredentialRequest {
// 目标节点的 FQDN 或唯一标识符
string node_id = 1;
// 要轮换的新凭证内容。在生产环境中,这应该是一个指向
// Vault 或其他密钥管理系统地址的引用,而非明文。
// 为简化示例,这里使用 string。
string new_credential_payload = 2;
// 请求的唯一ID,用于追踪和日志关联
string request_id = 3;
}
// 轮换过程中的流式响应
message RotateCredentialResponse {
// 响应时间戳,使用 Unix epoch (nanoseconds)
int64 timestamp_nanos = 1;
// 响应的具体内容
oneof content {
// 状态更新消息
StatusUpdate status_update = 2;
// 实时日志条目
LogEntry log_entry = 3;
// 最终结果
FinalResult final_result = 4;
}
}
// 状态更新
message StatusUpdate {
// 当前执行的步骤
RotationStep current_step = 1;
// 步骤的描述信息
string description = 2;
}
// 实时日志条目
message LogEntry {
// 日志级别
LogLevel level = 1;
// 日志消息
string message = 2;
}
// 最终结果
message FinalResult {
// 操作是否成功
bool success = 1;
// 如果失败,包含错误信息
string error_message = 2;
// 操作的总耗时(毫秒)
int64 duration_ms = 3;
}
// 定义轮换过程中的各个步骤
enum RotationStep {
UNKNOWN = 0;
VALIDATING_REQUEST = 1;
CONNECTING_TO_NODE = 2;
BACKING_UP_CONFIG = 3;
UPDATING_CREDENTIAL = 4;
RESTARTING_SERVICE = 5;
VERIFYING_STATUS = 6;
COMPLETED = 7;
}
// 日志级别
enum LogLevel {
INFO = 0;
WARN = 1;
ERROR = 2;
DEBUG = 3;
}
这个 Protobuf 定义非常明确:RotateCredential 是一个服务器流式 RPC。客户端发送一个请求,服务器会持续不断地发回 RotateCredentialResponse 消息,直到任务完成。这种设计为构建一个实时、响应式的用户界面提供了坚实的基础。
构建 Tonic gRPC 服务核心
接下来是实现这个 gRPC 服务的 Rust 代码。我们将使用 tonic 作为 gRPC 框架,tokio 作为异步运行时,tracing 进行结构化日志记录,以及 thiserror 来进行规范的错误处理。
首先,项目结构:
.
├── Cargo.toml
├── build.rs
├── proto
│ └── rotation.proto
└── src
├── bin
│ └── server.rs
├── error.rs
└── lib.rs
build.rs 负责在编译时从 .proto 文件生成 Rust 代码:
// build.rs
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::configure()
.compile(&["proto/rotation.proto"], &["proto/"])?;
Ok(())
}
我们的 server.rs 将包含服务的全部实现。在真实项目中,业务逻辑会进一步拆分,但为了演示,我们将其集中。一个关键的设计是使用 tokio::sync::mpsc channel 将 RPC 的处理逻辑与流式响应的发送解耦。这使得代码更清晰,也更容易进行单元测试。
src/bin/server.rs:
use std::pin::Pin;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use futures::Stream;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status, transport::Server};
use tracing::{error, info, instrument, Level};
use tracing_subscriber::FmtSubscriber;
// 由 build.rs 从 .proto 文件生成的代码
mod rotation_service {
tonic::include_proto!("rotation_service");
}
use rotation_service::{
rotation_controller_server::{RotationController, RotationControllerServer},
LogEntry, LogLevel, RotateCredentialRequest, RotateCredentialResponse, RotationStep, StatusUpdate, FinalResult,
};
// 服务实现结构体
#[derive(Debug, Default)]
pub struct RotationControllerService {}
// 定义一个类型别名,方便使用
type RotationStream = Pin<Box<dyn Stream<Item = Result<RotateCredentialResponse, Status>> + Send>>;
#[tonic::async_trait]
impl RotationController for RotationControllerService {
type RotateCredentialStream = RotationStream;
#[instrument(skip(self, request), fields(request_id = %request.get_ref().request_id, node_id = %request.get_ref().node_id))]
async fn rotate_credential(
&self,
request: Request<RotateCredentialRequest>,
) -> Result<Response<Self::RotateCredentialStream>, Status> {
let req = request.into_inner();
info!("Received credential rotation request.");
// 创建一个多生产者、单消费者的通道来发送流式响应
// 缓冲区大小设为 128,对于日志流来说足够了
let (tx, rx) = mpsc::channel(128);
// 派生一个异步任务来执行实际的轮换逻辑
// 这可以防止长时间运行的任务阻塞 gRPC 服务线程
tokio::spawn(async move {
let start_time = std::time::Instant::now();
// 模拟轮换过程
// 在真实世界中,这里会包含 SSH 连接、文件操作等复杂的 I/O
let result = Self::perform_rotation_logic(req, tx.clone()).await;
// 任务结束后,发送最终结果
let final_result = match result {
Ok(_) => {
info!("Rotation process completed successfully.");
FinalResult {
success: true,
error_message: String::new(),
duration_ms: start_time.elapsed().as_millis() as i64,
}
}
Err(e) => {
error!("Rotation process failed: {}", e);
FinalResult {
success: false,
error_message: e.to_string(),
duration_ms: start_time.elapsed().as_millis() as i64,
}
}
};
let response = RotateCredentialResponse {
timestamp_nanos: current_timestamp_nanos(),
content: Some(rotation_service::rotate_credential_response::Content::FinalResult(final_result)),
};
// 发送最终结果消息。如果发送失败(例如客户端已断开连接),
// 我们记录一个警告即可,因为任务本身已经结束。
if let Err(e) = tx.send(Ok(response)).await {
tracing::warn!("Failed to send final result to client: {}", e);
}
});
// 立即返回一个包含接收端的响应流
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(output_stream) as Self::RotateCredentialStream))
}
}
impl RotationControllerService {
// 实际执行轮换的业务逻辑
async fn perform_rotation_logic(
req: RotateCredentialRequest,
tx: mpsc::Sender<Result<RotateCredentialResponse, Status>>,
) -> Result<(), anyhow::Error> {
// 一个辅助函数,用于发送流式消息,简化代码
macro_rules! send_update {
($content:expr) => {
let response = RotateCredentialResponse {
timestamp_nanos: current_timestamp_nanos(),
content: Some($content),
};
if tx.send(Ok(response)).await.is_err() {
// 如果发送失败,意味着客户端已经断开连接,我们可以提前终止任务
return Err(anyhow::anyhow!("Client disconnected"));
}
};
}
// 1. 验证请求
send_update!(rotation_service::rotate_credential_response::Content::StatusUpdate(StatusUpdate {
current_step: RotationStep::ValidatingRequest as i32,
description: "Validating request parameters...".to_string(),
}));
tokio::time::sleep(Duration::from_millis(100)).await;
if req.node_id.is_empty() || req.new_credential_payload.is_empty() {
return Err(anyhow::anyhow!("Invalid request: node_id and new_credential_payload cannot be empty."));
}
send_update!(rotation_service::rotate_credential_response::Content::LogEntry(LogEntry {
level: LogLevel::Info as i32,
message: format!("Request for node '{}' validated.", req.node_id),
}));
// 2. 连接到节点 (模拟)
send_update!(rotation_service::rotate_credential_response::Content::StatusUpdate(StatusUpdate {
current_step: RotationStep::ConnectingToNode as i32,
description: format!("Connecting to node {}...", req.node_id),
}));
tokio::time::sleep(Duration::from_secs(1)).await; // 模拟网络延迟
send_update!(rotation_service::rotate_credential_response::Content::LogEntry(LogEntry {
level: LogLevel::Info as i32,
message: "Connection established.".to_string(),
}));
// 3. 备份配置 (模拟)
send_update!(rotation_service::rotate_credential_response::Content::StatusUpdate(StatusUpdate {
current_step: RotationStep::BackingUpConfig as i32,
description: "Backing up existing puppet.conf...".to_string(),
}));
tokio::time::sleep(Duration::from_millis(500)).await;
send_update!(rotation_service::rotate_credential_response::Content::LogEntry(LogEntry {
level: LogLevel::Info as i32,
message: "Backup created at /etc/puppetlabs/puppet/puppet.conf.bak".to_string(),
}));
// ... 此处可以添加更多步骤的模拟实现 ...
// 4. 更新凭证 (模拟)
send_update!(rotation_service::rotate_credential_response::Content::StatusUpdate(StatusUpdate {
current_step: RotationStep::UpdatingCredential as i32,
description: "Writing new credential to configuration file...".to_string(),
}));
tokio::time::sleep(Duration::from_millis(200)).await;
send_update!(rotation_service::rotate_credential_response::Content::LogEntry(LogEntry {
level: LogLevel::Info as i32,
message: "Credential updated successfully.".to_string(),
}));
// 5. 重启 Puppet 服务 (模拟)
send_update!(rotation_service::rotate_credential_response::Content::StatusUpdate(StatusUpdate {
current_step: RotationStep::RestartingService as i32,
description: "Restarting puppet agent service...".to_string(),
}));
tokio::time::sleep(Duration::from_secs(2)).await;
send_update!(rotation_service::rotate_credential_response::Content::LogEntry(LogEntry {
level: LogLevel::Info as i32,
message: "Service puppet restart signal sent.".to_string(),
}));
Ok(())
}
}
fn current_timestamp_nanos() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as i64
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 初始化 tracing 日志系统
let subscriber = FmtSubscriber::builder()
.with_max_level(Level::INFO)
.finish();
tracing::subscriber::set_global_default(subscriber)
.expect("setting default subscriber failed");
let addr = "0.0.0.0:50051".parse()?;
let rotation_service = RotationControllerService::default();
info!("RotationController gRPC server listening on {}", addr);
Server::builder()
.add_service(RotationControllerServer::new(rotation_service))
.serve(addr)
.await?;
Ok(())
}
这段代码的核心在于 rotate_credential 方法。它立即返回一个数据流,同时将实际的、可能耗时很长的操作 perform_rotation_logic 派生到后台任务中。后台任务通过一个 mpsc channel 将进度更新发送给 gRPC 运行时,后者再将这些更新转发给客户端。这种模式确保了服务的高吞吐量,不会因为单个请求的阻塞而影响其他请求。
集成 SSE 实现实时观测
gRPC 服务已经能够流式返回进度,但 gRPC 通常用于服务间通信。为了给人类操作员提供一个简单的 Web UI,我们需要将这些 gRPC 流转换为浏览器友好的 SSE 流。
我们可以在同一个 Rust 应用中集成一个 Web 框架(如 axum),并添加一个 SSE 端点。这里的挑战在于,如何将特定轮换任务的 gRPC 流与特定的 SSE 连接关联起来。一个常见的错误是尝试在内存中维护复杂的映射关系,这会导致状态管理混乱。
一个更稳健的模式是使用一个发布-订阅(Pub/Sub)系统。当 gRPC 服务处理任务时,它不仅将更新发送回 gRPC 客户端,还将其发布到一个全局或任务特定的主题上。SSE 端点则订阅这些主题,并将消息转发给 Web 客户端。
为了简化,我们将使用 tokio::sync::broadcast channel 作为内存中的 Pub/Sub 实现。在生产环境中,这可以被替换为 Redis Pub/Sub 或 NATS,以支持服务的水平扩展。
graph TD
subgraph GitHub
A[GitHub Action] -- gRPC Call --> B
end
subgraph Rust Control Plane
B(Tonic gRPC Server)
B -- Spawns Task --> C{Rotation Logic}
C -- gRPC Stream --> B
C -- Publishes Events --> D(Broadcast Channel)
E(Axum SSE Endpoint) -- Subscribes to --> D
end
subgraph SRE Operator
F[Web UI] -- HTTP GET --> E
E -- SSE Stream --> F
end
subgraph Managed Nodes
C -- SSH/API --> G[Puppet Node]
end
我们需要修改 gRPC 服务,让它在处理任务时将事件发布到 broadcast channel。然后,创建一个 axum 服务来暴露 /events/{request_id} 端点。
这是一个概念性的实现,需要对项目结构进行调整以同时容纳 Tonic 和 Axum。这里只展示关键逻辑。
1. 引入全局状态和 Broadcast Channel
我们需要一个能在多个请求间共享的状态,比如一个 DashMap 来存储每个 request_id 对应的 broadcast::Sender。
// 在 main.rs 或 lib.rs
use dashmap::DashMap;
use std::sync::Arc;
use tokio::sync::broadcast;
// 代表广播的消息,需要 Clone
#[derive(Clone, Debug)]
pub enum RotationEvent {
Update(RotateCredentialResponse),
Completed,
}
// 共享的应用状态
pub type AppState = Arc<DashMap<String, broadcast::Sender<RotationEvent>>>;
2. 修改 gRPC 服务以发布事件perform_rotation_logic 需要接收 AppState 和 request_id,并在每次发送更新时也向 broadcast channel 发布。
// 在 gRPC 实现中
async fn perform_rotation_logic(
//...
app_state: AppState,
request_id: String,
) -> Result<(), anyhow::Error> {
// 在任务开始时,为这个 request_id 创建一个新的 broadcast channel
let (b_tx, _) = broadcast::channel(128);
app_state.insert(request_id.clone(), b_tx.clone());
// ...
macro_rules! send_and_broadcast_update {
($content:expr) => {
let response = RotateCredentialResponse { /* ... */ };
// 发送回 gRPC 客户端
if grpc_tx.send(Ok(response.clone())).await.is_err() { /* ... */ }
// 广播给 SSE 订阅者
let _ = b_tx.send(RotationEvent::Update(response));
};
}
// ... 在逻辑的每个步骤使用 send_and_broadcast_update!
// 任务结束后,发送完成信号并从 AppState 中移除 channel
let _ = b_tx.send(RotationEvent::Completed);
app_state.remove(&request_id);
Ok(())
}
3. 创建 Axum SSE 端点
// 在 web_server.rs 或 main.rs 中
use axum::{
extract::{Path, State},
response::sse::{Event, KeepAlive, Sse},
routing::get,
Router,
};
use futures::stream::Stream;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt as _;
async fn sse_handler(
Path(request_id): Path<String>,
State(app_state): State<AppState>,
) -> Sse<impl Stream<Item = Result<Event, axum::Error>>> {
let stream = async_stream::stream! {
// 等待并获取对应 request_id 的 broadcast receiver
// 在真实项目中,这里需要更复杂的逻辑,例如超时或在 channel 不存在时立即返回错误
let mut rx = loop {
if let Some(tx) = app_state.get(&request_id) {
break tx.subscribe();
}
tokio::time::sleep(Duration::from_millis(100)).await;
};
let mut broadcast_stream = BroadcastStream::new(rx);
while let Some(Ok(event)) = broadcast_stream.next().await {
match event {
RotationEvent::Update(resp) => {
// 将 gRPC 响应序列化为 JSON 字符串
let json_data = serde_json::to_string(&resp).unwrap_or_default();
yield Ok(Event::default().data(json_data));
},
RotationEvent::Completed => {
yield Ok(Event::default().event("completed").data("task finished"));
break; // 结束 SSE 流
}
}
}
};
Sse::new(stream).keep_alive(KeepAlive::default())
}
// 在 main 函数中启动 axum 服务
// let app = Router::new().route("/events/:request_id", get(sse_handler)).with_state(app_state);
// axum::Server::bind(&"0.0.0.0:3000".parse().unwrap()).serve(app.into_make_service()).await.unwrap();
这个 SSE 端点通过路径参数 request_id 获取任务标识,然后从共享状态 AppState 中找到对应的 broadcast::Receiver。一旦订阅成功,它就会将接收到的 RotationEvent 转换成 SSE Event 并发送给客户端,直到收到 Completed 信号。
使用 GitHub Actions 作为触发器
最后一步是创建一个 GitHub Actions 工作流,作为 SRE 团队与我们控制平面交互的入口。这提供了一个带审计日志、权限控制且易于使用的界面。
.github/workflows/rotate-credential.yml:
name: "Manual Credential Rotation"
on:
workflow_dispatch:
inputs:
node_id:
description: 'The FQDN of the target Puppet node'
required: true
type: string
secret_name:
description: 'The name of the GitHub Secret containing the new credential'
required: true
type: string
jobs:
rotate:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v3
- name: Install gRPCurl
run: |
sudo apt-get update && sudo apt-get install -y grpcurl
- name: Generate Request ID
id: request_id
run: echo "uuid=$(uuidgen)" >> $GITHUB_OUTPUT
- name: Execute Rotation via gRPC
env:
# 从 GitHub Secrets 中获取凭证
NEW_CREDENTIAL: ${{ secrets[inputs.secret_name] }}
# gRPC 服务器地址,应配置在 GitHub 环境或仓库变量中
GRPC_SERVER_ADDRESS: "grpc.internal.mycorp.com:50051"
run: |
echo "Starting rotation for node: ${{ inputs.node_id }}"
echo "Request ID: ${{ steps.request_id.outputs.uuid }}"
echo "You can monitor the progress at: https://ops-dashboard.internal.mycorp.com/rotation/${{ steps.request_id.outputs.uuid }}"
# 使用 grpcurl 调用服务
# -plaintext: 假设内部网络,禁用 TLS。生产环境必须使用 TLS。
# -d: 指定请求体,使用 JSON 格式
grpcurl -plaintext \
-d '{
"node_id": "${{ inputs.node_id }}",
"new_credential_payload": "${NEW_CREDENTIAL}",
"request_id": "${{ steps.request_id.outputs.uuid }}"
}' \
${GRPC_SERVER_ADDRESS} \
rotation_service.RotationController/RotateCredential
这个工作流使用 workflow_dispatch 允许手动触发。它接收 node_id 和一个 GitHub Secret 的名称作为输入。然后,它使用 grpcurl 这个方便的命令行工具来调用我们的 gRPC 服务。一个重要的细节是,它会打印出一个指向我们(虚构的)前端观察页面的链接,SRE 可以点击这个链接实时观看轮换进度。
当前方案的局限性与未来展望
我们构建的这个系统,虽然在功能上满足了初步要求,但在生产环境中推广前仍有几个需要解决的局限性。
首先,控制平面与 Puppet 节点的交互方式目前是“推”模型(例如通过 SSH 执行命令)。这要求控制平面对所有节点有网络可达性和凭证。在大规模或网络隔离的环境中,这会成为一个瓶颈和安全风险。一个更优的架构是采用“拉”模型,在每个 Puppet 节点上部署一个轻量级 gRPC客户端(Agent),由它主动连接到控制平面并监听指令。
其次,使用内存中的 broadcast channel 作为事件总线,使得控制平面服务本身成为了一个有状态的单点。如果服务重启,所有进行中的任务状态都会丢失。将其替换为外部的持久化消息队列(如 Redis Streams 或 NATS JetStream)是扩展到高可用部署的必要步骤。
最后,安全性需要进一步加强。gRPC 服务与 GitHub Actions 之间、以及与节点之间的通信都必须强制启用 mTLS。凭证本身不应直接通过 GitHub Actions 的环境变量传递,而应由控制平面服务使用自身的身份,在运行时从 Vault 等密钥管理系统中拉取。