This commit is contained in:
Andras Schmelczer 2026-04-23 20:35:42 +01:00
parent 6a8c7635f1
commit d715d94b6d
26 changed files with 1007 additions and 453 deletions

View file

@ -32,7 +32,7 @@ import {
} from "../tracing/sync-history";
import { isBinary } from "../utils/is-binary";
import { isFileTypeMergable } from "../utils/is-file-type-mergable";
import { diff } from "reconcile-text";
import { diff, reconcile } from "reconcile-text";
import type { ServerConfig } from "../services/server-config";
import type { FixedSizeDocumentCache } from "../utils/data-structures/fix-sized-cache";
import { base64ToBytes } from "byte-base64";
@ -68,7 +68,26 @@ export class Syncer {
if (isConnected) {
this.sendHandshakeMessage();
} else {
this.runningScheduleSyncForOfflineChanges = undefined;
// Don't null the reference synchronously — if the scan is
// still in flight, the next reconnect would spawn a second
// concurrent scan racing on the same queue. Defer the
// clear until the in-flight task actually resolves, so a
// fresh scan can only start once the prior one is done.
const current = this.runningScheduleSyncForOfflineChanges;
if (current === undefined) return;
current
.catch(() => {
/* swallow — internal error already logged */
})
.finally(() => {
if (
this.runningScheduleSyncForOfflineChanges ===
current
) {
this.runningScheduleSyncForOfflineChanges =
undefined;
}
});
}
});
this.webSocketManager.onRemoteVaultUpdateReceived.add(
@ -182,46 +201,64 @@ export class Syncer {
// because the create/update HTTP response no longer carries the path,
// so the only way the origin learns about dedupe or first-rename-wins
// is via this event.
//
// Algorithmic assumptions:
// (1) Per-vault broadcast ordering is preserved by the server, so if
// the same write produced a `VaultUpdate` (content change) and a
// `PathChange` (path change), the `VaultUpdate` is handled first
// — that's what lets us skip advancing `parentVersionId` here
// without risking a stuck "already up-to-date" check later.
// (2) On a lag-induced disconnect (`broadcast::error::Lagged`) the
// server disconnects the client for a full resync, so out-of-
// order delivery across a reconnect boundary can't leave us with
// a stale PathChange overwriting a newer one.
public async syncRemotelyChangedPath(
pathChange: WebSocketVaultPathChange
): Promise<void> {
// Serialize onto the drain chain so this handler can't race against
// an in-flight `processSyncRemote` / `processSyncLocal` etc. that
// captured the old path before our move.
try {
const existing = this.queue.getDocumentByDocumentId(
pathChange.documentId
);
if (existing === undefined) {
throw new Error(
`Received path change for unknown document ${pathChange.documentId}`
await this.chainOntoDrain(async () => {
const existing = this.queue.getDocumentByDocumentId(
pathChange.documentId
);
}
if (existing === undefined) {
throw new Error(
`Received path change for unknown document ${pathChange.documentId}`
);
}
const { path: currentPath, record } = existing;
const newPath = pathChange.relativePath;
const { path: currentPath, record } = existing;
const newPath = pathChange.relativePath;
if (currentPath !== newPath) {
await this.operations.move(currentPath, newPath);
if (currentPath !== newPath) {
await this.operations.move(currentPath, newPath);
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
details: {
type: SyncType.MOVE,
relativePath: newPath,
movedFrom: currentPath
},
message: "Applied remote path change"
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
details: {
type: SyncType.MOVE,
relativePath: newPath,
movedFrom: currentPath
},
message: "Applied remote path change",
author: pathChange.userId,
timestamp: new Date(pathChange.updatedDate)
});
}
// `operations.move` updates the queue's path index, but
// doesn't touch `remoteRelativePath`. Refresh it so offline
// change detection compares against the server's path.
// parentVersionId intentionally stays at its prior value:
// if the write also changed content, the corresponding
// VaultUpdate handles that; advancing it here would make us
// skip fetching content we don't yet have.
this.queue.setDocument(newPath, {
...record,
remoteRelativePath: newPath
});
}
// `operations.move` updates the queue's path index, but
// doesn't touch `remoteRelativePath`. Refresh it so offline
// change detection compares against the server's path.
// parentVersionId intentionally stays at its prior value:
// if the write also changed content, the corresponding
// VaultUpdate handles that; advancing it here would make us
// skip fetching content we don't yet have.
this.queue.setDocument(newPath, {
...record,
remoteRelativePath: newPath
});
} catch (e) {
if (e instanceof SyncResetError) {
@ -258,12 +295,19 @@ export class Syncer {
private async internalScheduleSyncForOfflineChanges(): Promise<void> {
await scheduleOfflineChanges(
{ logger: this.logger, operations: this.operations, queue: this.queue },
(path) => { this.syncLocallyCreatedFile(path); },
(args) => { this.syncLocallyUpdatedFile(args); },
(path) => { this.syncLocallyDeletedFile(path); },
);
// Offline scan wipes the event queue via `queue.clear()` and then
// rebuilds events from disk. That MUST NOT race against an
// in-flight drain iteration that may already hold a reference to
// a freshly-cleared event — chain onto the drain so the scan runs
// between drain ticks, never concurrently.
await this.chainOntoDrain(async () => {
await scheduleOfflineChanges(
{ logger: this.logger, operations: this.operations, queue: this.queue },
(path) => { this.syncLocallyCreatedFile(path); },
(args) => { this.syncLocallyUpdatedFile(args); },
(path) => { this.syncLocallyDeletedFile(path); },
);
});
await this.scheduleDrain();
}
@ -271,9 +315,27 @@ export class Syncer {
private ensureDraining(): void {
this.draining = (this.draining ?? Promise.resolve()).then(
async () => this.drain()
void this.chainOntoDrain(async () => this.drain());
}
/**
 * Appends `work` to the promise chain stored in `this.draining`, so it
 * only starts once everything chained before it has settled.
 *
 * The drain loop and the direct handlers (`syncRemotelyChangedPath`, the
 * offline scan) all go through this method, which keeps their queue / disk
 * mutations in arrival order instead of running concurrently.
 *
 * @param work - async unit of work to run when the chain reaches it
 * @returns a promise that settles with `work`'s own result or rejection
 */
private async chainOntoDrain<T>(work: () => Promise<T>): Promise<T> {
    const tail = this.draining ?? Promise.resolve();
    const outcome = tail.then(async () => work());

    // The stored link must carry no value and must never reject, otherwise
    // one failed unit of work would poison every later link. The caller
    // still observes the real result / error through `outcome`.
    const discard = (): undefined => undefined;
    this.draining = outcome.then(discard, discard);

    return outcome;
}
private async scheduleDrain(): Promise<void> {
@ -338,6 +400,20 @@ export class Syncer {
this.logger.error(
`Server rejected ${event.type} request: ${e.message}`
);
// The event was already shifted off the queue before
// `processEvent` ran; if it was a Create, its resolver
// promise would otherwise hang forever, blocking any
// queued Delete / SyncLocal that `await`s it.
if (event.type === SyncEventType.Create) {
event.resolvers?.promise.catch(() => {
/* suppressed */
});
event.resolvers?.reject(
new Error(
`Create was cancelled — server rejected the request (${e.message})`
)
);
}
return;
}
throw e;
@ -366,6 +442,7 @@ export class Syncer {
const response = await this.syncService.create({
relativePath: event.originalPath,
lastSeenVaultUpdateId: this.queue.lastSeenUpdateId,
contentBytes
});
@ -394,7 +471,8 @@ export class Syncer {
path: effectivePath,
response,
contentHash,
originalContentBytes: contentBytes
originalContentBytes: contentBytes,
createEvent: event
});
this.history.addHistoryEntry({
@ -658,61 +736,71 @@ export class Syncer {
} else {
const responseBytes = base64ToBytes(fullVersion.contentBase64);
// Handle remote path change
let actualPath = currentPath;
// Path reconciliation fallback for the reconnect case.
//
// In steady-state streaming, server-initiated renames arrive as
// dedicated `PathChange` WebSocket events and are handled by
// `syncRemotelyChangedPath`. But the reconnect catch-up path
// (`get_unseen_documents` → `VaultUpdate(is_initial_sync=…)`)
// replays *versions* from the DB — `PathChange` is emission-
// only and not replayed. Without this branch, a pure rename
// that happened while we were disconnected would leave our
// local file stuck at its old path forever.
//
// Only apply the server's path when the record's
// `remoteRelativePath` still matches `currentPath` — that means
// we haven't locally renamed since we last heard from the
// server, so the server's path is authoritative. Any local
// rename in flight keeps priority (it'll be resolved by the
// server on its next write).
let targetPath = currentPath;
if (
fullVersion.relativePath !== currentPath &&
record.remoteRelativePath === currentPath
) {
actualPath = fullVersion.relativePath;
await this.operations.delete(fullVersion.relativePath);
await this.operations.move(
currentPath,
fullVersion.relativePath
);
await this.operations.move(currentPath, fullVersion.relativePath);
targetPath = fullVersion.relativePath;
}
await this.operations.write(
actualPath,
targetPath,
contentBytes,
responseBytes
);
// Re-read and re-hash after write (the 3-way merge may produce different content)
const afterWriteBytes = await this.operations.read(actualPath);
const afterWriteBytes = await this.operations.read(targetPath);
const afterWriteHash = await hash(afterWriteBytes);
this.queue.setDocument(actualPath, {
if (targetPath !== currentPath) {
this.queue.removeDocument(currentPath);
}
this.queue.setDocument(targetPath, {
documentId: fullVersion.documentId,
parentVersionId: fullVersion.vaultUpdateId,
remoteHash: afterWriteHash,
remoteRelativePath: fullVersion.relativePath
});
// If the path changed, remove the old entry
if (actualPath !== currentPath) {
this.queue.removeDocument(currentPath);
}
await this.updateCache(
fullVersion.vaultUpdateId,
responseBytes,
actualPath
targetPath
);
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
details:
actualPath !== currentPath
targetPath !== currentPath
? {
type: SyncType.MOVE,
relativePath: actualPath,
movedFrom: currentPath
}
type: SyncType.MOVE,
relativePath: targetPath,
movedFrom: currentPath
}
: {
type: SyncType.UPDATE,
relativePath: actualPath
},
type: SyncType.UPDATE,
relativePath: targetPath
},
message:
"Successfully downloaded remotely updated file from the server",
author: fullVersion.userId,
@ -750,17 +838,22 @@ export class Syncer {
return;
}
const deconflictedPath = await this.operations.ensureClearPath(
remoteVersion.relativePath
);
if (deconflictedPath !== undefined) {
// The displaced file was moved to a deconflicted path.
// Remove its document record so the offline scan treats
// it as a new file rather than an existing document that
// needs its path synced (which would create duplicates)
this.queue.removeDocument(deconflictedPath);
// Special case: local has an *unsynced* new file at the same path.
// The client must cancel the outgoing Create and merge the two files
// instead of displacing the local one to a conflict path — those
// files are semantically "the same user-intended document" that two
// devices created concurrently, so we want to preserve both sides'
// edits, not shelve one aside.
if (this.queue.hasPendingCreateAt(remoteVersion.relativePath)) {
await this.mergeUnsyncedLocalWithRemoteCreate(
remoteVersion,
contentBytes
);
return;
}
await this.operations.ensureClearPath(remoteVersion.relativePath);
const contentHash = await hash(contentBytes);
this.queue.setDocument(remoteVersion.relativePath, {
documentId: remoteVersion.documentId,
@ -794,6 +887,131 @@ export class Syncer {
});
}
/**
 * Resolves a remote create that landed at a path where we still have an
 * unsynced local create pending.
 *
 * How we resolve depends on whether both sides are mergeable text:
 * - text gets an in-place union merge and, if the merge differs from the
 *   remote copy, one follow-up local update;
 * - binary / non-mergeable content falls through to displacement so
 *   *both* files survive on disk.
 *
 * In both cases the pending Create for the path is cancelled and the
 * remote document's identity is recorded for the path.
 *
 * @param remoteVersion - metadata of the remotely created version; its
 *   `relativePath` is the contested path
 * @param remoteContent - content bytes of that remote version
 */
private async mergeUnsyncedLocalWithRemoteCreate(
remoteVersion: DocumentVersionWithoutContent,
remoteContent: Uint8Array
): Promise<void> {
const path = remoteVersion.relativePath;
const localContent = await this.operations.read(path);
// A text merge is only attempted when the path's type is configured as
// mergeable AND neither side's bytes look binary.
const canMergeText =
isFileTypeMergable(
path,
(await this.serverConfig.getConfig()).mergeableFileExtensions
) &&
!isBinary(localContent) &&
!isBinary(remoteContent);
if (!canMergeText) {
// Binary (or non-mergeable) concurrent creates: cancel the pending
// Create so it can't later fire and upload whatever then sits at
// `path` (which will be the remote content), then take the
// displacement flow — per the original note, `ensureClearPath`
// moves the local bytes to a `conflict-<uuid>-…` path and the
// remote bytes take `path`.
//
// NOTE(review): the previous version of this comment said a fresh
// Create is re-enqueued from the displaced path so the server
// receives the user's original bytes. Nothing in this method does
// that, and the history message below describes the bytes as kept
// "for manual recovery". Confirm which behaviour is intended (and
// whether something else, e.g. the offline scan, is expected to
// pick up the displaced file).
this.queue.cancelPendingCreate(path);
// `ensureClearPath` may return `undefined` if the file was
// deleted between `read(path)` above and this call (a TOCTOU
// race with a concurrent filesystem delete). That's fine:
// nothing to displace means no local bytes to preserve, and
// we just proceed with the remote content.
const conflictPath =
await this.operations.ensureClearPath(path);
// Record the remote document's identity for `path` before the
// remote bytes are written to disk.
this.queue.setDocument(path, {
documentId: remoteVersion.documentId,
parentVersionId: remoteVersion.vaultUpdateId,
remoteHash: await hash(remoteContent),
remoteRelativePath: path
});
await this.operations.create(path, remoteContent);
await this.updateCache(
remoteVersion.vaultUpdateId,
remoteContent,
path
);
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
details: {
type: SyncType.CREATE,
relativePath: path
},
message:
conflictPath !== undefined
? `Adopted remote create at ${path}; unsynced local bytes preserved at ${conflictPath} for manual recovery`
: `Adopted remote create at ${path}; local file had already been removed`,
author: remoteVersion.userId,
timestamp: new Date(remoteVersion.updatedDate)
});
return;
}
// Mergeable text: union-merge with empty parent (every byte in
// either side is treated as an insertion), overwrite disk, and
// push the merged result to the server if it diverged from the
// remote copy. Cancelling the Create and re-emitting as a
// SyncLocal update lets the existing merge-response pipeline
// handle parentVersionId/content reconciliation end-to-end.
this.queue.cancelPendingCreate(path);
const mergedContent = new TextEncoder().encode(
reconcile(
"",
new TextDecoder().decode(localContent),
new TextDecoder().decode(remoteContent)
).text
);
// Adopt the remote document's identity locally *before* touching
// disk so an interleaved event can't mistake the file for a fresh
// create again. `remoteHash` is deliberately the server's content
// hash (not the merged one) so the SyncLocal below sees a real
// diff and actually uploads the merge.
const remoteHash = await hash(remoteContent);
this.queue.setDocument(path, {
documentId: remoteVersion.documentId,
parentVersionId: remoteVersion.vaultUpdateId,
remoteHash,
remoteRelativePath: path
});
// Overwrite disk with the merged result. We pass `localContent` as
// the "expected" content so `operations.write`'s internal 3-way
// merge is a no-op (expected == disk ⇒ apply `new` verbatim).
await this.operations.write(path, localContent, mergedContent);
// Cache the server's bytes (not the merged result) under the remote
// version id.
await this.updateCache(
remoteVersion.vaultUpdateId,
remoteContent,
path
);
// Only schedule an upload when the merge actually differs from what
// the server already has.
const mergedHash = await hash(mergedContent);
if (mergedHash !== remoteHash) {
this.syncLocallyUpdatedFile({ relativePath: path });
}
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
details: {
type: SyncType.CREATE,
relativePath: path
},
message: "Merged unsynced local file with concurrent remote create",
author: remoteVersion.userId,
timestamp: new Date(remoteVersion.updatedDate)
});
}
private async sendUpdate(
@ -834,96 +1052,139 @@ export class Syncer {
path,
response,
contentHash,
originalContentBytes
originalContentBytes,
createEvent
}: {
path: RelativePath;
response: DocumentUpdateResponse;
contentHash: string;
originalContentBytes: Uint8Array;
// When processing a Create, pass the originating event so its
// `resolvers` promise can be fulfilled (or rejected, on a deleted
// response). Dependent SyncLocal/Delete events are chained through
// that promise and would otherwise `await` forever.
createEvent?: Extract<SyncEvent, { type: SyncEventType.Create }>;
}): Promise<void> {
if (response.isDeleted) {
// A Create that the server returned as already-deleted means
// nothing we can sync — reject the waiting promise so chained
// Delete / SyncLocal events skip themselves instead of hanging.
if (createEvent?.resolvers !== undefined) {
createEvent.resolvers.promise.catch(() => {
/* suppressed — consumer may not be listening */
});
createEvent.resolvers.reject(
new Error(
"Create was cancelled — server reported the document as deleted"
)
);
}
// Capture the documentId of the record we *believe* is at
// `path` now. If a concurrent `syncRemotelyChangedPath` moves
// this document between our exists-check and our read, the
// record at `path` after those awaits may belong to a
// DIFFERENT document. Guard against that.
const originalRecord =
this.queue.getSettledDocumentByPath(path);
const originalDocumentId = originalRecord?.documentId;
// If the local file has been edited, re-create it as a new
// document so local edits survive the remote delete
// document so local edits survive the remote delete — but only
// if nothing else is already queuing a Create for this path, to
// avoid doubling up when offline-change detection races with us.
if (await this.operations.exists(path)) {
const localBytes = await this.operations.read(path);
const localHash = await hash(localBytes);
const record = this.queue.getSettledDocumentByPath(path);
if (record !== undefined && localHash !== record.remoteHash) {
const currentRecord =
this.queue.getSettledDocumentByPath(path);
// Re-verify the record's identity hasn't shifted under us.
if (
currentRecord !== undefined &&
currentRecord.documentId === originalDocumentId &&
localHash !== currentRecord.remoteHash &&
!this.queue.hasPendingCreateAt(path)
) {
this.queue.removeDocument(path);
this.syncLocallyCreatedFile(path);
return;
}
}
await this.operations.delete(path);
this.queue.removeDocument(path);
// Only delete on disk if the record at `path` is still the one
// we expected — if a PathChange moved another doc here, we
// shouldn't delete its file.
const finalRecord = this.queue.getSettledDocumentByPath(path);
if (
finalRecord === undefined ||
finalRecord.documentId === originalDocumentId
) {
await this.operations.delete(path);
this.queue.removeDocument(path);
}
return;
}
let actualPath = path;
// Server may have changed the path (e.g. first-rename-wins conflict)
if (response.relativePath !== path) {
actualPath = response.relativePath;
const displacedPath = await this.operations.move(
path,
response.relativePath
);
if (displacedPath !== undefined) {
const displacedRecord =
this.queue.getSettledDocumentByPath(displacedPath);
if (displacedRecord !== undefined) {
const displacedBytes =
await this.operations.read(displacedPath);
const displacedHash = await hash(displacedBytes);
if (displacedHash !== displacedRecord.remoteHash) {
this.queue.enqueue({ type: SyncEventType.SyncLocal, path: displacedPath });
}
}
}
// Remove old path entry; the new path will be set below
this.queue.removeDocument(path);
}
// The response carries content only — path reconciliation is the
// sole responsibility of the `PathChange` WebSocket event, which
// fires independently for renames/dedupes. We therefore always
// record the current local `path` here; an in-flight `PathChange`
// will move the file and fix `remoteRelativePath` if the server
// placed the document somewhere else.
const existingRecord = this.queue.getSettledDocumentByPath(path);
const remoteRelativePath = existingRecord?.remoteRelativePath ?? path;
let record: DocumentRecord;
if ("type" in response && response.type === "MergingUpdate") {
const responseBytes = base64ToBytes(response.contentBase64);
await this.operations.write(
actualPath,
path,
originalContentBytes,
responseBytes
);
// Re-read and re-hash after write (invariant #3)
const afterWriteBytes = await this.operations.read(actualPath);
const afterWriteBytes = await this.operations.read(path);
const afterWriteHash = await hash(afterWriteBytes);
this.queue.setDocument(actualPath, {
record = {
documentId: response.documentId,
parentVersionId: response.vaultUpdateId,
remoteHash: afterWriteHash,
remoteRelativePath: response.relativePath
});
remoteRelativePath
};
// Cache the SERVER's content, not local (invariant #2)
await this.updateCache(
response.vaultUpdateId,
responseBytes,
actualPath
path
);
} else {
// Fast-forward update: no merge needed
this.queue.setDocument(actualPath, {
record = {
documentId: response.documentId,
parentVersionId: response.vaultUpdateId,
remoteHash: contentHash,
remoteRelativePath: response.relativePath
});
remoteRelativePath
};
await this.updateCache(
response.vaultUpdateId,
originalContentBytes,
actualPath
path
);
}
// For a Create, fulfill the resolver promise and replace any
// `documentId: Promise<...>` references in queued Delete/SyncLocal
// events with the now-known string id. For everything else a plain
// `setDocument` is enough — the record's identity was already
// resolved when the Create originally settled.
if (createEvent !== undefined) {
this.queue.resolveCreate(createEvent, record);
} else {
this.queue.setDocument(path, record);
}
}
private async updateCache(