This commit is contained in:
Andras Schmelczer 2026-05-04 13:07:18 +01:00
parent 39c5591d36
commit 35877b69da
94 changed files with 3157 additions and 1859 deletions

View file

@ -0,0 +1,9 @@
export class FileAlreadyExistsError extends Error {
public constructor(
message: string,
public readonly filePath: string
) {
super(message);
this.name = "FileAlreadyExistsError";
}
}

View file

@ -1,15 +1,14 @@
import { describe, it } from "node:test";
import assert from "node:assert/strict";
import type { RelativePath } from "../sync-operations/types";
import { FileOperations, MoveOnConflict } from "./file-operations";
import { FileOperations } from "./file-operations";
import { Logger } from "../tracing/logger";
import { assertSetContainsExactly } from "../utils/assert-set-contains-exactly";
import type { FileSystemOperations } from "./filesystem-operations";
import type { TextWithCursors } from "reconcile-text";
import type { ServerConfig, ServerConfigData } from "../services/server-config";
import { CONFLICT_PATH_REGEX } from "../sync-operations/conflict-path";
import { removeFromArray } from "../utils/remove-from-array";
import { ExpectedFsEvents } from "../sync-operations/expected-fs-events";
import { FileAlreadyExistsError } from "../errors/file-already-exists-error";
class MockServerConfig implements Pick<ServerConfig, "getConfig"> {
public async getConfig(): Promise<ServerConfigData> {
@ -79,173 +78,78 @@ function makeOps(): {
return { fs, ops };
}
function singleConflictPath(
names: Set<string>,
expectedNonConflictNames: string[]
): string {
const expected = new Set(expectedNonConflictNames);
const conflicts = Array.from(names).filter((name) => !expected.has(name));
assert.equal(
conflicts.length,
1,
`expected exactly one conflict-path entry, got ${JSON.stringify(conflicts)}`
);
assert.ok(
CONFLICT_PATH_REGEX.test(conflicts[0]),
`expected ${conflicts[0]} to match the conflict-path pattern`
);
return conflicts[0];
}
describe("File operations", () => {
it("move to empty target just renames the file", async () => {
it("create writes the file at the requested path", async () => {
const { fs, ops } = makeOps();
await ops.create("a", new Uint8Array(), MoveOnConflict.EXISTING);
const result = await ops.create("a", new Uint8Array());
assertSetContainsExactly(fs.names, "a");
assert.equal(result.actualPath, "a");
});
it("create throws FileAlreadyExistsError when the path is occupied", async () => {
const { fs, ops } = makeOps();
await ops.create("note.md", new Uint8Array());
await assert.rejects(
ops.create("note.md", new Uint8Array()),
FileAlreadyExistsError
);
// The original file is left intact and no other entries appeared.
assertSetContainsExactly(fs.names, "note.md");
});
it("move to an empty target just renames the file", async () => {
const { fs, ops } = makeOps();
await ops.create("a", new Uint8Array());
assertSetContainsExactly(fs.names, "a");
await ops.move("a", "b", MoveOnConflict.EXISTING);
const result = await ops.move("a", "b");
assertSetContainsExactly(fs.names, "b");
assert.equal(result.actualPath, "b");
});
it("create with EXISTING displaces the existing file to a conflict path", async () => {
it("move with same source and target is a no-op", async () => {
const { fs, ops } = makeOps();
await ops.create("note.md", new Uint8Array(), MoveOnConflict.EXISTING);
await ops.create("note.md", new Uint8Array(), MoveOnConflict.EXISTING);
await ops.create("a", new Uint8Array());
const result = await ops.move("a", "a");
// The original `note.md` location now holds the new file; the previous
// contents were displaced to a conflict path.
const conflict = singleConflictPath(fs.names, ["note.md"]);
assert.ok(
conflict.endsWith("-note.md"),
`conflict name should preserve the original filename, got ${conflict}`
);
assertSetContainsExactly(fs.names, "a");
assert.equal(result.actualPath, "a");
});
it("create with NEW redirects the new file to a conflict path", async () => {
it("move throws FileAlreadyExistsError when the target is occupied", async () => {
const { fs, ops } = makeOps();
await ops.create("note.md", new Uint8Array(), MoveOnConflict.EXISTING);
await ops.create("note.md", new Uint8Array(), MoveOnConflict.NEW);
await ops.create("source.md", new Uint8Array());
await ops.create("dest.md", new Uint8Array());
// The original `note.md` is untouched; the new file went to a conflict path.
const conflict = singleConflictPath(fs.names, ["note.md"]);
assert.ok(
conflict.endsWith("-note.md"),
`conflict name should preserve the original filename, got ${conflict}`
await assert.rejects(
ops.move("source.md", "dest.md"),
FileAlreadyExistsError
);
// Both files are left intact — no displacement happens.
assertSetContainsExactly(fs.names, "source.md", "dest.md");
});
it("move with EXISTING displaces the target to a conflict path", async () => {
it("create works for nested paths (parent-directory creation)", async () => {
const { fs, ops } = makeOps();
await ops.create(
"source.md",
new Uint8Array(),
MoveOnConflict.EXISTING
);
await ops.create("dest.md", new Uint8Array(), MoveOnConflict.EXISTING);
await ops.move("source.md", "dest.md", MoveOnConflict.EXISTING);
// `dest.md` now holds what used to be at `source.md`; the original
// `dest.md` moved to a conflict path in the same directory.
const conflict = singleConflictPath(fs.names, ["dest.md"]);
assert.ok(
conflict.endsWith("-dest.md"),
`conflict should preserve the original filename, got ${conflict}`
);
await ops.create("a/b.c/d", new Uint8Array());
assertSetContainsExactly(fs.names, "a/b.c/d");
});
it("move with NEW redirects the moved file to a conflict path", async () => {
it("move works for nested target paths (parent-directory creation)", async () => {
const { fs, ops } = makeOps();
await ops.create(
"source.md",
new Uint8Array(),
MoveOnConflict.EXISTING
);
await ops.create("dest.md", new Uint8Array(), MoveOnConflict.EXISTING);
await ops.create("source", new Uint8Array());
await ops.move("source", "a/b.c/dest");
await ops.move("source.md", "dest.md", MoveOnConflict.NEW);
// The original `dest.md` is untouched; the moved file went to a conflict path.
const conflict = singleConflictPath(fs.names, ["dest.md"]);
assert.ok(
conflict.endsWith("-dest.md"),
`conflict should preserve the original filename, got ${conflict}`
);
});
it("preserves the parent directory when generating a conflict path", async () => {
const { fs, ops } = makeOps();
await ops.create("a/b.c/d", new Uint8Array(), MoveOnConflict.EXISTING);
await ops.create("a/b.c/e", new Uint8Array(), MoveOnConflict.EXISTING);
await ops.move("a/b.c/d", "a/b.c/e", MoveOnConflict.EXISTING);
const conflict = singleConflictPath(fs.names, ["a/b.c/e"]);
assert.ok(
conflict.startsWith("a/b.c/"),
`conflict should live in the same directory, got ${conflict}`
);
assert.ok(
conflict.endsWith("-e"),
`conflict should preserve the filename, got ${conflict}`
);
});
it("handles dotfiles without mangling the extension", async () => {
const { fs, ops } = makeOps();
await ops.create(
".gitignore",
new Uint8Array(),
MoveOnConflict.EXISTING
);
await ops.create("temp", new Uint8Array(), MoveOnConflict.EXISTING);
await ops.move("temp", ".gitignore", MoveOnConflict.EXISTING);
const conflict = singleConflictPath(fs.names, [".gitignore"]);
assert.ok(
conflict.endsWith("-.gitignore"),
`conflict should preserve the dotfile name verbatim, got ${conflict}`
);
await ops.create(
".config.json",
new Uint8Array(),
MoveOnConflict.EXISTING
);
await ops.create("temp2", new Uint8Array(), MoveOnConflict.EXISTING);
await ops.move("temp2", ".config.json", MoveOnConflict.EXISTING);
// Now one conflict for .gitignore, one for .config.json.
const conflicts = Array.from(fs.names).filter(
(name) => name !== ".gitignore" && name !== ".config.json"
);
assert.equal(conflicts.length, 2);
assert.ok(conflicts.every((c) => CONFLICT_PATH_REGEX.test(c)));
assert.ok(conflicts.some((c) => c.endsWith("-.gitignore")));
assert.ok(conflicts.some((c) => c.endsWith("-.config.json")));
});
it("generates a fresh conflict path on every displacement", async () => {
const { fs, ops } = makeOps();
await ops.create("x", new Uint8Array(), MoveOnConflict.EXISTING);
await ops.create("x", new Uint8Array(), MoveOnConflict.EXISTING);
await ops.create("x", new Uint8Array(), MoveOnConflict.EXISTING);
const conflicts = Array.from(fs.names);
removeFromArray(conflicts, "x");
assert.equal(conflicts.length, 2);
assert.ok(conflicts.every((c) => CONFLICT_PATH_REGEX.test(c)));
assert.notEqual(
conflicts[0],
conflicts[1],
"each displacement should produce a unique conflict path"
);
assertSetContainsExactly(fs.names, "a/b.c/dest");
});
});

View file

@ -6,28 +6,19 @@ import type { TextWithCursors } from "reconcile-text";
import { reconcile } from "reconcile-text";
import { isFileTypeMergable } from "../utils/is-file-type-mergable";
import { isBinary } from "../utils/is-binary";
import { buildConflictFileName } from "../sync-operations/conflict-path";
import type { ServerConfig } from "../services/server-config";
import { FileNotFoundError } from "../errors/file-not-found-error";
import { FileAlreadyExistsError } from "../errors/file-already-exists-error";
import type { ExpectedFsEvents } from "../sync-operations/expected-fs-events";
export enum MoveOnConflict {
EXISTING = "EXISTING",
NEW = "NEW"
}
/**
* Outcome of a `move`/`create`. `actualPath` is where the new file
* ended up (which may differ from the requested path under
* `MoveOnConflict.NEW` if the target was occupied). `displacedTo` is
* set only when an existing file at the requested path was bumped to
* a `conflict-…` path under `MoveOnConflict.EXISTING`; the caller
* uses it to repoint any tracking for the displaced doc before its
* own follow-up `setDocument` clobbers the old slot.
* Outcome of a `move`/`create`. `actualPath` is where the file ended up;
* with the conflict-path machinery removed it is always equal to the
* requested path. The shape is preserved so callers don't all need to
* change.
*/
export interface FileOpResult {
actualPath: RelativePath;
displacedTo?: RelativePath;
}
export class FileOperations {
@ -55,17 +46,6 @@ export class FileOperations {
return [pathParts.join("/"), fileName];
}
/**
* Build a local-only conflict path for a file the client has to set aside.
* Format: `<dir>/conflict-<uuid>-<originalName>` UUID makes collisions
* statistically impossible, so no disk probe / lock dance is needed.
*/
private static buildConflictPath(path: RelativePath): RelativePath {
const [directory, fileName] = FileOperations.getParentDirAndFileName(path);
const conflictName = buildConflictFileName(fileName);
return directory ? `${directory}/${conflictName}` : conflictName;
}
public async listFilesRecursively(
root: RelativePath | undefined = undefined
): Promise<RelativePath[]> {
@ -79,29 +59,32 @@ export class FileOperations {
/**
* Create a file at the specified path.
*
* If a file with the same name already exists, it is moved before creating the new one.
* Parent directories are created if necessary.
* Throws `FileAlreadyExistsError` if a file already lives at `path`.
* Parent directories are created if necessary. The reconciler is the
* only caller that places files now and pre-checks for conflicts;
* the throw guards against a TOCTOU race rather than being a normal
* code path.
*/
public async create(
path: RelativePath,
newContent: Uint8Array,
moveOnConflict: MoveOnConflict
newContent: Uint8Array
): Promise<FileOpResult> {
const result = await this.ensureClearPath(path, moveOnConflict);
// ensureClearPath leaves actualPath empty: either the file never
// existed, or it was just renamed away. The upcoming write therefore
// looks like a fresh create to the watcher.
this.expectedFsEvents.expectCreate(result.actualPath);
try {
await this.fs.write(
result.actualPath,
this.toNativeLineEndings(newContent)
if (await this.fs.exists(path)) {
throw new FileAlreadyExistsError(
`Refusing to create '${path}': file already exists`,
path
);
}
await this.createParentDirectories(path);
this.expectedFsEvents.expectCreate(path);
try {
await this.fs.write(path, this.toNativeLineEndings(newContent));
} catch (e) {
this.expectedFsEvents.unexpectCreate(result.actualPath);
this.expectedFsEvents.unexpectCreate(path);
throw e;
}
return result;
return { actualPath: path };
}
/**
@ -132,7 +115,8 @@ export class FileOperations {
if (
!isFileTypeMergable(
path,
(await this.serverConfig.getConfig()).mergeableFileExtensions
(await this.serverConfig.getConfig())
.mergeableFileExtensions
) ||
isBinary(expectedContent) ||
isBinary(newContent)
@ -225,64 +209,39 @@ export class FileOperations {
return this.fs.exists(path);
}
/**
* Move the file at `oldPath` to `newPath`.
*
* Throws `FileAlreadyExistsError` if a file already lives at `newPath`
* (and `oldPath !== newPath`). The reconciler is the only caller that
* relocates tracked records and pre-checks for conflicts; the throw
* guards against a TOCTOU race.
*/
public async move(
oldPath: RelativePath,
newPath: RelativePath,
moveOnConflict: MoveOnConflict
newPath: RelativePath
): Promise<FileOpResult> {
if (oldPath === newPath) {
return { actualPath: oldPath };
}
const cleared = await this.ensureClearPath(newPath, moveOnConflict);
this.expectedFsEvents.expectRename(oldPath, cleared.actualPath);
if (await this.fs.exists(newPath)) {
throw new FileAlreadyExistsError(
`Refusing to move '${oldPath}' onto '${newPath}': target already exists`,
newPath
);
}
await this.createParentDirectories(newPath);
this.expectedFsEvents.expectRename(oldPath, newPath);
try {
await this.fs.rename(oldPath, cleared.actualPath);
await this.fs.rename(oldPath, newPath);
} catch (e) {
this.expectedFsEvents.unexpectRename(oldPath, cleared.actualPath);
this.expectedFsEvents.unexpectRename(oldPath, newPath);
throw e;
}
await this.deletingEmptyParentDirectoriesOfDeletedFile(oldPath);
return cleared;
}
private async ensureClearPath(
path: RelativePath,
moveOnConflict: MoveOnConflict
): Promise<FileOpResult> {
if (await this.fs.exists(path)) {
const conflictPath = FileOperations.buildConflictPath(path);
if (moveOnConflict === MoveOnConflict.NEW) {
return { actualPath: conflictPath };
}
this.logger.debug(
`Displacing existing file at ${path} to '${conflictPath}' to make room`
);
// The displaced file's rename will fire as a watcher event;
// register `expectRename` so the watcher dedups it. The
// caller is responsible for the queue bookkeeping (relocating
// the displaced doc's tracking) using the `displacedTo` we
// return.
this.expectedFsEvents.expectRename(path, conflictPath);
try {
await this.fs.rename(path, conflictPath);
} catch (e) {
this.expectedFsEvents.unexpectRename(path, conflictPath);
throw e;
}
return { actualPath: path, displacedTo: conflictPath };
}
this.logger.debug(
`No existing file at ${path}, creating parent directories if needed`
);
await this.createParentDirectories(path);
return { actualPath: path };
return { actualPath: newPath };
}
private async deletingEmptyParentDirectoriesOfDeletedFile(

View file

@ -37,7 +37,6 @@ export type { AuthenticationError } from "./errors/authentication-error";
export type { MaybeOutdatedClientCursors } from "./types/maybe-outdated-client-cursors";
export { DocumentSyncStatus } from "./types/document-sync-status";
export { SyncClient } from "./sync-client";
export { CONFLICT_PATH_REGEX } from "./sync-operations/conflict-path";
export type { TextWithCursors, CursorPosition } from "reconcile-text";
export const debugging = {

View file

@ -25,7 +25,7 @@ export const DEFAULT_SETTINGS: SyncSettings = {
webSocketRetryIntervalMs: 3500,
diffCacheSizeMB: 4,
enableTelemetry: false,
networkRetryIntervalMs: 1000,
networkRetryIntervalMs: 1000
};
export class Settings {

View file

@ -35,7 +35,8 @@ export class ServerConfig {
const shouldUpgradeClient =
config.supportedApiVersion > SUPPORTED_API_VERSION;
throw new ServerVersionMismatchError(
`Unsupported API version: ${config.supportedApiVersion}. Consider upgrading the ${shouldUpgradeClient ? "client" : "sync-server"
`Unsupported API version: ${config.supportedApiVersion}. Consider upgrading the ${
shouldUpgradeClient ? "client" : "sync-server"
} to ensure compatibility`
);
}
@ -90,6 +91,11 @@ export class ServerConfig {
return this.config;
}
public reset(): void {
this.response = undefined;
this.config = undefined;
}
private async startPing(): Promise<PingResponse> {
const pending = this.syncService.ping().catch((e: unknown) => {
if (this.response === pending) {
@ -100,9 +106,4 @@ export class ServerConfig {
this.response = pending;
return pending;
}
public reset(): void {
this.response = undefined;
this.config = undefined;
}
}

View file

@ -71,7 +71,9 @@ export class SyncService {
response: Response,
operation: string
): Promise<void> {
if (response.ok) { return; }
if (response.ok) {
return;
}
const message = `Failed to ${operation}: ${await SyncService.errorFromResponse(response)}`;
// 429 is the only 4xx the server uses for *transient* contention
// (`WriteBusyError` → HTTP 429). Every other 4xx means the request
@ -183,7 +185,8 @@ export class SyncService {
(await response.json()) as DocumentUpdateResponse; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
this.logger.debug(
`Updated document ${JSON.stringify(result)} with id ${result.documentId
`Updated document ${JSON.stringify(result)} with id ${
result.documentId
}}`
);
@ -231,7 +234,8 @@ export class SyncService {
(await response.json()) as DocumentUpdateResponse; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
this.logger.debug(
`Updated document ${JSON.stringify(result)} with id ${result.documentId
`Updated document ${JSON.stringify(result)} with id ${
result.documentId
}}`
);
@ -240,14 +244,12 @@ export class SyncService {
}
public async delete({
documentId,
documentId
}: {
documentId: DocumentId;
}): Promise<DocumentVersionWithoutContent> {
return this.retryForever(async () => {
this.logger.debug(
`Delete document with id ${documentId}`
);
this.logger.debug(`Delete document with id ${documentId}`);
// The server identifies the document by its URL path; no body
// is needed. Sending one was a leftover of an earlier shape.
@ -264,9 +266,7 @@ export class SyncService {
const result: DocumentVersionWithoutContent =
(await response.json()) as DocumentVersionWithoutContent; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
this.logger.debug(
`Deleted document with id ${documentId}`
);
this.logger.debug(`Deleted document with id ${documentId}`);
return result;
});
@ -338,7 +338,7 @@ export class SyncService {
return this.retryForever(async () => {
this.logger.debug(
"Getting all documents" +
(since != null ? ` since ${since}` : "")
(since != null ? ` since ${since}` : "")
);
const url = new URL(this.getUrl("/documents"));

View file

@ -1,4 +1,8 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { DocumentWithCursors } from "./DocumentWithCursors";
export interface ClientCursors { userName: string, deviceId: string, documentsWithCursors: DocumentWithCursors[], }
export interface ClientCursors {
userName: string;
deviceId: string;
documentsWithCursors: DocumentWithCursors[];
}

View file

@ -1,3 +1,7 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface CreateDocumentVersion { relative_path: string, last_seen_vault_update_id: number, content: number[], }
export interface CreateDocumentVersion {
relative_path: string;
last_seen_vault_update_id: number;
content: number[];
}

View file

@ -1,4 +1,6 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { DocumentWithCursors } from "./DocumentWithCursors";
export interface CursorPositionFromClient { documentsWithCursors: DocumentWithCursors[], }
export interface CursorPositionFromClient {
documentsWithCursors: DocumentWithCursors[];
}

View file

@ -1,4 +1,6 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { ClientCursors } from "./ClientCursors";
export interface CursorPositionFromServer { clients: ClientCursors[], }
export interface CursorPositionFromServer {
clients: ClientCursors[];
}

View file

@ -1,3 +1,6 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface CursorSpan { start: number, end: number, }
export interface CursorSpan {
start: number;
end: number;
}

View file

@ -5,4 +5,6 @@ import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutCont
/**
* Response to a create/update document request.
*/
export type DocumentUpdateResponse = { "type": "FastForwardUpdate" } & DocumentVersionWithoutContent | { "type": "MergingUpdate" } & DocumentVersion;
export type DocumentUpdateResponse =
| ({ type: "FastForwardUpdate" } & DocumentVersionWithoutContent)
| ({ type: "MergingUpdate" } & DocumentVersion);

View file

@ -1,3 +1,12 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface DocumentVersion { vaultUpdateId: number, documentId: string, relativePath: string, updatedDate: string, contentBase64: string, isDeleted: boolean, userId: string, deviceId: string, }
export interface DocumentVersion {
vaultUpdateId: number;
documentId: string;
relativePath: string;
updatedDate: string;
contentBase64: string;
isDeleted: boolean;
userId: string;
deviceId: string;
}

View file

@ -1,7 +1,16 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface DocumentVersionWithoutContent { vaultUpdateId: number, documentId: string, relativePath: string, updatedDate: string, isDeleted: boolean, userId: string, deviceId: string, contentSize: number,
/**
* True iff this is the first version of the document
*/
isNewFile: boolean, }
export interface DocumentVersionWithoutContent {
vaultUpdateId: number;
documentId: string;
relativePath: string;
updatedDate: string;
isDeleted: boolean;
userId: string;
deviceId: string;
contentSize: number;
/**
* True iff this is the first version of the document
*/
isNewFile: boolean;
}

View file

@ -1,4 +1,9 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { CursorSpan } from "./CursorSpan";
export interface DocumentWithCursors { vaultUpdateId: number | null, documentId: string, relativePath: string, cursors: CursorSpan[], }
export interface DocumentWithCursors {
vaultUpdateId: number | null;
documentId: string;
relativePath: string;
cursors: CursorSpan[];
}

View file

@ -4,8 +4,10 @@ import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutCont
/**
* Response to a fetch latest documents request.
*/
export interface FetchLatestDocumentsResponse { latestDocuments: DocumentVersionWithoutContent[],
/**
* The update ID of the latest document in the response.
*/
lastUpdateId: bigint, }
export interface FetchLatestDocumentsResponse {
latestDocuments: DocumentVersionWithoutContent[];
/**
* The update ID of the latest document in the response.
*/
lastUpdateId: bigint;
}

View file

@ -4,4 +4,8 @@ import type { VaultInfo } from "./VaultInfo";
/**
* Response to listing vaults accessible to the authenticated user.
*/
export interface ListVaultsResponse { vaults: VaultInfo[], hasMore: boolean, userName: string, }
export interface ListVaultsResponse {
vaults: VaultInfo[];
hasMore: boolean;
userName: string;
}

View file

@ -3,22 +3,23 @@
/**
* Response to a ping request.
*/
export interface PingResponse {
/**
* Semantic version of the server.
*/
serverVersion: string,
/**
* Whether the client is authenticated based on the sent Authorization
* header.
*/
isAuthenticated: boolean,
/**
* List of file extensions that are allowed to be merged.
*/
mergeableFileExtensions: string[],
/**
* API version ensuring backwards & forwards compatibility between the client
* and server.
*/
supportedApiVersion: number, }
export interface PingResponse {
/**
* Semantic version of the server.
*/
serverVersion: string;
/**
* Whether the client is authenticated based on the sent Authorization
* header.
*/
isAuthenticated: boolean;
/**
* List of file extensions that are allowed to be merged.
*/
mergeableFileExtensions: string[];
/**
* API version ensuring backwards & forwards compatibility between the client
* and server.
*/
supportedApiVersion: number;
}

View file

@ -1,3 +1,7 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface SerializedError { errorType: string, message: string, causes: string[], }
export interface SerializedError {
errorType: string;
message: string;
causes: string[];
}

View file

@ -1,3 +1,7 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface UpdateTextDocumentVersion { parentVersionId: number, relativePath: string | null, content: (number | string)[], }
export interface UpdateTextDocumentVersion {
parentVersionId: number;
relativePath: string | null;
content: (number | string)[];
}

View file

@ -4,4 +4,7 @@ import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutCont
/**
* Response to a vault history request (paginated).
*/
export interface VaultHistoryResponse { versions: DocumentVersionWithoutContent[], hasMore: boolean, }
export interface VaultHistoryResponse {
versions: DocumentVersionWithoutContent[];
hasMore: boolean;
}

View file

@ -3,4 +3,8 @@
/**
* Summary of a single vault returned by the list-vaults endpoint.
*/
export interface VaultInfo { name: string, documentCount: number, createdAt: string | null, }
export interface VaultInfo {
name: string;
documentCount: number;
createdAt: string | null;
}

View file

@ -2,4 +2,6 @@
import type { CursorPositionFromClient } from "./CursorPositionFromClient";
import type { WebSocketHandshake } from "./WebSocketHandshake";
export type WebSocketClientMessage = { "type": "handshake" } & WebSocketHandshake | { "type": "cursorPositions" } & CursorPositionFromClient;
export type WebSocketClientMessage =
| ({ type: "handshake" } & WebSocketHandshake)
| ({ type: "cursorPositions" } & CursorPositionFromClient);

View file

@ -1,3 +1,7 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface WebSocketHandshake { token: string, deviceId: string, lastSeenVaultUpdateId: number | null, }
export interface WebSocketHandshake {
token: string;
deviceId: string;
lastSeenVaultUpdateId: number | null;
}

View file

@ -2,4 +2,6 @@
import type { CursorPositionFromServer } from "./CursorPositionFromServer";
import type { WebSocketVaultUpdate } from "./WebSocketVaultUpdate";
export type WebSocketServerMessage = { "type": "vaultUpdate" } & WebSocketVaultUpdate | { "type": "cursorPositions" } & CursorPositionFromServer;
export type WebSocketServerMessage =
| ({ type: "vaultUpdate" } & WebSocketVaultUpdate)
| ({ type: "cursorPositions" } & CursorPositionFromServer);

View file

@ -1,4 +1,6 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutContent";
export interface WebSocketVaultUpdate { document: DocumentVersionWithoutContent, }
export interface WebSocketVaultUpdate {
document: DocumentVersionWithoutContent;
}

View file

@ -4,6 +4,7 @@ import assert from "node:assert";
import { WebSocketManager } from "./websocket-manager";
import type { Logger } from "../tracing/logger";
import type { Settings } from "../persistence/settings";
import { awaitAll } from "../utils/await-all";
class MockCloseEvent extends Event {
public code: number;
@ -287,7 +288,7 @@ describe("WebSocketManager", () => {
const start = Date.now();
// Two concurrent stops mimic destroy() racing onSettingsChange.
await Promise.all([manager.stop(), manager.stop()]);
await awaitAll([manager.stop(), manager.stop()]);
const elapsed = Date.now() - start;
// Both should resolve via the normal close path; if the second call
@ -297,9 +298,8 @@ describe("WebSocketManager", () => {
elapsed < 1000,
`concurrent stop() took ${elapsed}ms — expected fast resolution`
);
const errorCalls = (
mockLogger.error as unknown as { calls: unknown[] }
).calls;
const errorCalls = (mockLogger.error as unknown as { calls: unknown[] })
.calls;
assert.strictEqual(
errorCalls.length,
0,

View file

@ -70,59 +70,6 @@ export class WebSocketManager {
await this.stopPromise;
}
private async performStop(): Promise<void> {
const { promise, resolve } = Promise.withResolvers<undefined>();
this.resolveDisconnectingPromise = (): void => {
resolve(undefined);
};
this.isStopped = true;
if (this.reconnectTimeoutId !== undefined) {
clearTimeout(this.reconnectTimeoutId);
this.reconnectTimeoutId = undefined;
}
if (this.connectionTimeoutId !== undefined) {
clearTimeout(this.connectionTimeoutId);
this.connectionTimeoutId = undefined;
}
this.webSocket?.close(1000, "WebSocketManager has been stopped");
// eslint-disable-next-line @typescript-eslint/init-declarations
let timeoutId: ReturnType<typeof setTimeout> | undefined;
const timeoutPromise = new Promise<void>((_, reject) => {
timeoutId = setTimeout(() => {
reject(
new Error(
`Timeout waiting for WebSocket to close after ${WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS} seconds`
)
);
}, WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS * 1000);
});
try {
while (this.isWebSocketConnected) {
await Promise.race([promise, timeoutPromise]);
}
} catch (error) {
this.logger.error(
`Error while waiting for WebSocket to close: ${String(error)}`
);
// Force cleanup even if close didn't work
this.resolveDisconnectingPromise?.();
this.resolveDisconnectingPromise = null;
} finally {
// Clear timeout to prevent unhandled rejection
if (timeoutId !== undefined) {
clearTimeout(timeoutId);
}
}
await this.waitUntilFinished();
}
public async waitUntilFinished(): Promise<void> {
await awaitAll(this.outstandingPromises);
}
@ -173,6 +120,59 @@ export class WebSocketManager {
}
}
private async performStop(): Promise<void> {
const { promise, resolve } = Promise.withResolvers<undefined>();
this.resolveDisconnectingPromise = (): void => {
resolve(undefined);
};
this.isStopped = true;
if (this.reconnectTimeoutId !== undefined) {
clearTimeout(this.reconnectTimeoutId);
this.reconnectTimeoutId = undefined;
}
if (this.connectionTimeoutId !== undefined) {
clearTimeout(this.connectionTimeoutId);
this.connectionTimeoutId = undefined;
}
this.webSocket?.close(1000, "WebSocketManager has been stopped");
// eslint-disable-next-line @typescript-eslint/init-declarations
let timeoutId: ReturnType<typeof setTimeout> | undefined;
const timeoutPromise = new Promise<void>((_, reject) => {
timeoutId = setTimeout(() => {
reject(
new Error(
`Timeout waiting for WebSocket to close after ${WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS} seconds`
)
);
}, WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS * 1000);
});
try {
while (this.isWebSocketConnected) {
await Promise.race([promise, timeoutPromise]);
}
} catch (error) {
this.logger.error(
`Error while waiting for WebSocket to close: ${String(error)}`
);
// Force cleanup even if close didn't work
this.resolveDisconnectingPromise();
this.resolveDisconnectingPromise = null;
} finally {
// Clear timeout to prevent unhandled rejection
if (timeoutId !== undefined) {
clearTimeout(timeoutId);
}
}
await this.waitUntilFinished();
}
private initializeWebSocket(): void {
// Clean up old WebSocket handlers to prevent race conditions
if (this.webSocket) {

View file

@ -35,7 +35,9 @@ export class SyncClient {
private unloadTelemetry?: () => void;
private isDestroying = false;
private readonly eventUnsubscribers: (() => void)[] = [];
private readonly settingsChangeLock = new Lock("SyncClient.onSettingsChange");
private readonly settingsChangeLock = new Lock(
"SyncClient.onSettingsChange"
);
private constructor(
public readonly logger: Logger,
@ -57,7 +59,7 @@ export class SyncClient {
database: Partial<StoredSyncState>;
}>
>
) { }
) {}
public get syncedDocumentCount(): number {
return this.syncEventQueue.syncedDocumentCount;
@ -149,7 +151,6 @@ export class SyncClient {
}
);
const syncEventQueue = new SyncEventQueue(
settings,
logger,
@ -403,8 +404,6 @@ export class SyncClient {
this.syncer.syncLocallyDeletedFile(relativePath);
}
public getDocumentSyncingStatus(
relativePath: RelativePath
): DocumentSyncStatus {
@ -436,30 +435,6 @@ export class SyncClient {
await this.waitUntilFinishedInternal();
}
/**
* The actual drain separated from `waitUntilFinished` so internal
* shutdown paths (`pause` / `destroy`) can wait for in-flight work
* without tripping the public `checkIfDestroyed` guard, which exists
* only to keep external callers from continuing to use a disposed
* client.
*
* Loops because a WebSocket message handler completing is what enqueues
* a `RemoteChange` into the syncer; if we awaited the syncer first and
* the WS handler second, a message arriving mid-wait would leave a fresh
* drain pending while `save()` ran. Each iteration waits for both, then
* re-checks; we exit only once both report idle in the same pass.
*/
private async waitUntilFinishedInternal(): Promise<void> {
while (
this.webSocketManager.hasOutstandingWork ||
this.syncer.hasPendingWork
) {
await this.webSocketManager.waitUntilFinished();
await this.syncer.waitUntilFinished();
}
await this.syncEventQueue.save();
}
/**
* Completely destroy the SyncClient, cancelling all in-progress operations.
* After calling this method, the SyncClient cannot be used again.
@ -499,6 +474,30 @@ export class SyncClient {
}
}
/**
* The actual drain separated from `waitUntilFinished` so internal
* shutdown paths (`pause` / `destroy`) can wait for in-flight work
* without tripping the public `checkIfDestroyed` guard, which exists
* only to keep external callers from continuing to use a disposed
* client.
*
* Loops because a WebSocket message handler completing is what enqueues
* a `RemoteChange` into the syncer; if we awaited the syncer first and
* the WS handler second, a message arriving mid-wait would leave a fresh
* drain pending while `save()` ran. Each iteration waits for both, then
* re-checks; we exit only once both report idle in the same pass.
*/
private async waitUntilFinishedInternal(): Promise<void> {
while (
this.webSocketManager.hasOutstandingWork ||
this.syncer.hasPendingWork
) {
await this.webSocketManager.waitUntilFinished();
await this.syncer.waitUntilFinished();
}
await this.syncEventQueue.save();
}
private async startSyncing(): Promise<void> {
this.checkIfDestroyed("startSyncing");
this.fetchController.finishReset();
@ -563,7 +562,9 @@ export class SyncClient {
// reset() pauses, clears state, then starts iff isSyncEnabled
// — so any concurrent isSyncEnabled change is already applied.
await this.reset();
} else if (newSettings.isSyncEnabled !== oldSettings.isSyncEnabled) {
} else if (
newSettings.isSyncEnabled !== oldSettings.isSyncEnabled
) {
if (newSettings.isSyncEnabled) {
await this.startSyncing();
} else {

View file

@ -1,88 +0,0 @@
import { describe, it } from "node:test";
import assert from "node:assert";
import { buildConflictFileName, CONFLICT_PATH_REGEX } from "./conflict-path";
describe("buildConflictFileName", () => {
it("truncates to the filesystem byte limit while preserving the extension", () => {
const result = buildConflictFileName(`${"a".repeat(300)}.md`);
assert.ok(Buffer.byteLength(result, "utf8") <= 255);
assert.ok(result.endsWith(".md"));
});
it("truncates on a codepoint boundary for multi-byte UTF-8 names", () => {
// "🎉" is 4 bytes in UTF-8; splitting one would yield U+FFFD.
const result = buildConflictFileName(`${"🎉".repeat(100)}.md`);
assert.ok(Buffer.byteLength(result, "utf8") <= 255);
assert.ok(!result.includes("<22>"));
});
it("does not split a ZWJ emoji sequence", () => {
// 👨‍👩‍👧 is one grapheme but 5 code points joined by U+200D.
// A codepoint-only truncation can leave a dangling ZWJ.
const family = "\u{1F468}\u{1F469}\u{1F467}";
const result = buildConflictFileName(`${family.repeat(20)}.md`);
assert.ok(Buffer.byteLength(result, "utf8") <= 255);
const stem = result.slice(
"conflict-".length + 36 + 1,
result.length - ".md".length
);
assert.strictEqual(
stem.length % family.length,
0,
"stem length must be a whole number of families"
);
assert.ok(!stem.endsWith(""), "stem must not end with a dangling ZWJ");
});
it("does not split a base character from its combining mark", () => {
// NFD "é" = "e" (U+0065) + combining acute (U+0301): one grapheme,
// two code points. A codepoint-only loop can strand the accent.
const grapheme = "é";
const result = buildConflictFileName(`${grapheme.repeat(150)}.md`);
assert.ok(Buffer.byteLength(result, "utf8") <= 255);
const stem = result.slice(
"conflict-".length + 36 + 1,
result.length - ".md".length
);
assert.strictEqual(
stem.length % grapheme.length,
0,
"stem length must be a whole number of graphemes"
);
assert.ok(
!stem.endsWith("́") || stem.endsWith(grapheme),
"combining mark must stay attached to its base character"
);
});
});
describe("CONFLICT_PATH_REGEX", () => {
it("does not misclassify user-authored names that start with `conflict-`", () => {
assert.strictEqual(
CONFLICT_PATH_REGEX.test("conflict-resolution.md"),
false
);
});
it("only inspects the final path segment", () => {
assert.strictEqual(
CONFLICT_PATH_REGEX.test(
"conflict-12345678-1234-1234-1234-123456789abc-x/note.md"
),
false
);
assert.strictEqual(
CONFLICT_PATH_REGEX.test(
"a/b/conflict-12345678-1234-1234-1234-123456789abc-note.md"
),
true
);
});
it("round-trips with buildConflictFileName", () => {
assert.strictEqual(
CONFLICT_PATH_REGEX.test(buildConflictFileName("note.md")),
true
);
});
});

View file

@ -1,48 +0,0 @@
// Local-only files displaced by `FileOperations.ensureClearPath` are named
// `conflict-<uuid>-<originalName>`. The UUID is a full RFC-4122 v4 value so
// a user-authored filename that happens to start with `conflict-` doesn't
// get misclassified. The leading `(?:^|\/)` and trailing `[^/]*$` anchor the
// match to the final path segment so intermediate directories named after
// old conflict files (if a user renames one into a directory) don't ignore
// everything beneath them.
export const CONFLICT_PATH_REGEX =
/(?:^|\/)conflict-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}-[^/]*$/u;
const CONFLICT_PREFIX_LEN = "conflict-".length + 36 + 1;
const MAX_SEGMENT_BYTES = 255;
const MAX_ORIGINAL_BYTES = MAX_SEGMENT_BYTES - CONFLICT_PREFIX_LEN - 4;
function truncateFileNameToByteLimit(
fileName: string,
maxBytes: number
): string {
const encoder = new TextEncoder();
if (encoder.encode(fileName).byteLength <= maxBytes) { return fileName; }
const dotIndex = fileName.lastIndexOf(".");
// Dotfile (starts with "." and nothing else) → no extension to preserve.
const hasExtension = dotIndex > 0;
const extension = hasExtension ? fileName.slice(dotIndex) : "";
const stem = hasExtension ? fileName.slice(0, dotIndex) : fileName;
const extensionBytes = encoder.encode(extension).byteLength;
const stemBudget = Math.max(0, maxBytes - extensionBytes);
const segmenter = new Intl.Segmenter(undefined, {
granularity: "grapheme"
});
let truncatedStem = "";
let usedBytes = 0;
for (const { segment } of segmenter.segment(stem)) {
const segmentBytes = encoder.encode(segment).byteLength;
if (usedBytes + segmentBytes > stemBudget) { break; }
truncatedStem += segment;
usedBytes += segmentBytes;
}
return truncatedStem + extension;
}
export function buildConflictFileName(fileName: string): string {
const safeName = truncateFileNameToByteLimit(fileName, MAX_ORIGINAL_BYTES);
return `conflict-${crypto.randomUUID()}-${safeName}`;
}

View file

@ -118,7 +118,7 @@ export class CursorTracker {
for (const [relativePath, cursors] of Object.entries(
documentToCursors
)) {
const record = this.queue.getSettledDocumentByPath(relativePath);
const record = this.queue.getRecordByLocalPath(relativePath);
if (!record) {
continue; // Let's wait for the file to be created before sending cursors
@ -146,7 +146,7 @@ export class CursorTracker {
const readContent = await this.fileOperations.read(
doc.relativePath
);
const record = this.queue.getSettledDocumentByPath(
const record = this.queue.getRecordByLocalPath(
doc.relativePath
);
if (record?.remoteHash !== (await hash(readContent))) {
@ -155,7 +155,9 @@ export class CursorTracker {
}
const afterJson = JSON.stringify(documentsWithCursors);
if (this.lastLocalCursorStateWithoutDirtyDocumentsJson === afterJson) {
if (
this.lastLocalCursorStateWithoutDirtyDocumentsJson === afterJson
) {
return;
}
@ -233,9 +235,7 @@ export class CursorTracker {
private async getDocumentUpToDateness(
document: DocumentWithCursors
): Promise<DocumentUpToDateness> {
const record = this.queue.getSettledDocumentByPath(
document.relativePath
);
const record = this.queue.getRecordByLocalPath(document.relativePath);
if (!record) {
// the document of the cursor must be from the future
@ -253,7 +253,7 @@ export class CursorTracker {
document.relativePath
);
const currentRecord = this.queue.getSettledDocumentByPath(
const currentRecord = this.queue.getRecordByLocalPath(
document.relativePath
);
return currentRecord?.remoteHash === (await hash(currentContent))

View file

@ -27,6 +27,13 @@ export class ExpectedFsEvents {
// delimiter cannot occur inside either path.
private readonly renames = new Map<RelativePath, number>();
private static renameKey(
oldPath: RelativePath,
newPath: RelativePath
): string {
return JSON.stringify({ oldPath, newPath });
}
public expectCreate(path: RelativePath): void {
this.bump(this.creates, path);
}
@ -39,10 +46,7 @@ export class ExpectedFsEvents {
this.bump(this.deletes, path);
}
public expectRename(
oldPath: RelativePath,
newPath: RelativePath
): void {
public expectRename(oldPath: RelativePath, newPath: RelativePath): void {
this.bump(this.renames, ExpectedFsEvents.renameKey(oldPath, newPath));
}
@ -68,10 +72,7 @@ export class ExpectedFsEvents {
this.decrement(this.deletes, path);
}
public unexpectRename(
oldPath: RelativePath,
newPath: RelativePath
): void {
public unexpectRename(oldPath: RelativePath, newPath: RelativePath): void {
this.decrement(
this.renames,
ExpectedFsEvents.renameKey(oldPath, newPath)
@ -106,13 +107,6 @@ export class ExpectedFsEvents {
this.renames.clear();
}
private static renameKey(
oldPath: RelativePath,
newPath: RelativePath
): string {
return JSON.stringify({ oldPath, newPath });
}
private bump(map: Map<RelativePath, number>, key: RelativePath): void {
map.set(key, (map.get(key) ?? 0) + 1);
}
@ -122,15 +116,23 @@ export class ExpectedFsEvents {
key: RelativePath
): boolean {
const count = map.get(key) ?? 0;
if (count === 0) {return false;}
if (count === 1) {map.delete(key);}
else {map.set(key, count - 1);}
if (count === 0) {
return false;
}
if (count === 1) {
map.delete(key);
} else {
map.set(key, count - 1);
}
return true;
}
private decrement(map: Map<RelativePath, number>, key: RelativePath): void {
const count = map.get(key) ?? 0;
if (count <= 1) {map.delete(key);}
else {map.set(key, count - 1);}
if (count <= 1) {
map.delete(key);
} else {
map.set(key, count - 1);
}
}
}

View file

@ -24,6 +24,10 @@ export async function scheduleOfflineChanges(
): Promise<void> {
const allLocalFiles = new Set(await operations.listFilesRecursively());
logger.info(`Scheduling sync for ${allLocalFiles.size} local files`);
// `allSettledDocuments()` skips records with `localPath === undefined`
// — those have no local file by definition and don't participate in
// the disk-vs-record diff. The reconciler will place them on its
// next pass.
const allDocuments = queue.allSettledDocuments();
// A doc is "possibly deleted" only if it has no local file. Including
@ -31,7 +35,14 @@ export async function scheduleOfflineChanges(
// the update below.
const locallyPossiblyDeletedFiles: DocumentRecord[] = [];
for (const record of allDocuments.values()) {
if (!allLocalFiles.has(record.path)) {
// `localPath` is guaranteed non-undefined for entries in
// `allSettledDocuments()`, but narrow explicitly for the type
// checker (and so a future change to that helper doesn't
// silently break this loop).
if (
record.localPath !== undefined &&
!allLocalFiles.has(record.localPath)
) {
locallyPossiblyDeletedFiles.push(record);
}
}
@ -57,11 +68,17 @@ export async function scheduleOfflineChanges(
locallyPossiblyDeletedFiles
);
if (matchingDeletedFile !== undefined) {
// localPath is guaranteed defined for records in
// locallyPossiblyDeletedFiles (we filtered above).
const oldPath = matchingDeletedFile.localPath;
if (oldPath === undefined) {
continue;
}
logger.debug(
`File ${path} might have been moved from ${matchingDeletedFile.path} while offline, scheduling sync to move it`
`File ${path} might have been moved from ${oldPath} while offline, scheduling sync to move it`
);
enqueueUpdate({
oldPath: matchingDeletedFile.path,
oldPath,
relativePath: path
});
removeFromArray(locallyPossiblyDeletedFiles, matchingDeletedFile);
@ -70,7 +87,9 @@ export async function scheduleOfflineChanges(
}
for (const path of locallyPossibleCreatedFiles) {
if (renamedPaths.has(path)) {continue;}
if (renamedPaths.has(path)) {
continue;
}
logger.info(
`File ${path} was created while offline, scheduling sync to create it`
@ -80,10 +99,13 @@ export async function scheduleOfflineChanges(
}
for (const item of locallyPossiblyDeletedFiles) {
if (item.localPath === undefined) {
continue;
}
logger.info(
`File ${item.path} was deleted while offline, scheduling sync to delete it`
`File ${item.localPath} was deleted while offline, scheduling sync to delete it`
);
enqueueDelete(item.path);
enqueueDelete(item.localPath);
}
for (const path of syncedLocalFiles) {

View file

@ -0,0 +1,980 @@
import type { FileOperations } from "../file-operations/file-operations";
import { FileNotFoundError } from "../errors/file-not-found-error";
import { FileAlreadyExistsError } from "../errors/file-already-exists-error";
import type { Logger } from "../tracing/logger";
import type { SyncService } from "../services/sync-service";
import type { SyncEventQueue } from "./sync-event-queue";
import type { DocumentId, DocumentRecord, RelativePath } from "./types";
import { hash } from "../utils/hash";
const SWAP_MARKER_DIR = ".vaultlink";
const SWAP_MARKER_PREFIX = "swap-";
const SWAP_MARKER_SUFFIX = ".json";
interface SwapLeg {
documentId: DocumentId;
from: RelativePath;
to: RelativePath;
expectedHashOnFrom: string;
}
interface SwapMarker {
uuid: string;
legs: SwapLeg[];
}
interface PlannedMove {
record: DocumentRecord;
from: RelativePath;
to: RelativePath;
}
function tryParseSwapMarker(bytes: Uint8Array): SwapMarker | undefined {
try {
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
return JSON.parse(new TextDecoder().decode(bytes)) as SwapMarker;
} catch {
return undefined;
}
}
/**
* The Reconciler is the second of the sync engine's two loops. The wire
* loop (records server) updates `record.remoteRelativePath` and writes
* file content into `record.localPath`; it does not move files for path
* placement. The Reconciler (records disk) runs after every wire-loop
* step and best-effort lines disk up with `remoteRelativePath` for every
* tracked record.
*
* "Best effort" means: any per-record obstacle (slot occupied, file
* missing, etc.) is silently skipped and retried on the next pass.
* `run()` never throws per-record errors are logged and the next
* record is processed.
*
* Three shapes of work exist:
* 1. Initial placement `localPath === undefined`. The wire loop
* created the record with no on-disk presence (e.g. a remote create
* whose target slot was occupied at receive time). If the slot is
* free now, fetch content (from `pendingPlacementContent` if a
* handler stuffed it for us, otherwise from the server) and write.
* 2. Simple rename `localPath !== remoteRelativePath` and no other
* tracked record wants our current slot. Plain rename.
* 3. Cycle two or more records want each others' current slots
* (A B, B A; or longer rotations). Resolved by reading every
* member's bytes into memory then overwriting each target slot.
* A write-ahead marker file lets `recoverFromInterruptedSwap()`
* finish a swap that crashed mid-flight on next startup.
*/
export class Reconciler {
public constructor(
private readonly logger: Logger,
private readonly operations: FileOperations,
private readonly syncService: SyncService,
private readonly queue: SyncEventQueue,
// Bytes already in hand from a recent server response, keyed by
// docId. Wire-loop handlers populate this transiently when they
// have content for a record they just upserted with `localPath
// === undefined`; the reconciler uses it on the same pass
// instead of re-fetching from the server. Keys are deleted when
// consumed.
private readonly pendingPlacementContent: Map<DocumentId, Uint8Array>
) {}
/**
* Single best-effort pass. Walks every tracked record, places
* unplaced ones, and reorganises any whose `localPath !==
* remoteRelativePath`. Never throws — per-record failures are
* logged and the next record is processed. The Syncer is expected
* to call this after every wire-loop drain step, so any record
* skipped this pass gets another shot once the obstructing event
* is processed.
*/
public async run(): Promise<void> {
const allRecords = this.collectAllRecords();
const movesNeeded: PlannedMove[] = [];
for (const record of allRecords) {
if (record.localPath === record.remoteRelativePath) {
continue;
}
// The reconciler operates on settled records. A record with a
// pending LocalUpdate or LocalDelete is mid-flight: the wire
// loop owns the user's intent (rename target, edit content,
// deletion) and the record's `remoteRelativePath` may still
// reflect the pre-rename server state. Touching disk now
// would race the wire loop — e.g. a queued user-rename
// LocalUpdate would find its source path vacated by the
// reconciler moving the file back to the stale
// `remoteRelativePath`. Skip; once the wire loop drains the
// pending events, a subsequent reconciler pass sees a
// settled record and converges.
if (
this.queue.hasPendingLocalEventsForDocumentId(record.documentId)
) {
continue;
}
// The doc has been deleted server-side (HTTP DELETE acked) but
// the WebSocket receipt that would `removeDocumentById` hasn't
// arrived yet. The record looks like "needs initial placement"
// (`localPath === undefined`, since the LocalDelete enqueue
// cleared it), but placing would resurrect a doc the user
// explicitly deleted. Skip; `processRemoteDelete` will remove
// the record entirely once the WS receipt arrives.
if (this.queue.hasPendingServerDelete(record.documentId)) {
continue;
}
if (record.localPath === undefined) {
await this.tryInitialPlacement(record);
continue;
}
// localPath !== undefined and !== remoteRelativePath. Plan a
// move. First defensive existence check: the file may have
// been deleted between the wire loop touching disk and this
// reconciler pass — the watcher's LocalDelete will land
// shortly and fix the record. Skip silently.
try {
if (!(await this.operations.exists(record.localPath))) {
this.logger.debug(
`Reconciler: record ${record.documentId} localPath ${record.localPath} ` +
`is missing on disk; skipping (LocalDelete will catch up)`
);
continue;
}
} catch (e) {
this.logger.error(
`Reconciler: existence check failed for ${record.localPath}: ${String(e)}`
);
continue;
}
movesNeeded.push({
record,
from: record.localPath,
to: record.remoteRelativePath
});
}
if (movesNeeded.length === 0) {
return;
}
await this.executeMoves(movesNeeded);
}
/**
* Read any swap-marker file left behind by a crash mid-swap and
* roll forward. Called once on startup before the Reconciler
* begins normal passes. Idempotent: with no marker, a no-op.
*/
public async recoverFromInterruptedSwap(): Promise<void> {
let markerPaths: RelativePath[] = [];
try {
markerPaths = await this.findSwapMarkerFiles();
} catch (e) {
this.logger.error(
`Reconciler: failed to scan for swap markers: ${String(e)}`
);
return;
}
for (const markerPath of markerPaths) {
try {
await this.recoverFromOneMarker(markerPath);
} catch (e) {
this.logger.error(
`Reconciler: recovery from ${markerPath} failed: ${String(e)}`
);
}
}
}
private collectAllRecords(): DocumentRecord[] {
// Iterate every tracked record — placement-pending ones
// (`localPath === undefined`) included. `allSettledDocuments`
// filters those out, which would render records born from a
// remote create that landed on an occupied slot (no on-disk
// file, no entry in `pendingPlacementContent` either, since the
// wire loop deliberately doesn't buffer their content) invisible
// forever. `pendingPlacementContent` is purely a cache for
// `tryInitialPlacement`'s content fetch — not a record-discovery
// channel.
const out: DocumentRecord[] = [];
for (const record of this.queue.allRecords()) {
out.push(record);
}
// Best-effort cleanup: drop cached content for docs the queue
// no longer tracks. Previously this happened as a side effect of
// the placement-pending discovery loop; do it explicitly now.
if (this.pendingPlacementContent.size > 0) {
for (const docId of this.pendingPlacementContent.keys()) {
if (this.queue.getDocumentByDocumentId(docId) === undefined) {
this.pendingPlacementContent.delete(docId);
}
}
}
return out;
}
private async tryInitialPlacement(record: DocumentRecord): Promise<void> {
const target = record.remoteRelativePath;
// Slot occupancy: pre-check both the disk and our tracked
// records. Either form of occupancy means we wait — the
// occupant's own reconciliation pass (after their next wire-loop
// step) will move them off this slot.
try {
if (await this.operations.exists(target)) {
this.logger.debug(
`Reconciler: cannot place ${record.documentId} at ${target} ` +
`— slot occupied on disk; will retry next pass`
);
return;
}
} catch (e) {
this.logger.error(
`Reconciler: existence check failed for ${target}: ${String(e)}`
);
return;
}
if (this.queue.byLocalPath.get(target) !== undefined) {
this.logger.debug(
`Reconciler: cannot place ${record.documentId} at ${target} ` +
`— slot tracked by another record; will retry next pass`
);
return;
}
let content = this.pendingPlacementContent.get(record.documentId);
if (content === undefined) {
try {
content = await this.syncService.getDocumentVersionContent({
documentId: record.documentId,
vaultUpdateId: record.parentVersionId
});
} catch (e) {
this.logger.error(
`Reconciler: failed to fetch content for ${record.documentId}: ${String(e)}`
);
return;
}
}
try {
await this.operations.create(target, content);
} catch (e) {
if (e instanceof FileNotFoundError) {
this.logger.debug(
`Reconciler: create at ${target} hit FileNotFound (likely parent ` +
`directory race); will retry next pass`
);
return;
}
if (e instanceof FileAlreadyExistsError) {
this.logger.debug(
`Reconciler: create at ${target} lost TOCTOU race ` +
`(slot occupied between pre-check and write); will retry next pass`
);
return;
}
this.logger.error(
`Reconciler: create at ${target} failed: ${String(e)}`
);
return;
}
try {
await this.queue.setLocalPath(record.documentId, target);
} catch (e) {
this.logger.error(
`Reconciler: setLocalPath after create failed for ${record.documentId}: ${String(e)}`
);
return;
}
this.pendingPlacementContent.delete(record.documentId);
this.logger.debug(
`Reconciler: placed ${record.documentId} at ${target}`
);
}
private async executeMoves(moves: PlannedMove[]): Promise<void> {
// Build a directed graph: each move (record currently at `from`,
// wants to go to `to`) gets an edge to whatever tracked record
// currently holds `to`. A node with no outgoing edge is a leaf
// in the DAG: its target slot is held by no tracked record. If
// the slot is held by an *untracked* file we can't safely
// displace it (no record to relocate); skip those moves and
// let the next pass retry.
const movesByDocId = new Map<DocumentId, PlannedMove>();
for (const move of moves) {
movesByDocId.set(move.record.documentId, move);
}
const skipped = new Set<DocumentId>();
const edges = new Map<DocumentId, DocumentId | null>();
for (const move of moves) {
const occupant = this.queue.byLocalPath.get(move.to);
if (occupant === undefined) {
let occupied = false;
try {
occupied = await this.operations.exists(move.to);
} catch (e) {
this.logger.error(
`Reconciler: existence check failed for ${move.to}: ${String(e)}`
);
skipped.add(move.record.documentId);
continue;
}
if (occupied) {
this.logger.debug(
`Reconciler: move ${move.record.documentId} -> ${move.to} blocked ` +
`by untracked file; will retry next pass`
);
skipped.add(move.record.documentId);
continue;
}
edges.set(move.record.documentId, null);
} else if (occupant.documentId === move.record.documentId) {
// Self-loop on `to` shouldn't normally happen — we
// skipped records where localPath===remoteRelativePath
// up front. Defensive: nothing to do.
continue;
} else if (movesByDocId.has(occupant.documentId)) {
edges.set(move.record.documentId, occupant.documentId);
} else {
// Occupant is a tracked record that doesn't *want* to
// move (its localPath === its remoteRelativePath). We
// can't dislodge it without orphaning its on-disk
// file; skip and retry.
this.logger.debug(
`Reconciler: move ${move.record.documentId} -> ${move.to} blocked by ` +
`tracked record ${occupant.documentId} which is not moving; ` +
`will retry next pass`
);
skipped.add(move.record.documentId);
}
}
// SCC decomposition (Tarjan's algorithm) over the move graph.
const sccs = this.tarjanSccs(edges, skipped);
// Topo-sort the DAG of SCCs (leaves first). Tarjan emits SCCs
// in reverse topological order — leaves first — which is
// already what we want.
for (const scc of sccs) {
if (scc.length === 1) {
const [docId] = scc;
if (skipped.has(docId)) {
continue;
}
const move = movesByDocId.get(docId);
if (move === undefined) {
continue;
}
// Self-loop check: if the only edge from this node
// points back to itself, treat as a 1-cycle (impossible
// given our up-front filter, but cheap defensiveness).
const target = edges.get(docId);
if (target === docId) {
await this.executeCycle([move]);
} else {
await this.executeSimpleRename(move);
}
} else {
const cycleMoves = scc
.map((id) => movesByDocId.get(id))
.filter(
(m): m is PlannedMove =>
m !== undefined && !skipped.has(m.record.documentId)
);
if (cycleMoves.length === scc.length) {
await this.executeCycle(cycleMoves);
} else {
// A member of the cycle was skipped — the cycle
// can't be resolved as a unit. Skip the rest; next
// pass tries again with whatever's still relevant.
this.logger.debug(
`Reconciler: cycle of ${scc.length} skipped because a ` +
`member dropped out; will retry next pass`
);
}
}
}
}
private async executeSimpleRename(move: PlannedMove): Promise<void> {
// Defense-in-depth: the queue's invariant says
// `record.localPath !== undefined ⇒ byLocalPath.get(record.localPath) === record`.
// If the byLocalPath index disagrees with the record we
// captured when planning, the invariant was violated somewhere
// upstream — the file at `move.from` belongs to a different
// record now and renaming it would clobber that record's
// content. Refuse the move; the next pass re-plans.
const indexed = this.queue.byLocalPath.get(move.from);
if (indexed !== move.record) {
this.logger.warn(
`Reconciler: refusing rename ${move.from} -> ${move.to} for ` +
`${move.record.documentId}: byLocalPath says ${move.from} ` +
`belongs to ${indexed?.documentId ?? "<no record>"} ` +
`(invariant violation upstream); skipping`
);
return;
}
// The target may have been freed by an earlier move in this
// pass (a leaf we processed first). Re-check both source and
// target before committing.
try {
if (!(await this.operations.exists(move.from))) {
this.logger.debug(
`Reconciler: source ${move.from} vanished before rename; skipping`
);
return;
}
} catch (e) {
this.logger.error(
`Reconciler: existence check failed for ${move.from}: ${String(e)}`
);
return;
}
try {
if (await this.operations.exists(move.to)) {
if (this.queue.byLocalPath.get(move.to) !== undefined) {
// Slot got reclaimed by a tracked doc mid-pass —
// back off and retry next pass.
this.logger.debug(
`Reconciler: target ${move.to} reclaimed by another record ` +
`mid-pass; skipping`
);
return;
}
// Untracked file appeared; same reasoning as in
// executeMoves' planning step. Defer.
this.logger.debug(
`Reconciler: target ${move.to} now occupied by untracked file; skipping`
);
return;
}
} catch (e) {
this.logger.error(
`Reconciler: existence check failed for ${move.to}: ${String(e)}`
);
return;
}
try {
await this.operations.move(move.from, move.to);
} catch (e) {
if (e instanceof FileNotFoundError) {
this.logger.debug(
`Reconciler: rename ${move.from} -> ${move.to} hit FileNotFound; ` +
`will retry next pass`
);
return;
}
if (e instanceof FileAlreadyExistsError) {
this.logger.debug(
`Reconciler: rename ${move.from} -> ${move.to} lost TOCTOU race ` +
`(target reclaimed between pre-check and rename); will retry next pass`
);
return;
}
this.logger.error(
`Reconciler: rename ${move.from} -> ${move.to} failed: ${String(e)}`
);
return;
}
try {
await this.queue.setLocalPath(move.record.documentId, move.to);
} catch (e) {
this.logger.error(
`Reconciler: setLocalPath after rename failed for ${move.record.documentId}: ${String(e)}`
);
return;
}
this.logger.debug(
`Reconciler: renamed ${move.record.documentId} from ${move.from} to ${move.to}`
);
}
private async executeCycle(members: PlannedMove[]): Promise<void> {
// Defense-in-depth: same invariant check as
// `executeSimpleRename` but cycle-wide. If any member's `from`
// slot no longer matches the planned record per byLocalPath,
// abort the whole cycle — partial-cycle progress under a
// shadowed-record race is the worst case (it can shuffle bytes
// between the wrong docs).
for (const member of members) {
const indexed = this.queue.byLocalPath.get(member.from);
if (indexed !== member.record) {
this.logger.warn(
`Reconciler: refusing cycle: byLocalPath says ${member.from} ` +
`belongs to ${indexed?.documentId ?? "<no record>"} ` +
`but planned for ${member.record.documentId} ` +
`(invariant violation upstream); skipping cycle`
);
return;
}
}
// Read every member's bytes first; we'll overwrite the target
// slots with these. All reads happen before any write, so the
// cycle is fully captured in memory before we start mutating
// disk. If any read fails the whole cycle is aborted —
// partial-cycle work is the riskiest case (it can leave docs
// pointing at the wrong content).
const contentByDocId = new Map<DocumentId, Uint8Array>();
// We also need the pre-write content of each `to` slot for the
// 3-way merge in `operations.write` — passing the freshly-read
// disk bytes as `expectedContent` makes the merge resolve to a
// clean overwrite (since `expected === current` at write time).
const oldToContentByDocId = new Map<DocumentId, Uint8Array>();
try {
for (const member of members) {
contentByDocId.set(
member.record.documentId,
await this.operations.read(member.from)
);
}
// The `to` of each member is guaranteed to be the `from` of
// some other member (it's a cycle). We've already read all
// those `from`s, so reuse those reads.
const fromToDocId = new Map<RelativePath, DocumentId>();
for (const member of members) {
fromToDocId.set(member.from, member.record.documentId);
}
for (const member of members) {
const sourceDocId = fromToDocId.get(member.to);
if (sourceDocId === undefined) {
throw new Error(
`Reconciler: cycle ${member.record.documentId} -> ${member.to} ` +
`has no member at ${member.to}; graph is not a true cycle`
);
}
const oldBytes = contentByDocId.get(sourceDocId);
if (oldBytes === undefined) {
throw new Error(
`Reconciler: missing pre-read content for ${sourceDocId}`
);
}
oldToContentByDocId.set(member.record.documentId, oldBytes);
}
} catch (e) {
this.logger.error(
`Reconciler: cycle pre-read failed: ${String(e)}; aborting cycle`
);
return;
}
// Write-ahead marker so a crash mid-swap can be repaired on
// next start. Recovery decides what's been written by hashing
// each `from` slot — anything still matching `expectedHashOnFrom`
// hasn't been overwritten yet.
const legs: SwapLeg[] = [];
try {
for (const member of members) {
const memberContent = contentByDocId.get(
member.record.documentId
);
if (memberContent === undefined) {
throw new Error(
`Reconciler: cycle member ${member.record.documentId} missing content`
);
}
legs.push({
documentId: member.record.documentId,
from: member.from,
to: member.to,
expectedHashOnFrom: await hash(memberContent)
});
}
} catch (e) {
this.logger.error(
`Reconciler: cycle hashing failed: ${String(e)}; aborting cycle`
);
return;
}
const markerUuid = crypto.randomUUID();
const markerPath = this.markerPathFor(markerUuid);
const markerBytes = new TextEncoder().encode(
JSON.stringify({ uuid: markerUuid, legs } satisfies SwapMarker)
);
try {
// The marker path embeds a fresh uuid, so a FileAlreadyExistsError
// is statistically impossible here.
await this.operations.create(markerPath, markerBytes);
} catch (e) {
this.logger.error(
`Reconciler: failed to write swap marker ${markerPath}: ${String(e)}; ` +
`aborting cycle`
);
return;
}
// Now apply the writes. Each leg overwrites the bytes at `to`
// with the bytes that were at the cycle predecessor's `from`.
// We pass the freshly-read pre-write content as
// `expectedContent` so the 3-way merge inside `operations.write`
// becomes a clean overwrite (no concurrent edits to merge with).
// `operations.write` registers `expectUpdate` itself, so the
// watcher swallows each leg's modify event.
const writtenLegs: SwapLeg[] = [];
for (const leg of legs) {
const newBytes = contentByDocId.get(leg.documentId);
const oldBytes = oldToContentByDocId.get(leg.documentId);
if (newBytes === undefined || oldBytes === undefined) {
this.logger.error(
`Reconciler: cycle leg ${leg.from} -> ${leg.to} missing ` +
`content; aborting cycle`
);
return;
}
try {
await this.operations.write(leg.to, oldBytes, newBytes);
writtenLegs.push(leg);
} catch (e) {
this.logger.error(
`Reconciler: cycle leg ${leg.from} -> ${leg.to} write failed: ` +
`${String(e)}; cycle is now in a half-applied state — recovery ` +
`marker ${markerPath} will roll forward on next start`
);
// Don't delete the marker — it's load-bearing for
// recovery. The records' localPath assignments are
// intentionally NOT updated for the failed leg or any
// subsequent leg, so the next reconciler pass will
// observe the same situation and re-plan.
return;
}
}
// Re-key records to their new localPaths. We do this AFTER
// all writes succeeded; if a setLocalPath fails partway the
// marker is still on disk and recovery covers it.
for (const leg of writtenLegs) {
try {
await this.queue.setLocalPath(leg.documentId, leg.to);
} catch (e) {
this.logger.error(
`Reconciler: setLocalPath after cycle write failed for ` +
`${leg.documentId}: ${String(e)}`
);
}
}
try {
await this.operations.delete(markerPath);
} catch (e) {
this.logger.warn(
`Reconciler: failed to delete swap marker ${markerPath}: ${String(e)}; ` +
`next start's recovery will see it but find every leg already applied`
);
}
this.logger.debug(
`Reconciler: completed cycle of ${members.length} members`
);
}
private async findSwapMarkerFiles(): Promise<RelativePath[]> {
let entries: RelativePath[] = [];
try {
entries =
await this.operations.listFilesRecursively(SWAP_MARKER_DIR);
} catch (e) {
if (e instanceof FileNotFoundError) {
return [];
}
throw e;
}
return entries.filter((p) => {
const name = p.split("/").pop() ?? "";
return (
name.startsWith(SWAP_MARKER_PREFIX) &&
name.endsWith(SWAP_MARKER_SUFFIX)
);
});
}
private async recoverFromOneMarker(
markerPath: RelativePath
): Promise<void> {
const markerBytes = await this.operations.read(markerPath);
const marker = this.parseSwapMarker(markerBytes);
if (marker === undefined) {
this.logger.error(
`Reconciler: corrupt swap marker ${markerPath}; deleting`
);
try {
await this.operations.delete(markerPath);
} catch (deleteErr) {
this.logger.error(
`Reconciler: failed to delete corrupt marker ${markerPath}: ${String(deleteErr)}`
);
}
return;
}
this.logger.info(
`Reconciler: recovering from interrupted swap ${marker.uuid} ` +
`with ${marker.legs.length} legs`
);
// Recovery rules per leg:
// - hash(from) === expectedHashOnFrom — the swap was
// interrupted BEFORE this leg overwrote `to`. We need to
// write the source bytes to `to` AND update the record.
// - hash(from) differs (or `from` is missing) — this leg
// already ran (someone else's bytes are now at `from`,
// which means the cycle predecessor's leg ran too). Mark
// as already-applied for record bookkeeping.
for (const leg of marker.legs) {
let needsApply = false;
try {
if (await this.operations.exists(leg.from)) {
const fromBytes = await this.operations.read(leg.from);
const fromHash = await hash(fromBytes);
needsApply = fromHash === leg.expectedHashOnFrom;
}
} catch (e) {
this.logger.error(
`Reconciler: hash check during recovery for ${leg.from} failed: ` +
`${String(e)}; skipping leg`
);
continue;
}
if (needsApply) {
try {
const sourceBytes = await this.operations.read(leg.from);
// We don't know what (if anything) is at `to`. If
// it exists we want to overwrite. operations.write
// refuses if the file doesn't exist, so:
if (await this.operations.exists(leg.to)) {
const currentToBytes = await this.operations.read(
leg.to
);
await this.operations.write(
leg.to,
currentToBytes,
sourceBytes
);
} else {
await this.operations.create(leg.to, sourceBytes);
}
} catch (e) {
this.logger.error(
`Reconciler: applying recovery leg ${leg.from} -> ${leg.to} ` +
`failed: ${String(e)}`
);
continue;
}
}
// Whether we just applied or it was already applied,
// update the record so its localPath matches the
// post-swap state.
try {
const record = this.queue.getDocumentByDocumentId(
leg.documentId
);
if (record !== undefined) {
await this.queue.setLocalPath(leg.documentId, leg.to);
}
} catch (e) {
this.logger.error(
`Reconciler: setLocalPath during recovery for ${leg.documentId} ` +
`failed: ${String(e)}`
);
}
}
try {
await this.operations.delete(markerPath);
} catch (e) {
this.logger.error(
`Reconciler: failed to delete swap marker ${markerPath} after recovery: ` +
String(e)
);
}
}
private markerPathFor(uuid: string): RelativePath {
return `${SWAP_MARKER_DIR}/${SWAP_MARKER_PREFIX}${uuid}${SWAP_MARKER_SUFFIX}`;
}
/**
* SCC decomposition over the move graph, returning components in
* leaves-first order (so the caller can process leaves before
* cycles, freeing target slots progressively).
*
* Exploits the fact that this is a *functional graph*: each node
* has at most one outgoing edge (the doc whose slot we want). So
* every non-trivial SCC is a single simple cycle; any non-cycle
* node is its own singleton component. To detect cycles, walk
* from each unvisited node following edges and mark the path; if
* we hit a node on the current path, the segment from that node
* to the current frontier is a cycle. If we hit a visited node
* not on the current path (or a null), we just chain leaves.
*
* Skipped nodes are treated as having no outgoing edge (their
* targets are blocked).
*/
private tarjanSccs(
edges: Map<DocumentId, DocumentId | null>,
skipped: Set<DocumentId>
): DocumentId[][] {
const allNodes = new Set<DocumentId>();
for (const id of edges.keys()) {
allNodes.add(id);
}
for (const id of skipped) {
allNodes.add(id);
}
const visited = new Set<DocumentId>();
const componentOf = new Map<DocumentId, number>();
const sccs: DocumentId[][] = [];
const edgeOf = (node: DocumentId): DocumentId | null => {
if (skipped.has(node)) {
return null;
}
return edges.get(node) ?? null;
};
for (const root of allNodes) {
if (visited.has(root)) {
continue;
}
// Walk forward marking the path until we hit a visited node
// or a null. `pathIndex` lets us detect "did we land back on
// our own path".
const path: DocumentId[] = [];
const pathIndex = new Map<DocumentId, number>();
let cursor: DocumentId | null = root;
while (
cursor !== null &&
!visited.has(cursor) &&
!pathIndex.has(cursor)
) {
pathIndex.set(cursor, path.length);
path.push(cursor);
cursor = edgeOf(cursor);
}
// We stopped because either (a) cursor is null, (b) cursor
// is already visited (chain merges into an earlier-explored
// subgraph — every node on `path` is its own singleton
// component), or (c) cursor is on `path` itself — the
// suffix of `path` from `pathIndex.get(cursor)` onward is a
// cycle; the prefix is a tail of singletons.
let cycleStart = path.length;
if (cursor !== null) {
const idx = pathIndex.get(cursor);
if (idx !== undefined) {
cycleStart = idx;
}
}
// Singletons in `path[0..cycleStart)`. Emit them in
// leaves-first order: the deepest (closest to the cycle or
// chain-end) is the leaf in the DAG of SCCs, so we emit
// from the END of the prefix backward to get topo order
// (children before parents).
for (let i = cycleStart - 1; i >= 0; i--) {
const node = path[i];
visited.add(node);
const componentId = sccs.length;
componentOf.set(node, componentId);
sccs.push([node]);
}
// Cycle (if any).
if (cycleStart < path.length) {
const cycleNodes = path.slice(cycleStart);
const componentId = sccs.length;
for (const node of cycleNodes) {
visited.add(node);
componentOf.set(node, componentId);
}
sccs.push(cycleNodes);
}
}
// The order produced above is mostly leaves-first per chain,
// but chains explored later may include singletons that merge
// into earlier-emitted components. Re-sort by (component points
// to anything? if so, target's component must come first). With
// a functional graph this is equivalent to emitting any node
// before the node it points to. Do a final stable topo sort.
const componentTarget = new Map<number, number | null>();
for (let cid = 0; cid < sccs.length; cid++) {
// Pick a representative; in a functional-graph SCC, every
// node's edge points either inside the SCC (cycle) or to
// exactly one other SCC (singleton chain). For singletons
// the representative's edge gives us the parent component.
const [rep] = sccs[cid];
const edge = edgeOf(rep);
if (edge === null) {
componentTarget.set(cid, null);
} else {
const targetCid = componentOf.get(edge);
if (targetCid === undefined || targetCid === cid) {
componentTarget.set(cid, null);
} else {
componentTarget.set(cid, targetCid);
}
}
}
// Topo-sort: emit a component only after its target has been
// emitted.
const emitted = new Set<number>();
const ordered: DocumentId[][] = [];
const tryEmit = (cid: number, stack: Set<number>): void => {
if (emitted.has(cid)) {
return;
}
if (stack.has(cid)) {
return;
} // shouldn't happen given functional-graph SCC contraction
stack.add(cid);
const target = componentTarget.get(cid) ?? null;
if (target !== null) {
tryEmit(target, stack);
}
stack.delete(cid);
if (!emitted.has(cid)) {
emitted.add(cid);
ordered.push(sccs[cid]);
}
};
for (let cid = 0; cid < sccs.length; cid++) {
tryEmit(cid, new Set());
}
return ordered;
}
private parseSwapMarker(bytes: Uint8Array): SwapMarker | undefined {
// Marker files are written by us (`writeSwapMarker`) and only
// consumed here on startup recovery; the shape is closed. Treat
// a parse failure as a corrupt marker.
const parsed = tryParseSwapMarker(bytes);
if (
parsed === undefined ||
typeof parsed.uuid !== "string" ||
!Array.isArray(parsed.legs)
) {
return undefined;
}
return parsed;
}
}

View file

@ -1,20 +1,56 @@
import { describe, it } from "node:test";
import assert from "node:assert";
import { SyncEventQueue } from "./sync-event-queue";
import {
STORED_STATE_SCHEMA_VERSION,
SyncEventQueue
} from "./sync-event-queue";
import { Settings } from "../persistence/settings";
import { Logger } from "../tracing/logger";
import type { DocumentVersionWithoutContent } from "../services/types/DocumentVersionWithoutContent";
import { SyncEventType } from "./types";
import type { DocumentRecord, RelativePath } from "./types";
import type { DocumentRecord, RelativePath, StoredSyncState } from "./types";
interface QueueHarness {
queue: SyncEventQueue;
settings: Settings;
saveCalls: StoredSyncState[];
}
function createHarness(
options: {
ignorePatterns?: string[];
initialState?: Partial<StoredSyncState>;
omitSchemaVersion?: boolean;
} = {}
): QueueHarness {
const logger = new Logger();
const settings = new Settings(
logger,
{ ignorePatterns: options.ignorePatterns ?? [] },
async () => {
/* no-op */
}
);
const saveCalls: StoredSyncState[] = [];
const initialState: Partial<StoredSyncState> | undefined =
options.initialState === undefined && options.omitSchemaVersion !== true
? { schemaVersion: STORED_STATE_SCHEMA_VERSION }
: options.initialState;
const queue = new SyncEventQueue(
settings,
logger,
initialState,
async (data) => {
saveCalls.push(data);
}
);
return { queue, settings, saveCalls };
}
function createQueue(ignorePatterns: string[] = []): SyncEventQueue {
const logger = new Logger();
const settings = new Settings(logger, { ignorePatterns }, async () => {
/* no-op */
});
return new SyncEventQueue(settings, logger, undefined, async () => {
/* no-op */
});
return createHarness({ ignorePatterns }).queue;
}
function fakeRemoteVersion(
@ -39,12 +75,13 @@ function fakeRecord(
documentId: string,
overrides: Partial<DocumentRecord> = {}
): DocumentRecord {
const path = `${documentId.toLowerCase()}.md`;
return {
path: `${documentId.toLowerCase()}.md`,
documentId,
parentVersionId: 1,
remoteHash: `hash-${documentId}`,
remoteRelativePath: `${documentId.toLowerCase()}.md`,
remoteRelativePath: path,
localPath: path,
...overrides
};
}
@ -52,7 +89,7 @@ function fakeRecord(
describe("SyncEventQueue", () => {
it("returns enqueued events in FIFO order with no coalescing", async () => {
const queue = createQueue();
await queue.setDocument("a.md", fakeRecord("A"));
await queue.upsertRecord(fakeRecord("A"));
await queue.enqueue({ type: SyncEventType.LocalCreate, path: "b.md" });
await queue.enqueue({ type: SyncEventType.LocalCreate, path: "c.md" });
@ -87,7 +124,7 @@ describe("SyncEventQueue", () => {
it("delete resolves documentId from path", async () => {
const queue = createQueue();
await queue.setDocument("a.md", fakeRecord("A"));
await queue.upsertRecord(fakeRecord("A"));
await queue.enqueue({ type: SyncEventType.LocalDelete, path: "a.md" });
@ -105,42 +142,161 @@ describe("SyncEventQueue", () => {
assert.strictEqual(queue.pendingUpdateCount, 0);
});
it("delete clears the localPath of the affected record", async () => {
const queue = createQueue();
await queue.upsertRecord(fakeRecord("A"));
await queue.enqueue({ type: SyncEventType.LocalDelete, path: "a.md" });
const record = queue.getDocumentByDocumentId("A");
assert.ok(record !== undefined);
assert.strictEqual(record.localPath, undefined);
assert.strictEqual(
queue.getRecordByLocalPath("a.md" as RelativePath),
undefined
);
});
it("document store CRUD operations work correctly", async () => {
const queue = createQueue();
assert.strictEqual(queue.getSettledDocumentByPath("a.md"), undefined);
assert.strictEqual(
queue.getRecordByLocalPath("a.md" as RelativePath),
undefined
);
assert.strictEqual(queue.syncedDocumentCount, 0);
await queue.setDocument("a.md", fakeRecord("A"));
await queue.upsertRecord(fakeRecord("A"));
assert.strictEqual(queue.syncedDocumentCount, 1);
assert.deepStrictEqual(
queue.getSettledDocumentByPath("a.md"),
fakeRecord("A")
);
const settled = queue.getRecordByLocalPath("a.md" as RelativePath);
assert.strictEqual(settled?.documentId, "A");
assert.strictEqual(settled.localPath, "a.md");
assert.strictEqual(settled.remoteRelativePath, "a.md");
const found = queue.getDocumentByDocumentId("A");
assert.strictEqual(found?.path, "a.md");
assert.strictEqual(found?.localPath, "a.md");
assert.strictEqual(found.documentId, "A");
await queue.removeDocument("a.md");
await queue.removeDocumentById("A");
assert.strictEqual(queue.syncedDocumentCount, 0);
assert.strictEqual(queue.getSettledDocumentByPath("a.md"), undefined);
assert.strictEqual(
queue.getRecordByLocalPath("a.md" as RelativePath),
undefined
);
assert.strictEqual(queue.getDocumentByDocumentId("A"), undefined);
});
it("SyncLocal with oldPath moves the document in the store", async () => {
it("LocalUpdate with oldPath moves the document on disk", async () => {
const queue = createQueue();
await queue.setDocument("a.md", fakeRecord("A"));
await queue.upsertRecord(fakeRecord("A"));
await queue.enqueue({
type: SyncEventType.LocalUpdate,
path: "b.md",
oldPath: "a.md"
});
assert.strictEqual(queue.getSettledDocumentByPath("a.md"), undefined);
assert.strictEqual(
queue.getSettledDocumentByPath("b.md")?.documentId,
queue.getRecordByLocalPath("a.md" as RelativePath),
undefined
);
const moved = queue.getRecordByLocalPath("b.md" as RelativePath);
assert.strictEqual(moved?.documentId, "A");
assert.strictEqual(moved.localPath, "b.md");
// The doc's remoteRelativePath is owned by the wire loop, not the
// watcher path — a local rename does not move the server-side path.
assert.strictEqual(moved.remoteRelativePath, "a.md");
});
it("LocalUpdate rename onto a tracked slot enqueues a delete for the displaced doc", async () => {
const queue = createQueue();
await queue.upsertRecord(fakeRecord("A"));
await queue.upsertRecord(fakeRecord("B"));
// User renames a.md onto b.md, clobbering b.md on disk.
await queue.enqueue({
type: SyncEventType.LocalUpdate,
path: "b.md",
oldPath: "a.md"
});
// Doc A now lives at b.md.
const aRecord = queue.getDocumentByDocumentId("A");
assert.strictEqual(aRecord?.localPath, "b.md");
const slot = queue.getRecordByLocalPath("b.md" as RelativePath);
assert.strictEqual(slot?.documentId, "A");
// Doc B has no local file anymore (its bytes were overwritten).
const bRecord = queue.getDocumentByDocumentId("B");
assert.strictEqual(bRecord?.localPath, undefined);
// Two events should be queued: the LocalDelete for B, then the
// LocalUpdate for A (push order in `enqueue`).
assert.strictEqual(queue.pendingUpdateCount, 2);
const first = await queue.next();
assert.strictEqual(first?.type, SyncEventType.LocalDelete);
assert.strictEqual(first.documentId, "B");
assert.strictEqual(first.path, "b.md");
const second = await queue.next();
assert.strictEqual(second?.type, SyncEventType.LocalUpdate);
assert.strictEqual(second.documentId, "A");
assert.strictEqual(second.path, "b.md");
assert.strictEqual(second.isUserRename, true);
});
it("byLocalPath stays consistent across upsertRecord, setLocalPath, and rename", async () => {
const queue = createQueue();
await queue.upsertRecord(fakeRecord("A"));
assert.strictEqual(queue.byLocalPath.size, 1);
assert.strictEqual(
queue.byLocalPath.get("a.md" as RelativePath)?.documentId,
"A"
);
// upsertRecord that relocates the localPath should re-key.
await queue.upsertRecord(
fakeRecord("A", { localPath: "renamed.md" as RelativePath })
);
assert.strictEqual(queue.byLocalPath.size, 1);
assert.strictEqual(
queue.byLocalPath.get("a.md" as RelativePath),
undefined
);
assert.strictEqual(
queue.byLocalPath.get("renamed.md" as RelativePath)?.documentId,
"A"
);
// setLocalPath should re-key.
await queue.setLocalPath("A", "later.md" as RelativePath);
assert.strictEqual(queue.byLocalPath.size, 1);
assert.strictEqual(
queue.byLocalPath.get("renamed.md" as RelativePath),
undefined
);
assert.strictEqual(
queue.byLocalPath.get("later.md" as RelativePath)?.documentId,
"A"
);
// setLocalPath to undefined should drop the entry.
await queue.setLocalPath("A", undefined);
assert.strictEqual(queue.byLocalPath.size, 0);
assert.strictEqual(
queue.byLocalPath.get("later.md" as RelativePath),
undefined
);
// The record is still tracked by docId.
assert.strictEqual(
queue.getDocumentByDocumentId("A")?.localPath,
undefined
);
});
it("create can be re-enqueued after being dequeued", async () => {
@ -178,9 +334,52 @@ describe("SyncEventQueue", () => {
assert.strictEqual(queue.pendingUpdateCount, 2);
});
it("addInternalIgnorePattern hides paths from enqueue and survives settings reload", async () => {
const harness = createHarness({ ignorePatterns: ["*.tmp"] });
const { queue, settings } = harness;
queue.addInternalIgnorePattern(".vaultlink/**");
await queue.enqueue({
type: SyncEventType.LocalCreate,
path: ".vaultlink/swap"
});
assert.strictEqual(queue.pendingUpdateCount, 0);
// User-pattern matching still works alongside the internal pattern.
await queue.enqueue({
type: SyncEventType.LocalCreate,
path: "scratch.tmp"
});
assert.strictEqual(queue.pendingUpdateCount, 0);
// Settings reload must not forget the internal pattern.
await settings.setSettings({ ignorePatterns: ["*.bak"] });
await queue.enqueue({
type: SyncEventType.LocalCreate,
path: ".vaultlink/another"
});
assert.strictEqual(queue.pendingUpdateCount, 0);
// The new user pattern took effect.
await queue.enqueue({
type: SyncEventType.LocalCreate,
path: "old.bak"
});
assert.strictEqual(queue.pendingUpdateCount, 0);
// And paths outside both pattern sets still pass through.
await queue.enqueue({
type: SyncEventType.LocalCreate,
path: "notes.md"
});
assert.strictEqual(queue.pendingUpdateCount, 1);
});
it("clearPending removes events but keeps documents", async () => {
const queue = createQueue();
await queue.setDocument("a.md", fakeRecord("A"));
await queue.upsertRecord(fakeRecord("A"));
await queue.enqueue({ type: SyncEventType.LocalCreate, path: "b.md" });
await queue.enqueue({ type: SyncEventType.LocalCreate, path: "c.md" });
@ -191,15 +390,18 @@ describe("SyncEventQueue", () => {
assert.strictEqual(queue.pendingUpdateCount, 0);
assert.strictEqual(queue.syncedDocumentCount, 1);
assert.strictEqual(
queue.getSettledDocumentByPath("a.md")?.documentId,
queue.getRecordByLocalPath("a.md" as RelativePath)?.documentId,
"A"
);
});
it("allSettledDocuments returns all tracked documents", async () => {
it("allSettledDocuments returns all tracked documents that have a localPath", async () => {
const queue = createQueue();
await queue.setDocument("a.md", fakeRecord("A"));
await queue.setDocument("b.md", fakeRecord("B"));
await queue.upsertRecord(fakeRecord("A"));
await queue.upsertRecord(fakeRecord("B"));
// A doc with no local file (e.g. a remote create whose slot was
// occupied) should not appear in the localPath-keyed view.
await queue.upsertRecord(fakeRecord("C", { localPath: undefined }));
const docs = queue.allSettledDocuments();
assert.strictEqual(docs.size, 2);
@ -208,37 +410,70 @@ describe("SyncEventQueue", () => {
});
it("loads initial state from persistence", () => {
const logger = new Logger();
const settings = new Settings(logger, {}, async () => {
/* no-op */
});
const queue = new SyncEventQueue(
settings,
logger,
{
const harness = createHarness({
initialState: {
schemaVersion: STORED_STATE_SCHEMA_VERSION,
documents: [
fakeRecord("A", { path: "a.md", parentVersionId: 5 }),
fakeRecord("B", { path: "b.md", parentVersionId: 3 })
fakeRecord("A", { parentVersionId: 5 }),
fakeRecord("B", { parentVersionId: 3 })
],
lastSeenUpdateId: 4
},
async () => {
/* no-op */
}
);
});
const { queue } = harness;
assert.strictEqual(queue.syncedDocumentCount, 2);
assert.strictEqual(
queue.getSettledDocumentByPath("a.md")?.documentId,
queue.getRecordByLocalPath("a.md" as RelativePath)?.documentId,
"A"
);
assert.strictEqual(
queue.getSettledDocumentByPath("b.md")?.documentId,
queue.getRecordByLocalPath("b.md" as RelativePath)?.documentId,
"B"
);
assert.strictEqual(queue.lastSeenUpdateId, 4);
});
it("constructor with mismatched schema version wipes state and saves the new version", () => {
const harness = createHarness({
initialState: {
schemaVersion: 0,
documents: [fakeRecord("A"), fakeRecord("B")],
lastSeenUpdateId: 7
}
});
// Persisted documents and watermark were discarded.
assert.strictEqual(harness.queue.syncedDocumentCount, 0);
assert.strictEqual(harness.queue.lastSeenUpdateId, 0);
// The constructor scheduled a save (don't await — fire-and-forget),
// but we synchronously enqueued it so it should have landed by now.
// The recorded save uses the current schema version.
assert.ok(harness.saveCalls.length >= 1);
const last = harness.saveCalls[harness.saveCalls.length - 1];
assert.strictEqual(last.schemaVersion, STORED_STATE_SCHEMA_VERSION);
assert.deepStrictEqual(last.documents, []);
assert.strictEqual(last.lastSeenUpdateId, 0);
});
it("constructor with missing schema version also wipes state", () => {
const harness = createHarness({
initialState: {
documents: [fakeRecord("A")],
lastSeenUpdateId: 3
}
});
assert.strictEqual(harness.queue.syncedDocumentCount, 0);
assert.strictEqual(harness.queue.lastSeenUpdateId, 0);
assert.ok(harness.saveCalls.length >= 1);
assert.strictEqual(
harness.saveCalls[harness.saveCalls.length - 1].schemaVersion,
STORED_STATE_SCHEMA_VERSION
);
});
it("resolveCreate settles the document and resolves the create promise", async () => {
const queue = createQueue();
@ -250,12 +485,16 @@ describe("SyncEventQueue", () => {
await queue.resolveCreate(
event,
fakeRecord("DOC-1", { parentVersionId: 5 })
fakeRecord("DOC-1", {
parentVersionId: 5,
localPath: "a.md" as RelativePath,
remoteRelativePath: "a.md" as RelativePath
})
);
// Document is now settled
assert.strictEqual(
queue.getSettledDocumentByPath("a.md")?.documentId,
queue.getRecordByLocalPath("a.md" as RelativePath)?.documentId,
"DOC-1"
);
@ -279,22 +518,132 @@ describe("SyncEventQueue", () => {
it("hasPendingEventsForPath reflects pending events", async () => {
const queue = createQueue();
await queue.setDocument("a.md", fakeRecord("A"));
await queue.upsertRecord(fakeRecord("A"));
assert.strictEqual(queue.hasPendingEventsForPath("a.md"), false);
assert.strictEqual(
queue.hasPendingEventsForPath("a.md" as RelativePath),
false
);
await queue.enqueue({ type: SyncEventType.LocalDelete, path: "a.md" });
assert.strictEqual(queue.hasPendingEventsForPath("a.md"), true);
// After a delete the localPath is cleared; an unknown path is treated
// as "must be pending creation", so this still returns true.
assert.strictEqual(
queue.hasPendingEventsForPath("a.md" as RelativePath),
true
);
});
it("setLocalPath displaces a previous holder of the same path", async () => {
const queue = createQueue();
await queue.upsertRecord(fakeRecord("A"));
await queue.upsertRecord(
fakeRecord("B", { localPath: "b.md" as RelativePath })
);
// Move B onto a.md — the slot already held by A. The invariant
// requires A's localPath to be cleared (placement-pending),
// and byLocalPath["a.md"] === B.
await queue.setLocalPath("B", "a.md" as RelativePath);
const a = queue.getDocumentByDocumentId("A");
const b = queue.getDocumentByDocumentId("B");
assert.strictEqual(a?.localPath, undefined);
assert.strictEqual(b?.localPath, "a.md");
assert.strictEqual(
queue.getRecordByLocalPath("a.md" as RelativePath)?.documentId,
"B"
);
// B's old slot is now empty — nothing else moved into it.
assert.strictEqual(
queue.getRecordByLocalPath("b.md" as RelativePath),
undefined
);
});
it("upsertRecord displaces a previous holder of the same path", async () => {
const queue = createQueue();
await queue.upsertRecord(fakeRecord("A"));
// A new record (different docId) claims a.md. The prior holder
// (A) must be displaced — its localPath cleared, and
// byLocalPath["a.md"] now points at the new record.
await queue.upsertRecord(
fakeRecord("B", { localPath: "a.md" as RelativePath })
);
const a = queue.getDocumentByDocumentId("A");
const b = queue.getDocumentByDocumentId("B");
assert.strictEqual(a?.localPath, undefined);
assert.strictEqual(b?.localPath, "a.md");
assert.strictEqual(
queue.getRecordByLocalPath("a.md" as RelativePath)?.documentId,
"B"
);
});
it("the localPath/byLocalPath invariant holds across rename + recreate cycles", async () => {
// Construct the exact same-path create cycle that produces the
// bug-D race: docA at P, then docB created at P (via
// upsertRecord), and finally a setLocalPath that would move a
// third doc onto P. The invariant must hold at every step:
// exactly one record has localPath===P at any given time, and
// byLocalPath.get(P) returns it.
const queue = createQueue();
const path = "p.md" as RelativePath;
await queue.upsertRecord(
fakeRecord("A", { localPath: path, remoteRelativePath: path })
);
// Sanity: A holds the slot.
assert.strictEqual(queue.getRecordByLocalPath(path)?.documentId, "A");
assert.strictEqual(queue.getDocumentByDocumentId("A")?.localPath, path);
// docB created at P via upsertRecord (e.g. a remote create
// that races A's local file onto the same slot). A must be
// displaced.
await queue.upsertRecord(
fakeRecord("B", { localPath: path, remoteRelativePath: path })
);
assert.strictEqual(
queue.getDocumentByDocumentId("A")?.localPath,
undefined
);
assert.strictEqual(queue.getDocumentByDocumentId("B")?.localPath, path);
assert.strictEqual(queue.getRecordByLocalPath(path)?.documentId, "B");
// Now setLocalPath moves a third doc C onto P. B must in turn
// be displaced; the invariant still holds.
await queue.upsertRecord(
fakeRecord("C", { localPath: "c.md" as RelativePath })
);
await queue.setLocalPath("C", path);
assert.strictEqual(
queue.getDocumentByDocumentId("B")?.localPath,
undefined
);
assert.strictEqual(queue.getDocumentByDocumentId("C")?.localPath, path);
assert.strictEqual(queue.getRecordByLocalPath(path)?.documentId, "C");
// Across the whole cycle exactly one record holds the slot.
const holders = Array.from(queue.allRecords()).filter(
(r) => r.localPath === path
);
assert.strictEqual(holders.length, 1);
assert.strictEqual(holders[0].documentId, "C");
});
it("clearAllState clears everything", async () => {
const queue = createQueue();
await queue.setDocument("a.md", fakeRecord("A"));
await queue.upsertRecord(fakeRecord("A"));
await queue.enqueue({ type: SyncEventType.LocalCreate, path: "b.md" });
await queue.clearAllState();
assert.strictEqual(queue.syncedDocumentCount, 0);
assert.strictEqual(queue.pendingUpdateCount, 0);
assert.strictEqual(queue.byLocalPath.size, 0);
});
});

View file

@ -1,7 +1,6 @@
import type { Settings } from "../persistence/settings";
import type { Logger } from "../tracing/logger";
import { globsToRegexes } from "../utils/globs-to-regexes";
import { CONFLICT_PATH_REGEX } from "./conflict-path";
import { removeFromArray } from "../utils/remove-from-array";
import { EventListeners } from "../utils/data-structures/event-listeners";
import {
@ -16,6 +15,8 @@ import {
} from "./types";
import { MinCovered } from "../utils/data-structures/min-covered";
export const STORED_STATE_SCHEMA_VERSION = 2;
export class SyncEventQueue {
// Fires synchronously whenever the events array length changes (push, pop,
// remove, bulk-clear). The Syncer mirrors this into its public count
@ -27,13 +28,17 @@ export class SyncEventQueue {
private readonly _lastSeenUpdateId: MinCovered;
// Latest state of the filesystem as we know it, excluding
// unconfirmed creates but including pending deletes.
//
// It's always indexed by the latest path on disk.
//
// It maps a subset of the remote state onto the local filesystem.
private readonly documents = new Map<RelativePath, DocumentRecord>();
// Primary index of every settled document, keyed by docId. The wire loop
// (records ↔ server) updates `remoteRelativePath` here as the server
// assigns/relocates a doc; the Reconciler (records ↔ disk) updates
// `localPath` here as it places files on disk.
private readonly byDocId = new Map<DocumentId, DocumentRecord>();
// Derived index from `localPath -> record`. Maintained alongside every
// mutation that touches `localPath` so callers (the watcher path through
// `enqueue`, the Reconciler) get O(1) lookups by disk location. Only
// contains records whose `localPath !== undefined`.
private readonly _byLocalPath = new Map<RelativePath, DocumentRecord>();
// All outstanding operations in order of occurrence,
// can include multiple generations of the same document,
@ -50,12 +55,28 @@ export class SyncEventQueue {
// because the user explicitly told us to ignore them.
private userIgnorePatterns: RegExp[];
// Whether `CONFLICT_PATH_REGEX` is applied at enqueue time. Conflict files
// exist because the syncer set them aside; ignoring them at runtime
// prevents resync churn. During an offline scan we DO want to surface them
// so a stranded conflict file (e.g. one this client previously displaced
// and was unable to re-sync) gets picked up as a normal new file.
private ignoreConflictPaths = true;
// Hard-coded ignores that callers (e.g. the Syncer for `.vaultlink/**`
// swap-marker files) pin via `addInternalIgnorePattern`. Folded into
// `userIgnorePatterns` so the existing match path doesn't need to know
// about two arrays. Stored separately so a later `onSettingsChanged`
// event that re-derives `userIgnorePatterns` from settings doesn't
// forget the internal patterns.
private readonly internalIgnorePatterns: RegExp[] = [];
// DocIds whose HTTP DELETE has been acked by the server but whose
// WebSocket-receipt-driven `removeDocumentById` hasn't run yet (the
// record is still in `byDocId` because the wire loop keeps it around to
// recognise late remote updates as "file is missing"). The Reconciler
// and the remote-update wire-loop handlers consult this set to skip any
// work that would resurrect the doc — without it, a placement-pending
// record (`localPath === undefined` after the LocalDelete enqueue) would
// be re-fetched from the server and written back to disk, or a late
// RemoteChange for the same doc would stash the pre-delete bytes into
// `pendingPlacementContent` for the Reconciler to "place".
//
// Cleared as a side effect of `removeDocumentById`. Also cleared on
// `clearAllState` / schema-version-mismatch reset.
private readonly _pendingServerDeletes = new Set<DocumentId>();
public constructor(
private readonly settings: Settings,
@ -69,17 +90,52 @@ export class SyncEventQueue {
);
this.settings.onSettingsChanged.add((newSettings) => {
this.userIgnorePatterns = globsToRegexes(
newSettings.ignorePatterns,
this.logger
);
this.userIgnorePatterns = [
...globsToRegexes(newSettings.ignorePatterns, this.logger),
...this.internalIgnorePatterns
];
});
initialState ??= {};
const persistedSchemaVersion = initialState.schemaVersion;
if (persistedSchemaVersion !== STORED_STATE_SCHEMA_VERSION) {
this.logger.info(
`Persisted state schema version is ${persistedSchemaVersion ?? "unset"}, expected ${STORED_STATE_SCHEMA_VERSION}; discarding persisted documents and watermark so the offline scan re-derives state from disk`
);
initialState = {};
// Schedule a save so the new schema version sticks even if the user
// never makes a change. Don't await here (constructor is sync); the
// first real save in `save()` will pin it down anyway.
void this.saveData({
schemaVersion: STORED_STATE_SCHEMA_VERSION,
documents: [],
lastSeenUpdateId: 0
});
}
if (initialState.documents !== undefined) {
for (const record of initialState.documents) {
this.documents.set(record.path, record);
this.byDocId.set(record.documentId, record);
if (record.localPath !== undefined) {
// Defensive: if two persisted records share the same
// localPath (shouldn't happen given the invariant
// enforced at every mutation point, but persisted
// state from older buggy versions could violate it),
// displace the prior holder so we don't end up with
// a shadowed record on load.
const displaced = this._byLocalPath.get(record.localPath);
if (displaced !== undefined && displaced !== record) {
displaced.localPath = undefined;
this.logger.warn(
`Persisted state had two records sharing localPath ` +
`${record.localPath} (${displaced.documentId} and ` +
`${record.documentId}); clearing the prior holder's ` +
`localPath so the reconciler re-places it`
);
}
this._byLocalPath.set(record.localPath, record);
}
}
}
this._lastSeenUpdateId = new MinCovered(
@ -87,7 +143,7 @@ export class SyncEventQueue {
);
this.logger.debug(
`Loaded ${this.documents.size} documents and lastSeenUpdateId=${this._lastSeenUpdateId.min} from storage`
`Loaded ${this.byDocId.size} documents and lastSeenUpdateId=${this._lastSeenUpdateId.min} from storage`
);
}
@ -96,7 +152,17 @@ export class SyncEventQueue {
}
public get syncedDocumentCount(): number {
return this.documents.size;
return this.byDocId.size;
}
/**
* Read-only view of the `localPath -> record` index. Use for O(1) lookups
* by disk location; the index is maintained by every mutation that
* touches `localPath` (`upsertRecord`, `setLocalPath`, the rename branch
* of `enqueue`, `removeDocumentById`).
*/
public get byLocalPath(): ReadonlyMap<RelativePath, DocumentRecord> {
return this._byLocalPath;
}
public get lastSeenUpdateId(): VaultUpdateId {
@ -108,12 +174,17 @@ export class SyncEventQueue {
}
/**
* Toggle whether `CONFLICT_PATH_REGEX` filters incoming events. The
* offline scan flips this off so a stranded conflict file gets surfaced
* as a regular create; everywhere else conflict files stay ignored.
* Pin an additional ignore pattern that survives setting reloads. Used
* by the Syncer to hide internal scratch paths (e.g. `.vaultlink/**`
* swap markers written by the Reconciler) from the watcher-driven
* enqueue path. The pattern is compiled with the same `globsToRegexes`
* used for user-configurable ignores; matching uses the existing
* userIgnorePatterns array so there's only one match path.
*/
public setIgnoreConflictPaths(ignore: boolean): void {
this.ignoreConflictPaths = ignore;
public addInternalIgnorePattern(pattern: string): void {
const compiled = globsToRegexes([pattern], this.logger);
this.internalIgnorePatterns.push(...compiled);
this.userIgnorePatterns.push(...compiled);
}
public async enqueue(input: FileSyncEvent): Promise<void> {
@ -135,32 +206,10 @@ export class SyncEventQueue {
return;
}
// Drop bare LocalCreate events for conflict paths. Those are
// produced by the watcher when the syncer's own write to a
// displacement path slips past the `ExpectedFsEvents` filter
// (e.g. a sync race where the watcher fires before
// `expectCreate` was registered). Re-uploading them as new docs
// would invent duplicates on the server. The legitimate way a
// conflict-path doc enters the queue is via the displacement
// rename's `LocalUpdate` (with `oldPath`) — that branch is
// allowed through below so the tracked document's path follows
// its file.
if (
this.ignoreConflictPaths &&
CONFLICT_PATH_REGEX.test(path) &&
input.type === SyncEventType.LocalCreate
) {
this.logger.info(
`Ignoring local-create for ${path} as it is a conflict path`
);
return;
}
if (input.type === SyncEventType.LocalCreate) {
this.events.push({
type: SyncEventType.LocalCreate,
path,
originalPath: path,
resolvers: Promise.withResolvers()
});
this.notifyPendingUpdateCountChanged();
@ -169,10 +218,10 @@ export class SyncEventQueue {
const lookupPath =
input.type === SyncEventType.LocalUpdate &&
input.oldPath !== undefined
input.oldPath !== undefined
? input.oldPath
: path;
const record = this.documents.get(lookupPath);
const record = this._byLocalPath.get(lookupPath);
// latest creation must take precedence as it's from the doc's latest generation
const pendingDocumentId: Promise<DocumentId> | undefined =
@ -180,18 +229,30 @@ export class SyncEventQueue {
const documentId: DocumentId | undefined = record?.documentId;
if (pendingDocumentId === undefined && documentId === undefined) {
const effectiveDocumentId:
| Promise<DocumentId>
| DocumentId
| undefined = pendingDocumentId ?? documentId;
if (effectiveDocumentId === undefined) {
// we can get here when deleting a local document after a remote update
return;
}
if (input.type === SyncEventType.LocalDelete) {
// Push BEFORE awaiting `setLocalPath` (and its inner `save()`).
// See the comment below on the synchronicity contract with
// `ensureDraining()`.
this.events.push({
type: SyncEventType.LocalDelete,
documentId: (pendingDocumentId ?? documentId)!,
documentId: effectiveDocumentId,
path: lookupPath
});
this.notifyPendingUpdateCountChanged();
if (record !== undefined) {
// The file is gone from disk; clear the doc's localPath so the
// Reconciler doesn't try to operate on a vacated slot.
await this.setLocalPath(record.documentId, undefined);
}
return;
}
@ -211,45 +272,47 @@ export class SyncEventQueue {
// overwrote that file), that doc effectively no longer
// exists locally — its content was clobbered. Without
// explicitly recording the loss the doc would silently
// drop out of the documents map below and we'd skip
// drop out of the byLocalPath index below and we'd skip
// notifying the server, leaving a phantom on the remote
// that other agents still see. Enqueue a LocalDelete for
// it so the server learns about the deletion.
const displacedRecord = this.documents.get(path);
const displacedRecord = this._byLocalPath.get(path);
if (
displacedRecord !== undefined &&
displacedRecord.documentId !== documentId
displacedRecord.documentId !== record.documentId
) {
this.events.push({
type: SyncEventType.LocalDelete,
documentId: displacedRecord.documentId,
// The doc still lives at `path` on the server; the
// OS rename only overwrote our local file. Snapshot
// the path so `processDelete` can issue the server
// DELETE even after `documents.set(path, record)`
// below removes the entry from the map.
// Snapshot the path; once we move `record` onto
// `path` below the displaced doc will no longer
// resolve via `byLocalPath`.
path
});
// Drop the displaced doc's localPath: its file on
// disk is gone (overwritten by the rename).
// Mutate synchronously so the byLocalPath index is
// correct before we move `record` onto the same
// slot below; the persist runs in the trailing
// `save()` so we don't await before pushing the
// LocalUpdate (synchronicity contract).
this.mutateLocalPathInPlace(displacedRecord, undefined);
needsSave = true;
}
// Inlined relocation: same shape as `setDocument`'s
// relocation branch (mutate the record's path in place,
// delete-old, set-new, retarget queued LocalUpdates) but
// kept synchronous. Callers fire `enqueue` with `void`
// and immediately call `ensureDraining()`; if we awaited
// `setDocument()` here, the LocalUpdate push below would
// happen after the await and the drain that already
// started would see an empty queue, exit, and leave the
// event stranded. We mutate `record.path` rather than
// re-creating it so any reference held by an in-flight
// drain handler sees the new path on its next read.
record.path = path;
this.documents.delete(input.oldPath);
this.documents.set(path, record);
// Move record's localPath onto the new slot. We mutate
// the record in place rather than re-creating it so any
// held reference (drain handlers, queued events) sees
// the new path on its next read.
this.mutateLocalPathInPlace(record, path);
// Retarget any queued LocalUpdates for this doc onto
// the new path. The queue's invariant — and what
// `skipIfOversized` and the watcher dedup checks bake
// in — is that `event.path` always points at the doc's
// current disk location.
for (const e of this.events) {
// It already has a docId, so there can't be a pending create event for it
if (
e.type === SyncEventType.LocalUpdate &&
e.documentId === documentId
e.documentId === record.documentId
) {
e.path = path;
}
@ -258,11 +321,14 @@ export class SyncEventQueue {
}
}
// Push BEFORE awaiting `save()`. See the comment above on the
// synchronicity contract with `ensureDraining()`.
// Push BEFORE awaiting `save()`. The synchronicity contract is:
// `Syncer.ensureDraining()` runs immediately after each `enqueue`,
// and the drain only sees what's in `events[]`. Pushing after an
// await would let the drain start, see an empty queue, exit, and
// leave the event stranded.
this.events.push({
type: SyncEventType.LocalUpdate,
documentId: (pendingDocumentId ?? documentId)!,
documentId: effectiveDocumentId,
path,
originalPath: path,
isUserRename
@ -282,7 +348,6 @@ export class SyncEventQueue {
return event;
}
/**
* Return the next event without removing it. Drain uses this so the
* event stays visible in the queue while it is being processed
@ -308,7 +373,6 @@ export class SyncEventQueue {
}
}
/**
* Call once a create has been acknowledged by the server.
*
@ -316,7 +380,7 @@ export class SyncEventQueue {
* this create was still in-flight carry the create's `resolvers.promise`
* as their `documentId` (see the `pendingDocumentId` branch of
* `enqueue`). We must rewrite those references to the resolved string
* id *before* calling `setDocument`, otherwise its event-rewrite loop
* id *before* calling `upsertRecord`, otherwise its event-rewrite loop
* (which compares `e.documentId === record.documentId`) would silently
* skip them leaving their `event.path` pointing at the pre-rename
* slot and causing the next drain step's `getFileSize(event.path)` to
@ -333,7 +397,7 @@ export class SyncEventQueue {
event.resolvers.promise,
record.documentId
);
await this.setDocument(event.path, record);
await this.upsertRecord(record);
event.resolvers.resolve(record.documentId);
}
@ -360,76 +424,118 @@ export class SyncEventQueue {
}
/**
* Update the settled document map and persist the new document version.
* Insert or merge a document record by `documentId`. When a record with
* the same docId already exists it is mutated in place so any held
* references (drain handlers, queued events) keep seeing the up-to-date
* fields on their next read this stays load-bearing for the Syncer's
* drain handlers, which await across HTTP roundtrips.
*
* If the document is already tracked under a different path (e.g. after a
* rename) the old entry is removed so the map stays keyed by the latest
* disk path and `getDocumentByDocumentId` can't return a stale match.
*
* Whenever this relocates a tracked doc it also rewrites the `path`
* field of every queued `LocalUpdate` for the same doc. The invariant
* the queue relies on and that `skipIfOversized` and the watcher
* dedup checks bake in is that `event.path` always points at the
* doc's current disk location. Letting the map move out from under
* the events would leave readers like `getFileSize(event.path)`
* pointing at a vacated slot and silently swallowing the event.
* Maintains the `byLocalPath` index. If the `localPath` changes the
* relocation goes through `setLocalPath` (which also persists), so the
* caller doesn't need to call `save()` separately.
*/
public async setDocument(
path: RelativePath,
record: DocumentRecord
): Promise<void> {
// If a record for the same docId is already tracked, mutate it in
// place instead of inserting a fresh object. Callers (drain
// handlers, queued events) hold long-lived references to the
// record and read `.path` from it on every access — replacing the
// reference would orphan those reads at the old object's path
// value. Keeping the same object identity also keeps the
// `documents.get(record.path) === record` invariant trivially
// true after a rename.
let target: DocumentRecord | undefined;
for (const [existingPath, existingRecord] of this.documents) {
if (existingRecord.documentId === record.documentId) {
target = existingRecord;
if (existingPath !== path) {
this.documents.delete(existingPath);
}
public async upsertRecord(record: DocumentRecord): Promise<void> {
const existing = this.byDocId.get(record.documentId);
if (existing === undefined) {
const target: DocumentRecord = { ...record };
this.byDocId.set(record.documentId, target);
if (target.localPath !== undefined) {
// Route through `mutateLocalPathInPlace` so the
// localPath/byLocalPath invariant is upheld: if another
// record already holds this slot, displace it (clear
// its localPath) before installing `target`. Otherwise
// we'd leave the displaced record shadowed (its
// `localPath` still points at a slot that no longer
// belongs to it), which the Reconciler would then
// "rescue" by reading/renaming the file at that path
// — but that file belongs to `target` now, causing
// data loss.
target.localPath = undefined;
this.mutateLocalPathInPlace(target, record.localPath);
}
}
if (target === undefined) {
target = { ...record, path };
} else {
target.path = path;
target.intendedPath = record.intendedPath;
target.parentVersionId = record.parentVersionId;
target.remoteHash = record.remoteHash;
target.remoteRelativePath = record.remoteRelativePath;
}
this.documents.set(path, target);
for (const e of this.events) {
if (
e.type === SyncEventType.LocalUpdate &&
e.documentId === record.documentId
) {
e.path = path;
existing.parentVersionId = record.parentVersionId;
existing.remoteHash = record.remoteHash;
existing.remoteRelativePath = record.remoteRelativePath;
if (existing.localPath !== record.localPath) {
// setLocalPath re-keys `byLocalPath` and persists.
return this.setLocalPath(record.documentId, record.localPath);
}
}
return this.save();
}
public async removeDocument(path: RelativePath): Promise<void> {
this.documents.delete(path);
/**
* Update the `localPath` of an already-tracked record (by docId) and
* re-key the `byLocalPath` index. Called by both the watcher path
* (through `enqueue`) and the Reconciler.
*
* Pass `undefined` to mark the doc as "no local file" the Reconciler
* will place a file later (e.g. a remote create whose
* `remoteRelativePath` slot is occupied at receive time).
*/
public async setLocalPath(
documentId: DocumentId,
newLocalPath: RelativePath | undefined
): Promise<void> {
const record = this.byDocId.get(documentId);
if (record === undefined) {
return;
}
this.mutateLocalPathInPlace(record, newLocalPath);
return this.save();
}
public async removeDocumentById(documentId: DocumentId): Promise<void> {
const record = this.byDocId.get(documentId);
if (record === undefined) {
// Still clear any deletion-pending mark and purge stale
// RemoteChange events so a never-tracked doc doesn't accumulate
// entries.
this._pendingServerDeletes.delete(documentId);
this.purgeRemoteChangesForDocumentId(documentId);
return;
}
if (
record.localPath !== undefined &&
this._byLocalPath.get(record.localPath) === record
) {
this._byLocalPath.delete(record.localPath);
}
this.byDocId.delete(documentId);
this._pendingServerDeletes.delete(documentId);
// Drop any pending RemoteChange events for this doc. A common case:
// a catch-up RemoteChange for the doc was deferred indefinitely
// while the user's LocalDelete (and any LocalUpdate behind it) sat
// in the queue ahead of it. Once those drain and the doc is
// removed, a still-pending RemoteChange for an earlier version
// would be processed by `processRemoteCreateForNewDocument` (the
// doc is now untracked, and catch-up's `isNewFile=true` semantics
// qualify it as a fresh create), resurrecting the doc on disk
// with stale bytes that disagree with every other agent.
this.purgeRemoteChangesForDocumentId(documentId);
return this.save();
}
/**
* Mark a doc as "HTTP DELETE has been acked by the server but the
* WebSocket receipt that would call `removeDocumentById` hasn't arrived
* yet". The Reconciler and remote-update wire-loop handlers consult
* `hasPendingServerDelete` to skip any work that would resurrect the
* doc. Cleared automatically by `removeDocumentById`.
*/
public markServerDeletePending(documentId: DocumentId): void {
this._pendingServerDeletes.add(documentId);
}
public hasPendingServerDelete(documentId: DocumentId): boolean {
return this._pendingServerDeletes.has(documentId);
}
public getDocumentByDocumentId(
target: DocumentId
): DocumentRecord | undefined {
for (const record of this.documents.values()) {
if (record.documentId === target) {
return record;
}
}
return undefined;
return this.byDocId.get(target);
}
public getDocumentByDocumentIdOrFail(target: DocumentId): DocumentRecord {
@ -440,26 +546,45 @@ export class SyncEventQueue {
return result;
}
public getRecordByLocalPath(
path: RelativePath
): DocumentRecord | undefined {
return this._byLocalPath.get(path);
}
public async save(): Promise<void> {
return this.saveData({
documents: Array.from(this.documents.values()),
schemaVersion: STORED_STATE_SCHEMA_VERSION,
documents: Array.from(this.byDocId.values()),
lastSeenUpdateId: this.lastSeenUpdateId
});
}
// todo: let's remove
public getSettledDocumentByPath(
path: RelativePath
): DocumentRecord | undefined {
return this.documents.get(path);
public allSettledDocuments(): Map<RelativePath, DocumentRecord> {
const result = new Map<RelativePath, DocumentRecord>();
for (const record of this.byDocId.values()) {
if (record.localPath !== undefined) {
result.set(record.localPath, record);
}
}
return result;
}
public allSettledDocuments(): Map<RelativePath, DocumentRecord> {
return new Map(this.documents.entries());
/**
* Every tracked record, regardless of whether it has been placed on
* disk yet. The Reconciler uses this to find records whose
* `localPath === undefined` (e.g. a remote create that landed when
* its target slot was occupied) and try to place them once the
* obstruction clears. `allSettledDocuments` filters those out, so
* relying on it would render placement-pending records invisible
* forever.
*/
public allRecords(): Iterable<DocumentRecord> {
return this.byDocId.values();
}
public hasPendingEventsForPath(path: RelativePath): boolean {
const record = this.documents.get(path);
const record = this._byLocalPath.get(path);
if (record === undefined) {
return true; // if we don't know about this path, it must be pending creation
}
@ -474,7 +599,7 @@ export class SyncEventQueue {
(e.type === SyncEventType.RemoteChange &&
// we care about the local path not the remote
this.getDocumentByDocumentId(e.remoteVersion.documentId)
?.path === path)
?.localPath === path)
);
}
@ -490,7 +615,9 @@ export class SyncEventQueue {
public async clearAllState(): Promise<void> {
this.clearPending();
this.documents.clear();
this.byDocId.clear();
this._byLocalPath.clear();
this._pendingServerDeletes.clear();
this._lastSeenUpdateId.reset();
await this.save();
}
@ -504,10 +631,6 @@ export class SyncEventQueue {
}
}
private notifyPendingUpdateCountChanged(): void {
this.onPendingUpdateCountChanged.trigger(this.events.length);
}
public findLatestCreateForPath(
path: RelativePath
): Extract<SyncEvent, { type: SyncEventType.LocalCreate }> | undefined {
@ -525,7 +648,9 @@ export class SyncEventQueue {
newPath: RelativePath
): void {
const createEvent = this.findLatestCreateForPath(oldPath);
if (createEvent === undefined) { return; }
if (createEvent === undefined) {
return;
}
const { promise } = createEvent.resolvers;
createEvent.path = newPath;
@ -540,6 +665,54 @@ export class SyncEventQueue {
}
}
/**
* Synchronous half of `setLocalPath`: mutate `record.localPath` and
* re-key `_byLocalPath` without persisting. Used by `enqueue`'s
* rename branch where the synchronicity contract requires we push
* the LocalUpdate event before awaiting the save.
*
* Enforces the invariant
* `record.localPath !== undefined ⇒ byLocalPath.get(record.localPath) === record`.
* If `newLocalPath` is currently held by a different record, that
* record is *displaced*: its `localPath` is cleared so it enters
* placement-pending state, and the Reconciler's next pass will
* re-place it via `tryInitialPlacement`. Without this displacement
* the prior holder would remain shadowed (its `localPath === P`
* but `byLocalPath[P]` points elsewhere) and the Reconciler could
* later try to "rescue" the shadowed record by reading/renaming
* the file at `P` which belongs to the new owner now causing
* data loss. This is the architectural fix for bug D
* (`Files from agent-1 missing in agent-0` after a same-path
* create cycle).
*/
private mutateLocalPathInPlace(
record: DocumentRecord,
newLocalPath: RelativePath | undefined
): void {
if (
record.localPath !== undefined &&
this._byLocalPath.get(record.localPath) === record
) {
this._byLocalPath.delete(record.localPath);
}
record.localPath = newLocalPath;
if (newLocalPath !== undefined) {
const displaced = this._byLocalPath.get(newLocalPath);
if (displaced !== undefined && displaced !== record) {
// Invariant: `byLocalPath[displaced.localPath] === displaced`.
// We're about to overwrite that slot, so clear the
// displaced record's localPath; the reconciler will
// re-place it via tryInitialPlacement on the next pass.
displaced.localPath = undefined;
}
this._byLocalPath.set(newLocalPath, record);
}
}
private notifyPendingUpdateCountChanged(): void {
this.onPendingUpdateCountChanged.trigger(this.events.length);
}
private rejectAllPendingCreates(): void {
for (const event of this.events) {
if (event.type === SyncEventType.LocalCreate) {
@ -550,4 +723,23 @@ export class SyncEventQueue {
}
}
}
private purgeRemoteChangesForDocumentId(documentId: DocumentId): void {
const toRemove = this.events.filter(
(e) =>
e.type === SyncEventType.RemoteChange &&
e.remoteVersion.documentId === documentId
);
for (const event of toRemove) {
if (event.type === SyncEventType.RemoteChange) {
// Advance the watermark for the dropped event so the gap
// doesn't leave the catch-up replay this id forever.
this._lastSeenUpdateId.add(event.remoteVersion.vaultUpdateId);
}
removeFromArray(this.events, event);
}
if (toRemove.length > 0) {
this.notifyPendingUpdateCountChanged();
}
}
}

File diff suppressed because it is too large Load diff

View file

@ -5,30 +5,26 @@ export type DocumentId = string;
export type RelativePath = string;
export interface DocumentRecord {
// The doc's current local disk path. The queue's `documents` map is
// keyed by this same string and the invariant `documents.get(record.path)
// === record` is held by every queue mutation. Stored as a field on the
// record (not just as the map key) so callers can hold a stable
// reference to the record and read `.path` for the live value rather
// than capturing a string into a local variable that goes stale on the
// next rename.
path: RelativePath;
// Set when the doc's local file lives at a `conflict-<uuid>-` path
// because an earlier remote create / remote rename couldn't claim the
// path the server has it at (it was occupied locally at the time).
// Server-bound requests for this doc must use `intendedPath` rather
// than `path`, otherwise the server would learn about the local
// conflict-uuid path and propagate it as the doc's canonical location
// to every other client. `undefined` for docs whose local path matches
// the server's view.
intendedPath?: RelativePath;
documentId: DocumentId;
parentVersionId: VaultUpdateId;
remoteHash: string;
// Hash of the last server version this client has observed for the doc.
// `undefined` means we have a record but haven't actually seen content
// yet — typically a remote-create whose target slot was occupied at
// receive time, where we deliberately defer the fetch to the reconciler.
// Consumers should treat undefined as "no comparison possible" (the
// fast-skip in `processLocalUpdate` falls through to a real upload).
remoteHash: string | undefined;
remoteRelativePath: RelativePath;
// Where the doc's file currently lives on disk. `undefined` means the doc
// has no local file yet — happens for a remote create whose
// `remoteRelativePath` slot was occupied at receive time. The reconciler
// will place the file once the slot frees, fetching content from the
// server on demand.
localPath: RelativePath | undefined;
}
export interface StoredSyncState {
schemaVersion: number;
documents: DocumentRecord[] | undefined;
lastSeenUpdateId: VaultUpdateId | undefined;
}
@ -43,36 +39,35 @@ export enum SyncEventType {
export type FileSyncEvent =
| { type: SyncEventType.LocalCreate; path: RelativePath }
| {
type: SyncEventType.LocalUpdate;
path: RelativePath;
oldPath?: RelativePath; // oldPath is undefined for content changes
}
type: SyncEventType.LocalUpdate;
path: RelativePath;
oldPath?: RelativePath; // oldPath is undefined for content changes
}
| { type: SyncEventType.LocalDelete; path: RelativePath }
| {
type: SyncEventType.RemoteChange;
remoteVersion: DocumentVersionWithoutContent;
};
type: SyncEventType.RemoteChange;
remoteVersion: DocumentVersionWithoutContent;
};
export type SyncEvent =
| {
type: SyncEventType.LocalCreate;
path: RelativePath; // current path on disk
originalPath: RelativePath; // original path on disk when the event was queued
resolvers: PromiseWithResolvers<DocumentId>;
}
type: SyncEventType.LocalCreate;
path: RelativePath; // current path on disk; mutated in place by `updatePendingCreatePath` when the user renames mid-flight
resolvers: PromiseWithResolvers<DocumentId>;
}
| {
type: SyncEventType.LocalUpdate;
documentId: DocumentId | Promise<DocumentId>; // if it's a promise, the promise is fulfilled once the document's create event is processed
path: RelativePath; // current path on disk
originalPath: RelativePath; // original path on disk when the event was queued
isUserRename: boolean; // true iff this event was queued because the user renamed the file
}
type: SyncEventType.LocalUpdate;
documentId: DocumentId | Promise<DocumentId>; // if it's a promise, the promise is fulfilled once the document's create event is processed
path: RelativePath; // current path on disk
originalPath: RelativePath; // original path on disk when the event was queued
isUserRename: boolean; // true iff this event was queued because the user renamed the file
}
| {
type: SyncEventType.LocalDelete;
documentId: DocumentId | Promise<DocumentId>; // if it's a promise, the promise is fulfilled once the document's create event is processed
path: RelativePath; // only used for showing on the UI
}
type: SyncEventType.LocalDelete;
documentId: DocumentId | Promise<DocumentId>; // if it's a promise, the promise is fulfilled once the document's create event is processed
path: RelativePath; // only used for showing on the UI
}
| {
type: SyncEventType.RemoteChange;
remoteVersion: DocumentVersionWithoutContent;
};
type: SyncEventType.RemoteChange;
remoteVersion: DocumentVersionWithoutContent;
};

View file

@ -65,7 +65,6 @@ export enum SyncStatus {
SKIPPED = "SKIPPED"
}
export interface HistoryStats {
success: number;
error: number;
@ -83,7 +82,7 @@ export class SyncHistory {
error: 0
};
public constructor(private readonly logger: Logger) { }
public constructor(private readonly logger: Logger) {}
public get entries(): readonly HistoryEntry[] {
return this._entries;
@ -136,8 +135,8 @@ export class SyncHistory {
candidate !== undefined &&
(this._entries[0] === candidate ||
candidate.timestamp.getTime() +
TIMEOUT_FOR_MERGING_HISTORY_ENTRIES_IN_SECONDS * 1000 >
entry.timestamp.getTime())
TIMEOUT_FOR_MERGING_HISTORY_ENTRIES_IN_SECONDS * 1000 >
entry.timestamp.getTime())
) {
return candidate;
}

View file

@ -43,7 +43,9 @@ export class EventListeners<TListener extends (...args: any[]) => any> {
const snapshot = this.listeners.slice();
for (const listener of snapshot) {
// allow removing listeners during the trigger loop
if (!this.listeners.includes(listener)) {continue;}
if (!this.listeners.includes(listener)) {
continue;
}
listener(...args);
}
}
@ -59,7 +61,9 @@ export class EventListeners<TListener extends (...args: any[]) => any> {
const snapshot = this.listeners.slice();
const promises: Promise<unknown>[] = [];
for (const listener of snapshot) {
if (!this.listeners.includes(listener)) {continue;}
if (!this.listeners.includes(listener)) {
continue;
}
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const result = listener(...args);
if (result instanceof Promise) {

View file

@ -16,7 +16,7 @@
export class MinCovered {
private seenValues: number[] = [];
public constructor(private minValue: number) { }
public constructor(private minValue: number) {}
public get min(): number {
return this.minValue;

View file

@ -10,5 +10,8 @@ export async function findMatchingFile(
return undefined;
}
return candidates.find((record) => record.remoteHash === contentHash);
return candidates.find(
(record) =>
record.remoteHash !== undefined && record.remoteHash === contentHash
);
}

View file

@ -1,8 +1,11 @@
export async function hash(content: Uint8Array): Promise<string> {
const digest = await crypto.subtle.digest(
"SHA-256",
content as Uint8Array<ArrayBuffer>
);
// Re-wrap into a fresh Uint8Array<ArrayBuffer> so SubtleCrypto's
// BufferSource overload accepts it without an unsafe type assertion.
// The lib types require an ArrayBuffer-backed view; the source may
// be backed by SharedArrayBuffer in some runtimes.
const buffer = new ArrayBuffer(content.byteLength);
new Uint8Array(buffer).set(content);
const digest = await crypto.subtle.digest("SHA-256", buffer);
const bytes = new Uint8Array(digest);
return Array.from(bytes, (b) => b.toString(16).padStart(2, "0")).join("");
}

View file

@ -1,3 +1,4 @@
import { awaitAll } from "./await-all";
import { sleep } from "./sleep";
/**
@ -52,10 +53,7 @@ export function rateLimit<
? minIntervalMs()
: minIntervalMs;
const fnPromise = fn(...args);
running = Promise.all([
fnPromise.catch(() => undefined),
sleep(interval)
]);
running = awaitAll([fnPromise.catch(() => undefined), sleep(interval)]);
return fnPromise;
};