diff --git a/sync-server/src/app_state/database.rs b/sync-server/src/app_state/database.rs index c91bc28a..b9bf8df1 100644 --- a/sync-server/src/app_state/database.rs +++ b/sync-server/src/app_state/database.rs @@ -16,6 +16,24 @@ pub mod models; #[error("Database is busy")] pub struct WriteBusyError; +/// Tells [`Database::insert_document_version`] which WebSocket events the +/// just-committed version should produce. The caller is the only party +/// with enough context to decide this (the DB layer has no access to +/// "what the client sent" or "what the prior version looked like"). +#[derive(Debug, Clone, Copy, Default)] +pub struct InsertBroadcast { + /// Emit a `VaultUpdate` (filtered from the origin device). Set when + /// the stored bytes differ from the prior version's bytes — i.e. + /// peers need to pull new content. + pub content_changed: bool, + + /// Emit a `PathChange` (delivered to every client, origin included). + /// Set when the stored path differs from the prior stored path *or* + /// from the path the origin client sent — i.e. someone needs to + /// reconcile a dedupe, rename, or first-rename-wins outcome. + pub path_changed: bool, +} + use sqlx::{ Pool, Sqlite, pool::PoolConnection, sqlite::SqliteConnection, sqlite::SqlitePoolOptions, }; @@ -25,7 +43,10 @@ use uuid::fmt::Hyphenated; use super::websocket::{ broadcasts::Broadcasts, - models::{WebSocketServerMessage, WebSocketServerMessageWithOrigin, WebSocketVaultUpdate}, + models::{ + WebSocketServerMessage, WebSocketServerMessageWithOrigin, WebSocketVaultPathChange, + WebSocketVaultUpdate, + }, }; use crate::config::database_config::DatabaseConfig; use crate::consts::IDLE_POOL_TIMEOUT; @@ -669,6 +690,7 @@ impl Database { vault_id: &VaultId, version: &StoredDocumentVersion, mut transaction: WriteTransaction, + broadcast: InsertBroadcast, ) -> Result<()> { let document_id = version.document_id.as_hyphenated(); let query = sqlx::query!( @@ -712,18 +734,43 @@ impl Database { .await .context("Failed to commit transaction")?; - self.broadcasts - .send_document_update( - vault_id.clone(), - WebSocketServerMessageWithOrigin::with_origin( - version.device_id.clone(), - WebSocketServerMessage::VaultUpdate(WebSocketVaultUpdate { - documents: vec![version.clone().into()], - is_initial_sync: false, - }), - ), - ) - .await; + if broadcast.content_changed { + // Content events are filtered out for the origin device — the + // origin already has the content (or learns about the merge + // via the HTTP response). + self.broadcasts + .send_document_update( + vault_id.clone(), + WebSocketServerMessageWithOrigin::with_origin( + version.device_id.clone(), + WebSocketServerMessage::VaultUpdate(WebSocketVaultUpdate { + documents: vec![version.clone().into()], + is_initial_sync: false, + }), + ), + ) + .await; + } + + if broadcast.path_changed { + // Path change events intentionally carry no origin so *every* + // connected client (including the one that made the write) + // receives them. The create/update HTTP response no longer + // carries `relative_path`, so the origin device relies on this + // event to learn the server-canonical path. + self.broadcasts + .send_document_update( + vault_id.clone(), + WebSocketServerMessageWithOrigin::new(WebSocketServerMessage::PathChange( + WebSocketVaultPathChange { + vault_update_id: version.vault_update_id, + document_id: version.document_id, + relative_path: version.relative_path.clone(), + }, + )), + ) + .await; + } Ok(()) } diff --git a/sync-server/src/app_state/database/models.rs b/sync-server/src/app_state/database/models.rs index 7aea3358..9812703d 100644 --- a/sync-server/src/app_state/database/models.rs +++ b/sync-server/src/app_state/database/models.rs @@ -77,6 +77,72 @@ pub struct DocumentVersion { pub device_id: DeviceId, } +/// Like [`DocumentVersionWithoutContent`] but without the `relative_path`. +/// Used only in create/update responses where the client already tracks +/// the path locally (the server is the source of truth for the +/// document identity, not its path). +#[derive(TS, Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct DocumentUpdateMetadata { + #[ts(type = "number")] + pub vault_update_id: VaultUpdateId, + + pub document_id: DocumentId, + pub updated_date: DateTime, + pub is_deleted: bool, + pub user_id: UserId, + pub device_id: DeviceId, + + #[ts(type = "number")] + pub content_size: u64, +} + +impl From for DocumentUpdateMetadata { + fn from(value: StoredDocumentVersion) -> Self { + Self { + vault_update_id: value.vault_update_id, + document_id: value.document_id, + updated_date: value.updated_date, + is_deleted: value.is_deleted, + user_id: value.user_id, + device_id: value.device_id, + content_size: value.content.len() as u64, + } + } +} + +/// Like [`DocumentVersion`] but without the `relative_path`. +/// Used only in create/update responses when the server had to merge the +/// client's content with a newer remote version and therefore must echo +/// the merged content back. +#[derive(TS, Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct DocumentUpdateMergedContent { + #[ts(type = "number")] + pub vault_update_id: VaultUpdateId, + + pub document_id: DocumentId, + pub updated_date: DateTime, + pub content_base64: String, + pub is_deleted: bool, + pub user_id: UserId, + pub device_id: DeviceId, +} + +impl From for DocumentUpdateMergedContent { + fn from(value: StoredDocumentVersion) -> Self { + Self { + vault_update_id: value.vault_update_id, + document_id: value.document_id, + updated_date: value.updated_date, + content_base64: STANDARD.encode(&value.content), + is_deleted: value.is_deleted, + user_id: value.user_id, + device_id: value.device_id, + } + } +} + /// Row struct for vault history queries (used by `sqlx::query_as!`) #[derive(Debug)] pub struct VaultHistoryRow { diff --git a/sync-server/src/app_state/websocket/models.rs b/sync-server/src/app_state/websocket/models.rs index 116c2b84..97247229 100644 --- a/sync-server/src/app_state/websocket/models.rs +++ b/sync-server/src/app_state/websocket/models.rs @@ -64,6 +64,15 @@ pub struct WebSocketVaultUpdate { pub is_initial_sync: bool, } +#[derive(TS, Serialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct WebSocketVaultPathChange { + #[ts(type = "number")] + pub vault_update_id: VaultUpdateId, + pub document_id: DocumentId, + pub relative_path: String, +} + #[derive(TS, Deserialize, Clone, Debug)] #[serde(rename_all = "camelCase", tag = "type")] #[ts(export)] @@ -77,6 +86,7 @@ pub enum WebSocketClientMessage { #[ts(export)] pub enum WebSocketServerMessage { VaultUpdate(WebSocketVaultUpdate), + PathChange(WebSocketVaultPathChange), CursorPositions(CursorPositionFromServer), } diff --git a/sync-server/src/server/create_document.rs b/sync-server/src/server/create_document.rs index 89941b9a..51ed5b47 100644 --- a/sync-server/src/server/create_document.rs +++ b/sync-server/src/server/create_document.rs @@ -11,7 +11,10 @@ use super::{device_id_header::DeviceIdHeader, requests::CreateDocumentVersion}; use crate::{ app_state::{ AppState, - database::models::{StoredDocumentVersion, VaultId}, + database::{ + InsertBroadcast, + models::{StoredDocumentVersion, VaultId}, + }, }, config::user_config::User, errors::{SyncServerError, client_error, server_error, write_transaction_error}, @@ -116,6 +119,8 @@ pub async fn create_document( ); } + let path_changed = deduped_path != sanitized_relative_path; + let new_version = StoredDocumentVersion { vault_update_id: last_update_id + 1, document_id, @@ -130,7 +135,17 @@ pub async fn create_document( state .database - .insert_document_version(&vault_id, &new_version, transaction) + .insert_document_version( + &vault_id, + &new_version, + transaction, + InsertBroadcast { + // A brand-new document is always a content change for peers. + content_changed: true, + // Origin needs to know if the server deduped its requested path. + path_changed, + }, + ) .await .map_err(server_error)?; diff --git a/sync-server/src/server/delete_document.rs b/sync-server/src/server/delete_document.rs index 3e6398b8..48872e34 100644 --- a/sync-server/src/server/delete_document.rs +++ b/sync-server/src/server/delete_document.rs @@ -11,8 +11,9 @@ use super::{device_id_header::DeviceIdHeader, requests::DeleteDocumentVersion}; use crate::{ app_state::{ AppState, - database::models::{ - DocumentId, DocumentVersionWithoutContent, StoredDocumentVersion, VaultId, + database::{ + InsertBroadcast, + models::{DocumentId, DocumentVersionWithoutContent, StoredDocumentVersion, VaultId}, }, }, config::user_config::User, @@ -91,7 +92,17 @@ pub async fn delete_document( state .database - .insert_document_version(&vault_id, &new_version, transaction) + .insert_document_version( + &vault_id, + &new_version, + transaction, + InsertBroadcast { + // Deletion is a content change peers must learn about. + content_changed: true, + // Delete never renames. + path_changed: false, + }, + ) .await .map_err(server_error)?; diff --git a/sync-server/src/server/responses.rs b/sync-server/src/server/responses.rs index f393747d..18158e65 100644 --- a/sync-server/src/server/responses.rs +++ b/sync-server/src/server/responses.rs @@ -3,7 +3,8 @@ use serde::{self, Serialize}; use ts_rs::TS; use crate::app_state::database::models::{ - DocumentVersion, DocumentVersionWithoutContent, VaultUpdateId, + DocumentUpdateMergedContent, DocumentUpdateMetadata, DocumentVersionWithoutContent, + VaultUpdateId, }; /// Response to a ping request. @@ -66,7 +67,7 @@ pub struct ListVaultsResponse { pub user_name: String, } -/// Response to an update document request. +/// Response to a create/update document request. #[derive(TS, Debug, Clone, Serialize)] #[serde(tag = "type")] #[ts(export)] @@ -74,9 +75,9 @@ pub enum DocumentUpdateResponse { /// Returned when the created/updated document's content is the same as was /// sent in the create/update request and thus the response doesn't contain /// the content because the client must already have it. - FastForwardUpdate(DocumentVersionWithoutContent), + FastForwardUpdate(DocumentUpdateMetadata), /// Returned when the created/updated document's content is different from /// what was sent in the create/update request. - MergingUpdate(DocumentVersion), + MergingUpdate(DocumentUpdateMergedContent), } diff --git a/sync-server/src/server/restore_document_version.rs b/sync-server/src/server/restore_document_version.rs index 36c0344e..fadbdb5a 100644 --- a/sync-server/src/server/restore_document_version.rs +++ b/sync-server/src/server/restore_document_version.rs @@ -11,9 +11,12 @@ use super::device_id_header::DeviceIdHeader; use crate::{ app_state::{ AppState, - database::models::{ - DocumentId, DocumentVersionWithoutContent, StoredDocumentVersion, VaultId, - VaultUpdateId, + database::{ + InsertBroadcast, + models::{ + DocumentId, DocumentVersionWithoutContent, StoredDocumentVersion, VaultId, + VaultUpdateId, + }, }, }, config::user_config::User, @@ -120,6 +123,14 @@ pub async fn restore_document_version( .await .map_err(server_error)?; + // The current latest (pre-restore) is our baseline for deciding + // whether content and/or path actually change. + let current_latest = state + .database + .get_latest_document(&vault_id, &document_id, Some(&mut *transaction)) + .await + .map_err(server_error)?; + let new_version = StoredDocumentVersion { vault_update_id: last_update_id + 1, document_id, @@ -132,9 +143,27 @@ pub async fn restore_document_version( has_been_merged: false, }; + let (content_changed, path_changed) = match ¤t_latest { + Some(prev) => ( + prev.content != new_version.content || prev.is_deleted, + prev.relative_path != new_version.relative_path, + ), + // No prior version (shouldn't happen in practice — target_version + // already proved the document exists — but treat defensively). + None => (true, true), + }; + state .database - .insert_document_version(&vault_id, &new_version, transaction) + .insert_document_version( + &vault_id, + &new_version, + transaction, + InsertBroadcast { + content_changed, + path_changed, + }, + ) .await .map_err(server_error)?; diff --git a/sync-server/src/server/update_document.rs b/sync-server/src/server/update_document.rs index b6227de5..998c2dd4 100644 --- a/sync-server/src/server/update_document.rs +++ b/sync-server/src/server/update_document.rs @@ -17,7 +17,7 @@ use crate::{ app_state::{ AppState, database::{ - WriteTransaction, + InsertBroadcast, WriteTransaction, models::{DocumentId, StoredDocumentVersion, VaultId, VaultUpdateId}, }, }, @@ -292,6 +292,14 @@ pub async fn update_document( latest_version.relative_path.clone() }; + let content_changed = merged_content != latest_version.content; + // Stored path differs from either the prior stored path (peers need + // to learn about the rename) or from the path the origin sent + // (origin needs to learn if its rename was deduped or rejected by + // first-rename-wins). + let path_changed = new_relative_path != latest_version.relative_path + || new_relative_path != sanitized_relative_path; + let new_version = StoredDocumentVersion { document_id, vault_update_id: last_update_id + 1, @@ -306,7 +314,15 @@ pub async fn update_document( state .database - .insert_document_version(&vault_id, &new_version, transaction) + .insert_document_version( + &vault_id, + &new_version, + transaction, + InsertBroadcast { + content_changed, + path_changed, + }, + ) .await .map_err(server_error)?; diff --git a/sync-server/src/server/websocket.rs b/sync-server/src/server/websocket.rs index a0d15c10..ffac8d38 100644 --- a/sync-server/src/server/websocket.rs +++ b/sync-server/src/server/websocket.rs @@ -179,7 +179,8 @@ async fn websocket( .filter(|client| client.device_id != device_id) .collect(), }), - WebSocketServerMessage::VaultUpdate(_) => update.message, + WebSocketServerMessage::VaultUpdate(_) + | WebSocketServerMessage::PathChange(_) => update.message, }; send_update_over_websocket(&message, &mut sender).await?;