Basic syncing in the plugin

This commit is contained in:
Andras Schmelczer 2024-12-15 15:47:08 +00:00
parent dfdf1d016b
commit d088d42a65
No known key found for this signature in database
GPG key ID: FC8F2C3D3D1A718C
17 changed files with 560 additions and 178 deletions

View file

@ -0,0 +1,54 @@
import { Vault } from "obsidian";
import { Database } from "./database/database";
import { SyncServer } from "./services/sync_service";
import { syncRemotelyUpdatedFile } from "./sync-operations/sync-remotely-updated-file";
import { Logger } from "./logger";
import { FileOperations } from "./file-operations/file-operations";
let isRunning = false;
export async function applyRemoteChangesLocally(
database: Database,
syncServer: SyncServer,
operations: FileOperations
) {
if (isRunning) {
Logger.getInstance().info("Sync already in progress, skipping");
return;
}
isRunning = true;
try {
if (!database.getSettings().isSyncEnabled) {
return;
}
const remote = await syncServer.getAll(database.getLastSeenUpdateId());
if (remote.latestDocuments.length === 0) {
Logger.getInstance().debug("No remote changes to apply");
return;
}
Logger.getInstance().info("Applying remote changes locally");
await Promise.all(
remote.latestDocuments.map((remoteDocument) =>
syncRemotelyUpdatedFile({
database,
syncServer,
operations: operations,
remoteVersion: remoteDocument,
})
)
);
await database.setLastSeenUpdateId(remote.lastUpdateId);
} catch (e) {
Logger.getInstance().error(
`Failed to apply remote changes locally: ${e}`
);
} finally {
isRunning = false;
}
}

View file

@ -112,6 +112,14 @@ export class Database {
await this.save();
}
public getDocumentByDocumentId(
documentId: DocumentId
): [RelativePath, DocumentMetadata] | undefined {
return [...this._documents.entries()].find(
([_, metadata]) => metadata.documentId === documentId
);
}
public async setDocument({
documentId,
relativePath,

View file

@ -3,13 +3,16 @@ import { FileEventHandler } from "./file-event-handler";
import { Logger } from "src/logger";
import { SyncServer } from "src/services/sync_service";
import { Database } from "src/database/database";
import { syncLocallyDeletedFile } from "src/sync-functions/sync-locally-deleted-file";
import { syncLocallyUpdatedFile } from "src/sync-functions/sync-locally-updated-file";
import { syncLocallyDeletedFile } from "src/sync-operations/sync-locally-deleted-file";
import { syncLocallyUpdatedFile } from "src/sync-operations/sync-locally-updated-file";
import { FileOperations } from "src/file-operations/file-operations";
import { syncLocallyCreatedFile } from "src/sync-operations/sync-locally-created-file";
export class SyncEventHandler implements FileEventHandler {
public constructor(
private database: Database,
private syncServer: SyncServer
private syncServer: SyncServer,
private operations: FileOperations
) {}
async onCreate(file: TAbstractFile): Promise<void> {
@ -23,10 +26,12 @@ export class SyncEventHandler implements FileEventHandler {
return;
}
await syncLocallyUpdatedFile({
await syncLocallyCreatedFile({
database: this.database,
syncServer: this.syncServer,
file,
operations: this.operations,
updateTime: new Date(file.stat.ctime),
filePath: file.path,
});
} else {
Logger.getInstance().info(`Folder created: ${file.path}, ignored`);
@ -70,7 +75,9 @@ export class SyncEventHandler implements FileEventHandler {
await syncLocallyUpdatedFile({
database: this.database,
syncServer: this.syncServer,
file,
operations: this.operations,
updateTime: new Date(file.stat.ctime),
filePath: file.path,
oldPath,
});
} else {
@ -94,7 +101,9 @@ export class SyncEventHandler implements FileEventHandler {
await syncLocallyUpdatedFile({
database: this.database,
syncServer: this.syncServer,
file,
operations: this.operations,
updateTime: new Date(file.stat.ctime),
filePath: file.path,
});
} else {
Logger.getInstance().info(`Folder modified: ${file.path}, ignored`);

View file

@ -0,0 +1,18 @@
import { RelativePath } from "src/database/document-metadata";
export interface FileOperations {
read(path: RelativePath): Promise<Uint8Array>;
create(path: RelativePath, newContent: Uint8Array): Promise<void>;
// Writes new content to the file at the given path. If the file's content has changed since the expectedContent was read, the write will merge the changes.
write(
path: RelativePath,
expectedContent: Uint8Array,
newContent: Uint8Array
): Promise<Uint8Array>;
remove(path: RelativePath): Promise<void>;
move(oldPath: RelativePath, newPath: RelativePath): Promise<void>;
}

View file

@ -0,0 +1,71 @@
import { normalizePath, Vault } from "obsidian";
import { FileOperations } from "./file-operations";
import * as lib from "../../../backend/sync_lib/pkg/sync_lib.js";
import { isEqualBytes } from "src/utils/is-equal-bytes";
import { RelativePath } from "src/database/document-metadata";
export class ObsidianFileOperations implements FileOperations {
public constructor(private vault: Vault) {}
async read(path: RelativePath): Promise<Uint8Array> {
return new Uint8Array(
await this.vault.adapter.readBinary(normalizePath(path))
);
}
async write(
path: RelativePath,
expectedContent: Uint8Array,
newContent: Uint8Array
): Promise<Uint8Array> {
if (!(await this.vault.adapter.exists(normalizePath(path)))) {
return new Uint8Array(0);
}
const currentContent = await this.read(path);
if (!isEqualBytes(currentContent, expectedContent)) {
const result = lib.merge(
expectedContent,
currentContent,
newContent
);
await this.vault.adapter.writeBinary(normalizePath(path), result);
return result;
} else {
await this.vault.adapter.writeBinary(
normalizePath(path),
newContent
);
return newContent;
}
}
async create(path: RelativePath, newContent: Uint8Array): Promise<void> {
if (await this.vault.adapter.exists(normalizePath(path))) {
await this.write(path, new Uint8Array(0), newContent);
return;
}
await this.vault.adapter.writeBinary(normalizePath(path), newContent);
}
async remove(path: RelativePath): Promise<void> {
if (await this.vault.adapter.exists(normalizePath(path))) {
return this.vault.adapter.remove(normalizePath(path));
}
}
async move(oldPath: RelativePath, newPath: RelativePath): Promise<void> {
if (oldPath === newPath) {
return;
}
this.vault.adapter.rename(
normalizePath(oldPath),
normalizePath(newPath)
);
}
}

View file

@ -1,6 +1,6 @@
import { Notice } from "obsidian";
enum LogLevel {
export enum LogLevel {
DEBUG,
INFO,
WARNING,
@ -66,8 +66,10 @@ export class Logger {
new Notice(message);
}
public getMessages(): LogLine[] {
return this.messages;
public getMessages(mininumSeverity: LogLevel): LogLine[] {
return this.messages.filter(
(message) => message.level >= mininumSeverity
);
}
private pushMessage(message: string, level: LogLevel): void {

View file

@ -20,9 +20,11 @@ import { SyncEventHandler } from "./events/sync-event-handler.js";
import { SyncServer } from "./services/sync_service.js";
import { Database } from "./database/database.js";
import { applyRemoteChangesLocally } from "./apply-remote-changes-locally.js";
import { ObsidianFileOperations } from "./file-operations/obsidian-file-operations.js";
export default class SyncPlugin extends Plugin {
private remoteListenerIntervalId: number | null = null;
private operations = new ObsidianFileOperations(this.app.vault);
async onload() {
Logger.getInstance().info('Starting plugin "Sample Plugin"');
@ -49,26 +51,32 @@ export default class SyncPlugin extends Plugin {
new SyncSettingsTab(this.app, this, database, syncServer)
);
const eventHandler = new SyncEventHandler(database, syncServer);
const eventHandler = new SyncEventHandler(
database,
syncServer,
this.operations
);
[
this.app.vault.on(
"create",
eventHandler.onCreate.bind(eventHandler)
),
this.app.vault.on(
"modify",
eventHandler.onModify.bind(eventHandler)
),
this.app.vault.on(
"delete",
eventHandler.onDelete.bind(eventHandler)
),
this.app.vault.on(
"rename",
eventHandler.onRename.bind(eventHandler)
),
].forEach((event) => this.registerEvent(event));
this.app.workspace.onLayoutReady(() =>
[
this.app.vault.on(
"create",
eventHandler.onCreate.bind(eventHandler)
),
this.app.vault.on(
"modify",
eventHandler.onModify.bind(eventHandler)
),
this.app.vault.on(
"delete",
eventHandler.onDelete.bind(eventHandler)
),
this.app.vault.on(
"rename",
eventHandler.onRename.bind(eventHandler)
),
].forEach((event) => this.registerEvent(event))
);
this.registerRemoteEventListener(
database,
@ -93,8 +101,6 @@ export default class SyncPlugin extends Plugin {
ribbonIconEl.addClass("my-plugin-ribbon-class");
}
onunload() {}
async activateView() {
const { workspace } = this.app;
@ -115,7 +121,7 @@ export default class SyncPlugin extends Plugin {
workspace.revealLeaf(leaf!);
}
unload(): void {
onunload(): void {
if (this.remoteListenerIntervalId) {
window.clearInterval(this.remoteListenerIntervalId);
}
@ -132,7 +138,11 @@ export default class SyncPlugin extends Plugin {
this.remoteListenerIntervalId = window.setInterval(
() =>
applyRemoteChangesLocally(database, syncServer, this.app.vault),
applyRemoteChangesLocally(
database,
syncServer,
this.operations
),
intervalMs
);
}

View file

@ -36,7 +36,7 @@ export class SyncServer {
},
});
Logger.getInstance().info(
Logger.getInstance().debug(
"Ping response: " + JSON.stringify(response.data)
);
@ -77,7 +77,7 @@ export class SyncServer {
throw new Error(`Failed to create document: ${response.error}`);
}
Logger.getInstance().info(
Logger.getInstance().debug(
"Created document " + JSON.stringify(response.data)
);
@ -123,7 +123,7 @@ export class SyncServer {
throw new Error(`Failed to update document: ${response.error}`);
}
Logger.getInstance().info(
Logger.getInstance().debug(
"Updated document " + JSON.stringify(response.data)
);
@ -163,7 +163,7 @@ export class SyncServer {
throw new Error(`Failed to delete document`);
}
Logger.getInstance().info(
Logger.getInstance().debug(
"Updated document " + JSON.stringify(response.data)
);
@ -195,7 +195,7 @@ export class SyncServer {
throw new Error(`Failed to get document: ${response.error}`);
}
Logger.getInstance().info(
Logger.getInstance().debug(
"Get document " + JSON.stringify(response.data)
);
@ -224,7 +224,7 @@ export class SyncServer {
throw new Error(`Failed to get documents: ${response.error}`);
}
Logger.getInstance().info(
Logger.getInstance().debug(
"Get document " + JSON.stringify(response.data)
);

View file

@ -1,39 +0,0 @@
import { DocumentId } from "src/database/document-metadata";
const locked = new Set<DocumentId>();
const waiters = new Map<DocumentId, Array<() => void>>();
export function tryLockDocument(documentId: DocumentId): boolean {
if (locked.has(documentId)) {
return false;
}
locked.add(documentId);
return true;
}
export function waitForDocumentLock(documentId: DocumentId): Promise<void> {
if (tryLockDocument(documentId)) {
return Promise.resolve();
}
return new Promise((resolve) => {
if (!waiters.has(documentId)) {
waiters.set(documentId, []);
}
waiters.get(documentId)!.push(resolve);
});
}
export function unlockDocument(documentId: DocumentId): void {
if (!locked.has(documentId)) {
throw new Error(`Document ${documentId} is not locked`);
}
if (waiters.has(documentId)) {
waiters.get(documentId)!.shift()?.();
} else {
locked.delete(documentId);
}
}

View file

@ -1,27 +0,0 @@
import { Database } from "src/database/database";
import { RelativePath } from "src/database/document-metadata";
import { Logger } from "src/logger";
import { SyncServer } from "src/services/sync_service";
export async function syncLocallyDeletedFile(
database: Database,
syncServer: SyncServer,
relativePath: RelativePath
) {
const metadata = database.getDocument(relativePath);
if (!metadata) {
Logger.getInstance().warn(
`Document metadata not found for ${relativePath}`
);
}
await syncServer.delete({
relativePath,
// We got the event now, so it must have been deleted just now
createdDate: new Date(),
});
if (metadata) {
await database.removeDocument(relativePath);
}
}

View file

@ -1,71 +0,0 @@
import * as lib from "../../../backend/sync_lib/pkg/sync_lib.js";
import { TFile } from "obsidian";
import { Database } from "src/database/database";
import { Logger } from "src/logger";
import { SyncServer } from "src/services/sync_service";
import { hash } from "src/utils/hash.js";
import { isEqualBytes } from "src/utils/is-equal-bytes.js";
export async function syncLocallyUpdatedFile({
database,
syncServer,
file,
oldPath,
}: {
database: Database;
syncServer: SyncServer;
file: TFile;
oldPath?: string;
}) {
const contentBytes = new Uint8Array(await file.vault.readBinary(file));
const contentHash = hash(contentBytes);
const metadata = database.getDocument(oldPath || file.path);
if (!metadata) {
Logger.getInstance().info(
`Document metadata not found for ${
oldPath || file.path
}, it must be new`
);
} else if (metadata.hash === contentHash) {
Logger.getInstance().info(
`Document hash matches, no need to sync ${file.path}`
);
return;
}
const response = await syncServer.put({
parentVersionId: metadata?.parentVersionId,
relativePath: file.path,
contentBytes,
createdDate: new Date(file.stat.ctime),
});
const localDbUpdatePromise = database.moveDocument({
oldRelativePath: oldPath || file.path,
relativePath: file.path,
parentVersionId: response.vaultUpdateId,
hash: contentHash,
});
if (file.path !== response.relativePath) {
await file.vault.rename(file, response.relativePath);
}
const newContentBytes = new Uint8Array(await file.vault.readBinary(file));
const responseBytes = lib.base64_to_bytes(response.contentBase64);
if (!isEqualBytes(contentBytes, newContentBytes)) {
Logger.getInstance().info(
`Content changed since sending original update request for ${file.path}`
);
const result = lib.merge(contentBytes, newContentBytes, responseBytes);
await file.vault.modifyBinary(file, result);
} else {
await file.vault.modifyBinary(file, responseBytes);
}
await localDbUpdatePromise;
}

View file

@ -0,0 +1,40 @@
import { RelativePath } from "src/database/document-metadata";
const locked = new Set<RelativePath>();
const waiters = new Map<RelativePath, Array<() => void>>();
export function tryLockDocument(relativePath: RelativePath): boolean {
if (locked.has(relativePath)) {
return false;
}
locked.add(relativePath);
return true;
}
export function waitForDocumentLock(relativePath: RelativePath): Promise<void> {
if (tryLockDocument(relativePath)) {
return Promise.resolve();
}
return new Promise((resolve) => {
if (!waiters.has(relativePath)) {
waiters.set(relativePath, []);
}
waiters.get(relativePath)!.push(resolve);
});
}
export function unlockDocument(relativePath: RelativePath): void {
if (!locked.has(relativePath)) {
throw new Error(`Document ${relativePath} is not locked`);
}
const nextWaiting = waiters.get(relativePath)?.shift();
if (nextWaiting) {
nextWaiting();
} else {
locked.delete(relativePath);
}
}

View file

@ -0,0 +1,59 @@
import * as lib from "../../../backend/sync_lib/pkg/sync_lib.js";
import { TFile } from "obsidian";
import { Database } from "src/database/database";
import { Logger } from "src/logger";
import { SyncServer } from "src/services/sync_service";
import { hash } from "src/utils/hash";
import { isEqualBytes } from "src/utils/is-equal-bytes";
import { unlockDocument, waitForDocumentLock } from "./locks.js";
import { FileOperations } from "src/file-operations/file-operations.js";
import { RelativePath } from "src/database/document-metadata.js";
/// This can be used when updating a files content and/or path.
export async function syncLocallyCreatedFile({
database,
syncServer,
operations,
updateTime,
filePath,
}: {
database: Database;
syncServer: SyncServer;
operations: FileOperations;
updateTime: Date;
filePath: RelativePath;
}): Promise<void> {
await waitForDocumentLock(filePath);
try {
const metadata = database.getDocument(filePath);
if (metadata) {
throw new Error(
`Document metadata found for ${filePath}, this is unexpected`
);
}
const contentBytes = await operations.read(filePath);
const response = await syncServer.create({
relativePath: filePath,
contentBytes,
createdDate: updateTime,
});
const responseBytes = lib.base64_to_bytes(response.contentBase64);
await operations.write(filePath, contentBytes, responseBytes);
await database.setDocument({
documentId: response.documentId,
relativePath: response.relativePath,
parentVersionId: response.vaultUpdateId,
hash: hash(responseBytes),
});
} catch (e) {
Logger.getInstance().error(
`Failed to sync locally updated file ${filePath}: ${e}`
);
} finally {
unlockDocument(filePath);
}
}

View file

@ -0,0 +1,38 @@
import { Database } from "src/database/database";
import { RelativePath } from "src/database/document-metadata";
import { Logger } from "src/logger";
import { SyncServer } from "src/services/sync_service";
import { tryLockDocument, unlockDocument, waitForDocumentLock } from "./locks";
export async function syncLocallyDeletedFile(
database: Database,
syncServer: SyncServer,
relativePath: RelativePath
): Promise<void> {
await waitForDocumentLock(relativePath);
try {
const metadata = database.getDocument(relativePath);
if (!metadata) {
Logger.getInstance().warn(
`Document metadata not found for ${relativePath}`
);
return;
}
await syncServer.delete({
documentId: metadata.documentId,
relativePath,
// We got the event now, so it must have been deleted just now
createdDate: new Date(),
});
await database.removeDocument(relativePath);
} catch (e) {
Logger.getInstance().error(
`Failed to sync locally deleted file ${relativePath}: ${e}`
);
} finally {
unlockDocument(relativePath);
}
}

View file

@ -0,0 +1,99 @@
import * as lib from "../../../backend/sync_lib/pkg/sync_lib.js";
import { TFile } from "obsidian";
import { Database } from "src/database/database";
import { Logger } from "src/logger";
import { SyncServer } from "src/services/sync_service";
import { hash } from "src/utils/hash";
import { isEqualBytes } from "src/utils/is-equal-bytes";
import { unlockDocument, waitForDocumentLock } from "./locks.js";
import { FileOperations } from "src/file-operations/file-operations.js";
import { RelativePath } from "src/database/document-metadata.js";
/// This can be used when updating a files content and/or path.
export async function syncLocallyUpdatedFile({
database,
syncServer,
operations,
updateTime,
filePath,
oldPath,
}: {
database: Database;
syncServer: SyncServer;
operations: FileOperations;
updateTime: Date;
filePath: RelativePath;
oldPath?: RelativePath;
}): Promise<void> {
await waitForDocumentLock(filePath);
try {
const metadata = database.getDocument(oldPath || filePath);
if (!metadata) {
throw new Error(`Document metadata not found for ${filePath}`);
}
const contentBytes = await operations.read(filePath);
const contentHash = hash(contentBytes);
if (metadata.hash === contentHash && !oldPath) {
Logger.getInstance().info(
`Document hash matches, no need to sync ${filePath}`
);
return;
}
const response = await syncServer.put({
documentId: metadata.documentId,
parentVersionId: metadata.parentVersionId,
relativePath: filePath,
contentBytes,
createdDate: updateTime,
});
if (response.isDeleted) {
await operations.remove(oldPath || filePath);
if (metadata) {
await database.removeDocument(oldPath || filePath);
}
return;
}
const responseBytes = lib.base64_to_bytes(response.contentBase64);
if (response.relativePath != filePath) {
await waitForDocumentLock(response.relativePath);
try {
await operations.move(
oldPath || filePath,
response.relativePath
);
await operations.write(
response.relativePath,
contentBytes,
responseBytes
);
} finally {
unlockDocument(response.relativePath);
}
} else {
await operations.write(filePath, contentBytes, responseBytes);
}
await database.moveDocument({
documentId: metadata.documentId,
oldRelativePath: oldPath || filePath,
relativePath: response.relativePath,
parentVersionId: response.vaultUpdateId,
hash: contentHash,
});
} catch (e) {
Logger.getInstance().error(
`Failed to sync locally updated file ${filePath}: ${e}`
);
} finally {
unlockDocument(filePath);
}
}

View file

@ -0,0 +1,111 @@
import { Vault } from "obsidian";
import { Database } from "src/database/database";
import { unlockDocument, waitForDocumentLock } from "./locks";
import { SyncServer } from "src/services/sync_service";
import * as lib from "../../../backend/sync_lib/pkg/sync_lib.js";
import { hash } from "src/utils/hash";
import { Logger } from "src/logger";
import { components } from "src/services/types";
import { FileOperations } from "src/file-operations/file-operations";
export async function syncRemotelyUpdatedFile({
database,
syncServer,
operations,
remoteVersion,
}: {
database: Database;
syncServer: SyncServer;
operations: FileOperations;
remoteVersion: components["schemas"]["DocumentVersionWithoutContent"];
}): Promise<void> {
Logger.getInstance().info(
`Syncing remotely updated file ${remoteVersion.relativePath}`
);
const content = (
await syncServer.get({
documentId: remoteVersion.documentId,
})
).contentBase64;
const currentVersion = database.getDocumentByDocumentId(
remoteVersion.documentId
);
if (!currentVersion) {
if (remoteVersion.isDeleted) {
return;
}
Logger.getInstance().info(
`Document metadata not found for ${remoteVersion.relativePath}, it must be new`
);
await waitForDocumentLock(remoteVersion.relativePath);
try {
const contentBytes = lib.base64_to_bytes(content);
operations.create(remoteVersion.relativePath, contentBytes);
await database.setDocument({
documentId: remoteVersion.documentId,
relativePath: remoteVersion.relativePath,
parentVersionId: remoteVersion.vaultUpdateId,
hash: hash(contentBytes),
});
} finally {
unlockDocument(remoteVersion.relativePath);
}
return;
}
const [relativePath, metadata] = currentVersion;
await waitForDocumentLock(relativePath);
try {
if (remoteVersion.isDeleted) {
Logger.getInstance().info(
`Document ${relativePath} has been deleted remotely`
);
await operations.remove(relativePath);
if (metadata) {
await database.removeDocument(relativePath);
}
} else {
const currentContent = await operations.read(relativePath);
const currentHash = hash(currentContent);
if (currentHash !== metadata.hash) {
Logger.getInstance().info(
`Document ${relativePath} has been updated both remotely and locally, skipping`
);
return;
} else {
if (relativePath !== remoteVersion.relativePath) {
await operations.move(
relativePath,
remoteVersion.relativePath
);
}
const contentBytes = lib.base64_to_bytes(content);
await operations.write(
remoteVersion.relativePath,
currentContent,
contentBytes
);
await database.moveDocument({
documentId: remoteVersion.documentId,
oldRelativePath: relativePath,
relativePath: remoteVersion.relativePath,
parentVersionId: remoteVersion.vaultUpdateId,
hash: metadata.hash,
});
}
}
} catch (e) {
Logger.getInstance().error(
`Failed to sync remotely updated file ${remoteVersion.relativePath}: ${e}`
);
} finally {
unlockDocument(relativePath);
}
}

View file

@ -1,5 +1,5 @@
import { ItemView, WorkspaceLeaf } from "obsidian";
import { Logger } from "src/logger";
import { Logger, LogLevel } from "src/logger";
export class SyncView extends ItemView {
public static TYPE = "example-view";
@ -29,7 +29,7 @@ export class SyncView extends ItemView {
container.empty();
const messages = Logger.getInstance()
.getMessages()
.getMessages(LogLevel.INFO)
.map((message) => message.toString())
.join("\n");