Don't fail on disconnected WebSocket connection
This commit is contained in:
parent
85648c9466
commit
de26f513e9
5 changed files with 16 additions and 15 deletions
|
|
@ -4,10 +4,7 @@ use anyhow::Context;
|
||||||
use tokio::sync::{Mutex, broadcast};
|
use tokio::sync::{Mutex, broadcast};
|
||||||
|
|
||||||
use super::database::models::{DocumentVersionWithoutContent, VaultId};
|
use super::database::models::{DocumentVersionWithoutContent, VaultId};
|
||||||
use crate::{
|
use crate::{config::server_config::ServerConfig, errors::server_error};
|
||||||
config::server_config::ServerConfig,
|
|
||||||
errors::{SyncServerError, server_error},
|
|
||||||
};
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct Broadcasts {
|
pub struct Broadcasts {
|
||||||
|
|
@ -32,18 +29,17 @@ impl Broadcasts {
|
||||||
tx.subscribe()
|
tx.subscribe()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn send(
|
pub async fn send(&self, vault: VaultId, document: DocumentVersionWithoutContent) {
|
||||||
&self,
|
|
||||||
vault: VaultId,
|
|
||||||
document: DocumentVersionWithoutContent,
|
|
||||||
) -> Result<(), SyncServerError> {
|
|
||||||
let tx = self.get_or_create(vault).await;
|
let tx = self.get_or_create(vault).await;
|
||||||
|
|
||||||
tx.send(document)
|
let result = tx
|
||||||
|
.send(document)
|
||||||
.context("Cannot broadcast update message to websocket listeners")
|
.context("Cannot broadcast update message to websocket listeners")
|
||||||
.map_err(server_error)?;
|
.map_err(server_error);
|
||||||
|
|
||||||
Ok(())
|
if result.is_err() {
|
||||||
|
log::debug!("Failed to send message: {result:?}");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_or_create(
|
async fn get_or_create(
|
||||||
|
|
|
||||||
|
|
@ -146,7 +146,7 @@ async fn internal_create_document(
|
||||||
state
|
state
|
||||||
.broadcasts
|
.broadcasts
|
||||||
.send(vault_id, new_version.clone().into())
|
.send(vault_id, new_version.clone().into())
|
||||||
.await?;
|
.await;
|
||||||
|
|
||||||
Ok(Json(new_version.into()))
|
Ok(Json(new_version.into()))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -75,7 +75,7 @@ pub async fn delete_document(
|
||||||
state
|
state
|
||||||
.broadcasts
|
.broadcasts
|
||||||
.send(vault_id, new_version.clone().into())
|
.send(vault_id, new_version.clone().into())
|
||||||
.await?;
|
.await;
|
||||||
|
|
||||||
Ok(Json(new_version.into()))
|
Ok(Json(new_version.into()))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -221,7 +221,7 @@ async fn internal_update_document(
|
||||||
state
|
state
|
||||||
.broadcasts
|
.broadcasts
|
||||||
.send(vault_id, new_version.clone().into())
|
.send(vault_id, new_version.clone().into())
|
||||||
.await?;
|
.await;
|
||||||
|
|
||||||
Ok(Json(if is_different_from_request_content {
|
Ok(Json(if is_different_from_request_content {
|
||||||
DocumentUpdateResponse::MergingUpdate(new_version.into())
|
DocumentUpdateResponse::MergingUpdate(new_version.into())
|
||||||
|
|
|
||||||
|
|
@ -123,6 +123,11 @@ async fn websocket(
|
||||||
.context("Websocket send task failed")
|
.context("Websocket send task failed")
|
||||||
.map_err(server_error)??;
|
.map_err(server_error)??;
|
||||||
|
|
||||||
|
recv_task
|
||||||
|
.await
|
||||||
|
.context("Websocket receive task failed")
|
||||||
|
.map_err(server_error)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue