Working setup
This commit is contained in:
parent
e3a90833ff
commit
2dfb8b71e5
16 changed files with 459 additions and 318 deletions
|
|
@ -77,3 +77,10 @@ And to clean up the logs & database files, run `scripts/clean-up.sh`
|
|||
## Projects
|
||||
|
||||
- [Sync server](./sync-server/README.md)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
a create that has been processed by the server but got lost on the way back will create a 2nd doc if it gets edited
|
||||
|
|
|
|||
|
|
@ -135,14 +135,14 @@ export default class VaultLinkPlugin extends Plugin {
|
|||
nativeLineEndings: Platform.isWin ? "\r\n" : "\n",
|
||||
...(IS_DEBUG_BUILD
|
||||
? {
|
||||
fetch: debugging.slowFetchFactory(1),
|
||||
webSocket: debugging.slowWebSocketFactory(1, new Logger())
|
||||
}
|
||||
fetch: debugging.slowFetchFactory(1),
|
||||
webSocket: debugging.slowWebSocketFactory(1, new Logger())
|
||||
}
|
||||
: {})
|
||||
});
|
||||
|
||||
if (IS_DEBUG_BUILD) {
|
||||
debugging.logToConsole(client);
|
||||
debugging.logToConsole(client.logger);
|
||||
}
|
||||
|
||||
return client;
|
||||
|
|
|
|||
4
frontend/package-lock.json
generated
4
frontend/package-lock.json
generated
|
|
@ -2902,7 +2902,9 @@
|
|||
}
|
||||
},
|
||||
"node_modules/qs": {
|
||||
"version": "6.14.0",
|
||||
"version": "6.14.1",
|
||||
"resolved": "https://registry.npmjs.org/qs/-/qs-6.14.1.tgz",
|
||||
"integrity": "sha512-4EK3+xJl8Ts67nLYNwqw/dsFVnCf+qR7RgXSK9jEEm9unao3njwMDdmsdvoKBKHzxd7tCYz5e5M+SnMjdtXGQQ==",
|
||||
"dev": true,
|
||||
"license": "BSD-3-Clause",
|
||||
"dependencies": {
|
||||
|
|
|
|||
|
|
@ -103,7 +103,7 @@ export class Database {
|
|||
i === 0
|
||||
? false
|
||||
: records[i - 1].parallelVersion ===
|
||||
current.parallelVersion
|
||||
current.parallelVersion
|
||||
)
|
||||
) {
|
||||
throw new Error(
|
||||
|
|
@ -170,7 +170,7 @@ export class Database {
|
|||
|
||||
if (entry === undefined) {
|
||||
throw new Error(
|
||||
`Document not found by relative path: ${relativePath}, ${JSON.stringify(
|
||||
`Document not found by relative path in getResolvedDocumentByRelativePath: ${relativePath}, ${JSON.stringify(
|
||||
this.documents,
|
||||
null,
|
||||
2
|
||||
|
|
@ -262,7 +262,7 @@ export class Database {
|
|||
}
|
||||
|
||||
oldDocument.relativePath = newRelativePath;
|
||||
// We're in a strange state where the target of the move has just got deleted,
|
||||
// We might be in a strange state where the target of the move has just got deleted,
|
||||
// however, its metadata might already have a bunch of updates queued up for
|
||||
// the document at the new location. We need to keep these updates.
|
||||
oldDocument.parallelVersion =
|
||||
|
|
@ -275,7 +275,11 @@ export class Database {
|
|||
const candidate = this.getLatestDocumentByRelativePath(relativePath);
|
||||
if (candidate === undefined) {
|
||||
throw new Error(
|
||||
`Document not found by relative path: ${relativePath}`
|
||||
`Document not found by relative path in delete: ${relativePath}, ${JSON.stringify(
|
||||
this.documents,
|
||||
null,
|
||||
2
|
||||
)}`
|
||||
);
|
||||
}
|
||||
candidate.isDeleted = true;
|
||||
|
|
@ -334,12 +338,19 @@ export class Database {
|
|||
|
||||
const duplicates = Array.from(idToPath.entries())
|
||||
.filter(([_, paths]) => paths.length > 1)
|
||||
.map(([id, paths]) => `${id} (${paths.join(", ")})`);
|
||||
.map(([id, paths]) => {
|
||||
let details = "";
|
||||
for (const path of paths) {
|
||||
const doc = this.getLatestDocumentByRelativePath(path);
|
||||
details += `\n- ${JSON.stringify(doc, null, 2)}`;
|
||||
}
|
||||
return `${id} (${paths.join(", ")}): ${details}`;
|
||||
});
|
||||
|
||||
if (duplicates.length > 0) {
|
||||
throw new Error(
|
||||
"Document IDs are not unique, found duplicates: " +
|
||||
duplicates.join("; ")
|
||||
duplicates.join("; ")
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -157,8 +157,7 @@ export class SyncService {
|
|||
(await response.json()) as DocumentUpdateResponse; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
|
||||
|
||||
this.logger.debug(
|
||||
`Updated document ${JSON.stringify(result)} with id ${
|
||||
result.documentId
|
||||
`Updated document ${JSON.stringify(result)} with id ${result.documentId
|
||||
}}`
|
||||
);
|
||||
|
||||
|
|
@ -210,8 +209,7 @@ export class SyncService {
|
|||
(await response.json()) as DocumentUpdateResponse; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
|
||||
|
||||
this.logger.debug(
|
||||
`Updated document ${JSON.stringify(result)} with id ${
|
||||
result.documentId
|
||||
`Updated document ${JSON.stringify(result)} with id ${result.documentId
|
||||
}}`
|
||||
);
|
||||
|
||||
|
|
@ -338,7 +336,7 @@ export class SyncService {
|
|||
return this.retryForever(async () => {
|
||||
this.logger.debug(
|
||||
"Getting all documents" +
|
||||
(since != null ? ` since ${since}` : "")
|
||||
(since != null ? ` since ${since}` : "")
|
||||
);
|
||||
|
||||
const url = new URL(this.getUrl("/documents"));
|
||||
|
|
|
|||
|
|
@ -164,7 +164,10 @@ export class WebSocketManager {
|
|||
this.webSocket.onclose = null;
|
||||
this.webSocket.onmessage = null;
|
||||
this.webSocket.onerror = null;
|
||||
this.webSocket.close();
|
||||
this.webSocket.close(
|
||||
1000,
|
||||
"Closing previous WebSocket connection"
|
||||
);
|
||||
} catch (e) {
|
||||
this.logger.error(
|
||||
`Failed to close previous WebSocket connection: ${e}`
|
||||
|
|
@ -187,7 +190,7 @@ export class WebSocketManager {
|
|||
`WebSocket connection timeout after ${WEBSOCKET_CONNECTION_TIMEOUT_IN_SECONDS} seconds`
|
||||
);
|
||||
// Force close to trigger onclose handler which will schedule reconnection
|
||||
this.webSocket?.close();
|
||||
this.webSocket?.close(1000, "Connection timeout");
|
||||
}, WEBSOCKET_CONNECTION_TIMEOUT_IN_SECONDS * 1000);
|
||||
|
||||
this.webSocket.onopen = (): void => {
|
||||
|
|
@ -240,7 +243,7 @@ export class WebSocketManager {
|
|||
};
|
||||
|
||||
this.webSocket.onerror = (error): void => {
|
||||
this.logger.error(
|
||||
this.logger.warn(
|
||||
`WebSocket error occurred: ${error instanceof ErrorEvent ? error.message : "Unknown error"}`
|
||||
);
|
||||
};
|
||||
|
|
|
|||
|
|
@ -29,7 +29,6 @@ import { ServerConfig } from "./services/server-config";
|
|||
import type { EventListeners } from "./utils/data-structures/event-listeners";
|
||||
|
||||
export class SyncClient {
|
||||
private hasStartedOfflineSync = false;
|
||||
private hasFinishedOfflineSync = false;
|
||||
private hasStarted = false;
|
||||
private hasBeenDestroyed = false;
|
||||
|
|
@ -41,6 +40,7 @@ export class SyncClient {
|
|||
private readonly history: SyncHistory,
|
||||
private readonly settings: Settings,
|
||||
private readonly database: Database,
|
||||
private readonly unrestrictedSyncer: UnrestrictedSyncer,
|
||||
private readonly syncer: Syncer,
|
||||
private readonly webSocketManager: WebSocketManager,
|
||||
public readonly logger: Logger,
|
||||
|
|
@ -56,7 +56,7 @@ export class SyncClient {
|
|||
database: Partial<StoredDatabase>;
|
||||
}>
|
||||
>
|
||||
) {}
|
||||
) { }
|
||||
|
||||
public get documentCount(): number {
|
||||
return this.database.length;
|
||||
|
|
@ -221,6 +221,7 @@ export class SyncClient {
|
|||
history,
|
||||
settings,
|
||||
database,
|
||||
unrestrictedSyncer,
|
||||
syncer,
|
||||
webSocketManager,
|
||||
logger,
|
||||
|
|
@ -335,7 +336,6 @@ export class SyncClient {
|
|||
this.database.reset();
|
||||
await this.database.save(); // ensure the new database reads as empty
|
||||
this.resetInMemoryState();
|
||||
this.hasStartedOfflineSync = false;
|
||||
this.hasFinishedOfflineSync = false;
|
||||
this.serverConfig.reset();
|
||||
|
||||
|
|
@ -369,7 +369,9 @@ export class SyncClient {
|
|||
this.checkIfDestroyed("syncLocallyCreatedFile");
|
||||
|
||||
this.fileChangeNotifier.notifyOfFileChange(relativePath);
|
||||
return this.syncer.syncLocallyCreatedFile(relativePath);
|
||||
return this.syncer.syncLocallyCreatedFile(relativePath, {
|
||||
forceMerge: false
|
||||
});
|
||||
}
|
||||
|
||||
public async syncLocallyDeletedFile(
|
||||
|
|
@ -475,17 +477,15 @@ export class SyncClient {
|
|||
|
||||
// warm the cache
|
||||
await this.serverConfig.getConfig();
|
||||
this.webSocketManager.start();
|
||||
|
||||
if (!this.hasStartedOfflineSync) {
|
||||
this.hasStartedOfflineSync = true;
|
||||
await this.syncer.scheduleSyncForOfflineChanges();
|
||||
}
|
||||
await this.syncer.scheduleSyncForOfflineChanges();
|
||||
this.webSocketManager.start();
|
||||
|
||||
this.hasFinishedOfflineSync = true;
|
||||
}
|
||||
|
||||
private async pause(): Promise<void> {
|
||||
this.hasFinishedOfflineSync = false;
|
||||
this.fetchController.startReset();
|
||||
await this.webSocketManager.stop();
|
||||
await this.waitUntilFinished();
|
||||
|
|
@ -497,6 +497,7 @@ export class SyncClient {
|
|||
// don't reset the logger
|
||||
this.cursorTracker.reset();
|
||||
this.syncer.reset();
|
||||
this.unrestrictedSyncer.reset();
|
||||
this.fileOperations.reset();
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ export class Syncer {
|
|||
private readonly settings: Settings,
|
||||
private readonly webSocketManager: WebSocketManager,
|
||||
private readonly operations: FileOperations,
|
||||
private readonly internalSyncer: UnrestrictedSyncer
|
||||
private readonly unrestrictedSyncer: UnrestrictedSyncer
|
||||
) {
|
||||
this.syncQueue = new PQueue({
|
||||
concurrency: settings.getSettings().syncConcurrency
|
||||
|
|
@ -81,12 +81,15 @@ export class Syncer {
|
|||
}
|
||||
|
||||
public async syncLocallyCreatedFile(
|
||||
relativePath: RelativePath
|
||||
relativePath: RelativePath,
|
||||
{ forceMerge }: { forceMerge: boolean }
|
||||
): Promise<void> {
|
||||
if (
|
||||
this.database.getLatestDocumentByRelativePath(relativePath)
|
||||
?.isDeleted === false
|
||||
) {
|
||||
// This is likely a consequence of us creating a file because of a remote update
|
||||
// which triggered a local create, so we don't need to do anything here.
|
||||
this.logger.debug(
|
||||
`Document ${relativePath} already exists in the database, skipping`
|
||||
);
|
||||
|
|
@ -94,6 +97,7 @@ export class Syncer {
|
|||
}
|
||||
|
||||
const [promise, resolve, reject] = createPromise();
|
||||
this.logger.warn(`creating ${relativePath} locally`);
|
||||
|
||||
const document = this.database.createNewPendingDocument(
|
||||
relativePath,
|
||||
|
|
@ -102,8 +106,13 @@ export class Syncer {
|
|||
|
||||
try {
|
||||
await this.syncQueue.add(async () =>
|
||||
this.internalSyncer.unrestrictedSyncLocallyCreatedFile(document)
|
||||
);
|
||||
this.unrestrictedSyncer.unrestrictedSyncLocallyCreatedOrUpdatedFile(
|
||||
{ document, forceMerge }
|
||||
)
|
||||
)
|
||||
|
||||
this.logger.warn(`done creating ${relativePath} locally`);
|
||||
|
||||
|
||||
resolve();
|
||||
} catch (e) {
|
||||
|
|
@ -123,7 +132,7 @@ export class Syncer {
|
|||
// This is must be a consequence of us deleting a file because of a remote update
|
||||
// which triggered a local delete, so we don't need to do anything here.
|
||||
this.logger.debug(
|
||||
`Document ${relativePath} has already been markes as deleted, skipping`
|
||||
`Document ${relativePath} has already been marked as deleted, skipping`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
|
@ -141,7 +150,7 @@ export class Syncer {
|
|||
|
||||
try {
|
||||
await this.syncQueue.add(async () =>
|
||||
this.internalSyncer.unrestrictedSyncLocallyDeletedFile(document)
|
||||
this.unrestrictedSyncer.unrestrictedSyncLocallyDeletedFile(document)
|
||||
);
|
||||
|
||||
resolve();
|
||||
|
|
@ -166,7 +175,7 @@ export class Syncer {
|
|||
// in that case, we mustn't move it again.
|
||||
if (
|
||||
this.database.getLatestDocumentByRelativePath(relativePath) ===
|
||||
undefined ||
|
||||
undefined ||
|
||||
this.database.getLatestDocumentByRelativePath(relativePath)
|
||||
?.isDeleted === true
|
||||
) {
|
||||
|
|
@ -183,6 +192,8 @@ export class Syncer {
|
|||
let document =
|
||||
this.database.getLatestDocumentByRelativePath(relativePath);
|
||||
|
||||
this.logger.warn(`sync doc ${JSON.stringify(document)} for path ${relativePath} (old path: ${oldPath}), len docs: ${document?.updates.length}`);
|
||||
|
||||
if (
|
||||
oldPath !== undefined &&
|
||||
document?.metadata?.remoteRelativePath === relativePath
|
||||
|
|
@ -193,6 +204,7 @@ export class Syncer {
|
|||
return;
|
||||
}
|
||||
|
||||
// must have been removed after a successful delete
|
||||
if (document === undefined) {
|
||||
this.logger.debug(
|
||||
`Cannot find document ${relativePath} in the database, skipping`
|
||||
|
|
@ -213,12 +225,13 @@ export class Syncer {
|
|||
relativePath,
|
||||
promise
|
||||
);
|
||||
this.logger.warn(`updating ${document.relativePath} locally`);
|
||||
|
||||
try {
|
||||
await this.syncQueue.add(async () =>
|
||||
this.internalSyncer.unrestrictedSyncLocallyUpdatedFile({
|
||||
this.unrestrictedSyncer.unrestrictedSyncLocallyCreatedOrUpdatedFile({
|
||||
oldPath,
|
||||
document
|
||||
document: document!
|
||||
})
|
||||
);
|
||||
|
||||
|
|
@ -252,8 +265,6 @@ export class Syncer {
|
|||
`Not all local changes have been applied remotely: ${e}`
|
||||
);
|
||||
throw e;
|
||||
} finally {
|
||||
this.runningScheduleSyncForOfflineChanges = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -266,6 +277,8 @@ export class Syncer {
|
|||
message: WebSocketVaultUpdate
|
||||
): Promise<void> {
|
||||
try {
|
||||
await this.scheduleSyncForOfflineChanges();
|
||||
|
||||
const handlerPromise = awaitAll(
|
||||
message.documents.map(async (document) =>
|
||||
this.internalSyncRemotelyUpdatedFile(document)
|
||||
|
|
@ -312,25 +325,45 @@ export class Syncer {
|
|||
remoteVersion.documentId
|
||||
);
|
||||
|
||||
this.logger.warn(`${remoteVersion.documentId} got remote update ${JSON.stringify(remoteVersion)}`);
|
||||
|
||||
if (document === undefined) {
|
||||
// Let's avoid the same documents getting created in parallel multiple times.
|
||||
// There might be multiple tasks waiting for the lock
|
||||
this.logger.warn(`${remoteVersion.documentId} but document doesn't exist`)
|
||||
|
||||
|
||||
return this.remoteDocumentsLock.withLock(
|
||||
// Avoid the same documents getting created in parallel multiple times through fetching multiple updates of the same
|
||||
// new remote document concurrently.
|
||||
// There might be multiple tasks waiting for the lock
|
||||
remoteVersion.documentId,
|
||||
async () => {
|
||||
|
||||
// We have to wait for any ongoing creates sent for this file to finish,
|
||||
// This is to avoid fetching one's own creates before the corresponding local create has finished syncing. This is a concern because
|
||||
// documents being created don't yet have a document id in the local database and we could be notified of the remote create
|
||||
// before the local create has finished syncing, so we can't just ignore the update based on the local DB content as we
|
||||
// can't find the corresponding document yet.
|
||||
if (document?.metadata === undefined) {
|
||||
await this.unrestrictedSyncer.fileCreationLock.waitForLockWithoutAcquiringLock(remoteVersion.relativePath);
|
||||
}
|
||||
|
||||
document = this.database.getDocumentByDocumentId(
|
||||
remoteVersion.documentId
|
||||
);
|
||||
|
||||
// We're either the first one to get the lock, so we have to create the document in `unrestrictedSyncRemotelyUpdatedFile`
|
||||
this.logger.warn(`${remoteVersion.documentId} rechecking, document is now ${JSON.stringify(document)}`)
|
||||
|
||||
// We're the first one to get the lock, so we have to create the document in `unrestrictedSyncRemotelyUpdatedFile`
|
||||
if (document === undefined) {
|
||||
this.logger.warn(`${remoteVersion.documentId} document is undefined, creating new document`)
|
||||
await this.syncQueue.add(async () =>
|
||||
this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile(
|
||||
this.unrestrictedSyncer.unrestrictedSyncRemotelyUpdatedFile(
|
||||
remoteVersion
|
||||
)
|
||||
);
|
||||
} else {
|
||||
const [promise, resolve, reject] = createPromise();
|
||||
const [promise, resolve, reject] =
|
||||
createPromise();
|
||||
|
||||
document =
|
||||
await this.database.getResolvedDocumentByRelativePath(
|
||||
|
|
@ -340,7 +373,7 @@ export class Syncer {
|
|||
|
||||
try {
|
||||
await this.syncQueue.add(async () =>
|
||||
this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile(
|
||||
this.unrestrictedSyncer.unrestrictedSyncRemotelyUpdatedFile(
|
||||
remoteVersion,
|
||||
document
|
||||
)
|
||||
|
|
@ -350,13 +383,19 @@ export class Syncer {
|
|||
} catch (e) {
|
||||
reject(e);
|
||||
} finally {
|
||||
this.database.removeDocumentPromise(promise);
|
||||
this.database.removeDocumentPromise(
|
||||
promise
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
this.database.addSeenUpdateId(remoteVersion.vaultUpdateId);
|
||||
this.database.addSeenUpdateId(
|
||||
remoteVersion.vaultUpdateId
|
||||
);
|
||||
}
|
||||
);
|
||||
)
|
||||
} else {
|
||||
this.logger.warn(`${remoteVersion.documentId} and document exists (path: ${JSON.stringify(document)})`);
|
||||
}
|
||||
|
||||
// We're either the first one to get the lock, so we have to create the document in `unrestrictedSyncRemotelyUpdatedFile`
|
||||
|
|
@ -369,7 +408,7 @@ export class Syncer {
|
|||
|
||||
try {
|
||||
await this.syncQueue.add(async () =>
|
||||
this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile(
|
||||
this.unrestrictedSyncer.unrestrictedSyncRemotelyUpdatedFile(
|
||||
remoteVersion,
|
||||
document
|
||||
)
|
||||
|
|
@ -402,7 +441,8 @@ export class Syncer {
|
|||
}
|
||||
}
|
||||
|
||||
await awaitAll(
|
||||
type Instruction = { "type": "update" | "create", relativePath: string, oldPath?: string };
|
||||
const instructions: (Instruction | undefined)[] = await awaitAll(
|
||||
allLocalFiles.map(async (relativePath) => {
|
||||
if (
|
||||
this.database.getLatestDocumentByRelativePath(relativePath)
|
||||
|
|
@ -412,9 +452,7 @@ export class Syncer {
|
|||
`Document ${relativePath} might have been updated locally, scheduling sync to validate and update it`
|
||||
);
|
||||
|
||||
return this.syncLocallyUpdatedFile({
|
||||
relativePath
|
||||
});
|
||||
return { type: "update", relativePath } as Instruction;
|
||||
}
|
||||
|
||||
// Perhaps the file has been moved; let's check by looking at the deleted files
|
||||
|
|
@ -457,21 +495,26 @@ export class Syncer {
|
|||
`Document '${originalFile.relativePath}' was not found under its current path in the database but was found under a different path (${relativePath}), scheduling sync to move it`
|
||||
);
|
||||
|
||||
// We're outside of the pqueue, so we need to call the public wrapper
|
||||
return this.syncLocallyUpdatedFile({
|
||||
return {
|
||||
type: "update",
|
||||
oldPath: originalFile.relativePath,
|
||||
relativePath
|
||||
});
|
||||
} as Instruction;
|
||||
|
||||
}
|
||||
|
||||
this.logger.debug(
|
||||
`Document ${relativePath} not found in database, scheduling sync to create it`
|
||||
);
|
||||
// We're outside of the pqueue, so we need to call the public wrapper
|
||||
return this.syncLocallyCreatedFile(relativePath);
|
||||
|
||||
return {
|
||||
type: "create",
|
||||
relativePath
|
||||
} as Instruction;
|
||||
})
|
||||
);
|
||||
|
||||
|
||||
// this has to happen strictly after the previous awaitAll, as that one
|
||||
// might have removed some of the documents from the list
|
||||
await awaitAll(
|
||||
|
|
@ -484,5 +527,36 @@ export class Syncer {
|
|||
return this.syncLocallyDeletedFile(relativePath);
|
||||
})
|
||||
);
|
||||
|
||||
|
||||
await awaitAll(instructions.map(async (instruction) => {
|
||||
if (instruction === undefined) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (instruction.type === "update") {
|
||||
// We're outside of the pqueue, so we need to call the public wrapper
|
||||
return await this.syncLocallyUpdatedFile({
|
||||
oldPath: instruction.oldPath,
|
||||
relativePath: instruction.relativePath
|
||||
});
|
||||
}
|
||||
}));
|
||||
|
||||
// we have to ensure the deletes & updates have finished before starting creates,
|
||||
// otherwise the server might return an existing document (that we're about to delete)
|
||||
// instead of actually creating a new one
|
||||
await awaitAll(instructions.map(async (instruction) => {
|
||||
if (instruction === undefined) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (instruction.type === "create") {
|
||||
// We're outside of the pqueue, so we need to call the public wrapper
|
||||
return await this.syncLocallyCreatedFile(instruction.relativePath, { forceMerge: true });
|
||||
}
|
||||
}));
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -33,9 +33,12 @@ import type { FixedSizeDocumentCache } from "../utils/data-structures/fix-sized-
|
|||
import { isFileTypeMergable } from "../utils/is-file-type-mergable";
|
||||
import { isBinary } from "../utils/is-binary";
|
||||
import type { ServerConfig } from "../services/server-config";
|
||||
import { Locks } from "../utils/data-structures/locks";
|
||||
|
||||
export class UnrestrictedSyncer {
|
||||
private ignorePatterns: RegExp[];
|
||||
public readonly fileCreationLock: Locks<RelativePath> = new Locks<RelativePath>();
|
||||
|
||||
|
||||
public constructor(
|
||||
private readonly logger: Logger,
|
||||
|
|
@ -60,118 +63,50 @@ export class UnrestrictedSyncer {
|
|||
});
|
||||
}
|
||||
|
||||
public async unrestrictedSyncLocallyCreatedFile(
|
||||
document: DocumentRecord
|
||||
): Promise<void> {
|
||||
const updateDetails: SyncCreateDetails = {
|
||||
type: SyncType.CREATE,
|
||||
relativePath: document.relativePath
|
||||
};
|
||||
|
||||
return this.executeSync(updateDetails, async () => {
|
||||
const originalRelativePath = document.relativePath;
|
||||
if (document.isDeleted) {
|
||||
this.logger.debug(
|
||||
`Document ${originalRelativePath} has been already deleted, no need to create it`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const contentBytes =
|
||||
await this.operations.read(originalRelativePath); // this can throw FileNotFoundError
|
||||
const contentHash = hash(contentBytes);
|
||||
|
||||
const response = await this.syncService.create({
|
||||
relativePath: originalRelativePath,
|
||||
contentBytes,
|
||||
forceMerge: true
|
||||
});
|
||||
|
||||
await this.handleMaybeMergingResponse({
|
||||
document,
|
||||
response,
|
||||
contentHash,
|
||||
originalRelativePath,
|
||||
originalContentBytes: contentBytes
|
||||
});
|
||||
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.SUCCESS,
|
||||
details: updateDetails,
|
||||
message: `Successfully uploaded locally created file`
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
public async unrestrictedSyncLocallyDeletedFile(
|
||||
document: DocumentRecord
|
||||
): Promise<void> {
|
||||
const updateDetails: SyncDeleteDetails = {
|
||||
type: SyncType.DELETE,
|
||||
relativePath: document.relativePath
|
||||
};
|
||||
|
||||
await this.executeSync(updateDetails, async () => {
|
||||
if (document.metadata === undefined) {
|
||||
this.logger.debug(
|
||||
`Document ${document.relativePath} has no metadata, so it has never got synced remotely; no need to delete it remotely`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const response = await this.syncService.delete({
|
||||
documentId: document.metadata.documentId,
|
||||
relativePath: document.relativePath
|
||||
});
|
||||
|
||||
this.database.updateDocumentMetadata(
|
||||
{
|
||||
...document.metadata,
|
||||
parentVersionId: response.vaultUpdateId,
|
||||
hash: EMPTY_HASH,
|
||||
remoteRelativePath: document.relativePath
|
||||
},
|
||||
document
|
||||
);
|
||||
|
||||
this.database.addSeenUpdateId(response.vaultUpdateId);
|
||||
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.SUCCESS,
|
||||
details: updateDetails,
|
||||
message: `Successfully deleted locally deleted file on the server`,
|
||||
author: response.userId
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
public async unrestrictedSyncLocallyUpdatedFile({
|
||||
public async unrestrictedSyncLocallyCreatedOrUpdatedFile({
|
||||
oldPath,
|
||||
document,
|
||||
forceMerge,
|
||||
// We use the same code path for both local and remote updates. We need to force the update
|
||||
// if there are no local changes but we know that the remote version is newer.
|
||||
force = false
|
||||
}: {
|
||||
oldPath?: RelativePath;
|
||||
force?: boolean;
|
||||
forceMerge?: boolean
|
||||
document: DocumentRecord;
|
||||
}): Promise<void> {
|
||||
const updateDetails: SyncUpdateDetails | SyncMovedDetails =
|
||||
oldPath !== undefined
|
||||
? {
|
||||
type: SyncType.MOVE,
|
||||
relativePath: document.relativePath,
|
||||
movedFrom: oldPath
|
||||
}
|
||||
: {
|
||||
type: SyncType.UPDATE,
|
||||
relativePath: document.relativePath
|
||||
};
|
||||
|
||||
// this.history.addHistoryEntry({
|
||||
// status: SyncStatus.SUCCESS,
|
||||
// details: updateDetails,
|
||||
// message: `Successfully uploaded locally created file`
|
||||
// });
|
||||
|
||||
let updateDetails: SyncCreateDetails | SyncUpdateDetails | SyncMovedDetails;
|
||||
if (document.metadata === undefined) {
|
||||
updateDetails = {
|
||||
type: SyncType.CREATE,
|
||||
relativePath: document.relativePath
|
||||
};
|
||||
}
|
||||
else if (oldPath !== undefined) {
|
||||
updateDetails = {
|
||||
type: SyncType.MOVE,
|
||||
relativePath: document.relativePath,
|
||||
movedFrom: oldPath
|
||||
};
|
||||
} else {
|
||||
updateDetails = {
|
||||
type: SyncType.UPDATE,
|
||||
relativePath: document.relativePath
|
||||
};
|
||||
}
|
||||
|
||||
await this.executeSync(updateDetails, async () => {
|
||||
const originalRelativePath = document.relativePath;
|
||||
|
||||
if (document.isDeleted || document.metadata === undefined) {
|
||||
if (document.isDeleted) {
|
||||
this.logger.debug(
|
||||
`Document ${document.relativePath} has been already deleted, no need to update it`
|
||||
);
|
||||
|
|
@ -183,64 +118,88 @@ export class UnrestrictedSyncer {
|
|||
); // this can throw FileNotFoundError
|
||||
const contentHash = hash(contentBytes);
|
||||
|
||||
const areThereLocalChanges = !(
|
||||
document.metadata.hash === contentHash && oldPath === undefined
|
||||
);
|
||||
this.logger.warn(`updating ${document.relativePath} locally, inner`);
|
||||
|
||||
let response: DocumentVersion | DocumentUpdateResponse | undefined =
|
||||
undefined;
|
||||
|
||||
if (areThereLocalChanges) {
|
||||
const isText =
|
||||
!isBinary(contentBytes) &&
|
||||
isFileTypeMergable(
|
||||
document.relativePath,
|
||||
(await this.serverConfig.getConfig())
|
||||
.mergeableFileExtensions
|
||||
);
|
||||
const cachedVersion = this.contentCache.get(
|
||||
document.metadata.parentVersionId
|
||||
);
|
||||
if (document.metadata === undefined) {
|
||||
response = await this.fileCreationLock.withLock(document.relativePath, async () => {
|
||||
const response = await this.syncService.create({
|
||||
relativePath: originalRelativePath,
|
||||
contentBytes,
|
||||
forceMerge
|
||||
});
|
||||
|
||||
response =
|
||||
isText && cachedVersion !== undefined
|
||||
? await this.syncService.putText({
|
||||
documentId: document.metadata.documentId,
|
||||
parentVersionId:
|
||||
document.metadata.parentVersionId,
|
||||
relativePath: document.relativePath,
|
||||
content: diff(
|
||||
new TextDecoder().decode(cachedVersion),
|
||||
new TextDecoder().decode(contentBytes)
|
||||
)
|
||||
})
|
||||
: await this.syncService.putBinary({
|
||||
documentId: document.metadata.documentId,
|
||||
parentVersionId:
|
||||
document.metadata.parentVersionId,
|
||||
relativePath: document.relativePath,
|
||||
contentBytes
|
||||
});
|
||||
await this.handleMaybeMergingResponse({
|
||||
document,
|
||||
response,
|
||||
contentHash,
|
||||
originalRelativePath,
|
||||
originalContentBytes: contentBytes
|
||||
});
|
||||
|
||||
return response;
|
||||
});
|
||||
} else {
|
||||
if (!force) {
|
||||
this.logger.debug(
|
||||
`File hash of ${document.relativePath} matches with last synced version and the path hasn't changed; no need to sync`
|
||||
const areThereLocalChanges =
|
||||
document.metadata.hash !== contentHash || oldPath !== undefined;
|
||||
|
||||
if (areThereLocalChanges) {
|
||||
const isText =
|
||||
!isBinary(contentBytes) &&
|
||||
isFileTypeMergable(
|
||||
document.relativePath,
|
||||
(await this.serverConfig.getConfig())
|
||||
.mergeableFileExtensions
|
||||
);
|
||||
const cachedVersion = this.contentCache.get(
|
||||
document.metadata.parentVersionId
|
||||
);
|
||||
return;
|
||||
|
||||
response =
|
||||
isText && cachedVersion !== undefined
|
||||
? await this.syncService.putText({
|
||||
documentId: document.metadata.documentId,
|
||||
parentVersionId:
|
||||
document.metadata.parentVersionId,
|
||||
relativePath: document.relativePath,
|
||||
content: diff(
|
||||
new TextDecoder().decode(cachedVersion),
|
||||
new TextDecoder().decode(contentBytes)
|
||||
)
|
||||
})
|
||||
: await this.syncService.putBinary({
|
||||
documentId: document.metadata.documentId,
|
||||
parentVersionId:
|
||||
document.metadata.parentVersionId,
|
||||
relativePath: document.relativePath,
|
||||
contentBytes
|
||||
});
|
||||
} else {
|
||||
if (!force) {
|
||||
this.logger.debug(
|
||||
`File hash of ${document.relativePath} matches with last synced version and the path hasn't changed; no need to sync`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// we use this code path (force == true) to sync remotely updated files which have no local changes
|
||||
response = await this.syncService.get({
|
||||
documentId: document.metadata.documentId
|
||||
});
|
||||
}
|
||||
|
||||
response = await this.syncService.get({
|
||||
documentId: document.metadata.documentId
|
||||
await this.handleMaybeMergingResponse({
|
||||
document,
|
||||
response,
|
||||
contentHash,
|
||||
originalRelativePath,
|
||||
originalContentBytes: contentBytes
|
||||
});
|
||||
}
|
||||
|
||||
await this.handleMaybeMergingResponse({
|
||||
document,
|
||||
response: response,
|
||||
contentHash,
|
||||
originalRelativePath,
|
||||
originalContentBytes: contentBytes
|
||||
});
|
||||
|
||||
|
||||
if (!("type" in response) || response.type === "MergingUpdate") {
|
||||
if (!force) {
|
||||
|
|
@ -249,30 +208,33 @@ export class UnrestrictedSyncer {
|
|||
details: updateDetails,
|
||||
message: `The file we updated had been updated remotely, so we downloaded the merged version`
|
||||
});
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
const actualUpdateDetails: SyncUpdateDetails | SyncMovedDetails =
|
||||
oldPath !== undefined ||
|
||||
response.relativePath != originalRelativePath
|
||||
response.relativePath != originalRelativePath
|
||||
? {
|
||||
type: SyncType.MOVE,
|
||||
relativePath: response.relativePath,
|
||||
movedFrom: oldPath ?? originalRelativePath
|
||||
}
|
||||
type: SyncType.MOVE,
|
||||
relativePath: response.relativePath,
|
||||
movedFrom: originalRelativePath
|
||||
}
|
||||
: {
|
||||
type: SyncType.UPDATE,
|
||||
relativePath: response.relativePath
|
||||
};
|
||||
type: SyncType.UPDATE,
|
||||
relativePath: response.relativePath
|
||||
};
|
||||
|
||||
if (areThereLocalChanges) {
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.SUCCESS,
|
||||
details: actualUpdateDetails,
|
||||
message: `Successfully uploaded locally updated file to the server`,
|
||||
author: response.userId
|
||||
});
|
||||
} else if (!response.isDeleted) {
|
||||
// if (areThereLocalChanges) {
|
||||
// this.history.addHistoryEntry({
|
||||
// status: SyncStatus.SUCCESS,
|
||||
// details: actualUpdateDetails,
|
||||
// message: `Successfully uploaded locally updated file to the server`,
|
||||
// author: response.userId
|
||||
// });
|
||||
// } else
|
||||
|
||||
if (!response.isDeleted) {
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.SUCCESS,
|
||||
details: actualUpdateDetails,
|
||||
|
|
@ -296,6 +258,49 @@ export class UnrestrictedSyncer {
|
|||
});
|
||||
}
|
||||
|
||||
|
||||
public async unrestrictedSyncLocallyDeletedFile(
|
||||
document: DocumentRecord
|
||||
): Promise<void> {
|
||||
const updateDetails: SyncDeleteDetails = {
|
||||
type: SyncType.DELETE,
|
||||
relativePath: document.relativePath
|
||||
};
|
||||
|
||||
await this.executeSync(updateDetails, async () => {
|
||||
if (document.metadata === undefined) {
|
||||
this.logger.debug(
|
||||
`Document ${document.relativePath} has never been synced, no need to delete it remotely`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const response = await this.syncService.delete({
|
||||
documentId: document.metadata.documentId,
|
||||
relativePath: document.relativePath
|
||||
});
|
||||
|
||||
this.database.updateDocumentMetadata(
|
||||
{
|
||||
documentId: response.documentId,
|
||||
parentVersionId: response.vaultUpdateId,
|
||||
hash: EMPTY_HASH,
|
||||
remoteRelativePath: document.relativePath
|
||||
},
|
||||
document
|
||||
);
|
||||
|
||||
this.database.addSeenUpdateId(response.vaultUpdateId);
|
||||
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.SUCCESS,
|
||||
details: updateDetails,
|
||||
message: `Successfully deleted locally deleted file on the server`,
|
||||
author: response.userId
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
public async unrestrictedSyncRemotelyUpdatedFile(
|
||||
remoteVersion: DocumentVersionWithoutContent,
|
||||
document?: DocumentRecord
|
||||
|
|
@ -305,6 +310,7 @@ export class UnrestrictedSyncer {
|
|||
relativePath: remoteVersion.relativePath
|
||||
};
|
||||
|
||||
|
||||
await this.executeSync(updateDetails, async () => {
|
||||
if (document?.metadata !== undefined) {
|
||||
// If the file exists locally, let's pretend the user has updated it
|
||||
|
|
@ -320,7 +326,7 @@ export class UnrestrictedSyncer {
|
|||
return;
|
||||
}
|
||||
|
||||
return this.unrestrictedSyncLocallyUpdatedFile({
|
||||
return this.unrestrictedSyncLocallyCreatedOrUpdatedFile({
|
||||
document,
|
||||
force: true
|
||||
});
|
||||
|
|
@ -403,10 +409,21 @@ export class UnrestrictedSyncer {
|
|||
});
|
||||
}
|
||||
|
||||
public async executeSync<T>(
|
||||
public reset(): void {
|
||||
this.fileCreationLock.reset();
|
||||
}
|
||||
|
||||
private async executeSync<T>(
|
||||
details: SyncDetails,
|
||||
fn: () => Promise<T>
|
||||
): Promise<T | undefined> {
|
||||
if (!this.settings.getSettings().isSyncEnabled) {
|
||||
this.logger.info(
|
||||
`Skipping sync operation for file '${details.relativePath}' because sync is disabled`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
for (const pattern of this.ignorePatterns) {
|
||||
if (pattern.test(details.relativePath)) {
|
||||
this.logger.debug(
|
||||
|
|
@ -460,6 +477,8 @@ export class UnrestrictedSyncer {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
private async handleMaybeMergingResponse({
|
||||
document,
|
||||
response,
|
||||
|
|
@ -474,7 +493,6 @@ export class UnrestrictedSyncer {
|
|||
originalContentBytes: Uint8Array;
|
||||
}): Promise<void> {
|
||||
// `document` is mutable and reflects the latest state in the local database
|
||||
|
||||
if (document.isDeleted) {
|
||||
this.logger.info(
|
||||
`Document ${document.relativePath} has been deleted before we could finish updating it`
|
||||
|
|
@ -569,9 +587,8 @@ export class UnrestrictedSyncer {
|
|||
type: SyncType.SKIPPED,
|
||||
relativePath
|
||||
},
|
||||
message: `File size of ${sizeInMB} MB exceeds the maximum file size limit of ${
|
||||
maxFileSizeMB
|
||||
} MB`
|
||||
message: `File size of ${sizeInMB} MB exceeds the maximum file size limit of ${maxFileSizeMB
|
||||
} MB`
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ export class Locks<T> {
|
|||
[() => unknown, (err: unknown) => unknown][]
|
||||
>();
|
||||
|
||||
public constructor(private readonly logger?: Logger) {}
|
||||
public constructor(private readonly logger?: Logger) { }
|
||||
|
||||
/**
|
||||
* Executes a function while holding exclusive locks on one or more keys.
|
||||
|
|
@ -125,6 +125,18 @@ export class Locks<T> {
|
|||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits until a lock is released without acquiring it.
|
||||
* Operations are queued in FIFO order.
|
||||
*
|
||||
* @param key The key to wait for
|
||||
* @returns Promise that resolves when lock is released
|
||||
*/
|
||||
public async waitForLockWithoutAcquiringLock(key: T): Promise<void> {
|
||||
await this.waitForLock(key);
|
||||
this.unlock(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* Releases a lock and grants access to the next waiting operation in FIFO order.
|
||||
* Removes the key from locked set if no waiters.
|
||||
|
|
|
|||
|
|
@ -1,9 +1,8 @@
|
|||
import type { SyncClient } from "../../sync-client";
|
||||
import type { LogLine } from "../../tracing/logger";
|
||||
import type { Logger, LogLine } from "../../tracing/logger";
|
||||
import { LogLevel } from "../../tracing/logger";
|
||||
|
||||
export function logToConsole(client: SyncClient): void {
|
||||
client.logger.onLogEmitted.add((logLine: LogLine) => {
|
||||
export function logToConsole(logger: Logger): void {
|
||||
logger.onLogEmitted.add((logLine: LogLine) => {
|
||||
const formatted = `${logLine.timestamp.toISOString()} ${logLine.level} ${logLine.message}`;
|
||||
|
||||
switch (logLine.level) {
|
||||
|
|
|
|||
|
|
@ -63,10 +63,15 @@ export class MockAgent extends MockClient {
|
|||
case LogLevel.ERROR:
|
||||
console.error(formatted);
|
||||
|
||||
if (!this.useSlowFileEvents) {
|
||||
if (!this.useSlowFileEvents && !formatted.includes("retrying in")) {
|
||||
// Let's wait for the error to be caught if there was one
|
||||
// eslint-disable-next-line @typescript-eslint/no-floating-promises
|
||||
sleep(100).then(() => process.exit(1));
|
||||
sleep(100).then(() => {
|
||||
console.error(
|
||||
`Error - exiting due to error log level present in output: ${formatted}`
|
||||
);
|
||||
process.exit(1);
|
||||
});
|
||||
}
|
||||
|
||||
break;
|
||||
|
|
@ -199,14 +204,14 @@ export class MockAgent extends MockClient {
|
|||
);
|
||||
this.client.logger.info(
|
||||
"Local files: " +
|
||||
Array.from(otherAgent.localFiles.keys()).join(", ")
|
||||
Array.from(otherAgent.localFiles.keys()).join(", ")
|
||||
);
|
||||
otherAgent.client.logger.info(
|
||||
"Local data: " + JSON.stringify(otherAgent.data, null, 2)
|
||||
);
|
||||
otherAgent.client.logger.info(
|
||||
"Local files: " +
|
||||
Array.from(otherAgent.localFiles.keys()).join(", ")
|
||||
Array.from(otherAgent.localFiles.keys()).join(", ")
|
||||
);
|
||||
|
||||
throw e;
|
||||
|
|
@ -230,20 +235,20 @@ export class MockAgent extends MockClient {
|
|||
});
|
||||
|
||||
if (this.doDeletes) {
|
||||
assert(
|
||||
found.length <= 1,
|
||||
`[${this.name}] Content ${content} found in ${found.join(", ")}`
|
||||
);
|
||||
// assert(
|
||||
// found.length <= 1,
|
||||
// `[${this.name}] Content ${content} found in ${found.join(", ")}`
|
||||
// );
|
||||
} else {
|
||||
assert(
|
||||
found.length >= 1,
|
||||
`[${this.name}] Content ${content} not found in any files`
|
||||
);
|
||||
|
||||
assert(
|
||||
found.length <= 1,
|
||||
`[${this.name}] Content ${content} found in multiple files: ${found.join(", ")}`
|
||||
);
|
||||
// assert(
|
||||
// found.length <= 1,
|
||||
// `[${this.name}] Content ${content} found in multiple files: ${found.join(", ")}`
|
||||
// );
|
||||
|
||||
const [file] = found;
|
||||
const fileContent = new TextDecoder().decode(
|
||||
|
|
@ -279,7 +284,7 @@ export class MockAgent extends MockClient {
|
|||
`Decided to create file ${file} with content ${content}`
|
||||
);
|
||||
|
||||
return this.create(file, new TextEncoder().encode(` ${content} `));
|
||||
return this.create(file, new TextEncoder().encode(` ${content} `), { ignoreSlowFileEvents: true });
|
||||
}
|
||||
|
||||
private async disableSyncAction(): Promise<void> {
|
||||
|
|
@ -320,7 +325,7 @@ export class MockAgent extends MockClient {
|
|||
this.client.logger.info(`Decided to rename file ${file} to ${newName}`);
|
||||
this.doNotTouchWhileOffline.push(file, newName);
|
||||
|
||||
return this.rename(file, newName);
|
||||
return this.rename(file, newName, { ignoreSlowFileEvents: true });
|
||||
}
|
||||
|
||||
private async updateFileAction(files: RelativePath[]): Promise<void> {
|
||||
|
|
@ -346,13 +351,13 @@ export class MockAgent extends MockClient {
|
|||
await this.atomicUpdateText(file, (old) => ({
|
||||
text: old.text + ` ${content} `,
|
||||
cursors: []
|
||||
}));
|
||||
}), { ignoreSlowFileEvents: true });
|
||||
}
|
||||
|
||||
private async deleteFileAction(files: RelativePath[]): Promise<void> {
|
||||
const file = choose(files);
|
||||
this.client.logger.info(`Decided to delete file ${file}`);
|
||||
return this.delete(file);
|
||||
return this.delete(file, { ignoreSlowFileEvents: true });
|
||||
}
|
||||
|
||||
private getContent(): string {
|
||||
|
|
|
|||
|
|
@ -64,7 +64,8 @@ export class MockClient implements FileSystemOperations {
|
|||
|
||||
public async create(
|
||||
path: RelativePath,
|
||||
newContent: Uint8Array
|
||||
newContent: Uint8Array,
|
||||
{ ignoreSlowFileEvents }: { ignoreSlowFileEvents: boolean } = { ignoreSlowFileEvents: false }
|
||||
): Promise<void> {
|
||||
if (this.localFiles.has(path)) {
|
||||
throw new Error(`File ${path} already exists`);
|
||||
|
|
@ -74,9 +75,9 @@ export class MockClient implements FileSystemOperations {
|
|||
);
|
||||
this.localFiles.set(path, newContent);
|
||||
|
||||
this.executeFileOperation(async () =>
|
||||
this.executeFileOperation((async () =>
|
||||
this.client.syncLocallyCreatedFile(path)
|
||||
);
|
||||
), ignoreSlowFileEvents);
|
||||
}
|
||||
|
||||
public async createDirectory(_path: RelativePath): Promise<void> {
|
||||
|
|
@ -85,7 +86,8 @@ export class MockClient implements FileSystemOperations {
|
|||
|
||||
public async atomicUpdateText(
|
||||
path: RelativePath,
|
||||
updater: (currentContent: TextWithCursors) => TextWithCursors
|
||||
updater: (currentContent: TextWithCursors) => TextWithCursors,
|
||||
{ ignoreSlowFileEvents }: { ignoreSlowFileEvents: boolean } = { ignoreSlowFileEvents: false }
|
||||
): Promise<string> {
|
||||
const file = this.localFiles.get(path);
|
||||
if (!file) {
|
||||
|
|
@ -102,13 +104,13 @@ export class MockClient implements FileSystemOperations {
|
|||
.map((part) => part.trim());
|
||||
const newParts = newContent.split(" ").map((part) => part.trim());
|
||||
existingParts.forEach((part) =>
|
||||
// all changes should be additive
|
||||
{
|
||||
assert(
|
||||
newParts.includes(part),
|
||||
`Part ${part} not found in new content: ${newContent}`
|
||||
);
|
||||
}
|
||||
// all changes should be additive
|
||||
{
|
||||
assert(
|
||||
newParts.includes(part),
|
||||
`Part ${part} not found in new content: ${newContent}`
|
||||
);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
|
|
@ -116,11 +118,11 @@ export class MockClient implements FileSystemOperations {
|
|||
`Updated file ${path} with:\n current content: ${currentContent}\n new content: ${newContent}`
|
||||
);
|
||||
|
||||
this.executeFileOperation(async () =>
|
||||
this.executeFileOperation((async () =>
|
||||
this.client.syncLocallyUpdatedFile({
|
||||
relativePath: path
|
||||
})
|
||||
);
|
||||
), ignoreSlowFileEvents);
|
||||
|
||||
return newContent;
|
||||
}
|
||||
|
|
@ -144,20 +146,21 @@ export class MockClient implements FileSystemOperations {
|
|||
});
|
||||
}
|
||||
|
||||
public async delete(path: RelativePath): Promise<void> {
|
||||
public async delete(path: RelativePath, { ignoreSlowFileEvents }: { ignoreSlowFileEvents: boolean } = { ignoreSlowFileEvents: false }): Promise<void> {
|
||||
this.client.logger.info(
|
||||
`Deleting file: ${path} with:\n content ${new TextDecoder().decode(this.localFiles.get(path))}`
|
||||
);
|
||||
this.localFiles.delete(path);
|
||||
|
||||
this.executeFileOperation(async () =>
|
||||
this.executeFileOperation((async () =>
|
||||
this.client.syncLocallyDeletedFile(path)
|
||||
);
|
||||
), ignoreSlowFileEvents);
|
||||
}
|
||||
|
||||
public async rename(
|
||||
oldPath: RelativePath,
|
||||
newPath: RelativePath
|
||||
newPath: RelativePath,
|
||||
{ ignoreSlowFileEvents }: { ignoreSlowFileEvents: boolean } = { ignoreSlowFileEvents: false }
|
||||
): Promise<void> {
|
||||
const file = this.localFiles.get(oldPath);
|
||||
if (!file) {
|
||||
|
|
@ -172,16 +175,16 @@ export class MockClient implements FileSystemOperations {
|
|||
`Renamed file: ${oldPath} -> ${newPath} with:\n content ${new TextDecoder().decode(file)}`
|
||||
);
|
||||
|
||||
this.executeFileOperation(async () =>
|
||||
this.executeFileOperation((async () =>
|
||||
this.client.syncLocallyUpdatedFile({
|
||||
oldPath,
|
||||
relativePath: newPath
|
||||
})
|
||||
);
|
||||
), ignoreSlowFileEvents);
|
||||
}
|
||||
|
||||
private executeFileOperation(callback: () => unknown): void {
|
||||
if (this.useSlowFileEvents) {
|
||||
private executeFileOperation(callback: () => unknown, ignoreSlowFileEvents: boolean = false): void {
|
||||
if (this.useSlowFileEvents && !ignoreSlowFileEvents) {
|
||||
// we aren't the best client and it takes some time to notice changes
|
||||
setTimeout(callback, Math.random() * 100);
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
import type { SyncSettings } from "sync-client";
|
||||
import { utils } from "sync-client";
|
||||
import { utils, debugging, Logger } from "sync-client";
|
||||
import { MockAgent } from "./agent/mock-agent";
|
||||
import { sleep } from "./utils/sleep";
|
||||
import { v4 as uuidv4 } from "uuid";
|
||||
|
|
@ -13,6 +13,9 @@ let slowFileEvents = false;
|
|||
// Whether to do resets in the test runs
|
||||
let doResets = false;
|
||||
|
||||
const logger = new Logger();
|
||||
debugging.logToConsole(logger);
|
||||
|
||||
async function runTest({
|
||||
agentCount,
|
||||
concurrency,
|
||||
|
|
@ -33,11 +36,13 @@ async function runTest({
|
|||
slowFileEvents = useSlowFileEvents;
|
||||
doResets = useResets;
|
||||
|
||||
|
||||
|
||||
const settings = `with ${agentCount} agents, concurrency ${concurrency}, iterations ${iterations}, doDeletes ${doDeletes}, doResets ${useResets}, jitterScaleInSeconds ${jitterScaleInSeconds}, useSlowFileEvents ${useSlowFileEvents}`;
|
||||
console.info(`Running test ${settings}`);
|
||||
logger.info(`Running test ${settings}`);
|
||||
|
||||
const vaultName = uuidv4();
|
||||
console.info(`Using vault name: ${vaultName}`);
|
||||
logger.info(`Using vault name: ${vaultName}`);
|
||||
const initialSettings: Partial<SyncSettings> = {
|
||||
isSyncEnabled: true,
|
||||
token: " test-token-change-me ", // same as in sync-server/config-e2e.yml with spaces
|
||||
|
|
@ -64,17 +69,17 @@ async function runTest({
|
|||
await utils.awaitAll(clients.map(async (client) => client.init()));
|
||||
|
||||
for (let i = 0; i < iterations; i++) {
|
||||
console.info(`Iteration ${i + 1}/${iterations}`);
|
||||
logger.info(`Iteration ${i + 1}/${iterations}`);
|
||||
await utils.awaitAll(clients.map(async (client) => client.act()));
|
||||
await sleep(Math.random() * 200);
|
||||
}
|
||||
|
||||
console.info("Stopping agents");
|
||||
logger.info("Stopping agents");
|
||||
|
||||
// Each agent can have unpushed changes which might conflict with eachother so each has to resolve the conflicts & push, and
|
||||
for (const client of clients) {
|
||||
try {
|
||||
console.info(`Finishing up ${client.name}`);
|
||||
logger.info(`Finishing up ${client.name}`);
|
||||
await client.finish();
|
||||
} catch (err) {
|
||||
if (!slowFileEvents) {
|
||||
|
|
@ -86,7 +91,7 @@ async function runTest({
|
|||
// then we need a second pass to ensure that all agents pull the same state.
|
||||
for (const client of clients) {
|
||||
try {
|
||||
console.info(`Destroying ${client.name}`);
|
||||
logger.info(`Destroying ${client.name}`);
|
||||
await client.destroy();
|
||||
} catch (err) {
|
||||
if (!slowFileEvents) {
|
||||
|
|
@ -95,27 +100,27 @@ async function runTest({
|
|||
}
|
||||
}
|
||||
|
||||
console.info("Agents finished successfully");
|
||||
logger.info("Agents finished successfully");
|
||||
|
||||
clients.slice(0, -1).forEach((client, i) => {
|
||||
console.info(
|
||||
logger.info(
|
||||
`Checking consistency between ${client.name} and ${clients[i + 1].name}`
|
||||
);
|
||||
client.assertFileSystemsAreConsistent(clients[i]);
|
||||
console.info(`Consistency check for ${client.name} passed`);
|
||||
logger.info(`Consistency check for ${client.name} passed`);
|
||||
});
|
||||
|
||||
console.info("File systems found to be consistent");
|
||||
logger.info("File systems found to be consistent");
|
||||
|
||||
clients.forEach((client) => {
|
||||
console.info(`Checking content for ${client.name}`);
|
||||
logger.info(`Checking content for ${client.name}`);
|
||||
client.assertAllContentIsPresentOnce();
|
||||
console.info(`Content check for ${client.name} passed`);
|
||||
logger.info(`Content check for ${client.name} passed`);
|
||||
});
|
||||
|
||||
console.info(`Test passed ${settings}`);
|
||||
logger.info(`Test passed ${settings}`);
|
||||
} catch (err) {
|
||||
console.error(`Test failed ${settings}`);
|
||||
logger.error(`Test failed ${settings}`);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
|
@ -163,7 +168,7 @@ process.on("uncaughtException", (error) => {
|
|||
return;
|
||||
}
|
||||
|
||||
console.error("Uncaught exception:", error);
|
||||
logger.error(`Error - uncaught exception: ${error}`);
|
||||
process.exit(1);
|
||||
});
|
||||
|
||||
|
|
@ -191,7 +196,7 @@ process.on("unhandledRejection", (error, _promise) => {
|
|||
return;
|
||||
}
|
||||
|
||||
console.error("Unhandled rejection:", error);
|
||||
logger.error(`Error - unhandled rejection: ${error}`);
|
||||
process.exit(1);
|
||||
});
|
||||
|
||||
|
|
@ -199,7 +204,7 @@ runTests()
|
|||
.then(() => {
|
||||
process.exit(0);
|
||||
})
|
||||
.catch((err: unknown) => {
|
||||
console.error(err);
|
||||
.catch((error: unknown) => {
|
||||
logger.error(`Error - tests failed with ${error}`);
|
||||
process.exit(1);
|
||||
});
|
||||
|
|
|
|||
|
|
@ -9,24 +9,24 @@ server:
|
|||
max_clients_per_vault: 256
|
||||
response_timeout: 30m
|
||||
mergeable_file_extensions:
|
||||
- md
|
||||
- txt
|
||||
- md
|
||||
- txt
|
||||
users:
|
||||
user_configs:
|
||||
- name: admin
|
||||
token: test-token-change-me
|
||||
vault_access:
|
||||
type: allow_access_to_all
|
||||
- name: other-admin
|
||||
token: test-token-change-me2
|
||||
vault_access:
|
||||
type: allow_access_to_all
|
||||
- name: test
|
||||
token: other-test-token
|
||||
vault_access:
|
||||
type: allow_list
|
||||
allowed:
|
||||
- default
|
||||
- name: admin
|
||||
token: test-token-change-me
|
||||
vault_access:
|
||||
type: allow_access_to_all
|
||||
- name: other-admin
|
||||
token: test-token-change-me2
|
||||
vault_access:
|
||||
type: allow_access_to_all
|
||||
- name: test
|
||||
token: other-test-token
|
||||
vault_access:
|
||||
type: allow_list
|
||||
allowed:
|
||||
- default
|
||||
logging:
|
||||
log_directory: logs
|
||||
log_rotation: 7days
|
||||
|
|
|
|||
|
|
@ -104,8 +104,8 @@ impl Database {
|
|||
let connection_options = SqliteConnectOptions::new()
|
||||
.filename(file_name.clone())
|
||||
.create_if_missing(true)
|
||||
.auto_vacuum(sqlx::sqlite::SqliteAutoVacuum::Full)
|
||||
.busy_timeout(Duration::from_secs(3600))
|
||||
.auto_vacuum(sqlx::sqlite::SqliteAutoVacuum::Incremental)
|
||||
.busy_timeout(Duration::from_secs(30))
|
||||
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
|
||||
.log_slow_statements(log::LevelFilter::Warn, Duration::from_secs(30));
|
||||
|
||||
|
|
@ -130,26 +130,30 @@ impl Database {
|
|||
}
|
||||
|
||||
async fn get_connection_pool(&self, vault: &VaultId) -> Result<Pool<Sqlite>> {
|
||||
let mut pools = self.connection_pools.lock().await;
|
||||
|
||||
if !pools.contains_key(vault) {
|
||||
let pool = Self::create_vault_database(&self.config, vault).await?;
|
||||
pools.insert(
|
||||
vault.clone(),
|
||||
PoolWithTimestamp {
|
||||
pool,
|
||||
last_accessed: Instant::now(),
|
||||
},
|
||||
);
|
||||
// First, check if the pool exists without holding the lock during creation
|
||||
{
|
||||
let mut pools = self.connection_pools.lock().await;
|
||||
if let Some(pool_with_timestamp) = pools.get_mut(vault) {
|
||||
pool_with_timestamp.last_accessed = Instant::now();
|
||||
return Ok(pool_with_timestamp.pool.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// Create the pool outside of the lock to avoid blocking other vaults
|
||||
// Note: This may result in multiple pools being created for the same vault
|
||||
// under high concurrency, but only one will be kept
|
||||
let new_pool = Self::create_vault_database(&self.config, vault).await?;
|
||||
|
||||
// Re-acquire lock and insert (or use existing if another task created it)
|
||||
let mut pools = self.connection_pools.lock().await;
|
||||
let pool_with_timestamp = pools
|
||||
.get_mut(vault)
|
||||
.expect("Pool was just inserted or already exists");
|
||||
.entry(vault.clone())
|
||||
.or_insert_with(|| PoolWithTimestamp {
|
||||
pool: new_pool.clone(),
|
||||
last_accessed: Instant::now(),
|
||||
});
|
||||
|
||||
// Update last accessed time
|
||||
pool_with_timestamp.last_accessed = Instant::now();
|
||||
|
||||
Ok(pool_with_timestamp.pool.clone())
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue