Bug fixes

This commit is contained in:
Andras Schmelczer 2026-03-15 13:27:18 +00:00
parent bbec7f14dd
commit df37e6c236
15 changed files with 632 additions and 157 deletions

View file

@ -333,3 +333,94 @@ When fixing the duplicate-document-after-interrupted-create problem, several heu
3. **In-memory tracking** (e.g., `pendingLocalId`): Any in-memory state is lost on app crash. The whole point of the fix is to handle interrupted creates, which include crashes.
The idempotency key approach works because it's: (a) crash-safe (persisted locally); (b) deterministic (UUID lookup, no heuristics); (c) server-authoritative (the server resolves keys to documentIds).
### Critical Implementation Invariants (Learned from Bugs)
These invariants were discovered through deep auditing and E2E testing. Violating any of them causes data loss, sync stalls, or test failures.
**1. `waitUntilFinished` must loop until both sync queue AND WebSocket handlers are simultaneously idle.**
WebSocket message handlers (`onRemoteVaultUpdateReceived`) enqueue new sync operations. If you wait for the sync queue first, then WebSocket handlers, the handlers may have enqueued new operations that aren't awaited. The correct implementation loops: wait for WS handlers → wait for sync queue → check if WS has new work → repeat if needed. See `SyncClient.waitUntilFinished()`.
**2. `enqueueSyncOperation` must catch ALL errors, not just `SyncResetError`.**
`executeSync` re-throws non-SyncReset/non-FileNotFound errors (they're logged in sync history as ERROR). If `enqueueSyncOperation` doesn't catch these, they become unhandled promise rejections that crash the process. The catch logs the error and returns undefined — failed operations will be retried on the next WebSocket reconnect (which clears `runningScheduleSyncForOfflineChanges` and triggers a fresh filesystem scan).
**3. `Locks.reset()` must NOT clear `this.locked`.**
In-flight operations (currently executing their callback) still hold conceptual locks. If `reset()` clears `this.locked`, new operations can acquire the same key and run concurrently with the still-running old operation. Only clear `this.waiters` (to reject pending waiters with SyncResetError). Let running operations release their locks naturally via the `finally` block in `withLock`.
**4. `handleMaybeMergingResponse` must write the file BEFORE updating metadata.**
If metadata is updated first and the write fails (crash, OS error), the metadata points to a server version whose content was never written locally. On recovery, the stale local content is uploaded, potentially overwriting other clients' changes that were part of the merge. Order: write file → re-read + re-hash → update metadata → update cache.
**5. After a MergingUpdate, cache the SERVER's content (`responseBytes`), not the local content.**
The content cache is used to compute diffs for subsequent updates: `diff(cached, newFileContent)`. The server applies this diff against its content at `parentVersionId`. If the cache stores the local content (which may differ from the server's due to the 3-way merge in `FileOperations.write`), the diff won't match the server's state and the update will fail with "Invalid diff".
**6. After a MergingUpdate, re-read the file and re-hash.**
The 3-way merge in `operations.write()` may produce content different from `responseBytes` (because the user edited the file between the read and the write). The stored hash must match the actual on-disk content, not the server's merged content. Otherwise, the next sync cycle incorrectly detects "no changes" (phantom hash match) or always detects changes (phantom hash mismatch).
**7. Snapshot `parentVersionId` before computing diffs.**
`document.metadata` is a mutable shared reference. A concurrent operation (via a WebSocket handler running during an `await` in the same sync operation) can update `parentVersionId` between the cache lookup and the `putText` call. Always capture `const parentVersionIdForUpdate = document.metadata.parentVersionId` and use that value for both the cache lookup and the HTTP request.
**8. Guard `updateDocumentMetadata` against concurrently removed documents.**
After any `await` (file write, re-read, HTTP call), the document may have been removed from the database by a concurrent delete operation. Always check `database.containsDocument(document)` before calling `updateDocumentMetadata` if there was an `await` since the document reference was obtained. Return gracefully if removed — the file is on disk and `scheduleSyncForOfflineChanges` will re-detect it.
**9. When assigning a `documentId` to a pending doc, check for duplicates first.**
Both `resolveIdempotencyKeys` and `handleMaybeMergingResponse` (for deleted pending docs) assign documentIds. Before setting metadata, call `getDocumentByDocumentId(id)`. If another document already has that ID, remove the stale pending doc instead of creating a duplicate. `ensureConsistency` checks for duplicate documentIds across ALL documents (not just `resolvedDocuments`).
**10. `resolveIdempotencyKeys` sets `parentVersionId: 0` — treat this as a create, not an update.**
When `resolveIdempotencyKeys` assigns a documentId to a pending doc, it uses `parentVersionId: 0` as a placeholder. The sync path must check for `parentVersionId === 0` and take the CREATE path (sending a create with the idempotency key), not the UPDATE path (which would fail because version 0 doesn't exist on the server).
**11. Idempotent create returns can have stale content — check `contentSize`.**
When the server returns a `FastForwardUpdate` for a create with an idempotency key, it may return the ORIGINAL version (from the first create), not a new version with the current content. The response's `contentSize` may not match `originalContentBytes.length`. If they differ, fetch the actual server content for that version and use it for the cache and hash, so subsequent diffs are correct.
**12. `SyncClient.pause()` must swallow `SyncResetError`.**
`pause()` calls `fetchController.startReset()` which rejects in-flight fetches. Those rejections propagate through `waitUntilFinished()`. Since `pause()` CAUSED the reset, the resulting `SyncResetError` is expected and must be caught (not re-thrown). Only re-throw non-SyncResetError exceptions. Also call `fetchController.finishReset()` in the catch block to prevent the FetchController from getting stuck in resetting state.
**13. `runningScheduleSyncForOfflineChanges` must be cleared on WebSocket disconnect.**
After the initial `scheduleSyncForOfflineChanges()` completes, the field retains the resolved promise. On WebSocket disconnect/reconnect (without a full client reset), the field must be cleared so the next call triggers a fresh filesystem scan. Add a handler on `onWebSocketStatusChanged` that sets the field to `undefined` when `isConnected` is false.
**14. The server must not `expect()` / panic on UTF-8 conversion — return a client error.**
In `update_text`, the parent version's content may be binary (if another client uploaded binary via `putBinary`). Using `.expect()` on `str::from_utf8()` panics the server. Use `.context(...).map_err(client_error)?` to return a 4xx error, allowing the client to fall back to `putBinary`.
**15. The create-merge parent content must be `latest_version.content`, not empty.**
In `create_document.rs`, when a create merges with an existing document, the 3-way merge parent must be the latest version's content (`&latest_version.content`), not an empty vector (`&Vec::new()`). An empty parent causes `reconcile("", existing, new)` to treat all content as additions, producing garbled interleaved text.
**16. `retryForever` must not retry 4xx HTTP errors.**
4xx errors indicate the request itself is wrong (e.g., invalid diff, missing parent version). Retrying won't help. The `HttpClientError` class (in `errors/http-client-error.ts`) carries the status code. `retryForever` checks for it and re-throws immediately. Only 5xx errors (transient server failures) are retried.
**17. The broadcast channel's `RecvError::Lagged` must be handled explicitly.**
The `while let Ok(update) = broadcast_receiver.recv().await` pattern silently exits the loop on `Lagged`, disconnecting the client without logging. Handle `Lagged` explicitly with a `warn!` log and `break`. The channel capacity (`broadcast_channel_capacity` in config, default 1024) is separate from `max_clients_per_vault`.
### E2E Test Debugging Guide
**How to run E2E tests:**
```bash
cd sync-server && rm -rf databases && ./target/release/sync_server config-e2e.yml &
sleep 3
cd /volumes/syncthing/Projects/vault-link && scripts/e2e.sh 8
```
Always clean the `databases` directory before running. The server must be running separately.
**Common E2E failure patterns:**
1. **`SyncResetError` unhandled rejection**: Check that `enqueueSyncOperation` catches all errors and that `pause()` swallows SyncResetError. The test client's `unhandledRejection` handler checks `error.name === "SyncResetError"` — if the error message changes, update the filter in `test-client/src/cli.ts`.
2. **"Files from agent-X missing in agent-Y"**: This is a consistency assertion. Check the agent's LOCAL file list (now correctly logged per-agent after a logging bug fix). Common causes:
- **Broadcasts lost during shutdown**: Operations completed on one agent but the WebSocket broadcast didn't reach the other before destroy. The 5-second sleep between finish and destroy helps.
- **Path deconfliction**: Both agents have the same DOCUMENT but at different LOCAL paths (e.g., `binary-10.bin` vs `binary-10 (1).bin`). This is a known limitation with concurrent creates at the same path.
- **Failed sync operations not retried**: If `executeSync` throws, the failed file won't be retried until the next WebSocket reconnect (which clears `runningScheduleSyncForOfflineChanges` and triggers a fresh filesystem scan).
3. **"Document not found in database"**: A concurrent operation removed the document between the last `await` and the `updateDocumentMetadata` call. Add a `containsDocument` guard.
4. **"Duplicate documentId found in database"**: Two documents have the same `documentId`. Usually caused by `resolveIdempotencyKeys` or `handleMaybeMergingResponse` assigning a documentId without checking if another doc already has it.
5. **"Invalid diff: attempting to access N characters..."**: The content cache has wrong content for a `parentVersionId`. Common causes: (a) cached local content instead of server content after MergingUpdate; (b) idempotent create returned a stale version but the client cached its current content under that version ID; (c) `parentVersionId` changed between cache lookup and `putText` call due to mutable shared reference.
6. **"Parent version with id 0 not found"**: A document's `parentVersionId` is 0 (set by `resolveIdempotencyKeys`). The sync path should treat `parentVersionId === 0` as a create, not an update.
**Test client internals (`test-client/src/agent/mock-agent.ts`):**
- `files`: InMemoryFileSystem map — the ACTUAL filesystem state
- `data`: Map of expected file contents — what the agent CREATED/UPDATED
- `assertFileSystemsAreConsistent`: Compares `files` maps between two agents
- `assertAllContentIsPresentOnce`: Checks no duplicate content across files
- The `finish()` and `destroy()` methods use `withTimeout(TIMEOUT_MS)` — operations that exceed 30s are killed
**Logging bug (fixed):** In `assertFileSystemsAreConsistent`, the error handler's "Local files" log previously printed `otherAgent.files.keys()` for BOTH agents. Now correctly prints `this.files.keys()` for the current agent.

View file

@ -0,0 +1,8 @@
export class HttpClientError extends Error {
public readonly status: number;
public constructor(status: number, message: string) {
super(message);
this.name = "HttpClientError";
this.status = status;
}
}

View file

@ -64,32 +64,45 @@ export class Database {
) {
initialState ??= {};
this.documents =
initialState.documents?.map(({ relativePath, ...metadata }) => ({
const validDocuments = (initialState.documents ?? []).filter(
(doc) =>
this.validateStoredField(doc, "relativePath", "string") &&
this.validateStoredField(doc, "documentId", "string") &&
this.validateStoredField(doc, "parentVersionId", "number")
);
this.documents = validDocuments.map(
({ relativePath, ...metadata }) => ({
relativePath,
metadata,
isDeleted: false,
parallelVersion: 0
})) ?? [];
})
);
if (initialState.pendingDocuments) {
for (const pending of initialState.pendingDocuments) {
const existing =
this.getLatestDocumentByRelativePath(
pending.relativePath
);
this.documents.push({
relativePath: pending.relativePath,
metadata: undefined,
isDeleted: false,
parallelVersion:
existing !== undefined
? existing.parallelVersion + 1
: 0,
originalCreationPath: pending.originalCreationPath,
idempotencyKey: pending.idempotencyKey
});
}
const validPendingDocuments = (
initialState.pendingDocuments ?? []
).filter(
(doc) =>
this.validateStoredField(doc, "relativePath", "string") &&
this.validateStoredField(doc, "idempotencyKey", "string")
);
for (const pending of validPendingDocuments) {
const existing = this.getLatestDocumentByRelativePath(
pending.relativePath
);
this.documents.push({
relativePath: pending.relativePath,
metadata: undefined,
isDeleted: false,
parallelVersion:
existing !== undefined
? existing.parallelVersion + 1
: 0,
originalCreationPath: pending.originalCreationPath,
idempotencyKey: pending.idempotencyKey
});
}
this.ensureConsistency();
@ -106,6 +119,25 @@ export class Database {
});
}
private validateStoredField(
doc: object,
field: string,
expectedType: "string" | "number"
): boolean {
const value = (doc as Record<string, unknown>)[field];
if (
typeof value !== expectedType ||
(expectedType === "string" && !value) ||
(expectedType === "number" && isNaN(value as number))
) {
this.logger.warn(
`Skipping stored document with invalid ${field}: ${JSON.stringify(doc)}`
);
return false;
}
return true;
}
public get length(): number {
return this.documents.length;
}
@ -301,7 +333,7 @@ export class Database {
({ relativePath, metadata }) => ({
relativePath,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
...metadata! // `resolvedDocuments` only returns docs with metadata set
...metadata! // filtered to only docs with metadata set
})
),
pendingDocuments: this.pendingDocuments.map(
@ -316,6 +348,25 @@ export class Database {
}
private ensureConsistency(): void {
// Check for duplicate documentIds across ALL documents with metadata,
// not just the deduplicated resolvedDocuments view. A duplicate on a
// lower-parallelVersion record would otherwise go undetected.
const allWithMetadata = this.documents
// eslint-disable-next-line no-restricted-syntax -- Type narrowing, not removing a specific item
.filter((d) => d.metadata !== undefined);
const documentIdSet = new Set<string>();
for (const doc of allWithMetadata) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const docId = doc.metadata!.documentId;
if (documentIdSet.has(docId)) {
throw new Error(
`Duplicate documentId ${docId} found in database`
);
}
documentIdSet.add(docId);
}
// Also check the deduplicated view for path-level invariants
const idToPath = new Map<string, string[]>();
this.resolvedDocuments.forEach(({ relativePath, metadata }) => {

View file

@ -9,6 +9,7 @@ import type { Settings } from "../persistence/settings";
import type { FetchController } from "./fetch-controller";
import { sleep } from "../utils/sleep";
import { SyncResetError } from "../errors/sync-reset-error";
import { HttpClientError } from "../errors/http-client-error";
import type { SerializedError } from "./types/SerializedError";
import type { DocumentVersionWithoutContent } from "./types/DocumentVersionWithoutContent";
import type { DocumentUpdateResponse } from "./types/DocumentUpdateResponse";
@ -65,6 +66,17 @@ export class SyncService {
return result;
}
private static async throwHttpError(
response: Response,
context: string
): Promise<never> {
const message = `${context}: ${await SyncService.errorFromResponse(response)}`;
if (response.status >= 400 && response.status < 500) {
throw new HttpClientError(response.status, message);
}
throw new Error(message);
}
public async create({
relativePath,
contentBytes,
@ -98,10 +110,9 @@ export class SyncService {
});
if (!response.ok) {
throw new Error(
`Failed to create document: ${await SyncService.errorFromResponse(
response
)}`
await SyncService.throwHttpError(
response,
"Failed to create document"
);
}
@ -146,10 +157,9 @@ export class SyncService {
);
if (!response.ok) {
throw new Error(
`Failed to update document: ${await SyncService.errorFromResponse(
response
)}`
await SyncService.throwHttpError(
response,
"Failed to update document"
);
}
@ -157,8 +167,7 @@ export class SyncService {
(await response.json()) as DocumentUpdateResponse; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
this.logger.debug(
`Updated document ${JSON.stringify(result)} with id ${
result.documentId
`Updated document ${JSON.stringify(result)} with id ${result.documentId
}}`
);
@ -199,10 +208,9 @@ export class SyncService {
);
if (!response.ok) {
throw new Error(
`Failed to update document: ${await SyncService.errorFromResponse(
response
)}`
await SyncService.throwHttpError(
response,
"Failed to update document"
);
}
@ -210,8 +218,7 @@ export class SyncService {
(await response.json()) as DocumentUpdateResponse; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
this.logger.debug(
`Updated document ${JSON.stringify(result)} with id ${
result.documentId
`Updated document ${JSON.stringify(result)} with id ${result.documentId
}}`
);
@ -245,10 +252,9 @@ export class SyncService {
);
if (!response.ok) {
throw new Error(
`Failed to delete document: ${await SyncService.errorFromResponse(
response
)}`
await SyncService.throwHttpError(
response,
"Failed to delete document"
);
}
@ -279,10 +285,9 @@ export class SyncService {
);
if (!response.ok) {
throw new Error(
`Failed to get document: ${await SyncService.errorFromResponse(
response
)}`
await SyncService.throwHttpError(
response,
"Failed to get document"
);
}
@ -317,10 +322,9 @@ export class SyncService {
);
if (!response.ok) {
throw new Error(
`Failed to get document: ${await SyncService.errorFromResponse(
response
)}`
await SyncService.throwHttpError(
response,
"Failed to get document"
);
}
@ -338,7 +342,7 @@ export class SyncService {
return this.retryForever(async () => {
this.logger.debug(
"Getting all documents" +
(since != null ? ` since ${since}` : "")
(since != null ? ` since ${since}` : "")
);
const url = new URL(this.getUrl("/documents"));
@ -350,10 +354,9 @@ export class SyncService {
});
if (!response.ok) {
throw new Error(
`Failed to get documents: ${await SyncService.errorFromResponse(
response
)}`
await SyncService.throwHttpError(
response,
"Failed to get documents"
);
}
@ -464,6 +467,12 @@ export class SyncService {
throw e;
}
// Don't retry 4xx client errors — the request itself is wrong
// and retrying won't help
if (e instanceof HttpClientError) {
throw e;
}
const retryInterval =
this.settings.getSettings().networkRetryIntervalMs;
this.logger.error(

View file

@ -109,6 +109,10 @@ export class WebSocketManager {
await awaitAll(this.outstandingPromises);
}
public hasOutstandingWork(): boolean {
return this.outstandingPromises.length > 0;
}
public sendHandshakeMessage(
message: WebSocketClientMessage & { type: "handshake" }
): void {

View file

@ -18,6 +18,7 @@ import type { NetworkConnectionStatus } from "./types/network-connection-status"
import { DocumentSyncStatus } from "./types/document-sync-status";
import { WebSocketManager } from "./services/websocket-manager";
import { createClientId } from "./utils/create-client-id";
import { SyncResetError } from "./errors/sync-reset-error";
import { CursorTracker } from "./sync-operations/cursor-tracker";
import type { CursorSpan } from "./services/types/CursorSpan";
import type { MaybeOutdatedClientCursors } from "./types/maybe-outdated-client-cursors";
@ -424,8 +425,21 @@ export class SyncClient {
public async waitUntilFinished(): Promise<void> {
this.checkIfDestroyed("waitUntilIdle");
await this.syncer.waitUntilFinished();
await this.webSocketManager.waitUntilFinished();
// Loop until both sync queue and WebSocket handlers are
// simultaneously idle. WS handlers can enqueue new sync
// operations, and completed sync operations can trigger
// broadcasts that create new WS handler promises.
let iteration = 0;
while (true) {
iteration++;
this.logger.info(`waitUntilFinished: iteration ${iteration}`);
await this.webSocketManager.waitUntilFinished();
await this.syncer.waitUntilFinished();
// Check if anything new arrived while we were waiting
if (!this.webSocketManager.hasOutstandingWork()) {
break;
}
}
await this.database.save(); // flush all changes to disk
}
@ -476,10 +490,40 @@ export class SyncClient {
this.hasFinishedOfflineSync = true;
}
/**
* Hard pause: aborts all in-flight HTTP operations via FetchController reset.
* Used when the SyncClient is being destroyed or fully reset (connection
* settings changed). This is the nuclear option every outstanding fetch
* is rejected with SyncResetError so the queue drains immediately.
*/
private async pause(): Promise<void> {
this.hasFinishedOfflineSync = false;
this.fetchController.startReset();
try {
await this.webSocketManager.stop();
await this.waitUntilFinished();
} catch (e) {
// SyncResetError is expected here — we just called startReset()
// which rejects in-flight fetches. Only re-throw non-reset errors
// (after ensuring the FetchController is left in a usable state).
this.fetchController.finishReset();
if (!(e instanceof SyncResetError)) {
throw e;
}
}
}
/**
* Soft pause: stops the WebSocket and clears the sync queue, but lets
* in-flight HTTP operations complete naturally. Used when the user toggles
* sync off we don't want to abort creates/updates that are mid-flight
* because they'd just be re-queued on re-enable, potentially leading to
* an infinite retry loop with flaky connections.
*/
private async softPause(): Promise<void> {
this.hasFinishedOfflineSync = false;
await this.webSocketManager.stop();
this.syncer.reset();
await this.waitUntilFinished();
}
@ -509,7 +553,7 @@ export class SyncClient {
if (newSettings.isSyncEnabled) {
await this.startSyncing();
} else {
await this.pause();
await this.softPause();
}
}

View file

@ -71,6 +71,10 @@ export class Syncer {
if (isConnected) {
// The JS WebSocket API doesn't support setting headers, so we have to send the token as a message
this.sendHandshakeMessage();
} else {
// Clear so that the next reconnect re-runs scheduleSyncForOfflineChanges
// instead of returning the stale resolved promise.
this.runningScheduleSyncForOfflineChanges = undefined;
}
});
this.webSocketManager.onRemoteVaultUpdateReceived.add(
@ -267,7 +271,7 @@ export class Syncer {
public async waitUntilFinished(): Promise<void> {
await this.runningScheduleSyncForOfflineChanges;
await this.syncQueue.onIdle(); // Wait for queue to be empty and running tasks to finish
await this.syncQueue.onIdle();
}
public async syncRemotelyUpdatedFile(
@ -330,19 +334,19 @@ export class Syncer {
remoteVersion.documentId
);
await this.enqueueSyncOperation(
async () =>
this.unrestrictedSyncer.unrestrictedSyncRemotelyUpdatedFile(
async () => {
await this.unrestrictedSyncer.unrestrictedSyncRemotelyUpdatedFile(
remoteVersion,
document
),
);
this.database.addSeenUpdateId(remoteVersion.vaultUpdateId);
},
[
document?.relativePath,
remoteVersion.relativePath,
remoteVersion.documentId
]
);
this.database.addSeenUpdateId(remoteVersion.vaultUpdateId);
}
private async internalScheduleSyncForOfflineChanges(): Promise<void> {
@ -371,9 +375,12 @@ export class Syncer {
}
const instructions: (Instruction | undefined)[] = await awaitAll(
allLocalFiles.map(async (relativePath) => {
if (
const existingMetadata =
this.database.getLatestDocumentByRelativePath(relativePath)
?.metadata !== undefined
?.metadata;
if (
existingMetadata !== undefined &&
existingMetadata.parentVersionId > 0
) {
this.logger.debug(
`Document ${relativePath} might have been updated locally, scheduling sync to validate and update it`
@ -382,12 +389,27 @@ export class Syncer {
return { type: "update", relativePath } as Instruction;
}
// Perhaps the file has been moved; let's check by looking at the deleted files
const contentHash = await this.syncQueue.add(async () => {
// Perhaps the file has been moved; let's check by looking at the deleted files.
// Skip reading oversized files into memory for hash computation —
// they can't participate in move detection and will be scheduled as creates.
const hashResult = await this.syncQueue.add(async () => {
try {
const sizeInBytes =
await this.operations.getFileSize(relativePath);
const sizeInMB = Math.ceil(
sizeInBytes / 1024 / 1024
);
const { maxFileSizeMB } =
this.settings.getSettings();
if (sizeInMB > maxFileSizeMB) {
// File exceeds size limit — skip hash-based move
// detection and schedule as a create instead
return { skippedOversized: true } as const;
}
const contentBytes =
await this.operations.read(relativePath); // this can throw FileNotFoundError
return hash(contentBytes);
return { hash: hash(contentBytes) } as const;
} catch (e) {
if (
e instanceof Error &&
@ -399,15 +421,21 @@ export class Syncer {
}
});
if (contentHash == undefined) {
if (hashResult == undefined) {
// The file was deleted before we had a chance to read it, no need to sync it here
return;
}
const originalFile = findMatchingFile(
contentHash,
locallyPossiblyDeletedFiles
);
const contentHash =
"hash" in hashResult ? hashResult.hash : undefined;
const originalFile =
contentHash != undefined
? findMatchingFile(
contentHash,
locallyPossiblyDeletedFiles
)
: undefined;
if (originalFile !== undefined) {
// `originalFile` hasn't been deleted but it got moved instead
/* eslint-disable no-restricted-syntax -- Comparing by property, not direct equality */
@ -505,12 +533,25 @@ export class Syncer {
//
// The result type needs special handling since syncQueue.add() can
// return undefined when the queue is paused/cleared.
const result = await this.syncQueue.add(async () =>
this.updatedDocumentsByPathAndKeysLocks.withLock(
filteredKeys,
operation
)
);
const result = await this.syncQueue.add(async () => {
try {
return await this.updatedDocumentsByPathAndKeysLocks.withLock(
filteredKeys,
operation
);
} catch (e) {
// Catch all errors to prevent unhandled promise rejections.
// SyncResetError: lock waiter rejected during reset (expected).
// Other errors: logged by executeSync's history entry, will
// be retried on the next scheduleSyncForOfflineChanges cycle.
if (!(e instanceof SyncResetError)) {
this.logger.info(
`Sync operation failed, will retry on next cycle: ${e}`
);
}
return undefined;
}
});
return result as T;
}
}

View file

@ -83,6 +83,15 @@ export class UnrestrictedSyncer {
doc.idempotencyKey !== undefined &&
resolved.has(doc.idempotencyKey)
) {
// Check if document was removed by a concurrent operation
// (e.g., a delete) between the snapshot and now
if (!this.database.containsDocument(doc)) {
this.logger.info(
`Pending doc at ${doc.relativePath} was removed during key resolution, skipping`
);
continue;
}
const documentId = resolved.get(doc.idempotencyKey)!; // eslint-disable-line @typescript-eslint/no-non-null-assertion
// Skip if this documentId is already assigned to another document
@ -160,7 +169,14 @@ export class UnrestrictedSyncer {
let response: DocumentVersion | DocumentUpdateResponse | undefined =
undefined;
if (document.metadata === undefined) {
if (
document.metadata === undefined ||
document.metadata.parentVersionId === 0
) {
// parentVersionId === 0 occurs when resolveIdempotencyKeys
// assigned a documentId but hasn't synced yet. Treat as a
// create — the server will recognise the idempotency key
// and return the existing document.
response = await this.syncService.create({
relativePath: originalRelativePath,
contentBytes,
@ -188,16 +204,22 @@ export class UnrestrictedSyncer {
(await this.serverConfig.getConfig())
.mergeableFileExtensions
);
// Snapshot parentVersionId atomically with the cache
// lookup. document.metadata is a mutable shared
// reference — a concurrent operation could update
// parentVersionId between the cache lookup and the
// putText call, causing a diff/version mismatch.
const parentVersionIdForUpdate =
document.metadata.parentVersionId;
const cachedVersion = this.contentCache.get(
document.metadata.parentVersionId
parentVersionIdForUpdate
);
response =
isText && cachedVersion !== undefined
? await this.syncService.putText({
documentId: document.metadata.documentId,
parentVersionId:
document.metadata.parentVersionId,
parentVersionId: parentVersionIdForUpdate,
relativePath: document.relativePath,
content: diff(
new TextDecoder().decode(cachedVersion),
@ -206,8 +228,7 @@ export class UnrestrictedSyncer {
})
: await this.syncService.putBinary({
documentId: document.metadata.documentId,
parentVersionId:
document.metadata.parentVersionId,
parentVersionId: parentVersionIdForUpdate,
relativePath: document.relativePath,
contentBytes
});
@ -522,6 +543,31 @@ export class UnrestrictedSyncer {
this.logger.info(
`Document ${document.relativePath} has been deleted before we could finish updating it`
);
// Assign metadata so the pending delete can inform the server
if (document.metadata === undefined) {
const existingWithSameId =
this.database.getDocumentByDocumentId(
response.documentId
);
if (
existingWithSameId !== undefined &&
existingWithSameId !== document
) {
// Another doc already has this documentId — the server
// knows about it. Just remove this stale pending doc.
this.database.removeDocument(document);
} else {
this.database.updateDocumentMetadata(
{
documentId: response.documentId,
parentVersionId: response.vaultUpdateId,
hash: contentHash,
remoteRelativePath: response.relativePath
},
document
);
}
}
this.database.addSeenUpdateId(response.vaultUpdateId);
return;
}
@ -615,18 +661,9 @@ export class UnrestrictedSyncer {
if (!("type" in response) || response.type === "MergingUpdate") {
const responseBytes = base64ToBytes(response.contentBase64);
contentHash = hash(responseBytes);
this.database.updateDocumentMetadata(
{
documentId: response.documentId,
parentVersionId: response.vaultUpdateId,
hash: contentHash,
remoteRelativePath: response.relativePath
},
document
);
// Write file BEFORE updating metadata so that if the write fails,
// metadata doesn't point to a version whose content was never written.
await this.operations.write(
actualPath,
originalContentBytes,
@ -642,27 +679,90 @@ export class UnrestrictedSyncer {
);
}
// Re-read and re-hash after write because the 3-way merge in
// operations.write() may produce content different from responseBytes.
const actualContent = await this.operations.read(actualPath);
const actualHash = hash(actualContent);
// The document may have been removed by a concurrent operation
// (e.g., a delete) during the awaited file write/read above.
// The file is safely on disk; recovery will re-detect it.
if (!this.database.containsDocument(document)) {
this.logger.info(
`Document ${document.relativePath} was removed during sync, skipping metadata update`
);
return;
}
this.database.updateDocumentMetadata(
{
documentId: response.documentId,
parentVersionId: response.vaultUpdateId,
hash: actualHash,
remoteRelativePath: response.relativePath
},
document
);
// Cache the SERVER's content (responseBytes), not the local
// content (actualContent). The cache is used to compute diffs
// for subsequent updates: diff(cached, newFileContent). The
// server applies this diff against its content at
// parentVersionId, which is responseBytes. Using actualContent
// would produce diffs that don't match the server's state.
await this.updateCache(
response.vaultUpdateId,
responseBytes,
actualPath
);
} else {
this.database.updateDocumentMetadata(
{
documentId: response.documentId,
parentVersionId: response.vaultUpdateId,
hash: contentHash,
remoteRelativePath: response.relativePath
},
document
);
await this.updateCache(
response.vaultUpdateId,
originalContentBytes,
actualPath
);
// FastForwardUpdate — the server accepted our content as-is,
// UNLESS this was an idempotent create return (the server
// returned the original version, whose content may differ from
// what we sent). Detect this by comparing contentSize.
const serverContentMatchesLocal =
!("contentSize" in response) ||
response.contentSize === originalContentBytes.length;
if (serverContentMatchesLocal) {
this.database.updateDocumentMetadata(
{
documentId: response.documentId,
parentVersionId: response.vaultUpdateId,
hash: contentHash,
remoteRelativePath: response.relativePath
},
document
);
await this.updateCache(
response.vaultUpdateId,
originalContentBytes,
actualPath
);
} else {
// The server returned a stale idempotent version. Fetch
// the actual content so the cache stays consistent, then
// the hash mismatch will trigger a follow-up update sync.
const serverContent =
await this.syncService.getDocumentVersionContent({
documentId: response.documentId,
vaultUpdateId: response.vaultUpdateId
});
this.database.updateDocumentMetadata(
{
documentId: response.documentId,
parentVersionId: response.vaultUpdateId,
hash: hash(serverContent),
remoteRelativePath: response.relativePath
},
document
);
await this.updateCache(
response.vaultUpdateId,
serverContent,
actualPath
);
}
}
this.database.addSeenUpdateId(response.vaultUpdateId);
@ -672,9 +772,10 @@ export class UnrestrictedSyncer {
sizeInBytes: number,
relativePath: RelativePath
): CommonHistoryEntry | undefined {
const sizeInMB = Math.round(sizeInBytes / 1024 / 1024);
const { maxFileSizeMB } = this.settings.getSettings();
if (sizeInMB > maxFileSizeMB) {
const maxFileSizeBytes = maxFileSizeMB * 1024 * 1024;
if (sizeInBytes > maxFileSizeBytes) {
const sizeInMB = (sizeInBytes / 1024 / 1024).toFixed(1);
return {
status: SyncStatus.SKIPPED,
details: {

View file

@ -85,7 +85,11 @@ export class Locks<T> {
reject(new SyncResetError());
}
}
this.locked.clear();
// Do NOT clear this.locked — let running operations release their own
// locks via the finally block in withLock. Clearing this.locked would
// allow new operations to acquire locks on keys still held by in-flight
// operations, breaking mutual exclusion.
this.waiters.clear();
}

View file

@ -13,6 +13,7 @@ const TIMEOUT_MS = 10 * 60 * 1000;
export class MockAgent extends MockClient {
private readonly writtenContents: string[] = [];
private readonly writtenBinaryContents: string[] = [];
private readonly pendingActions: Promise<unknown>[] = [];
// The renamed file finding algorithm isn't too smart so we can't both update and rename the same file
@ -51,7 +52,7 @@ export class MockAgent extends MockClient {
const formatted = `[${this.name} ${state}] ${logLine.timestamp.toISOString()} ${logLine.level} ${logLine.message}`;
// HACK: we have to ensure the file has been synced if we want to change it offline without data loss
const historyEntry = /.*History entry: (.*.md).*/.exec(
const historyEntry = /.*History entry: (.*\.(?:md|bin)).*/.exec(
logLine.message
);
@ -115,9 +116,11 @@ export class MockAgent extends MockClient {
);
}
public async act(): Promise<void> {
const options: (() => Promise<unknown>)[] = [
this.createFileAction.bind(this)
this.createFileAction.bind(this),
this.createBinaryFileAction.bind(this)
];
if (
@ -132,7 +135,8 @@ export class MockAgent extends MockClient {
options.push(
this.renameFileAction.bind(this),
this.updateFileAction.bind(this)
this.updateFileAction.bind(this),
this.updateBinaryFileAction.bind(this)
);
if (this.doDeletes) {
@ -226,26 +230,26 @@ export class MockAgent extends MockClient {
"Local data: " + JSON.stringify(this.data, null, 2)
);
this.client.logger.info(
"Local files: " + Array.from(otherAgent.files.keys()).join(", ")
"Local files: " + Array.from(this.files.keys()).join(", ")
);
otherAgent.client.logger.info(
"Local data: " + JSON.stringify(otherAgent.data, null, 2)
"Other agent's data: " + JSON.stringify(otherAgent.data, null, 2)
);
otherAgent.client.logger.info(
"Local files: " + Array.from(otherAgent.files.keys()).join(", ")
"Other agent's files: " + Array.from(otherAgent.files.keys()).join(", ")
);
throw e;
}
}
// For slow file events, still check for duplicates (skip existence check).
// Duplication is always a bug regardless of timing.
public assertAllContentIsPresentOnce(): void {
if (this.useSlowFileEvents) {
this.client.logger.info(
// We can't ensure that we have seen every single update
`Skipping content check for ${this.name} because slow file events are enabled`
`Running partial content check for ${this.name} (slow file events: skipping existence check)`
);
return;
}
for (const content of this.writtenContents) {
@ -260,14 +264,13 @@ export class MockAgent extends MockClient {
`[${this.name}] Content ${content} found in multiple files: ${found.join(", ")}`
);
if (!this.doDeletes) {
if (!this.useSlowFileEvents && !this.doDeletes) {
assert(
found.length >= 1,
`[${this.name}] Content ${content} not found in any files`
);
}
if (found.length === 1) {
const [file] = found;
const fileContent = new TextDecoder().decode(
@ -281,6 +284,31 @@ export class MockAgent extends MockClient {
}
}
// Check binary content isn't duplicated across files.
// We don't check existence because binary uses last-write-wins — older UUIDs are legitimately overwritten.
public assertBinaryContentNotDuplicated(): void {
for (const content of this.writtenBinaryContents) {
const found = Array.from(this.files.keys()).filter((key) => {
return new TextDecoder()
.decode(this.files.get(key))
.includes(content);
});
assert(
found.length <= 1,
`[${this.name}] Binary content ${content} found in multiple files: ${found.join(", ")}`
);
}
}
public getFileList(): string[] {
return Array.from(this.files.keys());
}
public getFileContent(path: string): Uint8Array | undefined {
return this.files.get(path);
}
private async resetClient(): Promise<void> {
this.client.logger.info(`Resetting client ${this.name}`);
await this.client.destroy();
@ -308,6 +336,28 @@ export class MockAgent extends MockClient {
});
}
// Binary file creation — exercises the putBinary server path (not in mergeable_file_extensions)
private async createBinaryFileAction(): Promise<void> {
const file = this.getBinaryFileName();
if (
(!this.lastSyncEnabledState &&
this.doNotTouchWhileOffline.includes(file)) ||
(await this.exists(file))
) {
return;
}
const content = this.getBinaryContent();
this.client.logger.info(
`Decided to create binary file ${file}`
);
return this.create(file, content, {
ignoreSlowFileEvents: true
});
}
private async disableSyncAction(): Promise<void> {
this.client.logger.info(`Decided to disable sync`);
this.lastSyncEnabledState = false;
@ -357,7 +407,9 @@ export class MockAgent extends MockClient {
}
private async updateFileAction(): Promise<void> {
const files = await this.listFilesRecursively();
const files = (await this.listFilesRecursively()).filter((f) =>
f.endsWith(".md")
);
if (files.length === 0) {
return;
}
@ -391,6 +443,40 @@ export class MockAgent extends MockClient {
);
}
// Binary file update — complete replacement (last-write-wins)
private async updateBinaryFileAction(): Promise<void> {
const files = (await this.listFilesRecursively()).filter((f) =>
f.endsWith(".bin")
);
if (files.length === 0) {
return;
}
const file = choose(files);
if (
!this.lastSyncEnabledState &&
this.doNotTouchWhileOffline.includes(file)
) {
return;
}
const content = this.getBinaryContent();
this.client.logger.info(
`Decided to update binary file ${file}`
);
this.doNotTouchWhileOffline.push(file);
this.files.set(file, content);
this.executeFileOperation(
async () =>
this.client.syncLocallyUpdatedFile({
relativePath: file
}),
true
);
}
private async deleteFileAction(): Promise<void> {
const files = await this.listFilesRecursively();
if (files.length === 0) {
@ -408,8 +494,19 @@ export class MockAgent extends MockClient {
return uuid;
}
private getBinaryContent(): Uint8Array {
const uuid = uuidv4();
this.writtenBinaryContents.push(uuid);
return new TextEncoder().encode(`BINARY:${uuid}`);
}
private getFileName(): string {
// Simulate name collisions between the clients
return `file-${Math.floor(Math.random() * 64)}.md`;
}
private getBinaryFileName(): string {
// Smaller range to increase collision frequency for last-write-wins testing
return `binary-${Math.floor(Math.random() * 16)}.bin`;
}
}

View file

@ -177,7 +177,7 @@ export class MockClient extends debugging.InMemoryFileSystem {
);
}
private executeFileOperation(
protected executeFileOperation(
callback: () => unknown,
ignoreSlowFileEvents = false
): void {

View file

@ -119,7 +119,7 @@ async function runTest({
logger.info(
`Checking consistency between ${client.name} and ${clients[i + 1].name}`
);
client.assertFileSystemsAreConsistent(clients[i]);
client.assertFileSystemsAreConsistent(clients[i + 1]);
logger.info(`Consistency check for ${client.name} passed`);
});

View file

@ -55,7 +55,9 @@ pub async fn create_document(
.await
.map_err(server_error)?;
if let Some(existing) = existing {
info!("Found existing document with idempotency key `{idempotency_key}`, returning existing document");
info!(
"Found existing document with idempotency key `{idempotency_key}`, returning existing document"
);
transaction
.rollback()
.await
@ -78,6 +80,7 @@ pub async fn create_document(
)
.await
.map_err(server_error)?;
if let Some(latest_version) = latest_version {
info!(
"Document already exists at new location: `{sanitized_relative_path}` when trying to create it in vault `{vault_id}`, merging into existing document"
@ -85,7 +88,7 @@ pub async fn create_document(
return merge_with_stored_version(
&sanitized_relative_path,
&Vec::new(),
&latest_version.content.clone(),
latest_version,
vault_id,
user,

View file

@ -79,9 +79,12 @@ pub async fn update_text(
) -> Result<Json<DocumentUpdateResponse>, SyncServerError> {
let parent_document = get_parent_document(&state, &vault_id, request.parent_version_id).await?;
let parent_content = str::from_utf8(&parent_document.content)
.context("Parent document content is not valid UTF-8")
.map_err(client_error)?;
let edited_text = EditedText::from_diff(
str::from_utf8(&parent_document.content)
.expect("parent must be valid UTF-8 because it's a text document"),
parent_content,
request.content,
&*BuiltinTokenizer::Word,
)
@ -232,15 +235,20 @@ pub async fn merge_with_stored_version(
"Merging changes for document `{}` in vault `{vault_id}`",
latest_version.document_id
);
let parent_str = str::from_utf8(parent_document_content)
.context("Parent document content is not valid UTF-8")
.map_err(server_error)?;
let latest_str = str::from_utf8(&latest_version.content)
.context("Latest version content is not valid UTF-8")
.map_err(server_error)?;
let content_str = str::from_utf8(&content)
.context("New content is not valid UTF-8")
.map_err(server_error)?;
reconcile(
str::from_utf8(parent_document_content)
.expect("parent must be valid UTF-8 because it's not binary"),
&str::from_utf8(&latest_version.content)
.expect("latest_version must be valid UTF-8 because it's not binary")
.into(),
&str::from_utf8(&content)
.expect("content must be valid UTF-8 because it's not binary")
.into(),
parent_str,
&latest_str.into(),
&content_str.into(),
&*BuiltinTokenizer::Word,
)
.apply()

View file

@ -7,7 +7,7 @@ use axum::{
response::Response,
};
use futures::stream::StreamExt;
use log::{debug, info};
use log::{debug, info, warn};
use serde::Deserialize;
use crate::{
@ -101,24 +101,38 @@ async fn websocket(
let device_id = authed_handshake.handshake.device_id.clone();
let mut send_task = tokio::spawn(async move {
while let Ok(update) = broadcast_receiver.recv().await {
if Some(&device_id) == update.origin_device_id.as_ref() {
continue;
}
loop {
match broadcast_receiver.recv().await {
Ok(update) => {
if Some(&device_id) == update.origin_device_id.as_ref() {
continue;
}
let message = match update.message {
WebSocketServerMessage::CursorPositions(CursorPositionFromServer { clients }) => {
WebSocketServerMessage::CursorPositions(CursorPositionFromServer {
clients: clients
.into_iter()
.filter(|client| client.device_id != device_id)
.collect(),
})
let message = match update.message {
WebSocketServerMessage::CursorPositions(
CursorPositionFromServer { clients },
) => WebSocketServerMessage::CursorPositions(CursorPositionFromServer {
clients: clients
.into_iter()
.filter(|client| client.device_id != device_id)
.collect(),
}),
WebSocketServerMessage::VaultUpdate(_) => update.message,
};
send_update_over_websocket(&message, &mut sender).await?;
}
WebSocketServerMessage::VaultUpdate(_) => update.message,
};
send_update_over_websocket(&message, &mut sender).await?;
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
warn!(
"WebSocket receiver for device {device_id} lagged by {n} messages, \
disconnecting for re-sync"
);
break;
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
break;
}
}
}
Ok::<(), SyncServerError>(())