diff --git a/apps/sim/app/workspace/[workspaceId]/home/components/mothership-chat/mothership-chat.tsx b/apps/sim/app/workspace/[workspaceId]/home/components/mothership-chat/mothership-chat.tsx
index e3d32331d25..abc9e7272bd 100644
--- a/apps/sim/app/workspace/[workspaceId]/home/components/mothership-chat/mothership-chat.tsx
+++ b/apps/sim/app/workspace/[workspaceId]/home/components/mothership-chat/mothership-chat.tsx
@@ -23,6 +23,7 @@ import type { ChatContext } from '@/stores/panel'
interface MothershipChatProps {
messages: ChatMessage[]
isSending: boolean
+ isReconnecting?: boolean
onSubmit: (
text: string,
fileAttachments?: FileAttachmentForApi[],
@@ -71,6 +72,7 @@ const LAYOUT_STYLES = {
export function MothershipChat({
messages,
isSending,
+ isReconnecting = false,
onSubmit,
onStopGeneration,
messageQueue,
@@ -88,7 +90,8 @@ export function MothershipChat({
className,
}: MothershipChatProps) {
const styles = LAYOUT_STYLES[layout]
- const { ref: scrollContainerRef, scrollToBottom } = useAutoScroll(isSending)
+ const isStreamActive = isSending || isReconnecting
+ const { ref: scrollContainerRef, scrollToBottom } = useAutoScroll(isStreamActive)
const hasMessages = messages.length > 0
const initialScrollDoneRef = useRef(false)
@@ -131,7 +134,7 @@ export function MothershipChat({
msg.content ?? ''
)
const isLastAssistant = index === messages.length - 1
- const isThisStreaming = isSending && isLastAssistant
+ const isThisStreaming = isStreamActive && isLastAssistant
if (!hasAnyBlocks && !msg.content?.trim() && isThisStreaming) {
return
@@ -175,7 +178,7 @@ export function MothershipChat({
/>
= {
const DEPLOY_TOOL_NAMES = new Set(['deploy_api', 'deploy_chat', 'deploy_mcp', 'redeploy'])
const RECONNECT_TAIL_ERROR =
'Live reconnect failed before the stream finished. The latest response may be incomplete.'
+const TERMINAL_STREAM_STATUSES = new Set(['complete', 'error', 'cancelled'])
+
+interface StreamEventEnvelope {
+ eventId: number
+ streamId: string
+ event: Record
+}
+
+interface StreamBatchResponse {
+ success: boolean
+ events: StreamEventEnvelope[]
+ status: string
+}
+
+interface StreamTerminationResult {
+ sawStreamError: boolean
+ sawDoneEvent: boolean
+ lastEventId: number
+}
+
+interface StreamProcessingOptions {
+ expectedGen?: number
+ initialLastEventId?: number
+ preserveExistingState?: boolean
+}
+
+interface AttachToStreamOptions {
+ streamId: string
+ assistantId: string
+ expectedGen: number
+ snapshot?: StreamSnapshot | null
+ initialLastEventId?: number
+}
+
+interface AttachToStreamResult {
+ aborted: boolean
+ error: boolean
+}
+
+interface PendingStreamRecovery {
+ streamId: string
+ snapshot?: StreamSnapshot | null
+}
+
+function isTerminalStreamStatus(status?: string | null): boolean {
+ return Boolean(status && TERMINAL_STREAM_STATUSES.has(status))
+}
+
+function isActiveStreamConflictError(input: unknown): boolean {
+ if (typeof input !== 'string') return false
+ return input.includes('A response is already in progress for this chat')
+}
+
+function buildReplayStream(events: StreamEventEnvelope[]): ReadableStream {
+ const encoder = new TextEncoder()
+ return new ReadableStream({
+ start(controller) {
+ if (events.length > 0) {
+ const payload = events
+ .map(
+ (entry) =>
+ `data: ${JSON.stringify({ ...entry.event, eventId: entry.eventId, streamId: entry.streamId })}\n\n`
+ )
+ .join('')
+ controller.enqueue(encoder.encode(payload))
+ }
+ controller.close()
+ },
+ })
+}
function mapStoredBlock(block: TaskStoredContentBlock): ContentBlock {
const mapped: ContentBlock = {
@@ -339,15 +415,22 @@ export function useChat(
const [messageQueue, setMessageQueue] = useState([])
const messageQueueRef = useRef([])
messageQueueRef.current = messageQueue
+ const [pendingRecoveryMessage, setPendingRecoveryMessage] = useState(null)
+ const pendingRecoveryMessageRef = useRef(null)
+ pendingRecoveryMessageRef.current = pendingRecoveryMessage
const sendMessageRef = useRef(async () => {})
const processSSEStreamRef = useRef<
(
reader: ReadableStreamDefaultReader,
assistantId: string,
- expectedGen?: number
- ) => Promise
- >(async () => false)
+ options?: StreamProcessingOptions
+ ) => Promise
+ >(async () => ({
+ sawStreamError: false,
+ sawDoneEvent: false,
+ lastEventId: 0,
+ }))
const finalizeRef = useRef<(options?: { error?: boolean }) => void>(() => {})
const abortControllerRef = useRef(null)
@@ -359,10 +442,12 @@ export function useChat(
const appliedChatIdRef = useRef(undefined)
const pendingUserMsgRef = useRef<{ id: string; content: string } | null>(null)
const streamIdRef = useRef(undefined)
+ const lastEventIdRef = useRef(0)
const sendingRef = useRef(false)
const streamGenRef = useRef(0)
const streamingContentRef = useRef('')
const streamingBlocksRef = useRef([])
+ const clientExecutionStartedRef = useRef>(new Set())
const executionStream = useExecutionStream()
const isHomePage = pathname.endsWith('/home')
@@ -420,6 +505,10 @@ export function useChat(
abortControllerRef.current = null
sendingRef.current = false
setIsSending(false)
+ setIsReconnecting(false)
+ lastEventIdRef.current = 0
+ pendingRecoveryMessageRef.current = null
+ setPendingRecoveryMessage(null)
if (abandonedChatId) {
queryClient.invalidateQueries({ queryKey: taskKeys.detail(abandonedChatId) })
}
@@ -441,6 +530,10 @@ export function useChat(
setStreamingFile(null)
streamingFileRef.current = null
setMessageQueue([])
+ lastEventIdRef.current = 0
+ clientExecutionStartedRef.current.clear()
+ pendingRecoveryMessageRef.current = null
+ setPendingRecoveryMessage(null)
}, [initialChatId, queryClient])
useEffect(() => {
@@ -461,180 +554,374 @@ export function useChat(
setStreamingFile(null)
streamingFileRef.current = null
setMessageQueue([])
+ lastEventIdRef.current = 0
+ clientExecutionStartedRef.current.clear()
+ pendingRecoveryMessageRef.current = null
+ setPendingRecoveryMessage(null)
}, [isHomePage])
- useEffect(() => {
- if (!chatHistory || appliedChatIdRef.current === chatHistory.id) return
+ const fetchStreamBatch = useCallback(
+ async (
+ streamId: string,
+ fromEventId: number,
+ signal?: AbortSignal
+ ): Promise => {
+ const response = await fetch(
+ `${COPILOT_CHAT_STREAM_API_PATH}?streamId=${encodeURIComponent(streamId)}&from=${fromEventId}&batch=true`,
+ { signal }
+ )
+
+ if (!response.ok) {
+ throw new Error(`Stream resume batch failed: ${response.status}`)
+ }
- const activeStreamId = chatHistory.activeStreamId
- const snapshot = chatHistory.streamSnapshot
- appliedChatIdRef.current = chatHistory.id
- const mappedMessages = chatHistory.messages.map(mapStoredMessage)
- const shouldPreserveActiveStreamingMessage =
- sendingRef.current && Boolean(activeStreamId) && activeStreamId === streamIdRef.current
-
- if (shouldPreserveActiveStreamingMessage) {
- setMessages((prev) => {
- const localStreamingAssistant = prev[prev.length - 1]
- if (localStreamingAssistant?.role !== 'assistant') {
- return mappedMessages
- }
+ return response.json()
+ },
+ []
+ )
+
+ const attachToExistingStream = useCallback(
+ async ({
+ streamId,
+ assistantId,
+ expectedGen,
+ snapshot,
+ initialLastEventId = 0,
+ }: AttachToStreamOptions): Promise => {
+ let latestEventId = initialLastEventId
+ let seedEvents = snapshot?.events ?? []
+ let streamStatus = snapshot?.status ?? 'unknown'
+ let attachAttempt = 0
- const nextMessages =
- mappedMessages[mappedMessages.length - 1]?.role === 'assistant'
- ? mappedMessages.slice(0, -1)
- : mappedMessages
+ setIsSending(true)
+ setIsReconnecting(true)
+ setError(null)
- return [...nextMessages, localStreamingAssistant]
+ logger.info('Attaching to existing stream', {
+ streamId,
+ expectedGen,
+ initialLastEventId,
+ seedEventCount: seedEvents.length,
+ streamStatus,
})
- } else {
- setMessages(mappedMessages)
- }
- if (chatHistory.resources.some((r) => r.id === 'streaming-file')) {
- fetch('/api/copilot/chat/resources', {
- method: 'DELETE',
- headers: { 'Content-Type': 'application/json' },
- body: JSON.stringify({
- chatId: chatHistory.id,
- resourceType: 'file',
- resourceId: 'streaming-file',
- }),
- }).catch(() => {})
- }
+ try {
+ while (streamGenRef.current === expectedGen) {
+ if (seedEvents.length > 0) {
+ const replayResult = await processSSEStreamRef.current(
+ buildReplayStream(seedEvents).getReader(),
+ assistantId,
+ {
+ expectedGen,
+ initialLastEventId: latestEventId,
+ preserveExistingState: true,
+ }
+ )
+ latestEventId = Math.max(
+ replayResult.lastEventId,
+ seedEvents[seedEvents.length - 1]?.eventId ?? latestEventId
+ )
+ lastEventIdRef.current = latestEventId
+ seedEvents = []
+
+ if (replayResult.sawStreamError) {
+ logger.warn('Replay stream ended with error event', { streamId, latestEventId })
+ return { aborted: false, error: true }
+ }
+ }
- const persistedResources = chatHistory.resources.filter((r) => r.id !== 'streaming-file')
- if (persistedResources.length > 0) {
- setResources(persistedResources)
- setActiveResourceId(persistedResources[persistedResources.length - 1].id)
+ if (isTerminalStreamStatus(streamStatus)) {
+ logger.info('Existing stream already reached terminal status', {
+ streamId,
+ latestEventId,
+ streamStatus,
+ })
+ if (streamStatus === 'error') {
+ setError(RECONNECT_TAIL_ERROR)
+ }
+ return { aborted: false, error: streamStatus === 'error' }
+ }
+
+ const activeAbortController = abortControllerRef.current
+ if (!activeAbortController) {
+ return { aborted: true, error: false }
+ }
+
+ logger.info('Opening live stream tail', {
+ streamId,
+ fromEventId: latestEventId,
+ attempt: attachAttempt,
+ })
- for (const resource of persistedResources) {
- if (resource.type !== 'workflow') continue
- ensureWorkflowInRegistry(resource.id, resource.title, workspaceId)
+ const sseRes = await fetch(
+ `${COPILOT_CHAT_STREAM_API_PATH}?streamId=${encodeURIComponent(streamId)}&from=${latestEventId}`,
+ { signal: activeAbortController.signal }
+ )
+ if (!sseRes.ok || !sseRes.body) {
+ throw new Error(RECONNECT_TAIL_ERROR)
+ }
+
+ setIsReconnecting(false)
+
+ const liveResult = await processSSEStreamRef.current(
+ sseRes.body.getReader(),
+ assistantId,
+ {
+ expectedGen,
+ initialLastEventId: latestEventId,
+ preserveExistingState: true,
+ }
+ )
+ latestEventId = Math.max(latestEventId, liveResult.lastEventId)
+ lastEventIdRef.current = latestEventId
+
+ if (liveResult.sawStreamError) {
+ logger.warn('Live stream tail ended with error event', { streamId, latestEventId })
+ return { aborted: false, error: true }
+ }
+
+ attachAttempt += 1
+ setIsReconnecting(true)
+
+ logger.warn('Live stream ended without terminal event, fetching replay batch', {
+ streamId,
+ latestEventId,
+ attempt: attachAttempt,
+ })
+
+ const batch = await fetchStreamBatch(
+ streamId,
+ latestEventId,
+ activeAbortController.signal
+ )
+ seedEvents = batch.events
+ streamStatus = batch.status
+
+ if (batch.events.length > 0) {
+ latestEventId = batch.events[batch.events.length - 1].eventId
+ lastEventIdRef.current = latestEventId
+ }
+
+ logger.info('Fetched replay batch after non-terminal stream close', {
+ streamId,
+ latestEventId,
+ streamStatus,
+ eventCount: batch.events.length,
+ attempt: attachAttempt,
+ })
+
+ if (batch.events.length === 0 && !isTerminalStreamStatus(batch.status)) {
+ logger.info('No new replay events yet; reopening active stream tail', {
+ streamId,
+ latestEventId,
+ streamStatus,
+ attempt: attachAttempt,
+ })
+ if (activeAbortController.signal.aborted || streamGenRef.current !== expectedGen) {
+ return { aborted: true, error: false }
+ }
+ }
+ }
+
+ return { aborted: true, error: false }
+ } catch (err) {
+ if (err instanceof Error && err.name === 'AbortError') {
+ return { aborted: true, error: false }
+ }
+
+ logger.warn('Failed to attach to existing stream', {
+ streamId,
+ latestEventId,
+ error: err instanceof Error ? err.message : String(err),
+ })
+ setError(err instanceof Error ? err.message : RECONNECT_TAIL_ERROR)
+ return { aborted: false, error: true }
+ } finally {
+ setIsReconnecting(false)
}
- } else if (chatHistory.resources.some((r) => r.id === 'streaming-file')) {
- setResources([])
- setActiveResourceId(null)
- }
+ },
+ [fetchStreamBatch]
+ )
+
+ const applyChatHistorySnapshot = useCallback(
+ (history: TaskChatHistory, options?: { preserveActiveStreamingMessage?: boolean }) => {
+ const preserveActiveStreamingMessage = options?.preserveActiveStreamingMessage ?? false
+ const activeStreamId = history.activeStreamId
+ appliedChatIdRef.current = history.id
+
+ const mappedMessages = history.messages.map(mapStoredMessage)
+ const shouldPreserveActiveStreamingMessage =
+ preserveActiveStreamingMessage &&
+ sendingRef.current &&
+ Boolean(activeStreamId) &&
+ activeStreamId === streamIdRef.current
+
+ if (shouldPreserveActiveStreamingMessage) {
+ setMessages((prev) => {
+ const localStreamingAssistant = prev[prev.length - 1]
+ if (localStreamingAssistant?.role !== 'assistant') {
+ return mappedMessages
+ }
+
+ const nextMessages =
+ mappedMessages[mappedMessages.length - 1]?.role === 'assistant'
+ ? mappedMessages.slice(0, -1)
+ : mappedMessages
+
+ return [...nextMessages, localStreamingAssistant]
+ })
+ } else {
+ setMessages(mappedMessages)
+ }
+
+ if (history.resources.some((r) => r.id === 'streaming-file')) {
+ fetch('/api/copilot/chat/resources', {
+ method: 'DELETE',
+ headers: { 'Content-Type': 'application/json' },
+ body: JSON.stringify({
+ chatId: history.id,
+ resourceType: 'file',
+ resourceId: 'streaming-file',
+ }),
+ }).catch(() => {})
+ }
+
+ const persistedResources = history.resources.filter((r) => r.id !== 'streaming-file')
+ if (persistedResources.length > 0) {
+ setResources(persistedResources)
+ setActiveResourceId(persistedResources[persistedResources.length - 1].id)
+
+ for (const resource of persistedResources) {
+ if (resource.type !== 'workflow') continue
+ ensureWorkflowInRegistry(resource.id, resource.title, workspaceId)
+ }
+ } else if (history.resources.some((r) => r.id === 'streaming-file')) {
+ setResources([])
+ setActiveResourceId(null)
+ }
+ },
+ [workspaceId]
+ )
+
+ const preparePendingStreamRecovery = useCallback(
+ async (chatId: string): Promise => {
+ const latestHistory = await fetchChatHistory(chatId)
+ queryClient.setQueryData(taskKeys.detail(chatId), latestHistory)
+ applyChatHistorySnapshot(latestHistory)
+
+ if (!latestHistory.activeStreamId) {
+ return null
+ }
+
+ return {
+ streamId: latestHistory.activeStreamId,
+ snapshot: latestHistory.streamSnapshot,
+ }
+ },
+ [applyChatHistorySnapshot, queryClient]
+ )
+
+ useEffect(() => {
+ if (!chatHistory || appliedChatIdRef.current === chatHistory.id) return
+
+ const activeStreamId = chatHistory.activeStreamId
+ const snapshot = chatHistory.streamSnapshot
+ applyChatHistorySnapshot(chatHistory, { preserveActiveStreamingMessage: true })
if (activeStreamId && !sendingRef.current) {
const gen = ++streamGenRef.current
const abortController = new AbortController()
abortControllerRef.current = abortController
streamIdRef.current = activeStreamId
+ lastEventIdRef.current = snapshot?.events?.[snapshot.events.length - 1]?.eventId ?? 0
sendingRef.current = true
- setIsReconnecting(true)
+ streamingContentRef.current = ''
+ streamingBlocksRef.current = []
+ clientExecutionStartedRef.current.clear()
const assistantId = crypto.randomUUID()
const reconnect = async () => {
- let reconnectFailed = false
try {
- const encoder = new TextEncoder()
-
- const batchEvents = snapshot?.events ?? []
- const streamStatus = snapshot?.status ?? ''
-
- if (batchEvents.length === 0 && streamStatus === 'unknown') {
- reconnectFailed = true
- setError(RECONNECT_TAIL_ERROR)
- return
- }
-
- setIsSending(true)
- setIsReconnecting(false)
-
- const lastEventId =
- batchEvents.length > 0 ? batchEvents[batchEvents.length - 1].eventId : 0
- const isStreamDone =
- streamStatus === 'complete' || streamStatus === 'error' || streamStatus === 'cancelled'
-
- const combinedStream = new ReadableStream({
- async start(controller) {
- if (batchEvents.length > 0) {
- const sseText = batchEvents
- .map((e) => `data: ${JSON.stringify(e.event)}\n`)
- .join('\n')
- controller.enqueue(encoder.encode(`${sseText}\n`))
- }
-
- if (!isStreamDone) {
- try {
- const sseRes = await fetch(
- `/api/copilot/chat/stream?streamId=${activeStreamId}&from=${lastEventId}`,
- { signal: abortController.signal }
- )
- if (!sseRes.ok || !sseRes.body) {
- reconnectFailed = true
- logger.warn('SSE tail reconnect returned no readable body', {
- status: sseRes.status,
- streamId: activeStreamId,
- })
- setError(RECONNECT_TAIL_ERROR)
- } else {
- const reader = sseRes.body.getReader()
- while (true) {
- const { done, value } = await reader.read()
- if (done) break
- controller.enqueue(value)
- }
- }
- } catch (err) {
- if (!(err instanceof Error && err.name === 'AbortError')) {
- reconnectFailed = true
- logger.warn('SSE tail failed during reconnect', err)
- setError(RECONNECT_TAIL_ERROR)
- }
- }
- }
-
- controller.close()
- },
- })
-
- const hadStreamError = await processSSEStreamRef.current(
- combinedStream.getReader(),
+ const result = await attachToExistingStream({
+ streamId: activeStreamId,
assistantId,
- gen
- )
- if (hadStreamError) {
- reconnectFailed = true
+ expectedGen: gen,
+ snapshot,
+ initialLastEventId: lastEventIdRef.current,
+ })
+ if (streamGenRef.current === gen && !result.aborted) {
+ finalizeRef.current(result.error ? { error: true } : undefined)
}
} catch (err) {
if (err instanceof Error && err.name === 'AbortError') return
- reconnectFailed = true
- } finally {
- setIsReconnecting(false)
+ logger.warn('Unexpected error during reconnect', {
+ streamId: activeStreamId,
+ chatId: chatHistory.id,
+ error: err instanceof Error ? err.message : String(err),
+ })
if (streamGenRef.current === gen) {
- finalizeRef.current(reconnectFailed ? { error: true } : undefined)
+ try {
+ finalizeRef.current({ error: true })
+ } catch (finalizeError) {
+ logger.error('Reconnect fallback finalize failed', {
+ streamId: activeStreamId,
+ chatId: chatHistory.id,
+ error:
+ finalizeError instanceof Error ? finalizeError.message : String(finalizeError),
+ })
+ sendingRef.current = false
+ setIsSending(false)
+ setIsReconnecting(false)
+ abortControllerRef.current = null
+ setError('Failed to reconnect to the active stream')
+ }
+ }
+ } finally {
+ if (abortControllerRef.current === abortController) {
+ abortControllerRef.current = null
}
}
}
reconnect()
}
- }, [chatHistory, workspaceId, queryClient])
+ }, [applyChatHistorySnapshot, attachToExistingStream, chatHistory, queryClient])
const processSSEStream = useCallback(
async (
reader: ReadableStreamDefaultReader,
assistantId: string,
- expectedGen?: number
+ options?: StreamProcessingOptions
) => {
+ const { expectedGen, initialLastEventId = 0, preserveExistingState = false } = options ?? {}
const decoder = new TextDecoder()
streamReaderRef.current = reader
let buffer = ''
- const blocks: ContentBlock[] = []
+ const blocks: ContentBlock[] = preserveExistingState ? [...streamingBlocksRef.current] : []
const toolMap = new Map()
const toolArgsMap = new Map>()
- const clientExecutionStarted = new Set()
+ const clientExecutionStarted = clientExecutionStartedRef.current
let activeSubagent: string | undefined
let activeCompactionId: string | undefined
- let runningText = ''
+ let runningText = preserveExistingState ? streamingContentRef.current : ''
let lastContentSource: 'main' | 'subagent' | null = null
let streamRequestId: string | undefined
+ let lastEventId = initialLastEventId
+ let sawDoneEvent = false
- streamingContentRef.current = ''
- streamingBlocksRef.current = []
+ if (!preserveExistingState) {
+ streamingContentRef.current = ''
+ streamingBlocksRef.current = []
+ }
+
+ for (const [index, block] of blocks.entries()) {
+ if (block.type === 'tool_call' && block.toolCall?.id) {
+ toolMap.set(block.toolCall.id, index)
+ if (block.toolCall.params) {
+ toolArgsMap.set(block.toolCall.id, block.toolCall.params)
+ }
+ }
+ }
const ensureTextBlock = (): ContentBlock => {
const last = blocks[blocks.length - 1]
@@ -716,6 +1003,14 @@ export function useChat(
continue
}
+ if (typeof (parsed as SSEPayload & { eventId?: unknown }).eventId === 'number') {
+ lastEventId = Math.max(
+ lastEventId,
+ (parsed as SSEPayload & { eventId: number }).eventId
+ )
+ lastEventIdRef.current = lastEventId
+ }
+
logger.debug('SSE event received', parsed)
switch (parsed.type) {
case 'chat_id': {
@@ -1167,6 +1462,10 @@ export function useChat(
appendInlineErrorTag(buildInlineErrorTag(parsed))
break
}
+ case 'done': {
+ sawDoneEvent = true
+ break
+ }
}
}
}
@@ -1175,7 +1474,11 @@ export function useChat(
streamReaderRef.current = null
}
}
- return sawStreamError
+ return {
+ sawStreamError,
+ sawDoneEvent,
+ lastEventId,
+ }
},
[workspaceId, queryClient, addResource, removeResource]
)
@@ -1247,6 +1550,16 @@ export function useChat(
const messagesRef = useRef(messages)
messagesRef.current = messages
+ const visibleMessageQueue = useMemo(
+ () =>
+ pendingRecoveryMessage
+ ? [
+ pendingRecoveryMessage,
+ ...messageQueue.filter((msg) => msg.id !== pendingRecoveryMessage.id),
+ ]
+ : messageQueue,
+ [messageQueue, pendingRecoveryMessage]
+ )
const finalize = useCallback(
(options?: { error?: boolean }) => {
@@ -1267,6 +1580,21 @@ export function useChat(
return
}
+ const recoveryMessage = pendingRecoveryMessageRef.current
+ if (recoveryMessage) {
+ setPendingRecoveryMessage(null)
+ const gen = streamGenRef.current
+ queueMicrotask(() => {
+ if (streamGenRef.current !== gen) return
+ sendMessageRef.current(
+ recoveryMessage.content,
+ recoveryMessage.fileAttachments,
+ recoveryMessage.contexts
+ )
+ })
+ return
+ }
+
const next = messageQueueRef.current[0]
if (next) {
setMessageQueue((prev) => prev.filter((m) => m.id !== next.id))
@@ -1307,6 +1635,8 @@ export function useChat(
pendingUserMsgRef.current = { id: userMessageId, content: message }
streamIdRef.current = userMessageId
+ lastEventIdRef.current = 0
+ clientExecutionStartedRef.current.clear()
const storedAttachments: TaskStoredFileAttachment[] | undefined =
fileAttachments && fileAttachments.length > 0
@@ -1320,6 +1650,9 @@ export function useChat(
: undefined
const requestChatId = selectedChatIdRef.current ?? chatIdRef.current
+ const previousChatHistory = requestChatId
+ ? queryClient.getQueryData(taskKeys.detail(requestChatId))
+ : undefined
if (requestChatId) {
const cachedUserMsg: TaskStoredMessage = {
id: userMessageId,
@@ -1339,6 +1672,7 @@ export function useChat(
}
const userAttachments = storedAttachments?.map(toDisplayAttachment)
+ const previousMessages = messagesRef.current
const messageContexts = contexts?.map((c) => ({
kind: c.kind,
@@ -1402,20 +1736,132 @@ export function useChat(
if (!response.body) throw new Error('No response body')
- const hadStreamError = await processSSEStream(response.body.getReader(), assistantId, gen)
+ const termination = await processSSEStream(response.body.getReader(), assistantId, {
+ expectedGen: gen,
+ })
if (streamGenRef.current === gen) {
- finalize(hadStreamError ? { error: true } : undefined)
+ if (termination.sawStreamError) {
+ finalize({ error: true })
+ return
+ }
+
+ const batch = await fetchStreamBatch(
+ userMessageId,
+ termination.lastEventId,
+ abortController.signal
+ )
+ if (streamGenRef.current !== gen) {
+ return
+ }
+ if (isTerminalStreamStatus(batch.status)) {
+ finalize(batch.status === 'error' ? { error: true } : undefined)
+ return
+ }
+
+ logger.warn(
+ 'Primary stream ended without terminal event, attempting in-place reconnect',
+ {
+ streamId: userMessageId,
+ lastEventId: termination.lastEventId,
+ streamStatus: batch.status,
+ sawDoneEvent: termination.sawDoneEvent,
+ }
+ )
+
+ const reconnectResult = await attachToExistingStream({
+ streamId: userMessageId,
+ assistantId,
+ expectedGen: gen,
+ snapshot: {
+ events: batch.events,
+ status: batch.status,
+ },
+ initialLastEventId:
+ batch.events[batch.events.length - 1]?.eventId ?? termination.lastEventId,
+ })
+
+ if (streamGenRef.current === gen && !reconnectResult.aborted) {
+ finalize(reconnectResult.error ? { error: true } : undefined)
+ }
}
} catch (err) {
if (err instanceof Error && err.name === 'AbortError') return
- setError(err instanceof Error ? err.message : 'Failed to send message')
+ const errorMessage = err instanceof Error ? err.message : 'Failed to send message'
+ if (requestChatId && isActiveStreamConflictError(errorMessage)) {
+ logger.info('Active stream conflict detected while sending message; reattaching', {
+ chatId: requestChatId,
+ attemptedStreamId: userMessageId,
+ })
+
+ if (previousChatHistory) {
+ queryClient.setQueryData(taskKeys.detail(requestChatId), previousChatHistory)
+ }
+ setMessages(previousMessages)
+ const queuedMessage: QueuedMessage = {
+ id: crypto.randomUUID(),
+ content: message,
+ fileAttachments,
+ contexts,
+ }
+ pendingRecoveryMessageRef.current = queuedMessage
+ setPendingRecoveryMessage(queuedMessage)
+
+ try {
+ const pendingRecovery = await preparePendingStreamRecovery(requestChatId)
+ if (!pendingRecovery) {
+ setError(errorMessage)
+ if (streamGenRef.current === gen) {
+ finalize({ error: true })
+ }
+ return
+ }
+
+ streamIdRef.current = pendingRecovery.streamId
+ lastEventIdRef.current =
+ pendingRecovery.snapshot?.events?.[pendingRecovery.snapshot.events.length - 1]
+ ?.eventId ?? 0
+
+ const rehydratedMessages = messagesRef.current
+ const lastAssistantMsg = [...rehydratedMessages]
+ .reverse()
+ .find((m) => m.role === 'assistant')
+ const recoveryAssistantId = lastAssistantMsg?.id ?? assistantId
+
+ const reconnectResult = await attachToExistingStream({
+ streamId: pendingRecovery.streamId,
+ assistantId: recoveryAssistantId,
+ expectedGen: gen,
+ snapshot: pendingRecovery.snapshot,
+ initialLastEventId: lastEventIdRef.current,
+ })
+
+ if (streamGenRef.current === gen && !reconnectResult.aborted) {
+ finalize(reconnectResult.error ? { error: true } : undefined)
+ }
+ return
+ } catch (recoveryError) {
+ logger.warn('Failed to recover active stream after conflict', {
+ chatId: requestChatId,
+ error: recoveryError instanceof Error ? recoveryError.message : String(recoveryError),
+ })
+ }
+ }
+
+ setError(errorMessage)
if (streamGenRef.current === gen) {
finalize({ error: true })
}
return
}
},
- [workspaceId, queryClient, processSSEStream, finalize]
+ [
+ workspaceId,
+ queryClient,
+ processSSEStream,
+ finalize,
+ attachToExistingStream,
+ preparePendingStreamRecovery,
+ ]
)
sendMessageRef.current = sendMessage
@@ -1434,6 +1880,10 @@ export function useChat(
abortControllerRef.current = null
sendingRef.current = false
setIsSending(false)
+ setIsReconnecting(false)
+ lastEventIdRef.current = 0
+ pendingRecoveryMessageRef.current = null
+ setPendingRecoveryMessage(null)
setMessages((prev) =>
prev.map((msg) => {
@@ -1521,24 +1971,47 @@ export function useChat(
}, [invalidateChatQueries, persistPartialResponse, executionStream])
const removeFromQueue = useCallback((id: string) => {
+ if (pendingRecoveryMessageRef.current?.id === id) {
+ pendingRecoveryMessageRef.current = null
+ setPendingRecoveryMessage(null)
+ return
+ }
messageQueueRef.current = messageQueueRef.current.filter((m) => m.id !== id)
setMessageQueue((prev) => prev.filter((m) => m.id !== id))
}, [])
const sendNow = useCallback(
async (id: string) => {
- const msg = messageQueueRef.current.find((m) => m.id === id)
+ const recoveryMessage = pendingRecoveryMessageRef.current
+ const msg =
+ recoveryMessage?.id === id
+ ? recoveryMessage
+ : messageQueueRef.current.find((m) => m.id === id)
if (!msg) return
// Eagerly update ref so a rapid second click finds the message already gone
- messageQueueRef.current = messageQueueRef.current.filter((m) => m.id !== id)
+ if (recoveryMessage?.id === id) {
+ pendingRecoveryMessageRef.current = null
+ setPendingRecoveryMessage(null)
+ } else {
+ messageQueueRef.current = messageQueueRef.current.filter((m) => m.id !== id)
+ }
await stopGeneration()
- setMessageQueue((prev) => prev.filter((m) => m.id !== id))
+ if (recoveryMessage?.id !== id) {
+ setMessageQueue((prev) => prev.filter((m) => m.id !== id))
+ }
await sendMessage(msg.content, msg.fileAttachments, msg.contexts)
},
[stopGeneration, sendMessage]
)
const editQueuedMessage = useCallback((id: string): QueuedMessage | undefined => {
+ const recoveryMessage = pendingRecoveryMessageRef.current
+ if (recoveryMessage?.id === id) {
+ pendingRecoveryMessageRef.current = null
+ setPendingRecoveryMessage(null)
+ return recoveryMessage
+ }
+
const msg = messageQueueRef.current.find((m) => m.id === id)
if (!msg) return undefined
messageQueueRef.current = messageQueueRef.current.filter((m) => m.id !== id)
@@ -1552,6 +2025,9 @@ export function useChat(
abortControllerRef.current = null
streamGenRef.current++
sendingRef.current = false
+ lastEventIdRef.current = 0
+ clientExecutionStartedRef.current.clear()
+ pendingRecoveryMessageRef.current = null
}
}, [])
@@ -1569,7 +2045,7 @@ export function useChat(
addResource,
removeResource,
reorderResources,
- messageQueue,
+ messageQueue: visibleMessageQueue,
removeFromQueue,
sendNow,
editQueuedMessage,
diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/panel.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/panel.tsx
index f5a65ab0760..0ea9cb6c51a 100644
--- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/panel.tsx
+++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/panel.tsx
@@ -318,6 +318,7 @@ export const Panel = memo(function Panel() {
const {
messages: copilotMessages,
isSending: copilotIsSending,
+ isReconnecting: copilotIsReconnecting,
sendMessage: copilotSendMessage,
stopGeneration: copilotStopGeneration,
resolvedChatId: copilotResolvedChatId,
@@ -812,6 +813,7 @@ export const Panel = memo(function Panel() {
className='min-h-0 flex-1'
messages={copilotMessages}
isSending={copilotIsSending}
+ isReconnecting={copilotIsReconnecting}
onSubmit={handleCopilotSubmit}
onStopGeneration={copilotStopGeneration}
messageQueue={copilotMessageQueue}
diff --git a/apps/sim/hooks/queries/tasks.ts b/apps/sim/hooks/queries/tasks.ts
index 8bc3c10c536..6e071b60588 100644
--- a/apps/sim/hooks/queries/tasks.ts
+++ b/apps/sim/hooks/queries/tasks.ts
@@ -129,7 +129,10 @@ export function useTasks(workspaceId?: string) {
})
}
-async function fetchChatHistory(chatId: string, signal?: AbortSignal): Promise {
+export async function fetchChatHistory(
+ chatId: string,
+ signal?: AbortSignal
+): Promise {
const response = await fetch(`/api/copilot/chat?chatId=${chatId}`, { signal })
if (!response.ok) {
diff --git a/apps/sim/lib/copilot/chat-streaming.ts b/apps/sim/lib/copilot/chat-streaming.ts
index 6d090a4866b..b3d465667d6 100644
--- a/apps/sim/lib/copilot/chat-streaming.ts
+++ b/apps/sim/lib/copilot/chat-streaming.ts
@@ -294,6 +294,21 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
return new ReadableStream({
async start(controller) {
const encoder = new TextEncoder()
+ const markClientDisconnected = (reason: string) => {
+ if (clientDisconnected) return
+ clientDisconnected = true
+ logger.info(
+ appendCopilotLogContext('Client disconnected from live SSE stream', {
+ requestId,
+ messageId,
+ }),
+ {
+ streamId,
+ runId,
+ reason,
+ }
+ )
+ }
await resetStreamBuffer(streamId)
await setStreamMeta(streamId, { status: 'active', userId, executionId, runId })
@@ -381,7 +396,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
)
}
} catch {
- clientDisconnected = true
+ markClientDisconnected('enqueue_failed')
}
}
@@ -424,7 +439,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
try {
controller.enqueue(encoder.encode(': keepalive\n\n'))
} catch {
- clientDisconnected = true
+ markClientDisconnected('keepalive_failed')
}
}, 15_000)
@@ -498,6 +513,18 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
await eventWriter.close()
await setStreamMeta(streamId, { status: 'complete', userId, executionId, runId })
await updateRunStatus(runId, 'complete', { completedAt: new Date() }).catch(() => {})
+ if (clientDisconnected) {
+ logger.info(
+ appendCopilotLogContext('Orchestration completed after client disconnect', {
+ requestId,
+ messageId,
+ }),
+ {
+ streamId,
+ runId,
+ }
+ )
+ }
} catch (error) {
if (abortController.signal.aborted) {
logger.error(
@@ -544,6 +571,12 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
error: errorMessage,
}).catch(() => {})
} finally {
+ logger.info(appendCopilotLogContext('Closing live SSE stream', { requestId, messageId }), {
+ streamId,
+ runId,
+ clientDisconnected,
+ aborted: abortController.signal.aborted,
+ })
clearInterval(keepaliveInterval)
if (abortPoller) {
clearInterval(abortPoller)
@@ -566,6 +599,16 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
}
},
cancel() {
+ logger.info(
+ appendCopilotLogContext('ReadableStream cancel received from client', {
+ requestId,
+ messageId,
+ }),
+ {
+ streamId,
+ runId,
+ }
+ )
clientDisconnected = true
if (eventWriter) {
eventWriter.flush().catch(() => {})