split: sync-client utils and errors reorganization

Move error classes from services/ and file-operations/ into a new errors/
directory (authentication-error, server-version-mismatch-error,
sync-reset-error, file-not-found-error), plus add file-already-exists-error
and http-client-error. Update consts.ts and utils/* (await-all,
create-client-id, hash, rate-limit, find-matching-file). Replace
data-structures (locks, min-covered, event-listeners, fix-sized-cache) and
add debugging utilities (in-memory-file-system, log-to-console,
slow-web-socket-factory). Removes utils/create-promise.ts.
This commit is contained in:
Andras Schmelczer 2026-05-08 21:36:29 +01:00
parent f7beb31d8f
commit 9d99a4ac23
22 changed files with 384 additions and 215 deletions

View file

@ -1,6 +1,6 @@
export const TIMEOUT_FOR_MERGING_HISTORY_ENTRIES_IN_SECONDS = 60;
export const DIFF_CACHE_SIZE_MB = 2;
export const MAX_LOG_MESSAGE_COUNT = 100000;
export const MAX_HISTORY_ENTRY_COUNT = 5000;
export const SUPPORTED_API_VERSION = 2;
export const WEBSOCKET_DISCONNECT_TIMEOUT_IN_S = 10;
export const SUPPORTED_API_VERSION = 3;
export const WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS = 10;
export const WEBSOCKET_CONNECTION_TIMEOUT_IN_SECONDS = 10;

View file

@ -0,0 +1,9 @@
export class FileAlreadyExistsError extends Error {
public constructor(
message: string,
public readonly filePath: string
) {
super(message);
this.name = "FileAlreadyExistsError";
}
}

View file

@ -0,0 +1,9 @@
export class HttpClientError extends Error {
public constructor(
public readonly statusCode: number,
message: string
) {
super(message);
this.name = "HttpClientError";
}
}

View file

@ -9,7 +9,7 @@ type ResolvedTuple<T extends readonly unknown[]> = {
export const awaitAll = async <T extends readonly unknown[]>(
promises: PromiseTuple<T>
): Promise<ResolvedTuple<T>> => {
// eslint-disable-next-line no-restricted-properties
// eslint-disable-next-line no-restricted-properties, @typescript-eslint/await-thenable
const result = await Promise.allSettled(promises);
for (const res of result) {
if (res.status === "rejected") {

View file

@ -1,5 +1,3 @@
import { v4 as uuidv4 } from "uuid";
export function createClientId(): string {
// @ts-expect-error, injected by webpack
const packageVersion = __CURRENT_VERSION__; // eslint-disable-line
@ -11,5 +9,5 @@ export function createClientId(): string {
? process.platform
: "unknown";
return `vault-link/${packageVersion} (${uuidv4()}; ${platform})`;
return `vault-link/${packageVersion} (${Math.round(Math.random() * 1e10)}; ${platform})`;
}

View file

@ -1,25 +0,0 @@
type ResolveFunction<T> = undefined extends T
? (value?: T) => unknown
: (value: T) => unknown;
/**
* A type-safe utility function to create a Promise with resolve and reject functions.
* @returns A tuple containing a Promise, a resolve function, and a reject function.
*/
export function createPromise<T = unknown>(): [
Promise<T>,
ResolveFunction<T>,
(error: unknown) => unknown
] {
let resolve: undefined | ResolveFunction<T> = undefined;
let reject: undefined | ((error: unknown) => unknown) = undefined;
const creationPromise = new Promise<T>(
(resolve_, reject_) =>
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
((resolve = resolve_ as ResolveFunction<T>), (reject = reject_))
);
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
return [creationPromise, resolve!, reject!];
}

View file

@ -40,9 +40,14 @@ export class EventListeners<TListener extends (...args: any[]) => any> {
* @param args The arguments to pass to each listener
*/
public trigger(...args: Parameters<TListener>): void {
this.listeners.forEach((listener) => {
const snapshot = this.listeners.slice();
for (const listener of snapshot) {
// allow removing listeners during the trigger loop
if (!this.listeners.includes(listener)) {
continue;
}
listener(...args);
});
}
}
/**
@ -53,16 +58,19 @@ export class EventListeners<TListener extends (...args: any[]) => any> {
* @param args The arguments to pass to each listener
*/
public async triggerAsync(...args: Parameters<TListener>): Promise<void> {
await awaitAll(
this.listeners
.map((listener) => {
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
return listener(...args);
})
.filter((result): result is Promise<unknown> => {
return result instanceof Promise;
})
);
const snapshot = this.listeners.slice();
const promises: Promise<unknown>[] = [];
for (const listener of snapshot) {
if (!this.listeners.includes(listener)) {
continue;
}
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const result = listener(...args);
if (result instanceof Promise) {
promises.push(result);
}
}
await awaitAll(promises);
}
public clear(): void {

View file

@ -1,6 +1,6 @@
// Implements an in-memory fixed-size cache for document contents,
import type { VaultUpdateId } from "../../persistence/database";
import type { VaultUpdateId } from "../../sync-operations/types";
// Doubly-linked list node for O(1) LRU operations
class LRUNode {

View file

@ -1,22 +1,24 @@
import { describe, it, beforeEach } from "node:test";
import assert from "node:assert";
import { Logger } from "../../tracing/logger";
import type { RelativePath } from "../../persistence/database";
import type { RelativePath } from "../../sync-operations/types";
import { Locks } from "./locks";
import { awaitAll } from "../await-all";
import { sleep } from "../sleep";
import { SyncResetError } from "../../services/sync-reset-error";
import { SyncResetError } from "../../errors/sync-reset-error";
describe("withLock", () => {
const testPath: RelativePath = "test/document/path";
const testPath2: RelativePath = "test/document/path2";
const testPath3: RelativePath = "test/document/path3";
const logger = new Logger();
// eslint-disable-next-line @typescript-eslint/init-declarations
let locks: Locks<RelativePath>;
beforeEach(() => {
locks = new Locks<RelativePath>(logger);
locks = new Locks<RelativePath>("locks-test", logger);
});
it("should execute function with single key lock", async () => {
@ -56,22 +58,32 @@ describe("withLock", () => {
it("should sort multiple keys to prevent deadlocks", async () => {
const executionOrder: string[] = [];
// Start two concurrent operations with keys in different orders
const promise1 = locks.withLock([testPath2, testPath], async () => {
await locks.waitForLock(testPath);
const promise = awaitAll([
locks.withLock([testPath2, testPath3, testPath], async () => {
executionOrder.push("operation1-start");
await sleep(50);
executionOrder.push("operation1-end");
return "result1";
});
}),
const promise2 = locks.withLock([testPath, testPath2], async () => {
locks.withLock([testPath3, testPath, testPath2], async () => {
executionOrder.push("operation2-start");
await sleep(50);
executionOrder.push("operation2-end");
return "result2";
});
})
]);
const [result1, result2] = await awaitAll([promise1, promise2]);
locks.unlock(testPath);
const [result1, result2] = await Promise.race([
promise,
new Promise<never>((_, reject) => {
setTimeout(() => {
reject(new Error("Deadlock detected"));
}, 1000);
})
]);
assert.strictEqual(result1, "result1");
assert.strictEqual(result2, "result2");
@ -234,13 +246,14 @@ describe("withLock", () => {
describe("reset", () => {
const testPath: RelativePath = "test/document/path";
const testPath2: RelativePath = "test/document/path2";
const logger = new Logger();
// eslint-disable-next-line @typescript-eslint/init-declarations
let locks: Locks<RelativePath>;
beforeEach(() => {
locks = new Locks<RelativePath>(logger);
locks = new Locks<RelativePath>("locks-test", logger);
});
it("should reject pending waiters with SyncResetError while running operation completes", async () => {
@ -289,4 +302,38 @@ describe("reset", () => {
const result = await locks.withLock(testPath, () => "success");
assert.strictEqual(result, "success");
});
it("should release partially acquired locks when reset interrupts multi-key acquisition", async () => {
// Hold testPath2 so multi-key acquisition will block on it
await locks.waitForLock(testPath2);
// Start multi-key lock that will acquire testPath first, then block on testPath2
const multiKeyPromise = locks.withLock(
[testPath, testPath2],
async () => "multi"
);
void multiKeyPromise.catch(() => {}); // eslint-disable-line @typescript-eslint/no-empty-function
// Wait for the multi-key operation to acquire testPath and start waiting on testPath2
await sleep(10);
// Reset should reject the waiting operation
locks.reset();
await assert.rejects(multiKeyPromise, (err: Error) => {
assert.ok(err instanceof SyncResetError);
return true;
});
// The key that was already acquired (testPath) should now be released
// This would hang/timeout if the lock was leaked
const result = await Promise.race([
locks.withLock(testPath, () => "success"),
sleep(100).then(() => {
throw new Error("Lock was not released - deadlock detected");
})
]);
assert.strictEqual(result, "success");
});
});

View file

@ -1,6 +1,5 @@
import { SyncResetError } from "../../services/sync-reset-error";
import { SyncResetError } from "../../errors/sync-reset-error";
import type { Logger } from "../../tracing/logger";
import { awaitAll } from "../await-all";
/**
* Manages exclusive locks on items to prevent concurrent modifications.
@ -8,17 +7,23 @@ import { awaitAll } from "../await-all";
*
* @template T The type of the key used for locking
*/
/** Waiter entry with callbacks */
interface WaiterEntry {
resolve: () => unknown;
reject: (err: unknown) => unknown;
}
export class Locks<T> {
/** Currently locked keys */
private readonly locked = new Set<T>();
/** Queue of resolve functions waiting for each key */
private readonly waiters = new Map<
T,
[() => unknown, (err: unknown) => unknown][]
>();
/** Queue of waiters for each key */
private readonly waiters = new Map<T, WaiterEntry[]>();
public constructor(private readonly logger?: Logger) {}
public constructor(
private readonly name: string,
private readonly logger?: Logger
) {}
/**
* Executes a function while holding exclusive locks on one or more keys.
@ -59,12 +64,17 @@ export class Locks<T> {
const uniqueKeys = Array.from(new Set(keys));
uniqueKeys.sort((a, b) => String(a).localeCompare(String(b))); // Ensure consistent order to prevent deadlocks
await awaitAll(uniqueKeys.map(async (key) => this.waitForLock(key)));
const lockedKeys = [];
try {
for (const key of uniqueKeys) {
// Must acquire locks in-order (not concurrently) to prevent deadlocks
await this.waitForLock(key);
lockedKeys.push(key);
}
return await fn();
} finally {
uniqueKeys.forEach((key) => {
lockedKeys.forEach((key) => {
this.unlock(key);
});
}
@ -74,7 +84,7 @@ export class Locks<T> {
// Resolve all waiting promises before clearing to prevent deadlock
// Any operation waiting for a lock will be granted access immediately
for (const waiting of this.waiters.values()) {
for (const [_, reject] of waiting) {
for (const { reject } of waiting) {
reject(new SyncResetError());
}
}
@ -82,6 +92,10 @@ export class Locks<T> {
this.waiters.clear();
}
public isLocked(key: T): boolean {
return this.locked.has(key);
}
/**
* Attempts to acquire a lock immediately without waiting.
* Must call `unlock()` if successful.
@ -111,7 +125,7 @@ export class Locks<T> {
return Promise.resolve();
}
this.logger?.debug(`Waiting for lock on ${key}`);
this.logger?.debug(`Waiting for lock '${this.name}' on '${key}'`);
return new Promise((resolve, reject) => {
// DefaultDict behavior
@ -121,7 +135,10 @@ export class Locks<T> {
this.waiters.set(key, waiting);
}
waiting.push([resolve, reject]);
waiting.push({
resolve,
reject
});
});
}
@ -134,15 +151,20 @@ export class Locks<T> {
*/
public unlock(key: T): void {
if (!this.locked.has(key)) {
this.logger?.debug(
`Attempted to unlock '${this.name}' on '${key}' which is not locked`
);
return;
}
// Remove first waiter to ensure FIFO order
const [resolveNextWaiting, _] = this.waiters.get(key)?.shift() ?? [];
this.logger?.debug(`Releasing lock '${this.name}' on '${key}'`);
if (resolveNextWaiting) {
this.logger?.debug(`Granted lock on ${key}`);
resolveNextWaiting();
// Remove first waiter to ensure FIFO order
const nextWaiter = this.waiters.get(key)?.shift();
if (nextWaiter) {
this.logger?.debug(`Granted lock '${this.name}' on '${key}'`);
nextWaiter.resolve();
} else {
this.locked.delete(key);
}
@ -152,8 +174,8 @@ export class Locks<T> {
export class Lock {
private readonly locks: Locks<boolean>;
public constructor(logger?: Logger) {
this.locks = new Locks(logger);
public constructor(name: string, logger?: Logger) {
this.locks = new Locks(name, logger);
}
public async withLock<R>(fn: () => R | Promise<R>): Promise<R> {

View file

@ -1,15 +1,15 @@
import { describe, it } from "node:test";
import assert from "node:assert";
import { CoveredValues } from "./min-covered";
import { MinCovered } from "./min-covered";
describe("CoveredValues", () => {
describe("MinCovered", () => {
it("should initialize with the given min value", () => {
const covered = new CoveredValues(5);
const covered = new MinCovered(5);
assert.strictEqual(covered.min, 5);
});
it("should add values greater than min", () => {
const covered = new CoveredValues(0);
const covered = new MinCovered(0);
covered.add(3);
assert.strictEqual(covered.min, 0);
covered.add(1);
@ -21,7 +21,7 @@ describe("CoveredValues", () => {
});
it("should ignore duplicate values", () => {
const covered = new CoveredValues(0);
const covered = new MinCovered(0);
covered.add(3);
covered.add(3);
covered.add(3);
@ -32,7 +32,7 @@ describe("CoveredValues", () => {
});
it("should handle multiple consecutive values", () => {
const covered = new CoveredValues(132);
const covered = new MinCovered(132);
for (let i = 250; i > 132; i--) {
assert.strictEqual(covered.min, 132);
covered.add(i);
@ -41,36 +41,32 @@ describe("CoveredValues", () => {
});
it("should handle adding values lower than current min", () => {
const covered = new CoveredValues(5);
const covered = new MinCovered(5);
covered.add(3);
assert.strictEqual(covered.min, 5);
covered.add(6);
assert.strictEqual(covered.min, 6);
});
it("should auto-advance when setting min value", () => {
const covered = new CoveredValues(5);
it("should auto-advance when adding the value that fills the next gap", () => {
const covered = new MinCovered(5);
covered.add(7);
covered.add(8);
covered.add(9);
assert.strictEqual(covered.min, 5);
// Setting min to 6 should auto-advance through 7, 8, 9
covered.min = 6;
// Adding 6 fills the gap and auto-advances through 7, 8, 9
covered.add(6);
assert.strictEqual(covered.min, 9);
covered.add(10);
assert.strictEqual(covered.min, 10);
});
it("should handle setting min value with no consecutive values", () => {
const covered = new CoveredValues(5);
covered.add(10);
covered.add(15);
assert.strictEqual(covered.min, 5);
// Setting min to 8 should not auto-advance (no consecutive values)
covered.min = 8;
assert.strictEqual(covered.min, 8);
// Add 9 to trigger auto-advance to 10
covered.add(9);
assert.strictEqual(covered.min, 10);
it("should rewind when reset is called explicitly", () => {
const covered = new MinCovered(5);
covered.add(7);
covered.reset(3);
assert.strictEqual(covered.min, 3);
covered.add(4);
assert.strictEqual(covered.min, 4);
});
});

View file

@ -7,13 +7,13 @@
*
* @example
* ```typescript
* const covered = new CoveredValues(0);
* const covered = new MinCovered(0);
* covered.add(2); // seenValues = [2], min = 0
* covered.add(1); // seenValues = [], min = 2
* covered.min; // returns 2
* ```
*/
export class CoveredValues {
export class MinCovered {
private seenValues: number[] = [];
public constructor(private minValue: number) {}
@ -22,12 +22,6 @@ export class CoveredValues {
return this.minValue;
}
public set min(value: number) {
this.minValue = Math.max(value, this.minValue);
this.seenValues = this.seenValues.filter((v) => v > this.minValue);
this.advanceMinWhilePossible();
}
public add(value: number | undefined): void {
if (value === undefined || value < this.minValue) {
return;
@ -49,6 +43,11 @@ export class CoveredValues {
this.advanceMinWhilePossible();
}
public reset(minValue?: number): void {
this.minValue = minValue ?? 0;
this.seenValues = [];
}
private advanceMinWhilePossible(): void {
while (
this.seenValues.length > 0 &&

View file

@ -0,0 +1,69 @@
import type { RelativePath } from "../../sync-operations/types";
import type { TextWithCursors } from "reconcile-text";
import type { FileSystemOperations } from "../../file-operations/filesystem-operations";
export class InMemoryFileSystem implements FileSystemOperations {
protected readonly files = new Map<string, Uint8Array>();
public async listFilesRecursively(
_root: RelativePath | undefined = undefined // we don't use multi-level paths during tests
): Promise<RelativePath[]> {
return Array.from(this.files.keys());
}
public async read(path: RelativePath): Promise<Uint8Array> {
const file = this.files.get(path);
if (!file) {
throw new Error(`File ${path} does not exist`);
}
return file;
}
public async write(path: RelativePath, content: Uint8Array): Promise<void> {
this.files.set(path, content);
}
public async atomicUpdateText(
path: RelativePath,
updater: (current: TextWithCursors) => TextWithCursors
): Promise<string> {
const file = this.files.get(path);
if (!file) {
throw new Error(`File ${path} does not exist`);
}
const currentContent = new TextDecoder().decode(file);
const newContent = updater({ text: currentContent, cursors: [] }).text;
this.files.set(path, new TextEncoder().encode(newContent));
return newContent;
}
public async getFileSize(path: RelativePath): Promise<number> {
return (await this.read(path)).length;
}
public async exists(path: RelativePath): Promise<boolean> {
return this.files.has(path);
}
public async createDirectory(_path: RelativePath): Promise<void> {
// This doesn't mean anything in our virtual FS representation
}
public async delete(path: RelativePath): Promise<void> {
this.files.delete(path);
}
public async rename(
oldPath: RelativePath,
newPath: RelativePath
): Promise<void> {
const file = this.files.get(oldPath);
if (!file) {
throw new Error(`File ${oldPath} does not exist`);
}
this.files.set(newPath, file);
if (oldPath !== newPath) {
this.files.delete(oldPath);
}
}
}

View file

@ -1,10 +1,44 @@
import type { SyncClient } from "../../sync-client";
import type { LogLine } from "../../tracing/logger";
/* eslint-disable no-console */
import type { Logger, LogLine } from "../../tracing/logger";
import { LogLevel } from "../../tracing/logger";
export function logToConsole(client: SyncClient): void {
client.logger.onLogEmitted.add((logLine: LogLine) => {
const formatted = `${logLine.timestamp.toISOString()} ${logLine.level} ${logLine.message}`;
const COLORS = {
reset: "\x1b[0m",
red: "\x1b[31m",
yellow: "\x1b[33m",
blue: "\x1b[34m",
gray: "\x1b[90m"
};
export function logToConsole(
logger: Logger,
{ useColors = true }: { useColors?: boolean } = {}
): void {
logger.onLogEmitted.add((logLine: LogLine) => {
const timestamp = logLine.timestamp.toISOString();
const { message } = logLine;
let color = "";
let reset = "";
if (useColors) {
({ reset } = COLORS);
switch (logLine.level) {
case LogLevel.ERROR:
color = COLORS.red;
break;
case LogLevel.WARNING:
color = COLORS.yellow;
break;
case LogLevel.INFO:
color = COLORS.blue;
break;
case LogLevel.DEBUG:
color = COLORS.gray;
break;
}
}
const formatted = `${timestamp} ${color}${logLine.level}${reset} ${message}`;
switch (logLine.level) {
case LogLevel.ERROR:

View file

@ -11,7 +11,7 @@ export function slowWebSocketFactory(
private static readonly RECEIVE_KEY = "websocket-receive";
private static readonly SEND_KEY = "websocket-send";
private readonly locks = new Locks(logger);
private readonly locks = new Locks(FlakyWebSocket.name, logger);
public set onopen(callback: ((event: Event) => void) | null) {
super.onopen = async (event: Event): Promise<void> => {

View file

@ -1,14 +1,17 @@
import type { DocumentRecord } from "../persistence/database";
import type { DocumentRecord } from "../sync-operations/types";
import { EMPTY_HASH } from "./hash";
// TODO: make this smarter so that offline files can be renamed & edited at the same time
export function findMatchingFile(
export async function findMatchingFile(
contentHash: string,
candidates: DocumentRecord[]
): DocumentRecord | undefined {
if (contentHash === EMPTY_HASH) {
): Promise<DocumentRecord | undefined> {
if (contentHash === (await EMPTY_HASH)) {
return undefined;
}
return candidates.find(({ metadata }) => metadata?.hash === contentHash);
return candidates.find(
(record) =>
record.remoteHash !== undefined && record.remoteHash === contentHash
);
}

View file

@ -1,12 +1,14 @@
// https://stackoverflow.com/questions/7616461/generate-a-hash-from-string-in-javascript
export function hash(content: Uint8Array): string {
let result = 0;
// eslint-disable-next-line @typescript-eslint/prefer-for-of
for (let i = 0; i < content.length; i++) {
result = (result << 5) - result + content[i];
result |= 0; // Convert to 32bit integer
}
return Math.abs(result).toString(16).padStart(8, "0");
export async function hash(content: Uint8Array): Promise<string> {
// Re-wrap into a fresh Uint8Array<ArrayBuffer> so SubtleCrypto's
// BufferSource overload accepts it without an unsafe type assertion.
// The lib types require an ArrayBuffer-backed view; the source may
// be backed by SharedArrayBuffer in some runtimes.
const buffer = new ArrayBuffer(content.byteLength);
new Uint8Array(buffer).set(content);
const digest = await crypto.subtle.digest("SHA-256", buffer);
const bytes = new Uint8Array(digest);
return Array.from(bytes, (b) => b.toString(16).padStart(2, "0")).join("");
}
export const EMPTY_HASH = hash(new Uint8Array(0));
// SHA-256 of empty content, computed once at import time
export const EMPTY_HASH: Promise<string> = hash(new Uint8Array());

View file

@ -1,4 +1,4 @@
import { createPromise } from "./create-promise";
import { awaitAll } from "./await-all";
import { sleep } from "./sleep";
/**
@ -45,18 +45,16 @@ export function rateLimit<
newArgs = undefined;
}
const [promise, resolve] = createPromise();
running = promise;
sleep(
// `running` must signal both "minimum interval has elapsed" *and*
// "fn() has finished" — otherwise an `fn` that takes longer than
// the interval would let a queued waiter fire a concurrent `fn`
const interval =
typeof minIntervalMs === "function"
? minIntervalMs()
: minIntervalMs
)
.then(resolve)
.catch(() => {
// sleep cannot fail
});
return fn(...args);
: minIntervalMs;
const fnPromise = fn(...args);
running = awaitAll([fnPromise.catch(() => undefined), sleep(interval)]);
return fnPromise;
};
return decoratedFn;