// Source: zep/src/chat/services/messaging.svelte.ts
// Snapshot: 2026-04-16 19:24:18 -04:00 · 454 lines · 15 KiB · TypeScript

import { DatabaseService } from "./database.svelte";
import { NavigationService } from "./navigation.svelte";
import { useReducer, useTable } from "spacetimedb/svelte";
import { reducers, tables } from "../../module_bindings";
import * as Types from "../../module_bindings/types";
import { getConnection } from "../../config";
import { untrack } from "svelte";
import { SvelteMap, SvelteSet } from "svelte/reactivity";
import type { Identity } from "spacetimedb";
/**
 * Messaging facade over the SpacetimeDB bindings.
 *
 * Responsibilities visible in this class:
 * - mirrors the `visible_recent_activity` and `visible_scrollback_messages`
 *   tables into per-channel buckets (id lookup map + sorted array);
 * - (re)issues SQL subscriptions whenever the active channel or identity changes;
 * - exposes thin wrappers around the message/reaction/upload reducers.
 */
export class MessagingService {
    #db: DatabaseService;
    #nav: NavigationService;
    #identity: () => Identity | null;

    // Reducer invokers produced by `useReducer`.
    // NOTE(review): typed loosely as `any`; tighten against the `useReducer` return type.
    #sendMessageReducer: any;
    #createThreadWithMessageReducer: any;
    #uploadImageReducer: any;
    #uploadCustomEmojiReducer: any;
    #toggleReactionReducer: any;
    #editMessageReducer: any;
    #deleteMessageReducer: any;
    #setTypingReducer: any;
    #subscribeToChannelReducer: any;
    #extendSubscriptionReducer: any;
    #clearUploadStatusReducer: any;

    /**
     * Hook for ChatService to intercept and potentially encrypt messages.
     * Resolves to the text that should actually be sent and whether it is encrypted.
     */
    onSendMessage?: (text: string, channelId: bigint) => Promise<{ text: string, isEncrypted: boolean }>;

    /**
     * Hook triggered when a new message arrives from the server.
     * Only invoked for messages first seen after the initial sync has completed.
     */
    onMessageReceived?: (params: { channelId: bigint, senderIdentity: Identity, id: bigint, text: string, isEncrypted: boolean }) => void;

    // Per-channel message buckets: `map` for lookup by message id, `sorted` for display order.
    #channelBuckets = new SvelteMap<bigint, {
        map: Map<bigint, Types.Message>,
        sorted: Types.Message[]
    }>();

    /** True while `handleLoadMoreMessages` is waiting for older messages. */
    isLoadingMore = $state(false);

    // Channels for which at least one message or an applied subscription has been observed.
    #readyChannels = new SvelteSet<bigint>();

    /** Mirrors the readiness flag reported by the `visible_recent_activity` table store. */
    isGlobalSyncDone = $state(false);

    encryptionOptIn = $state(new SvelteSet<string>());

    #mySubscriptions = $state<readonly Types.MyChannelSubscriptionRow[]>([]);

    /**
     * Whether the active channel's messages can be rendered.
     * Returns true when no channel is active, when the global sync is done, when the
     * channel was explicitly marked ready, or when its bucket already holds messages.
     */
    get isMessagesReady() {
        const cid = this.#nav.activeChannelId;
        if (!cid) return true;
        // If the global sync of recent messages is done, every channel is ready for its initial view
        if (this.isGlobalSyncDone) return true;
        // If it's explicitly in our ready set (e.g. via onApplied)
        if (this.#readyChannels.has(cid)) return true;
        // Fallback: we already have messages for this channel in our bucket
        const bucket = this.#channelBuckets.get(cid);
        return bucket !== undefined && bucket.sorted.length > 0;
    }

    /**
     * @param db Database service; read here for `allThreads` and `uploadStatus`.
     * @param nav Navigation service; source of the active channel/thread ids.
     * @param identity Getter for the current identity, or `null` when there is none.
     */
    constructor(
        db: DatabaseService,
        nav: NavigationService,
        identity: () => Identity | null,
    ) {
        this.#db = db;
        this.#nav = nav;
        this.#identity = identity;

        this.#sendMessageReducer = useReducer(reducers.sendMessage);
        this.#createThreadWithMessageReducer = useReducer(reducers.createThreadWithMessage);
        this.#uploadImageReducer = useReducer(reducers.uploadImage);
        this.#uploadCustomEmojiReducer = useReducer(reducers.uploadCustomEmoji);
        this.#toggleReactionReducer = useReducer(reducers.toggleReaction);
        this.#editMessageReducer = useReducer(reducers.editMessage);
        this.#deleteMessageReducer = useReducer(reducers.deleteMessage);
        this.#setTypingReducer = useReducer(reducers.setTyping);
        this.#subscribeToChannelReducer = useReducer(reducers.subscribeToChannel);
        this.#extendSubscriptionReducer = useReducer(reducers.extendSubscription);
        this.#clearUploadStatusReducer = useReducer(reducers.clearUploadStatus);

        const [visibleMessagesStore, visibleMessagesReadyStore] = useTable(tables.visible_recent_activity);
        const [visibleScrollbackStore] = useTable(tables.visible_scrollback_messages);
        const [mySubscriptionsStore] = useTable(tables.my_channel_subscriptions);

        // NOTE(review): none of the store subscriptions below are ever unsubscribed;
        // fine if this service lives for the whole session — confirm.
        mySubscriptionsStore.subscribe((v) => (this.#mySubscriptions = v));

        let recentMessages: readonly Types.Message[] = [];
        let scrollbackMessages: readonly Types.Message[] = [];

        // Ids already observed, so each message triggers at most one notification.
        const seenMessageIds = new Set<bigint>();
        let initialSyncComplete = false;

        visibleMessagesStore.subscribe((v) => {
            if (v.length > 0) {
                console.log(`[MessagingService] Received batch of ${v.length} messages. Initial sync complete: ${initialSyncComplete}`);
            }
            // Identify messages not seen before and notify about them.
            // NOTE(review): rows that first appear because a new channel was subscribed
            // after the initial sync are also "unseen" and will notify — confirm intended.
            for (const msg of v) {
                if (seenMessageIds.has(msg.id)) continue;
                // Only trigger notifications if the initial database sync is fully complete
                if (initialSyncComplete && this.onMessageReceived) {
                    console.log(`[MessagingService] Dispatching notification for message ${msg.id}`);
                    this.onMessageReceived({
                        channelId: msg.channelId,
                        senderIdentity: msg.sender,
                        id: msg.id,
                        text: msg.text,
                        isEncrypted: msg.isEncrypted
                    });
                }
                seenMessageIds.add(msg.id);
            }

            recentMessages = v;
            this.#updateBuckets([...recentMessages, ...scrollbackMessages]);

            // Mark any channel that has received messages as ready
            for (const m of v) {
                this.#readyChannels.add(m.channelId);
            }
        });

        visibleMessagesReadyStore.subscribe((v) => {
            console.log(`[MessagingService] Global sync status: ${v}`);
            this.isGlobalSyncDone = v;
            if (!v) return;

            initialSyncComplete = true;
            const cid = untrack(() => this.#nav.activeChannelId);
            if (cid) {
                this.#readyChannels.add(cid);
            }
            // Also mark all channels currently in our recentMessages as ready
            for (const m of recentMessages) {
                this.#readyChannels.add(m.channelId);
            }
        });

        visibleScrollbackStore.subscribe((v) => {
            // Add scrollback messages to seen set so they NEVER trigger notifications
            for (const msg of v) seenMessageIds.add(msg.id);
            scrollbackMessages = v;
            this.#updateBuckets([...recentMessages, ...scrollbackMessages]);
        });

        $effect(() => {
            // These three reads happen outside `untrack`, so they are the effect's dependencies.
            const channelId = this.#nav.activeChannelId;
            const identity = this.#identity();
            const conn = getConnection();
            if (!conn) return;

            untrack(() => {
                if (channelId) {
                    this.#subscribeToChannelReducer({ channelId });
                }
                const queries = this.#buildSubscriptionQueries(channelId, identity);

                console.log(`[MessagingService] Updating subscriptions: ${queries.length} queries`);
                // NOTE(review): the handle returned by `subscribe` is discarded, so every
                // re-run adds a subscription without ending the previous one. Possibly
                // intentional (keeps earlier channels' rows cached) — confirm.
                conn.subscriptionBuilder()
                    .onApplied(() => {
                        if (channelId) {
                            this.#readyChannels.add(channelId);
                        }
                    })
                    .subscribe(queries);
            });
        });
    }

    /**
     * Builds the SQL subscription list for the current view.
     * Only a bigint channel id and a hex-encoded identity are interpolated into the SQL.
     *
     * @param channelId Active channel id, if any; adds the channel-scoped queries.
     * @param identity Current identity, if any; adds the identity-scoped queries.
     * @returns Query strings in the order they are subscribed.
     */
    #buildSubscriptionQueries(channelId: bigint | null | undefined, identity: Identity | null): string[] {
        // 1. Global/Session-long queries
        const queries: string[] = [
            "SELECT * FROM upload_status",
            "SELECT * FROM visible_images",
            "SELECT * FROM visible_image_blobs",
        ];

        if (identity) {
            const idHex = identity.toHexString();
            queries.push(
                "SELECT * FROM custom_emoji",
                "SELECT * FROM system_configuration",
                `SELECT * FROM visible_servers`,
                `SELECT * FROM user WHERE identity = 0x${idHex}`,
                `SELECT * FROM visible_server_members`,
                `SELECT * FROM visible_channels`,
                `SELECT * FROM visible_direct_messages`,
                `SELECT * FROM my_channel_subscriptions`,
                // WebRTC Signaling (Needs to stay global for incoming calls)
                `SELECT * FROM visible_webrtc_signals`,
            );
        }

        // 2. View-specific queries
        if (channelId) {
            queries.push(
                `SELECT * FROM visible_scrollback_messages`,
                `SELECT * FROM thread WHERE channel_id = ${channelId}`,
                `SELECT * FROM visible_user_states`,
                `SELECT * FROM visible_typing_activity`,
                // Fast-path recent activity for the ACTIVE channel only
                `SELECT * FROM visible_recent_activity WHERE channel_id = ${channelId}`,
            );
        }
        return queries;
    }

    /**
     * Rebuilds every channel bucket from scratch.
     * Messages are de-duplicated by id (later entries win) and sorted by `seqId`,
     * then by send time.
     */
    #updateBuckets(newMessages: readonly Types.Message[]) {
        const grouped = new Map<bigint, Map<bigint, Types.Message>>();
        for (const m of newMessages) {
            let byId = grouped.get(m.channelId);
            if (!byId) {
                byId = new Map();
                grouped.set(m.channelId, byId);
            }
            byId.set(m.id, m);
        }

        this.#channelBuckets.clear();
        for (const [chanId, messagesMap] of grouped.entries()) {
            const sorted = Array.from(messagesMap.values()).sort((a, b) => {
                if (a.seqId < b.seqId) return -1;
                if (a.seqId > b.seqId) return 1;
                const aSent = a.sent.microsSinceUnixEpoch;
                const bSent = b.sent.microsSinceUnixEpoch;
                if (aSent < bSent) return -1;
                if (aSent > bSent) return 1;
                // Ties must compare equal so the comparator stays consistent.
                return 0;
            });
            this.#channelBuckets.set(chanId, { map: messagesMap, sorted });
        }
    }

    /** All bucketed messages across channels, each channel's messages in sorted order. */
    get synchronizedMessages() {
        const all: Types.Message[] = [];
        for (const bucket of this.#channelBuckets.values()) {
            all.push(...bucket.sorted);
        }
        return all;
    }

    /** Sorted messages (including thread replies) of the active channel, or `[]`. */
    get allMessages() {
        const channelId = this.#nav.activeChannelId;
        if (!channelId) return [];
        return this.#channelBuckets.get(channelId)?.sorted ?? [];
    }

    /** Active-channel messages that do not belong to a thread. */
    get channelMessages() {
        return this.allMessages.filter((m) => !m.threadId);
    }

    /** The thread whose id matches the navigation's active thread id, if any. */
    get activeThread() {
        return this.#db.allThreads.find((t) => t.id === this.#nav.activeThreadId);
    }

    /** Image ids attached to a message of the active channel, or `[]` if unknown. */
    getMessageImages(messageId: bigint) {
        const channelId = this.#nav.activeChannelId;
        if (!channelId) return [];
        const bucket = this.#channelBuckets.get(channelId);
        if (!bucket) return [];
        const msg = bucket.map.get(messageId);
        return msg?.imageIds || [];
    }

    /** Reactions on a message of the active channel, or `[]` if unknown. */
    getMessageReactions(messageId: bigint) {
        const channelId = this.#nav.activeChannelId;
        if (!channelId) return [];
        const bucket = this.#channelBuckets.get(channelId);
        if (!bucket) return [];
        const msg = bucket.map.get(messageId);
        return msg?.reactions || [];
    }

    /** Sorted messages of the active thread, taken from the thread's own channel bucket. */
    get threadMessages() {
        const threadId = this.#nav.activeThreadId;
        if (!threadId) return [];
        const thread = this.activeThread;
        if (!thread) return [];
        const bucket = this.#channelBuckets.get(thread.channelId);
        if (!bucket) return [];
        return bucket.sorted.filter((m) => m.threadId === threadId);
    }

    /**
     * Whether older messages may exist for the active channel, judged from this
     * client's subscription row for that channel.
     */
    get hasMoreMessages() {
        const channelId = this.#nav.activeChannelId;
        if (!channelId) return false;
        const sub = this.#mySubscriptions.find((s) => s.channelId === channelId);
        if (!sub) return false;

        const bucket = this.#channelBuckets.get(channelId);
        // Nothing loaded yet: there is more if the subscription reports any sequence id at all.
        if (!bucket || bucket.sorted.length === 0) return sub.lastSeqId > 0n;
        // Otherwise there is more as long as the subscribed window does not start at 1.
        return sub.earliestSeqId > 1n;
    }

    /**
     * Opens the thread rooted at `msg` if one exists; otherwise records `msg`
     * as the pending parent so the next sent message creates the thread.
     */
    handleStartThread = (msg: Types.Message) => {
        const existing = this.#db.allThreads.find((t) => t.parentMessageId === msg.id);
        if (existing) {
            this.#nav.activeThreadId = existing.id;
            this.#nav.pendingThreadParentMessageId = null;
        } else {
            this.#nav.pendingThreadParentMessageId = msg.id;
            this.#nav.activeThreadId = null;
        }
    };

    /**
     * Sends `text` to the active channel: as a reply in `threadId`, as the first
     * message of a pending thread, or as a plain channel message.
     * Does nothing when no channel is active.
     *
     * @param text Plain message text; passed through `onSendMessage` when that hook is set.
     * @param threadId Existing thread to reply in.
     * @param imageIds Ids of previously uploaded images to attach.
     */
    handleSendMessage = async (text: string, threadId?: bigint, imageIds: bigint[] = []) => {
        // Snapshot navigation state BEFORE awaiting: the user may switch channels while
        // the hook runs, and the message must go where it was composed.
        const channelId = this.#nav.activeChannelId;
        if (!channelId) return;
        const pendingParentId = this.#nav.pendingThreadParentMessageId;

        const outgoing = this.onSendMessage
            ? await this.onSendMessage(text, channelId)
            : { text, isEncrypted: false };

        if (!threadId && pendingParentId) {
            // Look the parent up in the captured channel's bucket, not the (possibly changed) active one.
            const parentMsg = this.#channelBuckets.get(channelId)?.map.get(pendingParentId);
            const name = (parentMsg?.text && parentMsg.text.trim().length > 0)
                ? parentMsg.text.substring(0, 32)
                : "New Thread";
            this.#createThreadWithMessageReducer({
                name,
                channelId,
                parentMessageId: pendingParentId,
                text: outgoing.text,
                imageIds,
                isEncrypted: outgoing.isEncrypted
            });
            return;
        }

        this.#sendMessageReducer({
            channelId,
            text: outgoing.text,
            threadId: threadId ? threadId : undefined,
            imageIds,
            isEncrypted: outgoing.isEncrypted
        });
    };

    /**
     * Requests up to 100 older messages for the active channel by moving the
     * subscription's earliest sequence id back.
     *
     * @returns `true` if a request was issued; `false` if there is no active channel,
     * a load is already in flight, nothing is loaded yet, the oldest loaded message
     * is already at sequence 1, or the request threw.
     */
    handleLoadMoreMessages = async (): Promise<boolean> => {
        const channelId = this.#nav.activeChannelId;
        if (!channelId || this.isLoadingMore) return false;

        const msgs = this.channelMessages;
        if (msgs.length === 0) return false;

        const oldestSeq = msgs[0].seqId;
        if (oldestSeq === undefined || oldestSeq <= 1n) {
            return false;
        }

        this.isLoadingMore = true;
        try {
            // Extend the window by 100 sequence ids, clamped at 1.
            const newEarliest = oldestSeq > 100n ? oldestSeq - 100n : 1n;
            this.#extendSubscriptionReducer({
                channelId,
                earliestSeqId: newEarliest
            });
            // Fixed 500 ms pause before clearing the loading flag; nothing here observes
            // the rows actually arriving.
            await new Promise<void>((resolve) => setTimeout(resolve, 500));
            return true;
        } catch (e) {
            console.error("Failed to load more messages:", e);
            return false;
        } finally {
            this.isLoadingMore = false;
        }
    };

    /** Toggles a reaction on a message via the `toggleReaction` reducer. */
    toggleReaction = (messageId: bigint, emoji?: string, customEmojiId?: bigint) => {
        this.#toggleReactionReducer({ messageId, emoji, customEmojiId });
    };

    /**
     * Uploads an image and waits for its `upload_status` row.
     *
     * @returns The id of the stored image.
     * @throws If the reducer call throws, if the status row reports an error,
     * or if no terminal status is seen within roughly 10 seconds.
     */
    uploadImage = async (data: Uint8Array, mimeType: string, name?: string): Promise<bigint> => {
        const pollIntervalMs = 100;
        const maxAttempts = 100; // 100 polls x 100 ms = 10 second timeout

        const clientId = Math.random().toString(36).substring(2);
        try {
            this.#uploadImageReducer({ data, mimeType, name, clientId });
        } catch (e) {
            console.error(`[MessagingService] Reducer call failed for ${clientId}:`, e);
            // Fail fast: no status row is expected after a failed call, so polling
            // would only end in a misleading timeout.
            throw e instanceof Error ? e : new Error(String(e));
        }

        return new Promise<bigint>((resolve, reject) => {
            let attempts = 0;
            const interval = setInterval(() => {
                const status = this.#db.uploadStatus.find((s) => s.clientId === clientId);

                if (status?.status === "success" && status.imageId !== undefined) {
                    clearInterval(interval);
                    this.#clearUploadStatusReducer({ clientId });
                    resolve(status.imageId);
                    return;
                }
                if (status?.status === "error") {
                    clearInterval(interval);
                    this.#clearUploadStatusReducer({ clientId });
                    reject(new Error(status.error || "Unknown upload error"));
                    return;
                }

                if (++attempts > maxAttempts) {
                    clearInterval(interval);
                    reject(new Error("Upload timed out"));
                }
            }, pollIntervalMs);
        });
    };

    /** Uploads a custom emoji via the `uploadCustomEmoji` reducer. */
    uploadCustomEmoji = async (name: string, category: string, data: Uint8Array) => {
        this.#uploadCustomEmojiReducer({ name, category, data });
    };

    /** Edits a message; ignored when the new text is empty or whitespace-only. */
    handleEditMessage = (messageId: bigint, newText: string) => {
        if (newText.trim()) {
            this.#editMessageReducer({ messageId, newText });
        }
    };

    /** Deletes a message via the `deleteMessage` reducer. */
    handleDeleteMessage = (messageId: bigint) => {
        this.#deleteMessageReducer({ messageId });
    };

    /** Reports the typing state for a channel via the `setTyping` reducer. */
    setTyping = (channelId: bigint, typing: boolean) => {
        this.#setTypingReducer({ channelId, typing });
    };
}