diff --git a/apps/server/src/services/llm/embeddings/index.ts b/apps/server/src/services/llm/embeddings/index.ts index 2757f8808..c931a1745 100644 --- a/apps/server/src/services/llm/embeddings/index.ts +++ b/apps/server/src/services/llm/embeddings/index.ts @@ -65,6 +65,7 @@ export const { export const { getEmbeddingStats, reprocessAllNotes, + queueNotesForMissingEmbeddings, cleanupEmbeddings } = stats; @@ -107,6 +108,7 @@ export default { // Stats and maintenance getEmbeddingStats: stats.getEmbeddingStats, reprocessAllNotes: stats.reprocessAllNotes, + queueNotesForMissingEmbeddings: stats.queueNotesForMissingEmbeddings, cleanupEmbeddings: stats.cleanupEmbeddings, // Index operations diff --git a/apps/server/src/services/llm/embeddings/queue.ts b/apps/server/src/services/llm/embeddings/queue.ts index 12f915c81..b002513c5 100644 --- a/apps/server/src/services/llm/embeddings/queue.ts +++ b/apps/server/src/services/llm/embeddings/queue.ts @@ -282,8 +282,6 @@ export async function processEmbeddingQueue() { continue; } - // Log that we're starting to process this note - log.info(`Starting embedding generation for note ${noteId}`); // Get note context for embedding const context = await getNoteEmbeddingContext(noteId); @@ -334,7 +332,6 @@ export async function processEmbeddingQueue() { "DELETE FROM embedding_queue WHERE noteId = ?", [noteId] ); - log.info(`Successfully completed embedding processing for note ${noteId}`); // Count as successfully processed processedCount++; diff --git a/apps/server/src/services/llm/embeddings/stats.ts b/apps/server/src/services/llm/embeddings/stats.ts index 6154da368..7fa0d6d82 100644 --- a/apps/server/src/services/llm/embeddings/stats.ts +++ b/apps/server/src/services/llm/embeddings/stats.ts @@ -1,28 +1,13 @@ import sql from "../../../services/sql.js"; import log from "../../../services/log.js"; -import cls from "../../../services/cls.js"; -import { queueNoteForEmbedding } from "./queue.js"; +import indexService from '../index_service.js'; /** * Reprocess all notes to update embeddings + * @deprecated Use indexService.reprocessAllNotes() directly instead */ export async function reprocessAllNotes() { - log.info("Queueing all notes for embedding updates"); - - // Get all non-deleted note IDs - const noteIds = await sql.getColumn( - "SELECT noteId FROM notes WHERE isDeleted = 0" - ); - - log.info(`Adding ${noteIds.length} notes to embedding queue`); - - // Process each note ID within a cls context - for (const noteId of noteIds) { - // Use cls.init to ensure proper context for each operation - await cls.init(async () => { - await queueNoteForEmbedding(noteId as string, 'UPDATE'); - }); - } + return indexService.reprocessAllNotes(); } /** @@ -79,6 +64,14 @@ export async function getEmbeddingStats() { }; } +/** + * Queue notes that don't have embeddings for current provider settings + * @deprecated Use indexService.queueNotesForMissingEmbeddings() directly instead + */ +export async function queueNotesForMissingEmbeddings() { + return indexService.queueNotesForMissingEmbeddings(); +} + /** * Cleanup function to remove stale or unused embeddings */ diff --git a/apps/server/src/services/llm/index_service.ts b/apps/server/src/services/llm/index_service.ts index 9d118e274..08d79dcb1 100644 --- a/apps/server/src/services/llm/index_service.ts +++ b/apps/server/src/services/llm/index_service.ts @@ -12,6 +12,7 @@ import log from "../log.js"; import options from "../options.js"; import becca from "../../becca/becca.js"; +import beccaLoader from "../../becca/becca_loader.js"; import vectorStore from "./embeddings/index.js"; import providerManager from "./providers/providers.js"; import { ContextExtractor } from "./context/index.js"; @@ -378,11 +379,10 @@ export class IndexService { if (!shouldProcessEmbeddings) { // This instance is not configured to process embeddings - log.info("Skipping batch indexing as this instance is not configured to process embeddings"); return false; } - // Process the embedding queue + // Process the embedding queue (batch size is controlled by embeddingBatchSize option) await vectorStore.processEmbeddingQueue(); return true; @@ -879,9 +879,16 @@ export class IndexService { log.info(`Automatic embedding indexing started ${isSyncServer ? 'as sync server' : 'as client'}`); } + // Start background processing of the embedding queue + const { setupEmbeddingBackgroundProcessing } = await import('./embeddings/events.js'); + await setupEmbeddingBackgroundProcessing(); + // Re-initialize event listeners this.setupEventListeners(); + // Queue notes that don't have embeddings for current providers + await this.queueNotesForMissingEmbeddings(); + // Start processing the queue immediately await this.runBatchIndexing(20); @@ -892,6 +899,95 @@ export class IndexService { } } + + + /** + * Queue notes that don't have embeddings for current provider settings + */ + async queueNotesForMissingEmbeddings() { + try { + // Wait for becca to be fully loaded before accessing notes + await beccaLoader.beccaLoaded; + + // Get all non-deleted notes + const allNotes = Object.values(becca.notes).filter(note => !note.isDeleted); + + // Get enabled providers + const providers = await providerManager.getEnabledEmbeddingProviders(); + if (providers.length === 0) { + return; + } + + let queuedCount = 0; + let excludedCount = 0; + + // Process notes in batches to avoid overwhelming the system + const batchSize = 100; + for (let i = 0; i < allNotes.length; i += batchSize) { + const batch = allNotes.slice(i, i + batchSize); + + for (const note of batch) { + try { + // Skip notes excluded from AI + if (isNoteExcludedFromAI(note)) { + excludedCount++; + continue; + } + + // Check if note needs embeddings for any enabled provider + let needsEmbedding = false; + + for (const provider of providers) { + const config = provider.getConfig(); + const existingEmbedding = await vectorStore.getEmbeddingForNote( + note.noteId, + provider.name, + config.model + ); + + if (!existingEmbedding) { + needsEmbedding = true; + break; + } + } + + if (needsEmbedding) { + await vectorStore.queueNoteForEmbedding(note.noteId, 'UPDATE'); + queuedCount++; + } + } catch (error: any) { + log.error(`Error checking embeddings for note ${note.noteId}: ${error.message || 'Unknown error'}`); + } + } + + } + } catch (error: any) { + log.error(`Error queuing notes for missing embeddings: ${error.message || 'Unknown error'}`); + } + } + + /** + * Reprocess all notes to update embeddings + */ + async reprocessAllNotes() { + if (!this.initialized) { + await this.initialize(); + } + + try { + // Get all non-deleted note IDs + const noteIds = await sql.getColumn("SELECT noteId FROM notes WHERE isDeleted = 0"); + + // Process each note ID + for (const noteId of noteIds) { + await vectorStore.queueNoteForEmbedding(noteId as string, 'UPDATE'); + } + } catch (error: any) { + log.error(`Error reprocessing all notes: ${error.message || 'Unknown error'}`); + throw error; + } + } + /** * Stop embedding generation (called when AI is disabled) */ @@ -907,7 +1003,8 @@ export class IndexService { } // Stop the background processing from embeddings/events.ts - vectorStore.stopEmbeddingBackgroundProcessing(); + const { stopEmbeddingBackgroundProcessing } = await import('./embeddings/events.js'); + stopEmbeddingBackgroundProcessing(); // Clear all embedding providers to clean up resources providerManager.clearAllEmbeddingProviders();