This commit is contained in:
Andras Schmelczer 2026-04-06 13:01:47 +01:00
parent 0e3e5a99cd
commit d034ad5cb3
50 changed files with 6515 additions and 1492 deletions

View file

@ -1,9 +1,6 @@
import { describe, it } from "node:test";
import type {
Database,
DocumentRecord,
RelativePath
} from "../persistence/database";
import type { DocumentId, DocumentRecord, RelativePath } from "../sync-operations/types";
import type { SyncEventQueue } from "../sync-operations/sync-event-queue";
import { FileOperations } from "./file-operations";
import { Logger } from "../tracing/logger";
import { assertSetContainsExactly } from "../utils/assert-set-contains-exactly";
@ -21,19 +18,18 @@ class MockServerConfig implements Pick<ServerConfig, "getConfig"> {
}
}
class MockDatabase implements Partial<Database> {
public getLatestDocumentByRelativePath(
_target: RelativePath
class MockQueue implements Pick<SyncEventQueue, "getDocument" | "moveDocument"> {
public getDocument(
_path: RelativePath
): DocumentRecord | undefined {
// no-op
return undefined;
}
public move(
_oldRelativePath: RelativePath,
_newRelativePath: RelativePath
): void {
// no-op
public moveDocument(
_oldPath: RelativePath,
_newPath: RelativePath
): DocumentId | undefined {
return undefined;
}
}
@ -89,7 +85,7 @@ describe("File operations", () => {
const fileSystemOperations = new FakeFileSystemOperations();
const fileOperations = new FileOperations(
new Logger(),
new MockDatabase() as Database, // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
new MockQueue() as SyncEventQueue, // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
fileSystemOperations,
new MockServerConfig() as ServerConfig // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
);
@ -119,7 +115,7 @@ describe("File operations", () => {
const fileSystemOperations = new FakeFileSystemOperations();
const fileOperations = new FileOperations(
new Logger(),
new MockDatabase() as Database, // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
new MockQueue() as SyncEventQueue, // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
fileSystemOperations,
new MockServerConfig() as ServerConfig // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
);
@ -159,7 +155,7 @@ describe("File operations", () => {
const fileSystemOperations = new FakeFileSystemOperations();
const fileOperations = new FileOperations(
new Logger(),
new MockDatabase() as Database, // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
new MockQueue() as SyncEventQueue, // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
fileSystemOperations,
new MockServerConfig() as ServerConfig // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
);
@ -178,7 +174,7 @@ describe("File operations", () => {
const fileSystemOperations = new FakeFileSystemOperations();
const fileOperations = new FileOperations(
new Logger(),
new MockDatabase() as Database, // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
new MockQueue() as SyncEventQueue, // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
fileSystemOperations,
new MockServerConfig() as ServerConfig // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
);
@ -207,7 +203,7 @@ describe("File operations", () => {
const fileSystemOperations = new FakeFileSystemOperations();
const fileOperations = new FileOperations(
new Logger(),
new MockDatabase() as Database, // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
new MockQueue() as SyncEventQueue, // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
fileSystemOperations,
new MockServerConfig() as ServerConfig // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
);

View file

@ -1,6 +1,7 @@
import type { Logger } from "../tracing/logger";
import type { FileSystemOperations } from "./filesystem-operations";
import type { Database, RelativePath } from "../persistence/database";
import type { RelativePath } from "../sync-operations/types";
import type { SyncEventQueue } from "../sync-operations/sync-event-queue";
import { SafeFileSystemOperations } from "./safe-filesystem-operations";
import type { TextWithCursors } from "reconcile-text";
import { reconcile } from "reconcile-text";
@ -14,7 +15,7 @@ export class FileOperations {
public constructor(
private readonly logger: Logger,
private readonly database: Database,
private readonly queue: SyncEventQueue,
fs: FileSystemOperations,
private readonly serverConfig: ServerConfig,
private readonly nativeLineEndings = "\n"
@ -58,7 +59,10 @@ export class FileOperations {
return this.fs.write(path, this.toNativeLineEndings(newContent));
}
public async ensureClearPath(path: RelativePath): Promise<void> {
// Returns the deconflicted path if a file was moved, undefined otherwise
public async ensureClearPath(
path: RelativePath
): Promise<RelativePath | undefined> {
if (await this.fs.exists(path)) {
const deconflictedPath = await this.deconflictPath(path);
try {
@ -66,14 +70,16 @@ export class FileOperations {
`Didn't expect ${path} to exist, deconflicting by moving it to '${deconflictedPath}'`
);
this.database.move(path, deconflictedPath);
this.queue.moveDocument(path, deconflictedPath);
await this.fs.rename(path, deconflictedPath, true);
return deconflictedPath;
} finally {
this.fs.unlock(deconflictedPath);
}
} else {
await this.createParentDirectories(path);
}
return undefined;
}
/**
@ -160,21 +166,24 @@ export class FileOperations {
return this.fs.exists(path);
}
// Returns the deconflicted path if a file at the target was displaced
public async move(
oldPath: RelativePath,
newPath: RelativePath
): Promise<void> {
): Promise<RelativePath | undefined> {
if (oldPath === newPath) {
return;
return undefined;
}
await this.ensureClearPath(newPath);
this.database.move(oldPath, newPath);
const deconflictedPath = await this.ensureClearPath(newPath);
this.queue.moveDocument(oldPath, newPath);
await this.fs.rename(oldPath, newPath);
await this.deletingEmptyParentDirectoriesOfDeletedFile(oldPath);
return deconflictedPath;
}
public reset(): void {
this.fs.reset();
}
@ -274,17 +283,15 @@ export class FileOperations {
newName = `${directory}${stem} (${currentCount})${extension}`;
// Avoid multiple deconflictPath calls returning the same path
if (this.fs.tryLock(newName)) {
const newDocument =
this.database.getLatestDocumentByRelativePath(newName);
if (
newDocument?.isDeleted === false || // the document might have been confirmed by the server at a new path but haven't yet moved there locally
(await this.fs.exists(newName, true))
) {
this.fs.unlock(newName);
} else {
return newName;
}
await this.fs.waitForLock(newName);
const existingRecord = this.queue.getDocument(newName);
if (
existingRecord !== undefined || // the document might have been confirmed by the server at a new path but haven't yet moved there locally
(await this.fs.exists(newName, true))
) {
this.fs.unlock(newName);
} else {
return newName;
}
}
}

View file

@ -1,301 +1,2 @@
import type { Logger } from "../tracing/logger";
import { EMPTY_HASH } from "../utils/hash";
import { CoveredValues } from "../utils/data-structures/min-covered";
import { awaitAll } from "../utils/await-all";
import { removeFromArray } from "../utils/remove-from-array";
export type VaultUpdateId = number;
export type DocumentId = string;
export type RelativePath = string;
export interface DocumentMetadata {
documentId: DocumentId;
parentVersionId: VaultUpdateId;
hash: string;
remoteRelativePath?: RelativePath;
}
export interface StoredDocumentMetadata {
relativePath: RelativePath;
documentId: DocumentId;
parentVersionId: VaultUpdateId;
remoteRelativePath?: RelativePath;
hash: string;
}
export interface StoredDatabase {
documents: StoredDocumentMetadata[];
lastSeenUpdateId: VaultUpdateId | undefined;
}
/**
* Represents a document in the database.
*
* It is mutable and its content should always represent the latest
* state of the document on disk based on the update events we have seen.
*/
export interface DocumentRecord {
relativePath: RelativePath;
metadata: DocumentMetadata | undefined;
isDeleted: boolean;
parallelVersion: number;
}
export class Database {
private documents: DocumentRecord[];
private lastSeenUpdateIds: CoveredValues;
public constructor(
private readonly logger: Logger,
initialState: Partial<StoredDatabase> | undefined,
private readonly saveData: (data: StoredDatabase) => Promise<void>
) {
initialState ??= {};
this.documents =
initialState.documents?.map(({ relativePath, ...metadata }) => ({
relativePath,
metadata,
isDeleted: false,
parallelVersion: 0
})) ?? [];
this.ensureConsistency();
this.logger.debug(`Loaded ${this.documents.length} documents`);
const { lastSeenUpdateId } = initialState;
this.logger.debug(`Loaded last seen update id: ${lastSeenUpdateId}`);
this.lastSeenUpdateIds = new CoveredValues(
Math.max(0, lastSeenUpdateId ?? 0) // the first updateId will be 1 which is the first integer after -1
);
this.documents.forEach((doc) => {
this.lastSeenUpdateIds.add(doc.metadata?.parentVersionId);
});
}
public get length(): number {
return this.documents.length;
}
public get resolvedDocuments(): DocumentRecord[] {
const paths = new Map<string, DocumentRecord[]>();
this.documents
// eslint-disable-next-line no-restricted-syntax -- Type narrowing, not removing a specific item
.filter(({ metadata }) => metadata !== undefined)
.forEach((record) =>
paths.set(record.relativePath, [
record,
...(paths.get(record.relativePath) ?? [])
])
);
return Array.from(paths.values()).map((records) => {
records.sort(
(a, b) => b.parallelVersion - a.parallelVersion // descending
);
if (
records.length > 1 &&
records.some((current, i) =>
i === 0
? false
: records[i - 1].parallelVersion ===
current.parallelVersion
)
) {
throw new Error(
`Multiple documents with the same parallel version and path at ${records[0].relativePath}`
);
}
return records[0];
});
}
public updateDocumentMetadata(
metadata: {
documentId: DocumentId;
parentVersionId: VaultUpdateId;
hash: string;
remoteRelativePath: RelativePath;
},
target: DocumentRecord
): void {
if (!this.documents.includes(target)) {
throw new Error("Document not found in database");
}
this.logger.debug(
`Updating document metadata for ${target.relativePath} from ${JSON.stringify(
target.metadata,
null,
2
)} to ${JSON.stringify(metadata, null, 2)}`
);
target.metadata = metadata;
this.saveInTheBackground();
}
public getLatestDocumentByRelativePath(
target: RelativePath
): DocumentRecord | undefined {
const candidates = this.documents.filter(
({ relativePath }) => relativePath === target
);
candidates.sort((a, b) => b.parallelVersion - a.parallelVersion); // descending
return candidates[0];
}
public createNewPendingDocument(
relativePath: RelativePath
): DocumentRecord {
this.logger.debug(`Creating new pending document: ${relativePath}`);
const previousEntry =
this.getLatestDocumentByRelativePath(relativePath);
const entry = {
relativePath,
metadata: undefined,
isDeleted: false,
parallelVersion:
previousEntry?.parallelVersion === undefined
? 0
: previousEntry.parallelVersion + 1
};
this.documents.push(entry);
// no need to save as we only save documents which have metadata
return entry;
}
public getDocumentByDocumentId(
target: DocumentId
): DocumentRecord | undefined {
return this.documents.find(
({ metadata }) => metadata?.documentId === target
);
}
public move(
oldRelativePath: RelativePath,
newRelativePath: RelativePath
): void {
const oldDocument =
this.getLatestDocumentByRelativePath(oldRelativePath);
if (oldDocument === undefined) {
return;
}
const newDocument =
this.getLatestDocumentByRelativePath(newRelativePath);
if (newDocument?.isDeleted === false) {
throw new Error(
`Document already exists at new location: ${newRelativePath}`
);
}
oldDocument.relativePath = newRelativePath;
// We might be in a strange state where the target of the move has just got deleted,
// however, its metadata might already have a bunch of updates queued up for
// the document at the new location. We need to keep these updates.
oldDocument.parallelVersion =
newDocument !== undefined ? newDocument.parallelVersion + 1 : 0;
this.saveInTheBackground();
}
public delete(relativePath: RelativePath): void {
const candidate = this.getLatestDocumentByRelativePath(relativePath);
if (candidate === undefined) {
return;
}
candidate.isDeleted = true;
}
public removeDocument(target: DocumentRecord): void {
removeFromArray(this.documents, target);
this.saveInTheBackground();
}
public getLastSeenUpdateId(): VaultUpdateId {
return this.lastSeenUpdateIds.min;
}
public addSeenUpdateId(value: number): void {
const previousMin = this.lastSeenUpdateIds.min;
this.lastSeenUpdateIds.add(value);
if (previousMin !== this.lastSeenUpdateIds.min) {
this.saveInTheBackground();
}
}
public setLastSeenUpdateId(value: number): void {
this.lastSeenUpdateIds.min = value;
this.saveInTheBackground();
}
public reset(): void {
this.documents = [];
this.lastSeenUpdateIds = new CoveredValues(
0 // the first updateId will be 1 which is the first integer after -1
);
this.saveInTheBackground();
}
public async save(): Promise<void> {
return this.saveData({
documents: this.resolvedDocuments.map(
({ relativePath, metadata }) => ({
relativePath,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
...metadata! // `resolvedDocuments` only returns docs with metadata set
})
),
lastSeenUpdateId: this.lastSeenUpdateIds.min
});
}
private ensureConsistency(): void {
const idToPath = new Map<string, string[]>();
this.resolvedDocuments.forEach(({ relativePath, metadata }) => {
if (metadata === undefined) {
return;
}
idToPath.set(metadata.documentId, [
...(idToPath.get(metadata.documentId) ?? []),
relativePath
]);
});
const duplicates = Array.from(idToPath.entries())
.filter(([_, paths]) => paths.length > 1)
.map(([id, paths]) => {
let details = "";
for (const path of paths) {
const doc = this.getLatestDocumentByRelativePath(path);
details += `\n- ${JSON.stringify(doc, null, 2)}`;
}
return `${id} (${paths.join(", ")}): ${details}`;
});
if (duplicates.length > 0) {
throw new Error(
"Document IDs are not unique, found duplicates: " +
duplicates.join("; ")
);
}
}
private saveInTheBackground(): void {
this.ensureConsistency();
void this.save().catch((error: unknown) => {
this.logger.error(`Error saving data: ${error}`);
});
}
}
// This file is intentionally empty
// All document tracking has been moved to sync-event-queue.ts

View file

@ -2,13 +2,14 @@ import type {
DocumentId,
RelativePath,
VaultUpdateId
} from "../persistence/database";
} from "../sync-operations/types";
import type { Logger } from "../tracing/logger";
import type { Settings } from "../persistence/settings";
import type { FetchController } from "./fetch-controller";
import { sleep } from "../utils/sleep";
import { SyncResetError } from "../errors/sync-reset-error";
import { HttpClientError } from "../errors/http-client-error";
import type { SerializedError } from "./types/SerializedError";
import type { DocumentVersionWithoutContent } from "./types/DocumentVersionWithoutContent";
import type { DocumentUpdateResponse } from "./types/DocumentUpdateResponse";
@ -139,13 +140,7 @@ export class SyncService {
}
);
if (!response.ok) {
throw new Error(
`Failed to update document: ${await SyncService.errorFromResponse(
response
)}`
);
}
await SyncService.throwIfNotOk(response, "update document");
const result: DocumentUpdateResponse =
(await response.json()) as DocumentUpdateResponse; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
@ -192,13 +187,7 @@ export class SyncService {
}
);
if (!response.ok) {
throw new Error(
`Failed to update document: ${await SyncService.errorFromResponse(
response
)}`
);
}
await SyncService.throwIfNotOk(response, "update document");
const result: DocumentUpdateResponse =
(await response.json()) as DocumentUpdateResponse; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
@ -413,8 +402,10 @@ export class SyncService {
try {
return await fn();
} catch (e) {
// We must not retry errors coming from reset
if (e instanceof SyncResetError) {
if (
e instanceof SyncResetError ||
e instanceof HttpClientError
) {
throw e;
}
@ -427,4 +418,16 @@ export class SyncService {
}
}
}
private static async throwIfNotOk(
response: Response,
operation: string
): Promise<void> {
if (response.ok) return;
const message = `Failed to ${operation}: ${await SyncService.errorFromResponse(response)}`;
if (response.status >= 400 && response.status < 500) {
throw new HttpClientError(response.status, message);
}
throw new Error(message);
}
}

View file

@ -0,0 +1,7 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { VaultInfo } from "./VaultInfo";
/**
* Response to listing vaults accessible to the authenticated user.
*/
export interface ListVaultsResponse { vaults: VaultInfo[], hasMore: boolean, userName: string, }

View file

@ -0,0 +1,6 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
/**
* Summary of a single vault returned by the list-vaults endpoint.
*/
export interface VaultInfo { name: string, documentCount: number, createdAt: string | null, }

View file

@ -4,7 +4,6 @@ import type { WebSocketServerMessage } from "./types/WebSocketServerMessage";
import type { WebSocketClientMessage } from "./types/WebSocketClientMessage";
import type { CursorPositionFromClient } from "./types/CursorPositionFromClient";
import type { ClientCursors } from "./types/ClientCursors";
import { createPromise } from "../utils/create-promise";
import type { WebSocketVaultUpdate } from "./types/WebSocketVaultUpdate";
import {
WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS,
@ -42,6 +41,10 @@ export class WebSocketManager {
private readonly webSocketFactoryImplementation: typeof globalThis.WebSocket = WebSocket
) {}
public get hasOutstandingWork(): boolean {
return this.outstandingPromises.length > 0;
}
public get isWebSocketConnected(): boolean {
return (
this.webSocket?.readyState ===
@ -55,7 +58,7 @@ export class WebSocketManager {
}
public async stop(): Promise<void> {
const [promise, resolve] = createPromise();
const { promise, resolve } = Promise.withResolvers<void>();
this.resolveDisconnectingPromise = resolve;
this.isStopped = true;

View file

@ -2,8 +2,8 @@ import type { PersistenceProvider } from "./persistence/persistence";
import type { HistoryEntry, HistoryStats } from "./tracing/sync-history";
import { SyncHistory } from "./tracing/sync-history";
import { Logger, LogLevel, LogLine } from "./tracing/logger";
import type { RelativePath, StoredDatabase } from "./persistence/database";
import { Database } from "./persistence/database";
import type { RelativePath, StoredSyncState } from "./sync-operations/types";
import { SyncEventQueue } from "./sync-operations/sync-event-queue";
import * as Sentry from "@sentry/browser";
import type { SyncSettings } from "./persistence/settings";
import { DEFAULT_SETTINGS, Settings } from "./persistence/settings";
@ -12,7 +12,6 @@ import { Syncer } from "./sync-operations/syncer";
import type { FileSystemOperations } from "./file-operations/filesystem-operations";
import { FileOperations } from "./file-operations/file-operations";
import { FetchController } from "./services/fetch-controller";
import { UnrestrictedSyncer } from "./sync-operations/unrestricted-syncer";
import { rateLimit } from "./utils/rate-limit";
import type { NetworkConnectionStatus } from "./types/network-connection-status";
import { DocumentSyncStatus } from "./types/document-sync-status";
@ -40,7 +39,7 @@ export class SyncClient {
public readonly logger: Logger,
private readonly history: SyncHistory,
private readonly settings: Settings,
private readonly database: Database,
private readonly syncEventQueue: SyncEventQueue,
private readonly syncer: Syncer,
private readonly webSocketManager: WebSocketManager,
private readonly fetchController: FetchController,
@ -52,13 +51,13 @@ export class SyncClient {
private readonly persistence: PersistenceProvider<
Partial<{
settings: Partial<SyncSettings>;
database: Partial<StoredDatabase>;
database: Partial<StoredSyncState>;
}>
>
) { }
public get documentCount(): number {
return this.database.length;
return this.syncEventQueue.documentCount;
}
public get isWebSocketConnected(): boolean {
@ -111,7 +110,7 @@ export class SyncClient {
persistence: PersistenceProvider<
Partial<{
settings: Partial<SyncSettings>;
database: Partial<StoredDatabase>;
database: Partial<StoredSyncState>;
}>
>;
fetch?: typeof globalThis.fetch;
@ -136,8 +135,6 @@ export class SyncClient {
state.settings,
async (data): Promise<void> => {
state = { ...state, settings: data };
// we're not rate-limiting settings saves as (1) we need to initialise the settings to know the rate limit
// and (2) settings changes are infrequent enough that rate-limiting is not necessary
await persistence.save(state);
}
);
@ -147,7 +144,8 @@ export class SyncClient {
() => settings.getSettings().minimumSaveIntervalMs
);
const database = new Database(
const syncEventQueue = new SyncEventQueue(
settings,
logger,
state.database,
async (data): Promise<void> => {
@ -173,7 +171,7 @@ export class SyncClient {
const fileOperations = new FileOperations(
logger,
database,
syncEventQueue,
fs,
serverConfig,
nativeLineEndings
@ -182,16 +180,6 @@ export class SyncClient {
const contentCache = new FixedSizeDocumentCache(
1024 * 1024 * DIFF_CACHE_SIZE_MB
);
const unrestrictedSyncer = new UnrestrictedSyncer(
logger,
database,
settings,
syncService,
fileOperations,
history,
contentCache,
serverConfig
);
const webSocketManager = new WebSocketManager(
logger,
@ -202,17 +190,20 @@ export class SyncClient {
const syncer = new Syncer(
deviceId,
logger,
database,
settings,
webSocketManager,
fileOperations,
unrestrictedSyncer
syncService,
history,
contentCache,
serverConfig,
syncEventQueue
);
const fileChangeNotifier = new FileChangeNotifier();
const cursorTracker = new CursorTracker(
logger,
database,
syncEventQueue,
webSocketManager,
fileOperations,
fileChangeNotifier
@ -221,7 +212,7 @@ export class SyncClient {
logger,
history,
settings,
database,
syncEventQueue,
syncer,
webSocketManager,
fetchController,
@ -319,7 +310,7 @@ export class SyncClient {
/**
* Wait for the in-flight operations to finish, reset all tracking,
* and the local database but retain the settings.
* and the local state but retain the settings.
* The SyncClient can be used again after calling this method.
*/
public async reset(): Promise<void> {
@ -330,10 +321,9 @@ export class SyncClient {
);
await this.pause();
// clear all local state
this.logger.info("Resetting SyncClient's local state");
this.database.reset();
await this.database.save(); // ensure the new database reads as empty
this.syncEventQueue.resetState();
await this.syncEventQueue.save();
this.resetInMemoryState();
this.hasFinishedOfflineSync = false;
this.serverConfig.reset();
@ -362,40 +352,47 @@ export class SyncClient {
await this.settings.setSettings(value);
}
public async syncLocallyCreatedFile(
public syncLocallyCreatedFile(
relativePath: RelativePath
): Promise<void> {
): void {
this.checkIfDestroyed("syncLocallyCreatedFile");
this.fileChangeNotifier.notifyOfFileChange(relativePath);
return this.syncer.syncLocallyCreatedFile(relativePath);
this.syncer.syncLocallyCreatedFile(relativePath);
}
public async syncLocallyDeletedFile(
public syncLocallyDeletedFile(
relativePath: RelativePath
): Promise<void> {
): void {
this.checkIfDestroyed("syncLocallyDeletedFile");
this.fileChangeNotifier.notifyOfFileChange(relativePath);
return this.syncer.syncLocallyDeletedFile(relativePath);
this.syncer.syncLocallyDeletedFile(relativePath);
}
public async syncLocallyUpdatedFile({
public syncLocallyUpdatedFile({
oldPath,
relativePath
}: {
oldPath?: RelativePath;
relativePath: RelativePath;
}): Promise<void> {
}): void {
this.checkIfDestroyed("syncLocallyUpdatedFile");
this.fileChangeNotifier.notifyOfFileChange(relativePath);
return this.syncer.syncLocallyUpdatedFile({
this.syncer.syncLocallyUpdatedFile({
oldPath,
relativePath
});
}
public get hasPendingWork(): boolean {
return (
this.syncEventQueue.size > 0 ||
this.webSocketManager.hasOutstandingWork
);
}
public getDocumentSyncingStatus(
relativePath: RelativePath
): DocumentSyncStatus {
@ -426,7 +423,7 @@ export class SyncClient {
this.checkIfDestroyed("waitUntilIdle");
await this.syncer.waitUntilFinished();
await this.webSocketManager.waitUntilFinished();
await this.database.save(); // flush all changes to disk
await this.syncEventQueue.save();
}
/**
@ -436,7 +433,6 @@ export class SyncClient {
public async destroy(): Promise<void> {
this.checkIfDestroyed("destroy");
// Prevent concurrent destroy calls
if (this.isDestroying) {
this.logger.warn(
"destroy() called while already destroying, ignoring"
@ -445,14 +441,12 @@ export class SyncClient {
}
this.isDestroying = true;
// cancel everything that's in progress
await this.pause();
this.hasBeenDestroyed = true;
this.resetInMemoryState();
// Clean up event listeners to prevent memory leaks
this.eventUnsubscribers.forEach((unsubscribe) => {
unsubscribe();
});
@ -467,7 +461,6 @@ export class SyncClient {
this.checkIfDestroyed("startSyncing");
this.fetchController.finishReset();
// warm the cache
await this.serverConfig.getConfig();
await this.syncer.scheduleSyncForOfflineChanges();
@ -486,7 +479,6 @@ export class SyncClient {
private resetInMemoryState(): void {
this.history.reset();
this.contentCache.reset();
// don't reset the logger
this.cursorTracker.reset();
this.syncer.reset();
this.fileOperations.reset();

View file

@ -1,5 +1,6 @@
import type { FileOperations } from "../file-operations/file-operations";
import type { Database, RelativePath } from "../persistence/database";
import type { RelativePath } from "./types";
import type { SyncEventQueue } from "./sync-event-queue";
import type { ClientCursors } from "../services/types/ClientCursors";
import type { CursorSpan } from "../services/types/CursorSpan";
import type { DocumentWithCursors } from "../services/types/DocumentWithCursors";
@ -35,7 +36,7 @@ export class CursorTracker {
public constructor(
private readonly logger: Logger,
private readonly database: Database,
private readonly queue: SyncEventQueue,
private readonly webSocketManager: WebSocketManager,
private readonly fileOperations: FileOperations,
private readonly fileChangeNotifier: FileChangeNotifier
@ -104,21 +105,16 @@ export class CursorTracker {
for (const [relativePath, cursors] of Object.entries(
documentToCursors
)) {
const record =
this.database.getLatestDocumentByRelativePath(relativePath);
const record = this.queue.getDocument(relativePath);
if (!record) {
continue; // Let's wait for the file to be created before sending cursors
}
if (!record.metadata) {
continue; // this is a new document, no need to sync the cursors
}
documentsWithCursors.push({
relative_path: relativePath,
document_id: record.metadata.documentId,
vault_update_id: record.metadata.parentVersionId,
document_id: record.documentId,
vault_update_id: record.parentVersionId,
cursors: cursors.map(({ start, end }) => ({
start: Math.min(start, end),
end: Math.max(start, end)
@ -139,10 +135,8 @@ export class CursorTracker {
const readContent = await this.fileOperations.read(
doc.relative_path
);
const record = this.database.getLatestDocumentByRelativePath(
doc.relative_path
);
if (record?.metadata?.hash !== (await hash(readContent))) {
const record = this.queue.getDocument(doc.relative_path);
if (record?.hash !== (await hash(readContent))) {
doc.vault_update_id = null;
}
}
@ -227,9 +221,7 @@ export class CursorTracker {
private async getDocumentUpToDateness(
document: DocumentWithCursors
): Promise<DocumentUpToDateness> {
const record = this.database.getLatestDocumentByRelativePath(
document.relative_path
);
const record = this.queue.getDocument(document.relative_path);
if (!record) {
// the document of the cursor must be from the future
@ -237,13 +229,11 @@ export class CursorTracker {
}
if (
(record.metadata?.parentVersionId ?? 0) <
(document.vault_update_id ?? 0)
record.parentVersionId < (document.vault_update_id ?? 0)
) {
return DocumentUpToDateness.Later;
} else if (
(document.vault_update_id ?? 0) <
(record.metadata?.parentVersionId ?? 0)
(document.vault_update_id ?? 0) < record.parentVersionId
) {
// the document of the cursor must be from the past
return DocumentUpToDateness.Prior;
@ -253,9 +243,8 @@ export class CursorTracker {
document.relative_path
);
return this.database.getLatestDocumentByRelativePath(
document.relative_path
)?.metadata?.hash === (await hash(currentContent))
const currentRecord = this.queue.getDocument(document.relative_path);
return currentRecord?.hash === (await hash(currentContent))
? DocumentUpToDateness.UpToDate
: DocumentUpToDateness.Prior;
}

View file

@ -1,46 +1,443 @@
import { describe, it } from "node:test";
import assert from "node:assert";
import { SyncEventQueue, type SyncEvent } from "./sync-event-queue";
import { SyncEventQueue } from "./sync-event-queue";
import { Settings } from "../persistence/settings";
import { Logger } from "../tracing/logger";
import type { DocumentVersionWithoutContent } from "../services/types/DocumentVersionWithoutContent";
import { SyncEventType } from "./types";
function createQueue(ignorePatterns: string[] = []): SyncEventQueue {
const logger = new Logger();
const settings = new Settings(logger, { ignorePatterns }, async () => {});
return new SyncEventQueue(settings, logger, undefined, async () => {});
}
function fakeRemoteVersion(
documentId: string,
overrides: Partial<DocumentVersionWithoutContent> = {}
): DocumentVersionWithoutContent {
return {
vaultUpdateId: 1,
documentId,
relativePath: `${documentId}.md`,
updatedDate: "2026-01-01",
isDeleted: false,
userId: "user",
deviceId: "device",
contentSize: 100,
...overrides
};
}
describe("SyncEventQueue", () => {
it("delete collapses interleaved events for one document while leaving the other intact", () => {
const queue = new SyncEventQueue();
queue.enqueue({ type: "local-content-update", documentId: "A" });
queue.enqueue({ type: "remote-content-update", documentId: "B" });
queue.enqueue({ type: "local-content-update", documentId: "A" });
queue.enqueue({ type: "move", documentId: "A" });
queue.enqueue({ type: "remote-content-update", documentId: "A" });
queue.enqueue({ type: "delete", documentId: "A" });
queue.enqueue({ type: "local-content-update", documentId: "B" });
assert.deepStrictEqual(queue.next(), { type: "delete", documentId: "A" });
assert.deepStrictEqual(queue.next(), {
type: "local-content-update",
documentId: "B"
it("sync-local followed by delete for the same document returns only the delete", () => {
const queue = createQueue();
queue.setDocument("a.md", {
documentId: "A",
parentVersionId: 1,
hash: "hash-a"
});
queue.enqueue({ type: SyncEventType.SyncLocal, documentId: "A" });
queue.enqueue({ type: SyncEventType.SyncLocal, documentId: "A" });
queue.enqueue({
type: SyncEventType.Delete,
documentId: "A",
path: "a.md",
});
const event = queue.next();
assert.strictEqual(event?.type, SyncEventType.Delete);
if (event?.type === SyncEventType.Delete) {
assert.strictEqual(event.documentId, "A");
}
assert.strictEqual(queue.next(), undefined);
});
it("updates coalesce up to a move boundary then post-move events are processed separately", () => {
const queue = new SyncEventQueue();
queue.enqueue({ type: "local-content-update", documentId: "X" });
queue.enqueue({ type: "remote-content-update", documentId: "X" });
queue.enqueue({ type: "file-create", path: "new.md" });
queue.enqueue({ type: "local-content-update", documentId: "X" });
queue.enqueue({ type: "move", documentId: "X" });
queue.enqueue({ type: "remote-content-update", documentId: "X" });
queue.enqueue({ type: "local-content-update", documentId: "X" });
it("sync-local events for the same document coalesce to one", () => {
const queue = createQueue();
queue.setDocument("a.md", {
documentId: "A",
parentVersionId: 1,
hash: "hash-a"
});
assert.deepStrictEqual(queue.next(), {
type: "local-content-update",
documentId: "X"
});
assert.deepStrictEqual(queue.next(), { type: "file-create", path: "new.md" });
assert.deepStrictEqual(queue.next(), { type: "move", documentId: "X" });
assert.deepStrictEqual(queue.next(), {
type: "local-content-update",
documentId: "X"
});
queue.enqueue({ type: SyncEventType.SyncLocal, documentId: "A" });
queue.enqueue({ type: SyncEventType.SyncLocal, documentId: "A" });
queue.enqueue({ type: SyncEventType.SyncLocal, documentId: "A" });
const event = queue.next();
assert.strictEqual(event?.type, SyncEventType.SyncLocal);
assert.strictEqual(queue.next(), undefined);
});
it("sync-remote events for the same documentId coalesce to the last one", () => {
const queue = createQueue();
queue.enqueue({
type: SyncEventType.SyncRemote,
remoteVersion: fakeRemoteVersion("A", { vaultUpdateId: 1 })
});
queue.enqueue({
type: SyncEventType.SyncRemote,
remoteVersion: fakeRemoteVersion("A", { vaultUpdateId: 2 })
});
queue.enqueue({
type: SyncEventType.SyncRemote,
remoteVersion: fakeRemoteVersion("A", { vaultUpdateId: 3 })
});
const event = queue.next();
assert.strictEqual(event?.type, SyncEventType.SyncRemote);
if (event?.type === SyncEventType.SyncRemote) {
assert.strictEqual(event.remoteVersion.vaultUpdateId, 3);
}
assert.strictEqual(queue.next(), undefined);
});
it("create events are returned FIFO", () => {
const queue = createQueue();
queue.enqueue({ type: SyncEventType.Create, path: "a.md" });
queue.enqueue({ type: SyncEventType.Create, path: "b.md" });
const first = queue.next();
assert.strictEqual(first?.type, SyncEventType.Create);
if (first?.type === SyncEventType.Create) {
assert.strictEqual(first.path, "a.md");
}
const second = queue.next();
assert.strictEqual(second?.type, SyncEventType.Create);
if (second?.type === SyncEventType.Create) {
assert.strictEqual(second.path, "b.md");
}
});
it("duplicate creates for the same path are skipped", () => {
const queue = createQueue();
queue.enqueue({ type: SyncEventType.Create, path: "a.md" });
queue.enqueue({ type: SyncEventType.Create, path: "a.md" });
assert.strictEqual(queue.size, 1);
});
it("create is skipped if the path already has a tracked document", () => {
const queue = createQueue();
queue.setDocument("a.md", {
documentId: "A",
parentVersionId: 1,
hash: "hash-a"
});
queue.enqueue({ type: SyncEventType.Create, path: "a.md" });
assert.strictEqual(queue.size, 0);
});
it("delete uses the provided documentId", () => {
const queue = createQueue();
queue.enqueue({
type: SyncEventType.Delete,
documentId: "A",
path: "a.md",
});
const event = queue.next();
assert.strictEqual(event?.type, SyncEventType.Delete);
if (event?.type === SyncEventType.Delete) {
assert.strictEqual(event.documentId, "A");
}
});
it("updateCreatePath updates the path of a create event in the queue", () => {
const queue = createQueue();
queue.enqueue({ type: SyncEventType.Create, path: "old.md" });
const updated = queue.updateCreatePath("old.md", "new.md");
assert.strictEqual(updated, true);
assert.strictEqual(queue.hasCreateEvent("old.md"), false);
assert.strictEqual(queue.hasCreateEvent("new.md"), true);
const event = queue.next();
assert.strictEqual(event?.type, SyncEventType.Create);
if (event?.type === SyncEventType.Create) {
assert.strictEqual(event.path, "new.md");
}
});
it("updateCreatePath returns false when no create event exists", () => {
const queue = createQueue();
const updated = queue.updateCreatePath("old.md", "new.md");
assert.strictEqual(updated, false);
});
it("hasCreateEvent detects pending creates", () => {
const queue = createQueue();
assert.strictEqual(queue.hasCreateEvent("a.md"), false);
queue.enqueue({ type: SyncEventType.Create, path: "a.md" });
assert.strictEqual(queue.hasCreateEvent("a.md"), true);
queue.next();
assert.strictEqual(queue.hasCreateEvent("a.md"), false);
});
it("document store CRUD operations work correctly", () => {
const queue = createQueue();
assert.strictEqual(queue.getDocument("a.md"), undefined);
assert.strictEqual(queue.documentCount, 0);
queue.setDocument("a.md", {
documentId: "A",
parentVersionId: 1,
hash: "hash-a"
});
assert.strictEqual(queue.documentCount, 1);
assert.deepStrictEqual(queue.getDocument("a.md"), {
documentId: "A",
parentVersionId: 1,
hash: "hash-a"
});
const found = queue.getDocumentByDocumentId("A");
assert.strictEqual(found?.path, "a.md");
assert.strictEqual(found?.record.documentId, "A");
queue.removeDocument("a.md");
assert.strictEqual(queue.documentCount, 0);
assert.strictEqual(queue.getDocument("a.md"), undefined);
});
it("moveDocument moves a document and returns displaced documentId", () => {
const queue = createQueue();
queue.setDocument("a.md", {
documentId: "A",
parentVersionId: 1,
hash: "hash-a"
});
queue.setDocument("b.md", {
documentId: "B",
parentVersionId: 2,
hash: "hash-b"
});
const displacedId = queue.moveDocument("a.md", "b.md");
assert.strictEqual(displacedId, "B");
assert.strictEqual(queue.getDocument("a.md"), undefined);
assert.strictEqual(queue.getDocument("b.md")?.documentId, "A");
assert.strictEqual(queue.documentCount, 1);
});
it("moveDocument returns undefined when target is unoccupied", () => {
const queue = createQueue();
queue.setDocument("a.md", {
documentId: "A",
parentVersionId: 1,
hash: "hash-a"
});
const displacedId = queue.moveDocument("a.md", "b.md");
assert.strictEqual(displacedId, undefined);
assert.strictEqual(queue.getDocument("b.md")?.documentId, "A");
});
it("interleaved events for different documents are not confused", () => {
const queue = createQueue();
queue.setDocument("a.md", {
documentId: "A",
parentVersionId: 1,
hash: "hash-a"
});
queue.setDocument("b.md", {
documentId: "B",
parentVersionId: 2,
hash: "hash-b"
});
queue.enqueue({ type: SyncEventType.SyncLocal, documentId: "A" });
queue.enqueue({ type: SyncEventType.SyncLocal, documentId: "B" });
queue.enqueue({
type: SyncEventType.Delete,
documentId: "A",
path: "a.md",
});
queue.enqueue({ type: SyncEventType.SyncLocal, documentId: "B" });
// First next() should see the delete for A (coalescing sync-local + delete)
const first = queue.next();
assert.strictEqual(first?.type, SyncEventType.Delete);
if (first?.type === SyncEventType.Delete) {
assert.strictEqual(first.documentId, "A");
}
// Remaining should be the coalesced sync-local for B
const second = queue.next();
assert.strictEqual(second?.type, SyncEventType.SyncLocal);
if (second?.type === SyncEventType.SyncLocal) {
assert.strictEqual(second.documentId, "B");
}
assert.strictEqual(queue.next(), undefined);
});
it("delete discards subsequent sync-remote events for the same document", () => {
const queue = createQueue();
queue.enqueue({
type: SyncEventType.Delete,
documentId: "A",
path: "a.md",
});
queue.enqueue({
type: SyncEventType.SyncRemote,
remoteVersion: fakeRemoteVersion("A", { vaultUpdateId: 5 })
});
const event = queue.next();
assert.strictEqual(event?.type, SyncEventType.Delete);
assert.strictEqual(queue.next(), undefined);
});
it("delete discards subsequent sync-local and sync-remote for the same document", () => {
const queue = createQueue();
queue.setDocument("a.md", {
documentId: "A",
parentVersionId: 1,
hash: "hash-a"
});
queue.enqueue({
type: SyncEventType.Delete,
documentId: "A",
path: "a.md",
});
queue.enqueue({ type: SyncEventType.SyncLocal, documentId: "A" });
queue.enqueue({ type: SyncEventType.Create, path: "b.md" });
queue.enqueue({
type: SyncEventType.SyncRemote,
remoteVersion: fakeRemoteVersion("A", { vaultUpdateId: 5 })
});
const first = queue.next();
assert.strictEqual(first?.type, SyncEventType.Delete);
// Only the unrelated create should remain
const second = queue.next();
assert.strictEqual(second?.type, SyncEventType.Create);
assert.strictEqual(queue.next(), undefined);
});
it("delete with empty documentId does not discard other events", () => {
const queue = createQueue();
queue.setDocument("a.md", {
documentId: "A",
parentVersionId: 1,
hash: "hash-a"
});
queue.enqueue({
type: SyncEventType.Delete,
documentId: "",
path: "unknown.md",
});
queue.enqueue({ type: SyncEventType.SyncLocal, documentId: "A" });
queue.next();
const second = queue.next();
assert.strictEqual(second?.type, SyncEventType.SyncLocal);
});
it("create can be re-enqueued after being dequeued", () => {
const queue = createQueue();
queue.enqueue({ type: SyncEventType.Create, path: "a.md" });
queue.next();
queue.enqueue({ type: SyncEventType.Create, path: "a.md" });
assert.strictEqual(queue.size, 1);
});
it("silently ignores create events matching ignore patterns", () => {
const queue = createQueue(["*.tmp", ".hidden/**"]);
queue.enqueue({ type: SyncEventType.Create, path: "scratch.tmp" });
queue.enqueue({
type: SyncEventType.Create,
path: ".hidden/secret.md",
});
assert.strictEqual(queue.size, 0);
queue.enqueue({ type: SyncEventType.Create, path: "notes-new.md" });
assert.strictEqual(queue.size, 1);
queue.enqueue({
type: SyncEventType.SyncRemote,
remoteVersion: fakeRemoteVersion("N")
});
assert.strictEqual(queue.size, 2);
});
it("clear removes events but keeps documents", () => {
const queue = createQueue();
queue.setDocument("a.md", {
documentId: "A",
parentVersionId: 1,
hash: "hash-a"
});
queue.enqueue({ type: SyncEventType.Create, path: "b.md" });
queue.enqueue({ type: SyncEventType.SyncLocal, documentId: "A" });
assert.strictEqual(queue.size, 2);
queue.clear();
assert.strictEqual(queue.size, 0);
assert.strictEqual(queue.documentCount, 1);
assert.strictEqual(queue.getDocument("a.md")?.documentId, "A");
});
it("allDocuments returns all tracked documents", () => {
const queue = createQueue();
queue.setDocument("a.md", {
documentId: "A",
parentVersionId: 1,
hash: "hash-a"
});
queue.setDocument("b.md", {
documentId: "B",
parentVersionId: 2,
hash: "hash-b"
});
const docs = queue.allDocuments();
assert.strictEqual(docs.length, 2);
const paths = docs.map(([p]) => p).sort();
assert.deepStrictEqual(paths, ["a.md", "b.md"]);
});
it("loads initial state from persistence", () => {
const logger = new Logger();
const settings = new Settings(logger, {}, async () => {});
const queue = new SyncEventQueue(settings, logger, {
documents: [
{
relativePath: "a.md",
documentId: "A",
parentVersionId: 5,
hash: "hash-a"
},
{
relativePath: "b.md",
documentId: "B",
parentVersionId: 3,
hash: "hash-b"
}
],
lastSeenUpdateId: 4
}, async () => {});
assert.strictEqual(queue.documentCount, 2);
assert.strictEqual(queue.getDocument("a.md")?.documentId, "A");
assert.strictEqual(queue.getDocument("b.md")?.documentId, "B");
assert.strictEqual(queue.getLastSeenUpdateId(), 5);
});
});

View file

@ -1,85 +1,336 @@
import type { DocumentId, RelativePath } from "../persistence/database";
export type SyncEvent =
| { type: "file-create"; path: RelativePath }
| { type: "local-content-update"; documentId: DocumentId }
| { type: "remote-content-update"; documentId: DocumentId }
| { type: "move"; documentId: DocumentId }
| { type: "delete"; documentId: DocumentId };
import type { Settings } from "../persistence/settings";
import type { Logger } from "../tracing/logger";
import { globsToRegexes } from "../utils/globs-to-regexes";
import { CoveredValues } from "../utils/data-structures/min-covered";
import { removeFromArray } from "../utils/remove-from-array";
import {
SyncEventType,
type DocumentId,
type DocumentRecord,
type RelativePath,
type StoredSyncState,
type SyncEvent,
type VaultUpdateId,
} from "./types";
export class SyncEventQueue {
private readonly events: SyncEvent[] = [];
private readonly documents = new Map<RelativePath, DocumentRecord>();
private readonly recentlyDeletedDocumentIds = new Set<DocumentId>();
private lastSeenUpdateIds: CoveredValues;
private ignorePatterns: RegExp[];
public constructor(
private readonly settings: Settings,
private readonly logger: Logger,
initialState: Partial<StoredSyncState> | undefined,
private readonly saveData: (data: StoredSyncState) => Promise<void>
) {
this.ignorePatterns = globsToRegexes(
this.settings.getSettings().ignorePatterns,
this.logger
);
this.settings.onSettingsChanged.add((newSettings) => {
this.ignorePatterns = globsToRegexes(
newSettings.ignorePatterns,
this.logger
);
});
initialState ??= {};
if (initialState.documents !== undefined) {
for (const { relativePath, ...record } of initialState.documents) {
this.documents.set(relativePath, record);
}
}
const { lastSeenUpdateId } = initialState;
this.lastSeenUpdateIds = new CoveredValues(
Math.max(0, lastSeenUpdateId ?? 0)
);
for (const [, record] of this.documents) {
this.lastSeenUpdateIds.add(record.parentVersionId);
}
this.logger.debug(`Loaded ${this.documents.size} documents`);
}
public get size(): number {
return this.events.length;
}
public get documentCount(): number {
return this.documents.size;
}
public getLastSeenUpdateId(): VaultUpdateId {
return this.lastSeenUpdateIds.min;
}
public addSeenUpdateId(value: number): void {
const previousMin = this.lastSeenUpdateIds.min;
this.lastSeenUpdateIds.add(value);
if (previousMin !== this.lastSeenUpdateIds.min) {
this.saveInTheBackground();
}
}
public setLastSeenUpdateId(value: number): void {
this.lastSeenUpdateIds.min = value;
this.saveInTheBackground();
}
public getDocument(path: RelativePath): DocumentRecord | undefined {
return this.documents.get(path);
}
public getDocumentByDocumentId(
target: DocumentId
): { path: RelativePath; record: DocumentRecord } | undefined {
for (const [path, record] of this.documents) {
if (record.documentId === target) {
return { path, record };
}
}
return undefined;
}
public setDocument(path: RelativePath, record: DocumentRecord): void {
this.documents.set(path, record);
this.saveInTheBackground();
}
public removeDocument(path: RelativePath): void {
const record = this.documents.get(path);
if (record !== undefined) {
this.recentlyDeletedDocumentIds.add(record.documentId);
}
this.documents.delete(path);
this.saveInTheBackground();
}
/**
* Move a document from oldPath to newPath.
* If the target path is occupied by a different document, it is removed
* and its documentId is returned so the caller can handle the displacement.
*/
public moveDocument(
oldPath: RelativePath,
newPath: RelativePath
): DocumentId | undefined {
const record = this.documents.get(oldPath);
if (record === undefined) return undefined;
let displacedDocumentId: DocumentId | undefined = undefined;
const existingAtTarget = this.documents.get(newPath);
if (
existingAtTarget !== undefined &&
existingAtTarget.documentId !== record.documentId
) {
displacedDocumentId = existingAtTarget.documentId;
this.recentlyDeletedDocumentIds.add(displacedDocumentId);
this.documents.delete(newPath);
}
this.documents.delete(oldPath);
this.documents.set(newPath, record);
this.saveInTheBackground();
return displacedDocumentId;
}
public wasRecentlyDeleted(documentId: DocumentId): boolean {
return this.recentlyDeletedDocumentIds.has(documentId);
}
public unmarkRecentlyDeleted(documentId: DocumentId): void {
this.recentlyDeletedDocumentIds.delete(documentId);
}
public allDocuments(): [RelativePath, DocumentRecord][] {
return Array.from(this.documents.entries());
}
public hasCreateEvent(path: RelativePath): boolean {
return this.events.some(
(e) => e.type === SyncEventType.Create && e.path === path
);
}
public updateCreatePath(
oldPath: RelativePath,
newPath: RelativePath
): boolean {
for (const event of this.events) {
if (event.type === SyncEventType.Create && event.path === oldPath) {
event.path = newPath;
return true;
}
}
return false;
}
public hasPendingEventsForPath(path: RelativePath): boolean {
const record = this.documents.get(path);
const docId = record?.documentId;
return this.events.some(
(e) =>
(e.type === SyncEventType.Create && e.path === path) ||
(e.type === SyncEventType.SyncLocal &&
docId !== undefined &&
e.documentId === docId) ||
(e.type === SyncEventType.Delete &&
docId !== undefined &&
e.documentId === docId) ||
(e.type === SyncEventType.SyncRemote &&
e.remoteVersion.relativePath === path)
);
}
public async save(): Promise<void> {
return this.saveData({
documents: Array.from(this.documents.entries()).map(
([relativePath, record]) => ({
relativePath,
...record
})
),
lastSeenUpdateId: this.lastSeenUpdateIds.min
});
}
public resetState(): void {
this.documents.clear();
this.recentlyDeletedDocumentIds.clear();
this.lastSeenUpdateIds = new CoveredValues(0);
this.saveInTheBackground();
}
public clear(): void {
this.events.length = 0;
this.recentlyDeletedDocumentIds.clear();
}
public enqueue(event: SyncEvent): void {
if (this.isIgnored(event)) return;
if (event.type === SyncEventType.Create) {
if (this.documents.has(event.path)) return;
if (this.hasCreateEvent(event.path)) return;
}
this.events.push(event);
}
public next(): SyncEvent | undefined {
if (this.events.length === 0) return undefined;
const first = this.events[0];
if (first.type === "file-create") {
const [first] = this.events;
// Creates are always returned immediately (FIFO)
if (first.type === SyncEventType.Create) {
this.events.shift();
return first;
}
const { documentId } = first;
// If there's an eventual delete, discard everything for this document
const deleteEvent = this.events.find(
(e) => e.type === "delete" && e.documentId === documentId
);
if (deleteEvent) {
this.removeAllForDocument(documentId);
return deleteEvent;
}
// Coalesce updates: return the last update before the next move for this document.
// Moves act as barriers since they depend on each other
const moveIndex = this.events.findIndex(
(e) => e.type === "move" && e.documentId === documentId
);
const boundary = moveIndex === -1 ? this.events.length : moveIndex;
const updateIndices: number[] = [];
for (let i = 0; i < boundary; i++) {
const e = this.events[i];
if (
(e.type === "local-content-update" ||
e.type === "remote-content-update") &&
e.documentId === documentId
) {
updateIndices.push(i);
// Deletes are returned immediately; also discard any subsequent
// events for the same documentId so stale broadcasts don't
// resurrect the document
if (first.type === SyncEventType.Delete) {
this.events.shift();
const { documentId } = first;
if (documentId !== "") {
this.removeAllEventsForDocumentId(documentId);
}
return first;
}
if (updateIndices.length > 0) {
const result = this.events[updateIndices[updateIndices.length - 1]];
for (let i = updateIndices.length - 1; i >= 0; i--) {
this.events.splice(updateIndices[i], 1);
if (first.type === SyncEventType.SyncLocal) {
const { documentId } = first;
// If there's a later delete for the same documentId, discard
// all sync-locals for that document and return the delete
const deleteEvent = this.events.find(
(e) =>
e.type === SyncEventType.Delete &&
e.documentId === documentId
);
if (deleteEvent !== undefined) {
this.removeAllSyncLocalsForDocumentId(documentId);
removeFromArray(this.events, deleteEvent);
return deleteEvent;
}
// Coalesce multiple sync-locals for the same documentId to the last one
const matching = this.events.filter(
(e) =>
e.type === SyncEventType.SyncLocal &&
e.documentId === documentId
);
const result = matching[matching.length - 1];
for (const item of matching) {
removeFromArray(this.events, item);
}
return result;
}
// First event is a move with no preceding updates
this.events.shift();
return first;
// SyncRemote: coalesce multiple events for the same documentId to the last one
const { documentId } = first.remoteVersion;
const matching = this.events.filter(
(e) =>
e.type === SyncEventType.SyncRemote &&
e.remoteVersion.documentId === documentId
);
const result = matching[matching.length - 1];
for (const item of matching) {
removeFromArray(this.events, item);
}
return result;
}
private removeAllForDocument(documentId: DocumentId): void {
private isIgnored(event: SyncEvent): boolean {
if (event.type !== SyncEventType.Create) return false;
return this.ignorePatterns.some((pattern) => pattern.test(event.path));
}
private removeAllEventsForDocumentId(documentId: DocumentId): void {
for (let i = this.events.length - 1; i >= 0; i--) {
const e = this.events[i];
if (e.type !== "file-create" && e.documentId === documentId) {
if (
(e.type === SyncEventType.SyncLocal &&
e.documentId === documentId) ||
(e.type === SyncEventType.SyncRemote &&
e.remoteVersion.documentId === documentId) ||
(e.type === SyncEventType.Delete &&
e.documentId === documentId)
) {
// eslint-disable-next-line no-restricted-syntax -- Bulk removal by predicate, not single-item removal
this.events.splice(i, 1);
}
}
}
private removeAllSyncLocalsForDocumentId(documentId: DocumentId): void {
for (let i = this.events.length - 1; i >= 0; i--) {
const e = this.events[i];
if (
e.type === SyncEventType.SyncLocal &&
e.documentId === documentId
) {
// eslint-disable-next-line no-restricted-syntax -- Bulk removal by predicate, not single-item removal
this.events.splice(i, 1);
}
}
}
private saveInTheBackground(): void {
void this.save().catch((error: unknown) => {
this.logger.error(`Error saving sync state: ${error}`);
});
}
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,42 @@
import type { DocumentVersionWithoutContent } from "../services/types/DocumentVersionWithoutContent";
export type VaultUpdateId = number;
export type DocumentId = string;
export type RelativePath = string;
export interface DocumentRecord {
documentId: DocumentId;
parentVersionId: VaultUpdateId;
hash: string;
remoteRelativePath?: RelativePath;
}
export interface StoredDocument extends DocumentRecord {
relativePath: RelativePath;
}
export interface StoredSyncState {
documents: StoredDocument[];
lastSeenUpdateId: VaultUpdateId | undefined;
}
export enum SyncEventType {
Create = "create",
SyncLocal = "sync-local",
SyncRemote = "sync-remote",
Delete = "delete",
}
export type SyncEvent =
| { type: SyncEventType.Create; path: RelativePath }
| { type: SyncEventType.SyncLocal; documentId: DocumentId }
| {
type: SyncEventType.Delete;
documentId: DocumentId;
path: RelativePath;
displacedAtVersion?: VaultUpdateId;
}
| {
type: SyncEventType.SyncRemote;
remoteVersion: DocumentVersionWithoutContent;
};

View file

@ -1,612 +0,0 @@
import type {
Database,
DocumentRecord,
RelativePath
} from "../persistence/database";
import { diff } from "reconcile-text";
import type { SyncService } from "../services/sync-service";
import type { Logger } from "../tracing/logger";
import type {
CommonHistoryEntry,
SyncCreateDetails,
SyncDeleteDetails,
SyncDetails,
SyncHistory,
SyncMovedDetails,
SyncUpdateDetails
} from "../tracing/sync-history";
import { SyncStatus, SyncType } from "../tracing/sync-history";
import { EMPTY_HASH, hash } from "../utils/hash";
import { base64ToBytes } from "byte-base64";
import type { Settings } from "../persistence/settings";
import type { FileOperations } from "../file-operations/file-operations";
import { FileNotFoundError } from "../errors/file-not-found-error";
import { SyncResetError } from "../errors/sync-reset-error";
import { globsToRegexes } from "../utils/globs-to-regexes";
import type { DocumentVersion } from "../services/types/DocumentVersion";
import type { DocumentUpdateResponse } from "../services/types/DocumentUpdateResponse";
import type { DocumentVersionWithoutContent } from "../services/types/DocumentVersionWithoutContent";
import type { FixedSizeDocumentCache } from "../utils/data-structures/fix-sized-cache";
import { isFileTypeMergable } from "../utils/is-file-type-mergable";
import { isBinary } from "../utils/is-binary";
import type { ServerConfig } from "../services/server-config";
export class UnrestrictedSyncer {
private ignorePatterns: RegExp[];
public constructor(
private readonly logger: Logger,
private readonly database: Database,
private readonly settings: Settings,
private readonly syncService: SyncService,
private readonly operations: FileOperations,
private readonly history: SyncHistory,
private readonly contentCache: FixedSizeDocumentCache,
private readonly serverConfig: ServerConfig
) {
this.ignorePatterns = globsToRegexes(
this.settings.getSettings().ignorePatterns,
this.logger
);
this.settings.onSettingsChanged.add((newSettings) => {
this.ignorePatterns = globsToRegexes(
newSettings.ignorePatterns,
this.logger
);
});
}
public async unrestrictedSyncLocallyCreatedOrUpdatedFile({
oldPath,
// We use the same code path for both local and remote updates. We need to force the update
// if there are no local changes but we know that the remote version is newer.
force = false,
document
}: {
oldPath?: RelativePath;
force?: boolean;
document: DocumentRecord;
}): Promise<void> {
const updateDetails:
| SyncCreateDetails
| SyncUpdateDetails
| SyncMovedDetails =
document.metadata === undefined
? {
type: SyncType.CREATE,
relativePath: document.relativePath
}
: oldPath !== undefined
? {
type: SyncType.MOVE,
relativePath: document.relativePath,
movedFrom: oldPath
}
: {
type: SyncType.UPDATE,
relativePath: document.relativePath
};
await this.executeSync(updateDetails, async () => {
const originalRelativePath = document.relativePath;
if (document.isDeleted) {
this.logger.debug(
`Document ${document.relativePath} has been already deleted, no need to update it`
);
return;
}
const contentBytes = await this.operations.read(
document.relativePath
); // this can throw FileNotFoundError
const contentHash = await hash(contentBytes);
let response: DocumentVersion | DocumentUpdateResponse | undefined =
undefined;
if (document.metadata === undefined) {
response = await this.syncService.create({
relativePath: originalRelativePath,
contentBytes
});
await this.handleMaybeMergingResponse({
document,
response,
contentHash,
originalRelativePath,
originalContentBytes: contentBytes,
isCreate: true
});
} else {
const areThereLocalChanges =
document.metadata.hash !== contentHash ||
oldPath !== undefined;
if (areThereLocalChanges) {
const isText =
!isBinary(contentBytes) &&
isFileTypeMergable(
document.relativePath,
(await this.serverConfig.getConfig())
.mergeableFileExtensions
);
const cachedVersion = this.contentCache.get(
document.metadata.parentVersionId
);
response =
isText && cachedVersion !== undefined
? await this.syncService.putText({
documentId: document.metadata.documentId,
parentVersionId:
document.metadata.parentVersionId,
relativePath: document.relativePath,
content: diff(
new TextDecoder().decode(cachedVersion),
new TextDecoder().decode(contentBytes)
)
})
: await this.syncService.putBinary({
documentId: document.metadata.documentId,
parentVersionId:
document.metadata.parentVersionId,
relativePath: document.relativePath,
contentBytes
});
} else {
if (!force) {
this.logger.debug(
`File hash of ${document.relativePath} matches with last synced version and the path hasn't changed; no need to sync`
);
return;
}
// we use this code path (force == true) to sync remotely updated files which have no local changes
response = await this.syncService.get({
documentId: document.metadata.documentId
});
}
await this.handleMaybeMergingResponse({
document,
response,
contentHash,
originalRelativePath,
originalContentBytes: contentBytes
});
}
if (!("type" in response) || response.type === "MergingUpdate") {
if (!force) {
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
details: updateDetails,
message: `The file we updated had been updated remotely, so we downloaded the merged version`
});
return;
}
}
const actualUpdateDetails: SyncUpdateDetails | SyncMovedDetails =
oldPath !== undefined ||
response.relativePath != originalRelativePath
? {
type: SyncType.MOVE,
relativePath: response.relativePath,
movedFrom: originalRelativePath
}
: {
type: SyncType.UPDATE,
relativePath: response.relativePath
};
if (!response.isDeleted) {
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
details: actualUpdateDetails,
message: `Successfully downloaded remotely updated file from the server`,
author: response.userId,
timestamp: new Date(response.updatedDate)
});
} else {
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
details: {
type: SyncType.DELETE,
relativePath: document.relativePath
},
message:
"Successfully deleted file which had been deleted remotely",
author: response.userId,
timestamp: new Date(response.updatedDate)
});
}
});
}
public async unrestrictedSyncLocallyDeletedFile(
document: DocumentRecord
): Promise<void> {
const updateDetails: SyncDeleteDetails = {
type: SyncType.DELETE,
relativePath: document.relativePath
};
await this.executeSync(updateDetails, async () => {
if (document.metadata === undefined) {
this.logger.debug(
`Document ${document.relativePath} has never been synced, no need to delete it remotely`
);
return;
}
const response = await this.syncService.delete({
documentId: document.metadata.documentId,
relativePath: document.relativePath
});
this.database.updateDocumentMetadata(
{
documentId: response.documentId,
parentVersionId: response.vaultUpdateId,
hash: EMPTY_HASH,
remoteRelativePath: document.relativePath
},
document
);
this.database.addSeenUpdateId(response.vaultUpdateId);
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
details: updateDetails,
message: `Successfully deleted locally deleted file on the server`,
author: response.userId
});
});
}
public async unrestrictedSyncRemotelyUpdatedFile(
remoteVersion: DocumentVersionWithoutContent,
document?: DocumentRecord
): Promise<void> {
const updateDetails: SyncCreateDetails = {
type: SyncType.CREATE,
relativePath: remoteVersion.relativePath
};
await this.executeSync(updateDetails, async () => {
if (document?.metadata !== undefined) {
// If the file exists locally, let's pretend the user has updated it
// and deal with remote update/deletion within `unrestrictedSyncLocallyUpdatedFile`
if (
document.metadata.parentVersionId >=
remoteVersion.vaultUpdateId
) {
this.logger.debug(
`Document ${document.relativePath} is already at least as up-to-date as the fetched version`
);
return;
}
return this.unrestrictedSyncLocallyCreatedOrUpdatedFile({
document,
force: true
});
} else if (remoteVersion.isDeleted) {
// Either the document hasn't made it to us before and therefore we don't need to delete it,
// or we already have it, in which case the preceeding if would've dealt with it
this.logger.debug(
`Document ${remoteVersion.relativePath} has been deleted remotely, no need to sync`
);
return;
}
// Don't download oversized files
const historyEntryForSkippedOversizedFile =
this.getHistoryEntryForSkippedOversizedFile(
remoteVersion.contentSize,
remoteVersion.relativePath
);
if (historyEntryForSkippedOversizedFile !== undefined) {
this.history.addHistoryEntry(
historyEntryForSkippedOversizedFile
);
return;
}
const contentBytes =
await this.syncService.getDocumentVersionContent({
documentId: remoteVersion.documentId,
vaultUpdateId: remoteVersion.vaultUpdateId
});
// We're trying to create an entirely new document that didn't exist locally
document = this.database.getDocumentByDocumentId(
remoteVersion.documentId
);
// It can happen that a concurrent sync operation has already created the document, so we can bail here
if (document !== undefined) {
this.logger.debug(
`Document ${remoteVersion.relativePath} has already been created locally, no need to create it again`
);
return;
}
await this.operations.ensureClearPath(remoteVersion.relativePath);
this.database.updateDocumentMetadata(
{
documentId: remoteVersion.documentId,
parentVersionId: remoteVersion.vaultUpdateId,
hash: await hash(contentBytes),
remoteRelativePath: remoteVersion.relativePath
},
this.database.createNewPendingDocument(
remoteVersion.relativePath
)
);
await this.operations.create(
remoteVersion.relativePath,
contentBytes
);
await this.updateCache(
remoteVersion.vaultUpdateId,
contentBytes,
remoteVersion.relativePath
);
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
details: updateDetails,
message: `Successfully downloaded remote file which hadn't existed locally`,
author: remoteVersion.userId,
timestamp: new Date(remoteVersion.updatedDate)
});
});
}
private async executeSync<T>(
details: SyncDetails,
fn: () => Promise<T>
): Promise<T | undefined> {
if (!this.settings.getSettings().isSyncEnabled) {
this.logger.info(
`Skipping sync operation for file '${details.relativePath}' because sync is disabled`
);
return;
}
for (const pattern of this.ignorePatterns) {
if (pattern.test(details.relativePath)) {
this.logger.debug(
`File '${details.relativePath}' is ignored by the ignore pattern: ${pattern}`
);
return; // bail without SKIPPED status because we were told to ignore this file and we shouldn't clutter up the history
}
}
try {
// Only check the size of files which already exist locally.
if (await this.operations.exists(details.relativePath)) {
const sizeInBytes = await this.operations.getFileSize(
details.relativePath
);
const historyEntryForSkippedOversizedFile =
this.getHistoryEntryForSkippedOversizedFile(
sizeInBytes,
details.relativePath
);
if (historyEntryForSkippedOversizedFile !== undefined) {
this.history.addHistoryEntry(
historyEntryForSkippedOversizedFile
);
return;
}
}
return await fn();
} catch (e) {
if (e instanceof FileNotFoundError) {
// A subsequent sync operation must have been creating to deal with this
this.logger.info(
`Skiping file '${details.relativePath}' because it no longer exists when trying to ${details.type.toLocaleLowerCase()} it`
);
return;
}
if (e instanceof SyncResetError) {
this.logger.info(
`Interrupting sync operation because of a reset`
);
return;
} else {
this.history.addHistoryEntry({
status: SyncStatus.ERROR,
details,
message: `Failed to sync file '${details.relativePath}' because of ${e} when trying to ${details.type.toLocaleLowerCase()} it`
});
throw e;
}
}
}
private async handleMaybeMergingResponse({
document,
response,
contentHash,
originalRelativePath,
originalContentBytes,
isCreate
}: {
document: DocumentRecord;
response: DocumentVersion | DocumentUpdateResponse;
contentHash: string;
originalRelativePath: string;
originalContentBytes: Uint8Array;
isCreate?: boolean;
}): Promise<void> {
// `document` is mutable and reflects the latest state in the local database
if (document.isDeleted) {
this.logger.info(
`Document ${document.relativePath} has been deleted before we could finish updating it`
);
this.database.addSeenUpdateId(response.vaultUpdateId);
return;
}
if (
(document.metadata?.parentVersionId ?? 0) > response.vaultUpdateId
) {
this.logger.debug(
`Document ${document.relativePath} is already more up to date than the fetched version`
);
this.database.addSeenUpdateId(response.vaultUpdateId); // in case the previous `vaultUpdateId` update hasn't made it through
return;
}
if (response.isDeleted) {
return this.applyRemoteDeleteLocally(document, response);
}
let actualPath = document.relativePath;
if (isCreate) {
// We have a file locally that got moved by another client to the same path as the one we're trying to create.
// The server returns a merging update for the document ID that already exists locally (but at another path).
// We have to merge these two documents by extending the provenance of the existing document and deleting
// the old document that the new document already contains the content for.
const existingDocument = this.database.getDocumentByDocumentId(
response.documentId
);
if (existingDocument !== undefined) {
this.logger.info(
`Merging existing document ${existingDocument.relativePath} into ${document.relativePath
} after concurrent move & creation`
);
if (!existingDocument.isDeleted) {
this.database.delete(existingDocument.relativePath); // make sure syncLocallyDeletedFile doesn't actually schedule deleting the new file
this.database.removeDocument(existingDocument);
await this.operations.move(existingDocument.relativePath, document.relativePath);
} else {
this.database.removeDocument(existingDocument);
}
}
}
// this can't happen on the creation path as we can only get a merging response if a document already exists remotely on the same path
if (response.relativePath != originalRelativePath) {
actualPath = response.relativePath;
// Make sure to update the remote relative path to avoid uploading
// the file as a result of this filesystem event.
if (document.metadata !== undefined) {
document.metadata.remoteRelativePath = response.relativePath;
}
await this.operations.move(
document.relativePath,
response.relativePath
); // this can throw FileNotFoundError
}
if (!("type" in response) || response.type === "MergingUpdate") {
const responseBytes = base64ToBytes(response.contentBase64);
contentHash = await hash(responseBytes);
this.database.updateDocumentMetadata(
{
documentId: response.documentId,
parentVersionId: response.vaultUpdateId,
hash: contentHash,
remoteRelativePath: response.relativePath
},
document
);
await this.operations.write(
actualPath,
originalContentBytes,
responseBytes
);
await this.updateCache(
response.vaultUpdateId,
responseBytes,
actualPath
);
} else {
this.database.updateDocumentMetadata(
{
documentId: response.documentId,
parentVersionId: response.vaultUpdateId,
hash: contentHash,
remoteRelativePath: response.relativePath
},
document
);
await this.updateCache(
response.vaultUpdateId,
originalContentBytes,
actualPath
);
}
this.database.addSeenUpdateId(response.vaultUpdateId);
}
private getHistoryEntryForSkippedOversizedFile(
sizeInBytes: number,
relativePath: RelativePath
): CommonHistoryEntry | undefined {
const sizeInMB = Math.round(sizeInBytes / 1024 / 1024);
const { maxFileSizeMB } = this.settings.getSettings();
if (sizeInMB > maxFileSizeMB) {
return {
status: SyncStatus.SKIPPED,
details: {
type: SyncType.SKIPPED,
relativePath
},
message: `File size of ${sizeInMB} MB exceeds the maximum file size limit of ${maxFileSizeMB
} MB`
};
}
}
private async updateCache(
updateId: number,
contentBytes: Uint8Array,
filePath: RelativePath
): Promise<void> {
if (
isFileTypeMergable(
filePath,
(await this.serverConfig.getConfig()).mergeableFileExtensions
) &&
!isBinary(contentBytes)
) {
this.contentCache.put(updateId, contentBytes);
}
}
private async applyRemoteDeleteLocally(
document: DocumentRecord,
response: DocumentVersion | DocumentUpdateResponse
): Promise<void> {
this.database.delete(document.relativePath);
this.database.updateDocumentMetadata(
{
documentId: response.documentId,
parentVersionId: response.vaultUpdateId,
hash: EMPTY_HASH,
remoteRelativePath: response.relativePath
},
document
);
await this.operations.delete(document.relativePath);
this.database.addSeenUpdateId(response.vaultUpdateId);
}
}