Fix main & improve cursor sync (#101)

This commit is contained in:
Andras Schmelczer 2025-08-25 17:15:52 +01:00 committed by GitHub
parent 81b81e30ff
commit a36a24effc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
36 changed files with 926 additions and 686 deletions

View file

@ -31,16 +31,17 @@ export class SafeFileSystemOperations implements FileSystemOperations {
this.logger.debug(`Reading file '${path}'`);
return this.safeOperation(
path,
this.decorateToHoldLock(path, async () => this.fs.read(path)),
async () =>
this.locks.withLock(path, async () => this.fs.read(path)),
"read"
);
}
public async write(path: RelativePath, content: Uint8Array): Promise<void> {
this.logger.debug(`Writing to file '${path}'`);
return this.decorateToHoldLock(path, async () =>
return this.locks.withLock(path, async () =>
this.fs.write(path, content)
)();
);
}
public async atomicUpdateText(
@ -50,9 +51,10 @@ export class SafeFileSystemOperations implements FileSystemOperations {
this.logger.debug(`Atomically updating file '${path}'`);
return this.safeOperation(
path,
this.decorateToHoldLock(path, async () =>
this.fs.atomicUpdateText(path, updater)
),
async () =>
this.locks.withLock(path, async () =>
this.fs.atomicUpdateText(path, updater)
),
"atomicUpdateText"
);
}
@ -61,32 +63,29 @@ export class SafeFileSystemOperations implements FileSystemOperations {
// Logging this would be too noisy
return this.safeOperation(
path,
this.decorateToHoldLock(path, async () =>
this.fs.getFileSize(path)
),
async () =>
this.locks.withLock(path, async () =>
this.fs.getFileSize(path)
),
"getFileSize"
);
}
public async exists(path: RelativePath): Promise<boolean> {
this.logger.debug(`Checking if file '${path}' exists`);
return this.decorateToHoldLock(path, async () =>
this.fs.exists(path)
)();
return this.locks.withLock(path, async () => this.fs.exists(path));
}
public async createDirectory(path: RelativePath): Promise<void> {
this.logger.debug(`Creating directory '${path}'`);
return this.decorateToHoldLock(path, async () =>
return this.locks.withLock(path, async () =>
this.fs.createDirectory(path)
)();
);
}
public async delete(path: RelativePath): Promise<void> {
this.logger.debug(`Deleting file '${path}'`);
return this.decorateToHoldLock(path, async () =>
this.fs.delete(path)
)();
return this.locks.withLock(path, async () => this.fs.delete(path));
}
public async rename(
@ -96,43 +95,14 @@ export class SafeFileSystemOperations implements FileSystemOperations {
this.logger.debug(`Renaming file '${oldPath}' to '${newPath}'`);
return this.safeOperation(
oldPath,
this.decorateToHoldLock([oldPath, newPath], async () =>
this.fs.rename(oldPath, newPath)
),
async () =>
this.locks.withLock([oldPath, newPath], async () =>
this.fs.rename(oldPath, newPath)
),
"rename"
);
}
/**
* Decorate an operation to ensure that the file is locked before running it
* and that the lock is released afterwards. This results in at-most one
* concurrent operation running per file.
*/
private decorateToHoldLock<T>(
pathOrPaths: RelativePath | RelativePath[],
operation: () => Promise<T>
): () => Promise<T> {
return async () => {
const paths = Array.isArray(pathOrPaths)
? pathOrPaths
: [pathOrPaths];
await Promise.all(
paths.map(async (path) => this.locks.waitForLock(path))
);
try {
return await operation();
} finally {
await Promise.all(
paths.map((path) => {
this.locks.unlock(path);
})
);
}
};
}
/**
* Decorate an operation to ensure that the file exists before running it.
* If the operation fails, it will check if the file still exists and throw

View file

@ -18,8 +18,8 @@ export type { PersistenceProvider } from "./persistence/persistence";
export type { CursorSpan } from "./services/types/CursorSpan";
export type { ClientCursors } from "./services/types/ClientCursors";
export type { NetworkConnectionStatus } from "./types/network-connection-status";
export type { DocumentWithMaybeOutdatedClientCursors } from "./types/maybe-outdated-client-cursors";
export { DocumentUpdateStatus } from "./types/document-update-status";
export type { MaybeOutdatedClientCursors } from "./types/maybe-outdated-client-cursors";
export { DocumentSyncStatus } from "./types/document-sync-status";
export { SyncClient } from "./sync-client";
import { Locks } from "./utils/locks";

View file

@ -37,7 +37,7 @@ export interface DocumentRecord {
documentId: DocumentId;
metadata: DocumentMetadata | undefined;
isDeleted: boolean;
updates: Promise<void>[];
updates: Promise<unknown>[];
parallelVersion: number;
}
@ -135,7 +135,7 @@ export class Database {
this.save();
}
public removeDocumentPromise(promise: Promise<void>): void {
public removeDocumentPromise(promise: Promise<unknown>): void {
const entry = this.documents.find(({ updates }) =>
updates.includes(promise)
);
@ -167,7 +167,7 @@ export class Database {
public async getResolvedDocumentByRelativePath(
relativePath: RelativePath,
promise: Promise<void>
promise: Promise<unknown>
): Promise<DocumentRecord> {
const entry = this.getLatestDocumentByRelativePath(relativePath);
@ -191,7 +191,7 @@ export class Database {
public createNewPendingDocument(
documentId: DocumentId,
relativePath: RelativePath,
promise: Promise<void>
promise: Promise<unknown>
): DocumentRecord {
const previousEntry =
this.getLatestDocumentByRelativePath(relativePath);

View file

@ -2,7 +2,7 @@
import type { CursorSpan } from "./CursorSpan";
export interface DocumentWithCursors {
vault_update_id: number;
vault_update_id: number | null;
document_id: string;
relative_path: string;
cursors: CursorSpan[];

View file

@ -152,7 +152,7 @@ export class WebSocketManager {
}
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
} else if (message.type === "cursorPositions") {
this.logger.info(
this.logger.debug(
`Received cursor positions for ${JSON.stringify(message.clients)}`
);
this.remoteCursorsUpdateListeners.forEach((listener) => {

View file

@ -14,26 +14,16 @@ import { ConnectionStatus } from "./services/connection-status";
import { UnrestrictedSyncer } from "./sync-operations/unrestricted-syncer";
import { rateLimit } from "./utils/rate-limit";
import type { NetworkConnectionStatus } from "./types/network-connection-status";
import { DocumentUpdateStatus } from "./types/document-update-status";
import { DocumentSyncStatus } from "./types/document-sync-status";
import { WebSocketManager } from "./services/websocket-manager";
import { createClientId } from "./utils/create-client-id";
import { CursorTracker } from "./sync-operations/cursor-tracker";
import type { CursorSpan } from "./services/types/CursorSpan";
import type { ClientCursors } from "./services/types/ClientCursors";
import type { DocumentWithCursors } from "./services/types/DocumentWithCursors";
import { hash } from "./utils/hash";
import type { DocumentWithMaybeOutdatedClientCursors } from "./types/maybe-outdated-client-cursors";
enum DocumentUpToDateness {
UpToDate = "UpToDate",
Prior = "Prior",
Later = "Later"
}
import type { MaybeOutdatedClientCursors } from "./types/maybe-outdated-client-cursors";
import { FileChangeNotifier } from "./sync-operations/file-change-notifier";
export class SyncClient {
private static readonly MINIMUM_SAVE_INTERVAL_MS = 1000;
private lastCursorState: DocumentWithCursors[] = [];
private readonly knownClientCursors: ClientCursors[] = [];
// eslint-disable-next-line @typescript-eslint/max-params
private constructor(
@ -45,7 +35,8 @@ export class SyncClient {
private readonly webSocketManager: WebSocketManager,
private readonly _logger: Logger,
private readonly connectionStatus: ConnectionStatus,
private readonly fileOperations: FileOperations
private readonly cursorTracker: CursorTracker,
private readonly fileChangeNotifier: FileChangeNotifier
) {
this.settings.addOnSettingsChangeListener(
async (newSettings, oldSettings) => {
@ -54,10 +45,6 @@ export class SyncClient {
}
}
);
this.webSocketManager.addRemoteCursorsUpdateListener((cursors) => {
this.knownClientCursors.push(...cursors);
});
}
public get logger(): Logger {
@ -148,7 +135,6 @@ export class SyncClient {
);
const syncer = new Syncer(
deviceId,
logger,
database,
settings,
@ -166,6 +152,13 @@ export class SyncClient {
webSocket
);
const fileChangeNotifier = new FileChangeNotifier();
const cursorTracker = new CursorTracker(
database,
webSocketManager,
fileOperations,
fileChangeNotifier
);
const client = new SyncClient(
history,
settings,
@ -175,7 +168,8 @@ export class SyncClient {
webSocketManager,
logger,
connectionStatus,
fileOperations
cursorTracker,
fileChangeNotifier
);
logger.info("SyncClient initialised");
@ -264,12 +258,14 @@ export class SyncClient {
public async syncLocallyCreatedFile(
relativePath: RelativePath
): Promise<void> {
this.fileChangeNotifier.notifyOfFileChange(relativePath);
return this.syncer.syncLocallyCreatedFile(relativePath);
}
public async syncLocallyDeletedFile(
relativePath: RelativePath
): Promise<void> {
this.fileChangeNotifier.notifyOfFileChange(relativePath);
return this.syncer.syncLocallyDeletedFile(relativePath);
}
@ -280,6 +276,7 @@ export class SyncClient {
oldPath?: RelativePath;
relativePath: RelativePath;
}): Promise<void> {
this.fileChangeNotifier.notifyOfFileChange(relativePath);
return this.syncer.syncLocallyUpdatedFile({
oldPath,
relativePath
@ -288,154 +285,26 @@ export class SyncClient {
public getDocumentSyncingStatus(
relativePath: RelativePath
): DocumentUpdateStatus {
): DocumentSyncStatus {
const document =
this.database.getLatestDocumentByRelativePath(relativePath);
if (document === undefined) {
return DocumentUpdateStatus.SYNCING;
return DocumentSyncStatus.SYNCING;
}
return document.updates.length > 0
? DocumentUpdateStatus.SYNCING
: DocumentUpdateStatus.UP_TO_DATE;
? DocumentSyncStatus.SYNCING
: DocumentSyncStatus.UP_TO_DATE;
}
/// Update the local cursors for the given documents.
/// Can be called frequently as it only emits an event
// if the state has actually changed.
public async updateLocalCursors(
documentToCursors: Record<RelativePath, CursorSpan[]>
): Promise<void> {
const documentsWithCursors: DocumentWithCursors[] = [];
for (const [relativePath, cursors] of Object.entries(
documentToCursors
)) {
const record =
this.database.getLatestDocumentByRelativePath(relativePath);
if (!record) {
continue; // Let's wait for the file to be created before sending cursors
}
const readContent = await this.fileOperations.read(relativePath);
if (record.metadata?.hash !== hash(readContent)) {
continue; // Wouldn't make sense to sync the positions in a dirty file
}
documentsWithCursors.push({
relative_path: relativePath,
document_id: record.documentId,
vault_update_id: record.metadata.parentVersionId,
cursors
});
}
if (
JSON.stringify(this.lastCursorState) ===
JSON.stringify(documentsWithCursors)
) {
return;
}
this.lastCursorState = documentsWithCursors;
this.webSocketManager.updateLocalCursors({ documentsWithCursors });
await this.cursorTracker.sendLocalCursorsToServer(documentToCursors);
}
public addRemoteCursorsUpdateListener(
listener: (cursors: DocumentWithMaybeOutdatedClientCursors[]) => unknown
listener: (cursors: MaybeOutdatedClientCursors[]) => unknown
): void {
this.webSocketManager.addRemoteCursorsUpdateListener(async () => {
listener(await this.getRelevantClientCursors());
});
}
private async getRelevantClientCursors(): Promise<
DocumentWithMaybeOutdatedClientCursors[]
> {
const result: DocumentWithMaybeOutdatedClientCursors[] = [];
const included = new Set<string>();
for (const clientCursors of [...this.knownClientCursors].reverse()) {
if (included.has(clientCursors.deviceId)) {
continue;
}
const upToDateness =
await this.getDocumentsUpToDateness(clientCursors);
if (upToDateness == DocumentUpToDateness.Later) {
continue;
}
result.push({
...clientCursors,
isOutdated: upToDateness == DocumentUpToDateness.Prior
});
included.add(clientCursors.deviceId);
}
return result;
}
private async getDocumentsUpToDateness(
clientCursor: ClientCursors
): Promise<DocumentUpToDateness> {
const results = [];
for (const document of clientCursor.documentsWithCursors) {
results.push(await this.getDocumentUpToDateness(document));
}
if (
results.every((result) => result === DocumentUpToDateness.UpToDate)
) {
return DocumentUpToDateness.UpToDate;
}
if (
results.every(
(result) =>
result === DocumentUpToDateness.UpToDate ||
result === DocumentUpToDateness.Prior
)
) {
return DocumentUpToDateness.Prior;
}
return DocumentUpToDateness.Later;
}
private async getDocumentUpToDateness(
document: DocumentWithCursors
): Promise<DocumentUpToDateness> {
const record = this.database.getLatestDocumentByRelativePath(
document.relative_path
);
if (!record) {
// the document of the cursor must be from the future
return DocumentUpToDateness.Later;
}
if (
(record.metadata?.parentVersionId ?? 0) < document.vault_update_id
) {
return DocumentUpToDateness.Later;
} else if (
document.vault_update_id < (record.metadata?.parentVersionId ?? 0)
) {
// the document of the cursor must be from the past
return DocumentUpToDateness.Prior;
}
const currentContent = await this.fileOperations.read(
document.relative_path
);
return this.database.getLatestDocumentByRelativePath(
document.relative_path
)?.metadata?.hash === hash(currentContent)
? DocumentUpToDateness.UpToDate
: DocumentUpToDateness.Prior;
this.cursorTracker.addRemoteCursorsUpdateListener(listener);
}
}

View file

@ -0,0 +1,253 @@
import type { FileOperations } from "../file-operations/file-operations";
import type { Database, RelativePath } from "../persistence/database";
import type { ClientCursors } from "../services/types/ClientCursors";
import type { CursorSpan } from "../services/types/CursorSpan";
import type { DocumentWithCursors } from "../services/types/DocumentWithCursors";
import type { WebSocketManager } from "../services/websocket-manager";
import type { MaybeOutdatedClientCursors } from "../types/maybe-outdated-client-cursors";
import { DocumentUpToDateness } from "../types/document-up-to-dateness";
import { hash } from "../utils/hash";
import type { FileChangeNotifier } from "./file-change-notifier";
import { Lock } from "../utils/locks";
// Cursor positions are updated separately from documents. However, a given cursor position is only
// valid within a certain version of the document it belongs to. This class tracks previous and the latest
// known remote cursor positions, and for each document, tries to return the latest cursor positions that are
// not from the future.
export class CursorTracker {
private readonly updateLock = new Lock();
private knownRemoteCursors: (ClientCursors & {
upToDateness: DocumentUpToDateness;
})[] = [];
private lastLocalCursorState: DocumentWithCursors[] = [];
private lastLocalCursorStateWithoutDirtyDocuments: DocumentWithCursors[] =
[];
public constructor(
private readonly database: Database,
private readonly webSocketManager: WebSocketManager,
private readonly fileOperations: FileOperations,
private readonly fileChangeNotifier: FileChangeNotifier
) {
this.webSocketManager.addRemoteCursorsUpdateListener(
async (clientCursors) => {
await this.updateLock.withLock(async () => {
// The latest message will contain all active clients, so we can delete the ones
// from the local list which are no longer active.
const allIds = new Set(
clientCursors.map((c) => c.deviceId)
);
const updatedKnownRemoteCursors =
this.knownRemoteCursors.filter((c) =>
allIds.has(c.deviceId)
);
for (const cursor of clientCursors.filter((client) =>
client.documentsWithCursors.every(
(doc) => doc.vault_update_id != null
)
)) {
updatedKnownRemoteCursors.push({
...cursor,
upToDateness:
await this.getDocumentsUpToDateness(cursor)
});
}
this.knownRemoteCursors = updatedKnownRemoteCursors;
});
}
);
this.fileChangeNotifier.addFileChangeListener(async (relativePath) =>
this.updateLock.withLock(async () => {
for (const clientCursor of this.knownRemoteCursors) {
if (
clientCursor.documentsWithCursors.some(
(document) =>
document.relative_path === relativePath
)
) {
clientCursor.upToDateness =
await this.getDocumentsUpToDateness(clientCursor);
}
}
})
);
}
/// Update the local cursors for the given documents.
/// Can be called frequently as it only emits an event
/// if the state has actually changed.
public async sendLocalCursorsToServer(
documentToCursors: Record<RelativePath, CursorSpan[]>
): Promise<void> {
const documentsWithCursors: DocumentWithCursors[] = [];
for (const [relativePath, cursors] of Object.entries(
documentToCursors
)) {
const record =
this.database.getLatestDocumentByRelativePath(relativePath);
if (!record) {
continue; // Let's wait for the file to be created before sending cursors
}
if (!record.metadata) {
continue; // this is a new document, no need to sync the cursors
}
documentsWithCursors.push({
relative_path: relativePath,
document_id: record.documentId,
vault_update_id: record.metadata.parentVersionId,
cursors: cursors.map(({ start, end }) => ({
start: Math.min(start, end),
end: Math.max(start, end)
})) // the client might send directional selections
});
}
if (
JSON.stringify(this.lastLocalCursorState) ===
JSON.stringify(documentsWithCursors)
) {
// Caching step to avoid reading the edited files all the time
return;
}
this.lastLocalCursorState = documentsWithCursors;
for (const doc of documentsWithCursors) {
const readContent = await this.fileOperations.read(
doc.relative_path
);
const record = this.database.getLatestDocumentByRelativePath(
doc.relative_path
);
if (record?.metadata?.hash !== hash(readContent)) {
doc.vault_update_id = null;
}
}
if (
JSON.stringify(this.lastLocalCursorStateWithoutDirtyDocuments) ===
JSON.stringify(documentsWithCursors)
) {
return;
}
this.lastLocalCursorStateWithoutDirtyDocuments = documentsWithCursors;
this.webSocketManager.updateLocalCursors({ documentsWithCursors });
}
// The returned position may be accurate, if it matches the document version, or outdated, in which case
// the client has to heuristically guess it's current position based on the local edits.
public addRemoteCursorsUpdateListener(
listener: (cursors: MaybeOutdatedClientCursors[]) => unknown
): void {
// CursorTracker registers its own event listener in the constructor so it must have been called before this
this.webSocketManager.addRemoteCursorsUpdateListener(async () => {
await this.updateLock.withLock(() =>
listener(this.getRelevantAndPruneKnownClientCursors())
);
});
}
private getRelevantAndPruneKnownClientCursors(): MaybeOutdatedClientCursors[] {
const result: MaybeOutdatedClientCursors[] = [];
const included = new Set<string>();
const relevantCursors = [];
for (const clientCursors of [...this.knownRemoteCursors].reverse()) {
if (included.has(clientCursors.deviceId)) {
continue;
}
if (clientCursors.upToDateness == DocumentUpToDateness.Later) {
continue;
}
result.push({
...clientCursors,
isOutdated:
clientCursors.upToDateness == DocumentUpToDateness.Prior
});
included.add(clientCursors.deviceId);
relevantCursors.unshift(clientCursors); // to reverse order back to normal
}
this.knownRemoteCursors = relevantCursors;
return result;
}
// We store up-to-dateness on a per-client basis to simplify the implementation.
// An individual client won't have too many documents open at once, so this is a reasonable trade-off.
private async getDocumentsUpToDateness(
clientCursor: ClientCursors
): Promise<DocumentUpToDateness> {
const results = [];
for (const document of clientCursor.documentsWithCursors) {
results.push(await this.getDocumentUpToDateness(document));
}
if (
results.every((result) => result === DocumentUpToDateness.UpToDate)
) {
return DocumentUpToDateness.UpToDate;
}
if (
results.every(
(result) =>
result === DocumentUpToDateness.UpToDate ||
result === DocumentUpToDateness.Prior
)
) {
return DocumentUpToDateness.Prior;
}
return DocumentUpToDateness.Later;
}
private async getDocumentUpToDateness(
document: DocumentWithCursors
): Promise<DocumentUpToDateness> {
const record = this.database.getLatestDocumentByRelativePath(
document.relative_path
);
if (!record) {
// the document of the cursor must be from the future
return DocumentUpToDateness.Later;
}
if (
(record.metadata?.parentVersionId ?? 0) <
(document.vault_update_id ?? 0)
) {
return DocumentUpToDateness.Later;
} else if (
(document.vault_update_id ?? 0) <
(record.metadata?.parentVersionId ?? 0)
) {
// the document of the cursor must be from the past
return DocumentUpToDateness.Prior;
}
const currentContent = await this.fileOperations.read(
document.relative_path
);
return this.database.getLatestDocumentByRelativePath(
document.relative_path
)?.metadata?.hash === hash(currentContent)
? DocumentUpToDateness.UpToDate
: DocumentUpToDateness.Prior;
}
}

View file

@ -0,0 +1,15 @@
import type { RelativePath } from "../persistence/database";
export class FileChangeNotifier {
private readonly listeners: ((filePath: RelativePath) => unknown)[] = [];
public addFileChangeListener(
listener: (filePath: RelativePath) => unknown
): void {
this.listeners.push(listener);
}
public notifyOfFileChange(filePath: RelativePath): void {
this.listeners.forEach((listener) => listener(filePath));
}
}

View file

@ -9,7 +9,7 @@ import type { Logger } from "../tracing/logger";
import PQueue from "p-queue";
import { hash } from "../utils/hash";
import { v4 as uuidv4 } from "uuid";
import type { Settings, SyncSettings } from "../persistence/settings";
import type { Settings } from "../persistence/settings";
import type { FileOperations } from "../file-operations/file-operations";
import { findMatchingFile } from "../utils/find-matching-file";
import type { UnrestrictedSyncer } from "./unrestricted-syncer";
@ -27,12 +27,10 @@ export class Syncer {
private runningScheduleSyncForOfflineChanges: Promise<void> | undefined;
// eslint-disable-next-line @typescript-eslint/max-params
public constructor(
private readonly deviceId: string,
private readonly logger: Logger,
private readonly database: Database,
private readonly settings: Settings,
settings: Settings,
private readonly syncService: SyncService,
private readonly operations: FileOperations,
private readonly internalSyncer: UnrestrictedSyncer
@ -261,58 +259,77 @@ export class Syncer {
remoteVersion.documentId
);
let hasLockToRelease = false;
if (document === undefined) {
// Let's avoid the same documents getting created in parallel multiple times.
// There might be multiple tasks waiting for the lock
await this.remoteDocumentsLock.waitForLock(
remoteVersion.documentId
);
hasLockToRelease = true;
document = this.database.getDocumentByDocumentId(
remoteVersion.documentId
return this.remoteDocumentsLock.withLock(
remoteVersion.documentId,
async () => {
document = this.database.getDocumentByDocumentId(
remoteVersion.documentId
);
// We're either the first one to get the lock, so we have to create the document in `unrestrictedSyncRemotelyUpdatedFile`
if (document === undefined) {
await this.syncQueue.add(async () =>
this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile(
remoteVersion
)
);
} else {
const [promise, resolve, reject] = createPromise();
document =
await this.database.getResolvedDocumentByRelativePath(
document.relativePath,
promise
);
try {
await this.syncQueue.add(async () =>
this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile(
remoteVersion,
document
)
);
resolve();
} catch (e) {
reject(e);
} finally {
this.database.removeDocumentPromise(promise);
}
}
this.database.addSeenUpdateId(remoteVersion.vaultUpdateId);
}
);
}
// We're either the first one to get the lock, so we have to create the document in `unrestrictedSyncRemotelyUpdatedFile`
const [promise, resolve, reject] = createPromise();
document = await this.database.getResolvedDocumentByRelativePath(
document.relativePath,
promise
);
try {
// We're either the first one to get the lock, so we have to create the document in `unrestrictedSyncRemotelyUpdatedFile`
if (document === undefined) {
await this.syncQueue.add(async () =>
this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile(
remoteVersion
)
);
} else {
const [promise, resolve, reject] = createPromise();
await this.syncQueue.add(async () =>
this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile(
remoteVersion,
document
)
);
document =
await this.database.getResolvedDocumentByRelativePath(
document.relativePath,
promise
);
try {
await this.syncQueue.add(async () =>
this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile(
remoteVersion,
document
)
);
resolve();
} catch (e) {
reject(e);
} finally {
this.database.removeDocumentPromise(promise);
}
}
this.database.addSeenUpdateId(remoteVersion.vaultUpdateId);
resolve();
} catch (e) {
reject(e);
} finally {
if (hasLockToRelease) {
this.remoteDocumentsLock.unlock(remoteVersion.documentId);
}
this.database.removeDocumentPromise(promise);
}
this.database.addSeenUpdateId(remoteVersion.vaultUpdateId);
}
private async internalScheduleSyncForOfflineChanges(): Promise<void> {

View file

@ -1,4 +1,4 @@
export enum DocumentUpdateStatus {
export enum DocumentSyncStatus {
UP_TO_DATE = "UP_TO_DATE",
SYNCING = "SYNCING"
}

View file

@ -0,0 +1,5 @@
export enum DocumentUpToDateness {
UpToDate = "UpToDate", // easiest case, the client can just show the cursors as-is
Prior = "Prior", // The cursors are outdated, so the client has to guess the cursor positions based on local updates. This is only possible if this client's cursor has once been up-to-date in a given document.
Later = "Later" // The cursors are from a future version of a document, there's no way we can accuratly show them locally.
}

View file

@ -1,5 +1,5 @@
import type { ClientCursors } from "../services/types/ClientCursors";
export interface DocumentWithMaybeOutdatedClientCursors extends ClientCursors {
export interface MaybeOutdatedClientCursors extends ClientCursors {
isOutdated: boolean;
}

View file

@ -1,17 +1,25 @@
type ResolveFunction<T> = undefined extends T
? (value?: T) => unknown
: (value: T) => unknown;
/**
* A type-safe utility function to create a Promise with resolve and reject functions.
* @returns A tuple containing a Promise, a resolve function, and a reject function.
*/
export function createPromise<T = unknown>(): [
Promise<T>,
(value: T) => unknown,
ResolveFunction<T>,
(error: unknown) => unknown
] {
let resolve: undefined | ((resolved: T) => unknown) = undefined;
let resolve: undefined | ResolveFunction<T> = undefined;
let reject: undefined | ((error: unknown) => unknown) = undefined;
const creationPromise = new Promise<T>(
(resolve_, reject_) => ((resolve = resolve_), (reject = reject_))
(resolve_, reject_) =>
(
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
(resolve = resolve_ as ResolveFunction<T>), (reject = reject_)
)
);
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion

View file

@ -2,8 +2,9 @@ import { Logger } from "../tracing/logger";
import type { RelativePath } from "../persistence/database";
import { Locks } from "./locks";
describe("Document lock", () => {
describe("withLock", () => {
const testPath: RelativePath = "test/document/path";
const testPath2: RelativePath = "test/document/path2";
const logger = new Logger();
// eslint-disable-next-line @typescript-eslint/init-declarations
@ -13,77 +14,211 @@ describe("Document lock", () => {
locks = new Locks<RelativePath>(logger);
});
test("should lock a document successfully", () => {
const result = locks.tryLock(testPath);
expect(result).toBe(true);
});
test("should not lock a document that is already locked", () => {
locks.tryLock(testPath);
const result = locks.tryLock(testPath);
expect(result).toBe(false);
});
test("should unlock a locked document", () => {
locks.tryLock(testPath);
locks.unlock(testPath);
const result = locks.tryLock(testPath);
expect(result).toBe(true);
locks.unlock(testPath);
});
test("should throw an error when unlocking a document that is not locked", () => {
expect(() => {
locks.unlock(testPath);
}).toThrow(`Key '${testPath}' is not locked, cannot unlock`);
});
test("should wait for a document lock and resolve when unlocked", async () => {
locks.tryLock(testPath);
let resolved = false;
const waitPromise = locks.waitForLock(testPath).then(() => {
resolved = true;
test("should execute function with single key lock", async () => {
let executionCount = 0;
const result = await locks.withLock(testPath, () => {
executionCount++;
return "success";
});
locks.unlock(testPath);
await waitPromise;
expect(resolved).toBe(true);
expect(result).toBe("success");
expect(executionCount).toBe(1);
});
test("should resolve multiple waiters in FIFO order", async () => {
locks.tryLock(testPath);
let firstResolved = false;
let secondResolved = false;
let thirdResolved = false;
const firstWaitPromise = locks.waitForLock(testPath).then(() => {
firstResolved = true;
test("should execute async function with single key lock", async () => {
let executionCount = 0;
const result = await locks.withLock(testPath, async () => {
executionCount++;
await new Promise((resolve) => setTimeout(resolve, 10));
return "async-success";
});
const secondWaitPromise = locks.waitForLock(testPath).then(() => {
secondResolved = true;
expect(result).toBe("async-success");
expect(executionCount).toBe(1);
});
test("should execute function with multiple key locks", async () => {
let executionCount = 0;
const result = await locks.withLock([testPath, testPath2], () => {
executionCount++;
return "multi-success";
});
const thirdWaitPromise = locks.waitForLock(testPath).then(() => {
thirdResolved = true;
expect(result).toBe("multi-success");
expect(executionCount).toBe(1);
});
test("should sort multiple keys to prevent deadlocks", async () => {
const executionOrder: string[] = [];
// Start two concurrent operations with keys in different orders
const promise1 = locks.withLock([testPath2, testPath], async () => {
executionOrder.push("operation1-start");
await new Promise((resolve) => setTimeout(resolve, 50));
executionOrder.push("operation1-end");
return "result1";
});
locks.unlock(testPath);
await firstWaitPromise;
expect(firstResolved).toBe(true);
expect(secondResolved).toBe(false);
expect(thirdResolved).toBe(false);
const promise2 = locks.withLock([testPath, testPath2], async () => {
executionOrder.push("operation2-start");
await new Promise((resolve) => setTimeout(resolve, 50));
executionOrder.push("operation2-end");
return "result2";
});
locks.unlock(testPath);
await secondWaitPromise;
expect(secondResolved).toBe(true);
expect(thirdResolved).toBe(false);
const [result1, result2] = await Promise.all([promise1, promise2]);
locks.unlock(testPath);
await thirdWaitPromise;
expect(thirdResolved).toBe(true);
expect(result1).toBe("result1");
expect(result2).toBe("result2");
// One operation should complete entirely before the other starts
expect(executionOrder).toEqual([
"operation1-start",
"operation1-end",
"operation2-start",
"operation2-end"
]);
});
test("should serialize access to same key", async () => {
const executionOrder: string[] = [];
const promise1 = locks.withLock(testPath, async () => {
executionOrder.push("operation1-start");
await new Promise((resolve) => setTimeout(resolve, 50));
executionOrder.push("operation1-end");
return "result1";
});
const promise2 = locks.withLock(testPath, async () => {
executionOrder.push("operation2-start");
await new Promise((resolve) => setTimeout(resolve, 30));
executionOrder.push("operation2-end");
return "result2";
});
const [result1, result2] = await Promise.all([promise1, promise2]);
expect(result1).toBe("result1");
expect(result2).toBe("result2");
expect(executionOrder).toEqual([
"operation1-start",
"operation1-end",
"operation2-start",
"operation2-end"
]);
});
test("should allow concurrent access to different keys", async () => {
const executionOrder: string[] = [];
const promise1 = locks.withLock(testPath, async () => {
executionOrder.push("operation1-start");
await new Promise((resolve) => setTimeout(resolve, 50));
executionOrder.push("operation1-end");
return "result1";
});
const promise2 = locks.withLock(testPath2, async () => {
executionOrder.push("operation2-start");
await new Promise((resolve) => setTimeout(resolve, 30));
executionOrder.push("operation2-end");
return "result2";
});
const [result1, result2] = await Promise.all([promise1, promise2]);
expect(result1).toBe("result1");
expect(result2).toBe("result2");
// Both operations should run concurrently
expect(executionOrder[0]).toBe("operation1-start");
expect(executionOrder[1]).toBe("operation2-start");
});
test("should release locks even if function throws", async () => {
const error = new Error("test error");
await expect(
locks.withLock(testPath, () => {
throw error;
})
).rejects.toThrow("test error");
// Lock should be released, allowing another operation
const result = await locks.withLock(
testPath,
() => "success-after-error"
);
expect(result).toBe("success-after-error");
});
test("should release locks even if async function throws", async () => {
const error = new Error("async test error");
await expect(
locks.withLock(testPath, async () => {
await new Promise((resolve) => setTimeout(resolve, 10));
throw error;
})
).rejects.toThrow("async test error");
// Lock should be released, allowing another operation
const result = await locks.withLock(
testPath,
() => "success-after-async-error"
);
expect(result).toBe("success-after-async-error");
});
test("should handle empty array of keys", async () => {
const result = await locks.withLock([], () => "empty-keys");
expect(result).toBe("empty-keys");
});
test("should maintain FIFO order for multiple waiters", async () => {
const executionOrder: string[] = [];
// Start first operation that holds the lock
const firstPromise = locks.withLock(testPath, async () => {
executionOrder.push("first-start");
await new Promise((resolve) => setTimeout(resolve, 100));
executionOrder.push("first-end");
return "first";
});
// Small delay to ensure first operation starts
await new Promise((resolve) => setTimeout(resolve, 10));
// Queue second and third operations
const secondPromise = locks.withLock(testPath, async () => {
executionOrder.push("second-start");
await new Promise((resolve) => setTimeout(resolve, 30));
executionOrder.push("second-end");
return "second";
});
const thirdPromise = locks.withLock(testPath, async () => {
executionOrder.push("third-start");
await new Promise((resolve) => setTimeout(resolve, 20));
executionOrder.push("third-end");
return "third";
});
const [first, second, third] = await Promise.all([
firstPromise,
secondPromise,
thirdPromise
]);
expect(first).toBe("first");
expect(second).toBe("second");
expect(third).toBe("third");
expect(executionOrder).toEqual([
"first-start",
"first-end",
"second-start",
"second-end",
"third-start",
"third-end"
]);
});
});

View file

@ -13,7 +13,54 @@ export class Locks<T> {
/** Queue of resolve functions waiting for each key */
private readonly waiters = new Map<T, (() => unknown)[]>();
public constructor(private readonly logger: Logger) {}
public constructor(private readonly logger?: Logger) {}
/**
* Executes a function while holding exclusive locks on one or more keys.
*
* This method ensures that the provided function runs with exclusive access to the
* specified key(s). Multiple keys are sorted to prevent deadlocks when different
* operations request the same keys in different orders.
*
* @template R The return type of the function to execute
* @param keyOrKeys A single key or array of keys to lock during function execution
* @param fn The function to execute while holding the lock(s). Can be sync or async.
* @returns A Promise that resolves to the return value of the executed function
*
* @example
* ```typescript
* // Lock a single key
* const result = await locks.withLock('file1', () => {
* // Critical section - only one operation can access 'file1' at a time
* return processFile('file1');
* });
*
* // Lock multiple keys (prevents deadlocks through consistent ordering)
* await locks.withLock(['file1', 'file2'], async () => {
* // Critical section - exclusive access to both files
* await moveFile('file1', 'file2');
* });
* ```
*
* @throws Any error thrown by the provided function will be propagated after locks are released
*/
public async withLock<R>(
keyOrKeys: T | T[],
fn: () => R | Promise<R>
): Promise<R> {
const keys = Array.isArray(keyOrKeys) ? keyOrKeys : [keyOrKeys];
keys.sort((a, b) => String(a).localeCompare(String(b))); // Ensure consistent order to prevent deadlocks
await Promise.all(keys.map(async (key) => this.waitForLock(key)));
try {
return await fn();
} finally {
keys.forEach((key) => {
this.unlock(key);
});
}
}
/**
* Attempts to acquire a lock immediately without waiting.
@ -22,7 +69,7 @@ export class Locks<T> {
* @param key The key to lock
* @returns `true` if lock acquired, `false` if already locked
*/
public tryLock(key: T): boolean {
private tryLock(key: T): boolean {
if (this.locked.has(key)) {
return false;
}
@ -39,12 +86,12 @@ export class Locks<T> {
* @param key The key to wait for and lock
* @returns Promise that resolves when lock is acquired
*/
public async waitForLock(key: T): Promise<void> {
private async waitForLock(key: T): Promise<void> {
if (this.tryLock(key)) {
return Promise.resolve();
}
this.logger.debug(`Waiting for lock on ${key}`);
this.logger?.debug(`Waiting for lock on ${key}`);
return new Promise((resolve) => {
// DefaultDict behavior
@ -65,7 +112,7 @@ export class Locks<T> {
* @param key The key to unlock
* @throws {Error} If key is not currently locked
*/
public unlock(key: T): void {
private unlock(key: T): void {
if (!this.locked.has(key)) {
throw new Error(`Key '${key}' is not locked, cannot unlock`);
}
@ -74,19 +121,22 @@ export class Locks<T> {
const nextWaiting = this.waiters.get(key)?.shift();
if (nextWaiting) {
this.logger.debug(`Granted lock on ${key}`);
this.logger?.debug(`Granted lock on ${key}`);
nextWaiting();
} else {
this.locked.delete(key);
}
}
}
/**
* Clears all locks and waiters. Causes waiting operations to hang indefinitely.
* Use with caution.
*/
public reset(): void {
this.locked.clear();
this.waiters.clear();
export class Lock {
private readonly locks: Locks<boolean>;
public constructor(logger?: Logger) {
this.locks = new Locks(logger);
}
public async withLock<R>(fn: () => R | Promise<R>): Promise<R> {
return this.locks.withLock(true, fn);
}
}