构建基于 Redis Streams 与 Go-Fiber 的准实时 Solr 索引管道


一个常见的痛点在于,当核心业务数据发生变更时,如何高效且可靠地同步到搜索引擎中。传统的做法是在业务逻辑中直接调用搜索引擎的API进行同步更新。这种强耦合的设计在生产环境中脆弱不堪:搜索引擎一次抖动或网络延迟,就会直接拖慢主业务的API响应时间,甚至导致整个写操作失败。如果索引更新逻辑复杂,这种同步调用还会严重侵蚀核心业务的处理能力。

我们最初的系统就面临这个问题。每次商品信息更新,Go服务在写入数据库后,会同步调用Solr的HTTP接口更新索引。在促销活动期间,商品信息批量变更,API的P99延迟急剧恶化,Solr集群的瞬时压力也常常导致写入超时,进而引发数据不一致,需要耗费大量人力进行数据校对和重建索引。

必须解耦。初步构想是引入一个消息队列,将索引更新操作异步化。业务方只需将变更事件(如商品创建、更新、删除)投递到队列中,就可以立即返回,由一个独立的消费者服务负责从队列中拉取事件并与Solr交互。这样,核心业务的性能和稳定性将不再受搜索引擎的影响。

在技术选型上,我们排除了重量级的Kafka。对于当前的业务量级,Kafka的运维复杂度和资源消耗都显得过高。我们最终选择了Redis Streams。它轻量、性能卓越,且内建于我们已经广泛使用的Redis中,无需引入新的技术栈。其持久化能力、消费者组(Consumer Groups)机制以及消息确认(ACK)机制,完美契合了我们构建一个可靠的、可水平扩展的索引管道的需求。

整个架构的最终形态如下:

graph TD
    subgraph "浏览器"
        A[Vue.js 前端]
    end

    subgraph "API 网关层"
        B(Go-Fiber API)
    end

    subgraph "数据与消息层"
        C[PostgreSQL/MySQL]
        D[Redis Streams: `index_events`]
    end

    subgraph "索引消费层"
        E[Go 索引消费者服务]
    end

    subgraph "搜索层"
        F[Apache Solr]
    end

    A -- "1. 发起数据变更请求 (POST /product)" --> B
    A -- "6. 发起搜索请求 (GET /search)" --> B
    B -- "2. 写入主数据库" --> C
    B -- "3. 发送变更事件到Stream" --> D
    B -- "7. 查询 Solr" --> F
    E -- "4. 从Stream拉取事件" --> D
    E -- "5. 更新 Solr 索引" --> F
    F -- "8. 返回搜索结果" --> B

步骤一:事件生产者 - 在 Go-Fiber 中投递变更事件

生产者是我们的主业务API,使用Go-Fiber构建。当处理一个创建或更新商品的请求时,它在完成数据库事务后,需要向Redis Streams中名为 index_events 的流里添加一条消息。

消息体必须包含足够的信息让消费者能够独立完成工作。一个好的实践是包含操作类型(create, update, delete)、实体ID以及完整的数据载荷。

services/producer/main.go

package main

import (
	"context"
	"encoding/json"
	"log"
	"os"

	"github.com/go-redis/redis/v8"
	"github.com/gofiber/fiber/v2"
	"github.com/gofiber/fiber/v2/middleware/logger"
)

// IndexEvent 定义了发送到 Redis Stream 的事件结构
type IndexEvent struct {
	Action string      `json:"action"` // "index" or "delete"
	ID     string      `json:"id"`
	Data   ProductData `json:"data,omitempty"`
}

// ProductData 代表商品信息
type ProductData struct {
	Name        string   `json:"name"`
	Category    string   `json:"category"`
	Price       float64  `json:"price"`
	Description string   `json:"description"`
	Tags        []string `json:"tags"`
}

var redisClient *redis.Client
var streamName = "index_events"
var ctx = context.Background()

func initRedis() {
	redisAddr := os.Getenv("REDIS_ADDR")
	if redisAddr == "" {
		redisAddr = "localhost:6379"
	}
	redisClient = redis.NewClient(&redis.Options{
		Addr: redisAddr,
	})

	if _, err := redisClient.Ping(ctx).Result(); err != nil {
		log.Fatalf("无法连接到 Redis: %v", err)
	}
	log.Println("成功连接到 Redis")
}

func main() {
	initRedis()
	app := fiber.New()
	app.Use(logger.New())

	// 模拟创建/更新产品的端点
	app.Post("/product", handleProductUpdate)

	log.Fatal(app.Listen(":3000"))
}

