From 19d5dc19993d4f6ca292910bcc11df533d5efa30 Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Fri, 24 Apr 2026 20:56:03 +0100 Subject: [PATCH] . --- .../file-operations/file-operations.test.ts | 2 +- .../src/file-operations/file-operations.ts | 2 +- frontend/sync-client/src/sync-client.ts | 8 +- .../conflict-path.test.ts | 0 .../conflict-path.ts | 2 +- .../offline-change-detector.ts | 4 +- .../sync-operations/sync-event-queue.test.ts | 182 +++++----- .../src/sync-operations/sync-event-queue.ts | 158 +++++---- .../sync-client/src/sync-operations/syncer.ts | 316 +++++++++--------- .../sync-client/src/sync-operations/types.ts | 36 +- sync-server/src/app_state/websocket/models.rs | 3 + 11 files changed, 358 insertions(+), 355 deletions(-) rename frontend/sync-client/src/{utils => sync-operations}/conflict-path.test.ts (100%) rename frontend/sync-client/src/{utils => sync-operations}/conflict-path.ts (97%) diff --git a/frontend/sync-client/src/file-operations/file-operations.test.ts b/frontend/sync-client/src/file-operations/file-operations.test.ts index 78977b14..5a8f5af6 100644 --- a/frontend/sync-client/src/file-operations/file-operations.test.ts +++ b/frontend/sync-client/src/file-operations/file-operations.test.ts @@ -8,7 +8,7 @@ import { assertSetContainsExactly } from "../utils/assert-set-contains-exactly"; import type { FileSystemOperations } from "./filesystem-operations"; import type { TextWithCursors } from "reconcile-text"; import type { ServerConfig, ServerConfigData } from "../services/server-config"; -import { isConflictPath } from "../utils/conflict-path"; +import { isConflictPath } from "../sync-operations/conflict-path"; class MockServerConfig implements Pick { public async getConfig(): Promise { diff --git a/frontend/sync-client/src/file-operations/file-operations.ts b/frontend/sync-client/src/file-operations/file-operations.ts index 3b3d50c4..e2ffd4a5 100644 --- a/frontend/sync-client/src/file-operations/file-operations.ts +++ b/frontend/sync-client/src/file-operations/file-operations.ts @@ -7,7 +7,7 @@ import type { TextWithCursors } from "reconcile-text"; import { reconcile } from "reconcile-text"; import { isFileTypeMergable } from "../utils/is-file-type-mergable"; import { isBinary } from "../utils/is-binary"; -import { buildConflictFileName } from "../utils/conflict-path"; +import { buildConflictFileName } from "../sync-operations/conflict-path"; import type { ServerConfig } from "../services/server-config"; export class FileOperations { diff --git a/frontend/sync-client/src/sync-client.ts b/frontend/sync-client/src/sync-client.ts index ff1c3841..39b0f000 100644 --- a/frontend/sync-client/src/sync-client.ts +++ b/frontend/sync-client/src/sync-client.ts @@ -57,8 +57,8 @@ export class SyncClient { > ) { } - public get documentCount(): number { - return this.syncEventQueue.documentCount; + public get syncedDocumentCount(): number { + return this.syncEventQueue.syncedDocumentCount; } public get isWebSocketConnected(): boolean { @@ -390,7 +390,7 @@ export class SyncClient { public get hasPendingWork(): boolean { return ( - this.syncEventQueue.size > 0 || + this.syncEventQueue.pendingUpdateCount > 0 || this.webSocketManager.hasOutstandingWork ); } @@ -408,7 +408,7 @@ export class SyncClient { return DocumentSyncStatus.SYNCING; } - return this.syncer.hasPendingOperationsForDocument(relativePath) + return this.syncEventQueue.hasPendingEventsForPath(relativePath) ? DocumentSyncStatus.SYNCING : DocumentSyncStatus.UP_TO_DATE; } diff --git a/frontend/sync-client/src/utils/conflict-path.test.ts b/frontend/sync-client/src/sync-operations/conflict-path.test.ts similarity index 100% rename from frontend/sync-client/src/utils/conflict-path.test.ts rename to frontend/sync-client/src/sync-operations/conflict-path.test.ts diff --git a/frontend/sync-client/src/utils/conflict-path.ts b/frontend/sync-client/src/sync-operations/conflict-path.ts similarity index 97% rename from frontend/sync-client/src/utils/conflict-path.ts rename to frontend/sync-client/src/sync-operations/conflict-path.ts index 32c6591c..9e107b9a 100644 --- a/frontend/sync-client/src/utils/conflict-path.ts +++ b/frontend/sync-client/src/sync-operations/conflict-path.ts @@ -1,4 +1,4 @@ -import type { RelativePath } from "../sync-operations/types"; +import type { RelativePath } from "./types"; // Local-only files displaced by `FileOperations.ensureClearPath` are named // `conflict--`. The UUID is a full RFC-4122 v4 value so diff --git a/frontend/sync-client/src/sync-operations/offline-change-detector.ts b/frontend/sync-client/src/sync-operations/offline-change-detector.ts index e6bc2b51..c90f6a78 100644 --- a/frontend/sync-client/src/sync-operations/offline-change-detector.ts +++ b/frontend/sync-client/src/sync-operations/offline-change-detector.ts @@ -89,7 +89,7 @@ function enqueueRenamedDocuments( const hasLocalRename = remoteRelPath !== undefined && remoteRelPath !== path; if (hasLocalRename) { - queue.enqueue({ type: SyncEventType.SyncLocal, path }); + queue.enqueue({ type: SyncEventType.LocalUpdate, path }); locallyRenamedPaths.add(path); logger.debug(`Document ${path} was renamed locally (from ${remoteRelPath}), scheduling sync`); } @@ -243,5 +243,5 @@ async function handleNewFile( } logger.debug(`Document ${relativePath} not found in database, scheduling sync to create it`); - return { instruction: { type: SyncEventType.Create, relativePath } }; + return { instruction: { type: SyncEventType.LocalCreate, relativePath } }; } diff --git a/frontend/sync-client/src/sync-operations/sync-event-queue.test.ts b/frontend/sync-client/src/sync-operations/sync-event-queue.test.ts index aae010b6..a33aa258 100644 --- a/frontend/sync-client/src/sync-operations/sync-event-queue.test.ts +++ b/frontend/sync-client/src/sync-operations/sync-event-queue.test.ts @@ -38,13 +38,13 @@ describe("SyncEventQueue", () => { remoteHash: "hash-a" }); - queue.enqueue({ type: SyncEventType.SyncLocal, path: "a.md" }); - queue.enqueue({ type: SyncEventType.SyncLocal, path: "a.md" }); - queue.enqueue({ type: SyncEventType.Delete, path: "a.md" }); + queue.enqueue({ type: SyncEventType.LocalUpdate, path: "a.md" }); + queue.enqueue({ type: SyncEventType.LocalUpdate, path: "a.md" }); + queue.enqueue({ type: SyncEventType.LocalDelete, path: "a.md" }); const event = await queue.next(); - assert.strictEqual(event?.type, SyncEventType.Delete); - if (event?.type === SyncEventType.Delete) { + assert.strictEqual(event?.type, SyncEventType.LocalDelete); + if (event?.type === SyncEventType.LocalDelete) { assert.strictEqual(event.documentId, "A"); } assert.strictEqual(await queue.next(), undefined); @@ -58,34 +58,34 @@ describe("SyncEventQueue", () => { remoteHash: "hash-a" }); - queue.enqueue({ type: SyncEventType.SyncLocal, path: "a.md" }); - queue.enqueue({ type: SyncEventType.SyncLocal, path: "a.md" }); - queue.enqueue({ type: SyncEventType.SyncLocal, path: "a.md" }); + queue.enqueue({ type: SyncEventType.LocalUpdate, path: "a.md" }); + queue.enqueue({ type: SyncEventType.LocalUpdate, path: "a.md" }); + queue.enqueue({ type: SyncEventType.LocalUpdate, path: "a.md" }); const event = await queue.next(); - assert.strictEqual(event?.type, SyncEventType.SyncLocal); + assert.strictEqual(event?.type, SyncEventType.LocalUpdate); assert.strictEqual(await queue.next(), undefined); }); - it("sync-remote events for the same documentId coalesce to the last one", async () => { + it("sync-remote-content events for the same documentId coalesce to the last one", async () => { const queue = createQueue(); queue.enqueue({ - type: SyncEventType.SyncRemote, + type: SyncEventType.RemoteUpdate, remoteVersion: fakeRemoteVersion("A", { vaultUpdateId: 1 }) }); queue.enqueue({ - type: SyncEventType.SyncRemote, + type: SyncEventType.RemoteUpdate, remoteVersion: fakeRemoteVersion("A", { vaultUpdateId: 2 }) }); queue.enqueue({ - type: SyncEventType.SyncRemote, + type: SyncEventType.RemoteUpdate, remoteVersion: fakeRemoteVersion("A", { vaultUpdateId: 3 }) }); const event = await queue.next(); - assert.strictEqual(event?.type, SyncEventType.SyncRemote); - if (event?.type === SyncEventType.SyncRemote) { + assert.strictEqual(event?.type, SyncEventType.RemoteUpdate); + if (event?.type === SyncEventType.RemoteUpdate) { assert.strictEqual(event.remoteVersion.vaultUpdateId, 3); } assert.strictEqual(await queue.next(), undefined); @@ -93,18 +93,18 @@ describe("SyncEventQueue", () => { it("create events are returned FIFO", async () => { const queue = createQueue(); - queue.enqueue({ type: SyncEventType.Create, path: "a.md" }); - queue.enqueue({ type: SyncEventType.Create, path: "b.md" }); + queue.enqueue({ type: SyncEventType.LocalCreate, path: "a.md" }); + queue.enqueue({ type: SyncEventType.LocalCreate, path: "b.md" }); const first = await queue.next(); - assert.strictEqual(first?.type, SyncEventType.Create); - if (first?.type === SyncEventType.Create) { + assert.strictEqual(first?.type, SyncEventType.LocalCreate); + if (first?.type === SyncEventType.LocalCreate) { assert.strictEqual(first.path, "a.md"); } const second = await queue.next(); - assert.strictEqual(second?.type, SyncEventType.Create); - if (second?.type === SyncEventType.Create) { + assert.strictEqual(second?.type, SyncEventType.LocalCreate); + if (second?.type === SyncEventType.LocalCreate) { assert.strictEqual(second.path, "b.md"); } }); @@ -117,33 +117,33 @@ describe("SyncEventQueue", () => { remoteHash: "hash-a" }); - queue.enqueue({ type: SyncEventType.Delete, path: "a.md" }); + queue.enqueue({ type: SyncEventType.LocalDelete, path: "a.md" }); const event = await queue.next(); - assert.strictEqual(event?.type, SyncEventType.Delete); - if (event?.type === SyncEventType.Delete) { + assert.strictEqual(event?.type, SyncEventType.LocalDelete); + if (event?.type === SyncEventType.LocalDelete) { assert.strictEqual(event.documentId, "A"); } }); it("delete for unknown path is silently ignored", () => { const queue = createQueue(); - queue.enqueue({ type: SyncEventType.Delete, path: "unknown.md" }); - assert.strictEqual(queue.size, 0); + queue.enqueue({ type: SyncEventType.LocalDelete, path: "unknown.md" }); + assert.strictEqual(queue.pendingUpdateCount, 0); }); it("document store CRUD operations work correctly", () => { const queue = createQueue(); assert.strictEqual(queue.getSettledDocumentByPath("a.md"), undefined); - assert.strictEqual(queue.documentCount, 0); + assert.strictEqual(queue.syncedDocumentCount, 0); queue.setDocument("a.md", { documentId: "A", parentVersionId: 1, remoteHash: "hash-a" }); - assert.strictEqual(queue.documentCount, 1); + assert.strictEqual(queue.syncedDocumentCount, 1); assert.deepStrictEqual(queue.getSettledDocumentByPath("a.md"), { documentId: "A", parentVersionId: 1, @@ -155,7 +155,7 @@ describe("SyncEventQueue", () => { assert.strictEqual(found?.record.documentId, "A"); queue.removeDocument("a.md"); - assert.strictEqual(queue.documentCount, 0); + assert.strictEqual(queue.syncedDocumentCount, 0); assert.strictEqual(queue.getSettledDocumentByPath("a.md"), undefined); }); @@ -167,7 +167,7 @@ describe("SyncEventQueue", () => { remoteHash: "hash-a" }); - queue.enqueue({ type: SyncEventType.SyncLocal, path: "b.md", oldPath: "a.md" }); + queue.enqueue({ type: SyncEventType.LocalUpdate, path: "b.md", oldPath: "a.md" }); assert.strictEqual(queue.getSettledDocumentByPath("a.md"), undefined); assert.strictEqual(queue.getSettledDocumentByPath("b.md")?.documentId, "A"); }); @@ -185,29 +185,29 @@ describe("SyncEventQueue", () => { remoteHash: "hash-b" }); - queue.enqueue({ type: SyncEventType.SyncLocal, path: "a.md" }); - queue.enqueue({ type: SyncEventType.SyncLocal, path: "b.md" }); - queue.enqueue({ type: SyncEventType.Delete, path: "a.md" }); - queue.enqueue({ type: SyncEventType.SyncLocal, path: "b.md" }); + queue.enqueue({ type: SyncEventType.LocalUpdate, path: "a.md" }); + queue.enqueue({ type: SyncEventType.LocalUpdate, path: "b.md" }); + queue.enqueue({ type: SyncEventType.LocalDelete, path: "a.md" }); + queue.enqueue({ type: SyncEventType.LocalUpdate, path: "b.md" }); // First next() should see the delete for A (coalescing sync-local + delete) const first = await queue.next(); - assert.strictEqual(first?.type, SyncEventType.Delete); - if (first?.type === SyncEventType.Delete) { + assert.strictEqual(first?.type, SyncEventType.LocalDelete); + if (first?.type === SyncEventType.LocalDelete) { assert.strictEqual(first.documentId, "A"); } // Remaining should be the coalesced sync-local for B const second = await queue.next(); - assert.strictEqual(second?.type, SyncEventType.SyncLocal); - if (second?.type === SyncEventType.SyncLocal) { + assert.strictEqual(second?.type, SyncEventType.LocalUpdate); + if (second?.type === SyncEventType.LocalUpdate) { assert.strictEqual(second.documentId, "B"); } assert.strictEqual(await queue.next(), undefined); }); - it("delete discards subsequent sync-remote events for the same document", async () => { + it("delete discards subsequent sync-remote-content events for the same document", async () => { const queue = createQueue(); queue.setDocument("a.md", { documentId: "A", @@ -215,18 +215,18 @@ describe("SyncEventQueue", () => { remoteHash: "hash-a" }); - queue.enqueue({ type: SyncEventType.Delete, path: "a.md" }); + queue.enqueue({ type: SyncEventType.LocalDelete, path: "a.md" }); queue.enqueue({ - type: SyncEventType.SyncRemote, + type: SyncEventType.RemoteUpdate, remoteVersion: fakeRemoteVersion("A", { vaultUpdateId: 5 }) }); const event = await queue.next(); - assert.strictEqual(event?.type, SyncEventType.Delete); + assert.strictEqual(event?.type, SyncEventType.LocalDelete); assert.strictEqual(await queue.next(), undefined); }); - it("delete discards subsequent sync-local and sync-remote for the same document", async () => { + it("delete discards subsequent sync-local and sync-remote-content for the same document", async () => { const queue = createQueue(); queue.setDocument("a.md", { documentId: "A", @@ -234,20 +234,20 @@ describe("SyncEventQueue", () => { remoteHash: "hash-a" }); - queue.enqueue({ type: SyncEventType.Delete, path: "a.md" }); - queue.enqueue({ type: SyncEventType.SyncLocal, path: "a.md" }); - queue.enqueue({ type: SyncEventType.Create, path: "b.md" }); + queue.enqueue({ type: SyncEventType.LocalDelete, path: "a.md" }); + queue.enqueue({ type: SyncEventType.LocalUpdate, path: "a.md" }); + queue.enqueue({ type: SyncEventType.LocalCreate, path: "b.md" }); queue.enqueue({ - type: SyncEventType.SyncRemote, + type: SyncEventType.RemoteUpdate, remoteVersion: fakeRemoteVersion("A", { vaultUpdateId: 5 }) }); const first = await queue.next(); - assert.strictEqual(first?.type, SyncEventType.Delete); + assert.strictEqual(first?.type, SyncEventType.LocalDelete); // Only the unrelated create should remain const second = await queue.next(); - assert.strictEqual(second?.type, SyncEventType.Create); + assert.strictEqual(second?.type, SyncEventType.LocalCreate); assert.strictEqual(await queue.next(), undefined); }); @@ -260,30 +260,30 @@ describe("SyncEventQueue", () => { }); // Create is pending — Delete for same path gets a promise documentId - queue.enqueue({ type: SyncEventType.Create, path: "unknown.md" }); - queue.enqueue({ type: SyncEventType.Delete, path: "unknown.md" }); - queue.enqueue({ type: SyncEventType.SyncLocal, path: "a.md" }); + queue.enqueue({ type: SyncEventType.LocalCreate, path: "unknown.md" }); + queue.enqueue({ type: SyncEventType.LocalDelete, path: "unknown.md" }); + queue.enqueue({ type: SyncEventType.LocalUpdate, path: "a.md" }); // Dequeue and resolve the Create const event = await queue.next(); - assert.ok(event?.type === SyncEventType.Create); + assert.ok(event?.type === SyncEventType.LocalCreate); event.resolvers!.resolve("NEW"); await queue.next(); // delete const second = await queue.next(); - assert.strictEqual(second?.type, SyncEventType.SyncLocal); + assert.strictEqual(second?.type, SyncEventType.LocalUpdate); }); it("getCreatePromise returns a promise resolved by the event's resolvers", async () => { const queue = createQueue(); - queue.enqueue({ type: SyncEventType.Create, path: "a.md" }); + queue.enqueue({ type: SyncEventType.LocalCreate, path: "a.md" }); const promise = queue.getCreatePromise("a.md"); assert.ok(promise !== undefined); // The syncer resolves via event.resolvers after dequeuing const event = await queue.next(); - assert.ok(event?.type === SyncEventType.Create); + assert.ok(event?.type === SyncEventType.LocalCreate); assert.ok(event.resolvers !== undefined); event.resolvers.resolve("resolved-id"); @@ -292,13 +292,13 @@ describe("SyncEventQueue", () => { it("rejecting the event's resolvers rejects the create promise", async () => { const queue = createQueue(); - queue.enqueue({ type: SyncEventType.Create, path: "a.md" }); + queue.enqueue({ type: SyncEventType.LocalCreate, path: "a.md" }); const promise = queue.getCreatePromise("a.md"); assert.ok(promise !== undefined); const event = await queue.next(); - assert.ok(event?.type === SyncEventType.Create); + assert.ok(event?.type === SyncEventType.LocalCreate); assert.ok(event.resolvers !== undefined); event.resolvers.promise.catch(() => { }); event.resolvers.reject(new Error("cancelled")); @@ -308,8 +308,8 @@ describe("SyncEventQueue", () => { it("clear rejects all pending create promises", async () => { const queue = createQueue(); - queue.enqueue({ type: SyncEventType.Create, path: "a.md" }); - queue.enqueue({ type: SyncEventType.Create, path: "b.md" }); + queue.enqueue({ type: SyncEventType.LocalCreate, path: "a.md" }); + queue.enqueue({ type: SyncEventType.LocalCreate, path: "b.md" }); const promiseA = queue.getCreatePromise("a.md"); const promiseB = queue.getCreatePromise("b.md"); @@ -324,28 +324,28 @@ describe("SyncEventQueue", () => { it("create can be re-enqueued after being dequeued", async () => { const queue = createQueue(); - queue.enqueue({ type: SyncEventType.Create, path: "a.md" }); + queue.enqueue({ type: SyncEventType.LocalCreate, path: "a.md" }); await queue.next(); - queue.enqueue({ type: SyncEventType.Create, path: "a.md" }); - assert.strictEqual(queue.size, 1); + queue.enqueue({ type: SyncEventType.LocalCreate, path: "a.md" }); + assert.strictEqual(queue.pendingUpdateCount, 1); }); it("silently ignores create events matching ignore patterns", () => { const queue = createQueue(["*.tmp", ".hidden/**"]); - queue.enqueue({ type: SyncEventType.Create, path: "scratch.tmp" }); - queue.enqueue({ type: SyncEventType.Create, path: ".hidden/secret.md" }); - assert.strictEqual(queue.size, 0); + queue.enqueue({ type: SyncEventType.LocalCreate, path: "scratch.tmp" }); + queue.enqueue({ type: SyncEventType.LocalCreate, path: ".hidden/secret.md" }); + assert.strictEqual(queue.pendingUpdateCount, 0); - queue.enqueue({ type: SyncEventType.Create, path: "notes-new.md" }); - assert.strictEqual(queue.size, 1); + queue.enqueue({ type: SyncEventType.LocalCreate, path: "notes-new.md" }); + assert.strictEqual(queue.pendingUpdateCount, 1); queue.enqueue({ - type: SyncEventType.SyncRemote, + type: SyncEventType.RemoteUpdate, remoteVersion: fakeRemoteVersion("N") }); - assert.strictEqual(queue.size, 2); + assert.strictEqual(queue.pendingUpdateCount, 2); }); it("clear removes events but keeps documents", () => { @@ -355,15 +355,15 @@ describe("SyncEventQueue", () => { parentVersionId: 1, remoteHash: "hash-a" }); - queue.enqueue({ type: SyncEventType.Create, path: "b.md" }); - queue.enqueue({ type: SyncEventType.SyncLocal, path: "a.md" }); + queue.enqueue({ type: SyncEventType.LocalCreate, path: "b.md" }); + queue.enqueue({ type: SyncEventType.LocalUpdate, path: "a.md" }); - assert.strictEqual(queue.size, 2); + assert.strictEqual(queue.pendingUpdateCount, 2); queue.clear(); - assert.strictEqual(queue.size, 0); - assert.strictEqual(queue.documentCount, 1); + assert.strictEqual(queue.pendingUpdateCount, 0); + assert.strictEqual(queue.syncedDocumentCount, 1); assert.strictEqual(queue.getSettledDocumentByPath("a.md")?.documentId, "A"); }); @@ -407,7 +407,7 @@ describe("SyncEventQueue", () => { lastSeenUpdateId: 4 }, async () => { }); - assert.strictEqual(queue.documentCount, 2); + assert.strictEqual(queue.syncedDocumentCount, 2); assert.strictEqual(queue.getSettledDocumentByPath("a.md")?.documentId, "A"); assert.strictEqual(queue.getSettledDocumentByPath("b.md")?.documentId, "B"); assert.strictEqual(queue.lastSeenUpdateId, 5); @@ -427,9 +427,9 @@ describe("SyncEventQueue", () => { }); // Pending create adds a path - queue.enqueue({ type: SyncEventType.Create, path: "c.md" }); + queue.enqueue({ type: SyncEventType.LocalCreate, path: "c.md" }); // Pending delete removes a path - queue.enqueue({ type: SyncEventType.Delete, path: "a.md" }); + queue.enqueue({ type: SyncEventType.LocalDelete, path: "a.md" }); const paths = queue.trackedPaths(); assert.deepStrictEqual( @@ -441,10 +441,10 @@ describe("SyncEventQueue", () => { it("trackedPaths handles create-delete-create for the same path", () => { const queue = createQueue(); - queue.enqueue({ type: SyncEventType.Create, path: "a.md" }); + queue.enqueue({ type: SyncEventType.LocalCreate, path: "a.md" }); // Delete gets promise documentId from pending Create - queue.enqueue({ type: SyncEventType.Delete, path: "a.md" }); - queue.enqueue({ type: SyncEventType.Create, path: "a.md" }); + queue.enqueue({ type: SyncEventType.LocalDelete, path: "a.md" }); + queue.enqueue({ type: SyncEventType.LocalCreate, path: "a.md" }); const paths = queue.trackedPaths(); assert.ok(paths.has("a.md")); @@ -453,10 +453,10 @@ describe("SyncEventQueue", () => { it("trackedPaths applies moves for pending SyncLocal events", () => { const queue = createQueue(); - queue.enqueue({ type: SyncEventType.Create, path: "a.md" }); + queue.enqueue({ type: SyncEventType.LocalCreate, path: "a.md" }); // File was renamed from a.md to b.md - queue.enqueue({ type: SyncEventType.SyncLocal, path: "b.md", oldPath: "a.md" }); + queue.enqueue({ type: SyncEventType.LocalUpdate, path: "b.md", oldPath: "a.md" }); const paths = queue.trackedPaths(); assert.ok(!paths.has("a.md")); @@ -466,10 +466,10 @@ describe("SyncEventQueue", () => { it("trackedPaths tracks multiple moves for the same pending create", () => { const queue = createQueue(); - queue.enqueue({ type: SyncEventType.Create, path: "a.md" }); + queue.enqueue({ type: SyncEventType.LocalCreate, path: "a.md" }); - queue.enqueue({ type: SyncEventType.SyncLocal, path: "b.md", oldPath: "a.md" }); - queue.enqueue({ type: SyncEventType.SyncLocal, path: "c.md", oldPath: "b.md" }); + queue.enqueue({ type: SyncEventType.LocalUpdate, path: "b.md", oldPath: "a.md" }); + queue.enqueue({ type: SyncEventType.LocalUpdate, path: "c.md", oldPath: "b.md" }); const paths = queue.trackedPaths(); assert.ok(!paths.has("a.md")); @@ -480,15 +480,15 @@ describe("SyncEventQueue", () => { it("resolveCreate settles the document and replaces promise documentIds in the queue", async () => { const queue = createQueue(); - queue.enqueue({ type: SyncEventType.Create, path: "a.md" }); + queue.enqueue({ type: SyncEventType.LocalCreate, path: "a.md" }); const createPromise = queue.getCreatePromise("a.md")!; // Dependent events enqueued while create is still pending - queue.enqueue({ type: SyncEventType.SyncLocal, path: "a.md" }); - queue.enqueue({ type: SyncEventType.Delete, path: "a.md" }); + queue.enqueue({ type: SyncEventType.LocalUpdate, path: "a.md" }); + queue.enqueue({ type: SyncEventType.LocalDelete, path: "a.md" }); const event = await queue.next(); // dequeue the create - assert.ok(event?.type === SyncEventType.Create); + assert.ok(event?.type === SyncEventType.LocalCreate); queue.resolveCreate(event, { documentId: "DOC-1", @@ -506,7 +506,7 @@ describe("SyncEventQueue", () => { // The SyncLocal + Delete for "DOC-1" coalesce: sync-local is // discarded and the delete is returned (standard coalescing). const deleteEvt = await queue.next(); - assert.ok(deleteEvt?.type === SyncEventType.Delete); + assert.ok(deleteEvt?.type === SyncEventType.LocalDelete); assert.strictEqual(deleteEvt.documentId, "DOC-1"); assert.strictEqual(await queue.next(), undefined); diff --git a/frontend/sync-client/src/sync-operations/sync-event-queue.ts b/frontend/sync-client/src/sync-operations/sync-event-queue.ts index a49ce71f..8a19009a 100644 --- a/frontend/sync-client/src/sync-operations/sync-event-queue.ts +++ b/frontend/sync-client/src/sync-operations/sync-event-queue.ts @@ -1,7 +1,7 @@ import type { Settings } from "../persistence/settings"; import type { Logger } from "../tracing/logger"; import { globsToRegexes } from "../utils/globs-to-regexes"; -import { isConflictPath } from "../utils/conflict-path"; +import { isConflictPath } from "./conflict-path"; import { removeFromArray } from "../utils/remove-from-array"; import { SyncEventType, @@ -13,6 +13,10 @@ import { type SyncEvent, type VaultUpdateId, } from "./types"; +import { sleep } from "../utils/sleep"; + +export const SAVE_RETRY_BASE_DELAY_MS = 50; +export const SAVE_RETRY_MAX_ATTEMPTS = 3; export class SyncEventQueue { // Latest state of the filesystem as we know it, excluding @@ -27,7 +31,7 @@ export class SyncEventQueue { // can include multiple generations of the same document, // e.g.: a create, delete, create sequence for the same path. // - // The paths for the events must always correspond to the latest + // The paths within the events must always correspond to the latest // path on disk, so the path of each event may be updated multiple // times. // @@ -37,6 +41,11 @@ export class SyncEventQueue { // file creations for paths matching any of these patterns will be ignored private ignorePatterns: RegExp[]; + private savePending = false; + + + private lastSeenUpdateId: VaultUpdateId; + public constructor( private readonly settings: Settings, private readonly logger: Logger, @@ -62,29 +71,19 @@ export class SyncEventQueue { this.documents.set(relativePath, record); } } + this.lastSeenUpdateId = initialState.lastSeenUpdateId ?? -1; - this.logger.debug(`Loaded ${this.documents.size} documents`); + this.logger.debug(`Loaded ${this.documents.size} documents and lastSeenUpdateId=${this.lastSeenUpdateId} from storage`); } - public get size(): number { + public get pendingUpdateCount(): number { return this.events.length; } - public get documentCount(): number { + public get syncedDocumentCount(): number { return this.documents.size; } - public get lastSeenUpdateId(): VaultUpdateId { - let max = 0; - for (const record of this.documents.values()) { - if (record.parentVersionId > max) { - max = record.parentVersionId; - } - } - return max; - } - - // todo: let's remove public getSettledDocumentByPath(path: RelativePath): DocumentRecord | undefined { return this.documents.get(path); @@ -149,7 +148,7 @@ export class SyncEventQueue { this.documents.set(newPath, record); for (const e of this.events) { if ( - e.type === SyncEventType.SyncLocal && + e.type === SyncEventType.LocalUpdate && e.documentId === record.documentId ) { e.path = newPath; @@ -168,7 +167,7 @@ export class SyncEventQueue { * Call once a create has been acknowledged by the server. */ public resolveCreate( - event: Extract, + event: Extract, record: DocumentRecord ): void { const promise = event.resolvers?.promise; @@ -179,7 +178,7 @@ export class SyncEventQueue { if (promise !== undefined) { for (const e of this.events) { if ( - (e.type === SyncEventType.SyncLocal || e.type === SyncEventType.Delete) && + (e.type === SyncEventType.LocalUpdate || e.type === SyncEventType.LocalDelete) && e.documentId === promise ) { (e as { documentId: DocumentId | Promise }).documentId = record.documentId; @@ -211,12 +210,12 @@ export class SyncEventQueue { const pendingPaths = new Map, RelativePath>(); for (const event of this.events) { - if (event.type === SyncEventType.Create) { + if (event.type === SyncEventType.LocalCreate) { paths.add(event.path); if (event.resolvers !== undefined) { pendingPaths.set(event.resolvers.promise, event.path); } - } else if (event.type === SyncEventType.Delete) { + } else if (event.type === SyncEventType.LocalDelete) { if (typeof event.documentId === "string") { const path = this.getDocumentByDocumentId(event.documentId)?.path; if (path) { @@ -244,14 +243,16 @@ export class SyncEventQueue { const docId = record.documentId; return this.events.some( (e) => - (e.type === SyncEventType.Create && e.path === path) || - (e.type === SyncEventType.SyncLocal && + (e.type === SyncEventType.LocalCreate && e.path === path) || + (e.type === SyncEventType.LocalUpdate && e.documentId === docId) || - (e.type === SyncEventType.Delete && + (e.type === SyncEventType.LocalDelete && e.documentId === docId) || - (e.type === SyncEventType.SyncRemote && + (e.type === SyncEventType.RemoteUpdate && // we care about the local path not the remote - this.getDocumentByDocumentId(e.remoteVersion.documentId)?.path === path) + this.getDocumentByDocumentId(e.remoteVersion.documentId)?.path === path) || + (e.type === SyncEventType.RemotePathChange && + this.getDocumentByDocumentId(e.pathChange.documentId)?.path === path) ); } @@ -279,7 +280,10 @@ export class SyncEventQueue { } public enqueue(input: FileSyncEvent): void { - if (input.type === SyncEventType.SyncRemote) { + if ( + input.type === SyncEventType.RemoteUpdate || + input.type === SyncEventType.RemotePathChange + ) { this.events.push(input); return; } @@ -303,19 +307,19 @@ export class SyncEventQueue { return; } - if (input.type === SyncEventType.Create) { - this.events.push({ type: SyncEventType.Create, path, originalPath: path }); + if (input.type === SyncEventType.LocalCreate) { + this.events.push({ type: SyncEventType.LocalCreate, path, originalPath: path }); return; } - const lookupPath = (input.type === SyncEventType.SyncLocal && input.oldPath) ? input.oldPath : path; + const lookupPath = (input.type === SyncEventType.LocalUpdate && input.oldPath) ? input.oldPath : path; const record = this.documents.get(lookupPath); const documentId: DocumentId | Promise | undefined = record?.documentId ?? this.getCreatePromise(lookupPath); if (documentId === undefined) return; - if (input.type === SyncEventType.Delete) { - this.events.push({ type: SyncEventType.Delete, documentId }); + if (input.type === SyncEventType.LocalDelete) { + this.events.push({ type: SyncEventType.LocalDelete, documentId }); return; } @@ -324,7 +328,7 @@ export class SyncEventQueue { this.documents.delete(input.oldPath); this.documents.set(path, record!); for (const e of this.events) { - if (e.type === SyncEventType.SyncLocal && e.documentId === documentId) { + if (e.type === SyncEventType.LocalUpdate && e.documentId === documentId) { e.path = path; } } @@ -333,7 +337,7 @@ export class SyncEventQueue { this.updatePendingCreatePath(input.oldPath, path); } } - this.events.push({ type: SyncEventType.SyncLocal, documentId, path, originalPath: path }); + this.events.push({ type: SyncEventType.LocalUpdate, documentId, path, originalPath: path }); } @@ -344,7 +348,7 @@ export class SyncEventQueue { const [first] = this.events; // Creates are always returned immediately (FIFO) - if (first.type === SyncEventType.Create) { + if (first.type === SyncEventType.LocalCreate) { this.events.shift(); return first; } @@ -355,7 +359,7 @@ export class SyncEventQueue { // `Promise` (the originating Create hasn't landed // yet), awaiting it may reject — handle that: the Create was // cancelled, so the Delete has nothing to delete, just drop it. - if (first.type === SyncEventType.Delete) { + if (first.type === SyncEventType.LocalDelete) { this.events.shift(); const { documentId } = first; let resolvedId: DocumentId; @@ -371,14 +375,14 @@ export class SyncEventQueue { return first; } - if (first.type === SyncEventType.SyncLocal) { + if (first.type === SyncEventType.LocalUpdate) { const { documentId } = first; // If there's a later delete for the same documentId, discard // all sync-locals for that document and return the delete const deleteEvent = this.events.find( (e) => - e.type === SyncEventType.Delete && + e.type === SyncEventType.LocalDelete && e.documentId === documentId ); if (deleteEvent !== undefined) { @@ -399,7 +403,7 @@ export class SyncEventQueue { // original path to the last one const matching = this.events.filter( (e) => - e.type === SyncEventType.SyncLocal && + e.type === SyncEventType.LocalUpdate && e.documentId === documentId && e.originalPath === first.originalPath // can't coalesce moves as they can depend on each other so we have to sync them in the same order, could do topological sort but let's keep it simple for now ); @@ -410,12 +414,31 @@ export class SyncEventQueue { return result; } - // SyncRemote: coalesce multiple events for the same documentId to the last one - const { documentId } = first.remoteVersion; + // Coalesce multiple events of the same remote kind for the same + // documentId to the last one. Kinds are coalesced independently so + // that an interleaved content+path stream (e.g. VaultUpdate → + // PathChange) still preserves the VaultUpdate-before-PathChange + // ordering invariant the syncer relies on. + if (first.type === SyncEventType.RemoteUpdate) { + const { documentId } = first.remoteVersion; + const matching = this.events.filter( + (e) => + e.type === SyncEventType.RemoteUpdate && + e.remoteVersion.documentId === documentId + ); + const result = matching[matching.length - 1]; + for (const item of matching) { + removeFromArray(this.events, item); + } + return result; + } + + // SyncRemotePath + const { documentId } = first.pathChange; const matching = this.events.filter( (e) => - e.type === SyncEventType.SyncRemote && - e.remoteVersion.documentId === documentId + e.type === SyncEventType.RemotePathChange && + e.pathChange.documentId === documentId ); const result = matching[matching.length - 1]; for (const item of matching) { @@ -436,11 +459,13 @@ export class SyncEventQueue { for (let i = this.events.length - 1; i >= 0; i--) { const e = this.events[i]; if ( - (e.type === SyncEventType.SyncLocal && + (e.type === SyncEventType.LocalUpdate && e.documentId === documentId) || - (e.type === SyncEventType.SyncRemote && + (e.type === SyncEventType.RemoteUpdate && e.remoteVersion.documentId === documentId) || - (e.type === SyncEventType.Delete && + (e.type === SyncEventType.RemotePathChange && + e.pathChange.documentId === documentId) || + (e.type === SyncEventType.LocalDelete && e.documentId === documentId) ) { // eslint-disable-next-line no-restricted-syntax -- Bulk removal by predicate, not single-item removal @@ -462,7 +487,7 @@ export class SyncEventQueue { if (promise !== undefined) { for (const e of this.events) { if ( - e.type === SyncEventType.SyncLocal && + e.type === SyncEventType.LocalUpdate && e.documentId === promise ) { e.path = newPath; @@ -477,7 +502,7 @@ export class SyncEventQueue { for (let i = this.events.length - 1; i >= 0; i--) { const e = this.events[i]; if ( - e.type === SyncEventType.Create && + e.type === SyncEventType.LocalCreate && e.resolvers?.promise === promise ) { return e.path; @@ -488,10 +513,10 @@ export class SyncEventQueue { private findLastCreate( path: RelativePath - ): Extract | undefined { + ): Extract | undefined { for (let i = this.events.length - 1; i >= 0; i--) { const e = this.events[i]; - if (e.type === SyncEventType.Create && e.path === path) { + if (e.type === SyncEventType.LocalCreate && e.path === path) { return e; } } @@ -535,52 +560,23 @@ export class SyncEventQueue { private rejectAllPendingCreates(): void { for (const event of this.events) { - if (event.type === SyncEventType.Create && event.resolvers !== undefined) { + if (event.type === SyncEventType.LocalCreate && event.resolvers !== undefined) { event.resolvers.promise.catch(() => { /* suppressed — consumer may not be listening */ }); event.resolvers.reject(new Error("Create was cancelled")); } } } - private savePending = false; // Coalesce bursts of mutations into one persist per microtask. A drain // iteration can easily produce 10+ mutations; without this, we'd fire // 10 overlapping `save()` calls racing on the persistence backend. - // - // On failure, retry with bounded exponential backoff instead of - // silently dropping the write — otherwise a transient IDB/fs error - // leaves the in-memory state permanently diverged from persisted state - // and the user loses queue progress on restart. private saveInTheBackground(): void { if (this.savePending) return; this.savePending = true; queueMicrotask(() => { this.savePending = false; - void this.saveWithRetry(); + this.save(); }); } - - private async saveWithRetry(): Promise { - const maxAttempts = 3; - for (let attempt = 1; attempt <= maxAttempts; attempt++) { - try { - await this.save(); - return; - } catch (error) { - if (attempt === maxAttempts) { - this.logger.error( - `Error saving sync state after ${maxAttempts} attempts: ${error}` - ); - return; - } - this.logger.warn( - `Error saving sync state (attempt ${attempt}/${maxAttempts}): ${error}; retrying` - ); - await new Promise((resolve) => - setTimeout(resolve, 50 * attempt) - ); - } - } - } } diff --git a/frontend/sync-client/src/sync-operations/syncer.ts b/frontend/sync-client/src/sync-operations/syncer.ts index 4e51976d..202499f8 100644 --- a/frontend/sync-client/src/sync-operations/syncer.ts +++ b/frontend/sync-client/src/sync-operations/syncer.ts @@ -67,27 +67,6 @@ export class Syncer { this.webSocketManager.onWebSocketStatusChanged.add((isConnected) => { if (isConnected) { this.sendHandshakeMessage(); - } else { - // Don't null the reference synchronously — if the scan is - // still in flight, the next reconnect would spawn a second - // concurrent scan racing on the same queue. Defer the - // clear until the in-flight task actually resolves, so a - // fresh scan can only start once the prior one is done. - const current = this.runningScheduleSyncForOfflineChanges; - if (current === undefined) return; - current - .catch(() => { - /* swallow — internal error already logged */ - }) - .finally(() => { - if ( - this.runningScheduleSyncForOfflineChanges === - current - ) { - this.runningScheduleSyncForOfflineChanges = - undefined; - } - }); } }); this.webSocketManager.onRemoteVaultUpdateReceived.add( @@ -102,20 +81,8 @@ export class Syncer { return this._isFirstSyncComplete; } - public hasPendingOperationsForDocument(relativePath: string): boolean { - return this.queue.hasPendingEventsForPath(relativePath); - } - public syncLocallyCreatedFile(relativePath: RelativePath): void { - this.queue.enqueue({ type: SyncEventType.Create, path: relativePath }); - this.ensureDraining(); - } - - public syncLocallyDeletedFile(relativePath: RelativePath): void { - this.queue.enqueue({ - type: SyncEventType.Delete, - path: relativePath, - }); + this.queue.enqueue({ type: SyncEventType.LocalCreate, path: relativePath }); this.ensureDraining(); } @@ -126,10 +93,78 @@ export class Syncer { oldPath?: RelativePath; relativePath: RelativePath; }): void { - this.queue.enqueue({ type: SyncEventType.SyncLocal, path: relativePath, oldPath }); + this.queue.enqueue({ type: SyncEventType.LocalUpdate, path: relativePath, oldPath }); this.ensureDraining(); } + public syncLocallyDeletedFile(relativePath: RelativePath): void { + this.queue.enqueue({ + type: SyncEventType.LocalDelete, + path: relativePath, + }); + this.ensureDraining(); + } + + + public async syncRemotelyUpdatedFile( + message: WebSocketVaultUpdate + ): Promise { + await this.scheduleSyncForOfflineChanges(); + + for (const remoteVersion of message.documents) { + this.queue.enqueue({ + type: SyncEventType.RemoteUpdate, + remoteVersion + }); + } + + if (message.isInitialSync) { + this._isFirstSyncComplete = true; + } + + this.ensureDraining(); + + } + + // A PathChange notifies us that a document now lives at a new server- + // canonical path. It's delivered to every client (origin included) + // because the create/update HTTP response no longer carries the path, + // so the only way the origin learns about dedupe or first-rename-wins + // is via this event. + // + // Algorithmic assumptions: + // (1) Per-vault broadcast ordering is preserved by the server, so if + // the same write produced a `VaultUpdate` (content change) and a + // `PathChange` (path change), the `VaultUpdate` is handled first + // — that's what lets us skip advancing `parentVersionId` here + // without risking a stuck "already up-to-date" check later. + // (2) On a lag-induced disconnect (`broadcast::error::Lagged`) the + // server disconnects the client for a full resync, so out-of- + // order delivery across a reconnect boundary can't leave us with + // a stale PathChange overwriting a newer one. + public async syncRemotelyChangedPath( + pathChange: WebSocketVaultPathChange + ): Promise { + try { + await this.scheduleSyncForOfflineChanges(); + + this.queue.enqueue({ + type: SyncEventType.RemotePathChange, + pathChange + }); + + await this.scheduleDrain(); + } catch (e) { + if (e instanceof SyncResetError) { + this.logger.info( + "Failed to apply remote path change due to a reset" + ); + return; + } + this.logger.error(`Failed to apply remote path change: ${e}`); + } + } + public async scheduleSyncForOfflineChanges(): Promise { if (this.runningScheduleSyncForOfflineChanges !== undefined) { this.logger.debug("Uploading local changes is already in progress"); @@ -167,114 +202,27 @@ export class Syncer { } } - public async syncRemotelyUpdatedFile( - message: WebSocketVaultUpdate - ): Promise { - try { - await this.scheduleSyncForOfflineChanges(); - - for (const remoteVersion of message.documents) { - this.queue.enqueue({ - type: SyncEventType.SyncRemote, - remoteVersion - }); - } - - if (message.isInitialSync) { - this._isFirstSyncComplete = true; - } - - await this.scheduleDrain(); - } catch (e) { - if (e instanceof SyncResetError) { - this.logger.info( - "Failed to sync remotely updated file due to a reset" - ); - return; - } - this.logger.error(`Failed to sync remotely updated file: ${e}`); - } - } - - // A PathChange notifies us that a document now lives at a new server- - // canonical path. It's delivered to every client (origin included) - // because the create/update HTTP response no longer carries the path, - // so the only way the origin learns about dedupe or first-rename-wins - // is via this event. - // - // Algorithmic assumptions: - // (1) Per-vault broadcast ordering is preserved by the server, so if - // the same write produced a `VaultUpdate` (content change) and a - // `PathChange` (path change), the `VaultUpdate` is handled first - // — that's what lets us skip advancing `parentVersionId` here - // without risking a stuck "already up-to-date" check later. - // (2) On a lag-induced disconnect (`broadcast::error::Lagged`) the - // server disconnects the client for a full resync, so out-of- - // order delivery across a reconnect boundary can't leave us with - // a stale PathChange overwriting a newer one. - public async syncRemotelyChangedPath( - pathChange: WebSocketVaultPathChange - ): Promise { - // Serialize onto the drain chain so this handler can't race against - // an in-flight `processSyncRemote` / `processSyncLocal` etc. that - // captured the old path before our move. - try { - await this.chainOntoDrain(async () => { - const existing = this.queue.getDocumentByDocumentId( - pathChange.documentId - ); - if (existing === undefined) { - throw new Error( - `Received path change for unknown document ${pathChange.documentId}` - ); - } - - const { path: currentPath, record } = existing; - const newPath = pathChange.relativePath; - - if (currentPath !== newPath) { - await this.operations.move(currentPath, newPath); - - this.history.addHistoryEntry({ - status: SyncStatus.SUCCESS, - details: { - type: SyncType.MOVE, - relativePath: newPath, - movedFrom: currentPath - }, - message: "Applied remote path change", - author: pathChange.userId, - timestamp: new Date(pathChange.updatedDate) - }); - } - - // `operations.move` updates the queue's path index, but - // doesn't touch `remoteRelativePath`. Refresh it so offline - // change detection compares against the server's path. - // parentVersionId intentionally stays at its prior value: - // if the write also changed content, the corresponding - // VaultUpdate handles that; advancing it here would make us - // skip fetching content we don't yet have. - this.queue.setDocument(newPath, { - ...record, - remoteRelativePath: newPath - }); - }); - } catch (e) { - if (e instanceof SyncResetError) { - this.logger.info( - "Failed to apply remote path change due to a reset" - ); - return; - } - this.logger.error(`Failed to apply remote path change: ${e}`); - } - } public reset(): void { this._isFirstSyncComplete = false; this.queue.clear(); - this.runningScheduleSyncForOfflineChanges = undefined; + // Don't null the reference synchronously — if the scan is + // still in flight, the next reconnect would spawn a second + // concurrent scan racing on the same queue. Defer the + // clear until the in-flight task actually resolves, so a + // fresh scan can only start once the prior one is done. + const current = this.runningScheduleSyncForOfflineChanges; + if (current !== undefined) { + current.finally(() => { + if ( + this.runningScheduleSyncForOfflineChanges === + current + ) { + this.runningScheduleSyncForOfflineChanges = + undefined; + } + }); + } // Do not set this.draining = undefined — the in-flight drain will // exit naturally (SyncResetError or empty queue) and the promise // chain stays intact, preventing concurrent drain invocations @@ -372,17 +320,20 @@ export class Syncer { try { switch (event.type) { - case SyncEventType.Create: + case SyncEventType.LocalCreate: await this.processCreate(event); break; - case SyncEventType.Delete: + case SyncEventType.LocalDelete: await this.processDelete(event); break; - case SyncEventType.SyncLocal: + case SyncEventType.LocalUpdate: await this.processSyncLocal(event); break; - case SyncEventType.SyncRemote: - await this.processSyncRemote(event); + case SyncEventType.RemoteUpdate: + await this.processSyncRemoteContent(event); + break; + case SyncEventType.RemotePathChange: + await this.processSyncRemotePath(event); break; } } catch (e) { @@ -390,7 +341,7 @@ export class Syncer { this.logger.info( `Skipping sync event '${event.type}' because the file no longer exists` ); - if (event.type === SyncEventType.Create) { + if (event.type === SyncEventType.LocalCreate) { event.resolvers?.promise.catch(() => { }); event.resolvers?.reject(new Error("Create was cancelled")); } @@ -404,7 +355,7 @@ export class Syncer { // `processEvent` ran; if it was a Create, its resolver // promise would otherwise hang forever, blocking any // queued Delete / SyncLocal that `await`s it. - if (event.type === SyncEventType.Create) { + if (event.type === SyncEventType.LocalCreate) { event.resolvers?.promise.catch(() => { /* suppressed */ }); @@ -423,7 +374,7 @@ export class Syncer { private async processCreate( - event: Extract + event: Extract ): Promise { const effectivePath = event.path; const contentBytes = await this.operations.read(effectivePath); @@ -487,7 +438,7 @@ export class Syncer { } private async processDelete( - event: Extract + event: Extract ): Promise { let documentId: DocumentId; if (typeof event.documentId === "string") { @@ -531,7 +482,7 @@ export class Syncer { } private async processSyncLocal( - event: Extract + event: Extract ): Promise { let documentId: DocumentId; if (typeof event.documentId === "string") { @@ -606,8 +557,8 @@ export class Syncer { }); } - private async processSyncRemote( - event: Extract + private async processSyncRemoteContent( + event: Extract ): Promise { const { remoteVersion } = event; const existingDoc = this.queue.getDocumentByDocumentId( @@ -643,6 +594,51 @@ export class Syncer { await this.processRemoteUpdateForNewDocument(remoteVersion); } + private async processSyncRemotePath( + event: Extract + ): Promise { + const { pathChange } = event; + const existing = this.queue.getDocumentByDocumentId( + pathChange.documentId + ); + if (existing === undefined) { + throw new Error( + `Received path change for unknown document ${pathChange.documentId}` + ); + } + + const { path: currentPath, record } = existing; + const newPath = pathChange.relativePath; + + if (currentPath !== newPath) { + await this.operations.move(currentPath, newPath); + + this.history.addHistoryEntry({ + status: SyncStatus.SUCCESS, + details: { + type: SyncType.MOVE, + relativePath: newPath, + movedFrom: currentPath + }, + message: "Applied remote path change", + author: pathChange.userId, + timestamp: new Date(pathChange.updatedDate) + }); + } + + // `operations.move` updates the queue's path index, but doesn't + // touch `remoteRelativePath`. Refresh it so offline change + // detection compares against the server's path. parentVersionId + // intentionally stays at its prior value: if the write also + // changed content, the corresponding VaultUpdate handles that; + // advancing it here would make us skip fetching content we don't + // yet have. + this.queue.setDocument(newPath, { + ...record, + remoteRelativePath: newPath + }); + } + private async processRemoteUpdateForExistingDocument( currentPath: RelativePath, record: DocumentRecord, @@ -793,14 +789,14 @@ export class Syncer { details: targetPath !== currentPath ? { - type: SyncType.MOVE, - relativePath: targetPath, - movedFrom: currentPath - } + type: SyncType.MOVE, + relativePath: targetPath, + movedFrom: currentPath + } : { - type: SyncType.UPDATE, - relativePath: targetPath - }, + type: SyncType.UPDATE, + relativePath: targetPath + }, message: "Successfully downloaded remotely updated file from the server", author: fullVersion.userId, @@ -1063,7 +1059,7 @@ export class Syncer { // `resolvers` promise can be fulfilled (or rejected, on a deleted // response). Dependent SyncLocal/Delete events are chained through // that promise and would otherwise `await` forever. - createEvent?: Extract; + createEvent?: Extract; }): Promise { if (response.isDeleted) { // A Create that the server returned as already-deleted means @@ -1222,7 +1218,7 @@ export class Syncer { } private notifyRemainingOperationsChanged(): void { - const currentCount = this.queue.size; + const currentCount = this.queue.pendingUpdateCount; if (this.previousRemainingOperationsCount !== currentCount) { this.previousRemainingOperationsCount = currentCount; this.onRemainingOperationsCountChanged.trigger(currentCount); diff --git a/frontend/sync-client/src/sync-operations/types.ts b/frontend/sync-client/src/sync-operations/types.ts index 4db503c4..22b82b3e 100644 --- a/frontend/sync-client/src/sync-operations/types.ts +++ b/frontend/sync-client/src/sync-operations/types.ts @@ -1,4 +1,5 @@ import type { DocumentVersionWithoutContent } from "../services/types/DocumentVersionWithoutContent"; +import type { WebSocketVaultPathChange } from "../services/types/WebSocketVaultPathChange"; export type VaultUpdateId = number; export type DocumentId = string; @@ -21,36 +22,43 @@ export interface StoredSyncState { } export enum SyncEventType { - Create = "create", - SyncLocal = "sync-local", - Delete = "delete", - SyncRemote = "sync-remote", + LocalCreate = "local-create", + LocalUpdate = "local-update", // includes both content and path changes + LocalDelete = "local-delete", + RemoteUpdate = "remote-update", + RemotePathChange = "remote-path-change", } export type FileSyncEvent = - | { type: SyncEventType.Create; path: RelativePath } - | { type: SyncEventType.SyncLocal; path: RelativePath; oldPath?: RelativePath } - | { type: SyncEventType.Delete; path: RelativePath } - | { type: SyncEventType.SyncRemote; remoteVersion: DocumentVersionWithoutContent }; + | { type: SyncEventType.LocalCreate; path: RelativePath } + | { type: SyncEventType.LocalUpdate; path: RelativePath; oldPath?: RelativePath } + | { type: SyncEventType.LocalDelete; path: RelativePath } + | { type: SyncEventType.RemoteUpdate; remoteVersion: DocumentVersionWithoutContent } + | { type: SyncEventType.RemotePathChange; pathChange: WebSocketVaultPathChange }; export type SyncEvent = | { - type: SyncEventType.Create; + type: SyncEventType.LocalCreate; path: RelativePath; // current path on disk - originalPath: RelativePath; // original path on disk when the event was created + originalPath: RelativePath; // original path on disk when the event was queued resolvers?: PromiseWithResolvers } | { - type: SyncEventType.SyncLocal; + type: SyncEventType.LocalUpdate; documentId: DocumentId | Promise; // if it's a promise, the promise is fulfilled once the document's create event is processed path: RelativePath; // current path on disk - originalPath: RelativePath; // original path on disk when the event was created + originalPath: RelativePath; // original path on disk when the event was queued + // no need to store the old path in case of a rename; the server will figure it out from the parent's path } | { - type: SyncEventType.Delete; + type: SyncEventType.LocalDelete; documentId: DocumentId | Promise; // if it's a promise, the promise is fulfilled once the document's create event is processed } | { - type: SyncEventType.SyncRemote; + type: SyncEventType.RemoteUpdate; remoteVersion: DocumentVersionWithoutContent; + } + | { + type: SyncEventType.RemotePathChange; + pathChange: WebSocketVaultPathChange; }; diff --git a/sync-server/src/app_state/websocket/models.rs b/sync-server/src/app_state/websocket/models.rs index 9ed52400..73e81f26 100644 --- a/sync-server/src/app_state/websocket/models.rs +++ b/sync-server/src/app_state/websocket/models.rs @@ -58,6 +58,7 @@ pub struct CursorPositionFromServer { pub clients: Vec, } +// Clients only get notified of other clients' updates through WebSocketVaultUpdate. #[derive(TS, Serialize, Clone, Debug)] #[serde(rename_all = "camelCase")] pub struct WebSocketVaultUpdate { @@ -65,6 +66,8 @@ pub struct WebSocketVaultUpdate { pub is_initial_sync: bool, } +// Clients get notified of both their own and other clients' path changes through WebSocketVaultPathChange. +// This is becuase we must absolutely order path updates as they may all depend on all previous updates. #[derive(TS, Serialize, Clone, Debug)] #[serde(rename_all = "camelCase")] pub struct WebSocketVaultPathChange {