From 45b86cffe43ce173cfb0e4da40dd82f5e5b6ff27 Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Fri, 8 May 2026 21:36:41 +0100 Subject: [PATCH] split: sync-client services layer Add build-vault-url helper. Rewrite fetch-controller and websocket-manager (plus their tests). Update server-config and sync-service to consume the new error types and the regenerated API types from previous chunks. --- .../src/services/build-vault-url.ts | 8 + .../src/services/fetch-controller.test.ts | 2 +- .../src/services/fetch-controller.ts | 79 +++--- .../sync-client/src/services/server-config.ts | 40 ++- .../sync-client/src/services/sync-service.ts | 181 +++++++------- .../src/services/websocket-manager.test.ts | 71 +++++- .../src/services/websocket-manager.ts | 228 +++++++++++------- 7 files changed, 377 insertions(+), 232 deletions(-) create mode 100644 frontend/sync-client/src/services/build-vault-url.ts diff --git a/frontend/sync-client/src/services/build-vault-url.ts b/frontend/sync-client/src/services/build-vault-url.ts new file mode 100644 index 00000000..1f5002d7 --- /dev/null +++ b/frontend/sync-client/src/services/build-vault-url.ts @@ -0,0 +1,8 @@ +import type { Settings } from "../persistence/settings"; + +export function buildVaultUrl(settings: Settings, path: string): string { + const { vaultName, remoteUri } = settings.getSettings(); + const remoteUriWithoutTrailingSlash = remoteUri.replace(/\/+$/, ""); + const encodedVaultName = encodeURIComponent(vaultName.trim()); + return `${remoteUriWithoutTrailingSlash}/vaults/${encodedVaultName}${path}`; +} diff --git a/frontend/sync-client/src/services/fetch-controller.test.ts b/frontend/sync-client/src/services/fetch-controller.test.ts index 94fa8424..a1b791a6 100644 --- a/frontend/sync-client/src/services/fetch-controller.test.ts +++ b/frontend/sync-client/src/services/fetch-controller.test.ts @@ -3,7 +3,7 @@ import { describe, it, mock, beforeEach, afterEach } from "node:test"; import assert from "node:assert"; import { FetchController } from "./fetch-controller"; import { Logger } from "../tracing/logger"; -import { SyncResetError } from "./sync-reset-error"; +import { SyncResetError } from "../errors/sync-reset-error"; import { sleep } from "../utils/sleep"; describe("FetchController", () => { diff --git a/frontend/sync-client/src/services/fetch-controller.ts b/frontend/sync-client/src/services/fetch-controller.ts index 77b87e3a..f5bb8664 100644 --- a/frontend/sync-client/src/services/fetch-controller.ts +++ b/frontend/sync-client/src/services/fetch-controller.ts @@ -1,6 +1,5 @@ import type { Logger } from "../tracing/logger"; -import { createPromise } from "../utils/create-promise"; -import { SyncResetError } from "./sync-reset-error"; +import { SyncResetError } from "../errors/sync-reset-error"; /** * Offers a resettable fetch implementation that waits until syncing is enabled @@ -13,37 +12,43 @@ export class FetchController { // Promise resolves on the next state change: sync enabled/disabled or reset started/ended private until: Promise; - private resolveUntil: (result: symbol) => unknown; - private rejectUntil: (reason: unknown) => unknown; + private resolveUntil: (value: symbol | PromiseLike) => void; + private rejectUntil: (reason?: unknown) => void; public constructor( private _canFetch: boolean, private readonly logger: Logger ) { - [this.until, this.resolveUntil, this.rejectUntil] = - createPromise(); + ({ + promise: this.until, + resolve: this.resolveUntil, + reject: this.rejectUntil + } = Promise.withResolvers()); } /** - * Whether the fetch implementation can immediately send requests once outside of a reset. - */ + * Whether the fetch implementation can immediately send requests once outside of a reset. + */ public get canFetch(): boolean { return this._canFetch; } /** - * Allow or disallow fetching. The changes only take effect if not resetting. - * When called during a reset, its effect is deferred until the reset is finished. - * - * @param canFetch Whether fetching is enabled - */ + * Allow or disallow fetching. The changes only take effect if not resetting. + * When called during a reset, its effect is deferred until the reset is finished. + * + * @param canFetch Whether fetching is enabled + */ public set canFetch(canFetch: boolean) { this._canFetch = canFetch; if (!this.isResetting) { const previousResolve = this.resolveUntil; - [this.until, this.resolveUntil, this.rejectUntil] = - createPromise(); + ({ + promise: this.until, + resolve: this.resolveUntil, + reject: this.rejectUntil + } = Promise.withResolvers()); previousResolve(FetchController.UNTIL_RESOLUTION); } } @@ -59,9 +64,9 @@ export class FetchController { } /** - * Starts a reset, causing all ongoing and future fetches to be rejected - * with a SyncResetError until finishReset is called. - */ + * Starts a reset, causing all ongoing and future fetches to be rejected + * with a SyncResetError until finishReset is called. + */ public startReset(): void { this.isResetting = true; this.rejectUntil(new SyncResetError()); @@ -72,32 +77,36 @@ export class FetchController { } /** - * Finishes a reset, allowing fetches to proceed or wait again depending on - * the current sync settings. - */ + * Finishes a reset, allowing fetches to proceed or wait again depending on + * the current sync settings. + */ public finishReset(): void { if (!this.isResetting) { return; } this.isResetting = false; - [this.until, this.resolveUntil, this.rejectUntil] = createPromise(); + ({ + promise: this.until, + resolve: this.resolveUntil, + reject: this.rejectUntil + } = Promise.withResolvers()); } /** - * - * |------------------|---------------|-----------------------------------------------------| - * | | Sync enabled | Sync disabled | - * |------------------|-------------- |-----------------------------------------------------| - * | During reset | Rejects with SyncResetError without sending request | - * |------------------|-------------- |-----------------------------------------------------| - * | Outside of reset | Same as fetch | Blocks until sync is enabled and then same as fetch | - * |------------------|---------------|-----------------------------------------------------| - * - * @param logger for errors - * @param fetch to wrap - * @returns a wrapped fetch implementation affected by the FetchController state - */ + * + * |------------------|---------------|-----------------------------------------------------| + * | | Sync enabled | Sync disabled | + * |------------------|-------------- |-----------------------------------------------------| + * | During reset | Rejects with SyncResetError without sending request | + * |------------------|-------------- |-----------------------------------------------------| + * | Outside of reset | Same as fetch | Blocks until sync is enabled and then same as fetch | + * |------------------|---------------|-----------------------------------------------------| + * + * @param logger for errors + * @param fetch to wrap + * @returns a wrapped fetch implementation affected by the FetchController state + */ public getControlledFetchImplementation( logger: Logger, fetch: typeof globalThis.fetch = globalThis.fetch diff --git a/frontend/sync-client/src/services/server-config.ts b/frontend/sync-client/src/services/server-config.ts index 309c637c..187e1bff 100644 --- a/frontend/sync-client/src/services/server-config.ts +++ b/frontend/sync-client/src/services/server-config.ts @@ -1,6 +1,7 @@ import { SUPPORTED_API_VERSION } from "../consts"; -import { AuthenticationError } from "./authentication-error"; -import { ServerVersionMismatchError } from "./server-version-mismatch-error"; +import { AuthenticationError } from "../errors/authentication-error"; +import { ServerVersionMismatchError } from "../errors/server-version-mismatch-error"; +import type { Settings } from "../persistence/settings"; import type { SyncService } from "./sync-service"; import type { PingResponse } from "./types/PingResponse"; @@ -14,7 +15,20 @@ export class ServerConfig { private response: Promise | undefined; private config: ServerConfigData | undefined; - public constructor(private readonly syncService: SyncService) {} + public constructor( + private readonly syncService: SyncService, + settings: Settings + ) { + settings.onSettingsChanged.add((newSettings, oldSettings) => { + if ( + newSettings.token !== oldSettings.token || + newSettings.vaultName !== oldSettings.vaultName || + newSettings.remoteUri !== oldSettings.remoteUri + ) { + this.reset(); + } + }); + } private static validateConfig(config: ServerConfigData): void { if (config.supportedApiVersion !== SUPPORTED_API_VERSION) { @@ -34,11 +48,6 @@ export class ServerConfig { } } - // warm the cache - public async initialize(): Promise { - await this.getConfig(); - } - public async checkConnection(forceUpdate = false): Promise<{ isSuccessful: boolean; message: string; @@ -46,7 +55,7 @@ export class ServerConfig { try { let { response } = this; if (!response || forceUpdate) { - response = this.response = this.syncService.ping(); + response = this.startPing(); } const result: PingResponse = await response; // it must be defined, otherwise we would have thrown above @@ -73,7 +82,7 @@ export class ServerConfig { public async getConfig(): Promise { if (!this.config) { - this.response ??= this.syncService.ping(); + this.response ??= this.startPing(); this.config = await this.response; } @@ -86,4 +95,15 @@ export class ServerConfig { this.response = undefined; this.config = undefined; } + + private async startPing(): Promise { + const pending = this.syncService.ping().catch((e: unknown) => { + if (this.response === pending) { + this.response = undefined; + } + throw e; + }); + this.response = pending; + return pending; + } } diff --git a/frontend/sync-client/src/services/sync-service.ts b/frontend/sync-client/src/services/sync-service.ts index 8190a638..0a99fe84 100644 --- a/frontend/sync-client/src/services/sync-service.ts +++ b/frontend/sync-client/src/services/sync-service.ts @@ -2,25 +2,27 @@ import type { DocumentId, RelativePath, VaultUpdateId -} from "../persistence/database"; +} from "../sync-operations/types"; import type { Logger } from "../tracing/logger"; import type { Settings } from "../persistence/settings"; import type { FetchController } from "./fetch-controller"; import { sleep } from "../utils/sleep"; -import { SyncResetError } from "./sync-reset-error"; +import { SyncResetError } from "../errors/sync-reset-error"; +import { HttpClientError } from "../errors/http-client-error"; import type { SerializedError } from "./types/SerializedError"; import type { DocumentVersionWithoutContent } from "./types/DocumentVersionWithoutContent"; import type { DocumentUpdateResponse } from "./types/DocumentUpdateResponse"; import type { DocumentVersion } from "./types/DocumentVersion"; import type { FetchLatestDocumentsResponse } from "./types/FetchLatestDocumentsResponse"; import type { PingResponse } from "./types/PingResponse"; -import type { DeleteDocumentVersion } from "./types/DeleteDocumentVersion"; import type { UpdateTextDocumentVersion } from "./types/UpdateTextDocumentVersion"; +import { buildVaultUrl } from "./build-vault-url"; export class SyncService { private readonly client: typeof globalThis.fetch; private readonly pingClient: typeof globalThis.fetch; + private isStopped = false; public constructor( private readonly deviceId: string, @@ -65,28 +67,68 @@ export class SyncService { return result; } + private static async throwIfNotOk( + response: Response, + operation: string + ): Promise { + if (response.ok) { + return; + } + const message = `Failed to ${operation}: ${await SyncService.errorFromResponse(response)}`; + // 429 is the only 4xx the server uses for *transient* contention + // (`WriteBusyError` → HTTP 429). Every other 4xx means the request + // is permanently rejected and shouldn't be retried. + if (response.status === 429) { + throw new Error(message); + } + if (response.status >= 400 && response.status < 500) { + throw new HttpClientError(response.status, message); + } + throw new Error(message); + } + + /** + * Signal that the service is shutting down so any in-flight + * `retryForever` exits at its next iteration instead of looping + * indefinitely after the rest of the client has stopped. Idempotent. + */ + public stop(): void { + this.isStopped = true; + } + + /** + * Re-enable the service after a `stop()`. Used when the client pauses + * and resumes syncing within the same lifecycle (e.g. user toggles + * sync off and on). + */ + public resume(): void { + this.isStopped = false; + } + public async create({ - documentId, relativePath, + lastSeenVaultUpdateId, contentBytes }: { - documentId?: DocumentId; relativePath: RelativePath; + lastSeenVaultUpdateId: VaultUpdateId; contentBytes: Uint8Array; - }): Promise { + }): Promise { return this.retryForever(async () => { const formData = new FormData(); - if (documentId !== undefined) { - formData.append("document_id", documentId); - } + formData.append("relative_path", relativePath); + formData.append( + "last_seen_vault_update_id", + lastSeenVaultUpdateId.toString() + ); formData.append( "content", new Blob([new Uint8Array(contentBytes)]) ); this.logger.debug( - `Creating document with id ${documentId} and relative path ${relativePath}` + `Creating document with relative path ${relativePath}` ); const response = await this.client(this.getUrl("/documents"), { @@ -95,16 +137,10 @@ export class SyncService { headers: this.getDefaultHeaders() }); - if (!response.ok) { - throw new Error( - `Failed to create document: ${await SyncService.errorFromResponse( - response - )}` - ); - } + await SyncService.throwIfNotOk(response, "create document"); - const result: DocumentVersionWithoutContent = - (await response.json()) as DocumentVersionWithoutContent; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion + const result: DocumentUpdateResponse = + (await response.json()) as DocumentUpdateResponse; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion this.logger.debug(`Created document ${JSON.stringify(result)}`); @@ -120,17 +156,17 @@ export class SyncService { }: { parentVersionId: VaultUpdateId; documentId: DocumentId; - relativePath: RelativePath; + relativePath: RelativePath | undefined; content: (number | string)[]; }): Promise { return this.retryForever(async () => { this.logger.debug( - `Updating text document ${documentId} with parent version ${parentVersionId} and relative path ${relativePath}, content [${content.join(", ")}]` + `Updating text document ${documentId} with parent version ${parentVersionId} and relative path ${relativePath ?? ""}, content [${content.join(", ")}]` ); const request: UpdateTextDocumentVersion = { parentVersionId, - relativePath, + relativePath: relativePath ?? null, content }; @@ -143,13 +179,7 @@ export class SyncService { } ); - if (!response.ok) { - throw new Error( - `Failed to update document: ${await SyncService.errorFromResponse( - response - )}` - ); - } + await SyncService.throwIfNotOk(response, "update document"); const result: DocumentUpdateResponse = (await response.json()) as DocumentUpdateResponse; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion @@ -172,16 +202,18 @@ export class SyncService { }: { parentVersionId: VaultUpdateId; documentId: DocumentId; - relativePath: RelativePath; + relativePath: RelativePath | undefined; contentBytes: Uint8Array; }): Promise { return this.retryForever(async () => { this.logger.debug( - `Updating binary document ${documentId} with parent version ${parentVersionId} and relative path ${relativePath}` + `Updating binary document ${documentId} with parent version ${parentVersionId} and relative path ${relativePath ?? ""}` ); const formData = new FormData(); formData.append("parent_version_id", parentVersionId.toString()); - formData.append("relative_path", relativePath); + if (relativePath !== undefined) { + formData.append("relative_path", relativePath); + } formData.append( "content", new Blob([new Uint8Array(contentBytes)]) @@ -196,13 +228,7 @@ export class SyncService { } ); - if (!response.ok) { - throw new Error( - `Failed to update document: ${await SyncService.errorFromResponse( - response - )}` - ); - } + await SyncService.throwIfNotOk(response, "update document"); const result: DocumentUpdateResponse = (await response.json()) as DocumentUpdateResponse; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion @@ -218,44 +244,29 @@ export class SyncService { } public async delete({ - documentId, - relativePath + documentId }: { documentId: DocumentId; - relativePath: RelativePath; }): Promise { return this.retryForever(async () => { - const request: DeleteDocumentVersion = { - relativePath - }; - - this.logger.debug( - `Delete document with id ${documentId} and relative path ${relativePath}` - ); + this.logger.debug(`Delete document with id ${documentId}`); + // The server identifies the document by its URL path; no body + // is needed. Sending one was a leftover of an earlier shape. const response = await this.client( this.getUrl(`/documents/${documentId}`), { method: "DELETE", - body: JSON.stringify(request), - headers: this.getDefaultHeaders({ type: "json" }) + headers: this.getDefaultHeaders() } ); - if (!response.ok) { - throw new Error( - `Failed to delete document: ${await SyncService.errorFromResponse( - response - )}` - ); - } + await SyncService.throwIfNotOk(response, "delete document"); const result: DocumentVersionWithoutContent = (await response.json()) as DocumentVersionWithoutContent; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion - this.logger.debug( - `Deleted document ${relativePath} with id ${documentId}` - ); + this.logger.debug(`Deleted document with id ${documentId}`); return result; }); @@ -276,13 +287,7 @@ export class SyncService { } ); - if (!response.ok) { - throw new Error( - `Failed to get document: ${await SyncService.errorFromResponse( - response - )}` - ); - } + await SyncService.throwIfNotOk(response, "get document"); const result: DocumentVersion = (await response.json()) as DocumentVersion; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion @@ -314,13 +319,10 @@ export class SyncService { } ); - if (!response.ok) { - throw new Error( - `Failed to get document: ${await SyncService.errorFromResponse( - response - )}` - ); - } + await SyncService.throwIfNotOk( + response, + "get document version content" + ); const result = await response.bytes(); this.logger.debug( @@ -341,19 +343,13 @@ export class SyncService { const url = new URL(this.getUrl("/documents")); if (since !== undefined) { - url.searchParams.append("since", since.toString()); + url.searchParams.append("since_update_id", since.toString()); } const response = await this.client(url.toString(), { headers: this.getDefaultHeaders() }); - if (!response.ok) { - throw new Error( - `Failed to get documents: ${await SyncService.errorFromResponse( - response - )}` - ); - } + await SyncService.throwIfNotOk(response, "get documents"); const result: FetchLatestDocumentsResponse = (await response.json()) as FetchLatestDocumentsResponse; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion @@ -390,10 +386,7 @@ export class SyncService { } private getUrl(path: string): string { - const { vaultName, remoteUri } = this.settings.getSettings(); - const remoteUriWithoutTrailingSlash = remoteUri.replace(/\/+$/, ""); - const encodedVaultName = encodeURIComponent(vaultName.trim()); - return `${remoteUriWithoutTrailingSlash}/vaults/${encodedVaultName}${path}`; + return buildVaultUrl(this.settings, path); } private getDefaultHeaders( @@ -414,13 +407,17 @@ export class SyncService { private async retryForever(fn: () => Promise): Promise { // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition while (true) { + this.throwIfStopped(); try { return await fn(); } catch (e) { - // We must not retry errors coming from reset - if (e instanceof SyncResetError) { + if ( + e instanceof SyncResetError || + e instanceof HttpClientError + ) { throw e; } + this.throwIfStopped(); const retryInterval = this.settings.getSettings().networkRetryIntervalMs; @@ -431,4 +428,10 @@ export class SyncService { } } } + + private throwIfStopped(): void { + if (this.isStopped) { + throw new SyncResetError(); + } + } } diff --git a/frontend/sync-client/src/services/websocket-manager.test.ts b/frontend/sync-client/src/services/websocket-manager.test.ts index fef901e7..bde18ef3 100644 --- a/frontend/sync-client/src/services/websocket-manager.test.ts +++ b/frontend/sync-client/src/services/websocket-manager.test.ts @@ -4,8 +4,7 @@ import assert from "node:assert"; import { WebSocketManager } from "./websocket-manager"; import type { Logger } from "../tracing/logger"; import type { Settings } from "../persistence/settings"; -// eslint-disable-next-line @typescript-eslint/no-require-imports -const WebSocket = require("ws") as typeof globalThis.WebSocket; +import { awaitAll } from "../utils/await-all"; class MockCloseEvent extends Event { public code: number; @@ -91,10 +90,8 @@ function createMockFn unknown>( describe("WebSocketManager", () => { let mockLogger: Logger = undefined as unknown as Logger; let mockSettings: Settings = undefined as unknown as Settings; - let deviceId = "test-device-123"; beforeEach(() => { - deviceId = "test-device-123"; const noop = (): void => { // Intentionally empty for mock }; @@ -116,7 +113,6 @@ describe("WebSocketManager", () => { it("cleans up promises after message handling", async () => { const manager = new WebSocketManager( - deviceId, mockLogger, mockSettings, MockWebSocket as unknown as typeof WebSocket @@ -146,7 +142,6 @@ describe("WebSocketManager", () => { it("cleans up cursor position promises", async () => { const manager = new WebSocketManager( - deviceId, mockLogger, mockSettings, MockWebSocket as unknown as typeof WebSocket @@ -176,7 +171,6 @@ describe("WebSocketManager", () => { it("logs handshake send errors", async () => { const manager = new WebSocketManager( - deviceId, mockLogger, mockSettings, MockWebSocket as unknown as typeof WebSocket @@ -205,7 +199,6 @@ describe("WebSocketManager", () => { it("completes stop with timeout protection", async () => { const manager = new WebSocketManager( - deviceId, mockLogger, mockSettings, MockWebSocket as unknown as typeof WebSocket @@ -220,7 +213,6 @@ describe("WebSocketManager", () => { it("clears old handlers on reconnection", async () => { const manager = new WebSocketManager( - deviceId, mockLogger, mockSettings, MockWebSocket as unknown as typeof WebSocket @@ -255,9 +247,68 @@ describe("WebSocketManager", () => { await manager.stop(); }); + it("handles concurrent stop() calls without stranding either caller", async () => { + // Real WebSocket.close() doesn't fire onclose synchronously, and the + // socket stays reachable across the close handshake. Model that + // here so the manager's `while (isWebSocketConnected)` loop is + // actually awaiting when the second stop() races in. Static OPEN + // is required because the manager compares readyState against + // `factory.OPEN`. + class AsyncCloseWebSocket extends MockWebSocket { + public static readonly OPEN = WebSocket.OPEN; + + public override close(code?: number, reason?: string): void { + if ( + this.readyState === WebSocket.CLOSED || + (this as { _closing?: boolean })._closing === true + ) { + return; + } + (this as { _closing?: boolean })._closing = true; + setTimeout(() => { + this.readyState = WebSocket.CLOSED; + this.onclose?.( + new MockCloseEvent("close", { + code: code ?? 1000, + reason: reason ?? "" + }) + ); + }, 5); + } + } + + const manager = new WebSocketManager( + mockLogger, + mockSettings, + AsyncCloseWebSocket as unknown as typeof WebSocket + ); + + manager.start(); + await new Promise((resolve) => setTimeout(resolve, 50)); + + const start = Date.now(); + // Two concurrent stops mimic destroy() racing onSettingsChange. + await awaitAll([manager.stop(), manager.stop()]); + const elapsed = Date.now() - start; + + // Both should resolve via the normal close path; if the second call + // had clobbered the first's resolver, the first would have been + // stranded until the 10s disconnect timeout. + assert.ok( + elapsed < 1000, + `concurrent stop() took ${elapsed}ms — expected fast resolution` + ); + const errorCalls = (mockLogger.error as unknown as { calls: unknown[] }) + .calls; + assert.strictEqual( + errorCalls.length, + 0, + "no timeout-recovery error should be logged" + ); + }); + it("tracks message handling promises", async () => { const manager = new WebSocketManager( - deviceId, mockLogger, mockSettings, MockWebSocket as unknown as typeof WebSocket diff --git a/frontend/sync-client/src/services/websocket-manager.ts b/frontend/sync-client/src/services/websocket-manager.ts index 09787bce..8a4fe34c 100644 --- a/frontend/sync-client/src/services/websocket-manager.ts +++ b/frontend/sync-client/src/services/websocket-manager.ts @@ -4,12 +4,15 @@ import type { WebSocketServerMessage } from "./types/WebSocketServerMessage"; import type { WebSocketClientMessage } from "./types/WebSocketClientMessage"; import type { CursorPositionFromClient } from "./types/CursorPositionFromClient"; import type { ClientCursors } from "./types/ClientCursors"; -import { createPromise } from "../utils/create-promise"; import type { WebSocketVaultUpdate } from "./types/WebSocketVaultUpdate"; -import { WEBSOCKET_DISCONNECT_TIMEOUT_IN_S } from "../consts"; +import { + WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS, + WEBSOCKET_CONNECTION_TIMEOUT_IN_SECONDS +} from "../consts"; import { removeFromArray } from "../utils/remove-from-array"; import { EventListeners } from "../utils/data-structures/event-listeners"; import { awaitAll } from "../utils/await-all"; +import { buildVaultUrl } from "./build-vault-url"; export class WebSocketManager { public readonly onWebSocketStatusChanged = new EventListeners< @@ -26,32 +29,22 @@ export class WebSocketManager { private isStopped = true; private resolveDisconnectingPromise: null | (() => unknown) = null; + private stopPromise: Promise | null = null; private reconnectTimeoutId: ReturnType | undefined; + private connectionTimeoutId: ReturnType | undefined; private readonly outstandingPromises: Promise[] = []; private webSocket: WebSocket | undefined; - private readonly webSocketFactoryImplementation: typeof globalThis.WebSocket; public constructor( - private readonly deviceId: string, private readonly logger: Logger, private readonly settings: Settings, - webSocketImplementation?: typeof globalThis.WebSocket - ) { - if (webSocketImplementation) { - this.webSocketFactoryImplementation = webSocketImplementation; - } else { - if ( - typeof globalThis !== "undefined" && - typeof globalThis.WebSocket === "undefined" - ) { - // eslint-disable-next-line - this.webSocketFactoryImplementation = require("ws"); // polyfill for WebSocket in Node.js - } else { - this.webSocketFactoryImplementation = WebSocket; - } - } + private readonly webSocketFactoryImplementation: typeof globalThis.WebSocket = WebSocket + ) {} + + public get hasOutstandingWork(): boolean { + return this.outstandingPromises.length > 0; } public get isWebSocketConnected(): boolean { @@ -67,49 +60,14 @@ export class WebSocketManager { } public async stop(): Promise { - const [promise, resolve] = createPromise(); - this.resolveDisconnectingPromise = resolve; - - this.isStopped = true; - - if (this.reconnectTimeoutId !== undefined) { - clearTimeout(this.reconnectTimeoutId); - this.reconnectTimeoutId = undefined; - } - - this.webSocket?.close(1000, "WebSocketManager has been stopped"); - - // eslint-disable-next-line @typescript-eslint/init-declarations - let timeoutId: ReturnType | undefined; - const timeoutPromise = new Promise((_, reject) => { - timeoutId = setTimeout(() => { - reject( - new Error( - `Timeout waiting for WebSocket to close after ${WEBSOCKET_DISCONNECT_TIMEOUT_IN_S} seconds` - ) - ); - }, WEBSOCKET_DISCONNECT_TIMEOUT_IN_S * 1000); + // Concurrent callers (e.g. destroy() and onSettingsChange) must share + // the same disconnect; otherwise the second call would overwrite + // resolveDisconnectingPromise and strand the first caller's await + // until the timeout rejects. + this.stopPromise ??= this.performStop().finally(() => { + this.stopPromise = null; }); - - try { - while (this.isWebSocketConnected) { - await Promise.race([promise, timeoutPromise]); - } - } catch (error) { - this.logger.error( - `Error while waiting for WebSocket to close: ${String(error)}` - ); - // Force cleanup even if close didn't work - this.resolveDisconnectingPromise(); - this.resolveDisconnectingPromise = null; - } finally { - // Clear timeout to prevent unhandled rejection - if (timeoutId !== undefined) { - clearTimeout(timeoutId); - } - } - - await this.waitUntilFinished(); + await this.stopPromise; } public async waitUntilFinished(): Promise { @@ -162,6 +120,59 @@ export class WebSocketManager { } } + private async performStop(): Promise { + const { promise, resolve } = Promise.withResolvers(); + this.resolveDisconnectingPromise = (): void => { + resolve(undefined); + }; + + this.isStopped = true; + + if (this.reconnectTimeoutId !== undefined) { + clearTimeout(this.reconnectTimeoutId); + this.reconnectTimeoutId = undefined; + } + + if (this.connectionTimeoutId !== undefined) { + clearTimeout(this.connectionTimeoutId); + this.connectionTimeoutId = undefined; + } + + this.webSocket?.close(1000, "WebSocketManager has been stopped"); + + // eslint-disable-next-line @typescript-eslint/init-declarations + let timeoutId: ReturnType | undefined; + const timeoutPromise = new Promise((_, reject) => { + timeoutId = setTimeout(() => { + reject( + new Error( + `Timeout waiting for WebSocket to close after ${WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS} seconds` + ) + ); + }, WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS * 1000); + }); + + try { + while (this.isWebSocketConnected) { + await Promise.race([promise, timeoutPromise]); + } + } catch (error) { + this.logger.error( + `Error while waiting for WebSocket to close: ${String(error)}` + ); + // Force cleanup even if close didn't work + this.resolveDisconnectingPromise(); + this.resolveDisconnectingPromise = null; + } finally { + // Clear timeout to prevent unhandled rejection + if (timeoutId !== undefined) { + clearTimeout(timeoutId); + } + } + + await this.waitUntilFinished(); + } + private initializeWebSocket(): void { // Clean up old WebSocket handlers to prevent race conditions if (this.webSocket) { @@ -171,26 +182,55 @@ export class WebSocketManager { this.webSocket.onclose = null; this.webSocket.onmessage = null; this.webSocket.onerror = null; - this.webSocket.close(); + this.webSocket.close( + 1000, + "Closing previous WebSocket connection" + ); } catch (e) { this.logger.error( `Failed to close previous WebSocket connection: ${e}` ); } + // Abandon any outstanding handler promises from the previous + // connection. They'll still resolve in the background, but we + // no longer want `waitUntilFinished` / `stop` to block on + // post-reconnect state — and we definitely don't want their + // results applied against a now-stale socket. + this.outstandingPromises.length = 0; } - const wsUri = new URL(this.settings.getSettings().remoteUri); - wsUri.protocol = wsUri.protocol === "https" ? "wss" : "ws"; - wsUri.pathname = `/vaults/${this.settings.getSettings().vaultName}/ws`; + // Build the WS URL through the same vault-URL helper the HTTP client + // uses so vault-name encoding, trailing-slash stripping, and any path + // prefix in `remoteUri` stay in sync between transports. + const wsUri = new URL(buildVaultUrl(this.settings, "/ws")); + wsUri.protocol = wsUri.protocol.startsWith("https") ? "wss" : "ws"; this.logger.info(`Connecting to WebSocket at ${wsUri.toString()}`); - this.webSocket = new this.webSocketFactoryImplementation(wsUri); + const ws = new this.webSocketFactoryImplementation(wsUri); + this.webSocket = ws; + + // Set connection timeout to handle cases where server is down and the WebSocket connection won't open. + // The callback closes the *captured* `ws` rather than `this.webSocket` so a delayed timeout cannot + // accidentally close a freshly-constructed replacement socket. (Closing the already-closed `ws` is a no-op.) + this.connectionTimeoutId = setTimeout(() => { + this.connectionTimeoutId = undefined; + this.logger.warn( + `WebSocket connection timeout after ${WEBSOCKET_CONNECTION_TIMEOUT_IN_SECONDS} seconds` + ); + // Force close to trigger onclose handler which will schedule reconnection + ws.close(1000, "Connection timeout"); + }, WEBSOCKET_CONNECTION_TIMEOUT_IN_SECONDS * 1000); + + ws.onopen = (): void => { + if (this.connectionTimeoutId !== undefined) { + clearTimeout(this.connectionTimeoutId); + this.connectionTimeoutId = undefined; + } - this.webSocket.onopen = (): void => { // Check if we've been stopped while connecting if (this.isStopped) { - this.webSocket?.close( + ws.close( 1000, "WebSocketManager was stopped during connection" ); @@ -200,7 +240,7 @@ export class WebSocketManager { this.onWebSocketStatusChanged.trigger(true); }; - this.webSocket.onmessage = (event): void => { + ws.onmessage = (event): void => { try { // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion const message = JSON.parse( @@ -231,7 +271,18 @@ export class WebSocketManager { } }; - this.webSocket.onclose = (event): void => { + ws.onerror = (error): void => { + this.logger.warn( + `WebSocket error occurred: ${error instanceof ErrorEvent ? error.message : "Unknown error"}` + ); + }; + + ws.onclose = (event): void => { + if (this.connectionTimeoutId !== undefined) { + clearTimeout(this.connectionTimeoutId); + this.connectionTimeoutId = undefined; + } + this.logger.warn( `WebSocket closed with code ${event.code} (${event.reason == "" ? "unknown reason" : event.reason})` ); @@ -241,10 +292,13 @@ export class WebSocketManager { this.resolveDisconnectingPromise?.(); this.resolveDisconnectingPromise = null; } else { + const delay = + this.settings.getSettings().webSocketRetryIntervalMs; + this.logger.info(`Reconnecting to WebSocket in ${delay}ms...`); this.reconnectTimeoutId = setTimeout(() => { this.reconnectTimeoutId = undefined; this.initializeWebSocket(); - }, this.settings.getSettings().webSocketRetryIntervalMs); + }, delay); } }; } @@ -252,22 +306,22 @@ export class WebSocketManager { private async handleWebSocketMessage( message: WebSocketServerMessage ): Promise { - if (message.type === "vaultUpdate") { - await this.onRemoteVaultUpdateReceived.triggerAsync(message); - - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - } else if (message.type === "cursorPositions") { - this.logger.debug( - `Received cursor positions for ${JSON.stringify(message.clients)}` - ); - - await this.onRemoteCursorsUpdateReceived.triggerAsync( - message.clients - ); - } else { - this.logger.warn( - `Received unknown message type: ${JSON.stringify(message)}` - ); + switch (message.type) { + case "vaultUpdate": + await this.onRemoteVaultUpdateReceived.triggerAsync(message); + return; + case "cursorPositions": + this.logger.debug( + `Received cursor positions for ${JSON.stringify(message.clients)}` + ); + await this.onRemoteCursorsUpdateReceived.triggerAsync( + message.clients + ); + return; + default: + this.logger.warn( + `Received unknown message type: ${JSON.stringify(message)}` + ); } } }