diff --git a/frontend/obsidian-plugin/package.json b/frontend/obsidian-plugin/package.json index 24a95e03..0d004408 100644 --- a/frontend/obsidian-plugin/package.json +++ b/frontend/obsidian-plugin/package.json @@ -34,6 +34,7 @@ "url": "^0.11.4", "virtual-scroller": "^1.13.1", "webpack": "^5.99.9", - "webpack-cli": "^6.0.1" + "webpack-cli": "^6.0.1", + "reconcile-text": "^0.5.0" } } diff --git a/frontend/obsidian-plugin/src/vault-link-plugin.ts b/frontend/obsidian-plugin/src/vault-link-plugin.ts index 0d4c1a1d..7e0eff1b 100644 --- a/frontend/obsidian-plugin/src/vault-link-plugin.ts +++ b/frontend/obsidian-plugin/src/vault-link-plugin.ts @@ -19,7 +19,7 @@ import { updateEditorStatusDisplay } from "./views/editor-sync-line/editor-sync- import { remoteCursorsTheme } from "./views/cursors/remote-cursor-theme"; import { remoteCursorsPlugin, - setCursors + RemoteCursorsPluginValue } from "./views/cursors/remote-cursors-plugin"; import { LocalCursorUpdateListener } from "./views/cursors/local-cursor-update-listener"; import { slowFetchFactory } from "./debugging/slow-fetch-factory"; @@ -93,7 +93,7 @@ export default class VaultLinkPlugin extends Plugin { this.registerEditorExtension([remoteCursorsTheme, remoteCursorsPlugin]); this.client.addRemoteCursorsUpdateListener((cursors) => { - setCursors(cursors, this.app); + RemoteCursorsPluginValue.setCursors(cursors, this.app); }); const cursorListener = new LocalCursorUpdateListener( diff --git a/frontend/obsidian-plugin/src/views/cursors/remote-cursors-plugin.ts b/frontend/obsidian-plugin/src/views/cursors/remote-cursors-plugin.ts index 869cbef9..7676dc08 100644 --- a/frontend/obsidian-plugin/src/views/cursors/remote-cursors-plugin.ts +++ b/frontend/obsidian-plugin/src/views/cursors/remote-cursors-plugin.ts @@ -13,97 +13,196 @@ import type { CursorSpan, MaybeOutdatedClientCursors } from "sync-client"; import type { App } from "obsidian"; import { MarkdownView } from "obsidian"; -let cursors: { - name: string; - path: string; - span: CursorSpan; - deviceId: string; -}[] = []; - import { StateEffect } from "@codemirror/state"; import { getRandomColor } from "src/utils/get-random-color"; -import { updateSelection } from "./update-selection"; +import { reconcileWithHistory, SpanWithHistory } from "reconcile-text"; + +function findWhereToMoveCursor( + cursor: number, + spans: SpanWithHistory[] +): number | null { + let position = 0; + for (const span of spans) { + // left and origin are the same + if (position === cursor && span.history === "AddedFromRight") { + return position + span.text.length; + } + position += span.text.length; + if (position === cursor && span.history === "RemovedFromRight") { + return position - span.text.length; + } + } + + return null; +} const forceUpdate = StateEffect.define(); export class RemoteCursorsPluginValue implements PluginValue { public decorations: DecorationSet = RangeSet.of([]); - public update(update: ViewUpdate): void { - update.changes.iterChanges((fromA, toA, fromB, toB, _inserted) => { - const spans = cursors.map((cursor) => cursor.span); - updateSelection({ - fromA, - toA, - fromB, - toB, - spans + private static cursors: { + name: string; + path: string; + span: CursorSpan; + deviceId: string; + isOutdated: boolean; + }[] = []; + + public static setCursors( + clients: MaybeOutdatedClientCursors[], + app: App + ): void { + RemoteCursorsPluginValue.cursors = [ + ...RemoteCursorsPluginValue.cursors.filter(({ deviceId }) => + clients.some( + (client) => + client.deviceId === deviceId && client.isOutdated + ) + ), + ...clients + .filter( + ({ isOutdated, deviceId }) => + !isOutdated || + RemoteCursorsPluginValue.cursors.every( + (c) => deviceId !== c.deviceId + ) + ) + .flatMap((client) => { + const clientCursors = client.documentsWithCursors; + return clientCursors.flatMap((cursor) => + cursor.cursors.map((span) => ({ + name: client.userName, + path: cursor.relative_path, + deviceId: client.deviceId, + isOutdated: client.isOutdated, + span: { ...span } + })) + ); + }) + ]; + + app.workspace + .getLeavesOfType("markdown") + .map((leaf) => leaf.view) + .filter((view) => view instanceof MarkdownView) + .forEach((view) => { + // @ts-expect-error, not typed + // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion + const editor = view.editor.cm as EditorView; + + editor.dispatch({ + effects: [forceUpdate.of(null)] + }); }); + } + + public update(update: ViewUpdate): void { + const original = update.startState.doc.toString(); + const edited = update.state.doc.toString(); + + let updatedPositions: number[] = []; + const reconciled = reconcileWithHistory( + original, + { + text: original, + cursors: RemoteCursorsPluginValue.cursors.flatMap( + ({ span }, i) => [ + { id: i * 2, position: span.start }, + { id: i * 2 + 1, position: span.end } + ] + ) + }, + edited, + "Character" + ); + + reconciled.cursors.forEach(({ id, position }) => { + const whereToJump = findWhereToMoveCursor( + position, + reconciled.history + ); + if (whereToJump !== null) { + updatedPositions[id] = whereToJump; + } else { + updatedPositions[id] = position; + } + }); + + RemoteCursorsPluginValue.cursors.forEach(({ span }, i) => { + span.start = updatedPositions[i * 2]; + span.end = updatedPositions[i * 2 + 1]; }); const decorations: Range[] = []; - cursors.forEach(({ name, span: { start, end } }) => { - const color = getRandomColor(name); - const startLine = update.view.state.doc.lineAt(start); - const endLine = update.view.state.doc.lineAt(end); + RemoteCursorsPluginValue.cursors.forEach( + ({ name, span: { start, end } }) => { + const color = getRandomColor(name); + const startLine = update.view.state.doc.lineAt(start); + const endLine = update.view.state.doc.lineAt(end); - const attributes = { - style: `background-color: ${color};` - }; + const attributes = { + style: `background-color: ${color};` + }; - if (startLine.number === endLine.number) { - // selected content in a single line. - decorations.push({ - from: start, - to: end, - value: Decoration.mark({ - attributes - }) - }); - } else { - // selected content in multiple lines - // first, render text-selection in the first line - decorations.push({ - from: start, - to: startLine.from + startLine.length, - value: Decoration.mark({ - attributes - }) - }); - - // render text-selection in the lines between the first and last line - for (let i = startLine.number + 1; i < endLine.number; i++) { - const currentLine = update.view.state.doc.line(i); + if (startLine.number === endLine.number) { + // selected content in a single line. decorations.push({ - from: currentLine.from, - to: currentLine.to, + from: start, + to: end, + value: Decoration.mark({ + attributes + }) + }); + } else { + // selected content in multiple lines + // first, render text-selection in the first line + decorations.push({ + from: start, + to: startLine.from + startLine.length, + value: Decoration.mark({ + attributes + }) + }); + + // render text-selection in the lines between the first and last line + for ( + let i = startLine.number + 1; + i < endLine.number; + i++ + ) { + const currentLine = update.view.state.doc.line(i); + decorations.push({ + from: currentLine.from, + to: currentLine.to, + value: Decoration.mark({ + attributes + }) + }); + } + + // render text-selection in the last line + decorations.push({ + from: endLine.from, + to: end, value: Decoration.mark({ attributes }) }); } - // render text-selection in the last line decorations.push({ - from: endLine.from, + from: end, to: end, - value: Decoration.mark({ - attributes + value: Decoration.widget({ + side: end - start > 0 ? -1 : 1, // the local cursor should be rendered outside the remote selection + block: false, + widget: new RemoteCursorWidget(color, name) }) }); } - - decorations.push({ - from: end, - to: end, - value: Decoration.widget({ - side: end - start > 0 ? -1 : 1, // the local cursor should be rendered outside the remote selection - block: false, - widget: new RemoteCursorWidget(color, name) - }) - }); - }); + ); this.decorations = Decoration.set(decorations, true); } @@ -115,43 +214,3 @@ export const remoteCursorsPlugin = ViewPlugin.fromClass( decorations: (v) => v.decorations } ); - -export function setCursors( - clients: MaybeOutdatedClientCursors[], - app: App -): void { - cursors = [ - ...cursors.filter(({ deviceId }) => - clients.some( - (client) => client.deviceId === deviceId && client.isOutdated - ) - ), - ...clients - .filter(({ isOutdated }) => !isOutdated) - .flatMap((client) => { - const clientCursors = client.documentsWithCursors; - return clientCursors.flatMap((cursor) => - cursor.cursors.map((span) => ({ - name: client.userName, - path: cursor.relative_path, - deviceId: client.deviceId, - span - })) - ); - }) - ]; - - app.workspace - .getLeavesOfType("markdown") - .map((leaf) => leaf.view) - .filter((view) => view instanceof MarkdownView) - .forEach((view) => { - // @ts-expect-error, not typed - // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion - const editor = view.editor.cm as EditorView; - - editor.dispatch({ - effects: [forceUpdate.of(null)] - }); - }); -} diff --git a/frontend/obsidian-plugin/src/views/cursors/update-selection.test.ts b/frontend/obsidian-plugin/src/views/cursors/update-selection.test.ts deleted file mode 100644 index 991ff76b..00000000 --- a/frontend/obsidian-plugin/src/views/cursors/update-selection.test.ts +++ /dev/null @@ -1,111 +0,0 @@ -import { updateSelection } from "./update-selection"; - -describe("Selection update", () => { - it("should handle span fully before - insert", () => { - const spans = [{ start: 3, end: 5 }]; - updateSelection({ - fromA: 0, - toA: 0, - fromB: 0, - toB: 2, - spans - }); - expect(spans).toEqual([{ start: 5, end: 7 }]); - }); - - it("should handle span fully before - delete", () => { - const spans = [{ start: 3, end: 5 }]; - updateSelection({ - fromA: 0, - toA: 2, - fromB: 0, - toB: 0, - spans - }); - expect(spans).toEqual([{ start: 1, end: 3 }]); - }); - - it("should handle span fully after - insert", () => { - const spans = [{ start: 3, end: 5 }]; - updateSelection({ - fromA: 6, - toA: 6, - fromB: 6, - toB: 10, - spans - }); - expect(spans).toEqual([{ start: 3, end: 5 }]); - }); - - it("should handle span fully after - delete", () => { - const spans = [{ start: 3, end: 5 }]; - updateSelection({ - fromA: 6, - toA: 10, - fromB: 6, - toB: 6, - spans - }); - expect(spans).toEqual([{ start: 3, end: 5 }]); - }); - - it("should handle span fully within - insert", () => { - const spans = [{ start: 3, end: 5 }]; - updateSelection({ - fromA: 4, - toA: 4, - fromB: 4, - toB: 6, - spans - }); - expect(spans).toEqual([{ start: 3, end: 7 }]); - }); - - it("should handle span fully within - delete", () => { - const spans = [{ start: 3, end: 5 }]; - updateSelection({ - fromA: 4, - toA: 5, - fromB: 4, - toB: 4, - spans - }); - expect(spans).toEqual([{ start: 3, end: 4 }]); - }); - - it("should handle span overlapping with start", () => { - const spans = [{ start: 3, end: 5 }]; - updateSelection({ - fromA: 2, - toA: 4, - fromB: 2, - toB: 2, - spans - }); - expect(spans).toEqual([{ start: 2, end: 4 }]); - }); - - it("should handle span overlapping with end", () => { - const spans = [{ start: 3, end: 5 }]; - updateSelection({ - fromA: 4, - toA: 6, - fromB: 4, - toB: 4, - spans - }); - expect(spans).toEqual([{ start: 3, end: 4 }]); - }); - - it("delete entire selection", () => { - const spans = [{ start: 3, end: 5 }]; - updateSelection({ - fromA: 0, - toA: 10, - fromB: 0, - toB: 0, - spans - }); - expect(spans).toEqual([{ start: 0, end: 0 }]); - }); -}); diff --git a/frontend/obsidian-plugin/src/views/cursors/update-selection.ts b/frontend/obsidian-plugin/src/views/cursors/update-selection.ts deleted file mode 100644 index 9ff7c207..00000000 --- a/frontend/obsidian-plugin/src/views/cursors/update-selection.ts +++ /dev/null @@ -1,40 +0,0 @@ -import type { CursorSpan } from "sync-client"; - -export const updateSelection = ({ - fromA, - toA, - toB, - spans -}: { - fromA: number; - toA: number; - fromB: number; - toB: number; - spans: CursorSpan[]; -}): void => { - spans.forEach((span) => { - if (fromA <= span.start) { - // the change covers the entirety of the selection - if (toA > span.end) { - span.start = toB; - span.end = toB; - return; - } - - let change = toB - toA; - if (change < 0) { - // it's a deletion - // if overlaps with the start, we can't move it back more than the deleted range - change = Math.max(change, fromA - span.start); - } - - span.start += change; - span.end += change; - } else if (toA <= span.end) { - span.end += toB - toA; - } else if (toB <= span.end) { - // a deletion overlaps with the end, so we move the end - span.end = toB; - } - }); -}; diff --git a/frontend/package-lock.json b/frontend/package-lock.json index 36221400..f73ac157 100644 --- a/frontend/package-lock.json +++ b/frontend/package-lock.json @@ -6373,6 +6373,7 @@ "jest": "^29.7.0", "mini-css-extract-plugin": "^2.9.2", "obsidian": "1.8.7", + "reconcile-text": "^0.5.0", "resolve-url-loader": "^5.0.0", "sass": "^1.89.1", "sass-loader": "^16.0.5", diff --git a/frontend/sync-client/src/sync-client.ts b/frontend/sync-client/src/sync-client.ts index c7f0ea1b..ddab8860 100644 --- a/frontend/sync-client/src/sync-client.ts +++ b/frontend/sync-client/src/sync-client.ts @@ -14,26 +14,16 @@ import { ConnectionStatus } from "./services/connection-status"; import { UnrestrictedSyncer } from "./sync-operations/unrestricted-syncer"; import { rateLimit } from "./utils/rate-limit"; import type { NetworkConnectionStatus } from "./types/network-connection-status"; -import { DocumentUpdateStatus } from "./types/document-update-status"; +import { DocumentSyncStatus } from "./types/document-sync-status"; import { WebSocketManager } from "./services/websocket-manager"; import { createClientId } from "./utils/create-client-id"; +import { CursorTracker } from "./sync-operations/cursor-tracker"; import type { CursorSpan } from "./services/types/CursorSpan"; -import type { ClientCursors } from "./services/types/ClientCursors"; -import type { DocumentWithCursors } from "./services/types/DocumentWithCursors"; -import { hash } from "./utils/hash"; -import type { DocumentWithMaybeOutdatedClientCursors } from "./types/maybe-outdated-client-cursors"; - -enum DocumentUpToDateness { - UpToDate = "UpToDate", - Prior = "Prior", - Later = "Later" -} +import type { MaybeOutdatedClientCursors } from "./types/maybe-outdated-client-cursors"; +import { FileChangeNotifier } from "./sync-operations/file-change-notifier"; export class SyncClient { private static readonly MINIMUM_SAVE_INTERVAL_MS = 1000; - private lastCursorState: DocumentWithCursors[] = []; - - private readonly knownClientCursors: ClientCursors[] = []; // eslint-disable-next-line @typescript-eslint/max-params private constructor( @@ -45,7 +35,8 @@ export class SyncClient { private readonly webSocketManager: WebSocketManager, private readonly _logger: Logger, private readonly connectionStatus: ConnectionStatus, - private readonly fileOperations: FileOperations + private readonly cursorTracker: CursorTracker, + private readonly fileChangeNotifier: FileChangeNotifier ) { this.settings.addOnSettingsChangeListener( async (newSettings, oldSettings) => { @@ -54,10 +45,6 @@ export class SyncClient { } } ); - - this.webSocketManager.addRemoteCursorsUpdateListener((cursors) => { - this.knownClientCursors.push(...cursors); - }); } public get logger(): Logger { @@ -148,7 +135,6 @@ export class SyncClient { ); const syncer = new Syncer( - deviceId, logger, database, settings, @@ -166,6 +152,13 @@ export class SyncClient { webSocket ); + const fileChangeNotifier = new FileChangeNotifier(); + const cursorTracker = new CursorTracker( + database, + webSocketManager, + fileOperations, + fileChangeNotifier + ); const client = new SyncClient( history, settings, @@ -175,7 +168,8 @@ export class SyncClient { webSocketManager, logger, connectionStatus, - fileOperations + cursorTracker, + fileChangeNotifier ); logger.info("SyncClient initialised"); @@ -264,12 +258,14 @@ export class SyncClient { public async syncLocallyCreatedFile( relativePath: RelativePath ): Promise { + this.fileChangeNotifier.notifyOfFileChange(relativePath); return this.syncer.syncLocallyCreatedFile(relativePath); } public async syncLocallyDeletedFile( relativePath: RelativePath ): Promise { + this.fileChangeNotifier.notifyOfFileChange(relativePath); return this.syncer.syncLocallyDeletedFile(relativePath); } @@ -280,6 +276,7 @@ export class SyncClient { oldPath?: RelativePath; relativePath: RelativePath; }): Promise { + this.fileChangeNotifier.notifyOfFileChange(relativePath); return this.syncer.syncLocallyUpdatedFile({ oldPath, relativePath @@ -288,154 +285,26 @@ export class SyncClient { public getDocumentSyncingStatus( relativePath: RelativePath - ): DocumentUpdateStatus { + ): DocumentSyncStatus { const document = this.database.getLatestDocumentByRelativePath(relativePath); if (document === undefined) { - return DocumentUpdateStatus.SYNCING; + return DocumentSyncStatus.SYNCING; } return document.updates.length > 0 - ? DocumentUpdateStatus.SYNCING - : DocumentUpdateStatus.UP_TO_DATE; + ? DocumentSyncStatus.SYNCING + : DocumentSyncStatus.UP_TO_DATE; } - /// Update the local cursors for the given documents. - /// Can be called frequently as it only emits an event - // if the state has actually changed. public async updateLocalCursors( documentToCursors: Record ): Promise { - const documentsWithCursors: DocumentWithCursors[] = []; - - for (const [relativePath, cursors] of Object.entries( - documentToCursors - )) { - const record = - this.database.getLatestDocumentByRelativePath(relativePath); - - if (!record) { - continue; // Let's wait for the file to be created before sending cursors - } - - const readContent = await this.fileOperations.read(relativePath); - - if (record.metadata?.hash !== hash(readContent)) { - continue; // Wouldn't make sense to sync the positions in a dirty file - } - - documentsWithCursors.push({ - relative_path: relativePath, - document_id: record.documentId, - vault_update_id: record.metadata.parentVersionId, - cursors - }); - } - - if ( - JSON.stringify(this.lastCursorState) === - JSON.stringify(documentsWithCursors) - ) { - return; - } - - this.lastCursorState = documentsWithCursors; - - this.webSocketManager.updateLocalCursors({ documentsWithCursors }); + await this.cursorTracker.sendLocalCursorsToServer(documentToCursors); } public addRemoteCursorsUpdateListener( - listener: (cursors: DocumentWithMaybeOutdatedClientCursors[]) => unknown + listener: (cursors: MaybeOutdatedClientCursors[]) => unknown ): void { - this.webSocketManager.addRemoteCursorsUpdateListener(async () => { - listener(await this.getRelevantClientCursors()); - }); - } - - private async getRelevantClientCursors(): Promise< - DocumentWithMaybeOutdatedClientCursors[] - > { - const result: DocumentWithMaybeOutdatedClientCursors[] = []; - const included = new Set(); - for (const clientCursors of [...this.knownClientCursors].reverse()) { - if (included.has(clientCursors.deviceId)) { - continue; - } - - const upToDateness = - await this.getDocumentsUpToDateness(clientCursors); - if (upToDateness == DocumentUpToDateness.Later) { - continue; - } - - result.push({ - ...clientCursors, - isOutdated: upToDateness == DocumentUpToDateness.Prior - }); - - included.add(clientCursors.deviceId); - } - - return result; - } - - private async getDocumentsUpToDateness( - clientCursor: ClientCursors - ): Promise { - const results = []; - for (const document of clientCursor.documentsWithCursors) { - results.push(await this.getDocumentUpToDateness(document)); - } - - if ( - results.every((result) => result === DocumentUpToDateness.UpToDate) - ) { - return DocumentUpToDateness.UpToDate; - } - - if ( - results.every( - (result) => - result === DocumentUpToDateness.UpToDate || - result === DocumentUpToDateness.Prior - ) - ) { - return DocumentUpToDateness.Prior; - } - - return DocumentUpToDateness.Later; - } - - private async getDocumentUpToDateness( - document: DocumentWithCursors - ): Promise { - const record = this.database.getLatestDocumentByRelativePath( - document.relative_path - ); - - if (!record) { - // the document of the cursor must be from the future - return DocumentUpToDateness.Later; - } - - if ( - (record.metadata?.parentVersionId ?? 0) < document.vault_update_id - ) { - return DocumentUpToDateness.Later; - } else if ( - document.vault_update_id < (record.metadata?.parentVersionId ?? 0) - ) { - // the document of the cursor must be from the past - return DocumentUpToDateness.Prior; - } - - const currentContent = await this.fileOperations.read( - document.relative_path - ); - - return this.database.getLatestDocumentByRelativePath( - document.relative_path - )?.metadata?.hash === hash(currentContent) - ? DocumentUpToDateness.UpToDate - : DocumentUpToDateness.Prior; + this.cursorTracker.addRemoteCursorsUpdateListener(listener); } } diff --git a/frontend/sync-client/src/sync-operations/cursor-tracker.ts b/frontend/sync-client/src/sync-operations/cursor-tracker.ts new file mode 100644 index 00000000..e50c9e1f --- /dev/null +++ b/frontend/sync-client/src/sync-operations/cursor-tracker.ts @@ -0,0 +1,250 @@ +import type { FileOperations } from "../file-operations/file-operations"; +import type { Database, RelativePath } from "../persistence/database"; +import type { ClientCursors } from "../services/types/ClientCursors"; +import type { CursorSpan } from "../services/types/CursorSpan"; +import type { DocumentWithCursors } from "../services/types/DocumentWithCursors"; +import type { WebSocketManager } from "../services/websocket-manager"; +import type { MaybeOutdatedClientCursors } from "../types/maybe-outdated-client-cursors"; +import { DocumentUpToDateness } from "../types/document-up-to-dateness"; +import { hash } from "../utils/hash"; +import type { FileChangeNotifier } from "./file-change-notifier"; +import { Lock } from "../utils/locks"; + +// Cursor positions are updated separately from documents. However, a given cursor position is only +// valid within a certain version of the document it belongs to. This class tracks previous and the latest +// known remote cursor positions, and for each document, tries to return the latest cursor positions that are +// not from the future. +export class CursorTracker { + private readonly updateLock = new Lock(); + + private knownRemoteCursors: (ClientCursors & { + upToDateness: DocumentUpToDateness; + })[] = []; + + private lastLocalCursorState: DocumentWithCursors[] = []; + private lastLocalCursorStateWithoutDirtyDocuments: DocumentWithCursors[] = + []; + + public constructor( + private readonly database: Database, + private readonly webSocketManager: WebSocketManager, + private readonly fileOperations: FileOperations, + private readonly fileChangeNotifier: FileChangeNotifier + ) { + this.webSocketManager.addRemoteCursorsUpdateListener( + async (clientCursors) => { + await this.updateLock.withLock(async () => { + // The latest message will contain all active clients, so we can delete the ones + // from the local list which are no longer active. + const allIds = new Set( + clientCursors.map((c) => c.deviceId) + ); + const updatedKnownRemoteCursors = + this.knownRemoteCursors.filter((c) => + allIds.has(c.deviceId) + ); + + for (const cursor of clientCursors.filter((client) => + client.documentsWithCursors.every( + (doc) => doc.vault_update_id != null + ) + )) { + updatedKnownRemoteCursors.push({ + ...cursor, + upToDateness: + await this.getDocumentsUpToDateness(cursor) + }); + } + + this.knownRemoteCursors = updatedKnownRemoteCursors; + }); + } + ); + + this.fileChangeNotifier.addFileChangeListener(async (relativePath) => { + this.updateLock.withLock(async () => { + for (const clientCursor of this.knownRemoteCursors) { + if ( + clientCursor.documentsWithCursors.some( + (document) => + document.relative_path === relativePath + ) + ) { + clientCursor.upToDateness = + await this.getDocumentsUpToDateness(clientCursor); + } + } + }); + }); + } + + /// Update the local cursors for the given documents. + /// Can be called frequently as it only emits an event + /// if the state has actually changed. + public async sendLocalCursorsToServer( + documentToCursors: Record + ): Promise { + const documentsWithCursors: DocumentWithCursors[] = []; + + for (const [relativePath, cursors] of Object.entries( + documentToCursors + )) { + const record = + this.database.getLatestDocumentByRelativePath(relativePath); + + if (!record) { + continue; // Let's wait for the file to be created before sending cursors + } + + if (!record.metadata) { + continue; // this is a new document, no need to sync the cursors + } + + documentsWithCursors.push({ + relative_path: relativePath, + document_id: record.documentId, + vault_update_id: record.metadata.parentVersionId, + cursors + }); + } + + if ( + JSON.stringify(this.lastLocalCursorState) === + JSON.stringify(documentsWithCursors) + ) { + // Caching step to avoid reading the edited files all the time + return; + } + this.lastLocalCursorState = documentsWithCursors; + + for (const doc of documentsWithCursors) { + const readContent = await this.fileOperations.read( + doc.relative_path + ); + const record = this.database.getLatestDocumentByRelativePath( + doc.relative_path + ); + if (record?.metadata?.hash !== hash(readContent)) { + doc.vault_update_id = null; + } + } + + if ( + JSON.stringify(this.lastLocalCursorStateWithoutDirtyDocuments) === + JSON.stringify(documentsWithCursors) + ) { + return; + } + + this.lastLocalCursorStateWithoutDirtyDocuments = documentsWithCursors; + + this.webSocketManager.updateLocalCursors({ documentsWithCursors }); + } + + // The returned position may be accurate, if it matches the document version, or outdated, in which case + // the client has to heuristically guess it's current position based on the local edits. + public addRemoteCursorsUpdateListener( + listener: (cursors: MaybeOutdatedClientCursors[]) => unknown + ): void { + // CursorTracker registers its own event listener in the constructor so it must get called first + this.webSocketManager.addRemoteCursorsUpdateListener(async () => { + await this.updateLock.withLock(() => + listener(this.getRelevantAndPruneKnownClientCursors()) + ); + }); + } + + private getRelevantAndPruneKnownClientCursors(): MaybeOutdatedClientCursors[] { + const result: MaybeOutdatedClientCursors[] = []; + const included = new Set(); + + const relevantCursors = []; + for (const clientCursors of [...this.knownRemoteCursors].reverse()) { + if (included.has(clientCursors.deviceId)) { + continue; + } + + if (clientCursors.upToDateness == DocumentUpToDateness.Later) { + continue; + } + + result.push({ + ...clientCursors, + isOutdated: + clientCursors.upToDateness == DocumentUpToDateness.Prior + }); + + included.add(clientCursors.deviceId); + relevantCursors.unshift(clientCursors); // to reverse order back to normal + } + + this.knownRemoteCursors = relevantCursors; + + return result; + } + + // We store up-to-dateness on a per-client basis to simplify the implementation. + // An individual client won't have too many documents open at once, so this is a reasonable trade-off. + private async getDocumentsUpToDateness( + clientCursor: ClientCursors + ): Promise { + const results = []; + for (const document of clientCursor.documentsWithCursors) { + results.push(await this.getDocumentUpToDateness(document)); + } + + if ( + results.every((result) => result === DocumentUpToDateness.UpToDate) + ) { + return DocumentUpToDateness.UpToDate; + } + + if ( + results.every( + (result) => + result === DocumentUpToDateness.UpToDate || + result === DocumentUpToDateness.Prior + ) + ) { + return DocumentUpToDateness.Prior; + } + + return DocumentUpToDateness.Later; + } + + private async getDocumentUpToDateness( + document: DocumentWithCursors + ): Promise { + const record = this.database.getLatestDocumentByRelativePath( + document.relative_path + ); + + if (!record) { + // the document of the cursor must be from the future + return DocumentUpToDateness.Later; + } + + if ( + (record.metadata?.parentVersionId ?? 0) < + (document.vault_update_id ?? 0) + ) { + return DocumentUpToDateness.Later; + } else if ( + (document.vault_update_id ?? 0) < + (record.metadata?.parentVersionId ?? 0) + ) { + // the document of the cursor must be from the past + return DocumentUpToDateness.Prior; + } + + const currentContent = await this.fileOperations.read( + document.relative_path + ); + + return this.database.getLatestDocumentByRelativePath( + document.relative_path + )?.metadata?.hash === hash(currentContent) + ? DocumentUpToDateness.UpToDate + : DocumentUpToDateness.Prior; + } +} diff --git a/frontend/sync-client/src/sync-operations/file-change-notifier.ts b/frontend/sync-client/src/sync-operations/file-change-notifier.ts new file mode 100644 index 00000000..8a7af66c --- /dev/null +++ b/frontend/sync-client/src/sync-operations/file-change-notifier.ts @@ -0,0 +1,15 @@ +import type { RelativePath } from "../persistence/database"; + +export class FileChangeNotifier { + private readonly listeners: ((filePath: RelativePath) => unknown)[] = []; + + public addFileChangeListener( + listener: (filePath: RelativePath) => unknown + ): void { + this.listeners.push(listener); + } + + public notifyOfFileChange(filePath: RelativePath): void { + this.listeners.forEach((listener) => listener(filePath)); + } +} diff --git a/frontend/sync-client/src/sync-operations/syncer.ts b/frontend/sync-client/src/sync-operations/syncer.ts index 7e9301a5..186b9a9b 100644 --- a/frontend/sync-client/src/sync-operations/syncer.ts +++ b/frontend/sync-client/src/sync-operations/syncer.ts @@ -9,7 +9,7 @@ import type { Logger } from "../tracing/logger"; import PQueue from "p-queue"; import { hash } from "../utils/hash"; import { v4 as uuidv4 } from "uuid"; -import type { Settings, SyncSettings } from "../persistence/settings"; +import type { Settings } from "../persistence/settings"; import type { FileOperations } from "../file-operations/file-operations"; import { findMatchingFile } from "../utils/find-matching-file"; import type { UnrestrictedSyncer } from "./unrestricted-syncer"; @@ -27,12 +27,10 @@ export class Syncer { private runningScheduleSyncForOfflineChanges: Promise | undefined; - // eslint-disable-next-line @typescript-eslint/max-params public constructor( - private readonly deviceId: string, private readonly logger: Logger, private readonly database: Database, - private readonly settings: Settings, + settings: Settings, private readonly syncService: SyncService, private readonly operations: FileOperations, private readonly internalSyncer: UnrestrictedSyncer @@ -261,58 +259,77 @@ export class Syncer { remoteVersion.documentId ); - let hasLockToRelease = false; if (document === undefined) { // Let's avoid the same documents getting created in parallel multiple times. // There might be multiple tasks waiting for the lock - await this.remoteDocumentsLock.waitForLock( - remoteVersion.documentId - ); - hasLockToRelease = true; - document = this.database.getDocumentByDocumentId( - remoteVersion.documentId + return this.remoteDocumentsLock.withLock( + remoteVersion.documentId, + async () => { + document = this.database.getDocumentByDocumentId( + remoteVersion.documentId + ); + + // We're either the first one to get the lock, so we have to create the document in `unrestrictedSyncRemotelyUpdatedFile` + if (document === undefined) { + await this.syncQueue.add(async () => + this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile( + remoteVersion + ) + ); + } else { + const [promise, resolve, reject] = createPromise(); + + document = + await this.database.getResolvedDocumentByRelativePath( + document.relativePath, + promise + ); + + try { + await this.syncQueue.add(async () => + this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile( + remoteVersion, + document + ) + ); + + resolve(); + } catch (e) { + reject(e); + } finally { + this.database.removeDocumentPromise(promise); + } + } + + this.database.addSeenUpdateId(remoteVersion.vaultUpdateId); + } ); } + // We're either the first one to get the lock, so we have to create the document in `unrestrictedSyncRemotelyUpdatedFile` + const [promise, resolve, reject] = createPromise(); + + document = await this.database.getResolvedDocumentByRelativePath( + document.relativePath, + promise + ); + try { - // We're either the first one to get the lock, so we have to create the document in `unrestrictedSyncRemotelyUpdatedFile` - if (document === undefined) { - await this.syncQueue.add(async () => - this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile( - remoteVersion - ) - ); - } else { - const [promise, resolve, reject] = createPromise(); + await this.syncQueue.add(async () => + this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile( + remoteVersion, + document + ) + ); - document = - await this.database.getResolvedDocumentByRelativePath( - document.relativePath, - promise - ); - - try { - await this.syncQueue.add(async () => - this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile( - remoteVersion, - document - ) - ); - - resolve(); - } catch (e) { - reject(e); - } finally { - this.database.removeDocumentPromise(promise); - } - } - - this.database.addSeenUpdateId(remoteVersion.vaultUpdateId); + resolve(); + } catch (e) { + reject(e); } finally { - if (hasLockToRelease) { - this.remoteDocumentsLock.unlock(remoteVersion.documentId); - } + this.database.removeDocumentPromise(promise); } + + this.database.addSeenUpdateId(remoteVersion.vaultUpdateId); } private async internalScheduleSyncForOfflineChanges(): Promise { diff --git a/frontend/sync-client/src/utils/create-promise.ts b/frontend/sync-client/src/utils/create-promise.ts index f644dfbd..9d6c41a2 100644 --- a/frontend/sync-client/src/utils/create-promise.ts +++ b/frontend/sync-client/src/utils/create-promise.ts @@ -6,7 +6,7 @@ type ResolveFunction = undefined extends T * A type-safe utility function to create a Promise with resolve and reject functions. * @returns A tuple containing a Promise, a resolve function, and a reject function. */ -export function createPromise(): [ +export function createPromise(): [ Promise, ResolveFunction, (error: unknown) => unknown diff --git a/frontend/sync-client/src/utils/locks.test.ts b/frontend/sync-client/src/utils/locks.test.ts index 33d99da9..a721d2aa 100644 --- a/frontend/sync-client/src/utils/locks.test.ts +++ b/frontend/sync-client/src/utils/locks.test.ts @@ -2,8 +2,9 @@ import { Logger } from "../tracing/logger"; import type { RelativePath } from "../persistence/database"; import { Locks } from "./locks"; -describe("Document lock", () => { +describe("withLock", () => { const testPath: RelativePath = "test/document/path"; + const testPath2: RelativePath = "test/document/path2"; const logger = new Logger(); // eslint-disable-next-line @typescript-eslint/init-declarations @@ -13,77 +14,197 @@ describe("Document lock", () => { locks = new Locks(logger); }); - test("should lock a document successfully", () => { - const result = locks.tryLock(testPath); - expect(result).toBe(true); - }); - - test("should not lock a document that is already locked", () => { - locks.tryLock(testPath); - const result = locks.tryLock(testPath); - expect(result).toBe(false); - }); - - test("should unlock a locked document", () => { - locks.tryLock(testPath); - locks.unlock(testPath); - const result = locks.tryLock(testPath); - expect(result).toBe(true); - locks.unlock(testPath); - }); - - test("should throw an error when unlocking a document that is not locked", () => { - expect(() => { - locks.unlock(testPath); - }).toThrow(`Key '${testPath}' is not locked, cannot unlock`); - }); - - test("should wait for a document lock and resolve when unlocked", async () => { - locks.tryLock(testPath); - - let resolved = false; - const waitPromise = locks.waitForLock(testPath).then(() => { - resolved = true; + test("should execute function with single key lock", async () => { + let executionCount = 0; + const result = await locks.withLock(testPath, () => { + executionCount++; + return "success"; }); - locks.unlock(testPath); - await waitPromise; - - expect(resolved).toBe(true); + expect(result).toBe("success"); + expect(executionCount).toBe(1); }); - test("should resolve multiple waiters in FIFO order", async () => { - locks.tryLock(testPath); - - let firstResolved = false; - let secondResolved = false; - let thirdResolved = false; - - const firstWaitPromise = locks.waitForLock(testPath).then(() => { - firstResolved = true; + test("should execute async function with single key lock", async () => { + let executionCount = 0; + const result = await locks.withLock(testPath, async () => { + executionCount++; + await new Promise(resolve => setTimeout(resolve, 10)); + return "async-success"; }); - const secondWaitPromise = locks.waitForLock(testPath).then(() => { - secondResolved = true; - }); - - const thirdWaitPromise = locks.waitForLock(testPath).then(() => { - thirdResolved = true; - }); - - locks.unlock(testPath); - await firstWaitPromise; - expect(firstResolved).toBe(true); - expect(secondResolved).toBe(false); - expect(thirdResolved).toBe(false); - - locks.unlock(testPath); - await secondWaitPromise; - expect(secondResolved).toBe(true); - expect(thirdResolved).toBe(false); - - locks.unlock(testPath); - await thirdWaitPromise; - expect(thirdResolved).toBe(true); + expect(result).toBe("async-success"); + expect(executionCount).toBe(1); }); -}); + + test("should execute function with multiple key locks", async () => { + let executionCount = 0; + const result = await locks.withLock([testPath, testPath2], () => { + executionCount++; + return "multi-success"; + }); + + expect(result).toBe("multi-success"); + expect(executionCount).toBe(1); + }); + + test("should sort multiple keys to prevent deadlocks", async () => { + const executionOrder: string[] = []; + + // Start two concurrent operations with keys in different orders + const promise1 = locks.withLock([testPath2, testPath], async () => { + executionOrder.push("operation1-start"); + await new Promise(resolve => setTimeout(resolve, 50)); + executionOrder.push("operation1-end"); + return "result1"; + }); + + const promise2 = locks.withLock([testPath, testPath2], async () => { + executionOrder.push("operation2-start"); + await new Promise(resolve => setTimeout(resolve, 50)); + executionOrder.push("operation2-end"); + return "result2"; + }); + + const [result1, result2] = await Promise.all([promise1, promise2]); + + expect(result1).toBe("result1"); + expect(result2).toBe("result2"); + // One operation should complete entirely before the other starts + expect(executionOrder).toEqual([ + "operation1-start", + "operation1-end", + "operation2-start", + "operation2-end" + ]); + }); + + test("should serialize access to same key", async () => { + const executionOrder: string[] = []; + + const promise1 = locks.withLock(testPath, async () => { + executionOrder.push("operation1-start"); + await new Promise(resolve => setTimeout(resolve, 50)); + executionOrder.push("operation1-end"); + return "result1"; + }); + + const promise2 = locks.withLock(testPath, async () => { + executionOrder.push("operation2-start"); + await new Promise(resolve => setTimeout(resolve, 30)); + executionOrder.push("operation2-end"); + return "result2"; + }); + + const [result1, result2] = await Promise.all([promise1, promise2]); + + expect(result1).toBe("result1"); + expect(result2).toBe("result2"); + expect(executionOrder).toEqual([ + "operation1-start", + "operation1-end", + "operation2-start", + "operation2-end" + ]); + }); + + test("should allow concurrent access to different keys", async () => { + const executionOrder: string[] = []; + + const promise1 = locks.withLock(testPath, async () => { + executionOrder.push("operation1-start"); + await new Promise(resolve => setTimeout(resolve, 50)); + executionOrder.push("operation1-end"); + return "result1"; + }); + + const promise2 = locks.withLock(testPath2, async () => { + executionOrder.push("operation2-start"); + await new Promise(resolve => setTimeout(resolve, 30)); + executionOrder.push("operation2-end"); + return "result2"; + }); + + const [result1, result2] = await Promise.all([promise1, promise2]); + + expect(result1).toBe("result1"); + expect(result2).toBe("result2"); + // Both operations should run concurrently + expect(executionOrder[0]).toBe("operation1-start"); + expect(executionOrder[1]).toBe("operation2-start"); + }); + + test("should release locks even if function throws", async () => { + const error = new Error("test error"); + + await expect(locks.withLock(testPath, () => { + throw error; + })).rejects.toThrow("test error"); + + // Lock should be released, allowing another operation + const result = await locks.withLock(testPath, () => "success-after-error"); + expect(result).toBe("success-after-error"); + }); + + test("should release locks even if async function throws", async () => { + const error = new Error("async test error"); + + await expect(locks.withLock(testPath, async () => { + await new Promise(resolve => setTimeout(resolve, 10)); + throw error; + })).rejects.toThrow("async test error"); + + // Lock should be released, allowing another operation + const result = await locks.withLock(testPath, () => "success-after-async-error"); + expect(result).toBe("success-after-async-error"); + }); + + test("should handle empty array of keys", async () => { + const result = await locks.withLock([], () => "empty-keys"); + expect(result).toBe("empty-keys"); + }); + + test("should maintain FIFO order for multiple waiters", async () => { + const executionOrder: string[] = []; + + // Start first operation that holds the lock + const firstPromise = locks.withLock(testPath, async () => { + executionOrder.push("first-start"); + await new Promise(resolve => setTimeout(resolve, 100)); + executionOrder.push("first-end"); + return "first"; + }); + + // Small delay to ensure first operation starts + await new Promise(resolve => setTimeout(resolve, 10)); + + // Queue second and third operations + const secondPromise = locks.withLock(testPath, async () => { + executionOrder.push("second-start"); + await new Promise(resolve => setTimeout(resolve, 30)); + executionOrder.push("second-end"); + return "second"; + }); + + const thirdPromise = locks.withLock(testPath, async () => { + executionOrder.push("third-start"); + await new Promise(resolve => setTimeout(resolve, 20)); + executionOrder.push("third-end"); + return "third"; + }); + + const [first, second, third] = await Promise.all([firstPromise, secondPromise, thirdPromise]); + + expect(first).toBe("first"); + expect(second).toBe("second"); + expect(third).toBe("third"); + expect(executionOrder).toEqual([ + "first-start", + "first-end", + "second-start", + "second-end", + "third-start", + "third-end" + ]); + }); +}); \ No newline at end of file