From 72f14f8fd4d37fe90e6c92f3baccb7816fb21617 Mon Sep 17 00:00:00 2001 From: Saadi Myftija Date: Wed, 25 Mar 2026 16:57:37 +0100 Subject: [PATCH 1/4] feat(supervisor): schedule-tree node affinity --- apps/supervisor/src/env.ts | 7 ++ apps/supervisor/src/index.ts | 1 + .../src/workloadManager/kubernetes.ts | 88 +++++++++++++++++-- apps/supervisor/src/workloadManager/types.ts | 3 +- .../src/engine/systems/dequeueSystem.ts | 3 +- packages/core/src/v3/schemas/runEngine.ts | 2 + 6 files changed, 95 insertions(+), 9 deletions(-) diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index faf34bcd025..76ffb2b48f9 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -117,6 +117,13 @@ const Env = z.object({ KUBERNETES_PROJECT_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(50), KUBERNETES_PROJECT_AFFINITY_TOPOLOGY_KEY: z.string().trim().min(1).default("kubernetes.io/hostname"), + // Schedule affinity settings - runs from schedule trees prefer a dedicated pool + KUBERNETES_SCHEDULE_AFFINITY_ENABLED: BoolEnv.default(false), + KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_KEY: z.string().default("node.cluster.x-k8s.io/machinepool"), + KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_VALUE: z.string().default("scheduled-runs"), + KUBERNETES_SCHEDULE_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(80), + KUBERNETES_SCHEDULE_ANTI_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(20), + // Placement tags settings PLACEMENT_TAGS_ENABLED: BoolEnv.default(false), PLACEMENT_TAGS_PREFIX: z.string().default("node.cluster.x-k8s.io"), diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index 0e274b30390..bcd68318246 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -267,6 +267,7 @@ class ManagedSupervisor { snapshotId: message.snapshot.id, snapshotFriendlyId: message.snapshot.friendlyId, placementTags: message.placementTags, + annotations: message.run.annotations, }); // Disabled for now diff --git a/apps/supervisor/src/workloadManager/kubernetes.ts b/apps/supervisor/src/workloadManager/kubernetes.ts index 16c5eff9da1..76edfe49c32 100644 --- a/apps/supervisor/src/workloadManager/kubernetes.ts +++ b/apps/supervisor/src/workloadManager/kubernetes.ts @@ -120,7 +120,7 @@ export class KubernetesWorkloadManager implements WorkloadManager { }, spec: { ...this.addPlacementTags(this.#defaultPodSpec, opts.placementTags), - affinity: this.#getAffinity(opts.machine, opts.projectId), + affinity: this.#getAffinity(opts), terminationGracePeriodSeconds: 60 * 60, containers: [ { @@ -335,6 +335,10 @@ export class KubernetesWorkloadManager implements WorkloadManager { }; } + #isScheduledRun(opts: WorkloadManagerCreateOptions): boolean { + return opts.annotations?.rootTriggerSource === "schedule"; + } + #getSharedLabels(opts: WorkloadManagerCreateOptions): Record { return { env: opts.envId, @@ -342,6 +346,11 @@ export class KubernetesWorkloadManager implements WorkloadManager { org: opts.orgId, project: opts.projectId, machine: opts.machine.name, + // We intentionally use a boolean label rather than exposing the full trigger source + // (e.g. sdk, api, cli, mcp, schedule) to keep label cardinality low in metrics. + // The schedule vs non-schedule distinction is all we need for the current metrics + // and pool-level scheduling decisions; finer-grained source breakdowns live in run annotations. + scheduled: String(this.#isScheduledRun(opts)), }; } @@ -390,16 +399,37 @@ export class KubernetesWorkloadManager implements WorkloadManager { return preset.name.startsWith("large-"); } - #getAffinity(preset: MachinePreset, projectId: string): k8s.V1Affinity | undefined { - const nodeAffinity = this.#getNodeAffinityRules(preset); - const podAffinity = this.#getProjectPodAffinity(projectId); - - if (!nodeAffinity && !podAffinity) { + #getAffinity(opts: WorkloadManagerCreateOptions): k8s.V1Affinity | undefined { + const largeNodeAffinity = this.#getNodeAffinityRules(opts.machine); + const scheduleNodeAffinity = this.#getScheduleNodeAffinityRules(this.#isScheduledRun(opts)); + const podAffinity = this.#getProjectPodAffinity(opts.projectId); + + // Merge node affinity rules from multiple sources + const preferred = [ + ...(largeNodeAffinity?.preferredDuringSchedulingIgnoredDuringExecution ?? []), + ...(scheduleNodeAffinity?.preferredDuringSchedulingIgnoredDuringExecution ?? []), + ]; + // Only large machine affinity produces hard requirements (non-large runs must stay off the large pool). + // Schedule affinity is soft both ways. + const required = [ + ...(largeNodeAffinity?.requiredDuringSchedulingIgnoredDuringExecution?.nodeSelectorTerms ?? []), + ]; + + const hasNodeAffinity = preferred.length > 0 || required.length > 0; + + if (!hasNodeAffinity && !podAffinity) { return undefined; } return { - ...(nodeAffinity && { nodeAffinity }), + ...(hasNodeAffinity && { + nodeAffinity: { + ...(preferred.length > 0 && { preferredDuringSchedulingIgnoredDuringExecution: preferred }), + ...(required.length > 0 && { + requiredDuringSchedulingIgnoredDuringExecution: { nodeSelectorTerms: required }, + }), + }, + }), ...(podAffinity && { podAffinity }), }; } @@ -447,6 +477,50 @@ export class KubernetesWorkloadManager implements WorkloadManager { }; } + #getScheduleNodeAffinityRules(isScheduledRun: boolean): k8s.V1NodeAffinity | undefined { + if (!env.KUBERNETES_SCHEDULE_AFFINITY_ENABLED || !env.KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_VALUE) { + return undefined; + } + + if (isScheduledRun) { + // soft preference for the schedule pool + return { + preferredDuringSchedulingIgnoredDuringExecution: [ + { + weight: env.KUBERNETES_SCHEDULE_AFFINITY_WEIGHT, + preference: { + matchExpressions: [ + { + key: env.KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_KEY, + operator: "In", + values: [env.KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_VALUE], + }, + ], + }, + }, + ], + }; + } + + // soft anti-affinity: non-schedule runs prefer to avoid the schedule pool + return { + preferredDuringSchedulingIgnoredDuringExecution: [ + { + weight: env.KUBERNETES_SCHEDULE_ANTI_AFFINITY_WEIGHT, + preference: { + matchExpressions: [ + { + key: env.KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_KEY, + operator: "NotIn", + values: [env.KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_VALUE], + }, + ], + }, + }, + ], + }; + } + #getProjectPodAffinity(projectId: string): k8s.V1PodAffinity | undefined { if (!env.KUBERNETES_PROJECT_AFFINITY_ENABLED) { return undefined; diff --git a/apps/supervisor/src/workloadManager/types.ts b/apps/supervisor/src/workloadManager/types.ts index 90b61957795..fca27b249a2 100644 --- a/apps/supervisor/src/workloadManager/types.ts +++ b/apps/supervisor/src/workloadManager/types.ts @@ -1,4 +1,4 @@ -import type { EnvironmentType, MachinePreset, PlacementTag } from "@trigger.dev/core/v3"; +import type { EnvironmentType, MachinePreset, PlacementTag, RunAnnotations } from "@trigger.dev/core/v3"; export interface WorkloadManagerOptions { workloadApiProtocol: "http" | "https"; @@ -35,4 +35,5 @@ export interface WorkloadManagerCreateOptions { runFriendlyId: string; snapshotId: string; snapshotFriendlyId: string; + annotations?: RunAnnotations; } diff --git a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts index e8333bcce70..101d7bf243b 100644 --- a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts @@ -1,7 +1,7 @@ import type { BillingCache } from "../billingCache.js"; import { startSpan } from "@internal/tracing"; import { assertExhaustive, tryCatch } from "@trigger.dev/core"; -import { DequeuedMessage, RetryOptions } from "@trigger.dev/core/v3"; +import { DequeuedMessage, RetryOptions, RunAnnotations } from "@trigger.dev/core/v3"; import { placementTag } from "@trigger.dev/core/v3/serverOnly"; import { getMaxDuration } from "@trigger.dev/core/v3/isomorphic"; import { @@ -575,6 +575,7 @@ export class DequeueSystem { // Keeping this for backwards compatibility, but really this should be called workerQueue masterQueue: lockedTaskRun.workerQueue, traceContext: lockedTaskRun.traceContext as Record, + annotations: RunAnnotations.safeParse(lockedTaskRun.annotations).data, }, environment: { id: lockedTaskRun.runtimeEnvironment.id, diff --git a/packages/core/src/v3/schemas/runEngine.ts b/packages/core/src/v3/schemas/runEngine.ts index e4057e7ca66..a819ab9e589 100644 --- a/packages/core/src/v3/schemas/runEngine.ts +++ b/packages/core/src/v3/schemas/runEngine.ts @@ -1,6 +1,7 @@ import { z } from "zod"; import { Enum, MachinePreset, RuntimeEnvironmentType, TaskRunExecution } from "./common.js"; import { EnvironmentType } from "./schemas.js"; +import { RunAnnotations } from "./api.js"; import type * as DB_TYPES from "@trigger.dev/database"; export const TaskRunExecutionStatus = { @@ -259,6 +260,7 @@ export const DequeuedMessage = z.object({ attemptNumber: z.number(), masterQueue: z.string(), traceContext: z.record(z.unknown()), + annotations: RunAnnotations.optional(), }), environment: z.object({ id: z.string(), From b085a4110a031a1a6dc93a310537c53c0f6a7ac0 Mon Sep 17 00:00:00 2001 From: Saadi Myftija Date: Wed, 25 Mar 2026 17:01:02 +0100 Subject: [PATCH 2/4] Adjust the large machine affinity flag name for consistency --- apps/supervisor/src/env.ts | 5 ++++- apps/supervisor/src/workloadManager/kubernetes.ts | 10 +++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index 76ffb2b48f9..558131cd935 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -110,7 +110,10 @@ const Env = z.object({ KUBERNETES_MEMORY_OVERHEAD_GB: z.coerce.number().min(0).optional(), // Optional memory overhead to add to the limit in GB KUBERNETES_SCHEDULER_NAME: z.string().optional(), // Custom scheduler name for pods - KUBERNETES_LARGE_MACHINE_POOL_LABEL: z.string().optional(), // if set, large-* presets affinity for machinepool= + // Large machine affinity settings - large-* presets prefer a dedicated pool + KUBERNETES_LARGE_MACHINE_AFFINITY_ENABLED: BoolEnv.default(false), + KUBERNETES_LARGE_MACHINE_AFFINITY_POOL_LABEL_KEY: z.string().default("node.cluster.x-k8s.io/machinepool"), + KUBERNETES_LARGE_MACHINE_AFFINITY_POOL_LABEL_VALUE: z.string().default("large-machines"), // Project affinity settings - pods from the same project prefer the same node KUBERNETES_PROJECT_AFFINITY_ENABLED: BoolEnv.default(false), diff --git a/apps/supervisor/src/workloadManager/kubernetes.ts b/apps/supervisor/src/workloadManager/kubernetes.ts index 76edfe49c32..5672964bab6 100644 --- a/apps/supervisor/src/workloadManager/kubernetes.ts +++ b/apps/supervisor/src/workloadManager/kubernetes.ts @@ -435,7 +435,7 @@ export class KubernetesWorkloadManager implements WorkloadManager { } #getNodeAffinityRules(preset: MachinePreset): k8s.V1NodeAffinity | undefined { - if (!env.KUBERNETES_LARGE_MACHINE_POOL_LABEL) { + if (!env.KUBERNETES_LARGE_MACHINE_AFFINITY_ENABLED) { return undefined; } @@ -448,9 +448,9 @@ export class KubernetesWorkloadManager implements WorkloadManager { preference: { matchExpressions: [ { - key: "node.cluster.x-k8s.io/machinepool", + key: env.KUBERNETES_LARGE_MACHINE_AFFINITY_POOL_LABEL_KEY, operator: "In", - values: [env.KUBERNETES_LARGE_MACHINE_POOL_LABEL], + values: [env.KUBERNETES_LARGE_MACHINE_AFFINITY_POOL_LABEL_VALUE], }, ], }, @@ -466,9 +466,9 @@ export class KubernetesWorkloadManager implements WorkloadManager { { matchExpressions: [ { - key: "node.cluster.x-k8s.io/machinepool", + key: env.KUBERNETES_LARGE_MACHINE_AFFINITY_POOL_LABEL_KEY, operator: "NotIn", - values: [env.KUBERNETES_LARGE_MACHINE_POOL_LABEL], + values: [env.KUBERNETES_LARGE_MACHINE_AFFINITY_POOL_LABEL_VALUE], }, ], }, From ea78788f149b9a196aafcfbc10d0957a76f68e50 Mon Sep 17 00:00:00 2001 From: Saadi Myftija Date: Wed, 25 Mar 2026 17:42:01 +0100 Subject: [PATCH 3/4] Fix schema issue --- packages/core/src/v3/schemas/api.ts | 20 ++------------------ packages/core/src/v3/schemas/runEngine.ts | 22 +++++++++++++++++++++- 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/packages/core/src/v3/schemas/api.ts b/packages/core/src/v3/schemas/api.ts index 152005eb001..e29a0685eab 100644 --- a/packages/core/src/v3/schemas/api.ts +++ b/packages/core/src/v3/schemas/api.ts @@ -540,24 +540,8 @@ export const DeploymentTriggeredVia = z export type DeploymentTriggeredVia = z.infer; -export const TriggerSource = z - .enum(["sdk", "api", "dashboard", "cli", "mcp", "schedule"]) - .or(anyString); - -export type TriggerSource = z.infer; - -export const TriggerAction = z.enum(["trigger", "replay", "test"]).or(anyString); - -export type TriggerAction = z.infer; - -export const RunAnnotations = z.object({ - triggerSource: TriggerSource, - triggerAction: TriggerAction, - rootTriggerSource: TriggerSource, - rootScheduleId: z.string().optional(), -}); - -export type RunAnnotations = z.infer; +// TriggerSource, TriggerAction, and RunAnnotations are defined in runEngine.ts +// They are re-exported through the schemas barrel (index.ts) export const UpsertBranchRequestBody = z.object({ git: GitMeta.optional(), diff --git a/packages/core/src/v3/schemas/runEngine.ts b/packages/core/src/v3/schemas/runEngine.ts index a819ab9e589..5195be2d728 100644 --- a/packages/core/src/v3/schemas/runEngine.ts +++ b/packages/core/src/v3/schemas/runEngine.ts @@ -1,9 +1,29 @@ import { z } from "zod"; import { Enum, MachinePreset, RuntimeEnvironmentType, TaskRunExecution } from "./common.js"; import { EnvironmentType } from "./schemas.js"; -import { RunAnnotations } from "./api.js"; import type * as DB_TYPES from "@trigger.dev/database"; +const anyString = z.custom((v) => typeof v === "string"); + +export const TriggerSource = z + .enum(["sdk", "api", "dashboard", "cli", "mcp", "schedule"]) + .or(anyString); + +export type TriggerSource = z.infer; + +export const TriggerAction = z.enum(["trigger", "replay", "test"]).or(anyString); + +export type TriggerAction = z.infer; + +export const RunAnnotations = z.object({ + triggerSource: TriggerSource, + triggerAction: TriggerAction, + rootTriggerSource: TriggerSource, + rootScheduleId: z.string().optional(), +}); + +export type RunAnnotations = z.infer; + export const TaskRunExecutionStatus = { RUN_CREATED: "RUN_CREATED", QUEUED: "QUEUED", From 9edddfd695f729b7c32ac93043600160656ca5a0 Mon Sep 17 00:00:00 2001 From: Saadi Myftija Date: Wed, 25 Mar 2026 17:50:49 +0100 Subject: [PATCH 4/4] Add validation for a couple of env vars --- apps/supervisor/src/env.ts | 9 +++++---- apps/supervisor/src/workloadManager/kubernetes.ts | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index 558131cd935..9eb4aead840 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -112,8 +112,9 @@ const Env = z.object({ KUBERNETES_SCHEDULER_NAME: z.string().optional(), // Custom scheduler name for pods // Large machine affinity settings - large-* presets prefer a dedicated pool KUBERNETES_LARGE_MACHINE_AFFINITY_ENABLED: BoolEnv.default(false), - KUBERNETES_LARGE_MACHINE_AFFINITY_POOL_LABEL_KEY: z.string().default("node.cluster.x-k8s.io/machinepool"), - KUBERNETES_LARGE_MACHINE_AFFINITY_POOL_LABEL_VALUE: z.string().default("large-machines"), + KUBERNETES_LARGE_MACHINE_AFFINITY_POOL_LABEL_KEY: z.string().trim().min(1).default("node.cluster.x-k8s.io/machinepool"), + KUBERNETES_LARGE_MACHINE_AFFINITY_POOL_LABEL_VALUE: z.string().trim().min(1).default("large-machines"), + KUBERNETES_LARGE_MACHINE_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(100), // Project affinity settings - pods from the same project prefer the same node KUBERNETES_PROJECT_AFFINITY_ENABLED: BoolEnv.default(false), @@ -122,8 +123,8 @@ const Env = z.object({ // Schedule affinity settings - runs from schedule trees prefer a dedicated pool KUBERNETES_SCHEDULE_AFFINITY_ENABLED: BoolEnv.default(false), - KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_KEY: z.string().default("node.cluster.x-k8s.io/machinepool"), - KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_VALUE: z.string().default("scheduled-runs"), + KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_KEY: z.string().trim().min(1).default("node.cluster.x-k8s.io/machinepool"), + KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_VALUE: z.string().trim().min(1).default("scheduled-runs"), KUBERNETES_SCHEDULE_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(80), KUBERNETES_SCHEDULE_ANTI_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(20), diff --git a/apps/supervisor/src/workloadManager/kubernetes.ts b/apps/supervisor/src/workloadManager/kubernetes.ts index 5672964bab6..0aa5b170126 100644 --- a/apps/supervisor/src/workloadManager/kubernetes.ts +++ b/apps/supervisor/src/workloadManager/kubernetes.ts @@ -444,7 +444,7 @@ export class KubernetesWorkloadManager implements WorkloadManager { return { preferredDuringSchedulingIgnoredDuringExecution: [ { - weight: 100, + weight: env.KUBERNETES_LARGE_MACHINE_AFFINITY_WEIGHT, preference: { matchExpressions: [ {