Add idempotency key for create

This commit is contained in:
Andras Schmelczer 2026-03-15 08:06:22 +00:00
parent a63903734d
commit ae590e6fc8
35 changed files with 624 additions and 143 deletions

View file

@ -23,8 +23,15 @@ export interface StoredDocumentMetadata {
hash: string;
}
export interface StoredPendingDocument {
relativePath: RelativePath;
idempotencyKey: string;
originalCreationPath: RelativePath;
}
export interface StoredDatabase {
documents: StoredDocumentMetadata[];
pendingDocuments?: StoredPendingDocument[];
lastSeenUpdateId: VaultUpdateId | undefined;
}
@ -39,6 +46,11 @@ export interface DocumentRecord {
metadata: DocumentMetadata | undefined;
isDeleted: boolean;
parallelVersion: number;
/** The path when this pending document was first created locally.
* Survives renames so we can match it against server responses
* when a create request succeeded but the response was lost. */
originalCreationPath?: RelativePath;
idempotencyKey?: string;
}
export class Database {
@ -60,6 +72,26 @@ export class Database {
parallelVersion: 0
})) ?? [];
if (initialState.pendingDocuments) {
for (const pending of initialState.pendingDocuments) {
const existing =
this.getLatestDocumentByRelativePath(
pending.relativePath
);
this.documents.push({
relativePath: pending.relativePath,
metadata: undefined,
isDeleted: false,
parallelVersion:
existing !== undefined
? existing.parallelVersion + 1
: 0,
originalCreationPath: pending.originalCreationPath,
idempotencyKey: pending.idempotencyKey
});
}
}
this.ensureConsistency();
this.logger.debug(`Loaded ${this.documents.length} documents`);
@ -112,6 +144,12 @@ export class Database {
});
}
public get pendingDocuments(): DocumentRecord[] {
return this.documents.filter(
(doc) => doc.metadata === undefined && !doc.isDeleted
);
}
public updateDocumentMetadata(
metadata: {
documentId: DocumentId;
@ -155,19 +193,25 @@ export class Database {
const previousEntry =
this.getLatestDocumentByRelativePath(relativePath);
const entry = {
const entry: DocumentRecord = {
relativePath,
metadata: undefined,
isDeleted: false,
parallelVersion:
previousEntry?.parallelVersion === undefined
? 0
: previousEntry.parallelVersion + 1
: previousEntry.parallelVersion + 1,
originalCreationPath: relativePath,
idempotencyKey: crypto.randomUUID()
};
this.documents.push(entry);
// no need to save as we only save documents which have metadata
// Save without consistency check — pending docs can't violate
// the documentId uniqueness invariant since they have no metadata.
void this.save().catch((error: unknown) => {
this.logger.error(`Error saving data: ${error}`);
});
return entry;
}
@ -222,6 +266,10 @@ export class Database {
this.saveInTheBackground();
}
public containsDocument(target: DocumentRecord): boolean {
return this.documents.includes(target);
}
public getLastSeenUpdateId(): VaultUpdateId {
return this.lastSeenUpdateIds.min;
}
@ -256,6 +304,13 @@ export class Database {
...metadata! // `resolvedDocuments` only returns docs with metadata set
})
),
pendingDocuments: this.pendingDocuments.map(
({ relativePath, idempotencyKey, originalCreationPath }) => ({
relativePath,
idempotencyKey: idempotencyKey!,
originalCreationPath: originalCreationPath!
})
),
lastSeenUpdateId: this.lastSeenUpdateIds.min
});
}

View file

@ -67,10 +67,12 @@ export class SyncService {
public async create({
relativePath,
contentBytes
contentBytes,
idempotencyKey
}: {
relativePath: RelativePath;
contentBytes: Uint8Array;
idempotencyKey?: string;
}): Promise<DocumentUpdateResponse> {
return this.retryForever(async () => {
const formData = new FormData();
@ -81,6 +83,10 @@ export class SyncService {
new Blob([new Uint8Array(contentBytes)])
);
if (idempotencyKey !== undefined) {
formData.append("idempotency_key", idempotencyKey);
}
this.logger.debug(
`Creating document with relative path ${relativePath}`
);
@ -362,6 +368,52 @@ export class SyncService {
});
}
public async resolveIdempotencyKeys(
keys: string[]
): Promise<Map<string, string>> {
this.logger.debug(
`Resolving ${keys.length} idempotency keys`
);
try {
const response = await this.client(
this.getUrl("/documents/resolve-keys"),
{
method: "POST",
body: JSON.stringify({ idempotencyKeys: keys }),
headers: this.getDefaultHeaders({ type: "json" })
}
);
if (!response.ok) {
this.logger.warn(
`Failed to resolve idempotency keys: ${await SyncService.errorFromResponse(
response
)}`
);
return new Map();
}
const result: { resolved: Record<string, string> } =
(await response.json()) as { resolved: Record<string, string> }; // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
const resolved = new Map<string, string>(
Object.entries(result.resolved)
);
this.logger.debug(
`Resolved ${resolved.size}/${keys.length} idempotency keys`
);
return resolved;
} catch (e) {
this.logger.warn(
`Failed to resolve idempotency keys: ${e}`
);
return new Map();
}
}
public async ping(): Promise<PingResponse> {
this.logger.debug("Pinging server");
const response = await this.pingClient(this.getUrl("/ping"), {

View file

@ -1,8 +1,4 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { DocumentWithCursors } from "./DocumentWithCursors";
export interface ClientCursors {
userName: string;
deviceId: string;
documentsWithCursors: DocumentWithCursors[];
}
export interface ClientCursors { userName: string, deviceId: string, documentsWithCursors: DocumentWithCursors[], }

View file

@ -1,6 +1,3 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface CreateDocumentVersion {
relative_path: string;
content: number[];
}
export interface CreateDocumentVersion { relative_path: string, content: number[], }

View file

@ -1,6 +1,4 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { DocumentWithCursors } from "./DocumentWithCursors";
export interface CursorPositionFromClient {
documentsWithCursors: DocumentWithCursors[];
}
export interface CursorPositionFromClient { documentsWithCursors: DocumentWithCursors[], }

View file

@ -1,6 +1,4 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { ClientCursors } from "./ClientCursors";
export interface CursorPositionFromServer {
clients: ClientCursors[];
}
export interface CursorPositionFromServer { clients: ClientCursors[], }

View file

@ -1,6 +1,3 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface CursorSpan {
start: number;
end: number;
}
export interface CursorSpan { start: number, end: number, }

View file

@ -1,5 +1,3 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface DeleteDocumentVersion {
relativePath: string;
}
export interface DeleteDocumentVersion { relativePath: string, }

View file

@ -5,6 +5,4 @@ import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutCont
/**
* Response to an update document request.
*/
export type DocumentUpdateResponse =
| ({ type: "FastForwardUpdate" } & DocumentVersionWithoutContent)
| ({ type: "MergingUpdate" } & DocumentVersion);
export type DocumentUpdateResponse = { "type": "FastForwardUpdate" } & DocumentVersionWithoutContent | { "type": "MergingUpdate" } & DocumentVersion;

View file

@ -1,12 +1,3 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface DocumentVersion {
vaultUpdateId: number;
documentId: string;
relativePath: string;
updatedDate: string;
contentBase64: string;
isDeleted: boolean;
userId: string;
deviceId: string;
}
export interface DocumentVersion { vaultUpdateId: number, documentId: string, relativePath: string, updatedDate: string, contentBase64: string, isDeleted: boolean, userId: string, deviceId: string, }

View file

@ -1,12 +1,3 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface DocumentVersionWithoutContent {
vaultUpdateId: number;
documentId: string;
relativePath: string;
updatedDate: string;
isDeleted: boolean;
userId: string;
deviceId: string;
contentSize: number;
}
export interface DocumentVersionWithoutContent { vaultUpdateId: number, documentId: string, relativePath: string, updatedDate: string, isDeleted: boolean, userId: string, deviceId: string, contentSize: number, }

View file

@ -1,9 +1,4 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { CursorSpan } from "./CursorSpan";
export interface DocumentWithCursors {
vault_update_id: number | null;
document_id: string;
relative_path: string;
cursors: CursorSpan[];
}
export interface DocumentWithCursors { vault_update_id: number | null, document_id: string, relative_path: string, cursors: CursorSpan[], }

View file

@ -4,10 +4,8 @@ import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutCont
/**
* Response to a fetch latest documents request.
*/
export interface FetchLatestDocumentsResponse {
latestDocuments: DocumentVersionWithoutContent[];
/**
* The update ID of the latest document in the response.
*/
lastUpdateId: bigint;
}
export interface FetchLatestDocumentsResponse { latestDocuments: DocumentVersionWithoutContent[],
/**
* The update ID of the latest document in the response.
*/
lastUpdateId: bigint, }

View file

@ -3,23 +3,22 @@
/**
* Response to a ping request.
*/
export interface PingResponse {
/**
* Semantic version of the server.
*/
serverVersion: string;
/**
* Whether the client is authenticated based on the sent Authorization
* header.
*/
isAuthenticated: boolean;
/**
* List of file extensions that are allowed to be merged.
*/
mergeableFileExtensions: string[];
/**
* API version ensuring backwards & forwards compatibility between the client
* and server.
*/
supportedApiVersion: number;
}
export interface PingResponse {
/**
* Semantic version of the server.
*/
serverVersion: string,
/**
* Whether the client is authenticated based on the sent Authorization
* header.
*/
isAuthenticated: boolean,
/**
* List of file extensions that are allowed to be merged.
*/
mergeableFileExtensions: string[],
/**
* API version ensuring backwards & forwards compatibility between the client
* and server.
*/
supportedApiVersion: number, }

View file

@ -1,7 +1,3 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface SerializedError {
errorType: string;
message: string;
causes: string[];
}
export interface SerializedError { errorType: string, message: string, causes: string[], }

View file

@ -1,7 +1,3 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface UpdateTextDocumentVersion {
parentVersionId: number;
relativePath: string;
content: (number | string)[];
}
export interface UpdateTextDocumentVersion { parentVersionId: number, relativePath: string, content: (number | string)[], }

View file

@ -2,6 +2,4 @@
import type { CursorPositionFromClient } from "./CursorPositionFromClient";
import type { WebSocketHandshake } from "./WebSocketHandshake";
export type WebSocketClientMessage =
| ({ type: "handshake" } & WebSocketHandshake)
| ({ type: "cursorPositions" } & CursorPositionFromClient);
export type WebSocketClientMessage = { "type": "handshake" } & WebSocketHandshake | { "type": "cursorPositions" } & CursorPositionFromClient;

View file

@ -1,7 +1,3 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface WebSocketHandshake {
token: string;
deviceId: string;
lastSeenVaultUpdateId: number | null;
}
export interface WebSocketHandshake { token: string, deviceId: string, lastSeenVaultUpdateId: number | null, }

View file

@ -2,6 +2,4 @@
import type { CursorPositionFromServer } from "./CursorPositionFromServer";
import type { WebSocketVaultUpdate } from "./WebSocketVaultUpdate";
export type WebSocketServerMessage =
| ({ type: "vaultUpdate" } & WebSocketVaultUpdate)
| ({ type: "cursorPositions" } & CursorPositionFromServer);
export type WebSocketServerMessage = { "type": "vaultUpdate" } & WebSocketVaultUpdate | { "type": "cursorPositions" } & CursorPositionFromServer;

View file

@ -1,7 +1,4 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutContent";
export interface WebSocketVaultUpdate {
documents: DocumentVersionWithoutContent[];
isInitialSync: boolean;
}
export interface WebSocketVaultUpdate { documents: DocumentVersionWithoutContent[], isInitialSync: boolean, }

View file

@ -10,7 +10,7 @@ import { hash } from "../utils/hash";
import type { FileChangeNotifier } from "./file-change-notifier";
import { Lock } from "../utils/data-structures/locks";
import { EventListeners } from "../utils/data-structures/event-listeners";
import { Logger } from "../tracing/logger";
import type { Logger } from "../tracing/logger";
// Cursor positions are updated separately from documents. However, a given cursor position is only
// valid within a certain version of the document it belongs to. This class tracks previous and the latest

View file

@ -89,15 +89,33 @@ export class Syncer {
public async syncLocallyCreatedFile(
relativePath: RelativePath
): Promise<void> {
// check whether someone else has already created the document in the database
if (
this.database.getLatestDocumentByRelativePath(relativePath)
?.isDeleted === false
) {
// This is likely a consequence of us creating a file because of a remote update
// which triggered a local create, so we don't need to do anything here.
const existingDocument =
this.database.getLatestDocumentByRelativePath(relativePath);
// Check whether someone else has already created the document in the database
if (existingDocument?.isDeleted === false) {
if (existingDocument.metadata !== undefined) {
// Fully synced document — likely created by a remote update
// which triggered a local create, so we don't need to do anything here.
this.logger.debug(
`Document ${relativePath} already exists in the database with metadata, skipping`
);
return;
}
// Pending create (interrupted by a sync reset or duplicate file watcher event)
// — reuse the existing record and retry the sync.
this.logger.debug(
`Document ${relativePath} already exists in the database, skipping`
`Document ${relativePath} has a pending create that was interrupted, retrying sync`
);
await this.enqueueSyncOperation(
async () =>
this.unrestrictedSyncer.unrestrictedSyncLocallyCreatedOrUpdatedFile(
{
document: existingDocument
}
),
[relativePath]
);
return;
}
@ -118,10 +136,10 @@ export class Syncer {
public async syncLocallyDeletedFile(
relativePath: RelativePath
): Promise<void> {
let document =
const document =
this.database.getLatestDocumentByRelativePath(relativePath);
if (document == null || document.isDeleted === true) {
if (document == null || document.isDeleted) {
// This is must be a consequence of us deleting a file because of a remote update
// which triggered a local delete, so we don't need to do anything here.
this.logger.debug(
@ -199,6 +217,17 @@ export class Syncer {
return;
}
// If a create operation is already in progress for this document (no metadata
// yet), skip the HTTP sync. The create operation will handle syncing the content.
// We've already updated the document's path in the database above if needed,
// so the create operation will use the correct path.
if (document.metadata === undefined) {
this.logger.debug(
`Document ${relativePath} has a pending create operation, skipping HTTP sync`
);
return;
}
await this.enqueueSyncOperation(
async () =>
this.unrestrictedSyncer.unrestrictedSyncLocallyCreatedOrUpdatedFile(
@ -265,7 +294,15 @@ export class Syncer {
this._isFirstSyncComplete = true;
} catch (e) {
this.logger.error(`Failed to sync remotely updated file: ${e}`);
if (e instanceof SyncResetError) {
this.logger.info(
"Sync reset during remote update processing"
);
} else {
this.logger.error(
`Failed to sync remotely updated file: ${e}`
);
}
}
}
@ -309,6 +346,8 @@ export class Syncer {
}
private async internalScheduleSyncForOfflineChanges(): Promise<void> {
await this.unrestrictedSyncer.resolveIdempotencyKeys();
const allLocalFiles = await this.operations.listFilesRecursively();
this.logger.info(
`Scheduling sync for ${allLocalFiles.length} local files`
@ -453,9 +492,25 @@ export class Syncer {
operation: () => Promise<T>,
keys: (string | undefined | null)[]
): Promise<T> {
return this.updatedDocumentsByPathAndKeysLocks.withLock(
keys.filter((k) => k !== undefined && k !== null),
async () => this.syncQueue.add(operation)
const filteredKeys = keys.filter((k) => k !== undefined && k !== null);
// IMPORTANT: We must NOT hold locks while waiting for a queue slot.
// If we did, we could deadlock when two concurrent operations hold
// locks on different keys while both waiting for queue capacity.
//
// Instead, we acquire locks INSIDE the queued operation. This ensures:
// 1. We only hold locks during actual operation execution
// 2. The queue serializes access to queue slots
// 3. Locks serialize access to the same document/path
//
// The result type needs special handling since syncQueue.add() can
// return undefined when the queue is paused/cleared.
const result = await this.syncQueue.add(async () =>
this.updatedDocumentsByPathAndKeysLocks.withLock(
filteredKeys,
operation
)
);
return result as T;
}
}

View file

@ -57,6 +57,61 @@ export class UnrestrictedSyncer {
});
}
public async resolveIdempotencyKeys(): Promise<void> {
const pendingDocs = this.database.pendingDocuments;
if (pendingDocs.length === 0) {
return;
}
const keys = pendingDocs
.map((d) => d.idempotencyKey)
// eslint-disable-next-line no-restricted-syntax -- Type narrowing, not removing a specific item
.filter((k): k is string => k !== undefined);
if (keys.length === 0) {
return;
}
this.logger.debug(
`Resolving ${keys.length} pending idempotency keys`
);
const resolved =
await this.syncService.resolveIdempotencyKeys(keys);
for (const doc of pendingDocs) {
if (
doc.idempotencyKey !== undefined &&
resolved.has(doc.idempotencyKey)
) {
const documentId = resolved.get(doc.idempotencyKey)!; // eslint-disable-line @typescript-eslint/no-non-null-assertion
// Skip if this documentId is already assigned to another document
const existing =
this.database.getDocumentByDocumentId(documentId);
if (existing !== undefined) {
this.logger.debug(
`Document ${documentId} already exists at ${existing.relativePath}, removing stale pending doc at ${doc.relativePath}`
);
this.database.removeDocument(doc);
continue;
}
this.logger.info(
`Resolved idempotency key ${doc.idempotencyKey} to document ${documentId} for ${doc.relativePath}`
);
this.database.updateDocumentMetadata(
{
documentId,
parentVersionId: 0,
hash: "",
remoteRelativePath: doc.relativePath
},
doc
);
}
}
}
public async unrestrictedSyncLocallyCreatedOrUpdatedFile({
oldPath,
// We use the same code path for both local and remote updates. We need to force the update
@ -108,7 +163,8 @@ export class UnrestrictedSyncer {
if (document.metadata === undefined) {
response = await this.syncService.create({
relativePath: originalRelativePath,
contentBytes
contentBytes,
idempotencyKey: document.idempotencyKey
});
await this.handleMaybeMergingResponse({
@ -247,6 +303,18 @@ export class UnrestrictedSyncer {
relativePath: document.relativePath
});
// A concurrent merge operation may have removed this document from the
// database while we were waiting for the delete response. In that case,
// the merge already handled the state transition and we should not
// update metadata (which would fail anyway since the document is gone).
if (!this.database.containsDocument(document)) {
this.logger.debug(
`Document ${document.relativePath} was removed from database by a concurrent operation, skipping metadata update after delete`
);
this.database.addSeenUpdateId(response.vaultUpdateId);
return;
}
this.database.updateDocumentMetadata(
{
documentId: response.documentId,
@ -474,6 +542,8 @@ export class UnrestrictedSyncer {
let actualPath = document.relativePath;
let existingContentBytes: Uint8Array | undefined;
if (isCreate) {
// We have a file locally that got moved by another client to the same path as the one we're trying to create.
// The server returns a merging update for the document ID that already exists locally (but at another path).
@ -482,21 +552,53 @@ export class UnrestrictedSyncer {
const existingDocument = this.database.getDocumentByDocumentId(
response.documentId
);
if (existingDocument !== undefined) {
// If existingDocument === document, then a previous sync operation already
// assigned this documentId to our document. We don't need to merge - just
// continue to update the metadata below.
if (existingDocument !== undefined && existingDocument !== document) {
this.logger.info(
`Merging existing document ${existingDocument.relativePath} into ${document.relativePath
} after concurrent move & creation`
);
if (!existingDocument.isDeleted) {
this.database.delete(existingDocument.relativePath); // make sure syncLocallyDeletedFile doesn't actually schedule deleting the new file
try {
existingContentBytes = await this.operations.read(
existingDocument.relativePath
);
} catch (e) {
if (e instanceof FileNotFoundError) {
return;
}
throw e;
}
this.database.removeDocument(existingDocument);
await this.operations.move(existingDocument.relativePath, document.relativePath);
await this.operations.delete(existingDocument.relativePath);
} else {
this.database.removeDocument(existingDocument);
}
}
}
// A document's documentId should never change once assigned. If the response has a
// different documentId than what the document already has, it means the file was
// renamed during the sync operation and the response is for a different document.
// We should bail out and let subsequent sync operations fix the state.
if (
document.metadata?.documentId !== undefined &&
document.metadata.documentId !== response.documentId
) {
this.logger.info(
`Document ${document.relativePath} already has documentId ${document.metadata.documentId}, ` +
`but response has documentId ${response.documentId}. Ignoring response to prevent documentId corruption.`
);
this.database.addSeenUpdateId(response.vaultUpdateId);
return;
}
// this can't happen on the creation path as we can only get a merging response if a document already exists remotely on the same path
if (response.relativePath != originalRelativePath) {
actualPath = response.relativePath;
@ -530,6 +632,17 @@ export class UnrestrictedSyncer {
originalContentBytes,
responseBytes
);
if (existingContentBytes !== undefined) {
// the merge case is only always for text files, so don't mind that we have to provide a byte array here
await this.operations.write(
actualPath,
new Uint8Array(0),
existingContentBytes
);
}
await this.updateCache(
response.vaultUpdateId,
responseBytes,

View file

@ -12,11 +12,11 @@ const COLORS = {
export function logToConsole(
logger: Logger,
{ useColors = true }: { useColors?: boolean } = {}
{ useColors = true, prefix }: { useColors?: boolean; prefix?: string } = {}
): void {
logger.onLogEmitted.add((logLine: LogLine) => {
const timestamp = logLine.timestamp.toISOString();
const message = logLine.message;
const {message} = logLine;
let color = "";
let reset = "";
@ -38,7 +38,8 @@ export function logToConsole(
}
}
const formatted = `${timestamp} ${color}${logLine.level}${reset} ${message}`;
const prefixPart = prefix !== undefined ? `${prefix} ` : "";
const formatted = `${prefixPart}${timestamp} ${color}${logLine.level}${reset} ${message}`;
switch (logLine.level) {
case LogLevel.ERROR: