From 24bec7d65dfdca9c7df0af6772e3768a8fed7bf8 Mon Sep 17 00:00:00 2001 From: Jeffrey Dallatezza Date: Mon, 2 Dec 2024 11:22:22 -0500 Subject: [PATCH] Implement the message handlers --- crates/core/src/client/client_connection.rs | 22 ++++++++++++++++++++- crates/core/src/client/message_handlers.rs | 16 ++++++++++++--- 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/crates/core/src/client/client_connection.rs b/crates/core/src/client/client_connection.rs index 4b49bf6b47..92621c631a 100644 --- a/crates/core/src/client/client_connection.rs +++ b/crates/core/src/client/client_connection.rs @@ -12,7 +12,9 @@ use crate::util::prometheus_handle::IntGaugeExt; use crate::worker_metrics::WORKER_METRICS; use derive_more::From; use futures::prelude::*; -use spacetimedb_client_api_messages::websocket::{CallReducerFlags, Compression, FormatSwitch}; +use spacetimedb_client_api_messages::websocket::{ + CallReducerFlags, Compression, FormatSwitch, SubscribeSingle, Unsubscribe, +}; use spacetimedb_lib::identity::RequestId; use tokio::sync::{mpsc, oneshot, watch}; use tokio::task::AbortHandle; @@ -283,6 +285,24 @@ impl ClientConnection { .await } + pub async fn subscribe_single(&self, subscription: SubscribeSingle, timer: Instant) -> Result<(), DBError> { + let me = self.clone(); + tokio::task::spawn_blocking(move || { + me.module + .subscriptions() + .add_subscription(me.sender, subscription, timer, None) + }) + .await + .unwrap() // TODO: is unwrapping right here? + } + + pub async fn unsubscribe(&self, request: Unsubscribe, timer: Instant) -> Result<(), DBError> { + let me = self.clone(); + tokio::task::spawn_blocking(move || me.module.subscriptions().remove_subscription(me.sender, request, timer)) + .await + .unwrap() // TODO: is unwrapping right here? + } + pub async fn subscribe(&self, subscription: Subscribe, timer: Instant) -> Result<(), DBError> { let me = self.clone(); tokio::task::spawn_blocking(move || { diff --git a/crates/core/src/client/message_handlers.rs b/crates/core/src/client/message_handlers.rs index ce7a0b9ebb..c2e4312d42 100644 --- a/crates/core/src/client/message_handlers.rs +++ b/crates/core/src/client/message_handlers.rs @@ -80,10 +80,20 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst }) } ClientMessage::SubscribeSingle(subscription) => { - todo!("subscribe_single"); + let res = client.subscribe_single(subscription, timer).await; + WORKER_METRICS + .request_round_trip + .with_label_values(&WorkloadType::Subscribe, &address, "") + .observe(timer.elapsed().as_secs_f64()); + res.map_err(|e| (None, None, e.into())) } - ClientMessage::Unsubscribe(subscription) => { - todo!("subscribe_single"); + ClientMessage::Unsubscribe(request) => { + let res = client.unsubscribe(request, timer).await; + WORKER_METRICS + .request_round_trip + .with_label_values(&WorkloadType::Unsubscribe, &address, "") + .observe(timer.elapsed().as_secs_f64()); + res.map_err(|e| (None, None, e.into())) } ClientMessage::Subscribe(subscription) => { let res = client.subscribe(subscription, timer).await;