Add awaitAll
This commit is contained in:
parent
ef4444afc2
commit
d8058d396c
8 changed files with 100 additions and 17 deletions
|
|
@ -42,7 +42,12 @@ export default [
|
||||||
{
|
{
|
||||||
object: "Promise",
|
object: "Promise",
|
||||||
property: "all",
|
property: "all",
|
||||||
message: "Use Promise.allSettled instead of Promise.all to always await all promises."
|
message: "Use `awaitAll` instead of Promise.all to always await all promises."
|
||||||
|
},
|
||||||
|
{
|
||||||
|
object: "Promise",
|
||||||
|
property: "allSettled",
|
||||||
|
message: "Use `awaitAll` instead of Promise.allSettled to always await all promises and throw on errors."
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
object: "String",
|
object: "String",
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
import type { Logger } from "../tracing/logger";
|
import type { Logger } from "../tracing/logger";
|
||||||
import { EMPTY_HASH } from "../utils/hash";
|
import { EMPTY_HASH } from "../utils/hash";
|
||||||
import { CoveredValues } from "../utils/data-structures/min-covered";
|
import { CoveredValues } from "../utils/data-structures/min-covered";
|
||||||
|
import { awaitAll } from "../utils/await-all";
|
||||||
|
|
||||||
export type VaultUpdateId = number;
|
export type VaultUpdateId = number;
|
||||||
export type DocumentId = string;
|
export type DocumentId = string;
|
||||||
|
|
@ -183,7 +184,7 @@ export class Database {
|
||||||
|
|
||||||
const currentPromises = entry.updates;
|
const currentPromises = entry.updates;
|
||||||
entry.updates = [...currentPromises, promise];
|
entry.updates = [...currentPromises, promise];
|
||||||
await Promise.allSettled(currentPromises);
|
await awaitAll(currentPromises);
|
||||||
|
|
||||||
return entry;
|
return entry;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,7 @@ import type { CursorPositionFromClient } from "./types/CursorPositionFromClient"
|
||||||
import type { ClientCursors } from "./types/ClientCursors";
|
import type { ClientCursors } from "./types/ClientCursors";
|
||||||
import { createPromise } from "../utils/create-promise";
|
import { createPromise } from "../utils/create-promise";
|
||||||
import type { WebSocketVaultUpdate } from "./types/WebSocketVaultUpdate";
|
import type { WebSocketVaultUpdate } from "./types/WebSocketVaultUpdate";
|
||||||
|
import { awaitAll } from "../utils/await-all";
|
||||||
|
|
||||||
export class WebSocketManager {
|
export class WebSocketManager {
|
||||||
private readonly webSocketStatusChangeListeners: ((
|
private readonly webSocketStatusChangeListeners: ((
|
||||||
|
|
@ -98,13 +99,13 @@ export class WebSocketManager {
|
||||||
await promise;
|
await promise;
|
||||||
}
|
}
|
||||||
|
|
||||||
await Promise.allSettled(this.outstandingPromises).then(() => {});
|
await awaitAll(this.outstandingPromises).then(() => {});
|
||||||
}
|
}
|
||||||
|
|
||||||
public sendHandshakeMessage(
|
public sendHandshakeMessage(
|
||||||
message: WebSocketClientMessage & { type: "handshake" }
|
message: WebSocketClientMessage & { type: "handshake" }
|
||||||
): void {
|
): void {
|
||||||
const {webSocket} = this;
|
const { webSocket } = this;
|
||||||
if (!webSocket) {
|
if (!webSocket) {
|
||||||
throw new Error(
|
throw new Error(
|
||||||
"WebSocket is not connected, cannot send handshake message"
|
"WebSocket is not connected, cannot send handshake message"
|
||||||
|
|
@ -126,7 +127,7 @@ export class WebSocketManager {
|
||||||
type: "cursorPositions",
|
type: "cursorPositions",
|
||||||
...cursorPositions
|
...cursorPositions
|
||||||
};
|
};
|
||||||
const {webSocket} = this;
|
const { webSocket } = this;
|
||||||
if (!webSocket) {
|
if (!webSocket) {
|
||||||
this.logger.warn(
|
this.logger.warn(
|
||||||
"WebSocket is not connected, cannot send cursor positions"
|
"WebSocket is not connected, cannot send cursor positions"
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,7 @@ import type { DocumentVersionWithoutContent } from "../services/types/DocumentVe
|
||||||
import type { WebSocketVaultUpdate } from "../services/types/WebSocketVaultUpdate";
|
import type { WebSocketVaultUpdate } from "../services/types/WebSocketVaultUpdate";
|
||||||
import type { WebSocketManager } from "../services/websocket-manager";
|
import type { WebSocketManager } from "../services/websocket-manager";
|
||||||
import type { WebSocketClientMessage } from "../services/types/WebSocketClientMessage";
|
import type { WebSocketClientMessage } from "../services/types/WebSocketClientMessage";
|
||||||
|
import { awaitAll } from "../utils/await-all";
|
||||||
|
|
||||||
export class Syncer {
|
export class Syncer {
|
||||||
private readonly remoteDocumentsLock: Locks<DocumentId>;
|
private readonly remoteDocumentsLock: Locks<DocumentId>;
|
||||||
|
|
@ -277,7 +278,7 @@ export class Syncer {
|
||||||
message: WebSocketVaultUpdate
|
message: WebSocketVaultUpdate
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
try {
|
try {
|
||||||
const handlerPromise = Promise.allSettled(
|
const handlerPromise = awaitAll(
|
||||||
message.documents.map(async (document) =>
|
message.documents.map(async (document) =>
|
||||||
this.internalSyncRemotelyUpdatedFile(document)
|
this.internalSyncRemotelyUpdatedFile(document)
|
||||||
)
|
)
|
||||||
|
|
@ -405,7 +406,7 @@ export class Syncer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const updates = Promise.allSettled(
|
const updates = awaitAll(
|
||||||
allLocalFiles.map(async (relativePath) => {
|
allLocalFiles.map(async (relativePath) => {
|
||||||
if (
|
if (
|
||||||
this.database.getLatestDocumentByRelativePath(relativePath)
|
this.database.getLatestDocumentByRelativePath(relativePath)
|
||||||
|
|
@ -463,7 +464,7 @@ export class Syncer {
|
||||||
})
|
})
|
||||||
);
|
);
|
||||||
|
|
||||||
const deletes = Promise.allSettled(
|
const deletes = awaitAll(
|
||||||
locallyPossiblyDeletedFiles.map(async ({ relativePath }) => {
|
locallyPossiblyDeletedFiles.map(async ({ relativePath }) => {
|
||||||
this.logger.debug(
|
this.logger.debug(
|
||||||
`Document ${relativePath} has been deleted locally, scheduling sync to delete it`
|
`Document ${relativePath} has been deleted locally, scheduling sync to delete it`
|
||||||
|
|
@ -474,7 +475,7 @@ export class Syncer {
|
||||||
})
|
})
|
||||||
);
|
);
|
||||||
|
|
||||||
await Promise.allSettled([updates, deletes]);
|
await awaitAll([updates, deletes]);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -487,7 +488,7 @@ export class Syncer {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const [allLocalFiles, remote] = await Promise.allSettled([
|
const [allLocalFiles, remote] = await awaitAll([
|
||||||
this.operations.listFilesRecursively(),
|
this.operations.listFilesRecursively(),
|
||||||
this.syncQueue.add(async () => this.syncService.getAll())
|
this.syncQueue.add(async () => this.syncService.getAll())
|
||||||
]);
|
]);
|
||||||
|
|
|
||||||
56
frontend/sync-client/src/utils/await-all.test.ts
Normal file
56
frontend/sync-client/src/utils/await-all.test.ts
Normal file
|
|
@ -0,0 +1,56 @@
|
||||||
|
import { test } from "node:test";
|
||||||
|
import assert from "node:assert";
|
||||||
|
import { awaitAll } from "./await-all";
|
||||||
|
|
||||||
|
void test("awaitAll resolves promises of the same type", async () => {
|
||||||
|
const promises = [
|
||||||
|
Promise.resolve(1),
|
||||||
|
Promise.resolve(2),
|
||||||
|
Promise.resolve(3)
|
||||||
|
];
|
||||||
|
|
||||||
|
const results = await awaitAll(promises);
|
||||||
|
assert.deepStrictEqual(results, [1, 2, 3]);
|
||||||
|
});
|
||||||
|
|
||||||
|
void test("awaitAll resolves promises of different types", async () => {
|
||||||
|
const promises = [
|
||||||
|
Promise.resolve("hello"),
|
||||||
|
Promise.resolve(42),
|
||||||
|
Promise.resolve(true)
|
||||||
|
] as const;
|
||||||
|
|
||||||
|
const results = await awaitAll(promises);
|
||||||
|
|
||||||
|
// Type assertions to verify type inference
|
||||||
|
const str: string = results[0];
|
||||||
|
const num: number = results[1];
|
||||||
|
const bool: boolean = results[2];
|
||||||
|
|
||||||
|
assert.strictEqual(str, "hello");
|
||||||
|
assert.strictEqual(num, 42);
|
||||||
|
assert.strictEqual(bool, true);
|
||||||
|
});
|
||||||
|
|
||||||
|
void test("awaitAll throws on first rejection", async () => {
|
||||||
|
const error = new Error("Test error");
|
||||||
|
const promises = [
|
||||||
|
Promise.resolve(1),
|
||||||
|
Promise.reject(error),
|
||||||
|
Promise.resolve(3)
|
||||||
|
];
|
||||||
|
|
||||||
|
await assert.rejects(async () => {
|
||||||
|
await awaitAll(promises);
|
||||||
|
}, error);
|
||||||
|
});
|
||||||
|
|
||||||
|
void test("awaitAll works with async functions", async () => {
|
||||||
|
const asyncString = async (): Promise<string> => "async";
|
||||||
|
const asyncNumber = async (): Promise<number> => 123;
|
||||||
|
|
||||||
|
const results = await awaitAll([asyncString(), asyncNumber()]);
|
||||||
|
|
||||||
|
assert.strictEqual(results[0], "async");
|
||||||
|
assert.strictEqual(results[1], 123);
|
||||||
|
});
|
||||||
22
frontend/sync-client/src/utils/await-all.ts
Normal file
22
frontend/sync-client/src/utils/await-all.ts
Normal file
|
|
@ -0,0 +1,22 @@
|
||||||
|
type PromiseTuple<T extends readonly unknown[]> = readonly [
|
||||||
|
...{ [K in keyof T]: Promise<T[K]> }
|
||||||
|
];
|
||||||
|
|
||||||
|
type ResolvedTuple<T extends readonly unknown[]> = {
|
||||||
|
[K in keyof T]: T[K];
|
||||||
|
};
|
||||||
|
|
||||||
|
export const awaitAll = async <T extends readonly unknown[]>(
|
||||||
|
promises: PromiseTuple<T>
|
||||||
|
): Promise<ResolvedTuple<T>> => {
|
||||||
|
const result = await Promise.allSettled(promises);
|
||||||
|
for (const res of result) {
|
||||||
|
if (res.status === "rejected") {
|
||||||
|
throw res.reason;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result.map(
|
||||||
|
(res) => (res as PromiseFulfilledResult<unknown>).value
|
||||||
|
) as ResolvedTuple<T>;
|
||||||
|
};
|
||||||
|
|
@ -1,4 +1,5 @@
|
||||||
import type { Logger } from "../../tracing/logger";
|
import type { Logger } from "../../tracing/logger";
|
||||||
|
import { awaitAll } from "../await-all";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Manages exclusive locks on items to prevent concurrent modifications.
|
* Manages exclusive locks on items to prevent concurrent modifications.
|
||||||
|
|
@ -54,9 +55,7 @@ export class Locks<T> {
|
||||||
const uniqueKeys = Array.from(new Set(keys));
|
const uniqueKeys = Array.from(new Set(keys));
|
||||||
uniqueKeys.sort((a, b) => String(a).localeCompare(String(b))); // Ensure consistent order to prevent deadlocks
|
uniqueKeys.sort((a, b) => String(a).localeCompare(String(b))); // Ensure consistent order to prevent deadlocks
|
||||||
|
|
||||||
await Promise.allSettled(
|
await awaitAll(uniqueKeys.map(async (key) => this.waitForLock(key)));
|
||||||
uniqueKeys.map(async (key) => this.waitForLock(key))
|
|
||||||
);
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
return await fn();
|
return await fn();
|
||||||
|
|
|
||||||
|
|
@ -53,13 +53,11 @@ async function runTest({
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await Promise.allSettled(clients.map(async (client) => client.init()));
|
await Promise.all(clients.map(async (client) => client.init()));
|
||||||
|
|
||||||
for (let i = 0; i < iterations; i++) {
|
for (let i = 0; i < iterations; i++) {
|
||||||
console.info(`Iteration ${i + 1}/${iterations}`);
|
console.info(`Iteration ${i + 1}/${iterations}`);
|
||||||
await Promise.allSettled(
|
await Promise.all(clients.map(async (client) => client.act()));
|
||||||
clients.map(async (client) => client.act())
|
|
||||||
);
|
|
||||||
await sleep(100);
|
await sleep(100);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue