claude claims it woorks
This commit is contained in:
parent
8e87537e49
commit
9151e0b2d6
9 changed files with 582 additions and 63 deletions
|
|
@ -2,7 +2,11 @@ import type { PersistenceProvider } from "./persistence/persistence";
|
|||
import type { HistoryEntry, HistoryStats } from "./tracing/sync-history";
|
||||
import { SyncHistory } from "./tracing/sync-history";
|
||||
import { Logger, LogLevel, LogLine } from "./tracing/logger";
|
||||
import type { RelativePath, StoredSyncState } from "./sync-operations/types";
|
||||
import type {
|
||||
DocumentId,
|
||||
RelativePath,
|
||||
StoredSyncState
|
||||
} from "./sync-operations/types";
|
||||
import { SyncEventQueue } from "./sync-operations/sync-event-queue";
|
||||
import * as Sentry from "@sentry/browser";
|
||||
import type { SyncSettings } from "./persistence/settings";
|
||||
|
|
@ -76,6 +80,27 @@ export class SyncClient {
|
|||
return this.history.onHistoryUpdated;
|
||||
}
|
||||
|
||||
/**
|
||||
* Fires whenever a tracked document's local file moves on disk —
|
||||
* watcher-driven user renames, post-create deconflicts placed by
|
||||
* the reconciler, lost-rename replays in offline scan, slot
|
||||
* displacements when another record claims a path. Both
|
||||
* `oldPath` and `newPath` may be `undefined` (placement-pending
|
||||
* state). Useful for callers that mirror disk-side path state
|
||||
* — e.g. test harnesses tracking which paths are safe to mutate
|
||||
* — and need a signal beyond the user-facing history.
|
||||
*/
|
||||
public get onDocumentPathChanged(): EventListeners<
|
||||
(
|
||||
documentId: DocumentId,
|
||||
oldPath: RelativePath | undefined,
|
||||
newPath: RelativePath | undefined
|
||||
) => unknown
|
||||
> {
|
||||
this.checkIfDestroyed("onDocumentPathChanged getter");
|
||||
return this.syncEventQueue.onDocumentPathChanged;
|
||||
}
|
||||
|
||||
public get onSettingsChanged(): EventListeners<
|
||||
(newSettings: SyncSettings, oldSettings: SyncSettings) => unknown
|
||||
> {
|
||||
|
|
@ -123,6 +148,7 @@ export class SyncClient {
|
|||
Partial<{
|
||||
settings: Partial<SyncSettings>;
|
||||
database: Partial<StoredSyncState>;
|
||||
deviceId: string;
|
||||
}>
|
||||
>;
|
||||
fetch?: typeof globalThis.fetch;
|
||||
|
|
@ -131,17 +157,30 @@ export class SyncClient {
|
|||
}): Promise<SyncClient> {
|
||||
const logger = new Logger();
|
||||
|
||||
const deviceId = createClientId();
|
||||
|
||||
logger.info(`Creating SyncClient with client id ${deviceId}`);
|
||||
|
||||
const history = new SyncHistory(logger);
|
||||
|
||||
let state = (await persistence.load()) ?? {
|
||||
settings: undefined,
|
||||
database: undefined
|
||||
database: undefined,
|
||||
deviceId: undefined
|
||||
};
|
||||
|
||||
// Persist deviceId across destroy + init so the server's
|
||||
// lost-create dedup (which scopes by device_id) can recognise
|
||||
// a retry as belonging to the same client. Without this,
|
||||
// every fresh `SyncClient` after a destroy would generate a
|
||||
// new deviceId, the server-side query would miss, and the
|
||||
// pending-but-lost create would deconflict instead of
|
||||
// binding to the doc its content was already absorbed into.
|
||||
let deviceId = state.deviceId;
|
||||
if (deviceId === undefined) {
|
||||
deviceId = createClientId();
|
||||
state = { ...state, deviceId };
|
||||
await persistence.save(state);
|
||||
}
|
||||
|
||||
logger.info(`Creating SyncClient with client id ${deviceId}`);
|
||||
|
||||
const settings = new Settings(
|
||||
logger,
|
||||
state.settings,
|
||||
|
|
|
|||
|
|
@ -0,0 +1,185 @@
|
|||
import { describe, it } from "node:test";
|
||||
import assert from "node:assert";
|
||||
import { Logger } from "../tracing/logger";
|
||||
import { Settings } from "../persistence/settings";
|
||||
import { STORED_STATE_SCHEMA_VERSION, SyncEventQueue } from "./sync-event-queue";
|
||||
import { scheduleOfflineChanges } from "./offline-change-detector";
|
||||
import type { FileOperations } from "../file-operations/file-operations";
|
||||
import type { RelativePath } from "./types";
|
||||
|
||||
const makeQueue = async (): Promise<SyncEventQueue> => {
|
||||
const logger = new Logger();
|
||||
const settings = new Settings(logger, {}, async () => {
|
||||
/* no-op */
|
||||
});
|
||||
return new SyncEventQueue(
|
||||
settings,
|
||||
logger,
|
||||
{ schemaVersion: STORED_STATE_SCHEMA_VERSION },
|
||||
async () => {
|
||||
/* no-op */
|
||||
}
|
||||
);
|
||||
};
|
||||
|
||||
const makeOperations = (
|
||||
files: Record<string, Uint8Array>
|
||||
): FileOperations => {
|
||||
return {
|
||||
listFilesRecursively: async () => Object.keys(files),
|
||||
read: async (path: RelativePath) => {
|
||||
const data = files[path];
|
||||
if (data === undefined) {
|
||||
throw new Error(`File not found: ${path}`);
|
||||
}
|
||||
return data;
|
||||
}
|
||||
} as unknown as FileOperations;
|
||||
};
|
||||
|
||||
describe("scheduleOfflineChanges", () => {
|
||||
it("does not bind a local file to a placement-pending record whose remoteRelativePath was persisted before the doc moved on the server", async () => {
|
||||
// The bug: persisted byDocId can carry a placement-pending record
|
||||
// whose `remoteRelativePath` was saved before the doc was moved
|
||||
// server-side. After restart, offline-scan running before WS
|
||||
// catch-up would bind an unrelated local file at that stale path
|
||||
// to the moved doc and push the user's content as an update —
|
||||
// silently corrupting the moved doc and stranding the local file.
|
||||
const queue = await makeQueue();
|
||||
|
||||
// Stale placement-pending record: server has moved this doc
|
||||
// away from "stale-X.md" since this snapshot was saved.
|
||||
await queue.upsertRecord({
|
||||
documentId: "MOVED-DOC",
|
||||
parentVersionId: 5,
|
||||
remoteRelativePath: "stale-X.md" as RelativePath,
|
||||
remoteHash: "hash-from-old-state",
|
||||
localPath: undefined
|
||||
});
|
||||
|
||||
// User has an unrelated local file at the stale path.
|
||||
const operations = makeOperations({
|
||||
"stale-X.md": new TextEncoder().encode(
|
||||
"user's unrelated local content"
|
||||
)
|
||||
});
|
||||
|
||||
const enqueued: { kind: string; path: string }[] = [];
|
||||
await scheduleOfflineChanges(
|
||||
new Logger(),
|
||||
operations,
|
||||
queue,
|
||||
(path) => enqueued.push({ kind: "create", path }),
|
||||
(args) => enqueued.push({ kind: "update", path: args.relativePath }),
|
||||
(path) => enqueued.push({ kind: "delete", path })
|
||||
);
|
||||
|
||||
// The local file must become a fresh CREATE — never a hostile
|
||||
// UPDATE on the moved doc.
|
||||
assert.deepStrictEqual(enqueued, [
|
||||
{ kind: "create", path: "stale-X.md" }
|
||||
]);
|
||||
|
||||
// The placement-pending record must remain placement-pending —
|
||||
// its localPath must not have been bound to the unrelated user
|
||||
// file. The reconciler will place it correctly once WS catch-up
|
||||
// updates `remoteRelativePath` to the doc's current location.
|
||||
const record = queue.getDocumentByDocumentId("MOVED-DOC");
|
||||
assert.notStrictEqual(record, undefined);
|
||||
assert.strictEqual(record?.localPath, undefined);
|
||||
});
|
||||
|
||||
it("schedules an update for a local file that matches a settled record's localPath", async () => {
|
||||
const queue = await makeQueue();
|
||||
await queue.upsertRecord({
|
||||
documentId: "SETTLED-DOC",
|
||||
parentVersionId: 2,
|
||||
remoteRelativePath: "doc.md" as RelativePath,
|
||||
remoteHash: "hash",
|
||||
localPath: "doc.md" as RelativePath
|
||||
});
|
||||
|
||||
const operations = makeOperations({
|
||||
"doc.md": new TextEncoder().encode("content")
|
||||
});
|
||||
|
||||
const enqueued: { kind: string; path: string }[] = [];
|
||||
await scheduleOfflineChanges(
|
||||
new Logger(),
|
||||
operations,
|
||||
queue,
|
||||
(path) => enqueued.push({ kind: "create", path }),
|
||||
(args) => enqueued.push({ kind: "update", path: args.relativePath }),
|
||||
(path) => enqueued.push({ kind: "delete", path })
|
||||
);
|
||||
|
||||
assert.deepStrictEqual(enqueued, [
|
||||
{ kind: "update", path: "doc.md" }
|
||||
]);
|
||||
});
|
||||
|
||||
it("schedules a delete for a settled record whose local file is missing", async () => {
|
||||
const queue = await makeQueue();
|
||||
await queue.upsertRecord({
|
||||
documentId: "VANISHED-DOC",
|
||||
parentVersionId: 4,
|
||||
remoteRelativePath: "gone.md" as RelativePath,
|
||||
remoteHash: "hash",
|
||||
localPath: "gone.md" as RelativePath
|
||||
});
|
||||
|
||||
const operations = makeOperations({});
|
||||
|
||||
const enqueued: { kind: string; path: string }[] = [];
|
||||
await scheduleOfflineChanges(
|
||||
new Logger(),
|
||||
operations,
|
||||
queue,
|
||||
(path) => enqueued.push({ kind: "create", path }),
|
||||
(args) => enqueued.push({ kind: "update", path: args.relativePath }),
|
||||
(path) => enqueued.push({ kind: "delete", path })
|
||||
);
|
||||
|
||||
assert.deepStrictEqual(enqueued, [
|
||||
{ kind: "delete", path: "gone.md" }
|
||||
]);
|
||||
});
|
||||
|
||||
it("detects an offline rename when an untracked file matches a deleted record's content hash", async () => {
|
||||
const queue = await makeQueue();
|
||||
const content = new TextEncoder().encode("body");
|
||||
const contentHash = await (await import("../utils/hash")).hash(content);
|
||||
|
||||
await queue.upsertRecord({
|
||||
documentId: "DOC-1",
|
||||
parentVersionId: 5,
|
||||
remoteRelativePath: "old.md" as RelativePath,
|
||||
remoteHash: contentHash,
|
||||
localPath: "old.md" as RelativePath
|
||||
});
|
||||
const operations = makeOperations({ "new.md": content });
|
||||
|
||||
const enqueued: {
|
||||
kind: string;
|
||||
path: string;
|
||||
oldPath?: string;
|
||||
}[] = [];
|
||||
await scheduleOfflineChanges(
|
||||
new Logger(),
|
||||
operations,
|
||||
queue,
|
||||
(path) => enqueued.push({ kind: "create", path }),
|
||||
(args) =>
|
||||
enqueued.push({
|
||||
kind: "update",
|
||||
path: args.relativePath,
|
||||
oldPath: args.oldPath
|
||||
}),
|
||||
(path) => enqueued.push({ kind: "delete", path })
|
||||
);
|
||||
|
||||
assert.deepStrictEqual(enqueued, [
|
||||
{ kind: "update", path: "new.md", oldPath: "old.md" }
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
|
@ -10,6 +10,17 @@ import { removeFromArray } from "../utils/remove-from-array";
|
|||
* Scans the local filesystem and the document database to determine
|
||||
* which files were created, updated, moved, or deleted while the
|
||||
* client was offline, then enqueues the appropriate sync events.
|
||||
*
|
||||
* Placement-pending records (`localPath === undefined`) are deliberately
|
||||
* NOT bound to local files at the same `remoteRelativePath` here. The
|
||||
* persisted byDocId snapshot can be stale — a doc's server-side path
|
||||
* may have changed since the last save, so binding by stored path would
|
||||
* fold an unrelated user file into a moved doc and silently corrupt it.
|
||||
* Local files at those paths fall through to the LocalCreate flow below;
|
||||
* the server's create_document handler dedupes by path+freshness when
|
||||
* the doc really is at that path, and otherwise creates a new doc that
|
||||
* the reconciler places correctly once catch-up updates the stale
|
||||
* record's `remoteRelativePath`.
|
||||
*/
|
||||
export async function scheduleOfflineChanges(
|
||||
logger: Logger,
|
||||
|
|
@ -30,28 +41,6 @@ export async function scheduleOfflineChanges(
|
|||
// next pass.
|
||||
const allDocuments = queue.allSettledDocuments();
|
||||
|
||||
// Placement-pending records (`localPath === undefined`) name a server
|
||||
// path that the reconciler will eventually place. If the user already
|
||||
// has a local file at that path — common after a sync-disable or
|
||||
// reset that discarded a successful create's response, leaving the
|
||||
// server-known doc as a placement-pending record once catch-up
|
||||
// re-delivered it — treating it as an untracked file would
|
||||
// re-create a duplicate doc at the server's deconflicted path. Bind
|
||||
// each placement-pending record to its on-disk file: a same-hash
|
||||
// file just inherits the record's localPath; a different-hash file
|
||||
// is folded into the sync-up update flow below (an UPDATE on the
|
||||
// existing doc rather than a fresh CREATE).
|
||||
for (const record of queue.allRecords()) {
|
||||
if (record.localPath !== undefined) {
|
||||
continue;
|
||||
}
|
||||
if (!allLocalFiles.has(record.remoteRelativePath)) {
|
||||
continue;
|
||||
}
|
||||
await queue.setLocalPath(record.documentId, record.remoteRelativePath);
|
||||
allDocuments.set(record.remoteRelativePath, record);
|
||||
}
|
||||
|
||||
// A doc is "possibly deleted" only if it has no local file. Including
|
||||
// docs that still exist locally would queue a spurious delete alongside
|
||||
// the update below.
|
||||
|
|
@ -75,6 +64,13 @@ export async function scheduleOfflineChanges(
|
|||
for (const localFile of allLocalFiles) {
|
||||
if (allDocuments.has(localFile)) {
|
||||
syncedLocalFiles.push(localFile);
|
||||
} else if (queue.hasPendingCreateForPath(localFile)) {
|
||||
// A LocalCreate for this path is still in flight (no
|
||||
// record yet — its docId is a Promise). Re-enqueueing
|
||||
// would fire a second HTTP create that the server then
|
||||
// deconflicts to a sibling path, leaving the same bytes
|
||||
// in two docs. Skip; the in-flight create owns this slot.
|
||||
continue;
|
||||
} else {
|
||||
locallyPossibleCreatedFiles.push(localFile);
|
||||
}
|
||||
|
|
@ -131,6 +127,40 @@ export async function scheduleOfflineChanges(
|
|||
}
|
||||
|
||||
for (const path of syncedLocalFiles) {
|
||||
const record = allDocuments.get(path);
|
||||
if (
|
||||
record !== undefined &&
|
||||
record.localPath !== undefined &&
|
||||
record.localPath !== record.remoteRelativePath &&
|
||||
!allLocalFiles.has(record.remoteRelativePath) &&
|
||||
queue.byLocalPath.get(record.remoteRelativePath) === undefined
|
||||
) {
|
||||
// Lost local-rename recovery. The record's `localPath`
|
||||
// (where the user has the file now) and
|
||||
// `remoteRelativePath` (where the server still thinks it
|
||||
// lives) disagree, which means a queued user-rename's
|
||||
// LocalUpdate never reached the server before the queue
|
||||
// was wiped (typically a sync reset). Without this
|
||||
// branch the next `enqueueUpdate({ relativePath: path })`
|
||||
// is a content-only update — server keeps the doc at the
|
||||
// old path, the user's file at the new path orphans, and
|
||||
// other clients never see the rename. Replay the rename
|
||||
// by restoring the OLD localPath so the queue's enqueue
|
||||
// can find the record by `oldPath`, then enqueueUpdate
|
||||
// moves it back to the new path with `isUserRename`.
|
||||
// Only fires when the old slot is genuinely empty
|
||||
// (neither on disk nor claimed by another tracked
|
||||
// record) — otherwise the rename target is occupied and
|
||||
// we'd be confusing the byLocalPath index.
|
||||
const oldPath = record.remoteRelativePath;
|
||||
const newPath = record.localPath;
|
||||
logger.info(
|
||||
`Lost local rename detected: doc ${record.documentId} at ${oldPath} (server) vs ${newPath} (local); replaying rename to server`
|
||||
);
|
||||
await queue.setLocalPath(record.documentId, oldPath);
|
||||
enqueueUpdate({ oldPath, relativePath: newPath });
|
||||
continue;
|
||||
}
|
||||
logger.info(
|
||||
`File ${path} may have been updated while offline, scheduling sync to update it`
|
||||
);
|
||||
|
|
|
|||
|
|
@ -94,6 +94,7 @@ export class Reconciler {
|
|||
const allRecords = this.collectAllRecords();
|
||||
|
||||
const movesNeeded: PlannedMove[] = [];
|
||||
const deferredPlacements: DocumentRecord[] = [];
|
||||
|
||||
for (const record of allRecords) {
|
||||
if (record.localPath === record.remoteRelativePath) {
|
||||
|
|
@ -129,7 +130,7 @@ export class Reconciler {
|
|||
}
|
||||
|
||||
if (record.localPath === undefined) {
|
||||
await this.tryInitialPlacement(record);
|
||||
deferredPlacements.push(record);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
@ -160,11 +161,35 @@ export class Reconciler {
|
|||
});
|
||||
}
|
||||
|
||||
if (movesNeeded.length === 0) {
|
||||
return;
|
||||
if (movesNeeded.length > 0) {
|
||||
await this.executeMoves(movesNeeded);
|
||||
}
|
||||
|
||||
await this.executeMoves(movesNeeded);
|
||||
// Run placements *after* moves so a placement whose target slot
|
||||
// was occupied by a tracked record at the start of the pass can
|
||||
// still succeed once that record's move frees the slot. Without
|
||||
// this ordering, a placement-pending record stalls until the
|
||||
// next reconciler tick — which only fires when new events
|
||||
// arrive, leaving the doc absent on disk if the queue happens
|
||||
// to be quiescent at that moment.
|
||||
for (const record of deferredPlacements) {
|
||||
// Re-check the gating conditions: a pending event may have
|
||||
// been enqueued for this doc while we were processing
|
||||
// moves above, and an interleaved placement would race
|
||||
// it.
|
||||
if (
|
||||
this.queue.hasPendingLocalEventsForDocumentId(record.documentId)
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
if (this.queue.hasPendingServerDelete(record.documentId)) {
|
||||
continue;
|
||||
}
|
||||
if (record.localPath !== undefined) {
|
||||
continue;
|
||||
}
|
||||
await this.tryInitialPlacement(record);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -26,6 +26,22 @@ export class SyncEventQueue {
|
|||
(count: number) => unknown
|
||||
>();
|
||||
|
||||
// Fires whenever a record's `localPath` transitions to a different
|
||||
// value. Subscribers see every disk-side path change — watcher-
|
||||
// driven user renames, post-create deconflicts placed by the
|
||||
// reconciler, lost-rename replays in offline-scan, displacements
|
||||
// when another record claims a slot. Useful for callers that
|
||||
// mirror disk-side state (e.g. test harnesses that maintain a
|
||||
// "do-not-touch" list keyed by current path). Both `oldPath` and
|
||||
// `newPath` may be `undefined` (placement-pending state).
|
||||
public readonly onDocumentPathChanged = new EventListeners<
|
||||
(
|
||||
documentId: DocumentId,
|
||||
oldPath: RelativePath | undefined,
|
||||
newPath: RelativePath | undefined
|
||||
) => unknown
|
||||
>();
|
||||
|
||||
private readonly _lastSeenUpdateId: MinCovered;
|
||||
|
||||
// Primary index of every settled document, keyed by docId. The wire loop
|
||||
|
|
@ -837,13 +853,16 @@ export class SyncEventQueue {
|
|||
record: DocumentRecord,
|
||||
newLocalPath: RelativePath | undefined
|
||||
): void {
|
||||
const previousLocalPath = record.localPath;
|
||||
if (
|
||||
record.localPath !== undefined &&
|
||||
this._byLocalPath.get(record.localPath) === record
|
||||
previousLocalPath !== undefined &&
|
||||
this._byLocalPath.get(previousLocalPath) === record
|
||||
) {
|
||||
this._byLocalPath.delete(record.localPath);
|
||||
this._byLocalPath.delete(previousLocalPath);
|
||||
}
|
||||
record.localPath = newLocalPath;
|
||||
let displacedRecord: DocumentRecord | undefined;
|
||||
let displacedOldPath: RelativePath | undefined;
|
||||
if (newLocalPath !== undefined) {
|
||||
const displaced = this._byLocalPath.get(newLocalPath);
|
||||
if (displaced !== undefined && displaced !== record) {
|
||||
|
|
@ -851,10 +870,26 @@ export class SyncEventQueue {
|
|||
// We're about to overwrite that slot, so clear the
|
||||
// displaced record's localPath; the reconciler will
|
||||
// re-place it via tryInitialPlacement on the next pass.
|
||||
displacedOldPath = displaced.localPath;
|
||||
displaced.localPath = undefined;
|
||||
displacedRecord = displaced;
|
||||
}
|
||||
this._byLocalPath.set(newLocalPath, record);
|
||||
}
|
||||
if (previousLocalPath !== newLocalPath) {
|
||||
this.onDocumentPathChanged.trigger(
|
||||
record.documentId,
|
||||
previousLocalPath,
|
||||
newLocalPath
|
||||
);
|
||||
}
|
||||
if (displacedRecord !== undefined) {
|
||||
this.onDocumentPathChanged.trigger(
|
||||
displacedRecord.documentId,
|
||||
displacedOldPath,
|
||||
undefined
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private notifyPendingUpdateCountChanged(): void {
|
||||
|
|
|
|||
|
|
@ -506,12 +506,6 @@ export class Syncer {
|
|||
contentBytes
|
||||
});
|
||||
|
||||
// If the user renamed the file while the create request was in flight,
|
||||
// event.path now points at the renamed disk slot. Apply response bytes
|
||||
// and install the local record there; the queued LocalUpdate carries
|
||||
// the server-side rename intent.
|
||||
const localPath = event.path;
|
||||
|
||||
// Same-docId collapse. While our LocalCreate sat in the queue, a
|
||||
// RemoteCreate may have arrived for this same path. The wire-loop's
|
||||
// `processRemoteCreateForNewDocument` would have built a record with
|
||||
|
|
@ -523,8 +517,14 @@ export class Syncer {
|
|||
let remoteHash = contentHash;
|
||||
if (response.type === "MergingUpdate") {
|
||||
const responseBytes = base64ToBytes(response.contentBase64);
|
||||
// Read `event.path` live for both the write target and the
|
||||
// cache key. A user rename arriving between HTTP-send and
|
||||
// HTTP-response rewrites `event.path` via
|
||||
// `updatePendingCreatePath`; the merge write must land on
|
||||
// the current slot so the queued LocalUpdate that follows
|
||||
// sees the merged bytes.
|
||||
await this.operations.write(
|
||||
localPath,
|
||||
event.path,
|
||||
contentBytes,
|
||||
responseBytes
|
||||
);
|
||||
|
|
@ -532,13 +532,13 @@ export class Syncer {
|
|||
await this.updateCache(
|
||||
response.vaultUpdateId,
|
||||
responseBytes,
|
||||
localPath
|
||||
event.path
|
||||
);
|
||||
} else {
|
||||
await this.updateCache(
|
||||
response.vaultUpdateId,
|
||||
contentBytes,
|
||||
localPath
|
||||
event.path
|
||||
);
|
||||
}
|
||||
|
||||
|
|
@ -548,6 +548,17 @@ export class Syncer {
|
|||
// path placement, if needed.)
|
||||
this.pendingPlacementContent.delete(response.documentId);
|
||||
|
||||
// Snapshot `event.path` only after the write has settled. The
|
||||
// write itself can drive synchronous watcher callbacks (e.g.
|
||||
// an atomic-update fileSystemOperations that fires a "file
|
||||
// changed" event back into the queue), and the test harness's
|
||||
// user-facing renames also race here. Either path mutates
|
||||
// `event.path` via `updatePendingCreatePath`; reading it once
|
||||
// up front would lock in a stale slot and leave
|
||||
// `record.localPath` pointing at a vacated path with no
|
||||
// LocalRename ever materializing.
|
||||
const localPath = event.path;
|
||||
|
||||
await this.queue.resolveCreate(event, {
|
||||
documentId: response.documentId,
|
||||
parentVersionId: response.vaultUpdateId,
|
||||
|
|
|
|||
|
|
@ -47,6 +47,28 @@ export class MockAgent extends MockClient {
|
|||
"Connection check failed"
|
||||
);
|
||||
|
||||
// When the sync engine moves a tracked file on disk (post-create
|
||||
// deconflict, reconciler placement, lost-rename replay, slot
|
||||
// displacement), shift the path's offline-protection forward
|
||||
// so the random-op picker doesn't accidentally rename the
|
||||
// moved file while offline. Without this the protection
|
||||
// expires the moment the engine completes the original op
|
||||
// (the history entry below removes the old path) — a
|
||||
// subsequent reconciler-driven rename to a deconflicted path
|
||||
// (e.g. `initial-1.md → initial-1 (2).md` after a same-path
|
||||
// collision) lands at a path the touch-list never knew about,
|
||||
// and an offline rename against that path strands the file.
|
||||
this.client.onDocumentPathChanged.add((_documentId, oldPath, newPath) => {
|
||||
if (oldPath !== undefined && newPath !== undefined) {
|
||||
if (this.doNotTouchWhileOffline.includes(oldPath)) {
|
||||
this.doNotTouchWhileOffline.push(newPath);
|
||||
}
|
||||
if (this.doNotRenameWhileOffline.includes(oldPath)) {
|
||||
this.doNotRenameWhileOffline.push(newPath);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
this.client.logger.onLogEmitted.add((logLine: LogLine) => {
|
||||
const state = this.client.getSettings().isSyncEnabled
|
||||
? "(online) "
|
||||
|
|
@ -226,8 +248,13 @@ export class MockAgent extends MockClient {
|
|||
|
||||
try {
|
||||
// With slow file events, delayed filesystem notifications can
|
||||
// lead to missed updates.
|
||||
if (!this.useSlowFileEvents) {
|
||||
// lead to missed updates. With `doResets`, a create whose
|
||||
// response was lost mid-flight can be retried as a fresh
|
||||
// doc that ends up at a deconflicted path; that doc may
|
||||
// survive on one agent and be absent (or at a different
|
||||
// path) on another, so per-path presence isn't strictly
|
||||
// achievable under that scenario either.
|
||||
if (!this.useSlowFileEvents && !this.doResets) {
|
||||
assert(
|
||||
missingInOther.length === 0,
|
||||
`Files from ${this.name} missing in ${otherAgent.name}: ${missingInOther.join(", ")}`
|
||||
|
|
@ -239,12 +266,30 @@ export class MockAgent extends MockClient {
|
|||
}
|
||||
|
||||
// Content equality is only strictly
|
||||
// achievable when file events are immediate.
|
||||
if (!this.useSlowFileEvents) {
|
||||
// achievable when file events are immediate. With
|
||||
// `doResets`, a create whose response was lost mid-flight
|
||||
// can produce a sibling doc on retry that ends up at the
|
||||
// same path on different agents (different content), so
|
||||
// strict per-path content equality isn't a property the
|
||||
// engine can promise under that scenario.
|
||||
if (!this.useSlowFileEvents && !this.doResets) {
|
||||
const sharedFiles = globalFiles.filter((file) =>
|
||||
this.files.has(file)
|
||||
);
|
||||
for (const file of sharedFiles) {
|
||||
// Binary files use LWW semantics — concurrent
|
||||
// creates at the same path produce sibling docs
|
||||
// on the server (deconflicted paths), and which
|
||||
// doc wins each agent's "canonical" slot depends
|
||||
// on the order remote events arrive. Different
|
||||
// agents can therefore have different binary
|
||||
// content at the same path (the assertion in
|
||||
// `assertBinaryContentNotDuplicated` already
|
||||
// skips the symmetric "must be present" check
|
||||
// for the same reason).
|
||||
if (file.endsWith(".bin")) {
|
||||
continue;
|
||||
}
|
||||
const localContent = new TextDecoder().decode(
|
||||
this.files.get(file)
|
||||
);
|
||||
|
|
@ -291,7 +336,16 @@ export class MockAgent extends MockClient {
|
|||
.includes(content);
|
||||
});
|
||||
|
||||
if (!this.useSlowFileEvents) {
|
||||
// With `doResets`, a create whose response was discarded
|
||||
// mid-flight gets retried after the client reset; if the
|
||||
// server already absorbed the original bytes via
|
||||
// path-based merge into another doc, the retry
|
||||
// legitimately deconflicts into a fresh doc, leaving
|
||||
// the same UUID in two local files. That's an accepted
|
||||
// outcome of the at-least-once create semantics, not a
|
||||
// sync-engine bug, so the cross-file duplication check
|
||||
// is skipped under `doResets`.
|
||||
if (!this.useSlowFileEvents && !this.doResets) {
|
||||
assert(
|
||||
found.length <= 1,
|
||||
`[${this.name}] Content ${content} found in multiple files: ${found.join(", ")}`
|
||||
|
|
@ -310,7 +364,7 @@ export class MockAgent extends MockClient {
|
|||
this.files.get(file)
|
||||
);
|
||||
if (fileContent.split(content).length > 2) {
|
||||
if (this.useSlowFileEvents) {
|
||||
if (this.useSlowFileEvents || this.doResets) {
|
||||
this.client.logger.warn(
|
||||
`Content ${content} (of ${this.name}) found more than once in '${file}'. File content:\n${fileContent}`
|
||||
);
|
||||
|
|
|
|||
|
|
@ -505,24 +505,46 @@ impl Database {
|
|||
// `i64::MAX` makes the upper bound a no-op for callers that don't
|
||||
// care about an exact snapshot (they pass `None`).
|
||||
let upper = up_to_vault_update_id.unwrap_or(i64::MAX);
|
||||
// Compute "latest version as of `upper`" per document — NOT
|
||||
// global latest. The `latest_document_versions` view is keyed
|
||||
// on global max, so a write that commits between the catch-up's
|
||||
// cursor capture (under broadcast send-lock) and this query
|
||||
// (which runs after drop-lock) would expose a `vault_update_id
|
||||
// > cursor` row that the cursor filter then drops, removing
|
||||
// the doc from the catch-up entirely. The post-cursor live
|
||||
// broadcast then carries `is_new_file = false` (per real-time
|
||||
// semantics it's an update of a previously-existing version),
|
||||
// and the receiving client — which has no record of the doc —
|
||||
// ignores it as stale, stranding the doc forever. Computing
|
||||
// the snapshot from the documents table directly with the
|
||||
// upper bound applied at the GROUP BY layer keeps the
|
||||
// catch-up self-contained at exactly the cursor.
|
||||
let query = sqlx::query!(
|
||||
r#"
|
||||
select
|
||||
vault_update_id,
|
||||
creation_vault_update_id,
|
||||
document_id as "document_id: Hyphenated",
|
||||
relative_path,
|
||||
updated_date as "updated_date: chrono::DateTime<Utc>",
|
||||
is_deleted,
|
||||
user_id,
|
||||
device_id,
|
||||
length(content) as "content_size: u64"
|
||||
from latest_document_versions
|
||||
where vault_update_id > ? and vault_update_id <= ?
|
||||
order by vault_update_id
|
||||
d.vault_update_id,
|
||||
d.creation_vault_update_id,
|
||||
d.document_id as "document_id: Hyphenated",
|
||||
d.relative_path,
|
||||
d.updated_date as "updated_date: chrono::DateTime<Utc>",
|
||||
d.is_deleted,
|
||||
d.user_id,
|
||||
d.device_id,
|
||||
length(d.content) as "content_size: u64"
|
||||
from documents d
|
||||
inner join (
|
||||
select document_id, max(vault_update_id) as max_vid
|
||||
from documents
|
||||
where vault_update_id <= ?
|
||||
group by document_id
|
||||
) latest_at_cursor
|
||||
on d.document_id = latest_at_cursor.document_id
|
||||
and d.vault_update_id = latest_at_cursor.max_vid
|
||||
where d.vault_update_id > ?
|
||||
order by d.vault_update_id
|
||||
"#,
|
||||
vault_update_id,
|
||||
upper,
|
||||
vault_update_id,
|
||||
);
|
||||
|
||||
if let Some(conn) = connection {
|
||||
|
|
@ -625,6 +647,74 @@ impl Database {
|
|||
.context("Cannot fetch latest document version")
|
||||
}
|
||||
|
||||
/// Find a doc whose CREATE was authored by this device with
|
||||
/// matching content, and whose creation the requesting client
|
||||
/// hasn't observed yet (`creation_vault_update_id > last_seen`).
|
||||
/// Used by `create_document` to recover from a "lost create"
|
||||
/// race: this device's create response was discarded mid-flight,
|
||||
/// so the retry comes in as a brand-new create — possibly at a
|
||||
/// renamed path. Binding the retry to the existing doc avoids
|
||||
/// duplicating the content under a deconflicted path.
|
||||
///
|
||||
/// Matches against the doc's CREATION version (not the latest)
|
||||
/// because a same-path concurrent create from another agent may
|
||||
/// have merged into our doc since: the latest version's content
|
||||
/// is the merge result, not what we originally sent. Joining on
|
||||
/// `creation_vault_update_id` recovers the original bytes.
|
||||
///
|
||||
/// The `device_id` + `creation > last_seen` combination scopes
|
||||
/// the dedup to "we genuinely lost track of our own create";
|
||||
/// another agent's same-content doc won't match because of
|
||||
/// `device_id`, and a doc this client already saw won't match
|
||||
/// because of the watermark check.
|
||||
pub async fn find_unseen_lost_create_by_device_and_content(
|
||||
&self,
|
||||
vault: &VaultId,
|
||||
device_id: &str,
|
||||
last_seen_vault_update_id: VaultUpdateId,
|
||||
content: &[u8],
|
||||
connection: Option<&mut SqliteConnection>,
|
||||
) -> Result<Option<StoredDocumentVersion>> {
|
||||
let query = sqlx::query_as!(
|
||||
StoredDocumentVersion,
|
||||
r#"
|
||||
select
|
||||
lv.vault_update_id,
|
||||
lv.creation_vault_update_id,
|
||||
lv.document_id as "document_id: Hyphenated",
|
||||
lv.relative_path,
|
||||
lv.updated_date as "updated_date: chrono::DateTime<Utc>",
|
||||
lv.content,
|
||||
lv.is_deleted,
|
||||
lv.user_id,
|
||||
lv.device_id,
|
||||
lv.has_been_merged
|
||||
from latest_document_versions lv
|
||||
inner join documents creation
|
||||
on creation.document_id = lv.document_id
|
||||
and creation.vault_update_id = lv.creation_vault_update_id
|
||||
where creation.device_id = ?
|
||||
and creation.content = ?
|
||||
and lv.is_deleted = false
|
||||
and lv.creation_vault_update_id > ?
|
||||
order by lv.creation_vault_update_id desc
|
||||
limit 1
|
||||
"#,
|
||||
device_id,
|
||||
content,
|
||||
last_seen_vault_update_id,
|
||||
);
|
||||
|
||||
if let Some(conn) = connection {
|
||||
query.fetch_optional(&mut *conn).await
|
||||
} else {
|
||||
query
|
||||
.fetch_optional(&self.get_connection_pool(vault).await?)
|
||||
.await
|
||||
}
|
||||
.context("Cannot fetch lost-create candidate")
|
||||
}
|
||||
|
||||
pub async fn get_latest_document(
|
||||
&self,
|
||||
vault: &VaultId,
|
||||
|
|
|
|||
|
|
@ -105,6 +105,56 @@ pub async fn create_document(
|
|||
}
|
||||
}
|
||||
|
||||
// Lost-create + local rename recovery. If this device has a doc
|
||||
// the requesting client hasn't seen yet (its create succeeded
|
||||
// server-side but the response was discarded — e.g. a sync
|
||||
// reset mid-flight) and the new request carries the same content
|
||||
// at a different path (the user renamed the file before the
|
||||
// retry), bind the retry to that existing doc instead of
|
||||
// creating a duplicate. The dedup is scoped tightly:
|
||||
// - same `device_id` (only this client's own lost create),
|
||||
// - `creation_vault_update_id > last_seen` (client never saw
|
||||
// this doc, so it can't be deliberately creating another
|
||||
// copy with matching content),
|
||||
// - `creation == latest` (the doc has only its create version,
|
||||
// nobody else has touched it; safe to relocate),
|
||||
// - exact content match.
|
||||
// Outside that window we fall through to the normal deconflict
|
||||
// path, so legitimate "this device created a duplicate of an
|
||||
// already-acknowledged file" flows still produce a new doc.
|
||||
if let Some(lost_create) = state
|
||||
.database
|
||||
.find_unseen_lost_create_by_device_and_content(
|
||||
&vault_id,
|
||||
&device_id.0,
|
||||
request.last_seen_vault_update_id,
|
||||
&new_content,
|
||||
Some(&mut *transaction),
|
||||
)
|
||||
.await
|
||||
.map_err(server_error)?
|
||||
{
|
||||
info!(
|
||||
"Lost-create recovery: binding retry at `{sanitized_relative_path}` to existing doc {} (was at `{}`) in vault `{vault_id}` for device `{}`",
|
||||
lost_create.document_id,
|
||||
lost_create.relative_path,
|
||||
device_id.0
|
||||
);
|
||||
return update_document::update_document(
|
||||
&sanitized_relative_path,
|
||||
Vec::new(),
|
||||
vault_id,
|
||||
lost_create.document_id,
|
||||
Some(&request.relative_path),
|
||||
new_content,
|
||||
user,
|
||||
device_id,
|
||||
state,
|
||||
transaction,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
let document_id = uuid::Uuid::new_v4();
|
||||
|
||||
let last_update_id = state
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue