diff --git a/frontend/sync-client/package.json b/frontend/sync-client/package.json index aa369fa7..45c33764 100644 --- a/frontend/sync-client/package.json +++ b/frontend/sync-client/package.json @@ -14,19 +14,17 @@ }, "devDependencies": { "byte-base64": "^1.1.0", - "minimatch": "^10.0.1", - "p-queue": "^8.1.0", + "minimatch": "^10.1.1", + "p-queue": "^9.0.1", "reconcile-text": "^0.8.0", - "uuid": "^13.0.0", - "@types/node": "^24.8.1", - "ts-loader": "^9.5.2", + "@types/node": "^25.0.2", + "ts-loader": "^9.5.4", "tslib": "2.8.1", - "tsx": "^4.20.6", - "typescript": "5.8.3", - "webpack": "^5.99.9", + "tsx": "^4.21.0", + "typescript": "5.9.3", + "webpack": "^5.103.0", "webpack-cli": "^6.0.1", "webpack-merge": "^6.0.1", - "@sentry/browser": "^10.8.0", - "ws": "^8.18.3" + "@sentry/browser": "^10.30.0" } } diff --git a/frontend/sync-client/src/consts.ts b/frontend/sync-client/src/consts.ts index da70ba47..9e4fa7d2 100644 --- a/frontend/sync-client/src/consts.ts +++ b/frontend/sync-client/src/consts.ts @@ -2,5 +2,6 @@ export const TIMEOUT_FOR_MERGING_HISTORY_ENTRIES_IN_SECONDS = 60; export const DIFF_CACHE_SIZE_MB = 2; export const MAX_LOG_MESSAGE_COUNT = 100000; export const MAX_HISTORY_ENTRY_COUNT = 5000; -export const SUPPORTED_API_VERSION = 2; -export const WEBSOCKET_DISCONNECT_TIMEOUT_IN_S = 10; +export const SUPPORTED_API_VERSION = 3; +export const WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS = 10; +export const WEBSOCKET_CONNECTION_TIMEOUT_IN_SECONDS = 10; diff --git a/frontend/sync-client/src/errors/authentication-error.ts b/frontend/sync-client/src/errors/authentication-error.ts new file mode 100644 index 00000000..6be4af24 --- /dev/null +++ b/frontend/sync-client/src/errors/authentication-error.ts @@ -0,0 +1,6 @@ +export class AuthenticationError extends Error { + public constructor(message: string) { + super(message); + this.name = "AuthenticationError"; + } +} diff --git a/frontend/sync-client/src/errors/file-not-found-error.ts b/frontend/sync-client/src/errors/file-not-found-error.ts new file mode 100644 index 00000000..b8acd265 --- /dev/null +++ b/frontend/sync-client/src/errors/file-not-found-error.ts @@ -0,0 +1,9 @@ +export class FileNotFoundError extends Error { + public constructor( + message: string, + public readonly filePath: string + ) { + super(message); + this.name = "FileNotFoundError"; + } +} diff --git a/frontend/sync-client/src/errors/server-version-mismatch-error.ts b/frontend/sync-client/src/errors/server-version-mismatch-error.ts new file mode 100644 index 00000000..0b9960ea --- /dev/null +++ b/frontend/sync-client/src/errors/server-version-mismatch-error.ts @@ -0,0 +1,6 @@ +export class ServerVersionMismatchError extends Error { + public constructor(message: string) { + super(message); + this.name = "ServerVersionMismatchError"; + } +} diff --git a/frontend/sync-client/src/errors/sync-reset-error.ts b/frontend/sync-client/src/errors/sync-reset-error.ts new file mode 100644 index 00000000..7b74e0b9 --- /dev/null +++ b/frontend/sync-client/src/errors/sync-reset-error.ts @@ -0,0 +1,6 @@ +export class SyncResetError extends Error { + public constructor() { + super("SyncClient has been reset, cleaning up"); + this.name = "SyncResetError"; + } +} diff --git a/frontend/sync-client/src/file-operations/file-operations.test.ts b/frontend/sync-client/src/file-operations/file-operations.test.ts index 998e47ec..27724ee9 100644 --- a/frontend/sync-client/src/file-operations/file-operations.test.ts +++ b/frontend/sync-client/src/file-operations/file-operations.test.ts @@ -23,7 +23,7 @@ class MockServerConfig implements Pick { class MockDatabase implements Partial { public getLatestDocumentByRelativePath( - _find: RelativePath + _target: RelativePath ): DocumentRecord | undefined { // no-op return undefined; diff --git a/frontend/sync-client/src/file-operations/file-operations.ts b/frontend/sync-client/src/file-operations/file-operations.ts index 2864bd20..863f62af 100644 --- a/frontend/sync-client/src/file-operations/file-operations.ts +++ b/frontend/sync-client/src/file-operations/file-operations.ts @@ -45,11 +45,11 @@ export class FileOperations { } /** - * Create a file at the specified path. - * - * If a file with the same name already exists, it is moved before creating the new one. - * Parent directories are created if necessary. - */ + * Create a file at the specified path. + * + * If a file with the same name already exists, it is moved before creating the new one. + * Parent directories are created if necessary. + */ public async create( path: RelativePath, newContent: Uint8Array @@ -77,11 +77,11 @@ export class FileOperations { } /** - * Update the file at the given path. - * - * Performs a 3-way merge before writing if the file's content differs from `expectedContent`. - * Does not recreate the file if it no longer exists, returning an empty array instead. - */ + * Update the file at the given path. + * + * Performs a 3-way merge before writing if the file's content differs from `expectedContent`. + * Does not recreate the file if it no longer exists, returning an empty array instead. + */ public async write( path: RelativePath, expectedContent: Uint8Array, @@ -169,9 +169,9 @@ export class FileOperations { } await this.ensureClearPath(newPath); - this.database.move(oldPath, newPath); await this.fs.rename(oldPath, newPath); + await this.deletingEmptyParentDirectoriesOfDeletedFile(oldPath); } @@ -239,12 +239,12 @@ export class FileOperations { } /** - * Deconflicts the given path by appending (1), (2), etc. before the file extension until a non-existent path is found. - * The returned path has a lock acquired on it; it must be released by the caller when no longer needed. - * - * @param path The starting path to deconflict - * @returns a non-existent path with a lock acquired on it - */ + * Deconflicts the given path by appending (1), (2), etc. before the file extension until a non-existent path is found. + * The returned path has a lock acquired on it; it must be released by the caller when no longer needed. + * + * @param path The starting path to deconflict + * @returns a non-existent path with a lock acquired on it + */ private async deconflictPath(path: RelativePath): Promise { // eslint-disable-next-line prefer-const let [directory, fileName] = FileOperations.getParentDirAndFile(path); diff --git a/frontend/sync-client/src/file-operations/safe-filesystem-operations.ts b/frontend/sync-client/src/file-operations/safe-filesystem-operations.ts index 904bf805..3bd84266 100644 --- a/frontend/sync-client/src/file-operations/safe-filesystem-operations.ts +++ b/frontend/sync-client/src/file-operations/safe-filesystem-operations.ts @@ -2,7 +2,7 @@ import type { RelativePath } from "../persistence/database"; import type { FileSystemOperations } from "./filesystem-operations"; import type { Logger } from "../tracing/logger"; import { Locks } from "../utils/data-structures/locks"; -import { FileNotFoundError } from "./file-not-found-error"; +import { FileNotFoundError } from "../errors/file-not-found-error"; import type { TextWithCursors } from "reconcile-text"; /** @@ -17,7 +17,7 @@ export class SafeFileSystemOperations implements FileSystemOperations { private readonly fs: FileSystemOperations, private readonly logger: Logger ) { - this.locks = new Locks(logger); + this.locks = new Locks(SafeFileSystemOperations.name, logger); } public async listFilesRecursively( @@ -135,10 +135,10 @@ export class SafeFileSystemOperations implements FileSystemOperations { } /** - * Decorate an operation to ensure that the file exists before running it. - * If the operation fails, it will check if the file still exists and throw - * a FileNotFoundError if it doesn't. - */ + * Decorate an operation to ensure that the file exists before running it. + * If the operation fails, it will check if the file still exists and throw + * a FileNotFoundError if it doesn't. + */ private async safeOperation( path: RelativePath, operation: () => Promise, diff --git a/frontend/sync-client/src/index.ts b/frontend/sync-client/src/index.ts index cfcc5071..c4e4313d 100644 --- a/frontend/sync-client/src/index.ts +++ b/frontend/sync-client/src/index.ts @@ -2,6 +2,7 @@ import { awaitAll } from "./utils/await-all"; import { logToConsole } from "./utils/debugging/log-to-console"; import { slowFetchFactory } from "./utils/debugging/slow-fetch-factory"; import { slowWebSocketFactory } from "./utils/debugging/slow-web-socket-factory"; +import { InMemoryFileSystem } from "./utils/debugging/in-memory-file-system"; import { getRandomColor } from "./utils/get-random-color"; import { lineAndColumnToPosition } from "./utils/line-and-column-to-position"; import { positionToLineAndColumn } from "./utils/position-to-line-and-column"; @@ -27,8 +28,8 @@ export type { PersistenceProvider } from "./persistence/persistence"; export type { CursorSpan } from "./services/types/CursorSpan"; export type { ClientCursors } from "./services/types/ClientCursors"; export type { NetworkConnectionStatus } from "./types/network-connection-status"; -export type { ServerVersionMismatchError } from "./services/server-version-mismatch-error"; -export type { AuthenticationError } from "./services/authentication-error"; +export type { ServerVersionMismatchError } from "./errors/server-version-mismatch-error"; +export type { AuthenticationError } from "./errors/authentication-error"; export type { MaybeOutdatedClientCursors } from "./types/maybe-outdated-client-cursors"; export { DocumentSyncStatus } from "./types/document-sync-status"; export { SyncClient } from "./sync-client"; @@ -37,7 +38,8 @@ export type { TextWithCursors, CursorPosition } from "reconcile-text"; export const debugging = { slowFetchFactory, slowWebSocketFactory, - logToConsole + logToConsole, + InMemoryFileSystem }; export const utils = { diff --git a/frontend/sync-client/src/persistence/database.ts b/frontend/sync-client/src/persistence/database.ts index 86b2845c..2a5e901e 100644 --- a/frontend/sync-client/src/persistence/database.ts +++ b/frontend/sync-client/src/persistence/database.ts @@ -9,6 +9,7 @@ export type DocumentId = string; export type RelativePath = string; export interface DocumentMetadata { + documentId: DocumentId; parentVersionId: VaultUpdateId; hash: string; remoteRelativePath?: RelativePath; @@ -25,7 +26,6 @@ export interface StoredDocumentMetadata { export interface StoredDatabase { documents: StoredDocumentMetadata[]; lastSeenUpdateId: VaultUpdateId | undefined; - hasInitialSyncCompleted: boolean; } /** @@ -36,17 +36,14 @@ export interface StoredDatabase { */ export interface DocumentRecord { relativePath: RelativePath; - documentId: DocumentId; metadata: DocumentMetadata | undefined; isDeleted: boolean; - updates: Promise[]; parallelVersion: number; } export class Database { private documents: DocumentRecord[]; private lastSeenUpdateIds: CoveredValues; - private hasInitialSyncCompleted: boolean; public constructor( private readonly logger: Logger, @@ -56,16 +53,12 @@ export class Database { initialState ??= {}; this.documents = - initialState.documents?.map( - ({ relativePath, documentId, ...metadata }) => ({ - relativePath, - documentId, - metadata, - isDeleted: false, - updates: [], - parallelVersion: 0 - }) - ) ?? []; + initialState.documents?.map(({ relativePath, ...metadata }) => ({ + relativePath, + metadata, + isDeleted: false, + parallelVersion: 0 + })) ?? []; this.ensureConsistency(); this.logger.debug(`Loaded ${this.documents.length} documents`); @@ -79,12 +72,6 @@ export class Database { this.documents.forEach((doc) => { this.lastSeenUpdateIds.add(doc.metadata?.parentVersionId); }); - - this.hasInitialSyncCompleted = - initialState.hasInitialSyncCompleted ?? false; - this.logger.debug( - `Loaded hasInitialSyncCompleted: ${this.hasInitialSyncCompleted}` - ); } public get length(): number { @@ -127,91 +114,51 @@ export class Database { public updateDocumentMetadata( metadata: { + documentId: DocumentId; parentVersionId: VaultUpdateId; hash: string; remoteRelativePath: RelativePath; }, - toUpdate: DocumentRecord + target: DocumentRecord ): void { - if (!this.documents.includes(toUpdate)) { + if (!this.documents.includes(target)) { throw new Error("Document not found in database"); } - toUpdate.metadata = metadata; - - this.saveInTheBackground(); - } - - public removeDocumentPromise(promise: Promise): void { - const entry = this.documents.find(({ updates }) => - updates.includes(promise) + this.logger.debug( + `Updating document metadata for ${target.relativePath} from ${JSON.stringify( + target.metadata, + null, + 2 + )} to ${JSON.stringify(metadata, null, 2)}` ); - if (entry === undefined) { - // This method should be idempotent and tolerant of - // stragglers calling it after the databse has been reset. - return; - } + target.metadata = metadata; - removeFromArray(entry.updates, promise); - // No need to save as Promises don't get serialized - } - - public removeDocument(find: DocumentRecord): void { - removeFromArray(this.documents, find); this.saveInTheBackground(); } public getLatestDocumentByRelativePath( - find: RelativePath + target: RelativePath ): DocumentRecord | undefined { const candidates = this.documents.filter( - ({ relativePath }) => relativePath === find + ({ relativePath }) => relativePath === target ); candidates.sort((a, b) => b.parallelVersion - a.parallelVersion); // descending return candidates[0]; } - public async getResolvedDocumentByRelativePath( - relativePath: RelativePath, - promise: Promise - ): Promise { - const entry = this.getLatestDocumentByRelativePath(relativePath); - - if (entry === undefined) { - throw new Error( - `Document not found by relative path: ${relativePath}, ${JSON.stringify( - this.documents, - null, - 2 - )}` - ); - } - - const currentPromises = entry.updates; - entry.updates = [...currentPromises, promise]; - await awaitAll(currentPromises); - - return entry; - } - public createNewPendingDocument( - documentId: DocumentId, - relativePath: RelativePath, - promise: Promise + relativePath: RelativePath ): DocumentRecord { - this.logger.debug( - `Creating new pending document: ${relativePath} (${documentId})` - ); + this.logger.debug(`Creating new pending document: ${relativePath}`); const previousEntry = this.getLatestDocumentByRelativePath(relativePath); const entry = { relativePath, - documentId, metadata: undefined, isDeleted: false, - updates: [promise], parallelVersion: previousEntry?.parallelVersion === undefined ? 0 @@ -219,39 +166,18 @@ export class Database { }; this.documents.push(entry); - this.saveInTheBackground(); - return entry; - } - - public createNewEmptyDocument( - documentId: DocumentId, - parentVersionId: VaultUpdateId, - relativePath: RelativePath - ): DocumentRecord { - const entry = { - relativePath, - documentId, - metadata: { - parentVersionId, - hash: EMPTY_HASH, - remoteRelativePath: relativePath - }, - isDeleted: false, - updates: [], - parallelVersion: 0 - }; - - this.documents.push(entry); - this.saveInTheBackground(); + // no need to save as we only save documents which have metadata return entry; } public getDocumentByDocumentId( - find: DocumentId + target: DocumentId ): DocumentRecord | undefined { - return this.documents.find(({ documentId }) => documentId === find); + return this.documents.find( + ({ metadata }) => metadata?.documentId === target + ); } public move( @@ -274,7 +200,7 @@ export class Database { } oldDocument.relativePath = newRelativePath; - // We're in a strange state where the target of the move has just got deleted, + // We might be in a strange state where the target of the move has just got deleted, // however, its metadata might already have a bunch of updates queued up for // the document at the new location. We need to keep these updates. oldDocument.parallelVersion = @@ -286,19 +212,13 @@ export class Database { public delete(relativePath: RelativePath): void { const candidate = this.getLatestDocumentByRelativePath(relativePath); if (candidate === undefined) { - throw new Error( - `Document not found by relative path: ${relativePath}` - ); + return; } candidate.isDeleted = true; } - public getHasInitialSyncCompleted(): boolean { - return this.hasInitialSyncCompleted; - } - - public setHasInitialSyncCompleted(value: boolean): void { - this.hasInitialSyncCompleted = value; + public removeDocument(target: DocumentRecord): void { + removeFromArray(this.documents, target); this.saveInTheBackground(); } @@ -324,43 +244,50 @@ export class Database { this.lastSeenUpdateIds = new CoveredValues( 0 // the first updateId will be 1 which is the first integer after -1 ); - this.hasInitialSyncCompleted = false; this.saveInTheBackground(); } public async save(): Promise { return this.saveData({ documents: this.resolvedDocuments.map( - ({ relativePath, documentId, metadata }) => ({ - documentId, + ({ relativePath, metadata }) => ({ relativePath, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion ...metadata! // `resolvedDocuments` only returns docs with metadata set }) ), - lastSeenUpdateId: this.lastSeenUpdateIds.min, - hasInitialSyncCompleted: this.hasInitialSyncCompleted + lastSeenUpdateId: this.lastSeenUpdateIds.min }); } private ensureConsistency(): void { const idToPath = new Map(); - this.resolvedDocuments.forEach(({ relativePath, documentId }) => { - idToPath.set(documentId, [ - ...(idToPath.get(documentId) ?? []), + this.resolvedDocuments.forEach(({ relativePath, metadata }) => { + if (metadata === undefined) { + return; + } + idToPath.set(metadata.documentId, [ + ...(idToPath.get(metadata.documentId) ?? []), relativePath ]); }); const duplicates = Array.from(idToPath.entries()) .filter(([_, paths]) => paths.length > 1) - .map(([id, paths]) => `${id} (${paths.join(", ")})`); + .map(([id, paths]) => { + let details = ""; + for (const path of paths) { + const doc = this.getLatestDocumentByRelativePath(path); + details += `\n- ${JSON.stringify(doc, null, 2)}`; + } + return `${id} (${paths.join(", ")}): ${details}`; + }); if (duplicates.length > 0) { throw new Error( "Document IDs are not unique, found duplicates: " + - duplicates.join("; ") + duplicates.join("; ") ); } } diff --git a/frontend/sync-client/src/persistence/settings.ts b/frontend/sync-client/src/persistence/settings.ts index d78170e6..9771b7f1 100644 --- a/frontend/sync-client/src/persistence/settings.ts +++ b/frontend/sync-client/src/persistence/settings.ts @@ -38,7 +38,7 @@ export class Settings { >(); private settings: SyncSettings; - private readonly lock: Lock = new Lock(); + private readonly lock: Lock; public constructor( private readonly logger: Logger, @@ -50,6 +50,8 @@ export class Settings { ...(initialState ?? {}) }; + this.lock = new Lock(Settings.name, this.logger); + this.logger.debug( `Loaded settings: ${JSON.stringify(this.settings, null, 2)}` ); diff --git a/frontend/sync-client/src/services/fetch-controller.test.ts b/frontend/sync-client/src/services/fetch-controller.test.ts index 94fa8424..a1b791a6 100644 --- a/frontend/sync-client/src/services/fetch-controller.test.ts +++ b/frontend/sync-client/src/services/fetch-controller.test.ts @@ -3,7 +3,7 @@ import { describe, it, mock, beforeEach, afterEach } from "node:test"; import assert from "node:assert"; import { FetchController } from "./fetch-controller"; import { Logger } from "../tracing/logger"; -import { SyncResetError } from "./sync-reset-error"; +import { SyncResetError } from "../errors/sync-reset-error"; import { sleep } from "../utils/sleep"; describe("FetchController", () => { diff --git a/frontend/sync-client/src/services/fetch-controller.ts b/frontend/sync-client/src/services/fetch-controller.ts index 77b87e3a..e30739da 100644 --- a/frontend/sync-client/src/services/fetch-controller.ts +++ b/frontend/sync-client/src/services/fetch-controller.ts @@ -1,6 +1,6 @@ import type { Logger } from "../tracing/logger"; import { createPromise } from "../utils/create-promise"; -import { SyncResetError } from "./sync-reset-error"; +import { SyncResetError } from "../errors/sync-reset-error"; /** * Offers a resettable fetch implementation that waits until syncing is enabled @@ -25,18 +25,18 @@ export class FetchController { } /** - * Whether the fetch implementation can immediately send requests once outside of a reset. - */ + * Whether the fetch implementation can immediately send requests once outside of a reset. + */ public get canFetch(): boolean { return this._canFetch; } /** - * Allow or disallow fetching. The changes only take effect if not resetting. - * When called during a reset, its effect is deferred until the reset is finished. - * - * @param canFetch Whether fetching is enabled - */ + * Allow or disallow fetching. The changes only take effect if not resetting. + * When called during a reset, its effect is deferred until the reset is finished. + * + * @param canFetch Whether fetching is enabled + */ public set canFetch(canFetch: boolean) { this._canFetch = canFetch; @@ -59,9 +59,9 @@ export class FetchController { } /** - * Starts a reset, causing all ongoing and future fetches to be rejected - * with a SyncResetError until finishReset is called. - */ + * Starts a reset, causing all ongoing and future fetches to be rejected + * with a SyncResetError until finishReset is called. + */ public startReset(): void { this.isResetting = true; this.rejectUntil(new SyncResetError()); @@ -72,9 +72,9 @@ export class FetchController { } /** - * Finishes a reset, allowing fetches to proceed or wait again depending on - * the current sync settings. - */ + * Finishes a reset, allowing fetches to proceed or wait again depending on + * the current sync settings. + */ public finishReset(): void { if (!this.isResetting) { return; @@ -85,19 +85,19 @@ export class FetchController { } /** - * - * |------------------|---------------|-----------------------------------------------------| - * | | Sync enabled | Sync disabled | - * |------------------|-------------- |-----------------------------------------------------| - * | During reset | Rejects with SyncResetError without sending request | - * |------------------|-------------- |-----------------------------------------------------| - * | Outside of reset | Same as fetch | Blocks until sync is enabled and then same as fetch | - * |------------------|---------------|-----------------------------------------------------| - * - * @param logger for errors - * @param fetch to wrap - * @returns a wrapped fetch implementation affected by the FetchController state - */ + * + * |------------------|---------------|-----------------------------------------------------| + * | | Sync enabled | Sync disabled | + * |------------------|-------------- |-----------------------------------------------------| + * | During reset | Rejects with SyncResetError without sending request | + * |------------------|-------------- |-----------------------------------------------------| + * | Outside of reset | Same as fetch | Blocks until sync is enabled and then same as fetch | + * |------------------|---------------|-----------------------------------------------------| + * + * @param logger for errors + * @param fetch to wrap + * @returns a wrapped fetch implementation affected by the FetchController state + */ public getControlledFetchImplementation( logger: Logger, fetch: typeof globalThis.fetch = globalThis.fetch diff --git a/frontend/sync-client/src/services/server-config.ts b/frontend/sync-client/src/services/server-config.ts index 309c637c..da804b2f 100644 --- a/frontend/sync-client/src/services/server-config.ts +++ b/frontend/sync-client/src/services/server-config.ts @@ -1,6 +1,6 @@ import { SUPPORTED_API_VERSION } from "../consts"; -import { AuthenticationError } from "./authentication-error"; -import { ServerVersionMismatchError } from "./server-version-mismatch-error"; +import { AuthenticationError } from "../errors/authentication-error"; +import { ServerVersionMismatchError } from "../errors/server-version-mismatch-error"; import type { SyncService } from "./sync-service"; import type { PingResponse } from "./types/PingResponse"; @@ -34,11 +34,6 @@ export class ServerConfig { } } - // warm the cache - public async initialize(): Promise { - await this.getConfig(); - } - public async checkConnection(forceUpdate = false): Promise<{ isSuccessful: boolean; message: string; diff --git a/frontend/sync-client/src/services/sync-service.ts b/frontend/sync-client/src/services/sync-service.ts index 8190a638..647ac8da 100644 --- a/frontend/sync-client/src/services/sync-service.ts +++ b/frontend/sync-client/src/services/sync-service.ts @@ -8,7 +8,7 @@ import type { Logger } from "../tracing/logger"; import type { Settings } from "../persistence/settings"; import type { FetchController } from "./fetch-controller"; import { sleep } from "../utils/sleep"; -import { SyncResetError } from "./sync-reset-error"; +import { SyncResetError } from "../errors/sync-reset-error"; import type { SerializedError } from "./types/SerializedError"; import type { DocumentVersionWithoutContent } from "./types/DocumentVersionWithoutContent"; import type { DocumentUpdateResponse } from "./types/DocumentUpdateResponse"; @@ -66,19 +66,15 @@ export class SyncService { } public async create({ - documentId, relativePath, contentBytes }: { - documentId?: DocumentId; relativePath: RelativePath; contentBytes: Uint8Array; - }): Promise { + }): Promise { return this.retryForever(async () => { const formData = new FormData(); - if (documentId !== undefined) { - formData.append("document_id", documentId); - } + formData.append("relative_path", relativePath); formData.append( "content", @@ -86,7 +82,7 @@ export class SyncService { ); this.logger.debug( - `Creating document with id ${documentId} and relative path ${relativePath}` + `Creating document with relative path ${relativePath}` ); const response = await this.client(this.getUrl("/documents"), { @@ -103,8 +99,8 @@ export class SyncService { ); } - const result: DocumentVersionWithoutContent = - (await response.json()) as DocumentVersionWithoutContent; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion + const result: DocumentUpdateResponse = + (await response.json()) as DocumentUpdateResponse; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion this.logger.debug(`Created document ${JSON.stringify(result)}`); diff --git a/frontend/sync-client/src/services/types/ClientCursors.ts b/frontend/sync-client/src/services/types/ClientCursors.ts index 5b1ec040..e8c9b93d 100644 --- a/frontend/sync-client/src/services/types/ClientCursors.ts +++ b/frontend/sync-client/src/services/types/ClientCursors.ts @@ -1,4 +1,8 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { DocumentWithCursors } from "./DocumentWithCursors"; -export interface ClientCursors { userName: string, deviceId: string, documentsWithCursors: DocumentWithCursors[], } +export interface ClientCursors { + userName: string; + deviceId: string; + documentsWithCursors: DocumentWithCursors[]; +} diff --git a/frontend/sync-client/src/services/types/CreateDocumentVersion.ts b/frontend/sync-client/src/services/types/CreateDocumentVersion.ts index d4ed2831..17103be5 100644 --- a/frontend/sync-client/src/services/types/CreateDocumentVersion.ts +++ b/frontend/sync-client/src/services/types/CreateDocumentVersion.ts @@ -1,3 +1,6 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. -export interface CreateDocumentVersion { relative_path: string, content: number[], } +export interface CreateDocumentVersion { + relative_path: string; + content: number[]; +} diff --git a/frontend/sync-client/src/services/types/CursorPositionFromClient.ts b/frontend/sync-client/src/services/types/CursorPositionFromClient.ts index 78823b5d..ee937f4e 100644 --- a/frontend/sync-client/src/services/types/CursorPositionFromClient.ts +++ b/frontend/sync-client/src/services/types/CursorPositionFromClient.ts @@ -1,4 +1,6 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { DocumentWithCursors } from "./DocumentWithCursors"; -export interface CursorPositionFromClient { documentsWithCursors: DocumentWithCursors[], } +export interface CursorPositionFromClient { + documentsWithCursors: DocumentWithCursors[]; +} diff --git a/frontend/sync-client/src/services/types/CursorPositionFromServer.ts b/frontend/sync-client/src/services/types/CursorPositionFromServer.ts index ed6ac7b2..52a24f27 100644 --- a/frontend/sync-client/src/services/types/CursorPositionFromServer.ts +++ b/frontend/sync-client/src/services/types/CursorPositionFromServer.ts @@ -1,4 +1,6 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { ClientCursors } from "./ClientCursors"; -export interface CursorPositionFromServer { clients: ClientCursors[], } +export interface CursorPositionFromServer { + clients: ClientCursors[]; +} diff --git a/frontend/sync-client/src/services/types/CursorSpan.ts b/frontend/sync-client/src/services/types/CursorSpan.ts index 7424067c..2cc2b7fc 100644 --- a/frontend/sync-client/src/services/types/CursorSpan.ts +++ b/frontend/sync-client/src/services/types/CursorSpan.ts @@ -1,3 +1,6 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. -export interface CursorSpan { start: number, end: number, } +export interface CursorSpan { + start: number; + end: number; +} diff --git a/frontend/sync-client/src/services/types/DeleteDocumentVersion.ts b/frontend/sync-client/src/services/types/DeleteDocumentVersion.ts index f160406f..99ecc9e7 100644 --- a/frontend/sync-client/src/services/types/DeleteDocumentVersion.ts +++ b/frontend/sync-client/src/services/types/DeleteDocumentVersion.ts @@ -1,3 +1,5 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. -export type DeleteDocumentVersion = Record; +export interface DeleteDocumentVersion { + relativePath: string; +} diff --git a/frontend/sync-client/src/services/types/DocumentUpdateResponse.ts b/frontend/sync-client/src/services/types/DocumentUpdateResponse.ts index 418117e6..7fd06c7a 100644 --- a/frontend/sync-client/src/services/types/DocumentUpdateResponse.ts +++ b/frontend/sync-client/src/services/types/DocumentUpdateResponse.ts @@ -5,4 +5,6 @@ import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutCont /** * Response to an update document request. */ -export type DocumentUpdateResponse = { "type": "FastForwardUpdate" } & DocumentVersionWithoutContent | { "type": "MergingUpdate" } & DocumentVersion; +export type DocumentUpdateResponse = + | ({ type: "FastForwardUpdate" } & DocumentVersionWithoutContent) + | ({ type: "MergingUpdate" } & DocumentVersion); diff --git a/frontend/sync-client/src/services/types/DocumentVersion.ts b/frontend/sync-client/src/services/types/DocumentVersion.ts index 3d50ae65..3b9aa37b 100644 --- a/frontend/sync-client/src/services/types/DocumentVersion.ts +++ b/frontend/sync-client/src/services/types/DocumentVersion.ts @@ -1,3 +1,12 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. -export interface DocumentVersion { vaultUpdateId: number, documentId: string, relativePath: string, updatedDate: string, contentBase64: string, isDeleted: boolean, userId: string, deviceId: string, } +export interface DocumentVersion { + vaultUpdateId: number; + documentId: string; + relativePath: string; + updatedDate: string; + contentBase64: string; + isDeleted: boolean; + userId: string; + deviceId: string; +} diff --git a/frontend/sync-client/src/services/types/DocumentVersionWithoutContent.ts b/frontend/sync-client/src/services/types/DocumentVersionWithoutContent.ts index af064db8..4b24e7c5 100644 --- a/frontend/sync-client/src/services/types/DocumentVersionWithoutContent.ts +++ b/frontend/sync-client/src/services/types/DocumentVersionWithoutContent.ts @@ -1,3 +1,12 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. -export interface DocumentVersionWithoutContent { vaultUpdateId: number, documentId: string, relativePath: string, updatedDate: string, isDeleted: boolean, userId: string, deviceId: string, contentSize: number, } +export interface DocumentVersionWithoutContent { + vaultUpdateId: number; + documentId: string; + relativePath: string; + updatedDate: string; + isDeleted: boolean; + userId: string; + deviceId: string; + contentSize: number; +} diff --git a/frontend/sync-client/src/services/types/DocumentWithCursors.ts b/frontend/sync-client/src/services/types/DocumentWithCursors.ts index e7dad119..dcfe6e2d 100644 --- a/frontend/sync-client/src/services/types/DocumentWithCursors.ts +++ b/frontend/sync-client/src/services/types/DocumentWithCursors.ts @@ -1,4 +1,9 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { CursorSpan } from "./CursorSpan"; -export interface DocumentWithCursors { vault_update_id: number | null, document_id: string, relative_path: string, cursors: CursorSpan[], } +export interface DocumentWithCursors { + vault_update_id: number | null; + document_id: string; + relative_path: string; + cursors: CursorSpan[]; +} diff --git a/frontend/sync-client/src/services/types/FetchLatestDocumentsResponse.ts b/frontend/sync-client/src/services/types/FetchLatestDocumentsResponse.ts index 3be625bd..315d701a 100644 --- a/frontend/sync-client/src/services/types/FetchLatestDocumentsResponse.ts +++ b/frontend/sync-client/src/services/types/FetchLatestDocumentsResponse.ts @@ -4,8 +4,10 @@ import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutCont /** * Response to a fetch latest documents request. */ -export interface FetchLatestDocumentsResponse { latestDocuments: DocumentVersionWithoutContent[], -/** - * The update ID of the latest document in the response. - */ -lastUpdateId: bigint, } +export interface FetchLatestDocumentsResponse { + latestDocuments: DocumentVersionWithoutContent[]; + /** + * The update ID of the latest document in the response. + */ + lastUpdateId: bigint; +} diff --git a/frontend/sync-client/src/services/types/PingResponse.ts b/frontend/sync-client/src/services/types/PingResponse.ts index ba8ceb48..f96520e9 100644 --- a/frontend/sync-client/src/services/types/PingResponse.ts +++ b/frontend/sync-client/src/services/types/PingResponse.ts @@ -3,22 +3,23 @@ /** * Response to a ping request. */ -export interface PingResponse { -/** - * Semantic version of the server. - */ -serverVersion: string, -/** - * Whether the client is authenticated based on the sent Authorization - * header. - */ -isAuthenticated: boolean, -/** - * List of file extensions that are allowed to be merged. - */ -mergeableFileExtensions: string[], -/** - * API version ensuring backwards & forwards compatibility between the client - * and server. - */ -supportedApiVersion: number, } +export interface PingResponse { + /** + * Semantic version of the server. + */ + serverVersion: string; + /** + * Whether the client is authenticated based on the sent Authorization + * header. + */ + isAuthenticated: boolean; + /** + * List of file extensions that are allowed to be merged. + */ + mergeableFileExtensions: string[]; + /** + * API version ensuring backwards & forwards compatibility between the client + * and server. + */ + supportedApiVersion: number; +} diff --git a/frontend/sync-client/src/services/types/SerializedError.ts b/frontend/sync-client/src/services/types/SerializedError.ts index 4389289e..ec1c4503 100644 --- a/frontend/sync-client/src/services/types/SerializedError.ts +++ b/frontend/sync-client/src/services/types/SerializedError.ts @@ -1,3 +1,7 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. -export interface SerializedError { errorType: string, message: string, causes: string[], } +export interface SerializedError { + errorType: string; + message: string; + causes: string[]; +} diff --git a/frontend/sync-client/src/services/types/UpdateTextDocumentVersion.ts b/frontend/sync-client/src/services/types/UpdateTextDocumentVersion.ts index aeb69f5a..46f36bd0 100644 --- a/frontend/sync-client/src/services/types/UpdateTextDocumentVersion.ts +++ b/frontend/sync-client/src/services/types/UpdateTextDocumentVersion.ts @@ -1,3 +1,7 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. -export interface UpdateTextDocumentVersion { parentVersionId: number, relativePath: string, content: (number | string)[], } +export interface UpdateTextDocumentVersion { + parentVersionId: number; + relativePath: string; + content: (number | string)[]; +} diff --git a/frontend/sync-client/src/services/types/WebSocketClientMessage.ts b/frontend/sync-client/src/services/types/WebSocketClientMessage.ts index 5765a0d0..9608f3af 100644 --- a/frontend/sync-client/src/services/types/WebSocketClientMessage.ts +++ b/frontend/sync-client/src/services/types/WebSocketClientMessage.ts @@ -2,4 +2,6 @@ import type { CursorPositionFromClient } from "./CursorPositionFromClient"; import type { WebSocketHandshake } from "./WebSocketHandshake"; -export type WebSocketClientMessage = { "type": "handshake" } & WebSocketHandshake | { "type": "cursorPositions" } & CursorPositionFromClient; +export type WebSocketClientMessage = + | ({ type: "handshake" } & WebSocketHandshake) + | ({ type: "cursorPositions" } & CursorPositionFromClient); diff --git a/frontend/sync-client/src/services/types/WebSocketHandshake.ts b/frontend/sync-client/src/services/types/WebSocketHandshake.ts index d25651f9..a2910f49 100644 --- a/frontend/sync-client/src/services/types/WebSocketHandshake.ts +++ b/frontend/sync-client/src/services/types/WebSocketHandshake.ts @@ -1,3 +1,7 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. -export interface WebSocketHandshake { token: string, deviceId: string, lastSeenVaultUpdateId: number | null, } +export interface WebSocketHandshake { + token: string; + deviceId: string; + lastSeenVaultUpdateId: number | null; +} diff --git a/frontend/sync-client/src/services/types/WebSocketServerMessage.ts b/frontend/sync-client/src/services/types/WebSocketServerMessage.ts index 45e37358..fd250b7b 100644 --- a/frontend/sync-client/src/services/types/WebSocketServerMessage.ts +++ b/frontend/sync-client/src/services/types/WebSocketServerMessage.ts @@ -2,4 +2,6 @@ import type { CursorPositionFromServer } from "./CursorPositionFromServer"; import type { WebSocketVaultUpdate } from "./WebSocketVaultUpdate"; -export type WebSocketServerMessage = { "type": "vaultUpdate" } & WebSocketVaultUpdate | { "type": "cursorPositions" } & CursorPositionFromServer; +export type WebSocketServerMessage = + | ({ type: "vaultUpdate" } & WebSocketVaultUpdate) + | ({ type: "cursorPositions" } & CursorPositionFromServer); diff --git a/frontend/sync-client/src/services/types/WebSocketVaultUpdate.ts b/frontend/sync-client/src/services/types/WebSocketVaultUpdate.ts index 39e03b6f..f1ea0f80 100644 --- a/frontend/sync-client/src/services/types/WebSocketVaultUpdate.ts +++ b/frontend/sync-client/src/services/types/WebSocketVaultUpdate.ts @@ -1,4 +1,7 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutContent"; -export interface WebSocketVaultUpdate { documents: DocumentVersionWithoutContent[], isInitialSync: boolean, } +export interface WebSocketVaultUpdate { + documents: DocumentVersionWithoutContent[]; + isInitialSync: boolean; +} diff --git a/frontend/sync-client/src/services/websocket-manager.test.ts b/frontend/sync-client/src/services/websocket-manager.test.ts index fef901e7..3b61b5a1 100644 --- a/frontend/sync-client/src/services/websocket-manager.test.ts +++ b/frontend/sync-client/src/services/websocket-manager.test.ts @@ -4,8 +4,6 @@ import assert from "node:assert"; import { WebSocketManager } from "./websocket-manager"; import type { Logger } from "../tracing/logger"; import type { Settings } from "../persistence/settings"; -// eslint-disable-next-line @typescript-eslint/no-require-imports -const WebSocket = require("ws") as typeof globalThis.WebSocket; class MockCloseEvent extends Event { public code: number; @@ -91,10 +89,8 @@ function createMockFn unknown>( describe("WebSocketManager", () => { let mockLogger: Logger = undefined as unknown as Logger; let mockSettings: Settings = undefined as unknown as Settings; - let deviceId = "test-device-123"; beforeEach(() => { - deviceId = "test-device-123"; const noop = (): void => { // Intentionally empty for mock }; @@ -116,7 +112,6 @@ describe("WebSocketManager", () => { it("cleans up promises after message handling", async () => { const manager = new WebSocketManager( - deviceId, mockLogger, mockSettings, MockWebSocket as unknown as typeof WebSocket @@ -146,7 +141,6 @@ describe("WebSocketManager", () => { it("cleans up cursor position promises", async () => { const manager = new WebSocketManager( - deviceId, mockLogger, mockSettings, MockWebSocket as unknown as typeof WebSocket @@ -176,7 +170,6 @@ describe("WebSocketManager", () => { it("logs handshake send errors", async () => { const manager = new WebSocketManager( - deviceId, mockLogger, mockSettings, MockWebSocket as unknown as typeof WebSocket @@ -205,7 +198,6 @@ describe("WebSocketManager", () => { it("completes stop with timeout protection", async () => { const manager = new WebSocketManager( - deviceId, mockLogger, mockSettings, MockWebSocket as unknown as typeof WebSocket @@ -220,7 +212,6 @@ describe("WebSocketManager", () => { it("clears old handlers on reconnection", async () => { const manager = new WebSocketManager( - deviceId, mockLogger, mockSettings, MockWebSocket as unknown as typeof WebSocket @@ -257,7 +248,6 @@ describe("WebSocketManager", () => { it("tracks message handling promises", async () => { const manager = new WebSocketManager( - deviceId, mockLogger, mockSettings, MockWebSocket as unknown as typeof WebSocket diff --git a/frontend/sync-client/src/services/websocket-manager.ts b/frontend/sync-client/src/services/websocket-manager.ts index 09787bce..e99b8662 100644 --- a/frontend/sync-client/src/services/websocket-manager.ts +++ b/frontend/sync-client/src/services/websocket-manager.ts @@ -6,7 +6,10 @@ import type { CursorPositionFromClient } from "./types/CursorPositionFromClient" import type { ClientCursors } from "./types/ClientCursors"; import { createPromise } from "../utils/create-promise"; import type { WebSocketVaultUpdate } from "./types/WebSocketVaultUpdate"; -import { WEBSOCKET_DISCONNECT_TIMEOUT_IN_S } from "../consts"; +import { + WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS, + WEBSOCKET_CONNECTION_TIMEOUT_IN_SECONDS +} from "../consts"; import { removeFromArray } from "../utils/remove-from-array"; import { EventListeners } from "../utils/data-structures/event-listeners"; import { awaitAll } from "../utils/await-all"; @@ -27,32 +30,17 @@ export class WebSocketManager { private isStopped = true; private resolveDisconnectingPromise: null | (() => unknown) = null; private reconnectTimeoutId: ReturnType | undefined; + private connectionTimeoutId: ReturnType | undefined; private readonly outstandingPromises: Promise[] = []; private webSocket: WebSocket | undefined; - private readonly webSocketFactoryImplementation: typeof globalThis.WebSocket; public constructor( - private readonly deviceId: string, private readonly logger: Logger, private readonly settings: Settings, - webSocketImplementation?: typeof globalThis.WebSocket - ) { - if (webSocketImplementation) { - this.webSocketFactoryImplementation = webSocketImplementation; - } else { - if ( - typeof globalThis !== "undefined" && - typeof globalThis.WebSocket === "undefined" - ) { - // eslint-disable-next-line - this.webSocketFactoryImplementation = require("ws"); // polyfill for WebSocket in Node.js - } else { - this.webSocketFactoryImplementation = WebSocket; - } - } - } + private readonly webSocketFactoryImplementation: typeof globalThis.WebSocket = WebSocket + ) {} public get isWebSocketConnected(): boolean { return ( @@ -77,6 +65,11 @@ export class WebSocketManager { this.reconnectTimeoutId = undefined; } + if (this.connectionTimeoutId !== undefined) { + clearTimeout(this.connectionTimeoutId); + this.connectionTimeoutId = undefined; + } + this.webSocket?.close(1000, "WebSocketManager has been stopped"); // eslint-disable-next-line @typescript-eslint/init-declarations @@ -85,10 +78,10 @@ export class WebSocketManager { timeoutId = setTimeout(() => { reject( new Error( - `Timeout waiting for WebSocket to close after ${WEBSOCKET_DISCONNECT_TIMEOUT_IN_S} seconds` + `Timeout waiting for WebSocket to close after ${WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS} seconds` ) ); - }, WEBSOCKET_DISCONNECT_TIMEOUT_IN_S * 1000); + }, WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS * 1000); }); try { @@ -171,7 +164,10 @@ export class WebSocketManager { this.webSocket.onclose = null; this.webSocket.onmessage = null; this.webSocket.onerror = null; - this.webSocket.close(); + this.webSocket.close( + 1000, + "Closing previous WebSocket connection" + ); } catch (e) { this.logger.error( `Failed to close previous WebSocket connection: ${e}` @@ -187,7 +183,22 @@ export class WebSocketManager { this.webSocket = new this.webSocketFactoryImplementation(wsUri); + // Set connection timeout to handle cases where server is down and the WebSocket connection won't open + this.connectionTimeoutId = setTimeout(() => { + this.connectionTimeoutId = undefined; + this.logger.warn( + `WebSocket connection timeout after ${WEBSOCKET_CONNECTION_TIMEOUT_IN_SECONDS} seconds` + ); + // Force close to trigger onclose handler which will schedule reconnection + this.webSocket?.close(1000, "Connection timeout"); + }, WEBSOCKET_CONNECTION_TIMEOUT_IN_SECONDS * 1000); + this.webSocket.onopen = (): void => { + if (this.connectionTimeoutId !== undefined) { + clearTimeout(this.connectionTimeoutId); + this.connectionTimeoutId = undefined; + } + // Check if we've been stopped while connecting if (this.isStopped) { this.webSocket?.close( @@ -231,7 +242,18 @@ export class WebSocketManager { } }; + this.webSocket.onerror = (error): void => { + this.logger.warn( + `WebSocket error occurred: ${error instanceof ErrorEvent ? error.message : "Unknown error"}` + ); + }; + this.webSocket.onclose = (event): void => { + if (this.connectionTimeoutId !== undefined) { + clearTimeout(this.connectionTimeoutId); + this.connectionTimeoutId = undefined; + } + this.logger.warn( `WebSocket closed with code ${event.code} (${event.reason == "" ? "unknown reason" : event.reason})` ); @@ -241,10 +263,13 @@ export class WebSocketManager { this.resolveDisconnectingPromise?.(); this.resolveDisconnectingPromise = null; } else { + const delay = + this.settings.getSettings().webSocketRetryIntervalMs; + this.logger.info(`Reconnecting to WebSocket in ${delay}ms...`); this.reconnectTimeoutId = setTimeout(() => { this.reconnectTimeoutId = undefined; this.initializeWebSocket(); - }, this.settings.getSettings().webSocketRetryIntervalMs); + }, delay); } }; } diff --git a/frontend/sync-client/src/sync-client.ts b/frontend/sync-client/src/sync-client.ts index 2a272c86..db6ff902 100644 --- a/frontend/sync-client/src/sync-client.ts +++ b/frontend/sync-client/src/sync-client.ts @@ -29,7 +29,6 @@ import { ServerConfig } from "./services/server-config"; import type { EventListeners } from "./utils/data-structures/event-listeners"; export class SyncClient { - private hasStartedOfflineSync = false; private hasFinishedOfflineSync = false; private hasStarted = false; private hasBeenDestroyed = false; @@ -38,12 +37,12 @@ export class SyncClient { private readonly eventUnsubscribers: (() => void)[] = []; private constructor( + public readonly logger: Logger, private readonly history: SyncHistory, private readonly settings: Settings, private readonly database: Database, private readonly syncer: Syncer, private readonly webSocketManager: WebSocketManager, - public readonly logger: Logger, private readonly fetchController: FetchController, private readonly cursorTracker: CursorTracker, private readonly fileChangeNotifier: FileChangeNotifier, @@ -56,7 +55,7 @@ export class SyncClient { database: Partial; }> > - ) {} + ) { } public get documentCount(): number { return this.database.length; @@ -195,7 +194,6 @@ export class SyncClient { ); const webSocketManager = new WebSocketManager( - deviceId, logger, settings, webSocket @@ -206,7 +204,6 @@ export class SyncClient { logger, database, settings, - syncService, webSocketManager, fileOperations, unrestrictedSyncer @@ -214,18 +211,19 @@ export class SyncClient { const fileChangeNotifier = new FileChangeNotifier(); const cursorTracker = new CursorTracker( + logger, database, webSocketManager, fileOperations, fileChangeNotifier ); const client = new SyncClient( + logger, history, settings, database, syncer, webSocketManager, - logger, fetchController, cursorTracker, fileChangeNotifier, @@ -285,10 +283,10 @@ export class SyncClient { } /** - * Reload settings from disk overriding current in-memory settings. - * Missing values will be filled in from DEFAULT_SETTINGS rather than - * retaining current in-memory settings. - */ + * Reload settings from disk overriding current in-memory settings. + * Missing values will be filled in from DEFAULT_SETTINGS rather than + * retaining current in-memory settings. + */ public async reloadSettings(): Promise { this.checkIfDestroyed("reloadSettings"); @@ -320,10 +318,10 @@ export class SyncClient { } /** - * Wait for the in-flight operations to finish, reset all tracking, - * and the local database but retain the settings. - * The SyncClient can be used again after calling this method. - */ + * Wait for the in-flight operations to finish, reset all tracking, + * and the local database but retain the settings. + * The SyncClient can be used again after calling this method. + */ public async reset(): Promise { this.checkIfDestroyed("reset"); @@ -337,11 +335,12 @@ export class SyncClient { this.database.reset(); await this.database.save(); // ensure the new database reads as empty this.resetInMemoryState(); - this.hasStartedOfflineSync = false; this.hasFinishedOfflineSync = false; this.serverConfig.reset(); - await this.startSyncing(); + if (this.settings.getSettings().isSyncEnabled) { + await this.startSyncing(); + } } public getSettings(): SyncSettings { @@ -410,12 +409,7 @@ export class SyncClient { return DocumentSyncStatus.SYNCING; } - const document = - this.database.getLatestDocumentByRelativePath(relativePath); - if (document === undefined) { - return DocumentSyncStatus.SYNCING; - } - return document.updates.length > 0 + return this.syncer.hasPendingOperationsForDocument(relativePath) ? DocumentSyncStatus.SYNCING : DocumentSyncStatus.UP_TO_DATE; } @@ -436,9 +430,9 @@ export class SyncClient { } /** - * Completely destroy the SyncClient, cancelling all in-progress operations. - * After calling this method, the SyncClient cannot be used again. - */ + * Completely destroy the SyncClient, cancelling all in-progress operations. + * After calling this method, the SyncClient cannot be used again. + */ public async destroy(): Promise { this.checkIfDestroyed("destroy"); @@ -473,18 +467,17 @@ export class SyncClient { this.checkIfDestroyed("startSyncing"); this.fetchController.finishReset(); - await this.serverConfig.initialize(); - this.webSocketManager.start(); + // warm the cache + await this.serverConfig.getConfig(); - if (!this.hasStartedOfflineSync) { - this.hasStartedOfflineSync = true; - await this.syncer.scheduleSyncForOfflineChanges(); - } + await this.syncer.scheduleSyncForOfflineChanges(); + this.webSocketManager.start(); this.hasFinishedOfflineSync = true; } private async pause(): Promise { + this.hasFinishedOfflineSync = false; this.fetchController.startReset(); await this.webSocketManager.stop(); await this.waitUntilFinished(); diff --git a/frontend/sync-client/src/sync-operations/cursor-tracker.ts b/frontend/sync-client/src/sync-operations/cursor-tracker.ts index bdd7d9b7..589e4b3b 100644 --- a/frontend/sync-client/src/sync-operations/cursor-tracker.ts +++ b/frontend/sync-client/src/sync-operations/cursor-tracker.ts @@ -10,6 +10,7 @@ import { hash } from "../utils/hash"; import type { FileChangeNotifier } from "./file-change-notifier"; import { Lock } from "../utils/data-structures/locks"; import { EventListeners } from "../utils/data-structures/event-listeners"; +import { Logger } from "../tracing/logger"; // Cursor positions are updated separately from documents. However, a given cursor position is only // valid within a certain version of the document it belongs to. This class tracks previous and the latest @@ -22,7 +23,7 @@ export class CursorTracker { (cursors: MaybeOutdatedClientCursors[]) => unknown >(); - private readonly updateLock = new Lock(); + private readonly updateLock: Lock; private knownRemoteCursors: (ClientCursors & { upToDateness: DocumentUpToDateness; @@ -33,11 +34,14 @@ export class CursorTracker { []; public constructor( + private readonly logger: Logger, private readonly database: Database, private readonly webSocketManager: WebSocketManager, private readonly fileOperations: FileOperations, private readonly fileChangeNotifier: FileChangeNotifier ) { + this.updateLock = new Lock(CursorTracker.name, logger); + this.webSocketManager.onRemoteCursorsUpdateReceived.add( async (clientCursors) => { await this.updateLock.withLock(async () => { @@ -113,7 +117,7 @@ export class CursorTracker { documentsWithCursors.push({ relative_path: relativePath, - document_id: record.documentId, + document_id: record.metadata.documentId, vault_update_id: record.metadata.parentVersionId, cursors: cursors.map(({ start, end }) => ({ start: Math.min(start, end), diff --git a/frontend/sync-client/src/sync-operations/syncer.ts b/frontend/sync-client/src/sync-operations/syncer.ts index 71dedd85..05e3bdf0 100644 --- a/frontend/sync-client/src/sync-operations/syncer.ts +++ b/frontend/sync-client/src/sync-operations/syncer.ts @@ -4,17 +4,14 @@ import type { DocumentRecord, RelativePath } from "../persistence/database"; -import type { SyncService } from "../services/sync-service"; import type { Logger } from "../tracing/logger"; import PQueue from "p-queue"; import { hash } from "../utils/hash"; -import { v4 as uuidv4 } from "uuid"; import type { Settings } from "../persistence/settings"; import type { FileOperations } from "../file-operations/file-operations"; import { findMatchingFile } from "../utils/find-matching-file"; import type { UnrestrictedSyncer } from "./unrestricted-syncer"; -import { createPromise } from "../utils/create-promise"; -import { SyncResetError } from "../services/sync-reset-error"; +import { SyncResetError } from "../errors/sync-reset-error"; import { Locks } from "../utils/data-structures/locks"; import type { DocumentVersionWithoutContent } from "../services/types/DocumentVersionWithoutContent"; import type { WebSocketVaultUpdate } from "../services/types/WebSocketVaultUpdate"; @@ -28,7 +25,7 @@ export class Syncer { (remainingOperations: number) => unknown >(); - private readonly remoteDocumentsLock: Locks; + public readonly updatedDocumentsByPathAndKeysLocks: Locks; // can be DocumentId or RelativePath // FIFO to limit the number of concurrent sync operations private readonly syncQueue: PQueue; @@ -42,16 +39,18 @@ export class Syncer { private readonly logger: Logger, private readonly database: Database, private readonly settings: Settings, - private readonly syncService: SyncService, private readonly webSocketManager: WebSocketManager, private readonly operations: FileOperations, - private readonly internalSyncer: UnrestrictedSyncer + private readonly unrestrictedSyncer: UnrestrictedSyncer ) { this.syncQueue = new PQueue({ concurrency: settings.getSettings().syncConcurrency }); - this.remoteDocumentsLock = new Locks(this.logger); + this.updatedDocumentsByPathAndKeysLocks = new Locks( + Syncer.name, + this.logger + ); settings.onSettingsChanged.add((newSettings, oldSettings) => { if (newSettings.syncConcurrency !== oldSettings.syncConcurrency) { @@ -83,52 +82,50 @@ export class Syncer { return this._isFirstSyncComplete; } + public hasPendingOperationsForDocument(relativePath: string): boolean { + return this.updatedDocumentsByPathAndKeysLocks.isLocked(relativePath); + } + public async syncLocallyCreatedFile( relativePath: RelativePath ): Promise { + // check whether someone else has already created the document in the database if ( this.database.getLatestDocumentByRelativePath(relativePath) ?.isDeleted === false ) { + // This is likely a consequence of us creating a file because of a remote update + // which triggered a local create, so we don't need to do anything here. this.logger.debug( `Document ${relativePath} already exists in the database, skipping` ); return; } - const [promise, resolve, reject] = createPromise(); + const document = this.database.createNewPendingDocument(relativePath); - const id = uuidv4(); - const document = this.database.createNewPendingDocument( - id, - relativePath, - promise + await this.enqueueSyncOperation( + async () => + this.unrestrictedSyncer.unrestrictedSyncLocallyCreatedOrUpdatedFile( + { + document + } + ), + [relativePath] ); - - try { - await this.syncQueue.add(async () => - this.internalSyncer.unrestrictedSyncLocallyCreatedFile(document) - ); - - resolve(); - } catch (e) { - reject(e); - } finally { - this.database.removeDocumentPromise(promise); - } } public async syncLocallyDeletedFile( relativePath: RelativePath ): Promise { - if ( - this.database.getLatestDocumentByRelativePath(relativePath) - ?.isDeleted === true - ) { + let document = + this.database.getLatestDocumentByRelativePath(relativePath); + + if (document == null || document.isDeleted === true) { // This is must be a consequence of us deleting a file because of a remote update // which triggered a local delete, so we don't need to do anything here. this.logger.debug( - `Document ${relativePath} has already been markes as deleted, skipping` + `Document ${relativePath} has already been marked as deleted, skipping` ); return; } @@ -137,26 +134,13 @@ export class Syncer { // document which finishes after the delete has succeeded and would introduce a phantom metadata record. this.database.delete(relativePath); - const [promise, resolve, reject] = createPromise(); - - const document = await this.database.getResolvedDocumentByRelativePath( - relativePath, - promise - ); - - try { - await this.syncQueue.add(async () => - this.internalSyncer.unrestrictedSyncLocallyDeletedFile(document) + await this.enqueueSyncOperation(async () => { + await this.unrestrictedSyncer.unrestrictedSyncLocallyDeletedFile( + document ); - resolve(); - this.database.removeDocument(document); - } catch (e) { - reject(e); - } finally { - this.database.removeDocumentPromise(promise); - } + }, [document?.metadata?.documentId, relativePath]); } public async syncLocallyUpdatedFile({ @@ -166,38 +150,10 @@ export class Syncer { oldPath?: RelativePath; relativePath: RelativePath; }): Promise { - if (oldPath !== undefined) { - // We might have moved the document in the database before calling this method, - // in that case, we mustn't move it again. - if ( - this.database.getLatestDocumentByRelativePath(relativePath) === - undefined || - this.database.getLatestDocumentByRelativePath(relativePath) - ?.isDeleted === true - ) { - if (oldPath === relativePath) { - throw new Error( - `Old path and new path are the same: ${oldPath}` - ); - } - - this.database.move(oldPath, relativePath); - } - } - - let document = - this.database.getLatestDocumentByRelativePath(relativePath); - - if ( - oldPath !== undefined && - document?.metadata?.remoteRelativePath === relativePath - ) { - this.logger.debug( - `Document ${relativePath} has been moved as a result of a remote update, skipping sync` - ); - return; - } + const document = + this.database.getLatestDocumentByRelativePath(oldPath ?? relativePath); + // must have been removed after a successful delete if (document === undefined) { this.logger.debug( `Cannot find document ${relativePath} in the database, skipping` @@ -212,27 +168,47 @@ export class Syncer { return; } - const [promise, resolve, reject] = createPromise(); + const documentAtNewPath = + this.database.getLatestDocumentByRelativePath(relativePath); - document = await this.database.getResolvedDocumentByRelativePath( - relativePath, - promise - ); + if (oldPath !== undefined) { + // We might have moved the document in the database before calling this method, + // in that case, we mustn't move it again. + if ( + documentAtNewPath === undefined || + documentAtNewPath.isDeleted + ) { + if (oldPath === relativePath) { + throw new Error( + `Old path and new path are the same: ${oldPath}` + ); + } - try { - await this.syncQueue.add(async () => - this.internalSyncer.unrestrictedSyncLocallyUpdatedFile({ - oldPath, - document - }) - ); - - resolve(); - } catch (e) { - reject(e); - } finally { - this.database.removeDocumentPromise(promise); + this.database.move(oldPath, relativePath); + } } + + + if ( + oldPath !== undefined && + document?.metadata?.remoteRelativePath === relativePath + ) { + this.logger.debug( + `Document ${relativePath} has been moved as a result of a remote update, skipping sync` + ); + return; + } + + await this.enqueueSyncOperation( + async () => + this.unrestrictedSyncer.unrestrictedSyncLocallyCreatedOrUpdatedFile( + { + oldPath, + document + } + ), + [document.metadata?.documentId, relativePath, oldPath] + ); } public async scheduleSyncForOfflineChanges(): Promise { @@ -257,8 +233,6 @@ export class Syncer { `Not all local changes have been applied remotely: ${e}` ); throw e; - } finally { - this.runningScheduleSyncForOfflineChanges = undefined; } } @@ -271,6 +245,8 @@ export class Syncer { message: WebSocketVaultUpdate ): Promise { try { + await this.scheduleSyncForOfflineChanges(); + const handlerPromise = awaitAll( message.documents.map(async (document) => this.internalSyncRemotelyUpdatedFile(document) @@ -296,7 +272,7 @@ export class Syncer { public reset(): void { this._isFirstSyncComplete = false; this.syncQueue.clear(); - this.remoteDocumentsLock.reset(); + this.updatedDocumentsByPathAndKeysLocks.reset(); this.runningScheduleSyncForOfflineChanges = undefined; } @@ -313,86 +289,26 @@ export class Syncer { private async internalSyncRemotelyUpdatedFile( remoteVersion: DocumentVersionWithoutContent ): Promise { - let document = this.database.getDocumentByDocumentId( + const document = this.database.getDocumentByDocumentId( remoteVersion.documentId ); - - if (document === undefined) { - // Let's avoid the same documents getting created in parallel multiple times. - // There might be multiple tasks waiting for the lock - return this.remoteDocumentsLock.withLock( - remoteVersion.documentId, - async () => { - document = this.database.getDocumentByDocumentId( - remoteVersion.documentId - ); - - // We're either the first one to get the lock, so we have to create the document in `unrestrictedSyncRemotelyUpdatedFile` - if (document === undefined) { - await this.syncQueue.add(async () => - this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile( - remoteVersion - ) - ); - } else { - const [promise, resolve, reject] = createPromise(); - - document = - await this.database.getResolvedDocumentByRelativePath( - document.relativePath, - promise - ); - - try { - await this.syncQueue.add(async () => - this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile( - remoteVersion, - document - ) - ); - - resolve(); - } catch (e) { - reject(e); - } finally { - this.database.removeDocumentPromise(promise); - } - } - - this.database.addSeenUpdateId(remoteVersion.vaultUpdateId); - } - ); - } - - // We're either the first one to get the lock, so we have to create the document in `unrestrictedSyncRemotelyUpdatedFile` - const [promise, resolve, reject] = createPromise(); - - document = await this.database.getResolvedDocumentByRelativePath( - document.relativePath, - promise - ); - - try { - await this.syncQueue.add(async () => - this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile( + await this.enqueueSyncOperation( + async () => + this.unrestrictedSyncer.unrestrictedSyncRemotelyUpdatedFile( remoteVersion, document - ) - ); - - resolve(); - } catch (e) { - reject(e); - } finally { - this.database.removeDocumentPromise(promise); - } + ), + [ + document?.relativePath, + remoteVersion.relativePath, + remoteVersion.documentId + ] + ); this.database.addSeenUpdateId(remoteVersion.vaultUpdateId); } private async internalScheduleSyncForOfflineChanges(): Promise { - await this.createFakeDocumentsFromRemoteState(); - const allLocalFiles = await this.operations.listFilesRecursively(); this.logger.info( `Scheduling sync for ${allLocalFiles.length} local files` @@ -409,7 +325,12 @@ export class Syncer { } } - await awaitAll( + interface Instruction { + type: "update" | "create"; + relativePath: string; + oldPath?: string; + } + const instructions: (Instruction | undefined)[] = await awaitAll( allLocalFiles.map(async (relativePath) => { if ( this.database.getLatestDocumentByRelativePath(relativePath) @@ -419,16 +340,24 @@ export class Syncer { `Document ${relativePath} might have been updated locally, scheduling sync to validate and update it` ); - return this.syncLocallyUpdatedFile({ - relativePath - }); + return { type: "update", relativePath } as Instruction; } // Perhaps the file has been moved; let's check by looking at the deleted files const contentHash = await this.syncQueue.add(async () => { - const contentBytes = - await this.operations.read(relativePath); // this can throw FileNotFoundError - return hash(contentBytes); + try { + const contentBytes = + await this.operations.read(relativePath); // this can throw FileNotFoundError + return hash(contentBytes); + } catch (e) { + if ( + e instanceof Error && + e.name === "FileNotFoundError" + ) { + return undefined; + } + throw e; + } }); if (contentHash == undefined) { @@ -454,18 +383,21 @@ export class Syncer { `Document '${originalFile.relativePath}' was not found under its current path in the database but was found under a different path (${relativePath}), scheduling sync to move it` ); - // We're outside of the pqueue, so we need to call the public wrapper - return this.syncLocallyUpdatedFile({ + return { + type: "update", oldPath: originalFile.relativePath, relativePath - }); + } as Instruction; } this.logger.debug( `Document ${relativePath} not found in database, scheduling sync to create it` ); - // We're outside of the pqueue, so we need to call the public wrapper - return this.syncLocallyCreatedFile(relativePath); + + return { + type: "create", + relativePath + } as Instruction; }) ); @@ -481,42 +413,49 @@ export class Syncer { return this.syncLocallyDeletedFile(relativePath); }) ); + + await awaitAll( + instructions.map(async (instruction) => { + if (instruction === undefined) { + return; + } + + if (instruction.type === "update") { + // We're outside of the pqueue, so we need to call the public wrapper + await this.syncLocallyUpdatedFile({ + oldPath: instruction.oldPath, + relativePath: instruction.relativePath + }); + return; + } + }) + ); + + // we have to ensure the deletes & updates have finished before starting creates, + // otherwise the server might return an existing document (that we're about to delete) + // instead of actually creating a new one + await awaitAll( + instructions.map(async (instruction) => { + if (instruction === undefined) { + return; + } + + if (instruction.type === "create") { + // We're outside of the pqueue, so we need to call the public wrapper + await this.syncLocallyCreatedFile(instruction.relativePath); + return; + } + }) + ); } - /** - * Create fake documents in the database for all files that are present locally - * and also exist remotely. This will stop the subequent syncs from duplicating - * the documents by creating the same documents from multiple clients. - */ - private async createFakeDocumentsFromRemoteState(): Promise { - if (this.database.getHasInitialSyncCompleted()) { - return; - } - - const [allLocalFiles, remote] = await awaitAll([ - this.operations.listFilesRecursively(), - this.syncQueue.add(async () => this.syncService.getAll()) - ]); - - if (remote !== undefined) { - remote.latestDocuments - .filter( - (remoteDocument) => - allLocalFiles.includes(remoteDocument.relativePath) && - !remoteDocument.isDeleted && - this.database.getDocumentByDocumentId( - remoteDocument.documentId - ) === undefined - ) - .forEach((remoteDocument) => { - this.database.createNewEmptyDocument( - remoteDocument.documentId, - remoteDocument.vaultUpdateId, - remoteDocument.relativePath - ); - }); - } - - this.database.setHasInitialSyncCompleted(true); + private async enqueueSyncOperation( + operation: () => Promise, + keys: (string | undefined | null)[] + ): Promise { + return this.updatedDocumentsByPathAndKeysLocks.withLock( + keys.filter((k) => k !== undefined && k !== null), + async () => this.syncQueue.add(operation) + ); } } diff --git a/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts b/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts index e3964d30..d32e983e 100644 --- a/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts +++ b/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts @@ -3,7 +3,6 @@ import type { DocumentRecord, RelativePath } from "../persistence/database"; - import { diff } from "reconcile-text"; import type { SyncService } from "../services/sync-service"; import type { Logger } from "../tracing/logger"; @@ -18,13 +17,11 @@ import type { } from "../tracing/sync-history"; import { SyncStatus, SyncType } from "../tracing/sync-history"; import { EMPTY_HASH, hash } from "../utils/hash"; - import { base64ToBytes } from "byte-base64"; import type { Settings } from "../persistence/settings"; import type { FileOperations } from "../file-operations/file-operations"; -import { createPromise } from "../utils/create-promise"; -import { FileNotFoundError } from "../file-operations/file-not-found-error"; -import { SyncResetError } from "../services/sync-reset-error"; +import { FileNotFoundError } from "../errors/file-not-found-error"; +import { SyncResetError } from "../errors/sync-reset-error"; import { globsToRegexes } from "../utils/globs-to-regexes"; import type { DocumentVersion } from "../services/types/DocumentVersion"; import type { DocumentUpdateResponse } from "../services/types/DocumentUpdateResponse"; @@ -60,65 +57,172 @@ export class UnrestrictedSyncer { }); } - public async unrestrictedSyncLocallyCreatedFile( - document: DocumentRecord - ): Promise { - const updateDetails: SyncCreateDetails = { - type: SyncType.CREATE, - relativePath: document.relativePath - }; + public async unrestrictedSyncLocallyCreatedOrUpdatedFile({ + oldPath, + // We use the same code path for both local and remote updates. We need to force the update + // if there are no local changes but we know that the remote version is newer. + force = false, + document + }: { + oldPath?: RelativePath; + force?: boolean; + document: DocumentRecord; + }): Promise { + const updateDetails: + | SyncCreateDetails + | SyncUpdateDetails + | SyncMovedDetails = + document.metadata === undefined + ? { + type: SyncType.CREATE, + relativePath: document.relativePath + } + : oldPath !== undefined + ? { + type: SyncType.MOVE, + relativePath: document.relativePath, + movedFrom: oldPath + } + : { + type: SyncType.UPDATE, + relativePath: document.relativePath + }; - return this.executeSync(updateDetails, async () => { + await this.executeSync(updateDetails, async () => { const originalRelativePath = document.relativePath; + if (document.isDeleted) { this.logger.debug( - `Document ${originalRelativePath} has been already deleted, no need to create it` + `Document ${document.relativePath} has been already deleted, no need to update it` ); return; } - const contentBytes = - await this.operations.read(originalRelativePath); // this can throw FileNotFoundError + const contentBytes = await this.operations.read( + document.relativePath + ); // this can throw FileNotFoundError const contentHash = hash(contentBytes); - const response = await this.syncService.create({ - documentId: document.documentId, - relativePath: originalRelativePath, - contentBytes - }); + let response: DocumentVersion | DocumentUpdateResponse | undefined = + undefined; + if (document.metadata === undefined) { + response = await this.syncService.create({ + relativePath: originalRelativePath, + contentBytes + }); - // In case a document with the same name (but different ID) had existed remotely that we haven't known about - if (response.relativePath != originalRelativePath) { - this.logger.debug( - `Document ${originalRelativePath} has been created remotely at a different path: ${response.relativePath}, moving it locally` - ); - await this.operations.move( - document.relativePath, - response.relativePath - ); // this can throw FileNotFoundError + await this.handleMaybeMergingResponse({ + document, + response, + contentHash, + originalRelativePath, + originalContentBytes: contentBytes, + isCreate: true + }); + } else { + const areThereLocalChanges = + document.metadata.hash !== contentHash || + oldPath !== undefined; + + if (areThereLocalChanges) { + const isText = + !isBinary(contentBytes) && + isFileTypeMergable( + document.relativePath, + (await this.serverConfig.getConfig()) + .mergeableFileExtensions + ); + const cachedVersion = this.contentCache.get( + document.metadata.parentVersionId + ); + + response = + isText && cachedVersion !== undefined + ? await this.syncService.putText({ + documentId: document.metadata.documentId, + parentVersionId: + document.metadata.parentVersionId, + relativePath: document.relativePath, + content: diff( + new TextDecoder().decode(cachedVersion), + new TextDecoder().decode(contentBytes) + ) + }) + : await this.syncService.putBinary({ + documentId: document.metadata.documentId, + parentVersionId: + document.metadata.parentVersionId, + relativePath: document.relativePath, + contentBytes + }); + } else { + if (!force) { + this.logger.debug( + `File hash of ${document.relativePath} matches with last synced version and the path hasn't changed; no need to sync` + ); + return; + } + + // we use this code path (force == true) to sync remotely updated files which have no local changes + response = await this.syncService.get({ + documentId: document.metadata.documentId + }); + } + + await this.handleMaybeMergingResponse({ + document, + response, + contentHash, + originalRelativePath, + originalContentBytes: contentBytes + }); } - this.database.updateDocumentMetadata( - { - parentVersionId: response.vaultUpdateId, - hash: contentHash, - remoteRelativePath: response.relativePath - }, - document - ); + if (!("type" in response) || response.type === "MergingUpdate") { + if (!force) { + this.history.addHistoryEntry({ + status: SyncStatus.SUCCESS, + details: updateDetails, + message: `The file we updated had been updated remotely, so we downloaded the merged version` + }); + return; + } + } - this.database.addSeenUpdateId(response.vaultUpdateId); - await this.updateCache( - response.vaultUpdateId, - contentBytes, - response.relativePath - ); + const actualUpdateDetails: SyncUpdateDetails | SyncMovedDetails = + oldPath !== undefined || + response.relativePath != originalRelativePath + ? { + type: SyncType.MOVE, + relativePath: response.relativePath, + movedFrom: originalRelativePath + } + : { + type: SyncType.UPDATE, + relativePath: response.relativePath + }; - this.history.addHistoryEntry({ - status: SyncStatus.SUCCESS, - details: updateDetails, - message: `Successfully uploaded locally created file` - }); + if (!response.isDeleted) { + this.history.addHistoryEntry({ + status: SyncStatus.SUCCESS, + details: actualUpdateDetails, + message: `Successfully downloaded remotely updated file from the server`, + author: response.userId, + timestamp: new Date(response.updatedDate) + }); + } else { + this.history.addHistoryEntry({ + status: SyncStatus.SUCCESS, + details: { + type: SyncType.DELETE, + relativePath: document.relativePath + }, + message: + "Successfully deleted file which had been deleted remotely", + author: response.userId, + timestamp: new Date(response.updatedDate) + }); + } }); } @@ -131,13 +235,21 @@ export class UnrestrictedSyncer { }; await this.executeSync(updateDetails, async () => { + if (document.metadata === undefined) { + this.logger.debug( + `Document ${document.relativePath} has never been synced, no need to delete it remotely` + ); + return; + } + const response = await this.syncService.delete({ - documentId: document.documentId, + documentId: document.metadata.documentId, relativePath: document.relativePath }); this.database.updateDocumentMetadata( { + documentId: response.documentId, parentVersionId: response.vaultUpdateId, hash: EMPTY_HASH, remoteRelativePath: document.relativePath @@ -156,214 +268,6 @@ export class UnrestrictedSyncer { }); } - public async unrestrictedSyncLocallyUpdatedFile({ - oldPath, - document, - // We use the same code path for both local and remote updates. We need to force the update - // if there are no local changes but we know that the remote version is newer. - force = false - }: { - oldPath?: RelativePath; - force?: boolean; - document: DocumentRecord; - }): Promise { - const updateDetails: SyncUpdateDetails | SyncMovedDetails = - oldPath !== undefined - ? { - type: SyncType.MOVE, - relativePath: document.relativePath, - movedFrom: oldPath - } - : { - type: SyncType.UPDATE, - relativePath: document.relativePath - }; - - await this.executeSync(updateDetails, async () => { - const originalRelativePath = document.relativePath; - - if (document.isDeleted || document.metadata === undefined) { - this.logger.debug( - `Document ${document.relativePath} has been already deleted, no need to update it` - ); - return; - } - - const contentBytes = await this.operations.read( - document.relativePath - ); // this can throw FileNotFoundError - let contentHash = hash(contentBytes); - - const areThereLocalChanges = !( - document.metadata.hash === contentHash && oldPath === undefined - ); - - let response: DocumentVersion | DocumentUpdateResponse | undefined = - undefined; - - if (areThereLocalChanges) { - const isText = - !isBinary(contentBytes) && - isFileTypeMergable( - document.relativePath, - (await this.serverConfig.getConfig()) - .mergeableFileExtensions - ); - const cachedVersion = this.contentCache.get( - document.metadata.parentVersionId - ); - - response = - isText && cachedVersion !== undefined - ? await this.syncService.putText({ - documentId: document.documentId, - parentVersionId: - document.metadata.parentVersionId, - relativePath: document.relativePath, - content: diff( - new TextDecoder().decode(cachedVersion), - new TextDecoder().decode(contentBytes) - ) - }) - : await this.syncService.putBinary({ - documentId: document.documentId, - parentVersionId: - document.metadata.parentVersionId, - relativePath: document.relativePath, - contentBytes - }); - } else { - if (!force) { - this.logger.debug( - `File hash of ${document.relativePath} matches with last synced version and the path hasn't changed; no need to sync` - ); - return; - } - - response = await this.syncService.get({ - documentId: document.documentId - }); - } - - // `document` is mutable and reflects the latest state in the local database - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - if (document.isDeleted) { - this.logger.info( - `Document ${document.relativePath} has been deleted before we could finish updating it` - ); - this.database.addSeenUpdateId(response.vaultUpdateId); - return; - } - - if ( - // `Syncer` creates fake local document metadata for all remote docs with invalid hashes. The parent IDs will likely match - // the latest versions so we still need to update the local versions to turn the fakes into real metadata. - document.metadata.parentVersionId > response.vaultUpdateId - ) { - this.logger.debug( - `Document ${document.relativePath} is already more up to date than the fetched version` - ); - this.database.addSeenUpdateId(response.vaultUpdateId); // in case the previous `vaultUpdateId` update hasn't made it through - return; - } - - if (response.isDeleted) { - return this.applyRemoteDeleteLocally(document, response); - } - - let actualPath = document.relativePath; - - if (response.relativePath != originalRelativePath) { - actualPath = response.relativePath; - // Make sure to update the remote relative path to avoid uploading - // the file as a result of this filesystem event. - document.metadata.remoteRelativePath = response.relativePath; - await this.operations.move( - document.relativePath, - response.relativePath - ); // this can throw FileNotFoundError - } - - if (!("type" in response) || response.type === "MergingUpdate") { - const responseBytes = base64ToBytes(response.contentBase64); - contentHash = hash(responseBytes); - - this.database.updateDocumentMetadata( - { - parentVersionId: response.vaultUpdateId, - hash: contentHash, - remoteRelativePath: response.relativePath - }, - document - ); - await this.operations.write( - actualPath, - contentBytes, - responseBytes - ); - await this.updateCache( - response.vaultUpdateId, - responseBytes, - actualPath - ); - - if (!force) { - this.history.addHistoryEntry({ - status: SyncStatus.SUCCESS, - details: updateDetails, - message: `The file we updated had been updated remotely, so we downloaded the merged version` - }); - } - } else { - this.database.updateDocumentMetadata( - { - parentVersionId: response.vaultUpdateId, - hash: contentHash, - remoteRelativePath: response.relativePath - }, - document - ); - await this.updateCache( - response.vaultUpdateId, - contentBytes, - actualPath - ); - } - - this.database.addSeenUpdateId(response.vaultUpdateId); - - const actualUpdateDetails: SyncUpdateDetails | SyncMovedDetails = - oldPath !== undefined || - response.relativePath != originalRelativePath - ? { - type: SyncType.MOVE, - relativePath: response.relativePath, - movedFrom: originalRelativePath - } - : { - type: SyncType.UPDATE, - relativePath: response.relativePath - }; - - if (areThereLocalChanges) { - this.history.addHistoryEntry({ - status: SyncStatus.SUCCESS, - details: actualUpdateDetails, - message: `Successfully uploaded locally updated file to the server`, - author: response.userId - }); - } else { - this.history.addHistoryEntry({ - status: SyncStatus.SUCCESS, - details: actualUpdateDetails, - message: `Successfully downloaded remotely updated file from the server`, - author: response.userId, - timestamp: new Date(response.updatedDate) - }); - } - }); - } - public async unrestrictedSyncRemotelyUpdatedFile( remoteVersion: DocumentVersionWithoutContent, document?: DocumentRecord @@ -382,13 +286,13 @@ export class UnrestrictedSyncer { remoteVersion.vaultUpdateId ) { this.logger.debug( - `Document ${remoteVersion.relativePath} is already at least as up to date as the fetched version` + `Document ${document.relativePath} is already at least as up-to-date as the fetched version` ); return; } - return this.unrestrictedSyncLocallyUpdatedFile({ + return this.unrestrictedSyncLocallyCreatedOrUpdatedFile({ document, force: true }); @@ -434,17 +338,15 @@ export class UnrestrictedSyncer { await this.operations.ensureClearPath(remoteVersion.relativePath); - const [promise, resolve] = createPromise(); this.database.updateDocumentMetadata( { + documentId: remoteVersion.documentId, parentVersionId: remoteVersion.vaultUpdateId, hash: hash(contentBytes), remoteRelativePath: remoteVersion.relativePath }, this.database.createNewPendingDocument( - remoteVersion.documentId, - remoteVersion.relativePath, - promise + remoteVersion.relativePath ) ); @@ -458,9 +360,6 @@ export class UnrestrictedSyncer { remoteVersion.relativePath ); - resolve(); - this.database.removeDocumentPromise(promise); - this.history.addHistoryEntry({ status: SyncStatus.SUCCESS, details: updateDetails, @@ -471,10 +370,17 @@ export class UnrestrictedSyncer { }); } - public async executeSync( + private async executeSync( details: SyncDetails, fn: () => Promise ): Promise { + if (!this.settings.getSettings().isSyncEnabled) { + this.logger.info( + `Skipping sync operation for file '${details.relativePath}' because sync is disabled` + ); + return; + } + for (const pattern of this.ignorePatterns) { if (pattern.test(details.relativePath)) { this.logger.debug( @@ -528,6 +434,127 @@ export class UnrestrictedSyncer { } } + private async handleMaybeMergingResponse({ + document, + response, + contentHash, + originalRelativePath, + originalContentBytes, + isCreate + }: { + document: DocumentRecord; + response: DocumentVersion | DocumentUpdateResponse; + contentHash: string; + originalRelativePath: string; + originalContentBytes: Uint8Array; + isCreate?: boolean; + }): Promise { + // `document` is mutable and reflects the latest state in the local database + if (document.isDeleted) { + this.logger.info( + `Document ${document.relativePath} has been deleted before we could finish updating it` + ); + this.database.addSeenUpdateId(response.vaultUpdateId); + return; + } + + if ( + (document.metadata?.parentVersionId ?? 0) > response.vaultUpdateId + ) { + this.logger.debug( + `Document ${document.relativePath} is already more up to date than the fetched version` + ); + this.database.addSeenUpdateId(response.vaultUpdateId); // in case the previous `vaultUpdateId` update hasn't made it through + return; + } + + if (response.isDeleted) { + return this.applyRemoteDeleteLocally(document, response); + } + + let actualPath = document.relativePath; + + if (isCreate) { + // We have a file locally that got moved by another client to the same path as the one we're trying to create. + // The server returns a merging update for the document ID that already exists locally (but at another path). + // We have to merge these two documents by extending the provenance of the existing document and deleting + // the old document that the new document already contains the content for. + const existingDocument = this.database.getDocumentByDocumentId( + response.documentId + ); + if (existingDocument !== undefined) { + this.logger.info( + `Merging existing document ${existingDocument.relativePath} into ${document.relativePath + } after concurrent move & creation` + ); + if (!existingDocument.isDeleted) { + this.database.delete(existingDocument.relativePath); // make sure syncLocallyDeletedFile doesn't actually schedule deleting the new file + this.database.removeDocument(existingDocument); + await this.operations.move(existingDocument.relativePath, document.relativePath); + } else { + this.database.removeDocument(existingDocument); + } + } + } + + // this can't happen on the creation path as we can only get a merging response if a document already exists remotely on the same path + if (response.relativePath != originalRelativePath) { + actualPath = response.relativePath; + // Make sure to update the remote relative path to avoid uploading + // the file as a result of this filesystem event. + if (document.metadata !== undefined) { + document.metadata.remoteRelativePath = response.relativePath; + } + await this.operations.move( + document.relativePath, + response.relativePath + ); // this can throw FileNotFoundError + } + + if (!("type" in response) || response.type === "MergingUpdate") { + const responseBytes = base64ToBytes(response.contentBase64); + contentHash = hash(responseBytes); + + this.database.updateDocumentMetadata( + { + documentId: response.documentId, + parentVersionId: response.vaultUpdateId, + hash: contentHash, + remoteRelativePath: response.relativePath + }, + document + ); + + await this.operations.write( + actualPath, + originalContentBytes, + responseBytes + ); + await this.updateCache( + response.vaultUpdateId, + responseBytes, + actualPath + ); + } else { + this.database.updateDocumentMetadata( + { + documentId: response.documentId, + parentVersionId: response.vaultUpdateId, + hash: contentHash, + remoteRelativePath: response.relativePath + }, + document + ); + await this.updateCache( + response.vaultUpdateId, + originalContentBytes, + actualPath + ); + } + + this.database.addSeenUpdateId(response.vaultUpdateId); + } + private getHistoryEntryForSkippedOversizedFile( sizeInBytes: number, relativePath: RelativePath @@ -541,9 +568,8 @@ export class UnrestrictedSyncer { type: SyncType.SKIPPED, relativePath }, - message: `File size of ${sizeInMB} MB exceeds the maximum file size limit of ${ - maxFileSizeMB - } MB` + message: `File size of ${sizeInMB} MB exceeds the maximum file size limit of ${maxFileSizeMB + } MB` }; } } @@ -568,20 +594,10 @@ export class UnrestrictedSyncer { document: DocumentRecord, response: DocumentVersion | DocumentUpdateResponse ): Promise { - this.history.addHistoryEntry({ - status: SyncStatus.SUCCESS, - details: { - type: SyncType.DELETE, - relativePath: document.relativePath - }, - message: "File has been deleted remotely, so we deleted it locally", - author: response.userId, - timestamp: new Date(response.updatedDate) - }); - this.database.delete(document.relativePath); this.database.updateDocumentMetadata( { + documentId: response.documentId, parentVersionId: response.vaultUpdateId, hash: EMPTY_HASH, remoteRelativePath: response.relativePath diff --git a/frontend/sync-client/src/utils/await-all.ts b/frontend/sync-client/src/utils/await-all.ts index 9406a6b8..43e06ce6 100644 --- a/frontend/sync-client/src/utils/await-all.ts +++ b/frontend/sync-client/src/utils/await-all.ts @@ -9,7 +9,7 @@ type ResolvedTuple = { export const awaitAll = async ( promises: PromiseTuple ): Promise> => { - // eslint-disable-next-line no-restricted-properties + // eslint-disable-next-line no-restricted-properties, @typescript-eslint/await-thenable const result = await Promise.allSettled(promises); for (const res of result) { if (res.status === "rejected") { diff --git a/frontend/sync-client/src/utils/create-client-id.ts b/frontend/sync-client/src/utils/create-client-id.ts index cfa132da..03dc2ae9 100644 --- a/frontend/sync-client/src/utils/create-client-id.ts +++ b/frontend/sync-client/src/utils/create-client-id.ts @@ -1,5 +1,3 @@ -import { v4 as uuidv4 } from "uuid"; - export function createClientId(): string { // @ts-expect-error, injected by webpack const packageVersion = __CURRENT_VERSION__; // eslint-disable-line @@ -8,8 +6,8 @@ export function createClientId(): string { typeof navigator !== "undefined" ? navigator.platform // eslint-disable-line @typescript-eslint/no-deprecated : typeof process !== "undefined" - ? process.platform - : "unknown"; + ? process.platform + : "unknown"; - return `vault-link/${packageVersion} (${uuidv4()}; ${platform})`; + return `vault-link/${packageVersion} (${Math.round(Math.random() * 1e10)}; ${platform})`; } diff --git a/frontend/sync-client/src/utils/data-structures/locks.test.ts b/frontend/sync-client/src/utils/data-structures/locks.test.ts index 9beb867a..1ea633cc 100644 --- a/frontend/sync-client/src/utils/data-structures/locks.test.ts +++ b/frontend/sync-client/src/utils/data-structures/locks.test.ts @@ -5,18 +5,20 @@ import type { RelativePath } from "../../persistence/database"; import { Locks } from "./locks"; import { awaitAll } from "../await-all"; import { sleep } from "../sleep"; -import { SyncResetError } from "../../services/sync-reset-error"; +import { SyncResetError } from "../../errors/sync-reset-error"; describe("withLock", () => { const testPath: RelativePath = "test/document/path"; const testPath2: RelativePath = "test/document/path2"; + const testPath3: RelativePath = "test/document/path3"; + const logger = new Logger(); // eslint-disable-next-line @typescript-eslint/init-declarations let locks: Locks; beforeEach(() => { - locks = new Locks(logger); + locks = new Locks("locks-test", logger); }); it("should execute function with single key lock", async () => { @@ -56,22 +58,32 @@ describe("withLock", () => { it("should sort multiple keys to prevent deadlocks", async () => { const executionOrder: string[] = []; - // Start two concurrent operations with keys in different orders - const promise1 = locks.withLock([testPath2, testPath], async () => { - executionOrder.push("operation1-start"); - await sleep(50); - executionOrder.push("operation1-end"); - return "result1"; - }); + await locks.waitForLock(testPath); - const promise2 = locks.withLock([testPath, testPath2], async () => { - executionOrder.push("operation2-start"); - await sleep(50); - executionOrder.push("operation2-end"); - return "result2"; - }); + const promise = awaitAll([ + locks.withLock([testPath2, testPath3, testPath], async () => { + executionOrder.push("operation1-start"); + executionOrder.push("operation1-end"); + return "result1"; + }), - const [result1, result2] = await awaitAll([promise1, promise2]); + locks.withLock([testPath3, testPath, testPath2], async () => { + executionOrder.push("operation2-start"); + executionOrder.push("operation2-end"); + return "result2"; + }) + ]); + + locks.unlock(testPath); + + const [result1, result2] = await Promise.race([ + promise, + new Promise((_, reject) => { + setTimeout(() => { + reject(new Error("Deadlock detected")); + }, 1000); + }) + ]); assert.strictEqual(result1, "result1"); assert.strictEqual(result2, "result2"); @@ -234,13 +246,14 @@ describe("withLock", () => { describe("reset", () => { const testPath: RelativePath = "test/document/path"; + const testPath2: RelativePath = "test/document/path2"; const logger = new Logger(); // eslint-disable-next-line @typescript-eslint/init-declarations let locks: Locks; beforeEach(() => { - locks = new Locks(logger); + locks = new Locks("locks-test", logger); }); it("should reject pending waiters with SyncResetError while running operation completes", async () => { @@ -252,7 +265,7 @@ describe("reset", () => { await sleep(1); const secondPromise = locks.withLock(testPath, async () => "second"); - void secondPromise.catch(() => {}); // eslint-disable-line @typescript-eslint/no-empty-function + void secondPromise.catch(() => { }); // eslint-disable-line @typescript-eslint/no-empty-function locks.reset(); @@ -273,7 +286,7 @@ describe("reset", () => { await sleep(1); const secondPromise = locks.withLock(testPath, async () => "second"); - void secondPromise.catch(() => {}); // eslint-disable-line @typescript-eslint/no-empty-function + void secondPromise.catch(() => { }); // eslint-disable-line @typescript-eslint/no-empty-function locks.reset(); @@ -289,4 +302,38 @@ describe("reset", () => { const result = await locks.withLock(testPath, () => "success"); assert.strictEqual(result, "success"); }); + + it("should release partially acquired locks when reset interrupts multi-key acquisition", async () => { + // Hold testPath2 so multi-key acquisition will block on it + await locks.waitForLock(testPath2); + + // Start multi-key lock that will acquire testPath first, then block on testPath2 + const multiKeyPromise = locks.withLock( + [testPath, testPath2], + async () => "multi" + ); + void multiKeyPromise.catch(() => { }); // eslint-disable-line @typescript-eslint/no-empty-function + + // Wait for the multi-key operation to acquire testPath and start waiting on testPath2 + await sleep(10); + + // Reset should reject the waiting operation + locks.reset(); + + await assert.rejects(multiKeyPromise, (err: Error) => { + assert.ok(err instanceof SyncResetError); + return true; + }); + + // The key that was already acquired (testPath) should now be released + // This would hang/timeout if the lock was leaked + const result = await Promise.race([ + locks.withLock(testPath, () => "success"), + sleep(100).then(() => { + throw new Error("Lock was not released - deadlock detected"); + }) + ]); + + assert.strictEqual(result, "success"); + }); }); diff --git a/frontend/sync-client/src/utils/data-structures/locks.ts b/frontend/sync-client/src/utils/data-structures/locks.ts index e55c76b0..4e512869 100644 --- a/frontend/sync-client/src/utils/data-structures/locks.ts +++ b/frontend/sync-client/src/utils/data-structures/locks.ts @@ -1,6 +1,5 @@ -import { SyncResetError } from "../../services/sync-reset-error"; +import { SyncResetError } from "../../errors/sync-reset-error"; import type { Logger } from "../../tracing/logger"; -import { awaitAll } from "../await-all"; /** * Manages exclusive locks on items to prevent concurrent modifications. @@ -8,47 +7,50 @@ import { awaitAll } from "../await-all"; * * @template T The type of the key used for locking */ +/** Waiter entry with callbacks */ +interface WaiterEntry { + resolve: () => unknown; + reject: (err: unknown) => unknown; +} + export class Locks { /** Currently locked keys */ private readonly locked = new Set(); - /** Queue of resolve functions waiting for each key */ - private readonly waiters = new Map< - T, - [() => unknown, (err: unknown) => unknown][] - >(); + /** Queue of waiters for each key */ + private readonly waiters = new Map[]>(); - public constructor(private readonly logger?: Logger) {} + public constructor(private readonly name: string, private readonly logger?: Logger) { } /** - * Executes a function while holding exclusive locks on one or more keys. - * - * This method ensures that the provided function runs with exclusive access to the - * specified key(s). Multiple keys are sorted to prevent deadlocks when different - * operations request the same keys in different orders. - * - * @template R The return type of the function to execute - * @param keyOrKeys A single key or array of keys to lock during function execution - * @param fn The function to execute while holding the lock(s). Can be sync or async. - * @returns A Promise that resolves to the return value of the executed function - * - * @example - * ```typescript - * // Lock a single key - * const result = await locks.withLock('file1', () => { - * // Critical section - only one operation can access 'file1' at a time - * return processFile('file1'); - * }); - * - * // Lock multiple keys (prevents deadlocks through consistent ordering) - * await locks.withLock(['file1', 'file2'], async () => { - * // Critical section - exclusive access to both files - * await moveFile('file1', 'file2'); - * }); - * ``` - * - * @throws Any error thrown by the provided function will be propagated after locks are released - */ + * Executes a function while holding exclusive locks on one or more keys. + * + * This method ensures that the provided function runs with exclusive access to the + * specified key(s). Multiple keys are sorted to prevent deadlocks when different + * operations request the same keys in different orders. + * + * @template R The return type of the function to execute + * @param keyOrKeys A single key or array of keys to lock during function execution + * @param fn The function to execute while holding the lock(s). Can be sync or async. + * @returns A Promise that resolves to the return value of the executed function + * + * @example + * ```typescript + * // Lock a single key + * const result = await locks.withLock('file1', () => { + * // Critical section - only one operation can access 'file1' at a time + * return processFile('file1'); + * }); + * + * // Lock multiple keys (prevents deadlocks through consistent ordering) + * await locks.withLock(['file1', 'file2'], async () => { + * // Critical section - exclusive access to both files + * await moveFile('file1', 'file2'); + * }); + * ``` + * + * @throws Any error thrown by the provided function will be propagated after locks are released + */ public async withLock( keyOrKeys: T | T[], fn: () => R | Promise @@ -59,12 +61,17 @@ export class Locks { const uniqueKeys = Array.from(new Set(keys)); uniqueKeys.sort((a, b) => String(a).localeCompare(String(b))); // Ensure consistent order to prevent deadlocks - await awaitAll(uniqueKeys.map(async (key) => this.waitForLock(key))); - + const lockedKeys = []; try { + for (const key of uniqueKeys) { + // Must acquire locks in-order (not concurrently) to prevent deadlocks + await this.waitForLock(key); + lockedKeys.push(key); + } + return await fn(); } finally { - uniqueKeys.forEach((key) => { + lockedKeys.forEach((key) => { this.unlock(key); }); } @@ -74,7 +81,7 @@ export class Locks { // Resolve all waiting promises before clearing to prevent deadlock // Any operation waiting for a lock will be granted access immediately for (const waiting of this.waiters.values()) { - for (const [_, reject] of waiting) { + for (const { reject } of waiting) { reject(new SyncResetError()); } } @@ -82,13 +89,17 @@ export class Locks { this.waiters.clear(); } + public isLocked(key: T): boolean { + return this.locked.has(key); + } + /** - * Attempts to acquire a lock immediately without waiting. - * Must call `unlock()` if successful. - * - * @param key The key to lock - * @returns `true` if lock acquired, `false` if already locked - */ + * Attempts to acquire a lock immediately without waiting. + * Must call `unlock()` if successful. + * + * @param key The key to lock + * @returns `true` if lock acquired, `false` if already locked + */ public tryLock(key: T): boolean { if (this.locked.has(key)) { return false; @@ -100,18 +111,18 @@ export class Locks { } /** - * Waits to acquire a lock, blocking until available. - * Operations are queued in FIFO order. Must call `unlock()` when done. - * - * @param key The key to wait for and lock - * @returns Promise that resolves when lock is acquired - */ + * Waits to acquire a lock, blocking until available. + * Operations are queued in FIFO order. Must call `unlock()` when done. + * + * @param key The key to wait for and lock + * @returns Promise that resolves when lock is acquired + */ public async waitForLock(key: T): Promise { if (this.tryLock(key)) { return Promise.resolve(); } - this.logger?.debug(`Waiting for lock on ${key}`); + this.logger?.debug(`Waiting for lock '${this.name}' on '${key}'`); return new Promise((resolve, reject) => { // DefaultDict behavior @@ -121,28 +132,36 @@ export class Locks { this.waiters.set(key, waiting); } - waiting.push([resolve, reject]); + waiting.push({ + resolve, + reject, + }); }); } /** - * Releases a lock and grants access to the next waiting operation in FIFO order. - * Removes the key from locked set if no waiters. - * - * @param key The key to unlock - * @throws {Error} If key is not currently locked - */ + * Releases a lock and grants access to the next waiting operation in FIFO order. + * Removes the key from locked set if no waiters. + * + * @param key The key to unlock + * @throws {Error} If key is not currently locked + */ public unlock(key: T): void { if (!this.locked.has(key)) { + this.logger?.debug( + `Attempted to unlock '${this.name}' on '${key}' which is not locked` + ); return; } - // Remove first waiter to ensure FIFO order - const [resolveNextWaiting, _] = this.waiters.get(key)?.shift() ?? []; + this.logger?.debug(`Releasing lock '${this.name}' on '${key}'`); - if (resolveNextWaiting) { - this.logger?.debug(`Granted lock on ${key}`); - resolveNextWaiting(); + // Remove first waiter to ensure FIFO order + const nextWaiter = this.waiters.get(key)?.shift(); + + if (nextWaiter) { + this.logger?.debug(`Granted lock '${this.name}' on '${key}'`); + nextWaiter.resolve(); } else { this.locked.delete(key); } @@ -152,8 +171,8 @@ export class Locks { export class Lock { private readonly locks: Locks; - public constructor(logger?: Logger) { - this.locks = new Locks(logger); + public constructor(name: string, logger?: Logger) { + this.locks = new Locks(name, logger); } public async withLock(fn: () => R | Promise): Promise { diff --git a/frontend/sync-client/src/utils/debugging/in-memory-file-system.ts b/frontend/sync-client/src/utils/debugging/in-memory-file-system.ts new file mode 100644 index 00000000..a2564b3e --- /dev/null +++ b/frontend/sync-client/src/utils/debugging/in-memory-file-system.ts @@ -0,0 +1,69 @@ +import type { RelativePath } from "../../persistence/database"; +import type { TextWithCursors } from "reconcile-text"; +import type { FileSystemOperations } from "../../file-operations/filesystem-operations"; + +export class InMemoryFileSystem implements FileSystemOperations { + protected readonly files = new Map(); + + public async listFilesRecursively( + _root: RelativePath | undefined = undefined // we don't use multi-level paths during tests + ): Promise { + return Array.from(this.files.keys()); + } + + public async read(path: RelativePath): Promise { + const file = this.files.get(path); + if (!file) { + throw new Error(`File ${path} does not exist`); + } + return file; + } + + public async write(path: RelativePath, content: Uint8Array): Promise { + this.files.set(path, content); + } + + public async atomicUpdateText( + path: RelativePath, + updater: (current: TextWithCursors) => TextWithCursors + ): Promise { + const file = this.files.get(path); + if (!file) { + throw new Error(`File ${path} does not exist`); + } + const currentContent = new TextDecoder().decode(file); + const newContent = updater({ text: currentContent, cursors: [] }).text; + this.files.set(path, new TextEncoder().encode(newContent)); + return newContent; + } + + public async getFileSize(path: RelativePath): Promise { + return (await this.read(path)).length; + } + + public async exists(path: RelativePath): Promise { + return this.files.has(path); + } + + public async createDirectory(_path: RelativePath): Promise { + // This doesn't mean anything in our virtual FS representation + } + + public async delete(path: RelativePath): Promise { + this.files.delete(path); + } + + public async rename( + oldPath: RelativePath, + newPath: RelativePath + ): Promise { + const file = this.files.get(oldPath); + if (!file) { + throw new Error(`File ${oldPath} does not exist`); + } + this.files.set(newPath, file); + if (oldPath !== newPath) { + this.files.delete(oldPath); + } + } +} diff --git a/frontend/sync-client/src/utils/debugging/log-to-console.ts b/frontend/sync-client/src/utils/debugging/log-to-console.ts index c47f18f6..c8940536 100644 --- a/frontend/sync-client/src/utils/debugging/log-to-console.ts +++ b/frontend/sync-client/src/utils/debugging/log-to-console.ts @@ -1,10 +1,44 @@ -import type { SyncClient } from "../../sync-client"; -import type { LogLine } from "../../tracing/logger"; +/* eslint-disable no-console */ +import type { Logger, LogLine } from "../../tracing/logger"; import { LogLevel } from "../../tracing/logger"; -export function logToConsole(client: SyncClient): void { - client.logger.onLogEmitted.add((logLine: LogLine) => { - const formatted = `${logLine.timestamp.toISOString()} ${logLine.level} ${logLine.message}`; +const COLORS = { + reset: "\x1b[0m", + red: "\x1b[31m", + yellow: "\x1b[33m", + blue: "\x1b[34m", + gray: "\x1b[90m" +}; + +export function logToConsole( + logger: Logger, + { useColors = true }: { useColors?: boolean } = {} +): void { + logger.onLogEmitted.add((logLine: LogLine) => { + const timestamp = logLine.timestamp.toISOString(); + const message = logLine.message; + + let color = ""; + let reset = ""; + if (useColors) { + reset = COLORS.reset; + switch (logLine.level) { + case LogLevel.ERROR: + color = COLORS.red; + break; + case LogLevel.WARNING: + color = COLORS.yellow; + break; + case LogLevel.INFO: + color = COLORS.blue; + break; + case LogLevel.DEBUG: + color = COLORS.gray; + break; + } + } + + const formatted = `${timestamp} ${color}${logLine.level}${reset} ${message}`; switch (logLine.level) { case LogLevel.ERROR: diff --git a/frontend/sync-client/src/utils/debugging/slow-web-socket-factory.ts b/frontend/sync-client/src/utils/debugging/slow-web-socket-factory.ts index c64bff18..b93460b5 100644 --- a/frontend/sync-client/src/utils/debugging/slow-web-socket-factory.ts +++ b/frontend/sync-client/src/utils/debugging/slow-web-socket-factory.ts @@ -11,7 +11,7 @@ export function slowWebSocketFactory( private static readonly RECEIVE_KEY = "websocket-receive"; private static readonly SEND_KEY = "websocket-send"; - private readonly locks = new Locks(logger); + private readonly locks = new Locks(FlakyWebSocket.name, logger); public set onopen(callback: ((event: Event) => void) | null) { super.onopen = async (event: Event): Promise => { diff --git a/frontend/sync-client/webpack.config.js b/frontend/sync-client/webpack.config.js index b7c3a3fd..413bfeba 100644 --- a/frontend/sync-client/webpack.config.js +++ b/frontend/sync-client/webpack.config.js @@ -49,11 +49,6 @@ module.exports = [ type: "umd" }, globalObject: "this" - }, - resolve: { - fallback: { - ws: false // Exclude `ws` from the browser bundle - } } }), merge(common, { @@ -62,10 +57,6 @@ module.exports = [ path: path.resolve(__dirname, "dist"), filename: "sync-client.node.js", libraryTarget: "commonjs2" - }, - externals: { - bufferutil: "bufferutil", - "utf-8-validate": "utf-8-validate" // required for ws: https://github.com/websockets/ws/issues/2245#issuecomment-2250318733 } }) ];