Simplify syncing logic

This commit is contained in:
Andras Schmelczer 2026-03-28 11:55:37 +00:00
parent e8c57b3a37
commit 4493365076
48 changed files with 1054 additions and 918 deletions

View file

@ -3,7 +3,7 @@ import { describe, it, mock, beforeEach, afterEach } from "node:test";
import assert from "node:assert";
import { FetchController } from "./fetch-controller";
import { Logger } from "../tracing/logger";
import { SyncResetError } from "./sync-reset-error";
import { SyncResetError } from "../errors/sync-reset-error";
import { sleep } from "../utils/sleep";
describe("FetchController", () => {

View file

@ -1,6 +1,6 @@
import type { Logger } from "../tracing/logger";
import { createPromise } from "../utils/create-promise";
import { SyncResetError } from "./sync-reset-error";
import { SyncResetError } from "../errors/sync-reset-error";
/**
* Offers a resettable fetch implementation that waits until syncing is enabled
@ -25,18 +25,18 @@ export class FetchController {
}
/**
* Whether the fetch implementation can immediately send requests once outside of a reset.
*/
* Whether the fetch implementation can immediately send requests once outside of a reset.
*/
public get canFetch(): boolean {
return this._canFetch;
}
/**
* Allow or disallow fetching. The changes only take effect if not resetting.
* When called during a reset, its effect is deferred until the reset is finished.
*
* @param canFetch Whether fetching is enabled
*/
* Allow or disallow fetching. The changes only take effect if not resetting.
* When called during a reset, its effect is deferred until the reset is finished.
*
* @param canFetch Whether fetching is enabled
*/
public set canFetch(canFetch: boolean) {
this._canFetch = canFetch;
@ -59,9 +59,9 @@ export class FetchController {
}
/**
* Starts a reset, causing all ongoing and future fetches to be rejected
* with a SyncResetError until finishReset is called.
*/
* Starts a reset, causing all ongoing and future fetches to be rejected
* with a SyncResetError until finishReset is called.
*/
public startReset(): void {
this.isResetting = true;
this.rejectUntil(new SyncResetError());
@ -72,9 +72,9 @@ export class FetchController {
}
/**
* Finishes a reset, allowing fetches to proceed or wait again depending on
* the current sync settings.
*/
* Finishes a reset, allowing fetches to proceed or wait again depending on
* the current sync settings.
*/
public finishReset(): void {
if (!this.isResetting) {
return;
@ -85,19 +85,19 @@ export class FetchController {
}
/**
*
* |------------------|---------------|-----------------------------------------------------|
* | | Sync enabled | Sync disabled |
* |------------------|-------------- |-----------------------------------------------------|
* | During reset | Rejects with SyncResetError without sending request |
* |------------------|-------------- |-----------------------------------------------------|
* | Outside of reset | Same as fetch | Blocks until sync is enabled and then same as fetch |
* |------------------|---------------|-----------------------------------------------------|
*
* @param logger for errors
* @param fetch to wrap
* @returns a wrapped fetch implementation affected by the FetchController state
*/
*
* |------------------|---------------|-----------------------------------------------------|
* | | Sync enabled | Sync disabled |
* |------------------|-------------- |-----------------------------------------------------|
* | During reset | Rejects with SyncResetError without sending request |
* |------------------|-------------- |-----------------------------------------------------|
* | Outside of reset | Same as fetch | Blocks until sync is enabled and then same as fetch |
* |------------------|---------------|-----------------------------------------------------|
*
* @param logger for errors
* @param fetch to wrap
* @returns a wrapped fetch implementation affected by the FetchController state
*/
public getControlledFetchImplementation(
logger: Logger,
fetch: typeof globalThis.fetch = globalThis.fetch

View file

@ -1,6 +1,6 @@
import { SUPPORTED_API_VERSION } from "../consts";
import { AuthenticationError } from "./authentication-error";
import { ServerVersionMismatchError } from "./server-version-mismatch-error";
import { AuthenticationError } from "../errors/authentication-error";
import { ServerVersionMismatchError } from "../errors/server-version-mismatch-error";
import type { SyncService } from "./sync-service";
import type { PingResponse } from "./types/PingResponse";
@ -34,11 +34,6 @@ export class ServerConfig {
}
}
// warm the cache
public async initialize(): Promise<void> {
await this.getConfig();
}
public async checkConnection(forceUpdate = false): Promise<{
isSuccessful: boolean;
message: string;

View file

@ -8,7 +8,7 @@ import type { Logger } from "../tracing/logger";
import type { Settings } from "../persistence/settings";
import type { FetchController } from "./fetch-controller";
import { sleep } from "../utils/sleep";
import { SyncResetError } from "./sync-reset-error";
import { SyncResetError } from "../errors/sync-reset-error";
import type { SerializedError } from "./types/SerializedError";
import type { DocumentVersionWithoutContent } from "./types/DocumentVersionWithoutContent";
import type { DocumentUpdateResponse } from "./types/DocumentUpdateResponse";
@ -66,19 +66,15 @@ export class SyncService {
}
public async create({
documentId,
relativePath,
contentBytes
}: {
documentId?: DocumentId;
relativePath: RelativePath;
contentBytes: Uint8Array;
}): Promise<DocumentVersionWithoutContent> {
}): Promise<DocumentUpdateResponse> {
return this.retryForever(async () => {
const formData = new FormData();
if (documentId !== undefined) {
formData.append("document_id", documentId);
}
formData.append("relative_path", relativePath);
formData.append(
"content",
@ -86,7 +82,7 @@ export class SyncService {
);
this.logger.debug(
`Creating document with id ${documentId} and relative path ${relativePath}`
`Creating document with relative path ${relativePath}`
);
const response = await this.client(this.getUrl("/documents"), {
@ -103,8 +99,8 @@ export class SyncService {
);
}
const result: DocumentVersionWithoutContent =
(await response.json()) as DocumentVersionWithoutContent; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
const result: DocumentUpdateResponse =
(await response.json()) as DocumentUpdateResponse; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
this.logger.debug(`Created document ${JSON.stringify(result)}`);

View file

@ -1,4 +1,8 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { DocumentWithCursors } from "./DocumentWithCursors";
export interface ClientCursors { userName: string, deviceId: string, documentsWithCursors: DocumentWithCursors[], }
export interface ClientCursors {
userName: string;
deviceId: string;
documentsWithCursors: DocumentWithCursors[];
}

View file

@ -1,3 +1,6 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface CreateDocumentVersion { relative_path: string, content: number[], }
export interface CreateDocumentVersion {
relative_path: string;
content: number[];
}

View file

@ -1,4 +1,6 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { DocumentWithCursors } from "./DocumentWithCursors";
export interface CursorPositionFromClient { documentsWithCursors: DocumentWithCursors[], }
export interface CursorPositionFromClient {
documentsWithCursors: DocumentWithCursors[];
}

View file

@ -1,4 +1,6 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { ClientCursors } from "./ClientCursors";
export interface CursorPositionFromServer { clients: ClientCursors[], }
export interface CursorPositionFromServer {
clients: ClientCursors[];
}

View file

@ -1,3 +1,6 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface CursorSpan { start: number, end: number, }
export interface CursorSpan {
start: number;
end: number;
}

View file

@ -1,3 +1,5 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type DeleteDocumentVersion = Record<string, never>;
export interface DeleteDocumentVersion {
relativePath: string;
}

View file

@ -5,4 +5,6 @@ import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutCont
/**
* Response to an update document request.
*/
export type DocumentUpdateResponse = { "type": "FastForwardUpdate" } & DocumentVersionWithoutContent | { "type": "MergingUpdate" } & DocumentVersion;
export type DocumentUpdateResponse =
| ({ type: "FastForwardUpdate" } & DocumentVersionWithoutContent)
| ({ type: "MergingUpdate" } & DocumentVersion);

View file

@ -1,3 +1,12 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface DocumentVersion { vaultUpdateId: number, documentId: string, relativePath: string, updatedDate: string, contentBase64: string, isDeleted: boolean, userId: string, deviceId: string, }
export interface DocumentVersion {
vaultUpdateId: number;
documentId: string;
relativePath: string;
updatedDate: string;
contentBase64: string;
isDeleted: boolean;
userId: string;
deviceId: string;
}

View file

@ -1,3 +1,12 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface DocumentVersionWithoutContent { vaultUpdateId: number, documentId: string, relativePath: string, updatedDate: string, isDeleted: boolean, userId: string, deviceId: string, contentSize: number, }
export interface DocumentVersionWithoutContent {
vaultUpdateId: number;
documentId: string;
relativePath: string;
updatedDate: string;
isDeleted: boolean;
userId: string;
deviceId: string;
contentSize: number;
}

View file

@ -1,4 +1,9 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { CursorSpan } from "./CursorSpan";
export interface DocumentWithCursors { vault_update_id: number | null, document_id: string, relative_path: string, cursors: CursorSpan[], }
export interface DocumentWithCursors {
vault_update_id: number | null;
document_id: string;
relative_path: string;
cursors: CursorSpan[];
}

View file

@ -4,8 +4,10 @@ import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutCont
/**
* Response to a fetch latest documents request.
*/
export interface FetchLatestDocumentsResponse { latestDocuments: DocumentVersionWithoutContent[],
/**
* The update ID of the latest document in the response.
*/
lastUpdateId: bigint, }
export interface FetchLatestDocumentsResponse {
latestDocuments: DocumentVersionWithoutContent[];
/**
* The update ID of the latest document in the response.
*/
lastUpdateId: bigint;
}

View file

@ -3,22 +3,23 @@
/**
* Response to a ping request.
*/
export interface PingResponse {
/**
* Semantic version of the server.
*/
serverVersion: string,
/**
* Whether the client is authenticated based on the sent Authorization
* header.
*/
isAuthenticated: boolean,
/**
* List of file extensions that are allowed to be merged.
*/
mergeableFileExtensions: string[],
/**
* API version ensuring backwards & forwards compatibility between the client
* and server.
*/
supportedApiVersion: number, }
export interface PingResponse {
/**
* Semantic version of the server.
*/
serverVersion: string;
/**
* Whether the client is authenticated based on the sent Authorization
* header.
*/
isAuthenticated: boolean;
/**
* List of file extensions that are allowed to be merged.
*/
mergeableFileExtensions: string[];
/**
* API version ensuring backwards & forwards compatibility between the client
* and server.
*/
supportedApiVersion: number;
}

View file

@ -1,3 +1,7 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface SerializedError { errorType: string, message: string, causes: string[], }
export interface SerializedError {
errorType: string;
message: string;
causes: string[];
}

View file

@ -1,3 +1,7 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface UpdateTextDocumentVersion { parentVersionId: number, relativePath: string, content: (number | string)[], }
export interface UpdateTextDocumentVersion {
parentVersionId: number;
relativePath: string;
content: (number | string)[];
}

View file

@ -2,4 +2,6 @@
import type { CursorPositionFromClient } from "./CursorPositionFromClient";
import type { WebSocketHandshake } from "./WebSocketHandshake";
export type WebSocketClientMessage = { "type": "handshake" } & WebSocketHandshake | { "type": "cursorPositions" } & CursorPositionFromClient;
export type WebSocketClientMessage =
| ({ type: "handshake" } & WebSocketHandshake)
| ({ type: "cursorPositions" } & CursorPositionFromClient);

View file

@ -1,3 +1,7 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface WebSocketHandshake { token: string, deviceId: string, lastSeenVaultUpdateId: number | null, }
export interface WebSocketHandshake {
token: string;
deviceId: string;
lastSeenVaultUpdateId: number | null;
}

View file

@ -2,4 +2,6 @@
import type { CursorPositionFromServer } from "./CursorPositionFromServer";
import type { WebSocketVaultUpdate } from "./WebSocketVaultUpdate";
export type WebSocketServerMessage = { "type": "vaultUpdate" } & WebSocketVaultUpdate | { "type": "cursorPositions" } & CursorPositionFromServer;
export type WebSocketServerMessage =
| ({ type: "vaultUpdate" } & WebSocketVaultUpdate)
| ({ type: "cursorPositions" } & CursorPositionFromServer);

View file

@ -1,4 +1,7 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutContent";
export interface WebSocketVaultUpdate { documents: DocumentVersionWithoutContent[], isInitialSync: boolean, }
export interface WebSocketVaultUpdate {
documents: DocumentVersionWithoutContent[];
isInitialSync: boolean;
}

View file

@ -4,8 +4,6 @@ import assert from "node:assert";
import { WebSocketManager } from "./websocket-manager";
import type { Logger } from "../tracing/logger";
import type { Settings } from "../persistence/settings";
// eslint-disable-next-line @typescript-eslint/no-require-imports
const WebSocket = require("ws") as typeof globalThis.WebSocket;
class MockCloseEvent extends Event {
public code: number;
@ -91,10 +89,8 @@ function createMockFn<T extends (...args: unknown[]) => unknown>(
describe("WebSocketManager", () => {
let mockLogger: Logger = undefined as unknown as Logger;
let mockSettings: Settings = undefined as unknown as Settings;
let deviceId = "test-device-123";
beforeEach(() => {
deviceId = "test-device-123";
const noop = (): void => {
// Intentionally empty for mock
};
@ -116,7 +112,6 @@ describe("WebSocketManager", () => {
it("cleans up promises after message handling", async () => {
const manager = new WebSocketManager(
deviceId,
mockLogger,
mockSettings,
MockWebSocket as unknown as typeof WebSocket
@ -146,7 +141,6 @@ describe("WebSocketManager", () => {
it("cleans up cursor position promises", async () => {
const manager = new WebSocketManager(
deviceId,
mockLogger,
mockSettings,
MockWebSocket as unknown as typeof WebSocket
@ -176,7 +170,6 @@ describe("WebSocketManager", () => {
it("logs handshake send errors", async () => {
const manager = new WebSocketManager(
deviceId,
mockLogger,
mockSettings,
MockWebSocket as unknown as typeof WebSocket
@ -205,7 +198,6 @@ describe("WebSocketManager", () => {
it("completes stop with timeout protection", async () => {
const manager = new WebSocketManager(
deviceId,
mockLogger,
mockSettings,
MockWebSocket as unknown as typeof WebSocket
@ -220,7 +212,6 @@ describe("WebSocketManager", () => {
it("clears old handlers on reconnection", async () => {
const manager = new WebSocketManager(
deviceId,
mockLogger,
mockSettings,
MockWebSocket as unknown as typeof WebSocket
@ -257,7 +248,6 @@ describe("WebSocketManager", () => {
it("tracks message handling promises", async () => {
const manager = new WebSocketManager(
deviceId,
mockLogger,
mockSettings,
MockWebSocket as unknown as typeof WebSocket

View file

@ -6,7 +6,10 @@ import type { CursorPositionFromClient } from "./types/CursorPositionFromClient"
import type { ClientCursors } from "./types/ClientCursors";
import { createPromise } from "../utils/create-promise";
import type { WebSocketVaultUpdate } from "./types/WebSocketVaultUpdate";
import { WEBSOCKET_DISCONNECT_TIMEOUT_IN_S } from "../consts";
import {
WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS,
WEBSOCKET_CONNECTION_TIMEOUT_IN_SECONDS
} from "../consts";
import { removeFromArray } from "../utils/remove-from-array";
import { EventListeners } from "../utils/data-structures/event-listeners";
import { awaitAll } from "../utils/await-all";
@ -27,32 +30,17 @@ export class WebSocketManager {
private isStopped = true;
private resolveDisconnectingPromise: null | (() => unknown) = null;
private reconnectTimeoutId: ReturnType<typeof setTimeout> | undefined;
private connectionTimeoutId: ReturnType<typeof setTimeout> | undefined;
private readonly outstandingPromises: Promise<unknown>[] = [];
private webSocket: WebSocket | undefined;
private readonly webSocketFactoryImplementation: typeof globalThis.WebSocket;
public constructor(
private readonly deviceId: string,
private readonly logger: Logger,
private readonly settings: Settings,
webSocketImplementation?: typeof globalThis.WebSocket
) {
if (webSocketImplementation) {
this.webSocketFactoryImplementation = webSocketImplementation;
} else {
if (
typeof globalThis !== "undefined" &&
typeof globalThis.WebSocket === "undefined"
) {
// eslint-disable-next-line
this.webSocketFactoryImplementation = require("ws"); // polyfill for WebSocket in Node.js
} else {
this.webSocketFactoryImplementation = WebSocket;
}
}
}
private readonly webSocketFactoryImplementation: typeof globalThis.WebSocket = WebSocket
) {}
public get isWebSocketConnected(): boolean {
return (
@ -77,6 +65,11 @@ export class WebSocketManager {
this.reconnectTimeoutId = undefined;
}
if (this.connectionTimeoutId !== undefined) {
clearTimeout(this.connectionTimeoutId);
this.connectionTimeoutId = undefined;
}
this.webSocket?.close(1000, "WebSocketManager has been stopped");
// eslint-disable-next-line @typescript-eslint/init-declarations
@ -85,10 +78,10 @@ export class WebSocketManager {
timeoutId = setTimeout(() => {
reject(
new Error(
`Timeout waiting for WebSocket to close after ${WEBSOCKET_DISCONNECT_TIMEOUT_IN_S} seconds`
`Timeout waiting for WebSocket to close after ${WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS} seconds`
)
);
}, WEBSOCKET_DISCONNECT_TIMEOUT_IN_S * 1000);
}, WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS * 1000);
});
try {
@ -171,7 +164,10 @@ export class WebSocketManager {
this.webSocket.onclose = null;
this.webSocket.onmessage = null;
this.webSocket.onerror = null;
this.webSocket.close();
this.webSocket.close(
1000,
"Closing previous WebSocket connection"
);
} catch (e) {
this.logger.error(
`Failed to close previous WebSocket connection: ${e}`
@ -187,7 +183,22 @@ export class WebSocketManager {
this.webSocket = new this.webSocketFactoryImplementation(wsUri);
// Set connection timeout to handle cases where server is down and the WebSocket connection won't open
this.connectionTimeoutId = setTimeout(() => {
this.connectionTimeoutId = undefined;
this.logger.warn(
`WebSocket connection timeout after ${WEBSOCKET_CONNECTION_TIMEOUT_IN_SECONDS} seconds`
);
// Force close to trigger onclose handler which will schedule reconnection
this.webSocket?.close(1000, "Connection timeout");
}, WEBSOCKET_CONNECTION_TIMEOUT_IN_SECONDS * 1000);
this.webSocket.onopen = (): void => {
if (this.connectionTimeoutId !== undefined) {
clearTimeout(this.connectionTimeoutId);
this.connectionTimeoutId = undefined;
}
// Check if we've been stopped while connecting
if (this.isStopped) {
this.webSocket?.close(
@ -231,7 +242,18 @@ export class WebSocketManager {
}
};
this.webSocket.onerror = (error): void => {
this.logger.warn(
`WebSocket error occurred: ${error instanceof ErrorEvent ? error.message : "Unknown error"}`
);
};
this.webSocket.onclose = (event): void => {
if (this.connectionTimeoutId !== undefined) {
clearTimeout(this.connectionTimeoutId);
this.connectionTimeoutId = undefined;
}
this.logger.warn(
`WebSocket closed with code ${event.code} (${event.reason == "" ? "unknown reason" : event.reason})`
);
@ -241,10 +263,13 @@ export class WebSocketManager {
this.resolveDisconnectingPromise?.();
this.resolveDisconnectingPromise = null;
} else {
const delay =
this.settings.getSettings().webSocketRetryIntervalMs;
this.logger.info(`Reconnecting to WebSocket in ${delay}ms...`);
this.reconnectTimeoutId = setTimeout(() => {
this.reconnectTimeoutId = undefined;
this.initializeWebSocket();
}, this.settings.getSettings().webSocketRetryIntervalMs);
}, delay);
}
};
}