fix scrollback

This commit is contained in:
2026-04-21 21:26:43 -04:00
parent 6c305af961
commit 2bc7da8028
4 changed files with 76 additions and 106 deletions
+11 -37
View File
@@ -291,47 +291,19 @@ pub fn set_status(ctx: &ReducerContext, status: Option<String>) {
#[spacetimedb::reducer]
pub fn subscribe_to_channel(ctx: &ReducerContext, channel_id: u64) {
let current_max = ctx
.db
.channel_internal_state()
.channel_id()
.find(channel_id)
.map(|c| c.last_seq_id)
.unwrap_or(0);
let limit = get_recent_message_limit(&ctx.db);
let earliest = if current_max >= limit {
current_max - (limit - 1)
} else {
1
};
let current_max = ctx.db.channel_internal_state().channel_id().find(channel_id)
.map(|c| c.last_seq_id).unwrap_or(0);
let mut sub = ctx
.db
.channel_subscription()
.identity()
.find(ctx.sender())
.unwrap_or(ChannelSubscription {
// Initial state: earliest is same as current max (scrollback is empty)
let earliest = current_max;
ctx.db.channel_subscription().identity().delete(ctx.sender());
ctx.db.channel_subscription().insert(ChannelSubscription {
identity: ctx.sender(),
channel_id,
earliest_seq_id: earliest,
last_read_seq_id: current_max,
});
sub.channel_id = channel_id;
sub.earliest_seq_id = earliest;
sub.last_read_seq_id = current_max;
if ctx
.db
.channel_subscription()
.identity()
.find(ctx.sender())
.is_some()
{
ctx.db.channel_subscription().identity().update(sub);
} else {
ctx.db.channel_subscription().insert(sub);
}
}
#[spacetimedb::reducer]
@@ -345,10 +317,12 @@ pub fn request_image_blob(ctx: &ReducerContext, image_id: u64) {
#[spacetimedb::reducer]
pub fn extend_subscription(ctx: &ReducerContext, channel_id: u64, earliest_seq_id: u64) {
if let Some(mut sub) = ctx.db.channel_subscription().identity().find(ctx.sender()) {
if let Some(sub) = ctx.db.channel_subscription().identity().find(ctx.sender()) {
if sub.channel_id == channel_id {
let mut sub = sub.clone();
sub.earliest_seq_id = earliest_seq_id;
ctx.db.channel_subscription().identity().update(sub);
ctx.db.channel_subscription().identity().delete(ctx.sender());
ctx.db.channel_subscription().insert(sub);
}
}
}
+9 -8
View File
@@ -324,14 +324,15 @@ pub fn sync_recent_message(db: &Local, msg: Message) {
.count();
if count > limit as usize {
// Find the oldest message by seq_id for this channel in the recent table
if let Some(oldest) = db
.recent_message()
.channel_seq()
.filter((msg.channel_id, 0u64..))
.next()
{
db.recent_message().id().delete(oldest.id);
// Find all recent messages for this channel
let mut recent_msgs: Vec<_> = db.recent_message().channel_id().filter(msg.channel_id).collect();
// Sort by seq_id ascending
recent_msgs.sort_by_key(|m| m.seq_id);
// Delete the oldest ones until we are back at the limit
let to_delete = count - limit as usize;
for i in 0..to_delete {
db.recent_message().id().delete(recent_msgs[i].id);
}
}
}
+16 -6
View File
@@ -263,19 +263,29 @@ pub fn visible_webrtc_signals(ctx: &ViewContext) -> Vec<WebRTCSignal> {
}
#[spacetimedb::view(accessor = visible_scrollback_messages, public)]
pub fn visible_scrollback_messages(ctx: &ViewContext) -> impl Query<Message> {
pub fn visible_scrollback_messages(ctx: &ViewContext) -> Vec<Message> {
let identity = ctx.sender();
let mut results = Vec::new();
if let Some(sub) = ctx.db.channel_subscription().identity().find(identity) {
let cid = sub.channel_id;
let min_seq = sub.earliest_seq_id;
ctx.from
.message()
.r#where(move |m| m.channel_id.eq(cid).and(m.seq_id.gte(min_seq)))
} else {
ctx.from.message().r#where(|m| m.id.eq(0))
// HIGH PERFORMANCE: Uses composite index range scan on the MAIN message table
for msg in ctx.db.message().channel_seq().filter((cid, min_seq..)) {
results.push(Message {
id: msg.id, sender: msg.sender, sent: msg.sent, text: msg.text.clone(),
channel_id: msg.channel_id, server_id: msg.server_id, thread_id: msg.thread_id,
reactions: msg.reactions.clone(), image_ids: msg.image_ids.clone(),
thread_name: msg.thread_name.clone(), thread_reply_count: msg.thread_reply_count,
edited: msg.edited, is_encrypted: msg.is_encrypted, seq_id: msg.seq_id,
});
}
}
results
}
#[spacetimedb::view(accessor = my_channel_subscriptions, public)]
pub fn my_channel_subscriptions(ctx: &ViewContext) -> Vec<MyChannelSubscriptionRow> {
if let Some(sub) = ctx.db.channel_subscription().identity().find(ctx.sender()) {
+36 -51
View File
@@ -38,19 +38,44 @@ export class MessagingService {
*/
onMessageReceived?: (params: { channelId: bigint, senderIdentity: Identity, id: bigint, text: string, isEncrypted: boolean }) => void;
// Internal reactive state from SpacetimeDB
// Optimized Per-Channel/Per-Message Buckets
#channelBuckets = new SvelteMap<bigint, {
map: Map<bigint, Types.Message>,
sorted: Types.Message[]
}>();
isLoadingMore = $state(false);
#readyChannels = new SvelteSet<bigint>();
isGlobalSyncDone = $state(false);
encryptionOptIn = $state(new SvelteSet<string>());
#mySubscriptions = $state<readonly Types.MyChannelSubscriptionRow[]>([]);
#recentMessages = $state<readonly Types.Message[]>([]);
#scrollbackMessages = $state<readonly Types.Message[]>([]);
isLoadingMore = $state(false);
#channelBuckets = $derived.by(() => {
const buckets = new Map<bigint, {
map: Map<bigint, Types.Message>,
sorted: Types.Message[]
}>();
const all = [...this.#recentMessages, ...this.#scrollbackMessages, ...this.#db.threadMessages];
for (const m of all) {
let bucket = buckets.get(m.channelId);
if (!bucket) {
bucket = { map: new Map(), sorted: [] };
buckets.set(m.channelId, bucket);
}
bucket.map.set(m.id, m);
}
// Sort each bucket
for (const bucket of buckets.values()) {
bucket.sorted = Array.from(bucket.map.values()).sort((a, b) => {
if (a.seqId < b.seqId) return -1;
if (a.seqId > b.seqId) return 1;
return a.sent.microsSinceUnixEpoch < b.sent.microsSinceUnixEpoch ? -1 : 1;
});
}
return buckets;
});
get isMessagesReady() {
const cid = this.#nav.activeChannelId;
if (!cid) return true;
@@ -97,22 +122,10 @@ export class MessagingService {
mySubscriptionsStore.subscribe((v) => (this.#mySubscriptions = v));
let recentMessages = $state<readonly Types.Message[]>([]);
let scrollbackMessages = $state<readonly Types.Message[]>([]);
// Incremental update logic for visible messages
const seenMessageIds = new Set<bigint>();
let initialSyncComplete = false;
// Plan D & Global Sync: Reactively update buckets whenever any message source changes
$effect(() => {
this.#updateBuckets([
...recentMessages,
...scrollbackMessages,
...this.#db.threadMessages
]);
});
visibleMessagesStore.subscribe((v) => {
if (v.length > 0) {
console.log(`[MessagingService] Received batch of ${v.length} messages. Initial sync complete: ${initialSyncComplete}`);
@@ -136,7 +149,7 @@ export class MessagingService {
}
}
recentMessages = v;
this.#recentMessages = v;
// Mark any channel that has received messages as ready
for (const m of v) {
@@ -154,7 +167,7 @@ export class MessagingService {
this.#readyChannels.add(cid);
}
// Also mark all channels currently in our recentMessages as ready
for (const m of recentMessages) {
for (const m of this.#recentMessages) {
this.#readyChannels.add(m.channelId);
}
}
@@ -164,7 +177,7 @@ export class MessagingService {
// Add scrollback messages to seen set so they NEVER trigger notifications
for (const msg of v) seenMessageIds.add(msg.id);
scrollbackMessages = v;
this.#scrollbackMessages = v;
});
$effect(() => {
@@ -207,9 +220,6 @@ export class MessagingService {
queries.push(`SELECT * FROM visible_scrollback_messages`);
queries.push(`SELECT * FROM visible_user_states`);
queries.push(`SELECT * FROM visible_typing_activity`);
// Fast-path recent activity for the ACTIVE channel only
queries.push(`SELECT * FROM visible_recent_activity WHERE channel_id = ${channelId}`);
}
console.log(`[MessagingService] Updating subscriptions: ${queries.length} queries`);
@@ -224,31 +234,6 @@ export class MessagingService {
});
}
#updateBuckets(newMessages: readonly Types.Message[]) {
const tempBuckets = new Map<bigint, Map<bigint, Types.Message>>();
for (const m of newMessages) {
let bucketMap = tempBuckets.get(m.channelId);
if (!bucketMap) {
bucketMap = new Map();
tempBuckets.set(m.channelId, bucketMap);
}
bucketMap.set(m.id, m);
}
this.#channelBuckets.clear();
for (const [chanId, messagesMap] of tempBuckets.entries()) {
const sorted = Array.from(messagesMap.values()).sort((a, b) => {
if (a.seqId < b.seqId) return -1;
if (a.seqId > b.seqId) return 1;
return a.sent.microsSinceUnixEpoch < b.sent.microsSinceUnixEpoch ? -1 : 1;
});
this.#channelBuckets.set(chanId, { map: messagesMap, sorted });
}
}
get synchronizedMessages() {
const all: Types.Message[] = [];
for (const bucket of this.#channelBuckets.values()) {