Format & lint
This commit is contained in:
parent
fefac224b0
commit
7f62273e72
179 changed files with 2210 additions and 1319 deletions
|
|
@ -3,8 +3,8 @@ import type { Logger } from "../tracing/logger";
|
|||
import { globsToRegexes } from "../utils/globs-to-regexes";
|
||||
import { CONFLICT_PATH_REGEX } from "./conflict-path";
|
||||
import { removeFromArray } from "../utils/remove-from-array";
|
||||
import type { DocumentWithPath } from "./types";
|
||||
import {
|
||||
DocumentWithPath,
|
||||
SyncEventType,
|
||||
type DocumentId,
|
||||
type DocumentRecord,
|
||||
|
|
@ -12,27 +12,28 @@ import {
|
|||
type RelativePath,
|
||||
type StoredSyncState,
|
||||
type SyncEvent,
|
||||
type VaultUpdateId,
|
||||
type VaultUpdateId
|
||||
} from "./types";
|
||||
import { MinCovered } from "../utils/data-structures/min-covered";
|
||||
|
||||
|
||||
export class SyncEventQueue {
|
||||
private _lastSeenUpdateId: MinCovered;
|
||||
|
||||
// Latest state of the filesystem as we know it, excluding
|
||||
// unconfirmed creates but including pending deletes.
|
||||
//
|
||||
// It's always indexed by the latest path on disk.
|
||||
//
|
||||
//
|
||||
// It maps a subset of the remote state onto the local filesystem.
|
||||
private readonly documents = new Map<RelativePath, DocumentRecord>();
|
||||
|
||||
// All outstanding operations in order of occurrence,
|
||||
// can include multiple generations of the same document,
|
||||
// can include multiple generations of the same document,
|
||||
// e.g.: a create, delete, create sequence for the same path.
|
||||
//
|
||||
// The paths within the events must always correspond to the latest
|
||||
// path on disk, so the path of each event may be updated multiple
|
||||
// times.
|
||||
// times.
|
||||
//
|
||||
// It maps pending changes onto the local filesystem.
|
||||
private readonly events: SyncEvent[] = [];
|
||||
|
|
@ -40,8 +41,6 @@ export class SyncEventQueue {
|
|||
// file creations for paths matching any of these patterns will be ignored
|
||||
private ignorePatterns: RegExp[];
|
||||
|
||||
public _lastSeenUpdateId: MinCovered;
|
||||
|
||||
public constructor(
|
||||
private readonly settings: Settings,
|
||||
private readonly logger: Logger,
|
||||
|
|
@ -70,17 +69,13 @@ export class SyncEventQueue {
|
|||
this.documents.set(relativePath, record);
|
||||
}
|
||||
}
|
||||
this._lastSeenUpdateId = new MinCovered(initialState.lastSeenUpdateId ?? 0);
|
||||
this._lastSeenUpdateId = new MinCovered(
|
||||
initialState.lastSeenUpdateId ?? 0
|
||||
);
|
||||
|
||||
this.logger.debug(`Loaded ${this.documents.size} documents and lastSeenUpdateId=${this._lastSeenUpdateId} from storage`);
|
||||
}
|
||||
|
||||
public get lastSeenUpdateId(): VaultUpdateId {
|
||||
return this._lastSeenUpdateId.min;
|
||||
}
|
||||
|
||||
public set lastSeenUpdateId(id: VaultUpdateId) {
|
||||
this._lastSeenUpdateId.add(id);
|
||||
this.logger.debug(
|
||||
`Loaded ${this.documents.size} documents and lastSeenUpdateId=${this._lastSeenUpdateId.min} from storage`
|
||||
);
|
||||
}
|
||||
|
||||
public get pendingUpdateCount(): number {
|
||||
|
|
@ -91,8 +86,19 @@ export class SyncEventQueue {
|
|||
return this.documents.size;
|
||||
}
|
||||
|
||||
public get lastSeenUpdateId(): VaultUpdateId {
|
||||
return this._lastSeenUpdateId.min;
|
||||
}
|
||||
|
||||
public set lastSeenUpdateId(id: VaultUpdateId) {
|
||||
this._lastSeenUpdateId.add(id);
|
||||
}
|
||||
|
||||
public async enqueue(input: FileSyncEvent): Promise<void> {
|
||||
const path = (input.type === SyncEventType.RemoteChange) ? input.remoteVersion.relativePath : input.path;
|
||||
const path =
|
||||
input.type === SyncEventType.RemoteChange
|
||||
? input.remoteVersion.relativePath
|
||||
: input.path;
|
||||
|
||||
if (this.ignorePatterns.some((pattern) => pattern.test(path))) {
|
||||
this.logger.info(
|
||||
|
|
@ -106,22 +112,28 @@ export class SyncEventQueue {
|
|||
return;
|
||||
}
|
||||
|
||||
|
||||
if (input.type === SyncEventType.LocalCreate) {
|
||||
this.events.push({ type: SyncEventType.LocalCreate, path, originalPath: path, resolvers: Promise.withResolvers() });
|
||||
this.events.push({
|
||||
type: SyncEventType.LocalCreate,
|
||||
path,
|
||||
originalPath: path,
|
||||
resolvers: Promise.withResolvers()
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const lookupPath = (input.type === SyncEventType.LocalUpdate && input.oldPath) ? input.oldPath : path;
|
||||
const lookupPath =
|
||||
input.type === SyncEventType.LocalUpdate &&
|
||||
input.oldPath !== undefined
|
||||
? input.oldPath
|
||||
: path;
|
||||
const record = this.documents.get(lookupPath);
|
||||
|
||||
// latest creation must take precedence as it's from the doc's latest generation
|
||||
const pendingDocumentId: Promise<DocumentId> | undefined =
|
||||
this.findLatestCreateForPath(lookupPath)?.resolvers.promise;
|
||||
|
||||
const documentId: DocumentId | undefined =
|
||||
record?.documentId;
|
||||
|
||||
const documentId: DocumentId | undefined = record?.documentId;
|
||||
|
||||
if (pendingDocumentId === undefined && documentId === undefined) {
|
||||
// we can get here when deleting a local document after a remote update
|
||||
|
|
@ -129,7 +141,14 @@ export class SyncEventQueue {
|
|||
}
|
||||
|
||||
if (input.type === SyncEventType.LocalDelete) {
|
||||
this.events.push({ type: SyncEventType.LocalDelete, documentId: pendingDocumentId ?? documentId! });
|
||||
const deleteId = pendingDocumentId ?? documentId;
|
||||
if (deleteId === undefined) {
|
||||
throw new Error("Unreachable: deleteId must be defined here");
|
||||
}
|
||||
this.events.push({
|
||||
type: SyncEventType.LocalDelete,
|
||||
documentId: deleteId
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -137,30 +156,43 @@ export class SyncEventQueue {
|
|||
if (pendingDocumentId !== undefined) {
|
||||
this.updatePendingCreatePath(input.oldPath, path);
|
||||
} else {
|
||||
if (record === undefined) {
|
||||
throw new Error(
|
||||
"Unreachable: record must be defined for non-pending update"
|
||||
);
|
||||
}
|
||||
this.documents.delete(input.oldPath);
|
||||
this.documents.set(path, record!);
|
||||
this.documents.set(path, record);
|
||||
for (const e of this.events) {
|
||||
// It already has a docId, so there can't be a pending create event for it
|
||||
if (e.type === SyncEventType.LocalUpdate && e.documentId === documentId) {
|
||||
// It already has a docId, so there can't be a pending create event for it
|
||||
if (
|
||||
e.type === SyncEventType.LocalUpdate &&
|
||||
e.documentId === documentId
|
||||
) {
|
||||
e.path = path;
|
||||
}
|
||||
}
|
||||
await this.save();
|
||||
|
||||
}
|
||||
return
|
||||
return;
|
||||
}
|
||||
|
||||
this.events.push({ type: SyncEventType.LocalUpdate, documentId: pendingDocumentId ?? documentId!, path, originalPath: path });
|
||||
const updateId = pendingDocumentId ?? documentId;
|
||||
if (updateId === undefined) {
|
||||
throw new Error("Unreachable: updateId must be defined here");
|
||||
}
|
||||
this.events.push({
|
||||
type: SyncEventType.LocalUpdate,
|
||||
documentId: updateId,
|
||||
path,
|
||||
originalPath: path
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
|
||||
public async next(): Promise<SyncEvent | undefined> {
|
||||
return this.events.shift();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Call once a create has been acknowledged by the server.
|
||||
*/
|
||||
|
|
@ -170,19 +202,21 @@ export class SyncEventQueue {
|
|||
): Promise<void> {
|
||||
removeFromArray(this.events, event); // in case the create event is still pending
|
||||
await this.setDocument(event.path, record);
|
||||
event.resolvers?.resolve(record.documentId);
|
||||
event.resolvers.resolve(record.documentId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the settled document map and persist the new document version.
|
||||
*/
|
||||
public setDocument(path: RelativePath, record: DocumentRecord): Promise<void> {
|
||||
public async setDocument(
|
||||
path: RelativePath,
|
||||
record: DocumentRecord
|
||||
): Promise<void> {
|
||||
this.documents.set(path, record);
|
||||
return this.save();
|
||||
|
||||
}
|
||||
|
||||
public removeDocument(path: RelativePath): Promise<void> {
|
||||
public async removeDocument(path: RelativePath): Promise<void> {
|
||||
this.documents.delete(path);
|
||||
return this.save();
|
||||
}
|
||||
|
|
@ -198,11 +232,7 @@ export class SyncEventQueue {
|
|||
return undefined;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public getDocumentByDocumentIdOrFail(
|
||||
target: DocumentId
|
||||
): DocumentWithPath {
|
||||
public getDocumentByDocumentIdOrFail(target: DocumentId): DocumentWithPath {
|
||||
const result = this.getDocumentByDocumentId(target);
|
||||
if (!result) {
|
||||
throw new Error(`No document found with id ${target}`);
|
||||
|
|
@ -210,10 +240,6 @@ export class SyncEventQueue {
|
|||
return result;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
public async save(): Promise<void> {
|
||||
return this.saveData({
|
||||
documents: Array.from(this.documents.entries()).map(
|
||||
|
|
@ -227,16 +253,16 @@ export class SyncEventQueue {
|
|||
}
|
||||
|
||||
// todo: let's remove
|
||||
public getSettledDocumentByPath(path: RelativePath): DocumentRecord | undefined {
|
||||
public getSettledDocumentByPath(
|
||||
path: RelativePath
|
||||
): DocumentRecord | undefined {
|
||||
return this.documents.get(path);
|
||||
}
|
||||
|
||||
|
||||
public allSettledDocuments(): Map<RelativePath, DocumentRecord> {
|
||||
return new Map(this.documents.entries());
|
||||
}
|
||||
|
||||
|
||||
public hasPendingEventsForPath(path: RelativePath): boolean {
|
||||
const record = this.documents.get(path);
|
||||
if (record === undefined) {
|
||||
|
|
@ -252,7 +278,8 @@ export class SyncEventQueue {
|
|||
e.documentId === docId) ||
|
||||
(e.type === SyncEventType.RemoteChange &&
|
||||
// we care about the local path not the remote
|
||||
this.getDocumentByDocumentId(e.remoteVersion.documentId)?.path === path)
|
||||
this.getDocumentByDocumentId(e.remoteVersion.documentId)
|
||||
?.path === path)
|
||||
);
|
||||
}
|
||||
|
||||
|
|
@ -266,11 +293,10 @@ export class SyncEventQueue {
|
|||
);
|
||||
}
|
||||
|
||||
|
||||
public async clearAllState(): Promise<void> {
|
||||
this.clearPending();
|
||||
this.documents.clear();
|
||||
this._lastSeenUpdateId.reset()
|
||||
this._lastSeenUpdateId.reset();
|
||||
await this.save();
|
||||
}
|
||||
|
||||
|
|
@ -279,29 +305,6 @@ export class SyncEventQueue {
|
|||
this.events.length = 0;
|
||||
}
|
||||
|
||||
|
||||
private updatePendingCreatePath(
|
||||
oldPath: RelativePath,
|
||||
newPath: RelativePath
|
||||
): void {
|
||||
const createEvent = this.findLatestCreateForPath(oldPath);
|
||||
if (createEvent === undefined) return;
|
||||
|
||||
const promise = createEvent.resolvers?.promise;
|
||||
createEvent.path = newPath;
|
||||
|
||||
if (promise !== undefined) {
|
||||
for (const e of this.events) {
|
||||
if (
|
||||
e.type === SyncEventType.LocalUpdate &&
|
||||
e.documentId === promise
|
||||
) {
|
||||
e.path = newPath;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public findLatestCreateForPath(
|
||||
path: RelativePath
|
||||
): Extract<SyncEvent, { type: SyncEventType.LocalCreate }> | undefined {
|
||||
|
|
@ -314,18 +317,34 @@ export class SyncEventQueue {
|
|||
return undefined;
|
||||
}
|
||||
|
||||
private updatePendingCreatePath(
|
||||
oldPath: RelativePath,
|
||||
newPath: RelativePath
|
||||
): void {
|
||||
const createEvent = this.findLatestCreateForPath(oldPath);
|
||||
if (createEvent === undefined) return;
|
||||
|
||||
const { promise } = createEvent.resolvers;
|
||||
createEvent.path = newPath;
|
||||
|
||||
|
||||
|
||||
private rejectAllPendingCreates(): void {
|
||||
for (const event of this.events) {
|
||||
if (event.type === SyncEventType.LocalCreate && event.resolvers !== undefined) {
|
||||
event.resolvers.promise.catch(() => { /* suppressed — consumer may not be listening */ });
|
||||
event.resolvers.reject(new Error("Create was cancelled"));
|
||||
for (const e of this.events) {
|
||||
if (
|
||||
e.type === SyncEventType.LocalUpdate &&
|
||||
e.documentId === promise
|
||||
) {
|
||||
e.path = newPath;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private rejectAllPendingCreates(): void {
|
||||
for (const event of this.events) {
|
||||
if (event.type === SyncEventType.LocalCreate) {
|
||||
event.resolvers.promise.catch(() => {
|
||||
/* suppressed — consumer may not be listening */
|
||||
});
|
||||
event.resolvers.reject(new Error("Create was cancelled"));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue