From 4c19754b05c3e89ac7fe8e66b9752c67624b07b4 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Thu, 26 Mar 2026 15:09:33 -0700 Subject: [PATCH 01/11] fix(knowledge): enqueue connector docs per-batch to survive sync timeouts --- .../lib/knowledge/connectors/sync-engine.ts | 40 +++++++++---------- 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/apps/sim/lib/knowledge/connectors/sync-engine.ts b/apps/sim/lib/knowledge/connectors/sync-engine.ts index 458281dd447..8440126c7f7 100644 --- a/apps/sim/lib/knowledge/connectors/sync-engine.ts +++ b/apps/sim/lib/knowledge/connectors/sync-engine.ts @@ -401,8 +401,6 @@ export async function executeSync( const seenExternalIds = new Set() - const pendingProcessing: DocumentData[] = [] - const pendingOps: DocOp[] = [] for (const extDoc of externalDocs) { seenExternalIds.add(extDoc.externalId) @@ -508,10 +506,11 @@ export async function executeSync( }) ) + const batchDocs: DocumentData[] = [] for (let j = 0; j < settled.length; j++) { const outcome = settled[j] if (outcome.status === 'fulfilled') { - pendingProcessing.push(outcome.value) + batchDocs.push(outcome.value) if (batch[j].type === 'add') result.docsAdded++ else result.docsUpdated++ } else { @@ -524,6 +523,23 @@ export async function executeSync( }) } } + + if (batchDocs.length > 0) { + try { + await processDocumentsWithQueue( + batchDocs, + connector.knowledgeBaseId, + {}, + crypto.randomUUID() + ) + } catch (error) { + logger.warn('Failed to enqueue batch for processing — will retry on next sync', { + connectorId, + count: batchDocs.length, + error: error instanceof Error ? 
error.message : String(error), + }) + } + } } // Skip deletion reconciliation during incremental syncs — results only contain changed docs @@ -608,24 +624,6 @@ export async function executeSync( } } - // Enqueue all added/updated documents for processing in a single batch - if (pendingProcessing.length > 0) { - try { - await processDocumentsWithQueue( - pendingProcessing, - connector.knowledgeBaseId, - {}, - crypto.randomUUID() - ) - } catch (error) { - logger.warn('Failed to enqueue documents for processing — will retry on next sync', { - connectorId, - count: pendingProcessing.length, - error: error instanceof Error ? error.message : String(error), - }) - } - } - await completeSyncLog(syncLogId, 'completed', result) const [{ count: actualDocCount }] = await db From f9785be5be5541b84d39f1436d07f09a4e5f9a7e Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Thu, 26 Mar 2026 15:55:19 -0700 Subject: [PATCH 02/11] fix(connectors): convert all connectors to contentDeferred pattern and fix validation issues All 10 connectors now use contentDeferred: true in listDocuments, returning lightweight metadata stubs instead of downloading content during listing. Content is fetched lazily via getDocument only for new/changed documents, preventing Trigger.dev task timeouts on large syncs. 
Connector-specific fixes from validation audit: - Google Drive: metadata-based contentHash, orderBy for deterministic pagination, precise maxFiles, byte-length size check with truncation warning - OneDrive: metadata-based contentHash, orderBy for deterministic pagination - SharePoint: metadata-based contentHash, byte-length size check - Dropbox: metadata-based contentHash using content_hash field - Notion: code/equation block extraction, empty page fallback to title, reduced CHILD_PAGE_CONCURRENCY to 5, syncContext parameter - Confluence: syncContext caching for cloudId, reduced label concurrency to 5 - Gmail: use joinTagArray for label tags - Obsidian: syncRunId-based stub hash for forced re-fetch, mtime-based hash in getDocument, .trim() on vaultUrl, lightweight validateConfig - Evernote: retryOptions threaded through apiFindNotesMetadata and apiGetNote - GitHub: added contentDeferred: false to getDocument, syncContext parameter Infrastructure: - sync-engine: added syncRunId to syncContext for Obsidian change detection - confluence/utils: replaced raw fetch with fetchWithRetry, added retryOptions - oauth: added supportsRefreshTokenRotation: false for Dropbox - Updated add-connector and validate-connector skills with contentDeferred docs Co-Authored-By: Claude Opus 4.6 --- .agents/skills/add-connector/SKILL.md | 115 ++++++++++++++++-- .agents/skills/validate-connector/SKILL.md | 19 ++- apps/sim/connectors/confluence/confluence.ts | 101 +++++++-------- apps/sim/connectors/dropbox/dropbox.ts | 66 ++++------ apps/sim/connectors/evernote/evernote.ts | 68 +++++------ apps/sim/connectors/github/github.ts | 4 +- apps/sim/connectors/gmail/gmail.ts | 79 +++++------- .../connectors/google-drive/google-drive.ts | 92 +++++++------- apps/sim/connectors/notion/notion.ts | 95 +++++++-------- apps/sim/connectors/obsidian/obsidian.ts | 76 ++++-------- apps/sim/connectors/onedrive/onedrive.ts | 87 ++++++------- apps/sim/connectors/sharepoint/sharepoint.ts | 93 ++++++-------- 
.../lib/knowledge/connectors/sync-engine.ts | 2 +- apps/sim/lib/oauth/oauth.ts | 1 + apps/sim/tools/confluence/utils.ts | 25 ++-- 15 files changed, 463 insertions(+), 460 deletions(-) diff --git a/.agents/skills/add-connector/SKILL.md b/.agents/skills/add-connector/SKILL.md index b26718f92f8..1336fd5502d 100644 --- a/.agents/skills/add-connector/SKILL.md +++ b/.agents/skills/add-connector/SKILL.md @@ -71,12 +71,14 @@ export const {service}Connector: ConnectorConfig = { ], listDocuments: async (accessToken, sourceConfig, cursor) => { - // Paginate via cursor, extract text, compute SHA-256 hash + // Return metadata stubs with contentDeferred: true (if per-doc content fetch needed) + // Or full documents with content (if list API returns content inline) // Return { documents: ExternalDocument[], nextCursor?, hasMore } }, getDocument: async (accessToken, sourceConfig, externalId) => { - // Return ExternalDocument or null + // Fetch full content for a single document + // Return ExternalDocument with contentDeferred: false, or null }, validateConfig: async (accessToken, sourceConfig) => { @@ -281,26 +283,110 @@ Every document returned from `listDocuments`/`getDocument` must include: { externalId: string // Source-specific unique ID title: string // Document title - content: string // Extracted plain text + content: string // Extracted plain text (or '' if contentDeferred) + contentDeferred?: boolean // true = content will be fetched via getDocument mimeType: 'text/plain' // Always text/plain (content is extracted) - contentHash: string // SHA-256 of content (change detection) + contentHash: string // Metadata-based hash for change detection sourceUrl?: string // Link back to original (stored on document record) metadata?: Record // Source-specific data (fed to mapTags) } ``` -## Content Hashing (Required) +## Content Deferral (Required for file/content-download connectors) -The sync engine uses content hashes for change detection: +**All connectors that require 
per-document API calls to fetch content MUST use `contentDeferred: true`.** This is the standard pattern — `listDocuments` returns lightweight metadata stubs, and content is fetched lazily by the sync engine via `getDocument` only for new/changed documents. + +This pattern is critical for reliability: the sync engine processes documents in batches and enqueues each batch for processing immediately. If a sync times out, all previously-batched documents are already queued. Without deferral, content downloads during listing can exhaust the sync task's time budget before any documents are saved. + +### When to use `contentDeferred: true` + +- The service's list API does NOT return document content (only metadata) +- Content requires a separate download/export API call per document +- Examples: Google Drive, OneDrive, SharePoint, Dropbox, Notion, Confluence, Gmail, Obsidian, Evernote, GitHub + +### When NOT to use `contentDeferred` + +- The list API already returns the full content inline (e.g., Slack messages, Reddit posts, HubSpot notes) +- No per-document API call is needed to get content + +### Content Hash Strategy + +Use a **metadata-based** `contentHash` — never a content-based hash. The hash must be derivable from the list response metadata alone, so the sync engine can detect changes without downloading content. 
+ +Good metadata hash sources: +- `modifiedTime` / `lastModifiedDateTime` — changes when file is edited +- Git blob SHA — unique per content version +- API-provided content hash (e.g., Dropbox `content_hash`) +- Version number (e.g., Confluence page version) + +Format: `{service}:{id}:{changeIndicator}` ```typescript -async function computeContentHash(content: string): Promise { - const data = new TextEncoder().encode(content) - const hashBuffer = await crypto.subtle.digest('SHA-256', data) - return Array.from(new Uint8Array(hashBuffer)).map(b => b.toString(16).padStart(2, '0')).join('') +// Google Drive: modifiedTime changes on edit +contentHash: `gdrive:${file.id}:${file.modifiedTime ?? ''}` + +// GitHub: blob SHA is a content-addressable hash +contentHash: `gitsha:${item.sha}` + +// Dropbox: API provides content_hash +contentHash: `dropbox:${entry.id}:${entry.content_hash ?? entry.server_modified}` + +// Confluence: version number increments on edit +contentHash: `confluence:${page.id}:${page.version.number}` +``` + +**Critical invariant:** The `contentHash` MUST be identical whether produced by `listDocuments` (stub) or `getDocument` (full doc). Both should use the same stub function to guarantee this. + +### Implementation Pattern + +```typescript +// 1. Create a stub function (sync, no API calls) +function fileToStub(file: ServiceFile): ExternalDocument { + return { + externalId: file.id, + title: file.name || 'Untitled', + content: '', + contentDeferred: true, + mimeType: 'text/plain', + sourceUrl: `https://service.com/file/${file.id}`, + contentHash: `service:${file.id}:${file.modifiedTime ?? ''}`, + metadata: { /* fields needed by mapTags */ }, + } +} + +// 2. listDocuments returns stubs (fast, metadata only) +listDocuments: async (accessToken, sourceConfig, cursor) => { + const response = await fetchWithRetry(listUrl, { ... 
}) + const files = (await response.json()).files + const documents = files.map(fileToStub) + return { documents, nextCursor, hasMore } +} + +// 3. getDocument fetches content and returns full doc with SAME contentHash +getDocument: async (accessToken, sourceConfig, externalId) => { + const metadata = await fetchWithRetry(metadataUrl, { ... }) + const file = await metadata.json() + if (file.trashed) return null + + try { + const content = await fetchContent(accessToken, file) + if (!content.trim()) return null + const stub = fileToStub(file) + return { ...stub, content, contentDeferred: false } + } catch (error) { + logger.warn(`Failed to fetch content for: ${file.name}`, { error }) + return null + } } ``` +### Reference Implementations + +- **Google Drive**: `connectors/google-drive/google-drive.ts` — file download/export with `modifiedTime` hash +- **GitHub**: `connectors/github/github.ts` — git blob SHA hash +- **Notion**: `connectors/notion/notion.ts` — blocks API with `last_edited_time` hash +- **Confluence**: `connectors/confluence/confluence.ts` — version number hash + ## tagDefinitions — Declared Tag Definitions Declare which tags the connector populates using semantic IDs. Shown in the add-connector modal as opt-out checkboxes. 
@@ -409,7 +495,10 @@ export const CONNECTOR_REGISTRY: ConnectorRegistry = { ## Reference Implementations -- **OAuth**: `apps/sim/connectors/confluence/confluence.ts` — multiple config field types, `mapTags`, label fetching +- **OAuth + contentDeferred**: `apps/sim/connectors/google-drive/google-drive.ts` — file download with metadata-based hash, `orderBy` for deterministic pagination +- **OAuth + contentDeferred (blocks API)**: `apps/sim/connectors/notion/notion.ts` — complex block content extraction deferred to `getDocument` +- **OAuth + contentDeferred (git)**: `apps/sim/connectors/github/github.ts` — blob SHA hash, tree listing +- **OAuth + contentDeferred (version hash)**: `apps/sim/connectors/confluence/confluence.ts` — version-number hash, multiple config field types, `mapTags`, label fetching - **API key**: `apps/sim/connectors/fireflies/fireflies.ts` — GraphQL API with Bearer token auth ## Checklist @@ -425,7 +514,9 @@ export const CONNECTOR_REGISTRY: ConnectorRegistry = { - `selectorKey` exists in `hooks/selectors/registry.ts` - `dependsOn` references selector field IDs (not `canonicalParamId`) - Dependency `canonicalParamId` values exist in `SELECTOR_CONTEXT_FIELDS` -- [ ] `listDocuments` handles pagination and computes content hashes +- [ ] `listDocuments` handles pagination with metadata-based content hashes +- [ ] `contentDeferred: true` used if content requires per-doc API calls (file download, export, blocks fetch) +- [ ] `contentHash` is metadata-based (not content-based) and identical between stub and `getDocument` - [ ] `sourceUrl` set on each ExternalDocument (full URL, not relative) - [ ] `metadata` includes source-specific data for tag mapping - [ ] `tagDefinitions` declared for each semantic key returned by `mapTags` diff --git a/.agents/skills/validate-connector/SKILL.md b/.agents/skills/validate-connector/SKILL.md index 4bafaa07dcb..ceae7d4542c 100644 --- a/.agents/skills/validate-connector/SKILL.md +++ b/.agents/skills/validate-connector/SKILL.md @@ -141,12 +141,24 @@ For each 
API endpoint the connector calls: ## Step 6: Validate Data Transformation +### Content Deferral (CRITICAL) +Connectors that require per-document API calls to fetch content (file download, export, blocks fetch) MUST use `contentDeferred: true`. This is the standard pattern for reliability — without it, content downloads during listing can exhaust the sync task's time budget before any documents are saved. + +- [ ] If the connector downloads content per-doc during `listDocuments`, it MUST use `contentDeferred: true` instead +- [ ] `listDocuments` returns lightweight stubs with `content: ''` and `contentDeferred: true` +- [ ] `getDocument` fetches actual content and returns the full document with `contentDeferred: false` +- [ ] A shared stub function (e.g., `fileToStub`) is used by both `listDocuments` and `getDocument` to guarantee `contentHash` consistency +- [ ] `contentHash` is metadata-based (e.g., `service:{id}:{modifiedTime}`), NOT content-based — it must be derivable from list metadata alone +- [ ] The `contentHash` is identical whether produced by `listDocuments` or `getDocument` + +Connectors where the list API already returns content inline (e.g., Slack messages, Reddit posts) do NOT need `contentDeferred`. 
+ ### ExternalDocument Construction - [ ] `externalId` is a stable, unique identifier from the source API - [ ] `title` is extracted from the correct field and has a sensible fallback (e.g., `'Untitled'`) - [ ] `content` is plain text — HTML content is stripped using `htmlToPlainText` from `@/connectors/utils` - [ ] `mimeType` is `'text/plain'` -- [ ] `contentHash` is computed using `computeContentHash` from `@/connectors/utils` +- [ ] `contentHash` uses a metadata-based format (e.g., `service:{id}:{modifiedTime}`) for connectors with `contentDeferred: true`, or `computeContentHash` from `@/connectors/utils` for inline-content connectors - [ ] `sourceUrl` is a valid, complete URL back to the original resource (not relative) - [ ] `metadata` contains all fields referenced by `mapTags` and `tagDefinitions` @@ -200,6 +212,8 @@ For each API endpoint the connector calls: - [ ] Fetches a single document by `externalId` - [ ] Returns `null` for 404 / not found (does not throw) - [ ] Returns the same `ExternalDocument` shape as `listDocuments` +- [ ] If `listDocuments` uses `contentDeferred: true`, `getDocument` MUST fetch actual content and return `contentDeferred: false` +- [ ] If `listDocuments` uses `contentDeferred: true`, `getDocument` MUST use the same stub function to ensure `contentHash` is identical - [ ] Handles all content types that `listDocuments` can produce (e.g., if `listDocuments` returns both pages and blogposts, `getDocument` must handle both — not hardcode one endpoint) - [ ] Forwards `syncContext` if it needs cached state (user names, field maps, etc.) 
- [ ] Error handling is graceful (catches, logs, returns null or throws with context) @@ -253,6 +267,8 @@ Group findings by severity: - Missing error handling that would crash the sync - `requiredScopes` not a subset of OAuth provider scopes - Query/filter injection: user-controlled values interpolated into OData `$filter`, SOQL, or query strings without escaping +- Per-document content download in `listDocuments` without `contentDeferred: true` — causes sync timeouts for large document sets +- `contentHash` mismatch between `listDocuments` stub and `getDocument` return — causes unnecessary re-processing every sync **Warning** (incorrect behavior, data quality issues, or convention violations): - HTML content not stripped via `htmlToPlainText` @@ -300,6 +316,7 @@ After fixing, confirm: - [ ] Validated scopes are sufficient for all API endpoints the connector calls - [ ] Validated token refresh config (`useBasicAuth`, `supportsRefreshTokenRotation`) - [ ] Validated pagination: cursor names, page sizes, hasMore logic, no silent caps +- [ ] Validated content deferral: `contentDeferred: true` used when per-doc content fetch required, metadata-based `contentHash` consistent between stub and `getDocument` - [ ] Validated data transformation: plain text extraction, HTML stripping, content hashing - [ ] Validated tag definitions match mapTags output, correct fieldTypes - [ ] Validated config fields: canonical pairs, selector keys, required flags diff --git a/apps/sim/connectors/confluence/confluence.ts b/apps/sim/connectors/confluence/confluence.ts index 319f6654488..f20f8fc385b 100644 --- a/apps/sim/connectors/confluence/confluence.ts +++ b/apps/sim/connectors/confluence/confluence.ts @@ -2,7 +2,7 @@ import { createLogger } from '@sim/logger' import { ConfluenceIcon } from '@/components/icons' import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils' import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from 
'@/connectors/types' -import { computeContentHash, htmlToPlainText, joinTagArray, parseTagDate } from '@/connectors/utils' +import { htmlToPlainText, joinTagArray, parseTagDate } from '@/connectors/utils' import { getConfluenceCloudId } from '@/tools/confluence/utils' const logger = createLogger('ConfluenceConnector') @@ -17,7 +17,7 @@ export function escapeCql(value: string): string { /** * Fetches labels for a batch of page IDs using the v2 labels endpoint. */ -const LABEL_FETCH_CONCURRENCY = 10 +const LABEL_FETCH_CONCURRENCY = 5 async function fetchLabelsForPages( cloudId: string, @@ -68,35 +68,29 @@ async function fetchLabelsForPages( } /** - * Converts a v1 CQL search result item to an ExternalDocument. + * Converts a v1 CQL search result item to a lightweight metadata stub. */ -async function cqlResultToDocument( - item: Record, - domain: string -): Promise { - const body = item.body as Record> | undefined - const rawContent = body?.storage?.value || '' - const plainText = htmlToPlainText(rawContent) - const contentHash = await computeContentHash(plainText) - +function cqlResultToStub(item: Record, domain: string): ExternalDocument { const version = item.version as Record | undefined const links = item._links as Record | undefined const metadata = item.metadata as Record | undefined const labelsWrapper = metadata?.labels as Record | undefined const labelResults = (labelsWrapper?.results || []) as Record[] const labels = labelResults.map((l) => l.name as string) + const versionNumber = version?.number ?? '' return { externalId: String(item.id), title: (item.title as string) || 'Untitled', - content: plainText, + content: '', + contentDeferred: true, mimeType: 'text/plain', sourceUrl: links?.webui ? 
`https://${domain}/wiki${links.webui}` : undefined, - contentHash, + contentHash: `confluence:${item.id}:${versionNumber}`, metadata: { spaceId: (item.space as Record)?.key, status: item.status, - version: version?.number, + version: versionNumber, labels, lastModified: version?.when, }, @@ -238,10 +232,15 @@ export const confluenceConnector: ConnectorConfig = { getDocument: async ( accessToken: string, sourceConfig: Record, - externalId: string + externalId: string, + syncContext?: Record ): Promise => { const domain = sourceConfig.domain as string - const cloudId = await getConfluenceCloudId(domain, accessToken) + let cloudId = syncContext?.cloudId as string | undefined + if (!cloudId) { + cloudId = await getConfluenceCloudId(domain, accessToken) + if (syncContext) syncContext.cloudId = cloudId + } // Try pages first, fall back to blogposts if not found let page: Record | null = null @@ -269,26 +268,26 @@ export const confluenceConnector: ConnectorConfig = { const storage = body?.storage as Record | undefined const rawContent = (storage?.value as string) || '' const plainText = htmlToPlainText(rawContent) - const contentHash = await computeContentHash(plainText) - // Fetch labels for this page const labelMap = await fetchLabelsForPages(cloudId, accessToken, [String(page.id)]) const labels = labelMap.get(String(page.id)) ?? [] const links = page._links as Record | undefined const version = page.version as Record | undefined + const versionNumber = version?.number ?? '' return { externalId: String(page.id), title: (page.title as string) || 'Untitled', content: plainText, + contentDeferred: false, mimeType: 'text/plain', sourceUrl: links?.webui ? 
`https://${domain}/wiki${links.webui}` : undefined, - contentHash, + contentHash: `confluence:${page.id}:${versionNumber}`, metadata: { spaceId: page.spaceId, status: page.status, - version: version?.number, + version: versionNumber, labels, lastModified: version?.createdAt, }, @@ -379,7 +378,6 @@ async function listDocumentsV2( ): Promise { const queryParams = new URLSearchParams() queryParams.append('limit', '250') - queryParams.append('body-format', 'storage') if (cursor) { queryParams.append('cursor', cursor) } @@ -409,35 +407,30 @@ async function listDocumentsV2( const data = await response.json() const results = data.results || [] - const pageIds = results.map((page: Record) => String(page.id)) - const labelsByPageId = await fetchLabelsForPages(cloudId, accessToken, pageIds) - - const documents: ExternalDocument[] = await Promise.all( - results.map(async (page: Record) => { - const rawContent = (page.body as Record>)?.storage?.value || '' - const plainText = htmlToPlainText(rawContent) - const contentHash = await computeContentHash(plainText) - const pageId = String(page.id) - - return { - externalId: pageId, - title: (page.title as string) || 'Untitled', - content: plainText, - mimeType: 'text/plain', - sourceUrl: (page._links as Record)?.webui - ? `https://${domain}/wiki${(page._links as Record).webui}` - : undefined, - contentHash, - metadata: { - spaceId: page.spaceId, - status: page.status, - version: (page.version as Record)?.number, - labels: labelsByPageId.get(pageId) ?? [], - lastModified: (page.version as Record)?.createdAt, - }, - } - }) - ) + const documents: ExternalDocument[] = results.map((page: Record) => { + const pageId = String(page.id) + const version = page.version as Record | undefined + const versionNumber = version?.number ?? '' + + return { + externalId: pageId, + title: (page.title as string) || 'Untitled', + content: '', + contentDeferred: true, + mimeType: 'text/plain', + sourceUrl: (page._links as Record)?.webui + ? 
`https://${domain}/wiki${(page._links as Record).webui}` + : undefined, + contentHash: `confluence:${pageId}:${versionNumber}`, + metadata: { + spaceId: page.spaceId, + status: page.status, + version: versionNumber, + labels: [], + lastModified: version?.createdAt, + }, + } + }) let nextCursor: string | undefined const nextLink = (data._links as Record)?.next @@ -584,7 +577,7 @@ async function listDocumentsViaCql( queryParams.append('cql', cql) queryParams.append('limit', String(limit)) queryParams.append('start', String(start)) - queryParams.append('expand', 'body.storage,version,metadata.labels') + queryParams.append('expand', 'version,metadata.labels') const url = `https://api.atlassian.com/ex/confluence/${cloudId}/wiki/rest/api/content/search?${queryParams.toString()}` @@ -610,8 +603,8 @@ async function listDocumentsViaCql( const data = await response.json() const results = data.results || [] - const documents: ExternalDocument[] = await Promise.all( - results.map((item: Record) => cqlResultToDocument(item, domain)) + const documents: ExternalDocument[] = results.map((item: Record) => + cqlResultToStub(item, domain) ) const totalFetched = ((syncContext?.totalDocsFetched as number) ?? 
0) + documents.length diff --git a/apps/sim/connectors/dropbox/dropbox.ts b/apps/sim/connectors/dropbox/dropbox.ts index 510078a4dd8..68ca60eabad 100644 --- a/apps/sim/connectors/dropbox/dropbox.ts +++ b/apps/sim/connectors/dropbox/dropbox.ts @@ -2,7 +2,7 @@ import { createLogger } from '@sim/logger' import { DropboxIcon } from '@/components/icons' import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils' import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types' -import { computeContentHash, htmlToPlainText, parseTagDate } from '@/connectors/utils' +import { htmlToPlainText, parseTagDate } from '@/connectors/utils' const logger = createLogger('DropboxConnector') @@ -33,6 +33,7 @@ interface DropboxFileEntry { client_modified?: string server_modified?: string size?: number + content_hash?: string is_downloadable?: boolean } @@ -76,37 +77,20 @@ async function downloadFileContent(accessToken: string, filePath: string): Promi return text } -async function fileToDocument( - accessToken: string, - entry: DropboxFileEntry -): Promise { - try { - const content = await downloadFileContent(accessToken, entry.path_lower) - if (!content.trim()) { - logger.info(`Skipping empty file: ${entry.name}`) - return null - } - - const contentHash = await computeContentHash(content) - - return { - externalId: entry.id, - title: entry.name, - content, - mimeType: 'text/plain', - sourceUrl: `https://www.dropbox.com/home${entry.path_display}`, - contentHash, - metadata: { - path: entry.path_display, - lastModified: entry.server_modified || entry.client_modified, - fileSize: entry.size, - }, - } - } catch (error) { - logger.warn(`Failed to extract content from file: ${entry.name}`, { - error: error instanceof Error ? 
error.message : String(error), - }) - return null +function fileToStub(entry: DropboxFileEntry): ExternalDocument { + return { + externalId: entry.id, + title: entry.name, + content: '', + contentDeferred: true, + mimeType: 'text/plain', + sourceUrl: `https://www.dropbox.com/home${entry.path_display}`, + contentHash: `dropbox:${entry.id}:${entry.content_hash ?? entry.server_modified ?? ''}`, + metadata: { + path: entry.path_display, + lastModified: entry.server_modified || entry.client_modified, + fileSize: entry.size, + }, } } @@ -210,18 +194,12 @@ export const dropboxConnector: ConnectorConfig = { const maxFiles = sourceConfig.maxFiles ? Number(sourceConfig.maxFiles) : 0 const previouslyFetched = (syncContext?.totalDocsFetched as number) ?? 0 - const CONCURRENCY = 5 - const documents: ExternalDocument[] = [] - for (let i = 0; i < supportedFiles.length; i += CONCURRENCY) { - if (maxFiles > 0 && previouslyFetched + documents.length >= maxFiles) break - const batch = supportedFiles.slice(i, i + CONCURRENCY) - const results = await Promise.all(batch.map((entry) => fileToDocument(accessToken, entry))) - documents.push(...(results.filter(Boolean) as ExternalDocument[])) - } + let documents = supportedFiles.map(fileToStub) + if (maxFiles > 0) { const remaining = maxFiles - previouslyFetched if (documents.length > remaining) { - documents.splice(remaining) + documents = documents.slice(0, remaining) } } @@ -260,7 +238,11 @@ export const dropboxConnector: ConnectorConfig = { if (!isSupportedFile(entry)) return null - return fileToDocument(accessToken, entry) + const content = await downloadFileContent(accessToken, entry.path_lower) + if (!content.trim()) return null + + const stub = fileToStub(entry) + return { ...stub, content, contentDeferred: false } } catch (error) { logger.warn(`Failed to fetch document ${externalId}`, { error: error instanceof Error ? 
error.message : String(error), diff --git a/apps/sim/connectors/evernote/evernote.ts b/apps/sim/connectors/evernote/evernote.ts index 51b6f4da517..553f3d7d04b 100644 --- a/apps/sim/connectors/evernote/evernote.ts +++ b/apps/sim/connectors/evernote/evernote.ts @@ -11,7 +11,7 @@ import { TYPE_STRUCT, } from '@/app/api/tools/evernote/lib/thrift' import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types' -import { computeContentHash, htmlToPlainText, joinTagArray, parseTagDate } from '@/connectors/utils' +import { htmlToPlainText, joinTagArray, parseTagDate } from '@/connectors/utils' const logger = createLogger('EvernoteConnector') @@ -231,7 +231,8 @@ async function apiFindNotesMetadata( token: string, offset: number, maxNotes: number, - notebookGuid?: string + notebookGuid?: string, + retryOptions?: Parameters[2] ): Promise<{ totalNotes: number; notes: NoteMetadata[] }> { const w = new ThriftWriter() w.writeMessageBegin('findNotesMetadata', 0) @@ -256,7 +257,7 @@ async function apiFindNotesMetadata( w.writeFieldStop() - const r = await callNoteStore(token, w) + const r = await callNoteStore(token, w, retryOptions) let totalNotes = 0 const notes: NoteMetadata[] = [] @@ -312,7 +313,11 @@ async function apiFindNotesMetadata( * 5:withResourcesRecognition, 6:withResourcesAlternateData) * Note: 1:guid, 2:title, 3:content, 6:created, 7:updated, 11:notebookGuid, 12:tagGuids */ -async function apiGetNote(token: string, guid: string): Promise { +async function apiGetNote( + token: string, + guid: string, + retryOptions?: Parameters[2] +): Promise { const w = new ThriftWriter() w.writeMessageBegin('getNote', 0) w.writeStringField(1, token) @@ -323,7 +328,7 @@ async function apiGetNote(token: string, guid: string): Promise { w.writeBoolField(6, false) // withResourcesAlternateData w.writeFieldStop() - const r = await callNoteStore(token, w) + const r = await callNoteStore(token, w, retryOptions) let noteGuid = '' let title = '' @@ -409,36 
+414,25 @@ export const evernoteConnector: ConnectorConfig = { const result = await apiFindNotesMetadata(accessToken, offset, NOTES_PER_PAGE, notebookGuid) - const documents: ExternalDocument[] = [] - - for (const meta of result.notes) { - try { - const note = await apiGetNote(accessToken, meta.guid) - const plainText = htmlToPlainText(note.content) - const contentHash = await computeContentHash(plainText) - const tagNames = note.tagGuids.map((g) => tagMap[g]).filter(Boolean) - - documents.push({ - externalId: note.guid, - title: note.title || 'Untitled', - content: plainText, - mimeType: 'text/plain', - sourceUrl: `https://${host}/shard/${shardId}/nl/${userId}/${note.guid}/`, - contentHash, - metadata: { - tags: tagNames, - notebook: notebookMap[note.notebookGuid] || '', - createdAt: note.created ? new Date(note.created).toISOString() : undefined, - updatedAt: note.updated ? new Date(note.updated).toISOString() : undefined, - }, - }) - } catch (error) { - logger.warn('Failed to fetch note content', { - guid: meta.guid, - error: error instanceof Error ? error.message : String(error), - }) + const documents: ExternalDocument[] = result.notes.map((meta) => { + const tagNames = meta.tagGuids.map((g) => tagMap[g]).filter(Boolean) + + return { + externalId: meta.guid, + title: meta.title || 'Untitled', + content: '', + contentDeferred: true, + mimeType: 'text/plain', + sourceUrl: `https://${host}/shard/${shardId}/nl/${userId}/${meta.guid}/`, + contentHash: `evernote:${meta.guid}:${meta.updated}`, + metadata: { + tags: tagNames, + notebook: notebookMap[meta.notebookGuid] || '', + createdAt: meta.created ? new Date(meta.created).toISOString() : undefined, + updatedAt: meta.updated ? 
new Date(meta.updated).toISOString() : undefined, + }, } - } + }) const nextOffset = offset + result.notes.length const hasMore = nextOffset < result.totalNotes @@ -459,7 +453,8 @@ export const evernoteConnector: ConnectorConfig = { try { const note = await apiGetNote(accessToken, externalId) const plainText = htmlToPlainText(note.content) - const contentHash = await computeContentHash(plainText) + if (!plainText.trim()) return null + const shardId = extractShardId(accessToken) const userId = extractUserId(accessToken) const host = getHost(accessToken) @@ -492,9 +487,10 @@ export const evernoteConnector: ConnectorConfig = { externalId, title: note.title || 'Untitled', content: plainText, + contentDeferred: false, mimeType: 'text/plain', sourceUrl: `https://${host}/shard/${shardId}/nl/${userId}/${externalId}/`, - contentHash, + contentHash: `evernote:${note.guid}:${note.updated}`, metadata: { tags: tagNames, notebook: notebookName, diff --git a/apps/sim/connectors/github/github.ts b/apps/sim/connectors/github/github.ts index 2ff2276eaff..5ffabc4229d 100644 --- a/apps/sim/connectors/github/github.ts +++ b/apps/sim/connectors/github/github.ts @@ -228,7 +228,8 @@ export const githubConnector: ConnectorConfig = { getDocument: async ( accessToken: string, sourceConfig: Record, - externalId: string + externalId: string, + _syncContext?: Record ): Promise => { const { owner, repo } = parseRepo(sourceConfig.repository as string) const branch = ((sourceConfig.branch as string) || 'main').trim() @@ -264,6 +265,7 @@ export const githubConnector: ConnectorConfig = { externalId, title: path.split('/').pop() || path, content, + contentDeferred: false, mimeType: 'text/plain', sourceUrl: `https://github.com/${owner}/${repo}/blob/${encodeURIComponent(branch)}/${path.split('/').map(encodeURIComponent).join('/')}`, contentHash: `${GIT_SHA_PREFIX}${data.sha as string}`, diff --git a/apps/sim/connectors/gmail/gmail.ts b/apps/sim/connectors/gmail/gmail.ts index 2806664bedc..90fe0ef22d4 
100644 --- a/apps/sim/connectors/gmail/gmail.ts +++ b/apps/sim/connectors/gmail/gmail.ts @@ -2,14 +2,13 @@ import { createLogger } from '@sim/logger' import { GmailIcon } from '@/components/icons' import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils' import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types' -import { computeContentHash, htmlToPlainText, parseTagDate } from '@/connectors/utils' +import { htmlToPlainText, joinTagArray, parseTagDate } from '@/connectors/utils' const logger = createLogger('GmailConnector') const GMAIL_API_BASE = 'https://gmail.googleapis.com/gmail/v1/users/me' const DEFAULT_MAX_THREADS = 500 const THREADS_PER_PAGE = 100 -const CONCURRENCY = 5 interface GmailHeader { name: string @@ -281,6 +280,27 @@ async function resolveLabelNames( .filter((name) => !name.startsWith('CATEGORY_') && name !== 'UNREAD') } +/** + * Creates a lightweight document stub from a thread list entry. + * Uses metadata-based contentHash for change detection without downloading content. + */ +function threadToStub(thread: { + id: string + snippet?: string + historyId?: string +}): ExternalDocument { + return { + externalId: thread.id, + title: thread.snippet || 'Untitled Thread', + content: '', + contentDeferred: true, + mimeType: 'text/plain', + sourceUrl: `https://mail.google.com/mail/u/0/#inbox/${thread.id}`, + contentHash: `gmail:${thread.id}:${thread.historyId ?? 
''}`, + metadata: {}, + } +} + export const gmailConnector: ConnectorConfig = { id: 'gmail', name: 'Gmail', @@ -421,51 +441,13 @@ export const gmailConnector: ConnectorConfig = { } const data = await response.json() - const threadStubs = (data.threads || []) as { id: string }[] + const threads = (data.threads || []) as { id: string; snippet?: string; historyId?: string }[] - if (threadStubs.length === 0) { + if (threads.length === 0) { return { documents: [], hasMore: false } } - // Fetch full threads with concurrency limit - const documents: ExternalDocument[] = [] - for (let i = 0; i < threadStubs.length; i += CONCURRENCY) { - const batch = threadStubs.slice(i, i + CONCURRENCY) - const results = await Promise.all( - batch.map(async (stub) => { - try { - const thread = await fetchThread(accessToken, stub.id) - if (!thread) return null - - const { content, subject, metadata } = formatThread(thread) - if (!content.trim()) return null - - // Resolve label names - const labelIds = (metadata.labelIds as string[]) || [] - const labelNames = await resolveLabelNames(accessToken, labelIds, syncContext) - metadata.labels = labelNames - - const contentHash = await computeContentHash(content) - - return { - externalId: thread.id, - title: subject, - content, - mimeType: 'text/plain' as const, - sourceUrl: `https://mail.google.com/mail/u/0/#inbox/${thread.id}`, - contentHash, - metadata, - } - } catch (error) { - logger.warn(`Failed to process thread ${stub.id}`, { - error: error instanceof Error ? 
error.message : String(error), - }) - return null - } - }) - ) - documents.push(...(results.filter(Boolean) as ExternalDocument[])) - } + const documents = threads.map(threadToStub) const newTotal = totalFetched + documents.length if (syncContext) syncContext.totalThreadsFetched = newTotal @@ -497,15 +479,14 @@ export const gmailConnector: ConnectorConfig = { const labelNames = await resolveLabelNames(accessToken, labelIds, syncContext) metadata.labels = labelNames - const contentHash = await computeContentHash(content) - return { externalId: thread.id, title: subject, content, + contentDeferred: false, mimeType: 'text/plain', sourceUrl: `https://mail.google.com/mail/u/0/#inbox/${thread.id}`, - contentHash, + contentHash: `gmail:${thread.id}:${thread.historyId ?? ''}`, metadata, } } catch (error) { @@ -630,9 +611,9 @@ export const gmailConnector: ConnectorConfig = { result.from = metadata.from } - const labels = Array.isArray(metadata.labels) ? (metadata.labels as string[]) : [] - if (labels.length > 0) { - result.labels = labels.join(', ') + const labels = joinTagArray(metadata.labels) + if (labels) { + result.labels = labels } if (typeof metadata.messageCount === 'number') { diff --git a/apps/sim/connectors/google-drive/google-drive.ts b/apps/sim/connectors/google-drive/google-drive.ts index 36d31067b10..3addf7a93fe 100644 --- a/apps/sim/connectors/google-drive/google-drive.ts +++ b/apps/sim/connectors/google-drive/google-drive.ts @@ -2,7 +2,7 @@ import { createLogger } from '@sim/logger' import { GoogleDriveIcon } from '@/components/icons' import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils' import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types' -import { computeContentHash, htmlToPlainText, joinTagArray, parseTagDate } from '@/connectors/utils' +import { htmlToPlainText, joinTagArray, parseTagDate } from '@/connectors/utils' const logger = createLogger('GoogleDriveConnector') @@ 
-68,8 +68,9 @@ async function downloadTextFile(accessToken: string, fileId: string): Promise MAX_EXPORT_SIZE) { - return text.slice(0, MAX_EXPORT_SIZE) + if (Buffer.byteLength(text, 'utf8') > MAX_EXPORT_SIZE) { + logger.warn(`File exceeds ${MAX_EXPORT_SIZE} bytes, truncating`) + return Buffer.from(text, 'utf8').subarray(0, MAX_EXPORT_SIZE).toString('utf8') } return text } @@ -143,40 +144,23 @@ function buildQuery(sourceConfig: Record): string { return parts.join(' and ') } -async function fileToDocument( - accessToken: string, - file: DriveFile -): Promise { - try { - const content = await fetchFileContent(accessToken, file.id, file.mimeType) - if (!content.trim()) { - logger.info(`Skipping empty file: ${file.name} (${file.id})`) - return null - } - - const contentHash = await computeContentHash(content) - - return { - externalId: file.id, - title: file.name || 'Untitled', - content, - mimeType: 'text/plain', - sourceUrl: file.webViewLink || `https://drive.google.com/file/d/${file.id}/view`, - contentHash, - metadata: { - originalMimeType: file.mimeType, - modifiedTime: file.modifiedTime, - createdTime: file.createdTime, - owners: file.owners?.map((o) => o.displayName || o.emailAddress).filter(Boolean), - starred: file.starred, - fileSize: file.size ? Number(file.size) : undefined, - }, - } - } catch (error) { - logger.warn(`Failed to extract content from file: ${file.name} (${file.id})`, { - error: error instanceof Error ? error.message : String(error), - }) - return null +function fileToStub(file: DriveFile): ExternalDocument { + return { + externalId: file.id, + title: file.name || 'Untitled', + content: '', + contentDeferred: true, + mimeType: 'text/plain', + sourceUrl: file.webViewLink || `https://drive.google.com/file/d/${file.id}/view`, + contentHash: `gdrive:${file.id}:${file.modifiedTime ?? 
''}`, + metadata: { + originalMimeType: file.mimeType, + modifiedTime: file.modifiedTime, + createdTime: file.createdTime, + owners: file.owners?.map((o) => o.displayName || o.emailAddress).filter(Boolean), + starred: file.starred, + fileSize: file.size ? Number(file.size) : undefined, + }, } } @@ -232,9 +216,15 @@ export const googleDriveConnector: ConnectorConfig = { const query = buildQuery(sourceConfig) const pageSize = 100 + const maxFiles = sourceConfig.maxFiles ? Number(sourceConfig.maxFiles) : 0 + const previouslyFetched = (syncContext?.totalDocsFetched as number) ?? 0 + const remaining = maxFiles > 0 ? maxFiles - previouslyFetched : 0 + const effectivePageSize = maxFiles > 0 ? Math.min(pageSize, remaining) : pageSize + const queryParams = new URLSearchParams({ q: query, - pageSize: String(pageSize), + pageSize: String(effectivePageSize), + orderBy: 'modifiedTime desc', fields: 'nextPageToken,files(id,name,mimeType,modifiedTime,createdTime,webViewLink,parents,owners,size,starred)', supportsAllDrives: 'true', @@ -269,17 +259,12 @@ export const googleDriveConnector: ConnectorConfig = { const data = await response.json() const files = (data.files || []) as DriveFile[] - const CONCURRENCY = 5 - const documents: ExternalDocument[] = [] - for (let i = 0; i < files.length; i += CONCURRENCY) { - const batch = files.slice(i, i + CONCURRENCY) - const results = await Promise.all(batch.map((file) => fileToDocument(accessToken, file))) - documents.push(...(results.filter(Boolean) as ExternalDocument[])) - } + const documents = files + .filter((f) => isGoogleWorkspaceFile(f.mimeType) || isSupportedTextFile(f.mimeType)) + .map(fileToStub) - const totalFetched = ((syncContext?.totalDocsFetched as number) ?? 0) + documents.length + const totalFetched = previouslyFetched + documents.length if (syncContext) syncContext.totalDocsFetched = totalFetched - const maxFiles = sourceConfig.maxFiles ? 
Number(sourceConfig.maxFiles) : 0 const hitLimit = maxFiles > 0 && totalFetched >= maxFiles const nextPageToken = data.nextPageToken as string | undefined @@ -317,7 +302,18 @@ export const googleDriveConnector: ConnectorConfig = { if (file.trashed) return null - return fileToDocument(accessToken, file) + try { + const content = await fetchFileContent(accessToken, file.id, file.mimeType) + if (!content.trim()) return null + + const stub = fileToStub(file) + return { ...stub, content, contentDeferred: false } + } catch (error) { + logger.warn(`Failed to fetch content for file: ${file.name} (${file.id})`, { + error: error instanceof Error ? error.message : String(error), + }) + return null + } }, validateConfig: async ( diff --git a/apps/sim/connectors/notion/notion.ts b/apps/sim/connectors/notion/notion.ts index 254fc511598..6c87c7ead14 100644 --- a/apps/sim/connectors/notion/notion.ts +++ b/apps/sim/connectors/notion/notion.ts @@ -2,7 +2,7 @@ import { createLogger } from '@sim/logger' import { NotionIcon } from '@/components/icons' import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils' import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types' -import { computeContentHash, joinTagArray, parseTagDate } from '@/connectors/utils' +import { joinTagArray, parseTagDate } from '@/connectors/utils' const logger = createLogger('NotionConnector') @@ -39,6 +39,18 @@ function blocksToPlainText(blocks: Record[]): string { const blockData = block[type] as Record | undefined if (!blockData) return '' + if (type === 'code') { + const richText = blockData.rich_text as Record[] | undefined + const language = (blockData.language as string) || '' + const code = richText ? richTextToPlain(richText) : '' + return language ? `\`\`\`${language}\n${code}\n\`\`\`` : `\`\`\`\n${code}\n\`\`\`` + } + + if (type === 'equation') { + const expression = (blockData.expression as string) || '' + return expression ? 
`$$${expression}$$` : '' + } + const richText = blockData.rich_text as Record[] | undefined if (!richText) return '' @@ -135,32 +147,25 @@ function extractTags(properties: Record): string[] { } /** - * Converts a Notion page to an ExternalDocument by fetching its block content. + * Converts a Notion page to a lightweight metadata stub (no content fetching). */ -async function pageToExternalDocument( - accessToken: string, - page: Record -): Promise { +function pageToStub(page: Record): ExternalDocument { const pageId = page.id as string const properties = (page.properties || {}) as Record const title = extractTitle(properties) const url = page.url as string + const lastEditedTime = (page.last_edited_time as string) ?? '' - // Fetch page content - const blocks = await fetchAllBlocks(accessToken, pageId) - const plainText = blocksToPlainText(blocks) - const contentHash = await computeContentHash(plainText) - - // Extract tags from multi_select/select properties const tags = extractTags(properties) return { externalId: pageId, title: title || 'Untitled', - content: plainText, + content: '', + contentDeferred: true, mimeType: 'text/plain', sourceUrl: url, - contentHash, + contentHash: `notion:${pageId}:${lastEditedTime}`, metadata: { tags, lastModified: page.last_edited_time as string, @@ -260,7 +265,8 @@ export const notionConnector: ConnectorConfig = { getDocument: async ( accessToken: string, _sourceConfig: Record, - externalId: string + externalId: string, + _syncContext?: Record ): Promise => { const response = await fetchWithRetry(`${NOTION_BASE_URL}/pages/${externalId}`, { method: 'GET', @@ -276,7 +282,20 @@ export const notionConnector: ConnectorConfig = { } const page = await response.json() - return pageToExternalDocument(accessToken, page) + if (page.archived) return null + + try { + const blocks = await fetchAllBlocks(accessToken, externalId) + const blockContent = blocksToPlainText(blocks) + const stub = pageToStub(page) + const content = 
blockContent.trim() || stub.title + return { ...stub, content, contentDeferred: false } + } catch (error) { + logger.warn(`Failed to fetch content for Notion page: ${externalId}`, { + error: error instanceof Error ? error.message : String(error), + }) + return null + } }, validateConfig: async ( @@ -430,7 +449,7 @@ async function listFromWorkspace( const results = (data.results || []) as Record[] const pages = results.filter((r) => r.object === 'page' && !(r.archived as boolean)) - const documents = await processPages(accessToken, pages) + const documents = pages.map(pageToStub) const totalFetched = ((syncContext?.totalDocsFetched as number) ?? 0) + documents.length if (syncContext) syncContext.totalDocsFetched = totalFetched @@ -485,7 +504,7 @@ async function listFromDatabase( const results = (data.results || []) as Record[] const pages = results.filter((r) => r.object === 'page' && !(r.archived as boolean)) - const documents = await processPages(accessToken, pages) + const documents = pages.map(pageToStub) const totalFetched = ((syncContext?.totalDocsFetched as number) ?? 0) + documents.length if (syncContext) syncContext.totalDocsFetched = totalFetched @@ -504,7 +523,7 @@ async function listFromDatabase( * Lists child pages under a specific parent page. * * Uses the blocks children endpoint to find child_page blocks, - * then fetches each page's content. + * then fetches each page's metadata to build lightweight stubs. 
*/ async function listFromParentPage( accessToken: string, @@ -536,15 +555,17 @@ async function listFromParentPage( } const data = await response.json() - const blocks = (data.results || []) as Record[] + const blockResults = (data.results || []) as Record[] // Filter to child_page blocks only (child_database blocks cannot be fetched via the Pages API) - const childPageIds = blocks.filter((b) => b.type === 'child_page').map((b) => b.id as string) + const childPageIds = blockResults + .filter((b) => b.type === 'child_page') + .map((b) => b.id as string) // Also include the root page itself on the first call (no cursor) const pageIdsToFetch = !cursor ? [rootPageId, ...childPageIds] : childPageIds - // Fetch child pages in concurrent batches + // Fetch page metadata (not content) in concurrent batches to build stubs const CHILD_PAGE_CONCURRENCY = 5 const documents: ExternalDocument[] = [] @@ -568,7 +589,7 @@ async function listFromParentPage( } const page = await pageResponse.json() if (page.archived) return null - return pageToExternalDocument(accessToken, page) + return pageToStub(page) } catch (error) { logger.warn(`Failed to process child page ${pageId}`, { error: error instanceof Error ? error.message : String(error), @@ -592,31 +613,3 @@ async function listFromParentPage( hasMore: hitLimit ? false : data.has_more === true, } } - -/** - * Converts an array of Notion page objects to ExternalDocuments. - */ -async function processPages( - accessToken: string, - pages: Record[] -): Promise { - const CONCURRENCY = 3 - const documents: ExternalDocument[] = [] - for (let i = 0; i < pages.length; i += CONCURRENCY) { - const batch = pages.slice(i, i + CONCURRENCY) - const results = await Promise.all( - batch.map(async (page) => { - try { - return await pageToExternalDocument(accessToken, page) - } catch (error) { - logger.warn(`Failed to process Notion page ${page.id}`, { - error: error instanceof Error ? 
error.message : String(error), - }) - return null - } - }) - ) - documents.push(...(results.filter(Boolean) as ExternalDocument[])) - } - return documents -} diff --git a/apps/sim/connectors/obsidian/obsidian.ts b/apps/sim/connectors/obsidian/obsidian.ts index 1ad77a6da90..b6f7a3013f3 100644 --- a/apps/sim/connectors/obsidian/obsidian.ts +++ b/apps/sim/connectors/obsidian/obsidian.ts @@ -2,7 +2,7 @@ import { createLogger } from '@sim/logger' import { ObsidianIcon } from '@/components/icons' import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils' import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types' -import { computeContentHash, joinTagArray, parseTagDate } from '@/connectors/utils' +import { joinTagArray, parseTagDate } from '@/connectors/utils' const logger = createLogger('ObsidianConnector') @@ -24,7 +24,7 @@ interface NoteJson { * Normalizes the vault URL by removing trailing slashes. */ function normalizeVaultUrl(url: string): string { - return url.replace(/\/+$/, '') + return url.trim().replace(/\/+$/, '') } /** @@ -198,51 +198,20 @@ export const obsidianConnector: ConnectorConfig = { const offset = cursor ? 
Number(cursor) : 0 const pageFiles = allFiles.slice(offset, offset + DOCS_PER_PAGE) - const documents: ExternalDocument[] = [] - - const BATCH_SIZE = 5 - for (let i = 0; i < pageFiles.length; i += BATCH_SIZE) { - const batch = pageFiles.slice(i, i + BATCH_SIZE) - const results = await Promise.all( - batch.map(async (filePath) => { - try { - const note = await fetchNote(baseUrl, accessToken, filePath) - const content = note.content || '' - const contentHash = await computeContentHash(content) - - return { - externalId: filePath, - title: titleFromPath(filePath), - content, - mimeType: 'text/plain' as const, - sourceUrl: `${baseUrl}/vault/${filePath.split('/').map(encodeURIComponent).join('/')}`, - contentHash, - metadata: { - tags: note.tags, - frontmatter: note.frontmatter, - createdAt: note.stat?.ctime ? new Date(note.stat.ctime).toISOString() : undefined, - modifiedAt: note.stat?.mtime ? new Date(note.stat.mtime).toISOString() : undefined, - size: note.stat?.size, - folder: filePath.includes('/') - ? filePath.substring(0, filePath.lastIndexOf('/')) - : '', - }, - } - } catch (error) { - logger.warn('Failed to fetch note', { - filePath, - error: error instanceof Error ? error.message : String(error), - }) - return null - } - }) - ) - for (const doc of results) { - if (doc) { - documents.push(doc) - } - } - } + const syncRunId = (syncContext?.syncRunId as string) ?? '' + + const documents: ExternalDocument[] = pageFiles.map((filePath) => ({ + externalId: filePath, + title: titleFromPath(filePath), + content: '', + contentDeferred: true, + mimeType: 'text/plain' as const, + sourceUrl: `${baseUrl}/vault/${filePath.split('/').map(encodeURIComponent).join('/')}`, + contentHash: `obsidian:stub:${filePath}:${syncRunId}`, + metadata: { + folder: filePath.includes('/') ? 
filePath.substring(0, filePath.lastIndexOf('/')) : '', + }, + })) const nextOffset = offset + pageFiles.length const hasMore = nextOffset < allFiles.length @@ -257,7 +226,8 @@ export const obsidianConnector: ConnectorConfig = { getDocument: async ( accessToken: string, sourceConfig: Record, - externalId: string + externalId: string, + _syncContext?: Record ): Promise => { const baseUrl = normalizeVaultUrl( (sourceConfig.vaultUrl as string) || 'https://127.0.0.1:27124' @@ -266,15 +236,15 @@ export const obsidianConnector: ConnectorConfig = { try { const note = await fetchNote(baseUrl, accessToken, externalId) const content = note.content || '' - const contentHash = await computeContentHash(content) return { externalId, title: titleFromPath(externalId), content, + contentDeferred: false, mimeType: 'text/plain', sourceUrl: `${baseUrl}/vault/${externalId.split('/').map(encodeURIComponent).join('/')}`, - contentHash, + contentHash: `obsidian:${externalId}:${note.stat?.mtime ?? ''}`, metadata: { tags: note.tags, frontmatter: note.frontmatter, @@ -329,14 +299,14 @@ export const obsidianConnector: ConnectorConfig = { const folderPath = (sourceConfig.folderPath as string) || '' if (folderPath.trim()) { - const files = await listVaultFiles( + const entries = await listDirectory( baseUrl, accessToken, folderPath.trim(), VALIDATE_RETRY_OPTIONS ) - if (files.length === 0) { - logger.info('Folder path returned no markdown files', { folderPath }) + if (entries.length === 0) { + logger.info('Folder path returned no entries', { folderPath }) } } diff --git a/apps/sim/connectors/onedrive/onedrive.ts b/apps/sim/connectors/onedrive/onedrive.ts index 3557767878f..dc7142670ed 100644 --- a/apps/sim/connectors/onedrive/onedrive.ts +++ b/apps/sim/connectors/onedrive/onedrive.ts @@ -2,7 +2,7 @@ import { createLogger } from '@sim/logger' import { MicrosoftOneDriveIcon } from '@/components/icons' import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils' import 
type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types' -import { computeContentHash, htmlToPlainText, parseTagDate } from '@/connectors/utils' +import { htmlToPlainText, parseTagDate } from '@/connectors/utils' const logger = createLogger('OneDriveConnector') @@ -96,42 +96,25 @@ async function fetchFileContent( } /** - * Converts a OneDrive item to an ExternalDocument. + * Converts a OneDrive item to a lightweight metadata stub (no content). */ -async function itemToDocument( - accessToken: string, - item: OneDriveItem -): Promise { - try { - const content = await fetchFileContent(accessToken, item.id, item.name) - if (!content.trim()) { - logger.info(`Skipping empty file: ${item.name} (${item.id})`) - return null - } - - const contentHash = await computeContentHash(content) - - return { - externalId: item.id, - title: item.name || 'Untitled', - content, - mimeType: 'text/plain', - sourceUrl: item.webUrl, - contentHash, - metadata: { - name: item.name, - lastModifiedDateTime: item.lastModifiedDateTime, - createdBy: item.createdBy?.user?.displayName, - size: item.size, - webUrl: item.webUrl, - parentPath: item.parentReference?.path, - }, - } - } catch (error) { - logger.warn(`Failed to extract content from file: ${item.name} (${item.id})`, { - error: error instanceof Error ? error.message : String(error), - }) - return null +function fileToStub(item: OneDriveItem): ExternalDocument { + return { + externalId: item.id, + title: item.name || 'Untitled', + content: '', + contentDeferred: true, + mimeType: 'text/plain', + sourceUrl: item.webUrl, + contentHash: `onedrive:${item.id}:${item.lastModifiedDateTime ?? 
''}`, + metadata: { + name: item.name, + lastModifiedDateTime: item.lastModifiedDateTime, + createdBy: item.createdBy?.user?.displayName, + size: item.size, + webUrl: item.webUrl, + parentPath: item.parentReference?.path, + }, } } @@ -199,7 +182,9 @@ export const onedriveConnector: ConnectorConfig = { pageUrl = cursor } } else { - pageUrl = buildListUrl(folderPath) + const baseUrl = buildListUrl(folderPath) + const separator = baseUrl.includes('?') ? '&' : '?' + pageUrl = `${baseUrl}${separator}$orderby=lastModifiedDateTime desc` } logger.info('Listing OneDrive files', { @@ -242,18 +227,12 @@ export const onedriveConnector: ConnectorConfig = { const maxFiles = sourceConfig.maxFiles ? Number(sourceConfig.maxFiles) : 0 const previouslyFetched = (syncContext?.totalDocsFetched as number) ?? 0 - const CONCURRENCY = 5 - const documents: ExternalDocument[] = [] - for (let i = 0; i < textFiles.length; i += CONCURRENCY) { - if (maxFiles > 0 && previouslyFetched + documents.length >= maxFiles) break - const batch = textFiles.slice(i, i + CONCURRENCY) - const results = await Promise.all(batch.map((item) => itemToDocument(accessToken, item))) - documents.push(...(results.filter(Boolean) as ExternalDocument[])) - } + let documents = textFiles.map(fileToStub) + if (maxFiles > 0) { const remaining = maxFiles - previouslyFetched if (documents.length > remaining) { - documents.splice(remaining) + documents = documents.slice(0, remaining) } } @@ -273,7 +252,7 @@ export const onedriveConnector: ConnectorConfig = { hasMore = true } else if (folderQueue.length > 0) { const nextFolderId = folderQueue.shift()! 
- const nextUrl = `${GRAPH_BASE_URL}/me/drive/items/${nextFolderId}/children` + const nextUrl = `${GRAPH_BASE_URL}/me/drive/items/${nextFolderId}/children?$orderby=lastModifiedDateTime desc` nextCursor = JSON.stringify({ pageUrl: nextUrl, folderQueue }) hasMore = true } @@ -308,10 +287,20 @@ export const onedriveConnector: ConnectorConfig = { const item = (await response.json()) as OneDriveItem - // Only process files with supported extensions if (!item.file || !isSupportedTextFile(item.name)) return null - return itemToDocument(accessToken, item) + try { + const content = await fetchFileContent(accessToken, item.id, item.name) + if (!content.trim()) return null + + const stub = fileToStub(item) + return { ...stub, content, contentDeferred: false } + } catch (error) { + logger.warn(`Failed to fetch content for file: ${item.name} (${item.id})`, { + error: error instanceof Error ? error.message : String(error), + }) + return null + } }, validateConfig: async ( diff --git a/apps/sim/connectors/sharepoint/sharepoint.ts b/apps/sim/connectors/sharepoint/sharepoint.ts index ec3d08655ff..41beb0e08d0 100644 --- a/apps/sim/connectors/sharepoint/sharepoint.ts +++ b/apps/sim/connectors/sharepoint/sharepoint.ts @@ -2,7 +2,7 @@ import { createLogger } from '@sim/logger' import { MicrosoftSharepointIcon } from '@/components/icons' import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils' import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types' -import { computeContentHash, htmlToPlainText, parseTagDate } from '@/connectors/utils' +import { htmlToPlainText, parseTagDate } from '@/connectors/utils' const logger = createLogger('SharePointConnector') @@ -133,8 +133,9 @@ async function downloadFileContent( } const text = await response.text() - if (text.length > MAX_DOWNLOAD_SIZE) { - return text.slice(0, MAX_DOWNLOAD_SIZE) + if (Buffer.byteLength(text, 'utf8') > MAX_DOWNLOAD_SIZE) { + logger.warn(`File 
"${fileName}" exceeds ${MAX_DOWNLOAD_SIZE} bytes, truncating`) + return Buffer.from(text, 'utf8').subarray(0, MAX_DOWNLOAD_SIZE).toString('utf8') } return text } @@ -156,44 +157,25 @@ async function fetchFileContent( } /** - * Converts a DriveItem to an ExternalDocument by downloading its content. + * Converts a DriveItem to a lightweight metadata stub (no content download). */ -async function itemToDocument( - accessToken: string, - siteId: string, - item: DriveItem, - siteName: string -): Promise { - try { - const content = await fetchFileContent(accessToken, siteId, item.id, item.name) - if (!content.trim()) { - logger.info(`Skipping empty file: ${item.name} (${item.id})`) - return null - } - - const contentHash = await computeContentHash(content) - - return { - externalId: item.id, - title: item.name || 'Untitled', - content, - mimeType: 'text/plain', - sourceUrl: item.webUrl, - contentHash, - metadata: { - lastModifiedDateTime: item.lastModifiedDateTime, - createdDateTime: item.createdDateTime, - createdBy: item.createdBy?.user?.displayName, - fileSize: item.size, - path: item.parentReference?.path, - siteName, - }, - } - } catch (error) { - logger.warn(`Failed to extract content from file: ${item.name} (${item.id})`, { - error: error instanceof Error ? error.message : String(error), - }) - return null +function itemToStub(item: DriveItem, siteName: string): ExternalDocument { + return { + externalId: item.id, + title: item.name || 'Untitled', + content: '', + contentDeferred: true, + mimeType: 'text/plain', + sourceUrl: item.webUrl, + contentHash: `sharepoint:${item.id}:${item.lastModifiedDateTime ?? 
''}`, + metadata: { + lastModifiedDateTime: item.lastModifiedDateTime, + createdDateTime: item.createdDateTime, + createdBy: item.createdBy?.user?.displayName, + fileSize: item.size, + path: item.parentReference?.path, + siteName, + }, } } @@ -397,26 +379,14 @@ export const sharepointConnector: ConnectorConfig = { // Push subfolders onto the stack for depth-first traversal state.folderStack.push(...subfolders) - // Convert files to documents in batches - const CONCURRENCY = 5 + // Convert files to lightweight stubs (no content download) const previouslyFetched = totalFetched - for (let i = 0; i < files.length; i += CONCURRENCY) { + for (const file of files) { if (maxFiles > 0 && previouslyFetched + documents.length >= maxFiles) break - const batch = files.slice(i, i + CONCURRENCY) - const results = await Promise.all( - batch.map((file) => itemToDocument(accessToken, siteId, file, siteName)) - ) - documents.push(...(results.filter(Boolean) as ExternalDocument[])) + documents.push(itemToStub(file, siteName)) } totalFetched += documents.length - if (maxFiles > 0) { - const remaining = maxFiles - previouslyFetched - if (documents.length > remaining) { - documents.splice(remaining) - totalFetched = maxFiles - } - } if (syncContext) syncContext.totalDocsFetched = totalFetched const hitLimit = maxFiles > 0 && totalFetched >= maxFiles @@ -492,7 +462,18 @@ export const sharepointConnector: ConnectorConfig = { return null } - return itemToDocument(accessToken, siteId, item, siteName ?? siteUrl) + try { + const content = await fetchFileContent(accessToken, siteId, item.id, item.name) + if (!content.trim()) return null + + const stub = itemToStub(item, siteName ?? siteUrl) + return { ...stub, content, contentDeferred: false } + } catch (error) { + logger.warn(`Failed to fetch content for file: ${item.name} (${item.id})`, { + error: error instanceof Error ? 
error.message : String(error), + }) + return null + } }, validateConfig: async ( diff --git a/apps/sim/lib/knowledge/connectors/sync-engine.ts b/apps/sim/lib/knowledge/connectors/sync-engine.ts index 8440126c7f7..1a0502bbd04 100644 --- a/apps/sim/lib/knowledge/connectors/sync-engine.ts +++ b/apps/sim/lib/knowledge/connectors/sync-engine.ts @@ -292,7 +292,7 @@ export async function executeSync( const externalDocs: ExternalDocument[] = [] let cursor: string | undefined let hasMore = true - const syncContext: Record = {} + const syncContext: Record = { syncRunId: crypto.randomUUID() } // Determine if this sync should be incremental const isIncremental = diff --git a/apps/sim/lib/oauth/oauth.ts b/apps/sim/lib/oauth/oauth.ts index 31f7d290fc4..ded48dc0800 100644 --- a/apps/sim/lib/oauth/oauth.ts +++ b/apps/sim/lib/oauth/oauth.ts @@ -1190,6 +1190,7 @@ function getProviderAuthConfig(provider: string): ProviderAuthConfig { clientId, clientSecret, useBasicAuth: false, + supportsRefreshTokenRotation: false, } } case 'slack': { diff --git a/apps/sim/tools/confluence/utils.ts b/apps/sim/tools/confluence/utils.ts index d7e55a2c565..303464bdbf2 100644 --- a/apps/sim/tools/confluence/utils.ts +++ b/apps/sim/tools/confluence/utils.ts @@ -1,11 +1,22 @@ -export async function getConfluenceCloudId(domain: string, accessToken: string): Promise { - const response = await fetch('https://api.atlassian.com/oauth/token/accessible-resources', { - method: 'GET', - headers: { - Authorization: `Bearer ${accessToken}`, - Accept: 'application/json', +import type { RetryOptions } from '@/lib/knowledge/documents/utils' +import { fetchWithRetry } from '@/lib/knowledge/documents/utils' + +export async function getConfluenceCloudId( + domain: string, + accessToken: string, + retryOptions?: RetryOptions +): Promise { + const response = await fetchWithRetry( + 'https://api.atlassian.com/oauth/token/accessible-resources', + { + method: 'GET', + headers: { + Authorization: `Bearer ${accessToken}`, + 
Accept: 'application/json', + }, }, - }) + retryOptions + ) const resources = await response.json() From 228afa40b8e60d55a130fa9e07aea30ecfb25904 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Thu, 26 Mar 2026 16:14:20 -0700 Subject: [PATCH 03/11] fix(connectors): address PR review comments - metadata merge, retryOptions, UTF-8 safety MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Sync engine: merge metadata from getDocument during deferred hydration, so Gmail/Obsidian/Confluence tags and metadata survive the stub→full transition - Evernote: pass retryOptions {retries:3, backoff:500} from listDocuments and getDocument callers into apiFindNotesMetadata and apiGetNote - Google Drive + SharePoint: safe UTF-8 truncation that walks back to the last complete character boundary instead of splitting multi-byte chars Co-Authored-By: Claude Opus 4.6 --- apps/sim/connectors/evernote/evernote.ts | 12 ++++++++++-- apps/sim/connectors/google-drive/google-drive.ts | 5 ++++- apps/sim/connectors/sharepoint/sharepoint.ts | 5 ++++- apps/sim/lib/knowledge/connectors/sync-engine.ts | 1 + 4 files changed, 19 insertions(+), 4 deletions(-) diff --git a/apps/sim/connectors/evernote/evernote.ts b/apps/sim/connectors/evernote/evernote.ts index 553f3d7d04b..6d8ac8060c0 100644 --- a/apps/sim/connectors/evernote/evernote.ts +++ b/apps/sim/connectors/evernote/evernote.ts @@ -412,7 +412,14 @@ export const evernoteConnector: ConnectorConfig = { logger.info('Listing Evernote notes', { offset, maxNotes: NOTES_PER_PAGE }) - const result = await apiFindNotesMetadata(accessToken, offset, NOTES_PER_PAGE, notebookGuid) + const retryOptions = { retries: 3, backoff: 500 } + const result = await apiFindNotesMetadata( + accessToken, + offset, + NOTES_PER_PAGE, + notebookGuid, + retryOptions + ) const documents: ExternalDocument[] = result.notes.map((meta) => { const tagNames = meta.tagGuids.map((g) => tagMap[g]).filter(Boolean) @@ -451,7 +458,8 @@ export const 
evernoteConnector: ConnectorConfig = { syncContext?: Record ): Promise => { try { - const note = await apiGetNote(accessToken, externalId) + const retryOptions = { retries: 3, backoff: 500 } + const note = await apiGetNote(accessToken, externalId, retryOptions) const plainText = htmlToPlainText(note.content) if (!plainText.trim()) return null diff --git a/apps/sim/connectors/google-drive/google-drive.ts b/apps/sim/connectors/google-drive/google-drive.ts index 3addf7a93fe..11120c08355 100644 --- a/apps/sim/connectors/google-drive/google-drive.ts +++ b/apps/sim/connectors/google-drive/google-drive.ts @@ -70,7 +70,10 @@ async function downloadTextFile(accessToken: string, fileId: string): Promise MAX_EXPORT_SIZE) { logger.warn(`File exceeds ${MAX_EXPORT_SIZE} bytes, truncating`) - return Buffer.from(text, 'utf8').subarray(0, MAX_EXPORT_SIZE).toString('utf8') + const buf = Buffer.from(text, 'utf8') + let end = MAX_EXPORT_SIZE + while (end > 0 && (buf[end] & 0xc0) === 0x80) end-- + return buf.subarray(0, end).toString('utf8') } return text } diff --git a/apps/sim/connectors/sharepoint/sharepoint.ts b/apps/sim/connectors/sharepoint/sharepoint.ts index 41beb0e08d0..452e99a8d23 100644 --- a/apps/sim/connectors/sharepoint/sharepoint.ts +++ b/apps/sim/connectors/sharepoint/sharepoint.ts @@ -135,7 +135,10 @@ async function downloadFileContent( const text = await response.text() if (Buffer.byteLength(text, 'utf8') > MAX_DOWNLOAD_SIZE) { logger.warn(`File "${fileName}" exceeds ${MAX_DOWNLOAD_SIZE} bytes, truncating`) - return Buffer.from(text, 'utf8').subarray(0, MAX_DOWNLOAD_SIZE).toString('utf8') + const buf = Buffer.from(text, 'utf8') + let end = MAX_DOWNLOAD_SIZE + while (end > 0 && (buf[end] & 0xc0) === 0x80) end-- + return buf.subarray(0, end).toString('utf8') } return text } diff --git a/apps/sim/lib/knowledge/connectors/sync-engine.ts b/apps/sim/lib/knowledge/connectors/sync-engine.ts index 1a0502bbd04..667c06bed45 100644 --- 
a/apps/sim/lib/knowledge/connectors/sync-engine.ts +++ b/apps/sim/lib/knowledge/connectors/sync-engine.ts @@ -463,6 +463,7 @@ export async function executeSync( content: fullDoc.content, contentHash: fullDoc.contentHash ?? op.extDoc.contentHash, contentDeferred: false, + metadata: { ...op.extDoc.metadata, ...fullDoc.metadata }, }, } }) From eb788141bca6b055ba3aaa8b4f767cb6aa4c1741 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Thu, 26 Mar 2026 16:19:48 -0700 Subject: [PATCH 04/11] fix(evernote): use correct RetryOptions property names maxRetries/initialDelayMs instead of retries/backoff to match the RetryOptions interface from lib/knowledge/documents/utils. Co-Authored-By: Claude Opus 4.6 --- apps/sim/connectors/evernote/evernote.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/sim/connectors/evernote/evernote.ts b/apps/sim/connectors/evernote/evernote.ts index 6d8ac8060c0..5504ed9e9b6 100644 --- a/apps/sim/connectors/evernote/evernote.ts +++ b/apps/sim/connectors/evernote/evernote.ts @@ -412,7 +412,7 @@ export const evernoteConnector: ConnectorConfig = { logger.info('Listing Evernote notes', { offset, maxNotes: NOTES_PER_PAGE }) - const retryOptions = { retries: 3, backoff: 500 } + const retryOptions = { maxRetries: 3, initialDelayMs: 500 } const result = await apiFindNotesMetadata( accessToken, offset, @@ -458,7 +458,7 @@ export const evernoteConnector: ConnectorConfig = { syncContext?: Record ): Promise => { try { - const retryOptions = { retries: 3, backoff: 500 } + const retryOptions = { maxRetries: 3, initialDelayMs: 500 } const note = await apiGetNote(accessToken, externalId, retryOptions) const plainText = htmlToPlainText(note.content) if (!plainText.trim()) return null From 8c409c2f85b6ee31f8ce5f601fd6366bf4d5249c Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Thu, 26 Mar 2026 16:31:14 -0700 Subject: [PATCH 05/11] fix(sync-engine): merge title from getDocument and skip unchanged docs after hydration MIME-Version: 1.0 
Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Merge title from getDocument during deferred hydration so Gmail documents get the email Subject header instead of the snippet text - After hydration, compare the hydrated contentHash against the stored DB hash — if they match, skip the update. This prevents Obsidian (and any connector with a force-refresh stub hash) from re-uploading and re-processing unchanged documents every sync Co-Authored-By: Claude Opus 4.6 --- apps/sim/lib/knowledge/connectors/sync-engine.ts | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/apps/sim/lib/knowledge/connectors/sync-engine.ts b/apps/sim/lib/knowledge/connectors/sync-engine.ts index 667c06bed45..db63126f1c6 100644 --- a/apps/sim/lib/knowledge/connectors/sync-engine.ts +++ b/apps/sim/lib/knowledge/connectors/sync-engine.ts @@ -456,12 +456,21 @@ export async function executeSync( syncContext ) if (!fullDoc?.content.trim()) return null + const hydratedHash = fullDoc.contentHash ?? op.extDoc.contentHash + if ( + op.type === 'update' && + existingByExternalId.get(op.extDoc.externalId)?.contentHash === hydratedHash + ) { + result.docsUnchanged++ + return null + } return { ...op, extDoc: { ...op.extDoc, + title: fullDoc.title || op.extDoc.title, content: fullDoc.content, - contentHash: fullDoc.contentHash ?? op.extDoc.contentHash, + contentHash: hydratedHash, contentDeferred: false, metadata: { ...op.extDoc.metadata, ...fullDoc.metadata }, }, From 70e9726d7162d99ad025d1182b0318338e8016da Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Thu, 26 Mar 2026 16:40:27 -0700 Subject: [PATCH 06/11] fix(sync-engine): dedup externalIds, enable deletion reconciliation, merge sourceUrl MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three sync engine gaps identified during audit: 1. 
Duplicate externalId guard: if a connector returns the same externalId across pages (pagination overlap), skip the second occurrence to prevent unique constraint violations on add and double-uploads on update. 2. Deletion reconciliation: previously required explicit fullSync or syncMode='full', meaning docs deleted from the source accumulated in the KB forever. Now runs on all non-incremental syncs (which return ALL docs). Includes a safety threshold: if >50% of existing docs (and >5 docs) would be deleted, skip and warn — protects against partial listing failures. Explicit fullSync bypasses the threshold. 3. sourceUrl merge: hydration now picks up sourceUrl from getDocument, falling back to the stub's sourceUrl if getDocument doesn't set one. Co-Authored-By: Claude Opus 4.6 --- .../lib/knowledge/connectors/sync-engine.ts | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/apps/sim/lib/knowledge/connectors/sync-engine.ts b/apps/sim/lib/knowledge/connectors/sync-engine.ts index db63126f1c6..fbc1c7f8539 100644 --- a/apps/sim/lib/knowledge/connectors/sync-engine.ts +++ b/apps/sim/lib/knowledge/connectors/sync-engine.ts @@ -403,6 +403,7 @@ export async function executeSync( const pendingOps: DocOp[] = [] for (const extDoc of externalDocs) { + if (seenExternalIds.has(extDoc.externalId)) continue seenExternalIds.add(extDoc.externalId) if (excludedExternalIds.has(extDoc.externalId)) { @@ -472,6 +473,7 @@ export async function executeSync( content: fullDoc.content, contentHash: hydratedHash, contentDeferred: false, + sourceUrl: fullDoc.sourceUrl ?? op.extDoc.sourceUrl, metadata: { ...op.extDoc.metadata, ...fullDoc.metadata }, }, } @@ -552,15 +554,26 @@ export async function executeSync( } } - // Skip deletion reconciliation during incremental syncs — results only contain changed docs - if (!isIncremental && (options?.fullSync || connector.syncMode === 'full')) { + // Reconcile deletions for non-incremental syncs (which return ALL docs). 
+ // Skip for incremental syncs since results only contain changed docs. + if (!isIncremental) { const removedIds = existingDocs .filter((d) => d.externalId && !seenExternalIds.has(d.externalId)) .map((d) => d.id) if (removedIds.length > 0) { - await hardDeleteDocuments(removedIds, syncLogId) - result.docsDeleted += removedIds.length + const deletionRatio = + existingDocs.length > 0 ? removedIds.length / existingDocs.length : 0 + + if (deletionRatio > 0.5 && removedIds.length > 5 && !options?.fullSync) { + logger.warn( + `Skipping deletion of ${removedIds.length}/${existingDocs.length} docs — exceeds safety threshold. Trigger a full sync to force cleanup.`, + { connectorId, deletionRatio: Math.round(deletionRatio * 100) } + ) + } else { + await hardDeleteDocuments(removedIds, syncLogId) + result.docsDeleted += removedIds.length + } } } From 39e7768d2a5f0d9307ca361f25af99690114b7d4 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Thu, 26 Mar 2026 16:51:12 -0700 Subject: [PATCH 07/11] lint --- apps/sim/lib/knowledge/connectors/sync-engine.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/apps/sim/lib/knowledge/connectors/sync-engine.ts b/apps/sim/lib/knowledge/connectors/sync-engine.ts index fbc1c7f8539..3b532a5472d 100644 --- a/apps/sim/lib/knowledge/connectors/sync-engine.ts +++ b/apps/sim/lib/knowledge/connectors/sync-engine.ts @@ -562,8 +562,7 @@ export async function executeSync( .map((d) => d.id) if (removedIds.length > 0) { - const deletionRatio = - existingDocs.length > 0 ? removedIds.length / existingDocs.length : 0 + const deletionRatio = existingDocs.length > 0 ? 
removedIds.length / existingDocs.length : 0 if (deletionRatio > 0.5 && removedIds.length > 5 && !options?.fullSync) { logger.warn( From bc73154d6dc69d30f5c1df20be947058e898c8c0 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Thu, 26 Mar 2026 18:05:39 -0700 Subject: [PATCH 08/11] fix(connectors): confluence version metadata fallback and google drive maxFiles guard - Confluence: use `version?.number` directly (undefined) in metadata instead of `?? ''` (empty string) to prevent Number('') = 0 passing NaN check in mapTags. Hash still uses `?? ''` for string interpolation. - Google Drive: add early return when previouslyFetched >= maxFiles to prevent effectivePageSize <= 0 which violates the API's pageSize requirement (1-1000). Co-Authored-By: Claude Opus 4.6 --- apps/sim/connectors/confluence/confluence.ts | 12 ++++++------ apps/sim/connectors/google-drive/google-drive.ts | 5 +++++ 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/apps/sim/connectors/confluence/confluence.ts b/apps/sim/connectors/confluence/confluence.ts index f20f8fc385b..ab93226614e 100644 --- a/apps/sim/connectors/confluence/confluence.ts +++ b/apps/sim/connectors/confluence/confluence.ts @@ -77,7 +77,7 @@ function cqlResultToStub(item: Record, domain: string): Externa const labelsWrapper = metadata?.labels as Record | undefined const labelResults = (labelsWrapper?.results || []) as Record[] const labels = labelResults.map((l) => l.name as string) - const versionNumber = version?.number ?? '' + const versionNumber = version?.number return { externalId: String(item.id), @@ -86,7 +86,7 @@ function cqlResultToStub(item: Record, domain: string): Externa contentDeferred: true, mimeType: 'text/plain', sourceUrl: links?.webui ? `https://${domain}/wiki${links.webui}` : undefined, - contentHash: `confluence:${item.id}:${versionNumber}`, + contentHash: `confluence:${item.id}:${versionNumber ?? 
''}`, metadata: { spaceId: (item.space as Record)?.key, status: item.status, @@ -274,7 +274,7 @@ export const confluenceConnector: ConnectorConfig = { const links = page._links as Record | undefined const version = page.version as Record | undefined - const versionNumber = version?.number ?? '' + const versionNumber = version?.number return { externalId: String(page.id), @@ -283,7 +283,7 @@ export const confluenceConnector: ConnectorConfig = { contentDeferred: false, mimeType: 'text/plain', sourceUrl: links?.webui ? `https://${domain}/wiki${links.webui}` : undefined, - contentHash: `confluence:${page.id}:${versionNumber}`, + contentHash: `confluence:${page.id}:${versionNumber ?? ''}`, metadata: { spaceId: page.spaceId, status: page.status, @@ -410,7 +410,7 @@ async function listDocumentsV2( const documents: ExternalDocument[] = results.map((page: Record) => { const pageId = String(page.id) const version = page.version as Record | undefined - const versionNumber = version?.number ?? '' + const versionNumber = version?.number return { externalId: pageId, @@ -421,7 +421,7 @@ async function listDocumentsV2( sourceUrl: (page._links as Record)?.webui ? `https://${domain}/wiki${(page._links as Record).webui}` : undefined, - contentHash: `confluence:${pageId}:${versionNumber}`, + contentHash: `confluence:${pageId}:${versionNumber ?? ''}`, metadata: { spaceId: page.spaceId, status: page.status, diff --git a/apps/sim/connectors/google-drive/google-drive.ts b/apps/sim/connectors/google-drive/google-drive.ts index 11120c08355..24ceacabb30 100644 --- a/apps/sim/connectors/google-drive/google-drive.ts +++ b/apps/sim/connectors/google-drive/google-drive.ts @@ -221,6 +221,11 @@ export const googleDriveConnector: ConnectorConfig = { const maxFiles = sourceConfig.maxFiles ? Number(sourceConfig.maxFiles) : 0 const previouslyFetched = (syncContext?.totalDocsFetched as number) ?? 
0 + + if (maxFiles > 0 && previouslyFetched >= maxFiles) { + return { documents: [], hasMore: false } + } + const remaining = maxFiles > 0 ? maxFiles - previouslyFetched : 0 const effectivePageSize = maxFiles > 0 ? Math.min(pageSize, remaining) : pageSize From 6ffc0e3666227a2e9adb2aeee2405ddab37d1954 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Thu, 26 Mar 2026 18:20:21 -0700 Subject: [PATCH 09/11] fix(connectors): blogpost labels and capped listing deletion reconciliation - Confluence: fetchLabelsForPages now tries both /pages/{id}/labels and /blogposts/{id}/labels, preventing label loss when getDocument hydrates blogpost content (previously returned empty labels on 404). - Sync engine: skip deletion reconciliation when listing was capped (maxFiles/maxThreads). Connectors signal this via syncContext.listingCapped. Prevents incorrect deletion of docs beyond the cap that still exist in source. fullSync override still forces deletion for explicit cleanup. - Google Drive & Gmail: set syncContext.listingCapped = true when cap is hit. 
Co-Authored-By: Claude Opus 4.6 --- apps/sim/connectors/confluence/confluence.ts | 37 ++++++++++++------- apps/sim/connectors/gmail/gmail.ts | 1 + .../connectors/google-drive/google-drive.ts | 1 + .../lib/knowledge/connectors/sync-engine.ts | 6 +-- 4 files changed, 29 insertions(+), 16 deletions(-) diff --git a/apps/sim/connectors/confluence/confluence.ts b/apps/sim/connectors/confluence/confluence.ts index ab93226614e..8766da45d95 100644 --- a/apps/sim/connectors/confluence/confluence.ts +++ b/apps/sim/connectors/confluence/confluence.ts @@ -31,23 +31,34 @@ async function fetchLabelsForPages( const results = await Promise.all( batch.map(async (pageId) => { try { - const url = `https://api.atlassian.com/ex/confluence/${cloudId}/wiki/api/v2/pages/${pageId}/labels` - const response = await fetchWithRetry(url, { - method: 'GET', - headers: { - Accept: 'application/json', - Authorization: `Bearer ${accessToken}`, - }, - }) + let data: Record | null = null + for (const contentType of ['pages', 'blogposts']) { + const url = `https://api.atlassian.com/ex/confluence/${cloudId}/wiki/api/v2/${contentType}/${pageId}/labels` + const response = await fetchWithRetry(url, { + method: 'GET', + headers: { + Accept: 'application/json', + Authorization: `Bearer ${accessToken}`, + }, + }) + + if (response.ok) { + data = await response.json() + break + } + if (response.status !== 404) { + logger.warn(`Failed to fetch labels for ${contentType} ${pageId}`, { + status: response.status, + }) + } + } - if (!response.ok) { - logger.warn(`Failed to fetch labels for page ${pageId}`, { status: response.status }) + if (!data) { return { pageId, labels: [] as string[] } } - const data = await response.json() - const labels = (data.results || []).map( - (label: Record) => label.name as string + const labels = ((data.results as Record[]) || []).map( + (label) => label.name as string ) return { pageId, labels } } catch (error) { diff --git a/apps/sim/connectors/gmail/gmail.ts 
b/apps/sim/connectors/gmail/gmail.ts index 90fe0ef22d4..68a8f6c888f 100644 --- a/apps/sim/connectors/gmail/gmail.ts +++ b/apps/sim/connectors/gmail/gmail.ts @@ -454,6 +454,7 @@ export const gmailConnector: ConnectorConfig = { const nextPageToken = data.nextPageToken as string | undefined const hitLimit = newTotal >= maxThreads + if (hitLimit && syncContext) syncContext.listingCapped = true return { documents, diff --git a/apps/sim/connectors/google-drive/google-drive.ts b/apps/sim/connectors/google-drive/google-drive.ts index 24ceacabb30..cb6190c4dbe 100644 --- a/apps/sim/connectors/google-drive/google-drive.ts +++ b/apps/sim/connectors/google-drive/google-drive.ts @@ -274,6 +274,7 @@ export const googleDriveConnector: ConnectorConfig = { const totalFetched = previouslyFetched + documents.length if (syncContext) syncContext.totalDocsFetched = totalFetched const hitLimit = maxFiles > 0 && totalFetched >= maxFiles + if (hitLimit && syncContext) syncContext.listingCapped = true const nextPageToken = data.nextPageToken as string | undefined diff --git a/apps/sim/lib/knowledge/connectors/sync-engine.ts b/apps/sim/lib/knowledge/connectors/sync-engine.ts index 3b532a5472d..0fe70313d74 100644 --- a/apps/sim/lib/knowledge/connectors/sync-engine.ts +++ b/apps/sim/lib/knowledge/connectors/sync-engine.ts @@ -554,9 +554,9 @@ export async function executeSync( } } - // Reconcile deletions for non-incremental syncs (which return ALL docs). - // Skip for incremental syncs since results only contain changed docs. - if (!isIncremental) { + // Reconcile deletions for non-incremental syncs that returned ALL docs. + // Skip when listing was capped (maxFiles/maxThreads) — unseen docs may still exist in the source. 
+ if (!isIncremental && (!syncContext?.listingCapped || options?.fullSync)) { const removedIds = existingDocs .filter((d) => d.externalId && !seenExternalIds.has(d.externalId)) .map((d) => d.id) From 50182d154900b12d07c085a9abc3d24a1adcebbe Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Thu, 26 Mar 2026 18:31:14 -0700 Subject: [PATCH 10/11] fix(connectors): set syncContext.listingCapped in all connectors with caps OneDrive, Dropbox, SharePoint, Confluence (v2 + CQL), and Notion (3 listing functions) now set syncContext.listingCapped = true when their respective maxFiles/maxPages limit is hit. Without this, the sync engine's deletion reconciliation would run against an incomplete listing and incorrectly hard-delete documents that exist in the source but fell outside the cap window. Co-Authored-By: Claude Opus 4.6 --- apps/sim/connectors/confluence/confluence.ts | 2 ++ apps/sim/connectors/dropbox/dropbox.ts | 1 + apps/sim/connectors/notion/notion.ts | 3 +++ apps/sim/connectors/onedrive/onedrive.ts | 1 + apps/sim/connectors/sharepoint/sharepoint.ts | 2 +- 5 files changed, 8 insertions(+), 1 deletion(-) diff --git a/apps/sim/connectors/confluence/confluence.ts b/apps/sim/connectors/confluence/confluence.ts index 8766da45d95..2ba7b71e038 100644 --- a/apps/sim/connectors/confluence/confluence.ts +++ b/apps/sim/connectors/confluence/confluence.ts @@ -456,6 +456,7 @@ async function listDocumentsV2( const totalFetched = ((syncContext?.totalDocsFetched as number) ?? 0) + documents.length if (syncContext) syncContext.totalDocsFetched = totalFetched const hitLimit = maxPages > 0 && totalFetched >= maxPages + if (hitLimit && syncContext) syncContext.listingCapped = true return { documents, @@ -621,6 +622,7 @@ async function listDocumentsViaCql( const totalFetched = ((syncContext?.totalDocsFetched as number) ?? 
0) + documents.length if (syncContext) syncContext.totalDocsFetched = totalFetched const hitLimit = maxPages > 0 && totalFetched >= maxPages + if (hitLimit && syncContext) syncContext.listingCapped = true const totalSize = (data.totalSize as number) ?? 0 const nextStart = start + results.length diff --git a/apps/sim/connectors/dropbox/dropbox.ts b/apps/sim/connectors/dropbox/dropbox.ts index 68ca60eabad..87cbced05b4 100644 --- a/apps/sim/connectors/dropbox/dropbox.ts +++ b/apps/sim/connectors/dropbox/dropbox.ts @@ -206,6 +206,7 @@ export const dropboxConnector: ConnectorConfig = { const totalFetched = previouslyFetched + documents.length if (syncContext) syncContext.totalDocsFetched = totalFetched const hitLimit = maxFiles > 0 && totalFetched >= maxFiles + if (hitLimit && syncContext) syncContext.listingCapped = true return { documents, diff --git a/apps/sim/connectors/notion/notion.ts b/apps/sim/connectors/notion/notion.ts index 6c87c7ead14..4b56cc41f12 100644 --- a/apps/sim/connectors/notion/notion.ts +++ b/apps/sim/connectors/notion/notion.ts @@ -454,6 +454,7 @@ async function listFromWorkspace( const totalFetched = ((syncContext?.totalDocsFetched as number) ?? 0) + documents.length if (syncContext) syncContext.totalDocsFetched = totalFetched const hitLimit = maxPages > 0 && totalFetched >= maxPages + if (hitLimit && syncContext) syncContext.listingCapped = true const nextCursor = hitLimit ? undefined : ((data.next_cursor as string) ?? undefined) @@ -509,6 +510,7 @@ async function listFromDatabase( const totalFetched = ((syncContext?.totalDocsFetched as number) ?? 0) + documents.length if (syncContext) syncContext.totalDocsFetched = totalFetched const hitLimit = maxPages > 0 && totalFetched >= maxPages + if (hitLimit && syncContext) syncContext.listingCapped = true const nextCursor = hitLimit ? undefined : ((data.next_cursor as string) ?? 
undefined) @@ -604,6 +606,7 @@ async function listFromParentPage( const totalFetched = ((syncContext?.totalDocsFetched as number) ?? 0) + documents.length if (syncContext) syncContext.totalDocsFetched = totalFetched const hitLimit = maxPages > 0 && totalFetched >= maxPages + if (hitLimit && syncContext) syncContext.listingCapped = true const nextCursor = hitLimit ? undefined : ((data.next_cursor as string) ?? undefined) diff --git a/apps/sim/connectors/onedrive/onedrive.ts b/apps/sim/connectors/onedrive/onedrive.ts index dc7142670ed..74037741e11 100644 --- a/apps/sim/connectors/onedrive/onedrive.ts +++ b/apps/sim/connectors/onedrive/onedrive.ts @@ -239,6 +239,7 @@ export const onedriveConnector: ConnectorConfig = { const totalFetched = previouslyFetched + documents.length if (syncContext) syncContext.totalDocsFetched = totalFetched const hitLimit = maxFiles > 0 && totalFetched >= maxFiles + if (hitLimit && syncContext) syncContext.listingCapped = true const nextLink = data['@odata.nextLink'] diff --git a/apps/sim/connectors/sharepoint/sharepoint.ts b/apps/sim/connectors/sharepoint/sharepoint.ts index 452e99a8d23..2c79820b3d1 100644 --- a/apps/sim/connectors/sharepoint/sharepoint.ts +++ b/apps/sim/connectors/sharepoint/sharepoint.ts @@ -393,8 +393,8 @@ export const sharepointConnector: ConnectorConfig = { if (syncContext) syncContext.totalDocsFetched = totalFetched const hitLimit = maxFiles > 0 && totalFetched >= maxFiles + if (hitLimit && syncContext) syncContext.listingCapped = true - // Determine next cursor if (hitLimit) { return { documents, hasMore: false } } From fcde0375a26b233e762db7f3bfbb5e8848ed00a6 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Thu, 26 Mar 2026 18:49:23 -0700 Subject: [PATCH 11/11] fix(evernote): thread retryOptions through apiListTags and apiListNotebooks All calls to apiListTags and apiListNotebooks in both listDocuments and getDocument now pass retryOptions for consistent retry protection across all Thrift RPC calls. 
Co-Authored-By: Claude Opus 4.6 --- apps/sim/connectors/evernote/evernote.ts | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/apps/sim/connectors/evernote/evernote.ts b/apps/sim/connectors/evernote/evernote.ts index 5504ed9e9b6..c84adf6b0c2 100644 --- a/apps/sim/connectors/evernote/evernote.ts +++ b/apps/sim/connectors/evernote/evernote.ts @@ -393,13 +393,14 @@ export const evernoteConnector: ConnectorConfig = { syncContext?: Record ): Promise => { const notebookGuid = (sourceConfig.notebookGuid as string) || undefined + const retryOptions = { maxRetries: 3, initialDelayMs: 500 } if (syncContext && !syncContext.tagMap) { - const tags = await apiListTags(accessToken) + const tags = await apiListTags(accessToken, retryOptions) syncContext.tagMap = Object.fromEntries(tags.map((t) => [t.guid, t.name])) } if (syncContext && !syncContext.notebookMap) { - const notebooks = await apiListNotebooks(accessToken) + const notebooks = await apiListNotebooks(accessToken, retryOptions) syncContext.notebookMap = Object.fromEntries(notebooks.map((nb) => [nb.guid, nb.name])) } @@ -412,7 +413,6 @@ export const evernoteConnector: ConnectorConfig = { logger.info('Listing Evernote notes', { offset, maxNotes: NOTES_PER_PAGE }) - const retryOptions = { maxRetries: 3, initialDelayMs: 500 } const result = await apiFindNotesMetadata( accessToken, offset, @@ -468,11 +468,11 @@ export const evernoteConnector: ConnectorConfig = { const host = getHost(accessToken) if (syncContext && !syncContext.tagMap) { - const tags = await apiListTags(accessToken) + const tags = await apiListTags(accessToken, retryOptions) syncContext.tagMap = Object.fromEntries(tags.map((t) => [t.guid, t.name])) } if (syncContext && !syncContext.notebookMap) { - const notebooks = await apiListNotebooks(accessToken) + const notebooks = await apiListNotebooks(accessToken, retryOptions) syncContext.notebookMap = Object.fromEntries(notebooks.map((nb) => [nb.guid, nb.name])) } @@ -482,9 +482,9 @@ 
export const evernoteConnector: ConnectorConfig = { tagMap = syncContext.tagMap as Record notebookMap = syncContext.notebookMap as Record } else { - const tags = await apiListTags(accessToken) + const tags = await apiListTags(accessToken, retryOptions) tagMap = Object.fromEntries(tags.map((t) => [t.guid, t.name])) - const notebooks = await apiListNotebooks(accessToken) + const notebooks = await apiListNotebooks(accessToken, retryOptions) notebookMap = Object.fromEntries(notebooks.map((nb) => [nb.guid, nb.name])) }