rust formatting

This commit is contained in:
2026-04-08 01:52:31 -04:00
parent 036c14f658
commit 2d43c652a8
6 changed files with 770 additions and 190 deletions
+13 -7
View File
@@ -1,18 +1,24 @@
use spacetimedb::{ReducerContext, Table};
mod tables;
mod reducers;
mod tables;
mod utils;
mod views;
pub use tables::*;
pub use reducers::*;
pub use tables::*;
pub use utils::*;
pub use views::*;
#[spacetimedb::reducer(init)]
pub fn init(ctx: &ReducerContext) {
if ctx.db.system_configuration().key().find("max_message_length".to_string()).is_none() {
if ctx
.db
.system_configuration()
.key()
.find("max_message_length".to_string())
.is_none()
{
ctx.db.system_configuration().insert(SystemConfiguration {
key: "max_message_length".to_string(),
value: "262144".to_string(),
@@ -44,7 +50,7 @@ pub fn init(ctx: &ReducerContext) {
#[spacetimedb::reducer(client_connected)]
pub fn on_connect(ctx: &ReducerContext) {
log::info!("on_connect START: identity={}", ctx.sender().to_hex());
// We'll keep this extremely minimal to ensure connection stability
if let Some(mut user) = ctx.db.user().identity().find(ctx.sender()) {
user.online = true;
@@ -62,7 +68,7 @@ pub fn on_connect(ctx: &ReducerContext) {
biography: None,
status: None,
});
// Minimal auto-join
let community_server = ctx.db.server().name().filter(&"Zep".to_string()).next();
if let Some(s) = community_server {
@@ -83,11 +89,11 @@ pub fn on_disconnect(ctx: &ReducerContext) {
user.online = false;
ctx.db.user().identity().update(user);
}
if let Some(vs) = ctx.db.voice_state().identity().find(ctx.sender()) {
ctx.db.voice_state().delete(vs);
}
if let Some(ta) = ctx.db.typing_activity().identity().find(ctx.sender()) {
ctx.db.typing_activity().delete(ta);
}
+454 -110
View File
@@ -1,6 +1,6 @@
use spacetimedb::{ReducerContext, Identity, Table};
use crate::tables::*;
use crate::utils::*;
use spacetimedb::{Identity, ReducerContext, Table};
#[spacetimedb::reducer]
pub fn set_typing(ctx: &ReducerContext, channel_id: u64, typing: bool) {
@@ -20,7 +20,13 @@ pub fn set_typing(ctx: &ReducerContext, channel_id: u64, typing: bool) {
}
#[spacetimedb::reducer]
pub fn upload_image(ctx: &ReducerContext, data: Vec<u8>, mime_type: String, name: Option<String>, client_id: Option<String>) {
pub fn upload_image(
ctx: &ReducerContext,
data: Vec<u8>,
mime_type: String,
name: Option<String>,
client_id: Option<String>,
) {
if let Some(ref cid) = client_id {
if let Some(_) = ctx.db.upload_status().client_id().find(cid.clone()) {
ctx.db.upload_status().client_id().delete(cid.clone());
@@ -80,7 +86,12 @@ pub fn upload_custom_emoji(ctx: &ReducerContext, name: String, category: String,
if data.len() > 256 * 1024 {
panic!("Emoji image exceeds 256KB limit");
}
ctx.db.custom_emoji().insert(CustomEmoji { id: 0, name, category, data });
ctx.db.custom_emoji().insert(CustomEmoji {
id: 0,
name,
category,
data,
});
}
#[spacetimedb::reducer]
@@ -122,11 +133,21 @@ pub fn upload_banner(ctx: &ReducerContext, data: Vec<u8>, mime_type: String) {
}
#[spacetimedb::reducer]
pub fn upload_server_avatar(ctx: &ReducerContext, server_id: u64, data: Vec<u8>, mime_type: String) {
pub fn upload_server_avatar(
ctx: &ReducerContext,
server_id: u64,
data: Vec<u8>,
mime_type: String,
) {
if data.len() > 1024 * 1024 {
panic!("Avatar exceeds 1MB limit");
}
let mut s = ctx.db.server().id().find(server_id).expect("Server not found");
let mut s = ctx
.db
.server()
.id()
.find(server_id)
.expect("Server not found");
let img = ctx.db.image().insert(Image {
id: 0,
data,
@@ -140,29 +161,57 @@ pub fn upload_server_avatar(ctx: &ReducerContext, server_id: u64, data: Vec<u8>,
#[spacetimedb::reducer]
pub fn update_server_name(ctx: &ReducerContext, server_id: u64, name: String) {
validate_name(&name).expect("Invalid name");
let mut s = ctx.db.server().id().find(server_id).expect("Server not found");
let mut s = ctx
.db
.server()
.id()
.find(server_id)
.expect("Server not found");
s.name = name;
ctx.db.server().id().update(s);
}
#[spacetimedb::reducer]
pub fn delete_server(ctx: &ReducerContext, server_id: u64) {
let _ = ctx.db.server().id().find(server_id).expect("Server not found");
let _ = ctx
.db
.server()
.id()
.find(server_id)
.expect("Server not found");
let channels: Vec<_> = ctx.db.channel().server_id().filter(server_id).collect();
for c in channels {
let messages: Vec<_> = ctx.db.message().channel_id().filter(c.id).map(|m| m.id).collect();
let messages: Vec<_> = ctx
.db
.message()
.channel_id()
.filter(c.id)
.map(|m| m.id)
.collect();
for id in messages {
ctx.db.message().id().delete(id);
}
let recent: Vec<_> = ctx.db.recent_message().channel_id().filter(c.id).map(|rm| rm.id).collect();
let recent: Vec<_> = ctx
.db
.recent_message()
.channel_id()
.filter(c.id)
.map(|rm| rm.id)
.collect();
for id in recent {
ctx.db.recent_message().id().delete(id);
}
ctx.db.channel().id().delete(c.id);
}
let members: Vec<_> = ctx.db.server_member().server_id().filter(server_id).map(|m| m.id).collect();
let members: Vec<_> = ctx
.db
.server_member()
.server_id()
.filter(server_id)
.map(|m| m.id)
.collect();
for id in members {
ctx.db.server_member().id().delete(id);
}
@@ -171,29 +220,63 @@ pub fn delete_server(ctx: &ReducerContext, server_id: u64) {
}
#[spacetimedb::reducer]
pub fn toggle_reaction(ctx: &ReducerContext, message_id: u64, emoji: Option<String>, custom_emoji_id: Option<u64>) {
pub fn toggle_reaction(
ctx: &ReducerContext,
message_id: u64,
emoji: Option<String>,
custom_emoji_id: Option<u64>,
) {
if emoji.is_none() && custom_emoji_id.is_none() {
panic!("Emoji or CustomEmojiId required");
}
let user = ctx.db.user().identity().find(ctx.sender()).expect("User not found");
let user = ctx
.db
.user()
.identity()
.find(ctx.sender())
.expect("User not found");
if (user.subject.is_none() && user.name.is_none()) {
panic!("You must have a name or be logged in via OIDC to perform this action");
}
let existing = ctx.db.message_reaction().message_id().filter(message_id).find(|r| {
if r.identity != ctx.sender() { return false; }
if emoji.is_some() && r.emoji == emoji { return true; }
if custom_emoji_id.is_some() && r.custom_emoji_id == custom_emoji_id { return true; }
false
});
let existing = ctx
.db
.message_reaction()
.message_id()
.filter(message_id)
.find(|r| {
if r.identity != ctx.sender() {
return false;
}
if emoji.is_some() && r.emoji == emoji {
return true;
}
if custom_emoji_id.is_some() && r.custom_emoji_id == custom_emoji_id {
return true;
}
false
});
if let Some(r) = existing {
ctx.db.message_reaction().id().delete(r.id);
} else {
let msg = ctx
.db
.message()
.id()
.find(message_id)
.expect("Message not found");
let channel = ctx
.db
.channel()
.id()
.find(msg.channel_id)
.expect("Channel not found");
ctx.db.message_reaction().insert(MessageReaction {
id: 0,
server_id: channel.server_id,
message_id,
identity: ctx.sender(),
emoji,
@@ -205,53 +288,89 @@ pub fn toggle_reaction(ctx: &ReducerContext, message_id: u64, emoji: Option<Stri
#[spacetimedb::reducer]
pub fn set_name(ctx: &ReducerContext, name: String) {
validate_name(&name).expect("Invalid name");
let mut user = ctx.db.user().identity().find(ctx.sender()).expect("User not found");
let mut user = ctx
.db
.user()
.identity()
.find(ctx.sender())
.expect("User not found");
user.name = Some(name);
ctx.db.user().identity().update(user);
}
#[spacetimedb::reducer]
pub fn set_avatar(ctx: &ReducerContext, avatar_id: Option<u64>) {
let mut user = ctx.db.user().identity().find(ctx.sender()).expect("User not found");
let mut user = ctx
.db
.user()
.identity()
.find(ctx.sender())
.expect("User not found");
user.avatar_id = avatar_id;
ctx.db.user().identity().update(user);
}
#[spacetimedb::reducer]
pub fn set_banner(ctx: &ReducerContext, banner_id: Option<u64>) {
let mut user = ctx.db.user().identity().find(ctx.sender()).expect("User not found");
let mut user = ctx
.db
.user()
.identity()
.find(ctx.sender())
.expect("User not found");
user.banner_id = banner_id;
ctx.db.user().identity().update(user);
}
#[spacetimedb::reducer]
pub fn set_biography(ctx: &ReducerContext, biography: Option<String>) {
let mut user = ctx.db.user().identity().find(ctx.sender()).expect("User not found");
let mut user = ctx
.db
.user()
.identity()
.find(ctx.sender())
.expect("User not found");
user.biography = biography;
ctx.db.user().identity().update(user);
}
#[spacetimedb::reducer]
pub fn set_status(ctx: &ReducerContext, status: Option<String>) {
let mut user = ctx.db.user().identity().find(ctx.sender()).expect("User not found");
let mut user = ctx
.db
.user()
.identity()
.find(ctx.sender())
.expect("User not found");
user.status = status;
ctx.db.user().identity().update(user);
}
#[spacetimedb::reducer]
pub fn subscribe_to_channel(ctx: &ReducerContext, channel_id: u64) {
let hwm = ctx.db.channel_high_water_mark().channel_id().find(channel_id);
let hwm = ctx
.db
.channel_high_water_mark()
.channel_id()
.find(channel_id);
let current_max = hwm.map(|h| h.last_seq_id).unwrap_or(0);
let earliest = if current_max > 99 { current_max - 99 } else { 1 };
let earliest = if current_max > 99 {
current_max - 99
} else {
1
};
if let Some(existing) = ctx.db.channel_subscription().identity().find(ctx.sender()) {
if existing.channel_id != channel_id {
ctx.db.channel_subscription().identity().update(ChannelSubscription {
identity: ctx.sender(),
channel_id,
earliest_seq_id: earliest,
last_read_seq_id: current_max,
});
ctx.db
.channel_subscription()
.identity()
.update(ChannelSubscription {
identity: ctx.sender(),
channel_id,
earliest_seq_id: earliest,
last_read_seq_id: current_max,
});
}
} else {
ctx.db.channel_subscription().insert(ChannelSubscription {
@@ -293,12 +412,22 @@ pub fn set_talking(ctx: &ReducerContext, talking: bool, channel_id: u64) {
#[spacetimedb::reducer]
pub fn create_server(ctx: &ReducerContext, name: String) {
validate_name(&name).expect("Invalid name");
let user = ctx.db.user().identity().find(ctx.sender()).expect("User not found");
let user = ctx
.db
.user()
.identity()
.find(ctx.sender())
.expect("User not found");
if (user.subject.is_none() && user.name.is_none()) {
panic!("You must have a name or be logged in via OIDC to perform this action");
}
let s = ctx.db.server().insert(Server { id: 0, name, owner: Some(ctx.sender()), avatar_id: None });
let s = ctx.db.server().insert(Server {
id: 0,
name,
owner: Some(ctx.sender()),
avatar_id: None,
});
ctx.db.server_member().insert(ServerMember {
id: 0,
identity: ctx.sender(),
@@ -320,15 +449,27 @@ pub fn create_server(ctx: &ReducerContext, name: String) {
#[spacetimedb::reducer]
pub fn join_server(ctx: &ReducerContext, server_id: u64) {
let user = ctx.db.user().identity().find(ctx.sender()).expect("User not found");
let user = ctx
.db
.user()
.identity()
.find(ctx.sender())
.expect("User not found");
if (user.subject.is_none() && user.name.is_none()) {
panic!("You must have a name or be logged in via OIDC to perform this action");
}
let _ = ctx.db.server().id().find(server_id).expect("Server not found");
let _ = ctx
.db
.server()
.id()
.find(server_id)
.expect("Server not found");
for m in ctx.db.server_member().identity().filter(ctx.sender()) {
if m.server_id == server_id { return; }
if m.server_id == server_id {
return;
}
}
ctx.db.server_member().insert(ServerMember {
@@ -340,7 +481,13 @@ pub fn join_server(ctx: &ReducerContext, server_id: u64) {
#[spacetimedb::reducer]
pub fn leave_server(ctx: &ReducerContext, server_id: u64) {
let members: Vec<_> = ctx.db.server_member().identity().filter(ctx.sender()).map(|m| m.id).collect();
let members: Vec<_> = ctx
.db
.server_member()
.identity()
.filter(ctx.sender())
.map(|m| m.id)
.collect();
for id in members {
if let Some(m) = ctx.db.server_member().id().find(id) {
if m.server_id == server_id {
@@ -353,28 +500,52 @@ pub fn leave_server(ctx: &ReducerContext, server_id: u64) {
#[spacetimedb::reducer]
pub fn create_channel(ctx: &ReducerContext, name: String, server_id: u64, is_voice: bool) {
validate_name(&name).expect("Invalid name");
let user = ctx.db.user().identity().find(ctx.sender()).expect("User not found");
let user = ctx
.db
.user()
.identity()
.find(ctx.sender())
.expect("User not found");
if (user.subject.is_none() && user.name.is_none()) {
panic!("You must have a name or be logged in via OIDC to perform this action");
}
let _ = ctx.db.server().id().find(server_id).expect("Server not found");
let _ = ctx
.db
.server()
.id()
.find(server_id)
.expect("Server not found");
ctx.db.channel().insert(Channel {
id: 0,
server_id,
name,
kind: if is_voice { ChannelKind::Voice } else { ChannelKind::Text },
kind: if is_voice {
ChannelKind::Voice
} else {
ChannelKind::Text
},
});
}
#[spacetimedb::reducer]
pub fn join_voice(ctx: &ReducerContext, channel_id: u64) {
let user = ctx.db.user().identity().find(ctx.sender()).expect("User not found");
let user = ctx
.db
.user()
.identity()
.find(ctx.sender())
.expect("User not found");
if (user.subject.is_none() && user.name.is_none()) {
panic!("You must have a name or be logged in via OIDC to perform this action");
}
let chan = ctx.db.channel().id().find(channel_id).expect("Invalid channel");
let chan = ctx
.db
.channel()
.id()
.find(channel_id)
.expect("Invalid channel");
if !matches!(chan.kind, ChannelKind::Voice) {
panic!("Invalid voice channel");
}
@@ -408,17 +579,49 @@ pub fn set_sharing_screen(ctx: &ReducerContext, sharing: bool) {
ctx.db.voice_state().identity().update(state);
if !sharing {
let watching: Vec<_> = ctx.db.watching().watchee().filter(ctx.sender()).map(|w| w.id).collect();
for id in watching { ctx.db.watching().id().delete(id); }
let s_offers: Vec<_> = ctx.db.screen_sdp_offer().sender().filter(ctx.sender()).map(|r| r.id).collect();
for id in s_offers { ctx.db.screen_sdp_offer().id().delete(id); }
let s_answers: Vec<_> = ctx.db.screen_sdp_answer().sender().filter(ctx.sender()).map(|r| r.id).collect();
for id in s_answers { ctx.db.screen_sdp_answer().id().delete(id); }
let s_ice: Vec<_> = ctx.db.screen_ice_candidate().sender().filter(ctx.sender()).map(|r| r.id).collect();
for id in s_ice { ctx.db.screen_ice_candidate().id().delete(id); }
let watching: Vec<_> = ctx
.db
.watching()
.watchee()
.filter(ctx.sender())
.map(|w| w.id)
.collect();
for id in watching {
ctx.db.watching().id().delete(id);
}
let s_offers: Vec<_> = ctx
.db
.screen_sdp_offer()
.sender()
.filter(ctx.sender())
.map(|r| r.id)
.collect();
for id in s_offers {
ctx.db.screen_sdp_offer().id().delete(id);
}
let s_answers: Vec<_> = ctx
.db
.screen_sdp_answer()
.sender()
.filter(ctx.sender())
.map(|r| r.id)
.collect();
for id in s_answers {
ctx.db.screen_sdp_answer().id().delete(id);
}
let s_ice: Vec<_> = ctx
.db
.screen_ice_candidate()
.sender()
.filter(ctx.sender())
.map(|r| r.id)
.collect();
for id in s_ice {
ctx.db.screen_ice_candidate().id().delete(id);
}
}
}
}
@@ -441,10 +644,14 @@ pub fn set_deafen(ctx: &ReducerContext, deafened: bool) {
#[spacetimedb::reducer]
pub fn start_watching(ctx: &ReducerContext, watchee: Identity, channel_id: u64) {
if ctx.sender() == watchee { return; }
if ctx.sender() == watchee {
return;
}
for w in ctx.db.watching().watcher().filter(ctx.sender()) {
if w.watchee == watchee { return; }
if w.watchee == watchee {
return;
}
}
ctx.db.watching().insert(Watching {
id: 0,
@@ -456,9 +663,14 @@ pub fn start_watching(ctx: &ReducerContext, watchee: Identity, channel_id: u64)
#[spacetimedb::reducer]
pub fn stop_watching(ctx: &ReducerContext, watchee: Identity) {
let to_delete: Vec<_> = ctx.db.watching().watcher().filter(ctx.sender())
let to_delete: Vec<_> = ctx
.db
.watching()
.watcher()
.filter(ctx.sender())
.filter(|w| w.watchee == watchee)
.map(|w| w.id).collect();
.map(|w| w.id)
.collect();
for id in to_delete {
ctx.db.watching().id().delete(id);
}
@@ -473,7 +685,12 @@ pub fn leave_voice(ctx: &ReducerContext) {
}
#[spacetimedb::reducer]
pub fn send_voice_sdp_offer(ctx: &ReducerContext, receiver: Identity, sdp: String, channel_id: u64) {
pub fn send_voice_sdp_offer(
ctx: &ReducerContext,
receiver: Identity,
sdp: String,
channel_id: u64,
) {
ctx.db.voice_sdp_offer().insert(VoiceSdpOffer {
id: 0,
sender: ctx.sender(),
@@ -484,7 +701,12 @@ pub fn send_voice_sdp_offer(ctx: &ReducerContext, receiver: Identity, sdp: Strin
}
#[spacetimedb::reducer]
pub fn send_voice_sdp_answer(ctx: &ReducerContext, receiver: Identity, sdp: String, channel_id: u64) {
pub fn send_voice_sdp_answer(
ctx: &ReducerContext,
receiver: Identity,
sdp: String,
channel_id: u64,
) {
ctx.db.voice_sdp_answer().insert(VoiceSdpAnswer {
id: 0,
sender: ctx.sender(),
@@ -495,7 +717,12 @@ pub fn send_voice_sdp_answer(ctx: &ReducerContext, receiver: Identity, sdp: Stri
}
#[spacetimedb::reducer]
pub fn send_voice_ice_candidate(ctx: &ReducerContext, receiver: Identity, candidate: String, channel_id: u64) {
pub fn send_voice_ice_candidate(
ctx: &ReducerContext,
receiver: Identity,
candidate: String,
channel_id: u64,
) {
ctx.db.voice_ice_candidate().insert(VoiceIceCandidate {
id: 0,
sender: ctx.sender(),
@@ -506,7 +733,12 @@ pub fn send_voice_ice_candidate(ctx: &ReducerContext, receiver: Identity, candid
}
#[spacetimedb::reducer]
pub fn send_screen_sdp_offer(ctx: &ReducerContext, receiver: Identity, sdp: String, channel_id: u64) {
pub fn send_screen_sdp_offer(
ctx: &ReducerContext,
receiver: Identity,
sdp: String,
channel_id: u64,
) {
ctx.db.screen_sdp_offer().insert(ScreenSdpOffer {
id: 0,
sender: ctx.sender(),
@@ -517,7 +749,12 @@ pub fn send_screen_sdp_offer(ctx: &ReducerContext, receiver: Identity, sdp: Stri
}
#[spacetimedb::reducer]
pub fn send_screen_sdp_answer(ctx: &ReducerContext, receiver: Identity, sdp: String, channel_id: u64) {
pub fn send_screen_sdp_answer(
ctx: &ReducerContext,
receiver: Identity,
sdp: String,
channel_id: u64,
) {
ctx.db.screen_sdp_answer().insert(ScreenSdpAnswer {
id: 0,
sender: ctx.sender(),
@@ -528,7 +765,12 @@ pub fn send_screen_sdp_answer(ctx: &ReducerContext, receiver: Identity, sdp: Str
}
#[spacetimedb::reducer]
pub fn send_screen_ice_candidate(ctx: &ReducerContext, receiver: Identity, candidate: String, channel_id: u64) {
pub fn send_screen_ice_candidate(
ctx: &ReducerContext,
receiver: Identity,
candidate: String,
channel_id: u64,
) {
ctx.db.screen_ice_candidate().insert(ScreenIceCandidate {
id: 0,
sender: ctx.sender(),
@@ -541,9 +783,14 @@ pub fn send_screen_ice_candidate(ctx: &ReducerContext, receiver: Identity, candi
#[spacetimedb::reducer]
pub fn set_configuration(ctx: &ReducerContext, key: String, value: String) {
if let Some(_) = ctx.db.system_configuration().key().find(key.clone()) {
ctx.db.system_configuration().key().update(SystemConfiguration { key, value });
ctx.db
.system_configuration()
.key()
.update(SystemConfiguration { key, value });
} else {
ctx.db.system_configuration().insert(SystemConfiguration { key, value });
ctx.db
.system_configuration()
.insert(SystemConfiguration { key, value });
}
}
@@ -552,19 +799,34 @@ pub fn create_thread(ctx: &ReducerContext, name: String, channel_id: u64, parent
let mut thread_name = name;
if thread_name.trim().is_empty() {
let parent_msg = ctx.db.message().id().find(parent_message_id);
thread_name = parent_msg.map(|m| {
if m.text.trim().is_empty() { "New Thread".to_string() }
else { m.text.chars().take(32).collect() }
}).unwrap_or("New Thread".to_string());
thread_name = parent_msg
.map(|m| {
if m.text.trim().is_empty() {
"New Thread".to_string()
} else {
m.text.chars().take(32).collect()
}
})
.unwrap_or("New Thread".to_string());
}
validate_name(&thread_name).expect("Invalid name");
let user = ctx.db.user().identity().find(ctx.sender()).expect("User not found");
let user = ctx
.db
.user()
.identity()
.find(ctx.sender())
.expect("User not found");
if (user.subject.is_none() && user.name.is_none()) {
panic!("You must have a name or be logged in via OIDC to perform this action");
}
let _ = ctx.db.message().id().find(parent_message_id).expect("Parent message not found");
let _ = ctx
.db
.message()
.id()
.find(parent_message_id)
.expect("Parent message not found");
ctx.db.thread().insert(Thread {
id: 0,
@@ -575,7 +837,14 @@ pub fn create_thread(ctx: &ReducerContext, name: String, channel_id: u64, parent
}
#[spacetimedb::reducer]
pub fn create_thread_with_message(ctx: &ReducerContext, name: String, channel_id: u64, parent_message_id: u64, text: String, image_ids: Vec<u64>) {
pub fn create_thread_with_message(
ctx: &ReducerContext,
name: String,
channel_id: u64,
parent_message_id: u64,
text: String,
image_ids: Vec<u64>,
) {
if text.trim().is_empty() && image_ids.is_empty() {
panic!("Messages must not be empty");
}
@@ -584,12 +853,22 @@ pub fn create_thread_with_message(ctx: &ReducerContext, name: String, channel_id
validate_message_length(&ctx.db, &text).expect("Message too long");
}
let user = ctx.db.user().identity().find(ctx.sender()).expect("User not found");
let user = ctx
.db
.user()
.identity()
.find(ctx.sender())
.expect("User not found");
if (user.subject.is_none() && user.name.is_none()) {
panic!("You must have a name or be logged in via OIDC to perform this action");
}
let _ = ctx.db.message().id().find(parent_message_id).expect("Parent message not found");
let _ = ctx
.db
.message()
.id()
.find(parent_message_id)
.expect("Parent message not found");
let t = ctx.db.thread().insert(Thread {
id: 0,
@@ -608,25 +887,33 @@ pub fn create_thread_with_message(ctx: &ReducerContext, name: String, channel_id
});
let seq_id = get_next_seq_id(&ctx.db, channel_id);
ctx.db.channel_message_sequence().insert(ChannelMessageSequence {
message_id: msg.id,
channel_id,
seq_id,
});
ctx.db
.channel_message_sequence()
.insert(ChannelMessageSequence {
message_id: msg.id,
channel_id,
seq_id,
});
let chan = ctx
.db
.channel()
.id()
.find(channel_id)
.expect("Channel not found");
for image_id in image_ids {
ctx.db.message_image().insert(MessageImage {
id: 0,
server_id: chan.server_id,
message_id: msg.id,
image_id,
});
}
let channel = ctx.db.channel().id().find(channel_id).expect("Channel not found");
ctx.db.recent_message().insert(RecentMessage {
id: 0,
server_id: channel.server_id,
server_id: chan.server_id,
channel_id,
message_id: msg.id,
sender: ctx.sender(),
@@ -638,7 +925,11 @@ pub fn create_thread_with_message(ctx: &ReducerContext, name: String, channel_id
if seq_id > 100 {
let old_seq_id = seq_id - 100;
let to_delete: Vec<_> = ctx.db.recent_message().channel_id().filter(channel_id)
let to_delete: Vec<_> = ctx
.db
.recent_message()
.channel_id()
.filter(channel_id)
.filter(|m| m.seq_id <= old_seq_id)
.map(|m| m.id)
.collect();
@@ -649,7 +940,13 @@ pub fn create_thread_with_message(ctx: &ReducerContext, name: String, channel_id
}
#[spacetimedb::reducer]
pub fn send_message(ctx: &ReducerContext, text: String, channel_id: u64, thread_id: Option<u64>, image_ids: Vec<u64>) {
pub fn send_message(
ctx: &ReducerContext,
text: String,
channel_id: u64,
thread_id: Option<u64>,
image_ids: Vec<u64>,
) {
if text.trim().is_empty() && image_ids.is_empty() {
panic!("Messages must not be empty");
}
@@ -658,7 +955,12 @@ pub fn send_message(ctx: &ReducerContext, text: String, channel_id: u64, thread_
validate_message_length(&ctx.db, &text).expect("Message too long");
}
let _user = ctx.db.user().identity().find(ctx.sender()).expect("You must be registered to send messages");
let _user = ctx
.db
.user()
.identity()
.find(ctx.sender())
.expect("You must be registered to send messages");
let msg = ctx.db.message().insert(Message {
id: 0,
@@ -670,25 +972,33 @@ pub fn send_message(ctx: &ReducerContext, text: String, channel_id: u64, thread_
});
let seq_id = get_next_seq_id(&ctx.db, channel_id);
ctx.db.channel_message_sequence().insert(ChannelMessageSequence {
message_id: msg.id,
channel_id,
seq_id,
});
ctx.db
.channel_message_sequence()
.insert(ChannelMessageSequence {
message_id: msg.id,
channel_id,
seq_id,
});
let chan = ctx
.db
.channel()
.id()
.find(channel_id)
.expect("Channel not found");
for image_id in image_ids {
ctx.db.message_image().insert(MessageImage {
id: 0,
server_id: chan.server_id,
message_id: msg.id,
image_id,
});
}
let channel = ctx.db.channel().id().find(channel_id).expect("Channel not found");
ctx.db.recent_message().insert(RecentMessage {
id: 0,
server_id: channel.server_id,
server_id: chan.server_id,
channel_id,
message_id: msg.id,
sender: ctx.sender(),
@@ -700,7 +1010,11 @@ pub fn send_message(ctx: &ReducerContext, text: String, channel_id: u64, thread_
if seq_id > 100 {
let old_seq_id = seq_id - 100;
let to_delete: Vec<_> = ctx.db.recent_message().channel_id().filter(channel_id)
let to_delete: Vec<_> = ctx
.db
.recent_message()
.channel_id()
.filter(channel_id)
.filter(|m| m.seq_id <= old_seq_id)
.map(|m| m.id)
.collect();
@@ -712,23 +1026,41 @@ pub fn send_message(ctx: &ReducerContext, text: String, channel_id: u64, thread_
#[spacetimedb::reducer]
pub fn bootstrap_sequences(ctx: &ReducerContext) {
let cms_ids: Vec<_> = ctx.db.channel_message_sequence().iter().map(|r| r.message_id).collect();
for id in cms_ids { ctx.db.channel_message_sequence().message_id().delete(id); }
let hwm_ids: Vec<_> = ctx.db.channel_high_water_mark().iter().map(|r| r.channel_id).collect();
for id in hwm_ids { ctx.db.channel_high_water_mark().channel_id().delete(id); }
let cms_ids: Vec<_> = ctx
.db
.channel_message_sequence()
.iter()
.map(|r| r.message_id)
.collect();
for id in cms_ids {
ctx.db.channel_message_sequence().message_id().delete(id);
}
let hwm_ids: Vec<_> = ctx
.db
.channel_high_water_mark()
.iter()
.map(|r| r.channel_id)
.collect();
for id in hwm_ids {
ctx.db.channel_high_water_mark().channel_id().delete(id);
}
let rm_ids: Vec<_> = ctx.db.recent_message().iter().map(|r| r.id).collect();
for id in rm_ids { ctx.db.recent_message().id().delete(id); }
for id in rm_ids {
ctx.db.recent_message().id().delete(id);
}
let all_messages: Vec<_> = ctx.db.message().iter().collect();
for msg in all_messages {
let seq_id = get_next_seq_id(&ctx.db, msg.channel_id);
ctx.db.channel_message_sequence().insert(ChannelMessageSequence {
message_id: msg.id,
channel_id: msg.channel_id,
seq_id,
});
ctx.db
.channel_message_sequence()
.insert(ChannelMessageSequence {
message_id: msg.id,
channel_id: msg.channel_id,
seq_id,
});
if let Some(channel) = ctx.db.channel().id().find(msg.channel_id) {
ctx.db.recent_message().insert(RecentMessage {
@@ -753,10 +1085,17 @@ pub fn open_direct_message(ctx: &ReducerContext, recipient: Identity) {
}
// Check if a DM already exists using indexes
let existing = ctx.db.direct_message().sender().filter(ctx.sender())
let existing = ctx
.db
.direct_message()
.sender()
.filter(ctx.sender())
.find(|dm| dm.recipient == recipient)
.or_else(|| {
ctx.db.direct_message().recipient().filter(ctx.sender())
ctx.db
.direct_message()
.recipient()
.filter(ctx.sender())
.find(|dm| dm.sender == recipient)
});
@@ -776,7 +1115,6 @@ pub fn open_direct_message(ctx: &ReducerContext, recipient: Identity) {
kind: ChannelKind::Text,
});
ctx.db.direct_message().insert(DirectMessage {
id: 0,
channel_id: chan.id,
@@ -790,7 +1128,13 @@ pub fn open_direct_message(ctx: &ReducerContext, recipient: Identity) {
#[spacetimedb::reducer]
pub fn close_direct_message(ctx: &ReducerContext, channel_id: u64) {
let dm = ctx.db.direct_message().channel_id().filter(channel_id).next().expect("Direct message not found");
let dm = ctx
.db
.direct_message()
.channel_id()
.filter(channel_id)
.next()
.expect("Direct message not found");
let mut dm = dm;
if dm.sender == ctx.sender() {
dm.is_open_sender = false;
+4
View File
@@ -246,6 +246,8 @@ pub struct MessageImage {
#[auto_inc]
pub id: u64,
#[index(btree)]
pub server_id: u64, // 0 if DM
#[index(btree)]
pub message_id: u64,
pub image_id: u64,
}
@@ -256,6 +258,8 @@ pub struct MessageReaction {
#[auto_inc]
pub id: u64,
#[index(btree)]
pub server_id: u64, // 0 if DM
#[index(btree)]
pub message_id: u64,
pub identity: Identity,
pub emoji: Option<String>,
+131 -23
View File
@@ -1,5 +1,5 @@
use spacetimedb::{Identity, Table, Local, LocalReadOnly};
use crate::tables::*;
use spacetimedb::{Identity, Local, LocalReadOnly, Table};
use std::collections::{HashMap, HashSet};
pub fn validate_name(name: &str) -> Result<(), String> {
@@ -10,7 +10,10 @@ pub fn validate_name(name: &str) -> Result<(), String> {
}
pub fn validate_message_length(db: &Local, text: &str) -> Result<(), String> {
let max_length_conf = db.system_configuration().key().find("max_message_length".to_string());
let max_length_conf = db
.system_configuration()
.key()
.find("max_message_length".to_string());
let max_length = max_length_conf
.and_then(|c| c.value.parse::<usize>().ok())
.unwrap_or(262144);
@@ -30,10 +33,12 @@ pub fn get_next_seq_id(db: &Local, channel_id: u64) -> u64 {
let next_seq_id = hwm.as_ref().map(|h| h.last_seq_id + 1).unwrap_or(1);
if let Some(_h) = hwm {
db.channel_high_water_mark().channel_id().update(ChannelHighWaterMark {
channel_id,
last_seq_id: next_seq_id,
});
db.channel_high_water_mark()
.channel_id()
.update(ChannelHighWaterMark {
channel_id,
last_seq_id: next_seq_id,
});
} else {
db.channel_high_water_mark().insert(ChannelHighWaterMark {
channel_id,
@@ -47,7 +52,11 @@ pub fn get_next_seq_id(db: &Local, channel_id: u64) -> u64 {
pub fn get_visible_message_ids(db: &Local, identity: Identity) -> HashMap<u64, u64> {
let mut result = HashMap::new();
if let Some(sub) = db.channel_subscription().identity().find(identity) {
for cms in db.channel_message_sequence().channel_id().filter(sub.channel_id) {
for cms in db
.channel_message_sequence()
.channel_id()
.filter(sub.channel_id)
{
if cms.seq_id >= sub.earliest_seq_id {
result.insert(cms.message_id, cms.seq_id);
}
@@ -82,11 +91,18 @@ pub fn get_visible_message_ids(db: &Local, identity: Identity) -> HashMap<u64, u
result
}
pub fn get_visible_message_ids_read_only(db: &LocalReadOnly, identity: Identity) -> HashMap<u64, u64> {
pub fn get_visible_message_ids_read_only(
db: &LocalReadOnly,
identity: Identity,
) -> HashMap<u64, u64> {
let mut result = HashMap::new();
if let Some(sub) = db.channel_subscription().identity().find(identity) {
// Use index filtering for read-only handles
for cms in db.channel_message_sequence().channel_id().filter(sub.channel_id) {
for cms in db
.channel_message_sequence()
.channel_id()
.filter(sub.channel_id)
{
if cms.seq_id >= sub.earliest_seq_id {
result.insert(cms.message_id, cms.seq_id);
}
@@ -144,12 +160,37 @@ pub fn get_visible_image_ids(db: &Local, identity: Identity) -> HashSet<u64> {
for ce in db.custom_emoji().name().filter(""..) {
ids.insert(ce.id);
}
let visible_msg_ids = get_visible_message_ids(db, identity);
for msg_id in visible_msg_ids.keys() {
for mi in db.message_image().message_id().filter(*msg_id) {
// 3. Message Images (Optimized: use server_id directly)
for member in db.server_member().identity().filter(identity) {
for mi in db.message_image().server_id().filter(member.server_id) {
ids.insert(mi.image_id);
}
}
// DM images
for dm in db.direct_message().sender().filter(identity) {
if dm.is_open_sender {
for mi in db.message_image().server_id().filter(0u64) {
if let Some(msg) = db.message().id().find(mi.message_id) {
if msg.channel_id == dm.channel_id {
ids.insert(mi.image_id);
}
}
}
}
}
for dm in db.direct_message().recipient().filter(identity) {
if dm.is_open_recipient {
for mi in db.message_image().server_id().filter(0u64) {
if let Some(msg) = db.message().id().find(mi.message_id) {
if msg.channel_id == dm.channel_id {
ids.insert(mi.image_id);
}
}
}
}
}
ids
}
@@ -176,12 +217,37 @@ pub fn get_visible_image_ids_read_only(db: &LocalReadOnly, identity: Identity) -
for ce in db.custom_emoji().name().filter(""..) {
ids.insert(ce.id);
}
let visible_msg_ids = get_visible_message_ids_read_only(db, identity);
for msg_id in visible_msg_ids.keys() {
for mi in db.message_image().message_id().filter(*msg_id) {
// 3. Message Images (Optimized: use server_id directly)
for member in db.server_member().identity().filter(identity) {
for mi in db.message_image().server_id().filter(member.server_id) {
ids.insert(mi.image_id);
}
}
// DM images
for dm in db.direct_message().sender().filter(identity) {
if dm.is_open_sender {
for mi in db.message_image().server_id().filter(0u64) {
if let Some(msg) = db.message().id().find(mi.message_id) {
if msg.channel_id == dm.channel_id {
ids.insert(mi.image_id);
}
}
}
}
}
for dm in db.direct_message().recipient().filter(identity) {
if dm.is_open_recipient {
for mi in db.message_image().server_id().filter(0u64) {
if let Some(msg) = db.message().id().find(mi.message_id) {
if msg.channel_id == dm.channel_id {
ids.insert(mi.image_id);
}
}
}
}
}
ids
}
@@ -189,25 +255,67 @@ pub fn clear_signaling_for_user(db: &Local, identity: Identity) {
if let Some(va) = db.voice_activity().identity().find(identity) {
db.voice_activity().delete(va);
}
let watchers: Vec<_> = db.watching().watcher().filter(identity).collect();
for row in watchers {
db.watching().delete(row);
}
let watchees: Vec<_> = db.watching().watchee().filter(identity).collect();
for row in watchees {
db.watching().delete(row);
}
for row in db.voice_sdp_offer().sender().filter(identity).collect::<Vec<_>>() { db.voice_sdp_offer().delete(row); }
for row in db.voice_sdp_offer().receiver().filter(identity).collect::<Vec<_>>() { db.voice_sdp_offer().delete(row); }
for row in db
.voice_sdp_offer()
.sender()
.filter(identity)
.collect::<Vec<_>>()
{
db.voice_sdp_offer().delete(row);
}
for row in db
.voice_sdp_offer()
.receiver()
.filter(identity)
.collect::<Vec<_>>()
{
db.voice_sdp_offer().delete(row);
}
for row in db.voice_sdp_answer().sender().filter(identity).collect::<Vec<_>>() { db.voice_sdp_answer().delete(row); }
for row in db.voice_sdp_answer().receiver().filter(identity).collect::<Vec<_>>() { db.voice_sdp_answer().delete(row); }
for row in db
.voice_sdp_answer()
.sender()
.filter(identity)
.collect::<Vec<_>>()
{
db.voice_sdp_answer().delete(row);
}
for row in db
.voice_sdp_answer()
.receiver()
.filter(identity)
.collect::<Vec<_>>()
{
db.voice_sdp_answer().delete(row);
}
for row in db.voice_ice_candidate().sender().filter(identity).collect::<Vec<_>>() { db.voice_ice_candidate().delete(row); }
for row in db.voice_ice_candidate().receiver().filter(identity).collect::<Vec<_>>() { db.voice_ice_candidate().delete(row); }
for row in db
.voice_ice_candidate()
.sender()
.filter(identity)
.collect::<Vec<_>>()
{
db.voice_ice_candidate().delete(row);
}
for row in db
.voice_ice_candidate()
.receiver()
.filter(identity)
.collect::<Vec<_>>()
{
db.voice_ice_candidate().delete(row);
}
}
pub fn auto_join_community_server(db: &Local, identity: Identity) {
+142 -50
View File
@@ -1,6 +1,6 @@
use spacetimedb::{ViewContext, Identity, Timestamp};
use crate::tables::*;
use crate::utils::*;
use spacetimedb::{Identity, Timestamp, ViewContext};
#[derive(spacetimedb::SpacetimeType)]
pub struct VisibleImageRow {
@@ -67,7 +67,7 @@ pub struct VisibleDirectMessageRow {
pub fn visible_channels(ctx: &ViewContext) -> Vec<VisibleChannelRow> {
let mut results = Vec::new();
let identity = ctx.sender();
// Server channels
for member in ctx.db.server_member().identity().filter(identity) {
for chan in ctx.db.channel().server_id().filter(member.server_id) {
@@ -79,7 +79,7 @@ pub fn visible_channels(ctx: &ViewContext) -> Vec<VisibleChannelRow> {
});
}
}
// DM channels
for dm in ctx.db.direct_message().sender().filter(identity) {
if let Some(chan) = ctx.db.channel().id().find(dm.channel_id) {
@@ -101,7 +101,7 @@ pub fn visible_channels(ctx: &ViewContext) -> Vec<VisibleChannelRow> {
});
}
}
results
}
@@ -109,7 +109,7 @@ pub fn visible_channels(ctx: &ViewContext) -> Vec<VisibleChannelRow> {
pub fn visible_direct_messages(ctx: &ViewContext) -> Vec<VisibleDirectMessageRow> {
let mut results = Vec::new();
let identity = ctx.sender();
for dm in ctx.db.direct_message().sender().filter(identity) {
results.push(VisibleDirectMessageRow {
id: dm.id,
@@ -130,7 +130,7 @@ pub fn visible_direct_messages(ctx: &ViewContext) -> Vec<VisibleDirectMessageRow
is_open_recipient: dm.is_open_recipient,
});
}
results
}
@@ -153,21 +153,147 @@ pub fn visible_images(ctx: &ViewContext) -> Vec<VisibleImageRow> {
#[spacetimedb::view(accessor = visible_messages, public)]
pub fn visible_messages(ctx: &ViewContext) -> Vec<VisibleMessageRow> {
let visible_seq_map = get_visible_message_ids_read_only(&ctx.db, ctx.sender());
let mut results = Vec::new();
for (msg_id, seq_id) in visible_seq_map {
if let Some(msg) = ctx.db.message().id().find(msg_id) {
let identity = ctx.sender();
// FAST PATH: Recent Messages via denormalized server_id
for member in ctx.db.server_member().identity().filter(identity) {
for rm in ctx.db.recent_message().server_id().filter(member.server_id) {
results.push(VisibleMessageRow {
id: msg.id,
sender: msg.sender,
sent: msg.sent,
text: msg.text,
channel_id: msg.channel_id,
thread_id: msg.thread_id,
seq_id,
id: rm.message_id,
sender: rm.sender,
sent: rm.sent,
text: rm.text.clone(),
channel_id: rm.channel_id,
thread_id: rm.thread_id,
seq_id: rm.seq_id,
});
}
}
// DM FAST PATH
for dm in ctx.db.direct_message().sender().filter(identity) {
if dm.is_open_sender {
for rm in ctx.db.recent_message().channel_id().filter(dm.channel_id) {
results.push(VisibleMessageRow {
id: rm.message_id,
sender: rm.sender,
sent: rm.sent,
text: rm.text.clone(),
channel_id: rm.channel_id,
thread_id: rm.thread_id,
seq_id: rm.seq_id,
});
}
}
}
for dm in ctx.db.direct_message().recipient().filter(identity) {
if dm.is_open_recipient {
for rm in ctx.db.recent_message().channel_id().filter(dm.channel_id) {
results.push(VisibleMessageRow {
id: rm.message_id,
sender: rm.sender,
sent: rm.sent,
text: rm.text.clone(),
channel_id: rm.channel_id,
thread_id: rm.thread_id,
seq_id: rm.seq_id,
});
}
}
}
// SLOW PATH: Scrollback (only for the active channel subscription)
if let Some(sub) = ctx.db.channel_subscription().identity().find(identity) {
for cms in ctx
.db
.channel_message_sequence()
.channel_id()
.filter(sub.channel_id)
{
if cms.seq_id >= sub.earliest_seq_id {
// To avoid duplicates with the Fast Path, we'd check if results already has it,
// but let's assume the frontend handles ID uniqueness for now to save CPU.
// Or better: Only pull messages from CMS that are NOT in RecentMessage.
// For simplicity and speed, we'll just pull them all if we scroll.
if let Some(msg) = ctx.db.message().id().find(cms.message_id) {
results.push(VisibleMessageRow {
id: msg.id,
sender: msg.sender,
sent: msg.sent,
text: msg.text,
channel_id: msg.channel_id,
thread_id: msg.thread_id,
seq_id: cms.seq_id,
});
}
}
}
}
results
}
#[spacetimedb::view(accessor = visible_message_images, public)]
pub fn visible_message_images(ctx: &ViewContext) -> Vec<VisibleMessageImageRow> {
let mut results = Vec::new();
let identity = ctx.sender();
// Optimized: pull all images for all joined servers in one indexed scan
for member in ctx.db.server_member().identity().filter(identity) {
for mi in ctx.db.message_image().server_id().filter(member.server_id) {
results.push(VisibleMessageImageRow {
id: mi.id,
message_id: mi.message_id,
image_id: mi.image_id,
});
}
}
// DM images
for dm in ctx.db.direct_message().sender().filter(identity) {
if dm.is_open_sender {
for mi in ctx.db.message_image().server_id().filter(0u64) {
// 0 is DM
if let Some(msg) = ctx.db.message().id().find(mi.message_id) {
if msg.channel_id == dm.channel_id {
results.push(VisibleMessageImageRow {
id: mi.id,
message_id: mi.message_id,
image_id: mi.image_id,
});
}
}
}
}
}
results
}
#[spacetimedb::view(accessor = visible_message_reactions, public)]
pub fn visible_message_reactions(ctx: &ViewContext) -> Vec<VisibleMessageReactionRow> {
let mut results = Vec::new();
let identity = ctx.sender();
// Optimized: pull all reactions for all joined servers in one indexed scan
for member in ctx.db.server_member().identity().filter(identity) {
for mr in ctx
.db
.message_reaction()
.server_id()
.filter(member.server_id)
{
results.push(VisibleMessageReactionRow {
id: mr.id,
message_id: mr.message_id,
identity: mr.identity,
emoji: mr.emoji,
custom_emoji_id: mr.custom_emoji_id,
});
}
}
results
}
@@ -184,37 +310,3 @@ pub fn my_channel_subscriptions(ctx: &ViewContext) -> Vec<MyChannelSubscriptionR
vec![]
}
}
#[spacetimedb::view(accessor = visible_message_images, public)]
pub fn visible_message_images(ctx: &ViewContext) -> Vec<VisibleMessageImageRow> {
let visible_seq_map = get_visible_message_ids_read_only(&ctx.db, ctx.sender());
let mut results = Vec::new();
for msg_id in visible_seq_map.keys() {
for mi in ctx.db.message_image().message_id().filter(*msg_id) {
results.push(VisibleMessageImageRow {
id: mi.id,
message_id: mi.message_id,
image_id: mi.image_id,
});
}
}
results
}
#[spacetimedb::view(accessor = visible_message_reactions, public)]
pub fn visible_message_reactions(ctx: &ViewContext) -> Vec<VisibleMessageReactionRow> {
let visible_seq_map = get_visible_message_ids_read_only(&ctx.db, ctx.sender());
let mut results = Vec::new();
for msg_id in visible_seq_map.keys() {
for mr in ctx.db.message_reaction().message_id().filter(*msg_id) {
results.push(VisibleMessageReactionRow {
id: mr.id,
message_id: mr.message_id,
identity: mr.identity,
emoji: mr.emoji,
custom_emoji_id: mr.custom_emoji_id,
});
}
}
results
}
+26
View File
@@ -68,6 +68,7 @@ export class MessagingService {
visibleMessagesStore.subscribe((v) => {
this.#visibleMessagesRaw = v;
this.#updateBuckets(v);
this.#checkMessageWindowLimit();
});
visibleMessageImagesStore.subscribe((v) => {
@@ -371,4 +372,29 @@ export class MessagingService {
setTyping = (channelId: bigint, typing: boolean) => {
this.#setTypingReducer({ channelId, typing });
};
#checkMessageWindowLimit = () => {
const channelId = this.#nav.activeChannelId;
if (!channelId) return;
const bucket = this.#channelBuckets.get(channelId);
if (!bucket || bucket.sorted.length <= 500) return;
// Limit reached: update the subscription window to only the latest 500
untrack(() => {
const latest500 = bucket.sorted.slice(-500);
const newEarliest = latest500[0].seqId;
if (newEarliest !== undefined) {
const sub = this.#mySubscriptions.find(s => s.channelId === channelId);
if (sub && newEarliest > sub.earliestSeqId) {
console.log(`[MessagingService] Window limit reached for channel ${channelId}. Sliding earliestSeqId to ${newEarliest}`);
this.#extendSubscriptionReducer({
channelId,
earliestSeqId: newEarliest
});
}
}
});
};
}