wip better queue
This commit is contained in:
parent
d034ad5cb3
commit
1a4e39d57a
3 changed files with 414 additions and 219 deletions
|
|
@ -14,9 +14,19 @@ import {
|
|||
} from "./types";
|
||||
|
||||
export class SyncEventQueue {
|
||||
private readonly events: SyncEvent[] = [];
|
||||
// latest state of the filesystem as we know it, excluding
|
||||
// unconfirmed creates but including pending deletes,
|
||||
// it's always indexed by the latest path on disk
|
||||
private readonly documents = new Map<RelativePath, DocumentRecord>();
|
||||
private readonly recentlyDeletedDocumentIds = new Set<DocumentId>();
|
||||
|
||||
// all outstanding operations in order of occurrence,
|
||||
// can include multiple generations of the same document,
|
||||
// e.g.: a create, delete, create sequence for the same path.
|
||||
// The paths for the events must always correspond to the latest
|
||||
// path on disk, so the path of each event may be updated multiple
|
||||
// times.
|
||||
private readonly events: SyncEvent[] = [];
|
||||
|
||||
private lastSeenUpdateIds: CoveredValues;
|
||||
private ignorePatterns: RegExp[];
|
||||
|
||||
|
|
@ -55,7 +65,7 @@ export class SyncEventQueue {
|
|||
this.lastSeenUpdateIds.add(record.parentVersionId);
|
||||
}
|
||||
|
||||
this.logger.debug(`Loaded ${this.documents.size} documents`);
|
||||
this.logger.debug(`Loaded ${this.documents.size} documents and lastSeenUpdateId=${this.lastSeenUpdateIds.min}`);
|
||||
}
|
||||
|
||||
public get size(): number {
|
||||
|
|
@ -66,10 +76,15 @@ export class SyncEventQueue {
|
|||
return this.documents.size;
|
||||
}
|
||||
|
||||
public getLastSeenUpdateId(): VaultUpdateId {
|
||||
public get lastSeenUpdateId(): VaultUpdateId {
|
||||
return this.lastSeenUpdateIds.min;
|
||||
}
|
||||
|
||||
public set lastSeenUpdateId(value: number) {
|
||||
this.lastSeenUpdateIds.min = value;
|
||||
this.saveInTheBackground();
|
||||
}
|
||||
|
||||
public addSeenUpdateId(value: number): void {
|
||||
const previousMin = this.lastSeenUpdateIds.min;
|
||||
this.lastSeenUpdateIds.add(value);
|
||||
|
|
@ -78,12 +93,8 @@ export class SyncEventQueue {
|
|||
}
|
||||
}
|
||||
|
||||
public setLastSeenUpdateId(value: number): void {
|
||||
this.lastSeenUpdateIds.min = value;
|
||||
this.saveInTheBackground();
|
||||
}
|
||||
|
||||
public getDocument(path: RelativePath): DocumentRecord | undefined {
|
||||
public getSettledDocumentByPath(path: RelativePath): DocumentRecord | undefined {
|
||||
return this.documents.get(path);
|
||||
}
|
||||
|
||||
|
|
@ -104,86 +115,96 @@ export class SyncEventQueue {
|
|||
}
|
||||
|
||||
public removeDocument(path: RelativePath): void {
|
||||
const record = this.documents.get(path);
|
||||
if (record !== undefined) {
|
||||
this.recentlyDeletedDocumentIds.add(record.documentId);
|
||||
}
|
||||
this.documents.delete(path);
|
||||
this.saveInTheBackground();
|
||||
}
|
||||
|
||||
/**
|
||||
* Move a document from oldPath to newPath.
|
||||
* If the target path is occupied by a different document, it is removed
|
||||
* and its documentId is returned so the caller can handle the displacement.
|
||||
* Settle a Create event: add the document to the settled map,
|
||||
* resolve the create promise, and replace promise-based documentId
|
||||
* references in the event queue with the actual string documentId.
|
||||
*/
|
||||
public moveDocument(
|
||||
oldPath: RelativePath,
|
||||
newPath: RelativePath
|
||||
): DocumentId | undefined {
|
||||
const record = this.documents.get(oldPath);
|
||||
if (record === undefined) return undefined;
|
||||
public resolveCreate(
|
||||
event: Extract<SyncEvent, { type: SyncEventType.Create }>,
|
||||
record: DocumentRecord
|
||||
): void {
|
||||
const promise = event.resolvers?.promise;
|
||||
|
||||
let displacedDocumentId: DocumentId | undefined = undefined;
|
||||
const existingAtTarget = this.documents.get(newPath);
|
||||
if (
|
||||
existingAtTarget !== undefined &&
|
||||
existingAtTarget.documentId !== record.documentId
|
||||
) {
|
||||
displacedDocumentId = existingAtTarget.documentId;
|
||||
this.recentlyDeletedDocumentIds.add(displacedDocumentId);
|
||||
this.documents.delete(newPath);
|
||||
this.documents.set(event.path, record);
|
||||
event.resolvers?.resolve(record.documentId);
|
||||
|
||||
if (promise !== undefined) {
|
||||
for (const e of this.events) {
|
||||
if (
|
||||
(e.type === SyncEventType.SyncLocal || e.type === SyncEventType.Delete) &&
|
||||
e.documentId === promise
|
||||
) {
|
||||
(e as { documentId: DocumentId | Promise<DocumentId> }).documentId = record.documentId;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this.documents.delete(oldPath);
|
||||
this.documents.set(newPath, record);
|
||||
this.saveInTheBackground();
|
||||
return displacedDocumentId;
|
||||
}
|
||||
|
||||
public wasRecentlyDeleted(documentId: DocumentId): boolean {
|
||||
return this.recentlyDeletedDocumentIds.has(documentId);
|
||||
public getCreatePromise(path: RelativePath): Promise<DocumentId> | undefined {
|
||||
const event = this.findLastCreate(path);
|
||||
if (event === undefined) return undefined;
|
||||
event.resolvers ??= Promise.withResolvers<DocumentId>();
|
||||
return event.resolvers.promise;
|
||||
}
|
||||
|
||||
public unmarkRecentlyDeleted(documentId: DocumentId): void {
|
||||
this.recentlyDeletedDocumentIds.delete(documentId);
|
||||
}
|
||||
|
||||
|
||||
public allDocuments(): [RelativePath, DocumentRecord][] {
|
||||
public allSettledDocuments(): [RelativePath, DocumentRecord][] {
|
||||
return Array.from(this.documents.entries());
|
||||
}
|
||||
|
||||
public hasCreateEvent(path: RelativePath): boolean {
|
||||
return this.events.some(
|
||||
(e) => e.type === SyncEventType.Create && e.path === path
|
||||
);
|
||||
}
|
||||
/**
|
||||
* Returns the set of paths we expect to exist on disk by replaying
|
||||
* the event queue on top of the settled documents map.
|
||||
*/
|
||||
public trackedPaths(): Set<RelativePath> {
|
||||
const paths = new Set(this.documents.keys());
|
||||
// Track current path for each pending create so moves can be applied
|
||||
const pendingPaths = new Map<Promise<DocumentId>, RelativePath>();
|
||||
|
||||
public updateCreatePath(
|
||||
oldPath: RelativePath,
|
||||
newPath: RelativePath
|
||||
): boolean {
|
||||
for (const event of this.events) {
|
||||
if (event.type === SyncEventType.Create && event.path === oldPath) {
|
||||
event.path = newPath;
|
||||
return true;
|
||||
}
|
||||
if (event.type === SyncEventType.Create) {
|
||||
paths.add(event.path);
|
||||
if (event.resolvers !== undefined) {
|
||||
pendingPaths.set(event.resolvers.promise, event.path);
|
||||
}
|
||||
} else if (event.type === SyncEventType.Delete) {
|
||||
if (typeof event.documentId === "string") {
|
||||
const path = this.getDocumentByDocumentId(event.documentId)?.path;
|
||||
if (path) {
|
||||
paths.delete(path);
|
||||
} else {
|
||||
throw new Error(`Delete event for unknown documentId ${event.documentId}`);
|
||||
}
|
||||
} else {
|
||||
const path = pendingPaths.get(event.documentId);
|
||||
if (!path) {
|
||||
throw new Error(`Delete event with unresolved documentId promise`);
|
||||
}
|
||||
paths.delete(path);
|
||||
}
|
||||
} // no need to handle SyncLocal as path updates are applied to this.documents immediately when the event is enqueued
|
||||
}
|
||||
return false;
|
||||
return paths;
|
||||
}
|
||||
|
||||
public hasPendingEventsForPath(path: RelativePath): boolean {
|
||||
const record = this.documents.get(path);
|
||||
const docId = record?.documentId;
|
||||
if (!record) {
|
||||
return true; // if we don't know about this path, it must be pending creation
|
||||
}
|
||||
const docId = record.documentId;
|
||||
return this.events.some(
|
||||
(e) =>
|
||||
(e.type === SyncEventType.Create && e.path === path) ||
|
||||
(e.type === SyncEventType.SyncLocal &&
|
||||
docId !== undefined &&
|
||||
e.documentId === docId) ||
|
||||
(e.type === SyncEventType.Delete &&
|
||||
docId !== undefined &&
|
||||
e.documentId === docId) ||
|
||||
(e.type === SyncEventType.SyncRemote &&
|
||||
e.remoteVersion.relativePath === path)
|
||||
|
|
@ -203,31 +224,26 @@ export class SyncEventQueue {
|
|||
}
|
||||
|
||||
public resetState(): void {
|
||||
this.rejectAllPendingCreates();
|
||||
this.documents.clear();
|
||||
this.recentlyDeletedDocumentIds.clear();
|
||||
this.lastSeenUpdateIds = new CoveredValues(0);
|
||||
this.saveInTheBackground();
|
||||
}
|
||||
|
||||
public clear(): void {
|
||||
this.rejectAllPendingCreates();
|
||||
this.events.length = 0;
|
||||
this.recentlyDeletedDocumentIds.clear();
|
||||
}
|
||||
|
||||
public enqueue(event: SyncEvent): void {
|
||||
if (this.isIgnored(event)) return;
|
||||
|
||||
if (event.type === SyncEventType.Create) {
|
||||
if (this.documents.has(event.path)) return;
|
||||
if (this.hasCreateEvent(event.path)) return;
|
||||
}
|
||||
|
||||
this.events.push(event);
|
||||
}
|
||||
|
||||
|
||||
|
||||
public next(): SyncEvent | undefined {
|
||||
public async next(): Promise<SyncEvent | undefined> {
|
||||
if (this.events.length === 0) return undefined;
|
||||
|
||||
const [first] = this.events;
|
||||
|
|
@ -244,9 +260,7 @@ export class SyncEventQueue {
|
|||
if (first.type === SyncEventType.Delete) {
|
||||
this.events.shift();
|
||||
const { documentId } = first;
|
||||
if (documentId !== "") {
|
||||
this.removeAllEventsForDocumentId(documentId);
|
||||
}
|
||||
this.removeAllEventsForDocumentId(await documentId);
|
||||
return first;
|
||||
}
|
||||
|
||||
|
|
@ -261,16 +275,18 @@ export class SyncEventQueue {
|
|||
e.documentId === documentId
|
||||
);
|
||||
if (deleteEvent !== undefined) {
|
||||
this.removeAllSyncLocalsForDocumentId(documentId);
|
||||
this.removeAllSyncLocalsForDocumentId(await documentId);
|
||||
removeFromArray(this.events, deleteEvent);
|
||||
return deleteEvent;
|
||||
}
|
||||
|
||||
// Coalesce multiple sync-locals for the same documentId to the last one
|
||||
// Coalesce multiple sync-locals for the same documentId and
|
||||
// original path to the last one
|
||||
const matching = this.events.filter(
|
||||
(e) =>
|
||||
e.type === SyncEventType.SyncLocal &&
|
||||
e.documentId === documentId
|
||||
e.documentId === documentId &&
|
||||
e.originalPath === first.originalPath
|
||||
);
|
||||
const result = matching[matching.length - 1];
|
||||
for (const item of matching) {
|
||||
|
|
@ -328,6 +344,49 @@ export class SyncEventQueue {
|
|||
}
|
||||
}
|
||||
|
||||
public updatePendingCreatePath(
|
||||
oldPath: RelativePath,
|
||||
newPath: RelativePath
|
||||
): void {
|
||||
const createEvent = this.findLastCreate(oldPath);
|
||||
if (createEvent === undefined) return;
|
||||
|
||||
const promise = createEvent.resolvers?.promise;
|
||||
createEvent.path = newPath;
|
||||
|
||||
if (promise !== undefined) {
|
||||
for (const e of this.events) {
|
||||
if (
|
||||
e.type === SyncEventType.SyncLocal &&
|
||||
e.documentId === promise
|
||||
) {
|
||||
e.path = newPath;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private findLastCreate(
|
||||
path: RelativePath
|
||||
): Extract<SyncEvent, { type: SyncEventType.Create }> | undefined {
|
||||
for (let i = this.events.length - 1; i >= 0; i--) {
|
||||
const e = this.events[i];
|
||||
if (e.type === SyncEventType.Create && e.path === path) {
|
||||
return e;
|
||||
}
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
private rejectAllPendingCreates(): void {
|
||||
for (const event of this.events) {
|
||||
if (event.type === SyncEventType.Create && event.resolvers !== undefined) {
|
||||
event.resolvers.promise.catch(() => { /* suppressed — consumer may not be listening */ });
|
||||
event.resolvers.reject(new Error("Create was cancelled"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private saveInTheBackground(): void {
|
||||
void this.save().catch((error: unknown) => {
|
||||
this.logger.error(`Error saving sync state: ${error}`);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue