Remove expected FS events

This commit is contained in:
Andras Schmelczer 2026-05-12 22:18:10 +01:00
parent cd08cd80c7
commit 935ed9c8e7
13 changed files with 129 additions and 243 deletions

View file

@ -7,7 +7,6 @@ import { assertSetContainsExactly } from "../utils/assert-set-contains-exactly";
import type { FileSystemOperations } from "./filesystem-operations";
import type { TextWithCursors } from "reconcile-text";
import type { ServerConfig, ServerConfigData } from "../services/server-config";
import { ExpectedFsEvents } from "../sync-operations/expected-fs-events";
import { FileAlreadyExistsError } from "../errors/file-already-exists-error";
class MockServerConfig implements Pick<ServerConfig, "getConfig"> {
@ -72,8 +71,7 @@ function makeOps(): {
const ops = new FileOperations(
new Logger(),
fs,
new MockServerConfig() as ServerConfig, // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
new ExpectedFsEvents()
new MockServerConfig() as ServerConfig // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
);
return { fs, ops };
}

View file

@ -9,7 +9,6 @@ import { isBinary } from "../utils/is-binary";
import type { ServerConfig } from "../services/server-config";
import { FileNotFoundError } from "../errors/file-not-found-error";
import { FileAlreadyExistsError } from "../errors/file-already-exists-error";
import type { ExpectedFsEvents } from "../sync-operations/expected-fs-events";
export class FileOperations {
private readonly fs: SafeFileSystemOperations;
@ -18,7 +17,6 @@ export class FileOperations {
private readonly logger: Logger,
fs: FileSystemOperations,
private readonly serverConfig: ServerConfig,
private readonly expectedFsEvents: ExpectedFsEvents,
private readonly nativeLineEndings = "\n"
) {
this.fs = new SafeFileSystemOperations(fs, logger);
@ -67,13 +65,7 @@ export class FileOperations {
}
await this.createParentDirectories(path);
this.expectedFsEvents.expectCreate(path);
try {
await this.fs.write(path, this.toNativeLineEndings(newContent));
} catch (e) {
this.expectedFsEvents.unexpectCreate(path);
throw e;
}
await this.fs.write(path, this.toNativeLineEndings(newContent));
return path;
}
@ -95,12 +87,6 @@ export class FileOperations {
return;
}
// Single-source the expectation registration: register exactly once
// per call, and unexpect from the catch if the underlying fs op
// throws (FileNotFoundError or otherwise). The previous shape
// registered inside each branch and let the catch swallow
// FileNotFoundError, leaking the expectation into the map.
this.expectedFsEvents.expectUpdate(path);
try {
if (
!isFileTypeMergable(
@ -165,7 +151,6 @@ export class FileOperations {
}
);
} catch (e) {
this.expectedFsEvents.unexpectUpdate(path);
if (e instanceof FileNotFoundError) {
this.logger.debug(
`File ${path} disappeared during write; not recreating`
@ -178,13 +163,7 @@ export class FileOperations {
public async delete(path: RelativePath): Promise<void> {
if (await this.exists(path)) {
this.expectedFsEvents.expectDelete(path);
try {
await this.fs.delete(path);
} catch (e) {
this.expectedFsEvents.unexpectDelete(path);
throw e;
}
await this.fs.delete(path);
await this.deletingEmptyParentDirectoriesOfDeletedFile(path);
} else {
this.logger.debug(`No need to delete '${path}', it doesn't exist`);
@ -223,13 +202,7 @@ export class FileOperations {
}
await this.createParentDirectories(newPath);
this.expectedFsEvents.expectRename(oldPath, newPath);
try {
await this.fs.rename(oldPath, newPath);
} catch (e) {
this.expectedFsEvents.unexpectRename(oldPath, newPath);
throw e;
}
await this.fs.rename(oldPath, newPath);
await this.deletingEmptyParentDirectoriesOfDeletedFile(oldPath);
return newPath;
}

View file

@ -30,7 +30,6 @@ import { setUpTelemetry } from "./utils/set-up-telemetry";
import { ServerConfig } from "./services/server-config";
import type { EventListeners } from "./utils/data-structures/event-listeners";
import { Lock } from "./utils/data-structures/locks";
import { ExpectedFsEvents } from "./sync-operations/expected-fs-events";
export class SyncClient {
private hasFinishedOfflineSync = false;
@ -55,8 +54,7 @@ export class SyncClient {
private readonly fileChangeNotifier: FileChangeNotifier,
private readonly contentCache: FixedSizeDocumentCache,
private readonly serverConfig: ServerConfig,
private readonly syncService: SyncService,
private readonly expectedFsEvents: ExpectedFsEvents
private readonly syncService: SyncService
) {}
public get syncedDocumentCount(): number {
@ -209,13 +207,10 @@ export class SyncClient {
const serverConfig = new ServerConfig(syncService, settings);
const expectedFsEvents = new ExpectedFsEvents();
const fileOperations = new FileOperations(
logger,
fs,
serverConfig,
expectedFsEvents,
nativeLineEndings
);
@ -262,8 +257,7 @@ export class SyncClient {
fileChangeNotifier,
contentCache,
serverConfig,
syncService,
expectedFsEvents
syncService
);
logger.info("SyncClient created successfully");
@ -378,10 +372,6 @@ export class SyncClient {
this.checkIfDestroyed("syncLocallyCreatedFile");
this.fileChangeNotifier.notifyOfFileChange(relativePath); // this is for updating cursors
if (this.expectedFsEvents.matchCreate(relativePath)) {
return;
}
this.syncer.syncLocallyCreatedFile(relativePath);
}
@ -395,10 +385,6 @@ export class SyncClient {
this.checkIfDestroyed("syncLocallyUpdatedFile");
this.fileChangeNotifier.notifyOfFileChange(relativePath); // this is for updating cursors
if (this.expectedFsEvents.matchUpdate(relativePath, oldPath)) {
return;
}
this.syncer.syncLocallyUpdatedFile({
oldPath,
relativePath
@ -409,10 +395,6 @@ export class SyncClient {
this.checkIfDestroyed("syncLocallyDeletedFile");
this.fileChangeNotifier.notifyOfFileChange(relativePath); // this is for updating cursors
if (this.expectedFsEvents.matchDelete(relativePath)) {
return;
}
this.syncer.syncLocallyDeletedFile(relativePath);
}
@ -540,10 +522,6 @@ export class SyncClient {
// paused (offline edits, deletes, renames) wouldn't be detected, and
// an incoming remote update would silently overwrite them.
this.syncer.clearOfflineScanGate();
// Drop any expected fs events that were registered but never matched
// (e.g. an op aborted by SyncResetError). Otherwise a real user edit
// at the same path after re-enable would be swallowed.
this.expectedFsEvents.clear();
}
private resetInMemoryState(): void {

View file

@ -1,138 +0,0 @@
import type { RelativePath } from "./types";
/**
* Counter-based registry of filesystem events the syncer is about to
* cause. The syncer's own writes/renames/deletes go through
* `FileOperations`, which calls into the host filesystem; the host then
* fires watcher events that come back through `SyncClient.syncLocallyXxx`.
* Without filtering, those echo events would be re-uploaded to the server
* and broadcast back, producing an unbounded loop.
*
* The fix: every fs call in `FileOperations` registers the event it is
* about to provoke; the matching `syncLocallyXxx` handler consumes it.
* User-initiated edits never register, so they pass through unchanged.
*
* Counts are per (kind, path) so back-to-back syncer ops on the same path
* (e.g. apply remote update then re-apply during convergence) match
* one-for-one. If the watcher never fires for a registered op (e.g. the
* fs throws before notifying), the entry is left behind; `clear()` is
* called on pause/destroy to drop those before they collide with a real
* user event later.
*/
/**
 * Counter-based registry of filesystem events the syncer is about to
 * cause, used to tell the syncer's own watcher echoes apart from genuine
 * user edits.
 *
 * Usage contract: the code performing an fs operation calls `expectXxx`
 * right before the operation, and `unexpectXxx` if the operation fails
 * before any watcher event could fire. The watcher-side handler calls
 * `matchXxx`; a `true` result means "this event was self-inflicted, drop
 * it". Events that were never registered always yield `false`, so
 * user-initiated edits pass through unchanged.
 *
 * Counts are kept per (kind, path), so back-to-back operations on the
 * same path are matched one-for-one. An expectation that is neither
 * matched nor cancelled stays in the registry; `clear()` exists to drop
 * such leftovers before they collide with a later, real user event.
 */
export class ExpectedFsEvents {
    private readonly creates = new Map<RelativePath, number>();
    private readonly updates = new Map<RelativePath, number>();
    private readonly deletes = new Map<RelativePath, number>();

    // Renames are keyed by the string produced by `renameKey`, i.e.
    // `JSON.stringify({oldPath, newPath})`, so no delimiter can be forged
    // by characters occurring inside either path. The key is therefore a
    // plain string, not a path.
    private readonly renames = new Map<string, number>();

    /** Builds the composite map key identifying an `oldPath → newPath` rename. */
    private static renameKey(
        oldPath: RelativePath,
        newPath: RelativePath
    ): string {
        return JSON.stringify({ oldPath, newPath });
    }

    /** Registers one upcoming create event at `path`. */
    public expectCreate(path: RelativePath): void {
        this.bump(this.creates, path);
    }

    /** Registers one upcoming (non-rename) update event at `path`. */
    public expectUpdate(path: RelativePath): void {
        this.bump(this.updates, path);
    }

    /** Registers one upcoming delete event at `path`. */
    public expectDelete(path: RelativePath): void {
        this.bump(this.deletes, path);
    }

    /** Registers one upcoming rename event from `oldPath` to `newPath`. */
    public expectRename(oldPath: RelativePath, newPath: RelativePath): void {
        this.bump(this.renames, ExpectedFsEvents.renameKey(oldPath, newPath));
    }

    /**
     * Cancels a previously registered create expectation.
     *
     * The `unexpectXxx` family is meant for the case where the fs operation
     * that registered the expectation failed before any watcher event could
     * fire. Without the cancellation, the leaked expectation would silently
     * swallow the next genuine user event at the same path (or, for renames,
     * the same `oldPath → newPath` pair).
     *
     * The count is floored at zero: if the watcher *did* fire (the operation
     * partially completed) and already consumed the entry, the call is a
     * no-op. That fallback is acceptable: at worst, a real edit that would
     * otherwise have been filtered is re-uploaded.
     */
    public unexpectCreate(path: RelativePath): void {
        this.decrement(this.creates, path);
    }

    /** Cancels one update expectation at `path`; see `unexpectCreate`. */
    public unexpectUpdate(path: RelativePath): void {
        this.decrement(this.updates, path);
    }

    /** Cancels one delete expectation at `path`; see `unexpectCreate`. */
    public unexpectDelete(path: RelativePath): void {
        this.decrement(this.deletes, path);
    }

    /** Cancels one `oldPath → newPath` rename expectation; see `unexpectCreate`. */
    public unexpectRename(oldPath: RelativePath, newPath: RelativePath): void {
        this.decrement(
            this.renames,
            ExpectedFsEvents.renameKey(oldPath, newPath)
        );
    }

    /**
     * Consumes one create expectation for `path`.
     *
     * @returns `true` if an expectation was registered (and is now used up),
     * `false` if the event was not expected.
     */
    public matchCreate(path: RelativePath): boolean {
        return this.consume(this.creates, path);
    }

    /**
     * Consumes one update or rename expectation.
     *
     * When `oldPath` is provided the event is treated as a rename and is
     * matched only against the `oldPath → path` rename expectations; plain
     * update expectations at `path` are not touched in that case.
     *
     * @returns `true` if a matching expectation was registered, else `false`.
     */
    public matchUpdate(
        path: RelativePath,
        oldPath: RelativePath | undefined
    ): boolean {
        if (oldPath === undefined) {
            return this.consume(this.updates, path);
        }
        return this.consume(
            this.renames,
            ExpectedFsEvents.renameKey(oldPath, path)
        );
    }

    /**
     * Consumes one delete expectation for `path`.
     *
     * @returns `true` if an expectation was registered, else `false`.
     */
    public matchDelete(path: RelativePath): boolean {
        return this.consume(this.deletes, path);
    }

    /** Drops every outstanding expectation of every kind. */
    public clear(): void {
        for (const map of [this.creates, this.updates, this.deletes]) {
            map.clear();
        }
        this.renames.clear();
    }

    /** Increments the counter stored under `key`, starting from zero. */
    private bump<K>(map: Map<K, number>, key: K): void {
        map.set(key, (map.get(key) ?? 0) + 1);
    }

    /**
     * Uses up one expectation under `key` if there is one.
     *
     * @returns whether an expectation was present.
     */
    private consume<K>(map: Map<K, number>, key: K): boolean {
        if ((map.get(key) ?? 0) === 0) {
            return false;
        }
        this.decrement(map, key);
        return true;
    }

    /**
     * Decrements the counter under `key`, removing the entry once it would
     * reach zero so the maps never accumulate dead keys. Missing keys are
     * left alone (floor at zero).
     */
    private decrement<K>(map: Map<K, number>, key: K): void {
        const remaining = (map.get(key) ?? 0) - 1;
        if (remaining <= 0) {
            map.delete(key);
        } else {
            map.set(key, remaining);
        }
    }
}

View file

@ -306,9 +306,65 @@ export class Reconciler {
}
}
// Re-check ownership after the content fetch. A user rename or
// other interleaved op may have placed bytes / claimed the slot
// during the await. Without this, `upsertRecord` below would
// displace the new owner (clearing its `localPath`) and the
// following `operations.create` would then throw
// `FileAlreadyExistsError`, leaving the displaced record
// placement-pending with its bytes orphaned on disk.
try {
if (await this.operations.exists(target)) {
this.logger.debug(
`Reconciler: cannot place ${record.documentId} at ${target} ` +
`— slot newly occupied on disk after fetch; will retry next pass`
);
return;
}
} catch (e) {
this.logger.error(
`Reconciler: existence check failed for ${target}: ${String(e)}`
);
return;
}
if (this.queue.byLocalPath.get(target) !== undefined) {
this.logger.debug(
`Reconciler: cannot place ${record.documentId} at ${target} ` +
`— slot newly tracked by another record after fetch; will retry next pass`
);
return;
}
// Install the slot *before* the disk write so the watcher's
// create echo, when it arrives, sees `byLocalPath[target]` set
// and the queue's enqueue-time echo guard drops it. Also pre-
// populates `remoteHash` so any downstream operation that
// compares against it (e.g. `processLocalUpdate`'s hashChanged
// skip) sees the right value. Mirrors the ordering in
// `processRemoteCreateForNewDocument`'s quick-write branch.
const contentHash = await hash(content);
try {
await this.queue.upsertRecord({
documentId: record.documentId,
parentVersionId: record.parentVersionId,
remoteRelativePath: record.remoteRelativePath,
remoteHash: contentHash,
localPath: target
});
} catch (e) {
this.logger.error(
`Reconciler: upsertRecord before create failed for ${record.documentId}: ${String(e)}`
);
return;
}
try {
await this.operations.create(target, content);
} catch (e) {
// Roll back the slot claim so a later pass can retry or
// re-resolve. Without this, the record looks placed but the
// bytes never made it to disk.
await this.queue.setLocalPath(record.documentId, undefined);
if (e instanceof FileNotFoundError) {
this.logger.debug(
`Reconciler: create at ${target} hit FileNotFound (likely parent ` +
@ -329,14 +385,6 @@ export class Reconciler {
return;
}
try {
await this.queue.setLocalPath(record.documentId, target);
} catch (e) {
this.logger.error(
`Reconciler: setLocalPath after create failed for ${record.documentId}: ${String(e)}`
);
return;
}
this.pendingPlacementContent.delete(record.documentId);
this.logger.debug(
`Reconciler: placed ${record.documentId} at ${target}`
@ -663,8 +711,10 @@ export class Reconciler {
// We pass the freshly-read pre-write content as
// `expectedContent` so the 3-way merge inside `operations.write`
// becomes a clean overwrite (no concurrent edits to merge with).
// `operations.write` registers `expectUpdate` itself, so the
// watcher swallows each leg's modify event.
// Each leg's echo modify event is harmless: the wire-loop's
// `processLocalUpdate` re-hashes the file and compares to
// `record.remoteHash`, which was set to match the just-written
// bytes — so the echo is dropped as a no-op.
const writtenLegs: SwapLeg[] = [];
for (const leg of legs) {
const newBytes = contentByDocId.get(leg.documentId);

View file

@ -247,36 +247,33 @@ describe("SyncEventQueue", () => {
assert.strictEqual(second.isUserRename, true);
});
it("settled record owns a path over a stale pending create", async () => {
it("drops LocalCreate echoes for paths already tracked", async () => {
// The syncer's own remote-create writes (quick-write +
// reconciler placements) upsert the record at `localPath`
// before calling `operations.create`. The watcher echo then
// re-enters as a LocalCreate at the same path — it must be
// dropped here, otherwise the wire-loop would POST a duplicate
// and the server would deconflict it into a phantom file.
const queue = createQueue();
await queue.upsertRecord(fakeRecord("A", { localPath: "b.md" }));
await queue.enqueue({ type: SyncEventType.LocalCreate, path: "b.md" });
await queue.enqueue({
type: SyncEventType.LocalUpdate,
path: "c.md",
oldPath: "b.md"
});
const aRecord = queue.getDocumentByDocumentId("A");
assert.strictEqual(aRecord?.localPath, "c.md");
assert.strictEqual(
queue.getRecordByLocalPath("b.md" as RelativePath),
undefined
);
assert.strictEqual(
queue.getRecordByLocalPath("c.md" as RelativePath)?.documentId,
"A"
);
assert.strictEqual(await queue.next(), undefined);
});
it("admits LocalCreate when the prior owner is pending server delete", async () => {
// A user create at a path whose previous doc is in the
// HTTP-acked-but-WS-pending window is genuine — propagate it.
const queue = createQueue();
await queue.upsertRecord(fakeRecord("A", { localPath: "b.md" }));
queue.markServerDeletePending("A");
await queue.enqueue({ type: SyncEventType.LocalCreate, path: "b.md" });
const create = await queue.next();
assert.strictEqual(create?.type, SyncEventType.LocalCreate);
assert.strictEqual(create.path, "b.md");
const update = await queue.next();
assert.strictEqual(update?.type, SyncEventType.LocalUpdate);
assert.strictEqual(update.documentId, "A");
assert.strictEqual(update.path, "c.md");
});
it("byLocalPath stays consistent across upsertRecord, setLocalPath, and rename", async () => {

View file

@ -145,9 +145,9 @@ export class SyncEventQueue {
displaced.localPath = undefined;
this.logger.warn(
`Persisted state had two records sharing localPath ` +
`${record.localPath} (${displaced.documentId} and ` +
`${record.documentId}); clearing the prior holder's ` +
`localPath so the reconciler re-places it`
`${record.localPath} (${displaced.documentId} and ` +
`${record.documentId}); clearing the prior holder's ` +
`localPath so the reconciler re-places it`
);
}
this._byLocalPath.set(record.localPath, record);
@ -267,6 +267,16 @@ export class SyncEventQueue {
}
if (input.type === SyncEventType.LocalCreate) {
const owner = this._byLocalPath.get(path);
if (
owner !== undefined &&
!this.hasPendingServerDelete(owner.documentId)
) {
this.logger.debug(
`Ignoring LocalCreate echo at ${path}: slot is already tracked by ${owner.documentId}`
);
return;
}
this.events.push({
type: SyncEventType.LocalCreate,
path,
@ -279,7 +289,7 @@ export class SyncEventQueue {
const lookupPath =
input.type === SyncEventType.LocalUpdate &&
input.oldPath !== undefined
input.oldPath !== undefined
? input.oldPath
: path;
const record = this._byLocalPath.get(lookupPath);
@ -796,7 +806,7 @@ export class SyncEventQueue {
return;
}
for (let i = createIndex + 1; i < this.events.length; ) {
for (let i = createIndex + 1; i < this.events.length;) {
const event = this.events[i];
if (
event.type === SyncEventType.LocalDelete &&
@ -849,7 +859,7 @@ export class SyncEventQueue {
return;
}
for (let i = createIndex + 1; i < this.events.length; ) {
for (let i = createIndex + 1; i < this.events.length;) {
const event = this.events[i];
if (
event.type === SyncEventType.LocalUpdate &&

View file

@ -592,10 +592,28 @@ export class Syncer {
): Promise<void> {
const documentId = await event.documentId;
const record = this.queue.getDocumentByDocumentId(documentId);
if (
record?.localPath !== undefined &&
record.localPath !== event.path
) {
if (record === undefined) {
// The doc is no longer tracked. Typical cause: a remote delete
// arrived first and `processRemoteDelete` already ran
// `removeDocumentById`, but `operations.delete` fired a
// watcher echo that landed in the queue as a stale LocalDelete.
// Without this skip we'd re-send DELETE to the server for a
// doc that's already gone.
this.logger.debug(
`Skipping local-delete for ${documentId} — doc no longer tracked`
);
return;
}
if (this.queue.hasPendingServerDelete(documentId)) {
// We already initiated the server delete; nothing more to do.
// Reaches here when a LocalDelete echo lands behind the
// already-queued LocalDelete that drove the server delete.
this.logger.debug(
`Skipping local-delete for ${documentId} — server delete already pending`
);
return;
}
if (record.localPath !== undefined && record.localPath !== event.path) {
this.logger.debug(
`Skipping local-delete for ${documentId} at ${event.path}: ` +
`record now owns ${record.localPath}`

View file

@ -1,4 +1,4 @@
#!/bin/bash
rm -rf /tmp/vaultlink-e2e-databases
rm -rf /host/tmp/vaultlink-e2e-databases
rm -rf logs

View file

@ -31,7 +31,7 @@ sleep 1
# Clean databases (uses tmpfs via /dev/shm for zero disk I/O)
echo "Cleaning databases..."
rm -rf /tmp/databases
rm -rf /host/tmp/vaultlink-e2e-databases
# Start the server in the background
echo "Starting server..."

View file

@ -1,5 +1,5 @@
database:
databases_directory_path: /tmp/databases
databases_directory_path: /host/tmp/vaultlink-e2e-databases
max_connections_per_vault: 8
cursor_timeout: 1m
server:

View file

@ -23,7 +23,7 @@ pub const DEFAULT_LOG_ROTATION_INTERVAL: Duration = Duration::from_hours(24);
pub const IDLE_POOL_TIMEOUT: Duration = Duration::from_mins(5);
/// Fail fast on pool acquire so a transiently locked database surfaces as
/// a 429 in seconds, not after a 30s busy_timeout. Callers retry.
/// a 429 in seconds, not after a 30s `busy_timeout`. Callers retry.
pub const POOL_ACQUIRE_TIMEOUT: Duration = Duration::from_secs(5);
pub const GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
pub const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10);

View file

@ -244,7 +244,8 @@ pub async fn update_document(
let content_clone = content.clone();
let merged = tokio::task::spawn_blocking(move || {
let merged = reconcile(
reconcile(
&parent_owned,
&latest_owned.into(),
&new_owned.into(),
@ -252,8 +253,7 @@ pub async fn update_document(
)
.apply()
.text()
.into_bytes();
merged
.into_bytes()
})
.await
.map_err(|e| server_error(anyhow::anyhow!("Reconcile task failed: {e}")))?;