From c3cc6784460c2b2fa54f1009ea295907032482f4 Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Thu, 27 Nov 2025 21:21:43 +0000 Subject: [PATCH] Stop leaking promises in ws manager --- .../src/services/websocket-manager.ts | 168 +++++++++++------- 1 file changed, 101 insertions(+), 67 deletions(-) diff --git a/frontend/sync-client/src/services/websocket-manager.ts b/frontend/sync-client/src/services/websocket-manager.ts index f5cb64a1..08442290 100644 --- a/frontend/sync-client/src/services/websocket-manager.ts +++ b/frontend/sync-client/src/services/websocket-manager.ts @@ -7,6 +7,7 @@ import type { ClientCursors } from "./types/ClientCursors"; import { createPromise } from "../utils/create-promise"; import type { WebSocketVaultUpdate } from "./types/WebSocketVaultUpdate"; import { awaitAll } from "../utils/await-all"; +import { WEBSOCKET_DISCONNECT_TIMEOUT_IN_S } from "../consts"; export class WebSocketManager { private readonly webSocketStatusChangeListeners: (( @@ -87,7 +88,6 @@ export class WebSocketManager { this.isStopped = true; - // Clear pending reconnect timeout if (this.reconnectTimeoutId !== undefined) { clearTimeout(this.reconnectTimeoutId); this.reconnectTimeoutId = undefined; @@ -95,10 +95,40 @@ export class WebSocketManager { this.webSocket?.close(1000, "WebSocketManager has been stopped"); - while (this.isWebSocketConnected) { - await promise; + // eslint-disable-next-line @typescript-eslint/init-declarations + let timeoutId: ReturnType | undefined; + const timeoutPromise = new Promise((_, reject) => { + timeoutId = setTimeout(() => { + reject( + new Error( + `Timeout waiting for WebSocket to close after ${WEBSOCKET_DISCONNECT_TIMEOUT_IN_S} seconds` + ) + ); + }, WEBSOCKET_DISCONNECT_TIMEOUT_IN_S * 1000); + }); + + try { + while (this.isWebSocketConnected) { + await Promise.race([promise, timeoutPromise]); + } + } catch (error) { + this.logger.error( + `Error while waiting for WebSocket to close: ${String(error)}` + ); + // Force cleanup even if close didn't work + this.resolveDisconnectingPromise(); + this.resolveDisconnectingPromise = null; + } finally { + // Clear timeout to prevent unhandled rejection + if (timeoutId !== undefined) { + clearTimeout(timeoutId); + } } + await this.waitUntilFinished(); + } + + public async waitUntilFinished(): Promise { await awaitAll(this.outstandingPromises); } @@ -112,41 +142,57 @@ export class WebSocketManager { ); } - webSocket.send(JSON.stringify(message)); + try { + webSocket.send(JSON.stringify(message)); + } catch (error) { + this.logger.error( + `Failed to send handshake message: ${String(error)}` + ); + throw error; + } } public updateLocalCursors(cursorPositions: CursorPositionFromClient): void { - if (!this.isWebSocketConnected) { + if (!this.isWebSocketConnected || !this.webSocket) { // A missing cursor update is fine, we can just skip it if needed this.logger.warn( "WebSocket is not connected, cannot send cursor positions" ); return; } + const message: WebSocketClientMessage = { type: "cursorPositions", ...cursorPositions }; - const { webSocket } = this; - if (!webSocket) { - this.logger.warn( - "WebSocket is not connected, cannot send cursor positions" + + try { + this.webSocket.send(JSON.stringify(message)); + this.logger.debug( + `Sent cursor positions: ${JSON.stringify(cursorPositions)}` + ); + } catch (error) { + this.logger.warn( + `Failed to send cursor positions: ${String(error)}` ); - return; } - webSocket.send(JSON.stringify(message)); - this.logger.debug( - `Sent cursor positions: ${JSON.stringify(cursorPositions)}` - ); } private initializeWebSocket(): void { - try { - this.webSocket?.close(); - } catch (e) { - this.logger.error( - `Failed to close previous WebSocket connection: ${e}` - ); + // Clean up old WebSocket handlers to prevent race conditions + if (this.webSocket) { + try { + // Remove handlers to prevent them from firing after new connection + this.webSocket.onopen = null; + this.webSocket.onclose = null; + this.webSocket.onmessage = null; + this.webSocket.onerror = null; + this.webSocket.close(); + } catch (e) { + this.logger.error( + `Failed to close previous WebSocket connection: ${e}` + ); + } } const wsUri = new URL(this.settings.getSettings().remoteUri); @@ -171,13 +217,25 @@ export class WebSocketManager { event.data ) as WebSocketServerMessage; - void this.handleWebSocketMessage(message).catch( - (error: unknown) => { + // Track the message handling promise + const messageHandlingPromise = this.handleWebSocketMessage( + message + ) + .catch((error: unknown) => { this.logger.error( `Error handling WebSocket message: ${String(error)}` ); - } - ); + }) + .finally(() => { + const index = this.outstandingPromises.indexOf( + messageHandlingPromise + ); + if (index !== -1) { + void this.outstandingPromises.splice(index, 1); // ignore the returned promise + } + }); + + void this.outstandingPromises.push(messageHandlingPromise); // ignore the returned promise } catch (error) { this.logger.error( `Error parsing WebSocket message: ${String(error)}` @@ -186,7 +244,7 @@ export class WebSocketManager { }; this.webSocket.onclose = (event): void => { - this.logger.error( + this.logger.warn( `WebSocket closed with code ${event.code} (${event.reason == "" ? "unknown reason" : event.reason})` ); this.webSocketStatusChangeListeners.forEach((listener) => @@ -209,28 +267,16 @@ export class WebSocketManager { message: WebSocketServerMessage ): Promise { if (message.type === "vaultUpdate") { - const promises = this.remoteVaultUpdateListeners.map( - async (listener) => { - const trackedPromise = listener(message) - .catch((error: unknown) => { - this.logger.error( - `Error in vault update listener: ${String(error)}` - ); - }) - .finally(() => { - const index = - this.outstandingPromises.indexOf( - trackedPromise - ); - if (index !== -1) { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - this.outstandingPromises.splice(index, 1); - } - }); - await trackedPromise; - } + await awaitAll( + this.remoteVaultUpdateListeners.map(async (listener) => { + await listener(message).catch((error: unknown) => { + this.logger.error( + `Error in vault update listener: ${String(error)}` + ); + }); + }) ); - this.outstandingPromises.push(...promises); + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition } else if (message.type === "cursorPositions") { this.logger.debug( @@ -239,28 +285,16 @@ export class WebSocketManager { const filteredClients = message.clients.filter( (client) => client.deviceId !== this.deviceId ); - const promises = this.remoteCursorsUpdateListeners.map( - async (listener) => { - const trackedPromise = listener(filteredClients) - .catch((error: unknown) => { - this.logger.error( - `Error in cursor positions listener: ${String(error)}` - ); - }) - .finally(() => { - const index = - this.outstandingPromises.indexOf( - trackedPromise - ); - if (index !== -1) { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - this.outstandingPromises.splice(index, 1); - } - }); - await trackedPromise; - } + + await awaitAll( + this.remoteCursorsUpdateListeners.map(async (listener) => { + await listener(filteredClients).catch((error: unknown) => { + this.logger.error( + `Error in cursor positions listener: ${String(error)}` + ); + }); + }) ); - this.outstandingPromises.push(...promises); } else { this.logger.warn( `Received unknown message type: ${JSON.stringify(message)}`