diff --git a/.changeset/fix-local-build-load.md b/.changeset/fix-local-build-load.md new file mode 100644 index 00000000000..13f91da9d6a --- /dev/null +++ b/.changeset/fix-local-build-load.md @@ -0,0 +1,5 @@ +--- +"trigger.dev": patch +--- + +Fix `--load` flag being silently ignored on local/self-hosted builds. diff --git a/.claude/skills/span-timeline-events/SKILL.md b/.claude/skills/span-timeline-events/SKILL.md new file mode 100644 index 00000000000..122f49912d7 --- /dev/null +++ b/.claude/skills/span-timeline-events/SKILL.md @@ -0,0 +1,78 @@ +--- +name: span-timeline-events +description: Use when adding, modifying, or debugging OTel span timeline events in the trace view. Covers event structure, ClickHouse storage constraints, rendering in SpanTimeline component, admin visibility, and the step-by-step process for adding new events. +allowed-tools: Read, Write, Edit, Glob, Grep, Bash +--- + +# Span Timeline Events + +The trace view's right panel shows a timeline of events for the selected span. These are OTel span events rendered by `app/utils/timelineSpanEvents.ts` and the `SpanTimeline` component. + +## How They Work + +1. **Span events** in OTel are attached to a parent span. In ClickHouse, they're stored as separate rows with `kind: "SPAN_EVENT"` sharing the parent span's `span_id`. The `#mergeRecordsIntoSpanDetail` method reassembles them into the span's `events` array at query time. +2. The timeline only renders events whose `name` starts with `trigger.dev/` - all others are silently filtered out. +3. The **display name** comes from `properties.event` (not the span event name), mapped through `getFriendlyNameForEvent()`. +4. Events are shown on the **span they belong to** - events on one span don't appear in another span's timeline. + +## ClickHouse Storage Constraint + +When events are written to ClickHouse, `spanEventsToTaskEventV1Input()` filters out events whose `start_time` is not greater than the parent span's `startTime`. Events at or before the span start are silently dropped. This means span events must have timestamps strictly after the span's own `startTimeUnixNano`. + +## Timeline Rendering (SpanTimeline component) + +The `SpanTimeline` component in `app/components/run/RunTimeline.tsx` renders: + +1. **Events** (thin 1px line with hollow dots) - all events from `createTimelineSpanEventsFromSpanEvents()` +2. **"Started"** marker (thick cap) - at the span's `startTime` +3. **Duration bar** (thick 7px line) - from "Started" to "Finished" +4. **"Finished"** marker (thick cap) - at `startTime + duration` + +The thin line before "Started" only appears when there are events with timestamps between the span start and the first child span. For the Attempt span this works well (Dequeued -> Pod scheduled -> Launched -> etc. all happen before execution starts). Events all get `lineVariant: "light"` (thin) while the execution bar gets `variant: "normal"` (thick). + +## Trace View Sort Order + +Sibling spans (same parent) are sorted by `start_time ASC` from the ClickHouse query. The `createTreeFromFlatItems` function preserves this order. Event timestamps don't affect sort order - only the span's own `start_time`. + +## Event Structure + +```typescript +// OTel span event format +{ + name: "trigger.dev/run", // Must start with "trigger.dev/" to render + timeUnixNano: "1711200000000000000", + attributes: [ + { key: "event", value: { stringValue: "dequeue" } }, // The actual event type + { key: "duration", value: { intValue: 150 } }, // Optional: duration in ms + ] +} +``` + +## Admin-Only Events + +`getAdminOnlyForEvent()` controls visibility. Events default to **admin-only** (`true`). + +| Event | Admin-only | Friendly name | +|-------|-----------|---------------| +| `dequeue` | No | Dequeued | +| `fork` | No | Launched | +| `import` | No (if no fork event) | Importing task file | +| `create_attempt` | Yes | Attempt created | +| `lazy_payload` | Yes | Lazy attempt initialized | +| `pod_scheduled` | Yes | Pod scheduled | +| (default) | Yes | (raw event name) | + +## Adding New Timeline Events + +1. Add OTLP span event with `name: "trigger.dev/"` and `properties.event: ""` +2. Event timestamp must be strictly after the parent span's `startTimeUnixNano` (ClickHouse drops earlier events) +3. Add friendly name in `getFriendlyNameForEvent()` in `app/utils/timelineSpanEvents.ts` +4. Set admin visibility in `getAdminOnlyForEvent()` +5. Optionally add help text in `getHelpTextForEvent()` + +## Key Files + +- `app/utils/timelineSpanEvents.ts` - filtering, naming, admin logic +- `app/components/run/RunTimeline.tsx` - `SpanTimeline` component (thin line + thick bar rendering) +- `app/presenters/v3/SpanPresenter.server.ts` - loads span data including events +- `app/v3/eventRepository/clickhouseEventRepository.server.ts` - `spanEventsToTaskEventV1Input()` (storage filter), `#mergeRecordsIntoSpanDetail` (reassembly) diff --git a/apps/supervisor/package.json b/apps/supervisor/package.json index e9609bf1541..092d9dcf604 100644 --- a/apps/supervisor/package.json +++ b/apps/supervisor/package.json @@ -15,6 +15,7 @@ "dependencies": { "@aws-sdk/client-ecr": "^3.839.0", "@kubernetes/client-node": "^1.0.0", + "@internal/compute": "workspace:*", "@trigger.dev/core": "workspace:*", "dockerode": "^4.0.6", "prom-client": "^15.1.0", diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index 9eb4aead840..96359e5a9e3 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -3,153 +3,187 @@ import { env as stdEnv } from "std-env"; import { z } from "zod"; import { AdditionalEnvVars, BoolEnv } from "./envUtil.js"; -const Env = z.object({ - // This will come from `spec.nodeName` in k8s - TRIGGER_WORKER_INSTANCE_NAME: z.string().default(randomUUID()), - TRIGGER_WORKER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().default(30), - - // Required settings - TRIGGER_API_URL: z.string().url(), - TRIGGER_WORKER_TOKEN: z.string(), // accepts file:// path to read from a file - MANAGED_WORKER_SECRET: z.string(), - OTEL_EXPORTER_OTLP_ENDPOINT: z.string().url(), // set on the runners - - // Workload API settings (coordinator mode) - the workload API is what the run controller connects to - TRIGGER_WORKLOAD_API_ENABLED: BoolEnv.default(true), - TRIGGER_WORKLOAD_API_PROTOCOL: z - .string() - .transform((s) => z.enum(["http", "https"]).parse(s.toLowerCase())) - .default("http"), - TRIGGER_WORKLOAD_API_DOMAIN: z.string().optional(), // If unset, will use orchestrator-specific default - TRIGGER_WORKLOAD_API_HOST_INTERNAL: z.string().default("0.0.0.0"), - TRIGGER_WORKLOAD_API_PORT_INTERNAL: z.coerce.number().default(8020), // This is the port the workload API listens on - TRIGGER_WORKLOAD_API_PORT_EXTERNAL: z.coerce.number().default(8020), // This is the exposed port passed to the run controller - - // Runner settings - RUNNER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().optional(), - RUNNER_SNAPSHOT_POLL_INTERVAL_SECONDS: z.coerce.number().optional(), - RUNNER_ADDITIONAL_ENV_VARS: AdditionalEnvVars, // optional (csv) - RUNNER_PRETTY_LOGS: BoolEnv.default(false), - - // Dequeue settings (provider mode) - TRIGGER_DEQUEUE_ENABLED: BoolEnv.default(true), - TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(250), - TRIGGER_DEQUEUE_IDLE_INTERVAL_MS: z.coerce.number().int().default(1000), - TRIGGER_DEQUEUE_MAX_RUN_COUNT: z.coerce.number().int().default(1), - TRIGGER_DEQUEUE_MIN_CONSUMER_COUNT: z.coerce.number().int().default(1), - TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT: z.coerce.number().int().default(10), - TRIGGER_DEQUEUE_SCALING_STRATEGY: z.enum(["none", "smooth", "aggressive"]).default("none"), - TRIGGER_DEQUEUE_SCALING_UP_COOLDOWN_MS: z.coerce.number().int().default(5000), // 5 seconds - TRIGGER_DEQUEUE_SCALING_DOWN_COOLDOWN_MS: z.coerce.number().int().default(30000), // 30 seconds - TRIGGER_DEQUEUE_SCALING_TARGET_RATIO: z.coerce.number().default(1.0), // Target ratio of queue items to consumers (1.0 = 1 item per consumer) - TRIGGER_DEQUEUE_SCALING_EWMA_ALPHA: z.coerce.number().min(0).max(1).default(0.3), // Smooths queue length measurements (0=historical, 1=current) - TRIGGER_DEQUEUE_SCALING_BATCH_WINDOW_MS: z.coerce.number().int().positive().default(1000), // Batch window for metrics processing (ms) - TRIGGER_DEQUEUE_SCALING_DAMPING_FACTOR: z.coerce.number().min(0).max(1).default(0.7), // Smooths consumer count changes after EWMA (0=no scaling, 1=immediate) - - // Optional services - TRIGGER_WARM_START_URL: z.string().optional(), - TRIGGER_CHECKPOINT_URL: z.string().optional(), - TRIGGER_METADATA_URL: z.string().optional(), - - // Used by the resource monitor - RESOURCE_MONITOR_ENABLED: BoolEnv.default(false), - RESOURCE_MONITOR_OVERRIDE_CPU_TOTAL: z.coerce.number().optional(), - RESOURCE_MONITOR_OVERRIDE_MEMORY_TOTAL_GB: z.coerce.number().optional(), - - // Docker settings - DOCKER_API_VERSION: z.string().optional(), - DOCKER_PLATFORM: z.string().optional(), // e.g. linux/amd64, linux/arm64 - DOCKER_STRIP_IMAGE_DIGEST: BoolEnv.default(true), - DOCKER_REGISTRY_USERNAME: z.string().optional(), - DOCKER_REGISTRY_PASSWORD: z.string().optional(), - DOCKER_REGISTRY_URL: z.string().optional(), // e.g. https://index.docker.io/v1 - DOCKER_ENFORCE_MACHINE_PRESETS: BoolEnv.default(true), - DOCKER_AUTOREMOVE_EXITED_CONTAINERS: BoolEnv.default(true), - /** - * Network mode to use for all runners. Supported standard values are: `bridge`, `host`, `none`, and `container:`. - * Any other value is taken as a custom network's name to which all runners should connect to. - * - * Accepts a list of comma-separated values to attach to multiple networks. Additional networks are interpreted as network names and will be attached after container creation. - * - * **WARNING**: Specifying multiple networks will slightly increase startup times. - * - * @default "host" - */ - DOCKER_RUNNER_NETWORKS: z.string().default("host"), - - // Kubernetes settings - KUBERNETES_FORCE_ENABLED: BoolEnv.default(false), - KUBERNETES_NAMESPACE: z.string().default("default"), - KUBERNETES_WORKER_NODETYPE_LABEL: z.string().default("v4-worker"), - KUBERNETES_IMAGE_PULL_SECRETS: z.string().optional(), // csv - KUBERNETES_EPHEMERAL_STORAGE_SIZE_LIMIT: z.string().default("10Gi"), - KUBERNETES_EPHEMERAL_STORAGE_SIZE_REQUEST: z.string().default("2Gi"), - KUBERNETES_STRIP_IMAGE_DIGEST: BoolEnv.default(false), - KUBERNETES_CPU_REQUEST_MIN_CORES: z.coerce.number().min(0).default(0), - KUBERNETES_CPU_REQUEST_RATIO: z.coerce.number().min(0).max(1).default(0.75), // Ratio of CPU limit, so 0.75 = 75% of CPU limit - KUBERNETES_MEMORY_REQUEST_MIN_GB: z.coerce.number().min(0).default(0), - KUBERNETES_MEMORY_REQUEST_RATIO: z.coerce.number().min(0).max(1).default(1), // Ratio of memory limit, so 1 = 100% of memory limit - - // Per-preset overrides of the global KUBERNETES_CPU_REQUEST_RATIO - KUBERNETES_CPU_REQUEST_RATIO_MICRO: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_CPU_REQUEST_RATIO_SMALL_1X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_CPU_REQUEST_RATIO_SMALL_2X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_CPU_REQUEST_RATIO_MEDIUM_1X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_CPU_REQUEST_RATIO_MEDIUM_2X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_CPU_REQUEST_RATIO_LARGE_1X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_CPU_REQUEST_RATIO_LARGE_2X: z.coerce.number().min(0).max(1).optional(), - - // Per-preset overrides of the global KUBERNETES_MEMORY_REQUEST_RATIO - KUBERNETES_MEMORY_REQUEST_RATIO_MICRO: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_MEMORY_REQUEST_RATIO_SMALL_1X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_MEMORY_REQUEST_RATIO_SMALL_2X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_MEMORY_REQUEST_RATIO_MEDIUM_1X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_MEMORY_REQUEST_RATIO_MEDIUM_2X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_MEMORY_REQUEST_RATIO_LARGE_1X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_MEMORY_REQUEST_RATIO_LARGE_2X: z.coerce.number().min(0).max(1).optional(), - - KUBERNETES_MEMORY_OVERHEAD_GB: z.coerce.number().min(0).optional(), // Optional memory overhead to add to the limit in GB - KUBERNETES_SCHEDULER_NAME: z.string().optional(), // Custom scheduler name for pods - // Large machine affinity settings - large-* presets prefer a dedicated pool - KUBERNETES_LARGE_MACHINE_AFFINITY_ENABLED: BoolEnv.default(false), - KUBERNETES_LARGE_MACHINE_AFFINITY_POOL_LABEL_KEY: z.string().trim().min(1).default("node.cluster.x-k8s.io/machinepool"), - KUBERNETES_LARGE_MACHINE_AFFINITY_POOL_LABEL_VALUE: z.string().trim().min(1).default("large-machines"), - KUBERNETES_LARGE_MACHINE_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(100), - - // Project affinity settings - pods from the same project prefer the same node - KUBERNETES_PROJECT_AFFINITY_ENABLED: BoolEnv.default(false), - KUBERNETES_PROJECT_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(50), - KUBERNETES_PROJECT_AFFINITY_TOPOLOGY_KEY: z.string().trim().min(1).default("kubernetes.io/hostname"), - - // Schedule affinity settings - runs from schedule trees prefer a dedicated pool - KUBERNETES_SCHEDULE_AFFINITY_ENABLED: BoolEnv.default(false), - KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_KEY: z.string().trim().min(1).default("node.cluster.x-k8s.io/machinepool"), - KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_VALUE: z.string().trim().min(1).default("scheduled-runs"), - KUBERNETES_SCHEDULE_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(80), - KUBERNETES_SCHEDULE_ANTI_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(20), - - // Placement tags settings - PLACEMENT_TAGS_ENABLED: BoolEnv.default(false), - PLACEMENT_TAGS_PREFIX: z.string().default("node.cluster.x-k8s.io"), - - // Metrics - METRICS_ENABLED: BoolEnv.default(true), - METRICS_COLLECT_DEFAULTS: BoolEnv.default(true), - METRICS_HOST: z.string().default("127.0.0.1"), - METRICS_PORT: z.coerce.number().int().default(9090), - - // Pod cleaner - POD_CLEANER_ENABLED: BoolEnv.default(true), - POD_CLEANER_INTERVAL_MS: z.coerce.number().int().default(10000), - POD_CLEANER_BATCH_SIZE: z.coerce.number().int().default(500), - - // Failed pod handler - FAILED_POD_HANDLER_ENABLED: BoolEnv.default(true), - FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS: z.coerce.number().int().default(1000), - - // Debug - DEBUG: BoolEnv.default(false), - SEND_RUN_DEBUG_LOGS: BoolEnv.default(false), -}); +const Env = z + .object({ + // This will come from `spec.nodeName` in k8s + TRIGGER_WORKER_INSTANCE_NAME: z.string().default(randomUUID()), + TRIGGER_WORKER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().default(30), + + // Required settings + TRIGGER_API_URL: z.string().url(), + TRIGGER_WORKER_TOKEN: z.string(), // accepts file:// path to read from a file + MANAGED_WORKER_SECRET: z.string(), + OTEL_EXPORTER_OTLP_ENDPOINT: z.string().url(), // set on the runners + + // Workload API settings (coordinator mode) - the workload API is what the run controller connects to + TRIGGER_WORKLOAD_API_ENABLED: BoolEnv.default(true), + TRIGGER_WORKLOAD_API_PROTOCOL: z + .string() + .transform((s) => z.enum(["http", "https"]).parse(s.toLowerCase())) + .default("http"), + TRIGGER_WORKLOAD_API_DOMAIN: z.string().optional(), // If unset, will use orchestrator-specific default + TRIGGER_WORKLOAD_API_HOST_INTERNAL: z.string().default("0.0.0.0"), + TRIGGER_WORKLOAD_API_PORT_INTERNAL: z.coerce.number().default(8020), // This is the port the workload API listens on + TRIGGER_WORKLOAD_API_PORT_EXTERNAL: z.coerce.number().default(8020), // This is the exposed port passed to the run controller + + // Runner settings + RUNNER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().optional(), + RUNNER_SNAPSHOT_POLL_INTERVAL_SECONDS: z.coerce.number().optional(), + RUNNER_ADDITIONAL_ENV_VARS: AdditionalEnvVars, // optional (csv) + RUNNER_PRETTY_LOGS: BoolEnv.default(false), + + // Dequeue settings (provider mode) + TRIGGER_DEQUEUE_ENABLED: BoolEnv.default(true), + TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(250), + TRIGGER_DEQUEUE_IDLE_INTERVAL_MS: z.coerce.number().int().default(1000), + TRIGGER_DEQUEUE_MAX_RUN_COUNT: z.coerce.number().int().default(1), + TRIGGER_DEQUEUE_MIN_CONSUMER_COUNT: z.coerce.number().int().default(1), + TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT: z.coerce.number().int().default(10), + TRIGGER_DEQUEUE_SCALING_STRATEGY: z.enum(["none", "smooth", "aggressive"]).default("none"), + TRIGGER_DEQUEUE_SCALING_UP_COOLDOWN_MS: z.coerce.number().int().default(5000), // 5 seconds + TRIGGER_DEQUEUE_SCALING_DOWN_COOLDOWN_MS: z.coerce.number().int().default(30000), // 30 seconds + TRIGGER_DEQUEUE_SCALING_TARGET_RATIO: z.coerce.number().default(1.0), // Target ratio of queue items to consumers (1.0 = 1 item per consumer) + TRIGGER_DEQUEUE_SCALING_EWMA_ALPHA: z.coerce.number().min(0).max(1).default(0.3), // Smooths queue length measurements (0=historical, 1=current) + TRIGGER_DEQUEUE_SCALING_BATCH_WINDOW_MS: z.coerce.number().int().positive().default(1000), // Batch window for metrics processing (ms) + TRIGGER_DEQUEUE_SCALING_DAMPING_FACTOR: z.coerce.number().min(0).max(1).default(0.7), // Smooths consumer count changes after EWMA (0=no scaling, 1=immediate) + + // Optional services + TRIGGER_WARM_START_URL: z.string().optional(), + TRIGGER_CHECKPOINT_URL: z.string().optional(), + TRIGGER_METADATA_URL: z.string().optional(), + + // Used by the resource monitor + RESOURCE_MONITOR_ENABLED: BoolEnv.default(false), + RESOURCE_MONITOR_OVERRIDE_CPU_TOTAL: z.coerce.number().optional(), + RESOURCE_MONITOR_OVERRIDE_MEMORY_TOTAL_GB: z.coerce.number().optional(), + + // Docker settings + DOCKER_API_VERSION: z.string().optional(), + DOCKER_PLATFORM: z.string().optional(), // e.g. linux/amd64, linux/arm64 + DOCKER_STRIP_IMAGE_DIGEST: BoolEnv.default(true), + DOCKER_REGISTRY_USERNAME: z.string().optional(), + DOCKER_REGISTRY_PASSWORD: z.string().optional(), + DOCKER_REGISTRY_URL: z.string().optional(), // e.g. https://index.docker.io/v1 + DOCKER_ENFORCE_MACHINE_PRESETS: BoolEnv.default(true), + DOCKER_AUTOREMOVE_EXITED_CONTAINERS: BoolEnv.default(true), + /** + * Network mode to use for all runners. Supported standard values are: `bridge`, `host`, `none`, and `container:`. + * Any other value is taken as a custom network's name to which all runners should connect to. + * + * Accepts a list of comma-separated values to attach to multiple networks. Additional networks are interpreted as network names and will be attached after container creation. + * + * **WARNING**: Specifying multiple networks will slightly increase startup times. + * + * @default "host" + */ + DOCKER_RUNNER_NETWORKS: z.string().default("host"), + + // Compute settings + COMPUTE_GATEWAY_URL: z.string().url().optional(), + COMPUTE_GATEWAY_AUTH_TOKEN: z.string().optional(), + COMPUTE_GATEWAY_TIMEOUT_MS: z.coerce.number().int().default(30_000), + COMPUTE_SNAPSHOTS_ENABLED: BoolEnv.default(false), + COMPUTE_TRACE_SPANS_ENABLED: BoolEnv.default(true), + COMPUTE_TRACE_OTLP_ENDPOINT: z.string().url().optional(), // Override for span export (derived from TRIGGER_API_URL if unset) + COMPUTE_SNAPSHOT_DELAY_MS: z.coerce.number().int().min(0).max(60_000).default(5_000), + + // Kubernetes settings + KUBERNETES_FORCE_ENABLED: BoolEnv.default(false), + KUBERNETES_NAMESPACE: z.string().default("default"), + KUBERNETES_WORKER_NODETYPE_LABEL: z.string().default("v4-worker"), + KUBERNETES_IMAGE_PULL_SECRETS: z.string().optional(), // csv + KUBERNETES_EPHEMERAL_STORAGE_SIZE_LIMIT: z.string().default("10Gi"), + KUBERNETES_EPHEMERAL_STORAGE_SIZE_REQUEST: z.string().default("2Gi"), + KUBERNETES_STRIP_IMAGE_DIGEST: BoolEnv.default(false), + KUBERNETES_CPU_REQUEST_MIN_CORES: z.coerce.number().min(0).default(0), + KUBERNETES_CPU_REQUEST_RATIO: z.coerce.number().min(0).max(1).default(0.75), // Ratio of CPU limit, so 0.75 = 75% of CPU limit + KUBERNETES_MEMORY_REQUEST_MIN_GB: z.coerce.number().min(0).default(0), + KUBERNETES_MEMORY_REQUEST_RATIO: z.coerce.number().min(0).max(1).default(1), // Ratio of memory limit, so 1 = 100% of memory limit + + // Per-preset overrides of the global KUBERNETES_CPU_REQUEST_RATIO + KUBERNETES_CPU_REQUEST_RATIO_MICRO: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_CPU_REQUEST_RATIO_SMALL_1X: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_CPU_REQUEST_RATIO_SMALL_2X: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_CPU_REQUEST_RATIO_MEDIUM_1X: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_CPU_REQUEST_RATIO_MEDIUM_2X: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_CPU_REQUEST_RATIO_LARGE_1X: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_CPU_REQUEST_RATIO_LARGE_2X: z.coerce.number().min(0).max(1).optional(), + + // Per-preset overrides of the global KUBERNETES_MEMORY_REQUEST_RATIO + KUBERNETES_MEMORY_REQUEST_RATIO_MICRO: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_MEMORY_REQUEST_RATIO_SMALL_1X: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_MEMORY_REQUEST_RATIO_SMALL_2X: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_MEMORY_REQUEST_RATIO_MEDIUM_1X: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_MEMORY_REQUEST_RATIO_MEDIUM_2X: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_MEMORY_REQUEST_RATIO_LARGE_1X: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_MEMORY_REQUEST_RATIO_LARGE_2X: z.coerce.number().min(0).max(1).optional(), + + KUBERNETES_MEMORY_OVERHEAD_GB: z.coerce.number().min(0).optional(), // Optional memory overhead to add to the limit in GB + KUBERNETES_SCHEDULER_NAME: z.string().optional(), // Custom scheduler name for pods + // Large machine affinity settings - large-* presets prefer a dedicated pool + KUBERNETES_LARGE_MACHINE_AFFINITY_ENABLED: BoolEnv.default(false), + KUBERNETES_LARGE_MACHINE_AFFINITY_POOL_LABEL_KEY: z.string().trim().min(1).default("node.cluster.x-k8s.io/machinepool"), + KUBERNETES_LARGE_MACHINE_AFFINITY_POOL_LABEL_VALUE: z.string().trim().min(1).default("large-machines"), + KUBERNETES_LARGE_MACHINE_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(100), + + // Project affinity settings - pods from the same project prefer the same node + KUBERNETES_PROJECT_AFFINITY_ENABLED: BoolEnv.default(false), + KUBERNETES_PROJECT_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(50), + KUBERNETES_PROJECT_AFFINITY_TOPOLOGY_KEY: z + .string() + .trim() + .min(1) + .default("kubernetes.io/hostname"), + + // Schedule affinity settings - runs from schedule trees prefer a dedicated pool + KUBERNETES_SCHEDULE_AFFINITY_ENABLED: BoolEnv.default(false), + KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_KEY: z.string().trim().min(1).default("node.cluster.x-k8s.io/machinepool"), + KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_VALUE: z.string().trim().min(1).default("scheduled-runs"), + KUBERNETES_SCHEDULE_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(80), + KUBERNETES_SCHEDULE_ANTI_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(20), + + // Placement tags settings + PLACEMENT_TAGS_ENABLED: BoolEnv.default(false), + PLACEMENT_TAGS_PREFIX: z.string().default("node.cluster.x-k8s.io"), + + // Metrics + METRICS_ENABLED: BoolEnv.default(true), + METRICS_COLLECT_DEFAULTS: BoolEnv.default(true), + METRICS_HOST: z.string().default("127.0.0.1"), + METRICS_PORT: z.coerce.number().int().default(9090), + + // Pod cleaner + POD_CLEANER_ENABLED: BoolEnv.default(true), + POD_CLEANER_INTERVAL_MS: z.coerce.number().int().default(10000), + POD_CLEANER_BATCH_SIZE: z.coerce.number().int().default(500), + + // Failed pod handler + FAILED_POD_HANDLER_ENABLED: BoolEnv.default(true), + FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS: z.coerce.number().int().default(1000), + + // Debug + DEBUG: BoolEnv.default(false), + SEND_RUN_DEBUG_LOGS: BoolEnv.default(false), + }) + .superRefine((data, ctx) => { + if (data.COMPUTE_SNAPSHOTS_ENABLED && !data.TRIGGER_METADATA_URL) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: "TRIGGER_METADATA_URL is required when COMPUTE_SNAPSHOTS_ENABLED is true", + path: ["TRIGGER_METADATA_URL"], + }); + } + if (data.COMPUTE_SNAPSHOTS_ENABLED && !data.TRIGGER_WORKLOAD_API_DOMAIN) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: "TRIGGER_WORKLOAD_API_DOMAIN is required when COMPUTE_SNAPSHOTS_ENABLED is true", + path: ["TRIGGER_WORKLOAD_API_DOMAIN"], + }); + } + }) + .transform((data) => ({ + ...data, + COMPUTE_TRACE_OTLP_ENDPOINT: data.COMPUTE_TRACE_OTLP_ENDPOINT ?? `${data.TRIGGER_API_URL}/otel`, + })); export const env = Env.parse(stdEnv); diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index bcd68318246..f148d181c57 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -14,6 +14,7 @@ import { } from "./resourceMonitor.js"; import { KubernetesWorkloadManager } from "./workloadManager/kubernetes.js"; import { DockerWorkloadManager } from "./workloadManager/docker.js"; +import { ComputeWorkloadManager } from "./workloadManager/compute.js"; import { HttpServer, CheckpointClient, @@ -35,9 +36,11 @@ class ManagedSupervisor { private readonly metricsServer?: HttpServer; private readonly workloadServer: WorkloadServer; private readonly workloadManager: WorkloadManager; + private readonly computeManager?: ComputeWorkloadManager; private readonly logger = new SimpleStructuredLogger("managed-supervisor"); private readonly resourceMonitor: ResourceMonitor; private readonly checkpointClient?: CheckpointClient; + private readonly isComputeMode: boolean; private readonly podCleaner?: PodCleaner; private readonly failedPodHandler?: FailedPodHandler; @@ -77,9 +80,22 @@ class ManagedSupervisor { : new DockerResourceMonitor(new Docker()) : new NoopResourceMonitor(); - this.workloadManager = this.isKubernetes - ? new KubernetesWorkloadManager(workloadManagerOptions) - : new DockerWorkloadManager(workloadManagerOptions); + this.isComputeMode = !!env.COMPUTE_GATEWAY_URL; + + if (env.COMPUTE_GATEWAY_URL) { + const computeManager = new ComputeWorkloadManager({ + ...workloadManagerOptions, + gatewayUrl: env.COMPUTE_GATEWAY_URL, + gatewayAuthToken: env.COMPUTE_GATEWAY_AUTH_TOKEN, + gatewayTimeoutMs: env.COMPUTE_GATEWAY_TIMEOUT_MS, + }); + this.computeManager = computeManager; + this.workloadManager = computeManager; + } else { + this.workloadManager = this.isKubernetes + ? new KubernetesWorkloadManager(workloadManagerOptions) + : new DockerWorkloadManager(workloadManagerOptions); + } if (this.isKubernetes) { if (env.POD_CLEANER_ENABLED) { @@ -182,13 +198,13 @@ class ManagedSupervisor { } this.workerSession.on("runNotification", async ({ time, run }) => { - this.logger.log("runNotification", { time, run }); + this.logger.verbose("runNotification", { time, run }); this.workloadServer.notifyRun({ run }); }); - this.workerSession.on("runQueueMessage", async ({ time, message }) => { - this.logger.log(`Received message with timestamp ${time.toLocaleString()}`, message); + this.workerSession.on("runQueueMessage", async ({ time, message, dequeueResponseMs, pollingIntervalMs }) => { + this.logger.verbose(`Received message with timestamp ${time.toLocaleString()}`, message); if (message.completedWaitpoints.length > 0) { this.logger.debug("Run has completed waitpoints", { @@ -205,7 +221,39 @@ class ManagedSupervisor { const { checkpoint, ...rest } = message; if (checkpoint) { - this.logger.log("Restoring run", { runId: message.run.id }); + this.logger.debug("Restoring run", { runId: message.run.id }); + + if (this.isComputeMode && this.computeManager && env.COMPUTE_SNAPSHOTS_ENABLED) { + try { + // Derive runnerId unique per restore cycle (matches iceman's pattern) + const runIdShort = message.run.friendlyId.replace("run_", ""); + const checkpointSuffix = checkpoint.id.slice(-8); + const runnerId = `runner-${runIdShort}-${checkpointSuffix}`; + + const didRestore = await this.computeManager.restore({ + snapshotId: checkpoint.location, + runnerId, + runFriendlyId: message.run.friendlyId, + snapshotFriendlyId: message.snapshot.friendlyId, + machine: message.run.machine, + traceContext: message.run.traceContext, + envId: message.environment.id, + orgId: message.organization.id, + projectId: message.project.id, + dequeuedAt: message.dequeuedAt, + }); + + if (didRestore) { + this.logger.debug("Compute restore successful", { runId: message.run.id, runnerId }); + } else { + this.logger.error("Compute restore failed", { runId: message.run.id, runnerId }); + } + } catch (error) { + this.logger.error("Failed to restore run (compute)", { error }); + } + + return; + } if (!this.checkpointClient) { this.logger.error("No checkpoint client", { runId: message.run.id }); @@ -223,7 +271,7 @@ class ManagedSupervisor { }); if (didRestore) { - this.logger.log("Restore successful", { runId: message.run.id }); + this.logger.debug("Restore successful", { runId: message.run.id }); } else { this.logger.error("Restore failed", { runId: message.run.id }); } @@ -234,15 +282,35 @@ class ManagedSupervisor { return; } - this.logger.log("Scheduling run", { runId: message.run.id }); + this.logger.debug("Scheduling run", { runId: message.run.id }); + const warmStartStart = performance.now(); const didWarmStart = await this.tryWarmStart(message); + const warmStartCheckMs = Math.round(performance.now() - warmStartStart); if (didWarmStart) { - this.logger.log("Warm start successful", { runId: message.run.id }); + this.logger.debug("Warm start successful", { runId: message.run.id }); return; } + if (this.isComputeMode && env.COMPUTE_TRACE_SPANS_ENABLED) { + const traceparent = + message.run.traceContext && + "traceparent" in message.run.traceContext && + typeof message.run.traceContext.traceparent === "string" + ? message.run.traceContext.traceparent + : undefined; + + if (traceparent) { + this.workloadServer.registerRunTraceContext(message.run.friendlyId, { + traceparent, + envId: message.environment.id, + orgId: message.organization.id, + projectId: message.project.id, + }); + } + } + try { if (!message.deployment.friendlyId) { // mostly a type guard, deployments always exists for deployed environments @@ -252,6 +320,9 @@ class ManagedSupervisor { await this.workloadManager.create({ dequeuedAt: message.dequeuedAt, + dequeueResponseMs, + pollingIntervalMs, + warmStartCheckMs, envId: message.environment.id, envType: message.environment.type, image: message.image, @@ -267,6 +338,7 @@ class ManagedSupervisor { snapshotId: message.snapshot.id, snapshotFriendlyId: message.snapshot.friendlyId, placementTags: message.placementTags, + traceContext: message.run.traceContext, annotations: message.run.annotations, }); @@ -297,6 +369,7 @@ class ManagedSupervisor { host: env.TRIGGER_WORKLOAD_API_HOST_INTERNAL, workerClient: this.workerSession.httpClient, checkpointClient: this.checkpointClient, + computeManager: this.computeManager, }); this.workloadServer.on("runConnected", this.onRunConnected.bind(this)); @@ -381,6 +454,7 @@ class ManagedSupervisor { async stop() { this.logger.log("Shutting down"); + await this.workloadServer.stop(); await this.workerSession.stop(); // Optional services diff --git a/apps/supervisor/src/otlpPayload.ts b/apps/supervisor/src/otlpPayload.ts new file mode 100644 index 00000000000..3e5b48b530f --- /dev/null +++ b/apps/supervisor/src/otlpPayload.ts @@ -0,0 +1,63 @@ +import { randomBytes } from "crypto"; + +export interface OtlpTraceOptions { + traceId: string; + parentSpanId?: string; + spanName: string; + startTimeMs: number; + endTimeMs: number; + resourceAttributes: Record; + spanAttributes: Record; +} + +/** Build an OTLP JSON ExportTraceServiceRequest payload */ +export function buildOtlpTracePayload(opts: OtlpTraceOptions) { + const spanId = randomBytes(8).toString("hex"); + + return { + resourceSpans: [ + { + resource: { + attributes: [ + { key: "$trigger", value: { boolValue: true } }, + ...toOtlpAttributes(opts.resourceAttributes), + ], + }, + scopeSpans: [ + { + scope: { name: "supervisor.compute" }, + spans: [ + { + traceId: opts.traceId, + spanId, + parentSpanId: opts.parentSpanId, + name: opts.spanName, + kind: 3, // SPAN_KIND_CLIENT + startTimeUnixNano: String(opts.startTimeMs * 1_000_000), + endTimeUnixNano: String(opts.endTimeMs * 1_000_000), + attributes: toOtlpAttributes(opts.spanAttributes), + status: { code: 1 }, // STATUS_CODE_OK + }, + ], + }, + ], + }, + ], + }; +} + +function toOtlpAttributes( + attrs: Record +): Array<{ key: string; value: Record }> { + return Object.entries(attrs).map(([key, value]) => ({ + key, + value: toOtlpValue(value), + })); +} + +function toOtlpValue(value: string | number | boolean): Record { + if (typeof value === "string") return { stringValue: value }; + if (typeof value === "boolean") return { boolValue: value }; + if (Number.isInteger(value)) return { intValue: value }; + return { doubleValue: value }; +} diff --git a/apps/supervisor/src/otlpTrace.test.ts b/apps/supervisor/src/otlpTrace.test.ts new file mode 100644 index 00000000000..506a4d497d0 --- /dev/null +++ b/apps/supervisor/src/otlpTrace.test.ts @@ -0,0 +1,114 @@ +import { describe, it, expect } from "vitest"; +import { buildOtlpTracePayload } from "./otlpPayload.js"; + +describe("buildOtlpTracePayload", () => { + it("builds valid OTLP JSON with timing attributes", () => { + const payload = buildOtlpTracePayload({ + traceId: "abcd1234abcd1234abcd1234abcd1234", + parentSpanId: "1234567890abcdef", + spanName: "compute.provision", + startTimeMs: 1000, + endTimeMs: 1250, + resourceAttributes: { + "ctx.environment.id": "env_123", + "ctx.organization.id": "org_456", + "ctx.project.id": "proj_789", + "ctx.run.id": "run_abc", + }, + spanAttributes: { + "compute.total_ms": 250, + "compute.gateway.schedule_ms": 1, + "compute.cache.image_cached": true, + }, + }); + + expect(payload.resourceSpans).toHaveLength(1); + + const resourceSpan = payload.resourceSpans[0]!; + + // $trigger=true so the webapp accepts it + const triggerAttr = resourceSpan.resource.attributes.find((a) => a.key === "$trigger"); + expect(triggerAttr).toEqual({ key: "$trigger", value: { boolValue: true } }); + + // Resource attributes + const envAttr = resourceSpan.resource.attributes.find( + (a) => a.key === "ctx.environment.id" + ); + expect(envAttr).toEqual({ + key: "ctx.environment.id", + value: { stringValue: "env_123" }, + }); + + // Span basics + const span = resourceSpan.scopeSpans[0]!.spans[0]!; + expect(span.name).toBe("compute.provision"); + expect(span.traceId).toBe("abcd1234abcd1234abcd1234abcd1234"); + expect(span.parentSpanId).toBe("1234567890abcdef"); + + // Integer attribute + const totalMs = span.attributes.find((a) => a.key === "compute.total_ms"); + expect(totalMs).toEqual({ key: "compute.total_ms", value: { intValue: 250 } }); + + // Boolean attribute + const cached = span.attributes.find((a) => a.key === "compute.cache.image_cached"); + expect(cached).toEqual({ key: "compute.cache.image_cached", value: { boolValue: true } }); + }); + + it("generates a valid 16-char hex span ID", () => { + const payload = buildOtlpTracePayload({ + traceId: "abcd1234abcd1234abcd1234abcd1234", + spanName: "test", + startTimeMs: 1000, + endTimeMs: 1001, + resourceAttributes: {}, + spanAttributes: {}, + }); + + const span = payload.resourceSpans[0]!.scopeSpans[0]!.spans[0]!; + expect(span.spanId).toMatch(/^[0-9a-f]{16}$/); + }); + + it("converts timestamps to nanoseconds", () => { + const payload = buildOtlpTracePayload({ + traceId: "abcd1234abcd1234abcd1234abcd1234", + spanName: "test", + startTimeMs: 1000, + endTimeMs: 1250, + resourceAttributes: {}, + spanAttributes: {}, + }); + + const span = payload.resourceSpans[0]!.scopeSpans[0]!.spans[0]!; + expect(span.startTimeUnixNano).toBe("1000000000"); + expect(span.endTimeUnixNano).toBe("1250000000"); + }); + + it("omits parentSpanId when not provided", () => { + const payload = buildOtlpTracePayload({ + traceId: "abcd1234abcd1234abcd1234abcd1234", + spanName: "test", + startTimeMs: 1000, + endTimeMs: 1001, + resourceAttributes: {}, + spanAttributes: {}, + }); + + const span = payload.resourceSpans[0]!.scopeSpans[0]!.spans[0]!; + expect(span.parentSpanId).toBeUndefined(); + }); + + it("handles double values for non-integer numbers", () => { + const payload = buildOtlpTracePayload({ + traceId: "abcd1234abcd1234abcd1234abcd1234", + spanName: "test", + startTimeMs: 1000, + endTimeMs: 1001, + resourceAttributes: {}, + spanAttributes: { "compute.cpu": 0.25 }, + }); + + const span = payload.resourceSpans[0]!.scopeSpans[0]!.spans[0]!; + const cpu = span.attributes.find((a) => a.key === "compute.cpu"); + expect(cpu).toEqual({ key: "compute.cpu", value: { doubleValue: 0.25 } }); + }); +}); diff --git a/apps/supervisor/src/otlpTrace.ts b/apps/supervisor/src/otlpTrace.ts new file mode 100644 index 00000000000..9cef2cb0d1f --- /dev/null +++ b/apps/supervisor/src/otlpTrace.ts @@ -0,0 +1,19 @@ +import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger"; +import { env } from "./env.js"; +import type { buildOtlpTracePayload } from "./otlpPayload.js"; + +const logger = new SimpleStructuredLogger("otlp-trace"); + +/** Fire-and-forget: send an OTLP trace payload to the configured endpoint */ +export function sendOtlpTrace(payload: ReturnType) { + fetch(`${env.COMPUTE_TRACE_OTLP_ENDPOINT}/v1/traces`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(payload), + signal: AbortSignal.timeout(5_000), + }).catch((err) => { + logger.warn("failed to send compute trace span", { + error: err instanceof Error ? err.message : String(err), + }); + }); +} diff --git a/apps/supervisor/src/services/failedPodHandler.ts b/apps/supervisor/src/services/failedPodHandler.ts index 07217243769..3d56c92b213 100644 --- a/apps/supervisor/src/services/failedPodHandler.ts +++ b/apps/supervisor/src/services/failedPodHandler.ts @@ -151,7 +151,7 @@ export class FailedPodHandler { } private async onPodCompleted(pod: V1Pod) { - this.logger.info("pod-completed", this.podSummary(pod)); + this.logger.debug("pod-completed", this.podSummary(pod)); this.informerEventsTotal.inc({ namespace: this.namespace, verb: "add" }); if (!pod.metadata?.name) { @@ -165,7 +165,7 @@ export class FailedPodHandler { } if (pod.metadata?.deletionTimestamp) { - this.logger.info("pod-completed: pod is being deleted", this.podSummary(pod)); + this.logger.verbose("pod-completed: pod is being deleted", this.podSummary(pod)); return; } @@ -188,7 +188,7 @@ export class FailedPodHandler { } private async onPodSucceeded(pod: V1Pod) { - this.logger.info("pod-succeeded", this.podSummary(pod)); + this.logger.debug("pod-succeeded", this.podSummary(pod)); this.processedPodsTotal.inc({ namespace: this.namespace, status: this.podStatus(pod), @@ -196,7 +196,7 @@ export class FailedPodHandler { } private async onPodFailed(pod: V1Pod) { - this.logger.info("pod-failed", this.podSummary(pod)); + this.logger.debug("pod-failed", this.podSummary(pod)); try { await this.processFailedPod(pod); @@ -208,7 +208,7 @@ export class FailedPodHandler { } private async processFailedPod(pod: V1Pod) { - this.logger.info("pod-failed: processing pod", this.podSummary(pod)); + this.logger.verbose("pod-failed: processing pod", this.podSummary(pod)); const mainContainer = pod.status?.containerStatuses?.find((c) => c.name === "run-controller"); @@ -231,7 +231,7 @@ export class FailedPodHandler { } private async deletePod(pod: V1Pod) { - this.logger.info("pod-failed: deleting pod", this.podSummary(pod)); + this.logger.verbose("pod-failed: deleting pod", this.podSummary(pod)); try { await this.k8s.core.deleteNamespacedPod({ name: pod.metadata!.name!, diff --git a/apps/supervisor/src/services/podCleaner.ts b/apps/supervisor/src/services/podCleaner.ts index 56eaaeb88af..3ac5da293df 100644 --- a/apps/supervisor/src/services/podCleaner.ts +++ b/apps/supervisor/src/services/podCleaner.ts @@ -90,7 +90,7 @@ export class PodCleaner { status: "succeeded", }); - this.logger.info("Deleted batch of pods", { continuationToken }); + this.logger.debug("Deleted batch of pods", { continuationToken }); } catch (err) { this.logger.error("Failed to delete batch of pods", { err: err instanceof Error ? err.message : String(err), diff --git a/apps/supervisor/src/services/timerWheel.test.ts b/apps/supervisor/src/services/timerWheel.test.ts new file mode 100644 index 00000000000..3f6bb9aa19b --- /dev/null +++ b/apps/supervisor/src/services/timerWheel.test.ts @@ -0,0 +1,254 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { TimerWheel } from "./timerWheel.js"; + +describe("TimerWheel", () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("dispatches item after delay", () => { + const dispatched: string[] = []; + const wheel = new TimerWheel({ + delayMs: 3000, + onExpire: (item) => dispatched.push(item.key), + }); + + wheel.start(); + wheel.submit("run-1", "snapshot-data"); + + // Not yet + vi.advanceTimersByTime(2900); + expect(dispatched).toEqual([]); + + // After delay + vi.advanceTimersByTime(200); + expect(dispatched).toEqual(["run-1"]); + + wheel.stop(); + }); + + it("cancels item before it fires", () => { + const dispatched: string[] = []; + const wheel = new TimerWheel({ + delayMs: 3000, + onExpire: (item) => dispatched.push(item.key), + }); + + wheel.start(); + wheel.submit("run-1", "data"); + + vi.advanceTimersByTime(1000); + expect(wheel.cancel("run-1")).toBe(true); + + vi.advanceTimersByTime(5000); + expect(dispatched).toEqual([]); + expect(wheel.size).toBe(0); + + wheel.stop(); + }); + + it("cancel returns false for unknown key", () => { + const wheel = new TimerWheel({ + delayMs: 3000, + onExpire: () => {}, + }); + expect(wheel.cancel("nonexistent")).toBe(false); + }); + + it("deduplicates: resubmitting same key replaces the entry", () => { + const dispatched: { key: string; data: string }[] = []; + const wheel = new TimerWheel({ + delayMs: 3000, + onExpire: (item) => dispatched.push({ key: item.key, data: item.data }), + }); + + wheel.start(); + wheel.submit("run-1", "old-data"); + + vi.advanceTimersByTime(1000); + wheel.submit("run-1", "new-data"); + + // Original would have fired at t=3000, but was replaced + // New one fires at t=1000+3000=4000 + vi.advanceTimersByTime(2100); + expect(dispatched).toEqual([]); + + vi.advanceTimersByTime(1000); + expect(dispatched).toEqual([{ key: "run-1", data: "new-data" }]); + + wheel.stop(); + }); + + it("handles many concurrent items", () => { + const dispatched: string[] = []; + const wheel = new TimerWheel({ + delayMs: 3000, + onExpire: (item) => dispatched.push(item.key), + }); + + wheel.start(); + + for (let i = 0; i < 1000; i++) { + wheel.submit(`run-${i}`, `data-${i}`); + } + expect(wheel.size).toBe(1000); + + vi.advanceTimersByTime(3100); + expect(dispatched.length).toBe(1000); + expect(wheel.size).toBe(0); + + wheel.stop(); + }); + + it("handles items submitted at different times", () => { + const dispatched: string[] = []; + const wheel = new TimerWheel({ + delayMs: 3000, + onExpire: (item) => dispatched.push(item.key), + }); + + wheel.start(); + + wheel.submit("run-1", "data"); + vi.advanceTimersByTime(1000); + wheel.submit("run-2", "data"); + vi.advanceTimersByTime(1000); + wheel.submit("run-3", "data"); + + // t=2000: nothing yet + expect(dispatched).toEqual([]); + + // t=3100: run-1 fires + vi.advanceTimersByTime(1100); + expect(dispatched).toEqual(["run-1"]); + + // t=4100: run-2 fires + vi.advanceTimersByTime(1000); + expect(dispatched).toEqual(["run-1", "run-2"]); + + // t=5100: run-3 fires + vi.advanceTimersByTime(1000); + expect(dispatched).toEqual(["run-1", "run-2", "run-3"]); + + wheel.stop(); + }); + + it("setDelay changes delay for new items only", () => { + const dispatched: string[] = []; + const wheel = new TimerWheel({ + delayMs: 3000, + onExpire: (item) => dispatched.push(item.key), + }); + + wheel.start(); + + wheel.submit("run-1", "data"); // 3s delay + + vi.advanceTimersByTime(500); + wheel.setDelay(1000); + wheel.submit("run-2", "data"); // 1s delay + + // t=1500: run-2 should have fired (submitted at t=500 with 1s delay) + vi.advanceTimersByTime(1100); + expect(dispatched).toEqual(["run-2"]); + + // t=3100: run-1 fires at its original 3s delay + vi.advanceTimersByTime(1500); + expect(dispatched).toEqual(["run-2", "run-1"]); + + wheel.stop(); + }); + + it("stop returns unprocessed items", () => { + const dispatched: string[] = []; + const wheel = new TimerWheel({ + delayMs: 3000, + onExpire: (item) => dispatched.push(item.key), + }); + + wheel.start(); + wheel.submit("run-1", "data-1"); + wheel.submit("run-2", "data-2"); + wheel.submit("run-3", "data-3"); + + const remaining = wheel.stop(); + expect(dispatched).toEqual([]); + expect(wheel.size).toBe(0); + expect(remaining.length).toBe(3); + expect(remaining.map((r) => r.key).sort()).toEqual(["run-1", "run-2", "run-3"]); + expect(remaining.find((r) => r.key === "run-1")?.data).toBe("data-1"); + }); + + it("after stop, new submissions are silently dropped", () => { + const dispatched: string[] = []; + const wheel = new TimerWheel({ + delayMs: 3000, + onExpire: (item) => dispatched.push(item.key), + }); + + wheel.start(); + wheel.stop(); + + wheel.submit("run-late", "data"); + expect(dispatched).toEqual([]); + expect(wheel.size).toBe(0); + }); + + it("tracks size correctly through submit/cancel/dispatch", () => { + const wheel = new TimerWheel({ + delayMs: 3000, + onExpire: () => {}, + }); + + wheel.start(); + + wheel.submit("a", "data"); + wheel.submit("b", "data"); + expect(wheel.size).toBe(2); + + wheel.cancel("a"); + expect(wheel.size).toBe(1); + + vi.advanceTimersByTime(3100); + expect(wheel.size).toBe(0); + + wheel.stop(); + }); + + it("clamps delay to valid range", () => { + const dispatched: string[] = []; + + // Very small delay (should be at least 1 tick = 100ms) + const wheel = new TimerWheel({ + delayMs: 0, + onExpire: (item) => dispatched.push(item.key), + }); + + wheel.start(); + wheel.submit("run-1", "data"); + + vi.advanceTimersByTime(200); + expect(dispatched).toEqual(["run-1"]); + + wheel.stop(); + }); + + it("multiple cancel calls are safe", () => { + const wheel = new TimerWheel({ + delayMs: 3000, + onExpire: () => {}, + }); + + wheel.start(); + wheel.submit("run-1", "data"); + + expect(wheel.cancel("run-1")).toBe(true); + expect(wheel.cancel("run-1")).toBe(false); + + wheel.stop(); + }); +}); diff --git a/apps/supervisor/src/services/timerWheel.ts b/apps/supervisor/src/services/timerWheel.ts new file mode 100644 index 00000000000..9584423824d --- /dev/null +++ b/apps/supervisor/src/services/timerWheel.ts @@ -0,0 +1,160 @@ +/** + * TimerWheel implements a hashed timer wheel for efficiently managing large numbers + * of delayed operations with O(1) submit, cancel, and per-item dispatch. + * + * Used by the supervisor to delay snapshot requests so that short-lived waitpoints + * (e.g. triggerAndWait that resolves in <3s) skip the snapshot entirely. + * + * The wheel is a ring buffer of slots. A single setInterval advances a cursor. + * When the cursor reaches a slot, all items in that slot are dispatched. + * + * Fixed capacity: 600 slots at 100ms tick = 60s max delay. + */ + +const TICK_MS = 100; +const NUM_SLOTS = 600; // 60s max delay at 100ms tick + +export type TimerWheelItem = { + key: string; + data: T; +}; + +export type TimerWheelOptions = { + /** Called when an item's delay expires. */ + onExpire: (item: TimerWheelItem) => void; + /** Delay in milliseconds before items fire. Clamped to [100, 60000]. */ + delayMs: number; +}; + +type Entry = { + key: string; + data: T; + slotIndex: number; +}; + +export class TimerWheel { + private slots: Set[]; + private entries: Map>; + private cursor: number; + private intervalId: ReturnType | null; + private onExpire: (item: TimerWheelItem) => void; + private delaySlots: number; + + constructor(opts: TimerWheelOptions) { + this.slots = Array.from({ length: NUM_SLOTS }, () => new Set()); + this.entries = new Map(); + this.cursor = 0; + this.intervalId = null; + this.onExpire = opts.onExpire; + this.delaySlots = Math.max(1, Math.min(NUM_SLOTS, Math.ceil(opts.delayMs / TICK_MS))); + } + + /** Start the timer wheel. Must be called before submitting items. */ + start(): void { + if (this.intervalId) return; + this.intervalId = setInterval(() => this.tick(), TICK_MS); + // Don't hold the process open just for the timer wheel + if (this.intervalId && typeof this.intervalId === "object" && "unref" in this.intervalId) { + this.intervalId.unref(); + } + } + + /** + * Stop the timer wheel and return all unprocessed items. + * The wheel keeps running normally during graceful shutdown - call stop() + * only when you're ready to tear down. Caller decides what to do with leftovers. + */ + stop(): TimerWheelItem[] { + if (this.intervalId) { + clearInterval(this.intervalId); + this.intervalId = null; + } + + const remaining: TimerWheelItem[] = []; + for (const [key, entry] of this.entries) { + remaining.push({ key, data: entry.data }); + } + + for (const slot of this.slots) { + slot.clear(); + } + this.entries.clear(); + + return remaining; + } + + /** + * Update the delay for future submissions. Already-queued items keep their original timing. + * Clamped to [TICK_MS, 60000ms]. + */ + setDelay(delayMs: number): void { + this.delaySlots = Math.max(1, Math.min(NUM_SLOTS, Math.ceil(delayMs / TICK_MS))); + } + + /** + * Submit an item to be dispatched after the configured delay. + * If an item with the same key already exists, it is replaced (dedup). + * No-op if the wheel is stopped. + */ + submit(key: string, data: T): void { + if (!this.intervalId) return; + + // Dedup: remove existing entry for this key + this.cancel(key); + + const slotIndex = (this.cursor + this.delaySlots) % NUM_SLOTS; + const entry: Entry = { key, data, slotIndex }; + + this.entries.set(key, entry); + this.slot(slotIndex).add(key); + } + + /** + * Cancel a pending item. Returns true if the item was found and removed. + */ + cancel(key: string): boolean { + const entry = this.entries.get(key); + if (!entry) return false; + + this.slot(entry.slotIndex).delete(key); + this.entries.delete(key); + return true; + } + + /** Number of pending items in the wheel. */ + get size(): number { + return this.entries.size; + } + + /** Whether the wheel is running. */ + get running(): boolean { + return this.intervalId !== null; + } + + /** Get a slot by index. The array is fully initialized so this always returns a Set. */ + private slot(index: number): Set { + const s = this.slots[index]; + if (!s) throw new Error(`TimerWheel: invalid slot index ${index}`); + return s; + } + + /** Advance the cursor and dispatch all items in the current slot. */ + private tick(): void { + this.cursor = (this.cursor + 1) % NUM_SLOTS; + const slot = this.slot(this.cursor); + + if (slot.size === 0) return; + + // Collect items to dispatch (copy keys since we mutate during iteration) + const keys = [...slot]; + slot.clear(); + + for (const key of keys) { + const entry = this.entries.get(key); + if (!entry) continue; + + this.entries.delete(key); + this.onExpire({ key, data: entry.data }); + } + } +} diff --git a/apps/supervisor/src/workloadManager/compute.ts b/apps/supervisor/src/workloadManager/compute.ts new file mode 100644 index 00000000000..44695a3766f --- /dev/null +++ b/apps/supervisor/src/workloadManager/compute.ts @@ -0,0 +1,367 @@ +import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger"; +import { parseTraceparent } from "@trigger.dev/core/v3/isomorphic"; +import { flattenAttributes } from "@trigger.dev/core/v3/utils/flattenAttributes"; +import { + type WorkloadManager, + type WorkloadManagerCreateOptions, + type WorkloadManagerOptions, +} from "./types.js"; +import { ComputeClient, stripImageDigest } from "@internal/compute"; +import { env } from "../env.js"; +import { getRunnerId } from "../util.js"; +import { buildOtlpTracePayload } from "../otlpPayload.js"; +import { sendOtlpTrace } from "../otlpTrace.js"; +import { tryCatch } from "@trigger.dev/core"; + +type ComputeWorkloadManagerOptions = WorkloadManagerOptions & { + gatewayUrl: string; + gatewayAuthToken?: string; + gatewayTimeoutMs: number; +}; + +export class ComputeWorkloadManager implements WorkloadManager { + private readonly logger = new SimpleStructuredLogger("compute-workload-manager"); + private readonly compute: ComputeClient; + + constructor(private opts: ComputeWorkloadManagerOptions) { + if (opts.workloadApiDomain) { + this.logger.warn("⚠️ Custom workload API domain", { + domain: opts.workloadApiDomain, + }); + } + + this.compute = new ComputeClient({ + gatewayUrl: opts.gatewayUrl, + authToken: opts.gatewayAuthToken, + timeoutMs: opts.gatewayTimeoutMs, + }); + } + + async create(opts: WorkloadManagerCreateOptions) { + const runnerId = getRunnerId(opts.runFriendlyId, opts.nextAttemptNumber); + + const envVars: Record = { + OTEL_EXPORTER_OTLP_ENDPOINT: env.OTEL_EXPORTER_OTLP_ENDPOINT, + TRIGGER_DEQUEUED_AT_MS: String(opts.dequeuedAt.getTime()), + TRIGGER_POD_SCHEDULED_AT_MS: String(Date.now()), + TRIGGER_ENV_ID: opts.envId, + TRIGGER_DEPLOYMENT_ID: opts.deploymentFriendlyId, + TRIGGER_DEPLOYMENT_VERSION: opts.deploymentVersion, + TRIGGER_RUN_ID: opts.runFriendlyId, + TRIGGER_SNAPSHOT_ID: opts.snapshotFriendlyId, + TRIGGER_SUPERVISOR_API_PROTOCOL: this.opts.workloadApiProtocol, + TRIGGER_SUPERVISOR_API_PORT: String(this.opts.workloadApiPort), + TRIGGER_SUPERVISOR_API_DOMAIN: this.opts.workloadApiDomain ?? "", + TRIGGER_WORKER_INSTANCE_NAME: env.TRIGGER_WORKER_INSTANCE_NAME, + TRIGGER_RUNNER_ID: runnerId, + TRIGGER_MACHINE_CPU: String(opts.machine.cpu), + TRIGGER_MACHINE_MEMORY: String(opts.machine.memory), + PRETTY_LOGS: String(env.RUNNER_PRETTY_LOGS), + }; + + if (this.opts.warmStartUrl) { + envVars.TRIGGER_WARM_START_URL = this.opts.warmStartUrl; + } + + if (env.COMPUTE_SNAPSHOTS_ENABLED && this.opts.metadataUrl) { + envVars.TRIGGER_METADATA_URL = this.opts.metadataUrl; + } + + if (this.opts.heartbeatIntervalSeconds) { + envVars.TRIGGER_HEARTBEAT_INTERVAL_SECONDS = String(this.opts.heartbeatIntervalSeconds); + } + + if (this.opts.snapshotPollIntervalSeconds) { + envVars.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS = String( + this.opts.snapshotPollIntervalSeconds + ); + } + + if (this.opts.additionalEnvVars) { + Object.assign(envVars, this.opts.additionalEnvVars); + } + + // Strip image digest - resolve by tag, not digest + const imageRef = stripImageDigest(opts.image); + + // Wide event: single canonical log line emitted in finally + const event: Record = { + // High-cardinality identifiers + runId: opts.runFriendlyId, + runnerId, + envId: opts.envId, + envType: opts.envType, + orgId: opts.orgId, + projectId: opts.projectId, + deploymentVersion: opts.deploymentVersion, + machine: opts.machine.name, + // Environment + instanceName: env.TRIGGER_WORKER_INSTANCE_NAME, + // Supervisor timing + dequeueResponseMs: opts.dequeueResponseMs, + pollingIntervalMs: opts.pollingIntervalMs, + warmStartCheckMs: opts.warmStartCheckMs, + // Request + image: imageRef, + }; + + const startMs = performance.now(); + + try { + const [error, data] = await tryCatch( + this.compute.instances.create({ + name: runnerId, + image: imageRef, + env: envVars, + cpu: opts.machine.cpu, + memory_gb: opts.machine.memory, + metadata: { + runId: opts.runFriendlyId, + envId: opts.envId, + envType: opts.envType, + orgId: opts.orgId, + projectId: opts.projectId, + deploymentVersion: opts.deploymentVersion, + machine: opts.machine.name, + }, + }) + ); + + if (error) { + event.error = error instanceof Error ? error.message : String(error); + event.errorType = + error instanceof DOMException && error.name === "TimeoutError" ? "timeout" : "fetch"; + return; + } + + event.instanceId = data.id; + event.ok = true; + + // Parse timing data from compute response (optional - requires gateway timing flag) + if (data._timing) { + event.timing = data._timing; + } + + this.#emitProvisionSpan(opts, startMs, data._timing); + } finally { + event.durationMs = Math.round(performance.now() - startMs); + event.ok ??= false; + this.logger.debug("create instance", event); + } + } + + async snapshot(opts: { + runnerId: string; + callbackUrl: string; + metadata: Record; + }): Promise { + const [error] = await tryCatch( + this.compute.instances.snapshot(opts.runnerId, { + callback: { + url: opts.callbackUrl, + metadata: opts.metadata, + }, + }) + ); + + if (error) { + this.logger.error("snapshot request failed", { + runnerId: opts.runnerId, + error: error instanceof Error ? error.message : String(error), + }); + return false; + } + + this.logger.debug("snapshot request accepted", { runnerId: opts.runnerId }); + return true; + } + + async deleteInstance(runnerId: string): Promise { + const [error] = await tryCatch(this.compute.instances.delete(runnerId)); + + if (error) { + this.logger.error("delete instance failed", { + runnerId, + error: error instanceof Error ? error.message : String(error), + }); + return false; + } + + this.logger.debug("delete instance success", { runnerId }); + return true; + } + + #emitProvisionSpan( + opts: WorkloadManagerCreateOptions, + startMs: number, + timing?: unknown + ) { + if (!env.COMPUTE_TRACE_SPANS_ENABLED) return; + const traceparent = + opts.traceContext && + "traceparent" in opts.traceContext && + typeof opts.traceContext.traceparent === "string" + ? opts.traceContext.traceparent + : undefined; + + const parsed = parseTraceparent(traceparent); + if (!parsed) return; + + const endMs = performance.now(); + const now = Date.now(); + const provisionStartEpochMs = now - (endMs - startMs); + const endEpochMs = now; + + // Span starts at dequeue time so events (dequeue) render in the thin-line section + // before "Started". The actual provision call time is in provisionStartEpochMs. + // Subtract 1ms so compute span always sorts before the attempt span (same dequeue time) + const startEpochMs = opts.dequeuedAt.getTime() - 1; + + const spanAttributes: Record = { + "compute.type": "create", + "compute.provision_start_ms": provisionStartEpochMs, + ...(timing ? (flattenAttributes(timing, "compute") as Record) : {}), + }; + + if (opts.dequeueResponseMs !== undefined) { + spanAttributes["supervisor.dequeue_response_ms"] = opts.dequeueResponseMs; + } + if (opts.warmStartCheckMs !== undefined) { + spanAttributes["supervisor.warm_start_check_ms"] = opts.warmStartCheckMs; + } + + const payload = buildOtlpTracePayload({ + traceId: parsed.traceId, + parentSpanId: parsed.spanId, + spanName: "compute.provision", + startTimeMs: startEpochMs, + endTimeMs: endEpochMs, + resourceAttributes: { + "ctx.environment.id": opts.envId, + "ctx.organization.id": opts.orgId, + "ctx.project.id": opts.projectId, + "ctx.run.id": opts.runFriendlyId, + }, + spanAttributes, + }); + + // Use the platform API URL, not the runner OTLP endpoint (which may be a VM gateway IP) + sendOtlpTrace(payload); + } + + async restore(opts: { + snapshotId: string; + runnerId: string; + runFriendlyId: string; + snapshotFriendlyId: string; + machine: { cpu: number; memory: number }; + // Trace context for OTel span emission + traceContext?: Record; + envId?: string; + orgId?: string; + projectId?: string; + dequeuedAt?: Date; + }): Promise { + const metadata: Record = { + TRIGGER_RUNNER_ID: opts.runnerId, + TRIGGER_RUN_ID: opts.runFriendlyId, + TRIGGER_SNAPSHOT_ID: opts.snapshotFriendlyId, + TRIGGER_SUPERVISOR_API_PROTOCOL: this.opts.workloadApiProtocol, + TRIGGER_SUPERVISOR_API_PORT: String(this.opts.workloadApiPort), + TRIGGER_SUPERVISOR_API_DOMAIN: this.opts.workloadApiDomain ?? "", + TRIGGER_WORKER_INSTANCE_NAME: env.TRIGGER_WORKER_INSTANCE_NAME, + }; + + this.logger.verbose("restore request body", { + snapshotId: opts.snapshotId, + runnerId: opts.runnerId, + }); + + const startMs = performance.now(); + + const [error] = await tryCatch( + this.compute.snapshots.restore(opts.snapshotId, { + name: opts.runnerId, + metadata, + cpu: opts.machine.cpu, + memory_mb: opts.machine.memory * 1024, + }) + ); + + const durationMs = Math.round(performance.now() - startMs); + + if (error) { + this.logger.error("restore request failed", { + snapshotId: opts.snapshotId, + runnerId: opts.runnerId, + error: error instanceof Error ? error.message : String(error), + durationMs, + }); + return false; + } + + this.logger.debug("restore request success", { + snapshotId: opts.snapshotId, + runnerId: opts.runnerId, + durationMs, + }); + + this.#emitRestoreSpan(opts, startMs); + + return true; + } + + #emitRestoreSpan( + opts: { + snapshotId: string; + runnerId: string; + runFriendlyId: string; + traceContext?: Record; + envId?: string; + orgId?: string; + projectId?: string; + dequeuedAt?: Date; + }, + startMs: number + ) { + if (!env.COMPUTE_TRACE_SPANS_ENABLED) return; + + const traceparent = + opts.traceContext && + "traceparent" in opts.traceContext && + typeof opts.traceContext.traceparent === "string" + ? opts.traceContext.traceparent + : undefined; + + const parsed = parseTraceparent(traceparent); + if (!parsed || !opts.envId || !opts.orgId || !opts.projectId) return; + + const endMs = performance.now(); + const now = Date.now(); + const restoreStartEpochMs = now - (endMs - startMs); + const endEpochMs = now; + + // Subtract 1ms so restore span always sorts before the attempt span + const startEpochMs = (opts.dequeuedAt?.getTime() ?? restoreStartEpochMs) - 1; + + const payload = buildOtlpTracePayload({ + traceId: parsed.traceId, + parentSpanId: parsed.spanId, + spanName: "compute.restore", + startTimeMs: startEpochMs, + endTimeMs: endEpochMs, + resourceAttributes: { + "ctx.environment.id": opts.envId, + "ctx.organization.id": opts.orgId, + "ctx.project.id": opts.projectId, + "ctx.run.id": opts.runFriendlyId, + }, + spanAttributes: { + "compute.type": "restore", + "compute.snapshot_id": opts.snapshotId, + }, + }); + + sendOtlpTrace(payload); + } +} diff --git a/apps/supervisor/src/workloadManager/docker.ts b/apps/supervisor/src/workloadManager/docker.ts index d6651d325a2..66405df9ba5 100644 --- a/apps/supervisor/src/workloadManager/docker.ts +++ b/apps/supervisor/src/workloadManager/docker.ts @@ -62,7 +62,7 @@ export class DockerWorkloadManager implements WorkloadManager { } async create(opts: WorkloadManagerCreateOptions) { - this.logger.log("create()", { opts }); + this.logger.verbose("create()", { opts }); const runnerId = getRunnerId(opts.runFriendlyId, opts.nextAttemptNumber); diff --git a/apps/supervisor/src/workloadManager/kubernetes.ts b/apps/supervisor/src/workloadManager/kubernetes.ts index 0aa5b170126..aa1da410038 100644 --- a/apps/supervisor/src/workloadManager/kubernetes.ts +++ b/apps/supervisor/src/workloadManager/kubernetes.ts @@ -100,7 +100,7 @@ export class KubernetesWorkloadManager implements WorkloadManager { } async create(opts: WorkloadManagerCreateOptions) { - this.logger.log("[KubernetesWorkloadManager] Creating container", { opts }); + this.logger.verbose("[KubernetesWorkloadManager] Creating container", { opts }); const runnerId = getRunnerId(opts.runFriendlyId, opts.nextAttemptNumber); diff --git a/apps/supervisor/src/workloadManager/types.ts b/apps/supervisor/src/workloadManager/types.ts index fca27b249a2..22d2bcfe11b 100644 --- a/apps/supervisor/src/workloadManager/types.ts +++ b/apps/supervisor/src/workloadManager/types.ts @@ -24,6 +24,10 @@ export interface WorkloadManagerCreateOptions { nextAttemptNumber?: number; dequeuedAt: Date; placementTags?: PlacementTag[]; + // Timing context (populated by supervisor handler, included in wide event) + dequeueResponseMs?: number; + pollingIntervalMs?: number; + warmStartCheckMs?: number; // identifiers envId: string; envType: EnvironmentType; @@ -35,5 +39,7 @@ export interface WorkloadManagerCreateOptions { runFriendlyId: string; snapshotId: string; snapshotFriendlyId: string; + // Trace context for OTel span emission (W3C format: { traceparent: "00-...", tracestate?: "..." }) + traceContext?: Record; annotations?: RunAnnotations; } diff --git a/apps/supervisor/src/workloadServer/index.ts b/apps/supervisor/src/workloadServer/index.ts index 35d53d36099..0ee28ec77ed 100644 --- a/apps/supervisor/src/workloadServer/index.ts +++ b/apps/supervisor/src/workloadServer/index.ts @@ -24,6 +24,11 @@ import { HttpServer, type CheckpointClient } from "@trigger.dev/core/v3/serverOn import { type IncomingMessage } from "node:http"; import { register } from "../metrics.js"; import { env } from "../env.js"; +import type { ComputeWorkloadManager } from "../workloadManager/compute.js"; +import { TimerWheel } from "../services/timerWheel.js"; +import { parseTraceparent } from "@trigger.dev/core/v3/isomorphic"; +import { buildOtlpTracePayload } from "../otlpPayload.js"; +import { sendOtlpTrace } from "../otlpTrace.js"; // Use the official export when upgrading to socket.io@4.8.0 interface DefaultEventsMap { @@ -53,15 +58,39 @@ type WorkloadServerEvents = { ]; }; +const ComputeSnapshotCallbackBody = z.object({ + snapshot_id: z.string(), + instance_id: z.string(), + status: z.enum(["completed", "failed"]), + error: z.string().optional(), + metadata: z.record(z.string()).optional(), + duration_ms: z.number().optional(), +}); + +type DelayedSnapshot = { + runnerId: string; + runFriendlyId: string; + snapshotFriendlyId: string; +}; + +type RunTraceContext = { + traceparent: string; + envId: string; + orgId: string; + projectId: string; +}; + type WorkloadServerOptions = { port: number; host?: string; workerClient: SupervisorHttpClient; checkpointClient?: CheckpointClient; + computeManager?: ComputeWorkloadManager; }; export class WorkloadServer extends EventEmitter { private checkpointClient?: CheckpointClient; + private computeManager?: ComputeWorkloadManager; private readonly logger = new SimpleStructuredLogger("workload-server"); @@ -84,6 +113,13 @@ export class WorkloadServer extends EventEmitter { >(); private readonly workerClient: SupervisorHttpClient; + // Bounded map for trace contexts used by compute snapshot spans. + // Entries are added on dequeue and consumed on snapshot callback, which may arrive + // hours later after a checkpoint/restore cycle. Using a capped map avoids unbounded + // growth while keeping recent contexts available. Oldest entries are evicted first. + private static readonly MAX_TRACE_CONTEXTS = 10_000; + private readonly runTraceContexts = new Map(); + private readonly snapshotDelayWheel?: TimerWheel; constructor(opts: WorkloadServerOptions) { super(); @@ -93,6 +129,23 @@ export class WorkloadServer extends EventEmitter { this.workerClient = opts.workerClient; this.checkpointClient = opts.checkpointClient; + this.computeManager = opts.computeManager; + + if (this.computeManager && env.COMPUTE_SNAPSHOTS_ENABLED) { + this.snapshotDelayWheel = new TimerWheel({ + delayMs: env.COMPUTE_SNAPSHOT_DELAY_MS, + onExpire: (item) => { + this.dispatchComputeSnapshot(item.data).catch((error) => { + this.logger.error("Compute snapshot dispatch failed", { + runId: item.data.runFriendlyId, + runnerId: item.data.runnerId, + error, + }); + }); + }, + }); + this.snapshotDelayWheel.start(); + } this.httpServer = this.createHttpServer({ host, port }); this.websocketServer = this.createWebsocketServer(); @@ -231,11 +284,19 @@ export class WorkloadServer extends EventEmitter { handler: async ({ reply, params, req }) => { this.logger.debug("Suspend request", { params, headers: req.headers }); - if (!this.checkpointClient) { + const runnerId = this.runnerIdFromRequest(req); + const deploymentVersion = this.deploymentVersionFromRequest(req); + const projectRef = this.projectRefFromRequest(req); + + if (!runnerId || !deploymentVersion || !projectRef) { + this.logger.error("Invalid headers for suspend request", { + ...params, + headers: req.headers, + }); reply.json( { ok: false, - error: "Checkpoints disabled", + error: "Invalid headers", } satisfies WorkloadSuspendRunResponseBody, false, 400 @@ -243,19 +304,45 @@ export class WorkloadServer extends EventEmitter { return; } - const runnerId = this.runnerIdFromRequest(req); - const deploymentVersion = this.deploymentVersionFromRequest(req); - const projectRef = this.projectRefFromRequest(req); + if (this.snapshotDelayWheel && this.computeManager && env.COMPUTE_SNAPSHOTS_ENABLED) { + if (!env.TRIGGER_WORKLOAD_API_DOMAIN) { + this.logger.error( + "TRIGGER_WORKLOAD_API_DOMAIN is not set, cannot create snapshot callback URL" + ); + reply.json( + { + ok: false, + error: "Snapshot callbacks not configured", + } satisfies WorkloadSuspendRunResponseBody, + false, + 500 + ); + return; + } + + // Compute mode: delay snapshot to avoid wasted work on short-lived waitpoints. + // If the run continues before the delay expires, the snapshot is cancelled. + reply.json({ ok: true } satisfies WorkloadSuspendRunResponseBody, false, 202); + + this.snapshotDelayWheel.submit(params.runFriendlyId, { + runnerId, + runFriendlyId: params.runFriendlyId, + snapshotFriendlyId: params.snapshotFriendlyId, + }); - if (!runnerId || !deploymentVersion || !projectRef) { - this.logger.error("Invalid headers for suspend request", { - ...params, - headers: req.headers, + this.logger.debug("Snapshot delayed", { + runId: params.runFriendlyId, + delayMs: env.COMPUTE_SNAPSHOT_DELAY_MS, }); + + return; + } + + if (!this.checkpointClient) { reply.json( { ok: false, - error: "Invalid headers", + error: "Checkpoints disabled", } satisfies WorkloadSuspendRunResponseBody, false, 400 @@ -298,6 +385,11 @@ export class WorkloadServer extends EventEmitter { handler: async ({ req, reply, params }) => { this.logger.debug("Run continuation request", { params }); + // Cancel any pending delayed snapshot for this run + if (this.snapshotDelayWheel?.cancel(params.runFriendlyId)) { + this.logger.debug("Cancelled delayed snapshot", { runId: params.runFriendlyId }); + } + const continuationResult = await this.workerClient.continueRunExecution( params.runFriendlyId, params.snapshotFriendlyId, @@ -394,6 +486,80 @@ export class WorkloadServer extends EventEmitter { }); } + // Compute snapshot callback endpoint + httpServer.route("/api/v1/compute/snapshot-complete", "POST", { + bodySchema: ComputeSnapshotCallbackBody, + handler: async ({ reply, body }) => { + this.logger.debug("Compute snapshot callback", { + snapshotId: body.snapshot_id, + instanceId: body.instance_id, + status: body.status, + error: body.error, + metadata: body.metadata, + durationMs: body.duration_ms, + }); + + const runId = body.metadata?.runId; + const snapshotFriendlyId = body.metadata?.snapshotFriendlyId; + + if (!runId || !snapshotFriendlyId) { + this.logger.error("Compute snapshot callback missing metadata", { body }); + reply.empty(400); + return; + } + + // Emit snapshot span (best-effort - requires trace context from dequeue on this instance) + this.#emitSnapshotSpan(runId, body.duration_ms, body.snapshot_id); + + if (body.status === "completed") { + const result = await this.workerClient.submitSuspendCompletion({ + runId, + snapshotId: snapshotFriendlyId, + body: { + success: true, + checkpoint: { + type: "KUBERNETES", + location: body.snapshot_id, + }, + }, + }); + + if (result.success) { + this.logger.debug("Suspend completion submitted", { + runId, + instanceId: body.instance_id, + snapshotId: body.snapshot_id, + }); + } else { + this.logger.error("Failed to submit suspend completion", { + runId, + snapshotFriendlyId, + error: result.error, + }); + } + } else { + const result = await this.workerClient.submitSuspendCompletion({ + runId, + snapshotId: snapshotFriendlyId, + body: { + success: false, + error: body.error ?? "Snapshot failed", + }, + }); + + if (!result.success) { + this.logger.error("Failed to submit suspend failure", { + runId, + snapshotFriendlyId, + error: result.error, + }); + } + } + + reply.empty(200); + }, + }); + return httpServer; } @@ -408,7 +574,7 @@ export class WorkloadServer extends EventEmitter { > = io.of("/workload"); websocketServer.on("disconnect", (socket) => { - this.logger.log("[WS] disconnect", socket.id); + this.logger.verbose("[WS] disconnect", socket.id); }); websocketServer.use(async (socket, next) => { const setSocketDataFromHeader = ( @@ -490,7 +656,7 @@ export class WorkloadServer extends EventEmitter { socket.data.runFriendlyId = undefined; }; - socketLogger.log("wsServer socket connected", { ...getSocketMetadata() }); + socketLogger.debug("wsServer socket connected", { ...getSocketMetadata() }); // FIXME: where does this get set? if (socket.data.runFriendlyId) { @@ -498,7 +664,7 @@ export class WorkloadServer extends EventEmitter { } socket.on("disconnecting", (reason, description) => { - socketLogger.log("Socket disconnecting", { ...getSocketMetadata(), reason, description }); + socketLogger.verbose("Socket disconnecting", { ...getSocketMetadata(), reason, description }); if (socket.data.runFriendlyId) { runDisconnected(socket.data.runFriendlyId); @@ -506,7 +672,7 @@ export class WorkloadServer extends EventEmitter { }); socket.on("disconnect", (reason, description) => { - socketLogger.log("Socket disconnected", { ...getSocketMetadata(), reason, description }); + socketLogger.debug("Socket disconnected", { ...getSocketMetadata(), reason, description }); }); socket.on("error", (error) => { @@ -527,7 +693,7 @@ export class WorkloadServer extends EventEmitter { ...message, }); - log.log("Handling run:start"); + log.debug("Handling run:start"); try { runConnected(message.run.friendlyId); @@ -543,10 +709,13 @@ export class WorkloadServer extends EventEmitter { ...message, }); - log.log("Handling run:stop"); + log.debug("Handling run:stop"); try { runDisconnected(message.run.friendlyId); + // Don't delete trace context here - run:stop fires after each snapshot/shutdown + // but the run may be restored on a new VM and snapshot again. Trace context is + // re-populated on dequeue, and entries are small (4 strings per run). } catch (error) { log.error("run:stop error", { error }); } @@ -588,11 +757,105 @@ export class WorkloadServer extends EventEmitter { } } + /** + * Dispatch a compute snapshot request to the gateway. Called by the timer wheel + * when the delay expires, or immediately during drain. + */ + private async dispatchComputeSnapshot(snapshot: DelayedSnapshot): Promise { + if (!this.computeManager) return; + + const callbackUrl = `${env.TRIGGER_WORKLOAD_API_PROTOCOL}://${env.TRIGGER_WORKLOAD_API_DOMAIN}:${env.TRIGGER_WORKLOAD_API_PORT_EXTERNAL}/api/v1/compute/snapshot-complete`; + + const result = await this.computeManager.snapshot({ + runnerId: snapshot.runnerId, + callbackUrl, + metadata: { + runId: snapshot.runFriendlyId, + snapshotFriendlyId: snapshot.snapshotFriendlyId, + }, + }); + + if (!result) { + this.logger.error("Failed to request compute snapshot", { + runId: snapshot.runFriendlyId, + runnerId: snapshot.runnerId, + }); + } + } + + #emitSnapshotSpan(runFriendlyId: string, durationMs?: number, snapshotId?: string) { + if (!env.COMPUTE_TRACE_SPANS_ENABLED) return; + + const ctx = this.runTraceContexts.get(runFriendlyId); + if (!ctx) return; + + const parsed = parseTraceparent(ctx.traceparent); + if (!parsed) return; + + const endEpochMs = Date.now(); + const startEpochMs = durationMs ? endEpochMs - durationMs : endEpochMs; + + const spanAttributes: Record = { + "compute.type": "snapshot", + }; + + if (durationMs !== undefined) { + spanAttributes["compute.total_ms"] = durationMs; + } + + if (snapshotId) { + spanAttributes["compute.snapshot_id"] = snapshotId; + } + + const payload = buildOtlpTracePayload({ + traceId: parsed.traceId, + parentSpanId: parsed.spanId, + spanName: "compute.snapshot", + startTimeMs: startEpochMs, + endTimeMs: endEpochMs, + resourceAttributes: { + "ctx.environment.id": ctx.envId, + "ctx.organization.id": ctx.orgId, + "ctx.project.id": ctx.projectId, + "ctx.run.id": runFriendlyId, + }, + spanAttributes, + }); + + sendOtlpTrace(payload); + } + + registerRunTraceContext(runFriendlyId: string, ctx: RunTraceContext) { + // Evict oldest entries if we've hit the cap + if (this.runTraceContexts.size >= WorkloadServer.MAX_TRACE_CONTEXTS) { + const firstKey = this.runTraceContexts.keys().next().value; + if (firstKey) { + this.runTraceContexts.delete(firstKey); + } + } + + this.runTraceContexts.set(runFriendlyId, ctx); + } + async start() { await this.httpServer.start(); } async stop() { + // Intentionally drop pending snapshots rather than dispatching them. The supervisor + // is shutting down, so our callback URL will be dead by the time the gateway responds. + // Runners detect the supervisor is gone and reconnect to a new instance, which + // re-triggers the snapshot workflow. Snapshots are an optimization, not a correctness + // requirement - runs continue fine without them. + const remaining = this.snapshotDelayWheel?.stop() ?? []; + if (remaining.length > 0) { + this.logger.info("Snapshot delay wheel stopped, dropped pending snapshots", { + count: remaining.length, + }); + this.logger.debug("Dropped snapshot details", { + runs: remaining.map((item) => item.key), + }); + } await this.httpServer.stop(); } } diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 4b4f22623b0..3df78ce4ee8 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -333,6 +333,11 @@ const EnvironmentSchema = z .optional() .transform((v) => v ?? process.env.DEPLOY_REGISTRY_ECR_ASSUME_ROLE_EXTERNAL_ID), + // Compute gateway (template creation during deploy finalize) + COMPUTE_GATEWAY_URL: z.string().optional(), + COMPUTE_GATEWAY_AUTH_TOKEN: z.string().optional(), + COMPUTE_TEMPLATE_SHADOW_ROLLOUT_PCT: z.string().optional(), + DEPLOY_IMAGE_PLATFORM: z.string().default("linux/amd64"), DEPLOY_TIMEOUT_MS: z.coerce .number() diff --git a/apps/webapp/app/v3/featureFlags.server.ts b/apps/webapp/app/v3/featureFlags.server.ts index f32f34c64b8..8a2e879395d 100644 --- a/apps/webapp/app/v3/featureFlags.server.ts +++ b/apps/webapp/app/v3/featureFlags.server.ts @@ -9,6 +9,7 @@ export const FEATURE_FLAG = { hasLogsPageAccess: "hasLogsPageAccess", hasAiAccess: "hasAiAccess", hasAiModelsAccess: "hasAiModelsAccess", + hasComputeAccess: "hasComputeAccess", } as const; const FeatureFlagCatalog = { @@ -19,6 +20,7 @@ const FeatureFlagCatalog = { [FEATURE_FLAG.hasLogsPageAccess]: z.coerce.boolean(), [FEATURE_FLAG.hasAiAccess]: z.coerce.boolean(), [FEATURE_FLAG.hasAiModelsAccess]: z.coerce.boolean(), + [FEATURE_FLAG.hasComputeAccess]: z.coerce.boolean(), }; type FeatureFlagKey = keyof typeof FeatureFlagCatalog; diff --git a/apps/webapp/app/v3/services/computeTemplateCreation.server.ts b/apps/webapp/app/v3/services/computeTemplateCreation.server.ts new file mode 100644 index 00000000000..873f2f089e3 --- /dev/null +++ b/apps/webapp/app/v3/services/computeTemplateCreation.server.ts @@ -0,0 +1,181 @@ +import { ComputeClient, stripImageDigest } from "@internal/compute"; +import { machinePresetFromName } from "~/v3/machinePresets.server"; +import { env } from "~/env.server"; +import { logger } from "~/services/logger.server"; +import type { PrismaClientOrTransaction } from "~/db.server"; +import { FEATURE_FLAG, makeFlag } from "~/v3/featureFlags.server"; +import type { AuthenticatedEnvironment } from "~/services/apiAuth.server"; +import { ServiceValidationError } from "./baseService.server"; +import { FailDeploymentService } from "./failDeployment.server"; + +type TemplateCreationMode = "required" | "shadow" | "skip"; + +export class ComputeTemplateCreationService { + private client: ComputeClient | undefined; + + constructor() { + if (env.COMPUTE_GATEWAY_URL) { + this.client = new ComputeClient({ + gatewayUrl: env.COMPUTE_GATEWAY_URL, + authToken: env.COMPUTE_GATEWAY_AUTH_TOKEN, + timeoutMs: 5 * 60 * 1000, // 5 minutes + }); + } + } + + /** + * Handle template creation for a deployment. Call this before setting DEPLOYED. + * + * - Required mode: creates template synchronously, fails deployment on error + * - Shadow mode: fires background template creation (returns immediately) + * - Skip: no-op + * + * Throws ServiceValidationError if required mode fails (caller should stop finalize). + */ + async handleDeployTemplate(options: { + projectId: string; + imageReference: string; + deploymentFriendlyId: string; + authenticatedEnv: AuthenticatedEnvironment; + prisma: PrismaClientOrTransaction; + writer?: WritableStreamDefaultWriter; + }): Promise { + const mode = await this.resolveMode(options.projectId, options.prisma); + + if (mode === "skip") { + return; + } + + if (mode === "shadow") { + this.createTemplate(options.imageReference, { background: true }) + .then((result) => { + if (!result.success) { + logger.error("Shadow template creation failed", { + id: options.deploymentFriendlyId, + imageReference: options.imageReference, + error: result.error, + }); + } + }) + .catch((error) => { + logger.error("Shadow template creation threw unexpectedly", { + id: options.deploymentFriendlyId, + imageReference: options.imageReference, + error: error instanceof Error ? error.message : String(error), + }); + }); + return; + } + + // Required mode + if (options.writer) { + await options.writer.write( + `event: log\ndata: ${JSON.stringify({ message: "Building compute template..." })}\n\n` + ); + } + + logger.info("Creating compute template (required mode)", { + id: options.deploymentFriendlyId, + imageReference: options.imageReference, + }); + + const result = await this.createTemplate(options.imageReference); + + if (!result.success) { + logger.error("Compute template creation failed", { + id: options.deploymentFriendlyId, + imageReference: options.imageReference, + error: result.error, + }); + + const failService = new FailDeploymentService(); + await failService.call(options.authenticatedEnv, options.deploymentFriendlyId, { + error: { + name: "TemplateCreationFailed", + message: `Failed to create compute template: ${result.error}`, + }, + }); + + throw new ServiceValidationError( + `Compute template creation failed: ${result.error}` + ); + } + + logger.info("Compute template created", { + id: options.deploymentFriendlyId, + imageReference: options.imageReference, + }); + } + + async resolveMode( + projectId: string, + prisma: PrismaClientOrTransaction + ): Promise { + if (!this.client) { + return "skip"; + } + + const project = await prisma.project.findFirst({ + where: { id: projectId }, + select: { + defaultWorkerGroup: { + select: { workloadType: true }, + }, + organization: { + select: { featureFlags: true }, + }, + }, + }); + + if (project?.defaultWorkerGroup?.workloadType === "MICROVM") { + return "required"; + } + + const flag = makeFlag(prisma); + const hasComputeAccess = await flag({ + key: FEATURE_FLAG.hasComputeAccess, + defaultValue: false, + overrides: (project?.organization?.featureFlags as Record) ?? {}, + }); + + if (hasComputeAccess) { + return "required"; + } + + const rolloutPct = Number(env.COMPUTE_TEMPLATE_SHADOW_ROLLOUT_PCT ?? "0"); + if (rolloutPct > 0 && Math.random() * 100 < rolloutPct) { + return "shadow"; + } + + return "skip"; + } + + async createTemplate( + imageReference: string, + options?: { background?: boolean } + ): Promise<{ success: boolean; error?: string }> { + if (!this.client) { + return { success: false, error: "Compute gateway not configured" }; + } + + try { + // Templates are resource-agnostic - these values don't affect template content. + const machine = machinePresetFromName("small-1x"); + + await this.client.templates.create({ + image: stripImageDigest(imageReference), + cpu: machine.cpu, + memory_mb: machine.memory * 1024, + background: options?.background, + }); + return { success: true }; + } catch (error) { + const message = error instanceof Error ? error.message : "Unknown error"; + logger.error("Failed to create compute template", { + imageReference, + error: message, + }); + return { success: false, error: message }; + } + } +} diff --git a/apps/webapp/app/v3/services/finalizeDeploymentV2.server.ts b/apps/webapp/app/v3/services/finalizeDeploymentV2.server.ts index 2ad2b7b8258..7ca8a379a3d 100644 --- a/apps/webapp/app/v3/services/finalizeDeploymentV2.server.ts +++ b/apps/webapp/app/v3/services/finalizeDeploymentV2.server.ts @@ -15,6 +15,7 @@ import { remoteBuildsEnabled } from "../remoteImageBuilder.server"; import { getEcrAuthToken, isEcrRegistry } from "../getDeploymentImageRef.server"; import { tryCatch } from "@trigger.dev/core"; import { getRegistryConfig, type RegistryConfig } from "../registryConfig.server"; +import { ComputeTemplateCreationService } from "./computeTemplateCreation.server"; export class FinalizeDeploymentV2Service extends BaseService { public async call( @@ -23,12 +24,6 @@ export class FinalizeDeploymentV2Service extends BaseService { body: FinalizeDeploymentRequestBody, writer?: WritableStreamDefaultWriter ) { - // If remote builds are not enabled, lets just use the v1 finalize deployment service - if (!remoteBuildsEnabled()) { - const finalizeService = new FinalizeDeploymentService(); - return finalizeService.call(authenticatedEnv, id, body); - } - const deployment = await this._prisma.workerDeployment.findFirst({ where: { friendlyId: id, @@ -62,7 +57,6 @@ export class FinalizeDeploymentV2Service extends BaseService { if (deployment.status === "DEPLOYED") { logger.debug("Worker deployment is already deployed", { id }); - return deployment; } @@ -73,11 +67,16 @@ export class FinalizeDeploymentV2Service extends BaseService { const finalizeService = new FinalizeDeploymentService(); - if (body.skipPushToRegistry) { - logger.debug("Skipping push to registry during deployment finalization", { - deployment, - }); - return await finalizeService.call(authenticatedEnv, id, body); + // If remote builds are not enabled, skip image push and go straight to template + finalize + if (!remoteBuildsEnabled() || body.skipPushToRegistry) { + if (body.skipPushToRegistry) { + logger.debug("Skipping push to registry during deployment finalization", { + deployment, + }); + } + + await this.#createTemplateIfNeeded(deployment, id, authenticatedEnv, writer); + return finalizeService.call(authenticatedEnv, id, body); } const externalBuildData = deployment.externalBuildData @@ -143,9 +142,29 @@ export class FinalizeDeploymentV2Service extends BaseService { pushedImage: pushResult.image, }); - const finalizedDeployment = await finalizeService.call(authenticatedEnv, id, body); + await this.#createTemplateIfNeeded(deployment, id, authenticatedEnv, writer); + return finalizeService.call(authenticatedEnv, id, body); + } - return finalizedDeployment; + async #createTemplateIfNeeded( + deployment: { imageReference: string | null; worker: { project: { id: string } } | null }, + deploymentFriendlyId: string, + authenticatedEnv: AuthenticatedEnvironment, + writer?: WritableStreamDefaultWriter + ): Promise { + if (!deployment.imageReference || !deployment.worker) { + return; + } + + const templateService = new ComputeTemplateCreationService(); + await templateService.handleDeployTemplate({ + projectId: deployment.worker.project.id, + imageReference: deployment.imageReference, + deploymentFriendlyId, + authenticatedEnv, + prisma: this._prisma, + writer, + }); } } diff --git a/apps/webapp/package.json b/apps/webapp/package.json index e5c5fb472ba..15771c6ea68 100644 --- a/apps/webapp/package.json +++ b/apps/webapp/package.json @@ -57,6 +57,7 @@ "@heroicons/react": "^2.0.12", "@jsonhero/schema-infer": "^0.1.5", "@internal/cache": "workspace:*", + "@internal/compute": "workspace:*", "@internal/llm-model-catalog": "workspace:*", "@internal/redis": "workspace:*", "@internal/run-engine": "workspace:*", diff --git a/internal-packages/compute/package.json b/internal-packages/compute/package.json new file mode 100644 index 00000000000..4671565936c --- /dev/null +++ b/internal-packages/compute/package.json @@ -0,0 +1,14 @@ +{ + "name": "@internal/compute", + "private": true, + "version": "0.0.1", + "main": "./src/index.ts", + "types": "./src/index.ts", + "type": "module", + "dependencies": { + "zod": "3.23.8" + }, + "scripts": { + "typecheck": "tsc --noEmit" + } +} diff --git a/internal-packages/compute/src/client.ts b/internal-packages/compute/src/client.ts new file mode 100644 index 00000000000..4f627bd2830 --- /dev/null +++ b/internal-packages/compute/src/client.ts @@ -0,0 +1,151 @@ +import type { + TemplateCreateRequest, + InstanceCreateRequest, + InstanceCreateResponse, + InstanceSnapshotRequest, + SnapshotRestoreRequest, +} from "./types.js"; + +export type ComputeClientOptions = { + gatewayUrl: string; + authToken?: string; + timeoutMs: number; +}; + +export class ComputeClient { + readonly templates: TemplatesNamespace; + readonly instances: InstancesNamespace; + readonly snapshots: SnapshotsNamespace; + + constructor(private opts: ComputeClientOptions) { + const http = new HttpTransport(opts); + this.templates = new TemplatesNamespace(http); + this.instances = new InstancesNamespace(http); + this.snapshots = new SnapshotsNamespace(http); + } +} + +// ── HTTP transport (shared plumbing) ───────────────────────────────────────── + +type RequestOptions = { + signal?: AbortSignal; +}; + +class HttpTransport { + constructor(private opts: ComputeClientOptions) {} + + private get headers(): Record { + const h: Record = { "Content-Type": "application/json" }; + if (this.opts.authToken) { + h["Authorization"] = `Bearer ${this.opts.authToken}`; + } + return h; + } + + private signal(options?: RequestOptions): AbortSignal { + return options?.signal ?? AbortSignal.timeout(this.opts.timeoutMs); + } + + async post(path: string, body: unknown, options?: RequestOptions): Promise { + const url = `${this.opts.gatewayUrl}${path}`; + + const response = await fetch(url, { + method: "POST", + headers: this.headers, + body: JSON.stringify(body), + signal: this.signal(options), + }); + + if (!response.ok) { + const errorBody = await response.text().catch(() => "unknown error"); + throw new ComputeClientError(response.status, errorBody, url); + } + + // 202 Accepted or 204 No Content - no body to parse + if (response.status === 202 || response.status === 204) { + return undefined; + } + + return (await response.json()) as T; + } + + async delete(path: string, options?: RequestOptions): Promise { + const url = `${this.opts.gatewayUrl}${path}`; + + const response = await fetch(url, { + method: "DELETE", + headers: this.headers, + signal: this.signal(options), + }); + + if (!response.ok) { + const errorBody = await response.text().catch(() => "unknown error"); + throw new ComputeClientError(response.status, errorBody, url); + } + } +} + +// ── Error ──────────────────────────────────────────────────────────────────── + +export class ComputeClientError extends Error { + constructor( + public readonly status: number, + public readonly body: string, + public readonly url: string + ) { + super(`Compute gateway request failed (${status}): ${body}`); + this.name = "ComputeClientError"; + } +} + +// ── Namespaces ─────────────────────────────────────────────────────────────── + +class TemplatesNamespace { + constructor(private http: HttpTransport) {} + + async create( + req: TemplateCreateRequest, + options?: RequestOptions + ): Promise { + await this.http.post("/api/templates", req, options); + } +} + +class InstancesNamespace { + constructor(private http: HttpTransport) {} + + async create( + req: InstanceCreateRequest, + options?: RequestOptions + ): Promise { + const result = await this.http.post("/api/instances", req, options); + if (!result) { + throw new Error("Compute gateway returned no instance body"); + } + return result; + } + + async delete(runnerId: string, options?: RequestOptions): Promise { + return this.http.delete(`/api/instances/${runnerId}`, options); + } + + async snapshot( + runnerId: string, + req: InstanceSnapshotRequest, + options?: RequestOptions + ): Promise { + await this.http.post(`/api/instances/${runnerId}/snapshot`, req, options); + } +} + +class SnapshotsNamespace { + constructor(private http: HttpTransport) {} + + async restore( + snapshotId: string, + req: SnapshotRestoreRequest, + options?: RequestOptions + ): Promise { + await this.http.post(`/api/snapshots/${snapshotId}/restore`, req, options); + } +} diff --git a/internal-packages/compute/src/imageRef.ts b/internal-packages/compute/src/imageRef.ts new file mode 100644 index 00000000000..813f2a6a663 --- /dev/null +++ b/internal-packages/compute/src/imageRef.ts @@ -0,0 +1,11 @@ +/** + * Strip the digest suffix from a container image reference. + * Tags are immutable, so we resolve by tag rather than pinning to a digest. + * + * "ghcr.io/org/image:tag@sha256:abc..." -> "ghcr.io/org/image:tag" + * "ghcr.io/org/image@sha256:abc..." -> "ghcr.io/org/image" + * "ghcr.io/org/image:tag" -> "ghcr.io/org/image:tag" (unchanged) + */ +export function stripImageDigest(imageRef: string): string { + return imageRef.split("@")[0] ?? imageRef; +} diff --git a/internal-packages/compute/src/index.ts b/internal-packages/compute/src/index.ts new file mode 100644 index 00000000000..a8f3e8edb5c --- /dev/null +++ b/internal-packages/compute/src/index.ts @@ -0,0 +1,19 @@ +export { ComputeClient, ComputeClientError } from "./client.js"; +export type { ComputeClientOptions } from "./client.js"; +export { stripImageDigest } from "./imageRef.js"; +export { + TemplateCreateRequestSchema, + TemplateCallbackPayloadSchema, + InstanceCreateRequestSchema, + InstanceCreateResponseSchema, + InstanceSnapshotRequestSchema, + SnapshotRestoreRequestSchema, +} from "./types.js"; +export type { + TemplateCreateRequest, + TemplateCallbackPayload, + InstanceCreateRequest, + InstanceCreateResponse, + InstanceSnapshotRequest, + SnapshotRestoreRequest, +} from "./types.js"; diff --git a/internal-packages/compute/src/types.ts b/internal-packages/compute/src/types.ts new file mode 100644 index 00000000000..6f97ad9847e --- /dev/null +++ b/internal-packages/compute/src/types.ts @@ -0,0 +1,63 @@ +import { z } from "zod"; + +// ── Templates ──────────────────────────────────────────────────────────────── + +export const TemplateCreateRequestSchema = z.object({ + image: z.string(), + cpu: z.number(), + memory_mb: z.number(), + background: z.boolean().optional(), + callback: z + .object({ + url: z.string(), + metadata: z.record(z.string()).optional(), + }) + .optional(), +}); +export type TemplateCreateRequest = z.infer; + +export const TemplateCallbackPayloadSchema = z.object({ + template_id: z.string().optional(), + image: z.string(), + status: z.enum(["completed", "failed"]), + error: z.string().optional(), + metadata: z.record(z.string()).optional(), + duration_ms: z.number().optional(), +}); +export type TemplateCallbackPayload = z.infer; + +// ── Instances ──────────────────────────────────────────────────────────────── + +export const InstanceCreateRequestSchema = z.object({ + name: z.string(), + image: z.string(), + env: z.record(z.string()), + cpu: z.number(), + memory_gb: z.number(), + metadata: z.record(z.unknown()).optional(), +}); +export type InstanceCreateRequest = z.infer; + +export const InstanceCreateResponseSchema = z.object({ + id: z.string(), + _timing: z.unknown().optional(), +}); +export type InstanceCreateResponse = z.infer; + +export const InstanceSnapshotRequestSchema = z.object({ + callback: z.object({ + url: z.string(), + metadata: z.record(z.string()), + }), +}); +export type InstanceSnapshotRequest = z.infer; + +// ── Snapshots ──────────────────────────────────────────────────────────────── + +export const SnapshotRestoreRequestSchema = z.object({ + name: z.string(), + metadata: z.record(z.string()), + cpu: z.number(), + memory_mb: z.number(), +}); +export type SnapshotRestoreRequest = z.infer; diff --git a/internal-packages/compute/tsconfig.json b/internal-packages/compute/tsconfig.json new file mode 100644 index 00000000000..ec9998c5e00 --- /dev/null +++ b/internal-packages/compute/tsconfig.json @@ -0,0 +1,18 @@ +{ + "compilerOptions": { + "target": "ES2019", + "lib": ["ES2019", "DOM", "DOM.Iterable", "DOM.AsyncIterable"], + "module": "Node16", + "moduleResolution": "Node16", + "moduleDetection": "force", + "verbatimModuleSyntax": false, + "esModuleInterop": true, + "forceConsistentCasingInFileNames": true, + "isolatedModules": true, + "preserveWatchOutput": true, + "skipLibCheck": true, + "noEmit": true, + "strict": true + }, + "exclude": ["node_modules"] +} diff --git a/internal-packages/database/prisma/migrations/20260326150000_add_workload_type_to_worker_instance_group/migration.sql b/internal-packages/database/prisma/migrations/20260326150000_add_workload_type_to_worker_instance_group/migration.sql new file mode 100644 index 00000000000..4865ae070e3 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260326150000_add_workload_type_to_worker_instance_group/migration.sql @@ -0,0 +1,5 @@ +-- CreateEnum +CREATE TYPE "WorkloadType" AS ENUM ('CONTAINER', 'MICROVM'); + +-- AlterTable +ALTER TABLE "WorkerInstanceGroup" ADD COLUMN "workloadType" "WorkloadType" NOT NULL DEFAULT 'CONTAINER'; diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index b60dcd7c9b0..2bcf00c43f3 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -1289,6 +1289,11 @@ enum WorkerInstanceGroupType { UNMANAGED } +enum WorkloadType { + CONTAINER + MICROVM +} + model WorkerInstanceGroup { id String @id @default(cuid()) type WorkerInstanceGroupType @@ -1323,6 +1328,8 @@ model WorkerInstanceGroup { location String? staticIPs String? + workloadType WorkloadType @default(CONTAINER) + createdAt DateTime @default(now()) updatedAt DateTime @updatedAt } diff --git a/packages/cli-v3/src/deploy/buildImage.ts b/packages/cli-v3/src/deploy/buildImage.ts index 2225d7db056..31a2b658545 100644 --- a/packages/cli-v3/src/deploy/buildImage.ts +++ b/packages/cli-v3/src/deploy/buildImage.ts @@ -205,6 +205,7 @@ async function remoteBuildImage(options: DepotBuildImageOptions): Promise { await testConsumers[0].onDequeue(messages); } - expect(mockOnDequeue).toHaveBeenCalledWith(messages); + expect(mockOnDequeue).toHaveBeenCalledWith(messages, undefined); advanceTimeAndProcessMetrics(1100); const metrics = pool.getMetrics(); diff --git a/packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts b/packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts index 2dd3d1b898b..d72cef75c7c 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts @@ -351,12 +351,12 @@ export class RunQueueConsumerPool { const consumer = this.consumerFactory({ ...this.consumerOptions, - onDequeue: async (messages) => { + onDequeue: async (messages, timing) => { // Always update queue length, default to 0 for empty dequeues or missing value this.updateQueueLength(messages[0]?.workerQueueLength ?? 0); // Forward to the original handler - await this.consumerOptions.onDequeue(messages); + await this.consumerOptions.onDequeue(messages, timing); }, }); diff --git a/packages/core/src/v3/runEngineWorker/supervisor/events.ts b/packages/core/src/v3/runEngineWorker/supervisor/events.ts index a51c504a3e6..df4a93686a9 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/events.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/events.ts @@ -6,6 +6,8 @@ export type WorkerEvents = { { time: Date; message: DequeuedMessage; + dequeueResponseMs?: number; + pollingIntervalMs?: number; }, ]; requestRunAttemptStart: [ diff --git a/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts b/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts index 4379eb54f37..76faee40809 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts @@ -15,7 +15,7 @@ export type RunQueueConsumerOptions = { preDequeue?: PreDequeueFn; preSkip?: PreSkipFn; maxRunCount?: number; - onDequeue: (messages: WorkerApiDequeueResponseBody) => Promise; + onDequeue: (messages: WorkerApiDequeueResponseBody, timing?: { dequeueResponseMs: number; pollingIntervalMs: number }) => Promise; }; export class RunQueueConsumer implements QueueConsumer { @@ -23,13 +23,14 @@ export class RunQueueConsumer implements QueueConsumer { private readonly preDequeue?: PreDequeueFn; private readonly preSkip?: PreSkipFn; private readonly maxRunCount?: number; - private readonly onDequeue: (messages: WorkerApiDequeueResponseBody) => Promise; + private readonly onDequeue: (messages: WorkerApiDequeueResponseBody, timing?: { dequeueResponseMs: number; pollingIntervalMs: number }) => Promise; private readonly logger = new SimpleStructuredLogger("queue-consumer"); private intervalMs: number; private idleIntervalMs: number; private isEnabled: boolean; + private lastScheduledIntervalMs: number; constructor(opts: RunQueueConsumerOptions) { this.isEnabled = false; @@ -38,6 +39,7 @@ export class RunQueueConsumer implements QueueConsumer { this.preDequeue = opts.preDequeue; this.preSkip = opts.preSkip; this.maxRunCount = opts.maxRunCount; + this.lastScheduledIntervalMs = opts.idleIntervalMs; this.onDequeue = opts.onDequeue; this.client = opts.client; } @@ -111,16 +113,18 @@ export class RunQueueConsumer implements QueueConsumer { let nextIntervalMs = this.idleIntervalMs; try { + const dequeueStart = performance.now(); const response = await this.client.dequeue({ maxResources: preDequeueResult?.maxResources, maxRunCount: this.maxRunCount, }); + const dequeueResponseMs = Math.round(performance.now() - dequeueStart); if (!response.success) { this.logger.error("Failed to dequeue", { error: response.error }); } else { try { - await this.onDequeue(response.data); + await this.onDequeue(response.data, { dequeueResponseMs, pollingIntervalMs: this.lastScheduledIntervalMs }); if (response.data.length > 0) { nextIntervalMs = this.intervalMs; @@ -141,6 +145,7 @@ export class RunQueueConsumer implements QueueConsumer { this.logger.verbose("scheduled dequeue with idle interval", { delayMs }); } + this.lastScheduledIntervalMs = delayMs; setTimeout(this.dequeue.bind(this), delayMs); } } diff --git a/packages/core/src/v3/runEngineWorker/supervisor/session.ts b/packages/core/src/v3/runEngineWorker/supervisor/session.ts index e5a783b8d41..b2d344fb3dc 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/session.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/session.ts @@ -80,13 +80,15 @@ export class SupervisorSession extends EventEmitter { }); } - private async onDequeue(messages: WorkerApiDequeueResponseBody): Promise { + private async onDequeue(messages: WorkerApiDequeueResponseBody, timing?: { dequeueResponseMs: number; pollingIntervalMs: number }): Promise { this.logger.verbose("Dequeued messages with contents", { count: messages.length, messages }); for (const message of messages) { this.emit("runQueueMessage", { time: new Date(), message, + dequeueResponseMs: timing?.dequeueResponseMs, + pollingIntervalMs: timing?.pollingIntervalMs, }); } } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 192a5747f2a..1f2b01e5065 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -190,6 +190,9 @@ importers: '@aws-sdk/client-ecr': specifier: ^3.839.0 version: 3.839.0 + '@internal/compute': + specifier: workspace:* + version: link:../../internal-packages/compute '@kubernetes/client-node': specifier: ^1.0.0 version: 1.0.0(patch_hash=ba1a06f46256cdb8d6faf7167246692c0de2e7cd846a9dc0f13be0137e1c3745)(bufferutil@4.0.9)(encoding@0.1.13) @@ -305,6 +308,9 @@ importers: '@internal/cache': specifier: workspace:* version: link:../../internal-packages/cache + '@internal/compute': + specifier: workspace:* + version: link:../../internal-packages/compute '@internal/llm-model-catalog': specifier: workspace:* version: link:../../internal-packages/llm-model-catalog @@ -1078,6 +1084,12 @@ importers: specifier: 6.0.1 version: 6.0.1 + internal-packages/compute: + dependencies: + zod: + specifier: 3.23.8 + version: 3.23.8 + internal-packages/database: dependencies: '@prisma/client':