split: sync-client services layer

Add build-vault-url helper. Rewrite fetch-controller and websocket-manager
(plus their tests). Update server-config and sync-service to consume the
new error types and the regenerated API types from previous chunks.
This commit is contained in:
Andras Schmelczer 2026-05-08 21:36:41 +01:00
parent 9d99a4ac23
commit 45b86cffe4
7 changed files with 377 additions and 232 deletions

View file

@ -0,0 +1,8 @@
import type { Settings } from "../persistence/settings";
export function buildVaultUrl(settings: Settings, path: string): string {
const { vaultName, remoteUri } = settings.getSettings();
const remoteUriWithoutTrailingSlash = remoteUri.replace(/\/+$/, "");
const encodedVaultName = encodeURIComponent(vaultName.trim());
return `${remoteUriWithoutTrailingSlash}/vaults/${encodedVaultName}${path}`;
}

View file

@ -3,7 +3,7 @@ import { describe, it, mock, beforeEach, afterEach } from "node:test";
import assert from "node:assert"; import assert from "node:assert";
import { FetchController } from "./fetch-controller"; import { FetchController } from "./fetch-controller";
import { Logger } from "../tracing/logger"; import { Logger } from "../tracing/logger";
import { SyncResetError } from "./sync-reset-error"; import { SyncResetError } from "../errors/sync-reset-error";
import { sleep } from "../utils/sleep"; import { sleep } from "../utils/sleep";
describe("FetchController", () => { describe("FetchController", () => {

View file

@ -1,6 +1,5 @@
import type { Logger } from "../tracing/logger"; import type { Logger } from "../tracing/logger";
import { createPromise } from "../utils/create-promise"; import { SyncResetError } from "../errors/sync-reset-error";
import { SyncResetError } from "./sync-reset-error";
/** /**
* Offers a resettable fetch implementation that waits until syncing is enabled * Offers a resettable fetch implementation that waits until syncing is enabled
@ -13,37 +12,43 @@ export class FetchController {
// Promise resolves on the next state change: sync enabled/disabled or reset started/ended // Promise resolves on the next state change: sync enabled/disabled or reset started/ended
private until: Promise<symbol>; private until: Promise<symbol>;
private resolveUntil: (result: symbol) => unknown; private resolveUntil: (value: symbol | PromiseLike<symbol>) => void;
private rejectUntil: (reason: unknown) => unknown; private rejectUntil: (reason?: unknown) => void;
public constructor( public constructor(
private _canFetch: boolean, private _canFetch: boolean,
private readonly logger: Logger private readonly logger: Logger
) { ) {
[this.until, this.resolveUntil, this.rejectUntil] = ({
createPromise<symbol>(); promise: this.until,
resolve: this.resolveUntil,
reject: this.rejectUntil
} = Promise.withResolvers<symbol>());
} }
/** /**
* Whether the fetch implementation can immediately send requests once outside of a reset. * Whether the fetch implementation can immediately send requests once outside of a reset.
*/ */
public get canFetch(): boolean { public get canFetch(): boolean {
return this._canFetch; return this._canFetch;
} }
/** /**
* Allow or disallow fetching. The changes only take effect if not resetting. * Allow or disallow fetching. The changes only take effect if not resetting.
* When called during a reset, its effect is deferred until the reset is finished. * When called during a reset, its effect is deferred until the reset is finished.
* *
* @param canFetch Whether fetching is enabled * @param canFetch Whether fetching is enabled
*/ */
public set canFetch(canFetch: boolean) { public set canFetch(canFetch: boolean) {
this._canFetch = canFetch; this._canFetch = canFetch;
if (!this.isResetting) { if (!this.isResetting) {
const previousResolve = this.resolveUntil; const previousResolve = this.resolveUntil;
[this.until, this.resolveUntil, this.rejectUntil] = ({
createPromise<symbol>(); promise: this.until,
resolve: this.resolveUntil,
reject: this.rejectUntil
} = Promise.withResolvers<symbol>());
previousResolve(FetchController.UNTIL_RESOLUTION); previousResolve(FetchController.UNTIL_RESOLUTION);
} }
} }
@ -59,9 +64,9 @@ export class FetchController {
} }
/** /**
* Starts a reset, causing all ongoing and future fetches to be rejected * Starts a reset, causing all ongoing and future fetches to be rejected
* with a SyncResetError until finishReset is called. * with a SyncResetError until finishReset is called.
*/ */
public startReset(): void { public startReset(): void {
this.isResetting = true; this.isResetting = true;
this.rejectUntil(new SyncResetError()); this.rejectUntil(new SyncResetError());
@ -72,32 +77,36 @@ export class FetchController {
} }
/** /**
* Finishes a reset, allowing fetches to proceed or wait again depending on * Finishes a reset, allowing fetches to proceed or wait again depending on
* the current sync settings. * the current sync settings.
*/ */
public finishReset(): void { public finishReset(): void {
if (!this.isResetting) { if (!this.isResetting) {
return; return;
} }
this.isResetting = false; this.isResetting = false;
[this.until, this.resolveUntil, this.rejectUntil] = createPromise(); ({
promise: this.until,
resolve: this.resolveUntil,
reject: this.rejectUntil
} = Promise.withResolvers<symbol>());
} }
/** /**
* *
* |------------------|---------------|-----------------------------------------------------| * |------------------|---------------|-----------------------------------------------------|
* | | Sync enabled | Sync disabled | * | | Sync enabled | Sync disabled |
* |------------------|-------------- |-----------------------------------------------------| * |------------------|-------------- |-----------------------------------------------------|
* | During reset | Rejects with SyncResetError without sending request | * | During reset | Rejects with SyncResetError without sending request |
* |------------------|-------------- |-----------------------------------------------------| * |------------------|-------------- |-----------------------------------------------------|
* | Outside of reset | Same as fetch | Blocks until sync is enabled and then same as fetch | * | Outside of reset | Same as fetch | Blocks until sync is enabled and then same as fetch |
* |------------------|---------------|-----------------------------------------------------| * |------------------|---------------|-----------------------------------------------------|
* *
* @param logger for errors * @param logger for errors
* @param fetch to wrap * @param fetch to wrap
* @returns a wrapped fetch implementation affected by the FetchController state * @returns a wrapped fetch implementation affected by the FetchController state
*/ */
public getControlledFetchImplementation( public getControlledFetchImplementation(
logger: Logger, logger: Logger,
fetch: typeof globalThis.fetch = globalThis.fetch fetch: typeof globalThis.fetch = globalThis.fetch

View file

@ -1,6 +1,7 @@
import { SUPPORTED_API_VERSION } from "../consts"; import { SUPPORTED_API_VERSION } from "../consts";
import { AuthenticationError } from "./authentication-error"; import { AuthenticationError } from "../errors/authentication-error";
import { ServerVersionMismatchError } from "./server-version-mismatch-error"; import { ServerVersionMismatchError } from "../errors/server-version-mismatch-error";
import type { Settings } from "../persistence/settings";
import type { SyncService } from "./sync-service"; import type { SyncService } from "./sync-service";
import type { PingResponse } from "./types/PingResponse"; import type { PingResponse } from "./types/PingResponse";
@ -14,7 +15,20 @@ export class ServerConfig {
private response: Promise<PingResponse> | undefined; private response: Promise<PingResponse> | undefined;
private config: ServerConfigData | undefined; private config: ServerConfigData | undefined;
public constructor(private readonly syncService: SyncService) {} public constructor(
private readonly syncService: SyncService,
settings: Settings
) {
settings.onSettingsChanged.add((newSettings, oldSettings) => {
if (
newSettings.token !== oldSettings.token ||
newSettings.vaultName !== oldSettings.vaultName ||
newSettings.remoteUri !== oldSettings.remoteUri
) {
this.reset();
}
});
}
private static validateConfig(config: ServerConfigData): void { private static validateConfig(config: ServerConfigData): void {
if (config.supportedApiVersion !== SUPPORTED_API_VERSION) { if (config.supportedApiVersion !== SUPPORTED_API_VERSION) {
@ -34,11 +48,6 @@ export class ServerConfig {
} }
} }
// warm the cache
public async initialize(): Promise<void> {
await this.getConfig();
}
public async checkConnection(forceUpdate = false): Promise<{ public async checkConnection(forceUpdate = false): Promise<{
isSuccessful: boolean; isSuccessful: boolean;
message: string; message: string;
@ -46,7 +55,7 @@ export class ServerConfig {
try { try {
let { response } = this; let { response } = this;
if (!response || forceUpdate) { if (!response || forceUpdate) {
response = this.response = this.syncService.ping(); response = this.startPing();
} }
const result: PingResponse = await response; // it must be defined, otherwise we would have thrown above const result: PingResponse = await response; // it must be defined, otherwise we would have thrown above
@ -73,7 +82,7 @@ export class ServerConfig {
public async getConfig(): Promise<ServerConfigData> { public async getConfig(): Promise<ServerConfigData> {
if (!this.config) { if (!this.config) {
this.response ??= this.syncService.ping(); this.response ??= this.startPing();
this.config = await this.response; this.config = await this.response;
} }
@ -86,4 +95,15 @@ export class ServerConfig {
this.response = undefined; this.response = undefined;
this.config = undefined; this.config = undefined;
} }
private async startPing(): Promise<PingResponse> {
const pending = this.syncService.ping().catch((e: unknown) => {
if (this.response === pending) {
this.response = undefined;
}
throw e;
});
this.response = pending;
return pending;
}
} }

View file

@ -2,25 +2,27 @@ import type {
DocumentId, DocumentId,
RelativePath, RelativePath,
VaultUpdateId VaultUpdateId
} from "../persistence/database"; } from "../sync-operations/types";
import type { Logger } from "../tracing/logger"; import type { Logger } from "../tracing/logger";
import type { Settings } from "../persistence/settings"; import type { Settings } from "../persistence/settings";
import type { FetchController } from "./fetch-controller"; import type { FetchController } from "./fetch-controller";
import { sleep } from "../utils/sleep"; import { sleep } from "../utils/sleep";
import { SyncResetError } from "./sync-reset-error"; import { SyncResetError } from "../errors/sync-reset-error";
import { HttpClientError } from "../errors/http-client-error";
import type { SerializedError } from "./types/SerializedError"; import type { SerializedError } from "./types/SerializedError";
import type { DocumentVersionWithoutContent } from "./types/DocumentVersionWithoutContent"; import type { DocumentVersionWithoutContent } from "./types/DocumentVersionWithoutContent";
import type { DocumentUpdateResponse } from "./types/DocumentUpdateResponse"; import type { DocumentUpdateResponse } from "./types/DocumentUpdateResponse";
import type { DocumentVersion } from "./types/DocumentVersion"; import type { DocumentVersion } from "./types/DocumentVersion";
import type { FetchLatestDocumentsResponse } from "./types/FetchLatestDocumentsResponse"; import type { FetchLatestDocumentsResponse } from "./types/FetchLatestDocumentsResponse";
import type { PingResponse } from "./types/PingResponse"; import type { PingResponse } from "./types/PingResponse";
import type { DeleteDocumentVersion } from "./types/DeleteDocumentVersion";
import type { UpdateTextDocumentVersion } from "./types/UpdateTextDocumentVersion"; import type { UpdateTextDocumentVersion } from "./types/UpdateTextDocumentVersion";
import { buildVaultUrl } from "./build-vault-url";
export class SyncService { export class SyncService {
private readonly client: typeof globalThis.fetch; private readonly client: typeof globalThis.fetch;
private readonly pingClient: typeof globalThis.fetch; private readonly pingClient: typeof globalThis.fetch;
private isStopped = false;
public constructor( public constructor(
private readonly deviceId: string, private readonly deviceId: string,
@ -65,28 +67,68 @@ export class SyncService {
return result; return result;
} }
private static async throwIfNotOk(
response: Response,
operation: string
): Promise<void> {
if (response.ok) {
return;
}
const message = `Failed to ${operation}: ${await SyncService.errorFromResponse(response)}`;
// 429 is the only 4xx the server uses for *transient* contention
// (`WriteBusyError` → HTTP 429). Every other 4xx means the request
// is permanently rejected and shouldn't be retried.
if (response.status === 429) {
throw new Error(message);
}
if (response.status >= 400 && response.status < 500) {
throw new HttpClientError(response.status, message);
}
throw new Error(message);
}
/**
* Signal that the service is shutting down so any in-flight
* `retryForever` exits at its next iteration instead of looping
* indefinitely after the rest of the client has stopped. Idempotent.
*/
public stop(): void {
this.isStopped = true;
}
/**
* Re-enable the service after a `stop()`. Used when the client pauses
* and resumes syncing within the same lifecycle (e.g. user toggles
* sync off and on).
*/
public resume(): void {
this.isStopped = false;
}
public async create({ public async create({
documentId,
relativePath, relativePath,
lastSeenVaultUpdateId,
contentBytes contentBytes
}: { }: {
documentId?: DocumentId;
relativePath: RelativePath; relativePath: RelativePath;
lastSeenVaultUpdateId: VaultUpdateId;
contentBytes: Uint8Array; contentBytes: Uint8Array;
}): Promise<DocumentVersionWithoutContent> { }): Promise<DocumentUpdateResponse> {
return this.retryForever(async () => { return this.retryForever(async () => {
const formData = new FormData(); const formData = new FormData();
if (documentId !== undefined) {
formData.append("document_id", documentId);
}
formData.append("relative_path", relativePath); formData.append("relative_path", relativePath);
formData.append(
"last_seen_vault_update_id",
lastSeenVaultUpdateId.toString()
);
formData.append( formData.append(
"content", "content",
new Blob([new Uint8Array(contentBytes)]) new Blob([new Uint8Array(contentBytes)])
); );
this.logger.debug( this.logger.debug(
`Creating document with id ${documentId} and relative path ${relativePath}` `Creating document with relative path ${relativePath}`
); );
const response = await this.client(this.getUrl("/documents"), { const response = await this.client(this.getUrl("/documents"), {
@ -95,16 +137,10 @@ export class SyncService {
headers: this.getDefaultHeaders() headers: this.getDefaultHeaders()
}); });
if (!response.ok) { await SyncService.throwIfNotOk(response, "create document");
throw new Error(
`Failed to create document: ${await SyncService.errorFromResponse(
response
)}`
);
}
const result: DocumentVersionWithoutContent = const result: DocumentUpdateResponse =
(await response.json()) as DocumentVersionWithoutContent; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion (await response.json()) as DocumentUpdateResponse; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
this.logger.debug(`Created document ${JSON.stringify(result)}`); this.logger.debug(`Created document ${JSON.stringify(result)}`);
@ -120,17 +156,17 @@ export class SyncService {
}: { }: {
parentVersionId: VaultUpdateId; parentVersionId: VaultUpdateId;
documentId: DocumentId; documentId: DocumentId;
relativePath: RelativePath; relativePath: RelativePath | undefined;
content: (number | string)[]; content: (number | string)[];
}): Promise<DocumentUpdateResponse> { }): Promise<DocumentUpdateResponse> {
return this.retryForever(async () => { return this.retryForever(async () => {
this.logger.debug( this.logger.debug(
`Updating text document ${documentId} with parent version ${parentVersionId} and relative path ${relativePath}, content [${content.join(", ")}]` `Updating text document ${documentId} with parent version ${parentVersionId} and relative path ${relativePath ?? "<unchanged>"}, content [${content.join(", ")}]`
); );
const request: UpdateTextDocumentVersion = { const request: UpdateTextDocumentVersion = {
parentVersionId, parentVersionId,
relativePath, relativePath: relativePath ?? null,
content content
}; };
@ -143,13 +179,7 @@ export class SyncService {
} }
); );
if (!response.ok) { await SyncService.throwIfNotOk(response, "update document");
throw new Error(
`Failed to update document: ${await SyncService.errorFromResponse(
response
)}`
);
}
const result: DocumentUpdateResponse = const result: DocumentUpdateResponse =
(await response.json()) as DocumentUpdateResponse; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion (await response.json()) as DocumentUpdateResponse; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
@ -172,16 +202,18 @@ export class SyncService {
}: { }: {
parentVersionId: VaultUpdateId; parentVersionId: VaultUpdateId;
documentId: DocumentId; documentId: DocumentId;
relativePath: RelativePath; relativePath: RelativePath | undefined;
contentBytes: Uint8Array; contentBytes: Uint8Array;
}): Promise<DocumentUpdateResponse> { }): Promise<DocumentUpdateResponse> {
return this.retryForever(async () => { return this.retryForever(async () => {
this.logger.debug( this.logger.debug(
`Updating binary document ${documentId} with parent version ${parentVersionId} and relative path ${relativePath}` `Updating binary document ${documentId} with parent version ${parentVersionId} and relative path ${relativePath ?? "<unchanged>"}`
); );
const formData = new FormData(); const formData = new FormData();
formData.append("parent_version_id", parentVersionId.toString()); formData.append("parent_version_id", parentVersionId.toString());
formData.append("relative_path", relativePath); if (relativePath !== undefined) {
formData.append("relative_path", relativePath);
}
formData.append( formData.append(
"content", "content",
new Blob([new Uint8Array(contentBytes)]) new Blob([new Uint8Array(contentBytes)])
@ -196,13 +228,7 @@ export class SyncService {
} }
); );
if (!response.ok) { await SyncService.throwIfNotOk(response, "update document");
throw new Error(
`Failed to update document: ${await SyncService.errorFromResponse(
response
)}`
);
}
const result: DocumentUpdateResponse = const result: DocumentUpdateResponse =
(await response.json()) as DocumentUpdateResponse; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion (await response.json()) as DocumentUpdateResponse; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
@ -218,44 +244,29 @@ export class SyncService {
} }
public async delete({ public async delete({
documentId, documentId
relativePath
}: { }: {
documentId: DocumentId; documentId: DocumentId;
relativePath: RelativePath;
}): Promise<DocumentVersionWithoutContent> { }): Promise<DocumentVersionWithoutContent> {
return this.retryForever(async () => { return this.retryForever(async () => {
const request: DeleteDocumentVersion = { this.logger.debug(`Delete document with id ${documentId}`);
relativePath
};
this.logger.debug(
`Delete document with id ${documentId} and relative path ${relativePath}`
);
// The server identifies the document by its URL path; no body
// is needed. Sending one was a leftover of an earlier shape.
const response = await this.client( const response = await this.client(
this.getUrl(`/documents/${documentId}`), this.getUrl(`/documents/${documentId}`),
{ {
method: "DELETE", method: "DELETE",
body: JSON.stringify(request), headers: this.getDefaultHeaders()
headers: this.getDefaultHeaders({ type: "json" })
} }
); );
if (!response.ok) { await SyncService.throwIfNotOk(response, "delete document");
throw new Error(
`Failed to delete document: ${await SyncService.errorFromResponse(
response
)}`
);
}
const result: DocumentVersionWithoutContent = const result: DocumentVersionWithoutContent =
(await response.json()) as DocumentVersionWithoutContent; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion (await response.json()) as DocumentVersionWithoutContent; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
this.logger.debug( this.logger.debug(`Deleted document with id ${documentId}`);
`Deleted document ${relativePath} with id ${documentId}`
);
return result; return result;
}); });
@ -276,13 +287,7 @@ export class SyncService {
} }
); );
if (!response.ok) { await SyncService.throwIfNotOk(response, "get document");
throw new Error(
`Failed to get document: ${await SyncService.errorFromResponse(
response
)}`
);
}
const result: DocumentVersion = const result: DocumentVersion =
(await response.json()) as DocumentVersion; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion (await response.json()) as DocumentVersion; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
@ -314,13 +319,10 @@ export class SyncService {
} }
); );
if (!response.ok) { await SyncService.throwIfNotOk(
throw new Error( response,
`Failed to get document: ${await SyncService.errorFromResponse( "get document version content"
response );
)}`
);
}
const result = await response.bytes(); const result = await response.bytes();
this.logger.debug( this.logger.debug(
@ -341,19 +343,13 @@ export class SyncService {
const url = new URL(this.getUrl("/documents")); const url = new URL(this.getUrl("/documents"));
if (since !== undefined) { if (since !== undefined) {
url.searchParams.append("since", since.toString()); url.searchParams.append("since_update_id", since.toString());
} }
const response = await this.client(url.toString(), { const response = await this.client(url.toString(), {
headers: this.getDefaultHeaders() headers: this.getDefaultHeaders()
}); });
if (!response.ok) { await SyncService.throwIfNotOk(response, "get documents");
throw new Error(
`Failed to get documents: ${await SyncService.errorFromResponse(
response
)}`
);
}
const result: FetchLatestDocumentsResponse = const result: FetchLatestDocumentsResponse =
(await response.json()) as FetchLatestDocumentsResponse; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion (await response.json()) as FetchLatestDocumentsResponse; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
@ -390,10 +386,7 @@ export class SyncService {
} }
private getUrl(path: string): string { private getUrl(path: string): string {
const { vaultName, remoteUri } = this.settings.getSettings(); return buildVaultUrl(this.settings, path);
const remoteUriWithoutTrailingSlash = remoteUri.replace(/\/+$/, "");
const encodedVaultName = encodeURIComponent(vaultName.trim());
return `${remoteUriWithoutTrailingSlash}/vaults/${encodedVaultName}${path}`;
} }
private getDefaultHeaders( private getDefaultHeaders(
@ -414,13 +407,17 @@ export class SyncService {
private async retryForever<T>(fn: () => Promise<T>): Promise<T> { private async retryForever<T>(fn: () => Promise<T>): Promise<T> {
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
while (true) { while (true) {
this.throwIfStopped();
try { try {
return await fn(); return await fn();
} catch (e) { } catch (e) {
// We must not retry errors coming from reset if (
if (e instanceof SyncResetError) { e instanceof SyncResetError ||
e instanceof HttpClientError
) {
throw e; throw e;
} }
this.throwIfStopped();
const retryInterval = const retryInterval =
this.settings.getSettings().networkRetryIntervalMs; this.settings.getSettings().networkRetryIntervalMs;
@ -431,4 +428,10 @@ export class SyncService {
} }
} }
} }
private throwIfStopped(): void {
if (this.isStopped) {
throw new SyncResetError();
}
}
} }

View file

@ -4,8 +4,7 @@ import assert from "node:assert";
import { WebSocketManager } from "./websocket-manager"; import { WebSocketManager } from "./websocket-manager";
import type { Logger } from "../tracing/logger"; import type { Logger } from "../tracing/logger";
import type { Settings } from "../persistence/settings"; import type { Settings } from "../persistence/settings";
// eslint-disable-next-line @typescript-eslint/no-require-imports import { awaitAll } from "../utils/await-all";
const WebSocket = require("ws") as typeof globalThis.WebSocket;
class MockCloseEvent extends Event { class MockCloseEvent extends Event {
public code: number; public code: number;
@ -91,10 +90,8 @@ function createMockFn<T extends (...args: unknown[]) => unknown>(
describe("WebSocketManager", () => { describe("WebSocketManager", () => {
let mockLogger: Logger = undefined as unknown as Logger; let mockLogger: Logger = undefined as unknown as Logger;
let mockSettings: Settings = undefined as unknown as Settings; let mockSettings: Settings = undefined as unknown as Settings;
let deviceId = "test-device-123";
beforeEach(() => { beforeEach(() => {
deviceId = "test-device-123";
const noop = (): void => { const noop = (): void => {
// Intentionally empty for mock // Intentionally empty for mock
}; };
@ -116,7 +113,6 @@ describe("WebSocketManager", () => {
it("cleans up promises after message handling", async () => { it("cleans up promises after message handling", async () => {
const manager = new WebSocketManager( const manager = new WebSocketManager(
deviceId,
mockLogger, mockLogger,
mockSettings, mockSettings,
MockWebSocket as unknown as typeof WebSocket MockWebSocket as unknown as typeof WebSocket
@ -146,7 +142,6 @@ describe("WebSocketManager", () => {
it("cleans up cursor position promises", async () => { it("cleans up cursor position promises", async () => {
const manager = new WebSocketManager( const manager = new WebSocketManager(
deviceId,
mockLogger, mockLogger,
mockSettings, mockSettings,
MockWebSocket as unknown as typeof WebSocket MockWebSocket as unknown as typeof WebSocket
@ -176,7 +171,6 @@ describe("WebSocketManager", () => {
it("logs handshake send errors", async () => { it("logs handshake send errors", async () => {
const manager = new WebSocketManager( const manager = new WebSocketManager(
deviceId,
mockLogger, mockLogger,
mockSettings, mockSettings,
MockWebSocket as unknown as typeof WebSocket MockWebSocket as unknown as typeof WebSocket
@ -205,7 +199,6 @@ describe("WebSocketManager", () => {
it("completes stop with timeout protection", async () => { it("completes stop with timeout protection", async () => {
const manager = new WebSocketManager( const manager = new WebSocketManager(
deviceId,
mockLogger, mockLogger,
mockSettings, mockSettings,
MockWebSocket as unknown as typeof WebSocket MockWebSocket as unknown as typeof WebSocket
@ -220,7 +213,6 @@ describe("WebSocketManager", () => {
it("clears old handlers on reconnection", async () => { it("clears old handlers on reconnection", async () => {
const manager = new WebSocketManager( const manager = new WebSocketManager(
deviceId,
mockLogger, mockLogger,
mockSettings, mockSettings,
MockWebSocket as unknown as typeof WebSocket MockWebSocket as unknown as typeof WebSocket
@ -255,9 +247,68 @@ describe("WebSocketManager", () => {
await manager.stop(); await manager.stop();
}); });
it("handles concurrent stop() calls without stranding either caller", async () => {
// Real WebSocket.close() doesn't fire onclose synchronously, and the
// socket stays reachable across the close handshake. Model that
// here so the manager's `while (isWebSocketConnected)` loop is
// actually awaiting when the second stop() races in. Static OPEN
// is required because the manager compares readyState against
// `factory.OPEN`.
class AsyncCloseWebSocket extends MockWebSocket {
public static readonly OPEN = WebSocket.OPEN;
public override close(code?: number, reason?: string): void {
if (
this.readyState === WebSocket.CLOSED ||
(this as { _closing?: boolean })._closing === true
) {
return;
}
(this as { _closing?: boolean })._closing = true;
setTimeout(() => {
this.readyState = WebSocket.CLOSED;
this.onclose?.(
new MockCloseEvent("close", {
code: code ?? 1000,
reason: reason ?? ""
})
);
}, 5);
}
}
const manager = new WebSocketManager(
mockLogger,
mockSettings,
AsyncCloseWebSocket as unknown as typeof WebSocket
);
manager.start();
await new Promise((resolve) => setTimeout(resolve, 50));
const start = Date.now();
// Two concurrent stops mimic destroy() racing onSettingsChange.
await awaitAll([manager.stop(), manager.stop()]);
const elapsed = Date.now() - start;
// Both should resolve via the normal close path; if the second call
// had clobbered the first's resolver, the first would have been
// stranded until the 10s disconnect timeout.
assert.ok(
elapsed < 1000,
`concurrent stop() took ${elapsed}ms — expected fast resolution`
);
const errorCalls = (mockLogger.error as unknown as { calls: unknown[] })
.calls;
assert.strictEqual(
errorCalls.length,
0,
"no timeout-recovery error should be logged"
);
});
it("tracks message handling promises", async () => { it("tracks message handling promises", async () => {
const manager = new WebSocketManager( const manager = new WebSocketManager(
deviceId,
mockLogger, mockLogger,
mockSettings, mockSettings,
MockWebSocket as unknown as typeof WebSocket MockWebSocket as unknown as typeof WebSocket

View file

@ -4,12 +4,15 @@ import type { WebSocketServerMessage } from "./types/WebSocketServerMessage";
import type { WebSocketClientMessage } from "./types/WebSocketClientMessage"; import type { WebSocketClientMessage } from "./types/WebSocketClientMessage";
import type { CursorPositionFromClient } from "./types/CursorPositionFromClient"; import type { CursorPositionFromClient } from "./types/CursorPositionFromClient";
import type { ClientCursors } from "./types/ClientCursors"; import type { ClientCursors } from "./types/ClientCursors";
import { createPromise } from "../utils/create-promise";
import type { WebSocketVaultUpdate } from "./types/WebSocketVaultUpdate"; import type { WebSocketVaultUpdate } from "./types/WebSocketVaultUpdate";
import { WEBSOCKET_DISCONNECT_TIMEOUT_IN_S } from "../consts"; import {
WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS,
WEBSOCKET_CONNECTION_TIMEOUT_IN_SECONDS
} from "../consts";
import { removeFromArray } from "../utils/remove-from-array"; import { removeFromArray } from "../utils/remove-from-array";
import { EventListeners } from "../utils/data-structures/event-listeners"; import { EventListeners } from "../utils/data-structures/event-listeners";
import { awaitAll } from "../utils/await-all"; import { awaitAll } from "../utils/await-all";
import { buildVaultUrl } from "./build-vault-url";
export class WebSocketManager { export class WebSocketManager {
public readonly onWebSocketStatusChanged = new EventListeners< public readonly onWebSocketStatusChanged = new EventListeners<
@ -26,32 +29,22 @@ export class WebSocketManager {
private isStopped = true; private isStopped = true;
private resolveDisconnectingPromise: null | (() => unknown) = null; private resolveDisconnectingPromise: null | (() => unknown) = null;
private stopPromise: Promise<void> | null = null;
private reconnectTimeoutId: ReturnType<typeof setTimeout> | undefined; private reconnectTimeoutId: ReturnType<typeof setTimeout> | undefined;
private connectionTimeoutId: ReturnType<typeof setTimeout> | undefined;
private readonly outstandingPromises: Promise<unknown>[] = []; private readonly outstandingPromises: Promise<unknown>[] = [];
private webSocket: WebSocket | undefined; private webSocket: WebSocket | undefined;
private readonly webSocketFactoryImplementation: typeof globalThis.WebSocket;
public constructor( public constructor(
private readonly deviceId: string,
private readonly logger: Logger, private readonly logger: Logger,
private readonly settings: Settings, private readonly settings: Settings,
webSocketImplementation?: typeof globalThis.WebSocket private readonly webSocketFactoryImplementation: typeof globalThis.WebSocket = WebSocket
) { ) {}
if (webSocketImplementation) {
this.webSocketFactoryImplementation = webSocketImplementation; public get hasOutstandingWork(): boolean {
} else { return this.outstandingPromises.length > 0;
if (
typeof globalThis !== "undefined" &&
typeof globalThis.WebSocket === "undefined"
) {
// eslint-disable-next-line
this.webSocketFactoryImplementation = require("ws"); // polyfill for WebSocket in Node.js
} else {
this.webSocketFactoryImplementation = WebSocket;
}
}
} }
public get isWebSocketConnected(): boolean { public get isWebSocketConnected(): boolean {
@ -67,49 +60,14 @@ export class WebSocketManager {
} }
public async stop(): Promise<void> { public async stop(): Promise<void> {
const [promise, resolve] = createPromise(); // Concurrent callers (e.g. destroy() and onSettingsChange) must share
this.resolveDisconnectingPromise = resolve; // the same disconnect; otherwise the second call would overwrite
// resolveDisconnectingPromise and strand the first caller's await
this.isStopped = true; // until the timeout rejects.
this.stopPromise ??= this.performStop().finally(() => {
if (this.reconnectTimeoutId !== undefined) { this.stopPromise = null;
clearTimeout(this.reconnectTimeoutId);
this.reconnectTimeoutId = undefined;
}
this.webSocket?.close(1000, "WebSocketManager has been stopped");
// eslint-disable-next-line @typescript-eslint/init-declarations
let timeoutId: ReturnType<typeof setTimeout> | undefined;
const timeoutPromise = new Promise<void>((_, reject) => {
timeoutId = setTimeout(() => {
reject(
new Error(
`Timeout waiting for WebSocket to close after ${WEBSOCKET_DISCONNECT_TIMEOUT_IN_S} seconds`
)
);
}, WEBSOCKET_DISCONNECT_TIMEOUT_IN_S * 1000);
}); });
await this.stopPromise;
try {
while (this.isWebSocketConnected) {
await Promise.race([promise, timeoutPromise]);
}
} catch (error) {
this.logger.error(
`Error while waiting for WebSocket to close: ${String(error)}`
);
// Force cleanup even if close didn't work
this.resolveDisconnectingPromise();
this.resolveDisconnectingPromise = null;
} finally {
// Clear timeout to prevent unhandled rejection
if (timeoutId !== undefined) {
clearTimeout(timeoutId);
}
}
await this.waitUntilFinished();
} }
public async waitUntilFinished(): Promise<void> { public async waitUntilFinished(): Promise<void> {
@ -162,6 +120,59 @@ export class WebSocketManager {
} }
} }
private async performStop(): Promise<void> {
const { promise, resolve } = Promise.withResolvers<undefined>();
this.resolveDisconnectingPromise = (): void => {
resolve(undefined);
};
this.isStopped = true;
if (this.reconnectTimeoutId !== undefined) {
clearTimeout(this.reconnectTimeoutId);
this.reconnectTimeoutId = undefined;
}
if (this.connectionTimeoutId !== undefined) {
clearTimeout(this.connectionTimeoutId);
this.connectionTimeoutId = undefined;
}
this.webSocket?.close(1000, "WebSocketManager has been stopped");
// eslint-disable-next-line @typescript-eslint/init-declarations
let timeoutId: ReturnType<typeof setTimeout> | undefined;
const timeoutPromise = new Promise<void>((_, reject) => {
timeoutId = setTimeout(() => {
reject(
new Error(
`Timeout waiting for WebSocket to close after ${WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS} seconds`
)
);
}, WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS * 1000);
});
try {
while (this.isWebSocketConnected) {
await Promise.race([promise, timeoutPromise]);
}
} catch (error) {
this.logger.error(
`Error while waiting for WebSocket to close: ${String(error)}`
);
// Force cleanup even if close didn't work
this.resolveDisconnectingPromise();
this.resolveDisconnectingPromise = null;
} finally {
// Clear timeout to prevent unhandled rejection
if (timeoutId !== undefined) {
clearTimeout(timeoutId);
}
}
await this.waitUntilFinished();
}
private initializeWebSocket(): void { private initializeWebSocket(): void {
// Clean up old WebSocket handlers to prevent race conditions // Clean up old WebSocket handlers to prevent race conditions
if (this.webSocket) { if (this.webSocket) {
@ -171,26 +182,55 @@ export class WebSocketManager {
this.webSocket.onclose = null; this.webSocket.onclose = null;
this.webSocket.onmessage = null; this.webSocket.onmessage = null;
this.webSocket.onerror = null; this.webSocket.onerror = null;
this.webSocket.close(); this.webSocket.close(
1000,
"Closing previous WebSocket connection"
);
} catch (e) { } catch (e) {
this.logger.error( this.logger.error(
`Failed to close previous WebSocket connection: ${e}` `Failed to close previous WebSocket connection: ${e}`
); );
} }
// Abandon any outstanding handler promises from the previous
// connection. They'll still resolve in the background, but we
// no longer want `waitUntilFinished` / `stop` to block on
// post-reconnect state — and we definitely don't want their
// results applied against a now-stale socket.
this.outstandingPromises.length = 0;
} }
const wsUri = new URL(this.settings.getSettings().remoteUri); // Build the WS URL through the same vault-URL helper the HTTP client
wsUri.protocol = wsUri.protocol === "https" ? "wss" : "ws"; // uses so vault-name encoding, trailing-slash stripping, and any path
wsUri.pathname = `/vaults/${this.settings.getSettings().vaultName}/ws`; // prefix in `remoteUri` stay in sync between transports.
const wsUri = new URL(buildVaultUrl(this.settings, "/ws"));
wsUri.protocol = wsUri.protocol.startsWith("https") ? "wss" : "ws";
this.logger.info(`Connecting to WebSocket at ${wsUri.toString()}`); this.logger.info(`Connecting to WebSocket at ${wsUri.toString()}`);
this.webSocket = new this.webSocketFactoryImplementation(wsUri); const ws = new this.webSocketFactoryImplementation(wsUri);
this.webSocket = ws;
// Set connection timeout to handle cases where server is down and the WebSocket connection won't open.
// The callback closes the *captured* `ws` rather than `this.webSocket` so a delayed timeout cannot
// accidentally close a freshly-constructed replacement socket. (Closing the already-closed `ws` is a no-op.)
this.connectionTimeoutId = setTimeout(() => {
this.connectionTimeoutId = undefined;
this.logger.warn(
`WebSocket connection timeout after ${WEBSOCKET_CONNECTION_TIMEOUT_IN_SECONDS} seconds`
);
// Force close to trigger onclose handler which will schedule reconnection
ws.close(1000, "Connection timeout");
}, WEBSOCKET_CONNECTION_TIMEOUT_IN_SECONDS * 1000);
ws.onopen = (): void => {
if (this.connectionTimeoutId !== undefined) {
clearTimeout(this.connectionTimeoutId);
this.connectionTimeoutId = undefined;
}
this.webSocket.onopen = (): void => {
// Check if we've been stopped while connecting // Check if we've been stopped while connecting
if (this.isStopped) { if (this.isStopped) {
this.webSocket?.close( ws.close(
1000, 1000,
"WebSocketManager was stopped during connection" "WebSocketManager was stopped during connection"
); );
@ -200,7 +240,7 @@ export class WebSocketManager {
this.onWebSocketStatusChanged.trigger(true); this.onWebSocketStatusChanged.trigger(true);
}; };
this.webSocket.onmessage = (event): void => { ws.onmessage = (event): void => {
try { try {
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
const message = JSON.parse( const message = JSON.parse(
@ -231,7 +271,18 @@ export class WebSocketManager {
} }
}; };
this.webSocket.onclose = (event): void => { ws.onerror = (error): void => {
this.logger.warn(
`WebSocket error occurred: ${error instanceof ErrorEvent ? error.message : "Unknown error"}`
);
};
ws.onclose = (event): void => {
if (this.connectionTimeoutId !== undefined) {
clearTimeout(this.connectionTimeoutId);
this.connectionTimeoutId = undefined;
}
this.logger.warn( this.logger.warn(
`WebSocket closed with code ${event.code} (${event.reason == "" ? "unknown reason" : event.reason})` `WebSocket closed with code ${event.code} (${event.reason == "" ? "unknown reason" : event.reason})`
); );
@ -241,10 +292,13 @@ export class WebSocketManager {
this.resolveDisconnectingPromise?.(); this.resolveDisconnectingPromise?.();
this.resolveDisconnectingPromise = null; this.resolveDisconnectingPromise = null;
} else { } else {
const delay =
this.settings.getSettings().webSocketRetryIntervalMs;
this.logger.info(`Reconnecting to WebSocket in ${delay}ms...`);
this.reconnectTimeoutId = setTimeout(() => { this.reconnectTimeoutId = setTimeout(() => {
this.reconnectTimeoutId = undefined; this.reconnectTimeoutId = undefined;
this.initializeWebSocket(); this.initializeWebSocket();
}, this.settings.getSettings().webSocketRetryIntervalMs); }, delay);
} }
}; };
} }
@ -252,22 +306,22 @@ export class WebSocketManager {
private async handleWebSocketMessage( private async handleWebSocketMessage(
message: WebSocketServerMessage message: WebSocketServerMessage
): Promise<void> { ): Promise<void> {
if (message.type === "vaultUpdate") { switch (message.type) {
await this.onRemoteVaultUpdateReceived.triggerAsync(message); case "vaultUpdate":
await this.onRemoteVaultUpdateReceived.triggerAsync(message);
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition return;
} else if (message.type === "cursorPositions") { case "cursorPositions":
this.logger.debug( this.logger.debug(
`Received cursor positions for ${JSON.stringify(message.clients)}` `Received cursor positions for ${JSON.stringify(message.clients)}`
); );
await this.onRemoteCursorsUpdateReceived.triggerAsync(
await this.onRemoteCursorsUpdateReceived.triggerAsync( message.clients
message.clients );
); return;
} else { default:
this.logger.warn( this.logger.warn(
`Received unknown message type: ${JSON.stringify(message)}` `Received unknown message type: ${JSON.stringify(message)}`
); );
} }
} }
} }