diff --git a/backend/Dockerfile b/backend/Dockerfile deleted file mode 100644 index 4f3021e..0000000 --- a/backend/Dockerfile +++ /dev/null @@ -1,33 +0,0 @@ -FROM rust:1.87 AS builder - -WORKDIR /usr/src/backend - -RUN apt update && apt install -y musl-tools -RUN cargo install sqlx-cli - -COPY . . - -RUN sqlx database create --database-url sqlite://db.sqlite3 -RUN sqlx migrate run --source sync_server/src/app_state/database/migrations --database-url sqlite://db.sqlite3 - -RUN cargo build --package sync_server --release --target x86_64-unknown-linux-musl - -# Runtime image -FROM alpine:3.21.3 - -LABEL org.opencontainers.image.authors="andras@schmelczer.dev" - -RUN apk add --no-cache curl - -COPY --from=builder /usr/src/backend/target/x86_64-unknown-linux-musl/release/sync_server /app/sync_server - -VOLUME /data -EXPOSE 3000/tcp -WORKDIR /data - -HEALTHCHECK \ - --interval=30s \ - --timeout=5s \ - CMD curl -f http://localhost:3000/vaults/fake/ping || exit 1 - -ENTRYPOINT ["/app/sync_server"] diff --git a/backend/sync_server/Cargo.toml b/backend/sync_server/Cargo.toml deleted file mode 100644 index 3ca2c75..0000000 --- a/backend/sync_server/Cargo.toml +++ /dev/null @@ -1,40 +0,0 @@ -[package] -name = "sync_server" -version.workspace = true -edition.workspace = true -authors.workspace = true -license.workspace = true -repository.workspace = true - -[dependencies] -sync_lib = { path = "../sync_lib" } - -serde = { workspace = true } -thiserror = { workspace = true } - -tokio = { version = "1.44.2", features = ["full"]} -uuid = { version = "1.16.0", features = ["v4", "serde"] } -log = { version = "0.4.27" } -anyhow = { version = "1.0.98", features = ["backtrace"] } -axum = { version = "0.7.4", features = ["ws", "macros", "tracing", "multipart"]} -axum-extra = { version = "0.9.6", features = ["typed-header"] } -axum_typed_multipart = "0.11.0" -tower-http = { version = "0.6.1", features = ["cors", "trace", "limit", "timeout"] } -tracing = "0.1.41" -tracing-subscriber = { version = "0.3.19", features = ["fmt", "env-filter"]} -sqlx = { version = "0.8.6", features = ["sqlite", "runtime-tokio", "uuid", "chrono"] } -chrono = { version = "0.4.41", features = ["serde"] } -rand = "0.9.0" -sanitize-filename = "0.6.0" -regex = "1.11.1" -clap = { version = "4.5.38", features = ["derive"] } -futures = "0.3.31" -serde_yaml = "0.9.34" -serde_json = "1.0.140" -clap-verbosity-flag = "3.0.3" -bimap = "0.6.3" -ts-rs = { version = "10.1", features = ["uuid-impl", "chrono-impl"] } -serde_with = "3.12.0" - -[lints] -workspace = true diff --git a/backend/sync_server/README.md b/backend/sync_server/README.md deleted file mode 100644 index 7d61209..0000000 --- a/backend/sync_server/README.md +++ /dev/null @@ -1,9 +0,0 @@ -# Sync server - -## Creating/resetting the Database for development - -```sh -sqlx database create --database-url sqlite://db.sqlite3 -sqlx migrate run --source sync_server/src/app_state/database/migrations --database-url sqlite://db.sqlite3 -cargo sqlx prepare --workspace -``` diff --git a/backend/sync_server/build.rs b/backend/sync_server/build.rs deleted file mode 100644 index d506869..0000000 --- a/backend/sync_server/build.rs +++ /dev/null @@ -1,5 +0,0 @@ -// generated by `sqlx migrate build-script` -fn main() { - // trigger recompilation when a new migration is added - println!("cargo:rerun-if-changed=migrations"); -} diff --git a/backend/sync_server/src/app_state.rs b/backend/sync_server/src/app_state.rs deleted file mode 100644 index a61467d..0000000 --- a/backend/sync_server/src/app_state.rs +++ /dev/null @@ -1,41 +0,0 @@ -pub mod cursors; -pub mod database; -pub mod websocket; - -use std::ffi::OsString; - -use anyhow::Result; -use cursors::Cursors; -use database::Database; -use websocket::broadcasts::Broadcasts; - -use crate::{config::Config, consts::DEFAULT_CONFIG_PATH}; - -#[derive(Clone, Debug)] -pub struct AppState { - pub config: Config, - pub database: Database, - pub cursors: Cursors, - pub broadcasts: Broadcasts, -} - -impl AppState { - pub async fn try_new(config_path: Option) -> Result { - let config_path = config_path.unwrap_or_else(|| OsString::from(DEFAULT_CONFIG_PATH)); - let path = std::path::PathBuf::from(config_path); - - let config = Config::read_or_create(&path).await?; - let broadcasts = Broadcasts::new(&config.server); - let database = Database::try_new(&config.database, &broadcasts).await?; - let cursors: Cursors = Cursors::new(&config.database, &broadcasts); - - Cursors::start_background_task(cursors.clone()); - - Ok(Self { - config, - database, - cursors, - broadcasts, - }) - } -} diff --git a/backend/sync_server/src/app_state/cursors.rs b/backend/sync_server/src/app_state/cursors.rs deleted file mode 100644 index 245109c..0000000 --- a/backend/sync_server/src/app_state/cursors.rs +++ /dev/null @@ -1,128 +0,0 @@ -use core::time::Duration; -use std::{collections::HashMap, sync::Arc}; - -use tokio::sync::Mutex; - -use super::{ - database::models::{DeviceId, VaultId}, - websocket::{ - broadcasts::Broadcasts, - models::{ - ClientCursors, CursorPositionFromServer, CursorSpan, WebSocketServerMessage, - WebSocketServerMessageWithOrigin, - }, - }, -}; -use crate::config::database_config::DatabaseConfig; - -#[derive(Clone, Debug)] -pub struct Cursors { - config: DatabaseConfig, - broadcasts: Broadcasts, - vault_to_cursors: Arc>>>, -} - -impl Cursors { - pub fn new(config: &DatabaseConfig, broadcasts: &Broadcasts) -> Self { - Self { - config: config.clone(), - broadcasts: broadcasts.clone(), - vault_to_cursors: Arc::new(Mutex::new(HashMap::new())), - } - } - - pub async fn update_cursors( - &self, - vault_id: VaultId, - user_name: String, - device_id: &DeviceId, - document_to_cursors: HashMap>, - ) { - let mut vault_to_cursors = self.vault_to_cursors.lock().await; - - let all_device_cursors = vault_to_cursors.entry(vault_id).or_insert_with(Vec::new); - - all_device_cursors.retain(|c| &c.client_cursors.device_id != device_id); - all_device_cursors.push(ClientCursorsWithTimeToLive::new(ClientCursors { - user_name, - device_id: device_id.to_string(), - cursors: document_to_cursors, - })); - - drop(vault_to_cursors); // Explicitly drop the lock before broadcasting to avoid deadlock - self.broadcast_cursors().await; - } - - pub async fn get_cursors(&self, vault_id: &VaultId) -> Vec { - let vault_to_cursors = self.vault_to_cursors.lock().await; - vault_to_cursors - .get(vault_id) - .map(|cursors| { - cursors - .iter() - .cloned() - .map(|with_ttl| with_ttl.client_cursors) - .collect::>() - }) - .unwrap_or_default() - } - - pub fn start_background_task(self) { - tokio::spawn(async move { - loop { - self.remove_expired_cursors().await; - tokio::time::sleep(Duration::from_secs(1)).await; - } - }); - } - - async fn remove_expired_cursors(&self) { - let mut vault_to_cursors = self.vault_to_cursors.lock().await; - - for (_vault_id, cursors) in vault_to_cursors.iter_mut() { - cursors.retain(|cursor| !cursor.is_expired(self.config.cursor_timeout)); - } - } - - async fn broadcast_cursors(&self) { - let vault_to_cursors = self.vault_to_cursors.lock().await; - - for (vault_id, cursors) in vault_to_cursors.iter() { - self.broadcasts - .send_document_update( - vault_id.clone(), - WebSocketServerMessageWithOrigin::new(WebSocketServerMessage::CursorPositions( - CursorPositionFromServer { - clients: cursors.iter().map(|c| c.client_cursors.clone()).collect(), - }, - )), - ) - .await; - } - } - - pub async fn remove_cursors_of_device(&self, vault_id: &str, device_id: &str) { - let mut vault_to_cursors = self.vault_to_cursors.lock().await; - - if let Some(cursors) = vault_to_cursors.get_mut(vault_id) { - cursors.retain(|c| c.client_cursors.device_id != device_id); - } - } -} - -#[derive(Clone, Debug)] -struct ClientCursorsWithTimeToLive { - client_cursors: ClientCursors, - last_updated: std::time::Instant, -} - -impl ClientCursorsWithTimeToLive { - fn new(client_cursors: ClientCursors) -> Self { - Self { - client_cursors, - last_updated: std::time::Instant::now(), - } - } - - pub fn is_expired(&self, ttl: Duration) -> bool { self.last_updated.elapsed() > ttl } -} diff --git a/backend/sync_server/src/app_state/database.rs b/backend/sync_server/src/app_state/database.rs deleted file mode 100644 index f894014..0000000 --- a/backend/sync_server/src/app_state/database.rs +++ /dev/null @@ -1,425 +0,0 @@ -use core::time::Duration; -use std::{collections::HashMap, sync::Arc}; - -use anyhow::{Context as _, Result}; -use models::{ - DocumentId, DocumentVersionWithoutContent, StoredDocumentVersion, VaultId, VaultUpdateId, -}; -use sqlx::{sqlite::SqliteConnectOptions, types::chrono::Utc}; - -pub mod models; -use sqlx::{Pool, Sqlite, sqlite::SqlitePoolOptions}; -use tokio::sync::Mutex; -use uuid::fmt::Hyphenated; - -use super::websocket::{ - broadcasts::Broadcasts, - models::{WebSocketServerMessage, WebSocketServerMessageWithOrigin, WebSocketVaultUpdate}, -}; -use crate::config::database_config::DatabaseConfig; - -#[derive(Clone, Debug)] -pub struct Database { - config: DatabaseConfig, - broadcasts: Broadcasts, - connection_pools: Arc>>>, -} - -pub type Transaction<'a> = sqlx::Transaction<'a, Sqlite>; - -impl Database { - pub async fn try_new(config: &DatabaseConfig, broadcasts: &Broadcasts) -> Result { - tokio::fs::create_dir_all(&config.databases_directory_path) - .await - .with_context(|| { - format!( - "Failed to create databases directory: {}", - config.databases_directory_path.to_string_lossy() - ) - })?; - - let mut connection_pools = std::collections::HashMap::new(); - - let mut entries = tokio::fs::read_dir(&config.databases_directory_path).await?; - while let Some(entry) = entries.next_entry().await? { - if !entry.file_name().to_string_lossy().ends_with(".sqlite") { - continue; - } - - let vault: VaultId = entry - .file_name() - .to_string_lossy() - .trim_end_matches(".sqlite") - .to_owned(); - - connection_pools.insert( - vault.clone(), - Self::create_vault_database(config, &vault).await?, - ); - } - - Ok(Self { - config: config.clone(), - connection_pools: Arc::new(Mutex::new(connection_pools)), - broadcasts: broadcasts.clone(), - }) - } - - async fn create_vault_database( - config: &DatabaseConfig, - vault: &VaultId, - ) -> Result> { - let file_name = config - .databases_directory_path - .join(format!("{vault}.sqlite")); - - let connection_options = SqliteConnectOptions::new() - .filename(file_name.clone()) - .create_if_missing(true) - .busy_timeout(Duration::from_secs(3600)) - .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal); - - let pool = SqlitePoolOptions::new() - .max_connections(config.max_connections_per_vault) - .test_before_acquire(true) - .connect_with(connection_options) - .await - .with_context(|| format!("Cannot open database at {}", file_name.display()))?; - - Self::run_migrations(&pool).await?; - - Ok(pool) - } - - async fn run_migrations(pool: &Pool) -> Result<()> { - sqlx::migrate!("src/app_state/database/migrations") - .run(pool) - .await - .context("Cannot check for pending migrations") - } - - async fn get_connection_pool(&self, vault: &VaultId) -> Result> { - let mut pools = self.connection_pools.lock().await; - if !pools.contains_key(vault) { - let pool = Self::create_vault_database(&self.config, vault).await?; - pools.insert(vault.clone(), pool); - } - - let pool = pools - .get(vault) - .expect("Pool was just inserted or already exists"); - - Ok(pool.clone()) - } - - /// Attempting to write from this transaction might result in a - /// database locked error. Use this transaction for read-only operations. - pub async fn create_readonly_transaction( - &self, - vault: &VaultId, - ) -> Result> { - self.get_connection_pool(vault) - .await? - .begin() - .await - .context("Cannot create transaction") - } - - pub async fn create_write_transaction(&self, vault: &VaultId) -> Result> { - let mut transaction = self.create_readonly_transaction(vault).await?; - - // sqlx doesn't support immediate transactions for sqlite: https://github.com/launchbadge/sqlx/issues/481 - sqlx::query!("END; BEGIN IMMEDIATE;") - .execute(&mut *transaction) - .await?; - - Ok(transaction) - } - - /// Return the latest state of all documents in the vault - pub async fn get_latest_documents( - &self, - vault: &VaultId, - transaction: Option<&mut Transaction<'_>>, - ) -> Result> { - let query = sqlx::query!( - r#" - select - vault_update_id, - document_id as "document_id: Hyphenated", - relative_path, - updated_date as "updated_date: chrono::DateTime", - is_deleted, - user_id, - device_id, - length(content) as "content_size: u64" - from latest_document_versions - order by vault_update_id - "#, - ); - - if let Some(transaction) = transaction { - query.fetch_all(&mut **transaction).await - } else { - query - .fetch_all(&self.get_connection_pool(vault).await?) - .await - } - .context("Cannot fetch latest documents") - .map(|rows| { - rows.into_iter() - .map(|row| DocumentVersionWithoutContent { - vault_update_id: row.vault_update_id, - document_id: row.document_id.into(), - relative_path: row.relative_path, - updated_date: row.updated_date, - is_deleted: row.is_deleted, - user_id: row.user_id, - device_id: row.device_id, - content_size: row - .content_size - .expect("Content size can't be null but sqlx can't infer it"), - }) - .collect() - }) - } - - /// Return the latest state of all documents (including deleted) in the - /// vault which have changed since the given update id - pub async fn get_latest_documents_since( - &self, - vault: &VaultId, - vault_update_id: VaultUpdateId, - transaction: Option<&mut Transaction<'_>>, - ) -> Result> { - let query = sqlx::query!( - r#" - select - vault_update_id, - document_id as "document_id: Hyphenated", - relative_path, - updated_date as "updated_date: chrono::DateTime", - is_deleted, - user_id, - device_id, - length(content) as "content_size: u64" - from latest_document_versions - where vault_update_id > ? - order by vault_update_id - "#, - vault_update_id - ); - - if let Some(transaction) = transaction { - query.fetch_all(&mut **transaction).await - } else { - query - .fetch_all(&self.get_connection_pool(vault).await?) - .await - } - .with_context(|| { - format!("Cannot fetch latest documents since vault_update_id {vault_update_id}") - }) - .map(|rows| { - rows.into_iter() - .map(|row| DocumentVersionWithoutContent { - vault_update_id: row.vault_update_id, - document_id: row.document_id.into(), - relative_path: row.relative_path, - updated_date: row.updated_date, - is_deleted: row.is_deleted, - user_id: row.user_id, - device_id: row.device_id, - content_size: row - .content_size - .expect("Content size can't be null but sqlx can't infer it"), - }) - .collect() - }) - } - - pub async fn get_max_update_id_in_vault( - &self, - vault: &VaultId, - transaction: Option<&mut Transaction<'_>>, - ) -> Result { - let query = sqlx::query!( - r#" - select coalesce(max(vault_update_id), 0) as max_vault_update_id - from documents - "#, - ); - - if let Some(transaction) = transaction { - query.fetch_one(&mut **transaction).await - } else { - query - .fetch_one(&self.get_connection_pool(vault).await?) - .await - } - .map(|row| row.max_vault_update_id) - .context("Cannot fetch max update id in vault") - } - - pub async fn get_latest_document_by_path( - &self, - vault: &VaultId, - relative_path: &str, - transaction: Option<&mut Transaction<'_>>, - ) -> Result> { - let query = sqlx::query_as!( - StoredDocumentVersion, - r#" - select - vault_update_id, - document_id as "document_id: Hyphenated", - relative_path, - updated_date as "updated_date: chrono::DateTime", - content, - is_deleted, - user_id, - device_id - from latest_document_versions - where relative_path = ? - order by vault_update_id desc -- `latest_document_versions` only contains a single latest version of each document, however, - -- multiple documents can have the same `relative_path`, if they have been deleted. That's - -- why we only care about the latest version of the document with the given relative path. - limit 1 - "#, - relative_path - ); - - if let Some(transaction) = transaction { - query.fetch_optional(&mut **transaction).await - } else { - query - .fetch_optional(&self.get_connection_pool(vault).await?) - .await - } - .context("Cannot fetch latest document version") - } - - pub async fn get_latest_document( - &self, - vault: &VaultId, - document_id: &DocumentId, - transaction: Option<&mut Transaction<'_>>, - ) -> Result> { - let document_id = document_id.as_hyphenated(); - let query = sqlx::query_as!( - StoredDocumentVersion, - r#" - select - vault_update_id, - document_id as "document_id: Hyphenated", - relative_path, - updated_date as "updated_date: chrono::DateTime", - content, - is_deleted, - user_id, - device_id - from latest_document_versions - where document_id = ? - "#, - document_id - ); - - if let Some(transaction) = transaction { - query.fetch_optional(&mut **transaction).await - } else { - query - .fetch_optional(&self.get_connection_pool(vault).await?) - .await - } - .context("Cannot fetch latest document version") - } - - pub async fn get_document_version( - &self, - vault: &VaultId, - vault_update_id: VaultUpdateId, - transaction: Option<&mut Transaction<'_>>, - ) -> Result> { - let query = sqlx::query_as!( - StoredDocumentVersion, - r#" - select - vault_update_id, - document_id as "document_id: Hyphenated", - relative_path, - updated_date as "updated_date: chrono::DateTime", - content, - is_deleted, - user_id, - device_id - from documents - where vault_update_id = ?"#, - vault_update_id - ); - - if let Some(transaction) = transaction { - query.fetch_optional(&mut **transaction).await - } else { - query - .fetch_optional(&self.get_connection_pool(vault).await?) - .await - } - .context("Cannot fetch document version") - } - - pub async fn insert_document_version( - &self, - vault_id: &VaultId, - version: &StoredDocumentVersion, - transaction: Option<&mut Transaction<'_>>, - ) -> Result<()> { - let document_id = version.document_id.as_hyphenated(); - let query = sqlx::query!( - r#" - insert into documents ( - vault_update_id, - document_id, - relative_path, - updated_date, - content, - is_deleted, - user_id, - device_id - ) - values (?, ?, ?, ?, ?, ?, ?, ?) - "#, - version.vault_update_id, - document_id, - version.relative_path, - version.updated_date, - version.content, - version.is_deleted, - version.user_id, - version.device_id - ); - - if let Some(transaction) = transaction { - query.execute(&mut **transaction).await - } else { - query - .execute(&self.get_connection_pool(vault_id).await?) - .await - } - .context("Cannot insert document version")?; - - self.broadcasts - .send_document_update( - vault_id.clone(), - WebSocketServerMessageWithOrigin::with_origin( - version.device_id.clone(), - WebSocketServerMessage::VaultUpdate(WebSocketVaultUpdate { - documents: vec![version.clone().into()], - is_initial_sync: false, - }), - ), - ) - .await; - - Ok(()) - } -} diff --git a/backend/sync_server/src/app_state/database/migrations/20241207143519_bootstrap.sql b/backend/sync_server/src/app_state/database/migrations/20241207143519_bootstrap.sql deleted file mode 100644 index 4a9f31b..0000000 --- a/backend/sync_server/src/app_state/database/migrations/20241207143519_bootstrap.sql +++ /dev/null @@ -1,21 +0,0 @@ -CREATE TABLE IF NOT EXISTS documents ( - vault_update_id INTEGER NOT NULL PRIMARY KEY, - document_id TEXT NOT NULL, - relative_path TEXT NOT NULL, - updated_date TIMESTAMP NOT NULL, - content BLOB NOT NULL, - is_deleted BOOLEAN NOT NULL -); - -CREATE VIEW IF NOT EXISTS latest_document_versions AS -SELECT d.* -FROM documents d -INNER JOIN ( - SELECT MAX(vault_update_id) AS max_version_id - FROM documents - GROUP BY document_id -) max_versions -ON d.vault_update_id = max_versions.max_version_id; - -CREATE INDEX IF NOT EXISTS idx_documents_vault_id_relative_path -ON documents (relative_path); diff --git a/backend/sync_server/src/app_state/database/migrations/20250522192949_add_provenance_columns.sql b/backend/sync_server/src/app_state/database/migrations/20250522192949_add_provenance_columns.sql deleted file mode 100644 index 0686017..0000000 --- a/backend/sync_server/src/app_state/database/migrations/20250522192949_add_provenance_columns.sql +++ /dev/null @@ -1,2 +0,0 @@ -ALTER TABLE documents ADD COLUMN user_id TEXT NOT NULL DEFAULT ""; -ALTER TABLE documents ADD COLUMN device_id TEXT NOT NULL DEFAULT ""; diff --git a/backend/sync_server/src/app_state/database/models.rs b/backend/sync_server/src/app_state/database/models.rs deleted file mode 100644 index e995611..0000000 --- a/backend/sync_server/src/app_state/database/models.rs +++ /dev/null @@ -1,89 +0,0 @@ -use chrono::{DateTime, Utc}; -use serde::Serialize; -use sync_lib::bytes_to_base64; -use ts_rs::TS; - -pub type VaultId = String; -pub type VaultUpdateId = i64; - -pub type DocumentId = uuid::Uuid; -pub type UserId = String; -pub type DeviceId = String; - -#[derive(Debug, Clone)] -pub struct StoredDocumentVersion { - pub vault_update_id: VaultUpdateId, - pub document_id: DocumentId, - pub relative_path: String, - pub updated_date: DateTime, - pub content: Vec, - pub is_deleted: bool, - pub user_id: UserId, - pub device_id: DeviceId, -} - -impl PartialEq for StoredDocumentVersion { - fn eq(&self, other: &Self) -> bool { self.vault_update_id == other.vault_update_id } -} - -#[derive(TS, Debug, Clone, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct DocumentVersionWithoutContent { - #[ts(as = "i32")] - pub vault_update_id: VaultUpdateId, - - pub document_id: DocumentId, - pub relative_path: String, - pub updated_date: DateTime, - pub is_deleted: bool, - pub user_id: UserId, - pub device_id: DeviceId, - - #[ts(as = "i32")] - pub content_size: u64, -} - -impl From for DocumentVersionWithoutContent { - fn from(value: StoredDocumentVersion) -> Self { - Self { - vault_update_id: value.vault_update_id, - document_id: value.document_id, - relative_path: value.relative_path, - updated_date: value.updated_date, - is_deleted: value.is_deleted, - user_id: value.user_id, - device_id: value.device_id, - content_size: value.content.len() as u64, - } - } -} - -#[derive(TS, Debug, Clone, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct DocumentVersion { - #[ts(as = "i32")] - pub vault_update_id: VaultUpdateId, - - pub document_id: DocumentId, - pub relative_path: String, - pub updated_date: DateTime, - pub content_base64: String, - pub is_deleted: bool, - pub user_id: UserId, - pub device_id: DeviceId, -} - -impl From for DocumentVersion { - fn from(value: StoredDocumentVersion) -> Self { - Self { - vault_update_id: value.vault_update_id, - document_id: value.document_id, - relative_path: value.relative_path, - updated_date: value.updated_date, - content_base64: bytes_to_base64(&value.content), - is_deleted: value.is_deleted, - user_id: value.user_id, - device_id: value.device_id, - } - } -} diff --git a/backend/sync_server/src/app_state/websocket.rs b/backend/sync_server/src/app_state/websocket.rs deleted file mode 100644 index b945606..0000000 --- a/backend/sync_server/src/app_state/websocket.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod broadcasts; -pub mod models; -pub mod utils; diff --git a/backend/sync_server/src/app_state/websocket/broadcasts.rs b/backend/sync_server/src/app_state/websocket/broadcasts.rs deleted file mode 100644 index cef6ee6..0000000 --- a/backend/sync_server/src/app_state/websocket/broadcasts.rs +++ /dev/null @@ -1,63 +0,0 @@ -use std::{collections::HashMap, sync::Arc}; - -use anyhow::Context; -use tokio::sync::{Mutex, broadcast}; - -use super::models::WebSocketServerMessageWithOrigin; -use crate::{ - app_state::database::models::VaultId, config::server_config::ServerConfig, errors::server_error, -}; - -#[derive(Debug, Clone)] -pub struct Broadcasts { - max_clients_per_vault: usize, - tx: Arc>>>, -} - -impl Broadcasts { - pub fn new(server_config: &ServerConfig) -> Self { - Self { - max_clients_per_vault: server_config.max_clients_per_vault, - tx: Arc::new(Mutex::new(HashMap::new())), - } - } - - pub async fn get_receiver( - &self, - vault: VaultId, - ) -> broadcast::Receiver { - let tx = self.get_or_create(vault).await; - - tx.subscribe() - } - - /// Notify all clients (who are subscribed to the vault) about an update. - /// We only log failures. - pub async fn send_document_update( - &self, - vault: VaultId, - document: WebSocketServerMessageWithOrigin, - ) { - let tx = self.get_or_create(vault).await; - - let result = tx - .send(document) - .context("Cannot broadcast server message to websocket listeners") - .map_err(server_error); - - if result.is_err() { - log::debug!("Failed to send message: {result:?}"); - } - } - - async fn get_or_create( - &self, - vault: VaultId, - ) -> broadcast::Sender { - let mut tx = self.tx.lock().await; - - tx.entry(vault) - .or_insert_with(|| broadcast::channel(self.max_clients_per_vault).0.clone()) - .clone() - } -} diff --git a/backend/sync_server/src/app_state/websocket/models.rs b/backend/sync_server/src/app_state/websocket/models.rs deleted file mode 100644 index 6bb4f4e..0000000 --- a/backend/sync_server/src/app_state/websocket/models.rs +++ /dev/null @@ -1,88 +0,0 @@ -use std::collections::HashMap; - -use serde::{Deserialize, Serialize}; -use ts_rs::TS; - -use crate::app_state::database::models::{DeviceId, DocumentVersionWithoutContent, VaultUpdateId}; - -#[derive(TS, Deserialize, Clone, Debug)] -#[serde(rename_all = "camelCase")] -pub struct WebSocketHandshake { - pub token: String, - pub device_id: DeviceId, - - #[ts(as = "Option")] - pub last_seen_vault_update_id: Option, -} - -#[derive(TS, Serialize, Deserialize, Clone, Debug)] -#[serde(rename_all = "camelCase")] -pub struct CursorSpan { - pub start: usize, - pub end: usize, -} - -#[derive(TS, Deserialize, Clone, Debug)] -#[serde(rename_all = "camelCase")] -pub struct CursorPositionFromClient { - pub document_to_cursors: HashMap>, -} - -#[derive(TS, Serialize, Clone, Debug)] -#[serde(rename_all = "camelCase")] -pub struct ClientCursors { - pub user_name: String, - pub device_id: DeviceId, - pub cursors: HashMap>, -} - -#[derive(TS, Serialize, Clone, Debug)] -#[serde(rename_all = "camelCase")] -pub struct CursorPositionFromServer { - pub clients: Vec, -} - -#[derive(TS, Serialize, Clone, Debug)] -#[serde(rename_all = "camelCase")] -pub struct WebSocketVaultUpdate { - pub documents: Vec, - pub is_initial_sync: bool, -} - -#[derive(TS, Deserialize, Clone, Debug)] -#[serde(rename_all = "camelCase", tag = "type")] -#[ts(export)] -pub enum WebSocketClientMessage { - Handshake(WebSocketHandshake), - CursorPositions(CursorPositionFromClient), -} - -#[derive(TS, Serialize, Clone, Debug)] -#[serde(rename_all = "camelCase", tag = "type")] -#[ts(export)] -pub enum WebSocketServerMessage { - VaultUpdate(WebSocketVaultUpdate), - CursorPositions(CursorPositionFromServer), -} - -#[derive(Clone, Debug)] -pub struct WebSocketServerMessageWithOrigin { - pub origin_device_id: Option, - pub message: WebSocketServerMessage, -} - -impl WebSocketServerMessageWithOrigin { - pub fn new(message: WebSocketServerMessage) -> Self { - Self { - origin_device_id: None, - message, - } - } - - pub fn with_origin(origin_device_id: DeviceId, message: WebSocketServerMessage) -> Self { - Self { - origin_device_id: Some(origin_device_id), - message, - } - } -} diff --git a/backend/sync_server/src/app_state/websocket/utils.rs b/backend/sync_server/src/app_state/websocket/utils.rs deleted file mode 100644 index 1e0dd24..0000000 --- a/backend/sync_server/src/app_state/websocket/utils.rs +++ /dev/null @@ -1,80 +0,0 @@ -use anyhow::Context; -use axum::extract::ws::{Message, WebSocket}; -use futures::{sink::SinkExt, stream::SplitSink}; - -use super::models::{WebSocketClientMessage, WebSocketHandshake, WebSocketServerMessage}; -use crate::{ - app_state::{ - AppState, - database::models::{DocumentVersionWithoutContent, VaultId, VaultUpdateId}, - }, - config::user_config::User, - errors::{SyncServerError, server_error, unauthenticated_error}, - server::auth::auth, -}; - -pub struct AuthenticatedWebSocketHandshake { - pub handshake: WebSocketHandshake, - pub user: User, -} - -pub fn get_authenticated_handshake( - state: &AppState, - vault_id: &VaultId, - message: Option, -) -> Result { - if let Some(Message::Text(message)) = message { - let message: WebSocketClientMessage = serde_json::from_str(&message) - .context("Failed to parse message") - .map_err(server_error)?; - - match message { - WebSocketClientMessage::Handshake(handshake) => { - let user = auth(state, handshake.token.trim(), vault_id)?; - Ok(AuthenticatedWebSocketHandshake { handshake, user }) - } - WebSocketClientMessage::CursorPositions(_) => Err(unauthenticated_error( - anyhow::anyhow!("Expected a handshake message"), - )), - } - } else { - Err(unauthenticated_error(anyhow::anyhow!( - "Failed to authenticate due to invalid message" - ))) - } -} - -pub async fn get_unseen_documents( - state: &AppState, - vault_id: &VaultId, - last_seen_vault_update_id: Option, -) -> Result, SyncServerError> { - if let Some(update_id) = last_seen_vault_update_id { - state - .database - .get_latest_documents_since(vault_id, update_id, None) - .await - .map_err(server_error) - } else { - state - .database - .get_latest_documents(vault_id, None) - .await - .map_err(server_error) - } -} - -pub async fn send_update_over_websocket( - update: &WebSocketServerMessage, - sender: &mut SplitSink, -) -> Result<(), SyncServerError> { - let serialized_update = serde_json::to_string(update) - .context("Failed to serialize update") - .map_err(server_error)?; - - sender - .send(Message::Text(serialized_update)) - .await - .context("Failed to send message over websocket") - .map_err(server_error) -} diff --git a/backend/sync_server/src/cli.rs b/backend/sync_server/src/cli.rs deleted file mode 100644 index d5c0852..0000000 --- a/backend/sync_server/src/cli.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod args; -pub mod color_when; diff --git a/backend/sync_server/src/cli/args.rs b/backend/sync_server/src/cli/args.rs deleted file mode 100644 index 603d8d1..0000000 --- a/backend/sync_server/src/cli/args.rs +++ /dev/null @@ -1,26 +0,0 @@ -use std::ffi::OsString; - -use clap::Parser; -use clap_verbosity_flag::{InfoLevel, Verbosity}; - -use crate::cli::color_when::ColorWhen; - -/// Server for backing the `VaultLink` plugin -#[derive(Parser, Debug)] -#[command(version, about, long_about = None)] -pub struct Args { - #[arg(index = 1)] - pub config_path: Option, - - #[command(flatten)] - pub verbose: Verbosity, - - #[arg( - long, - value_name = "WHEN", - default_value_t = ColorWhen::Auto, - default_missing_value = "always", - value_enum - )] - pub color: ColorWhen, -} diff --git a/backend/sync_server/src/cli/color_when.rs b/backend/sync_server/src/cli/color_when.rs deleted file mode 100644 index a3709b9..0000000 --- a/backend/sync_server/src/cli/color_when.rs +++ /dev/null @@ -1,31 +0,0 @@ -use std::io::IsTerminal; - -use clap::ValueEnum; - -#[derive(ValueEnum, Copy, Clone, Debug, PartialEq, Eq)] -pub enum ColorWhen { - Always, - Auto, - Never, -} - -impl ColorWhen { - pub fn use_colors(self) -> bool { - match self { - ColorWhen::Always => true, - ColorWhen::Auto => { - std::env::var_os("NO_COLOR").is_none() && std::io::stderr().is_terminal() - } - ColorWhen::Never => false, - } - } -} - -impl std::fmt::Display for ColorWhen { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.to_possible_value() - .expect("no values are skipped") - .get_name() - .fmt(f) - } -} diff --git a/backend/sync_server/src/config.rs b/backend/sync_server/src/config.rs deleted file mode 100644 index 700b1ea..0000000 --- a/backend/sync_server/src/config.rs +++ /dev/null @@ -1,66 +0,0 @@ -use std::path::Path; - -use anyhow::{Context as _, Result}; -use database_config::DatabaseConfig; -use log::info; -use serde::{Deserialize, Serialize}; -use server_config::ServerConfig; -use tokio::fs; -use user_config::UserConfig; - -pub mod database_config; -pub mod server_config; -pub mod user_config; - -#[derive(Debug, Deserialize, Serialize, Clone, Default)] -pub struct Config { - #[serde(default)] - pub database: DatabaseConfig, - #[serde(default)] - pub server: ServerConfig, - #[serde(default)] - pub users: UserConfig, -} - -impl Config { - pub async fn read_or_create(path: &Path) -> Result { - let config = if path.exists() { - info!( - "Loading configuration from '{}'", - path.canonicalize().unwrap().display() - ); - Self::load_from_file(path).await? - } else { - Self::default() - }; - - config.write(path).await?; - info!( - "Updated configuration at '{}'", - path.canonicalize().unwrap().display() - ); - - Ok(config) - } - - pub async fn load_from_file(path: &Path) -> Result { - let contents = fs::read_to_string(path).await.with_context(|| { - format!( - "Cannot load configuration from disk from {}", - path.display() - ) - })?; - - let config = serde_yaml::from_str(&contents).context("Failed to parse configuration")?; - - Ok(config) - } - - pub async fn write(&self, path: &Path) -> Result<()> { - let contents = serde_yaml::to_string(&self).context("Failed to serialize configuration")?; - - fs::write(path, contents) - .await - .context("Failed to write configuration to disk") - } -} diff --git a/backend/sync_server/src/config/database_config.rs b/backend/sync_server/src/config/database_config.rs deleted file mode 100644 index f1c92d9..0000000 --- a/backend/sync_server/src/config/database_config.rs +++ /dev/null @@ -1,48 +0,0 @@ -use std::{path::PathBuf, time::Duration}; - -use log::debug; -use serde::{Deserialize, Serialize}; -use serde_with::serde_as; - -use crate::consts::{ - DEFAULT_CURSOR_TIMEOUT, DEFAULT_DATABASES_DIRECTORY_PATH, DEFAULT_MAX_CONNECTIONS_PER_VAULT, -}; - -#[serde_with::serde_as] -#[derive(Debug, Deserialize, Serialize, Clone)] -pub struct DatabaseConfig { - #[serde(default = "default_databases_directory_path")] - pub databases_directory_path: PathBuf, - - #[serde(default = "default_max_connections_per_vault")] - pub max_connections_per_vault: u32, - - #[serde(default = "default_cursor_timeout", rename = "cursor_timeout_seconds")] - #[serde_as(as = "serde_with::DurationSeconds")] - pub cursor_timeout: Duration, -} - -fn default_databases_directory_path() -> PathBuf { - debug!("Using default databases directory path: {DEFAULT_DATABASES_DIRECTORY_PATH:?}"); - PathBuf::from(DEFAULT_DATABASES_DIRECTORY_PATH) -} - -fn default_max_connections_per_vault() -> u32 { - debug!("Using default max connections: {DEFAULT_MAX_CONNECTIONS_PER_VAULT}"); - DEFAULT_MAX_CONNECTIONS_PER_VAULT -} - -fn default_cursor_timeout() -> Duration { - debug!("Using default cursor timeout: {DEFAULT_CURSOR_TIMEOUT:?}"); - DEFAULT_CURSOR_TIMEOUT -} - -impl Default for DatabaseConfig { - fn default() -> Self { - Self { - databases_directory_path: default_databases_directory_path(), - max_connections_per_vault: default_max_connections_per_vault(), - cursor_timeout: default_cursor_timeout(), - } - } -} diff --git a/backend/sync_server/src/config/server_config.rs b/backend/sync_server/src/config/server_config.rs deleted file mode 100644 index ce922fb..0000000 --- a/backend/sync_server/src/config/server_config.rs +++ /dev/null @@ -1,50 +0,0 @@ -use log::debug; -use serde::{Deserialize, Serialize}; - -use crate::consts::{ - DEFAULT_HOST, DEFAULT_MAX_BODY_SIZE_MB, DEFAULT_MAX_CLIENTS_PER_VAULT, DEFAULT_PORT, - DEFAULT_RESPONSE_TIMEOUT_SECONDS, -}; - -#[derive(Debug, Deserialize, Serialize, Clone, Default)] -pub struct ServerConfig { - #[serde(default = "default_host")] - pub host: String, - - #[serde(default = "default_port")] - pub port: u16, - - #[serde(default = "default_max_body_size_mb")] - pub max_body_size_mb: usize, - - #[serde(default = "default_max_clients_per_vault")] - pub max_clients_per_vault: usize, - - #[serde(default = "default_response_timeout_seconds")] - pub response_timeout_seconds: u64, -} - -fn default_host() -> String { - debug!("Using default server host: {DEFAULT_HOST}"); - DEFAULT_HOST.to_owned() -} - -fn default_port() -> u16 { - debug!("Using default server port: {DEFAULT_PORT}"); - DEFAULT_PORT -} - -fn default_max_body_size_mb() -> usize { - debug!("Using default max body size (MB): {DEFAULT_MAX_BODY_SIZE_MB}"); - DEFAULT_MAX_BODY_SIZE_MB -} - -fn default_max_clients_per_vault() -> usize { - debug!("Using default max clients per vault: {DEFAULT_MAX_CLIENTS_PER_VAULT}"); - DEFAULT_MAX_CLIENTS_PER_VAULT -} - -fn default_response_timeout_seconds() -> u64 { - debug!("Using default response timeout (seconds): {DEFAULT_RESPONSE_TIMEOUT_SECONDS}"); - DEFAULT_RESPONSE_TIMEOUT_SECONDS -} diff --git a/backend/sync_server/src/config/user_config.rs b/backend/sync_server/src/config/user_config.rs deleted file mode 100644 index ed7ecc2..0000000 --- a/backend/sync_server/src/config/user_config.rs +++ /dev/null @@ -1,164 +0,0 @@ -use bimap::BiHashMap; -use rand::{Rng, distr::Alphanumeric, rng}; -use serde::{Deserialize, Deserializer, Serialize, de::Error}; - -use crate::app_state::database::models::VaultId; - -#[derive(Debug, Deserialize, Serialize, Clone)] -pub struct UserConfig { - #[serde(default = "default_users", deserialize_with = "validate_users")] - pub user_configs: Vec, -} - -fn validate_users<'de, D>(deserializer: D) -> Result, D::Error> -where - D: Deserializer<'de>, -{ - let users = Vec::::deserialize(deserializer)?; - - let mut user_token_map = BiHashMap::new(); - for user in &users { - if let Some(existing_name) = user_token_map.get_by_right(&user.token) { - return Err(D::Error::custom(format!( - "Duplicate user token found: '{}' for users '{}' and '{}'. User tokens must be \ - unique.", - user.token, existing_name, user.name - ))); - } - - if user_token_map.contains_left(&user.name) { - return Err(D::Error::custom(format!( - "Duplicate user name found: '{}'. User names must be unique.", - user.name - ))); - } - - user_token_map.insert(user.name.clone(), user.token.clone()); - } - - Ok(users) -} - -impl UserConfig { - pub fn get_user(&self, token: &str) -> Option<&User> { - self.user_configs.iter().find(|u| u.token == token) - } -} - -#[derive(Debug, Deserialize, Serialize, Clone)] -pub struct User { - pub name: String, - pub token: String, - pub vault_access: VaultAccess, -} - -impl Default for UserConfig { - fn default() -> Self { - Self { - user_configs: default_users(), - } - } -} - -#[derive(Debug, Deserialize, Serialize, Clone, Default)] -#[serde(rename_all = "snake_case", tag = "type")] -pub enum VaultAccess { - #[default] - AllowAccessToAll, - - AllowList(AllowListedVaults), -} - -#[derive(Debug, Deserialize, Serialize, Clone, Default)] -pub struct AllowListedVaults { - pub allowed: Vec, -} - -fn default_users() -> Vec { - vec![User { - name: "admin".to_owned(), - token: get_random_token(), - vault_access: VaultAccess::default(), - }] -} - -pub fn get_random_token() -> String { - rng() - .sample_iter(&Alphanumeric) - .take(64) - .map(char::from) - .collect() -} -#[cfg(test)] -mod tests { - use serde_json::json; - - use super::*; - - #[test] - fn test_validate_users_unique_names_and_tokens() { - let config_json = json!({ - "user_configs": [ - { - "name": "alice", - "token": "token1", - "vault_access": { "type": "allow_access_to_all" } - }, - { - "name": "bob", - "token": "token2", - "vault_access": { "type": "allow_access_to_all" } - } - ] - }); - - let config: Result = serde_json::from_value(config_json); - assert!(config.is_ok()); - } - - #[test] - fn test_validate_users_duplicate_names() { - let config_json = json!({ - "user_configs": [ - { - "name": "alice", - "token": "token1", - "vault_access": { "type": "allow_access_to_all" } - }, - { - "name": "alice", - "token": "token2", - "vault_access": { "type": "allow_access_to_all" } - } - ] - }); - - let config: Result = serde_json::from_value(config_json); - assert!(config.is_err()); - let err = config.unwrap_err().to_string(); - assert!(err.contains("Duplicate user name found")); - } - - #[test] - fn test_validate_users_duplicate_tokens() { - let config_json = json!({ - "user_configs": [ - { - "name": "alice", - "token": "token1", - "vault_access": { "type": "allow_access_to_all" } - }, - { - "name": "bob", - "token": "token1", - "vault_access": { "type": "allow_access_to_all" } - } - ] - }); - - let config: Result = serde_json::from_value(config_json); - assert!(config.is_err()); - let err = config.unwrap_err().to_string(); - assert!(err.contains("Duplicate user token found")); - } -} diff --git a/backend/sync_server/src/consts.rs b/backend/sync_server/src/consts.rs deleted file mode 100644 index df5a284..0000000 --- a/backend/sync_server/src/consts.rs +++ /dev/null @@ -1,13 +0,0 @@ -use std::time::Duration; - -pub const DEFAULT_CONFIG_PATH: &str = "config.yml"; - -pub const DEFAULT_DATABASES_DIRECTORY_PATH: &str = "databases"; -pub const DEFAULT_MAX_CONNECTIONS_PER_VAULT: u32 = 12; -pub const DEFAULT_CURSOR_TIMEOUT: Duration = Duration::from_secs(60); - -pub const DEFAULT_HOST: &str = "127.0.0.1"; -pub const DEFAULT_PORT: u16 = 3000; -pub const DEFAULT_MAX_BODY_SIZE_MB: usize = 4096; -pub const DEFAULT_RESPONSE_TIMEOUT_SECONDS: u64 = 60; -pub const DEFAULT_MAX_CLIENTS_PER_VAULT: usize = 256; diff --git a/backend/sync_server/src/errors.rs b/backend/sync_server/src/errors.rs deleted file mode 100644 index 987c301..0000000 --- a/backend/sync_server/src/errors.rs +++ /dev/null @@ -1,140 +0,0 @@ -use std::fmt::Display; - -use axum::{ - Json, - http::StatusCode, - response::{IntoResponse, Response}, -}; -use log::{debug, error}; -use serde::Serialize; -use thiserror::Error; -use ts_rs::TS; - -#[derive(Error, Debug)] -pub enum SyncServerError { - #[error("Initialisation error: {0}")] - InitError(#[source] anyhow::Error), - - #[error("Client error: {0:?}")] - ClientError(#[source] anyhow::Error), - - #[error("Server error: {0:?}")] - ServerError(#[source] anyhow::Error), - - #[error("Not found: {0}")] - NotFound(#[source] anyhow::Error), - - #[error("Unauthorized: {0}")] - Unauthenticated(#[source] anyhow::Error), - - #[error("Permission denied error: {0}")] - PermissionDeniedError(#[source] anyhow::Error), -} - -impl SyncServerError { - pub fn serialize(&self) -> SerializedError { - match self { - Self::InitError(error) - | Self::ClientError(error) - | Self::ServerError(error) - | Self::NotFound(error) - | Self::Unauthenticated(error) - | Self::PermissionDeniedError(error) => error.into(), - } - } -} - -#[derive(TS, Debug, Clone, Serialize)] -#[serde(rename_all = "camelCase")] -#[ts(export)] -pub struct SerializedError { - pub error_type: &'static str, - pub message: String, - pub causes: Vec, -} - -impl Display for SerializedError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - if !self.causes.is_empty() { - write!(f, "\nCauses:\n")?; - for cause in &self.causes { - write!(f, "{}", &format!("- {cause}\n"))?; - } - } - - Ok(()) - } -} - -impl IntoResponse for SyncServerError { - fn into_response(self) -> Response { - let body = Json(self.serialize()); - - match self { - Self::InitError(_) | Self::ServerError(_) => { - (StatusCode::INTERNAL_SERVER_ERROR, body).into_response() - } - Self::ClientError(_) => (StatusCode::BAD_REQUEST, body).into_response(), - Self::NotFound(_) => (StatusCode::NOT_FOUND, body).into_response(), - Self::Unauthenticated(_) => (StatusCode::UNAUTHORIZED, body).into_response(), - Self::PermissionDeniedError(_) => (StatusCode::FORBIDDEN, body).into_response(), - } - } -} - -impl From<&anyhow::Error> for SerializedError { - fn from(error: &anyhow::Error) -> SerializedError { - let mut causes = vec![]; - let mut current_error = error.source(); - while let Some(error) = current_error { - causes.push(error.to_string()); - current_error = error.source(); - } - - SerializedError { - error_type: error.downcast_ref::().map_or( - "UnknownError", - |e| match e { - SyncServerError::InitError(_) => "InitError", - SyncServerError::ClientError(_) => "ClientError", - SyncServerError::ServerError(_) => "ServerError", - SyncServerError::NotFound(_) => "NotFound", - SyncServerError::Unauthenticated(_) => "Unauthenticated", - SyncServerError::PermissionDeniedError(_) => "PermissionDeniedError", - }, - ), - message: error.to_string(), - causes, - } - } -} - -pub fn init_error(error: anyhow::Error) -> SyncServerError { - debug!("Initialization error: {error:?}"); - SyncServerError::InitError(error) -} - -pub fn server_error(error: anyhow::Error) -> SyncServerError { - debug!("Server error: {error:?}"); - SyncServerError::ServerError(error) -} - -pub fn client_error(error: anyhow::Error) -> SyncServerError { - debug!("Client error: {error:?}"); - SyncServerError::ClientError(error) -} - -pub fn not_found_error(error: anyhow::Error) -> SyncServerError { - debug!("Not found: {error:?}"); - SyncServerError::NotFound(error) -} - -pub fn unauthenticated_error(error: anyhow::Error) -> SyncServerError { - debug!("Unauthenticated user: {error:?}"); - SyncServerError::Unauthenticated(error) -} - -pub fn permission_denied_error(error: anyhow::Error) -> SyncServerError { - debug!("Permission denied: {error:?}"); - SyncServerError::PermissionDeniedError(error) -} diff --git a/backend/sync_server/src/main.rs b/backend/sync_server/src/main.rs deleted file mode 100644 index 8355654..0000000 --- a/backend/sync_server/src/main.rs +++ /dev/null @@ -1,86 +0,0 @@ -mod app_state; -mod cli; -mod config; -mod consts; -mod errors; -mod server; -mod utils; - -use std::process::ExitCode; - -use anyhow::{Context as _, Result}; -use clap::Parser; -use cli::args::Args; -use errors::{SyncServerError, init_error}; -use log::info; -use server::create_server; -use tracing_subscriber::{EnvFilter, fmt::format, util::SubscriberInitExt}; - -#[tokio::main] -async fn main() -> ExitCode { - let args = Args::parse(); - - let mut result = set_up_logging(&args); - - if result.is_ok() { - result = start_server(args).await; - } - - match result { - Ok(()) => ExitCode::SUCCESS, - Err(e) => { - eprintln!("{}", e.serialize()); - ExitCode::FAILURE - } - } -} - -fn set_up_logging(args: &Args) -> Result<(), SyncServerError> { - let level_filter = match args.verbose.log_level_filter() { - // We don't want to allow disabling all logging - log::LevelFilter::Off | log::LevelFilter::Error => tracing::Level::ERROR, - log::LevelFilter::Warn => tracing::Level::WARN, - log::LevelFilter::Info => tracing::Level::INFO, - log::LevelFilter::Debug => tracing::Level::DEBUG, - log::LevelFilter::Trace => tracing::Level::TRACE, - }; - - let env_filter = EnvFilter::builder() - .with_default_directive(level_filter.into()) - .from_env() - .context("Failed to create logging env filter") - .map_err(init_error)?; - - let use_colors = args.color.use_colors(); - - let is_debug_mode = args.verbose.log_level_filter() >= log::LevelFilter::Debug; - - tracing_subscriber::fmt() - .with_ansi(use_colors) - .with_env_filter(env_filter) - .event_format( - format() - .without_time() - .with_target(is_debug_mode) - .with_line_number(is_debug_mode) - .compact(), - ) - .finish() - .try_init() - .context("Failed to initialise tracing") - .map_err(init_error)?; - - Ok(()) -} - -async fn start_server(args: Args) -> Result<(), SyncServerError> { - info!( - "Starting VaultLink server version {}", - env!("CARGO_PKG_VERSION") - ); - - create_server(args.config_path) - .await - .context("Failed to start server") - .map_err(init_error) -} diff --git a/backend/sync_server/src/server.rs b/backend/sync_server/src/server.rs deleted file mode 100644 index 3f659c9..0000000 --- a/backend/sync_server/src/server.rs +++ /dev/null @@ -1,184 +0,0 @@ -pub mod auth; -mod create_document; -mod delete_document; -mod device_id_header; -mod fetch_document_version; -mod fetch_document_version_content; -mod fetch_latest_document_version; -mod fetch_latest_documents; -mod index; -mod ping; -mod requests; -mod responses; -mod update_document; -mod websocket; - -use std::{ffi::OsString, time::Duration}; - -use anyhow::{Context as _, Result, anyhow}; -use auth::auth_middleware; -use axum::{ - Router, - extract::{DefaultBodyLimit, Request}, - http::{self, HeaderValue, Method}, - middleware, - response::IntoResponse, - routing::{IntoMakeService, delete, get, post, put}, -}; -use device_id_header::DEVICE_ID_HEADER_NAME; -use log::info; -use tokio::signal; -use tower_http::{ - LatencyUnit, - cors::CorsLayer, - limit::RequestBodyLimitLayer, - timeout::TimeoutLayer, - trace::{ - DefaultOnBodyChunk, DefaultOnEos, DefaultOnFailure, DefaultOnRequest, DefaultOnResponse, - TraceLayer, - }, -}; -use tracing::{Level, info_span}; - -use crate::{ - app_state::AppState, - config::server_config::ServerConfig, - errors::{client_error, not_found_error}, -}; - -pub async fn create_server(config_path: Option) -> Result<()> { - let app_state = AppState::try_new(config_path) - .await - .context("Failed to initialise app state")?; - - let server_config = app_state.config.server.clone(); - - let app = Router::new() - .nest("/", get_authed_routes(app_state.clone())) - .route("/", get(index::index)) - .route("/vaults/:vault_id/ping", get(ping::ping)) - .route("/vaults/:vault_id/ws", get(websocket::websocket_handler)) - .layer(DefaultBodyLimit::disable()) - .layer(RequestBodyLimitLayer::new( - app_state.config.server.max_body_size_mb * 1024 * 1024, - )) - .layer(TimeoutLayer::new(Duration::from_secs( - server_config.response_timeout_seconds, - ))) - .layer( - CorsLayer::new() - .allow_origin("*".parse::().expect("Failed to parse origin")) - .allow_headers([ - http::header::CONTENT_TYPE, - http::header::AUTHORIZATION, - DEVICE_ID_HEADER_NAME.clone(), - ]) - .allow_methods([Method::GET, Method::POST, Method::PUT, Method::DELETE]), - ) - .layer( - TraceLayer::new_for_http() - .make_span_with(|request: &Request<_>| { - info_span!( - "http", - method = ?request.method(), - uri = ?request.uri(), - ) - }) - .on_request(DefaultOnRequest::new().level(Level::INFO)) - .on_response( - DefaultOnResponse::new() - .level(Level::INFO) - .latency_unit(LatencyUnit::Millis), - ) - .on_body_chunk(DefaultOnBodyChunk::new()) - .on_eos(DefaultOnEos::new()) - .on_failure(DefaultOnFailure::new().level(Level::ERROR)), - ) - .with_state(app_state) - .fallback(handle_404) - .fallback(handle_405) - .into_make_service(); - - start_server(app, &server_config).await -} - -fn get_authed_routes(app_state: AppState) -> Router { - Router::new() - .route( - "/vaults/:vault_id/documents", - get(fetch_latest_documents::fetch_latest_documents), - ) - .route( - "/vaults/:vault_id/documents", - post(create_document::create_document), - ) - .route( - "/vaults/:vault_id/documents/:document_id", - get(fetch_latest_document_version::fetch_latest_document_version), - ) - .route( - "/vaults/:vault_id/documents/:document_id", - put(update_document::update_document), - ) - .route( - "/vaults/:vault_id/documents/:document_id/versions/:version_id", - put(fetch_document_version::fetch_document_version), - ) - .route( - "/vaults/:vault_id/documents/:document_id/versions/:version_id/content", - put(fetch_document_version_content::fetch_document_version_content), - ) - .route( - "/vaults/:vault_id/documents/:document_id", - delete(delete_document::delete_document), - ) - .layer(middleware::from_fn_with_state(app_state, auth_middleware)) -} - -async fn start_server(app: IntoMakeService, config: &ServerConfig) -> Result<()> { - let address = format!("{}:{}", config.host, config.port); - let listener = tokio::net::TcpListener::bind(address.clone()) - .await - .with_context(|| format!("Failed to bind to address: {address}"))?; - - info!( - "Listening on http://{}", - listener - .local_addr() - .context("Failed to get local address")? - ); - - axum::serve(listener, app) - .with_graceful_shutdown(shutdown_signal()) - .tcp_nodelay(true) - .await - .context("Failed to start server") -} - -async fn shutdown_signal() { - let ctrl_c = async { - signal::ctrl_c() - .await - .expect("failed to install Ctrl+C handler"); - }; - - #[cfg(unix)] - let terminate = async { - signal::unix::signal(signal::unix::SignalKind::terminate()) - .expect("failed to install signal handler") - .recv() - .await; - }; - - #[cfg(not(unix))] - let terminate = std::future::pending::<()>(); - - tokio::select! { - () = ctrl_c => {}, - () = terminate => {}, - } -} - -async fn handle_404() -> impl IntoResponse { not_found_error(anyhow!("Page not found")) } - -async fn handle_405() -> impl IntoResponse { client_error(anyhow!("Method not allowed")) } diff --git a/backend/sync_server/src/server/assets/index.html b/backend/sync_server/src/server/assets/index.html deleted file mode 100644 index ef9c5a6..0000000 --- a/backend/sync_server/src/server/assets/index.html +++ /dev/null @@ -1,9 +0,0 @@ - - - - VaultLink - - -

VaultLink server

- - diff --git a/backend/sync_server/src/server/auth.rs b/backend/sync_server/src/server/auth.rs deleted file mode 100644 index d27c16e..0000000 --- a/backend/sync_server/src/server/auth.rs +++ /dev/null @@ -1,70 +0,0 @@ -use std::collections::HashMap; - -use axum::{ - extract::{Path, Request, State}, - middleware::Next, - response::Response, -}; -use axum_extra::{ - TypedHeader, - headers::{Authorization, authorization::Bearer}, -}; -use log::info; - -use crate::{ - app_state::{AppState, database::models::VaultId}, - config::user_config::{AllowListedVaults, User, VaultAccess}, - errors::{SyncServerError, permission_denied_error, unauthenticated_error}, - utils::normalize::normalize_string, -}; - -pub async fn auth_middleware( - State(state): State, - Path(path_params): Path>, - TypedHeader(auth_header): TypedHeader>, - mut req: Request, - next: Next, -) -> Result { - let token = auth_header.token().trim(); - let vault_id = normalize_string( - path_params - .get("vault_id") - .ok_or_else(|| unauthenticated_error(anyhow::anyhow!("Missing vault_id")))?, - ); - - let user = auth(&state, token, &vault_id)?; - - req.extensions_mut().insert(user); - - Ok(next.run(req).await) -} - -pub fn auth(state: &AppState, token: &str, vault_id: &VaultId) -> Result { - let user = state - .config - .users - .get_user(token) - .cloned() - .ok_or_else(|| unauthenticated_error(anyhow::anyhow!("Invalid token")))?; - - if match user.vault_access { - VaultAccess::AllowAccessToAll => true, - VaultAccess::AllowList(AllowListedVaults { ref allowed }) => allowed.contains(vault_id), - } { - info!( - "User '{}' is authenticated and is authorised to access to vault '{vault_id}'", - user.name - ); - - Ok(user) - } else { - info!( - "User '{}' is authenticated but is not authorised to access vault '{vault_id}'", - user.name - ); - - Err(permission_denied_error(anyhow::anyhow!( - "Permission denied for vault `{vault_id}`" - ))) - } -} diff --git a/backend/sync_server/src/server/create_document.rs b/backend/sync_server/src/server/create_document.rs deleted file mode 100644 index 7018d8c..0000000 --- a/backend/sync_server/src/server/create_document.rs +++ /dev/null @@ -1,95 +0,0 @@ -use anyhow::Context as _; -use axum::{ - Extension, Json, - extract::{Path, State}, -}; -use axum_extra::TypedHeader; -use axum_typed_multipart::TypedMultipart; -use serde::Deserialize; - -use super::{device_id_header::DeviceIdHeader, requests::CreateDocumentVersion}; -use crate::{ - app_state::{ - AppState, - database::models::{DocumentVersionWithoutContent, StoredDocumentVersion, VaultId}, - }, - config::user_config::User, - errors::{SyncServerError, client_error, server_error}, - utils::{normalize::normalize, sanitize_path::sanitize_path}, -}; - -#[derive(Deserialize)] -pub struct CreateDocumentPathParams { - #[serde(deserialize_with = "normalize")] - vault_id: VaultId, -} - -/// Create a new document in case a document with the same doesn't exist -/// already. If a document with the same path exists, a new version is created -/// with their content merged. -#[axum::debug_handler] -pub async fn create_document( - Path(CreateDocumentPathParams { vault_id }): Path, - Extension(user): Extension, - TypedHeader(device_id): TypedHeader, - State(state): State, - TypedMultipart(request): TypedMultipart, -) -> Result, SyncServerError> { - let mut transaction = state - .database - .create_write_transaction(&vault_id) - .await - .map_err(server_error)?; - - let document_id = match request.document_id { - Some(document_id) => { - let existing_version = state - .database - .get_latest_document(&vault_id, &document_id, Some(&mut transaction)) - .await - .map_err(server_error)?; - - if existing_version.is_some() { - return Err(client_error(anyhow::anyhow!( - "Document with the same ID already exists" - ))); - } - - document_id - } - None => uuid::Uuid::new_v4(), - }; - - let last_update_id = state - .database - .get_max_update_id_in_vault(&vault_id, Some(&mut transaction)) - .await - .map_err(server_error)?; - - let sanitized_relative_path = sanitize_path(&request.relative_path); - - let new_version = StoredDocumentVersion { - vault_update_id: last_update_id + 1, - document_id, - relative_path: sanitized_relative_path, - content: request.content.contents.to_vec(), - updated_date: chrono::Utc::now(), - is_deleted: false, - user_id: user.name, - device_id: device_id.0, - }; - - state - .database - .insert_document_version(&vault_id, &new_version, Some(&mut transaction)) - .await - .map_err(server_error)?; - - transaction - .commit() - .await - .context("Failed to commit successful transaction") - .map_err(server_error)?; - - Ok(Json(new_version.into())) -} diff --git a/backend/sync_server/src/server/delete_document.rs b/backend/sync_server/src/server/delete_document.rs deleted file mode 100644 index 5b7cd6e..0000000 --- a/backend/sync_server/src/server/delete_document.rs +++ /dev/null @@ -1,84 +0,0 @@ -use anyhow::Context as _; -use axum::{ - Extension, Json, - extract::{Path, State}, -}; -use axum_extra::TypedHeader; -use serde::Deserialize; - -use super::{device_id_header::DeviceIdHeader, requests::DeleteDocumentVersion}; -use crate::{ - app_state::{ - AppState, - database::models::{ - DocumentId, DocumentVersionWithoutContent, StoredDocumentVersion, VaultId, - }, - }, - config::user_config::User, - errors::{SyncServerError, server_error}, - utils::{normalize::normalize, sanitize_path::sanitize_path}, -}; - -#[derive(Deserialize)] -pub struct DeleteDocumentPathParams { - #[serde(deserialize_with = "normalize")] - vault_id: VaultId, - - document_id: DocumentId, -} - -#[axum::debug_handler] -pub async fn delete_document( - Path(DeleteDocumentPathParams { - vault_id, - document_id, - }): Path, - Extension(user): Extension, - TypedHeader(device_id): TypedHeader, - State(state): State, - Json(request): Json, -) -> Result, SyncServerError> { - let mut transaction = state - .database - .create_write_transaction(&vault_id) - .await - .map_err(server_error)?; - - let last_update_id = state - .database - .get_max_update_id_in_vault(&vault_id, Some(&mut transaction)) - .await - .map_err(server_error)?; - - let latest_content = state - .database - .get_latest_document(&vault_id, &document_id, Some(&mut transaction)) - .await - .map_err(server_error)? - .map_or_else(Vec::new, |version| version.content); // in case the document has never existed before deleting it - - let new_version = StoredDocumentVersion { - vault_update_id: last_update_id + 1, - document_id, - relative_path: sanitize_path(&request.relative_path), - content: latest_content, // copy the content from the latest version - updated_date: chrono::Utc::now(), - is_deleted: true, - user_id: user.name, - device_id: device_id.0, - }; - - state - .database - .insert_document_version(&vault_id, &new_version, Some(&mut transaction)) - .await - .map_err(server_error)?; - - transaction - .commit() - .await - .context("Failed to commit successful transaction") - .map_err(server_error)?; - - Ok(Json(new_version.into())) -} diff --git a/backend/sync_server/src/server/device_id_header.rs b/backend/sync_server/src/server/device_id_header.rs deleted file mode 100644 index be36c8d..0000000 --- a/backend/sync_server/src/server/device_id_header.rs +++ /dev/null @@ -1,33 +0,0 @@ -use axum_extra::headers; -use headers::{Header, HeaderName, HeaderValue}; - -pub struct DeviceIdHeader(pub String); - -pub static DEVICE_ID_HEADER_NAME: HeaderName = HeaderName::from_static("device-id"); - -impl Header for DeviceIdHeader { - fn name() -> &'static HeaderName { &DEVICE_ID_HEADER_NAME } - - fn decode<'i, I>(values: &mut I) -> Result - where - I: Iterator, - { - let value = values.next().ok_or_else(headers::Error::invalid)?; - - Ok(DeviceIdHeader( - value - .to_str() - .map_err(|_| headers::Error::invalid())? - .to_owned(), - )) - } - - fn encode(&self, values: &mut E) - where - E: Extend, - { - let value = HeaderValue::from_static(Box::leak(self.0.to_string().into_boxed_str())); - - values.extend(std::iter::once(value)); - } -} diff --git a/backend/sync_server/src/server/fetch_document_version.rs b/backend/sync_server/src/server/fetch_document_version.rs deleted file mode 100644 index 5b571a7..0000000 --- a/backend/sync_server/src/server/fetch_document_version.rs +++ /dev/null @@ -1,57 +0,0 @@ -use anyhow::anyhow; -use axum::{ - Json, - extract::{Path, State}, -}; -use serde::Deserialize; - -use crate::{ - app_state::{ - AppState, - database::models::{DocumentId, DocumentVersion, VaultId, VaultUpdateId}, - }, - errors::{SyncServerError, not_found_error, server_error}, - utils::normalize::normalize, -}; - -#[derive(Deserialize)] -pub struct FetchDocumentVersionPathParams { - #[serde(deserialize_with = "normalize")] - vault_id: VaultId, - - document_id: DocumentId, - vault_update_id: VaultUpdateId, -} - -#[axum::debug_handler] -pub async fn fetch_document_version( - Path(FetchDocumentVersionPathParams { - vault_id, - document_id, - vault_update_id, - }): Path, - State(state): State, -) -> Result, SyncServerError> { - let result = state - .database - .get_document_version(&vault_id, vault_update_id, None) - .await - .map_err(server_error)? - .map_or_else( - || { - Err(not_found_error(anyhow!( - "Document with vault update id `{vault_update_id}` not found", - ))) - }, - Ok, - )?; - - if result.document_id != document_id { - return Err(not_found_error(anyhow!( - "Document with document id `{document_id}` does not have a version with id \ - `{vault_update_id}`", - ))); - } - - Ok(Json(result.into())) -} diff --git a/backend/sync_server/src/server/fetch_document_version_content.rs b/backend/sync_server/src/server/fetch_document_version_content.rs deleted file mode 100644 index a419b7b..0000000 --- a/backend/sync_server/src/server/fetch_document_version_content.rs +++ /dev/null @@ -1,57 +0,0 @@ -use anyhow::anyhow; -use axum::{ - body::Bytes, - extract::{Path, State}, -}; -use serde::Deserialize; - -use crate::{ - app_state::{ - AppState, - database::models::{DocumentId, VaultId, VaultUpdateId}, - }, - errors::{SyncServerError, not_found_error, server_error}, - utils::normalize::normalize, -}; - -#[derive(Deserialize)] -pub struct FetchDocumentVersionContentPathParams { - #[serde(deserialize_with = "normalize")] - vault_id: VaultId, - - document_id: DocumentId, - vault_update_id: VaultUpdateId, -} - -#[axum::debug_handler] -pub async fn fetch_document_version_content( - Path(FetchDocumentVersionContentPathParams { - vault_id, - document_id, - vault_update_id, - }): Path, - State(state): State, -) -> Result { - let result = state - .database - .get_document_version(&vault_id, vault_update_id, None) - .await - .map_err(server_error)? - .map_or_else( - || { - Err(not_found_error(anyhow!( - "Document with vault update id `{vault_update_id}` not found", - ))) - }, - Ok, - )?; - - if result.document_id != document_id { - return Err(not_found_error(anyhow!( - "Document with document id `{document_id}` does not have a version with id \ - `{vault_update_id}`", - ))); - } - - Ok(result.content.into()) -} diff --git a/backend/sync_server/src/server/fetch_latest_document_version.rs b/backend/sync_server/src/server/fetch_latest_document_version.rs deleted file mode 100644 index 07f0786..0000000 --- a/backend/sync_server/src/server/fetch_latest_document_version.rs +++ /dev/null @@ -1,48 +0,0 @@ -use anyhow::anyhow; -use axum::{ - Json, - extract::{Path, State}, -}; -use serde::Deserialize; - -use crate::{ - app_state::{ - AppState, - database::models::{DocumentId, DocumentVersion, VaultId}, - }, - errors::{SyncServerError, not_found_error, server_error}, - utils::normalize::normalize, -}; - -#[derive(Deserialize)] -pub struct FetchLatestDocumentVersionPathParams { - #[serde(deserialize_with = "normalize")] - vault_id: VaultId, - - document_id: DocumentId, -} - -#[axum::debug_handler] -pub async fn fetch_latest_document_version( - Path(FetchLatestDocumentVersionPathParams { - vault_id, - document_id, - }): Path, - State(state): State, -) -> Result, SyncServerError> { - let latest_version = state - .database - .get_latest_document(&vault_id, &document_id, None) - .await - .map_err(server_error)? - .map_or_else( - || { - Err(not_found_error(anyhow!( - "Document with id `{document_id}` not found", - ))) - }, - Ok, - )?; - - Ok(Json(latest_version.into())) -} diff --git a/backend/sync_server/src/server/fetch_latest_documents.rs b/backend/sync_server/src/server/fetch_latest_documents.rs deleted file mode 100644 index 6101f55..0000000 --- a/backend/sync_server/src/server/fetch_latest_documents.rs +++ /dev/null @@ -1,56 +0,0 @@ -use axum::{ - Json, - extract::{Path, Query, State}, -}; -use serde::Deserialize; - -use super::responses::FetchLatestDocumentsResponse; -use crate::{ - app_state::{ - AppState, - database::models::{VaultId, VaultUpdateId}, - }, - errors::{SyncServerError, server_error}, - utils::normalize::normalize, -}; - -#[derive(Deserialize)] -pub struct FetchLatestDocumentsPathParams { - #[serde(deserialize_with = "normalize")] - vault_id: VaultId, -} - -#[derive(Deserialize)] -pub struct QueryParams { - since_update_id: Option, -} - -#[axum::debug_handler] -pub async fn fetch_latest_documents( - Path(FetchLatestDocumentsPathParams { vault_id }): Path, - Query(QueryParams { since_update_id }): Query, - State(state): State, -) -> Result, SyncServerError> { - let documents = if let Some(since_update_id) = since_update_id { - state - .database - .get_latest_documents_since(&vault_id, since_update_id, None) - .await - .map_err(server_error) - } else { - state - .database - .get_latest_documents(&vault_id, None) - .await - .map_err(server_error) - }?; - - Ok(Json(FetchLatestDocumentsResponse { - last_update_id: documents - .iter() - .map(|doc| doc.vault_update_id) - .max() - .unwrap_or(since_update_id.unwrap_or(0)), - latest_documents: documents, - })) -} diff --git a/backend/sync_server/src/server/index.rs b/backend/sync_server/src/server/index.rs deleted file mode 100644 index 64b053f..0000000 --- a/backend/sync_server/src/server/index.rs +++ /dev/null @@ -1,7 +0,0 @@ -use axum::response::{Html, IntoResponse}; - -pub async fn index() -> impl IntoResponse { - const HTML_CONTENT: &str = include_str!("./assets/index.html"); - let html_content = HTML_CONTENT; - Html(html_content) -} diff --git a/backend/sync_server/src/server/ping.rs b/backend/sync_server/src/server/ping.rs deleted file mode 100644 index 620ef0d..0000000 --- a/backend/sync_server/src/server/ping.rs +++ /dev/null @@ -1,37 +0,0 @@ -use axum::{ - Json, - extract::{Path, State}, -}; -use axum_extra::{ - TypedHeader, - headers::{Authorization, authorization::Bearer}, -}; -use serde::Deserialize; - -use super::{auth::auth, responses::PingResponse}; -use crate::{ - app_state::{AppState, database::models::VaultId}, - errors::SyncServerError, - utils::normalize::normalize, -}; - -#[derive(Deserialize)] -pub struct PingPathParams { - #[serde(deserialize_with = "normalize")] - vault_id: VaultId, -} - -#[axum::debug_handler] -pub async fn ping( - maybe_auth_header: Option>>, - Path(PingPathParams { vault_id }): Path, - State(state): State, -) -> Result, SyncServerError> { - let is_authenticated = maybe_auth_header - .is_some_and(|auth_header| auth(&state, auth_header.token(), &vault_id).is_ok()); - - Ok(Json(PingResponse { - server_version: env!("CARGO_PKG_VERSION").to_owned(), - is_authenticated, - })) -} diff --git a/backend/sync_server/src/server/requests.rs b/backend/sync_server/src/server/requests.rs deleted file mode 100644 index 9d1e478..0000000 --- a/backend/sync_server/src/server/requests.rs +++ /dev/null @@ -1,39 +0,0 @@ -use axum::body::Bytes; -use axum_typed_multipart::{FieldData, TryFromMultipart}; -use serde::{self, Deserialize}; -use ts_rs::TS; - -use crate::app_state::database::models::{DocumentId, VaultUpdateId}; - -#[derive(TS, Debug, TryFromMultipart)] -#[ts(export)] -pub struct CreateDocumentVersion { - /// The client can decide the document id (if it wishes to) in order - /// to help with syncing. If the client does not provide a document id, - /// the server will generate one. If the client provides a document id - /// it must not already exist in the database. - pub document_id: Option, - pub relative_path: String, - - #[ts(as = "Vec")] - #[form_data(limit = "unlimited")] - pub content: FieldData, -} - -#[derive(TS, Debug, TryFromMultipart)] -#[ts(export)] -pub struct UpdateDocumentVersion { - pub parent_version_id: VaultUpdateId, - pub relative_path: String, - - #[ts(as = "Vec")] - #[form_data(limit = "unlimited")] - pub content: FieldData, -} - -#[derive(TS, Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -#[ts(export)] -pub struct DeleteDocumentVersion { - pub relative_path: String, -} diff --git a/backend/sync_server/src/server/responses.rs b/backend/sync_server/src/server/responses.rs deleted file mode 100644 index 5cfaa5d..0000000 --- a/backend/sync_server/src/server/responses.rs +++ /dev/null @@ -1,45 +0,0 @@ -use serde::{self, Serialize}; -use ts_rs::TS; - -use crate::app_state::database::models::{ - DocumentVersion, DocumentVersionWithoutContent, VaultUpdateId, -}; - -/// Response to a ping request. -#[derive(TS, Debug, Clone, Serialize)] -#[serde(rename_all = "camelCase")] -#[ts(export)] -pub struct PingResponse { - /// Semantic version of the server. - pub server_version: String, - - /// Whether the client is authenticated based on the sent Authorization - /// header. - pub is_authenticated: bool, -} - -/// Response to a fetch latest documents request. -#[derive(TS, Debug, Clone, Serialize)] -#[serde(rename_all = "camelCase")] -#[ts(export)] -pub struct FetchLatestDocumentsResponse { - pub latest_documents: Vec, - - /// The update ID of the latest document in the response. - pub last_update_id: VaultUpdateId, -} - -/// Response to an update document request. -#[derive(TS, Debug, Clone, Serialize)] -#[serde(tag = "type")] -#[ts(export)] -pub enum DocumentUpdateResponse { - /// Returned when the created/updated document's content is the same as was - /// sent in the create/update request and thus the response doesn't contain - /// the content because the client must already have it. - FastForwardUpdate(DocumentVersionWithoutContent), - - /// Returned when the created/updated document's content is different from - /// what was sent in the create/update request. - MergingUpdate(DocumentVersion), -} diff --git a/backend/sync_server/src/server/update_document.rs b/backend/sync_server/src/server/update_document.rs deleted file mode 100644 index a3ab25e..0000000 --- a/backend/sync_server/src/server/update_document.rs +++ /dev/null @@ -1,179 +0,0 @@ -use anyhow::{Context as _, anyhow}; -use axum::{ - Extension, Json, - extract::{Path, State}, -}; -use axum_extra::TypedHeader; -use axum_typed_multipart::TypedMultipart; -use log::info; -use serde::Deserialize; -use sync_lib::{is_file_type_mergable, merge}; - -use super::{ - device_id_header::DeviceIdHeader, requests::UpdateDocumentVersion, - responses::DocumentUpdateResponse, -}; -use crate::{ - app_state::{ - AppState, - database::models::{DocumentId, StoredDocumentVersion, VaultId}, - }, - config::user_config::User, - errors::{SyncServerError, not_found_error, server_error}, - utils::{dedup_paths::dedup_paths, normalize::normalize, sanitize_path::sanitize_path}, -}; - -#[derive(Deserialize)] -pub struct UpdateDocumentPathParams { - #[serde(deserialize_with = "normalize")] - vault_id: VaultId, - - document_id: DocumentId, -} - -#[axum::debug_handler] -#[allow(clippy::too_many_lines)] -pub async fn update_document( - Path(UpdateDocumentPathParams { - vault_id, - document_id, - }): Path, - Extension(user): Extension, - TypedHeader(device_id): TypedHeader, - State(state): State, - TypedMultipart(request): TypedMultipart, -) -> Result, SyncServerError> { - // No need for a transaction as document versions are immutable - let parent_document = state - .database - .get_document_version(&vault_id, request.parent_version_id, None) - .await - .map_err(server_error)? - .map_or_else( - || { - Err(not_found_error(anyhow!( - "Parent version with id `{}` not found", - request.parent_version_id - ))) - }, - Ok, - )?; - - let sanitized_relative_path = sanitize_path(&request.relative_path); - - let mut transaction = state - .database - .create_write_transaction(&vault_id) - .await - .map_err(server_error)?; - - let last_update_id = state - .database - .get_max_update_id_in_vault(&vault_id, Some(&mut transaction)) - .await - .map_err(server_error)?; - - let latest_version = state - .database - .get_latest_document(&vault_id, &document_id, Some(&mut transaction)) - .await - .map_err(server_error)? - .map_or_else( - || { - Err(not_found_error(anyhow!( - "Document with id `{document_id}` not found", - ))) - }, - Ok, - )?; - - if latest_version.is_deleted { - transaction - .rollback() - .await - .context("Failed to roll back transaction") - .map_err(server_error)?; - - return Ok(Json(DocumentUpdateResponse::FastForwardUpdate( - latest_version.into(), - ))); - } - - let content = request.content.contents.to_vec(); - - // Return the latest version if the content and path are the same as the latest - // version - if content == latest_version.content && sanitized_relative_path == latest_version.relative_path - { - info!("Document content is the same as the latest version, skipping update"); - transaction - .rollback() - .await - .context("Failed to roll back transaction") - .map_err(server_error)?; - - return Ok(Json(DocumentUpdateResponse::FastForwardUpdate( - latest_version.into(), - ))); - } - - let merged_content = if is_file_type_mergable(&sanitized_relative_path) { - merge(&parent_document.content, &latest_version.content, &content) - } else { - content.clone() - }; - - let is_different_from_request_content = merged_content != content; - - // We can only update the relative path if we're the first one to do so - let new_relative_path = if parent_document.relative_path == latest_version.relative_path - && latest_version.relative_path != sanitized_relative_path - { - let mut new_relative_path = String::default(); - for candidate in dedup_paths(&sanitized_relative_path) { - if state - .database - .get_latest_document_by_path(&vault_id, &candidate, Some(&mut transaction)) - .await - .map_err(server_error)? - .is_none() - { - new_relative_path = candidate; - break; - } - } - - new_relative_path - } else { - latest_version.relative_path.clone() - }; - - let new_version = StoredDocumentVersion { - document_id, - vault_update_id: last_update_id + 1, - relative_path: new_relative_path, - content: merged_content, - updated_date: chrono::Utc::now(), - is_deleted: false, - user_id: user.name, - device_id: device_id.0, - }; - - state - .database - .insert_document_version(&vault_id, &new_version, Some(&mut transaction)) - .await - .map_err(server_error)?; - - transaction - .commit() - .await - .context("Failed to commit successful transaction") - .map_err(server_error)?; - - Ok(Json(if is_different_from_request_content { - DocumentUpdateResponse::MergingUpdate(new_version.into()) - } else { - DocumentUpdateResponse::FastForwardUpdate(new_version.into()) - })) -} diff --git a/backend/sync_server/src/server/websocket.rs b/backend/sync_server/src/server/websocket.rs deleted file mode 100644 index e9dd886..0000000 --- a/backend/sync_server/src/server/websocket.rs +++ /dev/null @@ -1,181 +0,0 @@ -use anyhow::Context; -use axum::{ - extract::{ - Path, State, - ws::{Message, WebSocket, WebSocketUpgrade}, - }, - response::Response, -}; -use futures::stream::StreamExt; -use log::{debug, info}; -use serde::Deserialize; - -use crate::{ - app_state::{ - AppState, - database::models::VaultId, - websocket::{ - models::{ - CursorPositionFromServer, WebSocketClientMessage, WebSocketServerMessage, - WebSocketVaultUpdate, - }, - utils::{ - get_authenticated_handshake, get_unseen_documents, send_update_over_websocket, - }, - }, - }, - errors::{SyncServerError, client_error, server_error}, - utils::normalize::normalize, -}; - -#[derive(Deserialize)] -pub struct WebSocketPathParams { - #[serde(deserialize_with = "normalize")] - vault_id: VaultId, -} - -pub async fn websocket_handler( - ws: WebSocketUpgrade, - Path(WebSocketPathParams { vault_id }): Path, - State(state): State, -) -> Result { - Ok(ws.on_upgrade(move |socket| websocket_wrapped(state, socket, vault_id))) -} - -async fn websocket_wrapped(state: AppState, stream: WebSocket, vault_id: VaultId) { - info!("WebSocket connection opened on vault '{vault_id}'"); - - let result = websocket(state, stream, vault_id.clone()).await; - - if let Err(err) = result { - debug!("WebSocket connection error on vault '{vault_id}': {err}"); - } -} - -#[allow(clippy::too_many_lines)] -async fn websocket( - state: AppState, - stream: WebSocket, - vault_id: VaultId, -) -> Result<(), SyncServerError> { - let (mut sender, mut websocket_receiver) = stream.split(); - - let authed_handshake = get_authenticated_handshake( - &state, - &vault_id, - websocket_receiver - .next() - .await - .transpose() - .unwrap_or_default(), - )?; - - info!( - "WebSocket handshake successful for vault '{vault_id}' for '{}'", - authed_handshake.handshake.device_id - ); - - let mut broadcast_receiver = state.broadcasts.get_receiver(vault_id.clone()).await; - - send_update_over_websocket( - &WebSocketServerMessage::VaultUpdate(WebSocketVaultUpdate { - documents: get_unseen_documents( - &state, - &vault_id, - authed_handshake.handshake.last_seen_vault_update_id, - ) - .await?, - is_initial_sync: true, - }), - &mut sender, - ) - .await?; - - send_update_over_websocket( - &WebSocketServerMessage::CursorPositions(CursorPositionFromServer { - clients: state.cursors.get_cursors(&vault_id).await, - }), - &mut sender, - ) - .await?; - - let device_id = authed_handshake.handshake.device_id.clone(); - let mut send_task = tokio::spawn(async move { - while let Ok(update) = broadcast_receiver.recv().await { - if Some(&device_id) == update.origin_device_id.as_ref() { - continue; - } - - send_update_over_websocket(&update.message, &mut sender).await?; - } - - Ok::<(), SyncServerError>(()) - }); - - let device_id = authed_handshake.handshake.device_id.clone(); - let vault_id_clone = vault_id.clone(); - let cursor_manager = state.cursors.clone(); - let mut receive_task = tokio::spawn(async move { - while let Some(Ok(Message::Text(message))) = websocket_receiver.next().await { - let message: WebSocketClientMessage = serde_json::from_str(&message) - .context("Failed to parse WebSocket message from client") - .map_err(server_error)?; - - match message { - WebSocketClientMessage::Handshake(_) => { - return Err(client_error(anyhow::anyhow!( - "Unexpected handshake message" - ))); - } - WebSocketClientMessage::CursorPositions(cursors) => { - cursor_manager - .update_cursors( - vault_id_clone.clone(), - authed_handshake.user.name.clone(), - &device_id, - cursors.document_to_cursors, - ) - .await; - } - } - } - - Ok::<(), SyncServerError>(()) - }); - - tokio::select! { - _ = &mut send_task => receive_task.abort(), - _ = &mut receive_task => send_task.abort(), - }; - - let result: Result<(), SyncServerError> = (async { - send_task - .await - .context("WebSocket send task failed") - .map_err(client_error) - .and_then(|err| err)?; - - receive_task - .await - .context("WebSocket receive task failed") - .map_err(client_error) - .and_then(|err| err)?; - - Ok(()) - }) - .await; - - state - .cursors - .remove_cursors_of_device(&vault_id, &authed_handshake.handshake.device_id) - .await; - - if result.is_err() { - info!( - "WebSocket disconnected on vault '{vault_id}' for '{}'", - authed_handshake.handshake.device_id - ); - } - - result -} diff --git a/backend/sync_server/src/utils.rs b/backend/sync_server/src/utils.rs deleted file mode 100644 index 870f4ae..0000000 --- a/backend/sync_server/src/utils.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod dedup_paths; -pub mod normalize; -pub mod sanitize_path; diff --git a/backend/sync_server/src/utils/dedup_paths.rs b/backend/sync_server/src/utils/dedup_paths.rs deleted file mode 100644 index c35ad33..0000000 --- a/backend/sync_server/src/utils/dedup_paths.rs +++ /dev/null @@ -1,88 +0,0 @@ -use regex::Regex; - -pub fn dedup_paths(path: &str) -> impl Iterator { - let mut path_parts = path.split('/').collect::>(); - let file_name = path_parts.pop().unwrap().to_owned(); - - let mut directory = path_parts.join("/"); - if !directory.is_empty() { - directory.push('/'); - } - - let name_parts = file_name.rsplitn(2, '.').collect::>(); - let mut reverse_parts = name_parts.into_iter().rev(); - let (stem, extension) = match (reverse_parts.next(), reverse_parts.next()) { - (Some(stem), maybe_extension) => ( - stem.to_owned(), - maybe_extension - .map(|ext| format!(".{ext}")) - .unwrap_or_default(), - ), - _ => unreachable!("Path must have at least one part"), - }; - - let regex = Regex::new(r" \((\d+)\)$").unwrap(); - let start_number = regex - .captures(&stem) - .and_then(|caps| caps.get(1)) - .and_then(|m| m.as_str().parse::().ok()) - .unwrap_or(0); - - let clean_stem = regex.replace(&stem, "").to_string(); - - (start_number..).map(move |dedup_number| { - if dedup_number == 0 { - format!("{directory}{clean_stem}{extension}") - } else { - format!("{directory}{clean_stem} ({dedup_number}){extension}") - } - }) -} - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn test_dedup_paths() { - let mut deduped = dedup_paths("file.txt"); - assert_eq!(deduped.next(), Some("file.txt".to_owned())); - assert_eq!(deduped.next(), Some("file (1).txt".to_owned())); - assert_eq!(deduped.next(), Some("file (2).txt".to_owned())); - - let mut deduped = dedup_paths("file"); - assert_eq!(deduped.next(), Some("file".to_owned())); - assert_eq!(deduped.next(), Some("file (1)".to_owned())); - assert_eq!(deduped.next(), Some("file (2)".to_owned())); - - let mut deduped = dedup_paths("file (51).md"); - assert_eq!(deduped.next(), Some("file (51).md".to_owned())); - assert_eq!(deduped.next(), Some("file (52).md".to_owned())); - assert_eq!(deduped.next(), Some("file (53).md".to_owned())); - - let mut deduped = dedup_paths("file (5)"); - assert_eq!(deduped.next(), Some("file (5)".to_owned())); - assert_eq!(deduped.next(), Some("file (6)".to_owned())); - assert_eq!(deduped.next(), Some("file (7)".to_owned())); - - let mut deduped = dedup_paths("my/path.with.dots/file (5).md"); - assert_eq!( - deduped.next(), - Some("my/path.with.dots/file (5).md".to_owned()) - ); - assert_eq!( - deduped.next(), - Some("my/path.with.dots/file (6).md".to_owned()) - ); - - let mut deduped = dedup_paths("my/path.with.dots/file (5)"); - assert_eq!( - deduped.next(), - Some("my/path.with.dots/file (5)".to_owned()) - ); - assert_eq!( - deduped.next(), - Some("my/path.with.dots/file (6)".to_owned()) - ); - } -} diff --git a/backend/sync_server/src/utils/normalize.rs b/backend/sync_server/src/utils/normalize.rs deleted file mode 100644 index adb83ac..0000000 --- a/backend/sync_server/src/utils/normalize.rs +++ /dev/null @@ -1,11 +0,0 @@ -use serde::{Deserialize, Deserializer}; - -pub fn normalize<'de, D>(deserializer: D) -> Result -where - D: Deserializer<'de>, -{ - let s = String::deserialize(deserializer)?; - Ok(normalize_string(&s)) -} - -pub fn normalize_string(s: &str) -> String { s.trim().to_lowercase() } diff --git a/backend/sync_server/src/utils/sanitize_path.rs b/backend/sync_server/src/utils/sanitize_path.rs deleted file mode 100644 index 9703225..0000000 --- a/backend/sync_server/src/utils/sanitize_path.rs +++ /dev/null @@ -1,34 +0,0 @@ -/// Sanitize the document's path to allow all clients to create the same path in -/// their filesystem. If we didn't do this server-side, client's would need to -/// deal with mapping invalid names to valid ones and then back. -pub fn sanitize_path(path: &str) -> String { - let options = sanitize_filename::Options { - truncate: true, - windows: true, // Windows is the lowest common denominator - replacement: "", - }; - - path.split('/') - .map(|part| { - let proposal = sanitize_filename::sanitize_with_options(part, options.clone()); - if !part.is_empty() && proposal.is_empty() { - "_".to_owned() - } else { - proposal - } - }) - .collect::>() - .join("/") -} - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn test_sanitize_path() { - assert_eq!(sanitize_path("/my/path/what?"), "/my/path/what"); - assert_eq!(sanitize_path("file (1).md"), "file (1).md"); - assert_eq!(sanitize_path("/my/path/\\\\:?"), "/my/path/_"); - } -}