Add event handler class

This commit is contained in:
Andras Schmelczer 2025-12-07 13:30:45 +00:00
parent 1ea465fcf8
commit 496db06213
14 changed files with 2428 additions and 2309 deletions

View file

@ -2,54 +2,54 @@ import "./file-explorer.scss";
import type { App, View } from "obsidian";
import {
utils,
type MaybeOutdatedClientCursors,
type RelativePath
utils,
type MaybeOutdatedClientCursors,
type RelativePath
} from "sync-client";
const REMOTE_USER_CONTAINER_CLASS = "remote-users";
export function renderCursorsInFileExplorer(
cursors: MaybeOutdatedClientCursors[],
app: App
cursors: MaybeOutdatedClientCursors[],
app: App
): void {
const fileExplorers = app.workspace.getLeavesOfType("file-explorer");
if (fileExplorers.length == 0) return;
const fileExplorers = app.workspace.getLeavesOfType("file-explorer");
if (fileExplorers.length == 0) return;
const [fileExplorer] = fileExplorers;
const [fileExplorer] = fileExplorers;
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const fileExplorerView: View & {
fileItems: Record<RelativePath, { el: Element }>; // it's an internal API
} = fileExplorer.view as any; // eslint-disable-line
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const fileExplorerView: View & {
fileItems: Record<RelativePath, { el: Element }>; // it's an internal API
} = fileExplorer.view as any; // eslint-disable-line
for (const key in fileExplorerView.fileItems) {
const element =
fileExplorerView.fileItems[key].el.querySelector(".tree-item-self");
for (const key in fileExplorerView.fileItems) {
const element =
fileExplorerView.fileItems[key].el.querySelector(".tree-item-self");
const customElement = createDiv(
{
cls: REMOTE_USER_CONTAINER_CLASS
},
(parent) => {
cursors.forEach((cursor) => {
cursor.documentsWithCursors.forEach((document) => {
if (document.relative_path.startsWith(key)) {
parent.appendChild(
createSpan({
text: cursor.userName,
attr: {
style: `border-color: ${utils.getRandomColor(cursor.userName)}`
}
})
);
}
});
});
}
);
const customElement = createDiv(
{
cls: REMOTE_USER_CONTAINER_CLASS
},
(parent) => {
cursors.forEach((cursor) => {
cursor.documentsWithCursors.forEach((document) => {
if (document.relative_path.startsWith(key)) {
parent.appendChild(
createSpan({
text: cursor.userName,
attr: {
style: `border-color: ${utils.getRandomColor(cursor.userName)}`
}
})
);
}
});
});
}
);
element?.querySelector("." + REMOTE_USER_CONTAINER_CLASS)?.remove();
element?.appendChild(customElement);
}
element?.querySelector("." + REMOTE_USER_CONTAINER_CLASS)?.remove();
element?.appendChild(customElement);
}
}

View file

@ -1,113 +1,94 @@
import type { Logger } from "../tracing/logger";
import { awaitAll } from "../utils/await-all";
import { Lock } from "../utils/data-structures/locks";
import { removeFromArray } from "../utils/remove-from-array";
import { EventListeners } from "../utils/data-structures/event-listeners";
export interface SyncSettings {
remoteUri: string;
token: string;
vaultName: string;
syncConcurrency: number;
isSyncEnabled: boolean;
maxFileSizeMB: number;
ignorePatterns: string[];
webSocketRetryIntervalMs: number;
diffCacheSizeMB: number;
enableTelemetry: boolean;
networkRetryIntervalMs: number;
minimumSaveIntervalMs: number;
remoteUri: string;
token: string;
vaultName: string;
syncConcurrency: number;
isSyncEnabled: boolean;
maxFileSizeMB: number;
ignorePatterns: string[];
webSocketRetryIntervalMs: number;
diffCacheSizeMB: number;
enableTelemetry: boolean;
networkRetryIntervalMs: number;
minimumSaveIntervalMs: number;
}
export const DEFAULT_SETTINGS: SyncSettings = {
remoteUri: "",
token: "",
vaultName: "default",
syncConcurrency: 1,
isSyncEnabled: false,
maxFileSizeMB: 10,
ignorePatterns: [],
webSocketRetryIntervalMs: 3500,
diffCacheSizeMB: 4,
enableTelemetry: false,
networkRetryIntervalMs: 1000,
minimumSaveIntervalMs: 1000
remoteUri: "",
token: "",
vaultName: "default",
syncConcurrency: 1,
isSyncEnabled: false,
maxFileSizeMB: 10,
ignorePatterns: [],
webSocketRetryIntervalMs: 3500,
diffCacheSizeMB: 4,
enableTelemetry: false,
networkRetryIntervalMs: 1000,
minimumSaveIntervalMs: 1000
};
export class Settings {
private settings: SyncSettings;
private readonly lock: Lock = new Lock();
private settings: SyncSettings;
private readonly lock: Lock = new Lock();
private readonly onSettingsChangeHandlers: ((
newSettings: SyncSettings,
oldSettings: SyncSettings
) => unknown)[] = [];
public readonly onSettingsChanged = new EventListeners<
(newSettings: SyncSettings, oldSettings: SyncSettings) => unknown
>();
public constructor(
private readonly logger: Logger,
initialState: Partial<SyncSettings> | undefined,
private readonly saveData: (data: SyncSettings) => Promise<void>
) {
this.settings = {
...DEFAULT_SETTINGS,
...(initialState ?? {})
};
public constructor(
private readonly logger: Logger,
initialState: Partial<SyncSettings> | undefined,
private readonly saveData: (data: SyncSettings) => Promise<void>
) {
this.settings = {
...DEFAULT_SETTINGS,
...(initialState ?? {})
};
this.logger.debug(
`Loaded settings: ${JSON.stringify(this.settings, null, 2)}`
);
}
this.logger.debug(
`Loaded settings: ${JSON.stringify(this.settings, null, 2)}`
);
}
public getSettings(): SyncSettings {
return this.settings;
}
public getSettings(): SyncSettings {
return this.settings;
}
public addOnSettingsChangeListener(
listener: (settings: SyncSettings, oldSettings: SyncSettings) => unknown
): void {
this.onSettingsChangeHandlers.push(listener);
}
public async setSetting<T extends keyof SyncSettings>(
key: T,
value: SyncSettings[T]
): Promise<void> {
await this.setSettings({
[key]: value
});
}
public removeOnSettingsChangeListener(
listener: (settings: SyncSettings, oldSettings: SyncSettings) => unknown
): void {
removeFromArray(this.onSettingsChangeHandlers, listener);
}
public async setSettings(value: Partial<SyncSettings>): Promise<void> {
await this.lock.withLock(async () => {
this.logger.debug(
`Updating settings with: ${JSON.stringify(value)}`
);
const oldSettings = this.settings;
this.settings = {
...this.settings,
...value
};
public async setSetting<T extends keyof SyncSettings>(
key: T,
value: SyncSettings[T]
): Promise<void> {
await this.setSettings({
[key]: value
});
}
await this.onSettingsChanged.triggerAsync(
this.settings,
oldSettings
);
public async setSettings(value: Partial<SyncSettings>): Promise<void> {
await this.lock.withLock(async () => {
this.logger.debug(
`Updating settings with: ${JSON.stringify(value)}`
);
const oldSettings = this.settings;
this.settings = {
...this.settings,
...value
};
await this.save();
});
}
await awaitAll(
this.onSettingsChangeHandlers
.map((handler) => {
return handler(this.settings, oldSettings);
})
.filter((result): result is Promise<unknown> => {
return result instanceof Promise;
})
);
await this.save();
});
}
private async save(): Promise<void> {
await this.saveData(this.settings);
}
private async save(): Promise<void> {
await this.saveData(this.settings);
}
}

View file

@ -122,7 +122,7 @@ describe("WebSocketManager", () => {
MockWebSocket as unknown as typeof WebSocket
);
manager.addRemoteVaultUpdateListener(async () => {
manager.onRemoteVaultUpdateReceived.add(async () => {
await new Promise((resolve) => setTimeout(resolve, 10));
});
manager.start();
@ -152,7 +152,7 @@ describe("WebSocketManager", () => {
MockWebSocket as unknown as typeof WebSocket
);
manager.addRemoteCursorsUpdateListener(async () => {
manager.onRemoteCursorsUpdateReceived.add(async () => {
await new Promise((resolve) => setTimeout(resolve, 10));
});
manager.start();
@ -227,7 +227,7 @@ describe("WebSocketManager", () => {
);
let statusChangeCount = 0;
manager.addWebSocketStatusChangeListener(() => {
manager.onWebSocketStatusChanged.add(() => {
statusChangeCount++;
});
@ -269,7 +269,7 @@ describe("WebSocketManager", () => {
resolveListener = resolve;
});
manager.addRemoteVaultUpdateListener(async () => {
manager.onRemoteVaultUpdateReceived.add(async () => {
await listenerPromise;
});

View file

@ -6,295 +6,260 @@ import type { CursorPositionFromClient } from "./types/CursorPositionFromClient"
import type { ClientCursors } from "./types/ClientCursors";
import { createPromise } from "../utils/create-promise";
import type { WebSocketVaultUpdate } from "./types/WebSocketVaultUpdate";
import { awaitAll } from "../utils/await-all";
import { WEBSOCKET_DISCONNECT_TIMEOUT_IN_S } from "../consts";
import { removeFromArray } from "../utils/remove-from-array";
import { EventListeners } from "../utils/data-structures/event-listeners";
import { awaitAll } from "../utils/await-all";
export class WebSocketManager {
private readonly webSocketStatusChangeListeners: ((
isConnected: boolean
) => unknown)[] = [];
public readonly onWebSocketStatusChanged = new EventListeners<
(isConnected: boolean) => unknown
>();
private readonly remoteVaultUpdateListeners: ((
update: WebSocketVaultUpdate
) => Promise<void>)[] = [];
public readonly onRemoteVaultUpdateReceived = new EventListeners<
(update: WebSocketVaultUpdate) => Promise<void>
>();
private readonly remoteCursorsUpdateListeners: ((
cursors: ClientCursors[]
) => Promise<void>)[] = [];
public readonly onRemoteCursorsUpdateReceived = new EventListeners<
(cursors: ClientCursors[]) => Promise<void>
>();
private isStopped = true;
private resolveDisconnectingPromise: null | (() => unknown) = null;
private reconnectTimeoutId: ReturnType<typeof setTimeout> | undefined;
private isStopped = true;
private resolveDisconnectingPromise: null | (() => unknown) = null;
private reconnectTimeoutId: ReturnType<typeof setTimeout> | undefined;
private readonly outstandingPromises: Promise<unknown>[] = [];
private readonly outstandingPromises: Promise<unknown>[] = [];
private webSocket: WebSocket | undefined;
private readonly webSocketFactoryImplementation: typeof globalThis.WebSocket;
private webSocket: WebSocket | undefined;
private readonly webSocketFactoryImplementation: typeof globalThis.WebSocket;
public constructor(
private readonly deviceId: string,
private readonly logger: Logger,
private readonly settings: Settings,
webSocketImplementation?: typeof globalThis.WebSocket
) {
if (webSocketImplementation) {
this.webSocketFactoryImplementation = webSocketImplementation;
} else {
if (
typeof globalThis !== "undefined" &&
typeof globalThis.WebSocket === "undefined"
) {
// eslint-disable-next-line
this.webSocketFactoryImplementation = require("ws"); // polyfill for WebSocket in Node.js
} else {
this.webSocketFactoryImplementation = WebSocket;
}
}
}
public constructor(
private readonly deviceId: string,
private readonly logger: Logger,
private readonly settings: Settings,
webSocketImplementation?: typeof globalThis.WebSocket
) {
if (webSocketImplementation) {
this.webSocketFactoryImplementation = webSocketImplementation;
} else {
if (
typeof globalThis !== "undefined" &&
typeof globalThis.WebSocket === "undefined"
) {
// eslint-disable-next-line
this.webSocketFactoryImplementation = require("ws"); // polyfill for WebSocket in Node.js
} else {
this.webSocketFactoryImplementation = WebSocket;
}
}
}
public get isWebSocketConnected(): boolean {
return (
this.webSocket?.readyState ===
this.webSocketFactoryImplementation.OPEN
);
}
public get isWebSocketConnected(): boolean {
return (
this.webSocket?.readyState ===
this.webSocketFactoryImplementation.OPEN
);
}
public addWebSocketStatusChangeListener(
listener: (isConnected: boolean) => unknown
): void {
this.webSocketStatusChangeListeners.push(listener);
}
public start(): void {
this.isStopped = false;
this.initializeWebSocket();
}
public addRemoteCursorsUpdateListener(
listener: (cursors: ClientCursors[]) => Promise<void>
): void {
this.remoteCursorsUpdateListeners.push(listener);
}
public async stop(): Promise<void> {
const [promise, resolve] = createPromise();
this.resolveDisconnectingPromise = resolve;
public addRemoteVaultUpdateListener(
listener: (update: WebSocketVaultUpdate) => Promise<void>
): void {
this.remoteVaultUpdateListeners.push(listener);
}
this.isStopped = true;
public start(): void {
this.isStopped = false;
this.initializeWebSocket();
}
if (this.reconnectTimeoutId !== undefined) {
clearTimeout(this.reconnectTimeoutId);
this.reconnectTimeoutId = undefined;
}
public async stop(): Promise<void> {
const [promise, resolve] = createPromise();
this.resolveDisconnectingPromise = resolve;
this.webSocket?.close(1000, "WebSocketManager has been stopped");
this.isStopped = true;
// eslint-disable-next-line @typescript-eslint/init-declarations
let timeoutId: ReturnType<typeof setTimeout> | undefined;
const timeoutPromise = new Promise<void>((_, reject) => {
timeoutId = setTimeout(() => {
reject(
new Error(
`Timeout waiting for WebSocket to close after ${WEBSOCKET_DISCONNECT_TIMEOUT_IN_S} seconds`
)
);
}, WEBSOCKET_DISCONNECT_TIMEOUT_IN_S * 1000);
});
if (this.reconnectTimeoutId !== undefined) {
clearTimeout(this.reconnectTimeoutId);
this.reconnectTimeoutId = undefined;
}
try {
while (this.isWebSocketConnected) {
await Promise.race([promise, timeoutPromise]);
}
} catch (error) {
this.logger.error(
`Error while waiting for WebSocket to close: ${String(error)}`
);
// Force cleanup even if close didn't work
this.resolveDisconnectingPromise();
this.resolveDisconnectingPromise = null;
} finally {
// Clear timeout to prevent unhandled rejection
if (timeoutId !== undefined) {
clearTimeout(timeoutId);
}
}
this.webSocket?.close(1000, "WebSocketManager has been stopped");
await this.waitUntilFinished();
}
// eslint-disable-next-line @typescript-eslint/init-declarations
let timeoutId: ReturnType<typeof setTimeout> | undefined;
const timeoutPromise = new Promise<void>((_, reject) => {
timeoutId = setTimeout(() => {
reject(
new Error(
`Timeout waiting for WebSocket to close after ${WEBSOCKET_DISCONNECT_TIMEOUT_IN_S} seconds`
)
);
}, WEBSOCKET_DISCONNECT_TIMEOUT_IN_S * 1000);
});
public async waitUntilFinished(): Promise<void> {
await awaitAll(this.outstandingPromises);
}
try {
while (this.isWebSocketConnected) {
await Promise.race([promise, timeoutPromise]);
}
} catch (error) {
this.logger.error(
`Error while waiting for WebSocket to close: ${String(error)}`
);
// Force cleanup even if close didn't work
this.resolveDisconnectingPromise();
this.resolveDisconnectingPromise = null;
} finally {
// Clear timeout to prevent unhandled rejection
if (timeoutId !== undefined) {
clearTimeout(timeoutId);
}
}
public sendHandshakeMessage(
message: WebSocketClientMessage & { type: "handshake" }
): void {
const { webSocket } = this;
if (!webSocket) {
throw new Error(
"WebSocket is not connected, cannot send handshake message"
);
}
await this.waitUntilFinished();
}
try {
webSocket.send(JSON.stringify(message));
} catch (error) {
this.logger.error(
`Failed to send handshake message: ${String(error)}`
);
throw error;
}
}
public async waitUntilFinished(): Promise<void> {
await awaitAll(this.outstandingPromises);
}
public updateLocalCursors(cursorPositions: CursorPositionFromClient): void {
if (!this.isWebSocketConnected || !this.webSocket) {
// A missing cursor update is fine, we can just skip it if needed
this.logger.warn(
"WebSocket is not connected, cannot send cursor positions"
);
return;
}
public sendHandshakeMessage(
message: WebSocketClientMessage & { type: "handshake" }
): void {
const { webSocket } = this;
if (!webSocket) {
throw new Error(
"WebSocket is not connected, cannot send handshake message"
);
}
const message: WebSocketClientMessage = {
type: "cursorPositions",
...cursorPositions
};
try {
webSocket.send(JSON.stringify(message));
} catch (error) {
this.logger.error(
`Failed to send handshake message: ${String(error)}`
);
throw error;
}
}
try {
this.webSocket.send(JSON.stringify(message));
this.logger.debug(
`Sent cursor positions: ${JSON.stringify(cursorPositions)}`
);
} catch (error) {
this.logger.warn(
`Failed to send cursor positions: ${String(error)}`
);
}
}
public updateLocalCursors(cursorPositions: CursorPositionFromClient): void {
if (!this.isWebSocketConnected || !this.webSocket) {
// A missing cursor update is fine, we can just skip it if needed
this.logger.warn(
"WebSocket is not connected, cannot send cursor positions"
);
return;
}
private initializeWebSocket(): void {
// Clean up old WebSocket handlers to prevent race conditions
if (this.webSocket) {
try {
// Remove handlers to prevent them from firing after new connection
this.webSocket.onopen = null;
this.webSocket.onclose = null;
this.webSocket.onmessage = null;
this.webSocket.onerror = null;
this.webSocket.close();
} catch (e) {
this.logger.error(
`Failed to close previous WebSocket connection: ${e}`
);
}
}
const message: WebSocketClientMessage = {
type: "cursorPositions",
...cursorPositions
};
const wsUri = new URL(this.settings.getSettings().remoteUri);
wsUri.protocol = wsUri.protocol === "https" ? "wss" : "ws";
wsUri.pathname = `/vaults/${this.settings.getSettings().vaultName}/ws`;
try {
this.webSocket.send(JSON.stringify(message));
this.logger.debug(
`Sent cursor positions: ${JSON.stringify(cursorPositions)}`
);
} catch (error) {
this.logger.warn(
`Failed to send cursor positions: ${String(error)}`
);
}
}
this.logger.info(`Connecting to WebSocket at ${wsUri.toString()}`);
private initializeWebSocket(): void {
// Clean up old WebSocket handlers to prevent race conditions
if (this.webSocket) {
try {
// Remove handlers to prevent them from firing after new connection
this.webSocket.onopen = null;
this.webSocket.onclose = null;
this.webSocket.onmessage = null;
this.webSocket.onerror = null;
this.webSocket.close();
} catch (e) {
this.logger.error(
`Failed to close previous WebSocket connection: ${e}`
);
}
}
this.webSocket = new this.webSocketFactoryImplementation(wsUri);
const wsUri = new URL(this.settings.getSettings().remoteUri);
wsUri.protocol = wsUri.protocol === "https" ? "wss" : "ws";
wsUri.pathname = `/vaults/${this.settings.getSettings().vaultName}/ws`;
this.webSocket.onopen = (): void => {
this.logger.info("WebSocket connection opened");
this.onWebSocketStatusChanged.trigger(true);
};
this.logger.info(`Connecting to WebSocket at ${wsUri.toString()}`);
this.webSocket.onmessage = (event): void => {
try {
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
const message = JSON.parse(
event.data
) as WebSocketServerMessage;
this.webSocket = new this.webSocketFactoryImplementation(wsUri);
// Track the message handling promise
const messageHandlingPromise = this.handleWebSocketMessage(
message
)
.catch((error: unknown) => {
this.logger.error(
`Error handling WebSocket message: ${String(error)}`
);
})
.finally(() => {
removeFromArray(
this.outstandingPromises,
messageHandlingPromise
);
});
this.webSocket.onopen = (): void => {
this.logger.info("WebSocket connection opened");
this.webSocketStatusChangeListeners.forEach((listener) =>
listener(true)
);
};
void this.outstandingPromises.push(messageHandlingPromise); // ignore the returned promise
} catch (error) {
this.logger.error(
`Error parsing WebSocket message: ${String(error)}`
);
}
};
this.webSocket.onmessage = (event): void => {
try {
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
const message = JSON.parse(
event.data
) as WebSocketServerMessage;
this.webSocket.onclose = (event): void => {
this.logger.warn(
`WebSocket closed with code ${event.code} (${event.reason == "" ? "unknown reason" : event.reason})`
);
this.onWebSocketStatusChanged.trigger(false);
// Track the message handling promise
const messageHandlingPromise = this.handleWebSocketMessage(
message
)
.catch((error: unknown) => {
this.logger.error(
`Error handling WebSocket message: ${String(error)}`
);
})
.finally(() => {
removeFromArray(
this.outstandingPromises,
messageHandlingPromise
);
});
if (this.isStopped) {
this.resolveDisconnectingPromise?.();
this.resolveDisconnectingPromise = null;
} else {
this.reconnectTimeoutId = setTimeout(() => {
this.reconnectTimeoutId = undefined;
this.initializeWebSocket();
}, this.settings.getSettings().webSocketRetryIntervalMs);
}
};
}
void this.outstandingPromises.push(messageHandlingPromise); // ignore the returned promise
} catch (error) {
this.logger.error(
`Error parsing WebSocket message: ${String(error)}`
);
}
};
private async handleWebSocketMessage(
message: WebSocketServerMessage
): Promise<void> {
if (message.type === "vaultUpdate") {
await this.onRemoteVaultUpdateReceived.triggerAsync(message);
this.webSocket.onclose = (event): void => {
this.logger.warn(
`WebSocket closed with code ${event.code} (${event.reason == "" ? "unknown reason" : event.reason})`
);
this.webSocketStatusChangeListeners.forEach((listener) =>
listener(false)
);
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
} else if (message.type === "cursorPositions") {
this.logger.debug(
`Received cursor positions for ${JSON.stringify(message.clients)}`
);
if (this.isStopped) {
this.resolveDisconnectingPromise?.();
this.resolveDisconnectingPromise = null;
} else {
this.reconnectTimeoutId = setTimeout(() => {
this.reconnectTimeoutId = undefined;
this.initializeWebSocket();
}, this.settings.getSettings().webSocketRetryIntervalMs);
}
};
}
private async handleWebSocketMessage(
message: WebSocketServerMessage
): Promise<void> {
if (message.type === "vaultUpdate") {
await awaitAll(
this.remoteVaultUpdateListeners.map(async (listener) => {
await listener(message).catch((error: unknown) => {
this.logger.error(
`Error in vault update listener: ${String(error)}`
);
});
})
);
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
} else if (message.type === "cursorPositions") {
this.logger.debug(
`Received cursor positions for ${JSON.stringify(message.clients)}`
);
await awaitAll(
this.remoteCursorsUpdateListeners.map(async (listener) => {
await listener(message.clients).catch((error: unknown) => {
this.logger.error(
`Error in cursor positions listener: ${String(error)}`
);
});
})
);
} else {
this.logger.warn(
`Received unknown message type: ${JSON.stringify(message)}`
);
}
}
await this.onRemoteCursorsUpdateReceived.triggerAsync(
message.clients
);
} else {
this.logger.warn(
`Received unknown message type: ${JSON.stringify(message)}`
);
}
}
}

View file

@ -26,495 +26,497 @@ import { FixedSizeDocumentCache } from "./utils/data-structures/fix-sized-cache"
import { setUpTelemetry } from "./utils/set-up-telemetry";
import { DIFF_CACHE_SIZE_MB } from "./consts";
import { ServerConfig } from "./services/server-config";
import { EventListeners } from "./utils/data-structures/event-listeners";
export class SyncClient {
private hasStartedOfflineSync = false;
private hasFinishedOfflineSync = false;
private hasStarted = false;
private hasBeenDestroyed = false;
private unloadTelemetry?: () => void;
private constructor(
private readonly history: SyncHistory,
private readonly settings: Settings,
private readonly database: Database,
private readonly syncer: Syncer,
private readonly webSocketManager: WebSocketManager,
public readonly logger: Logger,
private readonly fetchController: FetchController,
private readonly cursorTracker: CursorTracker,
private readonly fileChangeNotifier: FileChangeNotifier,
private readonly contentCache: FixedSizeDocumentCache,
private readonly fileOperations: FileOperations,
private readonly serverConfig: ServerConfig,
private readonly persistence: PersistenceProvider<
Partial<{
settings: Partial<SyncSettings>;
database: Partial<StoredDatabase>;
}>
>
) {}
public get documentCount(): number {
return this.database.length;
}
public get isWebSocketConnected(): boolean {
return this.webSocketManager.isWebSocketConnected;
}
public static async create({
fs,
persistence,
fetch,
webSocket,
nativeLineEndings = "\n"
}: {
fs: FileSystemOperations;
persistence: PersistenceProvider<
Partial<{
settings: Partial<SyncSettings>;
database: Partial<StoredDatabase>;
}>
>;
fetch?: typeof globalThis.fetch;
webSocket?: typeof globalThis.WebSocket;
nativeLineEndings?: string;
}): Promise<SyncClient> {
const logger = new Logger();
const deviceId = createClientId();
logger.info(`Creating SyncClient with client id ${deviceId}`);
const history = new SyncHistory(logger);
let state = (await persistence.load()) ?? {
settings: undefined,
database: undefined
};
const settings = new Settings(
logger,
state.settings,
async (data): Promise<void> => {
state = { ...state, settings: data };
// we're not rate-limiting settings saves as (1) we need to initialise the settings to know the rate limit
// and (2) settings changes are infrequent enough that rate-limiting is not necessary
await persistence.save(state);
}
);
const rateLimitedSave = rateLimit(
persistence.save,
() => settings.getSettings().minimumSaveIntervalMs
);
const database = new Database(
logger,
state.database,
async (data): Promise<void> => {
state = { ...state, database: data };
await rateLimitedSave(state);
}
);
const fetchController = new FetchController(
settings.getSettings().isSyncEnabled,
logger
);
settings.addOnSettingsChangeListener((newSettings, oldSettings) => {
if (oldSettings.isSyncEnabled != newSettings.isSyncEnabled) {
fetchController.canFetch = newSettings.isSyncEnabled;
}
});
const syncService = new SyncService(
deviceId,
fetchController,
settings,
logger,
fetch
);
const serverConfig = new ServerConfig(syncService);
const fileOperations = new FileOperations(
logger,
database,
fs,
serverConfig,
nativeLineEndings
);
const contentCache = new FixedSizeDocumentCache(
1024 * 1024 * DIFF_CACHE_SIZE_MB
);
const unrestrictedSyncer = new UnrestrictedSyncer(
logger,
database,
settings,
syncService,
fileOperations,
history,
contentCache,
serverConfig
);
const webSocketManager = new WebSocketManager(
deviceId,
logger,
settings,
webSocket
);
const syncer = new Syncer(
deviceId,
logger,
database,
settings,
syncService,
webSocketManager,
fileOperations,
unrestrictedSyncer
);
const fileChangeNotifier = new FileChangeNotifier();
const cursorTracker = new CursorTracker(
database,
webSocketManager,
fileOperations,
fileChangeNotifier
);
const client = new SyncClient(
history,
settings,
database,
syncer,
webSocketManager,
logger,
fetchController,
cursorTracker,
fileChangeNotifier,
contentCache,
fileOperations,
serverConfig,
persistence
);
logger.info("SyncClient created successfully");
return client;
}
public async start(): Promise<void> {
this.checkIfDestroyed("start");
if (this.hasStarted) {
throw new Error("SyncClient has already been started");
}
this.hasStarted = true;
if (
!this.unloadTelemetry &&
this.settings.getSettings().enableTelemetry
) {
this.unloadTelemetry = setUpTelemetry();
}
this.logger.addOnMessageListener((log): void => {
if (log.level === LogLevel.ERROR && Sentry.isInitialized()) {
Sentry.captureMessage(log.message);
}
});
this.settings.addOnSettingsChangeListener(
this.onSettingsChange.bind(this)
);
if (this.settings.getSettings().isSyncEnabled) {
this.logger.info("Starting SyncClient");
await this.startSyncing();
this.logger.info("SyncClient has successfully started");
}
}
/**
* Reload settings from disk overriding current in-memory settings.
* Missing values will be filled in from DEFAULT_SETTINGS rather than
* retaining current in-memory settings.
*/
public async reloadSettings(): Promise<void> {
this.checkIfDestroyed("reloadSettings");
const state = (await this.persistence.load()) ?? {
settings: undefined
};
const settings = {
...DEFAULT_SETTINGS,
...(state.settings ?? {})
};
await this.setSettings(settings);
}
public async checkConnection(): Promise<NetworkConnectionStatus> {
this.checkIfDestroyed("checkConnection");
const server = await this.serverConfig.checkConnection(true);
return {
isSuccessful: server.isSuccessful,
serverMessage: server.message,
isWebSocketConnected: this.webSocketManager.isWebSocketConnected
};
}
public getHistoryEntries(): readonly HistoryEntry[] {
return this.history.entries;
}
public addSyncHistoryUpdateListener(
listener: (stats: HistoryStats) => unknown
): void {
this.checkIfDestroyed("addSyncHistoryUpdateListener");
this.history.addSyncHistoryUpdateListener(listener);
}
/**
* Wait for the in-flight operations to finish, reset all tracking,
* and the local database but retain the settings.
* The SyncClient can be used again after calling this method.
*/
public async reset(): Promise<void> {
this.checkIfDestroyed("reset");
this.logger.info(
"Stopping SyncClient to apply changed connection settings"
);
await this.pause();
// clear all local state
this.logger.info("Resetting SyncClient's local state");
this.database.reset();
await this.database.save(); // ensure the new database reads as empty
this.resetInMemoryState();
this.hasStartedOfflineSync = false;
this.hasFinishedOfflineSync = false;
this.serverConfig.reset();
await this.startSyncing();
}
public getSettings(): SyncSettings {
return this.settings.getSettings();
}
public async setSetting<T extends keyof SyncSettings>(
key: T,
value: SyncSettings[T]
): Promise<void> {
this.checkIfDestroyed("setSetting");
await this.settings.setSetting(key, value);
}
public async setSettings(value: Partial<SyncSettings>): Promise<void> {
this.checkIfDestroyed("setSettings");
await this.settings.setSettings(value);
}
public addOnSettingsChangeListener(
listener: (settings: SyncSettings, oldSettings: SyncSettings) => unknown
): void {
this.checkIfDestroyed("addOnSettingsChangeListener");
this.settings.addOnSettingsChangeListener(listener);
}
public addRemainingSyncOperationsListener(
listener: (remainingOperations: number) => unknown
): void {
this.checkIfDestroyed("addRemainingSyncOperationsListener");
this.syncer.addRemainingOperationsListener(listener);
}
public addWebSocketStatusChangeListener(listener: () => unknown): void {
this.checkIfDestroyed("addWebSocketStatusChangeListener");
this.webSocketManager.addWebSocketStatusChangeListener(listener);
}
public async syncLocallyCreatedFile(
relativePath: RelativePath
): Promise<void> {
this.checkIfDestroyed("syncLocallyCreatedFile");
this.fileChangeNotifier.notifyOfFileChange(relativePath);
return this.syncer.syncLocallyCreatedFile(relativePath);
}
public async syncLocallyDeletedFile(
relativePath: RelativePath
): Promise<void> {
this.checkIfDestroyed("syncLocallyDeletedFile");
this.fileChangeNotifier.notifyOfFileChange(relativePath);
return this.syncer.syncLocallyDeletedFile(relativePath);
}
public async syncLocallyUpdatedFile({
oldPath,
relativePath
}: {
oldPath?: RelativePath;
relativePath: RelativePath;
}): Promise<void> {
this.checkIfDestroyed("syncLocallyUpdatedFile");
this.fileChangeNotifier.notifyOfFileChange(relativePath);
return this.syncer.syncLocallyUpdatedFile({
oldPath,
relativePath
});
}
public getDocumentSyncingStatus(
relativePath: RelativePath
): DocumentSyncStatus {
this.checkIfDestroyed("getDocumentSyncingStatus");
if (!this.settings.getSettings().isSyncEnabled) {
return DocumentSyncStatus.SYNCING_IS_DISABLED;
}
if (!this.syncer.isFirstSyncComplete || !this.hasFinishedOfflineSync) {
return DocumentSyncStatus.SYNCING;
}
const document =
this.database.getLatestDocumentByRelativePath(relativePath);
if (document === undefined) {
return DocumentSyncStatus.SYNCING;
}
return document.updates.length > 0
? DocumentSyncStatus.SYNCING
: DocumentSyncStatus.UP_TO_DATE;
}
public async updateLocalCursors(
documentToCursors: Record<RelativePath, CursorSpan[]>
): Promise<void> {
this.checkIfDestroyed("updateLocalCursors");
await this.cursorTracker.sendLocalCursorsToServer(documentToCursors);
}
public addRemoteCursorsUpdateListener(
listener: (cursors: MaybeOutdatedClientCursors[]) => unknown
): void {
this.checkIfDestroyed("addRemoteCursorsUpdateListener");
this.cursorTracker.addRemoteCursorsUpdateListener(listener);
}
public async waitUntilFinished(): Promise<void> {
this.checkIfDestroyed("waitUntilIdle");
await this.syncer.waitUntilFinished();
await this.webSocketManager.waitUntilFinished();
await this.database.save(); // flush all changes to disk
}
/**
* Completely destroy the SyncClient, cancelling all in-progress operations.
* After calling this method, the SyncClient cannot be used again.
*/
public async destroy(): Promise<void> {
this.checkIfDestroyed("destroy");
// cancel everything that's in progress
await this.pause();
this.hasBeenDestroyed = true;
this.resetInMemoryState();
this.logger.info("SyncClient has been successfully disposed");
this.unloadTelemetry?.();
}
private async startSyncing(): Promise<void> {
this.checkIfDestroyed("startSyncing");
this.fetchController.finishReset();
await this.serverConfig.initialize();
this.webSocketManager.start();
if (!this.hasStartedOfflineSync) {
this.hasStartedOfflineSync = true;
await this.syncer.scheduleSyncForOfflineChanges();
}
this.hasFinishedOfflineSync = true;
}
private async pause(): Promise<void> {
this.fetchController.startReset();
await this.webSocketManager.stop();
await this.waitUntilFinished();
}
private resetInMemoryState(): void {
this.history.reset();
this.contentCache.reset();
// don't reset the logger
this.cursorTracker.reset();
this.syncer.reset();
this.fileOperations.reset();
}
private async onSettingsChange(
newSettings: SyncSettings,
oldSettings: SyncSettings
): Promise<void> {
this.checkIfDestroyed("onSettingsChange");
if (
newSettings.vaultName !== oldSettings.vaultName ||
newSettings.remoteUri !== oldSettings.remoteUri
) {
await this.reset();
}
if (newSettings.isSyncEnabled !== oldSettings.isSyncEnabled) {
if (newSettings.isSyncEnabled) {
await this.startSyncing();
} else {
await this.pause();
}
}
if (newSettings.diffCacheSizeMB !== oldSettings.diffCacheSizeMB) {
this.contentCache.resize(newSettings.diffCacheSizeMB * 1024 * 1024);
}
if (newSettings.enableTelemetry !== oldSettings.enableTelemetry) {
if (newSettings.enableTelemetry) {
this.unloadTelemetry = setUpTelemetry();
} else {
this.unloadTelemetry?.();
}
}
}
private checkIfDestroyed(origin: string): void {
if (this.hasBeenDestroyed) {
throw new Error(
`SyncClient has been destroyed and can no longer be used; called from ${origin}`
);
}
}
private hasStartedOfflineSync = false;
private hasFinishedOfflineSync = false;
private hasStarted = false;
private hasBeenDestroyed = false;
private unloadTelemetry?: () => void;
private constructor(
private readonly history: SyncHistory,
private readonly settings: Settings,
private readonly database: Database,
private readonly syncer: Syncer,
private readonly webSocketManager: WebSocketManager,
public readonly logger: Logger,
private readonly fetchController: FetchController,
private readonly cursorTracker: CursorTracker,
private readonly fileChangeNotifier: FileChangeNotifier,
private readonly contentCache: FixedSizeDocumentCache,
private readonly fileOperations: FileOperations,
private readonly serverConfig: ServerConfig,
private readonly persistence: PersistenceProvider<
Partial<{
settings: Partial<SyncSettings>;
database: Partial<StoredDatabase>;
}>
>
) { }
public get documentCount(): number {
return this.database.length;
}
public get isWebSocketConnected(): boolean {
return this.webSocketManager.isWebSocketConnected;
}
public static async create({
fs,
persistence,
fetch,
webSocket,
nativeLineEndings = "\n"
}: {
fs: FileSystemOperations;
persistence: PersistenceProvider<
Partial<{
settings: Partial<SyncSettings>;
database: Partial<StoredDatabase>;
}>
>;
fetch?: typeof globalThis.fetch;
webSocket?: typeof globalThis.WebSocket;
nativeLineEndings?: string;
}): Promise<SyncClient> {
const logger = new Logger();
const deviceId = createClientId();
logger.info(`Creating SyncClient with client id ${deviceId}`);
const history = new SyncHistory(logger);
let state = (await persistence.load()) ?? {
settings: undefined,
database: undefined
};
const settings = new Settings(
logger,
state.settings,
async (data): Promise<void> => {
state = { ...state, settings: data };
// we're not rate-limiting settings saves as (1) we need to initialise the settings to know the rate limit
// and (2) settings changes are infrequent enough that rate-limiting is not necessary
await persistence.save(state);
}
);
const rateLimitedSave = rateLimit(
persistence.save,
() => settings.getSettings().minimumSaveIntervalMs
);
const database = new Database(
logger,
state.database,
async (data): Promise<void> => {
state = { ...state, database: data };
await rateLimitedSave(state);
}
);
const fetchController = new FetchController(
settings.getSettings().isSyncEnabled,
logger
);
settings.onSettingsChanged.add((newSettings, oldSettings) => {
if (oldSettings.isSyncEnabled != newSettings.isSyncEnabled) {
fetchController.canFetch = newSettings.isSyncEnabled;
}
});
const syncService = new SyncService(
deviceId,
fetchController,
settings,
logger,
fetch
);
const serverConfig = new ServerConfig(syncService);
const fileOperations = new FileOperations(
logger,
database,
fs,
serverConfig,
nativeLineEndings
);
const contentCache = new FixedSizeDocumentCache(
1024 * 1024 * DIFF_CACHE_SIZE_MB
);
const unrestrictedSyncer = new UnrestrictedSyncer(
logger,
database,
settings,
syncService,
fileOperations,
history,
contentCache,
serverConfig
);
const webSocketManager = new WebSocketManager(
deviceId,
logger,
settings,
webSocket
);
const syncer = new Syncer(
deviceId,
logger,
database,
settings,
syncService,
webSocketManager,
fileOperations,
unrestrictedSyncer
);
const fileChangeNotifier = new FileChangeNotifier();
const cursorTracker = new CursorTracker(
database,
webSocketManager,
fileOperations,
fileChangeNotifier
);
const client = new SyncClient(
history,
settings,
database,
syncer,
webSocketManager,
logger,
fetchController,
cursorTracker,
fileChangeNotifier,
contentCache,
fileOperations,
serverConfig,
persistence
);
logger.info("SyncClient created successfully");
return client;
}
public async start(): Promise<void> {
this.checkIfDestroyed("start");
if (this.hasStarted) {
throw new Error("SyncClient has already been started");
}
this.hasStarted = true;
if (
!this.unloadTelemetry &&
this.settings.getSettings().enableTelemetry
) {
this.unloadTelemetry = setUpTelemetry();
}
this.logger.onLogEmitted.add((log): void => {
if (log.level === LogLevel.ERROR && Sentry.isInitialized()) {
Sentry.captureMessage(log.message);
}
});
this.settings.onSettingsChanged.add(
this.onSettingsChange.bind(this)
);
if (this.settings.getSettings().isSyncEnabled) {
this.logger.info("Starting SyncClient");
await this.startSyncing();
this.logger.info("SyncClient has successfully started");
}
}
/**
* Reload settings from disk overriding current in-memory settings.
* Missing values will be filled in from DEFAULT_SETTINGS rather than
* retaining current in-memory settings.
*/
public async reloadSettings(): Promise<void> {
this.checkIfDestroyed("reloadSettings");
const state = (await this.persistence.load()) ?? {
settings: undefined
};
const settings = {
...DEFAULT_SETTINGS,
...(state.settings ?? {})
};
await this.setSettings(settings);
}
public async checkConnection(): Promise<NetworkConnectionStatus> {
this.checkIfDestroyed("checkConnection");
const server = await this.serverConfig.checkConnection(true);
return {
isSuccessful: server.isSuccessful,
serverMessage: server.message,
isWebSocketConnected: this.webSocketManager.isWebSocketConnected
};
}
public getHistoryEntries(): readonly HistoryEntry[] {
return this.history.entries;
}
/**
* Wait for the in-flight operations to finish, reset all tracking,
* and the local database but retain the settings.
* The SyncClient can be used again after calling this method.
*/
public async reset(): Promise<void> {
this.checkIfDestroyed("reset");
this.logger.info(
"Stopping SyncClient to apply changed connection settings"
);
await this.pause();
// clear all local state
this.logger.info("Resetting SyncClient's local state");
this.database.reset();
await this.database.save(); // ensure the new database reads as empty
this.resetInMemoryState();
this.hasStartedOfflineSync = false;
this.hasFinishedOfflineSync = false;
this.serverConfig.reset();
await this.startSyncing();
}
public getSettings(): SyncSettings {
return this.settings.getSettings();
}
public async setSetting<T extends keyof SyncSettings>(
key: T,
value: SyncSettings[T]
): Promise<void> {
this.checkIfDestroyed("setSetting");
await this.settings.setSetting(key, value);
}
public async setSettings(value: Partial<SyncSettings>): Promise<void> {
this.checkIfDestroyed("setSettings");
await this.settings.setSettings(value);
}
public get onSyncHistoryUpdated(): EventListeners<
(stats: HistoryStats) => unknown
> {
this.checkIfDestroyed("onSyncHistoryUpdated getter");
return this.history.onHistoryUpdated;
}
public get onSettingsChanged(): EventListeners<
(newSettings: SyncSettings, oldSettings: SyncSettings) => unknown
> {
this.checkIfDestroyed("onSettingsChanged getter");
return this.settings.onSettingsChanged;
}
public get onRemainingOperationsCountChanged(): EventListeners<
(remainingOperationsCount: number) => unknown
> {
this.checkIfDestroyed("onRemainingOperationsCountChanged getter");
return this.syncer.onRemainingOperationsCountChanged;
}
public get onWebSocketStatusChanged(): EventListeners<
(isConnected: boolean) => unknown
> {
this.checkIfDestroyed("onWebSocketStatusChanged getter");
return this.webSocketManager.onWebSocketStatusChanged;
}
public async syncLocallyCreatedFile(
relativePath: RelativePath
): Promise<void> {
this.checkIfDestroyed("syncLocallyCreatedFile");
this.fileChangeNotifier.notifyOfFileChange(relativePath);
return this.syncer.syncLocallyCreatedFile(relativePath);
}
public async syncLocallyDeletedFile(
relativePath: RelativePath
): Promise<void> {
this.checkIfDestroyed("syncLocallyDeletedFile");
this.fileChangeNotifier.notifyOfFileChange(relativePath);
return this.syncer.syncLocallyDeletedFile(relativePath);
}
public async syncLocallyUpdatedFile({
oldPath,
relativePath
}: {
oldPath?: RelativePath;
relativePath: RelativePath;
}): Promise<void> {
this.checkIfDestroyed("syncLocallyUpdatedFile");
this.fileChangeNotifier.notifyOfFileChange(relativePath);
return this.syncer.syncLocallyUpdatedFile({
oldPath,
relativePath
});
}
public getDocumentSyncingStatus(
relativePath: RelativePath
): DocumentSyncStatus {
this.checkIfDestroyed("getDocumentSyncingStatus");
if (!this.settings.getSettings().isSyncEnabled) {
return DocumentSyncStatus.SYNCING_IS_DISABLED;
}
if (!this.syncer.isFirstSyncComplete || !this.hasFinishedOfflineSync) {
return DocumentSyncStatus.SYNCING;
}
const document =
this.database.getLatestDocumentByRelativePath(relativePath);
if (document === undefined) {
return DocumentSyncStatus.SYNCING;
}
return document.updates.length > 0
? DocumentSyncStatus.SYNCING
: DocumentSyncStatus.UP_TO_DATE;
}
public async updateLocalCursors(
documentToCursors: Record<RelativePath, CursorSpan[]>
): Promise<void> {
this.checkIfDestroyed("updateLocalCursors");
await this.cursorTracker.sendLocalCursorsToServer(documentToCursors);
}
public get onRemoteCursorsUpdated(): EventListeners<
(cursors: MaybeOutdatedClientCursors[]) => unknown
> {
this.checkIfDestroyed("onRemoteCursorsUpdated getter");
return this.cursorTracker.onRemoteCursorsUpdated;
}
public async waitUntilFinished(): Promise<void> {
this.checkIfDestroyed("waitUntilIdle");
await this.syncer.waitUntilFinished();
await this.webSocketManager.waitUntilFinished();
await this.database.save(); // flush all changes to disk
}
/**
* Completely destroy the SyncClient, cancelling all in-progress operations.
* After calling this method, the SyncClient cannot be used again.
*/
public async destroy(): Promise<void> {
this.checkIfDestroyed("destroy");
// cancel everything that's in progress
await this.pause();
this.hasBeenDestroyed = true;
this.resetInMemoryState();
this.logger.info("SyncClient has been successfully disposed");
this.unloadTelemetry?.();
}
private async startSyncing(): Promise<void> {
this.checkIfDestroyed("startSyncing");
this.fetchController.finishReset();
await this.serverConfig.initialize();
this.webSocketManager.start();
if (!this.hasStartedOfflineSync) {
this.hasStartedOfflineSync = true;
await this.syncer.scheduleSyncForOfflineChanges();
}
this.hasFinishedOfflineSync = true;
}
private async pause(): Promise<void> {
this.fetchController.startReset();
await this.webSocketManager.stop();
await this.waitUntilFinished();
}
private resetInMemoryState(): void {
this.history.reset();
this.contentCache.reset();
// don't reset the logger
this.cursorTracker.reset();
this.syncer.reset();
this.fileOperations.reset();
}
private async onSettingsChange(
newSettings: SyncSettings,
oldSettings: SyncSettings
): Promise<void> {
this.checkIfDestroyed("onSettingsChange");
if (
newSettings.vaultName !== oldSettings.vaultName ||
newSettings.remoteUri !== oldSettings.remoteUri
) {
await this.reset();
}
if (newSettings.isSyncEnabled !== oldSettings.isSyncEnabled) {
if (newSettings.isSyncEnabled) {
await this.startSyncing();
} else {
await this.pause();
}
}
if (newSettings.diffCacheSizeMB !== oldSettings.diffCacheSizeMB) {
this.contentCache.resize(newSettings.diffCacheSizeMB * 1024 * 1024);
}
if (newSettings.enableTelemetry !== oldSettings.enableTelemetry) {
if (newSettings.enableTelemetry) {
this.unloadTelemetry = setUpTelemetry();
} else {
this.unloadTelemetry?.();
}
}
}
private checkIfDestroyed(origin: string): void {
if (this.hasBeenDestroyed) {
throw new Error(
`SyncClient has been destroyed and can no longer be used; called from ${origin}`
);
}
}
}

View file

@ -9,252 +9,252 @@ import { DocumentUpToDateness } from "../types/document-up-to-dateness";
import { hash } from "../utils/hash";
import type { FileChangeNotifier } from "./file-change-notifier";
import { Lock } from "../utils/data-structures/locks";
import { EventListeners } from "../utils/data-structures/event-listeners";
// Cursor positions are updated separately from documents. However, a given cursor position is only
// valid within a certain version of the document it belongs to. This class tracks previous and the latest
// known remote cursor positions, and for each document, tries to return the latest cursor positions that are
// not from the future.
export class CursorTracker {
private readonly updateLock = new Lock();
private readonly updateLock = new Lock();
private knownRemoteCursors: (ClientCursors & {
upToDateness: DocumentUpToDateness;
})[] = [];
// The returned position may be accurate, if it matches the document version, or outdated, in which case
// the client has to heuristically guess it's current position based on the local edits.
public readonly onRemoteCursorsUpdated = new EventListeners<
(cursors: MaybeOutdatedClientCursors[]) => unknown
>();
private lastLocalCursorState: DocumentWithCursors[] = [];
private lastLocalCursorStateWithoutDirtyDocuments: DocumentWithCursors[] =
[];
private knownRemoteCursors: (ClientCursors & {
upToDateness: DocumentUpToDateness;
})[] = [];
public constructor(
private readonly database: Database,
private readonly webSocketManager: WebSocketManager,
private readonly fileOperations: FileOperations,
private readonly fileChangeNotifier: FileChangeNotifier
) {
this.webSocketManager.addRemoteCursorsUpdateListener(
async (clientCursors) => {
await this.updateLock.withLock(async () => {
// The latest message will contain all active clients, so we can delete the ones
// from the local list which are no longer active.
const allIds = new Set(
clientCursors.map((c) => c.deviceId)
);
const updatedKnownRemoteCursors =
this.knownRemoteCursors.filter((c) =>
allIds.has(c.deviceId)
);
private lastLocalCursorState: DocumentWithCursors[] = [];
private lastLocalCursorStateWithoutDirtyDocuments: DocumentWithCursors[] =
[];
for (const cursor of clientCursors.filter((client) =>
client.documentsWithCursors.every(
(doc) => doc.vault_update_id != null
)
)) {
updatedKnownRemoteCursors.push({
...cursor,
upToDateness:
await this.getDocumentsUpToDateness(cursor)
});
}
public constructor(
private readonly database: Database,
private readonly webSocketManager: WebSocketManager,
private readonly fileOperations: FileOperations,
private readonly fileChangeNotifier: FileChangeNotifier
) {
this.webSocketManager.onRemoteCursorsUpdateReceived.add(
async (clientCursors) => {
await this.updateLock.withLock(async () => {
// The latest message will contain all active clients, so we can delete the ones
// from the local list which are no longer active.
const allIds = new Set(
clientCursors.map((c) => c.deviceId)
);
const updatedKnownRemoteCursors =
this.knownRemoteCursors.filter((c) =>
allIds.has(c.deviceId)
);
this.knownRemoteCursors = updatedKnownRemoteCursors;
});
}
);
for (const cursor of clientCursors.filter((client) =>
client.documentsWithCursors.every(
(doc) => doc.vault_update_id != null
)
)) {
updatedKnownRemoteCursors.push({
...cursor,
upToDateness:
await this.getDocumentsUpToDateness(cursor)
});
}
this.fileChangeNotifier.addFileChangeListener(async (relativePath) =>
this.updateLock.withLock(async () => {
for (const clientCursor of this.knownRemoteCursors) {
if (
clientCursor.documentsWithCursors.some(
(document) =>
document.relative_path === relativePath
)
) {
clientCursor.upToDateness =
await this.getDocumentsUpToDateness(clientCursor);
}
}
})
);
}
this.knownRemoteCursors = updatedKnownRemoteCursors;
});
/// Update the local cursors for the given documents.
/// Can be called frequently as it only emits an event
/// if the state has actually changed.
public async sendLocalCursorsToServer(
documentToCursors: Record<RelativePath, CursorSpan[]>
): Promise<void> {
const documentsWithCursors: DocumentWithCursors[] = [];
this.onRemoteCursorsUpdated.trigger(
this.getRelevantAndPruneKnownClientCursors()
);
}
);
for (const [relativePath, cursors] of Object.entries(
documentToCursors
)) {
const record =
this.database.getLatestDocumentByRelativePath(relativePath);
if (!record) {
continue; // Let's wait for the file to be created before sending cursors
}
this.fileChangeNotifier.onFileChanged.add(async (relativePath) =>
this.updateLock.withLock(async () => {
for (const clientCursor of this.knownRemoteCursors) {
if (
clientCursor.documentsWithCursors.some(
(document) =>
document.relative_path === relativePath
)
) {
clientCursor.upToDateness =
await this.getDocumentsUpToDateness(clientCursor);
}
}
})
);
}
if (!record.metadata) {
continue; // this is a new document, no need to sync the cursors
}
/// Update the local cursors for the given documents.
/// Can be called frequently as it only emits an event
/// if the state has actually changed.
public async sendLocalCursorsToServer(
documentToCursors: Record<RelativePath, CursorSpan[]>
): Promise<void> {
const documentsWithCursors: DocumentWithCursors[] = [];
documentsWithCursors.push({
relative_path: relativePath,
document_id: record.documentId,
vault_update_id: record.metadata.parentVersionId,
cursors: cursors.map(({ start, end }) => ({
start: Math.min(start, end),
end: Math.max(start, end)
})) // the client might send directional selections
});
}
for (const [relativePath, cursors] of Object.entries(
documentToCursors
)) {
const record =
this.database.getLatestDocumentByRelativePath(relativePath);
if (
JSON.stringify(this.lastLocalCursorState) ===
JSON.stringify(documentsWithCursors)
) {
// Caching step to avoid reading the edited files all the time
return;
}
this.lastLocalCursorState = documentsWithCursors;
if (!record) {
continue; // Let's wait for the file to be created before sending cursors
}
for (const doc of documentsWithCursors) {
const readContent = await this.fileOperations.read(
doc.relative_path
);
const record = this.database.getLatestDocumentByRelativePath(
doc.relative_path
);
if (record?.metadata?.hash !== hash(readContent)) {
doc.vault_update_id = null;
}
}
if (!record.metadata) {
continue; // this is a new document, no need to sync the cursors
}
if (
JSON.stringify(this.lastLocalCursorStateWithoutDirtyDocuments) ===
JSON.stringify(documentsWithCursors)
) {
return;
}
documentsWithCursors.push({
relative_path: relativePath,
document_id: record.documentId,
vault_update_id: record.metadata.parentVersionId,
cursors: cursors.map(({ start, end }) => ({
start: Math.min(start, end),
end: Math.max(start, end)
})) // the client might send directional selections
});
}
this.lastLocalCursorStateWithoutDirtyDocuments = documentsWithCursors;
if (
JSON.stringify(this.lastLocalCursorState) ===
JSON.stringify(documentsWithCursors)
) {
// Caching step to avoid reading the edited files all the time
return;
}
this.lastLocalCursorState = documentsWithCursors;
this.webSocketManager.updateLocalCursors({ documentsWithCursors });
}
for (const doc of documentsWithCursors) {
const readContent = await this.fileOperations.read(
doc.relative_path
);
const record = this.database.getLatestDocumentByRelativePath(
doc.relative_path
);
if (record?.metadata?.hash !== hash(readContent)) {
doc.vault_update_id = null;
}
}
// The returned position may be accurate, if it matches the document version, or outdated, in which case
// the client has to heuristically guess it's current position based on the local edits.
public addRemoteCursorsUpdateListener(
listener: (cursors: MaybeOutdatedClientCursors[]) => unknown
): void {
// CursorTracker registers its own event listener in the constructor so it must have been called before this
this.webSocketManager.addRemoteCursorsUpdateListener(async () => {
await this.updateLock.withLock(() =>
listener(this.getRelevantAndPruneKnownClientCursors())
);
});
}
if (
JSON.stringify(this.lastLocalCursorStateWithoutDirtyDocuments) ===
JSON.stringify(documentsWithCursors)
) {
return;
}
public reset(): void {
this.knownRemoteCursors = [];
this.lastLocalCursorState = [];
this.lastLocalCursorStateWithoutDirtyDocuments = [];
this.updateLock.reset();
}
this.lastLocalCursorStateWithoutDirtyDocuments = documentsWithCursors;
private getRelevantAndPruneKnownClientCursors(): MaybeOutdatedClientCursors[] {
const result: MaybeOutdatedClientCursors[] = [];
const included = new Set<string>();
this.webSocketManager.updateLocalCursors({ documentsWithCursors });
}
const relevantCursors = [];
for (const clientCursors of [...this.knownRemoteCursors].reverse()) {
if (included.has(clientCursors.deviceId)) {
continue;
}
if (clientCursors.upToDateness === DocumentUpToDateness.Later) {
continue;
}
public reset(): void {
this.knownRemoteCursors = [];
this.lastLocalCursorState = [];
this.lastLocalCursorStateWithoutDirtyDocuments = [];
this.updateLock.reset();
}
result.push({
...clientCursors,
isOutdated:
clientCursors.upToDateness === DocumentUpToDateness.Prior
});
private getRelevantAndPruneKnownClientCursors(): MaybeOutdatedClientCursors[] {
const result: MaybeOutdatedClientCursors[] = [];
const included = new Set<string>();
included.add(clientCursors.deviceId);
relevantCursors.unshift(clientCursors); // to reverse order back to normal
}
const relevantCursors = [];
for (const clientCursors of [...this.knownRemoteCursors].reverse()) {
if (included.has(clientCursors.deviceId)) {
continue;
}
this.knownRemoteCursors = relevantCursors;
if (clientCursors.upToDateness === DocumentUpToDateness.Later) {
continue;
}
return result;
}
result.push({
...clientCursors,
isOutdated:
clientCursors.upToDateness === DocumentUpToDateness.Prior
});
// We store up-to-dateness on a per-client basis to simplify the implementation.
// An individual client won't have too many documents open at once, so this is a reasonable trade-off.
private async getDocumentsUpToDateness(
clientCursor: ClientCursors
): Promise<DocumentUpToDateness> {
const results = [];
for (const document of clientCursor.documentsWithCursors) {
results.push(await this.getDocumentUpToDateness(document));
}
included.add(clientCursors.deviceId);
relevantCursors.unshift(clientCursors); // to reverse order back to normal
}
if (
results.every((result) => result === DocumentUpToDateness.UpToDate)
) {
return DocumentUpToDateness.UpToDate;
}
this.knownRemoteCursors = relevantCursors;
if (
results.every(
(result) =>
result === DocumentUpToDateness.UpToDate ||
result === DocumentUpToDateness.Prior
)
) {
return DocumentUpToDateness.Prior;
}
return result;
}
return DocumentUpToDateness.Later;
}
// We store up-to-dateness on a per-client basis to simplify the implementation.
// An individual client won't have too many documents open at once, so this is a reasonable trade-off.
private async getDocumentsUpToDateness(
clientCursor: ClientCursors
): Promise<DocumentUpToDateness> {
const results = [];
for (const document of clientCursor.documentsWithCursors) {
results.push(await this.getDocumentUpToDateness(document));
}
private async getDocumentUpToDateness(
document: DocumentWithCursors
): Promise<DocumentUpToDateness> {
const record = this.database.getLatestDocumentByRelativePath(
document.relative_path
);
if (
results.every((result) => result === DocumentUpToDateness.UpToDate)
) {
return DocumentUpToDateness.UpToDate;
}
if (!record) {
// the document of the cursor must be from the future
return DocumentUpToDateness.Later;
}
if (
results.every(
(result) =>
result === DocumentUpToDateness.UpToDate ||
result === DocumentUpToDateness.Prior
)
) {
return DocumentUpToDateness.Prior;
}
if (
(record.metadata?.parentVersionId ?? 0) <
(document.vault_update_id ?? 0)
) {
return DocumentUpToDateness.Later;
} else if (
(document.vault_update_id ?? 0) <
(record.metadata?.parentVersionId ?? 0)
) {
// the document of the cursor must be from the past
return DocumentUpToDateness.Prior;
}
return DocumentUpToDateness.Later;
}
const currentContent = await this.fileOperations.read(
document.relative_path
);
private async getDocumentUpToDateness(
document: DocumentWithCursors
): Promise<DocumentUpToDateness> {
const record = this.database.getLatestDocumentByRelativePath(
document.relative_path
);
return this.database.getLatestDocumentByRelativePath(
document.relative_path
)?.metadata?.hash === hash(currentContent)
? DocumentUpToDateness.UpToDate
: DocumentUpToDateness.Prior;
}
if (!record) {
// the document of the cursor must be from the future
return DocumentUpToDateness.Later;
}
if (
(record.metadata?.parentVersionId ?? 0) <
(document.vault_update_id ?? 0)
) {
return DocumentUpToDateness.Later;
} else if (
(document.vault_update_id ?? 0) <
(record.metadata?.parentVersionId ?? 0)
) {
// the document of the cursor must be from the past
return DocumentUpToDateness.Prior;
}
const currentContent = await this.fileOperations.read(
document.relative_path
);
return this.database.getLatestDocumentByRelativePath(
document.relative_path
)?.metadata?.hash === hash(currentContent)
? DocumentUpToDateness.UpToDate
: DocumentUpToDateness.Prior;
}
}

View file

@ -1,22 +1,12 @@
import type { RelativePath } from "../persistence/database";
import { removeFromArray } from "../utils/remove-from-array";
import { EventListeners } from "../utils/data-structures/event-listeners";
export class FileChangeNotifier {
private readonly listeners: ((filePath: RelativePath) => unknown)[] = [];
public readonly onFileChanged = new EventListeners<
(filePath: RelativePath) => unknown
>();
public addFileChangeListener(
listener: (filePath: RelativePath) => unknown
): void {
this.listeners.push(listener);
}
public removeFileChangeListener(
listener: (filePath: RelativePath) => unknown
): void {
removeFromArray(this.listeners, listener);
}
public notifyOfFileChange(filePath: RelativePath): void {
this.listeners.forEach((listener) => listener(filePath));
}
public notifyOfFileChange(filePath: RelativePath): void {
this.onFileChanged.trigger(filePath);
}
}

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -1,87 +1,72 @@
import { MAX_LOG_MESSAGE_COUNT } from "../consts";
import { removeFromArray } from "../utils/remove-from-array";
import { EventListeners } from "../utils/data-structures/event-listeners";
export enum LogLevel {
DEBUG = "DEBUG",
INFO = "INFO",
WARNING = "WARNING",
ERROR = "ERROR"
DEBUG = "DEBUG",
INFO = "INFO",
WARNING = "WARNING",
ERROR = "ERROR"
}
const LOG_LEVEL_ORDER = {
[LogLevel.DEBUG]: 0,
[LogLevel.INFO]: 1,
[LogLevel.WARNING]: 2,
[LogLevel.ERROR]: 3
[LogLevel.DEBUG]: 0,
[LogLevel.INFO]: 1,
[LogLevel.WARNING]: 2,
[LogLevel.ERROR]: 3
};
export class LogLine {
public timestamp = new Date();
public constructor(
public level: LogLevel,
public message: string
) {}
public timestamp = new Date();
public constructor(
public level: LogLevel,
public message: string
) { }
}
export class Logger {
private readonly messages: LogLine[] = [];
private readonly onMessageListeners: ((message: LogLine) => unknown)[] = [];
private readonly messages: LogLine[] = [];
public readonly onLogEmitted = new EventListeners<
(message: LogLine) => unknown
>();
public constructor(
...onMessageListeners: ((message: LogLine) => unknown)[]
) {
this.onMessageListeners = onMessageListeners;
}
public debug(message: string): void {
this.pushMessage(message, LogLevel.DEBUG);
}
public debug(message: string): void {
this.pushMessage(message, LogLevel.DEBUG);
}
public info(message: string): void {
this.pushMessage(message, LogLevel.INFO);
}
public info(message: string): void {
this.pushMessage(message, LogLevel.INFO);
}
public warn(message: string): void {
this.pushMessage(message, LogLevel.WARNING);
}
public warn(message: string): void {
this.pushMessage(message, LogLevel.WARNING);
}
public error(message: string): void {
this.pushMessage(message, LogLevel.ERROR);
}
public error(message: string): void {
this.pushMessage(message, LogLevel.ERROR);
}
public getMessages(mininumSeverity: LogLevel): LogLine[] {
return this.messages.filter(
(message) =>
LOG_LEVEL_ORDER[message.level] >=
LOG_LEVEL_ORDER[mininumSeverity]
);
}
public getMessages(mininumSeverity: LogLevel): LogLine[] {
return this.messages.filter(
(message) =>
LOG_LEVEL_ORDER[message.level] >=
LOG_LEVEL_ORDER[mininumSeverity]
);
}
public addOnMessageListener(listener: (message: LogLine) => unknown): void {
this.onMessageListeners.push(listener);
}
public reset(): void {
this.messages.length = 0;
this.debug("Logger has been reset");
}
public removeOnMessageListener(
listener: (message: LogLine) => unknown
): void {
removeFromArray(this.onMessageListeners, listener);
}
private pushMessage(message: string, level: LogLevel): void {
const logLine = new LogLine(level, message);
this.messages.push(logLine);
public reset(): void {
this.messages.length = 0;
this.debug("Logger has been reset");
}
while (this.messages.length > MAX_LOG_MESSAGE_COUNT) {
this.messages.shift();
}
private pushMessage(message: string, level: LogLevel): void {
const logLine = new LogLine(level, message);
this.messages.push(logLine);
while (this.messages.length > MAX_LOG_MESSAGE_COUNT) {
this.messages.shift();
}
this.onMessageListeners.forEach((listener) => {
listener(logLine);
});
}
this.onLogEmitted.trigger(logLine);
}
}

View file

@ -1,183 +1,169 @@
import {
MAX_HISTORY_ENTRY_COUNT,
TIMEOUT_FOR_MERGING_HISTORY_ENTRIES_IN_SECONDS
MAX_HISTORY_ENTRY_COUNT,
TIMEOUT_FOR_MERGING_HISTORY_ENTRIES_IN_SECONDS
} from "../consts";
import type { RelativePath } from "../persistence/database";
import type { Logger } from "./logger";
import { removeFromArray } from "../utils/remove-from-array";
import { EventListeners } from "../utils/data-structures/event-listeners";
export interface SyncCreateDetails {
type: SyncType.CREATE;
relativePath: RelativePath;
type: SyncType.CREATE;
relativePath: RelativePath;
}
export interface SyncUpdateDetails {
type: SyncType.UPDATE;
relativePath: RelativePath;
type: SyncType.UPDATE;
relativePath: RelativePath;
}
export interface SyncMovedDetails {
type: SyncType.MOVE;
relativePath: RelativePath;
movedFrom: RelativePath;
type: SyncType.MOVE;
relativePath: RelativePath;
movedFrom: RelativePath;
}
export interface SyncDeleteDetails {
type: SyncType.DELETE;
relativePath: RelativePath;
type: SyncType.DELETE;
relativePath: RelativePath;
}
export interface SyncSkippedDetails {
type: SyncType.SKIPPED;
relativePath: RelativePath;
type: SyncType.SKIPPED;
relativePath: RelativePath;
}
export type SyncDetails =
| SyncCreateDetails
| SyncUpdateDetails
| SyncDeleteDetails
| SyncMovedDetails
| SyncSkippedDetails;
| SyncCreateDetails
| SyncUpdateDetails
| SyncDeleteDetails
| SyncMovedDetails
| SyncSkippedDetails;
export interface CommonHistoryEntry {
status: SyncStatus;
message: string;
details: SyncDetails;
author?: string;
timestamp?: Date;
status: SyncStatus;
message: string;
details: SyncDetails;
author?: string;
timestamp?: Date;
}
export enum SyncType {
CREATE = "CREATE",
UPDATE = "UPDATE",
DELETE = "DELETE",
MOVE = "MOVE",
SKIPPED = "SKIPPED"
CREATE = "CREATE",
UPDATE = "UPDATE",
DELETE = "DELETE",
MOVE = "MOVE",
SKIPPED = "SKIPPED"
}
export enum SyncStatus {
SUCCESS = "SUCCESS",
ERROR = "ERROR",
SKIPPED = "SKIPPED"
SUCCESS = "SUCCESS",
ERROR = "ERROR",
SKIPPED = "SKIPPED"
}
export type HistoryEntry = CommonHistoryEntry & { timestamp: Date };
export interface HistoryStats {
success: number;
error: number;
success: number;
error: number;
}
export class SyncHistory {
private readonly _entries: HistoryEntry[] = [];
private readonly _entries: HistoryEntry[] = [];
private readonly syncHistoryUpdateListeners: ((
status: HistoryStats
) => unknown)[] = [];
public readonly onHistoryUpdated = new EventListeners<
(status: HistoryStats) => unknown
>();
private status: HistoryStats = {
success: 0,
error: 0
};
private status: HistoryStats = {
success: 0,
error: 0
};
public constructor(private readonly logger: Logger) {}
public constructor(private readonly logger: Logger) { }
public get entries(): readonly HistoryEntry[] {
return this._entries;
}
public get entries(): readonly HistoryEntry[] {
return this._entries;
}
/**
* Insert the entry at the beginning of the history list. If the entry
* already in the list, it will get moved to the beginning and updated.
*
* If the entry list is too long, the oldest entry will be removed.
*/
public addHistoryEntry(entry: CommonHistoryEntry): void {
const historyEntry = {
...entry,
timestamp: entry.timestamp ?? new Date()
};
/**
* Insert the entry at the beginning of the history list. If the entry
* already in the list, it will get moved to the beginning and updated.
*
* If the entry list is too long, the oldest entry will be removed.
*/
public addHistoryEntry(entry: CommonHistoryEntry): void {
const historyEntry = {
...entry,
timestamp: entry.timestamp ?? new Date()
};
const candidate = this.findSimilarRecentUpdateEntry(historyEntry);
if (candidate !== undefined) {
removeFromArray(this._entries, candidate);
}
const candidate = this.findSimilarRecentUpdateEntry(historyEntry);
if (candidate !== undefined) {
removeFromArray(this._entries, candidate);
}
// Insert the entry at the beginning
this._entries.unshift(historyEntry);
// Insert the entry at the beginning
this._entries.unshift(historyEntry);
if (this._entries.length > MAX_HISTORY_ENTRY_COUNT) {
this._entries.pop();
}
if (this._entries.length > MAX_HISTORY_ENTRY_COUNT) {
this._entries.pop();
}
this.updateSuccessCount(historyEntry);
}
this.updateSuccessCount(historyEntry);
}
public addSyncHistoryUpdateListener(
listener: (stats: HistoryStats) => unknown
): void {
this.syncHistoryUpdateListeners.push(listener);
listener({ ...this.status });
}
public removeSyncHistoryUpdateListener(
listener: (stats: HistoryStats) => unknown
): void {
removeFromArray(this.syncHistoryUpdateListeners, listener);
}
public reset(): void {
this._entries.length = 0;
this.status = {
success: 0,
error: 0
};
this.syncHistoryUpdateListeners.forEach((listener) => {
listener(this.status);
});
}
public reset(): void {
this._entries.length = 0;
this.status = {
success: 0,
error: 0
};
this.onHistoryUpdated.trigger(this.status);
}
private findSimilarRecentUpdateEntry(
entry: HistoryEntry
): HistoryEntry | undefined {
if (entry.details.type !== SyncType.UPDATE) {
return;
}
private findSimilarRecentUpdateEntry(
entry: HistoryEntry
): HistoryEntry | undefined {
if (entry.details.type !== SyncType.UPDATE) {
return;
}
const candidate = this._entries.find(
(e) =>
e.details.type === SyncType.UPDATE &&
e.details.relativePath === entry.details.relativePath
);
if (
candidate !== undefined &&
(this._entries[0] === candidate ||
candidate.timestamp.getTime() +
TIMEOUT_FOR_MERGING_HISTORY_ENTRIES_IN_SECONDS * 1000 >
entry.timestamp.getTime())
) {
return candidate;
}
}
const candidate = this._entries.find(
(e) =>
e.details.type === SyncType.UPDATE &&
e.details.relativePath === entry.details.relativePath
);
if (
candidate !== undefined &&
(this._entries[0] === candidate ||
candidate.timestamp.getTime() +
TIMEOUT_FOR_MERGING_HISTORY_ENTRIES_IN_SECONDS * 1000 >
entry.timestamp.getTime())
) {
return candidate;
}
}
private updateSuccessCount(entry: HistoryEntry): void {
const message = `${entry.details.relativePath} - ${entry.message} (${entry.details.type.toLocaleLowerCase()})`;
switch (entry.status) {
case SyncStatus.SUCCESS:
this.status.success++;
this.logger.info(`History entry: ${message}`);
break;
case SyncStatus.ERROR:
this.status.error++;
this.logger.error(`Cannot sync file: ${message}`);
break;
case SyncStatus.SKIPPED:
this.logger.warn(`Skipping file: ${message}`);
break;
}
private updateSuccessCount(entry: HistoryEntry): void {
const message = `${entry.details.relativePath} - ${entry.message} (${entry.details.type.toLocaleLowerCase()})`;
switch (entry.status) {
case SyncStatus.SUCCESS:
this.status.success++;
this.logger.info(`History entry: ${message}`);
break;
case SyncStatus.ERROR:
this.status.error++;
this.logger.error(`Cannot sync file: ${message}`);
break;
case SyncStatus.SKIPPED:
this.logger.warn(`Skipping file: ${message}`);
break;
}
this.syncHistoryUpdateListeners.forEach((listener) => {
listener(this.status);
});
}
this.onHistoryUpdated.trigger(this.status);
}
}

View file

@ -0,0 +1,147 @@
import { describe, it } from "node:test";
import assert from "node:assert";
import { EventListeners } from "./event-listeners";
describe("EventListeners", () => {
it("should add & remove listeners", () => {
const listeners = new EventListeners<() => void>();
const listener = () => { };
listeners.add(listener);
assert.strictEqual(listeners.count, 1);
const removed = listeners.remove(listener);
assert.strictEqual(removed, true);
assert.strictEqual(listeners.count, 0);
});
it("should remove listeners using unsubscribe function", () => {
const listeners = new EventListeners<() => void>();
const listener = () => { };
const unsubscribe = listeners.add(listener);
unsubscribe();
assert.strictEqual(listeners.count, 0);
});
it("should return false when removing non-existent listener", () => {
const listeners = new EventListeners<() => void>();
const listener = () => { };
const removed = listeners.remove(listener);
assert.strictEqual(removed, false);
});
it("should handle multiple listeners", () => {
const listeners = new EventListeners<() => void>();
const listener1 = () => { };
const listener2 = () => { };
const listener3 = () => { };
listeners.add(listener1);
listeners.add(listener2);
listeners.add(listener3);
assert.strictEqual(listeners.count, 3);
listeners.remove(listener2);
assert.strictEqual(listeners.count, 2);
});
it("should trigger all listeners synchronously", () => {
const listeners = new EventListeners<(value: string) => void>();
const calls: string[] = [];
listeners.add((value) => calls.push(`listener1-${value}`));
listeners.add((value) => calls.push(`listener2-${value}`));
listeners.trigger("test");
assert.deepStrictEqual(calls, ["listener1-test", "listener2-test"]);
});
it("should trigger listeners with multiple arguments", () => {
const listeners = new EventListeners<
(a: number, b: string, c: boolean) => void
>();
const calls: [number, string, boolean][] = [];
listeners.add((a, b, c) => calls.push([a, b, c]));
listeners.trigger(42, "hello", true);
assert.deepStrictEqual(calls, [[42, "hello", true]]);
});
it("should not trigger removed listeners", () => {
const listeners = new EventListeners<() => void>();
let count1 = 0;
let count2 = 0;
const listener1 = () => {
count1++;
};
const listener2 = () => {
count2++;
};
listeners.add(listener1);
const unsubscribe = listeners.add(listener2);
unsubscribe();
listeners.trigger();
assert.strictEqual(count1, 1);
assert.strictEqual(count2, 0);
});
it("should trigger all listeners and await promises", async () => {
const listeners = new EventListeners<
(value: string) => Promise<void> | void
>();
const results: string[] = [];
listeners.add(async (value) => {
await new Promise((resolve) => setTimeout(resolve, 10));
results.push(`async1-${value}`);
});
listeners.add((value) => {
results.push(`sync-${value}`);
});
listeners.add(async (value) => {
await new Promise((resolve) => setTimeout(resolve, 5));
results.push(`async2-${value}`);
});
await listeners.triggerAsync("test");
assert.ok(results.includes("async1-test"));
assert.ok(results.includes("sync-test"));
assert.ok(results.includes("async2-test"));
assert.strictEqual(results.length, 3);
});
it("should not trigger cleared listeners", () => {
const listeners = new EventListeners<() => void>();
let called = false;
const listener = () => {
called = true;
};
listeners.add(listener);
listeners.clear();
assert.strictEqual(listeners.count, 0);
listeners.trigger();
assert.strictEqual(called, false);
});
});

View file

@ -0,0 +1,71 @@
import { removeFromArray } from "../remove-from-array";
import { awaitAll } from "../await-all";
/**
* A utility class for managing event listeners with type-safe add/remove operations.
*/
export class EventListeners<TListener extends (...args: any[]) => any> {
private readonly listeners: TListener[] = [];
/**
* Adds a new listener to the collection.
*
* @param listener The listener callback to add
* @returns An unsubscribe function that removes this listener when called
*/
public add(listener: TListener): () => void {
this.listeners.push(listener);
return () => this.remove(listener);
}
/**
* Removes a listener from the collection.
*
* @param listener The listener callback to remove
* @returns true if the listener was found and removed, false otherwise
*/
public remove(listener: TListener): boolean {
return removeFromArray(this.listeners, listener);
}
/**
* Triggers all listeners synchronously with the provided arguments.
* Any returned promises are ignored. Use triggerAsync() to await them.
*
* @param args The arguments to pass to each listener
*/
public trigger(...args: Parameters<TListener>): void {
this.listeners.forEach((listener) => {
listener(...args);
});
}
/**
* Triggers all listeners and awaits any promises they return.
* Synchronous listeners are called immediately, and any async listeners
* are awaited in parallel.
*
* @param args The arguments to pass to each listener
*/
public async triggerAsync(...args: Parameters<TListener>): Promise<void> {
await awaitAll(
this.listeners
.map((listener) => {
return listener(...args);
})
.filter((result): result is Promise<unknown> => {
return result instanceof Promise;
})
);
}
public clear(): void {
this.listeners.length = 0;
}
public get count(): number {
return this.listeners.length;
}
}

View file

@ -3,7 +3,7 @@ import type { LogLine } from "../../tracing/logger";
import { LogLevel } from "../../tracing/logger";
export function logToConsole(client: SyncClient): void {
client.logger.addOnMessageListener((logLine: LogLine) => {
client.logger.onLogEmitted.add((logLine: LogLine) => {
const formatted = `${logLine.timestamp.toISOString()} ${logLine.level} ${logLine.message}`;
switch (logLine.level) {