This commit is contained in:
Andras Schmelczer 2026-05-05 21:50:27 +01:00
parent 8aeb0d6027
commit 8e87537e49
8 changed files with 277 additions and 25 deletions

View file

@ -0,0 +1 @@
{"sessionId":"ba370285-50c8-425b-bbd6-21e8273dd73e","pid":77001,"procStart":"442537909","acquiredAt":1777926837871}

View file

@ -30,6 +30,28 @@ export async function scheduleOfflineChanges(
// next pass.
const allDocuments = queue.allSettledDocuments();
// Placement-pending records (`localPath === undefined`) name a server
// path that the reconciler will eventually place. If the user already
// has a local file at that path — common after a sync-disable or
// reset that discarded a successful create's response, leaving the
// server-known doc as a placement-pending record once catch-up
// re-delivered it — treating it as an untracked file would
// re-create a duplicate doc at the server's deconflicted path. Bind
// each placement-pending record to its on-disk file: a same-hash
// file just inherits the record's localPath; a different-hash file
// is folded into the sync-up update flow below (an UPDATE on the
// existing doc rather than a fresh CREATE).
for (const record of queue.allRecords()) {
if (record.localPath !== undefined) {
continue;
}
if (!allLocalFiles.has(record.remoteRelativePath)) {
continue;
}
await queue.setLocalPath(record.documentId, record.remoteRelativePath);
allDocuments.set(record.remoteRelativePath, record);
}
// A doc is "possibly deleted" only if it has no local file. Including
// docs that still exist locally would queue a spurious delete alongside
// the update below.

View file

@ -290,25 +290,30 @@ describe("SyncEventQueue", () => {
"A"
);
// upsertRecord that relocates the localPath should re-key.
// upsertRecord on an existing record with a non-undefined
// localPath does NOT rewrite localPath. The watcher path and the
// reconciler are the only authorities on localPath of an
// already-placed record; letting the wire loop re-key here would
// race a user rename that landed during an HTTP roundtrip.
await queue.upsertRecord(
fakeRecord("A", { localPath: "renamed.md" as RelativePath })
);
assert.strictEqual(queue.byLocalPath.size, 1);
assert.strictEqual(
queue.byLocalPath.get("a.md" as RelativePath),
undefined
);
assert.strictEqual(
queue.byLocalPath.get("renamed.md" as RelativePath)?.documentId,
queue.byLocalPath.get("a.md" as RelativePath)?.documentId,
"A"
);
assert.strictEqual(
queue.byLocalPath.get("renamed.md" as RelativePath),
undefined
);
assert.strictEqual(queue.getDocumentByDocumentId("A")?.localPath, "a.md");
// setLocalPath should re-key.
// setLocalPath does re-key — it's the explicit path-mutation API.
await queue.setLocalPath("A", "later.md" as RelativePath);
assert.strictEqual(queue.byLocalPath.size, 1);
assert.strictEqual(
queue.byLocalPath.get("renamed.md" as RelativePath),
queue.byLocalPath.get("a.md" as RelativePath),
undefined
);
assert.strictEqual(
@ -331,6 +336,73 @@ describe("SyncEventQueue", () => {
);
});
it("upsertRecord installs localPath only when the existing record has none (placement-pending → placed)", async () => {
const queue = createQueue();
// Same-docId-collapse shape: a placement-pending record (created
// earlier by a remote-create handler when the slot was occupied)
// gets resolved by a LocalCreate that returns the same docId.
// The watcher hasn't touched localPath since the record is
// placement-pending, so installing the now-known path is correct.
await queue.upsertRecord(fakeRecord("A", { localPath: undefined }));
assert.strictEqual(queue.byLocalPath.size, 0);
await queue.upsertRecord(
fakeRecord("A", { localPath: "fresh.md" as RelativePath })
);
assert.strictEqual(queue.byLocalPath.size, 1);
assert.strictEqual(
queue.byLocalPath.get("fresh.md" as RelativePath)?.documentId,
"A"
);
assert.strictEqual(
queue.getDocumentByDocumentId("A")?.localPath,
"fresh.md"
);
});
it("upsertRecord ignores stale localPath from the wire loop after a watcher rename", async () => {
const queue = createQueue();
await queue.upsertRecord(fakeRecord("A"));
// Watcher renames a.md -> renamed.md while the wire loop is
// mid-roundtrip. The wire loop captured an earlier snapshot of
// localPath and now tries to write it back through upsertRecord.
await queue.enqueue({
type: SyncEventType.LocalUpdate,
path: "renamed.md",
oldPath: "a.md"
});
assert.strictEqual(
queue.getDocumentByDocumentId("A")?.localPath,
"renamed.md"
);
await queue.upsertRecord(
fakeRecord("A", {
parentVersionId: 2,
remoteRelativePath: "a.md",
remoteHash: "hash-A-v2",
localPath: "a.md" as RelativePath
})
);
// The watcher's rename wins: localPath stays at renamed.md.
const record = queue.getDocumentByDocumentId("A");
assert.strictEqual(record?.localPath, "renamed.md");
assert.strictEqual(record.parentVersionId, 2);
assert.strictEqual(record.remoteRelativePath, "a.md");
assert.strictEqual(record.remoteHash, "hash-A-v2");
assert.strictEqual(
queue.byLocalPath.get("renamed.md" as RelativePath)?.documentId,
"A"
);
assert.strictEqual(
queue.byLocalPath.get("a.md" as RelativePath),
undefined
);
});
it("create can be re-enqueued after being dequeued", async () => {
const queue = createQueue();
await queue.enqueue({ type: SyncEventType.LocalCreate, path: "a.md" });

View file

@ -471,9 +471,14 @@ export class SyncEventQueue {
* fields on their next read this stays load-bearing for the Syncer's
* drain handlers, which await across HTTP roundtrips.
*
* Maintains the `byLocalPath` index. If the `localPath` changes the
* relocation goes through `setLocalPath` (which also persists), so the
* caller doesn't need to call `save()` separately.
* For an existing record this updates the wire fields
* (`parentVersionId`, `remoteHash`, `remoteRelativePath`) and, only
* when the existing record has no local file yet
* (`localPath === undefined`), installs the supplied `localPath`. A
* non-undefined existing localPath is owned by the watcher path and
* the Reconciler overwriting it from the wire loop would race a
* user rename that landed during an HTTP roundtrip and silently
* resurrect a stale slot.
*/
public async upsertRecord(record: DocumentRecord): Promise<void> {
const existing = this.byDocId.get(record.documentId);
@ -498,8 +503,10 @@ export class SyncEventQueue {
existing.parentVersionId = record.parentVersionId;
existing.remoteHash = record.remoteHash;
existing.remoteRelativePath = record.remoteRelativePath;
if (existing.localPath !== record.localPath) {
// setLocalPath re-keys `byLocalPath` and persists.
if (
existing.localPath === undefined &&
record.localPath !== undefined
) {
return this.setLocalPath(record.documentId, record.localPath);
}
}
@ -715,6 +722,7 @@ export class SyncEventQueue {
createEvent.path = newPath;
if (!createEvent.isProcessing) {
this.moveBlockingDeletesBeforeCreate(createEvent, newPath);
this.moveBlockingRenamesBeforeCreate(createEvent, newPath);
}
for (const e of this.events) {
@ -753,6 +761,58 @@ export class SyncEventQueue {
}
}
/**
* The `path` argument is the create's just-retargeted target. Any
* other tracked doc whose server-side path is still `path` (its
* watcher-driven local rename hasn't reached the server yet) needs
* its pending LocalUpdate to drain *before* this create otherwise
* the create's HTTP request hits the server while the doc is still
* at `path` and triggers a same-path same-docId merge that
* silently consumes the user's "new doc" intent into the
* already-tracked doc. The pending LocalUpdate is the rename that
* moves the existing doc off `path` server-side; running it first
* frees the slot. Skipped when the create has already been sent
* at that point the merge has already happened or hasn't, and
* reordering the queue can't unwind it.
*/
private moveBlockingRenamesBeforeCreate(
createEvent: Extract<SyncEvent, { type: SyncEventType.LocalCreate }>,
path: RelativePath
): void {
const blockingDocIds = new Set<DocumentId>();
for (const record of this.byDocId.values()) {
if (
record.remoteRelativePath === path &&
record.localPath !== path
) {
blockingDocIds.add(record.documentId);
}
}
if (blockingDocIds.size === 0) {
return;
}
let createIndex = this.events.indexOf(createEvent);
if (createIndex < 0) {
return;
}
for (let i = createIndex + 1; i < this.events.length; ) {
const event = this.events[i];
if (
event.type === SyncEventType.LocalUpdate &&
typeof event.documentId === "string" &&
blockingDocIds.has(event.documentId)
) {
this.events.splice(i, 1);
this.events.splice(createIndex, 0, event);
createIndex++;
continue;
}
i++;
}
}
/**
* Synchronous half of `setLocalPath`: mutate `record.localPath` and
* re-key `_byLocalPath` without persisting. Used by `enqueue`'s

View file

@ -729,10 +729,11 @@ export class Syncer {
parentVersionId: response.vaultUpdateId,
remoteRelativePath: response.relativePath,
remoteHash,
// localPath is unchanged by the wire loop; only the watcher
// (via the queue's enqueue rename branch) and the reconciler
// touch it. Read from the live record so a rename that
// arrived during the await is preserved.
// localPath is owned by the watcher and the reconciler. Pass
// the value we observed pre-await purely as a hint for the
// placement-pending → placed transition; `upsertRecord` ignores
// it when an existing localPath is already set, so a watcher
// rename that landed during the HTTP roundtrip is preserved.
localPath: livePath
});
this.queue.lastSeenUpdateId = response.vaultUpdateId;
@ -948,8 +949,10 @@ export class Syncer {
// doc (a same-path recreate installed a new owner without
// clearing this record's stale field — same race shape as the
// processRemoteDelete fix above). Writing to a shadowed slot
// would clobber the new owner's bytes. Treat shadowed records
// as if they had no local file: stash for the reconciler.
// would clobber the new owner's bytes. Clear the stale claim now
// so the reconciler treats this record as placement-pending; the
// closing `upsertRecord` no longer touches an existing record's
// localPath, so the clear has to happen explicitly here.
const claimedPath = record.localPath;
const livePath =
claimedPath !== undefined &&
@ -960,8 +963,9 @@ export class Syncer {
if (claimedPath !== undefined && livePath === undefined) {
this.logger.debug(
`Remote update for ${record.documentId} at claimed ${claimedPath} ` +
`but slot is shadowed; deferring write to reconciler`
`but slot is shadowed; clearing stale claim and deferring to reconciler`
);
await this.queue.setLocalPath(record.documentId, undefined);
}
if (livePath !== undefined) {
const currentContent = await this.operations.read(livePath);

81
sync-server/Cargo.lock generated
View file

@ -457,6 +457,15 @@ version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5"
[[package]]
name = "crossbeam-channel"
version = "0.5.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.11"
@ -534,6 +543,15 @@ dependencies = [
"zeroize",
]
[[package]]
name = "deranged"
version = "0.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c"
dependencies = [
"powerfmt",
]
[[package]]
name = "digest"
version = "0.10.7"
@ -1352,6 +1370,12 @@ dependencies = [
"zeroize",
]
[[package]]
name = "num-conv"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6673768db2d862beb9b39a78fdcb1a69439615d5794a1be50caa9bc92c81967"
[[package]]
name = "num-integer"
version = "0.1.46"
@ -1480,6 +1504,12 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2"
[[package]]
name = "powerfmt"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
[[package]]
name = "ppv-lite86"
version = "0.2.20"
@ -2160,6 +2190,12 @@ version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]]
name = "symlink"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7973cce6668464ea31f176d85b13c7ab3bba2cb3b77a2ed26abd7801688010a"
[[package]]
name = "syn"
version = "1.0.109"
@ -2211,6 +2247,7 @@ dependencies = [
"tokio",
"tower-http",
"tracing",
"tracing-appender",
"tracing-subscriber",
"ts-rs",
"uuid",
@ -2305,6 +2342,37 @@ dependencies = [
"once_cell",
]
[[package]]
name = "time"
version = "0.3.47"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c"
dependencies = [
"deranged",
"itoa",
"num-conv",
"powerfmt",
"serde_core",
"time-core",
"time-macros",
]
[[package]]
name = "time-core"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca"
[[package]]
name = "time-macros"
version = "0.2.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e70e4c5a0e0a8a4823ad65dfe1a6930e4f4d756dcd9dd7939022b5e8c501215"
dependencies = [
"num-conv",
"time-core",
]
[[package]]
name = "tinystr"
version = "0.7.6"
@ -2438,6 +2506,19 @@ dependencies = [
"tracing-core",
]
[[package]]
name = "tracing-appender"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "050686193eb999b4bb3bc2acfa891a13da00f79734704c4b8b4ef1a10b368a3c"
dependencies = [
"crossbeam-channel",
"symlink",
"thiserror 2.0.18",
"time",
"tracing-subscriber",
]
[[package]]
name = "tracing-attributes"
version = "0.1.28"

View file

@ -20,6 +20,7 @@ axum_typed_multipart = "0.11.0"
tower-http = { version = "0.6.1", features = ["cors", "trace", "limit", "timeout"] }
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.20", features = ["fmt", "env-filter"]}
tracing-appender = "0.2.5"
humantime-serde = "1.1.1"
sqlx = { version = "0.8.6", features = ["sqlite", "runtime-tokio", "uuid", "chrono"] }
chrono = { version = "0.4.41", features = ["serde"] }

View file

@ -16,6 +16,7 @@ use consts::DEFAULT_CONFIG_PATH;
use errors::{SyncServerError, init_error};
use log::info;
use server::create_server;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{EnvFilter, fmt::format, layer::SubscriberExt, util::SubscriberInitExt};
use utils::rotating_file_writer::RotatingFileWriter;
@ -43,7 +44,9 @@ async fn main() -> ExitCode {
let result = async {
config.validate().map_err(init_error)?;
set_up_logging(&args, &config.logging)?;
// Hold the non-blocking writer guards until shutdown so the
// dedicated writer threads stay alive and flush queued log lines.
let _log_guards = set_up_logging(&args, &config.logging)?;
start_server(config).await
}
.await;
@ -60,7 +63,7 @@ async fn main() -> ExitCode {
fn set_up_logging(
args: &Args,
logging_config: &config::logging_config::LoggingConfig,
) -> Result<(), SyncServerError> {
) -> Result<[WorkerGuard; 2], SyncServerError> {
let level_filter = logging_config.log_level.as_tracing_level();
let env_filter = EnvFilter::builder()
@ -81,6 +84,14 @@ fn set_up_logging(
.context("Failed to create rotating file writer")
.map_err(init_error)?;
// Decouple log emission from disk/stderr I/O. Without this, a tokio
// worker that holds the writer's std::sync::Mutex while a `write(2)`
// is throttled by the kernel (e.g. btrfs writeback) cascades the
// stall to every other worker that tries to log, freezing the whole
// runtime. The guards must outlive every emitter.
let (file_writer, file_guard) = tracing_appender::non_blocking(file_appender);
let (stderr_writer, stderr_guard) = tracing_appender::non_blocking(std::io::stderr());
let format = format()
.with_target(is_debug_mode)
.with_line_number(is_debug_mode)
@ -88,12 +99,12 @@ fn set_up_logging(
let stderr_layer = tracing_subscriber::fmt::layer()
.with_ansi(use_colors)
.with_writer(std::io::stderr)
.with_writer(stderr_writer)
.event_format(format.clone());
let file_layer = tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_writer(file_appender)
.with_writer(file_writer)
.event_format(format);
tracing_subscriber::registry()
@ -104,7 +115,7 @@ fn set_up_logging(
.context("Failed to initialise tracing")
.map_err(init_error)?;
Ok(())
Ok([file_guard, stderr_guard])
}
async fn start_server(config: Config) -> Result<(), SyncServerError> {