Fix fetch controller

This commit is contained in:
Andras Schmelczer 2025-11-23 10:43:20 +00:00
parent 4186aa9e0c
commit 56c77dc3f6
4 changed files with 158 additions and 102 deletions

View file

@ -1,98 +0,0 @@
import type { Settings } from "../persistence/settings";
import type { Logger } from "../tracing/logger";
import { createPromise } from "../utils/create-promise";
import { SyncResetError } from "./sync-reset-error";
export class ConnectionStatus {
private static readonly UNTIL_RESOLUTION = Symbol();
private canFetch: boolean;
private until: Promise<symbol>;
private resolveUntil: (result: symbol) => unknown;
private rejectUntil: (reason: unknown) => unknown;
public constructor(
settings: Settings,
private readonly logger: Logger
) {
this.canFetch = settings.getSettings().isSyncEnabled;
[this.until, this.resolveUntil, this.rejectUntil] =
createPromise<symbol>();
settings.addOnSettingsChangeListener((newSettings, oldSettings) => {
if (oldSettings.isSyncEnabled != newSettings.isSyncEnabled) {
this.canFetch = newSettings.isSyncEnabled;
this.resolveUntil(ConnectionStatus.UNTIL_RESOLUTION);
[this.until, this.resolveUntil, this.rejectUntil] =
createPromise<symbol>();
}
});
}
private static getUrlFromInput(input: RequestInfo | URL): string {
if (input instanceof URL) {
return input.href;
}
if (typeof input === "string") {
return input;
}
return input.url;
}
public startReset(): void {
this.rejectUntil(new SyncResetError());
}
public finishReset(): void {
[this.until, this.resolveUntil, this.rejectUntil] = createPromise();
}
public getFetchImplementation(
logger: Logger,
fetch: typeof globalThis.fetch = globalThis.fetch
): typeof globalThis.fetch {
return async (
input: RequestInfo | URL,
init?: RequestInit
): Promise<Response> => {
while (!this.canFetch) {
await this.until;
}
try {
// https://github.com/jonbern/fetch-retry/blob/8684ef4e688375f623bd76f13add76dbc1d67cfb/index.js#L67C1-L70C21
const _input =
typeof Request !== "undefined" && input instanceof Request
? input.clone()
: input;
const fetchPromise = fetch(_input, init);
// We only want to catch rejections from `this.until`
let result: symbol | Response | undefined = undefined;
do {
result = await Promise.race([this.until, fetchPromise]);
} while (result === ConnectionStatus.UNTIL_RESOLUTION);
const fetchResult: Response = result as Response; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
if (!fetchResult.ok) {
this.logger.warn(
`Fetch for ${ConnectionStatus.getUrlFromInput(
input
)}, got status ${fetchResult.status}`
);
}
return fetchResult;
} catch (error) {
logger.warn(
`Fetch for ${ConnectionStatus.getUrlFromInput(
input
)}, got error: ${error}`
);
throw error;
}
};
}
}

View file

