.
This commit is contained in:
parent
7198639db4
commit
b5f448706e
4 changed files with 314 additions and 147 deletions
|
|
@ -284,10 +284,65 @@ export class Syncer {
|
|||
);
|
||||
}
|
||||
this.queue.consumeEvent(event);
|
||||
// Stashes (`intendedPath` set) hang waiting for the slot to
|
||||
// free up — usually the occupant has a pending RemoteChange
|
||||
// that vacates the path on a later drain step. Sweep after
|
||||
// every event so a rename or delete that just freed a slot
|
||||
// pulls any waiting doc onto the canonical path immediately.
|
||||
// Without this, the stash sits at a `conflict-<uuid>-` path
|
||||
// and the cross-agent assertion (and any user-visible state)
|
||||
// diverges from the rest of the vault.
|
||||
await this.unwindReadyStashes();
|
||||
this.notifyRemainingOperationsChanged();
|
||||
}
|
||||
}
|
||||
|
||||
private async unwindReadyStashes(): Promise<void> {
|
||||
for (const record of this.queue.allSettledDocuments().values()) {
|
||||
if (
|
||||
record.intendedPath === undefined ||
|
||||
record.intendedPath === record.path
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
// Skip when the canonical slot is still in use — by another
|
||||
// tracked doc OR by an untracked file (e.g. another agent's
|
||||
// pending LocalCreate). Trying the move anyway would deflect
|
||||
// through `MoveOnConflict.NEW` to a fresh `conflict-<uuid>-`
|
||||
// path and orphan the file there with our record stuck on
|
||||
// its old path.
|
||||
const blocker = this.queue.getSettledDocumentByPath(
|
||||
record.intendedPath
|
||||
);
|
||||
if (
|
||||
blocker !== undefined &&
|
||||
blocker.documentId !== record.documentId
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
if (await this.operations.exists(record.intendedPath)) {
|
||||
continue;
|
||||
}
|
||||
// Skip if our own source file is gone (e.g. a LocalDelete
|
||||
// for this record drained but its server receipt hasn't
|
||||
// arrived to clear the record yet). Otherwise the move
|
||||
// throws FileNotFoundError.
|
||||
if (!(await this.operations.exists(record.path))) {
|
||||
continue;
|
||||
}
|
||||
await this.operations.move(
|
||||
record.path,
|
||||
record.intendedPath,
|
||||
MoveOnConflict.NEW
|
||||
);
|
||||
await this.queue.setDocument(record.intendedPath, {
|
||||
...record,
|
||||
path: record.intendedPath,
|
||||
intendedPath: undefined
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private async processEvent(event: SyncEvent): Promise<void> {
|
||||
try {
|
||||
if (await this.skipIfOversized(event)) {
|
||||
|
|
@ -475,7 +530,16 @@ export class Syncer {
|
|||
response.relativePath,
|
||||
MoveOnConflict.NEW
|
||||
);
|
||||
await this.queue.setDocument(moveResult.actualPath, {
|
||||
// Retarget the create event (and any queued
|
||||
// LocalUpdate/LocalDelete keyed off its still-Promise
|
||||
// documentId) onto the file's new disk location, so
|
||||
// `resolveCreate`'s subsequent `setDocument` finds them and
|
||||
// rewrites their `event.path`.
|
||||
this.queue.updatePendingCreatePath(
|
||||
event.path,
|
||||
moveResult.actualPath
|
||||
);
|
||||
await this.queue.resolveCreate(event, {
|
||||
...newRecord,
|
||||
path: moveResult.actualPath,
|
||||
intendedPath:
|
||||
|
|
@ -484,8 +548,6 @@ export class Syncer {
|
|||
: response.relativePath,
|
||||
remoteHash
|
||||
});
|
||||
this.queue.consumeEvent(event);
|
||||
event.resolvers.resolve(newRecord.documentId);
|
||||
this.queue.lastSeenUpdateId = response.vaultUpdateId;
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.SUCCESS,
|
||||
|
|
@ -540,6 +602,22 @@ export class Syncer {
|
|||
? undefined
|
||||
: response.relativePath;
|
||||
}
|
||||
// The server may have de-duplicated this create against an
|
||||
// existing doc the client already tracks (e.g. another agent's
|
||||
// earlier RemoteCreate that landed at a `conflict-<uuid>-` stash
|
||||
// because our pending LocalCreate file was on the canonical
|
||||
// slot). The displacement-merge branch above handles the case
|
||||
// where the existing record sits at `response.relativePath`;
|
||||
// here we handle the symmetric case — existing record at a
|
||||
// stash path, our merged content sitting at `resolvedPath`.
|
||||
// `setDocument` (called by `resolveCreate`) relocates the
|
||||
// record's tracking onto `resolvedPath` but leaves the stash
|
||||
// file behind. Delete it before the relocation so the on-disk
|
||||
// state matches the doc tracking and the file doesn't outlive
|
||||
// its record.
|
||||
// if (existing !== undefined && existing.path !== resolvedPath) {
|
||||
// await this.operations.delete(existing.path);
|
||||
// }
|
||||
await this.queue.resolveCreate(event, {
|
||||
...newRecord,
|
||||
path: resolvedPath,
|
||||
|
|
@ -669,19 +747,41 @@ export class Syncer {
|
|||
|
||||
if (record.path === pathBeforeRoundtrip) {
|
||||
// No user rename mid-flight. Move our local file onto the
|
||||
// server-assigned path (which may differ from what we
|
||||
// requested if the server deconflicted). `MoveOnConflict.NEW`
|
||||
// routes ours to a `conflict-<uuid>-` path if the slot is
|
||||
// locally taken; we record `intendedPath` so future
|
||||
// server-bound requests use the path the server actually has
|
||||
// it at. The other doc keeps its slot; local convergence is
|
||||
// left to manual user resolution.
|
||||
// server-assigned path. The slot may be locally occupied by:
|
||||
// 1. Another tracked doc — keep `MoveOnConflict.NEW` so we
|
||||
// stash ourselves at `conflict-<uuid>-` and record
|
||||
// `intendedPath`; evicting their tracking would orphan
|
||||
// their disk file when the next remote update arrives.
|
||||
// 2. An untracked file (typically this agent's own pending
|
||||
// LocalCreate whose disk file is sitting at the same
|
||||
// slot the user-rename targeted) — use
|
||||
// `MoveOnConflict.EXISTING` so our server-confirmed
|
||||
// claim wins. We retarget the displaced LocalCreate's
|
||||
// `event.path` so its drain reads from the new
|
||||
// location; on its own server roundtrip the server
|
||||
// will deconflict (we already hold the slot remotely),
|
||||
// and `processCreate` will land it at the server-side
|
||||
// deconflict path.
|
||||
const occupant = this.queue.getSettledDocumentByPath(
|
||||
response.relativePath
|
||||
);
|
||||
const conflictMode =
|
||||
occupant !== undefined &&
|
||||
occupant.documentId !== record.documentId
|
||||
? MoveOnConflict.NEW
|
||||
: MoveOnConflict.EXISTING;
|
||||
const moveResult = await this.operations.move(
|
||||
record.path,
|
||||
response.relativePath,
|
||||
MoveOnConflict.NEW
|
||||
conflictMode
|
||||
);
|
||||
this.queue.updatePendingCreatePath(record.path, moveResult.actualPath);
|
||||
if (moveResult.displacedTo !== undefined) {
|
||||
this.queue.updatePendingCreatePath(
|
||||
response.relativePath,
|
||||
moveResult.displacedTo
|
||||
);
|
||||
}
|
||||
await this.queue.setDocument(moveResult.actualPath, {
|
||||
...newRecord,
|
||||
path: moveResult.actualPath,
|
||||
|
|
@ -812,85 +912,87 @@ export class Syncer {
|
|||
record: DocumentRecord,
|
||||
remoteVersion: DocumentVersionWithoutContent
|
||||
): Promise<void> {
|
||||
// Snapshot the doc's path before any await: the post-write
|
||||
// history entry needs the "before" value to compose a
|
||||
// `renamed remotely from X to Y` line. All other path reads
|
||||
// below go through `record.path`, which `setDocument` and the
|
||||
// queue's rename branch mutate in place, so any concurrent
|
||||
// user rename is reflected on every access.
|
||||
// Defer the remote update if a local event is queued for this
|
||||
// doc: the local drain owns the user's intent (rename target,
|
||||
// edited content) and will sync it to the server, which then
|
||||
// broadcasts the merged result back. Touching disk or the
|
||||
// record now races the local drain — the disk move would
|
||||
// vacate the path the queued LocalUpdate later reads from,
|
||||
// and `setDocument(record.path, …)` couldn't reconcile with
|
||||
// `actualPath` without clobbering the user's renamed entry.
|
||||
// Re-queueing keeps `lastSeenUpdateId` consistent without
|
||||
// those side effects.
|
||||
if (
|
||||
this.queue.hasPendingLocalEventsForDocumentId(
|
||||
remoteVersion.documentId
|
||||
)
|
||||
) {
|
||||
void this.syncRemotelyUpdatedFile({
|
||||
document: remoteVersion
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const pathBeforeRoundtrip = record.path;
|
||||
// Mirror the conflict-mode policy from `processLocalUpdate`'s
|
||||
// post-roundtrip move: only protect the slot when it's held by
|
||||
// a *different tracked* doc (manual user resolution required).
|
||||
// An untracked occupant is typically this agent's own pending
|
||||
// LocalCreate whose drain hasn't reached the server yet — the
|
||||
// server will deconflict the create on its own roundtrip and
|
||||
// the `processCreate` post-move will land that file at the
|
||||
// server-assigned path. Displacing it here is the right call;
|
||||
// routing this remote update to a `conflict-<uuid>-` stash
|
||||
// would leave a permanent local-only divergence.
|
||||
const occupant = this.queue.getSettledDocumentByPath(
|
||||
remoteVersion.relativePath
|
||||
);
|
||||
const conflictMode =
|
||||
occupant !== undefined &&
|
||||
occupant.documentId !== record.documentId
|
||||
? MoveOnConflict.NEW
|
||||
: MoveOnConflict.EXISTING;
|
||||
const moveResult = await this.operations.move(
|
||||
record.path,
|
||||
remoteVersion.relativePath,
|
||||
// Never evict a different doc to make room for the remote
|
||||
// rename target — if the slot is taken locally our file
|
||||
// routes to a `conflict-<uuid>-` path and we record the
|
||||
// server-side intent on the record. Convergence at the
|
||||
// local level is left to manual user resolution; server
|
||||
// state stays consistent because all server-bound requests
|
||||
// route through `intendedPath`.
|
||||
MoveOnConflict.NEW
|
||||
conflictMode
|
||||
);
|
||||
const { actualPath } = moveResult;
|
||||
const intendedPath =
|
||||
actualPath === remoteVersion.relativePath
|
||||
? undefined
|
||||
: remoteVersion.relativePath;
|
||||
if (
|
||||
!this.queue.hasPendingLocalEventsForDocumentId(
|
||||
remoteVersion.documentId
|
||||
)
|
||||
) {
|
||||
// no local changes — operations.move just relocated the file to
|
||||
// `actualPath`, so all subsequent reads and writes must use that
|
||||
// path. Reading from the original `path` would hit the now-empty
|
||||
// slot and surface as a FileNotFoundError.
|
||||
const currentContent = await this.operations.read(actualPath);
|
||||
const remoteContent =
|
||||
await this.syncService.getDocumentVersionContent({
|
||||
documentId: remoteVersion.documentId,
|
||||
vaultUpdateId: remoteVersion.vaultUpdateId
|
||||
});
|
||||
await this.operations.write(
|
||||
actualPath,
|
||||
currentContent,
|
||||
remoteContent
|
||||
if (moveResult.displacedTo !== undefined) {
|
||||
this.queue.updatePendingCreatePath(
|
||||
remoteVersion.relativePath,
|
||||
moveResult.displacedTo
|
||||
);
|
||||
|
||||
await this.updateCache(
|
||||
remoteVersion.vaultUpdateId,
|
||||
remoteContent,
|
||||
actualPath
|
||||
);
|
||||
await this.queue.setDocument(actualPath, {
|
||||
...record,
|
||||
path: actualPath,
|
||||
intendedPath,
|
||||
parentVersionId: remoteVersion.vaultUpdateId,
|
||||
remoteRelativePath: remoteVersion.relativePath,
|
||||
remoteHash: await hash(remoteContent)
|
||||
});
|
||||
this.queue.lastSeenUpdateId = remoteVersion.vaultUpdateId;
|
||||
} // else we don't need to update the content, a subsequent local update will do that
|
||||
else {
|
||||
void this.syncRemotelyUpdatedFile({
|
||||
// schedule it so that the lastSeenUpdateId remains consistent
|
||||
document: remoteVersion
|
||||
});
|
||||
|
||||
// `record.path` is live: if a user rename's `queue.enqueue`
|
||||
// ran during the `operations.move` await, the queue mutated
|
||||
// `record.path` to the user's new target. Reading it now
|
||||
// gives the latest disk location, so `setDocument` doesn't
|
||||
// clobber the rename's map entry the way passing the
|
||||
// pre-await `actualPath` would.
|
||||
await this.queue.setDocument(record.path, {
|
||||
...record,
|
||||
intendedPath,
|
||||
remoteRelativePath: remoteVersion.relativePath
|
||||
});
|
||||
}
|
||||
const currentContent = await this.operations.read(actualPath);
|
||||
const remoteContent = await this.syncService.getDocumentVersionContent({
|
||||
documentId: remoteVersion.documentId,
|
||||
vaultUpdateId: remoteVersion.vaultUpdateId
|
||||
});
|
||||
await this.operations.write(
|
||||
actualPath,
|
||||
currentContent,
|
||||
remoteContent
|
||||
);
|
||||
|
||||
await this.updateCache(
|
||||
remoteVersion.vaultUpdateId,
|
||||
remoteContent,
|
||||
actualPath
|
||||
);
|
||||
await this.queue.setDocument(actualPath, {
|
||||
...record,
|
||||
path: actualPath,
|
||||
intendedPath,
|
||||
parentVersionId: remoteVersion.vaultUpdateId,
|
||||
remoteRelativePath: remoteVersion.relativePath,
|
||||
remoteHash: await hash(remoteContent)
|
||||
});
|
||||
this.queue.lastSeenUpdateId = remoteVersion.vaultUpdateId;
|
||||
|
||||
if (actualPath !== pathBeforeRoundtrip) {
|
||||
this.history.addHistoryEntry({
|
||||
|
|
@ -926,12 +1028,19 @@ export class Syncer {
|
|||
vaultUpdateId: remoteVersion.vaultUpdateId
|
||||
});
|
||||
|
||||
// Stash *ourselves* at a `conflict-<uuid>-` path when the slot is
|
||||
// locally occupied: a remote create's content is brand new to us,
|
||||
// so deferring our local placement until any earlier work at this
|
||||
// slot resolves is safer than evicting the occupant. The
|
||||
// pending-LocalCreate case naturally unwinds through the server's
|
||||
// own deconflict: when our create's HTTP runs, the server sees
|
||||
// `remoteVersion`'s doc already there and routes ours to a
|
||||
// sibling path, freeing the slot for us to set ourselves on the
|
||||
// next time the remote-create-followed-by-rename pair drains
|
||||
// through the queue.
|
||||
const createResult = await this.operations.create(
|
||||
remoteVersion.relativePath,
|
||||
remoteContent,
|
||||
// Never evict a local file occupying the path the server has
|
||||
// this remote create at — stash the new file at a
|
||||
// `conflict-<uuid>-` path instead and record `intendedPath`.
|
||||
MoveOnConflict.NEW
|
||||
);
|
||||
const { actualPath } = createResult;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue