Add path change to server

This commit is contained in:
Andras Schmelczer 2026-04-21 20:09:36 +01:00
parent 9183f30b5d
commit dca59a18dc
9 changed files with 225 additions and 29 deletions

View file

@ -16,6 +16,24 @@ pub mod models;
#[error("Database is busy")]
pub struct WriteBusyError;
/// Tells [`Database::insert_document_version`] which WebSocket events the
/// just-committed version should produce. The caller is the only party
/// with enough context to decide this (the DB layer has no access to
/// "what the client sent" or "what the prior version looked like").
#[derive(Debug, Clone, Copy, Default)]
pub struct InsertBroadcast {
/// Emit a `VaultUpdate` (filtered from the origin device). Set when
/// the stored bytes differ from the prior version's bytes — i.e.
/// peers need to pull new content.
pub content_changed: bool,
/// Emit a `PathChange` (delivered to every client, origin included).
/// Set when the stored path differs from the prior stored path *or*
/// from the path the origin client sent — i.e. someone needs to
/// reconcile a dedupe, rename, or first-rename-wins outcome.
pub path_changed: bool,
}
use sqlx::{
Pool, Sqlite, pool::PoolConnection, sqlite::SqliteConnection, sqlite::SqlitePoolOptions,
};
@ -25,7 +43,10 @@ use uuid::fmt::Hyphenated;
use super::websocket::{
broadcasts::Broadcasts,
models::{WebSocketServerMessage, WebSocketServerMessageWithOrigin, WebSocketVaultUpdate},
models::{
WebSocketServerMessage, WebSocketServerMessageWithOrigin, WebSocketVaultPathChange,
WebSocketVaultUpdate,
},
};
use crate::config::database_config::DatabaseConfig;
use crate::consts::IDLE_POOL_TIMEOUT;
@ -669,6 +690,7 @@ impl Database {
vault_id: &VaultId,
version: &StoredDocumentVersion,
mut transaction: WriteTransaction,
broadcast: InsertBroadcast,
) -> Result<()> {
let document_id = version.document_id.as_hyphenated();
let query = sqlx::query!(
@ -712,18 +734,43 @@ impl Database {
.await
.context("Failed to commit transaction")?;
self.broadcasts
.send_document_update(
vault_id.clone(),
WebSocketServerMessageWithOrigin::with_origin(
version.device_id.clone(),
WebSocketServerMessage::VaultUpdate(WebSocketVaultUpdate {
documents: vec![version.clone().into()],
is_initial_sync: false,
}),
),
)
.await;
if broadcast.content_changed {
// Content events are filtered out for the origin device — the
// origin already has the content (or learns about the merge
// via the HTTP response).
self.broadcasts
.send_document_update(
vault_id.clone(),
WebSocketServerMessageWithOrigin::with_origin(
version.device_id.clone(),
WebSocketServerMessage::VaultUpdate(WebSocketVaultUpdate {
documents: vec![version.clone().into()],
is_initial_sync: false,
}),
),
)
.await;
}
if broadcast.path_changed {
// Path change events intentionally carry no origin so *every*
// connected client (including the one that made the write)
// receives them. The create/update HTTP response no longer
// carries `relative_path`, so the origin device relies on this
// event to learn the server-canonical path.
self.broadcasts
.send_document_update(
vault_id.clone(),
WebSocketServerMessageWithOrigin::new(WebSocketServerMessage::PathChange(
WebSocketVaultPathChange {
vault_update_id: version.vault_update_id,
document_id: version.document_id,
relative_path: version.relative_path.clone(),
},
)),
)
.await;
}
Ok(())
}

View file

@ -77,6 +77,72 @@ pub struct DocumentVersion {
pub device_id: DeviceId,
}
/// Like [`DocumentVersionWithoutContent`] but without the `relative_path`.
/// Used only in create/update responses where the client already tracks
/// the path locally (the server is the source of truth for the
/// document identity, not its path).
#[derive(TS, Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct DocumentUpdateMetadata {
#[ts(type = "number")]
pub vault_update_id: VaultUpdateId,
pub document_id: DocumentId,
pub updated_date: DateTime<Utc>,
pub is_deleted: bool,
pub user_id: UserId,
pub device_id: DeviceId,
#[ts(type = "number")]
pub content_size: u64,
}
impl From<StoredDocumentVersion> for DocumentUpdateMetadata {
fn from(value: StoredDocumentVersion) -> Self {
Self {
vault_update_id: value.vault_update_id,
document_id: value.document_id,
updated_date: value.updated_date,
is_deleted: value.is_deleted,
user_id: value.user_id,
device_id: value.device_id,
content_size: value.content.len() as u64,
}
}
}
/// Like [`DocumentVersion`] but without the `relative_path`.
/// Used only in create/update responses when the server had to merge the
/// client's content with a newer remote version and therefore must echo
/// the merged content back.
#[derive(TS, Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct DocumentUpdateMergedContent {
#[ts(type = "number")]
pub vault_update_id: VaultUpdateId,
pub document_id: DocumentId,
pub updated_date: DateTime<Utc>,
pub content_base64: String,
pub is_deleted: bool,
pub user_id: UserId,
pub device_id: DeviceId,
}
impl From<StoredDocumentVersion> for DocumentUpdateMergedContent {
fn from(value: StoredDocumentVersion) -> Self {
Self {
vault_update_id: value.vault_update_id,
document_id: value.document_id,
updated_date: value.updated_date,
content_base64: STANDARD.encode(&value.content),
is_deleted: value.is_deleted,
user_id: value.user_id,
device_id: value.device_id,
}
}
}
/// Row struct for vault history queries (used by `sqlx::query_as!`)
#[derive(Debug)]
pub struct VaultHistoryRow {

View file

@ -64,6 +64,15 @@ pub struct WebSocketVaultUpdate {
pub is_initial_sync: bool,
}
#[derive(TS, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct WebSocketVaultPathChange {
#[ts(type = "number")]
pub vault_update_id: VaultUpdateId,
pub document_id: DocumentId,
pub relative_path: String,
}
#[derive(TS, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase", tag = "type")]
#[ts(export)]
@ -77,6 +86,7 @@ pub enum WebSocketClientMessage {
#[ts(export)]
pub enum WebSocketServerMessage {
VaultUpdate(WebSocketVaultUpdate),
PathChange(WebSocketVaultPathChange),
CursorPositions(CursorPositionFromServer),
}

View file

@ -11,7 +11,10 @@ use super::{device_id_header::DeviceIdHeader, requests::CreateDocumentVersion};
use crate::{
app_state::{
AppState,
database::models::{StoredDocumentVersion, VaultId},
database::{
InsertBroadcast,
models::{StoredDocumentVersion, VaultId},
},
},
config::user_config::User,
errors::{SyncServerError, client_error, server_error, write_transaction_error},
@ -116,6 +119,8 @@ pub async fn create_document(
);
}
let path_changed = deduped_path != sanitized_relative_path;
let new_version = StoredDocumentVersion {
vault_update_id: last_update_id + 1,
document_id,
@ -130,7 +135,17 @@ pub async fn create_document(
state
.database
.insert_document_version(&vault_id, &new_version, transaction)
.insert_document_version(
&vault_id,
&new_version,
transaction,
InsertBroadcast {
// A brand-new document is always a content change for peers.
content_changed: true,
// Origin needs to know if the server deduped its requested path.
path_changed,
},
)
.await
.map_err(server_error)?;

View file

@ -11,8 +11,9 @@ use super::{device_id_header::DeviceIdHeader, requests::DeleteDocumentVersion};
use crate::{
app_state::{
AppState,
database::models::{
DocumentId, DocumentVersionWithoutContent, StoredDocumentVersion, VaultId,
database::{
InsertBroadcast,
models::{DocumentId, DocumentVersionWithoutContent, StoredDocumentVersion, VaultId},
},
},
config::user_config::User,
@ -91,7 +92,17 @@ pub async fn delete_document(
state
.database
.insert_document_version(&vault_id, &new_version, transaction)
.insert_document_version(
&vault_id,
&new_version,
transaction,
InsertBroadcast {
// Deletion is a content change peers must learn about.
content_changed: true,
// Delete never renames.
path_changed: false,
},
)
.await
.map_err(server_error)?;

View file

@ -3,7 +3,8 @@ use serde::{self, Serialize};
use ts_rs::TS;
use crate::app_state::database::models::{
DocumentVersion, DocumentVersionWithoutContent, VaultUpdateId,
DocumentUpdateMergedContent, DocumentUpdateMetadata, DocumentVersionWithoutContent,
VaultUpdateId,
};
/// Response to a ping request.
@ -66,7 +67,7 @@ pub struct ListVaultsResponse {
pub user_name: String,
}
/// Response to an update document request.
/// Response to a create/update document request.
#[derive(TS, Debug, Clone, Serialize)]
#[serde(tag = "type")]
#[ts(export)]
@ -74,9 +75,9 @@ pub enum DocumentUpdateResponse {
/// Returned when the created/updated document's content is the same as was
/// sent in the create/update request and thus the response doesn't contain
/// the content because the client must already have it.
FastForwardUpdate(DocumentVersionWithoutContent),
FastForwardUpdate(DocumentUpdateMetadata),
/// Returned when the created/updated document's content is different from
/// what was sent in the create/update request.
MergingUpdate(DocumentVersion),
MergingUpdate(DocumentUpdateMergedContent),
}

View file

@ -11,9 +11,12 @@ use super::device_id_header::DeviceIdHeader;
use crate::{
app_state::{
AppState,
database::models::{
DocumentId, DocumentVersionWithoutContent, StoredDocumentVersion, VaultId,
VaultUpdateId,
database::{
InsertBroadcast,
models::{
DocumentId, DocumentVersionWithoutContent, StoredDocumentVersion, VaultId,
VaultUpdateId,
},
},
},
config::user_config::User,
@ -120,6 +123,14 @@ pub async fn restore_document_version(
.await
.map_err(server_error)?;
// The current latest (pre-restore) is our baseline for deciding
// whether content and/or path actually change.
let current_latest = state
.database
.get_latest_document(&vault_id, &document_id, Some(&mut *transaction))
.await
.map_err(server_error)?;
let new_version = StoredDocumentVersion {
vault_update_id: last_update_id + 1,
document_id,
@ -132,9 +143,27 @@ pub async fn restore_document_version(
has_been_merged: false,
};
let (content_changed, path_changed) = match &current_latest {
Some(prev) => (
prev.content != new_version.content || prev.is_deleted,
prev.relative_path != new_version.relative_path,
),
// No prior version (shouldn't happen in practice — target_version
// already proved the document exists — but treat defensively).
None => (true, true),
};
state
.database
.insert_document_version(&vault_id, &new_version, transaction)
.insert_document_version(
&vault_id,
&new_version,
transaction,
InsertBroadcast {
content_changed,
path_changed,
},
)
.await
.map_err(server_error)?;

View file

@ -17,7 +17,7 @@ use crate::{
app_state::{
AppState,
database::{
WriteTransaction,
InsertBroadcast, WriteTransaction,
models::{DocumentId, StoredDocumentVersion, VaultId, VaultUpdateId},
},
},
@ -292,6 +292,14 @@ pub async fn update_document(
latest_version.relative_path.clone()
};
let content_changed = merged_content != latest_version.content;
// Stored path differs from either the prior stored path (peers need
// to learn about the rename) or from the path the origin sent
// (origin needs to learn if its rename was deduped or rejected by
// first-rename-wins).
let path_changed = new_relative_path != latest_version.relative_path
|| new_relative_path != sanitized_relative_path;
let new_version = StoredDocumentVersion {
document_id,
vault_update_id: last_update_id + 1,
@ -306,7 +314,15 @@ pub async fn update_document(
state
.database
.insert_document_version(&vault_id, &new_version, transaction)
.insert_document_version(
&vault_id,
&new_version,
transaction,
InsertBroadcast {
content_changed,
path_changed,
},
)
.await
.map_err(server_error)?;

View file

@ -179,7 +179,8 @@ async fn websocket(
.filter(|client| client.device_id != device_id)
.collect(),
}),
WebSocketServerMessage::VaultUpdate(_) => update.message,
WebSocketServerMessage::VaultUpdate(_)
| WebSocketServerMessage::PathChange(_) => update.message,
};
send_update_over_websocket(&message, &mut sender).await?;