构建基于 Kotlin Multiplatform 的强化学习感知型 EKS 自定义调度器并以 mTLS 强化安全


默认的 Kubernetes 调度器在通用工作负载上表现出色,但对于分布式强化学习(RL)这类通信密集型任务,其标准的 Pod 打散策略往往会成为性能瓶颈。一个典型的 RL 训练作业通常包含一个参数服务器或协调器(Master)和多个执行环境交互的执行器(Worker)。当这些 Worker Pod 被调度到不同的可用区(AZ)甚至只是不同的物理节点时,梯度同步和状态更新引入的跨节点网络延迟会严重拖慢整个训练周期。

我们的挑战是:在 AWS EKS 集群上,为 RL 训练作业实现一个能够感知其拓扑结构、并将关联 Pod 尽可能紧密部署的调度策略。我们选择不编写 Go 语言的调度器插件,而是利用 Scheduler Extender 机制,这样就可以使用我们团队更擅长的技术栈——Kotlin。更进一步,我们希望将核心调度算法封装在一个 Kotlin Multiplatform (KMP) 模块中,以便未来能在后端、CLI 工具或模拟器中复用。整个通信链路的安全性是生产环境的硬性要求,因此 API Server 与 Extender 之间必须通过 mTLS 进行双向认证通信。

架构决策:Scheduler Extender vs. Scheduler Plugin

在 Kubernetes 中扩展调度能力主要有两种路径:调度框架插件(Scheduler Framework Plugins)和调度器扩展器(Scheduler Extender)。

方案A:调度框架插件
这是 Kubernetes 1.15+ 推荐的现代方式。通过实现 FilterScoreBind 等扩展点接口,将自定义逻辑编译进 kube-scheduler 二进制文件或作为一个独立的调度器配置文件运行。

  • 优势:
    • 性能最高,因为是进程内调用,无网络开销。
    • 与核心调度流程紧密集成,状态管理更直接。
  • 劣势:
    • 必须使用 Go 语言开发。
    • 开发、调试和部署周期更长,需要重新编译或管理独立的调度器实例。
    • 对于我们希望使用 Kotlin Multiplatform 的目标,此方案直接被排除。

方案B:调度器扩展器
一个存在已久的机制,kube-scheduler 通过 webhook 的方式,在调度周期的特定阶段(FilterPrioritize)调用一个外部的 HTTP(S) 服务。

  • 优势:
    • 技术栈无关。任何能提供 HTTP 端点的语言都可以,这为我们使用 Kotlin/Ktor 打开了大门。
    • 独立部署和迭代。Extender 是一个普通的 Kubernetes Deployment,与 kube-scheduler 解耦。
    • 逻辑简单,本质上是实现几个 HTTP API。
  • 劣势:
    • 引入了网络延迟。每次调度 Pod 都可能需要一次 HTTP round-trip。
    • Extender 成为调度路径上的关键故障点,必须保证其高可用。

最终选择与理由
我们选择方案B:Scheduler Extender。最核心的驱动因素是技术栈的自由度,这使得我们能够实践 KMP 的构想。对于网络延迟问题,在 AWS EKS 环境中,通过将 Extender Pod 与控制平面节点置于同一 VPC 和子网,可以将延迟控制在毫秒级,对于调度决策而言是可以接受的。其高可用性则可以通过标准的 Kubernetes Deployment 配置多副本和 PodAntiAffinity 来保障。这个权衡在我们的场景中是值得的。

核心实现概览

我们的实现包含三个主要部分:

  1. 核心调度逻辑 (Kotlin Multiplatform commonMain): 一个纯粹的、无依赖的 Kotlin 模块,实现对节点和 Pod 的评分算法。
  2. 调度器扩展器服务 (Ktor jvmMain): 一个基于 Ktor 的 JVM 应用,暴露 HTTP 端点供 kube-scheduler 调用,并配置 mTLS。
  3. 单元测试 (Vitest jsMain): KMP 模块被编译成 JavaScript,我们使用 Vitest 在 Node.js 环境中对其核心算法进行单元测试。

以下是整体架构的交互流程:

