Start fixing tests
This commit is contained in:
parent
727b6b7ed5
commit
7fcd0f0bfa
19 changed files with 210 additions and 218 deletions
|
|
@ -80,5 +80,4 @@ And to clean up the logs & database files, run `task clean`
|
|||
|
||||
- [Sync server](./sync-server/README.md)
|
||||
|
||||
|
||||
remove force merge everywhere
|
||||
|
|
|
|||
|
|
@ -80,10 +80,10 @@ async function main(): Promise<void> {
|
|||
|
||||
if (!result.success) {
|
||||
allPassed = false;
|
||||
logger.error(`\n✗ FAILED: ${test.name}`);
|
||||
logger.error(`✗ FAILED: ${test.name}`);
|
||||
logger.error(`Error: ${result.error}`);
|
||||
} else {
|
||||
logger.info(`\n✓ PASSED: ${test.name} (${result.duration}ms)`);
|
||||
logger.info(`✓ PASSED: ${test.name} (${result.duration}ms)`);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
|
|
|
|||
|
|
@ -46,12 +46,11 @@ export class TestRunner {
|
|||
for (let i = 0; i < test.steps.length; i++) {
|
||||
const step = test.steps[i];
|
||||
this.logger.info(
|
||||
`\nStep ${i + 1}/${test.steps.length}: ${JSON.stringify(step)}`
|
||||
`Step ${i + 1}/${test.steps.length}: ${JSON.stringify(step)}`
|
||||
);
|
||||
await this.executeStep(step);
|
||||
}
|
||||
|
||||
// Cleanup
|
||||
await this.cleanup();
|
||||
|
||||
const duration = Date.now() - startTime;
|
||||
|
|
|
|||
|
|
@ -33,14 +33,13 @@ export type { AuthenticationError } from "./errors/authentication-error";
|
|||
export type { MaybeOutdatedClientCursors } from "./types/maybe-outdated-client-cursors";
|
||||
export { DocumentSyncStatus } from "./types/document-sync-status";
|
||||
export { SyncClient } from "./sync-client";
|
||||
export { __debug_locks } from "./sync-operations/syncer";
|
||||
export type { TextWithCursors, CursorPosition } from "reconcile-text";
|
||||
|
||||
export const debugging = {
|
||||
slowFetchFactory,
|
||||
slowWebSocketFactory,
|
||||
logToConsole,
|
||||
InMemoryFileSystem,
|
||||
InMemoryFileSystem
|
||||
};
|
||||
|
||||
export const utils = {
|
||||
|
|
|
|||
|
|
@ -130,11 +130,7 @@ export class Database {
|
|||
target.metadata,
|
||||
null,
|
||||
2
|
||||
)} to ${JSON.stringify(
|
||||
metadata,
|
||||
null,
|
||||
2
|
||||
)}`
|
||||
)} to ${JSON.stringify(metadata, null, 2)}`
|
||||
);
|
||||
|
||||
target.metadata = metadata;
|
||||
|
|
@ -142,7 +138,6 @@ export class Database {
|
|||
this.saveInTheBackground();
|
||||
}
|
||||
|
||||
|
||||
public getLatestDocumentByRelativePath(
|
||||
find: RelativePath
|
||||
): DocumentRecord | undefined {
|
||||
|
|
@ -153,9 +148,8 @@ export class Database {
|
|||
return candidates[0];
|
||||
}
|
||||
|
||||
|
||||
public createNewPendingDocument(
|
||||
relativePath: RelativePath,
|
||||
relativePath: RelativePath
|
||||
): DocumentRecord {
|
||||
this.logger.debug(`Creating new pending document: ${relativePath}`);
|
||||
const previousEntry =
|
||||
|
|
@ -223,7 +217,6 @@ export class Database {
|
|||
candidate.isDeleted = true;
|
||||
}
|
||||
|
||||
|
||||
public removeDocument(find: DocumentRecord): void {
|
||||
removeFromArray(this.documents, find);
|
||||
this.saveInTheBackground();
|
||||
|
|
|
|||
|
|
@ -40,7 +40,6 @@ export class SyncClient {
|
|||
private readonly history: SyncHistory,
|
||||
private readonly settings: Settings,
|
||||
private readonly database: Database,
|
||||
private readonly unrestrictedSyncer: UnrestrictedSyncer,
|
||||
private readonly syncer: Syncer,
|
||||
private readonly webSocketManager: WebSocketManager,
|
||||
public readonly logger: Logger,
|
||||
|
|
@ -221,7 +220,6 @@ export class SyncClient {
|
|||
history,
|
||||
settings,
|
||||
database,
|
||||
unrestrictedSyncer,
|
||||
syncer,
|
||||
webSocketManager,
|
||||
logger,
|
||||
|
|
@ -410,7 +408,6 @@ export class SyncClient {
|
|||
return DocumentSyncStatus.SYNCING;
|
||||
}
|
||||
|
||||
|
||||
return this.syncer.hasPendingOperationsForDocument(relativePath)
|
||||
? DocumentSyncStatus.SYNCING
|
||||
: DocumentSyncStatus.UP_TO_DATE;
|
||||
|
|
|
|||
|
|
@ -11,7 +11,6 @@ import type { Settings } from "../persistence/settings";
|
|||
import type { FileOperations } from "../file-operations/file-operations";
|
||||
import { findMatchingFile } from "../utils/find-matching-file";
|
||||
import type { UnrestrictedSyncer } from "./unrestricted-syncer";
|
||||
import { createPromise } from "../utils/create-promise";
|
||||
import { SyncResetError } from "../errors/sync-reset-error";
|
||||
import { Locks } from "../utils/data-structures/locks";
|
||||
import type { DocumentVersionWithoutContent } from "../services/types/DocumentVersionWithoutContent";
|
||||
|
|
@ -21,14 +20,12 @@ import type { WebSocketClientMessage } from "../services/types/WebSocketClientMe
|
|||
import { awaitAll } from "../utils/await-all";
|
||||
import { EventListeners } from "../utils/data-structures/event-listeners";
|
||||
|
||||
export const __debug_locks: Locks<any>[] = []; // Used only for debugging timeouts
|
||||
|
||||
export class Syncer {
|
||||
public readonly onRemainingOperationsCountChanged = new EventListeners<
|
||||
(remainingOperations: number) => unknown
|
||||
>();
|
||||
|
||||
public readonly updatedDocumentsByPathAndKeysLock: Locks<DocumentId | RelativePath>;
|
||||
public readonly updatedDocumentsByPathAndKeysLocks: Locks<string>; // can be DocumentId or RelativePath
|
||||
|
||||
// FIFO to limit the number of concurrent sync operations
|
||||
private readonly syncQueue: PQueue;
|
||||
|
|
@ -50,8 +47,9 @@ export class Syncer {
|
|||
concurrency: settings.getSettings().syncConcurrency
|
||||
});
|
||||
|
||||
this.updatedDocumentsByPathAndKeysLock = new Locks<DocumentId>(this.logger);
|
||||
__debug_locks.push(this.updatedDocumentsByPathAndKeysLock); // Used only for debugging timeouts
|
||||
this.updatedDocumentsByPathAndKeysLocks = new Locks<DocumentId>(
|
||||
this.logger
|
||||
);
|
||||
|
||||
settings.onSettingsChanged.add((newSettings, oldSettings) => {
|
||||
if (newSettings.syncConcurrency !== oldSettings.syncConcurrency) {
|
||||
|
|
@ -84,7 +82,7 @@ export class Syncer {
|
|||
}
|
||||
|
||||
public hasPendingOperationsForDocument(relativePath: string): boolean {
|
||||
return this.updatedDocumentsByPathAndKeysLock.isLocked(relativePath);
|
||||
return this.updatedDocumentsByPathAndKeysLocks.isLocked(relativePath);
|
||||
}
|
||||
|
||||
public async syncLocallyCreatedFile(
|
||||
|
|
@ -102,29 +100,26 @@ export class Syncer {
|
|||
return;
|
||||
}
|
||||
|
||||
const document = this.database.createNewPendingDocument(
|
||||
relativePath
|
||||
);
|
||||
const document = this.database.createNewPendingDocument(relativePath);
|
||||
|
||||
await this.enqueueSyncOperation(async () =>
|
||||
await this.enqueueSyncOperation(
|
||||
async () =>
|
||||
this.unrestrictedSyncer.unrestrictedSyncLocallyCreatedOrUpdatedFile(
|
||||
{
|
||||
document
|
||||
}
|
||||
), [relativePath]
|
||||
),
|
||||
[relativePath]
|
||||
);
|
||||
}
|
||||
|
||||
public async syncLocallyDeletedFile(
|
||||
relativePath: RelativePath
|
||||
): Promise<void> {
|
||||
const document = this.database.getLatestDocumentByRelativePath(relativePath);
|
||||
let document =
|
||||
this.database.getLatestDocumentByRelativePath(relativePath);
|
||||
|
||||
|
||||
if (
|
||||
document
|
||||
?.isDeleted === true
|
||||
) {
|
||||
if (document == null || document.isDeleted === true) {
|
||||
// This is must be a consequence of us deleting a file because of a remote update
|
||||
// which triggered a local delete, so we don't need to do anything here.
|
||||
this.logger.debug(
|
||||
|
|
@ -137,25 +132,13 @@ export class Syncer {
|
|||
// document which finishes after the delete has succeeded and would introduce a phantom metadata record.
|
||||
this.database.delete(relativePath);
|
||||
|
||||
|
||||
|
||||
await this.enqueueSyncOperation(async () => {
|
||||
const document = this.database.getLatestDocumentByRelativePath(relativePath);
|
||||
|
||||
if (document === undefined) {
|
||||
this.logger.debug(
|
||||
`Cannot find document ${relativePath} in the database, must have been deleted already, skipping`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
await this.unrestrictedSyncer.unrestrictedSyncLocallyDeletedFile(
|
||||
document
|
||||
);
|
||||
|
||||
this.database.removeDocument(document);
|
||||
}, [document?.metadata?.documentId, relativePath]
|
||||
);
|
||||
}, [document?.metadata?.documentId, relativePath]);
|
||||
}
|
||||
|
||||
public async syncLocallyUpdatedFile({
|
||||
|
|
@ -165,18 +148,15 @@ export class Syncer {
|
|||
oldPath?: RelativePath;
|
||||
relativePath: RelativePath;
|
||||
}): Promise<void> {
|
||||
const documentAtNewPath = this.database.getLatestDocumentByRelativePath(
|
||||
relativePath
|
||||
);
|
||||
const documentAtNewPath =
|
||||
this.database.getLatestDocumentByRelativePath(relativePath);
|
||||
|
||||
if (oldPath !== undefined) {
|
||||
// We might have moved the document in the database before calling this method,
|
||||
// in that case, we mustn't move it again.
|
||||
if (
|
||||
documentAtNewPath ===
|
||||
undefined ||
|
||||
documentAtNewPath
|
||||
?.isDeleted === true
|
||||
documentAtNewPath === undefined ||
|
||||
documentAtNewPath.isDeleted
|
||||
) {
|
||||
if (oldPath === relativePath) {
|
||||
throw new Error(
|
||||
|
|
@ -188,7 +168,7 @@ export class Syncer {
|
|||
}
|
||||
}
|
||||
|
||||
let document =
|
||||
const document =
|
||||
this.database.getLatestDocumentByRelativePath(relativePath);
|
||||
|
||||
if (
|
||||
|
|
@ -216,17 +196,16 @@ export class Syncer {
|
|||
return;
|
||||
}
|
||||
|
||||
|
||||
await this.enqueueSyncOperation(async () =>
|
||||
await this.enqueueSyncOperation(
|
||||
async () =>
|
||||
this.unrestrictedSyncer.unrestrictedSyncLocallyCreatedOrUpdatedFile(
|
||||
{
|
||||
oldPath,
|
||||
document
|
||||
}
|
||||
), [document.metadata?.documentId, relativePath, oldPath]
|
||||
),
|
||||
[document.metadata?.documentId, relativePath, oldPath]
|
||||
);
|
||||
|
||||
|
||||
}
|
||||
|
||||
public async scheduleSyncForOfflineChanges(): Promise<void> {
|
||||
|
|
@ -290,7 +269,7 @@ export class Syncer {
|
|||
public reset(): void {
|
||||
this._isFirstSyncComplete = false;
|
||||
this.syncQueue.clear();
|
||||
this.updatedDocumentsByPathAndKeysLock.reset();
|
||||
this.updatedDocumentsByPathAndKeysLocks.reset();
|
||||
this.runningScheduleSyncForOfflineChanges = undefined;
|
||||
}
|
||||
|
||||
|
|
@ -310,13 +289,17 @@ export class Syncer {
|
|||
const document = this.database.getDocumentByDocumentId(
|
||||
remoteVersion.documentId
|
||||
);
|
||||
this.enqueueSyncOperation(async () =>
|
||||
await this.syncQueue.add(async () =>
|
||||
await this.enqueueSyncOperation(
|
||||
async () =>
|
||||
this.unrestrictedSyncer.unrestrictedSyncRemotelyUpdatedFile(
|
||||
remoteVersion,
|
||||
document
|
||||
)
|
||||
), [document?.relativePath, remoteVersion.relativePath, remoteVersion.documentId]
|
||||
),
|
||||
[
|
||||
document?.relativePath,
|
||||
remoteVersion.relativePath,
|
||||
remoteVersion.documentId
|
||||
]
|
||||
);
|
||||
|
||||
this.database.addSeenUpdateId(remoteVersion.vaultUpdateId);
|
||||
|
|
@ -465,10 +448,11 @@ export class Syncer {
|
|||
|
||||
private async enqueueSyncOperation<T>(
|
||||
operation: () => Promise<T>,
|
||||
keys: Array<DocumentId | RelativePath | undefined | null>
|
||||
keys: (DocumentId | undefined | null)[]
|
||||
): Promise<T> {
|
||||
return this.updatedDocumentsByPathAndKeysLock.withLock(keys.filter(k => k !== undefined && k !== null), async () =>
|
||||
this.syncQueue.add(operation)
|
||||
return this.updatedDocumentsByPathAndKeysLocks.withLock(
|
||||
keys.filter((k) => k !== undefined && k !== null),
|
||||
async () => this.syncQueue.add(operation)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,7 +3,6 @@ import type {
|
|||
DocumentRecord,
|
||||
RelativePath
|
||||
} from "../persistence/database";
|
||||
|
||||
import { diff } from "reconcile-text";
|
||||
import type { SyncService } from "../services/sync-service";
|
||||
import type { Logger } from "../tracing/logger";
|
||||
|
|
@ -18,11 +17,9 @@ import type {
|
|||
} from "../tracing/sync-history";
|
||||
import { SyncStatus, SyncType } from "../tracing/sync-history";
|
||||
import { EMPTY_HASH, hash } from "../utils/hash";
|
||||
|
||||
import { base64ToBytes } from "byte-base64";
|
||||
import type { Settings } from "../persistence/settings";
|
||||
import type { FileOperations } from "../file-operations/file-operations";
|
||||
import { createPromise } from "../utils/create-promise";
|
||||
import { FileNotFoundError } from "../errors/file-not-found-error";
|
||||
import { SyncResetError } from "../errors/sync-reset-error";
|
||||
import { globsToRegexes } from "../utils/globs-to-regexes";
|
||||
|
|
@ -33,7 +30,6 @@ import type { FixedSizeDocumentCache } from "../utils/data-structures/fix-sized-
|
|||
import { isFileTypeMergable } from "../utils/is-file-type-mergable";
|
||||
import { isBinary } from "../utils/is-binary";
|
||||
import type { ServerConfig } from "../services/server-config";
|
||||
import { Locks } from "../utils/data-structures/locks";
|
||||
|
||||
export class UnrestrictedSyncer {
|
||||
private ignorePatterns: RegExp[];
|
||||
|
|
@ -66,7 +62,7 @@ export class UnrestrictedSyncer {
|
|||
// We use the same code path for both local and remote updates. We need to force the update
|
||||
// if there are no local changes but we know that the remote version is newer.
|
||||
force = false,
|
||||
document,
|
||||
document
|
||||
}: {
|
||||
oldPath?: RelativePath;
|
||||
force?: boolean;
|
||||
|
|
@ -123,7 +119,6 @@ export class UnrestrictedSyncer {
|
|||
originalContentBytes: contentBytes,
|
||||
isCreate: true
|
||||
});
|
||||
|
||||
} else {
|
||||
const areThereLocalChanges =
|
||||
document.metadata.hash !== contentHash ||
|
||||
|
|
@ -351,7 +346,7 @@ export class UnrestrictedSyncer {
|
|||
remoteRelativePath: remoteVersion.relativePath
|
||||
},
|
||||
this.database.createNewPendingDocument(
|
||||
remoteVersion.relativePath,
|
||||
remoteVersion.relativePath
|
||||
)
|
||||
);
|
||||
|
||||
|
|
@ -365,7 +360,6 @@ export class UnrestrictedSyncer {
|
|||
remoteVersion.relativePath
|
||||
);
|
||||
|
||||
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.SUCCESS,
|
||||
details: updateDetails,
|
||||
|
|
@ -376,8 +370,6 @@ export class UnrestrictedSyncer {
|
|||
});
|
||||
}
|
||||
|
||||
|
||||
|
||||
private async executeSync<T>(
|
||||
details: SyncDetails,
|
||||
fn: () => Promise<T>
|
||||
|
|
@ -481,9 +473,9 @@ export class UnrestrictedSyncer {
|
|||
}
|
||||
|
||||
let actualPath = document.relativePath;
|
||||
let mustCreate = false;
|
||||
|
||||
|
||||
if (isCreate === true) {
|
||||
if (isCreate) {
|
||||
// We have a file locally that got moved by another client to the same path as the one we're trying to create.
|
||||
// The server returns a merging update for the document ID that already exists locally (but at another path).
|
||||
// We have to merge these two documents by extending the provenance of the existing document and deleting
|
||||
|
|
@ -492,14 +484,18 @@ export class UnrestrictedSyncer {
|
|||
response.documentId
|
||||
);
|
||||
if (existingDocument !== undefined) {
|
||||
this.logger.info(`Merging document ${existingDocument.relativePath} into existing document ${document.relativePath} after concurrent move & creation`);
|
||||
this.logger.info(
|
||||
`Merging document ${existingDocument.relativePath} into existing document ${document.relativePath
|
||||
} after concurrent move & creation`
|
||||
);
|
||||
this.database.removeDocument(document); // this was a (fake) pending document
|
||||
if (!existingDocument.isDeleted) {
|
||||
this.operations.delete(document.relativePath);
|
||||
this.database.delete(existingDocument.relativePath); // make sure syncLocallyDeletedFile doesn't actually schedule deleting the new file
|
||||
await this.operations.delete(existingDocument.relativePath);
|
||||
}
|
||||
mustCreate = true;
|
||||
document = existingDocument;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// this can't happen on the creation path as we can only get a merging response if a document already exists remotely on the same path
|
||||
|
|
@ -516,12 +512,26 @@ export class UnrestrictedSyncer {
|
|||
); // this can throw FileNotFoundError
|
||||
}
|
||||
|
||||
|
||||
if (!("type" in response) || response.type === "MergingUpdate") {
|
||||
const responseBytes = base64ToBytes(response.contentBase64);
|
||||
contentHash = hash(responseBytes);
|
||||
|
||||
|
||||
|
||||
if (mustCreate) {
|
||||
this.database.createNewPendingDocument(actualPath);
|
||||
this.database.updateDocumentMetadata(
|
||||
{
|
||||
documentId: response.documentId,
|
||||
parentVersionId: response.vaultUpdateId,
|
||||
hash: contentHash,
|
||||
remoteRelativePath: response.relativePath
|
||||
},
|
||||
document
|
||||
);
|
||||
|
||||
await this.operations.create(actualPath, responseBytes);
|
||||
} else {
|
||||
this.database.updateDocumentMetadata(
|
||||
{
|
||||
documentId: response.documentId,
|
||||
|
|
@ -536,6 +546,7 @@ export class UnrestrictedSyncer {
|
|||
originalContentBytes,
|
||||
responseBytes
|
||||
);
|
||||
}
|
||||
await this.updateCache(
|
||||
response.vaultUpdateId,
|
||||
responseBytes,
|
||||
|
|
|
|||
|
|
@ -60,7 +60,8 @@ describe("withLock", () => {
|
|||
|
||||
await locks.waitForLock(testPath);
|
||||
|
||||
const promise = awaitAll([locks.withLock([testPath2, testPath3, testPath], async () => {
|
||||
const promise = awaitAll([
|
||||
locks.withLock([testPath2, testPath3, testPath], async () => {
|
||||
executionOrder.push("operation1-start");
|
||||
executionOrder.push("operation1-end");
|
||||
return "result1";
|
||||
|
|
@ -70,17 +71,19 @@ describe("withLock", () => {
|
|||
executionOrder.push("operation2-start");
|
||||
executionOrder.push("operation2-end");
|
||||
return "result2";
|
||||
})]);
|
||||
|
||||
})
|
||||
]);
|
||||
|
||||
locks.unlock(testPath);
|
||||
|
||||
const [result1, result2] = await Promise.race([promise, new Promise<never>((_, reject) => {
|
||||
const [result1, result2] = await Promise.race([
|
||||
promise,
|
||||
new Promise<never>((_, reject) => {
|
||||
setTimeout(() => {
|
||||
reject(new Error("Deadlock detected"));
|
||||
}, 1000);
|
||||
})]);
|
||||
|
||||
})
|
||||
]);
|
||||
|
||||
assert.strictEqual(result1, "result1");
|
||||
assert.strictEqual(result2, "result2");
|
||||
|
|
@ -243,6 +246,7 @@ describe("withLock", () => {
|
|||
|
||||
describe("reset", () => {
|
||||
const testPath: RelativePath = "test/document/path";
|
||||
const testPath2: RelativePath = "test/document/path2";
|
||||
const logger = new Logger();
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/init-declarations
|
||||
|
|
@ -298,4 +302,38 @@ describe("reset", () => {
|
|||
const result = await locks.withLock(testPath, () => "success");
|
||||
assert.strictEqual(result, "success");
|
||||
});
|
||||
|
||||
it("should release partially acquired locks when reset interrupts multi-key acquisition", async () => {
|
||||
// Hold testPath2 so multi-key acquisition will block on it
|
||||
await locks.waitForLock(testPath2);
|
||||
|
||||
// Start multi-key lock that will acquire testPath first, then block on testPath2
|
||||
const multiKeyPromise = locks.withLock(
|
||||
[testPath, testPath2],
|
||||
async () => "multi"
|
||||
);
|
||||
void multiKeyPromise.catch(() => {}); // eslint-disable-line @typescript-eslint/no-empty-function
|
||||
|
||||
// Wait for the multi-key operation to acquire testPath and start waiting on testPath2
|
||||
await sleep(10);
|
||||
|
||||
// Reset should reject the waiting operation
|
||||
locks.reset();
|
||||
|
||||
await assert.rejects(multiKeyPromise, (err: Error) => {
|
||||
assert.ok(err instanceof SyncResetError);
|
||||
return true;
|
||||
});
|
||||
|
||||
// The key that was already acquired (testPath) should now be released
|
||||
// This would hang/timeout if the lock was leaked
|
||||
const result = await Promise.race([
|
||||
locks.withLock(testPath, () => "success"),
|
||||
sleep(100).then(() => {
|
||||
throw new Error("Lock was not released - deadlock detected");
|
||||
})
|
||||
]);
|
||||
|
||||
assert.strictEqual(result, "success");
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1,6 +1,5 @@
|
|||
import { SyncResetError } from "../../errors/sync-reset-error";
|
||||
import type { Logger } from "../../tracing/logger";
|
||||
import { awaitAll } from "../await-all";
|
||||
|
||||
/**
|
||||
* Manages exclusive locks on items to prevent concurrent modifications.
|
||||
|
|
@ -8,15 +7,18 @@ import { awaitAll } from "../await-all";
|
|||
*
|
||||
* @template T The type of the key used for locking
|
||||
*/
|
||||
/** Waiter entry with callbacks */
|
||||
interface WaiterEntry<T> {
|
||||
resolve: () => unknown;
|
||||
reject: (err: unknown) => unknown;
|
||||
}
|
||||
|
||||
export class Locks<T> {
|
||||
/** Currently locked keys */
|
||||
private readonly locked = new Set<T>();
|
||||
|
||||
/** Queue of resolve functions waiting for each key */
|
||||
private readonly waiters = new Map<
|
||||
T,
|
||||
[() => unknown, (err: unknown) => unknown][]
|
||||
>();
|
||||
/** Queue of waiters for each key */
|
||||
private readonly waiters = new Map<T, WaiterEntry<T>[]>();
|
||||
|
||||
public constructor(private readonly logger?: Logger) { }
|
||||
|
||||
|
|
@ -59,15 +61,17 @@ export class Locks<T> {
|
|||
const uniqueKeys = Array.from(new Set(keys));
|
||||
uniqueKeys.sort((a, b) => String(a).localeCompare(String(b))); // Ensure consistent order to prevent deadlocks
|
||||
|
||||
const lockedKeys = [];
|
||||
try {
|
||||
for (const key of uniqueKeys) {
|
||||
// Must acquire locks in-order (not concurrently) to prevent deadlocks
|
||||
await this.waitForLock(key);
|
||||
lockedKeys.push(key);
|
||||
}
|
||||
|
||||
try {
|
||||
return await fn();
|
||||
} finally {
|
||||
uniqueKeys.forEach((key) => {
|
||||
lockedKeys.forEach((key) => {
|
||||
this.unlock(key);
|
||||
});
|
||||
}
|
||||
|
|
@ -77,7 +81,7 @@ export class Locks<T> {
|
|||
// Resolve all waiting promises before clearing to prevent deadlock
|
||||
// Any operation waiting for a lock will be granted access immediately
|
||||
for (const waiting of this.waiters.values()) {
|
||||
for (const [_, reject] of waiting) {
|
||||
for (const { reject } of waiting) {
|
||||
reject(new SyncResetError());
|
||||
}
|
||||
}
|
||||
|
|
@ -89,40 +93,6 @@ export class Locks<T> {
|
|||
return this.locked.has(key);
|
||||
}
|
||||
|
||||
public getDebugString(): string {
|
||||
const lockedKeys = Array.from(this.locked).map((key) => String(key));
|
||||
const waiterEntries = Array.from(this.waiters.entries()).filter(
|
||||
([_, waiting]) => waiting.length > 0
|
||||
);
|
||||
|
||||
const lines: string[] = [];
|
||||
lines.push("=== Locks Debug ===");
|
||||
lines.push(`Locked keys (${lockedKeys.length}):`);
|
||||
if (lockedKeys.length === 0) {
|
||||
lines.push(" (none)");
|
||||
} else {
|
||||
for (const key of lockedKeys) {
|
||||
const waiterCount =
|
||||
this.waiters.get(key as T)?.length ?? 0;
|
||||
lines.push(
|
||||
` - ${key}${waiterCount > 0 ? ` (${waiterCount} waiting)` : ""}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
lines.push(`Waiters (${waiterEntries.length} keys):`);
|
||||
if (waiterEntries.length === 0) {
|
||||
lines.push(" (none)");
|
||||
} else {
|
||||
for (const [key, waiting] of waiterEntries) {
|
||||
lines.push(` - ${String(key)}: ${waiting.length} waiting`);
|
||||
}
|
||||
}
|
||||
lines.push("===================");
|
||||
|
||||
return lines.join("\n");
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to acquire a lock immediately without waiting.
|
||||
* Must call `unlock()` if successful.
|
||||
|
|
@ -162,11 +132,13 @@ export class Locks<T> {
|
|||
this.waiters.set(key, waiting);
|
||||
}
|
||||
|
||||
waiting.push([resolve, reject]);
|
||||
waiting.push({
|
||||
resolve,
|
||||
reject,
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Releases a lock and grants access to the next waiting operation in FIFO order.
|
||||
* Removes the key from locked set if no waiters.
|
||||
|
|
@ -176,15 +148,20 @@ export class Locks<T> {
|
|||
*/
|
||||
public unlock(key: T): void {
|
||||
if (!this.locked.has(key)) {
|
||||
this.logger?.debug(
|
||||
`Attempted to unlock ${key} which is not locked`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Remove first waiter to ensure FIFO order
|
||||
const [resolveNextWaiting, _] = this.waiters.get(key)?.shift() ?? [];
|
||||
this.logger?.debug(`Releasing lock on ${key}`);
|
||||
|
||||
if (resolveNextWaiting) {
|
||||
// Remove first waiter to ensure FIFO order
|
||||
const nextWaiter = this.waiters.get(key)?.shift();
|
||||
|
||||
if (nextWaiter) {
|
||||
this.logger?.debug(`Granted lock on ${key}`);
|
||||
resolveNextWaiting();
|
||||
nextWaiter.resolve();
|
||||
} else {
|
||||
this.locked.delete(key);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -45,7 +45,6 @@ export class InMemoryFileSystem implements FileSystemOperations {
|
|||
return this.files.has(path);
|
||||
}
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-empty-function
|
||||
public async createDirectory(_path: RelativePath): Promise<void> {
|
||||
// This doesn't mean anything in our virtual FS representation
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ export class MockAgent extends MockClient {
|
|||
|
||||
// The renamed file finding algorithm isn't too smart so we can't both update and rename the same file
|
||||
private readonly doNotTouchWhileOffline: string[] = [];
|
||||
private lastSyncEnabledState: boolean = true;
|
||||
private lastSyncEnabledState = true;
|
||||
|
||||
public constructor(
|
||||
initialSettings: Partial<SyncSettings>,
|
||||
|
|
@ -107,14 +107,12 @@ export class MockAgent extends MockClient {
|
|||
public async waitUntilSynced(): Promise<void> {
|
||||
await withTimeout(
|
||||
(async (): Promise<void> => {
|
||||
this.client.setSetting("isSyncEnabled", true);
|
||||
await this.client.setSetting("isSyncEnabled", true);
|
||||
await this.client.waitUntilFinished();
|
||||
})(),
|
||||
TIMEOUT_MS,
|
||||
"waitUntilSynced()"
|
||||
);
|
||||
|
||||
|
||||
}
|
||||
|
||||
public async act(): Promise<void> {
|
||||
|
|
|
|||
|
|
@ -80,8 +80,6 @@ async function runTest({
|
|||
|
||||
await utils.awaitAll(clients.map(async (client) => client.init()));
|
||||
|
||||
|
||||
|
||||
for (let i = 0; i < iterations; i++) {
|
||||
logger.info(`Iteration ${i + 1}/${iterations}`);
|
||||
await utils.awaitAll(clients.map(async (client) => client.act()));
|
||||
|
|
@ -184,7 +182,7 @@ process.on("uncaughtException", (error) => {
|
|||
}
|
||||
|
||||
logger.error(`Error - uncaught exception: ${error}`);
|
||||
if (error instanceof Error && error.stack) {
|
||||
if (error instanceof Error && error.stack != null) {
|
||||
logger.error(error.stack);
|
||||
}
|
||||
process.exit(1);
|
||||
|
|
@ -215,7 +213,7 @@ process.on("unhandledRejection", (error, _promise) => {
|
|||
}
|
||||
|
||||
logger.error(`Error - unhandled rejection: ${error}`);
|
||||
if (error instanceof Error && error.stack) {
|
||||
if (error instanceof Error && error.stack != null) {
|
||||
logger.error(error.stack);
|
||||
}
|
||||
process.exit(1);
|
||||
|
|
@ -227,7 +225,7 @@ runTests()
|
|||
})
|
||||
.catch((error: unknown) => {
|
||||
logger.error(`Error - tests failed with ${error}`);
|
||||
if (error instanceof Error && error.stack) {
|
||||
if (error instanceof Error && error.stack != null) {
|
||||
logger.error(error.stack);
|
||||
}
|
||||
process.exit(1);
|
||||
|
|
|
|||
|
|
@ -1,4 +1,9 @@
|
|||
import { __debug_locks } from "sync-client";
|
||||
export class TimeoutError extends Error {
|
||||
public constructor(message: string) {
|
||||
super(message);
|
||||
this.name = "TimeoutError";
|
||||
}
|
||||
}
|
||||
|
||||
export async function withTimeout<T>(
|
||||
promise: Promise<T>,
|
||||
|
|
@ -10,16 +15,11 @@ export async function withTimeout<T>(
|
|||
new Promise<T>((_, reject) =>
|
||||
setTimeout(() => {
|
||||
reject(
|
||||
new TimeoutError(`${operationName} timed out after ${timeoutMs}ms ${__debug_locks.map(lock => lock.getDebugString()).join(", ")}`)
|
||||
new TimeoutError(
|
||||
`${operationName} timed out after ${timeoutMs}ms`
|
||||
)
|
||||
);
|
||||
}, timeoutMs)
|
||||
)
|
||||
]);
|
||||
}
|
||||
|
||||
export class TimeoutError extends Error {
|
||||
constructor(message: string) {
|
||||
super(message);
|
||||
this.name = "TimeoutError";
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue