From 496db062139fe575f367a453727ba0205f53f6d0 Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Sun, 7 Dec 2025 13:30:45 +0000 Subject: [PATCH] Add event handler class --- .../src/views/cursors/file-explorer.ts | 78 +- .../sync-client/src/persistence/settings.ts | 167 ++- .../src/services/websocket-manager.test.ts | 8 +- .../src/services/websocket-manager.ts | 471 ++++---- frontend/sync-client/src/sync-client.ts | 980 ++++++++-------- .../src/sync-operations/cursor-tracker.ts | 416 +++---- .../sync-operations/file-change-notifier.ts | 24 +- .../sync-client/src/sync-operations/syncer.ts | 997 ++++++++-------- .../sync-operations/unrestricted-syncer.ts | 1021 ++++++++--------- frontend/sync-client/src/tracing/logger.ts | 113 +- .../sync-client/src/tracing/sync-history.ts | 242 ++-- .../data-structures/event-listeners.test.ts | 147 +++ .../utils/data-structures/event-listeners.ts | 71 ++ .../src/utils/debugging/log-to-console.ts | 2 +- 14 files changed, 2428 insertions(+), 2309 deletions(-) create mode 100644 frontend/sync-client/src/utils/data-structures/event-listeners.test.ts create mode 100644 frontend/sync-client/src/utils/data-structures/event-listeners.ts diff --git a/frontend/obsidian-plugin/src/views/cursors/file-explorer.ts b/frontend/obsidian-plugin/src/views/cursors/file-explorer.ts index 78bf3e4f..3088c640 100644 --- a/frontend/obsidian-plugin/src/views/cursors/file-explorer.ts +++ b/frontend/obsidian-plugin/src/views/cursors/file-explorer.ts @@ -2,54 +2,54 @@ import "./file-explorer.scss"; import type { App, View } from "obsidian"; import { - utils, - type MaybeOutdatedClientCursors, - type RelativePath + utils, + type MaybeOutdatedClientCursors, + type RelativePath } from "sync-client"; const REMOTE_USER_CONTAINER_CLASS = "remote-users"; export function renderCursorsInFileExplorer( - cursors: MaybeOutdatedClientCursors[], - app: App + cursors: MaybeOutdatedClientCursors[], + app: App ): void { - const fileExplorers = app.workspace.getLeavesOfType("file-explorer"); - if (fileExplorers.length == 0) return; + const fileExplorers = app.workspace.getLeavesOfType("file-explorer"); + if (fileExplorers.length == 0) return; - const [fileExplorer] = fileExplorers; + const [fileExplorer] = fileExplorers; - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment - const fileExplorerView: View & { - fileItems: Record; // it's an internal API - } = fileExplorer.view as any; // eslint-disable-line + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + const fileExplorerView: View & { + fileItems: Record; // it's an internal API + } = fileExplorer.view as any; // eslint-disable-line - for (const key in fileExplorerView.fileItems) { - const element = - fileExplorerView.fileItems[key].el.querySelector(".tree-item-self"); + for (const key in fileExplorerView.fileItems) { + const element = + fileExplorerView.fileItems[key].el.querySelector(".tree-item-self"); - const customElement = createDiv( - { - cls: REMOTE_USER_CONTAINER_CLASS - }, - (parent) => { - cursors.forEach((cursor) => { - cursor.documentsWithCursors.forEach((document) => { - if (document.relative_path.startsWith(key)) { - parent.appendChild( - createSpan({ - text: cursor.userName, - attr: { - style: `border-color: ${utils.getRandomColor(cursor.userName)}` - } - }) - ); - } - }); - }); - } - ); + const customElement = createDiv( + { + cls: REMOTE_USER_CONTAINER_CLASS + }, + (parent) => { + cursors.forEach((cursor) => { + cursor.documentsWithCursors.forEach((document) => { + if (document.relative_path.startsWith(key)) { + parent.appendChild( + createSpan({ + text: cursor.userName, + attr: { + style: `border-color: ${utils.getRandomColor(cursor.userName)}` + } + }) + ); + } + }); + }); + } + ); - element?.querySelector("." + REMOTE_USER_CONTAINER_CLASS)?.remove(); - element?.appendChild(customElement); - } + element?.querySelector("." + REMOTE_USER_CONTAINER_CLASS)?.remove(); + element?.appendChild(customElement); + } } diff --git a/frontend/sync-client/src/persistence/settings.ts b/frontend/sync-client/src/persistence/settings.ts index 8472155a..234c99f6 100644 --- a/frontend/sync-client/src/persistence/settings.ts +++ b/frontend/sync-client/src/persistence/settings.ts @@ -1,113 +1,94 @@ import type { Logger } from "../tracing/logger"; -import { awaitAll } from "../utils/await-all"; import { Lock } from "../utils/data-structures/locks"; -import { removeFromArray } from "../utils/remove-from-array"; +import { EventListeners } from "../utils/data-structures/event-listeners"; export interface SyncSettings { - remoteUri: string; - token: string; - vaultName: string; - syncConcurrency: number; - isSyncEnabled: boolean; - maxFileSizeMB: number; - ignorePatterns: string[]; - webSocketRetryIntervalMs: number; - diffCacheSizeMB: number; - enableTelemetry: boolean; - networkRetryIntervalMs: number; - minimumSaveIntervalMs: number; + remoteUri: string; + token: string; + vaultName: string; + syncConcurrency: number; + isSyncEnabled: boolean; + maxFileSizeMB: number; + ignorePatterns: string[]; + webSocketRetryIntervalMs: number; + diffCacheSizeMB: number; + enableTelemetry: boolean; + networkRetryIntervalMs: number; + minimumSaveIntervalMs: number; } export const DEFAULT_SETTINGS: SyncSettings = { - remoteUri: "", - token: "", - vaultName: "default", - syncConcurrency: 1, - isSyncEnabled: false, - maxFileSizeMB: 10, - ignorePatterns: [], - webSocketRetryIntervalMs: 3500, - diffCacheSizeMB: 4, - enableTelemetry: false, - networkRetryIntervalMs: 1000, - minimumSaveIntervalMs: 1000 + remoteUri: "", + token: "", + vaultName: "default", + syncConcurrency: 1, + isSyncEnabled: false, + maxFileSizeMB: 10, + ignorePatterns: [], + webSocketRetryIntervalMs: 3500, + diffCacheSizeMB: 4, + enableTelemetry: false, + networkRetryIntervalMs: 1000, + minimumSaveIntervalMs: 1000 }; export class Settings { - private settings: SyncSettings; - private readonly lock: Lock = new Lock(); + private settings: SyncSettings; + private readonly lock: Lock = new Lock(); - private readonly onSettingsChangeHandlers: (( - newSettings: SyncSettings, - oldSettings: SyncSettings - ) => unknown)[] = []; + public readonly onSettingsChanged = new EventListeners< + (newSettings: SyncSettings, oldSettings: SyncSettings) => unknown + >(); - public constructor( - private readonly logger: Logger, - initialState: Partial | undefined, - private readonly saveData: (data: SyncSettings) => Promise - ) { - this.settings = { - ...DEFAULT_SETTINGS, - ...(initialState ?? {}) - }; + public constructor( + private readonly logger: Logger, + initialState: Partial | undefined, + private readonly saveData: (data: SyncSettings) => Promise + ) { + this.settings = { + ...DEFAULT_SETTINGS, + ...(initialState ?? {}) + }; - this.logger.debug( - `Loaded settings: ${JSON.stringify(this.settings, null, 2)}` - ); - } + this.logger.debug( + `Loaded settings: ${JSON.stringify(this.settings, null, 2)}` + ); + } - public getSettings(): SyncSettings { - return this.settings; - } + public getSettings(): SyncSettings { + return this.settings; + } - public addOnSettingsChangeListener( - listener: (settings: SyncSettings, oldSettings: SyncSettings) => unknown - ): void { - this.onSettingsChangeHandlers.push(listener); - } + public async setSetting( + key: T, + value: SyncSettings[T] + ): Promise { + await this.setSettings({ + [key]: value + }); + } - public removeOnSettingsChangeListener( - listener: (settings: SyncSettings, oldSettings: SyncSettings) => unknown - ): void { - removeFromArray(this.onSettingsChangeHandlers, listener); - } + public async setSettings(value: Partial): Promise { + await this.lock.withLock(async () => { + this.logger.debug( + `Updating settings with: ${JSON.stringify(value)}` + ); + const oldSettings = this.settings; + this.settings = { + ...this.settings, + ...value + }; - public async setSetting( - key: T, - value: SyncSettings[T] - ): Promise { - await this.setSettings({ - [key]: value - }); - } + await this.onSettingsChanged.triggerAsync( + this.settings, + oldSettings + ); - public async setSettings(value: Partial): Promise { - await this.lock.withLock(async () => { - this.logger.debug( - `Updating settings with: ${JSON.stringify(value)}` - ); - const oldSettings = this.settings; - this.settings = { - ...this.settings, - ...value - }; + await this.save(); + }); + } - await awaitAll( - this.onSettingsChangeHandlers - .map((handler) => { - return handler(this.settings, oldSettings); - }) - .filter((result): result is Promise => { - return result instanceof Promise; - }) - ); - - await this.save(); - }); - } - - private async save(): Promise { - await this.saveData(this.settings); - } + private async save(): Promise { + await this.saveData(this.settings); + } } diff --git a/frontend/sync-client/src/services/websocket-manager.test.ts b/frontend/sync-client/src/services/websocket-manager.test.ts index 13aca939..8dd8180a 100644 --- a/frontend/sync-client/src/services/websocket-manager.test.ts +++ b/frontend/sync-client/src/services/websocket-manager.test.ts @@ -122,7 +122,7 @@ describe("WebSocketManager", () => { MockWebSocket as unknown as typeof WebSocket ); - manager.addRemoteVaultUpdateListener(async () => { + manager.onRemoteVaultUpdateReceived.add(async () => { await new Promise((resolve) => setTimeout(resolve, 10)); }); manager.start(); @@ -152,7 +152,7 @@ describe("WebSocketManager", () => { MockWebSocket as unknown as typeof WebSocket ); - manager.addRemoteCursorsUpdateListener(async () => { + manager.onRemoteCursorsUpdateReceived.add(async () => { await new Promise((resolve) => setTimeout(resolve, 10)); }); manager.start(); @@ -227,7 +227,7 @@ describe("WebSocketManager", () => { ); let statusChangeCount = 0; - manager.addWebSocketStatusChangeListener(() => { + manager.onWebSocketStatusChanged.add(() => { statusChangeCount++; }); @@ -269,7 +269,7 @@ describe("WebSocketManager", () => { resolveListener = resolve; }); - manager.addRemoteVaultUpdateListener(async () => { + manager.onRemoteVaultUpdateReceived.add(async () => { await listenerPromise; }); diff --git a/frontend/sync-client/src/services/websocket-manager.ts b/frontend/sync-client/src/services/websocket-manager.ts index 0dc19d60..f8dc59d4 100644 --- a/frontend/sync-client/src/services/websocket-manager.ts +++ b/frontend/sync-client/src/services/websocket-manager.ts @@ -6,295 +6,260 @@ import type { CursorPositionFromClient } from "./types/CursorPositionFromClient" import type { ClientCursors } from "./types/ClientCursors"; import { createPromise } from "../utils/create-promise"; import type { WebSocketVaultUpdate } from "./types/WebSocketVaultUpdate"; -import { awaitAll } from "../utils/await-all"; import { WEBSOCKET_DISCONNECT_TIMEOUT_IN_S } from "../consts"; import { removeFromArray } from "../utils/remove-from-array"; +import { EventListeners } from "../utils/data-structures/event-listeners"; +import { awaitAll } from "../utils/await-all"; export class WebSocketManager { - private readonly webSocketStatusChangeListeners: (( - isConnected: boolean - ) => unknown)[] = []; + public readonly onWebSocketStatusChanged = new EventListeners< + (isConnected: boolean) => unknown + >(); - private readonly remoteVaultUpdateListeners: (( - update: WebSocketVaultUpdate - ) => Promise)[] = []; + public readonly onRemoteVaultUpdateReceived = new EventListeners< + (update: WebSocketVaultUpdate) => Promise + >(); - private readonly remoteCursorsUpdateListeners: (( - cursors: ClientCursors[] - ) => Promise)[] = []; + public readonly onRemoteCursorsUpdateReceived = new EventListeners< + (cursors: ClientCursors[]) => Promise + >(); - private isStopped = true; - private resolveDisconnectingPromise: null | (() => unknown) = null; - private reconnectTimeoutId: ReturnType | undefined; + private isStopped = true; + private resolveDisconnectingPromise: null | (() => unknown) = null; + private reconnectTimeoutId: ReturnType | undefined; - private readonly outstandingPromises: Promise[] = []; + private readonly outstandingPromises: Promise[] = []; - private webSocket: WebSocket | undefined; - private readonly webSocketFactoryImplementation: typeof globalThis.WebSocket; + private webSocket: WebSocket | undefined; + private readonly webSocketFactoryImplementation: typeof globalThis.WebSocket; - public constructor( - private readonly deviceId: string, - private readonly logger: Logger, - private readonly settings: Settings, - webSocketImplementation?: typeof globalThis.WebSocket - ) { - if (webSocketImplementation) { - this.webSocketFactoryImplementation = webSocketImplementation; - } else { - if ( - typeof globalThis !== "undefined" && - typeof globalThis.WebSocket === "undefined" - ) { - // eslint-disable-next-line - this.webSocketFactoryImplementation = require("ws"); // polyfill for WebSocket in Node.js - } else { - this.webSocketFactoryImplementation = WebSocket; - } - } - } + public constructor( + private readonly deviceId: string, + private readonly logger: Logger, + private readonly settings: Settings, + webSocketImplementation?: typeof globalThis.WebSocket + ) { + if (webSocketImplementation) { + this.webSocketFactoryImplementation = webSocketImplementation; + } else { + if ( + typeof globalThis !== "undefined" && + typeof globalThis.WebSocket === "undefined" + ) { + // eslint-disable-next-line + this.webSocketFactoryImplementation = require("ws"); // polyfill for WebSocket in Node.js + } else { + this.webSocketFactoryImplementation = WebSocket; + } + } + } - public get isWebSocketConnected(): boolean { - return ( - this.webSocket?.readyState === - this.webSocketFactoryImplementation.OPEN - ); - } + public get isWebSocketConnected(): boolean { + return ( + this.webSocket?.readyState === + this.webSocketFactoryImplementation.OPEN + ); + } - public addWebSocketStatusChangeListener( - listener: (isConnected: boolean) => unknown - ): void { - this.webSocketStatusChangeListeners.push(listener); - } + public start(): void { + this.isStopped = false; + this.initializeWebSocket(); + } - public addRemoteCursorsUpdateListener( - listener: (cursors: ClientCursors[]) => Promise - ): void { - this.remoteCursorsUpdateListeners.push(listener); - } + public async stop(): Promise { + const [promise, resolve] = createPromise(); + this.resolveDisconnectingPromise = resolve; - public addRemoteVaultUpdateListener( - listener: (update: WebSocketVaultUpdate) => Promise - ): void { - this.remoteVaultUpdateListeners.push(listener); - } + this.isStopped = true; - public start(): void { - this.isStopped = false; - this.initializeWebSocket(); - } + if (this.reconnectTimeoutId !== undefined) { + clearTimeout(this.reconnectTimeoutId); + this.reconnectTimeoutId = undefined; + } - public async stop(): Promise { - const [promise, resolve] = createPromise(); - this.resolveDisconnectingPromise = resolve; + this.webSocket?.close(1000, "WebSocketManager has been stopped"); - this.isStopped = true; + // eslint-disable-next-line @typescript-eslint/init-declarations + let timeoutId: ReturnType | undefined; + const timeoutPromise = new Promise((_, reject) => { + timeoutId = setTimeout(() => { + reject( + new Error( + `Timeout waiting for WebSocket to close after ${WEBSOCKET_DISCONNECT_TIMEOUT_IN_S} seconds` + ) + ); + }, WEBSOCKET_DISCONNECT_TIMEOUT_IN_S * 1000); + }); - if (this.reconnectTimeoutId !== undefined) { - clearTimeout(this.reconnectTimeoutId); - this.reconnectTimeoutId = undefined; - } + try { + while (this.isWebSocketConnected) { + await Promise.race([promise, timeoutPromise]); + } + } catch (error) { + this.logger.error( + `Error while waiting for WebSocket to close: ${String(error)}` + ); + // Force cleanup even if close didn't work + this.resolveDisconnectingPromise(); + this.resolveDisconnectingPromise = null; + } finally { + // Clear timeout to prevent unhandled rejection + if (timeoutId !== undefined) { + clearTimeout(timeoutId); + } + } - this.webSocket?.close(1000, "WebSocketManager has been stopped"); + await this.waitUntilFinished(); + } - // eslint-disable-next-line @typescript-eslint/init-declarations - let timeoutId: ReturnType | undefined; - const timeoutPromise = new Promise((_, reject) => { - timeoutId = setTimeout(() => { - reject( - new Error( - `Timeout waiting for WebSocket to close after ${WEBSOCKET_DISCONNECT_TIMEOUT_IN_S} seconds` - ) - ); - }, WEBSOCKET_DISCONNECT_TIMEOUT_IN_S * 1000); - }); + public async waitUntilFinished(): Promise { + await awaitAll(this.outstandingPromises); + } - try { - while (this.isWebSocketConnected) { - await Promise.race([promise, timeoutPromise]); - } - } catch (error) { - this.logger.error( - `Error while waiting for WebSocket to close: ${String(error)}` - ); - // Force cleanup even if close didn't work - this.resolveDisconnectingPromise(); - this.resolveDisconnectingPromise = null; - } finally { - // Clear timeout to prevent unhandled rejection - if (timeoutId !== undefined) { - clearTimeout(timeoutId); - } - } + public sendHandshakeMessage( + message: WebSocketClientMessage & { type: "handshake" } + ): void { + const { webSocket } = this; + if (!webSocket) { + throw new Error( + "WebSocket is not connected, cannot send handshake message" + ); + } - await this.waitUntilFinished(); - } + try { + webSocket.send(JSON.stringify(message)); + } catch (error) { + this.logger.error( + `Failed to send handshake message: ${String(error)}` + ); + throw error; + } + } - public async waitUntilFinished(): Promise { - await awaitAll(this.outstandingPromises); - } + public updateLocalCursors(cursorPositions: CursorPositionFromClient): void { + if (!this.isWebSocketConnected || !this.webSocket) { + // A missing cursor update is fine, we can just skip it if needed + this.logger.warn( + "WebSocket is not connected, cannot send cursor positions" + ); + return; + } - public sendHandshakeMessage( - message: WebSocketClientMessage & { type: "handshake" } - ): void { - const { webSocket } = this; - if (!webSocket) { - throw new Error( - "WebSocket is not connected, cannot send handshake message" - ); - } + const message: WebSocketClientMessage = { + type: "cursorPositions", + ...cursorPositions + }; - try { - webSocket.send(JSON.stringify(message)); - } catch (error) { - this.logger.error( - `Failed to send handshake message: ${String(error)}` - ); - throw error; - } - } + try { + this.webSocket.send(JSON.stringify(message)); + this.logger.debug( + `Sent cursor positions: ${JSON.stringify(cursorPositions)}` + ); + } catch (error) { + this.logger.warn( + `Failed to send cursor positions: ${String(error)}` + ); + } + } - public updateLocalCursors(cursorPositions: CursorPositionFromClient): void { - if (!this.isWebSocketConnected || !this.webSocket) { - // A missing cursor update is fine, we can just skip it if needed - this.logger.warn( - "WebSocket is not connected, cannot send cursor positions" - ); - return; - } + private initializeWebSocket(): void { + // Clean up old WebSocket handlers to prevent race conditions + if (this.webSocket) { + try { + // Remove handlers to prevent them from firing after new connection + this.webSocket.onopen = null; + this.webSocket.onclose = null; + this.webSocket.onmessage = null; + this.webSocket.onerror = null; + this.webSocket.close(); + } catch (e) { + this.logger.error( + `Failed to close previous WebSocket connection: ${e}` + ); + } + } - const message: WebSocketClientMessage = { - type: "cursorPositions", - ...cursorPositions - }; + const wsUri = new URL(this.settings.getSettings().remoteUri); + wsUri.protocol = wsUri.protocol === "https" ? "wss" : "ws"; + wsUri.pathname = `/vaults/${this.settings.getSettings().vaultName}/ws`; - try { - this.webSocket.send(JSON.stringify(message)); - this.logger.debug( - `Sent cursor positions: ${JSON.stringify(cursorPositions)}` - ); - } catch (error) { - this.logger.warn( - `Failed to send cursor positions: ${String(error)}` - ); - } - } + this.logger.info(`Connecting to WebSocket at ${wsUri.toString()}`); - private initializeWebSocket(): void { - // Clean up old WebSocket handlers to prevent race conditions - if (this.webSocket) { - try { - // Remove handlers to prevent them from firing after new connection - this.webSocket.onopen = null; - this.webSocket.onclose = null; - this.webSocket.onmessage = null; - this.webSocket.onerror = null; - this.webSocket.close(); - } catch (e) { - this.logger.error( - `Failed to close previous WebSocket connection: ${e}` - ); - } - } + this.webSocket = new this.webSocketFactoryImplementation(wsUri); - const wsUri = new URL(this.settings.getSettings().remoteUri); - wsUri.protocol = wsUri.protocol === "https" ? "wss" : "ws"; - wsUri.pathname = `/vaults/${this.settings.getSettings().vaultName}/ws`; + this.webSocket.onopen = (): void => { + this.logger.info("WebSocket connection opened"); + this.onWebSocketStatusChanged.trigger(true); + }; - this.logger.info(`Connecting to WebSocket at ${wsUri.toString()}`); + this.webSocket.onmessage = (event): void => { + try { + // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion + const message = JSON.parse( + event.data + ) as WebSocketServerMessage; - this.webSocket = new this.webSocketFactoryImplementation(wsUri); + // Track the message handling promise + const messageHandlingPromise = this.handleWebSocketMessage( + message + ) + .catch((error: unknown) => { + this.logger.error( + `Error handling WebSocket message: ${String(error)}` + ); + }) + .finally(() => { + removeFromArray( + this.outstandingPromises, + messageHandlingPromise + ); + }); - this.webSocket.onopen = (): void => { - this.logger.info("WebSocket connection opened"); - this.webSocketStatusChangeListeners.forEach((listener) => - listener(true) - ); - }; + void this.outstandingPromises.push(messageHandlingPromise); // ignore the returned promise + } catch (error) { + this.logger.error( + `Error parsing WebSocket message: ${String(error)}` + ); + } + }; - this.webSocket.onmessage = (event): void => { - try { - // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion - const message = JSON.parse( - event.data - ) as WebSocketServerMessage; + this.webSocket.onclose = (event): void => { + this.logger.warn( + `WebSocket closed with code ${event.code} (${event.reason == "" ? "unknown reason" : event.reason})` + ); + this.onWebSocketStatusChanged.trigger(false); - // Track the message handling promise - const messageHandlingPromise = this.handleWebSocketMessage( - message - ) - .catch((error: unknown) => { - this.logger.error( - `Error handling WebSocket message: ${String(error)}` - ); - }) - .finally(() => { - removeFromArray( - this.outstandingPromises, - messageHandlingPromise - ); - }); + if (this.isStopped) { + this.resolveDisconnectingPromise?.(); + this.resolveDisconnectingPromise = null; + } else { + this.reconnectTimeoutId = setTimeout(() => { + this.reconnectTimeoutId = undefined; + this.initializeWebSocket(); + }, this.settings.getSettings().webSocketRetryIntervalMs); + } + }; + } - void this.outstandingPromises.push(messageHandlingPromise); // ignore the returned promise - } catch (error) { - this.logger.error( - `Error parsing WebSocket message: ${String(error)}` - ); - } - }; + private async handleWebSocketMessage( + message: WebSocketServerMessage + ): Promise { + if (message.type === "vaultUpdate") { + await this.onRemoteVaultUpdateReceived.triggerAsync(message); - this.webSocket.onclose = (event): void => { - this.logger.warn( - `WebSocket closed with code ${event.code} (${event.reason == "" ? "unknown reason" : event.reason})` - ); - this.webSocketStatusChangeListeners.forEach((listener) => - listener(false) - ); + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + } else if (message.type === "cursorPositions") { + this.logger.debug( + `Received cursor positions for ${JSON.stringify(message.clients)}` + ); - if (this.isStopped) { - this.resolveDisconnectingPromise?.(); - this.resolveDisconnectingPromise = null; - } else { - this.reconnectTimeoutId = setTimeout(() => { - this.reconnectTimeoutId = undefined; - this.initializeWebSocket(); - }, this.settings.getSettings().webSocketRetryIntervalMs); - } - }; - } - - private async handleWebSocketMessage( - message: WebSocketServerMessage - ): Promise { - if (message.type === "vaultUpdate") { - await awaitAll( - this.remoteVaultUpdateListeners.map(async (listener) => { - await listener(message).catch((error: unknown) => { - this.logger.error( - `Error in vault update listener: ${String(error)}` - ); - }); - }) - ); - - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - } else if (message.type === "cursorPositions") { - this.logger.debug( - `Received cursor positions for ${JSON.stringify(message.clients)}` - ); - - await awaitAll( - this.remoteCursorsUpdateListeners.map(async (listener) => { - await listener(message.clients).catch((error: unknown) => { - this.logger.error( - `Error in cursor positions listener: ${String(error)}` - ); - }); - }) - ); - } else { - this.logger.warn( - `Received unknown message type: ${JSON.stringify(message)}` - ); - } - } + await this.onRemoteCursorsUpdateReceived.triggerAsync( + message.clients + ); + } else { + this.logger.warn( + `Received unknown message type: ${JSON.stringify(message)}` + ); + } + } } diff --git a/frontend/sync-client/src/sync-client.ts b/frontend/sync-client/src/sync-client.ts index b76da9d9..af615f52 100644 --- a/frontend/sync-client/src/sync-client.ts +++ b/frontend/sync-client/src/sync-client.ts @@ -26,495 +26,497 @@ import { FixedSizeDocumentCache } from "./utils/data-structures/fix-sized-cache" import { setUpTelemetry } from "./utils/set-up-telemetry"; import { DIFF_CACHE_SIZE_MB } from "./consts"; import { ServerConfig } from "./services/server-config"; +import { EventListeners } from "./utils/data-structures/event-listeners"; export class SyncClient { - private hasStartedOfflineSync = false; - private hasFinishedOfflineSync = false; - private hasStarted = false; - private hasBeenDestroyed = false; - private unloadTelemetry?: () => void; - - private constructor( - private readonly history: SyncHistory, - private readonly settings: Settings, - private readonly database: Database, - private readonly syncer: Syncer, - private readonly webSocketManager: WebSocketManager, - public readonly logger: Logger, - private readonly fetchController: FetchController, - private readonly cursorTracker: CursorTracker, - private readonly fileChangeNotifier: FileChangeNotifier, - private readonly contentCache: FixedSizeDocumentCache, - private readonly fileOperations: FileOperations, - private readonly serverConfig: ServerConfig, - private readonly persistence: PersistenceProvider< - Partial<{ - settings: Partial; - database: Partial; - }> - > - ) {} - - public get documentCount(): number { - return this.database.length; - } - - public get isWebSocketConnected(): boolean { - return this.webSocketManager.isWebSocketConnected; - } - public static async create({ - fs, - persistence, - fetch, - webSocket, - nativeLineEndings = "\n" - }: { - fs: FileSystemOperations; - persistence: PersistenceProvider< - Partial<{ - settings: Partial; - database: Partial; - }> - >; - fetch?: typeof globalThis.fetch; - webSocket?: typeof globalThis.WebSocket; - nativeLineEndings?: string; - }): Promise { - const logger = new Logger(); - - const deviceId = createClientId(); - - logger.info(`Creating SyncClient with client id ${deviceId}`); - - const history = new SyncHistory(logger); - - let state = (await persistence.load()) ?? { - settings: undefined, - database: undefined - }; - - const settings = new Settings( - logger, - state.settings, - async (data): Promise => { - state = { ...state, settings: data }; - // we're not rate-limiting settings saves as (1) we need to initialise the settings to know the rate limit - // and (2) settings changes are infrequent enough that rate-limiting is not necessary - await persistence.save(state); - } - ); - - const rateLimitedSave = rateLimit( - persistence.save, - () => settings.getSettings().minimumSaveIntervalMs - ); - - const database = new Database( - logger, - state.database, - async (data): Promise => { - state = { ...state, database: data }; - await rateLimitedSave(state); - } - ); - - const fetchController = new FetchController( - settings.getSettings().isSyncEnabled, - logger - ); - settings.addOnSettingsChangeListener((newSettings, oldSettings) => { - if (oldSettings.isSyncEnabled != newSettings.isSyncEnabled) { - fetchController.canFetch = newSettings.isSyncEnabled; - } - }); - - const syncService = new SyncService( - deviceId, - fetchController, - settings, - logger, - fetch - ); - - const serverConfig = new ServerConfig(syncService); - - const fileOperations = new FileOperations( - logger, - database, - fs, - serverConfig, - nativeLineEndings - ); - - const contentCache = new FixedSizeDocumentCache( - 1024 * 1024 * DIFF_CACHE_SIZE_MB - ); - const unrestrictedSyncer = new UnrestrictedSyncer( - logger, - database, - settings, - syncService, - fileOperations, - history, - contentCache, - serverConfig - ); - - const webSocketManager = new WebSocketManager( - deviceId, - logger, - settings, - webSocket - ); - - const syncer = new Syncer( - deviceId, - logger, - database, - settings, - syncService, - webSocketManager, - fileOperations, - unrestrictedSyncer - ); - - const fileChangeNotifier = new FileChangeNotifier(); - const cursorTracker = new CursorTracker( - database, - webSocketManager, - fileOperations, - fileChangeNotifier - ); - const client = new SyncClient( - history, - settings, - database, - syncer, - webSocketManager, - logger, - fetchController, - cursorTracker, - fileChangeNotifier, - contentCache, - fileOperations, - serverConfig, - persistence - ); - - logger.info("SyncClient created successfully"); - - return client; - } - - public async start(): Promise { - this.checkIfDestroyed("start"); - - if (this.hasStarted) { - throw new Error("SyncClient has already been started"); - } - this.hasStarted = true; - - if ( - !this.unloadTelemetry && - this.settings.getSettings().enableTelemetry - ) { - this.unloadTelemetry = setUpTelemetry(); - } - - this.logger.addOnMessageListener((log): void => { - if (log.level === LogLevel.ERROR && Sentry.isInitialized()) { - Sentry.captureMessage(log.message); - } - }); - - this.settings.addOnSettingsChangeListener( - this.onSettingsChange.bind(this) - ); - - if (this.settings.getSettings().isSyncEnabled) { - this.logger.info("Starting SyncClient"); - await this.startSyncing(); - this.logger.info("SyncClient has successfully started"); - } - } - - /** - * Reload settings from disk overriding current in-memory settings. - * Missing values will be filled in from DEFAULT_SETTINGS rather than - * retaining current in-memory settings. - */ - public async reloadSettings(): Promise { - this.checkIfDestroyed("reloadSettings"); - - const state = (await this.persistence.load()) ?? { - settings: undefined - }; - - const settings = { - ...DEFAULT_SETTINGS, - ...(state.settings ?? {}) - }; - - await this.setSettings(settings); - } - - public async checkConnection(): Promise { - this.checkIfDestroyed("checkConnection"); - - const server = await this.serverConfig.checkConnection(true); - return { - isSuccessful: server.isSuccessful, - serverMessage: server.message, - isWebSocketConnected: this.webSocketManager.isWebSocketConnected - }; - } - - public getHistoryEntries(): readonly HistoryEntry[] { - return this.history.entries; - } - - public addSyncHistoryUpdateListener( - listener: (stats: HistoryStats) => unknown - ): void { - this.checkIfDestroyed("addSyncHistoryUpdateListener"); - - this.history.addSyncHistoryUpdateListener(listener); - } - - /** - * Wait for the in-flight operations to finish, reset all tracking, - * and the local database but retain the settings. - * The SyncClient can be used again after calling this method. - */ - public async reset(): Promise { - this.checkIfDestroyed("reset"); - - this.logger.info( - "Stopping SyncClient to apply changed connection settings" - ); - await this.pause(); - - // clear all local state - this.logger.info("Resetting SyncClient's local state"); - this.database.reset(); - await this.database.save(); // ensure the new database reads as empty - this.resetInMemoryState(); - this.hasStartedOfflineSync = false; - this.hasFinishedOfflineSync = false; - this.serverConfig.reset(); - - await this.startSyncing(); - } - - public getSettings(): SyncSettings { - return this.settings.getSettings(); - } - - public async setSetting( - key: T, - value: SyncSettings[T] - ): Promise { - this.checkIfDestroyed("setSetting"); - - await this.settings.setSetting(key, value); - } - - public async setSettings(value: Partial): Promise { - this.checkIfDestroyed("setSettings"); - - await this.settings.setSettings(value); - } - - public addOnSettingsChangeListener( - listener: (settings: SyncSettings, oldSettings: SyncSettings) => unknown - ): void { - this.checkIfDestroyed("addOnSettingsChangeListener"); - - this.settings.addOnSettingsChangeListener(listener); - } - - public addRemainingSyncOperationsListener( - listener: (remainingOperations: number) => unknown - ): void { - this.checkIfDestroyed("addRemainingSyncOperationsListener"); - - this.syncer.addRemainingOperationsListener(listener); - } - - public addWebSocketStatusChangeListener(listener: () => unknown): void { - this.checkIfDestroyed("addWebSocketStatusChangeListener"); - - this.webSocketManager.addWebSocketStatusChangeListener(listener); - } - - public async syncLocallyCreatedFile( - relativePath: RelativePath - ): Promise { - this.checkIfDestroyed("syncLocallyCreatedFile"); - - this.fileChangeNotifier.notifyOfFileChange(relativePath); - return this.syncer.syncLocallyCreatedFile(relativePath); - } - - public async syncLocallyDeletedFile( - relativePath: RelativePath - ): Promise { - this.checkIfDestroyed("syncLocallyDeletedFile"); - - this.fileChangeNotifier.notifyOfFileChange(relativePath); - return this.syncer.syncLocallyDeletedFile(relativePath); - } - - public async syncLocallyUpdatedFile({ - oldPath, - relativePath - }: { - oldPath?: RelativePath; - relativePath: RelativePath; - }): Promise { - this.checkIfDestroyed("syncLocallyUpdatedFile"); - - this.fileChangeNotifier.notifyOfFileChange(relativePath); - return this.syncer.syncLocallyUpdatedFile({ - oldPath, - relativePath - }); - } - - public getDocumentSyncingStatus( - relativePath: RelativePath - ): DocumentSyncStatus { - this.checkIfDestroyed("getDocumentSyncingStatus"); - - if (!this.settings.getSettings().isSyncEnabled) { - return DocumentSyncStatus.SYNCING_IS_DISABLED; - } - - if (!this.syncer.isFirstSyncComplete || !this.hasFinishedOfflineSync) { - return DocumentSyncStatus.SYNCING; - } - - const document = - this.database.getLatestDocumentByRelativePath(relativePath); - if (document === undefined) { - return DocumentSyncStatus.SYNCING; - } - return document.updates.length > 0 - ? DocumentSyncStatus.SYNCING - : DocumentSyncStatus.UP_TO_DATE; - } - - public async updateLocalCursors( - documentToCursors: Record - ): Promise { - this.checkIfDestroyed("updateLocalCursors"); - - await this.cursorTracker.sendLocalCursorsToServer(documentToCursors); - } - - public addRemoteCursorsUpdateListener( - listener: (cursors: MaybeOutdatedClientCursors[]) => unknown - ): void { - this.checkIfDestroyed("addRemoteCursorsUpdateListener"); - - this.cursorTracker.addRemoteCursorsUpdateListener(listener); - } - - public async waitUntilFinished(): Promise { - this.checkIfDestroyed("waitUntilIdle"); - await this.syncer.waitUntilFinished(); - await this.webSocketManager.waitUntilFinished(); - await this.database.save(); // flush all changes to disk - } - - /** - * Completely destroy the SyncClient, cancelling all in-progress operations. - * After calling this method, the SyncClient cannot be used again. - */ - public async destroy(): Promise { - this.checkIfDestroyed("destroy"); - - // cancel everything that's in progress - await this.pause(); - - this.hasBeenDestroyed = true; - - this.resetInMemoryState(); - - this.logger.info("SyncClient has been successfully disposed"); - - this.unloadTelemetry?.(); - } - - private async startSyncing(): Promise { - this.checkIfDestroyed("startSyncing"); - this.fetchController.finishReset(); - - await this.serverConfig.initialize(); - this.webSocketManager.start(); - - if (!this.hasStartedOfflineSync) { - this.hasStartedOfflineSync = true; - await this.syncer.scheduleSyncForOfflineChanges(); - } - - this.hasFinishedOfflineSync = true; - } - - private async pause(): Promise { - this.fetchController.startReset(); - await this.webSocketManager.stop(); - await this.waitUntilFinished(); - } - - private resetInMemoryState(): void { - this.history.reset(); - this.contentCache.reset(); - // don't reset the logger - this.cursorTracker.reset(); - this.syncer.reset(); - this.fileOperations.reset(); - } - - private async onSettingsChange( - newSettings: SyncSettings, - oldSettings: SyncSettings - ): Promise { - this.checkIfDestroyed("onSettingsChange"); - - if ( - newSettings.vaultName !== oldSettings.vaultName || - newSettings.remoteUri !== oldSettings.remoteUri - ) { - await this.reset(); - } - - if (newSettings.isSyncEnabled !== oldSettings.isSyncEnabled) { - if (newSettings.isSyncEnabled) { - await this.startSyncing(); - } else { - await this.pause(); - } - } - - if (newSettings.diffCacheSizeMB !== oldSettings.diffCacheSizeMB) { - this.contentCache.resize(newSettings.diffCacheSizeMB * 1024 * 1024); - } - - if (newSettings.enableTelemetry !== oldSettings.enableTelemetry) { - if (newSettings.enableTelemetry) { - this.unloadTelemetry = setUpTelemetry(); - } else { - this.unloadTelemetry?.(); - } - } - } - - private checkIfDestroyed(origin: string): void { - if (this.hasBeenDestroyed) { - throw new Error( - `SyncClient has been destroyed and can no longer be used; called from ${origin}` - ); - } - } + private hasStartedOfflineSync = false; + private hasFinishedOfflineSync = false; + private hasStarted = false; + private hasBeenDestroyed = false; + private unloadTelemetry?: () => void; + + private constructor( + private readonly history: SyncHistory, + private readonly settings: Settings, + private readonly database: Database, + private readonly syncer: Syncer, + private readonly webSocketManager: WebSocketManager, + public readonly logger: Logger, + private readonly fetchController: FetchController, + private readonly cursorTracker: CursorTracker, + private readonly fileChangeNotifier: FileChangeNotifier, + private readonly contentCache: FixedSizeDocumentCache, + private readonly fileOperations: FileOperations, + private readonly serverConfig: ServerConfig, + private readonly persistence: PersistenceProvider< + Partial<{ + settings: Partial; + database: Partial; + }> + > + ) { } + + public get documentCount(): number { + return this.database.length; + } + + public get isWebSocketConnected(): boolean { + return this.webSocketManager.isWebSocketConnected; + } + public static async create({ + fs, + persistence, + fetch, + webSocket, + nativeLineEndings = "\n" + }: { + fs: FileSystemOperations; + persistence: PersistenceProvider< + Partial<{ + settings: Partial; + database: Partial; + }> + >; + fetch?: typeof globalThis.fetch; + webSocket?: typeof globalThis.WebSocket; + nativeLineEndings?: string; + }): Promise { + const logger = new Logger(); + + const deviceId = createClientId(); + + logger.info(`Creating SyncClient with client id ${deviceId}`); + + const history = new SyncHistory(logger); + + let state = (await persistence.load()) ?? { + settings: undefined, + database: undefined + }; + + const settings = new Settings( + logger, + state.settings, + async (data): Promise => { + state = { ...state, settings: data }; + // we're not rate-limiting settings saves as (1) we need to initialise the settings to know the rate limit + // and (2) settings changes are infrequent enough that rate-limiting is not necessary + await persistence.save(state); + } + ); + + const rateLimitedSave = rateLimit( + persistence.save, + () => settings.getSettings().minimumSaveIntervalMs + ); + + const database = new Database( + logger, + state.database, + async (data): Promise => { + state = { ...state, database: data }; + await rateLimitedSave(state); + } + ); + + const fetchController = new FetchController( + settings.getSettings().isSyncEnabled, + logger + ); + settings.onSettingsChanged.add((newSettings, oldSettings) => { + if (oldSettings.isSyncEnabled != newSettings.isSyncEnabled) { + fetchController.canFetch = newSettings.isSyncEnabled; + } + }); + + const syncService = new SyncService( + deviceId, + fetchController, + settings, + logger, + fetch + ); + + const serverConfig = new ServerConfig(syncService); + + const fileOperations = new FileOperations( + logger, + database, + fs, + serverConfig, + nativeLineEndings + ); + + const contentCache = new FixedSizeDocumentCache( + 1024 * 1024 * DIFF_CACHE_SIZE_MB + ); + const unrestrictedSyncer = new UnrestrictedSyncer( + logger, + database, + settings, + syncService, + fileOperations, + history, + contentCache, + serverConfig + ); + + const webSocketManager = new WebSocketManager( + deviceId, + logger, + settings, + webSocket + ); + + const syncer = new Syncer( + deviceId, + logger, + database, + settings, + syncService, + webSocketManager, + fileOperations, + unrestrictedSyncer + ); + + const fileChangeNotifier = new FileChangeNotifier(); + const cursorTracker = new CursorTracker( + database, + webSocketManager, + fileOperations, + fileChangeNotifier + ); + const client = new SyncClient( + history, + settings, + database, + syncer, + webSocketManager, + logger, + fetchController, + cursorTracker, + fileChangeNotifier, + contentCache, + fileOperations, + serverConfig, + persistence + ); + + logger.info("SyncClient created successfully"); + + return client; + } + + public async start(): Promise { + this.checkIfDestroyed("start"); + + if (this.hasStarted) { + throw new Error("SyncClient has already been started"); + } + this.hasStarted = true; + + if ( + !this.unloadTelemetry && + this.settings.getSettings().enableTelemetry + ) { + this.unloadTelemetry = setUpTelemetry(); + } + + this.logger.onLogEmitted.add((log): void => { + if (log.level === LogLevel.ERROR && Sentry.isInitialized()) { + Sentry.captureMessage(log.message); + } + }); + + this.settings.onSettingsChanged.add( + this.onSettingsChange.bind(this) + ); + + if (this.settings.getSettings().isSyncEnabled) { + this.logger.info("Starting SyncClient"); + await this.startSyncing(); + this.logger.info("SyncClient has successfully started"); + } + } + + /** + * Reload settings from disk overriding current in-memory settings. + * Missing values will be filled in from DEFAULT_SETTINGS rather than + * retaining current in-memory settings. + */ + public async reloadSettings(): Promise { + this.checkIfDestroyed("reloadSettings"); + + const state = (await this.persistence.load()) ?? { + settings: undefined + }; + + const settings = { + ...DEFAULT_SETTINGS, + ...(state.settings ?? {}) + }; + + await this.setSettings(settings); + } + + public async checkConnection(): Promise { + this.checkIfDestroyed("checkConnection"); + + const server = await this.serverConfig.checkConnection(true); + return { + isSuccessful: server.isSuccessful, + serverMessage: server.message, + isWebSocketConnected: this.webSocketManager.isWebSocketConnected + }; + } + + public getHistoryEntries(): readonly HistoryEntry[] { + return this.history.entries; + } + + /** + * Wait for the in-flight operations to finish, reset all tracking, + * and the local database but retain the settings. + * The SyncClient can be used again after calling this method. + */ + public async reset(): Promise { + this.checkIfDestroyed("reset"); + + this.logger.info( + "Stopping SyncClient to apply changed connection settings" + ); + await this.pause(); + + // clear all local state + this.logger.info("Resetting SyncClient's local state"); + this.database.reset(); + await this.database.save(); // ensure the new database reads as empty + this.resetInMemoryState(); + this.hasStartedOfflineSync = false; + this.hasFinishedOfflineSync = false; + this.serverConfig.reset(); + + await this.startSyncing(); + } + + public getSettings(): SyncSettings { + return this.settings.getSettings(); + } + + public async setSetting( + key: T, + value: SyncSettings[T] + ): Promise { + this.checkIfDestroyed("setSetting"); + + await this.settings.setSetting(key, value); + } + + public async setSettings(value: Partial): Promise { + this.checkIfDestroyed("setSettings"); + + await this.settings.setSettings(value); + } + + public get onSyncHistoryUpdated(): EventListeners< + (stats: HistoryStats) => unknown + > { + this.checkIfDestroyed("onSyncHistoryUpdated getter"); + return this.history.onHistoryUpdated; + } + + + + + public get onSettingsChanged(): EventListeners< + (newSettings: SyncSettings, oldSettings: SyncSettings) => unknown + > { + this.checkIfDestroyed("onSettingsChanged getter"); + return this.settings.onSettingsChanged; + } + + public get onRemainingOperationsCountChanged(): EventListeners< + (remainingOperationsCount: number) => unknown + > { + this.checkIfDestroyed("onRemainingOperationsCountChanged getter"); + return this.syncer.onRemainingOperationsCountChanged; + } + + public get onWebSocketStatusChanged(): EventListeners< + (isConnected: boolean) => unknown + > { + this.checkIfDestroyed("onWebSocketStatusChanged getter"); + return this.webSocketManager.onWebSocketStatusChanged; + } + + public async syncLocallyCreatedFile( + relativePath: RelativePath + ): Promise { + this.checkIfDestroyed("syncLocallyCreatedFile"); + + this.fileChangeNotifier.notifyOfFileChange(relativePath); + return this.syncer.syncLocallyCreatedFile(relativePath); + } + + public async syncLocallyDeletedFile( + relativePath: RelativePath + ): Promise { + this.checkIfDestroyed("syncLocallyDeletedFile"); + + this.fileChangeNotifier.notifyOfFileChange(relativePath); + return this.syncer.syncLocallyDeletedFile(relativePath); + } + + public async syncLocallyUpdatedFile({ + oldPath, + relativePath + }: { + oldPath?: RelativePath; + relativePath: RelativePath; + }): Promise { + this.checkIfDestroyed("syncLocallyUpdatedFile"); + + this.fileChangeNotifier.notifyOfFileChange(relativePath); + return this.syncer.syncLocallyUpdatedFile({ + oldPath, + relativePath + }); + } + + public getDocumentSyncingStatus( + relativePath: RelativePath + ): DocumentSyncStatus { + this.checkIfDestroyed("getDocumentSyncingStatus"); + + if (!this.settings.getSettings().isSyncEnabled) { + return DocumentSyncStatus.SYNCING_IS_DISABLED; + } + + if (!this.syncer.isFirstSyncComplete || !this.hasFinishedOfflineSync) { + return DocumentSyncStatus.SYNCING; + } + + const document = + this.database.getLatestDocumentByRelativePath(relativePath); + if (document === undefined) { + return DocumentSyncStatus.SYNCING; + } + return document.updates.length > 0 + ? DocumentSyncStatus.SYNCING + : DocumentSyncStatus.UP_TO_DATE; + } + + public async updateLocalCursors( + documentToCursors: Record + ): Promise { + this.checkIfDestroyed("updateLocalCursors"); + + await this.cursorTracker.sendLocalCursorsToServer(documentToCursors); + } + + + public get onRemoteCursorsUpdated(): EventListeners< + (cursors: MaybeOutdatedClientCursors[]) => unknown + > { + this.checkIfDestroyed("onRemoteCursorsUpdated getter"); + return this.cursorTracker.onRemoteCursorsUpdated; + } + + public async waitUntilFinished(): Promise { + this.checkIfDestroyed("waitUntilIdle"); + await this.syncer.waitUntilFinished(); + await this.webSocketManager.waitUntilFinished(); + await this.database.save(); // flush all changes to disk + } + + /** + * Completely destroy the SyncClient, cancelling all in-progress operations. + * After calling this method, the SyncClient cannot be used again. + */ + public async destroy(): Promise { + this.checkIfDestroyed("destroy"); + + // cancel everything that's in progress + await this.pause(); + + this.hasBeenDestroyed = true; + + this.resetInMemoryState(); + + this.logger.info("SyncClient has been successfully disposed"); + + this.unloadTelemetry?.(); + } + + private async startSyncing(): Promise { + this.checkIfDestroyed("startSyncing"); + this.fetchController.finishReset(); + + await this.serverConfig.initialize(); + this.webSocketManager.start(); + + if (!this.hasStartedOfflineSync) { + this.hasStartedOfflineSync = true; + await this.syncer.scheduleSyncForOfflineChanges(); + } + + this.hasFinishedOfflineSync = true; + } + + private async pause(): Promise { + this.fetchController.startReset(); + await this.webSocketManager.stop(); + await this.waitUntilFinished(); + } + + private resetInMemoryState(): void { + this.history.reset(); + this.contentCache.reset(); + // don't reset the logger + this.cursorTracker.reset(); + this.syncer.reset(); + this.fileOperations.reset(); + } + + private async onSettingsChange( + newSettings: SyncSettings, + oldSettings: SyncSettings + ): Promise { + this.checkIfDestroyed("onSettingsChange"); + + if ( + newSettings.vaultName !== oldSettings.vaultName || + newSettings.remoteUri !== oldSettings.remoteUri + ) { + await this.reset(); + } + + if (newSettings.isSyncEnabled !== oldSettings.isSyncEnabled) { + if (newSettings.isSyncEnabled) { + await this.startSyncing(); + } else { + await this.pause(); + } + } + + if (newSettings.diffCacheSizeMB !== oldSettings.diffCacheSizeMB) { + this.contentCache.resize(newSettings.diffCacheSizeMB * 1024 * 1024); + } + + if (newSettings.enableTelemetry !== oldSettings.enableTelemetry) { + if (newSettings.enableTelemetry) { + this.unloadTelemetry = setUpTelemetry(); + } else { + this.unloadTelemetry?.(); + } + } + } + + private checkIfDestroyed(origin: string): void { + if (this.hasBeenDestroyed) { + throw new Error( + `SyncClient has been destroyed and can no longer be used; called from ${origin}` + ); + } + } } diff --git a/frontend/sync-client/src/sync-operations/cursor-tracker.ts b/frontend/sync-client/src/sync-operations/cursor-tracker.ts index d4cf3c53..f60cd588 100644 --- a/frontend/sync-client/src/sync-operations/cursor-tracker.ts +++ b/frontend/sync-client/src/sync-operations/cursor-tracker.ts @@ -9,252 +9,252 @@ import { DocumentUpToDateness } from "../types/document-up-to-dateness"; import { hash } from "../utils/hash"; import type { FileChangeNotifier } from "./file-change-notifier"; import { Lock } from "../utils/data-structures/locks"; +import { EventListeners } from "../utils/data-structures/event-listeners"; // Cursor positions are updated separately from documents. However, a given cursor position is only // valid within a certain version of the document it belongs to. This class tracks previous and the latest // known remote cursor positions, and for each document, tries to return the latest cursor positions that are // not from the future. export class CursorTracker { - private readonly updateLock = new Lock(); + private readonly updateLock = new Lock(); - private knownRemoteCursors: (ClientCursors & { - upToDateness: DocumentUpToDateness; - })[] = []; + // The returned position may be accurate, if it matches the document version, or outdated, in which case + // the client has to heuristically guess it's current position based on the local edits. + public readonly onRemoteCursorsUpdated = new EventListeners< + (cursors: MaybeOutdatedClientCursors[]) => unknown + >(); - private lastLocalCursorState: DocumentWithCursors[] = []; - private lastLocalCursorStateWithoutDirtyDocuments: DocumentWithCursors[] = - []; + private knownRemoteCursors: (ClientCursors & { + upToDateness: DocumentUpToDateness; + })[] = []; - public constructor( - private readonly database: Database, - private readonly webSocketManager: WebSocketManager, - private readonly fileOperations: FileOperations, - private readonly fileChangeNotifier: FileChangeNotifier - ) { - this.webSocketManager.addRemoteCursorsUpdateListener( - async (clientCursors) => { - await this.updateLock.withLock(async () => { - // The latest message will contain all active clients, so we can delete the ones - // from the local list which are no longer active. - const allIds = new Set( - clientCursors.map((c) => c.deviceId) - ); - const updatedKnownRemoteCursors = - this.knownRemoteCursors.filter((c) => - allIds.has(c.deviceId) - ); + private lastLocalCursorState: DocumentWithCursors[] = []; + private lastLocalCursorStateWithoutDirtyDocuments: DocumentWithCursors[] = + []; - for (const cursor of clientCursors.filter((client) => - client.documentsWithCursors.every( - (doc) => doc.vault_update_id != null - ) - )) { - updatedKnownRemoteCursors.push({ - ...cursor, - upToDateness: - await this.getDocumentsUpToDateness(cursor) - }); - } + public constructor( + private readonly database: Database, + private readonly webSocketManager: WebSocketManager, + private readonly fileOperations: FileOperations, + private readonly fileChangeNotifier: FileChangeNotifier + ) { + this.webSocketManager.onRemoteCursorsUpdateReceived.add( + async (clientCursors) => { + await this.updateLock.withLock(async () => { + // The latest message will contain all active clients, so we can delete the ones + // from the local list which are no longer active. + const allIds = new Set( + clientCursors.map((c) => c.deviceId) + ); + const updatedKnownRemoteCursors = + this.knownRemoteCursors.filter((c) => + allIds.has(c.deviceId) + ); - this.knownRemoteCursors = updatedKnownRemoteCursors; - }); - } - ); + for (const cursor of clientCursors.filter((client) => + client.documentsWithCursors.every( + (doc) => doc.vault_update_id != null + ) + )) { + updatedKnownRemoteCursors.push({ + ...cursor, + upToDateness: + await this.getDocumentsUpToDateness(cursor) + }); + } - this.fileChangeNotifier.addFileChangeListener(async (relativePath) => - this.updateLock.withLock(async () => { - for (const clientCursor of this.knownRemoteCursors) { - if ( - clientCursor.documentsWithCursors.some( - (document) => - document.relative_path === relativePath - ) - ) { - clientCursor.upToDateness = - await this.getDocumentsUpToDateness(clientCursor); - } - } - }) - ); - } + this.knownRemoteCursors = updatedKnownRemoteCursors; + }); - /// Update the local cursors for the given documents. - /// Can be called frequently as it only emits an event - /// if the state has actually changed. - public async sendLocalCursorsToServer( - documentToCursors: Record - ): Promise { - const documentsWithCursors: DocumentWithCursors[] = []; + this.onRemoteCursorsUpdated.trigger( + this.getRelevantAndPruneKnownClientCursors() + ); + } + ); - for (const [relativePath, cursors] of Object.entries( - documentToCursors - )) { - const record = - this.database.getLatestDocumentByRelativePath(relativePath); - if (!record) { - continue; // Let's wait for the file to be created before sending cursors - } + this.fileChangeNotifier.onFileChanged.add(async (relativePath) => + this.updateLock.withLock(async () => { + for (const clientCursor of this.knownRemoteCursors) { + if ( + clientCursor.documentsWithCursors.some( + (document) => + document.relative_path === relativePath + ) + ) { + clientCursor.upToDateness = + await this.getDocumentsUpToDateness(clientCursor); + } + } + }) + ); + } - if (!record.metadata) { - continue; // this is a new document, no need to sync the cursors - } + /// Update the local cursors for the given documents. + /// Can be called frequently as it only emits an event + /// if the state has actually changed. + public async sendLocalCursorsToServer( + documentToCursors: Record + ): Promise { + const documentsWithCursors: DocumentWithCursors[] = []; - documentsWithCursors.push({ - relative_path: relativePath, - document_id: record.documentId, - vault_update_id: record.metadata.parentVersionId, - cursors: cursors.map(({ start, end }) => ({ - start: Math.min(start, end), - end: Math.max(start, end) - })) // the client might send directional selections - }); - } + for (const [relativePath, cursors] of Object.entries( + documentToCursors + )) { + const record = + this.database.getLatestDocumentByRelativePath(relativePath); - if ( - JSON.stringify(this.lastLocalCursorState) === - JSON.stringify(documentsWithCursors) - ) { - // Caching step to avoid reading the edited files all the time - return; - } - this.lastLocalCursorState = documentsWithCursors; + if (!record) { + continue; // Let's wait for the file to be created before sending cursors + } - for (const doc of documentsWithCursors) { - const readContent = await this.fileOperations.read( - doc.relative_path - ); - const record = this.database.getLatestDocumentByRelativePath( - doc.relative_path - ); - if (record?.metadata?.hash !== hash(readContent)) { - doc.vault_update_id = null; - } - } + if (!record.metadata) { + continue; // this is a new document, no need to sync the cursors + } - if ( - JSON.stringify(this.lastLocalCursorStateWithoutDirtyDocuments) === - JSON.stringify(documentsWithCursors) - ) { - return; - } + documentsWithCursors.push({ + relative_path: relativePath, + document_id: record.documentId, + vault_update_id: record.metadata.parentVersionId, + cursors: cursors.map(({ start, end }) => ({ + start: Math.min(start, end), + end: Math.max(start, end) + })) // the client might send directional selections + }); + } - this.lastLocalCursorStateWithoutDirtyDocuments = documentsWithCursors; + if ( + JSON.stringify(this.lastLocalCursorState) === + JSON.stringify(documentsWithCursors) + ) { + // Caching step to avoid reading the edited files all the time + return; + } + this.lastLocalCursorState = documentsWithCursors; - this.webSocketManager.updateLocalCursors({ documentsWithCursors }); - } + for (const doc of documentsWithCursors) { + const readContent = await this.fileOperations.read( + doc.relative_path + ); + const record = this.database.getLatestDocumentByRelativePath( + doc.relative_path + ); + if (record?.metadata?.hash !== hash(readContent)) { + doc.vault_update_id = null; + } + } - // The returned position may be accurate, if it matches the document version, or outdated, in which case - // the client has to heuristically guess it's current position based on the local edits. - public addRemoteCursorsUpdateListener( - listener: (cursors: MaybeOutdatedClientCursors[]) => unknown - ): void { - // CursorTracker registers its own event listener in the constructor so it must have been called before this - this.webSocketManager.addRemoteCursorsUpdateListener(async () => { - await this.updateLock.withLock(() => - listener(this.getRelevantAndPruneKnownClientCursors()) - ); - }); - } + if ( + JSON.stringify(this.lastLocalCursorStateWithoutDirtyDocuments) === + JSON.stringify(documentsWithCursors) + ) { + return; + } - public reset(): void { - this.knownRemoteCursors = []; - this.lastLocalCursorState = []; - this.lastLocalCursorStateWithoutDirtyDocuments = []; - this.updateLock.reset(); - } + this.lastLocalCursorStateWithoutDirtyDocuments = documentsWithCursors; - private getRelevantAndPruneKnownClientCursors(): MaybeOutdatedClientCursors[] { - const result: MaybeOutdatedClientCursors[] = []; - const included = new Set(); + this.webSocketManager.updateLocalCursors({ documentsWithCursors }); + } - const relevantCursors = []; - for (const clientCursors of [...this.knownRemoteCursors].reverse()) { - if (included.has(clientCursors.deviceId)) { - continue; - } - if (clientCursors.upToDateness === DocumentUpToDateness.Later) { - continue; - } + public reset(): void { + this.knownRemoteCursors = []; + this.lastLocalCursorState = []; + this.lastLocalCursorStateWithoutDirtyDocuments = []; + this.updateLock.reset(); + } - result.push({ - ...clientCursors, - isOutdated: - clientCursors.upToDateness === DocumentUpToDateness.Prior - }); + private getRelevantAndPruneKnownClientCursors(): MaybeOutdatedClientCursors[] { + const result: MaybeOutdatedClientCursors[] = []; + const included = new Set(); - included.add(clientCursors.deviceId); - relevantCursors.unshift(clientCursors); // to reverse order back to normal - } + const relevantCursors = []; + for (const clientCursors of [...this.knownRemoteCursors].reverse()) { + if (included.has(clientCursors.deviceId)) { + continue; + } - this.knownRemoteCursors = relevantCursors; + if (clientCursors.upToDateness === DocumentUpToDateness.Later) { + continue; + } - return result; - } + result.push({ + ...clientCursors, + isOutdated: + clientCursors.upToDateness === DocumentUpToDateness.Prior + }); - // We store up-to-dateness on a per-client basis to simplify the implementation. - // An individual client won't have too many documents open at once, so this is a reasonable trade-off. - private async getDocumentsUpToDateness( - clientCursor: ClientCursors - ): Promise { - const results = []; - for (const document of clientCursor.documentsWithCursors) { - results.push(await this.getDocumentUpToDateness(document)); - } + included.add(clientCursors.deviceId); + relevantCursors.unshift(clientCursors); // to reverse order back to normal + } - if ( - results.every((result) => result === DocumentUpToDateness.UpToDate) - ) { - return DocumentUpToDateness.UpToDate; - } + this.knownRemoteCursors = relevantCursors; - if ( - results.every( - (result) => - result === DocumentUpToDateness.UpToDate || - result === DocumentUpToDateness.Prior - ) - ) { - return DocumentUpToDateness.Prior; - } + return result; + } - return DocumentUpToDateness.Later; - } + // We store up-to-dateness on a per-client basis to simplify the implementation. + // An individual client won't have too many documents open at once, so this is a reasonable trade-off. + private async getDocumentsUpToDateness( + clientCursor: ClientCursors + ): Promise { + const results = []; + for (const document of clientCursor.documentsWithCursors) { + results.push(await this.getDocumentUpToDateness(document)); + } - private async getDocumentUpToDateness( - document: DocumentWithCursors - ): Promise { - const record = this.database.getLatestDocumentByRelativePath( - document.relative_path - ); + if ( + results.every((result) => result === DocumentUpToDateness.UpToDate) + ) { + return DocumentUpToDateness.UpToDate; + } - if (!record) { - // the document of the cursor must be from the future - return DocumentUpToDateness.Later; - } + if ( + results.every( + (result) => + result === DocumentUpToDateness.UpToDate || + result === DocumentUpToDateness.Prior + ) + ) { + return DocumentUpToDateness.Prior; + } - if ( - (record.metadata?.parentVersionId ?? 0) < - (document.vault_update_id ?? 0) - ) { - return DocumentUpToDateness.Later; - } else if ( - (document.vault_update_id ?? 0) < - (record.metadata?.parentVersionId ?? 0) - ) { - // the document of the cursor must be from the past - return DocumentUpToDateness.Prior; - } + return DocumentUpToDateness.Later; + } - const currentContent = await this.fileOperations.read( - document.relative_path - ); + private async getDocumentUpToDateness( + document: DocumentWithCursors + ): Promise { + const record = this.database.getLatestDocumentByRelativePath( + document.relative_path + ); - return this.database.getLatestDocumentByRelativePath( - document.relative_path - )?.metadata?.hash === hash(currentContent) - ? DocumentUpToDateness.UpToDate - : DocumentUpToDateness.Prior; - } + if (!record) { + // the document of the cursor must be from the future + return DocumentUpToDateness.Later; + } + + if ( + (record.metadata?.parentVersionId ?? 0) < + (document.vault_update_id ?? 0) + ) { + return DocumentUpToDateness.Later; + } else if ( + (document.vault_update_id ?? 0) < + (record.metadata?.parentVersionId ?? 0) + ) { + // the document of the cursor must be from the past + return DocumentUpToDateness.Prior; + } + + const currentContent = await this.fileOperations.read( + document.relative_path + ); + + return this.database.getLatestDocumentByRelativePath( + document.relative_path + )?.metadata?.hash === hash(currentContent) + ? DocumentUpToDateness.UpToDate + : DocumentUpToDateness.Prior; + } } diff --git a/frontend/sync-client/src/sync-operations/file-change-notifier.ts b/frontend/sync-client/src/sync-operations/file-change-notifier.ts index d2b40c1f..d1e49d62 100644 --- a/frontend/sync-client/src/sync-operations/file-change-notifier.ts +++ b/frontend/sync-client/src/sync-operations/file-change-notifier.ts @@ -1,22 +1,12 @@ import type { RelativePath } from "../persistence/database"; -import { removeFromArray } from "../utils/remove-from-array"; +import { EventListeners } from "../utils/data-structures/event-listeners"; export class FileChangeNotifier { - private readonly listeners: ((filePath: RelativePath) => unknown)[] = []; + public readonly onFileChanged = new EventListeners< + (filePath: RelativePath) => unknown + >(); - public addFileChangeListener( - listener: (filePath: RelativePath) => unknown - ): void { - this.listeners.push(listener); - } - - public removeFileChangeListener( - listener: (filePath: RelativePath) => unknown - ): void { - removeFromArray(this.listeners, listener); - } - - public notifyOfFileChange(filePath: RelativePath): void { - this.listeners.forEach((listener) => listener(filePath)); - } + public notifyOfFileChange(filePath: RelativePath): void { + this.onFileChanged.trigger(filePath); + } } diff --git a/frontend/sync-client/src/sync-operations/syncer.ts b/frontend/sync-client/src/sync-operations/syncer.ts index 65cd020c..e142e409 100644 --- a/frontend/sync-client/src/sync-operations/syncer.ts +++ b/frontend/sync-client/src/sync-operations/syncer.ts @@ -1,8 +1,8 @@ import type { - Database, - DocumentId, - DocumentRecord, - RelativePath + Database, + DocumentId, + DocumentRecord, + RelativePath } from "../persistence/database"; import type { SyncService } from "../services/sync-service"; import type { Logger } from "../tracing/logger"; @@ -21,504 +21,497 @@ import type { WebSocketVaultUpdate } from "../services/types/WebSocketVaultUpdat import type { WebSocketManager } from "../services/websocket-manager"; import type { WebSocketClientMessage } from "../services/types/WebSocketClientMessage"; import { awaitAll } from "../utils/await-all"; +import { EventListeners } from "../utils/data-structures/event-listeners"; export class Syncer { - private readonly remoteDocumentsLock: Locks; - private readonly remainingOperationsListeners: (( - remainingOperations: number - ) => unknown)[] = []; - - // FIFO to limit the number of concurrent sync operations - private readonly syncQueue: PQueue; - - private _isFirstSyncComplete = false; - private runningScheduleSyncForOfflineChanges: Promise | undefined; - - public constructor( - private readonly deviceId: string, - private readonly logger: Logger, - private readonly database: Database, - private readonly settings: Settings, - private readonly syncService: SyncService, - private readonly webSocketManager: WebSocketManager, - private readonly operations: FileOperations, - private readonly internalSyncer: UnrestrictedSyncer - ) { - this.syncQueue = new PQueue({ - concurrency: settings.getSettings().syncConcurrency - }); - - this.remoteDocumentsLock = new Locks(this.logger); - - settings.addOnSettingsChangeListener((newSettings, oldSettings) => { - if (newSettings.syncConcurrency !== oldSettings.syncConcurrency) { - this.syncQueue.concurrency = newSettings.syncConcurrency; - } - }); - - this.syncQueue.on("active", () => { - this.remainingOperationsListeners.forEach((listener) => { - listener(this.syncQueue.size); - }); - }); - - this.webSocketManager.addWebSocketStatusChangeListener( - (isConnected) => { - if (isConnected) { - // The JS WebSocket API doesn't support setting headers, so we have to send the token as a message - this.sendHandshakeMessage(); - } - } - ); - this.webSocketManager.addRemoteVaultUpdateListener( - this.syncRemotelyUpdatedFile.bind(this) - ); - } - - public get isFirstSyncComplete(): boolean { - return this._isFirstSyncComplete; - } - - public addRemainingOperationsListener( - listener: (remainingOperations: number) => unknown - ): void { - this.remainingOperationsListeners.push(listener); - } - - public async syncLocallyCreatedFile( - relativePath: RelativePath - ): Promise { - if ( - this.database.getLatestDocumentByRelativePath(relativePath) - ?.isDeleted === false - ) { - this.logger.debug( - `Document ${relativePath} already exists in the database, skipping` - ); - return; - } - - const [promise, resolve, reject] = createPromise(); - - const id = uuidv4(); - const document = this.database.createNewPendingDocument( - id, - relativePath, - promise - ); - - try { - await this.syncQueue.add(async () => - this.internalSyncer.unrestrictedSyncLocallyCreatedFile(document) - ); - - resolve(); - } catch (e) { - reject(e); - } finally { - this.database.removeDocumentPromise(promise); - } - } - - public async syncLocallyDeletedFile( - relativePath: RelativePath - ): Promise { - if ( - this.database.getLatestDocumentByRelativePath(relativePath) - ?.isDeleted === true - ) { - // This is must be a consequence of us deleting a file because of a remote update - // which triggered a local delete, so we don't need to do anything here. - this.logger.debug( - `Document ${relativePath} has already been markes as deleted, skipping` - ); - return; - } - - // We have to have a record of the delete in case there's an in-flight update for the same - // document which finishes after the delete has succeeded and would introduce a phantom metadata record. - this.database.delete(relativePath); - - const [promise, resolve, reject] = createPromise(); - - const document = await this.database.getResolvedDocumentByRelativePath( - relativePath, - promise - ); - - try { - await this.syncQueue.add(async () => - this.internalSyncer.unrestrictedSyncLocallyDeletedFile(document) - ); - - resolve(); - - this.database.removeDocument(document); - } catch (e) { - reject(e); - } finally { - this.database.removeDocumentPromise(promise); - } - } - - public async syncLocallyUpdatedFile({ - oldPath, - relativePath - }: { - oldPath?: RelativePath; - relativePath: RelativePath; - }): Promise { - if (oldPath !== undefined) { - // We might have moved the document in the database before calling this method, - // in that case, we mustn't move it again. - if ( - this.database.getLatestDocumentByRelativePath(relativePath) === - undefined || - this.database.getLatestDocumentByRelativePath(relativePath) - ?.isDeleted === true - ) { - if (oldPath === relativePath) { - throw new Error( - `Old path and new path are the same: ${oldPath}` - ); - } - - this.database.move(oldPath, relativePath); - } - } - - let document = - this.database.getLatestDocumentByRelativePath(relativePath); - - if ( - oldPath !== undefined && - document?.metadata?.remoteRelativePath === relativePath - ) { - this.logger.debug( - `Document ${relativePath} has been moved as a result of a remote update, skipping sync` - ); - return; - } - - if (document === undefined) { - this.logger.debug( - `Cannot find document ${relativePath} in the database, skipping` - ); - return; - } - - if (document.isDeleted) { - this.logger.debug( - `Document ${relativePath} has been deleted locally, skipping` - ); - return; - } - - const [promise, resolve, reject] = createPromise(); - - document = await this.database.getResolvedDocumentByRelativePath( - relativePath, - promise - ); - - try { - await this.syncQueue.add(async () => - this.internalSyncer.unrestrictedSyncLocallyUpdatedFile({ - oldPath, - document - }) - ); - - resolve(); - } catch (e) { - reject(e); - } finally { - this.database.removeDocumentPromise(promise); - } - } - - public async scheduleSyncForOfflineChanges(): Promise { - if (this.runningScheduleSyncForOfflineChanges !== undefined) { - this.logger.debug("Uploading local changes is already in progress"); - return this.runningScheduleSyncForOfflineChanges; - } - - try { - this.runningScheduleSyncForOfflineChanges = - this.internalScheduleSyncForOfflineChanges(); - await this.runningScheduleSyncForOfflineChanges; - this.logger.info(`All local changes have been applied remotely`); - } catch (e) { - if (e instanceof SyncResetError) { - this.logger.info( - "Failed to apply local changes remotely due to a reset" - ); - return; - } - this.logger.error( - `Not all local changes have been applied remotely: ${e}` - ); - throw e; - } finally { - this.runningScheduleSyncForOfflineChanges = undefined; - } - } - - public async waitUntilFinished(): Promise { - await this.runningScheduleSyncForOfflineChanges; - await this.syncQueue.onEmpty(); - } - - public async syncRemotelyUpdatedFile( - message: WebSocketVaultUpdate - ): Promise { - try { - const handlerPromise = awaitAll( - message.documents.map(async (document) => - this.internalSyncRemotelyUpdatedFile(document) - ) - ); - - await handlerPromise; - - if (message.isInitialSync && message.documents.length > 0) { - this.database.setLastSeenUpdateId( - message.documents - .map((document) => document.vaultUpdateId) - .reduce((a, b) => Math.max(a, b)) - ); - } - - this._isFirstSyncComplete = true; - } catch (e) { - this.logger.error(`Failed to sync remotely updated file: ${e}`); - } - } - - public reset(): void { - this._isFirstSyncComplete = false; - this.syncQueue.clear(); - this.remoteDocumentsLock.reset(); - this.runningScheduleSyncForOfflineChanges = undefined; - } - - private sendHandshakeMessage(): void { - const message: WebSocketClientMessage = { - type: "handshake", - deviceId: this.deviceId, - token: this.settings.getSettings().token, - lastSeenVaultUpdateId: this.database.getLastSeenUpdateId() - }; - this.webSocketManager.sendHandshakeMessage(message); - } - - private async internalSyncRemotelyUpdatedFile( - remoteVersion: DocumentVersionWithoutContent - ): Promise { - let document = this.database.getDocumentByDocumentId( - remoteVersion.documentId - ); - - if (document === undefined) { - // Let's avoid the same documents getting created in parallel multiple times. - // There might be multiple tasks waiting for the lock - return this.remoteDocumentsLock.withLock( - remoteVersion.documentId, - async () => { - document = this.database.getDocumentByDocumentId( - remoteVersion.documentId - ); - - // We're either the first one to get the lock, so we have to create the document in `unrestrictedSyncRemotelyUpdatedFile` - if (document === undefined) { - await this.syncQueue.add(async () => - this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile( - remoteVersion - ) - ); - } else { - const [promise, resolve, reject] = createPromise(); - - document = - await this.database.getResolvedDocumentByRelativePath( - document.relativePath, - promise - ); - - try { - await this.syncQueue.add(async () => - this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile( - remoteVersion, - document - ) - ); - - resolve(); - } catch (e) { - reject(e); - } finally { - this.database.removeDocumentPromise(promise); - } - } - - this.database.addSeenUpdateId(remoteVersion.vaultUpdateId); - } - ); - } - - // We're either the first one to get the lock, so we have to create the document in `unrestrictedSyncRemotelyUpdatedFile` - const [promise, resolve, reject] = createPromise(); - - document = await this.database.getResolvedDocumentByRelativePath( - document.relativePath, - promise - ); - - try { - await this.syncQueue.add(async () => - this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile( - remoteVersion, - document - ) - ); - - resolve(); - } catch (e) { - reject(e); - } finally { - this.database.removeDocumentPromise(promise); - } - - this.database.addSeenUpdateId(remoteVersion.vaultUpdateId); - } - - private async internalScheduleSyncForOfflineChanges(): Promise { - await this.createFakeDocumentsFromRemoteState(); - - const allLocalFiles = await this.operations.listFilesRecursively(); - this.logger.info( - `Scheduling sync for ${allLocalFiles.length} local files` - ); - - let locallyPossiblyDeletedFiles: DocumentRecord[] = []; - - for (const document of this.database.resolvedDocuments) { - if ( - !document.isDeleted && - !(await this.operations.exists(document.relativePath)) - ) { - locallyPossiblyDeletedFiles.push(document); - } - } - - await awaitAll( - allLocalFiles.map(async (relativePath) => { - if ( - this.database.getLatestDocumentByRelativePath(relativePath) - ?.metadata !== undefined - ) { - this.logger.debug( - `Document ${relativePath} might have been updated locally, scheduling sync to validate and update it` - ); - - return this.syncLocallyUpdatedFile({ - relativePath - }); - } - - // Perhaps the file has been moved; let's check by looking at the deleted files - const contentHash = await this.syncQueue.add(async () => { - const contentBytes = - await this.operations.read(relativePath); // this can throw FileNotFoundError - return hash(contentBytes); - }); - - if (contentHash == undefined) { - // The file was deleted before we had a chance to read it, no need to sync it here - return; - } - - const originalFile = findMatchingFile( - contentHash, - locallyPossiblyDeletedFiles - ); - if (originalFile !== undefined) { - // `originalFile` hasn't been deleted but it got moved instead - /* eslint-disable no-restricted-syntax -- Comparing by property, not direct equality */ - locallyPossiblyDeletedFiles = - locallyPossiblyDeletedFiles.filter( - (item) => - item.relativePath !== originalFile.relativePath - ); - /* eslint-enable no-restricted-syntax */ - - this.logger.debug( - `Document '${originalFile.relativePath}' was not found under its current path in the database but was found under a different path (${relativePath}), scheduling sync to move it` - ); - - // We're outside of the pqueue, so we need to call the public wrapper - return this.syncLocallyUpdatedFile({ - oldPath: originalFile.relativePath, - relativePath - }); - } - - this.logger.debug( - `Document ${relativePath} not found in database, scheduling sync to create it` - ); - // We're outside of the pqueue, so we need to call the public wrapper - return this.syncLocallyCreatedFile(relativePath); - }) - ); - - // this has to happen strictly after the previous awaitAll, as that one - // might have removed some of the documents from the list - await awaitAll( - locallyPossiblyDeletedFiles.map(async ({ relativePath }) => { - this.logger.debug( - `Document ${relativePath} has been deleted locally, scheduling sync to delete it` - ); - - // We're outside of the pqueue, so we need to call the public wrapper - return this.syncLocallyDeletedFile(relativePath); - }) - ); - } - - /** - * Create fake documents in the database for all files that are present locally - * and also exist remotely. This will stop the subequent syncs from duplicating - * the documents by creating the same documents from multiple clients. - */ - private async createFakeDocumentsFromRemoteState(): Promise { - if (this.database.getHasInitialSyncCompleted()) { - return; - } - - const [allLocalFiles, remote] = await awaitAll([ - this.operations.listFilesRecursively(), - this.syncQueue.add(async () => this.syncService.getAll()) - ]); - - if (remote !== undefined) { - remote.latestDocuments - .filter( - (remoteDocument) => - allLocalFiles.includes(remoteDocument.relativePath) && - !remoteDocument.isDeleted && - this.database.getDocumentByDocumentId( - remoteDocument.documentId - ) === undefined - ) - .forEach((remoteDocument) => { - this.database.createNewEmptyDocument( - remoteDocument.documentId, - remoteDocument.vaultUpdateId, - remoteDocument.relativePath - ); - }); - } - - this.database.setHasInitialSyncCompleted(true); - } + private readonly remoteDocumentsLock: Locks; + public readonly onRemainingOperationsCountChanged = new EventListeners< + (remainingOperations: number) => unknown + >(); + + // FIFO to limit the number of concurrent sync operations + private readonly syncQueue: PQueue; + + private _isFirstSyncComplete = false; + private runningScheduleSyncForOfflineChanges: Promise | undefined; + + public constructor( + private readonly deviceId: string, + private readonly logger: Logger, + private readonly database: Database, + private readonly settings: Settings, + private readonly syncService: SyncService, + private readonly webSocketManager: WebSocketManager, + private readonly operations: FileOperations, + private readonly internalSyncer: UnrestrictedSyncer + ) { + this.syncQueue = new PQueue({ + concurrency: settings.getSettings().syncConcurrency + }); + + this.remoteDocumentsLock = new Locks(this.logger); + + settings.onSettingsChanged.add((newSettings, oldSettings) => { + if (newSettings.syncConcurrency !== oldSettings.syncConcurrency) { + this.syncQueue.concurrency = newSettings.syncConcurrency; + } + }); + + this.syncQueue.on("active", () => { + this.onRemainingOperationsCountChanged.trigger(this.syncQueue.size); + }); + + this.webSocketManager.onWebSocketStatusChanged.add( + (isConnected) => { + if (isConnected) { + // The JS WebSocket API doesn't support setting headers, so we have to send the token as a message + this.sendHandshakeMessage(); + } + } + ); + this.webSocketManager.onRemoteVaultUpdateReceived.add( + this.syncRemotelyUpdatedFile.bind(this) + ); + } + + public get isFirstSyncComplete(): boolean { + return this._isFirstSyncComplete; + } + + public async syncLocallyCreatedFile( + relativePath: RelativePath + ): Promise { + if ( + this.database.getLatestDocumentByRelativePath(relativePath) + ?.isDeleted === false + ) { + this.logger.debug( + `Document ${relativePath} already exists in the database, skipping` + ); + return; + } + + const [promise, resolve, reject] = createPromise(); + + const id = uuidv4(); + const document = this.database.createNewPendingDocument( + id, + relativePath, + promise + ); + + try { + await this.syncQueue.add(async () => + this.internalSyncer.unrestrictedSyncLocallyCreatedFile(document) + ); + + resolve(); + } catch (e) { + reject(e); + } finally { + this.database.removeDocumentPromise(promise); + } + } + + public async syncLocallyDeletedFile( + relativePath: RelativePath + ): Promise { + if ( + this.database.getLatestDocumentByRelativePath(relativePath) + ?.isDeleted === true + ) { + // This is must be a consequence of us deleting a file because of a remote update + // which triggered a local delete, so we don't need to do anything here. + this.logger.debug( + `Document ${relativePath} has already been markes as deleted, skipping` + ); + return; + } + + // We have to have a record of the delete in case there's an in-flight update for the same + // document which finishes after the delete has succeeded and would introduce a phantom metadata record. + this.database.delete(relativePath); + + const [promise, resolve, reject] = createPromise(); + + const document = await this.database.getResolvedDocumentByRelativePath( + relativePath, + promise + ); + + try { + await this.syncQueue.add(async () => + this.internalSyncer.unrestrictedSyncLocallyDeletedFile(document) + ); + + resolve(); + + this.database.removeDocument(document); + } catch (e) { + reject(e); + } finally { + this.database.removeDocumentPromise(promise); + } + } + + public async syncLocallyUpdatedFile({ + oldPath, + relativePath + }: { + oldPath?: RelativePath; + relativePath: RelativePath; + }): Promise { + if (oldPath !== undefined) { + // We might have moved the document in the database before calling this method, + // in that case, we mustn't move it again. + if ( + this.database.getLatestDocumentByRelativePath(relativePath) === + undefined || + this.database.getLatestDocumentByRelativePath(relativePath) + ?.isDeleted === true + ) { + if (oldPath === relativePath) { + throw new Error( + `Old path and new path are the same: ${oldPath}` + ); + } + + this.database.move(oldPath, relativePath); + } + } + + let document = + this.database.getLatestDocumentByRelativePath(relativePath); + + if ( + oldPath !== undefined && + document?.metadata?.remoteRelativePath === relativePath + ) { + this.logger.debug( + `Document ${relativePath} has been moved as a result of a remote update, skipping sync` + ); + return; + } + + if (document === undefined) { + this.logger.debug( + `Cannot find document ${relativePath} in the database, skipping` + ); + return; + } + + if (document.isDeleted) { + this.logger.debug( + `Document ${relativePath} has been deleted locally, skipping` + ); + return; + } + + const [promise, resolve, reject] = createPromise(); + + document = await this.database.getResolvedDocumentByRelativePath( + relativePath, + promise + ); + + try { + await this.syncQueue.add(async () => + this.internalSyncer.unrestrictedSyncLocallyUpdatedFile({ + oldPath, + document + }) + ); + + resolve(); + } catch (e) { + reject(e); + } finally { + this.database.removeDocumentPromise(promise); + } + } + + public async scheduleSyncForOfflineChanges(): Promise { + if (this.runningScheduleSyncForOfflineChanges !== undefined) { + this.logger.debug("Uploading local changes is already in progress"); + return this.runningScheduleSyncForOfflineChanges; + } + + try { + this.runningScheduleSyncForOfflineChanges = + this.internalScheduleSyncForOfflineChanges(); + await this.runningScheduleSyncForOfflineChanges; + this.logger.info(`All local changes have been applied remotely`); + } catch (e) { + if (e instanceof SyncResetError) { + this.logger.info( + "Failed to apply local changes remotely due to a reset" + ); + return; + } + this.logger.error( + `Not all local changes have been applied remotely: ${e}` + ); + throw e; + } finally { + this.runningScheduleSyncForOfflineChanges = undefined; + } + } + + public async waitUntilFinished(): Promise { + await this.runningScheduleSyncForOfflineChanges; + await this.syncQueue.onEmpty(); + } + + public async syncRemotelyUpdatedFile( + message: WebSocketVaultUpdate + ): Promise { + try { + const handlerPromise = awaitAll( + message.documents.map(async (document) => + this.internalSyncRemotelyUpdatedFile(document) + ) + ); + + await handlerPromise; + + if (message.isInitialSync && message.documents.length > 0) { + this.database.setLastSeenUpdateId( + message.documents + .map((document) => document.vaultUpdateId) + .reduce((a, b) => Math.max(a, b)) + ); + } + + this._isFirstSyncComplete = true; + } catch (e) { + this.logger.error(`Failed to sync remotely updated file: ${e}`); + } + } + + public reset(): void { + this._isFirstSyncComplete = false; + this.syncQueue.clear(); + this.remoteDocumentsLock.reset(); + this.runningScheduleSyncForOfflineChanges = undefined; + } + + private sendHandshakeMessage(): void { + const message: WebSocketClientMessage = { + type: "handshake", + deviceId: this.deviceId, + token: this.settings.getSettings().token, + lastSeenVaultUpdateId: this.database.getLastSeenUpdateId() + }; + this.webSocketManager.sendHandshakeMessage(message); + } + + private async internalSyncRemotelyUpdatedFile( + remoteVersion: DocumentVersionWithoutContent + ): Promise { + let document = this.database.getDocumentByDocumentId( + remoteVersion.documentId + ); + + if (document === undefined) { + // Let's avoid the same documents getting created in parallel multiple times. + // There might be multiple tasks waiting for the lock + return this.remoteDocumentsLock.withLock( + remoteVersion.documentId, + async () => { + document = this.database.getDocumentByDocumentId( + remoteVersion.documentId + ); + + // We're either the first one to get the lock, so we have to create the document in `unrestrictedSyncRemotelyUpdatedFile` + if (document === undefined) { + await this.syncQueue.add(async () => + this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile( + remoteVersion + ) + ); + } else { + const [promise, resolve, reject] = createPromise(); + + document = + await this.database.getResolvedDocumentByRelativePath( + document.relativePath, + promise + ); + + try { + await this.syncQueue.add(async () => + this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile( + remoteVersion, + document + ) + ); + + resolve(); + } catch (e) { + reject(e); + } finally { + this.database.removeDocumentPromise(promise); + } + } + + this.database.addSeenUpdateId(remoteVersion.vaultUpdateId); + } + ); + } + + // We're either the first one to get the lock, so we have to create the document in `unrestrictedSyncRemotelyUpdatedFile` + const [promise, resolve, reject] = createPromise(); + + document = await this.database.getResolvedDocumentByRelativePath( + document.relativePath, + promise + ); + + try { + await this.syncQueue.add(async () => + this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile( + remoteVersion, + document + ) + ); + + resolve(); + } catch (e) { + reject(e); + } finally { + this.database.removeDocumentPromise(promise); + } + + this.database.addSeenUpdateId(remoteVersion.vaultUpdateId); + } + + private async internalScheduleSyncForOfflineChanges(): Promise { + await this.createFakeDocumentsFromRemoteState(); + + const allLocalFiles = await this.operations.listFilesRecursively(); + this.logger.info( + `Scheduling sync for ${allLocalFiles.length} local files` + ); + + let locallyPossiblyDeletedFiles: DocumentRecord[] = []; + + for (const document of this.database.resolvedDocuments) { + if ( + !document.isDeleted && + !(await this.operations.exists(document.relativePath)) + ) { + locallyPossiblyDeletedFiles.push(document); + } + } + + await awaitAll( + allLocalFiles.map(async (relativePath) => { + if ( + this.database.getLatestDocumentByRelativePath(relativePath) + ?.metadata !== undefined + ) { + this.logger.debug( + `Document ${relativePath} might have been updated locally, scheduling sync to validate and update it` + ); + + return this.syncLocallyUpdatedFile({ + relativePath + }); + } + + // Perhaps the file has been moved; let's check by looking at the deleted files + const contentHash = await this.syncQueue.add(async () => { + const contentBytes = + await this.operations.read(relativePath); // this can throw FileNotFoundError + return hash(contentBytes); + }); + + if (contentHash == undefined) { + // The file was deleted before we had a chance to read it, no need to sync it here + return; + } + + const originalFile = findMatchingFile( + contentHash, + locallyPossiblyDeletedFiles + ); + if (originalFile !== undefined) { + // `originalFile` hasn't been deleted but it got moved instead + /* eslint-disable no-restricted-syntax -- Comparing by property, not direct equality */ + locallyPossiblyDeletedFiles = + locallyPossiblyDeletedFiles.filter( + (item) => + item.relativePath !== originalFile.relativePath + ); + /* eslint-enable no-restricted-syntax */ + + this.logger.debug( + `Document '${originalFile.relativePath}' was not found under its current path in the database but was found under a different path (${relativePath}), scheduling sync to move it` + ); + + // We're outside of the pqueue, so we need to call the public wrapper + return this.syncLocallyUpdatedFile({ + oldPath: originalFile.relativePath, + relativePath + }); + } + + this.logger.debug( + `Document ${relativePath} not found in database, scheduling sync to create it` + ); + // We're outside of the pqueue, so we need to call the public wrapper + return this.syncLocallyCreatedFile(relativePath); + }) + ); + + // this has to happen strictly after the previous awaitAll, as that one + // might have removed some of the documents from the list + await awaitAll( + locallyPossiblyDeletedFiles.map(async ({ relativePath }) => { + this.logger.debug( + `Document ${relativePath} has been deleted locally, scheduling sync to delete it` + ); + + // We're outside of the pqueue, so we need to call the public wrapper + return this.syncLocallyDeletedFile(relativePath); + }) + ); + } + + /** + * Create fake documents in the database for all files that are present locally + * and also exist remotely. This will stop the subequent syncs from duplicating + * the documents by creating the same documents from multiple clients. + */ + private async createFakeDocumentsFromRemoteState(): Promise { + if (this.database.getHasInitialSyncCompleted()) { + return; + } + + const [allLocalFiles, remote] = await awaitAll([ + this.operations.listFilesRecursively(), + this.syncQueue.add(async () => this.syncService.getAll()) + ]); + + if (remote !== undefined) { + remote.latestDocuments + .filter( + (remoteDocument) => + allLocalFiles.includes(remoteDocument.relativePath) && + !remoteDocument.isDeleted && + this.database.getDocumentByDocumentId( + remoteDocument.documentId + ) === undefined + ) + .forEach((remoteDocument) => { + this.database.createNewEmptyDocument( + remoteDocument.documentId, + remoteDocument.vaultUpdateId, + remoteDocument.relativePath + ); + }); + } + + this.database.setHasInitialSyncCompleted(true); + } } diff --git a/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts b/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts index 53960ae9..32cfb22a 100644 --- a/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts +++ b/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts @@ -1,20 +1,20 @@ import type { - Database, - DocumentRecord, - RelativePath + Database, + DocumentRecord, + RelativePath } from "../persistence/database"; import { diff } from "reconcile-text"; import type { SyncService } from "../services/sync-service"; import type { Logger } from "../tracing/logger"; import type { - CommonHistoryEntry, - SyncCreateDetails, - SyncDeleteDetails, - SyncDetails, - SyncHistory, - SyncMovedDetails, - SyncUpdateDetails + CommonHistoryEntry, + SyncCreateDetails, + SyncDeleteDetails, + SyncDetails, + SyncHistory, + SyncMovedDetails, + SyncUpdateDetails } from "../tracing/sync-history"; import { SyncStatus, SyncType } from "../tracing/sync-history"; import { EMPTY_HASH, hash } from "../utils/hash"; @@ -35,561 +35,560 @@ import { isBinary } from "../utils/is-binary"; import type { ServerConfig } from "../services/server-config"; export class UnrestrictedSyncer { - private ignorePatterns: RegExp[]; + private ignorePatterns: RegExp[]; - public constructor( - private readonly logger: Logger, - private readonly database: Database, - private readonly settings: Settings, - private readonly syncService: SyncService, - private readonly operations: FileOperations, - private readonly history: SyncHistory, - private readonly contentCache: FixedSizeDocumentCache, - private readonly serverConfig: ServerConfig - ) { - this.ignorePatterns = globsToRegexes( - this.settings.getSettings().ignorePatterns, - this.logger - ); + public constructor( + private readonly logger: Logger, + private readonly database: Database, + private readonly settings: Settings, + private readonly syncService: SyncService, + private readonly operations: FileOperations, + private readonly history: SyncHistory, + private readonly contentCache: FixedSizeDocumentCache, + private readonly serverConfig: ServerConfig + ) { + this.ignorePatterns = globsToRegexes( + this.settings.getSettings().ignorePatterns, + this.logger + ); - this.settings.addOnSettingsChangeListener((newSettings) => { - this.ignorePatterns = globsToRegexes( - newSettings.ignorePatterns, - this.logger - ); - }); - } + this.settings.onSettingsChanged.add((newSettings) => { + this.ignorePatterns = globsToRegexes( + newSettings.ignorePatterns, + this.logger + ); + }); + } - public async unrestrictedSyncLocallyCreatedFile( - document: DocumentRecord - ): Promise { - const updateDetails: SyncCreateDetails = { - type: SyncType.CREATE, - relativePath: document.relativePath - }; + public async unrestrictedSyncLocallyCreatedFile( + document: DocumentRecord + ): Promise { + const updateDetails: SyncCreateDetails = { + type: SyncType.CREATE, + relativePath: document.relativePath + }; - return this.executeSync(updateDetails, async () => { - const originalRelativePath = document.relativePath; - if (document.isDeleted) { - this.logger.debug( - `Document ${originalRelativePath} has been already deleted, no need to create it` - ); - return; - } + return this.executeSync(updateDetails, async () => { + const originalRelativePath = document.relativePath; + if (document.isDeleted) { + this.logger.debug( + `Document ${originalRelativePath} has been already deleted, no need to create it` + ); + return; + } - const contentBytes = - await this.operations.read(originalRelativePath); // this can throw FileNotFoundError - const contentHash = hash(contentBytes); + const contentBytes = + await this.operations.read(originalRelativePath); // this can throw FileNotFoundError + const contentHash = hash(contentBytes); - const response = await this.syncService.create({ - documentId: document.documentId, - relativePath: originalRelativePath, - contentBytes - }); + const response = await this.syncService.create({ + documentId: document.documentId, + relativePath: originalRelativePath, + contentBytes + }); - // In case a document with the same name (but different ID) had existed remotely that we haven't known about - if (response.relativePath != originalRelativePath) { - this.logger.debug( - `Document ${originalRelativePath} has been created remotely at a different path: ${response.relativePath}, moving it locally` - ); - await this.operations.move( - document.relativePath, - response.relativePath - ); // this can throw FileNotFoundError - } + // In case a document with the same name (but different ID) had existed remotely that we haven't known about + if (response.relativePath != originalRelativePath) { + this.logger.debug( + `Document ${originalRelativePath} has been created remotely at a different path: ${response.relativePath}, moving it locally` + ); + await this.operations.move( + document.relativePath, + response.relativePath + ); // this can throw FileNotFoundError + } - this.database.updateDocumentMetadata( - { - parentVersionId: response.vaultUpdateId, - hash: contentHash, - remoteRelativePath: response.relativePath - }, - document - ); + this.database.updateDocumentMetadata( + { + parentVersionId: response.vaultUpdateId, + hash: contentHash, + remoteRelativePath: response.relativePath + }, + document + ); - this.database.addSeenUpdateId(response.vaultUpdateId); - this.updateCache( - response.vaultUpdateId, - contentBytes, - response.relativePath - ); + this.database.addSeenUpdateId(response.vaultUpdateId); + this.updateCache( + response.vaultUpdateId, + contentBytes, + response.relativePath + ); - this.history.addHistoryEntry({ - status: SyncStatus.SUCCESS, - details: updateDetails, - message: `Successfully uploaded locally created file` - }); - }); - } + this.history.addHistoryEntry({ + status: SyncStatus.SUCCESS, + details: updateDetails, + message: `Successfully uploaded locally created file` + }); + }); + } - public async unrestrictedSyncLocallyDeletedFile( - document: DocumentRecord - ): Promise { - const updateDetails: SyncDeleteDetails = { - type: SyncType.DELETE, - relativePath: document.relativePath - }; + public async unrestrictedSyncLocallyDeletedFile( + document: DocumentRecord + ): Promise { + const updateDetails: SyncDeleteDetails = { + type: SyncType.DELETE, + relativePath: document.relativePath + }; - await this.executeSync(updateDetails, async () => { - const response = await this.syncService.delete({ - documentId: document.documentId, - relativePath: document.relativePath - }); + await this.executeSync(updateDetails, async () => { + const response = await this.syncService.delete({ + documentId: document.documentId, + relativePath: document.relativePath + }); - this.database.updateDocumentMetadata( - { - parentVersionId: response.vaultUpdateId, - hash: EMPTY_HASH, - remoteRelativePath: document.relativePath - }, - document - ); + this.database.updateDocumentMetadata( + { + parentVersionId: response.vaultUpdateId, + hash: EMPTY_HASH, + remoteRelativePath: document.relativePath + }, + document + ); - this.database.addSeenUpdateId(response.vaultUpdateId); + this.database.addSeenUpdateId(response.vaultUpdateId); - this.history.addHistoryEntry({ - status: SyncStatus.SUCCESS, - details: updateDetails, - message: `Successfully deleted locally deleted file on the server`, - author: response.userId - }); - }); - } + this.history.addHistoryEntry({ + status: SyncStatus.SUCCESS, + details: updateDetails, + message: `Successfully deleted locally deleted file on the server`, + author: response.userId + }); + }); + } - public async unrestrictedSyncLocallyUpdatedFile({ - oldPath, - document, - // We use the same code path for both local and remote updates. We need to force the update - // if there are no local changes but we know that the remote version is newer. - force = false - }: { - oldPath?: RelativePath; - force?: boolean; - document: DocumentRecord; - }): Promise { - const updateDetails: SyncUpdateDetails | SyncMovedDetails = - oldPath !== undefined - ? { - type: SyncType.MOVE, - relativePath: document.relativePath, - movedFrom: oldPath - } - : { - type: SyncType.UPDATE, - relativePath: document.relativePath - }; + public async unrestrictedSyncLocallyUpdatedFile({ + oldPath, + document, + // We use the same code path for both local and remote updates. We need to force the update + // if there are no local changes but we know that the remote version is newer. + force = false + }: { + oldPath?: RelativePath; + force?: boolean; + document: DocumentRecord; + }): Promise { + const updateDetails: SyncUpdateDetails | SyncMovedDetails = + oldPath !== undefined + ? { + type: SyncType.MOVE, + relativePath: document.relativePath, + movedFrom: oldPath + } + : { + type: SyncType.UPDATE, + relativePath: document.relativePath + }; - await this.executeSync(updateDetails, async () => { - const originalRelativePath = document.relativePath; + await this.executeSync(updateDetails, async () => { + const originalRelativePath = document.relativePath; - if (document.isDeleted || document.metadata === undefined) { - this.logger.debug( - `Document ${document.relativePath} has been already deleted, no need to update it` - ); - return; - } + if (document.isDeleted || document.metadata === undefined) { + this.logger.debug( + `Document ${document.relativePath} has been already deleted, no need to update it` + ); + return; + } - const contentBytes = await this.operations.read( - document.relativePath - ); // this can throw FileNotFoundError - let contentHash = hash(contentBytes); + const contentBytes = await this.operations.read( + document.relativePath + ); // this can throw FileNotFoundError + let contentHash = hash(contentBytes); - const areThereLocalChanges = !( - document.metadata.hash === contentHash && oldPath === undefined - ); + const areThereLocalChanges = !( + document.metadata.hash === contentHash && oldPath === undefined + ); - let response: DocumentVersion | DocumentUpdateResponse | undefined = - undefined; + let response: DocumentVersion | DocumentUpdateResponse | undefined = + undefined; - if (areThereLocalChanges) { - const isText = - !isBinary(contentBytes) && - isFileTypeMergable( - document.relativePath, - this.serverConfig.getConfig().mergeableFileExtensions - ); - const cachedVersion = this.contentCache.get( - document.metadata.parentVersionId - ); + if (areThereLocalChanges) { + const isText = + !isBinary(contentBytes) && + isFileTypeMergable( + document.relativePath, + this.serverConfig.getConfig().mergeableFileExtensions + ); + const cachedVersion = this.contentCache.get( + document.metadata.parentVersionId + ); - response = - isText && cachedVersion !== undefined - ? await this.syncService.putText({ - documentId: document.documentId, - parentVersionId: - document.metadata.parentVersionId, - relativePath: document.relativePath, - content: diff( - new TextDecoder().decode(cachedVersion), - new TextDecoder().decode(contentBytes) - ) - }) - : await this.syncService.putBinary({ - documentId: document.documentId, - parentVersionId: - document.metadata.parentVersionId, - relativePath: document.relativePath, - contentBytes - }); - } else { - if (!force) { - this.logger.debug( - `File hash of ${document.relativePath} matches with last synced version and the path hasn't changed; no need to sync` - ); - return; - } + response = + isText && cachedVersion !== undefined + ? await this.syncService.putText({ + documentId: document.documentId, + parentVersionId: + document.metadata.parentVersionId, + relativePath: document.relativePath, + content: diff( + new TextDecoder().decode(cachedVersion), + new TextDecoder().decode(contentBytes) + ) + }) + : await this.syncService.putBinary({ + documentId: document.documentId, + parentVersionId: + document.metadata.parentVersionId, + relativePath: document.relativePath, + contentBytes + }); + } else { + if (!force) { + this.logger.debug( + `File hash of ${document.relativePath} matches with last synced version and the path hasn't changed; no need to sync` + ); + return; + } - response = await this.syncService.get({ - documentId: document.documentId - }); - } + response = await this.syncService.get({ + documentId: document.documentId + }); + } - // `document` is mutable and reflects the latest state in the local database - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - if (document.isDeleted) { - this.logger.info( - `Document ${document.relativePath} has been deleted before we could finish updating it` - ); - this.database.addSeenUpdateId(response.vaultUpdateId); - return; - } + // `document` is mutable and reflects the latest state in the local database + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + if (document.isDeleted) { + this.logger.info( + `Document ${document.relativePath} has been deleted before we could finish updating it` + ); + this.database.addSeenUpdateId(response.vaultUpdateId); + return; + } - if ( - // `Syncer` creates fake local document metadata for all remote docs with invalid hashes. The parent IDs will likely match - // the latest versions so we still need to update the local versions to turn the fakes into real metadata. - document.metadata.parentVersionId > response.vaultUpdateId - ) { - this.logger.debug( - `Document ${document.relativePath} is already more up to date than the fetched version` - ); - this.database.addSeenUpdateId(response.vaultUpdateId); // in case the previous `vaultUpdateId` update hasn't made it through - return; - } + if ( + // `Syncer` creates fake local document metadata for all remote docs with invalid hashes. The parent IDs will likely match + // the latest versions so we still need to update the local versions to turn the fakes into real metadata. + document.metadata.parentVersionId > response.vaultUpdateId + ) { + this.logger.debug( + `Document ${document.relativePath} is already more up to date than the fetched version` + ); + this.database.addSeenUpdateId(response.vaultUpdateId); // in case the previous `vaultUpdateId` update hasn't made it through + return; + } - if (response.isDeleted) { - return this.applyRemoteDeleteLocally(document, response); - } + if (response.isDeleted) { + return this.applyRemoteDeleteLocally(document, response); + } - let actualPath = document.relativePath; + let actualPath = document.relativePath; - if (response.relativePath != originalRelativePath) { - actualPath = response.relativePath; - // Make sure to update the remote relative path to avoid uploading - // the file as a result of this filesystem event. - document.metadata.remoteRelativePath = response.relativePath; - await this.operations.move( - document.relativePath, - response.relativePath - ); // this can throw FileNotFoundError - } + if (response.relativePath != originalRelativePath) { + actualPath = response.relativePath; + // Make sure to update the remote relative path to avoid uploading + // the file as a result of this filesystem event. + document.metadata.remoteRelativePath = response.relativePath; + await this.operations.move( + document.relativePath, + response.relativePath + ); // this can throw FileNotFoundError + } - if (!("type" in response) || response.type === "MergingUpdate") { - const responseBytes = base64ToBytes(response.contentBase64); - contentHash = hash(responseBytes); + if (!("type" in response) || response.type === "MergingUpdate") { + const responseBytes = base64ToBytes(response.contentBase64); + contentHash = hash(responseBytes); - this.database.updateDocumentMetadata( - { - parentVersionId: response.vaultUpdateId, - hash: contentHash, - remoteRelativePath: response.relativePath - }, - document - ); - await this.operations.write( - actualPath, - contentBytes, - responseBytes - ); - this.updateCache( - response.vaultUpdateId, - responseBytes, - actualPath - ); + this.database.updateDocumentMetadata( + { + parentVersionId: response.vaultUpdateId, + hash: contentHash, + remoteRelativePath: response.relativePath + }, + document + ); + await this.operations.write( + actualPath, + contentBytes, + responseBytes + ); + this.updateCache( + response.vaultUpdateId, + responseBytes, + actualPath + ); - if (!force) { - this.history.addHistoryEntry({ - status: SyncStatus.SUCCESS, - details: updateDetails, - message: `The file we updated had been updated remotely, so we downloaded the merged version` - }); - } - } else { - this.database.updateDocumentMetadata( - { - parentVersionId: response.vaultUpdateId, - hash: contentHash, - remoteRelativePath: response.relativePath - }, - document - ); - this.updateCache( - response.vaultUpdateId, - contentBytes, - actualPath - ); - } + if (!force) { + this.history.addHistoryEntry({ + status: SyncStatus.SUCCESS, + details: updateDetails, + message: `The file we updated had been updated remotely, so we downloaded the merged version` + }); + } + } else { + this.database.updateDocumentMetadata( + { + parentVersionId: response.vaultUpdateId, + hash: contentHash, + remoteRelativePath: response.relativePath + }, + document + ); + this.updateCache( + response.vaultUpdateId, + contentBytes, + actualPath + ); + } - this.database.addSeenUpdateId(response.vaultUpdateId); + this.database.addSeenUpdateId(response.vaultUpdateId); - const actualUpdateDetails: SyncUpdateDetails | SyncMovedDetails = - oldPath !== undefined || - response.relativePath != originalRelativePath - ? { - type: SyncType.MOVE, - relativePath: response.relativePath, - movedFrom: originalRelativePath - } - : { - type: SyncType.UPDATE, - relativePath: response.relativePath - }; + const actualUpdateDetails: SyncUpdateDetails | SyncMovedDetails = + oldPath !== undefined || + response.relativePath != originalRelativePath + ? { + type: SyncType.MOVE, + relativePath: response.relativePath, + movedFrom: originalRelativePath + } + : { + type: SyncType.UPDATE, + relativePath: response.relativePath + }; - if (areThereLocalChanges) { - this.history.addHistoryEntry({ - status: SyncStatus.SUCCESS, - details: actualUpdateDetails, - message: `Successfully uploaded locally updated file to the server`, - author: response.userId - }); - } else { - this.history.addHistoryEntry({ - status: SyncStatus.SUCCESS, - details: actualUpdateDetails, - message: `Successfully downloaded remotely updated file from the server`, - author: response.userId, - timestamp: new Date(response.updatedDate) - }); - } - }); - } + if (areThereLocalChanges) { + this.history.addHistoryEntry({ + status: SyncStatus.SUCCESS, + details: actualUpdateDetails, + message: `Successfully uploaded locally updated file to the server`, + author: response.userId + }); + } else { + this.history.addHistoryEntry({ + status: SyncStatus.SUCCESS, + details: actualUpdateDetails, + message: `Successfully downloaded remotely updated file from the server`, + author: response.userId, + timestamp: new Date(response.updatedDate) + }); + } + }); + } - public async unrestrictedSyncRemotelyUpdatedFile( - remoteVersion: DocumentVersionWithoutContent, - document?: DocumentRecord - ): Promise { - const updateDetails: SyncCreateDetails = { - type: SyncType.CREATE, - relativePath: remoteVersion.relativePath - }; + public async unrestrictedSyncRemotelyUpdatedFile( + remoteVersion: DocumentVersionWithoutContent, + document?: DocumentRecord + ): Promise { + const updateDetails: SyncCreateDetails = { + type: SyncType.CREATE, + relativePath: remoteVersion.relativePath + }; - await this.executeSync(updateDetails, async () => { - if (document?.metadata !== undefined) { - // If the file exists locally, let's pretend the user has updated it - // and deal with remote update/deletion within `unrestrictedSyncLocallyUpdatedFile` - if ( - document.metadata.parentVersionId >= - remoteVersion.vaultUpdateId - ) { - this.logger.debug( - `Document ${remoteVersion.relativePath} is already at least as up to date as the fetched version` - ); + await this.executeSync(updateDetails, async () => { + if (document?.metadata !== undefined) { + // If the file exists locally, let's pretend the user has updated it + // and deal with remote update/deletion within `unrestrictedSyncLocallyUpdatedFile` + if ( + document.metadata.parentVersionId >= + remoteVersion.vaultUpdateId + ) { + this.logger.debug( + `Document ${remoteVersion.relativePath} is already at least as up to date as the fetched version` + ); - return; - } + return; + } - return this.unrestrictedSyncLocallyUpdatedFile({ - document, - force: true - }); - } else if (remoteVersion.isDeleted) { - // Either the document hasn't made it to us before and therefore we don't need to delete it, - // or we already have it, in which case the preceeding if would've dealt with it - this.logger.debug( - `Document ${remoteVersion.relativePath} has been deleted remotely, no need to sync` - ); - return; - } + return this.unrestrictedSyncLocallyUpdatedFile({ + document, + force: true + }); + } else if (remoteVersion.isDeleted) { + // Either the document hasn't made it to us before and therefore we don't need to delete it, + // or we already have it, in which case the preceeding if would've dealt with it + this.logger.debug( + `Document ${remoteVersion.relativePath} has been deleted remotely, no need to sync` + ); + return; + } - // Don't download oversized files - const historyEntryForSkippedOversizedFile = - this.getHistoryEntryForSkippedOversizedFile( - remoteVersion.contentSize, - remoteVersion.relativePath - ); - if (historyEntryForSkippedOversizedFile !== undefined) { - this.history.addHistoryEntry( - historyEntryForSkippedOversizedFile - ); - return; - } + // Don't download oversized files + const historyEntryForSkippedOversizedFile = + this.getHistoryEntryForSkippedOversizedFile( + remoteVersion.contentSize, + remoteVersion.relativePath + ); + if (historyEntryForSkippedOversizedFile !== undefined) { + this.history.addHistoryEntry( + historyEntryForSkippedOversizedFile + ); + return; + } - const contentBytes = - await this.syncService.getDocumentVersionContent({ - documentId: remoteVersion.documentId, - vaultUpdateId: remoteVersion.vaultUpdateId - }); + const contentBytes = + await this.syncService.getDocumentVersionContent({ + documentId: remoteVersion.documentId, + vaultUpdateId: remoteVersion.vaultUpdateId + }); - // We're trying to create an entirely new document that didn't exist locally - document = this.database.getDocumentByDocumentId( - remoteVersion.documentId - ); - // It can happen that a concurrent sync operation has already created the document, so we can bail here - if (document !== undefined) { - this.logger.debug( - `Document ${remoteVersion.relativePath} has already been created locally, no need to create it again` - ); - return; - } + // We're trying to create an entirely new document that didn't exist locally + document = this.database.getDocumentByDocumentId( + remoteVersion.documentId + ); + // It can happen that a concurrent sync operation has already created the document, so we can bail here + if (document !== undefined) { + this.logger.debug( + `Document ${remoteVersion.relativePath} has already been created locally, no need to create it again` + ); + return; + } - await this.operations.ensureClearPath(remoteVersion.relativePath); + await this.operations.ensureClearPath(remoteVersion.relativePath); - const [promise, resolve] = createPromise(); - this.database.updateDocumentMetadata( - { - parentVersionId: remoteVersion.vaultUpdateId, - hash: hash(contentBytes), - remoteRelativePath: remoteVersion.relativePath - }, - this.database.createNewPendingDocument( - remoteVersion.documentId, - remoteVersion.relativePath, - promise - ) - ); + const [promise, resolve] = createPromise(); + this.database.updateDocumentMetadata( + { + parentVersionId: remoteVersion.vaultUpdateId, + hash: hash(contentBytes), + remoteRelativePath: remoteVersion.relativePath + }, + this.database.createNewPendingDocument( + remoteVersion.documentId, + remoteVersion.relativePath, + promise + ) + ); - await this.operations.create( - remoteVersion.relativePath, - contentBytes - ); - this.updateCache( - remoteVersion.vaultUpdateId, - contentBytes, - remoteVersion.relativePath - ); + await this.operations.create( + remoteVersion.relativePath, + contentBytes + ); + this.updateCache( + remoteVersion.vaultUpdateId, + contentBytes, + remoteVersion.relativePath + ); - resolve(); - this.database.removeDocumentPromise(promise); + resolve(); + this.database.removeDocumentPromise(promise); - this.history.addHistoryEntry({ - status: SyncStatus.SUCCESS, - details: updateDetails, - message: `Successfully downloaded remote file which hadn't existed locally`, - author: remoteVersion.userId, - timestamp: new Date(remoteVersion.updatedDate) - }); - }); - } + this.history.addHistoryEntry({ + status: SyncStatus.SUCCESS, + details: updateDetails, + message: `Successfully downloaded remote file which hadn't existed locally`, + author: remoteVersion.userId, + timestamp: new Date(remoteVersion.updatedDate) + }); + }); + } - public async executeSync( - details: SyncDetails, - fn: () => Promise - ): Promise { - for (const pattern of this.ignorePatterns) { - if (pattern.test(details.relativePath)) { - this.logger.debug( - `File '${details.relativePath}' is ignored by the ignore pattern: ${pattern}` - ); - return; // bail without SKIPPED status because we were told to ignore this file and we shouldn't clutter up the history - } - } + public async executeSync( + details: SyncDetails, + fn: () => Promise + ): Promise { + for (const pattern of this.ignorePatterns) { + if (pattern.test(details.relativePath)) { + this.logger.debug( + `File '${details.relativePath}' is ignored by the ignore pattern: ${pattern}` + ); + return; // bail without SKIPPED status because we were told to ignore this file and we shouldn't clutter up the history + } + } - try { - // Only check the size of files which already exist locally. - if (await this.operations.exists(details.relativePath)) { - const sizeInBytes = await this.operations.getFileSize( - details.relativePath - ); - const historyEntryForSkippedOversizedFile = - this.getHistoryEntryForSkippedOversizedFile( - sizeInBytes, - details.relativePath - ); - if (historyEntryForSkippedOversizedFile !== undefined) { - this.history.addHistoryEntry( - historyEntryForSkippedOversizedFile - ); - return; - } - } + try { + // Only check the size of files which already exist locally. + if (await this.operations.exists(details.relativePath)) { + const sizeInBytes = await this.operations.getFileSize( + details.relativePath + ); + const historyEntryForSkippedOversizedFile = + this.getHistoryEntryForSkippedOversizedFile( + sizeInBytes, + details.relativePath + ); + if (historyEntryForSkippedOversizedFile !== undefined) { + this.history.addHistoryEntry( + historyEntryForSkippedOversizedFile + ); + return; + } + } - return await fn(); - } catch (e) { - if (e instanceof FileNotFoundError) { - // A subsequent sync operation must have been creating to deal with this - this.logger.info( - `Skiping file '${details.relativePath}' because it no longer exists when trying to ${details.type.toLocaleLowerCase()} it` - ); - return; - } - if (e instanceof SyncResetError) { - this.logger.info( - `Interrupting sync operation because of a reset` - ); - return; - } else { - this.history.addHistoryEntry({ - status: SyncStatus.ERROR, - details, - message: `Failed to sync file '${details.relativePath}' because of ${e} when trying to ${details.type.toLocaleLowerCase()} it` - }); - throw e; - } - } - } + return await fn(); + } catch (e) { + if (e instanceof FileNotFoundError) { + // A subsequent sync operation must have been creating to deal with this + this.logger.info( + `Skiping file '${details.relativePath}' because it no longer exists when trying to ${details.type.toLocaleLowerCase()} it` + ); + return; + } + if (e instanceof SyncResetError) { + this.logger.info( + `Interrupting sync operation because of a reset` + ); + return; + } else { + this.history.addHistoryEntry({ + status: SyncStatus.ERROR, + details, + message: `Failed to sync file '${details.relativePath}' because of ${e} when trying to ${details.type.toLocaleLowerCase()} it` + }); + throw e; + } + } + } - private getHistoryEntryForSkippedOversizedFile( - sizeInBytes: number, - relativePath: RelativePath - ): CommonHistoryEntry | undefined { - const sizeInMB = Math.round(sizeInBytes / 1024 / 1024); - const { maxFileSizeMB } = this.settings.getSettings(); - if (sizeInMB > maxFileSizeMB) { - return { - status: SyncStatus.SKIPPED, - details: { - type: SyncType.SKIPPED, - relativePath - }, - message: `File size of ${sizeInMB} MB exceeds the maximum file size limit of ${ - maxFileSizeMB - } MB` - }; - } - } + private getHistoryEntryForSkippedOversizedFile( + sizeInBytes: number, + relativePath: RelativePath + ): CommonHistoryEntry | undefined { + const sizeInMB = Math.round(sizeInBytes / 1024 / 1024); + const { maxFileSizeMB } = this.settings.getSettings(); + if (sizeInMB > maxFileSizeMB) { + return { + status: SyncStatus.SKIPPED, + details: { + type: SyncType.SKIPPED, + relativePath + }, + message: `File size of ${sizeInMB} MB exceeds the maximum file size limit of ${maxFileSizeMB + } MB` + }; + } + } - private updateCache( - updateId: number, - contentBytes: Uint8Array, - filePath: RelativePath - ): void { - if ( - isFileTypeMergable( - filePath, - this.serverConfig.getConfig().mergeableFileExtensions - ) && - !isBinary(contentBytes) - ) { - this.contentCache.put(updateId, contentBytes); - } - } + private updateCache( + updateId: number, + contentBytes: Uint8Array, + filePath: RelativePath + ): void { + if ( + isFileTypeMergable( + filePath, + this.serverConfig.getConfig().mergeableFileExtensions + ) && + !isBinary(contentBytes) + ) { + this.contentCache.put(updateId, contentBytes); + } + } - private async applyRemoteDeleteLocally( - document: DocumentRecord, - response: DocumentVersion | DocumentUpdateResponse - ): Promise { - this.history.addHistoryEntry({ - status: SyncStatus.SUCCESS, - details: { - type: SyncType.DELETE, - relativePath: document.relativePath - }, - message: "File has been deleted remotely, so we deleted it locally", - author: response.userId, - timestamp: new Date(response.updatedDate) - }); + private async applyRemoteDeleteLocally( + document: DocumentRecord, + response: DocumentVersion | DocumentUpdateResponse + ): Promise { + this.history.addHistoryEntry({ + status: SyncStatus.SUCCESS, + details: { + type: SyncType.DELETE, + relativePath: document.relativePath + }, + message: "File has been deleted remotely, so we deleted it locally", + author: response.userId, + timestamp: new Date(response.updatedDate) + }); - this.database.delete(document.relativePath); - this.database.updateDocumentMetadata( - { - parentVersionId: response.vaultUpdateId, - hash: EMPTY_HASH, - remoteRelativePath: response.relativePath - }, - document - ); + this.database.delete(document.relativePath); + this.database.updateDocumentMetadata( + { + parentVersionId: response.vaultUpdateId, + hash: EMPTY_HASH, + remoteRelativePath: response.relativePath + }, + document + ); - await this.operations.delete(document.relativePath); + await this.operations.delete(document.relativePath); - this.database.addSeenUpdateId(response.vaultUpdateId); - } + this.database.addSeenUpdateId(response.vaultUpdateId); + } } diff --git a/frontend/sync-client/src/tracing/logger.ts b/frontend/sync-client/src/tracing/logger.ts index 41e25257..6ac2b4e1 100644 --- a/frontend/sync-client/src/tracing/logger.ts +++ b/frontend/sync-client/src/tracing/logger.ts @@ -1,87 +1,72 @@ import { MAX_LOG_MESSAGE_COUNT } from "../consts"; -import { removeFromArray } from "../utils/remove-from-array"; +import { EventListeners } from "../utils/data-structures/event-listeners"; export enum LogLevel { - DEBUG = "DEBUG", - INFO = "INFO", - WARNING = "WARNING", - ERROR = "ERROR" + DEBUG = "DEBUG", + INFO = "INFO", + WARNING = "WARNING", + ERROR = "ERROR" } const LOG_LEVEL_ORDER = { - [LogLevel.DEBUG]: 0, - [LogLevel.INFO]: 1, - [LogLevel.WARNING]: 2, - [LogLevel.ERROR]: 3 + [LogLevel.DEBUG]: 0, + [LogLevel.INFO]: 1, + [LogLevel.WARNING]: 2, + [LogLevel.ERROR]: 3 }; export class LogLine { - public timestamp = new Date(); - public constructor( - public level: LogLevel, - public message: string - ) {} + public timestamp = new Date(); + public constructor( + public level: LogLevel, + public message: string + ) { } } export class Logger { - private readonly messages: LogLine[] = []; - private readonly onMessageListeners: ((message: LogLine) => unknown)[] = []; + private readonly messages: LogLine[] = []; + public readonly onLogEmitted = new EventListeners< + (message: LogLine) => unknown + >(); - public constructor( - ...onMessageListeners: ((message: LogLine) => unknown)[] - ) { - this.onMessageListeners = onMessageListeners; - } - public debug(message: string): void { - this.pushMessage(message, LogLevel.DEBUG); - } + public debug(message: string): void { + this.pushMessage(message, LogLevel.DEBUG); + } - public info(message: string): void { - this.pushMessage(message, LogLevel.INFO); - } + public info(message: string): void { + this.pushMessage(message, LogLevel.INFO); + } - public warn(message: string): void { - this.pushMessage(message, LogLevel.WARNING); - } + public warn(message: string): void { + this.pushMessage(message, LogLevel.WARNING); + } - public error(message: string): void { - this.pushMessage(message, LogLevel.ERROR); - } + public error(message: string): void { + this.pushMessage(message, LogLevel.ERROR); + } - public getMessages(mininumSeverity: LogLevel): LogLine[] { - return this.messages.filter( - (message) => - LOG_LEVEL_ORDER[message.level] >= - LOG_LEVEL_ORDER[mininumSeverity] - ); - } + public getMessages(mininumSeverity: LogLevel): LogLine[] { + return this.messages.filter( + (message) => + LOG_LEVEL_ORDER[message.level] >= + LOG_LEVEL_ORDER[mininumSeverity] + ); + } - public addOnMessageListener(listener: (message: LogLine) => unknown): void { - this.onMessageListeners.push(listener); - } + public reset(): void { + this.messages.length = 0; + this.debug("Logger has been reset"); + } - public removeOnMessageListener( - listener: (message: LogLine) => unknown - ): void { - removeFromArray(this.onMessageListeners, listener); - } + private pushMessage(message: string, level: LogLevel): void { + const logLine = new LogLine(level, message); + this.messages.push(logLine); - public reset(): void { - this.messages.length = 0; - this.debug("Logger has been reset"); - } + while (this.messages.length > MAX_LOG_MESSAGE_COUNT) { + this.messages.shift(); + } - private pushMessage(message: string, level: LogLevel): void { - const logLine = new LogLine(level, message); - this.messages.push(logLine); - - while (this.messages.length > MAX_LOG_MESSAGE_COUNT) { - this.messages.shift(); - } - - this.onMessageListeners.forEach((listener) => { - listener(logLine); - }); - } + this.onLogEmitted.trigger(logLine); + } } diff --git a/frontend/sync-client/src/tracing/sync-history.ts b/frontend/sync-client/src/tracing/sync-history.ts index d60a57d1..99cfb5ce 100644 --- a/frontend/sync-client/src/tracing/sync-history.ts +++ b/frontend/sync-client/src/tracing/sync-history.ts @@ -1,183 +1,169 @@ import { - MAX_HISTORY_ENTRY_COUNT, - TIMEOUT_FOR_MERGING_HISTORY_ENTRIES_IN_SECONDS + MAX_HISTORY_ENTRY_COUNT, + TIMEOUT_FOR_MERGING_HISTORY_ENTRIES_IN_SECONDS } from "../consts"; import type { RelativePath } from "../persistence/database"; import type { Logger } from "./logger"; import { removeFromArray } from "../utils/remove-from-array"; +import { EventListeners } from "../utils/data-structures/event-listeners"; export interface SyncCreateDetails { - type: SyncType.CREATE; - relativePath: RelativePath; + type: SyncType.CREATE; + relativePath: RelativePath; } export interface SyncUpdateDetails { - type: SyncType.UPDATE; - relativePath: RelativePath; + type: SyncType.UPDATE; + relativePath: RelativePath; } export interface SyncMovedDetails { - type: SyncType.MOVE; - relativePath: RelativePath; - movedFrom: RelativePath; + type: SyncType.MOVE; + relativePath: RelativePath; + movedFrom: RelativePath; } export interface SyncDeleteDetails { - type: SyncType.DELETE; - relativePath: RelativePath; + type: SyncType.DELETE; + relativePath: RelativePath; } export interface SyncSkippedDetails { - type: SyncType.SKIPPED; - relativePath: RelativePath; + type: SyncType.SKIPPED; + relativePath: RelativePath; } export type SyncDetails = - | SyncCreateDetails - | SyncUpdateDetails - | SyncDeleteDetails - | SyncMovedDetails - | SyncSkippedDetails; + | SyncCreateDetails + | SyncUpdateDetails + | SyncDeleteDetails + | SyncMovedDetails + | SyncSkippedDetails; export interface CommonHistoryEntry { - status: SyncStatus; - message: string; - details: SyncDetails; - author?: string; - timestamp?: Date; + status: SyncStatus; + message: string; + details: SyncDetails; + author?: string; + timestamp?: Date; } export enum SyncType { - CREATE = "CREATE", - UPDATE = "UPDATE", - DELETE = "DELETE", - MOVE = "MOVE", - SKIPPED = "SKIPPED" + CREATE = "CREATE", + UPDATE = "UPDATE", + DELETE = "DELETE", + MOVE = "MOVE", + SKIPPED = "SKIPPED" } export enum SyncStatus { - SUCCESS = "SUCCESS", - ERROR = "ERROR", - SKIPPED = "SKIPPED" + SUCCESS = "SUCCESS", + ERROR = "ERROR", + SKIPPED = "SKIPPED" } export type HistoryEntry = CommonHistoryEntry & { timestamp: Date }; export interface HistoryStats { - success: number; - error: number; + success: number; + error: number; } export class SyncHistory { - private readonly _entries: HistoryEntry[] = []; + private readonly _entries: HistoryEntry[] = []; - private readonly syncHistoryUpdateListeners: (( - status: HistoryStats - ) => unknown)[] = []; + public readonly onHistoryUpdated = new EventListeners< + (status: HistoryStats) => unknown + >(); - private status: HistoryStats = { - success: 0, - error: 0 - }; + private status: HistoryStats = { + success: 0, + error: 0 + }; - public constructor(private readonly logger: Logger) {} + public constructor(private readonly logger: Logger) { } - public get entries(): readonly HistoryEntry[] { - return this._entries; - } + public get entries(): readonly HistoryEntry[] { + return this._entries; + } - /** - * Insert the entry at the beginning of the history list. If the entry - * already in the list, it will get moved to the beginning and updated. - * - * If the entry list is too long, the oldest entry will be removed. - */ - public addHistoryEntry(entry: CommonHistoryEntry): void { - const historyEntry = { - ...entry, - timestamp: entry.timestamp ?? new Date() - }; + /** + * Insert the entry at the beginning of the history list. If the entry + * already in the list, it will get moved to the beginning and updated. + * + * If the entry list is too long, the oldest entry will be removed. + */ + public addHistoryEntry(entry: CommonHistoryEntry): void { + const historyEntry = { + ...entry, + timestamp: entry.timestamp ?? new Date() + }; - const candidate = this.findSimilarRecentUpdateEntry(historyEntry); - if (candidate !== undefined) { - removeFromArray(this._entries, candidate); - } + const candidate = this.findSimilarRecentUpdateEntry(historyEntry); + if (candidate !== undefined) { + removeFromArray(this._entries, candidate); + } - // Insert the entry at the beginning - this._entries.unshift(historyEntry); + // Insert the entry at the beginning + this._entries.unshift(historyEntry); - if (this._entries.length > MAX_HISTORY_ENTRY_COUNT) { - this._entries.pop(); - } + if (this._entries.length > MAX_HISTORY_ENTRY_COUNT) { + this._entries.pop(); + } - this.updateSuccessCount(historyEntry); - } + this.updateSuccessCount(historyEntry); + } - public addSyncHistoryUpdateListener( - listener: (stats: HistoryStats) => unknown - ): void { - this.syncHistoryUpdateListeners.push(listener); - listener({ ...this.status }); - } - public removeSyncHistoryUpdateListener( - listener: (stats: HistoryStats) => unknown - ): void { - removeFromArray(this.syncHistoryUpdateListeners, listener); - } - public reset(): void { - this._entries.length = 0; - this.status = { - success: 0, - error: 0 - }; - this.syncHistoryUpdateListeners.forEach((listener) => { - listener(this.status); - }); - } + public reset(): void { + this._entries.length = 0; + this.status = { + success: 0, + error: 0 + }; + this.onHistoryUpdated.trigger(this.status); + } - private findSimilarRecentUpdateEntry( - entry: HistoryEntry - ): HistoryEntry | undefined { - if (entry.details.type !== SyncType.UPDATE) { - return; - } + private findSimilarRecentUpdateEntry( + entry: HistoryEntry + ): HistoryEntry | undefined { + if (entry.details.type !== SyncType.UPDATE) { + return; + } - const candidate = this._entries.find( - (e) => - e.details.type === SyncType.UPDATE && - e.details.relativePath === entry.details.relativePath - ); - if ( - candidate !== undefined && - (this._entries[0] === candidate || - candidate.timestamp.getTime() + - TIMEOUT_FOR_MERGING_HISTORY_ENTRIES_IN_SECONDS * 1000 > - entry.timestamp.getTime()) - ) { - return candidate; - } - } + const candidate = this._entries.find( + (e) => + e.details.type === SyncType.UPDATE && + e.details.relativePath === entry.details.relativePath + ); + if ( + candidate !== undefined && + (this._entries[0] === candidate || + candidate.timestamp.getTime() + + TIMEOUT_FOR_MERGING_HISTORY_ENTRIES_IN_SECONDS * 1000 > + entry.timestamp.getTime()) + ) { + return candidate; + } + } - private updateSuccessCount(entry: HistoryEntry): void { - const message = `${entry.details.relativePath} - ${entry.message} (${entry.details.type.toLocaleLowerCase()})`; - switch (entry.status) { - case SyncStatus.SUCCESS: - this.status.success++; - this.logger.info(`History entry: ${message}`); - break; - case SyncStatus.ERROR: - this.status.error++; - this.logger.error(`Cannot sync file: ${message}`); - break; - case SyncStatus.SKIPPED: - this.logger.warn(`Skipping file: ${message}`); - break; - } + private updateSuccessCount(entry: HistoryEntry): void { + const message = `${entry.details.relativePath} - ${entry.message} (${entry.details.type.toLocaleLowerCase()})`; + switch (entry.status) { + case SyncStatus.SUCCESS: + this.status.success++; + this.logger.info(`History entry: ${message}`); + break; + case SyncStatus.ERROR: + this.status.error++; + this.logger.error(`Cannot sync file: ${message}`); + break; + case SyncStatus.SKIPPED: + this.logger.warn(`Skipping file: ${message}`); + break; + } - this.syncHistoryUpdateListeners.forEach((listener) => { - listener(this.status); - }); - } + this.onHistoryUpdated.trigger(this.status); + } } diff --git a/frontend/sync-client/src/utils/data-structures/event-listeners.test.ts b/frontend/sync-client/src/utils/data-structures/event-listeners.test.ts new file mode 100644 index 00000000..c3e5a483 --- /dev/null +++ b/frontend/sync-client/src/utils/data-structures/event-listeners.test.ts @@ -0,0 +1,147 @@ +import { describe, it } from "node:test"; +import assert from "node:assert"; +import { EventListeners } from "./event-listeners"; + +describe("EventListeners", () => { + it("should add & remove listeners", () => { + const listeners = new EventListeners<() => void>(); + const listener = () => { }; + + listeners.add(listener); + + assert.strictEqual(listeners.count, 1); + + const removed = listeners.remove(listener); + assert.strictEqual(removed, true); + assert.strictEqual(listeners.count, 0); + }); + + + it("should remove listeners using unsubscribe function", () => { + const listeners = new EventListeners<() => void>(); + const listener = () => { }; + + const unsubscribe = listeners.add(listener); + unsubscribe(); + + assert.strictEqual(listeners.count, 0); + }); + + it("should return false when removing non-existent listener", () => { + const listeners = new EventListeners<() => void>(); + const listener = () => { }; + + const removed = listeners.remove(listener); + + assert.strictEqual(removed, false); + }); + + it("should handle multiple listeners", () => { + const listeners = new EventListeners<() => void>(); + const listener1 = () => { }; + const listener2 = () => { }; + const listener3 = () => { }; + + listeners.add(listener1); + listeners.add(listener2); + listeners.add(listener3); + + assert.strictEqual(listeners.count, 3); + + listeners.remove(listener2); + + assert.strictEqual(listeners.count, 2); + }); + + it("should trigger all listeners synchronously", () => { + const listeners = new EventListeners<(value: string) => void>(); + const calls: string[] = []; + + listeners.add((value) => calls.push(`listener1-${value}`)); + listeners.add((value) => calls.push(`listener2-${value}`)); + + listeners.trigger("test"); + + assert.deepStrictEqual(calls, ["listener1-test", "listener2-test"]); + }); + + it("should trigger listeners with multiple arguments", () => { + const listeners = new EventListeners< + (a: number, b: string, c: boolean) => void + >(); + const calls: [number, string, boolean][] = []; + + listeners.add((a, b, c) => calls.push([a, b, c])); + listeners.trigger(42, "hello", true); + + assert.deepStrictEqual(calls, [[42, "hello", true]]); + }); + + it("should not trigger removed listeners", () => { + const listeners = new EventListeners<() => void>(); + let count1 = 0; + let count2 = 0; + + const listener1 = () => { + count1++; + }; + const listener2 = () => { + count2++; + }; + + listeners.add(listener1); + const unsubscribe = listeners.add(listener2); + + unsubscribe(); + listeners.trigger(); + + assert.strictEqual(count1, 1); + assert.strictEqual(count2, 0); + }); + + it("should trigger all listeners and await promises", async () => { + const listeners = new EventListeners< + (value: string) => Promise | void + >(); + const results: string[] = []; + + listeners.add(async (value) => { + await new Promise((resolve) => setTimeout(resolve, 10)); + results.push(`async1-${value}`); + }); + + listeners.add((value) => { + results.push(`sync-${value}`); + }); + + listeners.add(async (value) => { + await new Promise((resolve) => setTimeout(resolve, 5)); + results.push(`async2-${value}`); + }); + + await listeners.triggerAsync("test"); + + assert.ok(results.includes("async1-test")); + assert.ok(results.includes("sync-test")); + assert.ok(results.includes("async2-test")); + assert.strictEqual(results.length, 3); + }); + + + + it("should not trigger cleared listeners", () => { + const listeners = new EventListeners<() => void>(); + let called = false; + const listener = () => { + called = true; + }; + + listeners.add(listener); + listeners.clear(); + + assert.strictEqual(listeners.count, 0); + listeners.trigger(); + + assert.strictEqual(called, false); + }); +}); diff --git a/frontend/sync-client/src/utils/data-structures/event-listeners.ts b/frontend/sync-client/src/utils/data-structures/event-listeners.ts new file mode 100644 index 00000000..25be5344 --- /dev/null +++ b/frontend/sync-client/src/utils/data-structures/event-listeners.ts @@ -0,0 +1,71 @@ +import { removeFromArray } from "../remove-from-array"; +import { awaitAll } from "../await-all"; + +/** +* A utility class for managing event listeners with type-safe add/remove operations. +*/ +export class EventListeners any> { + private readonly listeners: TListener[] = []; + + /** + * Adds a new listener to the collection. + * + * @param listener The listener callback to add + * @returns An unsubscribe function that removes this listener when called + */ + public add(listener: TListener): () => void { + this.listeners.push(listener); + return () => this.remove(listener); + } + + /** + * Removes a listener from the collection. + * + * @param listener The listener callback to remove + * @returns true if the listener was found and removed, false otherwise + */ + public remove(listener: TListener): boolean { + return removeFromArray(this.listeners, listener); + } + + /** + * Triggers all listeners synchronously with the provided arguments. + * Any returned promises are ignored. Use triggerAsync() to await them. + * + * @param args The arguments to pass to each listener + */ + public trigger(...args: Parameters): void { + this.listeners.forEach((listener) => { + listener(...args); + }); + } + + /** + * Triggers all listeners and awaits any promises they return. + * Synchronous listeners are called immediately, and any async listeners + * are awaited in parallel. + * + * @param args The arguments to pass to each listener + */ + public async triggerAsync(...args: Parameters): Promise { + await awaitAll( + this.listeners + .map((listener) => { + return listener(...args); + }) + .filter((result): result is Promise => { + return result instanceof Promise; + }) + ); + } + + public clear(): void { + this.listeners.length = 0; + } + + public get count(): number { + return this.listeners.length; + } + + +} diff --git a/frontend/sync-client/src/utils/debugging/log-to-console.ts b/frontend/sync-client/src/utils/debugging/log-to-console.ts index 2d1a12e8..3499f029 100644 --- a/frontend/sync-client/src/utils/debugging/log-to-console.ts +++ b/frontend/sync-client/src/utils/debugging/log-to-console.ts @@ -3,7 +3,7 @@ import type { LogLine } from "../../tracing/logger"; import { LogLevel } from "../../tracing/logger"; export function logToConsole(client: SyncClient): void { - client.logger.addOnMessageListener((logLine: LogLine) => { + client.logger.onLogEmitted.add((logLine: LogLine) => { const formatted = `${logLine.timestamp.toISOString()} ${logLine.level} ${logLine.message}`; switch (logLine.level) {