sequenceDiagram
    participant User
    participant Kube API Server
    participant Kube Scheduler
    participant RL Scheduler Extender
    participant Node

    User->>Kube API Server: kubectl apply -f rl-job.yaml
    Kube API Server->>Kube Scheduler: New Pod to schedule
    Kube Scheduler->>RL Scheduler Extender: POST /filter (HTTPS/mTLS)
    RL Scheduler Extender-->>Kube Scheduler: Filtered Nodes
    Kube Scheduler->>RL Scheduler Extender: POST /prioritize (HTTPS/mTLS)
    RL Scheduler Extender-->>Kube Scheduler: Scored Nodes
    Kube Scheduler->>Kube API Server: Bind Pod to selected Node
    Kube API Server->>Node: Create Pod

1. 核心调度逻辑:Kotlin Multiplatform 公共模块

项目结构遵循 KMP 标准。核心逻辑位于 commonMain。我们的调度策略很简单:如果一个 Pod 带有 rl-job-group 标签,我们会优先将它调度到已经有相同标签 Pod 存在的节点上。

src/commonMain/kotlin/com/example/rlscheduler/SchedulerLogic.kt:

package com.example.rlscheduler

// K8s Pod 和 Node 的简化数据模型,使其与平台无关
// 在真实项目中,这些会更复杂,并使用 kotlinx.serialization
data class Pod(val name: String, val labels: Map<String, String>)
data class Node(val name: String, val podsOnNode: List<Pod>)
data class NodeScore(val nodeName: String, val score: Int)

const val RL_JOB_LABEL = "rl-job-group"
const val MAX_SCORE = 10
const val MIN_SCORE = 0

/**
 * RL调度算法的核心实现.
 * 这是一个纯函数,易于在任何平台测试.
 */
object SchedulerLogic {

    /**
     * 为一组节点打分,决定哪个节点最适合放置新的RL Pod.
     * @param podToSchedule 待调度的Pod.
     * @param nodes 集群中可用的节点列表.
     * @return 返回每个节点及其得分的列表.
     */
    fun prioritize(podToSchedule: Pod, nodes: List<Node>): List<NodeScore> {
        val jobGroup = podToSchedule.labels[RL_JOB_LABEL]
            ?: return nodes.map { NodeScore(it.name, MIN_SCORE) } // 如果不是RL Pod,给予最低分,让默认调度器决定

        // 这里的核心是检查每个节点上是否已存在同一RL作业组的Pod
        return nodes.map { node ->
            val hasSiblingPod = node.podsOnNode.any { existingPod ->
                existingPod.labels[RL_JOB_LABEL] == jobGroup
            }
            // 如果节点上已存在兄弟Pod,给予最高分,以实现亲和性
            val score = if (hasSiblingPod) MAX_SCORE else MIN_SCORE
            NodeScore(node.name, score)
        }
    }

    /**
     * 过滤掉不满足条件的节点.
     * 在这个简单实现中,我们不过滤任何节点,让所有节点都参与评分.
     * 在真实项目中,这里可能会检查GPU型号、内存等硬性要求.
     * @param nodes 集群中可用的节点列表.
     * @return 返回可以通过过滤的节点名称列表.
     */
    fun filter(nodes: List<Node>): List<String> {
        // 生产级的实现会在这里检查节点的硬性资源约束
        // 例如:检查节点是否有可用的GPU,或者特定的CPU架构
        // for (node in nodes) { if (!node.hasGpu) continue }
        return nodes.map { it.name }
    }
}

2. Extender 服务:基于 Ktor 的 mTLS 端点

jvmMain 包含了 Ktor 服务器的实现。这里的关键是正确配置 mTLS 和处理来自 kube-scheduler 的请求体。

build.gradle.kts 中需要加入 Ktor 相关依赖:

// build.gradle.kts (jvmMain source set dependencies)
dependencies {
    implementation("io.ktor:ktor-server-core:$ktorVersion")
    implementation("io.ktor:ktor-server-cio:$ktorVersion")
    implementation("io.ktor:ktor-server-content-negotiation:$ktorVersion")
    implementation("io.ktor:ktor-serialization-kotlinx-json:$ktorVersion")
    implementation("ch.qos.logback:logback-classic:1.4.11")
}

首先,定义与 Kubernetes API 兼容的数据结构。

src/jvmMain/kotlin/com/example/rlscheduler/K8sApiModels.kt:

package com.example.rlscheduler.k8s

import kotlinx.serialization.Serializable

// 这些模型精确匹配Kubernetes Extender Webhook的JSON结构
@Serializable
data class ExtenderArgs(
    val Pod: Pod,
    val Nodes: NodeList,
)

@Serializable
data class NodeList(
    val items: List<Node> = emptyList()
)

@Serializable
data class Pod(
    val metadata: Metadata
)

@Serializable
data class Node(
    val metadata: Metadata,
    val status: NodeStatus
)

@Serializable
data class Metadata(
    val name: String,
    val labels: Map<String, String> = emptyMap()
)

@Serializable
data class NodeStatus(
    // 此处可以添加更多节点状态信息,如容量、已分配资源等
    val capacity: Map<String, String> = emptyMap()
)

@Serializable
data class ExtenderFilterResult(
    val NodeNames: List<String>? = null,
    val FailedNodes: Map<String, String>? = null,
    val Error: String? = null
)

@Serializable
data class HostPriority(
    val host: String,
    val score: Int
)

接下来是 Ktor 服务器的实现,包括 mTLS 配置。

src/jvmMain/kotlin/com/example/rlscheduler/Server.kt:

package com.example.rlscheduler

import com.example.rlscheduler.k8s.*
import io.ktor.serialization.kotlinx.json.*
import io.ktor.server.application.*
import io.ktor.server.cio.*
import io.ktor.server.engine.*
import io.ktor.server.plugins.contentnegotiation.*
import io.ktor.server.request.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import java.io.File
import java.security.KeyStore
import org.slf4j.LoggerFactory

private val logger = LoggerFactory.getLogger("RLSchedulerExtender")

fun main() {
    // 从环境变量或挂载的Secret中获取证书路径和密码
    val keyStorePath = System.getenv("TLS_KEYSTORE_PATH") ?: "server.p12"
    val keyStorePassword = System.getenv("TLS_KEYSTORE_PASSWORD") ?: "changeit"
    val trustStorePath = System.getenv("TLS_TRUSTSTORE_PATH") ?: "ca.p12"
    val trustStorePassword = System.getenv("TLS_TRUSTSTORE_PASSWORD") ?: "changeit"
    
    val keyStoreFile = File(keyStorePath)
    if (!keyStoreFile.exists()) {
        logger.error("Keystore file not found at: ${keyStoreFile.absolutePath}")
        return
    }

    val keyStore = KeyStore.getInstance("PKCS12").apply {
        keyStoreFile.inputStream().use { load(it, keyStorePassword.toCharArray()) }
    }

    val trustStore = KeyStore.getInstance("PKCS12").apply {
        File(trustStorePath).inputStream().use { load(it, trustStorePassword.toCharArray()) }
    }
    
    val environment = applicationEngineEnvironment {
        log = logger
        
        // 配置HTTPS/mTLS连接器
        sslConnector(
            keyStore = keyStore,
            keyAlias = "server", // p12文件中的密钥别名
            keyStorePassword = { keyStorePassword.toCharArray() },
            privateKeyPassword = { keyStorePassword.toCharArray() }) {
            port = 8443
            host = "0.0.0.0"

            // 关键:要求客户端提供证书并使用我们的CA进行验证
            trustStore = trustStore
            clientAuth = io.ktor.network.tls.ClientAuth.REQUIRE
        }
        
        module {
            install(ContentNegotiation) {
                json()
            }
            
            routing {
                post("/filter") {
                    try {
                        val args = call.receive<ExtenderArgs>()
                        logger.info("Received filter request for pod: ${args.Pod.metadata.name}")
                        
                        // 调用KMP公共模块的逻辑
                        val availableNodeNames = SchedulerLogic.filter(
                            args.Nodes.items.map { node ->
                                // 此处需要一个转换层,将k8s模型转换为我们纯粹的领域模型
                                // 简单起见,这里直接传递节点名
                                Node(node.metadata.name, emptyList())
                            }
                        )
                        
                        call.respond(ExtenderFilterResult(NodeNames = availableNodeNames))
                    } catch (e: Exception) {
                        logger.error("Error during filter operation", e)
                        call.respond(ExtenderFilterResult(Error = e.message))
                    }
                }
                
                post("/prioritize") {
                    try {
                        val args = call.receive<ExtenderArgs>()
                        logger.info("Received prioritize request for pod: ${args.Pod.metadata.name}")

                        // 将K8s模型转换为我们共享的、纯净的领域模型
                        val podToSchedule = Pod(
                            name = args.Pod.metadata.name,
                            labels = args.Pod.metadata.labels
                        )
                        
                        // 在真实世界中,我们需要从API Server获取节点上所有Pod的信息
                        // 为了简化,这里假设`args.Nodes`已经包含了所需信息,但实际并非如此
                        // 生产级实现需要一个Kubernetes客户端来获取更完整的集群状态
                        val nodesWithPods = args.Nodes.items.map { node ->
                             Node(node.metadata.name, emptyList()) // 简化:假设节点上没有Pod
                        }
                        
                        val scores = SchedulerLogic.prioritize(podToSchedule, nodesWithPods)
                        val hostPriorities = scores.map { HostPriority(it.nodeName, it.score) }
                        
                        call.respond(hostPriorities)
                    } catch (e: Exception) {
                        logger.error("Error during prioritize operation", e)
                        call.respond(emptyList<HostPriority>())
                    }
                }
            }
        }
    }
    
    embeddedServer(CIO, environment).start(wait = true)
}

3. Kubernetes 配置与部署

首先,我们需要为 mTLS 生成证书。

证书生成 (使用 openssl)

# 1. 生成 CA 私钥和证书
openssl genrsa -out ca.key 4096
openssl req -new -x509 -days 3650 -key ca.key -out ca.crt -subj "/CN=rl-scheduler-ca"

# 2. 生成 Server 私钥和证书签名请求 (CSR)
openssl genrsa -out server.key 4096
# 这里的CN必须匹配Extender Service的DNS名称
openssl req -new -key server.key -out server.csr -subj "/CN=rl-scheduler-extender.default.svc"

# 3. 使用 CA 签署 Server 证书
openssl x509 -req -days 365 -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt

# 4. 为Ktor服务器创建PKCS12密钥库
openssl pkcs12 -export -out server.p12 -inkey server.key -in server.crt -password pass:changeit -name server

# 5. 为kube-scheduler客户端创建PKCS12信任库
openssl pkcs12 -export -out ca.p12 -in ca.crt -nokeys -password pass:changeit -name ca

接下来是将这些证书和应用部署到 EKS。

k8s/deployment.yaml

apiVersion: v1
kind: Secret
metadata:
  name: rl-scheduler-tls
type: Opaque
data:
  # Base64编码后的server.p12和ca.p12文件内容
  server.p12: LS0tLS1CR...
  ca.p12: LS0tLS1CR...
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: rl-scheduler-extender
  labels:
    app: rl-scheduler-extender
spec:
  replicas: 2
  selector:
    matchLabels:
      app: rl-scheduler-extender
  template:
    metadata:
      labels:
        app: rl-scheduler-extender
    spec:
      containers:
      - name: scheduler
        image: your-repo/rl-scheduler-extender:latest # 替换为你的Docker镜像
        ports:
        - containerPort: 8443
        env:
        - name: TLS_KEYSTORE_PATH
          value: /etc/tls/server.p12
        - name: TLS_TRUSTSTORE_PATH
          value: /etc/tls/ca.p12
        - name: TLS_KEYSTORE_PASSWORD
          value: "changeit" # 生产环境应使用Secret Key引用
        - name: TLS_TRUSTSTORE_PASSWORD
          value: "changeit"
        volumeMounts:
        - name: tls-volume
          mountPath: /etc/tls
          readOnly: true
      volumes:
      - name: tls-volume
        secret:
          secretName: rl-scheduler-tls
---
apiVersion: v1
kind: Service
metadata:
  name: rl-scheduler-extender
spec:
  selector:
    app: rl-scheduler-extender
  ports:
  - protocol: TCP
    port: 443
    targetPort: 8443

调度器策略配置 (scheduler-policy.json)

{
  "kind": "Policy",
  "apiVersion": "v1",
  "extenders": [
    {
      "urlPrefix": "https://rl-scheduler-extender.default.svc/api",
      "filterVerb": "filter",
      "prioritizeVerb": "prioritize",
      "weight": 1,
      "enableHTTPS": true,
      "tlsConfig": {
        "insecure": false,
        "caFile": "/etc/kubernetes/pki/extender/ca.crt",
        "certFile": "/etc/kubernetes/pki/extender/apiserver-client.crt",
        "keyFile": "/etc/kubernetes/pki/extender/apiserver-client.key"
      },
      "httpTimeout": 5000000000,
      "nodeCacheCapable": false
    }
  ]
}

要在 EKS 上应用此策略,需要修改位于控制平面节点上的 /etc/kubernetes/manifests/kube-scheduler.yaml 文件,添加策略文件路径和证书卷挂载。这是一个有风险的操作,在真实项目中通常会通过 EKS Managed Node Groups 的启动模板或自定义 AMI 来固化。

