only use recent messages table unless history is needed

This commit is contained in:
2026-04-05 11:09:18 -04:00
parent 62815e6e10
commit f058b508ec
6 changed files with 173 additions and 136 deletions
+1 -1
View File
@@ -154,7 +154,7 @@ const [items, isLoading] = useTable(tables.item);
### ⛔ DO NOT:
- **Invent hooks** like `useItems()`, `useData()` — use `useTable(tables.tableName)`
- **Import from fake packages** — only `spacetimedb`, `spacetimedb/react`, `./module_bindings`
- **Import from fake packages** — only `spacetimedb`, `spacetimedb/svelte`, `./module_bindings`
---
+55 -1
View File
@@ -396,6 +396,25 @@ const typing_activity = table(
},
);
const recent_message = table(
{
name: "recent_message",
public: true,
indexes: [
{ accessor: "by_channel", algorithm: "btree", columns: ["channel_id"] },
],
},
{
id: t.u64().primaryKey().autoInc(),
channel_id: t.u64(),
message_id: t.u64(),
sender: t.identity(),
text: t.string(),
thread_id: t.u64().optional(),
sent: t.timestamp(),
},
);
const system_configuration = table(
{
name: "system_configuration",
@@ -428,6 +447,7 @@ const spacetimedb = schema({
custom_emoji,
image,
typing_activity,
recent_message,
system_configuration,
});
export default spacetimedb;
@@ -1076,7 +1096,7 @@ export const create_thread_with_message = spacetimedb.reducer(
if (!threadName || threadName.trim().length === 0) {
threadName = "New Thread";
}
if ((!text || text.trim().length === 0) && imageIds.length === 0)
throw new SenderError("Messages must not be empty");
@@ -1116,6 +1136,23 @@ export const create_thread_with_message = spacetimedb.reducer(
image_id: imageId,
});
}
// Maintain recent_message (last 100)
ctx.db.recent_message.insert({
id: 0n,
channel_id: channelId,
message_id: msg.id,
sender: ctx.sender,
text,
thread_id: t.id,
sent: ctx.timestamp,
});
const recent = [...ctx.db.recent_message.by_channel.filter(channelId)].sort(
(a, b) => (a.sent.microsSinceUnixEpoch < b.sent.microsSinceUnixEpoch ? -1 : 1),
);
if (recent.length > 100) {
ctx.db.recent_message.id.delete(recent[0].id);
}
},
);
@@ -1155,6 +1192,23 @@ export const send_message = spacetimedb.reducer(
image_id: imageId,
});
}
// Maintain recent_message (last 100)
ctx.db.recent_message.insert({
id: 0n,
channel_id: channelId,
message_id: msg.id,
sender: ctx.sender,
text,
thread_id: threadId,
sent: ctx.timestamp,
});
const recent2 = [...ctx.db.recent_message.by_channel.filter(channelId)].sort(
(a, b) => (a.sent.microsSinceUnixEpoch < b.sent.microsSinceUnixEpoch ? -1 : 1),
);
if (recent2.length > 100) {
ctx.db.recent_message.id.delete(recent2[0].id);
}
},
);
+1 -1
View File
@@ -20,7 +20,7 @@
isAtBottom = atBottom;
// If we reach the top and have more messages to load
if (scrollTop === 0 && !isPrepending && chat.channelMessages.length >= chat.messageLimit) {
if (scrollTop === 0 && !isPrepending && chat.hasMoreMessages) {
isPrepending = true;
oldScrollHeight = scrollHeight;
chat.handleLoadMoreMessages();
+2 -2
View File
@@ -293,8 +293,8 @@ export class ChatService {
get threadMessages() {
return this.#msg.threadMessages;
}
get messageLimit() {
return this.#msg.messageLimit;
get hasMoreMessages() {
return this.#msg.hasMoreMessages;
}
// Voice
-3
View File
@@ -7,12 +7,9 @@ export class DatabaseService {
channels = $state<readonly Types.Channel[]>([]);
users = $state<readonly Types.User[]>([]);
serverMembers = $state<readonly Types.ServerMember[]>([]);
allMessages = $state<readonly Types.Message[]>([]);
messageImages = $state<readonly Types.MessageImage[]>([]);
allThreads = $state<readonly Types.Thread[]>([]);
images = $state<readonly Types.Image[]>([]);
customEmojis = $state<readonly Types.CustomEmoji[]>([]);
messageReactions = $state<readonly Types.MessageReaction[]>([]);
voiceStates = $state<readonly Types.VoiceState[]>([]);
voiceActivity = $state<readonly Types.VoiceActivity[]>([]);
typingActivity = $state<readonly Types.TypingActivity[]>([]);
+114 -128
View File
@@ -1,6 +1,6 @@
import { DatabaseService } from "./database.svelte";
import { NavigationService } from "./navigation.svelte";
import { useTable, useReducer } from "spacetimedb/svelte";
import { useReducer, useTable } from "spacetimedb/svelte";
import { reducers, tables } from "../../module_bindings";
import * as Types from "../../module_bindings/types";
import { getConnection } from "../../config";
@@ -12,19 +12,27 @@ export class MessagingService {
#nav: NavigationService;
#identity: () => Identity | null;
#createThreadWithMessageReducer: any;
#sendMessageReducer: any;
#createThreadWithMessageReducer: any;
#uploadImageReducer: any;
#uploadCustomEmojiReducer: any;
#toggleReactionReducer: any;
#setTypingReducer: any;
#allMessages = $state<readonly Types.Message[]>([]);
#allHistoryMessages = $state<readonly Types.Message[]>([]);
#allRecentMessages = $state<readonly Types.RecentMessage[]>([]);
#messageImages = $state<readonly Types.MessageImage[]>([]);
#messageReactions = $state<readonly Types.MessageReaction[]>([]);
messageLimit = $state(50);
// Track which channels we have expanded to full history
#expandedChannels = $state<Set<bigint>>(new Set());
isLoadingMore = $state(false);
hasMoreMessages = $derived.by(() => {
const channelId = this.#nav.activeChannelId;
if (!channelId) return false;
return !this.#expandedChannels.has(channelId);
});
constructor(
db: DatabaseService,
@@ -35,119 +43,97 @@ export class MessagingService {
this.#nav = nav;
this.#identity = identity;
this.#createThreadWithMessageReducer = useReducer(
reducers.createThreadWithMessage,
);
this.#sendMessageReducer = useReducer(reducers.sendMessage);
this.#createThreadWithMessageReducer = useReducer(reducers.createThreadWithMessage);
this.#uploadImageReducer = useReducer(reducers.uploadImage);
this.#uploadCustomEmojiReducer = useReducer(reducers.uploadCustomEmoji);
this.#toggleReactionReducer = useReducer(reducers.toggleReaction);
this.#setTypingReducer = useReducer(reducers.setTyping);
// 1. Reactive Sync from SpacetimeDB Cache
const [messagesStore] = useTable(tables.message);
const [recentMessagesStore] = useTable(tables.recent_message);
const [messageImagesStore] = useTable(tables.message_image);
const [messageReactionsStore] = useTable(tables.message_reaction);
messagesStore.subscribe(
(v: readonly Types.Message[]) => (this.#allMessages = v),
);
messageImagesStore.subscribe(
(v: readonly Types.MessageImage[]) => (this.#messageImages = v),
);
messageReactionsStore.subscribe(
(v: readonly Types.MessageReaction[]) => (this.#messageReactions = v),
);
messagesStore.subscribe((v) => (this.#allHistoryMessages = v));
recentMessagesStore.subscribe((v) => (this.#allRecentMessages = v));
messageImagesStore.subscribe((v) => (this.#messageImages = v));
messageReactionsStore.subscribe((v) => (this.#messageReactions = v));
// 2. Surgical Subscription Management
$effect(() => {
const channelId = this.#nav.activeChannelId;
const serverId = this.#nav.activeServerId;
const limit = this.messageLimit;
const threadId = this.#nav.activeThreadId;
const identity = this.#identity();
const conn = getConnection();
const isExpanded = channelId ? this.#expandedChannels.has(channelId) : false;
if (!conn) return;
untrack(() => {
const queries = ["SELECT * FROM server", "SELECT * FROM custom_emoji", "SELECT * FROM system_configuration"];
const queries: string[] = [];
// 1. Surgical Membership & Identity Pruning
if (identity) {
const idHex = identity.toHexString();
queries.push("SELECT * FROM custom_emoji");
queries.push("SELECT * FROM system_configuration");
queries.push(`SELECT * FROM server WHERE id IN (SELECT server_id FROM server_member WHERE identity = 0x${idHex})`);
queries.push(`SELECT * FROM user WHERE identity = 0x${idHex}`);
queries.push(
`SELECT * FROM server_member WHERE identity = 0x${idHex}`,
);
queries.push(`SELECT * FROM server_member WHERE identity = 0x${idHex}`);
if (serverId) {
queries.push(`SELECT * FROM server_member WHERE server_id = ${serverId}`);
// INITIAL LOAD: Subscribe to ALL recent messages in the server.
// These now contain the actual message data (text, sender, etc.)
queries.push(`SELECT * FROM recent_message WHERE channel_id IN (SELECT id FROM channel WHERE server_id = ${serverId})`);
}
} else {
queries.push("SELECT * FROM custom_emoji");
queries.push("SELECT * FROM system_configuration");
queries.push("SELECT * FROM server WHERE name = 'Ditchcord'");
}
// 2. Metadata & Global State
// Start with a broad user query to ensure we find ourselves
let userQuery = "SELECT * FROM user WHERE online = true";
if (identity) {
userQuery += ` OR identity = 0x${identity.toHexString()}`;
}
if (serverId) {
queries.push(`SELECT * FROM channel WHERE server_id = ${serverId}`);
queries.push(
`SELECT * FROM thread WHERE channel_id IN (SELECT id FROM channel WHERE server_id = ${serverId})`,
);
queries.push(`SELECT * FROM thread WHERE channel_id IN (SELECT id FROM channel WHERE server_id = ${serverId})`);
queries.push(`SELECT * FROM voice_state WHERE channel_id IN (SELECT id FROM channel WHERE server_id = ${serverId})`);
queries.push(`SELECT * FROM voice_activity WHERE channel_id IN (SELECT id FROM channel WHERE server_id = ${serverId})`);
queries.push(`SELECT * FROM typing_activity WHERE channel_id IN (SELECT id FROM channel WHERE server_id = ${serverId})`);
queries.push(`SELECT * FROM watching WHERE channel_id IN (SELECT id FROM channel WHERE server_id = ${serverId})`);
// Voice states and activity are lightweight and indexed by channel_id
queries.push(
`SELECT * FROM voice_state WHERE channel_id IN (SELECT id FROM channel WHERE server_id = ${serverId})`,
);
queries.push(
`SELECT * FROM voice_activity WHERE channel_id IN (SELECT id FROM channel WHERE server_id = ${serverId})`,
);
queries.push(
`SELECT * FROM typing_activity WHERE channel_id IN (SELECT id FROM channel WHERE server_id = ${serverId})`,
);
queries.push(
`SELECT * FROM watching WHERE channel_id IN (SELECT id FROM channel WHERE server_id = ${serverId})`,
);
// 3. Load the actual messages for the active channel (with pagination)
if (channelId) {
queries.push(
`SELECT * FROM message WHERE channel_id = ${channelId} AND thread_id IS NULL ORDER BY sent DESC LIMIT ${limit}`,
);
// Scalable Message Loading Logic
if (channelId && isExpanded) {
// ONLY subscribe to the heavy message table if the user requests history expansion
queries.push(`SELECT * FROM message WHERE channel_id = ${channelId} AND thread_id IS NULL`);
}
// If viewing a thread, pull those specific messages too
if (threadId) {
// Threads are always fully subscribed since they are usually small/active context
queries.push(
`SELECT * FROM message WHERE thread_id = ${threadId} OR id = (SELECT parent_message_id FROM thread WHERE id = ${threadId})`,
);
}
// 4. Surgical Image & Reaction Sync
// Metadata queries (Reactions/Images) for visible messages
const visibleMsgSubquery = threadId
? `(SELECT id FROM message WHERE thread_id = ${threadId} OR id = (SELECT parent_message_id FROM thread WHERE id = ${threadId}))`
: `(SELECT id FROM message WHERE channel_id = ${channelId} AND thread_id IS NULL ORDER BY sent DESC LIMIT ${limit})`;
: (isExpanded
? `(SELECT id FROM message WHERE channel_id = ${channelId} AND thread_id IS NULL)`
: `(SELECT message_id FROM recent_message WHERE channel_id = ${channelId})`);
queries.push(
`SELECT * FROM message_image WHERE message_id IN ${visibleMsgSubquery}`,
);
queries.push(
`SELECT * FROM message_reaction WHERE message_id IN ${visibleMsgSubquery}`,
);
queries.push(`SELECT * FROM message_image WHERE message_id IN ${visibleMsgSubquery}`);
queries.push(`SELECT * FROM message_reaction WHERE message_id IN ${visibleMsgSubquery}`);
// Add members of this server to user query
userQuery += ` OR identity IN (SELECT identity FROM server_member WHERE server_id = ${serverId})`;
// Add message senders to user query
userQuery += ` OR identity IN (SELECT sender FROM message WHERE id IN ${visibleMsgSubquery})`;
// Add reactors to user query
userQuery += ` OR identity IN (SELECT identity FROM message_reaction WHERE message_id IN ${visibleMsgSubquery})`;
// Add typers to user query
userQuery += ` OR identity IN (SELECT identity FROM typing_activity WHERE channel_id IN (SELECT id FROM channel WHERE server_id = ${serverId}))`;
// Image Sync: Message Images + User Avatars
userQuery += ` OR identity IN (SELECT sender FROM recent_message WHERE channel_id = ${channelId})`;
const userSubquery = `(${userQuery.replace("SELECT *", "SELECT identity")})`;
queries.push(`
SELECT * FROM image WHERE
@@ -158,23 +144,54 @@ export class MessagingService {
}
queries.push(userQuery);
console.log(`[messaging] Subscribing with ${queries.length} queries`);
conn.subscriptionBuilder().subscribe(queries);
});
});
// Reset limit on channel switch
$effect(() => {
void this.#nav.activeChannelId;
untrack(() => {
this.messageLimit = 50;
});
});
}
get allMessages() {
return this.#allMessages;
const channelId = this.#nav.activeChannelId;
if (!channelId) return [];
const isExpanded = this.#expandedChannels.has(channelId);
// Map recent messages to standard Message type
const mappedRecent = this.#allRecentMessages
.filter(rm => rm.channelId === channelId)
.map(rm => ({
id: rm.messageId,
sender: rm.sender,
text: rm.text,
channelId: rm.channelId,
threadId: rm.threadId,
sent: rm.sent
})) as Types.Message[];
if (!isExpanded) {
// In non-expanded mode, ONLY show the cache
return mappedRecent.sort((a, b) =>
Number(BigInt(a.sent.microsSinceUnixEpoch) - BigInt(b.sent.microsSinceUnixEpoch))
);
}
// In expanded mode, merge history and recent, deduplicating by ID
const msgMap = new Map<bigint, Types.Message>();
// 1. Add History (the authoritative full set from SpacetimeDB)
for (const m of this.#allHistoryMessages) {
if (m.channelId === channelId) {
msgMap.set(m.id, m);
}
}
// 2. Add Recent (may contain very new messages not yet indexed into the history sync)
for (const m of mappedRecent) {
msgMap.set(m.id, m);
}
return Array.from(msgMap.values()).sort((a, b) =>
Number(BigInt(a.sent.microsSinceUnixEpoch) - BigInt(b.sent.microsSinceUnixEpoch))
);
}
get messageImages() {
@@ -188,26 +205,21 @@ export class MessagingService {
get channelMessages() {
const channelId = this.#nav.activeChannelId;
if (!channelId) return [];
return this.#allMessages
return this.allMessages
.filter((m) => m.channelId === channelId && !m.threadId)
.sort((a, b) =>
Number(
BigInt(a.sent.microsSinceUnixEpoch) -
BigInt(b.sent.microsSinceUnixEpoch),
),
Number(BigInt(a.sent.microsSinceUnixEpoch) - BigInt(b.sent.microsSinceUnixEpoch))
);
}
get threadMessages() {
const threadId = this.#nav.activeThreadId;
if (!threadId) return [];
return this.#allMessages
.filter((m) => m.threadId === threadId)
return this.allMessages
.filter(m => m.threadId === threadId)
.sort((a, b) =>
Number(
BigInt(a.sent.microsSinceUnixEpoch) -
BigInt(b.sent.microsSinceUnixEpoch),
),
Number(BigInt(a.sent.microsSinceUnixEpoch) - BigInt(b.sent.microsSinceUnixEpoch))
);
}
@@ -218,10 +230,7 @@ export class MessagingService {
}
handleStartThread = (msg: Types.Message) => {
// Check if thread already exists
const existing = this.#db.allThreads.find(
(t) => t.parentMessageId === msg.id,
);
const existing = this.#db.allThreads.find((t) => t.parentMessageId === msg.id);
if (existing) {
this.#nav.activeThreadId = existing.id;
this.#nav.pendingThreadParentMessageId = null;
@@ -231,41 +240,20 @@ export class MessagingService {
}
};
handleSendMessage = (
text: string,
threadId?: bigint,
imageIds: bigint[] = [],
) => {
handleSendMessage = (text: string, threadId?: bigint, imageIds: bigint[] = []) => {
if (this.#nav.activeChannelId) {
if (threadId) {
this.#sendMessageReducer({
channelId: this.#nav.activeChannelId,
text: text,
threadId: threadId,
imageIds: imageIds,
});
this.#sendMessageReducer({ channelId: this.#nav.activeChannelId, text, threadId, imageIds });
} else if (this.#nav.pendingThreadParentMessageId) {
const parentMsgId = this.#nav.pendingThreadParentMessageId;
const parentMsg = this.#allMessages.find((m) => m.id === parentMsgId);
const parentMsg = this.allMessages.find((m) => m.id === parentMsgId);
const name = (parentMsg?.text && parentMsg.text.trim().length > 0)
? parentMsg.text.substring(0, 32)
: "New Thread";
this.#createThreadWithMessageReducer({
name,
channelId: this.#nav.activeChannelId,
parentMessageId: parentMsgId,
text,
imageIds,
});
// Remove manual nulling - NavigationService effect will handle this once thread is created
this.#createThreadWithMessageReducer({ name, channelId: this.#nav.activeChannelId, parentMessageId: parentMsgId, text, imageIds });
} else {
this.#sendMessageReducer({
channelId: this.#nav.activeChannelId,
text: text,
threadId: undefined,
imageIds: imageIds,
});
this.#sendMessageReducer({ channelId: this.#nav.activeChannelId, text, threadId: undefined, imageIds });
}
}
};
@@ -274,23 +262,21 @@ export class MessagingService {
this.#uploadImageReducer({ data, mimeType, name });
};
uploadCustomEmoji = async (
name: string,
category: string,
data: Uint8Array,
) => {
uploadCustomEmoji = async (name: string, category: string, data: Uint8Array) => {
this.#uploadCustomEmojiReducer({ name, category, data });
};
handleLoadMoreMessages = () => {
this.messageLimit += 50;
const channelId = this.#nav.activeChannelId;
if (!channelId) return;
// Switch to full channel history subscription
const newExpanded = new Set(this.#expandedChannels);
newExpanded.add(channelId);
this.#expandedChannels = newExpanded;
};
toggleReaction = (
messageId: bigint,
emoji?: string,
customEmojiId?: bigint,
) => {
toggleReaction = (messageId: bigint, emoji?: string, customEmojiId?: bigint) => {
this.#toggleReactionReducer({ messageId, emoji, customEmojiId });
};