Set up monorepo workspace

This commit is contained in:
Andras Schmelczer 2025-02-18 22:36:53 +00:00
parent ae3acb9e1e
commit eb88d35c2e
No known key found for this signature in database
GPG key ID: FC8F2C3D3D1A718C
10 changed files with 207 additions and 26 deletions

28
sync-client/package.json Normal file
View file

@ -0,0 +1,28 @@
{
"name": "sync-client",
"version": "1.0.0",
"main": "dist/index.js",
"types": "dist/types/src/index.d.ts",
"scripts": {
"dev": "webpack watch --mode development",
"build": "webpack --mode production",
"test": "NODE_OPTIONS=\"$NODE_OPTIONS --experimental-vm-modules\" jest"
},
"devDependencies": {
"tslib": "2.4.0",
"typescript": "5.7.2",
"sync_lib": "file:../backend/sync_lib/pkg",
"@types/jest": "^29.5.14",
"@types/node": "^16.11.6",
"jest": "^29.7.0",
"ts-jest": "^29.2.5",
"p-queue": "^8.0.1",
"fetch-retry": "^6.0.0",
"byte-base64": "^1.1.0",
"openapi-fetch": "0.13.3",
"openapi-typescript": "7.4.4",
"ts-loader": "^9.5.1",
"webpack": "^5.97.1",
"webpack-cli": "^6.0.1"
}
}

48
sync-client/src/index.ts Normal file
View file

@ -0,0 +1,48 @@
export { applyRemoteChangesLocally } from "./sync-operations/apply-remote-changes-locally";
export {
type RelativePath,
type DocumentId,
type VaultUpdateId,
type DocumentMetadata
} from "./database/document-metadata";
export { Database } from "./database/database";
export {
SyncService,
type CheckConnectionResult
} from "./services/sync-service";
export { Syncer } from "./sync-operations/syncer";
export {
SyncHistory,
SyncType,
SyncSource,
SyncStatus,
type HistoryStats,
type HistoryEntry
} from "./tracing/sync-history";
export { Logger, LogLevel } from "./tracing/logger";
export { type FileOperations } from "./file-operations";
import init from "sync_lib";
import wasmBin from "sync_lib/sync_lib_bg.wasm";
export const initialize = async (): Promise<void> => {
await init(
// eslint-disable-next-line
(wasmBin as any).default // it is loaded as a base64 string by webpack
);
};
export {
isFileTypeMergable,
mergeText,
bytesToBase64,
base64ToBytes,
merge,
isBinary
} from "sync_lib";

View file

@ -0,0 +1,708 @@
import type { Database } from "../database/database";
import type {
DocumentMetadata,
RelativePath
} from "src/database/document-metadata";
import type { FileOperations } from "src/file-operations";
import type { SyncService } from "src/services/sync-service";
import { Logger } from "src/tracing/logger";
import type { SyncHistory } from "src/tracing/sync-history";
import { SyncSource, SyncStatus, SyncType } from "src/tracing/sync-history";
import { unlockDocument, waitForDocumentLock } from "./document-lock";
import PQueue from "p-queue";
import { EMPTY_HASH, hash } from "src/utils/hash";
import type { components } from "src/services/types";
import { deserialize } from "src/utils/deserialize";
export class Syncer {
private readonly remainingOperationsListeners: ((
remainingOperations: number
) => void)[] = [];
private readonly syncQueue: PQueue;
private isRunningOfflineSync = false;
public constructor(
private readonly database: Database,
private readonly syncService: SyncService,
private readonly operations: FileOperations,
private readonly history: SyncHistory
) {
this.syncQueue = new PQueue({
concurrency: database.getSettings().syncConcurrency
});
database.addOnSettingsChangeHandlers((settings) => {
this.syncQueue.concurrency = settings.syncConcurrency;
});
this.syncQueue.on("active", () => {
this.emitRemainingOperationsChange(this.syncQueue.size);
});
}
public addRemainingOperationsListener(
listener: (remainingOperations: number) => void
): void {
this.remainingOperationsListeners.push(listener);
}
public async syncLocallyCreatedFile(
relativePath: RelativePath,
updateTime: Date
): Promise<void> {
await this.syncQueue.add(async () =>
this.internalSyncLocallyCreatedFile(relativePath, updateTime)
);
}
public async syncLocallyUpdatedFile(args: {
oldPath?: RelativePath;
relativePath: RelativePath;
updateTime: Date;
}): Promise<void> {
await this.syncQueue.add(async () =>
this.internalSyncLocallyUpdatedFile(args)
);
}
public async syncLocallyDeletedFile(
relativePath: RelativePath
): Promise<void> {
await this.syncQueue.add(async () =>
this.internalSyncLocallyDeletedFile(relativePath)
);
}
public async syncRemotelyUpdatedFile(
remoteVersion: components["schemas"]["DocumentVersionWithoutContent"]
): Promise<void> {
await this.syncQueue.add(async () =>
this.internalSyncRemotelyUpdatedFile(remoteVersion)
);
}
public async scheduleSyncForOfflineChanges(): Promise<void> {
if (this.isRunningOfflineSync) {
Logger.getInstance().warn(
"Uploading local changes is already in progress, skipping"
);
return;
}
if (!this.database.getSettings().isSyncEnabled) {
Logger.getInstance().debug(
`Syncing is disabled, not uploading local changes`
);
return;
}
this.isRunningOfflineSync = true;
try {
const allLocalFiles = await this.operations.listAllFiles();
let locallyDeletedFiles = [
...this.database.getDocuments().entries()
].filter(([path, _]) => !allLocalFiles.includes(path));
await Promise.all(
allLocalFiles.map(async (relativePath) =>
this.syncQueue.add(async () => {
const metadata =
this.database.getDocument(relativePath);
// If there's no metadata, it must be a new file
if (!metadata) {
// Perhaps the file has been moved. Let's check by looking at the deleted files
const contentBytes =
await this.operations.read(relativePath);
const contentHash = hash(contentBytes);
const originalFile =
await this.findMatchingFileBasedOnHash(
contentHash,
locallyDeletedFiles
);
if (originalFile !== undefined) {
// `originalFile` hasn't been deleted but it got moved instead
locallyDeletedFiles =
locallyDeletedFiles.filter(
(item) => item != originalFile
);
Logger.getInstance().debug(
`Document ${relativePath} was not found under its current path in the database but was found under a different path ${originalFile[0]}, scheduling sync to move it`
);
return this.internalSyncLocallyUpdatedFile({
oldPath: originalFile[0],
relativePath: relativePath,
updateTime:
await this.operations.getModificationTime(
relativePath
),
optimisations: {
contentBytes,
contentHash
}
});
}
Logger.getInstance().debug(
`Document ${relativePath} not found in database, scheduling sync to create it`
);
return this.internalSyncLocallyCreatedFile(
relativePath,
await this.operations.getModificationTime(
relativePath
)
);
}
Logger.getInstance().debug(
`Document ${relativePath} has been updated locally, scheduling sync to update it`
);
return this.internalSyncLocallyUpdatedFile({
relativePath,
updateTime:
await this.operations.getModificationTime(
relativePath
)
});
})
)
);
await Promise.all(
locallyDeletedFiles.map(async ([relativePath, _]) => {
Logger.getInstance().debug(
`Document ${relativePath} has been deleted locally, scheduling sync to delete it`
);
if (await this.operations.exists(relativePath)) {
Logger.getInstance().debug(
`Document ${relativePath} actually exists locally, skipping`
);
return Promise.resolve();
}
return this.internalSyncLocallyDeletedFile(relativePath);
})
);
Logger.getInstance().info(
`All local changes have been applied remotely`
);
} catch (e) {
Logger.getInstance().error(
`Not all local changes have been applied remotely: ${e}`
);
} finally {
this.isRunningOfflineSync = false;
}
}
public async reset(): Promise<void> {
this.syncQueue.clear();
await this.syncQueue.onEmpty();
await this.database.resetSyncState();
this.history.reset();
this.remainingOperationsListeners.forEach((listener) => {
listener(0);
});
}
private async internalSyncLocallyCreatedFile(
relativePath: RelativePath,
updateTime: Date,
optimisations?: {
contentBytes?: Uint8Array;
contentHash?: string;
}
): Promise<void> {
await this.executeWhileHoldingFileLock(
relativePath,
SyncType.CREATE,
SyncSource.PUSH,
async () => {
if (
(await this.operations.getFileSize(relativePath)) /
1024 /
1024 >
this.database.getSettings().maxFileSizeMB
) {
this.history.addHistoryEntry({
status: SyncStatus.ERROR,
relativePath,
message: `File size exceeds the maximum file size limit of ${
this.database.getSettings().maxFileSizeMB
}MB`,
type: SyncType.CREATE
});
return;
}
const contentBytes =
optimisations?.contentBytes ??
(await this.operations.read(relativePath));
let contentHash =
optimisations?.contentHash ?? hash(contentBytes);
const localMetadata = this.database.getDocument(relativePath);
if (localMetadata) {
Logger.getInstance().debug(
`Document metadata already exists for ${relativePath}, it must have been downloaded from the server`
);
if (localMetadata.hash === contentHash) {
this.history.addHistoryEntry({
status: SyncStatus.NO_OP,
relativePath,
message: `File hash matches with last synced version, no need to sync`,
type: SyncType.UPDATE
});
return;
}
}
const response = await this.syncService.create({
relativePath,
contentBytes,
createdDate: updateTime
});
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
source: SyncSource.PUSH,
relativePath,
message: `Successfully uploaded locally created file`,
type: SyncType.CREATE
});
if (response.type === "MergingUpdate") {
const responseBytes = deserialize(response.contentBase64);
contentHash = hash(responseBytes);
await this.operations.write(
relativePath,
contentBytes,
responseBytes
);
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
source: SyncSource.PULL,
relativePath,
message: `The file we created locally has already existed remotely, so we have merged them`,
type: SyncType.UPDATE
});
}
await this.database.setDocument({
documentId: response.documentId,
relativePath: response.relativePath,
parentVersionId: response.vaultUpdateId,
hash: contentHash
});
await this.tryIncrementVaultUpdateId(response.vaultUpdateId);
}
);
}
private async internalSyncLocallyUpdatedFile({
oldPath,
relativePath,
updateTime,
optimisations
}: {
oldPath?: RelativePath;
relativePath: RelativePath;
updateTime: Date;
optimisations?: {
contentBytes?: Uint8Array;
contentHash?: string;
};
}): Promise<void> {
await this.executeWhileHoldingFileLock(
relativePath,
SyncType.UPDATE,
SyncSource.PUSH,
async () => {
if (
(await this.operations.getFileSize(relativePath)) /
1024 /
1024 >
this.database.getSettings().maxFileSizeMB
) {
this.history.addHistoryEntry({
status: SyncStatus.ERROR,
relativePath,
message: `File size exceeds the maximum file size limit of ${
this.database.getSettings().maxFileSizeMB
}MB`,
type: SyncType.CREATE
});
return;
}
const localMetadata = this.database.getDocument(
oldPath ?? relativePath
);
if (!localMetadata) {
if (this.database.getDocument(relativePath)) {
this.history.addHistoryEntry({
status: SyncStatus.NO_OP,
relativePath,
message: `The renaming doesn't require a sync because it must have been pulled from remote`,
type: SyncType.UPDATE
});
return;
}
throw new Error(
`Document metadata not found for ${relativePath}. This implies a corrupt local database. Consider resetting the plugin's sync history.`
);
}
const contentBytes =
optimisations?.contentBytes ??
(await this.operations.read(relativePath));
let contentHash =
optimisations?.contentHash ?? hash(contentBytes);
if (
localMetadata.hash === contentHash &&
oldPath === undefined
) {
this.history.addHistoryEntry({
status: SyncStatus.NO_OP,
relativePath,
message: `File hash matches with last synced version, no need to sync`,
type: SyncType.UPDATE
});
return;
}
const response = await this.syncService.put({
documentId: localMetadata.documentId,
parentVersionId: localMetadata.parentVersionId,
relativePath,
contentBytes,
createdDate: updateTime
});
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
source: SyncSource.PUSH,
relativePath,
message: `Successfully uploaded locally updated file to the remote server`,
type: SyncType.UPDATE
});
if (response.isDeleted) {
await this.operations.remove(oldPath ?? relativePath);
await this.database.removeDocument(oldPath ?? relativePath);
await this.tryIncrementVaultUpdateId(
response.vaultUpdateId
);
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
source: SyncSource.PULL,
relativePath,
message:
"The file we tried to update had been deleted remotely, therefore, we have deleted it locally",
type: SyncType.DELETE
});
return;
}
if (response.relativePath != relativePath) {
await waitForDocumentLock(response.relativePath);
}
try {
if (response.relativePath != relativePath) {
await this.operations.move(
oldPath ?? relativePath,
response.relativePath
);
}
if (response.type === "MergingUpdate") {
const responseBytes = deserialize(
response.contentBase64
);
contentHash = hash(responseBytes);
await this.operations.write(
response.relativePath,
contentBytes,
responseBytes
);
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
source: SyncSource.PULL,
relativePath,
message: `The file we updated had been updated remotely, so we downloaded the merged version`,
type: SyncType.UPDATE
});
}
await this.database.moveDocument({
documentId: localMetadata.documentId,
oldRelativePath: oldPath ?? relativePath,
relativePath: response.relativePath,
parentVersionId: response.vaultUpdateId,
hash: contentHash
});
await this.tryIncrementVaultUpdateId(
response.vaultUpdateId
);
} finally {
if (response.relativePath != relativePath) {
unlockDocument(response.relativePath);
}
}
}
);
}
private async internalSyncLocallyDeletedFile(
relativePath: RelativePath
): Promise<void> {
await this.executeWhileHoldingFileLock(
relativePath,
SyncType.DELETE,
SyncSource.PUSH,
async () => {
const localMetadata = this.database.getDocument(relativePath);
if (!localMetadata) {
this.history.addHistoryEntry({
status: SyncStatus.NO_OP,
relativePath,
message: `Locally deleted file hasn't been uploaded yet, so there's no need to delete it on the remote server`,
type: SyncType.DELETE
});
return;
}
await this.syncService.delete({
documentId: localMetadata.documentId,
relativePath,
createdDate: new Date() // We got the event now, so it must have been deleted just now
});
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
source: SyncSource.PUSH,
relativePath,
message: `Successfully deleted locally deleted file on the remote server`,
type: SyncType.DELETE
});
await this.database.removeDocument(relativePath);
}
);
}
private async internalSyncRemotelyUpdatedFile(
remoteVersion: components["schemas"]["DocumentVersionWithoutContent"]
): Promise<void> {
await this.executeWhileHoldingFileLock(
remoteVersion.relativePath,
SyncType.UPDATE,
SyncSource.PULL,
async () => {
const localMetadata = this.database.getDocumentByDocumentId(
remoteVersion.documentId
);
if (!localMetadata) {
if (remoteVersion.isDeleted) {
this.history.addHistoryEntry({
status: SyncStatus.NO_OP,
source: SyncSource.PULL,
relativePath: remoteVersion.relativePath,
message: `Remotely deleted file hasn't been synced yet, so there's no need to delete it locally`,
type: SyncType.DELETE
});
return;
}
const content = (
await this.syncService.get({
documentId: remoteVersion.documentId
})
).contentBase64;
const contentBytes = deserialize(content);
await this.operations.create(
remoteVersion.relativePath,
contentBytes
);
await this.database.setDocument({
documentId: remoteVersion.documentId,
relativePath: remoteVersion.relativePath,
parentVersionId: remoteVersion.vaultUpdateId,
hash: hash(contentBytes)
});
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
source: SyncSource.PULL,
relativePath: remoteVersion.relativePath,
message: `Successfully downloaded remote file which hasn't existed locally`,
type: SyncType.CREATE
});
return;
}
const [relativePath, metadata] = localMetadata;
if (metadata.parentVersionId === remoteVersion.vaultUpdateId) {
Logger.getInstance().debug(
`Document ${relativePath} is already up to date`
);
return;
}
if (relativePath !== remoteVersion.relativePath) {
await waitForDocumentLock(relativePath);
}
try {
if (remoteVersion.isDeleted) {
await this.operations.remove(relativePath);
await this.database.removeDocument(relativePath);
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
source: SyncSource.PULL,
relativePath: remoteVersion.relativePath,
message: `Successfully deleted remotely deleted file locally`,
type: SyncType.DELETE
});
} else {
const currentContent =
await this.operations.read(relativePath);
const currentHash = hash(currentContent);
if (currentHash !== metadata.hash) {
Logger.getInstance().info(
`Document ${relativePath} has been updated both remotely and locally, letting the local file update event handle it`
);
return;
}
const content = (
await this.syncService.get({
documentId: remoteVersion.documentId
})
).contentBase64;
const contentBytes = deserialize(content);
const contentHash = hash(contentBytes);
if (relativePath !== remoteVersion.relativePath) {
await this.operations.move(
relativePath,
remoteVersion.relativePath
);
}
await this.operations.write(
remoteVersion.relativePath,
currentContent,
contentBytes
);
await this.database.moveDocument({
documentId: remoteVersion.documentId,
oldRelativePath: relativePath,
relativePath: remoteVersion.relativePath,
parentVersionId: remoteVersion.vaultUpdateId,
hash: contentHash
});
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
source: SyncSource.PULL,
relativePath: remoteVersion.relativePath,
message: `Successfully updated remotely updated file locally`,
type: SyncType.UPDATE
});
}
} finally {
if (relativePath !== remoteVersion.relativePath) {
unlockDocument(relativePath);
}
}
}
);
}
private async executeWhileHoldingFileLock(
relativePath: RelativePath,
syncType: SyncType,
syncSource: SyncSource,
fn: () => Promise<void>
): Promise<void> {
if (!this.database.getSettings().isSyncEnabled) {
Logger.getInstance().info(
`Syncing is disabled, not syncing ${relativePath}`
);
return;
}
if (!this.operations.isFileEligibleForSync(relativePath)) {
Logger.getInstance().info(
`File ${relativePath} is not eligible for syncing`
);
return;
}
Logger.getInstance().debug(`Syncing ${relativePath}`);
await waitForDocumentLock(relativePath);
try {
await fn();
} catch (e) {
this.history.addHistoryEntry({
status: SyncStatus.ERROR,
relativePath,
message: `Failed to ${syncSource.toLocaleLowerCase()} file ${e} when trying to ${syncType.toLocaleLowerCase()} it`,
type: syncType,
source: syncSource
});
throw e;
} finally {
unlockDocument(relativePath);
}
}
private emitRemainingOperationsChange(remainingOperations: number): void {
this.remainingOperationsListeners.forEach((listener) => {
listener(remainingOperations);
});
}
private async tryIncrementVaultUpdateId(
responseVaultUpdateId: number
): Promise<void> {
if (this.database.getLastSeenUpdateId() === responseVaultUpdateId - 1) {
await this.database.setLastSeenUpdateId(responseVaultUpdateId);
}
}
private async findMatchingFileBasedOnHash(
contentHash: string,
candidates: [RelativePath, DocumentMetadata][]
): Promise<[RelativePath, DocumentMetadata] | undefined> {
if (contentHash != EMPTY_HASH) {
return undefined;
}
return candidates.find(
([_, document]) => document.hash === contentHash
);
}
}

15
sync-client/tsconfig.json Normal file
View file

@ -0,0 +1,15 @@
{
"compilerOptions": {
"composite": true,
"baseUrl": ".",
"target": "ES2022",
"noImplicitAny": true,
"moduleResolution": "node",
"strictNullChecks": true,
"esModuleInterop": true,
"lib": [
"DOM",
"ESNext"
]
}
}

View file

@ -0,0 +1,49 @@
const path = require("path");
module.exports = (_env, _argv) => ({
entry: "./src/index.ts",
devtool: "source-map",
module: {
rules: [
{
test: /\.ts$/,
use: [
{
loader: "ts-loader",
options: {
compilerOptions: {
declaration: true,
declarationDir: "./dist/types"
},
transpileOnly: false
}
}
]
},
{
test: /\.wasm$/,
type: "asset/inline"
}
]
},
optimization: {
minimize: false
},
resolve: {
extensions: [".ts", ".js"],
alias: {
root: __dirname,
src: path.resolve(__dirname, "src")
}
},
output: {
clean: true,
filename: "index.js",
library: {
name: "SyncClient",
type: "umd"
},
globalObject: "this",
path: path.resolve(__dirname, "dist")
}
});