Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion apps/supervisor/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,24 @@ const Env = z.object({

KUBERNETES_MEMORY_OVERHEAD_GB: z.coerce.number().min(0).optional(), // Optional memory overhead to add to the limit in GB
KUBERNETES_SCHEDULER_NAME: z.string().optional(), // Custom scheduler name for pods
KUBERNETES_LARGE_MACHINE_POOL_LABEL: z.string().optional(), // if set, large-* presets affinity for machinepool=<value>
// Large machine affinity settings - large-* presets prefer a dedicated pool
KUBERNETES_LARGE_MACHINE_AFFINITY_ENABLED: BoolEnv.default(false),
KUBERNETES_LARGE_MACHINE_AFFINITY_POOL_LABEL_KEY: z.string().trim().min(1).default("node.cluster.x-k8s.io/machinepool"),
KUBERNETES_LARGE_MACHINE_AFFINITY_POOL_LABEL_VALUE: z.string().trim().min(1).default("large-machines"),
KUBERNETES_LARGE_MACHINE_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(100),

// Project affinity settings - pods from the same project prefer the same node
KUBERNETES_PROJECT_AFFINITY_ENABLED: BoolEnv.default(false),
KUBERNETES_PROJECT_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(50),
KUBERNETES_PROJECT_AFFINITY_TOPOLOGY_KEY: z.string().trim().min(1).default("kubernetes.io/hostname"),

// Schedule affinity settings - runs from schedule trees prefer a dedicated pool
KUBERNETES_SCHEDULE_AFFINITY_ENABLED: BoolEnv.default(false),
KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_KEY: z.string().trim().min(1).default("node.cluster.x-k8s.io/machinepool"),
KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_VALUE: z.string().trim().min(1).default("scheduled-runs"),
KUBERNETES_SCHEDULE_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(80),
KUBERNETES_SCHEDULE_ANTI_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(20),

// Placement tags settings
PLACEMENT_TAGS_ENABLED: BoolEnv.default(false),
PLACEMENT_TAGS_PREFIX: z.string().default("node.cluster.x-k8s.io"),
Expand Down
1 change: 1 addition & 0 deletions apps/supervisor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ class ManagedSupervisor {
snapshotId: message.snapshot.id,
snapshotFriendlyId: message.snapshot.friendlyId,
placementTags: message.placementTags,
annotations: message.run.annotations,
});

// Disabled for now
Expand Down
100 changes: 87 additions & 13 deletions apps/supervisor/src/workloadManager/kubernetes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ export class KubernetesWorkloadManager implements WorkloadManager {
},
spec: {
...this.addPlacementTags(this.#defaultPodSpec, opts.placementTags),
affinity: this.#getAffinity(opts.machine, opts.projectId),
affinity: this.#getAffinity(opts),
terminationGracePeriodSeconds: 60 * 60,
containers: [
{
Expand Down Expand Up @@ -335,13 +335,22 @@ export class KubernetesWorkloadManager implements WorkloadManager {
};
}

#isScheduledRun(opts: WorkloadManagerCreateOptions): boolean {
  // A run counts as "scheduled" when the root of its trigger tree was a schedule.
  const rootSource = opts.annotations?.rootTriggerSource;
  return rootSource === "schedule";
}

#getSharedLabels(opts: WorkloadManagerCreateOptions): Record<string, string> {
return {
env: opts.envId,
envtype: this.#envTypeToLabelValue(opts.envType),
org: opts.orgId,
project: opts.projectId,
machine: opts.machine.name,
// We intentionally use a boolean label rather than exposing the full trigger source
// (e.g. sdk, api, cli, mcp, schedule) to keep label cardinality low in metrics.
// The schedule vs non-schedule distinction is all we need for the current metrics
// and pool-level scheduling decisions; finer-grained source breakdowns live in run annotations.
scheduled: String(this.#isScheduledRun(opts)),
};
}

Expand Down Expand Up @@ -390,22 +399,43 @@ export class KubernetesWorkloadManager implements WorkloadManager {
return preset.name.startsWith("large-");
}

#getAffinity(opts: WorkloadManagerCreateOptions): k8s.V1Affinity | undefined {
  // Builds the pod's affinity by merging rules from three independent sources:
  // large-machine node affinity, schedule-pool node affinity, and project pod
  // affinity. Returns undefined (rather than an empty object) when nothing is
  // enabled so the pod spec stays clean.
  const largeNodeAffinity = this.#getNodeAffinityRules(opts.machine);
  const scheduleNodeAffinity = this.#getScheduleNodeAffinityRules(this.#isScheduledRun(opts));
  const podAffinity = this.#getProjectPodAffinity(opts.projectId);

  // Merge node affinity rules from multiple sources
  const preferred = [
    ...(largeNodeAffinity?.preferredDuringSchedulingIgnoredDuringExecution ?? []),
    ...(scheduleNodeAffinity?.preferredDuringSchedulingIgnoredDuringExecution ?? []),
  ];
  // Only large machine affinity produces hard requirements (non-large runs must
  // stay off the large pool). Schedule affinity is soft both ways.
  const required = [
    ...(largeNodeAffinity?.requiredDuringSchedulingIgnoredDuringExecution?.nodeSelectorTerms ?? []),
  ];

  const hasNodeAffinity = preferred.length > 0 || required.length > 0;

  if (!hasNodeAffinity && !podAffinity) {
    return undefined;
  }

  // Only include keys that actually carry rules; an empty preferred/required
  // array is valid to the API server but noisy in the pod spec.
  return {
    ...(hasNodeAffinity && {
      nodeAffinity: {
        ...(preferred.length > 0 && { preferredDuringSchedulingIgnoredDuringExecution: preferred }),
        ...(required.length > 0 && {
          requiredDuringSchedulingIgnoredDuringExecution: { nodeSelectorTerms: required },
        }),
      },
    }),
    ...(podAffinity && { podAffinity }),
  };
}

#getNodeAffinityRules(preset: MachinePreset): k8s.V1NodeAffinity | undefined {
if (!env.KUBERNETES_LARGE_MACHINE_POOL_LABEL) {
if (!env.KUBERNETES_LARGE_MACHINE_AFFINITY_ENABLED) {
return undefined;
}

Expand All @@ -414,13 +444,13 @@ export class KubernetesWorkloadManager implements WorkloadManager {
return {
preferredDuringSchedulingIgnoredDuringExecution: [
{
weight: 100,
weight: env.KUBERNETES_LARGE_MACHINE_AFFINITY_WEIGHT,
preference: {
matchExpressions: [
{
key: "node.cluster.x-k8s.io/machinepool",
key: env.KUBERNETES_LARGE_MACHINE_AFFINITY_POOL_LABEL_KEY,
operator: "In",
values: [env.KUBERNETES_LARGE_MACHINE_POOL_LABEL],
values: [env.KUBERNETES_LARGE_MACHINE_AFFINITY_POOL_LABEL_VALUE],
},
],
},
Expand All @@ -436,9 +466,9 @@ export class KubernetesWorkloadManager implements WorkloadManager {
{
matchExpressions: [
{
key: "node.cluster.x-k8s.io/machinepool",
key: env.KUBERNETES_LARGE_MACHINE_AFFINITY_POOL_LABEL_KEY,
operator: "NotIn",
values: [env.KUBERNETES_LARGE_MACHINE_POOL_LABEL],
values: [env.KUBERNETES_LARGE_MACHINE_AFFINITY_POOL_LABEL_VALUE],
},
],
},
Expand All @@ -447,6 +477,50 @@ export class KubernetesWorkloadManager implements WorkloadManager {
};
}

#getScheduleNodeAffinityRules(isScheduledRun: boolean): k8s.V1NodeAffinity | undefined {
  // Soft node (anti-)affinity for the schedule pool:
  // - scheduled runs prefer nodes labeled with the schedule pool value ("In")
  // - all other runs prefer to avoid those nodes ("NotIn")
  // Both directions are preferences only; nothing here produces a hard requirement.
  //
  // Note: the env schema gives KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_VALUE a
  // non-empty default (.min(1).default("scheduled-runs")), so only the ENABLED
  // flag needs checking — matching the sibling #getNodeAffinityRules.
  if (!env.KUBERNETES_SCHEDULE_AFFINITY_ENABLED) {
    return undefined;
  }

  const operator = isScheduledRun ? "In" : "NotIn";
  const weight = isScheduledRun
    ? env.KUBERNETES_SCHEDULE_AFFINITY_WEIGHT
    : env.KUBERNETES_SCHEDULE_ANTI_AFFINITY_WEIGHT;

  return {
    preferredDuringSchedulingIgnoredDuringExecution: [
      {
        weight,
        preference: {
          matchExpressions: [
            {
              key: env.KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_KEY,
              operator,
              values: [env.KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_VALUE],
            },
          ],
        },
      },
    ],
  };
}

#getProjectPodAffinity(projectId: string): k8s.V1PodAffinity | undefined {
if (!env.KUBERNETES_PROJECT_AFFINITY_ENABLED) {
return undefined;
Expand Down
3 changes: 2 additions & 1 deletion apps/supervisor/src/workloadManager/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { EnvironmentType, MachinePreset, PlacementTag } from "@trigger.dev/core/v3";
import type { EnvironmentType, MachinePreset, PlacementTag, RunAnnotations } from "@trigger.dev/core/v3";

export interface WorkloadManagerOptions {
workloadApiProtocol: "http" | "https";
Expand Down Expand Up @@ -35,4 +35,5 @@ export interface WorkloadManagerCreateOptions {
runFriendlyId: string;
snapshotId: string;
snapshotFriendlyId: string;
annotations?: RunAnnotations;
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { BillingCache } from "../billingCache.js";
import { startSpan } from "@internal/tracing";
import { assertExhaustive, tryCatch } from "@trigger.dev/core";
import { DequeuedMessage, RetryOptions } from "@trigger.dev/core/v3";
import { DequeuedMessage, RetryOptions, RunAnnotations } from "@trigger.dev/core/v3";
import { placementTag } from "@trigger.dev/core/v3/serverOnly";
import { getMaxDuration } from "@trigger.dev/core/v3/isomorphic";
import {
Expand Down Expand Up @@ -575,6 +575,7 @@ export class DequeueSystem {
// Keeping this for backwards compatibility, but really this should be called workerQueue
masterQueue: lockedTaskRun.workerQueue,
traceContext: lockedTaskRun.traceContext as Record<string, unknown>,
annotations: RunAnnotations.safeParse(lockedTaskRun.annotations).data,
},
environment: {
id: lockedTaskRun.runtimeEnvironment.id,
Expand Down
20 changes: 2 additions & 18 deletions packages/core/src/v3/schemas/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -540,24 +540,8 @@ export const DeploymentTriggeredVia = z

export type DeploymentTriggeredVia = z.infer<typeof DeploymentTriggeredVia>;

export const TriggerSource = z
.enum(["sdk", "api", "dashboard", "cli", "mcp", "schedule"])
.or(anyString);

export type TriggerSource = z.infer<typeof TriggerSource>;

export const TriggerAction = z.enum(["trigger", "replay", "test"]).or(anyString);

export type TriggerAction = z.infer<typeof TriggerAction>;

export const RunAnnotations = z.object({
triggerSource: TriggerSource,
triggerAction: TriggerAction,
rootTriggerSource: TriggerSource,
rootScheduleId: z.string().optional(),
});

export type RunAnnotations = z.infer<typeof RunAnnotations>;
// TriggerSource, TriggerAction, and RunAnnotations are defined in runEngine.ts
// They are re-exported through the schemas barrel (index.ts)

export const UpsertBranchRequestBody = z.object({
git: GitMeta.optional(),
Expand Down
22 changes: 22 additions & 0 deletions packages/core/src/v3/schemas/runEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,27 @@ import { Enum, MachinePreset, RuntimeEnvironmentType, TaskRunExecution } from ".
import { EnvironmentType } from "./schemas.js";
import type * as DB_TYPES from "@trigger.dev/database";

// Matches any string at runtime. The `string & {}` type keeps the enum's
// literal members from being absorbed into plain `string` by the checker,
// preserving autocomplete for the known values when unioned via .or() below.
const anyString = z.custom<string & {}>((v) => typeof v === "string");

// Open string enum: known sources get literal typing, but unrecognized strings
// still parse successfully (forward compatibility with newer producers).
export const TriggerSource = z
.enum(["sdk", "api", "dashboard", "cli", "mcp", "schedule"])
.or(anyString);

export type TriggerSource = z.infer<typeof TriggerSource>;

// Open string enum, same pattern as TriggerSource.
export const TriggerAction = z.enum(["trigger", "replay", "test"]).or(anyString);

export type TriggerAction = z.infer<typeof TriggerAction>;

// Annotations recorded on a run describing how it was triggered.
// rootTriggerSource appears to describe the root of the run's trigger tree
// (vs. the immediate triggerSource) — NOTE(review): confirm against the
// producer; rootScheduleId is optional, presumably only set for
// schedule-rooted trees.
export const RunAnnotations = z.object({
triggerSource: TriggerSource,
triggerAction: TriggerAction,
rootTriggerSource: TriggerSource,
rootScheduleId: z.string().optional(),
});

export type RunAnnotations = z.infer<typeof RunAnnotations>;

export const TaskRunExecutionStatus = {
RUN_CREATED: "RUN_CREATED",
QUEUED: "QUEUED",
Expand Down Expand Up @@ -259,6 +280,7 @@ export const DequeuedMessage = z.object({
attemptNumber: z.number(),
masterQueue: z.string(),
traceContext: z.record(z.unknown()),
annotations: RunAnnotations.optional(),
}),
environment: z.object({
id: z.string(),
Expand Down
Loading