From 022c57e88a626595f92b5b75fe65efee328b1ec3 Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Sat, 23 Aug 2025 12:37:05 +0100 Subject: [PATCH] Make locks deadlock-safe --- .../src/debugging/flaky-websocket-factory.ts | 34 ++++----- .../safe-filesystem-operations.ts | 67 ++++------------ frontend/sync-client/src/utils/locks.ts | 76 +++++++++++++++---- .../src/utils/flaky-websocket-factory.ts | 33 ++++---- 4 files changed, 112 insertions(+), 98 deletions(-) diff --git a/frontend/obsidian-plugin/src/debugging/flaky-websocket-factory.ts b/frontend/obsidian-plugin/src/debugging/flaky-websocket-factory.ts index 56310aa6..f59cce19 100644 --- a/frontend/obsidian-plugin/src/debugging/flaky-websocket-factory.ts +++ b/frontend/obsidian-plugin/src/debugging/flaky-websocket-factory.ts @@ -24,15 +24,18 @@ export function flakyWebSocketFactory( public set onmessage(callback: (event: MessageEvent) => void) { super.onmessage = async (event: MessageEvent): Promise => { - await this.locks.waitForLock(FlakyWebSocket.RECEIVE_KEY); + await this.locks.withLock( + FlakyWebSocket.RECEIVE_KEY, + async () => { + if (jitterScaleInSeconds > 0) { + await sleep( + Math.random() * jitterScaleInSeconds * 1000 + ); + } - if (jitterScaleInSeconds > 0) { - await sleep(Math.random() * jitterScaleInSeconds * 1000); - } - - callback(event); - - this.locks.unlock(FlakyWebSocket.RECEIVE_KEY); + callback(event); + } + ); }; } @@ -66,15 +69,12 @@ export function flakyWebSocketFactory( data: string | ArrayBufferLike | Blob | ArrayBufferView ): Promise { // maintain message order - await this.locks.waitForLock(FlakyWebSocket.SEND_KEY); - - if (jitterScaleInSeconds > 0) { - await sleep(Math.random() * jitterScaleInSeconds * 1000); - } - - super.send(data); - - this.locks.unlock(FlakyWebSocket.SEND_KEY); + await this.locks.withLock(FlakyWebSocket.SEND_KEY, async () => { + if (jitterScaleInSeconds > 0) { + await sleep(Math.random() * jitterScaleInSeconds * 1000); + } + super.send(data); + }); } } as unknown as typeof WebSocket; } diff --git a/frontend/sync-client/src/file-operations/safe-filesystem-operations.ts b/frontend/sync-client/src/file-operations/safe-filesystem-operations.ts index 214f9f6e..339401af 100644 --- a/frontend/sync-client/src/file-operations/safe-filesystem-operations.ts +++ b/frontend/sync-client/src/file-operations/safe-filesystem-operations.ts @@ -31,16 +31,14 @@ export class SafeFileSystemOperations implements FileSystemOperations { this.logger.debug(`Reading file '${path}'`); return this.safeOperation( path, - this.decorateToHoldLock(path, async () => this.fs.read(path)), + async () => this.locks.withLock(path, () => this.fs.read(path)), "read" ); } public async write(path: RelativePath, content: Uint8Array): Promise { this.logger.debug(`Writing to file '${path}'`); - return this.decorateToHoldLock(path, async () => - this.fs.write(path, content) - )(); + return this.locks.withLock(path, () => this.fs.write(path, content)); } public async atomicUpdateText( @@ -50,9 +48,10 @@ export class SafeFileSystemOperations implements FileSystemOperations { this.logger.debug(`Atomically updating file '${path}'`); return this.safeOperation( path, - this.decorateToHoldLock(path, async () => - this.fs.atomicUpdateText(path, updater) - ), + async () => + this.locks.withLock(path, () => + this.fs.atomicUpdateText(path, updater) + ), "atomicUpdateText" ); } @@ -61,32 +60,25 @@ export class SafeFileSystemOperations implements FileSystemOperations { // Logging this would be too noisy return this.safeOperation( path, - this.decorateToHoldLock(path, async () => - this.fs.getFileSize(path) - ), + async () => + this.locks.withLock(path, () => this.fs.getFileSize(path)), "getFileSize" ); } public async exists(path: RelativePath): Promise { this.logger.debug(`Checking if file '${path}' exists`); - return this.decorateToHoldLock(path, async () => - this.fs.exists(path) - )(); + return this.locks.withLock(path, () => this.fs.exists(path)); } public async createDirectory(path: RelativePath): Promise { this.logger.debug(`Creating directory '${path}'`); - return this.decorateToHoldLock(path, async () => - this.fs.createDirectory(path) - )(); + return this.locks.withLock(path, () => this.fs.createDirectory(path)); } public async delete(path: RelativePath): Promise { this.logger.debug(`Deleting file '${path}'`); - return this.decorateToHoldLock(path, async () => - this.fs.delete(path) - )(); + return this.locks.withLock(path, async () => this.fs.delete(path)); } public async rename( @@ -96,43 +88,14 @@ export class SafeFileSystemOperations implements FileSystemOperations { this.logger.debug(`Renaming file '${oldPath}' to '${newPath}'`); return this.safeOperation( oldPath, - this.decorateToHoldLock([oldPath, newPath], async () => - this.fs.rename(oldPath, newPath) - ), + async () => + this.locks.withLock([oldPath, newPath], () => + this.fs.rename(oldPath, newPath) + ), "rename" ); } - /** - * Decorate an operation to ensure that the file is locked before running it - * and that the lock is released afterwards. This results in at-most one - * concurrent operation running per file. - */ - private decorateToHoldLock( - pathOrPaths: RelativePath | RelativePath[], - operation: () => Promise - ): () => Promise { - return async () => { - const paths = Array.isArray(pathOrPaths) - ? pathOrPaths - : [pathOrPaths]; - - await Promise.all( - paths.map(async (path) => this.locks.waitForLock(path)) - ); - - try { - return await operation(); - } finally { - await Promise.all( - paths.map((path) => { - this.locks.unlock(path); - }) - ); - } - }; - } - /** * Decorate an operation to ensure that the file exists before running it. * If the operation fails, it will check if the file still exists and throw diff --git a/frontend/sync-client/src/utils/locks.ts b/frontend/sync-client/src/utils/locks.ts index 77b3b767..fa513de1 100644 --- a/frontend/sync-client/src/utils/locks.ts +++ b/frontend/sync-client/src/utils/locks.ts @@ -13,7 +13,54 @@ export class Locks { /** Queue of resolve functions waiting for each key */ private readonly waiters = new Map unknown)[]>(); - public constructor(private readonly logger: Logger) {} + public constructor(private readonly logger?: Logger) {} + + /** + * Executes a function while holding exclusive locks on one or more keys. + * + * This method ensures that the provided function runs with exclusive access to the + * specified key(s). Multiple keys are sorted to prevent deadlocks when different + * operations request the same keys in different orders. + * + * @template R The return type of the function to execute + * @param keyOrKeys A single key or array of keys to lock during function execution + * @param fn The function to execute while holding the lock(s). Can be sync or async. + * @returns A Promise that resolves to the return value of the executed function + * + * @example + * ```typescript + * // Lock a single key + * const result = await locks.withLock('file1', () => { + * // Critical section - only one operation can access 'file1' at a time + * return processFile('file1'); + * }); + * + * // Lock multiple keys (prevents deadlocks through consistent ordering) + * await locks.withLock(['file1', 'file2'], async () => { + * // Critical section - exclusive access to both files + * await moveFile('file1', 'file2'); + * }); + * ``` + * + * @throws Any error thrown by the provided function will be propagated after locks are released + */ + public async withLock( + keyOrKeys: T | T[], + fn: () => R | Promise + ): Promise { + const keys = Array.isArray(keyOrKeys) ? keyOrKeys : [keyOrKeys]; + keys.sort(); // Ensure consistent order to prevent deadlocks + + await Promise.all(keys.map(async (key) => this.waitForLock(key))); + + try { + return await fn(); + } finally { + keys.forEach((key) => { + this.unlock(key); + }); + } + } /** * Attempts to acquire a lock immediately without waiting. @@ -22,7 +69,7 @@ export class Locks { * @param key The key to lock * @returns `true` if lock acquired, `false` if already locked */ - public tryLock(key: T): boolean { + private tryLock(key: T): boolean { if (this.locked.has(key)) { return false; } @@ -39,12 +86,12 @@ export class Locks { * @param key The key to wait for and lock * @returns Promise that resolves when lock is acquired */ - public async waitForLock(key: T): Promise { + private async waitForLock(key: T): Promise { if (this.tryLock(key)) { return Promise.resolve(); } - this.logger.debug(`Waiting for lock on ${key}`); + this.logger?.debug(`Waiting for lock on ${key}`); return new Promise((resolve) => { // DefaultDict behavior @@ -65,7 +112,7 @@ export class Locks { * @param key The key to unlock * @throws {Error} If key is not currently locked */ - public unlock(key: T): void { + private unlock(key: T): void { if (!this.locked.has(key)) { throw new Error(`Key '${key}' is not locked, cannot unlock`); } @@ -74,19 +121,22 @@ export class Locks { const nextWaiting = this.waiters.get(key)?.shift(); if (nextWaiting) { - this.logger.debug(`Granted lock on ${key}`); + this.logger?.debug(`Granted lock on ${key}`); nextWaiting(); } else { this.locked.delete(key); } } +} - /** - * Clears all locks and waiters. Causes waiting operations to hang indefinitely. - * Use with caution. - */ - public reset(): void { - this.locked.clear(); - this.waiters.clear(); +export class Lock { + private readonly locks: Locks; + + public constructor(logger?: Logger) { + this.locks = new Locks(logger); + } + + public async withLock(fn: () => R | Promise): Promise { + return this.locks.withLock(true, fn); } } diff --git a/frontend/test-client/src/utils/flaky-websocket-factory.ts b/frontend/test-client/src/utils/flaky-websocket-factory.ts index 6a146de2..c2c13525 100644 --- a/frontend/test-client/src/utils/flaky-websocket-factory.ts +++ b/frontend/test-client/src/utils/flaky-websocket-factory.ts @@ -25,15 +25,18 @@ export function flakyWebSocketFactory( public set onmessage(callback: (event: MessageEvent) => void) { super.onmessage = async (event: MessageEvent): Promise => { - await this.locks.waitForLock(FlakyWebSocket.RECEIVE_KEY); + return this.locks.withLock( + FlakyWebSocket.RECEIVE_KEY, + async () => { + if (jitterScaleInSeconds > 0) { + await sleep( + Math.random() * jitterScaleInSeconds * 1000 + ); + } - if (jitterScaleInSeconds > 0) { - await sleep(Math.random() * jitterScaleInSeconds * 1000); - } - - callback(event); - - this.locks.unlock(FlakyWebSocket.RECEIVE_KEY); + callback(event); + } + ); }; } @@ -67,15 +70,13 @@ export function flakyWebSocketFactory( data: string | ArrayBufferLike | Blob | ArrayBufferView ): Promise { // maintain message order - await this.locks.waitForLock(FlakyWebSocket.SEND_KEY); + return this.locks.withLock(FlakyWebSocket.SEND_KEY, async () => { + if (jitterScaleInSeconds > 0) { + await sleep(Math.random() * jitterScaleInSeconds * 1000); + } - if (jitterScaleInSeconds > 0) { - await sleep(Math.random() * jitterScaleInSeconds * 1000); - } - - super.send(data); - - this.locks.unlock(FlakyWebSocket.SEND_KEY); + super.send(data); + }); } } as unknown as typeof WebSocket; }