WIP
This commit is contained in:
parent
bcf48c428d
commit
a2522ca44a
14 changed files with 1370 additions and 1040 deletions
|
|
@ -10,7 +10,7 @@ use serde::Deserialize;
|
|||
|
||||
use super::{app_state::AppState, auth::auth, requests::DeleteDocumentVersion};
|
||||
use crate::{
|
||||
database::models::{DocumentId, StoredDocumentVersion, VaultId},
|
||||
database::models::{DocumentId, DocumentVersionWithoutContent, StoredDocumentVersion, VaultId},
|
||||
errors::{SyncServerError, server_error},
|
||||
utils::sanitize_path,
|
||||
};
|
||||
|
|
@ -31,7 +31,7 @@ pub async fn delete_document(
|
|||
}): Path<PathParams>,
|
||||
State(state): State<AppState>,
|
||||
Json(request): Json<DeleteDocumentVersion>,
|
||||
) -> Result<(), SyncServerError> {
|
||||
) -> Result<Json<DocumentVersionWithoutContent>, SyncServerError> {
|
||||
auth(&state, auth_header.token())?;
|
||||
|
||||
let mut transaction = state
|
||||
|
|
@ -69,5 +69,5 @@ pub async fn delete_document(
|
|||
.context("Failed to commit successful transaction")
|
||||
.map_err(server_error)?;
|
||||
|
||||
Ok(())
|
||||
Ok(Json(new_version.into()))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -138,6 +138,18 @@ async fn internal_update_document(
|
|||
Ok,
|
||||
)?;
|
||||
|
||||
if latest_version.is_deleted {
|
||||
transaction
|
||||
.rollback()
|
||||
.await
|
||||
.context("Failed to roll back transaction")
|
||||
.map_err(server_error)?;
|
||||
|
||||
return Ok(Json(DocumentUpdateResponse::FastForwardUpdate(
|
||||
latest_version.into(),
|
||||
)));
|
||||
}
|
||||
|
||||
let sanitized_relative_path = sanitize_path(&relative_path);
|
||||
|
||||
// Return the latest version if the content and path are the same as the latest
|
||||
|
|
@ -195,7 +207,7 @@ async fn internal_update_document(
|
|||
content: merged_content,
|
||||
created_date,
|
||||
updated_date: chrono::Utc::now(),
|
||||
is_deleted: latest_version.is_deleted,
|
||||
is_deleted: false,
|
||||
};
|
||||
|
||||
state
|
||||
|
|
|
|||
|
|
@ -1,17 +1,27 @@
|
|||
import type { FileSystemOperations } from "sync-client";
|
||||
import type { Database, RelativePath } from "../persistence/database";
|
||||
import type {
|
||||
Database,
|
||||
DocumentMetadata,
|
||||
RelativePath
|
||||
} from "../persistence/database";
|
||||
import { FileOperations } from "./file-operations";
|
||||
import { Logger } from "../tracing/logger";
|
||||
import { assertSetContainsExactly } from "../utils/assert-set-contains-exactly";
|
||||
|
||||
describe("File operations", () => {
|
||||
class MockDatabase {
|
||||
public async updatePath(
|
||||
public async move(
|
||||
_oldRelativePath: RelativePath,
|
||||
_newRelativePath: RelativePath
|
||||
): Promise<void> {
|
||||
// this is called but irrelevant for this mock
|
||||
}
|
||||
|
||||
public getResolvedDocument(
|
||||
_relativePath: RelativePath | undefined
|
||||
): DocumentMetadata | undefined {
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
class FakeFileSystemOperations implements FileSystemOperations {
|
||||
|
|
|
|||
|
|
@ -6,7 +6,10 @@ import type {
|
|||
RelativePath
|
||||
} from "src/persistence/database";
|
||||
import { isBinary, isFileTypeMergable, mergeText } from "sync_lib";
|
||||
import { SafeFileSystemOperations } from "./safe-filesystem-operations";
|
||||
import {
|
||||
FileNotFoundError,
|
||||
SafeFileSystemOperations
|
||||
} from "./safe-filesystem-operations";
|
||||
|
||||
export class FileOperations {
|
||||
private static readonly PARENTHESES_REGEX = / \((\d+)\)$/;
|
||||
|
|
@ -17,7 +20,7 @@ export class FileOperations {
|
|||
private readonly database: Database,
|
||||
fs: FileSystemOperations
|
||||
) {
|
||||
this.fs = new SafeFileSystemOperations(fs);
|
||||
this.fs = new SafeFileSystemOperations(fs, logger);
|
||||
}
|
||||
|
||||
public async listAllFiles(): Promise<RelativePath[]> {
|
||||
|
|
@ -58,15 +61,37 @@ export class FileOperations {
|
|||
// All parent directories are created if they don't exist.
|
||||
public async create(
|
||||
path: RelativePath,
|
||||
newContent: Uint8Array
|
||||
newContent: Uint8Array,
|
||||
documentId?: DocumentId
|
||||
): Promise<void> {
|
||||
this.logger.debug(`Creating file: ${path}`);
|
||||
if (await this.fs.exists(path)) {
|
||||
const deconflictedPath = await this.deconflictPath(path);
|
||||
this.logger.debug(
|
||||
`Didn't expect ${path} to exist, deconflicting by moving it to '${deconflictedPath}'`
|
||||
);
|
||||
await this.database.updatePath(path, deconflictedPath);
|
||||
await this.fs.rename(path, deconflictedPath);
|
||||
|
||||
const existingMetadata = this.database.getResolvedDocument(path);
|
||||
this.logger.debug(
|
||||
`Existing metadata for ${path}: ${JSON.stringify(existingMetadata)}`
|
||||
);
|
||||
if (
|
||||
existingMetadata === undefined ||
|
||||
existingMetadata.isDeleted ||
|
||||
existingMetadata.documentId !== documentId ||
|
||||
!documentId
|
||||
) {
|
||||
this.logger.debug(
|
||||
`We need to save what's at ${path} to ${deconflictedPath}`
|
||||
);
|
||||
await this.move(path, deconflictedPath, documentId);
|
||||
await this.database.move(path, deconflictedPath);
|
||||
} else {
|
||||
// This can happen if the document got moved both locally and remotely
|
||||
// to the same file path. In this case, we shouldn't deconflict, however,
|
||||
// we also can't overwrite otherwise we'd lose changes.
|
||||
throw new FileNotFoundError(path);
|
||||
}
|
||||
} else {
|
||||
await this.createParentDirectories(path);
|
||||
}
|
||||
|
|
@ -126,9 +151,13 @@ export class FileOperations {
|
|||
return new TextEncoder().encode(resultText);
|
||||
}
|
||||
|
||||
public async remove(path: RelativePath): Promise<void> {
|
||||
this.logger.debug(`Deleting file: ${path}`);
|
||||
return this.fs.delete(path);
|
||||
public async delete(path: RelativePath): Promise<void> {
|
||||
if (!(await this.exists(path))) {
|
||||
this.logger.debug(`Deleting file: ${path}`);
|
||||
return this.fs.delete(path);
|
||||
} else {
|
||||
this.logger.debug(`No need to delete '${path}', it doesn't exist`);
|
||||
}
|
||||
}
|
||||
|
||||
public async move(
|
||||
|
|
@ -145,16 +174,20 @@ export class FileOperations {
|
|||
this.logger.debug(
|
||||
`Conflict when moving '${oldPath}' to '${newPath}', the latter already exists, deconflicting by moving it to '${deconflictedPath}'`
|
||||
);
|
||||
|
||||
const existingMetadata = this.database.getDocument(newPath);
|
||||
const existingMetadata = this.database.getResolvedDocument(newPath);
|
||||
if (
|
||||
existingMetadata === undefined ||
|
||||
existingMetadata.documentId !== documentId
|
||||
existingMetadata.isDeleted ||
|
||||
existingMetadata.documentId !== documentId ||
|
||||
!documentId
|
||||
) {
|
||||
await this.database.updatePath(newPath, deconflictedPath);
|
||||
await this.fs.rename(newPath, deconflictedPath);
|
||||
await this.move(newPath, deconflictedPath, documentId);
|
||||
await this.database.move(oldPath, newPath);
|
||||
} else {
|
||||
await this.database.deleteDocument(newPath);
|
||||
// This can happen if the document got moved both locally and remotely
|
||||
// to the same file path. In this case, we shouldn't deconflict, however,
|
||||
// we also can't overwrite otherwise we'd lose changes.
|
||||
throw new FileNotFoundError(newPath);
|
||||
}
|
||||
} else {
|
||||
await this.createParentDirectories(newPath);
|
||||
|
|
|
|||
|
|
@ -13,7 +13,5 @@ export interface FileSystemOperations {
|
|||
exists: (path: RelativePath) => Promise<boolean>;
|
||||
createDirectory: (path: RelativePath) => Promise<void>;
|
||||
delete: (path: RelativePath) => Promise<void>;
|
||||
|
||||
// Must be able to handle renaming to a file that already exists
|
||||
rename: (oldPath: RelativePath, newPath: RelativePath) => Promise<void>;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,7 @@
|
|||
import type { RelativePath } from "src/persistence/database";
|
||||
import type { RelativePath } from "../persistence/database";
|
||||
import type { FileSystemOperations } from "./filesystem-operations";
|
||||
import type { Logger } from "../tracing/logger";
|
||||
import { DocumentLocks } from "./document-locks";
|
||||
|
||||
export class FileNotFoundError extends Error {
|
||||
public constructor(message: string) {
|
||||
|
|
@ -9,71 +11,145 @@ export class FileNotFoundError extends Error {
|
|||
}
|
||||
|
||||
// Decorate FileSystemOperations replacing errors with FileNotFoundError
|
||||
// if the accessed file doesn't exist.
|
||||
// if the accessed file doesn't exist. It also ensures that there's only
|
||||
// ever a single request in-flight for any one file.
|
||||
export class SafeFileSystemOperations implements FileSystemOperations {
|
||||
public constructor(private readonly fs: FileSystemOperations) {}
|
||||
private readonly locks: DocumentLocks;
|
||||
|
||||
public constructor(
|
||||
private readonly fs: FileSystemOperations,
|
||||
private readonly logger: Logger
|
||||
) {
|
||||
this.locks = new DocumentLocks(logger);
|
||||
}
|
||||
|
||||
public async listAllFiles(): Promise<RelativePath[]> {
|
||||
this.logger.debug("Listing all files");
|
||||
return this.fs.listAllFiles();
|
||||
}
|
||||
|
||||
public async read(path: RelativePath): Promise<Uint8Array> {
|
||||
return this.safeOperation(path, async () => this.fs.read(path));
|
||||
this.logger.debug(`Reading file: ${path}`);
|
||||
return this.safeOperation(
|
||||
path,
|
||||
this.decorateToHoldLock(path, async () => this.fs.read(path)),
|
||||
"read"
|
||||
);
|
||||
}
|
||||
|
||||
public async write(path: RelativePath, content: Uint8Array): Promise<void> {
|
||||
return this.fs.write(path, content);
|
||||
this.logger.debug(`Writing file: ${path}`);
|
||||
return this.decorateToHoldLock(path, async () =>
|
||||
this.fs.write(path, content)
|
||||
)();
|
||||
}
|
||||
|
||||
public async atomicUpdateText(
|
||||
path: RelativePath,
|
||||
updater: (currentContent: string) => string
|
||||
): Promise<string> {
|
||||
return this.safeOperation(path, async () =>
|
||||
this.fs.atomicUpdateText(path, updater)
|
||||
this.logger.debug(`Atomic update of file: ${path}`);
|
||||
return this.safeOperation(
|
||||
path,
|
||||
this.decorateToHoldLock(path, async () =>
|
||||
this.fs.atomicUpdateText(path, updater)
|
||||
),
|
||||
"atomicUpdateText"
|
||||
);
|
||||
}
|
||||
|
||||
public async getFileSize(path: RelativePath): Promise<number> {
|
||||
return this.safeOperation(path, async () => this.fs.getFileSize(path));
|
||||
this.logger.debug(`Getting file size: ${path}`);
|
||||
return this.safeOperation(
|
||||
path,
|
||||
this.decorateToHoldLock(path, async () =>
|
||||
this.fs.getFileSize(path)
|
||||
),
|
||||
"getFileSize"
|
||||
);
|
||||
}
|
||||
|
||||
public async getModificationTime(path: RelativePath): Promise<Date> {
|
||||
return this.safeOperation(path, async () =>
|
||||
this.fs.getModificationTime(path)
|
||||
this.logger.debug(`Getting modification time: ${path}`);
|
||||
return this.safeOperation(
|
||||
path,
|
||||
this.decorateToHoldLock(path, async () =>
|
||||
this.fs.getModificationTime(path)
|
||||
),
|
||||
"getModificationTime"
|
||||
);
|
||||
}
|
||||
|
||||
public async exists(path: RelativePath): Promise<boolean> {
|
||||
return this.fs.exists(path);
|
||||
this.logger.debug(`Checking if file exists: ${path}`);
|
||||
return this.decorateToHoldLock(path, async () =>
|
||||
this.fs.exists(path)
|
||||
)();
|
||||
}
|
||||
|
||||
public async createDirectory(path: RelativePath): Promise<void> {
|
||||
return this.fs.createDirectory(path);
|
||||
this.logger.debug(`Creating directory: ${path}`);
|
||||
return this.decorateToHoldLock(path, async () =>
|
||||
this.fs.createDirectory(path)
|
||||
)();
|
||||
}
|
||||
|
||||
public async delete(path: RelativePath): Promise<void> {
|
||||
return this.fs.delete(path);
|
||||
this.logger.debug(`Deleting file: ${path}`);
|
||||
return this.decorateToHoldLock(path, async () =>
|
||||
this.fs.delete(path)
|
||||
)();
|
||||
}
|
||||
|
||||
public async rename(
|
||||
oldPath: RelativePath,
|
||||
newPath: RelativePath
|
||||
): Promise<void> {
|
||||
return this.safeOperation(oldPath, async () =>
|
||||
this.fs.rename(oldPath, newPath)
|
||||
this.logger.debug(`Renaming file: ${oldPath} to ${newPath}`);
|
||||
return this.safeOperation(
|
||||
oldPath,
|
||||
this.decorateToHoldLock([oldPath, newPath], async () =>
|
||||
this.fs.rename(oldPath, newPath)
|
||||
),
|
||||
"rename"
|
||||
);
|
||||
}
|
||||
|
||||
private decorateToHoldLock<T>(
|
||||
pathOrPaths: RelativePath | RelativePath[],
|
||||
operation: () => Promise<T>
|
||||
): () => Promise<T> {
|
||||
return async () => {
|
||||
const paths = Array.isArray(pathOrPaths)
|
||||
? pathOrPaths
|
||||
: [pathOrPaths];
|
||||
await Promise.all(
|
||||
paths.map(async (path) => this.locks.waitForDocumentLock(path))
|
||||
);
|
||||
try {
|
||||
return await operation();
|
||||
} finally {
|
||||
await Promise.all(
|
||||
paths.map((path) => {
|
||||
this.locks.unlockDocument(path);
|
||||
})
|
||||
);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private async safeOperation<T>(
|
||||
path: RelativePath,
|
||||
operation: () => Promise<T>
|
||||
operation: () => Promise<T>,
|
||||
operationName: string
|
||||
): Promise<T> {
|
||||
// Without locking the file, this isn't atomic, however, it's good enough practicaly.
|
||||
// This will only break if the file exists, gets deleted and then immediately
|
||||
// recreated while `operation` is running.
|
||||
if (!(await this.fs.exists(path))) {
|
||||
throw new FileNotFoundError(path);
|
||||
throw new FileNotFoundError(
|
||||
`File not found: ${path} before trying to ${operationName}`
|
||||
);
|
||||
}
|
||||
try {
|
||||
return await operation();
|
||||
|
|
@ -81,7 +157,9 @@ export class SafeFileSystemOperations implements FileSystemOperations {
|
|||
if (await this.fs.exists(path)) {
|
||||
throw error;
|
||||
} else {
|
||||
throw new FileNotFoundError(path);
|
||||
throw new FileNotFoundError(
|
||||
`File not found: ${path} when trying to ${operationName}`
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ export interface DocumentMetadata {
|
|||
parentVersionId: VaultUpdateId;
|
||||
documentId: DocumentId;
|
||||
hash: string;
|
||||
isDeleted: boolean;
|
||||
}
|
||||
|
||||
import type { Logger } from "src/tracing/logger";
|
||||
|
|
@ -16,7 +17,10 @@ export interface StoredDatabase {
|
|||
}
|
||||
|
||||
export class Database {
|
||||
private documents = new Map<RelativePath, DocumentMetadata>();
|
||||
private documents = new Map<
|
||||
RelativePath,
|
||||
DocumentMetadata | Promise<DocumentMetadata | undefined>
|
||||
>();
|
||||
|
||||
private lastSeenUpdateId: VaultUpdateId | undefined;
|
||||
|
||||
|
|
@ -43,8 +47,15 @@ export class Database {
|
|||
);
|
||||
}
|
||||
|
||||
public getDocuments(): Map<RelativePath, DocumentMetadata> {
|
||||
return this.documents;
|
||||
public get length(): number {
|
||||
return this.documents.size;
|
||||
}
|
||||
|
||||
public get resolvedDocuments(): [RelativePath, DocumentMetadata][] {
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
|
||||
return Array.from(this.documents.entries()).filter(
|
||||
([_, metadata]) => !(metadata instanceof Promise)
|
||||
) as [RelativePath, DocumentMetadata][];
|
||||
}
|
||||
|
||||
public getLastSeenUpdateId(): VaultUpdateId | undefined {
|
||||
|
|
@ -67,58 +78,106 @@ export class Database {
|
|||
public getDocumentByDocumentId(
|
||||
documentId: DocumentId
|
||||
): [RelativePath, DocumentMetadata] | undefined {
|
||||
return [...this.documents.entries()].find(
|
||||
return this.resolvedDocuments.find(
|
||||
([_, metadata]) => metadata.documentId === documentId
|
||||
);
|
||||
}
|
||||
|
||||
public getDocumentByIdentity(
|
||||
document:
|
||||
| DocumentMetadata
|
||||
| Promise<DocumentMetadata | undefined>
|
||||
| undefined
|
||||
):
|
||||
| [
|
||||
RelativePath,
|
||||
DocumentMetadata | Promise<DocumentMetadata | undefined>
|
||||
]
|
||||
| undefined {
|
||||
if (document === undefined) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
return Array.from(this.documents.entries()).find(
|
||||
([_, metadata]) => metadata === document
|
||||
);
|
||||
}
|
||||
|
||||
public async setDocument({
|
||||
documentId,
|
||||
relativePath,
|
||||
parentVersionId,
|
||||
hash
|
||||
hash,
|
||||
isDeleted
|
||||
}: {
|
||||
documentId: DocumentId;
|
||||
relativePath: RelativePath;
|
||||
parentVersionId: VaultUpdateId;
|
||||
hash: string;
|
||||
isDeleted: boolean;
|
||||
}): Promise<void> {
|
||||
this.documents.set(relativePath, {
|
||||
documentId,
|
||||
parentVersionId,
|
||||
hash
|
||||
hash,
|
||||
isDeleted
|
||||
});
|
||||
await this.save();
|
||||
}
|
||||
|
||||
public async removeDocument(relativePath: RelativePath): Promise<void> {
|
||||
this.documents.delete(relativePath);
|
||||
await this.save();
|
||||
public async setDocumentPromise({
|
||||
relativePath,
|
||||
promise
|
||||
}: {
|
||||
relativePath: RelativePath;
|
||||
promise: Promise<DocumentMetadata | undefined>;
|
||||
}): Promise<void> {
|
||||
this.documents.set(relativePath, promise);
|
||||
// No need to save as Promises don't get serialized
|
||||
// and a crash would only result in the document being
|
||||
// creatied again.
|
||||
}
|
||||
|
||||
public getResolvedDocument(
|
||||
relativePath: RelativePath | undefined
|
||||
): DocumentMetadata | undefined {
|
||||
if (relativePath == undefined) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const metadata = this.documents.get(relativePath);
|
||||
if (metadata instanceof Promise) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
return metadata;
|
||||
}
|
||||
|
||||
public getDocument(
|
||||
relativePath: RelativePath
|
||||
): DocumentMetadata | undefined {
|
||||
relativePath: RelativePath | undefined
|
||||
): Promise<DocumentMetadata | undefined> | DocumentMetadata | undefined {
|
||||
if (relativePath == undefined) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
return this.documents.get(relativePath);
|
||||
}
|
||||
|
||||
public async deleteDocument(relativePath: RelativePath): Promise<void> {
|
||||
this.documents.delete(relativePath);
|
||||
await this.save();
|
||||
}
|
||||
|
||||
public async updatePath(
|
||||
public async move(
|
||||
oldRelativePath: RelativePath,
|
||||
newRelativePath: RelativePath
|
||||
): Promise<void> {
|
||||
const document = this.documents.get(oldRelativePath);
|
||||
if (!document) {
|
||||
throw new Error(
|
||||
`Cannot update physical path for document that does not exist: ${oldRelativePath}`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.documents.has(newRelativePath)) {
|
||||
const resolvedDocument = this.getResolvedDocument(oldRelativePath);
|
||||
if (
|
||||
this.documents.has(newRelativePath) &&
|
||||
resolvedDocument != undefined &&
|
||||
resolvedDocument.isDeleted
|
||||
) {
|
||||
throw new Error(
|
||||
`Cannot update physical path to path that is already in use: ${newRelativePath}`
|
||||
);
|
||||
|
|
@ -133,16 +192,15 @@ export class Database {
|
|||
private async save(): Promise<void> {
|
||||
this.ensureConsistency();
|
||||
await this.saveData({
|
||||
documents: Object.fromEntries(this.documents.entries()),
|
||||
documents: Object.fromEntries(this.resolvedDocuments),
|
||||
lastSeenUpdateId: this.lastSeenUpdateId
|
||||
});
|
||||
}
|
||||
|
||||
private ensureConsistency(): void {
|
||||
const allMetadata = Array.from(this.documents.entries());
|
||||
const idToPath = new Map<string, Array<string>>();
|
||||
const idToPath = new Map<string, string[]>();
|
||||
|
||||
allMetadata.forEach(([name, metadata]) => {
|
||||
this.resolvedDocuments.forEach(([name, metadata]) => {
|
||||
idToPath.set(metadata.documentId, [
|
||||
...(idToPath.get(metadata.documentId) ?? []),
|
||||
name
|
||||
|
|
|
|||
|
|
@ -59,7 +59,7 @@ export class SyncService {
|
|||
relativePath: RelativePath;
|
||||
contentBytes: Uint8Array;
|
||||
createdDate: Date;
|
||||
}): Promise<components["schemas"]["DocumentUpdateResponse"]> {
|
||||
}): Promise<components["schemas"]["DocumentVersionWithoutContent"]> {
|
||||
const formData = new FormData();
|
||||
formData.append("relative_path", relativePath);
|
||||
formData.append("created_date", createdDate.toISOString());
|
||||
|
|
@ -155,7 +155,7 @@ export class SyncService {
|
|||
documentId: DocumentId;
|
||||
relativePath: RelativePath;
|
||||
createdDate: Date;
|
||||
}): Promise<void> {
|
||||
}): Promise<components["schemas"]["DocumentVersionWithoutContent"]> {
|
||||
const response = await this.client.DELETE(
|
||||
"/vaults/{vault_id}/documents/{document_id}",
|
||||
{
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
|
|
@ -42,7 +42,7 @@ export class SyncClient {
|
|||
}
|
||||
|
||||
public get documentCount(): number {
|
||||
return this._database.getDocuments().size;
|
||||
return this._database.length;
|
||||
}
|
||||
|
||||
public set fetchImplementation(fetch: typeof globalThis.fetch) {
|
||||
|
|
|
|||
|
|
@ -1,4 +1,8 @@
|
|||
import type { Database, RelativePath } from "../persistence/database";
|
||||
import type {
|
||||
Database,
|
||||
DocumentMetadata,
|
||||
RelativePath
|
||||
} from "../persistence/database";
|
||||
|
||||
import type { SyncService } from "src/services/sync-service";
|
||||
import type { Logger } from "src/tracing/logger";
|
||||
|
|
@ -10,6 +14,7 @@ import type { Settings } from "src/persistence/settings";
|
|||
import type { FileOperations } from "src/file-operations/file-operations";
|
||||
import { findMatchingFileBasedOnHash } from "src/utils/find-matching-file-based-on-hash";
|
||||
import { UnrestrictedSyncer } from "./unrestricted-syncer";
|
||||
import { FileNotFoundError } from "src/file-operations/safe-filesystem-operations";
|
||||
|
||||
export class Syncer {
|
||||
private readonly remainingOperationsListeners: ((
|
||||
|
|
@ -58,6 +63,23 @@ export class Syncer {
|
|||
);
|
||||
}
|
||||
|
||||
private static async forgivingFileNotFoundWrapper<T>(
|
||||
fn: () => Promise<T>,
|
||||
logger: Logger
|
||||
): Promise<T | undefined> {
|
||||
try {
|
||||
return await fn();
|
||||
} catch (e) {
|
||||
if (e instanceof FileNotFoundError) {
|
||||
logger.debug(
|
||||
`File has been deleted or moved before we had a chance to inspect it, skipping`
|
||||
);
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public addRemainingOperationsListener(
|
||||
listener: (remainingOperations: number) => void
|
||||
): void {
|
||||
|
|
@ -68,10 +90,42 @@ export class Syncer {
|
|||
relativePath: RelativePath,
|
||||
updateTime: Date
|
||||
): Promise<void> {
|
||||
let resolve:
|
||||
| undefined
|
||||
| ((metadata: DocumentMetadata | undefined) => void) = undefined;
|
||||
|
||||
const creationPromise = new Promise<DocumentMetadata | undefined>(
|
||||
(r) => (resolve = r)
|
||||
);
|
||||
|
||||
await this.database.setDocumentPromise({
|
||||
relativePath,
|
||||
promise: creationPromise
|
||||
});
|
||||
|
||||
await this.syncQueue.add(async () => {
|
||||
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
|
||||
resolve!(
|
||||
await this.internalSyncer.unrestrictedSyncLocallyCreatedFile(
|
||||
relativePath,
|
||||
updateTime
|
||||
)
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
public async syncLocallyDeletedFile(
|
||||
relativePath: RelativePath
|
||||
): Promise<void> {
|
||||
let metadata = this.database.getDocument(relativePath);
|
||||
if (metadata !== undefined && !(metadata instanceof Promise)) {
|
||||
metadata = Promise.resolve(metadata);
|
||||
}
|
||||
|
||||
await this.syncQueue.add(async () =>
|
||||
this.internalSyncer.unrestrictedSyncLocallyCreatedFile(
|
||||
this.internalSyncer.unrestrictedSyncLocallyDeletedFile(
|
||||
relativePath,
|
||||
updateTime
|
||||
metadata
|
||||
)
|
||||
);
|
||||
}
|
||||
|
|
@ -81,8 +135,25 @@ export class Syncer {
|
|||
relativePath: RelativePath;
|
||||
updateTime: Date;
|
||||
}): Promise<void> {
|
||||
if (args.oldPath === args.relativePath) {
|
||||
throw new Error(
|
||||
`Old path and new path are the same: ${args.oldPath}`
|
||||
);
|
||||
}
|
||||
|
||||
if (args.oldPath !== undefined) {
|
||||
await this.database.move(args.oldPath, args.relativePath);
|
||||
}
|
||||
|
||||
let metadata = this.database.getDocument(args.relativePath);
|
||||
if (metadata !== undefined && !(metadata instanceof Promise)) {
|
||||
metadata = Promise.resolve(metadata);
|
||||
}
|
||||
await this.syncQueue.add(async () =>
|
||||
this.internalSyncer.unrestrictedSyncLocallyUpdatedFile(args)
|
||||
this.internalSyncer.unrestrictedSyncLocallyUpdatedFile({
|
||||
...args,
|
||||
metadata
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
|
|
@ -90,14 +161,6 @@ export class Syncer {
|
|||
return this.syncQueue.onEmpty();
|
||||
}
|
||||
|
||||
public async syncLocallyDeletedFile(
|
||||
relativePath: RelativePath
|
||||
): Promise<void> {
|
||||
await this.syncQueue.add(async () =>
|
||||
this.internalSyncer.unrestrictedSyncLocallyDeletedFile(relativePath)
|
||||
);
|
||||
}
|
||||
|
||||
public async scheduleSyncForOfflineChanges(): Promise<void> {
|
||||
if (!this.settings.getSettings().isSyncEnabled) {
|
||||
this.logger.debug(
|
||||
|
|
@ -178,32 +241,50 @@ export class Syncer {
|
|||
|
||||
// This includes renamed files for now
|
||||
let locallyPossiblyDeletedFiles = [
|
||||
...this.database.getDocuments().entries()
|
||||
...this.database.resolvedDocuments
|
||||
].filter(([path, _]) => !allLocalFiles.includes(path));
|
||||
|
||||
await Promise.all(
|
||||
const updates = Promise.all(
|
||||
allLocalFiles.map(async (relativePath) =>
|
||||
this.syncQueue.add(async () => {
|
||||
const metadata = this.database.getDocument(relativePath);
|
||||
const metadata =
|
||||
this.database.getResolvedDocument(relativePath);
|
||||
|
||||
if (metadata) {
|
||||
this.logger.debug(
|
||||
`Document ${relativePath} might have been updated locally, scheduling sync to validate and update it`
|
||||
);
|
||||
const updateTime =
|
||||
await Syncer.forgivingFileNotFoundWrapper(
|
||||
async () =>
|
||||
this.operations.getModificationTime(
|
||||
relativePath
|
||||
),
|
||||
this.logger
|
||||
);
|
||||
if (updateTime === undefined) {
|
||||
return;
|
||||
}
|
||||
|
||||
return this.internalSyncer.unrestrictedSyncLocallyUpdatedFile(
|
||||
{
|
||||
relativePath,
|
||||
updateTime:
|
||||
await this.operations.getModificationTime(
|
||||
relativePath
|
||||
)
|
||||
updateTime,
|
||||
metadata: Promise.resolve(metadata)
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
// Perhaps the file has been moved. Let's check by looking at the deleted files
|
||||
const contentBytes =
|
||||
await this.operations.read(relativePath);
|
||||
await Syncer.forgivingFileNotFoundWrapper(
|
||||
async () => this.operations.read(relativePath),
|
||||
this.logger
|
||||
);
|
||||
if (contentBytes === undefined) {
|
||||
return;
|
||||
}
|
||||
|
||||
const contentHash = hash(contentBytes);
|
||||
|
||||
// todo: make this smarter so that offline files can be renamed & edited at the same time
|
||||
|
|
@ -221,14 +302,29 @@ export class Syncer {
|
|||
this.logger.debug(
|
||||
`Document '${originalFile[0]}' was not found under its current path in the database but was found under a different path (${relativePath}), scheduling sync to move it`
|
||||
);
|
||||
|
||||
const updateTime =
|
||||
await Syncer.forgivingFileNotFoundWrapper(
|
||||
async () =>
|
||||
this.operations.getModificationTime(
|
||||
relativePath
|
||||
),
|
||||
this.logger
|
||||
);
|
||||
if (updateTime === undefined) {
|
||||
return;
|
||||
}
|
||||
|
||||
return this.internalSyncer.unrestrictedSyncLocallyUpdatedFile(
|
||||
{
|
||||
oldPath: originalFile[0],
|
||||
relativePath: relativePath,
|
||||
updateTime:
|
||||
await this.operations.getModificationTime(
|
||||
relativePath,
|
||||
updateTime,
|
||||
metadata: Promise.resolve(
|
||||
this.database.getResolvedDocument(
|
||||
relativePath
|
||||
),
|
||||
)
|
||||
),
|
||||
optimisations: {
|
||||
contentBytes,
|
||||
contentHash
|
||||
|
|
@ -240,15 +336,26 @@ export class Syncer {
|
|||
this.logger.debug(
|
||||
`Document ${relativePath} not found in database, scheduling sync to create it`
|
||||
);
|
||||
const updateTime =
|
||||
await Syncer.forgivingFileNotFoundWrapper(
|
||||
async () =>
|
||||
this.operations.getModificationTime(
|
||||
relativePath
|
||||
),
|
||||
this.logger
|
||||
);
|
||||
if (updateTime === undefined) {
|
||||
return;
|
||||
}
|
||||
return this.internalSyncer.unrestrictedSyncLocallyCreatedFile(
|
||||
relativePath,
|
||||
await this.operations.getModificationTime(relativePath)
|
||||
updateTime
|
||||
);
|
||||
})
|
||||
)
|
||||
);
|
||||
|
||||
await Promise.all(
|
||||
const deletes = Promise.all(
|
||||
locallyPossiblyDeletedFiles.map(async ([relativePath, _]) => {
|
||||
this.logger.debug(
|
||||
`Document ${relativePath} has been deleted locally, scheduling sync to delete it`
|
||||
|
|
@ -265,6 +372,8 @@ export class Syncer {
|
|||
return this.syncLocallyDeletedFile(relativePath);
|
||||
})
|
||||
);
|
||||
|
||||
await Promise.all([updates, deletes]);
|
||||
}
|
||||
|
||||
private async internalApplyRemoteChangesLocally(): Promise<void> {
|
||||
|
|
@ -280,9 +389,15 @@ export class Syncer {
|
|||
this.logger.info("Applying remote changes locally");
|
||||
|
||||
await Promise.all(
|
||||
remote.latestDocuments.map(async (remoteDocument) =>
|
||||
this.syncRemotelyUpdatedFile(remoteDocument)
|
||||
)
|
||||
remote.latestDocuments
|
||||
.filter(
|
||||
(remoteDocument) =>
|
||||
remoteDocument.vaultUpdateId >
|
||||
(this.database.getDocumentByDocumentId(
|
||||
remoteDocument.documentId
|
||||
)?.[1].parentVersionId ?? -1)
|
||||
)
|
||||
.map(this.syncRemotelyUpdatedFile.bind(this))
|
||||
);
|
||||
|
||||
const lastSeenUpdateId = this.database.getLastSeenUpdateId();
|
||||
|
|
|
|||
|
|
@ -1,19 +1,23 @@
|
|||
import type { Database, RelativePath } from "../persistence/database";
|
||||
import type {
|
||||
Database,
|
||||
DocumentMetadata,
|
||||
RelativePath
|
||||
} from "../persistence/database";
|
||||
|
||||
import type { SyncService } from "src/services/sync-service";
|
||||
import type { Logger } from "src/tracing/logger";
|
||||
import type { SyncHistory } from "src/tracing/sync-history";
|
||||
import { SyncSource, SyncStatus, SyncType } from "src/tracing/sync-history";
|
||||
import { hash } from "src/utils/hash";
|
||||
import { EMPTY_HASH, hash } from "src/utils/hash";
|
||||
import type { components } from "src/services/types";
|
||||
import { deserialize } from "src/utils/deserialize";
|
||||
import type { Settings } from "src/persistence/settings";
|
||||
import type { FileOperations } from "src/file-operations/file-operations";
|
||||
import { FileNotFoundError } from "src/file-operations/safe-filesystem-operations";
|
||||
import { DocumentLocks } from "./document-locks";
|
||||
import { DocumentLocks } from "../file-operations/document-locks";
|
||||
|
||||
export class UnrestrictedSyncer {
|
||||
private readonly locks = new DocumentLocks();
|
||||
private readonly locks: DocumentLocks;
|
||||
|
||||
public constructor(
|
||||
private readonly logger: Logger,
|
||||
|
|
@ -22,7 +26,9 @@ export class UnrestrictedSyncer {
|
|||
private readonly syncService: SyncService,
|
||||
private readonly operations: FileOperations,
|
||||
private readonly history: SyncHistory
|
||||
) {}
|
||||
) {
|
||||
this.locks = new DocumentLocks(logger);
|
||||
}
|
||||
|
||||
public async unrestrictedSyncLocallyCreatedFile(
|
||||
relativePath: RelativePath,
|
||||
|
|
@ -31,96 +37,132 @@ export class UnrestrictedSyncer {
|
|||
contentBytes?: Uint8Array;
|
||||
contentHash?: string;
|
||||
}
|
||||
): Promise<void> {
|
||||
await this.executeWhileHoldingFileLock(
|
||||
): Promise<DocumentMetadata | undefined> {
|
||||
return this.executeSync(
|
||||
[relativePath],
|
||||
SyncType.CREATE,
|
||||
SyncSource.PUSH,
|
||||
async () => {
|
||||
const localMetadata = this.database.getDocument(relativePath);
|
||||
|
||||
if (
|
||||
(await this.operations.getFileSize(relativePath)) / // this can throw FileNotFoundError
|
||||
1024 /
|
||||
1024 >
|
||||
this.settings.getSettings().maxFileSizeMB
|
||||
!(localMetadata instanceof Promise) &&
|
||||
localMetadata &&
|
||||
!localMetadata.isDeleted
|
||||
) {
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.ERROR,
|
||||
relativePath,
|
||||
message: `File size exceeds the maximum file size limit of ${
|
||||
this.settings.getSettings().maxFileSizeMB
|
||||
}MB`,
|
||||
type: SyncType.CREATE
|
||||
});
|
||||
this.logger.debug(
|
||||
`Document metadata already exists for ${relativePath}, it must have been downloaded from the server`
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
const contentBytes =
|
||||
optimisations?.contentBytes ??
|
||||
(await this.operations.read(relativePath)); // this can throw FileNotFoundError
|
||||
let contentHash =
|
||||
const contentHash =
|
||||
optimisations?.contentHash ?? hash(contentBytes);
|
||||
|
||||
const localMetadata = this.database.getDocument(relativePath);
|
||||
if (localMetadata) {
|
||||
this.logger.debug(
|
||||
`Document metadata already exists for ${relativePath}, it must have been downloaded from the server`
|
||||
);
|
||||
|
||||
if (localMetadata.hash === contentHash) {
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.NO_OP,
|
||||
relativePath,
|
||||
message: `File hash matches with last synced version, no need to sync`,
|
||||
type: SyncType.UPDATE
|
||||
});
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
const response = await this.syncService.create({
|
||||
relativePath,
|
||||
contentBytes,
|
||||
createdDate: updateTime
|
||||
});
|
||||
|
||||
const currentMetadata =
|
||||
this.database.getDocumentByIdentity(localMetadata);
|
||||
if (!currentMetadata) {
|
||||
throw new Error(
|
||||
`Document metadata for ${relativePath} not found after creation`
|
||||
);
|
||||
}
|
||||
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.SUCCESS,
|
||||
source: SyncSource.PUSH,
|
||||
relativePath,
|
||||
relativePath: currentMetadata[0],
|
||||
message: `Successfully uploaded locally created file`,
|
||||
type: SyncType.CREATE
|
||||
});
|
||||
|
||||
// The response can't have a different relative path than the one we sent
|
||||
// because the relative path is the key when finding existing documents
|
||||
// when a create request is sent.
|
||||
|
||||
if (response.type === "MergingUpdate") {
|
||||
const responseBytes = deserialize(response.contentBase64);
|
||||
contentHash = hash(responseBytes);
|
||||
|
||||
await this.operations.write(
|
||||
relativePath,
|
||||
contentBytes,
|
||||
responseBytes
|
||||
);
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.SUCCESS,
|
||||
source: SyncSource.PULL,
|
||||
relativePath,
|
||||
message: `The file we created locally has already existed remotely, so we have merged them`,
|
||||
type: SyncType.UPDATE
|
||||
});
|
||||
}
|
||||
const newMetadata = {
|
||||
documentId: response.documentId,
|
||||
parentVersionId: response.vaultUpdateId,
|
||||
hash: contentHash,
|
||||
isDeleted: false
|
||||
};
|
||||
|
||||
await this.database.setDocument({
|
||||
documentId: response.documentId,
|
||||
relativePath: response.relativePath,
|
||||
parentVersionId: response.vaultUpdateId,
|
||||
hash: contentHash
|
||||
relativePath: currentMetadata[0],
|
||||
...newMetadata
|
||||
});
|
||||
|
||||
await this.tryIncrementVaultUpdateId(response.vaultUpdateId);
|
||||
|
||||
return newMetadata;
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
public async unrestrictedSyncLocallyDeletedFile(
|
||||
relativePath: RelativePath,
|
||||
metadata: Promise<DocumentMetadata | undefined> | undefined
|
||||
): Promise<void> {
|
||||
await this.executeSync(
|
||||
[relativePath],
|
||||
SyncType.DELETE,
|
||||
SyncSource.PUSH,
|
||||
async () => {
|
||||
const localMetadata =
|
||||
metadata !== undefined
|
||||
? await metadata
|
||||
: this.database.getResolvedDocument(relativePath);
|
||||
|
||||
if (!localMetadata || localMetadata.isDeleted) {
|
||||
this.logger.info(
|
||||
`Locally deleted file hasn't been uploaded yet, so there's no need to delete it on the remote server`
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
const response = await this.syncService.delete({
|
||||
documentId: localMetadata.documentId,
|
||||
relativePath,
|
||||
createdDate: new Date() // We got the event now, so it must have been deleted just now
|
||||
});
|
||||
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.SUCCESS,
|
||||
source: SyncSource.PUSH,
|
||||
relativePath,
|
||||
message: `Successfully deleted locally deleted file on the remote server`,
|
||||
type: SyncType.DELETE
|
||||
});
|
||||
|
||||
const currentMetadata = this.database.getDocumentByDocumentId(
|
||||
localMetadata.documentId
|
||||
);
|
||||
|
||||
if (!currentMetadata || currentMetadata[1].isDeleted) {
|
||||
this.logger.info(
|
||||
`No metadata found for deleted file, '${relativePath}' must have been deleted by another operation`
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
await this.operations.delete(currentMetadata[0]);
|
||||
|
||||
// We have to have a record of the delete in case there's an in-flight update for the same
|
||||
// document which finishes after the delete has succeeded and would introduce a phantom metadata record.
|
||||
await this.database.setDocument({
|
||||
relativePath: currentMetadata[0],
|
||||
documentId: response.documentId,
|
||||
parentVersionId: response.vaultUpdateId,
|
||||
hash: EMPTY_HASH,
|
||||
isDeleted: true
|
||||
});
|
||||
}
|
||||
);
|
||||
}
|
||||
|
|
@ -128,53 +170,34 @@ export class UnrestrictedSyncer {
|
|||
public async unrestrictedSyncLocallyUpdatedFile({
|
||||
oldPath,
|
||||
relativePath,
|
||||
metadata,
|
||||
updateTime,
|
||||
optimisations
|
||||
}: {
|
||||
oldPath?: RelativePath;
|
||||
relativePath: RelativePath;
|
||||
metadata: Promise<DocumentMetadata | undefined> | undefined;
|
||||
updateTime: Date;
|
||||
optimisations?: {
|
||||
contentBytes?: Uint8Array;
|
||||
contentHash?: string;
|
||||
};
|
||||
}): Promise<void> {
|
||||
await this.executeWhileHoldingFileLock(
|
||||
await this.executeSync(
|
||||
[oldPath, relativePath].filter((path) => path !== undefined),
|
||||
SyncType.UPDATE,
|
||||
SyncSource.PUSH,
|
||||
async () => {
|
||||
// Check the new path first in case the metadata has been already moved
|
||||
let localMetadata = this.database.getDocument(relativePath);
|
||||
let metadataPath = relativePath;
|
||||
const localMetadata =
|
||||
metadata !== undefined
|
||||
? await metadata
|
||||
: this.database.getResolvedDocument(relativePath);
|
||||
|
||||
if (localMetadata === undefined && oldPath !== undefined) {
|
||||
localMetadata = this.database.getDocument(oldPath);
|
||||
metadataPath = oldPath;
|
||||
}
|
||||
|
||||
if (!localMetadata) {
|
||||
if (!localMetadata || localMetadata.isDeleted) {
|
||||
// It's fine, a subsequent sync operation must have dealt with this
|
||||
return;
|
||||
}
|
||||
|
||||
if (
|
||||
(await this.operations.getFileSize(relativePath)) / // this can throw FileNotFoundError
|
||||
1024 /
|
||||
1024 >
|
||||
this.settings.getSettings().maxFileSizeMB
|
||||
) {
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.ERROR,
|
||||
relativePath,
|
||||
message: `File size exceeds the maximum file size limit of ${
|
||||
this.settings.getSettings().maxFileSizeMB
|
||||
}MB`,
|
||||
type: SyncType.CREATE
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const contentBytes =
|
||||
optimisations?.contentBytes ??
|
||||
(await this.operations.read(relativePath)); // this can throw FileNotFoundError
|
||||
|
|
@ -186,23 +209,48 @@ export class UnrestrictedSyncer {
|
|||
localMetadata.hash === contentHash &&
|
||||
oldPath === undefined
|
||||
) {
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.NO_OP,
|
||||
relativePath,
|
||||
message: `File hash matches with last synced version, no need to sync`,
|
||||
type: SyncType.UPDATE
|
||||
});
|
||||
this.logger.debug(
|
||||
`File hash of ${relativePath} matches with last synced version and the path hasn't changed; no need to sync`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Re-fetch based on the documentId instead of the relativePath because
|
||||
// the relativePath might have changed since this operation was scheduled
|
||||
let latestMetadata = this.database.getDocumentByDocumentId(
|
||||
localMetadata.documentId
|
||||
);
|
||||
if (!latestMetadata || latestMetadata[1].isDeleted) {
|
||||
// It's fine, a subsequent sync operation must have dealt with this
|
||||
return;
|
||||
}
|
||||
|
||||
const response = await this.syncService.put({
|
||||
documentId: localMetadata.documentId,
|
||||
parentVersionId: localMetadata.parentVersionId,
|
||||
relativePath,
|
||||
documentId: latestMetadata[1].documentId,
|
||||
parentVersionId: latestMetadata[1].parentVersionId,
|
||||
relativePath: latestMetadata[0],
|
||||
contentBytes,
|
||||
createdDate: updateTime
|
||||
});
|
||||
|
||||
latestMetadata = this.database.getDocumentByDocumentId(
|
||||
response.documentId
|
||||
);
|
||||
|
||||
if (!latestMetadata || latestMetadata[1].isDeleted) {
|
||||
// The document has been deleted since this operation was scheduled
|
||||
return;
|
||||
}
|
||||
|
||||
if (
|
||||
latestMetadata[1].parentVersionId >= response.vaultUpdateId
|
||||
) {
|
||||
this.logger.debug(
|
||||
`Document ${relativePath} is already more up to date than the fetched version`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.SUCCESS,
|
||||
source: SyncSource.PUSH,
|
||||
|
|
@ -212,11 +260,7 @@ export class UnrestrictedSyncer {
|
|||
});
|
||||
|
||||
if (response.isDeleted) {
|
||||
await this.operations.remove(oldPath ?? relativePath);
|
||||
await this.database.removeDocument(oldPath ?? relativePath);
|
||||
await this.tryIncrementVaultUpdateId(
|
||||
response.vaultUpdateId
|
||||
);
|
||||
await this.operations.delete(relativePath);
|
||||
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.SUCCESS,
|
||||
|
|
@ -227,110 +271,69 @@ export class UnrestrictedSyncer {
|
|||
type: SyncType.DELETE
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (
|
||||
response.relativePath != relativePath &&
|
||||
response.relativePath != oldPath
|
||||
) {
|
||||
await this.locks.waitForDocumentLock(response.relativePath);
|
||||
}
|
||||
|
||||
try {
|
||||
if (response.relativePath != relativePath) {
|
||||
// TODO: this can fail, that's bad
|
||||
await this.operations.move(
|
||||
// this can throw FileNotFoundError
|
||||
relativePath,
|
||||
response.relativePath,
|
||||
response.documentId
|
||||
);
|
||||
}
|
||||
|
||||
if (response.type === "MergingUpdate") {
|
||||
const responseBytes = deserialize(
|
||||
response.contentBase64
|
||||
);
|
||||
contentHash = hash(responseBytes);
|
||||
|
||||
await this.operations.write(
|
||||
response.relativePath,
|
||||
contentBytes,
|
||||
responseBytes
|
||||
);
|
||||
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.SUCCESS,
|
||||
source: SyncSource.PULL,
|
||||
relativePath,
|
||||
message: `The file we updated had been updated remotely, so we downloaded the merged version`,
|
||||
type: SyncType.UPDATE
|
||||
});
|
||||
}
|
||||
|
||||
if (metadataPath !== response.relativePath) {
|
||||
await this.database.updatePath(
|
||||
metadataPath,
|
||||
response.relativePath
|
||||
);
|
||||
}
|
||||
await this.database.setDocument({
|
||||
documentId: localMetadata.documentId,
|
||||
relativePath: response.relativePath,
|
||||
documentId: response.documentId,
|
||||
relativePath: latestMetadata[0],
|
||||
parentVersionId: response.vaultUpdateId,
|
||||
hash: contentHash
|
||||
hash: EMPTY_HASH,
|
||||
isDeleted: true
|
||||
});
|
||||
|
||||
await this.tryIncrementVaultUpdateId(
|
||||
response.vaultUpdateId
|
||||
);
|
||||
} finally {
|
||||
if (
|
||||
response.relativePath != relativePath &&
|
||||
response.relativePath != oldPath
|
||||
) {
|
||||
this.locks.unlockDocument(response.relativePath);
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
public async unrestrictedSyncLocallyDeletedFile(
|
||||
relativePath: RelativePath
|
||||
): Promise<void> {
|
||||
await this.executeWhileHoldingFileLock(
|
||||
[relativePath],
|
||||
SyncType.DELETE,
|
||||
SyncSource.PUSH,
|
||||
async () => {
|
||||
const localMetadata = this.database.getDocument(relativePath);
|
||||
if (!localMetadata) {
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.NO_OP,
|
||||
relativePath,
|
||||
message: `Locally deleted file hasn't been uploaded yet, so there's no need to delete it on the remote server`,
|
||||
type: SyncType.DELETE
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
await this.syncService.delete({
|
||||
documentId: localMetadata.documentId,
|
||||
relativePath,
|
||||
createdDate: new Date() // We got the event now, so it must have been deleted just now
|
||||
if (
|
||||
latestMetadata[1].parentVersionId >= response.vaultUpdateId
|
||||
) {
|
||||
this.logger.debug(
|
||||
`Document ${relativePath} is already more up to date than the fetched version`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if (response.relativePath != relativePath) {
|
||||
await this.operations.move(
|
||||
latestMetadata[0],
|
||||
response.relativePath,
|
||||
response.documentId
|
||||
); // this can throw FileNotFoundError
|
||||
}
|
||||
|
||||
if (response.type === "MergingUpdate") {
|
||||
const responseBytes = deserialize(response.contentBase64);
|
||||
contentHash = hash(responseBytes);
|
||||
|
||||
await this.operations.write(
|
||||
response.relativePath,
|
||||
contentBytes,
|
||||
responseBytes
|
||||
);
|
||||
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.SUCCESS,
|
||||
source: SyncSource.PULL,
|
||||
relativePath,
|
||||
message: `The file we updated had been updated remotely, so we downloaded the merged version`,
|
||||
type: SyncType.UPDATE
|
||||
});
|
||||
}
|
||||
|
||||
await this.database.setDocument({
|
||||
documentId: response.documentId,
|
||||
relativePath:
|
||||
response.relativePath != relativePath
|
||||
? response.relativePath
|
||||
: latestMetadata[0],
|
||||
parentVersionId: response.vaultUpdateId,
|
||||
hash: contentHash,
|
||||
isDeleted: response.isDeleted
|
||||
});
|
||||
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.SUCCESS,
|
||||
source: SyncSource.PUSH,
|
||||
relativePath,
|
||||
message: `Successfully deleted locally deleted file on the remote server`,
|
||||
type: SyncType.DELETE
|
||||
});
|
||||
|
||||
await this.database.removeDocument(relativePath);
|
||||
await this.tryIncrementVaultUpdateId(response.vaultUpdateId);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
|
@ -338,56 +341,68 @@ export class UnrestrictedSyncer {
|
|||
public async unrestrictedSyncRemotelyUpdatedFile(
|
||||
remoteVersion: components["schemas"]["DocumentVersionWithoutContent"]
|
||||
): Promise<void> {
|
||||
await this.executeWhileHoldingFileLock(
|
||||
await this.executeSync(
|
||||
[remoteVersion.relativePath],
|
||||
SyncType.UPDATE,
|
||||
SyncSource.PULL,
|
||||
async () => {
|
||||
let localMetadata = this.database.getDocumentByDocumentId(
|
||||
const content = (
|
||||
await this.syncService.get({
|
||||
documentId: remoteVersion.documentId
|
||||
})
|
||||
).contentBase64;
|
||||
const contentBytes = deserialize(content);
|
||||
const contentHash = hash(contentBytes);
|
||||
|
||||
const localMetadata = this.database.getDocumentByDocumentId(
|
||||
remoteVersion.documentId
|
||||
);
|
||||
|
||||
if (
|
||||
localMetadata &&
|
||||
localMetadata[0] !== remoteVersion.relativePath
|
||||
localMetadata?.[1].documentId ===
|
||||
remoteVersion.documentId &&
|
||||
localMetadata[1].parentVersionId >
|
||||
remoteVersion.vaultUpdateId
|
||||
) {
|
||||
await this.locks.waitForDocumentLock(localMetadata[0]);
|
||||
this.logger.info(
|
||||
`Document ${remoteVersion.relativePath} is already up to date`
|
||||
);
|
||||
return;
|
||||
}
|
||||
// Waiting for the new lock might take a while so we need to fetch the database
|
||||
// entry again in case it's changed.
|
||||
localMetadata = this.database.getDocumentByDocumentId(
|
||||
remoteVersion.documentId
|
||||
);
|
||||
|
||||
if (!localMetadata) {
|
||||
const localBytes = await this.operations.read(
|
||||
remoteVersion.relativePath
|
||||
); // this can throw FileNotFoundError
|
||||
const localHash = hash(localBytes);
|
||||
|
||||
if (localHash !== localMetadata?.[1].hash) {
|
||||
this.logger.info(
|
||||
`Document ${remoteVersion.relativePath} has pending local changes, so we shouldn't update it here`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!localMetadata || localMetadata[1].isDeleted) {
|
||||
if (remoteVersion.isDeleted) {
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.NO_OP,
|
||||
source: SyncSource.PULL,
|
||||
relativePath: remoteVersion.relativePath,
|
||||
message: `Remotely deleted file hasn't been synced yet, so there's no need to delete it locally`,
|
||||
type: SyncType.DELETE
|
||||
});
|
||||
this.logger.info(
|
||||
`Remotely deleted file hasn't been synced yet, so there's no need to delete it locally`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const content = (
|
||||
await this.syncService.get({
|
||||
documentId: remoteVersion.documentId
|
||||
})
|
||||
).contentBase64;
|
||||
const contentBytes = deserialize(content);
|
||||
|
||||
await this.operations.create(
|
||||
remoteVersion.relativePath,
|
||||
contentBytes
|
||||
contentBytes,
|
||||
remoteVersion.documentId
|
||||
);
|
||||
|
||||
await this.database.setDocument({
|
||||
documentId: remoteVersion.documentId,
|
||||
relativePath: remoteVersion.relativePath,
|
||||
parentVersionId: remoteVersion.vaultUpdateId,
|
||||
hash: hash(contentBytes)
|
||||
hash: hash(contentBytes),
|
||||
isDeleted: remoteVersion.isDeleted
|
||||
});
|
||||
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.SUCCESS,
|
||||
source: SyncSource.PULL,
|
||||
|
|
@ -399,7 +414,6 @@ export class UnrestrictedSyncer {
|
|||
}
|
||||
|
||||
const [relativePath, metadata] = localMetadata;
|
||||
|
||||
if (remoteVersion.vaultUpdateId <= metadata.parentVersionId) {
|
||||
this.logger.debug(
|
||||
`Document ${relativePath} is already up to date`
|
||||
|
|
@ -407,89 +421,70 @@ export class UnrestrictedSyncer {
|
|||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
if (remoteVersion.isDeleted) {
|
||||
await this.operations.remove(relativePath);
|
||||
await this.database.removeDocument(relativePath);
|
||||
if (remoteVersion.isDeleted) {
|
||||
await this.operations.delete(relativePath);
|
||||
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.SUCCESS,
|
||||
source: SyncSource.PULL,
|
||||
relativePath: remoteVersion.relativePath,
|
||||
message: `Successfully deleted remotely deleted file locally`,
|
||||
type: SyncType.DELETE
|
||||
});
|
||||
} else {
|
||||
// TODO: this can fail, that's bad
|
||||
const currentContent =
|
||||
await this.operations.read(relativePath); // this can throw FileNotFoundError
|
||||
const currentHash = hash(currentContent);
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.SUCCESS,
|
||||
source: SyncSource.PULL,
|
||||
relativePath: remoteVersion.relativePath,
|
||||
message: `Successfully deleted remotely deleted file locally`,
|
||||
type: SyncType.DELETE
|
||||
});
|
||||
|
||||
if (currentHash !== metadata.hash) {
|
||||
this.logger.info(
|
||||
`Document ${relativePath} has been updated both remotely and locally, letting the local file update event handle it`
|
||||
);
|
||||
return;
|
||||
}
|
||||
await this.database.setDocument({
|
||||
documentId: remoteVersion.documentId,
|
||||
relativePath: relativePath,
|
||||
parentVersionId: remoteVersion.vaultUpdateId,
|
||||
hash: EMPTY_HASH,
|
||||
isDeleted: true
|
||||
});
|
||||
|
||||
const content = (
|
||||
await this.syncService.get({
|
||||
documentId: remoteVersion.documentId
|
||||
})
|
||||
).contentBase64;
|
||||
const contentBytes = deserialize(content);
|
||||
const contentHash = hash(contentBytes);
|
||||
|
||||
if (relativePath !== remoteVersion.relativePath) {
|
||||
// TODO: this can fail, that's bad
|
||||
await this.operations.move(
|
||||
// this can throw FileNotFoundError
|
||||
relativePath,
|
||||
remoteVersion.relativePath,
|
||||
remoteVersion.documentId
|
||||
);
|
||||
|
||||
await this.database.updatePath(
|
||||
relativePath,
|
||||
remoteVersion.relativePath
|
||||
);
|
||||
}
|
||||
|
||||
await this.operations.write(
|
||||
remoteVersion.relativePath,
|
||||
currentContent,
|
||||
contentBytes
|
||||
);
|
||||
await this.database.setDocument({
|
||||
documentId: remoteVersion.documentId,
|
||||
relativePath: remoteVersion.relativePath,
|
||||
parentVersionId: remoteVersion.vaultUpdateId,
|
||||
hash: contentHash
|
||||
});
|
||||
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.SUCCESS,
|
||||
source: SyncSource.PULL,
|
||||
relativePath: remoteVersion.relativePath,
|
||||
message: `Successfully updated remotely updated file locally`,
|
||||
type: SyncType.UPDATE
|
||||
});
|
||||
}
|
||||
} finally {
|
||||
if (relativePath !== remoteVersion.relativePath) {
|
||||
this.locks.unlockDocument(relativePath);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (relativePath !== remoteVersion.relativePath) {
|
||||
// TODO: this can fail, that's bad
|
||||
await this.operations.move(
|
||||
// this can throw FileNotFoundError
|
||||
relativePath,
|
||||
remoteVersion.relativePath,
|
||||
remoteVersion.documentId
|
||||
);
|
||||
}
|
||||
|
||||
// todo: why
|
||||
await this.operations.create(
|
||||
remoteVersion.relativePath,
|
||||
contentBytes,
|
||||
remoteVersion.documentId
|
||||
);
|
||||
|
||||
await this.database.setDocument({
|
||||
documentId: remoteVersion.documentId,
|
||||
relativePath: remoteVersion.relativePath,
|
||||
parentVersionId: remoteVersion.vaultUpdateId,
|
||||
hash: contentHash,
|
||||
isDeleted: remoteVersion.isDeleted
|
||||
});
|
||||
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.SUCCESS,
|
||||
source: SyncSource.PULL,
|
||||
relativePath: remoteVersion.relativePath,
|
||||
message: `Successfully updated remotely updated file locally`,
|
||||
type: SyncType.UPDATE
|
||||
});
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
public async executeWhileHoldingFileLock(
|
||||
public async executeSync<T>(
|
||||
lockedPaths: RelativePath[],
|
||||
syncType: SyncType,
|
||||
syncSource: SyncSource,
|
||||
fn: () => Promise<void>
|
||||
): Promise<void> {
|
||||
fn: () => Promise<T>
|
||||
): Promise<T | undefined> {
|
||||
const relativePath = lockedPaths[lockedPaths.length - 1];
|
||||
|
||||
if (!this.settings.getSettings().isSyncEnabled) {
|
||||
|
|
@ -498,31 +493,47 @@ export class UnrestrictedSyncer {
|
|||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!this.operations.isFileEligibleForSync(relativePath)) {
|
||||
this.logger.info(
|
||||
`File ${relativePath} is not eligible for syncing`
|
||||
);
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.ERROR,
|
||||
relativePath,
|
||||
message: `File ${relativePath} is not eligible for syncing`,
|
||||
type: syncType
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.debug(
|
||||
`Syncing ${relativePath} (${syncSource} - ${syncType})`
|
||||
);
|
||||
|
||||
await Promise.all(
|
||||
lockedPaths.map(this.locks.waitForDocumentLock.bind(this.locks))
|
||||
);
|
||||
try {
|
||||
await fn();
|
||||
if (
|
||||
(await this.operations.exists(relativePath)) &&
|
||||
(await this.operations.getFileSize(relativePath)) / // this can throw FileNotFoundError
|
||||
1024 /
|
||||
1024 >
|
||||
this.settings.getSettings().maxFileSizeMB
|
||||
) {
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.ERROR,
|
||||
relativePath,
|
||||
message: `File size exceeds the maximum file size limit of ${
|
||||
this.settings.getSettings().maxFileSizeMB
|
||||
}MB`,
|
||||
type: syncType
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
return await fn();
|
||||
} catch (e) {
|
||||
if (e instanceof FileNotFoundError) {
|
||||
// A subsequent sync operation must have been creating to deal with this
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.NO_OP,
|
||||
relativePath,
|
||||
message: `Skip ${syncSource.toLocaleLowerCase()} file because it no longer exists when trying to ${syncType.toLocaleLowerCase()} it`,
|
||||
type: syncType,
|
||||
source: syncSource
|
||||
});
|
||||
this.logger.info(
|
||||
`Skip ${syncSource.toLocaleLowerCase()} file because it no longer exists when trying to ${syncType.toLocaleLowerCase()} it`
|
||||
);
|
||||
} else {
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.ERROR,
|
||||
|
|
@ -533,8 +544,6 @@ export class UnrestrictedSyncer {
|
|||
});
|
||||
throw e;
|
||||
}
|
||||
} finally {
|
||||
lockedPaths.forEach(this.locks.unlockDocument.bind(this.locks));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -46,27 +46,31 @@ export class MockAgent extends MockClient {
|
|||
? "(online) "
|
||||
: "(offline)";
|
||||
const formatted = `[${this.name} ${state}] ${logLine.timestamp.toISOString()} ${logLine.level} ${logLine.message}`;
|
||||
|
||||
// HACK: we have to ensure the file has been synced if we want to change it offline without data loss
|
||||
const historyEntry = /.*History entry: (.*.md).*/.exec(
|
||||
logLine.message
|
||||
);
|
||||
|
||||
if (historyEntry) {
|
||||
this.doNotTouchWhileOffline =
|
||||
this.doNotTouchWhileOffline.filter(
|
||||
(file) => file !== historyEntry[1]
|
||||
);
|
||||
}
|
||||
switch (logLine.level) {
|
||||
case LogLevel.ERROR:
|
||||
console.error(formatted);
|
||||
|
||||
// Let's not ignore errors
|
||||
process.exit(1);
|
||||
// eslint-disable-next-line @typescript-eslint/no-floating-promises
|
||||
sleep(1000).then(() => process.exit(1));
|
||||
|
||||
break;
|
||||
case LogLevel.WARNING:
|
||||
console.warn(formatted);
|
||||
break;
|
||||
case LogLevel.INFO:
|
||||
// HACK: we have to ensure the file has been synced if we want to change it offline without data loss
|
||||
const result = /.*History entry: (.*.md).*/.exec(
|
||||
logLine.message
|
||||
);
|
||||
if (result) {
|
||||
this.doNotTouchWhileOffline =
|
||||
this.doNotTouchWhileOffline.filter(
|
||||
(file) => file !== result[1]
|
||||
);
|
||||
}
|
||||
|
||||
console.info(formatted);
|
||||
break;
|
||||
case LogLevel.DEBUG:
|
||||
|
|
@ -79,16 +83,17 @@ export class MockAgent extends MockClient {
|
|||
}
|
||||
|
||||
public async act(): Promise<void> {
|
||||
this.assertAllContentIsPresentOnce();
|
||||
|
||||
const options: (() => Promise<unknown>)[] = [
|
||||
this.createFileAction.bind(this),
|
||||
this.changeFetchChangesUpdateIntervalMsAction.bind(this)
|
||||
];
|
||||
|
||||
if (
|
||||
this.client.settings.getSettings().isSyncEnabled &&
|
||||
this.doNotTouchWhileOffline.length === 0
|
||||
) {
|
||||
options.push(this.disableSyncAction.bind(this));
|
||||
if (this.client.settings.getSettings().isSyncEnabled) {
|
||||
if (this.doNotTouchWhileOffline.length === 0) {
|
||||
options.push(this.disableSyncAction.bind(this));
|
||||
}
|
||||
} else {
|
||||
options.push(this.enableSyncAction.bind(this));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -38,6 +38,8 @@ async function runTest({
|
|||
)
|
||||
);
|
||||
}
|
||||
// for debugging
|
||||
(globalThis as any).clients = clients;
|
||||
|
||||
try {
|
||||
await Promise.all(clients.map(async (client) => client.init()));
|
||||
|
|
@ -78,34 +80,32 @@ async function runTest({
|
|||
console.info(`Content check for ${client.name} passed`);
|
||||
});
|
||||
|
||||
console.info(`Test passed with ${settings}`);
|
||||
console.info(`Test passed ${settings}`);
|
||||
} catch (err) {
|
||||
console.error(`Test failed with ${settings}`);
|
||||
console.error(`Test failed ${settings}`);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
async function runTests(): Promise<void> {
|
||||
const agentCounts = [2, 10];
|
||||
const jitterScaleInSeconds = [0.5, 3, 0];
|
||||
const jitterScaleInSeconds = [0, 0.5, 3];
|
||||
const concurrencies = [1, 16];
|
||||
const iterations = [50, 300];
|
||||
const doDeletes = [false, true];
|
||||
const doDeletes = [false];
|
||||
|
||||
for (const agentCount of agentCounts) {
|
||||
for (const concurrency of concurrencies) {
|
||||
for (const jitter of jitterScaleInSeconds) {
|
||||
for (const iteration of iterations) {
|
||||
for (const deleteFiles of doDeletes) {
|
||||
while (true) {
|
||||
await runTest({
|
||||
agentCount,
|
||||
concurrency,
|
||||
iterations: iteration,
|
||||
doDeletes: deleteFiles,
|
||||
jitterScaleInSeconds: jitter
|
||||
});
|
||||
}
|
||||
await runTest({
|
||||
agentCount,
|
||||
concurrency,
|
||||
iterations: iteration,
|
||||
doDeletes: deleteFiles,
|
||||
jitterScaleInSeconds: jitter
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -113,11 +113,24 @@ async function runTests(): Promise<void> {
|
|||
}
|
||||
}
|
||||
|
||||
process.on("uncaughtException", async (error) => {
|
||||
console.error("Uncaught Exception:", error);
|
||||
await sleep(1000);
|
||||
process.exit(1);
|
||||
});
|
||||
|
||||
process.on("unhandledRejection", async (reason, promise) => {
|
||||
console.error("Unhandled Rejection:", reason);
|
||||
await sleep(1000);
|
||||
process.exit(1);
|
||||
});
|
||||
|
||||
runTests()
|
||||
.then(() => {
|
||||
process.exit(0);
|
||||
})
|
||||
.catch((err: unknown) => {
|
||||
.catch(async (err: unknown) => {
|
||||
console.error(err);
|
||||
await sleep(1000);
|
||||
process.exit(1);
|
||||
});
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue