142 lines
3.9 KiB
TypeScript
142 lines
3.9 KiB
TypeScript
import type { Logger } from "../../tracing/logger";
|
|
|
|
/**
 * Manages exclusive locks on items to prevent concurrent modifications.
 * Locks are granted per key in FIFO order.
 *
 * @template T The type of the key used for locking
 */
export class Locks<T> {
  /** Currently locked keys */
  private readonly locked = new Set<T>();

  /**
   * Queue of resolve functions waiting for each key.
   * An entry is kept only while its queue is non-empty.
   */
  private readonly waiters = new Map<T, (() => unknown)[]>();

  public constructor(private readonly logger?: Logger) {}

  /**
   * Executes a function while holding exclusive locks on one or more keys.
   *
   * This method ensures that the provided function runs with exclusive access to the
   * specified key(s). Multiple keys are de-duplicated and sorted (by their string form)
   * so that different operations requesting the same keys in different orders acquire
   * them consistently. The caller's array is never modified.
   *
   * @template R The return type of the function to execute
   * @param keyOrKeys A single key or array of keys to lock during function execution
   * @param fn The function to execute while holding the lock(s). Can be sync or async.
   * @returns A Promise that resolves to the return value of the executed function
   *
   * @example
   * ```typescript
   * // Lock a single key
   * const result = await locks.withLock('file1', () => {
   *   // Critical section - only one operation can access 'file1' at a time
   *   return processFile('file1');
   * });
   *
   * // Lock multiple keys (prevents deadlocks through consistent ordering)
   * await locks.withLock(['file1', 'file2'], async () => {
   *   // Critical section - exclusive access to both files
   *   await moveFile('file1', 'file2');
   * });
   * ```
   *
   * @throws Any error thrown by the provided function will be propagated after locks are released
   */
  public async withLock<R>(
    keyOrKeys: T | T[],
    fn: () => R | Promise<R>
  ): Promise<R> {
    const requested: readonly T[] = Array.isArray(keyOrKeys)
      ? keyOrKeys
      : [keyOrKeys];

    // Work on a private copy: de-duplicating avoids queueing behind a lock this
    // very call already holds, and sorting the copy leaves the caller's array intact.
    const keys = Array.from(new Set<T>(requested)).sort((a, b) =>
      String(a).localeCompare(String(b))
    );

    await Promise.all(keys.map(async (key) => this.waitForLock(key)));

    try {
      return await fn();
    } finally {
      for (const key of keys) {
        this.unlock(key);
      }
    }
  }

  /**
   * Attempts to acquire a lock immediately without waiting.
   * Must call `unlock()` if successful.
   *
   * @param key The key to lock
   * @returns `true` if lock acquired, `false` if already locked
   */
  private tryLock(key: T): boolean {
    if (this.locked.has(key)) {
      return false;
    }

    this.locked.add(key);

    return true;
  }

  /**
   * Waits to acquire a lock, blocking until available.
   * Operations are queued in FIFO order. Must call `unlock()` when done.
   *
   * @param key The key to wait for and lock
   * @returns Promise that resolves when lock is acquired
   */
  private async waitForLock(key: T): Promise<void> {
    if (this.tryLock(key)) {
      return;
    }

    this.logger?.debug(`Waiting for lock on ${key}`);

    return new Promise<void>((resolve) => {
      // Append to the key's queue, creating the queue on first contention.
      const queue = this.waiters.get(key);
      if (queue) {
        queue.push(resolve);
      } else {
        this.waiters.set(key, [resolve]);
      }
    });
  }

  /**
   * Releases a lock and grants access to the next waiting operation in FIFO order.
   * Removes the key from locked set if no waiters.
   *
   * @param key The key to unlock
   * @throws {Error} If key is not currently locked
   */
  private unlock(key: T): void {
    if (!this.locked.has(key)) {
      throw new Error(`Key '${key}' is not locked, cannot unlock`);
    }

    // Remove first waiter to ensure FIFO order
    const queue = this.waiters.get(key);
    const nextWaiting = queue?.shift();

    // Drop drained queues so the map does not grow with every key ever contended.
    if (queue?.length === 0) {
      this.waiters.delete(key);
    }

    if (nextWaiting) {
      // Ownership is handed over directly: the key stays in `locked`.
      this.logger?.debug(`Granted lock on ${key}`);
      nextWaiting();
    } else {
      this.locked.delete(key);
    }
  }
}
|
|
|
|
/**
 * A single mutual-exclusion lock: a thin wrapper around `Locks` in which
 * every caller contends on the same fixed key.
 */
export class Lock {
  /** The one key all callers of this lock share */
  private static readonly SOLE_KEY = true;

  /** Keyed lock manager that performs the actual locking */
  private readonly locks: Locks<boolean>;

  /**
   * @param logger Optional logger, passed through to the underlying `Locks` instance
   */
  public constructor(logger?: Logger) {
    this.locks = new Locks<boolean>(logger);
  }

  /**
   * Runs `fn` while holding this lock.
   *
   * @template R The return type of the function to execute
   * @param fn The function to execute while the lock is held. Can be sync or async.
   * @returns A Promise for whatever the underlying `Locks.withLock` call yields for `fn`
   */
  public async withLock<R>(fn: () => R | Promise<R>): Promise<R> {
    return this.locks.withLock(Lock.SOLE_KEY, fn);
  }
}
|