diff --git a/frontend/sync-client/src/sync-operations/sync-event-queue.test.ts b/frontend/sync-client/src/sync-operations/sync-event-queue.test.ts index db6c9a19..23f31891 100644 --- a/frontend/sync-client/src/sync-operations/sync-event-queue.test.ts +++ b/frontend/sync-client/src/sync-operations/sync-event-queue.test.ts @@ -71,21 +71,21 @@ describe("SyncEventQueue", () => { const queue = createQueue(); queue.enqueue({ - type: SyncEventType.RemoteUpdate, + type: SyncEventType.RemoteChange, remoteVersion: fakeRemoteVersion("A", { vaultUpdateId: 1 }) }); queue.enqueue({ - type: SyncEventType.RemoteUpdate, + type: SyncEventType.RemoteChange, remoteVersion: fakeRemoteVersion("A", { vaultUpdateId: 2 }) }); queue.enqueue({ - type: SyncEventType.RemoteUpdate, + type: SyncEventType.RemoteChange, remoteVersion: fakeRemoteVersion("A", { vaultUpdateId: 3 }) }); const event = await queue.next(); - assert.strictEqual(event?.type, SyncEventType.RemoteUpdate); - if (event?.type === SyncEventType.RemoteUpdate) { + assert.strictEqual(event?.type, SyncEventType.RemoteChange); + if (event?.type === SyncEventType.RemoteChange) { assert.strictEqual(event.remoteVersion.vaultUpdateId, 3); } assert.strictEqual(await queue.next(), undefined); @@ -217,7 +217,7 @@ describe("SyncEventQueue", () => { queue.enqueue({ type: SyncEventType.LocalDelete, path: "a.md" }); queue.enqueue({ - type: SyncEventType.RemoteUpdate, + type: SyncEventType.RemoteChange, remoteVersion: fakeRemoteVersion("A", { vaultUpdateId: 5 }) }); @@ -238,7 +238,7 @@ describe("SyncEventQueue", () => { queue.enqueue({ type: SyncEventType.LocalUpdate, path: "a.md" }); queue.enqueue({ type: SyncEventType.LocalCreate, path: "b.md" }); queue.enqueue({ - type: SyncEventType.RemoteUpdate, + type: SyncEventType.RemoteChange, remoteVersion: fakeRemoteVersion("A", { vaultUpdateId: 5 }) }); @@ -342,7 +342,7 @@ describe("SyncEventQueue", () => { assert.strictEqual(queue.pendingUpdateCount, 1); queue.enqueue({ - type: SyncEventType.RemoteUpdate, + type: SyncEventType.RemoteChange, remoteVersion: fakeRemoteVersion("N") }); assert.strictEqual(queue.pendingUpdateCount, 2); diff --git a/frontend/sync-client/src/sync-operations/sync-event-queue.ts b/frontend/sync-client/src/sync-operations/sync-event-queue.ts index 7697ac9c..401541b1 100644 --- a/frontend/sync-client/src/sync-operations/sync-event-queue.ts +++ b/frontend/sync-client/src/sync-operations/sync-event-queue.ts @@ -13,10 +13,7 @@ import { type SyncEvent, type VaultUpdateId, } from "./types"; -import { sleep } from "../utils/sleep"; -export const SAVE_RETRY_BASE_DELAY_MS = 50; -export const SAVE_RETRY_MAX_ATTEMPTS = 3; export class SyncEventQueue { // Latest state of the filesystem as we know it, excluding @@ -88,14 +85,14 @@ export class SyncEventQueue { } public enqueue(input: FileSyncEvent): void { - if (input.type === SyncEventType.RemoteUpdate) { + if (input.type === SyncEventType.RemoteChange) { this.events.push(input); return; } const { path } = input; - if (this.isIgnored(path)) { + if (this.ignorePatterns.some((pattern) => pattern.test(path))) { this.logger.info( `Ignoring ${input.type} for ${path} as it matches ignore patterns` ); @@ -103,15 +100,20 @@ export class SyncEventQueue { } if (input.type === SyncEventType.LocalCreate) { - this.events.push({ type: SyncEventType.LocalCreate, path, originalPath: path }); + this.events.push({ type: SyncEventType.LocalCreate, path, originalPath: path, resolvers: Promise.withResolvers() }); return; } const lookupPath = (input.type === SyncEventType.LocalUpdate && input.oldPath) ? input.oldPath : path; const record = this.documents.get(lookupPath); const documentId: DocumentId | Promise | undefined = - this.getLatestCreatePromise(lookupPath) ?? record?.documentId; - if (documentId === undefined) return; + this.findLatestCreateForPath(lookupPath)?.resolvers.promise ?? record?.documentId; + + if (documentId === undefined) { + // we can get here when deleting a local document after a remote update + + return; + } if (input.type === SyncEventType.LocalDelete) { this.events.push({ type: SyncEventType.LocalDelete, documentId }); @@ -146,29 +148,56 @@ export class SyncEventQueue { /** * Call once a create has been acknowledged by the server. */ - public resolveCreate( + public async resolveCreate( event: Extract, record: DocumentRecord - ): void { - const promise = event.resolvers?.promise; - - this.documents.set(event.path, record); + ): Promise { + removeFromArray(this.events, event); // in case the create event is still pending + await this.setDocument(event.path, record); event.resolvers?.resolve(record.documentId); + } - if (promise !== undefined) { - for (const e of this.events) { - if ( - (e.type === SyncEventType.LocalUpdate || e.type === SyncEventType.LocalDelete) && - e.documentId === promise - ) { - (e as { documentId: DocumentId | Promise }).documentId = record.documentId; - } + /** + * Update the settled document map and persist the new document version. + */ + public setDocument(path: RelativePath, record: DocumentRecord): Promise { + this.documents.set(path, record); + return this.save(); + + } + + public removeDocument(path: RelativePath): Promise { + this.documents.delete(path); + return this.save(); + } + + public getDocumentByDocumentId( + target: DocumentId + ): { path: RelativePath; record: DocumentRecord } | undefined { + for (const [path, record] of this.documents) { + if (record.documentId === target) { + return { path, record }; } } - - this.saveInTheBackground(); + return undefined; } + + + public getDocumentByDocumentIdOrFail( + target: DocumentId + ): { path: RelativePath; record: DocumentRecord } { + const result = this.getDocumentByDocumentId(target); + if (!result) { + throw new Error(`No document found with id ${target}`); + } + return result; + } + + + + + public async save(): Promise { return this.saveData({ documents: Array.from(this.documents.entries()).map( @@ -186,35 +215,9 @@ export class SyncEventQueue { return this.documents.get(path); } - public getDocumentByDocumentId( - target: DocumentId - ): { path: RelativePath; record: DocumentRecord } | undefined { - for (const [path, record] of this.documents) { - if (record.documentId === target) { - return { path, record }; - } - } - return undefined; - } - - public setDocument(path: RelativePath, record: DocumentRecord): void { - this.documents.set(path, record); - this.saveInTheBackground(); - } - - public removeDocument(path: RelativePath): void { - this.documents.delete(path); - this.saveInTheBackground(); - } - public getLatestCreatePromise(path: RelativePath): Promise | undefined { - const event = this.findLatestCreate(path); - if (event === undefined) return undefined; - event.resolvers ??= Promise.withResolvers(); - return event.resolvers.promise; - } public allSettledDocuments(): [RelativePath, DocumentRecord][] { return Array.from(this.documents.entries()); @@ -257,7 +260,7 @@ export class SyncEventQueue { public hasPendingEventsForPath(path: RelativePath): boolean { const record = this.documents.get(path); - if (!record) { + if (record === undefined) { return true; // if we don't know about this path, it must be pending creation } const docId = record.documentId; @@ -268,12 +271,22 @@ export class SyncEventQueue { e.documentId === docId) || (e.type === SyncEventType.LocalDelete && e.documentId === docId) || - (e.type === SyncEventType.RemoteUpdate && + (e.type === SyncEventType.RemoteChange && // we care about the local path not the remote this.getDocumentByDocumentId(e.remoteVersion.documentId)?.path === path) ); } + public hasPendingLocalEventsForDocumentId(documentId: DocumentId): boolean { + return this.events.some( + (e) => + (e.type === SyncEventType.LocalUpdate && + e.documentId === documentId) || + (e.type === SyncEventType.LocalDelete && + e.documentId === documentId) + ); + } + public resetState(): void { this.rejectAllPendingCreates(); @@ -288,18 +301,13 @@ export class SyncEventQueue { - - private isIgnored(path: RelativePath): boolean { - return this.ignorePatterns.some((pattern) => pattern.test(path)); - } - public removeAllEventsForDocumentId(documentId: DocumentId): void { for (let i = this.events.length - 1; i >= 0; i--) { const e = this.events[i]; if ( (e.type === SyncEventType.LocalUpdate && e.documentId === documentId) || - (e.type === SyncEventType.RemoteUpdate && + (e.type === SyncEventType.RemoteChange && e.remoteVersion.documentId === documentId) || (e.type === SyncEventType.LocalDelete && e.documentId === documentId) @@ -310,11 +318,11 @@ export class SyncEventQueue { } } - public updatePendingCreatePath( + private updatePendingCreatePath( oldPath: RelativePath, newPath: RelativePath ): void { - const createEvent = this.findLatestCreate(oldPath); + const createEvent = this.findLatestCreateForPath(oldPath); if (createEvent === undefined) return; const promise = createEvent.resolvers?.promise; @@ -332,7 +340,7 @@ export class SyncEventQueue { } } - private findLatestCreate( + public findLatestCreateForPath( path: RelativePath ): Extract | undefined { for (let i = this.events.length - 1; i >= 0; i--) { @@ -344,40 +352,9 @@ export class SyncEventQueue { return undefined; } - /** - * Returns whether there is an unsynced Create event queued at `path`. - * A caller uses this to decide between displacing the local file vs. - * merging it with a concurrent remote create. - */ - public hasPendingCreateAt(path: RelativePath): boolean { - return this.findLatestCreate(path) !== undefined; - } - /** - * Cancel the latest queued Create for `path`. Rejects its resolver - * promise (so any dependent SyncLocal/Delete events that `await`ed - * the future documentId skip themselves gracefully) and removes the - * Create event from the queue. Returns true if a Create was found - * and cancelled. - */ - public cancelPendingCreate(path: RelativePath): boolean { - const event = this.findLatestCreate(path); - if (event === undefined) return false; - if (event.resolvers !== undefined) { - event.resolvers.promise.catch(() => { - /* suppressed — consumer may not be listening */ - }); - event.resolvers.reject( - new Error( - "Create was cancelled — merged with concurrent remote create" - ) - ); - } - removeFromArray(this.events, event); - return true; - } private rejectAllPendingCreates(): void { for (const event of this.events) { diff --git a/frontend/sync-client/src/sync-operations/syncer.ts b/frontend/sync-client/src/sync-operations/syncer.ts index 6e123edc..3dbcd6cf 100644 --- a/frontend/sync-client/src/sync-operations/syncer.ts +++ b/frontend/sync-client/src/sync-operations/syncer.ts @@ -46,7 +46,8 @@ export class Syncer { private _isFirstSyncStarted = false; private runningScheduleSyncForOfflineChanges: Promise | undefined; - private draining: Promise | undefined; + private drainPromise: Promise | undefined; + private isScanning = false; private previousRemainingOperationsCount = 0; public constructor( @@ -101,15 +102,13 @@ export class Syncer { this.ensureDraining(); } - - public async syncRemotelyUpdatedFile( message: WebSocketVaultUpdate ): Promise { await this.scheduleSyncForOfflineChanges(); this.queue.enqueue({ - type: SyncEventType.RemoteUpdate, + type: SyncEventType.RemoteChange, remoteVersion: message.document }); @@ -145,13 +144,10 @@ export class Syncer { public async waitUntilFinished(): Promise { await this.runningScheduleSyncForOfflineChanges; - // Loop until the draining promise stabilises — new drains can be - // chained by events enqueued during processing - let current = this.draining; - while (current !== undefined) { - await current; - if (this.draining === current) break; - current = this.draining; + // A drain that finishes can be immediately followed by a new one + // (e.g. a remote event arriving), so re-check after each await. + while (this.drainPromise !== undefined) { + await this.drainPromise; } } @@ -176,9 +172,6 @@ export class Syncer { } }); } - // Do not set this.draining = undefined — the in-flight drain will - // exit naturally (SyncResetError or empty queue) and the promise - // chain stays intact, preventing concurrent drain invocations } @@ -199,45 +192,36 @@ export class Syncer { // Offline scan wipes the event queue via `queue.clear()` and then // rebuilds events from disk. That MUST NOT race against an // in-flight drain iteration that may already hold a reference to - // a freshly-cleared event — chain onto the drain so the scan runs - // between drain ticks, never concurrently. - await this.chainOntoDrain(async () => { + // a freshly-cleared event — wait for any drain to finish, and + // suppress new drains for the duration of the scan. + this.isScanning = true; + try { + while (this.drainPromise !== undefined) { + await this.drainPromise; + } await scheduleOfflineChanges( { logger: this.logger, operations: this.operations, queue: this.queue }, (path) => { this.syncLocallyCreatedFile(path); }, (args) => { this.syncLocallyUpdatedFile(args); }, (path) => { this.syncLocallyDeletedFile(path); }, ); - }); + } finally { + this.isScanning = false; + } this.ensureDraining(); - await this.draining; + await this.drainPromise; } - /** - * Serialize a unit of work onto the same promise chain the drain - * uses. This is how direct WebSocket handlers (`syncRemotelyChangedPath`, - * offline-scan) avoid racing against the drain loop: every mutator of - * the queue / disk goes through this single chain, in order of arrival. - */ - private async chainOntoDrain(work: () => Promise): Promise { - const chained = (this.draining ?? Promise.resolve()).then( - async () => work() - ); - // We track the chain via `this.draining` so later work chains onto - // the latest link. Swallow the result-typed value for storage; the - // caller still awaits the true result via `chained`. - this.draining = chained.then( - () => undefined, - () => undefined - ); - return chained; - } private ensureDraining(): void { - void this.chainOntoDrain(async () => this.drain()); + if (this.drainPromise !== undefined) return; + if (this.isScanning) return; + this.drainPromise = this.drain().finally(() => { + this.drainPromise = undefined; + }); } @@ -269,6 +253,10 @@ export class Syncer { } try { + if (await this.skipIfOversized(event)) { + return; + } + switch (event.type) { case SyncEventType.LocalCreate: await this.processCreate(event); @@ -277,10 +265,10 @@ export class Syncer { await this.processDelete(event); break; case SyncEventType.LocalUpdate: - await this.processSyncLocal(event); + await this.processLocalUpdate(event); break; - case SyncEventType.RemoteUpdate: - await this.processSyncRemoteContent(event); + case SyncEventType.RemoteChange: + await this.processRemoteChange(event); break; } } catch (e) { @@ -319,6 +307,61 @@ export class Syncer { } + private async skipIfOversized(event: SyncEvent): Promise { + let sizeInBytes: number; + let relativePath: RelativePath; + + switch (event.type) { + case SyncEventType.LocalDelete: + return false; + case SyncEventType.LocalCreate: + case SyncEventType.LocalUpdate: + sizeInBytes = await this.operations.getFileSize(event.path); + relativePath = event.path; + break; + case SyncEventType.RemoteChange: + if (event.remoteVersion.isDeleted) return false; + sizeInBytes = event.remoteVersion.contentSize; + relativePath = event.remoteVersion.relativePath; + break; + } + + const oversizedEntry = this.getHistoryEntryForSkippedOversizedFile( + sizeInBytes, + relativePath + ); + if (oversizedEntry === undefined) return false; + + this.history.addHistoryEntry(oversizedEntry); + + if (event.type === SyncEventType.LocalCreate) { + event.resolvers?.promise.catch(() => { }); + event.resolvers?.reject(new Error("Create was cancelled")); + } + + return true; + } + + private getHistoryEntryForSkippedOversizedFile( + sizeInBytes: number, + relativePath: RelativePath + ): CommonHistoryEntry | undefined { + const sizeInMB = Math.round(sizeInBytes / 1024 / 1024); + const { maxFileSizeMB } = this.settings.getSettings(); + if (sizeInMB > maxFileSizeMB) { + return { + status: SyncStatus.SKIPPED, + details: { + type: SyncType.SKIPPED as const, + relativePath + }, + message: `File size of ${sizeInMB} MB exceeds the maximum file size limit of ${maxFileSizeMB} MB` + }; + } + } + + + private async processCreate( event: Extract @@ -327,44 +370,12 @@ export class Syncer { const contentBytes = await this.operations.read(effectivePath); const contentHash = await hash(contentBytes); - const oversizedEntry = this.getHistoryEntryForSkippedOversizedFile( - contentBytes.byteLength, - effectivePath - ); - if (oversizedEntry !== undefined) { - this.history.addHistoryEntry(oversizedEntry); - event.resolvers?.promise.catch(() => { }); - event.resolvers?.reject(new Error("Create was cancelled")); - return; - } - const response = await this.syncService.create({ relativePath: event.originalPath, lastSeenVaultUpdateId: this.queue.lastSeenUpdateId, contentBytes }); - - // Handle concurrent move & creation: the server merged our create - // with an existing document that we also have locally at a different path - const existingDoc = this.queue.getDocumentByDocumentId( - response.documentId - ); - - // need to merge in db - if (existingDoc !== undefined && existingDoc.path !== effectivePath) { - // this.logger.info( - // `Merging existing document ${existingDoc.path} into ${effectivePath} after concurrent move & creation` - // ); - // await this.operations.delete(existingDoc.path); - // this.queue.removeDocument(existingDoc.path); - // if (!this.queue.getDocumentByDocumentId(existingDoc.record.documentId)) { - // this.queue.removeAllEventsForDocumentId(existingDoc.record.documentId); - // } - // } - } - - await this.handleMaybeMergingResponse({ path: effectivePath, response, @@ -387,27 +398,9 @@ export class Syncer { private async processDelete( event: Extract ): Promise { - let documentId: DocumentId; - if (typeof event.documentId === "string") { - documentId = event.documentId; - } else { - try { - documentId = await event.documentId; - } catch { - this.logger.debug( - "Skipping delete for a document whose create was cancelled" - ); - return; - } - } + let documentId = await event.documentId; - const doc = this.queue.getDocumentByDocumentId(documentId); - if (doc === undefined) { - this.logger.debug( - `Skipping delete for unknown documentId ${documentId}` - ); - return; - } + const doc = this.queue.getDocumentByDocumentIdOrFail(documentId); const relativePath = doc.path; const response = await this.syncService.delete({ @@ -415,7 +408,7 @@ export class Syncer { relativePath }); - this.queue.removeDocument(doc.path); + await this.queue.removeDocument(doc.path); this.history.addHistoryEntry({ status: SyncStatus.SUCCESS, @@ -428,56 +421,32 @@ export class Syncer { }); } - private async processSyncLocal( + private async processLocalUpdate( event: Extract ): Promise { - let documentId: DocumentId; - if (typeof event.documentId === "string") { - documentId = event.documentId; - } else { - try { - documentId = await event.documentId; - } catch { - this.logger.debug( - "Skipping sync-local for a document whose create was cancelled" - ); - return; - } - } + let documentId = await event.documentId; - const doc = this.queue.getDocumentByDocumentId(documentId); + const { path: diskPath, record } = this.queue.getDocumentByDocumentIdOrFail(documentId); - if (doc === undefined) { - this.logger.debug( - `Skipping sync-local for unknown document ${documentId}` - ); - return; - } - - const { path: diskPath, record } = doc; - - // Read file from the current disk path const contentBytes = await this.operations.read(diskPath); const contentHash = await hash(contentBytes); - // Upload using the original path - const uploadPath = event.originalPath; - + const hashChanged = contentHash !== record.remoteHash; const pathChanged = - record.remoteRelativePath !== undefined && - record.remoteRelativePath !== uploadPath; + record.remoteRelativePath !== event.originalPath; - if (contentHash === record.remoteHash && !pathChanged) { + if (!hashChanged && !pathChanged) { this.logger.debug( `File hash of ${diskPath} matches last synced version; no need to sync` ); return; } - const response = await this.sendUpdate( + const response = await this.sendUpdate({ record, - uploadPath, + relativePath: event.originalPath, contentBytes + } ); await this.handleMaybeMergingResponse({ @@ -504,274 +473,181 @@ export class Syncer { }); } - private async processSyncRemoteContent( - event: Extract - ): Promise { - const { remoteVersion } = event; - const existingDoc = this.queue.getDocumentByDocumentId( - remoteVersion.documentId - ); - - if (existingDoc !== undefined) { - if ( - existingDoc.record.parentVersionId >= - remoteVersion.vaultUpdateId - ) { - this.logger.debug( - `Document ${existingDoc.path} is already up-to-date` - ); - return; - } - - await this.processRemoteUpdateForExistingDocument( - existingDoc.path, - existingDoc.record, - remoteVersion - ); - return; - } - - if (remoteVersion.isDeleted) { - this.logger.debug( - `Document ${remoteVersion.relativePath} has been deleted remotely, no need to sync` - ); - return; - } - - await this.processRemoteUpdateForNewDocument(remoteVersion); - } - - private async processRemoteUpdateForExistingDocument( - currentPath: RelativePath, - record: DocumentRecord, - remoteVersion: DocumentVersionWithoutContent - ): Promise { - if (remoteVersion.isDeleted) { - // Check for local changes before deleting - let hasLocalChanges = false; - try { - const contentBytes = await this.operations.read(currentPath); - const contentHash = await hash(contentBytes); - hasLocalChanges = record.remoteHash !== contentHash; - } catch (e) { - if (!(e instanceof FileNotFoundError)) throw e; - } - - if (hasLocalChanges) { - // Local changes survive; re-upload as a new document - this.queue.removeDocument(currentPath); - this.syncLocallyCreatedFile(currentPath); - return; - } - - await this.operations.delete(currentPath); - this.queue.removeDocument(currentPath); - - this.history.addHistoryEntry({ - status: SyncStatus.SUCCESS, - details: { - type: SyncType.DELETE, - relativePath: currentPath - }, - message: - "Successfully deleted file which had been deleted remotely", - author: remoteVersion.userId, - timestamp: new Date(remoteVersion.updatedDate) - }); - return; - } - - // Fetch the latest full version from the server - const fullVersion = await this.syncService.get({ - documentId: remoteVersion.documentId - }); - - // The document may have been deleted between the broadcast - // and the fetch — handle it the same as a remote delete - if (fullVersion.isDeleted) { - const contentBytes = await this.operations.read(currentPath); - const localHash = await hash(contentBytes); - if (localHash !== record.remoteHash) { - this.queue.removeDocument(currentPath); - this.syncLocallyCreatedFile(currentPath); - } else { - await this.operations.delete(currentPath); - this.queue.removeDocument(currentPath); - } - return; - } - - const contentBytes = await this.operations.read(currentPath); - const contentHash = await hash(contentBytes); - - const hasLocalChanges = record.remoteHash !== contentHash; - - if (hasLocalChanges) { - const response = await this.sendUpdate( - record, - currentPath, - contentBytes - ); - - await this.handleMaybeMergingResponse({ - path: currentPath, - response, - contentHash, - originalContentBytes: contentBytes - }); - - - this.history.addHistoryEntry({ - status: SyncStatus.SUCCESS, - details: { - type: SyncType.UPDATE, - relativePath: currentPath - }, - message: "Merged local changes with remote update", - author: response.userId, - timestamp: new Date(response.updatedDate) - }); - } else { - const responseBytes = base64ToBytes(fullVersion.contentBase64); - - // Path reconciliation fallback for the reconnect case. - // - // In steady-state streaming, server-initiated renames arrive - // as `VaultUpdate` events with `originatesFromSelf=true` for - // the author and drive `processSyncRemotePath`. The reconnect - // catch-up (`get_unseen_documents` → `is_initial_sync=true`) - // replays versions authored by any device with - // `originatesFromSelf=false`, so those take the full remote- - // sync branch and we need this in-branch path reconciliation - // to avoid leaving the local file stuck at its old path. - // - // Only apply the server's path when the record's - // `remoteRelativePath` still matches `currentPath` — that means - // we haven't locally renamed since we last heard from the - // server, so the server's path is authoritative. Any local - // rename in flight keeps priority (it'll be resolved by the - // server on its next write). - let targetPath = currentPath; - if ( - fullVersion.relativePath !== currentPath && - record.remoteRelativePath === currentPath - ) { - await this.operations.move(currentPath, fullVersion.relativePath); - targetPath = fullVersion.relativePath; - } + private async handleMaybeMergingResponse({ + path, + response, + contentHash, + originalContentBytes, + createEvent + }: { + path: RelativePath; + response: DocumentUpdateResponse; + contentHash: string; + originalContentBytes: Uint8Array; + // When processing a Create, pass the originating event so its + // `resolvers` promise can be fulfilled (or rejected, on a deleted + // response) + createEvent?: Extract; + }): Promise { + let record = { + documentId: response.documentId, + parentVersionId: response.vaultUpdateId, + remoteRelativePath: response.relativePath + }; + let remoteHash: string; + if ("type" in response && response.type === "MergingUpdate") { + const responseBytes = base64ToBytes(response.contentBase64); await this.operations.write( - targetPath, - contentBytes, + path, + originalContentBytes, responseBytes ); - // Re-read and re-hash after write (the 3-way merge may produce different content) - const afterWriteBytes = await this.operations.read(targetPath); - const afterWriteHash = await hash(afterWriteBytes); - - if (targetPath !== currentPath) { - this.queue.removeDocument(currentPath); - } - this.queue.setDocument(targetPath, { - documentId: fullVersion.documentId, - parentVersionId: fullVersion.vaultUpdateId, - remoteHash: afterWriteHash, - remoteRelativePath: fullVersion.relativePath - }); + remoteHash = await hash(responseBytes); await this.updateCache( - fullVersion.vaultUpdateId, + response.vaultUpdateId, responseBytes, - targetPath + path ); + } else { + // Fast-forward update: no merge needed + remoteHash = contentHash; - this.history.addHistoryEntry({ - status: SyncStatus.SUCCESS, - details: - targetPath !== currentPath - ? { - type: SyncType.MOVE, - relativePath: targetPath, - movedFrom: currentPath - } - : { - type: SyncType.UPDATE, - relativePath: targetPath - }, - message: - "Successfully downloaded remotely updated file from the server", - author: fullVersion.userId, - timestamp: new Date(fullVersion.updatedDate) + await this.updateCache( + response.vaultUpdateId, + originalContentBytes, + path + ); + } + + if (createEvent === undefined) { + this.ensurePath(path, response.relativePath, Move.Existing); + await this.queue.setDocument(response.relativePath, { + ...record, + remoteHash + }); + + } else { + // The response to a create must contain the path from the create request + await this.queue.resolveCreate(createEvent, { + ...record, + remoteHash }); } } - private async processRemoteUpdateForNewDocument( - remoteVersion: DocumentVersionWithoutContent + + private async processRemoteChange( + event: Extract ): Promise { - const oversizedEntry = this.getHistoryEntryForSkippedOversizedFile( - remoteVersion.contentSize, - remoteVersion.relativePath - ); - if (oversizedEntry !== undefined) { - this.history.addHistoryEntry(oversizedEntry); - return; - } - - const contentBytes = - await this.syncService.getDocumentVersionContent({ - documentId: remoteVersion.documentId, - vaultUpdateId: remoteVersion.vaultUpdateId - }); - - // A concurrent operation may have created the document already - const existingDoc = this.queue.getDocumentByDocumentId( + const { remoteVersion } = event; + const documentWithPath = this.queue.getDocumentByDocumentId( remoteVersion.documentId ); - if (existingDoc !== undefined) { + + if (remoteVersion.isDeleted) { + if (documentWithPath === undefined) { + // trying to delete a document we've already scheduled for deletion locally + return; + } + return this.processRemoteDelete(documentWithPath.path, remoteVersion); + } + + + + if (documentWithPath !== undefined) { + // must be the update to an existing doc + return this.processRemoteUpdate(documentWithPath.path, documentWithPath.record, remoteVersion); + } + + const pendingCreate = this.queue.findLatestCreateForPath(remoteVersion.relativePath); + + if (pendingCreate === undefined) { + return this.processRemoteCreateForNewDocument(remoteVersion); + } else { + return this.processRemoteCreateForPendingDocument(remoteVersion, pendingCreate); + } + } + + + private async processRemoteDelete(path: RelativePath, remoteVersion: DocumentVersionWithoutContent): Promise { + await this.operations.delete(path); + await this.queue.removeDocument(path); + + this.history.addHistoryEntry({ + status: SyncStatus.SUCCESS, + details: { + type: SyncType.DELETE, + relativePath: path + }, + message: + "Successfully deleted file which had been deleted remotely", + author: remoteVersion.userId, + timestamp: new Date(remoteVersion.updatedDate) + }); + } + + private async processRemoteUpdate(path: RelativePath, record: DocumentRecord, remoteVersion: DocumentVersionWithoutContent): Promise { + if ( + record.parentVersionId >= + remoteVersion.vaultUpdateId + ) { this.logger.debug( - `Document ${remoteVersion.relativePath} has already been created locally` + `Document ${path} is already up-to-date` ); return; } - // Special case: local has an *unsynced* new file at the same path. - // The client must cancel the outgoing Create and merge the two files - // instead of displacing the local one to a conflict path — those - // files are semantically "the same user-intended document" that two - // devices created concurrently, so we want to preserve both sides' - // edits, not shelve one aside. - if (this.queue.hasPendingCreateAt(remoteVersion.relativePath)) { - await this.mergeUnsyncedLocalWithRemoteCreate( - remoteVersion, - contentBytes - ); - return; - } + if (!this.queue.hasPendingLocalEventsForDocumentId(remoteVersion.documentId)) { + // no local changes + const currentContent = await this.operations.read(path); + const remoteContent = await this.syncService.getDocumentVersionContent({ documentId: remoteVersion.documentId, vaultUpdateId: remoteVersion.vaultUpdateId }); + this.operations.write(path, currentContent, remoteContent); + // todo: update last seen id - await this.operations.ensureClearPath(remoteVersion.relativePath); + } // else we don't need to update the content, a subsequent local update will do that - const contentHash = await hash(contentBytes); - this.queue.setDocument(remoteVersion.relativePath, { + this.ensurePath(path, remoteVersion.relativePath); + + this.history.addHistoryEntry({ + status: SyncStatus.SUCCESS, + details: { + type: SyncType.MOVE, + relativePath: remoteVersion.relativePath, + movedFrom: path + }, + // todo: eh + message: `File was renamed remotely from ${path} to ${remoteVersion.relativePath}`, + }); + } + + private async processRemoteCreateForNewDocument(remoteVersion: DocumentVersionWithoutContent): Promise { + const remoteContent = await this.syncService.getDocumentVersionContent({ + documentId: remoteVersion.documentId, + vaultUpdateId: remoteVersion.vaultUpdateId + }); + + await this.operations.create( + remoteVersion.relativePath, + remoteContent + ); + + await this.updateCache( + remoteVersion.vaultUpdateId, + remoteContent, + remoteVersion.relativePath + ); + + const contentHash = await hash(remoteContent); + await this.queue.setDocument(remoteVersion.relativePath, { documentId: remoteVersion.documentId, parentVersionId: remoteVersion.vaultUpdateId, remoteHash: contentHash, remoteRelativePath: remoteVersion.relativePath }); - await this.operations.create( - remoteVersion.relativePath, - contentBytes - ); - - await this.updateCache( - remoteVersion.vaultUpdateId, - contentBytes, - remoteVersion.relativePath - ); - - this.history.addHistoryEntry({ status: SyncStatus.SUCCESS, details: { @@ -786,13 +662,18 @@ export class Syncer { } // A remote create landed at a path where we have an unsynced local - // create. How we resolve depends on whether both sides are mergeable - // text: text gets an in-place union merge and one follow-up update; - // binary falls through to displacement so *both* files survive. - private async mergeUnsyncedLocalWithRemoteCreate( + // create. This might be becuase there's another sync client running. + // We must avoid duplicating files. + private async processRemoteCreateForPendingDocument( remoteVersion: DocumentVersionWithoutContent, - remoteContent: Uint8Array + pendingCreateEvent: Extract ): Promise { + const remoteContent = await this.syncService.getDocumentVersionContent({ + documentId: remoteVersion.documentId, + vaultUpdateId: remoteVersion.vaultUpdateId + }); + const remoteHash = await hash(remoteContent); + const path = remoteVersion.relativePath; const localContent = await this.operations.read(path); @@ -804,33 +685,19 @@ export class Syncer { !isBinary(localContent) && !isBinary(remoteContent); - if (!canMergeText) { - // Binary (or non-mergeable) concurrent creates: leave the local - // Create in the queue and let the default displacement flow - // take over (local bytes are moved to `conflict--…` by - // `ensureClearPath`, remote bytes take `path`). When the Create - // eventually fires it reads the remote content at `path` — not - // what we want — so cancel *just* the Create event and - // re-enqueue a fresh one sourced from the displaced path, so - // the server receives the user's original bytes and dedupes - // the path on its own. - this.queue.cancelPendingCreate(path); + if (canMergeText) { + const currentContent = await this.operations.read(pendingCreateEvent.path); - // `ensureClearPath` may return `undefined` if the file was - // deleted between `read(path)` above and this call (a TOCTOU - // race with a concurrent filesystem delete). That's fine: - // nothing to displace means no local bytes to preserve, and - // we just proceed with the remote content. - const conflictPath = - await this.operations.ensureClearPath(path); - - this.queue.setDocument(path, { + this.queue.resolveCreate(pendingCreateEvent, { documentId: remoteVersion.documentId, parentVersionId: remoteVersion.vaultUpdateId, - remoteHash: await hash(remoteContent), + remoteHash, remoteRelativePath: path }); - await this.operations.create(path, remoteContent); + + + const merged = reconcile("", new TextDecoder().decode(currentContent), new TextDecoder().decode(remoteContent)).text; + await this.operations.write(path, currentContent, new TextEncoder().encode(merged)); await this.updateCache( remoteVersion.vaultUpdateId, remoteContent, @@ -840,82 +707,48 @@ export class Syncer { this.history.addHistoryEntry({ status: SyncStatus.SUCCESS, details: { - type: SyncType.CREATE, + type: SyncType.UPDATE, relativePath: path }, message: - conflictPath !== undefined - ? `Adopted remote create at ${path}; unsynced local bytes preserved at ${conflictPath} for manual recovery` - : `Adopted remote create at ${path}; local file had already been removed`, + `Adopted remote create at ${path}`, author: remoteVersion.userId, timestamp: new Date(remoteVersion.updatedDate) }); return; + } else { + await this.operations.ensureClearPath(path); + await this.operations.create(path, remoteContent); + await this.queue.setDocument(path, { + documentId: remoteVersion.documentId, + parentVersionId: remoteVersion.vaultUpdateId, + remoteHash, + remoteRelativePath: path + }); + this.history.addHistoryEntry({ + status: SyncStatus.SUCCESS, + details: { + type: SyncType.CREATE, + relativePath: path + }, + message: + `Created remotly created file at ${path}`, + author: remoteVersion.userId, + timestamp: new Date(remoteVersion.updatedDate) + }); } - - // Mergeable text: union-merge with empty parent (every byte in - // either side is treated as an insertion), overwrite disk, and - // push the merged result to the server if it diverged from the - // remote copy. Cancelling the Create and re-emitting as a - // SyncLocal update lets the existing merge-response pipeline - // handle parentVersionId/content reconciliation end-to-end. - this.queue.cancelPendingCreate(path); - - const mergedContent = new TextEncoder().encode( - reconcile( - "", - new TextDecoder().decode(localContent), - new TextDecoder().decode(remoteContent) - ).text - ); - - // Adopt the remote document's identity locally *before* touching - // disk so an interleaved event can't mistake the file for a fresh - // create again. `remoteHash` is deliberately the server's content - // hash (not the merged one) so the SyncLocal below sees a real - // diff and actually uploads the merge. - const remoteHash = await hash(remoteContent); - this.queue.setDocument(path, { - documentId: remoteVersion.documentId, - parentVersionId: remoteVersion.vaultUpdateId, - remoteHash, - remoteRelativePath: path - }); - - // Overwrite disk with the merged result. We pass `localContent` as - // the "expected" content so `operations.write`'s internal 3-way - // merge is a no-op (expected == disk ⇒ apply `new` verbatim). - await this.operations.write(path, localContent, mergedContent); - - await this.updateCache( - remoteVersion.vaultUpdateId, - remoteContent, - path - ); - - const mergedHash = await hash(mergedContent); - if (mergedHash !== remoteHash) { - this.syncLocallyUpdatedFile({ relativePath: path }); - } - - this.history.addHistoryEntry({ - status: SyncStatus.SUCCESS, - details: { - type: SyncType.CREATE, - relativePath: path - }, - message: "Merged unsynced local file with concurrent remote create", - author: remoteVersion.userId, - timestamp: new Date(remoteVersion.updatedDate) - }); } + + private async sendUpdate( - record: DocumentRecord, - relativePath: RelativePath, - contentBytes: Uint8Array + { record, relativePath, contentBytes }: { + record: DocumentRecord, + relativePath: RelativePath, + contentBytes: Uint8Array + } ): Promise { const isText = !isBinary(contentBytes) && @@ -946,145 +779,7 @@ export class Syncer { }); } - private async handleMaybeMergingResponse({ - path, - response, - contentHash, - originalContentBytes, - createEvent - }: { - path: RelativePath; - response: DocumentUpdateResponse; - contentHash: string; - originalContentBytes: Uint8Array; - // When processing a Create, pass the originating event so its - // `resolvers` promise can be fulfilled (or rejected, on a deleted - // response). Dependent SyncLocal/Delete events are chained through - // that promise and would otherwise `await` forever. - createEvent?: Extract; - }): Promise { - if (response.isDeleted) { - // A Create that the server returned as already-deleted means - // nothing we can sync — reject the waiting promise so chained - // Delete / SyncLocal events skip themselves instead of hanging. - if (createEvent?.resolvers !== undefined) { - createEvent.resolvers.promise.catch(() => { - /* suppressed — consumer may not be listening */ - }); - createEvent.resolvers.reject( - new Error( - "Create was cancelled — server reported the document as deleted" - ) - ); - } - // Capture the documentId of the record we *believe* is at - // `path` now. If a concurrent `syncRemotelyChangedPath` moves - // this document between our exists-check and our read, the - // record at `path` after those awaits may belong to a - // DIFFERENT document. Guard against that. - const originalRecord = - this.queue.getSettledDocumentByPath(path); - const originalDocumentId = originalRecord?.documentId; - - // If the local file has been edited, re-create it as a new - // document so local edits survive the remote delete — but only - // if nothing else is already queuing a Create for this path, to - // avoid doubling up when offline-change detection races with us. - if (await this.operations.exists(path)) { - const localBytes = await this.operations.read(path); - const localHash = await hash(localBytes); - const currentRecord = - this.queue.getSettledDocumentByPath(path); - // Re-verify the record's identity hasn't shifted under us. - if ( - currentRecord !== undefined && - currentRecord.documentId === originalDocumentId && - localHash !== currentRecord.remoteHash && - !this.queue.hasPendingCreateAt(path) - ) { - this.queue.removeDocument(path); - this.syncLocallyCreatedFile(path); - return; - } - } - // Only delete on disk if the record at `path` is still the one - // we expected — if a self-origin path-change moved another doc - // here, we shouldn't delete its file. - const finalRecord = this.queue.getSettledDocumentByPath(path); - if ( - finalRecord === undefined || - finalRecord.documentId === originalDocumentId - ) { - await this.operations.delete(path); - this.queue.removeDocument(path); - } - return; - } - - // The response carries content only — path reconciliation is the - // sole responsibility of the self-origin `VaultUpdate` echo (the - // `originatesFromSelf=true` branch of `syncRemoteVaultUpdate`), - // which fires independently for renames/dedupes. We therefore - // always record the current local `path` here; an in-flight echo - // will move the file and fix `remoteRelativePath` if the server - // placed the document somewhere else. - const existingRecord = this.queue.getSettledDocumentByPath(path); - const remoteRelativePath = existingRecord?.remoteRelativePath ?? path; - - let record: DocumentRecord; - if ("type" in response && response.type === "MergingUpdate") { - const responseBytes = base64ToBytes(response.contentBase64); - await this.operations.write( - path, - originalContentBytes, - responseBytes - ); - - // Re-read and re-hash after write (invariant #3) - const afterWriteBytes = await this.operations.read(path); - const afterWriteHash = await hash(afterWriteBytes); - - record = { - documentId: response.documentId, - parentVersionId: response.vaultUpdateId, - remoteHash: afterWriteHash, - remoteRelativePath - }; - - // Cache the SERVER's content, not local (invariant #2) - await this.updateCache( - response.vaultUpdateId, - responseBytes, - path - ); - } else { - // Fast-forward update: no merge needed - record = { - documentId: response.documentId, - parentVersionId: response.vaultUpdateId, - remoteHash: contentHash, - remoteRelativePath - }; - - await this.updateCache( - response.vaultUpdateId, - originalContentBytes, - path - ); - } - - // For a Create, fulfill the resolver promise and replace any - // `documentId: Promise<...>` references in queued Delete/SyncLocal - // events with the now-known string id. For everything else a plain - // `setDocument` is enough — the record's identity was already - // resolved when the Create originally settled. - if (createEvent !== undefined) { - this.queue.resolveCreate(createEvent, record); - } else { - this.queue.setDocument(path, record); - } - } private async updateCache( updateId: VaultUpdateId, @@ -1102,24 +797,6 @@ export class Syncer { } } - private getHistoryEntryForSkippedOversizedFile( - sizeInBytes: number, - relativePath: RelativePath - ): CommonHistoryEntry | undefined { - const sizeInMB = Math.round(sizeInBytes / 1024 / 1024); - const { maxFileSizeMB } = this.settings.getSettings(); - if (sizeInMB > maxFileSizeMB) { - return { - status: SyncStatus.SKIPPED, - details: { - type: SyncType.SKIPPED as const, - relativePath - }, - message: `File size of ${sizeInMB} MB exceeds the maximum file size limit of ${maxFileSizeMB} MB` - }; - } - } - private notifyRemainingOperationsChanged(): void { const currentCount = this.queue.pendingUpdateCount; if (this.previousRemainingOperationsCount !== currentCount) { diff --git a/frontend/sync-client/src/sync-operations/types.ts b/frontend/sync-client/src/sync-operations/types.ts index 57cd8a6f..9642665a 100644 --- a/frontend/sync-client/src/sync-operations/types.ts +++ b/frontend/sync-client/src/sync-operations/types.ts @@ -8,7 +8,7 @@ export interface DocumentRecord { documentId: DocumentId; parentVersionId: VaultUpdateId; remoteHash: string; - remoteRelativePath?: RelativePath; + remoteRelativePath: RelativePath; } export interface StoredDocument extends DocumentRecord { @@ -24,21 +24,21 @@ export enum SyncEventType { LocalCreate = "local-create", LocalUpdate = "local-update", // includes both content and path changes LocalDelete = "local-delete", - RemoteUpdate = "remote-update", // includes every type of update coming from the server + RemoteChange = "remote-change", // includes every type of create/update/delete coming from the server } export type FileSyncEvent = | { type: SyncEventType.LocalCreate; path: RelativePath } | { type: SyncEventType.LocalUpdate; path: RelativePath; oldPath?: RelativePath } | { type: SyncEventType.LocalDelete; path: RelativePath } - | { type: SyncEventType.RemoteUpdate; remoteVersion: DocumentVersionWithoutContent }; + | { type: SyncEventType.RemoteChange; remoteVersion: DocumentVersionWithoutContent }; export type SyncEvent = | { type: SyncEventType.LocalCreate; path: RelativePath; // current path on disk originalPath: RelativePath; // original path on disk when the event was queued - resolvers?: PromiseWithResolvers + resolvers: PromiseWithResolvers } | { type: SyncEventType.LocalUpdate; @@ -52,6 +52,6 @@ export type SyncEvent = documentId: DocumentId | Promise; // if it's a promise, the promise is fulfilled once the document's create event is processed } | { - type: SyncEventType.RemoteUpdate; + type: SyncEventType.RemoteChange; remoteVersion: DocumentVersionWithoutContent; };