This commit is contained in:
Andras Schmelczer 2026-04-25 12:43:47 +01:00
parent aecbcd1d2c
commit 7293c58a71
4 changed files with 370 additions and 716 deletions

View file

@ -71,21 +71,21 @@ describe("SyncEventQueue", () => {
const queue = createQueue();
queue.enqueue({
type: SyncEventType.RemoteUpdate,
type: SyncEventType.RemoteChange,
remoteVersion: fakeRemoteVersion("A", { vaultUpdateId: 1 })
});
queue.enqueue({
type: SyncEventType.RemoteUpdate,
type: SyncEventType.RemoteChange,
remoteVersion: fakeRemoteVersion("A", { vaultUpdateId: 2 })
});
queue.enqueue({
type: SyncEventType.RemoteUpdate,
type: SyncEventType.RemoteChange,
remoteVersion: fakeRemoteVersion("A", { vaultUpdateId: 3 })
});
const event = await queue.next();
assert.strictEqual(event?.type, SyncEventType.RemoteUpdate);
if (event?.type === SyncEventType.RemoteUpdate) {
assert.strictEqual(event?.type, SyncEventType.RemoteChange);
if (event?.type === SyncEventType.RemoteChange) {
assert.strictEqual(event.remoteVersion.vaultUpdateId, 3);
}
assert.strictEqual(await queue.next(), undefined);
@ -217,7 +217,7 @@ describe("SyncEventQueue", () => {
queue.enqueue({ type: SyncEventType.LocalDelete, path: "a.md" });
queue.enqueue({
type: SyncEventType.RemoteUpdate,
type: SyncEventType.RemoteChange,
remoteVersion: fakeRemoteVersion("A", { vaultUpdateId: 5 })
});
@ -238,7 +238,7 @@ describe("SyncEventQueue", () => {
queue.enqueue({ type: SyncEventType.LocalUpdate, path: "a.md" });
queue.enqueue({ type: SyncEventType.LocalCreate, path: "b.md" });
queue.enqueue({
type: SyncEventType.RemoteUpdate,
type: SyncEventType.RemoteChange,
remoteVersion: fakeRemoteVersion("A", { vaultUpdateId: 5 })
});
@ -342,7 +342,7 @@ describe("SyncEventQueue", () => {
assert.strictEqual(queue.pendingUpdateCount, 1);
queue.enqueue({
type: SyncEventType.RemoteUpdate,
type: SyncEventType.RemoteChange,
remoteVersion: fakeRemoteVersion("N")
});
assert.strictEqual(queue.pendingUpdateCount, 2);

View file

@ -13,10 +13,7 @@ import {
type SyncEvent,
type VaultUpdateId,
} from "./types";
import { sleep } from "../utils/sleep";
export const SAVE_RETRY_BASE_DELAY_MS = 50;
export const SAVE_RETRY_MAX_ATTEMPTS = 3;
export class SyncEventQueue {
// Latest state of the filesystem as we know it, excluding
@ -88,14 +85,14 @@ export class SyncEventQueue {
}
public enqueue(input: FileSyncEvent): void {
if (input.type === SyncEventType.RemoteUpdate) {
if (input.type === SyncEventType.RemoteChange) {
this.events.push(input);
return;
}
const { path } = input;
if (this.isIgnored(path)) {
if (this.ignorePatterns.some((pattern) => pattern.test(path))) {
this.logger.info(
`Ignoring ${input.type} for ${path} as it matches ignore patterns`
);
@ -103,15 +100,20 @@ export class SyncEventQueue {
}
if (input.type === SyncEventType.LocalCreate) {
this.events.push({ type: SyncEventType.LocalCreate, path, originalPath: path });
this.events.push({ type: SyncEventType.LocalCreate, path, originalPath: path, resolvers: Promise.withResolvers() });
return;
}
const lookupPath = (input.type === SyncEventType.LocalUpdate && input.oldPath) ? input.oldPath : path;
const record = this.documents.get(lookupPath);
const documentId: DocumentId | Promise<DocumentId> | undefined =
this.getLatestCreatePromise(lookupPath) ?? record?.documentId;
if (documentId === undefined) return;
this.findLatestCreateForPath(lookupPath)?.resolvers.promise ?? record?.documentId;
if (documentId === undefined) {
// we can get here when deleting a local document after a remote update
return;
}
if (input.type === SyncEventType.LocalDelete) {
this.events.push({ type: SyncEventType.LocalDelete, documentId });
@ -146,29 +148,56 @@ export class SyncEventQueue {
/**
* Call once a create has been acknowledged by the server.
*/
public resolveCreate(
public async resolveCreate(
event: Extract<SyncEvent, { type: SyncEventType.LocalCreate }>,
record: DocumentRecord
): void {
const promise = event.resolvers?.promise;
this.documents.set(event.path, record);
): Promise<void> {
removeFromArray(this.events, event); // in case the create event is still pending
await this.setDocument(event.path, record);
event.resolvers?.resolve(record.documentId);
}
if (promise !== undefined) {
for (const e of this.events) {
if (
(e.type === SyncEventType.LocalUpdate || e.type === SyncEventType.LocalDelete) &&
e.documentId === promise
) {
(e as { documentId: DocumentId | Promise<DocumentId> }).documentId = record.documentId;
}
/**
* Update the settled document map and persist the new document version.
*/
public setDocument(path: RelativePath, record: DocumentRecord): Promise<void> {
this.documents.set(path, record);
return this.save();
}
public removeDocument(path: RelativePath): Promise<void> {
this.documents.delete(path);
return this.save();
}
public getDocumentByDocumentId(
target: DocumentId
): { path: RelativePath; record: DocumentRecord } | undefined {
for (const [path, record] of this.documents) {
if (record.documentId === target) {
return { path, record };
}
}
this.saveInTheBackground();
return undefined;
}
public getDocumentByDocumentIdOrFail(
target: DocumentId
): { path: RelativePath; record: DocumentRecord } {
const result = this.getDocumentByDocumentId(target);
if (!result) {
throw new Error(`No document found with id ${target}`);
}
return result;
}
public async save(): Promise<void> {
return this.saveData({
documents: Array.from(this.documents.entries()).map(
@ -186,35 +215,9 @@ export class SyncEventQueue {
return this.documents.get(path);
}
public getDocumentByDocumentId(
target: DocumentId
): { path: RelativePath; record: DocumentRecord } | undefined {
for (const [path, record] of this.documents) {
if (record.documentId === target) {
return { path, record };
}
}
return undefined;
}
public setDocument(path: RelativePath, record: DocumentRecord): void {
this.documents.set(path, record);
this.saveInTheBackground();
}
public removeDocument(path: RelativePath): void {
this.documents.delete(path);
this.saveInTheBackground();
}
public getLatestCreatePromise(path: RelativePath): Promise<DocumentId> | undefined {
const event = this.findLatestCreate(path);
if (event === undefined) return undefined;
event.resolvers ??= Promise.withResolvers<DocumentId>();
return event.resolvers.promise;
}
public allSettledDocuments(): [RelativePath, DocumentRecord][] {
return Array.from(this.documents.entries());
@ -257,7 +260,7 @@ export class SyncEventQueue {
public hasPendingEventsForPath(path: RelativePath): boolean {
const record = this.documents.get(path);
if (!record) {
if (record === undefined) {
return true; // if we don't know about this path, it must be pending creation
}
const docId = record.documentId;
@ -268,12 +271,22 @@ export class SyncEventQueue {
e.documentId === docId) ||
(e.type === SyncEventType.LocalDelete &&
e.documentId === docId) ||
(e.type === SyncEventType.RemoteUpdate &&
(e.type === SyncEventType.RemoteChange &&
// we care about the local path not the remote
this.getDocumentByDocumentId(e.remoteVersion.documentId)?.path === path)
);
}
public hasPendingLocalEventsForDocumentId(documentId: DocumentId): boolean {
return this.events.some(
(e) =>
(e.type === SyncEventType.LocalUpdate &&
e.documentId === documentId) ||
(e.type === SyncEventType.LocalDelete &&
e.documentId === documentId)
);
}
public resetState(): void {
this.rejectAllPendingCreates();
@ -288,18 +301,13 @@ export class SyncEventQueue {
private isIgnored(path: RelativePath): boolean {
return this.ignorePatterns.some((pattern) => pattern.test(path));
}
public removeAllEventsForDocumentId(documentId: DocumentId): void {
for (let i = this.events.length - 1; i >= 0; i--) {
const e = this.events[i];
if (
(e.type === SyncEventType.LocalUpdate &&
e.documentId === documentId) ||
(e.type === SyncEventType.RemoteUpdate &&
(e.type === SyncEventType.RemoteChange &&
e.remoteVersion.documentId === documentId) ||
(e.type === SyncEventType.LocalDelete &&
e.documentId === documentId)
@ -310,11 +318,11 @@ export class SyncEventQueue {
}
}
public updatePendingCreatePath(
private updatePendingCreatePath(
oldPath: RelativePath,
newPath: RelativePath
): void {
const createEvent = this.findLatestCreate(oldPath);
const createEvent = this.findLatestCreateForPath(oldPath);
if (createEvent === undefined) return;
const promise = createEvent.resolvers?.promise;
@ -332,7 +340,7 @@ export class SyncEventQueue {
}
}
private findLatestCreate(
public findLatestCreateForPath(
path: RelativePath
): Extract<SyncEvent, { type: SyncEventType.LocalCreate }> | undefined {
for (let i = this.events.length - 1; i >= 0; i--) {
@ -344,40 +352,9 @@ export class SyncEventQueue {
return undefined;
}
/**
* Returns whether there is an unsynced Create event queued at `path`.
* A caller uses this to decide between displacing the local file vs.
* merging it with a concurrent remote create.
*/
public hasPendingCreateAt(path: RelativePath): boolean {
return this.findLatestCreate(path) !== undefined;
}
/**
* Cancel the latest queued Create for `path`. Rejects its resolver
* promise (so any dependent SyncLocal/Delete events that `await`ed
* the future documentId skip themselves gracefully) and removes the
* Create event from the queue. Returns true if a Create was found
* and cancelled.
*/
public cancelPendingCreate(path: RelativePath): boolean {
const event = this.findLatestCreate(path);
if (event === undefined) return false;
if (event.resolvers !== undefined) {
event.resolvers.promise.catch(() => {
/* suppressed — consumer may not be listening */
});
event.resolvers.reject(
new Error(
"Create was cancelled — merged with concurrent remote create"
)
);
}
removeFromArray(this.events, event);
return true;
}
private rejectAllPendingCreates(): void {
for (const event of this.events) {

File diff suppressed because it is too large Load diff

View file

@ -8,7 +8,7 @@ export interface DocumentRecord {
documentId: DocumentId;
parentVersionId: VaultUpdateId;
remoteHash: string;
remoteRelativePath?: RelativePath;
remoteRelativePath: RelativePath;
}
export interface StoredDocument extends DocumentRecord {
@ -24,21 +24,21 @@ export enum SyncEventType {
LocalCreate = "local-create",
LocalUpdate = "local-update", // includes both content and path changes
LocalDelete = "local-delete",
RemoteUpdate = "remote-update", // includes every type of update coming from the server
RemoteChange = "remote-change", // includes every type of create/update/delete coming from the server
}
export type FileSyncEvent =
| { type: SyncEventType.LocalCreate; path: RelativePath }
| { type: SyncEventType.LocalUpdate; path: RelativePath; oldPath?: RelativePath }
| { type: SyncEventType.LocalDelete; path: RelativePath }
| { type: SyncEventType.RemoteUpdate; remoteVersion: DocumentVersionWithoutContent };
| { type: SyncEventType.RemoteChange; remoteVersion: DocumentVersionWithoutContent };
export type SyncEvent =
| {
type: SyncEventType.LocalCreate;
path: RelativePath; // current path on disk
originalPath: RelativePath; // original path on disk when the event was queued
resolvers?: PromiseWithResolvers<DocumentId>
resolvers: PromiseWithResolvers<DocumentId>
}
| {
type: SyncEventType.LocalUpdate;
@ -52,6 +52,6 @@ export type SyncEvent =
documentId: DocumentId | Promise<DocumentId>; // if it's a promise, the promise is fulfilled once the document's create event is processed
}
| {
type: SyncEventType.RemoteUpdate;
type: SyncEventType.RemoteChange;
remoteVersion: DocumentVersionWithoutContent;
};