diff --git a/apps/sim/lib/knowledge/connectors/sync-engine.ts b/apps/sim/lib/knowledge/connectors/sync-engine.ts index efef605f52..458281dd44 100644 --- a/apps/sim/lib/knowledge/connectors/sync-engine.ts +++ b/apps/sim/lib/knowledge/connectors/sync-engine.ts @@ -6,13 +6,14 @@ import { knowledgeConnectorSyncLog, } from '@sim/db/schema' import { createLogger } from '@sim/logger' -import { and, eq, inArray, isNull, lt, ne, sql } from 'drizzle-orm' +import { and, eq, gt, inArray, isNull, lt, ne, or, sql } from 'drizzle-orm' import { decryptApiKey } from '@/lib/api-key/crypto' import { getInternalApiBaseUrl } from '@/lib/core/utils/urls' +import type { DocumentData } from '@/lib/knowledge/documents/service' import { hardDeleteDocuments, isTriggerAvailable, - processDocumentAsync, + processDocumentsWithQueue, } from '@/lib/knowledge/documents/service' import { StorageService } from '@/lib/uploads' import { deleteFile } from '@/lib/uploads/core/storage-service' @@ -39,6 +40,8 @@ class ConnectorDeletedException extends Error { const SYNC_BATCH_SIZE = 5 const MAX_PAGES = 500 const MAX_SAFE_TITLE_LENGTH = 200 +const STALE_PROCESSING_MINUTES = 45 +const RETRY_WINDOW_DAYS = 7 /** Sanitizes a document title for use in S3 storage keys. */ function sanitizeStorageTitle(title: string): string { @@ -147,11 +150,14 @@ export async function dispatchSync( const requestId = options?.requestId ?? crypto.randomUUID() if (isTriggerAvailable()) { - await knowledgeConnectorSync.trigger({ - connectorId, - fullSync: options?.fullSync, - requestId, - }) + await knowledgeConnectorSync.trigger( + { + connectorId, + fullSync: options?.fullSync, + requestId, + }, + { tags: [`connector:${connectorId}`] } + ) logger.info(`Dispatched connector sync to Trigger.dev`, { connectorId, requestId }) } else { executeSync(connectorId, { fullSync: options?.fullSync }).catch((error) => { @@ -395,6 +401,8 @@ export async function executeSync( const seenExternalIds = new Set() + const pendingProcessing: DocumentData[] = [] + const pendingOps: DocOp[] = [] for (const extDoc of externalDocs) { seenExternalIds.add(extDoc.externalId) @@ -503,6 +511,7 @@ export async function executeSync( for (let j = 0; j < settled.length; j++) { const outcome = settled[j] if (outcome.status === 'fulfilled') { + pendingProcessing.push(outcome.value) if (batch[j].type === 'add') result.docsAdded++ else result.docsUpdated++ } else { @@ -537,9 +546,14 @@ export async function executeSync( throw new Error(`Knowledge base ${connector.knowledgeBaseId} was deleted during sync`) } - // Retry stuck documents that failed or never completed processing. + // Retry stuck documents that failed, never started, or were abandoned mid-processing. // Only retry docs uploaded BEFORE this sync — docs added in the current sync // are still processing asynchronously and would cause a duplicate processing race. + // Documents stuck in 'processing' beyond STALE_PROCESSING_MINUTES are considered + // abandoned (e.g. the Trigger.dev task process exited before processing completed). + // Documents uploaded more than RETRY_WINDOW_DAYS ago are not retried. + const staleProcessingCutoff = new Date(Date.now() - STALE_PROCESSING_MINUTES * 60 * 1000) + const retryCutoff = new Date(Date.now() - RETRY_WINDOW_DAYS * 24 * 60 * 60 * 1000) const stuckDocs = await db .select({ id: document.id, @@ -552,8 +566,18 @@ export async function executeSync( .where( and( eq(document.connectorId, connectorId), - inArray(document.processingStatus, ['pending', 'failed']), + or( + inArray(document.processingStatus, ['pending', 'failed']), + and( + eq(document.processingStatus, 'processing'), + or( + isNull(document.processingStartedAt), + lt(document.processingStartedAt, staleProcessingCutoff) + ) + ) + ), lt(document.uploadedAt, syncStartedAt), + gt(document.uploadedAt, retryCutoff), eq(document.userExcluded, false), isNull(document.archivedAt), isNull(document.deletedAt) @@ -562,28 +586,60 @@ export async function executeSync( if (stuckDocs.length > 0) { logger.info(`Retrying ${stuckDocs.length} stuck documents`, { connectorId }) - for (const doc of stuckDocs) { - processDocumentAsync( - connector.knowledgeBaseId, - doc.id, - { + try { + await processDocumentsWithQueue( + stuckDocs.map((doc) => ({ + documentId: doc.id, filename: doc.filename ?? 'document.txt', fileUrl: doc.fileUrl ?? '', fileSize: doc.fileSize ?? 0, mimeType: doc.mimeType ?? 'text/plain', - }, - {} - ).catch((error) => { - logger.warn('Failed to retry stuck document', { - documentId: doc.id, - error: error instanceof Error ? error.message : String(error), - }) + })), + connector.knowledgeBaseId, + {}, + crypto.randomUUID() + ) + } catch (error) { + logger.warn('Failed to enqueue stuck documents for reprocessing', { + connectorId, + count: stuckDocs.length, + error: error instanceof Error ? error.message : String(error), + }) + } + } + + // Enqueue all added/updated documents for processing in a single batch + if (pendingProcessing.length > 0) { + try { + await processDocumentsWithQueue( + pendingProcessing, + connector.knowledgeBaseId, + {}, + crypto.randomUUID() + ) + } catch (error) { + logger.warn('Failed to enqueue documents for processing — will retry on next sync', { + connectorId, + count: pendingProcessing.length, + error: error instanceof Error ? error.message : String(error), }) } } await completeSyncLog(syncLogId, 'completed', result) + const [{ count: actualDocCount }] = await db + .select({ count: sql`count(*)::int` }) + .from(document) + .where( + and( + eq(document.connectorId, connectorId), + eq(document.userExcluded, false), + isNull(document.archivedAt), + isNull(document.deletedAt) + ) + ) + const now = new Date() await db .update(knowledgeConnector) @@ -591,7 +647,7 @@ export async function executeSync( status: 'active', lastSyncAt: now, lastSyncError: null, - lastSyncDocCount: externalDocs.length, + lastSyncDocCount: actualDocCount, nextSyncAt: calculateNextSyncTime(connector.syncIntervalMinutes), consecutiveFailures: 0, updatedAt: now, @@ -711,7 +767,7 @@ async function addDocument( connectorType: string, extDoc: ExternalDocument, sourceConfig?: Record -): Promise { +): Promise { if (await isKnowledgeBaseDeleted(knowledgeBaseId)) { throw new Error(`Knowledge base ${knowledgeBaseId} is deleted`) } @@ -773,23 +829,13 @@ async function addDocument( throw error } - processDocumentAsync( - knowledgeBaseId, + return { documentId, - { - filename: processingFilename, - fileUrl, - fileSize: contentBuffer.length, - mimeType: 'text/plain', - }, - {} - ).catch((error) => { - logger.error('Failed to process connector document', { - documentId, - connectorId, - error: error instanceof Error ? error.message : String(error), - }) - }) + filename: processingFilename, + fileUrl, + fileSize: contentBuffer.length, + mimeType: 'text/plain', + } } /** @@ -803,7 +849,7 @@ async function updateDocument( connectorType: string, extDoc: ExternalDocument, sourceConfig?: Record -): Promise { +): Promise { if (await isKnowledgeBaseDeleted(knowledgeBaseId)) { throw new Error(`Knowledge base ${knowledgeBaseId} is deleted`) } @@ -894,21 +940,11 @@ async function updateDocument( } } - processDocumentAsync( - knowledgeBaseId, - existingDocId, - { - filename: processingFilename, - fileUrl, - fileSize: contentBuffer.length, - mimeType: 'text/plain', - }, - {} - ).catch((error) => { - logger.error('Failed to re-process updated connector document', { - documentId: existingDocId, - connectorId, - error: error instanceof Error ? error.message : String(error), - }) - }) + return { + documentId: existingDocId, + filename: processingFilename, + fileUrl, + fileSize: contentBuffer.length, + mimeType: 'text/plain', + } } diff --git a/apps/sim/lib/knowledge/documents/service.ts b/apps/sim/lib/knowledge/documents/service.ts index 4d1b1dc71e..5f7bf7a844 100644 --- a/apps/sim/lib/knowledge/documents/service.ts +++ b/apps/sim/lib/knowledge/documents/service.ts @@ -114,11 +114,11 @@ export interface DocumentData { } export interface ProcessingOptions { - chunkSize: number - minCharactersPerChunk: number - recipe: string - lang: string - chunkOverlap: number + chunkSize?: number + minCharactersPerChunk?: number + recipe?: string + lang?: string + chunkOverlap?: number } export interface DocumentJobData { @@ -668,7 +668,7 @@ export function isTriggerAvailable(): boolean { export async function processDocumentsWithTrigger( documents: DocumentProcessingPayload[], requestId: string -): Promise<{ success: boolean; message: string; jobIds?: string[] }> { +): Promise<{ success: boolean; message: string; batchIds?: string[] }> { if (!isTriggerAvailable()) { throw new Error('Trigger.dev is not configured - TRIGGER_SECRET_KEY missing') } @@ -676,19 +676,32 @@ export async function processDocumentsWithTrigger( try { logger.info(`[${requestId}] Triggering background processing for ${documents.length} documents`) - const jobPromises = documents.map(async (document) => { - const job = await tasks.trigger('knowledge-process-document', document) - return job.id - }) - - const jobIds = await Promise.all(jobPromises) + const MAX_BATCH_SIZE = 1000 + const batchIds: string[] = [] + + for (let i = 0; i < documents.length; i += MAX_BATCH_SIZE) { + const chunk = documents.slice(i, i + MAX_BATCH_SIZE) + const batchResult = await tasks.batchTrigger( + 'knowledge-process-document', + chunk.map((doc) => ({ + payload: doc, + options: { + idempotencyKey: `doc-process-${doc.documentId}-${requestId}`, + tags: [`kb:${doc.knowledgeBaseId}`, `doc:${doc.documentId}`], + }, + })) + ) + batchIds.push(batchResult.batchId) + } - logger.info(`[${requestId}] Triggered ${jobIds.length} document processing jobs`) + logger.info( + `[${requestId}] Triggered ${documents.length} document processing jobs in ${batchIds.length} batch(es)` + ) return { success: true, message: `${documents.length} document processing jobs triggered`, - jobIds, + batchIds, } } catch (error) { logger.error(`[${requestId}] Failed to trigger document processing jobs:`, error) @@ -1590,10 +1603,19 @@ export async function retryDocumentProcessing( chunkOverlap: kbConfig.overlap, } - processDocumentAsync(knowledgeBaseId, documentId, docData, processingOptions).catch( - (error: unknown) => { - logger.error(`[${requestId}] Background retry processing error:`, error) - } + await processDocumentsWithQueue( + [ + { + documentId, + filename: docData.filename, + fileUrl: docData.fileUrl, + fileSize: docData.fileSize, + mimeType: docData.mimeType, + }, + ], + knowledgeBaseId, + processingOptions, + requestId ) logger.info(`[${requestId}] Document retry initiated: ${documentId}`)