diff --git a/CLAUDE.md b/CLAUDE.md index 323681d9..eb33cc1d 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -206,3 +206,130 @@ scripts/clean-up.sh # Clean up after tests - `.editorconfig` at project root defines baseline formatting rules - `rustfmt.toml` and Prettier config explicitly mirror these settings - Both formatters enforce: 4-space indent (2 for YAML/MD), LF endings, final newline, trim trailing whitespace + +## Sync Logic Deep Dive + +### Document Lifecycle + +Documents go through these states on the client: + +1. **Pending create**: `metadata === undefined`, `idempotencyKey` set. File exists locally but hasn't been confirmed by the server yet. +2. **Synced**: `metadata` has `documentId`, `parentVersionId`, `hash`. The server knows about this document. +3. **Deleted**: `isDeleted === true`. Locally deleted, may or may not be synced to server yet. + +Pending creates are persisted to the local DB (via `StoredPendingDocument`) so they survive app crashes. + +### Create Flow and Idempotency + +The create flow is designed to handle interrupted creates (lost responses, app crashes): + +1. Client generates `idempotencyKey` (UUID) and persists it locally before sending the request +2. Client sends HTTP POST with the key and file content to the server +3. Server checks if the `idempotency_key` already exists — if so, returns existing document (idempotent) +4. Server stores the key in the `documents` table alongside the document version +5. When a create results in a merge (document already exists at that path), both the original key and the new key are preserved — they're on different version rows of the same document + +On reconnect, the client calls `POST /documents/resolve-keys` with all pending idempotency keys. The server maps each key to a `documentId`. The client assigns these documentIds to pending documents so they're recognized during subsequent remote fetch, preventing duplicates. + +If key resolution fails (e.g., during a SyncReset), the pending creates retry normally with the same key — the server deduplicates. + +### Server-Side Smart Create + +When a client sends a create request for a path where a document already exists: + +1. Server calls `merge_with_stored_version` instead of creating a new document +2. Content is 3-way merged using `reconcile-text` (for text files) or last-write-wins (for binary) +3. The response uses the EXISTING document's `documentId` — the client adopts it +4. The `idempotency_key` from the create request is stored on the new merged version + +### Concurrency Model (Client) + +The client uses two layers of concurrency control: + +1. **PQueue (`syncQueue`)**: Limits concurrent sync operations (configurable via `syncConcurrency`) +2. **Locks (`updatedDocumentsByPathAndKeysLocks`)**: Per-document locks keyed by `relativePath` and `documentId` + +**Critical ordering**: Locks are acquired INSIDE the queue, not outside. Acquiring locks while waiting for queue slots causes deadlocks (two operations hold locks on different keys while both waiting for queue capacity). + +``` +syncQueue.add(async () => + locks.withLock(keys, operation) // lock acquired only when queue slot is available +) +``` + +### Sync Reset and Recovery + +A `SyncResetError` is thrown when the WebSocket disconnects or sync is toggled off. This: +- Clears the sync queue +- Rejects all pending lock waiters +- On reconnect, `scheduleSyncForOfflineChanges()` runs to reconcile local state with server + +**Important**: `SyncResetError` during `syncRemotelyUpdatedFile` must be caught and logged as INFO, not ERROR. The test client exits on ERROR-level logs (except retries), so logging SyncResetError as ERROR during expected resets causes false test failures. + +### The Offline Sync Algorithm (`scheduleSyncForOfflineChanges`) + +Runs on reconnect to detect what changed while offline: + +1. **Resolve idempotency keys first**: Call `resolveIdempotencyKeys()` to map pending creates to server-side documentIds before scanning files +2. List all local files +3. For each file with metadata: schedule as update (hash comparison will skip unchanged) +4. For each file without metadata: try to match against "deleted" DB records by content hash (detects moves). If no match, schedule as create. +5. For DB records whose files don't exist locally: schedule as delete +6. Deletes and updates run first, THEN creates — to avoid the server merging creates with about-to-be-deleted docs + +### Remote Update Processing + +When the server broadcasts updates via WebSocket: + +1. `scheduleSyncForOfflineChanges()` runs first (ensures local changes are queued) +2. For each remote document update: + - If client knows the `documentId`: treat as update to existing doc + - If client doesn't know the `documentId`: it's a new remote document — create locally +3. Before creating a new local file for an unknown remote doc, check if a pending local create exists at the same `originalCreationPath`. If so, skip (the pending retry with idempotency key will handle it). + +### Known Concurrency Pitfalls + +1. **Interrupted create + rename + modify**: A create request succeeds on the server but the response is lost. The file is renamed and modified locally. On reconnect, the idempotency key resolution maps the pending doc to the server's documentId, preventing a duplicate. + +2. **Two clients create at same path**: Both send creates with different idempotency keys. Server merges them under one `documentId`. Each key is stored on its respective version row. Both clients can resolve their keys to the same document. + +3. **Lock ordering**: Multi-key locks are sorted alphabetically to prevent deadlocks. Lock acquisition is sequential (not concurrent) even for multiple keys. + +4. **`resolvedDocuments` vs `pendingDocuments`**: `resolvedDocuments` only includes docs with metadata (filters by `metadata !== undefined`). `pendingDocuments` returns docs with `metadata === undefined && !isDeleted`. Never confuse the two — scanning `resolvedDocuments` for pending docs returns nothing. + +5. **`saveInTheBackground` triggers `ensureConsistency`**: The consistency check calls `resolvedDocuments` which can throw if there are duplicate paths with the same `parallelVersion`. Avoid calling `saveInTheBackground` during operations that temporarily create inconsistent state — use `save()` directly instead. This is why `createNewPendingDocument` calls `save()` directly. + +6. **Pending doc `parallelVersion` on load**: When loading pending documents from storage, compute `parallelVersion` based on existing docs at the same path (use `getLatestDocumentByRelativePath` to find the current max). Setting all to 0 causes collisions if a resolved doc at the same path also has `parallelVersion: 0`. + +7. **Key resolution with stale documentIds**: When `resolveIdempotencyKeys` returns a documentId, check `getDocumentByDocumentId` first. If another document already has that ID (assigned through normal sync), remove the stale pending doc instead of creating a duplicate. + +8. **`resolveIdempotencyKeys` must not use `retryForever`**: The HTTP call to `/documents/resolve-keys` is an optimization. If it fails (e.g., SyncReset aborts the fetch), return an empty map and let the pending creates retry normally with their keys. Using `retryForever` can cause deadlocks — the sync pipeline stalls waiting for the retry while the WebSocket is disconnected. + +### E2E Test Configuration + +The test client (`frontend/test-client/src/cli.ts`) runs 5 iterations of 9 test configurations per process: +- 2 agents, concurrency 16 and 1, with/without deletes, with/without resets, with/without slow file events +- Tests assert: file system consistency between agents AND no duplicate content across files +- Uses `jitterScaleInSeconds: 0.75` to simulate network latency + +**Running E2E**: Requires a server running with `config-e2e.yml`. Always clean the server databases before running. Use `scripts/e2e.sh 8` for 8 concurrent processes (each running the full test suite independently). + +**E2E test harness known issue**: The named pipe mechanism for log collection can cause processes to hang when debug output exceeds the pipe buffer size. This is an infrastructure issue, not a sync bug. If processes appear stuck with logs that stopped growing, it's likely a pipe buffer issue. + +### File Operations Abstraction + +`FileOperations` has an `ensureClearPath` method that renames existing files to `(1).md`, `(2).md` etc. if a file already exists at the target path. This prevents data loss but can create apparent duplicates if the sync logic doesn't handle it. + +The `write` method does a 3-way merge: `write(path, oldContent, newContent)`. It reads the current file, computes a diff from `oldContent` to `newContent`, and applies that diff to the current file content. This preserves local changes that happened between the read and write. If the old content doesn't match what's expected, the merge can fail with "Part X not found in new content". + +### Approaches That Were Tried and Failed + +When fixing the duplicate-document-after-interrupted-create problem, several heuristic approaches were attempted before landing on idempotency keys: + +1. **Content-hash matching during remote fetch**: Scan all pending docs, read each file, hash it, and compare against incoming remote document. Failed because: (a) local content can be modified between the create and the fetch, so hashes don't match; (b) O(pending × remote) file I/O; (c) the `resolvedDocuments` getter was used instead of `pendingDocuments`, which filtered out all pending docs — a silent no-op bug. + +2. **`originalCreationPath` matching**: Track where each pending doc was originally created. When a remote doc arrives at that path, assign metadata. Failed because: (a) two different clients can create at the same path — false matches assign wrong metadata, causing 3-way merge errors on the other client; (b) adding a `deviceId` check to limit false matches broke the case where another client updated the document (changing the deviceId in the broadcast). + +3. **In-memory tracking** (e.g., `pendingLocalId`): Any in-memory state is lost on app crash. The whole point of the fix is to handle interrupted creates, which include crashes. + +The idempotency key approach works because it's: (a) crash-safe (persisted locally); (b) deterministic (UUID lookup, no heuristics); (c) server-authoritative (the server resolves keys to documentIds). diff --git a/frontend/sync-client/src/persistence/database.ts b/frontend/sync-client/src/persistence/database.ts index 2a5e901e..2f69e4bb 100644 --- a/frontend/sync-client/src/persistence/database.ts +++ b/frontend/sync-client/src/persistence/database.ts @@ -23,8 +23,15 @@ export interface StoredDocumentMetadata { hash: string; } +export interface StoredPendingDocument { + relativePath: RelativePath; + idempotencyKey: string; + originalCreationPath: RelativePath; +} + export interface StoredDatabase { documents: StoredDocumentMetadata[]; + pendingDocuments?: StoredPendingDocument[]; lastSeenUpdateId: VaultUpdateId | undefined; } @@ -39,6 +46,11 @@ export interface DocumentRecord { metadata: DocumentMetadata | undefined; isDeleted: boolean; parallelVersion: number; + /** The path when this pending document was first created locally. + * Survives renames so we can match it against server responses + * when a create request succeeded but the response was lost. */ + originalCreationPath?: RelativePath; + idempotencyKey?: string; } export class Database { @@ -60,6 +72,26 @@ export class Database { parallelVersion: 0 })) ?? []; + if (initialState.pendingDocuments) { + for (const pending of initialState.pendingDocuments) { + const existing = + this.getLatestDocumentByRelativePath( + pending.relativePath + ); + this.documents.push({ + relativePath: pending.relativePath, + metadata: undefined, + isDeleted: false, + parallelVersion: + existing !== undefined + ? existing.parallelVersion + 1 + : 0, + originalCreationPath: pending.originalCreationPath, + idempotencyKey: pending.idempotencyKey + }); + } + } + this.ensureConsistency(); this.logger.debug(`Loaded ${this.documents.length} documents`); @@ -112,6 +144,12 @@ export class Database { }); } + public get pendingDocuments(): DocumentRecord[] { + return this.documents.filter( + (doc) => doc.metadata === undefined && !doc.isDeleted + ); + } + public updateDocumentMetadata( metadata: { documentId: DocumentId; @@ -155,19 +193,25 @@ export class Database { const previousEntry = this.getLatestDocumentByRelativePath(relativePath); - const entry = { + const entry: DocumentRecord = { relativePath, metadata: undefined, isDeleted: false, parallelVersion: previousEntry?.parallelVersion === undefined ? 0 - : previousEntry.parallelVersion + 1 + : previousEntry.parallelVersion + 1, + originalCreationPath: relativePath, + idempotencyKey: crypto.randomUUID() }; this.documents.push(entry); - // no need to save as we only save documents which have metadata + // Save without consistency check — pending docs can't violate + // the documentId uniqueness invariant since they have no metadata. + void this.save().catch((error: unknown) => { + this.logger.error(`Error saving data: ${error}`); + }); return entry; } @@ -222,6 +266,10 @@ export class Database { this.saveInTheBackground(); } + public containsDocument(target: DocumentRecord): boolean { + return this.documents.includes(target); + } + public getLastSeenUpdateId(): VaultUpdateId { return this.lastSeenUpdateIds.min; } @@ -256,6 +304,13 @@ export class Database { ...metadata! // `resolvedDocuments` only returns docs with metadata set }) ), + pendingDocuments: this.pendingDocuments.map( + ({ relativePath, idempotencyKey, originalCreationPath }) => ({ + relativePath, + idempotencyKey: idempotencyKey!, + originalCreationPath: originalCreationPath! + }) + ), lastSeenUpdateId: this.lastSeenUpdateIds.min }); } diff --git a/frontend/sync-client/src/services/sync-service.ts b/frontend/sync-client/src/services/sync-service.ts index 647ac8da..a0b67830 100644 --- a/frontend/sync-client/src/services/sync-service.ts +++ b/frontend/sync-client/src/services/sync-service.ts @@ -67,10 +67,12 @@ export class SyncService { public async create({ relativePath, - contentBytes + contentBytes, + idempotencyKey }: { relativePath: RelativePath; contentBytes: Uint8Array; + idempotencyKey?: string; }): Promise { return this.retryForever(async () => { const formData = new FormData(); @@ -81,6 +83,10 @@ export class SyncService { new Blob([new Uint8Array(contentBytes)]) ); + if (idempotencyKey !== undefined) { + formData.append("idempotency_key", idempotencyKey); + } + this.logger.debug( `Creating document with relative path ${relativePath}` ); @@ -362,6 +368,52 @@ export class SyncService { }); } + public async resolveIdempotencyKeys( + keys: string[] + ): Promise> { + this.logger.debug( + `Resolving ${keys.length} idempotency keys` + ); + + try { + const response = await this.client( + this.getUrl("/documents/resolve-keys"), + { + method: "POST", + body: JSON.stringify({ idempotencyKeys: keys }), + headers: this.getDefaultHeaders({ type: "json" }) + } + ); + + if (!response.ok) { + this.logger.warn( + `Failed to resolve idempotency keys: ${await SyncService.errorFromResponse( + response + )}` + ); + return new Map(); + } + + const result: { resolved: Record } = + (await response.json()) as { resolved: Record }; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion + + const resolved = new Map( + Object.entries(result.resolved) + ); + + this.logger.debug( + `Resolved ${resolved.size}/${keys.length} idempotency keys` + ); + + return resolved; + } catch (e) { + this.logger.warn( + `Failed to resolve idempotency keys: ${e}` + ); + return new Map(); + } + } + public async ping(): Promise { this.logger.debug("Pinging server"); const response = await this.pingClient(this.getUrl("/ping"), { diff --git a/frontend/sync-client/src/services/types/ClientCursors.ts b/frontend/sync-client/src/services/types/ClientCursors.ts index e8c9b93d..5b1ec040 100644 --- a/frontend/sync-client/src/services/types/ClientCursors.ts +++ b/frontend/sync-client/src/services/types/ClientCursors.ts @@ -1,8 +1,4 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { DocumentWithCursors } from "./DocumentWithCursors"; -export interface ClientCursors { - userName: string; - deviceId: string; - documentsWithCursors: DocumentWithCursors[]; -} +export interface ClientCursors { userName: string, deviceId: string, documentsWithCursors: DocumentWithCursors[], } diff --git a/frontend/sync-client/src/services/types/CreateDocumentVersion.ts b/frontend/sync-client/src/services/types/CreateDocumentVersion.ts index 17103be5..d4ed2831 100644 --- a/frontend/sync-client/src/services/types/CreateDocumentVersion.ts +++ b/frontend/sync-client/src/services/types/CreateDocumentVersion.ts @@ -1,6 +1,3 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. -export interface CreateDocumentVersion { - relative_path: string; - content: number[]; -} +export interface CreateDocumentVersion { relative_path: string, content: number[], } diff --git a/frontend/sync-client/src/services/types/CursorPositionFromClient.ts b/frontend/sync-client/src/services/types/CursorPositionFromClient.ts index ee937f4e..78823b5d 100644 --- a/frontend/sync-client/src/services/types/CursorPositionFromClient.ts +++ b/frontend/sync-client/src/services/types/CursorPositionFromClient.ts @@ -1,6 +1,4 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { DocumentWithCursors } from "./DocumentWithCursors"; -export interface CursorPositionFromClient { - documentsWithCursors: DocumentWithCursors[]; -} +export interface CursorPositionFromClient { documentsWithCursors: DocumentWithCursors[], } diff --git a/frontend/sync-client/src/services/types/CursorPositionFromServer.ts b/frontend/sync-client/src/services/types/CursorPositionFromServer.ts index 52a24f27..ed6ac7b2 100644 --- a/frontend/sync-client/src/services/types/CursorPositionFromServer.ts +++ b/frontend/sync-client/src/services/types/CursorPositionFromServer.ts @@ -1,6 +1,4 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { ClientCursors } from "./ClientCursors"; -export interface CursorPositionFromServer { - clients: ClientCursors[]; -} +export interface CursorPositionFromServer { clients: ClientCursors[], } diff --git a/frontend/sync-client/src/services/types/CursorSpan.ts b/frontend/sync-client/src/services/types/CursorSpan.ts index 2cc2b7fc..7424067c 100644 --- a/frontend/sync-client/src/services/types/CursorSpan.ts +++ b/frontend/sync-client/src/services/types/CursorSpan.ts @@ -1,6 +1,3 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. -export interface CursorSpan { - start: number; - end: number; -} +export interface CursorSpan { start: number, end: number, } diff --git a/frontend/sync-client/src/services/types/DeleteDocumentVersion.ts b/frontend/sync-client/src/services/types/DeleteDocumentVersion.ts index 99ecc9e7..5d4bad98 100644 --- a/frontend/sync-client/src/services/types/DeleteDocumentVersion.ts +++ b/frontend/sync-client/src/services/types/DeleteDocumentVersion.ts @@ -1,5 +1,3 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. -export interface DeleteDocumentVersion { - relativePath: string; -} +export interface DeleteDocumentVersion { relativePath: string, } diff --git a/frontend/sync-client/src/services/types/DocumentUpdateResponse.ts b/frontend/sync-client/src/services/types/DocumentUpdateResponse.ts index 7fd06c7a..418117e6 100644 --- a/frontend/sync-client/src/services/types/DocumentUpdateResponse.ts +++ b/frontend/sync-client/src/services/types/DocumentUpdateResponse.ts @@ -5,6 +5,4 @@ import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutCont /** * Response to an update document request. */ -export type DocumentUpdateResponse = - | ({ type: "FastForwardUpdate" } & DocumentVersionWithoutContent) - | ({ type: "MergingUpdate" } & DocumentVersion); +export type DocumentUpdateResponse = { "type": "FastForwardUpdate" } & DocumentVersionWithoutContent | { "type": "MergingUpdate" } & DocumentVersion; diff --git a/frontend/sync-client/src/services/types/DocumentVersion.ts b/frontend/sync-client/src/services/types/DocumentVersion.ts index 3b9aa37b..3d50ae65 100644 --- a/frontend/sync-client/src/services/types/DocumentVersion.ts +++ b/frontend/sync-client/src/services/types/DocumentVersion.ts @@ -1,12 +1,3 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. -export interface DocumentVersion { - vaultUpdateId: number; - documentId: string; - relativePath: string; - updatedDate: string; - contentBase64: string; - isDeleted: boolean; - userId: string; - deviceId: string; -} +export interface DocumentVersion { vaultUpdateId: number, documentId: string, relativePath: string, updatedDate: string, contentBase64: string, isDeleted: boolean, userId: string, deviceId: string, } diff --git a/frontend/sync-client/src/services/types/DocumentVersionWithoutContent.ts b/frontend/sync-client/src/services/types/DocumentVersionWithoutContent.ts index 4b24e7c5..af064db8 100644 --- a/frontend/sync-client/src/services/types/DocumentVersionWithoutContent.ts +++ b/frontend/sync-client/src/services/types/DocumentVersionWithoutContent.ts @@ -1,12 +1,3 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. -export interface DocumentVersionWithoutContent { - vaultUpdateId: number; - documentId: string; - relativePath: string; - updatedDate: string; - isDeleted: boolean; - userId: string; - deviceId: string; - contentSize: number; -} +export interface DocumentVersionWithoutContent { vaultUpdateId: number, documentId: string, relativePath: string, updatedDate: string, isDeleted: boolean, userId: string, deviceId: string, contentSize: number, } diff --git a/frontend/sync-client/src/services/types/DocumentWithCursors.ts b/frontend/sync-client/src/services/types/DocumentWithCursors.ts index dcfe6e2d..e7dad119 100644 --- a/frontend/sync-client/src/services/types/DocumentWithCursors.ts +++ b/frontend/sync-client/src/services/types/DocumentWithCursors.ts @@ -1,9 +1,4 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { CursorSpan } from "./CursorSpan"; -export interface DocumentWithCursors { - vault_update_id: number | null; - document_id: string; - relative_path: string; - cursors: CursorSpan[]; -} +export interface DocumentWithCursors { vault_update_id: number | null, document_id: string, relative_path: string, cursors: CursorSpan[], } diff --git a/frontend/sync-client/src/services/types/FetchLatestDocumentsResponse.ts b/frontend/sync-client/src/services/types/FetchLatestDocumentsResponse.ts index 315d701a..3be625bd 100644 --- a/frontend/sync-client/src/services/types/FetchLatestDocumentsResponse.ts +++ b/frontend/sync-client/src/services/types/FetchLatestDocumentsResponse.ts @@ -4,10 +4,8 @@ import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutCont /** * Response to a fetch latest documents request. */ -export interface FetchLatestDocumentsResponse { - latestDocuments: DocumentVersionWithoutContent[]; - /** - * The update ID of the latest document in the response. - */ - lastUpdateId: bigint; -} +export interface FetchLatestDocumentsResponse { latestDocuments: DocumentVersionWithoutContent[], +/** + * The update ID of the latest document in the response. + */ +lastUpdateId: bigint, } diff --git a/frontend/sync-client/src/services/types/PingResponse.ts b/frontend/sync-client/src/services/types/PingResponse.ts index f96520e9..ba8ceb48 100644 --- a/frontend/sync-client/src/services/types/PingResponse.ts +++ b/frontend/sync-client/src/services/types/PingResponse.ts @@ -3,23 +3,22 @@ /** * Response to a ping request. */ -export interface PingResponse { - /** - * Semantic version of the server. - */ - serverVersion: string; - /** - * Whether the client is authenticated based on the sent Authorization - * header. - */ - isAuthenticated: boolean; - /** - * List of file extensions that are allowed to be merged. - */ - mergeableFileExtensions: string[]; - /** - * API version ensuring backwards & forwards compatibility between the client - * and server. - */ - supportedApiVersion: number; -} +export interface PingResponse { +/** + * Semantic version of the server. + */ +serverVersion: string, +/** + * Whether the client is authenticated based on the sent Authorization + * header. + */ +isAuthenticated: boolean, +/** + * List of file extensions that are allowed to be merged. + */ +mergeableFileExtensions: string[], +/** + * API version ensuring backwards & forwards compatibility between the client + * and server. + */ +supportedApiVersion: number, } diff --git a/frontend/sync-client/src/services/types/SerializedError.ts b/frontend/sync-client/src/services/types/SerializedError.ts index ec1c4503..4389289e 100644 --- a/frontend/sync-client/src/services/types/SerializedError.ts +++ b/frontend/sync-client/src/services/types/SerializedError.ts @@ -1,7 +1,3 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. -export interface SerializedError { - errorType: string; - message: string; - causes: string[]; -} +export interface SerializedError { errorType: string, message: string, causes: string[], } diff --git a/frontend/sync-client/src/services/types/UpdateTextDocumentVersion.ts b/frontend/sync-client/src/services/types/UpdateTextDocumentVersion.ts index 46f36bd0..aeb69f5a 100644 --- a/frontend/sync-client/src/services/types/UpdateTextDocumentVersion.ts +++ b/frontend/sync-client/src/services/types/UpdateTextDocumentVersion.ts @@ -1,7 +1,3 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. -export interface UpdateTextDocumentVersion { - parentVersionId: number; - relativePath: string; - content: (number | string)[]; -} +export interface UpdateTextDocumentVersion { parentVersionId: number, relativePath: string, content: (number | string)[], } diff --git a/frontend/sync-client/src/services/types/WebSocketClientMessage.ts b/frontend/sync-client/src/services/types/WebSocketClientMessage.ts index 9608f3af..5765a0d0 100644 --- a/frontend/sync-client/src/services/types/WebSocketClientMessage.ts +++ b/frontend/sync-client/src/services/types/WebSocketClientMessage.ts @@ -2,6 +2,4 @@ import type { CursorPositionFromClient } from "./CursorPositionFromClient"; import type { WebSocketHandshake } from "./WebSocketHandshake"; -export type WebSocketClientMessage = - | ({ type: "handshake" } & WebSocketHandshake) - | ({ type: "cursorPositions" } & CursorPositionFromClient); +export type WebSocketClientMessage = { "type": "handshake" } & WebSocketHandshake | { "type": "cursorPositions" } & CursorPositionFromClient; diff --git a/frontend/sync-client/src/services/types/WebSocketHandshake.ts b/frontend/sync-client/src/services/types/WebSocketHandshake.ts index a2910f49..d25651f9 100644 --- a/frontend/sync-client/src/services/types/WebSocketHandshake.ts +++ b/frontend/sync-client/src/services/types/WebSocketHandshake.ts @@ -1,7 +1,3 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. -export interface WebSocketHandshake { - token: string; - deviceId: string; - lastSeenVaultUpdateId: number | null; -} +export interface WebSocketHandshake { token: string, deviceId: string, lastSeenVaultUpdateId: number | null, } diff --git a/frontend/sync-client/src/services/types/WebSocketServerMessage.ts b/frontend/sync-client/src/services/types/WebSocketServerMessage.ts index fd250b7b..45e37358 100644 --- a/frontend/sync-client/src/services/types/WebSocketServerMessage.ts +++ b/frontend/sync-client/src/services/types/WebSocketServerMessage.ts @@ -2,6 +2,4 @@ import type { CursorPositionFromServer } from "./CursorPositionFromServer"; import type { WebSocketVaultUpdate } from "./WebSocketVaultUpdate"; -export type WebSocketServerMessage = - | ({ type: "vaultUpdate" } & WebSocketVaultUpdate) - | ({ type: "cursorPositions" } & CursorPositionFromServer); +export type WebSocketServerMessage = { "type": "vaultUpdate" } & WebSocketVaultUpdate | { "type": "cursorPositions" } & CursorPositionFromServer; diff --git a/frontend/sync-client/src/services/types/WebSocketVaultUpdate.ts b/frontend/sync-client/src/services/types/WebSocketVaultUpdate.ts index f1ea0f80..39e03b6f 100644 --- a/frontend/sync-client/src/services/types/WebSocketVaultUpdate.ts +++ b/frontend/sync-client/src/services/types/WebSocketVaultUpdate.ts @@ -1,7 +1,4 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutContent"; -export interface WebSocketVaultUpdate { - documents: DocumentVersionWithoutContent[]; - isInitialSync: boolean; -} +export interface WebSocketVaultUpdate { documents: DocumentVersionWithoutContent[], isInitialSync: boolean, } diff --git a/frontend/sync-client/src/sync-operations/cursor-tracker.ts b/frontend/sync-client/src/sync-operations/cursor-tracker.ts index 589e4b3b..abbfc973 100644 --- a/frontend/sync-client/src/sync-operations/cursor-tracker.ts +++ b/frontend/sync-client/src/sync-operations/cursor-tracker.ts @@ -10,7 +10,7 @@ import { hash } from "../utils/hash"; import type { FileChangeNotifier } from "./file-change-notifier"; import { Lock } from "../utils/data-structures/locks"; import { EventListeners } from "../utils/data-structures/event-listeners"; -import { Logger } from "../tracing/logger"; +import type { Logger } from "../tracing/logger"; // Cursor positions are updated separately from documents. However, a given cursor position is only // valid within a certain version of the document it belongs to. This class tracks previous and the latest diff --git a/frontend/sync-client/src/sync-operations/syncer.ts b/frontend/sync-client/src/sync-operations/syncer.ts index 05e3bdf0..7624d0a8 100644 --- a/frontend/sync-client/src/sync-operations/syncer.ts +++ b/frontend/sync-client/src/sync-operations/syncer.ts @@ -89,15 +89,33 @@ export class Syncer { public async syncLocallyCreatedFile( relativePath: RelativePath ): Promise { - // check whether someone else has already created the document in the database - if ( - this.database.getLatestDocumentByRelativePath(relativePath) - ?.isDeleted === false - ) { - // This is likely a consequence of us creating a file because of a remote update - // which triggered a local create, so we don't need to do anything here. + const existingDocument = + this.database.getLatestDocumentByRelativePath(relativePath); + + // Check whether someone else has already created the document in the database + if (existingDocument?.isDeleted === false) { + if (existingDocument.metadata !== undefined) { + // Fully synced document — likely created by a remote update + // which triggered a local create, so we don't need to do anything here. + this.logger.debug( + `Document ${relativePath} already exists in the database with metadata, skipping` + ); + return; + } + + // Pending create (interrupted by a sync reset or duplicate file watcher event) + // — reuse the existing record and retry the sync. this.logger.debug( - `Document ${relativePath} already exists in the database, skipping` + `Document ${relativePath} has a pending create that was interrupted, retrying sync` + ); + await this.enqueueSyncOperation( + async () => + this.unrestrictedSyncer.unrestrictedSyncLocallyCreatedOrUpdatedFile( + { + document: existingDocument + } + ), + [relativePath] ); return; } @@ -118,10 +136,10 @@ export class Syncer { public async syncLocallyDeletedFile( relativePath: RelativePath ): Promise { - let document = + const document = this.database.getLatestDocumentByRelativePath(relativePath); - if (document == null || document.isDeleted === true) { + if (document == null || document.isDeleted) { // This is must be a consequence of us deleting a file because of a remote update // which triggered a local delete, so we don't need to do anything here. this.logger.debug( @@ -199,6 +217,17 @@ export class Syncer { return; } + // If a create operation is already in progress for this document (no metadata + // yet), skip the HTTP sync. The create operation will handle syncing the content. + // We've already updated the document's path in the database above if needed, + // so the create operation will use the correct path. + if (document.metadata === undefined) { + this.logger.debug( + `Document ${relativePath} has a pending create operation, skipping HTTP sync` + ); + return; + } + await this.enqueueSyncOperation( async () => this.unrestrictedSyncer.unrestrictedSyncLocallyCreatedOrUpdatedFile( @@ -265,7 +294,15 @@ export class Syncer { this._isFirstSyncComplete = true; } catch (e) { - this.logger.error(`Failed to sync remotely updated file: ${e}`); + if (e instanceof SyncResetError) { + this.logger.info( + "Sync reset during remote update processing" + ); + } else { + this.logger.error( + `Failed to sync remotely updated file: ${e}` + ); + } } } @@ -309,6 +346,8 @@ export class Syncer { } private async internalScheduleSyncForOfflineChanges(): Promise { + await this.unrestrictedSyncer.resolveIdempotencyKeys(); + const allLocalFiles = await this.operations.listFilesRecursively(); this.logger.info( `Scheduling sync for ${allLocalFiles.length} local files` @@ -453,9 +492,25 @@ export class Syncer { operation: () => Promise, keys: (string | undefined | null)[] ): Promise { - return this.updatedDocumentsByPathAndKeysLocks.withLock( - keys.filter((k) => k !== undefined && k !== null), - async () => this.syncQueue.add(operation) + const filteredKeys = keys.filter((k) => k !== undefined && k !== null); + + // IMPORTANT: We must NOT hold locks while waiting for a queue slot. + // If we did, we could deadlock when two concurrent operations hold + // locks on different keys while both waiting for queue capacity. + // + // Instead, we acquire locks INSIDE the queued operation. This ensures: + // 1. We only hold locks during actual operation execution + // 2. The queue serializes access to queue slots + // 3. Locks serialize access to the same document/path + // + // The result type needs special handling since syncQueue.add() can + // return undefined when the queue is paused/cleared. + const result = await this.syncQueue.add(async () => + this.updatedDocumentsByPathAndKeysLocks.withLock( + filteredKeys, + operation + ) ); + return result as T; } } diff --git a/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts b/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts index d32e983e..a41bf8c2 100644 --- a/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts +++ b/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts @@ -57,6 +57,61 @@ export class UnrestrictedSyncer { }); } + public async resolveIdempotencyKeys(): Promise { + const pendingDocs = this.database.pendingDocuments; + if (pendingDocs.length === 0) { + return; + } + + const keys = pendingDocs + .map((d) => d.idempotencyKey) + // eslint-disable-next-line no-restricted-syntax -- Type narrowing, not removing a specific item + .filter((k): k is string => k !== undefined); + if (keys.length === 0) { + return; + } + + this.logger.debug( + `Resolving ${keys.length} pending idempotency keys` + ); + + const resolved = + await this.syncService.resolveIdempotencyKeys(keys); + + for (const doc of pendingDocs) { + if ( + doc.idempotencyKey !== undefined && + resolved.has(doc.idempotencyKey) + ) { + const documentId = resolved.get(doc.idempotencyKey)!; // eslint-disable-line @typescript-eslint/no-non-null-assertion + + // Skip if this documentId is already assigned to another document + const existing = + this.database.getDocumentByDocumentId(documentId); + if (existing !== undefined) { + this.logger.debug( + `Document ${documentId} already exists at ${existing.relativePath}, removing stale pending doc at ${doc.relativePath}` + ); + this.database.removeDocument(doc); + continue; + } + + this.logger.info( + `Resolved idempotency key ${doc.idempotencyKey} to document ${documentId} for ${doc.relativePath}` + ); + this.database.updateDocumentMetadata( + { + documentId, + parentVersionId: 0, + hash: "", + remoteRelativePath: doc.relativePath + }, + doc + ); + } + } + } + public async unrestrictedSyncLocallyCreatedOrUpdatedFile({ oldPath, // We use the same code path for both local and remote updates. We need to force the update @@ -108,7 +163,8 @@ export class UnrestrictedSyncer { if (document.metadata === undefined) { response = await this.syncService.create({ relativePath: originalRelativePath, - contentBytes + contentBytes, + idempotencyKey: document.idempotencyKey }); await this.handleMaybeMergingResponse({ @@ -247,6 +303,18 @@ export class UnrestrictedSyncer { relativePath: document.relativePath }); + // A concurrent merge operation may have removed this document from the + // database while we were waiting for the delete response. In that case, + // the merge already handled the state transition and we should not + // update metadata (which would fail anyway since the document is gone). + if (!this.database.containsDocument(document)) { + this.logger.debug( + `Document ${document.relativePath} was removed from database by a concurrent operation, skipping metadata update after delete` + ); + this.database.addSeenUpdateId(response.vaultUpdateId); + return; + } + this.database.updateDocumentMetadata( { documentId: response.documentId, @@ -474,6 +542,8 @@ export class UnrestrictedSyncer { let actualPath = document.relativePath; + let existingContentBytes: Uint8Array | undefined; + if (isCreate) { // We have a file locally that got moved by another client to the same path as the one we're trying to create. // The server returns a merging update for the document ID that already exists locally (but at another path). @@ -482,21 +552,53 @@ export class UnrestrictedSyncer { const existingDocument = this.database.getDocumentByDocumentId( response.documentId ); - if (existingDocument !== undefined) { + // If existingDocument === document, then a previous sync operation already + // assigned this documentId to our document. We don't need to merge - just + // continue to update the metadata below. + if (existingDocument !== undefined && existingDocument !== document) { this.logger.info( `Merging existing document ${existingDocument.relativePath} into ${document.relativePath } after concurrent move & creation` ); if (!existingDocument.isDeleted) { this.database.delete(existingDocument.relativePath); // make sure syncLocallyDeletedFile doesn't actually schedule deleting the new file + + try { + existingContentBytes = await this.operations.read( + existingDocument.relativePath + ); + } catch (e) { + if (e instanceof FileNotFoundError) { + return; + } + throw e; + } + this.database.removeDocument(existingDocument); - await this.operations.move(existingDocument.relativePath, document.relativePath); + await this.operations.delete(existingDocument.relativePath); + } else { this.database.removeDocument(existingDocument); } } } + // A document's documentId should never change once assigned. If the response has a + // different documentId than what the document already has, it means the file was + // renamed during the sync operation and the response is for a different document. + // We should bail out and let subsequent sync operations fix the state. + if ( + document.metadata?.documentId !== undefined && + document.metadata.documentId !== response.documentId + ) { + this.logger.info( + `Document ${document.relativePath} already has documentId ${document.metadata.documentId}, ` + + `but response has documentId ${response.documentId}. Ignoring response to prevent documentId corruption.` + ); + this.database.addSeenUpdateId(response.vaultUpdateId); + return; + } + // this can't happen on the creation path as we can only get a merging response if a document already exists remotely on the same path if (response.relativePath != originalRelativePath) { actualPath = response.relativePath; @@ -530,6 +632,17 @@ export class UnrestrictedSyncer { originalContentBytes, responseBytes ); + + if (existingContentBytes !== undefined) { + // the merge case is only always for text files, so don't mind that we have to provide a byte array here + await this.operations.write( + actualPath, + new Uint8Array(0), + existingContentBytes + ); + } + + await this.updateCache( response.vaultUpdateId, responseBytes, diff --git a/frontend/sync-client/src/utils/debugging/log-to-console.ts b/frontend/sync-client/src/utils/debugging/log-to-console.ts index c8940536..4c9db250 100644 --- a/frontend/sync-client/src/utils/debugging/log-to-console.ts +++ b/frontend/sync-client/src/utils/debugging/log-to-console.ts @@ -12,11 +12,11 @@ const COLORS = { export function logToConsole( logger: Logger, - { useColors = true }: { useColors?: boolean } = {} + { useColors = true, prefix }: { useColors?: boolean; prefix?: string } = {} ): void { logger.onLogEmitted.add((logLine: LogLine) => { const timestamp = logLine.timestamp.toISOString(); - const message = logLine.message; + const {message} = logLine; let color = ""; let reset = ""; @@ -38,7 +38,8 @@ export function logToConsole( } } - const formatted = `${timestamp} ${color}${logLine.level}${reset} ${message}`; + const prefixPart = prefix !== undefined ? `${prefix} ` : ""; + const formatted = `${prefixPart}${timestamp} ${color}${logLine.level}${reset} ${message}`; switch (logLine.level) { case LogLevel.ERROR: diff --git a/frontend/test-client/src/agent/mock-client.ts b/frontend/test-client/src/agent/mock-client.ts index 7623a7c6..17f17e80 100644 --- a/frontend/test-client/src/agent/mock-client.ts +++ b/frontend/test-client/src/agent/mock-client.ts @@ -83,18 +83,18 @@ export class MockClient extends debugging.InMemoryFileSystem { .map((part) => part.trim()); const newParts = newContent.split(" ").map((part) => part.trim()); existingParts.forEach((part) => - // all changes should be additive - { - assert( - newParts.includes(part), - `Part ${part} not found in new content: ${newContent}` - ); - } + // all changes should be additive + { + assert( + newParts.includes(part), + `Part ${part} not found in new content: '${newContent}'` + ); + } ); } this.client.logger.info( - `Updated file ${path} with:\n current content: ${currentContent}\n new content: ${newContent}` + `Updated file ${path} with:\n current content: '${currentContent}'\n new content: '${newContent}'` ); this.executeFileOperation( @@ -137,7 +137,7 @@ export class MockClient extends debugging.InMemoryFileSystem { } ): Promise { this.client.logger.info( - `Deleting file: ${path} with:\n content ${new TextDecoder().decode(this.files.get(path))}` + `Deleting file: ${path} with:\n content '${new TextDecoder().decode(this.files.get(path))}'` ); this.files.delete(path); diff --git a/sync-server/src/app_state/database.rs b/sync-server/src/app_state/database.rs index 7c2b440c..d0565be7 100644 --- a/sync-server/src/app_state/database.rs +++ b/sync-server/src/app_state/database.rs @@ -325,7 +325,8 @@ impl Database { is_deleted, user_id, device_id, - has_been_merged + has_been_merged, + idempotency_key from latest_document_versions where relative_path = ? and is_deleted = false order by vault_update_id desc -- `latest_document_versions` only contains a single latest version of each document, however, @@ -365,7 +366,8 @@ impl Database { is_deleted, user_id, device_id, - has_been_merged + has_been_merged, + idempotency_key from latest_document_versions where document_id = ? "#, @@ -400,7 +402,8 @@ impl Database { is_deleted, user_id, device_id, - has_been_merged + has_been_merged, + idempotency_key from documents where vault_update_id = ?"#, vault_update_id @@ -434,9 +437,10 @@ impl Database { content, is_deleted, user_id, - device_id + device_id, + idempotency_key ) - values (?, ?, ?, ?, ?, ?, ?, ?) + values (?, ?, ?, ?, ?, ?, ?, ?, ?) "#, version.vault_update_id, document_id, @@ -445,7 +449,8 @@ impl Database { version.content, version.is_deleted, version.user_id, - version.device_id + version.device_id, + version.idempotency_key ); if let Some(mut transaction) = transaction { @@ -481,6 +486,44 @@ impl Database { Ok(()) } + pub async fn get_document_by_idempotency_key( + &self, + vault: &VaultId, + idempotency_key: &str, + transaction: Option<&mut Transaction<'_>>, + ) -> Result> { + let query = sqlx::query_as!( + StoredDocumentVersion, + r#" + select + d.vault_update_id, + d.document_id as "document_id: Hyphenated", + d.relative_path, + d.updated_date as "updated_date: chrono::DateTime", + d.content, + d.is_deleted, + d.user_id, + d.device_id, + d.has_been_merged, + d.idempotency_key + from latest_document_versions d + inner join documents d2 on d.document_id = d2.document_id + where d2.idempotency_key = ? + limit 1 + "#, + idempotency_key + ); + + if let Some(transaction) = transaction { + query.fetch_optional(&mut **transaction).await + } else { + query + .fetch_optional(&self.get_connection_pool(vault).await?) + .await + } + .context("Cannot fetch document by idempotency key") + } + /// Cleanup idle connection pools that haven't been accessed in more than 5 minutes async fn cleanup_idle_pools(&self) { let mut pools = self.connection_pools.lock().await; diff --git a/sync-server/src/app_state/database/migrations/20260314000000_add_idempotency_key.sql b/sync-server/src/app_state/database/migrations/20260314000000_add_idempotency_key.sql new file mode 100644 index 00000000..0ff62743 --- /dev/null +++ b/sync-server/src/app_state/database/migrations/20260314000000_add_idempotency_key.sql @@ -0,0 +1 @@ +ALTER TABLE documents ADD COLUMN idempotency_key TEXT; diff --git a/sync-server/src/app_state/database/models.rs b/sync-server/src/app_state/database/models.rs index a216125a..6e39ca58 100644 --- a/sync-server/src/app_state/database/models.rs +++ b/sync-server/src/app_state/database/models.rs @@ -22,6 +22,7 @@ pub struct StoredDocumentVersion { pub device_id: DeviceId, #[allow(dead_code)] // This is for manual analysis pub has_been_merged: bool, + pub idempotency_key: Option, } impl PartialEq for StoredDocumentVersion { diff --git a/sync-server/src/server.rs b/sync-server/src/server.rs index 01b09cf6..2d4a0b6b 100644 --- a/sync-server/src/server.rs +++ b/sync-server/src/server.rs @@ -9,6 +9,7 @@ mod fetch_latest_documents; mod index; mod ping; mod requests; +mod resolve_keys; mod responses; mod update_document; mod websocket; @@ -108,6 +109,10 @@ fn get_authed_routes(app_state: AppState) -> Router { "/vaults/:vault_id/documents", post(create_document::create_document), ) + .route( + "/vaults/:vault_id/documents/resolve-keys", + post(resolve_keys::resolve_keys), + ) .route( "/vaults/:vault_id/documents/:document_id", get(fetch_latest_document_version::fetch_latest_document_version), diff --git a/sync-server/src/server/create_document.rs b/sync-server/src/server/create_document.rs index a5ab451f..90e08b30 100644 --- a/sync-server/src/server/create_document.rs +++ b/sync-server/src/server/create_document.rs @@ -1,3 +1,4 @@ +use anyhow::Context; use axum::{ Extension, Json, extract::{Path, State}, @@ -47,6 +48,25 @@ pub async fn create_document( .await .map_err(server_error)?; + if let Some(ref idempotency_key) = request.idempotency_key { + let existing = state + .database + .get_document_by_idempotency_key(&vault_id, idempotency_key, Some(&mut transaction)) + .await + .map_err(server_error)?; + if let Some(existing) = existing { + info!("Found existing document with idempotency key `{idempotency_key}`, returning existing document"); + transaction + .rollback() + .await + .context("Failed to roll back transaction") + .map_err(server_error)?; + return Ok(Json(DocumentUpdateResponse::FastForwardUpdate( + existing.into(), + ))); + } + } + let sanitized_relative_path = sanitize_path(&request.relative_path); let latest_version = state @@ -74,6 +94,7 @@ pub async fn create_document( &sanitized_relative_path, request.content.contents.to_vec(), transaction, + request.idempotency_key, ) .await; } @@ -111,6 +132,7 @@ pub async fn create_document( user_id: user.name, device_id: device_id.0, has_been_merged: false, + idempotency_key: request.idempotency_key, }; state diff --git a/sync-server/src/server/delete_document.rs b/sync-server/src/server/delete_document.rs index e126d6b5..3bcd31bb 100644 --- a/sync-server/src/server/delete_document.rs +++ b/sync-server/src/server/delete_document.rs @@ -84,6 +84,7 @@ pub async fn delete_document( user_id: user.name, device_id: device_id.0, has_been_merged: false, + idempotency_key: None, }; state diff --git a/sync-server/src/server/requests.rs b/sync-server/src/server/requests.rs index 386e682d..4c486284 100644 --- a/sync-server/src/server/requests.rs +++ b/sync-server/src/server/requests.rs @@ -14,6 +14,8 @@ pub struct CreateDocumentVersion { #[ts(as = "Vec")] #[form_data(limit = "unlimited")] pub content: FieldData, + + pub idempotency_key: Option, } #[derive(Debug, TryFromMultipart)] diff --git a/sync-server/src/server/resolve_keys.rs b/sync-server/src/server/resolve_keys.rs new file mode 100644 index 00000000..a0be6bce --- /dev/null +++ b/sync-server/src/server/resolve_keys.rs @@ -0,0 +1,63 @@ +use std::collections::HashMap; + +use axum::{ + Json, + extract::{Path, State}, +}; +use log::debug; +use serde::{Deserialize, Serialize}; + +use crate::{ + app_state::{AppState, database::models::VaultId}, + errors::{SyncServerError, server_error}, + utils::normalize::normalize, +}; + +#[derive(Deserialize)] +pub struct ResolveKeysPathParams { + #[serde(deserialize_with = "normalize")] + vault_id: VaultId, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ResolveKeysRequest { + pub idempotency_keys: Vec, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ResolveKeysResponse { + /// Maps `idempotency_key` -> `document_id` for keys that were found + pub resolved: HashMap, +} + +#[axum::debug_handler] +pub async fn resolve_keys( + Path(ResolveKeysPathParams { vault_id }): Path, + State(state): State, + Json(request): Json, +) -> Result, SyncServerError> { + debug!( + "Resolving {} idempotency keys in vault `{vault_id}`", + request.idempotency_keys.len() + ); + + let mut resolved = HashMap::new(); + + for key in &request.idempotency_keys { + let document = state + .database + .get_document_by_idempotency_key(&vault_id, key, None) + .await + .map_err(server_error)?; + + if let Some(doc) = document { + resolved.insert(key.clone(), doc.document_id.to_string()); + } + } + + debug!("Resolved {}/{} idempotency keys", resolved.len(), request.idempotency_keys.len()); + + Ok(Json(ResolveKeysResponse { resolved })) +} diff --git a/sync-server/src/server/update_document.rs b/sync-server/src/server/update_document.rs index b5d9bf0a..a07aec54 100644 --- a/sync-server/src/server/update_document.rs +++ b/sync-server/src/server/update_document.rs @@ -182,6 +182,7 @@ async fn update_document( &sanitized_relative_path, content, transaction, + None, ) .await } @@ -198,6 +199,7 @@ pub async fn merge_with_stored_version( sanitized_relative_path: &str, content: Vec, mut transaction: Transaction<'_>, + idempotency_key: Option, ) -> Result, SyncServerError> { // Return the latest version if the content and path are the same as the latest // version @@ -290,6 +292,7 @@ pub async fn merge_with_stored_version( user_id: user.name, device_id: device_id.0, has_been_merged: are_all_participants_mergable && is_different_from_request_content, + idempotency_key, }; state