This commit is contained in:
Andras Schmelczer 2025-03-28 22:19:24 +00:00
parent 5058e71412
commit 226b4f1db9
No known key found for this signature in database
GPG key ID: FC8F2C3D3D1A718C

View file

@ -1,4 +1,8 @@
import type { Database, RelativePath } from "../persistence/database";
import type {
Database,
DocumentId,
RelativePath
} from "../persistence/database";
import type { SyncService } from "../services/sync-service";
import type { Logger } from "../tracing/logger";
import PQueue from "p-queue";
@ -11,8 +15,10 @@ import { findMatchingFile } from "../utils/find-matching-file";
import type { UnrestrictedSyncer } from "./unrestricted-syncer";
import { createPromise } from "../utils/create-promise";
import { SyncResetError } from "../services/sync-reset-error";
import { Locks } from "../utils/locks";
export class Syncer {
private readonly remoteDocumentsLock: Locks<DocumentId>;
private readonly remainingOperationsListeners: ((
remainingOperations: number
) => void)[] = [];
@ -38,6 +44,8 @@ export class Syncer {
this.updateWebSocket(settings.getSettings());
this.remoteDocumentsLock = new Locks<DocumentId>(this.logger);
settings.addOnSettingsChangeListener((newSettings, oldSettings) => {
if (
newSettings.remoteUri !== oldSettings.remoteUri ||
@ -64,7 +72,6 @@ export class Syncer {
public async reset(): Promise<void> {
await this.waitUntilFinished();
this.internalSyncer.reset();
this.setWebSocketRefreshInterval();
this.updateWebSocket(this.settings.getSettings());
}
@ -255,6 +262,7 @@ export class Syncer {
typeof globalThis.WebSocket === "undefined"
) {
// polyfill for WebSocket in Node.js
// eslint-disable-next-line
globalThis.WebSocket = require("ws");
}
@ -300,33 +308,52 @@ export class Syncer {
remoteVersion.documentId
);
const [promise, resolve, reject] = createPromise();
let hasLockToRelease = false;
if (document === undefined) {
await this.syncQueue.add(async () =>
this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile(
remoteVersion
)
// Let's avoid the same documents getting created in parallel multiple times
await this.remoteDocumentsLock.waitForLock(
remoteVersion.documentId
);
} else {
document = await this.database.getResolvedDocumentByRelativePath(
document.relativePath,
promise
hasLockToRelease = true;
document = this.database.getDocumentByDocumentId(
remoteVersion.documentId
);
}
try {
try {
if (document === undefined) {
await this.syncQueue.add(async () =>
this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile(
remoteVersion,
document
remoteVersion
)
);
} else {
const [promise, resolve, reject] = createPromise();
resolve();
} catch (e) {
reject(e);
} finally {
this.database.removeDocumentPromise(promise);
document =
await this.database.getResolvedDocumentByRelativePath(
document.relativePath,
promise
);
try {
await this.syncQueue.add(async () =>
this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile(
remoteVersion,
document
)
);
resolve();
} catch (e) {
reject(e);
} finally {
this.database.removeDocumentPromise(promise);
}
}
} finally {
if (hasLockToRelease) {
this.remoteDocumentsLock.unlock(remoteVersion.documentId);
}
}
}