From cb96c294a26997fd401afaf32b44d4663da565c9 Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Tue, 25 Mar 2025 21:30:42 +0000 Subject: [PATCH] Move app state creating Broadcasts --- .gitignore | 2 +- backend/config-e2e.yml | 3 + .../sync_server/src/{server => }/app_state.rs | 15 ++++- .../sync_server/src/app_state/broadcasts.rs | 59 +++++++++++++++++++ .../src/{ => app_state}/database.rs | 25 ++++---- .../migrations/20241207143519_bootstrap.sql | 0 .../src/{ => app_state}/database/models.rs | 0 .../sync_server/src/config/server_config.rs | 14 ++++- backend/sync_server/src/consts.rs | 1 + 9 files changed, 101 insertions(+), 18 deletions(-) rename backend/sync_server/src/{server => }/app_state.rs (61%) create mode 100644 backend/sync_server/src/app_state/broadcasts.rs rename backend/sync_server/src/{ => app_state}/database.rs (96%) rename backend/sync_server/src/{ => app_state}/database/migrations/20241207143519_bootstrap.sql (100%) rename backend/sync_server/src/{ => app_state}/database/models.rs (100%) diff --git a/.gitignore b/.gitignore index b1e083d2..a91ed90b 100644 --- a/.gitignore +++ b/.gitignore @@ -14,4 +14,4 @@ backend/databases *.log -plugin/coverage +*.sqlx diff --git a/backend/config-e2e.yml b/backend/config-e2e.yml index 2345c8b3..04fe344a 100644 --- a/backend/config-e2e.yml +++ b/backend/config-e2e.yml @@ -1,10 +1,13 @@ database: databases_directory_path: databases max_connections: 12 + server: host: 0.0.0.0 port: 3000 max_body_size_mb: 512 + max_clients_per_vault: 256 + users: user_tokens: - name: admin diff --git a/backend/sync_server/src/server/app_state.rs b/backend/sync_server/src/app_state.rs similarity index 61% rename from backend/sync_server/src/server/app_state.rs rename to backend/sync_server/src/app_state.rs index 2a8a96eb..1cad9149 100644 --- a/backend/sync_server/src/server/app_state.rs +++ b/backend/sync_server/src/app_state.rs @@ -1,13 +1,19 @@ +pub mod broadcasts; +pub mod database; + use std::ffi::OsString; use anyhow::Result; +use broadcasts::Broadcasts; +use database::Database; -use crate::{config::Config, consts::DEFAULT_CONFIG_PATH, database::Database}; +use crate::{config::Config, consts::DEFAULT_CONFIG_PATH}; #[derive(Clone, Debug)] pub struct AppState { pub config: Config, pub database: Database, + pub broadcasts: Broadcasts, } impl AppState { @@ -17,7 +23,12 @@ impl AppState { let config = Config::read_or_create(&path).await?; let database = Database::try_new(&config.database).await?; + let broadcasts = Broadcasts::new(&config.server); - Ok(Self { config, database }) + Ok(Self { + config, + database, + broadcasts, + }) } } diff --git a/backend/sync_server/src/app_state/broadcasts.rs b/backend/sync_server/src/app_state/broadcasts.rs new file mode 100644 index 00000000..3e7d98cc --- /dev/null +++ b/backend/sync_server/src/app_state/broadcasts.rs @@ -0,0 +1,59 @@ +use std::{collections::HashMap, sync::Arc}; + +use anyhow::Context; +use tokio::sync::{Mutex, broadcast}; + +use super::database::models::{DocumentVersionWithoutContent, VaultId}; +use crate::{ + config::server_config::ServerConfig, + errors::{SyncServerError, server_error}, +}; + +#[derive(Debug, Clone)] +pub struct Broadcasts { + max_clients_per_vault: usize, + tx: Arc>>>, +} + +impl Broadcasts { + pub fn new(server_config: &ServerConfig) -> Self { + Self { + max_clients_per_vault: server_config.max_clients_per_vault, + tx: Arc::new(Mutex::new(HashMap::new())), + } + } + + pub async fn get_receiver( + &self, + vault: VaultId, + ) -> broadcast::Receiver { + let tx = self.get_or_create(vault).await; + + tx.subscribe() + } + + pub async fn send( + &self, + vault: VaultId, + document: DocumentVersionWithoutContent, + ) -> Result<(), SyncServerError> { + let tx = self.get_or_create(vault).await; + + tx.send(document) + .context("Cannot broadcast update message to websocket listeners") + .map_err(server_error)?; + + Ok(()) + } + + async fn get_or_create( + &self, + vault: VaultId, + ) -> broadcast::Sender { + let mut tx = self.tx.lock().await; + + tx.entry(vault) + .or_insert_with(|| broadcast::channel(self.max_clients_per_vault).0.clone()) + .clone() + } +} diff --git a/backend/sync_server/src/database.rs b/backend/sync_server/src/app_state/database.rs similarity index 96% rename from backend/sync_server/src/database.rs rename to backend/sync_server/src/app_state/database.rs index 882bd0a2..fa7f35b0 100644 --- a/backend/sync_server/src/database.rs +++ b/backend/sync_server/src/app_state/database.rs @@ -85,13 +85,13 @@ impl Database { } async fn run_migrations(pool: &Pool) -> Result<()> { - sqlx::migrate!("src/database/migrations") + sqlx::migrate!("src/app_state/database/migrations") .run(pool) .await .context("Cannot check for pending migrations") } - async fn get_connection_pool(&mut self, vault: &VaultId) -> Result> { + async fn get_connection_pool(&self, vault: &VaultId) -> Result> { let mut pools = self.connection_pools.lock().await; if !pools.contains_key(vault) { let pool = Self::create_vault_database(&self.config, vault).await?; @@ -108,7 +108,7 @@ impl Database { /// Attempting to write from this transaction might result in a /// database locked error. Use this transaction for read-only operations. pub async fn create_readonly_transaction( - &mut self, + &self, vault: &VaultId, ) -> Result> { self.get_connection_pool(vault) @@ -118,10 +118,7 @@ impl Database { .context("Cannot create transaction") } - pub async fn create_write_transaction( - &mut self, - vault: &VaultId, - ) -> Result> { + pub async fn create_write_transaction(&self, vault: &VaultId) -> Result> { let mut transaction = self.create_readonly_transaction(vault).await?; // sqlx doesn't support immediate transactions for sqlite: https://github.com/launchbadge/sqlx/issues/481 @@ -134,7 +131,7 @@ impl Database { /// Return the latest state of all documents in the vault pub async fn get_latest_documents( - &mut self, + &self, vault: &VaultId, transaction: Option<&mut Transaction<'_>>, ) -> Result> { @@ -165,7 +162,7 @@ impl Database { /// Return the latest state of all documents (including deleted) in the /// vault which have changed since the given update id pub async fn get_latest_documents_since( - &mut self, + &self, vault: &VaultId, vault_update_id: VaultUpdateId, transaction: Option<&mut Transaction<'_>>, @@ -199,7 +196,7 @@ impl Database { } pub async fn get_max_update_id_in_vault( - &mut self, + &self, vault: &VaultId, transaction: Option<&mut Transaction<'_>>, ) -> Result { @@ -222,7 +219,7 @@ impl Database { } pub async fn get_latest_document_by_path( - &mut self, + &self, vault: &VaultId, relative_path: &str, transaction: Option<&mut Transaction<'_>>, @@ -258,7 +255,7 @@ impl Database { } pub async fn get_latest_document( - &mut self, + &self, vault: &VaultId, document_id: &DocumentId, transaction: Option<&mut Transaction<'_>>, @@ -291,7 +288,7 @@ impl Database { } pub async fn get_document_version( - &mut self, + &self, vault: &VaultId, vault_update_id: VaultUpdateId, transaction: Option<&mut Transaction<'_>>, @@ -322,7 +319,7 @@ impl Database { } pub async fn insert_document_version( - &mut self, + &self, vault: &VaultId, version: &StoredDocumentVersion, transaction: Option<&mut Transaction<'_>>, diff --git a/backend/sync_server/src/database/migrations/20241207143519_bootstrap.sql b/backend/sync_server/src/app_state/database/migrations/20241207143519_bootstrap.sql similarity index 100% rename from backend/sync_server/src/database/migrations/20241207143519_bootstrap.sql rename to backend/sync_server/src/app_state/database/migrations/20241207143519_bootstrap.sql diff --git a/backend/sync_server/src/database/models.rs b/backend/sync_server/src/app_state/database/models.rs similarity index 100% rename from backend/sync_server/src/database/models.rs rename to backend/sync_server/src/app_state/database/models.rs diff --git a/backend/sync_server/src/config/server_config.rs b/backend/sync_server/src/config/server_config.rs index 8d7c63ea..077bd8d8 100644 --- a/backend/sync_server/src/config/server_config.rs +++ b/backend/sync_server/src/config/server_config.rs @@ -1,7 +1,10 @@ use log::debug; use serde::{Deserialize, Serialize}; -use crate::consts::{DEFAULT_HOST, DEFAULT_MAX_BODY_SIZE_MB, DEFAULT_PORT}; +use crate::consts::{ + DEFAULT_HOST, DEFAULT_MAX_BODY_SIZE_MB, DEFAULT_MAX_CLIENTS_PER_VAULT, DEFAULT_PORT, +}; + #[derive(Debug, Deserialize, Serialize, Clone)] pub struct ServerConfig { #[serde(default = "default_host")] @@ -12,6 +15,9 @@ pub struct ServerConfig { #[serde(default = "default_max_body_size_mb")] pub max_body_size_mb: usize, + + #[serde(default = "default_max_clients_per_vault")] + pub max_clients_per_vault: usize, } fn default_host() -> String { @@ -29,12 +35,18 @@ fn default_max_body_size_mb() -> usize { DEFAULT_MAX_BODY_SIZE_MB } +fn default_max_clients_per_vault() -> usize { + debug!("Using default max clients per vault: {DEFAULT_MAX_CLIENTS_PER_VAULT}"); + DEFAULT_MAX_CLIENTS_PER_VAULT +} + impl Default for ServerConfig { fn default() -> Self { Self { host: default_host(), port: default_port(), max_body_size_mb: default_max_body_size_mb(), + max_clients_per_vault: default_max_clients_per_vault(), } } } diff --git a/backend/sync_server/src/consts.rs b/backend/sync_server/src/consts.rs index bec936b4..2d3bec55 100644 --- a/backend/sync_server/src/consts.rs +++ b/backend/sync_server/src/consts.rs @@ -4,3 +4,4 @@ pub const DEFAULT_HOST: &str = "127.0.0.1"; pub const DEFAULT_PORT: u16 = 3000; pub const DEFAULT_MAX_CONNECTIONS: u32 = 12; pub const DEFAULT_MAX_BODY_SIZE_MB: usize = 4096; +pub const DEFAULT_MAX_CLIENTS_PER_VAULT: usize = 256;