Hoist retry logic

This commit is contained in:
Andras Schmelczer 2025-03-22 16:15:33 +00:00
parent 80ad81f872
commit c7e53bff26
No known key found for this signature in database
GPG key ID: FC8F2C3D3D1A718C
4 changed files with 219 additions and 193 deletions

View file

@ -211,6 +211,7 @@ export class SyncSettingsTab extends PluginSettingTab {
new Notice(
"The changes have been applied successfully!"
);
await this.statusDescription.updateConnectionState();
} else {
new Notice("No changes to apply");
}

View file

@ -39,65 +39,53 @@ export class ConnectionStatus {
return input.url;
}
public getFetchImplementation(
fetch: typeof globalThis.fetch,
{ doRetries = true }: { doRetries: boolean } = { doRetries: true }
): typeof globalThis.fetch {
return doRetries ? this.retriedFetchFactory(this.logger, fetch) : fetch;
}
public reset(): void {
this.rejectUntil(new Error("Sync was reset"));
[this.until, this.resolveUntil, this.rejectUntil] = createPromise();
}
private retriedFetchFactory(
public getFetchImplementation(
logger: Logger,
fetch: typeof globalThis.fetch = globalThis.fetch
): typeof globalThis.fetch {
return async (input: RequestInfo | URL): Promise<Response> => {
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
while (true) {
while (!this.canFetch) {
await this.until;
}
while (!this.canFetch) {
await this.until;
}
try {
// https://github.com/jonbern/fetch-retry/blob/8684ef4e688375f623bd76f13add76dbc1d67cfb/index.js#L67C1-L70C21
const _input =
typeof Request !== "undefined" &&
input instanceof Request
? input.clone()
: input;
try {
// https://github.com/jonbern/fetch-retry/blob/8684ef4e688375f623bd76f13add76dbc1d67cfb/index.js#L67C1-L70C21
const _input =
typeof Request !== "undefined" && input instanceof Request
? input.clone()
: input;
const fetchPromise = fetch(_input);
const fetchPromise = fetch(_input);
// We only want to catch rejections from `this.until`
let result: symbol | Response | undefined = undefined;
do {
result = await Promise.race([this.until, fetchPromise]);
} while (result === ConnectionStatus.UNTIL_RESOLUTION);
// We only want to catch rejections from `this.until`
let result: symbol | Response | undefined = undefined;
do {
result = await Promise.race([this.until, fetchPromise]);
} while (result === ConnectionStatus.UNTIL_RESOLUTION);
const fetchResult: Response = result as Response; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
const fetchResult: Response = result as Response; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
if (!fetchResult.ok) {
this.logger.warn(
`Retrying fetch for ${ConnectionStatus.getUrlFromInput(
input
)}, got status ${fetchResult.status}`
);
}
return fetchResult;
} catch (error) {
logger.warn(
`Retrying fetch for ${ConnectionStatus.getUrlFromInput(
if (!fetchResult.ok) {
this.logger.warn(
`Fetch for ${ConnectionStatus.getUrlFromInput(
input
)}, got error: ${error}`
)}, got status ${fetchResult.status}`
);
}
await Promise.race([this.until, sleep(1000)]);
return fetchResult;
} catch (error) {
logger.warn(
`Fetch for ${ConnectionStatus.getUrlFromInput(
input
)}, got error: ${error}`
);
throw error;
}
};
}

View file

@ -9,6 +9,7 @@ import type {
import type { Logger } from "../tracing/logger";
import type { Settings } from "../persistence/settings";
import type { ConnectionStatus } from "./connection-status";
import { sleep } from "../utils/sleep";
export interface CheckConnectionResult {
isSuccessful: boolean;
@ -16,8 +17,8 @@ export interface CheckConnectionResult {
}
export class SyncService {
private client!: Client<paths>;
private clientWithoutRetries!: Client<paths>;
private client: Client<paths>;
private pingClient: Client<paths>;
private _fetchImplementation: typeof globalThis.fetch = globalThis.fetch;
public constructor(
@ -25,20 +26,26 @@ export class SyncService {
private readonly settings: Settings,
private readonly logger: Logger
) {
this.createClient(this.settings.getSettings().remoteUri);
[this.client, this.pingClient] = this.createClient(
this.settings.getSettings().remoteUri
);
settings.addOnSettingsChangeListener((newSettings, oldSettings) => {
if (newSettings.remoteUri === oldSettings.remoteUri) {
return;
}
this.createClient(newSettings.remoteUri);
[this.client, this.pingClient] = this.createClient(
newSettings.remoteUri
);
});
}
public set fetchImplementation(fetch: typeof globalThis.fetch) {
this._fetchImplementation = fetch;
this.createClient(this.settings.getSettings().remoteUri);
[this.client, this.pingClient] = this.createClient(
this.settings.getSettings().remoteUri
);
}
private static formatError(
@ -62,42 +69,44 @@ export class SyncService {
relativePath: RelativePath;
contentBytes: Uint8Array;
}): Promise<components["schemas"]["DocumentVersionWithoutContent"]> {
const formData = new FormData();
if (documentId !== undefined) {
formData.append("document_id", documentId);
}
formData.append("relative_path", relativePath);
formData.append("content", new Blob([contentBytes]));
const response = await this.client.POST(
"/vaults/{vault_id}/documents",
{
params: {
path: {
vault_id: this.settings.getSettings().vaultName
},
header: {
authorization: `Bearer ${this.settings.getSettings().token}`
}
},
// eslint-disable-next-line
body: formData as any // FormData is not supported by openapi-fetch
return this.withRetries(async () => {
const formData = new FormData();
if (documentId !== undefined) {
formData.append("document_id", documentId);
}
);
formData.append("relative_path", relativePath);
formData.append("content", new Blob([contentBytes]));
if (!response.data) {
throw new Error(
`Failed to create document: ${SyncService.formatError(response.error)}`
const response = await this.client.POST(
"/vaults/{vault_id}/documents",
{
params: {
path: {
vault_id: this.settings.getSettings().vaultName
},
header: {
authorization: `Bearer ${this.settings.getSettings().token}`
}
},
// eslint-disable-next-line
body: formData as any // FormData is not supported by openapi-fetch
}
);
}
this.logger.debug(
`Created document ${JSON.stringify(response.data)} with id ${
response.data.documentId
}`
);
if (!response.data) {
throw new Error(
`Failed to create document: ${SyncService.formatError(response.error)}`
);
}
return response.data;
this.logger.debug(
`Created document ${JSON.stringify(response.data)} with id ${
response.data.documentId
}`
);
return response.data;
});
}
public async put({
@ -111,44 +120,46 @@ export class SyncService {
relativePath: RelativePath;
contentBytes: Uint8Array;
}): Promise<components["schemas"]["DocumentUpdateResponse"]> {
this.logger.debug(
`Updating document ${documentId} with parent version ${parentVersionId} and relative path ${relativePath}`
);
const formData = new FormData();
formData.append("parent_version_id", parentVersionId.toString());
formData.append("relative_path", relativePath);
formData.append("content", new Blob([contentBytes]));
const response = await this.client.PUT(
"/vaults/{vault_id}/documents/{document_id}",
{
params: {
path: {
vault_id: this.settings.getSettings().vaultName,
document_id: documentId
},
header: {
authorization: `Bearer ${this.settings.getSettings().token}`
}
},
// eslint-disable-next-line
body: formData as any // FormData is not supported by openapi-fetch
}
);
if (!response.data) {
throw new Error(
`Failed to update document: ${SyncService.formatError(response.error)}`
return this.withRetries(async () => {
this.logger.debug(
`Updating document ${documentId} with parent version ${parentVersionId} and relative path ${relativePath}`
);
}
const formData = new FormData();
formData.append("parent_version_id", parentVersionId.toString());
formData.append("relative_path", relativePath);
formData.append("content", new Blob([contentBytes]));
this.logger.debug(
`Updated document ${JSON.stringify(response.data)} with id ${
response.data.documentId
}`
);
const response = await this.client.PUT(
"/vaults/{vault_id}/documents/{document_id}",
{
params: {
path: {
vault_id: this.settings.getSettings().vaultName,
document_id: documentId
},
header: {
authorization: `Bearer ${this.settings.getSettings().token}`
}
},
// eslint-disable-next-line
body: formData as any // FormData is not supported by openapi-fetch
}
);
return response.data;
if (!response.data) {
throw new Error(
`Failed to update document: ${SyncService.formatError(response.error)}`
);
}
this.logger.debug(
`Updated document ${JSON.stringify(response.data)} with id ${
response.data.documentId
}`
);
return response.data;
});
}
public async delete({
@ -158,33 +169,35 @@ export class SyncService {
documentId: DocumentId;
relativePath: RelativePath;
}): Promise<components["schemas"]["DocumentVersionWithoutContent"]> {
const response = await this.client.DELETE(
"/vaults/{vault_id}/documents/{document_id}",
{
params: {
path: {
vault_id: this.settings.getSettings().vaultName,
document_id: documentId
return this.withRetries(async () => {
const response = await this.client.DELETE(
"/vaults/{vault_id}/documents/{document_id}",
{
params: {
path: {
vault_id: this.settings.getSettings().vaultName,
document_id: documentId
},
header: {
authorization: `Bearer ${this.settings.getSettings().token}`
}
},
header: {
authorization: `Bearer ${this.settings.getSettings().token}`
body: {
relativePath
}
},
body: {
relativePath
}
);
if (response.error) {
throw new Error(`Failed to delete document`);
}
);
if (response.error) {
throw new Error(`Failed to delete document`);
}
this.logger.debug(
`Deleted document ${relativePath} with id ${documentId}`
);
this.logger.debug(
`Deleted document ${relativePath} with id ${documentId}`
);
return response.data;
return response.data;
});
}
public async get({
@ -192,63 +205,70 @@ export class SyncService {
}: {
documentId: DocumentId;
}): Promise<components["schemas"]["DocumentVersion"]> {
const response = await this.client.GET(
"/vaults/{vault_id}/documents/{document_id}",
{
params: {
path: {
vault_id: this.settings.getSettings().vaultName,
document_id: documentId
},
header: {
authorization: `Bearer ${this.settings.getSettings().token}`
return this.withRetries(async () => {
const response = await this.client.GET(
"/vaults/{vault_id}/documents/{document_id}",
{
params: {
path: {
vault_id: this.settings.getSettings().vaultName,
document_id: documentId
},
header: {
authorization: `Bearer ${this.settings.getSettings().token}`
}
}
}
}
);
if (!response.data) {
throw new Error(
`Failed to get document: ${SyncService.formatError(response.error)}`
);
}
this.logger.debug(
`Get document ${response.data.relativePath} with id ${response.data.documentId}`
);
if (!response.data) {
throw new Error(
`Failed to get document: ${SyncService.formatError(response.error)}`
);
}
return response.data;
this.logger.debug(
`Get document ${response.data.relativePath} with id ${response.data.documentId}`
);
return response.data;
});
}
public async getAll(
since?: VaultUpdateId
): Promise<components["schemas"]["FetchLatestDocumentsResponse"]> {
const response = await this.client.GET("/vaults/{vault_id}/documents", {
params: {
path: {
vault_id: this.settings.getSettings().vaultName
},
header: {
authorization: `Bearer ${this.settings.getSettings().token}`
},
query: {
since_update_id: since
return this.withRetries(async () => {
const response = await this.client.GET(
"/vaults/{vault_id}/documents",
{
params: {
path: {
vault_id: this.settings.getSettings().vaultName
},
header: {
authorization: `Bearer ${this.settings.getSettings().token}`
},
query: {
since_update_id: since
}
}
}
}
});
const { error } = response;
if (error) {
throw new Error(
`Failed to get documents: ${SyncService.formatError(response.error)}`
);
}
this.logger.debug(
`Got ${response.data.latestDocuments.length} document metadata`
);
const { error } = response;
if (error) {
throw new Error(
`Failed to get documents: ${SyncService.formatError(response.error)}`
);
}
return response.data;
this.logger.debug(
`Got ${response.data.latestDocuments.length} document metadata`
);
return response.data;
});
}
public async checkConnection(): Promise<CheckConnectionResult> {
@ -273,8 +293,9 @@ export class SyncService {
}
}
// No retries
private async ping(): Promise<components["schemas"]["PingResponse"]> {
const response = await this.clientWithoutRetries.GET("/ping", {
const response = await this.pingClient.GET("/ping", {
params: {
header: {
authorization: `Bearer ${this.settings.getSettings().token}`
@ -293,20 +314,34 @@ export class SyncService {
return response.data;
}
private createClient(remoteUri: string): void {
this.client = createClient<paths>({
baseUrl: remoteUri,
fetch: this.connectionStatus.getFetchImplementation(
this._fetchImplementation
)
});
/**
* Create a client and a ping client for the given remote URI.
*/
private createClient(remoteUri: string): [Client<paths>, Client<paths>] {
return [
createClient<paths>({
baseUrl: remoteUri,
fetch: this.connectionStatus.getFetchImplementation(
this.logger,
this._fetchImplementation
)
}),
createClient<paths>({
baseUrl: remoteUri,
fetch: this._fetchImplementation
})
];
}
this.clientWithoutRetries = createClient<paths>({
baseUrl: remoteUri,
fetch: this.connectionStatus.getFetchImplementation(
this._fetchImplementation,
{ doRetries: false }
)
});
private async withRetries<T>(fn: () => Promise<T>): Promise<T> {
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
while (true) {
try {
return await fn();
} catch (e) {
this.logger.error(`Failed network call (${e}), retrying`);
await sleep(1000);
}
}
}
}

View file

@ -177,7 +177,9 @@ export class UnrestrictedSyncer {
}
if (
document.metadata.parentVersionId >= response.vaultUpdateId
// `Syncer` creates fake local document metadata for all remote docs with invalid hashes. The parent IDs will likely match
// the latest versions so we still need to update the local versions to turn the fakes into real metadata.
document.metadata.parentVersionId > response.vaultUpdateId
) {
this.logger.debug(
`Document ${document.relativePath} is already more up to date than the fetched version`
@ -281,7 +283,7 @@ export class UnrestrictedSyncer {
remoteVersion.vaultUpdateId
) {
this.logger.debug(
`Document ${remoteVersion.relativePath} is already more up to date than the fetched version`
`Document ${remoteVersion.relativePath} is already at least as up to date as the fetched version`
);
return;
}