在 DigitalOcean 上使用 Redis Streams 构建具备幂等消费与死信队列的事件处理器


我们团队最近接手了一个需求:为用户行为生成异步的、可审计的轨迹记录。这个功能本身不复杂,但对系统的要求却很苛刻:主流程不能被阻塞,记录必须保证最终送达,且不能因为重复投递导致数据错乱。起初,团队的几个同事本能地想到了 Kafka。毫无疑问,Kafka 在这个领域是黄金标准,但我们的整个技术栈都构建在 DigitalOcean 上,以 Droplets 和 Managed Databases 为主,引入一套 Zookeeper + Kafka 集群的运维成本和资源开销,对于我们这个规模的团队和当前项目阶段来说,显得过于沉重。

我们的基础设施里已经有了 DigitalOcean Managed Redis,主要用于缓存和分布式锁。一个问题自然而然地浮现在我脑海:我们能否只用 Redis 来构建一个足够健壮的事件处理系统?不是用 Pub/Sub,那玩意儿丢了消息就没了。也不是用 LIST 做队列,那个 BRPOPLPUSH 的模式虽然可靠,但实现多消费者和消息确认的逻辑太繁琐。我的目光最终落在了 Redis 5.0 之后引入的 Streams 上。它看起来就像一个内置在 Redis 里的微型 Kafka,这正是我们需要的。

决策的核心考量是:用一个我们已经熟悉且托管好的服务,去解决 80% 的问题,远比引入一个全新的、复杂的组件来解决 99% 的问题更具性价比。我们决定沿着这条路走下去。

初版:基于消费组的天真实现

最初的实现非常直接。一个生产者服务,使用 XADD 命令向一个名为 events:user_activity 的流中添加事件。事件内容是一个简单的 JSON 字符串。

// producer/main.go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/go-redis/redis/v8"
)

var ctx = context.Background()

type UserEvent struct {
	UserID    string    `json:"user_id"`
	EventType string    `json:"event_type"`
	Timestamp time.Time `json:"timestamp"`
	Payload   map[string]interface{} `json:"payload"`
}

func main() {
	redisAddr := os.Getenv("REDIS_ADDR")
	if redisAddr == "" {
		redisAddr = "localhost:6379"
	}

	rdb := redis.NewClient(&redis.Options{
		Addr: redisAddr,
	})

	// 模拟连续产生事件
	for i := 0; i < 10; i++ {
		event := UserEvent{
			UserID:    fmt.Sprintf("user-%d", i%3),
			EventType: "login",
			Timestamp: time.Now().UTC(),
			Payload:   map[string]interface{}{"ip_address": "192.168.1.100"},
		}

		eventJSON, err := json.Marshal(event)
		if err != nil {
			log.Printf("Failed to marshal event: %v", err)
			continue
		}

		// 使用 XADD 将事件添加到流中
		// "*" 表示让 Redis 自动生成消息 ID
		id, err := rdb.XAdd(ctx, &redis.XAddArgs{
			Stream: "events:user_activity",
			Values: map[string]interface{}{"data": eventJSON},
		}).Result()

		if err != nil {
			log.Fatalf("Failed to add event to stream: %v", err)
		}
		log.Printf("Produced event with ID: %s", id)
		time.Sleep(500 * time.Millisecond)
	}
}

消费者侧,我们创建了一个消费组 activity_processors,并启动一个消费者 consumer-1 来处理消息。

// consumer/main.go
package main

import (
	"context"
	"log"
	"os"

	"github.com/go-redis/redis/v8"
)

var (
	ctx         = context.Background()
	streamName  = "events:user_activity"
	groupName   = "activity_processors"
	consumerName = "consumer-1"
)

func main() {
	redisAddr := os.Getenv("REDIS_ADDR")
	if redisAddr == "" {
		redisAddr = "localhost:6379"
	}

	rdb := redis.NewClient(&redis.Options{
		Addr: redisAddr,
	})

    // 创建消费组,如果已存在会返回错误,我们忽略这个错误
    // "0" 表示从流的开头开始消费
    // 在真实项目中,你可能想用 "$" 从流的末尾开始,只消费新消息
	rdb.XGroupCreate(ctx, streamName, groupName, "0").Err()

	log.Printf("Consumer '%s' started, listening to group '%s'", consumerName, groupName)

	for {
		// XReadGroup 会阻塞,直到有新消息
		streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    groupName,
			Consumer: consumerName,
			Streams:  []string{streamName, ">"}, // ">" 表示只接收从未被投递给组内任何消费者的消息
			Count:    1,
			Block:    0, // 0 表示无限期阻塞
		}).Result()

		if err != nil {
			log.Printf("Error reading from stream: %v", err)
			continue
		}

		for _, stream := range streams {
			for _, message := range stream.Messages {
				log.Printf("Processing message ID: %s, Data: %s", message.ID, message.Values["data"])

				// 模拟处理耗时
				// time.Sleep(1 * time.Second)

				// 确认消息处理完成
				rdb.XAck(ctx, streamName, groupName, message.ID)
				log.Printf("Acknowledged message ID: %s", message.ID)
			}
		}
	}
}

这个版本能跑起来,但它脆弱得像纸一样。一个致命的问题是:如果消费者在 log.Printf("Processing...") 之后、XAck 之前崩溃了会怎么样?这条消息被消费了,但从未被确认。它会永远留在消费组的待处理条目列表(Pending Entries List, PEL)中,变成一个“幽灵消息”,造成事实上的消息丢失。

第二阶段:引入“幽灵消息”收割者

为了解决这个问题,我们需要一个机制来处理那些被投递出去但长时间未被确认的消息。Redis Streams 提供了 XPENDING 命令来检查 PEL,以及 XCLAIM (或在新版本中更推荐的 XAUTOCLAIM) 来重新认领这些消息。

我们可以在消费者启动时,或者作为一个独立的“监工”进程,来执行这个收割逻辑。在我们的场景中,让每个消费者在主循环之外承担一部分收割工作是更简单的部署方案。

我修改了消费者的主循环,增加了一个定时任务,每分钟检查一次是否有超过5分钟未确认的“僵尸”消息,并将其重新认-领到自己名下进行处理。

// consumer/main.go (部分修改)
func main() {
    // ... redis client setup ...
	
    // 启动一个 goroutine 来定期处理僵尸消息
	go reclaimPendingMessages(rdb)

	log.Printf("Consumer '%s' started...", consumerName)

	// ... 主消费循环 (同上) ...
}

func reclaimPendingMessages(rdb *redis.Client) {
	ticker := time.NewTicker(1 * time.Minute)
	defer ticker.Stop()

	for range ticker.C {
		pendingResult, err := rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
			Stream: streamName,
			Group:  groupName,
			Start:  "-", // 从最早的 ID 开始
			End:    "+", // 到最新的 ID 结束
			Count:  10,
		}).Result()
		if err != nil {
			log.Printf("Error fetching pending messages: %v", err)
			continue
		}

		if len(pendingResult) == 0 {
			continue
		}

		log.Printf("Found %d pending messages potentially stuck.", len(pendingResult))
		
		var messageIDsToClaim []string
		for _, p := range pendingResult {
			// 如果消息闲置超过5分钟,就认为前一个消费者已经死亡
			if p.Idle > 5*time.Minute {
				log.Printf("Message %s has been idle for %v, will claim.", p.ID, p.Idle)
				messageIDsToClaim = append(messageIDsToClaim, p.ID)
			}
		}

		if len(messageIDsToClaim) > 0 {
			// 认领这些消息,min-idle-time 设为0,强制认领
			claimResult, err := rdb.XClaim(ctx, &redis.XClaimArgs{
				Stream:   streamName,
				Group:    groupName,
				Consumer: consumerName,
				MinIdle:  0,
				Messages: messageIDsToClaim,
			}).Result()

			if err != nil {
				log.Printf("Error claiming messages: %v", err)
			} else {
				log.Printf("Successfully claimed %d messages.", len(claimResult))
			}
		}
	}
}

注意:XAUTOCLAIM 在 Redis 6.2+ 中是更好的选择,它将检查和认领合二为一,更高效。我们的 DigitalOcean Managed Redis 版本支持它,但在代码中用 XPENDINGXCLAIM 的组合更能清晰地展示其工作原理。

现在,系统对消费者崩溃有了一定的容错能力。但新的问题随之而来:如果一条消息本身就是“有毒的”呢?比如,一个格式错误的 JSON,或者一个会引发数据库唯一键冲突的事件。这条消息会导致消费者逻辑恐慌或返回错误。由于没有被 XACK,它会变成僵尸消息,然后被收割者重新认领,再次投递给某个消费者,再次失败……周而复始,形成一个无限循环的“毒丸”,阻塞整个队列。

第三阶段:构建死信队列(DLQ)与重试逻辑

解决毒丸问题的标准方法是实现一个死信队列(Dead-Letter Queue, DLQ)。当一条消息处理失败达到一定次数后,我们就不再尝试处理它,而是将其转移到一个专门的队列中,供后续人工排查或修复。

利用 Redis Streams,实现 DLQ 相当直接:我们再创建一个 stream,比如 dlq:user_activity。当主 stream 的消息处理失败达到阈值时,我们将其从主 stream 中 XACK 掉(防止它再被投递),然后用 XADD 将其内容连同失败原因一起写入 DLQ stream。

关键在于如何跟踪一条消息的重试次数。XPENDING 命令返回的结果中包含了每条待处理消息的 DeliveryCount。这正是我们需要的计数器!

这是我们的最终版处理器逻辑:

// consumer_final/main.go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/go-redis/redis/v8"
)

// --- 配置 ---
const (
	streamName         = "events:user_activity"
	groupName          = "activity_processors"
	dlqStreamName      = "dlq:user_activity"
	maxRetries         = 3
	messageTTL         = 24 * time.Hour // 消息处理状态的有效期
	reclaimInterval    = 30 * time.Second
	reclaimIdleTime    = 2 * time.Minute
)

var (
	ctx          = context.Background()
	consumerName string
)

type UserEvent struct {
	UserID    string                 `json:"user_id"`
	EventType string                 `json:"event_type"`
	Timestamp time.Time              `json:"timestamp"`
	Payload   map[string]interface{} `json:"payload"`
}

// isEventProcessed 检查事件是否已被处理,实现幂等性
func isEventProcessed(rdb *redis.Client, messageID string) (bool, error) {
	key := fmt.Sprintf("processed:events:%s", messageID)
	// SET a key with NX (Not Exists) and EX (Expire) options.
	// This is an atomic operation.
	// If the key already exists, it returns false.
	// If it doesn't exist, it sets the key and returns true.
	wasSet, err := rdb.SetNX(ctx, key, 1, messageTTL).Result()
	if err != nil {
		return false, fmt.Errorf("redis SETNX failed: %w", err)
	}
	return !wasSet, nil
}

// processMessage 模拟真实的消息处理逻辑
func processMessage(messageData string) error {
	var event UserEvent
	if err := json.Unmarshal([]byte(messageData), &event); err != nil {
		log.Printf("FATAL: Unmarshal failed for data: %s. Error: %v", messageData, err)
		return fmt.Errorf("unmarshal error: %w", err)
	}

	// 模拟一个会持续失败的 "毒丸" 消息
	if event.UserID == "user-poison" {
		log.Printf("ERROR: This is a poison pill for user %s. Failing deliberately.", event.UserID)
		return errors.New("poison pill message")
	}

	log.Printf("Successfully processed event for user: %s, type: %s", event.UserID, event.EventType)
	return nil
}

// moveToDLQ 将消息移入死信队列
func moveToDLQ(rdb *redis.Client, originalID string, message map[string]interface{}, reason string) {
	log.Printf("Moving message %s to DLQ. Reason: %s", originalID, reason)
	
    // 将原始消息体和失败原因一起存入DLQ
	dlqValues := map[string]interface{}{
		"original_id":   originalID,
		"failed_at":     time.Now().UTC().Format(time.RFC3339),
		"fail_reason":   reason,
		"consumer":      consumerName,
		"original_data": message["data"],
	}

	if _, err := rdb.XAdd(ctx, &redis.XAddArgs{
		Stream: dlqStreamName,
		Values: dlqValues,
	}).Result(); err != nil {
		log.Printf("CRITICAL: Failed to move message %s to DLQ: %v", originalID, err)
		// 在这种情况下,我们选择不 ACK 原消息,让它被重新处理
		// 也许 DLQ 服务暂时不可用,下次重试时可能会成功
		return
	}

    // 成功移入 DLQ后,才从主消费组中确认并移除
	if _, err := rdb.XAck(ctx, streamName, groupName, originalID).Result(); err != nil {
		log.Printf("CRITICAL: Message %s was moved to DLQ but failed to be ACKed: %v", originalID, err)
	}
}

// handleMessage 是核心处理逻辑,包含了幂等性、重试和 DLQ
func handleMessage(rdb *redis.Client, message *redis.XMessage) {
	messageID := message.ID
	messageData, ok := message.Values["data"].(string)
	if !ok {
		moveToDLQ(rdb, messageID, message.Values, "Message data is not a string")
		return
	}
	
	// 1. 幂等性检查
	alreadyProcessed, err := isEventProcessed(rdb, messageID)
	if err != nil {
		log.Printf("WARN: Idempotency check failed for %s: %v. Will retry.", messageID, err)
		return // 不 ACK,让它被重新投递
	}
	if alreadyProcessed {
		log.Printf("INFO: Message %s was already processed. Acknowledging again.", messageID)
		rdb.XAck(ctx, streamName, groupName, messageID)
		return
	}
	
	// 2. 核心业务逻辑处理
	if err := processMessage(messageData); err != nil {
		log.Printf("WARN: Failed to process message %s. Error: %v", messageID, err)
		
		// 3. 失败处理:检查重试次数
		// 注意: XPending 检查的是整个组的 pending 状态,而不是单个 consumer
		// 在这里我们用 XPendingExt 来获取单个消息的详细信息
		pendingResult, err := rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
			Stream: streamName,
			Group:  groupName,
			Start:  messageID,
			End:    messageID,
			Count:  1,
		}).Result()
		
		if err != nil || len(pendingResult) == 0 {
			log.Printf("WARN: Could not get pending info for %s: %v. Will let it be reclaimed.", messageID, err)
			return // 同样不 ACK
		}
		
		deliveryCount := pendingResult[0].DeliveryCount
		log.Printf("Message %s delivery count: %d", messageID, deliveryCount)

		if deliveryCount >= maxRetries {
			moveToDLQ(rdb, messageID, message.Values, err.Error())
		} else {
			// 次数未到,什么都不做。消息会留在 PEL 中,等待被收割者重新认领,从而实现延迟重试
			log.Printf("Message %s will be retried later.", messageID)
		}
		return
	}

	// 4. 成功处理,ACK 消息
	if _, err := rdb.XAck(ctx, streamName, groupName, messageID).Result(); err != nil {
		log.Printf("ERROR: Failed to ACK processed message %s: %v", messageID, err)
	} else {
		log.Printf("Successfully processed and acknowledged message %s", messageID)
	}
}

func main() {
	redisAddr := os.Getenv("REDIS_ADDR")
	if redisAddr == "" {
		redisAddr = "localhost:6379"
	}
	hostname, _ := os.Hostname()
	consumerName = fmt.Sprintf("consumer-%s-%d", hostname, os.Getpid())

	rdb := redis.NewClient(&redis.Options{Addr: redisAddr})
	
	rdb.XGroupCreateMkStream(ctx, streamName, groupName, "$").Err()

	// 使用 XAUTOCLAIM 替代手动的 XPENDING + XCLAIM
	go func(rdb *redis.Client) {
		ticker := time.NewTicker(reclaimInterval)
		defer ticker.Stop()
		for range ticker.C {
			result, err := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{
				Stream:   streamName,
				Group:    groupName,
				Consumer: consumerName,
				MinIdle:  reclaimIdleTime,
				Start:    "0-0", // 从最早的消息开始检查
				Count:    10,
			}).Result()
			
			if err != nil && err != redis.Nil {
				log.Printf("Error during XAUTOCLAIM: %v", err)
			}
			if len(result.Messages) > 0 {
				log.Printf("Reclaimed %d messages", len(result.Messages))
				for i := range result.Messages {
					go handleMessage(rdb, &result.Messages[i])
				}
			}
		}
	}(rdb)
	
	log.Printf("Consumer '%s' started on group '%s'", consumerName, groupName)

	for {
		streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    groupName,
			Consumer: consumerName,
			Streams:  []string{streamName, ">"},
			Count:    10,
			Block:    5 * time.Second,
		}).Result()

		if err != nil {
			if err != redis.Nil {
				log.Printf("Error reading from group: %v", err)
			}
			continue
		}

		for _, stream := range streams {
			for i := range stream.Messages {
				go handleMessage(rdb, &stream.Messages[i])
			}
		}
	}
}

最后,我们还需要考虑“at-least-once delivery”(至少一次投递)语义带来的一个经典问题:幂等性。由于消息可能因为网络问题、消费者崩溃重连等原因被重复投递,我们的处理逻辑必须能够承受同一条消息被处理多次而结果不变。最简单的实现方式是利用 Redis 的 SETNX。在处理消息前,我们尝试设置一个 processed:<message_id> 的键,如果设置成功,说明是第一次处理;如果失败,说明已经有别的进程处理过了,直接 XACK 并跳过即可。这个键需要设置一个合理的过期时间,以防处理成功后 XACK 失败导致该消息永远无法被再次处理。

下面是整个流程的架构图:

graph TD
    subgraph Producer
        P[App Service] -- XADD --> S[Stream: events:user_activity]
    end

    subgraph Consumer Cluster
        C1[Consumer 1]
        C2[Consumer 2]
        C3[Consumer N]
    end

    S -- XREADGROUP --> C1
    S -- XREADGROUP --> C2
    S -- XREADGROUP --> C3

    subgraph "Processing Logic (in each consumer)"
        Start((Start)) --> Idempotency{Idempotency Check?};
        Idempotency -- Yes --> Ack[XACK] & End((End));
        Idempotency -- No --> Process[Process Message];
        Process -- Success --> Ack;
        Process -- Failure --> CheckRetries{Retry Count < 3?};
        CheckRetries -- Yes --> Wait[Do Nothing, Wait for Reclaim];
        CheckRetries -- No --> MoveToDLQ[Move to DLQ];
        MoveToDLQ -- XADD --> DLQS[Stream: dlq:user_activity];
        MoveToDLQ --> Ack;
    end
    
    subgraph "Reaper Logic (in each consumer)"
        T[Timer: 30s] --> AutoClaim{XAUTOCLAIM};
        AutoClaim -- Finds Idle Msg --> Reprocess[Pass to Processing Logic];
    end

    subgraph Monitoring
        M[Admin/Monitor] -- XREAD --> DLQS;
    end

这个架构最终在 DigitalOcean 上稳定运行了下来。我们只用了一个我们已有的 Managed Redis 实例,就实现了一个具备持久化、消费组、故障转移、毒丸处理和幂等性保障的事件处理系统。它的吞吐量或许不及 Kafka,但对于我们当前的业务量来说绰绰有余,而且运维成本几乎为零。

当然,这个方案并非银弹。它的重试策略是基于固定的认领间隔,无法实现指数退避等更复杂的策略,除非引入额外的 Redis 数据结构来记录每条消息的失败时间和状态,这会增加系统的复杂度。此外,DLQ 中的消息处理也需要配套的工具或脚本来进行监控、告警和手动重放。对于超大规模、需要严格顺序保证或跨地域复制的场景,专用的消息队列系统依然是更合适的选择。但对于大量中小型项目而言,榨干你手中现有工具的潜力,往往是更务实和高效的工程决策。


  目录