From cba8109b80cbad79fbfdd7d4836fea21f8b35a62 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Tue, 24 Mar 2026 03:34:34 -0700 Subject: [PATCH 1/3] fix(mothership): async resume and tool result ordering --- .../lib/copilot/orchestrator/index.test.ts | 50 +++- apps/sim/lib/copilot/orchestrator/index.ts | 248 +++++++++++------- .../sse/handlers/handlers.test.ts | 40 +++ .../orchestrator/sse/handlers/handlers.ts | 5 + .../sse/handlers/tool-execution.ts | 52 +++- .../copilot/orchestrator/sse/utils.test.ts | 2 + .../sim/lib/copilot/orchestrator/sse/utils.ts | 4 +- 7 files changed, 281 insertions(+), 120 deletions(-) diff --git a/apps/sim/lib/copilot/orchestrator/index.test.ts b/apps/sim/lib/copilot/orchestrator/index.test.ts index 9a39b88220f..41ab6d51242 100644 --- a/apps/sim/lib/copilot/orchestrator/index.test.ts +++ b/apps/sim/lib/copilot/orchestrator/index.test.ts @@ -279,16 +279,7 @@ describe('orchestrateCopilotStream async continuation', () => { expect(markAsyncToolDelivered).not.toHaveBeenCalled() }) - it('does not send a partial resume payload when only some pending tool calls are claimable', async () => { - claimCompletedAsyncToolCall - .mockResolvedValueOnce({ toolCallId: 'tool-1' }) - .mockResolvedValueOnce(null) - .mockResolvedValueOnce({ toolCallId: 'tool-1' }) - .mockResolvedValueOnce(null) - .mockResolvedValueOnce({ toolCallId: 'tool-1' }) - .mockResolvedValueOnce(null) - .mockResolvedValueOnce({ toolCallId: 'tool-1' }) - .mockResolvedValueOnce(null) + it('fails explicitly when async continuation cannot be resumed', async () => { getAsyncToolCall.mockResolvedValue(null) runStreamLoop.mockImplementationOnce(async (_url: string, _opts: RequestInit, context: any) => { @@ -310,9 +301,44 @@ describe('orchestrateCopilotStream async continuation', () => { } ) - expect(result.success).toBe(true) + expect(result.success).toBe(false) + expect(runStreamLoop).toHaveBeenCalledTimes(1) + expect(result.error).toContain('Failed to resume async tool continuation') + expect(markAsyncToolDelivered).not.toHaveBeenCalled() + }) + + it('fails explicitly when a sim-handled tool has no durable async row', async () => { + claimCompletedAsyncToolCall.mockResolvedValue(null) + getAsyncToolCall.mockResolvedValue(null) + + runStreamLoop.mockImplementationOnce(async (_url: string, _opts: RequestInit, context: any) => { + context.awaitingAsyncContinuation = { + checkpointId: 'checkpoint-1', + runId: 'run-1', + pendingToolCallIds: ['tool-1'], + } + context.toolCalls.set('tool-1', { + id: 'tool-1', + name: 'read', + status: 'success', + result: { success: true, output: { ok: true } }, + }) + }) + + const result = await orchestrateCopilotStream( + { message: 'hello' }, + { + userId: 'user-1', + workflowId: 'workflow-1', + chatId: 'chat-1', + executionId: 'exec-1', + runId: 'run-1', + } + ) + + expect(result.success).toBe(false) + expect(result.error).toContain('Failed to resume async tool continuation') expect(runStreamLoop).toHaveBeenCalledTimes(1) - expect(releaseCompletedAsyncToolClaim).toHaveBeenCalledWith('tool-1', 'run-1') expect(markAsyncToolDelivered).not.toHaveBeenCalled() }) }) diff --git a/apps/sim/lib/copilot/orchestrator/index.ts b/apps/sim/lib/copilot/orchestrator/index.ts index 961f97efe68..056d296585a 100644 --- a/apps/sim/lib/copilot/orchestrator/index.ts +++ b/apps/sim/lib/copilot/orchestrator/index.ts @@ -14,12 +14,16 @@ import { updateRunStatus, } from '@/lib/copilot/async-runs/repository' import { SIM_AGENT_API_URL, SIM_AGENT_VERSION } from '@/lib/copilot/constants' -import { prepareExecutionContext } from '@/lib/copilot/orchestrator/tool-executor' +import { + isToolAvailableOnSimSide, + prepareExecutionContext, +} from '@/lib/copilot/orchestrator/tool-executor' import type { ExecutionContext, OrchestratorOptions, OrchestratorResult, SSEEvent, + ToolCallState, } from '@/lib/copilot/orchestrator/types' import { env } from '@/lib/core/config/env' import { getEffectiveDecryptedEnv } from '@/lib/environment/utils' @@ -31,18 +35,9 @@ function didAsyncToolSucceed(input: { durableStatus?: string | null durableResult?: Record durableError?: string | null - completion?: { status: string } | undefined - toolStateSuccess?: boolean | undefined toolStateStatus?: string | undefined }) { - const { - durableStatus, - durableResult, - durableError, - completion, - toolStateSuccess, - toolStateStatus, - } = input + const { durableStatus, durableResult, durableError, toolStateStatus } = input if (durableStatus === ASYNC_TOOL_STATUS.completed) { return true @@ -61,7 +56,25 @@ function didAsyncToolSucceed(input: { if (toolStateStatus === 'success') return true if (toolStateStatus === 'error' || toolStateStatus === 'cancelled') return false - return completion?.status === 'success' || toolStateSuccess === true + return false +} + +function isTerminalToolState(status?: ToolCallState['status']): boolean { + return Boolean( + status === 'success' || + status === 'error' || + status === 'cancelled' || + status === 'skipped' || + status === 'rejected' + ) +} + +interface ReadyContinuationTool { + toolCallId: string + toolState?: ToolCallState + durableRow?: Awaited> + needsDurableClaim: boolean + alreadyClaimedByWorker: boolean } export interface OrchestrateStreamOptions extends OrchestratorOptions { @@ -190,33 +203,21 @@ export async function orchestrateCopilotStream( if (!continuation) break let resumeReady = false - let emptyClaimRetries = 0 + let resumeRetries = 0 for (;;) { claimedToolCallIds = [] + claimedByWorkerId = null const resumeWorkerId = continuation.runId || context.runId || context.messageId - claimedByWorkerId = resumeWorkerId - const claimableToolCallIds: string[] = [] + const readyTools: ReadyContinuationTool[] = [] const localPendingPromises: Promise[] = [] + const missingToolCallIds: string[] = [] + for (const toolCallId of continuation.pendingToolCallIds) { - const claimed = await claimCompletedAsyncToolCall(toolCallId, resumeWorkerId).catch( - () => null - ) - if (claimed) { - claimableToolCallIds.push(toolCallId) - claimedToolCallIds.push(toolCallId) - continue - } const durableRow = await getAsyncToolCall(toolCallId).catch(() => null) const localPendingPromise = context.pendingToolPromises.get(toolCallId) - if (!durableRow && localPendingPromise) { - claimableToolCallIds.push(toolCallId) - continue - } - if ( - durableRow && - durableRow.status === ASYNC_TOOL_STATUS.running && - localPendingPromise - ) { + const toolState = context.toolCalls.get(toolCallId) + + if (localPendingPromise) { localPendingPromises.push(localPendingPromise) logger.info('Waiting for local async tool completion before retrying resume claim', { toolCallId, @@ -224,21 +225,55 @@ export async function orchestrateCopilotStream( }) continue } - const toolState = context.toolCalls.get(toolCallId) - if (!durableRow && !localPendingPromise && toolState) { + + if (durableRow && isTerminalAsyncStatus(durableRow.status)) { + if (durableRow.claimedBy && durableRow.claimedBy !== resumeWorkerId) { + missingToolCallIds.push(toolCallId) + logger.warn('Async tool continuation is waiting on a claim held by another worker', { + toolCallId, + runId: continuation.runId, + claimedBy: durableRow.claimedBy, + }) + continue + } + readyTools.push({ + toolCallId, + toolState, + durableRow, + needsDurableClaim: durableRow.claimedBy !== resumeWorkerId, + alreadyClaimedByWorker: durableRow.claimedBy === resumeWorkerId, + }) + continue + } + + if ( + !durableRow && + toolState && + isTerminalToolState(toolState.status) && + !isToolAvailableOnSimSide(toolState.name) + ) { logger.info('Including Go-handled tool in resume payload (no Sim-side row)', { toolCallId, toolName: toolState.name, status: toolState.status, runId: continuation.runId, }) - claimableToolCallIds.push(toolCallId) + readyTools.push({ + toolCallId, + toolState, + needsDurableClaim: false, + alreadyClaimedByWorker: false, + }) continue } + logger.warn('Skipping already-claimed or missing async tool resume', { toolCallId, runId: continuation.runId, + durableStatus: durableRow?.status, + toolStateStatus: toolState?.status, }) + missingToolCallIds.push(toolCallId) } if (localPendingPromises.length > 0) { @@ -246,83 +281,104 @@ export async function orchestrateCopilotStream( continue } - const missingToolCallIds = continuation.pendingToolCallIds.filter( - (toolCallId) => !claimableToolCallIds.includes(toolCallId) - ) if (missingToolCallIds.length > 0) { - if (claimedToolCallIds.length > 0 && claimedByWorkerId) { - logger.info('Releasing partial async tool claims before retrying resume', { + if (resumeRetries < 3) { + resumeRetries++ + logger.info('Retrying async resume after some tool calls were not yet ready', { checkpointId: continuation.checkpointId, runId: continuation.runId, - claimedToolCallIds, + retry: resumeRetries, missingToolCallIds, }) - await Promise.all( - claimedToolCallIds.map((toolCallId) => - releaseCompletedAsyncToolClaim(toolCallId, claimedByWorkerId!).catch(() => null) - ) - ) - claimedToolCallIds = [] - claimedByWorkerId = null + await new Promise((resolve) => setTimeout(resolve, 250 * resumeRetries)) + continue } - if (emptyClaimRetries < 3) { - emptyClaimRetries++ - logger.info( - 'Retrying async resume claim after only a subset of tool calls were claimable', - { - checkpointId: continuation.checkpointId, - runId: continuation.runId, - retry: emptyClaimRetries, - missingToolCallIds, - } - ) - await new Promise((resolve) => setTimeout(resolve, 250 * emptyClaimRetries)) + throw new Error( + `Failed to resume async tool continuation: pending tool calls were not ready (${missingToolCallIds.join(', ')})` + ) + } + + if (readyTools.length === 0) { + if (resumeRetries < 3 && continuation.pendingToolCallIds.length > 0) { + resumeRetries++ + logger.info('Retrying async resume because no tool calls were ready yet', { + checkpointId: continuation.checkpointId, + runId: continuation.runId, + retry: resumeRetries, + }) + await new Promise((resolve) => setTimeout(resolve, 250 * resumeRetries)) continue } - logger.warn('Skipping async resume because not all tool calls were claimable', { - checkpointId: continuation.checkpointId, - runId: continuation.runId, - claimableToolCallIds, - missingToolCallIds, - }) - context.awaitingAsyncContinuation = undefined - break + throw new Error('Failed to resume async tool continuation: no tool calls were ready') } - if (claimableToolCallIds.length === 0) { - if (emptyClaimRetries < 3 && continuation.pendingToolCallIds.length > 0) { - emptyClaimRetries++ - logger.info('Retrying async resume claim after no tool calls were claimable', { + const claimCandidates = readyTools.filter((tool) => tool.needsDurableClaim) + const newlyClaimedToolCallIds: string[] = [] + const claimFailures: string[] = [] + + for (const tool of claimCandidates) { + const claimed = await claimCompletedAsyncToolCall(tool.toolCallId, resumeWorkerId).catch( + () => null + ) + if (!claimed) { + claimFailures.push(tool.toolCallId) + continue + } + newlyClaimedToolCallIds.push(tool.toolCallId) + } + + if (claimFailures.length > 0) { + if (newlyClaimedToolCallIds.length > 0) { + logger.info('Releasing async tool claims after claim contention during resume', { checkpointId: continuation.checkpointId, runId: continuation.runId, - retry: emptyClaimRetries, + newlyClaimedToolCallIds, + claimFailures, }) - await new Promise((resolve) => setTimeout(resolve, 250 * emptyClaimRetries)) + await Promise.all( + newlyClaimedToolCallIds.map((toolCallId) => + releaseCompletedAsyncToolClaim(toolCallId, resumeWorkerId).catch(() => null) + ) + ) + } + if (resumeRetries < 3) { + resumeRetries++ + logger.info('Retrying async resume after claim contention', { + checkpointId: continuation.checkpointId, + runId: continuation.runId, + retry: resumeRetries, + claimFailures, + }) + await new Promise((resolve) => setTimeout(resolve, 250 * resumeRetries)) continue } - logger.warn('Skipping async resume because no tool calls were claimable', { - checkpointId: continuation.checkpointId, - runId: continuation.runId, - }) - context.awaitingAsyncContinuation = undefined - break + throw new Error( + `Failed to resume async tool continuation: unable to claim tool calls (${claimFailures.join(', ')})` + ) } + claimedToolCallIds = [ + ...readyTools + .filter((tool) => tool.alreadyClaimedByWorker) + .map((tool) => tool.toolCallId), + ...newlyClaimedToolCallIds, + ] + claimedByWorkerId = claimedToolCallIds.length > 0 ? resumeWorkerId : null + logger.info('Resuming async tool continuation', { checkpointId: continuation.checkpointId, runId: continuation.runId, - toolCallIds: claimableToolCallIds, + toolCallIds: readyTools.map((tool) => tool.toolCallId), }) - const durableRows = await getAsyncToolCalls(claimableToolCallIds).catch(() => []) + const durableRows = await getAsyncToolCalls( + readyTools.map((tool) => tool.toolCallId) + ).catch(() => []) const durableByToolCallId = new Map(durableRows.map((row) => [row.toolCallId, row])) const results = await Promise.all( - claimableToolCallIds.map(async (toolCallId) => { - const completion = await context.pendingToolPromises.get(toolCallId) - const toolState = context.toolCalls.get(toolCallId) - - const durable = durableByToolCallId.get(toolCallId) + readyTools.map(async (tool) => { + const durable = durableByToolCallId.get(tool.toolCallId) || tool.durableRow const durableStatus = durable?.status const durableResult = durable?.result && typeof durable.result === 'object' @@ -332,19 +388,15 @@ export async function orchestrateCopilotStream( durableStatus, durableResult, durableError: durable?.error, - completion, - toolStateSuccess: toolState?.result?.success, - toolStateStatus: toolState?.status, + toolStateStatus: tool.toolState?.status, }) const data = durableResult || - completion?.data || - (toolState?.result?.output as Record | undefined) || + (tool.toolState?.result?.output as Record | undefined) || (success - ? { message: completion?.message || 'Tool completed' } + ? { message: 'Tool completed' } : { - error: - completion?.message || durable?.error || toolState?.error || 'Tool failed', + error: durable?.error || tool.toolState?.error || 'Tool failed', }) if ( @@ -353,14 +405,14 @@ export async function orchestrateCopilotStream( !isDeliveredAsyncStatus(durableStatus) ) { logger.warn('Async tool row was claimed for resume without terminal durable state', { - toolCallId, + toolCallId: tool.toolCallId, status: durableStatus, }) } return { - callId: toolCallId, - name: durable?.toolName || toolState?.name || '', + callId: tool.toolCallId, + name: durable?.toolName || tool.toolState?.name || '', data, success, } diff --git a/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.test.ts b/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.test.ts index 79fad88e81b..0747c2a67bf 100644 --- a/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.test.ts +++ b/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.test.ts @@ -209,4 +209,44 @@ describe('sse-handlers tool lifecycle', () => { expect(markToolComplete).toHaveBeenCalledTimes(1) expect(context.toolCalls.get('tool-upsert-fail')?.status).toBe('success') }) + + it('does not execute a tool if a terminal tool_result arrives before local execution starts', async () => { + let resolveUpsert: ((value: null) => void) | undefined + upsertAsyncToolCall.mockImplementationOnce( + () => + new Promise((resolve) => { + resolveUpsert = resolve + }) + ) + const onEvent = vi.fn() + + await sseHandlers.tool_call( + { + type: 'tool_call', + data: { id: 'tool-race', name: 'read', arguments: { workflowId: 'workflow-1' } }, + } as any, + context, + execContext, + { onEvent, interactive: false, timeout: 1000 } + ) + + await sseHandlers.tool_result( + { + type: 'tool_result', + toolCallId: 'tool-race', + data: { id: 'tool-race', success: true, result: { ok: true } }, + } as any, + context, + execContext, + { onEvent, interactive: false, timeout: 1000 } + ) + + resolveUpsert?.(null) + await new Promise((resolve) => setTimeout(resolve, 0)) + + expect(executeToolServerSide).not.toHaveBeenCalled() + expect(markToolComplete).not.toHaveBeenCalled() + expect(context.toolCalls.get('tool-race')?.status).toBe('success') + expect(context.toolCalls.get('tool-race')?.result?.output).toEqual({ ok: true }) + }) }) diff --git a/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.ts b/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.ts index 6466b66174a..506e2be2c30 100644 --- a/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.ts +++ b/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.ts @@ -263,6 +263,7 @@ export const sseHandlers: Record = { const resultObj = asRecord(data?.result) current.error = (data?.error || resultObj.error) as string | undefined } + markToolResultSeen(toolCallId) }, tool_error: (event, context) => { const data = getEventData(event) @@ -273,6 +274,7 @@ export const sseHandlers: Record = { current.status = 'error' current.error = (data?.error as string | undefined) || 'Tool execution failed' current.endTime = Date.now() + markToolResultSeen(toolCallId) }, tool_call_delta: () => { // Argument streaming delta — no action needed on orchestrator side @@ -719,6 +721,9 @@ export const subAgentHandlers: Record = { mainToolCall.error = (data?.error || resultObj.error) as string | undefined } } + if (subAgentToolCall || mainToolCall) { + markToolResultSeen(toolCallId) + } }, } diff --git a/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts b/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts index 4d5196d9611..c299e0d9c06 100644 --- a/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts +++ b/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts @@ -4,11 +4,7 @@ import { createLogger } from '@sim/logger' import { eq } from 'drizzle-orm' import { completeAsyncToolCall, markAsyncToolRunning } from '@/lib/copilot/async-runs/repository' import { waitForToolConfirmation } from '@/lib/copilot/orchestrator/persistence' -import { - asRecord, - markToolResultSeen, - wasToolResultSeen, -} from '@/lib/copilot/orchestrator/sse/utils' +import { asRecord, markToolResultSeen } from '@/lib/copilot/orchestrator/sse/utils' import { executeToolServerSide, markToolComplete } from '@/lib/copilot/orchestrator/tool-executor' import type { ExecutionContext, @@ -247,6 +243,45 @@ function cancelledCompletion(message: string): AsyncToolCompletion { } } +function isTerminalToolCallStatus(status: string): boolean { + return ( + status === 'success' || + status === 'error' || + status === 'cancelled' || + status === 'skipped' || + status === 'rejected' + ) +} + +function terminalCompletionFromToolCall(toolCall: { + status: string + error?: string + result?: { output?: unknown; error?: string } +}): AsyncToolCompletion { + if (toolCall.status === 'cancelled') { + return cancelledCompletion(toolCall.error || 'Tool execution cancelled') + } + + if (toolCall.status === 'success' || toolCall.status === 'skipped') { + return { + status: 'success', + message: toolCall.error || 'Tool completed', + data: + toolCall.result?.output && + typeof toolCall.result.output === 'object' && + !Array.isArray(toolCall.result.output) + ? (toolCall.result.output as Record) + : undefined, + } + } + + return { + status: toolCall.status === 'rejected' ? 'rejected' : 'error', + message: toolCall.error || toolCall.result?.error || 'Tool failed', + data: { error: toolCall.error || toolCall.result?.error || 'Tool failed' }, + } +} + function reportCancelledTool( toolCall: { id: string; name: string }, message: string, @@ -509,8 +544,8 @@ export async function executeToolAndReport( if (toolCall.status === 'executing') { return { status: 'running', message: 'Tool already executing' } } - if (wasToolResultSeen(toolCall.id)) { - return { status: 'success', message: 'Tool result already processed' } + if (toolCall.endTime || isTerminalToolCallStatus(toolCall.status)) { + return terminalCompletionFromToolCall(toolCall) } if (abortRequested(context, execContext, options)) { @@ -538,6 +573,9 @@ export async function executeToolAndReport( try { let result = await executeToolServerSide(toolCall, execContext) + if (toolCall.endTime || isTerminalToolCallStatus(toolCall.status)) { + return terminalCompletionFromToolCall(toolCall) + } if (abortRequested(context, execContext, options)) { toolCall.status = 'cancelled' toolCall.endTime = Date.now() diff --git a/apps/sim/lib/copilot/orchestrator/sse/utils.test.ts b/apps/sim/lib/copilot/orchestrator/sse/utils.test.ts index 511cf45d658..56eaad33789 100644 --- a/apps/sim/lib/copilot/orchestrator/sse/utils.test.ts +++ b/apps/sim/lib/copilot/orchestrator/sse/utils.test.ts @@ -3,6 +3,7 @@ */ import { describe, expect, it } from 'vitest' import { + markToolResultSeen, normalizeSseEvent, shouldSkipToolCallEvent, shouldSkipToolResultEvent, @@ -37,6 +38,7 @@ describe('sse-utils', () => { it.concurrent('dedupes tool_result events', () => { const event = { type: 'tool_result', data: { id: 'tool_result_1', name: 'plan' } } expect(shouldSkipToolResultEvent(event as any)).toBe(false) + markToolResultSeen('tool_result_1') expect(shouldSkipToolResultEvent(event as any)).toBe(true) }) }) diff --git a/apps/sim/lib/copilot/orchestrator/sse/utils.ts b/apps/sim/lib/copilot/orchestrator/sse/utils.ts index 2b164794d00..3619012d9f9 100644 --- a/apps/sim/lib/copilot/orchestrator/sse/utils.ts +++ b/apps/sim/lib/copilot/orchestrator/sse/utils.ts @@ -125,7 +125,5 @@ export function shouldSkipToolResultEvent(event: SSEEvent): boolean { if (event.type !== 'tool_result') return false const toolCallId = getToolCallIdFromEvent(event) if (!toolCallId) return false - if (wasToolResultSeen(toolCallId)) return true - markToolResultSeen(toolCallId) - return false + return wasToolResultSeen(toolCallId) } From 39bc691d6c9aed0237626194676e204650ed2b88 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Tue, 24 Mar 2026 03:35:23 -0700 Subject: [PATCH 2/3] ensure tool call terminal state --- .../lib/copilot/orchestrator/index.test.ts | 4 ++ .../sse/handlers/handlers.test.ts | 32 ++++++++++++ .../orchestrator/sse/handlers/handlers.ts | 49 +++++++++++++++++-- 3 files changed, 80 insertions(+), 5 deletions(-) diff --git a/apps/sim/lib/copilot/orchestrator/index.test.ts b/apps/sim/lib/copilot/orchestrator/index.test.ts index 41ab6d51242..3e8f71287f2 100644 --- a/apps/sim/lib/copilot/orchestrator/index.test.ts +++ b/apps/sim/lib/copilot/orchestrator/index.test.ts @@ -7,6 +7,7 @@ import type { OrchestratorOptions } from './types' const { prepareExecutionContext, + isToolAvailableOnSimSide, getEffectiveDecryptedEnv, runStreamLoop, claimCompletedAsyncToolCall, @@ -17,6 +18,7 @@ const { updateRunStatus, } = vi.hoisted(() => ({ prepareExecutionContext: vi.fn(), + isToolAvailableOnSimSide: vi.fn().mockReturnValue(true), getEffectiveDecryptedEnv: vi.fn(), runStreamLoop: vi.fn(), claimCompletedAsyncToolCall: vi.fn(), @@ -29,6 +31,7 @@ const { vi.mock('@/lib/copilot/orchestrator/tool-executor', () => ({ prepareExecutionContext, + isToolAvailableOnSimSide, })) vi.mock('@/lib/environment/utils', () => ({ @@ -58,6 +61,7 @@ import { orchestrateCopilotStream } from './index' describe('orchestrateCopilotStream async continuation', () => { beforeEach(() => { vi.clearAllMocks() + isToolAvailableOnSimSide.mockReturnValue(true) prepareExecutionContext.mockResolvedValue({ userId: 'user-1', workflowId: 'workflow-1', diff --git a/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.test.ts b/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.test.ts index 0747c2a67bf..0b4001244d6 100644 --- a/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.test.ts +++ b/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.test.ts @@ -249,4 +249,36 @@ describe('sse-handlers tool lifecycle', () => { expect(context.toolCalls.get('tool-race')?.status).toBe('success') expect(context.toolCalls.get('tool-race')?.result?.output).toEqual({ ok: true }) }) + + it('does not execute a tool if a tool_result arrives before the tool_call event', async () => { + const onEvent = vi.fn() + + await sseHandlers.tool_result( + { + type: 'tool_result', + toolCallId: 'tool-early-result', + toolName: 'read', + data: { id: 'tool-early-result', name: 'read', success: true, result: { ok: true } }, + } as any, + context, + execContext, + { onEvent, interactive: false, timeout: 1000 } + ) + + await sseHandlers.tool_call( + { + type: 'tool_call', + data: { id: 'tool-early-result', name: 'read', arguments: { workflowId: 'workflow-1' } }, + } as any, + context, + execContext, + { onEvent, interactive: false, timeout: 1000 } + ) + + await new Promise((resolve) => setTimeout(resolve, 0)) + + expect(executeToolServerSide).not.toHaveBeenCalled() + expect(markToolComplete).not.toHaveBeenCalled() + expect(context.toolCalls.get('tool-early-result')?.status).toBe('success') + }) }) diff --git a/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.ts b/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.ts index 506e2be2c30..17b820555c2 100644 --- a/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.ts +++ b/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.ts @@ -213,6 +213,27 @@ function inferToolSuccess(data: Record | undefined): { return { success, hasResultData, hasError } } +function ensureTerminalToolCallState( + context: StreamingContext, + toolCallId: string, + toolName: string +): ToolCallState { + const existing = context.toolCalls.get(toolCallId) + if (existing) { + return existing + } + + const toolCall: ToolCallState = { + id: toolCallId, + name: toolName || 'unknown_tool', + status: 'pending', + startTime: Date.now(), + } + context.toolCalls.set(toolCallId, toolCall) + addContentBlock(context, { type: 'tool_call', toolCall }) + return toolCall +} + export type SSEHandler = ( event: SSEEvent, context: StreamingContext, @@ -246,8 +267,12 @@ export const sseHandlers: Record = { const data = getEventData(event) const toolCallId = event.toolCallId || (data?.id as string | undefined) if (!toolCallId) return - const current = context.toolCalls.get(toolCallId) - if (!current) return + const toolName = + event.toolName || + (data?.name as string | undefined) || + context.toolCalls.get(toolCallId)?.name || + '' + const current = ensureTerminalToolCallState(context, toolCallId, toolName) const { success, hasResultData, hasError } = inferToolSuccess(data) @@ -269,8 +294,12 @@ export const sseHandlers: Record = { const data = getEventData(event) const toolCallId = event.toolCallId || (data?.id as string | undefined) if (!toolCallId) return - const current = context.toolCalls.get(toolCallId) - if (!current) return + const toolName = + event.toolName || + (data?.name as string | undefined) || + context.toolCalls.get(toolCallId)?.name || + '' + const current = ensureTerminalToolCallState(context, toolCallId, toolName) current.status = 'error' current.error = (data?.error as string | undefined) || 'Tool execution failed' current.endTime = Date.now() @@ -315,6 +344,9 @@ export const sseHandlers: Record = { existing?.endTime || (existing && existing.status !== 'pending' && existing.status !== 'executing') ) { + if (!existing.name && toolName) { + existing.name = toolName + } if (!existing.params && args) { existing.params = args } @@ -560,6 +592,12 @@ export const subAgentHandlers: Record = { const existing = context.toolCalls.get(toolCallId) // Ignore late/duplicate tool_call events once we already have a result. if (wasToolResultSeen(toolCallId) || existing?.endTime) { + if (existing && !existing.name && toolName) { + existing.name = toolName + } + if (existing && !existing.params && args) { + existing.params = args + } return } @@ -688,13 +726,14 @@ export const subAgentHandlers: Record = { const data = getEventData(event) const toolCallId = event.toolCallId || (data?.id as string | undefined) if (!toolCallId) return + const toolName = event.toolName || (data?.name as string | undefined) || '' // Update in subAgentToolCalls. const toolCalls = context.subAgentToolCalls[parentToolCallId] || [] const subAgentToolCall = toolCalls.find((tc) => tc.id === toolCallId) // Also update in main toolCalls (where we added it for execution). - const mainToolCall = context.toolCalls.get(toolCallId) + const mainToolCall = ensureTerminalToolCallState(context, toolCallId, toolName) const { success, hasResultData, hasError } = inferToolSuccess(data) From 24b94d20ba85bbe318ab7d42fc521d05597fccda Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Tue, 24 Mar 2026 03:47:32 -0700 Subject: [PATCH 3/3] address comments --- .../lib/copilot/orchestrator/index.test.ts | 348 ------------------ apps/sim/lib/copilot/orchestrator/index.ts | 25 +- .../sse/handlers/tool-execution.ts | 40 +- apps/sim/lib/copilot/orchestrator/types.ts | 12 + 4 files changed, 42 insertions(+), 383 deletions(-) delete mode 100644 apps/sim/lib/copilot/orchestrator/index.test.ts diff --git a/apps/sim/lib/copilot/orchestrator/index.test.ts b/apps/sim/lib/copilot/orchestrator/index.test.ts deleted file mode 100644 index 3e8f71287f2..00000000000 --- a/apps/sim/lib/copilot/orchestrator/index.test.ts +++ /dev/null @@ -1,348 +0,0 @@ -/** - * @vitest-environment node - */ - -import { beforeEach, describe, expect, it, vi } from 'vitest' -import type { OrchestratorOptions } from './types' - -const { - prepareExecutionContext, - isToolAvailableOnSimSide, - getEffectiveDecryptedEnv, - runStreamLoop, - claimCompletedAsyncToolCall, - getAsyncToolCall, - getAsyncToolCalls, - markAsyncToolDelivered, - releaseCompletedAsyncToolClaim, - updateRunStatus, -} = vi.hoisted(() => ({ - prepareExecutionContext: vi.fn(), - isToolAvailableOnSimSide: vi.fn().mockReturnValue(true), - getEffectiveDecryptedEnv: vi.fn(), - runStreamLoop: vi.fn(), - claimCompletedAsyncToolCall: vi.fn(), - getAsyncToolCall: vi.fn(), - getAsyncToolCalls: vi.fn(), - markAsyncToolDelivered: vi.fn(), - releaseCompletedAsyncToolClaim: vi.fn(), - updateRunStatus: vi.fn(), -})) - -vi.mock('@/lib/copilot/orchestrator/tool-executor', () => ({ - prepareExecutionContext, - isToolAvailableOnSimSide, -})) - -vi.mock('@/lib/environment/utils', () => ({ - getEffectiveDecryptedEnv, -})) - -vi.mock('@/lib/copilot/async-runs/repository', () => ({ - claimCompletedAsyncToolCall, - getAsyncToolCall, - getAsyncToolCalls, - markAsyncToolDelivered, - releaseCompletedAsyncToolClaim, - updateRunStatus, -})) - -vi.mock('@/lib/copilot/orchestrator/stream/core', async () => { - const actual = await vi.importActual('./stream/core') - return { - ...actual, - buildToolCallSummaries: vi.fn(() => []), - runStreamLoop, - } -}) - -import { orchestrateCopilotStream } from './index' - -describe('orchestrateCopilotStream async continuation', () => { - beforeEach(() => { - vi.clearAllMocks() - isToolAvailableOnSimSide.mockReturnValue(true) - prepareExecutionContext.mockResolvedValue({ - userId: 'user-1', - workflowId: 'workflow-1', - chatId: 'chat-1', - }) - getEffectiveDecryptedEnv.mockResolvedValue({}) - claimCompletedAsyncToolCall.mockResolvedValue({ toolCallId: 'tool-1' }) - getAsyncToolCall.mockResolvedValue({ - toolCallId: 'tool-1', - toolName: 'read', - status: 'completed', - result: { ok: true }, - error: null, - }) - getAsyncToolCalls.mockResolvedValue([ - { - toolCallId: 'tool-1', - toolName: 'read', - status: 'completed', - result: { ok: true }, - error: null, - }, - ]) - markAsyncToolDelivered.mockResolvedValue(null) - releaseCompletedAsyncToolClaim.mockResolvedValue(null) - updateRunStatus.mockResolvedValue(null) - }) - - it('builds resume payloads with success=true for claimed completed rows', async () => { - runStreamLoop - .mockImplementationOnce(async (_url: string, _opts: RequestInit, context: any) => { - context.awaitingAsyncContinuation = { - checkpointId: 'checkpoint-1', - runId: 'run-1', - pendingToolCallIds: ['tool-1'], - } - }) - .mockImplementationOnce(async (url: string, opts: RequestInit) => { - expect(url).toContain('/api/tools/resume') - const body = JSON.parse(String(opts.body)) - expect(body).toEqual({ - checkpointId: 'checkpoint-1', - results: [ - { - callId: 'tool-1', - name: 'read', - data: { ok: true }, - success: true, - }, - ], - }) - }) - - const result = await orchestrateCopilotStream( - { message: 'hello' }, - { - userId: 'user-1', - workflowId: 'workflow-1', - chatId: 'chat-1', - executionId: 'exec-1', - runId: 'run-1', - } - ) - - expect(result.success).toBe(true) - expect(markAsyncToolDelivered).toHaveBeenCalledWith('tool-1') - }) - - it('marks claimed tool calls delivered even when the resumed stream later records errors', async () => { - runStreamLoop - .mockImplementationOnce(async (_url: string, _opts: RequestInit, context: any) => { - context.awaitingAsyncContinuation = { - checkpointId: 'checkpoint-1', - runId: 'run-1', - pendingToolCallIds: ['tool-1'], - } - }) - .mockImplementationOnce(async (_url: string, _opts: RequestInit, context: any) => { - context.errors.push('resume stream failed after handoff') - }) - - const result = await orchestrateCopilotStream( - { message: 'hello' }, - { - userId: 'user-1', - workflowId: 'workflow-1', - chatId: 'chat-1', - executionId: 'exec-1', - runId: 'run-1', - } - ) - - expect(result.success).toBe(false) - expect(markAsyncToolDelivered).toHaveBeenCalledWith('tool-1') - }) - - it('forwards done events while still marking async pauses on the run', async () => { - const onEvent = vi.fn() - const streamOptions: OrchestratorOptions = { onEvent } - runStreamLoop.mockImplementationOnce( - async (_url: string, _opts: RequestInit, _context: any, _exec: any, loopOptions: any) => { - await loopOptions.onEvent({ - type: 'done', - data: { - response: { - async_pause: { - checkpointId: 'checkpoint-1', - runId: 'run-1', - }, - }, - }, - }) - } - ) - - await orchestrateCopilotStream( - { message: 'hello' }, - { - userId: 'user-1', - workflowId: 'workflow-1', - chatId: 'chat-1', - executionId: 'exec-1', - runId: 'run-1', - ...streamOptions, - } - ) - - expect(onEvent).toHaveBeenCalledWith(expect.objectContaining({ type: 'done' })) - expect(updateRunStatus).toHaveBeenCalledWith('run-1', 'paused_waiting_for_tool') - }) - - it('waits for a local running tool before retrying the claim', async () => { - const localPendingPromise = Promise.resolve({ - status: 'success', - data: { ok: true }, - }) - - claimCompletedAsyncToolCall - .mockResolvedValueOnce(null) - .mockResolvedValueOnce({ toolCallId: 'tool-1' }) - getAsyncToolCall - .mockResolvedValueOnce({ - toolCallId: 'tool-1', - toolName: 'read', - status: 'running', - result: null, - error: null, - }) - .mockResolvedValue({ - toolCallId: 'tool-1', - toolName: 'read', - status: 'completed', - result: { ok: true }, - error: null, - }) - - runStreamLoop - .mockImplementationOnce(async (_url: string, _opts: RequestInit, context: any) => { - context.awaitingAsyncContinuation = { - checkpointId: 'checkpoint-1', - runId: 'run-1', - pendingToolCallIds: ['tool-1'], - } - context.pendingToolPromises.set('tool-1', localPendingPromise) - }) - .mockImplementationOnce(async (url: string, opts: RequestInit) => { - expect(url).toContain('/api/tools/resume') - const body = JSON.parse(String(opts.body)) - expect(body.results[0]).toEqual({ - callId: 'tool-1', - name: 'read', - data: { ok: true }, - success: true, - }) - }) - - const result = await orchestrateCopilotStream( - { message: 'hello' }, - { - userId: 'user-1', - workflowId: 'workflow-1', - chatId: 'chat-1', - executionId: 'exec-1', - runId: 'run-1', - } - ) - - expect(result.success).toBe(true) - expect(runStreamLoop).toHaveBeenCalledTimes(2) - expect(markAsyncToolDelivered).toHaveBeenCalledWith('tool-1') - }) - - it('releases claimed rows if the resume stream throws before delivery is marked', async () => { - runStreamLoop - .mockImplementationOnce(async (_url: string, _opts: RequestInit, context: any) => { - context.awaitingAsyncContinuation = { - checkpointId: 'checkpoint-1', - runId: 'run-1', - pendingToolCallIds: ['tool-1'], - } - }) - .mockImplementationOnce(async () => { - throw new Error('resume failed') - }) - - const result = await orchestrateCopilotStream( - { message: 'hello' }, - { - userId: 'user-1', - workflowId: 'workflow-1', - chatId: 'chat-1', - executionId: 'exec-1', - runId: 'run-1', - } - ) - - expect(result.success).toBe(false) - expect(releaseCompletedAsyncToolClaim).toHaveBeenCalledWith('tool-1', 'run-1') - expect(markAsyncToolDelivered).not.toHaveBeenCalled() - }) - - it('fails explicitly when async continuation cannot be resumed', async () => { - getAsyncToolCall.mockResolvedValue(null) - - runStreamLoop.mockImplementationOnce(async (_url: string, _opts: RequestInit, context: any) => { - context.awaitingAsyncContinuation = { - checkpointId: 'checkpoint-1', - runId: 'run-1', - pendingToolCallIds: ['tool-1', 'tool-2'], - } - }) - - const result = await orchestrateCopilotStream( - { message: 'hello' }, - { - userId: 'user-1', - workflowId: 'workflow-1', - chatId: 'chat-1', - executionId: 'exec-1', - runId: 'run-1', - } - ) - - expect(result.success).toBe(false) - expect(runStreamLoop).toHaveBeenCalledTimes(1) - expect(result.error).toContain('Failed to resume async tool continuation') - expect(markAsyncToolDelivered).not.toHaveBeenCalled() - }) - - it('fails explicitly when a sim-handled tool has no durable async row', async () => { - claimCompletedAsyncToolCall.mockResolvedValue(null) - getAsyncToolCall.mockResolvedValue(null) - - runStreamLoop.mockImplementationOnce(async (_url: string, _opts: RequestInit, context: any) => { - context.awaitingAsyncContinuation = { - checkpointId: 'checkpoint-1', - runId: 'run-1', - pendingToolCallIds: ['tool-1'], - } - context.toolCalls.set('tool-1', { - id: 'tool-1', - name: 'read', - status: 'success', - result: { success: true, output: { ok: true } }, - }) - }) - - const result = await orchestrateCopilotStream( - { message: 'hello' }, - { - userId: 'user-1', - workflowId: 'workflow-1', - chatId: 'chat-1', - executionId: 'exec-1', - runId: 'run-1', - } - ) - - expect(result.success).toBe(false) - expect(result.error).toContain('Failed to resume async tool continuation') - expect(runStreamLoop).toHaveBeenCalledTimes(1) - expect(markAsyncToolDelivered).not.toHaveBeenCalled() - }) -}) diff --git a/apps/sim/lib/copilot/orchestrator/index.ts b/apps/sim/lib/copilot/orchestrator/index.ts index 056d296585a..3320b0df33d 100644 --- a/apps/sim/lib/copilot/orchestrator/index.ts +++ b/apps/sim/lib/copilot/orchestrator/index.ts @@ -18,12 +18,13 @@ import { isToolAvailableOnSimSide, prepareExecutionContext, } from '@/lib/copilot/orchestrator/tool-executor' -import type { - ExecutionContext, - OrchestratorOptions, - OrchestratorResult, - SSEEvent, - ToolCallState, +import { + type ExecutionContext, + isTerminalToolCallStatus, + type OrchestratorOptions, + type OrchestratorResult, + type SSEEvent, + type ToolCallState, } from '@/lib/copilot/orchestrator/types' import { env } from '@/lib/core/config/env' import { getEffectiveDecryptedEnv } from '@/lib/environment/utils' @@ -59,16 +60,6 @@ function didAsyncToolSucceed(input: { return false } -function isTerminalToolState(status?: ToolCallState['status']): boolean { - return Boolean( - status === 'success' || - status === 'error' || - status === 'cancelled' || - status === 'skipped' || - status === 'rejected' - ) -} - interface ReadyContinuationTool { toolCallId: string toolState?: ToolCallState @@ -249,7 +240,7 @@ export async function orchestrateCopilotStream( if ( !durableRow && toolState && - isTerminalToolState(toolState.status) && + isTerminalToolCallStatus(toolState.status) && !isToolAvailableOnSimSide(toolState.name) ) { logger.info('Including Go-handled tool in resume payload (no Sim-side row)', { diff --git a/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts b/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts index c299e0d9c06..2b48b5a91c9 100644 --- a/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts +++ b/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts @@ -6,12 +6,13 @@ import { completeAsyncToolCall, markAsyncToolRunning } from '@/lib/copilot/async import { waitForToolConfirmation } from '@/lib/copilot/orchestrator/persistence' import { asRecord, markToolResultSeen } from '@/lib/copilot/orchestrator/sse/utils' import { executeToolServerSide, markToolComplete } from '@/lib/copilot/orchestrator/tool-executor' -import type { - ExecutionContext, - OrchestratorOptions, - SSEEvent, - StreamingContext, - ToolCallResult, +import { + type ExecutionContext, + isTerminalToolCallStatus, + type OrchestratorOptions, + type SSEEvent, + type StreamingContext, + type ToolCallResult, } from '@/lib/copilot/orchestrator/types' import { extractDeletedResourcesFromToolResult, @@ -243,16 +244,6 @@ function cancelledCompletion(message: string): AsyncToolCompletion { } } -function isTerminalToolCallStatus(status: string): boolean { - return ( - status === 'success' || - status === 'error' || - status === 'cancelled' || - status === 'skipped' || - status === 'rejected' - ) -} - function terminalCompletionFromToolCall(toolCall: { status: string error?: string @@ -262,10 +253,23 @@ function terminalCompletionFromToolCall(toolCall: { return cancelledCompletion(toolCall.error || 'Tool execution cancelled') } - if (toolCall.status === 'success' || toolCall.status === 'skipped') { + if (toolCall.status === 'success') { + return { + status: 'success', + message: 'Tool completed', + data: + toolCall.result?.output && + typeof toolCall.result.output === 'object' && + !Array.isArray(toolCall.result.output) + ? (toolCall.result.output as Record) + : undefined, + } + } + + if (toolCall.status === 'skipped') { return { status: 'success', - message: toolCall.error || 'Tool completed', + message: 'Tool skipped', data: toolCall.result?.output && typeof toolCall.result.output === 'object' && diff --git a/apps/sim/lib/copilot/orchestrator/types.ts b/apps/sim/lib/copilot/orchestrator/types.ts index c86169d499b..97c3bd61367 100644 --- a/apps/sim/lib/copilot/orchestrator/types.ts +++ b/apps/sim/lib/copilot/orchestrator/types.ts @@ -59,6 +59,18 @@ export type ToolCallStatus = | 'rejected' | 'cancelled' +const TERMINAL_TOOL_STATUSES: ReadonlySet = new Set([ + 'success', + 'error', + 'cancelled', + 'skipped', + 'rejected', +]) + +export function isTerminalToolCallStatus(status?: string): boolean { + return TERMINAL_TOOL_STATUSES.has(status as ToolCallStatus) +} + export interface ToolCallState { id: string name: string