Fix testing setup

This commit is contained in:
Andras Schmelczer 2026-04-01 22:36:22 +01:00
parent 0897f7a545
commit 3784418567
11 changed files with 266 additions and 119 deletions

View file

@ -24,6 +24,9 @@ Clients always start with syncing disabled.
- `barrier` — retry until all clients converge to identical file state (60s timeout) - `barrier` — retry until all clients converge to identical file state (60s timeout)
- `enable-sync` / `disable-sync` — simulate going online/offline - `enable-sync` / `disable-sync` — simulate going online/offline
**WebSocket control** (per-client):
- `pause-websocket` / `resume-websocket` — buffer/release WebSocket messages for a specific client
**Server control:** **Server control:**
- `pause-server` / `resume-server` — SIGSTOP/SIGCONT the server process - `pause-server` / `resume-server` — SIGSTOP/SIGCONT the server process

View file

@ -6,7 +6,7 @@ export const STOP_TIMEOUT_MS = 5_000;
export const CONVERGENCE_TIMEOUT_MS = 60_000; export const CONVERGENCE_TIMEOUT_MS = 60_000;
export const CONVERGENCE_RETRY_DELAY_MS = 500; export const CONVERGENCE_RETRY_DELAY_MS = 500;
export const AGENT_INIT_TIMEOUT_MS = 30_000; export const AGENT_INIT_TIMEOUT_MS = 30_000;
export const IS_SYNC_ENABLED_DEFAULT = false; export const IS_SYNC_ENABLED_BY_DEFAULT = false;
export const WAIT_TIMEOUT_MS = 60_000; export const WAIT_TIMEOUT_MS = 60_000;
export const WEBSOCKET_CONNECT_TIMEOUT_MS = 10_000; export const WEBSOCKET_CONNECT_TIMEOUT_MS = 10_000;

View file

@ -3,7 +3,8 @@ import { SyncClient, debugging, LogLevel } from "sync-client";
import { assert } from "./utils/assert"; import { assert } from "./utils/assert";
import { sleep } from "./utils/sleep"; import { sleep } from "./utils/sleep";
import { withTimeout } from "./utils/with-timeout"; import { withTimeout } from "./utils/with-timeout";
import { IS_SYNC_ENABLED_DEFAULT, WAIT_TIMEOUT_MS, WEBSOCKET_CONNECT_TIMEOUT_MS, WEBSOCKET_POLL_INTERVAL_MS } from "./consts"; import { IS_SYNC_ENABLED_BY_DEFAULT, WAIT_TIMEOUT_MS, WEBSOCKET_CONNECT_TIMEOUT_MS, WEBSOCKET_POLL_INTERVAL_MS } from "./consts";
import { ManagedWebSocketFactory } from "./managed-websocket";
@ -15,9 +16,10 @@ export class DeterministicAgent extends debugging.InMemoryFileSystem {
settings: Partial<SyncSettings>; settings: Partial<SyncSettings>;
database: Partial<StoredDatabase>; database: Partial<StoredDatabase>;
}> = {}; }> = {};
private isSyncEnabled = IS_SYNC_ENABLED_DEFAULT; private isSyncEnabled = IS_SYNC_ENABLED_BY_DEFAULT;
private readonly syncErrors: Error[] = []; private readonly syncErrors: Error[] = [];
private readonly pendingSyncOperations = new Set<Promise<void>>(); private readonly pendingSyncOperations = new Set<Promise<void>>();
private readonly wsFactory = new ManagedWebSocketFactory();
public constructor( public constructor(
clientId: number, clientId: number,
@ -32,7 +34,6 @@ export class DeterministicAgent extends debugging.InMemoryFileSystem {
public async init( public async init(
fetchImplementation: typeof globalThis.fetch, fetchImplementation: typeof globalThis.fetch,
webSocketImplementation: typeof globalThis.WebSocket
): Promise<void> { ): Promise<void> {
this.client = await SyncClient.create({ this.client = await SyncClient.create({
fs: this, fs: this,
@ -41,7 +42,7 @@ export class DeterministicAgent extends debugging.InMemoryFileSystem {
save: async (data) => void (this.data = data) save: async (data) => void (this.data = data)
}, },
fetch: fetchImplementation, fetch: fetchImplementation,
webSocket: webSocketImplementation webSocket: this.wsFactory.constructorFn
}); });
this.client.logger.onLogEmitted.add((line) => { this.client.logger.onLogEmitted.add((line) => {
@ -75,68 +76,14 @@ export class DeterministicAgent extends debugging.InMemoryFileSystem {
} }
} }
public async createFile(path: string, content: string): Promise<void> { public pauseWebSocket(): void {
this.log(`Creating file ${path} with content: ${content}`); this.log("Pausing WebSocket message delivery");
if (this.files.has(path)) { this.wsFactory.pause();
throw new Error(`File ${path} already exists`);
}
const contentBytes = new TextEncoder().encode(content);
this.files.set(path, contentBytes);
if (this.isSyncEnabled) {
this.enqueueSync(async () =>
this.client.syncLocallyCreatedFile(path)
);
}
} }
public async updateFile(path: string, content: string): Promise<void> { public resumeWebSocket(): void {
this.log(`Updating file ${path} with content: ${content}`); this.log("Resuming WebSocket message delivery");
if (!this.files.has(path)) { this.wsFactory.resume();
throw new Error(
`File ${path} does not exist on client ${this.clientId}`
);
}
const contentBytes = new TextEncoder().encode(content);
this.files.set(path, contentBytes);
if (this.isSyncEnabled) {
this.enqueueSync(async () =>
this.client.syncLocallyUpdatedFile({ relativePath: path })
);
}
}
public async renameFile(oldPath: string, newPath: string): Promise<void> {
this.log(`Renaming file ${oldPath} to ${newPath}`);
const file = this.files.get(oldPath);
if (!file) {
throw new Error(
`File ${oldPath} does not exist on client ${this.clientId}`
);
}
this.files.set(newPath, file);
if (oldPath !== newPath) {
this.files.delete(oldPath);
}
if (this.isSyncEnabled) {
this.enqueueSync(async () =>
this.client.syncLocallyUpdatedFile({
oldPath,
relativePath: newPath
})
);
}
}
public async deleteFile(path: string): Promise<void> {
this.log(`Deleting file ${path}`);
this.files.delete(path);
if (this.isSyncEnabled) {
this.enqueueSync(async () =>
this.client.syncLocallyDeletedFile(path)
);
}
} }
public async waitForSync(): Promise<void> { public async waitForSync(): Promise<void> {
@ -191,9 +138,6 @@ export class DeterministicAgent extends debugging.InMemoryFileSystem {
await this.waitForWebSocket(); await this.waitForWebSocket();
} }
public async getFiles(): Promise<RelativePath[]> {
return this.listFilesRecursively();
}
public async getFileContent(path: string): Promise<string> { public async getFileContent(path: string): Promise<string> {
const bytes = await this.read(path); const bytes = await this.read(path);
@ -226,10 +170,6 @@ export class DeterministicAgent extends debugging.InMemoryFileSystem {
this.log("Cleanup complete"); this.log("Cleanup complete");
} }
// Yield the event loop before each FS operation so that the SyncClient's
// async calls create real interleaving points, matching the behavior of
// actual disk I/O. Without this, all FS operations resolve in the same
// microtask, hiding concurrency bugs that only manifest with real latency.
public override async read(path: RelativePath): Promise<Uint8Array> { public override async read(path: RelativePath): Promise<Uint8Array> {
await Promise.resolve(); await Promise.resolve();
return super.read(path); return super.read(path);
@ -240,33 +180,50 @@ export class DeterministicAgent extends debugging.InMemoryFileSystem {
content: Uint8Array content: Uint8Array
): Promise<void> { ): Promise<void> {
await Promise.resolve(); await Promise.resolve();
return super.write(path, content); const isNew = !this.files.has(path);
await super.write(path, content);
if (isNew) {
this.enqueueSync(async () => this.client.syncLocallyCreatedFile(path)
);
} else {
this.enqueueSync(async () => this.client.syncLocallyUpdatedFile({ relativePath: path })
);
}
} }
public override async atomicUpdateText( public override async atomicUpdateText(
path: RelativePath, path: RelativePath,
updater: (current: TextWithCursors) => TextWithCursors updater: (current: TextWithCursors) => TextWithCursors
): Promise<string> { ): Promise<string> {
await Promise.resolve(); const result = await super.atomicUpdateText(path, updater);
return super.atomicUpdateText(path, updater); this.enqueueSync(async () => this.client.syncLocallyUpdatedFile({ relativePath: path })
);
return result;
} }
public override async exists(path: RelativePath): Promise<boolean> {
await Promise.resolve();
return super.exists(path);
}
public override async delete(path: RelativePath): Promise<void> { public override async delete(path: RelativePath): Promise<void> {
await Promise.resolve(); await super.delete(path);
return super.delete(path); if (this.isSyncEnabled) {
this.enqueueSync(async () => { this.client.syncLocallyDeletedFile(path); }
);
}
} }
public override async rename( public override async rename(
oldPath: RelativePath, oldPath: RelativePath,
newPath: RelativePath newPath: RelativePath
): Promise<void> { ): Promise<void> {
await Promise.resolve(); await super.rename(oldPath, newPath);
return super.rename(oldPath, newPath); this.enqueueSync(async () => {
this.client.syncLocallyUpdatedFile({
oldPath,
relativePath: newPath
});
}
);
} }
private async waitForWebSocket(): Promise<void> { private async waitForWebSocket(): Promise<void> {

View file

@ -0,0 +1,170 @@
/**
* A WebSocket wrapper that can pause and resume message delivery.
* When paused, incoming messages are buffered. When resumed, buffered
* messages are delivered in order via the onmessage handler.
*/
export class ManagedWebSocket implements WebSocket {
private readonly ws: WebSocket;
private paused = false;
private readonly bufferedMessages: MessageEvent[] = [];
private externalOnMessage: ((event: MessageEvent) => unknown) | null = null;
public constructor(url: string | URL, protocols?: string | string[]) {
this.ws = new WebSocket(url, protocols);
this.ws.onmessage = (event: MessageEvent): void => {
if (this.paused) {
this.bufferedMessages.push(event);
} else {
this.externalOnMessage?.(event);
}
};
}
public pause(): void {
this.paused = true;
}
public resume(): void {
this.paused = false;
const messages = this.bufferedMessages.splice(0);
for (const msg of messages) {
this.externalOnMessage?.(msg);
}
}
get readyState(): number {
return this.ws.readyState;
}
get url(): string {
return this.ws.url;
}
get protocol(): string {
return this.ws.protocol;
}
get extensions(): string {
return this.ws.extensions;
}
get bufferedAmount(): number {
return this.ws.bufferedAmount;
}
get binaryType(): BinaryType {
return this.ws.binaryType;
}
set binaryType(value: BinaryType) {
this.ws.binaryType = value;
}
get onopen(): ((this: WebSocket, ev: Event) => unknown) | null {
return this.ws.onopen;
}
set onopen(handler: ((this: WebSocket, ev: Event) => unknown) | null) {
this.ws.onopen = handler;
}
get onclose(): ((this: WebSocket, ev: CloseEvent) => unknown) | null {
return this.ws.onclose;
}
set onclose(handler: ((this: WebSocket, ev: CloseEvent) => unknown) | null) {
this.ws.onclose = handler;
}
get onerror(): ((this: WebSocket, ev: Event) => unknown) | null {
return this.ws.onerror;
}
set onerror(handler: ((this: WebSocket, ev: Event) => unknown) | null) {
this.ws.onerror = handler;
}
get onmessage(): ((this: WebSocket, ev: MessageEvent) => unknown) | null {
return this.externalOnMessage;
}
set onmessage(
handler: ((this: WebSocket, ev: MessageEvent) => unknown) | null
) {
this.externalOnMessage = handler;
}
public send(data: string | ArrayBufferLike | Blob | ArrayBufferView): void {
this.ws.send(data);
}
public close(code?: number, reason?: string): void {
this.ws.close(code, reason);
}
public addEventListener(
...args: Parameters<WebSocket["addEventListener"]>
): void {
this.ws.addEventListener(...args);
}
public removeEventListener(
...args: Parameters<WebSocket["removeEventListener"]>
): void {
this.ws.removeEventListener(...args);
}
public dispatchEvent(event: Event): boolean {
return this.ws.dispatchEvent(event);
}
static readonly CONNECTING = WebSocket.CONNECTING;
static readonly OPEN = WebSocket.OPEN;
static readonly CLOSING = WebSocket.CLOSING;
static readonly CLOSED = WebSocket.CLOSED;
readonly CONNECTING = WebSocket.CONNECTING;
readonly OPEN = WebSocket.OPEN;
readonly CLOSING = WebSocket.CLOSING;
readonly CLOSED = WebSocket.CLOSED;
}
/**
* Factory that creates ManagedWebSocket instances and tracks them
* for pause/resume control from the test harness
*/
export class ManagedWebSocketFactory {
private readonly instances: ManagedWebSocket[] = [];
public get constructorFn(): typeof globalThis.WebSocket {
const factory = this;
const ctor = function ManagedWS(
url: string | URL,
protocols?: string | string[]
): ManagedWebSocket {
const ws = new ManagedWebSocket(url, protocols);
factory.instances.push(ws);
return ws;
} as unknown as typeof globalThis.WebSocket;
Object.defineProperty(ctor, "CONNECTING", { value: WebSocket.CONNECTING });
Object.defineProperty(ctor, "OPEN", { value: WebSocket.OPEN });
Object.defineProperty(ctor, "CLOSING", { value: WebSocket.CLOSING });
Object.defineProperty(ctor, "CLOSED", { value: WebSocket.CLOSED });
return ctor;
}
public pause(): void {
for (const ws of this.instances) {
ws.pause();
}
}
public resume(): void {
for (const ws of this.instances) {
ws.resume();
}
}
}

View file

@ -40,8 +40,10 @@ export class ServerControl {
const reservation = await findFreePort(); const reservation = await findFreePort();
this._port = reservation.port; this._port = reservation.port;
// Prefer tmpfs (/host/tmp) over disk-backed /tmp for faster SQLite I/O
const tmpBase = fs.existsSync("/host/tmp") ? "/host/tmp" : os.tmpdir();
this.tempDir = fs.mkdtempSync( this.tempDir = fs.mkdtempSync(
path.join(os.tmpdir(), "vault-link-test-") path.join(tmpBase, "vault-link-test-")
); );
const tempConfigPath = path.join(this.tempDir, "config.yml"); const tempConfigPath = path.join(this.tempDir, "config.yml");
const dbDir = path.join(this.tempDir, "databases"); const dbDir = path.join(this.tempDir, "databases");

View file

@ -16,7 +16,9 @@ export type TestStep =
| { type: "pause-server" } | { type: "pause-server" }
| { type: "resume-server" } | { type: "resume-server" }
| { type: "barrier" } | { type: "barrier" }
| { type: "assert-consistent"; verify?: (state: AssertableState) => void }; | { type: "assert-consistent"; verify?: (state: AssertableState) => void }
| { type: "pause-websocket"; client: number }
| { type: "resume-websocket"; client: number };
export interface TestDefinition { export interface TestDefinition {
description?: string; description?: string;

View file

@ -65,6 +65,7 @@ import { createRenameResponseSkipsFileTest } from "./tests/create-rename-respons
import { onlineCreateRenameConcurrentCreateOrphanTest } from "./tests/online-create-rename-concurrent-create-orphan.test"; import { onlineCreateRenameConcurrentCreateOrphanTest } from "./tests/online-create-rename-concurrent-create-orphan.test";
import { concurrentRenameFirstWinsTest } from "./tests/concurrent-rename-first-wins.test"; import { concurrentRenameFirstWinsTest } from "./tests/concurrent-rename-first-wins.test";
import { binaryToTextTransitionTest } from "./tests/binary-to-text-transition.test"; import { binaryToTextTransitionTest } from "./tests/binary-to-text-transition.test";
import { updateThenRenameContentLostTest } from "./tests/update-then-rename-content-lost.test";
export const TESTS: Partial<Record<string, TestDefinition>> = { export const TESTS: Partial<Record<string, TestDefinition>> = {
"rename-create-conflict": renameCreateConflictTest, "rename-create-conflict": renameCreateConflictTest,
@ -133,4 +134,5 @@ export const TESTS: Partial<Record<string, TestDefinition>> = {
"online-create-rename-concurrent-create-orphan": onlineCreateRenameConcurrentCreateOrphanTest, "online-create-rename-concurrent-create-orphan": onlineCreateRenameConcurrentCreateOrphanTest,
"concurrent-rename-first-wins": concurrentRenameFirstWinsTest, "concurrent-rename-first-wins": concurrentRenameFirstWinsTest,
"binary-to-text-transition": binaryToTextTransitionTest, "binary-to-text-transition": binaryToTextTransitionTest,
"update-then-rename-content-lost": updateThenRenameContentLostTest,
}; };

View file

@ -14,7 +14,7 @@ import {
CONVERGENCE_TIMEOUT_MS, CONVERGENCE_TIMEOUT_MS,
CONVERGENCE_RETRY_DELAY_MS, CONVERGENCE_RETRY_DELAY_MS,
AGENT_INIT_TIMEOUT_MS, AGENT_INIT_TIMEOUT_MS,
IS_SYNC_ENABLED_DEFAULT IS_SYNC_ENABLED_BY_DEFAULT
} from "./consts"; } from "./consts";
import { randomUUID } from "node:crypto"; import { randomUUID } from "node:crypto";
@ -100,7 +100,7 @@ export class TestRunner {
for (let i = 0; i < count; i++) { for (let i = 0; i < count; i++) {
const settings: Partial<SyncSettings> = { const settings: Partial<SyncSettings> = {
isSyncEnabled: IS_SYNC_ENABLED_DEFAULT, isSyncEnabled: IS_SYNC_ENABLED_BY_DEFAULT,
token: this.token, token: this.token,
vaultName, vaultName,
remoteUri: this.remoteUri remoteUri: this.remoteUri
@ -115,8 +115,6 @@ export class TestRunner {
await withTimeout( await withTimeout(
agent.init( agent.init(
fetch, fetch,
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
WebSocket as unknown as typeof globalThis.WebSocket
), ),
AGENT_INIT_TIMEOUT_MS, AGENT_INIT_TIMEOUT_MS,
`Client ${i} init timed out after ${AGENT_INIT_TIMEOUT_MS}ms` `Client ${i} init timed out after ${AGENT_INIT_TIMEOUT_MS}ms`
@ -138,28 +136,22 @@ export class TestRunner {
private async executeStep(step: TestStep): Promise<void> { private async executeStep(step: TestStep): Promise<void> {
switch (step.type) { switch (step.type) {
case "create": case "create":
await this.getAgent(step.client).createFile(
step.path,
step.content
);
break;
case "update": case "update":
await this.getAgent(step.client).updateFile( await this.getAgent(step.client).write(
step.path, step.path,
step.content new TextEncoder().encode(step.content)
); );
break; break;
case "rename": case "rename":
await this.getAgent(step.client).renameFile( await this.getAgent(step.client).rename(
step.oldPath, step.oldPath,
step.newPath step.newPath
); );
break; break;
case "delete": case "delete":
await this.getAgent(step.client).deleteFile(step.path); await this.getAgent(step.client).delete(step.path);
break; break;
case "sync": case "sync":
@ -199,6 +191,14 @@ export class TestRunner {
await this.assertConsistent(step.verify); await this.assertConsistent(step.verify);
break; break;
case "pause-websocket":
this.getAgent(step.client).pauseWebSocket();
break;
case "resume-websocket":
this.getAgent(step.client).resumeWebSocket();
break;
default: { default: {
const unknownStep = step as { type: string }; const unknownStep = step as { type: string };
throw new Error(`Unknown step type: ${unknownStep.type}`); throw new Error(`Unknown step type: ${unknownStep.type}`);
@ -282,7 +282,7 @@ export class TestRunner {
// where background sync could mutate state between reads. // where background sync could mutate state between reads.
const clientFiles: Map<string, string>[] = []; const clientFiles: Map<string, string>[] = [];
for (const agent of this.agents) { for (const agent of this.agents) {
const sortedFiles = (await agent.getFiles()).sort(); const sortedFiles = (await agent.listFilesRecursively()).sort();
const fileMap = new Map<string, string>(); const fileMap = new Map<string, string>();
for (const file of sortedFiles) { for (const file of sortedFiles) {
const content = await agent.getFileContent(file); const content = await agent.getFileContent(file);

View file

@ -333,16 +333,19 @@ export class MockAgent extends MockClient {
.includes(content); .includes(content);
}); });
if ( if (!this.useSlowFileEvents) {
!this.useSlowFileEvents
) {
assert( assert(
found.length <= 1, found.length <= 1,
`[${this.name}] Binary content ${content} found in multiple files: ${found.join(", ")}` `[${this.name}] Binary content ${content} found in multiple files: ${found.join(", ")}`
); );
} }
if (!this.useSlowFileEvents && !this.doDeletes) {
assert(
found.length >= 1,
`[${this.name}] Binary content ${content} not found in any files`
);
}
} }
} }
@ -510,9 +513,7 @@ export class MockAgent extends MockClient {
`Decided to update binary file ${file}` `Decided to update binary file ${file}`
); );
this.doNotTouchWhileOffline.push(file); this.doNotTouchWhileOffline.push(file);
this.files.set(file, bytes); await this.write(file, bytes);
} }
private async deleteFileAction(): Promise<void> { private async deleteFileAction(): Promise<void> {

View file

@ -45,10 +45,18 @@ export class MockClient extends debugging.InMemoryFileSystem {
path: RelativePath, path: RelativePath,
content: Uint8Array content: Uint8Array
): Promise<void> { ): Promise<void> {
const isNew = !this.files.has(path);
this.files.set(path, content); this.files.set(path, content);
this.executeFileOperation(
async () => this.client.syncLocallyUpdatedFile({ relativePath: path }), if (isNew) {
); this.executeFileOperation(async () => { this.client.syncLocallyCreatedFile(path); }
);
} else {
this.executeFileOperation(
async () => { this.client.syncLocallyUpdatedFile({ relativePath: path }); },
);
}
} }
@ -66,7 +74,7 @@ export class MockClient extends debugging.InMemoryFileSystem {
this.files.set(path, newContentUint8Array); this.files.set(path, newContentUint8Array);
this.executeFileOperation( this.executeFileOperation(
async () => this.client.syncLocallyUpdatedFile({ relativePath: path }), async () => { this.client.syncLocallyUpdatedFile({ relativePath: path }); },
); );
return newContent; return newContent;
@ -77,7 +85,7 @@ export class MockClient extends debugging.InMemoryFileSystem {
public override async delete(path: RelativePath): Promise<void> { public override async delete(path: RelativePath): Promise<void> {
this.files.delete(path); this.files.delete(path);
this.executeFileOperation( this.executeFileOperation(
async () => this.client.syncLocallyDeletedFile(path), async () => { this.client.syncLocallyDeletedFile(path); },
); );
} }
@ -94,10 +102,10 @@ export class MockClient extends debugging.InMemoryFileSystem {
this.files.delete(oldPath); this.files.delete(oldPath);
} }
this.executeFileOperation( this.executeFileOperation(
async () => this.client.syncLocallyUpdatedFile({ async () => { this.client.syncLocallyUpdatedFile({
oldPath, oldPath,
relativePath: newPath relativePath: newPath
}), }); },
); );
} }

View file

@ -104,11 +104,8 @@ async function runTest({
} }
} }
// Settling rounds to drain cascading broadcasts between agents. // Settling rounds: drain cascading broadcasts between agents
// Completing work on agent A can trigger broadcasts to agent B, for (let round = 0; round < 10; round++) {
// which can cascade further. With N agents the worst case is N
// hops, so N+1 passes guarantees all cascades are drained.
for (let round = 0; round <= clients.length; round++) {
for (const client of clients) { for (const client of clients) {
try { try {
await client.waitUntilSynced(); await client.waitUntilSynced();
@ -118,8 +115,13 @@ async function runTest({
} }
} }
} }
// TODO: it's very ugly, let's remove this
await sleep(2000);
} }
for (const client of clients) { for (const client of clients) {
try { try {
logger.info(`Destroying ${client.name}`); logger.info(`Destroying ${client.name}`);