fix threads

This commit is contained in:
2026-04-21 22:05:34 -04:00
parent 5933b43cd3
commit 1f0631dfbc
4 changed files with 115 additions and 26 deletions
+26 -5
View File
@@ -291,13 +291,21 @@ pub fn set_status(ctx: &ReducerContext, status: Option<String>) {
#[spacetimedb::reducer] #[spacetimedb::reducer]
pub fn subscribe_to_channel(ctx: &ReducerContext, channel_id: u64) { pub fn subscribe_to_channel(ctx: &ReducerContext, channel_id: u64) {
let current_max = ctx.db.channel_internal_state().channel_id().find(channel_id) let current_max = ctx
.map(|c| c.last_seq_id).unwrap_or(0); .db
.channel_internal_state()
.channel_id()
.find(channel_id)
.map(|c| c.last_seq_id)
.unwrap_or(0);
// Initial state: earliest is same as current max (scrollback is empty) // Initial state: earliest is same as current max (scrollback is empty)
let earliest = current_max; let earliest = current_max;
ctx.db.channel_subscription().identity().delete(ctx.sender()); ctx.db
.channel_subscription()
.identity()
.delete(ctx.sender());
ctx.db.channel_subscription().insert(ChannelSubscription { ctx.db.channel_subscription().insert(ChannelSubscription {
identity: ctx.sender(), identity: ctx.sender(),
channel_id, channel_id,
@@ -321,7 +329,10 @@ pub fn extend_subscription(ctx: &ReducerContext, channel_id: u64, earliest_seq_i
if sub.channel_id == channel_id { if sub.channel_id == channel_id {
let mut sub = sub.clone(); let mut sub = sub.clone();
sub.earliest_seq_id = earliest_seq_id; sub.earliest_seq_id = earliest_seq_id;
ctx.db.channel_subscription().identity().delete(ctx.sender()); ctx.db
.channel_subscription()
.identity()
.delete(ctx.sender());
ctx.db.channel_subscription().insert(sub); ctx.db.channel_subscription().insert(sub);
} }
} }
@@ -833,6 +844,11 @@ pub fn set_configuration(ctx: &ReducerContext, key: String, value: String) {
#[spacetimedb::reducer] #[spacetimedb::reducer]
pub fn create_thread(ctx: &ReducerContext, name: String, channel_id: u64, parent_message_id: u64) { pub fn create_thread(ctx: &ReducerContext, name: String, channel_id: u64, parent_message_id: u64) {
log::info!(
"create_thread START: name={}, parent_message_id={}",
name,
parent_message_id
);
let mut thread_name = name; let mut thread_name = name;
if thread_name.trim().is_empty() { if thread_name.trim().is_empty() {
thread_name = ctx thread_name = ctx
@@ -887,6 +903,11 @@ pub fn create_thread_with_message(
image_ids: Vec<u64>, image_ids: Vec<u64>,
is_encrypted: bool, is_encrypted: bool,
) { ) {
log::info!(
"create_thread_with_message START: name={}, parent_message_id={}",
name,
parent_message_id
);
if let Err(e) = validate_name(&name) { if let Err(e) = validate_name(&name) {
return report_error(&ctx.db, ctx.sender(), "create_thread", &e, ctx.timestamp); return report_error(&ctx.db, ctx.sender(), "create_thread", &e, ctx.timestamp);
} }
@@ -902,7 +923,7 @@ pub fn create_thread_with_message(
); );
} }
pm.thread_name = Some(name); pm.thread_name = Some(name);
pm.thread_reply_count += 1; // Note: internal_send_message will increment pm.thread_reply_count for us
let pm = ctx.db.message().id().update(pm); let pm = ctx.db.message().id().update(pm);
sync_recent_message(&ctx.db, pm); sync_recent_message(&ctx.db, pm);
+28 -9
View File
@@ -293,12 +293,20 @@ pub fn internal_send_message(
is_encrypted, is_encrypted,
seq_id, seq_id,
}); });
sync_recent_message(db, msg); sync_recent_message(db, msg.clone());
// Plan D: Update parent message reply count
if let Some(tid) = thread_id {
if let Some(mut parent) = db.message().id().find(tid) {
parent.thread_reply_count += 1;
let parent = db.message().id().update(parent);
sync_recent_message(db, parent);
}
}
} }
pub fn sync_recent_message(db: &Local, msg: Message) { pub fn sync_recent_message(db: &Local, msg: Message) {
// 1. Insert into recent table let recent = RecentMessage {
db.recent_message().insert(RecentMessage {
id: msg.id, id: msg.id,
sender: msg.sender, sender: msg.sender,
sent: msg.sent, sent: msg.sent,
@@ -306,14 +314,21 @@ pub fn sync_recent_message(db: &Local, msg: Message) {
channel_id: msg.channel_id, channel_id: msg.channel_id,
server_id: msg.server_id, server_id: msg.server_id,
thread_id: msg.thread_id, thread_id: msg.thread_id,
reactions: msg.reactions.clone(), reactions: msg.reactions,
image_ids: msg.image_ids.clone(), image_ids: msg.image_ids,
thread_name: msg.thread_name.clone(), thread_name: msg.thread_name,
thread_reply_count: msg.thread_reply_count, thread_reply_count: msg.thread_reply_count,
edited: msg.edited, edited: msg.edited,
is_encrypted: msg.is_encrypted, is_encrypted: msg.is_encrypted,
seq_id: msg.seq_id, seq_id: msg.seq_id,
}); };
// 1. Upsert: Update if exists, otherwise insert
if db.recent_message().id().find(msg.id).is_some() {
db.recent_message().id().update(recent);
} else {
db.recent_message().insert(recent);
}
// 2. Prune to 50 // 2. Prune to 50
let limit = get_recent_message_limit(db); let limit = get_recent_message_limit(db);
@@ -325,10 +340,14 @@ pub fn sync_recent_message(db: &Local, msg: Message) {
if count > limit as usize { if count > limit as usize {
// Find all recent messages for this channel // Find all recent messages for this channel
let mut recent_msgs: Vec<_> = db.recent_message().channel_id().filter(msg.channel_id).collect(); let mut recent_msgs: Vec<_> = db
.recent_message()
.channel_id()
.filter(msg.channel_id)
.collect();
// Sort by seq_id ascending // Sort by seq_id ascending
recent_msgs.sort_by_key(|m| m.seq_id); recent_msgs.sort_by_key(|m| m.seq_id);
// Delete the oldest ones until we are back at the limit // Delete the oldest ones until we are back at the limit
let to_delete = count - limit as usize; let to_delete = count - limit as usize;
for i in 0..to_delete { for i in 0..to_delete {
+52 -6
View File
@@ -196,9 +196,46 @@ pub fn visible_thread_messages(ctx: &ViewContext) -> Vec<Message> {
if let Some(sub) = ctx.db.thread_subscription().identity().find(identity) { if let Some(sub) = ctx.db.thread_subscription().identity().find(identity) {
let tid = sub.thread_id; let tid = sub.thread_id;
// 1. Include the parent message itself (for metadata)
if let Some(parent) = ctx.db.message().id().find(tid) {
results.push(Message {
id: parent.id,
sender: parent.sender,
sent: parent.sent,
text: parent.text.clone(),
channel_id: parent.channel_id,
server_id: parent.server_id,
thread_id: parent.thread_id,
reactions: parent.reactions.clone(),
image_ids: parent.image_ids.clone(),
thread_name: parent.thread_name.clone(),
thread_reply_count: parent.thread_reply_count,
edited: parent.edited,
is_encrypted: parent.is_encrypted,
seq_id: parent.seq_id,
});
}
// 2. Include all replies
for msg in ctx.db.message().channel_id().filter(0u64..) { for msg in ctx.db.message().channel_id().filter(0u64..) {
if msg.thread_id == Some(tid) { if msg.thread_id == Some(tid) {
results.push(msg.clone()); results.push(Message {
id: msg.id,
sender: msg.sender,
sent: msg.sent,
text: msg.text.clone(),
channel_id: msg.channel_id,
server_id: msg.server_id,
thread_id: msg.thread_id,
reactions: msg.reactions.clone(),
image_ids: msg.image_ids.clone(),
thread_name: msg.thread_name.clone(),
thread_reply_count: msg.thread_reply_count,
edited: msg.edited,
is_encrypted: msg.is_encrypted,
seq_id: msg.seq_id,
});
} }
} }
} }
@@ -274,11 +311,20 @@ pub fn visible_scrollback_messages(ctx: &ViewContext) -> Vec<Message> {
// HIGH PERFORMANCE: Uses composite index range scan on the MAIN message table // HIGH PERFORMANCE: Uses composite index range scan on the MAIN message table
for msg in ctx.db.message().channel_seq().filter((cid, min_seq..)) { for msg in ctx.db.message().channel_seq().filter((cid, min_seq..)) {
results.push(Message { results.push(Message {
id: msg.id, sender: msg.sender, sent: msg.sent, text: msg.text.clone(), id: msg.id,
channel_id: msg.channel_id, server_id: msg.server_id, thread_id: msg.thread_id, sender: msg.sender,
reactions: msg.reactions.clone(), image_ids: msg.image_ids.clone(), sent: msg.sent,
thread_name: msg.thread_name.clone(), thread_reply_count: msg.thread_reply_count, text: msg.text.clone(),
edited: msg.edited, is_encrypted: msg.is_encrypted, seq_id: msg.seq_id, channel_id: msg.channel_id,
server_id: msg.server_id,
thread_id: msg.thread_id,
reactions: msg.reactions.clone(),
image_ids: msg.image_ids.clone(),
thread_name: msg.thread_name.clone(),
thread_reply_count: msg.thread_reply_count,
edited: msg.edited,
is_encrypted: msg.is_encrypted,
seq_id: msg.seq_id,
}); });
} }
} }
+9 -6
View File
@@ -290,12 +290,15 @@ export class MessagingService {
const threadId = this.#nav.activeThreadId; const threadId = this.#nav.activeThreadId;
if (!threadId) return []; if (!threadId) return [];
// Plan D: the database.threadMessages state already contains ONLY the messages for the active thread // Plan D: the database.threadMessages state contains the parent AND replies.
return Array.from(this.#db.threadMessages).sort((a, b) => { // We filter out the parent here because the UI renders it separately in a header.
if (a.seqId < b.seqId) return -1; return Array.from(this.#db.threadMessages)
if (a.seqId > b.seqId) return 1; .filter(m => m.id !== threadId)
return a.sent.microsSinceUnixEpoch < b.sent.microsSinceUnixEpoch ? -1 : 1; .sort((a, b) => {
}); if (a.seqId < b.seqId) return -1;
if (a.seqId > b.seqId) return 1;
return a.sent.microsSinceUnixEpoch < b.sent.microsSinceUnixEpoch ? -1 : 1;
});
} }
get hasMoreMessages() { get hasMoreMessages() {