Use locks

This commit is contained in:
Andras Schmelczer 2026-01-22 20:21:30 +00:00
parent 4fb4b498a1
commit 727b6b7ed5
10 changed files with 245 additions and 317 deletions

View file

@ -21,12 +21,14 @@ import type { WebSocketClientMessage } from "../services/types/WebSocketClientMe
import { awaitAll } from "../utils/await-all";
import { EventListeners } from "../utils/data-structures/event-listeners";
export const __debug_locks: Locks<any>[] = []; // Used only for debugging timeouts
export class Syncer {
public readonly onRemainingOperationsCountChanged = new EventListeners<
(remainingOperations: number) => unknown
>();
private readonly remoteDocumentsLock: Locks<DocumentId>;
public readonly updatedDocumentsByPathAndKeysLock: Locks<DocumentId | RelativePath>;
// FIFO to limit the number of concurrent sync operations
private readonly syncQueue: PQueue;
@ -48,7 +50,8 @@ export class Syncer {
concurrency: settings.getSettings().syncConcurrency
});
this.remoteDocumentsLock = new Locks<DocumentId>(this.logger);
this.updatedDocumentsByPathAndKeysLock = new Locks<DocumentId>(this.logger);
__debug_locks.push(this.updatedDocumentsByPathAndKeysLock); // Used only for debugging timeouts
settings.onSettingsChanged.add((newSettings, oldSettings) => {
if (newSettings.syncConcurrency !== oldSettings.syncConcurrency) {
@ -80,6 +83,10 @@ export class Syncer {
return this._isFirstSyncComplete;
}
public hasPendingOperationsForDocument(relativePath: string): boolean {
return this.updatedDocumentsByPathAndKeysLock.isLocked(relativePath);
}
public async syncLocallyCreatedFile(
relativePath: RelativePath
): Promise<void> {
@ -95,33 +102,27 @@ export class Syncer {
return;
}
const [promise, resolve, reject] = createPromise();
const document = this.database.createNewPendingDocument(
relativePath,
promise
relativePath
);
try {
await this.syncQueue.add(async () =>
this.unrestrictedSyncer.unrestrictedSyncLocallyCreatedOrUpdatedFile(
{ document }
)
);
resolve();
} catch (e) {
reject(e);
} finally {
this.database.removeDocumentPromise(promise);
}
await this.enqueueSyncOperation(async () =>
this.unrestrictedSyncer.unrestrictedSyncLocallyCreatedOrUpdatedFile(
{
document
}
), [relativePath]
);
}
public async syncLocallyDeletedFile(
relativePath: RelativePath
): Promise<void> {
const document = this.database.getLatestDocumentByRelativePath(relativePath);
if (
this.database.getLatestDocumentByRelativePath(relativePath)
document
?.isDeleted === true
) {
// This is must be a consequence of us deleting a file because of a remote update
@ -136,28 +137,25 @@ export class Syncer {
// document which finishes after the delete has succeeded and would introduce a phantom metadata record.
this.database.delete(relativePath);
const [promise, resolve, reject] = createPromise();
const document = await this.database.getResolvedDocumentByRelativePath(
relativePath,
promise
);
try {
await this.syncQueue.add(async () =>
this.unrestrictedSyncer.unrestrictedSyncLocallyDeletedFile(
document
)
await this.enqueueSyncOperation(async () => {
const document = this.database.getLatestDocumentByRelativePath(relativePath);
if (document === undefined) {
this.logger.debug(
`Cannot find document ${relativePath} in the database, must have been deleted already, skipping`
);
return;
}
await this.unrestrictedSyncer.unrestrictedSyncLocallyDeletedFile(
document
);
resolve();
this.database.removeDocument(document);
} catch (e) {
reject(e);
} finally {
this.database.removeDocumentPromise(promise);
}
}, [document?.metadata?.documentId, relativePath]
);
}
public async syncLocallyUpdatedFile({
@ -167,13 +165,17 @@ export class Syncer {
oldPath?: RelativePath;
relativePath: RelativePath;
}): Promise<void> {
const documentAtNewPath = this.database.getLatestDocumentByRelativePath(
relativePath
);
if (oldPath !== undefined) {
// We might have moved the document in the database before calling this method,
// in that case, we mustn't move it again.
if (
this.database.getLatestDocumentByRelativePath(relativePath) ===
undefined ||
this.database.getLatestDocumentByRelativePath(relativePath)
documentAtNewPath ===
undefined ||
documentAtNewPath
?.isDeleted === true
) {
if (oldPath === relativePath) {
@ -214,29 +216,17 @@ export class Syncer {
return;
}
const [promise, resolve, reject] = createPromise();
document = await this.database.getResolvedDocumentByRelativePath(
relativePath,
promise
await this.enqueueSyncOperation(async () =>
this.unrestrictedSyncer.unrestrictedSyncLocallyCreatedOrUpdatedFile(
{
oldPath,
document
}
), [document.metadata?.documentId, relativePath, oldPath]
);
try {
await this.syncQueue.add(async () =>
this.unrestrictedSyncer.unrestrictedSyncLocallyCreatedOrUpdatedFile(
{
oldPath,
document
}
)
);
resolve();
} catch (e) {
reject(e);
} finally {
this.database.removeDocumentPromise(promise);
}
}
public async scheduleSyncForOfflineChanges(): Promise<void> {
@ -300,7 +290,7 @@ export class Syncer {
public reset(): void {
this._isFirstSyncComplete = false;
this.syncQueue.clear();
this.remoteDocumentsLock.reset();
this.updatedDocumentsByPathAndKeysLock.reset();
this.runningScheduleSyncForOfflineChanges = undefined;
}
@ -317,91 +307,17 @@ export class Syncer {
private async internalSyncRemotelyUpdatedFile(
remoteVersion: DocumentVersionWithoutContent
): Promise<void> {
let document = this.database.getDocumentByDocumentId(
const document = this.database.getDocumentByDocumentId(
remoteVersion.documentId
);
if (document === undefined) {
return this.remoteDocumentsLock.withLock(
// Avoid the same documents getting created in parallel multiple times through fetching multiple updates of the same
// new remote document concurrently.
// There might be multiple tasks waiting for the lock
remoteVersion.documentId,
async () => {
// We have to wait for any ongoing creates sent for this file to finish,
// This is to avoid fetching one's own creates before the corresponding local create has finished syncing. This is a concern because
// documents being created don't yet have a document id in the local database and we could be notified of the remote create
// before the local create has finished syncing, so we can't just ignore the update based on the local DB content as we
// can't find the corresponding document yet.
if (document?.metadata === undefined) {
await this.unrestrictedSyncer.fileCreationLock.waitForLockWithoutAcquiringLock(
remoteVersion.relativePath
);
}
document = this.database.getDocumentByDocumentId(
remoteVersion.documentId
);
// We're the first one to get the lock, so we have to create the document in `unrestrictedSyncRemotelyUpdatedFile`
if (document === undefined) {
await this.syncQueue.add(async () =>
this.unrestrictedSyncer.unrestrictedSyncRemotelyUpdatedFile(
remoteVersion
)
);
} else {
const [promise, resolve, reject] = createPromise();
document =
await this.database.getResolvedDocumentByRelativePath(
document.relativePath,
promise
);
try {
await this.syncQueue.add(async () =>
this.unrestrictedSyncer.unrestrictedSyncRemotelyUpdatedFile(
remoteVersion,
document
)
);
resolve();
} catch (e) {
reject(e);
} finally {
this.database.removeDocumentPromise(promise);
}
}
this.database.addSeenUpdateId(remoteVersion.vaultUpdateId);
}
);
}
// We're either the first one to get the lock, so we have to create the document in `unrestrictedSyncRemotelyUpdatedFile`
const [promise, resolve, reject] = createPromise();
document = await this.database.getResolvedDocumentByRelativePath(
document.relativePath,
promise
);
try {
this.enqueueSyncOperation(async () =>
await this.syncQueue.add(async () =>
this.unrestrictedSyncer.unrestrictedSyncRemotelyUpdatedFile(
remoteVersion,
document
)
);
resolve();
} catch (e) {
reject(e);
} finally {
this.database.removeDocumentPromise(promise);
}
), [document?.relativePath, remoteVersion.relativePath, remoteVersion.documentId]
);
this.database.addSeenUpdateId(remoteVersion.vaultUpdateId);
}
@ -546,4 +462,13 @@ export class Syncer {
})
);
}
private async enqueueSyncOperation<T>(
operation: () => Promise<T>,
keys: Array<DocumentId | RelativePath | undefined | null>
): Promise<T> {
return this.updatedDocumentsByPathAndKeysLock.withLock(keys.filter(k => k !== undefined && k !== null), async () =>
this.syncQueue.add(operation)
);
}
}

View file

@ -36,8 +36,6 @@ import type { ServerConfig } from "../services/server-config";
import { Locks } from "../utils/data-structures/locks";
export class UnrestrictedSyncer {
public readonly fileCreationLock: Locks<RelativePath> =
new Locks<RelativePath>();
private ignorePatterns: RegExp[];
public constructor(
@ -65,10 +63,10 @@ export class UnrestrictedSyncer {
public async unrestrictedSyncLocallyCreatedOrUpdatedFile({
oldPath,
document,
// We use the same code path for both local and remote updates. We need to force the update
// if there are no local changes but we know that the remote version is newer.
force = false
force = false,
document,
}: {
oldPath?: RelativePath;
force?: boolean;
@ -80,16 +78,16 @@ export class UnrestrictedSyncer {
| SyncMovedDetails =
document.metadata === undefined
? {
type: SyncType.CREATE,
relativePath: document.relativePath
}
type: SyncType.CREATE,
relativePath: document.relativePath
}
: oldPath !== undefined
? {
? {
type: SyncType.MOVE,
relativePath: document.relativePath,
movedFrom: oldPath
}
: {
: {
type: SyncType.UPDATE,
relativePath: document.relativePath
};
@ -111,27 +109,21 @@ export class UnrestrictedSyncer {
let response: DocumentVersion | DocumentUpdateResponse | undefined =
undefined;
if (document.metadata === undefined) {
response = await this.fileCreationLock.withLock(
document.relativePath,
async () => {
const createResponse = await this.syncService.create({
relativePath: originalRelativePath,
contentBytes
});
response = await this.syncService.create({
relativePath: originalRelativePath,
contentBytes
});
await this.handleMaybeMergingResponse({
document,
response: createResponse,
contentHash,
originalRelativePath,
originalContentBytes: contentBytes
});
await this.handleMaybeMergingResponse({
document,
response,
contentHash,
originalRelativePath,
originalContentBytes: contentBytes,
isCreate: true
});
return createResponse;
}
);
} else {
const areThereLocalChanges =
document.metadata.hash !== contentHash ||
@ -152,22 +144,22 @@ export class UnrestrictedSyncer {
response =
isText && cachedVersion !== undefined
? await this.syncService.putText({
documentId: document.metadata.documentId,
parentVersionId:
document.metadata.parentVersionId,
relativePath: document.relativePath,
content: diff(
new TextDecoder().decode(cachedVersion),
new TextDecoder().decode(contentBytes)
)
})
documentId: document.metadata.documentId,
parentVersionId:
document.metadata.parentVersionId,
relativePath: document.relativePath,
content: diff(
new TextDecoder().decode(cachedVersion),
new TextDecoder().decode(contentBytes)
)
})
: await this.syncService.putBinary({
documentId: document.metadata.documentId,
parentVersionId:
document.metadata.parentVersionId,
relativePath: document.relativePath,
contentBytes
});
documentId: document.metadata.documentId,
parentVersionId:
document.metadata.parentVersionId,
relativePath: document.relativePath,
contentBytes
});
} else {
if (!force) {
this.logger.debug(
@ -204,16 +196,16 @@ export class UnrestrictedSyncer {
const actualUpdateDetails: SyncUpdateDetails | SyncMovedDetails =
oldPath !== undefined ||
response.relativePath != originalRelativePath
response.relativePath != originalRelativePath
? {
type: SyncType.MOVE,
relativePath: response.relativePath,
movedFrom: originalRelativePath
}
type: SyncType.MOVE,
relativePath: response.relativePath,
movedFrom: originalRelativePath
}
: {
type: SyncType.UPDATE,
relativePath: response.relativePath
};
type: SyncType.UPDATE,
relativePath: response.relativePath
};
if (!response.isDeleted) {
this.history.addHistoryEntry({
@ -351,7 +343,6 @@ export class UnrestrictedSyncer {
await this.operations.ensureClearPath(remoteVersion.relativePath);
const [promise, resolve] = createPromise();
this.database.updateDocumentMetadata(
{
documentId: remoteVersion.documentId,
@ -361,7 +352,6 @@ export class UnrestrictedSyncer {
},
this.database.createNewPendingDocument(
remoteVersion.relativePath,
promise
)
);
@ -375,8 +365,6 @@ export class UnrestrictedSyncer {
remoteVersion.relativePath
);
resolve();
this.database.removeDocumentPromise(promise);
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
@ -388,9 +376,7 @@ export class UnrestrictedSyncer {
});
}
public reset(): void {
this.fileCreationLock.reset();
}
private async executeSync<T>(
details: SyncDetails,
@ -461,13 +447,15 @@ export class UnrestrictedSyncer {
response,
contentHash,
originalRelativePath,
originalContentBytes
originalContentBytes,
isCreate
}: {
document: DocumentRecord;
response: DocumentVersion | DocumentUpdateResponse;
contentHash: string;
originalRelativePath: string;
originalContentBytes: Uint8Array;
isCreate?: boolean;
}): Promise<void> {
// `document` is mutable and reflects the latest state in the local database
if (document.isDeleted) {
@ -494,6 +482,26 @@ export class UnrestrictedSyncer {
let actualPath = document.relativePath;
if (isCreate === true) {
// We have a file locally that got moved by another client to the same path as the one we're trying to create.
// The server returns a merging update for the document ID that already exists locally (but at another path).
// We have to merge these two documents by extending the provenance of the existing document and deleting
// the old document that the new document already contains the content for.
const existingDocument = this.database.getDocumentByDocumentId(
response.documentId
);
if (existingDocument !== undefined) {
this.logger.info(`Merging document ${existingDocument.relativePath} into existing document ${document.relativePath} after concurrent move & creation`);
this.database.removeDocument(document); // this was a (fake) pending document
if (!existingDocument.isDeleted) {
this.operations.delete(document.relativePath);
}
document = existingDocument;
}
}
// this can't happen on the creation path as we can only get a merging response if a document already exists remotely on the same path
if (response.relativePath != originalRelativePath) {
actualPath = response.relativePath;
@ -508,10 +516,12 @@ export class UnrestrictedSyncer {
); // this can throw FileNotFoundError
}
if (!("type" in response) || response.type === "MergingUpdate") {
const responseBytes = base64ToBytes(response.contentBase64);
contentHash = hash(responseBytes);
this.database.updateDocumentMetadata(
{
documentId: response.documentId,
@ -564,9 +574,8 @@ export class UnrestrictedSyncer {
type: SyncType.SKIPPED,
relativePath
},
message: `File size of ${sizeInMB} MB exceeds the maximum file size limit of ${
maxFileSizeMB
} MB`
message: `File size of ${sizeInMB} MB exceeds the maximum file size limit of ${maxFileSizeMB
} MB`
};
}
}