From 56c77dc3f6d02ae11d87fc596cceada4219c016a Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Sun, 23 Nov 2025 10:43:20 +0000 Subject: [PATCH] Fix fetch controller --- .../src/services/connection-status.ts | 98 ------------ .../src/services/fetch-controller.ts | 145 ++++++++++++++++++ .../sync-client/src/services/sync-service.ts | 6 +- frontend/sync-client/src/sync-client.ts | 11 +- 4 files changed, 158 insertions(+), 102 deletions(-) delete mode 100644 frontend/sync-client/src/services/connection-status.ts create mode 100644 frontend/sync-client/src/services/fetch-controller.ts diff --git a/frontend/sync-client/src/services/connection-status.ts b/frontend/sync-client/src/services/connection-status.ts deleted file mode 100644 index 18f53a0d..00000000 --- a/frontend/sync-client/src/services/connection-status.ts +++ /dev/null @@ -1,98 +0,0 @@ -import type { Settings } from "../persistence/settings"; -import type { Logger } from "../tracing/logger"; -import { createPromise } from "../utils/create-promise"; -import { SyncResetError } from "./sync-reset-error"; - -export class ConnectionStatus { - private static readonly UNTIL_RESOLUTION = Symbol(); - private canFetch: boolean; - private until: Promise; - private resolveUntil: (result: symbol) => unknown; - private rejectUntil: (reason: unknown) => unknown; - - public constructor( - settings: Settings, - private readonly logger: Logger - ) { - this.canFetch = settings.getSettings().isSyncEnabled; - - [this.until, this.resolveUntil, this.rejectUntil] = - createPromise(); - - settings.addOnSettingsChangeListener((newSettings, oldSettings) => { - if (oldSettings.isSyncEnabled != newSettings.isSyncEnabled) { - this.canFetch = newSettings.isSyncEnabled; - this.resolveUntil(ConnectionStatus.UNTIL_RESOLUTION); - [this.until, this.resolveUntil, this.rejectUntil] = - createPromise(); - } - }); - } - - private static getUrlFromInput(input: RequestInfo | URL): string { - if (input instanceof URL) { - return input.href; - } - if (typeof input === "string") { - return input; - } - return input.url; - } - - public startReset(): void { - this.rejectUntil(new SyncResetError()); - } - - public finishReset(): void { - [this.until, this.resolveUntil, this.rejectUntil] = createPromise(); - } - - public getFetchImplementation( - logger: Logger, - fetch: typeof globalThis.fetch = globalThis.fetch - ): typeof globalThis.fetch { - return async ( - input: RequestInfo | URL, - init?: RequestInit - ): Promise => { - while (!this.canFetch) { - await this.until; - } - - try { - // https://github.com/jonbern/fetch-retry/blob/8684ef4e688375f623bd76f13add76dbc1d67cfb/index.js#L67C1-L70C21 - const _input = - typeof Request !== "undefined" && input instanceof Request - ? input.clone() - : input; - - const fetchPromise = fetch(_input, init); - - // We only want to catch rejections from `this.until` - let result: symbol | Response | undefined = undefined; - do { - result = await Promise.race([this.until, fetchPromise]); - } while (result === ConnectionStatus.UNTIL_RESOLUTION); - - const fetchResult: Response = result as Response; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion - - if (!fetchResult.ok) { - this.logger.warn( - `Fetch for ${ConnectionStatus.getUrlFromInput( - input - )}, got status ${fetchResult.status}` - ); - } - - return fetchResult; - } catch (error) { - logger.warn( - `Fetch for ${ConnectionStatus.getUrlFromInput( - input - )}, got error: ${error}` - ); - throw error; - } - }; - } -} diff --git a/frontend/sync-client/src/services/fetch-controller.ts b/frontend/sync-client/src/services/fetch-controller.ts new file mode 100644 index 00000000..fbfac59e --- /dev/null +++ b/frontend/sync-client/src/services/fetch-controller.ts @@ -0,0 +1,145 @@ +import type { Logger } from "../tracing/logger"; +import { createPromise } from "../utils/create-promise"; +import { SyncResetError } from "./sync-reset-error"; + +/** + * Offers a resettable fetch implementation that waits until syncing is enabled + * and aborts outstanding requests when a reset is started. + */ +export class FetchController { + private static readonly UNTIL_RESOLUTION = Symbol(); + + private isResetting = false; + + // Promise resolves on the next state change: sync enabled/disabled or reset started/ended + private until: Promise; + private resolveUntil: (result: symbol) => unknown; + private rejectUntil: (reason: unknown) => unknown; + + public constructor( + private _canFetch: boolean, + private readonly logger: Logger + ) { + [this.until, this.resolveUntil, this.rejectUntil] = + createPromise(); + } + + private static getUrlFromInput(input: RequestInfo | URL): string { + if (input instanceof URL) { + return input.href; + } + if (typeof input === "string") { + return input; + } + return input.url; + } + + /** + * Whether the fetch implementation can immediately send requests once outside of a reset. + */ + public get canFetch(): boolean { + return this._canFetch; + } + + /** + * Allow or disallow fetching. The changes only take effect if not resetting. + * When called during a reset, its effect is deferred until the reset is finished. + * + * @param canFetch Whether fetching is enabled + */ + public set canFetch(canFetch: boolean) { + this._canFetch = canFetch; + + if (!this.isResetting) { + const previousResolve = this.resolveUntil; + [this.until, this.resolveUntil, this.rejectUntil] = + createPromise(); + previousResolve(FetchController.UNTIL_RESOLUTION); + } + } + + /** + * Starts a reset, causing all ongoing and future fetches to be rejected + * with a SyncResetError until finishReset is called. + */ + public startReset(): void { + this.isResetting = true; + this.rejectUntil(new SyncResetError()); + } + + /** + * Finishes a reset, allowing fetches to proceed or wait again depending on + * the current sync settings. + */ + public finishReset(): void { + if (!this.isResetting) { + throw new Error("Cannot finish reset when not resetting"); + } + + this.isResetting = false; + [this.until, this.resolveUntil, this.rejectUntil] = createPromise(); + } + + /** + * + * |------------------|---------------|-----------------------------------------------------| + * | | Sync enabled | Sync disabled | + * |------------------|-------------- |-----------------------------------------------------| + * | During reset | Rejects with SyncResetError without sending request | + * |------------------|-------------- |-----------------------------------------------------| + * | Outside of reset | Same as fetch | Blocks until sync is enabled and then same as fetch | + * |------------------|---------------|-----------------------------------------------------| + * + * @param logger for errors + * @param fetch to wrap + * @returns a wrapped fetch implementation affected by the FetchController state + */ + public getControlledFetchImplementation( + logger: Logger, + fetch: typeof globalThis.fetch = globalThis.fetch + ): typeof globalThis.fetch { + return async ( + input: RequestInfo | URL, + init?: RequestInit + ): Promise => { + while (!this.canFetch || this.isResetting) { + await this.until; + } + + try { + // https://github.com/jonbern/fetch-retry/blob/8684ef4e688375f623bd76f13add76dbc1d67cfb/index.js#L67C1-L70C21 + const _input = + typeof Request !== "undefined" && input instanceof Request + ? input.clone() + : input; + + const fetchPromise = fetch(_input, init); + + // We only want to catch rejections from `this.until` + let result: symbol | Response | undefined = undefined; + do { + result = await Promise.race([this.until, fetchPromise]); + } while (result === FetchController.UNTIL_RESOLUTION); + + const fetchResult: Response = result as Response; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion + + if (!fetchResult.ok) { + this.logger.warn( + `Fetch for ${FetchController.getUrlFromInput( + input + )}, got status ${fetchResult.status}` + ); + } + + return fetchResult; + } catch (error) { + logger.warn( + `Fetch for ${FetchController.getUrlFromInput( + input + )}, got error: ${error}` + ); + throw error; + } + }; + } +} diff --git a/frontend/sync-client/src/services/sync-service.ts b/frontend/sync-client/src/services/sync-service.ts index 8ae85b58..ce5e8cb3 100644 --- a/frontend/sync-client/src/services/sync-service.ts +++ b/frontend/sync-client/src/services/sync-service.ts @@ -6,7 +6,7 @@ import type { import type { Logger } from "../tracing/logger"; import type { Settings } from "../persistence/settings"; -import type { ConnectionStatus } from "./connection-status"; +import type { FetchController } from "./fetch-controller"; import { sleep } from "../utils/sleep"; import { SyncResetError } from "./sync-reset-error"; import type { SerializedError } from "./types/SerializedError"; @@ -25,7 +25,7 @@ export class SyncService { public constructor( private readonly deviceId: string, - private readonly connectionStatus: ConnectionStatus, + private readonly connectionStatus: FetchController, private readonly settings: Settings, private readonly logger: Logger, fetchImplementation: typeof globalThis.fetch = globalThis.fetch @@ -34,7 +34,7 @@ export class SyncService { const unboundFetch: typeof globalThis.fetch = async (...args) => fetchImplementation(...args); - this.client = this.connectionStatus.getFetchImplementation( + this.client = this.connectionStatus.getControlledFetchImplementation( this.logger, unboundFetch ); diff --git a/frontend/sync-client/src/sync-client.ts b/frontend/sync-client/src/sync-client.ts index 28843d3d..5c242045 100644 --- a/frontend/sync-client/src/sync-client.ts +++ b/frontend/sync-client/src/sync-client.ts @@ -148,7 +148,16 @@ export class SyncClient { } ); - const connectionStatus = new ConnectionStatus(settings, logger); + const connectionStatus = new FetchController( + settings.getSettings().isSyncEnabled, + logger + ); + settings.addOnSettingsChangeListener((newSettings, oldSettings) => { + if (oldSettings.isSyncEnabled != newSettings.isSyncEnabled) { + connectionStatus.canFetch = newSettings.isSyncEnabled; + } + }); + const syncService = new SyncService( deviceId, connectionStatus,