diff --git a/spacetimedb/src/lib.rs b/spacetimedb/src/lib.rs index c618c16..096b5cd 100644 --- a/spacetimedb/src/lib.rs +++ b/spacetimedb/src/lib.rs @@ -167,7 +167,8 @@ pub fn on_disconnect(ctx: &ReducerContext) { ctx.db.typing_activity().delete(ta); } - ctx.db.join_server_status().identity().delete(ctx.sender()); + ctx.db.reducer_status().identity().delete(ctx.sender()); + ctx.db.thread_subscription().identity().delete(ctx.sender()); clear_user_presence(&ctx.db, ctx.sender()); } diff --git a/spacetimedb/src/reducers.rs b/spacetimedb/src/reducers.rs index 53011e9..5a97724 100644 --- a/spacetimedb/src/reducers.rs +++ b/spacetimedb/src/reducers.rs @@ -228,9 +228,16 @@ pub fn extend_subscription(ctx: &ReducerContext, channel_id: u64, earliest_seq_i #[spacetimedb::reducer] pub fn create_server(ctx: &ReducerContext, name: String) { - validate_name(&name).expect("Invalid name"); - let user = ctx.db.user().identity().find(ctx.sender()).expect("User not found"); - let s = ctx.db.server().insert(Server { id: 0, name, owner: Some(ctx.sender()), avatar_id: None, channels: Vec::new(), public: false }); + if let Err(e) = validate_name(&name) { + return report_error(&ctx.db, ctx.sender(), "create_server", &e, ctx.timestamp); + } + + let user = match ctx.db.user().identity().find(ctx.sender()) { + Some(u) => u, + None => return report_error(&ctx.db, ctx.sender(), "create_server", "User not found", ctx.timestamp), + }; + + let s = ctx.db.server().insert(Server { id: 0, name: name.clone(), owner: Some(ctx.sender()), avatar_id: None, channels: Vec::new(), public: false }); ctx.db.server_member().insert(ServerMember { id: 0, identity: ctx.sender(), server_id: s.id, name: user.name.clone(), avatar_id: user.avatar_id, online: user.online }); let c1 = ctx.db.channel().insert(Channel { id: 0, server_id: s.id, name: "general".to_string(), kind: ChannelKind::Text }); let c2 = ctx.db.channel().insert(Channel { id: 0, server_id: s.id, name: "voice".to_string(), kind: ChannelKind::Voice }); @@ -240,6 +247,8 @@ pub fn create_server(ctx: &ReducerContext, name: String) { s.channels.push(ChannelMetadata { id: c2.id, name: c2.name, kind: c2.kind }); ctx.db.server().id().update(s.clone()); sync_server_access(&ctx.db, ctx.sender(), s.id); + + report_success(&ctx.db, ctx.sender(), "create_server", ctx.timestamp); } use rand::distributions::{Alphanumeric, DistString}; @@ -248,7 +257,7 @@ use rand::distributions::{Alphanumeric, DistString}; pub fn create_invite(ctx: &ReducerContext, server_id: u64, max_uses: Option, expires_in_hrs: Option) { // Only members can invite if !ctx.db.server_member().identity().filter(ctx.sender()).any(|m| m.server_id == server_id) { - panic!("Only server members can create invites"); + return report_error(&ctx.db, ctx.sender(), "create_invite", "Only server members can create invites", ctx.timestamp); } // Generate a 12-character alphanumeric code using the provided deterministic RNG @@ -267,55 +276,34 @@ pub fn create_invite(ctx: &ReducerContext, server_id: u64, max_uses: Option uses_remaining: max_uses, }); - // Send the generated code back to the client via JoinServerStatus - ctx.db.join_server_status().identity().delete(ctx.sender()); - ctx.db.join_server_status().insert(JoinServerStatus { - identity: ctx.sender(), - status: "invite_created".to_string(), - server_id: Some(server_id), - error: Some(code), // We use the error field to carry the code for this status - }); + // Send the generated code back to the client via ReducerStatus + report_success_with_payload(&ctx.db, ctx.sender(), "create_invite", &code, ctx.timestamp); } #[spacetimedb::reducer] pub fn join_server(ctx: &ReducerContext, server_id: Option, invite_code: Option) { let sender = ctx.sender(); - // Helper to record error and return - let fail = |db: &spacetimedb::Local, err: &str| { - db.join_server_status().identity().delete(sender); - db.join_server_status().insert(JoinServerStatus { - identity: sender, - status: "error".to_string(), - server_id: None, - error: Some(err.to_string()), - }); + let user = match ctx.db.user().identity().find(sender) { + Some(u) => u, + None => return report_error(&ctx.db, sender, "join_server", "User not found", ctx.timestamp), }; - - let user = ctx.db.user().identity().find(sender).expect("User not found"); let target_server_id = if let Some(id) = server_id { id } else if let Some(ref code) = invite_code { let invite = match ctx.db.invite().code().find(code.clone()) { Some(i) => i, - None => { - fail(&ctx.db, "Invalid invite code"); - return; - } + None => return report_error(&ctx.db, sender, "join_server", "Invalid invite code", ctx.timestamp), }; invite.server_id } else { - fail(&ctx.db, "Either server_id or invite_code must be provided"); - return; + return report_error(&ctx.db, sender, "join_server", "Either server_id or invite_code must be provided", ctx.timestamp); }; let s = match ctx.db.server().id().find(target_server_id) { Some(s) => s, - None => { - fail(&ctx.db, "Server not found"); - return; - } + None => return report_error(&ctx.db, sender, "join_server", "Server not found", ctx.timestamp), }; // Permission check: if private, must have a valid invite code @@ -323,30 +311,24 @@ pub fn join_server(ctx: &ReducerContext, server_id: Option, invite_code: Op if let Some(code) = invite_code { let invite = match ctx.db.invite().code().find(code) { Some(i) => i, - None => { - fail(&ctx.db, "Invalid invite code"); - return; - } + None => return report_error(&ctx.db, sender, "join_server", "Invalid invite code", ctx.timestamp), }; if invite.server_id != target_server_id { - fail(&ctx.db, "Invite code does not match server"); - return; + return report_error(&ctx.db, sender, "join_server", "Invite code does not match server", ctx.timestamp); } // Expiry check if let Some(expiry) = invite.expires_at { if ctx.timestamp > expiry { - fail(&ctx.db, "Invite code expired"); - return; + return report_error(&ctx.db, sender, "join_server", "Invite code expired", ctx.timestamp); } } // Uses check if let Some(mut uses) = invite.uses_remaining { if uses == 0 { - fail(&ctx.db, "Invite code usage limit reached"); - return; + return report_error(&ctx.db, sender, "join_server", "Invite code usage limit reached", ctx.timestamp); } uses -= 1; let mut invite = invite.clone(); @@ -354,32 +336,23 @@ pub fn join_server(ctx: &ReducerContext, server_id: Option, invite_code: Op ctx.db.invite().code().update(invite); } } else { - fail(&ctx.db, "Invite code required for private server"); - return; + return report_error(&ctx.db, sender, "join_server", "Invite code required for private server", ctx.timestamp); } } if !s.public && user.subject.is_none() && user.name.is_none() { - fail(&ctx.db, "DisplayName required for private server"); - return; + return report_error(&ctx.db, sender, "join_server", "DisplayName required for private server", ctx.timestamp); } if ctx.db.server_member().identity().filter(sender).any(|m| m.server_id == target_server_id) { - fail(&ctx.db, "Already a member of this server"); - return; + return report_error(&ctx.db, sender, "join_server", "Already a member of this server", ctx.timestamp); } ctx.db.server_member().insert(ServerMember { id: 0, identity: sender, server_id: target_server_id, name: user.name.clone(), avatar_id: user.avatar_id, online: user.online }); sync_server_access(&ctx.db, sender, target_server_id); // Record success - ctx.db.join_server_status().identity().delete(sender); - ctx.db.join_server_status().insert(JoinServerStatus { - identity: sender, - status: "success".to_string(), - server_id: Some(target_server_id), - error: None, - }); + report_success_with_payload(&ctx.db, sender, "join_server", &target_server_id.to_string(), ctx.timestamp); } #[spacetimedb::reducer] @@ -393,17 +366,28 @@ pub fn leave_server(ctx: &ReducerContext, server_id: u64) { for i in invites { ctx.db.invite().code().delete(i.code); } revoke_server_access(&ctx.db, sender, server_id); + report_success(&ctx.db, sender, "leave_server", ctx.timestamp); } #[spacetimedb::reducer] pub fn create_channel(ctx: &ReducerContext, name: String, server_id: u64, is_voice: bool) { - validate_name(&name).expect("Invalid name"); - let mut s = ctx.db.server().id().find(server_id).expect("Server not found"); + if let Err(e) = validate_name(&name) { + return report_error(&ctx.db, ctx.sender(), "create_channel", &e, ctx.timestamp); + } + let mut s = match ctx.db.server().id().find(server_id) { + Some(s) => s, + None => return report_error(&ctx.db, ctx.sender(), "create_channel", "Server not found", ctx.timestamp), + }; + if s.owner != Some(ctx.sender()) { + return report_error(&ctx.db, ctx.sender(), "create_channel", "Only owner can create channels", ctx.timestamp); + } + let kind = if is_voice { ChannelKind::Voice } else { ChannelKind::Text }; let chan = ctx.db.channel().insert(Channel { id: 0, server_id, name: name.clone(), kind }); s.channels.push(ChannelMetadata { id: chan.id, name, kind }); ctx.db.server().id().update(s); for m in ctx.db.server_member().server_id().filter(server_id) { grant_user_channel_access(&ctx.db, m.identity, chan.id); } + report_success(&ctx.db, ctx.sender(), "create_channel", ctx.timestamp); } #[spacetimedb::reducer] @@ -485,13 +469,6 @@ pub fn send_webrtc_signal(ctx: &ReducerContext, receiver: Identity, signal_kind: #[spacetimedb::reducer] pub fn send_message(ctx: &ReducerContext, text: String, channel_id: u64, thread_id: Option, image_ids: Vec, is_encrypted: bool) { internal_send_message(&ctx.db, ctx.sender(), channel_id, text, ctx.timestamp, thread_id, image_ids, is_encrypted); - if let Some(tid) = thread_id { - if let Some(thread) = ctx.db.thread().id().find(tid) { - if let Some(mut pm) = ctx.db.message().id().find(thread.parent_message_id) { - pm.thread_reply_count += 1; ctx.db.message().id().update(pm); - } - } - } } #[spacetimedb::reducer] @@ -509,19 +486,40 @@ pub fn create_thread(ctx: &ReducerContext, name: String, channel_id: u64, parent .map(|m| if m.text.trim().is_empty() { "New Thread".to_string() } else { m.text.chars().take(32).collect() }) .unwrap_or("New Thread".to_string()); } - validate_name(&thread_name).expect("Invalid name"); - ctx.db.thread().insert(Thread { id: 0, channel_id, parent_message_id, name: thread_name.clone() }); + if let Err(e) = validate_name(&thread_name) { + return report_error(&ctx.db, ctx.sender(), "create_thread", &e, ctx.timestamp); + } + if let Some(mut pm) = ctx.db.message().id().find(parent_message_id) { - pm.thread_name = Some(thread_name); ctx.db.message().id().update(pm); + if pm.channel_id != channel_id { + return report_error(&ctx.db, ctx.sender(), "create_thread", "Channel ID mismatch", ctx.timestamp); + } + pm.thread_name = Some(thread_name); + ctx.db.message().id().update(pm); + report_success(&ctx.db, ctx.sender(), "create_thread", ctx.timestamp); + } else { + report_error(&ctx.db, ctx.sender(), "create_thread", "Parent message not found", ctx.timestamp); } } #[spacetimedb::reducer] pub fn create_thread_with_message(ctx: &ReducerContext, name: String, channel_id: u64, parent_message_id: u64, text: String, image_ids: Vec, is_encrypted: bool) { - let t = ctx.db.thread().insert(Thread { id: 0, channel_id, parent_message_id, name: name.clone() }); - internal_send_message(&ctx.db, ctx.sender(), channel_id, text, ctx.timestamp, Some(t.id), image_ids, is_encrypted); + if let Err(e) = validate_name(&name) { + return report_error(&ctx.db, ctx.sender(), "create_thread", &e, ctx.timestamp); + } + if let Some(mut pm) = ctx.db.message().id().find(parent_message_id) { - pm.thread_name = Some(name); pm.thread_reply_count += 1; ctx.db.message().id().update(pm); + if pm.channel_id != channel_id { + return report_error(&ctx.db, ctx.sender(), "create_thread", "Channel ID mismatch", ctx.timestamp); + } + pm.thread_name = Some(name); + pm.thread_reply_count += 1; + ctx.db.message().id().update(pm); + + internal_send_message(&ctx.db, ctx.sender(), channel_id, text, ctx.timestamp, Some(parent_message_id), image_ids, is_encrypted); + report_success(&ctx.db, ctx.sender(), "create_thread", ctx.timestamp); + } else { + report_error(&ctx.db, ctx.sender(), "create_thread", "Parent message not found", ctx.timestamp); } } @@ -553,3 +551,15 @@ pub fn close_direct_message(ctx: &ReducerContext, channel_id: u64) { ctx.db.direct_message().id().update(dm); } } + +#[spacetimedb::reducer] +pub fn open_thread(ctx: &ReducerContext, thread_id: u64) { + let identity = ctx.sender(); + ctx.db.thread_subscription().identity().delete(identity); + ctx.db.thread_subscription().insert(ThreadSubscription { identity, thread_id }); +} + +#[spacetimedb::reducer] +pub fn close_thread(ctx: &ReducerContext) { + ctx.db.thread_subscription().identity().delete(ctx.sender()); +} diff --git a/spacetimedb/src/tables.rs b/spacetimedb/src/tables.rs index 953b83a..d2a2ea5 100644 --- a/spacetimedb/src/tables.rs +++ b/spacetimedb/src/tables.rs @@ -165,17 +165,12 @@ pub struct ChannelSubscription { pub last_read_seq_id: u64, } -#[spacetimedb::table(accessor = thread, public)] +#[spacetimedb::table(accessor = thread_subscription)] #[derive(Clone)] -pub struct Thread { +pub struct ThreadSubscription { #[primary_key] - #[auto_inc] - pub id: u64, - #[index(btree)] - pub channel_id: u64, - #[unique] - pub parent_message_id: u64, - pub name: String, + pub identity: Identity, + pub thread_id: u64, } #[derive(spacetimedb::SpacetimeType, Clone, Debug)] @@ -251,7 +246,7 @@ pub struct ImageBlobRequest { pub image_id: u64, } -#[spacetimedb::table(accessor = typing_activity, public)] +#[spacetimedb::table(accessor = typing_activity)] #[derive(Clone)] pub struct TypingActivity { #[primary_key] @@ -282,14 +277,18 @@ pub struct UploadStatus { pub error: Option, } -#[spacetimedb::table(accessor = join_server_status, public)] +#[spacetimedb::table(accessor = reducer_status, public)] #[derive(Clone)] -pub struct JoinServerStatus { +pub struct ReducerStatus { #[primary_key] pub identity: Identity, - pub status: String, // "success", "error" - pub server_id: Option, + // We could use a composite key if we want to track multiple reducers, + // but for most UI feedback, one "current" status per user is usually enough. + // Let's use a single row per user for now for simplicity. + pub reducer_name: String, + pub status: String, // "success", "error", "processing" pub error: Option, + pub last_update: Timestamp, } #[spacetimedb::table(accessor = invite)] diff --git a/spacetimedb/src/utils.rs b/spacetimedb/src/utils.rs index f45ffc0..387c654 100644 --- a/spacetimedb/src/utils.rs +++ b/spacetimedb/src/utils.rs @@ -261,3 +261,36 @@ pub fn clear_signaling_for_user(db: &Local, identity: Identity) { let signals: Vec<_> = db.webrtc_signal().sender().filter(identity).map(|s| s.id).collect(); for id in signals { db.webrtc_signal().id().delete(id); } } + +pub fn report_error(db: &Local, identity: Identity, reducer_name: &str, error: &str, timestamp: spacetimedb::Timestamp) { + db.reducer_status().identity().delete(identity); + db.reducer_status().insert(ReducerStatus { + identity, + reducer_name: reducer_name.to_string(), + status: "error".to_string(), + error: Some(error.to_string()), + last_update: timestamp, + }); +} + +pub fn report_success(db: &Local, identity: Identity, reducer_name: &str, timestamp: spacetimedb::Timestamp) { + db.reducer_status().identity().delete(identity); + db.reducer_status().insert(ReducerStatus { + identity, + reducer_name: reducer_name.to_string(), + status: "success".to_string(), + error: None, + last_update: timestamp, + }); +} + +pub fn report_success_with_payload(db: &Local, identity: Identity, reducer_name: &str, payload: &str, timestamp: spacetimedb::Timestamp) { + db.reducer_status().identity().delete(identity); + db.reducer_status().insert(ReducerStatus { + identity, + reducer_name: reducer_name.to_string(), + status: "success".to_string(), + error: Some(payload.to_string()), // We reuse the error field as a general-purpose result payload for success + last_update: timestamp, + }); +} diff --git a/spacetimedb/src/views.rs b/spacetimedb/src/views.rs index 4a4b29a..e08f7d9 100644 --- a/spacetimedb/src/views.rs +++ b/spacetimedb/src/views.rs @@ -77,7 +77,7 @@ pub fn visible_recent_activity(ctx: &ViewContext) -> Vec { for access in ctx.db.user_channel_access().identity().filter(identity) { let last_seq_id = ctx.db.channel_internal_state().channel_id().find(access.channel_id) .map(|s| s.last_seq_id).unwrap_or(0); - + let limit = get_recent_message_limit_read_only(&ctx.db); let min_seq = if last_seq_id > limit { last_seq_id - (limit - 1) } else { 1 }; @@ -155,6 +155,22 @@ pub fn visible_direct_messages(ctx: &ViewContext) -> impl Query { .r#where(move |dm| dm.sender.eq(identity).or(dm.recipient.eq(identity))) } +#[spacetimedb::view(accessor = visible_thread_messages, public)] +pub fn visible_thread_messages(ctx: &ViewContext) -> Vec { + let identity = ctx.sender(); + let mut results = Vec::new(); + + if let Some(sub) = ctx.db.thread_subscription().identity().find(identity) { + let tid = sub.thread_id; + for msg in ctx.db.message().channel_id().filter(0u64..) { + if msg.thread_id == Some(tid) { + results.push(msg.clone()); + } + } + } + results +} + #[spacetimedb::view(accessor = visible_images, public)] pub fn visible_images(ctx: &ViewContext) -> Vec { let image_ids = get_visible_image_ids_read_only(&ctx.db, ctx.sender()); @@ -219,27 +235,6 @@ pub fn visible_scrollback_messages(ctx: &ViewContext) -> impl Query { } } -#[spacetimedb::view(accessor = visible_scrollback_thread_messages, public)] -pub fn visible_scrollback_thread_messages(ctx: &ViewContext) -> Vec { - let identity = ctx.sender(); - let mut results = Vec::new(); - - if let Some(sub) = ctx.db.channel_subscription().identity().find(identity) { - for msg in ctx.db.message().channel_id().filter(sub.channel_id) { - if msg.thread_id.is_some() { - results.push(VisibleMessageRow { - id: msg.id, sender: msg.sender, sent: msg.sent, text: msg.text.clone(), - channel_id: msg.channel_id, thread_id: msg.thread_id, seq_id: msg.seq_id, - reactions: msg.reactions.clone(), image_ids: msg.image_ids.clone(), - thread_name: msg.thread_name.clone(), thread_reply_count: msg.thread_reply_count, - edited: msg.edited, is_encrypted: msg.is_encrypted, - }); - } - } - } - results -} - #[spacetimedb::view(accessor = my_channel_subscriptions, public)] pub fn my_channel_subscriptions(ctx: &ViewContext) -> Vec { if let Some(sub) = ctx.db.channel_subscription().identity().find(ctx.sender()) { diff --git a/src/chat/ChatContainer.svelte b/src/chat/ChatContainer.svelte index 0388604..78f5c80 100644 --- a/src/chat/ChatContainer.svelte +++ b/src/chat/ChatContainer.svelte @@ -85,6 +85,19 @@ } }); + // Global Reducer Error Handler + $effect(() => { + const status = chat.reducerStatus; + if (status?.status === "error") { + chat.confirmModal = { + title: `Action Failed: ${status.reducerName}`, + message: status.error || "An unknown error occurred during this action.", + confirmText: "Dismiss", + onConfirm: () => {} + }; + } + }); + let showSettings = $state(false); let showMemberList = $state(true); let showSidebar = $state(true); // Toggle for mobile diff --git a/src/chat/components/MessageItem.svelte b/src/chat/components/MessageItem.svelte index 97d1038..c114fee 100644 --- a/src/chat/components/MessageItem.svelte +++ b/src/chat/components/MessageItem.svelte @@ -527,7 +527,24 @@ {#if !isThread && (msg.threadId || msg.threadName)}