From 2d43c652a8a5c8d79c3ec2814abc44ffc0dddec7 Mon Sep 17 00:00:00 2001 From: Adam Lamers Date: Wed, 8 Apr 2026 01:52:31 -0400 Subject: [PATCH] rust formatting --- spacetimedb/src/lib.rs | 20 +- spacetimedb/src/reducers.rs | 564 +++++++++++++++++++++----- spacetimedb/src/tables.rs | 4 + spacetimedb/src/utils.rs | 154 +++++-- spacetimedb/src/views.rs | 192 ++++++--- src/chat/services/messaging.svelte.ts | 26 ++ 6 files changed, 770 insertions(+), 190 deletions(-) diff --git a/spacetimedb/src/lib.rs b/spacetimedb/src/lib.rs index 19118f1..4b65cf2 100644 --- a/spacetimedb/src/lib.rs +++ b/spacetimedb/src/lib.rs @@ -1,18 +1,24 @@ use spacetimedb::{ReducerContext, Table}; -mod tables; mod reducers; +mod tables; mod utils; mod views; -pub use tables::*; pub use reducers::*; +pub use tables::*; pub use utils::*; pub use views::*; #[spacetimedb::reducer(init)] pub fn init(ctx: &ReducerContext) { - if ctx.db.system_configuration().key().find("max_message_length".to_string()).is_none() { + if ctx + .db + .system_configuration() + .key() + .find("max_message_length".to_string()) + .is_none() + { ctx.db.system_configuration().insert(SystemConfiguration { key: "max_message_length".to_string(), value: "262144".to_string(), @@ -44,7 +50,7 @@ pub fn init(ctx: &ReducerContext) { #[spacetimedb::reducer(client_connected)] pub fn on_connect(ctx: &ReducerContext) { log::info!("on_connect START: identity={}", ctx.sender().to_hex()); - + // We'll keep this extremely minimal to ensure connection stability if let Some(mut user) = ctx.db.user().identity().find(ctx.sender()) { user.online = true; @@ -62,7 +68,7 @@ pub fn on_connect(ctx: &ReducerContext) { biography: None, status: None, }); - + // Minimal auto-join let community_server = ctx.db.server().name().filter(&"Zep".to_string()).next(); if let Some(s) = community_server { @@ -83,11 +89,11 @@ pub fn on_disconnect(ctx: &ReducerContext) { user.online = false; ctx.db.user().identity().update(user); } - + if let Some(vs) = ctx.db.voice_state().identity().find(ctx.sender()) { ctx.db.voice_state().delete(vs); } - + if let Some(ta) = ctx.db.typing_activity().identity().find(ctx.sender()) { ctx.db.typing_activity().delete(ta); } diff --git a/spacetimedb/src/reducers.rs b/spacetimedb/src/reducers.rs index 709cf52..bc47743 100644 --- a/spacetimedb/src/reducers.rs +++ b/spacetimedb/src/reducers.rs @@ -1,6 +1,6 @@ -use spacetimedb::{ReducerContext, Identity, Table}; use crate::tables::*; use crate::utils::*; +use spacetimedb::{Identity, ReducerContext, Table}; #[spacetimedb::reducer] pub fn set_typing(ctx: &ReducerContext, channel_id: u64, typing: bool) { @@ -20,7 +20,13 @@ pub fn set_typing(ctx: &ReducerContext, channel_id: u64, typing: bool) { } #[spacetimedb::reducer] -pub fn upload_image(ctx: &ReducerContext, data: Vec, mime_type: String, name: Option, client_id: Option) { +pub fn upload_image( + ctx: &ReducerContext, + data: Vec, + mime_type: String, + name: Option, + client_id: Option, +) { if let Some(ref cid) = client_id { if let Some(_) = ctx.db.upload_status().client_id().find(cid.clone()) { ctx.db.upload_status().client_id().delete(cid.clone()); @@ -80,7 +86,12 @@ pub fn upload_custom_emoji(ctx: &ReducerContext, name: String, category: String, if data.len() > 256 * 1024 { panic!("Emoji image exceeds 256KB limit"); } - ctx.db.custom_emoji().insert(CustomEmoji { id: 0, name, category, data }); + ctx.db.custom_emoji().insert(CustomEmoji { + id: 0, + name, + category, + data, + }); } #[spacetimedb::reducer] @@ -122,11 +133,21 @@ pub fn upload_banner(ctx: &ReducerContext, data: Vec, mime_type: String) { } #[spacetimedb::reducer] -pub fn upload_server_avatar(ctx: &ReducerContext, server_id: u64, data: Vec, mime_type: String) { +pub fn upload_server_avatar( + ctx: &ReducerContext, + server_id: u64, + data: Vec, + mime_type: String, +) { if data.len() > 1024 * 1024 { panic!("Avatar exceeds 1MB limit"); } - let mut s = ctx.db.server().id().find(server_id).expect("Server not found"); + let mut s = ctx + .db + .server() + .id() + .find(server_id) + .expect("Server not found"); let img = ctx.db.image().insert(Image { id: 0, data, @@ -140,29 +161,57 @@ pub fn upload_server_avatar(ctx: &ReducerContext, server_id: u64, data: Vec, #[spacetimedb::reducer] pub fn update_server_name(ctx: &ReducerContext, server_id: u64, name: String) { validate_name(&name).expect("Invalid name"); - let mut s = ctx.db.server().id().find(server_id).expect("Server not found"); + let mut s = ctx + .db + .server() + .id() + .find(server_id) + .expect("Server not found"); s.name = name; ctx.db.server().id().update(s); } #[spacetimedb::reducer] pub fn delete_server(ctx: &ReducerContext, server_id: u64) { - let _ = ctx.db.server().id().find(server_id).expect("Server not found"); + let _ = ctx + .db + .server() + .id() + .find(server_id) + .expect("Server not found"); let channels: Vec<_> = ctx.db.channel().server_id().filter(server_id).collect(); for c in channels { - let messages: Vec<_> = ctx.db.message().channel_id().filter(c.id).map(|m| m.id).collect(); + let messages: Vec<_> = ctx + .db + .message() + .channel_id() + .filter(c.id) + .map(|m| m.id) + .collect(); for id in messages { ctx.db.message().id().delete(id); } - let recent: Vec<_> = ctx.db.recent_message().channel_id().filter(c.id).map(|rm| rm.id).collect(); + let recent: Vec<_> = ctx + .db + .recent_message() + .channel_id() + .filter(c.id) + .map(|rm| rm.id) + .collect(); for id in recent { ctx.db.recent_message().id().delete(id); } ctx.db.channel().id().delete(c.id); } - let members: Vec<_> = ctx.db.server_member().server_id().filter(server_id).map(|m| m.id).collect(); + let members: Vec<_> = ctx + .db + .server_member() + .server_id() + .filter(server_id) + .map(|m| m.id) + .collect(); for id in members { ctx.db.server_member().id().delete(id); } @@ -171,29 +220,63 @@ pub fn delete_server(ctx: &ReducerContext, server_id: u64) { } #[spacetimedb::reducer] -pub fn toggle_reaction(ctx: &ReducerContext, message_id: u64, emoji: Option, custom_emoji_id: Option) { +pub fn toggle_reaction( + ctx: &ReducerContext, + message_id: u64, + emoji: Option, + custom_emoji_id: Option, +) { if emoji.is_none() && custom_emoji_id.is_none() { panic!("Emoji or CustomEmojiId required"); } - let user = ctx.db.user().identity().find(ctx.sender()).expect("User not found"); + let user = ctx + .db + .user() + .identity() + .find(ctx.sender()) + .expect("User not found"); if (user.subject.is_none() && user.name.is_none()) { panic!("You must have a name or be logged in via OIDC to perform this action"); } - - let existing = ctx.db.message_reaction().message_id().filter(message_id).find(|r| { - if r.identity != ctx.sender() { return false; } - if emoji.is_some() && r.emoji == emoji { return true; } - if custom_emoji_id.is_some() && r.custom_emoji_id == custom_emoji_id { return true; } - false - }); + let existing = ctx + .db + .message_reaction() + .message_id() + .filter(message_id) + .find(|r| { + if r.identity != ctx.sender() { + return false; + } + if emoji.is_some() && r.emoji == emoji { + return true; + } + if custom_emoji_id.is_some() && r.custom_emoji_id == custom_emoji_id { + return true; + } + false + }); if let Some(r) = existing { ctx.db.message_reaction().id().delete(r.id); } else { + let msg = ctx + .db + .message() + .id() + .find(message_id) + .expect("Message not found"); + let channel = ctx + .db + .channel() + .id() + .find(msg.channel_id) + .expect("Channel not found"); + ctx.db.message_reaction().insert(MessageReaction { id: 0, + server_id: channel.server_id, message_id, identity: ctx.sender(), emoji, @@ -205,53 +288,89 @@ pub fn toggle_reaction(ctx: &ReducerContext, message_id: u64, emoji: Option) { - let mut user = ctx.db.user().identity().find(ctx.sender()).expect("User not found"); + let mut user = ctx + .db + .user() + .identity() + .find(ctx.sender()) + .expect("User not found"); user.avatar_id = avatar_id; ctx.db.user().identity().update(user); } #[spacetimedb::reducer] pub fn set_banner(ctx: &ReducerContext, banner_id: Option) { - let mut user = ctx.db.user().identity().find(ctx.sender()).expect("User not found"); + let mut user = ctx + .db + .user() + .identity() + .find(ctx.sender()) + .expect("User not found"); user.banner_id = banner_id; ctx.db.user().identity().update(user); } #[spacetimedb::reducer] pub fn set_biography(ctx: &ReducerContext, biography: Option) { - let mut user = ctx.db.user().identity().find(ctx.sender()).expect("User not found"); + let mut user = ctx + .db + .user() + .identity() + .find(ctx.sender()) + .expect("User not found"); user.biography = biography; ctx.db.user().identity().update(user); } #[spacetimedb::reducer] pub fn set_status(ctx: &ReducerContext, status: Option) { - let mut user = ctx.db.user().identity().find(ctx.sender()).expect("User not found"); + let mut user = ctx + .db + .user() + .identity() + .find(ctx.sender()) + .expect("User not found"); user.status = status; ctx.db.user().identity().update(user); } #[spacetimedb::reducer] pub fn subscribe_to_channel(ctx: &ReducerContext, channel_id: u64) { - let hwm = ctx.db.channel_high_water_mark().channel_id().find(channel_id); + let hwm = ctx + .db + .channel_high_water_mark() + .channel_id() + .find(channel_id); let current_max = hwm.map(|h| h.last_seq_id).unwrap_or(0); - let earliest = if current_max > 99 { current_max - 99 } else { 1 }; + let earliest = if current_max > 99 { + current_max - 99 + } else { + 1 + }; if let Some(existing) = ctx.db.channel_subscription().identity().find(ctx.sender()) { if existing.channel_id != channel_id { - ctx.db.channel_subscription().identity().update(ChannelSubscription { - identity: ctx.sender(), - channel_id, - earliest_seq_id: earliest, - last_read_seq_id: current_max, - }); + ctx.db + .channel_subscription() + .identity() + .update(ChannelSubscription { + identity: ctx.sender(), + channel_id, + earliest_seq_id: earliest, + last_read_seq_id: current_max, + }); } } else { ctx.db.channel_subscription().insert(ChannelSubscription { @@ -293,12 +412,22 @@ pub fn set_talking(ctx: &ReducerContext, talking: bool, channel_id: u64) { #[spacetimedb::reducer] pub fn create_server(ctx: &ReducerContext, name: String) { validate_name(&name).expect("Invalid name"); - let user = ctx.db.user().identity().find(ctx.sender()).expect("User not found"); + let user = ctx + .db + .user() + .identity() + .find(ctx.sender()) + .expect("User not found"); if (user.subject.is_none() && user.name.is_none()) { panic!("You must have a name or be logged in via OIDC to perform this action"); } - let s = ctx.db.server().insert(Server { id: 0, name, owner: Some(ctx.sender()), avatar_id: None }); + let s = ctx.db.server().insert(Server { + id: 0, + name, + owner: Some(ctx.sender()), + avatar_id: None, + }); ctx.db.server_member().insert(ServerMember { id: 0, identity: ctx.sender(), @@ -320,15 +449,27 @@ pub fn create_server(ctx: &ReducerContext, name: String) { #[spacetimedb::reducer] pub fn join_server(ctx: &ReducerContext, server_id: u64) { - let user = ctx.db.user().identity().find(ctx.sender()).expect("User not found"); + let user = ctx + .db + .user() + .identity() + .find(ctx.sender()) + .expect("User not found"); if (user.subject.is_none() && user.name.is_none()) { panic!("You must have a name or be logged in via OIDC to perform this action"); } - let _ = ctx.db.server().id().find(server_id).expect("Server not found"); + let _ = ctx + .db + .server() + .id() + .find(server_id) + .expect("Server not found"); for m in ctx.db.server_member().identity().filter(ctx.sender()) { - if m.server_id == server_id { return; } + if m.server_id == server_id { + return; + } } ctx.db.server_member().insert(ServerMember { @@ -340,7 +481,13 @@ pub fn join_server(ctx: &ReducerContext, server_id: u64) { #[spacetimedb::reducer] pub fn leave_server(ctx: &ReducerContext, server_id: u64) { - let members: Vec<_> = ctx.db.server_member().identity().filter(ctx.sender()).map(|m| m.id).collect(); + let members: Vec<_> = ctx + .db + .server_member() + .identity() + .filter(ctx.sender()) + .map(|m| m.id) + .collect(); for id in members { if let Some(m) = ctx.db.server_member().id().find(id) { if m.server_id == server_id { @@ -353,28 +500,52 @@ pub fn leave_server(ctx: &ReducerContext, server_id: u64) { #[spacetimedb::reducer] pub fn create_channel(ctx: &ReducerContext, name: String, server_id: u64, is_voice: bool) { validate_name(&name).expect("Invalid name"); - let user = ctx.db.user().identity().find(ctx.sender()).expect("User not found"); + let user = ctx + .db + .user() + .identity() + .find(ctx.sender()) + .expect("User not found"); if (user.subject.is_none() && user.name.is_none()) { panic!("You must have a name or be logged in via OIDC to perform this action"); } - let _ = ctx.db.server().id().find(server_id).expect("Server not found"); + let _ = ctx + .db + .server() + .id() + .find(server_id) + .expect("Server not found"); ctx.db.channel().insert(Channel { id: 0, server_id, name, - kind: if is_voice { ChannelKind::Voice } else { ChannelKind::Text }, + kind: if is_voice { + ChannelKind::Voice + } else { + ChannelKind::Text + }, }); } #[spacetimedb::reducer] pub fn join_voice(ctx: &ReducerContext, channel_id: u64) { - let user = ctx.db.user().identity().find(ctx.sender()).expect("User not found"); + let user = ctx + .db + .user() + .identity() + .find(ctx.sender()) + .expect("User not found"); if (user.subject.is_none() && user.name.is_none()) { panic!("You must have a name or be logged in via OIDC to perform this action"); } - let chan = ctx.db.channel().id().find(channel_id).expect("Invalid channel"); + let chan = ctx + .db + .channel() + .id() + .find(channel_id) + .expect("Invalid channel"); if !matches!(chan.kind, ChannelKind::Voice) { panic!("Invalid voice channel"); } @@ -408,17 +579,49 @@ pub fn set_sharing_screen(ctx: &ReducerContext, sharing: bool) { ctx.db.voice_state().identity().update(state); if !sharing { - let watching: Vec<_> = ctx.db.watching().watchee().filter(ctx.sender()).map(|w| w.id).collect(); - for id in watching { ctx.db.watching().id().delete(id); } - - let s_offers: Vec<_> = ctx.db.screen_sdp_offer().sender().filter(ctx.sender()).map(|r| r.id).collect(); - for id in s_offers { ctx.db.screen_sdp_offer().id().delete(id); } - - let s_answers: Vec<_> = ctx.db.screen_sdp_answer().sender().filter(ctx.sender()).map(|r| r.id).collect(); - for id in s_answers { ctx.db.screen_sdp_answer().id().delete(id); } - - let s_ice: Vec<_> = ctx.db.screen_ice_candidate().sender().filter(ctx.sender()).map(|r| r.id).collect(); - for id in s_ice { ctx.db.screen_ice_candidate().id().delete(id); } + let watching: Vec<_> = ctx + .db + .watching() + .watchee() + .filter(ctx.sender()) + .map(|w| w.id) + .collect(); + for id in watching { + ctx.db.watching().id().delete(id); + } + + let s_offers: Vec<_> = ctx + .db + .screen_sdp_offer() + .sender() + .filter(ctx.sender()) + .map(|r| r.id) + .collect(); + for id in s_offers { + ctx.db.screen_sdp_offer().id().delete(id); + } + + let s_answers: Vec<_> = ctx + .db + .screen_sdp_answer() + .sender() + .filter(ctx.sender()) + .map(|r| r.id) + .collect(); + for id in s_answers { + ctx.db.screen_sdp_answer().id().delete(id); + } + + let s_ice: Vec<_> = ctx + .db + .screen_ice_candidate() + .sender() + .filter(ctx.sender()) + .map(|r| r.id) + .collect(); + for id in s_ice { + ctx.db.screen_ice_candidate().id().delete(id); + } } } } @@ -441,10 +644,14 @@ pub fn set_deafen(ctx: &ReducerContext, deafened: bool) { #[spacetimedb::reducer] pub fn start_watching(ctx: &ReducerContext, watchee: Identity, channel_id: u64) { - if ctx.sender() == watchee { return; } + if ctx.sender() == watchee { + return; + } for w in ctx.db.watching().watcher().filter(ctx.sender()) { - if w.watchee == watchee { return; } + if w.watchee == watchee { + return; + } } ctx.db.watching().insert(Watching { id: 0, @@ -456,9 +663,14 @@ pub fn start_watching(ctx: &ReducerContext, watchee: Identity, channel_id: u64) #[spacetimedb::reducer] pub fn stop_watching(ctx: &ReducerContext, watchee: Identity) { - let to_delete: Vec<_> = ctx.db.watching().watcher().filter(ctx.sender()) + let to_delete: Vec<_> = ctx + .db + .watching() + .watcher() + .filter(ctx.sender()) .filter(|w| w.watchee == watchee) - .map(|w| w.id).collect(); + .map(|w| w.id) + .collect(); for id in to_delete { ctx.db.watching().id().delete(id); } @@ -473,7 +685,12 @@ pub fn leave_voice(ctx: &ReducerContext) { } #[spacetimedb::reducer] -pub fn send_voice_sdp_offer(ctx: &ReducerContext, receiver: Identity, sdp: String, channel_id: u64) { +pub fn send_voice_sdp_offer( + ctx: &ReducerContext, + receiver: Identity, + sdp: String, + channel_id: u64, +) { ctx.db.voice_sdp_offer().insert(VoiceSdpOffer { id: 0, sender: ctx.sender(), @@ -484,7 +701,12 @@ pub fn send_voice_sdp_offer(ctx: &ReducerContext, receiver: Identity, sdp: Strin } #[spacetimedb::reducer] -pub fn send_voice_sdp_answer(ctx: &ReducerContext, receiver: Identity, sdp: String, channel_id: u64) { +pub fn send_voice_sdp_answer( + ctx: &ReducerContext, + receiver: Identity, + sdp: String, + channel_id: u64, +) { ctx.db.voice_sdp_answer().insert(VoiceSdpAnswer { id: 0, sender: ctx.sender(), @@ -495,7 +717,12 @@ pub fn send_voice_sdp_answer(ctx: &ReducerContext, receiver: Identity, sdp: Stri } #[spacetimedb::reducer] -pub fn send_voice_ice_candidate(ctx: &ReducerContext, receiver: Identity, candidate: String, channel_id: u64) { +pub fn send_voice_ice_candidate( + ctx: &ReducerContext, + receiver: Identity, + candidate: String, + channel_id: u64, +) { ctx.db.voice_ice_candidate().insert(VoiceIceCandidate { id: 0, sender: ctx.sender(), @@ -506,7 +733,12 @@ pub fn send_voice_ice_candidate(ctx: &ReducerContext, receiver: Identity, candid } #[spacetimedb::reducer] -pub fn send_screen_sdp_offer(ctx: &ReducerContext, receiver: Identity, sdp: String, channel_id: u64) { +pub fn send_screen_sdp_offer( + ctx: &ReducerContext, + receiver: Identity, + sdp: String, + channel_id: u64, +) { ctx.db.screen_sdp_offer().insert(ScreenSdpOffer { id: 0, sender: ctx.sender(), @@ -517,7 +749,12 @@ pub fn send_screen_sdp_offer(ctx: &ReducerContext, receiver: Identity, sdp: Stri } #[spacetimedb::reducer] -pub fn send_screen_sdp_answer(ctx: &ReducerContext, receiver: Identity, sdp: String, channel_id: u64) { +pub fn send_screen_sdp_answer( + ctx: &ReducerContext, + receiver: Identity, + sdp: String, + channel_id: u64, +) { ctx.db.screen_sdp_answer().insert(ScreenSdpAnswer { id: 0, sender: ctx.sender(), @@ -528,7 +765,12 @@ pub fn send_screen_sdp_answer(ctx: &ReducerContext, receiver: Identity, sdp: Str } #[spacetimedb::reducer] -pub fn send_screen_ice_candidate(ctx: &ReducerContext, receiver: Identity, candidate: String, channel_id: u64) { +pub fn send_screen_ice_candidate( + ctx: &ReducerContext, + receiver: Identity, + candidate: String, + channel_id: u64, +) { ctx.db.screen_ice_candidate().insert(ScreenIceCandidate { id: 0, sender: ctx.sender(), @@ -541,9 +783,14 @@ pub fn send_screen_ice_candidate(ctx: &ReducerContext, receiver: Identity, candi #[spacetimedb::reducer] pub fn set_configuration(ctx: &ReducerContext, key: String, value: String) { if let Some(_) = ctx.db.system_configuration().key().find(key.clone()) { - ctx.db.system_configuration().key().update(SystemConfiguration { key, value }); + ctx.db + .system_configuration() + .key() + .update(SystemConfiguration { key, value }); } else { - ctx.db.system_configuration().insert(SystemConfiguration { key, value }); + ctx.db + .system_configuration() + .insert(SystemConfiguration { key, value }); } } @@ -552,19 +799,34 @@ pub fn create_thread(ctx: &ReducerContext, name: String, channel_id: u64, parent let mut thread_name = name; if thread_name.trim().is_empty() { let parent_msg = ctx.db.message().id().find(parent_message_id); - thread_name = parent_msg.map(|m| { - if m.text.trim().is_empty() { "New Thread".to_string() } - else { m.text.chars().take(32).collect() } - }).unwrap_or("New Thread".to_string()); + thread_name = parent_msg + .map(|m| { + if m.text.trim().is_empty() { + "New Thread".to_string() + } else { + m.text.chars().take(32).collect() + } + }) + .unwrap_or("New Thread".to_string()); } validate_name(&thread_name).expect("Invalid name"); - let user = ctx.db.user().identity().find(ctx.sender()).expect("User not found"); + let user = ctx + .db + .user() + .identity() + .find(ctx.sender()) + .expect("User not found"); if (user.subject.is_none() && user.name.is_none()) { panic!("You must have a name or be logged in via OIDC to perform this action"); } - let _ = ctx.db.message().id().find(parent_message_id).expect("Parent message not found"); + let _ = ctx + .db + .message() + .id() + .find(parent_message_id) + .expect("Parent message not found"); ctx.db.thread().insert(Thread { id: 0, @@ -575,7 +837,14 @@ pub fn create_thread(ctx: &ReducerContext, name: String, channel_id: u64, parent } #[spacetimedb::reducer] -pub fn create_thread_with_message(ctx: &ReducerContext, name: String, channel_id: u64, parent_message_id: u64, text: String, image_ids: Vec) { +pub fn create_thread_with_message( + ctx: &ReducerContext, + name: String, + channel_id: u64, + parent_message_id: u64, + text: String, + image_ids: Vec, +) { if text.trim().is_empty() && image_ids.is_empty() { panic!("Messages must not be empty"); } @@ -584,12 +853,22 @@ pub fn create_thread_with_message(ctx: &ReducerContext, name: String, channel_id validate_message_length(&ctx.db, &text).expect("Message too long"); } - let user = ctx.db.user().identity().find(ctx.sender()).expect("User not found"); + let user = ctx + .db + .user() + .identity() + .find(ctx.sender()) + .expect("User not found"); if (user.subject.is_none() && user.name.is_none()) { panic!("You must have a name or be logged in via OIDC to perform this action"); } - let _ = ctx.db.message().id().find(parent_message_id).expect("Parent message not found"); + let _ = ctx + .db + .message() + .id() + .find(parent_message_id) + .expect("Parent message not found"); let t = ctx.db.thread().insert(Thread { id: 0, @@ -608,25 +887,33 @@ pub fn create_thread_with_message(ctx: &ReducerContext, name: String, channel_id }); let seq_id = get_next_seq_id(&ctx.db, channel_id); - ctx.db.channel_message_sequence().insert(ChannelMessageSequence { - message_id: msg.id, - channel_id, - seq_id, - }); + ctx.db + .channel_message_sequence() + .insert(ChannelMessageSequence { + message_id: msg.id, + channel_id, + seq_id, + }); + + let chan = ctx + .db + .channel() + .id() + .find(channel_id) + .expect("Channel not found"); for image_id in image_ids { ctx.db.message_image().insert(MessageImage { id: 0, + server_id: chan.server_id, message_id: msg.id, image_id, }); } - let channel = ctx.db.channel().id().find(channel_id).expect("Channel not found"); - ctx.db.recent_message().insert(RecentMessage { id: 0, - server_id: channel.server_id, + server_id: chan.server_id, channel_id, message_id: msg.id, sender: ctx.sender(), @@ -638,7 +925,11 @@ pub fn create_thread_with_message(ctx: &ReducerContext, name: String, channel_id if seq_id > 100 { let old_seq_id = seq_id - 100; - let to_delete: Vec<_> = ctx.db.recent_message().channel_id().filter(channel_id) + let to_delete: Vec<_> = ctx + .db + .recent_message() + .channel_id() + .filter(channel_id) .filter(|m| m.seq_id <= old_seq_id) .map(|m| m.id) .collect(); @@ -649,7 +940,13 @@ pub fn create_thread_with_message(ctx: &ReducerContext, name: String, channel_id } #[spacetimedb::reducer] -pub fn send_message(ctx: &ReducerContext, text: String, channel_id: u64, thread_id: Option, image_ids: Vec) { +pub fn send_message( + ctx: &ReducerContext, + text: String, + channel_id: u64, + thread_id: Option, + image_ids: Vec, +) { if text.trim().is_empty() && image_ids.is_empty() { panic!("Messages must not be empty"); } @@ -658,7 +955,12 @@ pub fn send_message(ctx: &ReducerContext, text: String, channel_id: u64, thread_ validate_message_length(&ctx.db, &text).expect("Message too long"); } - let _user = ctx.db.user().identity().find(ctx.sender()).expect("You must be registered to send messages"); + let _user = ctx + .db + .user() + .identity() + .find(ctx.sender()) + .expect("You must be registered to send messages"); let msg = ctx.db.message().insert(Message { id: 0, @@ -670,25 +972,33 @@ pub fn send_message(ctx: &ReducerContext, text: String, channel_id: u64, thread_ }); let seq_id = get_next_seq_id(&ctx.db, channel_id); - ctx.db.channel_message_sequence().insert(ChannelMessageSequence { - message_id: msg.id, - channel_id, - seq_id, - }); + ctx.db + .channel_message_sequence() + .insert(ChannelMessageSequence { + message_id: msg.id, + channel_id, + seq_id, + }); + + let chan = ctx + .db + .channel() + .id() + .find(channel_id) + .expect("Channel not found"); for image_id in image_ids { ctx.db.message_image().insert(MessageImage { id: 0, + server_id: chan.server_id, message_id: msg.id, image_id, }); } - let channel = ctx.db.channel().id().find(channel_id).expect("Channel not found"); - ctx.db.recent_message().insert(RecentMessage { id: 0, - server_id: channel.server_id, + server_id: chan.server_id, channel_id, message_id: msg.id, sender: ctx.sender(), @@ -700,7 +1010,11 @@ pub fn send_message(ctx: &ReducerContext, text: String, channel_id: u64, thread_ if seq_id > 100 { let old_seq_id = seq_id - 100; - let to_delete: Vec<_> = ctx.db.recent_message().channel_id().filter(channel_id) + let to_delete: Vec<_> = ctx + .db + .recent_message() + .channel_id() + .filter(channel_id) .filter(|m| m.seq_id <= old_seq_id) .map(|m| m.id) .collect(); @@ -712,23 +1026,41 @@ pub fn send_message(ctx: &ReducerContext, text: String, channel_id: u64, thread_ #[spacetimedb::reducer] pub fn bootstrap_sequences(ctx: &ReducerContext) { - let cms_ids: Vec<_> = ctx.db.channel_message_sequence().iter().map(|r| r.message_id).collect(); - for id in cms_ids { ctx.db.channel_message_sequence().message_id().delete(id); } - - let hwm_ids: Vec<_> = ctx.db.channel_high_water_mark().iter().map(|r| r.channel_id).collect(); - for id in hwm_ids { ctx.db.channel_high_water_mark().channel_id().delete(id); } + let cms_ids: Vec<_> = ctx + .db + .channel_message_sequence() + .iter() + .map(|r| r.message_id) + .collect(); + for id in cms_ids { + ctx.db.channel_message_sequence().message_id().delete(id); + } + + let hwm_ids: Vec<_> = ctx + .db + .channel_high_water_mark() + .iter() + .map(|r| r.channel_id) + .collect(); + for id in hwm_ids { + ctx.db.channel_high_water_mark().channel_id().delete(id); + } let rm_ids: Vec<_> = ctx.db.recent_message().iter().map(|r| r.id).collect(); - for id in rm_ids { ctx.db.recent_message().id().delete(id); } + for id in rm_ids { + ctx.db.recent_message().id().delete(id); + } let all_messages: Vec<_> = ctx.db.message().iter().collect(); for msg in all_messages { let seq_id = get_next_seq_id(&ctx.db, msg.channel_id); - ctx.db.channel_message_sequence().insert(ChannelMessageSequence { - message_id: msg.id, - channel_id: msg.channel_id, - seq_id, - }); + ctx.db + .channel_message_sequence() + .insert(ChannelMessageSequence { + message_id: msg.id, + channel_id: msg.channel_id, + seq_id, + }); if let Some(channel) = ctx.db.channel().id().find(msg.channel_id) { ctx.db.recent_message().insert(RecentMessage { @@ -753,10 +1085,17 @@ pub fn open_direct_message(ctx: &ReducerContext, recipient: Identity) { } // Check if a DM already exists using indexes - let existing = ctx.db.direct_message().sender().filter(ctx.sender()) + let existing = ctx + .db + .direct_message() + .sender() + .filter(ctx.sender()) .find(|dm| dm.recipient == recipient) .or_else(|| { - ctx.db.direct_message().recipient().filter(ctx.sender()) + ctx.db + .direct_message() + .recipient() + .filter(ctx.sender()) .find(|dm| dm.sender == recipient) }); @@ -776,7 +1115,6 @@ pub fn open_direct_message(ctx: &ReducerContext, recipient: Identity) { kind: ChannelKind::Text, }); - ctx.db.direct_message().insert(DirectMessage { id: 0, channel_id: chan.id, @@ -790,7 +1128,13 @@ pub fn open_direct_message(ctx: &ReducerContext, recipient: Identity) { #[spacetimedb::reducer] pub fn close_direct_message(ctx: &ReducerContext, channel_id: u64) { - let dm = ctx.db.direct_message().channel_id().filter(channel_id).next().expect("Direct message not found"); + let dm = ctx + .db + .direct_message() + .channel_id() + .filter(channel_id) + .next() + .expect("Direct message not found"); let mut dm = dm; if dm.sender == ctx.sender() { dm.is_open_sender = false; diff --git a/spacetimedb/src/tables.rs b/spacetimedb/src/tables.rs index 6243cbd..caae832 100644 --- a/spacetimedb/src/tables.rs +++ b/spacetimedb/src/tables.rs @@ -246,6 +246,8 @@ pub struct MessageImage { #[auto_inc] pub id: u64, #[index(btree)] + pub server_id: u64, // 0 if DM + #[index(btree)] pub message_id: u64, pub image_id: u64, } @@ -256,6 +258,8 @@ pub struct MessageReaction { #[auto_inc] pub id: u64, #[index(btree)] + pub server_id: u64, // 0 if DM + #[index(btree)] pub message_id: u64, pub identity: Identity, pub emoji: Option, diff --git a/spacetimedb/src/utils.rs b/spacetimedb/src/utils.rs index a2862ac..c35b748 100644 --- a/spacetimedb/src/utils.rs +++ b/spacetimedb/src/utils.rs @@ -1,5 +1,5 @@ -use spacetimedb::{Identity, Table, Local, LocalReadOnly}; use crate::tables::*; +use spacetimedb::{Identity, Local, LocalReadOnly, Table}; use std::collections::{HashMap, HashSet}; pub fn validate_name(name: &str) -> Result<(), String> { @@ -10,7 +10,10 @@ pub fn validate_name(name: &str) -> Result<(), String> { } pub fn validate_message_length(db: &Local, text: &str) -> Result<(), String> { - let max_length_conf = db.system_configuration().key().find("max_message_length".to_string()); + let max_length_conf = db + .system_configuration() + .key() + .find("max_message_length".to_string()); let max_length = max_length_conf .and_then(|c| c.value.parse::().ok()) .unwrap_or(262144); @@ -30,10 +33,12 @@ pub fn get_next_seq_id(db: &Local, channel_id: u64) -> u64 { let next_seq_id = hwm.as_ref().map(|h| h.last_seq_id + 1).unwrap_or(1); if let Some(_h) = hwm { - db.channel_high_water_mark().channel_id().update(ChannelHighWaterMark { - channel_id, - last_seq_id: next_seq_id, - }); + db.channel_high_water_mark() + .channel_id() + .update(ChannelHighWaterMark { + channel_id, + last_seq_id: next_seq_id, + }); } else { db.channel_high_water_mark().insert(ChannelHighWaterMark { channel_id, @@ -47,7 +52,11 @@ pub fn get_next_seq_id(db: &Local, channel_id: u64) -> u64 { pub fn get_visible_message_ids(db: &Local, identity: Identity) -> HashMap { let mut result = HashMap::new(); if let Some(sub) = db.channel_subscription().identity().find(identity) { - for cms in db.channel_message_sequence().channel_id().filter(sub.channel_id) { + for cms in db + .channel_message_sequence() + .channel_id() + .filter(sub.channel_id) + { if cms.seq_id >= sub.earliest_seq_id { result.insert(cms.message_id, cms.seq_id); } @@ -82,11 +91,18 @@ pub fn get_visible_message_ids(db: &Local, identity: Identity) -> HashMap HashMap { +pub fn get_visible_message_ids_read_only( + db: &LocalReadOnly, + identity: Identity, +) -> HashMap { let mut result = HashMap::new(); if let Some(sub) = db.channel_subscription().identity().find(identity) { // Use index filtering for read-only handles - for cms in db.channel_message_sequence().channel_id().filter(sub.channel_id) { + for cms in db + .channel_message_sequence() + .channel_id() + .filter(sub.channel_id) + { if cms.seq_id >= sub.earliest_seq_id { result.insert(cms.message_id, cms.seq_id); } @@ -144,12 +160,37 @@ pub fn get_visible_image_ids(db: &Local, identity: Identity) -> HashSet { for ce in db.custom_emoji().name().filter(""..) { ids.insert(ce.id); } - let visible_msg_ids = get_visible_message_ids(db, identity); - for msg_id in visible_msg_ids.keys() { - for mi in db.message_image().message_id().filter(*msg_id) { + // 3. Message Images (Optimized: use server_id directly) + for member in db.server_member().identity().filter(identity) { + for mi in db.message_image().server_id().filter(member.server_id) { ids.insert(mi.image_id); } } + + // DM images + for dm in db.direct_message().sender().filter(identity) { + if dm.is_open_sender { + for mi in db.message_image().server_id().filter(0u64) { + if let Some(msg) = db.message().id().find(mi.message_id) { + if msg.channel_id == dm.channel_id { + ids.insert(mi.image_id); + } + } + } + } + } + for dm in db.direct_message().recipient().filter(identity) { + if dm.is_open_recipient { + for mi in db.message_image().server_id().filter(0u64) { + if let Some(msg) = db.message().id().find(mi.message_id) { + if msg.channel_id == dm.channel_id { + ids.insert(mi.image_id); + } + } + } + } + } + ids } @@ -176,12 +217,37 @@ pub fn get_visible_image_ids_read_only(db: &LocalReadOnly, identity: Identity) - for ce in db.custom_emoji().name().filter(""..) { ids.insert(ce.id); } - let visible_msg_ids = get_visible_message_ids_read_only(db, identity); - for msg_id in visible_msg_ids.keys() { - for mi in db.message_image().message_id().filter(*msg_id) { + // 3. Message Images (Optimized: use server_id directly) + for member in db.server_member().identity().filter(identity) { + for mi in db.message_image().server_id().filter(member.server_id) { ids.insert(mi.image_id); } } + + // DM images + for dm in db.direct_message().sender().filter(identity) { + if dm.is_open_sender { + for mi in db.message_image().server_id().filter(0u64) { + if let Some(msg) = db.message().id().find(mi.message_id) { + if msg.channel_id == dm.channel_id { + ids.insert(mi.image_id); + } + } + } + } + } + for dm in db.direct_message().recipient().filter(identity) { + if dm.is_open_recipient { + for mi in db.message_image().server_id().filter(0u64) { + if let Some(msg) = db.message().id().find(mi.message_id) { + if msg.channel_id == dm.channel_id { + ids.insert(mi.image_id); + } + } + } + } + } + ids } @@ -189,25 +255,67 @@ pub fn clear_signaling_for_user(db: &Local, identity: Identity) { if let Some(va) = db.voice_activity().identity().find(identity) { db.voice_activity().delete(va); } - + let watchers: Vec<_> = db.watching().watcher().filter(identity).collect(); for row in watchers { db.watching().delete(row); } - + let watchees: Vec<_> = db.watching().watchee().filter(identity).collect(); for row in watchees { db.watching().delete(row); } - for row in db.voice_sdp_offer().sender().filter(identity).collect::>() { db.voice_sdp_offer().delete(row); } - for row in db.voice_sdp_offer().receiver().filter(identity).collect::>() { db.voice_sdp_offer().delete(row); } + for row in db + .voice_sdp_offer() + .sender() + .filter(identity) + .collect::>() + { + db.voice_sdp_offer().delete(row); + } + for row in db + .voice_sdp_offer() + .receiver() + .filter(identity) + .collect::>() + { + db.voice_sdp_offer().delete(row); + } - for row in db.voice_sdp_answer().sender().filter(identity).collect::>() { db.voice_sdp_answer().delete(row); } - for row in db.voice_sdp_answer().receiver().filter(identity).collect::>() { db.voice_sdp_answer().delete(row); } + for row in db + .voice_sdp_answer() + .sender() + .filter(identity) + .collect::>() + { + db.voice_sdp_answer().delete(row); + } + for row in db + .voice_sdp_answer() + .receiver() + .filter(identity) + .collect::>() + { + db.voice_sdp_answer().delete(row); + } - for row in db.voice_ice_candidate().sender().filter(identity).collect::>() { db.voice_ice_candidate().delete(row); } - for row in db.voice_ice_candidate().receiver().filter(identity).collect::>() { db.voice_ice_candidate().delete(row); } + for row in db + .voice_ice_candidate() + .sender() + .filter(identity) + .collect::>() + { + db.voice_ice_candidate().delete(row); + } + for row in db + .voice_ice_candidate() + .receiver() + .filter(identity) + .collect::>() + { + db.voice_ice_candidate().delete(row); + } } pub fn auto_join_community_server(db: &Local, identity: Identity) { diff --git a/spacetimedb/src/views.rs b/spacetimedb/src/views.rs index 94b6b4a..8e0c714 100644 --- a/spacetimedb/src/views.rs +++ b/spacetimedb/src/views.rs @@ -1,6 +1,6 @@ -use spacetimedb::{ViewContext, Identity, Timestamp}; use crate::tables::*; use crate::utils::*; +use spacetimedb::{Identity, Timestamp, ViewContext}; #[derive(spacetimedb::SpacetimeType)] pub struct VisibleImageRow { @@ -67,7 +67,7 @@ pub struct VisibleDirectMessageRow { pub fn visible_channels(ctx: &ViewContext) -> Vec { let mut results = Vec::new(); let identity = ctx.sender(); - + // Server channels for member in ctx.db.server_member().identity().filter(identity) { for chan in ctx.db.channel().server_id().filter(member.server_id) { @@ -79,7 +79,7 @@ pub fn visible_channels(ctx: &ViewContext) -> Vec { }); } } - + // DM channels for dm in ctx.db.direct_message().sender().filter(identity) { if let Some(chan) = ctx.db.channel().id().find(dm.channel_id) { @@ -101,7 +101,7 @@ pub fn visible_channels(ctx: &ViewContext) -> Vec { }); } } - + results } @@ -109,7 +109,7 @@ pub fn visible_channels(ctx: &ViewContext) -> Vec { pub fn visible_direct_messages(ctx: &ViewContext) -> Vec { let mut results = Vec::new(); let identity = ctx.sender(); - + for dm in ctx.db.direct_message().sender().filter(identity) { results.push(VisibleDirectMessageRow { id: dm.id, @@ -130,7 +130,7 @@ pub fn visible_direct_messages(ctx: &ViewContext) -> Vec Vec { #[spacetimedb::view(accessor = visible_messages, public)] pub fn visible_messages(ctx: &ViewContext) -> Vec { - let visible_seq_map = get_visible_message_ids_read_only(&ctx.db, ctx.sender()); let mut results = Vec::new(); - for (msg_id, seq_id) in visible_seq_map { - if let Some(msg) = ctx.db.message().id().find(msg_id) { + let identity = ctx.sender(); + + // FAST PATH: Recent Messages via denormalized server_id + for member in ctx.db.server_member().identity().filter(identity) { + for rm in ctx.db.recent_message().server_id().filter(member.server_id) { results.push(VisibleMessageRow { - id: msg.id, - sender: msg.sender, - sent: msg.sent, - text: msg.text, - channel_id: msg.channel_id, - thread_id: msg.thread_id, - seq_id, + id: rm.message_id, + sender: rm.sender, + sent: rm.sent, + text: rm.text.clone(), + channel_id: rm.channel_id, + thread_id: rm.thread_id, + seq_id: rm.seq_id, }); } } + + // DM FAST PATH + for dm in ctx.db.direct_message().sender().filter(identity) { + if dm.is_open_sender { + for rm in ctx.db.recent_message().channel_id().filter(dm.channel_id) { + results.push(VisibleMessageRow { + id: rm.message_id, + sender: rm.sender, + sent: rm.sent, + text: rm.text.clone(), + channel_id: rm.channel_id, + thread_id: rm.thread_id, + seq_id: rm.seq_id, + }); + } + } + } + for dm in ctx.db.direct_message().recipient().filter(identity) { + if dm.is_open_recipient { + for rm in ctx.db.recent_message().channel_id().filter(dm.channel_id) { + results.push(VisibleMessageRow { + id: rm.message_id, + sender: rm.sender, + sent: rm.sent, + text: rm.text.clone(), + channel_id: rm.channel_id, + thread_id: rm.thread_id, + seq_id: rm.seq_id, + }); + } + } + } + + // SLOW PATH: Scrollback (only for the active channel subscription) + if let Some(sub) = ctx.db.channel_subscription().identity().find(identity) { + for cms in ctx + .db + .channel_message_sequence() + .channel_id() + .filter(sub.channel_id) + { + if cms.seq_id >= sub.earliest_seq_id { + // To avoid duplicates with the Fast Path, we'd check if results already has it, + // but let's assume the frontend handles ID uniqueness for now to save CPU. + // Or better: Only pull messages from CMS that are NOT in RecentMessage. + // For simplicity and speed, we'll just pull them all if we scroll. + if let Some(msg) = ctx.db.message().id().find(cms.message_id) { + results.push(VisibleMessageRow { + id: msg.id, + sender: msg.sender, + sent: msg.sent, + text: msg.text, + channel_id: msg.channel_id, + thread_id: msg.thread_id, + seq_id: cms.seq_id, + }); + } + } + } + } + + results +} + +#[spacetimedb::view(accessor = visible_message_images, public)] +pub fn visible_message_images(ctx: &ViewContext) -> Vec { + let mut results = Vec::new(); + let identity = ctx.sender(); + + // Optimized: pull all images for all joined servers in one indexed scan + for member in ctx.db.server_member().identity().filter(identity) { + for mi in ctx.db.message_image().server_id().filter(member.server_id) { + results.push(VisibleMessageImageRow { + id: mi.id, + message_id: mi.message_id, + image_id: mi.image_id, + }); + } + } + + // DM images + for dm in ctx.db.direct_message().sender().filter(identity) { + if dm.is_open_sender { + for mi in ctx.db.message_image().server_id().filter(0u64) { + // 0 is DM + if let Some(msg) = ctx.db.message().id().find(mi.message_id) { + if msg.channel_id == dm.channel_id { + results.push(VisibleMessageImageRow { + id: mi.id, + message_id: mi.message_id, + image_id: mi.image_id, + }); + } + } + } + } + } + + results +} + +#[spacetimedb::view(accessor = visible_message_reactions, public)] +pub fn visible_message_reactions(ctx: &ViewContext) -> Vec { + let mut results = Vec::new(); + let identity = ctx.sender(); + + // Optimized: pull all reactions for all joined servers in one indexed scan + for member in ctx.db.server_member().identity().filter(identity) { + for mr in ctx + .db + .message_reaction() + .server_id() + .filter(member.server_id) + { + results.push(VisibleMessageReactionRow { + id: mr.id, + message_id: mr.message_id, + identity: mr.identity, + emoji: mr.emoji, + custom_emoji_id: mr.custom_emoji_id, + }); + } + } + results } @@ -184,37 +310,3 @@ pub fn my_channel_subscriptions(ctx: &ViewContext) -> Vec Vec { - let visible_seq_map = get_visible_message_ids_read_only(&ctx.db, ctx.sender()); - let mut results = Vec::new(); - for msg_id in visible_seq_map.keys() { - for mi in ctx.db.message_image().message_id().filter(*msg_id) { - results.push(VisibleMessageImageRow { - id: mi.id, - message_id: mi.message_id, - image_id: mi.image_id, - }); - } - } - results -} - -#[spacetimedb::view(accessor = visible_message_reactions, public)] -pub fn visible_message_reactions(ctx: &ViewContext) -> Vec { - let visible_seq_map = get_visible_message_ids_read_only(&ctx.db, ctx.sender()); - let mut results = Vec::new(); - for msg_id in visible_seq_map.keys() { - for mr in ctx.db.message_reaction().message_id().filter(*msg_id) { - results.push(VisibleMessageReactionRow { - id: mr.id, - message_id: mr.message_id, - identity: mr.identity, - emoji: mr.emoji, - custom_emoji_id: mr.custom_emoji_id, - }); - } - } - results -} diff --git a/src/chat/services/messaging.svelte.ts b/src/chat/services/messaging.svelte.ts index 1e5d33c..fe5ca32 100644 --- a/src/chat/services/messaging.svelte.ts +++ b/src/chat/services/messaging.svelte.ts @@ -68,6 +68,7 @@ export class MessagingService { visibleMessagesStore.subscribe((v) => { this.#visibleMessagesRaw = v; this.#updateBuckets(v); + this.#checkMessageWindowLimit(); }); visibleMessageImagesStore.subscribe((v) => { @@ -371,4 +372,29 @@ export class MessagingService { setTyping = (channelId: bigint, typing: boolean) => { this.#setTypingReducer({ channelId, typing }); }; + + #checkMessageWindowLimit = () => { + const channelId = this.#nav.activeChannelId; + if (!channelId) return; + + const bucket = this.#channelBuckets.get(channelId); + if (!bucket || bucket.sorted.length <= 500) return; + + // Limit reached: update the subscription window to only the latest 500 + untrack(() => { + const latest500 = bucket.sorted.slice(-500); + const newEarliest = latest500[0].seqId; + + if (newEarliest !== undefined) { + const sub = this.#mySubscriptions.find(s => s.channelId === channelId); + if (sub && newEarliest > sub.earliestSeqId) { + console.log(`[MessagingService] Window limit reached for channel ${channelId}. Sliding earliestSeqId to ${newEarliest}`); + this.#extendSubscriptionReducer({ + channelId, + earliestSeqId: newEarliest + }); + } + } + }); + }; }