From fbf0724162d5ced38105d384049a9b0c479ff9ce Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Wed, 25 Mar 2026 16:17:06 -0700 Subject: [PATCH 1/8] fix(client): network drops reconnecting behaviour --- .../mothership-chat/mothership-chat.tsx | 9 +- .../app/workspace/[workspaceId]/home/home.tsx | 2 + .../[workspaceId]/home/hooks/use-chat.ts | 642 ++++++++++++++---- .../w/[workflowId]/components/panel/panel.tsx | 2 + apps/sim/hooks/queries/tasks.ts | 5 +- apps/sim/lib/copilot/chat-streaming.ts | 47 +- 6 files changed, 565 insertions(+), 142 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/home/components/mothership-chat/mothership-chat.tsx b/apps/sim/app/workspace/[workspaceId]/home/components/mothership-chat/mothership-chat.tsx index e3d32331d25..abc9e7272bd 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/components/mothership-chat/mothership-chat.tsx +++ b/apps/sim/app/workspace/[workspaceId]/home/components/mothership-chat/mothership-chat.tsx @@ -23,6 +23,7 @@ import type { ChatContext } from '@/stores/panel' interface MothershipChatProps { messages: ChatMessage[] isSending: boolean + isReconnecting?: boolean onSubmit: ( text: string, fileAttachments?: FileAttachmentForApi[], @@ -71,6 +72,7 @@ const LAYOUT_STYLES = { export function MothershipChat({ messages, isSending, + isReconnecting = false, onSubmit, onStopGeneration, messageQueue, @@ -88,7 +90,8 @@ export function MothershipChat({ className, }: MothershipChatProps) { const styles = LAYOUT_STYLES[layout] - const { ref: scrollContainerRef, scrollToBottom } = useAutoScroll(isSending) + const isStreamActive = isSending || isReconnecting + const { ref: scrollContainerRef, scrollToBottom } = useAutoScroll(isStreamActive) const hasMessages = messages.length > 0 const initialScrollDoneRef = useRef(false) @@ -131,7 +134,7 @@ export function MothershipChat({ msg.content ?? '' ) const isLastAssistant = index === messages.length - 1 - const isThisStreaming = isSending && isLastAssistant + const isThisStreaming = isStreamActive && isLastAssistant if (!hasAnyBlocks && !msg.content?.trim() && isThisStreaming) { return @@ -175,7 +178,7 @@ export function MothershipChat({ /> = { const DEPLOY_TOOL_NAMES = new Set(['deploy_api', 'deploy_chat', 'deploy_mcp', 'redeploy']) const RECONNECT_TAIL_ERROR = 'Live reconnect failed before the stream finished. The latest response may be incomplete.' +const TERMINAL_STREAM_STATUSES = new Set(['complete', 'error', 'cancelled']) + +interface StreamEventEnvelope { + eventId: number + streamId: string + event: Record +} + +interface StreamBatchResponse { + success: boolean + events: StreamEventEnvelope[] + status: string +} + +interface StreamTerminationResult { + sawStreamError: boolean + sawDoneEvent: boolean + lastEventId: number +} + +interface StreamProcessingOptions { + expectedGen?: number + initialLastEventId?: number + preserveExistingState?: boolean +} + +interface AttachToStreamOptions { + streamId: string + assistantId: string + expectedGen: number + snapshot?: StreamSnapshot | null + initialLastEventId?: number +} + +interface AttachToStreamResult { + aborted: boolean + error: boolean +} + +interface PendingStreamRecovery { + streamId: string + snapshot?: StreamSnapshot | null +} + +function isTerminalStreamStatus(status?: string | null): boolean { + return Boolean(status && TERMINAL_STREAM_STATUSES.has(status)) +} + +function isActiveStreamConflictError(input: unknown): boolean { + if (typeof input !== 'string') return false + return input.includes('A response is already in progress for this chat') +} + +function buildReplayStream(events: StreamEventEnvelope[]): ReadableStream { + const encoder = new TextEncoder() + return new ReadableStream({ + start(controller) { + if (events.length > 0) { + const payload = events + .map( + (entry) => + `data: ${JSON.stringify({ ...entry.event, eventId: entry.eventId, streamId: entry.streamId })}\n\n` + ) + .join('') + controller.enqueue(encoder.encode(payload)) + } + controller.close() + }, + }) +} function mapStoredBlock(block: TaskStoredContentBlock): ContentBlock { const mapped: ContentBlock = { @@ -345,9 +421,13 @@ export function useChat( ( reader: ReadableStreamDefaultReader, assistantId: string, - expectedGen?: number - ) => Promise - >(async () => false) + options?: StreamProcessingOptions + ) => Promise + >(async () => ({ + sawStreamError: false, + sawDoneEvent: false, + lastEventId: 0, + })) const finalizeRef = useRef<(options?: { error?: boolean }) => void>(() => {}) const abortControllerRef = useRef(null) @@ -359,10 +439,12 @@ export function useChat( const appliedChatIdRef = useRef(undefined) const pendingUserMsgRef = useRef<{ id: string; content: string } | null>(null) const streamIdRef = useRef(undefined) + const lastEventIdRef = useRef(0) const sendingRef = useRef(false) const streamGenRef = useRef(0) const streamingContentRef = useRef('') const streamingBlocksRef = useRef([]) + const clientExecutionStartedRef = useRef>(new Set()) const executionStream = useExecutionStream() const isHomePage = pathname.endsWith('/home') @@ -420,6 +502,8 @@ export function useChat( abortControllerRef.current = null sendingRef.current = false setIsSending(false) + setIsReconnecting(false) + lastEventIdRef.current = 0 if (abandonedChatId) { queryClient.invalidateQueries({ queryKey: taskKeys.detail(abandonedChatId) }) } @@ -441,6 +525,8 @@ export function useChat( setStreamingFile(null) streamingFileRef.current = null setMessageQueue([]) + lastEventIdRef.current = 0 + clientExecutionStartedRef.current.clear() }, [initialChatId, queryClient]) useEffect(() => { @@ -461,180 +547,338 @@ export function useChat( setStreamingFile(null) streamingFileRef.current = null setMessageQueue([]) + lastEventIdRef.current = 0 + clientExecutionStartedRef.current.clear() }, [isHomePage]) - useEffect(() => { - if (!chatHistory || appliedChatIdRef.current === chatHistory.id) return + const fetchStreamBatch = useCallback( + async ( + streamId: string, + fromEventId: number, + signal?: AbortSignal + ): Promise => { + const response = await fetch( + `${COPILOT_CHAT_STREAM_API_PATH}?streamId=${encodeURIComponent(streamId)}&from=${fromEventId}&batch=true`, + { signal } + ) + + if (!response.ok) { + throw new Error(`Stream resume batch failed: ${response.status}`) + } - const activeStreamId = chatHistory.activeStreamId - const snapshot = chatHistory.streamSnapshot - appliedChatIdRef.current = chatHistory.id - const mappedMessages = chatHistory.messages.map(mapStoredMessage) - const shouldPreserveActiveStreamingMessage = - sendingRef.current && Boolean(activeStreamId) && activeStreamId === streamIdRef.current - - if (shouldPreserveActiveStreamingMessage) { - setMessages((prev) => { - const localStreamingAssistant = prev[prev.length - 1] - if (localStreamingAssistant?.role !== 'assistant') { - return mappedMessages - } + return response.json() + }, + [] + ) - const nextMessages = - mappedMessages[mappedMessages.length - 1]?.role === 'assistant' - ? mappedMessages.slice(0, -1) - : mappedMessages + const attachToExistingStream = useCallback( + async ({ + streamId, + assistantId, + expectedGen, + snapshot, + initialLastEventId = 0, + }: AttachToStreamOptions): Promise => { + let latestEventId = initialLastEventId + let seedEvents = snapshot?.events ?? [] + let streamStatus = snapshot?.status ?? 'unknown' + let attachAttempt = 0 - return [...nextMessages, localStreamingAssistant] + setIsSending(true) + setIsReconnecting(true) + setError(null) + + logger.info('Attaching to existing stream', { + streamId, + expectedGen, + initialLastEventId, + seedEventCount: seedEvents.length, + streamStatus, }) - } else { - setMessages(mappedMessages) - } - if (chatHistory.resources.some((r) => r.id === 'streaming-file')) { - fetch('/api/copilot/chat/resources', { - method: 'DELETE', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ - chatId: chatHistory.id, - resourceType: 'file', - resourceId: 'streaming-file', - }), - }).catch(() => {}) - } + try { + while (streamGenRef.current === expectedGen) { + if (seedEvents.length > 0) { + const replayResult = await processSSEStreamRef.current( + buildReplayStream(seedEvents).getReader(), + assistantId, + { + expectedGen, + initialLastEventId: latestEventId, + preserveExistingState: true, + } + ) + latestEventId = Math.max( + replayResult.lastEventId, + seedEvents[seedEvents.length - 1]?.eventId ?? latestEventId + ) + lastEventIdRef.current = latestEventId + seedEvents = [] + + if (replayResult.sawStreamError) { + logger.warn('Replay stream ended with error event', { streamId, latestEventId }) + return { aborted: false, error: true } + } + } + + if (isTerminalStreamStatus(streamStatus)) { + logger.info('Existing stream already reached terminal status', { + streamId, + latestEventId, + streamStatus, + }) + if (streamStatus === 'error') { + setError(RECONNECT_TAIL_ERROR) + } + return { aborted: false, error: streamStatus === 'error' } + } + + const activeAbortController = abortControllerRef.current + if (!activeAbortController) { + return { aborted: true, error: false } + } + + logger.info('Opening live stream tail', { + streamId, + fromEventId: latestEventId, + attempt: attachAttempt, + }) + + const sseRes = await fetch( + `${COPILOT_CHAT_STREAM_API_PATH}?streamId=${encodeURIComponent(streamId)}&from=${latestEventId}`, + { signal: activeAbortController.signal } + ) + if (!sseRes.ok || !sseRes.body) { + throw new Error(RECONNECT_TAIL_ERROR) + } - const persistedResources = chatHistory.resources.filter((r) => r.id !== 'streaming-file') - if (persistedResources.length > 0) { - setResources(persistedResources) - setActiveResourceId(persistedResources[persistedResources.length - 1].id) + setIsReconnecting(false) + + const liveResult = await processSSEStreamRef.current( + sseRes.body.getReader(), + assistantId, + { + expectedGen, + initialLastEventId: latestEventId, + preserveExistingState: true, + } + ) + latestEventId = Math.max(latestEventId, liveResult.lastEventId) + lastEventIdRef.current = latestEventId + + if (liveResult.sawStreamError) { + logger.warn('Live stream tail ended with error event', { streamId, latestEventId }) + return { aborted: false, error: true } + } + + attachAttempt += 1 + setIsReconnecting(true) + + logger.warn('Live stream ended without terminal event, fetching replay batch', { + streamId, + latestEventId, + attempt: attachAttempt, + }) + + const batch = await fetchStreamBatch( + streamId, + latestEventId, + activeAbortController.signal + ) + seedEvents = batch.events + streamStatus = batch.status - for (const resource of persistedResources) { - if (resource.type !== 'workflow') continue - ensureWorkflowInRegistry(resource.id, resource.title, workspaceId) + if (batch.events.length > 0) { + latestEventId = batch.events[batch.events.length - 1].eventId + lastEventIdRef.current = latestEventId + } + + logger.info('Fetched replay batch after non-terminal stream close', { + streamId, + latestEventId, + streamStatus, + eventCount: batch.events.length, + attempt: attachAttempt, + }) + + if (batch.events.length === 0 && batch.status === 'unknown') { + throw new Error(RECONNECT_TAIL_ERROR) + } + } + + return { aborted: true, error: false } + } catch (err) { + if (err instanceof Error && err.name === 'AbortError') { + return { aborted: true, error: false } + } + + logger.warn('Failed to attach to existing stream', { + streamId, + latestEventId, + error: err instanceof Error ? err.message : String(err), + }) + setError(err instanceof Error ? err.message : RECONNECT_TAIL_ERROR) + return { aborted: false, error: true } + } finally { + setIsReconnecting(false) } - } else if (chatHistory.resources.some((r) => r.id === 'streaming-file')) { - setResources([]) - setActiveResourceId(null) - } + }, + [fetchStreamBatch] + ) + + const applyChatHistorySnapshot = useCallback( + (history: TaskChatHistory, options?: { preserveActiveStreamingMessage?: boolean }) => { + const preserveActiveStreamingMessage = options?.preserveActiveStreamingMessage ?? false + const activeStreamId = history.activeStreamId + appliedChatIdRef.current = history.id + + const mappedMessages = history.messages.map(mapStoredMessage) + const shouldPreserveActiveStreamingMessage = + preserveActiveStreamingMessage && + sendingRef.current && + Boolean(activeStreamId) && + activeStreamId === streamIdRef.current + + if (shouldPreserveActiveStreamingMessage) { + setMessages((prev) => { + const localStreamingAssistant = prev[prev.length - 1] + if (localStreamingAssistant?.role !== 'assistant') { + return mappedMessages + } + + const nextMessages = + mappedMessages[mappedMessages.length - 1]?.role === 'assistant' + ? mappedMessages.slice(0, -1) + : mappedMessages + + return [...nextMessages, localStreamingAssistant] + }) + } else { + setMessages(mappedMessages) + } + + if (history.resources.some((r) => r.id === 'streaming-file')) { + fetch('/api/copilot/chat/resources', { + method: 'DELETE', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + chatId: history.id, + resourceType: 'file', + resourceId: 'streaming-file', + }), + }).catch(() => {}) + } + + const persistedResources = history.resources.filter((r) => r.id !== 'streaming-file') + if (persistedResources.length > 0) { + setResources(persistedResources) + setActiveResourceId(persistedResources[persistedResources.length - 1].id) + + for (const resource of persistedResources) { + if (resource.type !== 'workflow') continue + ensureWorkflowInRegistry(resource.id, resource.title, workspaceId) + } + } else if (history.resources.some((r) => r.id === 'streaming-file')) { + setResources([]) + setActiveResourceId(null) + } + }, + [workspaceId] + ) + + const preparePendingStreamRecovery = useCallback( + async (chatId: string): Promise => { + const latestHistory = await fetchChatHistory(chatId) + queryClient.setQueryData(taskKeys.detail(chatId), latestHistory) + applyChatHistorySnapshot(latestHistory) + + if (!latestHistory.activeStreamId) { + return null + } + + return { + streamId: latestHistory.activeStreamId, + snapshot: latestHistory.streamSnapshot, + } + }, + [applyChatHistorySnapshot, queryClient] + ) + + useEffect(() => { + if (!chatHistory || appliedChatIdRef.current === chatHistory.id) return + + const activeStreamId = chatHistory.activeStreamId + const snapshot = chatHistory.streamSnapshot + applyChatHistorySnapshot(chatHistory, { preserveActiveStreamingMessage: true }) if (activeStreamId && !sendingRef.current) { const gen = ++streamGenRef.current const abortController = new AbortController() abortControllerRef.current = abortController streamIdRef.current = activeStreamId + lastEventIdRef.current = snapshot?.events?.[snapshot.events.length - 1]?.eventId ?? 0 sendingRef.current = true - setIsReconnecting(true) const assistantId = crypto.randomUUID() const reconnect = async () => { - let reconnectFailed = false try { - const encoder = new TextEncoder() - - const batchEvents = snapshot?.events ?? [] - const streamStatus = snapshot?.status ?? '' - - if (batchEvents.length === 0 && streamStatus === 'unknown') { - reconnectFailed = true - setError(RECONNECT_TAIL_ERROR) - return - } - - setIsSending(true) - setIsReconnecting(false) - - const lastEventId = - batchEvents.length > 0 ? batchEvents[batchEvents.length - 1].eventId : 0 - const isStreamDone = - streamStatus === 'complete' || streamStatus === 'error' || streamStatus === 'cancelled' - - const combinedStream = new ReadableStream({ - async start(controller) { - if (batchEvents.length > 0) { - const sseText = batchEvents - .map((e) => `data: ${JSON.stringify(e.event)}\n`) - .join('\n') - controller.enqueue(encoder.encode(`${sseText}\n`)) - } - - if (!isStreamDone) { - try { - const sseRes = await fetch( - `/api/copilot/chat/stream?streamId=${activeStreamId}&from=${lastEventId}`, - { signal: abortController.signal } - ) - if (!sseRes.ok || !sseRes.body) { - reconnectFailed = true - logger.warn('SSE tail reconnect returned no readable body', { - status: sseRes.status, - streamId: activeStreamId, - }) - setError(RECONNECT_TAIL_ERROR) - } else { - const reader = sseRes.body.getReader() - while (true) { - const { done, value } = await reader.read() - if (done) break - controller.enqueue(value) - } - } - } catch (err) { - if (!(err instanceof Error && err.name === 'AbortError')) { - reconnectFailed = true - logger.warn('SSE tail failed during reconnect', err) - setError(RECONNECT_TAIL_ERROR) - } - } - } - - controller.close() - }, - }) - - const hadStreamError = await processSSEStreamRef.current( - combinedStream.getReader(), + const result = await attachToExistingStream({ + streamId: activeStreamId, assistantId, - gen - ) - if (hadStreamError) { - reconnectFailed = true + expectedGen: gen, + snapshot, + initialLastEventId: lastEventIdRef.current, + }) + if (streamGenRef.current === gen && !result.aborted) { + finalizeRef.current(result.error ? { error: true } : undefined) } } catch (err) { - if (err instanceof Error && err.name === 'AbortError') return - reconnectFailed = true } finally { - setIsReconnecting(false) - if (streamGenRef.current === gen) { - finalizeRef.current(reconnectFailed ? { error: true } : undefined) + if (abortControllerRef.current === abortController) { + abortControllerRef.current = null } } } reconnect() } - }, [chatHistory, workspaceId, queryClient]) + }, [applyChatHistorySnapshot, attachToExistingStream, chatHistory, queryClient]) const processSSEStream = useCallback( async ( reader: ReadableStreamDefaultReader, assistantId: string, - expectedGen?: number + options?: StreamProcessingOptions ) => { + const { expectedGen, initialLastEventId = 0, preserveExistingState = false } = options ?? {} const decoder = new TextDecoder() streamReaderRef.current = reader let buffer = '' - const blocks: ContentBlock[] = [] + const blocks: ContentBlock[] = preserveExistingState ? [...streamingBlocksRef.current] : [] const toolMap = new Map() const toolArgsMap = new Map>() - const clientExecutionStarted = new Set() + const clientExecutionStarted = clientExecutionStartedRef.current let activeSubagent: string | undefined let activeCompactionId: string | undefined - let runningText = '' + let runningText = preserveExistingState ? streamingContentRef.current : '' let lastContentSource: 'main' | 'subagent' | null = null let streamRequestId: string | undefined + let lastEventId = initialLastEventId + let sawDoneEvent = false - streamingContentRef.current = '' - streamingBlocksRef.current = [] + if (!preserveExistingState) { + streamingContentRef.current = '' + streamingBlocksRef.current = [] + } + + for (const [index, block] of blocks.entries()) { + if (block.type === 'tool_call' && block.toolCall?.id) { + toolMap.set(block.toolCall.id, index) + if (block.toolCall.params) { + toolArgsMap.set(block.toolCall.id, block.toolCall.params) + } + } + } const ensureTextBlock = (): ContentBlock => { const last = blocks[blocks.length - 1] @@ -716,6 +960,14 @@ export function useChat( continue } + if (typeof (parsed as SSEPayload & { eventId?: unknown }).eventId === 'number') { + lastEventId = Math.max( + lastEventId, + (parsed as SSEPayload & { eventId: number }).eventId + ) + lastEventIdRef.current = lastEventId + } + logger.debug('SSE event received', parsed) switch (parsed.type) { case 'chat_id': { @@ -1167,6 +1419,10 @@ export function useChat( appendInlineErrorTag(buildInlineErrorTag(parsed)) break } + case 'done': { + sawDoneEvent = true + break + } } } } @@ -1175,7 +1431,11 @@ export function useChat( streamReaderRef.current = null } } - return sawStreamError + return { + sawStreamError, + sawDoneEvent, + lastEventId, + } }, [workspaceId, queryClient, addResource, removeResource] ) @@ -1307,6 +1567,8 @@ export function useChat( pendingUserMsgRef.current = { id: userMessageId, content: message } streamIdRef.current = userMessageId + lastEventIdRef.current = 0 + clientExecutionStartedRef.current.clear() const storedAttachments: TaskStoredFileAttachment[] | undefined = fileAttachments && fileAttachments.length > 0 @@ -1339,6 +1601,10 @@ export function useChat( } const userAttachments = storedAttachments?.map(toDisplayAttachment) + const previousMessages = messagesRef.current + const previousChatHistory = requestChatId + ? queryClient.getQueryData(taskKeys.detail(requestChatId)) + : undefined const messageContexts = contexts?.map((c) => ({ kind: c.kind, @@ -1402,20 +1668,120 @@ export function useChat( if (!response.body) throw new Error('No response body') - const hadStreamError = await processSSEStream(response.body.getReader(), assistantId, gen) + const termination = await processSSEStream(response.body.getReader(), assistantId, { + expectedGen: gen, + }) if (streamGenRef.current === gen) { - finalize(hadStreamError ? { error: true } : undefined) + if (termination.sawStreamError) { + finalize({ error: true }) + return + } + + const batch = await fetchStreamBatch(userMessageId, termination.lastEventId) + if (isTerminalStreamStatus(batch.status)) { + finalize(batch.status === 'error' ? { error: true } : undefined) + return + } + + logger.warn( + 'Primary stream ended without terminal event, attempting in-place reconnect', + { + streamId: userMessageId, + lastEventId: termination.lastEventId, + streamStatus: batch.status, + sawDoneEvent: termination.sawDoneEvent, + } + ) + + const reconnectResult = await attachToExistingStream({ + streamId: userMessageId, + assistantId, + expectedGen: gen, + snapshot: { + events: batch.events, + status: batch.status, + }, + initialLastEventId: + batch.events[batch.events.length - 1]?.eventId ?? termination.lastEventId, + }) + + if (streamGenRef.current === gen && !reconnectResult.aborted) { + finalize(reconnectResult.error ? { error: true } : undefined) + } } } catch (err) { if (err instanceof Error && err.name === 'AbortError') return - setError(err instanceof Error ? err.message : 'Failed to send message') + const errorMessage = err instanceof Error ? err.message : 'Failed to send message' + if (requestChatId && isActiveStreamConflictError(errorMessage)) { + logger.info('Active stream conflict detected while sending message; reattaching', { + chatId: requestChatId, + attemptedStreamId: userMessageId, + }) + + if (previousChatHistory) { + queryClient.setQueryData(taskKeys.detail(requestChatId), previousChatHistory) + } + setMessages(previousMessages) + const queuedMessage: QueuedMessage = { + id: crypto.randomUUID(), + content: message, + fileAttachments, + contexts, + } + const nextQueue = [...messageQueueRef.current, queuedMessage] + messageQueueRef.current = nextQueue + setMessageQueue(nextQueue) + + try { + const pendingRecovery = await preparePendingStreamRecovery(requestChatId) + if (!pendingRecovery) { + setError(errorMessage) + if (streamGenRef.current === gen) { + finalize({ error: true }) + } + return + } + + streamIdRef.current = pendingRecovery.streamId + lastEventIdRef.current = + pendingRecovery.snapshot?.events?.[pendingRecovery.snapshot.events.length - 1] + ?.eventId ?? 0 + + const reconnectResult = await attachToExistingStream({ + streamId: pendingRecovery.streamId, + assistantId, + expectedGen: gen, + snapshot: pendingRecovery.snapshot, + initialLastEventId: lastEventIdRef.current, + }) + + if (streamGenRef.current === gen && !reconnectResult.aborted) { + finalize(reconnectResult.error ? { error: true } : undefined) + } + return + } catch (recoveryError) { + logger.warn('Failed to recover active stream after conflict', { + chatId: requestChatId, + error: recoveryError instanceof Error ? recoveryError.message : String(recoveryError), + }) + } + } + + setError(errorMessage) if (streamGenRef.current === gen) { finalize({ error: true }) } return } }, - [workspaceId, queryClient, processSSEStream, finalize] + [ + workspaceId, + queryClient, + processSSEStream, + finalize, + attachToExistingStream, + preparePendingStreamRecovery, + ] ) sendMessageRef.current = sendMessage @@ -1434,6 +1800,8 @@ export function useChat( abortControllerRef.current = null sendingRef.current = false setIsSending(false) + setIsReconnecting(false) + lastEventIdRef.current = 0 setMessages((prev) => prev.map((msg) => { @@ -1552,6 +1920,8 @@ export function useChat( abortControllerRef.current = null streamGenRef.current++ sendingRef.current = false + lastEventIdRef.current = 0 + clientExecutionStartedRef.current.clear() } }, []) diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/panel.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/panel.tsx index f5a65ab0760..0ea9cb6c51a 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/panel.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/panel.tsx @@ -318,6 +318,7 @@ export const Panel = memo(function Panel() { const { messages: copilotMessages, isSending: copilotIsSending, + isReconnecting: copilotIsReconnecting, sendMessage: copilotSendMessage, stopGeneration: copilotStopGeneration, resolvedChatId: copilotResolvedChatId, @@ -812,6 +813,7 @@ export const Panel = memo(function Panel() { className='min-h-0 flex-1' messages={copilotMessages} isSending={copilotIsSending} + isReconnecting={copilotIsReconnecting} onSubmit={handleCopilotSubmit} onStopGeneration={copilotStopGeneration} messageQueue={copilotMessageQueue} diff --git a/apps/sim/hooks/queries/tasks.ts b/apps/sim/hooks/queries/tasks.ts index 8bc3c10c536..6e071b60588 100644 --- a/apps/sim/hooks/queries/tasks.ts +++ b/apps/sim/hooks/queries/tasks.ts @@ -129,7 +129,10 @@ export function useTasks(workspaceId?: string) { }) } -async function fetchChatHistory(chatId: string, signal?: AbortSignal): Promise { +export async function fetchChatHistory( + chatId: string, + signal?: AbortSignal +): Promise { const response = await fetch(`/api/copilot/chat?chatId=${chatId}`, { signal }) if (!response.ok) { diff --git a/apps/sim/lib/copilot/chat-streaming.ts b/apps/sim/lib/copilot/chat-streaming.ts index 6d090a4866b..b3d465667d6 100644 --- a/apps/sim/lib/copilot/chat-streaming.ts +++ b/apps/sim/lib/copilot/chat-streaming.ts @@ -294,6 +294,21 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS return new ReadableStream({ async start(controller) { const encoder = new TextEncoder() + const markClientDisconnected = (reason: string) => { + if (clientDisconnected) return + clientDisconnected = true + logger.info( + appendCopilotLogContext('Client disconnected from live SSE stream', { + requestId, + messageId, + }), + { + streamId, + runId, + reason, + } + ) + } await resetStreamBuffer(streamId) await setStreamMeta(streamId, { status: 'active', userId, executionId, runId }) @@ -381,7 +396,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS ) } } catch { - clientDisconnected = true + markClientDisconnected('enqueue_failed') } } @@ -424,7 +439,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS try { controller.enqueue(encoder.encode(': keepalive\n\n')) } catch { - clientDisconnected = true + markClientDisconnected('keepalive_failed') } }, 15_000) @@ -498,6 +513,18 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS await eventWriter.close() await setStreamMeta(streamId, { status: 'complete', userId, executionId, runId }) await updateRunStatus(runId, 'complete', { completedAt: new Date() }).catch(() => {}) + if (clientDisconnected) { + logger.info( + appendCopilotLogContext('Orchestration completed after client disconnect', { + requestId, + messageId, + }), + { + streamId, + runId, + } + ) + } } catch (error) { if (abortController.signal.aborted) { logger.error( @@ -544,6 +571,12 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS error: errorMessage, }).catch(() => {}) } finally { + logger.info(appendCopilotLogContext('Closing live SSE stream', { requestId, messageId }), { + streamId, + runId, + clientDisconnected, + aborted: abortController.signal.aborted, + }) clearInterval(keepaliveInterval) if (abortPoller) { clearInterval(abortPoller) @@ -566,6 +599,16 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS } }, cancel() { + logger.info( + appendCopilotLogContext('ReadableStream cancel received from client', { + requestId, + messageId, + }), + { + streamId, + runId, + } + ) clientDisconnected = true if (eventWriter) { eventWriter.flush().catch(() => {}) From bd3f7ab212641b47bde40cf76b32626340d2dbe7 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Wed, 25 Mar 2026 16:26:52 -0700 Subject: [PATCH 2/8] address bugbot comments --- .../app/workspace/[workspaceId]/home/hooks/use-chat.ts | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index d49e6fae13c..811b460320f 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -1582,6 +1582,9 @@ export function useChat( : undefined const requestChatId = selectedChatIdRef.current ?? chatIdRef.current + const previousChatHistory = requestChatId + ? queryClient.getQueryData(taskKeys.detail(requestChatId)) + : undefined if (requestChatId) { const cachedUserMsg: TaskStoredMessage = { id: userMessageId, @@ -1602,9 +1605,6 @@ export function useChat( const userAttachments = storedAttachments?.map(toDisplayAttachment) const previousMessages = messagesRef.current - const previousChatHistory = requestChatId - ? queryClient.getQueryData(taskKeys.detail(requestChatId)) - : undefined const messageContexts = contexts?.map((c) => ({ kind: c.kind, @@ -1678,6 +1678,9 @@ export function useChat( } const batch = await fetchStreamBatch(userMessageId, termination.lastEventId) + if (streamGenRef.current !== gen) { + return + } if (isTerminalStreamStatus(batch.status)) { finalize(batch.status === 'error' ? { error: true } : undefined) return From 635bd1abad180ba7ae89feb8e7441f1e9103f032 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Wed, 25 Mar 2026 16:30:32 -0700 Subject: [PATCH 3/8] address comments --- .../[workspaceId]/home/hooks/use-chat.ts | 29 ++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index 811b460320f..bc84b1c20f5 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -834,6 +834,29 @@ export function useChat( finalizeRef.current(result.error ? { error: true } : undefined) } } catch (err) { + if (err instanceof Error && err.name === 'AbortError') return + logger.warn('Unexpected error during reconnect', { + streamId: activeStreamId, + chatId: chatHistory.id, + error: err instanceof Error ? err.message : String(err), + }) + if (streamGenRef.current === gen) { + try { + finalizeRef.current({ error: true }) + } catch (finalizeError) { + logger.error('Reconnect fallback finalize failed', { + streamId: activeStreamId, + chatId: chatHistory.id, + error: + finalizeError instanceof Error ? finalizeError.message : String(finalizeError), + }) + sendingRef.current = false + setIsSending(false) + setIsReconnecting(false) + abortControllerRef.current = null + setError('Failed to reconnect to the active stream') + } + } } finally { if (abortControllerRef.current === abortController) { abortControllerRef.current = null @@ -1677,7 +1700,11 @@ export function useChat( return } - const batch = await fetchStreamBatch(userMessageId, termination.lastEventId) + const batch = await fetchStreamBatch( + userMessageId, + termination.lastEventId, + abortController.signal + ) if (streamGenRef.current !== gen) { return } From a3c9d3b3d4c0603ea9f43df9abb871f8c5039779 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Wed, 25 Mar 2026 16:43:03 -0700 Subject: [PATCH 4/8] address queued message conflicts during retries --- .../[workspaceId]/home/hooks/use-chat.ts | 64 +++++++++++++++++-- 1 file changed, 57 insertions(+), 7 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index bc84b1c20f5..c14fb30dde2 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -415,6 +415,9 @@ export function useChat( const [messageQueue, setMessageQueue] = useState([]) const messageQueueRef = useRef([]) messageQueueRef.current = messageQueue + const [pendingRecoveryMessage, setPendingRecoveryMessage] = useState(null) + const pendingRecoveryMessageRef = useRef(null) + pendingRecoveryMessageRef.current = pendingRecoveryMessage const sendMessageRef = useRef(async () => {}) const processSSEStreamRef = useRef< @@ -1530,6 +1533,16 @@ export function useChat( const messagesRef = useRef(messages) messagesRef.current = messages + const visibleMessageQueue = useMemo( + () => + pendingRecoveryMessage + ? [ + pendingRecoveryMessage, + ...messageQueue.filter((msg) => msg.id !== pendingRecoveryMessage.id), + ] + : messageQueue, + [messageQueue, pendingRecoveryMessage] + ) const finalize = useCallback( (options?: { error?: boolean }) => { @@ -1550,6 +1563,21 @@ export function useChat( return } + const recoveryMessage = pendingRecoveryMessageRef.current + if (recoveryMessage) { + setPendingRecoveryMessage(null) + const gen = streamGenRef.current + queueMicrotask(() => { + if (streamGenRef.current !== gen) return + sendMessageRef.current( + recoveryMessage.content, + recoveryMessage.fileAttachments, + recoveryMessage.contexts + ) + }) + return + } + const next = messageQueueRef.current[0] if (next) { setMessageQueue((prev) => prev.filter((m) => m.id !== next.id)) @@ -1758,9 +1786,7 @@ export function useChat( fileAttachments, contexts, } - const nextQueue = [...messageQueueRef.current, queuedMessage] - messageQueueRef.current = nextQueue - setMessageQueue(nextQueue) + setPendingRecoveryMessage(queuedMessage) try { const pendingRecovery = await preparePendingStreamRecovery(requestChatId) @@ -1919,24 +1945,47 @@ export function useChat( }, [invalidateChatQueries, persistPartialResponse, executionStream]) const removeFromQueue = useCallback((id: string) => { + if (pendingRecoveryMessageRef.current?.id === id) { + pendingRecoveryMessageRef.current = null + setPendingRecoveryMessage(null) + return + } messageQueueRef.current = messageQueueRef.current.filter((m) => m.id !== id) setMessageQueue((prev) => prev.filter((m) => m.id !== id)) }, []) const sendNow = useCallback( async (id: string) => { - const msg = messageQueueRef.current.find((m) => m.id === id) + const recoveryMessage = pendingRecoveryMessageRef.current + const msg = + recoveryMessage?.id === id + ? recoveryMessage + : messageQueueRef.current.find((m) => m.id === id) if (!msg) return // Eagerly update ref so a rapid second click finds the message already gone - messageQueueRef.current = messageQueueRef.current.filter((m) => m.id !== id) + if (recoveryMessage?.id === id) { + pendingRecoveryMessageRef.current = null + setPendingRecoveryMessage(null) + } else { + messageQueueRef.current = messageQueueRef.current.filter((m) => m.id !== id) + } await stopGeneration() - setMessageQueue((prev) => prev.filter((m) => m.id !== id)) + if (recoveryMessage?.id !== id) { + setMessageQueue((prev) => prev.filter((m) => m.id !== id)) + } await sendMessage(msg.content, msg.fileAttachments, msg.contexts) }, [stopGeneration, sendMessage] ) const editQueuedMessage = useCallback((id: string): QueuedMessage | undefined => { + const recoveryMessage = pendingRecoveryMessageRef.current + if (recoveryMessage?.id === id) { + pendingRecoveryMessageRef.current = null + setPendingRecoveryMessage(null) + return recoveryMessage + } + const msg = messageQueueRef.current.find((m) => m.id === id) if (!msg) return undefined messageQueueRef.current = messageQueueRef.current.filter((m) => m.id !== id) @@ -1952,6 +2001,7 @@ export function useChat( sendingRef.current = false lastEventIdRef.current = 0 clientExecutionStartedRef.current.clear() + pendingRecoveryMessageRef.current = null } }, []) @@ -1969,7 +2019,7 @@ export function useChat( addResource, removeResource, reorderResources, - messageQueue, + messageQueue: visibleMessageQueue, removeFromQueue, sendNow, editQueuedMessage, From 8182d8419d9b6fae02f96a69ac832ee5929968d3 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Wed, 25 Mar 2026 16:52:40 -0700 Subject: [PATCH 5/8] fix more review comments --- .../[workspaceId]/home/hooks/use-chat.ts | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index c14fb30dde2..e3ed092492c 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -507,6 +507,8 @@ export function useChat( setIsSending(false) setIsReconnecting(false) lastEventIdRef.current = 0 + pendingRecoveryMessageRef.current = null + setPendingRecoveryMessage(null) if (abandonedChatId) { queryClient.invalidateQueries({ queryKey: taskKeys.detail(abandonedChatId) }) } @@ -530,6 +532,8 @@ export function useChat( setMessageQueue([]) lastEventIdRef.current = 0 clientExecutionStartedRef.current.clear() + pendingRecoveryMessageRef.current = null + setPendingRecoveryMessage(null) }, [initialChatId, queryClient]) useEffect(() => { @@ -552,6 +556,8 @@ export function useChat( setMessageQueue([]) lastEventIdRef.current = 0 clientExecutionStartedRef.current.clear() + pendingRecoveryMessageRef.current = null + setPendingRecoveryMessage(null) }, [isHomePage]) const fetchStreamBatch = useCallback( @@ -704,7 +710,7 @@ export function useChat( attempt: attachAttempt, }) - if (batch.events.length === 0 && batch.status === 'unknown') { + if (batch.events.length === 0 && !isTerminalStreamStatus(batch.status)) { throw new Error(RECONNECT_TAIL_ERROR) } } @@ -1803,9 +1809,15 @@ export function useChat( pendingRecovery.snapshot?.events?.[pendingRecovery.snapshot.events.length - 1] ?.eventId ?? 0 + const rehydratedMessages = messagesRef.current + const lastAssistantMsg = [...rehydratedMessages] + .reverse() + .find((m) => m.role === 'assistant') + const recoveryAssistantId = lastAssistantMsg?.id ?? assistantId + const reconnectResult = await attachToExistingStream({ streamId: pendingRecovery.streamId, - assistantId, + assistantId: recoveryAssistantId, expectedGen: gen, snapshot: pendingRecovery.snapshot, initialLastEventId: lastEventIdRef.current, @@ -1858,6 +1870,8 @@ export function useChat( setIsSending(false) setIsReconnecting(false) lastEventIdRef.current = 0 + pendingRecoveryMessageRef.current = null + setPendingRecoveryMessage(null) setMessages((prev) => prev.map((msg) => { From 99e642e077ef45a3496fe2f7948154eabd217a1d Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Wed, 25 Mar 2026 17:04:29 -0700 Subject: [PATCH 6/8] fix branch --- .../app/workspace/[workspaceId]/home/hooks/use-chat.ts | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index e3ed092492c..f1c58288036 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -711,7 +711,15 @@ export function useChat( }) if (batch.events.length === 0 && !isTerminalStreamStatus(batch.status)) { - throw new Error(RECONNECT_TAIL_ERROR) + logger.info('No new replay events yet; reopening active stream tail', { + streamId, + latestEventId, + streamStatus, + attempt: attachAttempt, + }) + if (activeAbortController.signal.aborted || streamGenRef.current !== expectedGen) { + return { aborted: true, error: false } + } } } From a829ffc5420c1501554c9fbab0e0d07ac8cae55f Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Wed, 25 Mar 2026 17:16:49 -0700 Subject: [PATCH 7/8] fix non-clear bug --- apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index f1c58288036..704d2ae7f23 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -835,6 +835,9 @@ export function useChat( streamIdRef.current = activeStreamId lastEventIdRef.current = snapshot?.events?.[snapshot.events.length - 1]?.eventId ?? 0 sendingRef.current = true + streamingContentRef.current = '' + streamingBlocksRef.current = [] + clientExecutionStartedRef.current.clear() const assistantId = crypto.randomUUID() From 03d6cab998fab830bca7554257ba6a068e8fb5d5 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Wed, 25 Mar 2026 17:25:27 -0700 Subject: [PATCH 8/8] fix --- apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index 704d2ae7f23..f2a41c89b96 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -1803,6 +1803,7 @@ export function useChat( fileAttachments, contexts, } + pendingRecoveryMessageRef.current = queuedMessage setPendingRecoveryMessage(queuedMessage) try {