diff --git a/spacetimedb/src/reducers.rs b/spacetimedb/src/reducers.rs index 55f6435..2e1f3b4 100644 --- a/spacetimedb/src/reducers.rs +++ b/spacetimedb/src/reducers.rs @@ -291,13 +291,21 @@ pub fn set_status(ctx: &ReducerContext, status: Option) { #[spacetimedb::reducer] pub fn subscribe_to_channel(ctx: &ReducerContext, channel_id: u64) { - let current_max = ctx.db.channel_internal_state().channel_id().find(channel_id) - .map(|c| c.last_seq_id).unwrap_or(0); + let current_max = ctx + .db + .channel_internal_state() + .channel_id() + .find(channel_id) + .map(|c| c.last_seq_id) + .unwrap_or(0); // Initial state: earliest is same as current max (scrollback is empty) let earliest = current_max; - ctx.db.channel_subscription().identity().delete(ctx.sender()); + ctx.db + .channel_subscription() + .identity() + .delete(ctx.sender()); ctx.db.channel_subscription().insert(ChannelSubscription { identity: ctx.sender(), channel_id, @@ -321,7 +329,10 @@ pub fn extend_subscription(ctx: &ReducerContext, channel_id: u64, earliest_seq_i if sub.channel_id == channel_id { let mut sub = sub.clone(); sub.earliest_seq_id = earliest_seq_id; - ctx.db.channel_subscription().identity().delete(ctx.sender()); + ctx.db + .channel_subscription() + .identity() + .delete(ctx.sender()); ctx.db.channel_subscription().insert(sub); } } @@ -833,6 +844,11 @@ pub fn set_configuration(ctx: &ReducerContext, key: String, value: String) { #[spacetimedb::reducer] pub fn create_thread(ctx: &ReducerContext, name: String, channel_id: u64, parent_message_id: u64) { + log::info!( + "create_thread START: name={}, parent_message_id={}", + name, + parent_message_id + ); let mut thread_name = name; if thread_name.trim().is_empty() { thread_name = ctx @@ -887,6 +903,11 @@ pub fn create_thread_with_message( image_ids: Vec, is_encrypted: bool, ) { + log::info!( + "create_thread_with_message START: name={}, parent_message_id={}", + name, + parent_message_id + ); if let Err(e) = validate_name(&name) { return report_error(&ctx.db, ctx.sender(), "create_thread", &e, ctx.timestamp); } @@ -902,7 +923,7 @@ pub fn create_thread_with_message( ); } pm.thread_name = Some(name); - pm.thread_reply_count += 1; + // Note: internal_send_message will increment pm.thread_reply_count for us let pm = ctx.db.message().id().update(pm); sync_recent_message(&ctx.db, pm); diff --git a/spacetimedb/src/utils.rs b/spacetimedb/src/utils.rs index b601c36..75ae167 100644 --- a/spacetimedb/src/utils.rs +++ b/spacetimedb/src/utils.rs @@ -293,12 +293,20 @@ pub fn internal_send_message( is_encrypted, seq_id, }); - sync_recent_message(db, msg); + sync_recent_message(db, msg.clone()); + + // Plan D: Update parent message reply count + if let Some(tid) = thread_id { + if let Some(mut parent) = db.message().id().find(tid) { + parent.thread_reply_count += 1; + let parent = db.message().id().update(parent); + sync_recent_message(db, parent); + } + } } pub fn sync_recent_message(db: &Local, msg: Message) { - // 1. Insert into recent table - db.recent_message().insert(RecentMessage { + let recent = RecentMessage { id: msg.id, sender: msg.sender, sent: msg.sent, @@ -306,14 +314,21 @@ pub fn sync_recent_message(db: &Local, msg: Message) { channel_id: msg.channel_id, server_id: msg.server_id, thread_id: msg.thread_id, - reactions: msg.reactions.clone(), - image_ids: msg.image_ids.clone(), - thread_name: msg.thread_name.clone(), + reactions: msg.reactions, + image_ids: msg.image_ids, + thread_name: msg.thread_name, thread_reply_count: msg.thread_reply_count, edited: msg.edited, is_encrypted: msg.is_encrypted, seq_id: msg.seq_id, - }); + }; + + // 1. Upsert: Update if exists, otherwise insert + if db.recent_message().id().find(msg.id).is_some() { + db.recent_message().id().update(recent); + } else { + db.recent_message().insert(recent); + } // 2. Prune to 50 let limit = get_recent_message_limit(db); @@ -325,10 +340,14 @@ pub fn sync_recent_message(db: &Local, msg: Message) { if count > limit as usize { // Find all recent messages for this channel - let mut recent_msgs: Vec<_> = db.recent_message().channel_id().filter(msg.channel_id).collect(); + let mut recent_msgs: Vec<_> = db + .recent_message() + .channel_id() + .filter(msg.channel_id) + .collect(); // Sort by seq_id ascending recent_msgs.sort_by_key(|m| m.seq_id); - + // Delete the oldest ones until we are back at the limit let to_delete = count - limit as usize; for i in 0..to_delete { diff --git a/spacetimedb/src/views.rs b/spacetimedb/src/views.rs index 93eabce..93aa411 100644 --- a/spacetimedb/src/views.rs +++ b/spacetimedb/src/views.rs @@ -196,9 +196,46 @@ pub fn visible_thread_messages(ctx: &ViewContext) -> Vec { if let Some(sub) = ctx.db.thread_subscription().identity().find(identity) { let tid = sub.thread_id; + + // 1. Include the parent message itself (for metadata) + if let Some(parent) = ctx.db.message().id().find(tid) { + results.push(Message { + id: parent.id, + sender: parent.sender, + sent: parent.sent, + text: parent.text.clone(), + channel_id: parent.channel_id, + server_id: parent.server_id, + thread_id: parent.thread_id, + reactions: parent.reactions.clone(), + image_ids: parent.image_ids.clone(), + thread_name: parent.thread_name.clone(), + thread_reply_count: parent.thread_reply_count, + edited: parent.edited, + is_encrypted: parent.is_encrypted, + seq_id: parent.seq_id, + }); + } + + // 2. Include all replies for msg in ctx.db.message().channel_id().filter(0u64..) { if msg.thread_id == Some(tid) { - results.push(msg.clone()); + results.push(Message { + id: msg.id, + sender: msg.sender, + sent: msg.sent, + text: msg.text.clone(), + channel_id: msg.channel_id, + server_id: msg.server_id, + thread_id: msg.thread_id, + reactions: msg.reactions.clone(), + image_ids: msg.image_ids.clone(), + thread_name: msg.thread_name.clone(), + thread_reply_count: msg.thread_reply_count, + edited: msg.edited, + is_encrypted: msg.is_encrypted, + seq_id: msg.seq_id, + }); } } } @@ -274,11 +311,20 @@ pub fn visible_scrollback_messages(ctx: &ViewContext) -> Vec { // HIGH PERFORMANCE: Uses composite index range scan on the MAIN message table for msg in ctx.db.message().channel_seq().filter((cid, min_seq..)) { results.push(Message { - id: msg.id, sender: msg.sender, sent: msg.sent, text: msg.text.clone(), - channel_id: msg.channel_id, server_id: msg.server_id, thread_id: msg.thread_id, - reactions: msg.reactions.clone(), image_ids: msg.image_ids.clone(), - thread_name: msg.thread_name.clone(), thread_reply_count: msg.thread_reply_count, - edited: msg.edited, is_encrypted: msg.is_encrypted, seq_id: msg.seq_id, + id: msg.id, + sender: msg.sender, + sent: msg.sent, + text: msg.text.clone(), + channel_id: msg.channel_id, + server_id: msg.server_id, + thread_id: msg.thread_id, + reactions: msg.reactions.clone(), + image_ids: msg.image_ids.clone(), + thread_name: msg.thread_name.clone(), + thread_reply_count: msg.thread_reply_count, + edited: msg.edited, + is_encrypted: msg.is_encrypted, + seq_id: msg.seq_id, }); } } diff --git a/src/chat/services/messaging.svelte.ts b/src/chat/services/messaging.svelte.ts index a3a6b7f..8d97df5 100644 --- a/src/chat/services/messaging.svelte.ts +++ b/src/chat/services/messaging.svelte.ts @@ -290,12 +290,15 @@ export class MessagingService { const threadId = this.#nav.activeThreadId; if (!threadId) return []; - // Plan D: the database.threadMessages state already contains ONLY the messages for the active thread - return Array.from(this.#db.threadMessages).sort((a, b) => { - if (a.seqId < b.seqId) return -1; - if (a.seqId > b.seqId) return 1; - return a.sent.microsSinceUnixEpoch < b.sent.microsSinceUnixEpoch ? -1 : 1; - }); + // Plan D: the database.threadMessages state contains the parent AND replies. + // We filter out the parent here because the UI renders it separately in a header. + return Array.from(this.#db.threadMessages) + .filter(m => m.id !== threadId) + .sort((a, b) => { + if (a.seqId < b.seqId) return -1; + if (a.seqId > b.seqId) return 1; + return a.sent.microsSinceUnixEpoch < b.sent.microsSinceUnixEpoch ? -1 : 1; + }); } get hasMoreMessages() {