Improve network usage for small text changes (#166)

This commit is contained in:
Andras Schmelczer 2025-11-16 22:10:22 +00:00 committed by GitHub
parent 1da17c462e
commit be1635c26e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
20 changed files with 697 additions and 62 deletions

View file

@ -33,12 +33,7 @@ export default [
"@typescript-eslint/class-methods-use-this": "off", "@typescript-eslint/class-methods-use-this": "off",
"@typescript-eslint/consistent-return": "off", "@typescript-eslint/consistent-return": "off",
"@typescript-eslint/no-unsafe-argument": "off", "@typescript-eslint/no-unsafe-argument": "off",
"@typescript-eslint/max-params": [ "@typescript-eslint/max-params": "off",
"error",
{
max: 6
}
],
"@typescript-eslint/no-magic-numbers": "off", "@typescript-eslint/no-magic-numbers": "off",
"@typescript-eslint/prefer-readonly-parameter-types": "off", "@typescript-eslint/prefer-readonly-parameter-types": "off",
"@typescript-eslint/naming-convention": "off", "@typescript-eslint/naming-convention": "off",

View file

@ -1583,7 +1583,9 @@
} }
}, },
"node_modules/brace-expansion": { "node_modules/brace-expansion": {
"version": "1.1.11", "version": "1.1.12",
"resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-1.1.12.tgz",
"integrity": "sha512-9T9UjW3r0UW5c1Q7GTwllptXwhvYmEzFhzMfZ9H7FQWt+uZePjZPjBP/W1ZEyZ1twGWom5/56TF4lPcqjnDHcg==",
"dev": true, "dev": true,
"license": "MIT", "license": "MIT",
"dependencies": { "dependencies": {
@ -2742,7 +2744,9 @@
} }
}, },
"node_modules/js-yaml": { "node_modules/js-yaml": {
"version": "4.1.0", "version": "4.1.1",
"resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-4.1.1.tgz",
"integrity": "sha512-qQKT4zQxXl8lLwBtHMWwaTcGfFOZviOJet3Oy/xmGk2gZH677CJM9EvtfdSkgWcATZhj/55JZ0rmy3myCT5lsA==",
"dev": true, "dev": true,
"license": "MIT", "license": "MIT",
"dependencies": { "dependencies": {
@ -3487,6 +3491,7 @@
"version": "0.5.0", "version": "0.5.0",
"resolved": "https://registry.npmjs.org/reconcile-text/-/reconcile-text-0.5.0.tgz", "resolved": "https://registry.npmjs.org/reconcile-text/-/reconcile-text-0.5.0.tgz",
"integrity": "sha512-zki3lqw9Oxdhm9ZvDN17VyYoL1Isc8BEL07ILVDE2yGfNEI7thrkczoNCUr+hkFU2rzZtfxECTG0b7p61AJ6wg==", "integrity": "sha512-zki3lqw9Oxdhm9ZvDN17VyYoL1Isc8BEL07ILVDE2yGfNEI7thrkczoNCUr+hkFU2rzZtfxECTG0b7p61AJ6wg==",
"dev": true,
"license": "MIT" "license": "MIT"
}, },
"node_modules/regex-parser": { "node_modules/regex-parser": {
@ -4687,7 +4692,7 @@
"byte-base64": "^1.1.0", "byte-base64": "^1.1.0",
"minimatch": "^10.0.1", "minimatch": "^10.0.1",
"p-queue": "^8.1.0", "p-queue": "^8.1.0",
"reconcile-text": "^0.5.0", "reconcile-text": "^0.7.1",
"uuid": "^13.0.0" "uuid": "^13.0.0"
}, },
"devDependencies": { "devDependencies": {
@ -4703,7 +4708,9 @@
} }
}, },
"sync-client/node_modules/brace-expansion": { "sync-client/node_modules/brace-expansion": {
"version": "2.0.1", "version": "2.0.2",
"resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.2.tgz",
"integrity": "sha512-Jt0vHyM+jmUBqojB7E1NIYadt0vI0Qxjxd2TErW94wDz+E2LAm5vKMXXwg6ZZBTHPuUlDgQHKXvjGBdfcF1ZDQ==",
"license": "MIT", "license": "MIT",
"dependencies": { "dependencies": {
"balanced-match": "^1.0.0" "balanced-match": "^1.0.0"
@ -4722,6 +4729,12 @@
"url": "https://github.com/sponsors/isaacs" "url": "https://github.com/sponsors/isaacs"
} }
}, },
"sync-client/node_modules/reconcile-text": {
"version": "0.7.1",
"resolved": "https://registry.npmjs.org/reconcile-text/-/reconcile-text-0.7.1.tgz",
"integrity": "sha512-khedcYvAKs7ELKh5Z8mz2vyomMY5TqznV1dB+k/7qUAX9cheMNN5/EPJVQYZepOMunYbnQitvhFJX3kD4IMcNw==",
"license": "MIT"
},
"test-client": { "test-client": {
"version": "0.9.2", "version": "0.9.2",
"bin": { "bin": {

View file

@ -16,7 +16,7 @@
"byte-base64": "^1.1.0", "byte-base64": "^1.1.0",
"minimatch": "^10.0.1", "minimatch": "^10.0.1",
"p-queue": "^8.1.0", "p-queue": "^8.1.0",
"reconcile-text": "^0.5.0", "reconcile-text": "^0.7.1",
"uuid": "^13.0.0" "uuid": "^13.0.0"
}, },
"devDependencies": { "devDependencies": {

View file

@ -3,8 +3,9 @@ import type { FileSystemOperations } from "./filesystem-operations";
import type { Database, RelativePath } from "../persistence/database"; import type { Database, RelativePath } from "../persistence/database";
import { SafeFileSystemOperations } from "./safe-filesystem-operations"; import { SafeFileSystemOperations } from "./safe-filesystem-operations";
import type { TextWithCursors } from "reconcile-text"; import type { TextWithCursors } from "reconcile-text";
import { isBinary, reconcile } from "reconcile-text"; import { reconcile } from "reconcile-text";
import { isFileTypeMergable } from "../utils/is-file-type-mergable"; import { isFileTypeMergable } from "../utils/is-file-type-mergable";
import { isBinary } from "../utils/is-binary";
export class FileOperations { export class FileOperations {
private static readonly PARENTHESES_REGEX = / \((\d+)\)$/; private static readonly PARENTHESES_REGEX = / \((\d+)\)$/;

View file

@ -16,6 +16,7 @@ import type { DocumentVersion } from "./types/DocumentVersion";
import type { FetchLatestDocumentsResponse } from "./types/FetchLatestDocumentsResponse"; import type { FetchLatestDocumentsResponse } from "./types/FetchLatestDocumentsResponse";
import type { PingResponse } from "./types/PingResponse"; import type { PingResponse } from "./types/PingResponse";
import type { DeleteDocumentVersion } from "./types/DeleteDocumentVersion"; import type { DeleteDocumentVersion } from "./types/DeleteDocumentVersion";
import type { UpdateTextDocumentVersion } from "./types/UpdateTextDocumentVersion";
export interface CheckConnectionResult { export interface CheckConnectionResult {
isSuccessful: boolean; isSuccessful: boolean;
@ -102,7 +103,59 @@ export class SyncService {
}); });
} }
public async put({ public async putText({
parentVersionId,
documentId,
relativePath,
content
}: {
parentVersionId: VaultUpdateId;
documentId: DocumentId;
relativePath: RelativePath;
content: (number | string)[];
}): Promise<DocumentUpdateResponse> {
return this.withRetries(async () => {
this.logger.debug(
`Updating text document ${documentId} with parent version ${parentVersionId} and relative path ${relativePath}`
);
const request: UpdateTextDocumentVersion = {
parentVersionId,
relativePath,
content
};
const response = await this.client(
this.getUrl(`/documents/${documentId}/text`),
{
method: "PUT",
body: JSON.stringify(request),
headers: this.getDefaultHeaders({ type: "json" })
}
);
const result: SerializedError | DocumentUpdateResponse =
(await response.json()) as // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
| SerializedError
| DocumentUpdateResponse;
if ("errorType" in result) {
throw new Error(
`Failed to update document: ${SyncService.formatError(result)}`
);
}
this.logger.debug(
`Updated document ${JSON.stringify(result)} with id ${
result.documentId
}}`
);
return result;
});
}
public async putBinary({
parentVersionId, parentVersionId,
documentId, documentId,
relativePath, relativePath,
@ -115,7 +168,7 @@ export class SyncService {
}): Promise<DocumentUpdateResponse> { }): Promise<DocumentUpdateResponse> {
return this.withRetries(async () => { return this.withRetries(async () => {
this.logger.debug( this.logger.debug(
`Updating document ${documentId} with parent version ${parentVersionId} and relative path ${relativePath}` `Updating binary document ${documentId} with parent version ${parentVersionId} and relative path ${relativePath}`
); );
const formData = new FormData(); const formData = new FormData();
formData.append("parent_version_id", parentVersionId.toString()); formData.append("parent_version_id", parentVersionId.toString());
@ -126,7 +179,7 @@ export class SyncService {
); );
const response = await this.client( const response = await this.client(
this.getUrl(`/documents/${documentId}`), this.getUrl(`/documents/${documentId}/binary`),
{ {
method: "PUT", method: "PUT",
body: formData, body: formData,
@ -171,10 +224,7 @@ export class SyncService {
{ {
method: "DELETE", method: "DELETE",
body: JSON.stringify(request), body: JSON.stringify(request),
headers: { headers: this.getDefaultHeaders({ type: "json" })
"Content-Type": "application/json",
...this.getDefaultHeaders()
}
} }
); );
@ -297,11 +347,19 @@ export class SyncService {
return `${safeRemoteUri}/vaults/${vaultName}${path}`; return `${safeRemoteUri}/vaults/${vaultName}${path}`;
} }
private getDefaultHeaders(): Record<string, string> { private getDefaultHeaders(
return { { type }: { type?: "json" } = { type: undefined }
): Record<string, string> {
const headers: Record<string, string> = {
"device-id": this.deviceId, "device-id": this.deviceId,
authorization: `Bearer ${this.settings.getSettings().token}` authorization: `Bearer ${this.settings.getSettings().token}`
}; };
if (type === "json") {
headers["Content-Type"] = "application/json";
}
return headers;
} }
private async withRetries<T>(fn: () => Promise<T>): Promise<T> { private async withRetries<T>(fn: () => Promise<T>): Promise<T> {

View file

@ -0,0 +1,7 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface UpdateTextDocumentVersion {
parentVersionId: number;
relativePath: string;
content: (number | string)[];
}

View file

@ -21,13 +21,13 @@ import { CursorTracker } from "./sync-operations/cursor-tracker";
import type { CursorSpan } from "./services/types/CursorSpan"; import type { CursorSpan } from "./services/types/CursorSpan";
import type { MaybeOutdatedClientCursors } from "./types/maybe-outdated-client-cursors"; import type { MaybeOutdatedClientCursors } from "./types/maybe-outdated-client-cursors";
import { FileChangeNotifier } from "./sync-operations/file-change-notifier"; import { FileChangeNotifier } from "./sync-operations/file-change-notifier";
import { FixedSizeDocumentCache } from "./utils/fix-sized-cache";
export class SyncClient { export class SyncClient {
private static readonly MINIMUM_SAVE_INTERVAL_MS = 1000; private static readonly MINIMUM_SAVE_INTERVAL_MS = 1000;
private hasStartedOfflineSync = false; private hasStartedOfflineSync = false;
private hasFinishedOfflineSync = false; private hasFinishedOfflineSync = false;
// eslint-disable-next-line @typescript-eslint/max-params
private constructor( private constructor(
private readonly history: SyncHistory, private readonly history: SyncHistory,
private readonly settings: Settings, private readonly settings: Settings,
@ -135,13 +135,15 @@ export class SyncClient {
nativeLineEndings nativeLineEndings
); );
const contentCache = new FixedSizeDocumentCache(1024 * 1024 * 2); // 2 MB cache
const unrestrictedSyncer = new UnrestrictedSyncer( const unrestrictedSyncer = new UnrestrictedSyncer(
logger, logger,
database, database,
settings, settings,
syncService, syncService,
fileOperations, fileOperations,
history history,
contentCache
); );
const syncer = new Syncer( const syncer = new Syncer(
@ -150,7 +152,8 @@ export class SyncClient {
settings, settings,
syncService, syncService,
fileOperations, fileOperations,
unrestrictedSyncer unrestrictedSyncer,
contentCache
); );
const webSocketManager = new WebSocketManager( const webSocketManager = new WebSocketManager(

View file

@ -17,6 +17,7 @@ import { createPromise } from "../utils/create-promise";
import { SyncResetError } from "../services/sync-reset-error"; import { SyncResetError } from "../services/sync-reset-error";
import { Locks } from "../utils/locks"; import { Locks } from "../utils/locks";
import type { DocumentVersionWithoutContent } from "../services/types/DocumentVersionWithoutContent"; import type { DocumentVersionWithoutContent } from "../services/types/DocumentVersionWithoutContent";
import type { FixedSizeDocumentCache } from "../utils/fix-sized-cache";
export class Syncer { export class Syncer {
private readonly remoteDocumentsLock: Locks<DocumentId>; private readonly remoteDocumentsLock: Locks<DocumentId>;
@ -33,7 +34,8 @@ export class Syncer {
settings: Settings, settings: Settings,
private readonly syncService: SyncService, private readonly syncService: SyncService,
private readonly operations: FileOperations, private readonly operations: FileOperations,
private readonly internalSyncer: UnrestrictedSyncer private readonly internalSyncer: UnrestrictedSyncer,
private readonly contentCache: FixedSizeDocumentCache
) { ) {
this.syncQueue = new PQueue({ this.syncQueue = new PQueue({
concurrency: settings.getSettings().syncConcurrency concurrency: settings.getSettings().syncConcurrency
@ -250,6 +252,7 @@ export class Syncer {
public async reset(): Promise<void> { public async reset(): Promise<void> {
await this.waitUntilFinished(); await this.waitUntilFinished();
this.contentCache.clear();
} }
public async syncRemotelyUpdatedFile( public async syncRemotelyUpdatedFile(

View file

@ -4,6 +4,7 @@ import type {
RelativePath RelativePath
} from "../persistence/database"; } from "../persistence/database";
import { diff } from "reconcile-text";
import type { SyncService } from "../services/sync-service"; import type { SyncService } from "../services/sync-service";
import type { Logger } from "../tracing/logger"; import type { Logger } from "../tracing/logger";
import type { import type {
@ -27,6 +28,9 @@ import { globsToRegexes } from "../utils/globs-to-regexes";
import type { DocumentVersion } from "../services/types/DocumentVersion"; import type { DocumentVersion } from "../services/types/DocumentVersion";
import type { DocumentUpdateResponse } from "../services/types/DocumentUpdateResponse"; import type { DocumentUpdateResponse } from "../services/types/DocumentUpdateResponse";
import type { DocumentVersionWithoutContent } from "../services/types/DocumentVersionWithoutContent"; import type { DocumentVersionWithoutContent } from "../services/types/DocumentVersionWithoutContent";
import type { FixedSizeDocumentCache } from "../utils/fix-sized-cache";
import { isFileTypeMergable } from "../utils/is-file-type-mergable";
import { isBinary } from "../utils/is-binary";
export class UnrestrictedSyncer { export class UnrestrictedSyncer {
private ignorePatterns: RegExp[]; private ignorePatterns: RegExp[];
@ -37,7 +41,8 @@ export class UnrestrictedSyncer {
private readonly settings: Settings, private readonly settings: Settings,
private readonly syncService: SyncService, private readonly syncService: SyncService,
private readonly operations: FileOperations, private readonly operations: FileOperations,
private readonly history: SyncHistory private readonly history: SyncHistory,
private readonly contentCache: FixedSizeDocumentCache
) { ) {
this.ignorePatterns = globsToRegexes( this.ignorePatterns = globsToRegexes(
this.settings.getSettings().ignorePatterns, this.settings.getSettings().ignorePatterns,
@ -87,8 +92,12 @@ export class UnrestrictedSyncer {
}, },
document document
); );
this.database.addSeenUpdateId(response.vaultUpdateId); this.database.addSeenUpdateId(response.vaultUpdateId);
this.updateCache(
response.vaultUpdateId,
contentBytes,
response.relativePath
);
this.history.addHistoryEntry({ this.history.addHistoryEntry({
status: SyncStatus.SUCCESS, status: SyncStatus.SUCCESS,
@ -178,12 +187,32 @@ export class UnrestrictedSyncer {
undefined; undefined;
if (areThereLocalChanges) { if (areThereLocalChanges) {
response = await this.syncService.put({ const isText =
documentId: document.documentId, !isBinary(contentBytes) &&
parentVersionId: document.metadata.parentVersionId, isFileTypeMergable(document.relativePath);
relativePath: document.relativePath, const cachedVersion = this.contentCache.get(
contentBytes document.metadata.parentVersionId
}); );
response =
isText && cachedVersion !== undefined
? await this.syncService.putText({
documentId: document.documentId,
parentVersionId:
document.metadata.parentVersionId,
relativePath: document.relativePath,
content: diff(
new TextDecoder().decode(cachedVersion),
new TextDecoder().decode(contentBytes)
)
})
: await this.syncService.putBinary({
documentId: document.documentId,
parentVersionId:
document.metadata.parentVersionId,
relativePath: document.relativePath,
contentBytes
});
} else { } else {
if (!force) { if (!force) {
this.logger.debug( this.logger.debug(
@ -274,12 +303,16 @@ export class UnrestrictedSyncer {
}, },
document document
); );
await this.operations.write( await this.operations.write(
actualPath, actualPath,
contentBytes, contentBytes,
responseBytes responseBytes
); );
this.updateCache(
response.vaultUpdateId,
responseBytes,
actualPath
);
if (!force) { if (!force) {
this.history.addHistoryEntry({ this.history.addHistoryEntry({
@ -297,6 +330,11 @@ export class UnrestrictedSyncer {
}, },
document document
); );
this.updateCache(
response.vaultUpdateId,
contentBytes,
actualPath
);
} }
this.database.addSeenUpdateId(response.vaultUpdateId); this.database.addSeenUpdateId(response.vaultUpdateId);
@ -423,6 +461,11 @@ export class UnrestrictedSyncer {
remoteVersion.relativePath, remoteVersion.relativePath,
contentBytes contentBytes
); );
this.updateCache(
remoteVersion.vaultUpdateId,
contentBytes,
remoteVersion.relativePath
);
resolve(); resolve();
this.database.removeDocumentPromise(promise); this.database.removeDocumentPromise(promise);
@ -513,4 +556,14 @@ export class UnrestrictedSyncer {
}; };
} }
} }
private updateCache(
updateId: number,
contentBytes: Uint8Array,
filePath: RelativePath
): void {
if (isFileTypeMergable(filePath) && !isBinary(contentBytes)) {
this.contentCache.put(updateId, contentBytes);
}
}
} }

View file

@ -0,0 +1,239 @@
import { describe, it } from "node:test";
import assert from "node:assert";
import { FixedSizeDocumentCache } from "./fix-sized-cache";
describe("fixedSizeDocumentCache", () => {
it("happyPath", async () => {
const cache = new FixedSizeDocumentCache(4);
const doc1 = new Uint8Array([1, 2]);
const doc2 = new Uint8Array([3, 4]);
const doc3 = new Uint8Array([5, 6]);
cache.put(1, doc1);
assert.equal(cache.get(1), doc1);
cache.put(2, doc2);
assert.equal(cache.get(1), doc1);
assert.equal(cache.get(2), doc2);
cache.put(3, doc3);
assert.equal(cache.get(1), undefined);
assert.equal(cache.get(2), doc2);
assert.equal(cache.get(3), doc3);
});
it("updateExistingEntry", async () => {
const cache = new FixedSizeDocumentCache(4);
const doc1_v1 = new Uint8Array([1, 2]);
const doc1_v2 = new Uint8Array([3, 4]);
const doc2 = new Uint8Array([5, 6]);
cache.put(1, doc1_v1);
assert.equal(cache.get(1), doc1_v1);
cache.put(2, doc2);
assert.equal(cache.get(1), doc1_v1);
assert.equal(cache.get(2), doc2);
cache.put(1, doc1_v2); // Update doc1
assert.equal(cache.get(1), doc1_v2);
assert.equal(cache.get(2), doc2);
});
it("evictOldestEntry", async () => {
const cache = new FixedSizeDocumentCache(4);
const doc1 = new Uint8Array([1, 2]);
const doc2 = new Uint8Array([3, 4]);
const doc3 = new Uint8Array([5, 6]);
cache.put(1, doc1);
cache.put(2, doc2);
assert.equal(cache.get(2), doc2);
assert.equal(cache.get(1), doc1);
cache.put(3, doc3);
assert.equal(cache.get(1), doc1);
assert.equal(cache.get(2), undefined);
assert.equal(cache.get(3), doc3);
});
it("tooLargeEntry", async () => {
const cache = new FixedSizeDocumentCache(2);
const doc1 = new Uint8Array([1, 2, 3]);
cache.put(1, doc1);
assert.equal(cache.get(1), undefined);
});
it("multipleEvictionsInSinglePut", async () => {
const cache = new FixedSizeDocumentCache(10);
const doc1 = new Uint8Array([1, 2]);
const doc2 = new Uint8Array([3, 4]);
const doc3 = new Uint8Array([5, 6]);
const doc4 = new Uint8Array([7, 8, 9, 10, 11, 12, 13, 14]); // 8 bytes
cache.put(1, doc1);
cache.put(2, doc2);
cache.put(3, doc3);
// Cache now has 6 bytes total
cache.put(4, doc4); // Should evict doc1 and doc2 to make room (total: 2+8=10)
assert.equal(cache.get(1), undefined); // Evicted
assert.equal(cache.get(2), undefined); // Evicted
assert.equal(cache.get(3), doc3); // Still present
assert.equal(cache.get(4), doc4);
});
it("clearCache", async () => {
const cache = new FixedSizeDocumentCache(10);
const doc1 = new Uint8Array([1, 2]);
const doc2 = new Uint8Array([3, 4]);
cache.put(1, doc1);
cache.put(2, doc2);
assert.equal(cache.get(1), doc1);
assert.equal(cache.get(2), doc2);
cache.clear();
assert.equal(cache.get(1), undefined);
assert.equal(cache.get(2), undefined);
// Should be able to add entries after clear
cache.put(3, doc1);
assert.equal(cache.get(3), doc1);
});
it("getNonExistentKey", async () => {
const cache = new FixedSizeDocumentCache(10);
const doc1 = new Uint8Array([1, 2]);
cache.put(1, doc1);
assert.equal(cache.get(999), undefined);
});
it("updateEntryWithDifferentSizeTriggeringEviction", async () => {
const cache = new FixedSizeDocumentCache(6);
const doc1_v1 = new Uint8Array([1, 2]);
const doc1_v2 = new Uint8Array([1, 2, 3, 4]); // Larger version
const doc2 = new Uint8Array([5, 6]);
const doc3 = new Uint8Array([7, 8]);
cache.put(1, doc1_v1);
cache.put(2, doc2);
cache.put(3, doc3);
// Update doc1 with larger version, should evict doc2
cache.put(1, doc1_v2);
assert.equal(cache.get(1), doc1_v2);
assert.equal(cache.get(2), undefined); // Evicted
assert.equal(cache.get(3), doc3);
});
it("singleItemCache", async () => {
const cache = new FixedSizeDocumentCache(2);
const doc1 = new Uint8Array([1, 2]);
const doc2 = new Uint8Array([3, 4]);
cache.put(1, doc1);
assert.equal(cache.get(1), doc1);
cache.put(2, doc2);
assert.equal(cache.get(1), undefined); // Evicted
assert.equal(cache.get(2), doc2);
});
it("multipleGetsOnSameEntry", async () => {
const cache = new FixedSizeDocumentCache(4);
const doc1 = new Uint8Array([1, 2]);
const doc2 = new Uint8Array([3, 4]);
const doc3 = new Uint8Array([5, 6]);
cache.put(1, doc1);
cache.put(2, doc2);
// Multiple gets on doc1
cache.get(1);
cache.get(1);
cache.get(1);
// Order should be: 2 (LRU), 1 (MRU)
cache.put(3, doc3);
assert.equal(cache.get(1), doc1);
assert.equal(cache.get(2), undefined); // Evicted
assert.equal(cache.get(3), doc3);
});
it("exactlySizedEntry", async () => {
const cache = new FixedSizeDocumentCache(4);
const doc1 = new Uint8Array([1, 2, 3, 4]); // Exactly cache size
cache.put(1, doc1);
assert.equal(cache.get(1), doc1);
const doc2 = new Uint8Array([5, 6]);
cache.put(2, doc2);
// doc1 should be evicted to make room for doc2
assert.equal(cache.get(1), undefined);
assert.equal(cache.get(2), doc2);
});
it("updateEntryMakesItMostRecent", async () => {
const cache = new FixedSizeDocumentCache(6);
const doc1_v1 = new Uint8Array([1, 2]);
const doc1_v2 = new Uint8Array([3, 4]);
const doc2 = new Uint8Array([5, 6]);
const doc3 = new Uint8Array([7, 8]);
const doc4 = new Uint8Array([9, 10]);
cache.put(1, doc1_v1);
cache.put(2, doc2);
cache.put(3, doc3);
// Update doc1 (should move it to most recent)
cache.put(1, doc1_v2);
// Order should be: 2 (LRU), 3, 1 (MRU)
// Adding doc4 should evict doc2
cache.put(4, doc4);
assert.equal(cache.get(1), doc1_v2);
assert.equal(cache.get(2), undefined); // Evicted
assert.equal(cache.get(3), doc3);
assert.equal(cache.get(4), doc4);
});
it("alternatingAccessPattern", async () => {
const cache = new FixedSizeDocumentCache(4);
const doc1 = new Uint8Array([1, 2]);
const doc2 = new Uint8Array([3, 4]);
const doc3 = new Uint8Array([5, 6]);
cache.put(1, doc1);
cache.put(2, doc2);
// Alternate access between doc1 and doc2
cache.get(1);
cache.get(2);
cache.get(1);
cache.get(2);
// Order should be: 1, 2 (MRU)
cache.put(3, doc3);
assert.equal(cache.get(1), undefined); // Evicted
assert.equal(cache.get(2), doc2);
assert.equal(cache.get(3), doc3);
});
it("zeroByteDocs", async () => {
const cache = new FixedSizeDocumentCache(2);
const doc1 = new Uint8Array([]);
const doc2 = new Uint8Array([]);
const doc3 = new Uint8Array([1, 2]);
cache.put(1, doc1);
cache.put(2, doc2);
cache.put(3, doc3);
assert.equal(cache.get(1), doc1);
assert.equal(cache.get(2), doc2);
assert.equal(cache.get(3), doc3);
});
});

View file

@ -0,0 +1,113 @@
// Implements an in-memory fixed-size cache for document contents,
import type { VaultUpdateId } from "../persistence/database";
// Doubly-linked list node for O(1) LRU operations
class LRUNode {
public constructor(
public key: VaultUpdateId,
public value: Uint8Array,
public prev: LRUNode | null = null,
public next: LRUNode | null = null
) {}
}
// evicting the least recently used documents when the size limit is exceeded.
export class FixedSizeDocumentCache {
private readonly maxSizeInBytes: number;
private currentSizeInBytes: number;
private readonly cache: Map<VaultUpdateId, LRUNode>;
private head: LRUNode | null; // Least recently used
private tail: LRUNode | null; // Most recently used
public constructor(maxSizeInBytes: number) {
this.maxSizeInBytes = maxSizeInBytes;
this.currentSizeInBytes = 0;
this.cache = new Map();
this.head = null;
this.tail = null;
}
public get(updateId: VaultUpdateId): Uint8Array | undefined {
const node = this.cache.get(updateId);
if (node) {
this.moveToTail(node);
return node.value;
}
return undefined;
}
public put(updateId: VaultUpdateId, content: Uint8Array): void {
if (content.byteLength > this.maxSizeInBytes) {
// Document is too large to fit in the cache
return;
}
// If the document is already in the cache, update it
const existingNode = this.cache.get(updateId);
if (existingNode != null) {
this.currentSizeInBytes -= existingNode.value.byteLength;
this.removeNode(existingNode);
this.cache.delete(updateId);
}
const newNode = new LRUNode(updateId, content);
this.cache.set(updateId, newNode);
this.addToTail(newNode);
this.currentSizeInBytes += content.byteLength;
// Evict least recently used documents if over size limit
while (this.currentSizeInBytes > this.maxSizeInBytes && this.head) {
const lruNode = this.head;
this.removeNode(lruNode);
this.cache.delete(lruNode.key);
this.currentSizeInBytes -= lruNode.value.byteLength;
}
}
public clear(): void {
this.cache.clear();
this.head = null;
this.tail = null;
this.currentSizeInBytes = 0;
}
private removeNode(node: LRUNode): void {
if (node.prev) {
node.prev.next = node.next;
} else {
this.head = node.next;
}
if (node.next) {
node.next.prev = node.prev;
} else {
this.tail = node.prev;
}
node.prev = null;
node.next = null;
}
private addToTail(node: LRUNode): void {
node.prev = this.tail;
node.next = null;
if (this.tail) {
this.tail.next = node;
}
this.tail = node;
this.head ??= node;
}
private moveToTail(node: LRUNode): void {
if (node === this.tail) {
return;
}
this.removeNode(node);
this.addToTail(node);
}
}

View file

@ -0,0 +1,16 @@
// Text is unlikely to contain null bytes, so we can use that to distinguish binary files.
export function isBinary(content: Uint8Array): boolean {
for (const byte of content) {
if (byte === 0) {
return true;
}
}
try {
new TextDecoder("utf-8", { fatal: true }).decode(content);
} catch {
return true;
}
return false;
}

View file

@ -1680,9 +1680,12 @@ dependencies = [
[[package]] [[package]]
name = "reconcile-text" name = "reconcile-text"
version = "0.5.0" version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8d690c19b0bf6574cd3591d10f20df5aa52d2af95b8dcaacbc86893292ac8c5" checksum = "913440a3c2b90cd3ed3e967660f2bb624b71e8059b9fc86960a5f91bd1e2e353"
dependencies = [
"serde",
]
[[package]] [[package]]
name = "redox_syscall" name = "redox_syscall"

View file

@ -35,7 +35,7 @@ bimap = "0.6.3"
ts-rs = { version = "10.1", features = ["uuid-impl", "chrono-impl"] } ts-rs = { version = "10.1", features = ["uuid-impl", "chrono-impl"] }
serde_with = "3.15.1" serde_with = "3.15.1"
base64 = "0.22.1" base64 = "0.22.1"
reconcile-text = "0.5.0" reconcile-text = { version = "0.7.1", features = ["serde"] }
[profile.release] [profile.release]
codegen-units = 1 codegen-units = 1

View file

@ -117,8 +117,12 @@ fn get_authed_routes(app_state: AppState) -> Router<AppState> {
get(fetch_latest_document_version::fetch_latest_document_version), get(fetch_latest_document_version::fetch_latest_document_version),
) )
.route( .route(
"/vaults/:vault_id/documents/:document_id", "/vaults/:vault_id/documents/:document_id/binary",
put(update_document::update_document), put(update_document::update_binary),
)
.route(
"/vaults/:vault_id/documents/:document_id/text",
put(update_document::update_text),
) )
.route( .route(
"/vaults/:vault_id/documents/:document_id/versions/:version_id", "/vaults/:vault_id/documents/:document_id/versions/:version_id",

View file

@ -1,5 +1,6 @@
use axum::body::Bytes; use axum::body::Bytes;
use axum_typed_multipart::{FieldData, TryFromMultipart}; use axum_typed_multipart::{FieldData, TryFromMultipart};
use reconcile_text::NumberOrString;
use serde::{self, Deserialize}; use serde::{self, Deserialize};
use ts_rs::TS; use ts_rs::TS;
@ -20,17 +21,28 @@ pub struct CreateDocumentVersion {
pub content: FieldData<Bytes>, pub content: FieldData<Bytes>,
} }
#[derive(TS, Debug, TryFromMultipart)] #[derive(Debug, TryFromMultipart)]
#[ts(export)] pub struct UpdateBinaryDocumentVersion {
pub struct UpdateDocumentVersion {
pub parent_version_id: VaultUpdateId, pub parent_version_id: VaultUpdateId,
pub relative_path: String, pub relative_path: String,
#[ts(as = "Vec<u8>")]
#[form_data(limit = "unlimited")] #[form_data(limit = "unlimited")]
pub content: FieldData<Bytes>, pub content: FieldData<Bytes>,
} }
#[derive(TS, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
#[ts(export)]
pub struct UpdateTextDocumentVersion {
#[ts(as = "i32")]
pub parent_version_id: VaultUpdateId,
pub relative_path: String,
#[ts(type = "Array<number | string>")]
pub content: Vec<NumberOrString>,
}
#[derive(TS, Debug, Deserialize)] #[derive(TS, Debug, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
#[ts(export)] #[ts(export)]

View file

@ -6,23 +6,25 @@ use axum::{
use axum_extra::TypedHeader; use axum_extra::TypedHeader;
use axum_typed_multipart::TypedMultipart; use axum_typed_multipart::TypedMultipart;
use log::info; use log::info;
use reconcile_text::{BuiltinTokenizer, is_binary, reconcile}; use reconcile_text::{BuiltinTokenizer, EditedText, reconcile};
use serde::Deserialize; use serde::Deserialize;
use super::{ use super::{
device_id_header::DeviceIdHeader, requests::UpdateDocumentVersion, device_id_header::DeviceIdHeader, requests::UpdateTextDocumentVersion,
responses::DocumentUpdateResponse, responses::DocumentUpdateResponse,
}; };
use crate::{ use crate::{
app_state::{ app_state::{
AppState, AppState,
database::models::{DocumentId, StoredDocumentVersion, VaultId}, database::models::{DocumentId, StoredDocumentVersion, VaultId, VaultUpdateId},
}, },
config::user_config::User, config::user_config::User,
errors::{SyncServerError, not_found_error, server_error}, errors::{SyncServerError, not_found_error, server_error},
server::requests::UpdateBinaryDocumentVersion,
utils::{ utils::{
dedup_paths::dedup_paths, is_file_type_mergable::is_file_type_mergable, dedup_paths::dedup_paths, is_binary::is_binary,
normalize::normalize, sanitize_path::sanitize_path, is_file_type_mergable::is_file_type_mergable, normalize::normalize,
sanitize_path::sanitize_path,
}, },
}; };
@ -30,13 +32,11 @@ use crate::{
pub struct UpdateDocumentPathParams { pub struct UpdateDocumentPathParams {
#[serde(deserialize_with = "normalize")] #[serde(deserialize_with = "normalize")]
vault_id: VaultId, vault_id: VaultId,
document_id: DocumentId, document_id: DocumentId,
} }
#[axum::debug_handler] #[axum::debug_handler]
#[allow(clippy::too_many_lines)] pub async fn update_binary(
pub async fn update_document(
Path(UpdateDocumentPathParams { Path(UpdateDocumentPathParams {
vault_id, vault_id,
document_id, document_id,
@ -44,25 +44,92 @@ pub async fn update_document(
Extension(user): Extension<User>, Extension(user): Extension<User>,
TypedHeader(device_id): TypedHeader<DeviceIdHeader>, TypedHeader(device_id): TypedHeader<DeviceIdHeader>,
State(state): State<AppState>, State(state): State<AppState>,
TypedMultipart(request): TypedMultipart<UpdateDocumentVersion>, TypedMultipart(request): TypedMultipart<UpdateBinaryDocumentVersion>,
) -> Result<Json<DocumentUpdateResponse>, SyncServerError> { ) -> Result<Json<DocumentUpdateResponse>, SyncServerError> {
// No need for a transaction as document versions are immutable let parent_document = get_parent_document(&state, &vault_id, request.parent_version_id).await?;
let parent_document = state let content = request.content.contents.to_vec();
update_document(
parent_document,
vault_id,
document_id,
user,
device_id,
state,
&request.relative_path,
content,
)
.await
}
#[axum::debug_handler]
#[allow(clippy::too_many_lines)]
pub async fn update_text(
Path(UpdateDocumentPathParams {
vault_id,
document_id,
}): Path<UpdateDocumentPathParams>,
Extension(user): Extension<User>,
TypedHeader(device_id): TypedHeader<DeviceIdHeader>,
State(state): State<AppState>,
Json(request): Json<UpdateTextDocumentVersion>,
) -> Result<Json<DocumentUpdateResponse>, SyncServerError> {
let parent_document = get_parent_document(&state, &vault_id, request.parent_version_id).await?;
let edited_text = EditedText::from_diff(
str::from_utf8(&parent_document.content)
.expect("parent must be valid UTF-8 because it's a text document"),
request.content,
&*BuiltinTokenizer::Word,
);
let content = edited_text.apply().text().into_bytes();
update_document(
parent_document,
vault_id,
document_id,
user,
device_id,
state,
&request.relative_path,
content,
)
.await
}
async fn get_parent_document(
state: &AppState,
vault_id: &VaultId,
parent_version_id: VaultUpdateId,
) -> Result<StoredDocumentVersion, SyncServerError> {
state
.database .database
.get_document_version(&vault_id, request.parent_version_id, None) .get_document_version(vault_id, parent_version_id, None)
.await .await
.map_err(server_error)? .map_err(server_error)?
.map_or_else( .map_or_else(
|| { || {
Err(not_found_error(anyhow!( Err(not_found_error(anyhow!(
"Parent version with id `{}` not found", "Parent version with id `{parent_version_id}` not found"
request.parent_version_id
))) )))
}, },
Ok, Ok,
)?; )
}
let sanitized_relative_path = sanitize_path(&request.relative_path); #[allow(clippy::too_many_lines, clippy::too_many_arguments)]
async fn update_document(
parent_document: StoredDocumentVersion,
vault_id: VaultId,
document_id: DocumentId,
user: User,
device_id: DeviceIdHeader,
state: AppState,
relative_path: &str,
content: Vec<u8>,
) -> Result<Json<DocumentUpdateResponse>, SyncServerError> {
let sanitized_relative_path = sanitize_path(relative_path);
let mut transaction = state let mut transaction = state
.database .database
@ -102,8 +169,6 @@ pub async fn update_document(
))); )));
} }
let content = request.content.contents.to_vec();
// Return the latest version if the content and path are the same as the latest // Return the latest version if the content and path are the same as the latest
// version // version
if content == latest_version.content && sanitized_relative_path == latest_version.relative_path if content == latest_version.content && sanitized_relative_path == latest_version.relative_path

View file

@ -1,4 +1,5 @@
pub mod dedup_paths; pub mod dedup_paths;
pub mod is_binary;
pub mod is_file_type_mergable; pub mod is_file_type_mergable;
pub mod normalize; pub mod normalize;
pub mod rotating_file_writer; pub mod rotating_file_writer;

View file

@ -0,0 +1,26 @@
/// Heuristically determine if the given data is a binary or a text file's
/// content.
///
/// Only text inputs can be reconciled using the crate's functions.
#[must_use]
pub fn is_binary(data: &[u8]) -> bool {
if data.contains(&0) {
// Even though the NUL character is valid in UTF-8, it's highly suspicious in
// human-readable text.
return true;
}
std::str::from_utf8(data).is_err()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_is_binary() {
assert!(is_binary(&[0, 159, 146, 150]));
assert!(is_binary(&[0, 12]));
assert!(!is_binary(b"hello"));
}
}

View file

@ -93,6 +93,26 @@ impl RotatingFileWriter {
SystemTime::now() >= inner.next_rotation_time SystemTime::now() >= inner.next_rotation_time
} }
fn open_or_create_log_file(inner: &mut RotatingFileWriterInner) -> io::Result<()> {
// If we haven't reached rotation time and there's an existing log file, reuse it
if !Self::should_rotate(inner)
&& let Some(latest_file) =
Self::find_latest_log_file(&inner.directory, &inner.file_prefix)
{
let filepath = inner.directory.join(&latest_file);
let file = OpenOptions::new()
.create(true)
.append(true)
.open(&filepath)?;
inner.current_file = Some(file);
return Ok(());
}
// Otherwise, create a new log file with current timestamp
Self::rotate(inner)
}
fn rotate(inner: &mut RotatingFileWriterInner) -> io::Result<()> { fn rotate(inner: &mut RotatingFileWriterInner) -> io::Result<()> {
let timestamp = Local::now().format("%Y-%m-%d_%H-%M-%S"); let timestamp = Local::now().format("%Y-%m-%d_%H-%M-%S");
let filename = format!("{}.{}.log", inner.file_prefix, timestamp); let filename = format!("{}.{}.log", inner.file_prefix, timestamp);
@ -114,7 +134,9 @@ impl Write for RotatingFileWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let mut inner = self.inner.lock().unwrap(); let mut inner = self.inner.lock().unwrap();
if inner.current_file.is_none() || Self::should_rotate(&inner) { if inner.current_file.is_none() {
Self::open_or_create_log_file(&mut inner)?;
} else if Self::should_rotate(&inner) {
Self::rotate(&mut inner)?; Self::rotate(&mut inner)?;
} }
@ -328,6 +350,7 @@ mod tests {
#[test] #[test]
fn test_restart_behavior() { fn test_restart_behavior() {
let temp_dir = std::env::temp_dir().join("test_restart_behavior"); let temp_dir = std::env::temp_dir().join("test_restart_behavior");
let _ = fs::remove_dir_all(&temp_dir);
// Create initial writer and write some data // Create initial writer and write some data
{ {