Add API for propagating cursor locations (#61)
This commit is contained in:
parent
f97193e287
commit
e8b9bf40c5
80 changed files with 1930 additions and 2229 deletions
128
backend/sync_server/src/app_state/cursors.rs
Normal file
128
backend/sync_server/src/app_state/cursors.rs
Normal file
|
|
@ -0,0 +1,128 @@
|
|||
use core::time::Duration;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use super::{
|
||||
database::models::{DeviceId, VaultId},
|
||||
websocket::{
|
||||
broadcasts::Broadcasts,
|
||||
models::{
|
||||
ClientCursors, CursorPositionFromServer, CursorSpan, WebSocketServerMessage,
|
||||
WebSocketServerMessageWithOrigin,
|
||||
},
|
||||
},
|
||||
};
|
||||
use crate::config::database_config::DatabaseConfig;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Cursors {
|
||||
config: DatabaseConfig,
|
||||
broadcasts: Broadcasts,
|
||||
vault_to_cursors: Arc<Mutex<HashMap<VaultId, Vec<ClientCursorsWithTimeToLive>>>>,
|
||||
}
|
||||
|
||||
impl Cursors {
|
||||
pub fn new(config: &DatabaseConfig, broadcasts: &Broadcasts) -> Self {
|
||||
Self {
|
||||
config: config.clone(),
|
||||
broadcasts: broadcasts.clone(),
|
||||
vault_to_cursors: Arc::new(Mutex::new(HashMap::new())),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn update_cursors(
|
||||
&self,
|
||||
vault_id: VaultId,
|
||||
user_name: String,
|
||||
device_id: &DeviceId,
|
||||
document_to_cursors: HashMap<String, Vec<CursorSpan>>,
|
||||
) {
|
||||
let mut vault_to_cursors = self.vault_to_cursors.lock().await;
|
||||
|
||||
let all_device_cursors = vault_to_cursors.entry(vault_id).or_insert_with(Vec::new);
|
||||
|
||||
all_device_cursors.retain(|c| &c.client_cursors.device_id != device_id);
|
||||
all_device_cursors.push(ClientCursorsWithTimeToLive::new(ClientCursors {
|
||||
user_name,
|
||||
device_id: device_id.to_string(),
|
||||
cursors: document_to_cursors,
|
||||
}));
|
||||
|
||||
drop(vault_to_cursors); // Explicitly drop the lock before broadcasting to avoid deadlock
|
||||
self.broadcast_cursors().await;
|
||||
}
|
||||
|
||||
pub async fn get_cursors(&self, vault_id: &VaultId) -> Vec<ClientCursors> {
|
||||
let vault_to_cursors = self.vault_to_cursors.lock().await;
|
||||
vault_to_cursors
|
||||
.get(vault_id)
|
||||
.map(|cursors| {
|
||||
cursors
|
||||
.iter()
|
||||
.cloned()
|
||||
.map(|with_ttl| with_ttl.client_cursors)
|
||||
.collect::<Vec<_>>()
|
||||
})
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
pub fn start_background_task(self) {
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
self.remove_expired_cursors().await;
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async fn remove_expired_cursors(&self) {
|
||||
let mut vault_to_cursors = self.vault_to_cursors.lock().await;
|
||||
|
||||
for (_vault_id, cursors) in vault_to_cursors.iter_mut() {
|
||||
cursors.retain(|cursor| !cursor.is_expired(self.config.cursor_timeout));
|
||||
}
|
||||
}
|
||||
|
||||
async fn broadcast_cursors(&self) {
|
||||
let vault_to_cursors = self.vault_to_cursors.lock().await;
|
||||
|
||||
for (vault_id, cursors) in vault_to_cursors.iter() {
|
||||
self.broadcasts
|
||||
.send_document_update(
|
||||
vault_id.clone(),
|
||||
WebSocketServerMessageWithOrigin::new(WebSocketServerMessage::CursorPositions(
|
||||
CursorPositionFromServer {
|
||||
clients: cursors.iter().map(|c| c.client_cursors.clone()).collect(),
|
||||
},
|
||||
)),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn remove_cursors_of_device(&self, vault_id: &str, device_id: &str) {
|
||||
let mut vault_to_cursors = self.vault_to_cursors.lock().await;
|
||||
|
||||
if let Some(cursors) = vault_to_cursors.get_mut(vault_id) {
|
||||
cursors.retain(|c| c.client_cursors.device_id != device_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct ClientCursorsWithTimeToLive {
|
||||
client_cursors: ClientCursors,
|
||||
last_updated: std::time::Instant,
|
||||
}
|
||||
|
||||
impl ClientCursorsWithTimeToLive {
|
||||
fn new(client_cursors: ClientCursors) -> Self {
|
||||
Self {
|
||||
client_cursors,
|
||||
last_updated: std::time::Instant::now(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_expired(&self, ttl: Duration) -> bool { self.last_updated.elapsed() > ttl }
|
||||
}
|
||||
|
|
@ -6,23 +6,29 @@ use models::{
|
|||
DocumentId, DocumentVersionWithoutContent, StoredDocumentVersion, VaultId, VaultUpdateId,
|
||||
};
|
||||
use sqlx::{sqlite::SqliteConnectOptions, types::chrono::Utc};
|
||||
|
||||
pub mod models;
|
||||
use sqlx::{Pool, Sqlite, sqlite::SqlitePoolOptions};
|
||||
use tokio::sync::Mutex;
|
||||
use uuid::fmt::Hyphenated;
|
||||
|
||||
use super::websocket::{
|
||||
broadcasts::Broadcasts,
|
||||
models::{WebSocketServerMessage, WebSocketServerMessageWithOrigin, WebSocketVaultUpdate},
|
||||
};
|
||||
use crate::config::database_config::DatabaseConfig;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Database {
|
||||
config: DatabaseConfig,
|
||||
broadcasts: Broadcasts,
|
||||
connection_pools: Arc<Mutex<HashMap<VaultId, Pool<Sqlite>>>>,
|
||||
}
|
||||
|
||||
pub type Transaction<'a> = sqlx::Transaction<'a, Sqlite>;
|
||||
|
||||
impl Database {
|
||||
pub async fn try_new(config: &DatabaseConfig) -> Result<Self> {
|
||||
pub async fn try_new(config: &DatabaseConfig, broadcasts: &Broadcasts) -> Result<Self> {
|
||||
tokio::fs::create_dir_all(&config.databases_directory_path)
|
||||
.await
|
||||
.with_context(|| {
|
||||
|
|
@ -55,6 +61,7 @@ impl Database {
|
|||
Ok(Self {
|
||||
config: config.clone(),
|
||||
connection_pools: Arc::new(Mutex::new(connection_pools)),
|
||||
broadcasts: broadcasts.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -362,7 +369,7 @@ impl Database {
|
|||
|
||||
pub async fn insert_document_version(
|
||||
&self,
|
||||
vault: &VaultId,
|
||||
vault_id: &VaultId,
|
||||
version: &StoredDocumentVersion,
|
||||
transaction: Option<&mut Transaction<'_>>,
|
||||
) -> Result<()> {
|
||||
|
|
@ -394,10 +401,25 @@ impl Database {
|
|||
if let Some(transaction) = transaction {
|
||||
query.execute(&mut **transaction).await
|
||||
} else {
|
||||
query.execute(&self.get_connection_pool(vault).await?).await
|
||||
query
|
||||
.execute(&self.get_connection_pool(vault_id).await?)
|
||||
.await
|
||||
}
|
||||
.context("Cannot insert document version")?;
|
||||
|
||||
self.broadcasts
|
||||
.send_document_update(
|
||||
vault_id.clone(),
|
||||
WebSocketServerMessageWithOrigin::with_origin(
|
||||
version.device_id.clone(),
|
||||
WebSocketServerMessage::VaultUpdate(WebSocketVaultUpdate {
|
||||
documents: vec![version.clone().into()],
|
||||
is_initial_sync: false,
|
||||
}),
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,10 +1,11 @@
|
|||
use chrono::{DateTime, Utc};
|
||||
use schemars::JsonSchema;
|
||||
use serde::Serialize;
|
||||
use sync_lib::bytes_to_base64;
|
||||
use ts_rs::TS;
|
||||
|
||||
pub type VaultId = String;
|
||||
pub type VaultUpdateId = i64;
|
||||
|
||||
pub type DocumentId = uuid::Uuid;
|
||||
pub type UserId = String;
|
||||
pub type DeviceId = String;
|
||||
|
|
@ -25,16 +26,20 @@ impl PartialEq<Self> for StoredDocumentVersion {
|
|||
fn eq(&self, other: &Self) -> bool { self.vault_update_id == other.vault_update_id }
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, JsonSchema)]
|
||||
#[derive(TS, Debug, Clone, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct DocumentVersionWithoutContent {
|
||||
#[ts(as = "i32")]
|
||||
pub vault_update_id: VaultUpdateId,
|
||||
|
||||
pub document_id: DocumentId,
|
||||
pub relative_path: String,
|
||||
pub updated_date: DateTime<Utc>,
|
||||
pub is_deleted: bool,
|
||||
pub user_id: UserId,
|
||||
pub device_id: DeviceId,
|
||||
|
||||
#[ts(as = "i32")]
|
||||
pub content_size: u64,
|
||||
}
|
||||
|
||||
|
|
@ -53,10 +58,12 @@ impl From<StoredDocumentVersion> for DocumentVersionWithoutContent {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, JsonSchema)]
|
||||
#[derive(TS, Debug, Clone, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct DocumentVersion {
|
||||
#[ts(as = "i32")]
|
||||
pub vault_update_id: VaultUpdateId,
|
||||
|
||||
pub document_id: DocumentId,
|
||||
pub relative_path: String,
|
||||
pub updated_date: DateTime<Utc>,
|
||||
|
|
|
|||
3
backend/sync_server/src/app_state/websocket.rs
Normal file
3
backend/sync_server/src/app_state/websocket.rs
Normal file
|
|
@ -0,0 +1,3 @@
|
|||
pub mod broadcasts;
|
||||
pub mod models;
|
||||
pub mod utils;
|
||||
|
|
@ -3,19 +3,15 @@ use std::{collections::HashMap, sync::Arc};
|
|||
use anyhow::Context;
|
||||
use tokio::sync::{Mutex, broadcast};
|
||||
|
||||
use super::database::models::{DeviceId, DocumentVersionWithoutContent, VaultId};
|
||||
use crate::{config::server_config::ServerConfig, errors::server_error};
|
||||
use super::models::WebSocketServerMessageWithOrigin;
|
||||
use crate::{
|
||||
app_state::database::models::VaultId, config::server_config::ServerConfig, errors::server_error,
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Broadcasts {
|
||||
max_clients_per_vault: usize,
|
||||
tx: Arc<Mutex<HashMap<VaultId, broadcast::Sender<VaultUpdate>>>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct VaultUpdate {
|
||||
pub origin_device_id: Option<DeviceId>,
|
||||
pub document: DocumentVersionWithoutContent,
|
||||
tx: Arc<Mutex<HashMap<VaultId, broadcast::Sender<WebSocketServerMessageWithOrigin>>>>,
|
||||
}
|
||||
|
||||
impl Broadcasts {
|
||||
|
|
@ -26,20 +22,27 @@ impl Broadcasts {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn get_receiver(&self, vault: VaultId) -> broadcast::Receiver<VaultUpdate> {
|
||||
pub async fn get_receiver(
|
||||
&self,
|
||||
vault: VaultId,
|
||||
) -> broadcast::Receiver<WebSocketServerMessageWithOrigin> {
|
||||
let tx = self.get_or_create(vault).await;
|
||||
|
||||
tx.subscribe()
|
||||
}
|
||||
|
||||
/// Sent a document update to all clients subscribed to the vault.
|
||||
/// We ignore & log failures.
|
||||
pub async fn send(&self, vault: VaultId, document: VaultUpdate) {
|
||||
/// Notify all clients (who are subscribed to the vault) about an update.
|
||||
/// We only log failures.
|
||||
pub async fn send_document_update(
|
||||
&self,
|
||||
vault: VaultId,
|
||||
document: WebSocketServerMessageWithOrigin,
|
||||
) {
|
||||
let tx = self.get_or_create(vault).await;
|
||||
|
||||
let result = tx
|
||||
.send(document)
|
||||
.context("Cannot broadcast update message to websocket listeners")
|
||||
.context("Cannot broadcast server message to websocket listeners")
|
||||
.map_err(server_error);
|
||||
|
||||
if result.is_err() {
|
||||
|
|
@ -47,7 +50,10 @@ impl Broadcasts {
|
|||
}
|
||||
}
|
||||
|
||||
async fn get_or_create(&self, vault: VaultId) -> broadcast::Sender<VaultUpdate> {
|
||||
async fn get_or_create(
|
||||
&self,
|
||||
vault: VaultId,
|
||||
) -> broadcast::Sender<WebSocketServerMessageWithOrigin> {
|
||||
let mut tx = self.tx.lock().await;
|
||||
|
||||
tx.entry(vault)
|
||||
88
backend/sync_server/src/app_state/websocket/models.rs
Normal file
88
backend/sync_server/src/app_state/websocket/models.rs
Normal file
|
|
@ -0,0 +1,88 @@
|
|||
use std::collections::HashMap;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use ts_rs::TS;
|
||||
|
||||
use crate::app_state::database::models::{DeviceId, DocumentVersionWithoutContent, VaultUpdateId};
|
||||
|
||||
#[derive(TS, Deserialize, Clone, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct WebSocketHandshake {
|
||||
pub token: String,
|
||||
pub device_id: DeviceId,
|
||||
|
||||
#[ts(as = "Option<i32>")]
|
||||
pub last_seen_vault_update_id: Option<VaultUpdateId>,
|
||||
}
|
||||
|
||||
#[derive(TS, Serialize, Deserialize, Clone, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct CursorSpan {
|
||||
pub start: usize,
|
||||
pub end: usize,
|
||||
}
|
||||
|
||||
#[derive(TS, Deserialize, Clone, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct CursorPositionFromClient {
|
||||
pub document_to_cursors: HashMap<String, Vec<CursorSpan>>,
|
||||
}
|
||||
|
||||
#[derive(TS, Serialize, Clone, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ClientCursors {
|
||||
pub user_name: String,
|
||||
pub device_id: DeviceId,
|
||||
pub cursors: HashMap<String, Vec<CursorSpan>>,
|
||||
}
|
||||
|
||||
#[derive(TS, Serialize, Clone, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct CursorPositionFromServer {
|
||||
pub clients: Vec<ClientCursors>,
|
||||
}
|
||||
|
||||
#[derive(TS, Serialize, Clone, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct WebSocketVaultUpdate {
|
||||
pub documents: Vec<DocumentVersionWithoutContent>,
|
||||
pub is_initial_sync: bool,
|
||||
}
|
||||
|
||||
#[derive(TS, Deserialize, Clone, Debug)]
|
||||
#[serde(rename_all = "camelCase", tag = "type")]
|
||||
#[ts(export)]
|
||||
pub enum WebSocketClientMessage {
|
||||
Handshake(WebSocketHandshake),
|
||||
CursorPositions(CursorPositionFromClient),
|
||||
}
|
||||
|
||||
#[derive(TS, Serialize, Clone, Debug)]
|
||||
#[serde(rename_all = "camelCase", tag = "type")]
|
||||
#[ts(export)]
|
||||
pub enum WebSocketServerMessage {
|
||||
VaultUpdate(WebSocketVaultUpdate),
|
||||
CursorPositions(CursorPositionFromServer),
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct WebSocketServerMessageWithOrigin {
|
||||
pub origin_device_id: Option<DeviceId>,
|
||||
pub message: WebSocketServerMessage,
|
||||
}
|
||||
|
||||
impl WebSocketServerMessageWithOrigin {
|
||||
pub fn new(message: WebSocketServerMessage) -> Self {
|
||||
Self {
|
||||
origin_device_id: None,
|
||||
message,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_origin(origin_device_id: DeviceId, message: WebSocketServerMessage) -> Self {
|
||||
Self {
|
||||
origin_device_id: Some(origin_device_id),
|
||||
message,
|
||||
}
|
||||
}
|
||||
}
|
||||
80
backend/sync_server/src/app_state/websocket/utils.rs
Normal file
80
backend/sync_server/src/app_state/websocket/utils.rs
Normal file
|
|
@ -0,0 +1,80 @@
|
|||
use anyhow::Context;
|
||||
use axum::extract::ws::{Message, WebSocket};
|
||||
use futures::{sink::SinkExt, stream::SplitSink};
|
||||
|
||||
use super::models::{WebSocketClientMessage, WebSocketHandshake, WebSocketServerMessage};
|
||||
use crate::{
|
||||
app_state::{
|
||||
AppState,
|
||||
database::models::{DocumentVersionWithoutContent, VaultId, VaultUpdateId},
|
||||
},
|
||||
config::user_config::User,
|
||||
errors::{SyncServerError, server_error, unauthenticated_error},
|
||||
server::auth::auth,
|
||||
};
|
||||
|
||||
pub struct AuthenticatedWebSocketHandshake {
|
||||
pub handshake: WebSocketHandshake,
|
||||
pub user: User,
|
||||
}
|
||||
|
||||
pub fn get_authenticated_handshake(
|
||||
state: &AppState,
|
||||
vault_id: &VaultId,
|
||||
message: Option<Message>,
|
||||
) -> Result<AuthenticatedWebSocketHandshake, SyncServerError> {
|
||||
if let Some(Message::Text(message)) = message {
|
||||
let message: WebSocketClientMessage = serde_json::from_str(&message)
|
||||
.context("Failed to parse message")
|
||||
.map_err(server_error)?;
|
||||
|
||||
match message {
|
||||
WebSocketClientMessage::Handshake(handshake) => {
|
||||
let user = auth(state, handshake.token.trim(), vault_id)?;
|
||||
Ok(AuthenticatedWebSocketHandshake { handshake, user })
|
||||
}
|
||||
WebSocketClientMessage::CursorPositions(_) => Err(unauthenticated_error(
|
||||
anyhow::anyhow!("Expected a handshake message"),
|
||||
)),
|
||||
}
|
||||
} else {
|
||||
Err(unauthenticated_error(anyhow::anyhow!(
|
||||
"Failed to authenticate due to invalid message"
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_unseen_documents(
|
||||
state: &AppState,
|
||||
vault_id: &VaultId,
|
||||
last_seen_vault_update_id: Option<VaultUpdateId>,
|
||||
) -> Result<Vec<DocumentVersionWithoutContent>, SyncServerError> {
|
||||
if let Some(update_id) = last_seen_vault_update_id {
|
||||
state
|
||||
.database
|
||||
.get_latest_documents_since(vault_id, update_id, None)
|
||||
.await
|
||||
.map_err(server_error)
|
||||
} else {
|
||||
state
|
||||
.database
|
||||
.get_latest_documents(vault_id, None)
|
||||
.await
|
||||
.map_err(server_error)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn send_update_over_websocket(
|
||||
update: &WebSocketServerMessage,
|
||||
sender: &mut SplitSink<WebSocket, Message>,
|
||||
) -> Result<(), SyncServerError> {
|
||||
let serialized_update = serde_json::to_string(update)
|
||||
.context("Failed to serialize update")
|
||||
.map_err(server_error)?;
|
||||
|
||||
sender
|
||||
.send(Message::Text(serialized_update))
|
||||
.await
|
||||
.context("Failed to send message over websocket")
|
||||
.map_err(server_error)
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue