Add concurrency limit to service

This commit is contained in:
Andras Schmelczer 2024-12-18 20:37:54 +00:00
parent 5bea3c94c1
commit eb87de8e68
No known key found for this signature in database
GPG key ID: FC8F2C3D3D1A718C
9 changed files with 116 additions and 58 deletions

View file

@ -20,6 +20,7 @@
"obsidian": "1.7.2",
"openapi-fetch": "0.13.3",
"openapi-typescript": "7.4.4",
"p-queue": "^8.0.1",
"tslib": "2.4.0",
"typescript": "5.7.2"
}
@ -1543,6 +1544,13 @@
"node": ">=0.10.0"
}
},
"node_modules/eventemitter3": {
"version": "5.0.1",
"resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-5.0.1.tgz",
"integrity": "sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==",
"dev": true,
"license": "MIT"
},
"node_modules/fast-deep-equal": {
"version": "3.1.3",
"resolved": "https://registry.npmjs.org/fast-deep-equal/-/fast-deep-equal-3.1.3.tgz",
@ -2118,6 +2126,36 @@
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/p-queue": {
"version": "8.0.1",
"resolved": "https://registry.npmjs.org/p-queue/-/p-queue-8.0.1.tgz",
"integrity": "sha512-NXzu9aQJTAzbBqOt2hwsR63ea7yvxJc0PwN/zobNAudYfb1B7R08SzB4TsLeSbUCuG467NhnoT0oO6w1qRO+BA==",
"dev": true,
"license": "MIT",
"dependencies": {
"eventemitter3": "^5.0.1",
"p-timeout": "^6.1.2"
},
"engines": {
"node": ">=18"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/p-timeout": {
"version": "6.1.3",
"resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-6.1.3.tgz",
"integrity": "sha512-UJUyfKbwvr/uZSV6btANfb+0t/mOhKV/KXcCUTp8FcQI+v/0d+wXqH4htrW0E4rR6WiEO/EPvUFiV9D5OI4vlw==",
"dev": true,
"license": "MIT",
"engines": {
"node": ">=14.16"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/parent-module": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/parent-module/-/parent-module-1.0.1.tgz",

View file

@ -25,6 +25,7 @@
"openapi-fetch": "0.13.3",
"openapi-typescript": "7.4.4",
"tslib": "2.4.0",
"typescript": "5.7.2"
"typescript": "5.7.2",
"p-queue": "^8.0.1"
}
}

View file

@ -3,6 +3,7 @@ export interface SyncSettings {
token: string;
vaultName: string;
fetchChangesUpdateIntervalMs: number;
uploadConcurrency: number;
isSyncEnabled: boolean;
}
@ -11,5 +12,6 @@ export const DEFAULT_SETTINGS: SyncSettings = {
token: "",
vaultName: "default",
fetchChangesUpdateIntervalMs: 1000,
uploadConcurrency: 10,
isSyncEnabled: true,
};

View file

@ -1,7 +1,7 @@
import { TAbstractFile, TFile } from "obsidian";
import { FileEventHandler } from "./file-event-handler";
import { Logger } from "src/logger";
import { SyncServer } from "src/services/sync_service";
import { SyncService } from "src/services/sync_service";
import { Database } from "src/database/database";
import { syncLocallyDeletedFile } from "src/sync-operations/sync-locally-deleted-file";
import { syncLocallyUpdatedFile } from "src/sync-operations/sync-locally-updated-file";
@ -11,7 +11,7 @@ import { syncLocallyCreatedFile } from "src/sync-operations/sync-locally-created
export class SyncEventHandler implements FileEventHandler {
public constructor(
private database: Database,
private syncServer: SyncServer,
private syncServer: SyncService,
private operations: FileOperations
) {}
@ -49,11 +49,11 @@ export class SyncEventHandler implements FileEventHandler {
return;
}
await syncLocallyDeletedFile(
this.database,
this.syncServer,
file.path
);
await syncLocallyDeletedFile({
database: this.database,
syncServer: this.syncServer,
relativePath: file.path,
});
} else {
Logger.getInstance().info(`Folder deleted: ${file.path}, ignored`);
}

View file

@ -10,13 +10,22 @@ import {
RelativePath,
DocumentId,
} from "src/database/document-metadata.js";
import PQueue from "p-queue";
export class SyncServer {
export class SyncService {
private promiseQueue: PQueue;
private client: Client<paths>;
public constructor(private database: Database) {
this.createClient(database.getSettings());
database.addOnSettingsChangeHandlers((s) => this.createClient(s));
this.promiseQueue = new PQueue({
concurrency: database.getSettings().uploadConcurrency,
});
database.addOnSettingsChangeHandlers((s) => {
this.createClient(s);
this.promiseQueue.concurrency = s.uploadConcurrency;
});
}
private createClient(settings: SyncSettings) {
@ -25,15 +34,21 @@ export class SyncServer {
});
}
private enqueue<T>(fn: () => Promise<T>): Promise<T> {
return this.promiseQueue.add(fn) as Promise<T>;
}
public async ping(): Promise<components["schemas"]["PingResponse"]> {
const response = await this.client.GET("/ping", {
params: {
header: {
authorization:
"Bearer " + this.database.getSettings().token,
const response = await this.enqueue(() =>
this.client.GET("/ping", {
params: {
header: {
authorization:
"Bearer " + this.database.getSettings().token,
},
},
},
});
})
);
Logger.getInstance().debug(
"Ping response: " + JSON.stringify(response.data)
@ -55,9 +70,8 @@ export class SyncServer {
contentBytes: Uint8Array;
createdDate: Date;
}): Promise<components["schemas"]["DocumentVersion"]> {
const response = await this.client.POST(
"/vaults/{vault_id}/documents",
{
const response = await this.enqueue(() =>
this.client.POST("/vaults/{vault_id}/documents", {
params: {
path: {
vault_id: this.database.getSettings().vaultName,
@ -72,7 +86,7 @@ export class SyncServer {
createdDate: createdDate.toISOString(),
relativePath,
},
}
})
);
if (!response.data) {
@ -99,9 +113,8 @@ export class SyncServer {
contentBytes: Uint8Array;
createdDate: Date;
}): Promise<components["schemas"]["DocumentVersion"]> {
const response = await this.client.PUT(
"/vaults/{vault_id}/documents/{document_id}",
{
const response = await this.enqueue(() =>
this.client.PUT("/vaults/{vault_id}/documents/{document_id}", {
params: {
path: {
vault_id: this.database.getSettings().vaultName,
@ -118,7 +131,7 @@ export class SyncServer {
createdDate: createdDate.toISOString(),
relativePath,
},
}
})
);
if (!response.data) {
@ -141,9 +154,8 @@ export class SyncServer {
relativePath: RelativePath;
createdDate: Date;
}): Promise<void> {
const response = await this.client.DELETE(
"/vaults/{vault_id}/documents/{document_id}",
{
const response = await this.enqueue(() =>
this.client.DELETE("/vaults/{vault_id}/documents/{document_id}", {
params: {
path: {
vault_id: this.database.getSettings().vaultName,
@ -158,7 +170,7 @@ export class SyncServer {
createdDate: createdDate.toISOString(),
relativePath,
},
}
})
);
if (response.error) {
@ -177,9 +189,8 @@ export class SyncServer {
}: {
documentId: DocumentId;
}): Promise<components["schemas"]["DocumentVersion"]> {
const response = await this.client.GET(
"/vaults/{vault_id}/documents/{document_id}",
{
const response = await this.enqueue(() =>
this.client.GET("/vaults/{vault_id}/documents/{document_id}", {
params: {
path: {
vault_id: this.database.getSettings().vaultName,
@ -190,7 +201,7 @@ export class SyncServer {
"Bearer " + this.database.getSettings().token,
},
},
}
})
);
if (!response.data) {
@ -207,20 +218,22 @@ export class SyncServer {
public async getAll(
since?: VaultUpdateId
): Promise<components["schemas"]["FetchLatestDocumentsResponse"]> {
const response = await this.client.GET("/vaults/{vault_id}/documents", {
params: {
path: {
vault_id: this.database.getSettings().vaultName,
const response = await this.enqueue(() =>
this.client.GET("/vaults/{vault_id}/documents", {
params: {
path: {
vault_id: this.database.getSettings().vaultName,
},
header: {
authorization:
"Bearer " + this.database.getSettings().token,
},
query: {
since_update_id: since,
},
},
header: {
authorization:
"Bearer " + this.database.getSettings().token,
},
query: {
since_update_id: since,
},
},
});
})
);
if (!response.data) {
throw new Error(`Failed to get documents: ${response.error}`);

View file

@ -1,7 +1,7 @@
import * as lib from "../../../backend/sync_lib/pkg/sync_lib.js";
import { Database } from "src/database/database";
import { Logger } from "src/logger";
import { SyncServer } from "src/services/sync_service";
import { SyncService } from "src/services/sync_service";
import { hash } from "src/utils/hash";
import { unlockDocument, waitForDocumentLock } from "./locks.js";
import { FileOperations } from "src/file-operations/file-operations.js";
@ -16,7 +16,7 @@ export async function syncLocallyCreatedFile({
filePath,
}: {
database: Database;
syncServer: SyncServer;
syncServer: SyncService;
operations: FileOperations;
updateTime: Date;
filePath: RelativePath;

View file

@ -1,14 +1,18 @@
import { Database } from "src/database/database";
import { RelativePath } from "src/database/document-metadata";
import { Logger } from "src/logger";
import { SyncServer } from "src/services/sync_service";
import { SyncService } from "src/services/sync_service";
import { unlockDocument, waitForDocumentLock } from "./locks";
export async function syncLocallyDeletedFile(
database: Database,
syncServer: SyncServer,
relativePath: RelativePath
): Promise<void> {
export async function syncLocallyDeletedFile({
database,
syncServer,
relativePath,
}: {
database: Database;
syncServer: SyncService;
relativePath: RelativePath;
}): Promise<void> {
await waitForDocumentLock(relativePath);
try {

View file

@ -1,7 +1,7 @@
import * as lib from "../../../backend/sync_lib/pkg/sync_lib.js";
import { Database } from "src/database/database";
import { Logger } from "src/logger";
import { SyncServer } from "src/services/sync_service";
import { SyncService } from "src/services/sync_service";
import { hash } from "src/utils/hash";
import { unlockDocument, waitForDocumentLock } from "./locks.js";
import { FileOperations } from "src/file-operations/file-operations.js";
@ -17,7 +17,7 @@ export async function syncLocallyUpdatedFile({
oldPath,
}: {
database: Database;
syncServer: SyncServer;
syncServer: SyncService;
operations: FileOperations;
updateTime: Date;
filePath: RelativePath;

View file

@ -1,6 +1,6 @@
import { Database } from "src/database/database";
import { unlockDocument, waitForDocumentLock } from "./locks";
import { SyncServer } from "src/services/sync_service";
import { SyncService } from "src/services/sync_service";
import * as lib from "../../../backend/sync_lib/pkg/sync_lib.js";
import { hash } from "src/utils/hash";
import { Logger } from "src/logger";
@ -14,7 +14,7 @@ export async function syncRemotelyUpdatedFile({
remoteVersion,
}: {
database: Database;
syncServer: SyncServer;
syncServer: SyncService;
operations: FileOperations;
remoteVersion: components["schemas"]["DocumentVersionWithoutContent"];
}): Promise<void> {