Implement the message handlers

This commit is contained in:
Jeffrey Dallatezza
2024-12-02 11:22:22 -05:00
parent 4c53a42fb8
commit 24bec7d65d
2 changed files with 34 additions and 4 deletions
+21 -1
View File
@@ -12,7 +12,9 @@ use crate::util::prometheus_handle::IntGaugeExt;
use crate::worker_metrics::WORKER_METRICS;
use derive_more::From;
use futures::prelude::*;
use spacetimedb_client_api_messages::websocket::{CallReducerFlags, Compression, FormatSwitch};
use spacetimedb_client_api_messages::websocket::{
CallReducerFlags, Compression, FormatSwitch, SubscribeSingle, Unsubscribe,
};
use spacetimedb_lib::identity::RequestId;
use tokio::sync::{mpsc, oneshot, watch};
use tokio::task::AbortHandle;
@@ -283,6 +285,24 @@ impl ClientConnection {
.await
}
pub async fn subscribe_single(&self, subscription: SubscribeSingle, timer: Instant) -> Result<(), DBError> {
let me = self.clone();
tokio::task::spawn_blocking(move || {
me.module
.subscriptions()
.add_subscription(me.sender, subscription, timer, None)
})
.await
.unwrap() // TODO: is unwrapping right here?
}
pub async fn unsubscribe(&self, request: Unsubscribe, timer: Instant) -> Result<(), DBError> {
let me = self.clone();
tokio::task::spawn_blocking(move || me.module.subscriptions().remove_subscription(me.sender, request, timer))
.await
.unwrap() // TODO: is unwrapping right here?
}
pub async fn subscribe(&self, subscription: Subscribe, timer: Instant) -> Result<(), DBError> {
let me = self.clone();
tokio::task::spawn_blocking(move || {
+13 -3
View File
@@ -80,10 +80,20 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst
})
}
ClientMessage::SubscribeSingle(subscription) => {
todo!("subscribe_single");
let res = client.subscribe_single(subscription, timer).await;
WORKER_METRICS
.request_round_trip
.with_label_values(&WorkloadType::Subscribe, &address, "")
.observe(timer.elapsed().as_secs_f64());
res.map_err(|e| (None, None, e.into()))
}
ClientMessage::Unsubscribe(subscription) => {
todo!("subscribe_single");
ClientMessage::Unsubscribe(request) => {
let res = client.unsubscribe(request, timer).await;
WORKER_METRICS
.request_round_trip
.with_label_values(&WorkloadType::Unsubscribe, &address, "")
.observe(timer.elapsed().as_secs_f64());
res.map_err(|e| (None, None, e.into()))
}
ClientMessage::Subscribe(subscription) => {
let res = client.subscribe(subscription, timer).await;