Add local prediction for remote cursor updates
This commit is contained in:
parent
b7e80c39f1
commit
e73f147fbc
6 changed files with 207 additions and 40 deletions
|
|
@ -11,7 +11,7 @@ import { HistoryView } from "./views/history/history-view";
|
||||||
import { StatusBar } from "./views/status-bar/status-bar";
|
import { StatusBar } from "./views/status-bar/status-bar";
|
||||||
import { LogsView } from "./views/logs/logs-view";
|
import { LogsView } from "./views/logs/logs-view";
|
||||||
import { StatusDescription } from "./views/status-description/status-description";
|
import { StatusDescription } from "./views/status-description/status-description";
|
||||||
import { SyncClient, rateLimit, DEFAULT_SETTINGS } from "sync-client";
|
import { SyncClient, rateLimit, DEFAULT_SETTINGS, Logger } from "sync-client";
|
||||||
import { ObsidianFileSystemOperations } from "./obsidian-file-system";
|
import { ObsidianFileSystemOperations } from "./obsidian-file-system";
|
||||||
import { SyncSettingsTab } from "./views/settings/settings-tab";
|
import { SyncSettingsTab } from "./views/settings/settings-tab";
|
||||||
import { logToConsole } from "./utils/log-to-console";
|
import { logToConsole } from "./utils/log-to-console";
|
||||||
|
|
@ -26,6 +26,7 @@ import { slowFetchFactory } from "./debugging/slow-fetch-factory";
|
||||||
import { flakyWebSocketFactory } from "./debugging/flaky-websocket-factory";
|
import { flakyWebSocketFactory } from "./debugging/flaky-websocket-factory";
|
||||||
|
|
||||||
const MIN_WAIT_BETWEEN_UPDATES_IN_MS = 250;
|
const MIN_WAIT_BETWEEN_UPDATES_IN_MS = 250;
|
||||||
|
|
||||||
export default class VaultLinkPlugin extends Plugin {
|
export default class VaultLinkPlugin extends Plugin {
|
||||||
private readonly disposables: (() => unknown)[] = [];
|
private readonly disposables: (() => unknown)[] = [];
|
||||||
|
|
||||||
|
|
@ -61,7 +62,8 @@ export default class VaultLinkPlugin extends Plugin {
|
||||||
load: this.loadData.bind(this),
|
load: this.loadData.bind(this),
|
||||||
save: this.saveData.bind(this)
|
save: this.saveData.bind(this)
|
||||||
},
|
},
|
||||||
nativeLineEndings: Platform.isWin ? "\r\n" : "\n"
|
nativeLineEndings: Platform.isWin ? "\r\n" : "\n",
|
||||||
|
...debugOptions
|
||||||
});
|
});
|
||||||
|
|
||||||
logToConsole(this.client);
|
logToConsole(this.client);
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,6 @@ import { getSelectionsFromEditor } from "./get-selections-from-editor";
|
||||||
export class LocalCursorUpdateListener {
|
export class LocalCursorUpdateListener {
|
||||||
private static readonly UPDATE_INTERVAL_MS = 50;
|
private static readonly UPDATE_INTERVAL_MS = 50;
|
||||||
private readonly eventHandle: NodeJS.Timeout;
|
private readonly eventHandle: NodeJS.Timeout;
|
||||||
private lastCursorState: Record<string, Selection[]> = {};
|
|
||||||
|
|
||||||
public constructor(
|
public constructor(
|
||||||
private readonly client: SyncClient,
|
private readonly client: SyncClient,
|
||||||
|
|
@ -24,13 +23,6 @@ export class LocalCursorUpdateListener {
|
||||||
|
|
||||||
private updateAllSelections(): void {
|
private updateAllSelections(): void {
|
||||||
const currentCursors = this.getAllSelections();
|
const currentCursors = this.getAllSelections();
|
||||||
if (
|
|
||||||
JSON.stringify(this.lastCursorState) ===
|
|
||||||
JSON.stringify(currentCursors)
|
|
||||||
) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
this.lastCursorState = currentCursors;
|
|
||||||
this.client
|
this.client
|
||||||
.updateLocalCursors(currentCursors)
|
.updateLocalCursors(currentCursors)
|
||||||
.catch((error: unknown) => {
|
.catch((error: unknown) => {
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
import type { Range } from "@codemirror/state";
|
import type { Range } from "@codemirror/state";
|
||||||
import { RangeSet, Annotation, AnnotationType } from "@codemirror/state";
|
import { RangeSet } from "@codemirror/state";
|
||||||
import { ViewPlugin, Decoration, WidgetType } from "@codemirror/view";
|
import { ViewPlugin, Decoration } from "@codemirror/view";
|
||||||
|
|
||||||
import type {
|
import type {
|
||||||
PluginValue,
|
PluginValue,
|
||||||
|
|
@ -9,7 +9,10 @@ import type {
|
||||||
ViewUpdate
|
ViewUpdate
|
||||||
} from "@codemirror/view";
|
} from "@codemirror/view";
|
||||||
import { RemoteCursorWidget } from "./remote-cursor-widget";
|
import { RemoteCursorWidget } from "./remote-cursor-widget";
|
||||||
import type { ClientCursors, CursorSpan } from "sync-client";
|
import type {
|
||||||
|
CursorSpan,
|
||||||
|
DocumentWithMaybeOutdatedClientCursors
|
||||||
|
} from "sync-client";
|
||||||
import type { App } from "obsidian";
|
import type { App } from "obsidian";
|
||||||
import { MarkdownView } from "obsidian";
|
import { MarkdownView } from "obsidian";
|
||||||
|
|
||||||
|
|
@ -17,10 +20,12 @@ let cursors: {
|
||||||
name: string;
|
name: string;
|
||||||
path: string;
|
path: string;
|
||||||
span: CursorSpan;
|
span: CursorSpan;
|
||||||
|
deviceId: string;
|
||||||
}[] = [];
|
}[] = [];
|
||||||
|
|
||||||
import { StateEffect } from "@codemirror/state";
|
import { StateEffect } from "@codemirror/state";
|
||||||
import { getRandomColor } from "src/utils/get-random-color";
|
import { getRandomColor } from "src/utils/get-random-color";
|
||||||
|
import { updateSelection } from "./update-selection";
|
||||||
|
|
||||||
const forceUpdate = StateEffect.define();
|
const forceUpdate = StateEffect.define();
|
||||||
|
|
||||||
|
|
@ -28,6 +33,17 @@ export class RemoteCursorsPluginValue implements PluginValue {
|
||||||
public decorations: DecorationSet = RangeSet.of([]);
|
public decorations: DecorationSet = RangeSet.of([]);
|
||||||
|
|
||||||
public update(update: ViewUpdate): void {
|
public update(update: ViewUpdate): void {
|
||||||
|
update.changes.iterChanges((fromA, toA, fromB, toB, _inserted) => {
|
||||||
|
const spans = cursors.map((cursor) => cursor.span);
|
||||||
|
updateSelection({
|
||||||
|
fromA,
|
||||||
|
toA,
|
||||||
|
fromB,
|
||||||
|
toB,
|
||||||
|
spans
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
const decorations: Range<Decoration>[] = [];
|
const decorations: Range<Decoration>[] = [];
|
||||||
|
|
||||||
cursors.forEach(({ name, span: { start, end } }) => {
|
cursors.forEach(({ name, span: { start, end } }) => {
|
||||||
|
|
@ -103,20 +119,30 @@ export const remoteCursorsPlugin = ViewPlugin.fromClass(
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
export function setCursors(clients: ClientCursors[], app: App): void {
|
export function setCursors(
|
||||||
cursors = clients.flatMap((client) => {
|
clients: DocumentWithMaybeOutdatedClientCursors[],
|
||||||
const clientCursors = client.cursors;
|
app: App
|
||||||
return Object.keys(clientCursors).flatMap((path) => {
|
): void {
|
||||||
const spans = clientCursors[path];
|
cursors = [
|
||||||
return spans
|
...cursors.filter(({ deviceId }) =>
|
||||||
? spans.map((span) => ({
|
clients.some(
|
||||||
|
(client) => client.deviceId === deviceId && client.isOutdated
|
||||||
|
)
|
||||||
|
),
|
||||||
|
...clients
|
||||||
|
.filter(({ isOutdated }) => !isOutdated)
|
||||||
|
.flatMap((client) => {
|
||||||
|
const clientCursors = client.documentsWithCursors;
|
||||||
|
return clientCursors.flatMap((cursor) =>
|
||||||
|
cursor.cursors.map((span) => ({
|
||||||
name: client.userName,
|
name: client.userName,
|
||||||
path,
|
path: cursor.relative_path,
|
||||||
|
deviceId: client.deviceId,
|
||||||
span
|
span
|
||||||
}))
|
}))
|
||||||
: [];
|
);
|
||||||
});
|
})
|
||||||
});
|
];
|
||||||
|
|
||||||
app.workspace
|
app.workspace
|
||||||
.getLeavesOfType("markdown")
|
.getLeavesOfType("markdown")
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,7 @@ export type { PersistenceProvider } from "./persistence/persistence";
|
||||||
export type { CursorSpan } from "./services/types/CursorSpan";
|
export type { CursorSpan } from "./services/types/CursorSpan";
|
||||||
export type { ClientCursors } from "./services/types/ClientCursors";
|
export type { ClientCursors } from "./services/types/ClientCursors";
|
||||||
export type { NetworkConnectionStatus } from "./types/network-connection-status";
|
export type { NetworkConnectionStatus } from "./types/network-connection-status";
|
||||||
|
export type { DocumentWithMaybeOutdatedClientCursors } from "./types/maybe-outdated-client-cursors";
|
||||||
export { DocumentUpdateStatus } from "./types/document-update-status";
|
export { DocumentUpdateStatus } from "./types/document-update-status";
|
||||||
export { SyncClient } from "./sync-client";
|
export { SyncClient } from "./sync-client";
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -19,9 +19,21 @@ import { WebSocketManager } from "./services/websocket-manager";
|
||||||
import { createClientId } from "./utils/create-client-id";
|
import { createClientId } from "./utils/create-client-id";
|
||||||
import type { CursorSpan } from "./services/types/CursorSpan";
|
import type { CursorSpan } from "./services/types/CursorSpan";
|
||||||
import type { ClientCursors } from "./services/types/ClientCursors";
|
import type { ClientCursors } from "./services/types/ClientCursors";
|
||||||
|
import type { DocumentWithCursors } from "./services/types/DocumentWithCursors";
|
||||||
|
import { hash } from "./utils/hash";
|
||||||
|
import type { DocumentWithMaybeOutdatedClientCursors } from "./types/maybe-outdated-client-cursors";
|
||||||
|
|
||||||
|
enum DocumentUpToDateness {
|
||||||
|
UpToDate = "UpToDate",
|
||||||
|
Prior = "Prior",
|
||||||
|
Later = "Later"
|
||||||
|
}
|
||||||
|
|
||||||
export class SyncClient {
|
export class SyncClient {
|
||||||
private static readonly MINIMUM_SAVE_INTERVAL_MS = 1000;
|
private static readonly MINIMUM_SAVE_INTERVAL_MS = 1000;
|
||||||
|
private lastCursorState: DocumentWithCursors[] = [];
|
||||||
|
|
||||||
|
private readonly knownClientCursors: ClientCursors[] = [];
|
||||||
|
|
||||||
// eslint-disable-next-line @typescript-eslint/max-params
|
// eslint-disable-next-line @typescript-eslint/max-params
|
||||||
private constructor(
|
private constructor(
|
||||||
|
|
@ -32,7 +44,8 @@ export class SyncClient {
|
||||||
private readonly syncService: SyncService,
|
private readonly syncService: SyncService,
|
||||||
private readonly webSocketManager: WebSocketManager,
|
private readonly webSocketManager: WebSocketManager,
|
||||||
private readonly _logger: Logger,
|
private readonly _logger: Logger,
|
||||||
private readonly connectionStatus: ConnectionStatus
|
private readonly connectionStatus: ConnectionStatus,
|
||||||
|
private readonly fileOperations: FileOperations
|
||||||
) {
|
) {
|
||||||
this.settings.addOnSettingsChangeListener(
|
this.settings.addOnSettingsChangeListener(
|
||||||
(newSettings, oldSettings) => {
|
(newSettings, oldSettings) => {
|
||||||
|
|
@ -41,6 +54,10 @@ export class SyncClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
|
this.webSocketManager.addRemoteCursorsUpdateListener((cursors) => {
|
||||||
|
this.knownClientCursors.push(...cursors);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public get logger(): Logger {
|
public get logger(): Logger {
|
||||||
|
|
@ -157,7 +174,8 @@ export class SyncClient {
|
||||||
syncService,
|
syncService,
|
||||||
webSocketManager,
|
webSocketManager,
|
||||||
logger,
|
logger,
|
||||||
connectionStatus
|
connectionStatus,
|
||||||
|
fileOperations
|
||||||
);
|
);
|
||||||
|
|
||||||
logger.info("SyncClient initialised");
|
logger.info("SyncClient initialised");
|
||||||
|
|
@ -268,18 +286,6 @@ export class SyncClient {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public async updateLocalCursors(
|
|
||||||
documentToCursors: Record<RelativePath, CursorSpan[]>
|
|
||||||
): Promise<void> {
|
|
||||||
this.webSocketManager.updateLocalCursors({ documentToCursors });
|
|
||||||
}
|
|
||||||
|
|
||||||
public addRemoteCursorsUpdateListener(
|
|
||||||
listener: (cursors: ClientCursors[]) => void
|
|
||||||
): void {
|
|
||||||
this.webSocketManager.addRemoteCursorsUpdateListener(listener);
|
|
||||||
}
|
|
||||||
|
|
||||||
public getDocumentSyncingStatus(
|
public getDocumentSyncingStatus(
|
||||||
relativePath: RelativePath
|
relativePath: RelativePath
|
||||||
): DocumentUpdateStatus {
|
): DocumentUpdateStatus {
|
||||||
|
|
@ -292,4 +298,144 @@ export class SyncClient {
|
||||||
? DocumentUpdateStatus.SYNCING
|
? DocumentUpdateStatus.SYNCING
|
||||||
: DocumentUpdateStatus.UP_TO_DATE;
|
: DocumentUpdateStatus.UP_TO_DATE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Update the local cursors for the given documents.
|
||||||
|
/// Can be called frequently as it only emits an event
|
||||||
|
// if the state has actually changed.
|
||||||
|
public async updateLocalCursors(
|
||||||
|
documentToCursors: Record<RelativePath, CursorSpan[]>
|
||||||
|
): Promise<void> {
|
||||||
|
const documentsWithCursors: DocumentWithCursors[] = [];
|
||||||
|
|
||||||
|
for (const [relativePath, cursors] of Object.entries(
|
||||||
|
documentToCursors
|
||||||
|
)) {
|
||||||
|
const record =
|
||||||
|
this.database.getLatestDocumentByRelativePath(relativePath);
|
||||||
|
|
||||||
|
if (!record) {
|
||||||
|
continue; // Let's wait for the file to be created before sending cursors
|
||||||
|
}
|
||||||
|
|
||||||
|
const readContent = await this.fileOperations.read(relativePath);
|
||||||
|
|
||||||
|
if (record.metadata?.hash !== hash(readContent)) {
|
||||||
|
continue; // Wouldn't make sense to sync the positions in a dirty file
|
||||||
|
}
|
||||||
|
|
||||||
|
documentsWithCursors.push({
|
||||||
|
relative_path: relativePath,
|
||||||
|
document_id: record.documentId,
|
||||||
|
vault_update_id: record.metadata.parentVersionId,
|
||||||
|
cursors
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
if (
|
||||||
|
JSON.stringify(this.lastCursorState) ===
|
||||||
|
JSON.stringify(documentsWithCursors)
|
||||||
|
) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.lastCursorState = documentsWithCursors;
|
||||||
|
|
||||||
|
this.webSocketManager.updateLocalCursors({ documentsWithCursors });
|
||||||
|
}
|
||||||
|
|
||||||
|
public addRemoteCursorsUpdateListener(
|
||||||
|
listener: (cursors: DocumentWithMaybeOutdatedClientCursors[]) => void
|
||||||
|
): void {
|
||||||
|
this.webSocketManager.addRemoteCursorsUpdateListener(async () => {
|
||||||
|
listener(await this.getRelevantClientCursors());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private async getRelevantClientCursors(): Promise<
|
||||||
|
DocumentWithMaybeOutdatedClientCursors[]
|
||||||
|
> {
|
||||||
|
const result: DocumentWithMaybeOutdatedClientCursors[] = [];
|
||||||
|
const included = new Set<string>();
|
||||||
|
for (const clientCursors of [...this.knownClientCursors].reverse()) {
|
||||||
|
if (included.has(clientCursors.deviceId)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const upToDateness =
|
||||||
|
await this.getDocumentsUpToDateness(clientCursors);
|
||||||
|
if (upToDateness == DocumentUpToDateness.Later) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
result.push({
|
||||||
|
...clientCursors,
|
||||||
|
isOutdated: upToDateness == DocumentUpToDateness.Prior
|
||||||
|
});
|
||||||
|
|
||||||
|
included.add(clientCursors.deviceId);
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async getDocumentsUpToDateness(
|
||||||
|
clientCursor: ClientCursors
|
||||||
|
): Promise<DocumentUpToDateness> {
|
||||||
|
const results = [];
|
||||||
|
for (const document of clientCursor.documentsWithCursors) {
|
||||||
|
results.push(await this.getDocumentUpToDateness(document));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (
|
||||||
|
results.every((result) => result === DocumentUpToDateness.UpToDate)
|
||||||
|
) {
|
||||||
|
return DocumentUpToDateness.UpToDate;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (
|
||||||
|
results.every(
|
||||||
|
(result) =>
|
||||||
|
result === DocumentUpToDateness.UpToDate ||
|
||||||
|
result === DocumentUpToDateness.Prior
|
||||||
|
)
|
||||||
|
) {
|
||||||
|
return DocumentUpToDateness.Prior;
|
||||||
|
}
|
||||||
|
|
||||||
|
return DocumentUpToDateness.Later;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async getDocumentUpToDateness(
|
||||||
|
document: DocumentWithCursors
|
||||||
|
): Promise<DocumentUpToDateness> {
|
||||||
|
const record = this.database.getLatestDocumentByRelativePath(
|
||||||
|
document.relative_path
|
||||||
|
);
|
||||||
|
|
||||||
|
if (!record) {
|
||||||
|
// the document of the cursor must be from the future
|
||||||
|
return DocumentUpToDateness.Later;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (
|
||||||
|
(record.metadata?.parentVersionId ?? 0) < document.vault_update_id
|
||||||
|
) {
|
||||||
|
return DocumentUpToDateness.Later;
|
||||||
|
} else if (
|
||||||
|
document.vault_update_id < (record.metadata?.parentVersionId ?? 0)
|
||||||
|
) {
|
||||||
|
// the document of the cursor must be from the past
|
||||||
|
return DocumentUpToDateness.Prior;
|
||||||
|
}
|
||||||
|
|
||||||
|
const currentContent = await this.fileOperations.read(
|
||||||
|
document.relative_path
|
||||||
|
);
|
||||||
|
|
||||||
|
return this.database.getLatestDocumentByRelativePath(
|
||||||
|
document.relative_path
|
||||||
|
)?.metadata?.hash === hash(currentContent)
|
||||||
|
? DocumentUpToDateness.UpToDate
|
||||||
|
: DocumentUpToDateness.Prior;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,7 @@ import { MockClient } from "./mock-client";
|
||||||
import { sleep } from "../utils/sleep";
|
import { sleep } from "../utils/sleep";
|
||||||
import type { LogLine } from "sync-client/dist/types/tracing/logger";
|
import type { LogLine } from "sync-client/dist/types/tracing/logger";
|
||||||
import { flakyFetchFactory } from "../utils/flaky-fetch";
|
import { flakyFetchFactory } from "../utils/flaky-fetch";
|
||||||
import { flakyWebSocketFactory } from "../utils/flaky-websocket";
|
import { flakyWebSocketFactory } from "../utils/flaky-websocket-factory";
|
||||||
|
|
||||||
export class MockAgent extends MockClient {
|
export class MockAgent extends MockClient {
|
||||||
private readonly writtenContents: string[] = [];
|
private readonly writtenContents: string[] = [];
|
||||||
|
|
@ -62,7 +62,7 @@ export class MockAgent extends MockClient {
|
||||||
console.error(formatted);
|
console.error(formatted);
|
||||||
|
|
||||||
if (!this.useSlowFileEvents) {
|
if (!this.useSlowFileEvents) {
|
||||||
// Let's not ignore errors
|
// Let's wait for the error to be caught if there was one
|
||||||
// eslint-disable-next-line @typescript-eslint/no-floating-promises
|
// eslint-disable-next-line @typescript-eslint/no-floating-promises
|
||||||
sleep(100).then(() => process.exit(1));
|
sleep(100).then(() => process.exit(1));
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue