This commit is contained in:
Andras Schmelczer 2026-04-24 21:59:00 +01:00
parent 17a1f4d060
commit c9cf3239db
10 changed files with 200 additions and 509 deletions

View file

@ -3,9 +3,12 @@ import type { RelativePath } from "./types";
// Local-only files displaced by `FileOperations.ensureClearPath` are named
// `conflict-<uuid>-<originalName>`. The UUID is a full RFC-4122 v4 value so
// a user-authored filename that happens to start with `conflict-` doesn't
// get misclassified.
const CONFLICT_UUID_REGEX =
/^conflict-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}-/u;
// get misclassified. The leading `(?:^|\/)` and trailing `[^/]*$` anchor the
// match to the final path segment so intermediate directories named after
// old conflict files (if a user renames one into a directory) don't ignore
// everything beneath them.
export const CONFLICT_PATH_REGEX =
/(?:^|\/)conflict-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}-[^/]*$/u;
// Safe segment length for common filesystems (ext4 / NTFS / APFS all cap
// at 255 bytes). `conflict-<36-char-uuid>-` adds 46 bytes; reserve a few
@ -61,6 +64,5 @@ function truncateFileNameToByteLimit(
* strictly local and must stay invisible to the server.
*/
export function isConflictPath(path: RelativePath): boolean {
const fileName = path.substring(path.lastIndexOf("/") + 1);
return CONFLICT_UUID_REGEX.test(fileName);
return CONFLICT_PATH_REGEX.test(path);
}

View file

@ -278,7 +278,7 @@ describe("SyncEventQueue", () => {
const queue = createQueue();
queue.enqueue({ type: SyncEventType.LocalCreate, path: "a.md" });
const promise = queue.getCreatePromise("a.md");
const promise = queue.getLatestCreatePromise("a.md");
assert.ok(promise !== undefined);
// The syncer resolves via event.resolvers after dequeuing
@ -294,7 +294,7 @@ describe("SyncEventQueue", () => {
const queue = createQueue();
queue.enqueue({ type: SyncEventType.LocalCreate, path: "a.md" });
const promise = queue.getCreatePromise("a.md");
const promise = queue.getLatestCreatePromise("a.md");
assert.ok(promise !== undefined);
const event = await queue.next();
@ -311,8 +311,8 @@ describe("SyncEventQueue", () => {
queue.enqueue({ type: SyncEventType.LocalCreate, path: "a.md" });
queue.enqueue({ type: SyncEventType.LocalCreate, path: "b.md" });
const promiseA = queue.getCreatePromise("a.md");
const promiseB = queue.getCreatePromise("b.md");
const promiseA = queue.getLatestCreatePromise("a.md");
const promiseB = queue.getLatestCreatePromise("b.md");
assert.ok(promiseA !== undefined);
assert.ok(promiseB !== undefined);
@ -481,7 +481,7 @@ describe("SyncEventQueue", () => {
const queue = createQueue();
queue.enqueue({ type: SyncEventType.LocalCreate, path: "a.md" });
const createPromise = queue.getCreatePromise("a.md")!;
const createPromise = queue.getLatestCreatePromise("a.md")!;
// Dependent events enqueued while create is still pending
queue.enqueue({ type: SyncEventType.LocalUpdate, path: "a.md" });

View file

@ -1,7 +1,7 @@
import type { Settings } from "../persistence/settings";
import type { Logger } from "../tracing/logger";
import { globsToRegexes } from "../utils/globs-to-regexes";
import { isConflictPath } from "./conflict-path";
import { CONFLICT_PATH_REGEX } from "./conflict-path";
import { removeFromArray } from "../utils/remove-from-array";
import {
SyncEventType,
@ -44,7 +44,7 @@ export class SyncEventQueue {
private savePending = false;
private readonly lastSeenUpdateId: VaultUpdateId;
public readonly lastSeenUpdateId: VaultUpdateId;
public constructor(
private readonly settings: Settings,
@ -52,16 +52,19 @@ export class SyncEventQueue {
initialState: Partial<StoredSyncState> | undefined,
private readonly saveData: (data: StoredSyncState) => Promise<void>
) {
this.ignorePatterns = globsToRegexes(
this.settings.getSettings().ignorePatterns,
this.logger
);
this.ignorePatterns = [
CONFLICT_PATH_REGEX,
...globsToRegexes(
this.settings.getSettings().ignorePatterns,
this.logger
)
];
this.settings.onSettingsChanged.add((newSettings) => {
this.ignorePatterns = globsToRegexes(
newSettings.ignorePatterns,
this.logger
);
this.ignorePatterns = [
CONFLICT_PATH_REGEX,
...globsToRegexes(newSettings.ignorePatterns, this.logger)
];
});
initialState ??= {};
@ -84,6 +87,100 @@ export class SyncEventQueue {
return this.documents.size;
}
public enqueue(input: FileSyncEvent): void {
if (input.type === SyncEventType.RemoteUpdate) {
this.events.push(input);
return;
}
const { path } = input;
if (this.isIgnored(path)) {
this.logger.info(
`Ignoring ${input.type} for ${path} as it matches ignore patterns`
);
return;
}
if (input.type === SyncEventType.LocalCreate) {
this.events.push({ type: SyncEventType.LocalCreate, path, originalPath: path });
return;
}
const lookupPath = (input.type === SyncEventType.LocalUpdate && input.oldPath) ? input.oldPath : path;
const record = this.documents.get(lookupPath);
const documentId: DocumentId | Promise<DocumentId> | undefined =
this.getLatestCreatePromise(lookupPath) ?? record?.documentId;
if (documentId === undefined) return;
if (input.type === SyncEventType.LocalDelete) {
this.events.push({ type: SyncEventType.LocalDelete, documentId });
return;
}
if (input.oldPath !== undefined) {
if (typeof documentId === "string") {
this.documents.delete(input.oldPath);
this.documents.set(path, record!);
for (const e of this.events) {
// It already has a docId, so there can't be a pending create event for it
if (e.type === SyncEventType.LocalUpdate && e.documentId === documentId) {
e.path = path;
}
}
this.saveInTheBackground();
} else {
this.updatePendingCreatePath(input.oldPath, path);
}
}
this.events.push({ type: SyncEventType.LocalUpdate, documentId, path, originalPath: path });
}
public async next(): Promise<SyncEvent | undefined> {
return this.events.shift();
}
/**
* Call once a create has been acknowledged by the server.
*/
public resolveCreate(
event: Extract<SyncEvent, { type: SyncEventType.LocalCreate }>,
record: DocumentRecord
): void {
const promise = event.resolvers?.promise;
this.documents.set(event.path, record);
event.resolvers?.resolve(record.documentId);
if (promise !== undefined) {
for (const e of this.events) {
if (
(e.type === SyncEventType.LocalUpdate || e.type === SyncEventType.LocalDelete) &&
e.documentId === promise
) {
(e as { documentId: DocumentId | Promise<DocumentId> }).documentId = record.documentId;
}
}
}
this.saveInTheBackground();
}
public async save(): Promise<void> {
return this.saveData({
documents: Array.from(this.documents.entries()).map(
([relativePath, record]) => ({
relativePath,
...record
})
),
lastSeenUpdateId: this.lastSeenUpdateId
});
}
// todo: let's remove
public getSettledDocumentByPath(path: RelativePath): DocumentRecord | undefined {
return this.documents.get(path);
@ -110,87 +207,10 @@ export class SyncEventQueue {
this.saveInTheBackground();
}
/**
* Reflect a local rename in the queue's disk-path index.
*
* Mirrors the `input.oldPath !== undefined` branch of `enqueue`, but
* without emitting a new `SyncLocal` used by `FileOperations.move`
* when the rename is a byproduct of another sync operation (e.g. the
* user dragging a file) and the caller will push the resulting event
* separately, or not at all.
*
* If the rename targets a path that already holds a settled record
* (e.g. concurrent clobber), the destination's record is dropped: the
* caller is expected to have moved the displaced file out of the way
* via `ensureClearPath` already, so the dropped record reflects the
* now-orphaned disk state.
*/
public moveDocument(
oldPath: RelativePath,
newPath: RelativePath
): void {
if (oldPath === newPath) return;
const record = this.documents.get(oldPath);
if (record !== undefined) {
// If `newPath` already holds a settled record, overwriting it
// silently would orphan that document's identity. Warn so the
// bug is visible; the caller is expected to have freed the
// destination via `ensureClearPath` first.
const clobbered = this.documents.get(newPath);
if (clobbered !== undefined) {
this.logger.warn(
`moveDocument(${oldPath}${newPath}) is overwriting a settled record for document ${clobbered.documentId}; caller should have displaced it first`
);
}
this.documents.delete(oldPath);
this.documents.set(newPath, record);
for (const e of this.events) {
if (
e.type === SyncEventType.LocalUpdate &&
e.documentId === record.documentId
) {
e.path = newPath;
}
}
this.saveInTheBackground();
return;
}
// No settled record — the rename may be over a pending Create
// whose document hasn't been persisted on the server yet.
this.updatePendingCreatePath(oldPath, newPath);
}
/**
* Call once a create has been acknowledged by the server.
*/
public resolveCreate(
event: Extract<SyncEvent, { type: SyncEventType.LocalCreate }>,
record: DocumentRecord
): void {
const promise = event.resolvers?.promise;
this.documents.set(event.path, record);
event.resolvers?.resolve(record.documentId);
if (promise !== undefined) {
for (const e of this.events) {
if (
(e.type === SyncEventType.LocalUpdate || e.type === SyncEventType.LocalDelete) &&
e.documentId === promise
) {
(e as { documentId: DocumentId | Promise<DocumentId> }).documentId = record.documentId;
}
}
}
this.saveInTheBackground();
}
public getCreatePromise(path: RelativePath): Promise<DocumentId> | undefined {
const event = this.findLastCreate(path);
public getLatestCreatePromise(path: RelativePath): Promise<DocumentId> | undefined {
const event = this.findLatestCreate(path);
if (event === undefined) return undefined;
event.resolvers ??= Promise.withResolvers<DocumentId>();
return event.resolvers.promise;
@ -254,17 +274,6 @@ export class SyncEventQueue {
);
}
public async save(): Promise<void> {
return this.saveData({
documents: Array.from(this.documents.entries()).map(
([relativePath, record]) => ({
relativePath,
...record
})
),
lastSeenUpdateId: this.lastSeenUpdateId
});
}
public resetState(): void {
this.rejectAllPendingCreates();
@ -277,161 +286,11 @@ export class SyncEventQueue {
this.events.length = 0;
}
public enqueue(input: FileSyncEvent): void {
if (input.type === SyncEventType.RemoteUpdate) {
this.events.push(input);
return;
}
const { path } = input;
// Conflict-displaced files are local-only bookkeeping so a conflict
// hit is a debug-level event. A hit against a user-configured glob
// is a higher-signal "we're deliberately not syncing this" and
// stays at info.
if (isConflictPath(path)) {
this.logger.debug(
`Ignoring ${input.type} for ${path}: conflict-displaced file`
);
return;
}
if (this.matchesUserIgnorePattern(path)) {
this.logger.info(
`Ignoring ${input.type} for ${path} as it matches ignore patterns`
);
return;
}
if (input.type === SyncEventType.LocalCreate) {
this.events.push({ type: SyncEventType.LocalCreate, path, originalPath: path });
return;
}
const lookupPath = (input.type === SyncEventType.LocalUpdate && input.oldPath) ? input.oldPath : path;
const record = this.documents.get(lookupPath);
const documentId: DocumentId | Promise<DocumentId> | undefined =
record?.documentId ?? this.getCreatePromise(lookupPath);
if (documentId === undefined) return;
if (input.type === SyncEventType.LocalDelete) {
this.events.push({ type: SyncEventType.LocalDelete, documentId });
return;
}
if (input.oldPath !== undefined) {
if (typeof documentId === "string") {
this.documents.delete(input.oldPath);
this.documents.set(path, record!);
for (const e of this.events) {
if (e.type === SyncEventType.LocalUpdate && e.documentId === documentId) {
e.path = path;
}
}
this.saveInTheBackground();
} else {
this.updatePendingCreatePath(input.oldPath, path);
}
}
this.events.push({ type: SyncEventType.LocalUpdate, documentId, path, originalPath: path });
}
public async next(): Promise<SyncEvent | undefined> {
if (this.events.length === 0) return undefined;
const [first] = this.events;
// Creates are always returned immediately (FIFO)
if (first.type === SyncEventType.LocalCreate) {
this.events.shift();
return first;
}
// Deletes are returned immediately; also discard any subsequent
// events for the same documentId so stale broadcasts don't
// resurrect the document. If the documentId is still a pending
// `Promise<DocumentId>` (the originating Create hasn't landed
// yet), awaiting it may reject — handle that: the Create was
// cancelled, so the Delete has nothing to delete, just drop it.
if (first.type === SyncEventType.LocalDelete) {
this.events.shift();
const { documentId } = first;
let resolvedId: DocumentId;
try {
resolvedId = await documentId;
} catch {
this.logger.debug(
"Dropping Delete whose Create was cancelled before it could be synced"
);
return this.next();
}
this.removeAllEventsForDocumentId(resolvedId);
return first;
}
if (first.type === SyncEventType.LocalUpdate) {
const { documentId } = first;
// If there's a later delete for the same documentId, discard
// all sync-locals for that document and return the delete
const deleteEvent = this.events.find(
(e) =>
e.type === SyncEventType.LocalDelete &&
e.documentId === documentId
);
if (deleteEvent !== undefined) {
let resolvedId: DocumentId;
try {
resolvedId = await documentId;
} catch {
this.logger.debug(
"Dropping SyncLocal+Delete whose Create was cancelled before it could be synced"
);
return this.next();
}
this.removeAllEventsForDocumentId(resolvedId);
return deleteEvent;
}
// Coalesce multiple sync-locals for the same documentId and
// original path to the last one
const matching = this.events.filter(
(e) =>
e.type === SyncEventType.LocalUpdate &&
e.documentId === documentId &&
e.originalPath === first.originalPath // can't coalesce moves as they can depend on each other so we have to sync them in the same order, could do topological sort but let's keep it simple for now
);
const result = matching[matching.length - 1];
for (const item of matching) {
removeFromArray(this.events, item);
}
return result;
}
// Coalesce multiple RemoteUpdate events for the same documentId
// down to the last one — the `.next` walk already short-circuits
// on obsolete versions via `parentVersionId` checks, but compacting
// here keeps the queue bounded under burst remote activity.
const { documentId } = first.remoteVersion;
const matching = this.events.filter(
(e) =>
e.type === SyncEventType.RemoteUpdate &&
e.remoteVersion.documentId === documentId
);
const result = matching[matching.length - 1];
for (const item of matching) {
removeFromArray(this.events, item);
}
return result;
}
private matchesUserIgnorePattern(path: RelativePath): boolean {
return this.ignorePatterns.some((pattern) => pattern.test(path));
}
private isIgnored(path: RelativePath): boolean {
return isConflictPath(path) || this.matchesUserIgnorePattern(path);
return this.ignorePatterns.some((pattern) => pattern.test(path));
}
public removeAllEventsForDocumentId(documentId: DocumentId): void {
@ -455,7 +314,7 @@ export class SyncEventQueue {
oldPath: RelativePath,
newPath: RelativePath
): void {
const createEvent = this.findLastCreate(oldPath);
const createEvent = this.findLatestCreate(oldPath);
if (createEvent === undefined) return;
const promise = createEvent.resolvers?.promise;
@ -473,22 +332,7 @@ export class SyncEventQueue {
}
}
private findCreatePathByPromise(
promise: Promise<DocumentId>
): RelativePath | undefined {
for (let i = this.events.length - 1; i >= 0; i--) {
const e = this.events[i];
if (
e.type === SyncEventType.LocalCreate &&
e.resolvers?.promise === promise
) {
return e.path;
}
}
return undefined;
}
private findLastCreate(
private findLatestCreate(
path: RelativePath
): Extract<SyncEvent, { type: SyncEventType.LocalCreate }> | undefined {
for (let i = this.events.length - 1; i >= 0; i--) {
@ -506,7 +350,7 @@ export class SyncEventQueue {
* merging it with a concurrent remote create.
*/
public hasPendingCreateAt(path: RelativePath): boolean {
return this.findLastCreate(path) !== undefined;
return this.findLatestCreate(path) !== undefined;
}
/**
@ -517,7 +361,7 @@ export class SyncEventQueue {
* and cancelled.
*/
public cancelPendingCreate(path: RelativePath): boolean {
const event = this.findLastCreate(path);
const event = this.findLatestCreate(path);
if (event === undefined) return false;
if (event.resolvers !== undefined) {

View file

@ -44,7 +44,7 @@ export class Syncer {
private readonly queue: SyncEventQueue;
private _isFirstSyncComplete = false;
private _isFirstSyncStarted = false;
private runningScheduleSyncForOfflineChanges: Promise<void> | undefined;
private draining: Promise<void> | undefined;
private previousRemainingOperationsCount = 0;
@ -66,14 +66,6 @@ export class Syncer {
this.webSocketManager.onWebSocketStatusChanged.add((isConnected) => {
if (isConnected) {
this.sendHandshakeMessage();
// The server no longer carries an `is_initial_sync`
// terminator: it streams missed versions as individual
// VaultUpdates and then behaves like a live subscription.
// Mark first-sync as complete once we've observed the
// transition to "connected" — per-path sync status still
// relies on `hasPendingEventsForPath`, which correctly
// shows SYNCING while catch-up events are in flight.
this._isFirstSyncComplete = true;
}
});
this.webSocketManager.onRemoteVaultUpdateReceived.add(
@ -82,7 +74,7 @@ export class Syncer {
}
public get isFirstSyncComplete(): boolean {
return this._isFirstSyncComplete;
return this._isFirstSyncStarted;
}
public syncLocallyCreatedFile(relativePath: RelativePath): void {
@ -110,11 +102,7 @@ export class Syncer {
}
// Handler for every `WebSocketVaultUpdate` the server emits. The
// server filters out messages authored by this device, so every
// update here comes from a peer (or is part of the catch-up stream
// the server replays on connect for versions we missed while
// offline).
public async syncRemotelyUpdatedFile(
message: WebSocketVaultUpdate
): Promise<void> {
@ -126,6 +114,8 @@ export class Syncer {
});
this.ensureDraining();
this._isFirstSyncStarted = true;
}
public async scheduleSyncForOfflineChanges(): Promise<void> {
@ -167,7 +157,7 @@ export class Syncer {
public reset(): void {
this._isFirstSyncComplete = false;
this._isFirstSyncStarted = false;
this.queue.clear();
// Don't null the reference synchronously — if the scan is
// still in flight, the next reconnect would spawn a second
@ -220,14 +210,12 @@ export class Syncer {
);
});
await this.scheduleDrain();
this.ensureDraining();
await this.draining;
}
private ensureDraining(): void {
void this.chainOntoDrain(async () => this.drain());
}
/**
* Serialize a unit of work onto the same promise chain the drain
@ -248,12 +236,11 @@ export class Syncer {
);
return chained;
}
private async scheduleDrain(): Promise<void> {
this.ensureDraining();
await this.draining;
private ensureDraining(): void {
void this.chainOntoDrain(async () => this.drain());
}
private async drain(): Promise<void> {
let event = await this.queue.next();
while (event !== undefined) {

View file

@ -16,7 +16,7 @@ export interface StoredDocument extends DocumentRecord {
}
export interface StoredSyncState {
documents: StoredDocument[];
documents: StoredDocument[] | undefined;
lastSeenUpdateId: VaultUpdateId | undefined;
}