Remove HTTP queuing

This commit is contained in:
Andras Schmelczer 2025-01-02 10:53:04 +00:00
parent 2983357946
commit fe66c0751d
No known key found for this signature in database
GPG key ID: FC8F2C3D3D1A718C

View file

@ -10,74 +10,29 @@ import type {
RelativePath,
VaultUpdateId,
} from "src/database/document-metadata";
import PQueue from "p-queue";
import { Logger } from "src/tracing/logger.js";
export interface RequestCountStatus {
waiting: number;
success: number;
failure: number;
}
export class SyncService {
private client: Client<paths>;
private readonly promiseQueue: PQueue;
private readonly requestCountListeners: ((
status: RequestCountStatus
) => void)[] = [];
private readonly status: RequestCountStatus = {
waiting: 0,
success: 0,
failure: 0,
};
public constructor(private readonly database: Database) {
this.createClient(database.getSettings());
this.promiseQueue = new PQueue({
concurrency: database.getSettings().uploadConcurrency,
});
database.addOnSettingsChangeHandlers((s) => {
this.createClient(s);
this.promiseQueue.concurrency = s.uploadConcurrency;
});
this.promiseQueue.on("active", () => {
this.status.waiting = this.promiseQueue.pending;
this.emitRequestCountChange();
});
this.promiseQueue.on("completed", () => {
this.status.success++;
this.emitRequestCountChange();
});
this.promiseQueue.on("error", () => {
this.status.failure++;
this.emitRequestCountChange();
});
}
public addRequestCountChangeListener(
listener: (status: RequestCountStatus) => void
): void {
this.requestCountListeners.push(listener);
listener({ ...this.status });
}
public async ping(): Promise<components["schemas"]["PingResponse"]> {
const response = await this.enqueue(async () =>
this.client.GET("/ping", {
params: {
header: {
authorization: `Bearer ${
this.database.getSettings().token
}`,
},
const response = await this.client.GET("/ping", {
params: {
header: {
authorization: `Bearer ${
this.database.getSettings().token
}`,
},
})
);
},
});
Logger.getInstance().debug(
`Ping response: ${JSON.stringify(response.data)}`
@ -99,8 +54,9 @@ export class SyncService {
contentBytes: Uint8Array;
createdDate: Date;
}): Promise<components["schemas"]["DocumentVersion"]> {
const response = await this.enqueue(async () =>
this.client.POST("/vaults/{vault_id}/documents", {
const response = await this.client.POST(
"/vaults/{vault_id}/documents",
{
params: {
path: {
vault_id: this.database.getSettings().vaultName,
@ -116,7 +72,7 @@ export class SyncService {
createdDate: createdDate.toISOString(),
relativePath,
},
})
}
);
if (!response.data) {
@ -124,7 +80,9 @@ export class SyncService {
}
Logger.getInstance().debug(
`Created document ${JSON.stringify(response.data)}`
`Created document ${JSON.stringify(
response.data.relativePath
)} with id ${response.data.documentId}`
);
return response.data;
@ -143,8 +101,9 @@ export class SyncService {
contentBytes: Uint8Array;
createdDate: Date;
}): Promise<components["schemas"]["DocumentVersion"]> {
const response = await this.enqueue(async () =>
this.client.PUT("/vaults/{vault_id}/documents/{document_id}", {
const response = await this.client.PUT(
"/vaults/{vault_id}/documents/{document_id}",
{
params: {
path: {
vault_id: this.database.getSettings().vaultName,
@ -162,7 +121,7 @@ export class SyncService {
createdDate: createdDate.toISOString(),
relativePath,
},
})
}
);
if (!response.data) {
@ -170,7 +129,7 @@ export class SyncService {
}
Logger.getInstance().debug(
`Updated document ${JSON.stringify(response.data)}`
`Updated document ${response.data.relativePath} with id ${response.data.documentId}`
);
return response.data;
@ -185,8 +144,9 @@ export class SyncService {
relativePath: RelativePath;
createdDate: Date;
}): Promise<void> {
const response = await this.enqueue(async () =>
this.client.DELETE("/vaults/{vault_id}/documents/{document_id}", {
const response = await this.client.DELETE(
"/vaults/{vault_id}/documents/{document_id}",
{
params: {
path: {
vault_id: this.database.getSettings().vaultName,
@ -202,7 +162,7 @@ export class SyncService {
createdDate: createdDate.toISOString(),
relativePath,
},
})
}
);
if (response.error) {
@ -210,7 +170,7 @@ export class SyncService {
}
Logger.getInstance().debug(
`Updated document ${JSON.stringify(response.data)}`
`Deleted document ${relativePath} with id ${documentId}`
);
return response.data;
@ -221,8 +181,9 @@ export class SyncService {
}: {
documentId: DocumentId;
}): Promise<components["schemas"]["DocumentVersion"]> {
const response = await this.enqueue(async () =>
this.client.GET("/vaults/{vault_id}/documents/{document_id}", {
const response = await this.client.GET(
"/vaults/{vault_id}/documents/{document_id}",
{
params: {
path: {
vault_id: this.database.getSettings().vaultName,
@ -234,7 +195,7 @@ export class SyncService {
}`,
},
},
})
}
);
if (!response.data) {
@ -242,7 +203,7 @@ export class SyncService {
}
Logger.getInstance().debug(
`Get document ${JSON.stringify(response.data)}`
`Get document ${response.data.relativePath} with id ${response.data.documentId}`
);
return response.data;
@ -251,23 +212,21 @@ export class SyncService {
public async getAll(
since?: VaultUpdateId
): Promise<components["schemas"]["FetchLatestDocumentsResponse"]> {
const response = await this.enqueue(async () =>
this.client.GET("/vaults/{vault_id}/documents", {
params: {
path: {
vault_id: this.database.getSettings().vaultName,
},
header: {
authorization: `Bearer ${
this.database.getSettings().token
}`,
},
query: {
since_update_id: since,
},
const response = await this.client.GET("/vaults/{vault_id}/documents", {
params: {
path: {
vault_id: this.database.getSettings().vaultName,
},
})
);
header: {
authorization: `Bearer ${
this.database.getSettings().token
}`,
},
query: {
since_update_id: since,
},
},
});
const { error } = response;
if (error) {
@ -275,26 +234,15 @@ export class SyncService {
}
Logger.getInstance().debug(
`Get document ${JSON.stringify(response.data)}`
`Got ${response.data.latestDocuments.length} document metadata`
);
return response.data;
}
private emitRequestCountChange(): void {
this.requestCountListeners.forEach((listener) => {
listener({ ...this.status });
});
}
private createClient(settings: SyncSettings): void {
this.client = createClient<paths>({
baseUrl: settings.remoteUri,
});
}
private async enqueue<T>(fn: () => Promise<T>): Promise<T> {
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
return this.promiseQueue.add(fn) as Promise<T>;
}
}