store creation id and implement moves

This commit is contained in:
Andras Schmelczer 2026-04-21 20:30:04 +01:00
parent dca59a18dc
commit 5ee9db0007
9 changed files with 128 additions and 23 deletions

View file

@ -5,6 +5,7 @@ import type { WebSocketClientMessage } from "./types/WebSocketClientMessage";
import type { CursorPositionFromClient } from "./types/CursorPositionFromClient"; import type { CursorPositionFromClient } from "./types/CursorPositionFromClient";
import type { ClientCursors } from "./types/ClientCursors"; import type { ClientCursors } from "./types/ClientCursors";
import type { WebSocketVaultUpdate } from "./types/WebSocketVaultUpdate"; import type { WebSocketVaultUpdate } from "./types/WebSocketVaultUpdate";
import type { WebSocketVaultPathChange } from "./types/WebSocketVaultPathChange";
import { import {
WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS, WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS,
WEBSOCKET_CONNECTION_TIMEOUT_IN_SECONDS WEBSOCKET_CONNECTION_TIMEOUT_IN_SECONDS
@ -22,6 +23,10 @@ export class WebSocketManager {
(update: WebSocketVaultUpdate) => Promise<void> (update: WebSocketVaultUpdate) => Promise<void>
>(); >();
public readonly onRemotePathChangeReceived = new EventListeners<
(pathChange: WebSocketVaultPathChange) => Promise<void>
>();
public readonly onRemoteCursorsUpdateReceived = new EventListeners< public readonly onRemoteCursorsUpdateReceived = new EventListeners<
(cursors: ClientCursors[]) => Promise<void> (cursors: ClientCursors[]) => Promise<void>
>(); >();
@ -280,22 +285,28 @@ export class WebSocketManager {
private async handleWebSocketMessage( private async handleWebSocketMessage(
message: WebSocketServerMessage message: WebSocketServerMessage
): Promise<void> { ): Promise<void> {
if (message.type === "vaultUpdate") { switch (message.type) {
await this.onRemoteVaultUpdateReceived.triggerAsync(message); case "vaultUpdate":
await this.onRemoteVaultUpdateReceived.triggerAsync(message);
return;
} else if (message.type === "cursorPositions") { case "pathChange":
this.logger.debug( this.logger.debug(
`Received cursor positions for ${JSON.stringify(message.clients)}` `Received path change for document ${message.documentId}${message.relativePath}`
); );
await this.onRemotePathChangeReceived.triggerAsync(message);
await this.onRemoteCursorsUpdateReceived.triggerAsync( return;
message.clients case "cursorPositions":
); this.logger.debug(
} else { `Received cursor positions for ${JSON.stringify(message.clients)}`
this.logger.warn( );
`Received unknown message type: ${JSON.stringify(message)}` await this.onRemoteCursorsUpdateReceived.triggerAsync(
); message.clients
);
return;
default:
this.logger.warn(
`Received unknown message type: ${JSON.stringify(message)}`
);
} }
} }
} }

View file

@ -14,6 +14,7 @@ import { scheduleOfflineChanges } from "./offline-change-detector";
import { SyncResetError } from "../errors/sync-reset-error"; import { SyncResetError } from "../errors/sync-reset-error";
import type { DocumentVersionWithoutContent } from "../services/types/DocumentVersionWithoutContent"; import type { DocumentVersionWithoutContent } from "../services/types/DocumentVersionWithoutContent";
import type { WebSocketVaultUpdate } from "../services/types/WebSocketVaultUpdate"; import type { WebSocketVaultUpdate } from "../services/types/WebSocketVaultUpdate";
import type { WebSocketVaultPathChange } from "../services/types/WebSocketVaultPathChange";
import type { WebSocketManager } from "../services/websocket-manager"; import type { WebSocketManager } from "../services/websocket-manager";
import type { WebSocketClientMessage } from "../services/types/WebSocketClientMessage"; import type { WebSocketClientMessage } from "../services/types/WebSocketClientMessage";
import { EventListeners } from "../utils/data-structures/event-listeners"; import { EventListeners } from "../utils/data-structures/event-listeners";
@ -73,6 +74,9 @@ export class Syncer {
this.webSocketManager.onRemoteVaultUpdateReceived.add( this.webSocketManager.onRemoteVaultUpdateReceived.add(
this.syncRemotelyUpdatedFile.bind(this) this.syncRemotelyUpdatedFile.bind(this)
); );
this.webSocketManager.onRemotePathChangeReceived.add(
this.syncRemotelyChangedPath.bind(this)
);
} }
public get isFirstSyncComplete(): boolean { public get isFirstSyncComplete(): boolean {
@ -173,6 +177,63 @@ export class Syncer {
} }
} }
// A PathChange notifies us that a document now lives at a new server-
// canonical path. It's delivered to every client (origin included)
// because the create/update HTTP response no longer carries the path,
// so the only way the origin learns about dedupe or first-rename-wins
// is via this event.
public async syncRemotelyChangedPath(
pathChange: WebSocketVaultPathChange
): Promise<void> {
try {
const existing = this.queue.getDocumentByDocumentId(
pathChange.documentId
);
if (existing === undefined) {
throw new Error(
`Received path change for unknown document ${pathChange.documentId}`
);
}
const { path: currentPath, record } = existing;
const newPath = pathChange.relativePath;
if (currentPath !== newPath) {
await this.operations.move(currentPath, newPath);
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
details: {
type: SyncType.MOVE,
relativePath: newPath,
movedFrom: currentPath
},
message: "Applied remote path change"
});
}
// `operations.move` updates the queue's path index, but
// doesn't touch `remoteRelativePath`. Refresh it so offline
// change detection compares against the server's path.
// parentVersionId intentionally stays at its prior value:
// if the write also changed content, the corresponding
// VaultUpdate handles that; advancing it here would make us
// skip fetching content we don't yet have.
this.queue.setDocument(newPath, {
...record,
remoteRelativePath: newPath
});
} catch (e) {
if (e instanceof SyncResetError) {
this.logger.info(
"Failed to apply remote path change due to a reset"
);
return;
}
this.logger.error(`Failed to apply remote path change: ${e}`);
}
}
public reset(): void { public reset(): void {
this._isFirstSyncComplete = false; this._isFirstSyncComplete = false;
this.queue.clear(); this.queue.clear();

View file

@ -586,6 +586,7 @@ impl Database {
r#" r#"
select select
vault_update_id, vault_update_id,
creation_vault_update_id,
document_id as "document_id: Hyphenated", document_id as "document_id: Hyphenated",
relative_path, relative_path,
updated_date as "updated_date: chrono::DateTime<Utc>", updated_date as "updated_date: chrono::DateTime<Utc>",
@ -626,6 +627,7 @@ impl Database {
r#" r#"
select select
vault_update_id, vault_update_id,
creation_vault_update_id,
document_id as "document_id: Hyphenated", document_id as "document_id: Hyphenated",
relative_path, relative_path,
updated_date as "updated_date: chrono::DateTime<Utc>", updated_date as "updated_date: chrono::DateTime<Utc>",
@ -661,6 +663,7 @@ impl Database {
r#" r#"
select select
vault_update_id, vault_update_id,
creation_vault_update_id,
document_id as "document_id: Hyphenated", document_id as "document_id: Hyphenated",
relative_path, relative_path,
updated_date as "updated_date: chrono::DateTime<Utc>", updated_date as "updated_date: chrono::DateTime<Utc>",
@ -697,6 +700,7 @@ impl Database {
r#" r#"
insert into documents ( insert into documents (
vault_update_id, vault_update_id,
creation_vault_update_id,
document_id, document_id,
relative_path, relative_path,
updated_date, updated_date,
@ -706,9 +710,10 @@ impl Database {
device_id, device_id,
has_been_merged has_been_merged
) )
values (?, ?, ?, ?, ?, ?, ?, ?, ?) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"#, "#,
version.vault_update_id, version.vault_update_id,
version.creation_vault_update_id,
document_id, document_id,
version.relative_path, version.relative_path,
version.updated_date, version.updated_date,

View file

@ -0,0 +1,20 @@
ALTER TABLE documents ADD COLUMN creation_vault_update_id INTEGER NOT NULL DEFAULT 0;
UPDATE documents
SET creation_vault_update_id = (
SELECT MIN(d2.vault_update_id)
FROM documents d2
WHERE d2.document_id = documents.document_id
);
DROP VIEW latest_document_versions;
CREATE VIEW IF NOT EXISTS latest_document_versions AS --recreate view as it now includes one more field
SELECT d.*
FROM documents d
INNER JOIN (
SELECT MAX(vault_update_id) AS max_version_id
FROM documents
GROUP BY document_id
) max_versions
ON d.vault_update_id = max_versions.max_version_id;

View file

@ -13,6 +13,7 @@ pub type DeviceId = String;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct StoredDocumentVersion { pub struct StoredDocumentVersion {
pub vault_update_id: VaultUpdateId, pub vault_update_id: VaultUpdateId,
pub creation_vault_update_id: VaultUpdateId,
pub document_id: DocumentId, pub document_id: DocumentId,
pub relative_path: String, pub relative_path: String,
pub updated_date: DateTime<Utc>, pub updated_date: DateTime<Utc>,

View file

@ -121,8 +121,10 @@ pub async fn create_document(
let path_changed = deduped_path != sanitized_relative_path; let path_changed = deduped_path != sanitized_relative_path;
let new_vault_update_id = last_update_id + 1;
let new_version = StoredDocumentVersion { let new_version = StoredDocumentVersion {
vault_update_id: last_update_id + 1, vault_update_id: new_vault_update_id,
creation_vault_update_id: new_vault_update_id,
document_id, document_id,
relative_path: deduped_path, relative_path: deduped_path,
content: new_content, content: new_content,

View file

@ -73,13 +73,16 @@ pub async fn delete_document(
return Ok(Json(latest_version.clone().into())); return Ok(Json(latest_version.clone().into()));
} }
let (latest_relative_path, latest_content) = latest_version.map_or_else( let new_vault_update_id = last_update_id + 1;
|| (String::new(), Vec::new()), let (latest_relative_path, latest_content, creation_vault_update_id) =
|version| (version.relative_path, version.content), latest_version.map_or_else(
); || (String::new(), Vec::new(), new_vault_update_id),
|version| (version.relative_path, version.content, version.creation_vault_update_id),
);
let new_version = StoredDocumentVersion { let new_version = StoredDocumentVersion {
vault_update_id: last_update_id + 1, vault_update_id: new_vault_update_id,
creation_vault_update_id,
document_id, document_id,
relative_path: latest_relative_path, relative_path: latest_relative_path,
content: latest_content, // copy the content from the latest version content: latest_content, // copy the content from the latest version

View file

@ -133,6 +133,7 @@ pub async fn restore_document_version(
let new_version = StoredDocumentVersion { let new_version = StoredDocumentVersion {
vault_update_id: last_update_id + 1, vault_update_id: last_update_id + 1,
creation_vault_update_id: target_version.creation_vault_update_id,
document_id, document_id,
relative_path: restore_path, relative_path: restore_path,
content: target_version.content, content: target_version.content,

View file

@ -303,6 +303,7 @@ pub async fn update_document(
let new_version = StoredDocumentVersion { let new_version = StoredDocumentVersion {
document_id, document_id,
vault_update_id: last_update_id + 1, vault_update_id: last_update_id + 1,
creation_vault_update_id: latest_version.creation_vault_update_id,
relative_path: new_relative_path, relative_path: new_relative_path,
content: merged_content, content: merged_content,
updated_date: chrono::Utc::now(), updated_date: chrono::Utc::now(),