This commit is contained in:
Andras Schmelczer 2025-10-18 15:36:49 +01:00
parent 0c42c23669
commit 352c71af65
185 changed files with 20165 additions and 0 deletions

View file

@ -0,0 +1,33 @@
{
"name": "sync-client",
"version": "0.8.0",
"main": "dist/sync-client.node.js",
"browser": "dist/sync-client.web.js",
"types": "dist/types/index.d.ts",
"files": [
"dist/**/*"
],
"scripts": {
"dev": "webpack watch --mode development",
"build": "webpack --mode production",
"test": "tsx --test src/**/*.test.ts"
},
"dependencies": {
"byte-base64": "^1.1.0",
"minimatch": "^10.0.1",
"p-queue": "^8.1.0",
"reconcile-text": "^0.5.0",
"uuid": "^11.1.0"
},
"devDependencies": {
"@types/node": "^22.15.30",
"ts-loader": "^9.5.2",
"tslib": "2.8.1",
"tsx": "^4.20.5",
"typescript": "5.8.3",
"webpack": "^5.99.9",
"webpack-cli": "^6.0.1",
"webpack-merge": "^6.0.1",
"ws": "^8.18.3"
}
}

View file

@ -0,0 +1,24 @@
import type { SyncClient } from "../sync-client";
import type { LogLine } from "../tracing/logger";
import { LogLevel } from "../tracing/logger";
export function logToConsole(client: SyncClient): void {
client.logger.addOnMessageListener((logLine: LogLine) => {
const formatted = `${logLine.timestamp.toISOString()} ${logLine.level} ${logLine.message}`;
switch (logLine.level) {
case LogLevel.ERROR:
console.error(formatted);
break;
case LogLevel.WARNING:
console.warn(formatted);
break;
case LogLevel.INFO:
console.info(formatted);
break;
case LogLevel.DEBUG:
console.debug(formatted);
break;
}
});
}

View file

@ -0,0 +1,16 @@
import { sleep } from "../utils/sleep";
export const slowFetchFactory =
(jitterScaleInSeconds: number) =>
async (
input: string | URL | globalThis.Request,
init?: RequestInit
): Promise<Response> => {
if (jitterScaleInSeconds > 0) {
await sleep(Math.random() * jitterScaleInSeconds * 1000);
}
const response = await fetch(input, init);
return response;
};

View file

@ -0,0 +1,81 @@
import { sleep } from "../utils/sleep";
import { Locks } from "../utils/locks";
import type { Logger } from "../tracing/logger";
export function slowWebSocketFactory(
jitterScaleInSeconds: number,
logger: Logger
): typeof WebSocket {
// eslint-disable-next-line
return class FlakyWebSocket extends WebSocket {
private static readonly RECEIVE_KEY = "websocket-receive";
private static readonly SEND_KEY = "websocket-send";
private readonly locks = new Locks(logger);
public set onopen(callback: (event: Event) => void) {
super.onopen = async (event: Event): Promise<void> => {
if (jitterScaleInSeconds > 0) {
await sleep(Math.random() * jitterScaleInSeconds * 1000);
}
callback(event);
};
}
public set onmessage(callback: (event: MessageEvent) => void) {
super.onmessage = async (event: MessageEvent): Promise<void> => {
await this.locks.withLock(
FlakyWebSocket.RECEIVE_KEY,
async () => {
if (jitterScaleInSeconds > 0) {
await sleep(
Math.random() * jitterScaleInSeconds * 1000
);
}
callback(event);
}
);
};
}
public set onclose(callback: (event: CloseEvent) => void) {
super.onclose = async (event: CloseEvent): Promise<void> => {
if (jitterScaleInSeconds > 0) {
await sleep(Math.random() * jitterScaleInSeconds * 1000);
}
callback(event);
};
}
public set onerror(callback: (event: Event) => void) {
super.onerror = async (event: Event): Promise<void> => {
if (jitterScaleInSeconds > 0) {
await sleep(Math.random() * jitterScaleInSeconds * 1000);
}
callback(event);
};
}
public send(
data: string | ArrayBufferLike | Blob | ArrayBufferView
): void {
this.waitingSend(data).catch((error: unknown) => {
logger.error(`Error sending WebSocket message: ${error}`);
});
}
private async waitingSend(
data: string | ArrayBufferLike | Blob | ArrayBufferView
): Promise<void> {
// maintain message order
await this.locks.withLock(FlakyWebSocket.SEND_KEY, async () => {
if (jitterScaleInSeconds > 0) {
await sleep(Math.random() * jitterScaleInSeconds * 1000);
}
super.send(data);
});
}
} as unknown as typeof WebSocket;
}

View file

@ -0,0 +1,6 @@
export class FileNotFoundError extends Error {
public constructor(message: string) {
super(message);
this.name = "FileNotFoundError";
}
}

View file

@ -0,0 +1,160 @@
import { describe, it } from "node:test";
import type {
Database,
DocumentRecord,
RelativePath
} from "../persistence/database";
import { FileOperations } from "./file-operations";
import { Logger } from "../tracing/logger";
import { assertSetContainsExactly } from "../utils/assert-set-contains-exactly";
import type { FileSystemOperations } from "./filesystem-operations";
import type { TextWithCursors } from "reconcile-text";
class MockDatabase implements Partial<Database> {
public getLatestDocumentByRelativePath(
_find: RelativePath
): DocumentRecord | undefined {
// no-op
return undefined;
}
public move(
_oldRelativePath: RelativePath,
_newRelativePath: RelativePath
): void {
// no-op
}
}
class FakeFileSystemOperations implements FileSystemOperations {
public readonly names = new Set<string>();
public async listAllFiles(): Promise<RelativePath[]> {
throw new Error("Method not implemented.");
}
public async read(_path: RelativePath): Promise<Uint8Array> {
throw new Error("Method not implemented.");
}
public async write(
path: RelativePath,
_content: Uint8Array
): Promise<void> {
this.names.add(path);
}
public async atomicUpdateText(
_path: RelativePath,
_updater: (current: TextWithCursors) => TextWithCursors
): Promise<string> {
throw new Error("Method not implemented.");
}
public async getFileSize(_path: RelativePath): Promise<number> {
throw new Error("Method not implemented.");
}
public async getModificationTime(_path: RelativePath): Promise<Date> {
throw new Error("Method not implemented.");
}
public async exists(path: RelativePath): Promise<boolean> {
return this.names.has(path);
}
public async createDirectory(_path: RelativePath): Promise<void> {
// this is called but irrelevant for this mock
}
public async delete(_path: RelativePath): Promise<void> {
throw new Error("Method not implemented.");
}
public async rename(
oldPath: RelativePath,
newPath: RelativePath
): Promise<void> {
this.names.delete(oldPath);
this.names.add(newPath);
}
}
describe("File operations", () => {
it("should deconflict renames", async () => {
const fileSystemOperations = new FakeFileSystemOperations();
const fileOperations = new FileOperations(
new Logger(),
new MockDatabase() as Database, // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
fileSystemOperations
);
await fileOperations.create("a", new Uint8Array());
assertSetContainsExactly(fileSystemOperations.names, "a");
await fileOperations.move("a", "b");
assertSetContainsExactly(fileSystemOperations.names, "b");
await fileOperations.create("c", new Uint8Array());
assertSetContainsExactly(fileSystemOperations.names, "b", "c");
await fileOperations.move("c", "b");
assertSetContainsExactly(fileSystemOperations.names, "b", "b (1)");
await fileOperations.create("c", new Uint8Array());
await fileOperations.move("c", "b");
assertSetContainsExactly(
fileSystemOperations.names,
"b",
"b (1)",
"b (2)"
);
});
it("should deconflict renames with file extension", async () => {
const fileSystemOperations = new FakeFileSystemOperations();
const fileOperations = new FileOperations(
new Logger(),
new MockDatabase() as Database, // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
fileSystemOperations
);
await fileOperations.create("b.md", new Uint8Array());
await fileOperations.create("c.md", new Uint8Array());
await fileOperations.move("c.md", "b.md");
assertSetContainsExactly(
fileSystemOperations.names,
"b.md",
"b (1).md"
);
await fileOperations.create("d.md", new Uint8Array());
await fileOperations.move("d.md", "b.md");
assertSetContainsExactly(
fileSystemOperations.names,
"b.md",
"b (1).md",
"b (2).md"
);
await fileOperations.create("file-23.md", new Uint8Array());
await fileOperations.create("file-23 (1).md", new Uint8Array());
await fileOperations.move("file-23.md", "file-23 (1).md");
assertSetContainsExactly(
fileSystemOperations.names,
"b.md",
"b (1).md",
"b (2).md",
"file-23 (1).md",
"file-23 (2).md"
);
});
it("should deconflict renames with paths", async () => {
const fileSystemOperations = new FakeFileSystemOperations();
const fileOperations = new FileOperations(
new Logger(),
new MockDatabase() as Database, // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
fileSystemOperations
);
await fileOperations.create("a/b.c/d", new Uint8Array());
await fileOperations.create("a/b.c/e", new Uint8Array());
await fileOperations.move("a/b.c/d", "a/b.c/e");
assertSetContainsExactly(
fileSystemOperations.names,
"a/b.c/e",
"a/b.c/e (1)"
);
});
});

View file

@ -0,0 +1,215 @@
import type { Logger } from "../tracing/logger";
import type { FileSystemOperations } from "./filesystem-operations";
import type { Database, RelativePath } from "../persistence/database";
import { SafeFileSystemOperations } from "./safe-filesystem-operations";
import type { TextWithCursors } from "reconcile-text";
import { isBinary, reconcile } from "reconcile-text";
import { isFileTypeMergable } from "../utils/is-file-type-mergable";
export class FileOperations {
private static readonly PARENTHESES_REGEX = / \((\d+)\)$/;
private readonly fs: SafeFileSystemOperations;
public constructor(
private readonly logger: Logger,
private readonly database: Database,
fs: FileSystemOperations,
private readonly nativeLineEndings = "\n"
) {
this.fs = new SafeFileSystemOperations(fs, logger);
}
public async listAllFiles(): Promise<RelativePath[]> {
return this.fs.listAllFiles();
}
public async read(path: RelativePath): Promise<Uint8Array> {
return this.fromNativeLineEndings(await this.fs.read(path));
}
/**
* Create a file at the specified path.
*
* If a file with the same name already exists, it is moved before creating the new one.
* Parent directories are created if necessary.
*/
public async create(
path: RelativePath,
newContent: Uint8Array
): Promise<void> {
await this.ensureClearPath(path);
return this.fs.write(path, this.toNativeLineEndings(newContent));
}
public async ensureClearPath(path: RelativePath): Promise<void> {
if (await this.fs.exists(path)) {
const deconflictedPath = await this.deconflictPath(path);
this.logger.debug(
`Didn't expect ${path} to exist, deconflicting by moving it to '${deconflictedPath}'`
);
this.database.move(path, deconflictedPath);
await this.fs.rename(path, deconflictedPath);
} else {
await this.createParentDirectories(path);
}
}
/**
* Update the file at the given path.
*
* Performs a 3-way merge before writing if the file's content differs from `expectedContent`.
* Does not recreate the file if it no longer exists, returning an empty array instead.
*/
public async write(
path: RelativePath,
expectedContent: Uint8Array,
newContent: Uint8Array
): Promise<void> {
if (!(await this.fs.exists(path))) {
this.logger.debug(
`The caller assumed ${path} exists, but it no longer, so we wont recreate it`
);
return;
}
if (
!isFileTypeMergable(path) ||
isBinary(expectedContent) ||
isBinary(newContent)
) {
this.logger.debug(
`The expected content is not mergable, so we won't perform a 3-way merge, just overwrite it`
);
await this.fs.write(
path,
// `newContent` might not be binary so we still have to ensure the line endings are correct
this.toNativeLineEndings(newContent)
);
return;
}
const expectedText = new TextDecoder().decode(expectedContent); // this comes from a previous read which must only have \n line endings
const newText = new TextDecoder().decode(newContent); // this comes from the server which stores text with \n line endings
await this.fs.atomicUpdateText(
path,
({ text, cursors }: TextWithCursors): TextWithCursors => {
this.logger.debug(
`Performing a 3-way merge for ${path} with the expected content`
);
text = text.replace(this.nativeLineEndings, "\n");
const merged = reconcile(
expectedText,
{ text, cursors },
newText
);
const resultText = merged.text.replace(
"\n",
this.nativeLineEndings
);
return {
text: resultText,
cursors: merged.cursors
};
}
);
}
public async delete(path: RelativePath): Promise<void> {
if (await this.exists(path)) {
return this.fs.delete(path);
} else {
this.logger.debug(`No need to delete '${path}', it doesn't exist`);
}
}
public async getFileSize(path: RelativePath): Promise<number> {
return this.fs.getFileSize(path);
}
public async exists(path: RelativePath): Promise<boolean> {
return this.fs.exists(path);
}
public async move(
oldPath: RelativePath,
newPath: RelativePath
): Promise<void> {
if (oldPath === newPath) {
return;
}
await this.ensureClearPath(newPath);
this.database.move(oldPath, newPath);
await this.fs.rename(oldPath, newPath);
}
private fromNativeLineEndings(content: Uint8Array): Uint8Array {
if (isBinary(content)) {
return content;
}
const decoder = new TextDecoder("utf-8");
let text = decoder.decode(content);
text = text.replace(this.nativeLineEndings, "\n");
return new TextEncoder().encode(text);
}
private toNativeLineEndings(content: Uint8Array): Uint8Array {
if (isBinary(content)) {
return content;
}
const decoder = new TextDecoder("utf-8");
let text = decoder.decode(content);
text = text.replace("\n", this.nativeLineEndings);
return new TextEncoder().encode(text);
}
private async createParentDirectories(path: string): Promise<void> {
const components = path.split("/");
if (components.length === 1) {
return;
}
for (let i = 1; i < components.length; i++) {
const parentDir = components.slice(0, i).join("/");
if (!(await this.fs.exists(parentDir))) {
await this.fs.createDirectory(parentDir);
}
}
}
private async deconflictPath(path: RelativePath): Promise<RelativePath> {
const pathParts = path.split("/");
const fileName = pathParts.pop();
if (fileName == "" || fileName == null) {
throw new Error(`Path '${path}' cannot be empty`);
}
let directory = pathParts.join("/");
if (directory) {
directory += "/";
}
const nameParts = fileName.split(".");
const extension =
nameParts.length > 1 ? "." + nameParts[nameParts.length - 1] : "";
let stem = extension ? nameParts.slice(0, -1).join(".") : fileName;
let currentCount = Number.parseInt(
FileOperations.PARENTHESES_REGEX.exec(stem)?.groups?.[0] ?? "0"
);
stem = stem.replace(FileOperations.PARENTHESES_REGEX, "");
let newName = path;
do {
currentCount++;
newName = `${directory}${stem} (${currentCount})${extension}`;
} while (await this.fs.exists(newName));
return newName;
}
}

View file

@ -0,0 +1,35 @@
import type { RelativePath } from "../persistence/database";
import type { TextWithCursors } from "reconcile-text";
export interface FileSystemOperations {
// List all files that should be synced.
listAllFiles: () => Promise<RelativePath[]>;
// Read the content of a file.
read: (path: RelativePath) => Promise<Uint8Array>;
// Create or overwrite a file with the given content.
write: (path: RelativePath, content: Uint8Array) => Promise<void>;
// Atomically update the content of a text file.
atomicUpdateText: (
path: RelativePath,
updater: (current: TextWithCursors) => TextWithCursors
) => Promise<string>;
// Get the size of a file in bytes.
getFileSize: (path: RelativePath) => Promise<number>;
// Check if a file exists.
exists: (path: RelativePath) => Promise<boolean>;
// Create a directory at the specified path. All parent directories must already exist.
createDirectory: (path: RelativePath) => Promise<void>;
// Delete a file. It is expected that the path points to an existing file.
delete: (path: RelativePath) => Promise<void>;
// Rename a file. It is expected that the oldPath points to an existing file and the newPath does not exist.
rename: (oldPath: RelativePath, newPath: RelativePath) => Promise<void>;
}

View file

@ -0,0 +1,137 @@
import type { RelativePath } from "../persistence/database";
import type { FileSystemOperations } from "./filesystem-operations";
import type { Logger } from "../tracing/logger";
import { Locks } from "../utils/locks";
import { FileNotFoundError } from "./file-not-found-error";
import type { TextWithCursors } from "reconcile-text";
/**
* Decorates `FileSystemOperations` to replace errors with `FileNotFoundError`
* if the accessed file doesn't exist. It also ensures that there's at most a
* single request in-flight for any one file through the use of locks.
*/
export class SafeFileSystemOperations implements FileSystemOperations {
private readonly locks: Locks<RelativePath>;
public constructor(
private readonly fs: FileSystemOperations,
private readonly logger: Logger
) {
this.locks = new Locks(logger);
}
public async listAllFiles(): Promise<RelativePath[]> {
this.logger.debug("Listing all files");
const result = await this.fs.listAllFiles();
this.logger.debug(`Listed ${result.length} files`);
return result;
}
public async read(path: RelativePath): Promise<Uint8Array> {
this.logger.debug(`Reading file '${path}'`);
return this.safeOperation(
path,
async () =>
this.locks.withLock(path, async () => this.fs.read(path)),
"read"
);
}
public async write(path: RelativePath, content: Uint8Array): Promise<void> {
this.logger.debug(`Writing to file '${path}'`);
return this.locks.withLock(path, async () =>
this.fs.write(path, content)
);
}
public async atomicUpdateText(
path: RelativePath,
updater: (current: TextWithCursors) => TextWithCursors
): Promise<string> {
this.logger.debug(`Atomically updating file '${path}'`);
return this.safeOperation(
path,
async () =>
this.locks.withLock(path, async () =>
this.fs.atomicUpdateText(path, updater)
),
"atomicUpdateText"
);
}
public async getFileSize(path: RelativePath): Promise<number> {
// Logging this would be too noisy
return this.safeOperation(
path,
async () =>
this.locks.withLock(path, async () =>
this.fs.getFileSize(path)
),
"getFileSize"
);
}
public async exists(path: RelativePath): Promise<boolean> {
this.logger.debug(`Checking if file '${path}' exists`);
return this.locks.withLock(path, async () => this.fs.exists(path));
}
public async createDirectory(path: RelativePath): Promise<void> {
this.logger.debug(`Creating directory '${path}'`);
return this.locks.withLock(path, async () =>
this.fs.createDirectory(path)
);
}
public async delete(path: RelativePath): Promise<void> {
this.logger.debug(`Deleting file '${path}'`);
return this.locks.withLock(path, async () => this.fs.delete(path));
}
public async rename(
oldPath: RelativePath,
newPath: RelativePath
): Promise<void> {
this.logger.debug(`Renaming file '${oldPath}' to '${newPath}'`);
return this.safeOperation(
oldPath,
async () =>
this.locks.withLock([oldPath, newPath], async () =>
this.fs.rename(oldPath, newPath)
),
"rename"
);
}
/**
* Decorate an operation to ensure that the file exists before running it.
* If the operation fails, it will check if the file still exists and throw
* a FileNotFoundError if it doesn't.
*/
private async safeOperation<T>(
path: RelativePath,
operation: () => Promise<T>,
operationName: string
): Promise<T> {
if (!(await this.fs.exists(path))) {
throw new FileNotFoundError(
`File '${path}' not found before trying to ${operationName}`
);
}
try {
return await operation();
} catch (error) {
// Without locking the file, this isn't atomic, however, it's good enough in practice.
// This will only break if the file exists, gets deleted and then immediately
// recreated while `operation` is running.
if (await this.fs.exists(path)) {
throw error;
} else {
throw new FileNotFoundError(
`File '${path}' not found when trying to ${operationName}`
);
}
}
}
}

View file

@ -0,0 +1,42 @@
import { logToConsole } from "./debugging/log-to-console";
import { slowFetchFactory } from "./debugging/slow-fetch-factory";
import { slowWebSocketFactory } from "./debugging/slow-web-socket-factory";
import { getRandomColor } from "./utils/get-random-color";
import { lineAndColumnToPosition } from "./utils/line-and-column-to-position";
import { positionToLineAndColumn } from "./utils/position-to-line-and-column";
export {
SyncType,
SyncStatus,
type HistoryStats,
type HistoryEntry,
type SyncDetails,
type SyncCreateDetails,
type SyncUpdateDetails,
type SyncMovedDetails,
type SyncDeleteDetails
} from "./tracing/sync-history";
export { Logger, LogLevel, LogLine } from "./tracing/logger";
export { type SyncSettings, DEFAULT_SETTINGS } from "./persistence/settings";
export { rateLimit } from "./utils/rate-limit";
export type { RelativePath, StoredDatabase } from "./persistence/database";
export type { FileSystemOperations } from "./file-operations/filesystem-operations";
export type { PersistenceProvider } from "./persistence/persistence";
export type { CursorSpan } from "./services/types/CursorSpan";
export type { ClientCursors } from "./services/types/ClientCursors";
export type { NetworkConnectionStatus } from "./types/network-connection-status";
export type { MaybeOutdatedClientCursors } from "./types/maybe-outdated-client-cursors";
export { DocumentSyncStatus } from "./types/document-sync-status";
export { SyncClient } from "./sync-client";
export const debugging = {
slowFetchFactory,
slowWebSocketFactory,
logToConsole
};
export const utils = {
getRandomColor,
positionToLineAndColumn,
lineAndColumnToPosition
};

View file

@ -0,0 +1,360 @@
import type { Logger } from "../tracing/logger";
import { EMPTY_HASH } from "../utils/hash";
import { CoveredValues } from "../utils/min-covered";
export type VaultUpdateId = number;
export type DocumentId = string;
export type RelativePath = string;
export interface DocumentMetadata {
parentVersionId: VaultUpdateId;
hash: string;
remoteRelativePath?: RelativePath;
}
export interface StoredDocumentMetadata {
relativePath: RelativePath;
documentId: DocumentId;
parentVersionId: VaultUpdateId;
remoteRelativePath?: RelativePath;
hash: string;
}
export interface StoredDatabase {
documents: StoredDocumentMetadata[];
lastSeenUpdateId: VaultUpdateId | undefined;
hasInitialSyncCompleted: boolean;
}
/**
* Represents a document in the database.
*
* It is mutable and its content should always represent the latest
* state of the document on disk based on the update events we have seen.
*/
export interface DocumentRecord {
relativePath: RelativePath;
documentId: DocumentId;
metadata: DocumentMetadata | undefined;
isDeleted: boolean;
updates: Promise<unknown>[];
parallelVersion: number;
}
export class Database {
private documents: DocumentRecord[];
private lastSeenUpdateIds: CoveredValues;
private hasInitialSyncCompleted: boolean;
public constructor(
private readonly logger: Logger,
initialState: Partial<StoredDatabase> | undefined,
private readonly saveData: (data: StoredDatabase) => Promise<void>
) {
initialState ??= {};
this.documents =
initialState.documents?.map(
({ relativePath, documentId, ...metadata }) => ({
relativePath,
documentId,
metadata,
isDeleted: false,
updates: [],
parallelVersion: 0
})
) ?? [];
this.ensureConsistency();
this.logger.debug(`Loaded ${this.documents.length} documents`);
const { lastSeenUpdateId } = initialState;
this.logger.debug(`Loaded last seen update id: ${lastSeenUpdateId}`);
this.lastSeenUpdateIds = new CoveredValues(
Math.max(0, lastSeenUpdateId ?? 0) // the first updateId will be 1 which is the first integer after -1
);
this.hasInitialSyncCompleted =
initialState.hasInitialSyncCompleted ?? false;
this.logger.debug(
`Loaded hasInitialSyncCompleted: ${this.hasInitialSyncCompleted}`
);
}
public get length(): number {
return this.documents.length;
}
public get resolvedDocuments(): DocumentRecord[] {
const paths = new Map<string, DocumentRecord[]>();
this.documents
.filter(({ metadata }) => metadata !== undefined)
.forEach((record) =>
paths.set(record.relativePath, [
record,
...(paths.get(record.relativePath) ?? [])
])
);
return Array.from(paths.values()).map((records) => {
records.sort(
(a, b) => b.parallelVersion - a.parallelVersion // descending
);
if (
records.length > 1 &&
records.some((current, i) =>
i === 0
? false
: records[i - 1].parallelVersion ===
current.parallelVersion
)
) {
throw new Error(
`Multiple documents with the same parallel version and path at ${records[0].relativePath}`
);
}
return records[0];
});
}
public updateDocumentMetadata(
metadata: {
parentVersionId: VaultUpdateId;
hash: string;
remoteRelativePath: RelativePath;
},
toUpdate: DocumentRecord
): void {
if (!this.documents.includes(toUpdate)) {
throw new Error("Document not found in database");
}
toUpdate.metadata = metadata;
this.save();
}
public removeDocumentPromise(promise: Promise<unknown>): void {
const entry = this.documents.find(({ updates }) =>
updates.includes(promise)
);
if (entry === undefined) {
// This method should be idempotent and tolerant of
// stragglers calling it after the databse has been reset.
return;
}
entry.updates = entry.updates.filter((update) => update !== promise);
// No need to save as Promises don't get serialized
}
public removeDocument(find: DocumentRecord): void {
this.documents = this.documents.filter((document) => document !== find);
this.save();
}
public getLatestDocumentByRelativePath(
find: RelativePath
): DocumentRecord | undefined {
const candidates = this.documents.filter(
({ relativePath }) => relativePath === find
);
candidates.sort((a, b) => b.parallelVersion - a.parallelVersion); // descending
return candidates[0];
}
public async getResolvedDocumentByRelativePath(
relativePath: RelativePath,
promise: Promise<unknown>
): Promise<DocumentRecord> {
const entry = this.getLatestDocumentByRelativePath(relativePath);
if (entry === undefined) {
throw new Error(
`Document not found by relative path: ${relativePath}, ${JSON.stringify(
this.documents,
null,
2
)}`
);
}
const currentPromises = entry.updates;
entry.updates = [...currentPromises, promise];
await Promise.all(currentPromises);
return entry;
}
public createNewPendingDocument(
documentId: DocumentId,
relativePath: RelativePath,
promise: Promise<unknown>
): DocumentRecord {
const previousEntry =
this.getLatestDocumentByRelativePath(relativePath);
const entry = {
relativePath,
documentId,
metadata: undefined,
isDeleted: false,
updates: [promise],
parallelVersion:
previousEntry?.parallelVersion === undefined
? 0
: previousEntry.parallelVersion + 1
};
this.documents.push(entry);
this.save();
return entry;
}
public createNewEmptyDocument(
documentId: DocumentId,
parentVersionId: VaultUpdateId,
relativePath: RelativePath
): DocumentRecord {
const entry = {
relativePath,
documentId,
metadata: {
parentVersionId,
hash: EMPTY_HASH,
remoteRelativePath: relativePath
},
isDeleted: false,
updates: [],
parallelVersion: 0
};
this.documents.push(entry);
this.save();
return entry;
}
public getDocumentByDocumentId(
find: DocumentId
): DocumentRecord | undefined {
return this.documents.find(({ documentId }) => documentId === find);
}
public move(
oldRelativePath: RelativePath,
newRelativePath: RelativePath
): void {
const oldDocument =
this.getLatestDocumentByRelativePath(oldRelativePath);
if (oldDocument === undefined) {
return;
}
const newDocument =
this.getLatestDocumentByRelativePath(newRelativePath);
if (newDocument?.isDeleted === false) {
throw new Error(
`Document already exists at new location: ${newRelativePath}`
);
}
oldDocument.relativePath = newRelativePath;
// We're in a strange state where the target of the move has just got deleted,
// however, its metadata might already have a bunch of updates queued up for
// the document at the new location. We need to keep these updates.
oldDocument.parallelVersion =
newDocument !== undefined ? newDocument.parallelVersion + 1 : 0;
this.save();
}
public delete(relativePath: RelativePath): void {
const candidate = this.getLatestDocumentByRelativePath(relativePath);
if (candidate === undefined) {
throw new Error(
`Document not found by relative path: ${relativePath}`
);
}
candidate.isDeleted = true;
}
public getHasInitialSyncCompleted(): boolean {
return this.hasInitialSyncCompleted;
}
public setHasInitialSyncCompleted(value: boolean): void {
this.hasInitialSyncCompleted = value;
this.save();
}
public getLastSeenUpdateId(): VaultUpdateId {
return this.lastSeenUpdateIds.min;
}
public addSeenUpdateId(value: number): void {
const previousMin = this.lastSeenUpdateIds.min;
this.lastSeenUpdateIds.add(value);
if (previousMin !== this.lastSeenUpdateIds.min) {
this.save();
}
}
public setLastSeenUpdateId(value: number): void {
this.lastSeenUpdateIds.min = value;
this.save();
}
public reset(): void {
this.documents = [];
this.lastSeenUpdateIds = new CoveredValues(
0 // the first updateId will be 1 which is the first integer after -1
);
this.hasInitialSyncCompleted = false;
this.save();
}
private save(): void {
this.ensureConsistency();
void this.saveData({
documents: this.resolvedDocuments.map(
({ relativePath, documentId, metadata }) => ({
documentId,
relativePath,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
...metadata! // `resolvedDocuments` only returns docs with metadata set
})
),
lastSeenUpdateId: this.lastSeenUpdateIds.min,
hasInitialSyncCompleted: this.hasInitialSyncCompleted
}).catch((error: unknown) => {
this.logger.error(`Error saving data: ${error}`);
});
}
private ensureConsistency(): void {
const idToPath = new Map<string, string[]>();
this.resolvedDocuments.forEach(({ relativePath, documentId }) => {
idToPath.set(documentId, [
...(idToPath.get(documentId) ?? []),
relativePath
]);
});
const duplicates = Array.from(idToPath.entries())
.filter(([_, paths]) => paths.length > 1)
.map(([id, paths]) => `${id} (${paths.join(", ")})`);
if (duplicates.length > 0) {
throw new Error(
"Document IDs are not unique, found duplicates: " +
duplicates.join("; ")
);
}
}
}

View file

@ -0,0 +1,4 @@
export interface PersistenceProvider<T> {
load: () => Promise<T | undefined>;
save: (data: T) => Promise<void>;
}

View file

@ -0,0 +1,84 @@
import type { Logger } from "../tracing/logger";
export interface SyncSettings {
remoteUri: string;
token: string;
vaultName: string;
syncConcurrency: number;
isSyncEnabled: boolean;
maxFileSizeMB: number;
ignorePatterns: string[];
webSocketRetryIntervalMs: number;
}
export const DEFAULT_SETTINGS: SyncSettings = {
remoteUri: "",
token: "",
vaultName: "default",
syncConcurrency: 1,
isSyncEnabled: false,
maxFileSizeMB: 10,
ignorePatterns: [],
webSocketRetryIntervalMs: 3500
};
export class Settings {
private settings: SyncSettings;
private readonly onSettingsChangeHandlers: ((
newSettings: SyncSettings,
oldSettings: SyncSettings
) => unknown)[] = [];
public constructor(
private readonly logger: Logger,
initialState: Partial<SyncSettings> | undefined,
private readonly saveData: (data: SyncSettings) => Promise<void>
) {
this.settings = {
...DEFAULT_SETTINGS,
...(initialState ?? {})
};
this.logger.debug(
`Loaded settings: ${JSON.stringify(this.settings, null, 2)}`
);
}
public getSettings(): SyncSettings {
return this.settings;
}
public addOnSettingsChangeListener(
handler: (settings: SyncSettings, oldSettings: SyncSettings) => unknown
): void {
this.onSettingsChangeHandlers.push(handler);
}
public async setSetting<T extends keyof SyncSettings>(
key: T,
value: SyncSettings[T]
): Promise<void> {
this.logger.debug(`Setting '${key}' to '${value}'`);
await this.setSettings({
[key]: value
});
}
public async setSettings(value: Partial<SyncSettings>): Promise<void> {
const oldSettings = this.settings;
this.settings = {
...this.settings,
...value
};
this.onSettingsChangeHandlers.forEach((handler) => {
handler(this.settings, oldSettings);
});
await this.save();
}
private async save(): Promise<void> {
await this.saveData(this.settings);
}
}

View file

@ -0,0 +1,98 @@
import type { Settings } from "../persistence/settings";
import type { Logger } from "../tracing/logger";
import { createPromise } from "../utils/create-promise";
import { SyncResetError } from "./sync-reset-error";
export class ConnectionStatus {
private static readonly UNTIL_RESOLUTION = Symbol();
private canFetch: boolean;
private until: Promise<symbol>;
private resolveUntil: (result: symbol) => unknown;
private rejectUntil: (reason: unknown) => unknown;
public constructor(
settings: Settings,
private readonly logger: Logger
) {
this.canFetch = settings.getSettings().isSyncEnabled;
[this.until, this.resolveUntil, this.rejectUntil] =
createPromise<symbol>();
settings.addOnSettingsChangeListener((newSettings, oldSettings) => {
if (oldSettings.isSyncEnabled != newSettings.isSyncEnabled) {
this.canFetch = newSettings.isSyncEnabled;
this.resolveUntil(ConnectionStatus.UNTIL_RESOLUTION);
[this.until, this.resolveUntil, this.rejectUntil] =
createPromise<symbol>();
}
});
}
private static getUrlFromInput(input: RequestInfo | URL): string {
if (input instanceof URL) {
return input.href;
}
if (typeof input === "string") {
return input;
}
return input.url;
}
public startReset(): void {
this.rejectUntil(new SyncResetError());
}
public finishReset(): void {
[this.until, this.resolveUntil, this.rejectUntil] = createPromise();
}
public getFetchImplementation(
logger: Logger,
fetch: typeof globalThis.fetch = globalThis.fetch
): typeof globalThis.fetch {
return async (
input: RequestInfo | URL,
init?: RequestInit
): Promise<Response> => {
while (!this.canFetch) {
await this.until;
}
try {
// https://github.com/jonbern/fetch-retry/blob/8684ef4e688375f623bd76f13add76dbc1d67cfb/index.js#L67C1-L70C21
const _input =
typeof Request !== "undefined" && input instanceof Request
? input.clone()
: input;
const fetchPromise = fetch(_input, init);
// We only want to catch rejections from `this.until`
let result: symbol | Response | undefined = undefined;
do {
result = await Promise.race([this.until, fetchPromise]);
} while (result === ConnectionStatus.UNTIL_RESOLUTION);
const fetchResult: Response = result as Response; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
if (!fetchResult.ok) {
this.logger.warn(
`Fetch for ${ConnectionStatus.getUrlFromInput(
input
)}, got status ${fetchResult.status}`
);
}
return fetchResult;
} catch (error) {
logger.warn(
`Fetch for ${ConnectionStatus.getUrlFromInput(
input
)}, got error: ${error}`
);
throw error;
}
};
}
}

View file

@ -0,0 +1,6 @@
export class SyncResetError extends Error {
public constructor() {
super("Sync was reset");
this.name = "SyncResetError";
}
}

View file

@ -0,0 +1,325 @@
import type {
DocumentId,
RelativePath,
VaultUpdateId
} from "../persistence/database";
import type { Logger } from "../tracing/logger";
import type { Settings } from "../persistence/settings";
import type { ConnectionStatus } from "./connection-status";
import { sleep } from "../utils/sleep";
import { SyncResetError } from "./sync-reset-error";
import type { SerializedError } from "./types/SerializedError";
import type { DocumentVersionWithoutContent } from "./types/DocumentVersionWithoutContent";
import type { DocumentUpdateResponse } from "./types/DocumentUpdateResponse";
import type { DocumentVersion } from "./types/DocumentVersion";
import type { FetchLatestDocumentsResponse } from "./types/FetchLatestDocumentsResponse";
import type { PingResponse } from "./types/PingResponse";
import type { DeleteDocumentVersion } from "./types/DeleteDocumentVersion";
export interface CheckConnectionResult {
isSuccessful: boolean;
message: string;
}
export class SyncService {
private static readonly NETWORK_RETRY_INTERVAL_MS = 1000;
private readonly client: typeof globalThis.fetch;
private readonly pingClient: typeof globalThis.fetch;
public constructor(
private readonly deviceId: string,
private readonly connectionStatus: ConnectionStatus,
private readonly settings: Settings,
private readonly logger: Logger,
fetchImplementation: typeof globalThis.fetch = globalThis.fetch
) {
// ensure that if it's called a method, `this` won't be bound to the instance
const unboundFetch: typeof globalThis.fetch = async (...args) =>
fetchImplementation(...args);
this.client = this.connectionStatus.getFetchImplementation(
this.logger,
unboundFetch
);
this.pingClient = unboundFetch;
}
private static formatError(error: SerializedError): string {
let result = error.message;
if (error.causes.length > 0) {
const causes = error.causes.join(", ");
result += ` caused by: ${causes}`;
}
return result;
}
public async create({
documentId,
relativePath,
contentBytes
}: {
documentId?: DocumentId;
relativePath: RelativePath;
contentBytes: Uint8Array;
}): Promise<DocumentVersionWithoutContent> {
return this.withRetries(async () => {
const formData = new FormData();
if (documentId !== undefined) {
formData.append("document_id", documentId);
}
formData.append("relative_path", relativePath);
formData.append(
"content",
new Blob([new Uint8Array(contentBytes)])
);
const response = await this.client(this.getUrl("/documents"), {
method: "POST",
body: formData,
headers: this.getDefaultHeaders()
});
const result: SerializedError | DocumentVersionWithoutContent =
(await response.json()) as // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
| SerializedError
| DocumentVersionWithoutContent;
if ("errorType" in result) {
throw new Error(
`Failed to create document: ${SyncService.formatError(result)}`
);
}
this.logger.debug(
`Created document ${JSON.stringify(result)} with id ${
result.documentId
}`
);
return result;
});
}
public async put({
parentVersionId,
documentId,
relativePath,
contentBytes
}: {
parentVersionId: VaultUpdateId;
documentId: DocumentId;
relativePath: RelativePath;
contentBytes: Uint8Array;
}): Promise<DocumentUpdateResponse> {
return this.withRetries(async () => {
this.logger.debug(
`Updating document ${documentId} with parent version ${parentVersionId} and relative path ${relativePath}`
);
const formData = new FormData();
formData.append("parent_version_id", parentVersionId.toString());
formData.append("relative_path", relativePath);
formData.append(
"content",
new Blob([new Uint8Array(contentBytes)])
);
const response = await this.client(
this.getUrl(`/documents/${documentId}`),
{
method: "PUT",
body: formData,
headers: this.getDefaultHeaders()
}
);
const result: SerializedError | DocumentUpdateResponse =
(await response.json()) as // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
| SerializedError
| DocumentUpdateResponse;
if ("errorType" in result) {
throw new Error(
`Failed to update document: ${SyncService.formatError(result)}`
);
}
this.logger.debug(
`Updated document ${JSON.stringify(result)} with id ${
result.documentId
}}`
);
return result;
});
}
public async delete({
documentId,
relativePath
}: {
documentId: DocumentId;
relativePath: RelativePath;
}): Promise<DocumentVersionWithoutContent> {
return this.withRetries(async () => {
const request: DeleteDocumentVersion = {
relativePath
};
const response = await this.client(
this.getUrl(`/documents/${documentId}`),
{
method: "DELETE",
body: JSON.stringify(request),
headers: {
"Content-Type": "application/json",
...this.getDefaultHeaders()
}
}
);
const result: SerializedError | DocumentVersionWithoutContent =
(await response.json()) as // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
| SerializedError
| DocumentVersionWithoutContent;
if ("errorType" in result) {
throw new Error(
`Failed to delete document: ${SyncService.formatError(result)}`
);
}
this.logger.debug(
`Deleted document ${relativePath} with id ${documentId}`
);
return result;
});
}
public async get({
documentId
}: {
documentId: DocumentId;
}): Promise<DocumentVersion> {
return this.withRetries(async () => {
const response = await this.client(
this.getUrl(`/documents/${documentId}`),
{
headers: this.getDefaultHeaders()
}
);
const result: SerializedError | DocumentVersion =
(await response.json()) as SerializedError | DocumentVersion; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
if ("errorType" in result) {
throw new Error(
`Failed to get document: ${SyncService.formatError(result)}`
);
}
this.logger.debug(
`Get document ${result.relativePath} with id ${result.documentId}`
);
return result;
});
}
public async getAll(
since?: VaultUpdateId
): Promise<FetchLatestDocumentsResponse> {
return this.withRetries(async () => {
const url = new URL(this.getUrl("/documents"));
if (since !== undefined) {
url.searchParams.append("since", since.toString());
}
const response = await this.client(url.toString(), {
headers: this.getDefaultHeaders()
});
const result: SerializedError | FetchLatestDocumentsResponse =
(await response.json()) as // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
| SerializedError
| FetchLatestDocumentsResponse;
if ("errorType" in result) {
throw new Error(
`Failed to get documents: ${SyncService.formatError(result)}`
);
}
this.logger.debug(
`Got ${result.latestDocuments.length} document metadata`
);
return result;
});
}
public async checkConnection(): Promise<CheckConnectionResult> {
try {
const response = await this.pingClient(this.getUrl("/ping"), {
headers: this.getDefaultHeaders()
});
const result: PingResponse | SerializedError =
(await response.json()) as PingResponse | SerializedError; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
if ("errorType" in result) {
throw new Error(
`Failed to ping server: ${SyncService.formatError(result)}`
);
}
if (result.isAuthenticated) {
return {
isSuccessful: true,
message: `Successfully connected to server (version: ${result.serverVersion}) and authenticated`
};
}
return {
isSuccessful: false,
message: `Successfully connected to server (version: ${result.serverVersion}) but failed to authenticate`
};
} catch (e) {
return {
isSuccessful: false,
message: `Failed to connect to server: ${e}`
};
}
}
private getUrl(path: string): string {
const { vaultName, remoteUri } = this.settings.getSettings();
const safeRemoteUri = remoteUri.replace(/\/+$/, "");
return `${safeRemoteUri}/vaults/${vaultName}${path}`;
}
private getDefaultHeaders(): Record<string, string> {
return {
"device-id": this.deviceId,
authorization: `Bearer ${this.settings.getSettings().token}`
};
}
private async withRetries<T>(fn: () => Promise<T>): Promise<T> {
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
while (true) {
try {
return await fn();
} catch (e) {
// We must not retry errors coming from reset
if (e instanceof SyncResetError) {
throw e;
}
this.logger.error(
`Failed network call (${e}), retrying in ${SyncService.NETWORK_RETRY_INTERVAL_MS}ms`
);
await sleep(SyncService.NETWORK_RETRY_INTERVAL_MS);
}
}
}
}

View file

@ -0,0 +1,8 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { DocumentWithCursors } from "./DocumentWithCursors";
export interface ClientCursors {
userName: string;
deviceId: string;
documentsWithCursors: DocumentWithCursors[];
}

View file

@ -0,0 +1,13 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface CreateDocumentVersion {
/**
* The client can decide the document id (if it wishes to) in order
* to help with syncing. If the client does not provide a document id,
* the server will generate one. If the client provides a document id
* it must not already exist in the database.
*/
document_id: string | null;
relative_path: string;
content: number[];
}

View file

@ -0,0 +1,6 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { DocumentWithCursors } from "./DocumentWithCursors";
export interface CursorPositionFromClient {
documentsWithCursors: DocumentWithCursors[];
}

View file

@ -0,0 +1,6 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { ClientCursors } from "./ClientCursors";
export interface CursorPositionFromServer {
clients: ClientCursors[];
}

View file

@ -0,0 +1,6 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface CursorSpan {
start: number;
end: number;
}

View file

@ -0,0 +1,5 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface DeleteDocumentVersion {
relativePath: string;
}

View file

@ -0,0 +1,10 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { DocumentVersion } from "./DocumentVersion";
import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutContent";
/**
* Response to an update document request.
*/
export type DocumentUpdateResponse =
| ({ type: "FastForwardUpdate" } & DocumentVersionWithoutContent)
| ({ type: "MergingUpdate" } & DocumentVersion);

View file

@ -0,0 +1,12 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface DocumentVersion {
vaultUpdateId: number;
documentId: string;
relativePath: string;
updatedDate: string;
contentBase64: string;
isDeleted: boolean;
userId: string;
deviceId: string;
}

View file

@ -0,0 +1,12 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface DocumentVersionWithoutContent {
vaultUpdateId: number;
documentId: string;
relativePath: string;
updatedDate: string;
isDeleted: boolean;
userId: string;
deviceId: string;
contentSize: number;
}

View file

@ -0,0 +1,9 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { CursorSpan } from "./CursorSpan";
export interface DocumentWithCursors {
vault_update_id: number | null;
document_id: string;
relative_path: string;
cursors: CursorSpan[];
}

View file

@ -0,0 +1,13 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutContent";
/**
* Response to a fetch latest documents request.
*/
export interface FetchLatestDocumentsResponse {
latestDocuments: DocumentVersionWithoutContent[];
/**
* The update ID of the latest document in the response.
*/
lastUpdateId: bigint;
}

View file

@ -0,0 +1,16 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
/**
* Response to a ping request.
*/
export interface PingResponse {
/**
* Semantic version of the server.
*/
serverVersion: string;
/**
* Whether the client is authenticated based on the sent Authorization
* header.
*/
isAuthenticated: boolean;
}

View file

@ -0,0 +1,7 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface SerializedError {
errorType: string;
message: string;
causes: string[];
}

View file

@ -0,0 +1,7 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface UpdateDocumentVersion {
parent_version_id: bigint;
relative_path: string;
content: number[];
}

View file

@ -0,0 +1,7 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { CursorPositionFromClient } from "./CursorPositionFromClient";
import type { WebSocketHandshake } from "./WebSocketHandshake";
export type WebSocketClientMessage =
| ({ type: "handshake" } & WebSocketHandshake)
| ({ type: "cursorPositions" } & CursorPositionFromClient);

View file

@ -0,0 +1,7 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface WebSocketHandshake {
token: string;
deviceId: string;
lastSeenVaultUpdateId: number | null;
}

View file

@ -0,0 +1,7 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { CursorPositionFromServer } from "./CursorPositionFromServer";
import type { WebSocketVaultUpdate } from "./WebSocketVaultUpdate";
export type WebSocketServerMessage =
| ({ type: "vaultUpdate" } & WebSocketVaultUpdate)
| ({ type: "cursorPositions" } & CursorPositionFromServer);

View file

@ -0,0 +1,7 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutContent";
export interface WebSocketVaultUpdate {
documents: DocumentVersionWithoutContent[];
isInitialSync: boolean;
}

View file

@ -0,0 +1,199 @@
import type { Database } from "../persistence/database";
import type { Logger } from "../tracing/logger";
import type { Settings, SyncSettings } from "../persistence/settings";
import type { WebSocketServerMessage } from "./types/WebSocketServerMessage";
import type { Syncer } from "../sync-operations/syncer";
import type { WebSocketClientMessage } from "./types/WebSocketClientMessage";
import type { CursorPositionFromClient } from "./types/CursorPositionFromClient";
import type { ClientCursors } from "./types/ClientCursors";
export class WebSocketManager {
private readonly webSocketStatusChangeListeners: (() => unknown)[] = [];
private readonly remoteCursorsUpdateListeners: ((
cursors: ClientCursors[]
) => unknown)[] = [];
private webSocket: WebSocket | undefined;
private isStopped = true;
private _isFirstSyncCompleted = false;
private readonly webSocketFactoryImplementation: typeof globalThis.WebSocket;
public constructor(
private readonly deviceId: string,
private readonly logger: Logger,
private readonly database: Database,
private readonly settings: Settings,
private readonly syncer: Syncer,
webSocketImplementation?: typeof globalThis.WebSocket
) {
if (webSocketImplementation) {
this.webSocketFactoryImplementation = webSocketImplementation;
} else {
if (
typeof globalThis !== "undefined" &&
typeof globalThis.WebSocket === "undefined"
) {
// eslint-disable-next-line
this.webSocketFactoryImplementation = require("ws"); // polyfill for WebSocket in Node.js
} else {
this.webSocketFactoryImplementation = WebSocket;
}
}
settings.addOnSettingsChangeListener((newSettings, oldSettings) => {
if (
newSettings.remoteUri !== oldSettings.remoteUri ||
newSettings.vaultName !== oldSettings.vaultName ||
newSettings.token !== oldSettings.token
) {
this.initializeWebSocket(newSettings);
}
});
}
public get isWebSocketConnected(): boolean {
return (
this.webSocket?.readyState ===
this.webSocketFactoryImplementation.OPEN
);
}
public get isFirstSyncCompleted(): boolean {
return this._isFirstSyncCompleted;
}
public addWebSocketStatusChangeListener(listener: () => unknown): void {
this.webSocketStatusChangeListeners.push(listener);
}
public addRemoteCursorsUpdateListener(
listener: (cursors: ClientCursors[]) => unknown
): void {
this.remoteCursorsUpdateListeners.push(listener);
}
public start(): void {
this.isStopped = false;
this._isFirstSyncCompleted = false;
this.initializeWebSocket(this.settings.getSettings());
}
public stop(): void {
this.isStopped = true;
this.webSocket?.close(1000, "WebSocketManager has been stopped");
}
public updateLocalCursors(cursorPositions: CursorPositionFromClient): void {
if (!this.isWebSocketConnected) {
this.logger.warn(
"WebSocket is not connected, cannot send cursor positions"
);
return;
}
const message: WebSocketClientMessage = {
type: "cursorPositions",
...cursorPositions
};
this.webSocket?.send(JSON.stringify(message));
this.logger.debug(
`Sent cursor positions: ${JSON.stringify(cursorPositions)}`
);
}
private initializeWebSocket(settings: SyncSettings): void {
if (this.isStopped) {
return;
}
try {
this.webSocket?.close();
} catch (e) {
this.logger.warn(`Failed to close WebSocket: ${e}`);
}
const wsUri = new URL(settings.remoteUri);
wsUri.protocol = wsUri.protocol === "https" ? "wss" : "ws";
wsUri.pathname = `/vaults/${settings.vaultName}/ws`;
this.logger.info(`Connecting to WebSocket at ${wsUri.toString()}`);
this.webSocket = new this.webSocketFactoryImplementation(wsUri);
// The JS WebSocket API doesn't support setting headers, so we have to send the token as a message
this.webSocket.onopen = (): void => {
this.logger.info("WebSocket connection opened");
this.webSocketStatusChangeListeners.forEach((l) => l());
const message: WebSocketClientMessage = {
type: "handshake",
deviceId: this.deviceId,
token: settings.token,
lastSeenVaultUpdateId: this.database.getLastSeenUpdateId()
};
this.webSocket?.send(JSON.stringify(message));
};
this.webSocket.onmessage = async (event): Promise<void> => {
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
const message = JSON.parse(event.data) as WebSocketServerMessage;
return this.handleWebSocketMessage(message);
};
this.webSocket.onclose = (event): void => {
this.logger.warn(
`WebSocket closed with code ${event.code} (${event.reason == "" ? "unknown reason" : event.reason})`
);
this.webSocketStatusChangeListeners.forEach((l) => l());
if (!this.isStopped) {
setTimeout(() => {
this.initializeWebSocket(this.settings.getSettings());
}, this.settings.getSettings().webSocketRetryIntervalMs);
}
};
}
private async handleWebSocketMessage(
message: WebSocketServerMessage
): Promise<void> {
if (message.type === "vaultUpdate") {
try {
await Promise.all(
message.documents.map(async (document) =>
this.syncer.syncRemotelyUpdatedFile(document)
)
);
if (message.isInitialSync && message.documents.length > 0) {
this.database.setLastSeenUpdateId(
message.documents
.map((document) => document.vaultUpdateId)
.reduce((a, b) => Math.max(a, b))
);
}
this._isFirstSyncCompleted = true;
} catch (e) {
this.logger.error(`Failed to sync remotely updated file: ${e}`);
}
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
} else if (message.type === "cursorPositions") {
this.logger.debug(
`Received cursor positions for ${JSON.stringify(message.clients)}`
);
this.remoteCursorsUpdateListeners.forEach((listener) => {
listener(
message.clients.filter(
(client) => client.deviceId !== this.deviceId
)
);
});
} else {
this.logger.warn(
`Received unknown message type: ${JSON.stringify(message)}`
);
}
}
}

View file

@ -0,0 +1,337 @@
import type { PersistenceProvider } from "./persistence/persistence";
import type { HistoryEntry, HistoryStats } from "./tracing/sync-history";
import { SyncHistory } from "./tracing/sync-history";
import { Logger } from "./tracing/logger";
import type { RelativePath, StoredDatabase } from "./persistence/database";
import { Database } from "./persistence/database";
import type { SyncSettings } from "./persistence/settings";
import { Settings } from "./persistence/settings";
import { SyncService } from "./services/sync-service";
import { Syncer } from "./sync-operations/syncer";
import type { FileSystemOperations } from "./file-operations/filesystem-operations";
import { FileOperations } from "./file-operations/file-operations";
import { ConnectionStatus } from "./services/connection-status";
import { UnrestrictedSyncer } from "./sync-operations/unrestricted-syncer";
import { rateLimit } from "./utils/rate-limit";
import type { NetworkConnectionStatus } from "./types/network-connection-status";
import { DocumentSyncStatus } from "./types/document-sync-status";
import { WebSocketManager } from "./services/websocket-manager";
import { createClientId } from "./utils/create-client-id";
import { CursorTracker } from "./sync-operations/cursor-tracker";
import type { CursorSpan } from "./services/types/CursorSpan";
import type { MaybeOutdatedClientCursors } from "./types/maybe-outdated-client-cursors";
import { FileChangeNotifier } from "./sync-operations/file-change-notifier";
export class SyncClient {
private static readonly MINIMUM_SAVE_INTERVAL_MS = 1000;
private hasStartedOfflineSync = false;
private hasFinishedOfflineSync = false;
// eslint-disable-next-line @typescript-eslint/max-params
private constructor(
private readonly history: SyncHistory,
private readonly settings: Settings,
private readonly database: Database,
private readonly syncer: Syncer,
private readonly syncService: SyncService,
private readonly webSocketManager: WebSocketManager,
private readonly _logger: Logger,
private readonly connectionStatus: ConnectionStatus,
private readonly cursorTracker: CursorTracker,
private readonly fileChangeNotifier: FileChangeNotifier
) {
this.settings.addOnSettingsChangeListener(
async (newSettings, oldSettings) => {
if (newSettings.vaultName !== oldSettings.vaultName) {
await this.reset();
}
if (newSettings.isSyncEnabled !== oldSettings.isSyncEnabled) {
if (newSettings.isSyncEnabled) {
await this.start();
} else {
this.stop();
}
}
}
);
}
public get logger(): Logger {
return this._logger;
}
public get documentCount(): number {
return this.database.length;
}
public static async create({
fs,
persistence,
fetch,
webSocket,
nativeLineEndings = "\n"
}: {
fs: FileSystemOperations;
persistence: PersistenceProvider<
Partial<{
settings: Partial<SyncSettings>;
database: Partial<StoredDatabase>;
}>
>;
fetch?: typeof globalThis.fetch;
webSocket?: typeof globalThis.WebSocket;
nativeLineEndings?: string;
}): Promise<SyncClient> {
const logger = new Logger();
const deviceId = createClientId();
logger.info(`Initialising SyncClient with client id ${deviceId}`);
const history = new SyncHistory(logger);
let state = (await persistence.load()) ?? {
settings: undefined,
database: undefined
};
const rateLimitedSave = rateLimit(
persistence.save,
SyncClient.MINIMUM_SAVE_INTERVAL_MS
);
const database = new Database(
logger,
state.database,
async (data): Promise<void> => {
state = { ...state, database: data };
await rateLimitedSave(state);
}
);
const settings = new Settings(
logger,
state.settings,
async (data): Promise<void> => {
state = { ...state, settings: data };
await rateLimitedSave(state);
}
);
const connectionStatus = new ConnectionStatus(settings, logger);
const syncService = new SyncService(
deviceId,
connectionStatus,
settings,
logger,
fetch
);
const fileOperations = new FileOperations(
logger,
database,
fs,
nativeLineEndings
);
const unrestrictedSyncer = new UnrestrictedSyncer(
logger,
database,
settings,
syncService,
fileOperations,
history
);
const syncer = new Syncer(
logger,
database,
settings,
syncService,
fileOperations,
unrestrictedSyncer
);
const webSocketManager = new WebSocketManager(
deviceId,
logger,
database,
settings,
syncer,
webSocket
);
const fileChangeNotifier = new FileChangeNotifier();
const cursorTracker = new CursorTracker(
database,
webSocketManager,
fileOperations,
fileChangeNotifier
);
const client = new SyncClient(
history,
settings,
database,
syncer,
syncService,
webSocketManager,
logger,
connectionStatus,
cursorTracker,
fileChangeNotifier
);
logger.info("SyncClient initialised");
return client;
}
public async checkConnection(): Promise<NetworkConnectionStatus> {
const server = await this.syncService.checkConnection();
return {
isSuccessful: server.isSuccessful,
serverMessage: server.message,
isWebSocketConnected: this.webSocketManager.isWebSocketConnected
};
}
public getHistoryEntries(): readonly HistoryEntry[] {
return this.history.entries;
}
public addSyncHistoryUpdateListener(
listener: (stats: HistoryStats) => unknown
): void {
this.history.addSyncHistoryUpdateListener(listener);
}
public async start(): Promise<void> {
if (!this.hasStartedOfflineSync) {
await this.syncer.scheduleSyncForOfflineChanges();
this.hasStartedOfflineSync = true;
}
this.hasFinishedOfflineSync = true;
this.webSocketManager.start();
}
public stop(): void {
this.hasFinishedOfflineSync = false;
this.webSocketManager.stop();
}
public async waitAndStop(): Promise<void> {
this.stop();
await this.syncer.waitUntilFinished();
}
/// Wait for the in-flight operations to finish, reset all tracking,
/// and the local database but retain the settings.
/// The SyncClient can be used again after calling this method.
public async reset(): Promise<void> {
this.stop();
this.connectionStatus.startReset();
await this.syncer.reset();
this.history.reset();
this.database.reset();
this._logger.reset();
this.connectionStatus.finishReset();
await this.start();
}
public getSettings(): SyncSettings {
return this.settings.getSettings();
}
public async setSetting<T extends keyof SyncSettings>(
key: T,
value: SyncSettings[T]
): Promise<void> {
await this.settings.setSetting(key, value);
}
public async setSettings(value: Partial<SyncSettings>): Promise<void> {
await this.settings.setSettings(value);
}
public addOnSettingsChangeListener(
handler: (settings: SyncSettings, oldSettings: SyncSettings) => unknown
): void {
this.settings.addOnSettingsChangeListener(handler);
}
public addRemainingSyncOperationsListener(
listener: (remainingOperations: number) => unknown
): void {
this.syncer.addRemainingOperationsListener(listener);
}
public addWebSocketStatusChangeListener(listener: () => unknown): void {
this.webSocketManager.addWebSocketStatusChangeListener(listener);
}
public async syncLocallyCreatedFile(
relativePath: RelativePath
): Promise<void> {
this.fileChangeNotifier.notifyOfFileChange(relativePath);
return this.syncer.syncLocallyCreatedFile(relativePath);
}
public async syncLocallyDeletedFile(
relativePath: RelativePath
): Promise<void> {
this.fileChangeNotifier.notifyOfFileChange(relativePath);
return this.syncer.syncLocallyDeletedFile(relativePath);
}
public async syncLocallyUpdatedFile({
oldPath,
relativePath
}: {
oldPath?: RelativePath;
relativePath: RelativePath;
}): Promise<void> {
this.fileChangeNotifier.notifyOfFileChange(relativePath);
return this.syncer.syncLocallyUpdatedFile({
oldPath,
relativePath
});
}
public getDocumentSyncingStatus(
relativePath: RelativePath
): DocumentSyncStatus {
if (!this.settings.getSettings().isSyncEnabled) {
return DocumentSyncStatus.SYNCING_IS_DISABLED;
}
if (
!this.webSocketManager.isFirstSyncCompleted ||
!this.hasFinishedOfflineSync
) {
return DocumentSyncStatus.SYNCING;
}
const document =
this.database.getLatestDocumentByRelativePath(relativePath);
if (document === undefined) {
return DocumentSyncStatus.SYNCING;
}
return document.updates.length > 0
? DocumentSyncStatus.SYNCING
: DocumentSyncStatus.UP_TO_DATE;
}
public async updateLocalCursors(
documentToCursors: Record<RelativePath, CursorSpan[]>
): Promise<void> {
await this.cursorTracker.sendLocalCursorsToServer(documentToCursors);
}
public addRemoteCursorsUpdateListener(
listener: (cursors: MaybeOutdatedClientCursors[]) => unknown
): void {
this.cursorTracker.addRemoteCursorsUpdateListener(listener);
}
}

View file

@ -0,0 +1,253 @@
import type { FileOperations } from "../file-operations/file-operations";
import type { Database, RelativePath } from "../persistence/database";
import type { ClientCursors } from "../services/types/ClientCursors";
import type { CursorSpan } from "../services/types/CursorSpan";
import type { DocumentWithCursors } from "../services/types/DocumentWithCursors";
import type { WebSocketManager } from "../services/websocket-manager";
import type { MaybeOutdatedClientCursors } from "../types/maybe-outdated-client-cursors";
import { DocumentUpToDateness } from "../types/document-up-to-dateness";
import { hash } from "../utils/hash";
import type { FileChangeNotifier } from "./file-change-notifier";
import { Lock } from "../utils/locks";
// Cursor positions are updated separately from documents. However, a given cursor position is only
// valid within a certain version of the document it belongs to. This class tracks previous and the latest
// known remote cursor positions, and for each document, tries to return the latest cursor positions that are
// not from the future.
export class CursorTracker {
private readonly updateLock = new Lock();
private knownRemoteCursors: (ClientCursors & {
upToDateness: DocumentUpToDateness;
})[] = [];
private lastLocalCursorState: DocumentWithCursors[] = [];
private lastLocalCursorStateWithoutDirtyDocuments: DocumentWithCursors[] =
[];
public constructor(
private readonly database: Database,
private readonly webSocketManager: WebSocketManager,
private readonly fileOperations: FileOperations,
private readonly fileChangeNotifier: FileChangeNotifier
) {
this.webSocketManager.addRemoteCursorsUpdateListener(
async (clientCursors) => {
await this.updateLock.withLock(async () => {
// The latest message will contain all active clients, so we can delete the ones
// from the local list which are no longer active.
const allIds = new Set(
clientCursors.map((c) => c.deviceId)
);
const updatedKnownRemoteCursors =
this.knownRemoteCursors.filter((c) =>
allIds.has(c.deviceId)
);
for (const cursor of clientCursors.filter((client) =>
client.documentsWithCursors.every(
(doc) => doc.vault_update_id != null
)
)) {
updatedKnownRemoteCursors.push({
...cursor,
upToDateness:
await this.getDocumentsUpToDateness(cursor)
});
}
this.knownRemoteCursors = updatedKnownRemoteCursors;
});
}
);
this.fileChangeNotifier.addFileChangeListener(async (relativePath) =>
this.updateLock.withLock(async () => {
for (const clientCursor of this.knownRemoteCursors) {
if (
clientCursor.documentsWithCursors.some(
(document) =>
document.relative_path === relativePath
)
) {
clientCursor.upToDateness =
await this.getDocumentsUpToDateness(clientCursor);
}
}
})
);
}
/// Update the local cursors for the given documents.
/// Can be called frequently as it only emits an event
/// if the state has actually changed.
public async sendLocalCursorsToServer(
documentToCursors: Record<RelativePath, CursorSpan[]>
): Promise<void> {
const documentsWithCursors: DocumentWithCursors[] = [];
for (const [relativePath, cursors] of Object.entries(
documentToCursors
)) {
const record =
this.database.getLatestDocumentByRelativePath(relativePath);
if (!record) {
continue; // Let's wait for the file to be created before sending cursors
}
if (!record.metadata) {
continue; // this is a new document, no need to sync the cursors
}
documentsWithCursors.push({
relative_path: relativePath,
document_id: record.documentId,
vault_update_id: record.metadata.parentVersionId,
cursors: cursors.map(({ start, end }) => ({
start: Math.min(start, end),
end: Math.max(start, end)
})) // the client might send directional selections
});
}
if (
JSON.stringify(this.lastLocalCursorState) ===
JSON.stringify(documentsWithCursors)
) {
// Caching step to avoid reading the edited files all the time
return;
}
this.lastLocalCursorState = documentsWithCursors;
for (const doc of documentsWithCursors) {
const readContent = await this.fileOperations.read(
doc.relative_path
);
const record = this.database.getLatestDocumentByRelativePath(
doc.relative_path
);
if (record?.metadata?.hash !== hash(readContent)) {
doc.vault_update_id = null;
}
}
if (
JSON.stringify(this.lastLocalCursorStateWithoutDirtyDocuments) ===
JSON.stringify(documentsWithCursors)
) {
return;
}
this.lastLocalCursorStateWithoutDirtyDocuments = documentsWithCursors;
this.webSocketManager.updateLocalCursors({ documentsWithCursors });
}
// The returned position may be accurate, if it matches the document version, or outdated, in which case
// the client has to heuristically guess it's current position based on the local edits.
public addRemoteCursorsUpdateListener(
listener: (cursors: MaybeOutdatedClientCursors[]) => unknown
): void {
// CursorTracker registers its own event listener in the constructor so it must have been called before this
this.webSocketManager.addRemoteCursorsUpdateListener(async () => {
await this.updateLock.withLock(() =>
listener(this.getRelevantAndPruneKnownClientCursors())
);
});
}
private getRelevantAndPruneKnownClientCursors(): MaybeOutdatedClientCursors[] {
const result: MaybeOutdatedClientCursors[] = [];
const included = new Set<string>();
const relevantCursors = [];
for (const clientCursors of [...this.knownRemoteCursors].reverse()) {
if (included.has(clientCursors.deviceId)) {
continue;
}
if (clientCursors.upToDateness == DocumentUpToDateness.Later) {
continue;
}
result.push({
...clientCursors,
isOutdated:
clientCursors.upToDateness == DocumentUpToDateness.Prior
});
included.add(clientCursors.deviceId);
relevantCursors.unshift(clientCursors); // to reverse order back to normal
}
this.knownRemoteCursors = relevantCursors;
return result;
}
// We store up-to-dateness on a per-client basis to simplify the implementation.
// An individual client won't have too many documents open at once, so this is a reasonable trade-off.
private async getDocumentsUpToDateness(
clientCursor: ClientCursors
): Promise<DocumentUpToDateness> {
const results = [];
for (const document of clientCursor.documentsWithCursors) {
results.push(await this.getDocumentUpToDateness(document));
}
if (
results.every((result) => result === DocumentUpToDateness.UpToDate)
) {
return DocumentUpToDateness.UpToDate;
}
if (
results.every(
(result) =>
result === DocumentUpToDateness.UpToDate ||
result === DocumentUpToDateness.Prior
)
) {
return DocumentUpToDateness.Prior;
}
return DocumentUpToDateness.Later;
}
private async getDocumentUpToDateness(
document: DocumentWithCursors
): Promise<DocumentUpToDateness> {
const record = this.database.getLatestDocumentByRelativePath(
document.relative_path
);
if (!record) {
// the document of the cursor must be from the future
return DocumentUpToDateness.Later;
}
if (
(record.metadata?.parentVersionId ?? 0) <
(document.vault_update_id ?? 0)
) {
return DocumentUpToDateness.Later;
} else if (
(document.vault_update_id ?? 0) <
(record.metadata?.parentVersionId ?? 0)
) {
// the document of the cursor must be from the past
return DocumentUpToDateness.Prior;
}
const currentContent = await this.fileOperations.read(
document.relative_path
);
return this.database.getLatestDocumentByRelativePath(
document.relative_path
)?.metadata?.hash === hash(currentContent)
? DocumentUpToDateness.UpToDate
: DocumentUpToDateness.Prior;
}
}

