diff --git a/frontend/deterministic-tests/README.md b/frontend/deterministic-tests/README.md index 0fe053f0..fb60a919 100644 --- a/frontend/deterministic-tests/README.md +++ b/frontend/deterministic-tests/README.md @@ -42,7 +42,7 @@ Clients always start with syncing disabled. ```sh # Build server first -cd sync-server && cargo build --release +cd sync-server && cargo build --release && cd - # Run all tests cd frontend && npm run test -w deterministic-tests diff --git a/frontend/sync-client/src/services/websocket-manager.test.ts b/frontend/sync-client/src/services/websocket-manager.test.ts index 3b61b5a1..7f72ed4a 100644 --- a/frontend/sync-client/src/services/websocket-manager.test.ts +++ b/frontend/sync-client/src/services/websocket-manager.test.ts @@ -246,6 +246,67 @@ describe("WebSocketManager", () => { await manager.stop(); }); + it("handles concurrent stop() calls without stranding either caller", async () => { + // Real WebSocket.close() doesn't fire onclose synchronously, and the + // socket stays reachable across the close handshake. Model that + // here so the manager's `while (isWebSocketConnected)` loop is + // actually awaiting when the second stop() races in. Static OPEN + // is required because the manager compares readyState against + // `factory.OPEN`. + class AsyncCloseWebSocket extends MockWebSocket { + public static readonly OPEN = WebSocket.OPEN; + + public override close(code?: number, reason?: string): void { + if ( + this.readyState === WebSocket.CLOSED || + (this as { _closing?: boolean })._closing === true + ) { + return; + } + (this as { _closing?: boolean })._closing = true; + setTimeout(() => { + this.readyState = WebSocket.CLOSED; + this.onclose?.( + new MockCloseEvent("close", { + code: code ?? 1000, + reason: reason ?? "" + }) + ); + }, 5); + } + } + + const manager = new WebSocketManager( + mockLogger, + mockSettings, + AsyncCloseWebSocket as unknown as typeof WebSocket + ); + + manager.start(); + await new Promise((resolve) => setTimeout(resolve, 50)); + + const start = Date.now(); + // Two concurrent stops mimic destroy() racing onSettingsChange. + await Promise.all([manager.stop(), manager.stop()]); + const elapsed = Date.now() - start; + + // Both should resolve via the normal close path; if the second call + // had clobbered the first's resolver, the first would have been + // stranded until the 10s disconnect timeout. + assert.ok( + elapsed < 1000, + `concurrent stop() took ${elapsed}ms — expected fast resolution` + ); + const errorCalls = ( + mockLogger.error as unknown as { calls: unknown[] } + ).calls; + assert.strictEqual( + errorCalls.length, + 0, + "no timeout-recovery error should be logged" + ); + }); + it("tracks message handling promises", async () => { const manager = new WebSocketManager( mockLogger, diff --git a/frontend/sync-client/src/services/websocket-manager.ts b/frontend/sync-client/src/services/websocket-manager.ts index 5279d0e6..4d26d404 100644 --- a/frontend/sync-client/src/services/websocket-manager.ts +++ b/frontend/sync-client/src/services/websocket-manager.ts @@ -28,6 +28,7 @@ export class WebSocketManager { private isStopped = true; private resolveDisconnectingPromise: null | (() => unknown) = null; + private stopPromise: Promise | null = null; private reconnectTimeoutId: ReturnType | undefined; private connectionTimeoutId: ReturnType | undefined; @@ -58,6 +59,17 @@ export class WebSocketManager { } public async stop(): Promise { + // Concurrent callers (e.g. destroy() and onSettingsChange) must share + // the same disconnect; otherwise the second call would overwrite + // resolveDisconnectingPromise and strand the first caller's await + // until the timeout rejects. + this.stopPromise ??= this.performStop().finally(() => { + this.stopPromise = null; + }); + await this.stopPromise; + } + + private async performStop(): Promise { const { promise, resolve } = Promise.withResolvers(); this.resolveDisconnectingPromise = (): void => { resolve(undefined); @@ -98,7 +110,7 @@ export class WebSocketManager { `Error while waiting for WebSocket to close: ${String(error)}` ); // Force cleanup even if close didn't work - this.resolveDisconnectingPromise(); + this.resolveDisconnectingPromise?.(); this.resolveDisconnectingPromise = null; } finally { // Clear timeout to prevent unhandled rejection diff --git a/frontend/sync-client/src/sync-client.ts b/frontend/sync-client/src/sync-client.ts index 9c919354..9171b998 100644 --- a/frontend/sync-client/src/sync-client.ts +++ b/frontend/sync-client/src/sync-client.ts @@ -26,6 +26,7 @@ import { setUpTelemetry } from "./utils/set-up-telemetry"; import { DIFF_CACHE_SIZE_MB } from "./consts"; import { ServerConfig } from "./services/server-config"; import type { EventListeners } from "./utils/data-structures/event-listeners"; +import { Lock } from "./utils/data-structures/locks"; export class SyncClient { private hasFinishedOfflineSync = false; @@ -34,6 +35,7 @@ export class SyncClient { private unloadTelemetry?: () => void; private isDestroying = false; private readonly eventUnsubscribers: (() => void)[] = []; + private readonly settingsChangeLock = new Lock("SyncClient.onSettingsChange"); private constructor( public readonly logger: Logger, @@ -490,32 +492,45 @@ export class SyncClient { ): Promise { this.checkIfDestroyed("onSettingsChange"); - if ( - newSettings.vaultName !== oldSettings.vaultName || - newSettings.remoteUri !== oldSettings.remoteUri - ) { - await this.reset(); - } - - if (newSettings.isSyncEnabled !== oldSettings.isSyncEnabled) { - if (newSettings.isSyncEnabled) { - await this.startSyncing(); - } else { - await this.pause(); + // Serialize listener invocations so back-to-back settings updates + // can't run reset()/pause()/startSyncing() concurrently. + await this.settingsChangeLock.withLock(async () => { + // The lock is FIFO, so by the time we run the client may have + // been destroyed in a queued invocation ahead of us. + if (this.hasBeenDestroyed) { + return; } - } - if (newSettings.diffCacheSizeMB !== oldSettings.diffCacheSizeMB) { - this.contentCache.resize(newSettings.diffCacheSizeMB * 1024 * 1024); - } + const connectionChanged = + newSettings.vaultName !== oldSettings.vaultName || + newSettings.remoteUri !== oldSettings.remoteUri; - if (newSettings.enableTelemetry !== oldSettings.enableTelemetry) { - if (newSettings.enableTelemetry) { - this.unloadTelemetry = setUpTelemetry(); - } else { - this.unloadTelemetry?.(); + if (connectionChanged) { + // reset() pauses, clears state, then starts iff isSyncEnabled + // — so any concurrent isSyncEnabled change is already applied. + await this.reset(); + } else if (newSettings.isSyncEnabled !== oldSettings.isSyncEnabled) { + if (newSettings.isSyncEnabled) { + await this.startSyncing(); + } else { + await this.pause(); + } } - } + + if (newSettings.diffCacheSizeMB !== oldSettings.diffCacheSizeMB) { + this.contentCache.resize( + newSettings.diffCacheSizeMB * 1024 * 1024 + ); + } + + if (newSettings.enableTelemetry !== oldSettings.enableTelemetry) { + if (newSettings.enableTelemetry) { + this.unloadTelemetry = setUpTelemetry(); + } else { + this.unloadTelemetry?.(); + } + } + }); } private checkIfDestroyed(origin: string): void { diff --git a/frontend/sync-client/src/sync-operations/offline-change-detector.ts b/frontend/sync-client/src/sync-operations/offline-change-detector.ts index 1c07ef42..a5e753a1 100644 --- a/frontend/sync-client/src/sync-operations/offline-change-detector.ts +++ b/frontend/sync-client/src/sync-operations/offline-change-detector.ts @@ -45,6 +45,7 @@ export async function scheduleOfflineChanges( } } + const renamedPaths = new Set(); for (const path of locallyPossibleCreatedFiles) { const content = await operations.read(path); const contentHash = await hash(content); @@ -62,11 +63,12 @@ export async function scheduleOfflineChanges( relativePath: path }); removeFromArray(locallyPossiblyDeletedFiles, matchingDeletedFile); - removeFromArray(locallyPossibleCreatedFiles, path); + renamedPaths.add(path); } } for (const path of locallyPossibleCreatedFiles) { + if (renamedPaths.has(path)) continue; logger.debug( `File ${path} was created while offline, scheduling sync to create it` ); diff --git a/frontend/sync-client/src/sync-operations/sync-event-queue.ts b/frontend/sync-client/src/sync-operations/sync-event-queue.ts index 66ddf7eb..7ad34fdc 100644 --- a/frontend/sync-client/src/sync-operations/sync-event-queue.ts +++ b/frontend/sync-client/src/sync-operations/sync-event-queue.ts @@ -141,13 +141,9 @@ export class SyncEventQueue { } if (input.type === SyncEventType.LocalDelete) { - const deleteId = pendingDocumentId ?? documentId; - if (deleteId === undefined) { - throw new Error("Unreachable: deleteId must be defined here"); - } this.events.push({ type: SyncEventType.LocalDelete, - documentId: deleteId + documentId: (pendingDocumentId ?? documentId)! }); return; } @@ -174,16 +170,11 @@ export class SyncEventQueue { } await this.save(); } - return; } - const updateId = pendingDocumentId ?? documentId; - if (updateId === undefined) { - throw new Error("Unreachable: updateId must be defined here"); - } this.events.push({ type: SyncEventType.LocalUpdate, - documentId: updateId, + documentId: (pendingDocumentId ?? documentId)!, path, originalPath: path }); diff --git a/frontend/sync-client/src/sync-operations/syncer.ts b/frontend/sync-client/src/sync-operations/syncer.ts index 298c35a4..871a61b6 100644 --- a/frontend/sync-client/src/sync-operations/syncer.ts +++ b/frontend/sync-client/src/sync-operations/syncer.ts @@ -162,11 +162,6 @@ export class Syncer { public reset(): void { this._isFirstSyncStarted = false; this.queue.clearPending(); - // Don't null the reference synchronously — if the scan is - // still in flight, the next reconnect would spawn a second - // concurrent scan racing on the same queue. Defer the - // clear until the in-flight task actually resolves, so a - // fresh scan can only start once the prior one is done. const current = this.runningScheduleSyncForOfflineChanges; if (current !== undefined) { void current.finally(() => { @@ -619,11 +614,17 @@ export class Syncer { record: DocumentRecord, remoteVersion: DocumentVersionWithoutContent ): Promise { - if (record.parentVersionId >= remoteVersion.vaultUpdateId) { - this.logger.debug(`Document ${path} is already up-to-date`); - return; - } - + // wait for a local edit to do the actual updating here, so we can't even update the lastSeenUpdateId here + const conflictingDoc = this.queue.getSettledDocumentByPath( + remoteVersion.relativePath + ); + const actualPath = await this.operations.move( + path, + remoteVersion.relativePath, + (conflictingDoc?.parentVersionId ?? 0) < remoteVersion.vaultUpdateId + ? MoveOnConflict.EXISTING + : MoveOnConflict.NEW + ); if ( !this.queue.hasPendingLocalEventsForDocumentId( remoteVersion.documentId @@ -645,39 +646,45 @@ export class Syncer { ); this.queue.lastSeenUpdateId = remoteVersion.vaultUpdateId; } // else we don't need to update the content, a subsequent local update will do that + else { + void this.syncRemotelyUpdatedFile({ + // schedule it so that the lastSeenUpdateId remains consistent + document: remoteVersion + }); - void this.syncRemotelyUpdatedFile({ - // schedule it so that the lastSeenUpdateId remains consistent - document: remoteVersion - }); - // wait for a local edit to do the actual updating here, so we can't even update the lastSeenUpdateId here - const conflictingDoc = this.queue.getSettledDocumentByPath( - remoteVersion.relativePath - ); - const actualRelativePath = await this.operations.move( - path, - remoteVersion.relativePath, - (conflictingDoc?.parentVersionId ?? 0) < remoteVersion.vaultUpdateId - ? MoveOnConflict.EXISTING - : MoveOnConflict.NEW - ); - await this.queue.setDocument(actualRelativePath, { - ...record, - remoteRelativePath: actualRelativePath - }); + await this.queue.setDocument(actualPath, { + ...record, + remoteRelativePath: actualPath + }); + } - this.history.addHistoryEntry({ - status: SyncStatus.SUCCESS, - details: { - type: SyncType.MOVE, - relativePath: actualRelativePath, - movedFrom: path - }, - // todo: eh - message: `File was renamed remotely from ${path} to ${actualRelativePath}` - }); + + if (actualPath !== path) { + this.history.addHistoryEntry({ + status: SyncStatus.SUCCESS, + details: { + type: SyncType.MOVE, + relativePath: actualPath, + movedFrom: path + }, + message: `File was renamed remotely from ${path} to ${actualPath}`, + author: remoteVersion.userId, + timestamp: new Date(remoteVersion.updatedDate) + }); + } else { + this.history.addHistoryEntry({ + status: SyncStatus.SUCCESS, + details: { + type: SyncType.UPDATE, + relativePath: actualPath + }, + message: "Successfully applied remote update", + author: remoteVersion.userId, + timestamp: new Date(remoteVersion.updatedDate) + }); + } } private async processRemoteCreateForNewDocument( diff --git a/frontend/sync-client/src/utils/hash.ts b/frontend/sync-client/src/utils/hash.ts index dbda085b..933929c5 100644 --- a/frontend/sync-client/src/utils/hash.ts +++ b/frontend/sync-client/src/utils/hash.ts @@ -1,8 +1,8 @@ export async function hash(content: Uint8Array): Promise { - // Copy into a fresh ArrayBuffer-backed Uint8Array so the buffer type - // matches `BufferSource`/`Uint8Array` expected by digest. - const owned = new Uint8Array(content); - const digest = await crypto.subtle.digest("SHA-256", owned); + const digest = await crypto.subtle.digest( + "SHA-256", + content as Uint8Array + ); const bytes = new Uint8Array(digest); return Array.from(bytes, (b) => b.toString(16).padStart(2, "0")).join(""); }