Allow overriding WebSocket implementation and add flaky version for testing

This commit is contained in:
Andras Schmelczer 2025-04-07 23:13:45 +01:00
parent 74a8060246
commit 3ec6bd4d5b
No known key found for this signature in database
GPG key ID: FC8F2C3D3D1A718C
8 changed files with 162 additions and 73 deletions

View file

@ -1,36 +1,36 @@
{ {
"name": "sync-client", "name": "sync-client",
"version": "0.3.8", "version": "0.3.8",
"main": "dist/sync-client.node.js", "main": "dist/sync-client.node.js",
"browser": "dist/sync-client.web.js", "browser": "dist/sync-client.web.js",
"types": "dist/types/index.d.ts", "types": "dist/types/index.d.ts",
"files": [ "files": [
"dist/**/*" "dist/**/*"
], ],
"scripts": { "scripts": {
"dev": "webpack watch --mode development", "dev": "webpack watch --mode development",
"build": "webpack --mode production", "build": "webpack --mode production",
"test": "NODE_OPTIONS=\"$NODE_OPTIONS --experimental-vm-modules\" jest" "test": "NODE_OPTIONS=\"$NODE_OPTIONS --experimental-vm-modules\" jest"
}, },
"dependencies": { "dependencies": {
"byte-base64": "^1.1.0", "byte-base64": "^1.1.0",
"openapi-fetch": "0.13.5", "openapi-fetch": "0.13.5",
"openapi-typescript": "7.6.1", "openapi-typescript": "7.6.1",
"p-queue": "^8.1.0", "p-queue": "^8.1.0",
"uuid": "^11.1.0" "uuid": "^11.1.0"
}, },
"devDependencies": { "devDependencies": {
"@types/jest": "^29.5.14", "@types/jest": "^29.5.14",
"@types/node": "^22.14.0", "@types/node": "^22.14.0",
"jest": "^29.7.0", "jest": "^29.7.0",
"sync_lib": "file:../../backend/sync_lib/pkg", "sync_lib": "file:../../backend/sync_lib/pkg",
"ts-jest": "^29.3.1", "ts-jest": "^29.3.1",
"ts-loader": "^9.5.2", "ts-loader": "^9.5.2",
"tslib": "2.8.1", "tslib": "2.8.1",
"typescript": "5.8.2", "typescript": "5.8.2",
"webpack": "^5.98.0", "webpack": "^5.98.0",
"webpack-cli": "^6.0.1", "webpack-cli": "^6.0.1",
"webpack-merge": "^6.0.1", "webpack-merge": "^6.0.1",
"ws": "^8.18.1" "ws": "^8.18.1"
} }
} }

View file

@ -21,13 +21,13 @@ export class SyncService {
private static readonly NETWORK_RETRY_INTERVAL_MS = 1000; private static readonly NETWORK_RETRY_INTERVAL_MS = 1000;
private client: Client<paths>; private client: Client<paths>;
private pingClient: Client<paths>; private pingClient: Client<paths>;
private _fetchImplementation: typeof globalThis.fetch = globalThis.fetch;
public constructor( public constructor(
private readonly deviceId: string, private readonly deviceId: string,
private readonly connectionStatus: ConnectionStatus, private readonly connectionStatus: ConnectionStatus,
private readonly settings: Settings, private readonly settings: Settings,
private readonly logger: Logger private readonly logger: Logger,
private readonly fetchImplementation: typeof globalThis.fetch = globalThis.fetch
) { ) {
[this.client, this.pingClient] = this.createClient( [this.client, this.pingClient] = this.createClient(
this.settings.getSettings().remoteUri this.settings.getSettings().remoteUri
@ -44,13 +44,6 @@ export class SyncService {
}); });
} }
public set fetchImplementation(fetch: typeof globalThis.fetch) {
this._fetchImplementation = fetch;
[this.client, this.pingClient] = this.createClient(
this.settings.getSettings().remoteUri
);
}
private static formatError( private static formatError(
error: components["schemas"]["SerializedError"] error: components["schemas"]["SerializedError"]
): string { ): string {
@ -329,7 +322,7 @@ export class SyncService {
baseUrl: remoteUri, baseUrl: remoteUri,
fetch: this.connectionStatus.getFetchImplementation( fetch: this.connectionStatus.getFetchImplementation(
this.logger, this.logger,
this._fetchImplementation this.fetchImplementation
), ),
headers: { headers: {
authorization: `Bearer ${this.settings.getSettings().token}` authorization: `Bearer ${this.settings.getSettings().token}`
@ -337,7 +330,7 @@ export class SyncService {
}), }),
createClient<paths>({ createClient<paths>({
baseUrl: remoteUri, baseUrl: remoteUri,
fetch: this._fetchImplementation, fetch: this.fetchImplementation,
headers: { headers: {
authorization: `Bearer ${this.settings.getSettings().token}` authorization: `Bearer ${this.settings.getSettings().token}`
} }

View file

@ -56,7 +56,8 @@ export class SyncClient {
public static async create({ public static async create({
fs, fs,
persistence, persistence,
fetch = globalThis.fetch, fetch,
webSocket,
nativeLineEndings = "\n" nativeLineEndings = "\n"
}: { }: {
fs: FileSystemOperations; fs: FileSystemOperations;
@ -67,6 +68,7 @@ export class SyncClient {
}> }>
>; >;
fetch?: typeof globalThis.fetch; fetch?: typeof globalThis.fetch;
webSocket?: typeof globalThis.WebSocket;
nativeLineEndings?: string; nativeLineEndings?: string;
}): Promise<SyncClient> { }): Promise<SyncClient> {
const logger = new Logger(); const logger = new Logger();
@ -113,9 +115,10 @@ export class SyncClient {
deviceId, deviceId,
connectionStatus, connectionStatus,
settings, settings,
logger logger,
fetch
); );
syncService.fetchImplementation = fetch;
const fileOperations = new FileOperations( const fileOperations = new FileOperations(
logger, logger,
database, database,
@ -137,7 +140,8 @@ export class SyncClient {
settings, settings,
syncService, syncService,
fileOperations, fileOperations,
unrestrictedSyncer unrestrictedSyncer,
webSocket
); );
const client = new SyncClient( const client = new SyncClient(

View file

@ -31,6 +31,8 @@ export class Syncer {
| undefined; | undefined;
private applyRemoteChangesWebSocket: WebSocket | undefined; private applyRemoteChangesWebSocket: WebSocket | undefined;
private readonly webSocketImplementation: typeof globalThis.WebSocket;
// eslint-disable-next-line @typescript-eslint/max-params // eslint-disable-next-line @typescript-eslint/max-params
public constructor( public constructor(
private readonly deviceId: string, private readonly deviceId: string,
@ -39,12 +41,27 @@ export class Syncer {
private readonly settings: Settings, private readonly settings: Settings,
private readonly syncService: SyncService, private readonly syncService: SyncService,
private readonly operations: FileOperations, private readonly operations: FileOperations,
private readonly internalSyncer: UnrestrictedSyncer private readonly internalSyncer: UnrestrictedSyncer,
webSocketImplementation?: typeof globalThis.WebSocket
) { ) {
this.syncQueue = new PQueue({ this.syncQueue = new PQueue({
concurrency: settings.getSettings().syncConcurrency concurrency: settings.getSettings().syncConcurrency
}); });
if (webSocketImplementation) {
this.webSocketImplementation = webSocketImplementation;
} else {
if (
typeof globalThis !== "undefined" &&
typeof globalThis.WebSocket === "undefined"
) {
// eslint-disable-next-line
this.webSocketImplementation = require("ws"); // polyfill for WebSocket in Node.js
} else {
this.webSocketImplementation = WebSocket;
}
}
this.updateWebSocket(settings.getSettings()); this.updateWebSocket(settings.getSettings());
this.remoteDocumentsLock = new Locks<DocumentId>(this.logger); this.remoteDocumentsLock = new Locks<DocumentId>(this.logger);
@ -74,7 +91,10 @@ export class Syncer {
} }
public get isWebSocketConnected(): boolean { public get isWebSocketConnected(): boolean {
return this.applyRemoteChangesWebSocket?.readyState === WebSocket.OPEN; return (
this.applyRemoteChangesWebSocket?.readyState ===
this.webSocketImplementation.OPEN
);
} }
public addRemainingOperationsListener( public addRemainingOperationsListener(
@ -270,15 +290,9 @@ export class Syncer {
this.logger.info(`Connecting to WebSocket at ${wsUri.toString()}`); this.logger.info(`Connecting to WebSocket at ${wsUri.toString()}`);
if ( this.applyRemoteChangesWebSocket = new this.webSocketImplementation(
typeof globalThis !== "undefined" && wsUri
typeof globalThis.WebSocket === "undefined" );
) {
// eslint-disable-next-line
globalThis.WebSocket = require("ws"); // polyfill for WebSocket in Node.js
}
this.applyRemoteChangesWebSocket = new WebSocket(wsUri);
this.applyRemoteChangesWebSocket.onmessage = (event): void => this.applyRemoteChangesWebSocket.onmessage = (event): void =>
void this.syncRemotelyUpdatedFile(event.data).catch( void this.syncRemotelyUpdatedFile(event.data).catch(
@ -316,7 +330,8 @@ export class Syncer {
private setWebSocketRefreshInterval(): void { private setWebSocketRefreshInterval(): void {
this.refreshApplyRemoteChangesWebSocketInterval = setInterval(() => { this.refreshApplyRemoteChangesWebSocketInterval = setInterval(() => {
if ( if (
this.applyRemoteChangesWebSocket?.readyState === WebSocket.OPEN this.applyRemoteChangesWebSocket?.readyState ===
this.webSocketImplementation.OPEN
) { ) {
return; return;
} }

View file

@ -6,6 +6,8 @@ import { LogLevel } from "sync-client";
import { MockClient } from "./mock-client"; import { MockClient } from "./mock-client";
import { sleep } from "../utils/sleep"; import { sleep } from "../utils/sleep";
import type { LogLine } from "sync-client/dist/types/tracing/logger"; import type { LogLine } from "sync-client/dist/types/tracing/logger";
import { flakyFetchFactory } from "../utils/flaky-fetch";
import { flakyWebSocketFactory } from "../utils/flaky-websocket";
export class MockAgent extends MockClient { export class MockAgent extends MockClient {
private readonly writtenContents: string[] = []; private readonly writtenContents: string[] = [];
@ -26,16 +28,8 @@ export class MockAgent extends MockClient {
public async init(): Promise<void> { public async init(): Promise<void> {
await super.init( await super.init(
// flaky fetch implementation to use during testing flakyFetchFactory(this.jitterScaleInSeconds),
async ( flakyWebSocketFactory(this.jitterScaleInSeconds)
input: string | URL | globalThis.Request,
init?: RequestInit
): Promise<Response> => {
await sleep(Math.random() * this.jitterScaleInSeconds * 1000);
const response = await fetch(input, init);
await sleep(Math.random() * this.jitterScaleInSeconds * 1000);
return response;
}
); );
assert( assert(

View file

@ -30,7 +30,8 @@ export class MockClient implements FileSystemOperations {
} }
public async init( public async init(
fetchImplementation: typeof globalThis.fetch fetchImplementation: typeof globalThis.fetch,
webSocketImplementation: typeof globalThis.WebSocket
): Promise<void> { ): Promise<void> {
this.client = await SyncClient.create({ this.client = await SyncClient.create({
fs: this, fs: this,
@ -38,7 +39,8 @@ export class MockClient implements FileSystemOperations {
load: async () => this.data, load: async () => this.data,
save: async (data) => void (this.data = data) save: async (data) => void (this.data = data)
}, },
fetch: fetchImplementation fetch: fetchImplementation,
webSocket: webSocketImplementation
}); });
await this.client.start(); await this.client.start();

View file

@ -0,0 +1,20 @@
import { sleep } from "./sleep";
export const flakyFetchFactory =
(jitterScaleInSeconds: number) =>
async (
input: string | URL | globalThis.Request,
init?: RequestInit
): Promise<Response> => {
if (jitterScaleInSeconds > 0) {
await sleep(Math.random() * jitterScaleInSeconds * 1000);
}
const response = await fetch(input, init);
if (jitterScaleInSeconds > 0) {
await sleep(Math.random() * jitterScaleInSeconds * 1000);
}
return response;
};

View file

@ -0,0 +1,61 @@
import { sleep } from "./sleep";
export function flakyWebSocketFactory(
jitterScaleInSeconds: number
): typeof WebSocket {
// eslint-disable-next-line
return class FlakyWebSocket extends require("ws") {
public set onopen(callback: (event: Event) => void) {
// eslint-disable-next-line
super.onopen = async (event: Event): Promise<void> => {
if (jitterScaleInSeconds > 0) {
await sleep(Math.random() * jitterScaleInSeconds * 1000);
}
callback(event);
};
}
public set onmessage(callback: (event: MessageEvent) => void) {
// eslint-disable-next-line
super.onmessage = async (event: MessageEvent): Promise<void> => {
if (jitterScaleInSeconds > 0) {
await sleep(Math.random() * jitterScaleInSeconds * 1000);
}
callback(event);
};
}
public set onclose(callback: (event: CloseEvent) => void) {
// eslint-disable-next-line
super.onclose = async (event: CloseEvent): Promise<void> => {
if (jitterScaleInSeconds > 0) {
await sleep(Math.random() * jitterScaleInSeconds * 1000);
}
callback(event);
};
}
public set onerror(callback: (event: Event) => void) {
// eslint-disable-next-line
super.onerror = async (event: Event): Promise<void> => {
if (jitterScaleInSeconds > 0) {
await sleep(Math.random() * jitterScaleInSeconds * 1000);
}
callback(event);
};
}
public async send(
data: string | ArrayBufferLike | Blob | ArrayBufferView
): Promise<void> {
if (jitterScaleInSeconds > 0) {
await sleep(Math.random() * jitterScaleInSeconds * 1000);
}
// eslint-disable-next-line
super.send(data);
}
} as unknown as typeof WebSocket;
}