diff --git a/frontend/sync-client/src/sync-operations/sync-event-queue.test.ts b/frontend/sync-client/src/sync-operations/sync-event-queue.test.ts index 8c4d68ab..aae010b6 100644 --- a/frontend/sync-client/src/sync-operations/sync-event-queue.test.ts +++ b/frontend/sync-client/src/sync-operations/sync-event-queue.test.ts @@ -38,12 +38,9 @@ describe("SyncEventQueue", () => { remoteHash: "hash-a" }); - queue.enqueue({ type: SyncEventType.SyncLocal, documentId: "A", path: "a.md", originalPath: "a.md" }); - queue.enqueue({ type: SyncEventType.SyncLocal, documentId: "A", path: "a.md", originalPath: "a.md" }); - queue.enqueue({ - type: SyncEventType.Delete, - documentId: "A", - }); + queue.enqueue({ type: SyncEventType.SyncLocal, path: "a.md" }); + queue.enqueue({ type: SyncEventType.SyncLocal, path: "a.md" }); + queue.enqueue({ type: SyncEventType.Delete, path: "a.md" }); const event = await queue.next(); assert.strictEqual(event?.type, SyncEventType.Delete); @@ -61,9 +58,9 @@ describe("SyncEventQueue", () => { remoteHash: "hash-a" }); - queue.enqueue({ type: SyncEventType.SyncLocal, documentId: "A", path: "a.md", originalPath: "a.md" }); - queue.enqueue({ type: SyncEventType.SyncLocal, documentId: "A", path: "a.md", originalPath: "a.md" }); - queue.enqueue({ type: SyncEventType.SyncLocal, documentId: "A", path: "a.md", originalPath: "a.md" }); + queue.enqueue({ type: SyncEventType.SyncLocal, path: "a.md" }); + queue.enqueue({ type: SyncEventType.SyncLocal, path: "a.md" }); + queue.enqueue({ type: SyncEventType.SyncLocal, path: "a.md" }); const event = await queue.next(); assert.strictEqual(event?.type, SyncEventType.SyncLocal); @@ -96,8 +93,8 @@ describe("SyncEventQueue", () => { it("create events are returned FIFO", async () => { const queue = createQueue(); - queue.enqueue({ type: SyncEventType.Create, path: "a.md", originalPath: "a.md" }); - queue.enqueue({ type: SyncEventType.Create, path: "b.md", originalPath: "b.md" }); + queue.enqueue({ type: SyncEventType.Create, path: "a.md" }); + queue.enqueue({ type: SyncEventType.Create, path: "b.md" }); const first = await queue.next(); assert.strictEqual(first?.type, SyncEventType.Create); @@ -112,14 +109,16 @@ describe("SyncEventQueue", () => { } }); - it("delete uses the provided documentId", async () => { + it("delete resolves documentId from path", async () => { const queue = createQueue(); - - queue.enqueue({ - type: SyncEventType.Delete, + queue.setDocument("a.md", { documentId: "A", + parentVersionId: 1, + remoteHash: "hash-a" }); + queue.enqueue({ type: SyncEventType.Delete, path: "a.md" }); + const event = await queue.next(); assert.strictEqual(event?.type, SyncEventType.Delete); if (event?.type === SyncEventType.Delete) { @@ -127,6 +126,12 @@ describe("SyncEventQueue", () => { } }); + it("delete for unknown path is silently ignored", () => { + const queue = createQueue(); + queue.enqueue({ type: SyncEventType.Delete, path: "unknown.md" }); + assert.strictEqual(queue.size, 0); + }); + it("document store CRUD operations work correctly", () => { const queue = createQueue(); @@ -154,37 +159,17 @@ describe("SyncEventQueue", () => { assert.strictEqual(queue.getSettledDocumentByPath("a.md"), undefined); }); - it("moveDocument moves a document and returns displaced documentId", () => { + it("SyncLocal with oldPath moves the document in the store", () => { const queue = createQueue(); queue.setDocument("a.md", { documentId: "A", parentVersionId: 1, remoteHash: "hash-a" }); - queue.setDocument("b.md", { - documentId: "B", - parentVersionId: 2, - remoteHash: "hash-b" - }); - const displacedId = queue.moveDocument("a.md", "b.md"); - assert.strictEqual(displacedId, "B"); + queue.enqueue({ type: SyncEventType.SyncLocal, path: "b.md", oldPath: "a.md" }); assert.strictEqual(queue.getSettledDocumentByPath("a.md"), undefined); assert.strictEqual(queue.getSettledDocumentByPath("b.md")?.documentId, "A"); - assert.strictEqual(queue.documentCount, 1); - }); - - it("moveDocument returns undefined when target is unoccupied", () => { - const queue = createQueue(); - queue.setDocument("a.md", { - documentId: "A", - parentVersionId: 1, - remoteHash: "hash-a" - }); - - const displacedId = queue.moveDocument("a.md", "b.md"); - assert.strictEqual(displacedId, undefined); - assert.strictEqual(queue.getSettledDocumentByPath("b.md")?.documentId, "A"); }); it("interleaved events for different documents are not confused", async () => { @@ -200,13 +185,10 @@ describe("SyncEventQueue", () => { remoteHash: "hash-b" }); - queue.enqueue({ type: SyncEventType.SyncLocal, documentId: "A", path: "a.md", originalPath: "a.md" }); - queue.enqueue({ type: SyncEventType.SyncLocal, documentId: "B", path: "b.md", originalPath: "b.md" }); - queue.enqueue({ - type: SyncEventType.Delete, - documentId: "A", - }); - queue.enqueue({ type: SyncEventType.SyncLocal, documentId: "B", path: "b.md", originalPath: "b.md" }); + queue.enqueue({ type: SyncEventType.SyncLocal, path: "a.md" }); + queue.enqueue({ type: SyncEventType.SyncLocal, path: "b.md" }); + queue.enqueue({ type: SyncEventType.Delete, path: "a.md" }); + queue.enqueue({ type: SyncEventType.SyncLocal, path: "b.md" }); // First next() should see the delete for A (coalescing sync-local + delete) const first = await queue.next(); @@ -227,11 +209,13 @@ describe("SyncEventQueue", () => { it("delete discards subsequent sync-remote events for the same document", async () => { const queue = createQueue(); - - queue.enqueue({ - type: SyncEventType.Delete, + queue.setDocument("a.md", { documentId: "A", + parentVersionId: 1, + remoteHash: "hash-a" }); + + queue.enqueue({ type: SyncEventType.Delete, path: "a.md" }); queue.enqueue({ type: SyncEventType.SyncRemote, remoteVersion: fakeRemoteVersion("A", { vaultUpdateId: 5 }) @@ -250,12 +234,9 @@ describe("SyncEventQueue", () => { remoteHash: "hash-a" }); - queue.enqueue({ - type: SyncEventType.Delete, - documentId: "A", - }); - queue.enqueue({ type: SyncEventType.SyncLocal, documentId: "A", path: "a.md", originalPath: "a.md" }); - queue.enqueue({ type: SyncEventType.Create, path: "b.md", originalPath: "b.md" }); + queue.enqueue({ type: SyncEventType.Delete, path: "a.md" }); + queue.enqueue({ type: SyncEventType.SyncLocal, path: "a.md" }); + queue.enqueue({ type: SyncEventType.Create, path: "b.md" }); queue.enqueue({ type: SyncEventType.SyncRemote, remoteVersion: fakeRemoteVersion("A", { vaultUpdateId: 5 }) @@ -278,19 +259,15 @@ describe("SyncEventQueue", () => { remoteHash: "hash-a" }); - queue.enqueue({ type: SyncEventType.Create, path: "unknown.md", originalPath: "unknown.md" }); - const createPromise = queue.getCreatePromise("unknown.md"); - assert.ok(createPromise !== undefined); - const event = await queue.next(); // dequeue the create - assert.ok(event?.type === SyncEventType.Create); - // Resolve so the delete's await doesn't hang - event.resolvers!.resolve("NEW"); + // Create is pending — Delete for same path gets a promise documentId + queue.enqueue({ type: SyncEventType.Create, path: "unknown.md" }); + queue.enqueue({ type: SyncEventType.Delete, path: "unknown.md" }); + queue.enqueue({ type: SyncEventType.SyncLocal, path: "a.md" }); - queue.enqueue({ - type: SyncEventType.Delete, - documentId: createPromise, - }); - queue.enqueue({ type: SyncEventType.SyncLocal, documentId: "A", path: "a.md", originalPath: "a.md" }); + // Dequeue and resolve the Create + const event = await queue.next(); + assert.ok(event?.type === SyncEventType.Create); + event.resolvers!.resolve("NEW"); await queue.next(); // delete const second = await queue.next(); @@ -299,7 +276,7 @@ describe("SyncEventQueue", () => { it("getCreatePromise returns a promise resolved by the event's resolvers", async () => { const queue = createQueue(); - queue.enqueue({ type: SyncEventType.Create, path: "a.md", originalPath: "a.md" }); + queue.enqueue({ type: SyncEventType.Create, path: "a.md" }); const promise = queue.getCreatePromise("a.md"); assert.ok(promise !== undefined); @@ -315,7 +292,7 @@ describe("SyncEventQueue", () => { it("rejecting the event's resolvers rejects the create promise", async () => { const queue = createQueue(); - queue.enqueue({ type: SyncEventType.Create, path: "a.md", originalPath: "a.md" }); + queue.enqueue({ type: SyncEventType.Create, path: "a.md" }); const promise = queue.getCreatePromise("a.md"); assert.ok(promise !== undefined); @@ -331,8 +308,8 @@ describe("SyncEventQueue", () => { it("clear rejects all pending create promises", async () => { const queue = createQueue(); - queue.enqueue({ type: SyncEventType.Create, path: "a.md", originalPath: "a.md" }); - queue.enqueue({ type: SyncEventType.Create, path: "b.md", originalPath: "b.md" }); + queue.enqueue({ type: SyncEventType.Create, path: "a.md" }); + queue.enqueue({ type: SyncEventType.Create, path: "b.md" }); const promiseA = queue.getCreatePromise("a.md"); const promiseB = queue.getCreatePromise("b.md"); @@ -347,25 +324,21 @@ describe("SyncEventQueue", () => { it("create can be re-enqueued after being dequeued", async () => { const queue = createQueue(); - queue.enqueue({ type: SyncEventType.Create, path: "a.md", originalPath: "a.md" }); + queue.enqueue({ type: SyncEventType.Create, path: "a.md" }); await queue.next(); - queue.enqueue({ type: SyncEventType.Create, path: "a.md", originalPath: "a.md" }); + queue.enqueue({ type: SyncEventType.Create, path: "a.md" }); assert.strictEqual(queue.size, 1); }); it("silently ignores create events matching ignore patterns", () => { const queue = createQueue(["*.tmp", ".hidden/**"]); - queue.enqueue({ type: SyncEventType.Create, path: "scratch.tmp", originalPath: "scratch.tmp" }); - queue.enqueue({ - type: SyncEventType.Create, - path: ".hidden/secret.md", - originalPath: ".hidden/secret.md", - }); + queue.enqueue({ type: SyncEventType.Create, path: "scratch.tmp" }); + queue.enqueue({ type: SyncEventType.Create, path: ".hidden/secret.md" }); assert.strictEqual(queue.size, 0); - queue.enqueue({ type: SyncEventType.Create, path: "notes-new.md", originalPath: "notes-new.md" }); + queue.enqueue({ type: SyncEventType.Create, path: "notes-new.md" }); assert.strictEqual(queue.size, 1); queue.enqueue({ @@ -382,8 +355,8 @@ describe("SyncEventQueue", () => { parentVersionId: 1, remoteHash: "hash-a" }); - queue.enqueue({ type: SyncEventType.Create, path: "b.md", originalPath: "b.md" }); - queue.enqueue({ type: SyncEventType.SyncLocal, documentId: "A", path: "a.md", originalPath: "a.md" }); + queue.enqueue({ type: SyncEventType.Create, path: "b.md" }); + queue.enqueue({ type: SyncEventType.SyncLocal, path: "a.md" }); assert.strictEqual(queue.size, 2); @@ -454,12 +427,9 @@ describe("SyncEventQueue", () => { }); // Pending create adds a path - queue.enqueue({ type: SyncEventType.Create, path: "c.md", originalPath: "c.md" }); + queue.enqueue({ type: SyncEventType.Create, path: "c.md" }); // Pending delete removes a path - queue.enqueue({ - type: SyncEventType.Delete, - documentId: "A", - }); + queue.enqueue({ type: SyncEventType.Delete, path: "a.md" }); const paths = queue.trackedPaths(); assert.deepStrictEqual( @@ -471,30 +441,22 @@ describe("SyncEventQueue", () => { it("trackedPaths handles create-delete-create for the same path", () => { const queue = createQueue(); - queue.enqueue({ type: SyncEventType.Create, path: "a.md", originalPath: "a.md" }); - queue.enqueue({ - type: SyncEventType.Delete, - documentId: Promise.resolve("X"), - }); - queue.enqueue({ type: SyncEventType.Create, path: "a.md", originalPath: "a.md" }); + queue.enqueue({ type: SyncEventType.Create, path: "a.md" }); + // Delete gets promise documentId from pending Create + queue.enqueue({ type: SyncEventType.Delete, path: "a.md" }); + queue.enqueue({ type: SyncEventType.Create, path: "a.md" }); const paths = queue.trackedPaths(); assert.ok(paths.has("a.md")); }); - it("trackedPaths applies moves for promise-based SyncLocal events", () => { + it("trackedPaths applies moves for pending SyncLocal events", () => { const queue = createQueue(); - queue.enqueue({ type: SyncEventType.Create, path: "a.md", originalPath: "a.md" }); - const createPromise = queue.getCreatePromise("a.md")!; + queue.enqueue({ type: SyncEventType.Create, path: "a.md" }); // File was renamed from a.md to b.md - queue.enqueue({ - type: SyncEventType.SyncLocal, - documentId: createPromise, - path: "b.md", - originalPath: "a.md", - }); + queue.enqueue({ type: SyncEventType.SyncLocal, path: "b.md", oldPath: "a.md" }); const paths = queue.trackedPaths(); assert.ok(!paths.has("a.md")); @@ -504,21 +466,10 @@ describe("SyncEventQueue", () => { it("trackedPaths tracks multiple moves for the same pending create", () => { const queue = createQueue(); - queue.enqueue({ type: SyncEventType.Create, path: "a.md", originalPath: "a.md" }); - const createPromise = queue.getCreatePromise("a.md")!; + queue.enqueue({ type: SyncEventType.Create, path: "a.md" }); - queue.enqueue({ - type: SyncEventType.SyncLocal, - documentId: createPromise, - path: "b.md", - originalPath: "a.md", - }); - queue.enqueue({ - type: SyncEventType.SyncLocal, - documentId: createPromise, - path: "c.md", - originalPath: "a.md", - }); + queue.enqueue({ type: SyncEventType.SyncLocal, path: "b.md", oldPath: "a.md" }); + queue.enqueue({ type: SyncEventType.SyncLocal, path: "c.md", oldPath: "b.md" }); const paths = queue.trackedPaths(); assert.ok(!paths.has("a.md")); @@ -529,20 +480,12 @@ describe("SyncEventQueue", () => { it("resolveCreate settles the document and replaces promise documentIds in the queue", async () => { const queue = createQueue(); - queue.enqueue({ type: SyncEventType.Create, path: "a.md", originalPath: "a.md" }); + queue.enqueue({ type: SyncEventType.Create, path: "a.md" }); const createPromise = queue.getCreatePromise("a.md")!; - // Dependent events enqueued while create is in flight - queue.enqueue({ - type: SyncEventType.SyncLocal, - documentId: createPromise, - path: "a.md", - originalPath: "a.md", - }); - queue.enqueue({ - type: SyncEventType.Delete, - documentId: createPromise, - }); + // Dependent events enqueued while create is still pending + queue.enqueue({ type: SyncEventType.SyncLocal, path: "a.md" }); + queue.enqueue({ type: SyncEventType.Delete, path: "a.md" }); const event = await queue.next(); // dequeue the create assert.ok(event?.type === SyncEventType.Create); diff --git a/frontend/sync-client/src/sync-operations/sync-event-queue.ts b/frontend/sync-client/src/sync-operations/sync-event-queue.ts index ab987e46..623d9033 100644 --- a/frontend/sync-client/src/sync-operations/sync-event-queue.ts +++ b/frontend/sync-client/src/sync-operations/sync-event-queue.ts @@ -1,12 +1,12 @@ import type { Settings } from "../persistence/settings"; import type { Logger } from "../tracing/logger"; import { globsToRegexes } from "../utils/globs-to-regexes"; -import { CoveredValues } from "../utils/data-structures/min-covered"; import { removeFromArray } from "../utils/remove-from-array"; import { SyncEventType, type DocumentId, type DocumentRecord, + type FileSyncEvent, type RelativePath, type StoredSyncState, type SyncEvent, @@ -33,11 +33,6 @@ export class SyncEventQueue { // It maps pending changes onto the local filesystem. private readonly events: SyncEvent[] = []; - // TODO: remove - // Log the last seen update before which we've seen all ids so that - // on the next startup, we can skip re-syncing what we have already - private lastSeenUpdateIds: CoveredValues; - // file creations for paths matching any of these patterns will be ignored private ignorePatterns: RegExp[]; @@ -67,18 +62,7 @@ export class SyncEventQueue { } } - const { lastSeenUpdateId } = initialState; - - - this.lastSeenUpdateIds = new CoveredValues( - Math.max(0, lastSeenUpdateId ?? 0) - ); - - for (const [, record] of this.documents) { - this.lastSeenUpdateIds.add(record.parentVersionId); - } - - this.logger.debug(`Loaded ${this.documents.size} documents and lastSeenUpdateId=${this.lastSeenUpdateIds.min}`); + this.logger.debug(`Loaded ${this.documents.size} documents`); } public get size(): number { @@ -90,22 +74,16 @@ export class SyncEventQueue { } public get lastSeenUpdateId(): VaultUpdateId { - return this.lastSeenUpdateIds.min; - } - - public set lastSeenUpdateId(value: number) { - this.lastSeenUpdateIds.min = value; - this.saveInTheBackground(); - } - - public addSeenUpdateId(value: number): void { - const previousMin = this.lastSeenUpdateIds.min; - this.lastSeenUpdateIds.add(value); - if (previousMin !== this.lastSeenUpdateIds.min) { - this.saveInTheBackground(); + let max = 0; + for (const record of this.documents.values()) { + if (record.parentVersionId > max) { + max = record.parentVersionId; + } } + return max; } + // todo: let's remove public getSettledDocumentByPath(path: RelativePath): DocumentRecord | undefined { return this.documents.get(path); @@ -231,14 +209,13 @@ export class SyncEventQueue { ...record }) ), - lastSeenUpdateId: this.lastSeenUpdateIds.min + lastSeenUpdateId: this.lastSeenUpdateId }); } public resetState(): void { this.rejectAllPendingCreates(); this.documents.clear(); - this.lastSeenUpdateIds = new CoveredValues(0); this.saveInTheBackground(); } @@ -247,45 +224,49 @@ export class SyncEventQueue { this.events.length = 0; } - // todo: maybe move next() logic here to stop storing rubbish - public enqueue(event: SyncEvent): void { // new type - if (this.isIgnored(event)) return; - - if (event.type === SyncEventType.SyncLocal) { - const { path: newPath } = event; - - if (typeof event.documentId === "string") { - const existing = this.getDocumentByDocumentId(event.documentId); - if (!existing) { - throw new Error(`SyncLocal event for unknown documentId ${event.documentId}`); - } - - if (this.documents.has(newPath)) { - throw new Error(`SyncLocal event for documentId ${event.documentId} has newPath ${newPath} which is already tracked by another document`); - } - - if (existing.path !== newPath) { - this.documents.delete(existing.path); - this.documents.set(newPath, existing.record); - for (const e of this.events) { - if ( - e.type === SyncEventType.SyncLocal && - e.documentId === event.documentId - ) { - e.path = newPath; - } - } - this.saveInTheBackground(); - } - } else { - const oldPath = this.findCreatePathByPromise(event.documentId); - if (oldPath !== undefined && oldPath !== newPath) { - this.updatePendingCreatePath(oldPath, newPath); - } - } + public enqueue(input: FileSyncEvent): void { + if (input.type === SyncEventType.SyncRemote) { + this.events.push(input); + return; } - this.events.push(event); + const { path } = input; + + if (input.type === SyncEventType.Create) { + if (this.isIgnored(path)) { + this.logger.info(`Ignoring create for ${path} as it matches ignore patterns`); + return; + } + this.events.push({ type: SyncEventType.Create, path, originalPath: path }); + return; + } + + const lookupPath = (input.type === SyncEventType.SyncLocal && input.oldPath) ? input.oldPath : path; + const record = this.documents.get(lookupPath); + const documentId: DocumentId | Promise | undefined = + record?.documentId ?? this.getCreatePromise(lookupPath); + if (documentId === undefined) return; + + if (input.type === SyncEventType.Delete) { + this.events.push({ type: SyncEventType.Delete, documentId }); + return; + } + + if (input.oldPath !== undefined) { + if (typeof documentId === "string") { + this.documents.delete(input.oldPath); + this.documents.set(path, record!); + for (const e of this.events) { + if (e.type === SyncEventType.SyncLocal && e.documentId === documentId) { + e.path = path; + } + } + this.saveInTheBackground(); + } else { + this.updatePendingCreatePath(input.oldPath, path); + } + } + this.events.push({ type: SyncEventType.SyncLocal, documentId, path, originalPath: path }); } @@ -355,11 +336,8 @@ export class SyncEventQueue { return result; } - private isIgnored(event: SyncEvent): boolean { - if (event.type !== SyncEventType.Create) { - return false; - } - return this.ignorePatterns.some((pattern) => pattern.test(event.path)); + private isIgnored(path: RelativePath): boolean { + return this.ignorePatterns.some((pattern) => pattern.test(path)); } private removeAllEventsForDocumentId(documentId: DocumentId): void { diff --git a/frontend/sync-client/src/sync-operations/types.ts b/frontend/sync-client/src/sync-operations/types.ts index 1bf99b8a..4db503c4 100644 --- a/frontend/sync-client/src/sync-operations/types.ts +++ b/frontend/sync-client/src/sync-operations/types.ts @@ -27,6 +27,12 @@ export enum SyncEventType { SyncRemote = "sync-remote", } +export type FileSyncEvent = + | { type: SyncEventType.Create; path: RelativePath } + | { type: SyncEventType.SyncLocal; path: RelativePath; oldPath?: RelativePath } + | { type: SyncEventType.Delete; path: RelativePath } + | { type: SyncEventType.SyncRemote; remoteVersion: DocumentVersionWithoutContent }; + export type SyncEvent = | { type: SyncEventType.Create; diff --git a/frontend/sync-client/src/utils/data-structures/min-covered.test.ts b/frontend/sync-client/src/utils/data-structures/min-covered.test.ts deleted file mode 100644 index 7b7271d7..00000000 --- a/frontend/sync-client/src/utils/data-structures/min-covered.test.ts +++ /dev/null @@ -1,76 +0,0 @@ -import { describe, it } from "node:test"; -import assert from "node:assert"; -import { CoveredValues } from "./min-covered"; - -describe("CoveredValues", () => { - it("should initialize with the given min value", () => { - const covered = new CoveredValues(5); - assert.strictEqual(covered.min, 5); - }); - - it("should add values greater than min", () => { - const covered = new CoveredValues(0); - covered.add(3); - assert.strictEqual(covered.min, 0); - covered.add(1); - assert.strictEqual(covered.min, 1); - covered.add(4); - assert.strictEqual(covered.min, 1); - covered.add(2); - assert.strictEqual(covered.min, 4); - }); - - it("should ignore duplicate values", () => { - const covered = new CoveredValues(0); - covered.add(3); - covered.add(3); - covered.add(3); - assert.strictEqual(covered.min, 0); - covered.add(1); - covered.add(2); - assert.strictEqual(covered.min, 3); - }); - - it("should handle multiple consecutive values", () => { - const covered = new CoveredValues(132); - for (let i = 250; i > 132; i--) { - assert.strictEqual(covered.min, 132); - covered.add(i); - } - assert.strictEqual(covered.min, 250); - }); - - it("should handle adding values lower than current min", () => { - const covered = new CoveredValues(5); - covered.add(3); - assert.strictEqual(covered.min, 5); - covered.add(6); - assert.strictEqual(covered.min, 6); - }); - - it("should auto-advance when setting min value", () => { - const covered = new CoveredValues(5); - covered.add(7); - covered.add(8); - covered.add(9); - assert.strictEqual(covered.min, 5); - // Setting min to 6 should auto-advance through 7, 8, 9 - covered.min = 6; - assert.strictEqual(covered.min, 9); - covered.add(10); - assert.strictEqual(covered.min, 10); - }); - - it("should handle setting min value with no consecutive values", () => { - const covered = new CoveredValues(5); - covered.add(10); - covered.add(15); - assert.strictEqual(covered.min, 5); - // Setting min to 8 should not auto-advance (no consecutive values) - covered.min = 8; - assert.strictEqual(covered.min, 8); - // Add 9 to trigger auto-advance to 10 - covered.add(9); - assert.strictEqual(covered.min, 10); - }); -}); diff --git a/frontend/sync-client/src/utils/data-structures/min-covered.ts b/frontend/sync-client/src/utils/data-structures/min-covered.ts deleted file mode 100644 index 8b38822f..00000000 --- a/frontend/sync-client/src/utils/data-structures/min-covered.ts +++ /dev/null @@ -1,61 +0,0 @@ -/** - * A class that tracks the minimum covered value in a sequence of numbers. - * It keeps track of a minimum value based on the seen values. - * - * It expects integers slightly out of order and makes sure that the value of `min` is - * always the minimum of the seen values. This is done with bounded memory usage. - * - * @example - * ```typescript - * const covered = new CoveredValues(0); - * covered.add(2); // seenValues = [2], min = 0 - * covered.add(1); // seenValues = [], min = 2 - * covered.min; // returns 2 - * ``` - */ -export class CoveredValues { - private seenValues: number[] = []; - - public constructor(private minValue: number) {} - - public get min(): number { - return this.minValue; - } - - public set min(value: number) { - this.minValue = Math.max(value, this.minValue); - this.seenValues = this.seenValues.filter((v) => v > this.minValue); - this.advanceMinWhilePossible(); - } - - public add(value: number | undefined): void { - if (value === undefined || value < this.minValue) { - return; - } - - let i = 0; - while (i < this.seenValues.length && this.seenValues[i] < value) { - i++; - } - - if (i === this.seenValues.length) { - this.seenValues.push(value); - } else if (this.seenValues[i] === value) { - return; - } else { - this.seenValues.splice(i, 0, value); - } - - this.advanceMinWhilePossible(); - } - - private advanceMinWhilePossible(): void { - while ( - this.seenValues.length > 0 && - this.seenValues[0] === this.minValue + 1 - ) { - this.seenValues.shift(); - this.minValue++; - } - } -} diff --git a/sync-server/src/app_state/database.rs b/sync-server/src/app_state/database.rs index 195dc7e7..c91bc28a 100644 --- a/sync-server/src/app_state/database.rs +++ b/sync-server/src/app_state/database.rs @@ -663,12 +663,12 @@ impl Database { .context("Cannot fetch document version") } - // inserting the document must be the last step of the transaction if there's one + // inserting the document must be the last step of the transaction pub async fn insert_document_version( &self, vault_id: &VaultId, version: &StoredDocumentVersion, - transaction: Option, + mut transaction: WriteTransaction, ) -> Result<()> { let document_id = version.document_id.as_hyphenated(); let query = sqlx::query!( @@ -697,22 +697,20 @@ impl Database { version.has_been_merged ); - if let Some(mut transaction) = transaction { - query - .execute(&mut *transaction) - .await - .context("Cannot insert document version")?; + // Acquire the broadcast send lock before the insert so that + // broadcasts are serialized in vault_update_id order even after + // the write transaction (and its per-vault lock) is released. + let _send_guard = self.broadcasts.acquire_send_lock(vault_id).await; - transaction - .commit() - .await - .context("Failed to commit transaction")?; - } else { - query - .execute(&self.get_connection_pool(vault_id).await?) - .await - .context("Cannot insert document version")?; - } + query + .execute(&mut *transaction) + .await + .context("Cannot insert document version")?; + + transaction + .commit() + .await + .context("Failed to commit transaction")?; self.broadcasts .send_document_update( diff --git a/sync-server/src/app_state/websocket/broadcasts.rs b/sync-server/src/app_state/websocket/broadcasts.rs index cf359497..91183970 100644 --- a/sync-server/src/app_state/websocket/broadcasts.rs +++ b/sync-server/src/app_state/websocket/broadcasts.rs @@ -10,6 +10,7 @@ use crate::{app_state::database::models::VaultId, config::server_config::ServerC pub struct Broadcasts { broadcast_channel_capacity: usize, tx: Arc>>>, + send_locks: Arc>>>>, } type TxMap = HashMap>; @@ -19,9 +20,23 @@ impl Broadcasts { Self { broadcast_channel_capacity: server_config.broadcast_channel_capacity, tx: Arc::new(Mutex::new(HashMap::new())), + send_locks: Arc::new(Mutex::new(HashMap::new())), } } + /// Acquire a per-vault lock that serializes broadcasts in commit order. + /// Must be acquired before the insert, held through commit and broadcast. + pub async fn acquire_send_lock(&self, vault: &VaultId) -> tokio::sync::OwnedMutexGuard<()> { + let lock = { + let mut locks = self.send_locks.lock().await; + locks + .entry(vault.clone()) + .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(()))) + .clone() + }; + lock.lock_owned().await + } + /// Remove senders for vaults with no active receivers fn prune_inactive_vaults(tx_map: &mut TxMap) { tx_map.retain(|_, sender| sender.receiver_count() > 0); diff --git a/sync-server/src/server/create_document.rs b/sync-server/src/server/create_document.rs index 3b073a88..89941b9a 100644 --- a/sync-server/src/server/create_document.rs +++ b/sync-server/src/server/create_document.rs @@ -130,7 +130,7 @@ pub async fn create_document( state .database - .insert_document_version(&vault_id, &new_version, Some(transaction)) + .insert_document_version(&vault_id, &new_version, transaction) .await .map_err(server_error)?; diff --git a/sync-server/src/server/delete_document.rs b/sync-server/src/server/delete_document.rs index ccfd7ebf..3e6398b8 100644 --- a/sync-server/src/server/delete_document.rs +++ b/sync-server/src/server/delete_document.rs @@ -91,7 +91,7 @@ pub async fn delete_document( state .database - .insert_document_version(&vault_id, &new_version, Some(transaction)) + .insert_document_version(&vault_id, &new_version, transaction) .await .map_err(server_error)?; diff --git a/sync-server/src/server/restore_document_version.rs b/sync-server/src/server/restore_document_version.rs index f759fa59..36c0344e 100644 --- a/sync-server/src/server/restore_document_version.rs +++ b/sync-server/src/server/restore_document_version.rs @@ -134,7 +134,7 @@ pub async fn restore_document_version( state .database - .insert_document_version(&vault_id, &new_version, Some(transaction)) + .insert_document_version(&vault_id, &new_version, transaction) .await .map_err(server_error)?; diff --git a/sync-server/src/server/update_document.rs b/sync-server/src/server/update_document.rs index 561a6e33..b6227de5 100644 --- a/sync-server/src/server/update_document.rs +++ b/sync-server/src/server/update_document.rs @@ -306,7 +306,7 @@ pub async fn update_document( state .database - .insert_document_version(&vault_id, &new_version, Some(transaction)) + .insert_document_version(&vault_id, &new_version, transaction) .await .map_err(server_error)?;