diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index 8cee947272f..fa837f43a9e 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -865,7 +865,13 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: registerManualExecutionAborter(executionId, timeoutController.abort) isManualAbortRegistered = true + let localEventSeq = 0 const sendEvent = (event: ExecutionEvent) => { + const isBuffered = event.type !== 'stream:chunk' && event.type !== 'stream:done' + if (isBuffered) { + localEventSeq++ + event.eventId = localEventSeq + } if (!isStreamClosed) { try { controller.enqueue(encodeSSEEvent(event)) @@ -873,7 +879,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: isStreamClosed = true } } - if (event.type !== 'stream:chunk' && event.type !== 'stream:done') { + if (isBuffered) { eventWriter.write(event).catch(() => {}) } } diff --git a/apps/sim/app/api/workflows/[id]/executions/[executionId]/stream/route.ts b/apps/sim/app/api/workflows/[id]/executions/[executionId]/stream/route.ts index 745c5b7d44e..0893209c961 100644 --- a/apps/sim/app/api/workflows/[id]/executions/[executionId]/stream/route.ts +++ b/apps/sim/app/api/workflows/[id]/executions/[executionId]/stream/route.ts @@ -13,7 +13,7 @@ import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils' const logger = createLogger('ExecutionStreamReconnectAPI') const POLL_INTERVAL_MS = 500 -const MAX_POLL_DURATION_MS = 10 * 60 * 1000 // 10 minutes +const MAX_POLL_DURATION_MS = 55 * 60 * 1000 // 55 minutes (just under Redis 1hr TTL) function isTerminalStatus(status: ExecutionStreamStatus): boolean { return status === 'complete' || status === 'error' || status === 'cancelled' @@ -101,6 +101,7 @@ export async function GET( const events = await readExecutionEvents(executionId, lastEventId) for (const entry of events) 
{ if (closed) return + entry.event.eventId = entry.eventId enqueue(formatSSEEvent(entry.event)) lastEventId = entry.eventId } @@ -119,6 +120,7 @@ export async function GET( const newEvents = await readExecutionEvents(executionId, lastEventId) for (const entry of newEvents) { if (closed) return + entry.event.eventId = entry.eventId enqueue(formatSSEEvent(entry.event)) lastEventId = entry.eventId } @@ -128,6 +130,7 @@ export async function GET( const finalEvents = await readExecutionEvents(executionId, lastEventId) for (const entry of finalEvents) { if (closed) return + entry.event.eventId = entry.eventId enqueue(formatSSEEvent(entry.event)) lastEventId = entry.eventId } diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index 1f4e4b2a08b..dde5ca739f2 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -39,7 +39,7 @@ import { useExecutionStream } from '@/hooks/use-execution-stream' import { useExecutionStore } from '@/stores/execution/store' import { useFolderStore } from '@/stores/folders/store' import type { ChatContext } from '@/stores/panel' -import { useTerminalConsoleStore } from '@/stores/terminal' +import { consolePersistence, useTerminalConsoleStore } from '@/stores/terminal' import { useWorkflowRegistry } from '@/stores/workflows/registry/store' import type { ChatMessage, @@ -147,6 +147,22 @@ function isActiveStreamConflictError(input: unknown): boolean { return input.includes('A response is already in progress for this chat') } +/** + * Extracts tool call IDs from snapshot events so that replayed client-executable + * tool calls are not re-executed after a page refresh. 
+ */ +function extractToolCallIdsFromSnapshot(snapshot?: StreamSnapshot | null): Set { + const ids = new Set() + if (!snapshot?.events) return ids + for (const entry of snapshot.events) { + const event = entry.event + if (event.type === 'tool_call' && typeof event.toolCallId === 'string') { + ids.add(event.toolCallId) + } + } + return ids +} + function buildReplayStream(events: StreamEventEnvelope[]): ReadableStream { const encoder = new TextEncoder() return new ReadableStream({ @@ -860,7 +876,7 @@ export function useChat( sendingRef.current = true streamingContentRef.current = '' streamingBlocksRef.current = [] - clientExecutionStartedRef.current.clear() + clientExecutionStartedRef.current = extractToolCallIdsFromSnapshot(snapshot) const assistantId = crypto.randomUUID() @@ -2071,6 +2087,7 @@ export function useChat( }) executionStream.cancel(workflowId) + consolePersistence.executionEnded() execState.setIsExecuting(workflowId, false) execState.setIsDebugging(workflowId, false) execState.setActiveBlocks(workflowId, new Set()) diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/chat.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/chat.tsx index 5cf7652de7f..3f0bd88c821 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/chat.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/chat.tsx @@ -56,7 +56,7 @@ import { useChatStore } from '@/stores/chat/store' import { getChatPosition } from '@/stores/chat/utils' import { useCurrentWorkflowExecution } from '@/stores/execution' import { useOperationQueue } from '@/stores/operation-queue/store' -import { useTerminalConsoleStore } from '@/stores/terminal' +import { useTerminalConsoleStore, useWorkflowConsoleEntries } from '@/stores/terminal' import { useWorkflowRegistry } from '@/stores/workflows/registry/store' import { useSubBlockStore } from '@/stores/workflows/subblock/store' import { useWorkflowStore } from 
'@/stores/workflows/workflow/store' @@ -265,8 +265,9 @@ export function Chat() { ) const hasConsoleHydrated = useTerminalConsoleStore((state) => state._hasHydrated) - const entriesFromStore = useTerminalConsoleStore((state) => state.entries) - const entries = hasConsoleHydrated ? entriesFromStore : [] + const entries = useWorkflowConsoleEntries( + hasConsoleHydrated && typeof activeWorkflowId === 'string' ? activeWorkflowId : undefined + ) const { isExecuting } = useCurrentWorkflowExecution() const { handleRunWorkflow, handleCancelExecution } = useWorkflowExecution() const { data: session } = useSession() @@ -427,9 +428,8 @@ export function Chat() { }) const outputEntries = useMemo(() => { - if (!activeWorkflowId) return [] - return entries.filter((entry) => entry.workflowId === activeWorkflowId && entry.output) - }, [entries, activeWorkflowId]) + return entries.filter((entry) => entry.output) + }, [entries]) const workflowMessages = useMemo(() => { if (!activeWorkflowId) return [] diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/editor/components/sub-block/components/eval-input/eval-input.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/editor/components/sub-block/components/eval-input/eval-input.tsx index 447752868e7..d8c9f8a84d6 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/editor/components/sub-block/components/eval-input/eval-input.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/editor/components/sub-block/components/eval-input/eval-input.tsx @@ -216,7 +216,7 @@ export function EvalInput({ placeholder='How accurate is the response?' 
disabled={isPreview || disabled} className={cn( - 'min-h-[80px] whitespace-pre-wrap text-transparent caret-white' + 'min-h-[80px] whitespace-pre-wrap text-transparent caret-foreground' )} rows={3} /> diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/components/output-panel/output-panel.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/components/output-panel/output-panel.tsx index 20979de5d04..27776be923e 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/components/output-panel/output-panel.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/components/output-panel/output-panel.tsx @@ -33,7 +33,7 @@ import { ToggleButton } from '@/app/workspace/[workspaceId]/w/[workflowId]/compo import { useContextMenu } from '@/app/workspace/[workspaceId]/w/components/sidebar/hooks' import { useCodeViewerFeatures } from '@/hooks/use-code-viewer' import type { ConsoleEntry } from '@/stores/terminal' -import { useTerminalStore } from '@/stores/terminal' +import { safeConsoleStringify, useTerminalStore } from '@/stores/terminal' interface OutputCodeContentProps { code: string @@ -93,11 +93,10 @@ export interface OutputPanelProps { handleTrainingClick: (e: React.MouseEvent) => void showCopySuccess: boolean handleCopy: () => void - filteredEntries: ConsoleEntry[] + hasEntries: boolean handleExportConsole: (e: React.MouseEvent) => void handleClearConsole: (e: React.MouseEvent) => void shouldShowCodeDisplay: boolean - outputDataStringified: string outputData: unknown handleClearConsoleFromMenu: () => void } @@ -121,11 +120,10 @@ export const OutputPanel = React.memo(function OutputPanel({ handleTrainingClick, showCopySuccess, handleCopy, - filteredEntries, + hasEntries, handleExportConsole, handleClearConsole, shouldShowCodeDisplay, - outputDataStringified, outputData, handleClearConsoleFromMenu, }: OutputPanelProps) { @@ -276,6 +274,19 @@ export const 
OutputPanel = React.memo(function OutputPanel({ [isOutputSearchActive, outputSearchQuery] ) + const outputDataStringified = useMemo(() => { + if ( + structuredView || + shouldShowCodeDisplay || + outputData === null || + outputData === undefined + ) { + return '' + } + + return safeConsoleStringify(outputData) + }, [outputData, shouldShowCodeDisplay, structuredView]) + return ( <>
{showCopySuccess ? 'Copied' : 'Copy output'} - {filteredEntries.length > 0 && ( + {hasEntries && ( <> diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/terminal.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/terminal.tsx index 4bc8623982d..9272103bab7 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/terminal.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/terminal.tsx @@ -14,6 +14,7 @@ import { Trash2, } from 'lucide-react' import Link from 'next/link' +import { List, type RowComponentProps, useListRef } from 'react-window' import { Button, ChevronDown, @@ -44,19 +45,27 @@ import { type EntryNode, type ExecutionGroup, flattenBlockEntriesOnly, + flattenVisibleExecutionRows, getBlockColor, getBlockIcon, groupEntriesByExecution, isEventFromEditableElement, type NavigableBlockEntry, TERMINAL_CONFIG, + type VisibleTerminalRow, } from '@/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/utils' import { useContextMenu } from '@/app/workspace/[workspaceId]/w/components/sidebar/hooks' import { useShowTrainingControls } from '@/hooks/queries/general-settings' import { OUTPUT_PANEL_WIDTH, TERMINAL_HEIGHT } from '@/stores/constants' import { sendMothershipMessage } from '@/stores/notifications/utils' import type { ConsoleEntry } from '@/stores/terminal' -import { useTerminalConsoleStore, useTerminalStore } from '@/stores/terminal' +import { + safeConsoleStringify, + useConsoleEntry, + useTerminalConsoleStore, + useTerminalStore, + useWorkflowConsoleEntries, +} from '@/stores/terminal' import { useWorkflowRegistry } from '@/stores/workflows/registry/store' import { useWorkflowStore } from '@/stores/workflows/workflow/store' @@ -157,6 +166,7 @@ const IterationNodeRow = memo(function IterationNodeRow({ onToggle, expandedNodes, onToggleNode, + renderChildren = true, }: { node: EntryNode selectedEntryId: string | null @@ -165,6 +175,7 
@@ const IterationNodeRow = memo(function IterationNodeRow({ onToggle: () => void expandedNodes: Set onToggleNode: (nodeId: string) => void + renderChildren?: boolean }) { const { entry, children, iterationInfo } = node const hasError = Boolean(entry.error) || children.some((c) => c.entry.error) @@ -219,7 +230,7 @@ const IterationNodeRow = memo(function IterationNodeRow({
{/* Nested Blocks */} - {isExpanded && hasChildren && ( + {renderChildren && isExpanded && hasChildren && (
{children.map((child) => ( void expandedNodes: Set onToggleNode: (nodeId: string) => void + renderChildren?: boolean }) { const { entry, children } = node const BlockIcon = getBlockIcon(entry.blockType) @@ -320,7 +333,7 @@ const SubflowNodeRow = memo(function SubflowNodeRow({
{/* Nested Iterations */} - {isExpanded && hasChildren && ( + {renderChildren && isExpanded && hasChildren && (
{children.map((iterNode) => ( void expandedNodes: Set onToggleNode: (nodeId: string) => void + renderChildren?: boolean }) { const { entry, children } = node const BlockIcon = getBlockIcon(entry.blockType) @@ -431,7 +446,7 @@ const WorkflowNodeRow = memo(function WorkflowNodeRow({
{/* Nested Child Blocks — rendered through EntryNodeRow for full loop/parallel support */} - {isExpanded && hasChildren && ( + {renderChildren && isExpanded && hasChildren && (
{children.map((child) => ( void expandedNodes: Set onToggleNode: (nodeId: string) => void + renderChildren?: boolean }) { const { nodeType } = node @@ -475,6 +492,7 @@ const EntryNodeRow = memo(function EntryNodeRow({ onSelectEntry={onSelectEntry} expandedNodes={expandedNodes} onToggleNode={onToggleNode} + renderChildren={renderChildren} /> ) } @@ -487,6 +505,7 @@ const EntryNodeRow = memo(function EntryNodeRow({ onSelectEntry={onSelectEntry} expandedNodes={expandedNodes} onToggleNode={onToggleNode} + renderChildren={renderChildren} /> ) } @@ -501,6 +520,7 @@ const EntryNodeRow = memo(function EntryNodeRow({ onToggle={() => onToggleNode(node.entry.id)} expandedNodes={expandedNodes} onToggleNode={onToggleNode} + renderChildren={renderChildren} /> ) } @@ -515,42 +535,122 @@ const EntryNodeRow = memo(function EntryNodeRow({ ) }) -/** - * Execution group row component with dashed separator - */ -const ExecutionGroupRow = memo(function ExecutionGroupRow({ - group, - showSeparator, +interface TerminalLogListRowProps { + rows: VisibleTerminalRow[] + selectedEntryId: string | null + onSelectEntry: (entry: ConsoleEntry) => void + expandedNodes: Set + onToggleNode: (nodeId: string) => void +} + +function TerminalLogListRow({ + index, + style, + ...props +}: RowComponentProps) { + const { rows, selectedEntryId, onSelectEntry, expandedNodes, onToggleNode } = props + const row = rows[index] + + if (row.rowType === 'separator') { + return ( +
+
+
+ ) + } + + return ( +
+
+ +
+
+ ) +} + +const TerminalLogsPane = memo(function TerminalLogsPane({ + executionGroups, selectedEntryId, onSelectEntry, expandedNodes, onToggleNode, }: { - group: ExecutionGroup - showSeparator: boolean + executionGroups: ExecutionGroup[] selectedEntryId: string | null onSelectEntry: (entry: ConsoleEntry) => void expandedNodes: Set onToggleNode: (nodeId: string) => void }) { + const containerRef = useRef(null) + const listRef = useListRef(null) + const [listHeight, setListHeight] = useState(400) + + const rows = useMemo( + () => flattenVisibleExecutionRows(executionGroups, expandedNodes), + [executionGroups, expandedNodes] + ) + + useEffect(() => { + const container = containerRef.current + if (!container) return + + const updateHeight = () => { + if (container.clientHeight > 0) { + setListHeight(container.clientHeight) + } + } + + updateHeight() + const resizeObserver = new ResizeObserver(updateHeight) + resizeObserver.observe(container) + return () => resizeObserver.disconnect() + }, []) + + const rowsRef = useRef(rows) + rowsRef.current = rows + + useEffect(() => { + if (!selectedEntryId) return + + const currentRows = rowsRef.current + const rowIndex = currentRows.findIndex( + (row) => row.rowType === 'node' && row.node?.entry.id === selectedEntryId + ) + + if (rowIndex !== -1) { + listRef.current?.scrollToRow({ index: rowIndex, align: 'smart' }) + } + }, [selectedEntryId, listRef]) + + const rowProps = useMemo( + () => ({ + rows, + selectedEntryId, + onSelectEntry, + expandedNodes, + onToggleNode, + }), + [rows, selectedEntryId, onSelectEntry, expandedNodes, onToggleNode] + ) + return ( -
- {/* Separator between executions */} - {showSeparator &&
} - - {/* Entry tree */} -
- {group.entryTree.map((node) => ( - - ))} -
+
+
) }) @@ -560,7 +660,6 @@ const ExecutionGroupRow = memo(function ExecutionGroupRow({ */ export const Terminal = memo(function Terminal() { const terminalRef = useRef(null) - const logsContainerRef = useRef(null) const prevWorkflowEntriesLengthRef = useRef(0) const hasInitializedEntriesRef = useRef(false) const isTerminalFocusedRef = useRef(false) @@ -584,18 +683,15 @@ export const Terminal = memo(function Terminal() { ) const activeWorkflowId = useWorkflowRegistry((state) => state.activeWorkflowId) const hasConsoleHydrated = useTerminalConsoleStore((state) => state._hasHydrated) - - // Get all entries and filter in useMemo to avoid new array on every store update - const allStoreEntries = useTerminalConsoleStore((state) => state.entries) - const entries = useMemo(() => { - if (!hasConsoleHydrated) return [] - return allStoreEntries.filter((entry) => entry.workflowId === activeWorkflowId) - }, [allStoreEntries, activeWorkflowId, hasConsoleHydrated]) + const consoleWorkflowId: string | undefined = + hasConsoleHydrated && typeof activeWorkflowId === 'string' ? 
activeWorkflowId : undefined + const entries = useWorkflowConsoleEntries(consoleWorkflowId) const clearWorkflowConsole = useTerminalConsoleStore((state) => state.clearWorkflowConsole) const exportConsoleCSV = useTerminalConsoleStore((state) => state.exportConsoleCSV) - const [selectedEntry, setSelectedEntry] = useState(null) + const [selectedEntryId, setSelectedEntryId] = useState(null) + const selectedEntry = useConsoleEntry(selectedEntryId) const [expandedNodes, setExpandedNodes] = useState>(() => new Set()) const [isToggling, setIsToggling] = useState(false) const [showCopySuccess, setShowCopySuccess] = useState(false) @@ -677,6 +773,14 @@ export const Terminal = memo(function Terminal() { return result }, [executionGroups]) + const autoExpandNodeIds = useMemo(() => { + if (executionGroups.length === 0) { + return [] + } + + return collectExpandableNodeIds(executionGroups[0].entryTree) + }, [executionGroups]) + /** * Check if input data exists for selected entry */ @@ -706,11 +810,6 @@ export const Terminal = memo(function Terminal() { return selectedEntry.output }, [selectedEntry, showInput]) - const outputDataStringified = useMemo(() => { - if (outputData === null || outputData === undefined) return '' - return JSON.stringify(outputData, null, 2) - }, [outputData]) - // Keep refs in sync for keyboard handler selectedEntryRef.current = selectedEntry navigableEntriesRef.current = navigableEntries @@ -776,20 +875,20 @@ export const Terminal = memo(function Terminal() { * This always runs regardless of autoSelectEnabled - new runs should always be visible. 
*/ useEffect(() => { - if (executionGroups.length === 0) return - - const nodeIdsToExpand = collectExpandableNodeIds(executionGroups[0].entryTree) + if (autoExpandNodeIds.length === 0) return - if (nodeIdsToExpand.length > 0) { + const rafId = requestAnimationFrame(() => { setExpandedNodes((prev) => { - const hasAll = nodeIdsToExpand.every((id) => prev.has(id)) + const hasAll = autoExpandNodeIds.every((id) => prev.has(id)) if (hasAll) return prev const next = new Set(prev) - nodeIdsToExpand.forEach((id) => next.add(id)) + autoExpandNodeIds.forEach((id) => next.add(id)) return next }) - } - }, [executionGroups]) + }) + + return () => cancelAnimationFrame(rafId) + }, [autoExpandNodeIds]) /** * Focus the terminal for keyboard navigation @@ -805,10 +904,10 @@ export const Terminal = memo(function Terminal() { const handleSelectEntry = useCallback( (entry: ConsoleEntry) => { focusTerminal() - setSelectedEntry((prev) => { + setSelectedEntryId((prev) => { // Disable auto-select on any manual selection/deselection setAutoSelectEnabled(false) - return prev?.id === entry.id ? null : entry + return prev === entry.id ? null : entry.id }) }, [focusTerminal] @@ -854,15 +953,17 @@ export const Terminal = memo(function Terminal() { const handleCopy = useCallback(() => { if (!selectedEntry) return - const textToCopy = shouldShowCodeDisplay ? selectedEntry.input.code : outputDataStringified + const textToCopy = shouldShowCodeDisplay + ? 
selectedEntry.input.code + : safeConsoleStringify(outputData) navigator.clipboard.writeText(textToCopy) setShowCopySuccess(true) - }, [selectedEntry, outputDataStringified, shouldShowCodeDisplay]) + }, [selectedEntry, outputData, shouldShowCodeDisplay]) const clearCurrentWorkflowConsole = useCallback(() => { if (activeWorkflowId) { clearWorkflowConsole(activeWorkflowId) - setSelectedEntry(null) + setSelectedEntryId(null) setExpandedNodes(new Set()) } }, [activeWorkflowId, clearWorkflowConsole]) @@ -990,19 +1091,10 @@ export const Terminal = memo(function Terminal() { } }, [showCopySuccess]) - const scrollEntryIntoView = useCallback((entryId: string) => { - const container = logsContainerRef.current - if (!container) return - const el = container.querySelector(`[data-entry-id="${entryId}"]`) - if (el) { - el.scrollIntoView({ block: 'nearest', behavior: 'smooth' }) - } - }, []) - useEffect(() => { if (executionGroups.length === 0 || navigableEntries.length === 0) { setAutoSelectEnabled(true) - setSelectedEntry(null) + setSelectedEntryId(null) return } @@ -1020,9 +1112,9 @@ export const Terminal = memo(function Terminal() { } if (!lastNavEntry) return - if (selectedEntry?.id === lastNavEntry.entry.id) return + if (selectedEntryId === lastNavEntry.entry.id) return - setSelectedEntry(lastNavEntry.entry) + setSelectedEntryId(lastNavEntry.entry.id) focusTerminal() if (lastNavEntry.parentNodeIds.length > 0) { @@ -1034,36 +1126,7 @@ export const Terminal = memo(function Terminal() { return next }) } - }, [executionGroups, navigableEntries, autoSelectEnabled, selectedEntry?.id, focusTerminal]) - - useEffect(() => { - if (selectedEntry) { - scrollEntryIntoView(selectedEntry.id) - } - }, [selectedEntry?.id, scrollEntryIntoView]) - - /** - * Sync selected entry with latest data from store. - * This ensures the output panel updates when a running block completes or is canceled. 
- */ - useEffect(() => { - if (!selectedEntry) return - - const updatedEntry = filteredEntries.find((e) => e.id === selectedEntry.id) - if (updatedEntry && updatedEntry !== selectedEntry) { - // Only update if the entry data has actually changed - const hasChanged = - updatedEntry.output !== selectedEntry.output || - updatedEntry.isRunning !== selectedEntry.isRunning || - updatedEntry.isCanceled !== selectedEntry.isCanceled || - updatedEntry.durationMs !== selectedEntry.durationMs || - updatedEntry.error !== selectedEntry.error || - updatedEntry.success !== selectedEntry.success - if (hasChanged) { - setSelectedEntry(updatedEntry) - } - } - }, [filteredEntries, selectedEntry]) + }, [executionGroups, navigableEntries, autoSelectEnabled, selectedEntryId, focusTerminal]) /** * Clear filters when there are no logs @@ -1080,7 +1143,7 @@ export const Terminal = memo(function Terminal() { const navigateToEntry = useCallback( (navEntry: NavigableBlockEntry) => { setAutoSelectEnabled(false) - setSelectedEntry(navEntry.entry) + setSelectedEntryId(navEntry.entry.id) // Auto-expand parent nodes (subflows, iterations) if (navEntry.parentNodeIds.length > 0) { @@ -1095,11 +1158,8 @@ export const Terminal = memo(function Terminal() { // Keep terminal focused for continued navigation focusTerminal() - - // Scroll entry into view if needed - scrollEntryIntoView(navEntry.entry.id) }, - [focusTerminal, scrollEntryIntoView] + [focusTerminal] ) /** @@ -1123,7 +1183,7 @@ export const Terminal = memo(function Terminal() { if (e.key === 'Escape') { if (currentEntry) { e.preventDefault() - setSelectedEntry(null) + setSelectedEntryId(null) setAutoSelectEnabled(true) } return @@ -1203,7 +1263,7 @@ export const Terminal = memo(function Terminal() { // Close output panel if there's not enough space for minimum width if (maxWidth < MIN_OUTPUT_PANEL_WIDTH_PX) { setAutoSelectEnabled(false) - setSelectedEntry(null) + setSelectedEntryId(null) return } @@ -1423,23 +1483,19 @@ export const Terminal = 
memo(function Terminal() {
{/* Execution list */} -
+
{executionGroups.length === 0 ? (
No logs yet
) : ( - executionGroups.map((group, index) => ( - 0} - selectedEntryId={selectedEntry?.id || null} - onSelectEntry={handleSelectEntry} - expandedNodes={expandedNodes} - onToggleNode={handleToggleNode} - /> - )) + )}
@@ -1461,11 +1517,10 @@ export const Terminal = memo(function Terminal() { handleTrainingClick={handleTrainingClick} showCopySuccess={showCopySuccess} handleCopy={handleCopy} - filteredEntries={filteredEntries} + hasEntries={filteredEntries.length > 0} handleExportConsole={handleExportConsole} handleClearConsole={handleClearConsole} shouldShowCodeDisplay={shouldShowCodeDisplay} - outputDataStringified={outputDataStringified} outputData={outputData} handleClearConsoleFromMenu={handleClearConsoleFromMenu} /> diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/utils.test.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/utils.test.ts index c27f792aa3b..e7677a608a7 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/utils.test.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/utils.test.ts @@ -18,7 +18,12 @@ vi.mock('@/stores/constants', () => ({ })) import type { ConsoleEntry } from '@/stores/terminal' -import { buildEntryTree, type EntryNode, groupEntriesByExecution } from './utils' +import { + buildEntryTree, + type EntryNode, + flattenVisibleExecutionRows, + groupEntriesByExecution, +} from './utils' let entryCounter = 0 @@ -476,3 +481,72 @@ describe('groupEntriesByExecution', () => { expect(topLevelChild).toBeUndefined() }) }) + +describe('flattenVisibleExecutionRows', () => { + it('only includes children for expanded nodes', () => { + const childBlock = makeEntry({ + id: 'child', + blockId: 'child', + blockName: 'Child', + blockType: 'function', + executionId: 'exec-1', + executionOrder: 2, + }) + + const tree: EntryNode[] = [ + { + entry: makeEntry({ + id: 'workflow-parent', + blockId: 'workflow-parent', + blockName: 'Workflow Parent', + blockType: 'workflow', + executionId: 'exec-1', + executionOrder: 1, + }), + children: [{ entry: childBlock, children: [], nodeType: 'block' }], + nodeType: 'workflow', + }, + ] + + const rowsCollapsed = 
flattenVisibleExecutionRows( + [ + { + executionId: 'exec-1', + startTime: '2025-01-01T00:00:00Z', + endTime: '2025-01-01T00:00:01Z', + startTimeMs: 0, + endTimeMs: 1, + duration: 1, + status: 'success', + entries: [], + entryTree: tree, + }, + ], + new Set() + ) + + expect(rowsCollapsed).toHaveLength(1) + expect(rowsCollapsed[0].node?.entry.id).toBe('workflow-parent') + + const rowsExpanded = flattenVisibleExecutionRows( + [ + { + executionId: 'exec-1', + startTime: '2025-01-01T00:00:00Z', + endTime: '2025-01-01T00:00:01Z', + startTimeMs: 0, + endTimeMs: 1, + duration: 1, + status: 'success', + entries: [], + entryTree: tree, + }, + ], + new Set(['workflow-parent']) + ) + + expect(rowsExpanded).toHaveLength(2) + expect(rowsExpanded[1].node?.entry.id).toBe('child') + expect(rowsExpanded[1].depth).toBe(1) + }) +}) diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/utils.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/utils.ts index 5f350f636d3..545eb662ba6 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/utils.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/utils.ts @@ -662,6 +662,14 @@ export interface NavigableBlockEntry { parentNodeIds: string[] } +export interface VisibleTerminalRow { + id: string + rowType: 'separator' | 'node' + executionId: string + depth: number + node?: EntryNode +} + /** * Flattens entry tree to only include actual block entries (not subflows/iterations). * Also tracks parent node IDs for auto-expanding when navigating. 
@@ -688,6 +696,50 @@ export function flattenBlockEntriesOnly( return result } +export function flattenVisibleExecutionRows( + executionGroups: ExecutionGroup[], + expandedNodes: Set +): VisibleTerminalRow[] { + const rows: VisibleTerminalRow[] = [] + + const appendNodeRows = (nodes: EntryNode[], executionId: string, depth: number) => { + for (const node of nodes) { + rows.push({ + id: `${executionId}:${node.entry.id}`, + rowType: 'node', + executionId, + depth, + node, + }) + + if ( + node.children.length > 0 && + (node.nodeType === 'subflow' || + node.nodeType === 'iteration' || + node.nodeType === 'workflow') && + expandedNodes.has(node.entry.id) + ) { + appendNodeRows(node.children, executionId, depth + 1) + } + } + } + + executionGroups.forEach((group, index) => { + if (index > 0) { + rows.push({ + id: `separator:${group.executionId}`, + rowType: 'separator', + executionId: group.executionId, + depth: 0, + }) + } + + appendNodeRows(group.entryTree, group.executionId, 0) + }) + + return rows +} + /** * Terminal height configuration constants */ @@ -695,4 +747,5 @@ export const TERMINAL_CONFIG = { NEAR_MIN_THRESHOLD: 40, BLOCK_COLUMN_WIDTH_PX: TERMINAL_BLOCK_COLUMN_WIDTH, HEADER_TEXT_CLASS: 'font-base text-[var(--text-icon)] text-small', + LOG_ROW_HEIGHT_PX: 32, } as const diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts index 9233134b826..75c119565f2 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts @@ -36,7 +36,13 @@ import { useCurrentWorkflowExecution, useExecutionStore } from '@/stores/executi import { useNotificationStore } from '@/stores/notifications' import { useVariablesStore } from '@/stores/panel' import { useEnvironmentStore } from '@/stores/settings/environment' -import { 
useTerminalConsoleStore } from '@/stores/terminal' +import { + clearExecutionPointer, + consolePersistence, + loadExecutionPointer, + saveExecutionPointer, + useTerminalConsoleStore, +} from '@/stores/terminal' import { useWorkflowDiffStore } from '@/stores/workflow-diff' import { useWorkflowRegistry } from '@/stores/workflows/registry/store' import { mergeSubblockState } from '@/stores/workflows/utils' @@ -123,7 +129,20 @@ export function useWorkflowExecution() { useCurrentWorkflowExecution() const setCurrentExecutionId = useExecutionStore((s) => s.setCurrentExecutionId) const getCurrentExecutionId = useExecutionStore((s) => s.getCurrentExecutionId) - const setIsExecuting = useExecutionStore((s) => s.setIsExecuting) + const rawSetIsExecuting = useExecutionStore((s) => s.setIsExecuting) + + const setIsExecuting = useCallback( + (workflowId: string, executing: boolean) => { + if (executing) { + consolePersistence.executionStarted() + } else { + consolePersistence.executionEnded() + clearExecutionPointer(workflowId) + } + rawSetIsExecuting(workflowId, executing) + }, + [rawSetIsExecuting] + ) const setIsDebugging = useExecutionStore((s) => s.setIsDebugging) const setPendingBlocks = useExecutionStore((s) => s.setPendingBlocks) const setExecutor = useExecutionStore((s) => s.setExecutor) @@ -137,6 +156,7 @@ export function useWorkflowExecution() { const [executionResult, setExecutionResult] = useState(null) const executionStream = useExecutionStream() const currentChatExecutionIdRef = useRef(null) + const lastSeenEventIdRef = useRef(0) const isViewingDiff = useWorkflowDiffStore((state) => state.isShowingDiff) const addNotification = useNotificationStore((state) => state.addNotification) @@ -1010,8 +1030,17 @@ export function useWorkflowExecution() { onExecutionId: (id) => { executionIdRef.current = id setCurrentExecutionId(activeWorkflowId, id) + saveExecutionPointer({ + workflowId: activeWorkflowId, + executionId: id, + lastEventId: 0, + }) }, callbacks: { + onEventId: 
(eventId) => { + lastSeenEventIdRef.current = eventId + }, + onExecutionStarted: (data) => { logger.info('Server execution started:', data) }, @@ -1780,192 +1809,198 @@ export function useWorkflowExecution() { useEffect(() => { if (!activeWorkflowId || !hasHydrated) return - - const entries = useTerminalConsoleStore.getState().entries - const runningEntries = entries.filter( - (e) => e.isRunning && e.workflowId === activeWorkflowId && e.executionId - ) - if (runningEntries.length === 0) return - if (activeReconnections.has(activeWorkflowId)) return - activeReconnections.add(activeWorkflowId) - executionStream.cancel(activeWorkflowId) + let cleanupRan = false + let reconnectionComplete = false + const reconnectWorkflowId = activeWorkflowId - const sorted = [...runningEntries].sort((a, b) => { - const aTime = a.startedAt ? new Date(a.startedAt).getTime() : 0 - const bTime = b.startedAt ? new Date(b.startedAt).getTime() : 0 - return bTime - aTime - }) - const executionId = sorted[0].executionId! + const runReconnect = async () => { + let executionId: string | undefined + let fromEventId = 0 - const otherExecutionIds = new Set( - sorted.filter((e) => e.executionId !== executionId).map((e) => e.executionId!) 
- ) - if (otherExecutionIds.size > 0) { - cancelRunningEntries(activeWorkflowId) - } + try { + const pointer = await loadExecutionPointer(reconnectWorkflowId) + if (cleanupRan) return + if (pointer && pointer.executionId) { + executionId = pointer.executionId + fromEventId = pointer.lastEventId + } + } catch { + // fall through to console entries + } - setCurrentExecutionId(activeWorkflowId, executionId) - setIsExecuting(activeWorkflowId, true) - - const workflowEdges = useWorkflowStore.getState().edges - const activeBlocksSet = new Set() - const activeBlockRefCounts = new Map() - const accumulatedBlockLogs: BlockLog[] = [] - const accumulatedBlockStates = new Map() - const executedBlockIds = new Set() - - const executionIdRef = { current: executionId } - - const handlers = buildBlockEventHandlers({ - workflowId: activeWorkflowId, - executionIdRef, - workflowEdges, - activeBlocksSet, - activeBlockRefCounts, - accumulatedBlockLogs, - accumulatedBlockStates, - executedBlockIds, - consoleMode: 'update', - includeStartConsoleEntry: true, - }) + if (!executionId) { + const entries = useTerminalConsoleStore.getState().getWorkflowEntries(reconnectWorkflowId) + const runningEntries = entries.filter( + (e) => e.isRunning && e.workflowId === reconnectWorkflowId && e.executionId + ) + if (runningEntries.length === 0) return - const originalEntries = entries - .filter((e) => e.executionId === executionId) - .map((e) => ({ ...e })) + const sorted = [...runningEntries].sort((a, b) => { + const aTime = a.startedAt ? new Date(a.startedAt).getTime() : 0 + const bTime = b.startedAt ? new Date(b.startedAt).getTime() : 0 + return bTime - aTime + }) + executionId = sorted[0].executionId! - let cleared = false - let reconnectionComplete = false - let cleanupRan = false - const clearOnce = () => { - if (!cleared) { - cleared = true - clearExecutionEntries(executionId) + const otherExecutionIds = new Set( + sorted.filter((e) => e.executionId !== executionId).map((e) => e.executionId!) 
+ ) + if (otherExecutionIds.size > 0) { + cancelRunningEntries(reconnectWorkflowId) + consolePersistence.persist() + } } - } - const reconnectWorkflowId = activeWorkflowId + if (!executionId || cleanupRan) return + if (activeReconnections.has(reconnectWorkflowId)) return + activeReconnections.add(reconnectWorkflowId) - executionStream - .reconnect({ + executionStream.cancel(reconnectWorkflowId) + setCurrentExecutionId(reconnectWorkflowId, executionId) + setIsExecuting(reconnectWorkflowId, true) + + const workflowEdges = useWorkflowStore.getState().edges + const activeBlocksSet = new Set() + const activeBlockRefCounts = new Map() + const accumulatedBlockLogs: BlockLog[] = [] + const accumulatedBlockStates = new Map() + const executedBlockIds = new Set() + const executionIdRef = { current: executionId } + + const handlers = buildBlockEventHandlers({ workflowId: reconnectWorkflowId, - executionId, - callbacks: { - onBlockStarted: (data) => { - clearOnce() - handlers.onBlockStarted(data) - }, - onBlockCompleted: (data) => { - clearOnce() - handlers.onBlockCompleted(data) - }, - onBlockError: (data) => { - clearOnce() - handlers.onBlockError(data) - }, - onBlockChildWorkflowStarted: (data) => { - clearOnce() - handlers.onBlockChildWorkflowStarted(data) - }, - onExecutionCompleted: () => { - const currentId = useExecutionStore - .getState() - .getCurrentExecutionId(reconnectWorkflowId) - if (currentId !== executionId) { - reconnectionComplete = true - activeReconnections.delete(reconnectWorkflowId) - return - } - clearOnce() - reconnectionComplete = true - activeReconnections.delete(reconnectWorkflowId) - setCurrentExecutionId(reconnectWorkflowId, null) - setIsExecuting(reconnectWorkflowId, false) - setActiveBlocks(reconnectWorkflowId, new Set()) - }, - onExecutionError: (data) => { - const currentId = useExecutionStore - .getState() - .getCurrentExecutionId(reconnectWorkflowId) - if (currentId !== executionId) { - reconnectionComplete = true - 
activeReconnections.delete(reconnectWorkflowId) - return - } - clearOnce() - reconnectionComplete = true - activeReconnections.delete(reconnectWorkflowId) - setCurrentExecutionId(reconnectWorkflowId, null) - setIsExecuting(reconnectWorkflowId, false) - setActiveBlocks(reconnectWorkflowId, new Set()) - handleExecutionErrorConsole({ - workflowId: reconnectWorkflowId, - executionId, - error: data.error, - blockLogs: accumulatedBlockLogs, - }) - }, - onExecutionCancelled: () => { - const currentId = useExecutionStore - .getState() - .getCurrentExecutionId(reconnectWorkflowId) - if (currentId !== executionId) { - reconnectionComplete = true - activeReconnections.delete(reconnectWorkflowId) - return - } - clearOnce() - reconnectionComplete = true - activeReconnections.delete(reconnectWorkflowId) + executionIdRef, + workflowEdges, + activeBlocksSet, + activeBlockRefCounts, + accumulatedBlockLogs, + accumulatedBlockStates, + executedBlockIds, + consoleMode: 'update', + includeStartConsoleEntry: true, + }) + + clearExecutionEntries(executionId) + + const capturedExecutionId = executionId + const MAX_ATTEMPTS = 5 + const BASE_DELAY_MS = 1000 + const MAX_DELAY_MS = 15000 + + const attemptReconnect = async (attempt: number): Promise => { + if (cleanupRan || reconnectionComplete) return + + if (attempt > 0) { + const delay = Math.min(BASE_DELAY_MS * 2 ** (attempt - 1), MAX_DELAY_MS) + await new Promise((resolve) => setTimeout(resolve, delay)) + if (cleanupRan || reconnectionComplete) return + } + + try { + await executionStream.reconnect({ + workflowId: reconnectWorkflowId, + executionId: capturedExecutionId, + fromEventId, + callbacks: { + onEventId: (eid) => { + fromEventId = eid + }, + onBlockStarted: handlers.onBlockStarted, + onBlockCompleted: handlers.onBlockCompleted, + onBlockError: handlers.onBlockError, + onBlockChildWorkflowStarted: handlers.onBlockChildWorkflowStarted, + onExecutionCompleted: () => { + const currentId = useExecutionStore + .getState() + 
.getCurrentExecutionId(reconnectWorkflowId) + if (currentId !== capturedExecutionId) { + reconnectionComplete = true + activeReconnections.delete(reconnectWorkflowId) + return + } + reconnectionComplete = true + activeReconnections.delete(reconnectWorkflowId) + setCurrentExecutionId(reconnectWorkflowId, null) + setIsExecuting(reconnectWorkflowId, false) + setActiveBlocks(reconnectWorkflowId, new Set()) + }, + onExecutionError: (data) => { + const currentId = useExecutionStore + .getState() + .getCurrentExecutionId(reconnectWorkflowId) + if (currentId !== capturedExecutionId) { + reconnectionComplete = true + activeReconnections.delete(reconnectWorkflowId) + return + } + reconnectionComplete = true + activeReconnections.delete(reconnectWorkflowId) + setCurrentExecutionId(reconnectWorkflowId, null) + setIsExecuting(reconnectWorkflowId, false) + setActiveBlocks(reconnectWorkflowId, new Set()) + handleExecutionErrorConsole({ + workflowId: reconnectWorkflowId, + executionId: capturedExecutionId, + error: data.error, + blockLogs: accumulatedBlockLogs, + }) + }, + onExecutionCancelled: () => { + const currentId = useExecutionStore + .getState() + .getCurrentExecutionId(reconnectWorkflowId) + if (currentId !== capturedExecutionId) { + reconnectionComplete = true + activeReconnections.delete(reconnectWorkflowId) + return + } + reconnectionComplete = true + activeReconnections.delete(reconnectWorkflowId) + setCurrentExecutionId(reconnectWorkflowId, null) + setIsExecuting(reconnectWorkflowId, false) + setActiveBlocks(reconnectWorkflowId, new Set()) + handleExecutionCancelledConsole({ + workflowId: reconnectWorkflowId, + executionId: capturedExecutionId, + }) + }, + }, + }) + } catch (error) { + logger.warn('Execution reconnection attempt failed', { + executionId: capturedExecutionId, + attempt, + error, + }) + if (!cleanupRan && !reconnectionComplete && attempt < MAX_ATTEMPTS) { + return attemptReconnect(attempt + 1) + } + } + + if (!reconnectionComplete && !cleanupRan) { + 
reconnectionComplete = true + activeReconnections.delete(reconnectWorkflowId) + const currentId = useExecutionStore.getState().getCurrentExecutionId(reconnectWorkflowId) + if (currentId === capturedExecutionId) { + cancelRunningEntries(reconnectWorkflowId) setCurrentExecutionId(reconnectWorkflowId, null) setIsExecuting(reconnectWorkflowId, false) setActiveBlocks(reconnectWorkflowId, new Set()) - handleExecutionCancelledConsole({ - workflowId: reconnectWorkflowId, - executionId, - }) - }, - }, - }) - .catch((error) => { - logger.warn('Execution reconnection failed', { executionId, error }) - }) - .finally(() => { - if (reconnectionComplete || cleanupRan) return - const currentId = useExecutionStore.getState().getCurrentExecutionId(reconnectWorkflowId) - if (currentId !== executionId) return - reconnectionComplete = true - activeReconnections.delete(reconnectWorkflowId) - clearExecutionEntries(executionId) - for (const entry of originalEntries) { - addConsole({ - workflowId: entry.workflowId, - blockId: entry.blockId, - blockName: entry.blockName, - blockType: entry.blockType, - executionId: entry.executionId, - executionOrder: entry.executionOrder, - isRunning: false, - warning: 'Execution result unavailable — check the logs page', - }) + } } - setCurrentExecutionId(reconnectWorkflowId, null) - setIsExecuting(reconnectWorkflowId, false) - setActiveBlocks(reconnectWorkflowId, new Set()) - }) + } + + await attemptReconnect(0) + } + + runReconnect() return () => { cleanupRan = true executionStream.cancel(reconnectWorkflowId) activeReconnections.delete(reconnectWorkflowId) - - if (cleared && !reconnectionComplete) { - clearExecutionEntries(executionId) - for (const entry of originalEntries) { - addConsole(entry) - } - } } // eslint-disable-next-line react-hooks/exhaustive-deps }, [activeWorkflowId, hasHydrated]) diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts 
b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts index c63345a1c88..3f27004a04e 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts @@ -19,7 +19,7 @@ const logger = createLogger('workflow-execution-utils') import { useExecutionStore } from '@/stores/execution' import type { ConsoleEntry, ConsoleUpdate } from '@/stores/terminal' -import { useTerminalConsoleStore } from '@/stores/terminal' +import { saveExecutionPointer, useTerminalConsoleStore } from '@/stores/terminal' import { useWorkflowRegistry } from '@/stores/workflows/registry/store' import { useWorkflowStore } from '@/stores/workflows/workflow/store' @@ -118,7 +118,7 @@ export interface BlockEventHandlerConfig { } export interface BlockEventHandlerDeps { - addConsole: (entry: Omit) => ConsoleEntry + addConsole: (entry: Omit) => ConsoleEntry | undefined updateConsole: (blockId: string, update: string | ConsoleUpdate, executionId?: string) => void setActiveBlocks: (workflowId: string, blocks: Set) => void setBlockRunStatus: (workflowId: string, blockId: string, status: 'success' | 'error') => void @@ -407,7 +407,7 @@ export function createBlockEventHandlers( return { onBlockStarted, onBlockCompleted, onBlockError, onBlockChildWorkflowStarted } } -type AddConsoleFn = (entry: Omit) => ConsoleEntry +type AddConsoleFn = (entry: Omit) => ConsoleEntry | undefined type CancelRunningEntriesFn = (workflowId: string) => void export interface ExecutionTimingFields { @@ -667,6 +667,7 @@ export async function executeWorkflowWithFullLogging( if (serverExecutionId) { executionIdRef.current = serverExecutionId setCurrentExecutionId(wfId, serverExecutionId) + saveExecutionPointer({ workflowId: wfId, executionId: serverExecutionId, lastEventId: 0 }) } let executionResult: ExecutionResult = { @@ -679,6 +680,16 @@ export async function 
executeWorkflowWithFullLogging( await processSSEStream( response.body.getReader(), { + onEventId: (eventId) => { + if (wfId && executionIdRef.current && eventId % 5 === 0) { + saveExecutionPointer({ + workflowId: wfId, + executionId: executionIdRef.current, + lastEventId: eventId, + }) + } + }, + onExecutionStarted: (data) => { logger.info('Execution started', { startTime: data.startTime }) }, diff --git a/apps/sim/hooks/queries/tasks.ts b/apps/sim/hooks/queries/tasks.ts index e667393f588..ea2a6cef4e3 100644 --- a/apps/sim/hooks/queries/tasks.ts +++ b/apps/sim/hooks/queries/tasks.ts @@ -133,13 +133,29 @@ export async function fetchChatHistory( chatId: string, signal?: AbortSignal ): Promise { - const response = await fetch(`/api/mothership/chats/${chatId}`, { signal }) + const mothershipRes = await fetch(`/api/mothership/chats/${chatId}`, { signal }) + + if (mothershipRes.ok) { + const { chat } = await mothershipRes.json() + return { + id: chat.id, + title: chat.title, + messages: Array.isArray(chat.messages) ? chat.messages : [], + activeStreamId: chat.conversationId || null, + resources: Array.isArray(chat.resources) ? 
chat.resources : [], + streamSnapshot: chat.streamSnapshot || null, + } + } - if (!response.ok) { + const copilotRes = await fetch(`/api/copilot/chat?chatId=${encodeURIComponent(chatId)}`, { + signal, + }) + + if (!copilotRes.ok) { throw new Error('Failed to load chat') } - const { chat } = await response.json() + const { chat } = await copilotRes.json() return { id: chat.id, title: chat.title, diff --git a/apps/sim/hooks/use-execution-stream.ts b/apps/sim/hooks/use-execution-stream.ts index 36fd801db63..1198a179bea 100644 --- a/apps/sim/hooks/use-execution-stream.ts +++ b/apps/sim/hooks/use-execution-stream.ts @@ -63,6 +63,10 @@ export async function processSSEStream( try { const event = JSON.parse(data) as ExecutionEvent + if (event.eventId != null) { + callbacks.onEventId?.(event.eventId) + } + switch (event.type) { case 'execution:started': callbacks.onExecutionStarted?.(event.data) @@ -118,6 +122,7 @@ export interface ExecutionStreamCallbacks { onBlockChildWorkflowStarted?: (data: BlockChildWorkflowStartedData) => void onStreamChunk?: (data: StreamChunkData) => void onStreamDone?: (data: StreamDoneData) => void + onEventId?: (eventId: number) => void } export interface ExecuteStreamOptions { diff --git a/apps/sim/lib/copilot/chat-streaming.ts b/apps/sim/lib/copilot/chat-streaming.ts index 46d9ff758fc..76ae305a9f8 100644 --- a/apps/sim/lib/copilot/chat-streaming.ts +++ b/apps/sim/lib/copilot/chat-streaming.ts @@ -290,6 +290,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS let clientDisconnected = false const abortController = new AbortController() const userStopController = new AbortController() + const clientDisconnectedController = new AbortController() activeStreams.set(streamId, { abortController, userStopController }) if (chatId && !pendingChatStreamAlreadyRegistered) { @@ -302,6 +303,9 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS const markClientDisconnected = (reason: string) => 
{ if (clientDisconnected) return clientDisconnected = true + if (!clientDisconnectedController.signal.aborted) { + clientDisconnectedController.abort() + } logger.info( appendCopilotLogContext('Client disconnected from live SSE stream', { requestId, @@ -456,6 +460,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS runId, abortSignal: abortController.signal, userStopSignal: userStopController.signal, + clientDisconnectedSignal: clientDisconnectedController.signal, onEvent: async (event) => { await pushEvent(event) }, @@ -616,7 +621,12 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS runId, } ) - clientDisconnected = true + if (!clientDisconnected) { + clientDisconnected = true + if (!clientDisconnectedController.signal.aborted) { + clientDisconnectedController.abort() + } + } if (eventWriter) { eventWriter.flush().catch(() => {}) } diff --git a/apps/sim/lib/copilot/client-sse/handlers.ts b/apps/sim/lib/copilot/client-sse/handlers.ts index 2bb6bc3e318..e812311d5f3 100644 --- a/apps/sim/lib/copilot/client-sse/handlers.ts +++ b/apps/sim/lib/copilot/client-sse/handlers.ts @@ -21,7 +21,6 @@ import { useWorkflowDiffStore } from '@/stores/workflow-diff/store' import { captureBaselineSnapshot } from '@/stores/workflow-diff/utils' import { useWorkflowRegistry } from '@/stores/workflows/registry/store' import type { WorkflowState } from '@/stores/workflows/workflow/types' -import { executeRunToolOnClient } from './run-tool-execution' import type { ClientContentBlock, ClientStreamingContext } from './types' const logger = createLogger('CopilotClientSseHandlers') @@ -988,10 +987,6 @@ export const sseHandlers: Record = { return } - if (clientExecutable && initialState === ClientToolCallState.executing) { - executeRunToolOnClient(id, toolName, args || existing?.params || {}) - } - if (toolName === 'oauth_request_access' && args && typeof window !== 'undefined') { try { window.dispatchEvent( diff --git 
a/apps/sim/lib/copilot/client-sse/run-tool-execution.test.ts b/apps/sim/lib/copilot/client-sse/run-tool-execution.test.ts index 00c306ac235..395dd25741e 100644 --- a/apps/sim/lib/copilot/client-sse/run-tool-execution.test.ts +++ b/apps/sim/lib/copilot/client-sse/run-tool-execution.test.ts @@ -35,6 +35,16 @@ vi.mock('@/stores/workflows/registry/store', () => ({ }, })) +vi.mock('@/stores/terminal', () => ({ + consolePersistence: { + executionStarted: vi.fn(), + executionEnded: vi.fn(), + persist: vi.fn(), + }, + saveExecutionPointer: vi.fn(), + clearExecutionPointer: vi.fn(), +})) + import { cancelRunToolExecution, executeRunToolOnClient, diff --git a/apps/sim/lib/copilot/client-sse/run-tool-execution.ts b/apps/sim/lib/copilot/client-sse/run-tool-execution.ts index 2d28d149beb..c289202d505 100644 --- a/apps/sim/lib/copilot/client-sse/run-tool-execution.ts +++ b/apps/sim/lib/copilot/client-sse/run-tool-execution.ts @@ -4,6 +4,7 @@ import { COPILOT_CONFIRM_API_PATH } from '@/lib/copilot/constants' import { ClientToolCallState } from '@/lib/copilot/tools/client/tool-display-registry' import { executeWorkflowWithFullLogging } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils' import { useExecutionStore } from '@/stores/execution/store' +import { clearExecutionPointer, consolePersistence, saveExecutionPointer } from '@/stores/terminal' import { useWorkflowRegistry } from '@/stores/workflows/registry/store' const logger = createLogger('CopilotRunToolExecution') @@ -148,11 +149,33 @@ async function doExecuteRunTool( const abortController = new AbortController() activeRunAbortByWorkflowId.set(targetWorkflowId, abortController) + consolePersistence.executionStarted() setIsExecuting(targetWorkflowId, true) const executionId = uuidv4() setCurrentExecutionId(targetWorkflowId, executionId) + saveExecutionPointer({ workflowId: targetWorkflowId, executionId, lastEventId: 0 }) const executionStartTime = new Date().toISOString() + const onPageHide = () 
=> { + if (manuallyStoppedToolCallIds.has(toolCallId)) return + navigator.sendBeacon( + COPILOT_CONFIRM_API_PATH, + new Blob( + [ + JSON.stringify({ + toolCallId, + status: 'background', + message: 'Client disconnected, execution continuing server-side', + }), + ], + { type: 'application/json' } + ) + ) + } + if (typeof window !== 'undefined') { + window.addEventListener('pagehide', onPageHide) + } + logger.info('[RunTool] Starting client-side workflow execution', { toolCallId, toolName, @@ -230,6 +253,9 @@ async function doExecuteRunTool( await reportCompletion(toolCallId, 'error', msg) } } finally { + if (typeof window !== 'undefined') { + window.removeEventListener('pagehide', onPageHide) + } manuallyStoppedToolCallIds.delete(toolCallId) const activeToolCallId = activeRunToolByWorkflowId.get(targetWorkflowId) if (activeToolCallId === toolCallId) { @@ -241,6 +267,8 @@ async function doExecuteRunTool( } const { setCurrentExecutionId: clearExecId } = useExecutionStore.getState() clearExecId(targetWorkflowId, null) + clearExecutionPointer(targetWorkflowId) + consolePersistence.executionEnded() setIsExecuting(targetWorkflowId, false) } } diff --git a/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.ts b/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.ts index 1b242eef77a..3732fed983e 100644 --- a/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.ts +++ b/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.ts @@ -25,6 +25,25 @@ import { executeToolAndReport, waitForToolCompletion } from './tool-execution' const logger = createLogger('CopilotSseHandlers') +/** + * Builds an AbortSignal that fires when either the main abort signal OR + * the client-disconnect signal fires. Used for client-executable tool waits + * so the orchestrator doesn't block for the full timeout when the browser dies. 
+ */ +function buildClientToolAbortSignal(options: OrchestratorOptions): AbortSignal | undefined { + const { abortSignal, clientDisconnectedSignal } = options + if (!clientDisconnectedSignal || clientDisconnectedSignal.aborted) { + return clientDisconnectedSignal?.aborted ? AbortSignal.abort() : abortSignal + } + if (!abortSignal) return clientDisconnectedSignal + + const combined = new AbortController() + const fire = () => combined.abort() + abortSignal.addEventListener('abort', fire, { once: true }) + clientDisconnectedSignal.addEventListener('abort', fire, { once: true }) + return combined.signal +} + function registerPendingToolPromise( context: StreamingContext, toolCallId: string, @@ -538,10 +557,11 @@ export const sseHandlers: Record = { } ) }) + const clientWaitSignal = buildClientToolAbortSignal(options) const completion = await waitForToolCompletion( toolCallId, options.timeout || STREAM_TIMEOUT_MS, - options.abortSignal + clientWaitSignal ) handleClientCompletion(toolCall, toolCallId, completion, context) await emitSyntheticToolResult(toolCallId, toolCall.name, completion, options, context) @@ -794,10 +814,11 @@ export const subAgentHandlers: Record = { } ) }) + const clientWaitSignal = buildClientToolAbortSignal(options) const completion = await waitForToolCompletion( toolCallId, options.timeout || STREAM_TIMEOUT_MS, - options.abortSignal + clientWaitSignal ) handleClientCompletion(toolCall, toolCallId, completion, context) await emitSyntheticToolResult(toolCallId, toolCall.name, completion, options, context) diff --git a/apps/sim/lib/copilot/orchestrator/types.ts b/apps/sim/lib/copilot/orchestrator/types.ts index 55cc2ba6331..b5a9e412f8e 100644 --- a/apps/sim/lib/copilot/orchestrator/types.ts +++ b/apps/sim/lib/copilot/orchestrator/types.ts @@ -166,6 +166,12 @@ export interface OrchestratorOptions { abortSignal?: AbortSignal /** Fires only on explicit user stop, never on passive transport disconnect. 
*/ userStopSignal?: AbortSignal + /** + * Fires when the SSE client disconnects (tab close, navigation, etc.). + * Used to short-circuit `waitForToolCompletion` for client-executable tools + * so the orchestrator doesn't block for the full 60-min timeout. + */ + clientDisconnectedSignal?: AbortSignal interactive?: boolean } diff --git a/apps/sim/lib/workflows/diff/diff-engine.test.ts b/apps/sim/lib/workflows/diff/diff-engine.test.ts index 125469bf6cc..04cf3d18197 100644 --- a/apps/sim/lib/workflows/diff/diff-engine.test.ts +++ b/apps/sim/lib/workflows/diff/diff-engine.test.ts @@ -212,6 +212,30 @@ describe('WorkflowDiffEngine', () => { expect(result.diff?.diffAnalysis?.edited_blocks).not.toContain('block-1') }) }) + + describe('parent scope changes', () => { + it.concurrent('should detect when a block moves into a subflow', async () => { + const freshEngine = new WorkflowDiffEngine() + const baseline = createMockWorkflowState({ + 'block-1': createMockBlock({ id: 'block-1' }), + }) + + const proposed = createMockWorkflowState({ + 'block-1': createMockBlock({ + id: 'block-1', + data: { parentId: 'loop-1', extent: 'parent' }, + }), + }) + + const result = await freshEngine.createDiffFromWorkflowState(proposed, undefined, baseline) + + expect(result.success).toBe(true) + expect(result.diff?.diffAnalysis?.edited_blocks).toContain('block-1') + expect(result.diff?.diffAnalysis?.field_diffs?.['block-1']?.changed_fields).toContain( + 'parentId' + ) + }) + }) }) describe('diff lifecycle', () => { diff --git a/apps/sim/lib/workflows/diff/diff-engine.ts b/apps/sim/lib/workflows/diff/diff-engine.ts index 398858cdb54..9a600e90d88 100644 --- a/apps/sim/lib/workflows/diff/diff-engine.ts +++ b/apps/sim/lib/workflows/diff/diff-engine.ts @@ -18,6 +18,7 @@ function hasBlockChanged(currentBlock: BlockState, proposedBlock: BlockState): b if (currentBlock.enabled !== proposedBlock.enabled) return true if (currentBlock.triggerMode !== proposedBlock.triggerMode) return true if 
(!!currentBlock.locked !== !!proposedBlock.locked) return true + if ((currentBlock.data?.parentId ?? null) !== (proposedBlock.data?.parentId ?? null)) return true // Compare subBlocks const currentSubKeys = Object.keys(currentBlock.subBlocks || {}) @@ -48,17 +49,34 @@ function computeFieldDiff( const unchangedFields: string[] = [] // Check basic fields - const fieldsToCheck = ['type', 'name', 'enabled', 'triggerMode', 'horizontalHandles'] as const + const fieldsToCheck = [ + 'type', + 'name', + 'enabled', + 'triggerMode', + 'horizontalHandles', + 'locked', + ] as const for (const field of fieldsToCheck) { const currentValue = currentBlock[field] const proposedValue = proposedBlock[field] - if (JSON.stringify(currentValue) !== JSON.stringify(proposedValue)) { + if ( + field === 'locked' + ? !!currentValue !== !!proposedValue + : JSON.stringify(currentValue) !== JSON.stringify(proposedValue) + ) { changedFields.push(field) } else if (currentValue !== undefined) { unchangedFields.push(field) } } + if ((currentBlock.data?.parentId ?? null) !== (proposedBlock.data?.parentId ?? 
null)) { + changedFields.push('parentId') + } else { + unchangedFields.push('parentId') + } + // Check subBlocks - use just the key name for UI compatibility const currentSubKeys = Object.keys(currentBlock.subBlocks || {}) const proposedSubKeys = Object.keys(proposedBlock.subBlocks || {}) diff --git a/apps/sim/lib/workflows/executor/execution-events.ts b/apps/sim/lib/workflows/executor/execution-events.ts index 2a2c06d4016..e87f4d5971f 100644 --- a/apps/sim/lib/workflows/executor/execution-events.ts +++ b/apps/sim/lib/workflows/executor/execution-events.ts @@ -24,6 +24,7 @@ export interface BaseExecutionEvent { type: ExecutionEventType timestamp: string executionId: string + eventId?: number } /** diff --git a/apps/sim/stores/index.ts b/apps/sim/stores/index.ts index dbf926c02ed..4ae3c335f07 100644 --- a/apps/sim/stores/index.ts +++ b/apps/sim/stores/index.ts @@ -5,7 +5,7 @@ import { createLogger } from '@sim/logger' import { useExecutionStore } from '@/stores/execution' import { useVariablesStore } from '@/stores/panel' import { useEnvironmentStore } from '@/stores/settings/environment' -import { useTerminalConsoleStore } from '@/stores/terminal' +import { consolePersistence, useTerminalConsoleStore } from '@/stores/terminal' import { useWorkflowRegistry } from '@/stores/workflows/registry/store' import { useSubBlockStore } from '@/stores/workflows/subblock/store' import { useWorkflowStore } from '@/stores/workflows/workflow/store' @@ -217,7 +217,13 @@ export const resetAllStores = () => { useSubBlockStore.getState().clear() useEnvironmentStore.getState().reset() useExecutionStore.getState().reset() - useTerminalConsoleStore.setState({ entries: [], isOpen: false }) + useTerminalConsoleStore.setState({ + workflowEntries: {}, + entryIdsByBlockExecution: {}, + entryLocationById: {}, + isOpen: false, + }) + consolePersistence.persist() // Custom tools are managed by React Query cache, not a Zustand store // Variables store has no tracking to reset; registry hydrates } 
diff --git a/apps/sim/stores/terminal/console/index.ts b/apps/sim/stores/terminal/console/index.ts index d2b6679543c..af22e59bb0c 100644 --- a/apps/sim/stores/terminal/console/index.ts +++ b/apps/sim/stores/terminal/console/index.ts @@ -1,3 +1,18 @@ -export { indexedDBStorage } from './storage' -export { useTerminalConsoleStore } from './store' +export { + clearExecutionPointer, + consolePersistence, + type ExecutionPointer, + loadExecutionPointer, + saveExecutionPointer, +} from './storage' +export { useConsoleEntry, useTerminalConsoleStore, useWorkflowConsoleEntries } from './store' export type { ConsoleEntry, ConsoleStore, ConsoleUpdate } from './types' +export { + normalizeConsoleError, + normalizeConsoleInput, + normalizeConsoleOutput, + safeConsoleStringify, + TERMINAL_CONSOLE_LIMITS, + trimConsoleEntries, + trimWorkflowConsoleEntries, +} from './utils' diff --git a/apps/sim/stores/terminal/console/storage.ts b/apps/sim/stores/terminal/console/storage.ts index 1a809648f88..35f3eb91167 100644 --- a/apps/sim/stores/terminal/console/storage.ts +++ b/apps/sim/stores/terminal/console/storage.ts @@ -1,6 +1,6 @@ import { createLogger } from '@sim/logger' -import { del, get, set } from 'idb-keyval' -import type { StateStorage } from 'zustand/middleware' +import { get, set } from 'idb-keyval' +import type { ConsoleEntry } from './types' const logger = createLogger('ConsoleStorage') @@ -8,15 +8,22 @@ const STORE_KEY = 'terminal-console-store' const MIGRATION_KEY = 'terminal-console-store-migrated' /** - * Promise that resolves when migration is complete. - * Used to ensure getItem waits for migration before reading. + * Interval for persisting terminal state during active executions. + * Kept short enough that a hard refresh during execution still has + * recent running entries persisted for the reconnect flow to find. 
*/ -let migrationPromise: Promise | null = null +const EXECUTION_PERSIST_INTERVAL_MS = 5_000 /** - * Migrates existing console data from localStorage to IndexedDB. - * Runs once on first load, then marks migration as complete. + * Shape of terminal console data persisted to IndexedDB. */ +export interface PersistedConsoleData { + workflowEntries: Record + isOpen: boolean +} + +let migrationPromise: Promise | null = null + async function migrateFromLocalStorage(): Promise { if (typeof window === 'undefined') return @@ -43,39 +50,189 @@ if (typeof window !== 'undefined') { }) } -export const indexedDBStorage: StateStorage = { - getItem: async (name: string): Promise => { - if (typeof window === 'undefined') return null +/** + * Loads persisted console data from IndexedDB. + * Handles three historical storage formats: + * 1. Zustand persist wrapper: `{ state: { entries: [...] }, version }` (original flat format) + * 2. Zustand persist wrapper: `{ state: { workflowEntries: {...} }, version }` (refactored format) + * 3. Raw data: `{ workflowEntries: {...}, isOpen }` (current format) + */ +export async function loadConsoleData(): Promise { + if (typeof window === 'undefined') return null - // Ensure migration completes before reading - if (migrationPromise) { - await migrationPromise + if (migrationPromise) { + await migrationPromise + } + + try { + const raw = await get(STORE_KEY) + if (!raw) return null + + const parsed = typeof raw === 'string' ? JSON.parse(raw) : raw + if (!parsed || typeof parsed !== 'object') return null + + const data = parsed.state ?? 
parsed + + if (Array.isArray(data.entries) && !data.workflowEntries) { + const workflowEntries: Record = {} + for (const entry of data.entries) { + if (!entry?.workflowId) continue + const wfId = entry.workflowId + if (!workflowEntries[wfId]) workflowEntries[wfId] = [] + workflowEntries[wfId].push(entry) + } + return { workflowEntries, isOpen: Boolean(data.isOpen) } } - try { - const value = await get(name) - return value ?? null - } catch (error) { - logger.warn('IndexedDB read failed', { name, error }) - return null + return { + workflowEntries: data.workflowEntries ?? {}, + isOpen: Boolean(data.isOpen), } - }, + } catch (error) { + logger.warn('Failed to load console data from IndexedDB', { error }) + return null + } +} + +let writeSequence = 0 +let activeWrite: Promise | null = null - setItem: async (name: string, value: string): Promise => { - if (typeof window === 'undefined') return +function writeToIndexedDB(data: PersistedConsoleData): void { + const seq = ++writeSequence + + const doWrite = async () => { try { - await set(name, value) + const serialized = JSON.stringify(data) + if (seq !== writeSequence) return + await set(STORE_KEY, serialized) } catch (error) { - logger.warn('IndexedDB write failed', { name, error }) + logger.warn('IndexedDB write failed', { error }) } - }, + } - removeItem: async (name: string): Promise => { - if (typeof window === 'undefined') return - try { - await del(name) - } catch (error) { - logger.warn('IndexedDB delete failed', { name, error }) + activeWrite = (activeWrite ?? Promise.resolve()).then(doWrite) +} + +/** + * Execution-aware persistence manager for the terminal console store. 
+ * + * Writes happen only at meaningful lifecycle boundaries: + * - When an execution ends (success, error, cancel) + * - On explicit user actions (clear console) + * - On page hide (crash safety) + * - Every 5s (EXECUTION_PERSIST_INTERVAL_MS) during very long active executions (safety net) + * + * During normal execution, no serialization or IndexedDB writes occur, + * keeping the hot path completely free of persistence overhead. + */ +class ConsolePersistenceManager { + private dataProvider: (() => PersistedConsoleData) | null = null + private safetyTimer: ReturnType | null = null + private activeExecutions = 0 + private needsInitialPersist = false + + /** + * Binds the data provider function used to snapshot current state. + * Called once during store initialization. + */ + bind(provider: () => PersistedConsoleData): void { + this.dataProvider = provider + } + + /** + * Signals that a workflow execution has started. + * Starts the long-execution safety-net timer if this is the first active execution. + */ + executionStarted(): void { + this.activeExecutions++ + this.needsInitialPersist = true + if (this.activeExecutions === 1) { + this.startSafetyTimer() } - }, + } + + /** + * Called by the store when a running entry is added during an active execution. + * Triggers one immediate persist so the reconnect flow can find running entries + * after a page refresh, then disables until the next execution starts. + */ + onRunningEntryAdded(): void { + if (!this.needsInitialPersist) return + this.needsInitialPersist = false + this.persist() + } + + /** + * Signals that a workflow execution has ended (success, error, or cancel). + * Triggers an immediate persist and stops the safety timer if no executions remain. + */ + executionEnded(): void { + this.activeExecutions = Math.max(0, this.activeExecutions - 1) + this.persist() + if (this.activeExecutions === 0) { + this.stopSafetyTimer() + } + } + + /** + * Triggers an immediate persist.
Used for explicit user actions + * like clearing the console, and for page-hide durability. + */ + persist(): void { + if (!this.dataProvider) return + writeToIndexedDB(this.dataProvider()) + } + + private startSafetyTimer(): void { + this.stopSafetyTimer() + this.safetyTimer = setInterval(() => { + this.persist() + }, EXECUTION_PERSIST_INTERVAL_MS) + } + + private stopSafetyTimer(): void { + if (this.safetyTimer !== null) { + clearInterval(this.safetyTimer) + this.safetyTimer = null + } + } +} + +export const consolePersistence = new ConsolePersistenceManager() + +const EXEC_POINTER_PREFIX = 'terminal-active-execution:' + +/** + * Lightweight pointer to an in-flight execution, persisted immediately on + * execution start so the reconnect flow can find it even if no console + * entries have been written yet. Keyed per-workflow so multiple tabs + * running different workflows don't overwrite each other. + */ +export interface ExecutionPointer { + workflowId: string + executionId: string + lastEventId: number +} + +export async function loadExecutionPointer(workflowId: string): Promise { + if (typeof window === 'undefined') return null + try { + const raw = await get(`${EXEC_POINTER_PREFIX}${workflowId}`) + if (!raw) return null + const parsed = typeof raw === 'string' ? 
JSON.parse(raw) : raw + if (!parsed?.executionId) return null + return parsed as ExecutionPointer + } catch { + return null + } +} + +export function saveExecutionPointer(pointer: ExecutionPointer): void { + if (typeof window === 'undefined') return + set(`${EXEC_POINTER_PREFIX}${pointer.workflowId}`, JSON.stringify(pointer)).catch(() => {}) +} + +export function clearExecutionPointer(workflowId: string): void { + if (typeof window === 'undefined') return + set(`${EXEC_POINTER_PREFIX}${workflowId}`, '').catch(() => {}) } diff --git a/apps/sim/stores/terminal/console/store.test.ts b/apps/sim/stores/terminal/console/store.test.ts new file mode 100644 index 00000000000..50b836a7283 --- /dev/null +++ b/apps/sim/stores/terminal/console/store.test.ts @@ -0,0 +1,115 @@ +/** + * @vitest-environment node + */ +import { beforeEach, describe, expect, it } from 'vitest' +import { useTerminalConsoleStore } from '@/stores/terminal/console/store' + +describe('terminal console store', () => { + beforeEach(() => { + useTerminalConsoleStore.setState({ + workflowEntries: {}, + entryIdsByBlockExecution: {}, + entryLocationById: {}, + isOpen: false, + _hasHydrated: true, + }) + }) + + it('normalizes oversized payloads when adding console entries', () => { + useTerminalConsoleStore.getState().addConsole({ + workflowId: 'wf-1', + blockId: 'block-1', + blockName: 'Function', + blockType: 'function', + executionId: 'exec-1', + executionOrder: 1, + output: { + a: 'x'.repeat(100_000), + b: 'y'.repeat(100_000), + c: 'z'.repeat(100_000), + d: 'q'.repeat(100_000), + e: 'r'.repeat(100_000), + f: 's'.repeat(100_000), + }, + }) + + const [entry] = useTerminalConsoleStore.getState().getWorkflowEntries('wf-1') + + expect(entry.output).toMatchObject({ + __simTruncated: true, + }) + }) + + it('normalizes oversized replaceOutput updates', () => { + useTerminalConsoleStore.getState().addConsole({ + workflowId: 'wf-1', + blockId: 'block-1', + blockName: 'Function', + blockType: 'function', + executionId: 
'exec-1', + executionOrder: 1, + output: { ok: true }, + }) + + useTerminalConsoleStore.getState().updateConsole( + 'block-1', + { + executionOrder: 1, + replaceOutput: { + a: 'x'.repeat(100_000), + b: 'y'.repeat(100_000), + c: 'z'.repeat(100_000), + d: 'q'.repeat(100_000), + e: 'r'.repeat(100_000), + f: 's'.repeat(100_000), + }, + }, + 'exec-1' + ) + + const [entry] = useTerminalConsoleStore.getState().getWorkflowEntries('wf-1') + + expect(entry.output).toMatchObject({ + __simTruncated: true, + }) + }) + + it('updates one workflow without replacing unrelated workflow arrays', () => { + useTerminalConsoleStore.getState().addConsole({ + workflowId: 'wf-1', + blockId: 'block-1', + blockName: 'Function', + blockType: 'function', + executionId: 'exec-1', + executionOrder: 1, + output: { ok: true }, + }) + + useTerminalConsoleStore.getState().addConsole({ + workflowId: 'wf-2', + blockId: 'block-2', + blockName: 'Function', + blockType: 'function', + executionId: 'exec-2', + executionOrder: 1, + output: { ok: true }, + }) + + const before = useTerminalConsoleStore.getState() + const workflowTwoEntries = before.workflowEntries['wf-2'] + + useTerminalConsoleStore.getState().updateConsole( + 'block-1', + { + executionOrder: 1, + replaceOutput: { status: 'updated' }, + }, + 'exec-1' + ) + + const after = useTerminalConsoleStore.getState() + + expect(after.workflowEntries['wf-2']).toBe(workflowTwoEntries) + expect(after.getWorkflowEntries('wf-1')[0].output).toMatchObject({ status: 'updated' }) + }) +}) diff --git a/apps/sim/stores/terminal/console/store.ts b/apps/sim/stores/terminal/console/store.ts index 7479ca0d6c6..55f388b465b 100644 --- a/apps/sim/stores/terminal/console/store.ts +++ b/apps/sim/stores/terminal/console/store.ts @@ -1,22 +1,30 @@ import { createLogger } from '@sim/logger' import { create } from 'zustand' -import { createJSONStorage, devtools, persist } from 'zustand/middleware' +import { devtools } from 'zustand/middleware' +import { useShallow } from 
'zustand/react/shallow' import { redactApiKeys } from '@/lib/core/security/redaction' import { getQueryClient } from '@/app/_shell/providers/query-provider' import type { NormalizedBlockOutput } from '@/executor/types' import { type GeneralSettings, generalSettingsKeys } from '@/hooks/queries/general-settings' import { useExecutionStore } from '@/stores/execution' import { useNotificationStore } from '@/stores/notifications' -import { indexedDBStorage } from '@/stores/terminal/console/storage' -import type { ConsoleEntry, ConsoleStore, ConsoleUpdate } from '@/stores/terminal/console/types' +import { consolePersistence, loadConsoleData } from '@/stores/terminal/console/storage' +import type { + ConsoleEntry, + ConsoleEntryLocation, + ConsoleStore, + ConsoleUpdate, +} from '@/stores/terminal/console/types' +import { + normalizeConsoleError, + normalizeConsoleInput, + normalizeConsoleOutput, + safeConsoleStringify, + trimWorkflowConsoleEntries, +} from '@/stores/terminal/console/utils' const logger = createLogger('TerminalConsoleStore') - -/** - * Maximum number of console entries to keep per workflow. - * Keeps the stored data size reasonable and improves performance. - */ -const MAX_ENTRIES_PER_WORKFLOW = 5000 +const EMPTY_CONSOLE_ENTRIES: ConsoleEntry[] = [] const updateBlockOutput = ( existingOutput: NormalizedBlockOutput | undefined, @@ -62,6 +70,9 @@ const shouldSkipEntry = (output: any): boolean => { return false } +const getBlockExecutionKey = (blockId: string, executionId?: string): string => + `${executionId ?? 
'no-execution'}:${blockId}` + const matchesEntryForUpdate = ( entry: ConsoleEntry, blockId: string, @@ -101,6 +112,122 @@ const matchesEntryForUpdate = ( return true } +function cloneWorkflowEntries( + workflowEntries: Record +): Record { + return { ...workflowEntries } +} + +function removeWorkflowIndexes( + workflowId: string, + entries: ConsoleEntry[], + entryIdsByBlockExecution: Record, + entryLocationById: Record +): void { + for (const entry of entries) { + delete entryLocationById[entry.id] + const blockExecutionKey = getBlockExecutionKey(entry.blockId, entry.executionId) + const existingIds = entryIdsByBlockExecution[blockExecutionKey] + if (!existingIds) { + continue + } + + const nextIds = existingIds.filter((entryId) => entryId !== entry.id) + if (nextIds.length === 0) { + delete entryIdsByBlockExecution[blockExecutionKey] + } else { + entryIdsByBlockExecution[blockExecutionKey] = nextIds + } + } +} + +function indexWorkflowEntries( + workflowId: string, + entries: ConsoleEntry[], + entryIdsByBlockExecution: Record, + entryLocationById: Record +): void { + entries.forEach((entry, index) => { + entryLocationById[entry.id] = { workflowId, index } + const blockExecutionKey = getBlockExecutionKey(entry.blockId, entry.executionId) + const existingIds = entryIdsByBlockExecution[blockExecutionKey] + if (existingIds) { + entryIdsByBlockExecution[blockExecutionKey] = [...existingIds, entry.id] + } else { + entryIdsByBlockExecution[blockExecutionKey] = [entry.id] + } + }) +} + +function rebuildWorkflowStateMaps(workflowEntries: Record) { + const entryIdsByBlockExecution: Record = {} + const entryLocationById: Record = {} + + Object.entries(workflowEntries).forEach(([workflowId, entries]) => { + indexWorkflowEntries(workflowId, entries, entryIdsByBlockExecution, entryLocationById) + }) + + return { entryIdsByBlockExecution, entryLocationById } +} + +function replaceWorkflowEntries( + state: ConsoleStore, + workflowId: string, + nextEntries: ConsoleEntry[] +): Pick 
{ + const workflowEntries = cloneWorkflowEntries(state.workflowEntries) + const entryIdsByBlockExecution = { ...state.entryIdsByBlockExecution } + const entryLocationById = { ...state.entryLocationById } + const previousEntries = workflowEntries[workflowId] ?? EMPTY_CONSOLE_ENTRIES + + removeWorkflowIndexes(workflowId, previousEntries, entryIdsByBlockExecution, entryLocationById) + + if (nextEntries.length === 0) { + delete workflowEntries[workflowId] + } else { + workflowEntries[workflowId] = nextEntries + indexWorkflowEntries(workflowId, nextEntries, entryIdsByBlockExecution, entryLocationById) + } + + return { workflowEntries, entryIdsByBlockExecution, entryLocationById } +} + +function appendWorkflowEntry( + state: ConsoleStore, + workflowId: string, + newEntry: ConsoleEntry, + trimmedEntries: ConsoleEntry[] +): Pick { + const workflowEntries = cloneWorkflowEntries(state.workflowEntries) + const previousEntries = workflowEntries[workflowId] ?? EMPTY_CONSOLE_ENTRIES + workflowEntries[workflowId] = trimmedEntries + + const entryLocationById = { ...state.entryLocationById } + const entryIdsByBlockExecution = { ...state.entryIdsByBlockExecution } + + const survivingIds = new Set(trimmedEntries.map((e) => e.id)) + const droppedEntries = previousEntries.filter((e) => !survivingIds.has(e.id)) + if (droppedEntries.length > 0) { + removeWorkflowIndexes(workflowId, droppedEntries, entryIdsByBlockExecution, entryLocationById) + } + + trimmedEntries.forEach((entry, index) => { + entryLocationById[entry.id] = { workflowId, index } + }) + + const blockExecutionKey = getBlockExecutionKey(newEntry.blockId, newEntry.executionId) + const existingIds = entryIdsByBlockExecution[blockExecutionKey] + if (existingIds) { + if (!existingIds.includes(newEntry.id)) { + entryIdsByBlockExecution[blockExecutionKey] = [...existingIds, newEntry.id] + } + } else { + entryIdsByBlockExecution[blockExecutionKey] = [newEntry.id] + } + + return { workflowEntries, entryIdsByBlockExecution, 
entryLocationById } +} + interface NotifyBlockErrorParams { error: unknown blockName: string @@ -108,9 +235,6 @@ interface NotifyBlockErrorParams { logContext: Record } -/** - * Sends an error notification for a block failure if error notifications are enabled. - */ const notifyBlockError = ({ error, blockName, workflowId, logContext }: NotifyBlockErrorParams) => { const settings = getQueryClient().getQueryData(generalSettingsKeys.settings()) const isErrorNotificationsEnabled = settings?.errorNotificationsEnabled ?? true @@ -141,367 +265,394 @@ const notifyBlockError = ({ error, blockName, workflowId, logContext }: NotifyBl } export const useTerminalConsoleStore = create()( - devtools( - persist( - (set, get) => ({ - entries: [], - isOpen: false, - _hasHydrated: false, - - setHasHydrated: (hasHydrated) => set({ _hasHydrated: hasHydrated }), - - addConsole: (entry: Omit) => { - set((state) => { - if (shouldSkipEntry(entry.output)) { - return { entries: state.entries } - } + devtools((set, get) => ({ + workflowEntries: {}, + entryIdsByBlockExecution: {}, + entryLocationById: {}, + isOpen: false, + _hasHydrated: false, + + addConsole: (entry: Omit) => { + if (shouldSkipEntry(entry.output)) { + return get().getWorkflowEntries(entry.workflowId)[0] as ConsoleEntry | undefined + } - const redactedEntry = { ...entry } - if ( - !isStreamingOutput(entry.output) && - redactedEntry.output && - typeof redactedEntry.output === 'object' - ) { - redactedEntry.output = redactApiKeys(redactedEntry.output) - } - if (redactedEntry.input && typeof redactedEntry.input === 'object') { - redactedEntry.input = redactApiKeys(redactedEntry.input) - } + const redactedEntry = { ...entry } + if ( + !isStreamingOutput(entry.output) && + redactedEntry.output && + typeof redactedEntry.output === 'object' + ) { + redactedEntry.output = redactApiKeys(redactedEntry.output) + } + if (redactedEntry.input && typeof redactedEntry.input === 'object') { + redactedEntry.input = 
redactApiKeys(redactedEntry.input) + } - const newEntry: ConsoleEntry = { - ...redactedEntry, - id: crypto.randomUUID(), - timestamp: new Date().toISOString(), - } + const createdEntry: ConsoleEntry = { + ...redactedEntry, + id: crypto.randomUUID(), + timestamp: new Date().toISOString(), + input: normalizeConsoleInput(redactedEntry.input), + output: normalizeConsoleOutput(redactedEntry.output), + error: normalizeConsoleError(redactedEntry.error), + warning: + typeof redactedEntry.warning === 'string' + ? (normalizeConsoleError(redactedEntry.warning) ?? undefined) + : redactedEntry.warning, + } - const newEntries = [newEntry, ...state.entries] + set((state) => { + const workflowEntries = state.workflowEntries[entry.workflowId] ?? EMPTY_CONSOLE_ENTRIES + const nextWorkflowEntries = trimWorkflowConsoleEntries([createdEntry, ...workflowEntries]) + return appendWorkflowEntry(state, entry.workflowId, createdEntry, nextWorkflowEntries) + }) - const executionsToRemove = new Set() + if (createdEntry.isRunning) { + consolePersistence.onRunningEntryAdded() + } - const workflowGroups = new Map() - for (const e of newEntries) { - const group = workflowGroups.get(e.workflowId) || [] - group.push(e) - workflowGroups.set(e.workflowId, group) - } + if (createdEntry.error && createdEntry.blockType !== 'cancelled') { + notifyBlockError({ + error: createdEntry.error, + blockName: createdEntry.blockName || 'Unknown Block', + workflowId: entry.workflowId, + logContext: { entryId: createdEntry.id }, + }) + } + + return createdEntry + }, + + clearWorkflowConsole: (workflowId: string) => { + set((state) => replaceWorkflowEntries(state, workflowId, EMPTY_CONSOLE_ENTRIES)) + useExecutionStore.getState().clearRunPath(workflowId) + consolePersistence.persist() + }, + + clearExecutionEntries: (executionId: string) => + set((state) => { + const nextWorkflowEntries = cloneWorkflowEntries(state.workflowEntries) + let didChange = false + + Object.entries(nextWorkflowEntries).forEach(([workflowId, 
entries]) => { + const filteredEntries = entries.filter((entry) => entry.executionId !== executionId) + if (filteredEntries.length !== entries.length) { + nextWorkflowEntries[workflowId] = filteredEntries + didChange = true + } + }) + + if (!didChange) { + return state + } + + const normalizedEntries = Object.fromEntries( + Object.entries(nextWorkflowEntries).filter(([, entries]) => entries.length > 0) + ) + + return { + workflowEntries: normalizedEntries, + ...rebuildWorkflowStateMaps(normalizedEntries), + } + }), + + exportConsoleCSV: (workflowId: string) => { + const entries = get().getWorkflowEntries(workflowId) + + if (entries.length === 0) { + return + } + + const formatCSVValue = (value: any): string => { + if (value === null || value === undefined) { + return '' + } + + let stringValue = typeof value === 'object' ? safeConsoleStringify(value) : String(value) + + if (stringValue.includes('"') || stringValue.includes(',') || stringValue.includes('\n')) { + stringValue = `"${stringValue.replace(/"/g, '""')}"` + } + + return stringValue + } + + const headers = [ + 'timestamp', + 'blockName', + 'blockType', + 'startedAt', + 'endedAt', + 'durationMs', + 'success', + 'input', + 'output', + 'error', + 'warning', + ] + + const csvRows = [ + headers.join(','), + ...entries.map((entry) => + [ + formatCSVValue(entry.timestamp), + formatCSVValue(entry.blockName), + formatCSVValue(entry.blockType), + formatCSVValue(entry.startedAt), + formatCSVValue(entry.endedAt), + formatCSVValue(entry.durationMs), + formatCSVValue(entry.success), + formatCSVValue(entry.input), + formatCSVValue(entry.output), + formatCSVValue(entry.error), + formatCSVValue(entry.warning), + ].join(',') + ), + ] + + const csvContent = csvRows.join('\n') + const timestamp = new Date().toISOString().replace(/[:.]/g, '-').slice(0, 19) + const filename = `terminal-console-${workflowId}-${timestamp}.csv` + + const blob = new Blob([csvContent], { type: 'text/csv;charset=utf-8;' }) + const link = 
document.createElement('a') + + if (link.download !== undefined) { + const url = URL.createObjectURL(blob) + link.setAttribute('href', url) + link.setAttribute('download', filename) + link.style.visibility = 'hidden' + document.body.appendChild(link) + link.click() + document.body.removeChild(link) + URL.revokeObjectURL(url) + } + }, + + getWorkflowEntries: (workflowId) => { + return get().workflowEntries[workflowId] ?? EMPTY_CONSOLE_ENTRIES + }, + + toggleConsole: () => { + set((state) => ({ isOpen: !state.isOpen })) + }, + + updateConsole: (blockId: string, update: string | ConsoleUpdate, executionId?: string) => { + set((state) => { + const candidateIds = + state.entryIdsByBlockExecution[getBlockExecutionKey(blockId, executionId)] ?? [] + if (candidateIds.length === 0) { + return state + } + + const workflowId = state.entryLocationById[candidateIds[0]]?.workflowId + if (!workflowId) { + return state + } + + const currentEntries = state.workflowEntries[workflowId] ?? EMPTY_CONSOLE_ENTRIES + let nextEntries: ConsoleEntry[] | null = null + + for (const candidateId of candidateIds) { + const location = state.entryLocationById[candidateId] + if (!location || location.workflowId !== workflowId) continue + + const source = nextEntries ?? currentEntries + const entry = source[location.index] + if (!entry || entry.id !== candidateId) continue + if (!matchesEntryForUpdate(entry, blockId, executionId, update)) continue + + if (!nextEntries) { + nextEntries = [...currentEntries] + } + + if (typeof update === 'string') { + const newOutput = normalizeConsoleOutput(updateBlockOutput(entry.output, update)) + nextEntries[location.index] = { ...entry, output: newOutput } + continue + } + + const updatedEntry = { ...entry } - for (const [workflowId, entries] of workflowGroups) { - if (entries.length <= MAX_ENTRIES_PER_WORKFLOW) continue - - const execOrder: string[] = [] - const seen = new Set() - for (const e of entries) { - const execId = e.executionId ?? 
e.id - if (!seen.has(execId)) { - execOrder.push(execId) - seen.add(execId) - } - } - - const counts = new Map() - for (const e of entries) { - const execId = e.executionId ?? e.id - counts.set(execId, (counts.get(execId) || 0) + 1) - } - - let total = 0 - const toKeep = new Set() - for (const execId of execOrder) { - const c = counts.get(execId) || 0 - if (total + c <= MAX_ENTRIES_PER_WORKFLOW) { - toKeep.add(execId) - total += c - } - } - - for (const execId of execOrder) { - if (!toKeep.has(execId)) { - executionsToRemove.add(`${workflowId}:${execId}`) - } - } + if (update.content !== undefined) { + updatedEntry.output = normalizeConsoleOutput( + updateBlockOutput(entry.output, update.content) + ) + } + + if (update.replaceOutput !== undefined) { + const redactedOutput = + typeof update.replaceOutput === 'object' && update.replaceOutput !== null + ? redactApiKeys(update.replaceOutput) + : update.replaceOutput + updatedEntry.output = normalizeConsoleOutput(redactedOutput) + } else if (update.output !== undefined) { + const mergedOutput = { + ...(entry.output || {}), + ...update.output, } + updatedEntry.output = + typeof mergedOutput === 'object' + ? normalizeConsoleOutput(redactApiKeys(mergedOutput)) + : normalizeConsoleOutput(mergedOutput) + } - const trimmedEntries = newEntries.filter((e) => { - const key = `${e.workflowId}:${e.executionId ?? e.id}` - return !executionsToRemove.has(key) - }) + if (update.error !== undefined) { + updatedEntry.error = normalizeConsoleError(update.error) + } - return { entries: trimmedEntries } - }) + if (update.warning !== undefined) { + updatedEntry.warning = normalizeConsoleError(update.warning) ?? 
undefined + } - const newEntry = get().entries[0] + if (update.success !== undefined) { + updatedEntry.success = update.success + } - if (newEntry?.error && newEntry.blockType !== 'cancelled') { - notifyBlockError({ - error: newEntry.error, - blockName: newEntry.blockName || 'Unknown Block', - workflowId: entry.workflowId, - logContext: { entryId: newEntry.id }, - }) + if (update.startedAt !== undefined) { + updatedEntry.startedAt = update.startedAt } - return newEntry - }, + if (update.endedAt !== undefined) { + updatedEntry.endedAt = update.endedAt + } - clearWorkflowConsole: (workflowId: string) => { - set((state) => ({ - entries: state.entries.filter((entry) => entry.workflowId !== workflowId), - })) - useExecutionStore.getState().clearRunPath(workflowId) - }, + if (update.durationMs !== undefined) { + updatedEntry.durationMs = update.durationMs + } - clearExecutionEntries: (executionId: string) => - set((state) => ({ - entries: state.entries.filter((e) => e.executionId !== executionId), - })), + if (update.input !== undefined) { + updatedEntry.input = + typeof update.input === 'object' && update.input !== null + ? normalizeConsoleInput(redactApiKeys(update.input)) + : normalizeConsoleInput(update.input) + } - exportConsoleCSV: (workflowId: string) => { - const entries = get().entries.filter((entry) => entry.workflowId === workflowId) + if (update.isRunning !== undefined) { + updatedEntry.isRunning = update.isRunning + } - if (entries.length === 0) { - return + if (update.isCanceled !== undefined) { + updatedEntry.isCanceled = update.isCanceled } - const formatCSVValue = (value: any): string => { - if (value === null || value === undefined) { - return '' - } + if (update.iterationCurrent !== undefined) { + updatedEntry.iterationCurrent = update.iterationCurrent + } - let stringValue = typeof value === 'object' ? 
JSON.stringify(value) : String(value) + if (update.iterationTotal !== undefined) { + updatedEntry.iterationTotal = update.iterationTotal + } - if ( - stringValue.includes('"') || - stringValue.includes(',') || - stringValue.includes('\n') - ) { - stringValue = `"${stringValue.replace(/"/g, '""')}"` - } + if (update.iterationType !== undefined) { + updatedEntry.iterationType = update.iterationType + } - return stringValue + if (update.iterationContainerId !== undefined) { + updatedEntry.iterationContainerId = update.iterationContainerId } - const headers = [ - 'timestamp', - 'blockName', - 'blockType', - 'startedAt', - 'endedAt', - 'durationMs', - 'success', - 'input', - 'output', - 'error', - 'warning', - ] - - const csvRows = [ - headers.join(','), - ...entries.map((entry) => - [ - formatCSVValue(entry.timestamp), - formatCSVValue(entry.blockName), - formatCSVValue(entry.blockType), - formatCSVValue(entry.startedAt), - formatCSVValue(entry.endedAt), - formatCSVValue(entry.durationMs), - formatCSVValue(entry.success), - formatCSVValue(entry.input), - formatCSVValue(entry.output), - formatCSVValue(entry.error), - formatCSVValue(entry.warning), - ].join(',') - ), - ] - - const csvContent = csvRows.join('\n') - const timestamp = new Date().toISOString().replace(/[:.]/g, '-').slice(0, 19) - const filename = `terminal-console-${workflowId}-${timestamp}.csv` - - const blob = new Blob([csvContent], { type: 'text/csv;charset=utf-8;' }) - const link = document.createElement('a') - - if (link.download !== undefined) { - const url = URL.createObjectURL(blob) - link.setAttribute('href', url) - link.setAttribute('download', filename) - link.style.visibility = 'hidden' - document.body.appendChild(link) - link.click() - document.body.removeChild(link) - URL.revokeObjectURL(url) + if (update.parentIterations !== undefined) { + updatedEntry.parentIterations = update.parentIterations } - }, - - getWorkflowEntries: (workflowId) => { - return get().entries.filter((entry) => 
entry.workflowId === workflowId) - }, - - toggleConsole: () => { - set((state) => ({ isOpen: !state.isOpen })) - }, - - updateConsole: (blockId: string, update: string | ConsoleUpdate, executionId?: string) => { - set((state) => { - const updatedEntries = state.entries.map((entry) => { - if (!matchesEntryForUpdate(entry, blockId, executionId, update)) { - return entry - } - - if (typeof update === 'string') { - const newOutput = updateBlockOutput(entry.output, update) - return { ...entry, output: newOutput } - } - - const updatedEntry = { ...entry } - - if (update.content !== undefined) { - updatedEntry.output = updateBlockOutput(entry.output, update.content) - } - - if (update.replaceOutput !== undefined) { - updatedEntry.output = - typeof update.replaceOutput === 'object' && update.replaceOutput !== null - ? redactApiKeys(update.replaceOutput) - : update.replaceOutput - } else if (update.output !== undefined) { - const mergedOutput = { - ...(entry.output || {}), - ...update.output, - } - updatedEntry.output = - typeof mergedOutput === 'object' ? redactApiKeys(mergedOutput) : mergedOutput - } - - if (update.error !== undefined) { - updatedEntry.error = update.error - } - - if (update.warning !== undefined) { - updatedEntry.warning = update.warning - } - - if (update.success !== undefined) { - updatedEntry.success = update.success - } - - if (update.startedAt !== undefined) { - updatedEntry.startedAt = update.startedAt - } - - if (update.endedAt !== undefined) { - updatedEntry.endedAt = update.endedAt - } - - if (update.durationMs !== undefined) { - updatedEntry.durationMs = update.durationMs - } - - if (update.input !== undefined) { - updatedEntry.input = - typeof update.input === 'object' && update.input !== null - ? 
redactApiKeys(update.input) - : update.input - } - - if (update.isRunning !== undefined) { - updatedEntry.isRunning = update.isRunning - } - - if (update.isCanceled !== undefined) { - updatedEntry.isCanceled = update.isCanceled - } - - if (update.iterationCurrent !== undefined) { - updatedEntry.iterationCurrent = update.iterationCurrent - } - - if (update.iterationTotal !== undefined) { - updatedEntry.iterationTotal = update.iterationTotal - } - - if (update.iterationType !== undefined) { - updatedEntry.iterationType = update.iterationType - } - - if (update.iterationContainerId !== undefined) { - updatedEntry.iterationContainerId = update.iterationContainerId - } - - if (update.parentIterations !== undefined) { - updatedEntry.parentIterations = update.parentIterations - } - - if (update.childWorkflowBlockId !== undefined) { - updatedEntry.childWorkflowBlockId = update.childWorkflowBlockId - } - - if (update.childWorkflowName !== undefined) { - updatedEntry.childWorkflowName = update.childWorkflowName - } - - if (update.childWorkflowInstanceId !== undefined) { - updatedEntry.childWorkflowInstanceId = update.childWorkflowInstanceId - } - - return updatedEntry - }) - - return { entries: updatedEntries } - }) - if (typeof update === 'object' && update.error) { - const matchingEntry = get().entries.find( - (e) => e.blockId === blockId && e.executionId === executionId - ) - notifyBlockError({ - error: update.error, - blockName: matchingEntry?.blockName || 'Unknown Block', - workflowId: matchingEntry?.workflowId, - logContext: { blockId }, - }) + if (update.childWorkflowBlockId !== undefined) { + updatedEntry.childWorkflowBlockId = update.childWorkflowBlockId } - }, - - cancelRunningEntries: (workflowId: string) => { - set((state) => { - const now = new Date() - const updatedEntries = state.entries.map((entry) => { - if (entry.workflowId === workflowId && entry.isRunning) { - const durationMs = entry.startedAt - ? 
now.getTime() - new Date(entry.startedAt).getTime() - : entry.durationMs - return { - ...entry, - isRunning: false, - isCanceled: true, - endedAt: now.toISOString(), - durationMs, - } - } - return entry - }) - return { entries: updatedEntries } - }) - }, - }), - { - name: 'terminal-console-store', - storage: createJSONStorage(() => indexedDBStorage), - partialize: (state) => ({ - entries: state.entries, - isOpen: state.isOpen, - }), - onRehydrateStorage: () => (_state, error) => { - if (error) { - logger.error('Failed to rehydrate console store', { error }) + + if (update.childWorkflowName !== undefined) { + updatedEntry.childWorkflowName = update.childWorkflowName } - }, - merge: (persistedState, currentState) => { - const persisted = persistedState as Partial | undefined - const rawEntries = persisted?.entries ?? currentState.entries - const oneHourAgo = Date.now() - 60 * 60 * 1000 - const entries = rawEntries.map((entry, index) => { + if (update.childWorkflowInstanceId !== undefined) { + updatedEntry.childWorkflowInstanceId = update.childWorkflowInstanceId + } + + nextEntries[location.index] = updatedEntry + } + + if (!nextEntries) { + return state + } + + const workflowEntriesClone = cloneWorkflowEntries(state.workflowEntries) + workflowEntriesClone[workflowId] = nextEntries + return { + workflowEntries: workflowEntriesClone, + entryIdsByBlockExecution: state.entryIdsByBlockExecution, + entryLocationById: state.entryLocationById, + } + }) + + if (typeof update === 'object' && update.error) { + const matchingEntry = get() + .getWorkflowEntries( + get().entryLocationById[ + (get().entryIdsByBlockExecution[getBlockExecutionKey(blockId, executionId)] ?? + [])[0] ?? '' + ]?.workflowId ?? 
'' + ) + .find((entry) => matchesEntryForUpdate(entry, blockId, executionId, update)) + notifyBlockError({ + error: update.error, + blockName: matchingEntry?.blockName || 'Unknown Block', + workflowId: matchingEntry?.workflowId, + logContext: { blockId }, + }) + } + }, + + cancelRunningEntries: (workflowId: string) => { + set((state) => { + const now = new Date() + const workflowEntries = state.workflowEntries[workflowId] ?? EMPTY_CONSOLE_ENTRIES + let didChange = false + const updatedEntries = workflowEntries.map((entry) => { + if (entry.workflowId === workflowId && entry.isRunning) { + didChange = true + const durationMs = entry.startedAt + ? now.getTime() - new Date(entry.startedAt).getTime() + : entry.durationMs + return { + ...entry, + isRunning: false, + isCanceled: true, + endedAt: now.toISOString(), + durationMs, + } + } + return entry + }) + if (!didChange) { + return state + } + return replaceWorkflowEntries(state, workflowId, updatedEntries) + }) + }, + })) +) + +/** + * Hydrates the console store from IndexedDB on startup. + * Applies the same normalization and trimming as the old persist merge. 
+ */ +async function hydrateConsoleStore(): Promise { + try { + const data = await loadConsoleData() + + if (!data) { + useTerminalConsoleStore.setState({ _hasHydrated: true }) + return + } + + const oneHourAgo = Date.now() - 60 * 60 * 1000 + + const workflowEntries = Object.fromEntries( + Object.entries(data.workflowEntries).map(([workflowId, entries]) => [ + workflowId, + trimWorkflowConsoleEntries( + entries.map((entry, index) => { let updated = entry if (entry.executionOrder === undefined) { updated = { ...updated, executionOrder: index + 1 } @@ -513,26 +664,93 @@ export const useTerminalConsoleStore = create()( ) { updated = { ...updated, isRunning: false } } + updated = { + ...updated, + input: normalizeConsoleInput(updated.input), + output: normalizeConsoleOutput(updated.output), + error: normalizeConsoleError(updated.error), + warning: + typeof updated.warning === 'string' + ? (normalizeConsoleError(updated.warning) ?? undefined) + : updated.warning, + } return updated }) + ), + ]) + ) - return { - ...currentState, - entries, - isOpen: persisted?.isOpen ?? currentState.isOpen, - } - }, + const currentState = useTerminalConsoleStore.getState() + const mergedWorkflowEntries = { ...workflowEntries } + + for (const [wfId, currentEntries] of Object.entries(currentState.workflowEntries)) { + if (currentEntries.length > 0) { + const persistedEntries = mergedWorkflowEntries[wfId] ?? 
[] + const persistedIds = new Set(persistedEntries.map((e) => e.id)) + const newEntries = currentEntries.filter((e) => !persistedIds.has(e.id)) + if (newEntries.length > 0) { + mergedWorkflowEntries[wfId] = trimWorkflowConsoleEntries([ + ...newEntries, + ...persistedEntries, + ]) + } } - ) - ) -) + } -if (typeof window !== 'undefined') { - useTerminalConsoleStore.persist.onFinishHydration(() => { + useTerminalConsoleStore.setState({ + workflowEntries: mergedWorkflowEntries, + ...rebuildWorkflowStateMaps(mergedWorkflowEntries), + isOpen: data.isOpen, + _hasHydrated: true, + }) + } catch (error) { + logger.error('Failed to hydrate console store', { error }) useTerminalConsoleStore.setState({ _hasHydrated: true }) + } +} + +if (typeof window !== 'undefined') { + consolePersistence.bind(() => { + const state = useTerminalConsoleStore.getState() + return { + workflowEntries: state.workflowEntries, + isOpen: state.isOpen, + } }) - if (useTerminalConsoleStore.persist.hasHydrated()) { - useTerminalConsoleStore.setState({ _hasHydrated: true }) - } + hydrateConsoleStore() + + window.addEventListener('pagehide', () => consolePersistence.persist()) +} + +export function useWorkflowConsoleEntries(workflowId?: string): ConsoleEntry[] { + return useTerminalConsoleStore( + useShallow((state) => { + if (!workflowId) { + return EMPTY_CONSOLE_ENTRIES + } + + return state.workflowEntries[workflowId] ?? 
EMPTY_CONSOLE_ENTRIES + }) + ) +} + +export function useConsoleEntry(entryId?: string | null): ConsoleEntry | null { + return useTerminalConsoleStore((state) => { + if (!entryId) { + return null + } + + const location = state.entryLocationById[entryId] + if (!location) { + return null + } + + const entry = state.workflowEntries[location.workflowId]?.[location.index] + if (!entry || entry.id !== entryId) { + return null + } + + return entry + }) } diff --git a/apps/sim/stores/terminal/console/types.ts b/apps/sim/stores/terminal/console/types.ts index 80610cc8d82..8aa342d7a11 100644 --- a/apps/sim/stores/terminal/console/types.ts +++ b/apps/sim/stores/terminal/console/types.ts @@ -58,10 +58,17 @@ export interface ConsoleUpdate { childWorkflowInstanceId?: string } +export interface ConsoleEntryLocation { + workflowId: string + index: number +} + export interface ConsoleStore { - entries: ConsoleEntry[] + workflowEntries: Record + entryIdsByBlockExecution: Record + entryLocationById: Record isOpen: boolean - addConsole: (entry: Omit) => ConsoleEntry + addConsole: (entry: Omit) => ConsoleEntry | undefined clearWorkflowConsole: (workflowId: string) => void clearExecutionEntries: (executionId: string) => void exportConsoleCSV: (workflowId: string) => void @@ -70,5 +77,4 @@ export interface ConsoleStore { updateConsole: (blockId: string, update: string | ConsoleUpdate, executionId?: string) => void cancelRunningEntries: (workflowId: string) => void _hasHydrated: boolean - setHasHydrated: (hasHydrated: boolean) => void } diff --git a/apps/sim/stores/terminal/console/utils.test.ts b/apps/sim/stores/terminal/console/utils.test.ts new file mode 100644 index 00000000000..d86a27a3f29 --- /dev/null +++ b/apps/sim/stores/terminal/console/utils.test.ts @@ -0,0 +1,89 @@ +/** + * @vitest-environment node + */ +import { describe, expect, it } from 'vitest' +import type { ConsoleEntry } from './types' +import { + normalizeConsoleOutput, + safeConsoleStringify, + 
TERMINAL_CONSOLE_LIMITS, + trimConsoleEntries, +} from './utils' + +function makeEntry(id: string, executionId: string, workflowId = 'wf-1'): ConsoleEntry { + return { + id, + executionId, + workflowId, + blockId: `block-${id}`, + blockName: `Block ${id}`, + blockType: 'function', + executionOrder: Number.parseInt(id.replace(/\D/g, ''), 10) || 0, + timestamp: '2025-01-01T00:00:00.000Z', + } +} + +describe('terminal console utils', () => { + it('safely stringifies circular values', () => { + const circular: { name: string; self?: unknown } = { name: 'root' } + circular.self = circular + + const result = safeConsoleStringify(circular) + + expect(result).toContain('[Circular]') + expect(result).toContain('"name": "root"') + }) + + it('truncates oversized nested strings in console output', () => { + const output = normalizeConsoleOutput({ + stdout: 'x'.repeat(TERMINAL_CONSOLE_LIMITS.MAX_STRING_LENGTH + 100), + }) + + expect(output?.stdout).toContain('[truncated 100 chars]') + }) + + it('caps oversized normalized payloads with a preview object', () => { + const output = normalizeConsoleOutput({ + a: 'x'.repeat(100_000), + b: 'y'.repeat(100_000), + c: 'z'.repeat(100_000), + d: 'q'.repeat(100_000), + e: 'r'.repeat(100_000), + f: 's'.repeat(100_000), + }) as Record + + expect(output.__simTruncated).toBe(true) + expect(typeof output.__simPreview).toBe('string') + expect(typeof output.__simByteLength).toBe('number') + }) + + it('preserves the newest oversized execution by trimming within it first', () => { + const newestEntries = Array.from({ length: 5_100 }, (_, index) => + makeEntry(`new-${index}`, 'exec-new') + ) + const olderEntries = Array.from({ length: 25 }, (_, index) => + makeEntry(`old-${index}`, 'exec-old') + ) + const trimmed = trimConsoleEntries([...newestEntries, ...olderEntries]) + + expect(trimmed).toHaveLength(TERMINAL_CONSOLE_LIMITS.MAX_ENTRIES_PER_WORKFLOW) + expect(trimmed.every((entry) => entry.executionId === 'exec-new')).toBe(true) + 
expect(trimmed[0].id).toBe('new-0') + expect(trimmed.at(-1)?.id).toBe(`new-${TERMINAL_CONSOLE_LIMITS.MAX_ENTRIES_PER_WORKFLOW - 1}`) + }) + + it('keeps older whole executions when they still fit after the newest run', () => { + const newestEntries = Array.from({ length: 4_990 }, (_, index) => + makeEntry(`new-${index}`, 'exec-new') + ) + const olderEntries = Array.from({ length: 10 }, (_, index) => + makeEntry(`old-${index}`, 'exec-old') + ) + + const trimmed = trimConsoleEntries([...newestEntries, ...olderEntries]) + + expect(trimmed).toHaveLength(5_000) + expect(trimmed.filter((entry) => entry.executionId === 'exec-new')).toHaveLength(4_990) + expect(trimmed.filter((entry) => entry.executionId === 'exec-old')).toHaveLength(10) + }) +}) diff --git a/apps/sim/stores/terminal/console/utils.ts b/apps/sim/stores/terminal/console/utils.ts new file mode 100644 index 00000000000..659d0e74aaf --- /dev/null +++ b/apps/sim/stores/terminal/console/utils.ts @@ -0,0 +1,294 @@ +import type { NormalizedBlockOutput } from '@/executor/types' +import type { ConsoleEntry } from './types' + +/** + * Terminal console safety limits used to bound persisted debug data. + */ +export const TERMINAL_CONSOLE_LIMITS = { + MAX_ENTRIES_PER_WORKFLOW: 5000, + MAX_STRING_LENGTH: 50_000, + MAX_OBJECT_KEYS: 100, + MAX_ARRAY_ITEMS: 100, + MAX_DEPTH: 6, + MAX_SERIALIZED_BYTES: 256 * 1024, + MAX_SERIALIZED_PREVIEW_LENGTH: 10_000, +} as const + +const textEncoder = new TextEncoder() + +/** + * Returns the UTF-8 byte length of a string. + */ +function getByteLength(value: string): number { + return textEncoder.encode(value).length +} + +/** + * Truncates a string while preserving a short explanation. + */ +function truncateString( + value: string, + maxLength: number = TERMINAL_CONSOLE_LIMITS.MAX_STRING_LENGTH +): string { + if (value.length <= maxLength) { + return value + } + + return `${value.slice(0, maxLength)}... 
[truncated ${value.length - maxLength} chars]` +} + +/** + * Safely stringifies terminal data without throwing on circular or non-JSON-safe values. + */ +export function safeConsoleStringify(value: unknown): string { + const seen = new WeakSet() + + try { + return ( + JSON.stringify( + value, + (_key, currentValue) => { + if (typeof currentValue === 'bigint') { + return `${currentValue.toString()}n` + } + + if (currentValue instanceof Error) { + return { + name: currentValue.name, + message: currentValue.message, + stack: currentValue.stack, + } + } + + if (typeof currentValue === 'function') { + return `[Function ${currentValue.name || 'anonymous'}]` + } + + if (typeof currentValue === 'symbol') { + return currentValue.toString() + } + + if (typeof currentValue === 'object' && currentValue !== null) { + if (seen.has(currentValue)) { + return '[Circular]' + } + + seen.add(currentValue) + } + + return currentValue + }, + 2 + ) ?? '' + ) + } catch { + try { + return String(value) + } catch { + return '[Unserializable value]' + } + } +} + +/** + * Produces a terminal-safe representation of any value. + */ +export function normalizeConsoleValue(value: unknown, depth = 0): unknown { + if (value === null || value === undefined) { + return value + } + + if (typeof value === 'string') { + return truncateString(value) + } + + if (typeof value === 'number' || typeof value === 'boolean') { + return value + } + + if (typeof value === 'bigint') { + return `${value.toString()}n` + } + + if (typeof value === 'function') { + return `[Function ${value.name || 'anonymous'}]` + } + + if (typeof value === 'symbol') { + return value.toString() + } + + if (value instanceof Error) { + return { + name: value.name, + message: truncateString(value.message), + stack: value.stack ? truncateString(value.stack) : undefined, + } + } + + if (depth >= TERMINAL_CONSOLE_LIMITS.MAX_DEPTH) { + return `[Truncated ${Array.isArray(value) ? 
'array' : 'object'}]` + } + + if (Array.isArray(value)) { + const normalizedItems = value + .slice(0, TERMINAL_CONSOLE_LIMITS.MAX_ARRAY_ITEMS) + .map((item) => normalizeConsoleValue(item, depth + 1)) + + if (value.length > TERMINAL_CONSOLE_LIMITS.MAX_ARRAY_ITEMS) { + normalizedItems.push( + `[... truncated ${value.length - TERMINAL_CONSOLE_LIMITS.MAX_ARRAY_ITEMS} items]` + ) + } + + return normalizedItems + } + + const objectEntries = Object.entries(value as Record) + const normalizedObject: Record = {} + + for (const [key, entryValue] of objectEntries.slice(0, TERMINAL_CONSOLE_LIMITS.MAX_OBJECT_KEYS)) { + normalizedObject[key] = normalizeConsoleValue(entryValue, depth + 1) + } + + if (objectEntries.length > TERMINAL_CONSOLE_LIMITS.MAX_OBJECT_KEYS) { + normalizedObject.__simTruncatedKeys = + objectEntries.length - TERMINAL_CONSOLE_LIMITS.MAX_OBJECT_KEYS + } + + return normalizedObject +} + +/** + * Applies a final serialized-size cap after recursive normalization. + */ +function capNormalizedValue(value: unknown): unknown { + if (value === null || value === undefined) { + return value + } + + const serialized = safeConsoleStringify(value) + const serializedBytes = getByteLength(serialized) + + if (serializedBytes <= TERMINAL_CONSOLE_LIMITS.MAX_SERIALIZED_BYTES) { + return value + } + + return { + __simTruncated: true, + __simByteLength: serializedBytes, + __simPreview: truncateString(serialized, TERMINAL_CONSOLE_LIMITS.MAX_SERIALIZED_PREVIEW_LENGTH), + } +} + +/** + * Normalizes terminal input data before it is stored. + */ +export function normalizeConsoleInput(input: unknown): unknown { + return capNormalizedValue(normalizeConsoleValue(input)) +} + +/** + * Normalizes terminal output data before it is stored. 
+ */ +export function normalizeConsoleOutput(output: unknown): NormalizedBlockOutput | undefined { + if (output === undefined) { + return undefined + } + + return capNormalizedValue(normalizeConsoleValue(output)) as NormalizedBlockOutput +} + +/** + * Normalizes terminal error data before it is stored. + */ +export function normalizeConsoleError(error: unknown): string | null | undefined { + if (error === undefined) { + return undefined + } + + if (error === null) { + return null + } + + return truncateString( + typeof error === 'string' ? error : safeConsoleStringify(normalizeConsoleValue(error)) + ) +} + +/** + * Returns a workflow's entries trimmed to the configured cap. + */ +export function trimWorkflowConsoleEntries(entries: ConsoleEntry[]): ConsoleEntry[] { + if (entries.length <= TERMINAL_CONSOLE_LIMITS.MAX_ENTRIES_PER_WORKFLOW) { + return entries + } + + const executionGroups = new Map() + + for (const entry of entries) { + const executionId = entry.executionId ?? entry.id + const group = executionGroups.get(executionId) + if (group) { + group.push(entry) + } else { + executionGroups.set(executionId, [entry]) + } + } + + const executionIds = [...executionGroups.keys()] + const newestExecutionId = executionIds[0] + + if (!newestExecutionId) { + return entries.slice(0, TERMINAL_CONSOLE_LIMITS.MAX_ENTRIES_PER_WORKFLOW) + } + + const keptEntryIds = new Set() + let remainingSlots = TERMINAL_CONSOLE_LIMITS.MAX_ENTRIES_PER_WORKFLOW + + const newestExecutionEntries = executionGroups.get(newestExecutionId) ?? [] + const newestExecutionToKeep = newestExecutionEntries.slice(0, remainingSlots) + newestExecutionToKeep.forEach((entry) => keptEntryIds.add(entry.id)) + remainingSlots -= newestExecutionToKeep.length + + for (const executionId of executionIds.slice(1)) { + const executionEntries = executionGroups.get(executionId) ?? 
[] + + if (executionEntries.length > remainingSlots) { + continue + } + + executionEntries.forEach((entry) => keptEntryIds.add(entry.id)) + remainingSlots -= executionEntries.length + + if (remainingSlots === 0) { + break + } + } + + return entries.filter((entry) => keptEntryIds.has(entry.id)) +} + +/** + * Applies workflow-level trimming while preserving newest-first order. + */ +export function trimConsoleEntries(entries: ConsoleEntry[]): ConsoleEntry[] { + const workflowGroups = new Map() + + for (const entry of entries) { + const workflowEntries = workflowGroups.get(entry.workflowId) + if (workflowEntries) { + workflowEntries.push(entry) + } else { + workflowGroups.set(entry.workflowId, [entry]) + } + } + + const keptEntryIds = new Set() + + for (const workflowEntries of workflowGroups.values()) { + trimWorkflowConsoleEntries(workflowEntries).forEach((entry) => keptEntryIds.add(entry.id)) + } + + return entries.filter((entry) => keptEntryIds.has(entry.id)) +} diff --git a/apps/sim/stores/terminal/index.ts b/apps/sim/stores/terminal/index.ts index e031ce303d7..42854c0854c 100644 --- a/apps/sim/stores/terminal/index.ts +++ b/apps/sim/stores/terminal/index.ts @@ -1,4 +1,20 @@ export type { ConsoleEntry, ConsoleStore, ConsoleUpdate } from './console' -export { useTerminalConsoleStore } from './console' +export { + clearExecutionPointer, + consolePersistence, + type ExecutionPointer, + loadExecutionPointer, + normalizeConsoleError, + normalizeConsoleInput, + normalizeConsoleOutput, + safeConsoleStringify, + saveExecutionPointer, + TERMINAL_CONSOLE_LIMITS, + trimConsoleEntries, + trimWorkflowConsoleEntries, + useConsoleEntry, + useTerminalConsoleStore, + useWorkflowConsoleEntries, +} from './console' export { useTerminalStore } from './store' export type { TerminalState } from './types'