我们的用户服务扛不住了。单体 PostgreSQL 实例的写入 I/O 已经饱和,垂直扩展的成本曲线变得异常陡峭。唯一的出路是水平扩展,也就是数据库分片 (Sharding)。这个决定不难做,但随之而来的问题却极其棘手:分片逻辑应该放在哪里?
最初的构想是将其封装在应用层的共享库里。所有微服务都引入这个库,库里包含分片键的计算逻辑和数据源的路由表。这很快就被否决了。每次增加一个新分片,或者调整分片策略,就意味着所有依赖此库的服务都需要重新编译、测试和部署。这是一场运维灾难,而且严重违反了微服务独立演进的原则。
下一个方案是引入一个专用的“数据库路由服务”。应用层将 SQL 和分片键发给这个服务,由它代理执行到正确的分片。这比共享库好一些,至少逻辑是集中的。但它引入了新的网络跳数,并且自身成为了一个关键的单点故障和性能瓶le。
我们团队已经在使用 Tyk 作为 API 网关,它处理了所有的入口流量、认证和速率限制。一个想法浮现出来:如果路由逻辑能下沉到网关层呢?如果 Tyk 能在请求到达应用服务之前,就识别出分片键并决定目标数据源,应用层就可以变得“分片无知”,继续像操作单库一样工作。这非常诱人。Tyk 强大的中间件插件系统,特别是对 Python 的支持,似乎为这个构想提供了可能性。
但新的问题又来了:Tyk 插件是无状态的,它如何知道哪个分片键对应哪个数据库连接?它总不能在每次请求时都去查询一个元数据中心吧?性能开销会无法接受。插件必须在内存中维护一份分片拓扑的路由表。那么,当数据库扩容,增加新分片时,我们如何通知所有 Tyk 网关节点,让它们动态、实时地更新这份内存路由表,而且整个过程不能重启服务,不能有任何流量中断?
这就是事件驱动架构 (EDA) 发挥作用的地方。我们可以构建一个简单的控制平面,当数据库拓扑发生变化时,它会发布一个“拓扑变更”事件。所有的 Tyk 插件实例都作为订阅者,监听这些事件,并安全地更新自己的内存状态。
这个方案将三个组件完美地结合在一起:
- 数据库分片 (PostgreSQL): 作为数据存储的最终载体,解决数据容量和写入瓶颈。
- Tyk API Gateway: 作为流量入口,通过自定义 Python 插件实现智能、动态的路由决策。
- 事件驱动架构 (EDA): 作为控制平面,解耦了拓扑管理和网关路由,实现了配置的动态热更新。
我们决定动手验证这个架构。
第一步:定义分片策略与数据库结构
我们选择一个简单的基于租户ID (tenant_id) 的范围分片策略。假设系统初期有两个分片:
-
shard1: 负责tenant_id从 1 到 10000。 -
shard2: 负责tenant_id从 10001 到 20000。
在两个独立的 PostgreSQL 实例上,我们创建结构完全相同的 users 表。
Shard 1 (db_shard1) & Shard 2 (db_shard2) DDL:
-- DDL for both shards. Note the check constraint which is crucial for data integrity.
CREATE TABLE users (
id SERIAL PRIMARY KEY,
tenant_id BIGINT NOT NULL,
username VARCHAR(100) NOT NULL,
email VARCHAR(255) UNIQUE NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
-- Optional but highly recommended: add a check constraint to prevent wrong data insertion.
-- For shard1:
ALTER TABLE users ADD CONSTRAINT tenant_id_shard1_check CHECK (tenant_id > 0 AND tenant_id <= 10000);
-- For shard2:
ALTER TABLE users ADD CONSTRAINT tenant_id_shard2_check CHECK (tenant_id > 10000 AND tenant_id <= 20000);
CREATE INDEX idx_users_tenant_id ON users (tenant_id);
-- Insert some sample data for testing
-- On shard1:
INSERT INTO users (tenant_id, username, email) VALUES (123, 'alice_shard1', '[email protected]');
-- On shard2:
INSERT INTO users (tenant_id, username, email) VALUES (15000, 'bob_shard2', '[email protected]');
这里的 CHECK 约束是一个常见的工程实践,它能在数据库层面提供一层保护,防止因应用层或路由层逻辑错误导致数据被写入错误的分片。
第二步:构建事件驱动的控制平面
我们选择 Redis Pub/Sub 作为事件总线,因为它轻量、快速,且 redis-py 库非常易用。控制平面可以是一个简单的管理脚本,在DBA执行完数据库扩容操作后,由这个脚本发布拓扑变更消息。
消息格式必须标准化。我们定义了一个 JSON 结构:
{
"op": "ADD_SHARD", // or "REMOVE_SHARD", "UPDATE_SHARD_RANGE"
"payload": {
"shard_id": "shard3",
"range_start": 20001,
"range_end": 30000,
"dsn": "postgres://user:password@host3:5432/db_shard3"
}
}
控制平面发布脚本 (control_plane.py):
import redis
import json
import os
# 在真实项目中,这些配置应该来自环境变量或配置中心
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
TOPOLOGY_CHANNEL = "db_shard_topology_updates"
def publish_topology_update(operation: str, payload: dict):
"""Publishes a shard topology update message to Redis."""
try:
r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=0)
message = {
"op": operation,
"payload": payload
}
message_json = json.dumps(message)
published_count = r.publish(TOPOLOGY_CHANNEL, message_json)
print(f"Published message to '{TOPOLOGY_CHANNEL}': {message_json}")
print(f"Message received by {published_count} subscribers.")
except redis.exceptions.ConnectionError as e:
print(f"Error connecting to Redis: {e}")
except Exception as e:
print(f"An unexpected error occurred: {e}")
if __name__ == '__main__':
# 模拟添加一个新的分片 shard3
new_shard_payload = {
"shard_id": "shard3",
"range_start": 20001,
"range_end": 30000,
# DSN (Data Source Name) 包含连接所需的所有信息
"dsn": "postgres://postgres:mysecretpassword@pg_shard3:5432/postgres"
}
# 确保在运行此脚本之前,相应的数据库实例已经准备就绪
print("Simulating the addition of a new shard (shard3)...")
publish_topology_update("ADD_SHARD", new_shard_payload)
这个脚本非常简单,其核心职责就是将结构化的拓扑信息广播出去。
第三步:编写核心 - Tyk Python 中间件
这是整个架构的核心。我们需要在 Tyk 的 Python 中间件中实现以下逻辑:
- 初始化: 在网关加载插件时,从一个初始配置文件(或元数据API)加载基础分片信息,并启动一个后台线程。
- 后台线程: 这个线程专门负责连接 Redis,订阅
db_shard_topology_updates频道,并持续监听消息。 - 状态更新: 收到拓扑变更消息后,后台线程需要以线程安全的方式更新一个全局的、内存中的路由表。
- 请求处理: 对于每一个进来的API请求,中间件会解析请求体,提取
tenant_id,查询内存路由表,找到对应的数据库DSN。 - 请求重写: 将找到的 DSN 注入到一个自定义的请求头中 (例如
X-Shard-DSN),然后将请求放行到上游服务。
上游服务是一个简单的 “Query Forwarder”,它读取 X-Shard-DSN 头,并使用该 DSN 连接到正确的数据库分片执行操作。这样做的好处是,我们的核心业务逻辑服务可以保持纯粹,而这个 Forwarder 则专门处理数据库连接的细节。
Tyk API 定义
首先,在 Tyk 中定义一个 API,启用自定义 Python 中间件。
tyk_api_definition.json:
{
"name": "Sharded User Service",
"api_id": "sharded-user-service",
"org_id": "1",
"use_keyless": true,
"auth": {
"auth_header_name": ""
},
"definition": {
"location": "header",
"key": "x-api-version"
},
"version_data": {
"not_versioned": true,
"versions": {
"Default": {
"name": "Default",
"expires": "",
"paths": {
"ignored": [],
"white_list": [],
"black_list": []
},
"use_extended_paths": true,
"extended_paths": {
"custom_middleware": {
"path": "/users",
"method": "POST",
"name": "ShardRoutingMiddleware",
"require_session": false
}
}
}
}
},
"proxy": {
"listen_path": "/sharded-user-service/",
"target_url": "http://query-forwarder:8000/",
"strip_listen_path": true
},
"custom_middleware": {
"driver": "cpython",
"pre": [
{
"name": "ShardRoutingMiddleware",
"path": "middleware/middleware.py",
"function_name": "ShardRoutingMiddleware"
}
]
}
}
关键配置是 custom_middleware 部分,它告诉 Tyk 在处理请求前,加载 middleware/middleware.py 文件并执行 ShardRoutingMiddleware 函数。
Python 中间件代码 (middleware.py)
import redis
import json
import threading
import logging
from tyk.gateway import TykGateway as tyk
# 配置日志,这在生产环境中对于调试至关重要
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --- 线程安全的状态管理 ---
# 这是我们内存中的路由表。key是shard_id,value是包含范围和DSN的字典。
# 在真实项目中,初始状态应从配置文件、环境变量或启动时从配置中心拉取。
SHARD_MAP = {
"shard1": { "range_start": 1, "range_end": 10000, "dsn": "postgres://postgres:mysecretpassword@pg_shard1:5432/postgres" },
"shard2": { "range_start": 10001, "range_end": 20000, "dsn": "postgres://postgres:mysecretpassword@pg_shard2:5432/postgres" }
}
# 读写 SHARD_MAP 时必须使用锁,因为主请求处理线程和后台更新线程会并发访问它。
shard_map_lock = threading.Lock()
# Redis 配置
REDIS_HOST = "redis"
REDIS_PORT = 6379
TOPOLOGY_CHANNEL = "db_shard_topology_updates"
# --- 后台 Redis 订阅线程 ---
def topology_update_listener():
"""
在后台运行,监听Redis Pub/Sub以获取分片拓扑更新。
"""
logger.info("Topology listener thread started.")
while True:
try:
r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT)
p = r.pubsub()
p.subscribe(TOPOLOGY_CHANNEL)
logger.info(f"Subscribed to Redis channel: {TOPOLOGY_CHANNEL}")
for message in p.listen():
if message['type'] == 'message':
logger.info(f"Received topology update: {message['data']}")
try:
data = json.loads(message['data'])
op = data.get("op")
payload = data.get("payload")
if not op or not payload:
logger.error("Invalid message format received.")
continue
with shard_map_lock:
if op == "ADD_SHARD":
shard_id = payload.get("shard_id")
if shard_id:
SHARD_MAP[shard_id] = payload
logger.info(f"Successfully added shard '{shard_id}'. New map: {SHARD_MAP}")
else:
logger.error("ADD_SHARD operation missing 'shard_id'.")
# 此处可以添加处理 REMOVE_SHARD 等其他操作的逻辑
else:
logger.warning(f"Unsupported operation received: {op}")
except json.JSONDecodeError:
logger.error(f"Failed to decode JSON from message: {message['data']}")
except Exception as e:
logger.error(f"Error processing message: {e}")
except redis.exceptions.ConnectionError:
logger.error("Redis connection failed. Retrying in 5 seconds...")
threading.Event().wait(5)
except Exception as e:
logger.error(f"An unexpected error occurred in listener thread: {e}. Restarting...")
threading.Event().wait(5)
# 确保后台线程只启动一次
# Tyk加载Python文件时会执行全局代码,这是启动后台任务的理想位置。
listener_thread = threading.Thread(target=topology_update_listener, daemon=True)
listener_thread.start()
# --- Tyk 中间件入口函数 ---
@tyk.middleware
def ShardRoutingMiddleware(request, session, spec):
"""
这是Tyk为每个请求调用的主处理函数。
"""
logger.info("ShardRoutingMiddleware triggered.")
# 1. 解析请求体以获取分片键
try:
body = json.loads(request.get_body())
tenant_id = body.get("tenant_id")
# 验证分片键是否存在且为整数
if tenant_id is None:
logger.warning("Request body missing 'tenant_id'.")
request.set_return_data(
'{"error": "tenant_id is required"}', 400,
{"Content-Type": "application/json"}
)
return request, session
tenant_id = int(tenant_id)
except (json.JSONDecodeError, ValueError):
logger.warning("Invalid JSON body or non-integer tenant_id.")
request.set_return_data(
'{"error": "Invalid request body or tenant_id format"}', 400,
{"Content-Type": "application/json"}
)
return request, session
# 2. 查询路由表
target_dsn = None
with shard_map_lock: # 使用读锁保护对 SHARD_MAP 的访问
for shard_info in SHARD_MAP.values():
if shard_info["range_start"] <= tenant_id <= shard_info["range_end"]:
target_dsn = shard_info["dsn"]
break
# 3. 处理找不到分片的情况
if not target_dsn:
logger.error(f"No shard found for tenant_id: {tenant_id}")
request.set_return_data(
'{"error": "Configuration error: No shard found for the provided tenant_id"}', 503, # 503 Service Unavailable 更合适
{"Content-Type": "application/json"}
)
return request, session
# 4. 注入 DSN 到请求头
logger.info(f"Routing tenant_id {tenant_id} to DSN ending with ...{target_dsn[-20:]}")
request.set_header("X-Shard-DSN", target_dsn)
# 将请求传递给下一个中间件或上游服务
return request, session
这段代码是整个系统的智能核心。它不仅处理了 happy path,还包含了关键的生产级考量:
- 日志: 详细的日志记录对于排查问题至关重要。
- 线程安全: 使用
threading.Lock确保了在多线程环境下(Tyk 的 worker + 我们的后台线程)对共享状态SHARD_MAP的访问是安全的。这是一个常见的错误点,如果忽略,会导致间歇性的、难以复现的 bug。 - 错误处理: 妥善处理了 JSON 解析失败、
tenant_id缺失、找不到分片等异常情况,并向客户端返回了明确的 HTTP 错误码和信息。 - 韧性: 后台线程中的
try-except块确保了即使 Redis 连接暂时中断,线程也不会崩溃,而是会尝试重连。
第四步:结果验证与架构图
部署完成后,我们来验证整个流程。
- 启动 Tyk Gateway, PostgreSQL 实例, Redis, 和我们的 Query Forwarder 服务。
- 发送一个创建用户的请求,其
tenant_id在shard1的范围内。
curl -X POST http://localhost:8080/sharded-user-service/users \
-H "Content-Type: application/json" \
-d '{
"tenant_id": 123,
"username": "new_user_123",
"email": "[email protected]"
}'
Tyk Gateway 的日志会显示 ShardRoutingMiddleware 触发,并为 tenant_id: 123 找到了 shard1 的 DSN,并将其注入 X-Shard-DSN 头。请求被转发,数据成功写入 shard1。
现在,模拟数据库扩容。运行我们的
control_plane.py脚本。python control_plane.pyTyk Gateway 的日志中,由后台线程打印的日志会立刻显示收到 Redis 消息,并更新了内存中的
SHARD_MAP。无需任何重启或配置重载,立即发送一个
tenant_id在新分片范围内的请求。curl -X POST http://localhost:8080/sharded-user-service/users \ -H "Content-Type: application/json" \ -d '{ "tenant_id": 25000, "username": "new_user_25000", "email": "[email protected]" }'请求成功!Tyk 插件使用了刚刚动态更新的路由表,正确地将请求路由到了
shard3。我们实现了对数据库分片的动态、透明路由。
最终的系统架构如下:
graph TD
subgraph "Clients"
Client[Client Application]
end
subgraph "API Gateway Layer (Tyk)"
TykGateway[Tyk Gateway Instance]
subgraph TykGateway
Plugin[Python ShardRouting Middleware]
InMemoryMap{"In-Memory Shard Map
(Thread-Safe)"}
RedisSubscriber[Redis Subscriber Thread]
Plugin --> InMemoryMap
end
end
subgraph "Upstream Services"
QueryForwarder[Query Forwarder Service]
end
subgraph "Data Layer"
Shard1[PostgreSQL Shard 1
tenant_id: 1-10000]
Shard2[PostgreSQL Shard 2
tenant_id: 10001-20000]
Shard3[PostgreSQL Shard 3
tenant_id: 20001-30000]
end
subgraph "Control Plane"
ControlPlaneScript[DBA/Control Plane Script]
Redis[Redis Pub/Sub
Channel: db_shard_topology_updates]
end
Client -- HTTP Request (with tenant_id) --> TykGateway
TykGateway -- Proxies with X-Shard-DSN header --> QueryForwarder
QueryForwarder -- SQL Query --> Shard1
QueryForwarder -- SQL Query --> Shard2
QueryForwarder -- SQL Query --> Shard3
RedisSubscriber -- Subscribes to --> Redis
ControlPlaneScript -- Publishes update --> Redis
Redis -- Pushes update --> RedisSubscriber
RedisSubscriber -- Updates --> InMemoryMap
style TykGateway fill:#f9f,stroke:#333,stroke-width:2px
style Plugin fill:#ccf,stroke:#333,stroke-width:2px
局限性与未来展望
这个方案优雅地解决了动态分片路由的核心问题,但在真实生产环境中,还有几个方面需要加固。当前实现的 Python 插件中的后台线程模型相对简单,如果 Redis 连接频繁抖动,可能会导致短暂的配置不一致。使用一个更健壮的客户端库,并加入更复杂的重连和状态验证逻辑是必要的。
其次,我们引入了 Query Forwarder 作为额外的网络跳数。虽然它逻辑简单,但毕竟存在性能开销和运维成本。一个更极致的优化是开发一个 Tyk 的 Go 插件。Go 插件能更深入地与 Tyk 的代理引擎集成,甚至可能直接在插件内部管理到后端数据库的连接池,从而完全消除 Query Forwarder 这一层。
此外,当前方案只处理了基于 tenant_id 的写操作路由。对于读操作(例如,通过 username 查询用户),系统需要一个二级索引或散列表来定位 tenant_id,或者将这类查询路由到一个聚合了所有分片数据的只读数据仓库。这超出了路由层的范畴,属于数据架构的另一项挑战。
最后,事件总线的选择。对于需要保证消息必达的超高可靠场景,可以考虑用 Kafka 或 NATS JetStream 替换 Redis Pub/Sub,利用其持久化和至少一次送达的特性,确保任何拓扑变更事件都不会丢失。