View file

@ -0,0 +1,15 @@
import type { RelativePath } from "../persistence/database";
export class FileChangeNotifier {
private readonly listeners: ((filePath: RelativePath) => unknown)[] = [];
public addFileChangeListener(
listener: (filePath: RelativePath) => unknown
): void {
this.listeners.push(listener);
}
public notifyOfFileChange(filePath: RelativePath): void {
this.listeners.forEach((listener) => listener(filePath));
}
}

View file

@ -0,0 +1,459 @@
import type {
Database,
DocumentId,
DocumentRecord,
RelativePath
} from "../persistence/database";
import type { SyncService } from "../services/sync-service";
import type { Logger } from "../tracing/logger";
import PQueue from "p-queue";
import { hash } from "../utils/hash";
import { v4 as uuidv4 } from "uuid";
import type { Settings } from "../persistence/settings";
import type { FileOperations } from "../file-operations/file-operations";
import { findMatchingFile } from "../utils/find-matching-file";
import type { UnrestrictedSyncer } from "./unrestricted-syncer";
import { createPromise } from "../utils/create-promise";
import { SyncResetError } from "../services/sync-reset-error";
import { Locks } from "../utils/locks";
import type { DocumentVersionWithoutContent } from "../services/types/DocumentVersionWithoutContent";
export class Syncer {
private readonly remoteDocumentsLock: Locks<DocumentId>;
private readonly remainingOperationsListeners: ((
remainingOperations: number
) => unknown)[] = [];
private readonly syncQueue: PQueue;
private runningScheduleSyncForOfflineChanges: Promise<void> | undefined;
public constructor(
private readonly logger: Logger,
private readonly database: Database,
settings: Settings,
private readonly syncService: SyncService,
private readonly operations: FileOperations,
private readonly internalSyncer: UnrestrictedSyncer
) {
this.syncQueue = new PQueue({
concurrency: settings.getSettings().syncConcurrency
});
this.remoteDocumentsLock = new Locks<DocumentId>(this.logger);
settings.addOnSettingsChangeListener((newSettings, oldSettings) => {
if (newSettings.syncConcurrency !== oldSettings.syncConcurrency) {
this.syncQueue.concurrency = newSettings.syncConcurrency;
}
});
this.syncQueue.on("active", () => {
this.remainingOperationsListeners.forEach((listener) => {
listener(this.syncQueue.size);
});
});
}
public addRemainingOperationsListener(
listener: (remainingOperations: number) => unknown
): void {
this.remainingOperationsListeners.push(listener);
}
public async syncLocallyCreatedFile(
relativePath: RelativePath
): Promise<void> {
if (
this.database.getLatestDocumentByRelativePath(relativePath)
?.isDeleted === false
) {
this.logger.debug(
`Document ${relativePath} already exists in the database, skipping`
);
return;
}
const [promise, resolve, reject] = createPromise();
const id = uuidv4();
const document = this.database.createNewPendingDocument(
id,
relativePath,
promise
);
this.logger.debug(
`Creating new pending document ${relativePath} with id ${id}`
);
try {
await this.syncQueue.add(async () =>
this.internalSyncer.unrestrictedSyncLocallyCreatedFile(document)
);
resolve();
} catch (e) {
reject(e);
} finally {
this.database.removeDocumentPromise(promise);
}
}
public async syncLocallyDeletedFile(
relativePath: RelativePath
): Promise<void> {
if (
this.database.getLatestDocumentByRelativePath(relativePath)
?.isDeleted === true
) {
// This is must be a consequence of us deleting a file because of a remote update
// which triggered a local delete, so we don't need to do anything here.
this.logger.debug(
`Document ${relativePath} has already been markes as deleted, skipping`
);
return;
}
// We have to have a record of the delete in case there's an in-flight update for the same
// document which finishes after the delete has succeeded and would introduce a phantom metadata record.
this.database.delete(relativePath);
const [promise, resolve, reject] = createPromise();
const document = await this.database.getResolvedDocumentByRelativePath(
relativePath,
promise
);
try {
await this.syncQueue.add(async () =>
this.internalSyncer.unrestrictedSyncLocallyDeletedFile(document)
);
resolve();
this.database.removeDocument(document);
} catch (e) {
reject(e);
} finally {
this.database.removeDocumentPromise(promise);
}
}
public async syncLocallyUpdatedFile({
oldPath,
relativePath
}: {
oldPath?: RelativePath;
relativePath: RelativePath;
}): Promise<void> {
if (oldPath !== undefined) {
// We might have moved the document in the database before calling this method,
// in that case, we mustn't move it again.
if (
this.database.getLatestDocumentByRelativePath(relativePath) ===
undefined ||
this.database.getLatestDocumentByRelativePath(relativePath)
?.isDeleted === true
) {
if (oldPath === relativePath) {
throw new Error(
`Old path and new path are the same: ${oldPath}`
);
}
this.database.move(oldPath, relativePath);
}
}
let document =
this.database.getLatestDocumentByRelativePath(relativePath);
if (
oldPath !== undefined &&
document?.metadata?.remoteRelativePath === relativePath
) {
this.logger.debug(
`Document ${relativePath} has been moved as a result of a remote update, skipping sync`
);
return;
}
if (document === undefined) {
this.logger.debug(
`Cannot find document ${relativePath} in the database, skipping`
);
return;
}
if (document.isDeleted) {
this.logger.debug(
`Document ${relativePath} has been deleted locally, skipping`
);
return;
}
const [promise, resolve, reject] = createPromise();
document = await this.database.getResolvedDocumentByRelativePath(
relativePath,
promise
);
try {
await this.syncQueue.add(async () =>
this.internalSyncer.unrestrictedSyncLocallyUpdatedFile({
oldPath,
document
})
);
resolve();
} catch (e) {
reject(e);
} finally {
this.database.removeDocumentPromise(promise);
}
}
public async scheduleSyncForOfflineChanges(): Promise<void> {
if (this.runningScheduleSyncForOfflineChanges !== undefined) {
this.logger.debug("Uploading local changes is already in progress");
return this.runningScheduleSyncForOfflineChanges;
}
try {
this.runningScheduleSyncForOfflineChanges =
this.internalScheduleSyncForOfflineChanges();
await this.runningScheduleSyncForOfflineChanges;
this.logger.info(`All local changes have been applied remotely`);
} catch (e) {
if (e instanceof SyncResetError) {
this.logger.info(
"Failed to apply local changes remotely due to a reset"
);
return;
}
this.logger.error(
`Not all local changes have been applied remotely: ${e}`
);
throw e;
} finally {
this.runningScheduleSyncForOfflineChanges = undefined;
}
}
public async waitUntilFinished(): Promise<void> {
await this.runningScheduleSyncForOfflineChanges;
return this.syncQueue.onEmpty();
}
public async reset(): Promise<void> {
await this.waitUntilFinished();
}
public async syncRemotelyUpdatedFile(
remoteVersion: DocumentVersionWithoutContent
): Promise<void> {
let document = this.database.getDocumentByDocumentId(
remoteVersion.documentId
);
if (document === undefined) {
// Let's avoid the same documents getting created in parallel multiple times.
// There might be multiple tasks waiting for the lock
return this.remoteDocumentsLock.withLock(
remoteVersion.documentId,
async () => {
document = this.database.getDocumentByDocumentId(
remoteVersion.documentId
);
// We're either the first one to get the lock, so we have to create the document in `unrestrictedSyncRemotelyUpdatedFile`
if (document === undefined) {
await this.syncQueue.add(async () =>
this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile(
remoteVersion
)
);
} else {
const [promise, resolve, reject] = createPromise();
document =
await this.database.getResolvedDocumentByRelativePath(
document.relativePath,
promise
);
try {
await this.syncQueue.add(async () =>
this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile(
remoteVersion,
document
)
);
resolve();
} catch (e) {
reject(e);
} finally {
this.database.removeDocumentPromise(promise);
}
}
this.database.addSeenUpdateId(remoteVersion.vaultUpdateId);
}
);
}
// We're either the first one to get the lock, so we have to create the document in `unrestrictedSyncRemotelyUpdatedFile`
const [promise, resolve, reject] = createPromise();
document = await this.database.getResolvedDocumentByRelativePath(
document.relativePath,
promise
);
try {
await this.syncQueue.add(async () =>
this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile(
remoteVersion,
document
)
);
resolve();
} catch (e) {
reject(e);
} finally {
this.database.removeDocumentPromise(promise);
}
this.database.addSeenUpdateId(remoteVersion.vaultUpdateId);
}
private async internalScheduleSyncForOfflineChanges(): Promise<void> {
await this.createFakeDocumentsFromRemoteState();
const allLocalFiles = await this.operations.listAllFiles();
let locallyPossiblyDeletedFiles: DocumentRecord[] = [];
for (const document of this.database.resolvedDocuments) {
if (
!document.isDeleted &&
!(await this.operations.exists(document.relativePath))
) {
locallyPossiblyDeletedFiles.push(document);
}
}
const updates = Promise.all(
allLocalFiles.map(async (relativePath) => {
if (
this.database.getLatestDocumentByRelativePath(relativePath)
?.metadata !== undefined
) {
this.logger.debug(
`Document ${relativePath} might have been updated locally, scheduling sync to validate and update it`
);
return this.syncLocallyUpdatedFile({
relativePath
});
}
// Perhaps the file has been moved; let's check by looking at the deleted files
const contentHash = await this.syncQueue.add(async () => {
const contentBytes =
await this.operations.read(relativePath); // this can throw FileNotFoundError
return hash(contentBytes);
});
if (contentHash == undefined) {
// The file was deleted before we had a chance to read it, no need to sync it here
return;
}
const originalFile = findMatchingFile(
contentHash,
locallyPossiblyDeletedFiles
);
if (originalFile !== undefined) {
// `originalFile` hasn't been deleted but it got moved instead
locallyPossiblyDeletedFiles =
locallyPossiblyDeletedFiles.filter(
(item) =>
item.relativePath !== originalFile.relativePath
);
this.logger.debug(
`Document '${originalFile.relativePath}' was not found under its current path in the database but was found under a different path (${relativePath}), scheduling sync to move it`
);
// We're outside of the pqueue, so we need to call the public wrapper
return this.syncLocallyUpdatedFile({
oldPath: originalFile.relativePath,
relativePath
});
}
this.logger.debug(
`Document ${relativePath} not found in database, scheduling sync to create it`
);
// We're outside of the pqueue, so we need to call the public wrapper
return this.syncLocallyCreatedFile(relativePath);
})
);
const deletes = Promise.all(
locallyPossiblyDeletedFiles.map(async ({ relativePath }) => {
this.logger.debug(
`Document ${relativePath} has been deleted locally, scheduling sync to delete it`
);
// We're outside of the pqueue, so we need to call the public wrapper
return this.syncLocallyDeletedFile(relativePath);
})
);
await Promise.all([updates, deletes]);
}
/**
* Create fake documents in the database for all files that are present locally
* and also exist remotely. This will stop the subequent syncs from duplicating
* the documents by creating the same documents from multiple clients.
*/
private async createFakeDocumentsFromRemoteState(): Promise<void> {
if (this.database.getHasInitialSyncCompleted()) {
return;
}
const [allLocalFiles, remote] = await Promise.all([
this.operations.listAllFiles(),
this.syncQueue.add(async () => this.syncService.getAll())
]);
if (remote !== undefined) {
remote.latestDocuments
.filter(
(remoteDocument) =>
allLocalFiles.includes(remoteDocument.relativePath) &&
!remoteDocument.isDeleted &&
this.database.getDocumentByDocumentId(
remoteDocument.documentId
) === undefined
)
.forEach((remoteDocument) => {
this.database.createNewEmptyDocument(
remoteDocument.documentId,
remoteDocument.vaultUpdateId,
remoteDocument.relativePath
);
});
}
this.database.setHasInitialSyncCompleted(true);
}
}

View file

@ -0,0 +1,513 @@
import type {
Database,
DocumentRecord,
RelativePath
} from "../persistence/database";
import type { SyncService } from "../services/sync-service";
import type { Logger } from "../tracing/logger";
import type {
CommonHistoryEntry,
SyncCreateDetails,
SyncDeleteDetails,
SyncDetails,
SyncHistory,
SyncMovedDetails,
SyncUpdateDetails
} from "../tracing/sync-history";
import { SyncStatus, SyncType } from "../tracing/sync-history";
import { EMPTY_HASH, hash } from "../utils/hash";
import { deserialize } from "../utils/deserialize";
import type { Settings } from "../persistence/settings";
import type { FileOperations } from "../file-operations/file-operations";
import { createPromise } from "../utils/create-promise";
import { FileNotFoundError } from "../file-operations/file-not-found-error";
import { SyncResetError } from "../services/sync-reset-error";
import { globsToRegexes } from "../utils/globs-to-regexes";
import type { DocumentVersion } from "../services/types/DocumentVersion";
import type { DocumentUpdateResponse } from "../services/types/DocumentUpdateResponse";
import type { DocumentVersionWithoutContent } from "../services/types/DocumentVersionWithoutContent";
export class UnrestrictedSyncer {
private ignorePatterns: RegExp[];
public constructor(
private readonly logger: Logger,
private readonly database: Database,
private readonly settings: Settings,
private readonly syncService: SyncService,
private readonly operations: FileOperations,
private readonly history: SyncHistory
) {
this.ignorePatterns = globsToRegexes(
this.settings.getSettings().ignorePatterns,
this.logger
);
this.settings.addOnSettingsChangeListener((newSettings) => {
this.ignorePatterns = globsToRegexes(
newSettings.ignorePatterns,
this.logger
);
});
}
public async unrestrictedSyncLocallyCreatedFile(
document: DocumentRecord
): Promise<void> {
const updateDetails: SyncCreateDetails = {
type: SyncType.CREATE,
relativePath: document.relativePath
};
return this.executeSync(updateDetails, async () => {
if (document.isDeleted) {
this.logger.debug(
`Document ${document.relativePath} has been already deleted, no need to create it`
);
return;
}
const contentBytes = await this.operations.read(
document.relativePath
); // this can throw FileNotFoundError
const contentHash = hash(contentBytes);
const response = await this.syncService.create({
documentId: document.documentId,
relativePath: document.relativePath,
contentBytes
});
this.database.updateDocumentMetadata(
{
parentVersionId: response.vaultUpdateId,
hash: contentHash,
remoteRelativePath: response.relativePath
},
document
);
this.database.addSeenUpdateId(response.vaultUpdateId);
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
details: updateDetails,
message: `Successfully uploaded locally created file`
});
});
}
public async unrestrictedSyncLocallyDeletedFile(
document: DocumentRecord
): Promise<void> {
const updateDetails: SyncDeleteDetails = {
type: SyncType.DELETE,
relativePath: document.relativePath
};
await this.executeSync(updateDetails, async () => {
const response = await this.syncService.delete({
documentId: document.documentId,
relativePath: document.relativePath
});
this.database.updateDocumentMetadata(
{
parentVersionId: response.vaultUpdateId,
hash: EMPTY_HASH,
remoteRelativePath: document.relativePath
},
document
);
this.database.addSeenUpdateId(response.vaultUpdateId);
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
details: updateDetails,
message: `Successfully deleted locally deleted file on the server`,
author: response.userId
});
});
}
public async unrestrictedSyncLocallyUpdatedFile({
oldPath,
document,
// We use the same code path for both local and remote updates. We need to force the update
// if there are no local changes but we know that the remote version is newer.
force = false
}: {
oldPath?: RelativePath;
force?: boolean;
document: DocumentRecord;
}): Promise<void> {
const updateDetails: SyncUpdateDetails | SyncMovedDetails =
oldPath !== undefined
? {
type: SyncType.MOVE,
relativePath: document.relativePath,
movedFrom: oldPath
}
: {
type: SyncType.UPDATE,
relativePath: document.relativePath
};
await this.executeSync(updateDetails, async () => {
const originalRelativePath = document.relativePath;
if (document.isDeleted || document.metadata === undefined) {
this.logger.debug(
`Document ${document.relativePath} has been already deleted, no need to update it`
);
return;
}
const contentBytes = await this.operations.read(
document.relativePath
); // this can throw FileNotFoundError
let contentHash = hash(contentBytes);
const areThereLocalChanges = !(
document.metadata.hash === contentHash && oldPath === undefined
);
let response: DocumentVersion | DocumentUpdateResponse | undefined =
undefined;
if (areThereLocalChanges) {
response = await this.syncService.put({
documentId: document.documentId,
parentVersionId: document.metadata.parentVersionId,
relativePath: document.relativePath,
contentBytes
});
} else {
if (!force) {
this.logger.debug(
`File hash of ${document.relativePath} matches with last synced version and the path hasn't changed; no need to sync`
);
return;
}
response = await this.syncService.get({
documentId: document.documentId
});
}
// `document` is mutable and reflects the latest state in the local database
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (document.isDeleted) {
this.logger.info(
`Document ${document.relativePath} has been deleted before we could finish updating it`
);
this.database.addSeenUpdateId(response.vaultUpdateId);
return;
}
if (
// `Syncer` creates fake local document metadata for all remote docs with invalid hashes. The parent IDs will likely match
// the latest versions so we still need to update the local versions to turn the fakes into real metadata.
document.metadata.parentVersionId > response.vaultUpdateId
) {
this.logger.debug(
`Document ${document.relativePath} is already more up to date than the fetched version`
);
this.database.addSeenUpdateId(response.vaultUpdateId); // in case the previous `vaultUpdateId` update hasn't made it through
return;
}
if (response.isDeleted) {
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
details: {
type: SyncType.DELETE,
relativePath: document.relativePath
},
message:
"File has been deleted remotely, so we deleted it locally",
author: response.userId
});
this.database.delete(document.relativePath);
this.database.updateDocumentMetadata(
{
parentVersionId: response.vaultUpdateId,
hash: EMPTY_HASH,
remoteRelativePath: response.relativePath
},
document
);
await this.operations.delete(document.relativePath);
this.database.addSeenUpdateId(response.vaultUpdateId);
return;
}
let actualPath = document.relativePath;
if (response.relativePath != originalRelativePath) {
actualPath = response.relativePath;
// Make sure to update the remote relative path to avoid uploading
// the file as a result of this filesystem event.
document.metadata.remoteRelativePath = response.relativePath;
await this.operations.move(
document.relativePath,
response.relativePath
); // this can throw FileNotFoundError
}
if (!("type" in response) || response.type === "MergingUpdate") {
const responseBytes = deserialize(response.contentBase64);
contentHash = hash(responseBytes);
this.database.updateDocumentMetadata(
{
parentVersionId: response.vaultUpdateId,
hash: contentHash,
remoteRelativePath: response.relativePath
},
document
);
await this.operations.write(
actualPath,
contentBytes,
responseBytes
);
if (!force) {
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
details: updateDetails,
message: `The file we updated had been updated remotely, so we downloaded the merged version`
});
}
} else {
this.database.updateDocumentMetadata(
{
parentVersionId: response.vaultUpdateId,
hash: contentHash,
remoteRelativePath: response.relativePath
},
document
);
}
this.database.addSeenUpdateId(response.vaultUpdateId);
const actualUpdateDetails: SyncUpdateDetails | SyncMovedDetails =
oldPath !== undefined ||
response.relativePath != originalRelativePath
? {
type: SyncType.MOVE,
relativePath: response.relativePath,
movedFrom: originalRelativePath
}
: {
type: SyncType.UPDATE,
relativePath: response.relativePath
};
if (areThereLocalChanges) {
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
details: actualUpdateDetails,
message: `Successfully uploaded locally updated file to the server`,
author: response.userId
});
} else {
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
details: actualUpdateDetails,
message: `Successfully downloaded remotely updated file from the server`,
author: response.userId
});
}
});
}
public async unrestrictedSyncRemotelyUpdatedFile(
remoteVersion: DocumentVersionWithoutContent,
document?: DocumentRecord
): Promise<void> {
const updateDetails: SyncCreateDetails = {
type: SyncType.CREATE,
relativePath: remoteVersion.relativePath
};
await this.executeSync(updateDetails, async () => {
if (document?.metadata !== undefined) {
// If the file exists locally, let's pretend the user has updated it
// and deal with remote update/deletion within `unrestrictedSyncLocallyUpdatedFile`
if (
document.metadata.parentVersionId >=
remoteVersion.vaultUpdateId
) {
this.logger.debug(
`Document ${remoteVersion.relativePath} is already at least as up to date as the fetched version`
);
return;
}
return this.unrestrictedSyncLocallyUpdatedFile({
document,
force: true
});
} else if (remoteVersion.isDeleted) {
// Either the document hasn't made it to us before and therefore we don't need to delete it,
// or we already have it, in which case the preceeding if would've dealt with it
this.logger.debug(
`Document ${remoteVersion.relativePath} has been deleted remotely, no need to sync`
);
return;
}
// Don't download oversized files
const historyEntryForSkippedOversizedFile =
this.getHistoryEntryForSkippedOversizedFile(
remoteVersion.contentSize,
remoteVersion.relativePath
);
if (historyEntryForSkippedOversizedFile !== undefined) {
this.history.addHistoryEntry(
historyEntryForSkippedOversizedFile
);
return;
}
const content = (
await this.syncService.get({
documentId: remoteVersion.documentId
})
).contentBase64;
// We're trying to create an entirely new document that didn't exist locally
document = this.database.getDocumentByDocumentId(
remoteVersion.documentId
);
// It can happen that a concurrent sync operation has already created the document, so we can bail here
if (document !== undefined) {
this.logger.debug(
`Document ${remoteVersion.relativePath} has already been created locally, no need to create it again`
);
return;
}
const contentBytes = deserialize(content);
await this.operations.ensureClearPath(remoteVersion.relativePath);
const [promise, resolve] = createPromise();
this.database.updateDocumentMetadata(
{
parentVersionId: remoteVersion.vaultUpdateId,
hash: hash(contentBytes),
remoteRelativePath: remoteVersion.relativePath
},
this.database.createNewPendingDocument(
remoteVersion.documentId,
remoteVersion.relativePath,
promise
)
);
await this.operations.create(
remoteVersion.relativePath,
contentBytes
);
resolve();
this.database.removeDocumentPromise(promise);
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
details: updateDetails,
message: `Successfully downloaded remote file which hadn't existed locally`,
author: remoteVersion.userId
});
});
}
public async executeSync<T>(
details: SyncDetails,
fn: () => Promise<T>
): Promise<T | undefined> {
for (const pattern of this.ignorePatterns) {
if (pattern.test(details.relativePath)) {
this.logger.debug(
`File '${details.relativePath}' is ignored by the ignore pattern: ${pattern}`
);
return; // bail without SKIPPED status because we were told to ignore this file and we shouldn't clutter up the history
}
}
try {
// Only check the size of files which already exist locally.
if (await this.operations.exists(details.relativePath)) {
const sizeInBytes = await this.operations.getFileSize(
details.relativePath
);
const historyEntryForSkippedOversizedFile =
this.getHistoryEntryForSkippedOversizedFile(
sizeInBytes,
details.relativePath
);
if (historyEntryForSkippedOversizedFile !== undefined) {
this.history.addHistoryEntry(
historyEntryForSkippedOversizedFile
);
return;
}
}
return await fn();
} catch (e) {
if (e instanceof FileNotFoundError) {
// A subsequent sync operation must have been creating to deal with this
this.logger.info(
`Skiping file '${details.relativePath}' because it no longer exists when trying to ${details.type.toLocaleLowerCase()} it`
);
return;
}
if (e instanceof SyncResetError) {
this.logger.info(
`Interrupting sync operation because of a reset`
);
return;
} else {
this.history.addHistoryEntry({
status: SyncStatus.ERROR,
details,
message: `Failed to sync file '${details.relativePath}' because of ${e} when trying to ${details.type.toLocaleLowerCase()} it`
});
throw e;
}
}
}
private getHistoryEntryForSkippedOversizedFile(
sizeInBytes: number,
relativePath: RelativePath
): CommonHistoryEntry | undefined {
const sizeInMB = Math.round(sizeInBytes / 1024 / 1024);
const { maxFileSizeMB } = this.settings.getSettings();
if (sizeInMB > maxFileSizeMB) {
return {
status: SyncStatus.SKIPPED,
details: {
type: SyncType.SKIPPED,
relativePath
},
message: `File size of ${sizeInMB} MB exceeds the maximum file size limit of ${
maxFileSizeMB
} MB`
};
}
}
}

View file

@ -0,0 +1,79 @@
export enum LogLevel {
DEBUG = "DEBUG",
INFO = "INFO",
WARNING = "WARNING",
ERROR = "ERROR"
}
const LOG_LEVEL_ORDER = {
[LogLevel.DEBUG]: 0,
[LogLevel.INFO]: 1,
[LogLevel.WARNING]: 2,
[LogLevel.ERROR]: 3
};
export class LogLine {
public timestamp = new Date();
public constructor(
public level: LogLevel,
public message: string
) {}
}
export class Logger {
private static readonly MAX_MESSAGES = 100000;
private readonly messages: LogLine[] = [];
private readonly onMessageListeners: ((message: LogLine) => unknown)[] = [];
public constructor(
...onMessageListeners: ((message: LogLine) => unknown)[]
) {
this.onMessageListeners = onMessageListeners;
}
public debug(message: string): void {
this.pushMessage(message, LogLevel.DEBUG);
}
public info(message: string): void {
this.pushMessage(message, LogLevel.INFO);
}
public warn(message: string): void {
this.pushMessage(message, LogLevel.WARNING);
}
public error(message: string): void {
this.pushMessage(message, LogLevel.ERROR);
}
public getMessages(mininumSeverity: LogLevel): LogLine[] {
return this.messages.filter(
(message) =>
LOG_LEVEL_ORDER[message.level] >=
LOG_LEVEL_ORDER[mininumSeverity]
);
}
public addOnMessageListener(listener: (message: LogLine) => unknown): void {
this.onMessageListeners.push(listener);
}
public reset(): void {
this.messages.length = 0;
this.debug("Logger has been reset");
}
private pushMessage(message: string, level: LogLevel): void {
const logLine = new LogLine(level, message);
this.messages.push(logLine);
while (this.messages.length > Logger.MAX_MESSAGES) {
this.messages.shift();
}
this.onMessageListeners.forEach((listener) => {
listener(logLine);
});
}
}

View file

@ -0,0 +1,174 @@
import type { RelativePath } from "../persistence/database";
import type { Logger } from "./logger";
export interface SyncCreateDetails {
type: SyncType.CREATE;
relativePath: RelativePath;
}
export interface SyncUpdateDetails {
type: SyncType.UPDATE;
relativePath: RelativePath;
}
export interface SyncMovedDetails {
type: SyncType.MOVE;
relativePath: RelativePath;
movedFrom: RelativePath;
}
export interface SyncDeleteDetails {
type: SyncType.DELETE;
relativePath: RelativePath;
}
export interface SyncSkippedDetails {
type: SyncType.SKIPPED;
relativePath: RelativePath;
}
export type SyncDetails =
| SyncCreateDetails
| SyncUpdateDetails
| SyncDeleteDetails
| SyncMovedDetails
| SyncSkippedDetails;
export interface CommonHistoryEntry {
status: SyncStatus;
message: string;
details: SyncDetails;
author?: string;
}
export enum SyncType {
CREATE = "CREATE",
UPDATE = "UPDATE",
DELETE = "DELETE",
MOVE = "MOVE",
SKIPPED = "SKIPPED"
}
export enum SyncStatus {
SUCCESS = "SUCCESS",
ERROR = "ERROR",
SKIPPED = "SKIPPED"
}
export type HistoryEntry = CommonHistoryEntry & { timestamp: Date };
export interface HistoryStats {
success: number;
error: number;
}
export class SyncHistory {
private static readonly MAX_ENTRIES = 5000;
private static readonly TIMEOUT_FOR_MERGING_ENTRIES_IN_SECONDS = 60;
private _entries: HistoryEntry[] = [];
private readonly syncHistoryUpdateListeners: ((
status: HistoryStats
) => unknown)[] = [];
private status: HistoryStats = {
success: 0,
error: 0
};
public constructor(private readonly logger: Logger) {}
public get entries(): readonly HistoryEntry[] {
return this._entries;
}
/**
* Insert the entry at the beginning of the history list. If the entry
* already in the list, it will get moved to the beginning and updated.
*
* If the entry list is too long, the oldest entry will be removed.
*/
public addHistoryEntry(entry: CommonHistoryEntry): void {
const historyEntry = {
...entry,
timestamp: new Date()
};
const candidate = this.findSimilarRecentUpdateEntry(historyEntry);
if (candidate !== undefined) {
this._entries = this._entries.filter((e) => e !== candidate);
}
// Insert the entry at the beginning
this._entries.unshift(historyEntry);
if (this._entries.length > SyncHistory.MAX_ENTRIES) {
this._entries.pop();
}
this.updateSuccessCount(historyEntry);
}
public addSyncHistoryUpdateListener(
listener: (stats: HistoryStats) => unknown
): void {
this.syncHistoryUpdateListeners.push(listener);
listener({ ...this.status });
}
public reset(): void {
this._entries.length = 0;
this.status = {
success: 0,
error: 0
};
this.syncHistoryUpdateListeners.forEach((listener) => {
listener(this.status);
});
}
private findSimilarRecentUpdateEntry(
entry: HistoryEntry
): HistoryEntry | undefined {
if (entry.details.type !== SyncType.UPDATE) {
return;
}
const candidate = this._entries.find(
(e) =>
e.details.type === SyncType.UPDATE &&
e.details.relativePath === entry.details.relativePath
);
if (
candidate !== undefined &&
(this._entries[0] === candidate ||
candidate.timestamp.getTime() +
SyncHistory.TIMEOUT_FOR_MERGING_ENTRIES_IN_SECONDS * 1000 >
entry.timestamp.getTime())
) {
return candidate;
}
}
private updateSuccessCount(entry: HistoryEntry): void {
const message = `${entry.details.relativePath} - ${entry.message} (${entry.details.type.toLocaleLowerCase()})`;
switch (entry.status) {
case SyncStatus.SUCCESS:
this.status.success++;
this.logger.info(`History entry: ${message}`);
break;
case SyncStatus.ERROR:
this.status.error++;
this.logger.error(`Cannot sync file: ${message}`);
break;
case SyncStatus.SKIPPED:
this.logger.error(`Skipping file: ${message}`);
break;
}
this.syncHistoryUpdateListeners.forEach((listener) => {
listener(this.status);
});
}
}

View file

@ -0,0 +1,5 @@
export enum DocumentSyncStatus {
UP_TO_DATE = "UP_TO_DATE",
SYNCING = "SYNCING",
SYNCING_IS_DISABLED = "SYNCING_IS_DISABLED"
}

View file

@ -0,0 +1,5 @@
export enum DocumentUpToDateness {
UpToDate = "UpToDate", // easiest case, the client can just show the cursors as-is
Prior = "Prior", // The cursors are outdated, so the client has to guess the cursor positions based on local updates. This is only possible if this client's cursor has once been up-to-date in a given document.
Later = "Later" // The cursors are from a future version of a document, there's no way we can accuratly show them locally.
}

View file

@ -0,0 +1,5 @@
import type { ClientCursors } from "../services/types/ClientCursors";
export interface MaybeOutdatedClientCursors extends ClientCursors {
isOutdated: boolean;
}

View file

@ -0,0 +1,5 @@
export interface NetworkConnectionStatus {
isSuccessful: boolean;
serverMessage: string;
isWebSocketConnected: boolean;
}

View file

@ -0,0 +1,13 @@
import assert from "node:assert";
export function assertSetContainsExactly<T>(set: Set<T>, ...values: T[]): void {
assert.ok(
set.size === values.length &&
Array.from(set).every((value) => values.includes(value)),
`Expected set to contain only ${values.map((v) => '"' + v + '"').join(", ")}, but it contained ${Array.from(
set
)
.map((v) => '"' + v + '"')
.join(", ")}`
);
}

View file

@ -0,0 +1,15 @@
import { v4 as uuidv4 } from "uuid";
export function createClientId(): string {
// @ts-expect-error, injected by webpack
const packageVersion = __CURRENT_VERSION__; // eslint-disable-line
const platform =
typeof navigator !== "undefined"
? navigator.platform // eslint-disable-line @typescript-eslint/no-deprecated
: typeof process !== "undefined"
? process.platform
: "unknown";
return `vault-link/${packageVersion} (${uuidv4()}; ${platform})`;
}

View file

@ -0,0 +1,25 @@
type ResolveFunction<T> = undefined extends T
? (value?: T) => unknown
: (value: T) => unknown;
/**
* A type-safe utility function to create a Promise with resolve and reject functions.
* @returns A tuple containing a Promise, a resolve function, and a reject function.
*/
export function createPromise<T = unknown>(): [
Promise<T>,
ResolveFunction<T>,
(error: unknown) => unknown
] {
let resolve: undefined | ResolveFunction<T> = undefined;
let reject: undefined | ((error: unknown) => unknown) = undefined;
const creationPromise = new Promise<T>(
(resolve_, reject_) =>
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
((resolve = resolve_ as ResolveFunction<T>), (reject = reject_))
);
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
return [creationPromise, resolve!, reject!];
}

View file

@ -0,0 +1,5 @@
import { base64ToBytes } from "byte-base64";
export function deserialize(data: string): Uint8Array {
return base64ToBytes(data);
}

View file

@ -0,0 +1,14 @@
import type { DocumentRecord } from "../persistence/database";
import { EMPTY_HASH } from "./hash";
// TODO: make this smarter so that offline files can be renamed & edited at the same time
export function findMatchingFile(
contentHash: string,
candidates: DocumentRecord[]
): DocumentRecord | undefined {
if (contentHash === EMPTY_HASH) {
return undefined;
}
return candidates.find(({ metadata }) => metadata?.hash === contentHash);
}

View file

@ -0,0 +1,9 @@
export function getRandomColor(name: string): string {
let hash = 0;
for (let i = 0; i < name.length; i++) {
hash = (hash << 5) - hash + name.charCodeAt(i);
hash |= 0; // Convert to 32bit integer
}
const normalised = hash / 0x7fffffff;
return `oklch(0.58 0.15 ${Math.round(Math.abs(normalised * 360))})`;
}

View file

@ -0,0 +1,13 @@
import { describe, it } from "node:test";
import assert from "node:assert";
import { Logger } from "../tracing/logger";
import { globsToRegexes } from "./globs-to-regexes";
describe("globsToRegexes", () => {
it("basicExample", async () => {
const [regex] = globsToRegexes([".git/**"], new Logger());
assert.ok(regex.test(".git/objects/object"));
assert.ok(regex.test(".git/objects/.object"));
});
});

View file

@ -0,0 +1,18 @@
import { makeRe } from "minimatch";
import type { Logger } from "../tracing/logger";
export function globsToRegexes(globs: string[], logger: Logger): RegExp[] {
return globs
.map((pattern) => {
const result = makeRe(pattern, {
dot: true
});
if (result === false) {
logger.warn(
`Failed to parse ${pattern}' as a glob pattern, skipping it`
);
}
return result;
})
.filter((pattern) => pattern !== false);
}

View file

@ -0,0 +1,12 @@
// https://stackoverflow.com/questions/7616461/generate-a-hash-from-string-in-javascript
export function hash(content: Uint8Array): string {
let result = 0;
// eslint-disable-next-line @typescript-eslint/prefer-for-of
for (let i = 0; i < content.length; i++) {
result = (result << 5) - result + content[i];
result |= 0; // Convert to 32bit integer
}
return Math.abs(result).toString(16).padStart(8, "0");
}
export const EMPTY_HASH = hash(new Uint8Array(0));

View file

@ -0,0 +1,29 @@
import { describe, it } from "node:test";
import assert from "node:assert";
import { isEqualBytes } from "./is-equal-bytes";
describe("isEqualBytes", () => {
it("should return true for equal byte arrays", () => {
const bytes1 = new Uint8Array([1, 2, 3, 4]);
const bytes2 = new Uint8Array([1, 2, 3, 4]);
assert.strictEqual(isEqualBytes(bytes1, bytes2), true);
});
it("should return false for byte arrays of different lengths", () => {
const bytes1 = new Uint8Array([1, 2, 3, 4]);
const bytes2 = new Uint8Array([1, 2, 3]);
assert.strictEqual(isEqualBytes(bytes1, bytes2), false);
});
it("should return true for empty byte arrays", () => {
const bytes1 = new Uint8Array([]);
const bytes2 = new Uint8Array([]);
assert.strictEqual(isEqualBytes(bytes1, bytes2), true);
});
it("should return false for byte arrays with same length but different content", () => {
const bytes1 = new Uint8Array([1, 2, 3, 4]);
const bytes2 = new Uint8Array([4, 3, 2, 1]);
assert.strictEqual(isEqualBytes(bytes1, bytes2), false);
});
});

View file

@ -0,0 +1,13 @@
export function isEqualBytes(bytes1: Uint8Array, bytes2: Uint8Array): boolean {
if (bytes1.length !== bytes2.length) {
return false;
}
for (let i = 0; i < bytes1.length; i++) {
if (bytes1[i] !== bytes2[i]) {
return false;
}
}
return true;
}

View file

@ -0,0 +1,42 @@
import { describe, it } from "node:test";
import assert from "node:assert";
import { isFileTypeMergable } from "./is-file-type-mergable";
describe("isFileTypeMergable", () => {
it("should return true for .md files", () => {
assert.strictEqual(isFileTypeMergable(".md"), true);
assert.strictEqual(isFileTypeMergable("hi.md"), true);
assert.strictEqual(
isFileTypeMergable("my/path/to/my/document.md"),
true
);
});
it("should return true for .txt files", () => {
assert.strictEqual(isFileTypeMergable(".txt"), true);
assert.strictEqual(isFileTypeMergable("hi.txt"), true);
assert.strictEqual(
isFileTypeMergable("my/path/to/my/document.txt"),
true
);
});
it("should be case insensitive", () => {
assert.strictEqual(isFileTypeMergable("hi.MD"), true);
assert.strictEqual(
isFileTypeMergable("my/path/to/my/DOCUMENT.MD"),
true
);
assert.strictEqual(isFileTypeMergable("hi.TXT"), true);
assert.strictEqual(
isFileTypeMergable("my/path/to/my/DOCUMENT.TXT"),
true
);
});
it("should return false for non-mergable file types", () => {
assert.strictEqual(isFileTypeMergable(".json"), false);
assert.strictEqual(isFileTypeMergable("HELLO.JSON"), false);
assert.strictEqual(isFileTypeMergable("my/config.yml"), false);
});
});

View file

@ -0,0 +1,6 @@
export function isFileTypeMergable(pathOrFileName: string): boolean {
const parts = pathOrFileName.split(".");
const fileExtension = parts.at(-1) ?? "";
return ["md", "txt"].includes(fileExtension.toLowerCase());
}

View file

@ -0,0 +1,44 @@
import { describe, it } from "node:test";
import assert from "node:assert";
import { lineAndColumnToPosition } from "./line-and-column-to-position";
describe("lineAndColumnToPosition", () => {
it("should return the correct position for the first line", () => {
const text = "Hello\nWorld";
const position = lineAndColumnToPosition(text, 0, 3);
assert.strictEqual(position, 3);
});
it("should return the correct position for the second line", () => {
const text = "Hello\nWorld";
const position = lineAndColumnToPosition(text, 1, 2);
assert.strictEqual(position, 8);
});
it("should return the correct position for an empty string", () => {
const text = "";
const position = lineAndColumnToPosition(text, 0, 0);
assert.strictEqual(position, 0);
});
it("with carrige return", () => {
assert.strictEqual(lineAndColumnToPosition("a\nb", 1, 1), 3);
assert.strictEqual(lineAndColumnToPosition("a\r\nb", 1, 1), 3);
});
it("should handle multi-line strings with varying lengths", () => {
const text = "Line1\nLongerLine2\nShort3";
const position = lineAndColumnToPosition(text, 2, 4);
assert.strictEqual(position, 22);
});
it("should throw an error if the line number is out of range", () => {
const text = "Line1\nLine2";
assert.throws(() => lineAndColumnToPosition(text, 3, 0));
});
it("should throw an error if the column number is out of range", () => {
const text = "Line1\nLine2";
assert.throws(() => lineAndColumnToPosition(text, 1, 10));
});
});

View file

@ -0,0 +1,34 @@
/**
* Converts line and column coordinates to an absolute character position in a text string.
*
* @param line - The zero-based line number
* @param column - The zero-based column number
* @param text - The text string to calculate position in
* @returns The absolute character position (zero-based index) in the text string
* @throws Error if line number is out of range
* @throws Error if column number is out of range
*/
export function lineAndColumnToPosition(
text: string,
line: number,
column: number
): number {
const lines = text.replace("\r", "").split("\n");
if (line >= lines.length) {
throw new Error(`Line number ${line} is out of range.`);
}
if (column > lines[line].length) {
throw new Error(`Column number ${column} is out of range.`);
}
let position = 0;
for (let i = 0; i < line; i++) {
position += lines[i].length + 1;
}
position += column;
return position;
}

View file

@ -0,0 +1,228 @@
import { describe, it, beforeEach } from "node:test";
import assert from "node:assert";
import { Logger } from "../tracing/logger";
import type { RelativePath } from "../persistence/database";
import { Locks } from "./locks";
describe("withLock", () => {
const testPath: RelativePath = "test/document/path";
const testPath2: RelativePath = "test/document/path2";
const logger = new Logger();
// eslint-disable-next-line @typescript-eslint/init-declarations
let locks: Locks<RelativePath>;
beforeEach(() => {
locks = new Locks<RelativePath>(logger);
});
it("should execute function with single key lock", async () => {
let executionCount = 0;
const result = await locks.withLock(testPath, () => {
executionCount++;
return "success";
});
assert.strictEqual(result, "success");
assert.strictEqual(executionCount, 1);
});
it("should execute async function with single key lock", async () => {
let executionCount = 0;
const result = await locks.withLock(testPath, async () => {
executionCount++;
await new Promise((resolve) => setTimeout(resolve, 10));
return "async-success";
});
assert.strictEqual(result, "async-success");
assert.strictEqual(executionCount, 1);
});
it("should execute function with multiple key locks", async () => {
let executionCount = 0;
const result = await locks.withLock([testPath, testPath2], () => {
executionCount++;
return "multi-success";
});
assert.strictEqual(result, "multi-success");
assert.strictEqual(executionCount, 1);
});
it("should sort multiple keys to prevent deadlocks", async () => {
const executionOrder: string[] = [];
// Start two concurrent operations with keys in different orders
const promise1 = locks.withLock([testPath2, testPath], async () => {
executionOrder.push("operation1-start");
await new Promise((resolve) => setTimeout(resolve, 50));
executionOrder.push("operation1-end");
return "result1";
});
const promise2 = locks.withLock([testPath, testPath2], async () => {
executionOrder.push("operation2-start");
await new Promise((resolve) => setTimeout(resolve, 50));
executionOrder.push("operation2-end");
return "result2";
});
const [result1, result2] = await Promise.all([promise1, promise2]);
assert.strictEqual(result1, "result1");
assert.strictEqual(result2, "result2");
// One operation should complete entirely before the other starts
assert.deepStrictEqual(executionOrder, [
"operation1-start",
"operation1-end",
"operation2-start",
"operation2-end"
]);
});
it("should serialize access to same key", async () => {
const executionOrder: string[] = [];
const promise1 = locks.withLock(testPath, async () => {
executionOrder.push("operation1-start");
await new Promise((resolve) => setTimeout(resolve, 50));
executionOrder.push("operation1-end");
return "result1";
});
const promise2 = locks.withLock(testPath, async () => {
executionOrder.push("operation2-start");
await new Promise((resolve) => setTimeout(resolve, 30));
executionOrder.push("operation2-end");
return "result2";
});
const [result1, result2] = await Promise.all([promise1, promise2]);
assert.strictEqual(result1, "result1");
assert.strictEqual(result2, "result2");
assert.deepStrictEqual(executionOrder, [
"operation1-start",
"operation1-end",
"operation2-start",
"operation2-end"
]);
});
it("should allow concurrent access to different keys", async () => {
const executionOrder: string[] = [];
const promise1 = locks.withLock(testPath, async () => {
executionOrder.push("operation1-start");
await new Promise((resolve) => setTimeout(resolve, 50));
executionOrder.push("operation1-end");
return "result1";
});
const promise2 = locks.withLock(testPath2, async () => {
executionOrder.push("operation2-start");
await new Promise((resolve) => setTimeout(resolve, 30));
executionOrder.push("operation2-end");
return "result2";
});
const [result1, result2] = await Promise.all([promise1, promise2]);
assert.strictEqual(result1, "result1");
assert.strictEqual(result2, "result2");
// Both operations should run concurrently
assert.strictEqual(executionOrder[0], "operation1-start");
assert.strictEqual(executionOrder[1], "operation2-start");
});
it("should release locks even if function throws", async () => {
const error = new Error("test error");
await assert.rejects(
locks.withLock(testPath, () => {
throw error;
}),
{ message: "test error" }
);
// Lock should be released, allowing another operation
const result = await locks.withLock(
testPath,
() => "success-after-error"
);
assert.strictEqual(result, "success-after-error");
});
it("should release locks even if async function throws", async () => {
const error = new Error("async test error");
await assert.rejects(
locks.withLock(testPath, async () => {
await new Promise((resolve) => setTimeout(resolve, 10));
throw error;
}),
{ message: "async test error" }
);
// Lock should be released, allowing another operation
const result = await locks.withLock(
testPath,
() => "success-after-async-error"
);
assert.strictEqual(result, "success-after-async-error");
});
it("should handle empty array of keys", async () => {
const result = await locks.withLock([], () => "empty-keys");
assert.strictEqual(result, "empty-keys");
});
it("should maintain FIFO order for multiple waiters", async () => {
const executionOrder: string[] = [];
// Start first operation that holds the lock
const firstPromise = locks.withLock(testPath, async () => {
executionOrder.push("first-start");
await new Promise((resolve) => setTimeout(resolve, 100));
executionOrder.push("first-end");
return "first";
});
// Small delay to ensure first operation starts
await new Promise((resolve) => setTimeout(resolve, 10));
// Queue second and third operations
const secondPromise = locks.withLock(testPath, async () => {
executionOrder.push("second-start");
await new Promise((resolve) => setTimeout(resolve, 30));
executionOrder.push("second-end");
return "second";
});
const thirdPromise = locks.withLock(testPath, async () => {
executionOrder.push("third-start");
await new Promise((resolve) => setTimeout(resolve, 20));
executionOrder.push("third-end");
return "third";
});
const [first, second, third] = await Promise.all([
firstPromise,
secondPromise,
thirdPromise
]);
assert.strictEqual(first, "first");
assert.strictEqual(second, "second");
assert.strictEqual(third, "third");
assert.deepStrictEqual(executionOrder, [
"first-start",
"first-end",
"second-start",
"second-end",
"third-start",
"third-end"
]);
});
});

View file

@ -0,0 +1,142 @@
import type { Logger } from "../tracing/logger";
/**
* Manages exclusive locks on items to prevent concurrent modifications.
* Locks are granted in FIFO order.
*
* @template T The type of the key used for locking
*/
export class Locks<T> {
/** Currently locked keys */
private readonly locked = new Set<T>();
/** Queue of resolve functions waiting for each key */
private readonly waiters = new Map<T, (() => unknown)[]>();
public constructor(private readonly logger?: Logger) {}
/**
* Executes a function while holding exclusive locks on one or more keys.
*
* This method ensures that the provided function runs with exclusive access to the
* specified key(s). Multiple keys are sorted to prevent deadlocks when different
* operations request the same keys in different orders.
*
* @template R The return type of the function to execute
* @param keyOrKeys A single key or array of keys to lock during function execution
* @param fn The function to execute while holding the lock(s). Can be sync or async.
* @returns A Promise that resolves to the return value of the executed function
*
* @example
* ```typescript
* // Lock a single key
* const result = await locks.withLock('file1', () => {
* // Critical section - only one operation can access 'file1' at a time
* return processFile('file1');
* });
*
* // Lock multiple keys (prevents deadlocks through consistent ordering)
* await locks.withLock(['file1', 'file2'], async () => {
* // Critical section - exclusive access to both files
* await moveFile('file1', 'file2');
* });
* ```
*
* @throws Any error thrown by the provided function will be propagated after locks are released
*/
public async withLock<R>(
keyOrKeys: T | T[],
fn: () => R | Promise<R>
): Promise<R> {
const keys = Array.isArray(keyOrKeys) ? keyOrKeys : [keyOrKeys];
keys.sort((a, b) => String(a).localeCompare(String(b))); // Ensure consistent order to prevent deadlocks
await Promise.all(keys.map(async (key) => this.waitForLock(key)));
try {
return await fn();
} finally {
keys.forEach((key) => {
this.unlock(key);
});
}
}
/**
* Attempts to acquire a lock immediately without waiting.
* Must call `unlock()` if successful.
*
* @param key The key to lock
* @returns `true` if lock acquired, `false` if already locked
*/
private tryLock(key: T): boolean {
if (this.locked.has(key)) {
return false;
}
this.locked.add(key);
return true;
}
/**
* Waits to acquire a lock, blocking until available.
* Operations are queued in FIFO order. Must call `unlock()` when done.
*
* @param key The key to wait for and lock
* @returns Promise that resolves when lock is acquired
*/
private async waitForLock(key: T): Promise<void> {
if (this.tryLock(key)) {
return Promise.resolve();
}
this.logger?.debug(`Waiting for lock on ${key}`);
return new Promise((resolve) => {
// DefaultDict behavior
let waiting = this.waiters.get(key);
if (!waiting) {
waiting = [];
this.waiters.set(key, waiting);
}
waiting.push(resolve);
});
}
/**
* Releases a lock and grants access to the next waiting operation in FIFO order.
* Removes the key from locked set if no waiters.
*
* @param key The key to unlock
* @throws {Error} If key is not currently locked
*/
private unlock(key: T): void {
if (!this.locked.has(key)) {
throw new Error(`Key '${key}' is not locked, cannot unlock`);
}
// Remove first waiter to ensure FIFO order
const nextWaiting = this.waiters.get(key)?.shift();
if (nextWaiting) {
this.logger?.debug(`Granted lock on ${key}`);
nextWaiting();
} else {
this.locked.delete(key);
}
}
}
export class Lock {
private readonly locks: Locks<boolean>;
public constructor(logger?: Logger) {
this.locks = new Locks(logger);
}
public async withLock<R>(fn: () => R | Promise<R>): Promise<R> {
return this.locks.withLock(true, fn);
}
}

View file

@ -0,0 +1,62 @@
import { describe, it } from "node:test";
import assert from "node:assert";
import { CoveredValues } from "./min-covered";
describe("CoveredValues", () => {
it("should initialize with the given min value", () => {
const covered = new CoveredValues(5);
assert.strictEqual(covered.min, 5);
});
it("should add values greater than min", () => {
const covered = new CoveredValues(0);
covered.add(3);
assert.strictEqual(covered.min, 0);
covered.add(1);
assert.strictEqual(covered.min, 1);
covered.add(4);
assert.strictEqual(covered.min, 1);
covered.add(2);
assert.strictEqual(covered.min, 4);
});
it("should ignore duplicate values", () => {
const covered = new CoveredValues(0);
covered.add(3);
covered.add(3);
covered.add(3);
assert.strictEqual(covered.min, 0);
covered.add(1);
covered.add(2);
assert.strictEqual(covered.min, 3);
});
it("should handle multiple consecutive values", () => {
const covered = new CoveredValues(132);
for (let i = 250; i > 132; i--) {
assert.strictEqual(covered.min, 132);
covered.add(i);
}
assert.strictEqual(covered.min, 250);
});
it("should handle adding values lower than current min", () => {
const covered = new CoveredValues(5);
covered.add(3);
assert.strictEqual(covered.min, 5);
covered.add(6);
assert.strictEqual(covered.min, 6);
});
it("should handle force setting min value", () => {
const covered = new CoveredValues(5);
covered.add(7);
covered.add(8);
covered.add(9);
assert.strictEqual(covered.min, 5);
covered.min = 6;
assert.strictEqual(covered.min, 6);
covered.add(10);
assert.strictEqual(covered.min, 10);
});
});

View file

@ -0,0 +1,56 @@
/**
* A class that tracks the minimum covered value in a sequence of numbers.
* It keeps track of a minimum value based on the seen values.
*
* It expects integers slightly out of order and makes sure that the value of `min` is
* always the minimum of the seen values. This is done with bounded memory usage.
*
* @example
* ```typescript
* const covered = new CoveredValues(0);
* covered.add(2); // seenValues = [2], min = 0
* covered.add(1); // seenValues = [], min = 2
* covered.min; // returns 2
* ```
*/
export class CoveredValues {
private seenValues: number[] = [];
public constructor(private minValue: number) {}
public get min(): number {
return this.minValue;
}
public set min(value: number) {
this.minValue = Math.max(value, this.minValue);
this.seenValues = this.seenValues.filter((v) => v > value);
}
public add(value: number): void {
if (value < this.minValue) {
return;
}
let i = 0;
while (i < this.seenValues.length && this.seenValues[i] < value) {
i++;
}
if (i === this.seenValues.length) {
this.seenValues.push(value);
} else if (this.seenValues[i] === value) {
return;
} else {
this.seenValues.splice(i, 0, value);
}
while (
this.seenValues.length > 0 &&
this.seenValues[0] === this.minValue + 1
) {
this.seenValues.shift();
this.minValue++;
}
}
}

View file

@ -0,0 +1,66 @@
import { describe, test } from "node:test";
import assert from "node:assert";
import { positionToLineAndColumn } from "./position-to-line-and-column";
describe("positionToLineAndColumn", () => {
test("converts position to line and column in multi-line text", () => {
const text = "ab\ncd\n";
assert.deepStrictEqual(positionToLineAndColumn(text, 0), {
line: 0,
column: 0
});
assert.deepStrictEqual(positionToLineAndColumn(text, 1), {
line: 0,
column: 1
});
assert.deepStrictEqual(positionToLineAndColumn(text, 2), {
line: 0,
column: 2
});
assert.deepStrictEqual(positionToLineAndColumn(text, 3), {
line: 1,
column: 0
});
assert.deepStrictEqual(positionToLineAndColumn(text, 4), {
line: 1,
column: 1
});
assert.deepStrictEqual(positionToLineAndColumn(text, 6), {
line: 2,
column: 0
});
});
test("with carrige returns", () => {
assert.deepStrictEqual(positionToLineAndColumn("a\nb", 3), {
line: 1,
column: 1
});
assert.deepStrictEqual(positionToLineAndColumn("a\r\nb", 3), {
line: 1,
column: 1
});
});
test("handles empty input", () => {
assert.deepStrictEqual(positionToLineAndColumn("", 0), {
line: 0,
column: 0
});
});
test("handles positions at the end of text", () => {
const text = "End";
assert.deepStrictEqual(positionToLineAndColumn(text, 3), {
line: 0,
column: 3
});
});
test("throws error for position out of range", () => {
const text = "Short text";
assert.throws(() => positionToLineAndColumn(text, 15));
assert.throws(() => positionToLineAndColumn(text, -1));
});
});

View file

@ -0,0 +1,36 @@
/**
* Converts a character position in text to line and column numbers.
*
* @param text The text content to analyze
* @param position The character position to convert
* @returns An object containing line and column numbers
* @throws Will throw an error if the position is negative or exceeds the text length
*/
export function positionToLineAndColumn(
text: string,
position: number
): { line: number; column: number } {
if (position < 0) {
throw new Error("Position cannot be negative");
}
text = text.replace("\r", "");
if (
position >
text.length + 1
// +1 to account for the cursor being after last character
) {
throw new Error(
`Position ${position} exceeds text length ${text.length}`
);
}
const textUpToPosition = text.substring(0, position);
const lines = textUpToPosition.split("\n");
const line = lines.length - 1;
const column = lines[lines.length - 1].length;
return { line, column };
}

View file

@ -0,0 +1,64 @@
import { rateLimit } from "./rate-limit";
import { describe, it, beforeEach, afterEach, mock } from "node:test";
import assert from "node:assert";
describe("rateLimit", () => {
beforeEach(() => {
mock.timers.enable({ apis: ["setTimeout"] });
});
afterEach(() => {
mock.timers.reset();
});
it("should call the function immediately on first invocation", async () => {
const mockFn = mock.fn<() => Promise<string>>();
mockFn.mock.mockImplementation(async () => "result");
const rateLimited = rateLimit(mockFn, 100);
const promise = rateLimited();
assert.strictEqual(mockFn.mock.callCount(), 1);
await promise;
});
it("should call the function again after the interval has passed", async () => {
const mockFn = mock.fn<(value: number) => Promise<string>>();
mockFn.mock.mockImplementation(async () => "result");
const rateLimited = rateLimit(mockFn, 100);
const promise1 = rateLimited(1);
await promise1;
mock.timers.tick(200);
const promise2 = rateLimited(2);
await promise2;
assert.strictEqual(mockFn.mock.callCount(), 2);
assert.deepStrictEqual(mockFn.mock.calls[1].arguments, [2]);
});
it("should use the most recent arguments if multiple calls are made within interval", async () => {
const mockFn = mock.fn<(value: string) => Promise<string>>();
mockFn.mock.mockImplementation(async (val: string) => `${val}-result`);
const rateLimited = rateLimit(mockFn, 100);
const promise1 = rateLimited("first");
mock.timers.tick(10);
const promise2 = rateLimited("second");
mock.timers.tick(10);
const promise3 = rateLimited("third");
mock.timers.tick(1000);
assert.strictEqual(await promise1, "first-result");
assert.strictEqual(await promise2, "third-result");
assert.strictEqual(await promise3, undefined);
assert.strictEqual(mockFn.mock.callCount(), 2);
assert.deepStrictEqual(mockFn.mock.calls[0].arguments, ["first"]);
assert.deepStrictEqual(mockFn.mock.calls[1].arguments, ["third"]);
});
});

View file

@ -0,0 +1,58 @@
import { createPromise } from "./create-promise";
import { sleep } from "./sleep";
/**
* Creates a rate-limited version of a given asynchronous function.
* Ensures that the function is not called more frequently than specified by `minIntervalMs`.
* If the function is called while a previous call is still within the rate limit window,
* it will queue up the most recent arguments and execute them after the rate limit expires.
* Only the most recent call is preserved in the queue.
*
* @template T - Type of the function to be rate limited
* @param {T} fn - The asynchronous function to rate limit
* @param {number} minIntervalMs - The minimum interval in milliseconds between function calls
* @returns {(...args: Parameters<T>) => ReturnType<T> | Promise<undefined>} A decorated function that respects the rate limit.
* Returns the original function's return type when executed, or undefined if the call was superseded by a newer one.
*/
export function rateLimit<
R,
T extends (
...args: any // eslint-disable-line @typescript-eslint/no-explicit-any
) => Promise<R>
>(
fn: T,
minIntervalMs: number
): (...args: Parameters<T>) => Promise<R | undefined> {
let newArgs: Parameters<T> | undefined = undefined;
let running: Promise<unknown> | undefined = undefined;
const decoratedFn = async (
...args: Parameters<T>
): Promise<R | undefined> => {
if (running !== undefined) {
newArgs = args;
await running;
// args might have changed while we were waiting
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (newArgs === undefined) {
// we weren't the first one to wake up, that means a newer
// invocation is running now, we can just bail
return;
}
args = newArgs;
newArgs = undefined;
}
const [promise, resolve] = createPromise();
running = promise;
sleep(minIntervalMs)
.then(resolve)
.catch(() => {
// sleep cannot fail
});
return fn(...args);
};
return decoratedFn;
}

View file

@ -0,0 +1,3 @@
export async function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

View file

@ -0,0 +1,18 @@
{
"compilerOptions": {
"module": "ESNext",
"target": "ESNext",
"strict": true,
"allowSyntheticDefaultImports": true,
"moduleResolution": "bundler",
"lib": [
"DOM", // to get `fetch` & `WebSocket`
"ES2024"
],
"declaration": true,
"declarationDir": "./dist/types"
},
"exclude": [
"./dist"
]
}

View file

@ -0,0 +1,71 @@
const path = require("path");
const { merge } = require("webpack-merge");
const webpack = require("webpack");
const packageJson = require("./package.json");
const common = {
entry: "./src/index.ts",
module: {
rules: [
{
test: /\.ts$/,
use: ["ts-loader"]
},
{
test: /\.wasm$/,
type: "asset/inline"
}
]
},
plugins: [
new webpack.DefinePlugin({
__CURRENT_VERSION__: JSON.stringify(packageJson.version)
})
],
optimization: {
// the consuming project should take care of minification
minimize: false
},
resolve: {
extensions: [".ts", ".js"],
alias: {
root: __dirname,
src: path.resolve(__dirname, "src")
}
},
performance: {
hints: false // it's a library, no need to warn about its size
}
};
module.exports = [
merge(common, {
target: "web",
output: {
path: path.resolve(__dirname, "dist"),
filename: "sync-client.web.js",
library: {
name: "SyncClient",
type: "umd"
},
globalObject: "this"
},
resolve: {
fallback: {
ws: false // Exclude `ws` from the browser bundle
}
}
}),
merge(common, {
target: "node",
output: {
path: path.resolve(__dirname, "dist"),
filename: "sync-client.node.js",
libraryTarget: "commonjs2"
},
externals: {
bufferutil: "bufferutil",
"utf-8-validate": "utf-8-validate" // required for ws: https://github.com/websockets/ws/issues/2245#issuecomment-2250318733
}
})
];