Move files

This commit is contained in:
Andras Schmelczer 2025-02-19 20:47:52 +00:00
parent 6bb051460e
commit dd6f63f357
No known key found for this signature in database
GPG key ID: FC8F2C3D3D1A718C
50 changed files with 72 additions and 78 deletions

View file

@ -0,0 +1,3 @@
module.exports = {
preset: "ts-jest/presets/js-with-babel-esm"
};

View file

@ -0,0 +1,28 @@
{
"name": "sync-client",
"version": "1.0.0",
"main": "dist/index.js",
"types": "dist/types/index.d.ts",
"scripts": {
"dev": "webpack watch --mode development",
"build": "webpack --mode production",
"test": "NODE_OPTIONS=\"$NODE_OPTIONS --experimental-vm-modules\" jest"
},
"devDependencies": {
"tslib": "2.8.1",
"typescript": "5.7.3",
"sync_lib": "file:../../backend/sync_lib/pkg",
"@types/jest": "^29.5.14",
"@types/node": "^22.13.4",
"jest": "^29.7.0",
"ts-jest": "^29.2.5",
"p-queue": "^8.1.0",
"fetch-retry": "^6.0.0",
"byte-base64": "^1.1.0",
"openapi-fetch": "0.13.4",
"openapi-typescript": "7.6.1",
"ts-loader": "^9.5.2",
"webpack": "^5.98.0",
"webpack-cli": "^6.0.1"
}
}

View file

@ -0,0 +1,186 @@
import type { SyncSettings } from "./sync-settings";
import { DEFAULT_SETTINGS } from "./sync-settings";
import type {
DocumentId,
DocumentMetadata,
RelativePath,
VaultUpdateId
} from "./document-metadata";
import { Logger } from "src/tracing/logger";
interface StoredDatabase {
documents: Map<RelativePath, DocumentMetadata>;
settings: SyncSettings;
lastSeenUpdateId: VaultUpdateId | undefined;
}
// Todo: split it into settings and documents
export class Database {
private _documents = new Map<RelativePath, DocumentMetadata>();
private _settings: SyncSettings;
private _lastSeenUpdateId: VaultUpdateId | undefined;
private readonly onSettingsChangeHandlers: ((
newSettings: SyncSettings,
oldSettings: SyncSettings
) => void)[] = [];
public constructor(
initialState: Partial<StoredDatabase> | undefined,
private readonly saveData: (data: unknown) => Promise<void>
) {
initialState ??= {};
if (
// eslint-disable-next-line @typescript-eslint/strict-boolean-expressions
Object.prototype.hasOwnProperty.call(initialState, "documents") &&
initialState.documents
) {
for (const [relativePath, metadata] of Object.entries(
initialState.documents
)) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
this._documents.set(relativePath, metadata as DocumentMetadata);
}
}
Logger.getInstance().debug(`Loaded ${this._documents.size} documents`);
this._settings = {
...DEFAULT_SETTINGS,
...(initialState.settings ?? {})
};
Logger.getInstance().debug(
`Loaded settings: ${JSON.stringify(this._settings, null, 2)}`
);
this._lastSeenUpdateId = initialState.lastSeenUpdateId;
Logger.getInstance().debug(
`Loaded last seen update id: ${this._lastSeenUpdateId}`
);
}
public getDocuments(): Map<RelativePath, DocumentMetadata> {
return this._documents;
}
public getSettings(): SyncSettings {
return this._settings;
}
public async setSettings(value: SyncSettings): Promise<void> {
const oldSettings = this._settings;
this._settings = value;
this.onSettingsChangeHandlers.forEach((handler) => {
handler(value, oldSettings);
});
await this.save();
}
public addOnSettingsChangeHandlers(
handler: (settings: SyncSettings, oldSettings: SyncSettings) => void
): void {
this.onSettingsChangeHandlers.push(handler);
}
public async setSetting<T extends keyof SyncSettings>(
key: T,
value: SyncSettings[T]
): Promise<void> {
const newSettings = { ...this._settings, [key]: value };
Logger.getInstance().debug(
`Setting ${key} to ${value}, new settings: ${JSON.stringify(
newSettings,
null,
2
)}`
);
await this.setSettings(newSettings);
}
public getLastSeenUpdateId(): VaultUpdateId | undefined {
return this._lastSeenUpdateId;
}
public async setLastSeenUpdateId(
value: VaultUpdateId | undefined
): Promise<void> {
this._lastSeenUpdateId = value;
await this.save();
}
public async resetSyncState(): Promise<void> {
this._documents = new Map();
this._lastSeenUpdateId = 0;
await this.save();
}
public getDocumentByDocumentId(
documentId: DocumentId
): [RelativePath, DocumentMetadata] | undefined {
return [...this._documents.entries()].find(
([_, metadata]) => metadata.documentId === documentId
);
}
public async setDocument({
documentId,
relativePath,
parentVersionId,
hash
}: {
documentId: DocumentId;
relativePath: RelativePath;
parentVersionId: VaultUpdateId;
hash: string;
}): Promise<void> {
this._documents.set(relativePath, {
documentId,
parentVersionId,
hash
});
await this.save();
}
public async moveDocument({
documentId,
oldRelativePath,
relativePath,
parentVersionId,
hash
}: {
documentId: DocumentId;
oldRelativePath: RelativePath;
relativePath: RelativePath;
parentVersionId: VaultUpdateId;
hash: string;
}): Promise<void> {
this._documents.delete(oldRelativePath);
this._documents.set(relativePath, {
documentId,
parentVersionId,
hash
});
await this.save();
}
public async removeDocument(relativePath: RelativePath): Promise<void> {
this._documents.delete(relativePath);
await this.save();
}
public getDocument(
relativePath: RelativePath
): DocumentMetadata | undefined {
return this._documents.get(relativePath);
}
private async save(): Promise<void> {
await this.saveData({
documents: Object.fromEntries(this._documents.entries()),
settings: this._settings,
lastSeenUpdateId: this._lastSeenUpdateId
});
}
}

View file

@ -0,0 +1,9 @@
export type VaultUpdateId = number;
export type DocumentId = string;
export type RelativePath = string;
export interface DocumentMetadata {
parentVersionId: VaultUpdateId;
documentId: DocumentId;
hash: string;
}

View file

@ -0,0 +1,25 @@
import { LogLevel } from "src/tracing/logger";
export interface SyncSettings {
remoteUri: string;
token: string;
vaultName: string;
fetchChangesUpdateIntervalMs: number;
syncConcurrency: number;
isSyncEnabled: boolean;
displayNoopSyncEvents: boolean;
minimumLogLevel: LogLevel;
maxFileSizeMB: number;
}
export const DEFAULT_SETTINGS: SyncSettings = {
remoteUri: "",
token: "",
vaultName: "default",
fetchChangesUpdateIntervalMs: 1000,
syncConcurrency: 1,
isSyncEnabled: false,
displayNoopSyncEvents: false,
minimumLogLevel: LogLevel.INFO,
maxFileSizeMB: 10
};

View file

@ -0,0 +1,32 @@
import type { RelativePath } from "src/database/document-metadata";
export interface FileOperations {
listAllFiles: () => Promise<RelativePath[]>;
read: (path: RelativePath) => Promise<Uint8Array>;
getFileSize: (path: RelativePath) => Promise<number>;
exists: (path: RelativePath) => Promise<boolean>;
getModificationTime: (path: RelativePath) => Promise<Date>;
// Create and write the file if it doesn't exist. Otherwise, it has the same behavior as write.
// All parent directories are created if they don't exist.
create: (path: RelativePath, newContent: Uint8Array) => Promise<void>;
// Update the file at the given path.
// If the file's content is different from `expectedContent`, the a 3-way merge is performed before writing.
// If the file no longer exists, the file is not recreated and an empty array is returned.
write: (
path: RelativePath,
expectedContent: Uint8Array,
newContent: Uint8Array
) => Promise<Uint8Array>;
remove: (path: RelativePath) => Promise<void>;
move: (oldPath: RelativePath, newPath: RelativePath) => Promise<void>;
isFileEligibleForSync: (path: RelativePath) => boolean;
}

View file

@ -0,0 +1,48 @@
export { applyRemoteChangesLocally } from "./sync-operations/apply-remote-changes-locally";
export {
type RelativePath,
type DocumentId,
type VaultUpdateId,
type DocumentMetadata
} from "./database/document-metadata";
export { Database } from "./database/database";
export {
SyncService,
type CheckConnectionResult
} from "./services/sync-service";
export { Syncer } from "./sync-operations/syncer";
export {
SyncHistory,
SyncType,
SyncSource,
SyncStatus,
type HistoryStats,
type HistoryEntry
} from "./tracing/sync-history";
export { Logger, LogLevel } from "./tracing/logger";
export { type FileOperations } from "./file-operations";
import init from "sync_lib";
import wasmBin from "sync_lib/sync_lib_bg.wasm";
export const initialize = async (): Promise<void> => {
await init(
// eslint-disable-next-line
(wasmBin as any).default // it is loaded as a base64 string by webpack
);
};
export {
isFileTypeMergable,
mergeText,
bytesToBase64,
base64ToBytes,
merge,
isBinary
} from "sync_lib";

View file

@ -0,0 +1,295 @@
import type { Client } from "openapi-fetch";
import createClient from "openapi-fetch";
import type { components, paths } from "./types"; // Generated by openapi-typescript
import type { Database } from "../database/database";
import type { SyncSettings } from "../database/sync-settings";
import type {
DocumentId,
RelativePath,
VaultUpdateId
} from "src/database/document-metadata";
import { Logger } from "src/tracing/logger";
import { retriedFetch } from "src/utils/retried-fetch";
export interface CheckConnectionResult {
isSuccessful: boolean;
message: string;
}
export class SyncService {
private client: Client<paths>;
private clientWithoutRetries: Client<paths>;
public constructor(private readonly database: Database) {
this.createClient(database.getSettings());
database.addOnSettingsChangeHandlers((s) => {
this.createClient(s);
});
}
private static formatError(
error: components["schemas"]["SerializedError"]
): string {
let result = error.message;
if (error.causes.length > 0) {
const causes = error.causes.join(", ");
result += ` caused by: ${causes}`;
}
return result;
}
public async ping(): Promise<components["schemas"]["PingResponse"]> {
const response = await this.clientWithoutRetries.GET("/ping", {
params: {
header: {
authorization: `Bearer ${this.database.getSettings().token}`
}
}
});
Logger.getInstance().debug(
`Ping response: ${JSON.stringify(response.data)}`
);
if (!response.data) {
throw new Error(
`Failed to ping server: ${SyncService.formatError(response.error)}`
);
}
return response.data;
}
public async create({
relativePath,
contentBytes,
createdDate
}: {
relativePath: RelativePath;
contentBytes: Uint8Array;
createdDate: Date;
}): Promise<components["schemas"]["DocumentUpdateResponse"]> {
const formData = new FormData();
formData.append("relative_path", relativePath);
formData.append("created_date", createdDate.toISOString());
formData.append("content", new Blob([contentBytes]));
const response = await this.client.POST(
"/vaults/{vault_id}/documents",
{
params: {
path: {
vault_id: this.database.getSettings().vaultName
},
header: {
authorization: `Bearer ${this.database.getSettings().token}`
}
},
// eslint-disable-next-line
body: formData as any // FormData is not supported by openapi-fetch
}
);
if (!response.data) {
throw new Error(
`Failed to create document: ${SyncService.formatError(response.error)}`
);
}
Logger.getInstance().debug(
`Created document ${JSON.stringify(response.data)} with id ${
response.data.documentId
}`
);
return response.data;
}
public async put({
parentVersionId,
documentId,
relativePath,
contentBytes,
createdDate
}: {
parentVersionId: VaultUpdateId;
documentId: DocumentId;
relativePath: RelativePath;
contentBytes: Uint8Array;
createdDate: Date;
}): Promise<components["schemas"]["DocumentUpdateResponse"]> {
const formData = new FormData();
formData.append("parent_version_id", parentVersionId.toString());
formData.append("created_date", createdDate.toISOString());
formData.append("relative_path", relativePath);
formData.append("content", new Blob([contentBytes]));
const response = await this.client.PUT(
"/vaults/{vault_id}/documents/{document_id}",
{
params: {
path: {
vault_id: this.database.getSettings().vaultName,
document_id: documentId
},
header: {
authorization: `Bearer ${this.database.getSettings().token}`
}
},
// eslint-disable-next-line
body: formData as any // FormData is not supported by openapi-fetch
}
);
if (!response.data) {
throw new Error(
`Failed to update document: ${SyncService.formatError(response.error)}`
);
}
Logger.getInstance().debug(
`Updated document ${JSON.stringify(response.data)} with id ${
response.data.documentId
}`
);
return response.data;
}
public async delete({
documentId,
relativePath,
createdDate
}: {
documentId: DocumentId;
relativePath: RelativePath;
createdDate: Date;
}): Promise<void> {
const response = await this.client.DELETE(
"/vaults/{vault_id}/documents/{document_id}",
{
params: {
path: {
vault_id: this.database.getSettings().vaultName,
document_id: documentId
},
header: {
authorization: `Bearer ${this.database.getSettings().token}`
}
},
body: {
createdDate: createdDate.toISOString(),
relativePath
}
}
);
if (response.error) {
throw new Error(`Failed to delete document`);
}
Logger.getInstance().debug(
`Deleted document ${relativePath} with id ${documentId}`
);
return response.data;
}
public async get({
documentId
}: {
documentId: DocumentId;
}): Promise<components["schemas"]["DocumentVersion"]> {
const response = await this.client.GET(
"/vaults/{vault_id}/documents/{document_id}",
{
params: {
path: {
vault_id: this.database.getSettings().vaultName,
document_id: documentId
},
header: {
authorization: `Bearer ${this.database.getSettings().token}`
}
}
}
);
if (!response.data) {
throw new Error(
`Failed to get document: ${SyncService.formatError(response.error)}`
);
}
Logger.getInstance().debug(
`Get document ${response.data.relativePath} with id ${response.data.documentId}`
);
return response.data;
}
public async getAll(
since?: VaultUpdateId
): Promise<components["schemas"]["FetchLatestDocumentsResponse"]> {
const response = await this.client.GET("/vaults/{vault_id}/documents", {
params: {
path: {
vault_id: this.database.getSettings().vaultName
},
header: {
authorization: `Bearer ${this.database.getSettings().token}`
},
query: {
since_update_id: since
}
}
});
const { error } = response;
if (error) {
throw new Error(
`Failed to get documents: ${SyncService.formatError(response.error)}`
);
}
Logger.getInstance().debug(
`Got ${response.data.latestDocuments.length} document metadata`
);
return response.data;
}
public async checkConnection(): Promise<CheckConnectionResult> {
try {
const result = await this.ping();
if (result.isAuthenticated) {
return {
isSuccessful: true,
message: `Successfully connected to server (version: ${result.serverVersion}) and authenticated.`
};
}
return {
isSuccessful: false,
message: `Successfully connected to server (version: ${result.serverVersion}) but failed to authenticate.`
};
} catch (e) {
return {
isSuccessful: false,
message: `Failed to connect to server: ${e}`
};
}
}
private createClient(settings: SyncSettings): void {
this.client = createClient<paths>({
baseUrl: settings.remoteUri,
fetch: retriedFetch
});
this.clientWithoutRetries = createClient<paths>({
baseUrl: settings.remoteUri
});
}
}

View file

@ -0,0 +1,612 @@
/**
* This file was auto-generated by openapi-typescript.
* Do not make direct changes to the file.
*/
export interface paths {
"/ping": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get: {
parameters: {
query?: never;
header?: {
authorization?: string;
};
path?: never;
cookie?: never;
};
requestBody?: never;
responses: {
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["PingResponse"];
};
};
default: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["SerializedError"];
};
};
};
};
put?: never;
post?: never;
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/vaults/{vault_id}/documents": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get: {
parameters: {
query?: {
since_update_id?: number | null;
};
header: {
authorization: string;
};
path: {
vault_id: string;
};
cookie?: never;
};
requestBody?: never;
responses: {
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["FetchLatestDocumentsResponse"];
};
};
default: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["SerializedError"];
};
};
};
};
put?: never;
post: {
parameters: {
query?: never;
header: {
authorization: string;
};
path: {
vault_id: string;
};
cookie?: never;
};
requestBody: {
content: {
"multipart/form-data": components["schemas"]["CreateDocumentVersionMultipart"];
};
};
responses: {
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["DocumentUpdateResponse"];
};
};
default: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["SerializedError"];
};
};
};
};
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/vaults/{vault_id}/documents/json": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get?: never;
put?: never;
post: {
parameters: {
query?: never;
header: {
authorization: string;
};
path: {
vault_id: string;
};
cookie?: never;
};
requestBody: {
content: {
"application/json": components["schemas"]["CreateDocumentVersion"];
};
};
responses: {
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["DocumentUpdateResponse"];
};
};
default: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["SerializedError"];
};
};
};
};
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/vaults/{vault_id}/documents/{document_id}": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get: {
parameters: {
query?: never;
header: {
authorization: string;
};
path: {
document_id: string;
vault_id: string;
};
cookie?: never;
};
requestBody?: never;
responses: {
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["DocumentVersion"];
};
};
default: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["SerializedError"];
};
};
};
};
put: {
parameters: {
query?: never;
header: {
authorization: string;
};
path: {
document_id: string;
vault_id: string;
};
cookie?: never;
};
requestBody: {
content: {
"multipart/form-data": components["schemas"]["UpdateDocumentVersionMultipart"];
};
};
responses: {
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["DocumentUpdateResponse"];
};
};
default: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["SerializedError"];
};
};
};
};
post?: never;
delete: {
parameters: {
query?: never;
header: {
authorization: string;
};
path: {
document_id: string;
vault_id: string;
};
cookie?: never;
};
requestBody: {
content: {
"application/json": components["schemas"]["DeleteDocumentVersion"];
};
};
responses: {
/** @description no content */
200: {
headers: {
[name: string]: unknown;
};
content?: never;
};
default: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["SerializedError"];
};
};
};
};
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/vaults/{vault_id}/documents/{document_id}/json": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get?: never;
put: {
parameters: {
query?: never;
header: {
authorization: string;
};
path: {
document_id: string;
vault_id: string;
};
cookie?: never;
};
requestBody: {
content: {
"application/json": components["schemas"]["UpdateDocumentVersion"];
};
};
responses: {
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["DocumentUpdateResponse"];
};
};
default: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["SerializedError"];
};
};
};
};
post?: never;
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/vaults/{vault_id}/documents/{document_id}/versions/{version_id}": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get?: never;
put: {
parameters: {
query?: never;
header: {
authorization: string;
};
path: {
document_id: string;
vault_id: string;
vault_update_id: number;
};
cookie?: never;
};
requestBody?: never;
responses: {
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["DocumentVersion"];
};
};
default: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["SerializedError"];
};
};
};
};
post?: never;
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/vaults/{vault_id}/documents/{document_id}/versions/{version_id}/content": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get?: never;
put: {
parameters: {
query?: never;
header: {
authorization: string;
};
path: {
document_id: string;
vault_id: string;
vault_update_id: number;
};
cookie?: never;
};
requestBody?: never;
responses: {
/** @description byte stream */
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/octet-stream": unknown;
};
};
default: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["SerializedError"];
};
};
};
};
post?: never;
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
}
export type webhooks = Record<string, never>;
export interface components {
schemas: {
Array_of_uint8: number[];
CreateDocumentVersion: {
contentBase64: string;
/** Format: date-time */
createdDate: string;
relativePath: string;
};
CreateDocumentVersionMultipart: {
content: components["schemas"]["Array_of_uint8"];
/** Format: date-time */
created_date: string;
relative_path: string;
};
DeleteDocumentVersion: {
/** Format: date-time */
createdDate: string;
relativePath: string;
};
/** @description Response to a create/update document request. */
DocumentUpdateResponse:
| {
/** Format: date-time */
createdDate: string;
/** Format: uuid */
documentId: string;
isDeleted: boolean;
relativePath: string;
/** @enum {string} */
type: "FastForwardUpdate";
/** Format: date-time */
updatedDate: string;
vaultId: string;
/** Format: int64 */
vaultUpdateId: number;
}
| {
contentBase64: string;
/** Format: date-time */
createdDate: string;
/** Format: uuid */
documentId: string;
isDeleted: boolean;
relativePath: string;
/** @enum {string} */
type: "MergingUpdate";
/** Format: date-time */
updatedDate: string;
vaultId: string;
/** Format: int64 */
vaultUpdateId: number;
};
DocumentVersion: {
contentBase64: string;
/** Format: date-time */
createdDate: string;
/** Format: uuid */
documentId: string;
isDeleted: boolean;
relativePath: string;
/** Format: date-time */
updatedDate: string;
vaultId: string;
/** Format: int64 */
vaultUpdateId: number;
};
DocumentVersionWithoutContent: {
/** Format: date-time */
createdDate: string;
/** Format: uuid */
documentId: string;
isDeleted: boolean;
relativePath: string;
/** Format: date-time */
updatedDate: string;
vaultId: string;
/** Format: int64 */
vaultUpdateId: number;
};
/** @description Response to a fetch latest documents request. */
FetchLatestDocumentsResponse: {
/**
* Format: int64
* @description The update ID of the latest document in the response.
*/
lastUpdateId: number;
latestDocuments: components["schemas"]["DocumentVersionWithoutContent"][];
};
PathParams: {
vault_id: string;
};
PathParams2: {
vault_id: string;
};
PathParams3: {
/** Format: uuid */
document_id: string;
vault_id: string;
};
PathParams4: {
/** Format: uuid */
document_id: string;
vault_id: string;
};
PathParams5: {
/** Format: uuid */
document_id: string;
vault_id: string;
/** Format: int64 */
vault_update_id: number;
};
PathParams6: {
/** Format: uuid */
document_id: string;
vault_id: string;
/** Format: int64 */
vault_update_id: number;
};
PathParams7: {
/** Format: uuid */
document_id: string;
vault_id: string;
};
/** @description Response to a ping request. */
PingResponse: {
/** @description Whether the client is authenticated based on the sent Authorization header. */
isAuthenticated: boolean;
/** @description Semantic version of the server. */
serverVersion: string;
};
QueryParams: {
/** Format: int64 */
since_update_id?: number | null;
};
SerializedError: {
causes: string[];
message: string;
};
UpdateDocumentVersion: {
contentBase64: string;
/** Format: date-time */
createdDate: string;
/** Format: int64 */
parentVersionId: number;
relativePath: string;
};
UpdateDocumentVersionMultipart: {
content: components["schemas"]["Array_of_uint8"];
/** Format: date-time */
createdDate: string;
/** Format: int64 */
parentVersionId: number;
relativePath: string;
};
};
responses: never;
parameters: never;
requestBodies: never;
headers: never;
pathItems: never;
}
export type $defs = Record<string, never>;
export type operations = Record<string, never>;

View file

@ -0,0 +1,61 @@
import type { Database } from "../database/database";
import type { SyncService } from "src/services/sync-service";
import { Logger } from "src/tracing/logger";
import type { Syncer } from "./syncer";
let isRunning = false;
export async function applyRemoteChangesLocally({
database,
syncService,
syncer
}: {
database: Database;
syncService: SyncService;
syncer: Syncer;
}): Promise<void> {
if (!database.getSettings().isSyncEnabled) {
Logger.getInstance().debug(
`Syncing is disabled, not fetching remote changes`
);
return;
} else if (isRunning) {
Logger.getInstance().debug(
"Applying remote changes locally is already in progress, skipping invocation"
);
return;
}
isRunning = true;
try {
const remote = await syncService.getAll(database.getLastSeenUpdateId());
if (remote.latestDocuments.length === 0) {
Logger.getInstance().debug("No remote changes to apply");
return;
}
Logger.getInstance().info("Applying remote changes locally");
await Promise.all(
remote.latestDocuments.map(async (remoteDocument) =>
syncer.syncRemotelyUpdatedFile(remoteDocument)
)
);
const lastSeenUpdateId = database.getLastSeenUpdateId();
if (
lastSeenUpdateId === undefined ||
remote.lastUpdateId > lastSeenUpdateId
) {
await database.setLastSeenUpdateId(remote.lastUpdateId);
}
} catch (e) {
Logger.getInstance().error(
`Failed to apply remote changes locally: ${e}`
);
} finally {
isRunning = false;
}
}

View file

@ -0,0 +1,79 @@
import { RelativePath } from "../database/document-metadata";
import {
tryLockDocument,
waitForDocumentLock,
unlockDocument
} from "./document-lock";
describe("Document Lock Operations", () => {
const testPath: RelativePath = "test/document/path";
beforeEach(() => {
// Reset the state before each test
(global as any).locked = new Set<RelativePath>();
(global as any).waiters = new Map<RelativePath, (() => void)[]>();
});
test("should lock a document successfully", () => {
const result = tryLockDocument(testPath);
expect(result).toBe(true);
});
test("should not lock a document that is already locked", () => {
tryLockDocument(testPath);
const result = tryLockDocument(testPath);
expect(result).toBe(false);
});
test("should unlock a locked document", () => {
tryLockDocument(testPath);
unlockDocument(testPath);
const result = tryLockDocument(testPath);
expect(result).toBe(true);
unlockDocument(testPath);
});
test("should throw an error when unlocking a document that is not locked", () => {
expect(() => {
unlockDocument(testPath);
}).toThrow(`Document ${testPath} is not locked, cannot unlock`);
});
test("should wait for a document lock and resolve when unlocked", async () => {
tryLockDocument(testPath);
let resolved = false;
const waitPromise = waitForDocumentLock(testPath).then(() => {
resolved = true;
});
unlockDocument(testPath);
await waitPromise;
expect(resolved).toBe(true);
});
test("should resolve multiple waiters in FIFO order", async () => {
tryLockDocument(testPath);
let firstResolved = false;
let secondResolved = false;
const firstWaitPromise = waitForDocumentLock(testPath).then(() => {
firstResolved = true;
});
const secondWaitPromise = waitForDocumentLock(testPath).then(() => {
secondResolved = true;
});
unlockDocument(testPath);
await firstWaitPromise;
expect(firstResolved).toBe(true);
expect(secondResolved).toBe(false);
unlockDocument(testPath);
await secondWaitPromise;
expect(secondResolved).toBe(true);
});
});

View file

@ -0,0 +1,48 @@
import { RelativePath } from "../database/document-metadata";
const locked = new Set<RelativePath>();
const waiters = new Map<RelativePath, (() => void)[]>();
export function tryLockDocument(relativePath: RelativePath): boolean {
if (locked.has(relativePath)) {
return false;
}
locked.add(relativePath);
return true;
}
export async function waitForDocumentLock(
relativePath: RelativePath
): Promise<void> {
if (tryLockDocument(relativePath)) {
return Promise.resolve();
}
return new Promise((resolve) => {
let waiting = waiters.get(relativePath);
if (!waiting) {
waiting = [];
waiters.set(relativePath, waiting);
}
waiting.push(resolve);
});
}
export function unlockDocument(relativePath: RelativePath): void {
if (!locked.has(relativePath)) {
throw new Error(
`Document ${relativePath} is not locked, cannot unlock`
);
}
// Remove the first element to ensure FIFO unblocking order
const nextWaiting = waiters.get(relativePath)?.shift();
if (nextWaiting) {
nextWaiting();
} else {
locked.delete(relativePath);
}
}

View file

@ -0,0 +1,708 @@
import type { Database } from "../database/database";
import type {
DocumentMetadata,
RelativePath
} from "src/database/document-metadata";
import type { FileOperations } from "src/file-operations";
import type { SyncService } from "src/services/sync-service";
import { Logger } from "src/tracing/logger";
import type { SyncHistory } from "src/tracing/sync-history";
import { SyncSource, SyncStatus, SyncType } from "src/tracing/sync-history";
import { unlockDocument, waitForDocumentLock } from "./document-lock";
import PQueue from "p-queue";
import { EMPTY_HASH, hash } from "src/utils/hash";
import type { components } from "src/services/types";
import { deserialize } from "src/utils/deserialize";
export class Syncer {
private readonly remainingOperationsListeners: ((
remainingOperations: number
) => void)[] = [];
private readonly syncQueue: PQueue;
private isRunningOfflineSync = false;
public constructor(
private readonly database: Database,
private readonly syncService: SyncService,
private readonly operations: FileOperations,
private readonly history: SyncHistory
) {
this.syncQueue = new PQueue({
concurrency: database.getSettings().syncConcurrency
});
database.addOnSettingsChangeHandlers((settings) => {
this.syncQueue.concurrency = settings.syncConcurrency;
});
this.syncQueue.on("active", () => {
this.emitRemainingOperationsChange(this.syncQueue.size);
});
}
public addRemainingOperationsListener(
listener: (remainingOperations: number) => void
): void {
this.remainingOperationsListeners.push(listener);
}
public async syncLocallyCreatedFile(
relativePath: RelativePath,
updateTime: Date
): Promise<void> {
await this.syncQueue.add(async () =>
this.internalSyncLocallyCreatedFile(relativePath, updateTime)
);
}
public async syncLocallyUpdatedFile(args: {
oldPath?: RelativePath;
relativePath: RelativePath;
updateTime: Date;
}): Promise<void> {
await this.syncQueue.add(async () =>
this.internalSyncLocallyUpdatedFile(args)
);
}
public async syncLocallyDeletedFile(
relativePath: RelativePath
): Promise<void> {
await this.syncQueue.add(async () =>
this.internalSyncLocallyDeletedFile(relativePath)
);
}
public async syncRemotelyUpdatedFile(
remoteVersion: components["schemas"]["DocumentVersionWithoutContent"]
): Promise<void> {
await this.syncQueue.add(async () =>
this.internalSyncRemotelyUpdatedFile(remoteVersion)
);
}
public async scheduleSyncForOfflineChanges(): Promise<void> {
if (this.isRunningOfflineSync) {
Logger.getInstance().warn(
"Uploading local changes is already in progress, skipping"
);
return;
}
if (!this.database.getSettings().isSyncEnabled) {
Logger.getInstance().debug(
`Syncing is disabled, not uploading local changes`
);
return;
}
this.isRunningOfflineSync = true;
try {
const allLocalFiles = await this.operations.listAllFiles();
let locallyDeletedFiles = [
...this.database.getDocuments().entries()
].filter(([path, _]) => !allLocalFiles.includes(path));
await Promise.all(
allLocalFiles.map(async (relativePath) =>
this.syncQueue.add(async () => {
const metadata =
this.database.getDocument(relativePath);
// If there's no metadata, it must be a new file
if (!metadata) {
// Perhaps the file has been moved. Let's check by looking at the deleted files
const contentBytes =
await this.operations.read(relativePath);
const contentHash = hash(contentBytes);
const originalFile =
await this.findMatchingFileBasedOnHash(
contentHash,
locallyDeletedFiles
);
if (originalFile !== undefined) {
// `originalFile` hasn't been deleted but it got moved instead
locallyDeletedFiles =
locallyDeletedFiles.filter(
(item) => item != originalFile
);
Logger.getInstance().debug(
`Document ${relativePath} was not found under its current path in the database but was found under a different path ${originalFile[0]}, scheduling sync to move it`
);
return this.internalSyncLocallyUpdatedFile({
oldPath: originalFile[0],
relativePath: relativePath,
updateTime:
await this.operations.getModificationTime(
relativePath
),
optimisations: {
contentBytes,
contentHash
}
});
}
Logger.getInstance().debug(
`Document ${relativePath} not found in database, scheduling sync to create it`
);
return this.internalSyncLocallyCreatedFile(
relativePath,
await this.operations.getModificationTime(
relativePath
)
);
}
Logger.getInstance().debug(
`Document ${relativePath} has been updated locally, scheduling sync to update it`
);
return this.internalSyncLocallyUpdatedFile({
relativePath,
updateTime:
await this.operations.getModificationTime(
relativePath
)
});
})
)
);
await Promise.all(
locallyDeletedFiles.map(async ([relativePath, _]) => {
Logger.getInstance().debug(
`Document ${relativePath} has been deleted locally, scheduling sync to delete it`
);
if (await this.operations.exists(relativePath)) {
Logger.getInstance().debug(
`Document ${relativePath} actually exists locally, skipping`
);
return Promise.resolve();
}
return this.internalSyncLocallyDeletedFile(relativePath);
})
);
Logger.getInstance().info(
`All local changes have been applied remotely`
);
} catch (e) {
Logger.getInstance().error(
`Not all local changes have been applied remotely: ${e}`
);
} finally {
this.isRunningOfflineSync = false;
}
}
public async reset(): Promise<void> {
this.syncQueue.clear();
await this.syncQueue.onEmpty();
await this.database.resetSyncState();
this.history.reset();
this.remainingOperationsListeners.forEach((listener) => {
listener(0);
});
}
private async internalSyncLocallyCreatedFile(
relativePath: RelativePath,
updateTime: Date,
optimisations?: {
contentBytes?: Uint8Array;
contentHash?: string;
}
): Promise<void> {
await this.executeWhileHoldingFileLock(
relativePath,
SyncType.CREATE,
SyncSource.PUSH,
async () => {
if (
(await this.operations.getFileSize(relativePath)) /
1024 /
1024 >
this.database.getSettings().maxFileSizeMB
) {
this.history.addHistoryEntry({
status: SyncStatus.ERROR,
relativePath,
message: `File size exceeds the maximum file size limit of ${
this.database.getSettings().maxFileSizeMB
}MB`,
type: SyncType.CREATE
});
return;
}
const contentBytes =
optimisations?.contentBytes ??
(await this.operations.read(relativePath));
let contentHash =
optimisations?.contentHash ?? hash(contentBytes);
const localMetadata = this.database.getDocument(relativePath);
if (localMetadata) {
Logger.getInstance().debug(
`Document metadata already exists for ${relativePath}, it must have been downloaded from the server`
);
if (localMetadata.hash === contentHash) {
this.history.addHistoryEntry({
status: SyncStatus.NO_OP,
relativePath,
message: `File hash matches with last synced version, no need to sync`,
type: SyncType.UPDATE
});
return;
}
}
const response = await this.syncService.create({
relativePath,
contentBytes,
createdDate: updateTime
});
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
source: SyncSource.PUSH,
relativePath,
message: `Successfully uploaded locally created file`,
type: SyncType.CREATE
});
if (response.type === "MergingUpdate") {
const responseBytes = deserialize(response.contentBase64);
contentHash = hash(responseBytes);
await this.operations.write(
relativePath,
contentBytes,
responseBytes
);
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
source: SyncSource.PULL,
relativePath,
message: `The file we created locally has already existed remotely, so we have merged them`,
type: SyncType.UPDATE
});
}
await this.database.setDocument({
documentId: response.documentId,
relativePath: response.relativePath,
parentVersionId: response.vaultUpdateId,
hash: contentHash
});
await this.tryIncrementVaultUpdateId(response.vaultUpdateId);
}
);
}
private async internalSyncLocallyUpdatedFile({
oldPath,
relativePath,
updateTime,
optimisations
}: {
oldPath?: RelativePath;
relativePath: RelativePath;
updateTime: Date;
optimisations?: {
contentBytes?: Uint8Array;
contentHash?: string;
};
}): Promise<void> {
await this.executeWhileHoldingFileLock(
relativePath,
SyncType.UPDATE,
SyncSource.PUSH,
async () => {
if (
(await this.operations.getFileSize(relativePath)) /
1024 /
1024 >
this.database.getSettings().maxFileSizeMB
) {
this.history.addHistoryEntry({
status: SyncStatus.ERROR,
relativePath,
message: `File size exceeds the maximum file size limit of ${
this.database.getSettings().maxFileSizeMB
}MB`,
type: SyncType.CREATE
});
return;
}
const localMetadata = this.database.getDocument(
oldPath ?? relativePath
);
if (!localMetadata) {
if (this.database.getDocument(relativePath)) {
this.history.addHistoryEntry({
status: SyncStatus.NO_OP,
relativePath,
message: `The renaming doesn't require a sync because it must have been pulled from remote`,
type: SyncType.UPDATE
});
return;
}
throw new Error(
`Document metadata not found for ${relativePath}. This implies a corrupt local database. Consider resetting the plugin's sync history.`
);
}
const contentBytes =
optimisations?.contentBytes ??
(await this.operations.read(relativePath));
let contentHash =
optimisations?.contentHash ?? hash(contentBytes);
if (
localMetadata.hash === contentHash &&
oldPath === undefined
) {
this.history.addHistoryEntry({
status: SyncStatus.NO_OP,
relativePath,
message: `File hash matches with last synced version, no need to sync`,
type: SyncType.UPDATE
});
return;
}
const response = await this.syncService.put({
documentId: localMetadata.documentId,
parentVersionId: localMetadata.parentVersionId,
relativePath,
contentBytes,
createdDate: updateTime
});
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
source: SyncSource.PUSH,
relativePath,
message: `Successfully uploaded locally updated file to the remote server`,
type: SyncType.UPDATE
});
if (response.isDeleted) {
await this.operations.remove(oldPath ?? relativePath);
await this.database.removeDocument(oldPath ?? relativePath);
await this.tryIncrementVaultUpdateId(
response.vaultUpdateId
);
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
source: SyncSource.PULL,
relativePath,
message:
"The file we tried to update had been deleted remotely, therefore, we have deleted it locally",
type: SyncType.DELETE
});
return;
}
if (response.relativePath != relativePath) {
await waitForDocumentLock(response.relativePath);
}
try {
if (response.relativePath != relativePath) {
await this.operations.move(
oldPath ?? relativePath,
response.relativePath
);
}
if (response.type === "MergingUpdate") {
const responseBytes = deserialize(
response.contentBase64
);
contentHash = hash(responseBytes);
await this.operations.write(
response.relativePath,
contentBytes,
responseBytes
);
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
source: SyncSource.PULL,
relativePath,
message: `The file we updated had been updated remotely, so we downloaded the merged version`,
type: SyncType.UPDATE
});
}
await this.database.moveDocument({
documentId: localMetadata.documentId,
oldRelativePath: oldPath ?? relativePath,
relativePath: response.relativePath,
parentVersionId: response.vaultUpdateId,
hash: contentHash
});
await this.tryIncrementVaultUpdateId(
response.vaultUpdateId
);
} finally {
if (response.relativePath != relativePath) {
unlockDocument(response.relativePath);
}
}
}
);
}
private async internalSyncLocallyDeletedFile(
relativePath: RelativePath
): Promise<void> {
await this.executeWhileHoldingFileLock(
relativePath,
SyncType.DELETE,
SyncSource.PUSH,
async () => {
const localMetadata = this.database.getDocument(relativePath);
if (!localMetadata) {
this.history.addHistoryEntry({
status: SyncStatus.NO_OP,
relativePath,
message: `Locally deleted file hasn't been uploaded yet, so there's no need to delete it on the remote server`,
type: SyncType.DELETE
});
return;
}
await this.syncService.delete({
documentId: localMetadata.documentId,
relativePath,
createdDate: new Date() // We got the event now, so it must have been deleted just now
});
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
source: SyncSource.PUSH,
relativePath,
message: `Successfully deleted locally deleted file on the remote server`,
type: SyncType.DELETE
});
await this.database.removeDocument(relativePath);
}
);
}
private async internalSyncRemotelyUpdatedFile(
remoteVersion: components["schemas"]["DocumentVersionWithoutContent"]
): Promise<void> {
await this.executeWhileHoldingFileLock(
remoteVersion.relativePath,
SyncType.UPDATE,
SyncSource.PULL,
async () => {
const localMetadata = this.database.getDocumentByDocumentId(
remoteVersion.documentId
);
if (!localMetadata) {
if (remoteVersion.isDeleted) {
this.history.addHistoryEntry({
status: SyncStatus.NO_OP,
source: SyncSource.PULL,
relativePath: remoteVersion.relativePath,
message: `Remotely deleted file hasn't been synced yet, so there's no need to delete it locally`,
type: SyncType.DELETE
});
return;
}
const content = (
await this.syncService.get({
documentId: remoteVersion.documentId
})
).contentBase64;
const contentBytes = deserialize(content);
await this.operations.create(
remoteVersion.relativePath,
contentBytes
);
await this.database.setDocument({
documentId: remoteVersion.documentId,
relativePath: remoteVersion.relativePath,
parentVersionId: remoteVersion.vaultUpdateId,
hash: hash(contentBytes)
});
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
source: SyncSource.PULL,
relativePath: remoteVersion.relativePath,
message: `Successfully downloaded remote file which hasn't existed locally`,
type: SyncType.CREATE
});
return;
}
const [relativePath, metadata] = localMetadata;
if (metadata.parentVersionId === remoteVersion.vaultUpdateId) {
Logger.getInstance().debug(
`Document ${relativePath} is already up to date`
);
return;
}
if (relativePath !== remoteVersion.relativePath) {
await waitForDocumentLock(relativePath);
}
try {
if (remoteVersion.isDeleted) {
await this.operations.remove(relativePath);
await this.database.removeDocument(relativePath);
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
source: SyncSource.PULL,
relativePath: remoteVersion.relativePath,
message: `Successfully deleted remotely deleted file locally`,
type: SyncType.DELETE
});
} else {
const currentContent =
await this.operations.read(relativePath);
const currentHash = hash(currentContent);
if (currentHash !== metadata.hash) {
Logger.getInstance().info(
`Document ${relativePath} has been updated both remotely and locally, letting the local file update event handle it`
);
return;
}
const content = (
await this.syncService.get({
documentId: remoteVersion.documentId
})
).contentBase64;
const contentBytes = deserialize(content);
const contentHash = hash(contentBytes);
if (relativePath !== remoteVersion.relativePath) {
await this.operations.move(
relativePath,
remoteVersion.relativePath
);
}
await this.operations.write(
remoteVersion.relativePath,
currentContent,
contentBytes
);
await this.database.moveDocument({
documentId: remoteVersion.documentId,
oldRelativePath: relativePath,
relativePath: remoteVersion.relativePath,
parentVersionId: remoteVersion.vaultUpdateId,
hash: contentHash
});
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
source: SyncSource.PULL,
relativePath: remoteVersion.relativePath,
message: `Successfully updated remotely updated file locally`,
type: SyncType.UPDATE
});
}
} finally {
if (relativePath !== remoteVersion.relativePath) {
unlockDocument(relativePath);
}
}
}
);
}
private async executeWhileHoldingFileLock(
relativePath: RelativePath,
syncType: SyncType,
syncSource: SyncSource,
fn: () => Promise<void>
): Promise<void> {
if (!this.database.getSettings().isSyncEnabled) {
Logger.getInstance().info(
`Syncing is disabled, not syncing ${relativePath}`
);
return;
}
if (!this.operations.isFileEligibleForSync(relativePath)) {
Logger.getInstance().info(
`File ${relativePath} is not eligible for syncing`
);
return;
}
Logger.getInstance().debug(`Syncing ${relativePath}`);
await waitForDocumentLock(relativePath);
try {
await fn();
} catch (e) {
this.history.addHistoryEntry({
status: SyncStatus.ERROR,
relativePath,
message: `Failed to ${syncSource.toLocaleLowerCase()} file ${e} when trying to ${syncType.toLocaleLowerCase()} it`,
type: syncType,
source: syncSource
});
throw e;
} finally {
unlockDocument(relativePath);
}
}
private emitRemainingOperationsChange(remainingOperations: number): void {
this.remainingOperationsListeners.forEach((listener) => {
listener(remainingOperations);
});
}
private async tryIncrementVaultUpdateId(
responseVaultUpdateId: number
): Promise<void> {
if (this.database.getLastSeenUpdateId() === responseVaultUpdateId - 1) {
await this.database.setLastSeenUpdateId(responseVaultUpdateId);
}
}
private async findMatchingFileBasedOnHash(
contentHash: string,
candidates: [RelativePath, DocumentMetadata][]
): Promise<[RelativePath, DocumentMetadata] | undefined> {
if (contentHash != EMPTY_HASH) {
return undefined;
}
return candidates.find(
([_, document]) => document.hash === contentHash
);
}
}

View file

@ -0,0 +1,98 @@
export enum LogLevel {
DEBUG = "DEBUG",
INFO = "INFO",
WARNING = "WARNING",
ERROR = "ERROR"
}
const LOG_LEVEL_ORDER = {
[LogLevel.DEBUG]: 0,
[LogLevel.INFO]: 1,
[LogLevel.WARNING]: 2,
[LogLevel.ERROR]: 3
};
class LogLine {
public timestamp = new Date();
public constructor(
public level: LogLevel,
public message: string
) {}
}
export class Logger {
private static readonly MAX_MESSAGES = 1000;
private static instance: Logger | null = null;
private readonly messages: LogLine[] = [];
private readonly onMessageListeners: ((
status: LogLine | undefined
) => void)[] = [];
private constructor() {} // eslint-disable-line @typescript-eslint/no-empty-function
public static getInstance(): Logger {
if (!Logger.instance) {
Logger.instance = new Logger();
}
return Logger.instance;
}
public debug(message: string): void {
console.debug(message);
this.pushMessage(message, LogLevel.DEBUG);
}
public info(message: string): void {
console.info(message);
this.pushMessage(message, LogLevel.INFO);
}
public warn(message: string): void {
console.warn(message);
this.pushMessage(message, LogLevel.WARNING);
}
public error(message: string): void {
console.error(message);
this.pushMessage(message, LogLevel.ERROR);
}
public getMessages(mininumSeverity: LogLevel): LogLine[] {
return this.messages.filter(
(message) =>
LOG_LEVEL_ORDER[message.level] >=
LOG_LEVEL_ORDER[mininumSeverity]
);
}
public addOnMessageListener(
listener: (message: LogLine | undefined) => void
): void {
this.onMessageListeners.push(listener);
}
public reset(): void {
this.messages.length = 0;
this.onMessageListeners.forEach((listener) => {
listener(undefined);
});
}
private pushMessage(message: string, level: LogLevel): void {
const logLine = new LogLine(level, message);
this.messages.push(logLine);
while (this.messages.length > Logger.MAX_MESSAGES) {
this.messages.shift();
}
this.onMessageListeners.forEach((listener) => {
listener(logLine);
});
}
}

View file

@ -0,0 +1,103 @@
import type { RelativePath } from "src/database/document-metadata";
import { Logger } from "./logger";
export interface CommonHistoryEntry {
status: SyncStatus;
relativePath: RelativePath;
message: string;
type?: SyncType;
source?: SyncSource;
}
export enum SyncType {
CREATE = "CREATE",
UPDATE = "UPDATE",
DELETE = "DELETE"
}
export enum SyncSource {
PUSH = "PUSH",
PULL = "PULL"
}
export enum SyncStatus {
NO_OP = "NO_OP",
SUCCESS = "SUCCESS",
ERROR = "ERROR"
}
export type HistoryEntry = CommonHistoryEntry & { timestamp: Date };
export interface HistoryStats {
success: number;
error: number;
}
export class SyncHistory {
private static readonly MAX_ENTRIES = 5000;
private readonly entries: HistoryEntry[] = [];
private readonly syncHistoryUpdateListeners: ((
status: HistoryStats
) => void)[] = [];
private status: HistoryStats = {
success: 0,
error: 0
};
public getEntries(): HistoryEntry[] {
return [...this.entries];
}
public reset(): void {
this.entries.length = 0;
this.status = {
success: 0,
error: 0
};
this.syncHistoryUpdateListeners.forEach((listener) => {
listener(this.status);
});
}
public addSyncHistoryUpdateListener(
listener: (stats: HistoryStats) => void
): void {
this.syncHistoryUpdateListeners.push(listener);
listener({ ...this.status });
}
public addHistoryEntry(entry: CommonHistoryEntry): void {
const historyEntry = {
...entry,
timestamp: new Date()
};
this.entries.push(historyEntry);
if (entry.status === SyncStatus.SUCCESS) {
this.status.success++;
Logger.getInstance().info(
`History entry: ${entry.relativePath} - ${entry.message}`
);
} else if (entry.status === SyncStatus.ERROR) {
this.status.error++;
Logger.getInstance().error(
`Error syncing file: ${entry.relativePath} - ${entry.message}`
);
} else {
Logger.getInstance().debug(
`No-op syncing file: ${entry.relativePath} - ${entry.message}`
);
}
this.syncHistoryUpdateListeners.forEach((listener) => {
listener(this.status);
});
if (this.entries.length > SyncHistory.MAX_ENTRIES) {
this.entries.shift();
}
}
}

