loop
This commit is contained in:
parent
0d9aebf900
commit
7198639db4
9 changed files with 636 additions and 252 deletions
|
|
@ -311,6 +311,16 @@ export class SyncEventQueue {
|
|||
|
||||
/**
|
||||
* Call once a create has been acknowledged by the server.
|
||||
*
|
||||
* Queued `LocalUpdate` / `LocalDelete` events that were pushed while
|
||||
* this create was still in-flight carry the create's `resolvers.promise`
|
||||
* as their `documentId` (see the `pendingDocumentId` branch of
|
||||
* `enqueue`). We must rewrite those references to the resolved string
|
||||
* id *before* calling `setDocument`, otherwise its event-rewrite loop
|
||||
* (which compares `e.documentId === record.documentId`) would silently
|
||||
* skip them — leaving their `event.path` pointing at the pre-rename
|
||||
* slot and causing the next drain step's `getFileSize(event.path)` to
|
||||
* throw `FileNotFoundError`, dropping the user's intent.
|
||||
*/
|
||||
public async resolveCreate(
|
||||
event: Extract<SyncEvent, { type: SyncEventType.LocalCreate }>,
|
||||
|
|
@ -319,10 +329,36 @@ export class SyncEventQueue {
|
|||
if (removeFromArray(this.events, event)) {
|
||||
this.notifyPendingUpdateCountChanged();
|
||||
}
|
||||
this.replacePendingDocumentId(
|
||||
event.resolvers.promise,
|
||||
record.documentId
|
||||
);
|
||||
await this.setDocument(event.path, record);
|
||||
event.resolvers.resolve(record.documentId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Swap a pending create's `Promise<DocumentId>` reference for the
|
||||
* resolved string id across every queued `LocalUpdate` / `LocalDelete`.
|
||||
* Call this whenever a create resolves (regular ack OR
|
||||
* displacement-merge into an existing doc) — see `resolveCreate` for
|
||||
* the failure mode if it's skipped.
|
||||
*/
|
||||
public replacePendingDocumentId(
|
||||
promise: Promise<DocumentId>,
|
||||
documentId: DocumentId
|
||||
): void {
|
||||
for (const e of this.events) {
|
||||
if (
|
||||
(e.type === SyncEventType.LocalUpdate ||
|
||||
e.type === SyncEventType.LocalDelete) &&
|
||||
e.documentId === promise
|
||||
) {
|
||||
e.documentId = documentId;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the settled document map and persist the new document version.
|
||||
*
|
||||
|
|
|
|||
|
|
@ -422,13 +422,132 @@ export class Syncer {
|
|||
contentBytes
|
||||
});
|
||||
|
||||
await this.handleMaybeMergingResponse({
|
||||
response,
|
||||
contentHash,
|
||||
originalContentBytes: contentBytes,
|
||||
createEvent: event
|
||||
// `event.path` is mutated in place by `updatePendingCreatePath`
|
||||
// when a user renames the pending create mid-roundtrip, so we
|
||||
// read it live on every access — capturing it into a local
|
||||
// would freeze it at function entry and write the merged bytes
|
||||
// to the now-vacated path.
|
||||
let remoteHash: string;
|
||||
if (response.type === "MergingUpdate") {
|
||||
const responseBytes = base64ToBytes(response.contentBase64);
|
||||
await this.operations.write(event.path, contentBytes, responseBytes);
|
||||
remoteHash = await hash(responseBytes);
|
||||
await this.updateCache(response.vaultUpdateId, responseBytes, event.path);
|
||||
} else {
|
||||
remoteHash = contentHash;
|
||||
await this.updateCache(response.vaultUpdateId, contentBytes, event.path);
|
||||
}
|
||||
|
||||
const newRecord = {
|
||||
documentId: response.documentId,
|
||||
parentVersionId: response.vaultUpdateId,
|
||||
remoteRelativePath: response.relativePath
|
||||
};
|
||||
|
||||
// Displacement-merge: while this LocalCreate sat in the queue, a
|
||||
// RemoteCreate for `originalPath` was processed first, displaced
|
||||
// our local file to a `conflict-…` path, and tracked the remote
|
||||
// doc at `originalPath`. The server then de-duplicated our
|
||||
// create into that already-tracked doc and returned its id.
|
||||
// Slot the merged content into `response.relativePath` by
|
||||
// deleting D's stale content there and renaming the conflict
|
||||
// file in. Falling through to the regular `resolveCreate` path
|
||||
// would call `setDocument(conflict-…, D)`, whose same-docId
|
||||
// cleanup strips D's tracking from `originalPath` and orphans
|
||||
// the file there.
|
||||
const existing = this.queue.getDocumentByDocumentId(response.documentId);
|
||||
if (
|
||||
existing !== undefined &&
|
||||
existing.path === response.relativePath &&
|
||||
existing.path !== event.path
|
||||
) {
|
||||
// We can't `operations.write` the merged bytes onto the
|
||||
// existing path: that runs a 3-way merge against the stale
|
||||
// content as if it were a concurrent edit, which strips
|
||||
// out the very content the server just merged.
|
||||
await this.operations.delete(response.relativePath);
|
||||
// We just deleted `response.relativePath`. With
|
||||
// `MoveOnConflict.NEW` a stray racing occupant would route
|
||||
// our file to a `conflict-<uuid>-` path; we'd then track
|
||||
// the doc there with `intendedPath` set.
|
||||
const moveResult = await this.operations.move(
|
||||
event.path,
|
||||
response.relativePath,
|
||||
MoveOnConflict.NEW
|
||||
);
|
||||
await this.queue.setDocument(moveResult.actualPath, {
|
||||
...newRecord,
|
||||
path: moveResult.actualPath,
|
||||
intendedPath:
|
||||
moveResult.actualPath === response.relativePath
|
||||
? undefined
|
||||
: response.relativePath,
|
||||
remoteHash
|
||||
});
|
||||
this.queue.consumeEvent(event);
|
||||
event.resolvers.resolve(newRecord.documentId);
|
||||
this.queue.lastSeenUpdateId = response.vaultUpdateId;
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.SUCCESS,
|
||||
details: { type: SyncType.CREATE, relativePath: event.path },
|
||||
message: "Created file and merged with existing remote version",
|
||||
author: response.userId,
|
||||
timestamp: new Date(response.updatedDate)
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
// Reconcile disk and tracking with the server-assigned path.
|
||||
// Two cases produce a mismatch:
|
||||
// 1. Server deconflicted (e.g. another client raced us): we
|
||||
// know because `response.relativePath !== event.originalPath`.
|
||||
// Move the local file to the server-assigned path, otherwise
|
||||
// a later remote create at our original path would see a
|
||||
// phantom local conflict and stash the new file under
|
||||
// `conflict-<uuid>-`.
|
||||
// 2. The create's local file was displaced to a `conflict-…`
|
||||
// path while it sat in the queue, but the server still
|
||||
// placed the doc at our original path (e.g. the existing
|
||||
// doc that forced the displacement was meanwhile deleted,
|
||||
// so the server-side merge / deconflict path didn't fire).
|
||||
// Move the conflict file onto the original path so
|
||||
// `resolveCreate` tracks the doc at the path the server
|
||||
// returned, instead of the displaced conflict path which
|
||||
// would orphan the file.
|
||||
//
|
||||
// We must NOT move when `event.path` differs from `originalPath`
|
||||
// because of a *user rename* of the pending create (e.g. write
|
||||
// A.md, rename to B.md): there the user's intent is the renamed
|
||||
// path, the server places the doc at `originalPath`, and the
|
||||
// queued `LocalUpdate` from the watcher will replay the rename
|
||||
// to the server.
|
||||
let resolvedPath = event.path;
|
||||
let resolvedIntendedPath: RelativePath | undefined;
|
||||
const needsMove =
|
||||
response.relativePath !== event.originalPath ||
|
||||
(event.path !== response.relativePath &&
|
||||
CONFLICT_PATH_REGEX.test(event.path));
|
||||
if (needsMove) {
|
||||
const moveResult = await this.operations.move(
|
||||
event.path,
|
||||
response.relativePath,
|
||||
MoveOnConflict.NEW
|
||||
);
|
||||
this.queue.updatePendingCreatePath(event.path, moveResult.actualPath);
|
||||
resolvedPath = moveResult.actualPath;
|
||||
resolvedIntendedPath =
|
||||
moveResult.actualPath === response.relativePath
|
||||
? undefined
|
||||
: response.relativePath;
|
||||
}
|
||||
await this.queue.resolveCreate(event, {
|
||||
...newRecord,
|
||||
path: resolvedPath,
|
||||
intendedPath: resolvedIntendedPath,
|
||||
remoteHash
|
||||
});
|
||||
|
||||
this.queue.lastSeenUpdateId = response.vaultUpdateId;
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.SUCCESS,
|
||||
details: { type: SyncType.CREATE, relativePath: event.path },
|
||||
|
|
@ -509,6 +628,13 @@ export class Syncer {
|
|||
return;
|
||||
}
|
||||
|
||||
// Snapshot of `record.path` before the HTTP roundtrip — used
|
||||
// after the response to detect a user rename that ran while we
|
||||
// were awaiting (`record.path !== pathBeforeRoundtrip`). All
|
||||
// other reads go through `record.path` live so the merged
|
||||
// bytes land at the user's new location, not the vacated one.
|
||||
const pathBeforeRoundtrip = record.path;
|
||||
|
||||
const response = await this.sendUpdate({
|
||||
record,
|
||||
relativePath: renameTarget,
|
||||
|
|
@ -524,262 +650,75 @@ export class Syncer {
|
|||
return;
|
||||
}
|
||||
|
||||
await this.handleMaybeMergingResponse({
|
||||
record,
|
||||
pathBeforeRoundtrip: record.path,
|
||||
response,
|
||||
contentHash,
|
||||
originalContentBytes: contentBytes
|
||||
});
|
||||
let remoteHash: string;
|
||||
if (response.type === "MergingUpdate") {
|
||||
const responseBytes = base64ToBytes(response.contentBase64);
|
||||
await this.operations.write(record.path, contentBytes, responseBytes);
|
||||
remoteHash = await hash(responseBytes);
|
||||
await this.updateCache(response.vaultUpdateId, responseBytes, record.path);
|
||||
} else {
|
||||
remoteHash = contentHash;
|
||||
await this.updateCache(response.vaultUpdateId, contentBytes, record.path);
|
||||
}
|
||||
|
||||
const newRecord = {
|
||||
documentId: response.documentId,
|
||||
parentVersionId: response.vaultUpdateId,
|
||||
remoteRelativePath: response.relativePath
|
||||
};
|
||||
|
||||
if (record.path === pathBeforeRoundtrip) {
|
||||
// No user rename mid-flight. Move our local file onto the
|
||||
// server-assigned path (which may differ from what we
|
||||
// requested if the server deconflicted). `MoveOnConflict.NEW`
|
||||
// routes ours to a `conflict-<uuid>-` path if the slot is
|
||||
// locally taken; we record `intendedPath` so future
|
||||
// server-bound requests use the path the server actually has
|
||||
// it at. The other doc keeps its slot; local convergence is
|
||||
// left to manual user resolution.
|
||||
const moveResult = await this.operations.move(
|
||||
record.path,
|
||||
response.relativePath,
|
||||
MoveOnConflict.NEW
|
||||
);
|
||||
this.queue.updatePendingCreatePath(record.path, moveResult.actualPath);
|
||||
await this.queue.setDocument(moveResult.actualPath, {
|
||||
...newRecord,
|
||||
path: moveResult.actualPath,
|
||||
intendedPath:
|
||||
moveResult.actualPath === response.relativePath
|
||||
? undefined
|
||||
: response.relativePath,
|
||||
remoteHash
|
||||
});
|
||||
} else {
|
||||
// User renamed during the roundtrip. Leave the disk file at
|
||||
// `record.path`; the queued rename's LocalUpdate will
|
||||
// reconcile the server on its next drain.
|
||||
await this.queue.setDocument(record.path, {
|
||||
...newRecord,
|
||||
path: record.path,
|
||||
remoteHash
|
||||
});
|
||||
}
|
||||
|
||||
this.queue.lastSeenUpdateId = response.vaultUpdateId;
|
||||
|
||||
const isMerge = "type" in response && response.type === "MergingUpdate";
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.SUCCESS,
|
||||
details: {
|
||||
type: SyncType.UPDATE,
|
||||
relativePath: record.path
|
||||
},
|
||||
message: isMerge
|
||||
? "Updated file and merged with remote changes"
|
||||
: "Successfully updated file on the server",
|
||||
message:
|
||||
response.type === "MergingUpdate"
|
||||
? "Updated file and merged with remote changes"
|
||||
: "Successfully updated file on the server",
|
||||
author: response.userId,
|
||||
timestamp: new Date(response.updatedDate)
|
||||
});
|
||||
}
|
||||
|
||||
private async handleMaybeMergingResponse({
|
||||
record,
|
||||
pathBeforeRoundtrip,
|
||||
response,
|
||||
contentHash,
|
||||
originalContentBytes,
|
||||
createEvent
|
||||
}: {
|
||||
// Live record reference for a LocalUpdate flow. Path reads go
|
||||
// through `record.path` so a user-rename mid-roundtrip is seen
|
||||
// on every access.
|
||||
record?: DocumentRecord;
|
||||
// Snapshot of `record.path` captured before `sendUpdate`
|
||||
// awaited. Compared against the live `record.path` after the
|
||||
// roundtrip to decide whether a user rename happened in
|
||||
// between.
|
||||
pathBeforeRoundtrip?: RelativePath;
|
||||
response: DocumentUpdateResponse;
|
||||
contentHash: string;
|
||||
originalContentBytes: Uint8Array;
|
||||
// When processing a Create, pass the originating event so its
|
||||
// `resolvers` promise can be fulfilled (or rejected, on a
|
||||
// deleted response). The create flow reads the live disk path
|
||||
// off `createEvent.path` (mutated by
|
||||
// `updatePendingCreatePath` on a user rename).
|
||||
createEvent?: Extract<SyncEvent, { type: SyncEventType.LocalCreate }>;
|
||||
}): Promise<void> {
|
||||
const newRecord = {
|
||||
documentId: response.documentId,
|
||||
parentVersionId: response.vaultUpdateId,
|
||||
remoteRelativePath: response.relativePath
|
||||
};
|
||||
let remoteHash: string;
|
||||
|
||||
// The two flows see rename retargeting through different live
|
||||
// objects:
|
||||
// - LocalUpdate: `record.path` is mutated in place by
|
||||
// `queue.enqueue`'s rename branch and `setDocument`.
|
||||
// - LocalCreate: the doc isn't tracked yet (no
|
||||
// `resolveCreate` has run); the rename retargets
|
||||
// `createEvent.path` via `updatePendingCreatePath`.
|
||||
// In both cases reading the live property at write time keeps
|
||||
// the merged bytes from being written to a vacated path.
|
||||
const writePath =
|
||||
createEvent !== undefined ? createEvent.path : record!.path;
|
||||
|
||||
if ("type" in response && response.type === "MergingUpdate") {
|
||||
const responseBytes = base64ToBytes(response.contentBase64);
|
||||
await this.operations.write(
|
||||
writePath,
|
||||
originalContentBytes,
|
||||
responseBytes
|
||||
);
|
||||
|
||||
remoteHash = await hash(responseBytes);
|
||||
|
||||
await this.updateCache(response.vaultUpdateId, responseBytes, writePath);
|
||||
} else {
|
||||
// Fast-forward update: no merge needed
|
||||
remoteHash = contentHash;
|
||||
|
||||
await this.updateCache(
|
||||
response.vaultUpdateId,
|
||||
originalContentBytes,
|
||||
writePath
|
||||
);
|
||||
}
|
||||
|
||||
if (createEvent === undefined) {
|
||||
if (record === undefined || pathBeforeRoundtrip === undefined) {
|
||||
throw new Error(
|
||||
"Unreachable: LocalUpdate flow must pass `record` and `pathBeforeRoundtrip`"
|
||||
);
|
||||
}
|
||||
// `record.path` is the *live* path. If a user rename ran
|
||||
// during the roundtrip, `queue.enqueue` mutated it (and the
|
||||
// queued LocalUpdate event's `path` field) to the user's
|
||||
// new target; otherwise it still equals
|
||||
// `pathBeforeRoundtrip`.
|
||||
const currentPath = record.path;
|
||||
if (currentPath === pathBeforeRoundtrip) {
|
||||
// Move our local file onto the server-assigned path.
|
||||
// `MoveOnConflict.NEW` means "if the target is taken
|
||||
// locally by some other doc, route ours to a
|
||||
// `conflict-<uuid>-` path instead of evicting them".
|
||||
// We then record `intendedPath = response.relativePath`
|
||||
// so future server-bound requests for this doc reference
|
||||
// the path the server actually has it at, not the local
|
||||
// conflict-uuid path. The other doc keeps its slot;
|
||||
// local convergence is left to manual user resolution.
|
||||
const moveResult = await this.operations.move(
|
||||
currentPath,
|
||||
response.relativePath,
|
||||
MoveOnConflict.NEW
|
||||
);
|
||||
this.queue.updatePendingCreatePath(currentPath, moveResult.actualPath);
|
||||
await this.queue.setDocument(moveResult.actualPath, {
|
||||
...newRecord,
|
||||
path: moveResult.actualPath,
|
||||
intendedPath:
|
||||
moveResult.actualPath === response.relativePath
|
||||
? undefined
|
||||
: response.relativePath,
|
||||
remoteHash
|
||||
});
|
||||
} else {
|
||||
// User renamed during the roundtrip. Leave the disk file
|
||||
// at `currentPath`; the queued rename's LocalUpdate will
|
||||
// reconcile the server on its next drain.
|
||||
await this.queue.setDocument(currentPath, {
|
||||
...newRecord,
|
||||
path: currentPath,
|
||||
remoteHash
|
||||
});
|
||||
}
|
||||
} else {
|
||||
// Displacement-merge: while this LocalCreate sat in the queue, a
|
||||
// RemoteCreate for `originalPath` was processed first, displaced
|
||||
// our local file to a `conflict-…` path, and tracked the remote
|
||||
// doc at `originalPath`. The server then de-duplicated our
|
||||
// create into that already-tracked doc and returned its id.
|
||||
// Relocate the just-merged content from the conflict path to
|
||||
// the existing tracked path (overwriting the older content the
|
||||
// displacement wrote there) and drop the conflict file.
|
||||
//
|
||||
// Falling through to `resolveCreate(createEvent, ...)` would
|
||||
// call `setDocument(conflict-…, D)`, whose same-docId cleanup
|
||||
// strips D's tracking from `originalPath` and leaves the file
|
||||
// there orphaned on disk.
|
||||
const existing = this.queue.getDocumentByDocumentId(
|
||||
response.documentId
|
||||
);
|
||||
if (
|
||||
existing !== undefined &&
|
||||
existing.path === response.relativePath &&
|
||||
existing.path !== createEvent.path
|
||||
) {
|
||||
// The merged content already lives at `createEvent.path`
|
||||
// (the MergingUpdate branch above wrote it there). Slot
|
||||
// it into `response.relativePath` by deleting D's stale
|
||||
// content there and renaming the conflict file in. We
|
||||
// can't `operations.write` the merged bytes onto the
|
||||
// existing path: that runs a 3-way merge against the
|
||||
// stale content as if it were a concurrent edit, which
|
||||
// strips out the very content the server just merged.
|
||||
await this.operations.delete(response.relativePath);
|
||||
// We just deleted `response.relativePath`. With
|
||||
// `MoveOnConflict.NEW` a stray racing occupant would
|
||||
// route our file to a `conflict-<uuid>-` path; we'd
|
||||
// then track the doc there with `intendedPath` set.
|
||||
const moveResult = await this.operations.move(
|
||||
createEvent.path,
|
||||
response.relativePath,
|
||||
MoveOnConflict.NEW
|
||||
);
|
||||
await this.queue.setDocument(moveResult.actualPath, {
|
||||
...newRecord,
|
||||
path: moveResult.actualPath,
|
||||
intendedPath:
|
||||
moveResult.actualPath === response.relativePath
|
||||
? undefined
|
||||
: response.relativePath,
|
||||
remoteHash
|
||||
});
|
||||
this.queue.consumeEvent(createEvent);
|
||||
createEvent.resolvers.resolve(newRecord.documentId);
|
||||
this.queue.lastSeenUpdateId = response.vaultUpdateId;
|
||||
return;
|
||||
}
|
||||
// Reconcile disk and tracking with the server-assigned path.
|
||||
// Two cases produce a mismatch:
|
||||
// 1. Server deconflicted (e.g. another client raced us): we
|
||||
// know because `response.relativePath !== createEvent.originalPath`.
|
||||
// Move the local file to the server-assigned path, otherwise
|
||||
// a later remote create at our original path would see a
|
||||
// phantom local conflict and stash the new file under
|
||||
// `conflict-<uuid>-`.
|
||||
// 2. The create's local file was displaced to a `conflict-…`
|
||||
// path while it sat in the queue, but the server still
|
||||
// placed the doc at our original path (e.g. the existing
|
||||
// doc that forced the displacement was meanwhile deleted,
|
||||
// so the server-side merge / deconflict path didn't
|
||||
// fire). Move the conflict file onto the original path
|
||||
// so `resolveCreate` tracks the doc at the path the
|
||||
// server returned, instead of the displaced conflict
|
||||
// path which would orphan the file.
|
||||
//
|
||||
// We must NOT move when `createEvent.path` differs from
|
||||
// `originalPath` because of a *user rename* of the pending
|
||||
// create (e.g. write A.md, rename to B.md): there the user's
|
||||
// intent is the renamed path, the server places the doc at
|
||||
// `originalPath`, and the queued `LocalUpdate` from the
|
||||
// watcher will replay the rename to the server.
|
||||
let resolvedPath = createEvent.path;
|
||||
let resolvedIntendedPath: RelativePath | undefined;
|
||||
if (response.relativePath !== createEvent.originalPath) {
|
||||
const moveResult = await this.operations.move(
|
||||
createEvent.path,
|
||||
response.relativePath,
|
||||
MoveOnConflict.NEW
|
||||
);
|
||||
this.queue.updatePendingCreatePath(createEvent.path, moveResult.actualPath);
|
||||
resolvedPath = moveResult.actualPath;
|
||||
resolvedIntendedPath =
|
||||
moveResult.actualPath === response.relativePath
|
||||
? undefined
|
||||
: response.relativePath;
|
||||
} else if (
|
||||
createEvent.path !== response.relativePath &&
|
||||
CONFLICT_PATH_REGEX.test(createEvent.path)
|
||||
) {
|
||||
const moveResult = await this.operations.move(
|
||||
createEvent.path,
|
||||
response.relativePath,
|
||||
MoveOnConflict.NEW
|
||||
);
|
||||
this.queue.updatePendingCreatePath(createEvent.path, moveResult.actualPath);
|
||||
resolvedPath = moveResult.actualPath;
|
||||
resolvedIntendedPath =
|
||||
moveResult.actualPath === response.relativePath
|
||||
? undefined
|
||||
: response.relativePath;
|
||||
}
|
||||
await this.queue.resolveCreate(createEvent, {
|
||||
...newRecord,
|
||||
path: resolvedPath,
|
||||
intendedPath: resolvedIntendedPath,
|
||||
remoteHash
|
||||
});
|
||||
}
|
||||
|
||||
this.queue.lastSeenUpdateId = response.vaultUpdateId;
|
||||
}
|
||||
|
||||
private async processRemoteChange(
|
||||
event: Extract<SyncEvent, { type: SyncEventType.RemoteChange }>
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue