使用 Tyk Python 插件与事件驱动架构实现数据库分片的动态路由


我们的用户服务扛不住了。单体 PostgreSQL 实例的写入 I/O 已经饱和,垂直扩展的成本曲线变得异常陡峭。唯一的出路是水平扩展,也就是数据库分片 (Sharding)。这个决定不难做,但随之而来的问题却极其棘手:分片逻辑应该放在哪里?

最初的构想是将其封装在应用层的共享库里。所有微服务都引入这个库,库里包含分片键的计算逻辑和数据源的路由表。这很快就被否决了。每次增加一个新分片,或者调整分片策略,就意味着所有依赖此库的服务都需要重新编译、测试和部署。这是一场运维灾难,而且严重违反了微服务独立演进的原则。

下一个方案是引入一个专用的“数据库路由服务”。应用层将 SQL 和分片键发给这个服务,由它代理执行到正确的分片。这比共享库好一些,至少逻辑是集中的。但它引入了新的网络跳数,并且自身成为了一个关键的单点故障和性能瓶le。

我们团队已经在使用 Tyk 作为 API 网关,它处理了所有的入口流量、认证和速率限制。一个想法浮现出来:如果路由逻辑能下沉到网关层呢?如果 Tyk 能在请求到达应用服务之前,就识别出分片键并决定目标数据源,应用层就可以变得“分片无知”,继续像操作单库一样工作。这非常诱人。Tyk 强大的中间件插件系统,特别是对 Python 的支持,似乎为这个构想提供了可能性。

但新的问题又来了:Tyk 插件是无状态的,它如何知道哪个分片键对应哪个数据库连接?它总不能在每次请求时都去查询一个元数据中心吧?性能开销会无法接受。插件必须在内存中维护一份分片拓扑的路由表。那么,当数据库扩容,增加新分片时,我们如何通知所有 Tyk 网关节点,让它们动态、实时地更新这份内存路由表,而且整个过程不能重启服务,不能有任何流量中断?

这就是事件驱动架构 (EDA) 发挥作用的地方。我们可以构建一个简单的控制平面,当数据库拓扑发生变化时,它会发布一个“拓扑变更”事件。所有的 Tyk 插件实例都作为订阅者,监听这些事件,并安全地更新自己的内存状态。

这个方案将三个组件完美地结合在一起:

  1. 数据库分片 (PostgreSQL): 作为数据存储的最终载体,解决数据容量和写入瓶颈。
  2. Tyk API Gateway: 作为流量入口,通过自定义 Python 插件实现智能、动态的路由决策。
  3. 事件驱动架构 (EDA): 作为控制平面,解耦了拓扑管理和网关路由,实现了配置的动态热更新。

我们决定动手验证这个架构。

第一步:定义分片策略与数据库结构

我们选择一个简单的基于租户ID (tenant_id) 的范围分片策略。假设系统初期有两个分片:

  • shard1: 负责 tenant_id 从 1 到 10000。
  • shard2: 负责 tenant_id 从 10001 到 20000。

在两个独立的 PostgreSQL 实例上,我们创建结构完全相同的 users 表。

Shard 1 (db_shard1) & Shard 2 (db_shard2) DDL:

-- DDL for both shards. Note the check constraint which is crucial for data integrity.
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    tenant_id BIGINT NOT NULL,
    username VARCHAR(100) NOT NULL,
    email VARCHAR(255) UNIQUE NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

-- Optional but highly recommended: add a check constraint to prevent wrong data insertion.
-- For shard1:
ALTER TABLE users ADD CONSTRAINT tenant_id_shard1_check CHECK (tenant_id > 0 AND tenant_id <= 10000);
-- For shard2:
ALTER TABLE users ADD CONSTRAINT tenant_id_shard2_check CHECK (tenant_id > 10000 AND tenant_id <= 20000);

CREATE INDEX idx_users_tenant_id ON users (tenant_id);

-- Insert some sample data for testing
-- On shard1:
INSERT INTO users (tenant_id, username, email) VALUES (123, 'alice_shard1', '[email protected]');
-- On shard2:
INSERT INTO users (tenant_id, username, email) VALUES (15000, 'bob_shard2', '[email protected]');

这里的 CHECK 约束是一个常见的工程实践,它能在数据库层面提供一层保护,防止因应用层或路由层逻辑错误导致数据被写入错误的分片。

第二步:构建事件驱动的控制平面

我们选择 Redis Pub/Sub 作为事件总线,因为它轻量、快速,且 redis-py 库非常易用。控制平面可以是一个简单的管理脚本,在DBA执行完数据库扩容操作后,由这个脚本发布拓扑变更消息。

消息格式必须标准化。我们定义了一个 JSON 结构:

{
  "op": "ADD_SHARD", // or "REMOVE_SHARD", "UPDATE_SHARD_RANGE"
  "payload": {
    "shard_id": "shard3",
    "range_start": 20001,
    "range_end": 30000,
    "dsn": "postgres://user:password@host3:5432/db_shard3"
  }
}

控制平面发布脚本 (control_plane.py):

import redis
import json
import os

# 在真实项目中,这些配置应该来自环境变量或配置中心
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
TOPOLOGY_CHANNEL = "db_shard_topology_updates"

def publish_topology_update(operation: str, payload: dict):
    """Publishes a shard topology update message to Redis."""
    try:
        r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=0)
        message = {
            "op": operation,
            "payload": payload
        }
        message_json = json.dumps(message)
        published_count = r.publish(TOPOLOGY_CHANNEL, message_json)
        print(f"Published message to '{TOPOLOGY_CHANNEL}': {message_json}")
        print(f"Message received by {published_count} subscribers.")
    except redis.exceptions.ConnectionError as e:
        print(f"Error connecting to Redis: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")

if __name__ == '__main__':
    # 模拟添加一个新的分片 shard3
    new_shard_payload = {
        "shard_id": "shard3",
        "range_start": 20001,
        "range_end": 30000,
        # DSN (Data Source Name) 包含连接所需的所有信息
        "dsn": "postgres://postgres:mysecretpassword@pg_shard3:5432/postgres"
    }
    
    # 确保在运行此脚本之前,相应的数据库实例已经准备就绪
    print("Simulating the addition of a new shard (shard3)...")
    publish_topology_update("ADD_SHARD", new_shard_payload)

这个脚本非常简单,其核心职责就是将结构化的拓扑信息广播出去。

第三步:编写核心 - Tyk Python 中间件

这是整个架构的核心。我们需要在 Tyk 的 Python 中间件中实现以下逻辑:

  1. 初始化: 在网关加载插件时,从一个初始配置文件(或元数据API)加载基础分片信息,并启动一个后台线程。
  2. 后台线程: 这个线程专门负责连接 Redis,订阅 db_shard_topology_updates 频道,并持续监听消息。
  3. 状态更新: 收到拓扑变更消息后,后台线程需要以线程安全的方式更新一个全局的、内存中的路由表。
  4. 请求处理: 对于每一个进来的API请求,中间件会解析请求体,提取 tenant_id,查询内存路由表,找到对应的数据库DSN。
  5. 请求重写: 将找到的 DSN 注入到一个自定义的请求头中 (例如 X-Shard-DSN),然后将请求放行到上游服务。

上游服务是一个简单的 “Query Forwarder”,它读取 X-Shard-DSN 头,并使用该 DSN 连接到正确的数据库分片执行操作。这样做的好处是,我们的核心业务逻辑服务可以保持纯粹,而这个 Forwarder 则专门处理数据库连接的细节。

Tyk API 定义

首先,在 Tyk 中定义一个 API,启用自定义 Python 中间件。

tyk_api_definition.json:

{
  "name": "Sharded User Service",
  "api_id": "sharded-user-service",
  "org_id": "1",
  "use_keyless": true,
  "auth": {
    "auth_header_name": ""
  },
  "definition": {
    "location": "header",
    "key": "x-api-version"
  },
  "version_data": {
    "not_versioned": true,
    "versions": {
      "Default": {
        "name": "Default",
        "expires": "",
        "paths": {
          "ignored": [],
          "white_list": [],
          "black_list": []
        },
        "use_extended_paths": true,
        "extended_paths": {
          "custom_middleware": {
            "path": "/users",
            "method": "POST",
            "name": "ShardRoutingMiddleware",
            "require_session": false
          }
        }
      }
    }
  },
  "proxy": {
    "listen_path": "/sharded-user-service/",
    "target_url": "http://query-forwarder:8000/",
    "strip_listen_path": true
  },
  "custom_middleware": {
    "driver": "cpython",
    "pre": [
      {
        "name": "ShardRoutingMiddleware",
        "path": "middleware/middleware.py",
        "function_name": "ShardRoutingMiddleware"
      }
    ]
  }
}

关键配置是 custom_middleware 部分,它告诉 Tyk 在处理请求前,加载 middleware/middleware.py 文件并执行 ShardRoutingMiddleware 函数。

Python 中间件代码 (middleware.py)

import redis
import json
import threading
import logging
from tyk.gateway import TykGateway as tyk

# 配置日志,这在生产环境中对于调试至关重要
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- 线程安全的状态管理 ---
# 这是我们内存中的路由表。key是shard_id,value是包含范围和DSN的字典。
# 在真实项目中,初始状态应从配置文件、环境变量或启动时从配置中心拉取。
SHARD_MAP = {
    "shard1": { "range_start": 1, "range_end": 10000, "dsn": "postgres://postgres:mysecretpassword@pg_shard1:5432/postgres" },
    "shard2": { "range_start": 10001, "range_end": 20000, "dsn": "postgres://postgres:mysecretpassword@pg_shard2:5432/postgres" }
}
# 读写 SHARD_MAP 时必须使用锁,因为主请求处理线程和后台更新线程会并发访问它。
shard_map_lock = threading.Lock()
# Redis 配置
REDIS_HOST = "redis"
REDIS_PORT = 6379
TOPOLOGY_CHANNEL = "db_shard_topology_updates"

# --- 后台 Redis 订阅线程 ---
def topology_update_listener():
    """
    在后台运行,监听Redis Pub/Sub以获取分片拓扑更新。
    """
    logger.info("Topology listener thread started.")
    while True:
        try:
            r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT)
            p = r.pubsub()
            p.subscribe(TOPOLOGY_CHANNEL)
            logger.info(f"Subscribed to Redis channel: {TOPOLOGY_CHANNEL}")
            
            for message in p.listen():
                if message['type'] == 'message':
                    logger.info(f"Received topology update: {message['data']}")
                    try:
                        data = json.loads(message['data'])
                        op = data.get("op")
                        payload = data.get("payload")

                        if not op or not payload:
                            logger.error("Invalid message format received.")
                            continue

                        with shard_map_lock:
                            if op == "ADD_SHARD":
                                shard_id = payload.get("shard_id")
                                if shard_id:
                                    SHARD_MAP[shard_id] = payload
                                    logger.info(f"Successfully added shard '{shard_id}'. New map: {SHARD_MAP}")
                                else:
                                    logger.error("ADD_SHARD operation missing 'shard_id'.")
                            # 此处可以添加处理 REMOVE_SHARD 等其他操作的逻辑
                            else:
                                logger.warning(f"Unsupported operation received: {op}")

                    except json.JSONDecodeError:
                        logger.error(f"Failed to decode JSON from message: {message['data']}")
                    except Exception as e:
                        logger.error(f"Error processing message: {e}")

        except redis.exceptions.ConnectionError:
            logger.error("Redis connection failed. Retrying in 5 seconds...")
            threading.Event().wait(5)
        except Exception as e:
            logger.error(f"An unexpected error occurred in listener thread: {e}. Restarting...")
            threading.Event().wait(5)

# 确保后台线程只启动一次
# Tyk加载Python文件时会执行全局代码,这是启动后台任务的理想位置。
listener_thread = threading.Thread(target=topology_update_listener, daemon=True)
listener_thread.start()

# --- Tyk 中间件入口函数 ---
@tyk.middleware
def ShardRoutingMiddleware(request, session, spec):
    """
    这是Tyk为每个请求调用的主处理函数。
    """
    logger.info("ShardRoutingMiddleware triggered.")

    # 1. 解析请求体以获取分片键
    try:
        body = json.loads(request.get_body())
        tenant_id = body.get("tenant_id")
        
        # 验证分片键是否存在且为整数
        if tenant_id is None:
            logger.warning("Request body missing 'tenant_id'.")
            request.set_return_data(
                '{"error": "tenant_id is required"}', 400,
                {"Content-Type": "application/json"}
            )
            return request, session
        
        tenant_id = int(tenant_id)

    except (json.JSONDecodeError, ValueError):
        logger.warning("Invalid JSON body or non-integer tenant_id.")
        request.set_return_data(
            '{"error": "Invalid request body or tenant_id format"}', 400,
            {"Content-Type": "application/json"}
        )
        return request, session
    
    # 2. 查询路由表
    target_dsn = None
    with shard_map_lock: # 使用读锁保护对 SHARD_MAP 的访问
        for shard_info in SHARD_MAP.values():
            if shard_info["range_start"] <= tenant_id <= shard_info["range_end"]:
                target_dsn = shard_info["dsn"]
                break
    
    # 3. 处理找不到分片的情况
    if not target_dsn:
        logger.error(f"No shard found for tenant_id: {tenant_id}")
        request.set_return_data(
            '{"error": "Configuration error: No shard found for the provided tenant_id"}', 503, # 503 Service Unavailable 更合适
            {"Content-Type": "application/json"}
        )
        return request, session

    # 4. 注入 DSN 到请求头
    logger.info(f"Routing tenant_id {tenant_id} to DSN ending with ...{target_dsn[-20:]}")
    request.set_header("X-Shard-DSN", target_dsn)

    # 将请求传递给下一个中间件或上游服务
    return request, session

这段代码是整个系统的智能核心。它不仅处理了 happy path,还包含了关键的生产级考量:

  • 日志: 详细的日志记录对于排查问题至关重要。
  • 线程安全: 使用 threading.Lock 确保了在多线程环境下(Tyk 的 worker + 我们的后台线程)对共享状态 SHARD_MAP 的访问是安全的。这是一个常见的错误点,如果忽略,会导致间歇性的、难以复现的 bug。
  • 错误处理: 妥善处理了 JSON 解析失败、tenant_id 缺失、找不到分片等异常情况,并向客户端返回了明确的 HTTP 错误码和信息。
  • 韧性: 后台线程中的 try-except 块确保了即使 Redis 连接暂时中断,线程也不会崩溃,而是会尝试重连。

