From 5776a37dc98236e49715c1cf99e5c3a23f4a7efb Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Tue, 28 Apr 2026 22:20:31 +0100 Subject: [PATCH] eh --- .../deterministic-tests/src/test-registry.ts | 11 +- ...chup-create-and-update-not-skipped.test.ts | 66 ++++ ...sh-cleared-after-rename-deconflict.test.ts | 100 +++++ ...date-response-survives-user-rename.test.ts | 77 ++++ .../src/file-operations/file-operations.ts | 21 +- .../offline-change-detector.ts | 12 +- .../sync-operations/sync-event-queue.test.ts | 15 +- .../src/sync-operations/sync-event-queue.ts | 119 ++++-- .../sync-client/src/sync-operations/syncer.ts | 359 ++++++++++++------ .../sync-client/src/sync-operations/types.ts | 28 +- .../src/utils/find-matching-file.ts | 12 +- sync-server/config-e2e.yml | 1 + sync-server/src/app_state/database.rs | 12 +- 13 files changed, 652 insertions(+), 181 deletions(-) create mode 100644 frontend/deterministic-tests/src/tests/catchup-create-and-update-not-skipped.test.ts create mode 100644 frontend/deterministic-tests/src/tests/conflict-uuid-stash-cleared-after-rename-deconflict.test.ts create mode 100644 frontend/deterministic-tests/src/tests/merging-update-response-survives-user-rename.test.ts diff --git a/frontend/deterministic-tests/src/test-registry.ts b/frontend/deterministic-tests/src/test-registry.ts index f8bfd220..bab7d586 100644 --- a/frontend/deterministic-tests/src/test-registry.ts +++ b/frontend/deterministic-tests/src/test-registry.ts @@ -91,6 +91,9 @@ import { onlineCreateUpdateWhileOtherCreatesSamePathTest } from "./tests/online- import { displacedFileNotMarkedDeletedTest } from "./tests/displaced-file-not-marked-deleted.test"; import { remoteUpdateResurrectsDeletedDocTest } from "./tests/remote-update-resurrects-deleted-doc.test"; import { localUpdateSurvivesRemoteRenameTest } from "./tests/local-update-survives-remote-rename.test"; +import { mergingUpdateResponseSurvivesUserRenameTest } from 
"./tests/merging-update-response-survives-user-rename.test"; +import { conflictUuidStashClearedAfterRenameDeconflictTest } from "./tests/conflict-uuid-stash-cleared-after-rename-deconflict.test"; +import { catchupCreateAndUpdateNotSkippedTest } from "./tests/catchup-create-and-update-not-skipped.test"; export const TESTS: Partial> = { "rename-create-conflict": renameCreateConflictTest, @@ -203,5 +206,11 @@ export const TESTS: Partial> = { "displaced-file-not-marked-deleted": displacedFileNotMarkedDeletedTest, "remote-update-resurrects-deleted-doc": remoteUpdateResurrectsDeletedDocTest, "local-update-survives-remote-rename": - localUpdateSurvivesRemoteRenameTest + localUpdateSurvivesRemoteRenameTest, + "merging-update-response-survives-user-rename": + mergingUpdateResponseSurvivesUserRenameTest, + "conflict-uuid-stash-cleared-after-rename-deconflict": + conflictUuidStashClearedAfterRenameDeconflictTest, + "catchup-create-and-update-not-skipped": + catchupCreateAndUpdateNotSkippedTest }; diff --git a/frontend/deterministic-tests/src/tests/catchup-create-and-update-not-skipped.test.ts b/frontend/deterministic-tests/src/tests/catchup-create-and-update-not-skipped.test.ts new file mode 100644 index 00000000..2d40228f --- /dev/null +++ b/frontend/deterministic-tests/src/tests/catchup-create-and-update-not-skipped.test.ts @@ -0,0 +1,66 @@ +import type { AssertableState } from "../utils/assertable-state"; +import type { TestDefinition } from "../test-definition"; + +export const catchupCreateAndUpdateNotSkippedTest: TestDefinition = { + description: + "Client 1 disconnects (sync disabled). Client 0 creates a doc and " + + "then updates it. When Client 1 reconnects, the server's catch-up " + + "stream sends only the doc's *latest* version (the update), not the " + + "full history. Pre-fix the wire's `is_new_file` was set to " + + "`creation == latest_version`, so the catch-up flagged the doc as " + + "non-new even though Client 1 had never seen its creation. 
Client " + + "1's `processRemoteChange` then dropped it as a 'stale RemoteChange " + + "for untracked, non-new document' and the doc was silently lost. " + + "Post-fix `is_new_file` in the catch-up stream means 'new relative " + + "to the recipient's watermark' (`creation > last_seen_vault_update_id`).", + clients: 2, + steps: [ + { type: "enable-sync", client: 0 }, + { type: "enable-sync", client: 1 }, + // Establish a baseline so Client 1's last_seen is non-zero before + // we take it offline. This makes the bug genuinely about catch-up + // missing the create rather than just an empty-vault first sync. + { type: "create", client: 0, path: "warmup.md", content: "w\n" }, + { type: "barrier" }, + + // Client 1 goes offline. + { type: "disable-sync", client: 1 }, + + // Client 0 creates the doc (vault_update_id v_C, after Client 1's + // watermark). Client 1 doesn't see this because it's offline. + { type: "create", client: 0, path: "doc.md", content: "v1\n" }, + // Wait for the create's HTTP to land before the update; otherwise + // both writes are coalesced into a single POST and the server + // never sees the doc as "create followed by update". + { type: "sync", client: 0 }, + + // Client 0 updates the doc (vault_update_id v_X > v_C). The + // server's `latest_document_versions` view now returns the + // *update* row — its `creation_vault_update_id != vault_update_id`. + { + type: "update", + client: 0, + path: "doc.md", + content: "v1\nupdate\n" + }, + { type: "sync", client: 0 }, + + // Client 1 reconnects. Server's catch-up replays docs with + // `vault_update_id > last_seen`. For doc.md it sends v_X with + // `is_new_file` derived from `creation_vault_update_id > + // last_seen_vault_update_id` (post-fix) — so Client 1 treats it + // as a fresh create and downloads the latest content. 
+ { type: "enable-sync", client: 1 }, + { type: "barrier" }, + + { + type: "assert-consistent", + verify: (state: AssertableState): void => { + state.assertFileCount(2); + state.assertFileExists("doc.md"); + state.assertContent("doc.md", "v1\nupdate\n"); + state.assertContent("warmup.md", "w\n"); + } + } + ] +}; diff --git a/frontend/deterministic-tests/src/tests/conflict-uuid-stash-cleared-after-rename-deconflict.test.ts b/frontend/deterministic-tests/src/tests/conflict-uuid-stash-cleared-after-rename-deconflict.test.ts new file mode 100644 index 00000000..b5783688 --- /dev/null +++ b/frontend/deterministic-tests/src/tests/conflict-uuid-stash-cleared-after-rename-deconflict.test.ts @@ -0,0 +1,100 @@ +import type { AssertableState } from "../utils/assertable-state"; +import type { TestDefinition } from "../test-definition"; + +export const conflictUuidStashClearedAfterRenameDeconflictTest: TestDefinition = +{ + description: + "A `RemoteChange` for a brand-new doc D2 at `target.md` reaches " + + "Client 1's queue *before* Client 1's user-rename of D1 → " + + "`target.md`. The rename's `queue.enqueue` mutates " + + "`documents` synchronously, so by the time the drain processes " + + "the buffered broadcast, `target.md` is already tracked by D1 " + + "with a high `parentVersionId`. " + + "`processRemoteCreateForNewDocument`'s version comparison " + + "(`parentVersionId < remoteVaultUpdateId`) takes the " + + "`MoveOnConflict.NEW` branch and stashes D2 at " + + "`conflict--target.md`. The rename's `LocalUpdate` then " + + "drains, the server deconflicts D1 to `target (1).md`, freeing " + + "the `target.md` slot locally — but D2 is left orphaned at the " + + "`conflict--` path forever, diverging from Client 0 which " + + "has D2 at `target.md`.", + clients: 2, + steps: [ + { type: "enable-sync", client: 0 }, + { type: "enable-sync", client: 1 }, + // Both clients have D1 at `original.md`. 
+ { + type: "create", + client: 0, + path: "original.md", + content: "D1 v1\n" + }, + { type: "barrier" }, + + // Buffer Client 1's WebSocket so D2's broadcast doesn't land + // until we're ready to enqueue it ahead of the rename. + { type: "pause-websocket", client: 1 }, + + // Client 0 creates D2 at target.md. Server stores it; broadcast + // is buffered at Client 1. + { + type: "create", + client: 0, + path: "target.md", + content: "D2 v1\n" + }, + { type: "sync", client: 0 }, + + // Pause the server. Now Client 1's next HTTP PUT will buffer in + // TCP and the drain will sit on `await sendUpdate`. + { type: "pause-server" }, + + // Issue an update to D1. The drain pops the LocalUpdate and + // suspends on the HTTP PUT (server is SIGSTOPped). The drain is + // now busy and won't pop further events until resume-server. + { + type: "update", + client: 1, + path: "original.md", + content: "D1 v2\n" + }, + + // Replay the buffered D2 broadcast. It enqueues as a + // RemoteChange BEHIND the in-flight LocalUpdate but AHEAD of + // the rename event we're about to push. + { type: "resume-websocket", client: 1 }, + + // User renames D1 onto target.md. `queue.enqueue` synchronously + // updates `documents` so target.md → D1. The rename's + // LocalUpdate is pushed to the END of the queue, *after* the + // buffered RemoteChange. + { + type: "rename", + client: 1, + oldPath: "original.md", + newPath: "target.md" + }, + + // Resume the server. Drain order: (1) finish the v2 update PUT + // → D1.parentVersionId bumps above D2's vaultUpdateId. (2) + // process the RemoteChange for D2 — sees `documents.get(target.md) + // = D1` with parentVersionId > vaultUpdateId → MoveOnConflict.NEW + // → stashes D2 at `conflict-<uuid>-target.md`. (3) process the + // rename's LocalUpdate — server deconflicts to target (1).md; + // local file moves there. 
+ { type: "resume-server" }, + + { type: "barrier" }, + + { + type: "assert-consistent", + verify: (state: AssertableState): void => { + state.assertFileCount(2); + state.assertFileExists("target.md"); + state.assertFileExists("target (1).md"); + state.assertContent("target.md", "D2 v1\n"); + state.assertContent("target (1).md", "D1 v2\n"); + } + } + ] +}; diff --git a/frontend/deterministic-tests/src/tests/merging-update-response-survives-user-rename.test.ts b/frontend/deterministic-tests/src/tests/merging-update-response-survives-user-rename.test.ts new file mode 100644 index 00000000..e93240f9 --- /dev/null +++ b/frontend/deterministic-tests/src/tests/merging-update-response-survives-user-rename.test.ts @@ -0,0 +1,77 @@ +import type { AssertableState } from "../utils/assertable-state"; +import type { TestDefinition } from "../test-definition"; + +export const mergingUpdateResponseSurvivesUserRenameTest: TestDefinition = { + description: + "Client 1 sends a content update with a stale `parent_version_id` " + + "(its WebSocket is paused, so it hasn't seen Client 0's intervening " + + "edit). The server merges and replies with `MergingUpdate` carrying " + + "the merged text. Before the response lands, the user renames the " + + "doc on Client 1, vacating the disk path the in-flight " + + "`processLocalUpdate` captured. Pre-fix: " + + "`handleMaybeMergingResponse`'s `operations.write(diskPath, …)` " + + "hits the `we wont recreate it` early-return inside `write`, " + + "silently dropping the server-merged content — Client 0's edit is " + + "lost on Client 1's disk, and Client 1's next local-update PUT " + + "(rebased on the now-untracked merged version) deletes Client 0's " + + "edit on the server too. 
Post-fix: the response is written to the " + + "doc's current tracked disk path, preserving both edits.", + clients: 2, + steps: [ + { type: "enable-sync", client: 0 }, + { type: "enable-sync", client: 1 }, + { type: "create", client: 0, path: "doc.md", content: "0\n" }, + { type: "barrier" }, + + // Stop Client 1 from seeing Client 0's next edit, so its next + // outbound PUT carries a stale `parent_version_id` and the server + // is forced to merge. + { type: "pause-websocket", client: 1 }, + + // Server now holds v_b = "0\nA\n". Client 1's tracked parent + // version stays at v_a = "0\n". + { type: "update", client: 0, path: "doc.md", content: "0\nA\n" }, + { type: "sync", client: 0 }, + + // Pause the server. Subsequent HTTP PUTs from Client 1 buffer at + // the OS layer until resume. This guarantees the merge response + // for Client 1's update is still in flight when the rename below + // mutates `queue.documents`. + { type: "pause-server" }, + + // Client 1 edits doc.md with "B". The drain pops the LocalUpdate, + // captures `diskPath = "doc.md"`, reads the file, and sends the + // HTTP PUT — which buffers because the server is SIGSTOPped. + { type: "update", client: 1, path: "doc.md", content: "0\nB\n" }, + + // User renames the file while the previous PUT is still in flight. + // `queue.enqueue`'s rename branch updates `documents` to point at + // `renamed.md` synchronously, but `processLocalUpdate`'s captured + // `diskPath` ("doc.md") is a local — it can't be retargeted. + { type: "rename", client: 1, oldPath: "doc.md", newPath: "renamed.md" }, + + // Resume the server. It reconciles parent=v_a, latest=v_b, + // new="0\nB\n" → v_c with both edits, replies `MergingUpdate`. + // Pre-fix: write("doc.md", …) sees no file at that path + // (renamed.md now holds the data) and bails out without ever + // writing the merged bytes. Post-fix: the merged bytes land at + // the tracked path (renamed.md). 
+ { type: "resume-server" }, + { type: "resume-websocket", client: 1 }, + + { type: "barrier" }, + + { + type: "assert-consistent", + verify: (state: AssertableState): void => { + state.assertFileCount(1); + state.assertFileExists("renamed.md"); + state.assertFileNotExists("doc.md"); + // Both edits survive: Client 0's "A" and Client 1's "B". + // The reconcile may interleave them either way; assert + // both tokens are present in the converged content. + state.assertContains("renamed.md", "A", "B"); + } + } + ] +}; diff --git a/frontend/sync-client/src/file-operations/file-operations.ts b/frontend/sync-client/src/file-operations/file-operations.ts index 84826c65..512fe0b3 100644 --- a/frontend/sync-client/src/file-operations/file-operations.ts +++ b/frontend/sync-client/src/file-operations/file-operations.ts @@ -263,14 +263,19 @@ export class FileOperations { `Displacing existing file at ${path} to '${conflictPath}' to make room` ); - // Intentionally NOT calling `expectRename` here: the displaced - // file may be a tracked document (its `queue.documents` entry - // still points at `path`), and we need the watcher's - // `syncLocallyUpdatedFile` to flow into `queue.enqueue`'s - // path-update branch so the doc's map key follows its file - // to `conflictPath` and gets resynced - await this.fs.rename(path, conflictPath); - return path; + // The displaced file's rename will fire as a watcher event; + // register `expectRename` so the watcher dedups it. The + // caller is responsible for the queue bookkeeping (relocating + // the displaced doc's tracking) using the `displacedTo` we + // return. 
+ this.expectedFsEvents.expectRename(path, conflictPath); + try { + await this.fs.rename(path, conflictPath); + } catch (e) { + this.expectedFsEvents.unexpectRename(path, conflictPath); + throw e; + } + return { actualPath: path, displacedTo: conflictPath }; } this.logger.debug( diff --git a/frontend/sync-client/src/sync-operations/offline-change-detector.ts b/frontend/sync-client/src/sync-operations/offline-change-detector.ts index 534e35c5..4b8b3609 100644 --- a/frontend/sync-client/src/sync-operations/offline-change-detector.ts +++ b/frontend/sync-client/src/sync-operations/offline-change-detector.ts @@ -1,10 +1,8 @@ -import type { DocumentRecord, DocumentWithPath, RelativePath } from "./types"; -import { SyncEventType } from "./types"; +import type { DocumentRecord, RelativePath } from "./types"; import type { Logger } from "../tracing/logger"; import { hash } from "../utils/hash"; import type { FileOperations } from "../file-operations/file-operations"; import { findMatchingFile } from "../utils/find-matching-file"; -import { FileNotFoundError } from "../errors/file-not-found-error"; import type { SyncEventQueue } from "./sync-event-queue"; import { removeFromArray } from "../utils/remove-from-array"; @@ -31,10 +29,10 @@ export async function scheduleOfflineChanges( // A doc is "possibly deleted" only if it has no local file. Including // docs that still exist locally would queue a spurious delete alongside // the update below. 
- const locallyPossiblyDeletedFiles: DocumentWithPath[] = []; - for (const [path, record] of allDocuments.entries()) { - if (!allLocalFiles.has(path)) { - locallyPossiblyDeletedFiles.push({ path, record }); + const locallyPossiblyDeletedFiles: DocumentRecord[] = []; + for (const record of allDocuments.values()) { + if (!allLocalFiles.has(record.path)) { + locallyPossiblyDeletedFiles.push(record); } } diff --git a/frontend/sync-client/src/sync-operations/sync-event-queue.test.ts b/frontend/sync-client/src/sync-operations/sync-event-queue.test.ts index 30692681..03048a8c 100644 --- a/frontend/sync-client/src/sync-operations/sync-event-queue.test.ts +++ b/frontend/sync-client/src/sync-operations/sync-event-queue.test.ts @@ -40,10 +40,11 @@ function fakeRecord( overrides: Partial = {} ): DocumentRecord { return { + path: `${documentId.toLowerCase()}.md`, documentId, parentVersionId: 1, remoteHash: `hash-${documentId}`, - remoteRelativePath: `${documentId}.md`, + remoteRelativePath: `${documentId.toLowerCase()}.md`, ...overrides }; } @@ -119,7 +120,7 @@ describe("SyncEventQueue", () => { const found = queue.getDocumentByDocumentId("A"); assert.strictEqual(found?.path, "a.md"); - assert.strictEqual(found.record.documentId, "A"); + assert.strictEqual(found.documentId, "A"); await queue.removeDocument("a.md"); assert.strictEqual(queue.syncedDocumentCount, 0); @@ -216,14 +217,8 @@ describe("SyncEventQueue", () => { logger, { documents: [ - { - relativePath: "a.md", - ...fakeRecord("A", { parentVersionId: 5 }) - }, - { - relativePath: "b.md", - ...fakeRecord("B", { parentVersionId: 3 }) - } + fakeRecord("A", { path: "a.md", parentVersionId: 5 }), + fakeRecord("B", { path: "b.md", parentVersionId: 3 }) ], lastSeenUpdateId: 4 }, diff --git a/frontend/sync-client/src/sync-operations/sync-event-queue.ts b/frontend/sync-client/src/sync-operations/sync-event-queue.ts index 491c52c1..e5983ef9 100644 --- a/frontend/sync-client/src/sync-operations/sync-event-queue.ts +++ 
b/frontend/sync-client/src/sync-operations/sync-event-queue.ts @@ -4,7 +4,6 @@ import { globsToRegexes } from "../utils/globs-to-regexes"; import { CONFLICT_PATH_REGEX } from "./conflict-path"; import { removeFromArray } from "../utils/remove-from-array"; import { EventListeners } from "../utils/data-structures/event-listeners"; -import type { DocumentWithPath } from "./types"; import { SyncEventType, type DocumentId, @@ -79,8 +78,8 @@ export class SyncEventQueue { initialState ??= {}; if (initialState.documents !== undefined) { - for (const { relativePath, ...record } of initialState.documents) { - this.documents.set(relativePath, record); + for (const record of initialState.documents) { + this.documents.set(record.path, record); } } this._lastSeenUpdateId = new MinCovered( @@ -189,12 +188,14 @@ export class SyncEventQueue { if (input.type === SyncEventType.LocalDelete) { this.events.push({ type: SyncEventType.LocalDelete, - documentId: (pendingDocumentId ?? documentId)! + documentId: (pendingDocumentId ?? documentId)!, + path: lookupPath }); this.notifyPendingUpdateCountChanged(); return; } + const isUserRename = input.oldPath !== undefined; let needsSave = false; if (input.oldPath !== undefined) { if (pendingDocumentId !== undefined) { @@ -205,6 +206,43 @@ export class SyncEventQueue { "Unreachable: record must be defined for non-pending update" ); } + // The user renamed `oldPath` onto `path`. If `path` was + // already tracked by a *different* doc (the OS rename + // overwrote that file), that doc effectively no longer + // exists locally — its content was clobbered. Without + // explicitly recording the loss the doc would silently + // drop out of the documents map below and we'd skip + // notifying the server, leaving a phantom on the remote + // that other agents still see. Enqueue a LocalDelete for + // it so the server learns about the deletion. 
+ const displacedRecord = this.documents.get(path); + if ( + displacedRecord !== undefined && + displacedRecord.documentId !== documentId + ) { + this.events.push({ + type: SyncEventType.LocalDelete, + documentId: displacedRecord.documentId, + // The doc still lives at `path` on the server; the + // OS rename only overwrote our local file. Snapshot + // the path so `processDelete` can issue the server + // DELETE even after `documents.set(path, record)` + // below removes the entry from the map. + path + }); + } + // Inlined relocation: same shape as `setDocument`'s + // relocation branch (mutate the record's path in place, + // delete-old, set-new, retarget queued LocalUpdates) but + // kept synchronous. Callers fire `enqueue` with `void` + // and immediately call `ensureDraining()`; if we awaited + // `setDocument()` here, the LocalUpdate push below would + // happen after the await and the drain that already + // started would see an empty queue, exit, and leave the + // event stranded. We mutate `record.path` rather than + // re-creating it so any reference held by an in-flight + // drain handler sees the new path on its next read. + record.path = path; this.documents.delete(input.oldPath); this.documents.set(path, record); for (const e of this.events) { @@ -220,16 +258,14 @@ export class SyncEventQueue { } } - // Push BEFORE awaiting `save()`. Callers fire `enqueue` with `void` - // and immediately call `ensureDraining()`, which starts a drain that - // synchronously shifts off the queue. If we awaited save first the - // shift would see the queue empty, drain would exit, and the event - // would never get processed until the next unrelated trigger. + // Push BEFORE awaiting `save()`. See the comment above on the + // synchronicity contract with `ensureDraining()`. this.events.push({ type: SyncEventType.LocalUpdate, documentId: (pendingDocumentId ?? 
documentId)!, path, - originalPath: path + originalPath: path, + isUserRename }); this.notifyPendingUpdateCountChanged(); @@ -293,20 +329,54 @@ export class SyncEventQueue { * If the document is already tracked under a different path (e.g. after a * rename) the old entry is removed so the map stays keyed by the latest * disk path and `getDocumentByDocumentId` can't return a stale match. + * + * Whenever this relocates a tracked doc it also rewrites the `path` + * field of every queued `LocalUpdate` for the same doc. The invariant + * the queue relies on — and that `skipIfOversized` and the watcher + * dedup checks bake in — is that `event.path` always points at the + * doc's current disk location. Letting the map move out from under + * the events would leave readers like `getFileSize(event.path)` + * pointing at a vacated slot and silently swallowing the event. */ public async setDocument( path: RelativePath, record: DocumentRecord ): Promise { + // If a record for the same docId is already tracked, mutate it in + // place instead of inserting a fresh object. Callers (drain + // handlers, queued events) hold long-lived references to the + // record and read `.path` from it on every access — replacing the + // reference would orphan those reads at the old object's path + // value. Keeping the same object identity also keeps the + // `documents.get(record.path) === record` invariant trivially + // true after a rename. 
+ let target: DocumentRecord | undefined; for (const [existingPath, existingRecord] of this.documents) { - if ( - existingPath !== path && - existingRecord.documentId === record.documentId - ) { - this.documents.delete(existingPath); + if (existingRecord.documentId === record.documentId) { + target = existingRecord; + if (existingPath !== path) { + this.documents.delete(existingPath); + } + } + } + if (target === undefined) { + target = { ...record, path }; + } else { + target.path = path; + target.intendedPath = record.intendedPath; + target.parentVersionId = record.parentVersionId; + target.remoteHash = record.remoteHash; + target.remoteRelativePath = record.remoteRelativePath; + } + this.documents.set(path, target); + for (const e of this.events) { + if ( + e.type === SyncEventType.LocalUpdate && + e.documentId === record.documentId + ) { + e.path = path; } } - this.documents.set(path, record); return this.save(); } @@ -317,16 +387,16 @@ export class SyncEventQueue { public getDocumentByDocumentId( target: DocumentId - ): DocumentWithPath | undefined { - for (const [path, record] of this.documents) { + ): DocumentRecord | undefined { + for (const record of this.documents.values()) { if (record.documentId === target) { - return { path, record }; + return record; } } return undefined; } - public getDocumentByDocumentIdOrFail(target: DocumentId): DocumentWithPath { + public getDocumentByDocumentIdOrFail(target: DocumentId): DocumentRecord { const result = this.getDocumentByDocumentId(target); if (!result) { throw new Error(`No document found with id ${target}`); @@ -336,12 +406,7 @@ export class SyncEventQueue { public async save(): Promise { return this.saveData({ - documents: Array.from(this.documents.entries()).map( - ([relativePath, record]) => ({ - relativePath, - ...record - }) - ), + documents: Array.from(this.documents.values()), lastSeenUpdateId: this.lastSeenUpdateId }); } @@ -387,8 +452,6 @@ export class SyncEventQueue { ); } - - public async 
clearAllState(): Promise { this.clearPending(); this.documents.clear(); diff --git a/frontend/sync-client/src/sync-operations/syncer.ts b/frontend/sync-client/src/sync-operations/syncer.ts index 9ccc76c8..0ee17278 100644 --- a/frontend/sync-client/src/sync-operations/syncer.ts +++ b/frontend/sync-client/src/sync-operations/syncer.ts @@ -14,6 +14,7 @@ import { type FileOperations } from "../file-operations/file-operations"; import { scheduleOfflineChanges } from "./offline-change-detector"; +import { CONFLICT_PATH_REGEX } from "./conflict-path"; import { SyncResetError } from "../errors/sync-reset-error"; import type { DocumentVersionWithoutContent } from "../services/types/DocumentVersionWithoutContent"; import type { WebSocketVaultUpdate } from "../services/types/WebSocketVaultUpdate"; @@ -422,7 +423,6 @@ export class Syncer { }); await this.handleMaybeMergingResponse({ - path: event.path, response, contentHash, originalContentBytes: contentBytes, @@ -473,8 +473,8 @@ export class Syncer { ): Promise { const documentId = await event.documentId; - const tracked = this.queue.getDocumentByDocumentId(documentId); - if (tracked === undefined) { + const record = this.queue.getDocumentByDocumentId(documentId); + if (record === undefined) { // The doc was deleted between this event being queued and // drained — skip silently. Common when a LocalDelete drains // ahead of a LocalUpdate that was already in the queue. @@ -483,9 +483,13 @@ export class Syncer { ); return; } - const { path: diskPath, record } = tracked; - const contentBytes = await this.operations.read(diskPath); + // Read `record.path` (not a captured local) on every access. The + // queue mutates `record.path` in place when a user rename arrives + // mid-roundtrip, so re-reading from the live record keeps the + // path current; capturing into a local variable would freeze it + // at function entry and then write/read against a vacated slot. 
+ const contentBytes = await this.operations.read(record.path); const contentHash = await hash(contentBytes); // For a user-driven rename the user's intent is `event.originalPath` @@ -506,7 +510,7 @@ export class Syncer { if (!hashChanged && !pathChanged) { this.logger.debug( - `File hash of ${diskPath} matches last synced version; no need to sync` + `File hash of ${record.path} matches last synced version; no need to sync` ); return; } @@ -518,7 +522,7 @@ export class Syncer { }); if (response.isDeleted) { - await this.processRemoteDelete(diskPath, { + await this.processRemoteDelete(record.path, { ...response, contentSize: 0, isNewFile: false @@ -527,7 +531,8 @@ export class Syncer { } await this.handleMaybeMergingResponse({ - path: diskPath, + record, + pathBeforeRoundtrip: record.path, response, contentHash, originalContentBytes: contentBytes @@ -538,7 +543,7 @@ export class Syncer { status: SyncStatus.SUCCESS, details: { type: SyncType.UPDATE, - relativePath: diskPath + relativePath: record.path }, message: isMerge ? "Updated file and merged with remote changes" @@ -549,39 +554,62 @@ export class Syncer { } private async handleMaybeMergingResponse({ - path, + record, + pathBeforeRoundtrip, response, contentHash, originalContentBytes, createEvent }: { - path: RelativePath; + // Live record reference for a LocalUpdate flow. Path reads go + // through `record.path` so a user-rename mid-roundtrip is seen + // on every access. + record?: DocumentRecord; + // Snapshot of `record.path` captured before `sendUpdate` + // awaited. Compared against the live `record.path` after the + // roundtrip to decide whether a user rename happened in + // between. 
+ pathBeforeRoundtrip?: RelativePath; response: DocumentUpdateResponse; contentHash: string; originalContentBytes: Uint8Array; // When processing a Create, pass the originating event so its - // `resolvers` promise can be fulfilled (or rejected, on a deleted - // response) + // `resolvers` promise can be fulfilled (or rejected, on a + // deleted response). The create flow reads the live disk path + // off `createEvent.path` (mutated by + // `updatePendingCreatePath` on a user rename). createEvent?: Extract; }): Promise { - const record = { + const newRecord = { documentId: response.documentId, parentVersionId: response.vaultUpdateId, remoteRelativePath: response.relativePath }; let remoteHash: string; + // The two flows see rename retargeting through different live + // objects: + // - LocalUpdate: `record.path` is mutated in place by + // `queue.enqueue`'s rename branch and `setDocument`. + // - LocalCreate: the doc isn't tracked yet (no + // `resolveCreate` has run); the rename retargets + // `createEvent.path` via `updatePendingCreatePath`. + // In both cases reading the live property at write time keeps + // the merged bytes from being written to a vacated path. + const writePath = + createEvent !== undefined ? 
createEvent.path : record!.path; + if ("type" in response && response.type === "MergingUpdate") { const responseBytes = base64ToBytes(response.contentBase64); await this.operations.write( - path, + writePath, originalContentBytes, responseBytes ); remoteHash = await hash(responseBytes); - await this.updateCache(response.vaultUpdateId, responseBytes, path); + await this.updateCache(response.vaultUpdateId, responseBytes, writePath); } else { // Fast-forward update: no merge needed remoteHash = contentHash; @@ -589,73 +617,169 @@ export class Syncer { await this.updateCache( response.vaultUpdateId, originalContentBytes, - path + writePath ); } if (createEvent === undefined) { - // The disk path captured at the start of `processLocalUpdate` - // can be stale: the user may have renamed the file during the - // server roundtrip, in which case `queue.documents` already - // points at the new path and a follow-up rename's LocalUpdate - // is queued behind us. If we forced the disk back to - // `response.relativePath` here we'd undo the user's intent; - // worse, `setDocument`'s same-docId cleanup would clobber the - // map entry that was tracking the latest disk path, leaving - // future LocalUpdates for this doc reading from a vacated - // slot and getting skipped as `FileNotFoundError`. Refresh - // the latest tracked path and only touch disk when it still - // matches the captured one. - const tracked = this.queue.getDocumentByDocumentId( - response.documentId - ); - if (tracked === undefined) { - this.logger.debug( - `Document ${response.documentId} is no longer tracked after update; cannot reconcile potential rename` + if (record === undefined || pathBeforeRoundtrip === undefined) { + throw new Error( + "Unreachable: LocalUpdate flow must pass `record` and `pathBeforeRoundtrip`" ); + } + // `record.path` is the *live* path. 
If a user rename ran + // during the roundtrip, `queue.enqueue` mutated it (and the + // queued LocalUpdate event's `path` field) to the user's + // new target; otherwise it still equals + // `pathBeforeRoundtrip`. + const currentPath = record.path; + if (currentPath === pathBeforeRoundtrip) { + // Move our local file onto the server-assigned path. + // `MoveOnConflict.NEW` means "if the target is taken + // locally by some other doc, route ours to a + // `conflict--` path instead of evicting them". + // We then record `intendedPath = response.relativePath` + // so future server-bound requests for this doc reference + // the path the server actually has it at, not the local + // conflict-uuid path. The other doc keeps its slot; + // local convergence is left to manual user resolution. + const moveResult = await this.operations.move( + currentPath, + response.relativePath, + MoveOnConflict.NEW + ); + this.queue.updatePendingCreatePath(currentPath, moveResult.actualPath); + await this.queue.setDocument(moveResult.actualPath, { + ...newRecord, + path: moveResult.actualPath, + intendedPath: + moveResult.actualPath === response.relativePath + ? undefined + : response.relativePath, + remoteHash + }); } else { - const currentPath = tracked.path; - if (currentPath === path) { - // a http response will always be more up-to-date than any queued remote update - // move will always move to the relative path when MoveOnConflict.EXISTING is given - await this.operations.move( - currentPath, - response.relativePath, - MoveOnConflict.EXISTING - ); - this.queue.updatePendingCreatePath(currentPath, response.relativePath); - await this.queue.setDocument(response.relativePath, { - ...record, - remoteHash - }); - } else { - // User renamed during the roundtrip. Leave the disk file - // at `currentPath`; the queued rename's LocalUpdate will - // reconcile the server on its next drain. 
- await this.queue.setDocument(currentPath, { - ...record, - remoteHash - }); - } + // User renamed during the roundtrip. Leave the disk file + // at `currentPath`; the queued rename's LocalUpdate will + // reconcile the server on its next drain. + await this.queue.setDocument(currentPath, { + ...newRecord, + path: currentPath, + remoteHash + }); } } else { - // The server may have deconflicted the path on create (e.g. - // another client raced us to the same path and won). Move the - // local file to match the server-assigned path so the queue's - // disk-path key, the on-disk path, and `remoteRelativePath` stay - // consistent. Without this, a later remote create at the - // originally-requested path would see a phantom local conflict - // and stash the new file under a `conflict--` path. - if (response.relativePath !== createEvent.originalPath) { - await this.operations.move( + // Displacement-merge: while this LocalCreate sat in the queue, a + // RemoteCreate for `originalPath` was processed first, displaced + // our local file to a `conflict-…` path, and tracked the remote + // doc at `originalPath`. The server then de-duplicated our + // create into that already-tracked doc and returned its id. + // Relocate the just-merged content from the conflict path to + // the existing tracked path (overwriting the older content the + // displacement wrote there) and drop the conflict file. + // + // Falling through to `resolveCreate(createEvent, ...)` would + // call `setDocument(conflict-…, D)`, whose same-docId cleanup + // strips D's tracking from `originalPath` and leaves the file + // there orphaned on disk. + const existing = this.queue.getDocumentByDocumentId( + response.documentId + ); + if ( + existing !== undefined && + existing.path === response.relativePath && + existing.path !== createEvent.path + ) { + // The merged content already lives at `createEvent.path` + // (the MergingUpdate branch above wrote it there). 
Slot + // it into `response.relativePath` by deleting D's stale + // content there and renaming the conflict file in. We + // can't `operations.write` the merged bytes onto the + // existing path: that runs a 3-way merge against the + // stale content as if it were a concurrent edit, which + // strips out the very content the server just merged. + await this.operations.delete(response.relativePath); + // We just deleted `response.relativePath`. With + // `MoveOnConflict.NEW` a stray racing occupant would + // route our file to a `conflict--` path; we'd + // then track the doc there with `intendedPath` set. + const moveResult = await this.operations.move( createEvent.path, response.relativePath, - MoveOnConflict.EXISTING + MoveOnConflict.NEW ); - this.queue.updatePendingCreatePath(createEvent.path, response.relativePath); + await this.queue.setDocument(moveResult.actualPath, { + ...newRecord, + path: moveResult.actualPath, + intendedPath: + moveResult.actualPath === response.relativePath + ? undefined + : response.relativePath, + remoteHash + }); + this.queue.consumeEvent(createEvent); + createEvent.resolvers.resolve(newRecord.documentId); + this.queue.lastSeenUpdateId = response.vaultUpdateId; + return; + } + // Reconcile disk and tracking with the server-assigned path. + // Two cases produce a mismatch: + // 1. Server deconflicted (e.g. another client raced us): we + // know because `response.relativePath !== createEvent.originalPath`. + // Move the local file to the server-assigned path, otherwise + // a later remote create at our original path would see a + // phantom local conflict and stash the new file under + // `conflict--`. + // 2. The create's local file was displaced to a `conflict-…` + // path while it sat in the queue, but the server still + // placed the doc at our original path (e.g. the existing + // doc that forced the displacement was meanwhile deleted, + // so the server-side merge / deconflict path didn't + // fire). 
Move the conflict file onto the original path + // so `resolveCreate` tracks the doc at the path the + // server returned, instead of the displaced conflict + // path which would orphan the file. + // + // We must NOT move when `createEvent.path` differs from + // `originalPath` because of a *user rename* of the pending + // create (e.g. write A.md, rename to B.md): there the user's + // intent is the renamed path, the server places the doc at + // `originalPath`, and the queued `LocalUpdate` from the + // watcher will replay the rename to the server. + let resolvedPath = createEvent.path; + let resolvedIntendedPath: RelativePath | undefined; + if (response.relativePath !== createEvent.originalPath) { + const moveResult = await this.operations.move( + createEvent.path, + response.relativePath, + MoveOnConflict.NEW + ); + this.queue.updatePendingCreatePath(createEvent.path, moveResult.actualPath); + resolvedPath = moveResult.actualPath; + resolvedIntendedPath = + moveResult.actualPath === response.relativePath + ? undefined + : response.relativePath; + } else if ( + createEvent.path !== response.relativePath && + CONFLICT_PATH_REGEX.test(createEvent.path) + ) { + const moveResult = await this.operations.move( + createEvent.path, + response.relativePath, + MoveOnConflict.NEW + ); + this.queue.updatePendingCreatePath(createEvent.path, moveResult.actualPath); + resolvedPath = moveResult.actualPath; + resolvedIntendedPath = + moveResult.actualPath === response.relativePath + ? 
undefined + : response.relativePath; } await this.queue.resolveCreate(createEvent, { - ...record, + ...newRecord, + path: resolvedPath, + intendedPath: resolvedIntendedPath, remoteHash }); } @@ -667,12 +791,12 @@ export class Syncer { event: Extract ): Promise { const { remoteVersion } = event; - const documentWithPath = this.queue.getDocumentByDocumentId( + const trackedRecord = this.queue.getDocumentByDocumentId( remoteVersion.documentId ); if (remoteVersion.isDeleted) { - if (documentWithPath === undefined) { + if (trackedRecord === undefined) { // The doc isn't tracked locally — either we never had // it (joined the vault after the delete) or a previous // delete already cleaned it up. Just advance @@ -682,13 +806,13 @@ export class Syncer { return; } return this.processRemoteDelete( - documentWithPath.path, + trackedRecord.path, remoteVersion ); } if ( - (documentWithPath?.record.parentVersionId ?? 0) >= + (trackedRecord?.parentVersionId ?? 0) >= remoteVersion.vaultUpdateId ) { this.queue.lastSeenUpdateId = remoteVersion.vaultUpdateId; @@ -698,7 +822,7 @@ export class Syncer { return; } - if (documentWithPath !== undefined) { + if (trackedRecord !== undefined) { // The doc is tracked. If the local file backing it has // gone missing — e.g. the user deleted it and the // LocalDelete hasn't drained yet, or our HTTP DELETE just @@ -706,20 +830,16 @@ export class Syncer { // — ignore the update. Otherwise we'd try to operate on a // vanished file (or recreate one we're tearing down). 
const fileExists = await this.operations.exists( - documentWithPath.path + trackedRecord.path ); if (!fileExists) { this.queue.lastSeenUpdateId = remoteVersion.vaultUpdateId; this.logger.debug( - `Ignoring remote update for ${remoteVersion.documentId}: local file at ${documentWithPath.path} is missing` + `Ignoring remote update for ${remoteVersion.documentId}: local file at ${trackedRecord.path} is missing` ); return; } - return this.processRemoteUpdate( - documentWithPath.path, - documentWithPath.record, - remoteVersion - ); + return this.processRemoteUpdate(trackedRecord, remoteVersion); } if (!remoteVersion.isNewFile) { @@ -756,21 +876,33 @@ export class Syncer { } private async processRemoteUpdate( - path: RelativePath, record: DocumentRecord, remoteVersion: DocumentVersionWithoutContent ): Promise { - // wait for a local edit to do the actual updating here, so we can't even update the lastSeenUpdateId here - const conflictingDoc = this.queue.getSettledDocumentByPath( - remoteVersion.relativePath - ); - const actualPath = await this.operations.move( - path, + // Snapshot the doc's path before any await: the post-write + // history entry needs the "before" value to compose a + // `renamed remotely from X to Y` line. All other path reads + // below go through `record.path`, which `setDocument` and the + // queue's rename branch mutate in place, so any concurrent + // user rename is reflected on every access. + const pathBeforeRoundtrip = record.path; + const moveResult = await this.operations.move( + record.path, remoteVersion.relativePath, - (conflictingDoc?.parentVersionId ?? 0) < remoteVersion.vaultUpdateId - ? MoveOnConflict.EXISTING - : MoveOnConflict.NEW + // Never evict a different doc to make room for the remote + // rename target — if the slot is taken locally our file + // routes to a `conflict--` path and we record the + // server-side intent on the record. 
Convergence at the + // local level is left to manual user resolution; server + // state stays consistent because all server-bound requests + // route through `intendedPath`. + MoveOnConflict.NEW ); + const { actualPath } = moveResult; + const intendedPath = + actualPath === remoteVersion.relativePath + ? undefined + : remoteVersion.relativePath; if ( !this.queue.hasPendingLocalEventsForDocumentId( remoteVersion.documentId @@ -799,8 +931,10 @@ export class Syncer { ); await this.queue.setDocument(actualPath, { ...record, + path: actualPath, + intendedPath, parentVersionId: remoteVersion.vaultUpdateId, - remoteRelativePath: actualPath, + remoteRelativePath: remoteVersion.relativePath, remoteHash: await hash(remoteContent) }); this.queue.lastSeenUpdateId = remoteVersion.vaultUpdateId; @@ -811,24 +945,29 @@ export class Syncer { document: remoteVersion }); - - - await this.queue.setDocument(actualPath, { + // `record.path` is live: if a user rename's `queue.enqueue` + // ran during the `operations.move` await, the queue mutated + // `record.path` to the user's new target. Reading it now + // gives the latest disk location, so `setDocument` doesn't + // clobber the rename's map entry the way passing the + // pre-await `actualPath` would. 
+ await this.queue.setDocument(record.path, { ...record, - remoteRelativePath: actualPath + intendedPath, + remoteRelativePath: remoteVersion.relativePath }); } - if (actualPath !== path) { + if (actualPath !== pathBeforeRoundtrip) { this.history.addHistoryEntry({ status: SyncStatus.SUCCESS, details: { type: SyncType.MOVE, relativePath: actualPath, - movedFrom: path + movedFrom: pathBeforeRoundtrip }, - message: `File was renamed remotely from ${path} to ${actualPath}`, + message: `File was renamed remotely from ${pathBeforeRoundtrip} to ${actualPath}`, author: remoteVersion.userId, timestamp: new Date(remoteVersion.updatedDate) }); @@ -854,17 +993,19 @@ export class Syncer { vaultUpdateId: remoteVersion.vaultUpdateId }); - const conflictingDoc = this.queue.getSettledDocumentByPath( - remoteVersion.relativePath - ); - - const actualPath = await this.operations.create( + const createResult = await this.operations.create( remoteVersion.relativePath, remoteContent, - (conflictingDoc?.parentVersionId ?? 0) < remoteVersion.vaultUpdateId - ? MoveOnConflict.EXISTING - : MoveOnConflict.NEW + // Never evict a local file occupying the path the server has + // this remote create at — stash the new file at a + // `conflict--` path instead and record `intendedPath`. + MoveOnConflict.NEW ); + const { actualPath } = createResult; + const intendedPath = + actualPath === remoteVersion.relativePath + ? 
undefined + : remoteVersion.relativePath; await this.updateCache( remoteVersion.vaultUpdateId, @@ -874,6 +1015,8 @@ export class Syncer { const contentHash = await hash(remoteContent); await this.queue.setDocument(actualPath, { + path: actualPath, + intendedPath, documentId: remoteVersion.documentId, parentVersionId: remoteVersion.vaultUpdateId, remoteHash: contentHash, diff --git a/frontend/sync-client/src/sync-operations/types.ts b/frontend/sync-client/src/sync-operations/types.ts index 80c74238..1d5e11dc 100644 --- a/frontend/sync-client/src/sync-operations/types.ts +++ b/frontend/sync-client/src/sync-operations/types.ts @@ -5,23 +5,31 @@ export type DocumentId = string; export type RelativePath = string; export interface DocumentRecord { + // The doc's current local disk path. The queue's `documents` map is + // keyed by this same string and the invariant `documents.get(record.path) + // === record` is held by every queue mutation. Stored as a field on the + // record (not just as the map key) so callers can hold a stable + // reference to the record and read `.path` for the live value rather + // than capturing a string into a local variable that goes stale on the + // next rename. + path: RelativePath; + // Set when the doc's local file lives at a `conflict-<uuid>-` path + // because an earlier remote create / remote rename couldn't claim the + // path the server has it at (it was occupied locally at the time). + // Server-bound requests for this doc must use `intendedPath` rather + // than `path`, otherwise the server would learn about the local + // conflict-uuid path and propagate it as the doc's canonical location + // to every other client. `undefined` for docs whose local path matches + // the server's view.
+ intendedPath?: RelativePath; documentId: DocumentId; parentVersionId: VaultUpdateId; remoteHash: string; remoteRelativePath: RelativePath; } -export interface DocumentWithPath { - path: RelativePath; - record: DocumentRecord; -} - -interface StoredDocument extends DocumentRecord { - relativePath: RelativePath; -} - export interface StoredSyncState { - documents: StoredDocument[] | undefined; + documents: DocumentRecord[] | undefined; lastSeenUpdateId: VaultUpdateId | undefined; } diff --git a/frontend/sync-client/src/utils/find-matching-file.ts b/frontend/sync-client/src/utils/find-matching-file.ts index f5d65b39..1f42d1a0 100644 --- a/frontend/sync-client/src/utils/find-matching-file.ts +++ b/frontend/sync-client/src/utils/find-matching-file.ts @@ -1,18 +1,14 @@ -import type { - DocumentRecord, - DocumentWithPath, - RelativePath -} from "../sync-operations/types"; +import type { DocumentRecord } from "../sync-operations/types"; import { EMPTY_HASH } from "./hash"; // TODO: make this smarter so that offline files can be renamed & edited at the same time export async function findMatchingFile( contentHash: string, - candidates: { path: RelativePath; record: DocumentRecord }[] -): Promise { + candidates: DocumentRecord[] +): Promise { if (contentHash === (await EMPTY_HASH)) { return undefined; } - return candidates.find(({ record }) => record.remoteHash === contentHash); + return candidates.find((record) => record.remoteHash === contentHash); } diff --git a/sync-server/config-e2e.yml b/sync-server/config-e2e.yml index 242f69ef..03b860b7 100644 --- a/sync-server/config-e2e.yml +++ b/sync-server/config-e2e.yml @@ -7,6 +7,7 @@ server: port: 3010 max_body_size_mb: 512 max_clients_per_vault: 256 + max_pending_websocket_connections: 4096 broadcast_channel_capacity: 1024 response_timeout: 30m mergeable_file_extensions: diff --git a/sync-server/src/app_state/database.rs b/sync-server/src/app_state/database.rs index a3273724..87afebe1 100644 --- 
a/sync-server/src/app_state/database.rs +++ b/sync-server/src/app_state/database.rs @@ -529,7 +529,17 @@ impl Database { user_id: row.user_id, device_id: row.device_id, content_size: row.content_size.unwrap_or(0), - is_new_file: row.creation_vault_update_id == row.vault_update_id, + // For catch-up streams, "new file" means "new to this + // recipient" — the doc was created past the recipient's + // watermark. The catch-up only carries the doc's + // *latest* version (not its full history), so using + // `creation == latest` instead would mis-flag every + // doc that was created and then updated before the + // client reconnected, and the client's + // `processRemoteChange` would drop it as "stale + // RemoteChange for untracked, non-new document", + // silently leaking docs to clients catching up. + is_new_file: row.creation_vault_update_id > vault_update_id, }) .collect() })