Improve sync logic

This commit is contained in:
Andras Schmelczer 2024-12-20 16:19:10 +00:00
parent ee76a6e26e
commit 359571a2a0
No known key found for this signature in database
GPG key ID: FC8F2C3D3D1A718C
8 changed files with 487 additions and 304 deletions

View file

@ -23,6 +23,15 @@
- `cargo install sqlx-cli`
## cut new version
```sh
cd plugin
npm version patch
git tag -a 0.0.2 -m "0.0.2"
git push origin 0.0.2
```
@ -34,7 +43,7 @@
- e2e tests
- add clap
- add auth middleware
- add request logs
- run eslint in ci
- CI for:
- publish reconcile
@ -56,3 +65,10 @@ question_mark_used = { level = "allow", priority = 1 }
implicit_return = { level = "allow", priority = 1 }
pedantic = { level = "warn", priority = 0 }
cargo = { level = "warn", priority = 0 }
reset should reset counters
access logs
retry
mem usage

View file

@ -1,41 +1,36 @@
import { Editor, MarkdownView, Plugin, WorkspaceLeaf } from "obsidian";
import type { WorkspaceLeaf } from "obsidian";
import { Plugin } from "obsidian";
import * as lib from "../../backend/sync_lib/pkg/sync_lib.js";
import * as wasmBin from "../../backend/sync_lib/pkg/sync_lib_bg.wasm";
import { SyncSettingsTab } from "./views/settings-tab";
import { SyncView } from "./views/sync-view";
import { Logger } from "./logger";
import { SyncEventHandler } from "./events/sync-event-handler";
import { ObsidianFileEventHandler } from "./events/obisidan-event-handler.js";
import { SyncService } from "./services/sync-service";
import { Database } from "./database/database";
import { applyRemoteChangesLocally } from "./sync-operations/apply-remote-changes-locally";
import { ObsidianFileOperations } from "./file-operations/obsidian-file-operations";
import { applyLocalChangesRemotely } from "./sync-operations/apply-local-changes-remotely";
import { StatusBar } from "./views/status-bar";
import { Logger } from "./tracing/logger.js";
import { SyncHistory } from "./tracing/sync-history.js";
export default class SyncPlugin extends Plugin {
private remoteListenerIntervalId: number | null = null;
private operations = new ObsidianFileOperations(this.app.vault);
private readonly operations = new ObsidianFileOperations(this.app.vault);
private readonly history = new SyncHistory();
async onload() {
Logger.getInstance().info('Starting plugin "Sample Plugin"');
public async onload(): Promise<void> {
Logger.getInstance().info("Starting plugin");
await lib.default(
Promise.resolve(
(wasmBin as any).default // eslint-disable-line @typescript-eslint/no-explicit-any
// eslint-disable-next-line
(wasmBin as any).default
)
);
this.addCommand({
id: "sample-editor-command",
name: "Sample editor command",
editorCallback: (editor: Editor, _view: MarkdownView) => {
console.log(editor.getSelection());
editor.replaceSelection("Sample Editor Command");
},
});
const database = new Database(
await this.loadData(),
this.saveData.bind(this)
@ -43,19 +38,28 @@ export default class SyncPlugin extends Plugin {
const syncServer = new SyncService(database);
new StatusBar(this, syncServer);
new StatusBar(this, this.history);
this.addSettingTab(
new SyncSettingsTab(this.app, this, database, syncServer)
new SyncSettingsTab(
this.app,
this,
database,
syncServer,
this.history
)
);
const eventHandler = new SyncEventHandler(
const eventHandler = new ObsidianFileEventHandler(
database,
syncServer,
this.operations
this.operations,
this.history
);
this.app.workspace.onLayoutReady(() => {
this.app.workspace.onLayoutReady(async () => {
Logger.getInstance().info("Initialising sync handlers");
[
this.app.vault.on(
"create",
@ -73,9 +77,18 @@ export default class SyncPlugin extends Plugin {
"rename",
eventHandler.onRename.bind(eventHandler)
),
].forEach((event) => this.registerEvent(event));
].forEach((event) => {
this.registerEvent(event);
});
applyLocalChangesRemotely(database, syncServer, this.operations);
await applyLocalChangesRemotely({
database,
syncServer,
operations: this.operations,
history: this.history,
});
Logger.getInstance().info("Sync handlers initialised");
});
this.registerRemoteEventListener(
@ -83,7 +96,9 @@ export default class SyncPlugin extends Plugin {
syncServer,
database.getSettings().fetchChangesUpdateIntervalMs
);
database.addOnSettingsChangeHandlers((settings, oldSettings) => {
// eslint-disable-next-line @typescript-eslint/no-misused-promises
database.addOnSettingsChangeHandlers(async (settings, oldSettings) => {
this.registerRemoteEventListener(
database,
syncServer,
@ -91,11 +106,12 @@ export default class SyncPlugin extends Plugin {
);
if (!oldSettings.isSyncEnabled && settings.isSyncEnabled) {
applyLocalChangesRemotely(
database,
await applyLocalChangesRemotely({
database: database,
syncServer,
this.operations
);
operations: this.operations,
history: this.history,
});
}
});
@ -104,12 +120,20 @@ export default class SyncPlugin extends Plugin {
const ribbonIconEl = this.addRibbonIcon(
"dice",
"Sample Plugin",
(_: MouseEvent) => this.activateView()
async (_: MouseEvent) => this.activateView()
);
ribbonIconEl.addClass("my-plugin-ribbon-class");
Logger.getInstance().info("Plugin loaded");
}
async activateView() {
public onunload(): void {
if (this.remoteListenerIntervalId !== null) {
window.clearInterval(this.remoteListenerIntervalId);
}
}
private async activateView(): Promise<void> {
const { workspace } = this.app;
let leaf: WorkspaceLeaf | null = null;
@ -117,21 +141,17 @@ export default class SyncPlugin extends Plugin {
if (leaves.length > 0) {
// A leaf with our view already exists, use that
leaf = leaves[0];
[leaf] = leaves;
} else {
// Our view could not be found in the workspace, create a new leaf
// in the right sidebar for it
// In the right sidebar for it
leaf = workspace.getRightLeaf(false);
await leaf?.setViewState({ type: SyncView.TYPE, active: true });
}
// "Reveal" the leaf in case it is in a collapsed sidebar
workspace.revealLeaf(leaf!);
}
onunload(): void {
if (this.remoteListenerIntervalId) {
window.clearInterval(this.remoteListenerIntervalId);
if (leaf) {
await workspace.revealLeaf(leaf);
}
}
@ -139,18 +159,20 @@ export default class SyncPlugin extends Plugin {
database: Database,
syncServer: SyncService,
intervalMs: number
) {
if (this.remoteListenerIntervalId) {
): void {
if (this.remoteListenerIntervalId !== null) {
window.clearInterval(this.remoteListenerIntervalId);
}
this.remoteListenerIntervalId = window.setInterval(
() =>
applyRemoteChangesLocally(
// eslint-disable-next-line @typescript-eslint/no-misused-promises
async () =>
applyRemoteChangesLocally({
database,
syncServer,
this.operations
),
operations: this.operations,
history: this.history,
}),
intervalMs
);
}

View file

@ -1,131 +1,130 @@
import { Database } from "../database/database";
import { SyncService } from "../services/sync-service";
import { Logger } from "../logger";
import { FileOperations } from "../file-operations/file-operations";
import type { Database } from "../database/database";
import type { SyncService } from "../services/sync-service";
import type { FileOperations } from "../file-operations/file-operations";
import { syncLocallyCreatedFile } from "./sync-locally-created-file";
import { EMPTY_HASH, hash } from "src/utils/hash";
import { syncLocallyUpdatedFile } from "./sync-locally-updated-file";
import { syncLocallyDeletedFile } from "./sync-locally-deleted-file";
import { Notice } from "obsidian";
import PQueue from "p-queue";
import { Logger } from "src/tracing/logger";
import type { SyncHistory } from "src/tracing/sync-history";
let isRunning = false;
export interface Progress {
processedFiles: number;
totalFiles: number;
}
export async function applyLocalChangesRemotely(
database: Database,
syncServer: SyncService,
operations: FileOperations
) {
console.log("applyLocalChangesRemotely");
export async function applyLocalChangesRemotely({
database,
syncServer,
operations,
history,
}: {
database: Database;
syncServer: SyncService;
operations: FileOperations;
history: SyncHistory;
}): Promise<void> {
if (isRunning) {
Logger.getInstance().info("Push sync already in progress, skipping");
Logger.getInstance().debug(
"Uploading local changes is already in progress, skipping"
);
return;
}
let tasks: Promise<void>[] = [];
isRunning = true;
try {
const tasks: Promise<void>[] = [];
const allLocalFiles = await operations.listAllFiles();
console.log(allLocalFiles);
const deletedFiles = [...database.getDocuments().entries()].filter(
([path, _]) => !allLocalFiles.includes(path)
);
const allLocalFiles = await operations.listAllFiles();
const locallyDeletedFiles = [
...database.getDocuments().entries(),
].filter(([path, _]) => !allLocalFiles.includes(path));
console.log(deletedFiles);
const promiseQueue = new PQueue({
concurrency: 1,
});
await Promise.all(
allLocalFiles.map((path) =>
promiseQueue.add(async () => {
const syncedState = database.getDocument(path);
if (!syncedState) {
Logger.getInstance().info(
`Document ${path} not found in database`
);
await Promise.all(
allLocalFiles.map(async (path) => {
const metadata = database.getDocument(path);
if (!metadata) {
const contentHash = hash(await operations.read(path));
if (contentHash != EMPTY_HASH) {
const match = deletedFiles.find(
([path, doc]) => doc.hash === contentHash
const match = locallyDeletedFiles.find(
([_, document]) => document.hash === contentHash
);
if (contentHash != EMPTY_HASH && match) {
locallyDeletedFiles.remove(match);
Logger.getInstance().debug(
`Document ${path} not found in database but found under a different path ${match[0]}, scheduling sync to update it`
);
if (match) {
const oldPath = match[0];
Logger.getInstance().info(
`Document ${path} found remotely under a different path (${oldPath}), moving`
);
tasks.push(
syncLocallyUpdatedFile({
database,
syncServer,
operations,
oldPath,
filePath: path,
updateTime:
await operations.getModificationTime(
path
),
})
);
deletedFiles.remove(match);
return;
}
}
tasks.push(
syncLocallyCreatedFile({
return syncLocallyUpdatedFile({
database,
syncServer,
operations,
history,
oldPath: match[0],
relativePath: path,
updateTime: await operations.getModificationTime(
path
),
filePath: path,
})
});
}
Logger.getInstance().debug(
`Document ${path} not found in database, scheduling sync to create it`
);
return;
return syncLocallyCreatedFile({
database,
syncServer,
operations,
history,
updateTime: await operations.getModificationTime(path),
relativePath: path,
});
}
const content = await operations.read(path);
if (syncedState.hash !== hash(content)) {
Logger.getInstance().info(
`Document ${path} has local changes, updating`
if (metadata.hash !== hash(content)) {
Logger.getInstance().debug(
`Document ${path} has been updated locally, scheduling sync to update it`
);
tasks.push(
syncLocallyUpdatedFile({
database,
syncServer,
operations,
filePath: path,
updateTime: await operations.getModificationTime(
path
),
})
);
return;
return syncLocallyUpdatedFile({
database,
syncServer,
operations,
history,
relativePath: path,
updateTime: await operations.getModificationTime(path),
});
}
})
)
);
deletedFiles.forEach(([relativePath, _]) => {
Logger.getInstance().info(
`Document ${relativePath} deleted locally, deleting`
return Promise.resolve();
})
);
tasks.push(
syncLocallyDeletedFile({
database,
syncServer,
relativePath,
...locallyDeletedFiles.map(async ([relativePath, _]) => {
Logger.getInstance().debug(
`Document ${relativePath} has been deleted locally, scheduling sync to delete it`
);
return syncLocallyDeletedFile({
database,
syncServer,
history,
relativePath,
});
})
);
});
await Promise.all(tasks);
new Notice("Local changes synced remotely");
try {
await Promise.all(tasks);
Logger.getInstance().info(
`All local changes have been applied remotely`
);
return;
} catch {
await Promise.allSettled(tasks);
Logger.getInstance().error(
`Not all local changes have been applied remotely`
);
}
} finally {
isRunning = false;
}
}

View file

@ -1,29 +1,37 @@
import { Database } from "src/database/database";
import { FileOperations } from "src/file-operations/file-operations";
import { Logger } from "src/logger";
import { SyncService } from "src/services/sync-service";
import type { Database } from "src/database/database";
import type { FileOperations } from "src/file-operations/file-operations";
import type { SyncService } from "src/services/sync-service";
import { syncRemotelyUpdatedFile } from "./sync-remotely-updated-file";
import { Logger } from "src/tracing/logger";
import type { SyncHistory } from "src/tracing/sync-history";
let isRunning = false;
export async function applyRemoteChangesLocally(
database: Database,
syncServer: SyncService,
operations: FileOperations
) {
if (isRunning) {
Logger.getInstance().info("Pull sync already in progress, skipping");
export async function applyRemoteChangesLocally({
database,
syncServer,
operations,
history,
}: {
database: Database;
syncServer: SyncService;
operations: FileOperations;
history: SyncHistory;
}): Promise<void> {
if (!database.getSettings().isSyncEnabled) {
Logger.getInstance().debug(
`Syncing is disabled, not fetching remote changes`
);
return;
} else if (isRunning) {
Logger.getInstance().debug(
"Applying remote changes locally is already in progress, skipping invocation"
);
return;
} else {
Logger.getInstance().info("Starting pull sync");
}
isRunning = true;
try {
if (!database.getSettings().isSyncEnabled) {
return;
}
const remote = await syncServer.getAll(database.getLastSeenUpdateId());
if (remote.latestDocuments.length === 0) {
@ -34,11 +42,12 @@ export async function applyRemoteChangesLocally(
Logger.getInstance().info("Applying remote changes locally");
await Promise.all(
remote.latestDocuments.map((remoteDocument) =>
remote.latestDocuments.map(async (remoteDocument) =>
syncRemotelyUpdatedFile({
database,
syncServer,
operations: operations,
history,
operations,
remoteVersion: remoteDocument,
})
)

View file

@ -1,57 +1,91 @@
import * as lib from "../../../backend/sync_lib/pkg/sync_lib.js";
import { Database } from "src/database/database";
import { Logger } from "src/logger";
import { SyncService } from "src/services/sync-service";
import type { Database } from "src/database/database";
import type { SyncService } from "src/services/sync-service";
import { hash } from "src/utils/hash";
import { unlockDocument, waitForDocumentLock } from "./locks";
import { FileOperations } from "src/file-operations/file-operations";
import { RelativePath } from "src/database/document-metadata";
import type { FileOperations } from "src/file-operations/file-operations";
import type { RelativePath } from "src/database/document-metadata";
import type { SyncHistory } from "src/tracing/sync-history.js";
import { SyncSource, SyncStatus, SyncType } from "src/tracing/sync-history.js";
import { Logger } from "src/tracing/logger.js";
/// This can be used when updating a files content and/or path.
export async function syncLocallyCreatedFile({
database,
syncServer,
operations,
history,
updateTime,
filePath,
relativePath,
}: {
database: Database;
syncServer: SyncService;
operations: FileOperations;
history: SyncHistory;
updateTime: Date;
filePath: RelativePath;
relativePath: RelativePath;
}): Promise<void> {
await waitForDocumentLock(filePath);
if (!database.getSettings().isSyncEnabled) {
Logger.getInstance().info(
`Syncing is disabled, not syncing ${relativePath}`
);
return;
}
Logger.getInstance().debug(`Syncing ${relativePath}`);
await waitForDocumentLock(relativePath);
try {
const metadata = database.getDocument(filePath);
const metadata = database.getDocument(relativePath);
if (metadata) {
throw new Error(
`Document metadata found for ${filePath}, this is unexpected`
`Document metadata found for ${relativePath}, this is unexpected. Consider resetting the plugin's sync history.`
);
}
const contentBytes = await operations.read(filePath);
const response = await syncServer.create({
relativePath: filePath,
contentBytes,
createdDate: updateTime,
const contentBytes = await operations.read(relativePath),
contentHash = hash(contentBytes),
response = await syncServer.create({
relativePath,
contentBytes,
createdDate: updateTime,
});
history.addHistoryEntry({
status: SyncStatus.SUCCESS,
source: SyncSource.PUSH,
relativePath,
message: `Successfully uploaded locally created file`,
type: SyncType.CREATE,
});
const responseBytes = lib.base64_to_bytes(response.contentBase64);
await operations.write(filePath, contentBytes, responseBytes);
const responseBytes = lib.base64_to_bytes(response.contentBase64),
responseHash = hash(responseBytes);
if (contentHash !== responseHash) {
await operations.write(relativePath, contentBytes, responseBytes);
history.addHistoryEntry({
status: SyncStatus.SUCCESS,
source: SyncSource.PULL,
relativePath,
message: `The file we created locally has already existed remotely, so we have merged them`,
type: SyncType.UPDATE,
});
}
await database.setDocument({
documentId: response.documentId,
relativePath: response.relativePath,
parentVersionId: response.vaultUpdateId,
hash: hash(responseBytes),
hash: responseHash,
});
} catch (e) {
Logger.getInstance().error(
`Failed to sync locally updated file ${filePath}: ${e}`
);
history.addHistoryEntry({
status: SyncStatus.ERROR,
relativePath,
message: `Failed to reconcile locally created file: ${e}`,
type: SyncType.CREATE,
});
throw e;
} finally {
unlockDocument(filePath);
unlockDocument(relativePath);
}
}

View file

@ -1,26 +1,41 @@
import { Database } from "src/database/database";
import { RelativePath } from "src/database/document-metadata";
import { Logger } from "src/logger";
import { SyncService } from "src/services/sync-service";
import type { Database } from "src/database/database";
import type { RelativePath } from "src/database/document-metadata";
import type { SyncService } from "src/services/sync-service";
import { unlockDocument, waitForDocumentLock } from "./locks";
import { Logger } from "src/tracing/logger";
import type { SyncHistory } from "src/tracing/sync-history";
import { SyncSource, SyncStatus, SyncType } from "src/tracing/sync-history";
export async function syncLocallyDeletedFile({
database,
syncServer,
history,
relativePath,
}: {
database: Database;
syncServer: SyncService;
history: SyncHistory;
relativePath: RelativePath;
}): Promise<void> {
if (!database.getSettings().isSyncEnabled) {
Logger.getInstance().info(
`Syncing is disabled, not syncing ${relativePath}`
);
return;
}
Logger.getInstance().debug(`Syncing ${relativePath}`);
await waitForDocumentLock(relativePath);
try {
const metadata = database.getDocument(relativePath);
if (!metadata) {
Logger.getInstance().warn(
`Document metadata not found for ${relativePath}`
);
history.addHistoryEntry({
status: SyncStatus.NO_OP,
relativePath,
message: `Locally deleted file hasn't been uploaded yet, so there's no need to delete it on the remote server`,
type: SyncType.DELETE,
});
return;
}
@ -32,10 +47,22 @@ export async function syncLocallyDeletedFile({
});
await database.removeDocument(relativePath);
history.addHistoryEntry({
status: SyncStatus.SUCCESS,
source: SyncSource.PUSH,
relativePath,
message: `Successfully deleted locally deleted file on the remote server`,
type: SyncType.DELETE,
});
} catch (e) {
Logger.getInstance().error(
`Failed to sync locally deleted file ${relativePath}: ${e}`
);
history.addHistoryEntry({
status: SyncStatus.ERROR,
relativePath,
message: `Failed to remotely delete locally deleted file: ${e}`,
type: SyncType.DELETE,
});
throw e;
} finally {
unlockDocument(relativePath);
}

View file

@ -1,71 +1,103 @@
import * as lib from "../../../backend/sync_lib/pkg/sync_lib.js";
import { Database } from "src/database/database";
import { Logger } from "src/logger";
import { SyncService } from "src/services/sync-service";
import type { Database } from "src/database/database";
import type { SyncService } from "src/services/sync-service";
import { hash } from "src/utils/hash";
import { unlockDocument, waitForDocumentLock } from "./locks";
import { FileOperations } from "src/file-operations/file-operations";
import { RelativePath } from "src/database/document-metadata";
import type { FileOperations } from "src/file-operations/file-operations";
import type { RelativePath } from "src/database/document-metadata";
import { Logger } from "src/tracing/logger.js";
import type { SyncHistory } from "src/tracing/sync-history.js";
import { SyncSource, SyncStatus, SyncType } from "src/tracing/sync-history.js";
/// This can be used when updating a files content and/or path.
/// This can be used when updating a file's content and/or path.
export async function syncLocallyUpdatedFile({
database,
syncServer,
operations,
history,
updateTime,
filePath,
relativePath,
oldPath,
}: {
database: Database;
syncServer: SyncService;
operations: FileOperations;
history: SyncHistory;
updateTime: Date;
filePath: RelativePath;
relativePath: RelativePath;
oldPath?: RelativePath;
}): Promise<void> {
await waitForDocumentLock(filePath);
if (!database.getSettings().isSyncEnabled) {
Logger.getInstance().info(
`Syncing is disabled, not syncing ${relativePath}`
);
return;
}
Logger.getInstance().debug(`Syncing ${relativePath}`);
await waitForDocumentLock(relativePath);
try {
const metadata = database.getDocument(oldPath || filePath);
const metadata = database.getDocument(oldPath ?? relativePath);
if (!metadata) {
throw new Error(`Document metadata not found for ${filePath}`);
throw new Error(
`Document metadata not found for ${relativePath}. Consider resetting the plugin's sync history.`
);
}
const contentBytes = await operations.read(filePath);
const contentHash = hash(contentBytes);
const contentBytes = await operations.read(relativePath),
contentHash = hash(contentBytes);
if (metadata.hash === contentHash && !oldPath) {
Logger.getInstance().info(
`Document hash matches, no need to sync ${filePath}`
);
if (metadata.hash === contentHash && oldPath !== undefined) {
history.addHistoryEntry({
status: SyncStatus.NO_OP,
relativePath,
message: `File hash matches with last synced version, no need to sync`,
type: SyncType.UPDATE,
});
return;
}
const response = await syncServer.put({
documentId: metadata.documentId,
parentVersionId: metadata.parentVersionId,
relativePath: filePath,
relativePath,
contentBytes,
createdDate: updateTime,
});
if (response.isDeleted) {
await operations.remove(oldPath || filePath);
history.addHistoryEntry({
status: SyncStatus.SUCCESS,
source: SyncSource.PUSH,
relativePath,
message: `Successfully uploaded locally updated file to the remote server`,
type: SyncType.UPDATE,
});
if (metadata) {
await database.removeDocument(oldPath || filePath);
}
if (response.isDeleted) {
await operations.remove(oldPath ?? relativePath);
await database.removeDocument(oldPath ?? relativePath);
history.addHistoryEntry({
status: SyncStatus.SUCCESS,
source: SyncSource.PULL,
relativePath,
message:
"The file we tried to update had been deleted remotely, therefore, we have deleted it locally",
type: SyncType.DELETE,
});
return;
}
const responseBytes = lib.base64_to_bytes(response.contentBase64);
if (response.relativePath != filePath) {
if (response.relativePath != relativePath) {
await waitForDocumentLock(response.relativePath);
try {
await operations.move(
oldPath || filePath,
oldPath ?? relativePath,
response.relativePath
);
await operations.write(
@ -73,25 +105,37 @@ export async function syncLocallyUpdatedFile({
contentBytes,
responseBytes
);
history.addHistoryEntry({
status: SyncStatus.SUCCESS,
source: SyncSource.PULL,
relativePath,
message:
"The file we updated had been moved remotely, therefore, we have moved it locally as well",
type: SyncType.UPDATE,
});
} finally {
unlockDocument(response.relativePath);
}
} else {
await operations.write(filePath, contentBytes, responseBytes);
await operations.write(relativePath, contentBytes, responseBytes);
}
await database.moveDocument({
documentId: metadata.documentId,
oldRelativePath: oldPath || filePath,
oldRelativePath: oldPath ?? relativePath,
relativePath: response.relativePath,
parentVersionId: response.vaultUpdateId,
hash: contentHash,
});
} catch (e) {
Logger.getInstance().error(
`Failed to sync locally updated file ${filePath}: ${e}`
);
history.addHistoryEntry({
status: SyncStatus.ERROR,
relativePath,
message: `Failed to reconcile locally updated file: ${e}`,
type: SyncType.UPDATE,
});
throw e;
} finally {
unlockDocument(filePath);
unlockDocument(relativePath);
}
}

View file

@ -1,110 +1,142 @@
import { Database } from "src/database/database";
import type { Database } from "src/database/database";
import { unlockDocument, waitForDocumentLock } from "./locks";
import { SyncService } from "src/services/sync-service";
import type { SyncService } from "src/services/sync-service";
import * as lib from "../../../backend/sync_lib/pkg/sync_lib.js";
import { hash } from "src/utils/hash";
import { Logger } from "src/logger";
import { components } from "src/services/types";
import { FileOperations } from "src/file-operations/file-operations";
import type { components } from "src/services/types";
import type { FileOperations } from "src/file-operations/file-operations";
import { Logger } from "src/tracing/logger";
import type { SyncHistory } from "src/tracing/sync-history";
import { SyncSource, SyncStatus, SyncType } from "src/tracing/sync-history";
export async function syncRemotelyUpdatedFile({
database,
syncServer,
operations,
history,
remoteVersion,
}: {
database: Database;
syncServer: SyncService;
operations: FileOperations;
history: SyncHistory;
remoteVersion: components["schemas"]["DocumentVersionWithoutContent"];
}): Promise<void> {
Logger.getInstance().info(
Logger.getInstance().debug(
`Syncing remotely updated file ${remoteVersion.relativePath}`
);
const content = (
await syncServer.get({
documentId: remoteVersion.documentId,
})
).contentBase64;
const currentVersion = database.getDocumentByDocumentId(
remoteVersion.documentId
);
try {
const content = (
await syncServer.get({
documentId: remoteVersion.documentId,
})
).contentBase64,
currentVersion = database.getDocumentByDocumentId(
remoteVersion.documentId
);
if (!currentVersion) {
if (remoteVersion.isDeleted) {
if (!currentVersion) {
if (remoteVersion.isDeleted) {
history.addHistoryEntry({
status: SyncStatus.NO_OP,
source: SyncSource.PULL,
relativePath: remoteVersion.relativePath,
message: `Remotely deleted file hasn't been synced yet, so there's no need to delete it locally`,
type: SyncType.DELETE,
});
return;
}
await waitForDocumentLock(remoteVersion.relativePath);
try {
const contentBytes = lib.base64_to_bytes(content);
await operations.create(
remoteVersion.relativePath,
contentBytes
);
await database.setDocument({
documentId: remoteVersion.documentId,
relativePath: remoteVersion.relativePath,
parentVersionId: remoteVersion.vaultUpdateId,
hash: hash(contentBytes),
});
history.addHistoryEntry({
status: SyncStatus.SUCCESS,
source: SyncSource.PULL,
relativePath: remoteVersion.relativePath,
message: `Successfully downloaded remote file which hasn't existed locally`,
type: SyncType.CREATE,
});
} finally {
unlockDocument(remoteVersion.relativePath);
}
return;
}
Logger.getInstance().info(
`Document metadata not found for ${remoteVersion.relativePath}, it must be new`
);
const [relativePath, metadata] = currentVersion;
await waitForDocumentLock(relativePath);
await waitForDocumentLock(remoteVersion.relativePath);
try {
const contentBytes = lib.base64_to_bytes(content);
operations.create(remoteVersion.relativePath, contentBytes);
await database.setDocument({
documentId: remoteVersion.documentId,
relativePath: remoteVersion.relativePath,
parentVersionId: remoteVersion.vaultUpdateId,
hash: hash(contentBytes),
});
} finally {
unlockDocument(remoteVersion.relativePath);
}
return;
}
const [relativePath, metadata] = currentVersion;
await waitForDocumentLock(relativePath);
try {
if (remoteVersion.isDeleted) {
Logger.getInstance().info(
`Document ${relativePath} has been deleted remotely`
);
await operations.remove(relativePath);
if (metadata) {
if (remoteVersion.isDeleted) {
await operations.remove(relativePath);
await database.removeDocument(relativePath);
}
} else {
const currentContent = await operations.read(relativePath);
const currentHash = hash(currentContent);
if (currentHash !== metadata.hash) {
Logger.getInstance().info(
`Document ${relativePath} has been updated both remotely and locally, skipping`
);
return;
} else {
if (relativePath !== remoteVersion.relativePath) {
await operations.move(
relativePath,
remoteVersion.relativePath
);
}
const contentBytes = lib.base64_to_bytes(content);
await operations.write(
remoteVersion.relativePath,
currentContent,
contentBytes
);
await database.moveDocument({
documentId: remoteVersion.documentId,
oldRelativePath: relativePath,
history.addHistoryEntry({
status: SyncStatus.SUCCESS,
source: SyncSource.PULL,
relativePath: remoteVersion.relativePath,
parentVersionId: remoteVersion.vaultUpdateId,
hash: metadata.hash,
message: `Successfully deleted remotely deleted file locally`,
type: SyncType.DELETE,
});
} else {
const currentContent = await operations.read(relativePath),
currentHash = hash(currentContent);
if (currentHash !== metadata.hash) {
Logger.getInstance().info(
`Document ${relativePath} has been updated both remotely and locally, skipping until the event is processed`
);
} else {
if (relativePath !== remoteVersion.relativePath) {
await operations.move(
relativePath,
remoteVersion.relativePath
);
}
const contentBytes = lib.base64_to_bytes(content);
await operations.write(
remoteVersion.relativePath,
currentContent,
contentBytes
);
await database.moveDocument({
documentId: remoteVersion.documentId,
oldRelativePath: relativePath,
relativePath: remoteVersion.relativePath,
parentVersionId: remoteVersion.vaultUpdateId,
hash: metadata.hash,
});
history.addHistoryEntry({
status: SyncStatus.SUCCESS,
source: SyncSource.PULL,
relativePath: remoteVersion.relativePath,
message: `Successfully updated remotely updated file locally`,
type: SyncType.UPDATE,
});
}
}
} finally {
unlockDocument(relativePath);
}
} catch (e) {
Logger.getInstance().error(
`Failed to sync remotely updated file ${remoteVersion.relativePath}: ${e}`
);
} finally {
unlockDocument(relativePath);
history.addHistoryEntry({
status: SyncStatus.ERROR,
source: SyncSource.PULL,
relativePath: remoteVersion.relativePath,
message: `Failed to reconcile remotely updated file: ${e}`,
});
throw e;
}
}