Make locks deadlock-safe
This commit is contained in:
parent
115c1067f9
commit
022c57e88a
4 changed files with 112 additions and 98 deletions
|
|
@ -24,15 +24,18 @@ export function flakyWebSocketFactory(
|
|||
|
||||
public set onmessage(callback: (event: MessageEvent) => void) {
|
||||
super.onmessage = async (event: MessageEvent): Promise<void> => {
|
||||
await this.locks.waitForLock(FlakyWebSocket.RECEIVE_KEY);
|
||||
await this.locks.withLock(
|
||||
FlakyWebSocket.RECEIVE_KEY,
|
||||
async () => {
|
||||
if (jitterScaleInSeconds > 0) {
|
||||
await sleep(
|
||||
Math.random() * jitterScaleInSeconds * 1000
|
||||
);
|
||||
}
|
||||
|
||||
if (jitterScaleInSeconds > 0) {
|
||||
await sleep(Math.random() * jitterScaleInSeconds * 1000);
|
||||
}
|
||||
|
||||
callback(event);
|
||||
|
||||
this.locks.unlock(FlakyWebSocket.RECEIVE_KEY);
|
||||
callback(event);
|
||||
}
|
||||
);
|
||||
};
|
||||
}
|
||||
|
||||
|
|
@ -66,15 +69,12 @@ export function flakyWebSocketFactory(
|
|||
data: string | ArrayBufferLike | Blob | ArrayBufferView
|
||||
): Promise<void> {
|
||||
// maintain message order
|
||||
await this.locks.waitForLock(FlakyWebSocket.SEND_KEY);
|
||||
|
||||
if (jitterScaleInSeconds > 0) {
|
||||
await sleep(Math.random() * jitterScaleInSeconds * 1000);
|
||||
}
|
||||
|
||||
super.send(data);
|
||||
|
||||
this.locks.unlock(FlakyWebSocket.SEND_KEY);
|
||||
await this.locks.withLock(FlakyWebSocket.SEND_KEY, async () => {
|
||||
if (jitterScaleInSeconds > 0) {
|
||||
await sleep(Math.random() * jitterScaleInSeconds * 1000);
|
||||
}
|
||||
super.send(data);
|
||||
});
|
||||
}
|
||||
} as unknown as typeof WebSocket;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -31,16 +31,14 @@ export class SafeFileSystemOperations implements FileSystemOperations {
|
|||
this.logger.debug(`Reading file '${path}'`);
|
||||
return this.safeOperation(
|
||||
path,
|
||||
this.decorateToHoldLock(path, async () => this.fs.read(path)),
|
||||
async () => this.locks.withLock(path, () => this.fs.read(path)),
|
||||
"read"
|
||||
);
|
||||
}
|
||||
|
||||
public async write(path: RelativePath, content: Uint8Array): Promise<void> {
|
||||
this.logger.debug(`Writing to file '${path}'`);
|
||||
return this.decorateToHoldLock(path, async () =>
|
||||
this.fs.write(path, content)
|
||||
)();
|
||||
return this.locks.withLock(path, () => this.fs.write(path, content));
|
||||
}
|
||||
|
||||
public async atomicUpdateText(
|
||||
|
|
@ -50,9 +48,10 @@ export class SafeFileSystemOperations implements FileSystemOperations {
|
|||
this.logger.debug(`Atomically updating file '${path}'`);
|
||||
return this.safeOperation(
|
||||
path,
|
||||
this.decorateToHoldLock(path, async () =>
|
||||
this.fs.atomicUpdateText(path, updater)
|
||||
),
|
||||
async () =>
|
||||
this.locks.withLock(path, () =>
|
||||
this.fs.atomicUpdateText(path, updater)
|
||||
),
|
||||
"atomicUpdateText"
|
||||
);
|
||||
}
|
||||
|
|
@ -61,32 +60,25 @@ export class SafeFileSystemOperations implements FileSystemOperations {
|
|||
// Logging this would be too noisy
|
||||
return this.safeOperation(
|
||||
path,
|
||||
this.decorateToHoldLock(path, async () =>
|
||||
this.fs.getFileSize(path)
|
||||
),
|
||||
async () =>
|
||||
this.locks.withLock(path, () => this.fs.getFileSize(path)),
|
||||
"getFileSize"
|
||||
);
|
||||
}
|
||||
|
||||
public async exists(path: RelativePath): Promise<boolean> {
|
||||
this.logger.debug(`Checking if file '${path}' exists`);
|
||||
return this.decorateToHoldLock(path, async () =>
|
||||
this.fs.exists(path)
|
||||
)();
|
||||
return this.locks.withLock(path, () => this.fs.exists(path));
|
||||
}
|
||||
|
||||
public async createDirectory(path: RelativePath): Promise<void> {
|
||||
this.logger.debug(`Creating directory '${path}'`);
|
||||
return this.decorateToHoldLock(path, async () =>
|
||||
this.fs.createDirectory(path)
|
||||
)();
|
||||
return this.locks.withLock(path, () => this.fs.createDirectory(path));
|
||||
}
|
||||
|
||||
public async delete(path: RelativePath): Promise<void> {
|
||||
this.logger.debug(`Deleting file '${path}'`);
|
||||
return this.decorateToHoldLock(path, async () =>
|
||||
this.fs.delete(path)
|
||||
)();
|
||||
return this.locks.withLock(path, async () => this.fs.delete(path));
|
||||
}
|
||||
|
||||
public async rename(
|
||||
|
|
@ -96,43 +88,14 @@ export class SafeFileSystemOperations implements FileSystemOperations {
|
|||
this.logger.debug(`Renaming file '${oldPath}' to '${newPath}'`);
|
||||
return this.safeOperation(
|
||||
oldPath,
|
||||
this.decorateToHoldLock([oldPath, newPath], async () =>
|
||||
this.fs.rename(oldPath, newPath)
|
||||
),
|
||||
async () =>
|
||||
this.locks.withLock([oldPath, newPath], () =>
|
||||
this.fs.rename(oldPath, newPath)
|
||||
),
|
||||
"rename"
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Decorate an operation to ensure that the file is locked before running it
|
||||
* and that the lock is released afterwards. This results in at-most one
|
||||
* concurrent operation running per file.
|
||||
*/
|
||||
private decorateToHoldLock<T>(
|
||||
pathOrPaths: RelativePath | RelativePath[],
|
||||
operation: () => Promise<T>
|
||||
): () => Promise<T> {
|
||||
return async () => {
|
||||
const paths = Array.isArray(pathOrPaths)
|
||||
? pathOrPaths
|
||||
: [pathOrPaths];
|
||||
|
||||
await Promise.all(
|
||||
paths.map(async (path) => this.locks.waitForLock(path))
|
||||
);
|
||||
|
||||
try {
|
||||
return await operation();
|
||||
} finally {
|
||||
await Promise.all(
|
||||
paths.map((path) => {
|
||||
this.locks.unlock(path);
|
||||
})
|
||||
);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Decorate an operation to ensure that the file exists before running it.
|
||||
* If the operation fails, it will check if the file still exists and throw
|
||||
|
|
|
|||
|
|
@ -13,7 +13,54 @@ export class Locks<T> {
|
|||
/** Queue of resolve functions waiting for each key */
|
||||
private readonly waiters = new Map<T, (() => unknown)[]>();
|
||||
|
||||
public constructor(private readonly logger: Logger) {}
|
||||
public constructor(private readonly logger?: Logger) {}
|
||||
|
||||
/**
|
||||
* Executes a function while holding exclusive locks on one or more keys.
|
||||
*
|
||||
* This method ensures that the provided function runs with exclusive access to the
|
||||
* specified key(s). Multiple keys are sorted to prevent deadlocks when different
|
||||
* operations request the same keys in different orders.
|
||||
*
|
||||
* @template R The return type of the function to execute
|
||||
* @param keyOrKeys A single key or array of keys to lock during function execution
|
||||
* @param fn The function to execute while holding the lock(s). Can be sync or async.
|
||||
* @returns A Promise that resolves to the return value of the executed function
|
||||
*
|
||||
* @example
|
||||
* ```typescript
|
||||
* // Lock a single key
|
||||
* const result = await locks.withLock('file1', () => {
|
||||
* // Critical section - only one operation can access 'file1' at a time
|
||||
* return processFile('file1');
|
||||
* });
|
||||
*
|
||||
* // Lock multiple keys (prevents deadlocks through consistent ordering)
|
||||
* await locks.withLock(['file1', 'file2'], async () => {
|
||||
* // Critical section - exclusive access to both files
|
||||
* await moveFile('file1', 'file2');
|
||||
* });
|
||||
* ```
|
||||
*
|
||||
* @throws Any error thrown by the provided function will be propagated after locks are released
|
||||
*/
|
||||
public async withLock<R>(
|
||||
keyOrKeys: T | T[],
|
||||
fn: () => R | Promise<R>
|
||||
): Promise<R> {
|
||||
const keys = Array.isArray(keyOrKeys) ? keyOrKeys : [keyOrKeys];
|
||||
keys.sort(); // Ensure consistent order to prevent deadlocks
|
||||
|
||||
await Promise.all(keys.map(async (key) => this.waitForLock(key)));
|
||||
|
||||
try {
|
||||
return await fn();
|
||||
} finally {
|
||||
keys.forEach((key) => {
|
||||
this.unlock(key);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to acquire a lock immediately without waiting.
|
||||
|
|
@ -22,7 +69,7 @@ export class Locks<T> {
|
|||
* @param key The key to lock
|
||||
* @returns `true` if lock acquired, `false` if already locked
|
||||
*/
|
||||
public tryLock(key: T): boolean {
|
||||
private tryLock(key: T): boolean {
|
||||
if (this.locked.has(key)) {
|
||||
return false;
|
||||
}
|
||||
|
|
@ -39,12 +86,12 @@ export class Locks<T> {
|
|||
* @param key The key to wait for and lock
|
||||
* @returns Promise that resolves when lock is acquired
|
||||
*/
|
||||
public async waitForLock(key: T): Promise<void> {
|
||||
private async waitForLock(key: T): Promise<void> {
|
||||
if (this.tryLock(key)) {
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
this.logger.debug(`Waiting for lock on ${key}`);
|
||||
this.logger?.debug(`Waiting for lock on ${key}`);
|
||||
|
||||
return new Promise((resolve) => {
|
||||
// DefaultDict behavior
|
||||
|
|
@ -65,7 +112,7 @@ export class Locks<T> {
|
|||
* @param key The key to unlock
|
||||
* @throws {Error} If key is not currently locked
|
||||
*/
|
||||
public unlock(key: T): void {
|
||||
private unlock(key: T): void {
|
||||
if (!this.locked.has(key)) {
|
||||
throw new Error(`Key '${key}' is not locked, cannot unlock`);
|
||||
}
|
||||
|
|
@ -74,19 +121,22 @@ export class Locks<T> {
|
|||
const nextWaiting = this.waiters.get(key)?.shift();
|
||||
|
||||
if (nextWaiting) {
|
||||
this.logger.debug(`Granted lock on ${key}`);
|
||||
this.logger?.debug(`Granted lock on ${key}`);
|
||||
nextWaiting();
|
||||
} else {
|
||||
this.locked.delete(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Clears all locks and waiters. Causes waiting operations to hang indefinitely.
|
||||
* Use with caution.
|
||||
*/
|
||||
public reset(): void {
|
||||
this.locked.clear();
|
||||
this.waiters.clear();
|
||||
export class Lock {
|
||||
private readonly locks: Locks<boolean>;
|
||||
|
||||
public constructor(logger?: Logger) {
|
||||
this.locks = new Locks(logger);
|
||||
}
|
||||
|
||||
public async withLock<R>(fn: () => R | Promise<R>): Promise<R> {
|
||||
return this.locks.withLock(true, fn);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,15 +25,18 @@ export function flakyWebSocketFactory(
|
|||
|
||||
public set onmessage(callback: (event: MessageEvent) => void) {
|
||||
super.onmessage = async (event: MessageEvent): Promise<void> => {
|
||||
await this.locks.waitForLock(FlakyWebSocket.RECEIVE_KEY);
|
||||
return this.locks.withLock(
|
||||
FlakyWebSocket.RECEIVE_KEY,
|
||||
async () => {
|
||||
if (jitterScaleInSeconds > 0) {
|
||||
await sleep(
|
||||
Math.random() * jitterScaleInSeconds * 1000
|
||||
);
|
||||
}
|
||||
|
||||
if (jitterScaleInSeconds > 0) {
|
||||
await sleep(Math.random() * jitterScaleInSeconds * 1000);
|
||||
}
|
||||
|
||||
callback(event);
|
||||
|
||||
this.locks.unlock(FlakyWebSocket.RECEIVE_KEY);
|
||||
callback(event);
|
||||
}
|
||||
);
|
||||
};
|
||||
}
|
||||
|
||||
|
|
@ -67,15 +70,13 @@ export function flakyWebSocketFactory(
|
|||
data: string | ArrayBufferLike | Blob | ArrayBufferView
|
||||
): Promise<void> {
|
||||
// maintain message order
|
||||
await this.locks.waitForLock(FlakyWebSocket.SEND_KEY);
|
||||
return this.locks.withLock(FlakyWebSocket.SEND_KEY, async () => {
|
||||
if (jitterScaleInSeconds > 0) {
|
||||
await sleep(Math.random() * jitterScaleInSeconds * 1000);
|
||||
}
|
||||
|
||||
if (jitterScaleInSeconds > 0) {
|
||||
await sleep(Math.random() * jitterScaleInSeconds * 1000);
|
||||
}
|
||||
|
||||
super.send(data);
|
||||
|
||||
this.locks.unlock(FlakyWebSocket.SEND_KEY);
|
||||
super.send(data);
|
||||
});
|
||||
}
|
||||
} as unknown as typeof WebSocket;
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue