use allSettled
This commit is contained in:
parent
e51fcf296f
commit
c4f40b3549
5 changed files with 15 additions and 22 deletions
|
|
@ -183,7 +183,7 @@ export class Database {
|
||||||
|
|
||||||
const currentPromises = entry.updates;
|
const currentPromises = entry.updates;
|
||||||
entry.updates = [...currentPromises, promise];
|
entry.updates = [...currentPromises, promise];
|
||||||
await Promise.all(currentPromises);
|
await Promise.allSettled(currentPromises);
|
||||||
|
|
||||||
return entry;
|
return entry;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -26,6 +26,8 @@ export class Syncer {
|
||||||
private readonly remainingOperationsListeners: ((
|
private readonly remainingOperationsListeners: ((
|
||||||
remainingOperations: number
|
remainingOperations: number
|
||||||
) => unknown)[] = [];
|
) => unknown)[] = [];
|
||||||
|
|
||||||
|
// FIFO to limit the number of concurrent sync operations
|
||||||
private readonly syncQueue: PQueue;
|
private readonly syncQueue: PQueue;
|
||||||
|
|
||||||
private _isFirstSyncComplete = false;
|
private _isFirstSyncComplete = false;
|
||||||
|
|
@ -83,15 +85,6 @@ export class Syncer {
|
||||||
this.remainingOperationsListeners.push(listener);
|
this.remainingOperationsListeners.push(listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
public removeRemainingOperationsListener(
|
|
||||||
listener: (remainingOperations: number) => unknown
|
|
||||||
): void {
|
|
||||||
const index = this.remainingOperationsListeners.indexOf(listener);
|
|
||||||
if (index !== -1) {
|
|
||||||
this.remainingOperationsListeners.splice(index, 1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public async syncLocallyCreatedFile(
|
public async syncLocallyCreatedFile(
|
||||||
relativePath: RelativePath
|
relativePath: RelativePath
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
|
|
@ -280,10 +273,6 @@ export class Syncer {
|
||||||
return this.syncQueue.onEmpty();
|
return this.syncQueue.onEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
public async reset(): Promise<void> {
|
|
||||||
await this.waitUntilFinished();
|
|
||||||
}
|
|
||||||
|
|
||||||
public async syncRemotelyUpdatedFile(
|
public async syncRemotelyUpdatedFile(
|
||||||
message: WebSocketVaultUpdate
|
message: WebSocketVaultUpdate
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
|
|
@ -416,7 +405,7 @@ export class Syncer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const updates = Promise.all(
|
const updates = Promise.allSettled(
|
||||||
allLocalFiles.map(async (relativePath) => {
|
allLocalFiles.map(async (relativePath) => {
|
||||||
if (
|
if (
|
||||||
this.database.getLatestDocumentByRelativePath(relativePath)
|
this.database.getLatestDocumentByRelativePath(relativePath)
|
||||||
|
|
@ -474,7 +463,7 @@ export class Syncer {
|
||||||
})
|
})
|
||||||
);
|
);
|
||||||
|
|
||||||
const deletes = Promise.all(
|
const deletes = Promise.allSettled(
|
||||||
locallyPossiblyDeletedFiles.map(async ({ relativePath }) => {
|
locallyPossiblyDeletedFiles.map(async ({ relativePath }) => {
|
||||||
this.logger.debug(
|
this.logger.debug(
|
||||||
`Document ${relativePath} has been deleted locally, scheduling sync to delete it`
|
`Document ${relativePath} has been deleted locally, scheduling sync to delete it`
|
||||||
|
|
@ -485,7 +474,7 @@ export class Syncer {
|
||||||
})
|
})
|
||||||
);
|
);
|
||||||
|
|
||||||
await Promise.all([updates, deletes]);
|
await Promise.allSettled([updates, deletes]);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -498,7 +487,7 @@ export class Syncer {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const [allLocalFiles, remote] = await Promise.all([
|
const [allLocalFiles, remote] = await Promise.allSettled([
|
||||||
this.operations.listFilesRecursively(),
|
this.operations.listFilesRecursively(),
|
||||||
this.syncQueue.add(async () => this.syncService.getAll())
|
this.syncQueue.add(async () => this.syncService.getAll())
|
||||||
]);
|
]);
|
||||||
|
|
|
||||||
|
|
@ -54,7 +54,9 @@ export class Locks<T> {
|
||||||
const uniqueKeys = Array.from(new Set(keys));
|
const uniqueKeys = Array.from(new Set(keys));
|
||||||
uniqueKeys.sort((a, b) => String(a).localeCompare(String(b))); // Ensure consistent order to prevent deadlocks
|
uniqueKeys.sort((a, b) => String(a).localeCompare(String(b))); // Ensure consistent order to prevent deadlocks
|
||||||
|
|
||||||
await Promise.all(uniqueKeys.map(async (key) => this.waitForLock(key)));
|
await Promise.allSettled(
|
||||||
|
uniqueKeys.map(async (key) => this.waitForLock(key))
|
||||||
|
);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
return await fn();
|
return await fn();
|
||||||
|
|
|
||||||
|
|
@ -127,7 +127,7 @@ export class MockAgent extends MockClient {
|
||||||
|
|
||||||
public async finish(): Promise<void> {
|
public async finish(): Promise<void> {
|
||||||
await this.client.setSetting("isSyncEnabled", true);
|
await this.client.setSetting("isSyncEnabled", true);
|
||||||
await Promise.all(this.pendingActions);
|
await Promise.allSettled(this.pendingActions);
|
||||||
await this.client.waitAndStop();
|
await this.client.waitAndStop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -53,11 +53,13 @@ async function runTest({
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await Promise.all(clients.map(async (client) => client.init()));
|
await Promise.allSettled(clients.map(async (client) => client.init()));
|
||||||
|
|
||||||
for (let i = 0; i < iterations; i++) {
|
for (let i = 0; i < iterations; i++) {
|
||||||
console.info(`Iteration ${i + 1}/${iterations}`);
|
console.info(`Iteration ${i + 1}/${iterations}`);
|
||||||
await Promise.all(clients.map(async (client) => client.act()));
|
await Promise.allSettled(
|
||||||
|
clients.map(async (client) => client.act())
|
||||||
|
);
|
||||||
await sleep(100);
|
await sleep(100);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue