diff --git a/frontend/obsidian-plugin/src/vault-link-plugin.ts b/frontend/obsidian-plugin/src/vault-link-plugin.ts index 0f3e5fb8..50048e83 100644 --- a/frontend/obsidian-plugin/src/vault-link-plugin.ts +++ b/frontend/obsidian-plugin/src/vault-link-plugin.ts @@ -11,7 +11,7 @@ import { HistoryView } from "./views/history/history-view"; import { StatusBar } from "./views/status-bar/status-bar"; import { LogsView } from "./views/logs/logs-view"; import { StatusDescription } from "./views/status-description/status-description"; -import { SyncClient, rateLimit, DEFAULT_SETTINGS } from "sync-client"; +import { SyncClient, rateLimit, DEFAULT_SETTINGS, Logger } from "sync-client"; import { ObsidianFileSystemOperations } from "./obsidian-file-system"; import { SyncSettingsTab } from "./views/settings/settings-tab"; import { logToConsole } from "./utils/log-to-console"; @@ -26,6 +26,7 @@ import { slowFetchFactory } from "./debugging/slow-fetch-factory"; import { flakyWebSocketFactory } from "./debugging/flaky-websocket-factory"; const MIN_WAIT_BETWEEN_UPDATES_IN_MS = 250; + export default class VaultLinkPlugin extends Plugin { private readonly disposables: (() => unknown)[] = []; @@ -61,7 +62,8 @@ export default class VaultLinkPlugin extends Plugin { load: this.loadData.bind(this), save: this.saveData.bind(this) }, - nativeLineEndings: Platform.isWin ? "\r\n" : "\n" + nativeLineEndings: Platform.isWin ? "\r\n" : "\n", + ...debugOptions }); logToConsole(this.client); diff --git a/frontend/obsidian-plugin/src/views/cursors/local-cursor-update-listener.ts b/frontend/obsidian-plugin/src/views/cursors/local-cursor-update-listener.ts index 883a92ea..da67c70d 100644 --- a/frontend/obsidian-plugin/src/views/cursors/local-cursor-update-listener.ts +++ b/frontend/obsidian-plugin/src/views/cursors/local-cursor-update-listener.ts @@ -7,7 +7,6 @@ import { getSelectionsFromEditor } from "./get-selections-from-editor"; export class LocalCursorUpdateListener { private static readonly UPDATE_INTERVAL_MS = 50; private readonly eventHandle: NodeJS.Timeout; - private lastCursorState: Record = {}; public constructor( private readonly client: SyncClient, @@ -24,13 +23,6 @@ export class LocalCursorUpdateListener { private updateAllSelections(): void { const currentCursors = this.getAllSelections(); - if ( - JSON.stringify(this.lastCursorState) === - JSON.stringify(currentCursors) - ) { - return; - } - this.lastCursorState = currentCursors; this.client .updateLocalCursors(currentCursors) .catch((error: unknown) => { diff --git a/frontend/obsidian-plugin/src/views/cursors/remote-cursors-plugin.ts b/frontend/obsidian-plugin/src/views/cursors/remote-cursors-plugin.ts index e7797d1a..661aa452 100644 --- a/frontend/obsidian-plugin/src/views/cursors/remote-cursors-plugin.ts +++ b/frontend/obsidian-plugin/src/views/cursors/remote-cursors-plugin.ts @@ -1,6 +1,6 @@ import type { Range } from "@codemirror/state"; -import { RangeSet, Annotation, AnnotationType } from "@codemirror/state"; -import { ViewPlugin, Decoration, WidgetType } from "@codemirror/view"; +import { RangeSet } from "@codemirror/state"; +import { ViewPlugin, Decoration } from "@codemirror/view"; import type { PluginValue, @@ -9,7 +9,10 @@ import type { ViewUpdate } from "@codemirror/view"; import { RemoteCursorWidget } from "./remote-cursor-widget"; -import type { ClientCursors, CursorSpan } from "sync-client"; +import type { + CursorSpan, + DocumentWithMaybeOutdatedClientCursors +} from "sync-client"; import type { App } from "obsidian"; import { MarkdownView } from "obsidian"; @@ -17,10 +20,12 @@ let cursors: { name: string; path: string; span: CursorSpan; + deviceId: string; }[] = []; import { StateEffect } from "@codemirror/state"; import { getRandomColor } from "src/utils/get-random-color"; +import { updateSelection } from "./update-selection"; const forceUpdate = StateEffect.define(); @@ -28,6 +33,17 @@ export class RemoteCursorsPluginValue implements PluginValue { public decorations: DecorationSet = RangeSet.of([]); public update(update: ViewUpdate): void { + update.changes.iterChanges((fromA, toA, fromB, toB, _inserted) => { + const spans = cursors.map((cursor) => cursor.span); + updateSelection({ + fromA, + toA, + fromB, + toB, + spans + }); + }); + const decorations: Range[] = []; cursors.forEach(({ name, span: { start, end } }) => { @@ -103,20 +119,30 @@ export const remoteCursorsPlugin = ViewPlugin.fromClass( } ); -export function setCursors(clients: ClientCursors[], app: App): void { - cursors = clients.flatMap((client) => { - const clientCursors = client.cursors; - return Object.keys(clientCursors).flatMap((path) => { - const spans = clientCursors[path]; - return spans - ? spans.map((span) => ({ +export function setCursors( + clients: DocumentWithMaybeOutdatedClientCursors[], + app: App +): void { + cursors = [ + ...cursors.filter(({ deviceId }) => + clients.some( + (client) => client.deviceId === deviceId && client.isOutdated + ) + ), + ...clients + .filter(({ isOutdated }) => !isOutdated) + .flatMap((client) => { + const clientCursors = client.documentsWithCursors; + return clientCursors.flatMap((cursor) => + cursor.cursors.map((span) => ({ name: client.userName, - path, + path: cursor.relative_path, + deviceId: client.deviceId, span })) - : []; - }); - }); + ); + }) + ]; app.workspace .getLeavesOfType("markdown") diff --git a/frontend/sync-client/src/index.ts b/frontend/sync-client/src/index.ts index 4a5f5d1e..ce903f25 100644 --- a/frontend/sync-client/src/index.ts +++ b/frontend/sync-client/src/index.ts @@ -18,6 +18,7 @@ export type { PersistenceProvider } from "./persistence/persistence"; export type { CursorSpan } from "./services/types/CursorSpan"; export type { ClientCursors } from "./services/types/ClientCursors"; export type { NetworkConnectionStatus } from "./types/network-connection-status"; +export type { DocumentWithMaybeOutdatedClientCursors } from "./types/maybe-outdated-client-cursors"; export { DocumentUpdateStatus } from "./types/document-update-status"; export { SyncClient } from "./sync-client"; diff --git a/frontend/sync-client/src/sync-client.ts b/frontend/sync-client/src/sync-client.ts index 41ab6781..972d2cd3 100644 --- a/frontend/sync-client/src/sync-client.ts +++ b/frontend/sync-client/src/sync-client.ts @@ -19,9 +19,21 @@ import { WebSocketManager } from "./services/websocket-manager"; import { createClientId } from "./utils/create-client-id"; import type { CursorSpan } from "./services/types/CursorSpan"; import type { ClientCursors } from "./services/types/ClientCursors"; +import type { DocumentWithCursors } from "./services/types/DocumentWithCursors"; +import { hash } from "./utils/hash"; +import type { DocumentWithMaybeOutdatedClientCursors } from "./types/maybe-outdated-client-cursors"; + +enum DocumentUpToDateness { + UpToDate = "UpToDate", + Prior = "Prior", + Later = "Later" +} export class SyncClient { private static readonly MINIMUM_SAVE_INTERVAL_MS = 1000; + private lastCursorState: DocumentWithCursors[] = []; + + private readonly knownClientCursors: ClientCursors[] = []; // eslint-disable-next-line @typescript-eslint/max-params private constructor( @@ -32,7 +44,8 @@ export class SyncClient { private readonly syncService: SyncService, private readonly webSocketManager: WebSocketManager, private readonly _logger: Logger, - private readonly connectionStatus: ConnectionStatus + private readonly connectionStatus: ConnectionStatus, + private readonly fileOperations: FileOperations ) { this.settings.addOnSettingsChangeListener( (newSettings, oldSettings) => { @@ -41,6 +54,10 @@ export class SyncClient { } } ); + + this.webSocketManager.addRemoteCursorsUpdateListener((cursors) => { + this.knownClientCursors.push(...cursors); + }); } public get logger(): Logger { @@ -157,7 +174,8 @@ export class SyncClient { syncService, webSocketManager, logger, - connectionStatus + connectionStatus, + fileOperations ); logger.info("SyncClient initialised"); @@ -268,18 +286,6 @@ export class SyncClient { }); } - public async updateLocalCursors( - documentToCursors: Record - ): Promise { - this.webSocketManager.updateLocalCursors({ documentToCursors }); - } - - public addRemoteCursorsUpdateListener( - listener: (cursors: ClientCursors[]) => void - ): void { - this.webSocketManager.addRemoteCursorsUpdateListener(listener); - } - public getDocumentSyncingStatus( relativePath: RelativePath ): DocumentUpdateStatus { @@ -292,4 +298,144 @@ export class SyncClient { ? DocumentUpdateStatus.SYNCING : DocumentUpdateStatus.UP_TO_DATE; } + + /// Update the local cursors for the given documents. + /// Can be called frequently as it only emits an event + // if the state has actually changed. + public async updateLocalCursors( + documentToCursors: Record + ): Promise { + const documentsWithCursors: DocumentWithCursors[] = []; + + for (const [relativePath, cursors] of Object.entries( + documentToCursors + )) { + const record = + this.database.getLatestDocumentByRelativePath(relativePath); + + if (!record) { + continue; // Let's wait for the file to be created before sending cursors + } + + const readContent = await this.fileOperations.read(relativePath); + + if (record.metadata?.hash !== hash(readContent)) { + continue; // Wouldn't make sense to sync the positions in a dirty file + } + + documentsWithCursors.push({ + relative_path: relativePath, + document_id: record.documentId, + vault_update_id: record.metadata.parentVersionId, + cursors + }); + } + + if ( + JSON.stringify(this.lastCursorState) === + JSON.stringify(documentsWithCursors) + ) { + return; + } + + this.lastCursorState = documentsWithCursors; + + this.webSocketManager.updateLocalCursors({ documentsWithCursors }); + } + + public addRemoteCursorsUpdateListener( + listener: (cursors: DocumentWithMaybeOutdatedClientCursors[]) => void + ): void { + this.webSocketManager.addRemoteCursorsUpdateListener(async () => { + listener(await this.getRelevantClientCursors()); + }); + } + + private async getRelevantClientCursors(): Promise< + DocumentWithMaybeOutdatedClientCursors[] + > { + const result: DocumentWithMaybeOutdatedClientCursors[] = []; + const included = new Set(); + for (const clientCursors of [...this.knownClientCursors].reverse()) { + if (included.has(clientCursors.deviceId)) { + continue; + } + + const upToDateness = + await this.getDocumentsUpToDateness(clientCursors); + if (upToDateness == DocumentUpToDateness.Later) { + continue; + } + + result.push({ + ...clientCursors, + isOutdated: upToDateness == DocumentUpToDateness.Prior + }); + + included.add(clientCursors.deviceId); + } + + return result; + } + + private async getDocumentsUpToDateness( + clientCursor: ClientCursors + ): Promise { + const results = []; + for (const document of clientCursor.documentsWithCursors) { + results.push(await this.getDocumentUpToDateness(document)); + } + + if ( + results.every((result) => result === DocumentUpToDateness.UpToDate) + ) { + return DocumentUpToDateness.UpToDate; + } + + if ( + results.every( + (result) => + result === DocumentUpToDateness.UpToDate || + result === DocumentUpToDateness.Prior + ) + ) { + return DocumentUpToDateness.Prior; + } + + return DocumentUpToDateness.Later; + } + + private async getDocumentUpToDateness( + document: DocumentWithCursors + ): Promise { + const record = this.database.getLatestDocumentByRelativePath( + document.relative_path + ); + + if (!record) { + // the document of the cursor must be from the future + return DocumentUpToDateness.Later; + } + + if ( + (record.metadata?.parentVersionId ?? 0) < document.vault_update_id + ) { + return DocumentUpToDateness.Later; + } else if ( + document.vault_update_id < (record.metadata?.parentVersionId ?? 0) + ) { + // the document of the cursor must be from the past + return DocumentUpToDateness.Prior; + } + + const currentContent = await this.fileOperations.read( + document.relative_path + ); + + return this.database.getLatestDocumentByRelativePath( + document.relative_path + )?.metadata?.hash === hash(currentContent) + ? DocumentUpToDateness.UpToDate + : DocumentUpToDateness.Prior; + } } diff --git a/frontend/test-client/src/agent/mock-agent.ts b/frontend/test-client/src/agent/mock-agent.ts index 160a782e..b4d1a62e 100644 --- a/frontend/test-client/src/agent/mock-agent.ts +++ b/frontend/test-client/src/agent/mock-agent.ts @@ -7,7 +7,7 @@ import { MockClient } from "./mock-client"; import { sleep } from "../utils/sleep"; import type { LogLine } from "sync-client/dist/types/tracing/logger"; import { flakyFetchFactory } from "../utils/flaky-fetch"; -import { flakyWebSocketFactory } from "../utils/flaky-websocket"; +import { flakyWebSocketFactory } from "../utils/flaky-websocket-factory"; export class MockAgent extends MockClient { private readonly writtenContents: string[] = []; @@ -62,7 +62,7 @@ export class MockAgent extends MockClient { console.error(formatted); if (!this.useSlowFileEvents) { - // Let's not ignore errors + // Let's wait for the error to be caught if there was one // eslint-disable-next-line @typescript-eslint/no-floating-promises sleep(100).then(() => process.exit(1)); }