diff --git a/frontend/sync-client/src/persistence/settings.ts b/frontend/sync-client/src/persistence/settings.ts index a62e4f0c..bcb32531 100644 --- a/frontend/sync-client/src/persistence/settings.ts +++ b/frontend/sync-client/src/persistence/settings.ts @@ -8,6 +8,7 @@ export interface SyncSettings { isSyncEnabled: boolean; maxFileSizeMB: number; ignorePatterns: string[]; + webSocketRetryIntervalMs: number; } export const DEFAULT_SETTINGS: SyncSettings = { @@ -17,7 +18,8 @@ export const DEFAULT_SETTINGS: SyncSettings = { syncConcurrency: 1, isSyncEnabled: false, maxFileSizeMB: 10, - ignorePatterns: [] + ignorePatterns: [], + webSocketRetryIntervalMs: 3500 }; export class Settings { diff --git a/frontend/sync-client/src/services/websocket-manager.ts b/frontend/sync-client/src/services/websocket-manager.ts new file mode 100644 index 00000000..8a37f9ff --- /dev/null +++ b/frontend/sync-client/src/services/websocket-manager.ts @@ -0,0 +1,196 @@ +import type { Database } from "../persistence/database"; +import type { Logger } from "../tracing/logger"; +import type { Settings, SyncSettings } from "../persistence/settings"; +import { WebSocketServerMessage } from "./types/WebSocketServerMessage"; +import { Syncer } from "../sync-operations/syncer"; +import { WebSocketClientMessage } from "./types/WebSocketClientMessage"; +import { CursorPositionFromClient } from "./types/CursorPositionFromClient"; + +export class WebSocketManager { + private readonly webSocketStatusChangeListeners: (() => unknown)[] = []; + // private readonly cur: (() => unknown)[] = []; + + private refreshWebSocketInterval: NodeJS.Timeout | undefined; + + private webSocket: WebSocket | undefined; + + private readonly webSocketFactoryImplementation: typeof globalThis.WebSocket; + + // eslint-disable-next-line @typescript-eslint/max-params + public constructor( + private readonly deviceId: string, + private readonly logger: Logger, + private readonly database: Database, + private readonly settings: Settings, + private readonly syncer: Syncer, + webSocketImplementation?: typeof globalThis.WebSocket + ) { + if (webSocketImplementation) { + this.webSocketFactoryImplementation = webSocketImplementation; + } else { + if ( + typeof globalThis !== "undefined" && + typeof globalThis.WebSocket === "undefined" + ) { + // eslint-disable-next-line + this.webSocketFactoryImplementation = require("ws"); // polyfill for WebSocket in Node.js + } else { + this.webSocketFactoryImplementation = WebSocket; + } + } + + this.updateWebSocket(settings.getSettings()); + + settings.addOnSettingsChangeListener((newSettings, oldSettings) => { + if ( + newSettings.remoteUri !== oldSettings.remoteUri || + newSettings.vaultName !== oldSettings.vaultName || + newSettings.token !== oldSettings.token || + newSettings.isSyncEnabled !== oldSettings.isSyncEnabled + ) { + this.updateWebSocket(newSettings); + } + }); + + this.setWebSocketRefreshInterval(); + } + + public get isWebSocketConnected(): boolean { + return ( + this.webSocket?.readyState === + this.webSocketFactoryImplementation.OPEN + ); + } + + public addWebSocketStatusChangeListener(listener: () => void): void { + this.webSocketStatusChangeListeners.push(listener); + } + + public async reset(): Promise { + this.setWebSocketRefreshInterval(); + this.updateWebSocket(this.settings.getSettings()); + } + + public stop(): void { + clearInterval(this.refreshWebSocketInterval); + + try { + this.webSocket?.close(); + } catch (e) { + this.logger.warn(`Failed to close WebSocket: ${e}`); + } + } + + private updateWebSocket(settings: SyncSettings): void { + try { + this.webSocket?.close(); + } catch (e) { + this.logger.warn(`Failed to close WebSocket: ${e}`); + } + + if (!settings.isSyncEnabled) { + this.webSocket = undefined; + return; + } + + const wsUri = new URL(settings.remoteUri); + wsUri.protocol = wsUri.protocol === "https" ? "wss" : "ws"; + wsUri.pathname = `/vaults/${settings.vaultName}/ws`; + + this.logger.info(`Connecting to WebSocket at ${wsUri.toString()}`); + + this.webSocket = new this.webSocketFactoryImplementation(wsUri); + + this.webSocket.onmessage = async (event): Promise => { + // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion + const message = JSON.parse(event.data) as WebSocketServerMessage; + + if (message.type === "vaultUpdate") { + try { + await Promise.all( + message.documents.map(async (document) => + this.syncer.syncRemotelyUpdatedFile(document) + ) + ); + + if (message.isInitialSync && message.documents.length > 0) { + this.database.setLastSeenUpdateId( + message.documents + .map((document) => document.vaultUpdateId) + .reduce((a, b) => Math.max(a, b)) + ); + } + } catch (e) { + this.logger.error( + `Failed to sync remotely updated file: ${e}` + ); + } + } else if (message.type === "cursorPositions") { + this.logger.info( + `Received cursor positions for ${JSON.stringify(message.clients)}` + ); + // Handle cursor positions if needed + } else { + this.logger.warn( + `Received unknown message type: ${JSON.stringify(message)}` + ); + } + }; + + // The JS WebSocket API doesn't support setting headers, so we have to send the token as a message + this.webSocket.onopen = (): void => { + this.logger.info("WebSocket connection opened"); + this.webSocketStatusChangeListeners.forEach((listener) => { + listener(); + }); + + let message: WebSocketClientMessage = { + type: "handshake", + deviceId: this.deviceId, + token: settings.token, + lastSeenVaultUpdateId: this.database.getLastSeenUpdateId() + }; + this.webSocket?.send(JSON.stringify(message)); + }; + + this.webSocket.onclose = (event): void => { + this.logger.warn( + `WebSocket closed with code ${event.code} (${event.reason == "" ? "unknown reason" : event.reason})` + ); + this.webSocketStatusChangeListeners.forEach((listener) => { + listener(); + }); + }; + } + + public sendCursorPositions( + cursorPositions: CursorPositionFromClient + ): void { + if (!this.isWebSocketConnected) { + this.logger.warn( + "WebSocket is not connected, cannot send cursor positions" + ); + return; + } + let message: WebSocketClientMessage = { + type: "cursorPositions", + ...cursorPositions + }; + this.webSocket?.send(JSON.stringify(message)); + this.logger.info( + `Sent cursor positions: ${JSON.stringify(cursorPositions)}` + ); + } + + private setWebSocketRefreshInterval(): void { + this.refreshWebSocketInterval = setInterval(() => { + if ( + this.webSocket?.readyState === + this.webSocketFactoryImplementation.CLOSED + ) { + this.logger.info("WebSocket is closed, reconnecting..."); + this.updateWebSocket(this.settings.getSettings()); + } + }, this.settings.getSettings().webSocketRetryIntervalMs); + } +} diff --git a/frontend/sync-client/src/sync-client.ts b/frontend/sync-client/src/sync-client.ts index 94c446e8..b9fbbcc2 100644 --- a/frontend/sync-client/src/sync-client.ts +++ b/frontend/sync-client/src/sync-client.ts @@ -15,9 +15,10 @@ import { FileOperations } from "./file-operations/file-operations"; import { ConnectionStatus } from "./services/connection-status"; import { UnrestrictedSyncer } from "./sync-operations/unrestricted-syncer"; import { rateLimit } from "./utils/rate-limit"; -import { v4 as uuidv4 } from "uuid"; import type { NetworkConnectionStatus } from "./types/network-connection-status"; import { DocumentUpdateStatus } from "./types/document-update-status"; +import { WebSocketManager } from "./services/websocket-manager"; +import { createClientId } from "./utils/create-client-id"; export class SyncClient { private static readonly MINIMUM_SAVE_INTERVAL_MS = 1000; @@ -29,6 +30,7 @@ export class SyncClient { private readonly database: Database, private readonly syncer: Syncer, private readonly syncService: SyncService, + private readonly webSocketManager: WebSocketManager, private readonly _logger: Logger, private readonly connectionStatus: ConnectionStatus ) { @@ -68,7 +70,10 @@ export class SyncClient { nativeLineEndings?: string; }): Promise { const logger = new Logger(); - logger.info("Initialising SyncClient"); + + const deviceId = createClientId(); + + logger.info(`Initialising SyncClient with client id ${deviceId}`); const history = new SyncHistory(logger); @@ -104,7 +109,6 @@ export class SyncClient { await rateLimitedSave(state); } ); - const deviceId = uuidv4(); const connectionStatus = new ConnectionStatus(settings, logger); const syncService = new SyncService( @@ -121,6 +125,7 @@ export class SyncClient { fs, nativeLineEndings ); + const unrestrictedSyncer = new UnrestrictedSyncer( logger, database, @@ -129,6 +134,7 @@ export class SyncClient { fileOperations, history ); + const syncer = new Syncer( deviceId, logger, @@ -136,7 +142,15 @@ export class SyncClient { settings, syncService, fileOperations, - unrestrictedSyncer, + unrestrictedSyncer + ); + + const webSocketManager = new WebSocketManager( + deviceId, + logger, + database, + settings, + syncer, webSocket ); @@ -146,6 +160,7 @@ export class SyncClient { database, syncer, syncService, + webSocketManager, logger, connectionStatus ); @@ -160,7 +175,7 @@ export class SyncClient { return { isSuccessful: server.isSuccessful, serverMessage: server.message, - isWebSocketConnected: this.syncer.isWebSocketConnected + isWebSocketConnected: this.webSocketManager.isWebSocketConnected }; } @@ -179,7 +194,7 @@ export class SyncClient { } public stop(): void { - this.syncer.stop(); + this.webSocketManager.stop(); } public async waitAndStop(): Promise { @@ -194,6 +209,7 @@ export class SyncClient { this.stop(); this.connectionStatus.startReset(); await this.syncer.reset(); + await this.webSocketManager.reset(); this.history.reset(); this.database.reset(); this._logger.reset(); @@ -229,7 +245,7 @@ export class SyncClient { } public addWebSocketStatusChangeListener(listener: () => void): void { - this.syncer.addWebSocketStatusChangeListener(listener); + this.webSocketManager.addWebSocketStatusChangeListener(listener); } public async syncLocallyCreatedFile( diff --git a/frontend/sync-client/src/sync-operations/syncer.ts b/frontend/sync-client/src/sync-operations/syncer.ts index e141ce9d..1498fbfa 100644 --- a/frontend/sync-client/src/sync-operations/syncer.ts +++ b/frontend/sync-client/src/sync-operations/syncer.ts @@ -18,26 +18,14 @@ import { createPromise } from "../utils/create-promise"; import { SyncResetError } from "../services/sync-reset-error"; import { Locks } from "../utils/locks"; -interface WebsocketVaultUpdate { - documents: components["schemas"]["DocumentVersionWithoutContent"][]; - isInitialSync: boolean; -} - export class Syncer { private readonly remoteDocumentsLock: Locks; private readonly remainingOperationsListeners: (( remainingOperations: number ) => void)[] = []; - private readonly webSocketStatusChangeListeners: (() => void)[] = []; private readonly syncQueue: PQueue; private runningScheduleSyncForOfflineChanges: Promise | undefined; - private refreshApplyRemoteChangesWebSocketInterval: - | NodeJS.Timeout - | undefined; - private applyRemoteChangesWebSocket: WebSocket | undefined; - - private readonly webSocketImplementation: typeof globalThis.WebSocket; // eslint-disable-next-line @typescript-eslint/max-params public constructor( @@ -47,41 +35,15 @@ export class Syncer { private readonly settings: Settings, private readonly syncService: SyncService, private readonly operations: FileOperations, - private readonly internalSyncer: UnrestrictedSyncer, - webSocketImplementation?: typeof globalThis.WebSocket + private readonly internalSyncer: UnrestrictedSyncer ) { this.syncQueue = new PQueue({ concurrency: settings.getSettings().syncConcurrency }); - if (webSocketImplementation) { - this.webSocketImplementation = webSocketImplementation; - } else { - if ( - typeof globalThis !== "undefined" && - typeof globalThis.WebSocket === "undefined" - ) { - // eslint-disable-next-line - this.webSocketImplementation = require("ws"); // polyfill for WebSocket in Node.js - } else { - this.webSocketImplementation = WebSocket; - } - } - - this.updateWebSocket(settings.getSettings()); - this.remoteDocumentsLock = new Locks(this.logger); settings.addOnSettingsChangeListener((newSettings, oldSettings) => { - if ( - newSettings.remoteUri !== oldSettings.remoteUri || - newSettings.vaultName !== oldSettings.vaultName || - newSettings.token !== oldSettings.token || - newSettings.isSyncEnabled !== oldSettings.isSyncEnabled - ) { - this.updateWebSocket(newSettings); - } - if (newSettings.syncConcurrency !== oldSettings.syncConcurrency) { this.syncQueue.concurrency = newSettings.syncConcurrency; } @@ -92,15 +54,6 @@ export class Syncer { listener(this.syncQueue.size); }); }); - - this.setWebSocketRefreshInterval(); - } - - public get isWebSocketConnected(): boolean { - return ( - this.applyRemoteChangesWebSocket?.readyState === - this.webSocketImplementation.OPEN - ); } public addRemainingOperationsListener( @@ -109,10 +62,6 @@ export class Syncer { this.remainingOperationsListeners.push(listener); } - public addWebSocketStatusChangeListener(listener: () => void): void { - this.webSocketStatusChangeListeners.push(listener); - } - public async syncLocallyCreatedFile( relativePath: RelativePath ): Promise { @@ -303,105 +252,9 @@ export class Syncer { public async reset(): Promise { await this.waitUntilFinished(); - this.setWebSocketRefreshInterval(); - this.updateWebSocket(this.settings.getSettings()); } - public stop(): void { - clearInterval(this.refreshApplyRemoteChangesWebSocketInterval); - - try { - this.applyRemoteChangesWebSocket?.close(); - } catch (e) { - this.logger.warn(`Failed to close WebSocket: ${e}`); - } - } - - private updateWebSocket(settings: SyncSettings): void { - try { - this.applyRemoteChangesWebSocket?.close(); - } catch (e) { - this.logger.warn(`Failed to close WebSocket: ${e}`); - } - - if (!settings.isSyncEnabled) { - this.applyRemoteChangesWebSocket = undefined; - return; - } - - const wsUri = new URL(settings.remoteUri); - wsUri.protocol = wsUri.protocol === "https" ? "wss" : "ws"; - wsUri.pathname = `/vaults/${settings.vaultName}/ws`; - - this.logger.info(`Connecting to WebSocket at ${wsUri.toString()}`); - - this.applyRemoteChangesWebSocket = new this.webSocketImplementation( - wsUri - ); - - this.applyRemoteChangesWebSocket.onmessage = async ( - event - ): Promise => { - // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion - const message = JSON.parse(event.data) as WebsocketVaultUpdate; - - try { - await Promise.all( - message.documents.map(async (document) => - this.syncRemotelyUpdatedFile(document) - ) - ); - - if (message.isInitialSync && message.documents.length > 0) { - this.database.setLastSeenUpdateId( - message.documents - .map((document) => document.vaultUpdateId) - .reduce((a, b) => Math.max(a, b)) - ); - } - } catch (e) { - this.logger.error(`Failed to sync remotely updated file: ${e}`); - } - }; - - // The JS WebSocket API doesn't support setting headers, so we have to send the token as a message - this.applyRemoteChangesWebSocket.onopen = (): void => { - this.logger.info("WebSocket connection opened"); - this.applyRemoteChangesWebSocket?.send( - JSON.stringify({ - deviceId: this.deviceId, - token: settings.token, - lastSeenVaultUpdateId: this.database.getLastSeenUpdateId() - }) - ); - this.webSocketStatusChangeListeners.forEach((listener) => { - listener(); - }); - }; - - this.applyRemoteChangesWebSocket.onclose = (event): void => { - this.logger.warn( - `WebSocket closed with code ${event.code} (${event.reason == "" ? "unknown reason" : event.reason})` - ); - this.webSocketStatusChangeListeners.forEach((listener) => { - listener(); - }); - }; - } - - private setWebSocketRefreshInterval(): void { - this.refreshApplyRemoteChangesWebSocketInterval = setInterval(() => { - if ( - this.applyRemoteChangesWebSocket?.readyState === - this.webSocketImplementation.OPEN - ) { - return; - } - this.updateWebSocket(this.settings.getSettings()); - }, 5000); - } - - private async syncRemotelyUpdatedFile( + public async syncRemotelyUpdatedFile( remoteVersion: components["schemas"]["DocumentVersionWithoutContent"] ): Promise { let document = this.database.getDocumentByDocumentId(