From d715d94b6ddd534a963687291766e85db447d648 Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Thu, 23 Apr 2026 20:35:42 +0100 Subject: [PATCH] . --- frontend/history-ui/src/lib/api.ts | 7 +- .../src/lib/types/CreateDocumentVersion.ts | 2 +- .../src/lib/types/DocumentUpdateResponse.ts | 4 - .../src/lib/types/DocumentWithCursors.ts | 2 +- .../src/lib/types/WebSocketVaultPathChange.ts | 11 +- frontend/history-ui/src/lib/types/index.ts | 2 + .../file-operations/file-operations.test.ts | 256 +++++---- .../src/file-operations/file-operations.ts | 121 ++--- .../sync-client/src/services/sync-service.ts | 87 ++-- .../services/types/CreateDocumentVersion.ts | 2 +- .../services/types/DeleteDocumentVersion.ts | 5 - .../services/types/DocumentUpdateResponse.ts | 4 - .../src/services/types/DocumentWithCursors.ts | 2 +- .../types/WebSocketVaultPathChange.ts | 11 +- .../src/services/websocket-manager.ts | 6 + frontend/sync-client/src/sync-client.ts | 8 + .../src/sync-operations/cursor-tracker.ts | 26 +- .../src/sync-operations/sync-event-queue.ts | 183 ++++++- .../sync-client/src/sync-operations/syncer.ts | 493 +++++++++++++----- .../src/utils/conflict-path.test.ts | 85 +++ .../sync-client/src/utils/conflict-path.ts | 66 +++ sync-server/src/app_state/database.rs | 58 +-- sync-server/src/app_state/websocket/models.rs | 3 +- sync-server/src/server/delete_document.rs | 3 +- sync-server/src/server/requests.rs | 2 - sync-server/src/utils/sanitize_path.rs | 11 + 26 files changed, 1007 insertions(+), 453 deletions(-) delete mode 100644 frontend/sync-client/src/services/types/DeleteDocumentVersion.ts create mode 100644 frontend/sync-client/src/utils/conflict-path.test.ts create mode 100644 frontend/sync-client/src/utils/conflict-path.ts diff --git a/frontend/history-ui/src/lib/api.ts b/frontend/history-ui/src/lib/api.ts index 6d52a0f7..d80b5eb1 100644 --- a/frontend/history-ui/src/lib/api.ts +++ b/frontend/history-ui/src/lib/api.ts @@ -81,7 +81,12 @@ export class ApiClient { ): 
Promise { const response = await fetch( `${this.baseUrl}/documents/${documentId}/versions/${vaultUpdateId}/content`, - { headers: this.headers() } + { + headers: { + Authorization: `Bearer ${this.token}`, + "device-id": "history-ui" + } + } ); if (!response.ok) { throw new Error(`HTTP ${response.status}`); diff --git a/frontend/history-ui/src/lib/types/CreateDocumentVersion.ts b/frontend/history-ui/src/lib/types/CreateDocumentVersion.ts index 86ba60f3..29d3f55e 100644 --- a/frontend/history-ui/src/lib/types/CreateDocumentVersion.ts +++ b/frontend/history-ui/src/lib/types/CreateDocumentVersion.ts @@ -1,3 +1,3 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. -export type CreateDocumentVersion = { relative_path: string, content: Array, }; +export type CreateDocumentVersion = { relative_path: string, last_seen_vault_update_id: number, content: Array, }; diff --git a/frontend/history-ui/src/lib/types/DocumentUpdateResponse.ts b/frontend/history-ui/src/lib/types/DocumentUpdateResponse.ts index 48f0fd1c..4e2ef3ab 100644 --- a/frontend/history-ui/src/lib/types/DocumentUpdateResponse.ts +++ b/frontend/history-ui/src/lib/types/DocumentUpdateResponse.ts @@ -4,9 +4,5 @@ import type { DocumentUpdateMetadata } from "./DocumentUpdateMetadata"; /** * Response to a create/update document request. - * - * Neither variant contains `relative_path`: the client tracks the document's - * on-disk path locally and the server is the authority on document identity - * (`document_id`), not on its path. 
*/ export type DocumentUpdateResponse = { "type": "FastForwardUpdate" } & DocumentUpdateMetadata | { "type": "MergingUpdate" } & DocumentUpdateMergedContent; diff --git a/frontend/history-ui/src/lib/types/DocumentWithCursors.ts b/frontend/history-ui/src/lib/types/DocumentWithCursors.ts index 38857a35..3504ce33 100644 --- a/frontend/history-ui/src/lib/types/DocumentWithCursors.ts +++ b/frontend/history-ui/src/lib/types/DocumentWithCursors.ts @@ -1,4 +1,4 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { CursorSpan } from "./CursorSpan"; -export type DocumentWithCursors = { vault_update_id: number | null, document_id: string, relative_path: string, cursors: Array, }; +export type DocumentWithCursors = { vaultUpdateId: number | null, documentId: string, relativePath: string, cursors: Array, }; diff --git a/frontend/history-ui/src/lib/types/WebSocketVaultPathChange.ts b/frontend/history-ui/src/lib/types/WebSocketVaultPathChange.ts index 5079b14b..a0af0a7b 100644 --- a/frontend/history-ui/src/lib/types/WebSocketVaultPathChange.ts +++ b/frontend/history-ui/src/lib/types/WebSocketVaultPathChange.ts @@ -1,12 +1,3 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. -/** - * A rename notification. Emitted whenever a write commits a document at - * a path that differs from what the origin client sent and/or from the - * document's previous stored path. Unlike [`WebSocketVaultUpdate`] this - * event is delivered to all subscribers *including the origin device*, - * because the create/update HTTP response no longer carries the path and - * the origin needs this event to learn the server-canonical path - * (e.g. when the server deduped or rejected a rename). 
- */ -export type WebSocketVaultPathChange = { vaultUpdateId: number, documentId: string, relativePath: string, }; +export type WebSocketVaultPathChange = { vaultUpdateId: number, documentId: string, relativePath: string, updatedDate: string, userId: string, deviceId: string, }; diff --git a/frontend/history-ui/src/lib/types/index.ts b/frontend/history-ui/src/lib/types/index.ts index a2c2b346..ad1b4d41 100644 --- a/frontend/history-ui/src/lib/types/index.ts +++ b/frontend/history-ui/src/lib/types/index.ts @@ -1,3 +1,5 @@ +import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutContent"; + export type { DocumentVersion } from "./DocumentVersion"; export type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutContent"; export type { FetchLatestDocumentsResponse } from "./FetchLatestDocumentsResponse"; diff --git a/frontend/sync-client/src/file-operations/file-operations.test.ts b/frontend/sync-client/src/file-operations/file-operations.test.ts index b5851a3e..78977b14 100644 --- a/frontend/sync-client/src/file-operations/file-operations.test.ts +++ b/frontend/sync-client/src/file-operations/file-operations.test.ts @@ -1,5 +1,6 @@ import { describe, it } from "node:test"; -import type { DocumentId, DocumentRecord, RelativePath } from "../sync-operations/types"; +import assert from "node:assert/strict"; +import type { RelativePath } from "../sync-operations/types"; import type { SyncEventQueue } from "../sync-operations/sync-event-queue"; import { FileOperations } from "./file-operations"; import { Logger } from "../tracing/logger"; @@ -7,6 +8,7 @@ import { assertSetContainsExactly } from "../utils/assert-set-contains-exactly"; import type { FileSystemOperations } from "./filesystem-operations"; import type { TextWithCursors } from "reconcile-text"; import type { ServerConfig, ServerConfigData } from "../services/server-config"; +import { isConflictPath } from "../utils/conflict-path"; class MockServerConfig implements Pick { public 
async getConfig(): Promise { @@ -18,18 +20,19 @@ class MockServerConfig implements Pick { } } -class MockQueue implements Pick { - public getDocumentByPath( - _path: RelativePath - ): DocumentRecord | undefined { - return undefined; - } - +// The queue only receives `moveDocument`/`removeDocument` from file-ops; for +// these tests we just need no-op implementations that let the type-check +// pass when cast to `SyncEventQueue`. +class MockQueue implements Pick { public moveDocument( _oldPath: RelativePath, _newPath: RelativePath - ): DocumentId | undefined { - return undefined; + ): void { + // no-op + } + + public removeDocument(_path: RelativePath): void { + // no-op } } @@ -39,7 +42,7 @@ class FakeFileSystemOperations implements FileSystemOperations { public async listFilesRecursively( _root: RelativePath | undefined ): Promise { - return ["file.md"]; + return Array.from(this.names); } public async read(_path: RelativePath): Promise { throw new Error("Method not implemented."); @@ -65,9 +68,6 @@ class FakeFileSystemOperations implements FileSystemOperations { public async exists(path: RelativePath): Promise { return this.names.has(path); } - public async createDirectory(_path: RelativePath): Promise { - // this is called but irrelevant for this mock - } public async delete(_path: RelativePath): Promise { throw new Error("Method not implemented."); } @@ -80,152 +80,140 @@ class FakeFileSystemOperations implements FileSystemOperations { } } +function makeOps(): { + fs: FakeFileSystemOperations; + ops: FileOperations; +} { + const fs = new FakeFileSystemOperations(); + const ops = new FileOperations( + new Logger(), + new MockQueue() as SyncEventQueue, // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion + fs, + new MockServerConfig() as ServerConfig // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion + ); + return { fs, ops }; +} + +function singleConflictPath( + names: Set, + expectedNonConflictNames: string[] +): string { + const 
expected = new Set(expectedNonConflictNames); + const conflicts = Array.from(names).filter( + (name) => !expected.has(name) + ); + assert.equal( + conflicts.length, + 1, + `expected exactly one conflict-path entry, got ${JSON.stringify(conflicts)}` + ); + assert.ok( + isConflictPath(conflicts[0]), + `expected ${conflicts[0]} to match the conflict-path pattern` + ); + return conflicts[0]; +} + describe("File operations", () => { - it("should deconflict renames", async () => { - const fileSystemOperations = new FakeFileSystemOperations(); - const fileOperations = new FileOperations( - new Logger(), - new MockQueue() as SyncEventQueue, // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion - fileSystemOperations, - new MockServerConfig() as ServerConfig // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion - ); + it("move to empty target just renames the file", async () => { + const { fs, ops } = makeOps(); - await fileOperations.create("a", new Uint8Array()); - assertSetContainsExactly(fileSystemOperations.names, "a"); - await fileOperations.move("a", "b"); - assertSetContainsExactly(fileSystemOperations.names, "b"); + await ops.create("a", new Uint8Array()); + assertSetContainsExactly(fs.names, "a"); - await fileOperations.create("c", new Uint8Array()); - assertSetContainsExactly(fileSystemOperations.names, "b", "c"); + await ops.move("a", "b"); + assertSetContainsExactly(fs.names, "b"); + }); - await fileOperations.move("c", "b"); - assertSetContainsExactly(fileSystemOperations.names, "b", "b (1)"); + it("create at an occupied path displaces the existing file to a conflict-uuid path", async () => { + const { fs, ops } = makeOps(); - await fileOperations.create("c", new Uint8Array()); - await fileOperations.move("c", "b"); - assertSetContainsExactly( - fileSystemOperations.names, - "b", - "b (1)", - "b (2)" + await ops.create("note.md", new Uint8Array()); + await ops.create("note.md", new Uint8Array()); + + const conflict = 
singleConflictPath(fs.names, ["note.md"]); + assert.ok( + conflict.endsWith("-note.md"), + `conflict name should preserve the original filename, got ${conflict}` ); }); - it("should deconflict renames with file extension", async () => { - const fileSystemOperations = new FakeFileSystemOperations(); - const fileOperations = new FileOperations( - new Logger(), - new MockQueue() as SyncEventQueue, // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion - fileSystemOperations, - new MockServerConfig() as ServerConfig // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion - ); + it("move to an occupied target displaces the target to a conflict-uuid path", async () => { + const { fs, ops } = makeOps(); - await fileOperations.create("b.md", new Uint8Array()); - await fileOperations.create("c.md", new Uint8Array()); - await fileOperations.move("c.md", "b.md"); - assertSetContainsExactly( - fileSystemOperations.names, - "b.md", - "b (1).md" - ); + await ops.create("source.md", new Uint8Array()); + await ops.create("dest.md", new Uint8Array()); - await fileOperations.create("d.md", new Uint8Array()); - await fileOperations.move("d.md", "b.md"); - assertSetContainsExactly( - fileSystemOperations.names, - "b.md", - "b (1).md", - "b (2).md" - ); + await ops.move("source.md", "dest.md"); - await fileOperations.create("file-23.md", new Uint8Array()); - await fileOperations.create("file-23 (1).md", new Uint8Array()); - await fileOperations.move("file-23.md", "file-23 (1).md"); - assertSetContainsExactly( - fileSystemOperations.names, - "b.md", - "b (1).md", - "b (2).md", - "file-23 (1).md", - "file-23 (2).md" + // `dest.md` now holds what used to be at `source.md`; the original + // `dest.md` moved to a conflict path in the same directory. 
+ const conflict = singleConflictPath(fs.names, ["dest.md"]); + assert.ok( + conflict.endsWith("-dest.md"), + `conflict should preserve the original filename, got ${conflict}` ); }); - it("should deconflict renames with paths", async () => { - const fileSystemOperations = new FakeFileSystemOperations(); - const fileOperations = new FileOperations( - new Logger(), - new MockQueue() as SyncEventQueue, // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion - fileSystemOperations, - new MockServerConfig() as ServerConfig // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion - ); + it("preserves the parent directory when generating a conflict path", async () => { + const { fs, ops } = makeOps(); - await fileOperations.create("a/b.c/d", new Uint8Array()); - await fileOperations.create("a/b.c/e", new Uint8Array()); - await fileOperations.move("a/b.c/d", "a/b.c/e"); - assertSetContainsExactly( - fileSystemOperations.names, - "a/b.c/e", - "a/b.c/e (1)" + await ops.create("a/b.c/d", new Uint8Array()); + await ops.create("a/b.c/e", new Uint8Array()); + await ops.move("a/b.c/d", "a/b.c/e"); + + const conflict = singleConflictPath(fs.names, ["a/b.c/e"]); + assert.ok( + conflict.startsWith("a/b.c/"), + `conflict should live in the same directory, got ${conflict}` + ); + assert.ok( + conflict.endsWith("-e"), + `conflict should preserve the filename, got ${conflict}` ); }); - it("should continue deconfliction from existing number in filename", async () => { - const fileSystemOperations = new FakeFileSystemOperations(); - const fileOperations = new FileOperations( - new Logger(), - new MockQueue() as SyncEventQueue, // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion - fileSystemOperations, - new MockServerConfig() as ServerConfig // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion + it("handles dotfiles without mangling the extension", async () => { + const { fs, ops } = makeOps(); + + await ops.create(".gitignore", new 
Uint8Array()); + await ops.create("temp", new Uint8Array()); + await ops.move("temp", ".gitignore"); + + const conflict = singleConflictPath(fs.names, [".gitignore"]); + assert.ok( + conflict.endsWith("-.gitignore"), + `conflict should preserve the dotfile name verbatim, got ${conflict}` ); - await fileOperations.create("document (5).md", new Uint8Array()); - await fileOperations.create("other.md", new Uint8Array()); + await ops.create(".config.json", new Uint8Array()); + await ops.create("temp2", new Uint8Array()); + await ops.move("temp2", ".config.json"); - await fileOperations.move("other.md", "document (5).md"); - assertSetContainsExactly( - fileSystemOperations.names, - "document (5).md", - "document (6).md" - ); - - await fileOperations.create("another.md", new Uint8Array()); - await fileOperations.move("another.md", "document (5).md"); - assertSetContainsExactly( - fileSystemOperations.names, - "document (5).md", - "document (6).md", - "document (7).md" + // Now one conflict for .gitignore, one for .config.json. 
+ const conflicts = Array.from(fs.names).filter( + (name) => name !== ".gitignore" && name !== ".config.json" ); + assert.equal(conflicts.length, 2); + assert.ok(conflicts.every(isConflictPath)); + assert.ok(conflicts.some((c) => c.endsWith("-.gitignore"))); + assert.ok(conflicts.some((c) => c.endsWith("-.config.json"))); }); - it("should handle dotfiles correctly", async () => { - const fileSystemOperations = new FakeFileSystemOperations(); - const fileOperations = new FileOperations( - new Logger(), - new MockQueue() as SyncEventQueue, // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion - fileSystemOperations, - new MockServerConfig() as ServerConfig // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion - ); + it("generates a fresh conflict path on every displacement", async () => { + const { fs, ops } = makeOps(); - await fileOperations.create(".gitignore", new Uint8Array()); - await fileOperations.create("temp", new Uint8Array()); - await fileOperations.move("temp", ".gitignore"); - assertSetContainsExactly( - fileSystemOperations.names, - ".gitignore", - ".gitignore (1)" - ); + await ops.create("x", new Uint8Array()); + await ops.create("x", new Uint8Array()); + await ops.create("x", new Uint8Array()); - await fileOperations.create(".config.json", new Uint8Array()); - await fileOperations.create("temp2", new Uint8Array()); - await fileOperations.move("temp2", ".config.json"); - assertSetContainsExactly( - fileSystemOperations.names, - ".gitignore", - ".gitignore (1)", - ".config.json", - ".config (1).json" + const conflicts = Array.from(fs.names).filter((n) => n !== "x"); + assert.equal(conflicts.length, 2); + assert.ok(conflicts.every(isConflictPath)); + assert.notEqual( + conflicts[0], + conflicts[1], + "each displacement should produce a unique conflict path" ); }); }); diff --git a/frontend/sync-client/src/file-operations/file-operations.ts b/frontend/sync-client/src/file-operations/file-operations.ts index 8a9b10ba..3b3d50c4 
100644 --- a/frontend/sync-client/src/file-operations/file-operations.ts +++ b/frontend/sync-client/src/file-operations/file-operations.ts @@ -7,10 +7,10 @@ import type { TextWithCursors } from "reconcile-text"; import { reconcile } from "reconcile-text"; import { isFileTypeMergable } from "../utils/is-file-type-mergable"; import { isBinary } from "../utils/is-binary"; +import { buildConflictFileName } from "../utils/conflict-path"; import type { ServerConfig } from "../services/server-config"; export class FileOperations { - private static readonly PARENTHESES_REGEX = / \((?\d+)\)$/; private readonly fs: SafeFileSystemOperations; public constructor( @@ -59,26 +59,34 @@ export class FileOperations { return this.fs.write(path, this.toNativeLineEndings(newContent)); } - // Returns the deconflicted path if a file was moved, undefined otherwise + /** + * Ensure nothing sits at `path` so the caller can write to it. + * + * If a file is already there, it is moved aside to a `conflict--` + * path in the same directory. The sync layer treats conflict-named files + * as invisible (see `isConflictPath`), so no events are enqueued and no + * document records are touched — any pre-existing record or pending + * events for the displaced path are left behind for the caller to + * overwrite as part of whatever operation prompted the displacement. + * + * Returns the conflict path the existing file was moved to, or `undefined` + * if the path was already clear. 
+ */ public async ensureClearPath( path: RelativePath ): Promise { if (await this.fs.exists(path)) { - const deconflictedPath = await this.deconflictPath(path); - try { - this.logger.debug( - `Didn't expect ${path} to exist, deconflicting by moving it to '${deconflictedPath}'` - ); + const conflictPath = FileOperations.buildConflictPath(path); + this.logger.debug( + `Displacing existing file at ${path} to '${conflictPath}' to make room` + ); - this.queue.moveDocument(path, deconflictedPath); - await this.fs.rename(path, deconflictedPath, true); - return deconflictedPath; - } finally { - this.fs.unlock(deconflictedPath); - } - } else { - await this.createParentDirectories(path); + this.queue.moveDocument(path, conflictPath); + await this.fs.rename(path, conflictPath, true); + return conflictPath; } + + await this.createParentDirectories(path); return undefined; } @@ -119,8 +127,22 @@ export class FileOperations { return; } - const expectedText = new TextDecoder().decode(expectedContent); // this comes from a previous read which must only have \n line endings - const newText = new TextDecoder().decode(newContent); // this comes from the server which stores text with \n line endings + let expectedText: string; + let newText: string; + try { + expectedText = new TextDecoder("utf-8", { fatal: true }).decode( + expectedContent + ); // this comes from a previous read which must only have \n line endings + newText = new TextDecoder("utf-8", { fatal: true }).decode( + newContent + ); // this comes from the server which stores text with \n line endings + } catch (decodeError) { + this.logger.warn( + `3-way merge aborted for ${path}: one of expected/new is not valid UTF-8 (${decodeError}); falling back to overwrite` + ); + await this.fs.write(path, this.toNativeLineEndings(newContent)); + return; + } await this.fs.atomicUpdateText( path, @@ -166,7 +188,7 @@ export class FileOperations { return this.fs.exists(path); } - // Returns the deconflicted path if a file at the target 
was displaced + // Returns the conflict path a displaced file was moved to, or undefined. public async move( oldPath: RelativePath, newPath: RelativePath @@ -175,12 +197,16 @@ export class FileOperations { return undefined; } - const deconflictedPath = await this.ensureClearPath(newPath); - this.queue.moveDocument(oldPath, newPath); + const conflictPath = await this.ensureClearPath(newPath); + // Do the disk rename *before* updating the queue. If the rename + // throws (permissions, concurrent deletion, …), the queue still + // reflects the actual on-disk state instead of claiming the doc + // has already moved. await this.fs.rename(oldPath, newPath); + this.queue.moveDocument(oldPath, newPath); await this.deletingEmptyParentDirectoriesOfDeletedFile(oldPath); - return deconflictedPath; + return conflictPath; } @@ -248,51 +274,14 @@ export class FileOperations { } /** - * Deconflicts the given path by appending (1), (2), etc. before the file extension until a non-existent path is found. - * The returned path has a lock acquired on it; it must be released by the caller when no longer needed. - * - * @param path The starting path to deconflict - * @returns a non-existent path with a lock acquired on it + * Build a local-only conflict path for a file the client has to set aside. + * Format: `/conflict--` — UUID makes collisions + * statistically impossible, so no disk probe / lock dance is needed. */ - private async deconflictPath(path: RelativePath): Promise { - // eslint-disable-next-line prefer-const - let [directory, fileName] = FileOperations.getParentDirAndFile(path); - - if (directory) { - directory += "/"; - } - - const nameParts = fileName.split("."); - // Handle dotfiles: ".gitignore" should have no extension, ".config.json" should have ".json" - const isDotfile = fileName.startsWith(".") && nameParts[0] === ""; - const extension = - nameParts.length > 1 && !(isDotfile && nameParts.length === 2) - ? "." 
+ nameParts[nameParts.length - 1] - : ""; - let stem = extension ? nameParts.slice(0, -1).join(".") : fileName; - let currentCount = Number.parseInt( - FileOperations.PARENTHESES_REGEX.exec(stem)?.groups?.count ?? "0" - ); - stem = stem.replace(FileOperations.PARENTHESES_REGEX, ""); - - let newName = path; - - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - while (true) { - currentCount++; - newName = `${directory}${stem} (${currentCount})${extension}`; - - // Avoid multiple deconflictPath calls returning the same path - await this.fs.waitForLock(newName); - const existingRecord = this.queue.getSettledDocumentByPath(newName); - if ( - existingRecord !== undefined || // the document might have been confirmed by the server at a new path but haven't yet moved there locally - (await this.fs.exists(newName, true)) - ) { - this.fs.unlock(newName); - } else { - return newName; - } - } + private static buildConflictPath(path: RelativePath): RelativePath { + const [directory, fileName] = + FileOperations.getParentDirAndFile(path); + const conflictName = buildConflictFileName(fileName); + return directory ? 
`${directory}/${conflictName}` : conflictName; } } diff --git a/frontend/sync-client/src/services/sync-service.ts b/frontend/sync-client/src/services/sync-service.ts index ad268814..873783c3 100644 --- a/frontend/sync-client/src/services/sync-service.ts +++ b/frontend/sync-client/src/services/sync-service.ts @@ -16,12 +16,12 @@ import type { DocumentUpdateResponse } from "./types/DocumentUpdateResponse"; import type { DocumentVersion } from "./types/DocumentVersion"; import type { FetchLatestDocumentsResponse } from "./types/FetchLatestDocumentsResponse"; import type { PingResponse } from "./types/PingResponse"; -import type { DeleteDocumentVersion } from "./types/DeleteDocumentVersion"; import type { UpdateTextDocumentVersion } from "./types/UpdateTextDocumentVersion"; export class SyncService { private readonly client: typeof globalThis.fetch; private readonly pingClient: typeof globalThis.fetch; + private isStopped = false; public constructor( private readonly deviceId: string, @@ -68,15 +68,21 @@ export class SyncService { public async create({ relativePath, + lastSeenVaultUpdateId, contentBytes }: { relativePath: RelativePath; + lastSeenVaultUpdateId: VaultUpdateId; contentBytes: Uint8Array; }): Promise { return this.retryForever(async () => { const formData = new FormData(); formData.append("relative_path", relativePath); + formData.append( + "last_seen_vault_update_id", + lastSeenVaultUpdateId.toString() + ); formData.append( "content", new Blob([new Uint8Array(contentBytes)]) @@ -92,13 +98,7 @@ export class SyncService { headers: this.getDefaultHeaders() }); - if (!response.ok) { - throw new Error( - `Failed to create document: ${await SyncService.errorFromResponse( - response - )}` - ); - } + await SyncService.throwIfNotOk(response, "create document"); const result: DocumentUpdateResponse = (await response.json()) as DocumentUpdateResponse; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion @@ -210,30 +210,21 @@ export class SyncService { 
relativePath: RelativePath; }): Promise { return this.retryForever(async () => { - const request: DeleteDocumentVersion = { - relativePath - }; - this.logger.debug( `Delete document with id ${documentId} and relative path ${relativePath}` ); + // The server identifies the document by its URL path; no body + // is needed. Sending one was a leftover of an earlier shape. const response = await this.client( this.getUrl(`/documents/${documentId}`), { method: "DELETE", - body: JSON.stringify(request), - headers: this.getDefaultHeaders({ type: "json" }) + headers: this.getDefaultHeaders() } ); - if (!response.ok) { - throw new Error( - `Failed to delete document: ${await SyncService.errorFromResponse( - response - )}` - ); - } + await SyncService.throwIfNotOk(response, "delete document"); const result: DocumentVersionWithoutContent = (await response.json()) as DocumentVersionWithoutContent; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion @@ -261,13 +252,7 @@ export class SyncService { } ); - if (!response.ok) { - throw new Error( - `Failed to get document: ${await SyncService.errorFromResponse( - response - )}` - ); - } + await SyncService.throwIfNotOk(response, "get document"); const result: DocumentVersion = (await response.json()) as DocumentVersion; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion @@ -299,13 +284,7 @@ export class SyncService { } ); - if (!response.ok) { - throw new Error( - `Failed to get document: ${await SyncService.errorFromResponse( - response - )}` - ); - } + await SyncService.throwIfNotOk(response, "get document version content"); const result = await response.bytes(); this.logger.debug( @@ -332,13 +311,7 @@ export class SyncService { headers: this.getDefaultHeaders() }); - if (!response.ok) { - throw new Error( - `Failed to get documents: ${await SyncService.errorFromResponse( - response - )}` - ); - } + await SyncService.throwIfNotOk(response, "get documents"); const result: FetchLatestDocumentsResponse = 
(await response.json()) as FetchLatestDocumentsResponse; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion @@ -396,9 +369,30 @@ export class SyncService { return headers; } + /** + * Signal that the service is shutting down so any in-flight + * `retryForever` exits at its next iteration instead of looping + * indefinitely after the rest of the client has stopped. Idempotent. + */ + public stop(): void { + this.isStopped = true; + } + + /** + * Re-enable the service after a `stop()`. Used when the client pauses + * and resumes syncing within the same lifecycle (e.g. user toggles + * sync off and on). + */ + public resume(): void { + this.isStopped = false; + } + private async retryForever(fn: () => Promise): Promise { // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition while (true) { + if (this.isStopped) { + throw new SyncResetError(); + } try { return await fn(); } catch (e) { @@ -408,6 +402,9 @@ export class SyncService { ) { throw e; } + if (this.isStopped) { + throw new SyncResetError(); + } const retryInterval = this.settings.getSettings().networkRetryIntervalMs; @@ -425,6 +422,12 @@ export class SyncService { ): Promise { if (response.ok) return; const message = `Failed to ${operation}: ${await SyncService.errorFromResponse(response)}`; + // 429 is the only 4xx the server uses for *transient* contention + // (`WriteBusyError` → HTTP 429). Every other 4xx means the request + // is permanently rejected and shouldn't be retried. 
+ if (response.status === 429) { + throw new Error(message); + } if (response.status >= 400 && response.status < 500) { throw new HttpClientError(response.status, message); } diff --git a/frontend/sync-client/src/services/types/CreateDocumentVersion.ts b/frontend/sync-client/src/services/types/CreateDocumentVersion.ts index d4ed2831..4d1b324e 100644 --- a/frontend/sync-client/src/services/types/CreateDocumentVersion.ts +++ b/frontend/sync-client/src/services/types/CreateDocumentVersion.ts @@ -1,3 +1,3 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. -export interface CreateDocumentVersion { relative_path: string, content: number[], } +export interface CreateDocumentVersion { relative_path: string, last_seen_vault_update_id: number, content: number[], } diff --git a/frontend/sync-client/src/services/types/DeleteDocumentVersion.ts b/frontend/sync-client/src/services/types/DeleteDocumentVersion.ts deleted file mode 100644 index 99ecc9e7..00000000 --- a/frontend/sync-client/src/services/types/DeleteDocumentVersion.ts +++ /dev/null @@ -1,5 +0,0 @@ -// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. - -export interface DeleteDocumentVersion { - relativePath: string; -} diff --git a/frontend/sync-client/src/services/types/DocumentUpdateResponse.ts b/frontend/sync-client/src/services/types/DocumentUpdateResponse.ts index 48f0fd1c..4e2ef3ab 100644 --- a/frontend/sync-client/src/services/types/DocumentUpdateResponse.ts +++ b/frontend/sync-client/src/services/types/DocumentUpdateResponse.ts @@ -4,9 +4,5 @@ import type { DocumentUpdateMetadata } from "./DocumentUpdateMetadata"; /** * Response to a create/update document request. - * - * Neither variant contains `relative_path`: the client tracks the document's - * on-disk path locally and the server is the authority on document identity - * (`document_id`), not on its path. 
*/ export type DocumentUpdateResponse = { "type": "FastForwardUpdate" } & DocumentUpdateMetadata | { "type": "MergingUpdate" } & DocumentUpdateMergedContent; diff --git a/frontend/sync-client/src/services/types/DocumentWithCursors.ts b/frontend/sync-client/src/services/types/DocumentWithCursors.ts index e7dad119..d29b3f79 100644 --- a/frontend/sync-client/src/services/types/DocumentWithCursors.ts +++ b/frontend/sync-client/src/services/types/DocumentWithCursors.ts @@ -1,4 +1,4 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { CursorSpan } from "./CursorSpan"; -export interface DocumentWithCursors { vault_update_id: number | null, document_id: string, relative_path: string, cursors: CursorSpan[], } +export interface DocumentWithCursors { vaultUpdateId: number | null, documentId: string, relativePath: string, cursors: CursorSpan[], } diff --git a/frontend/sync-client/src/services/types/WebSocketVaultPathChange.ts b/frontend/sync-client/src/services/types/WebSocketVaultPathChange.ts index 337eb135..f4b5bb84 100644 --- a/frontend/sync-client/src/services/types/WebSocketVaultPathChange.ts +++ b/frontend/sync-client/src/services/types/WebSocketVaultPathChange.ts @@ -1,12 +1,3 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. -/** - * A rename notification. Emitted whenever a write commits a document at - * a path that differs from what the origin client sent and/or from the - * document's previous stored path. Unlike [`WebSocketVaultUpdate`] this - * event is delivered to all subscribers *including the origin device*, - * because the create/update HTTP response no longer carries the path and - * the origin needs this event to learn the server-canonical path - * (e.g. when the server deduped or rejected a rename). 
- */ -export interface WebSocketVaultPathChange { vaultUpdateId: number, documentId: string, relativePath: string, } +export interface WebSocketVaultPathChange { vaultUpdateId: number, documentId: string, relativePath: string, updatedDate: string, userId: string, deviceId: string, } diff --git a/frontend/sync-client/src/services/websocket-manager.ts b/frontend/sync-client/src/services/websocket-manager.ts index 5ec40e49..6c938dc7 100644 --- a/frontend/sync-client/src/services/websocket-manager.ts +++ b/frontend/sync-client/src/services/websocket-manager.ts @@ -181,6 +181,12 @@ export class WebSocketManager { `Failed to close previous WebSocket connection: ${e}` ); } + // Abandon any outstanding handler promises from the previous + // connection. They'll still resolve in the background, but we + // no longer want `waitUntilFinished` / `stop` to block on + // post-reconnect state — and we definitely don't want their + // results applied against a now-stale socket. + this.outstandingPromises.length = 0; } const wsUri = new URL(this.settings.getSettings().remoteUri); diff --git a/frontend/sync-client/src/sync-client.ts b/frontend/sync-client/src/sync-client.ts index 1a88c269..ff1c3841 100644 --- a/frontend/sync-client/src/sync-client.ts +++ b/frontend/sync-client/src/sync-client.ts @@ -48,6 +48,7 @@ export class SyncClient { private readonly contentCache: FixedSizeDocumentCache, private readonly fileOperations: FileOperations, private readonly serverConfig: ServerConfig, + private readonly syncService: SyncService, private readonly persistence: PersistenceProvider< Partial<{ settings: Partial; @@ -221,6 +222,7 @@ export class SyncClient { contentCache, fileOperations, serverConfig, + syncService, persistence ); @@ -460,6 +462,8 @@ export class SyncClient { private async startSyncing(): Promise { this.checkIfDestroyed("startSyncing"); this.fetchController.finishReset(); + // Undo any earlier `pause()` stop so retryForever keeps retrying. 
+ this.syncService.resume(); await this.serverConfig.getConfig(); @@ -472,6 +476,10 @@ export class SyncClient { private async pause(): Promise { this.hasFinishedOfflineSync = false; this.fetchController.startReset(); + // Signal the service so any `retryForever` loop exits at its next + // iteration instead of continuing to retry a network request while + // the rest of the client is winding down. + this.syncService.stop(); await this.webSocketManager.stop(); await this.waitUntilFinished(); } diff --git a/frontend/sync-client/src/sync-operations/cursor-tracker.ts b/frontend/sync-client/src/sync-operations/cursor-tracker.ts index 703f654c..98548f73 100644 --- a/frontend/sync-client/src/sync-operations/cursor-tracker.ts +++ b/frontend/sync-client/src/sync-operations/cursor-tracker.ts @@ -58,7 +58,7 @@ export class CursorTracker { for (const cursor of clientCursors.filter((client) => client.documentsWithCursors.every( - (doc) => doc.vault_update_id != null + (doc) => doc.vaultUpdateId != null ) )) { updatedKnownRemoteCursors.push({ @@ -83,7 +83,7 @@ export class CursorTracker { if ( clientCursor.documentsWithCursors.some( (document) => - document.relative_path === relativePath + document.relativePath === relativePath ) ) { clientCursor.upToDateness = @@ -112,9 +112,9 @@ export class CursorTracker { } documentsWithCursors.push({ - relative_path: relativePath, - document_id: record.documentId, - vault_update_id: record.parentVersionId, + relativePath: relativePath, + documentId: record.documentId, + vaultUpdateId: record.parentVersionId, cursors: cursors.map(({ start, end }) => ({ start: Math.min(start, end), end: Math.max(start, end) @@ -133,11 +133,11 @@ export class CursorTracker { for (const doc of documentsWithCursors) { const readContent = await this.fileOperations.read( - doc.relative_path + doc.relativePath ); - const record = this.queue.getSettledDocumentByPath(doc.relative_path); + const record = this.queue.getSettledDocumentByPath(doc.relativePath); if 
(record?.remoteHash !== (await hash(readContent))) { - doc.vault_update_id = null; + doc.vaultUpdateId = null; } } @@ -221,7 +221,7 @@ export class CursorTracker { private async getDocumentUpToDateness( document: DocumentWithCursors ): Promise { - const record = this.queue.getSettledDocumentByPath(document.relative_path); + const record = this.queue.getSettledDocumentByPath(document.relativePath); if (!record) { // the document of the cursor must be from the future @@ -229,21 +229,21 @@ export class CursorTracker { } if ( - record.parentVersionId < (document.vault_update_id ?? 0) + record.parentVersionId < (document.vaultUpdateId ?? 0) ) { return DocumentUpToDateness.Later; } else if ( - (document.vault_update_id ?? 0) < record.parentVersionId + (document.vaultUpdateId ?? 0) < record.parentVersionId ) { // the document of the cursor must be from the past return DocumentUpToDateness.Prior; } const currentContent = await this.fileOperations.read( - document.relative_path + document.relativePath ); - const currentRecord = this.queue.getSettledDocumentByPath(document.relative_path); + const currentRecord = this.queue.getSettledDocumentByPath(document.relativePath); return currentRecord?.remoteHash === (await hash(currentContent)) ? 
DocumentUpToDateness.UpToDate : DocumentUpToDateness.Prior; diff --git a/frontend/sync-client/src/sync-operations/sync-event-queue.ts b/frontend/sync-client/src/sync-operations/sync-event-queue.ts index 9e10ba94..a49ce71f 100644 --- a/frontend/sync-client/src/sync-operations/sync-event-queue.ts +++ b/frontend/sync-client/src/sync-operations/sync-event-queue.ts @@ -1,6 +1,7 @@ import type { Settings } from "../persistence/settings"; import type { Logger } from "../tracing/logger"; import { globsToRegexes } from "../utils/globs-to-regexes"; +import { isConflictPath } from "../utils/conflict-path"; import { removeFromArray } from "../utils/remove-from-array"; import { SyncEventType, @@ -110,6 +111,59 @@ export class SyncEventQueue { this.saveInTheBackground(); } + /** + * Reflect a local rename in the queue's disk-path index. + * + * Mirrors the `input.oldPath !== undefined` branch of `enqueue`, but + * without emitting a new `SyncLocal` — used by `FileOperations.move` + * when the rename is a byproduct of another sync operation (e.g. the + * user dragging a file) and the caller will push the resulting event + * separately, or not at all. + * + * If the rename targets a path that already holds a settled record + * (e.g. concurrent clobber), the destination's record is dropped: the + * caller is expected to have moved the displaced file out of the way + * via `ensureClearPath` already, so the dropped record reflects the + * now-orphaned disk state. + */ + public moveDocument( + oldPath: RelativePath, + newPath: RelativePath + ): void { + if (oldPath === newPath) return; + + const record = this.documents.get(oldPath); + if (record !== undefined) { + // If `newPath` already holds a settled record, overwriting it + // silently would orphan that document's identity. Warn so the + // bug is visible; the caller is expected to have freed the + // destination via `ensureClearPath` first. 
+ const clobbered = this.documents.get(newPath); + if (clobbered !== undefined) { + this.logger.warn( + `moveDocument(${oldPath} → ${newPath}) is overwriting a settled record for document ${clobbered.documentId}; caller should have displaced it first` + ); + } + + this.documents.delete(oldPath); + this.documents.set(newPath, record); + for (const e of this.events) { + if ( + e.type === SyncEventType.SyncLocal && + e.documentId === record.documentId + ) { + e.path = newPath; + } + } + this.saveInTheBackground(); + return; + } + + // No settled record — the rename may be over a pending Create + // whose document hasn't been persisted on the server yet. + this.updatePendingCreatePath(oldPath, newPath); + } + /** * Call once a create has been acknowledged by the server. */ @@ -232,11 +286,24 @@ export class SyncEventQueue { const { path } = input; + // Conflict-displaced files are local-only bookkeeping so a conflict + // hit is a debug-level event. A hit against a user-configured glob + // is a higher-signal "we're deliberately not syncing this" and + // stays at info. + if (isConflictPath(path)) { + this.logger.debug( + `Ignoring ${input.type} for ${path}: conflict-displaced file` + ); + return; + } + if (this.matchesUserIgnorePattern(path)) { + this.logger.info( + `Ignoring ${input.type} for ${path} as it matches ignore patterns` + ); + return; + } + if (input.type === SyncEventType.Create) { - if (this.isIgnored(path)) { - this.logger.info(`Ignoring create for ${path} as it matches ignore patterns`); - return; - } this.events.push({ type: SyncEventType.Create, path, originalPath: path }); return; } @@ -284,11 +351,23 @@ export class SyncEventQueue { // Deletes are returned immediately; also discard any subsequent // events for the same documentId so stale broadcasts don't - // resurrect the document + // resurrect the document. 
If the documentId is still a pending + // `Promise` (the originating Create hasn't landed + // yet), awaiting it may reject — handle that: the Create was + // cancelled, so the Delete has nothing to delete, just drop it. if (first.type === SyncEventType.Delete) { this.events.shift(); const { documentId } = first; - this.removeAllEventsForDocumentId(await documentId); + let resolvedId: DocumentId; + try { + resolvedId = await documentId; + } catch { + this.logger.debug( + "Dropping Delete whose Create was cancelled before it could be synced" + ); + return this.next(); + } + this.removeAllEventsForDocumentId(resolvedId); return first; } @@ -303,7 +382,16 @@ export class SyncEventQueue { e.documentId === documentId ); if (deleteEvent !== undefined) { - this.removeAllEventsForDocumentId(await documentId); + let resolvedId: DocumentId; + try { + resolvedId = await documentId; + } catch { + this.logger.debug( + "Dropping SyncLocal+Delete whose Create was cancelled before it could be synced" + ); + return this.next(); + } + this.removeAllEventsForDocumentId(resolvedId); return deleteEvent; } @@ -336,10 +424,14 @@ export class SyncEventQueue { return result; } - private isIgnored(path: RelativePath): boolean { + private matchesUserIgnorePattern(path: RelativePath): boolean { return this.ignorePatterns.some((pattern) => pattern.test(path)); } + private isIgnored(path: RelativePath): boolean { + return isConflictPath(path) || this.matchesUserIgnorePattern(path); + } + public removeAllEventsForDocumentId(documentId: DocumentId): void { for (let i = this.events.length - 1; i >= 0; i--) { const e = this.events[i]; @@ -406,6 +498,41 @@ export class SyncEventQueue { return undefined; } + /** + * Returns whether there is an unsynced Create event queued at `path`. + * A caller uses this to decide between displacing the local file vs. + * merging it with a concurrent remote create. 
+ */ + public hasPendingCreateAt(path: RelativePath): boolean { + return this.findLastCreate(path) !== undefined; + } + + /** + * Cancel the latest queued Create for `path`. Rejects its resolver + * promise (so any dependent SyncLocal/Delete events that `await`ed + * the future documentId skip themselves gracefully) and removes the + * Create event from the queue. Returns true if a Create was found + * and cancelled. + */ + public cancelPendingCreate(path: RelativePath): boolean { + const event = this.findLastCreate(path); + if (event === undefined) return false; + + if (event.resolvers !== undefined) { + event.resolvers.promise.catch(() => { + /* suppressed — consumer may not be listening */ + }); + event.resolvers.reject( + new Error( + "Create was cancelled — merged with concurrent remote create" + ) + ); + } + + removeFromArray(this.events, event); + return true; + } + private rejectAllPendingCreates(): void { for (const event of this.events) { if (event.type === SyncEventType.Create && event.resolvers !== undefined) { @@ -415,9 +542,45 @@ export class SyncEventQueue { } } + private savePending = false; + + // Coalesce bursts of mutations into one persist per microtask. A drain + // iteration can easily produce 10+ mutations; without this, we'd fire + // 10 overlapping `save()` calls racing on the persistence backend. + // + // On failure, retry with bounded exponential backoff instead of + // silently dropping the write — otherwise a transient IDB/fs error + // leaves the in-memory state permanently diverged from persisted state + // and the user loses queue progress on restart. 
private saveInTheBackground(): void { - void this.save().catch((error: unknown) => { - this.logger.error(`Error saving sync state: ${error}`); + if (this.savePending) return; + this.savePending = true; + queueMicrotask(() => { + this.savePending = false; + void this.saveWithRetry(); }); } + + private async saveWithRetry(): Promise { + const maxAttempts = 3; + for (let attempt = 1; attempt <= maxAttempts; attempt++) { + try { + await this.save(); + return; + } catch (error) { + if (attempt === maxAttempts) { + this.logger.error( + `Error saving sync state after ${maxAttempts} attempts: ${error}` + ); + return; + } + this.logger.warn( + `Error saving sync state (attempt ${attempt}/${maxAttempts}): ${error}; retrying` + ); + await new Promise((resolve) => + setTimeout(resolve, 50 * attempt) + ); + } + } + } } diff --git a/frontend/sync-client/src/sync-operations/syncer.ts b/frontend/sync-client/src/sync-operations/syncer.ts index 7e772432..4e51976d 100644 --- a/frontend/sync-client/src/sync-operations/syncer.ts +++ b/frontend/sync-client/src/sync-operations/syncer.ts @@ -32,7 +32,7 @@ import { } from "../tracing/sync-history"; import { isBinary } from "../utils/is-binary"; import { isFileTypeMergable } from "../utils/is-file-type-mergable"; -import { diff } from "reconcile-text"; +import { diff, reconcile } from "reconcile-text"; import type { ServerConfig } from "../services/server-config"; import type { FixedSizeDocumentCache } from "../utils/data-structures/fix-sized-cache"; import { base64ToBytes } from "byte-base64"; @@ -68,7 +68,26 @@ export class Syncer { if (isConnected) { this.sendHandshakeMessage(); } else { - this.runningScheduleSyncForOfflineChanges = undefined; + // Don't null the reference synchronously — if the scan is + // still in flight, the next reconnect would spawn a second + // concurrent scan racing on the same queue. Defer the + // clear until the in-flight task actually resolves, so a + // fresh scan can only start once the prior one is done. 
+ const current = this.runningScheduleSyncForOfflineChanges; + if (current === undefined) return; + current + .catch(() => { + /* swallow — internal error already logged */ + }) + .finally(() => { + if ( + this.runningScheduleSyncForOfflineChanges === + current + ) { + this.runningScheduleSyncForOfflineChanges = + undefined; + } + }); } }); this.webSocketManager.onRemoteVaultUpdateReceived.add( @@ -182,46 +201,64 @@ export class Syncer { // because the create/update HTTP response no longer carries the path, // so the only way the origin learns about dedupe or first-rename-wins // is via this event. + // + // Algorithmic assumptions: + // (1) Per-vault broadcast ordering is preserved by the server, so if + // the same write produced a `VaultUpdate` (content change) and a + // `PathChange` (path change), the `VaultUpdate` is handled first + // — that's what lets us skip advancing `parentVersionId` here + // without risking a stuck "already up-to-date" check later. + // (2) On a lag-induced disconnect (`broadcast::error::Lagged`) the + // server disconnects the client for a full resync, so out-of- + // order delivery across a reconnect boundary can't leave us with + // a stale PathChange overwriting a newer one. public async syncRemotelyChangedPath( pathChange: WebSocketVaultPathChange ): Promise { + // Serialize onto the drain chain so this handler can't race against + // an in-flight `processSyncRemote` / `processSyncLocal` etc. that + // captured the old path before our move. 
try { - const existing = this.queue.getDocumentByDocumentId( - pathChange.documentId - ); - if (existing === undefined) { - throw new Error( - `Received path change for unknown document ${pathChange.documentId}` + await this.chainOntoDrain(async () => { + const existing = this.queue.getDocumentByDocumentId( + pathChange.documentId ); - } + if (existing === undefined) { + throw new Error( + `Received path change for unknown document ${pathChange.documentId}` + ); + } - const { path: currentPath, record } = existing; - const newPath = pathChange.relativePath; + const { path: currentPath, record } = existing; + const newPath = pathChange.relativePath; - if (currentPath !== newPath) { - await this.operations.move(currentPath, newPath); + if (currentPath !== newPath) { + await this.operations.move(currentPath, newPath); - this.history.addHistoryEntry({ - status: SyncStatus.SUCCESS, - details: { - type: SyncType.MOVE, - relativePath: newPath, - movedFrom: currentPath - }, - message: "Applied remote path change" + this.history.addHistoryEntry({ + status: SyncStatus.SUCCESS, + details: { + type: SyncType.MOVE, + relativePath: newPath, + movedFrom: currentPath + }, + message: "Applied remote path change", + author: pathChange.userId, + timestamp: new Date(pathChange.updatedDate) + }); + } + + // `operations.move` updates the queue's path index, but + // doesn't touch `remoteRelativePath`. Refresh it so offline + // change detection compares against the server's path. + // parentVersionId intentionally stays at its prior value: + // if the write also changed content, the corresponding + // VaultUpdate handles that; advancing it here would make us + // skip fetching content we don't yet have. + this.queue.setDocument(newPath, { + ...record, + remoteRelativePath: newPath }); - } - - // `operations.move` updates the queue's path index, but - // doesn't touch `remoteRelativePath`. Refresh it so offline - // change detection compares against the server's path. 
- // parentVersionId intentionally stays at its prior value: - // if the write also changed content, the corresponding - // VaultUpdate handles that; advancing it here would make us - // skip fetching content we don't yet have. - this.queue.setDocument(newPath, { - ...record, - remoteRelativePath: newPath }); } catch (e) { if (e instanceof SyncResetError) { @@ -258,12 +295,19 @@ export class Syncer { private async internalScheduleSyncForOfflineChanges(): Promise { - await scheduleOfflineChanges( - { logger: this.logger, operations: this.operations, queue: this.queue }, - (path) => { this.syncLocallyCreatedFile(path); }, - (args) => { this.syncLocallyUpdatedFile(args); }, - (path) => { this.syncLocallyDeletedFile(path); }, - ); + // Offline scan wipes the event queue via `queue.clear()` and then + // rebuilds events from disk. That MUST NOT race against an + // in-flight drain iteration that may already hold a reference to + // a freshly-cleared event — chain onto the drain so the scan runs + // between drain ticks, never concurrently. + await this.chainOntoDrain(async () => { + await scheduleOfflineChanges( + { logger: this.logger, operations: this.operations, queue: this.queue }, + (path) => { this.syncLocallyCreatedFile(path); }, + (args) => { this.syncLocallyUpdatedFile(args); }, + (path) => { this.syncLocallyDeletedFile(path); }, + ); + }); await this.scheduleDrain(); } @@ -271,9 +315,27 @@ export class Syncer { private ensureDraining(): void { - this.draining = (this.draining ?? Promise.resolve()).then( - async () => this.drain() + void this.chainOntoDrain(async () => this.drain()); + } + + /** + * Serialize a unit of work onto the same promise chain the drain + * uses. This is how direct WebSocket handlers (`syncRemotelyChangedPath`, + * offline-scan) avoid racing against the drain loop: every mutator of + * the queue / disk goes through this single chain, in order of arrival. 
+ */ + private async chainOntoDrain(work: () => Promise): Promise { + const chained = (this.draining ?? Promise.resolve()).then( + async () => work() ); + // We track the chain via `this.draining` so later work chains onto + // the latest link. Swallow the result-typed value for storage; the + // caller still awaits the true result via `chained`. + this.draining = chained.then( + () => undefined, + () => undefined + ); + return chained; } private async scheduleDrain(): Promise { @@ -338,6 +400,20 @@ export class Syncer { this.logger.error( `Server rejected ${event.type} request: ${e.message}` ); + // The event was already shifted off the queue before + // `processEvent` ran; if it was a Create, its resolver + // promise would otherwise hang forever, blocking any + // queued Delete / SyncLocal that `await`s it. + if (event.type === SyncEventType.Create) { + event.resolvers?.promise.catch(() => { + /* suppressed */ + }); + event.resolvers?.reject( + new Error( + `Create was cancelled — server rejected the request (${e.message})` + ) + ); + } return; } throw e; @@ -366,6 +442,7 @@ export class Syncer { const response = await this.syncService.create({ relativePath: event.originalPath, + lastSeenVaultUpdateId: this.queue.lastSeenUpdateId, contentBytes }); @@ -394,7 +471,8 @@ export class Syncer { path: effectivePath, response, contentHash, - originalContentBytes: contentBytes + originalContentBytes: contentBytes, + createEvent: event }); this.history.addHistoryEntry({ @@ -658,61 +736,71 @@ export class Syncer { } else { const responseBytes = base64ToBytes(fullVersion.contentBase64); - // Handle remote path change - let actualPath = currentPath; + // Path reconciliation fallback for the reconnect case. + // + // In steady-state streaming, server-initiated renames arrive as + // dedicated `PathChange` WebSocket events and are handled by + // `syncRemotelyChangedPath`. 
But the reconnect catch-up path + // (`get_unseen_documents` → `VaultUpdate(is_initial_sync=…)`) + // replays *versions* from the DB — `PathChange` is emission- + // only and not replayed. Without this branch, a pure rename + // that happened while we were disconnected would leave our + // local file stuck at its old path forever. + // + // Only apply the server's path when the record's + // `remoteRelativePath` still matches `currentPath` — that means + // we haven't locally renamed since we last heard from the + // server, so the server's path is authoritative. Any local + // rename in flight keeps priority (it'll be resolved by the + // server on its next write). + let targetPath = currentPath; if ( fullVersion.relativePath !== currentPath && record.remoteRelativePath === currentPath ) { - actualPath = fullVersion.relativePath; - await this.operations.delete(fullVersion.relativePath); - await this.operations.move( - currentPath, - fullVersion.relativePath - ); + await this.operations.move(currentPath, fullVersion.relativePath); + targetPath = fullVersion.relativePath; } await this.operations.write( - actualPath, + targetPath, contentBytes, responseBytes ); // Re-read and re-hash after write (the 3-way merge may produce different content) - const afterWriteBytes = await this.operations.read(actualPath); + const afterWriteBytes = await this.operations.read(targetPath); const afterWriteHash = await hash(afterWriteBytes); - this.queue.setDocument(actualPath, { + if (targetPath !== currentPath) { + this.queue.removeDocument(currentPath); + } + this.queue.setDocument(targetPath, { documentId: fullVersion.documentId, parentVersionId: fullVersion.vaultUpdateId, remoteHash: afterWriteHash, remoteRelativePath: fullVersion.relativePath }); - // If the path changed, remove the old entry - if (actualPath !== currentPath) { - this.queue.removeDocument(currentPath); - } - await this.updateCache( fullVersion.vaultUpdateId, responseBytes, - actualPath + targetPath ); 
this.history.addHistoryEntry({ status: SyncStatus.SUCCESS, details: - actualPath !== currentPath + targetPath !== currentPath ? { - type: SyncType.MOVE, - relativePath: actualPath, - movedFrom: currentPath - } + type: SyncType.MOVE, + relativePath: targetPath, + movedFrom: currentPath + } : { - type: SyncType.UPDATE, - relativePath: actualPath - }, + type: SyncType.UPDATE, + relativePath: targetPath + }, message: "Successfully downloaded remotely updated file from the server", author: fullVersion.userId, @@ -750,17 +838,22 @@ export class Syncer { return; } - const deconflictedPath = await this.operations.ensureClearPath( - remoteVersion.relativePath - ); - if (deconflictedPath !== undefined) { - // The displaced file was moved to a deconflicted path. - // Remove its document record so the offline scan treats - // it as a new file rather than an existing document that - // needs its path synced (which would create duplicates) - this.queue.removeDocument(deconflictedPath); + // Special case: local has an *unsynced* new file at the same path. + // The client must cancel the outgoing Create and merge the two files + // instead of displacing the local one to a conflict path — those + // files are semantically "the same user-intended document" that two + // devices created concurrently, so we want to preserve both sides' + // edits, not shelve one aside. + if (this.queue.hasPendingCreateAt(remoteVersion.relativePath)) { + await this.mergeUnsyncedLocalWithRemoteCreate( + remoteVersion, + contentBytes + ); + return; } + await this.operations.ensureClearPath(remoteVersion.relativePath); + const contentHash = await hash(contentBytes); this.queue.setDocument(remoteVersion.relativePath, { documentId: remoteVersion.documentId, @@ -794,6 +887,131 @@ export class Syncer { }); } + // A remote create landed at a path where we have an unsynced local + // create. 
How we resolve depends on whether both sides are mergeable + // text: text gets an in-place union merge and one follow-up update; + // binary falls through to displacement so *both* files survive. + private async mergeUnsyncedLocalWithRemoteCreate( + remoteVersion: DocumentVersionWithoutContent, + remoteContent: Uint8Array + ): Promise { + const path = remoteVersion.relativePath; + const localContent = await this.operations.read(path); + + const canMergeText = + isFileTypeMergable( + path, + (await this.serverConfig.getConfig()).mergeableFileExtensions + ) && + !isBinary(localContent) && + !isBinary(remoteContent); + + if (!canMergeText) { + // Binary (or non-mergeable) concurrent creates: leave the local + // Create in the queue and let the default displacement flow + // take over (local bytes are moved to `conflict--…` by + // `ensureClearPath`, remote bytes take `path`). When the Create + // eventually fires it reads the remote content at `path` — not + // what we want — so cancel *just* the Create event and + // re-enqueue a fresh one sourced from the displaced path, so + // the server receives the user's original bytes and dedupes + // the path on its own. + this.queue.cancelPendingCreate(path); + + // `ensureClearPath` may return `undefined` if the file was + // deleted between `read(path)` above and this call (a TOCTOU + // race with a concurrent filesystem delete). That's fine: + // nothing to displace means no local bytes to preserve, and + // we just proceed with the remote content. 
+ const conflictPath = + await this.operations.ensureClearPath(path); + + this.queue.setDocument(path, { + documentId: remoteVersion.documentId, + parentVersionId: remoteVersion.vaultUpdateId, + remoteHash: await hash(remoteContent), + remoteRelativePath: path + }); + await this.operations.create(path, remoteContent); + await this.updateCache( + remoteVersion.vaultUpdateId, + remoteContent, + path + ); + + this.history.addHistoryEntry({ + status: SyncStatus.SUCCESS, + details: { + type: SyncType.CREATE, + relativePath: path + }, + message: + conflictPath !== undefined + ? `Adopted remote create at ${path}; unsynced local bytes preserved at ${conflictPath} for manual recovery` + : `Adopted remote create at ${path}; local file had already been removed`, + author: remoteVersion.userId, + timestamp: new Date(remoteVersion.updatedDate) + }); + return; + } + + // Mergeable text: union-merge with empty parent (every byte in + // either side is treated as an insertion), overwrite disk, and + // push the merged result to the server if it diverged from the + // remote copy. Cancelling the Create and re-emitting as a + // SyncLocal update lets the existing merge-response pipeline + // handle parentVersionId/content reconciliation end-to-end. + this.queue.cancelPendingCreate(path); + + const mergedContent = new TextEncoder().encode( + reconcile( + "", + new TextDecoder().decode(localContent), + new TextDecoder().decode(remoteContent) + ).text + ); + + // Adopt the remote document's identity locally *before* touching + // disk so an interleaved event can't mistake the file for a fresh + // create again. `remoteHash` is deliberately the server's content + // hash (not the merged one) so the SyncLocal below sees a real + // diff and actually uploads the merge. 
+ const remoteHash = await hash(remoteContent); + this.queue.setDocument(path, { + documentId: remoteVersion.documentId, + parentVersionId: remoteVersion.vaultUpdateId, + remoteHash, + remoteRelativePath: path + }); + + // Overwrite disk with the merged result. We pass `localContent` as + // the "expected" content so `operations.write`'s internal 3-way + // merge is a no-op (expected == disk ⇒ apply `new` verbatim). + await this.operations.write(path, localContent, mergedContent); + + await this.updateCache( + remoteVersion.vaultUpdateId, + remoteContent, + path + ); + + const mergedHash = await hash(mergedContent); + if (mergedHash !== remoteHash) { + this.syncLocallyUpdatedFile({ relativePath: path }); + } + + this.history.addHistoryEntry({ + status: SyncStatus.SUCCESS, + details: { + type: SyncType.CREATE, + relativePath: path + }, + message: "Merged unsynced local file with concurrent remote create", + author: remoteVersion.userId, + timestamp: new Date(remoteVersion.updatedDate) + }); + } + private async sendUpdate( @@ -834,96 +1052,139 @@ export class Syncer { path, response, contentHash, - originalContentBytes + originalContentBytes, + createEvent }: { path: RelativePath; response: DocumentUpdateResponse; contentHash: string; originalContentBytes: Uint8Array; + // When processing a Create, pass the originating event so its + // `resolvers` promise can be fulfilled (or rejected, on a deleted + // response). Dependent SyncLocal/Delete events are chained through + // that promise and would otherwise `await` forever. + createEvent?: Extract; }): Promise { if (response.isDeleted) { + // A Create that the server returned as already-deleted means + // nothing we can sync — reject the waiting promise so chained + // Delete / SyncLocal events skip themselves instead of hanging. 
+ if (createEvent?.resolvers !== undefined) { + createEvent.resolvers.promise.catch(() => { + /* suppressed — consumer may not be listening */ + }); + createEvent.resolvers.reject( + new Error( + "Create was cancelled — server reported the document as deleted" + ) + ); + } + + // Capture the documentId of the record we *believe* is at + // `path` now. If a concurrent `syncRemotelyChangedPath` moves + // this document between our exists-check and our read, the + // record at `path` after those awaits may belong to a + // DIFFERENT document. Guard against that. + const originalRecord = + this.queue.getSettledDocumentByPath(path); + const originalDocumentId = originalRecord?.documentId; + // If the local file has been edited, re-create it as a new - // document so local edits survive the remote delete + // document so local edits survive the remote delete — but only + // if nothing else is already queuing a Create for this path, to + // avoid doubling up when offline-change detection races with us. if (await this.operations.exists(path)) { const localBytes = await this.operations.read(path); const localHash = await hash(localBytes); - const record = this.queue.getSettledDocumentByPath(path); - if (record !== undefined && localHash !== record.remoteHash) { + const currentRecord = + this.queue.getSettledDocumentByPath(path); + // Re-verify the record's identity hasn't shifted under us. + if ( + currentRecord !== undefined && + currentRecord.documentId === originalDocumentId && + localHash !== currentRecord.remoteHash && + !this.queue.hasPendingCreateAt(path) + ) { this.queue.removeDocument(path); this.syncLocallyCreatedFile(path); return; } } - await this.operations.delete(path); - this.queue.removeDocument(path); + // Only delete on disk if the record at `path` is still the one + // we expected — if a PathChange moved another doc here, we + // shouldn't delete its file. 
+ const finalRecord = this.queue.getSettledDocumentByPath(path); + if ( + finalRecord === undefined || + finalRecord.documentId === originalDocumentId + ) { + await this.operations.delete(path); + this.queue.removeDocument(path); + } return; } - let actualPath = path; - - // Server may have changed the path (e.g. first-rename-wins conflict) - if (response.relativePath !== path) { - actualPath = response.relativePath; - const displacedPath = await this.operations.move( - path, - response.relativePath - ); - if (displacedPath !== undefined) { - const displacedRecord = - this.queue.getSettledDocumentByPath(displacedPath); - if (displacedRecord !== undefined) { - const displacedBytes = - await this.operations.read(displacedPath); - const displacedHash = await hash(displacedBytes); - if (displacedHash !== displacedRecord.remoteHash) { - this.queue.enqueue({ type: SyncEventType.SyncLocal, path: displacedPath }); - } - } - } - // Remove old path entry; the new path will be set below - this.queue.removeDocument(path); - } + // The response carries content only — path reconciliation is the + // sole responsibility of the `PathChange` WebSocket event, which + // fires independently for renames/dedupes. We therefore always + // record the current local `path` here; an in-flight `PathChange` + // will move the file and fix `remoteRelativePath` if the server + // placed the document somewhere else. + const existingRecord = this.queue.getSettledDocumentByPath(path); + const remoteRelativePath = existingRecord?.remoteRelativePath ?? 
path; + let record: DocumentRecord; if ("type" in response && response.type === "MergingUpdate") { const responseBytes = base64ToBytes(response.contentBase64); await this.operations.write( - actualPath, + path, originalContentBytes, responseBytes ); // Re-read and re-hash after write (invariant #3) - const afterWriteBytes = await this.operations.read(actualPath); + const afterWriteBytes = await this.operations.read(path); const afterWriteHash = await hash(afterWriteBytes); - this.queue.setDocument(actualPath, { + record = { documentId: response.documentId, parentVersionId: response.vaultUpdateId, remoteHash: afterWriteHash, - remoteRelativePath: response.relativePath - }); + remoteRelativePath + }; // Cache the SERVER's content, not local (invariant #2) await this.updateCache( response.vaultUpdateId, responseBytes, - actualPath + path ); } else { // Fast-forward update: no merge needed - this.queue.setDocument(actualPath, { + record = { documentId: response.documentId, parentVersionId: response.vaultUpdateId, remoteHash: contentHash, - remoteRelativePath: response.relativePath - }); + remoteRelativePath + }; await this.updateCache( response.vaultUpdateId, originalContentBytes, - actualPath + path ); } + + // For a Create, fulfill the resolver promise and replace any + // `documentId: Promise<...>` references in queued Delete/SyncLocal + // events with the now-known string id. For everything else a plain + // `setDocument` is enough — the record's identity was already + // resolved when the Create originally settled. 
+ if (createEvent !== undefined) { + this.queue.resolveCreate(createEvent, record); + } else { + this.queue.setDocument(path, record); + } } private async updateCache( diff --git a/frontend/sync-client/src/utils/conflict-path.test.ts b/frontend/sync-client/src/utils/conflict-path.test.ts new file mode 100644 index 00000000..ba39c238 --- /dev/null +++ b/frontend/sync-client/src/utils/conflict-path.test.ts @@ -0,0 +1,85 @@ +import { describe, it } from "node:test"; +import assert from "node:assert"; +import { buildConflictFileName, isConflictPath } from "./conflict-path"; + +describe("buildConflictFileName", () => { + it("truncates to the filesystem byte limit while preserving the extension", () => { + const result = buildConflictFileName(`${"a".repeat(300)}.md`); + assert.ok(Buffer.byteLength(result, "utf8") <= 255); + assert.ok(result.endsWith(".md")); + }); + + it("truncates on a codepoint boundary for multi-byte UTF-8 names", () => { + // "🎉" is 4 bytes in UTF-8; splitting one would yield U+FFFD. + const result = buildConflictFileName(`${"🎉".repeat(100)}.md`); + assert.ok(Buffer.byteLength(result, "utf8") <= 255); + assert.ok(!result.includes("�")); + }); + + it("does not split a ZWJ emoji sequence", () => { + // 👨‍👩‍👧 is one grapheme but 5 code points joined by U+200D. + // A codepoint-only truncation can leave a dangling ZWJ. + const family = "\u{1F468}‍\u{1F469}‍\u{1F467}"; + const result = buildConflictFileName(`${family.repeat(20)}.md`); + assert.ok(Buffer.byteLength(result, "utf8") <= 255); + const stem = result.slice( + "conflict-".length + 36 + 1, + result.length - ".md".length + ); + assert.strictEqual( + stem.length % family.length, + 0, + "stem length must be a whole number of families" + ); + assert.ok( + !stem.endsWith("‍"), + "stem must not end with a dangling ZWJ" + ); + }); + + it("does not split a base character from its combining mark", () => { + // NFD "é" = "e" (U+0065) + combining acute (U+0301): one grapheme, + // two code points. 
A codepoint-only loop can strand the accent. + const grapheme = "é"; + const result = buildConflictFileName(`${grapheme.repeat(150)}.md`); + assert.ok(Buffer.byteLength(result, "utf8") <= 255); + const stem = result.slice( + "conflict-".length + 36 + 1, + result.length - ".md".length + ); + assert.strictEqual( + stem.length % grapheme.length, + 0, + "stem length must be a whole number of graphemes" + ); + assert.ok( + !stem.endsWith("́") || stem.endsWith(grapheme), + "combining mark must stay attached to its base character" + ); + }); +}); + +describe("isConflictPath", () => { + it("does not misclassify user-authored names that start with `conflict-`", () => { + assert.strictEqual(isConflictPath("conflict-resolution.md"), false); + }); + + it("only inspects the final path segment", () => { + assert.strictEqual( + isConflictPath( + "conflict-12345678-1234-1234-1234-123456789abc-x/note.md" + ), + false + ); + assert.strictEqual( + isConflictPath( + "a/b/conflict-12345678-1234-1234-1234-123456789abc-note.md" + ), + true + ); + }); + + it("round-trips with buildConflictFileName", () => { + assert.strictEqual(isConflictPath(buildConflictFileName("note.md")), true); + }); +}); diff --git a/frontend/sync-client/src/utils/conflict-path.ts b/frontend/sync-client/src/utils/conflict-path.ts new file mode 100644 index 00000000..32c6591c --- /dev/null +++ b/frontend/sync-client/src/utils/conflict-path.ts @@ -0,0 +1,66 @@ +import type { RelativePath } from "../sync-operations/types"; + +// Local-only files displaced by `FileOperations.ensureClearPath` are named +// `conflict--`. The UUID is a full RFC-4122 v4 value so +// a user-authored filename that happens to start with `conflict-` doesn't +// get misclassified. +const CONFLICT_UUID_REGEX = + /^conflict-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}-/u; + +// Safe segment length for common filesystems (ext4 / NTFS / APFS all cap +// at 255 bytes). 
`conflict-<36-char-uuid>-` adds 46 bytes; reserve a few
+// extra bytes for a future prefix bump and leave room for multi-byte UTF-8
+// characters in the original name.
+const CONFLICT_PREFIX_LEN = "conflict-".length + 36 + 1;
+const MAX_SEGMENT_BYTES = 255;
+const MAX_ORIGINAL_BYTES = MAX_SEGMENT_BYTES - CONFLICT_PREFIX_LEN - 4;
+
+export function buildConflictFileName(fileName: string): string {
+    // Truncate the original name if keeping it whole would bust the
+    // filesystem's segment-length cap. Preserve the trailing extension
+    // so the file is still recognizable / openable.
+    const safeName = truncateFileNameToByteLimit(fileName, MAX_ORIGINAL_BYTES);
+    return `conflict-${crypto.randomUUID()}-${safeName}`;
+}
+
+// Shorten `fileName` to at most `maxBytes` UTF-8 bytes. The extension is kept
+// unless it is so long that it alone would exhaust the byte budget.
+function truncateFileNameToByteLimit(
+    fileName: string,
+    maxBytes: number
+): string {
+    const encoder = new TextEncoder();
+    if (encoder.encode(fileName).byteLength <= maxBytes) return fileName;
+
+    // Dotfiles (leading ".") and over-long suffixes have no extension to preserve.
+    const dotIndex = fileName.lastIndexOf(".");
+    const suffix = dotIndex > 0 ? fileName.slice(dotIndex) : "";
+    const extension = encoder.encode(suffix).byteLength < maxBytes ? suffix : "";
+    const stem = fileName.slice(0, fileName.length - extension.length);
+    const stemBudget = maxBytes - encoder.encode(extension).byteLength;
+
+    // Walk the stem by grapheme cluster so we never split an emoji sequence
+    // (e.g. ZWJ families, skin-tone modifiers) or a base+combining-mark pair.
+    const segmenter = new Intl.Segmenter(undefined, { granularity: "grapheme" });
+    let truncatedStem = "";
+    let usedBytes = 0;
+    for (const { segment } of segmenter.segment(stem)) {
+        const segmentBytes = encoder.encode(segment).byteLength;
+        if (usedBytes + segmentBytes > stemBudget) break;
+        truncatedStem += segment;
+        usedBytes += segmentBytes;
+    }
+    return truncatedStem + extension;
+}
+
+/**
+ * Is `path`'s final segment a conflict-displaced filename?
+ * + * Any sync code that would otherwise create/update/delete/sync the path + * should short-circuit when this returns true: conflict-displaced files are + * strictly local and must stay invisible to the server. + */ +export function isConflictPath(path: RelativePath): boolean { + const fileName = path.substring(path.lastIndexOf("/") + 1); + return CONFLICT_UUID_REGEX.test(fileName); +} diff --git a/sync-server/src/app_state/database.rs b/sync-server/src/app_state/database.rs index e8c02b31..83d2561c 100644 --- a/sync-server/src/app_state/database.rs +++ b/sync-server/src/app_state/database.rs @@ -1,5 +1,9 @@ use core::time::Duration; -use std::{collections::HashMap, sync::Arc, sync::atomic::{AtomicU64, Ordering}}; +use std::{ + collections::HashMap, + sync::Arc, + sync::atomic::{AtomicU64, Ordering}, +}; use anyhow::{Context as _, Result}; use log::info; @@ -96,21 +100,21 @@ pub struct WriteTransaction { } impl WriteTransaction { - async fn new(pool: &Pool, write_guard: tokio::sync::OwnedMutexGuard<()>) -> Result { + async fn new( + pool: &Pool, + write_guard: tokio::sync::OwnedMutexGuard<()>, + ) -> Result { let mut conn = pool .acquire() .await .context("Cannot acquire connection for write transaction")?; - if let Err(e) = sqlx::query("BEGIN IMMEDIATE") - .execute(&mut *conn) - .await - { + if let Err(e) = sqlx::query("BEGIN IMMEDIATE").execute(&mut *conn).await { let is_busy = match &e { sqlx::Error::Database(db_err) => { // SQLITE_BUSY base code is 5. Extended codes share base 5. 
- let busy_by_code = db_err.code().is_some_and(|c| { - c.parse::().is_ok_and(|n| n & 0xFF == 5) - }); + let busy_by_code = db_err + .code() + .is_some_and(|c| c.parse::().is_ok_and(|n| n & 0xFF == 5)); busy_by_code || db_err.message().contains("database is locked") } _ => false, @@ -120,7 +124,10 @@ impl WriteTransaction { } return Err(e).context("Cannot begin immediate transaction"); } - Ok(Self { conn: Some(conn), _write_guard: write_guard }) + Ok(Self { + conn: Some(conn), + _write_guard: write_guard, + }) } pub async fn commit(mut self) -> Result<()> { @@ -215,10 +222,7 @@ impl Database { Ok(vaults) } - pub async fn get_vault_stats( - &self, - vault: &VaultId, - ) -> Result { + pub async fn get_vault_stats(&self, vault: &VaultId) -> Result { let pool = self.get_connection_pool(vault).await?; let row = sqlx::query!( r#" @@ -295,10 +299,7 @@ impl Database { Ok(database) } - async fn create_vault_database( - config: &DatabaseConfig, - vault: &VaultId, - ) -> Result { + async fn create_vault_database(config: &DatabaseConfig, vault: &VaultId) -> Result { let file_name = config .databases_directory_path .join(format!("{vault}.sqlite")); @@ -384,7 +385,6 @@ impl Database { Ok(VaultPools { reader, writer }) } - fn validate_vault_id(vault: &VaultId) -> Result<()> { if vault.is_empty() { anyhow::bail!("Vault ID must not be empty"); @@ -427,12 +427,12 @@ impl Database { let vault_clone = vault.clone(); let pools = vault_pool .cell - .get_or_try_init(|| async { - Self::create_vault_database(&config, &vault_clone).await - }) + .get_or_try_init(|| async { Self::create_vault_database(&config, &vault_clone).await }) .await?; - vault_pool.last_accessed_ms.store(self.now_ms(), Ordering::Relaxed); + vault_pool + .last_accessed_ms + .store(self.now_ms(), Ordering::Relaxed); Ok(pools.clone()) } @@ -739,9 +739,6 @@ impl Database { .await .context("Failed to commit transaction")?; - // Both sends are synchronous: there's no `.await` between the - // `commit()` above and function 
return, so a task cancellation - // can't drop the broadcast and leave peers permanently behind. if broadcast.content_changed { // Content events are filtered out for the origin device — the // origin already has the content (or learns about the merge @@ -945,7 +942,11 @@ impl Database { let closures: Vec<_> = idle_pools .into_iter() .filter_map(|(vault_id, vault_pool)| { - vault_pool.cell.get().cloned().map(|pools| (vault_id, pools)) + vault_pool + .cell + .get() + .cloned() + .map(|pools| (vault_id, pools)) }) .collect(); @@ -958,8 +959,7 @@ impl Database { let writer_clone = pools.writer.clone(); let ckpt_result = tokio::task::spawn_blocking(move || { futures::executor::block_on( - sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)") - .execute(&writer_clone), + sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)").execute(&writer_clone), ) }) .await; diff --git a/sync-server/src/app_state/websocket/models.rs b/sync-server/src/app_state/websocket/models.rs index 97247229..282aa03a 100644 --- a/sync-server/src/app_state/websocket/models.rs +++ b/sync-server/src/app_state/websocket/models.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; use ts_rs::TS; use crate::app_state::database::models::{ - DeviceId, DocumentId, DocumentVersionWithoutContent, VaultUpdateId, + DeviceId, DocumentId, DocumentVersionWithoutContent, UserId, VaultUpdateId, }; #[derive(TS, Deserialize, Clone, Debug)] @@ -22,6 +22,7 @@ pub struct CursorPositionFromClient { } #[derive(TS, Serialize, Deserialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] pub struct DocumentWithCursors { // It's None in case the document is dirty. 
// We still want to sync the cursor to mark diff --git a/sync-server/src/server/delete_document.rs b/sync-server/src/server/delete_document.rs index 3057bd6e..aeec13d3 100644 --- a/sync-server/src/server/delete_document.rs +++ b/sync-server/src/server/delete_document.rs @@ -7,7 +7,7 @@ use axum_extra::TypedHeader; use log::{debug, info}; use serde::Deserialize; -use super::{device_id_header::DeviceIdHeader, requests::DeleteDocumentVersion}; +use super::device_id_header::DeviceIdHeader; use crate::{ app_state::{ AppState, @@ -38,7 +38,6 @@ pub async fn delete_document( Extension(user): Extension, TypedHeader(device_id): TypedHeader, State(state): State, - Json(_request): Json, ) -> Result, SyncServerError> { debug!("Deleting document `{document_id}` in vault `{vault_id}`"); diff --git a/sync-server/src/server/requests.rs b/sync-server/src/server/requests.rs index 250c65d7..f0499194 100644 --- a/sync-server/src/server/requests.rs +++ b/sync-server/src/server/requests.rs @@ -41,5 +41,3 @@ pub struct UpdateTextDocumentVersion { pub content: Vec, } -#[derive(Debug, Deserialize)] -pub struct DeleteDocumentVersion {} diff --git a/sync-server/src/utils/sanitize_path.rs b/sync-server/src/utils/sanitize_path.rs index e8a2a335..46dcc64c 100644 --- a/sync-server/src/utils/sanitize_path.rs +++ b/sync-server/src/utils/sanitize_path.rs @@ -1,9 +1,20 @@ use anyhow::{Result, ensure}; +use crate::consts::MAX_RELATIVE_PATH_LEN; + /// Sanitize the document's path to allow all clients to create the same path in /// their filesystem. If we didn't do this server-side, client's would need to /// deal with mapping invalid names to valid ones and then back. pub fn sanitize_path(path: &str) -> Result { + // Enforce the length cap at the single chokepoint every create/update + // handler goes through, so clients can't blow up axum's JSON/multipart + // parser with a 1 MB `relative_path` before the handler ever runs. + // The WebSocket cursor handler enforces this separately. 
+ ensure!( + path.len() <= MAX_RELATIVE_PATH_LEN, + "Relative path exceeds the maximum length of {MAX_RELATIVE_PATH_LEN} bytes" + ); + let options = sanitize_filename::Options { truncate: true, windows: true, // Windows is the lowest common denominator