diff --git a/apps/sim/hooks/queries/kb/connectors.ts b/apps/sim/hooks/queries/kb/connectors.ts index fd86f306e0a..1b737d8f2a5 100644 --- a/apps/sim/hooks/queries/kb/connectors.ts +++ b/apps/sim/hooks/queries/kb/connectors.ts @@ -88,6 +88,21 @@ async function fetchConnectorDetail( return result.data } +/** Stop polling for initial sync after 2 minutes */ +const PENDING_SYNC_WINDOW_MS = 2 * 60 * 1000 + +/** + * Checks if a connector is syncing or awaiting its first sync within the allowed window + */ +export function isConnectorSyncingOrPending(connector: ConnectorData): boolean { + if (connector.status === 'syncing') return true + return ( + connector.status === 'active' && + !connector.lastSyncAt && + Date.now() - new Date(connector.createdAt).getTime() < PENDING_SYNC_WINDOW_MS + ) +} + export function useConnectorList(knowledgeBaseId?: string) { return useQuery({ queryKey: connectorKeys.list(knowledgeBaseId), @@ -97,8 +112,8 @@ export function useConnectorList(knowledgeBaseId?: string) { placeholderData: keepPreviousData, refetchInterval: (query) => { const connectors = query.state.data - const hasSyncing = connectors?.some((c) => c.status === 'syncing') - return hasSyncing ? 3000 : false + if (!connectors?.length) return false + return connectors.some(isConnectorSyncingOrPending) ? 
3000 : false }, }) } diff --git a/apps/sim/lib/knowledge/connectors/sync-engine.ts b/apps/sim/lib/knowledge/connectors/sync-engine.ts index a07005a95d9..efef605f52c 100644 --- a/apps/sim/lib/knowledge/connectors/sync-engine.ts +++ b/apps/sim/lib/knowledge/connectors/sync-engine.ts @@ -6,7 +6,7 @@ import { knowledgeConnectorSyncLog, } from '@sim/db/schema' import { createLogger } from '@sim/logger' -import { and, eq, inArray, isNull, ne, sql } from 'drizzle-orm' +import { and, eq, inArray, isNull, lt, ne, sql } from 'drizzle-orm' import { decryptApiKey } from '@/lib/api-key/crypto' import { getInternalApiBaseUrl } from '@/lib/core/utils/urls' import { @@ -272,11 +272,12 @@ export async function executeSync( } const syncLogId = crypto.randomUUID() + const syncStartedAt = new Date() await db.insert(knowledgeConnectorSyncLog).values({ id: syncLogId, connectorId, status: 'started', - startedAt: new Date(), + startedAt: syncStartedAt, }) let syncExitedCleanly = false @@ -536,19 +537,23 @@ export async function executeSync( throw new Error(`Knowledge base ${connector.knowledgeBaseId} was deleted during sync`) } - // Retry stuck documents that failed or never completed processing + // Retry stuck documents that failed or never completed processing. + // Only retry docs uploaded BEFORE this sync — docs added in the current sync + // are still processing asynchronously and would cause a duplicate processing race. const stuckDocs = await db .select({ id: document.id, fileUrl: document.fileUrl, filename: document.filename, fileSize: document.fileSize, + mimeType: document.mimeType, }) .from(document) .where( and( eq(document.connectorId, connectorId), inArray(document.processingStatus, ['pending', 'failed']), + lt(document.uploadedAt, syncStartedAt), eq(document.userExcluded, false), isNull(document.archivedAt), isNull(document.deletedAt) @@ -565,7 +570,7 @@ export async function executeSync( filename: doc.filename ?? 'document.txt', fileUrl: doc.fileUrl ?? 
'', fileSize: doc.fileSize ?? 0, - mimeType: 'text/plain', + mimeType: doc.mimeType ?? 'text/plain', }, {} ).catch((error) => { diff --git a/apps/sim/lib/knowledge/documents/document-processor.ts b/apps/sim/lib/knowledge/documents/document-processor.ts index 0185de495b1..72bf9007c9d 100644 --- a/apps/sim/lib/knowledge/documents/document-processor.ts +++ b/apps/sim/lib/knowledge/documents/document-processor.ts @@ -7,7 +7,7 @@ import { parseBuffer, parseFile } from '@/lib/file-parsers' import type { FileParseMetadata } from '@/lib/file-parsers/types' import { retryWithExponentialBackoff } from '@/lib/knowledge/documents/utils' import { StorageService } from '@/lib/uploads' -import { isInternalFileUrl } from '@/lib/uploads/utils/file-utils' +import { getExtensionFromMimeType, isInternalFileUrl } from '@/lib/uploads/utils/file-utils' import { downloadFileFromUrl } from '@/lib/uploads/utils/file-utils.server' import { mistralParserTool } from '@/tools/mistral/parser' @@ -727,7 +727,7 @@ async function parseWithFileParser(fileUrl: string, filename: string, mimeType: if (fileUrl.startsWith('data:')) { content = await parseDataURI(fileUrl, filename, mimeType) } else if (fileUrl.startsWith('http')) { - const result = await parseHttpFile(fileUrl, filename) + const result = await parseHttpFile(fileUrl, filename, mimeType) content = result.content metadata = result.metadata || {} } else { @@ -759,7 +759,10 @@ async function parseDataURI(fileUrl: string, filename: string, mimeType: string) : decodeURIComponent(base64Data) } - const extension = filename.split('.').pop()?.toLowerCase() || 'txt' + const extension = + (filename.includes('.') ? 
filename.split('.').pop()?.toLowerCase() : undefined) || + getExtensionFromMimeType(mimeType) || + 'txt' const buffer = Buffer.from(base64Data, 'base64') const result = await parseBuffer(buffer, extension) return result.content @@ -767,13 +770,17 @@ async function parseDataURI(fileUrl: string, filename: string, mimeType: string) async function parseHttpFile( fileUrl: string, - filename: string + filename: string, + mimeType?: string ): Promise<{ content: string; metadata?: FileParseMetadata }> { const buffer = await downloadFileWithTimeout(fileUrl) - const extension = filename.includes('.') ? filename.split('.').pop()?.toLowerCase() : undefined - const extension = filename.split('.').pop()?.toLowerCase() + let extension = filename.includes('.') ? filename.split('.').pop()?.toLowerCase() : undefined + if (!extension && mimeType) { + extension = getExtensionFromMimeType(mimeType) ?? undefined + } if (!extension) { - throw new Error(`Could not determine file extension: ${filename}`) + throw new Error(`Could not determine file type for: ${filename}`) } const result = await parseBuffer(buffer, extension)