From 0320308f1a71b9a8bdade99d27c45a3f9441e2bf Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Tue, 25 Mar 2025 22:26:08 +0000 Subject: [PATCH] Use websocket on the FE --- frontend/sync-client/src/sync-client.ts | 6 + .../sync-client/src/sync-operations/syncer.ts | 141 ++++++++++-------- 2 files changed, 81 insertions(+), 66 deletions(-) diff --git a/frontend/sync-client/src/sync-client.ts b/frontend/sync-client/src/sync-client.ts index 6100bc99..f771ae06 100644 --- a/frontend/sync-client/src/sync-client.ts +++ b/frontend/sync-client/src/sync-client.ts @@ -152,7 +152,12 @@ export class SyncClient { await this.syncer.scheduleSyncForOfflineChanges(); } + public stop(): void { + this.syncer.stop(); + } + public async waitAndStop(): Promise { + this.stop(); await this.syncer.waitUntilFinished(); } @@ -160,6 +165,7 @@ export class SyncClient { /// and the local database but retain the settings. /// The SyncClient can be used again after calling this method. public async reset(): Promise { + this.stop(); this.connectionStatus.startReset(); await this.syncer.reset(); this.history.reset(); diff --git a/frontend/sync-client/src/sync-operations/syncer.ts b/frontend/sync-client/src/sync-operations/syncer.ts index 1c3b064c..70c7cb31 100644 --- a/frontend/sync-client/src/sync-operations/syncer.ts +++ b/frontend/sync-client/src/sync-operations/syncer.ts @@ -5,7 +5,7 @@ import PQueue from "p-queue"; import { hash } from "../utils/hash"; import { v4 as uuidv4 } from "uuid"; import type { components } from "../services/types"; -import type { Settings } from "../persistence/settings"; +import type { Settings, SyncSettings } from "../persistence/settings"; import type { FileOperations } from "../file-operations/file-operations"; import { findMatchingFile } from "../utils/find-matching-file"; import type { UnrestrictedSyncer } from "./unrestricted-syncer"; @@ -19,12 +19,15 @@ export class Syncer { private readonly syncQueue: PQueue; private runningScheduleSyncForOfflineChanges: Promise | undefined; - private runningApplyRemoteChangesLocally: Promise | undefined; + private refreshApplyRemoteChangesWebSocketInterval: + | NodeJS.Timeout + | undefined; + private applyRemoteChangesWebSocket: WebSocket | undefined; public constructor( private readonly logger: Logger, private readonly database: Database, - settings: Settings, + private readonly settings: Settings, private readonly syncService: SyncService, private readonly operations: FileOperations, private readonly internalSyncer: UnrestrictedSyncer @@ -33,11 +36,21 @@ export class Syncer { concurrency: settings.getSettings().syncConcurrency }); + this.updateWebSocket(settings.getSettings()); + settings.addOnSettingsChangeListener((newSettings, oldSettings) => { - if (newSettings.syncConcurrency === oldSettings.syncConcurrency) { - return; + if ( + newSettings.remoteUri !== oldSettings.remoteUri || + newSettings.vaultName !== oldSettings.vaultName || + newSettings.token !== oldSettings.token || + newSettings.isSyncEnabled !== oldSettings.isSyncEnabled + ) { + this.updateWebSocket(newSettings); + } + + if (newSettings.syncConcurrency !== oldSettings.syncConcurrency) { + this.syncQueue.concurrency = newSettings.syncConcurrency; } - this.syncQueue.concurrency = newSettings.syncConcurrency; }); this.syncQueue.on("active", () => { @@ -45,6 +58,20 @@ export class Syncer { listener(this.syncQueue.size); }); }); + + this.setWebSocketRefreshInterval(); + } + + public async reset(): Promise { + await this.waitUntilFinished(); + this.internalSyncer.reset(); + this.setWebSocketRefreshInterval(); + this.updateWebSocket(this.settings.getSettings()); + } + + public stop(): void { + clearInterval(this.refreshApplyRemoteChangesWebSocketInterval); + this.applyRemoteChangesWebSocket?.close(); } public addRemainingOperationsListener( @@ -206,78 +233,60 @@ export class Syncer { } } - public async applyRemoteChangesLocally(): Promise { - if (this.runningApplyRemoteChangesLocally !== undefined) { - this.logger.debug( - "Applying remote changes locally is already in progress" - ); - return this.runningApplyRemoteChangesLocally; - } - - try { - this.runningApplyRemoteChangesLocally = - this.internalApplyRemoteChangesLocally(); - await this.runningApplyRemoteChangesLocally; - this.logger.info("All remote changes have been applied locally"); - } catch (e) { - if (e instanceof SyncResetError) { - this.logger.info( - "Failed to apply remote changes locally due to a reset" - ); - return; - } - this.logger.error(`Failed to apply remote changes locally: ${e}`); - throw e; - } finally { - this.runningApplyRemoteChangesLocally = undefined; - } - } - - public async reset(): Promise { - await this.waitUntilFinished(); - this.internalSyncer.reset(); - } - public async waitUntilFinished(): Promise { - await Promise.allSettled([ - this.runningScheduleSyncForOfflineChanges, - this.runningApplyRemoteChangesLocally - ]); + await this.runningScheduleSyncForOfflineChanges; return this.syncQueue.onEmpty(); } - private async internalApplyRemoteChangesLocally(): Promise { - const remote = await this.syncQueue.add(async () => - this.syncService.getAll(this.database.getLastSeenUpdateId()) - ); + private updateWebSocket(settings: SyncSettings): void { + this.applyRemoteChangesWebSocket?.close(); - if (!remote) { - throw new Error("Failed to fetch remote changes"); - } - - if (remote.latestDocuments.length === 0) { - this.logger.debug("No remote changes to apply"); + if (!settings.isSyncEnabled) { + this.applyRemoteChangesWebSocket = undefined; return; } - this.logger.info("Applying remote changes locally"); + const wsUri = new URL(settings.remoteUri); + wsUri.protocol = wsUri.protocol === "https" ? "wss" : "ws"; + wsUri.pathname = `/vaults/${settings.vaultName}/ws`; + this.applyRemoteChangesWebSocket = new WebSocket(wsUri); - await Promise.all( - remote.latestDocuments.map(this.syncRemotelyUpdatedFile.bind(this)) - ); + this.applyRemoteChangesWebSocket.onmessage = (event): void => + void this.syncRemotelyUpdatedFile(event.data).catch( + (e: unknown) => { + this.logger.error( + `Failed to sync remotely updated file: ${e}` + ); + } + ); - const lastSeenUpdateId = this.database.getLastSeenUpdateId(); - if ( - lastSeenUpdateId === undefined || - lastSeenUpdateId < remote.lastUpdateId - ) { - this.database.setLastSeenUpdateId(remote.lastUpdateId); - } + this.applyRemoteChangesWebSocket.onerror = (event): void => { + console.error(event); + this.logger.error(`WebSocket error`); + }; + + // The JS WebSocket API doesn't support setting headers, so we have to send the token as a message + this.applyRemoteChangesWebSocket.onopen = (): void => + this.applyRemoteChangesWebSocket?.send(settings.token); } - private async syncRemotelyUpdatedFile( - remoteVersion: components["schemas"]["DocumentVersionWithoutContent"] - ): Promise { + private setWebSocketRefreshInterval(): void { + this.refreshApplyRemoteChangesWebSocketInterval = setInterval(() => { + if ( + this.applyRemoteChangesWebSocket?.readyState === WebSocket.OPEN + ) { + return; + } + this.updateWebSocket(this.settings.getSettings()); + }, 5000); + } + + private async syncRemotelyUpdatedFile(message: string): Promise { + // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion + const remoteVersion = JSON.parse( + message + ) as components["schemas"]["DocumentVersionWithoutContent"]; + let document = this.database.getDocumentByDocumentId( remoteVersion.documentId );