This commit is contained in:
Andras Schmelczer 2025-03-15 09:25:09 +00:00
parent 408afa3626
commit e3196c2dc0
No known key found for this signature in database
GPG key ID: FC8F2C3D3D1A718C
10 changed files with 338 additions and 301 deletions

View file

@ -38,7 +38,7 @@ export class FileOperations {
const decoder = new TextDecoder("utf-8"); const decoder = new TextDecoder("utf-8");
// Normalize line endings to LF on Windows // Normalize line-endings to LF on Windows
let text = decoder.decode(content); let text = decoder.decode(content);
text = text.replace(/\r\n/g, "\n"); text = text.replace(/\r\n/g, "\n");
@ -53,7 +53,7 @@ export class FileOperations {
return this.fs.exists(path); return this.fs.exists(path);
} }
// Create and write the file if it doesn't exist. Otherwise, it has the same behavior as write. // Create and write the file if it doesn't exist.Otherwise, it has the same behavior as write.
// All parent directories are created if they don't exist. // All parent directories are created if they don't exist.
public async create( public async create(
path: RelativePath, path: RelativePath,
@ -73,20 +73,15 @@ export class FileOperations {
`Existing metadata for ${path}: ${JSON.stringify(document?.metadata)}` `Existing metadata for ${path}: ${JSON.stringify(document?.metadata)}`
); );
if ( if (document !== undefined && document.documentId === documentId) {
document?.metadata !== undefined &&
document.metadata.documentId === documentId
) {
// This can happen if the document got moved both locally and remotely // This can happen if the document got moved both locally and remotely
// to the same file path. In this case, we shouldn't deconflict, however, // to the same file path. In this case, we shouldn't deconflict, however,
// we also can't overwrite otherwise we'd lose changes. // we also can't overwrite otherwise we'd lose changes.
throw new FileNotFoundError(path); throw new FileNotFoundError(path);
} }
this.logger.debug( this.database.move(path, deconflictedPath);
`We need to save what's at ${path} to ${deconflictedPath}` await this.fs.rename(path, deconflictedPath);
);
await this.move(path, deconflictedPath, documentId);
} else { } else {
await this.createParentDirectories(path); await this.createParentDirectories(path);
} }
@ -147,7 +142,7 @@ export class FileOperations {
} }
public async delete(path: RelativePath): Promise<void> { public async delete(path: RelativePath): Promise<void> {
if (!(await this.exists(path))) { if (await this.exists(path)) {
this.logger.debug(`Deleting file: ${path}`); this.logger.debug(`Deleting file: ${path}`);
return this.fs.delete(path); return this.fs.delete(path);
} else { } else {
@ -175,7 +170,7 @@ export class FileOperations {
if ( if (
document?.metadata !== undefined && document?.metadata !== undefined &&
document.metadata.documentId === documentId document.documentId === documentId
) { ) {
// This can happen if the document got moved both locally and remotely // This can happen if the document got moved both locally and remotely
// to the same file path. In this case, we shouldn't deconflict, however, // to the same file path. In this case, we shouldn't deconflict, however,
@ -183,12 +178,13 @@ export class FileOperations {
throw new FileNotFoundError(newPath); throw new FileNotFoundError(newPath);
} }
await this.move(newPath, deconflictedPath, documentId); this.database.move(newPath, deconflictedPath);
// this.database.move(oldPath, newPath); await this.fs.rename(newPath, deconflictedPath);
} else { } else {
await this.createParentDirectories(newPath); await this.createParentDirectories(newPath);
} }
this.database.move(oldPath, newPath);
await this.fs.rename(oldPath, newPath); await this.fs.rename(oldPath, newPath);
} }

View file

@ -6,14 +6,13 @@ export type RelativePath = string;
export interface DocumentMetadata { export interface DocumentMetadata {
parentVersionId: VaultUpdateId; parentVersionId: VaultUpdateId;
documentId: DocumentId;
hash: string; hash: string;
} }
export interface StoredDocumentMetadata { export interface StoredDocumentMetadata {
relativePath: RelativePath; relativePath: RelativePath;
parentVersionId: VaultUpdateId;
documentId: DocumentId; documentId: DocumentId;
parentVersionId: VaultUpdateId;
hash: string; hash: string;
} }
@ -25,6 +24,7 @@ export interface StoredDatabase {
export interface DocumentRecord { export interface DocumentRecord {
identity: symbol; identity: symbol;
relativePath: RelativePath; relativePath: RelativePath;
documentId: DocumentId;
metadata: DocumentMetadata | undefined; metadata: DocumentMetadata | undefined;
isDeleted: boolean; isDeleted: boolean;
updates: Promise<void>[]; updates: Promise<void>[];
@ -43,14 +43,17 @@ export class Database {
initialState ??= {}; initialState ??= {};
this.documents = this.documents =
initialState.documents?.map(({ relativePath, ...metadata }) => ({ initialState.documents?.map(
relativePath, ({ relativePath, documentId, ...metadata }) => ({
identity: Symbol(), relativePath,
metadata, documentId,
isDeleted: false, identity: Symbol(),
updates: [], metadata,
parallelVersion: 0 isDeleted: false,
})) ?? []; updates: [],
parallelVersion: 0
})
) ?? [];
this.ensureConsistency(); this.ensureConsistency();
this.logger.debug(`Loaded ${this.documents.length} documents`); this.logger.debug(`Loaded ${this.documents.length} documents`);
@ -135,11 +138,17 @@ export class Database {
({ identity }) => identity !== entry.identity ({ identity }) => identity !== entry.identity
); );
if (entry.relativePath !== relativePath) {
throw new Error(
"Document identity does not match the relative path"
);
}
this.documents.push({ this.documents.push({
...entry, ...entry,
relativePath, relativePath,
documentId,
metadata: { metadata: {
documentId,
parentVersionId, parentVersionId,
hash hash
} }
@ -153,13 +162,13 @@ export class Database {
// meaning that two documents occupy the same path in terms of in-flight requests so we // meaning that two documents occupy the same path in terms of in-flight requests so we
// need to create a new parallel version. // need to create a new parallel version.
entry = this.getLatestDocumentByRelativePath(relativePath); entry = this.getLatestDocumentByRelativePath(relativePath);
if (entry && entry.metadata?.documentId !== documentId) { if (entry && entry.documentId !== documentId) {
this.documents.push({ this.documents.push({
// `entry` might be undefined if the document is new // `entry` might be undefined if the document is new
identity: Symbol(), identity: Symbol(),
relativePath, relativePath,
documentId,
metadata: { metadata: {
documentId,
parentVersionId, parentVersionId,
hash hash
}, },
@ -174,8 +183,8 @@ export class Database {
this.documents.push({ this.documents.push({
identity: Symbol(), identity: Symbol(),
relativePath, relativePath,
documentId,
metadata: { metadata: {
documentId,
parentVersionId, parentVersionId,
hash hash
}, },
@ -210,16 +219,13 @@ export class Database {
let entry = this.getLatestDocumentByRelativePath(relativePath); let entry = this.getLatestDocumentByRelativePath(relativePath);
if (entry === undefined) { if (entry === undefined) {
entry = { throw new Error(
relativePath, `Document not found by relative path: ${relativePath}, ${JSON.stringify(
identity: Symbol(), this.documents,
metadata: undefined, null,
isDeleted: false, 2
updates: [], )}`
parallelVersion: 0 );
};
this.documents.push(entry);
} }
const currentPromises = entry.updates; const currentPromises = entry.updates;
@ -227,6 +233,30 @@ export class Database {
await Promise.all(currentPromises); await Promise.all(currentPromises);
} }
public getNewResolvedDocumentByRelativePath(
documentId: DocumentId,
relativePath: RelativePath,
promise: Promise<void>
): void {
let previousEntry = this.getLatestDocumentByRelativePath(relativePath);
const entry = {
relativePath,
documentId,
identity: Symbol(),
metadata: undefined,
isDeleted: false,
updates: [promise],
parallelVersion:
previousEntry?.parallelVersion === undefined
? 0
: previousEntry.parallelVersion + 1
};
this.documents.push(entry);
this.save();
}
public getDocumentByUpdatePromise(promise: Promise<void>): DocumentRecord { public getDocumentByUpdatePromise(promise: Promise<void>): DocumentRecord {
const result = this.documents.find(({ updates }) => const result = this.documents.find(({ updates }) =>
updates.includes(promise) updates.includes(promise)
@ -240,11 +270,9 @@ export class Database {
} }
public getDocumentByDocumentId( public getDocumentByDocumentId(
documentId: DocumentId find: DocumentId
): DocumentRecord | undefined { ): DocumentRecord | undefined {
return this.documents.find( return this.documents.find(({ documentId }) => documentId === find);
({ metadata }) => metadata?.documentId === documentId
);
} }
public getDocumentByIdentity(find: symbol): DocumentRecord { public getDocumentByIdentity(find: symbol): DocumentRecord {
@ -263,9 +291,8 @@ export class Database {
): void { ): void {
const oldDocument = const oldDocument =
this.getLatestDocumentByRelativePath(oldRelativePath); this.getLatestDocumentByRelativePath(oldRelativePath);
if (oldDocument === undefined) { if (oldDocument === undefined) {
// We can try moving a non-existent document if it hasn't yet got created becasue it's
// the result of an offline event while this move happens online before.
return; return;
} }
@ -275,13 +302,11 @@ export class Database {
let newDocument = this.getLatestDocumentByRelativePath(newRelativePath); let newDocument = this.getLatestDocumentByRelativePath(newRelativePath);
// It's either an invalid state of newDocument is pending deletion and we have to wait for it to complete // It's either an invalid state of newDocument is pending deletion and we have
// to wait for it to complete.
this.documents.push({ this.documents.push({
identity: oldDocument.identity, ...oldDocument,
metadata: oldDocument.metadata,
relativePath: newRelativePath, relativePath: newRelativePath,
isDeleted: oldDocument.isDeleted,
updates: oldDocument.updates,
// We're in a strange state where the target of the move has just got deleted, // We're in a strange state where the target of the move has just got deleted,
// however, its metadata might already have a bunch of updates queued up for // however, its metadata might already have a bunch of updates queued up for
// the document at the new location. We need to keep these updates. // the document at the new location. We need to keep these updates.
@ -295,8 +320,9 @@ export class Database {
public delete(relativePath: RelativePath): void { public delete(relativePath: RelativePath): void {
const candidate = this.getLatestDocumentByRelativePath(relativePath); const candidate = this.getLatestDocumentByRelativePath(relativePath);
if (candidate === undefined) { if (candidate === undefined) {
// it's fine because the document to be deleted might not have been created yet throw new Error(
return; `Document not found by relative path: ${relativePath}`
);
} }
candidate.isDeleted = true; candidate.isDeleted = true;
} }
@ -319,16 +345,12 @@ export class Database {
private ensureConsistency(): void { private ensureConsistency(): void {
const idToPath = new Map<string, string[]>(); const idToPath = new Map<string, string[]>();
this.resolvedDocuments this.resolvedDocuments.forEach(({ relativePath, documentId }) => {
.filter(({ metadata }) => metadata !== undefined) idToPath.set(documentId, [
.forEach(({ metadata, relativePath }) => { ...(idToPath.get(documentId) ?? []),
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion relativePath
idToPath.set(metadata!.documentId, [ ]);
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion });
...(idToPath.get(metadata!.documentId) ?? []),
relativePath
]);
});
const duplicates = Array.from(idToPath.entries()) const duplicates = Array.from(idToPath.entries())
.filter(([_, paths]) => paths.length > 1) .filter(([_, paths]) => paths.length > 1)

View file

@ -0,0 +1,52 @@
import { Syncer } from "../sync-operations/syncer";
import { Settings } from "../persistence/settings";
import { Logger } from "../tracing/logger";
import { createPromise } from "../utils/create-promise";
import { retriedFetchFactory } from "../utils/retried-fetch";
export class ConnectedState {
private resolveIsSyncEnabled: (() => void) | undefined;
private syncIsEnabled: Promise<void> | undefined;
public constructor(
settings: Settings,
private readonly logger: Logger
) {
settings.addOnSettingsChangeHandlers((newSettings, oldSettings) => {
if (!oldSettings.isSyncEnabled && newSettings.isSyncEnabled) {
this.handleComingOnline();
} else if (
oldSettings.isSyncEnabled &&
!newSettings.isSyncEnabled
) {
this.handleGoingOffline();
}
});
}
private handleComingOnline() {
this.logger.debug("Sync is enabled");
this.resolveIsSyncEnabled?.();
}
private handleGoingOffline() {
this.logger.debug("Sync is disabled");
[this.syncIsEnabled, this.resolveIsSyncEnabled] = createPromise();
}
public getFetchImplementation(
fetch: typeof globalThis.fetch,
{ doRetries = true }: { doRetries: boolean } = { doRetries: true }
): typeof globalThis.fetch {
const retriedFetch = doRetries
? retriedFetchFactory(this.logger, fetch)
: fetch;
return async (input: RequestInfo | URL): Promise<Response> => {
if (this.syncIsEnabled !== undefined) {
await this.syncIsEnabled;
}
return retriedFetch(input);
};
}
}

View file

@ -6,20 +6,22 @@ import type {
RelativePath, RelativePath,
VaultUpdateId VaultUpdateId
} from "../persistence/database"; } from "../persistence/database";
import type { Logger } from "src/tracing/logger"; import type { Logger } from "../tracing/logger";
import { retriedFetchFactory } from "src/utils/retried-fetch"; import type { Settings } from "../persistence/settings";
import type { Settings } from "src/persistence/settings"; import { ConnectedState } from "./connected-state";
export interface CheckConnectionResult { export interface CheckConnectionResult {
isSuccessful: boolean; isSuccessful: boolean;
message: string; message: string;
} }
export class SyncService { export class SyncService {
private client!: Client<paths>; private client!: Client<paths>;
private clientWithoutRetries!: Client<paths>; private clientWithoutRetries!: Client<paths>;
private _fetchImplementation: typeof globalThis.fetch = globalThis.fetch; private _fetchImplementation: typeof globalThis.fetch = globalThis.fetch;
public constructor( public constructor(
private readonly connectedState: ConnectedState,
private readonly settings: Settings, private readonly settings: Settings,
private readonly logger: Logger private readonly logger: Logger
) { ) {
@ -52,17 +54,19 @@ export class SyncService {
} }
public async create({ public async create({
documentId,
relativePath, relativePath,
contentBytes, contentBytes
createdDate
}: { }: {
documentId?: DocumentId;
relativePath: RelativePath; relativePath: RelativePath;
contentBytes: Uint8Array; contentBytes: Uint8Array;
createdDate: Date;
}): Promise<components["schemas"]["DocumentVersionWithoutContent"]> { }): Promise<components["schemas"]["DocumentVersionWithoutContent"]> {
const formData = new FormData(); const formData = new FormData();
if (documentId !== undefined) {
formData.append("document_id", documentId);
}
formData.append("relative_path", relativePath); formData.append("relative_path", relativePath);
formData.append("created_date", createdDate.toISOString());
formData.append("content", new Blob([contentBytes])); formData.append("content", new Blob([contentBytes]));
const response = await this.client.POST( const response = await this.client.POST(
@ -100,21 +104,18 @@ export class SyncService {
parentVersionId, parentVersionId,
documentId, documentId,
relativePath, relativePath,
contentBytes, contentBytes
createdDate
}: { }: {
parentVersionId: VaultUpdateId; parentVersionId: VaultUpdateId;
documentId: DocumentId; documentId: DocumentId;
relativePath: RelativePath; relativePath: RelativePath;
contentBytes: Uint8Array; contentBytes: Uint8Array;
createdDate: Date;
}): Promise<components["schemas"]["DocumentUpdateResponse"]> { }): Promise<components["schemas"]["DocumentUpdateResponse"]> {
this.logger.debug( this.logger.debug(
`Updating document ${documentId} with parent version ${parentVersionId} & ${new TextDecoder().decode(contentBytes)} & ${relativePath}` `Updating document ${documentId} with parent version ${parentVersionId} & ${new TextDecoder().decode(contentBytes)} & ${relativePath}`
); );
const formData = new FormData(); const formData = new FormData();
formData.append("parent_version_id", parentVersionId.toString()); formData.append("parent_version_id", parentVersionId.toString());
formData.append("created_date", createdDate.toISOString());
formData.append("relative_path", relativePath); formData.append("relative_path", relativePath);
formData.append("content", new Blob([contentBytes])); formData.append("content", new Blob([contentBytes]));
@ -152,12 +153,10 @@ export class SyncService {
public async delete({ public async delete({
documentId, documentId,
relativePath, relativePath
createdDate
}: { }: {
documentId: DocumentId; documentId: DocumentId;
relativePath: RelativePath; relativePath: RelativePath;
createdDate: Date;
}): Promise<components["schemas"]["DocumentVersionWithoutContent"]> { }): Promise<components["schemas"]["DocumentVersionWithoutContent"]> {
const response = await this.client.DELETE( const response = await this.client.DELETE(
"/vaults/{vault_id}/documents/{document_id}", "/vaults/{vault_id}/documents/{document_id}",
@ -172,7 +171,6 @@ export class SyncService {
} }
}, },
body: { body: {
createdDate: createdDate.toISOString(),
relativePath relativePath
} }
} }
@ -298,11 +296,17 @@ export class SyncService {
private createClient(remoteUri: string): void { private createClient(remoteUri: string): void {
this.client = createClient<paths>({ this.client = createClient<paths>({
baseUrl: remoteUri, baseUrl: remoteUri,
fetch: retriedFetchFactory(this.logger, this._fetchImplementation) fetch: this.connectedState.getFetchImplementation(
this._fetchImplementation
)
}); });
this.clientWithoutRetries = createClient<paths>({ this.clientWithoutRetries = createClient<paths>({
baseUrl: remoteUri baseUrl: remoteUri,
fetch: this.connectedState.getFetchImplementation(
this._fetchImplementation,
{ doRetries: false }
)
}); });
} }
} }

View file

@ -12,6 +12,7 @@ import { SyncService } from "./services/sync-service";
import { Syncer } from "./sync-operations/syncer"; import { Syncer } from "./sync-operations/syncer";
import type { FileSystemOperations } from "./file-operations/filesystem-operations"; import type { FileSystemOperations } from "./file-operations/filesystem-operations";
import { FileOperations } from "./file-operations/file-operations"; import { FileOperations } from "./file-operations/file-operations";
import { ConnectedState } from "./services/connected-state";
export class SyncClient { export class SyncClient {
private remoteListenerIntervalId: NodeJS.Timeout | null = null; private remoteListenerIntervalId: NodeJS.Timeout | null = null;
@ -90,7 +91,9 @@ export class SyncClient {
} }
); );
const syncService = new SyncService(settings, logger); const connectedState = new ConnectedState(settings, logger);
const syncService = new SyncService(connectedState, settings, logger);
const syncer = new Syncer( const syncer = new Syncer(
logger, logger,
@ -117,18 +120,13 @@ export class SyncClient {
); );
settings.addOnSettingsChangeHandlers((newSettings, oldSettings) => { settings.addOnSettingsChangeHandlers((newSettings, oldSettings) => {
client.registerRemoteEventListener( if (
newSettings.fetchChangesUpdateIntervalMs newSettings.fetchChangesUpdateIntervalMs !==
); oldSettings.fetchChangesUpdateIntervalMs
) {
if (!oldSettings.isSyncEnabled && newSettings.isSyncEnabled) { client.registerRemoteEventListener(
syncer newSettings.fetchChangesUpdateIntervalMs
.scheduleSyncForOfflineChanges() );
.catch((_error: unknown) => {
logger.error(
"Failed to schedule sync for offline changes"
);
});
} }
}); });

View file

@ -3,6 +3,7 @@ import type { SyncService } from "src/services/sync-service";
import type { Logger } from "src/tracing/logger"; import type { Logger } from "src/tracing/logger";
import type { SyncHistory } from "src/tracing/sync-history"; import type { SyncHistory } from "src/tracing/sync-history";
import PQueue from "p-queue"; import PQueue from "p-queue";
import { v4 as uuidv4 } from "uuid";
import { hash } from "src/utils/hash"; import { hash } from "src/utils/hash";
import type { components } from "src/services/types"; import type { components } from "src/services/types";
import type { Settings } from "src/persistence/settings"; import type { Settings } from "src/persistence/settings";
@ -27,7 +28,7 @@ export class Syncer {
public constructor( public constructor(
private readonly logger: Logger, private readonly logger: Logger,
private readonly database: Database, private readonly database: Database,
private readonly settings: Settings, settings: Settings,
private readonly syncService: SyncService, private readonly syncService: SyncService,
private readonly operations: FileOperations, private readonly operations: FileOperations,
history: SyncHistory history: SyncHistory
@ -43,9 +44,11 @@ export class Syncer {
this.syncQueue.concurrency = newSettings.syncConcurrency; this.syncQueue.concurrency = newSettings.syncConcurrency;
}); });
this.syncQueue.on("active", () => { this.syncQueue.on("active", () =>
this.emitRemainingOperationsChange(this.syncQueue.size); this.remainingOperationsListeners.forEach((listener) =>
}); listener(this.syncQueue.size)
)
);
this.internalSyncer = new UnrestrictedSyncer( this.internalSyncer = new UnrestrictedSyncer(
logger, logger,
@ -82,29 +85,32 @@ export class Syncer {
} }
public async syncLocallyCreatedFile( public async syncLocallyCreatedFile(
relativePath: RelativePath, relativePath: RelativePath
updateTime?: Date
): Promise<void> { ): Promise<void> {
if (!this.settings.getSettings().isSyncEnabled) { if (
this.logger.info( this.database.getLatestDocumentByRelativePath(relativePath)
`Syncing is disabled, not syncing '${relativePath}'` ?.isDeleted === false
) {
this.logger.debug(
`Document ${relativePath} already exists in the database, skipping`
); );
return; return;
} }
const [promise, resolve, reject] = createPromise(); const [promise, resolve, reject] = createPromise();
const proposedDocumentId = uuidv4();
// Most likely, we're waiting for the previous delete to finish on the file at this path this.database.getNewResolvedDocumentByRelativePath(
await this.database.getResolvedDocumentByRelativePath( proposedDocumentId,
relativePath, relativePath,
promise promise
); );
try { try {
await this.syncQueue.add(async () => await this.syncQueue.add(() =>
this.internalSyncer.unrestrictedSyncLocallyCreatedFile( this.internalSyncer.unrestrictedSyncLocallyCreatedFile(
() => this.database.getDocumentByUpdatePromise(promise), proposedDocumentId,
updateTime () => this.database.getDocumentByUpdatePromise(promise)
) )
); );
@ -119,13 +125,8 @@ export class Syncer {
public async syncLocallyDeletedFile( public async syncLocallyDeletedFile(
relativePath: RelativePath relativePath: RelativePath
): Promise<void> { ): Promise<void> {
if (!this.settings.getSettings().isSyncEnabled) { // We have to have a record of the delete in case there's an in-flight update for the same
this.logger.info( // document which finishes after the delete has succeeded and would introduce a phantom metadata record.
`Syncing is disabled, not syncing '${relativePath}'`
);
return;
}
this.database.delete(relativePath); this.database.delete(relativePath);
const [promise, resolve, reject] = createPromise(); const [promise, resolve, reject] = createPromise();
@ -137,13 +138,9 @@ export class Syncer {
try { try {
await this.syncQueue.add(async () => await this.syncQueue.add(async () =>
this.internalSyncer.unrestrictedSyncLocallyDeletedFile(() => { this.internalSyncer.unrestrictedSyncLocallyDeletedFile(() =>
this.logger.debug( this.database.getDocumentByUpdatePromise(promise)
`aaaahg ${relativePath} has been deleted locally, syncing to delete it` )
);
return this.database.getDocumentByUpdatePromise(promise);
})
); );
resolve(); resolve();
@ -156,23 +153,22 @@ export class Syncer {
public async syncLocallyUpdatedFile({ public async syncLocallyUpdatedFile({
oldPath, oldPath,
relativePath, relativePath
updateTime
}: { }: {
oldPath?: RelativePath; oldPath?: RelativePath;
relativePath: RelativePath; relativePath: RelativePath;
updateTime?: Date;
}): Promise<void> { }): Promise<void> {
if (!this.settings.getSettings().isSyncEnabled) {
this.logger.info(
`Syncing is disabled, not syncing '${relativePath}'`
);
return;
}
const [promise, resolve, reject] = createPromise();
if (oldPath !== undefined) { if (oldPath !== undefined) {
if (
this.database.getLatestDocumentByRelativePath(oldPath)
?.isDeleted === true
) {
this.logger.debug(
`Document ${oldPath} has been deleted locally, skipping`
);
return;
}
if (oldPath === relativePath) { if (oldPath === relativePath) {
throw new Error( throw new Error(
`Old path and new path are the same: ${oldPath}` `Old path and new path are the same: ${oldPath}`
@ -182,6 +178,18 @@ export class Syncer {
this.database.move(oldPath, relativePath); this.database.move(oldPath, relativePath);
} }
if (
this.database.getLatestDocumentByRelativePath(relativePath)
?.isDeleted === true
) {
this.logger.debug(
`Document ${relativePath} has been deleted locally, skipping`
);
return;
}
const [promise, resolve, reject] = createPromise();
await this.database.getResolvedDocumentByRelativePath( await this.database.getResolvedDocumentByRelativePath(
relativePath, relativePath,
promise promise
@ -191,7 +199,6 @@ export class Syncer {
await this.syncQueue.add(async () => await this.syncQueue.add(async () =>
this.internalSyncer.unrestrictedSyncLocallyUpdatedFile({ this.internalSyncer.unrestrictedSyncLocallyUpdatedFile({
oldPath, oldPath,
updateTime,
getLatestDocument: () => getLatestDocument: () =>
this.database.getDocumentByUpdatePromise(promise) this.database.getDocumentByUpdatePromise(promise)
}) })
@ -206,13 +213,6 @@ export class Syncer {
} }
public async scheduleSyncForOfflineChanges(): Promise<void> { public async scheduleSyncForOfflineChanges(): Promise<void> {
if (!this.settings.getSettings().isSyncEnabled) {
this.logger.debug(
`Syncing is disabled, not uploading local changes`
);
return;
}
if (this.runningScheduleSyncForOfflineChanges !== undefined) { if (this.runningScheduleSyncForOfflineChanges !== undefined) {
this.logger.debug("Uploading local changes is already in progress"); this.logger.debug("Uploading local changes is already in progress");
return this.runningScheduleSyncForOfflineChanges; return this.runningScheduleSyncForOfflineChanges;
@ -234,13 +234,6 @@ export class Syncer {
} }
public async applyRemoteChangesLocally(): Promise<void> { public async applyRemoteChangesLocally(): Promise<void> {
if (!this.settings.getSettings().isSyncEnabled) {
this.logger.debug(
`Syncing is disabled, not fetching remote changes`
);
return;
}
if (this.runningApplyRemoteChangesLocally != null) { if (this.runningApplyRemoteChangesLocally != null) {
this.logger.debug( this.logger.debug(
"Applying remote changes locally is already in progress" "Applying remote changes locally is already in progress"
@ -272,6 +265,35 @@ export class Syncer {
this.internalSyncer.reset(); this.internalSyncer.reset();
} }
private async internalApplyRemoteChangesLocally(): Promise<void> {
const remote = await this.syncQueue.add(async () =>
this.syncService.getAll(this.database.getLastSeenUpdateId())
);
if (!remote) {
throw new Error("Failed to fetch remote changes");
}
if (remote.latestDocuments.length === 0) {
this.logger.debug("No remote changes to apply");
return;
}
this.logger.info("Applying remote changes locally");
await Promise.all(
remote.latestDocuments.map(this.syncRemotelyUpdatedFile.bind(this))
);
const lastSeenUpdateId = this.database.getLastSeenUpdateId();
if (
lastSeenUpdateId === undefined ||
remote.lastUpdateId > lastSeenUpdateId
) {
this.database.setLastSeenUpdateId(remote.lastUpdateId);
}
}
private async syncRemotelyUpdatedFile( private async syncRemotelyUpdatedFile(
remoteVersion: components["schemas"]["DocumentVersionWithoutContent"] remoteVersion: components["schemas"]["DocumentVersionWithoutContent"]
): Promise<void> { ): Promise<void> {
@ -279,45 +301,38 @@ export class Syncer {
remoteVersion.documentId remoteVersion.documentId
); );
if (document === undefined) {
const candidate = this.database.getLatestDocumentByRelativePath(
remoteVersion.relativePath
);
if (candidate !== undefined && candidate.metadata === undefined) {
document = candidate;
}
}
if (document === undefined) {
await this.syncQueue.add(async () =>
this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile(
remoteVersion
)
);
return;
}
const [promise, resolve, reject] = createPromise(); const [promise, resolve, reject] = createPromise();
await this.database.getResolvedDocumentByRelativePath( if (document === undefined) {
document.relativePath,
promise
);
try {
await this.syncQueue.add(async () => await this.syncQueue.add(async () =>
this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile( this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile(
remoteVersion, remoteVersion,
() => this.database.getDocumentByUpdatePromise(promise) () =>
this.database.getDocumentByDocumentId(
remoteVersion.documentId
)
) )
); );
} else {
await this.database.getResolvedDocumentByRelativePath(
document.relativePath,
promise
);
resolve(); try {
} catch (e) { await this.syncQueue.add(async () =>
reject(e); this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile(
} finally { remoteVersion,
this.database.removeDocumentPromise(promise); () => this.database.getDocumentByUpdatePromise(promise)
)
);
resolve();
} catch (e) {
reject(e);
} finally {
this.database.removeDocumentPromise(promise);
}
} }
} }
@ -405,35 +420,4 @@ export class Syncer {
await Promise.all([updates, deletes]); await Promise.all([updates, deletes]);
} }
private async internalApplyRemoteChangesLocally(): Promise<void> {
const remote = await this.syncService.getAll(
this.database.getLastSeenUpdateId()
);
if (remote.latestDocuments.length === 0) {
this.logger.debug("No remote changes to apply");
return;
}
this.logger.info("Applying remote changes locally");
await Promise.all(
remote.latestDocuments.map(this.syncRemotelyUpdatedFile.bind(this))
);
const lastSeenUpdateId = this.database.getLastSeenUpdateId();
if (
lastSeenUpdateId === undefined ||
remote.lastUpdateId > lastSeenUpdateId
) {
this.database.setLastSeenUpdateId(remote.lastUpdateId);
}
}
private emitRemainingOperationsChange(remainingOperations: number): void {
this.remainingOperationsListeners.forEach((listener) => {
listener(remainingOperations);
});
}
} }

View file

@ -1,5 +1,6 @@
import type { import type {
Database, Database,
DocumentId,
DocumentRecord, DocumentRecord,
RelativePath RelativePath
} from "../persistence/database"; } from "../persistence/database";
@ -15,6 +16,7 @@ import type { Settings } from "src/persistence/settings";
import type { FileOperations } from "src/file-operations/file-operations"; import type { FileOperations } from "src/file-operations/file-operations";
import { FileNotFoundError } from "src/file-operations/safe-filesystem-operations"; import { FileNotFoundError } from "src/file-operations/safe-filesystem-operations";
import { DocumentLocks } from "../file-operations/document-locks"; import { DocumentLocks } from "../file-operations/document-locks";
import { createPromise } from "src/utils/create-promise";
export class UnrestrictedSyncer { export class UnrestrictedSyncer {
private readonly locks: DocumentLocks; private readonly locks: DocumentLocks;
@ -31,8 +33,8 @@ export class UnrestrictedSyncer {
} }
public async unrestrictedSyncLocallyCreatedFile( public async unrestrictedSyncLocallyCreatedFile(
getLatestDocument: () => DocumentRecord, proposedDocumentId: DocumentId,
updateTime?: Date getLatestDocument: () => DocumentRecord
): Promise<void> { ): Promise<void> {
let latestDocument = getLatestDocument(); let latestDocument = getLatestDocument();
@ -41,29 +43,15 @@ export class UnrestrictedSyncer {
SyncType.CREATE, SyncType.CREATE,
SyncSource.PUSH, SyncSource.PUSH,
async () => { async () => {
if (
latestDocument.metadata !== undefined &&
!latestDocument.isDeleted
) {
this.logger.debug(
`Document ${latestDocument.relativePath} already exists in the database, no need to create it again`
);
return;
}
const contentBytes = await this.operations.read( const contentBytes = await this.operations.read(
latestDocument.relativePath latestDocument.relativePath
); // this can throw FileNotFoundError ); // this can throw FileNotFoundError
const contentHash = hash(contentBytes); const contentHash = hash(contentBytes);
updateTime ??= await this.operations.getModificationTime(
latestDocument.relativePath
); // this can throw FileNotFoundError
const response = await this.syncService.create({ const response = await this.syncService.create({
documentId: proposedDocumentId,
relativePath: latestDocument.relativePath, relativePath: latestDocument.relativePath,
contentBytes, contentBytes
createdDate: updateTime
}); });
latestDocument = getLatestDocument(); latestDocument = getLatestDocument();
@ -76,15 +64,15 @@ export class UnrestrictedSyncer {
type: SyncType.CREATE type: SyncType.CREATE
}); });
const newMetadata = { this.database.setDocument(
relativePath: latestDocument.relativePath, {
documentId: response.documentId, relativePath: latestDocument.relativePath,
parentVersionId: response.vaultUpdateId, documentId: response.documentId,
hash: contentHash, parentVersionId: response.vaultUpdateId,
isDeleted: false hash: contentHash
}; },
latestDocument.identity
this.database.setDocument(newMetadata, latestDocument.identity); );
this.tryIncrementVaultUpdateId(response.vaultUpdateId); this.tryIncrementVaultUpdateId(response.vaultUpdateId);
} }
@ -100,17 +88,9 @@ export class UnrestrictedSyncer {
SyncType.DELETE, SyncType.DELETE,
SyncSource.PUSH, SyncSource.PUSH,
async () => { async () => {
if (document.metadata === undefined) {
this.logger.debug(
`Document '${document.relativePath}' has been created yet so deleting it remotely can be skipped`
);
return;
}
const response = await this.syncService.delete({ const response = await this.syncService.delete({
documentId: document.metadata.documentId, documentId: document.documentId,
relativePath: document.relativePath, relativePath: document.relativePath
createdDate: new Date() // We've got the event now, so it must have been deleted just now
}); });
this.history.addHistoryEntry({ this.history.addHistoryEntry({
@ -123,8 +103,6 @@ export class UnrestrictedSyncer {
document = getLatestDocument(); document = getLatestDocument();
// We have to have a record of the delete in case there's an in-flight update for the same
// document which finishes after the delete has succeeded and would introduce a phantom metadata record.
this.database.setDocument( this.database.setDocument(
{ {
relativePath: document.relativePath, relativePath: document.relativePath,
@ -140,12 +118,10 @@ export class UnrestrictedSyncer {
public async unrestrictedSyncLocallyUpdatedFile({ public async unrestrictedSyncLocallyUpdatedFile({
oldPath, oldPath,
getLatestDocument, getLatestDocument
updateTime
}: { }: {
oldPath?: RelativePath; oldPath?: RelativePath;
getLatestDocument: () => DocumentRecord; getLatestDocument: () => DocumentRecord;
updateTime?: Date;
}): Promise<void> { }): Promise<void> {
let document = getLatestDocument(); let document = getLatestDocument();
@ -178,19 +154,13 @@ export class UnrestrictedSyncer {
return; return;
} }
updateTime ??= await this.operations.getModificationTime(
document.relativePath
); // this can throw FileNotFoundError;
const response = await this.syncService.put({ const response = await this.syncService.put({
documentId: document.metadata.documentId, documentId: document.documentId,
parentVersionId: document.metadata.parentVersionId, parentVersionId: document.metadata.parentVersionId,
relativePath: document.relativePath, relativePath: document.relativePath,
contentBytes, contentBytes
createdDate: updateTime
}); });
// Update relativePath which is the only property that can change while this is running (due to a move)
document = getLatestDocument(); document = getLatestDocument();
if (document.isDeleted) { if (document.isDeleted) {
@ -252,6 +222,11 @@ export class UnrestrictedSyncer {
} }
if (response.relativePath != document.relativePath) { if (response.relativePath != document.relativePath) {
// this.database.getNewResolvedDocumentByRelativePath(
// response.relativePath,
// promise
// );
await this.operations.move( await this.operations.move(
document.relativePath, document.relativePath,
response.relativePath, response.relativePath,
@ -283,10 +258,7 @@ export class UnrestrictedSyncer {
this.database.setDocument( this.database.setDocument(
{ {
documentId: response.documentId, documentId: response.documentId,
relativePath: relativePath: document.relativePath,
response.relativePath != document.relativePath
? response.relativePath
: document.relativePath,
parentVersionId: response.vaultUpdateId, parentVersionId: response.vaultUpdateId,
hash: contentHash hash: contentHash
}, },
@ -300,20 +272,19 @@ export class UnrestrictedSyncer {
public async unrestrictedSyncRemotelyUpdatedFile( public async unrestrictedSyncRemotelyUpdatedFile(
remoteVersion: components["schemas"]["DocumentVersionWithoutContent"], remoteVersion: components["schemas"]["DocumentVersionWithoutContent"],
getLatestDocument?: () => DocumentRecord getLatestDocument: () => DocumentRecord | undefined
): Promise<void> { ): Promise<void> {
await this.executeSync( await this.executeSync(
[remoteVersion.relativePath], [remoteVersion.relativePath],
SyncType.UPDATE, SyncType.UPDATE,
SyncSource.PULL, SyncSource.PULL,
async () => { async () => {
const localMetadata = let localMetadata = getLatestDocument();
getLatestDocument?.() ??
this.database.getDocumentByDocumentId(
remoteVersion.documentId
);
if (localMetadata?.metadata !== undefined) { if (
localMetadata !== undefined &&
localMetadata?.metadata !== undefined
) {
// If the file exists locally, let's pretend the user has updated it // If the file exists locally, let's pretend the user has updated it
// and deal with remote update/deletion within `unrestrictedSyncLocallyUpdatedFile` // and deal with remote update/deletion within `unrestrictedSyncLocallyUpdatedFile`
if ( if (
@ -329,7 +300,7 @@ export class UnrestrictedSyncer {
return this.unrestrictedSyncLocallyUpdatedFile({ return this.unrestrictedSyncLocallyUpdatedFile({
getLatestDocument: () => getLatestDocument: () =>
this.database.getDocumentByIdentity( this.database.getDocumentByIdentity(
localMetadata.identity localMetadata!.identity
) )
}); });
} else if (remoteVersion.isDeleted) { } else if (remoteVersion.isDeleted) {
@ -347,27 +318,26 @@ export class UnrestrictedSyncer {
}) })
).contentBase64; ).contentBase64;
const latestDocument = localMetadata = getLatestDocument();
getLatestDocument?.() ??
this.database.getDocumentByDocumentId(
remoteVersion.documentId
);
if (latestDocument?.isDeleted) { if (localMetadata?.isDeleted === true) {
this.logger.info( this.logger.info(
`Document ${remoteVersion.relativePath} has been deleted locally before we could finish updating it` `Document ${remoteVersion.relativePath} has been deleted locally before we could finish updating it`
); );
return; return;
} }
if (
localMetadata?.metadata?.parentVersionId ??
-1 >= remoteVersion.vaultUpdateId
) {
this.logger.debug(
`Document ${remoteVersion.relativePath} is already more up to date than the fetched version`
);
return;
}
const contentBytes = deserialize(content); const contentBytes = deserialize(content);
await this.operations.create(
remoteVersion.relativePath,
contentBytes,
remoteVersion.documentId
);
this.database.setDocument( this.database.setDocument(
{ {
documentId: remoteVersion.documentId, documentId: remoteVersion.documentId,
@ -375,7 +345,13 @@ export class UnrestrictedSyncer {
parentVersionId: remoteVersion.vaultUpdateId, parentVersionId: remoteVersion.vaultUpdateId,
hash: hash(contentBytes) hash: hash(contentBytes)
}, },
latestDocument?.identity localMetadata?.identity
);
await this.operations.create(
remoteVersion.relativePath,
contentBytes,
remoteVersion.documentId
); );
this.history.addHistoryEntry({ this.history.addHistoryEntry({
@ -390,19 +366,12 @@ export class UnrestrictedSyncer {
} }
public async executeSync<T>( public async executeSync<T>(
lockedPaths: RelativePath[], paths: RelativePath[],
syncType: SyncType, syncType: SyncType,
syncSource: SyncSource, syncSource: SyncSource,
fn: () => Promise<T> fn: () => Promise<T>
): Promise<T | undefined> { ): Promise<T | undefined> {
const relativePath = lockedPaths[lockedPaths.length - 1]; const relativePath = paths[paths.length - 1];
if (!this.settings.getSettings().isSyncEnabled) {
this.logger.info(
`Syncing is disabled, not syncing '${relativePath}'`
);
return;
}
if (!this.operations.isFileEligibleForSync(relativePath)) { if (!this.operations.isFileEligibleForSync(relativePath)) {
this.history.addHistoryEntry({ this.history.addHistoryEntry({

View file

@ -83,8 +83,6 @@ export class MockAgent extends MockClient {
} }
public async act(): Promise<void> { public async act(): Promise<void> {
this.assertAllContentIsPresentOnce();
const options: (() => Promise<unknown>)[] = [ const options: (() => Promise<unknown>)[] = [
this.createFileAction.bind(this), this.createFileAction.bind(this),
this.changeFetchChangesUpdateIntervalMsAction.bind(this) this.changeFetchChangesUpdateIntervalMsAction.bind(this)

View file

@ -1,3 +1,4 @@
import { assert } from "../utils/assert";
import type { import type {
RelativePath, RelativePath,
FileSystemOperations, FileSystemOperations,
@ -81,6 +82,18 @@ export class MockClient implements FileSystemOperations {
const newContentUint8Array = new TextEncoder().encode(newContent); const newContentUint8Array = new TextEncoder().encode(newContent);
this.localFiles.set(path, newContentUint8Array); this.localFiles.set(path, newContentUint8Array);
const existingPats = currentContent
.split(" ")
.map((part) => part.trim());
const newParts = newContent.split(" ").map((part) => part.trim());
existingPats.forEach((part) =>
// all changes should be additive
assert(
newParts.includes(part),
`Part ${part} not found in new content`
)
);
this.client.logger.info( this.client.logger.info(
`Updated file ${path} with:\n current content: ${currentContent}\n new content: ${newContent}` `Updated file ${path} with:\n current content: ${currentContent}\n new content: ${newContent}`
); );

View file

@ -92,16 +92,16 @@ async function runTest({
async function runTests(): Promise<void> { async function runTests(): Promise<void> {
const agentCounts = [2, 8]; const agentCounts = [2, 8];
const jitterScaleInSeconds = [0.5, 0, 2]; const jitterScaleInSeconds = [0.5, 0, 2];
const concurrencies = [1]; const concurrencies = [16, 1];
const iterations = [50, 200]; const iterations = [50, 200];
const doDeletes = [true, false]; const doDeletes = [true, false];
for (let i = 0; i < 10; i++) { for (const agentCount of agentCounts) {
for (const agentCount of agentCounts) { for (const concurrency of concurrencies) {
for (const concurrency of concurrencies) { for (const jitter of jitterScaleInSeconds) {
for (const jitter of jitterScaleInSeconds) { for (const iteration of iterations) {
for (const iteration of iterations) { for (const deleteFiles of doDeletes) {
for (const deleteFiles of doDeletes) { for (let i = 0; i < 10; i++) {
await runTest({ await runTest({
agentCount, agentCount,
concurrency, concurrency,
@ -110,6 +110,7 @@ async function runTests(): Promise<void> {
jitterScaleInSeconds: jitter jitterScaleInSeconds: jitter
}); });
} }
return;
} }
} }
} }