@@ -4,7 +4,7 @@ import { inject } from '@adonisjs/core'
44import logger from '@adonisjs/core/services/logger'
55import { TokenChunker } from '@chonkiejs/core'
66import sharp from 'sharp'
7- import { deleteFileIfExists , determineFileType , getFile , getFileStatsIfExists , listDirectoryContentsRecursive } from '../utils/fs.js'
7+ import { deleteFileIfExists , determineFileType , getFile , getFileStatsIfExists , listDirectoryContentsRecursive , ZIM_STORAGE_PATH } from '../utils/fs.js'
88import { PDFParse } from 'pdf-parse'
99import { createWorker } from 'tesseract.js'
1010import { fromBuffer } from 'pdf2pic'
@@ -399,14 +399,14 @@ export class RagService {
399399 totalArticles ?: number
400400 } > {
401401 const zimExtractionService = new ZIMExtractionService ( )
402-
402+
403403 // Process in batches to avoid lock timeout
404404 const startOffset = batchOffset || 0
405-
405+
406406 logger . info (
407407 `[RAG] Extracting ZIM content (batch: offset=${ startOffset } , size=${ ZIM_BATCH_SIZE } )`
408408 )
409-
409+
410410 const zimChunks = await zimExtractionService . extractZIMContent ( filepath , {
411411 startOffset,
412412 batchSize : ZIM_BATCH_SIZE ,
@@ -935,4 +935,149 @@ export class RagService {
935935 return { success : false , message : 'Error discovering Nomad docs.' }
936936 }
937937 }
938+
/**
 * Scans the knowledge base storage directories and syncs them with Qdrant.
 *
 * Steps, in order:
 *  1. Forces a re-discovery of the Nomad docs (a failure here is logged and
 *     does not abort the sync).
 *  2. Lists every file under the uploads directory and the ZIM directory
 *     (a missing directory is skipped, any other listing error aborts).
 *  3. Scrolls through the content collection in Qdrant and collects the
 *     distinct string `source` payload values.
 *  4. Dispatches an `EmbedFileJob` for each storage file whose path is not
 *     among those sources. A failed dispatch is logged and skipped, so
 *     `filesQueued` may be lower than the number of files needing embedding.
 *
 * This method does not throw: any unexpected error is logged and reported
 * through `success: false`.
 *
 * @returns Object containing the success status, a human-readable message and,
 * on success, the number of files scanned (`filesScanned`) and the number of
 * embed jobs dispatched (`filesQueued`).
 */
public async scanAndSyncStorage(): Promise<{
  success: boolean
  message: string
  filesScanned?: number
  filesQueued?: number
}> {
  try {
    logger.info('[RAG] Starting knowledge base sync scan')

    // Lists the keys of all file entries below `absolutePath`. `label` is the
    // configured (relative) storage path, used only in log messages. A missing
    // directory (ENOENT) yields an empty list; any other error is re-thrown.
    const collectFilePaths = async (absolutePath: string, label: string): Promise<string[]> => {
      try {
        const entries = await listDirectoryContentsRecursive(absolutePath)
        const filePaths = entries
          .filter((entry) => entry.type === 'file')
          .map((entry) => entry.key)
        // Report the number of files, not the number of directory entries.
        logger.debug(`[RAG] Found ${filePaths.length} files in ${label}`)
        return filePaths
      } catch (error) {
        // Narrow the caught value before reading `code`; it may be anything.
        const isMissingDirectory =
          typeof error === 'object' &&
          error !== null &&
          'code' in error &&
          error.code === 'ENOENT'
        if (!isMissingDirectory) {
          throw error
        }
        logger.debug(`[RAG] ${label} directory does not exist, skipping`)
        return []
      }
    }

    // Force resync of Nomad docs; best-effort, the storage scan continues on failure.
    try {
      await this.discoverNomadDocs(true)
    } catch (error) {
      logger.error('[RAG] Error during Nomad docs discovery in sync process:', error)
    }

    // Scan the kb_uploads directory first, then the zim directory.
    const uploadedFiles = await collectFilePaths(
      join(process.cwd(), RagService.UPLOADS_STORAGE_PATH),
      RagService.UPLOADS_STORAGE_PATH
    )
    const zimFiles = await collectFilePaths(join(process.cwd(), ZIM_STORAGE_PATH), ZIM_STORAGE_PATH)
    const filesInStorage = [...uploadedFiles, ...zimFiles]

    logger.info(`[RAG] Found ${filesInStorage.length} total files in storage directories`)

    // Get all stored sources from Qdrant
    await this._ensureCollection(
      RagService.CONTENT_COLLECTION_NAME,
      RagService.EMBEDDING_DIMENSION
    )

    // NOTE(review): presumably _ensureCollection initialises the client; guard
    // explicitly instead of asserting non-null so a missing client is reported clearly.
    const qdrant = this.qdrant
    if (!qdrant) {
      throw new Error('Qdrant client is not initialized')
    }

    const sourcesInQdrant = new Set<string>()
    let offset: string | number | null | Record<string, unknown> = null
    const batchSize = 100

    // Scroll through all points to get sources
    do {
      const scrollResult = await qdrant.scroll(RagService.CONTENT_COLLECTION_NAME, {
        limit: batchSize,
        offset: offset,
        with_payload: ['source'], // Only fetch source field for efficiency
        with_vector: false,
      })

      for (const point of scrollResult.points) {
        const source = point.payload?.source
        if (source && typeof source === 'string') {
          sourcesInQdrant.add(source)
        }
      }

      // `??` rather than `||`: only a missing offset ends the scroll, so a
      // falsy-but-valid offset (numeric point ID 0) is not mistaken for the end.
      offset = scrollResult.next_page_offset ?? null
    } while (offset !== null)

    logger.info(`[RAG] Found ${sourcesInQdrant.size} unique sources in Qdrant`)

    // Find files that are in storage but not in Qdrant
    const filesToEmbed = filesInStorage.filter((filePath) => !sourcesInQdrant.has(filePath))

    logger.info(`[RAG] Found ${filesToEmbed.length} files that need embedding`)

    if (filesToEmbed.length === 0) {
      return {
        success: true,
        message: 'Knowledge base is already in sync',
        filesScanned: filesInStorage.length,
        filesQueued: 0,
      }
    }

    // Import EmbedFileJob dynamically to avoid circular dependencies
    const { EmbedFileJob } = await import('#jobs/embed_file_job')

    // Dispatch jobs one at a time; a failure for one file must not stop the rest.
    let queuedCount = 0
    for (const filePath of filesToEmbed) {
      try {
        // Last path segment, accepting both '/' and '\' separators.
        const fileName = filePath.split(/[/\\]/).pop() || filePath
        const stats = await getFileStatsIfExists(filePath)

        logger.info(`[RAG] Dispatching embed job for: ${fileName}`)
        await EmbedFileJob.dispatch({
          filePath: filePath,
          fileName: fileName,
          fileSize: stats?.size,
        })
        queuedCount++
        logger.debug(`[RAG] Successfully dispatched job for ${fileName}`)
      } catch (fileError) {
        logger.error(`[RAG] Error dispatching job for file ${filePath}:`, fileError)
      }
    }

    return {
      success: true,
      message: `Scanned ${filesInStorage.length} files, queued ${queuedCount} for embedding`,
      filesScanned: filesInStorage.length,
      filesQueued: queuedCount,
    }
  } catch (error) {
    logger.error('[RAG] Error scanning and syncing knowledge base:', error)
    return {
      success: false,
      message: 'Error scanning and syncing knowledge base',
    }
  }
}
9381083}
0 commit comments