diff --git a/apps/sim/app/workspace/[workspaceId]/home/components/mothership-chat/mothership-chat.tsx b/apps/sim/app/workspace/[workspaceId]/home/components/mothership-chat/mothership-chat.tsx index e3d32331d25..abc9e7272bd 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/components/mothership-chat/mothership-chat.tsx +++ b/apps/sim/app/workspace/[workspaceId]/home/components/mothership-chat/mothership-chat.tsx @@ -23,6 +23,7 @@ import type { ChatContext } from '@/stores/panel' interface MothershipChatProps { messages: ChatMessage[] isSending: boolean + isReconnecting?: boolean onSubmit: ( text: string, fileAttachments?: FileAttachmentForApi[], @@ -71,6 +72,7 @@ const LAYOUT_STYLES = { export function MothershipChat({ messages, isSending, + isReconnecting = false, onSubmit, onStopGeneration, messageQueue, @@ -88,7 +90,8 @@ export function MothershipChat({ className, }: MothershipChatProps) { const styles = LAYOUT_STYLES[layout] - const { ref: scrollContainerRef, scrollToBottom } = useAutoScroll(isSending) + const isStreamActive = isSending || isReconnecting + const { ref: scrollContainerRef, scrollToBottom } = useAutoScroll(isStreamActive) const hasMessages = messages.length > 0 const initialScrollDoneRef = useRef(false) @@ -131,7 +134,7 @@ export function MothershipChat({ msg.content ?? '' ) const isLastAssistant = index === messages.length - 1 - const isThisStreaming = isSending && isLastAssistant + const isThisStreaming = isStreamActive && isLastAssistant if (!hasAnyBlocks && !msg.content?.trim() && isThisStreaming) { return @@ -175,7 +178,7 @@ export function MothershipChat({ /> = { const DEPLOY_TOOL_NAMES = new Set(['deploy_api', 'deploy_chat', 'deploy_mcp', 'redeploy']) const RECONNECT_TAIL_ERROR = 'Live reconnect failed before the stream finished. The latest response may be incomplete.' +const TERMINAL_STREAM_STATUSES = new Set(['complete', 'error', 'cancelled']) + +interface StreamEventEnvelope { + eventId: number + streamId: string + event: Record +} + +interface StreamBatchResponse { + success: boolean + events: StreamEventEnvelope[] + status: string +} + +interface StreamTerminationResult { + sawStreamError: boolean + sawDoneEvent: boolean + lastEventId: number +} + +interface StreamProcessingOptions { + expectedGen?: number + initialLastEventId?: number + preserveExistingState?: boolean +} + +interface AttachToStreamOptions { + streamId: string + assistantId: string + expectedGen: number + snapshot?: StreamSnapshot | null + initialLastEventId?: number +} + +interface AttachToStreamResult { + aborted: boolean + error: boolean +} + +interface PendingStreamRecovery { + streamId: string + snapshot?: StreamSnapshot | null +} + +function isTerminalStreamStatus(status?: string | null): boolean { + return Boolean(status && TERMINAL_STREAM_STATUSES.has(status)) +} + +function isActiveStreamConflictError(input: unknown): boolean { + if (typeof input !== 'string') return false + return input.includes('A response is already in progress for this chat') +} + +function buildReplayStream(events: StreamEventEnvelope[]): ReadableStream { + const encoder = new TextEncoder() + return new ReadableStream({ + start(controller) { + if (events.length > 0) { + const payload = events + .map( + (entry) => + `data: ${JSON.stringify({ ...entry.event, eventId: entry.eventId, streamId: entry.streamId })}\n\n` + ) + .join('') + controller.enqueue(encoder.encode(payload)) + } + controller.close() + }, + }) +} function mapStoredBlock(block: TaskStoredContentBlock): ContentBlock { const mapped: ContentBlock = { @@ -339,15 +415,22 @@ export function useChat( const [messageQueue, setMessageQueue] = useState([]) const messageQueueRef = useRef([]) messageQueueRef.current = messageQueue + const [pendingRecoveryMessage, setPendingRecoveryMessage] = useState(null) + const pendingRecoveryMessageRef = useRef(null) + pendingRecoveryMessageRef.current = pendingRecoveryMessage const sendMessageRef = useRef(async () => {}) const processSSEStreamRef = useRef< ( reader: ReadableStreamDefaultReader, assistantId: string, - expectedGen?: number - ) => Promise - >(async () => false) + options?: StreamProcessingOptions + ) => Promise + >(async () => ({ + sawStreamError: false, + sawDoneEvent: false, + lastEventId: 0, + })) const finalizeRef = useRef<(options?: { error?: boolean }) => void>(() => {}) const abortControllerRef = useRef(null) @@ -359,10 +442,12 @@ export function useChat( const appliedChatIdRef = useRef(undefined) const pendingUserMsgRef = useRef<{ id: string; content: string } | null>(null) const streamIdRef = useRef(undefined) + const lastEventIdRef = useRef(0) const sendingRef = useRef(false) const streamGenRef = useRef(0) const streamingContentRef = useRef('') const streamingBlocksRef = useRef([]) + const clientExecutionStartedRef = useRef>(new Set()) const executionStream = useExecutionStream() const isHomePage = pathname.endsWith('/home') @@ -420,6 +505,10 @@ export function useChat( abortControllerRef.current = null sendingRef.current = false setIsSending(false) + setIsReconnecting(false) + lastEventIdRef.current = 0 + pendingRecoveryMessageRef.current = null + setPendingRecoveryMessage(null) if (abandonedChatId) { queryClient.invalidateQueries({ queryKey: taskKeys.detail(abandonedChatId) }) } @@ -441,6 +530,10 @@ export function useChat( setStreamingFile(null) streamingFileRef.current = null setMessageQueue([]) + lastEventIdRef.current = 0 + clientExecutionStartedRef.current.clear() + pendingRecoveryMessageRef.current = null + setPendingRecoveryMessage(null) }, [initialChatId, queryClient]) useEffect(() => { @@ -461,180 +554,374 @@ export function useChat( setStreamingFile(null) streamingFileRef.current = null setMessageQueue([]) + lastEventIdRef.current = 0 + clientExecutionStartedRef.current.clear() + pendingRecoveryMessageRef.current = null + setPendingRecoveryMessage(null) }, [isHomePage]) - useEffect(() => { - if (!chatHistory || appliedChatIdRef.current === chatHistory.id) return + const fetchStreamBatch = useCallback( + async ( + streamId: string, + fromEventId: number, + signal?: AbortSignal + ): Promise => { + const response = await fetch( + `${COPILOT_CHAT_STREAM_API_PATH}?streamId=${encodeURIComponent(streamId)}&from=${fromEventId}&batch=true`, + { signal } + ) + + if (!response.ok) { + throw new Error(`Stream resume batch failed: ${response.status}`) + } - const activeStreamId = chatHistory.activeStreamId - const snapshot = chatHistory.streamSnapshot - appliedChatIdRef.current = chatHistory.id - const mappedMessages = chatHistory.messages.map(mapStoredMessage) - const shouldPreserveActiveStreamingMessage = - sendingRef.current && Boolean(activeStreamId) && activeStreamId === streamIdRef.current - - if (shouldPreserveActiveStreamingMessage) { - setMessages((prev) => { - const localStreamingAssistant = prev[prev.length - 1] - if (localStreamingAssistant?.role !== 'assistant') { - return mappedMessages - } + return response.json() + }, + [] + ) + + const attachToExistingStream = useCallback( + async ({ + streamId, + assistantId, + expectedGen, + snapshot, + initialLastEventId = 0, + }: AttachToStreamOptions): Promise => { + let latestEventId = initialLastEventId + let seedEvents = snapshot?.events ?? [] + let streamStatus = snapshot?.status ?? 'unknown' + let attachAttempt = 0 - const nextMessages = - mappedMessages[mappedMessages.length - 1]?.role === 'assistant' - ? mappedMessages.slice(0, -1) - : mappedMessages + setIsSending(true) + setIsReconnecting(true) + setError(null) - return [...nextMessages, localStreamingAssistant] + logger.info('Attaching to existing stream', { + streamId, + expectedGen, + initialLastEventId, + seedEventCount: seedEvents.length, + streamStatus, }) - } else { - setMessages(mappedMessages) - } - if (chatHistory.resources.some((r) => r.id === 'streaming-file')) { - fetch('/api/copilot/chat/resources', { - method: 'DELETE', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ - chatId: chatHistory.id, - resourceType: 'file', - resourceId: 'streaming-file', - }), - }).catch(() => {}) - } + try { + while (streamGenRef.current === expectedGen) { + if (seedEvents.length > 0) { + const replayResult = await processSSEStreamRef.current( + buildReplayStream(seedEvents).getReader(), + assistantId, + { + expectedGen, + initialLastEventId: latestEventId, + preserveExistingState: true, + } + ) + latestEventId = Math.max( + replayResult.lastEventId, + seedEvents[seedEvents.length - 1]?.eventId ?? latestEventId + ) + lastEventIdRef.current = latestEventId + seedEvents = [] + + if (replayResult.sawStreamError) { + logger.warn('Replay stream ended with error event', { streamId, latestEventId }) + return { aborted: false, error: true } + } + } - const persistedResources = chatHistory.resources.filter((r) => r.id !== 'streaming-file') - if (persistedResources.length > 0) { - setResources(persistedResources) - setActiveResourceId(persistedResources[persistedResources.length - 1].id) + if (isTerminalStreamStatus(streamStatus)) { + logger.info('Existing stream already reached terminal status', { + streamId, + latestEventId, + streamStatus, + }) + if (streamStatus === 'error') { + setError(RECONNECT_TAIL_ERROR) + } + return { aborted: false, error: streamStatus === 'error' } + } + + const activeAbortController = abortControllerRef.current + if (!activeAbortController) { + return { aborted: true, error: false } + } + + logger.info('Opening live stream tail', { + streamId, + fromEventId: latestEventId, + attempt: attachAttempt, + }) - for (const resource of persistedResources) { - if (resource.type !== 'workflow') continue - ensureWorkflowInRegistry(resource.id, resource.title, workspaceId) + const sseRes = await fetch( + `${COPILOT_CHAT_STREAM_API_PATH}?streamId=${encodeURIComponent(streamId)}&from=${latestEventId}`, + { signal: activeAbortController.signal } + ) + if (!sseRes.ok || !sseRes.body) { + throw new Error(RECONNECT_TAIL_ERROR) + } + + setIsReconnecting(false) + + const liveResult = await processSSEStreamRef.current( + sseRes.body.getReader(), + assistantId, + { + expectedGen, + initialLastEventId: latestEventId, + preserveExistingState: true, + } + ) + latestEventId = Math.max(latestEventId, liveResult.lastEventId) + lastEventIdRef.current = latestEventId + + if (liveResult.sawStreamError) { + logger.warn('Live stream tail ended with error event', { streamId, latestEventId }) + return { aborted: false, error: true } + } + + attachAttempt += 1 + setIsReconnecting(true) + + logger.warn('Live stream ended without terminal event, fetching replay batch', { + streamId, + latestEventId, + attempt: attachAttempt, + }) + + const batch = await fetchStreamBatch( + streamId, + latestEventId, + activeAbortController.signal + ) + seedEvents = batch.events + streamStatus = batch.status + + if (batch.events.length > 0) { + latestEventId = batch.events[batch.events.length - 1].eventId + lastEventIdRef.current = latestEventId + } + + logger.info('Fetched replay batch after non-terminal stream close', { + streamId, + latestEventId, + streamStatus, + eventCount: batch.events.length, + attempt: attachAttempt, + }) + + if (batch.events.length === 0 && !isTerminalStreamStatus(batch.status)) { + logger.info('No new replay events yet; reopening active stream tail', { + streamId, + latestEventId, + streamStatus, + attempt: attachAttempt, + }) + if (activeAbortController.signal.aborted || streamGenRef.current !== expectedGen) { + return { aborted: true, error: false } + } + } + } + + return { aborted: true, error: false } + } catch (err) { + if (err instanceof Error && err.name === 'AbortError') { + return { aborted: true, error: false } + } + + logger.warn('Failed to attach to existing stream', { + streamId, + latestEventId, + error: err instanceof Error ? err.message : String(err), + }) + setError(err instanceof Error ? err.message : RECONNECT_TAIL_ERROR) + return { aborted: false, error: true } + } finally { + setIsReconnecting(false) } - } else if (chatHistory.resources.some((r) => r.id === 'streaming-file')) { - setResources([]) - setActiveResourceId(null) - } + }, + [fetchStreamBatch] + ) + + const applyChatHistorySnapshot = useCallback( + (history: TaskChatHistory, options?: { preserveActiveStreamingMessage?: boolean }) => { + const preserveActiveStreamingMessage = options?.preserveActiveStreamingMessage ?? false + const activeStreamId = history.activeStreamId + appliedChatIdRef.current = history.id + + const mappedMessages = history.messages.map(mapStoredMessage) + const shouldPreserveActiveStreamingMessage = + preserveActiveStreamingMessage && + sendingRef.current && + Boolean(activeStreamId) && + activeStreamId === streamIdRef.current + + if (shouldPreserveActiveStreamingMessage) { + setMessages((prev) => { + const localStreamingAssistant = prev[prev.length - 1] + if (localStreamingAssistant?.role !== 'assistant') { + return mappedMessages + } + + const nextMessages = + mappedMessages[mappedMessages.length - 1]?.role === 'assistant' + ? mappedMessages.slice(0, -1) + : mappedMessages + + return [...nextMessages, localStreamingAssistant] + }) + } else { + setMessages(mappedMessages) + } + + if (history.resources.some((r) => r.id === 'streaming-file')) { + fetch('/api/copilot/chat/resources', { + method: 'DELETE', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + chatId: history.id, + resourceType: 'file', + resourceId: 'streaming-file', + }), + }).catch(() => {}) + } + + const persistedResources = history.resources.filter((r) => r.id !== 'streaming-file') + if (persistedResources.length > 0) { + setResources(persistedResources) + setActiveResourceId(persistedResources[persistedResources.length - 1].id) + + for (const resource of persistedResources) { + if (resource.type !== 'workflow') continue + ensureWorkflowInRegistry(resource.id, resource.title, workspaceId) + } + } else if (history.resources.some((r) => r.id === 'streaming-file')) { + setResources([]) + setActiveResourceId(null) + } + }, + [workspaceId] + ) + + const preparePendingStreamRecovery = useCallback( + async (chatId: string): Promise => { + const latestHistory = await fetchChatHistory(chatId) + queryClient.setQueryData(taskKeys.detail(chatId), latestHistory) + applyChatHistorySnapshot(latestHistory) + + if (!latestHistory.activeStreamId) { + return null + } + + return { + streamId: latestHistory.activeStreamId, + snapshot: latestHistory.streamSnapshot, + } + }, + [applyChatHistorySnapshot, queryClient] + ) + + useEffect(() => { + if (!chatHistory || appliedChatIdRef.current === chatHistory.id) return + + const activeStreamId = chatHistory.activeStreamId + const snapshot = chatHistory.streamSnapshot + applyChatHistorySnapshot(chatHistory, { preserveActiveStreamingMessage: true }) if (activeStreamId && !sendingRef.current) { const gen = ++streamGenRef.current const abortController = new AbortController() abortControllerRef.current = abortController streamIdRef.current = activeStreamId + lastEventIdRef.current = snapshot?.events?.[snapshot.events.length - 1]?.eventId ?? 0 sendingRef.current = true - setIsReconnecting(true) + streamingContentRef.current = '' + streamingBlocksRef.current = [] + clientExecutionStartedRef.current.clear() const assistantId = crypto.randomUUID() const reconnect = async () => { - let reconnectFailed = false try { - const encoder = new TextEncoder() - - const batchEvents = snapshot?.events ?? [] - const streamStatus = snapshot?.status ?? '' - - if (batchEvents.length === 0 && streamStatus === 'unknown') { - reconnectFailed = true - setError(RECONNECT_TAIL_ERROR) - return - } - - setIsSending(true) - setIsReconnecting(false) - - const lastEventId = - batchEvents.length > 0 ? batchEvents[batchEvents.length - 1].eventId : 0 - const isStreamDone = - streamStatus === 'complete' || streamStatus === 'error' || streamStatus === 'cancelled' - - const combinedStream = new ReadableStream({ - async start(controller) { - if (batchEvents.length > 0) { - const sseText = batchEvents - .map((e) => `data: ${JSON.stringify(e.event)}\n`) - .join('\n') - controller.enqueue(encoder.encode(`${sseText}\n`)) - } - - if (!isStreamDone) { - try { - const sseRes = await fetch( - `/api/copilot/chat/stream?streamId=${activeStreamId}&from=${lastEventId}`, - { signal: abortController.signal } - ) - if (!sseRes.ok || !sseRes.body) { - reconnectFailed = true - logger.warn('SSE tail reconnect returned no readable body', { - status: sseRes.status, - streamId: activeStreamId, - }) - setError(RECONNECT_TAIL_ERROR) - } else { - const reader = sseRes.body.getReader() - while (true) { - const { done, value } = await reader.read() - if (done) break - controller.enqueue(value) - } - } - } catch (err) { - if (!(err instanceof Error && err.name === 'AbortError')) { - reconnectFailed = true - logger.warn('SSE tail failed during reconnect', err) - setError(RECONNECT_TAIL_ERROR) - } - } - } - - controller.close() - }, - }) - - const hadStreamError = await processSSEStreamRef.current( - combinedStream.getReader(), + const result = await attachToExistingStream({ + streamId: activeStreamId, assistantId, - gen - ) - if (hadStreamError) { - reconnectFailed = true + expectedGen: gen, + snapshot, + initialLastEventId: lastEventIdRef.current, + }) + if (streamGenRef.current === gen && !result.aborted) { + finalizeRef.current(result.error ? { error: true } : undefined) } } catch (err) { if (err instanceof Error && err.name === 'AbortError') return - reconnectFailed = true - } finally { - setIsReconnecting(false) + logger.warn('Unexpected error during reconnect', { + streamId: activeStreamId, + chatId: chatHistory.id, + error: err instanceof Error ? err.message : String(err), + }) if (streamGenRef.current === gen) { - finalizeRef.current(reconnectFailed ? { error: true } : undefined) + try { + finalizeRef.current({ error: true }) + } catch (finalizeError) { + logger.error('Reconnect fallback finalize failed', { + streamId: activeStreamId, + chatId: chatHistory.id, + error: + finalizeError instanceof Error ? finalizeError.message : String(finalizeError), + }) + sendingRef.current = false + setIsSending(false) + setIsReconnecting(false) + abortControllerRef.current = null + setError('Failed to reconnect to the active stream') + } + } + } finally { + if (abortControllerRef.current === abortController) { + abortControllerRef.current = null } } } reconnect() } - }, [chatHistory, workspaceId, queryClient]) + }, [applyChatHistorySnapshot, attachToExistingStream, chatHistory, queryClient]) const processSSEStream = useCallback( async ( reader: ReadableStreamDefaultReader, assistantId: string, - expectedGen?: number + options?: StreamProcessingOptions ) => { + const { expectedGen, initialLastEventId = 0, preserveExistingState = false } = options ?? {} const decoder = new TextDecoder() streamReaderRef.current = reader let buffer = '' - const blocks: ContentBlock[] = [] + const blocks: ContentBlock[] = preserveExistingState ? [...streamingBlocksRef.current] : [] const toolMap = new Map() const toolArgsMap = new Map>() - const clientExecutionStarted = new Set() + const clientExecutionStarted = clientExecutionStartedRef.current let activeSubagent: string | undefined let activeCompactionId: string | undefined - let runningText = '' + let runningText = preserveExistingState ? streamingContentRef.current : '' let lastContentSource: 'main' | 'subagent' | null = null let streamRequestId: string | undefined + let lastEventId = initialLastEventId + let sawDoneEvent = false - streamingContentRef.current = '' - streamingBlocksRef.current = [] + if (!preserveExistingState) { + streamingContentRef.current = '' + streamingBlocksRef.current = [] + } + + for (const [index, block] of blocks.entries()) { + if (block.type === 'tool_call' && block.toolCall?.id) { + toolMap.set(block.toolCall.id, index) + if (block.toolCall.params) { + toolArgsMap.set(block.toolCall.id, block.toolCall.params) + } + } + } const ensureTextBlock = (): ContentBlock => { const last = blocks[blocks.length - 1] @@ -716,6 +1003,14 @@ export function useChat( continue } + if (typeof (parsed as SSEPayload & { eventId?: unknown }).eventId === 'number') { + lastEventId = Math.max( + lastEventId, + (parsed as SSEPayload & { eventId: number }).eventId + ) + lastEventIdRef.current = lastEventId + } + logger.debug('SSE event received', parsed) switch (parsed.type) { case 'chat_id': { @@ -1167,6 +1462,10 @@ export function useChat( appendInlineErrorTag(buildInlineErrorTag(parsed)) break } + case 'done': { + sawDoneEvent = true + break + } } } } @@ -1175,7 +1474,11 @@ export function useChat( streamReaderRef.current = null } } - return sawStreamError + return { + sawStreamError, + sawDoneEvent, + lastEventId, + } }, [workspaceId, queryClient, addResource, removeResource] ) @@ -1247,6 +1550,16 @@ export function useChat( const messagesRef = useRef(messages) messagesRef.current = messages + const visibleMessageQueue = useMemo( + () => + pendingRecoveryMessage + ? [ + pendingRecoveryMessage, + ...messageQueue.filter((msg) => msg.id !== pendingRecoveryMessage.id), + ] + : messageQueue, + [messageQueue, pendingRecoveryMessage] + ) const finalize = useCallback( (options?: { error?: boolean }) => { @@ -1267,6 +1580,21 @@ export function useChat( return } + const recoveryMessage = pendingRecoveryMessageRef.current + if (recoveryMessage) { + setPendingRecoveryMessage(null) + const gen = streamGenRef.current + queueMicrotask(() => { + if (streamGenRef.current !== gen) return + sendMessageRef.current( + recoveryMessage.content, + recoveryMessage.fileAttachments, + recoveryMessage.contexts + ) + }) + return + } + const next = messageQueueRef.current[0] if (next) { setMessageQueue((prev) => prev.filter((m) => m.id !== next.id)) @@ -1307,6 +1635,8 @@ export function useChat( pendingUserMsgRef.current = { id: userMessageId, content: message } streamIdRef.current = userMessageId + lastEventIdRef.current = 0 + clientExecutionStartedRef.current.clear() const storedAttachments: TaskStoredFileAttachment[] | undefined = fileAttachments && fileAttachments.length > 0 @@ -1320,6 +1650,9 @@ export function useChat( : undefined const requestChatId = selectedChatIdRef.current ?? chatIdRef.current + const previousChatHistory = requestChatId + ? queryClient.getQueryData(taskKeys.detail(requestChatId)) + : undefined if (requestChatId) { const cachedUserMsg: TaskStoredMessage = { id: userMessageId, @@ -1339,6 +1672,7 @@ export function useChat( } const userAttachments = storedAttachments?.map(toDisplayAttachment) + const previousMessages = messagesRef.current const messageContexts = contexts?.map((c) => ({ kind: c.kind, @@ -1402,20 +1736,132 @@ export function useChat( if (!response.body) throw new Error('No response body') - const hadStreamError = await processSSEStream(response.body.getReader(), assistantId, gen) + const termination = await processSSEStream(response.body.getReader(), assistantId, { + expectedGen: gen, + }) if (streamGenRef.current === gen) { - finalize(hadStreamError ? { error: true } : undefined) + if (termination.sawStreamError) { + finalize({ error: true }) + return + } + + const batch = await fetchStreamBatch( + userMessageId, + termination.lastEventId, + abortController.signal + ) + if (streamGenRef.current !== gen) { + return + } + if (isTerminalStreamStatus(batch.status)) { + finalize(batch.status === 'error' ? { error: true } : undefined) + return + } + + logger.warn( + 'Primary stream ended without terminal event, attempting in-place reconnect', + { + streamId: userMessageId, + lastEventId: termination.lastEventId, + streamStatus: batch.status, + sawDoneEvent: termination.sawDoneEvent, + } + ) + + const reconnectResult = await attachToExistingStream({ + streamId: userMessageId, + assistantId, + expectedGen: gen, + snapshot: { + events: batch.events, + status: batch.status, + }, + initialLastEventId: + batch.events[batch.events.length - 1]?.eventId ?? termination.lastEventId, + }) + + if (streamGenRef.current === gen && !reconnectResult.aborted) { + finalize(reconnectResult.error ? { error: true } : undefined) + } } } catch (err) { if (err instanceof Error && err.name === 'AbortError') return - setError(err instanceof Error ? err.message : 'Failed to send message') + const errorMessage = err instanceof Error ? err.message : 'Failed to send message' + if (requestChatId && isActiveStreamConflictError(errorMessage)) { + logger.info('Active stream conflict detected while sending message; reattaching', { + chatId: requestChatId, + attemptedStreamId: userMessageId, + }) + + if (previousChatHistory) { + queryClient.setQueryData(taskKeys.detail(requestChatId), previousChatHistory) + } + setMessages(previousMessages) + const queuedMessage: QueuedMessage = { + id: crypto.randomUUID(), + content: message, + fileAttachments, + contexts, + } + pendingRecoveryMessageRef.current = queuedMessage + setPendingRecoveryMessage(queuedMessage) + + try { + const pendingRecovery = await preparePendingStreamRecovery(requestChatId) + if (!pendingRecovery) { + setError(errorMessage) + if (streamGenRef.current === gen) { + finalize({ error: true }) + } + return + } + + streamIdRef.current = pendingRecovery.streamId + lastEventIdRef.current = + pendingRecovery.snapshot?.events?.[pendingRecovery.snapshot.events.length - 1] + ?.eventId ?? 0 + + const rehydratedMessages = messagesRef.current + const lastAssistantMsg = [...rehydratedMessages] + .reverse() + .find((m) => m.role === 'assistant') + const recoveryAssistantId = lastAssistantMsg?.id ?? assistantId + + const reconnectResult = await attachToExistingStream({ + streamId: pendingRecovery.streamId, + assistantId: recoveryAssistantId, + expectedGen: gen, + snapshot: pendingRecovery.snapshot, + initialLastEventId: lastEventIdRef.current, + }) + + if (streamGenRef.current === gen && !reconnectResult.aborted) { + finalize(reconnectResult.error ? { error: true } : undefined) + } + return + } catch (recoveryError) { + logger.warn('Failed to recover active stream after conflict', { + chatId: requestChatId, + error: recoveryError instanceof Error ? recoveryError.message : String(recoveryError), + }) + } + } + + setError(errorMessage) if (streamGenRef.current === gen) { finalize({ error: true }) } return } }, - [workspaceId, queryClient, processSSEStream, finalize] + [ + workspaceId, + queryClient, + processSSEStream, + finalize, + attachToExistingStream, + preparePendingStreamRecovery, + ] ) sendMessageRef.current = sendMessage @@ -1434,6 +1880,10 @@ export function useChat( abortControllerRef.current = null sendingRef.current = false setIsSending(false) + setIsReconnecting(false) + lastEventIdRef.current = 0 + pendingRecoveryMessageRef.current = null + setPendingRecoveryMessage(null) setMessages((prev) => prev.map((msg) => { @@ -1521,24 +1971,47 @@ export function useChat( }, [invalidateChatQueries, persistPartialResponse, executionStream]) const removeFromQueue = useCallback((id: string) => { + if (pendingRecoveryMessageRef.current?.id === id) { + pendingRecoveryMessageRef.current = null + setPendingRecoveryMessage(null) + return + } messageQueueRef.current = messageQueueRef.current.filter((m) => m.id !== id) setMessageQueue((prev) => prev.filter((m) => m.id !== id)) }, []) const sendNow = useCallback( async (id: string) => { - const msg = messageQueueRef.current.find((m) => m.id === id) + const recoveryMessage = pendingRecoveryMessageRef.current + const msg = + recoveryMessage?.id === id + ? recoveryMessage + : messageQueueRef.current.find((m) => m.id === id) if (!msg) return // Eagerly update ref so a rapid second click finds the message already gone - messageQueueRef.current = messageQueueRef.current.filter((m) => m.id !== id) + if (recoveryMessage?.id === id) { + pendingRecoveryMessageRef.current = null + setPendingRecoveryMessage(null) + } else { + messageQueueRef.current = messageQueueRef.current.filter((m) => m.id !== id) + } await stopGeneration() - setMessageQueue((prev) => prev.filter((m) => m.id !== id)) + if (recoveryMessage?.id !== id) { + setMessageQueue((prev) => prev.filter((m) => m.id !== id)) + } await sendMessage(msg.content, msg.fileAttachments, msg.contexts) }, [stopGeneration, sendMessage] ) const editQueuedMessage = useCallback((id: string): QueuedMessage | undefined => { + const recoveryMessage = pendingRecoveryMessageRef.current + if (recoveryMessage?.id === id) { + pendingRecoveryMessageRef.current = null + setPendingRecoveryMessage(null) + return recoveryMessage + } + const msg = messageQueueRef.current.find((m) => m.id === id) if (!msg) return undefined messageQueueRef.current = messageQueueRef.current.filter((m) => m.id !== id) @@ -1552,6 +2025,9 @@ export function useChat( abortControllerRef.current = null streamGenRef.current++ sendingRef.current = false + lastEventIdRef.current = 0 + clientExecutionStartedRef.current.clear() + pendingRecoveryMessageRef.current = null } }, []) @@ -1569,7 +2045,7 @@ export function useChat( addResource, removeResource, reorderResources, - messageQueue, + messageQueue: visibleMessageQueue, removeFromQueue, sendNow, editQueuedMessage, diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/panel.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/panel.tsx index f5a65ab0760..0ea9cb6c51a 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/panel.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/panel.tsx @@ -318,6 +318,7 @@ export const Panel = memo(function Panel() { const { messages: copilotMessages, isSending: copilotIsSending, + isReconnecting: copilotIsReconnecting, sendMessage: copilotSendMessage, stopGeneration: copilotStopGeneration, resolvedChatId: copilotResolvedChatId, @@ -812,6 +813,7 @@ export const Panel = memo(function Panel() { className='min-h-0 flex-1' messages={copilotMessages} isSending={copilotIsSending} + isReconnecting={copilotIsReconnecting} onSubmit={handleCopilotSubmit} onStopGeneration={copilotStopGeneration} messageQueue={copilotMessageQueue} diff --git a/apps/sim/hooks/queries/tasks.ts b/apps/sim/hooks/queries/tasks.ts index 8bc3c10c536..6e071b60588 100644 --- a/apps/sim/hooks/queries/tasks.ts +++ b/apps/sim/hooks/queries/tasks.ts @@ -129,7 +129,10 @@ export function useTasks(workspaceId?: string) { }) } -async function fetchChatHistory(chatId: string, signal?: AbortSignal): Promise { +export async function fetchChatHistory( + chatId: string, + signal?: AbortSignal +): Promise { const response = await fetch(`/api/copilot/chat?chatId=${chatId}`, { signal }) if (!response.ok) { diff --git a/apps/sim/lib/copilot/chat-streaming.ts b/apps/sim/lib/copilot/chat-streaming.ts index 6d090a4866b..b3d465667d6 100644 --- a/apps/sim/lib/copilot/chat-streaming.ts +++ b/apps/sim/lib/copilot/chat-streaming.ts @@ -294,6 +294,21 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS return new ReadableStream({ async start(controller) { const encoder = new TextEncoder() + const markClientDisconnected = (reason: string) => { + if (clientDisconnected) return + clientDisconnected = true + logger.info( + appendCopilotLogContext('Client disconnected from live SSE stream', { + requestId, + messageId, + }), + { + streamId, + runId, + reason, + } + ) + } await resetStreamBuffer(streamId) await setStreamMeta(streamId, { status: 'active', userId, executionId, runId }) @@ -381,7 +396,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS ) } } catch { - clientDisconnected = true + markClientDisconnected('enqueue_failed') } } @@ -424,7 +439,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS try { controller.enqueue(encoder.encode(': keepalive\n\n')) } catch { - clientDisconnected = true + markClientDisconnected('keepalive_failed') } }, 15_000) @@ -498,6 +513,18 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS await eventWriter.close() await setStreamMeta(streamId, { status: 'complete', userId, executionId, runId }) await updateRunStatus(runId, 'complete', { completedAt: new Date() }).catch(() => {}) + if (clientDisconnected) { + logger.info( + appendCopilotLogContext('Orchestration completed after client disconnect', { + requestId, + messageId, + }), + { + streamId, + runId, + } + ) + } } catch (error) { if (abortController.signal.aborted) { logger.error( @@ -544,6 +571,12 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS error: errorMessage, }).catch(() => {}) } finally { + logger.info(appendCopilotLogContext('Closing live SSE stream', { requestId, messageId }), { + streamId, + runId, + clientDisconnected, + aborted: abortController.signal.aborted, + }) clearInterval(keepaliveInterval) if (abortPoller) { clearInterval(abortPoller) @@ -566,6 +599,16 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS } }, cancel() { + logger.info( + appendCopilotLogContext('ReadableStream cancel received from client', { + requestId, + messageId, + }), + { + streamId, + runId, + } + ) clientDisconnected = true if (eventWriter) { eventWriter.flush().catch(() => {})