From 727b6b7ed5a896cc0d450bc65aa89aac228eba51 Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Thu, 22 Jan 2026 20:21:30 +0000 Subject: [PATCH] Use locks --- .../deterministic-tests/src/test-runner.ts | 1 - frontend/sync-client/src/index.ts | 3 +- .../sync-client/src/persistence/database.ts | 104 +++------ frontend/sync-client/src/sync-client.ts | 11 +- .../sync-client/src/sync-operations/syncer.ts | 201 ++++++------------ .../sync-operations/unrestricted-syncer.ts | 131 ++++++------ .../src/utils/data-structures/locks.test.ts | 29 ++- .../src/utils/data-structures/locks.ts | 56 +++-- frontend/test-client/src/agent/mock-agent.ts | 13 +- frontend/test-client/src/cli.ts | 13 +- 10 files changed, 245 insertions(+), 317 deletions(-) diff --git a/frontend/deterministic-tests/src/test-runner.ts b/frontend/deterministic-tests/src/test-runner.ts index eb778c1a..50e44c3e 100644 --- a/frontend/deterministic-tests/src/test-runner.ts +++ b/frontend/deterministic-tests/src/test-runner.ts @@ -8,7 +8,6 @@ import { DeterministicAgent } from "./deterministic-agent"; import type { ServerControl } from "./server-control"; import type { SyncSettings, Logger } from "sync-client"; import { assert } from "./utils/assert"; -import WebSocket from "ws"; import { randomUUID } from "node:crypto"; export class TestRunner { diff --git a/frontend/sync-client/src/index.ts b/frontend/sync-client/src/index.ts index c4e4313d..07a2b598 100644 --- a/frontend/sync-client/src/index.ts +++ b/frontend/sync-client/src/index.ts @@ -33,13 +33,14 @@ export type { AuthenticationError } from "./errors/authentication-error"; export type { MaybeOutdatedClientCursors } from "./types/maybe-outdated-client-cursors"; export { DocumentSyncStatus } from "./types/document-sync-status"; export { SyncClient } from "./sync-client"; +export { __debug_locks } from "./sync-operations/syncer"; export type { TextWithCursors, CursorPosition } from "reconcile-text"; export const debugging = { slowFetchFactory, slowWebSocketFactory, logToConsole, - InMemoryFileSystem + InMemoryFileSystem, }; export const utils = { diff --git a/frontend/sync-client/src/persistence/database.ts b/frontend/sync-client/src/persistence/database.ts index 6cd53504..02356ff9 100644 --- a/frontend/sync-client/src/persistence/database.ts +++ b/frontend/sync-client/src/persistence/database.ts @@ -38,7 +38,6 @@ export interface DocumentRecord { relativePath: RelativePath; metadata: DocumentMetadata | undefined; isDeleted: boolean; - updates: Promise[]; parallelVersion: number; } @@ -58,7 +57,6 @@ export class Database { relativePath, metadata, isDeleted: false, - updates: [], parallelVersion: 0 })) ?? []; @@ -103,7 +101,7 @@ export class Database { i === 0 ? false : records[i - 1].parallelVersion === - current.parallelVersion + current.parallelVersion ) ) { throw new Error( @@ -121,37 +119,30 @@ export class Database { hash: string; remoteRelativePath: RelativePath; }, - toUpdate: DocumentRecord + target: DocumentRecord ): void { - if (!this.documents.includes(toUpdate)) { + if (!this.documents.includes(target)) { throw new Error("Document not found in database"); } - toUpdate.metadata = metadata; - - this.saveInTheBackground(); - } - - public removeDocumentPromise(promise: Promise): void { - const entry = this.documents.find(({ updates }) => - updates.includes(promise) + this.logger.debug( + `Updating document metadata for ${target.relativePath} from ${JSON.stringify( + target.metadata, + null, + 2 + )} to ${JSON.stringify( + metadata, + null, + 2 + )}` ); - if (entry === undefined) { - // This method should be idempotent and tolerant of - // stragglers calling it after the databse has been reset. - return; - } + target.metadata = metadata; - removeFromArray(entry.updates, promise); - // No need to save as Promises don't get serialized - } - - public removeDocument(find: DocumentRecord): void { - removeFromArray(this.documents, find); this.saveInTheBackground(); } + public getLatestDocumentByRelativePath( find: RelativePath ): DocumentRecord | undefined { @@ -162,32 +153,9 @@ export class Database { return candidates[0]; } - public async getResolvedDocumentByRelativePath( - relativePath: RelativePath, - promise: Promise - ): Promise { - const entry = this.getLatestDocumentByRelativePath(relativePath); - - if (entry === undefined) { - throw new Error( - `Document not found by relative path in getResolvedDocumentByRelativePath: ${relativePath}, ${JSON.stringify( - this.documents, - null, - 2 - )}` - ); - } - - const currentPromises = entry.updates; - entry.updates = [...currentPromises, promise]; - await awaitAll(currentPromises); - - return entry; - } public createNewPendingDocument( relativePath: RelativePath, - promise: Promise ): DocumentRecord { this.logger.debug(`Creating new pending document: ${relativePath}`); const previousEntry = @@ -197,7 +165,6 @@ export class Database { relativePath, metadata: undefined, isDeleted: false, - updates: [promise], parallelVersion: previousEntry?.parallelVersion === undefined ? 0 @@ -205,31 +172,8 @@ export class Database { }; this.documents.push(entry); - this.saveInTheBackground(); - return entry; - } - - public createNewEmptyDocument( - documentId: DocumentId, - parentVersionId: VaultUpdateId, - relativePath: RelativePath - ): DocumentRecord { - const entry = { - relativePath, - metadata: { - documentId, - parentVersionId, - hash: EMPTY_HASH, - remoteRelativePath: relativePath - }, - isDeleted: false, - updates: [], - parallelVersion: 0 - }; - - this.documents.push(entry); - this.saveInTheBackground(); + // no need to save as we only save documents which have metadata return entry; } @@ -274,17 +218,17 @@ export class Database { public delete(relativePath: RelativePath): void { const candidate = this.getLatestDocumentByRelativePath(relativePath); if (candidate === undefined) { - throw new Error( - `Document not found by relative path in delete: ${relativePath}, ${JSON.stringify( - this.documents, - null, - 2 - )}` - ); + return; } candidate.isDeleted = true; } + + public removeDocument(find: DocumentRecord): void { + removeFromArray(this.documents, find); + this.saveInTheBackground(); + } + public getLastSeenUpdateId(): VaultUpdateId { return this.lastSeenUpdateIds.min; } @@ -350,7 +294,7 @@ export class Database { if (duplicates.length > 0) { throw new Error( "Document IDs are not unique, found duplicates: " + - duplicates.join("; ") + duplicates.join("; ") ); } } diff --git a/frontend/sync-client/src/sync-client.ts b/frontend/sync-client/src/sync-client.ts index 1d3b3c6b..b9e15a6f 100644 --- a/frontend/sync-client/src/sync-client.ts +++ b/frontend/sync-client/src/sync-client.ts @@ -56,7 +56,7 @@ export class SyncClient { database: Partial; }> > - ) {} + ) { } public get documentCount(): number { return this.database.length; @@ -410,12 +410,8 @@ export class SyncClient { return DocumentSyncStatus.SYNCING; } - const document = - this.database.getLatestDocumentByRelativePath(relativePath); - if (document === undefined) { - return DocumentSyncStatus.SYNCING; - } - return document.updates.length > 0 + + return this.syncer.hasPendingOperationsForDocument(relativePath) ? DocumentSyncStatus.SYNCING : DocumentSyncStatus.UP_TO_DATE; } @@ -495,7 +491,6 @@ export class SyncClient { // don't reset the logger this.cursorTracker.reset(); this.syncer.reset(); - this.unrestrictedSyncer.reset(); this.fileOperations.reset(); } diff --git a/frontend/sync-client/src/sync-operations/syncer.ts b/frontend/sync-client/src/sync-operations/syncer.ts index 0b85601e..e0787672 100644 --- a/frontend/sync-client/src/sync-operations/syncer.ts +++ b/frontend/sync-client/src/sync-operations/syncer.ts @@ -21,12 +21,14 @@ import type { WebSocketClientMessage } from "../services/types/WebSocketClientMe import { awaitAll } from "../utils/await-all"; import { EventListeners } from "../utils/data-structures/event-listeners"; +export const __debug_locks: Locks[] = []; // Used only for debugging timeouts + export class Syncer { public readonly onRemainingOperationsCountChanged = new EventListeners< (remainingOperations: number) => unknown >(); - private readonly remoteDocumentsLock: Locks; + public readonly updatedDocumentsByPathAndKeysLock: Locks; // FIFO to limit the number of concurrent sync operations private readonly syncQueue: PQueue; @@ -48,7 +50,8 @@ export class Syncer { concurrency: settings.getSettings().syncConcurrency }); - this.remoteDocumentsLock = new Locks(this.logger); + this.updatedDocumentsByPathAndKeysLock = new Locks(this.logger); + __debug_locks.push(this.updatedDocumentsByPathAndKeysLock); // Used only for debugging timeouts settings.onSettingsChanged.add((newSettings, oldSettings) => { if (newSettings.syncConcurrency !== oldSettings.syncConcurrency) { @@ -80,6 +83,10 @@ export class Syncer { return this._isFirstSyncComplete; } + public hasPendingOperationsForDocument(relativePath: string): boolean { + return this.updatedDocumentsByPathAndKeysLock.isLocked(relativePath); + } + public async syncLocallyCreatedFile( relativePath: RelativePath ): Promise { @@ -95,33 +102,27 @@ export class Syncer { return; } - const [promise, resolve, reject] = createPromise(); - const document = this.database.createNewPendingDocument( - relativePath, - promise + relativePath ); - try { - await this.syncQueue.add(async () => - this.unrestrictedSyncer.unrestrictedSyncLocallyCreatedOrUpdatedFile( - { document } - ) - ); - - resolve(); - } catch (e) { - reject(e); - } finally { - this.database.removeDocumentPromise(promise); - } + await this.enqueueSyncOperation(async () => + this.unrestrictedSyncer.unrestrictedSyncLocallyCreatedOrUpdatedFile( + { + document + } + ), [relativePath] + ); } public async syncLocallyDeletedFile( relativePath: RelativePath ): Promise { + const document = this.database.getLatestDocumentByRelativePath(relativePath); + + if ( - this.database.getLatestDocumentByRelativePath(relativePath) + document ?.isDeleted === true ) { // This is must be a consequence of us deleting a file because of a remote update @@ -136,28 +137,25 @@ export class Syncer { // document which finishes after the delete has succeeded and would introduce a phantom metadata record. this.database.delete(relativePath); - const [promise, resolve, reject] = createPromise(); - const document = await this.database.getResolvedDocumentByRelativePath( - relativePath, - promise - ); - try { - await this.syncQueue.add(async () => - this.unrestrictedSyncer.unrestrictedSyncLocallyDeletedFile( - document - ) + await this.enqueueSyncOperation(async () => { + const document = this.database.getLatestDocumentByRelativePath(relativePath); + + if (document === undefined) { + this.logger.debug( + `Cannot find document ${relativePath} in the database, must have been deleted already, skipping` + ); + return; + } + + await this.unrestrictedSyncer.unrestrictedSyncLocallyDeletedFile( + document ); - resolve(); - this.database.removeDocument(document); - } catch (e) { - reject(e); - } finally { - this.database.removeDocumentPromise(promise); - } + }, [document?.metadata?.documentId, relativePath] + ); } public async syncLocallyUpdatedFile({ @@ -167,13 +165,17 @@ export class Syncer { oldPath?: RelativePath; relativePath: RelativePath; }): Promise { + const documentAtNewPath = this.database.getLatestDocumentByRelativePath( + relativePath + ); + if (oldPath !== undefined) { // We might have moved the document in the database before calling this method, // in that case, we mustn't move it again. if ( - this.database.getLatestDocumentByRelativePath(relativePath) === - undefined || - this.database.getLatestDocumentByRelativePath(relativePath) + documentAtNewPath === + undefined || + documentAtNewPath ?.isDeleted === true ) { if (oldPath === relativePath) { @@ -214,29 +216,17 @@ export class Syncer { return; } - const [promise, resolve, reject] = createPromise(); - document = await this.database.getResolvedDocumentByRelativePath( - relativePath, - promise + await this.enqueueSyncOperation(async () => + this.unrestrictedSyncer.unrestrictedSyncLocallyCreatedOrUpdatedFile( + { + oldPath, + document + } + ), [document.metadata?.documentId, relativePath, oldPath] ); - try { - await this.syncQueue.add(async () => - this.unrestrictedSyncer.unrestrictedSyncLocallyCreatedOrUpdatedFile( - { - oldPath, - document - } - ) - ); - resolve(); - } catch (e) { - reject(e); - } finally { - this.database.removeDocumentPromise(promise); - } } public async scheduleSyncForOfflineChanges(): Promise { @@ -300,7 +290,7 @@ export class Syncer { public reset(): void { this._isFirstSyncComplete = false; this.syncQueue.clear(); - this.remoteDocumentsLock.reset(); + this.updatedDocumentsByPathAndKeysLock.reset(); this.runningScheduleSyncForOfflineChanges = undefined; } @@ -317,91 +307,17 @@ export class Syncer { private async internalSyncRemotelyUpdatedFile( remoteVersion: DocumentVersionWithoutContent ): Promise { - let document = this.database.getDocumentByDocumentId( + const document = this.database.getDocumentByDocumentId( remoteVersion.documentId ); - - if (document === undefined) { - return this.remoteDocumentsLock.withLock( - // Avoid the same documents getting created in parallel multiple times through fetching multiple updates of the same - // new remote document concurrently. - // There might be multiple tasks waiting for the lock - remoteVersion.documentId, - async () => { - // We have to wait for any ongoing creates sent for this file to finish, - // This is to avoid fetching one's own creates before the corresponding local create has finished syncing. This is a concern because - // documents being created don't yet have a document id in the local database and we could be notified of the remote create - // before the local create has finished syncing, so we can't just ignore the update based on the local DB content as we - // can't find the corresponding document yet. - if (document?.metadata === undefined) { - await this.unrestrictedSyncer.fileCreationLock.waitForLockWithoutAcquiringLock( - remoteVersion.relativePath - ); - } - - document = this.database.getDocumentByDocumentId( - remoteVersion.documentId - ); - - // We're the first one to get the lock, so we have to create the document in `unrestrictedSyncRemotelyUpdatedFile` - if (document === undefined) { - await this.syncQueue.add(async () => - this.unrestrictedSyncer.unrestrictedSyncRemotelyUpdatedFile( - remoteVersion - ) - ); - } else { - const [promise, resolve, reject] = createPromise(); - - document = - await this.database.getResolvedDocumentByRelativePath( - document.relativePath, - promise - ); - - try { - await this.syncQueue.add(async () => - this.unrestrictedSyncer.unrestrictedSyncRemotelyUpdatedFile( - remoteVersion, - document - ) - ); - - resolve(); - } catch (e) { - reject(e); - } finally { - this.database.removeDocumentPromise(promise); - } - } - - this.database.addSeenUpdateId(remoteVersion.vaultUpdateId); - } - ); - } - - // We're either the first one to get the lock, so we have to create the document in `unrestrictedSyncRemotelyUpdatedFile` - const [promise, resolve, reject] = createPromise(); - - document = await this.database.getResolvedDocumentByRelativePath( - document.relativePath, - promise - ); - - try { + this.enqueueSyncOperation(async () => await this.syncQueue.add(async () => this.unrestrictedSyncer.unrestrictedSyncRemotelyUpdatedFile( remoteVersion, document ) - ); - - resolve(); - } catch (e) { - reject(e); - } finally { - this.database.removeDocumentPromise(promise); - } + ), [document?.relativePath, remoteVersion.relativePath, remoteVersion.documentId] + ); this.database.addSeenUpdateId(remoteVersion.vaultUpdateId); } @@ -546,4 +462,13 @@ export class Syncer { }) ); } + + private async enqueueSyncOperation( + operation: () => Promise, + keys: Array + ): Promise { + return this.updatedDocumentsByPathAndKeysLock.withLock(keys.filter(k => k !== undefined && k !== null), async () => + this.syncQueue.add(operation) + ); + } } diff --git a/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts b/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts index 5514d617..e1163dca 100644 --- a/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts +++ b/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts @@ -36,8 +36,6 @@ import type { ServerConfig } from "../services/server-config"; import { Locks } from "../utils/data-structures/locks"; export class UnrestrictedSyncer { - public readonly fileCreationLock: Locks = - new Locks(); private ignorePatterns: RegExp[]; public constructor( @@ -65,10 +63,10 @@ export class UnrestrictedSyncer { public async unrestrictedSyncLocallyCreatedOrUpdatedFile({ oldPath, - document, // We use the same code path for both local and remote updates. We need to force the update // if there are no local changes but we know that the remote version is newer. - force = false + force = false, + document, }: { oldPath?: RelativePath; force?: boolean; @@ -80,16 +78,16 @@ export class UnrestrictedSyncer { | SyncMovedDetails = document.metadata === undefined ? { - type: SyncType.CREATE, - relativePath: document.relativePath - } + type: SyncType.CREATE, + relativePath: document.relativePath + } : oldPath !== undefined - ? { + ? { type: SyncType.MOVE, relativePath: document.relativePath, movedFrom: oldPath } - : { + : { type: SyncType.UPDATE, relativePath: document.relativePath }; @@ -111,27 +109,21 @@ export class UnrestrictedSyncer { let response: DocumentVersion | DocumentUpdateResponse | undefined = undefined; - if (document.metadata === undefined) { - response = await this.fileCreationLock.withLock( - document.relativePath, - async () => { - const createResponse = await this.syncService.create({ - relativePath: originalRelativePath, - contentBytes - }); + response = await this.syncService.create({ + relativePath: originalRelativePath, + contentBytes + }); - await this.handleMaybeMergingResponse({ - document, - response: createResponse, - contentHash, - originalRelativePath, - originalContentBytes: contentBytes - }); + await this.handleMaybeMergingResponse({ + document, + response, + contentHash, + originalRelativePath, + originalContentBytes: contentBytes, + isCreate: true + }); - return createResponse; - } - ); } else { const areThereLocalChanges = document.metadata.hash !== contentHash || @@ -152,22 +144,22 @@ export class UnrestrictedSyncer { response = isText && cachedVersion !== undefined ? await this.syncService.putText({ - documentId: document.metadata.documentId, - parentVersionId: - document.metadata.parentVersionId, - relativePath: document.relativePath, - content: diff( - new TextDecoder().decode(cachedVersion), - new TextDecoder().decode(contentBytes) - ) - }) + documentId: document.metadata.documentId, + parentVersionId: + document.metadata.parentVersionId, + relativePath: document.relativePath, + content: diff( + new TextDecoder().decode(cachedVersion), + new TextDecoder().decode(contentBytes) + ) + }) : await this.syncService.putBinary({ - documentId: document.metadata.documentId, - parentVersionId: - document.metadata.parentVersionId, - relativePath: document.relativePath, - contentBytes - }); + documentId: document.metadata.documentId, + parentVersionId: + document.metadata.parentVersionId, + relativePath: document.relativePath, + contentBytes + }); } else { if (!force) { this.logger.debug( @@ -204,16 +196,16 @@ export class UnrestrictedSyncer { const actualUpdateDetails: SyncUpdateDetails | SyncMovedDetails = oldPath !== undefined || - response.relativePath != originalRelativePath + response.relativePath != originalRelativePath ? { - type: SyncType.MOVE, - relativePath: response.relativePath, - movedFrom: originalRelativePath - } + type: SyncType.MOVE, + relativePath: response.relativePath, + movedFrom: originalRelativePath + } : { - type: SyncType.UPDATE, - relativePath: response.relativePath - }; + type: SyncType.UPDATE, + relativePath: response.relativePath + }; if (!response.isDeleted) { this.history.addHistoryEntry({ @@ -351,7 +343,6 @@ export class UnrestrictedSyncer { await this.operations.ensureClearPath(remoteVersion.relativePath); - const [promise, resolve] = createPromise(); this.database.updateDocumentMetadata( { documentId: remoteVersion.documentId, @@ -361,7 +352,6 @@ export class UnrestrictedSyncer { }, this.database.createNewPendingDocument( remoteVersion.relativePath, - promise ) ); @@ -375,8 +365,6 @@ export class UnrestrictedSyncer { remoteVersion.relativePath ); - resolve(); - this.database.removeDocumentPromise(promise); this.history.addHistoryEntry({ status: SyncStatus.SUCCESS, @@ -388,9 +376,7 @@ export class UnrestrictedSyncer { }); } - public reset(): void { - this.fileCreationLock.reset(); - } + private async executeSync( details: SyncDetails, @@ -461,13 +447,15 @@ export class UnrestrictedSyncer { response, contentHash, originalRelativePath, - originalContentBytes + originalContentBytes, + isCreate }: { document: DocumentRecord; response: DocumentVersion | DocumentUpdateResponse; contentHash: string; originalRelativePath: string; originalContentBytes: Uint8Array; + isCreate?: boolean; }): Promise { // `document` is mutable and reflects the latest state in the local database if (document.isDeleted) { @@ -494,6 +482,26 @@ export class UnrestrictedSyncer { let actualPath = document.relativePath; + + if (isCreate === true) { + // We have a file locally that got moved by another client to the same path as the one we're trying to create. + // The server returns a merging update for the document ID that already exists locally (but at another path). + // We have to merge these two documents by extending the provenance of the existing document and deleting + // the old document that the new document already contains the content for. + const existingDocument = this.database.getDocumentByDocumentId( + response.documentId + ); + if (existingDocument !== undefined) { + this.logger.info(`Merging document ${existingDocument.relativePath} into existing document ${document.relativePath} after concurrent move & creation`); + this.database.removeDocument(document); // this was a (fake) pending document + if (!existingDocument.isDeleted) { + this.operations.delete(document.relativePath); + } + document = existingDocument; + } + + } + // this can't happen on the creation path as we can only get a merging response if a document already exists remotely on the same path if (response.relativePath != originalRelativePath) { actualPath = response.relativePath; @@ -508,10 +516,12 @@ export class UnrestrictedSyncer { ); // this can throw FileNotFoundError } + if (!("type" in response) || response.type === "MergingUpdate") { const responseBytes = base64ToBytes(response.contentBase64); contentHash = hash(responseBytes); + this.database.updateDocumentMetadata( { documentId: response.documentId, @@ -564,9 +574,8 @@ export class UnrestrictedSyncer { type: SyncType.SKIPPED, relativePath }, - message: `File size of ${sizeInMB} MB exceeds the maximum file size limit of ${ - maxFileSizeMB - } MB` + message: `File size of ${sizeInMB} MB exceeds the maximum file size limit of ${maxFileSizeMB + } MB` }; } } diff --git a/frontend/sync-client/src/utils/data-structures/locks.test.ts b/frontend/sync-client/src/utils/data-structures/locks.test.ts index d1dcc6d7..5a487298 100644 --- a/frontend/sync-client/src/utils/data-structures/locks.test.ts +++ b/frontend/sync-client/src/utils/data-structures/locks.test.ts @@ -10,6 +10,8 @@ import { SyncResetError } from "../../errors/sync-reset-error"; describe("withLock", () => { const testPath: RelativePath = "test/document/path"; const testPath2: RelativePath = "test/document/path2"; + const testPath3: RelativePath = "test/document/path3"; + const logger = new Logger(); // eslint-disable-next-line @typescript-eslint/init-declarations @@ -56,22 +58,29 @@ describe("withLock", () => { it("should sort multiple keys to prevent deadlocks", async () => { const executionOrder: string[] = []; - // Start two concurrent operations with keys in different orders - const promise1 = locks.withLock([testPath2, testPath], async () => { + await locks.waitForLock(testPath); + + const promise = awaitAll([locks.withLock([testPath2, testPath3, testPath], async () => { executionOrder.push("operation1-start"); - await sleep(50); executionOrder.push("operation1-end"); return "result1"; - }); + }), - const promise2 = locks.withLock([testPath, testPath2], async () => { + locks.withLock([testPath3, testPath, testPath2], async () => { executionOrder.push("operation2-start"); - await sleep(50); executionOrder.push("operation2-end"); return "result2"; - }); + })]); + + + locks.unlock(testPath); + + const [result1, result2] = await Promise.race([promise, new Promise((_, reject) => { + setTimeout(() => { + reject(new Error("Deadlock detected")); + }, 1000); + })]); - const [result1, result2] = await awaitAll([promise1, promise2]); assert.strictEqual(result1, "result1"); assert.strictEqual(result2, "result2"); @@ -252,7 +261,7 @@ describe("reset", () => { await sleep(1); const secondPromise = locks.withLock(testPath, async () => "second"); - void secondPromise.catch(() => {}); // eslint-disable-line @typescript-eslint/no-empty-function + void secondPromise.catch(() => { }); // eslint-disable-line @typescript-eslint/no-empty-function locks.reset(); @@ -273,7 +282,7 @@ describe("reset", () => { await sleep(1); const secondPromise = locks.withLock(testPath, async () => "second"); - void secondPromise.catch(() => {}); // eslint-disable-line @typescript-eslint/no-empty-function + void secondPromise.catch(() => { }); // eslint-disable-line @typescript-eslint/no-empty-function locks.reset(); diff --git a/frontend/sync-client/src/utils/data-structures/locks.ts b/frontend/sync-client/src/utils/data-structures/locks.ts index f0f79a46..20bd378f 100644 --- a/frontend/sync-client/src/utils/data-structures/locks.ts +++ b/frontend/sync-client/src/utils/data-structures/locks.ts @@ -18,7 +18,7 @@ export class Locks { [() => unknown, (err: unknown) => unknown][] >(); - public constructor(private readonly logger?: Logger) {} + public constructor(private readonly logger?: Logger) { } /** * Executes a function while holding exclusive locks on one or more keys. @@ -59,7 +59,10 @@ export class Locks { const uniqueKeys = Array.from(new Set(keys)); uniqueKeys.sort((a, b) => String(a).localeCompare(String(b))); // Ensure consistent order to prevent deadlocks - await awaitAll(uniqueKeys.map(async (key) => this.waitForLock(key))); + for (const key of uniqueKeys) { + // Must acquire locks in-order (not concurrently) to prevent deadlocks + await this.waitForLock(key); + } try { return await fn(); @@ -82,6 +85,44 @@ export class Locks { this.waiters.clear(); } + public isLocked(key: T): boolean { + return this.locked.has(key); + } + + public getDebugString(): string { + const lockedKeys = Array.from(this.locked).map((key) => String(key)); + const waiterEntries = Array.from(this.waiters.entries()).filter( + ([_, waiting]) => waiting.length > 0 + ); + + const lines: string[] = []; + lines.push("=== Locks Debug ==="); + lines.push(`Locked keys (${lockedKeys.length}):`); + if (lockedKeys.length === 0) { + lines.push(" (none)"); + } else { + for (const key of lockedKeys) { + const waiterCount = + this.waiters.get(key as T)?.length ?? 0; + lines.push( + ` - ${key}${waiterCount > 0 ? ` (${waiterCount} waiting)` : ""}` + ); + } + } + + lines.push(`Waiters (${waiterEntries.length} keys):`); + if (waiterEntries.length === 0) { + lines.push(" (none)"); + } else { + for (const [key, waiting] of waiterEntries) { + lines.push(` - ${String(key)}: ${waiting.length} waiting`); + } + } + lines.push("==================="); + + return lines.join("\n"); + } + /** * Attempts to acquire a lock immediately without waiting. * Must call `unlock()` if successful. @@ -125,17 +166,6 @@ export class Locks { }); } - /** - * Waits until a lock is released without acquiring it. - * Operations are queued in FIFO order. - * - * @param key The key to wait for - * @returns Promise that resolves when lock is released - */ - public async waitForLockWithoutAcquiringLock(key: T): Promise { - await this.waitForLock(key); - this.unlock(key); - } /** * Releases a lock and grants access to the next waiting operation in FIFO order. diff --git a/frontend/test-client/src/agent/mock-agent.ts b/frontend/test-client/src/agent/mock-agent.ts index 9f9e6a45..73dbc5ec 100644 --- a/frontend/test-client/src/agent/mock-agent.ts +++ b/frontend/test-client/src/agent/mock-agent.ts @@ -9,7 +9,7 @@ import { sleep } from "../utils/sleep"; import type { LogLine } from "sync-client"; import { withTimeout } from "../utils/with-timeout"; -const TIMEOUT_MS = 10 * 60 * 1000; +const TIMEOUT_MS = 2 * 60 * 1000; export class MockAgent extends MockClient { private readonly writtenContents: string[] = []; @@ -105,7 +105,16 @@ export class MockAgent extends MockClient { } public async waitUntilSynced(): Promise { - await this.client.waitUntilFinished(); + await withTimeout( + (async (): Promise => { + this.client.setSetting("isSyncEnabled", true); + await this.client.waitUntilFinished(); + })(), + TIMEOUT_MS, + "waitUntilSynced()" + ); + + } public async act(): Promise { diff --git a/frontend/test-client/src/cli.ts b/frontend/test-client/src/cli.ts index 45421660..b48398ff 100644 --- a/frontend/test-client/src/cli.ts +++ b/frontend/test-client/src/cli.ts @@ -7,7 +7,7 @@ import { randomCasing } from "./utils/random-casing"; import { TimeoutError } from "./utils/with-timeout"; const TEST_ITERATIONS = 5; -const MAX_INITIAL_DOCS = 0; +const MAX_INITIAL_DOCS = 10; // Simulate async file access by injecting waiting time before returning from file operations. let slowFileEvents = false; @@ -90,10 +90,11 @@ async function runTest({ logger.info("Stopping agents"); - // Each agent can have unpushed changes which might conflict with eachother so each has to resolve the conflicts & push, and + // Each agent can have unpushed changes which might conflict with eachother so each has to resolve the conflicts & push, and pull for (const client of clients) { try { logger.info(`Finishing up ${client.name}`); + await client.waitUntilSynced(); await client.finish(); } catch (err) { if (err instanceof TimeoutError || !slowFileEvents) { @@ -102,7 +103,7 @@ async function runTest({ } } - // then we need a second pass to ensure that all agents pull the same state. + // then we need a second pass to ensure that all agents pull the same state for (const client of clients) { try { logger.info(`Destroying ${client.name}`); @@ -183,6 +184,9 @@ process.on("uncaughtException", (error) => { } logger.error(`Error - uncaught exception: ${error}`); + if (error instanceof Error && error.stack) { + logger.error(error.stack); + } process.exit(1); }); @@ -211,6 +215,9 @@ process.on("unhandledRejection", (error, _promise) => { } logger.error(`Error - unhandled rejection: ${error}`); + if (error instanceof Error && error.stack) { + logger.error(error.stack); + } process.exit(1); });