From eb87de8e6803fd47e46c6dec09663474c34b137f Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Wed, 18 Dec 2024 20:37:54 +0000 Subject: [PATCH] Add concurrency limit to service --- plugin/package-lock.json | 38 ++++++++ plugin/package.json | 3 +- plugin/src/database/sync-settings.ts | 2 + plugin/src/events/sync-event-handler.ts | 14 +-- plugin/src/services/sync_service.ts | 89 +++++++++++-------- .../sync-locally-created-file.ts | 4 +- .../sync-locally-deleted-file.ts | 16 ++-- .../sync-locally-updated-file.ts | 4 +- .../sync-remotely-updated-file.ts | 4 +- 9 files changed, 116 insertions(+), 58 deletions(-) diff --git a/plugin/package-lock.json b/plugin/package-lock.json index fef5893..9adb954 100644 --- a/plugin/package-lock.json +++ b/plugin/package-lock.json @@ -20,6 +20,7 @@ "obsidian": "1.7.2", "openapi-fetch": "0.13.3", "openapi-typescript": "7.4.4", + "p-queue": "^8.0.1", "tslib": "2.4.0", "typescript": "5.7.2" } @@ -1543,6 +1544,13 @@ "node": ">=0.10.0" } }, + "node_modules/eventemitter3": { + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-5.0.1.tgz", + "integrity": "sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==", + "dev": true, + "license": "MIT" + }, "node_modules/fast-deep-equal": { "version": "3.1.3", "resolved": "https://registry.npmjs.org/fast-deep-equal/-/fast-deep-equal-3.1.3.tgz", @@ -2118,6 +2126,36 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/p-queue": { + "version": "8.0.1", + "resolved": "https://registry.npmjs.org/p-queue/-/p-queue-8.0.1.tgz", + "integrity": "sha512-NXzu9aQJTAzbBqOt2hwsR63ea7yvxJc0PwN/zobNAudYfb1B7R08SzB4TsLeSbUCuG467NhnoT0oO6w1qRO+BA==", + "dev": true, + "license": "MIT", + "dependencies": { + "eventemitter3": "^5.0.1", + "p-timeout": "^6.1.2" + }, + "engines": { + "node": ">=18" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, + "node_modules/p-timeout": { + "version": "6.1.3", + "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-6.1.3.tgz", + "integrity": "sha512-UJUyfKbwvr/uZSV6btANfb+0t/mOhKV/KXcCUTp8FcQI+v/0d+wXqH4htrW0E4rR6WiEO/EPvUFiV9D5OI4vlw==", + "dev": true, + "license": "MIT", + "engines": { + "node": ">=14.16" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/parent-module": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/parent-module/-/parent-module-1.0.1.tgz", diff --git a/plugin/package.json b/plugin/package.json index 958e24e..1ff7a97 100644 --- a/plugin/package.json +++ b/plugin/package.json @@ -25,6 +25,7 @@ "openapi-fetch": "0.13.3", "openapi-typescript": "7.4.4", "tslib": "2.4.0", - "typescript": "5.7.2" + "typescript": "5.7.2", + "p-queue": "^8.0.1" } } \ No newline at end of file diff --git a/plugin/src/database/sync-settings.ts b/plugin/src/database/sync-settings.ts index d4eb9a8..090fb7d 100644 --- a/plugin/src/database/sync-settings.ts +++ b/plugin/src/database/sync-settings.ts @@ -3,6 +3,7 @@ export interface SyncSettings { token: string; vaultName: string; fetchChangesUpdateIntervalMs: number; + uploadConcurrency: number; isSyncEnabled: boolean; } @@ -11,5 +12,6 @@ export const DEFAULT_SETTINGS: SyncSettings = { token: "", vaultName: "default", fetchChangesUpdateIntervalMs: 1000, + uploadConcurrency: 10, isSyncEnabled: true, }; diff --git a/plugin/src/events/sync-event-handler.ts b/plugin/src/events/sync-event-handler.ts index f345477..8fb258c 100644 --- a/plugin/src/events/sync-event-handler.ts +++ b/plugin/src/events/sync-event-handler.ts @@ -1,7 +1,7 @@ import { TAbstractFile, TFile } from "obsidian"; import { FileEventHandler } from "./file-event-handler"; import { Logger } from "src/logger"; -import { SyncServer } from "src/services/sync_service"; +import { SyncService } from "src/services/sync_service"; import { Database } from "src/database/database"; import { syncLocallyDeletedFile } from "src/sync-operations/sync-locally-deleted-file"; import { syncLocallyUpdatedFile } from "src/sync-operations/sync-locally-updated-file"; @@ -11,7 +11,7 @@ import { syncLocallyCreatedFile } from "src/sync-operations/sync-locally-created export class SyncEventHandler implements FileEventHandler { public constructor( private database: Database, - private syncServer: SyncServer, + private syncServer: SyncService, private operations: FileOperations ) {} @@ -49,11 +49,11 @@ export class SyncEventHandler implements FileEventHandler { return; } - await syncLocallyDeletedFile( - this.database, - this.syncServer, - file.path - ); + await syncLocallyDeletedFile({ + database: this.database, + syncServer: this.syncServer, + relativePath: file.path, + }); } else { Logger.getInstance().info(`Folder deleted: ${file.path}, ignored`); } diff --git a/plugin/src/services/sync_service.ts b/plugin/src/services/sync_service.ts index caa9878..592f164 100644 --- a/plugin/src/services/sync_service.ts +++ b/plugin/src/services/sync_service.ts @@ -10,13 +10,22 @@ import { RelativePath, DocumentId, } from "src/database/document-metadata.js"; +import PQueue from "p-queue"; -export class SyncServer { +export class SyncService { + private promiseQueue: PQueue; private client: Client; public constructor(private database: Database) { this.createClient(database.getSettings()); - database.addOnSettingsChangeHandlers((s) => this.createClient(s)); + this.promiseQueue = new PQueue({ + concurrency: database.getSettings().uploadConcurrency, + }); + + database.addOnSettingsChangeHandlers((s) => { + this.createClient(s); + this.promiseQueue.concurrency = s.uploadConcurrency; + }); } private createClient(settings: SyncSettings) { @@ -25,15 +34,21 @@ export class SyncServer { }); } + private enqueue(fn: () => Promise): Promise { + return this.promiseQueue.add(fn) as Promise; + } + public async ping(): Promise { - const response = await this.client.GET("/ping", { - params: { - header: { - authorization: - "Bearer " + this.database.getSettings().token, + const response = await this.enqueue(() => + this.client.GET("/ping", { + params: { + header: { + authorization: + "Bearer " + this.database.getSettings().token, + }, }, - }, - }); + }) + ); Logger.getInstance().debug( "Ping response: " + JSON.stringify(response.data) @@ -55,9 +70,8 @@ export class SyncServer { contentBytes: Uint8Array; createdDate: Date; }): Promise { - const response = await this.client.POST( - "/vaults/{vault_id}/documents", - { + const response = await this.enqueue(() => + this.client.POST("/vaults/{vault_id}/documents", { params: { path: { vault_id: this.database.getSettings().vaultName, @@ -72,7 +86,7 @@ export class SyncServer { createdDate: createdDate.toISOString(), relativePath, }, - } + }) ); if (!response.data) { @@ -99,9 +113,8 @@ export class SyncServer { contentBytes: Uint8Array; createdDate: Date; }): Promise { - const response = await this.client.PUT( - "/vaults/{vault_id}/documents/{document_id}", - { + const response = await this.enqueue(() => + this.client.PUT("/vaults/{vault_id}/documents/{document_id}", { params: { path: { vault_id: this.database.getSettings().vaultName, @@ -118,7 +131,7 @@ export class SyncServer { createdDate: createdDate.toISOString(), relativePath, }, - } + }) ); if (!response.data) { @@ -141,9 +154,8 @@ export class SyncServer { relativePath: RelativePath; createdDate: Date; }): Promise { - const response = await this.client.DELETE( - "/vaults/{vault_id}/documents/{document_id}", - { + const response = await this.enqueue(() => + this.client.DELETE("/vaults/{vault_id}/documents/{document_id}", { params: { path: { vault_id: this.database.getSettings().vaultName, @@ -158,7 +170,7 @@ export class SyncServer { createdDate: createdDate.toISOString(), relativePath, }, - } + }) ); if (response.error) { @@ -177,9 +189,8 @@ export class SyncServer { }: { documentId: DocumentId; }): Promise { - const response = await this.client.GET( - "/vaults/{vault_id}/documents/{document_id}", - { + const response = await this.enqueue(() => + this.client.GET("/vaults/{vault_id}/documents/{document_id}", { params: { path: { vault_id: this.database.getSettings().vaultName, @@ -190,7 +201,7 @@ export class SyncServer { "Bearer " + this.database.getSettings().token, }, }, - } + }) ); if (!response.data) { @@ -207,20 +218,22 @@ export class SyncServer { public async getAll( since?: VaultUpdateId ): Promise { - const response = await this.client.GET("/vaults/{vault_id}/documents", { - params: { - path: { - vault_id: this.database.getSettings().vaultName, + const response = await this.enqueue(() => + this.client.GET("/vaults/{vault_id}/documents", { + params: { + path: { + vault_id: this.database.getSettings().vaultName, + }, + header: { + authorization: + "Bearer " + this.database.getSettings().token, + }, + query: { + since_update_id: since, + }, }, - header: { - authorization: - "Bearer " + this.database.getSettings().token, - }, - query: { - since_update_id: since, - }, - }, - }); + }) + ); if (!response.data) { throw new Error(`Failed to get documents: ${response.error}`); diff --git a/plugin/src/sync-operations/sync-locally-created-file.ts b/plugin/src/sync-operations/sync-locally-created-file.ts index 81a9cd0..f6227fb 100644 --- a/plugin/src/sync-operations/sync-locally-created-file.ts +++ b/plugin/src/sync-operations/sync-locally-created-file.ts @@ -1,7 +1,7 @@ import * as lib from "../../../backend/sync_lib/pkg/sync_lib.js"; import { Database } from "src/database/database"; import { Logger } from "src/logger"; -import { SyncServer } from "src/services/sync_service"; +import { SyncService } from "src/services/sync_service"; import { hash } from "src/utils/hash"; import { unlockDocument, waitForDocumentLock } from "./locks.js"; import { FileOperations } from "src/file-operations/file-operations.js"; @@ -16,7 +16,7 @@ export async function syncLocallyCreatedFile({ filePath, }: { database: Database; - syncServer: SyncServer; + syncServer: SyncService; operations: FileOperations; updateTime: Date; filePath: RelativePath; diff --git a/plugin/src/sync-operations/sync-locally-deleted-file.ts b/plugin/src/sync-operations/sync-locally-deleted-file.ts index d2adf27..2382c7a 100644 --- a/plugin/src/sync-operations/sync-locally-deleted-file.ts +++ b/plugin/src/sync-operations/sync-locally-deleted-file.ts @@ -1,14 +1,18 @@ import { Database } from "src/database/database"; import { RelativePath } from "src/database/document-metadata"; import { Logger } from "src/logger"; -import { SyncServer } from "src/services/sync_service"; +import { SyncService } from "src/services/sync_service"; import { unlockDocument, waitForDocumentLock } from "./locks"; -export async function syncLocallyDeletedFile( - database: Database, - syncServer: SyncServer, - relativePath: RelativePath -): Promise { +export async function syncLocallyDeletedFile({ + database, + syncServer, + relativePath, +}: { + database: Database; + syncServer: SyncService; + relativePath: RelativePath; +}): Promise { await waitForDocumentLock(relativePath); try { diff --git a/plugin/src/sync-operations/sync-locally-updated-file.ts b/plugin/src/sync-operations/sync-locally-updated-file.ts index 951e36b..0ead111 100644 --- a/plugin/src/sync-operations/sync-locally-updated-file.ts +++ b/plugin/src/sync-operations/sync-locally-updated-file.ts @@ -1,7 +1,7 @@ import * as lib from "../../../backend/sync_lib/pkg/sync_lib.js"; import { Database } from "src/database/database"; import { Logger } from "src/logger"; -import { SyncServer } from "src/services/sync_service"; +import { SyncService } from "src/services/sync_service"; import { hash } from "src/utils/hash"; import { unlockDocument, waitForDocumentLock } from "./locks.js"; import { FileOperations } from "src/file-operations/file-operations.js"; @@ -17,7 +17,7 @@ export async function syncLocallyUpdatedFile({ oldPath, }: { database: Database; - syncServer: SyncServer; + syncServer: SyncService; operations: FileOperations; updateTime: Date; filePath: RelativePath; diff --git a/plugin/src/sync-operations/sync-remotely-updated-file.ts b/plugin/src/sync-operations/sync-remotely-updated-file.ts index f817fa3..8e10a28 100644 --- a/plugin/src/sync-operations/sync-remotely-updated-file.ts +++ b/plugin/src/sync-operations/sync-remotely-updated-file.ts @@ -1,6 +1,6 @@ import { Database } from "src/database/database"; import { unlockDocument, waitForDocumentLock } from "./locks"; -import { SyncServer } from "src/services/sync_service"; +import { SyncService } from "src/services/sync_service"; import * as lib from "../../../backend/sync_lib/pkg/sync_lib.js"; import { hash } from "src/utils/hash"; import { Logger } from "src/logger"; @@ -14,7 +14,7 @@ export async function syncRemotelyUpdatedFile({ remoteVersion, }: { database: Database; - syncServer: SyncServer; + syncServer: SyncService; operations: FileOperations; remoteVersion: components["schemas"]["DocumentVersionWithoutContent"]; }): Promise {