From 8e87537e4994748aa2850ac5f9e97173181791c4 Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Tue, 5 May 2026 21:50:27 +0100 Subject: [PATCH] claude --- .claude/scheduled_tasks.lock | 1 + .../offline-change-detector.ts | 22 +++++ .../sync-operations/sync-event-queue.test.ts | 88 +++++++++++++++++-- .../src/sync-operations/sync-event-queue.ts | 70 +++++++++++++-- .../sync-client/src/sync-operations/syncer.ts | 18 ++-- sync-server/Cargo.lock | 81 +++++++++++++++++ sync-server/Cargo.toml | 1 + sync-server/src/main.rs | 21 +++-- 8 files changed, 277 insertions(+), 25 deletions(-) create mode 100644 .claude/scheduled_tasks.lock diff --git a/.claude/scheduled_tasks.lock b/.claude/scheduled_tasks.lock new file mode 100644 index 00000000..fa24d1c1 --- /dev/null +++ b/.claude/scheduled_tasks.lock @@ -0,0 +1 @@ +{"sessionId":"ba370285-50c8-425b-bbd6-21e8273dd73e","pid":77001,"procStart":"442537909","acquiredAt":1777926837871} \ No newline at end of file diff --git a/frontend/sync-client/src/sync-operations/offline-change-detector.ts b/frontend/sync-client/src/sync-operations/offline-change-detector.ts index 6a885161..f5bc0015 100644 --- a/frontend/sync-client/src/sync-operations/offline-change-detector.ts +++ b/frontend/sync-client/src/sync-operations/offline-change-detector.ts @@ -30,6 +30,28 @@ export async function scheduleOfflineChanges( // next pass. const allDocuments = queue.allSettledDocuments(); + // Placement-pending records (`localPath === undefined`) name a server + // path that the reconciler will eventually place. If the user already + // has a local file at that path — common after a sync-disable or + // reset that discarded a successful create's response, leaving the + // server-known doc as a placement-pending record once catch-up + // re-delivered it — treating it as an untracked file would + // re-create a duplicate doc at the server's deconflicted path. Bind + // each placement-pending record to its on-disk file: a same-hash + // file just inherits the record's localPath; a different-hash file + // is folded into the sync-up update flow below (an UPDATE on the + // existing doc rather than a fresh CREATE). + for (const record of queue.allRecords()) { + if (record.localPath !== undefined) { + continue; + } + if (!allLocalFiles.has(record.remoteRelativePath)) { + continue; + } + await queue.setLocalPath(record.documentId, record.remoteRelativePath); + allDocuments.set(record.remoteRelativePath, record); + } + // A doc is "possibly deleted" only if it has no local file. Including // docs that still exist locally would queue a spurious delete alongside // the update below. diff --git a/frontend/sync-client/src/sync-operations/sync-event-queue.test.ts b/frontend/sync-client/src/sync-operations/sync-event-queue.test.ts index 82f78af4..d2676011 100644 --- a/frontend/sync-client/src/sync-operations/sync-event-queue.test.ts +++ b/frontend/sync-client/src/sync-operations/sync-event-queue.test.ts @@ -290,25 +290,30 @@ describe("SyncEventQueue", () => { "A" ); - // upsertRecord that relocates the localPath should re-key. + // upsertRecord on an existing record with a non-undefined + // localPath does NOT rewrite localPath. The watcher path and the + // reconciler are the only authorities on localPath of an + // already-placed record; letting the wire loop re-key here would + // race a user rename that landed during an HTTP roundtrip. await queue.upsertRecord( fakeRecord("A", { localPath: "renamed.md" as RelativePath }) ); assert.strictEqual(queue.byLocalPath.size, 1); assert.strictEqual( - queue.byLocalPath.get("a.md" as RelativePath), - undefined - ); - assert.strictEqual( - queue.byLocalPath.get("renamed.md" as RelativePath)?.documentId, + queue.byLocalPath.get("a.md" as RelativePath)?.documentId, "A" ); + assert.strictEqual( + queue.byLocalPath.get("renamed.md" as RelativePath), + undefined + ); + assert.strictEqual(queue.getDocumentByDocumentId("A")?.localPath, "a.md"); - // setLocalPath should re-key. + // setLocalPath does re-key — it's the explicit path-mutation API. await queue.setLocalPath("A", "later.md" as RelativePath); assert.strictEqual(queue.byLocalPath.size, 1); assert.strictEqual( - queue.byLocalPath.get("renamed.md" as RelativePath), + queue.byLocalPath.get("a.md" as RelativePath), undefined ); assert.strictEqual( @@ -331,6 +336,73 @@ describe("SyncEventQueue", () => { ); }); + it("upsertRecord installs localPath only when the existing record has none (placement-pending → placed)", async () => { + const queue = createQueue(); + + // Same-docId-collapse shape: a placement-pending record (created + // earlier by a remote-create handler when the slot was occupied) + // gets resolved by a LocalCreate that returns the same docId. + // The watcher hasn't touched localPath since the record is + // placement-pending, so installing the now-known path is correct. + await queue.upsertRecord(fakeRecord("A", { localPath: undefined })); + assert.strictEqual(queue.byLocalPath.size, 0); + + await queue.upsertRecord( + fakeRecord("A", { localPath: "fresh.md" as RelativePath }) + ); + assert.strictEqual(queue.byLocalPath.size, 1); + assert.strictEqual( + queue.byLocalPath.get("fresh.md" as RelativePath)?.documentId, + "A" + ); + assert.strictEqual( + queue.getDocumentByDocumentId("A")?.localPath, + "fresh.md" + ); + }); + + it("upsertRecord ignores stale localPath from the wire loop after a watcher rename", async () => { + const queue = createQueue(); + await queue.upsertRecord(fakeRecord("A")); + + // Watcher renames a.md -> renamed.md while the wire loop is + // mid-roundtrip. The wire loop captured an earlier snapshot of + // localPath and now tries to write it back through upsertRecord. + await queue.enqueue({ + type: SyncEventType.LocalUpdate, + path: "renamed.md", + oldPath: "a.md" + }); + assert.strictEqual( + queue.getDocumentByDocumentId("A")?.localPath, + "renamed.md" + ); + + await queue.upsertRecord( + fakeRecord("A", { + parentVersionId: 2, + remoteRelativePath: "a.md", + remoteHash: "hash-A-v2", + localPath: "a.md" as RelativePath + }) + ); + + // The watcher's rename wins: localPath stays at renamed.md. + const record = queue.getDocumentByDocumentId("A"); + assert.strictEqual(record?.localPath, "renamed.md"); + assert.strictEqual(record.parentVersionId, 2); + assert.strictEqual(record.remoteRelativePath, "a.md"); + assert.strictEqual(record.remoteHash, "hash-A-v2"); + assert.strictEqual( + queue.byLocalPath.get("renamed.md" as RelativePath)?.documentId, + "A" + ); + assert.strictEqual( + queue.byLocalPath.get("a.md" as RelativePath), + undefined + ); + }); + it("create can be re-enqueued after being dequeued", async () => { const queue = createQueue(); await queue.enqueue({ type: SyncEventType.LocalCreate, path: "a.md" }); diff --git a/frontend/sync-client/src/sync-operations/sync-event-queue.ts b/frontend/sync-client/src/sync-operations/sync-event-queue.ts index f9209676..1ca69b8a 100644 --- a/frontend/sync-client/src/sync-operations/sync-event-queue.ts +++ b/frontend/sync-client/src/sync-operations/sync-event-queue.ts @@ -471,9 +471,14 @@ export class SyncEventQueue { * fields on their next read — this stays load-bearing for the Syncer's * drain handlers, which await across HTTP roundtrips. * - * Maintains the `byLocalPath` index. If the `localPath` changes the - * relocation goes through `setLocalPath` (which also persists), so the - * caller doesn't need to call `save()` separately. + * For an existing record this updates the wire fields + * (`parentVersionId`, `remoteHash`, `remoteRelativePath`) and, only + * when the existing record has no local file yet + * (`localPath === undefined`), installs the supplied `localPath`. A + * non-undefined existing localPath is owned by the watcher path and + * the Reconciler — overwriting it from the wire loop would race a + * user rename that landed during an HTTP roundtrip and silently + * resurrect a stale slot. */ public async upsertRecord(record: DocumentRecord): Promise { const existing = this.byDocId.get(record.documentId); @@ -498,8 +503,10 @@ export class SyncEventQueue { existing.parentVersionId = record.parentVersionId; existing.remoteHash = record.remoteHash; existing.remoteRelativePath = record.remoteRelativePath; - if (existing.localPath !== record.localPath) { - // setLocalPath re-keys `byLocalPath` and persists. + if ( + existing.localPath === undefined && + record.localPath !== undefined + ) { return this.setLocalPath(record.documentId, record.localPath); } } @@ -715,6 +722,7 @@ export class SyncEventQueue { createEvent.path = newPath; if (!createEvent.isProcessing) { this.moveBlockingDeletesBeforeCreate(createEvent, newPath); + this.moveBlockingRenamesBeforeCreate(createEvent, newPath); } for (const e of this.events) { @@ -753,6 +761,58 @@ export class SyncEventQueue { } } + /** + * The `path` argument is the create's just-retargeted target. Any + * other tracked doc whose server-side path is still `path` (its + * watcher-driven local rename hasn't reached the server yet) needs + * its pending LocalUpdate to drain *before* this create — otherwise + * the create's HTTP request hits the server while the doc is still + * at `path` and triggers a same-path same-docId merge that + * silently consumes the user's "new doc" intent into the + * already-tracked doc. The pending LocalUpdate is the rename that + * moves the existing doc off `path` server-side; running it first + * frees the slot. Skipped when the create has already been sent — + * at that point the merge has already happened or hasn't, and + * reordering the queue can't unwind it. + */ + private moveBlockingRenamesBeforeCreate( + createEvent: Extract, + path: RelativePath + ): void { + const blockingDocIds = new Set(); + for (const record of this.byDocId.values()) { + if ( + record.remoteRelativePath === path && + record.localPath !== path + ) { + blockingDocIds.add(record.documentId); + } + } + if (blockingDocIds.size === 0) { + return; + } + + let createIndex = this.events.indexOf(createEvent); + if (createIndex < 0) { + return; + } + + for (let i = createIndex + 1; i < this.events.length; ) { + const event = this.events[i]; + if ( + event.type === SyncEventType.LocalUpdate && + typeof event.documentId === "string" && + blockingDocIds.has(event.documentId) + ) { + this.events.splice(i, 1); + this.events.splice(createIndex, 0, event); + createIndex++; + continue; + } + i++; + } + } + /** * Synchronous half of `setLocalPath`: mutate `record.localPath` and * re-key `_byLocalPath` without persisting. Used by `enqueue`'s diff --git a/frontend/sync-client/src/sync-operations/syncer.ts b/frontend/sync-client/src/sync-operations/syncer.ts index 20c5024c..2eae17c6 100644 --- a/frontend/sync-client/src/sync-operations/syncer.ts +++ b/frontend/sync-client/src/sync-operations/syncer.ts @@ -729,10 +729,11 @@ export class Syncer { parentVersionId: response.vaultUpdateId, remoteRelativePath: response.relativePath, remoteHash, - // localPath is unchanged by the wire loop; only the watcher - // (via the queue's enqueue rename branch) and the reconciler - // touch it. Read from the live record so a rename that - // arrived during the await is preserved. + // localPath is owned by the watcher and the reconciler. Pass + // the value we observed pre-await purely as a hint for the + // placement-pending → placed transition; `upsertRecord` ignores + // it when an existing localPath is already set, so a watcher + // rename that landed during the HTTP roundtrip is preserved. localPath: livePath }); this.queue.lastSeenUpdateId = response.vaultUpdateId; @@ -948,8 +949,10 @@ export class Syncer { // doc (a same-path recreate installed a new owner without // clearing this record's stale field — same race shape as the // processRemoteDelete fix above). Writing to a shadowed slot - // would clobber the new owner's bytes. Treat shadowed records - // as if they had no local file: stash for the reconciler. + // would clobber the new owner's bytes. Clear the stale claim now + // so the reconciler treats this record as placement-pending; the + // closing `upsertRecord` no longer touches an existing record's + // localPath, so the clear has to happen explicitly here. const claimedPath = record.localPath; const livePath = claimedPath !== undefined && @@ -960,8 +963,9 @@ export class Syncer { if (claimedPath !== undefined && livePath === undefined) { this.logger.debug( `Remote update for ${record.documentId} at claimed ${claimedPath} ` + - `but slot is shadowed; deferring write to reconciler` + `but slot is shadowed; clearing stale claim and deferring to reconciler` ); + await this.queue.setLocalPath(record.documentId, undefined); } if (livePath !== undefined) { const currentContent = await this.operations.read(livePath); diff --git a/sync-server/Cargo.lock b/sync-server/Cargo.lock index 333d7ae4..82a7ce92 100644 --- a/sync-server/Cargo.lock +++ b/sync-server/Cargo.lock @@ -457,6 +457,15 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-queue" version = "0.3.11" @@ -534,6 +543,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "deranged" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c" +dependencies = [ + "powerfmt", +] + [[package]] name = "digest" version = "0.10.7" @@ -1352,6 +1370,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "num-conv" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6673768db2d862beb9b39a78fdcb1a69439615d5794a1be50caa9bc92c81967" + [[package]] name = "num-integer" version = "0.1.46" @@ -1480,6 +1504,12 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2" +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.20" @@ -2160,6 +2190,12 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" +[[package]] +name = "symlink" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7973cce6668464ea31f176d85b13c7ab3bba2cb3b77a2ed26abd7801688010a" + [[package]] name = "syn" version = "1.0.109" @@ -2211,6 +2247,7 @@ dependencies = [ "tokio", "tower-http", "tracing", + "tracing-appender", "tracing-subscriber", "ts-rs", "uuid", @@ -2305,6 +2342,37 @@ dependencies = [ "once_cell", ] +[[package]] +name = "time" +version = "0.3.47" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde_core", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca" + +[[package]] +name = "time-macros" +version = "0.2.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e70e4c5a0e0a8a4823ad65dfe1a6930e4f4d756dcd9dd7939022b5e8c501215" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tinystr" version = "0.7.6" @@ -2438,6 +2506,19 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-appender" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "050686193eb999b4bb3bc2acfa891a13da00f79734704c4b8b4ef1a10b368a3c" +dependencies = [ + "crossbeam-channel", + "symlink", + "thiserror 2.0.18", + "time", + "tracing-subscriber", +] + [[package]] name = "tracing-attributes" version = "0.1.28" diff --git a/sync-server/Cargo.toml b/sync-server/Cargo.toml index ba79ac23..6de17653 100644 --- a/sync-server/Cargo.toml +++ b/sync-server/Cargo.toml @@ -20,6 +20,7 @@ axum_typed_multipart = "0.11.0" tower-http = { version = "0.6.1", features = ["cors", "trace", "limit", "timeout"] } tracing = "0.1.41" tracing-subscriber = { version = "0.3.20", features = ["fmt", "env-filter"]} +tracing-appender = "0.2.5" humantime-serde = "1.1.1" sqlx = { version = "0.8.6", features = ["sqlite", "runtime-tokio", "uuid", "chrono"] } chrono = { version = "0.4.41", features = ["serde"] } diff --git a/sync-server/src/main.rs b/sync-server/src/main.rs index 621717bf..dc00d4d5 100644 --- a/sync-server/src/main.rs +++ b/sync-server/src/main.rs @@ -16,6 +16,7 @@ use consts::DEFAULT_CONFIG_PATH; use errors::{SyncServerError, init_error}; use log::info; use server::create_server; +use tracing_appender::non_blocking::WorkerGuard; use tracing_subscriber::{EnvFilter, fmt::format, layer::SubscriberExt, util::SubscriberInitExt}; use utils::rotating_file_writer::RotatingFileWriter; @@ -43,7 +44,9 @@ async fn main() -> ExitCode { let result = async { config.validate().map_err(init_error)?; - set_up_logging(&args, &config.logging)?; + // Hold the non-blocking writer guards until shutdown so the + // dedicated writer threads stay alive and flush queued log lines. + let _log_guards = set_up_logging(&args, &config.logging)?; start_server(config).await } .await; @@ -60,7 +63,7 @@ async fn main() -> ExitCode { fn set_up_logging( args: &Args, logging_config: &config::logging_config::LoggingConfig, -) -> Result<(), SyncServerError> { +) -> Result<[WorkerGuard; 2], SyncServerError> { let level_filter = logging_config.log_level.as_tracing_level(); let env_filter = EnvFilter::builder() @@ -81,6 +84,14 @@ fn set_up_logging( .context("Failed to create rotating file writer") .map_err(init_error)?; + // Decouple log emission from disk/stderr I/O. Without this, a tokio + // worker that holds the writer's std::sync::Mutex while a `write(2)` + // is throttled by the kernel (e.g. btrfs writeback) cascades the + // stall to every other worker that tries to log, freezing the whole + // runtime. The guards must outlive every emitter. + let (file_writer, file_guard) = tracing_appender::non_blocking(file_appender); + let (stderr_writer, stderr_guard) = tracing_appender::non_blocking(std::io::stderr()); + let format = format() .with_target(is_debug_mode) .with_line_number(is_debug_mode) @@ -88,12 +99,12 @@ fn set_up_logging( let stderr_layer = tracing_subscriber::fmt::layer() .with_ansi(use_colors) - .with_writer(std::io::stderr) + .with_writer(stderr_writer) .event_format(format.clone()); let file_layer = tracing_subscriber::fmt::layer() .with_ansi(false) - .with_writer(file_appender) + .with_writer(file_writer) .event_format(format); tracing_subscriber::registry() @@ -104,7 +115,7 @@ fn set_up_logging( .context("Failed to initialise tracing") .map_err(init_error)?; - Ok(()) + Ok([file_guard, stderr_guard]) } async fn start_server(config: Config) -> Result<(), SyncServerError> {