第四步:结果验证与架构图

部署完成后,我们来验证整个流程。

  1. 启动 Tyk Gateway, PostgreSQL 实例, Redis, 和我们的 Query Forwarder 服务。
  2. 发送一个创建用户的请求,其 tenant_idshard1 的范围内。
curl -X POST http://localhost:8080/sharded-user-service/users \
-H "Content-Type: application/json" \
-d '{
    "tenant_id": 123,
    "username": "new_user_123",
    "email": "[email protected]"
}'

Tyk Gateway 的日志会显示 ShardRoutingMiddleware 触发,并为 tenant_id: 123 找到了 shard1 的 DSN,并将其注入 X-Shard-DSN 头。请求被转发,数据成功写入 shard1

  1. 现在,模拟数据库扩容。运行我们的 control_plane.py 脚本。

    python control_plane.py

    Tyk Gateway 的日志中,由后台线程打印的日志会立刻显示收到 Redis 消息,并更新了内存中的 SHARD_MAP

  2. 无需任何重启或配置重载,立即发送一个 tenant_id 在新分片范围内的请求。

    curl -X POST http://localhost:8080/sharded-user-service/users \
    -H "Content-Type: application/json" \
    -d '{
        "tenant_id": 25000,
        "username": "new_user_25000",
        "email": "[email protected]"
    }'

    请求成功!Tyk 插件使用了刚刚动态更新的路由表,正确地将请求路由到了 shard3。我们实现了对数据库分片的动态、透明路由。

最终的系统架构如下:

graph TD
    subgraph "Clients"
        Client[Client Application]
    end

    subgraph "API Gateway Layer (Tyk)"
        TykGateway[Tyk Gateway Instance]
        subgraph TykGateway
            Plugin[Python ShardRouting Middleware]
            InMemoryMap{"In-Memory Shard Map
(Thread-Safe)"} RedisSubscriber[Redis Subscriber Thread] Plugin --> InMemoryMap end end subgraph "Upstream Services" QueryForwarder[Query Forwarder Service] end subgraph "Data Layer" Shard1[PostgreSQL Shard 1
tenant_id: 1-10000] Shard2[PostgreSQL Shard 2
tenant_id: 10001-20000] Shard3[PostgreSQL Shard 3
tenant_id: 20001-30000] end subgraph "Control Plane" ControlPlaneScript[DBA/Control Plane Script] Redis[Redis Pub/Sub
Channel: db_shard_topology_updates] end Client -- HTTP Request (with tenant_id) --> TykGateway TykGateway -- Proxies with X-Shard-DSN header --> QueryForwarder QueryForwarder -- SQL Query --> Shard1 QueryForwarder -- SQL Query --> Shard2 QueryForwarder -- SQL Query --> Shard3 RedisSubscriber -- Subscribes to --> Redis ControlPlaneScript -- Publishes update --> Redis Redis -- Pushes update --> RedisSubscriber RedisSubscriber -- Updates --> InMemoryMap style TykGateway fill:#f9f,stroke:#333,stroke-width:2px style Plugin fill:#ccf,stroke:#333,stroke-width:2px

局限性与未来展望

这个方案优雅地解决了动态分片路由的核心问题,但在真实生产环境中,还有几个方面需要加固。当前实现的 Python 插件中的后台线程模型相对简单,如果 Redis 连接频繁抖动,可能会导致短暂的配置不一致。使用一个更健壮的客户端库,并加入更复杂的重连和状态验证逻辑是必要的。

其次,我们引入了 Query Forwarder 作为额外的网络跳数。虽然它逻辑简单,但毕竟存在性能开销和运维成本。一个更极致的优化是开发一个 Tyk 的 Go 插件。Go 插件能更深入地与 Tyk 的代理引擎集成,甚至可能直接在插件内部管理到后端数据库的连接池,从而完全消除 Query Forwarder 这一层。

此外,当前方案只处理了基于 tenant_id 的写操作路由。对于读操作(例如,通过 username 查询用户),系统需要一个二级索引或散列表来定位 tenant_id,或者将这类查询路由到一个聚合了所有分片数据的只读数据仓库。这超出了路由层的范畴,属于数据架构的另一项挑战。

最后,事件总线的选择。对于需要保证消息必达的超高可靠场景,可以考虑用 Kafka 或 NATS JetStream 替换 Redis Pub/Sub,利用其持久化和至少一次送达的特性,确保任何拓扑变更事件都不会丢失。


  目录