diff --git a/apps/client/src/lightweight/messaging_provider.ts b/apps/client/src/lightweight/messaging_provider.ts new file mode 100644 index 000000000..28b72c964 --- /dev/null +++ b/apps/client/src/lightweight/messaging_provider.ts @@ -0,0 +1,92 @@ +import type { WebSocketMessage } from "@triliumnext/commons"; +import type { MessagingProvider, MessageHandler } from "@triliumnext/core"; + +/** + * Messaging provider for browser Worker environments. + * + * This provider uses the Worker's postMessage API to communicate + * with the main thread. It's designed to be used inside a Web Worker + * that runs the core services. + * + * Message flow: + * - Outbound (worker → main): Uses self.postMessage() with type: "WS_MESSAGE" + * - Inbound (main → worker): Listens to onmessage for type: "WS_MESSAGE" + */ +export default class WorkerMessagingProvider implements MessagingProvider { + private messageHandlers: MessageHandler[] = []; + private isDisposed = false; + + constructor() { + // Listen for incoming messages from the main thread + self.addEventListener("message", this.handleIncomingMessage); + console.log("[WorkerMessagingProvider] Initialized"); + } + + private handleIncomingMessage = (event: MessageEvent) => { + if (this.isDisposed) return; + + const { type, message } = event.data || {}; + + if (type === "WS_MESSAGE" && message) { + // Dispatch to all registered handlers + for (const handler of this.messageHandlers) { + try { + handler(message as WebSocketMessage); + } catch (e) { + console.error("[WorkerMessagingProvider] Error in message handler:", e); + } + } + } + }; + + /** + * Send a message to all clients (in this case, the main thread). + * The main thread is responsible for further distribution if needed. + */ + sendMessageToAllClients(message: WebSocketMessage): void { + if (this.isDisposed) { + console.warn("[WorkerMessagingProvider] Cannot send message - provider is disposed"); + return; + } + + try { + self.postMessage({ + type: "WS_MESSAGE", + message + }); + } catch (e) { + console.error("[WorkerMessagingProvider] Error sending message:", e); + } + } + + /** + * Subscribe to incoming messages from the main thread. + */ + onMessage(handler: MessageHandler): () => void { + this.messageHandlers.push(handler); + + return () => { + this.messageHandlers = this.messageHandlers.filter(h => h !== handler); + }; + } + + /** + * Get the number of connected "clients". + * In worker context, there's always exactly 1 client (the main thread). + */ + getClientCount(): number { + return this.isDisposed ? 0 : 1; + } + + /** + * Clean up resources. + */ + dispose(): void { + if (this.isDisposed) return; + + this.isDisposed = true; + self.removeEventListener("message", this.handleIncomingMessage); + this.messageHandlers = []; + console.log("[WorkerMessagingProvider] Disposed"); + } +} diff --git a/apps/client/src/local-server-worker.ts b/apps/client/src/local-server-worker.ts index 536e2a0f0..9d9a9df23 100644 --- a/apps/client/src/local-server-worker.ts +++ b/apps/client/src/local-server-worker.ts @@ -5,6 +5,7 @@ import BrowserExecutionContext from './lightweight/cls_provider'; import BrowserCryptoProvider from './lightweight/crypto_provider'; import BrowserSqlProvider from './lightweight/sql_provider'; +import WorkerMessagingProvider from './lightweight/messaging_provider'; import { BrowserRouter } from './lightweight/browser_router'; import { createConfiguredRouter } from './lightweight/browser_routes'; @@ -50,6 +51,9 @@ console.log("[Worker] Error handlers installed"); // Shared SQL provider instance const sqlProvider = new BrowserSqlProvider(); +// Messaging provider for worker-to-main-thread communication +const messagingProvider = new WorkerMessagingProvider(); + // Core module, router, and initialization state let coreModule: typeof import("@triliumnext/core") | null = null; let router: BrowserRouter | null = null; @@ -100,6 +104,7 @@ async function initialize(): Promise { coreModule.initializeCore({ executionContext: new BrowserExecutionContext(), crypto: new BrowserCryptoProvider(), + messaging: messagingProvider, dbConfig: { provider: sqlProvider, isReadOnly: false, diff --git a/packages/trilium-core/src/index.ts b/packages/trilium-core/src/index.ts index 561b6d8d3..3da24fa87 100644 --- a/packages/trilium-core/src/index.ts +++ b/packages/trilium-core/src/index.ts @@ -3,6 +3,7 @@ import { CryptoProvider, initCrypto } from "./services/encryption/crypto"; import { getLog, initLog } from "./services/log"; import { initSql } from "./services/sql/index"; import { SqlService, SqlServiceParams } from "./services/sql/sql"; +import { initMessaging, MessagingProvider } from "./services/messaging/index"; export type * from "./services/sql/types"; export * from "./services/sql/index"; @@ -34,6 +35,10 @@ export { default as TaskContext } from "./services/task_context"; export { default as revisions } from "./services/revisions"; export { default as erase } from "./services/erase"; +// Messaging system +export * from "./services/messaging/index"; +export type { MessagingProvider, ServerMessagingProvider, MessageClient, MessageHandler } from "./services/messaging/types"; + export { default as becca } from "./becca/becca"; export { default as becca_loader } from "./becca/becca_loader"; export { default as becca_service } from "./becca/becca_service"; @@ -58,13 +63,17 @@ export type { NoteParams } from "./services/notes"; export * as sanitize from "./services/sanitizer"; export * as routes from "./routes"; -export function initializeCore({ dbConfig, executionContext, crypto }: { +export function initializeCore({ dbConfig, executionContext, crypto, messaging }: { dbConfig: SqlServiceParams, executionContext: ExecutionContext, - crypto: CryptoProvider + crypto: CryptoProvider, + messaging?: MessagingProvider }) { initLog(); initCrypto(crypto); initSql(new SqlService(dbConfig, getLog())); initContext(executionContext); + if (messaging) { + initMessaging(messaging); + } }; diff --git a/packages/trilium-core/src/services/messaging/index.ts b/packages/trilium-core/src/services/messaging/index.ts new file mode 100644 index 000000000..1fb303676 --- /dev/null +++ b/packages/trilium-core/src/services/messaging/index.ts @@ -0,0 +1,46 @@ +import type { WebSocketMessage } from "@triliumnext/commons"; +import type { MessagingProvider } from "./types.js"; + +let messagingProvider: MessagingProvider | null = null; + +/** + * Initialize the messaging system with a provider. + * This should be called during application startup. + */ +export function initMessaging(provider: MessagingProvider): void { + messagingProvider = provider; +} + +/** + * Get the current messaging provider. + * Throws if messaging hasn't been initialized. + */ +export function getMessagingProvider(): MessagingProvider { + if (!messagingProvider) { + throw new Error("Messaging provider not initialized. Call initMessaging() first."); + } + return messagingProvider; +} + +/** + * Check if messaging has been initialized. + */ +export function isMessagingInitialized(): boolean { + return messagingProvider !== null; +} + +/** + * Send a message to all connected clients. + * This is a convenience function that uses the current provider. + */ +export function sendMessageToAllClients(message: WebSocketMessage): void { + if (!messagingProvider) { + // Silently ignore if no provider - allows core to work without messaging + console.debug("[Messaging] No provider initialized, message not sent:", message.type); + return; + } + messagingProvider.sendMessageToAllClients(message); +} + +// Re-export types +export * from "./types.js"; diff --git a/packages/trilium-core/src/services/messaging/types.ts b/packages/trilium-core/src/services/messaging/types.ts new file mode 100644 index 000000000..ae02ae061 --- /dev/null +++ b/packages/trilium-core/src/services/messaging/types.ts @@ -0,0 +1,97 @@ +import type { EntityChange, WebSocketMessage } from "@triliumnext/commons"; + +/** + * Handler function for incoming messages from clients. + */ +export type MessageHandler = (message: WebSocketMessage) => void | Promise; + +/** + * Represents a connected client that can receive messages. + */ +export interface MessageClient { + /** Unique identifier for this client */ + readonly id: string; + /** Send a message to this specific client */ + send(message: WebSocketMessage): void; + /** Check if the client is still connected */ + isConnected(): boolean; +} + +/** + * Provider interface for server-to-client messaging. + * + * This abstraction allows different transport mechanisms: + * - WebSocket for traditional server environments + * - Worker postMessage for browser environments + * - Mock implementations for testing + */ +export interface MessagingProvider { + /** + * Send a message to all connected clients. + * This is the primary method used by core services like TaskContext. + */ + sendMessageToAllClients(message: WebSocketMessage): void; + + /** + * Send a message to a specific client by ID. + * Returns false if the client is not found or disconnected. + */ + sendMessageToClient?(clientId: string, message: WebSocketMessage): boolean; + + /** + * Subscribe to incoming messages from clients. + * Returns an unsubscribe function. + */ + onMessage?(handler: MessageHandler): () => void; + + /** + * Get the number of connected clients. + */ + getClientCount?(): number; + + /** + * Called when the provider should clean up resources. + */ + dispose?(): void; +} + +/** + * Extended interface for server-side messaging with entity change support. + * This is used by the WebSocket implementation to handle entity sync. + */ +export interface ServerMessagingProvider extends MessagingProvider { + /** + * Send entity changes to all clients (for frontend-update messages). + */ + sendEntityChangesToAllClients(entityChanges: EntityChange[]): void; + + /** + * Set the last synced push ID for sync status messages. + */ + setLastSyncedPush(entityChangeId: number): void; + + /** + * Notify clients that sync pull is in progress. + */ + syncPullInProgress(): void; + + /** + * Notify clients that sync push is in progress. + */ + syncPushInProgress(): void; + + /** + * Notify clients that sync has finished. + */ + syncFinished(): void; + + /** + * Notify clients that sync has failed. + */ + syncFailed(): void; + + /** + * Request all clients to reload their frontend. + */ + reloadFrontend(reason: string): void; +} diff --git a/packages/trilium-core/src/services/ws.ts b/packages/trilium-core/src/services/ws.ts index 95a41ee20..9e390a03e 100644 --- a/packages/trilium-core/src/services/ws.ts +++ b/packages/trilium-core/src/services/ws.ts @@ -1,5 +1,20 @@ +import type { WebSocketMessage } from "@triliumnext/commons"; +import { sendMessageToAllClients as sendMessage } from "./messaging/index.js"; + +/** + * WebSocket service abstraction for core. + * + * This module provides a simple interface for sending messages to clients. + * The actual transport mechanism is provided by the messaging provider + * configured during initialization. + * + * @deprecated Use the messaging module directly instead. + */ export default { - sendMessageToAllClients(message: object) { - console.warn("Ignored ws", message); + /** + * Send a message to all connected clients. + */ + sendMessageToAllClients(message: WebSocketMessage) { + sendMessage(message); } }