wip again
This commit is contained in:
parent
53bfbfaa4a
commit
3ba0b7a88b
3 changed files with 286 additions and 385 deletions
|
|
@ -0,0 +1,247 @@
|
||||||
|
import type { DocumentRecord, RelativePath } from "./types";
|
||||||
|
import { SyncEventType } from "./types";
|
||||||
|
import type { Logger } from "../tracing/logger";
|
||||||
|
import { hash } from "../utils/hash";
|
||||||
|
import type { FileOperations } from "../file-operations/file-operations";
|
||||||
|
import { findMatchingFile } from "../utils/find-matching-file";
|
||||||
|
import { FileNotFoundError } from "../errors/file-not-found-error";
|
||||||
|
import type { SyncEventQueue } from "./sync-event-queue";
|
||||||
|
|
||||||
|
interface DocumentWithPath {
|
||||||
|
path: RelativePath;
|
||||||
|
record: DocumentRecord;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface SyncInstruction {
|
||||||
|
type: "update" | "create";
|
||||||
|
relativePath: string;
|
||||||
|
oldPath?: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface OfflineChangeDetectorDeps {
|
||||||
|
logger: Logger;
|
||||||
|
operations: FileOperations;
|
||||||
|
queue: SyncEventQueue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Scans the local filesystem and the document database to determine
|
||||||
|
* which files were created, updated, moved, or deleted while the
|
||||||
|
* client was offline, then enqueues the appropriate sync events.
|
||||||
|
*/
|
||||||
|
export async function scheduleOfflineChanges(
|
||||||
|
deps: OfflineChangeDetectorDeps,
|
||||||
|
enqueueCreate: (path: RelativePath) => void,
|
||||||
|
enqueueUpdate: (args: { oldPath?: RelativePath; relativePath: RelativePath }) => void,
|
||||||
|
enqueueDelete: (path: RelativePath) => void,
|
||||||
|
): Promise<void> {
|
||||||
|
const { logger, operations, queue } = deps;
|
||||||
|
|
||||||
|
const allLocalFiles = await operations.listFilesRecursively();
|
||||||
|
logger.info(`Scheduling sync for ${allLocalFiles.length} local files`);
|
||||||
|
|
||||||
|
queue.clear();
|
||||||
|
|
||||||
|
const allDocuments = new Map(queue.allSettledDocuments());
|
||||||
|
const locallyRenamedPaths = enqueueRenamedDocuments(deps, allDocuments);
|
||||||
|
|
||||||
|
let deletedCandidates = await findLocallyDeletedFiles(operations, allDocuments);
|
||||||
|
|
||||||
|
const instructions = await buildSyncInstructions(
|
||||||
|
deps,
|
||||||
|
allLocalFiles,
|
||||||
|
locallyRenamedPaths,
|
||||||
|
deletedCandidates,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Enqueue deletes first
|
||||||
|
for (const { path } of deletedCandidates) {
|
||||||
|
logger.debug(`Document ${path} has been deleted locally, scheduling sync to delete it`);
|
||||||
|
enqueueDelete(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Then updates/moves
|
||||||
|
for (const instruction of instructions) {
|
||||||
|
if (instruction.type === "update") {
|
||||||
|
enqueueUpdate({
|
||||||
|
oldPath: instruction.oldPath,
|
||||||
|
relativePath: instruction.relativePath,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Creates last so the server can merge with existing documents
|
||||||
|
for (const instruction of instructions) {
|
||||||
|
if (instruction.type === "create") {
|
||||||
|
enqueueCreate(instruction.relativePath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function enqueueRenamedDocuments(
|
||||||
|
{ queue, logger }: OfflineChangeDetectorDeps,
|
||||||
|
allDocuments: Map<RelativePath, DocumentRecord>,
|
||||||
|
): Set<RelativePath> {
|
||||||
|
const locallyRenamedPaths = new Set<RelativePath>();
|
||||||
|
|
||||||
|
for (const [path, record] of allDocuments) {
|
||||||
|
const remoteRelPath = record.remoteRelativePath;
|
||||||
|
const hasLocalRename = remoteRelPath !== undefined && remoteRelPath !== path;
|
||||||
|
|
||||||
|
if (hasLocalRename) {
|
||||||
|
queue.enqueue({ type: SyncEventType.SyncLocal, path });
|
||||||
|
locallyRenamedPaths.add(path);
|
||||||
|
logger.debug(`Document ${path} was renamed locally (from ${remoteRelPath}), scheduling sync`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return locallyRenamedPaths;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function findLocallyDeletedFiles(
|
||||||
|
operations: FileOperations,
|
||||||
|
allDocuments: Map<RelativePath, DocumentRecord>,
|
||||||
|
): Promise<DocumentWithPath[]> {
|
||||||
|
const result: DocumentWithPath[] = [];
|
||||||
|
|
||||||
|
for (const [path, record] of allDocuments) {
|
||||||
|
if (!(await operations.exists(path))) {
|
||||||
|
result.push({ path, record });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function buildSyncInstructions(
|
||||||
|
deps: OfflineChangeDetectorDeps,
|
||||||
|
allLocalFiles: RelativePath[],
|
||||||
|
locallyRenamedPaths: Set<RelativePath>,
|
||||||
|
deletedCandidates: DocumentWithPath[],
|
||||||
|
): Promise<SyncInstruction[]> {
|
||||||
|
const { logger, operations, queue } = deps;
|
||||||
|
const instructions: SyncInstruction[] = [];
|
||||||
|
|
||||||
|
for (const relativePath of allLocalFiles) {
|
||||||
|
if (locallyRenamedPaths.has(relativePath)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const existingRecord = queue.getSettledDocumentByPath(relativePath);
|
||||||
|
|
||||||
|
if (existingRecord !== undefined) {
|
||||||
|
const result = await handleExistingDocument(
|
||||||
|
deps,
|
||||||
|
relativePath,
|
||||||
|
existingRecord,
|
||||||
|
deletedCandidates,
|
||||||
|
);
|
||||||
|
if (result !== undefined) {
|
||||||
|
if (result.updatedDeletedCandidates !== undefined) {
|
||||||
|
deletedCandidates = result.updatedDeletedCandidates;
|
||||||
|
}
|
||||||
|
if (result.instruction !== undefined) {
|
||||||
|
instructions.push(result.instruction);
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.debug(
|
||||||
|
`Document ${relativePath} might have been updated locally, scheduling sync to validate and update it`,
|
||||||
|
);
|
||||||
|
instructions.push({ type: "update", relativePath });
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const result = await handleNewFile(deps, relativePath, deletedCandidates);
|
||||||
|
if (result.updatedDeletedCandidates !== undefined) {
|
||||||
|
deletedCandidates = result.updatedDeletedCandidates;
|
||||||
|
}
|
||||||
|
instructions.push(result.instruction);
|
||||||
|
}
|
||||||
|
|
||||||
|
return instructions;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function handleExistingDocument(
|
||||||
|
{ logger, operations }: OfflineChangeDetectorDeps,
|
||||||
|
relativePath: RelativePath,
|
||||||
|
existingRecord: DocumentRecord,
|
||||||
|
deletedCandidates: DocumentWithPath[],
|
||||||
|
): Promise<
|
||||||
|
| { instruction?: SyncInstruction; updatedDeletedCandidates?: DocumentWithPath[] }
|
||||||
|
| undefined
|
||||||
|
> {
|
||||||
|
if (deletedCandidates.length === 0) {
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
let contentHash: string | undefined;
|
||||||
|
try {
|
||||||
|
const bytes = await operations.read(relativePath);
|
||||||
|
contentHash = await hash(bytes);
|
||||||
|
} catch (e) {
|
||||||
|
if (e instanceof FileNotFoundError) return { instruction: undefined };
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (contentHash === existingRecord.remoteHash) {
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
const originalFile = await findMatchingFile(contentHash, deletedCandidates);
|
||||||
|
if (originalFile === undefined) {
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
// This file was moved here from a different path, displacing the existing document
|
||||||
|
const updatedDeletedCandidates = [
|
||||||
|
...deletedCandidates.filter((item) => item.path !== originalFile.path),
|
||||||
|
{ path: relativePath, record: existingRecord },
|
||||||
|
];
|
||||||
|
|
||||||
|
logger.debug(
|
||||||
|
`Document '${originalFile.path}' was moved to ${relativePath} (displacing existing document), scheduling sync to move it`,
|
||||||
|
);
|
||||||
|
|
||||||
|
return {
|
||||||
|
instruction: { type: "update", oldPath: originalFile.path, relativePath },
|
||||||
|
updatedDeletedCandidates,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
async function handleNewFile(
|
||||||
|
{ logger, operations }: OfflineChangeDetectorDeps,
|
||||||
|
relativePath: RelativePath,
|
||||||
|
deletedCandidates: DocumentWithPath[],
|
||||||
|
): Promise<{ instruction: SyncInstruction; updatedDeletedCandidates?: DocumentWithPath[] }> {
|
||||||
|
let contentHash: string | undefined;
|
||||||
|
try {
|
||||||
|
const contentBytes = await operations.read(relativePath);
|
||||||
|
contentHash = await hash(contentBytes);
|
||||||
|
} catch (e) {
|
||||||
|
if (e instanceof FileNotFoundError) {
|
||||||
|
return { instruction: { type: "create", relativePath } };
|
||||||
|
}
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
|
||||||
|
const originalFile = await findMatchingFile(contentHash, deletedCandidates);
|
||||||
|
if (originalFile !== undefined) {
|
||||||
|
const updatedDeletedCandidates = deletedCandidates.filter(
|
||||||
|
(item) => item.path !== originalFile.path,
|
||||||
|
);
|
||||||
|
|
||||||
|
logger.debug(
|
||||||
|
`Document '${originalFile.path}' was not found under its current path in the database but was found under a different path (${relativePath}), scheduling sync to move it`,
|
||||||
|
);
|
||||||
|
|
||||||
|
return {
|
||||||
|
instruction: { type: "update", oldPath: originalFile.path, relativePath },
|
||||||
|
updatedDeletedCandidates,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.debug(`Document ${relativePath} not found in database, scheduling sync to create it`);
|
||||||
|
return { instruction: { type: SyncEventType.Create, relativePath } };
|
||||||
|
}
|
||||||
|
|
@ -340,7 +340,7 @@ export class SyncEventQueue {
|
||||||
return this.ignorePatterns.some((pattern) => pattern.test(path));
|
return this.ignorePatterns.some((pattern) => pattern.test(path));
|
||||||
}
|
}
|
||||||
|
|
||||||
private removeAllEventsForDocumentId(documentId: DocumentId): void {
|
public removeAllEventsForDocumentId(documentId: DocumentId): void {
|
||||||
for (let i = this.events.length - 1; i >= 0; i--) {
|
for (let i = this.events.length - 1; i >= 0; i--) {
|
||||||
const e = this.events[i];
|
const e = this.events[i];
|
||||||
if (
|
if (
|
||||||
|
|
|
||||||
|
|
@ -7,10 +7,10 @@ import {
|
||||||
type VaultUpdateId,
|
type VaultUpdateId,
|
||||||
} from "./types";
|
} from "./types";
|
||||||
import type { Logger } from "../tracing/logger";
|
import type { Logger } from "../tracing/logger";
|
||||||
import { EMPTY_HASH, hash } from "../utils/hash";
|
import { hash } from "../utils/hash";
|
||||||
import type { Settings } from "../persistence/settings";
|
import type { Settings } from "../persistence/settings";
|
||||||
import type { FileOperations } from "../file-operations/file-operations";
|
import type { FileOperations } from "../file-operations/file-operations";
|
||||||
import { findMatchingFile } from "../utils/find-matching-file";
|
import { scheduleOfflineChanges } from "./offline-change-detector";
|
||||||
import { SyncResetError } from "../errors/sync-reset-error";
|
import { SyncResetError } from "../errors/sync-reset-error";
|
||||||
import type { DocumentVersionWithoutContent } from "../services/types/DocumentVersionWithoutContent";
|
import type { DocumentVersionWithoutContent } from "../services/types/DocumentVersionWithoutContent";
|
||||||
import type { WebSocketVaultUpdate } from "../services/types/WebSocketVaultUpdate";
|
import type { WebSocketVaultUpdate } from "../services/types/WebSocketVaultUpdate";
|
||||||
|
|
@ -84,18 +84,13 @@ export class Syncer {
|
||||||
}
|
}
|
||||||
|
|
||||||
public syncLocallyCreatedFile(relativePath: RelativePath): void {
|
public syncLocallyCreatedFile(relativePath: RelativePath): void {
|
||||||
this.queue.enqueue({ type: SyncEventType.Create, path: relativePath, originalPath: relativePath });
|
this.queue.enqueue({ type: SyncEventType.Create, path: relativePath });
|
||||||
this.ensureDraining();
|
this.ensureDraining();
|
||||||
}
|
}
|
||||||
|
|
||||||
public syncLocallyDeletedFile(relativePath: RelativePath): void {
|
public syncLocallyDeletedFile(relativePath: RelativePath): void {
|
||||||
const record = this.queue.getSettledDocumentByPath(relativePath);
|
|
||||||
const documentId: DocumentId | Promise<DocumentId> | undefined =
|
|
||||||
record?.documentId ?? this.queue.getCreatePromise(relativePath);
|
|
||||||
if (documentId === undefined) return;
|
|
||||||
this.queue.enqueue({
|
this.queue.enqueue({
|
||||||
type: SyncEventType.Delete,
|
type: SyncEventType.Delete,
|
||||||
documentId,
|
|
||||||
path: relativePath,
|
path: relativePath,
|
||||||
});
|
});
|
||||||
this.ensureDraining();
|
this.ensureDraining();
|
||||||
|
|
@ -108,54 +103,7 @@ export class Syncer {
|
||||||
oldPath?: RelativePath;
|
oldPath?: RelativePath;
|
||||||
relativePath: RelativePath;
|
relativePath: RelativePath;
|
||||||
}): void {
|
}): void {
|
||||||
if (oldPath === undefined) {
|
this.queue.enqueue({ type: SyncEventType.SyncLocal, path: relativePath, oldPath });
|
||||||
const record = this.queue.getSettledDocumentByPath(relativePath);
|
|
||||||
if (record === undefined) {
|
|
||||||
this.syncLocallyCreatedFile(relativePath);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
this.queue.enqueue({
|
|
||||||
type: SyncEventType.SyncLocal,
|
|
||||||
documentId: record.documentId,
|
|
||||||
path: relativePath,
|
|
||||||
originalPath: relativePath,
|
|
||||||
});
|
|
||||||
this.ensureDraining();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Handle rename
|
|
||||||
const sourceRecord = this.queue.getSettledDocumentByPath(oldPath);
|
|
||||||
if (sourceRecord !== undefined) {
|
|
||||||
// Capture the displaced document's version before
|
|
||||||
// moveDocument removes it from the store
|
|
||||||
const displacedRecord = this.queue.getSettledDocumentByPath(relativePath);
|
|
||||||
const displacedDocumentId = this.queue.moveDocument(
|
|
||||||
oldPath,
|
|
||||||
relativePath
|
|
||||||
);
|
|
||||||
if (displacedDocumentId !== undefined) {
|
|
||||||
this.queue.enqueue({
|
|
||||||
type: SyncEventType.Delete,
|
|
||||||
documentId: displacedDocumentId,
|
|
||||||
path: relativePath,
|
|
||||||
displacedAtVersion: displacedRecord?.parentVersionId,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
this.queue.enqueue({
|
|
||||||
type: SyncEventType.SyncLocal,
|
|
||||||
documentId: sourceRecord.documentId,
|
|
||||||
path: relativePath,
|
|
||||||
originalPath: relativePath,
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
// No settled document at the old path — enqueue a fresh
|
|
||||||
// create at the new path. If a Create for the old path is
|
|
||||||
// still in the queue it will fail with FileNotFoundError
|
|
||||||
// and reject its resolvers, cancelling any dependent events.
|
|
||||||
this.syncLocallyCreatedFile(relativePath);
|
|
||||||
}
|
|
||||||
|
|
||||||
this.ensureDraining();
|
this.ensureDraining();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -209,17 +157,7 @@ export class Syncer {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// The initial sync is a complete snapshot so we can jump the
|
|
||||||
// minimum straight to the max vaultUpdateId. Subsequent
|
|
||||||
// broadcasts use addSeenUpdateId (called per-event inside each
|
|
||||||
// processor) which tracks contiguous coverage and won't advance
|
|
||||||
// past gaps — correct for incremental updates but wrong for a
|
|
||||||
// snapshot whose IDs are intentionally sparse
|
|
||||||
if (message.isInitialSync) {
|
if (message.isInitialSync) {
|
||||||
this.queue.lastSeenUpdateId = Math.max(
|
|
||||||
...message.documents.map((d) => d.vaultUpdateId),
|
|
||||||
this.queue.lastSeenUpdateId
|
|
||||||
);
|
|
||||||
this._isFirstSyncComplete = true;
|
this._isFirstSyncComplete = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -259,181 +197,13 @@ export class Syncer {
|
||||||
|
|
||||||
|
|
||||||
private async internalScheduleSyncForOfflineChanges(): Promise<void> {
|
private async internalScheduleSyncForOfflineChanges(): Promise<void> {
|
||||||
const allLocalFiles = await this.operations.listFilesRecursively();
|
await scheduleOfflineChanges(
|
||||||
this.logger.info(
|
{ logger: this.logger, operations: this.operations, queue: this.queue },
|
||||||
`Scheduling sync for ${allLocalFiles.length} local files`
|
(path) => this.syncLocallyCreatedFile(path),
|
||||||
|
(args) => this.syncLocallyUpdatedFile(args),
|
||||||
|
(path) => this.syncLocallyDeletedFile(path),
|
||||||
);
|
);
|
||||||
|
|
||||||
// Clear stale event tracking from any previous drain
|
|
||||||
this.queue.clear();
|
|
||||||
|
|
||||||
// Detect documents whose local path diverges from the server path.
|
|
||||||
// This happens when a rename was recorded while sync was disabled.
|
|
||||||
const allDocuments = this.queue.allSettledDocuments();
|
|
||||||
const locallyRenamedPaths = new Set<RelativePath>();
|
|
||||||
|
|
||||||
for (const [path, record] of allDocuments) {
|
|
||||||
const remoteRelPath = record.remoteRelativePath;
|
|
||||||
const hasLocalRename =
|
|
||||||
remoteRelPath !== undefined && remoteRelPath !== path;
|
|
||||||
|
|
||||||
if (hasLocalRename) {
|
|
||||||
// Enqueue a sync-local at the current (renamed) path;
|
|
||||||
// the processSyncLocal handler will detect the path
|
|
||||||
// divergence and send an update with the new path
|
|
||||||
this.queue.enqueue({
|
|
||||||
type: SyncEventType.SyncLocal,
|
|
||||||
documentId: record.documentId,
|
|
||||||
path,
|
|
||||||
originalPath: path,
|
|
||||||
});
|
|
||||||
locallyRenamedPaths.add(path);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Find files that have been deleted locally
|
|
||||||
interface DocumentWithPath {
|
|
||||||
path: RelativePath;
|
|
||||||
record: DocumentRecord;
|
|
||||||
}
|
|
||||||
let locallyPossiblyDeletedFiles: DocumentWithPath[] = [];
|
|
||||||
for (const [path, record] of allDocuments) {
|
|
||||||
if (!(await this.operations.exists(path))) {
|
|
||||||
locallyPossiblyDeletedFiles.push({ path, record });
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
interface Instruction {
|
|
||||||
type: "update" | "create";
|
|
||||||
relativePath: string;
|
|
||||||
oldPath?: string;
|
|
||||||
}
|
|
||||||
const instructions: Instruction[] = [];
|
|
||||||
|
|
||||||
for (const relativePath of allLocalFiles) {
|
|
||||||
if (locallyRenamedPaths.has(relativePath)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
const existingRecord = this.queue.getSettledDocumentByPath(relativePath);
|
|
||||||
|
|
||||||
if (existingRecord !== undefined) {
|
|
||||||
// Verify the content actually belongs to this document.
|
|
||||||
// A file might exist at a known path but actually be a
|
|
||||||
// different document that was renamed here while offline
|
|
||||||
if (locallyPossiblyDeletedFiles.length > 0) {
|
|
||||||
let contentHash: string | undefined;
|
|
||||||
try {
|
|
||||||
const bytes =
|
|
||||||
await this.operations.read(relativePath);
|
|
||||||
contentHash = await hash(bytes);
|
|
||||||
} catch (e) {
|
|
||||||
if (e instanceof FileNotFoundError) continue;
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (contentHash !== existingRecord.remoteHash) {
|
|
||||||
const originalFile = await findMatchingFile(
|
|
||||||
contentHash,
|
|
||||||
locallyPossiblyDeletedFiles
|
|
||||||
);
|
|
||||||
if (originalFile !== undefined) {
|
|
||||||
// This file was moved here from a different path
|
|
||||||
locallyPossiblyDeletedFiles.push({
|
|
||||||
path: relativePath,
|
|
||||||
record: existingRecord
|
|
||||||
});
|
|
||||||
locallyPossiblyDeletedFiles =
|
|
||||||
locallyPossiblyDeletedFiles.filter(
|
|
||||||
(item) =>
|
|
||||||
item.path !== originalFile.path
|
|
||||||
);
|
|
||||||
|
|
||||||
this.logger.debug(
|
|
||||||
`Document '${originalFile.path}' was moved to ${relativePath} (displacing existing document), scheduling sync to move it`
|
|
||||||
);
|
|
||||||
instructions.push({
|
|
||||||
type: "update",
|
|
||||||
oldPath: originalFile.path,
|
|
||||||
relativePath
|
|
||||||
});
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
this.logger.debug(
|
|
||||||
`Document ${relativePath} might have been updated locally, scheduling sync to validate and update it`
|
|
||||||
);
|
|
||||||
instructions.push({ type: "update", relativePath });
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Perhaps the file has been moved; check by looking at the deleted files
|
|
||||||
let contentHash: string | undefined = undefined;
|
|
||||||
try {
|
|
||||||
const contentBytes = await this.operations.read(relativePath);
|
|
||||||
contentHash = await hash(contentBytes);
|
|
||||||
} catch (e) {
|
|
||||||
if (e instanceof FileNotFoundError) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
|
|
||||||
const originalFile = await findMatchingFile(
|
|
||||||
contentHash,
|
|
||||||
locallyPossiblyDeletedFiles
|
|
||||||
);
|
|
||||||
if (originalFile !== undefined) {
|
|
||||||
locallyPossiblyDeletedFiles =
|
|
||||||
locallyPossiblyDeletedFiles.filter(
|
|
||||||
(item) => item.path !== originalFile.path
|
|
||||||
);
|
|
||||||
|
|
||||||
this.logger.debug(
|
|
||||||
`Document '${originalFile.path}' was not found under its current path in the database but was found under a different path (${relativePath}), scheduling sync to move it`
|
|
||||||
);
|
|
||||||
|
|
||||||
instructions.push({
|
|
||||||
type: "update",
|
|
||||||
oldPath: originalFile.path,
|
|
||||||
relativePath
|
|
||||||
});
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
this.logger.debug(
|
|
||||||
`Document ${relativePath} not found in database, scheduling sync to create it`
|
|
||||||
);
|
|
||||||
instructions.push({ type: SyncEventType.Create, relativePath });
|
|
||||||
}
|
|
||||||
|
|
||||||
// Enqueue deletes first
|
|
||||||
for (const { path } of locallyPossiblyDeletedFiles) {
|
|
||||||
this.logger.debug(
|
|
||||||
`Document ${path} has been deleted locally, scheduling sync to delete it`
|
|
||||||
);
|
|
||||||
this.syncLocallyDeletedFile(path);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Then updates/moves
|
|
||||||
for (const instruction of instructions) {
|
|
||||||
if (instruction.type === "update") {
|
|
||||||
this.syncLocallyUpdatedFile({
|
|
||||||
oldPath: instruction.oldPath,
|
|
||||||
relativePath: instruction.relativePath
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Creates last so the server can merge with existing documents
|
|
||||||
for (const instruction of instructions) {
|
|
||||||
if (instruction.type === "create") {
|
|
||||||
this.syncLocallyCreatedFile(instruction.relativePath);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
await this.scheduleDrain();
|
await this.scheduleDrain();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -451,7 +221,7 @@ export class Syncer {
|
||||||
}
|
}
|
||||||
|
|
||||||
private async drain(): Promise<void> {
|
private async drain(): Promise<void> {
|
||||||
let event = this.queue.next();
|
let event = await this.queue.next();
|
||||||
while (event !== undefined) {
|
while (event !== undefined) {
|
||||||
try {
|
try {
|
||||||
await this.processEvent(event);
|
await this.processEvent(event);
|
||||||
|
|
@ -465,7 +235,7 @@ export class Syncer {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
this.notifyRemainingOperationsChanged();
|
this.notifyRemainingOperationsChanged();
|
||||||
event = this.queue.next();
|
event = await this.queue.next();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -503,44 +273,8 @@ export class Syncer {
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (
|
|
||||||
e instanceof HttpClientError &&
|
|
||||||
event.type === SyncEventType.SyncLocal
|
|
||||||
) {
|
|
||||||
// The server rejected the update (e.g. document was
|
|
||||||
// deleted). Re-create only if local content differs
|
|
||||||
// from the last synced version — otherwise the remote
|
|
||||||
// delete should win
|
|
||||||
const doc = this.queue.getDocumentByDocumentId(
|
|
||||||
event.documentId
|
|
||||||
);
|
|
||||||
if (doc === undefined) return;
|
|
||||||
const { path: eventPath, record } = doc;
|
|
||||||
if (await this.operations.exists(eventPath)) {
|
|
||||||
const localBytes =
|
|
||||||
await this.operations.read(eventPath);
|
|
||||||
const localHash = await hash(localBytes);
|
|
||||||
if (localHash !== record.remoteHash) {
|
|
||||||
this.logger.info(
|
|
||||||
`Server rejected update for ${eventPath} but local content changed, re-creating`
|
|
||||||
);
|
|
||||||
this.queue.removeDocument(eventPath);
|
|
||||||
this.syncLocallyCreatedFile(eventPath);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
this.logger.info(
|
|
||||||
`Server rejected update for ${eventPath} (${e.message}), removing local copy`
|
|
||||||
);
|
|
||||||
this.queue.removeDocument(eventPath);
|
|
||||||
await this.operations.delete(eventPath);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (e instanceof HttpClientError) {
|
if (e instanceof HttpClientError) {
|
||||||
// Server rejected a request (e.g. updating a deleted
|
this.logger.error(
|
||||||
// document during sync-remote processing). Not an
|
|
||||||
// error — the next offline scan will reconcile
|
|
||||||
this.logger.info(
|
|
||||||
`Server rejected ${event.type} request: ${e.message}`
|
`Server rejected ${event.type} request: ${e.message}`
|
||||||
);
|
);
|
||||||
return;
|
return;
|
||||||
|
|
@ -574,77 +308,33 @@ export class Syncer {
|
||||||
contentBytes
|
contentBytes
|
||||||
});
|
});
|
||||||
|
|
||||||
event.resolvers?.resolve(response.documentId);
|
|
||||||
|
|
||||||
// Handle concurrent move & creation: the server merged our create
|
// Handle concurrent move & creation: the server merged our create
|
||||||
// with an existing document that we also have locally at a different path
|
// with an existing document that we also have locally at a different path
|
||||||
const existingDoc = this.queue.getDocumentByDocumentId(
|
const existingDoc = this.queue.getDocumentByDocumentId(
|
||||||
response.documentId
|
response.documentId
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// need to merge in db
|
||||||
if (existingDoc !== undefined && existingDoc.path !== effectivePath) {
|
if (existingDoc !== undefined && existingDoc.path !== effectivePath) {
|
||||||
this.logger.info(
|
// this.logger.info(
|
||||||
`Merging existing document ${existingDoc.path} into ${effectivePath} after concurrent move & creation`
|
// `Merging existing document ${existingDoc.path} into ${effectivePath} after concurrent move & creation`
|
||||||
);
|
// );
|
||||||
await this.operations.delete(existingDoc.path);
|
// await this.operations.delete(existingDoc.path);
|
||||||
this.queue.removeDocument(existingDoc.path);
|
// this.queue.removeDocument(existingDoc.path);
|
||||||
|
// if (!this.queue.getDocumentByDocumentId(existingDoc.record.documentId)) {
|
||||||
|
// this.queue.removeAllEventsForDocumentId(existingDoc.record.documentId);
|
||||||
|
// }
|
||||||
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
// When the server deconflicts the create to a different path, another
|
|
||||||
// document may now occupy the original path (downloaded while the
|
|
||||||
// create was in flight). handleMaybeMergingResponse would move the
|
|
||||||
// file AND the foreign document's record to the deconflicted path,
|
|
||||||
// then overwrite it — orphaning the foreign document. Handle this
|
|
||||||
// by writing directly to the deconflicted path instead of moving
|
|
||||||
const foreignRecord = this.queue.getSettledDocumentByPath(effectivePath);
|
|
||||||
const pathOccupiedByForeignDocument =
|
|
||||||
response.relativePath !== effectivePath &&
|
|
||||||
foreignRecord !== undefined &&
|
|
||||||
foreignRecord.documentId !== response.documentId;
|
|
||||||
|
|
||||||
if (pathOccupiedByForeignDocument) {
|
await this.handleMaybeMergingResponse({
|
||||||
const actualPath = response.relativePath;
|
path: effectivePath,
|
||||||
|
response,
|
||||||
if ("type" in response && response.type === "MergingUpdate") {
|
contentHash,
|
||||||
const responseBytes = base64ToBytes(response.contentBase64);
|
originalContentBytes: contentBytes
|
||||||
await this.operations.create(actualPath, responseBytes);
|
});
|
||||||
const afterWriteBytes =
|
|
||||||
await this.operations.read(actualPath);
|
|
||||||
const afterWriteHash = await hash(afterWriteBytes);
|
|
||||||
this.queue.setDocument(actualPath, {
|
|
||||||
documentId: response.documentId,
|
|
||||||
parentVersionId: response.vaultUpdateId,
|
|
||||||
remoteHash: afterWriteHash,
|
|
||||||
remoteRelativePath: response.relativePath
|
|
||||||
});
|
|
||||||
await this.updateCache(
|
|
||||||
response.vaultUpdateId,
|
|
||||||
responseBytes,
|
|
||||||
actualPath
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
await this.operations.create(actualPath, contentBytes);
|
|
||||||
this.queue.setDocument(actualPath, {
|
|
||||||
documentId: response.documentId,
|
|
||||||
parentVersionId: response.vaultUpdateId,
|
|
||||||
remoteHash: contentHash,
|
|
||||||
remoteRelativePath: response.relativePath
|
|
||||||
});
|
|
||||||
await this.updateCache(
|
|
||||||
response.vaultUpdateId,
|
|
||||||
contentBytes,
|
|
||||||
actualPath
|
|
||||||
);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
await this.handleMaybeMergingResponse({
|
|
||||||
path: effectivePath,
|
|
||||||
response,
|
|
||||||
contentHash,
|
|
||||||
originalContentBytes: contentBytes
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
this.queue.addSeenUpdateId(response.vaultUpdateId);
|
|
||||||
|
|
||||||
this.history.addHistoryEntry({
|
this.history.addHistoryEntry({
|
||||||
status: SyncStatus.SUCCESS,
|
status: SyncStatus.SUCCESS,
|
||||||
|
|
@ -660,8 +350,6 @@ export class Syncer {
|
||||||
private async processDelete(
|
private async processDelete(
|
||||||
event: Extract<SyncEvent, { type: SyncEventType.Delete }>
|
event: Extract<SyncEvent, { type: SyncEventType.Delete }>
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
const { path } = event;
|
|
||||||
|
|
||||||
let documentId: DocumentId;
|
let documentId: DocumentId;
|
||||||
if (typeof event.documentId === "string") {
|
if (typeof event.documentId === "string") {
|
||||||
documentId = event.documentId;
|
documentId = event.documentId;
|
||||||
|
|
@ -676,40 +364,21 @@ export class Syncer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// For displacement deletes (side effect of a rename), check
|
|
||||||
// if another client updated the document since our last known
|
|
||||||
// version. If so, skip the delete to preserve their edits
|
|
||||||
if (event.displacedAtVersion !== undefined) {
|
|
||||||
const latest = await this.syncService.get({ documentId });
|
|
||||||
if (
|
|
||||||
!latest.isDeleted &&
|
|
||||||
latest.vaultUpdateId > event.displacedAtVersion
|
|
||||||
) {
|
|
||||||
this.logger.info(
|
|
||||||
`Skipping displacement delete for ${documentId} — document was updated by another client`
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Use the document's current path from the store if available,
|
|
||||||
// otherwise fall back to the path from the event (e.g. when the
|
|
||||||
// document was displaced by a move and already removed from the store)
|
|
||||||
const doc = this.queue.getDocumentByDocumentId(documentId);
|
const doc = this.queue.getDocumentByDocumentId(documentId);
|
||||||
const relativePath = doc?.path ?? path;
|
if (doc === undefined) {
|
||||||
|
this.logger.debug(
|
||||||
|
`Skipping delete for unknown documentId ${documentId}`
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const relativePath = doc.path;
|
||||||
|
|
||||||
const response = await this.syncService.delete({
|
const response = await this.syncService.delete({
|
||||||
documentId,
|
documentId,
|
||||||
relativePath
|
relativePath
|
||||||
});
|
});
|
||||||
|
|
||||||
// Only remove the document record if it still belongs to this
|
this.queue.removeDocument(doc.path);
|
||||||
// documentId; the path may have been reused by a different document
|
|
||||||
// (e.g. after a move-to-occupied-path)
|
|
||||||
if (doc !== undefined) {
|
|
||||||
this.queue.removeDocument(doc.path);
|
|
||||||
}
|
|
||||||
this.queue.addSeenUpdateId(response.vaultUpdateId);
|
|
||||||
|
|
||||||
this.history.addHistoryEntry({
|
this.history.addHistoryEntry({
|
||||||
status: SyncStatus.SUCCESS,
|
status: SyncStatus.SUCCESS,
|
||||||
|
|
@ -781,7 +450,6 @@ export class Syncer {
|
||||||
originalContentBytes: contentBytes
|
originalContentBytes: contentBytes
|
||||||
});
|
});
|
||||||
|
|
||||||
this.queue.addSeenUpdateId(response.vaultUpdateId);
|
|
||||||
|
|
||||||
const isMerge =
|
const isMerge =
|
||||||
"type" in response && response.type === "MergingUpdate";
|
"type" in response && response.type === "MergingUpdate";
|
||||||
|
|
@ -815,7 +483,6 @@ export class Syncer {
|
||||||
this.logger.debug(
|
this.logger.debug(
|
||||||
`Document ${existingDoc.path} is already up-to-date`
|
`Document ${existingDoc.path} is already up-to-date`
|
||||||
);
|
);
|
||||||
this.queue.addSeenUpdateId(remoteVersion.vaultUpdateId);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -831,7 +498,6 @@ export class Syncer {
|
||||||
this.logger.debug(
|
this.logger.debug(
|
||||||
`Document ${remoteVersion.relativePath} has been deleted remotely, no need to sync`
|
`Document ${remoteVersion.relativePath} has been deleted remotely, no need to sync`
|
||||||
);
|
);
|
||||||
this.queue.addSeenUpdateId(remoteVersion.vaultUpdateId);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -858,13 +524,11 @@ export class Syncer {
|
||||||
// Local changes survive; re-upload as a new document
|
// Local changes survive; re-upload as a new document
|
||||||
this.queue.removeDocument(currentPath);
|
this.queue.removeDocument(currentPath);
|
||||||
this.syncLocallyCreatedFile(currentPath);
|
this.syncLocallyCreatedFile(currentPath);
|
||||||
this.queue.addSeenUpdateId(remoteVersion.vaultUpdateId);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
await this.operations.delete(currentPath);
|
await this.operations.delete(currentPath);
|
||||||
this.queue.removeDocument(currentPath);
|
this.queue.removeDocument(currentPath);
|
||||||
this.queue.addSeenUpdateId(remoteVersion.vaultUpdateId);
|
|
||||||
|
|
||||||
this.history.addHistoryEntry({
|
this.history.addHistoryEntry({
|
||||||
status: SyncStatus.SUCCESS,
|
status: SyncStatus.SUCCESS,
|
||||||
|
|
@ -897,7 +561,6 @@ export class Syncer {
|
||||||
await this.operations.delete(currentPath);
|
await this.operations.delete(currentPath);
|
||||||
this.queue.removeDocument(currentPath);
|
this.queue.removeDocument(currentPath);
|
||||||
}
|
}
|
||||||
this.queue.addSeenUpdateId(fullVersion.vaultUpdateId);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -920,7 +583,6 @@ export class Syncer {
|
||||||
originalContentBytes: contentBytes
|
originalContentBytes: contentBytes
|
||||||
});
|
});
|
||||||
|
|
||||||
this.queue.addSeenUpdateId(response.vaultUpdateId);
|
|
||||||
|
|
||||||
this.history.addHistoryEntry({
|
this.history.addHistoryEntry({
|
||||||
status: SyncStatus.SUCCESS,
|
status: SyncStatus.SUCCESS,
|
||||||
|
|
@ -976,7 +638,6 @@ export class Syncer {
|
||||||
responseBytes,
|
responseBytes,
|
||||||
actualPath
|
actualPath
|
||||||
);
|
);
|
||||||
this.queue.addSeenUpdateId(fullVersion.vaultUpdateId);
|
|
||||||
|
|
||||||
this.history.addHistoryEntry({
|
this.history.addHistoryEntry({
|
||||||
status: SyncStatus.SUCCESS,
|
status: SyncStatus.SUCCESS,
|
||||||
|
|
@ -1058,7 +719,6 @@ export class Syncer {
|
||||||
remoteVersion.relativePath
|
remoteVersion.relativePath
|
||||||
);
|
);
|
||||||
|
|
||||||
this.queue.addSeenUpdateId(remoteVersion.vaultUpdateId);
|
|
||||||
|
|
||||||
this.history.addHistoryEntry({
|
this.history.addHistoryEntry({
|
||||||
status: SyncStatus.SUCCESS,
|
status: SyncStatus.SUCCESS,
|
||||||
|
|
@ -1129,7 +789,6 @@ export class Syncer {
|
||||||
const record = this.queue.getSettledDocumentByPath(path);
|
const record = this.queue.getSettledDocumentByPath(path);
|
||||||
if (record !== undefined && localHash !== record.remoteHash) {
|
if (record !== undefined && localHash !== record.remoteHash) {
|
||||||
this.queue.removeDocument(path);
|
this.queue.removeDocument(path);
|
||||||
this.queue.addSeenUpdateId(response.vaultUpdateId);
|
|
||||||
this.syncLocallyCreatedFile(path);
|
this.syncLocallyCreatedFile(path);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
@ -1156,12 +815,7 @@ export class Syncer {
|
||||||
await this.operations.read(displacedPath);
|
await this.operations.read(displacedPath);
|
||||||
const displacedHash = await hash(displacedBytes);
|
const displacedHash = await hash(displacedBytes);
|
||||||
if (displacedHash !== displacedRecord.remoteHash) {
|
if (displacedHash !== displacedRecord.remoteHash) {
|
||||||
this.queue.enqueue({
|
this.queue.enqueue({ type: SyncEventType.SyncLocal, path: displacedPath });
|
||||||
type: SyncEventType.SyncLocal,
|
|
||||||
documentId: displacedRecord.documentId,
|
|
||||||
path: displacedPath,
|
|
||||||
originalPath: displacedPath,
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue