Add API for propagating cursor locations #61

Merged
schmelczer merged 30 commits from asch/show-cursors into main 2025-06-08 20:20:53 +01:00
4 changed files with 224 additions and 157 deletions
Showing only changes of commit e7c8d65b23 - Show all commits

View file

@ -8,6 +8,7 @@ export interface SyncSettings {
isSyncEnabled: boolean;
maxFileSizeMB: number;
ignorePatterns: string[];
webSocketRetryIntervalMs: number;
}
export const DEFAULT_SETTINGS: SyncSettings = {
@ -17,7 +18,8 @@ export const DEFAULT_SETTINGS: SyncSettings = {
syncConcurrency: 1,
isSyncEnabled: false,
maxFileSizeMB: 10,
ignorePatterns: []
ignorePatterns: [],
webSocketRetryIntervalMs: 3500
};
export class Settings {

View file

@ -0,0 +1,196 @@
import type { Database } from "../persistence/database";
import type { Logger } from "../tracing/logger";
import type { Settings, SyncSettings } from "../persistence/settings";
import { WebSocketServerMessage } from "./types/WebSocketServerMessage";
import { Syncer } from "../sync-operations/syncer";
import { WebSocketClientMessage } from "./types/WebSocketClientMessage";
import { CursorPositionFromClient } from "./types/CursorPositionFromClient";
export class WebSocketManager {
private readonly webSocketStatusChangeListeners: (() => unknown)[] = [];
// private readonly cur: (() => unknown)[] = [];
private refreshWebSocketInterval: NodeJS.Timeout | undefined;
private webSocket: WebSocket | undefined;
private readonly webSocketFactoryImplementation: typeof globalThis.WebSocket;
// eslint-disable-next-line @typescript-eslint/max-params
public constructor(
private readonly deviceId: string,
private readonly logger: Logger,
private readonly database: Database,
private readonly settings: Settings,
private readonly syncer: Syncer,
webSocketImplementation?: typeof globalThis.WebSocket
) {
if (webSocketImplementation) {
this.webSocketFactoryImplementation = webSocketImplementation;
} else {
if (
typeof globalThis !== "undefined" &&
typeof globalThis.WebSocket === "undefined"
) {
// eslint-disable-next-line
this.webSocketFactoryImplementation = require("ws"); // polyfill for WebSocket in Node.js
} else {
this.webSocketFactoryImplementation = WebSocket;
}
}
this.updateWebSocket(settings.getSettings());
settings.addOnSettingsChangeListener((newSettings, oldSettings) => {
if (
newSettings.remoteUri !== oldSettings.remoteUri ||
newSettings.vaultName !== oldSettings.vaultName ||
newSettings.token !== oldSettings.token ||
newSettings.isSyncEnabled !== oldSettings.isSyncEnabled
) {
this.updateWebSocket(newSettings);
}
});
this.setWebSocketRefreshInterval();
}
public get isWebSocketConnected(): boolean {
return (
this.webSocket?.readyState ===
this.webSocketFactoryImplementation.OPEN
);
}
public addWebSocketStatusChangeListener(listener: () => void): void {
this.webSocketStatusChangeListeners.push(listener);
}
public async reset(): Promise<void> {
this.setWebSocketRefreshInterval();
this.updateWebSocket(this.settings.getSettings());
}
public stop(): void {
clearInterval(this.refreshWebSocketInterval);
try {
this.webSocket?.close();
} catch (e) {
this.logger.warn(`Failed to close WebSocket: ${e}`);
}
}
private updateWebSocket(settings: SyncSettings): void {
try {
this.webSocket?.close();
} catch (e) {
this.logger.warn(`Failed to close WebSocket: ${e}`);
}
if (!settings.isSyncEnabled) {
this.webSocket = undefined;
return;
}
const wsUri = new URL(settings.remoteUri);
wsUri.protocol = wsUri.protocol === "https" ? "wss" : "ws";
wsUri.pathname = `/vaults/${settings.vaultName}/ws`;
this.logger.info(`Connecting to WebSocket at ${wsUri.toString()}`);
this.webSocket = new this.webSocketFactoryImplementation(wsUri);
this.webSocket.onmessage = async (event): Promise<void> => {
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
const message = JSON.parse(event.data) as WebSocketServerMessage;
if (message.type === "vaultUpdate") {
try {
await Promise.all(
message.documents.map(async (document) =>
this.syncer.syncRemotelyUpdatedFile(document)
)
);
if (message.isInitialSync && message.documents.length > 0) {
this.database.setLastSeenUpdateId(
message.documents
.map((document) => document.vaultUpdateId)
.reduce((a, b) => Math.max(a, b))
);
}
} catch (e) {
this.logger.error(
`Failed to sync remotely updated file: ${e}`
);
}
} else if (message.type === "cursorPositions") {
this.logger.info(
`Received cursor positions for ${JSON.stringify(message.clients)}`
);
// Handle cursor positions if needed
} else {
this.logger.warn(
`Received unknown message type: ${JSON.stringify(message)}`
);
}
};
// The JS WebSocket API doesn't support setting headers, so we have to send the token as a message
this.webSocket.onopen = (): void => {
this.logger.info("WebSocket connection opened");
this.webSocketStatusChangeListeners.forEach((listener) => {
listener();
});
let message: WebSocketClientMessage = {
type: "handshake",
deviceId: this.deviceId,
token: settings.token,
lastSeenVaultUpdateId: this.database.getLastSeenUpdateId()
};
this.webSocket?.send(JSON.stringify(message));
};
this.webSocket.onclose = (event): void => {
this.logger.warn(
`WebSocket closed with code ${event.code} (${event.reason == "" ? "unknown reason" : event.reason})`
);
this.webSocketStatusChangeListeners.forEach((listener) => {
listener();
});
};
}
public sendCursorPositions(
cursorPositions: CursorPositionFromClient
): void {
if (!this.isWebSocketConnected) {
this.logger.warn(
"WebSocket is not connected, cannot send cursor positions"
);
return;
}
let message: WebSocketClientMessage = {
type: "cursorPositions",
...cursorPositions
};
this.webSocket?.send(JSON.stringify(message));
this.logger.info(
`Sent cursor positions: ${JSON.stringify(cursorPositions)}`
);
}
private setWebSocketRefreshInterval(): void {
this.refreshWebSocketInterval = setInterval(() => {
if (
this.webSocket?.readyState ===
this.webSocketFactoryImplementation.CLOSED
) {
this.logger.info("WebSocket is closed, reconnecting...");
this.updateWebSocket(this.settings.getSettings());
}
}, this.settings.getSettings().webSocketRetryIntervalMs);
}
}

View file

@ -15,9 +15,10 @@ import { FileOperations } from "./file-operations/file-operations";
import { ConnectionStatus } from "./services/connection-status";
import { UnrestrictedSyncer } from "./sync-operations/unrestricted-syncer";
import { rateLimit } from "./utils/rate-limit";
import { v4 as uuidv4 } from "uuid";
import type { NetworkConnectionStatus } from "./types/network-connection-status";
import { DocumentUpdateStatus } from "./types/document-update-status";
import { WebSocketManager } from "./services/websocket-manager";
import { createClientId } from "./utils/create-client-id";
export class SyncClient {
private static readonly MINIMUM_SAVE_INTERVAL_MS = 1000;
@ -29,6 +30,7 @@ export class SyncClient {
private readonly database: Database,
private readonly syncer: Syncer,
private readonly syncService: SyncService,
private readonly webSocketManager: WebSocketManager,
private readonly _logger: Logger,
private readonly connectionStatus: ConnectionStatus
) {
@ -68,7 +70,10 @@ export class SyncClient {
nativeLineEndings?: string;
}): Promise<SyncClient> {
const logger = new Logger();
logger.info("Initialising SyncClient");
const deviceId = createClientId();
logger.info(`Initialising SyncClient with client id ${deviceId}`);
const history = new SyncHistory(logger);
@ -104,7 +109,6 @@ export class SyncClient {
await rateLimitedSave(state);
}
);
const deviceId = uuidv4();
const connectionStatus = new ConnectionStatus(settings, logger);
const syncService = new SyncService(
@ -121,6 +125,7 @@ export class SyncClient {
fs,
nativeLineEndings
);
const unrestrictedSyncer = new UnrestrictedSyncer(
logger,
database,
@ -129,6 +134,7 @@ export class SyncClient {
fileOperations,
history
);
const syncer = new Syncer(
deviceId,
logger,
@ -136,7 +142,15 @@ export class SyncClient {
settings,
syncService,
fileOperations,
unrestrictedSyncer,
unrestrictedSyncer
);
const webSocketManager = new WebSocketManager(
deviceId,
logger,
database,
settings,
syncer,
webSocket
);
@ -146,6 +160,7 @@ export class SyncClient {
database,
syncer,
syncService,
webSocketManager,
logger,
connectionStatus
);
@ -160,7 +175,7 @@ export class SyncClient {
return {
isSuccessful: server.isSuccessful,
serverMessage: server.message,
isWebSocketConnected: this.syncer.isWebSocketConnected
isWebSocketConnected: this.webSocketManager.isWebSocketConnected
};
}
@ -179,7 +194,7 @@ export class SyncClient {
}
public stop(): void {
this.syncer.stop();
this.webSocketManager.stop();
}
public async waitAndStop(): Promise<void> {
@ -194,6 +209,7 @@ export class SyncClient {
this.stop();
this.connectionStatus.startReset();
await this.syncer.reset();
await this.webSocketManager.reset();
this.history.reset();
this.database.reset();
this._logger.reset();
@ -229,7 +245,7 @@ export class SyncClient {
}
public addWebSocketStatusChangeListener(listener: () => void): void {
this.syncer.addWebSocketStatusChangeListener(listener);
this.webSocketManager.addWebSocketStatusChangeListener(listener);
}
public async syncLocallyCreatedFile(

View file

@ -18,26 +18,14 @@ import { createPromise } from "../utils/create-promise";
import { SyncResetError } from "../services/sync-reset-error";
import { Locks } from "../utils/locks";
interface WebsocketVaultUpdate {
documents: components["schemas"]["DocumentVersionWithoutContent"][];
isInitialSync: boolean;
}
export class Syncer {
private readonly remoteDocumentsLock: Locks<DocumentId>;
private readonly remainingOperationsListeners: ((
remainingOperations: number
) => void)[] = [];
private readonly webSocketStatusChangeListeners: (() => void)[] = [];
private readonly syncQueue: PQueue;
private runningScheduleSyncForOfflineChanges: Promise<void> | undefined;
private refreshApplyRemoteChangesWebSocketInterval:
| NodeJS.Timeout
| undefined;
private applyRemoteChangesWebSocket: WebSocket | undefined;
private readonly webSocketImplementation: typeof globalThis.WebSocket;
// eslint-disable-next-line @typescript-eslint/max-params
public constructor(
@ -47,41 +35,15 @@ export class Syncer {
private readonly settings: Settings,
private readonly syncService: SyncService,
private readonly operations: FileOperations,
private readonly internalSyncer: UnrestrictedSyncer,
webSocketImplementation?: typeof globalThis.WebSocket
private readonly internalSyncer: UnrestrictedSyncer
) {
this.syncQueue = new PQueue({
concurrency: settings.getSettings().syncConcurrency
});
if (webSocketImplementation) {
this.webSocketImplementation = webSocketImplementation;
} else {
if (
typeof globalThis !== "undefined" &&
typeof globalThis.WebSocket === "undefined"
) {
// eslint-disable-next-line
this.webSocketImplementation = require("ws"); // polyfill for WebSocket in Node.js
} else {
this.webSocketImplementation = WebSocket;
}
}
this.updateWebSocket(settings.getSettings());
this.remoteDocumentsLock = new Locks<DocumentId>(this.logger);
settings.addOnSettingsChangeListener((newSettings, oldSettings) => {
if (
newSettings.remoteUri !== oldSettings.remoteUri ||
newSettings.vaultName !== oldSettings.vaultName ||
newSettings.token !== oldSettings.token ||
newSettings.isSyncEnabled !== oldSettings.isSyncEnabled
) {
this.updateWebSocket(newSettings);
}
if (newSettings.syncConcurrency !== oldSettings.syncConcurrency) {
this.syncQueue.concurrency = newSettings.syncConcurrency;
}
@ -92,15 +54,6 @@ export class Syncer {
listener(this.syncQueue.size);
});
});
this.setWebSocketRefreshInterval();
}
public get isWebSocketConnected(): boolean {
return (
this.applyRemoteChangesWebSocket?.readyState ===
this.webSocketImplementation.OPEN
);
}
public addRemainingOperationsListener(
@ -109,10 +62,6 @@ export class Syncer {
this.remainingOperationsListeners.push(listener);
}
public addWebSocketStatusChangeListener(listener: () => void): void {
this.webSocketStatusChangeListeners.push(listener);
}
public async syncLocallyCreatedFile(
relativePath: RelativePath
): Promise<void> {
@ -303,105 +252,9 @@ export class Syncer {
public async reset(): Promise<void> {
await this.waitUntilFinished();
this.setWebSocketRefreshInterval();
this.updateWebSocket(this.settings.getSettings());
}
public stop(): void {
clearInterval(this.refreshApplyRemoteChangesWebSocketInterval);
try {
this.applyRemoteChangesWebSocket?.close();
} catch (e) {
this.logger.warn(`Failed to close WebSocket: ${e}`);
}
}
private updateWebSocket(settings: SyncSettings): void {
try {
this.applyRemoteChangesWebSocket?.close();
} catch (e) {
this.logger.warn(`Failed to close WebSocket: ${e}`);
}
if (!settings.isSyncEnabled) {
this.applyRemoteChangesWebSocket = undefined;
return;
}
const wsUri = new URL(settings.remoteUri);
wsUri.protocol = wsUri.protocol === "https" ? "wss" : "ws";
wsUri.pathname = `/vaults/${settings.vaultName}/ws`;
this.logger.info(`Connecting to WebSocket at ${wsUri.toString()}`);
this.applyRemoteChangesWebSocket = new this.webSocketImplementation(
wsUri
);
this.applyRemoteChangesWebSocket.onmessage = async (
event
): Promise<void> => {
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
const message = JSON.parse(event.data) as WebsocketVaultUpdate;
try {
await Promise.all(
message.documents.map(async (document) =>
this.syncRemotelyUpdatedFile(document)
)
);
if (message.isInitialSync && message.documents.length > 0) {
this.database.setLastSeenUpdateId(
message.documents
.map((document) => document.vaultUpdateId)
.reduce((a, b) => Math.max(a, b))
);
}
} catch (e) {
this.logger.error(`Failed to sync remotely updated file: ${e}`);
}
};
// The JS WebSocket API doesn't support setting headers, so we have to send the token as a message
this.applyRemoteChangesWebSocket.onopen = (): void => {
this.logger.info("WebSocket connection opened");
this.applyRemoteChangesWebSocket?.send(
JSON.stringify({
deviceId: this.deviceId,
token: settings.token,
lastSeenVaultUpdateId: this.database.getLastSeenUpdateId()
})
);
this.webSocketStatusChangeListeners.forEach((listener) => {
listener();
});
};
this.applyRemoteChangesWebSocket.onclose = (event): void => {
this.logger.warn(
`WebSocket closed with code ${event.code} (${event.reason == "" ? "unknown reason" : event.reason})`
);
this.webSocketStatusChangeListeners.forEach((listener) => {
listener();
});
};
}
private setWebSocketRefreshInterval(): void {
this.refreshApplyRemoteChangesWebSocketInterval = setInterval(() => {
if (
this.applyRemoteChangesWebSocket?.readyState ===
this.webSocketImplementation.OPEN
) {
return;
}
this.updateWebSocket(this.settings.getSettings());
}, 5000);
}
private async syncRemotelyUpdatedFile(
public async syncRemotelyUpdatedFile(
remoteVersion: components["schemas"]["DocumentVersionWithoutContent"]
): Promise<void> {
let document = this.database.getDocumentByDocumentId(