diff --git a/spacetimedb/src/reducers.rs b/spacetimedb/src/reducers.rs index f83dd63..e035f8b 100644 --- a/spacetimedb/src/reducers.rs +++ b/spacetimedb/src/reducers.rs @@ -682,37 +682,16 @@ pub fn set_sharing_screen(ctx: &ReducerContext, sharing: bool) { ctx.db.watching().id().delete(id); } - let s_offers: Vec<_> = ctx + let signals: Vec<_> = ctx .db - .screen_sdp_offer() + .webrtc_signal() .sender() .filter(ctx.sender()) - .map(|r| r.id) + .filter(|s| s.media_type == MediaType::Screen) + .map(|s| s.id) .collect(); - for id in s_offers { - ctx.db.screen_sdp_offer().id().delete(id); - } - - let s_answers: Vec<_> = ctx - .db - .screen_sdp_answer() - .sender() - .filter(ctx.sender()) - .map(|r| r.id) - .collect(); - for id in s_answers { - ctx.db.screen_sdp_answer().id().delete(id); - } - - let s_ice: Vec<_> = ctx - .db - .screen_ice_candidate() - .sender() - .filter(ctx.sender()) - .map(|r| r.id) - .collect(); - for id in s_ice { - ctx.db.screen_ice_candidate().id().delete(id); + for id in signals { + ctx.db.webrtc_signal().id().delete(id); } } } @@ -777,97 +756,21 @@ pub fn leave_voice(ctx: &ReducerContext) { } #[spacetimedb::reducer] -pub fn send_voice_sdp_offer( +pub fn send_webrtc_signal( ctx: &ReducerContext, receiver: Identity, - sdp: String, + signal_kind: SignalKind, + media_type: MediaType, + data: String, channel_id: u64, ) { - ctx.db.voice_sdp_offer().insert(VoiceSdpOffer { + ctx.db.webrtc_signal().insert(WebRTCSignal { id: 0, sender: ctx.sender(), receiver, - sdp, - channel_id, - }); -} - -#[spacetimedb::reducer] -pub fn send_voice_sdp_answer( - ctx: &ReducerContext, - receiver: Identity, - sdp: String, - channel_id: u64, -) { - ctx.db.voice_sdp_answer().insert(VoiceSdpAnswer { - id: 0, - sender: ctx.sender(), - receiver, - sdp, - channel_id, - }); -} - -#[spacetimedb::reducer] -pub fn send_voice_ice_candidate( - ctx: &ReducerContext, - receiver: Identity, - candidate: String, - channel_id: u64, -) { - ctx.db.voice_ice_candidate().insert(VoiceIceCandidate { - id: 0, - sender: ctx.sender(), - receiver, - candidate, - channel_id, - }); -} - -#[spacetimedb::reducer] -pub fn send_screen_sdp_offer( - ctx: &ReducerContext, - receiver: Identity, - sdp: String, - channel_id: u64, -) { - ctx.db.screen_sdp_offer().insert(ScreenSdpOffer { - id: 0, - sender: ctx.sender(), - receiver, - sdp, - channel_id, - }); -} - -#[spacetimedb::reducer] -pub fn send_screen_sdp_answer( - ctx: &ReducerContext, - receiver: Identity, - sdp: String, - channel_id: u64, -) { - ctx.db.screen_sdp_answer().insert(ScreenSdpAnswer { - id: 0, - sender: ctx.sender(), - receiver, - sdp, - channel_id, - }); -} - -#[spacetimedb::reducer] -pub fn send_screen_ice_candidate( - ctx: &ReducerContext, - receiver: Identity, - candidate: String, - channel_id: u64, -) { - ctx.db.screen_ice_candidate().insert(ScreenIceCandidate { - id: 0, - sender: ctx.sender(), - receiver, - candidate, + signal_kind, + media_type, + data, channel_id, }); } diff --git a/spacetimedb/src/tables.rs b/spacetimedb/src/tables.rs index b5e3495..7393421 100644 --- a/spacetimedb/src/tables.rs +++ b/spacetimedb/src/tables.rs @@ -117,36 +117,21 @@ pub struct Watching { pub channel_id: u64, } -#[spacetimedb::table(accessor = voice_sdp_offer)] -pub struct VoiceSdpOffer { - #[primary_key] - #[auto_inc] - pub id: u64, - #[index(btree)] - pub sender: Identity, - #[index(btree)] - pub receiver: Identity, - pub sdp: String, - #[index(btree)] - pub channel_id: u64, +#[derive(spacetimedb::SpacetimeType, Clone, Copy, Debug, PartialEq)] +pub enum SignalKind { + Offer, + Answer, + IceCandidate, } -#[spacetimedb::table(accessor = voice_sdp_answer)] -pub struct VoiceSdpAnswer { - #[primary_key] - #[auto_inc] - pub id: u64, - #[index(btree)] - pub sender: Identity, - #[index(btree)] - pub receiver: Identity, - pub sdp: String, - #[index(btree)] - pub channel_id: u64, +#[derive(spacetimedb::SpacetimeType, Clone, Copy, Debug, PartialEq)] +pub enum MediaType { + Voice, + Screen, } -#[spacetimedb::table(accessor = voice_ice_candidate)] -pub struct VoiceIceCandidate { +#[spacetimedb::table(accessor = webrtc_signal)] +pub struct WebRTCSignal { #[primary_key] #[auto_inc] pub id: u64, @@ -154,49 +139,9 @@ pub struct VoiceIceCandidate { pub sender: Identity, #[index(btree)] pub receiver: Identity, - pub candidate: String, - #[index(btree)] - pub channel_id: u64, -} - -#[spacetimedb::table(accessor = screen_sdp_offer)] -pub struct ScreenSdpOffer { - #[primary_key] - #[auto_inc] - pub id: u64, - #[index(btree)] - pub sender: Identity, - #[index(btree)] - pub receiver: Identity, - pub sdp: String, - #[index(btree)] - pub channel_id: u64, -} - -#[spacetimedb::table(accessor = screen_sdp_answer)] -pub struct ScreenSdpAnswer { - #[primary_key] - #[auto_inc] - pub id: u64, - #[index(btree)] - pub sender: Identity, - #[index(btree)] - pub receiver: Identity, - pub sdp: String, - #[index(btree)] - pub channel_id: u64, -} - -#[spacetimedb::table(accessor = screen_ice_candidate)] -pub struct ScreenIceCandidate { - #[primary_key] - #[auto_inc] - pub id: u64, - #[index(btree)] - pub sender: Identity, - #[index(btree)] - pub receiver: Identity, - pub candidate: String, + pub signal_kind: SignalKind, + pub media_type: MediaType, + pub data: String, #[index(btree)] pub channel_id: u64, } diff --git a/spacetimedb/src/utils.rs b/spacetimedb/src/utils.rs index cbbd5cf..8e349f7 100644 --- a/spacetimedb/src/utils.rs +++ b/spacetimedb/src/utils.rs @@ -354,54 +354,20 @@ pub fn clear_signaling_for_user(db: &Local, identity: Identity) { } for row in db - .voice_sdp_offer() + .webrtc_signal() .sender() .filter(identity) .collect::>() { - db.voice_sdp_offer().delete(row); + db.webrtc_signal().delete(row); } for row in db - .voice_sdp_offer() + .webrtc_signal() .receiver() .filter(identity) .collect::>() { - db.voice_sdp_offer().delete(row); - } - - for row in db - .voice_sdp_answer() - .sender() - .filter(identity) - .collect::>() - { - db.voice_sdp_answer().delete(row); - } - for row in db - .voice_sdp_answer() - .receiver() - .filter(identity) - .collect::>() - { - db.voice_sdp_answer().delete(row); - } - - for row in db - .voice_ice_candidate() - .sender() - .filter(identity) - .collect::>() - { - db.voice_ice_candidate().delete(row); - } - for row in db - .voice_ice_candidate() - .receiver() - .filter(identity) - .collect::>() - { - db.voice_ice_candidate().delete(row); + db.webrtc_signal().delete(row); } } diff --git a/spacetimedb/src/views.rs b/spacetimedb/src/views.rs index 5229b7f..acb8305 100644 --- a/spacetimedb/src/views.rs +++ b/spacetimedb/src/views.rs @@ -52,6 +52,57 @@ pub struct VisibleDirectMessageRow { pub is_open_recipient: bool, } +#[spacetimedb::view(accessor = visible_recent_activity, public)] +pub fn visible_recent_activity(ctx: &ViewContext) -> Vec { + let identity = ctx.sender(); + let mut results = Vec::new(); + let mut seen_ids = std::collections::HashSet::new(); + + // 1. Servers I'm a member of + let my_server_ids: Vec = ctx + .db + .server_member() + .identity() + .filter(identity) + .map(|m| m.server_id) + .collect(); + + for server_id in my_server_ids { + for rm in ctx.db.recent_message().server_id().filter(server_id) { + if seen_ids.insert(rm.id) { + results.push(rm); + } + } + } + + // 2. Open DMs + let my_dms: Vec<_> = ctx + .db + .direct_message() + .sender() + .filter(identity) + .filter(|dm| dm.is_open_sender) + .chain( + ctx.db + .direct_message() + .recipient() + .filter(identity) + .filter(|dm| dm.is_open_recipient), + ) + .map(|dm| dm.channel_id) + .collect(); + + for channel_id in my_dms { + for rm in ctx.db.recent_message().channel_id().filter(channel_id) { + if seen_ids.insert(rm.id) { + results.push(rm); + } + } + } + + results +} + #[spacetimedb::view(accessor = visible_servers, public)] pub fn visible_servers(ctx: &ViewContext) -> Vec { let identity = ctx.sender(); @@ -168,58 +219,16 @@ pub fn visible_images(ctx: &ViewContext) -> Vec { results } -#[spacetimedb::view(accessor = visible_messages, public)] -pub fn visible_messages(ctx: &ViewContext) -> Vec { +#[spacetimedb::view(accessor = visible_webrtc_signals, public)] +pub fn visible_webrtc_signals(ctx: &ViewContext) -> Vec { let identity = ctx.sender(); let mut results = Vec::new(); - - // 1. Find all server IDs I am in - let my_server_ids: std::collections::HashSet = ctx - .db - .server_member() - .identity() - .filter(identity) - .map(|m| m.server_id) - .collect(); - - // 2. Find recent messages for those servers - for server_id in my_server_ids { - for rm in ctx.db.recent_message().server_id().filter(server_id) { - results.push(rm); - } + for signal in ctx.db.webrtc_signal().sender().filter(identity) { + results.push(signal); } - - results -} - -#[spacetimedb::view(accessor = visible_dm_messages, public)] -pub fn visible_dm_messages(ctx: &ViewContext) -> Vec { - let mut results = Vec::new(); - let identity = ctx.sender(); - - // DM FAST PATH: Recent messages from Open DMs - let my_dms: Vec<_> = ctx - .db - .direct_message() - .sender() - .filter(identity) - .filter(|dm| dm.is_open_sender) - .chain( - ctx.db - .direct_message() - .recipient() - .filter(identity) - .filter(|dm| dm.is_open_recipient), - ) - .map(|dm| dm.channel_id) - .collect(); - - for channel_id in my_dms { - for rm in ctx.db.recent_message().channel_id().filter(channel_id) { - results.push(rm); - } + for signal in ctx.db.webrtc_signal().receiver().filter(identity) { + results.push(signal); } - results } @@ -283,84 +292,6 @@ pub fn visible_scrollback_messages(ctx: &ViewContext) -> Vec results } -#[spacetimedb::view(accessor = visible_voice_sdp_offers, public)] -pub fn visible_voice_sdp_offers(ctx: &ViewContext) -> Vec { - let identity = ctx.sender(); - let mut results = Vec::new(); - for offer in ctx.db.voice_sdp_offer().sender().filter(identity) { - results.push(offer); - } - for offer in ctx.db.voice_sdp_offer().receiver().filter(identity) { - results.push(offer); - } - results -} - -#[spacetimedb::view(accessor = visible_voice_sdp_answers, public)] -pub fn visible_voice_sdp_answers(ctx: &ViewContext) -> Vec { - let identity = ctx.sender(); - let mut results = Vec::new(); - for answer in ctx.db.voice_sdp_answer().sender().filter(identity) { - results.push(answer); - } - for answer in ctx.db.voice_sdp_answer().receiver().filter(identity) { - results.push(answer); - } - results -} - -#[spacetimedb::view(accessor = visible_voice_ice_candidates, public)] -pub fn visible_voice_ice_candidates(ctx: &ViewContext) -> Vec { - let identity = ctx.sender(); - let mut results = Vec::new(); - for candidate in ctx.db.voice_ice_candidate().sender().filter(identity) { - results.push(candidate); - } - for candidate in ctx.db.voice_ice_candidate().receiver().filter(identity) { - results.push(candidate); - } - results -} - -#[spacetimedb::view(accessor = visible_screen_sdp_offers, public)] -pub fn visible_screen_sdp_offers(ctx: &ViewContext) -> Vec { - let identity = ctx.sender(); - let mut results = Vec::new(); - for offer in ctx.db.screen_sdp_offer().sender().filter(identity) { - results.push(offer); - } - for offer in ctx.db.screen_sdp_offer().receiver().filter(identity) { - results.push(offer); - } - results -} - -#[spacetimedb::view(accessor = visible_screen_sdp_answers, public)] -pub fn visible_screen_sdp_answers(ctx: &ViewContext) -> Vec { - let identity = ctx.sender(); - let mut results = Vec::new(); - for answer in ctx.db.screen_sdp_answer().sender().filter(identity) { - results.push(answer); - } - for answer in ctx.db.screen_sdp_answer().receiver().filter(identity) { - results.push(answer); - } - results -} - -#[spacetimedb::view(accessor = visible_screen_ice_candidates, public)] -pub fn visible_screen_ice_candidates(ctx: &ViewContext) -> Vec { - let identity = ctx.sender(); - let mut results = Vec::new(); - for candidate in ctx.db.screen_ice_candidate().sender().filter(identity) { - results.push(candidate); - } - for candidate in ctx.db.screen_ice_candidate().receiver().filter(identity) { - results.push(candidate); - } - results -} - #[spacetimedb::view(accessor = my_channel_subscriptions, public)] pub fn my_channel_subscriptions(ctx: &ViewContext) -> Vec { if let Some(sub) = ctx.db.channel_subscription().identity().find(ctx.sender()) { diff --git a/src/chat/services/messaging.svelte.ts b/src/chat/services/messaging.svelte.ts index b18ce1c..192a6d2 100644 --- a/src/chat/services/messaging.svelte.ts +++ b/src/chat/services/messaging.svelte.ts @@ -76,21 +76,19 @@ export class MessagingService { this.#extendSubscriptionReducer = useReducer(reducers.extendSubscription); this.#clearUploadStatusReducer = useReducer(reducers.clearUploadStatus); - const [visibleMessagesStore, visibleMessagesReadyStore] = useTable(tables.visible_messages); - const [visibleDmMessagesStore] = useTable(tables.visible_dm_messages); + const [visibleMessagesStore, visibleMessagesReadyStore] = useTable(tables.visible_recent_activity); const [visibleScrollbackStore] = useTable(tables.visible_scrollback_messages); const [mySubscriptionsStore] = useTable(tables.my_channel_subscriptions); type CombinedMessageRow = Types.RecentMessage | Types.VisibleMessageRow; - let serverMessages: readonly Types.RecentMessage[] = []; - let dmMessages: readonly Types.RecentMessage[] = []; + let recentMessages: readonly Types.RecentMessage[] = []; let scrollbackMessages: readonly Types.VisibleMessageRow[] = []; // Incremental update logic for visible messages visibleMessagesStore.subscribe((v) => { - serverMessages = v; - this.#updateBuckets([...serverMessages, ...dmMessages, ...scrollbackMessages]); + recentMessages = v; + this.#updateBuckets([...recentMessages, ...scrollbackMessages]); // Mark any channel that has received messages as ready for (const m of v) { @@ -105,21 +103,16 @@ export class MessagingService { if (cid) { this.#readyChannels.add(cid); } - // Also mark all channels currently in our serverMessages as ready - for (const m of serverMessages) { + // Also mark all channels currently in our recentMessages as ready + for (const m of recentMessages) { this.#readyChannels.add(m.channelId); } } }); - visibleDmMessagesStore.subscribe((v) => { - dmMessages = v; - this.#updateBuckets([...serverMessages, ...dmMessages, ...scrollbackMessages]); - }); - visibleScrollbackStore.subscribe((v) => { scrollbackMessages = v; - this.#updateBuckets([...serverMessages, ...dmMessages, ...scrollbackMessages]); + this.#updateBuckets([...recentMessages, ...scrollbackMessages]); }); mySubscriptionsStore.subscribe((v) => (this.#mySubscriptions = v)); @@ -151,16 +144,10 @@ export class MessagingService { queries.push(`SELECT * FROM my_channel_subscriptions`); // Recent messages for all joined channels/DMs - queries.push(`SELECT * FROM visible_messages`); - queries.push(`SELECT * FROM visible_dm_messages`); + queries.push(`SELECT * FROM visible_recent_activity`); // WebRTC Signaling - queries.push(`SELECT * FROM visible_voice_sdp_offers`); - queries.push(`SELECT * FROM visible_voice_sdp_answers`); - queries.push(`SELECT * FROM visible_voice_ice_candidates`); - queries.push(`SELECT * FROM visible_screen_sdp_offers`); - queries.push(`SELECT * FROM visible_screen_sdp_answers`); - queries.push(`SELECT * FROM visible_screen_ice_candidates`); + queries.push(`SELECT * FROM visible_webrtc_signals`); } // 2. View-specific queries diff --git a/src/chat/services/webrtc/channel-audio-webrtc.svelte.ts b/src/chat/services/webrtc/channel-audio-webrtc.svelte.ts index 331d7b4..e471631 100644 --- a/src/chat/services/webrtc/channel-audio-webrtc.svelte.ts +++ b/src/chat/services/webrtc/channel-audio-webrtc.svelte.ts @@ -6,22 +6,16 @@ import * as Types from "../../../module_bindings/types"; export class ChannelAudioWebRTCService { voiceStates = $state([]); - offers = $state([]); - answers = $state([]); - iceCandidates = $state([]); + signals = $state([]); - #sendSdpOffer = useReducer(reducers.sendVoiceSdpOffer); - #sendSdpAnswer = useReducer(reducers.sendVoiceSdpAnswer); - #sendIceCandidate = useReducer(reducers.sendVoiceIceCandidate); + #sendSignal = useReducer(reducers.sendWebrtcSignal); // --- Signaling State --- makingOffer = new Map(); ignoreOffer = new Map(); signalingQueue = new Map>(); - processedOffers = new Set(); - processedAnswers = new Set(); - processedCandidates = new Set(); + processedSignals = new Set(); candidateQueue = new Map(); identity = $state(null); @@ -45,13 +39,8 @@ export class ChannelAudioWebRTCService { const [vsStore] = useTable(tables.voice_state); vsStore.subscribe((v) => (this.voiceStates = v)); - const [offStore] = useTable(tables.visible_voice_sdp_offers); - const [ansStore] = useTable(tables.visible_voice_sdp_answers); - const [iceStore] = useTable(tables.visible_voice_ice_candidates); - - offStore.subscribe((v) => (this.offers = v)); - ansStore.subscribe((v) => (this.answers = v)); - iceStore.subscribe((v) => (this.iceCandidates = v)); + const [signalsStore] = useTable(tables.visible_webrtc_signals); + signalsStore.subscribe((v) => (this.signals = v)); this.peerManager = new PeerManagerService( identity, @@ -104,9 +93,7 @@ export class ChannelAudioWebRTCService { this.peerManager.peers.forEach((_, id) => this.peerManager.closePeer(id), ); - this.processedOffers.clear(); - this.processedAnswers.clear(); - this.processedCandidates.clear(); + this.processedSignals.clear(); this.makingOffer.clear(); this.ignoreOffer.clear(); this.signalingQueue.clear(); @@ -147,40 +134,28 @@ export class ChannelAudioWebRTCService { $effect(() => { if (!this.connectedChannelId || !this.identity) return; - // Handle Offers - const myOffers = this.offers.filter( - (o) => - o.channelId === this.connectedChannelId && - o.receiver.isEqual(this.identity!), + const mySignals = this.signals.filter( + (s) => + s.channelId === this.connectedChannelId && + s.receiver.isEqual(this.identity!) && + s.mediaType.tag === "Voice" ); - for (const offerRow of myOffers) { - if (this.processedOffers.has(offerRow.id)) continue; - this.processedOffers.add(offerRow.id); - this.handleOffer(offerRow); - } - // Handle Answers - const myAnswers = this.answers.filter( - (a) => - a.channelId === this.connectedChannelId && - a.receiver.isEqual(this.identity!), - ); - for (const answerRow of myAnswers) { - if (this.processedAnswers.has(answerRow.id)) continue; - this.processedAnswers.add(answerRow.id); - this.handleAnswer(answerRow); - } + for (const signal of mySignals) { + if (this.processedSignals.has(signal.id)) continue; + this.processedSignals.add(signal.id); - // Handle ICE - const myIce = this.iceCandidates.filter( - (c) => - c.channelId === this.connectedChannelId && - c.receiver.isEqual(this.identity!), - ); - for (const candRow of myIce) { - if (this.processedCandidates.has(candRow.id)) continue; - this.processedCandidates.add(candRow.id); - this.handleIceCandidate(candRow); + switch (signal.signalKind.tag) { + case "Offer": + this.handleOffer(signal); + break; + case "Answer": + this.handleAnswer(signal); + break; + case "IceCandidate": + this.handleIceCandidate(signal); + break; + } } }); } @@ -228,9 +203,11 @@ export class ChannelAudioWebRTCService { try { this.makingOffer.set(peerIdHex, true); await pc.setLocalDescription(); - this.#sendSdpOffer({ + this.#sendSignal({ receiver: Identity.fromString(peerIdHex), - sdp: JSON.stringify(pc.localDescription), + signalKind: { tag: "Offer" }, + mediaType: { tag: "Voice" }, + data: JSON.stringify(pc.localDescription), channelId: this.connectedChannelId, }); } finally { @@ -241,16 +218,18 @@ export class ChannelAudioWebRTCService { onIceCandidate(peerIdHex: string, candidate: RTCIceCandidate) { if (this.connectedChannelId) { - this.#sendIceCandidate({ + this.#sendSignal({ receiver: Identity.fromString(peerIdHex), - candidate: JSON.stringify(candidate), + signalKind: { tag: "IceCandidate" }, + mediaType: { tag: "Voice" }, + data: JSON.stringify(candidate), channelId: this.connectedChannelId, }); } } - handleOffer(offerRow: Types.VoiceSdpOffer) { - const peerIdHex = offerRow.sender.toHexString(); + handleOffer(signal: Types.WebRTCSignal) { + const peerIdHex = signal.sender.toHexString(); this.enqueueSignalingTask(peerIdHex, async () => { const pc = this.peerManager.createPeerConnection(peerIdHex); if (!pc) return; @@ -264,13 +243,15 @@ export class ChannelAudioWebRTCService { if (offerCollision) await pc.setLocalDescription({ type: "rollback" }); await pc.setRemoteDescription( - new RTCSessionDescription(JSON.parse(offerRow.sdp)), + new RTCSessionDescription(JSON.parse(signal.data)), ); const answer = await pc.createAnswer(); await pc.setLocalDescription(answer); - this.#sendSdpAnswer({ - receiver: offerRow.sender, - sdp: JSON.stringify(answer), + this.#sendSignal({ + receiver: signal.sender, + signalKind: { tag: "Answer" }, + mediaType: { tag: "Voice" }, + data: JSON.stringify(answer), channelId: this.connectedChannelId!, }); await this.drainCandidateQueue(peerIdHex, pc); @@ -283,14 +264,14 @@ export class ChannelAudioWebRTCService { }); } - handleAnswer(answerRow: Types.VoiceSdpAnswer) { - const peerIdHex = answerRow.sender.toHexString(); + handleAnswer(signal: Types.WebRTCSignal) { + const peerIdHex = signal.sender.toHexString(); this.enqueueSignalingTask(peerIdHex, async () => { const peer = this.peerManager.getPeer(peerIdHex); if (!peer) return; try { await peer.pc.setRemoteDescription( - new RTCSessionDescription(JSON.parse(answerRow.sdp)), + new RTCSessionDescription(JSON.parse(signal.data)), ); await this.drainCandidateQueue(peerIdHex, peer.pc); } catch (e) { @@ -302,13 +283,13 @@ export class ChannelAudioWebRTCService { }); } - handleIceCandidate(candRow: Types.VoiceIceCandidate) { - const peerIdHex = candRow.sender.toHexString(); + handleIceCandidate(signal: Types.WebRTCSignal) { + const peerIdHex = signal.sender.toHexString(); this.enqueueSignalingTask(peerIdHex, async () => { const pc = this.peerManager.createPeerConnection(peerIdHex); if (!pc) return; try { - const candidate = JSON.parse(candRow.candidate); + const candidate = JSON.parse(signal.data); if (pc.remoteDescription) { await pc.addIceCandidate(new RTCIceCandidate(candidate)); } else if (!this.ignoreOffer.get(peerIdHex)) { diff --git a/src/chat/services/webrtc/local-media.svelte.ts b/src/chat/services/webrtc/local-media.svelte.ts index 8205c21..5573d1a 100644 --- a/src/chat/services/webrtc/local-media.svelte.ts +++ b/src/chat/services/webrtc/local-media.svelte.ts @@ -67,11 +67,11 @@ export class LocalMediaService { $effect(() => { if ( !this.localStream || - (!this.connectedChannelId && !this.isTestingMic) + (this.connectedChannelId === undefined && !this.isTestingMic) ) { this.currentInputLevel = 0; if (this.isTalking) { - if (this.connectedChannelId) { + if (this.connectedChannelId !== undefined) { this.#setTalking({ talking: false, channelId: this.connectedChannelId, @@ -150,7 +150,7 @@ export class LocalMediaService { if (isInputGated) { if (this.isTalking) { - if (this.connectedChannelId) { + if (this.connectedChannelId !== undefined) { this.#setTalking({ talking: false, channelId: this.connectedChannelId, @@ -164,7 +164,7 @@ export class LocalMediaService { if (this.currentInputLevel > this.voiceThreshold) { silenceFrames = 0; if (!this.isTalking) { - if (this.connectedChannelId) { + if (this.connectedChannelId !== undefined) { this.#setTalking({ talking: true, channelId: this.connectedChannelId, @@ -175,7 +175,7 @@ export class LocalMediaService { } else { silenceFrames++; if (silenceFrames > maxSilenceFrames && this.isTalking) { - if (this.connectedChannelId) { + if (this.connectedChannelId !== undefined) { this.#setTalking({ talking: false, channelId: this.connectedChannelId, diff --git a/src/chat/services/webrtc/screen-sharing-webrtc.svelte.ts b/src/chat/services/webrtc/screen-sharing-webrtc.svelte.ts index 2ae99a6..690328e 100644 --- a/src/chat/services/webrtc/screen-sharing-webrtc.svelte.ts +++ b/src/chat/services/webrtc/screen-sharing-webrtc.svelte.ts @@ -6,22 +6,16 @@ import * as Types from "../../../module_bindings/types"; export class ScreenSharingWebRTCService { watching = $state([]); - offers = $state([]); - answers = $state([]); - iceCandidates = $state([]); + signals = $state([]); - #sendSdpOffer = useReducer(reducers.sendScreenSdpOffer); - #sendSdpAnswer = useReducer(reducers.sendScreenSdpAnswer); - #sendIceCandidate = useReducer(reducers.sendScreenIceCandidate); + #sendSignal = useReducer(reducers.sendWebrtcSignal); // --- Signaling State --- makingOffer = new Map(); ignoreOffer = new Map(); signalingQueue = new Map>(); - processedOffers = new Set(); - processedAnswers = new Set(); - processedCandidates = new Set(); + processedSignals = new Set(); candidateQueue = new Map(); identity = $state(null); @@ -42,13 +36,8 @@ export class ScreenSharingWebRTCService { const [wStore] = useTable(tables.watching); wStore.subscribe((v) => (this.watching = v)); - const [offStore] = useTable(tables.visible_screen_sdp_offers); - const [ansStore] = useTable(tables.visible_screen_sdp_answers); - const [iceStore] = useTable(tables.visible_screen_ice_candidates); - - offStore.subscribe((v) => (this.offers = v)); - ansStore.subscribe((v) => (this.answers = v)); - iceStore.subscribe((v) => (this.iceCandidates = v)); + const [signalsStore] = useTable(tables.visible_webrtc_signals); + signalsStore.subscribe((v) => (this.signals = v)); this.peerManager = new PeerManagerService( identity, @@ -99,9 +88,7 @@ export class ScreenSharingWebRTCService { this.peerManager.peers.forEach((_, id) => this.peerManager.closePeer(id), ); - this.processedOffers.clear(); - this.processedAnswers.clear(); - this.processedCandidates.clear(); + this.processedSignals.clear(); this.makingOffer.clear(); this.ignoreOffer.clear(); this.signalingQueue.clear(); @@ -143,37 +130,28 @@ export class ScreenSharingWebRTCService { $effect(() => { if (!this.connectedChannelId || !this.identity) return; - const myOffers = this.offers.filter( - (o) => - o.channelId === this.connectedChannelId && - o.receiver.isEqual(this.identity!), + const mySignals = this.signals.filter( + (s) => + s.channelId === this.connectedChannelId && + s.receiver.isEqual(this.identity!) && + s.mediaType.tag === "Screen" ); - for (const offerRow of myOffers) { - if (this.processedOffers.has(offerRow.id)) continue; - this.processedOffers.add(offerRow.id); - this.handleOffer(offerRow); - } - const myAnswers = this.answers.filter( - (a) => - a.channelId === this.connectedChannelId && - a.receiver.isEqual(this.identity!), - ); - for (const answerRow of myAnswers) { - if (this.processedAnswers.has(answerRow.id)) continue; - this.processedAnswers.add(answerRow.id); - this.handleAnswer(answerRow); - } + for (const signal of mySignals) { + if (this.processedSignals.has(signal.id)) continue; + this.processedSignals.add(signal.id); - const myIce = this.iceCandidates.filter( - (c) => - c.channelId === this.connectedChannelId && - c.receiver.isEqual(this.identity!), - ); - for (const candRow of myIce) { - if (this.processedCandidates.has(candRow.id)) continue; - this.processedCandidates.add(candRow.id); - this.handleIceCandidate(candRow); + switch (signal.signalKind.tag) { + case "Offer": + this.handleOffer(signal); + break; + case "Answer": + this.handleAnswer(signal); + break; + case "IceCandidate": + this.handleIceCandidate(signal); + break; + } } }); } @@ -221,9 +199,11 @@ export class ScreenSharingWebRTCService { try { this.makingOffer.set(peerIdHex, true); await pc.setLocalDescription(); - this.#sendSdpOffer({ + this.#sendSignal({ receiver: Identity.fromString(peerIdHex), - sdp: JSON.stringify(pc.localDescription), + signalKind: { tag: "Offer" }, + mediaType: { tag: "Screen" }, + data: JSON.stringify(pc.localDescription), channelId: this.connectedChannelId, }); } finally { @@ -234,16 +214,18 @@ export class ScreenSharingWebRTCService { onIceCandidate(peerIdHex: string, candidate: RTCIceCandidate) { if (this.connectedChannelId) { - this.#sendIceCandidate({ + this.#sendSignal({ receiver: Identity.fromString(peerIdHex), - candidate: JSON.stringify(candidate), + signalKind: { tag: "IceCandidate" }, + mediaType: { tag: "Screen" }, + data: JSON.stringify(candidate), channelId: this.connectedChannelId, }); } } - handleOffer(offerRow: Types.ScreenSdpOffer) { - const peerIdHex = offerRow.sender.toHexString(); + handleOffer(signal: Types.WebRTCSignal) { + const peerIdHex = signal.sender.toHexString(); this.enqueueSignalingTask(peerIdHex, async () => { const pc = this.peerManager.createPeerConnection(peerIdHex); if (!pc) return; @@ -257,13 +239,15 @@ export class ScreenSharingWebRTCService { if (offerCollision) await pc.setLocalDescription({ type: "rollback" }); await pc.setRemoteDescription( - new RTCSessionDescription(JSON.parse(offerRow.sdp)), + new RTCSessionDescription(JSON.parse(signal.data)), ); const answer = await pc.createAnswer(); await pc.setLocalDescription(answer); - this.#sendSdpAnswer({ - receiver: offerRow.sender, - sdp: JSON.stringify(answer), + this.#sendSignal({ + receiver: signal.sender, + signalKind: { tag: "Answer" }, + mediaType: { tag: "Screen" }, + data: JSON.stringify(answer), channelId: this.connectedChannelId!, }); await this.drainCandidateQueue(peerIdHex, pc); @@ -276,14 +260,14 @@ export class ScreenSharingWebRTCService { }); } - handleAnswer(answerRow: Types.ScreenSdpAnswer) { - const peerIdHex = answerRow.sender.toHexString(); + handleAnswer(signal: Types.WebRTCSignal) { + const peerIdHex = signal.sender.toHexString(); this.enqueueSignalingTask(peerIdHex, async () => { const peer = this.peerManager.getPeer(peerIdHex); if (!peer) return; try { await peer.pc.setRemoteDescription( - new RTCSessionDescription(JSON.parse(answerRow.sdp)), + new RTCSessionDescription(JSON.parse(signal.data)), ); await this.drainCandidateQueue(peerIdHex, peer.pc); } catch (e) { @@ -295,13 +279,13 @@ export class ScreenSharingWebRTCService { }); } - handleIceCandidate(candRow: Types.ScreenIceCandidate) { - const peerIdHex = candRow.sender.toHexString(); + handleIceCandidate(signal: Types.WebRTCSignal) { + const peerIdHex = signal.sender.toHexString(); this.enqueueSignalingTask(peerIdHex, async () => { const pc = this.peerManager.createPeerConnection(peerIdHex); if (!pc) return; try { - const candidate = JSON.parse(candRow.candidate); + const candidate = JSON.parse(signal.data); if (pc.remoteDescription) { await pc.addIceCandidate(new RTCIceCandidate(candidate)); } else if (!this.ignoreOffer.get(peerIdHex)) { diff --git a/src/chat/services/webrtc/webrtc.svelte.ts b/src/chat/services/webrtc/webrtc.svelte.ts index ec83376..71fbce4 100644 --- a/src/chat/services/webrtc/webrtc.svelte.ts +++ b/src/chat/services/webrtc/webrtc.svelte.ts @@ -128,20 +128,20 @@ export class WebRTCService { // Sync mute/deafen to DB $effect(() => { - if (this.connectedChannelId) { + if (this.connectedChannelId !== undefined) { this.#setMuteReducer({ muted: this.localMedia.isMuted }); } }); $effect(() => { - if (this.connectedChannelId) { + if (this.connectedChannelId !== undefined) { this.#setDeafenReducer({ deafened: this.localMedia.isDeafened }); } }); // Orchestration $effect(() => { - if (this.connectedChannelId && this.identity) { + if (this.connectedChannelId !== undefined && this.identity) { console.log( `[WebRTC] Joined channel ${this.connectedChannelId}, requesting mic...`, ); @@ -162,7 +162,7 @@ export class WebRTCService { }; startWatching = (peerIdentity: Identity) => { - if (this.connectedChannelId) { + if (this.connectedChannelId !== undefined) { this.#startWatchingReducer({ watchee: peerIdentity, channelId: this.connectedChannelId,