Fix document merging logic

This commit is contained in:
Andras Schmelczer 2026-01-24 17:29:12 +00:00
parent 75ef370703
commit a63903734d
11 changed files with 77 additions and 96 deletions

View file

@ -24,7 +24,7 @@ process.on("uncaughtException", (error) => {
});
const TESTS: Partial<Record<string, TestDefinition>> = {
"write-write-conflict": writeWriteConflictTest,
// "write-write-conflict": writeWriteConflictTest,
"rename-create-conflict": renameCreateConflictTest
};

View file

@ -38,6 +38,8 @@ export class DeterministicAgent extends debugging.InMemoryFileSystem {
webSocket: webSocketImplementation
});
debugging.logToConsole(this.client.logger, { useColors: true });
await this.client.start();
const connectionCheck = await this.client.checkConnection();

View file

@ -37,7 +37,7 @@ export class ServerControl {
this.process.stderr?.on("data", (data: Buffer) => {
const msg = data.toString().trim();
this.logger.error(`[SERVER ERROR] ${msg}`);
this.logger.info(`[SERVER] ${msg}`);
if (msg.includes("Failed to") || msg.includes("Error")) {
startupError = msg;
}

View file

@ -191,34 +191,24 @@ export class TestRunner {
}
}
private async waitForConvergence(maxAttempts = 50): Promise<void> {
private async waitForConvergence(): Promise<void> {
this.logger.info("Barrier: waiting for convergence...");
for (let attempt = 0; attempt < maxAttempts; attempt++) {
for (const agent of this.agents) {
await agent.waitForSync();
}
for (const agent of this.agents) {
await agent.waitForSync();
}
if (await this.checkConsistency()) {
this.logger.info("Barrier complete: all clients converged");
return;
}
this.logger.info(
`Convergence attempt ${attempt + 1}/${maxAttempts}: not yet consistent, syncing again...`
);
if (await this.checkConsistency()) {
this.logger.info("Barrier complete: all clients converged");
return;
}
throw new Error(
`Clients did not converge after ${maxAttempts} attempts`
`Clients did not converge`
);
}
private async checkConsistency(): Promise<boolean> {
if (this.agents.length < 2) {
return true;
}
const [referenceAgent] = this.agents;
const referenceFiles = (await referenceAgent.getFiles()).sort();
@ -227,13 +217,9 @@ export class TestRunner {
const files = (await agent.getFiles()).sort();
if (files.length !== referenceFiles.length) {
return false;
}
for (let j = 0; j < files.length; j++) {
if (files[j] !== referenceFiles[j]) {
return false;
}
throw new Error(
`File count mismatch: client 0 has ${referenceFiles.length} files, client ${i} has ${files.length} files.\n Files: ${files.join(", ")}\n Reference: ${referenceFiles.join(", ")}`
);
}
for (const file of referenceFiles) {
@ -242,7 +228,9 @@ export class TestRunner {
const agentContent = await agent.getFileContent(file);
if (referenceContent !== agentContent) {
return false;
throw new Error(
`Content mismatch for ${file}:\nReference: "${referenceContent}"\nClient ${i}: "${agentContent}"`
);
}
}
}

View file

@ -23,7 +23,7 @@ class MockServerConfig implements Pick<ServerConfig, "getConfig"> {
class MockDatabase implements Partial<Database> {
public getLatestDocumentByRelativePath(
_find: RelativePath
_target: RelativePath
): DocumentRecord | undefined {
// no-op
return undefined;

View file

@ -101,7 +101,7 @@ export class Database {
i === 0
? false
: records[i - 1].parallelVersion ===
current.parallelVersion
current.parallelVersion
)
) {
throw new Error(
@ -139,10 +139,10 @@ export class Database {
}
public getLatestDocumentByRelativePath(
find: RelativePath
target: RelativePath
): DocumentRecord | undefined {
const candidates = this.documents.filter(
({ relativePath }) => relativePath === find
({ relativePath }) => relativePath === target
);
candidates.sort((a, b) => b.parallelVersion - a.parallelVersion); // descending
return candidates[0];
@ -173,10 +173,10 @@ export class Database {
}
public getDocumentByDocumentId(
find: DocumentId
target: DocumentId
): DocumentRecord | undefined {
return this.documents.find(
({ metadata }) => metadata?.documentId === find
({ metadata }) => metadata?.documentId === target
);
}
@ -217,8 +217,8 @@ export class Database {
candidate.isDeleted = true;
}
public removeDocument(find: DocumentRecord): void {
removeFromArray(this.documents, find);
public removeDocument(target: DocumentRecord): void {
removeFromArray(this.documents, target);
this.saveInTheBackground();
}
@ -287,7 +287,7 @@ export class Database {
if (duplicates.length > 0) {
throw new Error(
"Document IDs are not unique, found duplicates: " +
duplicates.join("; ")
duplicates.join("; ")
);
}
}

View file

@ -37,12 +37,12 @@ export class SyncClient {
private readonly eventUnsubscribers: (() => void)[] = [];
private constructor(
public readonly logger: Logger,
private readonly history: SyncHistory,
private readonly settings: Settings,
private readonly database: Database,
private readonly syncer: Syncer,
private readonly webSocketManager: WebSocketManager,
public readonly logger: Logger,
private readonly fetchController: FetchController,
private readonly cursorTracker: CursorTracker,
private readonly fileChangeNotifier: FileChangeNotifier,
@ -55,7 +55,7 @@ export class SyncClient {
database: Partial<StoredDatabase>;
}>
>
) {}
) { }
public get documentCount(): number {
return this.database.length;
@ -211,18 +211,19 @@ export class SyncClient {
const fileChangeNotifier = new FileChangeNotifier();
const cursorTracker = new CursorTracker(
logger,
database,
webSocketManager,
fileOperations,
fileChangeNotifier
);
const client = new SyncClient(
logger,
history,
settings,
database,
syncer,
webSocketManager,
logger,
fetchController,
cursorTracker,
fileChangeNotifier,

View file

@ -10,6 +10,7 @@ import { hash } from "../utils/hash";
import type { FileChangeNotifier } from "./file-change-notifier";
import { Lock } from "../utils/data-structures/locks";
import { EventListeners } from "../utils/data-structures/event-listeners";
import { Logger } from "../tracing/logger";
// Cursor positions are updated separately from documents. However, a given cursor position is only
// valid within a certain version of the document it belongs to. This class tracks previous and the latest
@ -22,7 +23,7 @@ export class CursorTracker {
(cursors: MaybeOutdatedClientCursors[]) => unknown
>();
private readonly updateLock = new Lock(CursorTracker.name);
private readonly updateLock: Lock;
private knownRemoteCursors: (ClientCursors & {
upToDateness: DocumentUpToDateness;
@ -33,11 +34,14 @@ export class CursorTracker {
[];
public constructor(
private readonly logger: Logger,
private readonly database: Database,
private readonly webSocketManager: WebSocketManager,
private readonly fileOperations: FileOperations,
private readonly fileChangeNotifier: FileChangeNotifier
) {
this.updateLock = new Lock(CursorTracker.name, logger);
this.webSocketManager.onRemoteCursorsUpdateReceived.add(
async (clientCursors) => {
await this.updateLock.withLock(async () => {

View file

@ -48,6 +48,7 @@ export class Syncer {
});
this.updatedDocumentsByPathAndKeysLocks = new Locks<DocumentId>(
Syncer.name,
this.logger
);
@ -88,6 +89,7 @@ export class Syncer {
public async syncLocallyCreatedFile(
relativePath: RelativePath
): Promise<void> {
// check whether someone else has already created the document in the database
if (
this.database.getLatestDocumentByRelativePath(relativePath)
?.isDeleted === false
@ -148,6 +150,24 @@ export class Syncer {
oldPath?: RelativePath;
relativePath: RelativePath;
}): Promise<void> {
const document =
this.database.getLatestDocumentByRelativePath(oldPath ?? relativePath);
// must have been removed after a successful delete
if (document === undefined) {
this.logger.debug(
`Cannot find document ${relativePath} in the database, skipping`
);
return;
}
if (document.isDeleted) {
this.logger.debug(
`Document ${relativePath} has been deleted locally, skipping`
);
return;
}
const documentAtNewPath =
this.database.getLatestDocumentByRelativePath(relativePath);
@ -168,8 +188,6 @@ export class Syncer {
}
}
const document =
this.database.getLatestDocumentByRelativePath(relativePath);
if (
oldPath !== undefined &&
@ -181,21 +199,6 @@ export class Syncer {
return;
}
// must have been removed after a successful delete
if (document === undefined) {
this.logger.debug(
`Cannot find document ${relativePath} in the database, skipping`
);
return;
}
if (document.isDeleted) {
this.logger.debug(
`Document ${relativePath} has been deleted locally, skipping`
);
return;
}
await this.enqueueSyncOperation(
async () =>
this.unrestrictedSyncer.unrestrictedSyncLocallyCreatedOrUpdatedFile(
@ -448,7 +451,7 @@ export class Syncer {
private async enqueueSyncOperation<T>(
operation: () => Promise<T>,
keys: (DocumentId | undefined | null)[]
keys: (string | undefined | null)[]
): Promise<T> {
return this.updatedDocumentsByPathAndKeysLocks.withLock(
keys.filter((k) => k !== undefined && k !== null),

View file

@ -473,7 +473,6 @@ export class UnrestrictedSyncer {
}
let actualPath = document.relativePath;
let mustCreate = false;
if (isCreate) {
// We have a file locally that got moved by another client to the same path as the one we're trying to create.
@ -485,16 +484,16 @@ export class UnrestrictedSyncer {
);
if (existingDocument !== undefined) {
this.logger.info(
`Merging document ${existingDocument.relativePath} into existing document ${document.relativePath
`Merging existing document ${existingDocument.relativePath} into ${document.relativePath
} after concurrent move & creation`
);
this.database.removeDocument(document); // this was a (fake) pending document
if (!existingDocument.isDeleted) {
this.database.delete(existingDocument.relativePath); // make sure syncLocallyDeletedFile doesn't actually schedule deleting the new file
await this.operations.delete(existingDocument.relativePath);
this.database.removeDocument(existingDocument);
await this.operations.move(existingDocument.relativePath, document.relativePath);
} else {
this.database.removeDocument(existingDocument);
}
mustCreate = true;
document = existingDocument;
}
}
@ -516,37 +515,21 @@ export class UnrestrictedSyncer {
const responseBytes = base64ToBytes(response.contentBase64);
contentHash = hash(responseBytes);
this.database.updateDocumentMetadata(
{
documentId: response.documentId,
parentVersionId: response.vaultUpdateId,
hash: contentHash,
remoteRelativePath: response.relativePath
},
document
);
if (mustCreate) {
this.database.createNewPendingDocument(actualPath);
this.database.updateDocumentMetadata(
{
documentId: response.documentId,
parentVersionId: response.vaultUpdateId,
hash: contentHash,
remoteRelativePath: response.relativePath
},
document
);
await this.operations.create(actualPath, responseBytes);
} else {
this.database.updateDocumentMetadata(
{
documentId: response.documentId,
parentVersionId: response.vaultUpdateId,
hash: contentHash,
remoteRelativePath: response.relativePath
},
document
);
await this.operations.write(
actualPath,
originalContentBytes,
responseBytes
);
}
await this.operations.write(
actualPath,
originalContentBytes,
responseBytes
);
await this.updateCache(
response.vaultUpdateId,
responseBytes,

View file

@ -9,7 +9,7 @@ import { sleep } from "../utils/sleep";
import type { LogLine } from "sync-client";
import { withTimeout } from "../utils/with-timeout";
const TIMEOUT_MS = 2 * 60 * 1000;
const TIMEOUT_MS = 10 * 60 * 1000;
export class MockAgent extends MockClient {
private readonly writtenContents: string[] = [];