omg it mostly works for deletes

This commit is contained in:
Andras Schmelczer 2025-03-10 22:49:51 +00:00
parent 054d109ef8
commit d23c1a8dbc
No known key found for this signature in database
GPG key ID: FC8F2C3D3D1A718C
6 changed files with 243 additions and 139 deletions

View file

@ -1,7 +1,6 @@
import type { FileSystemOperations } from "sync-client"; import type { FileSystemOperations } from "sync-client";
import type { import type {
Database, Database,
DocumentMetadata,
DocumentRecord, DocumentRecord,
RelativePath RelativePath
} from "../persistence/database"; } from "../persistence/database";
@ -11,14 +10,7 @@ import { assertSetContainsExactly } from "../utils/assert-set-contains-exactly";
describe("File operations", () => { describe("File operations", () => {
class MockDatabase { class MockDatabase {
public move( public getLatestDocumentByRelativePath(
_oldRelativePath: RelativePath,
_newRelativePath: RelativePath
): void {
// this is called but irrelevant for this mock
}
public getDocumentByRelativePath(
_find: RelativePath _find: RelativePath
): DocumentRecord | undefined { ): DocumentRecord | undefined {
return undefined; return undefined;

View file

@ -71,15 +71,12 @@ export class FileOperations {
`Didn't expect ${path} to exist, deconflicting by moving it to '${deconflictedPath}'` `Didn't expect ${path} to exist, deconflicting by moving it to '${deconflictedPath}'`
); );
const document = this.database.getDocumentByRelativePath(path); const document =
this.database.getLatestDocumentByRelativePath(path);
this.logger.debug( this.logger.debug(
`Existing metadata for ${path}: ${JSON.stringify(document?.metadata)}` `Existing metadata for ${path}: ${JSON.stringify(document?.metadata)}`
); );
this.logger.debug(
`We need to save what's at ${path} to ${deconflictedPath}`
);
if ( if (
document?.metadata !== undefined && document?.metadata !== undefined &&
document.metadata.documentId === documentId document.metadata.documentId === documentId
@ -94,7 +91,6 @@ export class FileOperations {
`We need to save what's at ${path} to ${deconflictedPath}` `We need to save what's at ${path} to ${deconflictedPath}`
); );
await this.move(path, deconflictedPath, documentId); await this.move(path, deconflictedPath, documentId);
// this.database.move(path, deconflictedPath);
} else { } else {
await this.createParentDirectories(path); await this.createParentDirectories(path);
} }
@ -178,7 +174,8 @@ export class FileOperations {
`Conflict when moving '${oldPath}' to '${newPath}', the latter already exists, deconflicting by moving it to '${deconflictedPath}'` `Conflict when moving '${oldPath}' to '${newPath}', the latter already exists, deconflicting by moving it to '${deconflictedPath}'`
); );
const document = this.database.getDocumentByRelativePath(newPath); const document =
this.database.getLatestDocumentByRelativePath(newPath);
if ( if (
document?.metadata !== undefined && document?.metadata !== undefined &&

View file

@ -10,6 +10,7 @@ export interface DocumentMetadata {
hash: string; hash: string;
isDeleted: boolean; isDeleted: boolean;
} }
export interface StoredDocumentMetadata { export interface StoredDocumentMetadata {
relativePath: RelativePath; relativePath: RelativePath;
parentVersionId: VaultUpdateId; parentVersionId: VaultUpdateId;
@ -25,6 +26,7 @@ export interface StoredDatabase {
export interface DocumentRecord { export interface DocumentRecord {
identity: symbol; identity: symbol;
parallelVersion: number;
relativePath: RelativePath; relativePath: RelativePath;
metadata: DocumentMetadata | undefined; metadata: DocumentMetadata | undefined;
updates: Promise<void>[]; updates: Promise<void>[];
@ -46,7 +48,8 @@ export class Database {
relativePath, relativePath,
identity: Symbol(), identity: Symbol(),
metadata, metadata,
updates: [] updates: [],
parallelVersion: 0
})) ?? []; })) ?? [];
this.ensureConsistency(); this.ensureConsistency();
@ -63,7 +66,38 @@ export class Database {
} }
public get resolvedDocuments(): DocumentRecord[] { public get resolvedDocuments(): DocumentRecord[] {
return this.documents.filter(({ metadata }) => metadata !== undefined); const paths = new Map<string, DocumentRecord[]>();
this.documents
.filter(
({ metadata }) => metadata !== undefined && !metadata.isDeleted
)
.forEach((record) =>
paths.set(record.relativePath, [
record,
...(paths.get(record.relativePath) ?? [])
])
);
return Array.from(paths.values()).map((records) => {
records.sort(
(a, b) => b.parallelVersion - a.parallelVersion // descending
);
if (
records.length > 1 &&
records.some((current, i) =>
i === 0
? false
: records[i - 1].parallelVersion ===
current.parallelVersion
)
) {
throw new Error(
`Multiple documents with the same parallel version and path at ${records[0].relativePath}`
);
}
return records[0];
});
} }
public getLastSeenUpdateId(): VaultUpdateId | undefined { public getLastSeenUpdateId(): VaultUpdateId | undefined {
@ -81,25 +115,53 @@ export class Database {
this.save(); this.save();
} }
public setDocument({ public setDocument(
documentId, {
relativePath, documentId,
parentVersionId, relativePath,
hash, parentVersionId,
isDeleted hash,
}: { isDeleted
documentId: DocumentId; }: {
relativePath: RelativePath; documentId: DocumentId;
parentVersionId: VaultUpdateId; relativePath: RelativePath;
hash: string; parentVersionId: VaultUpdateId;
isDeleted: boolean; hash: string;
}): void { isDeleted: boolean;
const entry = this.getDocumentByRelativePath(relativePath); },
identity?: symbol
): void {
let entry: DocumentRecord | undefined;
if (identity !== undefined) {
entry = this.getDocumentByIdentity(identity);
if (entry !== undefined) { if (entry !== undefined) {
this.documents = this.documents.filter( this.documents = this.documents.filter(
({ identity }) => identity !== entry.identity ({ identity }) => identity !== entry!.identity
); );
}
} else {
entry = this.getLatestDocumentByRelativePath(relativePath);
if (
entry?.metadata?.documentId !== undefined &&
entry.metadata.documentId !== documentId
) {
this.documents.push({
// `entry` might be undefined if the document is new
identity: Symbol(),
relativePath,
metadata: {
documentId,
parentVersionId,
hash,
isDeleted
},
updates: [],
parallelVersion: entry?.parallelVersion + 1
});
}
this.save();
return;
} }
this.documents.push({ this.documents.push({
@ -112,7 +174,8 @@ export class Database {
hash, hash,
isDeleted isDeleted
}, },
updates: entry?.updates ?? [] updates: entry?.updates ?? [],
parallelVersion: entry?.parallelVersion ?? 0
}); });
this.save(); this.save();
@ -124,24 +187,29 @@ export class Database {
// No need to save as Promises don't get serialized // No need to save as Promises don't get serialized
} }
public getDocumentByRelativePath( public getLatestDocumentByRelativePath(
find: RelativePath find: RelativePath
): DocumentRecord | undefined { ): DocumentRecord | undefined {
return this.documents.find(({ relativePath }) => relativePath === find); const candidates = this.documents.filter(
({ relativePath }) => relativePath === find
);
candidates.sort((a, b) => b.parallelVersion - a.parallelVersion); // descending
return candidates[0];
} }
public async getResolvedDocumentByRelativePath( public async getResolvedDocumentByRelativePath(
relativePath: RelativePath, relativePath: RelativePath,
promise: Promise<void> promise: Promise<void>
): Promise<DocumentRecord> { ): Promise<void> {
let entry = this.getDocumentByRelativePath(relativePath); let entry = this.getLatestDocumentByRelativePath(relativePath);
if (entry === undefined) { if (entry === undefined) {
entry = { entry = {
relativePath, relativePath,
identity: Symbol(), identity: Symbol(),
metadata: undefined, metadata: undefined,
updates: [] updates: [],
parallelVersion: 0
}; };
this.documents.push(entry); this.documents.push(entry);
@ -150,9 +218,6 @@ export class Database {
const currentPromises = entry.updates; const currentPromises = entry.updates;
entry.updates = [...currentPromises, promise]; entry.updates = [...currentPromises, promise];
await Promise.all(currentPromises); await Promise.all(currentPromises);
// Refetch the document as it might have been updated
return this.getDocumentByIdentity(entry.identity);
} }
public getDocumentByUpdatePromise(promise: Promise<void>): DocumentRecord { public getDocumentByUpdatePromise(promise: Promise<void>): DocumentRecord {
@ -189,38 +254,40 @@ export class Database {
oldRelativePath: RelativePath, oldRelativePath: RelativePath,
newRelativePath: RelativePath newRelativePath: RelativePath
): void { ): void {
const oldDocument = this.getDocumentByRelativePath(oldRelativePath); const oldDocument =
this.getLatestDocumentByRelativePath(oldRelativePath);
if (oldDocument === undefined) { if (oldDocument === undefined) {
return;
throw new Error( throw new Error(
`Document to be moved not found: ${oldRelativePath}` `Document to be moved not found: ${oldRelativePath}`
); );
} }
const newDocument = this.getDocumentByRelativePath(newRelativePath);
if (
newDocument !== undefined &&
newDocument.metadata?.isDeleted === false
) {
throw new Error(
`Cannot move document to existing path: ${newRelativePath}`
);
}
this.documents = this.documents.filter( this.documents = this.documents.filter(
({ identity }) => ({ identity }) => identity !== oldDocument.identity
identity !== oldDocument.identity &&
identity !== newDocument?.identity
); );
let newDocument = this.getLatestDocumentByRelativePath(newRelativePath);
// It's either an invalid state of newDocument is pending deletion and we have to wait for it to complete
this.documents.push({ this.documents.push({
...oldDocument, identity: oldDocument.identity,
relativePath: newRelativePath metadata: oldDocument.metadata,
relativePath: newRelativePath,
updates: oldDocument.updates,
// We're in a strange state where the target of the move has just got deleted,
// however, its metadata might already have a bunch of updates queued up for
// the document at the new location. We need to keep these updates.
parallelVersion:
newDocument !== undefined ? newDocument.parallelVersion + 1 : 0
}); });
this.save(); this.save();
} }
private save(): void { private save(): void {
this.logger.debug(JSON.stringify(this.documents, null, 2));
this.ensureConsistency(); this.ensureConsistency();
void this.saveData({ void this.saveData({
documents: this.resolvedDocuments.map( documents: this.resolvedDocuments.map(

View file

@ -1,9 +1,4 @@
import type { import type { Database, RelativePath } from "../persistence/database";
Database,
DocumentMetadata,
RelativePath
} from "../persistence/database";
import type { SyncService } from "src/services/sync-service"; import type { SyncService } from "src/services/sync-service";
import type { Logger } from "src/tracing/logger"; import type { Logger } from "src/tracing/logger";
import type { SyncHistory } from "src/tracing/sync-history"; import type { SyncHistory } from "src/tracing/sync-history";
@ -24,10 +19,8 @@ export class Syncer {
private readonly syncQueue: PQueue; private readonly syncQueue: PQueue;
private runningScheduleSyncForOfflineChanges: Promise<void> | undefined = private runningScheduleSyncForOfflineChanges: Promise<void> | undefined;
undefined; private runningApplyRemoteChangesLocally: Promise<void> | undefined;
private runningApplyRemoteChangesLocally: Promise<void> | undefined =
undefined;
private readonly internalSyncer: UnrestrictedSyncer; private readonly internalSyncer: UnrestrictedSyncer;
@ -92,10 +85,17 @@ export class Syncer {
relativePath: RelativePath, relativePath: RelativePath,
updateTime?: Date updateTime?: Date
): Promise<void> { ): Promise<void> {
if (!this.settings.getSettings().isSyncEnabled) {
this.logger.info(
`Syncing is disabled, not syncing '${relativePath}'`
);
return;
}
const [promise, resolve, reject] = createPromise(); const [promise, resolve, reject] = createPromise();
// Most likely, we're waiting for the previous delete to finish on the file at this path // Most likely, we're waiting for the previous delete to finish on the file at this path
const document = await this.database.getResolvedDocumentByRelativePath( await this.database.getResolvedDocumentByRelativePath(
relativePath, relativePath,
promise promise
); );
@ -103,8 +103,7 @@ export class Syncer {
try { try {
await this.syncQueue.add(async () => await this.syncQueue.add(async () =>
this.internalSyncer.unrestrictedSyncLocallyCreatedFile( this.internalSyncer.unrestrictedSyncLocallyCreatedFile(
() => () => this.database.getDocumentByUpdatePromise(promise),
this.database.getDocumentByIdentity(document.identity),
updateTime updateTime
) )
); );
@ -120,18 +119,29 @@ export class Syncer {
public async syncLocallyDeletedFile( public async syncLocallyDeletedFile(
relativePath: RelativePath relativePath: RelativePath
): Promise<void> { ): Promise<void> {
if (!this.settings.getSettings().isSyncEnabled) {
this.logger.info(
`Syncing is disabled, not syncing '${relativePath}'`
);
return;
}
const [promise, resolve, reject] = createPromise(); const [promise, resolve, reject] = createPromise();
const document = await this.database.getResolvedDocumentByRelativePath( await this.database.getResolvedDocumentByRelativePath(
relativePath, relativePath,
promise promise
); );
try { try {
await this.syncQueue.add(async () => await this.syncQueue.add(async () =>
this.internalSyncer.unrestrictedSyncLocallyDeletedFile(() => this.internalSyncer.unrestrictedSyncLocallyDeletedFile(() => {
this.database.getDocumentByIdentity(document.identity) this.logger.debug(
) `aaaahg ${relativePath} has been deleted locally, syncing to delete it`
);
return this.database.getDocumentByUpdatePromise(promise);
})
); );
resolve(); resolve();
@ -142,34 +152,46 @@ export class Syncer {
} }
} }
public async syncLocallyUpdatedFile(args: { public async syncLocallyUpdatedFile({
oldPath,
relativePath,
updateTime
}: {
oldPath?: RelativePath; oldPath?: RelativePath;
relativePath: RelativePath; relativePath: RelativePath;
updateTime?: Date; updateTime?: Date;
}): Promise<void> { }): Promise<void> {
if (args.oldPath !== undefined) { if (!this.settings.getSettings().isSyncEnabled) {
if (args.oldPath === args.relativePath) { this.logger.info(
throw new Error( `Syncing is disabled, not syncing '${relativePath}'`
`Old path and new path are the same: ${args.oldPath}` );
); return;
}
this.database.move(args.oldPath, args.relativePath);
} }
const [promise, resolve, reject] = createPromise(); const [promise, resolve, reject] = createPromise();
const metadata = await this.database.getResolvedDocumentByRelativePath( if (oldPath !== undefined) {
args.relativePath, if (oldPath === relativePath) {
throw new Error(
`Old path and new path are the same: ${oldPath}`
);
}
this.database.move(oldPath, relativePath);
}
await this.database.getResolvedDocumentByRelativePath(
relativePath,
promise promise
); );
try { try {
await this.syncQueue.add(async () => await this.syncQueue.add(async () =>
this.internalSyncer.unrestrictedSyncLocallyUpdatedFile({ this.internalSyncer.unrestrictedSyncLocallyUpdatedFile({
...args, oldPath,
updateTime,
getLatestDocument: () => getLatestDocument: () =>
this.database.getDocumentByIdentity(metadata.identity) this.database.getDocumentByUpdatePromise(promise)
}) })
); );
@ -189,7 +211,7 @@ export class Syncer {
return; return;
} }
if (this.runningScheduleSyncForOfflineChanges != null) { if (this.runningScheduleSyncForOfflineChanges !== undefined) {
this.logger.debug("Uploading local changes is already in progress"); this.logger.debug("Uploading local changes is already in progress");
return this.runningScheduleSyncForOfflineChanges; return this.runningScheduleSyncForOfflineChanges;
} }
@ -244,9 +266,7 @@ export class Syncer {
public async reset(): Promise<void> { public async reset(): Promise<void> {
this.syncQueue.clear(); this.syncQueue.clear();
await this.syncQueue.onEmpty(); await this.syncQueue.onEmpty();
this.remainingOperationsListeners.forEach((listener) => { this.remainingOperationsListeners.forEach((listener) => listener(0));
listener(0);
});
this.internalSyncer.reset(); this.internalSyncer.reset();
} }
@ -257,6 +277,15 @@ export class Syncer {
remoteVersion.documentId remoteVersion.documentId
); );
if (document === undefined) {
const candidate = this.database.getLatestDocumentByRelativePath(
remoteVersion.relativePath
);
if (candidate !== undefined && candidate.metadata === undefined) {
document = candidate;
}
}
if (document === undefined) { if (document === undefined) {
await this.syncQueue.add(async () => await this.syncQueue.add(async () =>
this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile( this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile(
@ -269,7 +298,7 @@ export class Syncer {
const [promise, resolve, reject] = createPromise(); const [promise, resolve, reject] = createPromise();
document = await this.database.getResolvedDocumentByRelativePath( await this.database.getResolvedDocumentByRelativePath(
document.relativePath, document.relativePath,
promise promise
); );
@ -278,7 +307,7 @@ export class Syncer {
await this.syncQueue.add(async () => await this.syncQueue.add(async () =>
this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile( this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile(
remoteVersion, remoteVersion,
() => this.database.getDocumentByIdentity(document.identity) () => this.database.getDocumentByUpdatePromise(promise)
) )
); );
@ -300,7 +329,7 @@ export class Syncer {
const updates = Promise.all( const updates = Promise.all(
allLocalFiles.map(async (relativePath) => { allLocalFiles.map(async (relativePath) => {
if ( if (
this.database.getDocumentByRelativePath(relativePath) this.database.getLatestDocumentByRelativePath(relativePath)
?.metadata !== undefined ?.metadata !== undefined
) { ) {
this.logger.debug( this.logger.debug(

View file

@ -1,6 +1,5 @@
import type { import type {
Database, Database,
DocumentMetadata,
DocumentRecord, DocumentRecord,
RelativePath RelativePath
} from "../persistence/database"; } from "../persistence/database";
@ -61,7 +60,7 @@ export class UnrestrictedSyncer {
createdDate: updateTime createdDate: updateTime
}); });
const { relativePath: currentRelativePath } = const { relativePath: currentRelativePath, identity } =
getLatestDocument(); getLatestDocument();
this.history.addHistoryEntry({ this.history.addHistoryEntry({
@ -80,7 +79,7 @@ export class UnrestrictedSyncer {
isDeleted: false isDeleted: false
}; };
this.database.setDocument(newMetadata); this.database.setDocument(newMetadata, identity);
this.tryIncrementVaultUpdateId(response.vaultUpdateId); this.tryIncrementVaultUpdateId(response.vaultUpdateId);
} }
@ -101,7 +100,7 @@ export class UnrestrictedSyncer {
document.metadata.isDeleted document.metadata.isDeleted
) { ) {
this.logger.debug( this.logger.debug(
`Document ${document.relativePath} has been already deleted, no need to delete it again` `Document '${document.relativePath}' has been already deleted, no need to delete it again`
); );
return; return;
} }
@ -124,13 +123,16 @@ export class UnrestrictedSyncer {
// We have to have a record of the delete in case there's an in-flight update for the same // We have to have a record of the delete in case there's an in-flight update for the same
// document which finishes after the delete has succeeded and would introduce a phantom metadata record. // document which finishes after the delete has succeeded and would introduce a phantom metadata record.
this.database.setDocument({ this.database.setDocument(
relativePath: document.relativePath, {
documentId: response.documentId, relativePath: document.relativePath,
parentVersionId: response.vaultUpdateId, documentId: response.documentId,
hash: EMPTY_HASH, parentVersionId: response.vaultUpdateId,
isDeleted: true hash: EMPTY_HASH,
}); isDeleted: true
},
document.identity
);
} }
); );
} }
@ -222,13 +224,16 @@ export class UnrestrictedSyncer {
type: SyncType.DELETE type: SyncType.DELETE
}); });
this.database.setDocument({ this.database.setDocument(
documentId: response.documentId, {
relativePath: document.relativePath, documentId: response.documentId,
parentVersionId: response.vaultUpdateId, relativePath: document.relativePath,
hash: EMPTY_HASH, parentVersionId: response.vaultUpdateId,
isDeleted: true hash: EMPTY_HASH,
}); isDeleted: true
},
document.identity
);
this.tryIncrementVaultUpdateId(response.vaultUpdateId); this.tryIncrementVaultUpdateId(response.vaultUpdateId);
@ -262,16 +267,19 @@ export class UnrestrictedSyncer {
}); });
} }
this.database.setDocument({ this.database.setDocument(
documentId: response.documentId, {
relativePath: documentId: response.documentId,
response.relativePath != document.relativePath relativePath:
? response.relativePath response.relativePath != document.relativePath
: document.relativePath, ? response.relativePath
parentVersionId: response.vaultUpdateId, : document.relativePath,
hash: contentHash, parentVersionId: response.vaultUpdateId,
isDeleted: response.isDeleted hash: contentHash,
}); isDeleted: response.isDeleted
},
document.identity
);
this.tryIncrementVaultUpdateId(response.vaultUpdateId); this.tryIncrementVaultUpdateId(response.vaultUpdateId);
} }
@ -293,10 +301,7 @@ export class UnrestrictedSyncer {
remoteVersion.documentId remoteVersion.documentId
); );
if ( if (localMetadata?.metadata !== undefined) {
localMetadata?.metadata !== undefined &&
!localMetadata.metadata.isDeleted
) {
// If the file exists locally, let's pretend the user has updated it // If the file exists locally, let's pretend the user has updated it
// and deal with remote update/deletion within `unrestrictedSyncLocallyUpdatedFile` // and deal with remote update/deletion within `unrestrictedSyncLocallyUpdatedFile`
if ( if (
@ -315,6 +320,11 @@ export class UnrestrictedSyncer {
localMetadata.identity localMetadata.identity
) )
}); });
} else if (remoteVersion.isDeleted) {
this.logger.debug(
`Document ${remoteVersion.relativePath} has been deleted remotely, no need to sync`
);
return;
} }
const content = ( const content = (
@ -330,13 +340,22 @@ export class UnrestrictedSyncer {
remoteVersion.documentId remoteVersion.documentId
); );
this.database.setDocument({ this.database.setDocument(
documentId: remoteVersion.documentId, {
relativePath: remoteVersion.relativePath, documentId: remoteVersion.documentId,
parentVersionId: remoteVersion.vaultUpdateId, relativePath: remoteVersion.relativePath,
hash: hash(contentBytes), parentVersionId: remoteVersion.vaultUpdateId,
isDeleted: remoteVersion.isDeleted hash: hash(contentBytes),
}); isDeleted: remoteVersion.isDeleted
},
getLatestDocument?.()?.identity ??
this.database.getDocumentByDocumentId(
remoteVersion.documentId
)?.identity ??
this.database.getLatestDocumentByRelativePath(
remoteVersion.relativePath
)?.identity
);
this.history.addHistoryEntry({ this.history.addHistoryEntry({
status: SyncStatus.SUCCESS, status: SyncStatus.SUCCESS,
@ -359,7 +378,7 @@ export class UnrestrictedSyncer {
if (!this.settings.getSettings().isSyncEnabled) { if (!this.settings.getSettings().isSyncEnabled) {
this.logger.info( this.logger.info(
`Syncing is disabled, not syncing ${relativePath}` `Syncing is disabled, not syncing '${relativePath}'`
); );
return; return;
} }

View file

@ -16,7 +16,7 @@ npm run build
pids=() pids=()
for i in $(seq 1 $process_count); do for i in $(seq 1 $process_count); do
node dist/cli.js 2>&1 | tee "log_${i}.log" & node dist/cli.js 2>&1 > "log_${i}.log" &
pids+=($!) pids+=($!)
done done