This commit is contained in:
Andras Schmelczer 2025-03-15 17:15:44 +00:00
parent 2987afb20a
commit d5112a7d0f
No known key found for this signature in database
GPG key ID: FC8F2C3D3D1A718C
7 changed files with 147 additions and 272 deletions

View file

@ -17,10 +17,6 @@
- Install [`rustup`](https://rustup.rs): `curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh`
- `sudo apt install llvm -y`
- `rustup self update`
- `rustup update`
- `rustup install nightly`
- `rustup default nightly`
- `rustup component add llvm-tools-preview`
- `cargo install cargo-generate cargo-fuzz cargo-insta rustfilt cargo-binutils`
- Install [`wasm-pack`](https://rustwasm.github.io/wasm-pack/installer): `curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh`

View file

@ -2,10 +2,7 @@ import type { Logger } from "../tracing/logger";
import type { FileSystemOperations } from "./filesystem-operations";
import type { Database, RelativePath } from "../persistence/database";
import { isBinary, isFileTypeMergable, mergeText } from "sync_lib";
import {
FileNotFoundError,
SafeFileSystemOperations
} from "./safe-filesystem-operations";
import { SafeFileSystemOperations } from "./safe-filesystem-operations";
export class FileOperations {
private static readonly PARENTHESES_REGEX = / \((\d+)\)$/;
@ -67,7 +64,7 @@ export class FileOperations {
`Didn't expect ${path} to exist, deconflicting by moving it to '${deconflictedPath}'`
);
// this.database.move(path, deconflictedPath);
this.database.move(path, deconflictedPath);
await this.fs.rename(path, deconflictedPath);
} else {
await this.createParentDirectories(path);
@ -142,9 +139,9 @@ export class FileOperations {
if (oldPath === newPath) {
return;
}
await this.ensureClearPath(newPath);
this.database.move(oldPath, newPath);
await this.fs.rename(oldPath, newPath);
}

View file

@ -22,7 +22,6 @@ export interface StoredDatabase {
}
export interface DocumentRecord {
identity: symbol;
relativePath: RelativePath;
documentId: DocumentId;
metadata: DocumentMetadata | undefined;
@ -47,7 +46,6 @@ export class Database {
({ relativePath, documentId, ...metadata }) => ({
relativePath,
documentId,
identity: Symbol(),
metadata,
isDeleted: false,
updates: [],
@ -118,85 +116,33 @@ export class Database {
public setDocument(
{
documentId,
relativePath,
parentVersionId,
hash
}: {
documentId: DocumentId;
relativePath: RelativePath;
parentVersionId: VaultUpdateId;
hash: string;
},
identity?: symbol
toUpdate: DocumentRecord
): void {
if (identity !== undefined) {
const entry = this.getDocumentByIdentity(identity);
this.documents = this.documents.filter(
(doc) => doc.identity !== entry.identity
);
if (entry.relativePath !== relativePath) {
throw new Error(
"Document identity does not match the relative path"
);
}
this.documents.push({
...entry,
relativePath,
documentId,
metadata: {
parentVersionId,
hash
}
});
this.save();
return;
if (!this.documents.includes(toUpdate)) {
throw new Error("Document not found in database");
}
// We find a match based on relative path and we find one with a different document id
// meaning that two documents occupy the same path in terms of in-flight requests so we
// need to create a new parallel version.
const entry = this.getLatestDocumentByRelativePath(relativePath);
if (entry && entry.documentId !== documentId) {
this.documents.push({
// `entry` might be undefined if the document is new
identity: Symbol(),
relativePath,
documentId,
metadata: {
parentVersionId,
hash
},
isDeleted: false,
updates: [],
parallelVersion: entry.parallelVersion + 1
});
this.save();
return;
}
this.documents.push({
identity: Symbol(),
relativePath,
documentId,
metadata: {
parentVersionId,
hash
},
isDeleted: false,
updates: [],
parallelVersion: 0
});
toUpdate.metadata = { parentVersionId, hash };
this.save();
return;
}
public removeDocumentPromise(promise: Promise<void>): void {
const entry = this.getDocumentByUpdatePromise(promise);
const entry = this.documents.find(({ updates }) =>
updates.includes(promise)
);
if (entry === undefined) {
throw new Error("Document not found by update promise");
}
entry.updates = entry.updates.filter((update) => update !== promise);
// No need to save as Promises don't get serialized
}
@ -214,7 +160,7 @@ export class Database {
public async getResolvedDocumentByRelativePath(
relativePath: RelativePath,
promise: Promise<void>
): Promise<void> {
): Promise<DocumentRecord> {
const entry = this.getLatestDocumentByRelativePath(relativePath);
if (entry === undefined) {
@ -230,20 +176,21 @@ export class Database {
const currentPromises = entry.updates;
entry.updates = [...currentPromises, promise];
await Promise.all(currentPromises);
return entry;
}
public getNewResolvedDocumentByRelativePath(
public createNewPendingDocument(
documentId: DocumentId,
relativePath: RelativePath,
promise: Promise<void>
): void {
): DocumentRecord {
const previousEntry =
this.getLatestDocumentByRelativePath(relativePath);
const entry = {
relativePath,
documentId,
identity: Symbol(),
metadata: undefined,
isDeleted: false,
updates: [promise],
@ -255,18 +202,8 @@ export class Database {
this.documents.push(entry);
this.save();
}
public getDocumentByUpdatePromise(promise: Promise<void>): DocumentRecord {
const result = this.documents.find(({ updates }) =>
updates.includes(promise)
);
if (result === undefined) {
throw new Error("Document not found by update promise");
}
return result;
return entry;
}
public getDocumentByDocumentId(
@ -275,16 +212,6 @@ export class Database {
return this.documents.find(({ documentId }) => documentId === find);
}
public getDocumentByIdentity(find: symbol): DocumentRecord {
const result = this.documents.find(({ identity }) => identity === find);
if (result === undefined) {
throw new Error("Document not found by identity symbol");
}
return result;
}
public move(
oldRelativePath: RelativePath,
newRelativePath: RelativePath
@ -296,10 +223,6 @@ export class Database {
return;
}
this.documents = this.documents.filter(
({ identity }) => identity !== oldDocument.identity
);
const newDocument =
this.getLatestDocumentByRelativePath(newRelativePath);
if (newDocument !== undefined && !newDocument.isDeleted) {
@ -308,17 +231,12 @@ export class Database {
);
}
// It's either an invalid state of newDocument is pending deletion and we have
// to wait for it to complete.
this.documents.push({
...oldDocument,
relativePath: newRelativePath,
// We're in a strange state where the target of the move has just got deleted,
// however, its metadata might already have a bunch of updates queued up for
// the document at the new location. We need to keep these updates.
parallelVersion:
newDocument !== undefined ? newDocument.parallelVersion + 1 : 0
});
oldDocument.relativePath = newRelativePath;
// We're in a strange state where the target of the move has just got deleted,
// however, its metadata might already have a bunch of updates queued up for
// the document at the new location. We need to keep these updates.
oldDocument.parallelVersion =
newDocument !== undefined ? newDocument.parallelVersion + 1 : 0;
this.save();
}

View file

@ -3,8 +3,8 @@ import type { SyncService } from "../services/sync-service";
import type { Logger } from "../tracing/logger";
import type { SyncHistory } from "../tracing/sync-history";
import PQueue from "p-queue";
import { v4 as uuidv4 } from "uuid";
import { hash } from "../utils/hash";
import { v4 as uuidv4 } from "uuid";
import type { components } from "../services/types";
import type { Settings } from "../persistence/settings";
import type { FileOperations } from "../file-operations/file-operations";
@ -98,20 +98,16 @@ export class Syncer {
}
const [promise, resolve, reject] = createPromise();
const proposedDocumentId = uuidv4();
this.database.getNewResolvedDocumentByRelativePath(
proposedDocumentId,
const document = this.database.createNewPendingDocument(
uuidv4(),
relativePath,
promise
);
try {
await this.syncQueue.add(async () =>
this.internalSyncer.unrestrictedSyncLocallyCreatedFile(
proposedDocumentId,
() => this.database.getDocumentByUpdatePromise(promise)
)
this.internalSyncer.unrestrictedSyncLocallyCreatedFile(document)
);
resolve();
@ -131,16 +127,14 @@ export class Syncer {
const [promise, resolve, reject] = createPromise();
await this.database.getResolvedDocumentByRelativePath(
const document = await this.database.getResolvedDocumentByRelativePath(
relativePath,
promise
);
try {
await this.syncQueue.add(async () =>
this.internalSyncer.unrestrictedSyncLocallyDeletedFile(() =>
this.database.getDocumentByUpdatePromise(promise)
)
this.internalSyncer.unrestrictedSyncLocallyDeletedFile(document)
);
resolve();
@ -158,17 +152,13 @@ export class Syncer {
oldPath?: RelativePath;
relativePath: RelativePath;
}): Promise<void> {
if (oldPath !== undefined) {
if (
this.database.getLatestDocumentByRelativePath(oldPath)
?.isDeleted === true
) {
this.logger.debug(
`Document ${oldPath} has been deleted locally, skipping`
);
return;
}
if (
oldPath !== undefined &&
(this.database.getLatestDocumentByRelativePath(relativePath) ===
undefined ||
this.database.getLatestDocumentByRelativePath(relativePath)
?.isDeleted === true)
) {
if (oldPath === relativePath) {
throw new Error(
`Old path and new path are the same: ${oldPath}`
@ -178,10 +168,17 @@ export class Syncer {
this.database.move(oldPath, relativePath);
}
if (
this.database.getLatestDocumentByRelativePath(relativePath)
?.isDeleted === true
) {
let document =
this.database.getLatestDocumentByRelativePath(relativePath);
if (document === undefined) {
this.logger.debug(
`Cannot find document ${relativePath} in the database, skipping`
);
return;
}
if (document.isDeleted) {
this.logger.debug(
`Document ${relativePath} has been deleted locally, skipping`
);
@ -190,7 +187,7 @@ export class Syncer {
const [promise, resolve, reject] = createPromise();
await this.database.getResolvedDocumentByRelativePath(
document = await this.database.getResolvedDocumentByRelativePath(
relativePath,
promise
);
@ -199,8 +196,7 @@ export class Syncer {
await this.syncQueue.add(async () =>
this.internalSyncer.unrestrictedSyncLocallyUpdatedFile({
oldPath,
getLatestDocument: () =>
this.database.getDocumentByUpdatePromise(promise)
document
})
);
@ -299,7 +295,7 @@ export class Syncer {
private async syncRemotelyUpdatedFile(
remoteVersion: components["schemas"]["DocumentVersionWithoutContent"]
): Promise<void> {
const document = this.database.getDocumentByDocumentId(
let document = this.database.getDocumentByDocumentId(
remoteVersion.documentId
);
@ -308,15 +304,11 @@ export class Syncer {
if (document === undefined) {
await this.syncQueue.add(async () =>
this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile(
remoteVersion,
() =>
this.database.getDocumentByDocumentId(
remoteVersion.documentId
)
remoteVersion
)
);
} else {
await this.database.getResolvedDocumentByRelativePath(
document = await this.database.getResolvedDocumentByRelativePath(
document.relativePath,
promise
);
@ -325,7 +317,7 @@ export class Syncer {
await this.syncQueue.add(async () =>
this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile(
remoteVersion,
() => this.database.getDocumentByUpdatePromise(promise)
document
)
);

View file

@ -1,6 +1,5 @@
import type {
Database,
DocumentId,
DocumentRecord,
RelativePath
} from "../persistence/database";
@ -33,31 +32,24 @@ export class UnrestrictedSyncer {
}
public async unrestrictedSyncLocallyCreatedFile(
proposedDocumentId: DocumentId,
getLatestDocument: () => DocumentRecord
document: DocumentRecord
): Promise<void> {
let document = getLatestDocument();
return this.executeSync(
[document.relativePath],
document.relativePath,
SyncType.CREATE,
SyncSource.PUSH,
async () => {
document = getLatestDocument();
const contentBytes = await this.operations.read(
document.relativePath
); // this can throw FileNotFoundError
const contentHash = hash(contentBytes);
const response = await this.syncService.create({
documentId: proposedDocumentId,
documentId: document.documentId,
relativePath: document.relativePath,
contentBytes
});
document = getLatestDocument();
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
source: SyncSource.PUSH,
@ -68,12 +60,10 @@ export class UnrestrictedSyncer {
this.database.setDocument(
{
relativePath: document.relativePath,
documentId: response.documentId,
parentVersionId: response.vaultUpdateId,
hash: contentHash
},
document.identity
document
);
this.tryIncrementVaultUpdateId(response.vaultUpdateId);
@ -82,16 +72,13 @@ export class UnrestrictedSyncer {
}
public async unrestrictedSyncLocallyDeletedFile(
getLatestDocument: () => DocumentRecord
document: DocumentRecord
): Promise<void> {
let document = getLatestDocument();
await this.executeSync(
[document.relativePath],
document.relativePath,
SyncType.DELETE,
SyncSource.PUSH,
async () => {
document = getLatestDocument();
const response = await this.syncService.delete({
documentId: document.documentId,
relativePath: document.relativePath
@ -105,16 +92,12 @@ export class UnrestrictedSyncer {
type: SyncType.DELETE
});
document = getLatestDocument();
this.database.setDocument(
{
relativePath: document.relativePath,
documentId: response.documentId,
parentVersionId: response.vaultUpdateId,
hash: EMPTY_HASH
},
document.identity
document
);
}
);
@ -122,21 +105,16 @@ export class UnrestrictedSyncer {
public async unrestrictedSyncLocallyUpdatedFile({
oldPath,
getLatestDocument
document
}: {
oldPath?: RelativePath;
getLatestDocument: () => DocumentRecord;
document: DocumentRecord;
}): Promise<void> {
let document = getLatestDocument();
await this.executeSync(
[oldPath, document.relativePath].filter(
(path) => path !== undefined
),
document.relativePath,
SyncType.UPDATE,
SyncSource.PUSH,
async () => {
document = getLatestDocument();
const originalRelativePath = document.relativePath;
if (document.metadata === undefined || document.isDeleted) {
@ -168,8 +146,8 @@ export class UnrestrictedSyncer {
contentBytes
});
document = getLatestDocument();
// `document` is mutable and reflects the latest state in the local database
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (document.isDeleted) {
this.logger.info(
`Document ${document.relativePath} has been deleted before we could finish updating it`
@ -177,7 +155,8 @@ export class UnrestrictedSyncer {
return;
}
if (!document.metadata) {
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (document.metadata === undefined) {
throw new Error(
`Document ${document.relativePath} no longer has metadata after updating it`
);
@ -213,12 +192,10 @@ export class UnrestrictedSyncer {
this.database.delete(document.relativePath);
this.database.setDocument(
{
documentId: response.documentId,
relativePath: document.relativePath,
parentVersionId: response.vaultUpdateId,
hash: EMPTY_HASH
},
document.identity
document
);
await this.operations.delete(document.relativePath);
@ -231,11 +208,6 @@ export class UnrestrictedSyncer {
let actualPath = document.relativePath;
if (response.relativePath != originalRelativePath) {
// this.database.getNewResolvedDocumentByRelativePath(
// response.relativePath,
// promise
// );
actualPath = response.relativePath;
await this.operations.move(
document.relativePath,
@ -243,6 +215,14 @@ export class UnrestrictedSyncer {
); // this can throw FileNotFoundError
}
this.database.setDocument(
{
parentVersionId: response.vaultUpdateId,
hash: contentHash
},
document
);
if (response.type === "MergingUpdate") {
const responseBytes = deserialize(response.contentBase64);
contentHash = hash(responseBytes);
@ -262,16 +242,6 @@ export class UnrestrictedSyncer {
});
}
this.database.setDocument(
{
documentId: response.documentId,
relativePath: actualPath,
parentVersionId: response.vaultUpdateId,
hash: contentHash
},
document.identity
);
this.tryIncrementVaultUpdateId(response.vaultUpdateId);
}
);
@ -279,20 +249,18 @@ export class UnrestrictedSyncer {
public async unrestrictedSyncRemotelyUpdatedFile(
remoteVersion: components["schemas"]["DocumentVersionWithoutContent"],
getLatestDocument: () => DocumentRecord | undefined
document?: DocumentRecord
): Promise<void> {
await this.executeSync(
[remoteVersion.relativePath],
remoteVersion.relativePath,
SyncType.UPDATE,
SyncSource.PULL,
async () => {
let localMetadata = getLatestDocument();
if (localMetadata?.metadata !== undefined) {
if (document?.metadata !== undefined) {
// If the file exists locally, let's pretend the user has updated it
// and deal with remote update/deletion within `unrestrictedSyncLocallyUpdatedFile`
if (
localMetadata.metadata.parentVersionId >=
document.metadata.parentVersionId >=
remoteVersion.vaultUpdateId
) {
this.logger.debug(
@ -302,11 +270,7 @@ export class UnrestrictedSyncer {
}
return this.unrestrictedSyncLocallyUpdatedFile({
getLatestDocument: () =>
this.database.getDocumentByIdentity(
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
localMetadata!.identity
)
document
});
} else if (remoteVersion.isDeleted) {
// Either the doc hasn't made it to us before and therefore we don't need to delete it,
@ -323,9 +287,11 @@ export class UnrestrictedSyncer {
})
).contentBase64;
localMetadata = getLatestDocument();
document = this.database.getDocumentByDocumentId(
remoteVersion.documentId
);
if (localMetadata?.isDeleted === true) {
if (document?.isDeleted === true) {
this.logger.info(
`Document ${remoteVersion.relativePath} has been deleted locally before we could finish updating it`
);
@ -333,7 +299,7 @@ export class UnrestrictedSyncer {
}
if (
(localMetadata?.metadata?.parentVersionId ?? -1) >=
(document?.metadata?.parentVersionId ?? -1) >=
remoteVersion.vaultUpdateId
) {
this.logger.debug(
@ -344,16 +310,21 @@ export class UnrestrictedSyncer {
const contentBytes = deserialize(content);
const [promise, resolve] = createPromise();
await this.operations.ensureClearPath(
remoteVersion.relativePath
);
this.database.getNewResolvedDocumentByRelativePath(
remoteVersion.documentId,
remoteVersion.relativePath,
promise
const [promise, resolve] = createPromise();
this.database.setDocument(
{
parentVersionId: remoteVersion.vaultUpdateId,
hash: hash(contentBytes)
},
this.database.createNewPendingDocument(
remoteVersion.documentId,
remoteVersion.relativePath,
promise
)
);
await this.operations.create(
@ -361,17 +332,6 @@ export class UnrestrictedSyncer {
contentBytes
);
const document =
this.database.getDocumentByUpdatePromise(promise);
this.database.setDocument(
{
documentId: remoteVersion.documentId,
relativePath: remoteVersion.relativePath,
parentVersionId: remoteVersion.vaultUpdateId,
hash: hash(contentBytes)
},
document.identity
);
resolve();
this.database.removeDocumentPromise(promise);
@ -387,13 +347,11 @@ export class UnrestrictedSyncer {
}
public async executeSync<T>(
paths: RelativePath[],
relativePath: RelativePath,
syncType: SyncType,
syncSource: SyncSource,
fn: () => Promise<T>
): Promise<T | undefined> {
const relativePath = paths[paths.length - 1];
if (!this.operations.isFileEligibleForSync(relativePath)) {
this.history.addHistoryEntry({
status: SyncStatus.ERROR,

View file

@ -63,7 +63,11 @@ export class MockClient implements FileSystemOperations {
`Creating file ${path} with content ${new TextDecoder().decode(newContent)}`
);
this.localFiles.set(path, newContent);
void this.client.syncer.syncLocallyCreatedFile(path);
// we aren't the best client and it takes some time to notice changes
setImmediate(() => {
void this.client.syncer.syncLocallyCreatedFile(path);
});
}
public async createDirectory(_path: RelativePath): Promise<void> {
@ -101,8 +105,11 @@ export class MockClient implements FileSystemOperations {
`Updated file ${path} with:\n current content: ${currentContent}\n new content: ${newContent}`
);
void this.client.syncer.syncLocallyUpdatedFile({
relativePath: path
// we aren't the best client and it takes some time to notice changes
setImmediate(() => {
void this.client.syncer.syncLocallyUpdatedFile({
relativePath: path
});
});
return newContent;
@ -116,13 +123,16 @@ export class MockClient implements FileSystemOperations {
`Updated file ${path} with:\n new content: ${new TextDecoder().decode(content)}`
);
if (hasExisted) {
void this.client.syncer.syncLocallyUpdatedFile({
relativePath: path
});
} else {
void this.client.syncer.syncLocallyCreatedFile(path);
}
// we aren't the best client and it takes some time to notice changes
setImmediate(() => {
if (hasExisted) {
void this.client.syncer.syncLocallyUpdatedFile({
relativePath: path
});
} else {
void this.client.syncer.syncLocallyCreatedFile(path);
}
});
}
public async delete(path: RelativePath): Promise<void> {
@ -130,7 +140,10 @@ export class MockClient implements FileSystemOperations {
`Deleting file: ${path} with:\n content ${new TextDecoder().decode(this.localFiles.get(path))}`
);
this.localFiles.delete(path);
void this.client.syncer.syncLocallyDeletedFile(path);
// we aren't the best client and it takes some time to notice changes
setImmediate(() => {
void this.client.syncer.syncLocallyDeletedFile(path);
});
}
public async rename(
@ -150,9 +163,12 @@ export class MockClient implements FileSystemOperations {
`Renamed file: ${oldPath} -> ${newPath} with:\n content ${new TextDecoder().decode(file)}`
);
void this.client.syncer.syncLocallyUpdatedFile({
oldPath,
relativePath: newPath
// we aren't the best client and it takes some time to notice changes
setImmediate(() => {
void this.client.syncer.syncLocallyUpdatedFile({
oldPath,
relativePath: newPath
});
});
}
}

View file

@ -91,26 +91,24 @@ async function runTest({
async function runTests(): Promise<void> {
const agentCounts = [2, 8];
const jitterScaleInSeconds = [0.5, 0, 2];
const concurrencies = [16, 1];
const iterations = [200];
const networkJitterScaleInSeconds = [0.5, 2];
const concurrencies = [
16,
1 // test with concurrency 1 to check for deadlocks
];
const doDeletes = [true, false];
for (const agentCount of agentCounts) {
for (const concurrency of concurrencies) {
for (const jitter of jitterScaleInSeconds) {
for (const iteration of iterations) {
for (const deleteFiles of doDeletes) {
for (let i = 0; i < 3; i++) {
await runTest({
agentCount,
concurrency,
iterations: iteration,
doDeletes: deleteFiles,
jitterScaleInSeconds: jitter
});
}
}
for (const jitter of networkJitterScaleInSeconds) {
for (const deleteFiles of doDeletes) {
await runTest({
agentCount,
concurrency,
iterations: 200,
doDeletes: deleteFiles,
jitterScaleInSeconds: jitter
});
}
}
}