View file

@ -0,0 +1,18 @@
import init, { base64ToBytes } from "sync_lib";
import fs from "fs";
describe("deserialize", () => {
it("should serialize a Uint8Array to a base64 string", async () => {
const wasmBin = fs.readFileSync(
"../../backend/sync_lib/pkg/sync_lib_bg.wasm"
);
await init({ module_or_path: wasmBin });
const base64 = "SGVsbG8=";
const jsResult = base64ToBytes(base64);
const expected = new Uint8Array([72, 101, 108, 108, 111]);
expect(jsResult).toEqual(expected);
const rustResult = base64ToBytes(base64);
expect(jsResult).toEqual(rustResult);
});
});

View file

@ -0,0 +1,5 @@
import { base64ToBytes } from "byte-base64";
export function deserialize(data: string): Uint8Array {
return base64ToBytes(data);
}

View file

@ -0,0 +1,12 @@
// https://stackoverflow.com/questions/7616461/generate-a-hash-from-string-in-javascript
export function hash(content: Uint8Array): string {
let result = 0;
// eslint-disable-next-line @typescript-eslint/prefer-for-of
for (let i = 0; i < content.length; i++) {
result = (result << 5) - result + content[i];
result |= 0; // Convert to 32bit integer
}
return Math.abs(result).toString(16);
}
export const EMPTY_HASH = hash(new Uint8Array(0));

View file

@ -0,0 +1,27 @@
import { isEqualBytes } from "./is-equal-bytes";
describe("isEqualBytes", () => {
it("should return true for equal byte arrays", () => {
const bytes1 = new Uint8Array([1, 2, 3, 4]);
const bytes2 = new Uint8Array([1, 2, 3, 4]);
expect(isEqualBytes(bytes1, bytes2)).toBe(true);
});
it("should return false for byte arrays of different lengths", () => {
const bytes1 = new Uint8Array([1, 2, 3, 4]);
const bytes2 = new Uint8Array([1, 2, 3]);
expect(isEqualBytes(bytes1, bytes2)).toBe(false);
});
it("should return true for empty byte arrays", () => {
const bytes1 = new Uint8Array([]);
const bytes2 = new Uint8Array([]);
expect(isEqualBytes(bytes1, bytes2)).toBe(true);
});
it("should return false for byte arrays with same length but different content", () => {
const bytes1 = new Uint8Array([1, 2, 3, 4]);
const bytes2 = new Uint8Array([4, 3, 2, 1]);
expect(isEqualBytes(bytes1, bytes2)).toBe(false);
});
});

View file

@ -0,0 +1,13 @@
export function isEqualBytes(bytes1: Uint8Array, bytes2: Uint8Array): boolean {
if (bytes1.length !== bytes2.length) {
return false;
}
for (let i = 0; i < bytes1.length; i++) {
if (bytes1[i] !== bytes2[i]) {
return false;
}
}
return true;
}

View file

@ -0,0 +1,36 @@
import * as fetchRetryFactory from "fetch-retry";
import type { RequestInitRetryParams } from "fetch-retry";
import { Logger } from "src/tracing/logger";
const fetchWithRetry = fetchRetryFactory.default(fetch);
function getUrlFromInput(input: RequestInfo | URL): string {
if (input instanceof URL) {
return input.href;
}
if (typeof input === "string") {
return input;
}
return input.url;
}
export async function retriedFetch(
input: RequestInfo | URL,
init: RequestInitRetryParams<typeof fetch> = {}
): Promise<Response> {
return fetchWithRetry(input, {
retryOn: function (attempt, error, response) {
if (error !== null || !response || response.status >= 500) {
Logger.getInstance().warn(
`Retrying fetch for ${getUrlFromInput(input)}, attempt ${attempt}`
);
return true;
}
return false;
},
retries: 6,
retryDelay: (attempt) => Math.pow(1.5, attempt) * 500,
...init
});
}

View file

@ -0,0 +1,18 @@
import { serialize } from "./serialize";
import init, { bytesToBase64 } from "sync_lib";
import fs from "fs";
describe("serialize", () => {
it("should serialize a Uint8Array to a base64 string", async () => {
const wasmBin = fs.readFileSync(
"../../backend/sync_lib/pkg/sync_lib_bg.wasm"
);
await init({ module_or_path: wasmBin });
const data = new Uint8Array([72, 101, 108, 108, 111]);
const jsResult = serialize(data);
const rustResult = bytesToBase64(data);
expect(rustResult).toBe("SGVsbG8=");
expect(jsResult).toBe(rustResult);
});
});

View file

@ -0,0 +1,5 @@
import { bytesToBase64 } from "byte-base64";
export function serialize(data: Uint8Array): string {
return bytesToBase64(data);
}

View file

@ -0,0 +1,15 @@
{
"compilerOptions": {
"baseUrl": ".",
"module": "ESNext",
"target": "ESNext",
"noImplicitAny": true,
"moduleResolution": "bundler",
"strictNullChecks": true,
"allowSyntheticDefaultImports": true,
"lib": [
"DOM",
"ESNext"
]
},
}

View file

@ -0,0 +1,49 @@
const path = require("path");
module.exports = (_env, _argv) => ({
entry: "./src/index.ts",
devtool: "source-map",
module: {
rules: [
{
test: /\.ts$/,
use: [
{
loader: "ts-loader",
options: {
compilerOptions: {
declaration: true,
declarationDir: "./dist/types"
},
transpileOnly: false
}
}
]
},
{
test: /\.wasm$/,
type: "asset/inline"
}
]
},
optimization: {
minimize: false
},
resolve: {
extensions: [".ts", ".js"],
alias: {
root: __dirname,
src: path.resolve(__dirname, "src")
}
},
output: {
clean: true,
filename: "index.js",
library: {
name: "SyncClient",
type: "umd"
},
globalObject: "this",
path: path.resolve(__dirname, "dist")
}
});