Keep subscription fanout worker warm with adaptive linger policy (#4805)

# Description of Changes

Similar to https://github.com/clockworklabs/SpacetimeDB/pull/4801: after
we evaluate subscriptions on the main database thread, we send the
results to a worker whose job is to fan out the updates to the relevant
clients. We therefore want to make sure this worker is not constantly
parked on `recv()`, since each `send` on the main thread would then
incur the overhead of waking the task.

To avoid this, I've added a utility that wraps a Tokio
`mpsc::UnboundedReceiver` with an adaptive "linger" policy. After
receiving a message, the worker now "lingers" for a period of time,
waiting for more messages, before parking itself on `recv()` again.
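
For a concrete picture, a worker loop built on the new wrapper looks roughly like the sketch below. The message type and worker body are hypothetical stand-ins; `AdaptiveUnboundedReceiver`, its constructor, and the 25µs/200µs linger bounds are from this change:

```rust
use std::time::Duration;
use tokio::sync::mpsc;

// From this PR: the new adaptive receiver wrapper.
use crate::util::adaptive_recv::AdaptiveUnboundedReceiver;

// Hypothetical message type standing in for `SendWorkerMessage`.
struct Update;

async fn run_send_worker(rx: mpsc::UnboundedReceiver<Update>) {
    // Wrap the raw receiver. After each message, `recv()` lingers for
    // 25-200us before parking, so bursty senders don't pay a wakeup
    // on every enqueue.
    let mut rx = AdaptiveUnboundedReceiver::new(
        rx,
        Duration::from_micros(25),  // baseline linger
        Duration::from_micros(200), // max linger after repeated hits
    );
    // `recv()` returns `None` once all senders have been dropped.
    while let Some(update) = rx.recv().await {
        // ... fan the update out to the relevant clients ...
        drop(update);
    }
}
```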

# API and ABI breaking changes

None

# Expected complexity level and risk

1

# Testing

Manual performance testing for now. Automation to follow.
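
As a sketch of what that automation could look like, here is a hypothetical test module for `adaptive_recv`, assuming Tokio's paused test clock (which makes any linger sleeps resolve instantly and deterministically):

```rust
#[cfg(test)]
mod tests {
    use super::AdaptiveUnboundedReceiver;
    use std::time::Duration;
    use tokio::sync::mpsc;

    #[tokio::test(start_paused = true)]
    async fn drains_bursts_and_ends_on_disconnect() {
        let (tx, rx) = mpsc::unbounded_channel();
        let mut rx = AdaptiveUnboundedReceiver::new(
            rx,
            Duration::from_micros(25),
            Duration::from_micros(200),
        );

        // A burst of queued messages is delivered in order; after the
        // first message the receiver is hot and picks up the rest via
        // `try_recv()` without parking in between.
        for i in 0..3 {
            tx.send(i).unwrap();
        }
        for i in 0..3 {
            assert_eq!(rx.recv().await, Some(i));
        }

        // Dropping the sender ends the stream even while the receiver
        // is still hot.
        drop(tx);
        assert_eq!(rx.recv().await, None);
    }
}
```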
joshua-spacetime committed 2026-04-21 18:21:44 -07:00 (via GitHub)
commit 91494c9cf2, parent 78d6b6f7dd
3 changed files with 163 additions and 2 deletions
@@ -10,6 +10,7 @@ use crate::host::module_host::{DatabaseTableUpdate, ModuleEvent, UpdatesRelValue
use crate::subscription::delta::eval_delta;
use crate::subscription::row_list_builder_pool::{BsatnRowListBuilderPool, JsonRowListBuilderFakePool};
use crate::subscription::websocket_building::{BuildableWebsocketFormat, RowListBuilderSource};
+use crate::util::adaptive_recv::AdaptiveUnboundedReceiver;
use crate::worker_metrics::WORKER_METRICS;
type V2EvalUpdatesResult = (Vec<V2ClientUpdate>, Vec<(SubscriptionIdV2, Box<str>)>, ExecutionMetrics);
use core::mem;
@@ -37,6 +38,7 @@ use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
+use std::time::Duration;
use tokio::sync::{mpsc, oneshot};
/// Clients are uniquely identified by their Identity and ConnectionId.
@@ -1715,7 +1717,7 @@ impl SendWorkerClient {
/// See comment on the `send_worker_tx` field in [`SubscriptionManager`] for motivation.
struct SendWorker {
    /// Receiver end of the [`SubscriptionManager`]'s `send_worker_tx` channel.
-    rx: mpsc::UnboundedReceiver<SendWorkerMessage>,
+    rx: AdaptiveUnboundedReceiver<SendWorkerMessage>,
    /// `subscription_send_queue_length` metric labeled for this database's `Identity`.
    ///
@@ -1756,6 +1758,12 @@ impl Drop for SendWorker {
}
impl SendWorker {
+    // Keep the worker warm briefly after handling a message so bursts do not
+    // pay a park/unpark cost on every enqueue, while still parking quickly
+    // once traffic goes quiet.
+    const BASELINE_LINGER: Duration = Duration::from_micros(25);
+    const MAX_LINGER: Duration = Duration::from_micros(200);
+
    fn is_client_dropped_or_cancelled(&self, client_id: &ClientId) -> bool {
        self.clients
            .get(client_id)
@@ -1814,7 +1822,7 @@ impl SendWorker {
        database_identity_to_clean_up_metric: Option<Identity>,
    ) -> Self {
        Self {
-            rx,
+            rx: AdaptiveUnboundedReceiver::new(rx, Self::BASELINE_LINGER, Self::MAX_LINGER),
            queue_length_metric,
            clients: Default::default(),
            database_identity_to_clean_up_metric,
@@ -0,0 +1,152 @@
use std::time::Duration;
use tokio::sync::mpsc::{self, error::TryRecvError};
use tokio::time::sleep;

/// Receives from a Tokio unbounded channel with an adaptive linger policy.
///
/// This helper is intended for single-consumer background workers that want
/// to avoid parking on `recv()` after every message during bursty traffic.
///
/// The receiver has two modes: hot and cold. In cold mode it blocks on
/// `recv()` until the next message arrives. In hot mode it prefers to stay
/// awake: already-queued messages are returned immediately via `try_recv()`,
/// and when the queue is empty the receiver sleeps for a short period (the
/// linger window) before polling the channel again. This keeps the receiver
/// off `recv()` during the linger window, so producers can enqueue more work
/// without waking a parked task.
///
/// The linger policy is as follows: if work is present when a linger window
/// expires (a hit), double the window, up to `max_linger`. If a linger window
/// expires and the queue is still empty (a miss), reset the window to
/// `baseline_linger`.
///
/// Note that messages returned immediately by `try_recv()` do not count as
/// hits and do not double the linger window.
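///
/// For example, with a 25µs baseline and a 200µs cap (the values the send
/// worker uses), consecutive linger hits grow the window
/// 25µs → 50µs → 100µs → 200µs, where it stays until the next miss resets
/// it to the baseline.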
#[derive(Debug)]
pub struct AdaptiveUnboundedReceiver<T> {
    rx: mpsc::UnboundedReceiver<T>,
    linger: AdaptiveLinger,
    is_hot: bool,
}

impl<T> AdaptiveUnboundedReceiver<T> {
    /// Create an adaptive receiver around a Tokio unbounded channel.
    ///
    /// `baseline_linger` is the linger window used after a cold wakeup or any
    /// linger miss. `max_linger` caps how far the linger window may grow after
    /// repeated linger hits.
    ///
    /// This constructor does not spawn any tasks and does not alter the
    /// channel's ordering semantics. It only configures how aggressively the
    /// consumer stays awake after work arrives.
    pub fn new(rx: mpsc::UnboundedReceiver<T>, baseline_linger: Duration, max_linger: Duration) -> Self {
        Self {
            rx,
            linger: AdaptiveLinger::new(baseline_linger, max_linger),
            is_hot: false,
        }
    }

    /// Receive the next message while adapting how aggressively we linger
    /// before parking again.
    ///
    /// Once a worker has been woken up by one message, subsequent calls try to
    /// stay on the hot path:
    ///
    /// 1. Drain any already-queued work immediately with `try_recv()`
    /// 2. If the queue is empty, sleep for the current linger window
    /// 3. When the sleep fires, poll the queue again with `try_recv()`
    /// 4. On a linger hit, double the window and continue lingering
    /// 5. On a linger miss, reset the window to the baseline and park on `recv()`
    ///
    /// This trades a small amount of hot-path latency for lower wake overhead.
    /// While the receiver is hot, senders enqueue into the channel without
    /// waking a parked `recv()` future.
    pub async fn recv(&mut self) -> Option<T> {
        loop {
            if !self.is_hot {
                let message = self.rx.recv().await?;
                self.is_hot = true;
                return Some(message);
            }
            match self.rx.try_recv() {
                Ok(message) => return Some(message),
                Err(TryRecvError::Disconnected) => return None,
                Err(TryRecvError::Empty) => {}
            }
            let linger = self.linger.current();
            // A zero linger window (e.g. a zero baseline) means there is
            // nothing to wait for: go cold and fall through to the blocking
            // `self.rx.recv()` on the next loop iteration.
            if linger.is_zero() {
                self.cool_down();
                continue;
            }
            sleep(linger).await;
            match self.rx.try_recv() {
                Ok(message) => {
                    self.linger.on_hit();
                    return Some(message);
                }
                Err(TryRecvError::Disconnected) => return None,
                Err(TryRecvError::Empty) => {
                    self.cool_down();
                }
            }
        }
    }

    /// Return the receiver to its cold state after a linger miss.
    ///
    /// The next call to [`Self::recv`] will block on the underlying channel
    /// instead of continuing to linger, and the linger policy is reset to its
    /// baseline window.
    fn cool_down(&mut self) {
        self.is_hot = false;
        self.linger.on_miss();
    }
}

#[derive(Debug)]
struct AdaptiveLinger {
    baseline: Duration,
    current: Duration,
    max: Duration,
}

impl AdaptiveLinger {
    /// Create a linger policy with a baseline window and an upper bound.
    ///
    /// `baseline` is the window restored after any linger miss. `max` caps how
    /// far the window may grow after repeated linger hits.
    fn new(baseline: Duration, max: Duration) -> Self {
        assert!(
            baseline <= max,
            "baseline linger ({baseline:?}) must not exceed max linger ({max:?})"
        );
        Self {
            baseline,
            current: baseline,
            max,
        }
    }

    /// Return the current linger window.
    fn current(&self) -> Duration {
        self.current
    }

    /// Record a linger hit by growing the next linger window.
    ///
    /// The window doubles on each hit until it reaches `self.max`.
    fn on_hit(&mut self) {
        self.current = self.current.saturating_mul(2).min(self.max);
    }

    /// Record a linger miss by resetting to the baseline window.
    fn on_miss(&mut self) {
        self.current = self.baseline;
    }
}
@@ -6,6 +6,7 @@ use tracing::Span;
pub mod prometheus_handle;
+pub mod adaptive_recv;
pub mod jobs;
pub mod notify_once;
pub mod thread_scheduling;