This commit is contained in:
Andras Schmelczer 2025-12-14 23:30:04 +00:00
parent e103bba12c
commit c4f992c9d6
21 changed files with 233 additions and 193 deletions

View file

@ -2,5 +2,5 @@ export const TIMEOUT_FOR_MERGING_HISTORY_ENTRIES_IN_SECONDS = 60;
export const DIFF_CACHE_SIZE_MB = 2;
export const MAX_LOG_MESSAGE_COUNT = 100000;
export const MAX_HISTORY_ENTRY_COUNT = 5000;
export const SUPPORTED_API_VERSION = 2;
export const WEBSOCKET_DISCONNECT_TIMEOUT_IN_S = 10;
export const SUPPORTED_API_VERSION = 3;
export const WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS = 10;

View file

@ -169,9 +169,9 @@ export class FileOperations {
}
await this.ensureClearPath(newPath);
this.database.move(oldPath, newPath);
await this.fs.rename(oldPath, newPath);
await this.deletingEmptyParentDirectoriesOfDeletedFile(oldPath);
}

View file

@ -9,6 +9,7 @@ export type DocumentId = string;
export type RelativePath = string;
export interface DocumentMetadata {
documentId: DocumentId;
parentVersionId: VaultUpdateId;
hash: string;
remoteRelativePath?: RelativePath;
@ -36,7 +37,6 @@ export interface StoredDatabase {
*/
export interface DocumentRecord {
relativePath: RelativePath;
documentId: DocumentId;
metadata: DocumentMetadata | undefined;
isDeleted: boolean;
updates: Promise<unknown>[];
@ -57,9 +57,8 @@ export class Database {
this.documents =
initialState.documents?.map(
({ relativePath, documentId, ...metadata }) => ({
({ relativePath, ...metadata }) => ({
relativePath,
documentId,
metadata,
isDeleted: false,
updates: [],
@ -114,7 +113,7 @@ export class Database {
i === 0
? false
: records[i - 1].parallelVersion ===
current.parallelVersion
current.parallelVersion
)
) {
throw new Error(
@ -127,6 +126,7 @@ export class Database {
public updateDocumentMetadata(
metadata: {
documentId: DocumentId;
parentVersionId: VaultUpdateId;
hash: string;
remoteRelativePath: RelativePath;
@ -196,19 +196,18 @@ export class Database {
}
public createNewPendingDocument(
documentId: DocumentId,
relativePath: RelativePath,
promise: Promise<unknown>
): DocumentRecord {
this.logger.debug(
`Creating new pending document: ${relativePath} (${documentId})`
`Creating new pending document: ${relativePath}`
);
const previousEntry =
this.getLatestDocumentByRelativePath(relativePath);
const entry = {
relativePath,
documentId,
documentId: undefined,
metadata: undefined,
isDeleted: false,
updates: [promise],
@ -231,8 +230,8 @@ export class Database {
): DocumentRecord {
const entry = {
relativePath,
documentId,
metadata: {
documentId,
parentVersionId,
hash: EMPTY_HASH,
remoteRelativePath: relativePath
@ -251,7 +250,7 @@ export class Database {
public getDocumentByDocumentId(
find: DocumentId
): DocumentRecord | undefined {
return this.documents.find(({ documentId }) => documentId === find);
return this.documents.find(({ metadata }) => metadata?.documentId === find);
}
public move(
@ -331,8 +330,7 @@ export class Database {
public async save(): Promise<void> {
return this.saveData({
documents: this.resolvedDocuments.map(
({ relativePath, documentId, metadata }) => ({
documentId,
({ relativePath, metadata }) => ({
relativePath,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
...metadata! // `resolvedDocuments` only returns docs with metadata set
@ -346,9 +344,12 @@ export class Database {
private ensureConsistency(): void {
const idToPath = new Map<string, string[]>();
this.resolvedDocuments.forEach(({ relativePath, documentId }) => {
idToPath.set(documentId, [
...(idToPath.get(documentId) ?? []),
this.resolvedDocuments.forEach(({ relativePath, metadata }) => {
if (metadata === undefined) {
return;
}
idToPath.set(metadata.documentId, [
...(idToPath.get(metadata.documentId) ?? []),
relativePath
]);
});
@ -360,7 +361,7 @@ export class Database {
if (duplicates.length > 0) {
throw new Error(
"Document IDs are not unique, found duplicates: " +
duplicates.join("; ")
duplicates.join("; ")
);
}
}

View file

@ -14,15 +14,14 @@ export class ServerConfig {
private response: Promise<PingResponse> | undefined;
private config: ServerConfigData | undefined;
public constructor(private readonly syncService: SyncService) {}
public constructor(private readonly syncService: SyncService) { }
private static validateConfig(config: ServerConfigData): void {
if (config.supportedApiVersion !== SUPPORTED_API_VERSION) {
const shouldUpgradeClient =
config.supportedApiVersion > SUPPORTED_API_VERSION;
throw new ServerVersionMismatchError(
`Unsupported API version: ${config.supportedApiVersion}. Consider upgrading the ${
shouldUpgradeClient ? "client" : "sync-server"
`Unsupported API version: ${config.supportedApiVersion}. Consider upgrading the ${shouldUpgradeClient ? "client" : "sync-server"
} to ensure compatibility`
);
}
@ -34,11 +33,6 @@ export class ServerConfig {
}
}
// warm the cache
public async initialize(): Promise<void> {
await this.getConfig();
}
public async checkConnection(forceUpdate = false): Promise<{
isSuccessful: boolean;
message: string;

View file

@ -66,27 +66,29 @@ export class SyncService {
}
public async create({
documentId,
relativePath,
contentBytes
contentBytes,
forceMerge
}: {
documentId?: DocumentId;
relativePath: RelativePath;
contentBytes: Uint8Array;
forceMerge?: boolean;
}): Promise<DocumentVersionWithoutContent> {
return this.retryForever(async () => {
const formData = new FormData();
if (documentId !== undefined) {
formData.append("document_id", documentId);
}
formData.append("relative_path", relativePath);
if (forceMerge === true) {
formData.append("force_merge", "true");
}
formData.append(
"content",
new Blob([new Uint8Array(contentBytes)])
);
this.logger.debug(
`Creating document with id ${documentId} and relative path ${relativePath}`
`Creating document with relative path ${relativePath} (forceMerge: ${forceMerge})`
);
const response = await this.client(this.getUrl("/documents"), {
@ -155,8 +157,7 @@ export class SyncService {
(await response.json()) as DocumentUpdateResponse; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
this.logger.debug(
`Updated document ${JSON.stringify(result)} with id ${
result.documentId
`Updated document ${JSON.stringify(result)} with id ${result.documentId
}}`
);
@ -208,8 +209,7 @@ export class SyncService {
(await response.json()) as DocumentUpdateResponse; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
this.logger.debug(
`Updated document ${JSON.stringify(result)} with id ${
result.documentId
`Updated document ${JSON.stringify(result)} with id ${result.documentId
}}`
);
@ -336,7 +336,7 @@ export class SyncService {
return this.retryForever(async () => {
this.logger.debug(
"Getting all documents" +
(since != null ? ` since ${since}` : "")
(since != null ? ` since ${since}` : "")
);
const url = new URL(this.getUrl("/documents"));

View file

@ -6,7 +6,7 @@ import type { CursorPositionFromClient } from "./types/CursorPositionFromClient"
import type { ClientCursors } from "./types/ClientCursors";
import { createPromise } from "../utils/create-promise";
import type { WebSocketVaultUpdate } from "./types/WebSocketVaultUpdate";
import { WEBSOCKET_DISCONNECT_TIMEOUT_IN_S } from "../consts";
import { WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS } from "../consts";
import { removeFromArray } from "../utils/remove-from-array";
import { EventListeners } from "../utils/data-structures/event-listeners";
import { awaitAll } from "../utils/await-all";
@ -36,7 +36,7 @@ export class WebSocketManager {
private readonly logger: Logger,
private readonly settings: Settings,
private readonly webSocketFactoryImplementation: typeof globalThis.WebSocket = WebSocket
) {}
) { }
public get isWebSocketConnected(): boolean {
return (
@ -69,10 +69,10 @@ export class WebSocketManager {
timeoutId = setTimeout(() => {
reject(
new Error(
`Timeout waiting for WebSocket to close after ${WEBSOCKET_DISCONNECT_TIMEOUT_IN_S} seconds`
`Timeout waiting for WebSocket to close after ${WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS} seconds`
)
);
}, WEBSOCKET_DISCONNECT_TIMEOUT_IN_S * 1000);
}, WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS * 1000);
});
try {

View file

@ -56,7 +56,7 @@ export class SyncClient {
database: Partial<StoredDatabase>;
}>
>
) {}
) { }
public get documentCount(): number {
return this.database.length;
@ -472,7 +472,8 @@ export class SyncClient {
this.checkIfDestroyed("startSyncing");
this.fetchController.finishReset();
await this.serverConfig.initialize();
// warm the cache
await this.serverConfig.getConfig();
this.webSocketManager.start();
if (!this.hasStartedOfflineSync) {

View file

@ -113,7 +113,7 @@ export class CursorTracker {
documentsWithCursors.push({
relative_path: relativePath,
document_id: record.documentId,
document_id: record.metadata.documentId,
vault_update_id: record.metadata.parentVersionId,
cursors: cursors.map(({ start, end }) => ({
start: Math.min(start, end),

View file

@ -8,13 +8,12 @@ import type { SyncService } from "../services/sync-service";
import type { Logger } from "../tracing/logger";
import PQueue from "p-queue";
import { hash } from "../utils/hash";
import { v4 as uuidv4 } from "uuid";
import type { Settings } from "../persistence/settings";
import type { FileOperations } from "../file-operations/file-operations";
import { findMatchingFile } from "../utils/find-matching-file";
import type { UnrestrictedSyncer } from "./unrestricted-syncer";
import { createPromise } from "../utils/create-promise";
import { SyncResetError } from "../services/sync-reset-error";
import { SyncResetError } from "../errors/sync-reset-error";
import { Locks } from "../utils/data-structures/locks";
import type { DocumentVersionWithoutContent } from "../services/types/DocumentVersionWithoutContent";
import type { WebSocketVaultUpdate } from "../services/types/WebSocketVaultUpdate";
@ -98,9 +97,7 @@ export class Syncer {
const [promise, resolve, reject] = createPromise();
const id = uuidv4();
const document = this.database.createNewPendingDocument(
id,
relativePath,
promise
);
@ -171,7 +168,7 @@ export class Syncer {
// in that case, we mustn't move it again.
if (
this.database.getLatestDocumentByRelativePath(relativePath) ===
undefined ||
undefined ||
this.database.getLatestDocumentByRelativePath(relativePath)
?.isDeleted === true
) {
@ -391,8 +388,6 @@ export class Syncer {
}
private async internalScheduleSyncForOfflineChanges(): Promise<void> {
await this.createFakeDocumentsFromRemoteState();
const allLocalFiles = await this.operations.listFilesRecursively();
this.logger.info(
`Scheduling sync for ${allLocalFiles.length} local files`
@ -426,9 +421,19 @@ export class Syncer {
// Perhaps the file has been moved; let's check by looking at the deleted files
const contentHash = await this.syncQueue.add(async () => {
const contentBytes =
await this.operations.read(relativePath); // this can throw FileNotFoundError
return hash(contentBytes);
try {
const contentBytes =
await this.operations.read(relativePath); // this can throw FileNotFoundError
return hash(contentBytes);
} catch (e) {
if (
e instanceof Error &&
e.name === "FileNotFoundError"
) {
return undefined;
}
throw e;
}
});
if (contentHash == undefined) {
@ -481,42 +486,9 @@ export class Syncer {
return this.syncLocallyDeletedFile(relativePath);
})
);
}
/**
* Create fake documents in the database for all files that are present locally
* and also exist remotely. This will stop the subequent syncs from duplicating
* the documents by creating the same documents from multiple clients.
*/
private async createFakeDocumentsFromRemoteState(): Promise<void> {
if (this.database.getHasInitialSyncCompleted()) {
return;
}
const [allLocalFiles, remote] = await awaitAll([
this.operations.listFilesRecursively(),
this.syncQueue.add(async () => this.syncService.getAll())
]);
if (remote !== undefined) {
remote.latestDocuments
.filter(
(remoteDocument) =>
allLocalFiles.includes(remoteDocument.relativePath) &&
!remoteDocument.isDeleted &&
this.database.getDocumentByDocumentId(
remoteDocument.documentId
) === undefined
)
.forEach((remoteDocument) => {
this.database.createNewEmptyDocument(
remoteDocument.documentId,
remoteDocument.vaultUpdateId,
remoteDocument.relativePath
);
});
}
this.database.setHasInitialSyncCompleted(true);
}
}

View file

@ -82,9 +82,9 @@ export class UnrestrictedSyncer {
const contentHash = hash(contentBytes);
const response = await this.syncService.create({
documentId: document.documentId,
relativePath: originalRelativePath,
contentBytes
contentBytes,
forceMerge: !this.database.getHasInitialSyncCompleted() // don't duplicate files on first sync
});
// In case a document with the same name (but different ID) had existed remotely that we haven't known about
@ -100,6 +100,7 @@ export class UnrestrictedSyncer {
this.database.updateDocumentMetadata(
{
documentId: response.documentId,
parentVersionId: response.vaultUpdateId,
hash: contentHash,
remoteRelativePath: response.relativePath
@ -131,13 +132,21 @@ export class UnrestrictedSyncer {
};
await this.executeSync(updateDetails, async () => {
if (document.metadata === undefined) {
this.logger.debug(
`Document ${document.relativePath} has no metadata, so it was never synced remotely`
);
return;
}
const response = await this.syncService.delete({
documentId: document.documentId,
documentId: document.metadata.documentId,
relativePath: document.relativePath
});
this.database.updateDocumentMetadata(
{
...document.metadata,
parentVersionId: response.vaultUpdateId,
hash: EMPTY_HASH,
remoteRelativePath: document.relativePath
@ -170,14 +179,14 @@ export class UnrestrictedSyncer {
const updateDetails: SyncUpdateDetails | SyncMovedDetails =
oldPath !== undefined
? {
type: SyncType.MOVE,
relativePath: document.relativePath,
movedFrom: oldPath
}
type: SyncType.MOVE,
relativePath: document.relativePath,
movedFrom: oldPath
}
: {
type: SyncType.UPDATE,
relativePath: document.relativePath
};
type: SyncType.UPDATE,
relativePath: document.relativePath
};
await this.executeSync(updateDetails, async () => {
const originalRelativePath = document.relativePath;
@ -216,22 +225,22 @@ export class UnrestrictedSyncer {
response =
isText && cachedVersion !== undefined
? await this.syncService.putText({
documentId: document.documentId,
parentVersionId:
document.metadata.parentVersionId,
relativePath: document.relativePath,
content: diff(
new TextDecoder().decode(cachedVersion),
new TextDecoder().decode(contentBytes)
)
})
documentId: document.metadata.documentId,
parentVersionId:
document.metadata.parentVersionId,
relativePath: document.relativePath,
content: diff(
new TextDecoder().decode(cachedVersion),
new TextDecoder().decode(contentBytes)
)
})
: await this.syncService.putBinary({
documentId: document.documentId,
parentVersionId:
document.metadata.parentVersionId,
relativePath: document.relativePath,
contentBytes
});
documentId: document.metadata.documentId,
parentVersionId:
document.metadata.parentVersionId,
relativePath: document.relativePath,
contentBytes
});
} else {
if (!force) {
this.logger.debug(
@ -241,7 +250,7 @@ export class UnrestrictedSyncer {
}
response = await this.syncService.get({
documentId: document.documentId
documentId: document.metadata.documentId
});
}
@ -290,6 +299,7 @@ export class UnrestrictedSyncer {
this.database.updateDocumentMetadata(
{
...document.metadata,
parentVersionId: response.vaultUpdateId,
hash: contentHash,
remoteRelativePath: response.relativePath
@ -317,6 +327,7 @@ export class UnrestrictedSyncer {
} else {
this.database.updateDocumentMetadata(
{
...document.metadata,
parentVersionId: response.vaultUpdateId,
hash: contentHash,
remoteRelativePath: response.relativePath
@ -334,16 +345,16 @@ export class UnrestrictedSyncer {
const actualUpdateDetails: SyncUpdateDetails | SyncMovedDetails =
oldPath !== undefined ||
response.relativePath != originalRelativePath
response.relativePath != originalRelativePath
? {
type: SyncType.MOVE,
relativePath: response.relativePath,
movedFrom: originalRelativePath
}
type: SyncType.MOVE,
relativePath: response.relativePath,
movedFrom: originalRelativePath
}
: {
type: SyncType.UPDATE,
relativePath: response.relativePath
};
type: SyncType.UPDATE,
relativePath: response.relativePath
};
if (areThereLocalChanges) {
this.history.addHistoryEntry({
@ -437,12 +448,12 @@ export class UnrestrictedSyncer {
const [promise, resolve] = createPromise();
this.database.updateDocumentMetadata(
{
documentId: remoteVersion.documentId,
parentVersionId: remoteVersion.vaultUpdateId,
hash: hash(contentBytes),
remoteRelativePath: remoteVersion.relativePath
},
this.database.createNewPendingDocument(
remoteVersion.documentId,
remoteVersion.relativePath,
promise
)
@ -541,9 +552,8 @@ export class UnrestrictedSyncer {
type: SyncType.SKIPPED,
relativePath
},
message: `File size of ${sizeInMB} MB exceeds the maximum file size limit of ${
maxFileSizeMB
} MB`
message: `File size of ${sizeInMB} MB exceeds the maximum file size limit of ${maxFileSizeMB
} MB`
};
}
}
@ -582,6 +592,7 @@ export class UnrestrictedSyncer {
this.database.delete(document.relativePath);
this.database.updateDocumentMetadata(
{
documentId: response.documentId,
parentVersionId: response.vaultUpdateId,
hash: EMPTY_HASH,
remoteRelativePath: response.relativePath

View file

@ -9,7 +9,7 @@ type ResolvedTuple<T extends readonly unknown[]> = {
export const awaitAll = async <T extends readonly unknown[]>(
promises: PromiseTuple<T>
): Promise<ResolvedTuple<T>> => {
// eslint-disable-next-line no-restricted-properties
// eslint-disable-next-line no-restricted-properties, @typescript-eslint/await-thenable
const result = await Promise.allSettled(promises);
for (const res of result) {
if (res.status === "rejected") {