This commit is contained in:
Andras Schmelczer 2026-05-05 21:50:24 +01:00
parent 35877b69da
commit 8aeb0d6027
20 changed files with 1198 additions and 88 deletions

View file

@ -65,6 +65,8 @@ export class Syncer {
private runningScheduleSyncForOfflineChanges: Promise<void> | undefined;
private drainPromise: Promise<void> | undefined;
private drainRequestedWhileRunning = false;
private isDrainingPaused = false;
private isScanning = false;
private previousRemainingOperationsCount = 0;
@ -244,6 +246,15 @@ export class Syncer {
}
}
public pauseDraining(): void {
this.isDrainingPaused = true;
}
public resumeDraining(): void {
this.isDrainingPaused = false;
this.ensureDraining();
}
private sendHandshakeMessage(): void {
const message: WebSocketClientMessage = {
type: "handshake",
@ -282,13 +293,27 @@ export class Syncer {
private ensureDraining(): void {
if (this.drainPromise !== undefined) {
this.drainRequestedWhileRunning = true;
return;
}
if (this.isScanning) {
return;
}
if (this.isDrainingPaused) {
return;
}
this.drainPromise = this.drain().finally(() => {
this.drainPromise = undefined;
const shouldRestart =
this.drainRequestedWhileRunning &&
this.queue.pendingUpdateCount > 0 &&
!this.isScanning &&
!this.isDrainingPaused &&
this.settings.getSettings().isSyncEnabled;
this.drainRequestedWhileRunning = false;
if (shouldRestart) {
this.ensureDraining();
}
});
}
@ -296,9 +321,12 @@ export class Syncer {
// Peek then remove-after-processing (instead of shift-then-process):
// the event must remain reachable through `findLatestCreateForPath`
// while it is in flight, so a rename event arriving mid-process can
// call `updatePendingCreatePath` to retarget this create's path.
// call `updatePendingCreatePath` to retarget this create's local path.
for (;;) {
if (!this.settings.getSettings().isSyncEnabled) {
if (
this.isDrainingPaused ||
!this.settings.getSettings().isSyncEnabled
) {
this.logger.debug(
"Drain pausing because sync is disabled; events stay queued"
);
@ -333,6 +361,10 @@ export class Syncer {
private async processEvent(event: SyncEvent): Promise<void> {
try {
if (event.type === SyncEventType.LocalCreate) {
event.isProcessing = true;
}
if (await this.skipIfOversized(event)) {
return;
}
@ -460,21 +492,26 @@ export class Syncer {
private async processCreate(
event: Extract<SyncEvent, { type: SyncEventType.LocalCreate }>
): Promise<void> {
const contentBytes = await this.operations.read(event.path);
const requestPath = event.path;
const contentBytes = await this.operations.read(requestPath);
const contentHash = await hash(contentBytes);
// Read `event.path` live: `updatePendingCreatePath` mutates it in
// place when the user renames the pending create mid-roundtrip.
// Sending `originalPath` here would tell the server the pre-rename
// location, then the queued LocalUpdate from the rename would
// fail on `getFileSize(renamedPath)` after the reconciler moved
// the file back to match the (stale) server-side path.
// Use the path the pending create has when it reaches the wire loop.
// `updatePendingCreatePath` mutates queued creates when a not-yet-sent
// local file is renamed, so a renamed-away generation does not create
// a server document at a path that a newer local file has reused.
const response = await this.syncService.create({
relativePath: event.path,
relativePath: requestPath,
lastSeenVaultUpdateId: this.queue.lastSeenUpdateId,
contentBytes
});
// If the user renamed the file while the create request was in flight,
// event.path now points at the renamed disk slot. Apply response bytes
// and install the local record there; the queued LocalUpdate carries
// the server-side rename intent.
const localPath = event.path;
// Same-docId collapse. While our LocalCreate sat in the queue, a
// RemoteCreate may have arrived for this same path. The wire-loop's
// `processRemoteCreateForNewDocument` would have built a record with
@ -487,7 +524,7 @@ export class Syncer {
if (response.type === "MergingUpdate") {
const responseBytes = base64ToBytes(response.contentBase64);
await this.operations.write(
event.path,
localPath,
contentBytes,
responseBytes
);
@ -495,13 +532,13 @@ export class Syncer {
await this.updateCache(
response.vaultUpdateId,
responseBytes,
event.path
localPath
);
} else {
await this.updateCache(
response.vaultUpdateId,
contentBytes,
event.path
localPath
);
}
@ -516,13 +553,13 @@ export class Syncer {
parentVersionId: response.vaultUpdateId,
remoteRelativePath: response.relativePath,
remoteHash,
localPath: event.path
localPath
});
this.queue.lastSeenUpdateId = response.vaultUpdateId;
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
details: { type: SyncType.CREATE, relativePath: event.path },
details: { type: SyncType.CREATE, relativePath: localPath },
message:
response.type === "MergingUpdate"
? "Created file and merged with existing remote version"
@ -536,6 +573,24 @@ export class Syncer {
event: Extract<SyncEvent, { type: SyncEventType.LocalDelete }>
): Promise<void> {
const documentId = await event.documentId;
const record = this.queue.getDocumentByDocumentId(documentId);
if (
record?.localPath !== undefined &&
record.localPath !== event.path
) {
this.logger.debug(
`Skipping local-delete for ${documentId} at ${event.path}: ` +
`record now owns ${record.localPath}`
);
return;
}
// The disk file is already gone when a LocalDelete reaches the wire
// loop. This is redundant for settled records deleted through
// `enqueue`, but load-bearing for creates that were deleted while the
// create request was still pending: their record only exists after the
// create ack resolves.
await this.queue.setLocalPath(documentId, undefined);
const response = await this.syncService.delete({
documentId
@ -754,23 +809,32 @@ export class Syncer {
}
if (trackedRecord !== undefined) {
// The doc is tracked. If we have a local file backing it
// and that file has gone missing — e.g. the user deleted it
// and the LocalDelete hasn't drained yet, or our HTTP
// DELETE just landed and we're still waiting on the
// WebSocket receipt — ignore the update. Otherwise we'd
// try to operate on a vanished file (or recreate one we're
// tearing down).
// The doc is tracked, but the disk slot can be stale. One
// concrete race: a remote create quick-writes a file, a
// watcher rename/delete lands before the record is fully
// settled, and the record is left claiming a path that no
// longer exists. If no queued local operation owns that
// disappearance, clear the localPath and let
// processRemoteUpdate stash/place the active server version.
if (trackedRecord.localPath !== undefined) {
const fileExists = await this.operations.exists(
trackedRecord.localPath
);
if (!fileExists) {
this.queue.lastSeenUpdateId = remoteVersion.vaultUpdateId;
if (
!fileExists &&
!this.queue.hasPendingLocalEventsForDocumentId(
remoteVersion.documentId
)
) {
this.logger.debug(
`Ignoring remote update for ${remoteVersion.documentId}: local file at ${trackedRecord.localPath} is missing`
`Remote update for ${remoteVersion.documentId}: ` +
`local file at ${trackedRecord.localPath} is missing; ` +
`clearing localPath for placement`
);
await this.queue.setLocalPath(
trackedRecord.documentId,
undefined
);
return;
}
}
return this.processRemoteUpdate(trackedRecord, remoteVersion);
@ -992,9 +1056,7 @@ export class Syncer {
// design, no buffering at receive time — the reconciler will
// fetch on demand.
const target = remoteVersion.relativePath;
const slotFree =
!(await this.operations.exists(target)) &&
this.queue.getRecordByLocalPath(target) === undefined;
const slotFree = await this.canPlaceRemoteCreateAt(target);
let localPath: RelativePath | undefined = undefined;
let remoteHash: string | undefined = undefined;
@ -1004,49 +1066,77 @@ export class Syncer {
documentId: remoteVersion.documentId,
vaultUpdateId: remoteVersion.vaultUpdateId
});
try {
const result = await this.operations.create(
target,
remoteContent
);
localPath = result.actualPath;
remoteHash = await hash(remoteContent);
await this.updateCache(
remoteVersion.vaultUpdateId,
remoteContent,
localPath
);
} catch (e) {
if (!(e instanceof FileAlreadyExistsError)) {
throw e;
}
// TOCTOU: the slot was free at the pre-check but
// something landed there between then and now. Fall
// through to the no-localPath branch and let the
// reconciler retry placement once the slot frees.
if (!(await this.canPlaceRemoteCreateAt(target))) {
this.logger.debug(
`Quick-write for ${remoteVersion.documentId} at ${target} ` +
`lost a TOCTOU race; deferring to reconciler`
`became blocked while fetching content; deferring to reconciler`
);
localPath = undefined;
remoteHash = undefined;
} else {
try {
remoteHash = await hash(remoteContent);
await this.queue.upsertRecord({
documentId: remoteVersion.documentId,
parentVersionId: remoteVersion.vaultUpdateId,
remoteRelativePath: remoteVersion.relativePath,
remoteHash,
localPath: target
});
const result = await this.operations.create(
target,
remoteContent
);
const liveRecord = this.queue.getDocumentByDocumentId(
remoteVersion.documentId
);
localPath =
liveRecord === undefined
? result.actualPath
: liveRecord.localPath;
await this.updateCache(
remoteVersion.vaultUpdateId,
remoteContent,
localPath ?? remoteVersion.relativePath
);
} catch (e) {
await this.queue.setLocalPath(
remoteVersion.documentId,
undefined
);
if (!(e instanceof FileAlreadyExistsError)) {
throw e;
}
// TOCTOU: the slot was free at the pre-check but
// something landed there between then and now. Fall
// through to the no-localPath branch and let the
// reconciler retry placement once the slot frees.
this.logger.debug(
`Quick-write for ${remoteVersion.documentId} at ${target} ` +
`lost a TOCTOU race; deferring to reconciler`
);
localPath = undefined;
}
}
}
await this.queue.upsertRecord({
documentId: remoteVersion.documentId,
parentVersionId: remoteVersion.vaultUpdateId,
remoteRelativePath: remoteVersion.relativePath,
// `remoteHash` is undefined when we deferred fetching content.
// Consumers (`processLocalUpdate`'s fast-skip,
// `findMatchingFile`'s offline-rename detection) treat
// undefined as "no comparison possible" and fall through to a
// real upload / no-match. The hash gets populated the next
// time we observe a real version (a remote update, or a
// local edit that triggers an upload).
remoteHash,
localPath
});
if (
this.queue.getDocumentByDocumentId(remoteVersion.documentId) ===
undefined
) {
await this.queue.upsertRecord({
documentId: remoteVersion.documentId,
parentVersionId: remoteVersion.vaultUpdateId,
remoteRelativePath: remoteVersion.relativePath,
// `remoteHash` is undefined when we deferred fetching content.
// Consumers (`processLocalUpdate`'s fast-skip,
// `findMatchingFile`'s offline-rename detection) treat
// undefined as "no comparison possible" and fall through to a
// real upload / no-match. The hash gets populated the next
// time we observe a real version (a remote update, or a
// local edit that triggers an upload).
remoteHash,
localPath
});
}
this.queue.lastSeenUpdateId = remoteVersion.vaultUpdateId;
@ -1065,6 +1155,16 @@ export class Syncer {
}
}
private async canPlaceRemoteCreateAt(
target: RelativePath
): Promise<boolean> {
return (
!this.queue.hasPendingCreateForPath(target) &&
!(await this.operations.exists(target)) &&
this.queue.getRecordByLocalPath(target) === undefined
);
}
private async sendUpdate({
record,
relativePath,