Reject pending locks on reset
This commit is contained in:
parent
299c3baea9
commit
580c993071
1 changed files with 15 additions and 7 deletions
|
|
@ -1,3 +1,4 @@
|
||||||
|
import { SyncResetError } from "../../services/sync-reset-error";
|
||||||
import type { Logger } from "../../tracing/logger";
|
import type { Logger } from "../../tracing/logger";
|
||||||
import { awaitAll } from "../await-all";
|
import { awaitAll } from "../await-all";
|
||||||
|
|
||||||
|
|
@ -12,9 +13,9 @@ export class Locks<T> {
|
||||||
private readonly locked = new Set<T>();
|
private readonly locked = new Set<T>();
|
||||||
|
|
||||||
/** Queue of resolve functions waiting for each key */
|
/** Queue of resolve functions waiting for each key */
|
||||||
private readonly waiters = new Map<T, (() => unknown)[]>();
|
private readonly waiters = new Map<T, ([() => unknown, (err: unknown) => unknown])[]>();
|
||||||
|
|
||||||
public constructor(private readonly logger?: Logger) {}
|
public constructor(private readonly logger?: Logger) { }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Executes a function while holding exclusive locks on one or more keys.
|
* Executes a function while holding exclusive locks on one or more keys.
|
||||||
|
|
@ -67,6 +68,13 @@ export class Locks<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
public reset(): void {
|
public reset(): void {
|
||||||
|
// Resolve all waiting promises before clearing to prevent deadlock
|
||||||
|
// Any operation waiting for a lock will be granted access immediately
|
||||||
|
for (const waiting of this.waiters.values()) {
|
||||||
|
for (const [_, reject] of waiting) {
|
||||||
|
reject(new SyncResetError());
|
||||||
|
}
|
||||||
|
}
|
||||||
this.locked.clear();
|
this.locked.clear();
|
||||||
this.waiters.clear();
|
this.waiters.clear();
|
||||||
}
|
}
|
||||||
|
|
@ -102,7 +110,7 @@ export class Locks<T> {
|
||||||
|
|
||||||
this.logger?.debug(`Waiting for lock on ${key}`);
|
this.logger?.debug(`Waiting for lock on ${key}`);
|
||||||
|
|
||||||
return new Promise((resolve) => {
|
return new Promise((resolve, reject) => {
|
||||||
// DefaultDict behavior
|
// DefaultDict behavior
|
||||||
let waiting = this.waiters.get(key);
|
let waiting = this.waiters.get(key);
|
||||||
if (!waiting) {
|
if (!waiting) {
|
||||||
|
|
@ -110,7 +118,7 @@ export class Locks<T> {
|
||||||
this.waiters.set(key, waiting);
|
this.waiters.set(key, waiting);
|
||||||
}
|
}
|
||||||
|
|
||||||
waiting.push(resolve);
|
waiting.push([resolve, reject]);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -127,11 +135,11 @@ export class Locks<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove first waiter to ensure FIFO order
|
// Remove first waiter to ensure FIFO order
|
||||||
const nextWaiting = this.waiters.get(key)?.shift();
|
const [resolveNextWaiting, _] = this.waiters.get(key)?.shift() ?? [];
|
||||||
|
|
||||||
if (nextWaiting) {
|
if (resolveNextWaiting) {
|
||||||
this.logger?.debug(`Granted lock on ${key}`);
|
this.logger?.debug(`Granted lock on ${key}`);
|
||||||
nextWaiting();
|
resolveNextWaiting();
|
||||||
} else {
|
} else {
|
||||||
this.locked.delete(key);
|
this.locked.delete(key);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue