Inline fetch-retry with cancellation
This commit is contained in:
parent
d772cda164
commit
a9223156a6
7 changed files with 109 additions and 100 deletions
5
frontend/package-lock.json
generated
5
frontend/package-lock.json
generated
|
|
@ -3157,10 +3157,6 @@
|
|||
"bser": "2.1.1"
|
||||
}
|
||||
},
|
||||
"node_modules/fetch-retry": {
|
||||
"version": "6.0.0",
|
||||
"license": "MIT"
|
||||
},
|
||||
"node_modules/file-entry-cache": {
|
||||
"version": "8.0.0",
|
||||
"dev": true,
|
||||
|
|
@ -6755,7 +6751,6 @@
|
|||
"version": "0.1.5",
|
||||
"dependencies": {
|
||||
"byte-base64": "^1.1.0",
|
||||
"fetch-retry": "^6.0.0",
|
||||
"openapi-fetch": "0.13.5",
|
||||
"openapi-typescript": "7.6.1",
|
||||
"p-queue": "^8.1.0",
|
||||
|
|
|
|||
|
|
@ -14,7 +14,6 @@
|
|||
},
|
||||
"dependencies": {
|
||||
"byte-base64": "^1.1.0",
|
||||
"fetch-retry": "^6.0.0",
|
||||
"openapi-fetch": "0.13.5",
|
||||
"openapi-typescript": "7.6.1",
|
||||
"p-queue": "^8.1.0",
|
||||
|
|
|
|||
|
|
@ -1,51 +0,0 @@
|
|||
import type { Settings } from "../persistence/settings";
|
||||
import type { Logger } from "../tracing/logger";
|
||||
import { createPromise } from "../utils/create-promise";
|
||||
import { retriedFetchFactory } from "../utils/retried-fetch";
|
||||
|
||||
export class ConnectedState {
|
||||
private resolveIsSyncEnabled: (() => void) | undefined;
|
||||
private syncIsEnabled: Promise<void> | undefined;
|
||||
|
||||
public constructor(
|
||||
settings: Settings,
|
||||
private readonly logger: Logger
|
||||
) {
|
||||
settings.addOnSettingsChangeHandlers((newSettings, oldSettings) => {
|
||||
if (!oldSettings.isSyncEnabled && newSettings.isSyncEnabled) {
|
||||
this.handleComingOnline();
|
||||
} else if (
|
||||
oldSettings.isSyncEnabled &&
|
||||
!newSettings.isSyncEnabled
|
||||
) {
|
||||
this.handleGoingOffline();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public getFetchImplementation(
|
||||
fetch: typeof globalThis.fetch,
|
||||
{ doRetries = true }: { doRetries: boolean } = { doRetries: true }
|
||||
): typeof globalThis.fetch {
|
||||
const retriedFetch = doRetries
|
||||
? retriedFetchFactory(this.logger, fetch)
|
||||
: fetch;
|
||||
|
||||
return async (input: RequestInfo | URL): Promise<Response> => {
|
||||
if (this.syncIsEnabled !== undefined) {
|
||||
await this.syncIsEnabled;
|
||||
}
|
||||
return retriedFetch(input);
|
||||
};
|
||||
}
|
||||
|
||||
private handleComingOnline(): void {
|
||||
this.logger.debug("Sync is enabled");
|
||||
this.resolveIsSyncEnabled?.();
|
||||
}
|
||||
|
||||
private handleGoingOffline(): void {
|
||||
this.logger.debug("Sync is disabled");
|
||||
[this.syncIsEnabled, this.resolveIsSyncEnabled] = createPromise();
|
||||
}
|
||||
}
|
||||
101
frontend/sync-client/src/services/connection-status.ts
Normal file
101
frontend/sync-client/src/services/connection-status.ts
Normal file
|
|
@ -0,0 +1,101 @@
|
|||
import type { Settings } from "../persistence/settings";
|
||||
import type { Logger } from "../tracing/logger";
|
||||
import { createPromise } from "../utils/create-promise";
|
||||
import { sleep } from "../utils/sleep";
|
||||
|
||||
export class ConnectionStatus {
|
||||
private static readonly UNTIL_RESOLUTION = Symbol();
|
||||
private canFetch = true;
|
||||
private until: Promise<Symbol>;
|
||||
private resolveUntil: (result: Symbol) => void;
|
||||
private rejectUntil: (reason: any) => void;
|
||||
|
||||
public constructor(
|
||||
settings: Settings,
|
||||
private readonly logger: Logger
|
||||
) {
|
||||
[this.until, this.resolveUntil, this.rejectUntil] =
|
||||
createPromise<Symbol>();
|
||||
|
||||
settings.addOnSettingsChangeHandlers((newSettings, oldSettings) => {
|
||||
if (oldSettings.isSyncEnabled != newSettings.isSyncEnabled) {
|
||||
this.canFetch = newSettings.isSyncEnabled;
|
||||
this.resolveUntil(ConnectionStatus.UNTIL_RESOLUTION);
|
||||
[this.until, this.resolveUntil, this.rejectUntil] =
|
||||
createPromise<Symbol>();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public getFetchImplementation(
|
||||
fetch: typeof globalThis.fetch,
|
||||
{ doRetries = true }: { doRetries: boolean } = { doRetries: true }
|
||||
): typeof globalThis.fetch {
|
||||
return doRetries ? this.retriedFetchFactory(this.logger, fetch) : fetch;
|
||||
}
|
||||
|
||||
public reset() {
|
||||
this.rejectUntil(new Error("Sync was reset"));
|
||||
[this.until, this.resolveUntil, this.rejectUntil] = createPromise();
|
||||
}
|
||||
|
||||
private retriedFetchFactory(
|
||||
logger: Logger,
|
||||
fetch: typeof globalThis.fetch = globalThis.fetch
|
||||
) {
|
||||
return async (input: RequestInfo | URL): Promise<Response> => {
|
||||
while (true) {
|
||||
while (this.canFetch === false) {
|
||||
await this.until;
|
||||
}
|
||||
|
||||
try {
|
||||
// https://github.com/jonbern/fetch-retry/blob/8684ef4e688375f623bd76f13add76dbc1d67cfb/index.js#L67C1-L70C21
|
||||
let _input =
|
||||
typeof Request !== "undefined" &&
|
||||
input instanceof Request
|
||||
? input.clone()
|
||||
: input;
|
||||
|
||||
const fetchPromise = fetch(_input);
|
||||
|
||||
// We only want to catch rejections from `this.until`
|
||||
let result;
|
||||
do {
|
||||
result = await Promise.race([this.until, fetchPromise]);
|
||||
} while (result === ConnectionStatus.UNTIL_RESOLUTION);
|
||||
|
||||
let fetchResult: Response = result as Response;
|
||||
|
||||
if (!fetchResult.ok) {
|
||||
this.logger.warn(
|
||||
`Retrying fetch for ${ConnectionStatus.getUrlFromInput(
|
||||
input
|
||||
)}, got status ${fetchResult.status}`
|
||||
);
|
||||
}
|
||||
|
||||
return fetchResult;
|
||||
} catch (error) {
|
||||
logger.warn(
|
||||
`Retrying fetch for ${ConnectionStatus.getUrlFromInput(
|
||||
input
|
||||
)}, got error: ${error}`
|
||||
);
|
||||
}
|
||||
|
||||
await Promise.race([this.until, sleep(1000)]);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static getUrlFromInput(input: RequestInfo | URL): string {
|
||||
if (input instanceof URL) {
|
||||
return input.href;
|
||||
}
|
||||
if (typeof input === "string") {
|
||||
return input;
|
||||
}
|
||||
return input.url;
|
||||
}
|
||||
}
|
||||
|
|
@ -8,7 +8,7 @@ import type {
|
|||
} from "../persistence/database";
|
||||
import type { Logger } from "../tracing/logger";
|
||||
import type { Settings } from "../persistence/settings";
|
||||
import type { ConnectedState } from "./connected-state";
|
||||
import type { ConnectionStatus } from "./connection-status";
|
||||
|
||||
export interface CheckConnectionResult {
|
||||
isSuccessful: boolean;
|
||||
|
|
@ -21,7 +21,7 @@ export class SyncService {
|
|||
private _fetchImplementation: typeof globalThis.fetch = globalThis.fetch;
|
||||
|
||||
public constructor(
|
||||
private readonly connectedState: ConnectedState,
|
||||
private readonly connectionStatus: ConnectionStatus,
|
||||
private readonly settings: Settings,
|
||||
private readonly logger: Logger
|
||||
) {
|
||||
|
|
@ -296,14 +296,14 @@ export class SyncService {
|
|||
private createClient(remoteUri: string): void {
|
||||
this.client = createClient<paths>({
|
||||
baseUrl: remoteUri,
|
||||
fetch: this.connectedState.getFetchImplementation(
|
||||
fetch: this.connectionStatus.getFetchImplementation(
|
||||
this._fetchImplementation
|
||||
)
|
||||
});
|
||||
|
||||
this.clientWithoutRetries = createClient<paths>({
|
||||
baseUrl: remoteUri,
|
||||
fetch: this.connectedState.getFetchImplementation(
|
||||
fetch: this.connectionStatus.getFetchImplementation(
|
||||
this._fetchImplementation,
|
||||
{ doRetries: false }
|
||||
)
|
||||
|
|
|
|||
|
|
@ -1,38 +0,0 @@
|
|||
import * as fetchRetryFactory from "fetch-retry";
|
||||
import type { RequestInitRetryParams } from "fetch-retry";
|
||||
import type { Logger } from "../tracing/logger";
|
||||
|
||||
function getUrlFromInput(input: RequestInfo | URL): string {
|
||||
if (input instanceof URL) {
|
||||
return input.href;
|
||||
}
|
||||
if (typeof input === "string") {
|
||||
return input;
|
||||
}
|
||||
return input.url;
|
||||
}
|
||||
|
||||
export function retriedFetchFactory(
|
||||
logger: Logger,
|
||||
fetch: typeof globalThis.fetch = globalThis.fetch
|
||||
) {
|
||||
return async (
|
||||
input: RequestInfo | URL,
|
||||
init: RequestInitRetryParams<typeof fetch> = {}
|
||||
): Promise<Response> => {
|
||||
return fetchRetryFactory.default(fetch)(input, {
|
||||
retryOn: function (attempt, error, response) {
|
||||
if (error !== null || !response || response.status >= 500) {
|
||||
logger.warn(
|
||||
`Retrying fetch for ${getUrlFromInput(input)}, attempt ${attempt}`
|
||||
);
|
||||
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
},
|
||||
retryDelay: (attempt) => Math.pow(1.5, attempt) * 500,
|
||||
...init
|
||||
});
|
||||
};
|
||||
}
|
||||
3
frontend/sync-client/src/utils/sleep.ts
Normal file
3
frontend/sync-client/src/utils/sleep.ts
Normal file
|
|
@ -0,0 +1,3 @@
|
|||
export async function sleep(ms: number): Promise<void> {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue