使用 Koa GitHub Actions 和分布式锁构建一个健壮的 Python SciPy 计算工作流


我们面临一个棘手的需求:为内部的风控团队提供一个API,能够对上传的数百万条交易记录进行实时的异常点检测。Node.js生态,特别是Koa,处理高并发的I/O请求是家常便饭,但这次的核心计算依赖于Python的SciPy库,特别是其中的信号处理和统计模块。最直接的方案,在Koa服务中通过child_process调用Python脚本,几乎立刻就被否决了。这种做法在生产环境中就是一颗定时炸弹,无法有效管理进程生命周期、资源分配,更别提水平扩展了。

我们需要的不是一个简单的胶水层,而是一个健壮、解耦、可观测的异构计算工作流。最终的技术栈选型看起来有些不寻常,但每个组件都精确地解决了特定问题:

  • Koa: 作为任务调度器和API网关。它极度轻量,其基于async/await的中间件模型非常适合处理任务提交、状态查询这类异步流程。
  • Python + SciPy: 专职的计算单元。部署为独立的Worker进程组,与API服务完全解耦。
  • Redis: 系统的中枢神经。它同时扮演三个角色:任务队列(LIST)、任务状态与结果存储(HASH),以及保证计算过程数据一致性的分布式锁(SETNX)。
  • GitHub Actions: 自动化CI/CD管道,负责构建、测试和部署这个包含两种语言、两种运行环境的复杂系统。

整个架构的核心思想是“关注点分离”。Koa不关心计算细节,只负责任务的生命周期管理。Python Worker不关心API请求,只负责从队列中获取任务并执行计算。

graph TD
    subgraph "CI/CD Pipeline"
        GHA[GitHub Actions]
    end

    subgraph "User Space"
        Client[Client]
    end

    subgraph "API Layer (Node.js)"
        Koa[Koa Service]
    end

    subgraph "Broker & Storage"
        Redis[(Redis)]
    end

    subgraph "Compute Layer (Python)"
        Worker1[SciPy Worker 1]
        Worker2[SciPy Worker 2]
        WorkerN[SciPy Worker N]
    end

    subgraph "Shared Resource"
        Data[Shared Model/Dataset]
    end

    Client -- HTTP POST /calculate --> Koa
    Koa -- 1. LPUSH task --> Redis
    Koa -- 2. HSET job_id status:queued --> Redis
    Koa -- HTTP 202 Accepted (job_id) --> Client

    Worker1 -- 3. BRPOP task --> Redis
    Worker2 -- 3. BRPOP task --> Redis
    WorkerN -- 3. BRPOP task --> Redis

    Worker1 -- 4. Acquire Lock --> Redis
    Worker1 -- 5. Access Resource --> Data
    Worker1 -- 6. Perform SciPy Calc --> Worker1
    Worker1 -- 7. Release Lock --> Redis
    Worker1 -- 8. HSET job_id status:completed, result:data --> Redis

    Client -- HTTP GET /status/job_id --> Koa
    Koa -- 9. HGETALL job_id --> Redis
    Redis -- Job Status & Result --> Koa
    Koa -- Job Status & Result --> Client

    GHA -- Deploy --> Koa
    GHA -- Deploy --> Worker1
    GHA -- Deploy --> Worker2
    GHA -- Deploy --> WorkerN

第一步:Koa任务调度器的实现

Koa层必须做到足够“薄”,它的主要职责是验证输入、创建任务ID、将任务推入队列,并提供一个查询接口。我们使用uuid生成任务ID,用ioredis库与Redis交互。

这是任务提交路由的核心代码。注意,我们没有在请求-响应周期内等待计算结果,而是立即返回202 Accepted和一个任务ID。这是异步任务处理的标准模式。

src/app.js

const Koa = require('koa');
const Router = require('@koa/router');
const bodyParser = require('koa-bodyparser');
const Redis = require('ioredis');
const { v4: uuidv4 } = require('uuid');

const app = new Koa();
const router = new Router();

// Redis 配置
const redisConfig = {
  host: process.env.REDIS_HOST || '127.0.0.1',
  port: process.env.REDIS_PORT || 6379,
  password: process.env.REDIS_PASSWORD || null,
};
const redis = new Redis(redisConfig);

const TASK_QUEUE_KEY = 'scipy_task_queue';
const JOB_STATUS_PREFIX = 'job_status:';

// 日志中间件
app.use(async (ctx, next) => {
  const start = Date.now();
  await next();
  const ms = Date.now() - start;
  console.log(`${ctx.method} ${ctx.url} - ${ctx.status} - ${ms}ms`);
});

router.post('/calculate', async (ctx) => {
  const { data_source, parameters } = ctx.request.body;

  // 在真实项目中,这里会有非常严格的输入验证
  if (!data_source || !Array.isArray(parameters)) {
    ctx.status = 400;
    ctx.body = { error: 'Invalid input: data_source and parameters are required.' };
    return;
  }

  const jobId = uuidv4();
  const task = {
    jobId,
    dataSource: data_source,
    params: parameters,
    submittedAt: new Date().toISOString(),
  };

  try {
    // 将任务元数据存入 Hash
    await redis.hmset(`${JOB_STATUS_PREFIX}${jobId}`, {
      status: 'queued',
      submittedAt: task.submittedAt,
      requestBody: JSON.stringify(ctx.request.body),
    });

    // 将任务推入队列
    await redis.lpush(TASK_QUEUE_KEY, JSON.stringify(task));

    ctx.status = 202;
    ctx.body = {
      message: 'Task accepted for processing.',
      jobId: jobId,
      statusUrl: `/status/${jobId}`,
    };
  } catch (err) {
    console.error(`[KOA] Failed to queue task: ${err.message}`);
    ctx.status = 500;
    ctx.body = { error: 'Internal server error: Could not queue the task.' };
  }
});

router.get('/status/:jobId', async (ctx) => {
  const { jobId } = ctx.params;
  if (!jobId) {
    ctx.status = 400;
    ctx.body = { error: 'Job ID is required.' };
    return;
  }

  try {
    const jobStatus = await redis.hgetall(`${JOB_STATUS_PREFIX}${jobId}`);

    if (Object.keys(jobStatus).length === 0) {
      ctx.status = 404;
      ctx.body = { error: `Job with ID ${jobId} not found.` };
      return;
    }
    
    // 如果存在结果,尝试解析
    if (jobStatus.result) {
        try {
            jobStatus.result = JSON.parse(jobStatus.result);
        } catch (e) {
            // 如果结果不是合法的JSON,可能是一个错误信息,直接返回字符串
            console.warn(`[KOA] Result for job ${jobId} is not valid JSON.`);
        }
    }

    ctx.status = 200;
    ctx.body = jobStatus;
  } catch (err) {
    console.error(`[KOA] Failed to get job status for ${jobId}: ${err.message}`);
    ctx.status = 500;
    ctx.body = { error: 'Internal server error: Could not retrieve job status.' };
  }
});

app.use(bodyParser());
app.use(router.routes()).use(router.allowedMethods());

const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`Koa task orchestrator running on port ${PORT}`);
});

这个Koa服务非常纯粹,它不包含任何业务逻辑,只做调度,这使得它非常稳定且易于维护。

第二步:Python计算Worker与分布式锁的陷阱

计算Worker是系统的核心。它在一个无限循环中阻塞式地等待任务。一个常见的错误是,当计算涉及到共享资源(例如,更新同一个机器学习模型文件,或处理特定用户的数据分区)时,多个并发的Worker进程会产生数据竞争。

假设我们的SciPy任务是找到一组时序数据中的峰值,并将结果写入一个以data_source命名的文件中。如果两个请求同时处理同一个data_source,文件内容就会被破坏。这时,分布式锁就成了必需品。

一个错误的、非原子的分布式锁实现是这样的:

# 这是一个错误的实现,仅用于演示
def acquire_lock_wrong(redis_conn, lock_name, timeout=10):
    # 非原子操作,可能在 SETNX 和 EXPIRE 之间崩溃
    if redis_conn.setnx(lock_name, "locked"):
        redis_conn.expire(lock_name, timeout)
        return True
    return False

问题在于SETNXEXPIRE是两个独立的命令。如果执行SETNX成功后,Worker进程崩溃,EXPIRE命令永远不会被执行,这个锁就会永久存在,造成死锁。

正确的实现必须利用Redis SET命令的原子性选项。

worker/worker.py:

