From 8ce33541a3ed0c274666dcbfeac374f365fb786f Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Sat, 25 Apr 2026 20:29:38 +0100 Subject: [PATCH] more ai changes --- frontend/deterministic-tests/README.md | 2 +- .../src/file-operations/file-operations.ts | 129 ++++++++++-------- .../sync-client/src/services/sync-service.ts | 2 +- .../src/services/websocket-manager.ts | 19 +-- frontend/sync-client/src/sync-client.ts | 15 +- .../src/sync-operations/cursor-tracker.ts | 104 +++++++------- .../sync-client/src/sync-operations/syncer.ts | 41 +++--- .../utils/data-structures/event-listeners.ts | 28 ++-- .../utils/data-structures/min-covered.test.ts | 22 ++- .../src/utils/data-structures/min-covered.ts | 8 +- frontend/sync-client/src/utils/rate-limit.ts | 23 ++-- 11 files changed, 208 insertions(+), 185 deletions(-) diff --git a/frontend/deterministic-tests/README.md b/frontend/deterministic-tests/README.md index fb60a919..c422406d 100644 --- a/frontend/deterministic-tests/README.md +++ b/frontend/deterministic-tests/README.md @@ -45,7 +45,7 @@ Clients always start with syncing disabled. cd sync-server && cargo build --release && cd - # Run all tests -cd frontend && npm run test -w deterministic-tests +cd frontend && npm run build -w sync-client && npm run test -w deterministic-tests # Filter by name npm run test -w deterministic-tests -- --filter=rename diff --git a/frontend/sync-client/src/file-operations/file-operations.ts b/frontend/sync-client/src/file-operations/file-operations.ts index 29e9f0b6..f136afc0 100644 --- a/frontend/sync-client/src/file-operations/file-operations.ts +++ b/frontend/sync-client/src/file-operations/file-operations.ts @@ -8,6 +8,7 @@ import { isFileTypeMergable } from "../utils/is-file-type-mergable"; import { isBinary } from "../utils/is-binary"; import { buildConflictFileName } from "../sync-operations/conflict-path"; import type { ServerConfig } from "../services/server-config"; +import { FileNotFoundError } from "../errors/file-not-found-error"; export enum MoveOnConflict { EXISTING = "EXISTING", @@ -95,67 +96,83 @@ export class FileOperations { return; } - if ( - !isFileTypeMergable( - path, - (await this.serverConfig.getConfig()).mergeableFileExtensions - ) || - isBinary(expectedContent) || - isBinary(newContent) - ) { - this.logger.debug( - `The expected content is not mergable, so we won't perform a 3-way merge, just overwrite it` - ); - await this.fs.write( - path, - // `newContent` might not be binary so we still have to ensure the line endings are correct - this.toNativeLineEndings(newContent) - ); - return; - } - - let expectedText = ""; - let newText = ""; + // The exists() check above is racy: between it returning true and + // any of the writes below running, the file can be deleted. The + // safe wrapper around `atomicUpdateText` raises FileNotFoundError + // in that window — treat it the same as the upfront-missing case + // (skip silently) so callers see one consistent outcome regardless + // of when the deletion happened to occur. try { - expectedText = new TextDecoder("utf-8", { fatal: true }).decode( - expectedContent - ); // this comes from a previous read which must only have \n line endings - newText = new TextDecoder("utf-8", { fatal: true }).decode( - newContent - ); // this comes from the server which stores text with \n line endings - } catch (decodeError) { - this.logger.warn( - `3-way merge aborted for ${path}: one of expected/new is not valid UTF-8 (${decodeError}); falling back to overwrite` - ); - await this.fs.write(path, this.toNativeLineEndings(newContent)); - return; - } - - await this.fs.atomicUpdateText( - path, - ({ text, cursors }: TextWithCursors): TextWithCursors => { + if ( + !isFileTypeMergable( + path, + (await this.serverConfig.getConfig()).mergeableFileExtensions + ) || + isBinary(expectedContent) || + isBinary(newContent) + ) { this.logger.debug( - `Performing a 3-way merge for ${path} with the expected content` + `The expected content is not mergable, so we won't perform a 3-way merge, just overwrite it` ); - - text = text.replaceAll(this.nativeLineEndings, "\n"); - const merged = reconcile( - expectedText, - { text, cursors }, - newText + await this.fs.write( + path, + // `newContent` might not be binary so we still have to ensure the line endings are correct + this.toNativeLineEndings(newContent) ); - - const resultText = merged.text.replaceAll( - "\n", - this.nativeLineEndings - ); - - return { - text: resultText, - cursors: merged.cursors - }; + return; } - ); + + let expectedText = ""; + let newText = ""; + try { + expectedText = new TextDecoder("utf-8", { fatal: true }).decode( + expectedContent + ); // this comes from a previous read which must only have \n line endings + newText = new TextDecoder("utf-8", { fatal: true }).decode( + newContent + ); // this comes from the server which stores text with \n line endings + } catch (decodeError) { + this.logger.warn( + `3-way merge aborted for ${path}: one of expected/new is not valid UTF-8 (${decodeError}); falling back to overwrite` + ); + await this.fs.write(path, this.toNativeLineEndings(newContent)); + return; + } + + await this.fs.atomicUpdateText( + path, + ({ text, cursors }: TextWithCursors): TextWithCursors => { + this.logger.debug( + `Performing a 3-way merge for ${path} with the expected content` + ); + + text = text.replaceAll(this.nativeLineEndings, "\n"); + const merged = reconcile( + expectedText, + { text, cursors }, + newText + ); + + const resultText = merged.text.replaceAll( + "\n", + this.nativeLineEndings + ); + + return { + text: resultText, + cursors: merged.cursors + }; + } + ); + } catch (e) { + if (e instanceof FileNotFoundError) { + this.logger.debug( + `File ${path} disappeared during write; not recreating` + ); + return; + } + throw e; + } } public async delete(path: RelativePath): Promise { diff --git a/frontend/sync-client/src/services/sync-service.ts b/frontend/sync-client/src/services/sync-service.ts index 65726d73..228cc2f2 100644 --- a/frontend/sync-client/src/services/sync-service.ts +++ b/frontend/sync-client/src/services/sync-service.ts @@ -342,7 +342,7 @@ export class SyncService { const url = new URL(this.getUrl("/documents")); if (since !== undefined) { - url.searchParams.append("since", since.toString()); + url.searchParams.append("since_update_id", since.toString()); } const response = await this.client(url.toString(), { headers: this.getDefaultHeaders() diff --git a/frontend/sync-client/src/services/websocket-manager.ts b/frontend/sync-client/src/services/websocket-manager.ts index 4d26d404..3b0d4d44 100644 --- a/frontend/sync-client/src/services/websocket-manager.ts +++ b/frontend/sync-client/src/services/websocket-manager.ts @@ -204,19 +204,22 @@ export class WebSocketManager { this.logger.info(`Connecting to WebSocket at ${wsUri.toString()}`); - this.webSocket = new this.webSocketFactoryImplementation(wsUri); + const ws = new this.webSocketFactoryImplementation(wsUri); + this.webSocket = ws; - // Set connection timeout to handle cases where server is down and the WebSocket connection won't open + // Set connection timeout to handle cases where server is down and the WebSocket connection won't open. + // The callback closes the *captured* `ws` rather than `this.webSocket` so a delayed timeout cannot + // accidentally close a freshly-constructed replacement socket. (Closing the already-closed `ws` is a no-op.) this.connectionTimeoutId = setTimeout(() => { this.connectionTimeoutId = undefined; this.logger.warn( `WebSocket connection timeout after ${WEBSOCKET_CONNECTION_TIMEOUT_IN_SECONDS} seconds` ); // Force close to trigger onclose handler which will schedule reconnection - this.webSocket?.close(1000, "Connection timeout"); + ws.close(1000, "Connection timeout"); }, WEBSOCKET_CONNECTION_TIMEOUT_IN_SECONDS * 1000); - this.webSocket.onopen = (): void => { + ws.onopen = (): void => { if (this.connectionTimeoutId !== undefined) { clearTimeout(this.connectionTimeoutId); this.connectionTimeoutId = undefined; @@ -224,7 +227,7 @@ export class WebSocketManager { // Check if we've been stopped while connecting if (this.isStopped) { - this.webSocket?.close( + ws.close( 1000, "WebSocketManager was stopped during connection" ); @@ -234,7 +237,7 @@ export class WebSocketManager { this.onWebSocketStatusChanged.trigger(true); }; - this.webSocket.onmessage = (event): void => { + ws.onmessage = (event): void => { try { // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion const message = JSON.parse( @@ -265,13 +268,13 @@ export class WebSocketManager { } }; - this.webSocket.onerror = (error): void => { + ws.onerror = (error): void => { this.logger.warn( `WebSocket error occurred: ${error instanceof ErrorEvent ? error.message : "Unknown error"}` ); }; - this.webSocket.onclose = (event): void => { + ws.onclose = (event): void => { if (this.connectionTimeoutId !== undefined) { clearTimeout(this.connectionTimeoutId); this.connectionTimeoutId = undefined; diff --git a/frontend/sync-client/src/sync-client.ts b/frontend/sync-client/src/sync-client.ts index 9171b998..99a02a87 100644 --- a/frontend/sync-client/src/sync-client.ts +++ b/frontend/sync-client/src/sync-client.ts @@ -399,7 +399,7 @@ export class SyncClient { return DocumentSyncStatus.SYNCING_IS_DISABLED; } - if (!this.syncer.isFirstSyncComplete || !this.hasFinishedOfflineSync) { + if (!this.syncer.isFirstSyncStarted || !this.hasFinishedOfflineSync) { return DocumentSyncStatus.SYNCING; } @@ -428,8 +428,11 @@ export class SyncClient { * After calling this method, the SyncClient cannot be used again. */ public async destroy(): Promise { - this.checkIfDestroyed("destroy"); - + if (this.hasBeenDestroyed) { + throw new Error( + "SyncClient has been destroyed and can no longer be used; called from destroy" + ); + } if (this.isDestroying) { this.logger.warn( "destroy() called while already destroying, ignoring" @@ -534,7 +537,11 @@ export class SyncClient { } private checkIfDestroyed(origin: string): void { - if (this.hasBeenDestroyed) { + // Reject new public-API entries the moment destroy() is called, + // not after `pause()` returns. Otherwise an external caller could + // pass the guard and start mutating state while destroy() is + // tearing down the websocket / clearing caches. + if (this.hasBeenDestroyed || this.isDestroying) { throw new Error( `SyncClient has been destroyed and can no longer be used; called from ${origin}` ); diff --git a/frontend/sync-client/src/sync-operations/cursor-tracker.ts b/frontend/sync-client/src/sync-operations/cursor-tracker.ts index a52fea99..928272b4 100644 --- a/frontend/sync-client/src/sync-operations/cursor-tracker.ts +++ b/frontend/sync-client/src/sync-operations/cursor-tracker.ts @@ -30,9 +30,13 @@ export class CursorTracker { upToDateness: DocumentUpToDateness; })[] = []; - private lastLocalCursorState: DocumentWithCursors[] = []; - private lastLocalCursorStateWithoutDirtyDocuments: DocumentWithCursors[] = - []; + // Cache the previously sent state as a JSON string rather than as the + // array. We mutate `documentsWithCursors` in-place after the cache check + // (setting `vaultUpdateId = null` for dirty docs); storing the array would + // alias and the next call's equality check would compare against + // post-mutation state. + private lastLocalCursorStateJson = "[]"; + private lastLocalCursorStateWithoutDirtyDocumentsJson = "[]"; public constructor( logger: Logger, @@ -99,65 +103,65 @@ export class CursorTracker { public async sendLocalCursorsToServer( documentToCursors: Record ): Promise { - const documentsWithCursors: DocumentWithCursors[] = []; + // Serialise concurrent senders so they don't interleave on the + // disk reads + state mutations and emit out-of-order cursor messages. + await this.updateLock.withLock(async () => { + const documentsWithCursors: DocumentWithCursors[] = []; - for (const [relativePath, cursors] of Object.entries( - documentToCursors - )) { - const record = this.queue.getSettledDocumentByPath(relativePath); + for (const [relativePath, cursors] of Object.entries( + documentToCursors + )) { + const record = this.queue.getSettledDocumentByPath(relativePath); - if (!record) { - continue; // Let's wait for the file to be created before sending cursors + if (!record) { + continue; // Let's wait for the file to be created before sending cursors + } + + documentsWithCursors.push({ + relativePath: relativePath, + documentId: record.documentId, + vaultUpdateId: record.parentVersionId, + cursors: cursors.map(({ start, end }) => ({ + start: Math.min(start, end), + end: Math.max(start, end) + })) // the client might send directional selections + }); } - documentsWithCursors.push({ - relativePath: relativePath, - documentId: record.documentId, - vaultUpdateId: record.parentVersionId, - cursors: cursors.map(({ start, end }) => ({ - start: Math.min(start, end), - end: Math.max(start, end) - })) // the client might send directional selections - }); - } - - if ( - JSON.stringify(this.lastLocalCursorState) === - JSON.stringify(documentsWithCursors) - ) { - // Caching step to avoid reading the edited files all the time - return; - } - this.lastLocalCursorState = documentsWithCursors; - - for (const doc of documentsWithCursors) { - const readContent = await this.fileOperations.read( - doc.relativePath - ); - const record = this.queue.getSettledDocumentByPath( - doc.relativePath - ); - if (record?.remoteHash !== (await hash(readContent))) { - doc.vaultUpdateId = null; + const beforeJson = JSON.stringify(documentsWithCursors); + if (this.lastLocalCursorStateJson === beforeJson) { + // Caching step to avoid reading the edited files all the time + return; } - } + this.lastLocalCursorStateJson = beforeJson; - if ( - JSON.stringify(this.lastLocalCursorStateWithoutDirtyDocuments) === - JSON.stringify(documentsWithCursors) - ) { - return; - } + for (const doc of documentsWithCursors) { + const readContent = await this.fileOperations.read( + doc.relativePath + ); + const record = this.queue.getSettledDocumentByPath( + doc.relativePath + ); + if (record?.remoteHash !== (await hash(readContent))) { + doc.vaultUpdateId = null; + } + } - this.lastLocalCursorStateWithoutDirtyDocuments = documentsWithCursors; + const afterJson = JSON.stringify(documentsWithCursors); + if (this.lastLocalCursorStateWithoutDirtyDocumentsJson === afterJson) { + return; + } - this.webSocketManager.updateLocalCursors({ documentsWithCursors }); + this.lastLocalCursorStateWithoutDirtyDocumentsJson = afterJson; + + this.webSocketManager.updateLocalCursors({ documentsWithCursors }); + }); } public reset(): void { this.knownRemoteCursors = []; - this.lastLocalCursorState = []; - this.lastLocalCursorStateWithoutDirtyDocuments = []; + this.lastLocalCursorStateJson = "[]"; + this.lastLocalCursorStateWithoutDirtyDocumentsJson = "[]"; this.updateLock.reset(); } diff --git a/frontend/sync-client/src/sync-operations/syncer.ts b/frontend/sync-client/src/sync-operations/syncer.ts index 871a61b6..1bc3bd48 100644 --- a/frontend/sync-client/src/sync-operations/syncer.ts +++ b/frontend/sync-client/src/sync-operations/syncer.ts @@ -75,7 +75,7 @@ export class Syncer { ); } - public get isFirstSyncComplete(): boolean { + public get isFirstSyncStarted(): boolean { return this._isFirstSyncStarted; } @@ -264,36 +264,34 @@ export class Syncer { break; } } catch (e) { + // The currently-processed event was already shifted off the queue + // by drain() before processEvent ran. If it's a LocalCreate, any + // queued Delete/Update events whose `documentId` is this Create's + // resolvers.promise would `await` it forever once we return — so + // settle the resolvers on every failure path before + // dispatching/re-throwing. clearPending()'s rejectAllPendingCreates + // walks the queue and so cannot reach this in-flight event. + // Re-rejecting an already-resolved promise is a no-op, so it's + // safe to call this unconditionally on the LocalCreate branch. + if (event.type === SyncEventType.LocalCreate) { + event.resolvers.promise.catch(() => { + /* suppressed */ + }); + event.resolvers.reject( + new Error(`Create was cancelled: ${e}`) + ); + } + if (e instanceof FileNotFoundError) { this.logger.info( `Skipping sync event '${event.type}' because the file no longer exists` ); - if (event.type === SyncEventType.LocalCreate) { - event.resolvers.promise.catch(() => { - /* suppressed */ - }); - event.resolvers.reject(new Error("Create was cancelled")); - } return; } if (e instanceof HttpClientError) { this.logger.error( `Server rejected ${event.type} request: ${e.message}` ); - // The event was already shifted off the queue before - // `processEvent` ran; if it was a Create, its resolver - // promise would otherwise hang forever, blocking any - // queued Delete / SyncLocal that `await`s it. - if (event.type === SyncEventType.LocalCreate) { - event.resolvers.promise.catch(() => { - /* suppressed */ - }); - event.resolvers.reject( - new Error( - `Create was cancelled — server rejected the request (${e.message})` - ) - ); - } return; } throw e; @@ -513,6 +511,7 @@ export class Syncer { if (createEvent === undefined) { // a http response will always be more up-to-date than any queued remote update + // move will always move to the relative path when MoveOnConflict.EXISTING is given await this.operations.move( path, response.relativePath, diff --git a/frontend/sync-client/src/utils/data-structures/event-listeners.ts b/frontend/sync-client/src/utils/data-structures/event-listeners.ts index 8b9a08e9..47c8b8ee 100644 --- a/frontend/sync-client/src/utils/data-structures/event-listeners.ts +++ b/frontend/sync-client/src/utils/data-structures/event-listeners.ts @@ -40,9 +40,12 @@ export class EventListeners any> { * @param args The arguments to pass to each listener */ public trigger(...args: Parameters): void { - this.listeners.forEach((listener) => { + const snapshot = this.listeners.slice(); + for (const listener of snapshot) { + // allow removing listeners during the trigger loop + if (!this.listeners.includes(listener)) continue; listener(...args); - }); + } } /** @@ -53,16 +56,17 @@ export class EventListeners any> { * @param args The arguments to pass to each listener */ public async triggerAsync(...args: Parameters): Promise { - await awaitAll( - this.listeners - .map((listener) => { - // eslint-disable-next-line @typescript-eslint/no-unsafe-return - return listener(...args); - }) - .filter((result): result is Promise => { - return result instanceof Promise; - }) - ); + const snapshot = this.listeners.slice(); + const promises: Promise[] = []; + for (const listener of snapshot) { + if (!this.listeners.includes(listener)) continue; + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + const result = listener(...args); + if (result instanceof Promise) { + promises.push(result); + } + } + await awaitAll(promises); } public clear(): void { diff --git a/frontend/sync-client/src/utils/data-structures/min-covered.test.ts b/frontend/sync-client/src/utils/data-structures/min-covered.test.ts index 8ebc94a4..752227c0 100644 --- a/frontend/sync-client/src/utils/data-structures/min-covered.test.ts +++ b/frontend/sync-client/src/utils/data-structures/min-covered.test.ts @@ -48,29 +48,25 @@ describe("MinCovered", () => { assert.strictEqual(covered.min, 6); }); - it("should auto-advance when setting min value", () => { + it("should auto-advance when adding the value that fills the next gap", () => { const covered = new MinCovered(5); covered.add(7); covered.add(8); covered.add(9); assert.strictEqual(covered.min, 5); - // Setting min to 6 should auto-advance through 7, 8, 9 - covered.min = 6; + // Adding 6 fills the gap and auto-advances through 7, 8, 9 + covered.add(6); assert.strictEqual(covered.min, 9); covered.add(10); assert.strictEqual(covered.min, 10); }); - it("should handle setting min value with no consecutive values", () => { + it("should rewind when reset is called explicitly", () => { const covered = new MinCovered(5); - covered.add(10); - covered.add(15); - assert.strictEqual(covered.min, 5); - // Setting min to 8 should not auto-advance (no consecutive values) - covered.min = 8; - assert.strictEqual(covered.min, 8); - // Add 9 to trigger auto-advance to 10 - covered.add(9); - assert.strictEqual(covered.min, 10); + covered.add(7); + covered.reset(3); + assert.strictEqual(covered.min, 3); + covered.add(4); + assert.strictEqual(covered.min, 4); }); }); diff --git a/frontend/sync-client/src/utils/data-structures/min-covered.ts b/frontend/sync-client/src/utils/data-structures/min-covered.ts index f92ef26c..82ba1077 100644 --- a/frontend/sync-client/src/utils/data-structures/min-covered.ts +++ b/frontend/sync-client/src/utils/data-structures/min-covered.ts @@ -16,18 +16,12 @@ export class MinCovered { private seenValues: number[] = []; - public constructor(private minValue: number) {} + public constructor(private minValue: number) { } public get min(): number { return this.minValue; } - public set min(value: number) { - this.minValue = Math.max(value, this.minValue); - this.seenValues = this.seenValues.filter((v) => v > this.minValue); - this.advanceMinWhilePossible(); - } - public add(value: number | undefined): void { if (value === undefined || value < this.minValue) { return; diff --git a/frontend/sync-client/src/utils/rate-limit.ts b/frontend/sync-client/src/utils/rate-limit.ts index 2721de16..99ad68e1 100644 --- a/frontend/sync-client/src/utils/rate-limit.ts +++ b/frontend/sync-client/src/utils/rate-limit.ts @@ -44,20 +44,19 @@ export function rateLimit< newArgs = undefined; } - const { promise, resolve } = Promise.withResolvers(); - running = promise; - sleep( + // `running` must signal both "minimum interval has elapsed" *and* + // "fn() has finished" — otherwise an `fn` that takes longer than + // the interval would let a queued waiter fire a concurrent `fn` + const interval = typeof minIntervalMs === "function" ? minIntervalMs() - : minIntervalMs - ) - .then(() => { - resolve(undefined); - }) - .catch(() => { - // sleep cannot fail - }); - return fn(...args); + : minIntervalMs; + const fnPromise = fn(...args); + running = Promise.all([ + fnPromise.catch(() => undefined), + sleep(interval) + ]); + return fnPromise; }; return decoratedFn;