@ -0,0 +1,145 @@
import type { Logger } from "../tracing/logger";
import { createPromise } from "../utils/create-promise";
import { SyncResetError } from "./sync-reset-error";
/**
* Offers a resettable fetch implementation that waits until syncing is enabled
* and aborts outstanding requests when a reset is started.
*/
export class FetchController {
private static readonly UNTIL_RESOLUTION = Symbol();
private isResetting = false;
// Promise resolves on the next state change: sync enabled/disabled or reset started/ended
private until: Promise<symbol>;
private resolveUntil: (result: symbol) => unknown;
private rejectUntil: (reason: unknown) => unknown;
public constructor(
private _canFetch: boolean,
private readonly logger: Logger
) {
[this.until, this.resolveUntil, this.rejectUntil] =
createPromise<symbol>();
}
private static getUrlFromInput(input: RequestInfo | URL): string {
if (input instanceof URL) {
return input.href;
}
if (typeof input === "string") {
return input;
}
return input.url;
}
/**
* Whether the fetch implementation can immediately send requests once outside of a reset.
*/
public get canFetch(): boolean {
return this._canFetch;
}
/**
* Allow or disallow fetching. The changes only take effect if not resetting.
* When called during a reset, its effect is deferred until the reset is finished.
*
* @param canFetch Whether fetching is enabled
*/
public set canFetch(canFetch: boolean) {
this._canFetch = canFetch;
if (!this.isResetting) {
const previousResolve = this.resolveUntil;
[this.until, this.resolveUntil, this.rejectUntil] =
createPromise<symbol>();
previousResolve(FetchController.UNTIL_RESOLUTION);
}
}
/**
* Starts a reset, causing all ongoing and future fetches to be rejected
* with a SyncResetError until finishReset is called.
*/
public startReset(): void {
this.isResetting = true;
this.rejectUntil(new SyncResetError());
}
/**
* Finishes a reset, allowing fetches to proceed or wait again depending on
* the current sync settings.
*/
public finishReset(): void {
if (!this.isResetting) {
throw new Error("Cannot finish reset when not resetting");
}
this.isResetting = false;
[this.until, this.resolveUntil, this.rejectUntil] = createPromise();
}
/**
*
* |------------------|---------------|-----------------------------------------------------|
* | | Sync enabled | Sync disabled |
* |------------------|-------------- |-----------------------------------------------------|
* | During reset | Rejects with SyncResetError without sending request |
* |------------------|-------------- |-----------------------------------------------------|
* | Outside of reset | Same as fetch | Blocks until sync is enabled and then same as fetch |
* |------------------|---------------|-----------------------------------------------------|
*
* @param logger for errors
* @param fetch to wrap
* @returns a wrapped fetch implementation affected by the FetchController state
*/
public getControlledFetchImplementation(
logger: Logger,
fetch: typeof globalThis.fetch = globalThis.fetch
): typeof globalThis.fetch {
return async (
input: RequestInfo | URL,
init?: RequestInit
): Promise<Response> => {
while (!this.canFetch || this.isResetting) {
await this.until;
}
try {
// https://github.com/jonbern/fetch-retry/blob/8684ef4e688375f623bd76f13add76dbc1d67cfb/index.js#L67C1-L70C21
const _input =
typeof Request !== "undefined" && input instanceof Request
? input.clone()
: input;
const fetchPromise = fetch(_input, init);
// We only want to catch rejections from `this.until`
let result: symbol | Response | undefined = undefined;
do {
result = await Promise.race([this.until, fetchPromise]);
} while (result === FetchController.UNTIL_RESOLUTION);
const fetchResult: Response = result as Response; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
if (!fetchResult.ok) {
this.logger.warn(
`Fetch for ${FetchController.getUrlFromInput(
input
)}, got status ${fetchResult.status}`
);
}
return fetchResult;
} catch (error) {
logger.warn(
`Fetch for ${FetchController.getUrlFromInput(
input
)}, got error: ${error}`
);
throw error;
}
};
}
}

View file

@ -6,7 +6,7 @@ import type {
import type { Logger } from "../tracing/logger";
import type { Settings } from "../persistence/settings";
import type { ConnectionStatus } from "./connection-status";
import type { FetchController } from "./fetch-controller";
import { sleep } from "../utils/sleep";
import { SyncResetError } from "./sync-reset-error";
import type { SerializedError } from "./types/SerializedError";
@ -25,7 +25,7 @@ export class SyncService {
public constructor(
private readonly deviceId: string,
private readonly connectionStatus: ConnectionStatus,
private readonly connectionStatus: FetchController,
private readonly settings: Settings,
private readonly logger: Logger,
fetchImplementation: typeof globalThis.fetch = globalThis.fetch
@ -34,7 +34,7 @@ export class SyncService {
const unboundFetch: typeof globalThis.fetch = async (...args) =>
fetchImplementation(...args);
this.client = this.connectionStatus.getFetchImplementation(
this.client = this.connectionStatus.getControlledFetchImplementation(
this.logger,
unboundFetch
);

View file

@ -148,7 +148,16 @@ export class SyncClient {
}
);
const connectionStatus = new ConnectionStatus(settings, logger);
const connectionStatus = new FetchController(
settings.getSettings().isSyncEnabled,
logger
);
settings.addOnSettingsChangeListener((newSettings, oldSettings) => {
if (oldSettings.isSyncEnabled != newSettings.isSyncEnabled) {
connectionStatus.canFetch = newSettings.isSyncEnabled;
}
});
const syncService = new SyncService(
deviceId,
connectionStatus,