import redis
import json
import time
import os
import uuid
from scipy.signal import find_peaks
import numpy as np

# Redis 配置
REDIS_HOST = os.getenv('REDIS_HOST', '127.0.0.1')
REDIS_PORT = int(os.getenv('REDIS_PORT', '6379'))
REDIS_PASSWORD = os.getenv('REDIS_PASSWORD', None)

TASK_QUEUE_KEY = 'scipy_task_queue'
JOB_STATUS_PREFIX = 'job_status:'
LOCK_PREFIX = 'lock:'

# 生成一个唯一的标识符,用于安全地释放锁
WORKER_ID = str(uuid.uuid4())

# 连接 Redis
# decode_responses=True 使得从 Redis 获取的 key/value 是字符串而非 bytes
r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, decode_responses=True)

def acquire_lock(conn, lock_name, acquire_timeout=10, lock_timeout=60):
    """
    获取分布式锁的生产级实现
    :param conn: redis 连接
    :param lock_name: 锁的名称
    :param acquire_timeout: 获取锁的超时时间 (秒)
    :param lock_timeout: 锁的持有超时时间 (秒)
    :return: 成功返回锁的标识,失败返回 None
    """
    end_time = time.time() + acquire_timeout
    lock_key = f"{LOCK_PREFIX}{lock_name}"

    while time.time() < end_time:
        # 使用 SET 命令的 NX 和 PX 选项保证原子性
        # NX: 只在 key 不存在时设置
        # PX: 设置过期时间,单位是毫秒
        if conn.set(lock_key, WORKER_ID, nx=True, px=lock_timeout * 1000):
            return WORKER_ID # 成功获取锁
        time.sleep(0.01) # 短暂休眠,避免CPU空转
    
    return None # 获取锁超时

# 使用 Lua 脚本保证释放锁的原子性,避免误删其他 worker 的锁
# 脚本会先检查锁的值是否与当前 worker 的 ID 匹配,匹配才删除
LUA_RELEASE_SCRIPT = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
else
    return 0
end
"""
release_lock_script = r.register_script(LUA_RELEASE_SCRIPT)

def release_lock(conn, lock_name, identifier):
    """安全地释放锁"""
    lock_key = f"{LOCK_PREFIX}{lock_name}"
    release_lock_script(keys=[lock_key], args=[identifier])

def perform_calculation(data):
    """
    一个模拟的、耗时的 SciPy 计算任务
    这里我们用 find_peaks 举例
    """
    # 模拟从某个地方加载数据
    # 在真实场景中,这可能是从 S3, HDFS 或数据库读取
    series = np.array(data)
    # 核心计算逻辑
    peaks, _ = find_peaks(series, height=0.5, distance=10)
    
    # 模拟IO操作,增加任务耗时
    time.sleep(5)
    
    return {"peaks_indices": peaks.tolist(), "count": len(peaks)}


def main_loop():
    print(f"Worker {WORKER_ID} started. Waiting for tasks...")
    while True:
        try:
            # BRPOP 是阻塞式操作,直到有任务或超时
            _, task_json = r.brpop(TASK_QUEUE_KEY, timeout=0)
            task = json.loads(task_json)
            job_id = task['jobId']
            data_source = task['dataSource']

            print(f"[{WORKER_ID}] Received job {job_id} for data_source: {data_source}")

            # 更新任务状态为 'processing'
            r.hset(f"{JOB_STATUS_PREFIX}{job_id}", mapping={
                'status': 'processing',
                'workerId': WORKER_ID,
                'startedAt': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
            })
            
            # 核心:为每个 data_source 获取锁
            lock_identifier = acquire_lock(r, data_source, lock_timeout=120) # 锁持有2分钟

            if not lock_identifier:
                print(f"[{WORKER_ID}] Failed to acquire lock for {data_source}. Re-queueing task.")
                # 获取锁失败,将任务重新放回队列头部,并标记为失败
                r.hset(f"{JOB_STATUS_PREFIX}{job_id}", mapping={
                    'status': 'failed',
                    'error': 'Failed to acquire resource lock, task will be retried.'
                })
                r.lpush(TASK_QUEUE_KEY, task_json) # 放回队列
                continue

            try:
                print(f"[{WORKER_ID}] Lock acquired for {data_source}.")
                # 假设参数是我们要处理的数据
                result = perform_calculation(task['params'])
                
                # 任务完成,更新状态和结果
                r.hset(f"{JOB_STATUS_PREFIX}{job_id}", mapping={
                    'status': 'completed',
                    'result': json.dumps(result),
                    'finishedAt': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
                })
                print(f"[{WORKER_ID}] Job {job_id} completed successfully.")

            except Exception as e:
                # 计算过程中发生异常
                print(f"[{WORKER_ID}] Error processing job {job_id}: {e}")
                r.hset(f"{JOB_STATUS_PREFIX}{job_id}", mapping={
                    'status': 'failed',
                    'error': str(e),
                    'finishedAt': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
                })
            finally:
                # 无论成功失败,都必须释放锁
                release_lock(r, data_source, lock_identifier)
                print(f"[{WORKER_ID}] Lock released for {data_source}.")

        except redis.exceptions.RedisError as e:
            print(f"Redis connection error: {e}. Retrying in 5 seconds...")
            time.sleep(5)
        except Exception as e:
            # 捕获其他所有异常,防止 worker 崩溃
            print(f"An unexpected error occurred in main loop: {e}")
            time.sleep(1)

if __name__ == "__main__":
    main_loop()

这段Worker代码体现了生产级应用的几个关键点:

  1. 原子性锁conn.set(lock_key, WORKER_ID, nx=True, px=lock_timeout * 1000)确保了加锁和设置超时是一个原子操作。
  2. 锁归属:锁的值是一个唯一的WORKER_ID。这防止了一个Worker错误地释放了另一个Worker持有的锁(例如,前一个Worker因为GC暂停导致锁超时,但它自己不知道)。
  3. 安全的锁释放:使用Lua脚本来确保“检查锁的值”和“删除锁”这两个操作的原子性。这是释放分布式锁的标准实践。
  4. finally:无论计算成功还是失败,锁的释放操作都必须在finally块中执行,保证资源不被永久锁定。
  5. 健壮的循环:主循环包裹在try...except中,可以捕获包括Redis连接错误在内的异常,防止整个进程因单个任务失败而退出。

第三步:用GitHub Actions自动化异构环境的CI/CD

管理Node.js和Python两个环境的构建、测试、打包和部署,手动操作是不可靠的。GitHub Actions提供了一个统一的平台来编排这一切。我们的工作流文件 .github/workflows/main.yml 大致如下:

name: CI/CD for Koa SciPy Service

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

env:
  DOCKER_IMAGE_KOA: your-docker-registry/koa-orchestrator
  DOCKER_IMAGE_WORKER: your-docker-registry/scipy-worker

jobs:
  test-and-build:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        service: ['koa-app', 'python-worker']
    
    steps:
    - uses: actions/checkout@v3

    # --- Node.js (Koa App) Steps ---
    - name: Set up Node.js
      if: matrix.service == 'koa-app'
      uses: actions/setup-node@v3
      with:
        node-version: '18'
        cache: 'npm'
        cache-dependency-path: 'package-lock.json' # 指向你的Koa项目路径
        
    - name: Install Node.js dependencies
      if: matrix.service == 'koa-app'
      run: npm ci

    - name: Run Node.js tests
      if: matrix.service == 'koa-app'
      run: npm test # 假设你有测试脚本

    # --- Python (SciPy Worker) Steps ---
    - name: Set up Python
      if: matrix.service == 'python-worker'
      uses: actions/setup-python@v4
      with:
        python-version: '3.10'
        cache: 'pip'
        cache-dependency-path: 'worker/requirements.txt'

    - name: Install Python dependencies
      if: matrix.service == 'python-worker'
      run: |
        python -m pip install --upgrade pip
        pip install -r worker/requirements.txt

    - name: Run Python linting and tests
      if: matrix.service == 'python-worker'
      run: |
        pip install flake8 pytest
        flake8 worker/
        pytest worker/tests/ # 假设你有测试

    # --- Docker Build and Push (only on main branch push) ---
    - name: Log in to Docker Hub
      if: github.event_name == 'push' && github.ref == 'refs/heads/main'
      uses: docker/login-action@v2
      with:
        username: ${{ secrets.DOCKERHUB_USERNAME }}
        password: ${{ secrets.DOCKERHUB_TOKEN }}

    - name: Build and push Koa Docker image
      if: github.event_name == 'push' && github.ref == 'refs/heads/main' && matrix.service == 'koa-app'
      uses: docker/build-push-action@v4
      with:
        context: . # Dockerfile 路径
        file: ./Dockerfile.koa # 为Koa服务准备的Dockerfile
        push: true
        tags: ${{ env.DOCKER_IMAGE_KOA }}:latest

    - name: Build and push Worker Docker image
      if: github.event_name == 'push' && github.ref == 'refs/heads/main' && matrix.service == 'python-worker'
      uses: docker/build-push-action@v4
      with:
        context: .
        file: ./worker/Dockerfile.worker # 为Python Worker准备的Dockerfile
        push: true
        tags: ${{ env.DOCKER_IMAGE_WORKER }}:latest

  deploy:
    needs: test-and-build
    runs-on: ubuntu-latest
    if: github.event_name == 'push' && github.ref == 'refs/heads/main'
    steps:
    - name: Deploy to Production
      # 这里的步骤高度依赖你的部署环境 (e.g., SSH to server, kubectl apply, etc.)
      # 这是一个示例,通过SSH更新服务
      run: |
        echo "Deploying new images..."
        ssh user@your-server "docker pull ${{ env.DOCKER_IMAGE_KOA }}:latest && \
                             docker pull ${{ env.DOCKER_IMAGE_WORKER }}:latest && \
                             docker-compose up -d --force-recreate koa-app scipy-worker"
        echo "Deployment complete."

这个CI/CD流程的关键在于使用了matrix策略,它为koa-apppython-worker并行创建了两个独立的job。每个job根据matrix.service的值选择性地执行Node.js或Python的设置、安装和测试步骤。只有当main分支有push事件时,才会执行构建和推送Docker镜像以及后续的部署步骤。这种方式保证了两个服务的独立验证和集成部署。

sequenceDiagram
    participant Dev
    participant GitHub
    participant ActionsRunner
    participant DockerRegistry
    participant ProductionServer

    Dev->>GitHub: git push origin main
    GitHub->>ActionsRunner: Trigger Workflow
    
    par Test Koa App and Test SciPy Worker
        ActionsRunner->>ActionsRunner: Job: test-and-build (matrix: koa-app)
        Note right of ActionsRunner: setup node, npm ci, npm test
        
        ActionsRunner->>ActionsRunner: Job: test-and-build (matrix: python-worker)
        Note right of ActionsRunner: setup python, pip install, pytest
    end

    Note over ActionsRunner: Both test jobs must pass
    
    par Build & Push Koa Image and Build & Push Worker Image
      ActionsRunner->>DockerRegistry: docker push koa-orchestrator:latest
      ActionsRunner->>DockerRegistry: docker push scipy-worker:latest
    end

    ActionsRunner->>ProductionServer: Job: deploy (via SSH)
    ProductionServer->>DockerRegistry: docker pull koa-orchestrator
    ProductionServer->>DockerRegistry: docker pull scipy-worker
    ProductionServer->>ProductionServer: docker-compose up -d --force-recreate

局限性与未来迭代方向

当前这套基于Redis的实现虽然能解决问题,但也并非银弹。它的健壮性高度依赖于Redis单点的稳定性。如果Redis发生主从切换,且客户端没有正确处理连接中断,可能会导致锁状态的短暂不一致。对于金融级别或要求绝对数据一致性的场景,基于Raft或Paxos协议的共识系统(如etcd、ZooKeeper)提供的分布式锁会是更可靠的选择,但它们也带来了更高的运维复杂性。

此外,直接使用Redis的LIST作为任务队列功能较为基础,不支持任务优先级、延迟任务、复杂的路由策略等。如果业务发展需要更精细化的任务调度,引入一个专业的任务队列中间件,如RabbitMQ或Celery(配合Redis/RabbitMQ作为Broker),将是自然而然的演进路径。

最后,当前的Worker是无状态的,但如果计算任务需要加载大型模型(GB级别),每次启动Worker都重新加载模型会非常低效。未来的优化可以考虑实现一个“有状态”的Worker,它在启动时加载模型并常驻内存,只在模型更新时才进行热重载,这将显著降低高频计算任务的启动延迟。


  目录