From e17576c6e438c7311bcd2dacfea637ced211b22d Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Mon, 23 Mar 2026 23:37:12 -0700 Subject: [PATCH 1/9] Fixes --- apps/sim/app/api/copilot/chat/abort/route.ts | 16 +++++ apps/sim/app/api/copilot/chat/route.ts | 16 +++++ apps/sim/app/api/mothership/chat/route.ts | 15 +++- .../[workspaceId]/home/hooks/use-chat.ts | 45 +++++++----- apps/sim/lib/copilot/async-runs/repository.ts | 18 ++++- apps/sim/lib/copilot/chat-streaming.ts | 70 +++++++++++++------ apps/sim/lib/copilot/orchestrator/index.ts | 11 +++ 7 files changed, 145 insertions(+), 46 deletions(-) diff --git a/apps/sim/app/api/copilot/chat/abort/route.ts b/apps/sim/app/api/copilot/chat/abort/route.ts index b22280f330..d35072e68f 100644 --- a/apps/sim/app/api/copilot/chat/abort/route.ts +++ b/apps/sim/app/api/copilot/chat/abort/route.ts @@ -1,6 +1,8 @@ import { NextResponse } from 'next/server' import { abortActiveStream } from '@/lib/copilot/chat-streaming' +import { SIM_AGENT_API_URL } from '@/lib/copilot/constants' import { authenticateCopilotRequestSessionOnly } from '@/lib/copilot/request-helpers' +import { env } from '@/lib/core/config/env' export async function POST(request: Request) { const { userId: authenticatedUserId, isAuthenticated } = @@ -17,6 +19,20 @@ export async function POST(request: Request) { return NextResponse.json({ error: 'streamId is required' }, { status: 400 }) } + try { + const headers: Record = { 'Content-Type': 'application/json' } + if (env.COPILOT_API_KEY) { + headers['x-api-key'] = env.COPILOT_API_KEY + } + await fetch(`${SIM_AGENT_API_URL}/api/streams/explicit-abort`, { + method: 'POST', + headers, + body: JSON.stringify({ messageId: streamId }), + }) + } catch { + // best effort: local abort should still proceed even if Go marker fails + } + const aborted = abortActiveStream(streamId) return NextResponse.json({ aborted }) } diff --git a/apps/sim/app/api/copilot/chat/route.ts b/apps/sim/app/api/copilot/chat/route.ts index 015975e283..afd2fa2d3c 100644 --- a/apps/sim/app/api/copilot/chat/route.ts +++ b/apps/sim/app/api/copilot/chat/route.ts @@ -8,6 +8,7 @@ import { getSession } from '@/lib/auth' import { getAccessibleCopilotChat, resolveOrCreateChat } from '@/lib/copilot/chat-lifecycle' import { buildCopilotRequestPayload } from '@/lib/copilot/chat-payload' import { + acquirePendingChatStream, createSSEStream, requestChatTitle, SSE_RESPONSE_HEADERS, @@ -320,6 +321,19 @@ export async function POST(req: NextRequest) { } } + if (actualChatId) { + const acquired = await acquirePendingChatStream(actualChatId, userMessageIdToUse) + if (!acquired) { + return NextResponse.json( + { + error: + 'A response is already in progress for this chat. Wait for it to finish or use Stop.', + }, + { status: 409 } + ) + } + } + if (stream) { const executionId = crypto.randomUUID() const runId = crypto.randomUUID() @@ -337,6 +351,7 @@ export async function POST(req: NextRequest) { titleProvider: provider, requestId: tracker.requestId, workspaceId: resolvedWorkspaceId, + pendingChatStreamAlreadyRegistered: Boolean(actualChatId), orchestrateOptions: { userId: authenticatedUserId, workflowId, @@ -348,6 +363,7 @@ export async function POST(req: NextRequest) { interactive: true, onComplete: async (result: OrchestratorResult) => { if (!actualChatId) return + if (!result.success) return const assistantMessage: Record = { id: crypto.randomUUID(), diff --git a/apps/sim/app/api/mothership/chat/route.ts b/apps/sim/app/api/mothership/chat/route.ts index 5e93baeff5..ba262842cf 100644 --- a/apps/sim/app/api/mothership/chat/route.ts +++ b/apps/sim/app/api/mothership/chat/route.ts @@ -8,9 +8,9 @@ import { getSession } from '@/lib/auth' import { resolveOrCreateChat } from '@/lib/copilot/chat-lifecycle' import { buildCopilotRequestPayload } from '@/lib/copilot/chat-payload' import { + acquirePendingChatStream, createSSEStream, SSE_RESPONSE_HEADERS, - waitForPendingChatStream, } from '@/lib/copilot/chat-streaming' import type { OrchestratorResult } from '@/lib/copilot/orchestrator/types' import { processContextsServer, resolveActiveResourceContext } from '@/lib/copilot/process-contents' @@ -253,7 +253,16 @@ export async function POST(req: NextRequest) { ) if (actualChatId) { - await waitForPendingChatStream(actualChatId) + const acquired = await acquirePendingChatStream(actualChatId, userMessageId) + if (!acquired) { + return NextResponse.json( + { + error: + 'A response is already in progress for this chat. Wait for it to finish or use Stop.', + }, + { status: 409 } + ) + } } const executionId = crypto.randomUUID() @@ -271,6 +280,7 @@ export async function POST(req: NextRequest) { titleModel: 'claude-opus-4-6', requestId: tracker.requestId, workspaceId, + pendingChatStreamAlreadyRegistered: Boolean(actualChatId), orchestrateOptions: { userId: authenticatedUserId, workspaceId, @@ -282,6 +292,7 @@ export async function POST(req: NextRequest) { interactive: true, onComplete: async (result: OrchestratorResult) => { if (!actualChatId) return + if (!result.success) return const assistantMessage: Record = { id: crypto.randomUUID(), diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index f5900149e3..b154b86f21 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -85,6 +85,8 @@ const STATE_TO_STATUS: Record = { const DEPLOY_TOOL_NAMES = new Set(['deploy_api', 'deploy_chat', 'deploy_mcp', 'redeploy']) const RECONNECT_TAIL_ERROR = 'Live reconnect failed before the stream finished. The latest response may be incomplete.' +const UNEXPECTED_PROVIDER_ERROR_TAG = + '{"message":"An unexpected provider error occurred"}' function mapStoredBlock(block: TaskStoredContentBlock): ContentBlock { const mapped: ContentBlock = { @@ -415,6 +417,8 @@ export function useChat( setIsReconnecting(false) setResources([]) setActiveResourceId(null) + setStreamingFile(null) + streamingFileRef.current = null setMessageQueue([]) }, [initialChatId, queryClient]) @@ -433,6 +437,8 @@ export function useChat( setIsReconnecting(false) setResources([]) setActiveResourceId(null) + setStreamingFile(null) + streamingFileRef.current = null setMessageQueue([]) }, [isHomePage]) @@ -497,7 +503,6 @@ export function useChat( } if (activeStreamId && !sendingRef.current) { - abortControllerRef.current?.abort() const gen = ++streamGenRef.current const abortController = new AbortController() abortControllerRef.current = abortController @@ -508,6 +513,7 @@ export function useChat( const assistantId = crypto.randomUUID() const reconnect = async () => { + let reconnectFailed = false try { const encoder = new TextEncoder() @@ -515,14 +521,8 @@ export function useChat( const streamStatus = snapshot?.status ?? '' if (batchEvents.length === 0 && streamStatus === 'unknown') { - const cid = chatIdRef.current - if (cid) { - fetch(stopPathRef.current, { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ chatId: cid, streamId: activeStreamId, content: '' }), - }).catch(() => {}) - } + reconnectFailed = true + setError(RECONNECT_TAIL_ERROR) return } @@ -550,6 +550,7 @@ export function useChat( { signal: abortController.signal } ) if (!sseRes.ok || !sseRes.body) { + reconnectFailed = true logger.warn('SSE tail reconnect returned no readable body', { status: sseRes.status, streamId: activeStreamId, @@ -565,6 +566,7 @@ export function useChat( } } catch (err) { if (!(err instanceof Error && err.name === 'AbortError')) { + reconnectFailed = true logger.warn('SSE tail failed during reconnect', err) setError(RECONNECT_TAIL_ERROR) } @@ -578,10 +580,11 @@ export function useChat( await processSSEStreamRef.current(combinedStream.getReader(), assistantId, gen) } catch (err) { if (err instanceof Error && err.name === 'AbortError') return + reconnectFailed = true } finally { setIsReconnecting(false) if (streamGenRef.current === gen) { - finalizeRef.current() + finalizeRef.current(reconnectFailed ? { error: true } : undefined) } } } @@ -619,6 +622,17 @@ export function useChat( return b } + const appendInlineErrorTag = (tag: string) => { + if (runningText.includes(tag)) return + const tb = ensureTextBlock() + const prefix = runningText.length > 0 && !runningText.endsWith('\n') ? '\n' : '' + tb.content = `${tb.content ?? ''}${prefix}${tag}` + if (activeSubagent) tb.subagent = activeSubagent + runningText += `${prefix}${tag}` + streamingContentRef.current = runningText + flush() + } + const isStale = () => expectedGen !== undefined && streamGenRef.current !== expectedGen const flush = () => { @@ -644,12 +658,9 @@ export function useChat( try { while (true) { - if (isStale()) { - reader.cancel().catch(() => {}) - break - } const { done, value } = await reader.read() if (done) break + if (isStale()) continue buffer += decoder.decode(value, { stream: true }) const lines = buffer.split('\n') @@ -1114,14 +1125,11 @@ export function useChat( } case 'error': { setError(parsed.error || 'An error occurred') + appendInlineErrorTag(UNEXPECTED_PROVIDER_ERROR_TAG) break } } } - if (isStale()) { - reader.cancel().catch(() => {}) - break - } } } finally { if (streamReaderRef.current === reader) { @@ -1500,7 +1508,6 @@ export function useChat( useEffect(() => { return () => { - streamReaderRef.current?.cancel().catch(() => {}) streamReaderRef.current = null abortControllerRef.current = null streamGenRef.current++ diff --git a/apps/sim/lib/copilot/async-runs/repository.ts b/apps/sim/lib/copilot/async-runs/repository.ts index 07d0a3d85d..664e6f0555 100644 --- a/apps/sim/lib/copilot/async-runs/repository.ts +++ b/apps/sim/lib/copilot/async-runs/repository.ts @@ -121,6 +121,20 @@ export async function upsertAsyncToolCall(input: { status?: CopilotAsyncToolStatus }) { const existing = await getAsyncToolCall(input.toolCallId) + const incomingStatus = input.status ?? 'pending' + if ( + existing && + (isTerminalAsyncStatus(existing.status) || isDeliveredAsyncStatus(existing.status)) && + !isTerminalAsyncStatus(incomingStatus) && + !isDeliveredAsyncStatus(incomingStatus) + ) { + logger.info('Ignoring async tool upsert that would downgrade terminal state', { + toolCallId: input.toolCallId, + existingStatus: existing.status, + incomingStatus, + }) + return existing + } const effectiveRunId = input.runId ?? existing?.runId ?? null if (!effectiveRunId) { logger.warn('upsertAsyncToolCall missing runId and no existing row', { @@ -140,7 +154,7 @@ export async function upsertAsyncToolCall(input: { toolCallId: input.toolCallId, toolName: input.toolName, args: input.args ?? {}, - status: input.status ?? 'pending', + status: incomingStatus, updatedAt: now, }) .onConflictDoUpdate({ @@ -150,7 +164,7 @@ export async function upsertAsyncToolCall(input: { checkpointId: input.checkpointId ?? null, toolName: input.toolName, args: input.args ?? {}, - status: input.status ?? 'pending', + status: incomingStatus, updatedAt: now, }, }) diff --git a/apps/sim/lib/copilot/chat-streaming.ts b/apps/sim/lib/copilot/chat-streaming.ts index 67144d66a5..2235dbfd3b 100644 --- a/apps/sim/lib/copilot/chat-streaming.ts +++ b/apps/sim/lib/copilot/chat-streaming.ts @@ -49,19 +49,40 @@ function resolvePendingChatStream(chatId: string, streamId: string): void { } /** - * Abort any in-flight stream on `chatId` and wait for it to fully settle - * (including onComplete and Go-side persistence). Returns immediately if - * no stream is active. Gives up after `timeoutMs`. + * Wait for any in-flight stream on `chatId` to settle without force-aborting it. + * Returns true when no stream is active (or it settles in time), false on timeout. */ -export async function waitForPendingChatStream(chatId: string, timeoutMs = 5_000): Promise { +export async function waitForPendingChatStream( + chatId: string, + timeoutMs = 5_000 +): Promise { const entry = pendingChatStreams.get(chatId) - if (!entry) return + if (!entry) return true - // Force-abort the previous stream so we don't passively wait for it to - // finish naturally (which could take tens of seconds for a subagent). - abortActiveStream(entry.streamId) + return await Promise.race([ + entry.promise.then(() => true), + new Promise((r) => setTimeout(() => r(false), timeoutMs)), + ]) +} + +export async function acquirePendingChatStream( + chatId: string, + streamId: string, + timeoutMs = 5_000 +): Promise { + for (;;) { + const existing = pendingChatStreams.get(chatId) + if (!existing) { + registerPendingChatStream(chatId, streamId) + return true + } - await Promise.race([entry.promise, new Promise((r) => setTimeout(r, timeoutMs))]) + const settled = await Promise.race([ + existing.promise.then(() => true), + new Promise((r) => setTimeout(() => r(false), timeoutMs)), + ]) + if (!settled) return false + } } export function abortActiveStream(streamId: string): boolean { @@ -135,6 +156,7 @@ export interface StreamingOrchestrationParams { requestId: string workspaceId?: string orchestrateOptions: Omit + pendingChatStreamAlreadyRegistered?: boolean } export function createSSEStream(params: StreamingOrchestrationParams): ReadableStream { @@ -153,6 +175,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS requestId, workspaceId, orchestrateOptions, + pendingChatStreamAlreadyRegistered = false, } = params let eventWriter: ReturnType | null = null @@ -160,7 +183,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS const abortController = new AbortController() activeStreams.set(streamId, abortController) - if (chatId) { + if (chatId && !pendingChatStreamAlreadyRegistered) { registerPendingChatStream(chatId, streamId) } @@ -197,8 +220,20 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS const eventId = ++localSeq - // Enqueue to client stream FIRST for minimal latency. - // Redis persistence happens after so the client never waits on I/O. + try { + await eventWriter.write(event) + if (FLUSH_EVENT_TYPES.has(event.type)) { + await eventWriter.flush() + } + } catch (error) { + logger.error(`[${requestId}] Failed to persist stream event`, { + eventType: event.type, + eventId, + error: error instanceof Error ? error.message : String(error), + }) + throw error + } + try { if (!clientDisconnected) { controller.enqueue( @@ -208,17 +243,6 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS } catch { clientDisconnected = true } - - try { - await eventWriter.write(event) - if (FLUSH_EVENT_TYPES.has(event.type)) { - await eventWriter.flush() - } - } catch { - if (clientDisconnected) { - await eventWriter.flush().catch(() => {}) - } - } } if (chatId) { diff --git a/apps/sim/lib/copilot/orchestrator/index.ts b/apps/sim/lib/copilot/orchestrator/index.ts index 8332967592..055ca27c31 100644 --- a/apps/sim/lib/copilot/orchestrator/index.ts +++ b/apps/sim/lib/copilot/orchestrator/index.ts @@ -190,6 +190,7 @@ export async function orchestrateCopilotStream( if (!continuation) break let resumeReady = false + let emptyClaimRetries = 0 for (;;) { claimedToolCallIds = [] const resumeWorkerId = continuation.runId || context.runId || context.messageId @@ -246,6 +247,16 @@ export async function orchestrateCopilotStream( } if (claimableToolCallIds.length === 0) { + if (emptyClaimRetries < 3 && continuation.pendingToolCallIds.length > 0) { + emptyClaimRetries++ + logger.info('Retrying async resume claim after no tool calls were claimable', { + checkpointId: continuation.checkpointId, + runId: continuation.runId, + retry: emptyClaimRetries, + }) + await new Promise((resolve) => setTimeout(resolve, 250 * emptyClaimRetries)) + continue + } logger.warn('Skipping async resume because no tool calls were claimable', { checkpointId: continuation.checkpointId, runId: continuation.runId, From 4a7ac04b9d2339e002960b659e3345a200ebabdb Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Tue, 24 Mar 2026 00:30:45 -0700 Subject: [PATCH 2/9] Address bugbot --- apps/sim/lib/copilot/chat-streaming.ts | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/apps/sim/lib/copilot/chat-streaming.ts b/apps/sim/lib/copilot/chat-streaming.ts index 2235dbfd3b..574bf56317 100644 --- a/apps/sim/lib/copilot/chat-streaming.ts +++ b/apps/sim/lib/copilot/chat-streaming.ts @@ -245,6 +245,17 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS } } + const pushEventBestEffort = async (event: Record) => { + try { + await pushEvent(event) + } catch (error) { + logger.error(`[${requestId}] Failed to push event`, { + eventType: event.type, + error: error instanceof Error ? error.message : String(error), + }) + } + } + if (chatId) { await pushEvent({ type: 'chat_id', chatId }) } @@ -308,7 +319,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS logger.error(`[${requestId}] Orchestration returned failure`, { error: errorMessage, }) - await pushEvent({ + await pushEventBestEffort({ type: 'error', error: errorMessage, data: { @@ -348,7 +359,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS } logger.error(`[${requestId}] Orchestration error:`, error) const errorMessage = error instanceof Error ? error.message : 'Stream error' - await pushEvent({ + await pushEventBestEffort({ type: 'error', error: errorMessage, data: { From 73fa20e932d26f75932105878cc73356eba56bae Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Tue, 24 Mar 2026 00:35:20 -0700 Subject: [PATCH 3/9] Fixes --- .../[workspaceId]/home/hooks/use-chat.ts | 19 ++++++++++++++++--- apps/sim/lib/copilot/chat-streaming.ts | 2 +- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index b154b86f21..61cc00e487 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -85,8 +85,6 @@ const STATE_TO_STATUS: Record = { const DEPLOY_TOOL_NAMES = new Set(['deploy_api', 'deploy_chat', 'deploy_mcp', 'redeploy']) const RECONNECT_TAIL_ERROR = 'Live reconnect failed before the stream finished. The latest response may be incomplete.' -const UNEXPECTED_PROVIDER_ERROR_TAG = - '{"message":"An unexpected provider error occurred"}' function mapStoredBlock(block: TaskStoredContentBlock): ContentBlock { const mapped: ContentBlock = { @@ -633,6 +631,21 @@ export function useChat( flush() } + const buildInlineErrorTag = (payload: SSEPayload) => { + const data = getPayloadData(payload) as Record | undefined + const message = + (data?.displayMessage as string | undefined) || + payload.error || + 'An unexpected error occurred' + const provider = (data?.provider as string | undefined) || undefined + const code = (data?.code as string | undefined) || undefined + return `${JSON.stringify({ + message, + ...(code ? { code } : {}), + ...(provider ? { provider } : {}), + })}` + } + const isStale = () => expectedGen !== undefined && streamGenRef.current !== expectedGen const flush = () => { @@ -1125,7 +1138,7 @@ export function useChat( } case 'error': { setError(parsed.error || 'An error occurred') - appendInlineErrorTag(UNEXPECTED_PROVIDER_ERROR_TAG) + appendInlineErrorTag(buildInlineErrorTag(parsed)) break } } diff --git a/apps/sim/lib/copilot/chat-streaming.ts b/apps/sim/lib/copilot/chat-streaming.ts index 574bf56317..bcbabebaaf 100644 --- a/apps/sim/lib/copilot/chat-streaming.ts +++ b/apps/sim/lib/copilot/chat-streaming.ts @@ -231,7 +231,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS eventId, error: error instanceof Error ? error.message : String(error), }) - throw error + // Keep the live SSE stream going even if durable buffering hiccups. } try { From 2378b6a9dc3fc191e76b777ec304995cdbfdde99 Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Tue, 24 Mar 2026 00:51:12 -0700 Subject: [PATCH 4/9] Fix --- apps/sim/app/api/copilot/chat/abort/route.ts | 2 +- .../[workspaceId]/home/hooks/use-chat.ts | 24 ++++-- apps/sim/lib/copilot/chat-streaming.ts | 77 ++++++++++++++++++- 3 files changed, 93 insertions(+), 10 deletions(-) diff --git a/apps/sim/app/api/copilot/chat/abort/route.ts b/apps/sim/app/api/copilot/chat/abort/route.ts index d35072e68f..2242504d13 100644 --- a/apps/sim/app/api/copilot/chat/abort/route.ts +++ b/apps/sim/app/api/copilot/chat/abort/route.ts @@ -33,6 +33,6 @@ export async function POST(request: Request) { // best effort: local abort should still proceed even if Go marker fails } - const aborted = abortActiveStream(streamId) + const aborted = await abortActiveStream(streamId) return NextResponse.json({ aborted }) } diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index 61cc00e487..c72e5346cb 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -323,8 +323,8 @@ export function useChat( reader: ReadableStreamDefaultReader, assistantId: string, expectedGen?: number - ) => Promise - >(async () => {}) + ) => Promise + >(async () => false) const finalizeRef = useRef<(options?: { error?: boolean }) => void>(() => {}) const abortControllerRef = useRef(null) @@ -575,7 +575,14 @@ export function useChat( }, }) - await processSSEStreamRef.current(combinedStream.getReader(), assistantId, gen) + const hadStreamError = await processSSEStreamRef.current( + combinedStream.getReader(), + assistantId, + gen + ) + if (hadStreamError) { + reconnectFailed = true + } } catch (err) { if (err instanceof Error && err.name === 'AbortError') return reconnectFailed = true @@ -647,6 +654,7 @@ export function useChat( } const isStale = () => expectedGen !== undefined && streamGenRef.current !== expectedGen + let sawStreamError = false const flush = () => { if (isStale()) return @@ -1137,6 +1145,7 @@ export function useChat( break } case 'error': { + sawStreamError = true setError(parsed.error || 'An error occurred') appendInlineErrorTag(buildInlineErrorTag(parsed)) break @@ -1149,6 +1158,7 @@ export function useChat( streamReaderRef.current = null } } + return sawStreamError }, [workspaceId, queryClient, addResource, removeResource] ) @@ -1375,7 +1385,10 @@ export function useChat( if (!response.body) throw new Error('No response body') - await processSSEStream(response.body.getReader(), assistantId, gen) + const hadStreamError = await processSSEStream(response.body.getReader(), assistantId, gen) + if (streamGenRef.current === gen) { + finalize(hadStreamError ? { error: true } : undefined) + } } catch (err) { if (err instanceof Error && err.name === 'AbortError') return setError(err instanceof Error ? err.message : 'Failed to send message') @@ -1384,9 +1397,6 @@ export function useChat( } return } - if (streamGenRef.current === gen) { - finalize() - } }, [workspaceId, queryClient, processSSEStream, finalize] ) diff --git a/apps/sim/lib/copilot/chat-streaming.ts b/apps/sim/lib/copilot/chat-streaming.ts index bcbabebaaf..f6eb4ba29e 100644 --- a/apps/sim/lib/copilot/chat-streaming.ts +++ b/apps/sim/lib/copilot/chat-streaming.ts @@ -13,9 +13,13 @@ import { } from '@/lib/copilot/orchestrator/stream/buffer' import { taskPubSub } from '@/lib/copilot/task-events' import { env } from '@/lib/core/config/env' +import { acquireLock, getRedisClient, releaseLock } from '@/lib/core/config/redis' import { SSE_HEADERS } from '@/lib/core/utils/sse' const logger = createLogger('CopilotChatStreaming') +const CHAT_STREAM_LOCK_TTL_SECONDS = 2 * 60 * 60 +const STREAM_ABORT_TTL_SECONDS = 10 * 60 +const STREAM_ABORT_POLL_MS = 1000 // Registry of in-flight Sim→Go streams so the explicit abort endpoint can // reach them. Keyed by streamId, cleaned up when the stream completes. @@ -48,6 +52,14 @@ function resolvePendingChatStream(chatId: string, streamId: string): void { } } +function getChatStreamLockKey(chatId: string): string { + return `copilot:chat-stream-lock:${chatId}` +} + +function getStreamAbortKey(streamId: string): string { + return `copilot:stream-abort:${streamId}` +} + /** * Wait for any in-flight stream on `chatId` to settle without force-aborting it. * Returns true when no stream is active (or it settles in time), false on timeout. @@ -70,6 +82,24 @@ export async function acquirePendingChatStream( streamId: string, timeoutMs = 5_000 ): Promise { + const redis = getRedisClient() + if (redis) { + const deadline = Date.now() + timeoutMs + for (;;) { + const acquired = await acquireLock( + getChatStreamLockKey(chatId), + streamId, + CHAT_STREAM_LOCK_TTL_SECONDS + ) + if (acquired) { + registerPendingChatStream(chatId, streamId) + return true + } + if (Date.now() >= deadline) return false + await new Promise((resolve) => setTimeout(resolve, 200)) + } + } + for (;;) { const existing = pendingChatStreams.get(chatId) if (!existing) { @@ -85,9 +115,22 @@ export async function acquirePendingChatStream( } } -export function abortActiveStream(streamId: string): boolean { +export async function abortActiveStream(streamId: string): Promise { + const redis = getRedisClient() + let published = false + if (redis) { + try { + await redis.set(getStreamAbortKey(streamId), '1', 'EX', STREAM_ABORT_TTL_SECONDS) + published = true + } catch (error) { + logger.warn('Failed to publish distributed stream abort', { + streamId, + error: error instanceof Error ? error.message : String(error), + }) + } + } const controller = activeStreams.get(streamId) - if (!controller) return false + if (!controller) return published controller.abort() activeStreams.delete(streamId) return true @@ -214,6 +257,27 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS eventWriter = createStreamEventWriter(streamId) let localSeq = 0 + let abortPoller: ReturnType | null = null + + const redis = getRedisClient() + if (redis) { + abortPoller = setInterval(() => { + void (async () => { + try { + const shouldAbort = await redis.get(getStreamAbortKey(streamId)) + if (shouldAbort && !abortController.signal.aborted) { + abortController.abort() + await redis.del(getStreamAbortKey(streamId)) + } + } catch (error) { + logger.warn(`[${requestId}] Failed to poll distributed stream abort`, { + streamId, + error: error instanceof Error ? error.message : String(error), + }) + } + })() + }, STREAM_ABORT_POLL_MS) + } const pushEvent = async (event: Record) => { if (!eventWriter) return @@ -380,10 +444,19 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS }).catch(() => {}) } finally { clearInterval(keepaliveInterval) + if (abortPoller) { + clearInterval(abortPoller) + } activeStreams.delete(streamId) if (chatId) { + if (redis) { + await releaseLock(getChatStreamLockKey(chatId), streamId).catch(() => false) + } resolvePendingChatStream(chatId, streamId) } + if (redis) { + await redis.del(getStreamAbortKey(streamId)).catch(() => {}) + } try { controller.close() } catch { From 6c52af19473934fba57e7b9c055af1718e1f0c2a Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Tue, 24 Mar 2026 01:02:24 -0700 Subject: [PATCH 5/9] Fixes --- apps/sim/app/api/copilot/chat/route.ts | 43 ++++++++++++++++++- .../[workspaceId]/home/hooks/use-chat.ts | 2 + 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/apps/sim/app/api/copilot/chat/route.ts b/apps/sim/app/api/copilot/chat/route.ts index afd2fa2d3c..9f072265b2 100644 --- a/apps/sim/app/api/copilot/chat/route.ts +++ b/apps/sim/app/api/copilot/chat/route.ts @@ -17,6 +17,7 @@ import { COPILOT_REQUEST_MODES } from '@/lib/copilot/models' import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator' import { getStreamMeta, readStreamEvents } from '@/lib/copilot/orchestrator/stream/buffer' import type { OrchestratorResult } from '@/lib/copilot/orchestrator/types' +import { resolveActiveResourceContext } from '@/lib/copilot/process-contents' import { authenticateCopilotRequestSessionOnly, createBadRequestResponse, @@ -45,6 +46,13 @@ const FileAttachmentSchema = z.object({ size: z.number(), }) +const ResourceAttachmentSchema = z.object({ + type: z.enum(['workflow', 'table', 'file', 'knowledgebase']), + id: z.string().min(1), + title: z.string().optional(), + active: z.boolean().optional(), +}) + const ChatMessageSchema = z.object({ message: z.string().min(1, 'Message is required'), userMessageId: z.string().optional(), @@ -59,6 +67,7 @@ const ChatMessageSchema = z.object({ stream: z.boolean().optional().default(true), implicitFeedback: z.string().optional(), fileAttachments: z.array(FileAttachmentSchema).optional(), + resourceAttachments: z.array(ResourceAttachmentSchema).optional(), provider: z.string().optional(), contexts: z .array( @@ -125,6 +134,7 @@ export async function POST(req: NextRequest) { stream, implicitFeedback, fileAttachments, + resourceAttachments, provider, contexts, commands, @@ -242,6 +252,35 @@ export async function POST(req: NextRequest) { } } + if (Array.isArray(resourceAttachments) && resourceAttachments.length > 0 && resolvedWorkspaceId) { + const results = await Promise.allSettled( + resourceAttachments.map(async (r) => { + const ctx = await resolveActiveResourceContext( + r.type, + r.id, + resolvedWorkspaceId!, + authenticatedUserId, + actualChatId + ) + if (!ctx) return null + return { + ...ctx, + tag: r.active ? '@active_tab' : '@open_tab', + } + }) + ) + for (const result of results) { + if (result.status === 'fulfilled' && result.value) { + agentContexts.push(result.value) + } else if (result.status === 'rejected') { + logger.error( + `[${tracker.requestId}] Failed to resolve resource attachment`, + result.reason + ) + } + } + } + const effectiveMode = mode === 'agent' ? 'build' : mode const userPermission = resolvedWorkspaceId @@ -321,7 +360,7 @@ export async function POST(req: NextRequest) { } } - if (actualChatId) { + if (stream && actualChatId) { const acquired = await acquirePendingChatStream(actualChatId, userMessageIdToUse) if (!acquired) { return NextResponse.json( @@ -351,7 +390,7 @@ export async function POST(req: NextRequest) { titleProvider: provider, requestId: tracker.requestId, workspaceId: resolvedWorkspaceId, - pendingChatStreamAlreadyRegistered: Boolean(actualChatId), + pendingChatStreamAlreadyRegistered: Boolean(actualChatId && stream), orchestrateOptions: { userId: authenticatedUserId, workflowId, diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index c72e5346cb..8641a03d9b 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -447,6 +447,8 @@ export function useChat( const snapshot = chatHistory.streamSnapshot if (activeStreamId && !snapshot && !sendingRef.current) { + appliedChatIdRef.current = chatHistory.id + setError(RECONNECT_TAIL_ERROR) queryClient.invalidateQueries({ queryKey: taskKeys.detail(chatHistory.id) }) return } From 630a293a73b29cc545da3e05b7c2e26ba980e767 Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Tue, 24 Mar 2026 01:03:54 -0700 Subject: [PATCH 6/9] Fix lint --- apps/sim/app/api/copilot/chat/route.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/apps/sim/app/api/copilot/chat/route.ts b/apps/sim/app/api/copilot/chat/route.ts index 9f072265b2..3e33b207b8 100644 --- a/apps/sim/app/api/copilot/chat/route.ts +++ b/apps/sim/app/api/copilot/chat/route.ts @@ -252,7 +252,11 @@ export async function POST(req: NextRequest) { } } - if (Array.isArray(resourceAttachments) && resourceAttachments.length > 0 && resolvedWorkspaceId) { + if ( + Array.isArray(resourceAttachments) && + resourceAttachments.length > 0 && + resolvedWorkspaceId + ) { const results = await Promise.allSettled( resourceAttachments.map(async (r) => { const ctx = await resolveActiveResourceContext( From 32ee4a94713900349f14833020e22200772d6b28 Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Tue, 24 Mar 2026 01:59:04 -0700 Subject: [PATCH 7/9] Fixes --- apps/sim/app/api/copilot/chat/abort/route.ts | 34 +++++++- apps/sim/app/api/copilot/chat/route.ts | 44 ++++++---- .../[workspaceId]/home/hooks/use-chat.ts | 20 ++++- apps/sim/lib/copilot/async-runs/repository.ts | 14 ++++ apps/sim/lib/copilot/chat-streaming.ts | 84 +++++++++++++++---- .../sse/handlers/tool-execution.ts | 26 +++++- .../orchestrator/tool-executor/index.ts | 6 +- .../tool-executor/integration-tools.ts | 35 +++++--- .../tools/server/files/workspace-file.ts | 17 ++++ .../tools/server/image/generate-image.ts | 13 +++ .../tools/server/knowledge/knowledge-base.ts | 10 ++- .../copilot/tools/server/table/user-table.ts | 30 +++++-- .../visualization/generate-visualization.ts | 44 +++++++--- apps/sim/lib/copilot/tools/shared/schemas.ts | 5 +- apps/sim/lib/copilot/vfs/operations.test.ts | 10 +++ apps/sim/lib/copilot/vfs/serializers.ts | 1 + apps/sim/lib/copilot/vfs/workspace-vfs.ts | 13 ++- .../workspace/workspace-file-manager.ts | 33 ++++++-- 18 files changed, 351 insertions(+), 88 deletions(-) diff --git a/apps/sim/app/api/copilot/chat/abort/route.ts b/apps/sim/app/api/copilot/chat/abort/route.ts index 2242504d13..33fe68c8d8 100644 --- a/apps/sim/app/api/copilot/chat/abort/route.ts +++ b/apps/sim/app/api/copilot/chat/abort/route.ts @@ -1,9 +1,12 @@ import { NextResponse } from 'next/server' -import { abortActiveStream } from '@/lib/copilot/chat-streaming' +import { getLatestRunForStream } from '@/lib/copilot/async-runs/repository' +import { abortActiveStream, waitForPendingChatStream } from '@/lib/copilot/chat-streaming' import { SIM_AGENT_API_URL } from '@/lib/copilot/constants' import { authenticateCopilotRequestSessionOnly } from '@/lib/copilot/request-helpers' import { env } from '@/lib/core/config/env' +const GO_EXPLICIT_ABORT_TIMEOUT_MS = 3000 + export async function POST(request: Request) { const { userId: authenticatedUserId, isAuthenticated } = await authenticateCopilotRequestSessionOnly() @@ -14,25 +17,48 @@ export async function POST(request: Request) { const body = await request.json().catch(() => ({})) const streamId = typeof body.streamId === 'string' ? body.streamId : '' + let chatId = typeof body.chatId === 'string' ? body.chatId : '' if (!streamId) { return NextResponse.json({ error: 'streamId is required' }, { status: 400 }) } + if (!chatId) { + const run = await getLatestRunForStream(streamId, authenticatedUserId).catch(() => null) + if (run?.chatId) { + chatId = run.chatId + } + } + try { const headers: Record = { 'Content-Type': 'application/json' } if (env.COPILOT_API_KEY) { headers['x-api-key'] = env.COPILOT_API_KEY } - await fetch(`${SIM_AGENT_API_URL}/api/streams/explicit-abort`, { + const controller = new AbortController() + const timeout = setTimeout(() => controller.abort(), GO_EXPLICIT_ABORT_TIMEOUT_MS) + const response = await fetch(`${SIM_AGENT_API_URL}/api/streams/explicit-abort`, { method: 'POST', headers, - body: JSON.stringify({ messageId: streamId }), - }) + signal: controller.signal, + body: JSON.stringify({ + messageId: streamId, + userId: authenticatedUserId, + ...(chatId ? { chatId } : {}), + }), + }).finally(() => clearTimeout(timeout)) + if (!response.ok) { + throw new Error(`Explicit abort marker request failed: ${response.status}`) + } } catch { // best effort: local abort should still proceed even if Go marker fails } const aborted = await abortActiveStream(streamId) + if (chatId) { + await waitForPendingChatStream(chatId, GO_EXPLICIT_ABORT_TIMEOUT_MS + 1000, streamId).catch( + () => false + ) + } return NextResponse.json({ aborted }) } diff --git a/apps/sim/app/api/copilot/chat/route.ts b/apps/sim/app/api/copilot/chat/route.ts index 3e33b207b8..a3349dfe60 100644 --- a/apps/sim/app/api/copilot/chat/route.ts +++ b/apps/sim/app/api/copilot/chat/route.ts @@ -10,6 +10,7 @@ import { buildCopilotRequestPayload } from '@/lib/copilot/chat-payload' import { acquirePendingChatStream, createSSEStream, + releasePendingChatStream, requestChatTitle, SSE_RESPONSE_HEADERS, } from '@/lib/copilot/chat-streaming' @@ -108,6 +109,10 @@ const ChatMessageSchema = z.object({ */ export async function POST(req: NextRequest) { const tracker = createRequestTracker() + let actualChatId: string | undefined + let pendingChatStreamAcquired = false + let pendingChatStreamHandedOff = false + let pendingChatStreamID: string | undefined try { // Get session to access user information including name @@ -200,7 +205,7 @@ export async function POST(req: NextRequest) { let currentChat: any = null let conversationHistory: any[] = [] - let actualChatId = chatId + actualChatId = chatId const selectedModel = model || 'claude-opus-4-6' if (chatId || createNewChat) { @@ -335,6 +340,21 @@ export async function POST(req: NextRequest) { }) } catch {} + if (stream && actualChatId) { + const acquired = await acquirePendingChatStream(actualChatId, userMessageIdToUse) + if (!acquired) { + return NextResponse.json( + { + error: + 'A response is already in progress for this chat. Wait for it to finish or use Stop.', + }, + { status: 409 } + ) + } + pendingChatStreamAcquired = true + pendingChatStreamID = userMessageIdToUse + } + if (actualChatId) { const userMsg = { id: userMessageIdToUse, @@ -364,19 +384,6 @@ export async function POST(req: NextRequest) { } } - if (stream && actualChatId) { - const acquired = await acquirePendingChatStream(actualChatId, userMessageIdToUse) - if (!acquired) { - return NextResponse.json( - { - error: - 'A response is already in progress for this chat. Wait for it to finish or use Stop.', - }, - { status: 409 } - ) - } - } - if (stream) { const executionId = crypto.randomUUID() const runId = crypto.randomUUID() @@ -482,6 +489,7 @@ export async function POST(req: NextRequest) { }, }, }) + pendingChatStreamHandedOff = true return new Response(sseStream, { headers: SSE_RESPONSE_HEADERS }) } @@ -587,6 +595,14 @@ export async function POST(req: NextRequest) { }, }) } catch (error) { + if ( + actualChatId && + pendingChatStreamAcquired && + !pendingChatStreamHandedOff && + pendingChatStreamID + ) { + await releasePendingChatStream(actualChatId, pendingChatStreamID).catch(() => {}) + } const duration = tracker.getDuration() if (error instanceof z.ZodError) { diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index 8641a03d9b..6113d5040c 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -441,18 +441,34 @@ export function useChat( }, [isHomePage]) useEffect(() => { - if (!chatHistory || appliedChatIdRef.current === chatHistory.id) return + if (!chatHistory) return const activeStreamId = chatHistory.activeStreamId const snapshot = chatHistory.streamSnapshot + const staleKey = `${chatHistory.id}:stale-stream` if (activeStreamId && !snapshot && !sendingRef.current) { - appliedChatIdRef.current = chatHistory.id + if (appliedChatIdRef.current === staleKey) return + appliedChatIdRef.current = staleKey + setMessages(chatHistory.messages.map(mapStoredMessage)) + + const persistedResources = chatHistory.resources.filter((r) => r.id !== 'streaming-file') + if (persistedResources.length > 0) { + setResources(persistedResources) + setActiveResourceId(persistedResources[persistedResources.length - 1].id) + for (const resource of persistedResources) { + if (resource.type !== 'workflow') continue + ensureWorkflowInRegistry(resource.id, resource.title, workspaceId) + } + } + setError(RECONNECT_TAIL_ERROR) queryClient.invalidateQueries({ queryKey: taskKeys.detail(chatHistory.id) }) return } + if (appliedChatIdRef.current === chatHistory.id) return + appliedChatIdRef.current = chatHistory.id const mappedMessages = chatHistory.messages.map(mapStoredMessage) const shouldPreserveActiveStreamingMessage = diff --git a/apps/sim/lib/copilot/async-runs/repository.ts b/apps/sim/lib/copilot/async-runs/repository.ts index 664e6f0555..acea65f610 100644 --- a/apps/sim/lib/copilot/async-runs/repository.ts +++ b/apps/sim/lib/copilot/async-runs/repository.ts @@ -86,6 +86,20 @@ export async function getLatestRunForExecution(executionId: string) { return run ?? null } +export async function getLatestRunForStream(streamId: string, userId?: string) { + const conditions = userId + ? and(eq(copilotRuns.streamId, streamId), eq(copilotRuns.userId, userId)) + : eq(copilotRuns.streamId, streamId) + const [run] = await db + .select() + .from(copilotRuns) + .where(conditions) + .orderBy(desc(copilotRuns.startedAt)) + .limit(1) + + return run ?? null +} + export async function getRunSegment(runId: string) { const [run] = await db.select().from(copilotRuns).where(eq(copilotRuns.id, runId)).limit(1) return run ?? null diff --git a/apps/sim/lib/copilot/chat-streaming.ts b/apps/sim/lib/copilot/chat-streaming.ts index f6eb4ba29e..dc0ad0a698 100644 --- a/apps/sim/lib/copilot/chat-streaming.ts +++ b/apps/sim/lib/copilot/chat-streaming.ts @@ -8,6 +8,7 @@ import type { OrchestrateStreamOptions } from '@/lib/copilot/orchestrator' import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator' import { createStreamEventWriter, + getStreamMeta, resetStreamBuffer, setStreamMeta, } from '@/lib/copilot/orchestrator/stream/buffer' @@ -66,15 +67,46 @@ function getStreamAbortKey(streamId: string): string { */ export async function waitForPendingChatStream( chatId: string, - timeoutMs = 5_000 + timeoutMs = 5_000, + expectedStreamId?: string ): Promise { - const entry = pendingChatStreams.get(chatId) - if (!entry) return true + const redis = getRedisClient() + const deadline = Date.now() + timeoutMs + + for (;;) { + const entry = pendingChatStreams.get(chatId) + const localPending = !!entry && (!expectedStreamId || entry.streamId === expectedStreamId) + + if (redis) { + try { + const ownerStreamId = await redis.get(getChatStreamLockKey(chatId)) + const lockReleased = + !ownerStreamId || (expectedStreamId !== undefined && ownerStreamId !== expectedStreamId) + if (!localPending && lockReleased) { + return true + } + } catch (error) { + logger.warn('Failed to check distributed chat stream lock while waiting', { + chatId, + expectedStreamId, + error: error instanceof Error ? error.message : String(error), + }) + } + } else if (!localPending) { + return true + } - return await Promise.race([ - entry.promise.then(() => true), - new Promise((r) => setTimeout(() => r(false), timeoutMs)), - ]) + if (Date.now() >= deadline) return false + await new Promise((resolve) => setTimeout(resolve, 200)) + } +} + +export async function releasePendingChatStream(chatId: string, streamId: string): Promise { + const redis = getRedisClient() + if (redis) { + await releaseLock(getChatStreamLockKey(chatId), streamId).catch(() => false) + } + resolvePendingChatStream(chatId, streamId) } export async function acquirePendingChatStream( @@ -86,14 +118,36 @@ export async function acquirePendingChatStream( if (redis) { const deadline = Date.now() + timeoutMs for (;;) { - const acquired = await acquireLock( - getChatStreamLockKey(chatId), - streamId, - CHAT_STREAM_LOCK_TTL_SECONDS - ) - if (acquired) { - registerPendingChatStream(chatId, streamId) - return true + try { + const acquired = await acquireLock( + getChatStreamLockKey(chatId), + streamId, + CHAT_STREAM_LOCK_TTL_SECONDS + ) + if (acquired) { + registerPendingChatStream(chatId, streamId) + return true + } + if (!pendingChatStreams.has(chatId)) { + const ownerStreamId = await redis.get(getChatStreamLockKey(chatId)) + if (ownerStreamId) { + const ownerMeta = await getStreamMeta(ownerStreamId) + const ownerTerminal = + ownerMeta?.status === 'complete' || + ownerMeta?.status === 'error' || + ownerMeta?.status === 'cancelled' + if (ownerTerminal) { + await releaseLock(getChatStreamLockKey(chatId), ownerStreamId).catch(() => false) + continue + } + } + } + } catch (error) { + logger.warn('Distributed chat stream lock failed; retrying distributed coordination', { + chatId, + streamId, + error: error instanceof Error ? error.message : String(error), + }) } if (Date.now() >= deadline) return false await new Promise((resolve) => setTimeout(resolve, 200)) diff --git a/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts b/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts index 1e447ac28c..84f09010a1 100644 --- a/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts +++ b/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts @@ -117,6 +117,20 @@ const FORMAT_TO_CONTENT_TYPE: Record = { html: 'text/html', } +function normalizeOutputWorkspaceFileName(outputPath: string): string { + const trimmed = outputPath.trim().replace(/^\/+/, '') + const withoutPrefix = trimmed.startsWith('files/') ? trimmed.slice('files/'.length) : trimmed + if (!withoutPrefix) { + throw new Error('outputPath must include a file name, e.g. "files/result.json"') + } + if (withoutPrefix.includes('/')) { + throw new Error( + 'outputPath must target a flat workspace file, e.g. "files/result.json". Nested paths like "files/reports/result.json" are not supported.' + ) + } + return withoutPrefix +} + function resolveOutputFormat(fileName: string, explicit?: string): OutputFormat { if (explicit && explicit in FORMAT_TO_CONTENT_TYPE) return explicit as OutputFormat const ext = fileName.slice(fileName.lastIndexOf('.')).toLowerCase() @@ -153,10 +167,10 @@ async function maybeWriteOutputToFile( const explicitFormat = (params?.outputFormat as string | undefined) ?? (args?.outputFormat as string | undefined) - const fileName = outputPath.replace(/^files\//, '') - const format = resolveOutputFormat(fileName, explicitFormat) try { + const fileName = normalizeOutputWorkspaceFileName(outputPath) + const format = resolveOutputFormat(fileName, explicitFormat) if (context.abortSignal?.aborted) { throw new Error('Request aborted before tool mutation could be applied') } @@ -193,12 +207,16 @@ async function maybeWriteOutputToFile( }, } } catch (err) { + const message = err instanceof Error ? err.message : String(err) logger.warn('Failed to write tool output to file', { toolName, outputPath, - error: err instanceof Error ? err.message : String(err), + error: message, }) - return result + return { + success: false, + error: `Failed to write output file: ${message}`, + } } } diff --git a/apps/sim/lib/copilot/orchestrator/tool-executor/index.ts b/apps/sim/lib/copilot/orchestrator/tool-executor/index.ts index db8f430c0d..68880b77e0 100644 --- a/apps/sim/lib/copilot/orchestrator/tool-executor/index.ts +++ b/apps/sim/lib/copilot/orchestrator/tool-executor/index.ts @@ -1050,21 +1050,21 @@ const SIM_WORKFLOW_TOOL_HANDLERS: Record< return { success: false, error: - 'Opening a workspace file requires workspace context. Pass the file UUID from files//meta.json.', + 'Opening a workspace file requires workspace context. Pass the canonical file UUID from files/by-id//meta.json.', } } if (!isUuid(params.id)) { return { success: false, error: - 'open_resource for files requires the canonical UUID from files//meta.json (the "id" field). Do not pass VFS paths, display names, or file_ strings.', + 'open_resource for files requires the canonical file UUID. Read files/by-id//meta.json or files//meta.json and pass the "id" field. Do not pass VFS paths or display names.', } } const record = await getWorkspaceFile(c.workspaceId, params.id) if (!record) { return { success: false, - error: `No workspace file with id "${params.id}". Confirm the UUID from meta.json.`, + error: `No workspace file with id "${params.id}". Confirm the UUID from files/by-id//meta.json.`, } } resourceId = record.id diff --git a/apps/sim/lib/copilot/orchestrator/tool-executor/integration-tools.ts b/apps/sim/lib/copilot/orchestrator/tool-executor/integration-tools.ts index 853208cadb..fe377312e5 100644 --- a/apps/sim/lib/copilot/orchestrator/tool-executor/integration-tools.ts +++ b/apps/sim/lib/copilot/orchestrator/tool-executor/integration-tools.ts @@ -16,6 +16,7 @@ import { getTableById, queryRows } from '@/lib/table/service' import { downloadWorkspaceFile, findWorkspaceFileRecord, + getSandboxWorkspaceFilePath, listWorkspaceFiles, } from '@/lib/uploads/contexts/workspace/workspace-file-manager' import { getWorkflowById } from '@/lib/workflows/utils' @@ -179,23 +180,30 @@ export async function executeIntegrationToolDirect( ]) let totalSize = 0 - const inputFilePaths = executionParams.inputFiles as string[] | undefined - if (inputFilePaths?.length) { + const inputFileIds = executionParams.inputFiles as string[] | undefined + if (inputFileIds?.length) { const allFiles = await listWorkspaceFiles(workspaceId) - for (const filePath of inputFilePaths) { - const fileName = filePath.replace(/^files\//, '') - const ext = fileName.split('.').pop()?.toLowerCase() ?? '' - if (!TEXT_EXTENSIONS.has(ext)) { - logger.warn('Skipping non-text sandbox input file', { fileName, ext }) + for (const fileRef of inputFileIds) { + const record = findWorkspaceFileRecord(allFiles, fileRef) + if (!record) { + logger.warn('Sandbox input file not found', { fileRef }) continue } - const record = findWorkspaceFileRecord(allFiles, filePath) - if (!record) { - logger.warn('Sandbox input file not found', { fileName }) + const ext = record.name.split('.').pop()?.toLowerCase() ?? '' + if (!TEXT_EXTENSIONS.has(ext)) { + logger.warn('Skipping non-text sandbox input file', { + fileId: record.id, + fileName: record.name, + ext, + }) continue } if (record.size > MAX_FILE_SIZE) { - logger.warn('Sandbox input file exceeds size limit', { fileName, size: record.size }) + logger.warn('Sandbox input file exceeds size limit', { + fileId: record.id, + fileName: record.name, + size: record.size, + }) continue } if (totalSize + record.size > MAX_TOTAL_SIZE) { @@ -204,7 +212,10 @@ export async function executeIntegrationToolDirect( } const buffer = await downloadWorkspaceFile(record) totalSize += buffer.length - sandboxFiles.push({ path: `/home/user/${fileName}`, content: buffer.toString('utf-8') }) + sandboxFiles.push({ + path: getSandboxWorkspaceFilePath(record), + content: buffer.toString('utf-8'), + }) } } diff --git a/apps/sim/lib/copilot/tools/server/files/workspace-file.ts b/apps/sim/lib/copilot/tools/server/files/workspace-file.ts index 2d0943fb21..e85fec4075 100644 --- a/apps/sim/lib/copilot/tools/server/files/workspace-file.ts +++ b/apps/sim/lib/copilot/tools/server/files/workspace-file.ts @@ -35,6 +35,15 @@ function inferContentType(fileName: string, explicitType?: string): string { return EXT_TO_MIME[ext] || 'text/plain' } +function validateFlatWorkspaceFileName(fileName: string): string | null { + const trimmed = fileName.trim() + if (!trimmed) return 'File name cannot be empty' + if (trimmed.includes('/')) { + return 'Workspace files use a flat namespace. Use a plain file name like "report.csv", not a path like "files/reports/report.csv".' + } + return null +} + export const workspaceFileServerTool: BaseServerTool = { name: 'workspace_file', async execute( @@ -67,6 +76,10 @@ export const workspaceFileServerTool: BaseServerTool = { '3:4': '768x1024', } +function validateGeneratedWorkspaceFileName(fileName: string): string | null { + const trimmed = fileName.trim() + if (!trimmed) return 'File name cannot be empty' + if (trimmed.includes('/')) { + return 'Workspace files use a flat namespace. Use a plain file name like "generated-image.png", not a path like "images/generated-image.png".' + } + return null +} + interface GenerateImageArgs { prompt: string referenceFileIds?: string[] @@ -151,6 +160,10 @@ export const generateImageServerTool: BaseServerTool { - const record = await resolveWorkspaceFileReference(workspaceId, filePath) + const record = await resolveWorkspaceFileReference(workspaceId, fileReference) if (!record) { throw new Error( - `File not found: "${filePath}". Use glob("files/*/meta.json") to list available files.` + `File not found: "${fileReference}". Use glob("files/by-id/*/meta.json") to list canonical file IDs.` ) } const buffer = await downloadWorkspaceFile(record) @@ -645,15 +645,21 @@ export const userTableServerTool: BaseServerTool } case 'create_from_file': { + const fileId = (args as Record).fileId as string | undefined const filePath = (args as Record).filePath as string | undefined - if (!filePath) { - return { success: false, message: 'filePath is required (e.g. "files/data.csv")' } + const fileReference = fileId || filePath + if (!fileReference) { + return { + success: false, + message: + 'fileId is required for create_from_file. Read files/{name}/meta.json or files/by-id/*/meta.json to get the canonical file ID.', + } } if (!workspaceId) { return { success: false, message: 'Workspace ID is required' } } - const file = await resolveWorkspaceFile(filePath, workspaceId) + const file = await resolveWorkspaceFile(fileReference, workspaceId) const { headers, rows } = await parseFileRows(file.buffer, file.name, file.type) if (rows.length === 0) { return { success: false, message: 'File contains no data rows' } @@ -700,10 +706,16 @@ export const userTableServerTool: BaseServerTool } case 'import_file': { + const fileId = (args as Record).fileId as string | undefined const filePath = (args as Record).filePath as string | undefined const tableId = (args as Record).tableId as string | undefined - if (!filePath) { - return { success: false, message: 'filePath is required (e.g. "files/data.csv")' } + const fileReference = fileId || filePath + if (!fileReference) { + return { + success: false, + message: + 'fileId is required for import_file. Read files/{name}/meta.json or files/by-id/*/meta.json to get the canonical file ID.', + } } if (!tableId) { return { success: false, message: 'tableId is required for import_file' } @@ -717,7 +729,7 @@ export const userTableServerTool: BaseServerTool return { success: false, message: `Table not found: ${tableId}` } } - const file = await resolveWorkspaceFile(filePath, workspaceId) + const file = await resolveWorkspaceFile(fileReference, workspaceId) const { headers, rows } = await parseFileRows(file.buffer, file.name, file.type) if (rows.length === 0) { return { success: false, message: 'File contains no data rows' } diff --git a/apps/sim/lib/copilot/tools/server/visualization/generate-visualization.ts b/apps/sim/lib/copilot/tools/server/visualization/generate-visualization.ts index 29d4d6b43e..9be7fac1c4 100644 --- a/apps/sim/lib/copilot/tools/server/visualization/generate-visualization.ts +++ b/apps/sim/lib/copilot/tools/server/visualization/generate-visualization.ts @@ -11,6 +11,7 @@ import { getServePathPrefix } from '@/lib/uploads' import { downloadWorkspaceFile, findWorkspaceFileRecord, + getSandboxWorkspaceFilePath, getWorkspaceFile, listWorkspaceFiles, updateWorkspaceFileContent, @@ -49,6 +50,15 @@ const TEXT_EXTENSIONS = new Set(['csv', 'json', 'txt', 'md', 'html', 'xml', 'tsv const MAX_FILE_SIZE = 10 * 1024 * 1024 const MAX_TOTAL_SIZE = 50 * 1024 * 1024 +function validateGeneratedWorkspaceFileName(fileName: string): string | null { + const trimmed = fileName.trim() + if (!trimmed) return 'File name cannot be empty' + if (trimmed.includes('/')) { + return 'Workspace files use a flat namespace. Use a plain file name like "chart.png", not a path like "charts/chart.png".' + } + return null +} + async function collectSandboxFiles( workspaceId: string, inputFiles?: string[], @@ -59,20 +69,27 @@ async function collectSandboxFiles( if (inputFiles?.length) { const allFiles = await listWorkspaceFiles(workspaceId) - for (const filePath of inputFiles) { - const fileName = filePath.replace(/^files\//, '') - const ext = fileName.split('.').pop()?.toLowerCase() ?? '' - if (!TEXT_EXTENSIONS.has(ext)) { - logger.warn('Skipping non-text sandbox input file', { fileName, ext }) + for (const fileRef of inputFiles) { + const record = findWorkspaceFileRecord(allFiles, fileRef) + if (!record) { + logger.warn('Sandbox input file not found', { fileRef }) continue } - const record = findWorkspaceFileRecord(allFiles, filePath) - if (!record) { - logger.warn('Sandbox input file not found', { fileName }) + const ext = record.name.split('.').pop()?.toLowerCase() ?? '' + if (!TEXT_EXTENSIONS.has(ext)) { + logger.warn('Skipping non-text sandbox input file', { + fileId: record.id, + fileName: record.name, + ext, + }) continue } if (record.size > MAX_FILE_SIZE) { - logger.warn('Sandbox input file exceeds size limit', { fileName, size: record.size }) + logger.warn('Sandbox input file exceeds size limit', { + fileId: record.id, + fileName: record.name, + size: record.size, + }) continue } if (totalSize + record.size > MAX_TOTAL_SIZE) { @@ -81,7 +98,10 @@ async function collectSandboxFiles( } const buffer = await downloadWorkspaceFile(record) totalSize += buffer.length - sandboxFiles.push({ path: `/home/user/${fileName}`, content: buffer.toString('utf-8') }) + sandboxFiles.push({ + path: getSandboxWorkspaceFilePath(record), + content: buffer.toString('utf-8'), + }) } } @@ -185,6 +205,10 @@ export const generateVisualizationServerTool: BaseServerTool< } const fileName = params.fileName || 'chart.png' + const fileNameValidationError = validateGeneratedWorkspaceFileName(fileName) + if (fileNameValidationError) { + return { success: false, message: fileNameValidationError } + } const imageBuffer = Buffer.from(imageBase64, 'base64') if (params.overwriteFileId) { diff --git a/apps/sim/lib/copilot/tools/shared/schemas.ts b/apps/sim/lib/copilot/tools/shared/schemas.ts index 5a5cb42df4..c59200e846 100644 --- a/apps/sim/lib/copilot/tools/shared/schemas.ts +++ b/apps/sim/lib/copilot/tools/shared/schemas.ts @@ -50,7 +50,9 @@ export const KnowledgeBaseArgsSchema = z.object({ workspaceId: z.string().optional(), /** Knowledge base ID (required for get, query, add_file, list_tags, create_tag, get_tag_usage, add_connector) */ knowledgeBaseId: z.string().optional(), - /** Workspace file path to add as a document (required for add_file). Example: "files/report.pdf" */ + /** Workspace file ID to add as a document (required for add_file). */ + fileId: z.string().optional(), + /** Legacy workspace file reference for add_file. Prefer fileId. */ filePath: z.string().optional(), /** Search query text (required for query) */ query: z.string().optional(), @@ -145,6 +147,7 @@ export const UserTableArgsSchema = z.object({ sort: z.record(z.enum(['asc', 'desc'])).optional(), limit: z.number().optional(), offset: z.number().optional(), + fileId: z.string().optional(), filePath: z.string().optional(), column: z .object({ diff --git a/apps/sim/lib/copilot/vfs/operations.test.ts b/apps/sim/lib/copilot/vfs/operations.test.ts index add84c2901..b1e8956def 100644 --- a/apps/sim/lib/copilot/vfs/operations.test.ts +++ b/apps/sim/lib/copilot/vfs/operations.test.ts @@ -9,6 +9,16 @@ function vfsFromEntries(entries: [string, string][]): Map { } describe('glob', () => { + it('matches canonical file metadata paths by id', () => { + const files = vfsFromEntries([ + ['files/by-id/wf_123/meta.json', '{}'], + ['files/data.csv/meta.json', '{}'], + ]) + const hits = glob(files, 'files/by-id/*/meta.json') + expect(hits).toContain('files/by-id/wf_123/meta.json') + expect(hits).not.toContain('files/data.csv/meta.json') + }) + it('matches one path segment for single star (files listing pattern)', () => { const files = vfsFromEntries([ ['files/a/meta.json', '{}'], diff --git a/apps/sim/lib/copilot/vfs/serializers.ts b/apps/sim/lib/copilot/vfs/serializers.ts index 78c349eab8..219ea04196 100644 --- a/apps/sim/lib/copilot/vfs/serializers.ts +++ b/apps/sim/lib/copilot/vfs/serializers.ts @@ -262,6 +262,7 @@ export function serializeConnectorOverview(connectors: SerializableConnectorConf /** * Serialize workspace file metadata for VFS files/{name}/meta.json + * and files/by-id/{id}/meta.json. */ export function serializeFileMeta(file: { id: string diff --git a/apps/sim/lib/copilot/vfs/workspace-vfs.ts b/apps/sim/lib/copilot/vfs/workspace-vfs.ts index 872e6120e8..e43e5b03ec 100644 --- a/apps/sim/lib/copilot/vfs/workspace-vfs.ts +++ b/apps/sim/lib/copilot/vfs/workspace-vfs.ts @@ -271,6 +271,7 @@ function getStaticComponentFiles(): Map { * knowledgebases/{name}/connectors.json * tables/{name}/meta.json * files/{name}/meta.json + * files/by-id/{id}/meta.json * jobs/{title}/meta.json * jobs/{title}/history.json * jobs/{title}/executions.json @@ -390,7 +391,7 @@ export class WorkspaceVFS { /** * Attempt to read dynamic workspace file content from storage. * Handles images (base64), parseable documents (PDF, etc.), and text files. - * Returns null if the path doesn't match `files/{name}` or the file isn't found. + * Returns null if the path doesn't match `files/{name}` / `files/by-id/{id}` or the file isn't found. */ async readFileContent(path: string): Promise { const match = path.match(/^files\/(.+?)(?:\/content)?$/) @@ -676,6 +677,16 @@ export class WorkspaceVFS { uploadedAt: file.uploadedAt, }) ) + this.files.set( + `files/by-id/${file.id}/meta.json`, + serializeFileMeta({ + id: file.id, + name: file.name, + contentType: file.type, + size: file.size, + uploadedAt: file.uploadedAt, + }) + ) } return files.map((f) => ({ name: f.name, type: f.type, size: f.size })) diff --git a/apps/sim/lib/uploads/contexts/workspace/workspace-file-manager.ts b/apps/sim/lib/uploads/contexts/workspace/workspace-file-manager.ts index c3819cd1b3..1d56d47b4b 100644 --- a/apps/sim/lib/uploads/contexts/workspace/workspace-file-manager.ts +++ b/apps/sim/lib/uploads/contexts/workspace/workspace-file-manager.ts @@ -382,16 +382,21 @@ export async function listWorkspaceFiles( } /** - * Normalize a workspace file reference to its display name. - * Supports raw names and VFS-style paths like `files/name`, `files/name/content`, - * and `files/name/meta.json`. - * - * Used by storage resolution (`findWorkspaceFileRecord`), not by `open_resource`, which - * requires the canonical database UUID only. + * Normalize a workspace file reference to either a display name or canonical file ID. + * Supports raw IDs, `files/{name}`, `files/{name}/content`, `files/{name}/meta.json`, + * and canonical VFS aliases like `files/by-id/{fileId}/content`. */ export function normalizeWorkspaceFileReference(fileReference: string): string { const trimmed = fileReference.trim().replace(/^\/+/, '') + if (trimmed.startsWith('files/by-id/')) { + const byIdRef = trimmed.slice('files/by-id/'.length) + const match = byIdRef.match(/^([^/]+)(?:\/(?:meta\.json|content))?$/) + if (match?.[1]) { + return match[1] + } + } + if (trimmed.startsWith('files/')) { const withoutPrefix = trimmed.slice('files/'.length) if (withoutPrefix.endsWith('/meta.json')) { @@ -406,6 +411,13 @@ export function normalizeWorkspaceFileReference(fileReference: string): string { return trimmed } +/** + * Canonical sandbox mount path for an existing workspace file. + */ +export function getSandboxWorkspaceFilePath(file: Pick): string { + return `/home/user/files/${file.id}/${file.name}` +} + /** * Find a workspace file record in an existing list from either its id or a VFS/name reference. * For copilot `open_resource` and the resource panel, use {@link getWorkspaceFile} with a UUID only. @@ -420,10 +432,13 @@ export function findWorkspaceFileRecord( } const normalizedReference = normalizeWorkspaceFileReference(fileReference) + const normalizedIdMatch = files.find((file) => file.id === normalizedReference) + if (normalizedIdMatch) { + return normalizedIdMatch + } + const segmentKey = normalizeVfsSegment(normalizedReference) - return ( - files.find((file) => normalizeVfsSegment(file.name) === segmentKey) ?? null - ) + return files.find((file) => normalizeVfsSegment(file.name) === segmentKey) ?? null } /** From 47484972af4eab70525e8ab19a9a063714316ded Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Tue, 24 Mar 2026 02:11:35 -0700 Subject: [PATCH 8/9] Fixes --- .../[workspaceId]/home/hooks/use-chat.ts | 64 ++++++------------- .../tool-executor/integration-tools.ts | 7 +- .../copilot/tools/server/table/user-table.ts | 43 ++++++++++++- .../visualization/generate-visualization.ts | 7 +- 4 files changed, 73 insertions(+), 48 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index 6113d5040c..8aa8b1a43b 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -441,34 +441,10 @@ export function useChat( }, [isHomePage]) useEffect(() => { - if (!chatHistory) return + if (!chatHistory || appliedChatIdRef.current === chatHistory.id) return const activeStreamId = chatHistory.activeStreamId const snapshot = chatHistory.streamSnapshot - const staleKey = `${chatHistory.id}:stale-stream` - - if (activeStreamId && !snapshot && !sendingRef.current) { - if (appliedChatIdRef.current === staleKey) return - appliedChatIdRef.current = staleKey - setMessages(chatHistory.messages.map(mapStoredMessage)) - - const persistedResources = chatHistory.resources.filter((r) => r.id !== 'streaming-file') - if (persistedResources.length > 0) { - setResources(persistedResources) - setActiveResourceId(persistedResources[persistedResources.length - 1].id) - for (const resource of persistedResources) { - if (resource.type !== 'workflow') continue - ensureWorkflowInRegistry(resource.id, resource.title, workspaceId) - } - } - - setError(RECONNECT_TAIL_ERROR) - queryClient.invalidateQueries({ queryKey: taskKeys.detail(chatHistory.id) }) - return - } - - if (appliedChatIdRef.current === chatHistory.id) return - appliedChatIdRef.current = chatHistory.id const mappedMessages = chatHistory.messages.map(mapStoredMessage) const shouldPreserveActiveStreamingMessage = @@ -1436,6 +1412,25 @@ export function useChat( sendingRef.current = false setIsSending(false) + setMessages((prev) => + prev.map((msg) => { + if (!msg.contentBlocks?.some((b) => b.toolCall?.status === 'executing')) return msg + const updated = msg.contentBlocks!.map((block) => { + if (block.toolCall?.status !== 'executing') return block + return { + ...block, + toolCall: { + ...block.toolCall, + status: 'cancelled' as const, + displayTitle: 'Stopped by user', + }, + } + }) + updated.push({ type: 'stopped' as const }) + return { ...msg, contentBlocks: updated } + }) + ) + if (sid) { fetch('/api/copilot/chat/abort', { method: 'POST', @@ -1459,25 +1454,6 @@ export function useChat( streamingFileRef.current = null setResources((rs) => rs.filter((resource) => resource.id !== 'streaming-file')) - setMessages((prev) => - prev.map((msg) => { - if (!msg.contentBlocks?.some((b) => b.toolCall?.status === 'executing')) return msg - const updated = msg.contentBlocks!.map((block) => { - if (block.toolCall?.status !== 'executing') return block - return { - ...block, - toolCall: { - ...block.toolCall, - status: 'cancelled' as const, - displayTitle: 'Stopped by user', - }, - } - }) - updated.push({ type: 'stopped' as const }) - return { ...msg, contentBlocks: updated } - }) - ) - const execState = useExecutionStore.getState() const consoleStore = useTerminalConsoleStore.getState() for (const [workflowId, wfExec] of execState.workflowExecutions) { diff --git a/apps/sim/lib/copilot/orchestrator/tool-executor/integration-tools.ts b/apps/sim/lib/copilot/orchestrator/tool-executor/integration-tools.ts index fe377312e5..9beaeacdf2 100644 --- a/apps/sim/lib/copilot/orchestrator/tool-executor/integration-tools.ts +++ b/apps/sim/lib/copilot/orchestrator/tool-executor/integration-tools.ts @@ -212,9 +212,14 @@ export async function executeIntegrationToolDirect( } const buffer = await downloadWorkspaceFile(record) totalSize += buffer.length + const textContent = buffer.toString('utf-8') sandboxFiles.push({ path: getSandboxWorkspaceFilePath(record), - content: buffer.toString('utf-8'), + content: textContent, + }) + sandboxFiles.push({ + path: `/home/user/${record.name}`, + content: textContent, }) } } diff --git a/apps/sim/lib/copilot/tools/server/table/user-table.ts b/apps/sim/lib/copilot/tools/server/table/user-table.ts index 616c13c9f7..fb8b1eb1c7 100644 --- a/apps/sim/lib/copilot/tools/server/table/user-table.ts +++ b/apps/sim/lib/copilot/tools/server/table/user-table.ts @@ -41,6 +41,45 @@ const SCHEMA_SAMPLE_SIZE = 100 type ColumnType = 'string' | 'number' | 'boolean' | 'date' | 'json' +function sanitizeColumnName(raw: string): string { + let name = raw + .trim() + .replace(/[^a-zA-Z0-9_]/g, '_') + .replace(/_+/g, '_') + .replace(/^_|_$/g, '') + if (!name || /^\d/.test(name)) name = `col_${name}` + return name +} + +function sanitizeHeaders( + headers: string[], + rows: Record[] +): { headers: string[]; rows: Record[] } { + const renamed = new Map() + const seen = new Set() + + for (const raw of headers) { + let safe = sanitizeColumnName(raw) + while (seen.has(safe)) safe = `${safe}_` + seen.add(safe) + renamed.set(raw, safe) + } + + const noChange = headers.every((h) => renamed.get(h) === h) + if (noChange) return { headers, rows } + + return { + headers: headers.map((h) => renamed.get(h)!), + rows: rows.map((row) => { + const out: Record = {} + for (const [raw, safe] of renamed) { + if (raw in row) out[safe] = row[raw] + } + return out + }), + } +} + async function resolveWorkspaceFile( fileReference: string, workspaceId: string @@ -87,7 +126,7 @@ async function parseJsonRows( } for (const key of Object.keys(row)) headerSet.add(key) } - return { headers: [...headerSet], rows: parsed } + return sanitizeHeaders([...headerSet], parsed) } async function parseCsvRows( @@ -110,7 +149,7 @@ async function parseCsvRows( if (headers.length === 0) { throw new Error('CSV file has no headers') } - return { headers, rows: parsed } + return sanitizeHeaders(headers, parsed) } function inferColumnType(values: unknown[]): ColumnType { diff --git a/apps/sim/lib/copilot/tools/server/visualization/generate-visualization.ts b/apps/sim/lib/copilot/tools/server/visualization/generate-visualization.ts index 9be7fac1c4..b1eaf61a16 100644 --- a/apps/sim/lib/copilot/tools/server/visualization/generate-visualization.ts +++ b/apps/sim/lib/copilot/tools/server/visualization/generate-visualization.ts @@ -98,9 +98,14 @@ async function collectSandboxFiles( } const buffer = await downloadWorkspaceFile(record) totalSize += buffer.length + const textContent = buffer.toString('utf-8') sandboxFiles.push({ path: getSandboxWorkspaceFilePath(record), - content: buffer.toString('utf-8'), + content: textContent, + }) + sandboxFiles.push({ + path: `/home/user/${record.name}`, + content: textContent, }) } } From 8ca2ff779767f58f353914995c2a351a63f663d3 Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Tue, 24 Mar 2026 02:14:10 -0700 Subject: [PATCH 9/9] Truncate log --- .../copilot/orchestrator/sse/handlers/tool-execution.ts | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts b/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts index 84f09010a1..4d5196d961 100644 --- a/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts +++ b/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts @@ -599,10 +599,17 @@ export async function executeToolAndReport( toolCall.endTime = Date.now() if (result.success) { + const raw = result.output + const preview = + typeof raw === 'string' + ? raw.slice(0, 200) + : raw && typeof raw === 'object' + ? JSON.stringify(raw).slice(0, 200) + : undefined logger.info('Tool execution succeeded', { toolCallId: toolCall.id, toolName: toolCall.name, - output: result.output, + outputPreview: preview, }) } else { logger.warn('Tool execution failed', {