4. 使用 Vitest 测试 KMP 核心逻辑

KMP 的魅力在于 commonMain 模块可以被编译成 JavaScript。我们可以利用这一点,在 Node.js 环境中使用现代测试框架 Vitest 来测试我们的调度算法。

build.gradle.kts 中配置 JS 目标:

// build.gradle.kts
kotlin {
    js(IR) {
        browser()
        binaries.executable()
    }
    // ... jvm target
    sourceSets {
        // ...
        val jsMain by getting {
            dependencies {
                // No specific dependencies needed for pure logic
            }
        }
    }
}

运行 ./gradlew jsBrowserDistribution 会在 build/dist/js/productionExecutable 目录下生成 JS 文件。

js-tests/scheduler.test.ts:

import { describe, it, expect } from 'vitest';
// 导入从Kotlin编译过来的JS模块
// 路径和模块名取决于Kotlin编译器的输出
const SchedulerLogic = require('../build/dist/js/productionExecutable/my-project.js').com.example.rlscheduler.SchedulerLogic;

describe('SchedulerLogic.prioritize', () => {

  const podToSchedule = { name: 'rl-worker-3', labels: { 'rl-job-group': 'job-alpha' } };

  it('should give max score to a node with a sibling pod', () => {
    const nodes = [
      { 
        name: 'node-1', 
        podsOnNode: [{ name: 'other-pod', labels: {} }] 
      },
      { 
        name: 'node-2', 
        podsOnNode: [{ name: 'rl-worker-1', labels: { 'rl-job-group': 'job-alpha' } }] 
      },
    ];

    const scores = SchedulerLogic.prioritize(podToSchedule, nodes);
    
    expect(scores).toContainEqual({ nodeName: 'node-1', score: 0 });
    expect(scores).toContainEqual({ nodeName: 'node-2', score: 10 });
  });

  it('should give min score to all nodes if no sibling pods exist', () => {
    const nodes = [
      { name: 'node-1', podsOnNode: [] },
      { name: 'node-2', podsOnNode: [{ name: 'other-job-pod', labels: { 'rl-job-group': 'job-beta' } }] },
    ];

    const scores = SchedulerLogic.prioritize(podToSchedule, nodes);
    
    expect(scores).toContainEqual({ nodeName: 'node-1', score: 0 });
    expect(scores).toContainEqual({ nodeName: 'node-2', score: 0 });
  });

  it('should give min score if the pod to schedule is not an RL pod', () => {
    const nonRlPod = { name: 'nginx-pod', labels: {} };
    const nodes = [
      { name: 'node-1', podsOnNode: [{ name: 'rl-worker-1', labels: { 'rl-job-group': 'job-alpha' } }] },
    ];

    const scores = SchedulerLogic.prioritize(nonRlPod, nodes);
    expect(scores[0].score).toBe(0);
  });
});

这种测试方法验证了核心算法的正确性,并且完全独立于 JVM 和 Kubernetes 环境,执行速度极快,非常适合在 CI 流程中快速反馈。

局限性与未来展望

当前这个实现只是一个起点。一个生产级的 RL 调度器远比这复杂。

  1. 状态感知不足: 我们的 Extender 是无状态的。它不知道节点上真实的、实时的 Pod 分布。一个真正的实现需要一个 Kubernetes 客户端,在每次请求时或者通过 Informer 缓存来查询集群的完整状态,这会显著增加实现的复杂度。
  2. 调度算法过于简单: 实际的 RL 调度可能需要考虑 GPU 拓扑(例如 NVLink)、节点间的网络带宽、或者训练作业不同阶段的资源需求变化。算法本身甚至可以由一个小的强化学习模型来驱动,实现调度的元学习。
  3. Extender 的性能瓶颈: 尽管在同一 VPC 内延迟很低,但对于需要极高调度吞吐率的大规模集群,HTTP webhook 模式终将成为瓶颈。在那种规模下,回归到使用 Go 编写的原生调度器插件是更务实的选择。

尽管存在这些局限,该架构展示了一种灵活且强大的方式来扩展 Kubernetes,同时利用 Kotlin Multiplatform 的代码复用能力。它允许团队在不深入 Go 和 Kubernetes 源码的情况下,快速原型化并部署特定于工作负载的调度策略,同时通过 mTLS 确保了生产环境所要求的安全基线。


  目录