Fix slow commit bug
This commit is contained in:
parent
eb23f445d0
commit
0329fc29f2
7 changed files with 88 additions and 40 deletions
|
|
@ -793,23 +793,18 @@ impl Database {
|
|||
.await
|
||||
.context("Failed to commit transaction")?;
|
||||
|
||||
// For non-delete writes the originating device already has
|
||||
// authoritative state from its HTTP response, so we tag the
|
||||
// broadcast with `origin_device_id` and the send task in
|
||||
// `websocket.rs` filters it out for that device. Deletes are
|
||||
// delivered to *every* connected client including the author —
|
||||
// the originator only removes the document from its sync queue
|
||||
// once it receives this receipt.
|
||||
// Broadcast every commit to every connected client, including
|
||||
// the originator. The HTTP response is the originator's normal
|
||||
// path to learn its own update, but if the response is lost
|
||||
// (sync reset, dropped TCP) the broadcast is the only remaining
|
||||
// delivery channel — and the client-side `parentVersionId`
|
||||
// dedup absorbs the redundant message when the response made it
|
||||
// through.
|
||||
let envelope = WebSocketServerMessage::VaultUpdate(WebSocketVaultUpdate {
|
||||
document: version.clone().into(),
|
||||
});
|
||||
let with_origin = if version.is_deleted {
|
||||
WebSocketServerMessageWithOrigin::new(envelope)
|
||||
} else {
|
||||
WebSocketServerMessageWithOrigin::with_origin(version.device_id.clone(), envelope)
|
||||
};
|
||||
self.broadcasts
|
||||
.send_document_update(vault_id, with_origin)?;
|
||||
.send_document_update(vault_id, WebSocketServerMessageWithOrigin::new(envelope))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -80,28 +80,13 @@ pub enum WebSocketServerMessage {
|
|||
CursorPositions(CursorPositionFromServer),
|
||||
}
|
||||
|
||||
/// Broadcast envelope carrying the message plus the device that produced
|
||||
/// it. The per-recipient send task compares `origin_device_id` against
|
||||
/// its own device id to fill in `originates_from_self` before the message
|
||||
/// is serialized on the wire.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct WebSocketServerMessageWithOrigin {
|
||||
pub origin_device_id: Option<DeviceId>,
|
||||
pub message: WebSocketServerMessage,
|
||||
}
|
||||
|
||||
impl WebSocketServerMessageWithOrigin {
|
||||
pub fn new(message: WebSocketServerMessage) -> Self {
|
||||
Self {
|
||||
origin_device_id: None,
|
||||
message,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_origin(origin_device_id: DeviceId, message: WebSocketServerMessage) -> Self {
|
||||
Self {
|
||||
origin_device_id: Some(origin_device_id),
|
||||
message,
|
||||
}
|
||||
Self { message }
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -208,14 +208,24 @@ async fn websocket(
|
|||
loop {
|
||||
match broadcast_receiver.recv().await {
|
||||
Ok(update) => {
|
||||
// Drop messages this device authored because the HTTP
|
||||
// response already carried authoritative state back.
|
||||
// Delete broadcasts are sent without an origin so the
|
||||
// author also receives them — that's the receipt the
|
||||
// client needs to drop the doc from its sync queue.
|
||||
if Some(&device_id) == update.origin_device_id.as_ref() {
|
||||
continue;
|
||||
}
|
||||
// Always deliver vault updates to the originating
|
||||
// device too. The HTTP response is the *normal* path
|
||||
// for the originator to learn its own update, and
|
||||
// the client-side wire loop dedupes redundant
|
||||
// broadcasts via the `parentVersionId` check. But
|
||||
// when the response is lost mid-flight (sync reset,
|
||||
// pause/resume, dropped TCP) the originator has no
|
||||
// record of the doc; if the broadcast is also
|
||||
// suppressed AND the next handshake's cursor was
|
||||
// captured before the commit (cursor < vuid), the
|
||||
// doc falls through both delivery paths and is
|
||||
// stranded forever on the originator. Letting the
|
||||
// self-broadcast through closes that window — the
|
||||
// message processes as a remote create on the
|
||||
// originator's reconnected WS and the file is
|
||||
// restored. (Cursor messages still get the inner
|
||||
// self-filter below; we drop our own cursor entries
|
||||
// from the `clients` payload.)
|
||||
|
||||
// Filter out vault updates already covered by the
|
||||
// catch-up snapshot. The handshake atomically
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue