From 935ed9c8e792b31096f5d90d88ebde17ce6bf173 Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Tue, 12 May 2026 22:18:10 +0100 Subject: [PATCH] Remove expected FS events --- .../file-operations/file-operations.test.ts | 4 +- .../src/file-operations/file-operations.ts | 33 +---- frontend/sync-client/src/sync-client.ts | 26 +--- .../src/sync-operations/expected-fs-events.ts | 138 ------------------ .../src/sync-operations/reconciler.ts | 70 +++++++-- .../sync-operations/sync-event-queue.test.ts | 39 +++-- .../src/sync-operations/sync-event-queue.ts | 22 ++- .../sync-client/src/sync-operations/syncer.ts | 26 +++- scripts/clean-up.sh | 2 +- scripts/e2e.sh | 2 +- sync-server/config-e2e.yml | 2 +- sync-server/src/consts.rs | 2 +- sync-server/src/server/update_document.rs | 6 +- 13 files changed, 129 insertions(+), 243 deletions(-) delete mode 100644 frontend/sync-client/src/sync-operations/expected-fs-events.ts diff --git a/frontend/sync-client/src/file-operations/file-operations.test.ts b/frontend/sync-client/src/file-operations/file-operations.test.ts index 44b4fe7e..0597ca8b 100644 --- a/frontend/sync-client/src/file-operations/file-operations.test.ts +++ b/frontend/sync-client/src/file-operations/file-operations.test.ts @@ -7,7 +7,6 @@ import { assertSetContainsExactly } from "../utils/assert-set-contains-exactly"; import type { FileSystemOperations } from "./filesystem-operations"; import type { TextWithCursors } from "reconcile-text"; import type { ServerConfig, ServerConfigData } from "../services/server-config"; -import { ExpectedFsEvents } from "../sync-operations/expected-fs-events"; import { FileAlreadyExistsError } from "../errors/file-already-exists-error"; class MockServerConfig implements Pick { @@ -72,8 +71,7 @@ function makeOps(): { const ops = new FileOperations( new Logger(), fs, - new MockServerConfig() as ServerConfig, // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion - new ExpectedFsEvents() + new MockServerConfig() as ServerConfig // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion ); return { fs, ops }; } diff --git a/frontend/sync-client/src/file-operations/file-operations.ts b/frontend/sync-client/src/file-operations/file-operations.ts index b73bcec9..efd1d5b2 100644 --- a/frontend/sync-client/src/file-operations/file-operations.ts +++ b/frontend/sync-client/src/file-operations/file-operations.ts @@ -9,7 +9,6 @@ import { isBinary } from "../utils/is-binary"; import type { ServerConfig } from "../services/server-config"; import { FileNotFoundError } from "../errors/file-not-found-error"; import { FileAlreadyExistsError } from "../errors/file-already-exists-error"; -import type { ExpectedFsEvents } from "../sync-operations/expected-fs-events"; export class FileOperations { private readonly fs: SafeFileSystemOperations; @@ -18,7 +17,6 @@ export class FileOperations { private readonly logger: Logger, fs: FileSystemOperations, private readonly serverConfig: ServerConfig, - private readonly expectedFsEvents: ExpectedFsEvents, private readonly nativeLineEndings = "\n" ) { this.fs = new SafeFileSystemOperations(fs, logger); @@ -67,13 +65,7 @@ export class FileOperations { } await this.createParentDirectories(path); - this.expectedFsEvents.expectCreate(path); - try { - await this.fs.write(path, this.toNativeLineEndings(newContent)); - } catch (e) { - this.expectedFsEvents.unexpectCreate(path); - throw e; - } + await this.fs.write(path, this.toNativeLineEndings(newContent)); return path; } @@ -95,12 +87,6 @@ export class FileOperations { return; } - // Single-source the expectation registration: register exactly once - // per call, and unexpect from the catch if the underlying fs op - // throws (FileNotFoundError or otherwise). The previous shape - // registered inside each branch and let the catch swallow - // FileNotFoundError, leaking the expectation into the map. - this.expectedFsEvents.expectUpdate(path); try { if ( !isFileTypeMergable( @@ -165,7 +151,6 @@ export class FileOperations { } ); } catch (e) { - this.expectedFsEvents.unexpectUpdate(path); if (e instanceof FileNotFoundError) { this.logger.debug( `File ${path} disappeared during write; not recreating` @@ -178,13 +163,7 @@ export class FileOperations { public async delete(path: RelativePath): Promise { if (await this.exists(path)) { - this.expectedFsEvents.expectDelete(path); - try { - await this.fs.delete(path); - } catch (e) { - this.expectedFsEvents.unexpectDelete(path); - throw e; - } + await this.fs.delete(path); await this.deletingEmptyParentDirectoriesOfDeletedFile(path); } else { this.logger.debug(`No need to delete '${path}', it doesn't exist`); @@ -223,13 +202,7 @@ export class FileOperations { } await this.createParentDirectories(newPath); - this.expectedFsEvents.expectRename(oldPath, newPath); - try { - await this.fs.rename(oldPath, newPath); - } catch (e) { - this.expectedFsEvents.unexpectRename(oldPath, newPath); - throw e; - } + await this.fs.rename(oldPath, newPath); await this.deletingEmptyParentDirectoriesOfDeletedFile(oldPath); return newPath; } diff --git a/frontend/sync-client/src/sync-client.ts b/frontend/sync-client/src/sync-client.ts index 3a47152e..463ac081 100644 --- a/frontend/sync-client/src/sync-client.ts +++ b/frontend/sync-client/src/sync-client.ts @@ -30,7 +30,6 @@ import { setUpTelemetry } from "./utils/set-up-telemetry"; import { ServerConfig } from "./services/server-config"; import type { EventListeners } from "./utils/data-structures/event-listeners"; import { Lock } from "./utils/data-structures/locks"; -import { ExpectedFsEvents } from "./sync-operations/expected-fs-events"; export class SyncClient { private hasFinishedOfflineSync = false; @@ -55,8 +54,7 @@ export class SyncClient { private readonly fileChangeNotifier: FileChangeNotifier, private readonly contentCache: FixedSizeDocumentCache, private readonly serverConfig: ServerConfig, - private readonly syncService: SyncService, - private readonly expectedFsEvents: ExpectedFsEvents + private readonly syncService: SyncService ) {} public get syncedDocumentCount(): number { @@ -209,13 +207,10 @@ export class SyncClient { const serverConfig = new ServerConfig(syncService, settings); - const expectedFsEvents = new ExpectedFsEvents(); - const fileOperations = new FileOperations( logger, fs, serverConfig, - expectedFsEvents, nativeLineEndings ); @@ -262,8 +257,7 @@ export class SyncClient { fileChangeNotifier, contentCache, serverConfig, - syncService, - expectedFsEvents + syncService ); logger.info("SyncClient created successfully"); @@ -378,10 +372,6 @@ export class SyncClient { this.checkIfDestroyed("syncLocallyCreatedFile"); this.fileChangeNotifier.notifyOfFileChange(relativePath); // this is for updating cursors - if (this.expectedFsEvents.matchCreate(relativePath)) { - return; - } - this.syncer.syncLocallyCreatedFile(relativePath); } @@ -395,10 +385,6 @@ export class SyncClient { this.checkIfDestroyed("syncLocallyUpdatedFile"); this.fileChangeNotifier.notifyOfFileChange(relativePath); // this is for updating cursors - if (this.expectedFsEvents.matchUpdate(relativePath, oldPath)) { - return; - } - this.syncer.syncLocallyUpdatedFile({ oldPath, relativePath @@ -409,10 +395,6 @@ export class SyncClient { this.checkIfDestroyed("syncLocallyDeletedFile"); this.fileChangeNotifier.notifyOfFileChange(relativePath); // this is for updating cursors - if (this.expectedFsEvents.matchDelete(relativePath)) { - return; - } - this.syncer.syncLocallyDeletedFile(relativePath); } @@ -540,10 +522,6 @@ export class SyncClient { // paused (offline edits, deletes, renames) wouldn't be detected, and // an incoming remote update would silently overwrite them. this.syncer.clearOfflineScanGate(); - // Drop any expected fs events that were registered but never matched - // (e.g. an op aborted by SyncResetError). Otherwise a real user edit - // at the same path after re-enable would be swallowed. - this.expectedFsEvents.clear(); } private resetInMemoryState(): void { diff --git a/frontend/sync-client/src/sync-operations/expected-fs-events.ts b/frontend/sync-client/src/sync-operations/expected-fs-events.ts deleted file mode 100644 index a2c4f52f..00000000 --- a/frontend/sync-client/src/sync-operations/expected-fs-events.ts +++ /dev/null @@ -1,138 +0,0 @@ -import type { RelativePath } from "./types"; - -/** - * Counter-based registry of filesystem events the syncer is about to - * cause. The syncer's own writes/renames/deletes go through - * `FileOperations`, which calls into the host filesystem; the host then - * fires watcher events that come back through `SyncClient.syncLocallyXxx`. - * Without filtering, those echo events would be re-uploaded to the server - * and broadcast back, producing an unbounded loop. - * - * The fix: every fs call in `FileOperations` registers the event it is - * about to provoke; the matching `syncLocallyXxx` handler consumes it. - * User-initiated edits never register, so they pass through unchanged. - * - * Counts are per (kind, path) so back-to-back syncer ops on the same path - * (e.g. apply remote update then re-apply during convergence) match - * one-for-one. If the watcher never fires for a registered op (e.g. the - * fs throws before notifying), the entry is left behind; `clear()` is - * called on pause/destroy to drop those before they collide with a real - * user event later. - */ -export class ExpectedFsEvents { - private readonly creates = new Map(); - private readonly updates = new Map(); - private readonly deletes = new Map(); - // Renames are keyed by `JSON.stringify({oldPath, newPath})` so the - // delimiter cannot occur inside either path. - private readonly renames = new Map(); - - private static renameKey( - oldPath: RelativePath, - newPath: RelativePath - ): string { - return JSON.stringify({ oldPath, newPath }); - } - - public expectCreate(path: RelativePath): void { - this.bump(this.creates, path); - } - - public expectUpdate(path: RelativePath): void { - this.bump(this.updates, path); - } - - public expectDelete(path: RelativePath): void { - this.bump(this.deletes, path); - } - - public expectRename(oldPath: RelativePath, newPath: RelativePath): void { - this.bump(this.renames, ExpectedFsEvents.renameKey(oldPath, newPath)); - } - - /** - * Cancel a previously-registered expectation when the fs op that registered - * it failed before any watcher event could fire. Without this, a leaked - * expectation silently swallows the next genuine user event at the same - * path (or, for renames, the same `oldPath → newPath` pair). - * - * Floored at zero: if the watcher *did* fire (op partially completed) and - * already consumed the entry, the unexpect is a no-op. The fallback is - * acceptable — at worst we re-upload a real edit we'd otherwise filter. - */ - public unexpectCreate(path: RelativePath): void { - this.decrement(this.creates, path); - } - - public unexpectUpdate(path: RelativePath): void { - this.decrement(this.updates, path); - } - - public unexpectDelete(path: RelativePath): void { - this.decrement(this.deletes, path); - } - - public unexpectRename(oldPath: RelativePath, newPath: RelativePath): void { - this.decrement( - this.renames, - ExpectedFsEvents.renameKey(oldPath, newPath) - ); - } - - public matchCreate(path: RelativePath): boolean { - return this.consume(this.creates, path); - } - - public matchUpdate( - path: RelativePath, - oldPath: RelativePath | undefined - ): boolean { - if (oldPath !== undefined) { - return this.consume( - this.renames, - ExpectedFsEvents.renameKey(oldPath, path) - ); - } - return this.consume(this.updates, path); - } - - public matchDelete(path: RelativePath): boolean { - return this.consume(this.deletes, path); - } - - public clear(): void { - this.creates.clear(); - this.updates.clear(); - this.deletes.clear(); - this.renames.clear(); - } - - private bump(map: Map, key: RelativePath): void { - map.set(key, (map.get(key) ?? 0) + 1); - } - - private consume( - map: Map, - key: RelativePath - ): boolean { - const count = map.get(key) ?? 0; - if (count === 0) { - return false; - } - if (count === 1) { - map.delete(key); - } else { - map.set(key, count - 1); - } - return true; - } - - private decrement(map: Map, key: RelativePath): void { - const count = map.get(key) ?? 0; - if (count <= 1) { - map.delete(key); - } else { - map.set(key, count - 1); - } - } -} diff --git a/frontend/sync-client/src/sync-operations/reconciler.ts b/frontend/sync-client/src/sync-operations/reconciler.ts index 93505a3c..f4d57762 100644 --- a/frontend/sync-client/src/sync-operations/reconciler.ts +++ b/frontend/sync-client/src/sync-operations/reconciler.ts @@ -306,9 +306,65 @@ export class Reconciler { } } + // Re-check ownership after the content fetch. A user rename or + // other interleaved op may have placed bytes / claimed the slot + // during the await. Without this, `upsertRecord` below would + // displace the new owner (clearing its `localPath`) and the + // following `operations.create` would then throw + // `FileAlreadyExistsError`, leaving the displaced record + // placement-pending with its bytes orphaned on disk. + try { + if (await this.operations.exists(target)) { + this.logger.debug( + `Reconciler: cannot place ${record.documentId} at ${target} ` + + `— slot newly occupied on disk after fetch; will retry next pass` + ); + return; + } + } catch (e) { + this.logger.error( + `Reconciler: existence check failed for ${target}: ${String(e)}` + ); + return; + } + if (this.queue.byLocalPath.get(target) !== undefined) { + this.logger.debug( + `Reconciler: cannot place ${record.documentId} at ${target} ` + + `— slot newly tracked by another record after fetch; will retry next pass` + ); + return; + } + + // Install the slot *before* the disk write so the watcher's + // create echo, when it arrives, sees `byLocalPath[target]` set + // and the queue's enqueue-time echo guard drops it. Also pre- + // populates `remoteHash` so any downstream operation that + // compares against it (e.g. `processLocalUpdate`'s hashChanged + // skip) sees the right value. Mirrors the ordering in + // `processRemoteCreateForNewDocument`'s quick-write branch. + const contentHash = await hash(content); + try { + await this.queue.upsertRecord({ + documentId: record.documentId, + parentVersionId: record.parentVersionId, + remoteRelativePath: record.remoteRelativePath, + remoteHash: contentHash, + localPath: target + }); + } catch (e) { + this.logger.error( + `Reconciler: upsertRecord before create failed for ${record.documentId}: ${String(e)}` + ); + return; + } + try { await this.operations.create(target, content); } catch (e) { + // Roll back the slot claim so a later pass can retry or + // re-resolve. Without this, the record looks placed but the + // bytes never made it to disk. + await this.queue.setLocalPath(record.documentId, undefined); if (e instanceof FileNotFoundError) { this.logger.debug( `Reconciler: create at ${target} hit FileNotFound (likely parent ` + @@ -329,14 +385,6 @@ export class Reconciler { return; } - try { - await this.queue.setLocalPath(record.documentId, target); - } catch (e) { - this.logger.error( - `Reconciler: setLocalPath after create failed for ${record.documentId}: ${String(e)}` - ); - return; - } this.pendingPlacementContent.delete(record.documentId); this.logger.debug( `Reconciler: placed ${record.documentId} at ${target}` @@ -663,8 +711,10 @@ export class Reconciler { // We pass the freshly-read pre-write content as // `expectedContent` so the 3-way merge inside `operations.write` // becomes a clean overwrite (no concurrent edits to merge with). - // `operations.write` registers `expectUpdate` itself, so the - // watcher swallows each leg's modify event. + // Each leg's echo modify event is harmless: the wire-loop's + // `processLocalUpdate` re-hashes the file and compares to + // `record.remoteHash`, which was set to match the just-written + // bytes — so the echo is dropped as a no-op. const writtenLegs: SwapLeg[] = []; for (const leg of legs) { const newBytes = contentByDocId.get(leg.documentId); diff --git a/frontend/sync-client/src/sync-operations/sync-event-queue.test.ts b/frontend/sync-client/src/sync-operations/sync-event-queue.test.ts index 9aadebb4..f4aa6d22 100644 --- a/frontend/sync-client/src/sync-operations/sync-event-queue.test.ts +++ b/frontend/sync-client/src/sync-operations/sync-event-queue.test.ts @@ -247,36 +247,33 @@ describe("SyncEventQueue", () => { assert.strictEqual(second.isUserRename, true); }); - it("settled record owns a path over a stale pending create", async () => { + it("drops LocalCreate echoes for paths already tracked", async () => { + // The syncer's own remote-create writes (quick-write + + // reconciler placements) upsert the record at `localPath` + // before calling `operations.create`. The watcher echo then + // re-enters as a LocalCreate at the same path — it must be + // dropped here, otherwise the wire-loop would POST a duplicate + // and the server would deconflict it into a phantom file. const queue = createQueue(); await queue.upsertRecord(fakeRecord("A", { localPath: "b.md" })); await queue.enqueue({ type: SyncEventType.LocalCreate, path: "b.md" }); - await queue.enqueue({ - type: SyncEventType.LocalUpdate, - path: "c.md", - oldPath: "b.md" - }); - const aRecord = queue.getDocumentByDocumentId("A"); - assert.strictEqual(aRecord?.localPath, "c.md"); - assert.strictEqual( - queue.getRecordByLocalPath("b.md" as RelativePath), - undefined - ); - assert.strictEqual( - queue.getRecordByLocalPath("c.md" as RelativePath)?.documentId, - "A" - ); + assert.strictEqual(await queue.next(), undefined); + }); + + it("admits LocalCreate when the prior owner is pending server delete", async () => { + // A user create at a path whose previous doc is in the + // HTTP-acked-but-WS-pending window is genuine — propagate it. + const queue = createQueue(); + await queue.upsertRecord(fakeRecord("A", { localPath: "b.md" })); + queue.markServerDeletePending("A"); + + await queue.enqueue({ type: SyncEventType.LocalCreate, path: "b.md" }); const create = await queue.next(); assert.strictEqual(create?.type, SyncEventType.LocalCreate); assert.strictEqual(create.path, "b.md"); - - const update = await queue.next(); - assert.strictEqual(update?.type, SyncEventType.LocalUpdate); - assert.strictEqual(update.documentId, "A"); - assert.strictEqual(update.path, "c.md"); }); it("byLocalPath stays consistent across upsertRecord, setLocalPath, and rename", async () => { diff --git a/frontend/sync-client/src/sync-operations/sync-event-queue.ts b/frontend/sync-client/src/sync-operations/sync-event-queue.ts index 66dcf1a4..5fb861e0 100644 --- a/frontend/sync-client/src/sync-operations/sync-event-queue.ts +++ b/frontend/sync-client/src/sync-operations/sync-event-queue.ts @@ -145,9 +145,9 @@ export class SyncEventQueue { displaced.localPath = undefined; this.logger.warn( `Persisted state had two records sharing localPath ` + - `${record.localPath} (${displaced.documentId} and ` + - `${record.documentId}); clearing the prior holder's ` + - `localPath so the reconciler re-places it` + `${record.localPath} (${displaced.documentId} and ` + + `${record.documentId}); clearing the prior holder's ` + + `localPath so the reconciler re-places it` ); } this._byLocalPath.set(record.localPath, record); @@ -267,6 +267,16 @@ export class SyncEventQueue { } if (input.type === SyncEventType.LocalCreate) { + const owner = this._byLocalPath.get(path); + if ( + owner !== undefined && + !this.hasPendingServerDelete(owner.documentId) + ) { + this.logger.debug( + `Ignoring LocalCreate echo at ${path}: slot is already tracked by ${owner.documentId}` + ); + return; + } this.events.push({ type: SyncEventType.LocalCreate, path, @@ -279,7 +289,7 @@ export class SyncEventQueue { const lookupPath = input.type === SyncEventType.LocalUpdate && - input.oldPath !== undefined + input.oldPath !== undefined ? input.oldPath : path; const record = this._byLocalPath.get(lookupPath); @@ -796,7 +806,7 @@ export class SyncEventQueue { return; } - for (let i = createIndex + 1; i < this.events.length; ) { + for (let i = createIndex + 1; i < this.events.length;) { const event = this.events[i]; if ( event.type === SyncEventType.LocalDelete && @@ -849,7 +859,7 @@ export class SyncEventQueue { return; } - for (let i = createIndex + 1; i < this.events.length; ) { + for (let i = createIndex + 1; i < this.events.length;) { const event = this.events[i]; if ( event.type === SyncEventType.LocalUpdate && diff --git a/frontend/sync-client/src/sync-operations/syncer.ts b/frontend/sync-client/src/sync-operations/syncer.ts index c51e7394..14a990d0 100644 --- a/frontend/sync-client/src/sync-operations/syncer.ts +++ b/frontend/sync-client/src/sync-operations/syncer.ts @@ -592,10 +592,28 @@ export class Syncer { ): Promise { const documentId = await event.documentId; const record = this.queue.getDocumentByDocumentId(documentId); - if ( - record?.localPath !== undefined && - record.localPath !== event.path - ) { + if (record === undefined) { + // The doc is no longer tracked. Typical cause: a remote delete + // arrived first and `processRemoteDelete` already ran + // `removeDocumentById`, but `operations.delete` fired a + // watcher echo that landed in the queue as a stale LocalDelete. + // Without this skip we'd re-send DELETE to the server for a + // doc that's already gone. + this.logger.debug( + `Skipping local-delete for ${documentId} — doc no longer tracked` + ); + return; + } + if (this.queue.hasPendingServerDelete(documentId)) { + // We already initiated the server delete; nothing more to do. + // Reaches here when a LocalDelete echo lands behind the + // already-queued LocalDelete that drove the server delete. + this.logger.debug( + `Skipping local-delete for ${documentId} — server delete already pending` + ); + return; + } + if (record.localPath !== undefined && record.localPath !== event.path) { this.logger.debug( `Skipping local-delete for ${documentId} at ${event.path}: ` + `record now owns ${record.localPath}` diff --git a/scripts/clean-up.sh b/scripts/clean-up.sh index 267a1019..dcf400bb 100755 --- a/scripts/clean-up.sh +++ b/scripts/clean-up.sh @@ -1,4 +1,4 @@ #!/bin/bash -rm -rf /tmp/vaultlink-e2e-databases +rm -rf /host/tmp/vaultlink-e2e-databases rm -rf logs diff --git a/scripts/e2e.sh b/scripts/e2e.sh index abc3dcd2..7ab8d90c 100755 --- a/scripts/e2e.sh +++ b/scripts/e2e.sh @@ -31,7 +31,7 @@ sleep 1 # Clean databases (uses tmpfs via /dev/shm for zero disk I/O) echo "Cleaning databases..." -rm -rf /tmp/databases +rm -rf /host/tmp/vaultlink-e2e-databases # Start the server in the background echo "Starting server..." diff --git a/sync-server/config-e2e.yml b/sync-server/config-e2e.yml index 9ba68682..03b860b7 100644 --- a/sync-server/config-e2e.yml +++ b/sync-server/config-e2e.yml @@ -1,5 +1,5 @@ database: - databases_directory_path: /tmp/databases + databases_directory_path: /host/tmp/vaultlink-e2e-databases max_connections_per_vault: 8 cursor_timeout: 1m server: diff --git a/sync-server/src/consts.rs b/sync-server/src/consts.rs index b92fb139..a88fe5ff 100644 --- a/sync-server/src/consts.rs +++ b/sync-server/src/consts.rs @@ -23,7 +23,7 @@ pub const DEFAULT_LOG_ROTATION_INTERVAL: Duration = Duration::from_hours(24); pub const IDLE_POOL_TIMEOUT: Duration = Duration::from_mins(5); /// Fail fast on pool acquire so a transiently locked database surfaces as -/// a 429 in seconds, not after a 30s busy_timeout. Callers retry. +/// a 429 in seconds, not after a 30s `busy_timeout`. Callers retry. pub const POOL_ACQUIRE_TIMEOUT: Duration = Duration::from_secs(5); pub const GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10); pub const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10); diff --git a/sync-server/src/server/update_document.rs b/sync-server/src/server/update_document.rs index a071d1e9..7977c644 100644 --- a/sync-server/src/server/update_document.rs +++ b/sync-server/src/server/update_document.rs @@ -244,7 +244,8 @@ pub async fn update_document( let content_clone = content.clone(); let merged = tokio::task::spawn_blocking(move || { - let merged = reconcile( + + reconcile( &parent_owned, &latest_owned.into(), &new_owned.into(), @@ -252,8 +253,7 @@ pub async fn update_document( ) .apply() .text() - .into_bytes(); - merged + .into_bytes() }) .await .map_err(|e| server_error(anyhow::anyhow!("Reconcile task failed: {e}")))?;