optimize read paths

This commit is contained in:
2026-04-21 21:12:50 -04:00
parent 2644b6ecd8
commit 6c305af961
5 changed files with 917 additions and 195 deletions
+19 -7
View File
@@ -10,7 +10,8 @@ pub use tables::*;
pub use utils::*;
pub use views::*;
pub const SYSTEM_IDENTITY: &str = "0000000000000000000000000000000000000000000000000000000000000000";
pub const SYSTEM_IDENTITY: &str =
"0000000000000000000000000000000000000000000000000000000000000000";
#[spacetimedb::reducer(init)]
pub fn init(ctx: &ReducerContext) {
@@ -97,7 +98,11 @@ pub fn on_connect(ctx: &ReducerContext) {
let sub = jwt.subject();
let issuer = jwt.issuer();
// Use first 8 chars of sub if it's a long string/UUID
initial_name = Some(if sub.len() > 12 { sub[..8].to_string() } else { sub.to_string() });
initial_name = Some(if sub.len() > 12 {
sub[..8].to_string()
} else {
sub.to_string()
});
is_anon = issuer.contains("localhost");
}
@@ -182,16 +187,23 @@ pub fn update_auth_info(ctx: &ReducerContext) {
let issuer = jwt.issuer();
user.issuer = Some(issuer.to_string());
user.subject = Some(sub.to_string());
// Flag as anonymous if issuer is localhost
user.anonymous = issuer.contains("localhost");
// Also update name if they don't have a custom one yet
if user.name.is_none() {
user.name = Some(if sub.len() > 12 { sub[..8].to_string() } else { sub.to_string() });
user.name = Some(if sub.len() > 12 {
sub[..8].to_string()
} else {
sub.to_string()
});
}
log::info!("update_auth_info: updated user with OIDC info (anon={})", user.anonymous);
log::info!(
"update_auth_info: updated user with OIDC info (anon={})",
user.anonymous
);
ctx.db.user().identity().update(user);
sync_server_member_info(&ctx.db, ctx.sender());
}
+582 -124
View File
@@ -5,35 +5,66 @@ use spacetimedb::{Identity, ReducerContext, Table};
#[spacetimedb::reducer]
pub fn set_typing(ctx: &ReducerContext, channel_id: u64, typing: bool) {
let existing = ctx.db.typing_activity().identity().find(ctx.sender());
let activity = TypingActivity { identity: ctx.sender(), channel_id, typing };
if existing.is_some() { ctx.db.typing_activity().identity().update(activity); }
else { ctx.db.typing_activity().insert(activity); }
let activity = TypingActivity {
identity: ctx.sender(),
channel_id,
typing,
};
if existing.is_some() {
ctx.db.typing_activity().identity().update(activity);
} else {
ctx.db.typing_activity().insert(activity);
}
}
#[spacetimedb::reducer]
pub fn upload_image(ctx: &ReducerContext, data: Vec<u8>, mime_type: String, name: Option<String>, client_id: Option<String>) {
pub fn upload_image(
ctx: &ReducerContext,
data: Vec<u8>,
mime_type: String,
name: Option<String>,
client_id: Option<String>,
) {
if let Some(ref cid) = client_id {
ctx.db.upload_status().insert(UploadStatus {
client_id: cid.clone(), identity: ctx.sender(), status: "pending".to_string(), image_id: None, error: None,
client_id: cid.clone(),
identity: ctx.sender(),
status: "pending".to_string(),
image_id: None,
error: None,
});
}
if data.len() > 4 * 1024 * 1024 {
if let Some(ref cid) = client_id {
ctx.db.upload_status().client_id().update(UploadStatus {
client_id: cid.clone(), identity: ctx.sender(), status: "error".to_string(),
image_id: None, error: Some("Image exceeds 4MB limit".to_string()),
client_id: cid.clone(),
identity: ctx.sender(),
status: "error".to_string(),
image_id: None,
error: Some("Image exceeds 4MB limit".to_string()),
});
}
return;
}
let img = ctx.db.image().insert(Image { id: 0, mime_type, name });
ctx.db.image_data().insert(ImageData { image_id: img.id, data });
let img = ctx.db.image().insert(Image {
id: 0,
mime_type,
name,
});
ctx.db.image_data().insert(ImageData {
image_id: img.id,
data,
});
if let Some(ref cid) = client_id {
ctx.db.upload_status().client_id().update(UploadStatus {
client_id: cid.clone(), identity: ctx.sender(), image_id: Some(img.id), status: "success".to_string(), error: None,
client_id: cid.clone(),
identity: ctx.sender(),
image_id: Some(img.id),
status: "success".to_string(),
error: None,
});
}
}
@@ -49,14 +80,28 @@ pub fn clear_upload_status(ctx: &ReducerContext, client_id: String) {
#[spacetimedb::reducer]
pub fn upload_custom_emoji(ctx: &ReducerContext, name: String, category: String, data: Vec<u8>) {
if data.len() > 256 * 1024 { panic!("Emoji image exceeds 256KB limit"); }
ctx.db.custom_emoji().insert(CustomEmoji { id: 0, name, category, data });
if data.len() > 256 * 1024 {
panic!("Emoji image exceeds 256KB limit");
}
ctx.db.custom_emoji().insert(CustomEmoji {
id: 0,
name,
category,
data,
});
}
#[spacetimedb::reducer]
pub fn upload_avatar(ctx: &ReducerContext, data: Vec<u8>, mime_type: String) {
let img = ctx.db.image().insert(Image { id: 0, mime_type, name: Some("avatar".to_string()) });
ctx.db.image_data().insert(ImageData { image_id: img.id, data });
let img = ctx.db.image().insert(Image {
id: 0,
mime_type,
name: Some("avatar".to_string()),
});
ctx.db.image_data().insert(ImageData {
image_id: img.id,
data,
});
if let Some(mut user) = ctx.db.user().identity().find(ctx.sender()) {
user.avatar_id = Some(img.id);
ctx.db.user().identity().update(user);
@@ -66,8 +111,15 @@ pub fn upload_avatar(ctx: &ReducerContext, data: Vec<u8>, mime_type: String) {
#[spacetimedb::reducer]
pub fn upload_banner(ctx: &ReducerContext, data: Vec<u8>, mime_type: String) {
let img = ctx.db.image().insert(Image { id: 0, mime_type, name: Some("banner".to_string()) });
ctx.db.image_data().insert(ImageData { image_id: img.id, data });
let img = ctx.db.image().insert(Image {
id: 0,
mime_type,
name: Some("banner".to_string()),
});
ctx.db.image_data().insert(ImageData {
image_id: img.id,
data,
});
if let Some(mut user) = ctx.db.user().identity().find(ctx.sender()) {
user.banner_id = Some(img.id);
ctx.db.user().identity().update(user);
@@ -83,10 +135,27 @@ pub fn set_banner(ctx: &ReducerContext, banner_id: Option<u64>) {
}
#[spacetimedb::reducer]
pub fn upload_server_avatar(ctx: &ReducerContext, server_id: u64, data: Vec<u8>, mime_type: String) {
let mut s = ctx.db.server().id().find(server_id).expect("Server not found");
let img = ctx.db.image().insert(Image { id: 0, mime_type, name: Some("server_avatar".to_string()) });
ctx.db.image_data().insert(ImageData { image_id: img.id, data });
pub fn upload_server_avatar(
ctx: &ReducerContext,
server_id: u64,
data: Vec<u8>,
mime_type: String,
) {
let mut s = ctx
.db
.server()
.id()
.find(server_id)
.expect("Server not found");
let img = ctx.db.image().insert(Image {
id: 0,
mime_type,
name: Some("server_avatar".to_string()),
});
ctx.db.image_data().insert(ImageData {
image_id: img.id,
data,
});
s.avatar_id = Some(img.id);
ctx.db.server().id().update(s);
}
@@ -94,7 +163,12 @@ pub fn upload_server_avatar(ctx: &ReducerContext, server_id: u64, data: Vec<u8>,
#[spacetimedb::reducer]
pub fn update_server_name(ctx: &ReducerContext, server_id: u64, name: String) {
validate_name(&name).expect("Invalid name");
let mut s = ctx.db.server().id().find(server_id).expect("Server not found");
let mut s = ctx
.db
.server()
.id()
.find(server_id)
.expect("Server not found");
s.name = name;
ctx.db.server().id().update(s);
}
@@ -102,10 +176,14 @@ pub fn update_server_name(ctx: &ReducerContext, server_id: u64, name: String) {
#[spacetimedb::reducer]
pub fn delete_server(ctx: &ReducerContext, server_id: u64) {
for c in ctx.db.channel().server_id().filter(server_id) {
for msg in ctx.db.message().channel_id().filter(c.id) { ctx.db.message().id().delete(msg.id); }
for msg in ctx.db.message().channel_id().filter(c.id) {
ctx.db.message().id().delete(msg.id);
}
ctx.db.channel().id().delete(c.id);
}
for m in ctx.db.server_member().server_id().filter(server_id) { ctx.db.server_member().id().delete(m.id); }
for m in ctx.db.server_member().server_id().filter(server_id) {
ctx.db.server_member().id().delete(m.id);
}
ctx.db.server().id().delete(server_id);
}
@@ -124,13 +202,20 @@ pub fn edit_message(ctx: &ReducerContext, message_id: u64, new_text: String) {
#[spacetimedb::reducer]
pub fn delete_message(ctx: &ReducerContext, message_id: u64) {
if let Some(msg) = ctx.db.message().id().find(message_id) {
if msg.sender == ctx.sender() { ctx.db.message().id().delete(message_id); }
if msg.sender == ctx.sender() {
ctx.db.message().id().delete(message_id);
}
}
}
#[spacetimedb::reducer]
pub fn set_server_public(ctx: &ReducerContext, server_id: u64, public: bool) {
let mut s = ctx.db.server().id().find(server_id).expect("Server not found");
let mut s = ctx
.db
.server()
.id()
.find(server_id)
.expect("Server not found");
if s.owner == Some(ctx.sender()) {
s.public = public;
ctx.db.server().id().update(s);
@@ -138,11 +223,25 @@ pub fn set_server_public(ctx: &ReducerContext, server_id: u64, public: bool) {
}
#[spacetimedb::reducer]
pub fn toggle_reaction(ctx: &ReducerContext, message_id: u64, emoji: Option<String>, custom_emoji_id: Option<u64>) {
pub fn toggle_reaction(
ctx: &ReducerContext,
message_id: u64,
emoji: Option<String>,
custom_emoji_id: Option<u64>,
) {
if let Some(mut msg) = ctx.db.message().id().find(message_id) {
let existing_idx = msg.reactions.iter().position(|r| r.identity == ctx.sender() && r.emoji == emoji && r.custom_emoji_id == custom_emoji_id);
if let Some(idx) = existing_idx { msg.reactions.remove(idx); }
else { msg.reactions.push(Reaction { identity: ctx.sender(), emoji, custom_emoji_id }); }
let existing_idx = msg.reactions.iter().position(|r| {
r.identity == ctx.sender() && r.emoji == emoji && r.custom_emoji_id == custom_emoji_id
});
if let Some(idx) = existing_idx {
msg.reactions.remove(idx);
} else {
msg.reactions.push(Reaction {
identity: ctx.sender(),
emoji,
custom_emoji_id,
});
}
ctx.db.message().id().update(msg);
}
}
@@ -192,18 +291,43 @@ pub fn set_status(ctx: &ReducerContext, status: Option<String>) {
#[spacetimedb::reducer]
pub fn subscribe_to_channel(ctx: &ReducerContext, channel_id: u64) {
let current_max = ctx.db.channel_internal_state().channel_id().find(channel_id).map(|c| c.last_seq_id).unwrap_or(0);
let current_max = ctx
.db
.channel_internal_state()
.channel_id()
.find(channel_id)
.map(|c| c.last_seq_id)
.unwrap_or(0);
let limit = get_recent_message_limit(&ctx.db);
let earliest = if current_max >= limit { current_max - (limit - 1) } else { 1 };
let earliest = if current_max >= limit {
current_max - (limit - 1)
} else {
1
};
let mut sub = ctx
.db
.channel_subscription()
.identity()
.find(ctx.sender())
.unwrap_or(ChannelSubscription {
identity: ctx.sender(),
channel_id,
earliest_seq_id: earliest,
last_read_seq_id: current_max,
});
let mut sub = ctx.db.channel_subscription().identity().find(ctx.sender())
.unwrap_or(ChannelSubscription { identity: ctx.sender(), channel_id, earliest_seq_id: earliest, last_read_seq_id: current_max });
sub.channel_id = channel_id;
sub.earliest_seq_id = earliest;
sub.last_read_seq_id = current_max;
if ctx.db.channel_subscription().identity().find(ctx.sender()).is_some() {
if ctx
.db
.channel_subscription()
.identity()
.find(ctx.sender())
.is_some()
{
ctx.db.channel_subscription().identity().update(sub);
} else {
ctx.db.channel_subscription().insert(sub);
@@ -213,7 +337,10 @@ pub fn subscribe_to_channel(ctx: &ReducerContext, channel_id: u64) {
#[spacetimedb::reducer]
pub fn request_image_blob(ctx: &ReducerContext, image_id: u64) {
ctx.db.image_blob_request().identity().delete(ctx.sender());
ctx.db.image_blob_request().insert(ImageBlobRequest { identity: ctx.sender(), image_id });
ctx.db.image_blob_request().insert(ImageBlobRequest {
identity: ctx.sender(),
image_id,
});
}
#[spacetimedb::reducer]
@@ -231,38 +358,95 @@ pub fn create_server(ctx: &ReducerContext, name: String) {
if let Err(e) = validate_name(&name) {
return report_error(&ctx.db, ctx.sender(), "create_server", &e, ctx.timestamp);
}
let user = match ctx.db.user().identity().find(ctx.sender()) {
Some(u) => u,
None => return report_error(&ctx.db, ctx.sender(), "create_server", "User not found", ctx.timestamp),
None => {
return report_error(
&ctx.db,
ctx.sender(),
"create_server",
"User not found",
ctx.timestamp,
);
}
};
let s = ctx.db.server().insert(Server { id: 0, name: name.clone(), owner: Some(ctx.sender()), avatar_id: None, channels: Vec::new(), public: false });
ctx.db.server_member().insert(ServerMember { id: 0, identity: ctx.sender(), server_id: s.id, name: user.name.clone(), avatar_id: user.avatar_id, online: user.online });
let c1 = ctx.db.channel().insert(Channel { id: 0, server_id: s.id, name: "general".to_string(), kind: ChannelKind::Text });
let c2 = ctx.db.channel().insert(Channel { id: 0, server_id: s.id, name: "voice".to_string(), kind: ChannelKind::Voice });
let s = ctx.db.server().insert(Server {
id: 0,
name: name.clone(),
owner: Some(ctx.sender()),
avatar_id: None,
channels: Vec::new(),
public: false,
});
ctx.db.server_member().insert(ServerMember {
id: 0,
identity: ctx.sender(),
server_id: s.id,
name: user.name.clone(),
avatar_id: user.avatar_id,
online: user.online,
});
let c1 = ctx.db.channel().insert(Channel {
id: 0,
server_id: s.id,
name: "general".to_string(),
kind: ChannelKind::Text,
});
let c2 = ctx.db.channel().insert(Channel {
id: 0,
server_id: s.id,
name: "voice".to_string(),
kind: ChannelKind::Voice,
});
let mut s = ctx.db.server().id().find(s.id).unwrap();
s.channels.push(ChannelMetadata { id: c1.id, name: c1.name, kind: c1.kind });
s.channels.push(ChannelMetadata { id: c2.id, name: c2.name, kind: c2.kind });
s.channels.push(ChannelMetadata {
id: c1.id,
name: c1.name,
kind: c1.kind,
});
s.channels.push(ChannelMetadata {
id: c2.id,
name: c2.name,
kind: c2.kind,
});
ctx.db.server().id().update(s.clone());
sync_server_access(&ctx.db, ctx.sender(), s.id);
report_success(&ctx.db, ctx.sender(), "create_server", ctx.timestamp);
}
use rand::distributions::{Alphanumeric, DistString};
#[spacetimedb::reducer]
pub fn create_invite(ctx: &ReducerContext, server_id: u64, max_uses: Option<u32>, expires_in_hrs: Option<u32>) {
pub fn create_invite(
ctx: &ReducerContext,
server_id: u64,
max_uses: Option<u32>,
expires_in_hrs: Option<u32>,
) {
// Only members can invite
if !ctx.db.server_member().identity().filter(ctx.sender()).any(|m| m.server_id == server_id) {
return report_error(&ctx.db, ctx.sender(), "create_invite", "Only server members can create invites", ctx.timestamp);
if !ctx
.db
.server_member()
.identity()
.filter(ctx.sender())
.any(|m| m.server_id == server_id)
{
return report_error(
&ctx.db,
ctx.sender(),
"create_invite",
"Only server members can create invites",
ctx.timestamp,
);
}
// Generate a 12-character alphanumeric code using the provided deterministic RNG
let code = Alphanumeric.sample_string(&mut ctx.rng(), 12);
let expires_at = expires_in_hrs.map(|h| {
let duration = spacetimedb::TimeDuration::from_micros(h as i64 * 3600 * 1000000);
ctx.timestamp + duration
@@ -283,52 +467,108 @@ pub fn create_invite(ctx: &ReducerContext, server_id: u64, max_uses: Option<u32>
#[spacetimedb::reducer]
pub fn join_server(ctx: &ReducerContext, server_id: Option<u64>, invite_code: Option<String>) {
let sender = ctx.sender();
let user = match ctx.db.user().identity().find(sender) {
Some(u) => u,
None => return report_error(&ctx.db, sender, "join_server", "User not found", ctx.timestamp),
None => {
return report_error(
&ctx.db,
sender,
"join_server",
"User not found",
ctx.timestamp,
);
}
};
let target_server_id = if let Some(id) = server_id {
id
} else if let Some(ref code) = invite_code {
let invite = match ctx.db.invite().code().find(code.clone()) {
Some(i) => i,
None => return report_error(&ctx.db, sender, "join_server", "Invalid invite code", ctx.timestamp),
None => {
return report_error(
&ctx.db,
sender,
"join_server",
"Invalid invite code",
ctx.timestamp,
);
}
};
invite.server_id
} else {
return report_error(&ctx.db, sender, "join_server", "Either server_id or invite_code must be provided", ctx.timestamp);
return report_error(
&ctx.db,
sender,
"join_server",
"Either server_id or invite_code must be provided",
ctx.timestamp,
);
};
let s = match ctx.db.server().id().find(target_server_id) {
Some(s) => s,
None => return report_error(&ctx.db, sender, "join_server", "Server not found", ctx.timestamp),
None => {
return report_error(
&ctx.db,
sender,
"join_server",
"Server not found",
ctx.timestamp,
);
}
};
// Permission check: if private, must have a valid invite code
if !s.public {
if let Some(code) = invite_code {
let invite = match ctx.db.invite().code().find(code) {
Some(i) => i,
None => return report_error(&ctx.db, sender, "join_server", "Invalid invite code", ctx.timestamp),
None => {
return report_error(
&ctx.db,
sender,
"join_server",
"Invalid invite code",
ctx.timestamp,
);
}
};
if invite.server_id != target_server_id {
return report_error(&ctx.db, sender, "join_server", "Invite code does not match server", ctx.timestamp);
if invite.server_id != target_server_id {
return report_error(
&ctx.db,
sender,
"join_server",
"Invite code does not match server",
ctx.timestamp,
);
}
// Expiry check
if let Some(expiry) = invite.expires_at {
if ctx.timestamp > expiry {
return report_error(&ctx.db, sender, "join_server", "Invite code expired", ctx.timestamp);
if ctx.timestamp > expiry {
return report_error(
&ctx.db,
sender,
"join_server",
"Invite code expired",
ctx.timestamp,
);
}
}
// Uses check
if let Some(mut uses) = invite.uses_remaining {
if uses == 0 {
return report_error(&ctx.db, sender, "join_server", "Invite code usage limit reached", ctx.timestamp);
if uses == 0 {
return report_error(
&ctx.db,
sender,
"join_server",
"Invite code usage limit reached",
ctx.timestamp,
);
}
uses -= 1;
let mut invite = invite.clone();
@@ -336,35 +576,87 @@ pub fn join_server(ctx: &ReducerContext, server_id: Option<u64>, invite_code: Op
ctx.db.invite().code().update(invite);
}
} else {
return report_error(&ctx.db, sender, "join_server", "Invite code required for private server", ctx.timestamp);
return report_error(
&ctx.db,
sender,
"join_server",
"Invite code required for private server",
ctx.timestamp,
);
}
}
if !s.public && user.subject.is_none() && user.name.is_none() {
return report_error(&ctx.db, sender, "join_server", "DisplayName required for private server", ctx.timestamp);
if !s.public && user.subject.is_none() && user.name.is_none() {
return report_error(
&ctx.db,
sender,
"join_server",
"DisplayName required for private server",
ctx.timestamp,
);
}
if ctx.db.server_member().identity().filter(sender).any(|m| m.server_id == target_server_id) {
return report_error(&ctx.db, sender, "join_server", "Already a member of this server", ctx.timestamp);
if ctx
.db
.server_member()
.identity()
.filter(sender)
.any(|m| m.server_id == target_server_id)
{
return report_error(
&ctx.db,
sender,
"join_server",
"Already a member of this server",
ctx.timestamp,
);
}
ctx.db.server_member().insert(ServerMember { id: 0, identity: sender, server_id: target_server_id, name: user.name.clone(), avatar_id: user.avatar_id, online: user.online });
ctx.db.server_member().insert(ServerMember {
id: 0,
identity: sender,
server_id: target_server_id,
name: user.name.clone(),
avatar_id: user.avatar_id,
online: user.online,
});
sync_server_access(&ctx.db, sender, target_server_id);
// Record success
report_success_with_payload(&ctx.db, sender, "join_server", &target_server_id.to_string(), ctx.timestamp);
report_success_with_payload(
&ctx.db,
sender,
"join_server",
&target_server_id.to_string(),
ctx.timestamp,
);
}
#[spacetimedb::reducer]
pub fn leave_server(ctx: &ReducerContext, server_id: u64) {
let sender = ctx.sender();
let members: Vec<_> = ctx.db.server_member().identity().filter(sender).filter(|m| m.server_id == server_id).collect();
for m in members { ctx.db.server_member().id().delete(m.id); }
let members: Vec<_> = ctx
.db
.server_member()
.identity()
.filter(sender)
.filter(|m| m.server_id == server_id)
.collect();
for m in members {
ctx.db.server_member().id().delete(m.id);
}
// Also delete any invites this user created for this server
let invites: Vec<_> = ctx.db.invite().iter().filter(|i| i.inviter == sender && i.server_id == server_id).collect();
for i in invites { ctx.db.invite().code().delete(i.code); }
let invites: Vec<_> = ctx
.db
.invite()
.iter()
.filter(|i| i.inviter == sender && i.server_id == server_id)
.collect();
for i in invites {
ctx.db.invite().code().delete(i.code);
}
revoke_server_access(&ctx.db, sender, server_id);
report_success(&ctx.db, sender, "leave_server", ctx.timestamp);
}
@@ -376,17 +668,46 @@ pub fn create_channel(ctx: &ReducerContext, name: String, server_id: u64, is_voi
}
let mut s = match ctx.db.server().id().find(server_id) {
Some(s) => s,
None => return report_error(&ctx.db, ctx.sender(), "create_channel", "Server not found", ctx.timestamp),
None => {
return report_error(
&ctx.db,
ctx.sender(),
"create_channel",
"Server not found",
ctx.timestamp,
);
}
};
if s.owner != Some(ctx.sender()) {
return report_error(&ctx.db, ctx.sender(), "create_channel", "Only owner can create channels", ctx.timestamp);
return report_error(
&ctx.db,
ctx.sender(),
"create_channel",
"Only owner can create channels",
ctx.timestamp,
);
}
let kind = if is_voice { ChannelKind::Voice } else { ChannelKind::Text };
let chan = ctx.db.channel().insert(Channel { id: 0, server_id, name: name.clone(), kind });
s.channels.push(ChannelMetadata { id: chan.id, name, kind });
let kind = if is_voice {
ChannelKind::Voice
} else {
ChannelKind::Text
};
let chan = ctx.db.channel().insert(Channel {
id: 0,
server_id,
name: name.clone(),
kind,
});
s.channels.push(ChannelMetadata {
id: chan.id,
name,
kind,
});
ctx.db.server().id().update(s);
for m in ctx.db.server_member().server_id().filter(server_id) { grant_user_channel_access(&ctx.db, m.identity, chan.id); }
for m in ctx.db.server_member().server_id().filter(server_id) {
grant_user_channel_access(&ctx.db, m.identity, chan.id);
}
report_success(&ctx.db, ctx.sender(), "create_channel", ctx.timestamp);
}
@@ -395,11 +716,22 @@ pub fn join_voice(ctx: &ReducerContext, channel_id: u64) {
if let Some(mut state) = ctx.db.user_state().identity().find(ctx.sender()) {
if state.channel_id != channel_id {
clear_signaling_for_user(&ctx.db, ctx.sender());
state.channel_id = channel_id; state.is_sharing_screen = false; state.is_talking = false; state.watching = None;
state.channel_id = channel_id;
state.is_sharing_screen = false;
state.is_talking = false;
state.watching = None;
ctx.db.user_state().identity().update(state);
}
} else {
ctx.db.user_state().insert(UserState { identity: ctx.sender(), channel_id, is_sharing_screen: false, is_muted: false, is_deafened: false, is_talking: false, watching: None });
ctx.db.user_state().insert(UserState {
identity: ctx.sender(),
channel_id,
is_sharing_screen: false,
is_muted: false,
is_deafened: false,
is_talking: false,
watching: None,
});
}
}
@@ -409,8 +741,16 @@ pub fn set_sharing_screen(ctx: &ReducerContext, sharing: bool) {
state.is_sharing_screen = sharing;
ctx.db.user_state().identity().update(state);
if !sharing {
let watchers: Vec<_> = ctx.db.user_state().iter().filter(|s| s.watching == Some(ctx.sender())).collect();
for mut w in watchers { w.watching = None; ctx.db.user_state().identity().update(w); }
let watchers: Vec<_> = ctx
.db
.user_state()
.iter()
.filter(|s| s.watching == Some(ctx.sender()))
.collect();
for mut w in watchers {
w.watching = None;
ctx.db.user_state().identity().update(w);
}
clear_signaling_for_user(&ctx.db, ctx.sender());
}
}
@@ -459,31 +799,80 @@ pub fn stop_watching(ctx: &ReducerContext) {
}
#[spacetimedb::reducer]
pub fn leave_voice(ctx: &ReducerContext) { clear_user_presence(&ctx.db, ctx.sender()); }
#[spacetimedb::reducer]
pub fn send_webrtc_signal(ctx: &ReducerContext, receiver: Identity, signal_kind: SignalKind, media_type: MediaType, data: String, channel_id: u64) {
ctx.db.webrtc_signal().insert(WebRTCSignal { id: 0, sender: ctx.sender(), receiver, signal_kind, media_type, data, channel_id });
pub fn leave_voice(ctx: &ReducerContext) {
clear_user_presence(&ctx.db, ctx.sender());
}
#[spacetimedb::reducer]
pub fn send_message(ctx: &ReducerContext, text: String, channel_id: u64, thread_id: Option<u64>, image_ids: Vec<u64>, is_encrypted: bool) {
internal_send_message(&ctx.db, ctx.sender(), channel_id, text, ctx.timestamp, thread_id, image_ids, is_encrypted);
pub fn send_webrtc_signal(
ctx: &ReducerContext,
receiver: Identity,
signal_kind: SignalKind,
media_type: MediaType,
data: String,
channel_id: u64,
) {
ctx.db.webrtc_signal().insert(WebRTCSignal {
id: 0,
sender: ctx.sender(),
receiver,
signal_kind,
media_type,
data,
channel_id,
});
}
#[spacetimedb::reducer]
pub fn send_message(
ctx: &ReducerContext,
text: String,
channel_id: u64,
thread_id: Option<u64>,
image_ids: Vec<u64>,
is_encrypted: bool,
) {
internal_send_message(
&ctx.db,
ctx.sender(),
channel_id,
text,
ctx.timestamp,
thread_id,
image_ids,
is_encrypted,
);
}
#[spacetimedb::reducer]
pub fn set_configuration(ctx: &ReducerContext, key: String, value: String) {
let config = SystemConfiguration { key: key.clone(), value };
if ctx.db.system_configuration().key().find(key).is_some() { ctx.db.system_configuration().key().update(config); }
else { ctx.db.system_configuration().insert(config); }
let config = SystemConfiguration {
key: key.clone(),
value,
};
if ctx.db.system_configuration().key().find(key).is_some() {
ctx.db.system_configuration().key().update(config);
} else {
ctx.db.system_configuration().insert(config);
}
}
#[spacetimedb::reducer]
pub fn create_thread(ctx: &ReducerContext, name: String, channel_id: u64, parent_message_id: u64) {
let mut thread_name = name;
if thread_name.trim().is_empty() {
thread_name = ctx.db.message().id().find(parent_message_id)
.map(|m| if m.text.trim().is_empty() { "New Thread".to_string() } else { m.text.chars().take(32).collect() })
thread_name = ctx
.db
.message()
.id()
.find(parent_message_id)
.map(|m| {
if m.text.trim().is_empty() {
"New Thread".to_string()
} else {
m.text.chars().take(32).collect()
}
})
.unwrap_or("New Thread".to_string());
}
if let Err(e) = validate_name(&thread_name) {
@@ -492,62 +881,128 @@ pub fn create_thread(ctx: &ReducerContext, name: String, channel_id: u64, parent
if let Some(mut pm) = ctx.db.message().id().find(parent_message_id) {
if pm.channel_id != channel_id {
return report_error(&ctx.db, ctx.sender(), "create_thread", "Channel ID mismatch", ctx.timestamp);
return report_error(
&ctx.db,
ctx.sender(),
"create_thread",
"Channel ID mismatch",
ctx.timestamp,
);
}
pm.thread_name = Some(thread_name);
pm.thread_name = Some(thread_name);
ctx.db.message().id().update(pm);
report_success(&ctx.db, ctx.sender(), "create_thread", ctx.timestamp);
} else {
report_error(&ctx.db, ctx.sender(), "create_thread", "Parent message not found", ctx.timestamp);
report_error(
&ctx.db,
ctx.sender(),
"create_thread",
"Parent message not found",
ctx.timestamp,
);
}
}
#[spacetimedb::reducer]
pub fn create_thread_with_message(ctx: &ReducerContext, name: String, channel_id: u64, parent_message_id: u64, text: String, image_ids: Vec<u64>, is_encrypted: bool) {
pub fn create_thread_with_message(
ctx: &ReducerContext,
name: String,
channel_id: u64,
parent_message_id: u64,
text: String,
image_ids: Vec<u64>,
is_encrypted: bool,
) {
if let Err(e) = validate_name(&name) {
return report_error(&ctx.db, ctx.sender(), "create_thread", &e, ctx.timestamp);
}
if let Some(mut pm) = ctx.db.message().id().find(parent_message_id) {
if pm.channel_id != channel_id {
return report_error(&ctx.db, ctx.sender(), "create_thread", "Channel ID mismatch", ctx.timestamp);
return report_error(
&ctx.db,
ctx.sender(),
"create_thread",
"Channel ID mismatch",
ctx.timestamp,
);
}
pm.thread_name = Some(name);
pm.thread_reply_count += 1;
ctx.db.message().id().update(pm);
internal_send_message(&ctx.db, ctx.sender(), channel_id, text, ctx.timestamp, Some(parent_message_id), image_ids, is_encrypted);
pm.thread_name = Some(name);
pm.thread_reply_count += 1;
let pm = ctx.db.message().id().update(pm);
sync_recent_message(&ctx.db, pm);
internal_send_message(
&ctx.db,
ctx.sender(),
channel_id,
text,
ctx.timestamp,
Some(parent_message_id),
image_ids,
is_encrypted,
);
report_success(&ctx.db, ctx.sender(), "create_thread", ctx.timestamp);
} else {
report_error(&ctx.db, ctx.sender(), "create_thread", "Parent message not found", ctx.timestamp);
report_error(
&ctx.db,
ctx.sender(),
"create_thread",
"Parent message not found",
ctx.timestamp,
);
}
}
#[spacetimedb::reducer]
pub fn bootstrap_sequences(ctx: &ReducerContext) {
let internal_states: Vec<_> = ctx.db.channel_internal_state().iter().collect();
for s in internal_states { ctx.db.channel_internal_state().channel_id().delete(s.channel_id); }
for s in internal_states {
ctx.db
.channel_internal_state()
.channel_id()
.delete(s.channel_id);
}
let all_access: Vec<_> = ctx.db.user_channel_access().iter().collect();
for a in all_access { ctx.db.user_channel_access().id().delete(a.id); }
for a in all_access {
ctx.db.user_channel_access().id().delete(a.id);
}
for mut msg in ctx.db.message().iter() {
let seq_id = get_next_seq_id(&ctx.db, msg.channel_id);
msg.seq_id = seq_id; ctx.db.message().id().update(msg);
msg.seq_id = seq_id;
ctx.db.message().id().update(msg);
}
for m in ctx.db.server_member().iter() {
sync_server_access(&ctx.db, m.identity.clone(), m.server_id);
}
for dm in ctx.db.direct_message().iter() {
grant_user_channel_access(&ctx.db, dm.sender, dm.channel_id);
grant_user_channel_access(&ctx.db, dm.recipient, dm.channel_id);
}
for m in ctx.db.server_member().iter() { sync_server_access(&ctx.db, m.identity.clone(), m.server_id); }
for dm in ctx.db.direct_message().iter() { grant_user_channel_access(&ctx.db, dm.sender, dm.channel_id); grant_user_channel_access(&ctx.db, dm.recipient, dm.channel_id); }
}
#[spacetimedb::reducer]
pub fn open_direct_message(ctx: &ReducerContext, recipient: Identity) {
if ctx.sender() != recipient { internal_open_direct_message(&ctx.db, ctx.sender(), recipient); }
if ctx.sender() != recipient {
internal_open_direct_message(&ctx.db, ctx.sender(), recipient);
}
}
#[spacetimedb::reducer]
pub fn close_direct_message(ctx: &ReducerContext, channel_id: u64) {
if let Some(mut dm) = ctx.db.direct_message().channel_id().filter(channel_id).next() {
if dm.sender == ctx.sender() { dm.is_open_sender = false; }
else if dm.recipient == ctx.sender() { dm.is_open_recipient = false; }
if let Some(mut dm) = ctx
.db
.direct_message()
.channel_id()
.filter(channel_id)
.next()
{
if dm.sender == ctx.sender() {
dm.is_open_sender = false;
} else if dm.recipient == ctx.sender() {
dm.is_open_recipient = false;
}
ctx.db.direct_message().id().update(dm);
}
}
@@ -556,7 +1011,10 @@ pub fn close_direct_message(ctx: &ReducerContext, channel_id: u64) {
pub fn open_thread(ctx: &ReducerContext, thread_id: u64) {
let identity = ctx.sender();
ctx.db.thread_subscription().identity().delete(identity);
ctx.db.thread_subscription().insert(ThreadSubscription { identity, thread_id });
ctx.db.thread_subscription().insert(ThreadSubscription {
identity,
thread_id,
});
}
#[spacetimedb::reducer]
+42 -1
View File
@@ -130,7 +130,6 @@ pub enum SignalKind {
IceCandidate,
}
#[derive(spacetimedb::SpacetimeType, Clone, Copy, Debug, PartialEq)]
pub enum MediaType {
Voice,
@@ -206,6 +205,48 @@ pub struct Message {
pub seq_id: u64,
}
#[spacetimedb::table(accessor = recent_message, index(accessor = channel_seq, btree(columns = [channel_id, seq_id])))]
#[derive(Clone)]
pub struct RecentMessage {
#[primary_key]
pub id: u64,
pub sender: Identity,
pub sent: Timestamp,
pub text: String,
#[index(btree)]
pub channel_id: u64,
pub server_id: u64,
pub thread_id: Option<u64>,
pub reactions: Vec<Reaction>,
pub image_ids: Vec<u64>,
pub thread_name: Option<String>,
pub thread_reply_count: u32,
pub edited: bool,
pub is_encrypted: bool,
pub seq_id: u64,
}
impl From<RecentMessage> for Message {
fn from(m: RecentMessage) -> Self {
Self {
id: m.id,
sender: m.sender,
sent: m.sent,
text: m.text,
channel_id: m.channel_id,
server_id: m.server_id,
thread_id: m.thread_id,
reactions: m.reactions,
image_ids: m.image_ids,
thread_name: m.thread_name,
thread_reply_count: m.thread_reply_count,
edited: m.edited,
is_encrypted: m.is_encrypted,
seq_id: m.seq_id,
}
}
}
#[spacetimedb::table(accessor = custom_emoji, public)]
#[derive(Clone)]
pub struct CustomEmoji {
+201 -40
View File
@@ -27,7 +27,7 @@ pub fn validate_message_length(db: &Local, text: &str) -> Result<(), String> {
Ok(())
}
pub fn get_recent_message_limit(db: &Local) -> u64 {
pub fn get_recent_message_limit_read_only(db: &LocalReadOnly) -> u64 {
db.system_configuration()
.key()
.find("recent_message_limit".to_string())
@@ -35,7 +35,7 @@ pub fn get_recent_message_limit(db: &Local) -> u64 {
.unwrap_or(50)
}
pub fn get_recent_message_limit_read_only(db: &LocalReadOnly) -> u64 {
pub fn get_recent_message_limit(db: &Local) -> u64 {
db.system_configuration()
.key()
.find("recent_message_limit".to_string())
@@ -60,11 +60,19 @@ pub fn get_next_seq_id(db: &Local, channel_id: u64) -> u64 {
/// Simplified: uses UserChannelAccess table directly
pub fn get_visible_message_ids(db: &Local, identity: Identity) -> HashSet<u64> {
db.user_channel_access().identity().filter(identity).map(|a| a.channel_id).collect()
db.user_channel_access()
.identity()
.filter(identity)
.map(|a| a.channel_id)
.collect()
}
pub fn get_visible_message_ids_read_only(db: &LocalReadOnly, identity: Identity) -> HashSet<u64> {
db.user_channel_access().identity().filter(identity).map(|a| a.channel_id).collect()
db.user_channel_access()
.identity()
.filter(identity)
.map(|a| a.channel_id)
.collect()
}
pub fn get_visible_image_ids(db: &Local, identity: Identity) -> HashSet<u64> {
@@ -81,38 +89,57 @@ pub fn get_visible_image_ids(db: &Local, identity: Identity) -> HashSet<u64> {
// User's own avatar/banner
if let Some(user) = db.user().identity().find(identity) {
if let Some(id) = user.avatar_id { results.insert(id); }
if let Some(id) = user.banner_id { results.insert(id); }
if let Some(id) = user.avatar_id {
results.insert(id);
}
if let Some(id) = user.banner_id {
results.insert(id);
}
}
// Server avatars for servers I am a member of or are public
let my_server_ids: HashSet<u64> = db.server_member().identity().filter(identity).map(|m| m.server_id).collect();
let my_server_ids: HashSet<u64> = db
.server_member()
.identity()
.filter(identity)
.map(|m| m.server_id)
.collect();
for s in db.server().name().filter(""..) {
if s.public || my_server_ids.contains(&s.id) {
if let Some(id) = s.avatar_id { results.insert(id); }
if let Some(id) = s.avatar_id {
results.insert(id);
}
}
}
// Avatars for members of servers I am in (and redundant check for server avatars stored in membership)
for server_id in my_server_ids {
for member in db.server_member().server_id().filter(server_id) {
if let Some(id) = member.avatar_id { results.insert(id); }
if let Some(id) = member.avatar_id {
results.insert(id);
}
}
// Also check if any server I'm in has an avatar id that might not have been caught in the name filter
if let Some(s) = db.server().id().find(server_id) {
if let Some(id) = s.avatar_id { results.insert(id); }
if let Some(id) = s.avatar_id {
results.insert(id);
}
}
}
// Avatars for DM participants
for dm in db.direct_message().sender().filter(identity) {
if let Some(u) = db.user().identity().find(dm.recipient) {
if let Some(id) = u.avatar_id { results.insert(id); }
if let Some(id) = u.avatar_id {
results.insert(id);
}
}
}
for dm in db.direct_message().recipient().filter(identity) {
if let Some(u) = db.user().identity().find(dm.sender) {
if let Some(id) = u.avatar_id { results.insert(id); }
if let Some(id) = u.avatar_id {
results.insert(id);
}
}
}
@@ -133,38 +160,57 @@ pub fn get_visible_image_ids_read_only(db: &LocalReadOnly, identity: Identity) -
// User's own avatar/banner
if let Some(user) = db.user().identity().find(identity) {
if let Some(id) = user.avatar_id { results.insert(id); }
if let Some(id) = user.banner_id { results.insert(id); }
if let Some(id) = user.avatar_id {
results.insert(id);
}
if let Some(id) = user.banner_id {
results.insert(id);
}
}
// Server avatars for servers I am a member of or are public
let my_server_ids: HashSet<u64> = db.server_member().identity().filter(identity).map(|m| m.server_id).collect();
let my_server_ids: HashSet<u64> = db
.server_member()
.identity()
.filter(identity)
.map(|m| m.server_id)
.collect();
for s in db.server().name().filter(""..) {
if s.public || my_server_ids.contains(&s.id) {
if let Some(id) = s.avatar_id { results.insert(id); }
if let Some(id) = s.avatar_id {
results.insert(id);
}
}
}
// Avatars for members of servers I am in (and redundant check for server avatars stored in membership)
for server_id in my_server_ids {
for member in db.server_member().server_id().filter(server_id) {
if let Some(id) = member.avatar_id { results.insert(id); }
if let Some(id) = member.avatar_id {
results.insert(id);
}
}
// Also check if any server I'm in has an avatar id that might not have been caught in the name filter
if let Some(s) = db.server().id().find(server_id) {
if let Some(id) = s.avatar_id { results.insert(id); }
if let Some(id) = s.avatar_id {
results.insert(id);
}
}
}
// Avatars for DM participants
for dm in db.direct_message().sender().filter(identity) {
if let Some(u) = db.user().identity().find(dm.recipient) {
if let Some(id) = u.avatar_id { results.insert(id); }
if let Some(id) = u.avatar_id {
results.insert(id);
}
}
}
for dm in db.direct_message().recipient().filter(identity) {
if let Some(u) = db.user().identity().find(dm.sender) {
if let Some(id) = u.avatar_id { results.insert(id); }
if let Some(id) = u.avatar_id {
results.insert(id);
}
}
}
@@ -172,17 +218,40 @@ pub fn get_visible_image_ids_read_only(db: &LocalReadOnly, identity: Identity) -
}
pub fn internal_open_direct_message(db: &Local, sender: Identity, recipient: Identity) -> u64 {
let existing = db.direct_message().sender().filter(sender).find(|dm| dm.recipient == recipient)
.or_else(|| db.direct_message().sender().filter(recipient).find(|dm| dm.recipient == sender));
let existing = db
.direct_message()
.sender()
.filter(sender)
.find(|dm| dm.recipient == recipient)
.or_else(|| {
db.direct_message()
.sender()
.filter(recipient)
.find(|dm| dm.recipient == sender)
});
if let Some(mut dm) = existing {
if dm.sender == sender { dm.is_open_sender = true; } else { dm.is_open_recipient = true; }
if dm.sender == sender {
dm.is_open_sender = true;
} else {
dm.is_open_recipient = true;
}
db.direct_message().id().update(dm.clone());
dm.channel_id
} else {
let chan = db.channel().insert(Channel { id: 0, server_id: 0, name: "dm".to_string(), kind: ChannelKind::Text });
let chan = db.channel().insert(Channel {
id: 0,
server_id: 0,
name: "dm".to_string(),
kind: ChannelKind::Text,
});
db.direct_message().insert(DirectMessage {
id: 0, channel_id: chan.id, sender, recipient, is_open_sender: true, is_open_recipient: true,
id: 0,
channel_id: chan.id,
sender,
recipient,
is_open_sender: true,
is_open_recipient: true,
});
grant_user_channel_access(db, sender, chan.id);
grant_user_channel_access(db, recipient, chan.id);
@@ -201,13 +270,70 @@ pub fn internal_send_message(
is_encrypted: bool,
) {
let seq_id = get_next_seq_id(db, channel_id);
let server_id = db.channel().id().find(channel_id).map(|c| c.server_id).unwrap_or(0);
let server_id = db
.channel()
.id()
.find(channel_id)
.map(|c| c.server_id)
.unwrap_or(0);
db.message().insert(Message {
id: 0, sender, sent: timestamp, text, channel_id, server_id, thread_id,
reactions: Vec::new(), image_ids, thread_name: None, thread_reply_count: 0,
edited: false, is_encrypted, seq_id,
let msg = db.message().insert(Message {
id: 0,
sender,
sent: timestamp,
text,
channel_id,
server_id,
thread_id,
reactions: Vec::new(),
image_ids,
thread_name: None,
thread_reply_count: 0,
edited: false,
is_encrypted,
seq_id,
});
sync_recent_message(db, msg);
}
pub fn sync_recent_message(db: &Local, msg: Message) {
// 1. Insert into recent table
db.recent_message().insert(RecentMessage {
id: msg.id,
sender: msg.sender,
sent: msg.sent,
text: msg.text.clone(),
channel_id: msg.channel_id,
server_id: msg.server_id,
thread_id: msg.thread_id,
reactions: msg.reactions.clone(),
image_ids: msg.image_ids.clone(),
thread_name: msg.thread_name.clone(),
thread_reply_count: msg.thread_reply_count,
edited: msg.edited,
is_encrypted: msg.is_encrypted,
seq_id: msg.seq_id,
});
// 2. Prune to 50
let limit = get_recent_message_limit(db);
let count = db
.recent_message()
.channel_id()
.filter(msg.channel_id)
.count();
if count > limit as usize {
// Find the oldest message by seq_id for this channel in the recent table
if let Some(oldest) = db
.recent_message()
.channel_seq()
.filter((msg.channel_id, 0u64..))
.next()
{
db.recent_message().id().delete(oldest.id);
}
}
}
pub fn sync_server_member_info(db: &Local, identity: Identity) {
@@ -222,14 +348,25 @@ pub fn sync_server_member_info(db: &Local, identity: Identity) {
}
pub fn grant_user_channel_access(db: &Local, identity: Identity, channel_id: u64) {
let exists = db.user_channel_access().identity().filter(identity).any(|a| a.channel_id == channel_id);
let exists = db
.user_channel_access()
.identity()
.filter(identity)
.any(|a| a.channel_id == channel_id);
if !exists {
db.user_channel_access().insert(UserChannelAccess { id: 0, identity, channel_id });
db.user_channel_access().insert(UserChannelAccess {
id: 0,
identity,
channel_id,
});
}
}
pub fn revoke_user_channel_access(db: &Local, identity: Identity, channel_id: u64) {
let to_delete: Vec<_> = db.user_channel_access().identity().filter(identity)
let to_delete: Vec<_> = db
.user_channel_access()
.identity()
.filter(identity)
.filter(|a| a.channel_id == channel_id)
.map(|a| a.id)
.collect();
@@ -258,11 +395,24 @@ pub fn clear_user_presence(db: &Local, identity: Identity) {
}
pub fn clear_signaling_for_user(db: &Local, identity: Identity) {
let signals: Vec<_> = db.webrtc_signal().sender().filter(identity).map(|s| s.id).collect();
for id in signals { db.webrtc_signal().id().delete(id); }
let signals: Vec<_> = db
.webrtc_signal()
.sender()
.filter(identity)
.map(|s| s.id)
.collect();
for id in signals {
db.webrtc_signal().id().delete(id);
}
}
pub fn report_error(db: &Local, identity: Identity, reducer_name: &str, error: &str, timestamp: spacetimedb::Timestamp) {
pub fn report_error(
db: &Local,
identity: Identity,
reducer_name: &str,
error: &str,
timestamp: spacetimedb::Timestamp,
) {
db.reducer_status().identity().delete(identity);
db.reducer_status().insert(ReducerStatus {
identity,
@@ -273,7 +423,12 @@ pub fn report_error(db: &Local, identity: Identity, reducer_name: &str, error: &
});
}
pub fn report_success(db: &Local, identity: Identity, reducer_name: &str, timestamp: spacetimedb::Timestamp) {
pub fn report_success(
db: &Local,
identity: Identity,
reducer_name: &str,
timestamp: spacetimedb::Timestamp,
) {
db.reducer_status().identity().delete(identity);
db.reducer_status().insert(ReducerStatus {
identity,
@@ -284,7 +439,13 @@ pub fn report_success(db: &Local, identity: Identity, reducer_name: &str, timest
});
}
pub fn report_success_with_payload(db: &Local, identity: Identity, reducer_name: &str, payload: &str, timestamp: spacetimedb::Timestamp) {
pub fn report_success_with_payload(
db: &Local,
identity: Identity,
reducer_name: &str,
payload: &str,
timestamp: spacetimedb::Timestamp,
) {
db.reducer_status().identity().delete(identity);
db.reducer_status().insert(ReducerStatus {
identity,
+73 -23
View File
@@ -32,7 +32,12 @@ pub fn visible_typing_activity(ctx: &ViewContext) -> Vec<TypingActivity> {
let mut results = std::collections::HashMap::new();
for access in ctx.db.user_channel_access().identity().filter(identity) {
for activity in ctx.db.typing_activity().channel_id().filter(access.channel_id) {
for activity in ctx
.db
.typing_activity()
.channel_id()
.filter(access.channel_id)
{
if activity.typing {
results.insert(activity.identity, activity.clone());
}
@@ -74,27 +79,51 @@ pub fn visible_recent_activity(ctx: &ViewContext) -> Vec<Message> {
let identity = ctx.sender();
let mut results = Vec::new();
// Force dependency tracking
let _ = ctx.db.recent_message().channel_id().filter(0u64..).count();
for access in ctx.db.user_channel_access().identity().filter(identity) {
let last_seq_id = ctx.db.channel_internal_state().channel_id().find(access.channel_id)
.map(|s| s.last_seq_id).unwrap_or(0);
let limit = get_recent_message_limit_read_only(&ctx.db);
let min_seq = if last_seq_id > limit { last_seq_id - (limit - 1) } else { 1 };
// HIGH PERFORMANCE: Uses composite index range scan
for msg in ctx.db.message().channel_seq().filter((access.channel_id, min_seq..)) {
results.push(msg.clone());
for msg in ctx
.db
.recent_message()
.channel_id()
.filter(access.channel_id)
{
results.push(Message {
id: msg.id,
sender: msg.sender,
sent: msg.sent,
text: msg.text.clone(),
channel_id: msg.channel_id,
server_id: msg.server_id,
thread_id: msg.thread_id,
reactions: msg.reactions.clone(),
image_ids: msg.image_ids.clone(),
thread_name: msg.thread_name.clone(),
thread_reply_count: msg.thread_reply_count,
edited: msg.edited,
is_encrypted: msg.is_encrypted,
seq_id: msg.seq_id,
});
}
}
results
}
#[spacetimedb::view(accessor = visible_servers, public)]
pub fn visible_servers(ctx: &ViewContext) -> Vec<Server> {
let identity = ctx.sender();
let my_server_ids: std::collections::HashSet<u64> = ctx.db.server_member().identity().filter(identity).map(|m| m.server_id).collect();
let my_server_ids: std::collections::HashSet<u64> = ctx
.db
.server_member()
.identity()
.filter(identity)
.map(|m| m.server_id)
.collect();
ctx.db.server().name().filter(""..)
ctx.db
.server()
.name()
.filter(""..)
.filter(|s: &Server| s.public || my_server_ids.contains(&s.id))
.map(|s: Server| s.clone())
.collect()
@@ -126,7 +155,10 @@ pub fn visible_channels(ctx: &ViewContext) -> Vec<VisibleChannelRow> {
if let Some(s) = ctx.db.server().id().find(member.server_id) {
for chan_meta in s.channels {
results.push(VisibleChannelRow {
id: chan_meta.id, server_id: s.id, name: chan_meta.name.clone(), kind: chan_meta.kind,
id: chan_meta.id,
server_id: s.id,
name: chan_meta.name.clone(),
kind: chan_meta.kind,
});
}
}
@@ -137,7 +169,10 @@ pub fn visible_channels(ctx: &ViewContext) -> Vec<VisibleChannelRow> {
if let Some(chan) = ctx.db.channel().id().find(access.channel_id) {
if chan.server_id == 0 {
results.push(VisibleChannelRow {
id: chan.id, server_id: 0, name: chan.name.clone(), kind: chan.kind,
id: chan.id,
server_id: 0,
name: chan.name.clone(),
kind: chan.kind,
});
}
}
@@ -149,7 +184,8 @@ pub fn visible_channels(ctx: &ViewContext) -> Vec<VisibleChannelRow> {
#[spacetimedb::view(accessor = visible_direct_messages, public)]
pub fn visible_direct_messages(ctx: &ViewContext) -> impl Query<DirectMessage> {
let identity = ctx.sender();
ctx.from.direct_message()
ctx.from
.direct_message()
.r#where(move |dm| dm.sender.eq(identity).or(dm.recipient.eq(identity)))
}
@@ -176,7 +212,9 @@ pub fn visible_images(ctx: &ViewContext) -> Vec<VisibleImageRow> {
for id in image_ids {
if let Some(img) = ctx.db.image().id().find(id) {
results.push(VisibleImageRow {
id: img.id, mime_type: img.mime_type.clone(), name: img.name.clone(),
id: img.id,
mime_type: img.mime_type.clone(),
name: img.name.clone(),
});
}
}
@@ -215,7 +253,10 @@ pub fn visible_user_states(ctx: &ViewContext) -> Vec<UserState> {
#[spacetimedb::view(accessor = visible_webrtc_signals, public)]
pub fn visible_webrtc_signals(ctx: &ViewContext) -> Vec<WebRTCSignal> {
let identity = ctx.sender();
ctx.db.webrtc_signal().sender().filter(identity)
ctx.db
.webrtc_signal()
.sender()
.filter(identity)
.chain(ctx.db.webrtc_signal().receiver().filter(identity))
.map(|s: WebRTCSignal| s.clone())
.collect()
@@ -227,7 +268,9 @@ pub fn visible_scrollback_messages(ctx: &ViewContext) -> impl Query<Message> {
if let Some(sub) = ctx.db.channel_subscription().identity().find(identity) {
let cid = sub.channel_id;
let min_seq = sub.earliest_seq_id;
ctx.from.message().r#where(move |m| m.channel_id.eq(cid).and(m.seq_id.gte(min_seq)))
ctx.from
.message()
.r#where(move |m| m.channel_id.eq(cid).and(m.seq_id.gte(min_seq)))
} else {
ctx.from.message().r#where(|m| m.id.eq(0))
}
@@ -236,12 +279,19 @@ pub fn visible_scrollback_messages(ctx: &ViewContext) -> impl Query<Message> {
#[spacetimedb::view(accessor = my_channel_subscriptions, public)]
pub fn my_channel_subscriptions(ctx: &ViewContext) -> Vec<MyChannelSubscriptionRow> {
if let Some(sub) = ctx.db.channel_subscription().identity().find(ctx.sender()) {
let last_seq_id = ctx.db.channel_internal_state().channel_id().find(sub.channel_id)
.map(|s| s.last_seq_id).unwrap_or(0);
let last_seq_id = ctx
.db
.channel_internal_state()
.channel_id()
.find(sub.channel_id)
.map(|s| s.last_seq_id)
.unwrap_or(0);
vec![MyChannelSubscriptionRow {
identity: sub.identity, channel_id: sub.channel_id,
earliest_seq_id: sub.earliest_seq_id, last_read_seq_id: sub.last_read_seq_id,
identity: sub.identity,
channel_id: sub.channel_id,
earliest_seq_id: sub.earliest_seq_id,
last_read_seq_id: sub.last_read_seq_id,
last_seq_id,
}]
} else {