Stop leaking promises in ws manager

This commit is contained in:
Andras Schmelczer 2025-11-27 21:21:43 +00:00
parent 3ed2e4f666
commit c3cc678446

View file

@ -7,6 +7,7 @@ import type { ClientCursors } from "./types/ClientCursors";
import { createPromise } from "../utils/create-promise";
import type { WebSocketVaultUpdate } from "./types/WebSocketVaultUpdate";
import { awaitAll } from "../utils/await-all";
import { WEBSOCKET_DISCONNECT_TIMEOUT_IN_S } from "../consts";
export class WebSocketManager {
private readonly webSocketStatusChangeListeners: ((
@ -87,7 +88,6 @@ export class WebSocketManager {
this.isStopped = true;
// Clear pending reconnect timeout
if (this.reconnectTimeoutId !== undefined) {
clearTimeout(this.reconnectTimeoutId);
this.reconnectTimeoutId = undefined;
@ -95,10 +95,40 @@ export class WebSocketManager {
this.webSocket?.close(1000, "WebSocketManager has been stopped");
while (this.isWebSocketConnected) {
await promise;
// eslint-disable-next-line @typescript-eslint/init-declarations
let timeoutId: ReturnType<typeof setTimeout> | undefined;
const timeoutPromise = new Promise<void>((_, reject) => {
timeoutId = setTimeout(() => {
reject(
new Error(
`Timeout waiting for WebSocket to close after ${WEBSOCKET_DISCONNECT_TIMEOUT_IN_S} seconds`
)
);
}, WEBSOCKET_DISCONNECT_TIMEOUT_IN_S * 1000);
});
try {
while (this.isWebSocketConnected) {
await Promise.race([promise, timeoutPromise]);
}
} catch (error) {
this.logger.error(
`Error while waiting for WebSocket to close: ${String(error)}`
);
// Force cleanup even if close didn't work
this.resolveDisconnectingPromise();
this.resolveDisconnectingPromise = null;
} finally {
// Clear timeout to prevent unhandled rejection
if (timeoutId !== undefined) {
clearTimeout(timeoutId);
}
}
await this.waitUntilFinished();
}
public async waitUntilFinished(): Promise<void> {
await awaitAll(this.outstandingPromises);
}
@ -112,41 +142,57 @@ export class WebSocketManager {
);
}
webSocket.send(JSON.stringify(message));
try {
webSocket.send(JSON.stringify(message));
} catch (error) {
this.logger.error(
`Failed to send handshake message: ${String(error)}`
);
throw error;
}
}
public updateLocalCursors(cursorPositions: CursorPositionFromClient): void {
if (!this.isWebSocketConnected) {
if (!this.isWebSocketConnected || !this.webSocket) {
// A missing cursor update is fine, we can just skip it if needed
this.logger.warn(
"WebSocket is not connected, cannot send cursor positions"
);
return;
}
const message: WebSocketClientMessage = {
type: "cursorPositions",
...cursorPositions
};
const { webSocket } = this;
if (!webSocket) {
this.logger.warn(
"WebSocket is not connected, cannot send cursor positions"
try {
this.webSocket.send(JSON.stringify(message));
this.logger.debug(
`Sent cursor positions: ${JSON.stringify(cursorPositions)}`
);
} catch (error) {
this.logger.warn(
`Failed to send cursor positions: ${String(error)}`
);
return;
}
webSocket.send(JSON.stringify(message));
this.logger.debug(
`Sent cursor positions: ${JSON.stringify(cursorPositions)}`
);
}
private initializeWebSocket(): void {
try {
this.webSocket?.close();
} catch (e) {
this.logger.error(
`Failed to close previous WebSocket connection: ${e}`
);
// Clean up old WebSocket handlers to prevent race conditions
if (this.webSocket) {
try {
// Remove handlers to prevent them from firing after new connection
this.webSocket.onopen = null;
this.webSocket.onclose = null;
this.webSocket.onmessage = null;
this.webSocket.onerror = null;
this.webSocket.close();
} catch (e) {
this.logger.error(
`Failed to close previous WebSocket connection: ${e}`
);
}
}
const wsUri = new URL(this.settings.getSettings().remoteUri);
@ -171,13 +217,25 @@ export class WebSocketManager {
event.data
) as WebSocketServerMessage;
void this.handleWebSocketMessage(message).catch(
(error: unknown) => {
// Track the message handling promise
const messageHandlingPromise = this.handleWebSocketMessage(
message
)
.catch((error: unknown) => {
this.logger.error(
`Error handling WebSocket message: ${String(error)}`
);
}
);
})
.finally(() => {
const index = this.outstandingPromises.indexOf(
messageHandlingPromise
);
if (index !== -1) {
void this.outstandingPromises.splice(index, 1); // ignore the returned promise
}
});
void this.outstandingPromises.push(messageHandlingPromise); // ignore the returned promise
} catch (error) {
this.logger.error(
`Error parsing WebSocket message: ${String(error)}`
@ -186,7 +244,7 @@ export class WebSocketManager {
};
this.webSocket.onclose = (event): void => {
this.logger.error(
this.logger.warn(
`WebSocket closed with code ${event.code} (${event.reason == "" ? "unknown reason" : event.reason})`
);
this.webSocketStatusChangeListeners.forEach((listener) =>
@ -209,28 +267,16 @@ export class WebSocketManager {
message: WebSocketServerMessage
): Promise<void> {
if (message.type === "vaultUpdate") {
const promises = this.remoteVaultUpdateListeners.map(
async (listener) => {
const trackedPromise = listener(message)
.catch((error: unknown) => {
this.logger.error(
`Error in vault update listener: ${String(error)}`
);
})
.finally(() => {
const index =
this.outstandingPromises.indexOf(
trackedPromise
);
if (index !== -1) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.outstandingPromises.splice(index, 1);
}
});
await trackedPromise;
}
await awaitAll(
this.remoteVaultUpdateListeners.map(async (listener) => {
await listener(message).catch((error: unknown) => {
this.logger.error(
`Error in vault update listener: ${String(error)}`
);
});
})
);
this.outstandingPromises.push(...promises);
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
} else if (message.type === "cursorPositions") {
this.logger.debug(
@ -239,28 +285,16 @@ export class WebSocketManager {
const filteredClients = message.clients.filter(
(client) => client.deviceId !== this.deviceId
);
const promises = this.remoteCursorsUpdateListeners.map(
async (listener) => {
const trackedPromise = listener(filteredClients)
.catch((error: unknown) => {
this.logger.error(
`Error in cursor positions listener: ${String(error)}`
);
})
.finally(() => {
const index =
this.outstandingPromises.indexOf(
trackedPromise
);
if (index !== -1) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.outstandingPromises.splice(index, 1);
}
});
await trackedPromise;
}
await awaitAll(
this.remoteCursorsUpdateListeners.map(async (listener) => {
await listener(filteredClients).catch((error: unknown) => {
this.logger.error(
`Error in cursor positions listener: ${String(error)}`
);
});
})
);
this.outstandingPromises.push(...promises);
} else {
this.logger.warn(
`Received unknown message type: ${JSON.stringify(message)}`