From 8aeb0d6027817207105aa557bec9031bca1ed86d Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Tue, 5 May 2026 21:50:24 +0100 Subject: [PATCH] codex --- .../src/deterministic-agent.ts | 161 +++++++++++- .../src/test-definition.ts | 14 ++ .../deterministic-tests/src/test-registry.ts | 23 +- .../deterministic-tests/src/test-runner.ts | 28 +++ ...-create-with-stale-deleting-record.test.ts | 52 ++++ ...delete-does-not-hijack-reused-path.test.ts | 56 +++++ ...e-quick-write-rename-before-record.test.ts | 36 +++ ...rwrites-pending-create-then-delete.test.ts | 51 ++++ ...ng-create-onto-pending-delete-path.test.ts | 59 +++++ ...ing-create-reused-path-then-delete.test.ts | 65 +++++ ...ote-quick-write-and-pending-rename.test.ts | 82 +++++++ frontend/sync-client/src/index.ts | 1 + frontend/sync-client/src/sync-client.ts | 2 + .../src/sync-operations/reconciler.test.ts | 69 ++++++ .../src/sync-operations/reconciler.ts | 15 ++ .../sync-operations/sync-event-queue.test.ts | 186 ++++++++++++++ .../src/sync-operations/sync-event-queue.ts | 138 ++++++++++- .../sync-client/src/sync-operations/syncer.ts | 232 +++++++++++++----- .../sync-client/src/sync-operations/types.ts | 1 + frontend/test-client/src/agent/mock-agent.ts | 15 +- 20 files changed, 1198 insertions(+), 88 deletions(-) create mode 100644 frontend/deterministic-tests/src/tests/delete-recreated-pending-create-with-stale-deleting-record.test.ts create mode 100644 frontend/deterministic-tests/src/tests/queued-create-delete-does-not-hijack-reused-path.test.ts create mode 100644 frontend/deterministic-tests/src/tests/remote-quick-write-rename-before-record.test.ts create mode 100644 frontend/deterministic-tests/src/tests/rename-overwrites-pending-create-then-delete.test.ts create mode 100644 frontend/deterministic-tests/src/tests/rename-pending-create-onto-pending-delete-path.test.ts create mode 100644 frontend/deterministic-tests/src/tests/renamed-pending-create-reused-path-then-delete.test.ts create mode 100644 frontend/deterministic-tests/src/tests/same-doc-id-collapse-after-remote-quick-write-and-pending-rename.test.ts create mode 100644 frontend/sync-client/src/sync-operations/reconciler.test.ts diff --git a/frontend/deterministic-tests/src/deterministic-agent.ts b/frontend/deterministic-tests/src/deterministic-agent.ts index a18c3652..74ec2b8d 100644 --- a/frontend/deterministic-tests/src/deterministic-agent.ts +++ b/frontend/deterministic-tests/src/deterministic-agent.ts @@ -1,10 +1,17 @@ import type { + HistoryEntry, StoredDatabase, SyncSettings, RelativePath, TextWithCursors } from "sync-client"; -import { SyncClient, debugging, LogLevel, utils } from "sync-client"; +import { + SyncClient, + SyncResetError, + debugging, + LogLevel, + utils +} from "sync-client"; import { assert } from "./utils/assert"; import { sleep } from "./utils/sleep"; import { withTimeout } from "./utils/with-timeout"; @@ -28,6 +35,18 @@ export class DeterministicAgent extends debugging.InMemoryFileSystem { private readonly syncErrors: Error[] = []; private readonly pendingSyncOperations = new Set>(); private readonly wsFactory = new ManagedWebSocketFactory(); + private nextWriteRename: + | { + oldPath: RelativePath; + newPath: RelativePath; + } + | undefined; + private nextCreateResponseDrop: + | { + dropped: Promise; + resolveDropped: () => void; + } + | undefined; public constructor( clientId: number, @@ -49,7 +68,7 @@ export class DeterministicAgent extends debugging.InMemoryFileSystem { load: async () => this.data, save: async (data) => void (this.data = data) }, - fetch: fetchImplementation, + fetch: this.wrapFetch(fetchImplementation), webSocket: this.wsFactory.constructorFn }); @@ -94,6 +113,65 @@ export class DeterministicAgent extends debugging.InMemoryFileSystem { this.wsFactory.resume(); } + public dropNextCreateResponse(): void { + assert( + this.nextCreateResponseDrop === undefined, + `Client ${this.clientId} already has a create response drop armed` + ); + let resolveDropped!: () => void; + const dropped = new Promise((resolve) => { + resolveDropped = resolve; + }); + this.nextCreateResponseDrop = { + dropped, + resolveDropped + }; + this.log("Armed next create response drop"); + } + + public async waitForDroppedCreateResponse(): Promise { + assert( + this.nextCreateResponseDrop !== undefined, + `Client ${this.clientId} has no create response drop armed` + ); + await withTimeout( + this.nextCreateResponseDrop.dropped, + WAIT_TIMEOUT_MS, + `Client ${this.clientId} timed out waiting for create response drop` + ); + this.log("Create response was dropped after server commit"); + } + + public async waitForHistoryEntry( + matches: (entry: HistoryEntry) => boolean, + onMatch?: (entry: HistoryEntry) => void + ): Promise { + const existing = this.client.getHistoryEntries().find(matches); + if (existing !== undefined) { + onMatch?.(existing); + return; + } + + await withTimeout( + new Promise((resolve) => { + const unsubscribe = this.client.onSyncHistoryUpdated.add(() => { + const entry = this.client + .getHistoryEntries() + .find(matches); + if (entry === undefined) { + return; + } + + unsubscribe(); + onMatch?.(entry); + resolve(); + }); + }), + WAIT_TIMEOUT_MS, + `Client ${this.clientId} timed out waiting for history entry` + ); + } + public async waitForSync(): Promise { this.log("Waiting for sync to complete..."); // Drain agent-level sync operations first. These are the fire-and-forget @@ -160,6 +238,15 @@ export class DeterministicAgent extends debugging.InMemoryFileSystem { return new TextDecoder().decode(bytes); } + public renameNextWrite(oldPath: RelativePath, newPath: RelativePath): void { + assert( + this.nextWriteRename === undefined, + `Client ${this.clientId} already has a next-write rename armed` + ); + this.nextWriteRename = { oldPath, newPath }; + this.log(`Armed next write rename: ${oldPath} -> ${newPath}`); + } + public async cleanup(): Promise { this.log("Cleaning up..."); // Guard against uninitialized client (init() failed partway). @@ -201,15 +288,37 @@ export class DeterministicAgent extends debugging.InMemoryFileSystem { const isNew = !this.files.has(path); await super.write(path, content); + if (this.isSyncEnabled && isNew) { + this.enqueueSync(async () => { + this.client.syncLocallyCreatedFile(path); + }); + } + + const nextWriteRename = this.nextWriteRename; + if ( + nextWriteRename !== undefined && + nextWriteRename.oldPath === path + ) { + this.nextWriteRename = undefined; + await super.rename( + nextWriteRename.oldPath, + nextWriteRename.newPath + ); + if (this.isSyncEnabled) { + this.enqueueSync(async () => { + this.client.syncLocallyUpdatedFile({ + oldPath: nextWriteRename.oldPath, + relativePath: nextWriteRename.newPath + }); + }); + } + } + if (!this.isSyncEnabled) { return; } - if (isNew) { - this.enqueueSync(async () => { - this.client.syncLocallyCreatedFile(path); - }); - } else { + if (!isNew) { this.enqueueSync(async () => { this.client.syncLocallyUpdatedFile({ relativePath: path }); }); @@ -314,4 +423,42 @@ export class DeterministicAgent extends debugging.InMemoryFileSystem { private log(message: string): void { this.logger(`[Client ${this.clientId}] ${message}`); } + + private wrapFetch( + fetchImplementation: typeof globalThis.fetch + ): typeof globalThis.fetch { + return async (input, init) => { + const response = await fetchImplementation(input, init); + const drop = this.nextCreateResponseDrop; + if ( + drop !== undefined && + DeterministicAgent.isCreateDocumentRequest(input, init) + ) { + this.nextCreateResponseDrop = undefined; + drop.resolveDropped(); + throw new SyncResetError(); + } + return response; + }; + } + + private static isCreateDocumentRequest( + input: RequestInfo | URL, + init: RequestInit | undefined + ): boolean { + const method = + init?.method ?? + (typeof Request !== "undefined" && input instanceof Request + ? input.method + : "GET"); + if (method.toUpperCase() !== "POST") { + return false; + } + + const url = + input instanceof URL + ? input + : new URL(typeof input === "string" ? input : input.url); + return /\/documents\/?$/.test(url.pathname); + } } diff --git a/frontend/deterministic-tests/src/test-definition.ts b/frontend/deterministic-tests/src/test-definition.ts index 81cdfd12..bd832a50 100644 --- a/frontend/deterministic-tests/src/test-definition.ts +++ b/frontend/deterministic-tests/src/test-definition.ts @@ -9,16 +9,30 @@ export type TestStep = | { type: "create"; client: number; path: string; content: string } | { type: "update"; client: number; path: string; content: string } | { type: "rename"; client: number; oldPath: string; newPath: string } + | { + type: "rename-next-write"; + client: number; + oldPath: string; + newPath: string; + } | { type: "delete"; client: number; path: string } | { type: "sync"; client?: number } | { type: "disable-sync"; client: number } | { type: "enable-sync"; client: number } | { type: "pause-server" } | { type: "resume-server" } + | { + type: "resume-server-until-history-then-pause"; + client: number; + syncType: "CREATE" | "UPDATE" | "DELETE"; + path: string; + } | { type: "barrier" } | { type: "assert-consistent"; verify?: (state: AssertableState) => void } | { type: "pause-websocket"; client: number } | { type: "resume-websocket"; client: number } + | { type: "drop-next-create-response"; client: number } + | { type: "wait-for-dropped-create-response"; client: number } | { type: "sleep"; ms: number } | { type: "reset"; client: number }; diff --git a/frontend/deterministic-tests/src/test-registry.ts b/frontend/deterministic-tests/src/test-registry.ts index 89ce6b45..6436292f 100644 --- a/frontend/deterministic-tests/src/test-registry.ts +++ b/frontend/deterministic-tests/src/test-registry.ts @@ -98,6 +98,13 @@ import { renameChainDuringPendingCreateTest } from "./tests/rename-chain-during- import { remoteRenameCollidesWithPendingLocalCreateTest } from "./tests/remote-rename-collides-with-pending-local-create.test"; import { remoteUpdateSurvivesUserRenameTest } from "./tests/remote-update-survives-user-rename.test"; import { sameDocIdCollapseOnLocalCreateAfterRemoteCreateTest } from "./tests/same-doc-id-collapse-on-local-create-after-remote-create.test"; +import { sameDocIdCollapseAfterRemoteQuickWriteAndPendingRenameTest } from "./tests/same-doc-id-collapse-after-remote-quick-write-and-pending-rename.test"; +import { renameOverwritesPendingCreateThenDeleteTest } from "./tests/rename-overwrites-pending-create-then-delete.test"; +import { deleteRecreatedPendingCreateWithStaleDeletingRecordTest } from "./tests/delete-recreated-pending-create-with-stale-deleting-record.test"; +import { queuedCreateDeleteDoesNotHijackReusedPathTest } from "./tests/queued-create-delete-does-not-hijack-reused-path.test"; +import { renamedPendingCreateReusedPathThenDeleteTest } from "./tests/renamed-pending-create-reused-path-then-delete.test"; +import { renamePendingCreateOntoPendingDeletePathTest } from "./tests/rename-pending-create-onto-pending-delete-path.test"; +import { remoteQuickWriteRenameBeforeRecordTest } from "./tests/remote-quick-write-rename-before-record.test"; export const TESTS: Partial> = { "rename-create-conflict": renameCreateConflictTest, @@ -221,5 +228,19 @@ export const TESTS: Partial> = { remoteRenameCollidesWithPendingLocalCreateTest, "remote-update-survives-user-rename": remoteUpdateSurvivesUserRenameTest, "same-doc-id-collapse-on-local-create-after-remote-create": - sameDocIdCollapseOnLocalCreateAfterRemoteCreateTest + sameDocIdCollapseOnLocalCreateAfterRemoteCreateTest, + "renamed-pending-create-reused-path-then-delete": + renamedPendingCreateReusedPathThenDeleteTest, + "rename-pending-create-onto-pending-delete-path": + renamePendingCreateOntoPendingDeletePathTest, + "rename-overwrites-pending-create-then-delete": + renameOverwritesPendingCreateThenDeleteTest, + "same-doc-id-collapse-after-remote-quick-write-and-pending-rename": + sameDocIdCollapseAfterRemoteQuickWriteAndPendingRenameTest, + "delete-recreated-pending-create-with-stale-deleting-record": + deleteRecreatedPendingCreateWithStaleDeletingRecordTest, + "queued-create-delete-does-not-hijack-reused-path": + queuedCreateDeleteDoesNotHijackReusedPathTest, + "remote-quick-write-rename-before-record": + remoteQuickWriteRenameBeforeRecordTest }; diff --git a/frontend/deterministic-tests/src/test-runner.ts b/frontend/deterministic-tests/src/test-runner.ts index 6e4aafbf..c8cbadd0 100644 --- a/frontend/deterministic-tests/src/test-runner.ts +++ b/frontend/deterministic-tests/src/test-runner.ts @@ -144,6 +144,13 @@ export class TestRunner { ); break; + case "rename-next-write": + this.getAgent(step.client).renameNextWrite( + step.oldPath, + step.newPath + ); + break; + case "delete": await this.getAgent(step.client).delete(step.path); break; @@ -177,6 +184,19 @@ export class TestRunner { await this.serverControl.waitForReady(); break; + case "resume-server-until-history-then-pause": { + const agent = this.getAgent(step.client); + const historySeen = agent.waitForHistoryEntry( + (entry) => + entry.details.type === step.syncType && + entry.details.relativePath === step.path, + () => this.serverControl.pause() + ); + this.serverControl.resume(); + await historySeen; + break; + } + case "barrier": await this.waitForConvergence(); break; @@ -193,6 +213,14 @@ export class TestRunner { this.getAgent(step.client).resumeWebSocket(); break; + case "drop-next-create-response": + this.getAgent(step.client).dropNextCreateResponse(); + break; + + case "wait-for-dropped-create-response": + await this.getAgent(step.client).waitForDroppedCreateResponse(); + break; + case "sleep": await sleep(step.ms); break; diff --git a/frontend/deterministic-tests/src/tests/delete-recreated-pending-create-with-stale-deleting-record.test.ts b/frontend/deterministic-tests/src/tests/delete-recreated-pending-create-with-stale-deleting-record.test.ts new file mode 100644 index 00000000..80e95f48 --- /dev/null +++ b/frontend/deterministic-tests/src/tests/delete-recreated-pending-create-with-stale-deleting-record.test.ts @@ -0,0 +1,52 @@ +import type { AssertableState } from "../utils/assertable-state"; +import type { TestDefinition } from "../test-definition"; + +export const deleteRecreatedPendingCreateWithStaleDeletingRecordTest: TestDefinition = + { + description: + "A local delete for a recreated pending create must target the " + + "new pending create, not an older same-path record whose server " + + "delete has been acked but whose WebSocket delete receipt is " + + "still paused.", + clients: 2, + steps: [ + { type: "enable-sync", client: 0 }, + { type: "enable-sync", client: 1 }, + { type: "barrier" }, + + { type: "pause-websocket", client: 0 }, + { type: "pause-server" }, + { + type: "create", + client: 0, + path: "binary-14.bin", + content: "BINARY:first" + }, + { type: "sleep", ms: 100 }, + { type: "delete", client: 0, path: "binary-14.bin" }, + { type: "resume-server" }, + { type: "sync", client: 0 }, + + { type: "pause-server" }, + { + type: "create", + client: 0, + path: "binary-14.bin", + content: "BINARY:second" + }, + { type: "sleep", ms: 100 }, + { type: "delete", client: 0, path: "binary-14.bin" }, + { type: "resume-server" }, + { type: "sync", client: 0 }, + + { type: "resume-websocket", client: 0 }, + { type: "barrier" }, + + { + type: "assert-consistent", + verify: (state: AssertableState): void => { + state.assertFileCount(0); + } + } + ] + }; diff --git a/frontend/deterministic-tests/src/tests/queued-create-delete-does-not-hijack-reused-path.test.ts b/frontend/deterministic-tests/src/tests/queued-create-delete-does-not-hijack-reused-path.test.ts new file mode 100644 index 00000000..a29f8314 --- /dev/null +++ b/frontend/deterministic-tests/src/tests/queued-create-delete-does-not-hijack-reused-path.test.ts @@ -0,0 +1,56 @@ +import type { AssertableState } from "../utils/assertable-state"; +import type { TestDefinition } from "../test-definition"; + +export const queuedCreateDeleteDoesNotHijackReusedPathTest: TestDefinition = { + description: + "A create/delete pair that is still queued behind another request " + + "must collapse locally. It must not later read a different file " + + "that reused the same path before the queued create drained.", + clients: 2, + steps: [ + { type: "enable-sync", client: 0 }, + { type: "enable-sync", client: 1 }, + { type: "barrier" }, + + { type: "pause-server" }, + { + type: "create", + client: 1, + path: "blocker.bin", + content: "BINARY:blocker" + }, + { type: "sleep", ms: 100 }, + { + type: "create", + client: 1, + path: "target.bin", + content: "BINARY:old" + }, + { type: "delete", client: 1, path: "target.bin" }, + { + type: "create", + client: 1, + path: "source.bin", + content: "BINARY:new" + }, + { + type: "rename", + client: 1, + oldPath: "source.bin", + newPath: "target.bin" + }, + { type: "resume-server" }, + { type: "barrier" }, + + { + type: "assert-consistent", + verify: (state: AssertableState): void => { + state + .assertFileCount(2) + .assertContent("blocker.bin", "BINARY:blocker") + .assertContent("target.bin", "BINARY:new") + .assertFileNotExists("source.bin"); + } + } + ] +}; diff --git a/frontend/deterministic-tests/src/tests/remote-quick-write-rename-before-record.test.ts b/frontend/deterministic-tests/src/tests/remote-quick-write-rename-before-record.test.ts new file mode 100644 index 00000000..ca184b27 --- /dev/null +++ b/frontend/deterministic-tests/src/tests/remote-quick-write-rename-before-record.test.ts @@ -0,0 +1,36 @@ +import type { AssertableState } from "../utils/assertable-state"; +import type { TestDefinition } from "../test-definition"; + +export const remoteQuickWriteRenameBeforeRecordTest: TestDefinition = { + description: + "Client 0 receives a remote create and the user renames the new " + + "file immediately after the syncer writes it. The watcher event " + + "must bind to the new document instead of being dropped before " + + "the remote-create handler persists the record.", + clients: 2, + steps: [ + { type: "enable-sync", client: 0 }, + { type: "enable-sync", client: 1 }, + + { + type: "rename-next-write", + client: 0, + oldPath: "doc.md", + newPath: "renamed.md" + }, + + { type: "create", client: 1, path: "doc.md", content: "v1\n" }, + { type: "sync", client: 1 }, + { type: "barrier" }, + + { + type: "assert-consistent", + verify: (s: AssertableState): void => { + s.assertFileCount(1); + s.assertFileExists("renamed.md"); + s.assertFileNotExists("doc.md"); + s.assertContent("renamed.md", "v1\n"); + } + } + ] +}; diff --git a/frontend/deterministic-tests/src/tests/rename-overwrites-pending-create-then-delete.test.ts b/frontend/deterministic-tests/src/tests/rename-overwrites-pending-create-then-delete.test.ts new file mode 100644 index 00000000..0b47c781 --- /dev/null +++ b/frontend/deterministic-tests/src/tests/rename-overwrites-pending-create-then-delete.test.ts @@ -0,0 +1,51 @@ +import type { AssertableState } from "../utils/assertable-state"; +import type { TestDefinition } from "../test-definition"; + +export const renameOverwritesPendingCreateThenDeleteTest: TestDefinition = { + description: + "A pending local create at a path must not mask a synced document renamed onto that path; later rename/delete events still belong to the synced document.", + clients: 2, + steps: [ + { type: "enable-sync", client: 0 }, + { type: "enable-sync", client: 1 }, + { + type: "create", + client: 0, + path: "tracked.bin", + content: "BINARY:tracked" + }, + { type: "barrier" }, + + { type: "pause-server" }, + + { + type: "create", + client: 0, + path: "pending.bin", + content: "BINARY:pending" + }, + { + type: "rename", + client: 0, + oldPath: "tracked.bin", + newPath: "pending.bin" + }, + { + type: "rename", + client: 0, + oldPath: "pending.bin", + newPath: "final.bin" + }, + { type: "delete", client: 0, path: "final.bin" }, + + { type: "resume-server" }, + { type: "barrier" }, + + { + type: "assert-consistent", + verify: (state: AssertableState): void => { + state.assertFileCount(0); + } + } + ] +}; diff --git a/frontend/deterministic-tests/src/tests/rename-pending-create-onto-pending-delete-path.test.ts b/frontend/deterministic-tests/src/tests/rename-pending-create-onto-pending-delete-path.test.ts new file mode 100644 index 00000000..0906f209 --- /dev/null +++ b/frontend/deterministic-tests/src/tests/rename-pending-create-onto-pending-delete-path.test.ts @@ -0,0 +1,59 @@ +import type { AssertableState } from "../utils/assertable-state"; +import type { TestDefinition } from "../test-definition"; + +export const renamePendingCreateOntoPendingDeletePathTest: TestDefinition = { + description: + "A pending create is renamed onto a path whose old server document " + + "has a queued delete. The delete must reach the server before the " + + "new create so the new generation is not merged into the soon-to-be " + + "deleted document.", + clients: 2, + steps: [ + { type: "enable-sync", client: 0 }, + { type: "enable-sync", client: 1 }, + { type: "barrier" }, + + { + type: "create", + client: 1, + path: "file-17.md", + content: "old\n" + }, + { type: "barrier" }, + + { type: "pause-server" }, + { + type: "create", + client: 1, + path: "blocker.md", + content: "blocker\n" + }, + { type: "sleep", ms: 100 }, + { + type: "create", + client: 1, + path: "file-23.md", + content: "new\n" + }, + { type: "delete", client: 1, path: "file-17.md" }, + { + type: "rename", + client: 1, + oldPath: "file-23.md", + newPath: "file-17.md" + }, + { type: "resume-server" }, + { type: "barrier" }, + + { + type: "assert-consistent", + verify: (state: AssertableState): void => { + state + .assertFileCount(2) + .assertContent("blocker.md", "blocker\n") + .assertContent("file-17.md", "new\n") + .assertFileNotExists("file-23.md"); + } + } + ] +}; diff --git a/frontend/deterministic-tests/src/tests/renamed-pending-create-reused-path-then-delete.test.ts b/frontend/deterministic-tests/src/tests/renamed-pending-create-reused-path-then-delete.test.ts new file mode 100644 index 00000000..3ffb376e --- /dev/null +++ b/frontend/deterministic-tests/src/tests/renamed-pending-create-reused-path-then-delete.test.ts @@ -0,0 +1,65 @@ +import type { AssertableState } from "../utils/assertable-state"; +import type { TestDefinition } from "../test-definition"; + +export const renamedPendingCreateReusedPathThenDeleteTest: TestDefinition = { + description: + "A queued create is renamed away from file-59.md, a newer local " + + "file reuses file-59.md before the queued create drains, and the " + + "renamed-away generation is deleted. The delete must not erase or " + + "orphan the newer file-59.md generation.", + clients: 2, + steps: [ + { type: "enable-sync", client: 0 }, + { type: "enable-sync", client: 1 }, + { type: "barrier" }, + + { type: "pause-server" }, + { + type: "create", + client: 1, + path: "blocker.md", + content: "blocker\n" + }, + { type: "sleep", ms: 100 }, + + { + type: "create", + client: 1, + path: "file-59.md", + content: "old\n" + }, + { + type: "rename", + client: 1, + oldPath: "file-59.md", + newPath: "file-33.md" + }, + { + type: "create", + client: 1, + path: "file-59.md", + content: "new\n" + }, + + { + type: "resume-server-until-history-then-pause", + client: 1, + syncType: "CREATE", + path: "file-33.md" + }, + { type: "delete", client: 1, path: "file-33.md" }, + { type: "resume-server" }, + { type: "barrier" }, + + { + type: "assert-consistent", + verify: (state: AssertableState): void => { + state + .assertFileCount(2) + .assertContent("blocker.md", "blocker\n") + .assertContent("file-59.md", "new\n") + .assertFileNotExists("file-33.md"); + } + } + ] +}; diff --git a/frontend/deterministic-tests/src/tests/same-doc-id-collapse-after-remote-quick-write-and-pending-rename.test.ts b/frontend/deterministic-tests/src/tests/same-doc-id-collapse-after-remote-quick-write-and-pending-rename.test.ts new file mode 100644 index 00000000..2a3b5de4 --- /dev/null +++ b/frontend/deterministic-tests/src/tests/same-doc-id-collapse-after-remote-quick-write-and-pending-rename.test.ts @@ -0,0 +1,82 @@ +import type { AssertableState } from "../utils/assertable-state"; +import type { TestDefinition } from "../test-definition"; + +export const sameDocIdCollapseAfterRemoteQuickWriteAndPendingRenameTest: TestDefinition = + { + description: + "A remote create starts quick-writing at doc.md while a local " + + "create for the same path is queued and renamed to renamed.md. " + + "Because the local create was renamed before it reached the " + + "server, the two generations should remain separate tracked " + + "documents.", + clients: 2, + steps: [ + { type: "enable-sync", client: 0 }, + + // Create a deleted latest version before client 1 joins. + // Catch-up will advance MinCovered with a non-contiguous id, + // keeping client 1's create lastSeen low enough to exercise + // the server's same-doc merge path from the e2e failure. + { + type: "create", + client: 0, + path: "history.md", + content: "history-v1" + }, + { type: "sync", client: 0 }, + { + type: "update", + client: 0, + path: "history.md", + content: "history-v2" + }, + { type: "sync", client: 0 }, + { type: "delete", client: 0, path: "history.md" }, + { type: "sync", client: 0 }, + + { type: "enable-sync", client: 1 }, + { type: "sync", client: 1 }, + + { type: "pause-websocket", client: 1 }, + + { + type: "create", + client: 0, + path: "doc.md", + content: "remote\n" + }, + { type: "sync", client: 0 }, + + // Let client 1's buffered RemoteCreate enter the quick-write + // path, but hold the content fetch until the local create has + // appeared and moved away from doc.md. + { type: "pause-server" }, + { type: "resume-websocket", client: 1 }, + { type: "sleep", ms: 100 }, + + { + type: "create", + client: 1, + path: "doc.md", + content: "local\n" + }, + { + type: "rename", + client: 1, + oldPath: "doc.md", + newPath: "renamed.md" + }, + + { type: "resume-server" }, + { type: "barrier" }, + + { + type: "assert-consistent", + verify: (state: AssertableState): void => { + state.assertFileCount(2); + state.assertContent("doc.md", "remote\n"); + state.assertContent("renamed.md", "local\n"); + } + } + ] + }; diff --git a/frontend/sync-client/src/index.ts b/frontend/sync-client/src/index.ts index c79ace63..f06523a6 100644 --- a/frontend/sync-client/src/index.ts +++ b/frontend/sync-client/src/index.ts @@ -34,6 +34,7 @@ export type { ClientCursors } from "./services/types/ClientCursors"; export type { NetworkConnectionStatus } from "./types/network-connection-status"; export type { ServerVersionMismatchError } from "./errors/server-version-mismatch-error"; export type { AuthenticationError } from "./errors/authentication-error"; +export { SyncResetError } from "./errors/sync-reset-error"; export type { MaybeOutdatedClientCursors } from "./types/maybe-outdated-client-cursors"; export { DocumentSyncStatus } from "./types/document-sync-status"; export { SyncClient } from "./sync-client"; diff --git a/frontend/sync-client/src/sync-client.ts b/frontend/sync-client/src/sync-client.ts index f5007ecb..441b46e5 100644 --- a/frontend/sync-client/src/sync-client.ts +++ b/frontend/sync-client/src/sync-client.ts @@ -507,6 +507,7 @@ export class SyncClient { await this.serverConfig.getConfig(); await this.syncer.scheduleSyncForOfflineChanges(); + this.syncer.resumeDraining(); this.webSocketManager.start(); this.hasFinishedOfflineSync = true; @@ -514,6 +515,7 @@ export class SyncClient { private async pause(): Promise { this.hasFinishedOfflineSync = false; + this.syncer.pauseDraining(); this.fetchController.startReset(); // Signal the service so any `retryForever` loop exits at its next // iteration instead of continuing to retry a network request while diff --git a/frontend/sync-client/src/sync-operations/reconciler.test.ts b/frontend/sync-client/src/sync-operations/reconciler.test.ts new file mode 100644 index 00000000..13a08363 --- /dev/null +++ b/frontend/sync-client/src/sync-operations/reconciler.test.ts @@ -0,0 +1,69 @@ +import { describe, it } from "node:test"; +import assert from "node:assert"; +import { Logger, LogLevel } from "../tracing/logger"; +import { Settings } from "../persistence/settings"; +import { STORED_STATE_SCHEMA_VERSION, SyncEventQueue } from "./sync-event-queue"; +import { Reconciler } from "./reconciler"; +import { SyncResetError } from "../errors/sync-reset-error"; +import type { FileOperations } from "../file-operations/file-operations"; +import type { SyncService } from "../services/sync-service"; +import type { RelativePath } from "./types"; + +describe("Reconciler", () => { + it("does not emit an error when placement fetch is interrupted by reset", async () => { + const logger = new Logger(); + const settings = new Settings(logger, {}, async () => { + /* no-op */ + }); + const queue = new SyncEventQueue( + settings, + logger, + { schemaVersion: STORED_STATE_SCHEMA_VERSION }, + async () => { + /* no-op */ + } + ); + + await queue.upsertRecord({ + documentId: "DOC-1", + parentVersionId: 1, + remoteHash: "hash", + remoteRelativePath: "remote.md" as RelativePath, + localPath: undefined + }); + + const operations = { + exists: async () => false, + create: async () => { + assert.fail("reset-interrupted placement should not write"); + } + } as unknown as FileOperations; + + const syncService = { + getDocumentVersionContent: async () => { + throw new SyncResetError(); + } + } as unknown as SyncService; + + const reconciler = new Reconciler( + logger, + operations, + syncService, + queue, + new Map() + ); + + await reconciler.run(); + + assert.deepStrictEqual(logger.getMessages(LogLevel.ERROR), []); + assert.ok( + logger + .getMessages(LogLevel.INFO) + .some((line) => + line.message.includes( + "content fetch for DOC-1 interrupted by sync reset" + ) + ) + ); + }); +}); diff --git a/frontend/sync-client/src/sync-operations/reconciler.ts b/frontend/sync-client/src/sync-operations/reconciler.ts index dadcd222..61525118 100644 --- a/frontend/sync-client/src/sync-operations/reconciler.ts +++ b/frontend/sync-client/src/sync-operations/reconciler.ts @@ -6,6 +6,7 @@ import type { SyncService } from "../services/sync-service"; import type { SyncEventQueue } from "./sync-event-queue"; import type { DocumentId, DocumentRecord, RelativePath } from "./types"; import { hash } from "../utils/hash"; +import { SyncResetError } from "../errors/sync-reset-error"; const SWAP_MARKER_DIR = ".vaultlink"; const SWAP_MARKER_PREFIX = "swap-"; @@ -225,6 +226,14 @@ export class Reconciler { private async tryInitialPlacement(record: DocumentRecord): Promise { const target = record.remoteRelativePath; + if (this.queue.hasPendingCreateForPath(target)) { + this.logger.debug( + `Reconciler: cannot place ${record.documentId} at ${target} ` + + `— pending local create still claims that path; will retry next pass` + ); + return; + } + // Slot occupancy: pre-check both the disk and our tracked // records. Either form of occupancy means we wait — the // occupant's own reconciliation pass (after their next wire-loop @@ -259,6 +268,12 @@ export class Reconciler { vaultUpdateId: record.parentVersionId }); } catch (e) { + if (e instanceof SyncResetError) { + this.logger.info( + `Reconciler: content fetch for ${record.documentId} interrupted by sync reset` + ); + return; + } this.logger.error( `Reconciler: failed to fetch content for ${record.documentId}: ${String(e)}` ); diff --git a/frontend/sync-client/src/sync-operations/sync-event-queue.test.ts b/frontend/sync-client/src/sync-operations/sync-event-queue.test.ts index 23c7dfcc..82f78af4 100644 --- a/frontend/sync-client/src/sync-operations/sync-event-queue.test.ts +++ b/frontend/sync-client/src/sync-operations/sync-event-queue.test.ts @@ -248,6 +248,38 @@ describe("SyncEventQueue", () => { assert.strictEqual(second.isUserRename, true); }); + it("settled record owns a path over a stale pending create", async () => { + const queue = createQueue(); + await queue.upsertRecord(fakeRecord("A", { localPath: "b.md" })); + + await queue.enqueue({ type: SyncEventType.LocalCreate, path: "b.md" }); + await queue.enqueue({ + type: SyncEventType.LocalUpdate, + path: "c.md", + oldPath: "b.md" + }); + + const aRecord = queue.getDocumentByDocumentId("A"); + assert.strictEqual(aRecord?.localPath, "c.md"); + assert.strictEqual( + queue.getRecordByLocalPath("b.md" as RelativePath), + undefined + ); + assert.strictEqual( + queue.getRecordByLocalPath("c.md" as RelativePath)?.documentId, + "A" + ); + + const create = await queue.next(); + assert.strictEqual(create?.type, SyncEventType.LocalCreate); + assert.strictEqual(create.path, "b.md"); + + const update = await queue.next(); + assert.strictEqual(update?.type, SyncEventType.LocalUpdate); + assert.strictEqual(update.documentId, "A"); + assert.strictEqual(update.path, "c.md"); + }); + it("byLocalPath stays consistent across upsertRecord, setLocalPath, and rename", async () => { const queue = createQueue(); @@ -502,6 +534,160 @@ describe("SyncEventQueue", () => { assert.strictEqual(await createPromise, "DOC-1"); }); + it("delete collapses a pending create that has not started processing", async () => { + const queue = createQueue(); + + await queue.enqueue({ type: SyncEventType.LocalCreate, path: "a.md" }); + const create = queue.peekFront(); + assert.ok(create?.type === SyncEventType.LocalCreate); + + await queue.enqueue({ type: SyncEventType.LocalDelete, path: "a.md" }); + + assert.strictEqual(queue.pendingUpdateCount, 0); + assert.strictEqual(await queue.next(), undefined); + await assert.rejects(create.resolvers.promise, /cancelled/); + }); + + it("resolveCreate does not claim a localPath after an in-flight pending create was deleted", async () => { + const queue = createQueue(); + + await queue.enqueue({ type: SyncEventType.LocalCreate, path: "a.md" }); + const create = queue.peekFront(); + assert.ok(create?.type === SyncEventType.LocalCreate); + create.isProcessing = true; + + await queue.enqueue({ type: SyncEventType.LocalDelete, path: "a.md" }); + + await queue.resolveCreate( + create, + fakeRecord("DOC-1", { + localPath: "a.md" as RelativePath, + remoteRelativePath: "a.md" as RelativePath + }) + ); + + assert.strictEqual( + queue.getDocumentByDocumentId("DOC-1")?.localPath, + undefined + ); + assert.strictEqual( + queue.getRecordByLocalPath("a.md" as RelativePath), + undefined + ); + + const deleteEvent = await queue.next(); + assert.strictEqual(deleteEvent?.type, SyncEventType.LocalDelete); + assert.strictEqual(deleteEvent.documentId, "DOC-1"); + }); + + it("resolveCreate only clears localPath for a pending delete of that path", async () => { + const queue = createQueue(); + + await queue.enqueue({ + type: SyncEventType.LocalCreate, + path: "old.md" + }); + const create = queue.peekFront(); + assert.ok(create?.type === SyncEventType.LocalCreate); + create.isProcessing = true; + + await queue.enqueue({ + type: SyncEventType.LocalDelete, + path: "old.md" + }); + + await queue.resolveCreate( + create, + fakeRecord("DOC-1", { + localPath: "new.md" as RelativePath, + remoteRelativePath: "new.md" as RelativePath + }) + ); + + assert.strictEqual( + queue.getDocumentByDocumentId("DOC-1")?.localPath, + "new.md" + ); + assert.strictEqual( + queue.getRecordByLocalPath("new.md" as RelativePath)?.documentId, + "DOC-1" + ); + + const deleteEvent = await queue.next(); + assert.strictEqual(deleteEvent?.type, SyncEventType.LocalDelete); + assert.strictEqual(deleteEvent.documentId, "DOC-1"); + assert.strictEqual(deleteEvent.path, "old.md"); + }); + + it("pending create owns a same-path delete over a stale deleting record", async () => { + const queue = createQueue(); + await queue.upsertRecord( + fakeRecord("OLD", { localPath: "a.md" as RelativePath }) + ); + queue.markServerDeletePending("OLD"); + + await queue.enqueue({ type: SyncEventType.LocalCreate, path: "a.md" }); + const create = queue.peekFront(); + assert.ok(create?.type === SyncEventType.LocalCreate); + create.isProcessing = true; + + await queue.enqueue({ type: SyncEventType.LocalDelete, path: "a.md" }); + + assert.strictEqual( + queue.getDocumentByDocumentId("OLD")?.localPath, + undefined + ); + assert.strictEqual( + queue.getRecordByLocalPath("a.md" as RelativePath), + undefined + ); + + const createEvent = await queue.next(); + assert.strictEqual(createEvent, create); + + const deleteEvent = await queue.next(); + assert.strictEqual(deleteEvent?.type, SyncEventType.LocalDelete); + assert.strictEqual(deleteEvent.documentId, create.resolvers.promise); + }); + + it("rename of a queued create drains same-path deletes first", async () => { + const queue = createQueue(); + await queue.upsertRecord( + fakeRecord("OLD", { localPath: "target.md" as RelativePath }) + ); + + await queue.enqueue({ + type: SyncEventType.LocalCreate, + path: "source.md" + }); + const create = queue.peekFront(); + assert.ok(create?.type === SyncEventType.LocalCreate); + + await queue.enqueue({ + type: SyncEventType.LocalDelete, + path: "target.md" + }); + await queue.enqueue({ + type: SyncEventType.LocalUpdate, + oldPath: "source.md", + path: "target.md" + }); + + const deleteEvent = await queue.next(); + assert.strictEqual(deleteEvent?.type, SyncEventType.LocalDelete); + assert.strictEqual(deleteEvent.documentId, "OLD"); + assert.strictEqual(deleteEvent.path, "target.md"); + + const createEvent = await queue.next(); + assert.strictEqual(createEvent, create); + assert.strictEqual(createEvent.path, "target.md"); + + const updateEvent = await queue.next(); + assert.strictEqual(updateEvent?.type, SyncEventType.LocalUpdate); + assert.strictEqual(updateEvent.documentId, create.resolvers.promise); + assert.strictEqual(updateEvent.path, "target.md"); + }); + it("findLatestCreateForPath returns the pending create", async () => { const queue = createQueue(); diff --git a/frontend/sync-client/src/sync-operations/sync-event-queue.ts b/frontend/sync-client/src/sync-operations/sync-event-queue.ts index c52cde03..f9209676 100644 --- a/frontend/sync-client/src/sync-operations/sync-event-queue.ts +++ b/frontend/sync-client/src/sync-operations/sync-event-queue.ts @@ -210,6 +210,7 @@ export class SyncEventQueue { this.events.push({ type: SyncEventType.LocalCreate, path, + isProcessing: false, resolvers: Promise.withResolvers() }); this.notifyPendingUpdateCountChanged(); @@ -223,22 +224,54 @@ export class SyncEventQueue { : path; const record = this._byLocalPath.get(lookupPath); - // latest creation must take precedence as it's from the doc's latest generation + // If a settled record and a pending create both claim this path, the + // settled record owns the current disk slot, unless the record is + // already being deleted. A deleting record can briefly remain in the + // localPath index when a create/delete pair was queued while the + // create was pending; it must not steal the next same-path create's + // delete/update. + const pendingCreate = this.findLatestCreateForPath(lookupPath); const pendingDocumentId: Promise | undefined = - this.findLatestCreateForPath(lookupPath)?.resolvers.promise; + pendingCreate?.resolvers.promise; - const documentId: DocumentId | undefined = record?.documentId; + const recordIsDeleting = + record !== undefined && + (this.hasPendingLocalDeleteForDocumentId(record.documentId) || + this.hasPendingServerDelete(record.documentId)); + const recordOwnsLookupPath = + record !== undefined && + !(recordIsDeleting && pendingDocumentId !== undefined); + + const documentId: DocumentId | undefined = recordOwnsLookupPath + ? record.documentId + : undefined; const effectiveDocumentId: | Promise | DocumentId - | undefined = pendingDocumentId ?? documentId; + | undefined = documentId ?? pendingDocumentId; if (effectiveDocumentId === undefined) { // we can get here when deleting a local document after a remote update return; } if (input.type === SyncEventType.LocalDelete) { + if ( + documentId === undefined && + pendingCreate !== undefined && + !pendingCreate.isProcessing + ) { + this.cancelPendingCreate(pendingCreate); + if (recordIsDeleting && record !== undefined) { + // A stale deleting record was still claiming this path. + // The not-yet-started create/delete pair collapsed to + // nothing, and the disk file is gone, so clear the stale + // claim too. + await this.setLocalPath(record.documentId, undefined); + } + return; + } + // Push BEFORE awaiting `setLocalPath` (and its inner `save()`). // See the comment below on the synchronicity contract with // `ensureDraining()`. @@ -248,10 +281,15 @@ export class SyncEventQueue { path: lookupPath }); this.notifyPendingUpdateCountChanged(); - if (record !== undefined) { + if (recordOwnsLookupPath && record !== undefined) { // The file is gone from disk; clear the doc's localPath so the // Reconciler doesn't try to operate on a vacated slot. await this.setLocalPath(record.documentId, undefined); + } else if (recordIsDeleting && record !== undefined) { + // A stale deleting record was still claiming this path while a + // newer pending create owned the actual disk file. Drop the + // stale claim now that the file is gone. + await this.setLocalPath(record.documentId, undefined); } return; } @@ -259,10 +297,10 @@ export class SyncEventQueue { const isUserRename = input.oldPath !== undefined; let needsSave = false; if (input.oldPath !== undefined) { - if (pendingDocumentId !== undefined) { + if (!recordOwnsLookupPath && pendingDocumentId !== undefined) { this.updatePendingCreatePath(input.oldPath, path); } else { - if (record === undefined) { + if (record === undefined || !recordOwnsLookupPath) { throw new Error( "Unreachable: record must be defined for non-pending update" ); @@ -352,10 +390,7 @@ export class SyncEventQueue { * Return the next event without removing it. Drain uses this so the * event stays visible in the queue while it is being processed — * critical for `findLatestCreateForPath` to update an in-flight - * `LocalCreate`'s path when a rename arrives mid-process. Also marks - * the event as in-flight so dedup checks in `enqueue` know not to - * fold a fresh content change into an event whose disk read already - * happened. + * `LocalCreate`'s local read path when a rename arrives mid-process. */ public peekFront(): SyncEvent | undefined { return this.events[0]; @@ -397,7 +432,13 @@ export class SyncEventQueue { event.resolvers.promise, record.documentId ); - await this.upsertRecord(record); + const localPath = this.hasPendingLocalDeleteForDocumentId( + record.documentId, + record.localPath + ) + ? undefined + : record.localPath; + await this.upsertRecord({ ...record, localPath }); event.resolvers.resolve(record.documentId); } @@ -613,6 +654,18 @@ export class SyncEventQueue { ); } + public hasPendingLocalDeleteForDocumentId( + documentId: DocumentId, + path?: RelativePath + ): boolean { + return this.events.some( + (e) => + e.type === SyncEventType.LocalDelete && + e.documentId === documentId && + (path === undefined || e.path === path) + ); + } + public async clearAllState(): Promise { this.clearPending(); this.byDocId.clear(); @@ -643,6 +696,12 @@ export class SyncEventQueue { return undefined; } + public hasPendingCreateForPath(path: RelativePath): boolean { + return this.events.some( + (e) => e.type === SyncEventType.LocalCreate && e.path === path + ); + } + public updatePendingCreatePath( oldPath: RelativePath, newPath: RelativePath @@ -654,6 +713,9 @@ export class SyncEventQueue { const { promise } = createEvent.resolvers; createEvent.path = newPath; + if (!createEvent.isProcessing) { + this.moveBlockingDeletesBeforeCreate(createEvent, newPath); + } for (const e of this.events) { if ( @@ -665,6 +727,32 @@ export class SyncEventQueue { } } + private moveBlockingDeletesBeforeCreate( + createEvent: Extract, + path: RelativePath + ): void { + const { promise } = createEvent.resolvers; + let createIndex = this.events.indexOf(createEvent); + if (createIndex < 0) { + return; + } + + for (let i = createIndex + 1; i < this.events.length; ) { + const event = this.events[i]; + if ( + event.type === SyncEventType.LocalDelete && + event.path === path && + event.documentId !== promise + ) { + this.events.splice(i, 1); + this.events.splice(createIndex, 0, event); + createIndex++; + continue; + } + i++; + } + } + /** * Synchronous half of `setLocalPath`: mutate `record.localPath` and * re-key `_byLocalPath` without persisting. Used by `enqueue`'s @@ -724,6 +812,32 @@ export class SyncEventQueue { } } + private cancelPendingCreate( + createEvent: Extract + ): void { + const { promise } = createEvent.resolvers; + const toRemove = this.events.filter( + (event) => + event === createEvent || + ((event.type === SyncEventType.LocalUpdate || + event.type === SyncEventType.LocalDelete) && + event.documentId === promise) + ); + + for (const event of toRemove) { + removeFromArray(this.events, event); + } + + createEvent.resolvers.promise.catch(() => { + /* suppressed — the create/delete pair collapsed locally */ + }); + createEvent.resolvers.reject(new Error("Create was cancelled")); + + if (toRemove.length > 0) { + this.notifyPendingUpdateCountChanged(); + } + } + private purgeRemoteChangesForDocumentId(documentId: DocumentId): void { const toRemove = this.events.filter( (e) => diff --git a/frontend/sync-client/src/sync-operations/syncer.ts b/frontend/sync-client/src/sync-operations/syncer.ts index 46b2ba3b..20c5024c 100644 --- a/frontend/sync-client/src/sync-operations/syncer.ts +++ b/frontend/sync-client/src/sync-operations/syncer.ts @@ -65,6 +65,8 @@ export class Syncer { private runningScheduleSyncForOfflineChanges: Promise | undefined; private drainPromise: Promise | undefined; + private drainRequestedWhileRunning = false; + private isDrainingPaused = false; private isScanning = false; private previousRemainingOperationsCount = 0; @@ -244,6 +246,15 @@ export class Syncer { } } + public pauseDraining(): void { + this.isDrainingPaused = true; + } + + public resumeDraining(): void { + this.isDrainingPaused = false; + this.ensureDraining(); + } + private sendHandshakeMessage(): void { const message: WebSocketClientMessage = { type: "handshake", @@ -282,13 +293,27 @@ export class Syncer { private ensureDraining(): void { if (this.drainPromise !== undefined) { + this.drainRequestedWhileRunning = true; return; } if (this.isScanning) { return; } + if (this.isDrainingPaused) { + return; + } this.drainPromise = this.drain().finally(() => { this.drainPromise = undefined; + const shouldRestart = + this.drainRequestedWhileRunning && + this.queue.pendingUpdateCount > 0 && + !this.isScanning && + !this.isDrainingPaused && + this.settings.getSettings().isSyncEnabled; + this.drainRequestedWhileRunning = false; + if (shouldRestart) { + this.ensureDraining(); + } }); } @@ -296,9 +321,12 @@ export class Syncer { // Peek then remove-after-processing (instead of shift-then-process): // the event must remain reachable through `findLatestCreateForPath` // while it is in flight, so a rename event arriving mid-process can - // call `updatePendingCreatePath` to retarget this create's path. + // call `updatePendingCreatePath` to retarget this create's local path. for (;;) { - if (!this.settings.getSettings().isSyncEnabled) { + if ( + this.isDrainingPaused || + !this.settings.getSettings().isSyncEnabled + ) { this.logger.debug( "Drain pausing because sync is disabled; events stay queued" ); @@ -333,6 +361,10 @@ export class Syncer { private async processEvent(event: SyncEvent): Promise { try { + if (event.type === SyncEventType.LocalCreate) { + event.isProcessing = true; + } + if (await this.skipIfOversized(event)) { return; } @@ -460,21 +492,26 @@ export class Syncer { private async processCreate( event: Extract ): Promise { - const contentBytes = await this.operations.read(event.path); + const requestPath = event.path; + const contentBytes = await this.operations.read(requestPath); const contentHash = await hash(contentBytes); - // Read `event.path` live: `updatePendingCreatePath` mutates it in - // place when the user renames the pending create mid-roundtrip. - // Sending `originalPath` here would tell the server the pre-rename - // location, then the queued LocalUpdate from the rename would - // fail on `getFileSize(renamedPath)` after the reconciler moved - // the file back to match the (stale) server-side path. + // Use the path the pending create has when it reaches the wire loop. + // `updatePendingCreatePath` mutates queued creates when a not-yet-sent + // local file is renamed, so a renamed-away generation does not create + // a server document at a path that a newer local file has reused. const response = await this.syncService.create({ - relativePath: event.path, + relativePath: requestPath, lastSeenVaultUpdateId: this.queue.lastSeenUpdateId, contentBytes }); + // If the user renamed the file while the create request was in flight, + // event.path now points at the renamed disk slot. Apply response bytes + // and install the local record there; the queued LocalUpdate carries + // the server-side rename intent. + const localPath = event.path; + // Same-docId collapse. While our LocalCreate sat in the queue, a // RemoteCreate may have arrived for this same path. The wire-loop's // `processRemoteCreateForNewDocument` would have built a record with @@ -487,7 +524,7 @@ export class Syncer { if (response.type === "MergingUpdate") { const responseBytes = base64ToBytes(response.contentBase64); await this.operations.write( - event.path, + localPath, contentBytes, responseBytes ); @@ -495,13 +532,13 @@ export class Syncer { await this.updateCache( response.vaultUpdateId, responseBytes, - event.path + localPath ); } else { await this.updateCache( response.vaultUpdateId, contentBytes, - event.path + localPath ); } @@ -516,13 +553,13 @@ export class Syncer { parentVersionId: response.vaultUpdateId, remoteRelativePath: response.relativePath, remoteHash, - localPath: event.path + localPath }); this.queue.lastSeenUpdateId = response.vaultUpdateId; this.history.addHistoryEntry({ status: SyncStatus.SUCCESS, - details: { type: SyncType.CREATE, relativePath: event.path }, + details: { type: SyncType.CREATE, relativePath: localPath }, message: response.type === "MergingUpdate" ? "Created file and merged with existing remote version" @@ -536,6 +573,24 @@ export class Syncer { event: Extract ): Promise { const documentId = await event.documentId; + const record = this.queue.getDocumentByDocumentId(documentId); + if ( + record?.localPath !== undefined && + record.localPath !== event.path + ) { + this.logger.debug( + `Skipping local-delete for ${documentId} at ${event.path}: ` + + `record now owns ${record.localPath}` + ); + return; + } + + // The disk file is already gone when a LocalDelete reaches the wire + // loop. This is redundant for settled records deleted through + // `enqueue`, but load-bearing for creates that were deleted while the + // create request was still pending: their record only exists after the + // create ack resolves. + await this.queue.setLocalPath(documentId, undefined); const response = await this.syncService.delete({ documentId @@ -754,23 +809,32 @@ export class Syncer { } if (trackedRecord !== undefined) { - // The doc is tracked. If we have a local file backing it - // and that file has gone missing — e.g. the user deleted it - // and the LocalDelete hasn't drained yet, or our HTTP - // DELETE just landed and we're still waiting on the - // WebSocket receipt — ignore the update. Otherwise we'd - // try to operate on a vanished file (or recreate one we're - // tearing down). + // The doc is tracked, but the disk slot can be stale. One + // concrete race: a remote create quick-writes a file, a + // watcher rename/delete lands before the record is fully + // settled, and the record is left claiming a path that no + // longer exists. If no queued local operation owns that + // disappearance, clear the localPath and let + // processRemoteUpdate stash/place the active server version. if (trackedRecord.localPath !== undefined) { const fileExists = await this.operations.exists( trackedRecord.localPath ); - if (!fileExists) { - this.queue.lastSeenUpdateId = remoteVersion.vaultUpdateId; + if ( + !fileExists && + !this.queue.hasPendingLocalEventsForDocumentId( + remoteVersion.documentId + ) + ) { this.logger.debug( - `Ignoring remote update for ${remoteVersion.documentId}: local file at ${trackedRecord.localPath} is missing` + `Remote update for ${remoteVersion.documentId}: ` + + `local file at ${trackedRecord.localPath} is missing; ` + + `clearing localPath for placement` + ); + await this.queue.setLocalPath( + trackedRecord.documentId, + undefined ); - return; } } return this.processRemoteUpdate(trackedRecord, remoteVersion); @@ -992,9 +1056,7 @@ export class Syncer { // design, no buffering at receive time — the reconciler will // fetch on demand. const target = remoteVersion.relativePath; - const slotFree = - !(await this.operations.exists(target)) && - this.queue.getRecordByLocalPath(target) === undefined; + const slotFree = await this.canPlaceRemoteCreateAt(target); let localPath: RelativePath | undefined = undefined; let remoteHash: string | undefined = undefined; @@ -1004,49 +1066,77 @@ export class Syncer { documentId: remoteVersion.documentId, vaultUpdateId: remoteVersion.vaultUpdateId }); - try { - const result = await this.operations.create( - target, - remoteContent - ); - localPath = result.actualPath; - remoteHash = await hash(remoteContent); - await this.updateCache( - remoteVersion.vaultUpdateId, - remoteContent, - localPath - ); - } catch (e) { - if (!(e instanceof FileAlreadyExistsError)) { - throw e; - } - // TOCTOU: the slot was free at the pre-check but - // something landed there between then and now. Fall - // through to the no-localPath branch and let the - // reconciler retry placement once the slot frees. + if (!(await this.canPlaceRemoteCreateAt(target))) { this.logger.debug( `Quick-write for ${remoteVersion.documentId} at ${target} ` + - `lost a TOCTOU race; deferring to reconciler` + `became blocked while fetching content; deferring to reconciler` ); - localPath = undefined; - remoteHash = undefined; + } else { + try { + remoteHash = await hash(remoteContent); + await this.queue.upsertRecord({ + documentId: remoteVersion.documentId, + parentVersionId: remoteVersion.vaultUpdateId, + remoteRelativePath: remoteVersion.relativePath, + remoteHash, + localPath: target + }); + const result = await this.operations.create( + target, + remoteContent + ); + const liveRecord = this.queue.getDocumentByDocumentId( + remoteVersion.documentId + ); + localPath = + liveRecord === undefined + ? result.actualPath + : liveRecord.localPath; + await this.updateCache( + remoteVersion.vaultUpdateId, + remoteContent, + localPath ?? remoteVersion.relativePath + ); + } catch (e) { + await this.queue.setLocalPath( + remoteVersion.documentId, + undefined + ); + if (!(e instanceof FileAlreadyExistsError)) { + throw e; + } + // TOCTOU: the slot was free at the pre-check but + // something landed there between then and now. Fall + // through to the no-localPath branch and let the + // reconciler retry placement once the slot frees. + this.logger.debug( + `Quick-write for ${remoteVersion.documentId} at ${target} ` + + `lost a TOCTOU race; deferring to reconciler` + ); + localPath = undefined; + } } } - await this.queue.upsertRecord({ - documentId: remoteVersion.documentId, - parentVersionId: remoteVersion.vaultUpdateId, - remoteRelativePath: remoteVersion.relativePath, - // `remoteHash` is undefined when we deferred fetching content. - // Consumers (`processLocalUpdate`'s fast-skip, - // `findMatchingFile`'s offline-rename detection) treat - // undefined as "no comparison possible" and fall through to a - // real upload / no-match. The hash gets populated the next - // time we observe a real version (a remote update, or a - // local edit that triggers an upload). - remoteHash, - localPath - }); + if ( + this.queue.getDocumentByDocumentId(remoteVersion.documentId) === + undefined + ) { + await this.queue.upsertRecord({ + documentId: remoteVersion.documentId, + parentVersionId: remoteVersion.vaultUpdateId, + remoteRelativePath: remoteVersion.relativePath, + // `remoteHash` is undefined when we deferred fetching content. + // Consumers (`processLocalUpdate`'s fast-skip, + // `findMatchingFile`'s offline-rename detection) treat + // undefined as "no comparison possible" and fall through to a + // real upload / no-match. The hash gets populated the next + // time we observe a real version (a remote update, or a + // local edit that triggers an upload). + remoteHash, + localPath + }); + } this.queue.lastSeenUpdateId = remoteVersion.vaultUpdateId; @@ -1065,6 +1155,16 @@ export class Syncer { } } + private async canPlaceRemoteCreateAt( + target: RelativePath + ): Promise { + return ( + !this.queue.hasPendingCreateForPath(target) && + !(await this.operations.exists(target)) && + this.queue.getRecordByLocalPath(target) === undefined + ); + } + private async sendUpdate({ record, relativePath, diff --git a/frontend/sync-client/src/sync-operations/types.ts b/frontend/sync-client/src/sync-operations/types.ts index fd5c8b4b..80a64cd7 100644 --- a/frontend/sync-client/src/sync-operations/types.ts +++ b/frontend/sync-client/src/sync-operations/types.ts @@ -53,6 +53,7 @@ export type SyncEvent = | { type: SyncEventType.LocalCreate; path: RelativePath; // current path on disk; mutated in place by `updatePendingCreatePath` when the user renames mid-flight + isProcessing: boolean; // true once the wire loop has started this create; deletes after that must wait for the server ack resolvers: PromiseWithResolvers; } | { diff --git a/frontend/test-client/src/agent/mock-agent.ts b/frontend/test-client/src/agent/mock-agent.ts index 93603c2a..86a14e4c 100644 --- a/frontend/test-client/src/agent/mock-agent.ts +++ b/frontend/test-client/src/agent/mock-agent.ts @@ -18,6 +18,7 @@ export class MockAgent extends MockClient { // The renamed file finding algorithm isn't too smart so we can't both update and rename the same file private readonly doNotTouchWhileOffline: string[] = []; + private readonly doNotRenameWhileOffline: string[] = []; private lastSyncEnabledState = true; public constructor( @@ -62,6 +63,10 @@ export class MockAgent extends MockClient { this.doNotTouchWhileOffline, historyEntry[1] ); + utils.removeFromArray( + this.doNotRenameWhileOffline, + historyEntry[1] + ); } switch (logLine.level) { case LogLevel.ERROR: @@ -365,6 +370,8 @@ export class MockAgent extends MockClient { `Decided to create file ${file} with content ${content}` ); + this.doNotRenameWhileOffline.push(file); + return this.write(file, new TextEncoder().encode(` ${content} `)); } @@ -385,6 +392,8 @@ export class MockAgent extends MockClient { `Decided to create binary file ${file}: ${uuid}` ); + this.doNotRenameWhileOffline.push(file); + return this.write(file, bytes); } @@ -412,10 +421,11 @@ export class MockAgent extends MockClient { // Otherwise, the resolution logic couldn't handle it. if ( !this.lastSyncEnabledState && - this.doNotTouchWhileOffline.includes(file) + (this.doNotTouchWhileOffline.includes(file) || + this.doNotRenameWhileOffline.includes(file)) ) { this.client.logger.info( - `Skipping file ${file} because it has been updated while offline` + `Skipping file ${file} because it cannot be renamed while offline` ); return; } @@ -516,6 +526,7 @@ export class MockAgent extends MockClient { `Deleting file: ${file} with:\n content '${new TextDecoder().decode(this.files.get(file))}'` ); await this.delete(file); + utils.removeFromArray(this.doNotRenameWhileOffline, file); } private getContent(): string {