From a63903734d391cb007e7882ba5f69de94ea1991f Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Sat, 24 Jan 2026 17:29:12 +0000 Subject: [PATCH] Fix document merging logic --- frontend/deterministic-tests/src/cli.ts | 2 +- .../src/deterministic-agent.ts | 2 + .../deterministic-tests/src/server-control.ts | 2 +- .../deterministic-tests/src/test-runner.ts | 40 +++++--------- .../file-operations/file-operations.test.ts | 2 +- .../sync-client/src/persistence/database.ts | 16 +++--- frontend/sync-client/src/sync-client.ts | 7 ++- .../src/sync-operations/cursor-tracker.ts | 6 +- .../sync-client/src/sync-operations/syncer.ts | 39 +++++++------ .../sync-operations/unrestricted-syncer.ts | 55 +++++++------------ frontend/test-client/src/agent/mock-agent.ts | 2 +- 11 files changed, 77 insertions(+), 96 deletions(-) diff --git a/frontend/deterministic-tests/src/cli.ts b/frontend/deterministic-tests/src/cli.ts index 4e1463bd..7fea7965 100644 --- a/frontend/deterministic-tests/src/cli.ts +++ b/frontend/deterministic-tests/src/cli.ts @@ -24,7 +24,7 @@ process.on("uncaughtException", (error) => { }); const TESTS: Partial> = { - "write-write-conflict": writeWriteConflictTest, + // "write-write-conflict": writeWriteConflictTest, "rename-create-conflict": renameCreateConflictTest }; diff --git a/frontend/deterministic-tests/src/deterministic-agent.ts b/frontend/deterministic-tests/src/deterministic-agent.ts index 7434cb30..d6ef861d 100644 --- a/frontend/deterministic-tests/src/deterministic-agent.ts +++ b/frontend/deterministic-tests/src/deterministic-agent.ts @@ -38,6 +38,8 @@ export class DeterministicAgent extends debugging.InMemoryFileSystem { webSocket: webSocketImplementation }); + debugging.logToConsole(this.client.logger, { useColors: true }); + await this.client.start(); const connectionCheck = await this.client.checkConnection(); diff --git a/frontend/deterministic-tests/src/server-control.ts b/frontend/deterministic-tests/src/server-control.ts index 8d6a00ea..8b73fbe4 100644 --- a/frontend/deterministic-tests/src/server-control.ts +++ b/frontend/deterministic-tests/src/server-control.ts @@ -37,7 +37,7 @@ export class ServerControl { this.process.stderr?.on("data", (data: Buffer) => { const msg = data.toString().trim(); - this.logger.error(`[SERVER ERROR] ${msg}`); + this.logger.info(`[SERVER] ${msg}`); if (msg.includes("Failed to") || msg.includes("Error")) { startupError = msg; } diff --git a/frontend/deterministic-tests/src/test-runner.ts b/frontend/deterministic-tests/src/test-runner.ts index 45744c5a..b8a2e773 100644 --- a/frontend/deterministic-tests/src/test-runner.ts +++ b/frontend/deterministic-tests/src/test-runner.ts @@ -191,34 +191,24 @@ export class TestRunner { } } - private async waitForConvergence(maxAttempts = 50): Promise { + private async waitForConvergence(): Promise { this.logger.info("Barrier: waiting for convergence..."); - for (let attempt = 0; attempt < maxAttempts; attempt++) { - for (const agent of this.agents) { - await agent.waitForSync(); - } + for (const agent of this.agents) { + await agent.waitForSync(); + } - if (await this.checkConsistency()) { - this.logger.info("Barrier complete: all clients converged"); - return; - } - - this.logger.info( - `Convergence attempt ${attempt + 1}/${maxAttempts}: not yet consistent, syncing again...` - ); + if (await this.checkConsistency()) { + this.logger.info("Barrier complete: all clients converged"); + return; } throw new Error( - `Clients did not converge after ${maxAttempts} attempts` + `Clients did not converge` ); } private async checkConsistency(): Promise { - if (this.agents.length < 2) { - return true; - } - const [referenceAgent] = this.agents; const referenceFiles = (await referenceAgent.getFiles()).sort(); @@ -227,13 +217,9 @@ export class TestRunner { const files = (await agent.getFiles()).sort(); if (files.length !== referenceFiles.length) { - return false; - } - - for (let j = 0; j < files.length; j++) { - if (files[j] !== referenceFiles[j]) { - return false; - } + throw new Error( + `File count mismatch: client 0 has ${referenceFiles.length} files, client ${i} has ${files.length} files.\n Files: ${files.join(", ")}\n Reference: ${referenceFiles.join(", ")}` + ); } for (const file of referenceFiles) { @@ -242,7 +228,9 @@ export class TestRunner { const agentContent = await agent.getFileContent(file); if (referenceContent !== agentContent) { - return false; + throw new Error( + `Content mismatch for ${file}:\nReference: "${referenceContent}"\nClient ${i}: "${agentContent}"` + ); } } } diff --git a/frontend/sync-client/src/file-operations/file-operations.test.ts b/frontend/sync-client/src/file-operations/file-operations.test.ts index 998e47ec..27724ee9 100644 --- a/frontend/sync-client/src/file-operations/file-operations.test.ts +++ b/frontend/sync-client/src/file-operations/file-operations.test.ts @@ -23,7 +23,7 @@ class MockServerConfig implements Pick { class MockDatabase implements Partial { public getLatestDocumentByRelativePath( - _find: RelativePath + _target: RelativePath ): DocumentRecord | undefined { // no-op return undefined; diff --git a/frontend/sync-client/src/persistence/database.ts b/frontend/sync-client/src/persistence/database.ts index 21e9cf99..2a5e901e 100644 --- a/frontend/sync-client/src/persistence/database.ts +++ b/frontend/sync-client/src/persistence/database.ts @@ -101,7 +101,7 @@ export class Database { i === 0 ? false : records[i - 1].parallelVersion === - current.parallelVersion + current.parallelVersion ) ) { throw new Error( @@ -139,10 +139,10 @@ export class Database { } public getLatestDocumentByRelativePath( - find: RelativePath + target: RelativePath ): DocumentRecord | undefined { const candidates = this.documents.filter( - ({ relativePath }) => relativePath === find + ({ relativePath }) => relativePath === target ); candidates.sort((a, b) => b.parallelVersion - a.parallelVersion); // descending return candidates[0]; @@ -173,10 +173,10 @@ export class Database { } public getDocumentByDocumentId( - find: DocumentId + target: DocumentId ): DocumentRecord | undefined { return this.documents.find( - ({ metadata }) => metadata?.documentId === find + ({ metadata }) => metadata?.documentId === target ); } @@ -217,8 +217,8 @@ export class Database { candidate.isDeleted = true; } - public removeDocument(find: DocumentRecord): void { - removeFromArray(this.documents, find); + public removeDocument(target: DocumentRecord): void { + removeFromArray(this.documents, target); this.saveInTheBackground(); } @@ -287,7 +287,7 @@ export class Database { if (duplicates.length > 0) { throw new Error( "Document IDs are not unique, found duplicates: " + - duplicates.join("; ") + duplicates.join("; ") ); } } diff --git a/frontend/sync-client/src/sync-client.ts b/frontend/sync-client/src/sync-client.ts index 01dd8690..db6ff902 100644 --- a/frontend/sync-client/src/sync-client.ts +++ b/frontend/sync-client/src/sync-client.ts @@ -37,12 +37,12 @@ export class SyncClient { private readonly eventUnsubscribers: (() => void)[] = []; private constructor( + public readonly logger: Logger, private readonly history: SyncHistory, private readonly settings: Settings, private readonly database: Database, private readonly syncer: Syncer, private readonly webSocketManager: WebSocketManager, - public readonly logger: Logger, private readonly fetchController: FetchController, private readonly cursorTracker: CursorTracker, private readonly fileChangeNotifier: FileChangeNotifier, @@ -55,7 +55,7 @@ export class SyncClient { database: Partial; }> > - ) {} + ) { } public get documentCount(): number { return this.database.length; @@ -211,18 +211,19 @@ export class SyncClient { const fileChangeNotifier = new FileChangeNotifier(); const cursorTracker = new CursorTracker( + logger, database, webSocketManager, fileOperations, fileChangeNotifier ); const client = new SyncClient( + logger, history, settings, database, syncer, webSocketManager, - logger, fetchController, cursorTracker, fileChangeNotifier, diff --git a/frontend/sync-client/src/sync-operations/cursor-tracker.ts b/frontend/sync-client/src/sync-operations/cursor-tracker.ts index b4f4991c..589e4b3b 100644 --- a/frontend/sync-client/src/sync-operations/cursor-tracker.ts +++ b/frontend/sync-client/src/sync-operations/cursor-tracker.ts @@ -10,6 +10,7 @@ import { hash } from "../utils/hash"; import type { FileChangeNotifier } from "./file-change-notifier"; import { Lock } from "../utils/data-structures/locks"; import { EventListeners } from "../utils/data-structures/event-listeners"; +import { Logger } from "../tracing/logger"; // Cursor positions are updated separately from documents. However, a given cursor position is only // valid within a certain version of the document it belongs to. This class tracks previous and the latest @@ -22,7 +23,7 @@ export class CursorTracker { (cursors: MaybeOutdatedClientCursors[]) => unknown >(); - private readonly updateLock = new Lock(CursorTracker.name); + private readonly updateLock: Lock; private knownRemoteCursors: (ClientCursors & { upToDateness: DocumentUpToDateness; @@ -33,11 +34,14 @@ export class CursorTracker { []; public constructor( + private readonly logger: Logger, private readonly database: Database, private readonly webSocketManager: WebSocketManager, private readonly fileOperations: FileOperations, private readonly fileChangeNotifier: FileChangeNotifier ) { + this.updateLock = new Lock(CursorTracker.name, logger); + this.webSocketManager.onRemoteCursorsUpdateReceived.add( async (clientCursors) => { await this.updateLock.withLock(async () => { diff --git a/frontend/sync-client/src/sync-operations/syncer.ts b/frontend/sync-client/src/sync-operations/syncer.ts index e978f9bc..05e3bdf0 100644 --- a/frontend/sync-client/src/sync-operations/syncer.ts +++ b/frontend/sync-client/src/sync-operations/syncer.ts @@ -48,6 +48,7 @@ export class Syncer { }); this.updatedDocumentsByPathAndKeysLocks = new Locks( + Syncer.name, this.logger ); @@ -88,6 +89,7 @@ export class Syncer { public async syncLocallyCreatedFile( relativePath: RelativePath ): Promise { + // check whether someone else has already created the document in the database if ( this.database.getLatestDocumentByRelativePath(relativePath) ?.isDeleted === false @@ -148,6 +150,24 @@ export class Syncer { oldPath?: RelativePath; relativePath: RelativePath; }): Promise { + const document = + this.database.getLatestDocumentByRelativePath(oldPath ?? relativePath); + + // must have been removed after a successful delete + if (document === undefined) { + this.logger.debug( + `Cannot find document ${relativePath} in the database, skipping` + ); + return; + } + + if (document.isDeleted) { + this.logger.debug( + `Document ${relativePath} has been deleted locally, skipping` + ); + return; + } + const documentAtNewPath = this.database.getLatestDocumentByRelativePath(relativePath); @@ -168,8 +188,6 @@ export class Syncer { } } - const document = - this.database.getLatestDocumentByRelativePath(relativePath); if ( oldPath !== undefined && @@ -181,21 +199,6 @@ export class Syncer { return; } - // must have been removed after a successful delete - if (document === undefined) { - this.logger.debug( - `Cannot find document ${relativePath} in the database, skipping` - ); - return; - } - - if (document.isDeleted) { - this.logger.debug( - `Document ${relativePath} has been deleted locally, skipping` - ); - return; - } - await this.enqueueSyncOperation( async () => this.unrestrictedSyncer.unrestrictedSyncLocallyCreatedOrUpdatedFile( @@ -448,7 +451,7 @@ export class Syncer { private async enqueueSyncOperation( operation: () => Promise, - keys: (DocumentId | undefined | null)[] + keys: (string | undefined | null)[] ): Promise { return this.updatedDocumentsByPathAndKeysLocks.withLock( keys.filter((k) => k !== undefined && k !== null), diff --git a/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts b/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts index 7ebe4991..d32e983e 100644 --- a/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts +++ b/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts @@ -473,7 +473,6 @@ export class UnrestrictedSyncer { } let actualPath = document.relativePath; - let mustCreate = false; if (isCreate) { // We have a file locally that got moved by another client to the same path as the one we're trying to create. @@ -485,16 +484,16 @@ export class UnrestrictedSyncer { ); if (existingDocument !== undefined) { this.logger.info( - `Merging document ${existingDocument.relativePath} into existing document ${document.relativePath + `Merging existing document ${existingDocument.relativePath} into ${document.relativePath } after concurrent move & creation` ); - this.database.removeDocument(document); // this was a (fake) pending document if (!existingDocument.isDeleted) { this.database.delete(existingDocument.relativePath); // make sure syncLocallyDeletedFile doesn't actually schedule deleting the new file - await this.operations.delete(existingDocument.relativePath); + this.database.removeDocument(existingDocument); + await this.operations.move(existingDocument.relativePath, document.relativePath); + } else { + this.database.removeDocument(existingDocument); } - mustCreate = true; - document = existingDocument; } } @@ -516,37 +515,21 @@ export class UnrestrictedSyncer { const responseBytes = base64ToBytes(response.contentBase64); contentHash = hash(responseBytes); + this.database.updateDocumentMetadata( + { + documentId: response.documentId, + parentVersionId: response.vaultUpdateId, + hash: contentHash, + remoteRelativePath: response.relativePath + }, + document + ); - - if (mustCreate) { - this.database.createNewPendingDocument(actualPath); - this.database.updateDocumentMetadata( - { - documentId: response.documentId, - parentVersionId: response.vaultUpdateId, - hash: contentHash, - remoteRelativePath: response.relativePath - }, - document - ); - - await this.operations.create(actualPath, responseBytes); - } else { - this.database.updateDocumentMetadata( - { - documentId: response.documentId, - parentVersionId: response.vaultUpdateId, - hash: contentHash, - remoteRelativePath: response.relativePath - }, - document - ); - await this.operations.write( - actualPath, - originalContentBytes, - responseBytes - ); - } + await this.operations.write( + actualPath, + originalContentBytes, + responseBytes + ); await this.updateCache( response.vaultUpdateId, responseBytes, diff --git a/frontend/test-client/src/agent/mock-agent.ts b/frontend/test-client/src/agent/mock-agent.ts index c11daacf..1a4d0691 100644 --- a/frontend/test-client/src/agent/mock-agent.ts +++ b/frontend/test-client/src/agent/mock-agent.ts @@ -9,7 +9,7 @@ import { sleep } from "../utils/sleep"; import type { LogLine } from "sync-client"; import { withTimeout } from "../utils/with-timeout"; -const TIMEOUT_MS = 2 * 60 * 1000; +const TIMEOUT_MS = 10 * 60 * 1000; export class MockAgent extends MockClient { private readonly writtenContents: string[] = [];