diff --git a/frontend/deterministic-tests/README.md b/frontend/deterministic-tests/README.md index 5c835326..678cd0fe 100644 --- a/frontend/deterministic-tests/README.md +++ b/frontend/deterministic-tests/README.md @@ -24,6 +24,9 @@ Clients always start with syncing disabled. - `barrier` — retry until all clients converge to identical file state (60s timeout) - `enable-sync` / `disable-sync` — simulate going online/offline +**WebSocket control** (per-client): +- `pause-websocket` / `resume-websocket` — buffer/release WebSocket messages for a specific client + **Server control:** - `pause-server` / `resume-server` — SIGSTOP/SIGCONT the server process diff --git a/frontend/deterministic-tests/src/consts.ts b/frontend/deterministic-tests/src/consts.ts index 32c03efa..a04c9b61 100644 --- a/frontend/deterministic-tests/src/consts.ts +++ b/frontend/deterministic-tests/src/consts.ts @@ -6,7 +6,7 @@ export const STOP_TIMEOUT_MS = 5_000; export const CONVERGENCE_TIMEOUT_MS = 60_000; export const CONVERGENCE_RETRY_DELAY_MS = 500; export const AGENT_INIT_TIMEOUT_MS = 30_000; -export const IS_SYNC_ENABLED_DEFAULT = false; +export const IS_SYNC_ENABLED_BY_DEFAULT = false; export const WAIT_TIMEOUT_MS = 60_000; export const WEBSOCKET_CONNECT_TIMEOUT_MS = 10_000; diff --git a/frontend/deterministic-tests/src/deterministic-agent.ts b/frontend/deterministic-tests/src/deterministic-agent.ts index 136d5ed8..00020908 100644 --- a/frontend/deterministic-tests/src/deterministic-agent.ts +++ b/frontend/deterministic-tests/src/deterministic-agent.ts @@ -3,7 +3,8 @@ import { SyncClient, debugging, LogLevel } from "sync-client"; import { assert } from "./utils/assert"; import { sleep } from "./utils/sleep"; import { withTimeout } from "./utils/with-timeout"; -import { IS_SYNC_ENABLED_DEFAULT, WAIT_TIMEOUT_MS, WEBSOCKET_CONNECT_TIMEOUT_MS, WEBSOCKET_POLL_INTERVAL_MS } from "./consts"; +import { IS_SYNC_ENABLED_BY_DEFAULT, WAIT_TIMEOUT_MS, WEBSOCKET_CONNECT_TIMEOUT_MS, WEBSOCKET_POLL_INTERVAL_MS } from "./consts"; +import { ManagedWebSocketFactory } from "./managed-websocket"; @@ -15,9 +16,10 @@ export class DeterministicAgent extends debugging.InMemoryFileSystem { settings: Partial; database: Partial; }> = {}; - private isSyncEnabled = IS_SYNC_ENABLED_DEFAULT; + private isSyncEnabled = IS_SYNC_ENABLED_BY_DEFAULT; private readonly syncErrors: Error[] = []; private readonly pendingSyncOperations = new Set>(); + private readonly wsFactory = new ManagedWebSocketFactory(); public constructor( clientId: number, @@ -32,7 +34,6 @@ export class DeterministicAgent extends debugging.InMemoryFileSystem { public async init( fetchImplementation: typeof globalThis.fetch, - webSocketImplementation: typeof globalThis.WebSocket ): Promise { this.client = await SyncClient.create({ fs: this, @@ -41,7 +42,7 @@ export class DeterministicAgent extends debugging.InMemoryFileSystem { save: async (data) => void (this.data = data) }, fetch: fetchImplementation, - webSocket: webSocketImplementation + webSocket: this.wsFactory.constructorFn }); this.client.logger.onLogEmitted.add((line) => { @@ -75,68 +76,14 @@ export class DeterministicAgent extends debugging.InMemoryFileSystem { } } - public async createFile(path: string, content: string): Promise { - this.log(`Creating file ${path} with content: ${content}`); - if (this.files.has(path)) { - throw new Error(`File ${path} already exists`); - } - const contentBytes = new TextEncoder().encode(content); - this.files.set(path, contentBytes); - - if (this.isSyncEnabled) { - this.enqueueSync(async () => - this.client.syncLocallyCreatedFile(path) - ); - } + public pauseWebSocket(): void { + this.log("Pausing WebSocket message delivery"); + this.wsFactory.pause(); } - public async updateFile(path: string, content: string): Promise { - this.log(`Updating file ${path} with content: ${content}`); - if (!this.files.has(path)) { - throw new Error( - `File ${path} does not exist on client ${this.clientId}` - ); - } - const contentBytes = new TextEncoder().encode(content); - this.files.set(path, contentBytes); - - if (this.isSyncEnabled) { - this.enqueueSync(async () => - this.client.syncLocallyUpdatedFile({ relativePath: path }) - ); - } - } - - public async renameFile(oldPath: string, newPath: string): Promise { - this.log(`Renaming file ${oldPath} to ${newPath}`); - const file = this.files.get(oldPath); - if (!file) { - throw new Error( - `File ${oldPath} does not exist on client ${this.clientId}` - ); - } - this.files.set(newPath, file); - if (oldPath !== newPath) { - this.files.delete(oldPath); - } - if (this.isSyncEnabled) { - this.enqueueSync(async () => - this.client.syncLocallyUpdatedFile({ - oldPath, - relativePath: newPath - }) - ); - } - } - - public async deleteFile(path: string): Promise { - this.log(`Deleting file ${path}`); - this.files.delete(path); - if (this.isSyncEnabled) { - this.enqueueSync(async () => - this.client.syncLocallyDeletedFile(path) - ); - } + public resumeWebSocket(): void { + this.log("Resuming WebSocket message delivery"); + this.wsFactory.resume(); } public async waitForSync(): Promise { @@ -191,9 +138,6 @@ export class DeterministicAgent extends debugging.InMemoryFileSystem { await this.waitForWebSocket(); } - public async getFiles(): Promise { - return this.listFilesRecursively(); - } public async getFileContent(path: string): Promise { const bytes = await this.read(path); @@ -226,10 +170,6 @@ export class DeterministicAgent extends debugging.InMemoryFileSystem { this.log("Cleanup complete"); } - // Yield the event loop before each FS operation so that the SyncClient's - // async calls create real interleaving points, matching the behavior of - // actual disk I/O. Without this, all FS operations resolve in the same - // microtask, hiding concurrency bugs that only manifest with real latency. public override async read(path: RelativePath): Promise { await Promise.resolve(); return super.read(path); @@ -240,33 +180,50 @@ export class DeterministicAgent extends debugging.InMemoryFileSystem { content: Uint8Array ): Promise { await Promise.resolve(); - return super.write(path, content); + const isNew = !this.files.has(path); + await super.write(path, content); + + if (isNew) { + this.enqueueSync(async () => this.client.syncLocallyCreatedFile(path) + ); + } else { + this.enqueueSync(async () => this.client.syncLocallyUpdatedFile({ relativePath: path }) + ); + } } public override async atomicUpdateText( path: RelativePath, updater: (current: TextWithCursors) => TextWithCursors ): Promise { - await Promise.resolve(); - return super.atomicUpdateText(path, updater); + const result = await super.atomicUpdateText(path, updater); + this.enqueueSync(async () => this.client.syncLocallyUpdatedFile({ relativePath: path }) + ); + return result; + } - public override async exists(path: RelativePath): Promise { - await Promise.resolve(); - return super.exists(path); - } public override async delete(path: RelativePath): Promise { - await Promise.resolve(); - return super.delete(path); + await super.delete(path); + if (this.isSyncEnabled) { + this.enqueueSync(async () => { this.client.syncLocallyDeletedFile(path); } + ); + } } public override async rename( oldPath: RelativePath, newPath: RelativePath ): Promise { - await Promise.resolve(); - return super.rename(oldPath, newPath); + await super.rename(oldPath, newPath); + this.enqueueSync(async () => { + this.client.syncLocallyUpdatedFile({ + oldPath, + relativePath: newPath + }); + } + ); } private async waitForWebSocket(): Promise { diff --git a/frontend/deterministic-tests/src/managed-websocket.ts b/frontend/deterministic-tests/src/managed-websocket.ts new file mode 100644 index 00000000..c09b44d7 --- /dev/null +++ b/frontend/deterministic-tests/src/managed-websocket.ts @@ -0,0 +1,170 @@ +/** + * A WebSocket wrapper that can pause and resume message delivery. + * When paused, incoming messages are buffered. When resumed, buffered + * messages are delivered in order via the onmessage handler. + */ +export class ManagedWebSocket implements WebSocket { + private readonly ws: WebSocket; + private paused = false; + private readonly bufferedMessages: MessageEvent[] = []; + private externalOnMessage: ((event: MessageEvent) => unknown) | null = null; + + public constructor(url: string | URL, protocols?: string | string[]) { + this.ws = new WebSocket(url, protocols); + + this.ws.onmessage = (event: MessageEvent): void => { + if (this.paused) { + this.bufferedMessages.push(event); + } else { + this.externalOnMessage?.(event); + } + }; + } + + public pause(): void { + this.paused = true; + } + + public resume(): void { + this.paused = false; + const messages = this.bufferedMessages.splice(0); + for (const msg of messages) { + this.externalOnMessage?.(msg); + } + } + + get readyState(): number { + return this.ws.readyState; + } + + get url(): string { + return this.ws.url; + } + + get protocol(): string { + return this.ws.protocol; + } + + get extensions(): string { + return this.ws.extensions; + } + + get bufferedAmount(): number { + return this.ws.bufferedAmount; + } + + get binaryType(): BinaryType { + return this.ws.binaryType; + } + + set binaryType(value: BinaryType) { + this.ws.binaryType = value; + } + + get onopen(): ((this: WebSocket, ev: Event) => unknown) | null { + return this.ws.onopen; + } + + set onopen(handler: ((this: WebSocket, ev: Event) => unknown) | null) { + this.ws.onopen = handler; + } + + get onclose(): ((this: WebSocket, ev: CloseEvent) => unknown) | null { + return this.ws.onclose; + } + + set onclose(handler: ((this: WebSocket, ev: CloseEvent) => unknown) | null) { + this.ws.onclose = handler; + } + + get onerror(): ((this: WebSocket, ev: Event) => unknown) | null { + return this.ws.onerror; + } + + set onerror(handler: ((this: WebSocket, ev: Event) => unknown) | null) { + this.ws.onerror = handler; + } + + get onmessage(): ((this: WebSocket, ev: MessageEvent) => unknown) | null { + return this.externalOnMessage; + } + + set onmessage( + handler: ((this: WebSocket, ev: MessageEvent) => unknown) | null + ) { + this.externalOnMessage = handler; + } + + public send(data: string | ArrayBufferLike | Blob | ArrayBufferView): void { + this.ws.send(data); + } + + public close(code?: number, reason?: string): void { + this.ws.close(code, reason); + } + + public addEventListener( + ...args: Parameters + ): void { + this.ws.addEventListener(...args); + } + + public removeEventListener( + ...args: Parameters + ): void { + this.ws.removeEventListener(...args); + } + + public dispatchEvent(event: Event): boolean { + return this.ws.dispatchEvent(event); + } + + static readonly CONNECTING = WebSocket.CONNECTING; + static readonly OPEN = WebSocket.OPEN; + static readonly CLOSING = WebSocket.CLOSING; + static readonly CLOSED = WebSocket.CLOSED; + + readonly CONNECTING = WebSocket.CONNECTING; + readonly OPEN = WebSocket.OPEN; + readonly CLOSING = WebSocket.CLOSING; + readonly CLOSED = WebSocket.CLOSED; +} + +/** + * Factory that creates ManagedWebSocket instances and tracks them + * for pause/resume control from the test harness + */ +export class ManagedWebSocketFactory { + private readonly instances: ManagedWebSocket[] = []; + + public get constructorFn(): typeof globalThis.WebSocket { + const factory = this; + const ctor = function ManagedWS( + url: string | URL, + protocols?: string | string[] + ): ManagedWebSocket { + const ws = new ManagedWebSocket(url, protocols); + factory.instances.push(ws); + return ws; + } as unknown as typeof globalThis.WebSocket; + + Object.defineProperty(ctor, "CONNECTING", { value: WebSocket.CONNECTING }); + Object.defineProperty(ctor, "OPEN", { value: WebSocket.OPEN }); + Object.defineProperty(ctor, "CLOSING", { value: WebSocket.CLOSING }); + Object.defineProperty(ctor, "CLOSED", { value: WebSocket.CLOSED }); + + return ctor; + } + + public pause(): void { + for (const ws of this.instances) { + ws.pause(); + } + } + + public resume(): void { + for (const ws of this.instances) { + ws.resume(); + } + } +} diff --git a/frontend/deterministic-tests/src/server-control.ts b/frontend/deterministic-tests/src/server-control.ts index c2d353db..de0dbe4b 100644 --- a/frontend/deterministic-tests/src/server-control.ts +++ b/frontend/deterministic-tests/src/server-control.ts @@ -40,8 +40,10 @@ export class ServerControl { const reservation = await findFreePort(); this._port = reservation.port; + // Prefer tmpfs (/host/tmp) over disk-backed /tmp for faster SQLite I/O + const tmpBase = fs.existsSync("/host/tmp") ? "/host/tmp" : os.tmpdir(); this.tempDir = fs.mkdtempSync( - path.join(os.tmpdir(), "vault-link-test-") + path.join(tmpBase, "vault-link-test-") ); const tempConfigPath = path.join(this.tempDir, "config.yml"); const dbDir = path.join(this.tempDir, "databases"); diff --git a/frontend/deterministic-tests/src/test-definition.ts b/frontend/deterministic-tests/src/test-definition.ts index f8dac1fe..826c6014 100644 --- a/frontend/deterministic-tests/src/test-definition.ts +++ b/frontend/deterministic-tests/src/test-definition.ts @@ -16,7 +16,9 @@ export type TestStep = | { type: "pause-server" } | { type: "resume-server" } | { type: "barrier" } - | { type: "assert-consistent"; verify?: (state: AssertableState) => void }; + | { type: "assert-consistent"; verify?: (state: AssertableState) => void } + | { type: "pause-websocket"; client: number } + | { type: "resume-websocket"; client: number }; export interface TestDefinition { description?: string; diff --git a/frontend/deterministic-tests/src/test-registry.ts b/frontend/deterministic-tests/src/test-registry.ts index 0785926b..35a00c9c 100644 --- a/frontend/deterministic-tests/src/test-registry.ts +++ b/frontend/deterministic-tests/src/test-registry.ts @@ -65,6 +65,7 @@ import { createRenameResponseSkipsFileTest } from "./tests/create-rename-respons import { onlineCreateRenameConcurrentCreateOrphanTest } from "./tests/online-create-rename-concurrent-create-orphan.test"; import { concurrentRenameFirstWinsTest } from "./tests/concurrent-rename-first-wins.test"; import { binaryToTextTransitionTest } from "./tests/binary-to-text-transition.test"; +import { updateThenRenameContentLostTest } from "./tests/update-then-rename-content-lost.test"; export const TESTS: Partial> = { "rename-create-conflict": renameCreateConflictTest, @@ -133,4 +134,5 @@ export const TESTS: Partial> = { "online-create-rename-concurrent-create-orphan": onlineCreateRenameConcurrentCreateOrphanTest, "concurrent-rename-first-wins": concurrentRenameFirstWinsTest, "binary-to-text-transition": binaryToTextTransitionTest, + "update-then-rename-content-lost": updateThenRenameContentLostTest, }; diff --git a/frontend/deterministic-tests/src/test-runner.ts b/frontend/deterministic-tests/src/test-runner.ts index 05ac1611..2d469fa2 100644 --- a/frontend/deterministic-tests/src/test-runner.ts +++ b/frontend/deterministic-tests/src/test-runner.ts @@ -14,7 +14,7 @@ import { CONVERGENCE_TIMEOUT_MS, CONVERGENCE_RETRY_DELAY_MS, AGENT_INIT_TIMEOUT_MS, - IS_SYNC_ENABLED_DEFAULT + IS_SYNC_ENABLED_BY_DEFAULT } from "./consts"; import { randomUUID } from "node:crypto"; @@ -100,7 +100,7 @@ export class TestRunner { for (let i = 0; i < count; i++) { const settings: Partial = { - isSyncEnabled: IS_SYNC_ENABLED_DEFAULT, + isSyncEnabled: IS_SYNC_ENABLED_BY_DEFAULT, token: this.token, vaultName, remoteUri: this.remoteUri @@ -115,8 +115,6 @@ export class TestRunner { await withTimeout( agent.init( fetch, - // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion - WebSocket as unknown as typeof globalThis.WebSocket ), AGENT_INIT_TIMEOUT_MS, `Client ${i} init timed out after ${AGENT_INIT_TIMEOUT_MS}ms` @@ -138,28 +136,22 @@ export class TestRunner { private async executeStep(step: TestStep): Promise { switch (step.type) { case "create": - await this.getAgent(step.client).createFile( - step.path, - step.content - ); - break; - case "update": - await this.getAgent(step.client).updateFile( + await this.getAgent(step.client).write( step.path, - step.content + new TextEncoder().encode(step.content) ); break; case "rename": - await this.getAgent(step.client).renameFile( + await this.getAgent(step.client).rename( step.oldPath, step.newPath ); break; case "delete": - await this.getAgent(step.client).deleteFile(step.path); + await this.getAgent(step.client).delete(step.path); break; case "sync": @@ -199,6 +191,14 @@ export class TestRunner { await this.assertConsistent(step.verify); break; + case "pause-websocket": + this.getAgent(step.client).pauseWebSocket(); + break; + + case "resume-websocket": + this.getAgent(step.client).resumeWebSocket(); + break; + default: { const unknownStep = step as { type: string }; throw new Error(`Unknown step type: ${unknownStep.type}`); @@ -282,7 +282,7 @@ export class TestRunner { // where background sync could mutate state between reads. const clientFiles: Map[] = []; for (const agent of this.agents) { - const sortedFiles = (await agent.getFiles()).sort(); + const sortedFiles = (await agent.listFilesRecursively()).sort(); const fileMap = new Map(); for (const file of sortedFiles) { const content = await agent.getFileContent(file); diff --git a/frontend/test-client/src/agent/mock-agent.ts b/frontend/test-client/src/agent/mock-agent.ts index 308c5441..1422ac23 100644 --- a/frontend/test-client/src/agent/mock-agent.ts +++ b/frontend/test-client/src/agent/mock-agent.ts @@ -333,16 +333,19 @@ export class MockAgent extends MockClient { .includes(content); }); - if ( - !this.useSlowFileEvents - - ) { + if (!this.useSlowFileEvents) { assert( found.length <= 1, `[${this.name}] Binary content ${content} found in multiple files: ${found.join(", ")}` ); } + if (!this.useSlowFileEvents && !this.doDeletes) { + assert( + found.length >= 1, + `[${this.name}] Binary content ${content} not found in any files` + ); + } } } @@ -510,9 +513,7 @@ export class MockAgent extends MockClient { `Decided to update binary file ${file}` ); this.doNotTouchWhileOffline.push(file); - this.files.set(file, bytes); - - + await this.write(file, bytes); } private async deleteFileAction(): Promise { diff --git a/frontend/test-client/src/agent/mock-client.ts b/frontend/test-client/src/agent/mock-client.ts index 145cecd0..5d816aa4 100644 --- a/frontend/test-client/src/agent/mock-client.ts +++ b/frontend/test-client/src/agent/mock-client.ts @@ -45,10 +45,18 @@ export class MockClient extends debugging.InMemoryFileSystem { path: RelativePath, content: Uint8Array ): Promise { + const isNew = !this.files.has(path); + this.files.set(path, content); - this.executeFileOperation( - async () => this.client.syncLocallyUpdatedFile({ relativePath: path }), - ); + + if (isNew) { + this.executeFileOperation(async () => { this.client.syncLocallyCreatedFile(path); } + ); + } else { + this.executeFileOperation( + async () => { this.client.syncLocallyUpdatedFile({ relativePath: path }); }, + ); + } } @@ -66,7 +74,7 @@ export class MockClient extends debugging.InMemoryFileSystem { this.files.set(path, newContentUint8Array); this.executeFileOperation( - async () => this.client.syncLocallyUpdatedFile({ relativePath: path }), + async () => { this.client.syncLocallyUpdatedFile({ relativePath: path }); }, ); return newContent; @@ -77,7 +85,7 @@ export class MockClient extends debugging.InMemoryFileSystem { public override async delete(path: RelativePath): Promise { this.files.delete(path); this.executeFileOperation( - async () => this.client.syncLocallyDeletedFile(path), + async () => { this.client.syncLocallyDeletedFile(path); }, ); } @@ -94,10 +102,10 @@ export class MockClient extends debugging.InMemoryFileSystem { this.files.delete(oldPath); } this.executeFileOperation( - async () => this.client.syncLocallyUpdatedFile({ + async () => { this.client.syncLocallyUpdatedFile({ oldPath, relativePath: newPath - }), + }); }, ); } diff --git a/frontend/test-client/src/cli.ts b/frontend/test-client/src/cli.ts index 28684dc2..0fcd975b 100644 --- a/frontend/test-client/src/cli.ts +++ b/frontend/test-client/src/cli.ts @@ -104,11 +104,8 @@ async function runTest({ } } - // Settling rounds to drain cascading broadcasts between agents. - // Completing work on agent A can trigger broadcasts to agent B, - // which can cascade further. With N agents the worst case is N - // hops, so N+1 passes guarantees all cascades are drained. - for (let round = 0; round <= clients.length; round++) { + // Settling rounds: drain cascading broadcasts between agents + for (let round = 0; round < 10; round++) { for (const client of clients) { try { await client.waitUntilSynced(); @@ -118,8 +115,13 @@ async function runTest({ } } } + // TODO: it's very ugly, let's remove this + await sleep(2000); } + + + for (const client of clients) { try { logger.info(`Destroying ${client.name}`);