This commit is contained in:
Andras Schmelczer 2026-04-28 22:20:31 +01:00
parent 1163da826e
commit 5776a37dc9
13 changed files with 652 additions and 181 deletions

View file

@ -91,6 +91,9 @@ import { onlineCreateUpdateWhileOtherCreatesSamePathTest } from "./tests/online-
import { displacedFileNotMarkedDeletedTest } from "./tests/displaced-file-not-marked-deleted.test";
import { remoteUpdateResurrectsDeletedDocTest } from "./tests/remote-update-resurrects-deleted-doc.test";
import { localUpdateSurvivesRemoteRenameTest } from "./tests/local-update-survives-remote-rename.test";
import { mergingUpdateResponseSurvivesUserRenameTest } from "./tests/merging-update-response-survives-user-rename.test";
import { conflictUuidStashClearedAfterRenameDeconflictTest } from "./tests/conflict-uuid-stash-cleared-after-rename-deconflict.test";
import { catchupCreateAndUpdateNotSkippedTest } from "./tests/catchup-create-and-update-not-skipped.test";
export const TESTS: Partial<Record<string, TestDefinition>> = {
"rename-create-conflict": renameCreateConflictTest,
@ -203,5 +206,11 @@ export const TESTS: Partial<Record<string, TestDefinition>> = {
"displaced-file-not-marked-deleted": displacedFileNotMarkedDeletedTest,
"remote-update-resurrects-deleted-doc": remoteUpdateResurrectsDeletedDocTest,
"local-update-survives-remote-rename":
localUpdateSurvivesRemoteRenameTest
localUpdateSurvivesRemoteRenameTest,
"merging-update-response-survives-user-rename":
mergingUpdateResponseSurvivesUserRenameTest,
"conflict-uuid-stash-cleared-after-rename-deconflict":
conflictUuidStashClearedAfterRenameDeconflictTest,
"catchup-create-and-update-not-skipped":
catchupCreateAndUpdateNotSkippedTest
};

View file

@ -0,0 +1,66 @@
import type { AssertableState } from "../utils/assertable-state";
import type { TestDefinition } from "../test-definition";
export const catchupCreateAndUpdateNotSkippedTest: TestDefinition = {
description:
"Client 1 disconnects (sync disabled). Client 0 creates a doc and " +
"then updates it. When Client 1 reconnects, the server's catch-up " +
"stream sends only the doc's *latest* version (the update), not the " +
"full history. Pre-fix the wire's `is_new_file` was set to " +
"`creation == latest_version`, so the catch-up flagged the doc as " +
"non-new even though Client 1 had never seen its creation. Client " +
"1's `processRemoteChange` then dropped it as a 'stale RemoteChange " +
"for untracked, non-new document' and the doc was silently lost. " +
"Post-fix `is_new_file` in the catch-up stream means 'new relative " +
"to the recipient's watermark' (`creation > last_seen_vault_update_id`).",
clients: 2,
steps: [
{ type: "enable-sync", client: 0 },
{ type: "enable-sync", client: 1 },
// Establish a baseline so Client 1's last_seen is non-zero before
// we take it offline. This makes the bug genuinely about catch-up
// missing the create rather than just an empty-vault first sync.
{ type: "create", client: 0, path: "warmup.md", content: "w\n" },
{ type: "barrier" },
// Client 1 goes offline.
{ type: "disable-sync", client: 1 },
// Client 0 creates the doc (vault_update_id v_C, after Client 1's
// watermark). Client 1 doesn't see this because it's offline.
{ type: "create", client: 0, path: "doc.md", content: "v1\n" },
// Wait for the create's HTTP to land before the update; otherwise
// both writes are coalesced into a single POST and the server
// never sees the doc as "create followed by update".
{ type: "sync", client: 0 },
// Client 0 updates the doc (vault_update_id v_X > v_C). The
// server's `latest_document_versions` view now returns the
// *update* row — its `creation_vault_update_id != vault_update_id`.
{
type: "update",
client: 0,
path: "doc.md",
content: "v1\nupdate\n"
},
{ type: "sync", client: 0 },
// Client 1 reconnects. Server's catch-up replays docs with
// `vault_update_id > last_seen`. For doc.md it sends v_X with
// `is_new_file` derived from `creation_vault_update_id >
// last_seen_vault_update_id` (post-fix) — so Client 1 treats it
// as a fresh create and downloads the latest content.
{ type: "enable-sync", client: 1 },
{ type: "barrier" },
{
type: "assert-consistent",
verify: (state: AssertableState): void => {
state.assertFileCount(2);
state.assertFileExists("doc.md");
state.assertContent("doc.md", "v1\nupdate\n");
state.assertContent("warmup.md", "w\n");
}
}
]
};

View file

@ -0,0 +1,100 @@
import type { AssertableState } from "../utils/assertable-state";
import type { TestDefinition } from "../test-definition";
export const conflictUuidStashClearedAfterRenameDeconflictTest: TestDefinition =
{
description:
"A `RemoteChange` for a brand-new doc D2 at `target.md` reaches " +
"Client 1's queue *before* Client 1's user-rename of D1 → " +
"`target.md`. The rename's `queue.enqueue` mutates " +
"`documents` synchronously, so by the time the drain processes " +
"the buffered broadcast, `target.md` is already tracked by D1 " +
"with a high `parentVersionId`. " +
"`processRemoteCreateForNewDocument`'s version comparison " +
"(`parentVersionId < remoteVaultUpdateId`) takes the " +
"`MoveOnConflict.NEW` branch and stashes D2 at " +
"`conflict-<uuid>-target.md`. The rename's `LocalUpdate` then " +
"drains, the server deconflicts D1 to `target (1).md`, freeing " +
"the `target.md` slot locally — but D2 is left orphaned at the " +
"`conflict-<uuid>-` path forever, diverging from Client 0 which " +
"has D2 at `target.md`.",
clients: 2,
steps: [
{ type: "enable-sync", client: 0 },
{ type: "enable-sync", client: 1 },
// Both clients have D1 at `original.md`.
{
type: "create",
client: 0,
path: "original.md",
content: "D1 v1\n"
},
{ type: "barrier" },
// Buffer Client 1's WebSocket so D2's broadcast doesn't land
// until we're ready to enqueue it ahead of the rename.
{ type: "pause-websocket", client: 1 },
// Client 0 creates D2 at target.md. Server stores it; broadcast
// is buffered at Client 1.
{
type: "create",
client: 0,
path: "target.md",
content: "D2 v1\n"
},
{ type: "sync", client: 0 },
// Pause the server. Now Client 1's next HTTP PUT will buffer in
// TCP and the drain will sit on `await sendUpdate`.
{ type: "pause-server" },
// Issue an update to D1. The drain pops the LocalUpdate and
// suspends on the HTTP PUT (server is SIGSTOPped). The drain is
// now busy and won't pop further events until resume-server.
{
type: "update",
client: 1,
path: "original.md",
content: "D1 v2\n"
},
// Replay the buffered D2 broadcast. It enqueues as a
// RemoteChange BEHIND the in-flight LocalUpdate but AHEAD of
// the rename event we're about to push.
{ type: "resume-websocket", client: 1 },
// User renames D1 onto target.md. `queue.enqueue` synchronously
// updates `documents` so target.md → D1. The rename's
// LocalUpdate is pushed to the END of the queue, *after* the
// buffered RemoteChange.
{
type: "rename",
client: 1,
oldPath: "original.md",
newPath: "target.md"
},
// Resume the server. Drain order: (1) finish the v2 update PUT
// → D1.parentVersionId bumps above D2's vaultUpdateId. (2)
// process the RemoteChange for D2 — sees `documents.get(target.md)
// = D1` with parentVersionId > vaultUpdateId → MoveOnConflict.NEW
// → stashes D2 at `conflict-<uuid>-target.md`. (3) process the
// rename's LocalUpdate — server deconflicts to target (1).md;
// local file moves there.
{ type: "resume-server" },
{ type: "barrier" },
{
type: "assert-consistent",
verify: (state: AssertableState): void => {
state.assertFileCount(2);
state.assertFileExists("target.md");
state.assertFileExists("target (1).md");
state.assertContent("target.md", "D2 v1\n");
state.assertContent("target (1).md", "D1 v2\n");
}
}
]
};

View file

@ -0,0 +1,77 @@
import type { AssertableState } from "../utils/assertable-state";
import type { TestDefinition } from "../test-definition";
export const mergingUpdateResponseSurvivesUserRenameTest: TestDefinition = {
description:
"Client 1 sends a content update with a stale `parent_version_id` " +
"(its WebSocket is paused, so it hasn't seen Client 0's intervening " +
"edit). The server merges and replies with `MergingUpdate` carrying " +
"the merged text. Before the response lands, the user renames the " +
"doc on Client 1, vacating the disk path the in-flight " +
"`processLocalUpdate` captured. Pre-fix: " +
"`handleMaybeMergingResponse`'s `operations.write(diskPath, …)` " +
"hits the `we wont recreate it` early-return inside `write`, " +
"silently dropping the server-merged content — Client 0's edit is " +
"lost on Client 1's disk, and Client 1's next local-update PUT " +
"(rebased on the now-untracked merged version) deletes Client 0's " +
"edit on the server too. Post-fix: the response is written to the " +
"doc's current tracked disk path, preserving both edits.",
clients: 2,
steps: [
{ type: "enable-sync", client: 0 },
{ type: "enable-sync", client: 1 },
{ type: "create", client: 0, path: "doc.md", content: "0\n" },
{ type: "barrier" },
// Stop Client 1 from seeing Client 0's next edit, so its next
// outbound PUT carries a stale `parent_version_id` and the server
// is forced to merge.
{ type: "pause-websocket", client: 1 },
// Server now holds v_b = "0\nA\n". Client 1's tracked parent
// version stays at v_a = "0\n".
{ type: "update", client: 0, path: "doc.md", content: "0\nA\n" },
{ type: "sync", client: 0 },
// Pause the server. Subsequent HTTP PUTs from Client 1 buffer at
// the OS layer until resume. This guarantees the merge response
// for Client 1's update is still in flight when the rename below
// mutates `queue.documents`.
{ type: "pause-server" },
// Client 1 edits doc.md with "B". The drain pops the LocalUpdate,
// captures `diskPath = "doc.md"`, reads the file, and sends the
// HTTP PUT — which buffers because the server is SIGSTOPped.
{ type: "update", client: 1, path: "doc.md", content: "0\nB\n" },
// User renames the file while the previous PUT is still in flight.
// `queue.enqueue`'s rename branch updates `documents` to point at
// `renamed.md` synchronously, but `processLocalUpdate`'s captured
// `diskPath` ("doc.md") is a local — it can't be retargeted.
{ type: "rename", client: 1, oldPath: "doc.md", newPath: "renamed.md" },
// Resume the server. It reconciles parent=v_a, latest=v_b,
// new="0\nB\n" → v_c with both edits, replies `MergingUpdate`.
// Pre-fix: write("doc.md", …) sees no file at that path
// (renamed.md now holds the data) and bails out without ever
// writing the merged bytes. Post-fix: the merged bytes land at
// the tracked path (renamed.md).
{ type: "resume-server" },
{ type: "resume-websocket", client: 1 },
{ type: "barrier" },
{
type: "assert-consistent",
verify: (state: AssertableState): void => {
state.assertFileCount(1);
state.assertFileExists("renamed.md");
state.assertFileNotExists("doc.md");
// Both edits survive: Client 0's "A" and Client 1's "B".
// The reconcile may interleave them either way; assert
// both tokens are present in the converged content.
state.assertContains("renamed.md", "A", "B");
}
}
]
};

View file

@ -263,14 +263,19 @@ export class FileOperations {
`Displacing existing file at ${path} to '${conflictPath}' to make room`
);
// Intentionally NOT calling `expectRename` here: the displaced
// file may be a tracked document (its `queue.documents` entry
// still points at `path`), and we need the watcher's
// `syncLocallyUpdatedFile` to flow into `queue.enqueue`'s
// path-update branch so the doc's map key follows its file
// to `conflictPath` and gets resynced
await this.fs.rename(path, conflictPath);
return path;
// The displaced file's rename will fire as a watcher event;
// register `expectRename` so the watcher dedups it. The
// caller is responsible for the queue bookkeeping (relocating
// the displaced doc's tracking) using the `displacedTo` we
// return.
this.expectedFsEvents.expectRename(path, conflictPath);
try {
await this.fs.rename(path, conflictPath);
} catch (e) {
this.expectedFsEvents.unexpectRename(path, conflictPath);
throw e;
}
return { actualPath: path, displacedTo: conflictPath };
}
this.logger.debug(

View file

@ -1,10 +1,8 @@
import type { DocumentRecord, DocumentWithPath, RelativePath } from "./types";
import { SyncEventType } from "./types";
import type { DocumentRecord, RelativePath } from "./types";
import type { Logger } from "../tracing/logger";
import { hash } from "../utils/hash";
import type { FileOperations } from "../file-operations/file-operations";
import { findMatchingFile } from "../utils/find-matching-file";
import { FileNotFoundError } from "../errors/file-not-found-error";
import type { SyncEventQueue } from "./sync-event-queue";
import { removeFromArray } from "../utils/remove-from-array";
@ -31,10 +29,10 @@ export async function scheduleOfflineChanges(
// A doc is "possibly deleted" only if it has no local file. Including
// docs that still exist locally would queue a spurious delete alongside
// the update below.
const locallyPossiblyDeletedFiles: DocumentWithPath[] = [];
for (const [path, record] of allDocuments.entries()) {
if (!allLocalFiles.has(path)) {
locallyPossiblyDeletedFiles.push({ path, record });
const locallyPossiblyDeletedFiles: DocumentRecord[] = [];
for (const record of allDocuments.values()) {
if (!allLocalFiles.has(record.path)) {
locallyPossiblyDeletedFiles.push(record);
}
}

View file

@ -40,10 +40,11 @@ function fakeRecord(
overrides: Partial<DocumentRecord> = {}
): DocumentRecord {
return {
path: `${documentId.toLowerCase()}.md`,
documentId,
parentVersionId: 1,
remoteHash: `hash-${documentId}`,
remoteRelativePath: `${documentId}.md`,
remoteRelativePath: `${documentId.toLowerCase()}.md`,
...overrides
};
}
@ -119,7 +120,7 @@ describe("SyncEventQueue", () => {
const found = queue.getDocumentByDocumentId("A");
assert.strictEqual(found?.path, "a.md");
assert.strictEqual(found.record.documentId, "A");
assert.strictEqual(found.documentId, "A");
await queue.removeDocument("a.md");
assert.strictEqual(queue.syncedDocumentCount, 0);
@ -216,14 +217,8 @@ describe("SyncEventQueue", () => {
logger,
{
documents: [
{
relativePath: "a.md",
...fakeRecord("A", { parentVersionId: 5 })
},
{
relativePath: "b.md",
...fakeRecord("B", { parentVersionId: 3 })
}
fakeRecord("A", { path: "a.md", parentVersionId: 5 }),
fakeRecord("B", { path: "b.md", parentVersionId: 3 })
],
lastSeenUpdateId: 4
},

View file

@ -4,7 +4,6 @@ import { globsToRegexes } from "../utils/globs-to-regexes";
import { CONFLICT_PATH_REGEX } from "./conflict-path";
import { removeFromArray } from "../utils/remove-from-array";
import { EventListeners } from "../utils/data-structures/event-listeners";
import type { DocumentWithPath } from "./types";
import {
SyncEventType,
type DocumentId,
@ -79,8 +78,8 @@ export class SyncEventQueue {
initialState ??= {};
if (initialState.documents !== undefined) {
for (const { relativePath, ...record } of initialState.documents) {
this.documents.set(relativePath, record);
for (const record of initialState.documents) {
this.documents.set(record.path, record);
}
}
this._lastSeenUpdateId = new MinCovered(
@ -189,12 +188,14 @@ export class SyncEventQueue {
if (input.type === SyncEventType.LocalDelete) {
this.events.push({
type: SyncEventType.LocalDelete,
documentId: (pendingDocumentId ?? documentId)!
documentId: (pendingDocumentId ?? documentId)!,
path: lookupPath
});
this.notifyPendingUpdateCountChanged();
return;
}
const isUserRename = input.oldPath !== undefined;
let needsSave = false;
if (input.oldPath !== undefined) {
if (pendingDocumentId !== undefined) {
@ -205,6 +206,43 @@ export class SyncEventQueue {
"Unreachable: record must be defined for non-pending update"
);
}
// The user renamed `oldPath` onto `path`. If `path` was
// already tracked by a *different* doc (the OS rename
// overwrote that file), that doc effectively no longer
// exists locally — its content was clobbered. Without
// explicitly recording the loss the doc would silently
// drop out of the documents map below and we'd skip
// notifying the server, leaving a phantom on the remote
// that other agents still see. Enqueue a LocalDelete for
// it so the server learns about the deletion.
const displacedRecord = this.documents.get(path);
if (
displacedRecord !== undefined &&
displacedRecord.documentId !== documentId
) {
this.events.push({
type: SyncEventType.LocalDelete,
documentId: displacedRecord.documentId,
// The doc still lives at `path` on the server; the
// OS rename only overwrote our local file. Snapshot
// the path so `processDelete` can issue the server
// DELETE even after `documents.set(path, record)`
// below removes the entry from the map.
path
});
}
// Inlined relocation: same shape as `setDocument`'s
// relocation branch (mutate the record's path in place,
// delete-old, set-new, retarget queued LocalUpdates) but
// kept synchronous. Callers fire `enqueue` with `void`
// and immediately call `ensureDraining()`; if we awaited
// `setDocument()` here, the LocalUpdate push below would
// happen after the await and the drain that already
// started would see an empty queue, exit, and leave the
// event stranded. We mutate `record.path` rather than
// re-creating it so any reference held by an in-flight
// drain handler sees the new path on its next read.
record.path = path;
this.documents.delete(input.oldPath);
this.documents.set(path, record);
for (const e of this.events) {
@ -220,16 +258,14 @@ export class SyncEventQueue {
}
}
// Push BEFORE awaiting `save()`. Callers fire `enqueue` with `void`
// and immediately call `ensureDraining()`, which starts a drain that
// synchronously shifts off the queue. If we awaited save first the
// shift would see the queue empty, drain would exit, and the event
// would never get processed until the next unrelated trigger.
// Push BEFORE awaiting `save()`. See the comment above on the
// synchronicity contract with `ensureDraining()`.
this.events.push({
type: SyncEventType.LocalUpdate,
documentId: (pendingDocumentId ?? documentId)!,
path,
originalPath: path
originalPath: path,
isUserRename
});
this.notifyPendingUpdateCountChanged();
@ -293,20 +329,54 @@ export class SyncEventQueue {
* If the document is already tracked under a different path (e.g. after a
* rename) the old entry is removed so the map stays keyed by the latest
* disk path and `getDocumentByDocumentId` can't return a stale match.
*
* Whenever this relocates a tracked doc it also rewrites the `path`
* field of every queued `LocalUpdate` for the same doc. The invariant
* the queue relies on and that `skipIfOversized` and the watcher
* dedup checks bake in is that `event.path` always points at the
* doc's current disk location. Letting the map move out from under
* the events would leave readers like `getFileSize(event.path)`
* pointing at a vacated slot and silently swallowing the event.
*/
public async setDocument(
path: RelativePath,
record: DocumentRecord
): Promise<void> {
// If a record for the same docId is already tracked, mutate it in
// place instead of inserting a fresh object. Callers (drain
// handlers, queued events) hold long-lived references to the
// record and read `.path` from it on every access — replacing the
// reference would orphan those reads at the old object's path
// value. Keeping the same object identity also keeps the
// `documents.get(record.path) === record` invariant trivially
// true after a rename.
let target: DocumentRecord | undefined;
for (const [existingPath, existingRecord] of this.documents) {
if (
existingPath !== path &&
existingRecord.documentId === record.documentId
) {
this.documents.delete(existingPath);
if (existingRecord.documentId === record.documentId) {
target = existingRecord;
if (existingPath !== path) {
this.documents.delete(existingPath);
}
}
}
if (target === undefined) {
target = { ...record, path };
} else {
target.path = path;
target.intendedPath = record.intendedPath;
target.parentVersionId = record.parentVersionId;
target.remoteHash = record.remoteHash;
target.remoteRelativePath = record.remoteRelativePath;
}
this.documents.set(path, target);
for (const e of this.events) {
if (
e.type === SyncEventType.LocalUpdate &&
e.documentId === record.documentId
) {
e.path = path;
}
}
this.documents.set(path, record);
return this.save();
}
@ -317,16 +387,16 @@ export class SyncEventQueue {
public getDocumentByDocumentId(
target: DocumentId
): DocumentWithPath | undefined {
for (const [path, record] of this.documents) {
): DocumentRecord | undefined {
for (const record of this.documents.values()) {
if (record.documentId === target) {
return { path, record };
return record;
}
}
return undefined;
}
public getDocumentByDocumentIdOrFail(target: DocumentId): DocumentWithPath {
public getDocumentByDocumentIdOrFail(target: DocumentId): DocumentRecord {
const result = this.getDocumentByDocumentId(target);
if (!result) {
throw new Error(`No document found with id ${target}`);
@ -336,12 +406,7 @@ export class SyncEventQueue {
public async save(): Promise<void> {
return this.saveData({
documents: Array.from(this.documents.entries()).map(
([relativePath, record]) => ({
relativePath,
...record
})
),
documents: Array.from(this.documents.values()),
lastSeenUpdateId: this.lastSeenUpdateId
});
}
@ -387,8 +452,6 @@ export class SyncEventQueue {
);
}
public async clearAllState(): Promise<void> {
this.clearPending();
this.documents.clear();

View file

@ -14,6 +14,7 @@ import {
type FileOperations
} from "../file-operations/file-operations";
import { scheduleOfflineChanges } from "./offline-change-detector";
import { CONFLICT_PATH_REGEX } from "./conflict-path";
import { SyncResetError } from "../errors/sync-reset-error";
import type { DocumentVersionWithoutContent } from "../services/types/DocumentVersionWithoutContent";
import type { WebSocketVaultUpdate } from "../services/types/WebSocketVaultUpdate";
@ -422,7 +423,6 @@ export class Syncer {
});
await this.handleMaybeMergingResponse({
path: event.path,
response,
contentHash,
originalContentBytes: contentBytes,
@ -473,8 +473,8 @@ export class Syncer {
): Promise<void> {
const documentId = await event.documentId;
const tracked = this.queue.getDocumentByDocumentId(documentId);
if (tracked === undefined) {
const record = this.queue.getDocumentByDocumentId(documentId);
if (record === undefined) {
// The doc was deleted between this event being queued and
// drained — skip silently. Common when a LocalDelete drains
// ahead of a LocalUpdate that was already in the queue.
@ -483,9 +483,13 @@ export class Syncer {
);
return;
}
const { path: diskPath, record } = tracked;
const contentBytes = await this.operations.read(diskPath);
// Read `record.path` (not a captured local) on every access. The
// queue mutates `record.path` in place when a user rename arrives
// mid-roundtrip, so re-reading from the live record keeps the
// path current; capturing into a local variable would freeze it
// at function entry and then write/read against a vacated slot.
const contentBytes = await this.operations.read(record.path);
const contentHash = await hash(contentBytes);
// For a user-driven rename the user's intent is `event.originalPath`
@ -506,7 +510,7 @@ export class Syncer {
if (!hashChanged && !pathChanged) {
this.logger.debug(
`File hash of ${diskPath} matches last synced version; no need to sync`
`File hash of ${record.path} matches last synced version; no need to sync`
);
return;
}
@ -518,7 +522,7 @@ export class Syncer {
});
if (response.isDeleted) {
await this.processRemoteDelete(diskPath, {
await this.processRemoteDelete(record.path, {
...response,
contentSize: 0,
isNewFile: false
@ -527,7 +531,8 @@ export class Syncer {
}
await this.handleMaybeMergingResponse({
path: diskPath,
record,
pathBeforeRoundtrip: record.path,
response,
contentHash,
originalContentBytes: contentBytes
@ -538,7 +543,7 @@ export class Syncer {
status: SyncStatus.SUCCESS,
details: {
type: SyncType.UPDATE,
relativePath: diskPath
relativePath: record.path
},
message: isMerge
? "Updated file and merged with remote changes"
@ -549,39 +554,62 @@ export class Syncer {
}
private async handleMaybeMergingResponse({
path,
record,
pathBeforeRoundtrip,
response,
contentHash,
originalContentBytes,
createEvent
}: {
path: RelativePath;
// Live record reference for a LocalUpdate flow. Path reads go
// through `record.path` so a user-rename mid-roundtrip is seen
// on every access.
record?: DocumentRecord;
// Snapshot of `record.path` captured before `sendUpdate`
// awaited. Compared against the live `record.path` after the
// roundtrip to decide whether a user rename happened in
// between.
pathBeforeRoundtrip?: RelativePath;
response: DocumentUpdateResponse;
contentHash: string;
originalContentBytes: Uint8Array;
// When processing a Create, pass the originating event so its
// `resolvers` promise can be fulfilled (or rejected, on a deleted
// response)
// `resolvers` promise can be fulfilled (or rejected, on a
// deleted response). The create flow reads the live disk path
// off `createEvent.path` (mutated by
// `updatePendingCreatePath` on a user rename).
createEvent?: Extract<SyncEvent, { type: SyncEventType.LocalCreate }>;
}): Promise<void> {
const record = {
const newRecord = {
documentId: response.documentId,
parentVersionId: response.vaultUpdateId,
remoteRelativePath: response.relativePath
};
let remoteHash: string;
// The two flows see rename retargeting through different live
// objects:
// - LocalUpdate: `record.path` is mutated in place by
// `queue.enqueue`'s rename branch and `setDocument`.
// - LocalCreate: the doc isn't tracked yet (no
// `resolveCreate` has run); the rename retargets
// `createEvent.path` via `updatePendingCreatePath`.
// In both cases reading the live property at write time keeps
// the merged bytes from being written to a vacated path.
const writePath =
createEvent !== undefined ? createEvent.path : record!.path;
if ("type" in response && response.type === "MergingUpdate") {
const responseBytes = base64ToBytes(response.contentBase64);
await this.operations.write(
path,
writePath,
originalContentBytes,
responseBytes
);
remoteHash = await hash(responseBytes);
await this.updateCache(response.vaultUpdateId, responseBytes, path);
await this.updateCache(response.vaultUpdateId, responseBytes, writePath);
} else {
// Fast-forward update: no merge needed
remoteHash = contentHash;
@ -589,73 +617,169 @@ export class Syncer {
await this.updateCache(
response.vaultUpdateId,
originalContentBytes,
path
writePath
);
}
if (createEvent === undefined) {
// The disk path captured at the start of `processLocalUpdate`
// can be stale: the user may have renamed the file during the
// server roundtrip, in which case `queue.documents` already
// points at the new path and a follow-up rename's LocalUpdate
// is queued behind us. If we forced the disk back to
// `response.relativePath` here we'd undo the user's intent;
// worse, `setDocument`'s same-docId cleanup would clobber the
// map entry that was tracking the latest disk path, leaving
// future LocalUpdates for this doc reading from a vacated
// slot and getting skipped as `FileNotFoundError`. Refresh
// the latest tracked path and only touch disk when it still
// matches the captured one.
const tracked = this.queue.getDocumentByDocumentId(
response.documentId
);
if (tracked === undefined) {
this.logger.debug(
`Document ${response.documentId} is no longer tracked after update; cannot reconcile potential rename`
if (record === undefined || pathBeforeRoundtrip === undefined) {
throw new Error(
"Unreachable: LocalUpdate flow must pass `record` and `pathBeforeRoundtrip`"
);
}
// `record.path` is the *live* path. If a user rename ran
// during the roundtrip, `queue.enqueue` mutated it (and the
// queued LocalUpdate event's `path` field) to the user's
// new target; otherwise it still equals
// `pathBeforeRoundtrip`.
const currentPath = record.path;
if (currentPath === pathBeforeRoundtrip) {
// Move our local file onto the server-assigned path.
// `MoveOnConflict.NEW` means "if the target is taken
// locally by some other doc, route ours to a
// `conflict-<uuid>-` path instead of evicting them".
// We then record `intendedPath = response.relativePath`
// so future server-bound requests for this doc reference
// the path the server actually has it at, not the local
// conflict-uuid path. The other doc keeps its slot;
// local convergence is left to manual user resolution.
const moveResult = await this.operations.move(
currentPath,
response.relativePath,
MoveOnConflict.NEW
);
this.queue.updatePendingCreatePath(currentPath, moveResult.actualPath);
await this.queue.setDocument(moveResult.actualPath, {
...newRecord,
path: moveResult.actualPath,
intendedPath:
moveResult.actualPath === response.relativePath
? undefined
: response.relativePath,
remoteHash
});
} else {
const currentPath = tracked.path;
if (currentPath === path) {
// a http response will always be more up-to-date than any queued remote update
// move will always move to the relative path when MoveOnConflict.EXISTING is given
await this.operations.move(
currentPath,
response.relativePath,
MoveOnConflict.EXISTING
);
this.queue.updatePendingCreatePath(currentPath, response.relativePath);
await this.queue.setDocument(response.relativePath, {
...record,
remoteHash
});
} else {
// User renamed during the roundtrip. Leave the disk file
// at `currentPath`; the queued rename's LocalUpdate will
// reconcile the server on its next drain.
await this.queue.setDocument(currentPath, {
...record,
remoteHash
});
}
// User renamed during the roundtrip. Leave the disk file
// at `currentPath`; the queued rename's LocalUpdate will
// reconcile the server on its next drain.
await this.queue.setDocument(currentPath, {
...newRecord,
path: currentPath,
remoteHash
});
}
} else {
// The server may have deconflicted the path on create (e.g.
// another client raced us to the same path and won). Move the
// local file to match the server-assigned path so the queue's
// disk-path key, the on-disk path, and `remoteRelativePath` stay
// consistent. Without this, a later remote create at the
// originally-requested path would see a phantom local conflict
// and stash the new file under a `conflict-<uuid>-` path.
if (response.relativePath !== createEvent.originalPath) {
await this.operations.move(
// Displacement-merge: while this LocalCreate sat in the queue, a
// RemoteCreate for `originalPath` was processed first, displaced
// our local file to a `conflict-…` path, and tracked the remote
// doc at `originalPath`. The server then de-duplicated our
// create into that already-tracked doc and returned its id.
// Relocate the just-merged content from the conflict path to
// the existing tracked path (overwriting the older content the
// displacement wrote there) and drop the conflict file.
//
// Falling through to `resolveCreate(createEvent, ...)` would
// call `setDocument(conflict-…, D)`, whose same-docId cleanup
// strips D's tracking from `originalPath` and leaves the file
// there orphaned on disk.
const existing = this.queue.getDocumentByDocumentId(
response.documentId
);
if (
existing !== undefined &&
existing.path === response.relativePath &&
existing.path !== createEvent.path
) {
// The merged content already lives at `createEvent.path`
// (the MergingUpdate branch above wrote it there). Slot
// it into `response.relativePath` by deleting D's stale
// content there and renaming the conflict file in. We
// can't `operations.write` the merged bytes onto the
// existing path: that runs a 3-way merge against the
// stale content as if it were a concurrent edit, which
// strips out the very content the server just merged.
await this.operations.delete(response.relativePath);
// We just deleted `response.relativePath`. With
// `MoveOnConflict.NEW` a stray racing occupant would
// route our file to a `conflict-<uuid>-` path; we'd
// then track the doc there with `intendedPath` set.
const moveResult = await this.operations.move(
createEvent.path,
response.relativePath,
MoveOnConflict.EXISTING
MoveOnConflict.NEW
);
this.queue.updatePendingCreatePath(createEvent.path, response.relativePath);
await this.queue.setDocument(moveResult.actualPath, {
...newRecord,
path: moveResult.actualPath,
intendedPath:
moveResult.actualPath === response.relativePath
? undefined
: response.relativePath,
remoteHash
});
this.queue.consumeEvent(createEvent);
createEvent.resolvers.resolve(newRecord.documentId);
this.queue.lastSeenUpdateId = response.vaultUpdateId;
return;
}
// Reconcile disk and tracking with the server-assigned path.
// Two cases produce a mismatch:
// 1. Server deconflicted (e.g. another client raced us): we
// know because `response.relativePath !== createEvent.originalPath`.
// Move the local file to the server-assigned path, otherwise
// a later remote create at our original path would see a
// phantom local conflict and stash the new file under
// `conflict-<uuid>-`.
// 2. The create's local file was displaced to a `conflict-…`
// path while it sat in the queue, but the server still
// placed the doc at our original path (e.g. the existing
// doc that forced the displacement was meanwhile deleted,
// so the server-side merge / deconflict path didn't
// fire). Move the conflict file onto the original path
// so `resolveCreate` tracks the doc at the path the
// server returned, instead of the displaced conflict
// path which would orphan the file.
//
// We must NOT move when `createEvent.path` differs from
// `originalPath` because of a *user rename* of the pending
// create (e.g. write A.md, rename to B.md): there the user's
// intent is the renamed path, the server places the doc at
// `originalPath`, and the queued `LocalUpdate` from the
// watcher will replay the rename to the server.
let resolvedPath = createEvent.path;
let resolvedIntendedPath: RelativePath | undefined;
if (response.relativePath !== createEvent.originalPath) {
const moveResult = await this.operations.move(
createEvent.path,
response.relativePath,
MoveOnConflict.NEW
);
this.queue.updatePendingCreatePath(createEvent.path, moveResult.actualPath);
resolvedPath = moveResult.actualPath;
resolvedIntendedPath =
moveResult.actualPath === response.relativePath
? undefined
: response.relativePath;
} else if (
createEvent.path !== response.relativePath &&
CONFLICT_PATH_REGEX.test(createEvent.path)
) {
const moveResult = await this.operations.move(
createEvent.path,
response.relativePath,
MoveOnConflict.NEW
);
this.queue.updatePendingCreatePath(createEvent.path, moveResult.actualPath);
resolvedPath = moveResult.actualPath;
resolvedIntendedPath =
moveResult.actualPath === response.relativePath
? undefined
: response.relativePath;
}
await this.queue.resolveCreate(createEvent, {
...record,
...newRecord,
path: resolvedPath,
intendedPath: resolvedIntendedPath,
remoteHash
});
}
@ -667,12 +791,12 @@ export class Syncer {
event: Extract<SyncEvent, { type: SyncEventType.RemoteChange }>
): Promise<void> {
const { remoteVersion } = event;
const documentWithPath = this.queue.getDocumentByDocumentId(
const trackedRecord = this.queue.getDocumentByDocumentId(
remoteVersion.documentId
);
if (remoteVersion.isDeleted) {
if (documentWithPath === undefined) {
if (trackedRecord === undefined) {
// The doc isn't tracked locally — either we never had
// it (joined the vault after the delete) or a previous
// delete already cleaned it up. Just advance
@ -682,13 +806,13 @@ export class Syncer {
return;
}
return this.processRemoteDelete(
documentWithPath.path,
trackedRecord.path,
remoteVersion
);
}
if (
(documentWithPath?.record.parentVersionId ?? 0) >=
(trackedRecord?.parentVersionId ?? 0) >=
remoteVersion.vaultUpdateId
) {
this.queue.lastSeenUpdateId = remoteVersion.vaultUpdateId;
@ -698,7 +822,7 @@ export class Syncer {
return;
}
if (documentWithPath !== undefined) {
if (trackedRecord !== undefined) {
// The doc is tracked. If the local file backing it has
// gone missing — e.g. the user deleted it and the
// LocalDelete hasn't drained yet, or our HTTP DELETE just
@ -706,20 +830,16 @@ export class Syncer {
// — ignore the update. Otherwise we'd try to operate on a
// vanished file (or recreate one we're tearing down).
const fileExists = await this.operations.exists(
documentWithPath.path
trackedRecord.path
);
if (!fileExists) {
this.queue.lastSeenUpdateId = remoteVersion.vaultUpdateId;
this.logger.debug(
`Ignoring remote update for ${remoteVersion.documentId}: local file at ${documentWithPath.path} is missing`
`Ignoring remote update for ${remoteVersion.documentId}: local file at ${trackedRecord.path} is missing`
);
return;
}
return this.processRemoteUpdate(
documentWithPath.path,
documentWithPath.record,
remoteVersion
);
return this.processRemoteUpdate(trackedRecord, remoteVersion);
}
if (!remoteVersion.isNewFile) {
@ -756,21 +876,33 @@ export class Syncer {
}
private async processRemoteUpdate(
path: RelativePath,
record: DocumentRecord,
remoteVersion: DocumentVersionWithoutContent
): Promise<void> {
// wait for a local edit to do the actual updating here, so we can't even update the lastSeenUpdateId here
const conflictingDoc = this.queue.getSettledDocumentByPath(
remoteVersion.relativePath
);
const actualPath = await this.operations.move(
path,
// Snapshot the doc's path before any await: the post-write
// history entry needs the "before" value to compose a
// `renamed remotely from X to Y` line. All other path reads
// below go through `record.path`, which `setDocument` and the
// queue's rename branch mutate in place, so any concurrent
// user rename is reflected on every access.
const pathBeforeRoundtrip = record.path;
const moveResult = await this.operations.move(
record.path,
remoteVersion.relativePath,
(conflictingDoc?.parentVersionId ?? 0) < remoteVersion.vaultUpdateId
? MoveOnConflict.EXISTING
: MoveOnConflict.NEW
// Never evict a different doc to make room for the remote
// rename target — if the slot is taken locally our file
// routes to a `conflict-<uuid>-` path and we record the
// server-side intent on the record. Convergence at the
// local level is left to manual user resolution; server
// state stays consistent because all server-bound requests
// route through `intendedPath`.
MoveOnConflict.NEW
);
const { actualPath } = moveResult;
const intendedPath =
actualPath === remoteVersion.relativePath
? undefined
: remoteVersion.relativePath;
if (
!this.queue.hasPendingLocalEventsForDocumentId(
remoteVersion.documentId
@ -799,8 +931,10 @@ export class Syncer {
);
await this.queue.setDocument(actualPath, {
...record,
path: actualPath,
intendedPath,
parentVersionId: remoteVersion.vaultUpdateId,
remoteRelativePath: actualPath,
remoteRelativePath: remoteVersion.relativePath,
remoteHash: await hash(remoteContent)
});
this.queue.lastSeenUpdateId = remoteVersion.vaultUpdateId;
@ -811,24 +945,29 @@ export class Syncer {
document: remoteVersion
});
await this.queue.setDocument(actualPath, {
// `record.path` is live: if a user rename's `queue.enqueue`
// ran during the `operations.move` await, the queue mutated
// `record.path` to the user's new target. Reading it now
// gives the latest disk location, so `setDocument` doesn't
// clobber the rename's map entry the way passing the
// pre-await `actualPath` would.
await this.queue.setDocument(record.path, {
...record,
remoteRelativePath: actualPath
intendedPath,
remoteRelativePath: remoteVersion.relativePath
});
}
if (actualPath !== path) {
if (actualPath !== pathBeforeRoundtrip) {
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
details: {
type: SyncType.MOVE,
relativePath: actualPath,
movedFrom: path
movedFrom: pathBeforeRoundtrip
},
message: `File was renamed remotely from ${path} to ${actualPath}`,
message: `File was renamed remotely from ${pathBeforeRoundtrip} to ${actualPath}`,
author: remoteVersion.userId,
timestamp: new Date(remoteVersion.updatedDate)
});
@ -854,17 +993,19 @@ export class Syncer {
vaultUpdateId: remoteVersion.vaultUpdateId
});
const conflictingDoc = this.queue.getSettledDocumentByPath(
remoteVersion.relativePath
);
const actualPath = await this.operations.create(
const createResult = await this.operations.create(
remoteVersion.relativePath,
remoteContent,
(conflictingDoc?.parentVersionId ?? 0) < remoteVersion.vaultUpdateId
? MoveOnConflict.EXISTING
: MoveOnConflict.NEW
// Never evict a local file occupying the path the server has
// this remote create at — stash the new file at a
// `conflict-<uuid>-` path instead and record `intendedPath`.
MoveOnConflict.NEW
);
const { actualPath } = createResult;
const intendedPath =
actualPath === remoteVersion.relativePath
? undefined
: remoteVersion.relativePath;
await this.updateCache(
remoteVersion.vaultUpdateId,
@ -874,6 +1015,8 @@ export class Syncer {
const contentHash = await hash(remoteContent);
await this.queue.setDocument(actualPath, {
path: actualPath,
intendedPath,
documentId: remoteVersion.documentId,
parentVersionId: remoteVersion.vaultUpdateId,
remoteHash: contentHash,

View file

@ -5,23 +5,31 @@ export type DocumentId = string;
export type RelativePath = string;
export interface DocumentRecord {
// The doc's current local disk path. The queue's `documents` map is
// keyed by this same string and the invariant `documents.get(record.path)
// === record` is held by every queue mutation. Stored as a field on the
// record (not just as the map key) so callers can hold a stable
// reference to the record and read `.path` for the live value rather
// than capturing a string into a local variable that goes stale on the
// next rename.
path: RelativePath;
// Set when the doc's local file lives at a `conflict-<uuid>-` path
// because an earlier remote create / remote rename couldn't claim the
// path the server has it at (it was occupied locally at the time).
// Server-bound requests for this doc must use `intendedPath` rather
// than `path`, otherwise the server would learn about the local
// conflict-uuid path and propagate it as the doc's canonical location
// to every other client. `undefined` for docs whose local path matches
// the server's view.
intendedPath?: RelativePath;
documentId: DocumentId;
parentVersionId: VaultUpdateId;
remoteHash: string;
remoteRelativePath: RelativePath;
}
export interface DocumentWithPath {
path: RelativePath;
record: DocumentRecord;
}
interface StoredDocument extends DocumentRecord {
relativePath: RelativePath;
}
export interface StoredSyncState {
documents: StoredDocument[] | undefined;
documents: DocumentRecord[] | undefined;
lastSeenUpdateId: VaultUpdateId | undefined;
}

View file

@ -1,18 +1,14 @@
import type {
DocumentRecord,
DocumentWithPath,
RelativePath
} from "../sync-operations/types";
import type { DocumentRecord } from "../sync-operations/types";
import { EMPTY_HASH } from "./hash";
// TODO: make this smarter so that offline files can be renamed & edited at the same time
export async function findMatchingFile(
contentHash: string,
candidates: { path: RelativePath; record: DocumentRecord }[]
): Promise<DocumentWithPath | undefined> {
candidates: DocumentRecord[]
): Promise<DocumentRecord | undefined> {
if (contentHash === (await EMPTY_HASH)) {
return undefined;
}
return candidates.find(({ record }) => record.remoteHash === contentHash);
return candidates.find((record) => record.remoteHash === contentHash);
}

View file

@ -7,6 +7,7 @@ server:
port: 3010
max_body_size_mb: 512
max_clients_per_vault: 256
max_pending_websocket_connections: 4096
broadcast_channel_capacity: 1024
response_timeout: 30m
mergeable_file_extensions:

View file

@ -529,7 +529,17 @@ impl Database {
user_id: row.user_id,
device_id: row.device_id,
content_size: row.content_size.unwrap_or(0),
is_new_file: row.creation_vault_update_id == row.vault_update_id,
// For catch-up streams, "new file" means "new to this
// recipient" — the doc was created past the recipient's
// watermark. The catch-up only carries the doc's
// *latest* version (not its full history), so using
// `creation == latest` instead would mis-flag every
// doc that was created and then updated before the
// client reconnected, and the client's
// `processRemoteChange` would drop it as "stale
// RemoteChange for untracked, non-new document",
// silently leaking docs to clients catching up.
is_new_file: row.creation_vault_update_id > vault_update_id,
})
.collect()
})