From 9ae1a5e09e1e446351e9626a1ad4d046d715b414 Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Sat, 28 Mar 2026 17:24:45 +0000 Subject: [PATCH] Add sync event queue --- .../sync-operations/sync-event-queue.test.ts | 46 ++++++++++ .../src/sync-operations/sync-event-queue.ts | 85 +++++++++++++++++++ 2 files changed, 131 insertions(+) create mode 100644 frontend/sync-client/src/sync-operations/sync-event-queue.test.ts create mode 100644 frontend/sync-client/src/sync-operations/sync-event-queue.ts diff --git a/frontend/sync-client/src/sync-operations/sync-event-queue.test.ts b/frontend/sync-client/src/sync-operations/sync-event-queue.test.ts new file mode 100644 index 00000000..7e43b700 --- /dev/null +++ b/frontend/sync-client/src/sync-operations/sync-event-queue.test.ts @@ -0,0 +1,46 @@ +import { describe, it } from "node:test"; +import assert from "node:assert"; +import { SyncEventQueue, type SyncEvent } from "./sync-event-queue"; + +describe("SyncEventQueue", () => { + it("delete collapses interleaved events for one document while leaving the other intact", () => { + const queue = new SyncEventQueue(); + queue.enqueue({ type: "local-content-update", documentId: "A" }); + queue.enqueue({ type: "remote-content-update", documentId: "B" }); + queue.enqueue({ type: "local-content-update", documentId: "A" }); + queue.enqueue({ type: "move", documentId: "A" }); + queue.enqueue({ type: "remote-content-update", documentId: "A" }); + queue.enqueue({ type: "delete", documentId: "A" }); + queue.enqueue({ type: "local-content-update", documentId: "B" }); + + assert.deepStrictEqual(queue.next(), { type: "delete", documentId: "A" }); + assert.deepStrictEqual(queue.next(), { + type: "local-content-update", + documentId: "B" + }); + assert.strictEqual(queue.next(), undefined); + }); + + it("updates coalesce up to a move boundary then post-move events are processed separately", () => { + const queue = new SyncEventQueue(); + queue.enqueue({ type: "local-content-update", documentId: "X" }); + queue.enqueue({ type: "remote-content-update", documentId: "X" }); + queue.enqueue({ type: "file-create", path: "new.md" }); + queue.enqueue({ type: "local-content-update", documentId: "X" }); + queue.enqueue({ type: "move", documentId: "X" }); + queue.enqueue({ type: "remote-content-update", documentId: "X" }); + queue.enqueue({ type: "local-content-update", documentId: "X" }); + + assert.deepStrictEqual(queue.next(), { + type: "local-content-update", + documentId: "X" + }); + assert.deepStrictEqual(queue.next(), { type: "file-create", path: "new.md" }); + assert.deepStrictEqual(queue.next(), { type: "move", documentId: "X" }); + assert.deepStrictEqual(queue.next(), { + type: "local-content-update", + documentId: "X" + }); + assert.strictEqual(queue.next(), undefined); + }); +}); diff --git a/frontend/sync-client/src/sync-operations/sync-event-queue.ts b/frontend/sync-client/src/sync-operations/sync-event-queue.ts new file mode 100644 index 00000000..c3d8af82 --- /dev/null +++ b/frontend/sync-client/src/sync-operations/sync-event-queue.ts @@ -0,0 +1,85 @@ +import type { DocumentId, RelativePath } from "../persistence/database"; + +export type SyncEvent = + | { type: "file-create"; path: RelativePath } + | { type: "local-content-update"; documentId: DocumentId } + | { type: "remote-content-update"; documentId: DocumentId } + | { type: "move"; documentId: DocumentId } + | { type: "delete"; documentId: DocumentId }; + +export class SyncEventQueue { + private readonly events: SyncEvent[] = []; + + public get size(): number { + return this.events.length; + } + + public clear(): void { + this.events.length = 0; + } + + public enqueue(event: SyncEvent): void { + this.events.push(event); + } + + public next(): SyncEvent | undefined { + if (this.events.length === 0) return undefined; + + const first = this.events[0]; + if (first.type === "file-create") { + this.events.shift(); + return first; + } + + const { documentId } = first; + + // If there's an eventual delete, discard everything for this document + const deleteEvent = this.events.find( + (e) => e.type === "delete" && e.documentId === documentId + ); + if (deleteEvent) { + this.removeAllForDocument(documentId); + return deleteEvent; + } + + // Coalesce updates: return the last update before the next move for this document. + // Moves act as barriers since they depend on each other + const moveIndex = this.events.findIndex( + (e) => e.type === "move" && e.documentId === documentId + ); + const boundary = moveIndex === -1 ? this.events.length : moveIndex; + + const updateIndices: number[] = []; + for (let i = 0; i < boundary; i++) { + const e = this.events[i]; + if ( + (e.type === "local-content-update" || + e.type === "remote-content-update") && + e.documentId === documentId + ) { + updateIndices.push(i); + } + } + + if (updateIndices.length > 0) { + const result = this.events[updateIndices[updateIndices.length - 1]]; + for (let i = updateIndices.length - 1; i >= 0; i--) { + this.events.splice(updateIndices[i], 1); + } + return result; + } + + // First event is a move with no preceding updates + this.events.shift(); + return first; + } + + private removeAllForDocument(documentId: DocumentId): void { + for (let i = this.events.length - 1; i >= 0; i--) { + const e = this.events[i]; + if (e.type !== "file-create" && e.documentId === documentId) { + this.events.splice(i, 1); + } + } + } +}