diff --git a/CLAUDE.md b/CLAUDE.md index bc2cd1d7..9cd20feb 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -333,3 +333,94 @@ When fixing the duplicate-document-after-interrupted-create problem, several heu 3. **In-memory tracking** (e.g., `pendingLocalId`): Any in-memory state is lost on app crash. The whole point of the fix is to handle interrupted creates, which include crashes. The idempotency key approach works because it's: (a) crash-safe (persisted locally); (b) deterministic (UUID lookup, no heuristics); (c) server-authoritative (the server resolves keys to documentIds). + +### Critical Implementation Invariants (Learned from Bugs) + +These invariants were discovered through deep auditing and E2E testing. Violating any of them causes data loss, sync stalls, or test failures. + +**1. `waitUntilFinished` must loop until both sync queue AND WebSocket handlers are simultaneously idle.** +WebSocket message handlers (`onRemoteVaultUpdateReceived`) enqueue new sync operations. If you wait for the sync queue first, then WebSocket handlers, the handlers may have enqueued new operations that aren't awaited. The correct implementation loops: wait for WS handlers → wait for sync queue → check if WS has new work → repeat if needed. See `SyncClient.waitUntilFinished()`. + +**2. `enqueueSyncOperation` must catch ALL errors, not just `SyncResetError`.** +`executeSync` re-throws non-SyncReset/non-FileNotFound errors (they're logged in sync history as ERROR). If `enqueueSyncOperation` doesn't catch these, they become unhandled promise rejections that crash the process. The catch logs the error and returns undefined — failed operations will be retried on the next WebSocket reconnect (which clears `runningScheduleSyncForOfflineChanges` and triggers a fresh filesystem scan). + +**3. `Locks.reset()` must NOT clear `this.locked`.** +In-flight operations (currently executing their callback) still hold conceptual locks. If `reset()` clears `this.locked`, new operations can acquire the same key and run concurrently with the still-running old operation. Only clear `this.waiters` (to reject pending waiters with SyncResetError). Let running operations release their locks naturally via the `finally` block in `withLock`. + +**4. `handleMaybeMergingResponse` must write the file BEFORE updating metadata.** +If metadata is updated first and the write fails (crash, OS error), the metadata points to a server version whose content was never written locally. On recovery, the stale local content is uploaded, potentially overwriting other clients' changes that were part of the merge. Order: write file → re-read + re-hash → update metadata → update cache. + +**5. After a MergingUpdate, cache the SERVER's content (`responseBytes`), not the local content.** +The content cache is used to compute diffs for subsequent updates: `diff(cached, newFileContent)`. The server applies this diff against its content at `parentVersionId`. If the cache stores the local content (which may differ from the server's due to the 3-way merge in `FileOperations.write`), the diff won't match the server's state and the update will fail with "Invalid diff". + +**6. After a MergingUpdate, re-read the file and re-hash.** +The 3-way merge in `operations.write()` may produce content different from `responseBytes` (because the user edited the file between the read and the write). The stored hash must match the actual on-disk content, not the server's merged content. Otherwise, the next sync cycle incorrectly detects "no changes" (phantom hash match) or always detects changes (phantom hash mismatch). + +**7. Snapshot `parentVersionId` before computing diffs.** +`document.metadata` is a mutable shared reference. A concurrent operation (via a WebSocket handler running during an `await` in the same sync operation) can update `parentVersionId` between the cache lookup and the `putText` call. Always capture `const parentVersionIdForUpdate = document.metadata.parentVersionId` and use that value for both the cache lookup and the HTTP request. + +**8. Guard `updateDocumentMetadata` against concurrently removed documents.** +After any `await` (file write, re-read, HTTP call), the document may have been removed from the database by a concurrent delete operation. Always check `database.containsDocument(document)` before calling `updateDocumentMetadata` if there was an `await` since the document reference was obtained. Return gracefully if removed — the file is on disk and `scheduleSyncForOfflineChanges` will re-detect it. + +**9. When assigning a `documentId` to a pending doc, check for duplicates first.** +Both `resolveIdempotencyKeys` and `handleMaybeMergingResponse` (for deleted pending docs) assign documentIds. Before setting metadata, call `getDocumentByDocumentId(id)`. If another document already has that ID, remove the stale pending doc instead of creating a duplicate. `ensureConsistency` checks for duplicate documentIds across ALL documents (not just `resolvedDocuments`). + +**10. `resolveIdempotencyKeys` sets `parentVersionId: 0` — treat this as a create, not an update.** +When `resolveIdempotencyKeys` assigns a documentId to a pending doc, it uses `parentVersionId: 0` as a placeholder. The sync path must check for `parentVersionId === 0` and take the CREATE path (sending a create with the idempotency key), not the UPDATE path (which would fail because version 0 doesn't exist on the server). + +**11. Idempotent create returns can have stale content — check `contentSize`.** +When the server returns a `FastForwardUpdate` for a create with an idempotency key, it may return the ORIGINAL version (from the first create), not a new version with the current content. The response's `contentSize` may not match `originalContentBytes.length`. If they differ, fetch the actual server content for that version and use it for the cache and hash, so subsequent diffs are correct. + +**12. `SyncClient.pause()` must swallow `SyncResetError`.** +`pause()` calls `fetchController.startReset()` which rejects in-flight fetches. Those rejections propagate through `waitUntilFinished()`. Since `pause()` CAUSED the reset, the resulting `SyncResetError` is expected and must be caught (not re-thrown). Only re-throw non-SyncResetError exceptions. Also call `fetchController.finishReset()` in the catch block to prevent the FetchController from getting stuck in resetting state. + +**13. `runningScheduleSyncForOfflineChanges` must be cleared on WebSocket disconnect.** +After the initial `scheduleSyncForOfflineChanges()` completes, the field retains the resolved promise. On WebSocket disconnect/reconnect (without a full client reset), the field must be cleared so the next call triggers a fresh filesystem scan. Add a handler on `onWebSocketStatusChanged` that sets the field to `undefined` when `isConnected` is false. + +**14. The server must not `expect()` / panic on UTF-8 conversion — return a client error.** +In `update_text`, the parent version's content may be binary (if another client uploaded binary via `putBinary`). Using `.expect()` on `str::from_utf8()` panics the server. Use `.context(...).map_err(client_error)?` to return a 4xx error, allowing the client to fall back to `putBinary`. + +**15. The create-merge parent content must be `latest_version.content`, not empty.** +In `create_document.rs`, when a create merges with an existing document, the 3-way merge parent must be the latest version's content (`&latest_version.content`), not an empty vector (`&Vec::new()`). An empty parent causes `reconcile("", existing, new)` to treat all content as additions, producing garbled interleaved text. + +**16. `retryForever` must not retry 4xx HTTP errors.** +4xx errors indicate the request itself is wrong (e.g., invalid diff, missing parent version). Retrying won't help. The `HttpClientError` class (in `errors/http-client-error.ts`) carries the status code. `retryForever` checks for it and re-throws immediately. Only 5xx errors (transient server failures) are retried. + +**17. The broadcast channel's `RecvError::Lagged` must be handled explicitly.** +The `while let Ok(update) = broadcast_receiver.recv().await` pattern silently exits the loop on `Lagged`, disconnecting the client without logging. Handle `Lagged` explicitly with a `warn!` log and `break`. The channel capacity (`broadcast_channel_capacity` in config, default 1024) is separate from `max_clients_per_vault`. + +### E2E Test Debugging Guide + +**How to run E2E tests:** +```bash +cd sync-server && rm -rf databases && ./target/release/sync_server config-e2e.yml & +sleep 3 +cd /volumes/syncthing/Projects/vault-link && scripts/e2e.sh 8 +``` +Always clean the `databases` directory before running. The server must be running separately. + +**Common E2E failure patterns:** + +1. **`SyncResetError` unhandled rejection**: Check that `enqueueSyncOperation` catches all errors and that `pause()` swallows SyncResetError. The test client's `unhandledRejection` handler checks `error.name === "SyncResetError"` — if the error message changes, update the filter in `test-client/src/cli.ts`. + +2. **"Files from agent-X missing in agent-Y"**: This is a consistency assertion. Check the agent's LOCAL file list (now correctly logged per-agent after a logging bug fix). Common causes: + - **Broadcasts lost during shutdown**: Operations completed on one agent but the WebSocket broadcast didn't reach the other before destroy. The 5-second sleep between finish and destroy helps. + - **Path deconfliction**: Both agents have the same DOCUMENT but at different LOCAL paths (e.g., `binary-10.bin` vs `binary-10 (1).bin`). This is a known limitation with concurrent creates at the same path. + - **Failed sync operations not retried**: If `executeSync` throws, the failed file won't be retried until the next WebSocket reconnect (which clears `runningScheduleSyncForOfflineChanges` and triggers a fresh filesystem scan). + +3. **"Document not found in database"**: A concurrent operation removed the document between the last `await` and the `updateDocumentMetadata` call. Add a `containsDocument` guard. + +4. **"Duplicate documentId found in database"**: Two documents have the same `documentId`. Usually caused by `resolveIdempotencyKeys` or `handleMaybeMergingResponse` assigning a documentId without checking if another doc already has it. + +5. **"Invalid diff: attempting to access N characters..."**: The content cache has wrong content for a `parentVersionId`. Common causes: (a) cached local content instead of server content after MergingUpdate; (b) idempotent create returned a stale version but the client cached its current content under that version ID; (c) `parentVersionId` changed between cache lookup and `putText` call due to mutable shared reference. + +6. **"Parent version with id 0 not found"**: A document's `parentVersionId` is 0 (set by `resolveIdempotencyKeys`). The sync path should treat `parentVersionId === 0` as a create, not an update. + +**Test client internals (`test-client/src/agent/mock-agent.ts`):** +- `files`: InMemoryFileSystem map — the ACTUAL filesystem state +- `data`: Map of expected file contents — what the agent CREATED/UPDATED +- `assertFileSystemsAreConsistent`: Compares `files` maps between two agents +- `assertAllContentIsPresentOnce`: Checks no duplicate content across files +- The `finish()` and `destroy()` methods use `withTimeout(TIMEOUT_MS)` — operations that exceed 30s are killed + +**Logging bug (fixed):** In `assertFileSystemsAreConsistent`, the error handler's "Local files" log previously printed `otherAgent.files.keys()` for BOTH agents. Now correctly prints `this.files.keys()` for the current agent. diff --git a/frontend/sync-client/src/errors/http-client-error.ts b/frontend/sync-client/src/errors/http-client-error.ts new file mode 100644 index 00000000..38bb2fd0 --- /dev/null +++ b/frontend/sync-client/src/errors/http-client-error.ts @@ -0,0 +1,8 @@ +export class HttpClientError extends Error { + public readonly status: number; + public constructor(status: number, message: string) { + super(message); + this.name = "HttpClientError"; + this.status = status; + } +} diff --git a/frontend/sync-client/src/persistence/database.ts b/frontend/sync-client/src/persistence/database.ts index 2f69e4bb..91d9473c 100644 --- a/frontend/sync-client/src/persistence/database.ts +++ b/frontend/sync-client/src/persistence/database.ts @@ -64,32 +64,45 @@ export class Database { ) { initialState ??= {}; - this.documents = - initialState.documents?.map(({ relativePath, ...metadata }) => ({ + const validDocuments = (initialState.documents ?? []).filter( + (doc) => + this.validateStoredField(doc, "relativePath", "string") && + this.validateStoredField(doc, "documentId", "string") && + this.validateStoredField(doc, "parentVersionId", "number") + ); + + this.documents = validDocuments.map( + ({ relativePath, ...metadata }) => ({ relativePath, metadata, isDeleted: false, parallelVersion: 0 - })) ?? []; + }) + ); - if (initialState.pendingDocuments) { - for (const pending of initialState.pendingDocuments) { - const existing = - this.getLatestDocumentByRelativePath( - pending.relativePath - ); - this.documents.push({ - relativePath: pending.relativePath, - metadata: undefined, - isDeleted: false, - parallelVersion: - existing !== undefined - ? existing.parallelVersion + 1 - : 0, - originalCreationPath: pending.originalCreationPath, - idempotencyKey: pending.idempotencyKey - }); - } + const validPendingDocuments = ( + initialState.pendingDocuments ?? [] + ).filter( + (doc) => + this.validateStoredField(doc, "relativePath", "string") && + this.validateStoredField(doc, "idempotencyKey", "string") + ); + + for (const pending of validPendingDocuments) { + const existing = this.getLatestDocumentByRelativePath( + pending.relativePath + ); + this.documents.push({ + relativePath: pending.relativePath, + metadata: undefined, + isDeleted: false, + parallelVersion: + existing !== undefined + ? existing.parallelVersion + 1 + : 0, + originalCreationPath: pending.originalCreationPath, + idempotencyKey: pending.idempotencyKey + }); } this.ensureConsistency(); @@ -106,6 +119,25 @@ export class Database { }); } + private validateStoredField( + doc: object, + field: string, + expectedType: "string" | "number" + ): boolean { + const value = (doc as Record)[field]; + if ( + typeof value !== expectedType || + (expectedType === "string" && !value) || + (expectedType === "number" && isNaN(value as number)) + ) { + this.logger.warn( + `Skipping stored document with invalid ${field}: ${JSON.stringify(doc)}` + ); + return false; + } + return true; + } + public get length(): number { return this.documents.length; } @@ -301,7 +333,7 @@ export class Database { ({ relativePath, metadata }) => ({ relativePath, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - ...metadata! // `resolvedDocuments` only returns docs with metadata set + ...metadata! // filtered to only docs with metadata set }) ), pendingDocuments: this.pendingDocuments.map( @@ -316,6 +348,25 @@ export class Database { } private ensureConsistency(): void { + // Check for duplicate documentIds across ALL documents with metadata, + // not just the deduplicated resolvedDocuments view. A duplicate on a + // lower-parallelVersion record would otherwise go undetected. + const allWithMetadata = this.documents + // eslint-disable-next-line no-restricted-syntax -- Type narrowing, not removing a specific item + .filter((d) => d.metadata !== undefined); + const documentIdSet = new Set(); + for (const doc of allWithMetadata) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const docId = doc.metadata!.documentId; + if (documentIdSet.has(docId)) { + throw new Error( + `Duplicate documentId ${docId} found in database` + ); + } + documentIdSet.add(docId); + } + + // Also check the deduplicated view for path-level invariants const idToPath = new Map(); this.resolvedDocuments.forEach(({ relativePath, metadata }) => { diff --git a/frontend/sync-client/src/services/sync-service.ts b/frontend/sync-client/src/services/sync-service.ts index acac8958..458d8efe 100644 --- a/frontend/sync-client/src/services/sync-service.ts +++ b/frontend/sync-client/src/services/sync-service.ts @@ -9,6 +9,7 @@ import type { Settings } from "../persistence/settings"; import type { FetchController } from "./fetch-controller"; import { sleep } from "../utils/sleep"; import { SyncResetError } from "../errors/sync-reset-error"; +import { HttpClientError } from "../errors/http-client-error"; import type { SerializedError } from "./types/SerializedError"; import type { DocumentVersionWithoutContent } from "./types/DocumentVersionWithoutContent"; import type { DocumentUpdateResponse } from "./types/DocumentUpdateResponse"; @@ -65,6 +66,17 @@ export class SyncService { return result; } + private static async throwHttpError( + response: Response, + context: string + ): Promise { + const message = `${context}: ${await SyncService.errorFromResponse(response)}`; + if (response.status >= 400 && response.status < 500) { + throw new HttpClientError(response.status, message); + } + throw new Error(message); + } + public async create({ relativePath, contentBytes, @@ -98,10 +110,9 @@ export class SyncService { }); if (!response.ok) { - throw new Error( - `Failed to create document: ${await SyncService.errorFromResponse( - response - )}` + await SyncService.throwHttpError( + response, + "Failed to create document" ); } @@ -146,10 +157,9 @@ export class SyncService { ); if (!response.ok) { - throw new Error( - `Failed to update document: ${await SyncService.errorFromResponse( - response - )}` + await SyncService.throwHttpError( + response, + "Failed to update document" ); } @@ -157,8 +167,7 @@ export class SyncService { (await response.json()) as DocumentUpdateResponse; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion this.logger.debug( - `Updated document ${JSON.stringify(result)} with id ${ - result.documentId + `Updated document ${JSON.stringify(result)} with id ${result.documentId }}` ); @@ -199,10 +208,9 @@ export class SyncService { ); if (!response.ok) { - throw new Error( - `Failed to update document: ${await SyncService.errorFromResponse( - response - )}` + await SyncService.throwHttpError( + response, + "Failed to update document" ); } @@ -210,8 +218,7 @@ export class SyncService { (await response.json()) as DocumentUpdateResponse; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion this.logger.debug( - `Updated document ${JSON.stringify(result)} with id ${ - result.documentId + `Updated document ${JSON.stringify(result)} with id ${result.documentId }}` ); @@ -245,10 +252,9 @@ export class SyncService { ); if (!response.ok) { - throw new Error( - `Failed to delete document: ${await SyncService.errorFromResponse( - response - )}` + await SyncService.throwHttpError( + response, + "Failed to delete document" ); } @@ -279,10 +285,9 @@ export class SyncService { ); if (!response.ok) { - throw new Error( - `Failed to get document: ${await SyncService.errorFromResponse( - response - )}` + await SyncService.throwHttpError( + response, + "Failed to get document" ); } @@ -317,10 +322,9 @@ export class SyncService { ); if (!response.ok) { - throw new Error( - `Failed to get document: ${await SyncService.errorFromResponse( - response - )}` + await SyncService.throwHttpError( + response, + "Failed to get document" ); } @@ -338,7 +342,7 @@ export class SyncService { return this.retryForever(async () => { this.logger.debug( "Getting all documents" + - (since != null ? ` since ${since}` : "") + (since != null ? ` since ${since}` : "") ); const url = new URL(this.getUrl("/documents")); @@ -350,10 +354,9 @@ export class SyncService { }); if (!response.ok) { - throw new Error( - `Failed to get documents: ${await SyncService.errorFromResponse( - response - )}` + await SyncService.throwHttpError( + response, + "Failed to get documents" ); } @@ -464,6 +467,12 @@ export class SyncService { throw e; } + // Don't retry 4xx client errors — the request itself is wrong + // and retrying won't help + if (e instanceof HttpClientError) { + throw e; + } + const retryInterval = this.settings.getSettings().networkRetryIntervalMs; this.logger.error( diff --git a/frontend/sync-client/src/services/websocket-manager.ts b/frontend/sync-client/src/services/websocket-manager.ts index e99b8662..4f06d0b9 100644 --- a/frontend/sync-client/src/services/websocket-manager.ts +++ b/frontend/sync-client/src/services/websocket-manager.ts @@ -109,6 +109,10 @@ export class WebSocketManager { await awaitAll(this.outstandingPromises); } + public hasOutstandingWork(): boolean { + return this.outstandingPromises.length > 0; + } + public sendHandshakeMessage( message: WebSocketClientMessage & { type: "handshake" } ): void { diff --git a/frontend/sync-client/src/sync-client.ts b/frontend/sync-client/src/sync-client.ts index db6ff902..3edd9a70 100644 --- a/frontend/sync-client/src/sync-client.ts +++ b/frontend/sync-client/src/sync-client.ts @@ -18,6 +18,7 @@ import type { NetworkConnectionStatus } from "./types/network-connection-status" import { DocumentSyncStatus } from "./types/document-sync-status"; import { WebSocketManager } from "./services/websocket-manager"; import { createClientId } from "./utils/create-client-id"; +import { SyncResetError } from "./errors/sync-reset-error"; import { CursorTracker } from "./sync-operations/cursor-tracker"; import type { CursorSpan } from "./services/types/CursorSpan"; import type { MaybeOutdatedClientCursors } from "./types/maybe-outdated-client-cursors"; @@ -424,8 +425,21 @@ export class SyncClient { public async waitUntilFinished(): Promise { this.checkIfDestroyed("waitUntilIdle"); - await this.syncer.waitUntilFinished(); - await this.webSocketManager.waitUntilFinished(); + // Loop until both sync queue and WebSocket handlers are + // simultaneously idle. WS handlers can enqueue new sync + // operations, and completed sync operations can trigger + // broadcasts that create new WS handler promises. + let iteration = 0; + while (true) { + iteration++; + this.logger.info(`waitUntilFinished: iteration ${iteration}`); + await this.webSocketManager.waitUntilFinished(); + await this.syncer.waitUntilFinished(); + // Check if anything new arrived while we were waiting + if (!this.webSocketManager.hasOutstandingWork()) { + break; + } + } await this.database.save(); // flush all changes to disk } @@ -476,10 +490,40 @@ export class SyncClient { this.hasFinishedOfflineSync = true; } + /** + * Hard pause: aborts all in-flight HTTP operations via FetchController reset. + * Used when the SyncClient is being destroyed or fully reset (connection + * settings changed). This is the nuclear option — every outstanding fetch + * is rejected with SyncResetError so the queue drains immediately. + */ private async pause(): Promise { this.hasFinishedOfflineSync = false; this.fetchController.startReset(); + try { + await this.webSocketManager.stop(); + await this.waitUntilFinished(); + } catch (e) { + // SyncResetError is expected here — we just called startReset() + // which rejects in-flight fetches. Only re-throw non-reset errors + // (after ensuring the FetchController is left in a usable state). + this.fetchController.finishReset(); + if (!(e instanceof SyncResetError)) { + throw e; + } + } + } + + /** + * Soft pause: stops the WebSocket and clears the sync queue, but lets + * in-flight HTTP operations complete naturally. Used when the user toggles + * sync off — we don't want to abort creates/updates that are mid-flight + * because they'd just be re-queued on re-enable, potentially leading to + * an infinite retry loop with flaky connections. + */ + private async softPause(): Promise { + this.hasFinishedOfflineSync = false; await this.webSocketManager.stop(); + this.syncer.reset(); await this.waitUntilFinished(); } @@ -509,7 +553,7 @@ export class SyncClient { if (newSettings.isSyncEnabled) { await this.startSyncing(); } else { - await this.pause(); + await this.softPause(); } } diff --git a/frontend/sync-client/src/sync-operations/syncer.ts b/frontend/sync-client/src/sync-operations/syncer.ts index 7624d0a8..95f0ca33 100644 --- a/frontend/sync-client/src/sync-operations/syncer.ts +++ b/frontend/sync-client/src/sync-operations/syncer.ts @@ -71,6 +71,10 @@ export class Syncer { if (isConnected) { // The JS WebSocket API doesn't support setting headers, so we have to send the token as a message this.sendHandshakeMessage(); + } else { + // Clear so that the next reconnect re-runs scheduleSyncForOfflineChanges + // instead of returning the stale resolved promise. + this.runningScheduleSyncForOfflineChanges = undefined; } }); this.webSocketManager.onRemoteVaultUpdateReceived.add( @@ -267,7 +271,7 @@ export class Syncer { public async waitUntilFinished(): Promise { await this.runningScheduleSyncForOfflineChanges; - await this.syncQueue.onIdle(); // Wait for queue to be empty and running tasks to finish + await this.syncQueue.onIdle(); } public async syncRemotelyUpdatedFile( @@ -330,19 +334,19 @@ export class Syncer { remoteVersion.documentId ); await this.enqueueSyncOperation( - async () => - this.unrestrictedSyncer.unrestrictedSyncRemotelyUpdatedFile( + async () => { + await this.unrestrictedSyncer.unrestrictedSyncRemotelyUpdatedFile( remoteVersion, document - ), + ); + this.database.addSeenUpdateId(remoteVersion.vaultUpdateId); + }, [ document?.relativePath, remoteVersion.relativePath, remoteVersion.documentId ] ); - - this.database.addSeenUpdateId(remoteVersion.vaultUpdateId); } private async internalScheduleSyncForOfflineChanges(): Promise { @@ -371,9 +375,12 @@ export class Syncer { } const instructions: (Instruction | undefined)[] = await awaitAll( allLocalFiles.map(async (relativePath) => { - if ( + const existingMetadata = this.database.getLatestDocumentByRelativePath(relativePath) - ?.metadata !== undefined + ?.metadata; + if ( + existingMetadata !== undefined && + existingMetadata.parentVersionId > 0 ) { this.logger.debug( `Document ${relativePath} might have been updated locally, scheduling sync to validate and update it` @@ -382,12 +389,27 @@ export class Syncer { return { type: "update", relativePath } as Instruction; } - // Perhaps the file has been moved; let's check by looking at the deleted files - const contentHash = await this.syncQueue.add(async () => { + // Perhaps the file has been moved; let's check by looking at the deleted files. + // Skip reading oversized files into memory for hash computation — + // they can't participate in move detection and will be scheduled as creates. + const hashResult = await this.syncQueue.add(async () => { try { + const sizeInBytes = + await this.operations.getFileSize(relativePath); + const sizeInMB = Math.ceil( + sizeInBytes / 1024 / 1024 + ); + const { maxFileSizeMB } = + this.settings.getSettings(); + if (sizeInMB > maxFileSizeMB) { + // File exceeds size limit — skip hash-based move + // detection and schedule as a create instead + return { skippedOversized: true } as const; + } + const contentBytes = await this.operations.read(relativePath); // this can throw FileNotFoundError - return hash(contentBytes); + return { hash: hash(contentBytes) } as const; } catch (e) { if ( e instanceof Error && @@ -399,15 +421,21 @@ export class Syncer { } }); - if (contentHash == undefined) { + if (hashResult == undefined) { // The file was deleted before we had a chance to read it, no need to sync it here return; } - const originalFile = findMatchingFile( - contentHash, - locallyPossiblyDeletedFiles - ); + const contentHash = + "hash" in hashResult ? hashResult.hash : undefined; + + const originalFile = + contentHash != undefined + ? findMatchingFile( + contentHash, + locallyPossiblyDeletedFiles + ) + : undefined; if (originalFile !== undefined) { // `originalFile` hasn't been deleted but it got moved instead /* eslint-disable no-restricted-syntax -- Comparing by property, not direct equality */ @@ -505,12 +533,25 @@ export class Syncer { // // The result type needs special handling since syncQueue.add() can // return undefined when the queue is paused/cleared. - const result = await this.syncQueue.add(async () => - this.updatedDocumentsByPathAndKeysLocks.withLock( - filteredKeys, - operation - ) - ); + const result = await this.syncQueue.add(async () => { + try { + return await this.updatedDocumentsByPathAndKeysLocks.withLock( + filteredKeys, + operation + ); + } catch (e) { + // Catch all errors to prevent unhandled promise rejections. + // SyncResetError: lock waiter rejected during reset (expected). + // Other errors: logged by executeSync's history entry, will + // be retried on the next scheduleSyncForOfflineChanges cycle. + if (!(e instanceof SyncResetError)) { + this.logger.info( + `Sync operation failed, will retry on next cycle: ${e}` + ); + } + return undefined; + } + }); return result as T; } } diff --git a/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts b/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts index a41bf8c2..59b42978 100644 --- a/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts +++ b/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts @@ -83,6 +83,15 @@ export class UnrestrictedSyncer { doc.idempotencyKey !== undefined && resolved.has(doc.idempotencyKey) ) { + // Check if document was removed by a concurrent operation + // (e.g., a delete) between the snapshot and now + if (!this.database.containsDocument(doc)) { + this.logger.info( + `Pending doc at ${doc.relativePath} was removed during key resolution, skipping` + ); + continue; + } + const documentId = resolved.get(doc.idempotencyKey)!; // eslint-disable-line @typescript-eslint/no-non-null-assertion // Skip if this documentId is already assigned to another document @@ -160,7 +169,14 @@ export class UnrestrictedSyncer { let response: DocumentVersion | DocumentUpdateResponse | undefined = undefined; - if (document.metadata === undefined) { + if ( + document.metadata === undefined || + document.metadata.parentVersionId === 0 + ) { + // parentVersionId === 0 occurs when resolveIdempotencyKeys + // assigned a documentId but hasn't synced yet. Treat as a + // create — the server will recognise the idempotency key + // and return the existing document. response = await this.syncService.create({ relativePath: originalRelativePath, contentBytes, @@ -188,16 +204,22 @@ export class UnrestrictedSyncer { (await this.serverConfig.getConfig()) .mergeableFileExtensions ); + // Snapshot parentVersionId atomically with the cache + // lookup. document.metadata is a mutable shared + // reference — a concurrent operation could update + // parentVersionId between the cache lookup and the + // putText call, causing a diff/version mismatch. + const parentVersionIdForUpdate = + document.metadata.parentVersionId; const cachedVersion = this.contentCache.get( - document.metadata.parentVersionId + parentVersionIdForUpdate ); response = isText && cachedVersion !== undefined ? await this.syncService.putText({ documentId: document.metadata.documentId, - parentVersionId: - document.metadata.parentVersionId, + parentVersionId: parentVersionIdForUpdate, relativePath: document.relativePath, content: diff( new TextDecoder().decode(cachedVersion), @@ -206,8 +228,7 @@ export class UnrestrictedSyncer { }) : await this.syncService.putBinary({ documentId: document.metadata.documentId, - parentVersionId: - document.metadata.parentVersionId, + parentVersionId: parentVersionIdForUpdate, relativePath: document.relativePath, contentBytes }); @@ -522,6 +543,31 @@ export class UnrestrictedSyncer { this.logger.info( `Document ${document.relativePath} has been deleted before we could finish updating it` ); + // Assign metadata so the pending delete can inform the server + if (document.metadata === undefined) { + const existingWithSameId = + this.database.getDocumentByDocumentId( + response.documentId + ); + if ( + existingWithSameId !== undefined && + existingWithSameId !== document + ) { + // Another doc already has this documentId — the server + // knows about it. Just remove this stale pending doc. + this.database.removeDocument(document); + } else { + this.database.updateDocumentMetadata( + { + documentId: response.documentId, + parentVersionId: response.vaultUpdateId, + hash: contentHash, + remoteRelativePath: response.relativePath + }, + document + ); + } + } this.database.addSeenUpdateId(response.vaultUpdateId); return; } @@ -615,18 +661,9 @@ export class UnrestrictedSyncer { if (!("type" in response) || response.type === "MergingUpdate") { const responseBytes = base64ToBytes(response.contentBase64); - contentHash = hash(responseBytes); - - this.database.updateDocumentMetadata( - { - documentId: response.documentId, - parentVersionId: response.vaultUpdateId, - hash: contentHash, - remoteRelativePath: response.relativePath - }, - document - ); + // Write file BEFORE updating metadata so that if the write fails, + // metadata doesn't point to a version whose content was never written. await this.operations.write( actualPath, originalContentBytes, @@ -642,27 +679,90 @@ export class UnrestrictedSyncer { ); } + // Re-read and re-hash after write because the 3-way merge in + // operations.write() may produce content different from responseBytes. + const actualContent = await this.operations.read(actualPath); + const actualHash = hash(actualContent); + // The document may have been removed by a concurrent operation + // (e.g., a delete) during the awaited file write/read above. + // The file is safely on disk; recovery will re-detect it. + if (!this.database.containsDocument(document)) { + this.logger.info( + `Document ${document.relativePath} was removed during sync, skipping metadata update` + ); + return; + } + + this.database.updateDocumentMetadata( + { + documentId: response.documentId, + parentVersionId: response.vaultUpdateId, + hash: actualHash, + remoteRelativePath: response.relativePath + }, + document + ); + + // Cache the SERVER's content (responseBytes), not the local + // content (actualContent). The cache is used to compute diffs + // for subsequent updates: diff(cached, newFileContent). The + // server applies this diff against its content at + // parentVersionId, which is responseBytes. Using actualContent + // would produce diffs that don't match the server's state. await this.updateCache( response.vaultUpdateId, responseBytes, actualPath ); } else { - this.database.updateDocumentMetadata( - { - documentId: response.documentId, - parentVersionId: response.vaultUpdateId, - hash: contentHash, - remoteRelativePath: response.relativePath - }, - document - ); - await this.updateCache( - response.vaultUpdateId, - originalContentBytes, - actualPath - ); + // FastForwardUpdate — the server accepted our content as-is, + // UNLESS this was an idempotent create return (the server + // returned the original version, whose content may differ from + // what we sent). Detect this by comparing contentSize. + const serverContentMatchesLocal = + !("contentSize" in response) || + response.contentSize === originalContentBytes.length; + + if (serverContentMatchesLocal) { + this.database.updateDocumentMetadata( + { + documentId: response.documentId, + parentVersionId: response.vaultUpdateId, + hash: contentHash, + remoteRelativePath: response.relativePath + }, + document + ); + await this.updateCache( + response.vaultUpdateId, + originalContentBytes, + actualPath + ); + } else { + // The server returned a stale idempotent version. Fetch + // the actual content so the cache stays consistent, then + // the hash mismatch will trigger a follow-up update sync. + const serverContent = + await this.syncService.getDocumentVersionContent({ + documentId: response.documentId, + vaultUpdateId: response.vaultUpdateId + }); + this.database.updateDocumentMetadata( + { + documentId: response.documentId, + parentVersionId: response.vaultUpdateId, + hash: hash(serverContent), + remoteRelativePath: response.relativePath + }, + document + ); + await this.updateCache( + response.vaultUpdateId, + serverContent, + actualPath + ); + } } this.database.addSeenUpdateId(response.vaultUpdateId); @@ -672,9 +772,10 @@ export class UnrestrictedSyncer { sizeInBytes: number, relativePath: RelativePath ): CommonHistoryEntry | undefined { - const sizeInMB = Math.round(sizeInBytes / 1024 / 1024); const { maxFileSizeMB } = this.settings.getSettings(); - if (sizeInMB > maxFileSizeMB) { + const maxFileSizeBytes = maxFileSizeMB * 1024 * 1024; + if (sizeInBytes > maxFileSizeBytes) { + const sizeInMB = (sizeInBytes / 1024 / 1024).toFixed(1); return { status: SyncStatus.SKIPPED, details: { diff --git a/frontend/sync-client/src/utils/data-structures/locks.ts b/frontend/sync-client/src/utils/data-structures/locks.ts index 4e512869..2945ff5e 100644 --- a/frontend/sync-client/src/utils/data-structures/locks.ts +++ b/frontend/sync-client/src/utils/data-structures/locks.ts @@ -85,7 +85,11 @@ export class Locks { reject(new SyncResetError()); } } - this.locked.clear(); + + // Do NOT clear this.locked — let running operations release their own + // locks via the finally block in withLock. Clearing this.locked would + // allow new operations to acquire locks on keys still held by in-flight + // operations, breaking mutual exclusion. this.waiters.clear(); } diff --git a/frontend/test-client/src/agent/mock-agent.ts b/frontend/test-client/src/agent/mock-agent.ts index a089bae3..f11e6b34 100644 --- a/frontend/test-client/src/agent/mock-agent.ts +++ b/frontend/test-client/src/agent/mock-agent.ts @@ -13,6 +13,7 @@ const TIMEOUT_MS = 10 * 60 * 1000; export class MockAgent extends MockClient { private readonly writtenContents: string[] = []; + private readonly writtenBinaryContents: string[] = []; private readonly pendingActions: Promise[] = []; // The renamed file finding algorithm isn't too smart so we can't both update and rename the same file @@ -51,7 +52,7 @@ export class MockAgent extends MockClient { const formatted = `[${this.name} ${state}] ${logLine.timestamp.toISOString()} ${logLine.level} ${logLine.message}`; // HACK: we have to ensure the file has been synced if we want to change it offline without data loss - const historyEntry = /.*History entry: (.*.md).*/.exec( + const historyEntry = /.*History entry: (.*\.(?:md|bin)).*/.exec( logLine.message ); @@ -115,9 +116,11 @@ export class MockAgent extends MockClient { ); } + public async act(): Promise { const options: (() => Promise)[] = [ - this.createFileAction.bind(this) + this.createFileAction.bind(this), + this.createBinaryFileAction.bind(this) ]; if ( @@ -132,7 +135,8 @@ export class MockAgent extends MockClient { options.push( this.renameFileAction.bind(this), - this.updateFileAction.bind(this) + this.updateFileAction.bind(this), + this.updateBinaryFileAction.bind(this) ); if (this.doDeletes) { @@ -226,26 +230,26 @@ export class MockAgent extends MockClient { "Local data: " + JSON.stringify(this.data, null, 2) ); this.client.logger.info( - "Local files: " + Array.from(otherAgent.files.keys()).join(", ") + "Local files: " + Array.from(this.files.keys()).join(", ") ); otherAgent.client.logger.info( - "Local data: " + JSON.stringify(otherAgent.data, null, 2) + "Other agent's data: " + JSON.stringify(otherAgent.data, null, 2) ); otherAgent.client.logger.info( - "Local files: " + Array.from(otherAgent.files.keys()).join(", ") + "Other agent's files: " + Array.from(otherAgent.files.keys()).join(", ") ); throw e; } } + // For slow file events, still check for duplicates (skip existence check). + // Duplication is always a bug regardless of timing. public assertAllContentIsPresentOnce(): void { if (this.useSlowFileEvents) { this.client.logger.info( - // We can't ensure that we have seen every single update - `Skipping content check for ${this.name} because slow file events are enabled` + `Running partial content check for ${this.name} (slow file events: skipping existence check)` ); - return; } for (const content of this.writtenContents) { @@ -260,14 +264,13 @@ export class MockAgent extends MockClient { `[${this.name}] Content ${content} found in multiple files: ${found.join(", ")}` ); - if (!this.doDeletes) { + if (!this.useSlowFileEvents && !this.doDeletes) { assert( found.length >= 1, `[${this.name}] Content ${content} not found in any files` ); } - if (found.length === 1) { const [file] = found; const fileContent = new TextDecoder().decode( @@ -281,6 +284,31 @@ export class MockAgent extends MockClient { } } + // Check binary content isn't duplicated across files. + // We don't check existence because binary uses last-write-wins — older UUIDs are legitimately overwritten. + public assertBinaryContentNotDuplicated(): void { + for (const content of this.writtenBinaryContents) { + const found = Array.from(this.files.keys()).filter((key) => { + return new TextDecoder() + .decode(this.files.get(key)) + .includes(content); + }); + + assert( + found.length <= 1, + `[${this.name}] Binary content ${content} found in multiple files: ${found.join(", ")}` + ); + } + } + + public getFileList(): string[] { + return Array.from(this.files.keys()); + } + + public getFileContent(path: string): Uint8Array | undefined { + return this.files.get(path); + } + private async resetClient(): Promise { this.client.logger.info(`Resetting client ${this.name}`); await this.client.destroy(); @@ -308,6 +336,28 @@ export class MockAgent extends MockClient { }); } + // Binary file creation — exercises the putBinary server path (not in mergeable_file_extensions) + private async createBinaryFileAction(): Promise { + const file = this.getBinaryFileName(); + + if ( + (!this.lastSyncEnabledState && + this.doNotTouchWhileOffline.includes(file)) || + (await this.exists(file)) + ) { + return; + } + + const content = this.getBinaryContent(); + this.client.logger.info( + `Decided to create binary file ${file}` + ); + + return this.create(file, content, { + ignoreSlowFileEvents: true + }); + } + private async disableSyncAction(): Promise { this.client.logger.info(`Decided to disable sync`); this.lastSyncEnabledState = false; @@ -357,7 +407,9 @@ export class MockAgent extends MockClient { } private async updateFileAction(): Promise { - const files = await this.listFilesRecursively(); + const files = (await this.listFilesRecursively()).filter((f) => + f.endsWith(".md") + ); if (files.length === 0) { return; } @@ -391,6 +443,40 @@ export class MockAgent extends MockClient { ); } + // Binary file update — complete replacement (last-write-wins) + private async updateBinaryFileAction(): Promise { + const files = (await this.listFilesRecursively()).filter((f) => + f.endsWith(".bin") + ); + if (files.length === 0) { + return; + } + + const file = choose(files); + + if ( + !this.lastSyncEnabledState && + this.doNotTouchWhileOffline.includes(file) + ) { + return; + } + + const content = this.getBinaryContent(); + this.client.logger.info( + `Decided to update binary file ${file}` + ); + this.doNotTouchWhileOffline.push(file); + this.files.set(file, content); + + this.executeFileOperation( + async () => + this.client.syncLocallyUpdatedFile({ + relativePath: file + }), + true + ); + } + private async deleteFileAction(): Promise { const files = await this.listFilesRecursively(); if (files.length === 0) { @@ -408,8 +494,19 @@ export class MockAgent extends MockClient { return uuid; } + private getBinaryContent(): Uint8Array { + const uuid = uuidv4(); + this.writtenBinaryContents.push(uuid); + return new TextEncoder().encode(`BINARY:${uuid}`); + } + private getFileName(): string { // Simulate name collisions between the clients return `file-${Math.floor(Math.random() * 64)}.md`; } + + private getBinaryFileName(): string { + // Smaller range to increase collision frequency for last-write-wins testing + return `binary-${Math.floor(Math.random() * 16)}.bin`; + } } diff --git a/frontend/test-client/src/agent/mock-client.ts b/frontend/test-client/src/agent/mock-client.ts index 17f17e80..283a36d3 100644 --- a/frontend/test-client/src/agent/mock-client.ts +++ b/frontend/test-client/src/agent/mock-client.ts @@ -177,7 +177,7 @@ export class MockClient extends debugging.InMemoryFileSystem { ); } - private executeFileOperation( + protected executeFileOperation( callback: () => unknown, ignoreSlowFileEvents = false ): void { diff --git a/frontend/test-client/src/cli.ts b/frontend/test-client/src/cli.ts index 4b97fbef..2b6dd774 100644 --- a/frontend/test-client/src/cli.ts +++ b/frontend/test-client/src/cli.ts @@ -119,7 +119,7 @@ async function runTest({ logger.info( `Checking consistency between ${client.name} and ${clients[i + 1].name}` ); - client.assertFileSystemsAreConsistent(clients[i]); + client.assertFileSystemsAreConsistent(clients[i + 1]); logger.info(`Consistency check for ${client.name} passed`); }); diff --git a/sync-server/src/server/create_document.rs b/sync-server/src/server/create_document.rs index 90e08b30..e112dc36 100644 --- a/sync-server/src/server/create_document.rs +++ b/sync-server/src/server/create_document.rs @@ -55,7 +55,9 @@ pub async fn create_document( .await .map_err(server_error)?; if let Some(existing) = existing { - info!("Found existing document with idempotency key `{idempotency_key}`, returning existing document"); + info!( + "Found existing document with idempotency key `{idempotency_key}`, returning existing document" + ); transaction .rollback() .await @@ -78,6 +80,7 @@ pub async fn create_document( ) .await .map_err(server_error)?; + if let Some(latest_version) = latest_version { info!( "Document already exists at new location: `{sanitized_relative_path}` when trying to create it in vault `{vault_id}`, merging into existing document" @@ -85,7 +88,7 @@ pub async fn create_document( return merge_with_stored_version( &sanitized_relative_path, - &Vec::new(), + &latest_version.content.clone(), latest_version, vault_id, user, diff --git a/sync-server/src/server/update_document.rs b/sync-server/src/server/update_document.rs index a07aec54..d97e394e 100644 --- a/sync-server/src/server/update_document.rs +++ b/sync-server/src/server/update_document.rs @@ -79,9 +79,12 @@ pub async fn update_text( ) -> Result, SyncServerError> { let parent_document = get_parent_document(&state, &vault_id, request.parent_version_id).await?; + let parent_content = str::from_utf8(&parent_document.content) + .context("Parent document content is not valid UTF-8") + .map_err(client_error)?; + let edited_text = EditedText::from_diff( - str::from_utf8(&parent_document.content) - .expect("parent must be valid UTF-8 because it's a text document"), + parent_content, request.content, &*BuiltinTokenizer::Word, ) @@ -232,15 +235,20 @@ pub async fn merge_with_stored_version( "Merging changes for document `{}` in vault `{vault_id}`", latest_version.document_id ); + let parent_str = str::from_utf8(parent_document_content) + .context("Parent document content is not valid UTF-8") + .map_err(server_error)?; + let latest_str = str::from_utf8(&latest_version.content) + .context("Latest version content is not valid UTF-8") + .map_err(server_error)?; + let content_str = str::from_utf8(&content) + .context("New content is not valid UTF-8") + .map_err(server_error)?; + reconcile( - str::from_utf8(parent_document_content) - .expect("parent must be valid UTF-8 because it's not binary"), - &str::from_utf8(&latest_version.content) - .expect("latest_version must be valid UTF-8 because it's not binary") - .into(), - &str::from_utf8(&content) - .expect("content must be valid UTF-8 because it's not binary") - .into(), + parent_str, + &latest_str.into(), + &content_str.into(), &*BuiltinTokenizer::Word, ) .apply() diff --git a/sync-server/src/server/websocket.rs b/sync-server/src/server/websocket.rs index bb10b49f..afb3b710 100644 --- a/sync-server/src/server/websocket.rs +++ b/sync-server/src/server/websocket.rs @@ -7,7 +7,7 @@ use axum::{ response::Response, }; use futures::stream::StreamExt; -use log::{debug, info}; +use log::{debug, info, warn}; use serde::Deserialize; use crate::{ @@ -101,24 +101,38 @@ async fn websocket( let device_id = authed_handshake.handshake.device_id.clone(); let mut send_task = tokio::spawn(async move { - while let Ok(update) = broadcast_receiver.recv().await { - if Some(&device_id) == update.origin_device_id.as_ref() { - continue; - } + loop { + match broadcast_receiver.recv().await { + Ok(update) => { + if Some(&device_id) == update.origin_device_id.as_ref() { + continue; + } - let message = match update.message { - WebSocketServerMessage::CursorPositions(CursorPositionFromServer { clients }) => { - WebSocketServerMessage::CursorPositions(CursorPositionFromServer { - clients: clients - .into_iter() - .filter(|client| client.device_id != device_id) - .collect(), - }) + let message = match update.message { + WebSocketServerMessage::CursorPositions( + CursorPositionFromServer { clients }, + ) => WebSocketServerMessage::CursorPositions(CursorPositionFromServer { + clients: clients + .into_iter() + .filter(|client| client.device_id != device_id) + .collect(), + }), + WebSocketServerMessage::VaultUpdate(_) => update.message, + }; + + send_update_over_websocket(&message, &mut sender).await?; } - WebSocketServerMessage::VaultUpdate(_) => update.message, - }; - - send_update_over_websocket(&message, &mut sender).await?; + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + warn!( + "WebSocket receiver for device {device_id} lagged by {n} messages, \ + disconnecting for re-sync" + ); + break; + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + break; + } + } } Ok::<(), SyncServerError>(())