func handleProductUpdate(c *fiber.Ctx) error {
	var product ProductData
	if err := c.BodyParser(&product); err != nil {
		return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "无法解析请求体"})
	}

	// 实际项目中,这里会有数据库操作
	// log.Printf("正在将产品 %s 存入数据库...", product.Name)
	// time.Sleep(50 * time.Millisecond) // 模拟数据库延迟

	productID := generateID() // 假设有一个ID生成函数

	event := IndexEvent{
		Action: "index",
		ID:     productID,
		Data:   product,
	}

	eventBytes, err := json.Marshal(event)
	if err != nil {
		log.Printf("序列化事件失败: %v", err)
		return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "内部服务器错误"})
	}

	// 使用 XADD 命令将事件添加到流中
	// '*' 表示让 Redis 自动生成消息ID
	// `Values` 是一个 map[string]interface{}
	args := &redis.XAddArgs{
		Stream: streamName,
		Values: map[string]interface{}{"event": eventBytes},
	}

	messageID, err := redisClient.XAdd(ctx, args).Result()
	if err != nil {
		log.Printf("向 Redis Stream 添加事件失败: %v", err)
		// 这里的错误处理很关键。如果消息发送失败,需要有重试或告警机制。
		// 在生产环境中,可以考虑引入一个本地队列作为缓冲。
		return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "无法处理索引事件"})
	}

	log.Printf("成功发送事件到流 %s, 消息 ID: %s", streamName, messageID)

	return c.Status(fiber.StatusAccepted).JSON(fiber.Map{
		"message": "产品更新请求已接收,正在异步处理",
		"product_id": productID,
		"event_id": messageID,
	})
}

// 简单的ID生成器
func generateID() string {
    // 实际项目中应使用更可靠的ID生成策略,如UUID
    b := make([]byte, 16)
    _, _ = rand.Read(b)
    return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:])
}

这里的核心是 redisClient.XAdd。它将序列化后的事件作为Values的一部分推送到index_events流。API的响应码是202 Accepted,明确告诉客户端请求已被接受,但处理是异步的。这正是解耦带来的好处。

步骤二:Solr 索引消费服务

这是整个管道的核心。它是一个独立的Go服务,唯一的职责就是从Redis Streams中拉取事件,并将其转化为对Solr的操作。

为了保证可靠性和可扩展性,我们必须使用消费者组。

  1. 消费者组 (Consumer Group): 允许多个消费者实例共同消费同一个流。Redis会确保流中的每条消息只被组内的一个消费者处理。这使得我们可以通过简单地增加消费者服务的实例数量来水平扩展处理能力。
  2. 消息确认 (ACK): 消费者处理完一条消息后,必须向Redis发送XACK命令。这告诉Redis这条消息已被成功处理,可以从待处理列表(Pending Entries List, PEL)中移除。如果消费者在处理过程中崩溃而没有发送ACK,这条消息会一直保留在PEL中,可以被组内的其他消费者或该消费者重启后通过XCLAIM命令重新认领和处理。这就实现了“至少一次”的处理语义。

services/consumer/main.go

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"strings"
	"time"

	"github.com/go-redis/redis/v8"
)

// 和生产者共享相同的事件结构
type IndexEvent struct {
	Action string      `json:"action"`
	ID     string      `json:"id"`
	Data   ProductData `json:"data,omitempty"`
}

type ProductData struct {
	Name        string   `json:"name"`
	Category    string   `json:"category"`
	Price       float64  `json:"price"`
	Description string   `json:"description"`
	Tags        []string `json:"tags"`
}

var (
	redisClient *redis.Client
	httpClient  *http.Client
	streamName  = "index_events"
	groupName   = "solr_indexer_group"
	consumerName string
	solrUpdateURL string
	ctx         = context.Background()
)

func init() {
	// 初始化 Redis 客户端
	redisAddr := os.Getenv("REDIS_ADDR")
	if redisAddr == "" {
		redisAddr = "localhost:6379"
	}
	redisClient = redis.NewClient(&redis.Options{Addr: redisAddr})
	if _, err := redisClient.Ping(ctx).Result(); err != nil {
		log.Fatalf("无法连接到 Redis: %v", err)
	}

	// 初始化 Solr HTTP 客户端
	solrHost := os.Getenv("SOLR_HOST")
	if solrHost == "" {
		solrHost = "localhost:8983"
	}
	solrCore := os.Getenv("SOLR_CORE")
	if solrCore == "" {
		solrCore = "products"
	}
	solrUpdateURL = fmt.Sprintf("http://%s/solr/%s/update?commit=true", solrHost, solrCore)
	
	httpClient = &http.Client{Timeout: 10 * time.Second}

	// 生成一个唯一的消费者名称
	hostname, _ := os.Hostname()
	consumerName = fmt.Sprintf("consumer-%s-%d", hostname, os.Getpid())

	log.Printf("消费者 '%s' 启动", consumerName)
}

func main() {
	// 尝试创建消费者组。`MKSTREAM`选项可以在流不存在时自动创建。
	// `$`表示从流的末尾开始消费,只接收新消息。
	// 如果使用`0`,则会从头开始消费所有历史消息。
	_, err := redisClient.XGroupCreateMkStream(ctx, streamName, groupName, "$").Result()
	if err != nil {
		// 如果组已存在,会返回一个特定的错误,我们可以安全地忽略它。
		if !strings.Contains(err.Error(), "BUSYGROUP") {
			log.Fatalf("创建消费者组失败: %v", err)
		}
		log.Printf("消费者组 '%s' 已存在", groupName)
	} else {
		log.Printf("消费者组 '%s' 创建成功", groupName)
	}

	// 启动主消费循环
	consumeEvents()
}

func consumeEvents() {
	for {
		// XReadGroup 从流中读取消息。
		// `>` 是一个特殊ID,表示只读取从未被投递给组内任何消费者的消息。
		// `Count` 指定一次最多拉取多少条。
		// `Block` 指定阻塞等待时间,0表示无限期阻塞。
		streams, err := redisClient.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    groupName,
			Consumer: consumerName,
			Streams:  []string{streamName, ">"},
			Count:    10, // 批量处理以提高效率
			Block:    5 * time.Second,
		}).Result()

		if err != nil {
			// `redis.Nil` 表示超时,没有新消息,是正常情况。
			if err == redis.Nil {
				continue
			}
			log.Printf("从流中读取失败: %v. 稍后重试...", err)
			time.Sleep(5 * time.Second)
			continue
		}

		// 处理拉取到的消息
		for _, stream := range streams {
			for _, message := range stream.Messages {
				processMessage(message)
			}
		}
	}
}

func processMessage(msg redis.XMessage) {
	log.Printf("正在处理消息 ID: %s", msg.ID)
	
	eventData, ok := msg.Values["event"].(string)
	if !ok {
		log.Printf("错误: 消息 %s 格式不正确,缺少'event'字段", msg.ID)
		// 消息格式错误,无法处理,直接ACK掉,防止无限重试。
		ackMessage(msg.ID)
		return
	}

	var event IndexEvent
	if err := json.Unmarshal([]byte(eventData), &event); err != nil {
		log.Printf("错误: 反序列化消息 %s 失败: %v", msg.ID, err)
		ackMessage(msg.ID)
		return
	}

	var solrErr error
	switch event.Action {
	case "index":
		solrErr = indexDocument(event)
	case "delete":
		solrErr = deleteDocument(event.ID)
	default:
		log.Printf("警告: 未知的事件动作 '%s' in message %s", event.Action, msg.ID)
		ackMessage(msg.ID) // 未知动作,也直接ACK
		return
	}

	if solrErr != nil {
		log.Printf("错误: 处理消息 %s 失败 (Solr操作失败): %v", msg.ID, solrErr)
		// !!!这里的错误处理至关重要!!!
		// 不 ACK 消息。该消息将保留在待处理列表(PEL)中。
		// 其他消费者可以通过 XCLAIM 认领并重试。
		// 需要有监控来报警 PEL 过长的情况。
		return
	}

	// 成功处理,ACK消息
	ackMessage(msg.ID)
}

func indexDocument(event IndexEvent) error {
	// Solr 需要一个文档数组
	doc := map[string]interface{}{
		"id":           event.ID,
		"name_s":       event.Data.Name,
		"category_s":   event.Data.Category,
		"price_d":      event.Data.Price,
		"description_t": event.Data.Description,
		"tags_ss":      event.Data.Tags,
	}

	body, err := json.Marshal([]interface{}{doc})
	if err != nil {
		return fmt.Errorf("序列化Solr文档失败: %w", err)
	}

	req, err := http.NewRequest("POST", solrUpdateURL, strings.NewReader(string(body)))
	if err != nil {
		return fmt.Errorf("创建Solr请求失败: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("请求Solr失败: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("Solr返回非200状态码: %s", resp.Status)
	}

	log.Printf("成功为文档 %s 创建/更新了Solr索引", event.ID)
	return nil
}

func deleteDocument(docID string) error {
    // Solr delete by ID
    deletePayload := map[string]interface{}{
        "delete": docID,
    }
    body, err := json.Marshal(deletePayload)
    if err != nil {
        return fmt.Errorf("序列化Solr删除载荷失败: %w", err)
    }

	req, err := http.NewRequest("POST", solrUpdateURL, strings.NewReader(string(body)))
	if err != nil {
		return fmt.Errorf("创建Solr删除请求失败: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
    
    resp, err := httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("请求Solr失败: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("Solr返回非200状态码: %s", resp.Status)
	}

    log.Printf("成功从Solr删除了文档 %s", docID)
	return nil
}

func ackMessage(messageID string) {
	result, err := redisClient.XAck(ctx, streamName, groupName, messageID).Result()
	if err != nil {
		log.Printf("警告: ACK消息 %s 失败: %v", messageID, err)
	} else if result == 1 {
		log.Printf("成功ACK消息 %s", messageID)
	}
}

这个消费者服务的代码体现了生产级应用的几个要点:

  • 批量拉取: Count: 10 减少了网络往返,提高了吞吐量。
  • 优雅的错误处理: 明确区分可重试错误(如Solr连接失败)和不可重试错误(如消息格式错误)。对于前者,不执行XACK,让消息有机会被再次处理。对于后者,则必须XACK以避免”毒丸消息”阻塞整个管道。
  • 唯一消费者名称: 确保了在消费者组内可以准确追踪每个消费者的状态。

步骤三:Solr Schema 配置

为了让上述Go代码能正常工作,Solr中需要有一个对应的core,并定义好schema。这里是一个简化的managed-schema示例,字段类型后缀(如 _s, _d, _t, _ss)是Solr的常见约定,分别代表string、double、text和strings(多值字符串)。

managed-schema

<schema name="products" version="1.6">
  <field name="id" type="string" indexed="true" stored="true" required="true" multiValued="false"/>
  <field name="name_s" type="string" indexed="true" stored="true"/>
  <field name="category_s" type="string" indexed="true" stored="true"/>
  <field name="price_d" type="pdouble" indexed="true" stored="true"/>
  <field name="description_t" type="text_general" indexed="true" stored="true"/>
  <field name="tags_ss" type="string" indexed="true" stored="true" multiValued="true"/>

  <field name="_version_" type="plong" indexed="false" stored="false"/>
  <field name="_text_" type="text_general" indexed="true" stored="false" multiValued="true"/>

  <uniqueKey>id</uniqueKey>

  <fieldType name="pdouble" class="solr.DoublePointField" docValues="true"/>
  <fieldType name="plong" class="solr.LongPointField" docValues="true"/>
  <fieldType name="string" class="solr.StrField" sortMissingLast="true" docValues="true"/>
  <fieldType name="text_general" class="solr.TextField" positionIncrementGap="100">
    <analyzer>
      <tokenizer class="solr.StandardTokenizerFactory"/>
      <filter class="solr.LowerCaseFilterFactory"/>
    </analyzer>
  </fieldType>
</schema>

步骤四:Vue.js 前端与搜索接口

最后,为了验证整个管道,我们需要一个前端界面来触发更新并执行搜索。

首先,在Go-Fiber API服务中增加一个搜索接口,它直接查询Solr。

services/producer/main.go (新增部分)

// ... (在 main 函数中)
app.Get("/search", handleSearch)

// ... (新增函数)
func handleSearch(c *fiber.Ctx) error {
	query := c.Query("q", "*:*") // 默认查询所有
	solrHost := os.Getenv("SOLR_HOST")
	if solrHost == "" {
		solrHost = "localhost:8983"
	}
	solrCore := os.Getenv("SOLR_CORE")
	if solrCore == "" {
		solrCore = "products"
	}
	
	// 实际项目中,应使用更健壮的 Solr 客户端库
	// 这里为了简化,直接拼接 URL
	solrURL := fmt.Sprintf("http://%s/solr/%s/select?q=%s&wt=json", solrHost, solrCore, query)
	
	resp, err := http.Get(solrURL)
	if err != nil {
		log.Printf("查询Solr失败: %v", err)
		return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "搜索服务暂时不可用"})
	}
	defer resp.Body.Close()

	c.Set(fiber.HeaderContentType, fiber.MIMEApplicationJSON)
	return c.Status(resp.StatusCode).SendStream(resp.Body)
}

前端Vue.js组件则提供一个输入框用于创建商品,另一个输入框用于搜索。

frontend/src/App.vue

<template>
  <div id="app">
    <h1>实时索引系统演示</h1>

    <div class="panel">
      <h2>1. 创建/更新商品 (触发索引)</h2>
      <form @submit.prevent="submitProduct">
        <input v-model="newProduct.name" placeholder="商品名称" required />
        <input v-model="newProduct.category" placeholder="分类" required />
        <input v-model.number="newProduct.price" type="number" placeholder="价格" required />
        <input v-model="newProduct.description" placeholder="描述" />
        <input v-model="newProduct.tags" placeholder="标签 (逗号分隔)" />
        <button type="submit">提交</button>
      </form>
      <p v-if="submitStatus">{{ submitStatus }}</p>
    </div>

    <div class="panel">
      <h2>2. 搜索商品</h2>
      <input v-model="searchQuery" @input="search" placeholder="输入搜索词..." />
      <div v-if="isLoading">正在搜索...</div>
      <ul v-else-if="results.length">
        <li v-for="item in results" :key="item.id">
          <strong>{{ item.name_s }}</strong> ({{ item.category_s }}) - ${{ item.price_d }}
          <p>{{ item.description_t }}</p>
          <small>Tags: {{ item.tags_ss.join(', ') }}</small>
        </li>
      </ul>
      <p v-else>无结果</p>
    </div>
  </div>
</template>

<script>
import { debounce } from 'lodash-es';

export default {
  name: 'App',
  data() {
    return {
      newProduct: {
        name: '新款高性能笔记本',
        category: '电子产品',
        price: 9999.99,
        description: '搭载最新款处理器和显卡',
        tags: '笔记本,电脑,高性能',
      },
      submitStatus: '',
      searchQuery: '笔记本',
      results: [],
      isLoading: false,
    };
  },
  methods: {
    async submitProduct() {
      try {
        const payload = {
          ...this.newProduct,
          tags: this.newProduct.tags.split(',').map(t => t.trim()),
        };
        const response = await fetch('/api/product', {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify(payload),
        });
        const data = await response.json();
        if (!response.ok) {
          throw new Error(data.error || '提交失败');
        }
        this.submitStatus = `请求成功: ${data.message}`;
        setTimeout(() => this.search(), 1000); // 1秒后自动搜索
      } catch (error) {
        this.submitStatus = `错误: ${error.message}`;
      }
    },
    search: debounce(async function() {
      if (!this.searchQuery) {
        this.results = [];
        return;
      }
      this.isLoading = true;
      try {
        const response = await fetch(`/api/search?q=name_s:*${this.searchQuery}* OR description_t:*${this.searchQuery}*`);
        const data = await response.json();
        this.results = data.response.docs;
      } catch (error) {
        console.error('搜索失败:', error);
      } finally {
        this.isLoading = false;
      }
    }, 300), // 添加防抖
  },
  mounted() {
    this.search();
  }
};
</script>

<style>
/* 样式... */
</style>

当我们通过Vue前端提交一个新的商品信息,API服务会几乎瞬时返回,同时一个事件被推送到Redis。另一边的消费者服务在几毫秒到几秒内(取决于Block时间和网络延迟)拉取到这个事件,调用Solr API,完成索引。此时,在前端搜索框中输入相关关键词,新提交的商品就能立刻被搜到,实现了准实时的效果,同时核心API的性能和稳定性得到了保障。

当前方案的局限性与展望

这套架构虽然解决了核心的解耦和异步化问题,但在更严苛的生产环境下仍有优化空间。首先,当前是“至少一次”的处理语义。如果消费者在处理完Solr请求后、ACK之前崩溃,重启后可能会重复处理同一条消息,造成对Solr的重复写入。虽然Solr基于uniqueKey的写入是幂等的,但对于需要精确计数的场景,可能需要在消费端实现幂等性检查。

其次,错误处理机制可以更完善。对于持续失败的消息,它们会一直留在PEL中,造成PEL膨胀。需要一个独立的监控和处理机制(有时被称为“死信队列”),定期检查PEL,将处理失败多次的消息转移到另一个流或队列中,并发出警报,供人工干预。

最后,随着业务增长,单个流可能会成为瓶颈。可以考虑按业务类型(如商品、订单)拆分到不同的流和消费者组,实现更精细的资源隔离和故障隔离。对消费者延迟的监控——即消息产生时间和被处理时间的差值——也应纳入APM系统,作为衡量管道健康度的核心SLI。


  目录