remove thread table

This commit is contained in:
2026-04-21 20:27:52 -04:00
parent b9feb7acbf
commit 2c5b1288a6
11 changed files with 288 additions and 153 deletions
+2 -1
View File
@@ -167,7 +167,8 @@ pub fn on_disconnect(ctx: &ReducerContext) {
ctx.db.typing_activity().delete(ta);
}
ctx.db.join_server_status().identity().delete(ctx.sender());
ctx.db.reducer_status().identity().delete(ctx.sender());
ctx.db.thread_subscription().identity().delete(ctx.sender());
clear_user_presence(&ctx.db, ctx.sender());
}
+81 -71
View File
@@ -228,9 +228,16 @@ pub fn extend_subscription(ctx: &ReducerContext, channel_id: u64, earliest_seq_i
#[spacetimedb::reducer]
pub fn create_server(ctx: &ReducerContext, name: String) {
validate_name(&name).expect("Invalid name");
let user = ctx.db.user().identity().find(ctx.sender()).expect("User not found");
let s = ctx.db.server().insert(Server { id: 0, name, owner: Some(ctx.sender()), avatar_id: None, channels: Vec::new(), public: false });
if let Err(e) = validate_name(&name) {
return report_error(&ctx.db, ctx.sender(), "create_server", &e, ctx.timestamp);
}
let user = match ctx.db.user().identity().find(ctx.sender()) {
Some(u) => u,
None => return report_error(&ctx.db, ctx.sender(), "create_server", "User not found", ctx.timestamp),
};
let s = ctx.db.server().insert(Server { id: 0, name: name.clone(), owner: Some(ctx.sender()), avatar_id: None, channels: Vec::new(), public: false });
ctx.db.server_member().insert(ServerMember { id: 0, identity: ctx.sender(), server_id: s.id, name: user.name.clone(), avatar_id: user.avatar_id, online: user.online });
let c1 = ctx.db.channel().insert(Channel { id: 0, server_id: s.id, name: "general".to_string(), kind: ChannelKind::Text });
let c2 = ctx.db.channel().insert(Channel { id: 0, server_id: s.id, name: "voice".to_string(), kind: ChannelKind::Voice });
@@ -240,6 +247,8 @@ pub fn create_server(ctx: &ReducerContext, name: String) {
s.channels.push(ChannelMetadata { id: c2.id, name: c2.name, kind: c2.kind });
ctx.db.server().id().update(s.clone());
sync_server_access(&ctx.db, ctx.sender(), s.id);
report_success(&ctx.db, ctx.sender(), "create_server", ctx.timestamp);
}
use rand::distributions::{Alphanumeric, DistString};
@@ -248,7 +257,7 @@ use rand::distributions::{Alphanumeric, DistString};
pub fn create_invite(ctx: &ReducerContext, server_id: u64, max_uses: Option<u32>, expires_in_hrs: Option<u32>) {
// Only members can invite
if !ctx.db.server_member().identity().filter(ctx.sender()).any(|m| m.server_id == server_id) {
panic!("Only server members can create invites");
return report_error(&ctx.db, ctx.sender(), "create_invite", "Only server members can create invites", ctx.timestamp);
}
// Generate a 12-character alphanumeric code using the provided deterministic RNG
@@ -267,55 +276,34 @@ pub fn create_invite(ctx: &ReducerContext, server_id: u64, max_uses: Option<u32>
uses_remaining: max_uses,
});
// Send the generated code back to the client via JoinServerStatus
ctx.db.join_server_status().identity().delete(ctx.sender());
ctx.db.join_server_status().insert(JoinServerStatus {
identity: ctx.sender(),
status: "invite_created".to_string(),
server_id: Some(server_id),
error: Some(code), // We use the error field to carry the code for this status
});
// Send the generated code back to the client via ReducerStatus
report_success_with_payload(&ctx.db, ctx.sender(), "create_invite", &code, ctx.timestamp);
}
#[spacetimedb::reducer]
pub fn join_server(ctx: &ReducerContext, server_id: Option<u64>, invite_code: Option<String>) {
let sender = ctx.sender();
// Helper to record error and return
let fail = |db: &spacetimedb::Local, err: &str| {
db.join_server_status().identity().delete(sender);
db.join_server_status().insert(JoinServerStatus {
identity: sender,
status: "error".to_string(),
server_id: None,
error: Some(err.to_string()),
});
let user = match ctx.db.user().identity().find(sender) {
Some(u) => u,
None => return report_error(&ctx.db, sender, "join_server", "User not found", ctx.timestamp),
};
let user = ctx.db.user().identity().find(sender).expect("User not found");
let target_server_id = if let Some(id) = server_id {
id
} else if let Some(ref code) = invite_code {
let invite = match ctx.db.invite().code().find(code.clone()) {
Some(i) => i,
None => {
fail(&ctx.db, "Invalid invite code");
return;
}
None => return report_error(&ctx.db, sender, "join_server", "Invalid invite code", ctx.timestamp),
};
invite.server_id
} else {
fail(&ctx.db, "Either server_id or invite_code must be provided");
return;
return report_error(&ctx.db, sender, "join_server", "Either server_id or invite_code must be provided", ctx.timestamp);
};
let s = match ctx.db.server().id().find(target_server_id) {
Some(s) => s,
None => {
fail(&ctx.db, "Server not found");
return;
}
None => return report_error(&ctx.db, sender, "join_server", "Server not found", ctx.timestamp),
};
// Permission check: if private, must have a valid invite code
@@ -323,30 +311,24 @@ pub fn join_server(ctx: &ReducerContext, server_id: Option<u64>, invite_code: Op
if let Some(code) = invite_code {
let invite = match ctx.db.invite().code().find(code) {
Some(i) => i,
None => {
fail(&ctx.db, "Invalid invite code");
return;
}
None => return report_error(&ctx.db, sender, "join_server", "Invalid invite code", ctx.timestamp),
};
if invite.server_id != target_server_id {
fail(&ctx.db, "Invite code does not match server");
return;
return report_error(&ctx.db, sender, "join_server", "Invite code does not match server", ctx.timestamp);
}
// Expiry check
if let Some(expiry) = invite.expires_at {
if ctx.timestamp > expiry {
fail(&ctx.db, "Invite code expired");
return;
return report_error(&ctx.db, sender, "join_server", "Invite code expired", ctx.timestamp);
}
}
// Uses check
if let Some(mut uses) = invite.uses_remaining {
if uses == 0 {
fail(&ctx.db, "Invite code usage limit reached");
return;
return report_error(&ctx.db, sender, "join_server", "Invite code usage limit reached", ctx.timestamp);
}
uses -= 1;
let mut invite = invite.clone();
@@ -354,32 +336,23 @@ pub fn join_server(ctx: &ReducerContext, server_id: Option<u64>, invite_code: Op
ctx.db.invite().code().update(invite);
}
} else {
fail(&ctx.db, "Invite code required for private server");
return;
return report_error(&ctx.db, sender, "join_server", "Invite code required for private server", ctx.timestamp);
}
}
if !s.public && user.subject.is_none() && user.name.is_none() {
fail(&ctx.db, "DisplayName required for private server");
return;
return report_error(&ctx.db, sender, "join_server", "DisplayName required for private server", ctx.timestamp);
}
if ctx.db.server_member().identity().filter(sender).any(|m| m.server_id == target_server_id) {
fail(&ctx.db, "Already a member of this server");
return;
return report_error(&ctx.db, sender, "join_server", "Already a member of this server", ctx.timestamp);
}
ctx.db.server_member().insert(ServerMember { id: 0, identity: sender, server_id: target_server_id, name: user.name.clone(), avatar_id: user.avatar_id, online: user.online });
sync_server_access(&ctx.db, sender, target_server_id);
// Record success
ctx.db.join_server_status().identity().delete(sender);
ctx.db.join_server_status().insert(JoinServerStatus {
identity: sender,
status: "success".to_string(),
server_id: Some(target_server_id),
error: None,
});
report_success_with_payload(&ctx.db, sender, "join_server", &target_server_id.to_string(), ctx.timestamp);
}
#[spacetimedb::reducer]
@@ -393,17 +366,28 @@ pub fn leave_server(ctx: &ReducerContext, server_id: u64) {
for i in invites { ctx.db.invite().code().delete(i.code); }
revoke_server_access(&ctx.db, sender, server_id);
report_success(&ctx.db, sender, "leave_server", ctx.timestamp);
}
#[spacetimedb::reducer]
pub fn create_channel(ctx: &ReducerContext, name: String, server_id: u64, is_voice: bool) {
validate_name(&name).expect("Invalid name");
let mut s = ctx.db.server().id().find(server_id).expect("Server not found");
if let Err(e) = validate_name(&name) {
return report_error(&ctx.db, ctx.sender(), "create_channel", &e, ctx.timestamp);
}
let mut s = match ctx.db.server().id().find(server_id) {
Some(s) => s,
None => return report_error(&ctx.db, ctx.sender(), "create_channel", "Server not found", ctx.timestamp),
};
if s.owner != Some(ctx.sender()) {
return report_error(&ctx.db, ctx.sender(), "create_channel", "Only owner can create channels", ctx.timestamp);
}
let kind = if is_voice { ChannelKind::Voice } else { ChannelKind::Text };
let chan = ctx.db.channel().insert(Channel { id: 0, server_id, name: name.clone(), kind });
s.channels.push(ChannelMetadata { id: chan.id, name, kind });
ctx.db.server().id().update(s);
for m in ctx.db.server_member().server_id().filter(server_id) { grant_user_channel_access(&ctx.db, m.identity, chan.id); }
report_success(&ctx.db, ctx.sender(), "create_channel", ctx.timestamp);
}
#[spacetimedb::reducer]
@@ -485,13 +469,6 @@ pub fn send_webrtc_signal(ctx: &ReducerContext, receiver: Identity, signal_kind:
#[spacetimedb::reducer]
pub fn send_message(ctx: &ReducerContext, text: String, channel_id: u64, thread_id: Option<u64>, image_ids: Vec<u64>, is_encrypted: bool) {
internal_send_message(&ctx.db, ctx.sender(), channel_id, text, ctx.timestamp, thread_id, image_ids, is_encrypted);
if let Some(tid) = thread_id {
if let Some(thread) = ctx.db.thread().id().find(tid) {
if let Some(mut pm) = ctx.db.message().id().find(thread.parent_message_id) {
pm.thread_reply_count += 1; ctx.db.message().id().update(pm);
}
}
}
}
#[spacetimedb::reducer]
@@ -509,19 +486,40 @@ pub fn create_thread(ctx: &ReducerContext, name: String, channel_id: u64, parent
.map(|m| if m.text.trim().is_empty() { "New Thread".to_string() } else { m.text.chars().take(32).collect() })
.unwrap_or("New Thread".to_string());
}
validate_name(&thread_name).expect("Invalid name");
ctx.db.thread().insert(Thread { id: 0, channel_id, parent_message_id, name: thread_name.clone() });
if let Err(e) = validate_name(&thread_name) {
return report_error(&ctx.db, ctx.sender(), "create_thread", &e, ctx.timestamp);
}
if let Some(mut pm) = ctx.db.message().id().find(parent_message_id) {
pm.thread_name = Some(thread_name); ctx.db.message().id().update(pm);
if pm.channel_id != channel_id {
return report_error(&ctx.db, ctx.sender(), "create_thread", "Channel ID mismatch", ctx.timestamp);
}
pm.thread_name = Some(thread_name);
ctx.db.message().id().update(pm);
report_success(&ctx.db, ctx.sender(), "create_thread", ctx.timestamp);
} else {
report_error(&ctx.db, ctx.sender(), "create_thread", "Parent message not found", ctx.timestamp);
}
}
#[spacetimedb::reducer]
pub fn create_thread_with_message(ctx: &ReducerContext, name: String, channel_id: u64, parent_message_id: u64, text: String, image_ids: Vec<u64>, is_encrypted: bool) {
let t = ctx.db.thread().insert(Thread { id: 0, channel_id, parent_message_id, name: name.clone() });
internal_send_message(&ctx.db, ctx.sender(), channel_id, text, ctx.timestamp, Some(t.id), image_ids, is_encrypted);
if let Err(e) = validate_name(&name) {
return report_error(&ctx.db, ctx.sender(), "create_thread", &e, ctx.timestamp);
}
if let Some(mut pm) = ctx.db.message().id().find(parent_message_id) {
pm.thread_name = Some(name); pm.thread_reply_count += 1; ctx.db.message().id().update(pm);
if pm.channel_id != channel_id {
return report_error(&ctx.db, ctx.sender(), "create_thread", "Channel ID mismatch", ctx.timestamp);
}
pm.thread_name = Some(name);
pm.thread_reply_count += 1;
ctx.db.message().id().update(pm);
internal_send_message(&ctx.db, ctx.sender(), channel_id, text, ctx.timestamp, Some(parent_message_id), image_ids, is_encrypted);
report_success(&ctx.db, ctx.sender(), "create_thread", ctx.timestamp);
} else {
report_error(&ctx.db, ctx.sender(), "create_thread", "Parent message not found", ctx.timestamp);
}
}
@@ -553,3 +551,15 @@ pub fn close_direct_message(ctx: &ReducerContext, channel_id: u64) {
ctx.db.direct_message().id().update(dm);
}
}
#[spacetimedb::reducer]
pub fn open_thread(ctx: &ReducerContext, thread_id: u64) {
let identity = ctx.sender();
ctx.db.thread_subscription().identity().delete(identity);
ctx.db.thread_subscription().insert(ThreadSubscription { identity, thread_id });
}
#[spacetimedb::reducer]
pub fn close_thread(ctx: &ReducerContext) {
ctx.db.thread_subscription().identity().delete(ctx.sender());
}
+13 -14
View File
@@ -165,17 +165,12 @@ pub struct ChannelSubscription {
pub last_read_seq_id: u64,
}
#[spacetimedb::table(accessor = thread, public)]
#[spacetimedb::table(accessor = thread_subscription)]
#[derive(Clone)]
pub struct Thread {
pub struct ThreadSubscription {
#[primary_key]
#[auto_inc]
pub id: u64,
#[index(btree)]
pub channel_id: u64,
#[unique]
pub parent_message_id: u64,
pub name: String,
pub identity: Identity,
pub thread_id: u64,
}
#[derive(spacetimedb::SpacetimeType, Clone, Debug)]
@@ -251,7 +246,7 @@ pub struct ImageBlobRequest {
pub image_id: u64,
}
#[spacetimedb::table(accessor = typing_activity, public)]
#[spacetimedb::table(accessor = typing_activity)]
#[derive(Clone)]
pub struct TypingActivity {
#[primary_key]
@@ -282,14 +277,18 @@ pub struct UploadStatus {
pub error: Option<String>,
}
#[spacetimedb::table(accessor = join_server_status, public)]
#[spacetimedb::table(accessor = reducer_status, public)]
#[derive(Clone)]
pub struct JoinServerStatus {
pub struct ReducerStatus {
#[primary_key]
pub identity: Identity,
pub status: String, // "success", "error"
pub server_id: Option<u64>,
// We could use a composite key if we want to track multiple reducers,
// but for most UI feedback, one "current" status per user is usually enough.
// Let's use a single row per user for now for simplicity.
pub reducer_name: String,
pub status: String, // "success", "error", "processing"
pub error: Option<String>,
pub last_update: Timestamp,
}
#[spacetimedb::table(accessor = invite)]
+33
View File
@@ -261,3 +261,36 @@ pub fn clear_signaling_for_user(db: &Local, identity: Identity) {
let signals: Vec<_> = db.webrtc_signal().sender().filter(identity).map(|s| s.id).collect();
for id in signals { db.webrtc_signal().id().delete(id); }
}
pub fn report_error(db: &Local, identity: Identity, reducer_name: &str, error: &str, timestamp: spacetimedb::Timestamp) {
db.reducer_status().identity().delete(identity);
db.reducer_status().insert(ReducerStatus {
identity,
reducer_name: reducer_name.to_string(),
status: "error".to_string(),
error: Some(error.to_string()),
last_update: timestamp,
});
}
pub fn report_success(db: &Local, identity: Identity, reducer_name: &str, timestamp: spacetimedb::Timestamp) {
db.reducer_status().identity().delete(identity);
db.reducer_status().insert(ReducerStatus {
identity,
reducer_name: reducer_name.to_string(),
status: "success".to_string(),
error: None,
last_update: timestamp,
});
}
pub fn report_success_with_payload(db: &Local, identity: Identity, reducer_name: &str, payload: &str, timestamp: spacetimedb::Timestamp) {
db.reducer_status().identity().delete(identity);
db.reducer_status().insert(ReducerStatus {
identity,
reducer_name: reducer_name.to_string(),
status: "success".to_string(),
error: Some(payload.to_string()), // We reuse the error field as a general-purpose result payload for success
last_update: timestamp,
});
}
+16 -21
View File
@@ -155,6 +155,22 @@ pub fn visible_direct_messages(ctx: &ViewContext) -> impl Query<DirectMessage> {
.r#where(move |dm| dm.sender.eq(identity).or(dm.recipient.eq(identity)))
}
#[spacetimedb::view(accessor = visible_thread_messages, public)]
pub fn visible_thread_messages(ctx: &ViewContext) -> Vec<Message> {
let identity = ctx.sender();
let mut results = Vec::new();
if let Some(sub) = ctx.db.thread_subscription().identity().find(identity) {
let tid = sub.thread_id;
for msg in ctx.db.message().channel_id().filter(0u64..) {
if msg.thread_id == Some(tid) {
results.push(msg.clone());
}
}
}
results
}
#[spacetimedb::view(accessor = visible_images, public)]
pub fn visible_images(ctx: &ViewContext) -> Vec<VisibleImageRow> {
let image_ids = get_visible_image_ids_read_only(&ctx.db, ctx.sender());
@@ -219,27 +235,6 @@ pub fn visible_scrollback_messages(ctx: &ViewContext) -> impl Query<Message> {
}
}
#[spacetimedb::view(accessor = visible_scrollback_thread_messages, public)]
pub fn visible_scrollback_thread_messages(ctx: &ViewContext) -> Vec<VisibleMessageRow> {
let identity = ctx.sender();
let mut results = Vec::new();
if let Some(sub) = ctx.db.channel_subscription().identity().find(identity) {
for msg in ctx.db.message().channel_id().filter(sub.channel_id) {
if msg.thread_id.is_some() {
results.push(VisibleMessageRow {
id: msg.id, sender: msg.sender, sent: msg.sent, text: msg.text.clone(),
channel_id: msg.channel_id, thread_id: msg.thread_id, seq_id: msg.seq_id,
reactions: msg.reactions.clone(), image_ids: msg.image_ids.clone(),
thread_name: msg.thread_name.clone(), thread_reply_count: msg.thread_reply_count,
edited: msg.edited, is_encrypted: msg.is_encrypted,
});
}
}
}
results
}
#[spacetimedb::view(accessor = my_channel_subscriptions, public)]
pub fn my_channel_subscriptions(ctx: &ViewContext) -> Vec<MyChannelSubscriptionRow> {
if let Some(sub) = ctx.db.channel_subscription().identity().find(ctx.sender()) {
+13
View File
@@ -85,6 +85,19 @@
}
});
// Global Reducer Error Handler
$effect(() => {
const status = chat.reducerStatus;
if (status?.status === "error") {
chat.confirmModal = {
title: `Action Failed: ${status.reducerName}`,
message: status.error || "An unknown error occurred during this action.",
confirmText: "Dismiss",
onConfirm: () => {}
};
}
});
let showSettings = $state(false);
let showMemberList = $state(true);
let showSidebar = $state(true); // Toggle for mobile
+18 -1
View File
@@ -527,7 +527,24 @@
{#if !isThread && (msg.threadId || msg.threadName)}
<button
class="thread-link"
onclick={() => (chat.activeThreadId = msg.threadId || existingThread?.id)}
onclick={() => {
const targetId = msg.threadId || existingThread?.id;
console.log(`[MessageItem] Opening thread. msg.id=${msg.id}, threadId=${msg.threadId}, existing=${existingThread?.id}`);
if (targetId) {
chat.activeThreadId = targetId;
} else if (msg.threadName) {
// If we have a name but no ID in cache yet, try to find it again or show loading
console.log("[MessageItem] Thread metadata indicates thread exists but not in local cache. Finding by parentMsgId...");
const t = chat.allThreads.find(x => x.parentMessageId === msg.id);
if (t) {
chat.activeThreadId = t.id;
} else {
// This might happen if the thread was just created or hasn't synced yet
chat.pendingThreadParentMessageId = msg.id;
}
}
}}
style="margin-top: 2px; padding-left: 12px; border-left: 2px solid var(--background-accent); display: flex; align-items: center; gap: 4px; font-size: 0.75rem; background: none; border-top: none; border-right: none; border-bottom: none; color: var(--brand); cursor: pointer; height: 18px; text-decoration: none;"
aria-label="View thread with {msg.threadReplyCount} messages"
>
+14 -12
View File
@@ -5,6 +5,7 @@
import MessageItem from "./MessageItem.svelte";
import { getContext, tick } from "svelte";
import type { ChatService } from "../services/chat.svelte";
import type { Thread } from "../services/database.svelte";
let {
activeThreadId,
@@ -24,7 +25,7 @@
activeChannelId: bigint | null,
activeServer: Types.Server | undefined,
isFullyAuthenticated: boolean,
allThreads: readonly Types.Thread[],
allThreads: readonly Thread[],
allMessages: readonly Types.Message[],
} = $props();
@@ -55,6 +56,11 @@
}
const activeThread = $derived(allThreads.find((t) => t.id === activeThreadId));
$effect(() => {
console.log(`[ThreadView] activeThreadId=${activeThreadId}, foundThread=${!!activeThread}, allThreadsCount=${allThreads.length}`);
});
const parentMessageId = $derived(activeThread ? activeThread.parentMessageId : pendingThreadParentMessageId);
const parentMessage = $derived(allMessages.find(m => m.id === parentMessageId));
@@ -65,16 +71,7 @@
setPendingThreadParentMessageId(null);
}
const threadMessages = $derived.by(() => {
if (activeThreadId) {
return allMessages
.filter((m) => m.threadId === activeThreadId)
.sort((a, b) =>
a.sent.microsSinceUnixEpoch < b.sent.microsSinceUnixEpoch ? -1 : 1
);
}
return [];
});
const threadMessages = $derived(chat.threadMessages);
function handleContentLoad() {
if (isAtBottom) {
@@ -112,7 +109,7 @@
});
</script>
{#if (activeThreadId && activeThread) || pendingThreadParentMessageId}
{#if activeThreadId || pendingThreadParentMessageId}
<div class="thread-view">
<header class="thread-header">
<div class="thread-header-left">
@@ -148,6 +145,10 @@
<span>BEGINNING OF THREAD</span>
</div>
</div>
{:else if activeThreadId && !activeThread}
<div class="thread-loading-placeholder" style="padding: 20px; text-align: center; color: var(--text-muted);">
<i class="fas fa-spinner fa-spin"></i> Loading thread metadata...
</div>
{/if}
<ThreadMessageList
@@ -156,6 +157,7 @@
/>
{#if !isAtBottom && (threadMessages.length > 0 || parentMessage)}
...
<div class="scroll-pause-indicator small">
<span>Newer messages</span>
<button
+22 -11
View File
@@ -3,7 +3,7 @@ import { Identity } from "spacetimedb";
import { SvelteSet } from "svelte/reactivity";
import * as Types from "../../module_bindings/types";
import { getUsername, formatTime } from "../utils";
import { DatabaseService } from "./database.svelte";
import { DatabaseService, type Thread } from "./database.svelte";
import { NavigationService } from "./navigation.svelte";
import { ThemeService, themeService } from "./theme.svelte";
import { AuthContextService } from "./auth-context.svelte";
@@ -49,13 +49,24 @@ export class ChatService {
this.#encryption = new EncryptionService(this.#db, () => this.identity);
this.#media = new MediaCacheService(this.#db, () => this.identity);
// Sync active thread with backend for surgical message delivery (Plan D)
$effect(() => {
const threadId = this.activeThreadId;
if (threadId) {
this.#msg.handleOpenThread(threadId);
} else {
this.#msg.handleCloseThread();
}
});
// Global Join Handler: Automatically navigate when we successfully join a server
$effect(() => {
const status = this.joinServerStatus;
if (status?.status === "success" && status.serverId) {
console.log(`[ChatService] Global join success detected for server ${status.serverId}. Navigating...`);
const status = this.reducerStatus;
if (status?.status === "success" && status.reducerName === "join_server" && status.error) {
const serverId = BigInt(status.error);
console.log(`[ChatService] Global join success detected for server ${serverId}. Navigating...`);
untrack(() => {
this.activeServerId = status.serverId!;
this.activeServerId = serverId;
});
}
});
@@ -176,9 +187,12 @@ export class ChatService {
get activeChannel() {
return this.#nav.activeChannel;
}
get activeThread() {
get activeThread(): Thread | undefined {
return this.#msg.activeThread;
}
get allThreads(): Thread[] {
return this.#db.allThreads as Thread[];
}
get joinedServers() {
return this.#nav.joinedServers;
}
@@ -338,9 +352,6 @@ export class ChatService {
}
getMessageImages = (messageId: bigint) => this.#msg.getMessageImages(messageId);
getMessageReactions = (messageId: bigint) => this.#msg.getMessageReactions(messageId);
get allThreads() {
return this.#db.allThreads;
}
get images() {
return this.#db.images;
}
@@ -765,10 +776,10 @@ export class ChatService {
get directMessages() {
return this.#db.directMessages;
}
get joinServerStatus() {
get reducerStatus() {
const myId = this.identity;
if (!myId) return null;
return this.#db.joinServerStatus.find(s => s.identity.isEqual(myId)) || null;
return this.#db.reducerStatus.find(s => s.identity.isEqual(myId)) || null;
}
handleDeleteServer = (serverId: bigint) => {
+41 -6
View File
@@ -3,13 +3,46 @@ import { useTable } from "spacetimedb/svelte";
import * as Types from "../../module_bindings/types";
import type { Identity } from "spacetimedb";
export interface Thread {
id: bigint;
channelId: bigint;
parentMessageId: bigint;
name: string;
}
export class DatabaseService {
servers = $state<readonly Types.Server[]>([]);
channels = $state<readonly Types.VisibleChannelRow[]>([]);
directMessages = $state<readonly Types.DirectMessage[]>([]);
users = $state<readonly Types.User[]>([]);
serverMembers = $state<readonly Types.ServerMember[]>([]);
allThreads = $state<readonly Types.Thread[]>([]);
threadMessages = $state<readonly Types.Message[]>([]);
recentMessages = $state<readonly Types.Message[]>([]);
allThreads = $derived.by(() => {
// Collect all messages that are thread parents
const threads: Thread[] = [];
const seen = new Set<bigint>();
const process = (msgs: readonly Types.Message[]) => {
for (const m of msgs) {
if (m.threadName && !seen.has(m.id)) {
seen.add(m.id);
threads.push({
id: m.id,
channelId: m.channelId,
parentMessageId: m.id,
name: m.threadName
});
}
}
};
process(this.recentMessages);
process(this.threadMessages);
return threads;
});
images = $state<readonly Types.VisibleImageRow[]>([]);
imageBlobs = $state<readonly Types.ImageData[]>([]);
customEmojis = $state<readonly Types.CustomEmoji[]>([]);
@@ -17,7 +50,7 @@ export class DatabaseService {
typingActivity = $state<readonly Types.TypingActivity[]>([]);
systemConfiguration = $state<readonly Types.SystemConfiguration[]>([]);
uploadStatus = $state<readonly Types.UploadStatus[]>([]);
joinServerStatus = $state<readonly Types.JoinServerStatus[]>([]);
reducerStatus = $state<readonly Types.ReducerStatus[]>([]);
isUsersReady = $state(false);
isServersReady = $state(false);
isChannelsReady = $state(false);
@@ -81,14 +114,15 @@ export class DatabaseService {
const [usersStore, usersReadyStore] = useTable(tables.user);
const [userStatesStore] = useTable(tables.visible_user_states);
const [serverMembersStore, membersReadyStore] = useTable(tables.visible_server_members);
const [threadsStore] = useTable(tables.thread);
const [recentMessagesStore] = useTable(tables.visible_recent_activity);
const [threadMessagesStore] = useTable(tables.visible_thread_messages);
const [imagesStore, imagesReadyStore] = useTable(tables.visible_images);
const [imageBlobsStore] = useTable(tables.visible_image_blobs);
const [customEmojisStore] = useTable(tables.custom_emoji);
const [typingActivityStore] = useTable(tables.visible_typing_activity);
const [systemConfigStore] = useTable(tables.system_configuration);
const [uploadStatusStore] = useTable(tables.upload_status);
const [joinServerStatusStore] = useTable(tables.join_server_status);
const [reducerStatusStore] = useTable(tables.reducer_status);
serversStore.subscribe((v) => (this.servers = v));
serversReadyStore.subscribe((v) => (this.isServersReady = v));
@@ -100,7 +134,8 @@ export class DatabaseService {
userStatesStore.subscribe((v) => (this.userStates = v));
serverMembersStore.subscribe((v) => (this.serverMembers = v));
membersReadyStore.subscribe((v) => (this.isMembersReady = v));
threadsStore.subscribe((v) => (this.allThreads = v));
recentMessagesStore.subscribe((v) => (this.recentMessages = v));
threadMessagesStore.subscribe((v) => (this.threadMessages = v));
imagesStore.subscribe((v) => (this.images = v));
imagesReadyStore.subscribe((v) => (this.isImagesReady = v));
imageBlobsStore.subscribe((v) => {
@@ -115,6 +150,6 @@ export class DatabaseService {
typingActivityStore.subscribe((v) => (this.typingActivity = v));
systemConfigStore.subscribe((v) => (this.systemConfiguration = v));
uploadStatusStore.subscribe((v) => (this.uploadStatus = v));
joinServerStatusStore.subscribe((v) => (this.joinServerStatus = v));
reducerStatusStore.subscribe((v) => (this.reducerStatus = v));
}
}
+34 -15
View File
@@ -1,4 +1,4 @@
import { DatabaseService } from "./database.svelte";
import { DatabaseService, type Thread } from "./database.svelte";
import { NavigationService } from "./navigation.svelte";
import { useReducer, useTable } from "spacetimedb/svelte";
import { reducers, tables } from "../../module_bindings";
@@ -24,6 +24,8 @@ export class MessagingService {
#subscribeToChannelReducer: any;
#extendSubscriptionReducer: any;
#clearUploadStatusReducer: any;
#openThreadReducer: any;
#closeThreadReducer: any;
/**
* Hook for ChatService to intercept and potentially encrypt messages.
@@ -86,6 +88,8 @@ export class MessagingService {
this.#subscribeToChannelReducer = useReducer(reducers.subscribeToChannel);
this.#extendSubscriptionReducer = useReducer(reducers.extendSubscription);
this.#clearUploadStatusReducer = useReducer(reducers.clearUploadStatus);
this.#openThreadReducer = useReducer(reducers.openThread);
this.#closeThreadReducer = useReducer(reducers.closeThread);
const [visibleMessagesStore, visibleMessagesReadyStore] = useTable(tables.visible_recent_activity);
const [visibleScrollbackStore] = useTable(tables.visible_scrollback_messages);
@@ -93,13 +97,22 @@ export class MessagingService {
mySubscriptionsStore.subscribe((v) => (this.#mySubscriptions = v));
let recentMessages: readonly Types.Message[] = [];
let scrollbackMessages: readonly Types.Message[] = [];
let recentMessages = $state<readonly Types.Message[]>([]);
let scrollbackMessages = $state<readonly Types.Message[]>([]);
// Incremental update logic for visible messages
const seenMessageIds = new Set<bigint>();
let initialSyncComplete = false;
// Plan D & Global Sync: Reactively update buckets whenever any message source changes
$effect(() => {
this.#updateBuckets([
...recentMessages,
...scrollbackMessages,
...this.#db.threadMessages
]);
});
visibleMessagesStore.subscribe((v) => {
if (v.length > 0) {
console.log(`[MessagingService] Received batch of ${v.length} messages. Initial sync complete: ${initialSyncComplete}`);
@@ -124,7 +137,6 @@ export class MessagingService {
}
recentMessages = v;
this.#updateBuckets([...recentMessages, ...scrollbackMessages]);
// Mark any channel that has received messages as ready
for (const m of v) {
@@ -153,7 +165,6 @@ export class MessagingService {
for (const msg of v) seenMessageIds.add(msg.id);
scrollbackMessages = v;
this.#updateBuckets([...recentMessages, ...scrollbackMessages]);
});
$effect(() => {
@@ -175,11 +186,14 @@ export class MessagingService {
const idHex = identity.toHexString();
queries.push("SELECT * FROM custom_emoji");
queries.push("SELECT * FROM system_configuration");
queries.push("SELECT * FROM reducer_status");
queries.push(`SELECT * FROM visible_servers`);
queries.push(`SELECT * FROM user WHERE identity = 0x${idHex}`);
queries.push(`SELECT * FROM visible_server_members`);
queries.push(`SELECT * FROM visible_channels`);
queries.push(`SELECT * FROM visible_recent_activity`);
queries.push(`SELECT * FROM visible_thread_messages`);
queries.push(`SELECT * FROM visible_direct_messages`);
queries.push(`SELECT * FROM my_channel_subscriptions`);
@@ -191,7 +205,6 @@ export class MessagingService {
if (channelId) {
this.#subscribeToChannelReducer({ channelId });
queries.push(`SELECT * FROM visible_scrollback_messages`);
queries.push(`SELECT * FROM thread WHERE channel_id = ${channelId}`);
queries.push(`SELECT * FROM visible_user_states`);
queries.push(`SELECT * FROM visible_typing_activity`);
@@ -258,10 +271,18 @@ export class MessagingService {
return this.allMessages.filter((m) => !m.threadId);
}
get activeThread() {
get activeThread(): Thread | undefined {
return this.#db.allThreads.find((t) => t.id === this.#nav.activeThreadId);
}
handleOpenThread = (threadId: bigint) => {
this.#openThreadReducer({ threadId });
};
handleCloseThread = () => {
this.#closeThreadReducer({});
};
getMessageImages(messageId: bigint) {
const channelId = this.#nav.activeChannelId;
if (!channelId) return [];
@@ -284,14 +305,12 @@ export class MessagingService {
const threadId = this.#nav.activeThreadId;
if (!threadId) return [];
const thread = this.activeThread;
if (!thread) return [];
const channelId = thread.channelId;
const bucket = this.#channelBuckets.get(channelId);
if (!bucket) return [];
return bucket.sorted.filter(m => m.threadId === threadId);
// Plan D: the database.threadMessages state already contains ONLY the messages for the active thread
return Array.from(this.#db.threadMessages).sort((a, b) => {
if (a.seqId < b.seqId) return -1;
if (a.seqId > b.seqId) return 1;
return a.sent.microsSinceUnixEpoch < b.sent.microsSinceUnixEpoch ? -1 : 1;
});
}
get hasMoreMessages() {