Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion apps/sim/app/api/workflows/[id]/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -865,15 +865,21 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
registerManualExecutionAborter(executionId, timeoutController.abort)
isManualAbortRegistered = true

let localEventSeq = 0
const sendEvent = (event: ExecutionEvent) => {
const isBuffered = event.type !== 'stream:chunk' && event.type !== 'stream:done'
if (isBuffered) {
localEventSeq++
event.eventId = localEventSeq
}
if (!isStreamClosed) {
try {
controller.enqueue(encodeSSEEvent(event))
} catch {
isStreamClosed = true
}
}
if (event.type !== 'stream:chunk' && event.type !== 'stream:done') {
if (isBuffered) {
eventWriter.write(event).catch(() => {})
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
const logger = createLogger('ExecutionStreamReconnectAPI')

const POLL_INTERVAL_MS = 500
const MAX_POLL_DURATION_MS = 10 * 60 * 1000 // 10 minutes
const MAX_POLL_DURATION_MS = 55 * 60 * 1000 // 55 minutes (just under Redis 1hr TTL)

function isTerminalStatus(status: ExecutionStreamStatus): boolean {
return status === 'complete' || status === 'error' || status === 'cancelled'
Expand Down Expand Up @@ -101,6 +101,7 @@ export async function GET(
const events = await readExecutionEvents(executionId, lastEventId)
for (const entry of events) {
if (closed) return
entry.event.eventId = entry.eventId
enqueue(formatSSEEvent(entry.event))
lastEventId = entry.eventId
}
Expand All @@ -119,6 +120,7 @@ export async function GET(
const newEvents = await readExecutionEvents(executionId, lastEventId)
for (const entry of newEvents) {
if (closed) return
entry.event.eventId = entry.eventId
enqueue(formatSSEEvent(entry.event))
lastEventId = entry.eventId
}
Expand All @@ -128,6 +130,7 @@ export async function GET(
const finalEvents = await readExecutionEvents(executionId, lastEventId)
for (const entry of finalEvents) {
if (closed) return
entry.event.eventId = entry.eventId
enqueue(formatSSEEvent(entry.event))
lastEventId = entry.eventId
}
Expand Down
21 changes: 19 additions & 2 deletions apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import { useExecutionStream } from '@/hooks/use-execution-stream'
import { useExecutionStore } from '@/stores/execution/store'
import { useFolderStore } from '@/stores/folders/store'
import type { ChatContext } from '@/stores/panel'
import { useTerminalConsoleStore } from '@/stores/terminal'
import { consolePersistence, useTerminalConsoleStore } from '@/stores/terminal'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import type {
ChatMessage,
Expand Down Expand Up @@ -147,6 +147,22 @@ function isActiveStreamConflictError(input: unknown): boolean {
return input.includes('A response is already in progress for this chat')
}

/**
* Extracts tool call IDs from snapshot events so that replayed client-executable
* tool calls are not re-executed after a page refresh.
*/
function extractToolCallIdsFromSnapshot(snapshot?: StreamSnapshot | null): Set<string> {
const ids = new Set<string>()
if (!snapshot?.events) return ids
for (const entry of snapshot.events) {
const event = entry.event
if (event.type === 'tool_call' && typeof event.toolCallId === 'string') {
ids.add(event.toolCallId)
}
}
return ids
}

function buildReplayStream(events: StreamEventEnvelope[]): ReadableStream<Uint8Array> {
const encoder = new TextEncoder()
return new ReadableStream<Uint8Array>({
Expand Down Expand Up @@ -860,7 +876,7 @@ export function useChat(
sendingRef.current = true
streamingContentRef.current = ''
streamingBlocksRef.current = []
clientExecutionStartedRef.current.clear()
clientExecutionStartedRef.current = extractToolCallIdsFromSnapshot(snapshot)

const assistantId = crypto.randomUUID()

Expand Down Expand Up @@ -2071,6 +2087,7 @@ export function useChat(
})

executionStream.cancel(workflowId)
consolePersistence.executionEnded()
execState.setIsExecuting(workflowId, false)
execState.setIsDebugging(workflowId, false)
execState.setActiveBlocks(workflowId, new Set())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ import { useChatStore } from '@/stores/chat/store'
import { getChatPosition } from '@/stores/chat/utils'
import { useCurrentWorkflowExecution } from '@/stores/execution'
import { useOperationQueue } from '@/stores/operation-queue/store'
import { useTerminalConsoleStore } from '@/stores/terminal'
import { useTerminalConsoleStore, useWorkflowConsoleEntries } from '@/stores/terminal'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
Expand Down Expand Up @@ -265,8 +265,9 @@ export function Chat() {
)

const hasConsoleHydrated = useTerminalConsoleStore((state) => state._hasHydrated)
const entriesFromStore = useTerminalConsoleStore((state) => state.entries)
const entries = hasConsoleHydrated ? entriesFromStore : []
const entries = useWorkflowConsoleEntries(
hasConsoleHydrated && typeof activeWorkflowId === 'string' ? activeWorkflowId : undefined
)
const { isExecuting } = useCurrentWorkflowExecution()
const { handleRunWorkflow, handleCancelExecution } = useWorkflowExecution()
const { data: session } = useSession()
Expand Down Expand Up @@ -427,9 +428,8 @@ export function Chat() {
})

const outputEntries = useMemo(() => {
if (!activeWorkflowId) return []
return entries.filter((entry) => entry.workflowId === activeWorkflowId && entry.output)
}, [entries, activeWorkflowId])
return entries.filter((entry) => entry.output)
}, [entries])

const workflowMessages = useMemo(() => {
if (!activeWorkflowId) return []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ export function EvalInput({
placeholder='How accurate is the response?'
disabled={isPreview || disabled}
className={cn(
'min-h-[80px] whitespace-pre-wrap text-transparent caret-white'
'min-h-[80px] whitespace-pre-wrap text-transparent caret-foreground'
)}
rows={3}
/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import { ToggleButton } from '@/app/workspace/[workspaceId]/w/[workflowId]/compo
import { useContextMenu } from '@/app/workspace/[workspaceId]/w/components/sidebar/hooks'
import { useCodeViewerFeatures } from '@/hooks/use-code-viewer'
import type { ConsoleEntry } from '@/stores/terminal'
import { useTerminalStore } from '@/stores/terminal'
import { safeConsoleStringify, useTerminalStore } from '@/stores/terminal'

interface OutputCodeContentProps {
code: string
Expand Down Expand Up @@ -93,11 +93,10 @@ export interface OutputPanelProps {
handleTrainingClick: (e: React.MouseEvent) => void
showCopySuccess: boolean
handleCopy: () => void
filteredEntries: ConsoleEntry[]
hasEntries: boolean
handleExportConsole: (e: React.MouseEvent) => void
handleClearConsole: (e: React.MouseEvent) => void
shouldShowCodeDisplay: boolean
outputDataStringified: string
outputData: unknown
handleClearConsoleFromMenu: () => void
}
Expand All @@ -121,11 +120,10 @@ export const OutputPanel = React.memo(function OutputPanel({
handleTrainingClick,
showCopySuccess,
handleCopy,
filteredEntries,
hasEntries,
handleExportConsole,
handleClearConsole,
shouldShowCodeDisplay,
outputDataStringified,
outputData,
handleClearConsoleFromMenu,
}: OutputPanelProps) {
Expand Down Expand Up @@ -276,6 +274,19 @@ export const OutputPanel = React.memo(function OutputPanel({
[isOutputSearchActive, outputSearchQuery]
)

const outputDataStringified = useMemo(() => {
if (
structuredView ||
shouldShowCodeDisplay ||
outputData === null ||
outputData === undefined
) {
return ''
}

return safeConsoleStringify(outputData)
}, [outputData, shouldShowCodeDisplay, structuredView])

return (
<>
<div
Expand Down Expand Up @@ -420,7 +431,7 @@ export const OutputPanel = React.memo(function OutputPanel({
<span>{showCopySuccess ? 'Copied' : 'Copy output'}</span>
</Tooltip.Content>
</Tooltip.Root>
{filteredEntries.length > 0 && (
{hasEntries && (
<>
<Tooltip.Root>
<Tooltip.Trigger asChild>
Expand Down
Loading
Loading