Move app state creating Broadcasts
This commit is contained in:
parent
ccff1cfc7a
commit
cb96c294a2
9 changed files with 101 additions and 18 deletions
2
.gitignore
vendored
2
.gitignore
vendored
|
|
@ -14,4 +14,4 @@ backend/databases
|
|||
|
||||
*.log
|
||||
|
||||
plugin/coverage
|
||||
*.sqlx
|
||||
|
|
|
|||
|
|
@ -1,10 +1,13 @@
|
|||
database:
|
||||
databases_directory_path: databases
|
||||
max_connections: 12
|
||||
|
||||
server:
|
||||
host: 0.0.0.0
|
||||
port: 3000
|
||||
max_body_size_mb: 512
|
||||
max_clients_per_vault: 256
|
||||
|
||||
users:
|
||||
user_tokens:
|
||||
- name: admin
|
||||
|
|
|
|||
|
|
@ -1,13 +1,19 @@
|
|||
pub mod broadcasts;
|
||||
pub mod database;
|
||||
|
||||
use std::ffi::OsString;
|
||||
|
||||
use anyhow::Result;
|
||||
use broadcasts::Broadcasts;
|
||||
use database::Database;
|
||||
|
||||
use crate::{config::Config, consts::DEFAULT_CONFIG_PATH, database::Database};
|
||||
use crate::{config::Config, consts::DEFAULT_CONFIG_PATH};
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct AppState {
|
||||
pub config: Config,
|
||||
pub database: Database,
|
||||
pub broadcasts: Broadcasts,
|
||||
}
|
||||
|
||||
impl AppState {
|
||||
|
|
@ -17,7 +23,12 @@ impl AppState {
|
|||
|
||||
let config = Config::read_or_create(&path).await?;
|
||||
let database = Database::try_new(&config.database).await?;
|
||||
let broadcasts = Broadcasts::new(&config.server);
|
||||
|
||||
Ok(Self { config, database })
|
||||
Ok(Self {
|
||||
config,
|
||||
database,
|
||||
broadcasts,
|
||||
})
|
||||
}
|
||||
}
|
||||
59
backend/sync_server/src/app_state/broadcasts.rs
Normal file
59
backend/sync_server/src/app_state/broadcasts.rs
Normal file
|
|
@ -0,0 +1,59 @@
|
|||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use anyhow::Context;
|
||||
use tokio::sync::{Mutex, broadcast};
|
||||
|
||||
use super::database::models::{DocumentVersionWithoutContent, VaultId};
|
||||
use crate::{
|
||||
config::server_config::ServerConfig,
|
||||
errors::{SyncServerError, server_error},
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Broadcasts {
|
||||
max_clients_per_vault: usize,
|
||||
tx: Arc<Mutex<HashMap<VaultId, broadcast::Sender<DocumentVersionWithoutContent>>>>,
|
||||
}
|
||||
|
||||
impl Broadcasts {
|
||||
pub fn new(server_config: &ServerConfig) -> Self {
|
||||
Self {
|
||||
max_clients_per_vault: server_config.max_clients_per_vault,
|
||||
tx: Arc::new(Mutex::new(HashMap::new())),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_receiver(
|
||||
&self,
|
||||
vault: VaultId,
|
||||
) -> broadcast::Receiver<DocumentVersionWithoutContent> {
|
||||
let tx = self.get_or_create(vault).await;
|
||||
|
||||
tx.subscribe()
|
||||
}
|
||||
|
||||
pub async fn send(
|
||||
&self,
|
||||
vault: VaultId,
|
||||
document: DocumentVersionWithoutContent,
|
||||
) -> Result<(), SyncServerError> {
|
||||
let tx = self.get_or_create(vault).await;
|
||||
|
||||
tx.send(document)
|
||||
.context("Cannot broadcast update message to websocket listeners")
|
||||
.map_err(server_error)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_or_create(
|
||||
&self,
|
||||
vault: VaultId,
|
||||
) -> broadcast::Sender<DocumentVersionWithoutContent> {
|
||||
let mut tx = self.tx.lock().await;
|
||||
|
||||
tx.entry(vault)
|
||||
.or_insert_with(|| broadcast::channel(self.max_clients_per_vault).0.clone())
|
||||
.clone()
|
||||
}
|
||||
}
|
||||
|
|
@ -85,13 +85,13 @@ impl Database {
|
|||
}
|
||||
|
||||
async fn run_migrations(pool: &Pool<Sqlite>) -> Result<()> {
|
||||
sqlx::migrate!("src/database/migrations")
|
||||
sqlx::migrate!("src/app_state/database/migrations")
|
||||
.run(pool)
|
||||
.await
|
||||
.context("Cannot check for pending migrations")
|
||||
}
|
||||
|
||||
async fn get_connection_pool(&mut self, vault: &VaultId) -> Result<Pool<Sqlite>> {
|
||||
async fn get_connection_pool(&self, vault: &VaultId) -> Result<Pool<Sqlite>> {
|
||||
let mut pools = self.connection_pools.lock().await;
|
||||
if !pools.contains_key(vault) {
|
||||
let pool = Self::create_vault_database(&self.config, vault).await?;
|
||||
|
|
@ -108,7 +108,7 @@ impl Database {
|
|||
/// Attempting to write from this transaction might result in a
|
||||
/// database locked error. Use this transaction for read-only operations.
|
||||
pub async fn create_readonly_transaction(
|
||||
&mut self,
|
||||
&self,
|
||||
vault: &VaultId,
|
||||
) -> Result<Transaction<'static>> {
|
||||
self.get_connection_pool(vault)
|
||||
|
|
@ -118,10 +118,7 @@ impl Database {
|
|||
.context("Cannot create transaction")
|
||||
}
|
||||
|
||||
pub async fn create_write_transaction(
|
||||
&mut self,
|
||||
vault: &VaultId,
|
||||
) -> Result<Transaction<'static>> {
|
||||
pub async fn create_write_transaction(&self, vault: &VaultId) -> Result<Transaction<'static>> {
|
||||
let mut transaction = self.create_readonly_transaction(vault).await?;
|
||||
|
||||
// sqlx doesn't support immediate transactions for sqlite: https://github.com/launchbadge/sqlx/issues/481
|
||||
|
|
@ -134,7 +131,7 @@ impl Database {
|
|||
|
||||
/// Return the latest state of all documents in the vault
|
||||
pub async fn get_latest_documents(
|
||||
&mut self,
|
||||
&self,
|
||||
vault: &VaultId,
|
||||
transaction: Option<&mut Transaction<'_>>,
|
||||
) -> Result<Vec<DocumentVersionWithoutContent>> {
|
||||
|
|
@ -165,7 +162,7 @@ impl Database {
|
|||
/// Return the latest state of all documents (including deleted) in the
|
||||
/// vault which have changed since the given update id
|
||||
pub async fn get_latest_documents_since(
|
||||
&mut self,
|
||||
&self,
|
||||
vault: &VaultId,
|
||||
vault_update_id: VaultUpdateId,
|
||||
transaction: Option<&mut Transaction<'_>>,
|
||||
|
|
@ -199,7 +196,7 @@ impl Database {
|
|||
}
|
||||
|
||||
pub async fn get_max_update_id_in_vault(
|
||||
&mut self,
|
||||
&self,
|
||||
vault: &VaultId,
|
||||
transaction: Option<&mut Transaction<'_>>,
|
||||
) -> Result<i64> {
|
||||
|
|
@ -222,7 +219,7 @@ impl Database {
|
|||
}
|
||||
|
||||
pub async fn get_latest_document_by_path(
|
||||
&mut self,
|
||||
&self,
|
||||
vault: &VaultId,
|
||||
relative_path: &str,
|
||||
transaction: Option<&mut Transaction<'_>>,
|
||||
|
|
@ -258,7 +255,7 @@ impl Database {
|
|||
}
|
||||
|
||||
pub async fn get_latest_document(
|
||||
&mut self,
|
||||
&self,
|
||||
vault: &VaultId,
|
||||
document_id: &DocumentId,
|
||||
transaction: Option<&mut Transaction<'_>>,
|
||||
|
|
@ -291,7 +288,7 @@ impl Database {
|
|||
}
|
||||
|
||||
pub async fn get_document_version(
|
||||
&mut self,
|
||||
&self,
|
||||
vault: &VaultId,
|
||||
vault_update_id: VaultUpdateId,
|
||||
transaction: Option<&mut Transaction<'_>>,
|
||||
|
|
@ -322,7 +319,7 @@ impl Database {
|
|||
}
|
||||
|
||||
pub async fn insert_document_version(
|
||||
&mut self,
|
||||
&self,
|
||||
vault: &VaultId,
|
||||
version: &StoredDocumentVersion,
|
||||
transaction: Option<&mut Transaction<'_>>,
|
||||
|
|
@ -1,7 +1,10 @@
|
|||
use log::debug;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::consts::{DEFAULT_HOST, DEFAULT_MAX_BODY_SIZE_MB, DEFAULT_PORT};
|
||||
use crate::consts::{
|
||||
DEFAULT_HOST, DEFAULT_MAX_BODY_SIZE_MB, DEFAULT_MAX_CLIENTS_PER_VAULT, DEFAULT_PORT,
|
||||
};
|
||||
|
||||
#[derive(Debug, Deserialize, Serialize, Clone)]
|
||||
pub struct ServerConfig {
|
||||
#[serde(default = "default_host")]
|
||||
|
|
@ -12,6 +15,9 @@ pub struct ServerConfig {
|
|||
|
||||
#[serde(default = "default_max_body_size_mb")]
|
||||
pub max_body_size_mb: usize,
|
||||
|
||||
#[serde(default = "default_max_clients_per_vault")]
|
||||
pub max_clients_per_vault: usize,
|
||||
}
|
||||
|
||||
fn default_host() -> String {
|
||||
|
|
@ -29,12 +35,18 @@ fn default_max_body_size_mb() -> usize {
|
|||
DEFAULT_MAX_BODY_SIZE_MB
|
||||
}
|
||||
|
||||
fn default_max_clients_per_vault() -> usize {
|
||||
debug!("Using default max clients per vault: {DEFAULT_MAX_CLIENTS_PER_VAULT}");
|
||||
DEFAULT_MAX_CLIENTS_PER_VAULT
|
||||
}
|
||||
|
||||
impl Default for ServerConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
host: default_host(),
|
||||
port: default_port(),
|
||||
max_body_size_mb: default_max_body_size_mb(),
|
||||
max_clients_per_vault: default_max_clients_per_vault(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,3 +4,4 @@ pub const DEFAULT_HOST: &str = "127.0.0.1";
|
|||
pub const DEFAULT_PORT: u16 = 3000;
|
||||
pub const DEFAULT_MAX_CONNECTIONS: u32 = 12;
|
||||
pub const DEFAULT_MAX_BODY_SIZE_MB: usize = 4096;
|
||||
pub const DEFAULT_MAX_CLIENTS_PER_VAULT: usize = 256;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue