From cb2a1c0df1318a73c73153eae0907b8a5a078b1d Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Sun, 23 Nov 2025 14:18:49 +0000 Subject: [PATCH] Fix reset logic for WS --- .../src/services/websocket-manager.ts | 177 ++++++++++-------- 1 file changed, 95 insertions(+), 82 deletions(-) diff --git a/frontend/sync-client/src/services/websocket-manager.ts b/frontend/sync-client/src/services/websocket-manager.ts index 8de399e3..06432e89 100644 --- a/frontend/sync-client/src/services/websocket-manager.ts +++ b/frontend/sync-client/src/services/websocket-manager.ts @@ -1,31 +1,38 @@ -import type { Database } from "../persistence/database"; import type { Logger } from "../tracing/logger"; -import type { Settings, SyncSettings } from "../persistence/settings"; +import type { Settings } from "../persistence/settings"; import type { WebSocketServerMessage } from "./types/WebSocketServerMessage"; -import type { Syncer } from "../sync-operations/syncer"; import type { WebSocketClientMessage } from "./types/WebSocketClientMessage"; import type { CursorPositionFromClient } from "./types/CursorPositionFromClient"; import type { ClientCursors } from "./types/ClientCursors"; +import { createPromise } from "../utils/create-promise"; +import { WebSocketVaultUpdate } from "./types/WebSocketVaultUpdate"; export class WebSocketManager { - private readonly webSocketStatusChangeListeners: (() => unknown)[] = []; + private readonly webSocketStatusChangeListeners: (( + isConnected: boolean + ) => unknown)[] = []; + + private readonly remoteVaultUpdateListeners: (( + update: WebSocketVaultUpdate + ) => Promise)[] = []; + private readonly remoteCursorsUpdateListeners: (( cursors: ClientCursors[] - ) => unknown)[] = []; + ) => Promise)[] = []; private webSocket: WebSocket | undefined; private isStopped = true; - private _isFirstSyncCompleted = false; + private resolveDisconnectingPromise: null | (() => unknown) = null; + private reconnectTimeoutId: ReturnType | undefined; + private readonly outstandingPromises: Array> = []; private readonly webSocketFactoryImplementation: typeof globalThis.WebSocket; public constructor( private readonly deviceId: string, private readonly logger: Logger, - private readonly database: Database, private readonly settings: Settings, - private readonly syncer: Syncer, webSocketImplementation?: typeof globalThis.WebSocket ) { if (webSocketImplementation) { @@ -41,16 +48,6 @@ export class WebSocketManager { this.webSocketFactoryImplementation = WebSocket; } } - - settings.addOnSettingsChangeListener((newSettings, oldSettings) => { - if ( - newSettings.remoteUri !== oldSettings.remoteUri || - newSettings.vaultName !== oldSettings.vaultName || - newSettings.token !== oldSettings.token - ) { - this.initializeWebSocket(newSettings); - } - }); } public get isWebSocketConnected(): boolean { @@ -60,42 +57,66 @@ export class WebSocketManager { ); } - public get isFirstSyncCompleted(): boolean { - return this._isFirstSyncCompleted; - } - - public addWebSocketStatusChangeListener(listener: () => unknown): void { + public addWebSocketStatusChangeListener( + listener: (isConnected: boolean) => unknown + ): void { this.webSocketStatusChangeListeners.push(listener); } public addRemoteCursorsUpdateListener( - listener: (cursors: ClientCursors[]) => unknown + listener: (cursors: ClientCursors[]) => Promise ): void { this.remoteCursorsUpdateListeners.push(listener); } - public removeRemoteCursorsUpdateListener( - listener: (cursors: ClientCursors[]) => unknown + public addRemoteVaultUpdateListener( + listener: (update: WebSocketVaultUpdate) => Promise ): void { - const index = this.remoteCursorsUpdateListeners.indexOf(listener); - if (index !== -1) { - this.remoteCursorsUpdateListeners.splice(index, 1); - } + this.remoteVaultUpdateListeners.push(listener); } public start(): void { this.isStopped = false; - this._isFirstSyncCompleted = false; - this.initializeWebSocket(this.settings.getSettings()); + this.initializeWebSocket(); } - public stop(): void { + public async stop(): Promise { + const [promise, resolve] = createPromise(); + this.resolveDisconnectingPromise = resolve; + this.isStopped = true; + + // Clear pending reconnect timeout + if (this.reconnectTimeoutId !== undefined) { + clearTimeout(this.reconnectTimeoutId); + this.reconnectTimeoutId = undefined; + } + this.webSocket?.close(1000, "WebSocketManager has been stopped"); + + while (this.isWebSocketConnected) { + await promise; + } + + await Promise.allSettled(this.outstandingPromises).then(() => {}); + } + + public sendHandshakeMessage( + message: WebSocketClientMessage & { type: "handshake" } + ): void { + const webSocket = this.webSocket; + if (!webSocket) { + throw new Error( + "WebSocket is not connected, cannot send handshake message" + ); + } + + webSocket.send(JSON.stringify(message)); } public updateLocalCursors(cursorPositions: CursorPositionFromClient): void { if (!this.isWebSocketConnected) { + // A missing cursor update is fine, we can just skip it if needed this.logger.warn( "WebSocket is not connected, cannot send cursor positions" ); @@ -105,43 +126,41 @@ export class WebSocketManager { type: "cursorPositions", ...cursorPositions }; - this.webSocket?.send(JSON.stringify(message)); + const webSocket = this.webSocket; + if (!webSocket) { + this.logger.warn( + "WebSocket is not connected, cannot send cursor positions" + ); + return; + } + webSocket.send(JSON.stringify(message)); this.logger.debug( `Sent cursor positions: ${JSON.stringify(cursorPositions)}` ); } - private initializeWebSocket(settings: SyncSettings): void { - if (this.isStopped) { - return; - } - + private initializeWebSocket(): void { try { this.webSocket?.close(); } catch (e) { - this.logger.warn(`Failed to close WebSocket: ${e}`); + this.logger.error( + `Failed to close previous WebSocket connection: ${e}` + ); } - const wsUri = new URL(settings.remoteUri); + const wsUri = new URL(this.settings.getSettings().remoteUri); wsUri.protocol = wsUri.protocol === "https" ? "wss" : "ws"; - wsUri.pathname = `/vaults/${settings.vaultName}/ws`; + wsUri.pathname = `/vaults/${this.settings.getSettings().vaultName}/ws`; this.logger.info(`Connecting to WebSocket at ${wsUri.toString()}`); this.webSocket = new this.webSocketFactoryImplementation(wsUri); - // The JS WebSocket API doesn't support setting headers, so we have to send the token as a message this.webSocket.onopen = (): void => { this.logger.info("WebSocket connection opened"); - this.webSocketStatusChangeListeners.forEach((l) => l()); - - const message: WebSocketClientMessage = { - type: "handshake", - deviceId: this.deviceId, - token: settings.token, - lastSeenVaultUpdateId: this.database.getLastSeenUpdateId() - }; - this.webSocket?.send(JSON.stringify(message)); + this.webSocketStatusChangeListeners.forEach((listener) => + listener(true) + ); }; this.webSocket.onmessage = async (event): Promise => { @@ -151,14 +170,20 @@ export class WebSocketManager { }; this.webSocket.onclose = (event): void => { - this.logger.warn( + this.logger.error( `WebSocket closed with code ${event.code} (${event.reason == "" ? "unknown reason" : event.reason})` ); - this.webSocketStatusChangeListeners.forEach((l) => l()); + this.webSocketStatusChangeListeners.forEach((listener) => + listener(false) + ); - if (!this.isStopped) { - setTimeout(() => { - this.initializeWebSocket(this.settings.getSettings()); + if (this.isStopped) { + this.resolveDisconnectingPromise?.(); + this.resolveDisconnectingPromise = null; + } else { + this.reconnectTimeoutId = setTimeout(() => { + this.reconnectTimeoutId = undefined; + this.initializeWebSocket(); }, this.settings.getSettings().webSocketRetryIntervalMs); } }; @@ -168,37 +193,25 @@ export class WebSocketManager { message: WebSocketServerMessage ): Promise { if (message.type === "vaultUpdate") { - try { - await Promise.all( - message.documents.map(async (document) => - this.syncer.syncRemotelyUpdatedFile(document) - ) - ); - - if (message.isInitialSync && message.documents.length > 0) { - this.database.setLastSeenUpdateId( - message.documents - .map((document) => document.vaultUpdateId) - .reduce((a, b) => Math.max(a, b)) - ); - } - - this._isFirstSyncCompleted = true; - } catch (e) { - this.logger.error(`Failed to sync remotely updated file: ${e}`); - } + this.outstandingPromises.push( + ...this.remoteVaultUpdateListeners.map((listener) => + listener(message) + ) + ); // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition } else if (message.type === "cursorPositions") { this.logger.debug( `Received cursor positions for ${JSON.stringify(message.clients)}` ); - this.remoteCursorsUpdateListeners.forEach((listener) => { - listener( - message.clients.filter( - (client) => client.deviceId !== this.deviceId + this.outstandingPromises.push( + ...this.remoteCursorsUpdateListeners.map((listener) => + listener( + message.clients.filter( + (client) => client.deviceId !== this.deviceId + ) ) - ); - }); + ) + ); } else { this.logger.warn( `Received unknown message type: ${JSON.stringify(message)}`