// Source: zep/spacetimedb/src/reducers.rs (Rust; 1266 lines, 40 KiB)

use crate::tables::*;
use crate::utils::*;
use spacetimedb::{Identity, ReducerContext, Table};
/// Records whether the caller is currently typing in `channel_id`.
///
/// Each identity has at most one `typing_activity` row: an existing row is
/// overwritten, otherwise a new one is inserted.
#[spacetimedb::reducer]
pub fn set_typing(ctx: &ReducerContext, channel_id: u64, typing: bool) {
    let caller = ctx.sender();
    let already_tracked = ctx.db.typing_activity().identity().find(caller).is_some();
    let row = TypingActivity {
        identity: caller,
        channel_id,
        typing,
    };
    if already_tracked {
        ctx.db.typing_activity().identity().update(row);
    } else {
        ctx.db.typing_activity().insert(row);
    }
}
/// Stores an uploaded image and, when the client supplied a `client_id`,
/// tracks the outcome in the `upload_status` table.
///
/// * `data` - raw image bytes; payloads larger than 4 MiB are rejected.
/// * `mime_type` / `name` - stored verbatim on the `Image` row.
/// * `client_id` - optional client-chosen key for the status row. Without it
///   no status is recorded at all.
///
/// The status row is first inserted as `"pending"` and then updated to either
/// `"error"` (size limit) or `"success"` (with the new image id).
#[spacetimedb::reducer]
pub fn upload_image(
    ctx: &ReducerContext,
    data: Vec<u8>,
    mime_type: String,
    name: Option<String>,
    client_id: Option<String>,
) {
    const MAX_IMAGE_BYTES: usize = 4 * 1024 * 1024;
    let uploader = ctx.sender();

    // Builds the status row for this upload in a given state.
    let status_row = |cid: &String, status: &str, image_id: Option<u64>, error: Option<String>| {
        UploadStatus {
            client_id: cid.clone(),
            identity: uploader,
            status: status.to_string(),
            image_id,
            error,
        }
    };

    if let Some(cid) = client_id.as_ref() {
        ctx.db
            .upload_status()
            .insert(status_row(cid, "pending", None, None));
    }

    if data.len() > MAX_IMAGE_BYTES {
        if let Some(cid) = client_id.as_ref() {
            ctx.db.upload_status().client_id().update(status_row(
                cid,
                "error",
                None,
                Some("Image exceeds 4MB limit".to_string()),
            ));
        }
        return;
    }

    // Metadata and payload live in separate tables, linked by the image id.
    let stored = ctx.db.image().insert(Image {
        id: 0,
        mime_type,
        name,
    });
    ctx.db.image_data().insert(ImageData {
        image_id: stored.id,
        data,
    });

    if let Some(cid) = client_id.as_ref() {
        ctx.db
            .upload_status()
            .client_id()
            .update(status_row(cid, "success", Some(stored.id), None));
    }
}
/// Deletes the upload-status row keyed by `client_id`, but only when that row
/// was created by the caller. Unknown keys and rows owned by other identities
/// are left untouched.
#[spacetimedb::reducer]
pub fn clear_upload_status(ctx: &ReducerContext, client_id: String) {
    let owned_by_caller = ctx
        .db
        .upload_status()
        .client_id()
        .find(client_id.clone())
        .map_or(false, |status| status.identity == ctx.sender());
    if owned_by_caller {
        ctx.db.upload_status().client_id().delete(client_id);
    }
}
/// Registers a custom emoji from raw image bytes.
///
/// Payloads larger than 256 KiB are rejected with an error reported under the
/// `"upload_emoji"` operation; otherwise the emoji is inserted and success is
/// reported under the same operation.
#[spacetimedb::reducer]
pub fn upload_custom_emoji(ctx: &ReducerContext, name: String, category: String, data: Vec<u8>) {
    const MAX_EMOJI_BYTES: usize = 256 * 1024;
    let uploader = ctx.sender();

    if data.len() <= MAX_EMOJI_BYTES {
        // Note: Emojis are currently global (not scoped to a server).
        ctx.db.custom_emoji().insert(CustomEmoji {
            id: 0,
            name,
            category,
            data,
        });
        report_success(&ctx.db, uploader, "upload_emoji", ctx.timestamp);
    } else {
        report_error(
            &ctx.db,
            uploader,
            "upload_emoji",
            "Emoji image exceeds 256KB limit",
            ctx.timestamp,
        );
    }
}
/// Stores a new avatar image for the caller and makes it their current avatar.
///
/// The caller's `user` row is looked up first: if it does not exist nothing is
/// written, so no orphaned `image` / `image_data` rows are left behind. After
/// the user row is updated, `sync_server_member_info` is called for the caller.
#[spacetimedb::reducer]
pub fn upload_avatar(ctx: &ReducerContext, data: Vec<u8>, mime_type: String) {
    let caller = ctx.sender();
    let mut user = match ctx.db.user().identity().find(caller) {
        Some(user) => user,
        // No profile to attach the avatar to; do not store the image at all.
        None => return,
    };

    let img = ctx.db.image().insert(Image {
        id: 0,
        mime_type,
        name: Some("avatar".to_string()),
    });
    ctx.db.image_data().insert(ImageData {
        image_id: img.id,
        data,
    });

    user.avatar_id = Some(img.id);
    ctx.db.user().identity().update(user);
    sync_server_member_info(&ctx.db, caller);
}
/// Stores a new banner image for the caller and makes it their current banner.
///
/// The caller's `user` row is looked up first: if it does not exist nothing is
/// written, so no orphaned `image` / `image_data` rows are left behind.
#[spacetimedb::reducer]
pub fn upload_banner(ctx: &ReducerContext, data: Vec<u8>, mime_type: String) {
    let mut user = match ctx.db.user().identity().find(ctx.sender()) {
        Some(user) => user,
        // No profile to attach the banner to; do not store the image at all.
        None => return,
    };

    let img = ctx.db.image().insert(Image {
        id: 0,
        mime_type,
        name: Some("banner".to_string()),
    });
    ctx.db.image_data().insert(ImageData {
        image_id: img.id,
        data,
    });

    user.banner_id = Some(img.id);
    ctx.db.user().identity().update(user);
}
/// Points the caller's profile at an existing banner image, or clears the
/// banner when `banner_id` is `None`. Does nothing if the caller has no
/// `user` row.
#[spacetimedb::reducer]
pub fn set_banner(ctx: &ReducerContext, banner_id: Option<u64>) {
    let mut profile = match ctx.db.user().identity().find(ctx.sender()) {
        Some(profile) => profile,
        None => return,
    };
    profile.banner_id = banner_id;
    ctx.db.user().identity().update(profile);
}
/// Stores a new avatar image for a server and points the server at it.
///
/// Checks, in order: the caller holds `PERM_MANAGE_SERVER`, the server exists,
/// and the payload is at most 4 MiB. Each failure is reported under the
/// `"update_server"` operation before anything is written.
#[spacetimedb::reducer]
pub fn upload_server_avatar(
    ctx: &ReducerContext,
    server_id: u64,
    data: Vec<u8>,
    mime_type: String,
) {
    const OP: &str = "update_server";
    const MAX_AVATAR_BYTES: usize = 4 * 1024 * 1024;
    let caller = ctx.sender();
    let reject = |reason: &str| report_error(&ctx.db, caller, OP, reason, ctx.timestamp);

    if !has_permission(&ctx.db, caller, server_id, PERM_MANAGE_SERVER) {
        return reject("Insufficient permissions");
    }
    let mut server = match ctx.db.server().id().find(server_id) {
        Some(server) => server,
        None => return reject("Server not found"),
    };
    if data.len() > MAX_AVATAR_BYTES {
        return reject("Image exceeds 4MB limit");
    }

    let stored = ctx.db.image().insert(Image {
        id: 0,
        mime_type,
        name: Some("server_avatar".to_string()),
    });
    ctx.db.image_data().insert(ImageData {
        image_id: stored.id,
        data,
    });

    server.avatar_id = Some(stored.id);
    ctx.db.server().id().update(server);
    report_success(&ctx.db, caller, OP, ctx.timestamp);
}
/// Renames a server.
///
/// Requires `PERM_MANAGE_SERVER` and a name accepted by `validate_name`; both
/// failures are reported under `"update_server"`. If the server row cannot be
/// found nothing is changed and nothing is reported.
#[spacetimedb::reducer]
pub fn update_server_name(ctx: &ReducerContext, server_id: u64, name: String) {
    const OP: &str = "update_server";
    let caller = ctx.sender();

    if !has_permission(&ctx.db, caller, server_id, PERM_MANAGE_SERVER) {
        return report_error(&ctx.db, caller, OP, "Insufficient permissions", ctx.timestamp);
    }
    if let Err(reason) = validate_name(&name) {
        return report_error(&ctx.db, caller, OP, &reason, ctx.timestamp);
    }

    let mut server = match ctx.db.server().id().find(server_id) {
        Some(server) => server,
        None => return,
    };
    server.name = name;
    ctx.db.server().id().update(server);
    report_success(&ctx.db, caller, OP, ctx.timestamp);
}
/// Deletes a server together with the rows that hang off it.
///
/// Requires `PERM_DELETE_SERVER`; otherwise an error is reported under
/// `"delete_server"` and nothing is removed.
///
/// Removed, in order: every message of every channel (plus its mirror row in
/// `recent_message`), the channels, members, roles, role assignments,
/// per-member permissions, invites pointing at the server, and finally the
/// server row itself.
///
/// Keys are collected into a `Vec` before deleting so no table is mutated
/// while it is being iterated, matching `delete_role` and `leave_server`.
#[spacetimedb::reducer]
pub fn delete_server(ctx: &ReducerContext, server_id: u64) {
    let caller = ctx.sender();
    if !has_permission(&ctx.db, caller, server_id, PERM_DELETE_SERVER) {
        return report_error(
            &ctx.db,
            caller,
            "delete_server",
            "Insufficient permissions",
            ctx.timestamp,
        );
    }

    let channel_ids: Vec<_> = ctx
        .db
        .channel()
        .server_id()
        .filter(server_id)
        .map(|c| c.id)
        .collect();
    for channel_id in channel_ids {
        let message_ids: Vec<_> = ctx
            .db
            .message()
            .channel_id()
            .filter(channel_id)
            .map(|m| m.id)
            .collect();
        for message_id in message_ids {
            ctx.db.message().id().delete(message_id);
            // Keep the recent-message mirror in step, as delete_message does.
            ctx.db.recent_message().id().delete(message_id);
        }
        ctx.db.channel().id().delete(channel_id);
    }

    let member_ids: Vec<_> = ctx
        .db
        .server_member()
        .server_id()
        .filter(server_id)
        .map(|m| m.id)
        .collect();
    for id in member_ids {
        ctx.db.server_member().id().delete(id);
    }

    // Clean up roles and assignments
    let role_ids: Vec<_> = ctx
        .db
        .server_role()
        .server_id()
        .filter(server_id)
        .map(|r| r.id)
        .collect();
    for id in role_ids {
        ctx.db.server_role().id().delete(id);
    }
    let assignment_ids: Vec<_> = ctx
        .db
        .member_role()
        .server_id()
        .filter(server_id)
        .map(|a| a.id)
        .collect();
    for id in assignment_ids {
        ctx.db.member_role().id().delete(id);
    }
    let permission_ids: Vec<_> = ctx
        .db
        .server_permission()
        .server_id()
        .filter(server_id)
        .map(|p| p.id)
        .collect();
    for id in permission_ids {
        ctx.db.server_permission().id().delete(id);
    }

    // Invites to a deleted server can never be redeemed; drop them too.
    let invite_codes: Vec<_> = ctx
        .db
        .invite()
        .iter()
        .filter(|i| i.server_id == server_id)
        .map(|i| i.code)
        .collect();
    for code in invite_codes {
        ctx.db.invite().code().delete(code);
    }

    // NOTE(review): per-user channel access rows are not revoked here
    // (leave_server calls revoke_server_access) — confirm whether they should be.
    ctx.db.server().id().delete(server_id);
    report_success(&ctx.db, caller, "delete_server", ctx.timestamp);
}
/// Creates a new role on a server.
///
/// Requires `PERM_MANAGE_ROLES`; otherwise an error is reported under
/// `"create_role"`.
///
/// The new role is placed one position above the highest existing position on
/// the server (0 when the server has no roles). Using the row count instead
/// would collide with existing positions whenever they are not exactly
/// `0..count` — for example the default roles created by `create_server`
/// occupy 1..=3, so a count of 3 would tie with "Admin".
#[spacetimedb::reducer]
pub fn create_role(ctx: &ReducerContext, server_id: u64, name: String, color: Option<String>, icon_id: Option<u64>, permissions: u128) {
    let caller = ctx.sender();
    if !has_permission(&ctx.db, caller, server_id, PERM_MANAGE_ROLES) {
        return report_error(
            &ctx.db,
            caller,
            "create_role",
            "Insufficient permissions",
            ctx.timestamp,
        );
    }

    let position = ctx
        .db
        .server_role()
        .server_id()
        .filter(server_id)
        .map(|role| role.position)
        .max()
        .map_or(0, |highest| highest.saturating_add(1));

    ctx.db.server_role().insert(ServerRole {
        id: 0,
        server_id,
        name,
        color,
        icon_id,
        permissions,
        position,
    });
    report_success(&ctx.db, caller, "create_role", ctx.timestamp);
}
#[spacetimedb::reducer]
pub fn update_role(ctx: &ReducerContext, role_id: u64, name: String, color: Option<String>, icon_id: Option<u64>, permissions: u128, position: u32) {
let mut role = match ctx.db.server_role().id().find(role_id) {
Some(r) => r,
None => return report_error(&ctx.db, ctx.sender(), "update_role", "Role not found", ctx.timestamp),
};
if !has_permission(&ctx.db, ctx.sender(), role.server_id, PERM_MANAGE_ROLES) {
return report_error(&ctx.db, ctx.sender(), "update_role", "Insufficient permissions", ctx.timestamp);
}
role.name = name;
role.color = color;
role.icon_id = icon_id;
role.permissions = permissions;
role.position = position;
ctx.db.server_role().id().update(role);
report_success(&ctx.db, ctx.sender(), "update_role", ctx.timestamp);
}
/// Deletes a role and every assignment of it.
///
/// The role is looked up first (reporting "Role not found" if missing), then
/// the caller must hold `PERM_MANAGE_ROLES` on the role's server.
///
/// If the role is the server's default role, `default_role_id` is cleared so
/// that `join_server` does not keep assigning a role that no longer exists.
#[spacetimedb::reducer]
pub fn delete_role(ctx: &ReducerContext, role_id: u64) {
    const OP: &str = "delete_role";
    let caller = ctx.sender();

    let role = match ctx.db.server_role().id().find(role_id) {
        Some(r) => r,
        None => return report_error(&ctx.db, caller, OP, "Role not found", ctx.timestamp),
    };
    if !has_permission(&ctx.db, caller, role.server_id, PERM_MANAGE_ROLES) {
        return report_error(&ctx.db, caller, OP, "Insufficient permissions", ctx.timestamp);
    }

    // Remove all assignments for this role (collected first so the table is
    // not mutated while it is being iterated).
    let assignment_ids: Vec<_> = ctx
        .db
        .member_role()
        .role_id()
        .filter(role_id)
        .map(|a| a.id)
        .collect();
    for id in assignment_ids {
        ctx.db.member_role().id().delete(id);
    }

    // Do not leave the server pointing at a deleted default role.
    if let Some(mut server) = ctx.db.server().id().find(role.server_id) {
        if server.default_role_id == Some(role_id) {
            server.default_role_id = None;
            ctx.db.server().id().update(server);
        }
    }

    ctx.db.server_role().id().delete(role_id);
    report_success(&ctx.db, caller, OP, ctx.timestamp);
}
/// Assigns a role to a member of a server.
///
/// Requires `PERM_MANAGE_ROLES` on `server_id`, and the role must exist and
/// belong to that server; each failure is reported under `"assign_role"`.
///
/// Assigning a role the identity already holds is treated as an idempotent
/// success: no duplicate row is inserted, but success is still reported so the
/// caller always receives a status for the operation.
#[spacetimedb::reducer]
pub fn assign_role(ctx: &ReducerContext, server_id: u64, identity: Identity, role_id: u64) {
    const OP: &str = "assign_role";
    let caller = ctx.sender();
    let reject = |reason: &str| report_error(&ctx.db, caller, OP, reason, ctx.timestamp);

    if !has_permission(&ctx.db, caller, server_id, PERM_MANAGE_ROLES) {
        return reject("Insufficient permissions");
    }

    // Verify role exists and belongs to server
    let role = match ctx.db.server_role().id().find(role_id) {
        Some(r) => r,
        None => return reject("Role not found"),
    };
    if role.server_id != server_id {
        return reject("Role does not belong to this server");
    }

    // Only insert when the identity does not already hold the role.
    let already_assigned = ctx
        .db
        .member_role()
        .identity()
        .filter(identity)
        .any(|mr| mr.role_id == role_id);
    if !already_assigned {
        ctx.db.member_role().insert(MemberRole {
            id: 0,
            server_id,
            identity,
            role_id,
        });
    }
    report_success(&ctx.db, caller, OP, ctx.timestamp);
}
/// Removes a role assignment from a member of a server.
///
/// Requires `PERM_MANAGE_ROLES` on `server_id`. Only assignments that belong
/// to `server_id` are removed: the permission check is made against that
/// server, so the deletion must be scoped to it as well. Without the scope a
/// role manager of one server could revoke assignments in any other server by
/// passing a foreign `role_id`.
///
/// Success is reported even when no matching assignment existed.
#[spacetimedb::reducer]
pub fn revoke_role(ctx: &ReducerContext, server_id: u64, identity: Identity, role_id: u64) {
    let caller = ctx.sender();
    if !has_permission(&ctx.db, caller, server_id, PERM_MANAGE_ROLES) {
        return report_error(
            &ctx.db,
            caller,
            "revoke_role",
            "Insufficient permissions",
            ctx.timestamp,
        );
    }

    // Collected first so the table is not mutated while it is being iterated.
    let assignment_ids: Vec<_> = ctx
        .db
        .member_role()
        .identity()
        .filter(identity)
        .filter(|mr| mr.role_id == role_id && mr.server_id == server_id)
        .map(|mr| mr.id)
        .collect();
    for id in assignment_ids {
        ctx.db.member_role().id().delete(id);
    }
    report_success(&ctx.db, caller, "revoke_role", ctx.timestamp);
}
/// Replaces the text of one of the caller's own messages and marks it edited.
///
/// The new text must pass `validate_message_length`, otherwise the error is
/// reported under `"edit_message"`. If the message does not exist or was sent
/// by someone else, nothing happens and nothing is reported.
#[spacetimedb::reducer]
pub fn edit_message(ctx: &ReducerContext, message_id: u64, new_text: String) {
    let author = ctx.sender();
    if let Err(reason) = validate_message_length(&ctx.db, &new_text) {
        return report_error(&ctx.db, author, "edit_message", &reason, ctx.timestamp);
    }

    let mut message = match ctx.db.message().id().find(message_id) {
        Some(found) if found.sender == author => found,
        _ => return,
    };
    message.text = new_text;
    message.edited = true;

    // Push the stored row on to the recent-message mirror.
    let stored = ctx.db.message().id().update(message);
    sync_recent_message(&ctx.db, stored);
    report_success(&ctx.db, author, "edit_message", ctx.timestamp);
}
/// Deletes a message, together with its row in `recent_message` if present.
///
/// Allowed for the message's sender, or for anyone holding
/// `PERM_MODERATE_MESSAGES` on the message's server. If the message does not
/// exist or the caller is not allowed, nothing happens and nothing is reported.
#[spacetimedb::reducer]
pub fn delete_message(ctx: &ReducerContext, message_id: u64) {
    let caller = ctx.sender();
    let message = match ctx.db.message().id().find(message_id) {
        Some(found) => found,
        None => return,
    };

    let can_moderate = has_permission(&ctx.db, caller, message.server_id, PERM_MODERATE_MESSAGES);
    if message.sender != caller && !can_moderate {
        return;
    }

    ctx.db.message().id().delete(message_id);
    // The recent-message mirror may or may not hold this message.
    if let Some(mirrored) = ctx.db.recent_message().id().find(message_id) {
        ctx.db.recent_message().id().delete(mirrored.id);
    }
    report_success(&ctx.db, caller, "delete_message", ctx.timestamp);
}
/// Sets a server's `public` flag.
///
/// Requires `PERM_MANAGE_SERVER`; the failure is reported under
/// `"update_server"`. If the server row cannot be found nothing is changed and
/// nothing is reported.
#[spacetimedb::reducer]
pub fn set_server_public(ctx: &ReducerContext, server_id: u64, public: bool) {
    let caller = ctx.sender();
    if !has_permission(&ctx.db, caller, server_id, PERM_MANAGE_SERVER) {
        return report_error(
            &ctx.db,
            caller,
            "update_server",
            "Insufficient permissions",
            ctx.timestamp,
        );
    }

    let mut server = match ctx.db.server().id().find(server_id) {
        Some(server) => server,
        None => return,
    };
    server.public = public;
    ctx.db.server().id().update(server);
    report_success(&ctx.db, caller, "update_server", ctx.timestamp);
}
/// Sets the explicit permission bits of one identity on a server.
///
/// Requires `PERM_MANAGE_ROLES`; the failure is reported under
/// `"set_permissions"`. An existing `server_permission` row for the
/// (server, identity) pair is overwritten, otherwise a new row is inserted.
#[spacetimedb::reducer]
pub fn set_member_permissions(ctx: &ReducerContext, server_id: u64, identity: Identity, permissions: u128) {
    let caller = ctx.sender();
    if !has_permission(&ctx.db, caller, server_id, PERM_MANAGE_ROLES) {
        return report_error(
            &ctx.db,
            caller,
            "set_permissions",
            "Insufficient permissions",
            ctx.timestamp,
        );
    }

    let current = ctx
        .db
        .server_permission()
        .server_id()
        .filter(server_id)
        .find(|grant| grant.identity == identity);
    match current {
        Some(mut grant) => {
            grant.permissions = permissions;
            ctx.db.server_permission().id().update(grant);
        }
        None => {
            ctx.db.server_permission().insert(ServerPermission {
                id: 0,
                server_id,
                identity,
                permissions,
            });
        }
    }
    report_success(&ctx.db, caller, "set_permissions", ctx.timestamp);
}
/// Adds the caller's reaction to a message, or removes it if the caller has
/// already reacted with the same emoji / custom emoji.
///
/// A reaction matches when identity, `emoji` and `custom_emoji_id` are all
/// equal. If the message does not exist nothing happens and nothing is
/// reported.
#[spacetimedb::reducer]
pub fn toggle_reaction(
    ctx: &ReducerContext,
    message_id: u64,
    emoji: Option<String>,
    custom_emoji_id: Option<u64>,
) {
    let reactor = ctx.sender();
    let mut message = match ctx.db.message().id().find(message_id) {
        Some(found) => found,
        None => return,
    };

    let existing = message.reactions.iter().position(|r| {
        r.identity == reactor && r.emoji == emoji && r.custom_emoji_id == custom_emoji_id
    });
    match existing {
        Some(index) => {
            message.reactions.remove(index);
        }
        None => message.reactions.push(Reaction {
            identity: reactor,
            emoji,
            custom_emoji_id,
        }),
    }

    let stored = ctx.db.message().id().update(message);
    sync_recent_message(&ctx.db, stored);
    report_success(&ctx.db, reactor, "toggle_reaction", ctx.timestamp);
}
/// Sets the caller's display name.
///
/// The name must pass `validate_name`, otherwise the error is reported under
/// `"update_profile"`. After the user row is updated,
/// `sync_server_member_info` is called for the caller. Does nothing if the
/// caller has no `user` row.
#[spacetimedb::reducer]
pub fn set_name(ctx: &ReducerContext, name: String) {
    let caller = ctx.sender();
    if let Err(reason) = validate_name(&name) {
        return report_error(&ctx.db, caller, "update_profile", &reason, ctx.timestamp);
    }

    let mut profile = match ctx.db.user().identity().find(caller) {
        Some(profile) => profile,
        None => return,
    };
    profile.name = Some(name);
    ctx.db.user().identity().update(profile);
    sync_server_member_info(&ctx.db, caller);
    report_success(&ctx.db, caller, "update_profile", ctx.timestamp);
}
/// Points the caller's profile at an existing avatar image, or clears the
/// avatar when `avatar_id` is `None`.
///
/// After the user row is updated, `sync_server_member_info` is called for the
/// caller and success is reported under `"update_profile"`. Does nothing if
/// the caller has no `user` row.
#[spacetimedb::reducer]
pub fn set_avatar(ctx: &ReducerContext, avatar_id: Option<u64>) {
    let caller = ctx.sender();
    let mut profile = match ctx.db.user().identity().find(caller) {
        Some(profile) => profile,
        None => return,
    };
    profile.avatar_id = avatar_id;
    ctx.db.user().identity().update(profile);
    sync_server_member_info(&ctx.db, caller);
    report_success(&ctx.db, caller, "update_profile", ctx.timestamp);
}
/// Replaces (or clears, with `None`) the public key stored on the caller's
/// profile and reports success under `"update_keys"`. Does nothing if the
/// caller has no `user` row.
#[spacetimedb::reducer]
pub fn update_public_key(ctx: &ReducerContext, public_key: Option<String>) {
    let caller = ctx.sender();
    let mut profile = match ctx.db.user().identity().find(caller) {
        Some(profile) => profile,
        None => return,
    };
    profile.public_key = public_key;
    ctx.db.user().identity().update(profile);
    report_success(&ctx.db, caller, "update_keys", ctx.timestamp);
}
/// Replaces (or clears, with `None`) the caller's biography. Does nothing if
/// the caller has no `user` row; no status is reported either way.
#[spacetimedb::reducer]
pub fn set_biography(ctx: &ReducerContext, biography: Option<String>) {
    let mut profile = match ctx.db.user().identity().find(ctx.sender()) {
        Some(profile) => profile,
        None => return,
    };
    profile.biography = biography;
    ctx.db.user().identity().update(profile);
}
/// Replaces (or clears, with `None`) the caller's status text. Does nothing
/// if the caller has no `user` row; no status is reported either way.
#[spacetimedb::reducer]
pub fn set_status(ctx: &ReducerContext, status: Option<String>) {
    let mut profile = match ctx.db.user().identity().find(ctx.sender()) {
        Some(profile) => profile,
        None => return,
    };
    profile.status = status;
    ctx.db.user().identity().update(profile);
}
/// Makes `channel_id` the caller's single active channel subscription.
///
/// Any previous subscription row for the caller is deleted. The new row
/// starts with both `earliest_seq_id` and `last_read_seq_id` set to the
/// channel's current `last_seq_id` (0 when the channel has no internal state
/// row yet), i.e. with an empty scrollback window.
#[spacetimedb::reducer]
pub fn subscribe_to_channel(ctx: &ReducerContext, channel_id: u64) {
    let subscriber = ctx.sender();
    let head_seq = match ctx.db.channel_internal_state().channel_id().find(channel_id) {
        Some(state) => state.last_seq_id,
        None => 0,
    };

    ctx.db.channel_subscription().identity().delete(subscriber);
    ctx.db.channel_subscription().insert(ChannelSubscription {
        identity: subscriber,
        channel_id,
        // Initial state: earliest equals the current head (scrollback is empty).
        earliest_seq_id: head_seq,
        last_read_seq_id: head_seq,
    });
}
/// Records that the caller wants the blob of `image_id`.
///
/// Each identity has at most one `image_blob_request` row: the previous
/// request, if any, is replaced.
#[spacetimedb::reducer]
pub fn request_image_blob(ctx: &ReducerContext, image_id: u64) {
    let requester = ctx.sender();
    let requests = ctx.db.image_blob_request();
    requests.identity().delete(requester);
    requests.insert(ImageBlobRequest {
        identity: requester,
        image_id,
    });
}
/// Moves the start of the caller's scrollback window for `channel_id`.
///
/// Only applies when the caller's current subscription is for `channel_id`;
/// otherwise nothing changes. The subscription row is replaced (delete +
/// insert) with `earliest_seq_id` set to the given value.
#[spacetimedb::reducer]
pub fn extend_subscription(ctx: &ReducerContext, channel_id: u64, earliest_seq_id: u64) {
    let subscriber = ctx.sender();
    let mut subscription = match ctx.db.channel_subscription().identity().find(subscriber) {
        Some(current) if current.channel_id == channel_id => current,
        _ => return,
    };
    subscription.earliest_seq_id = earliest_seq_id;

    ctx.db.channel_subscription().identity().delete(subscriber);
    ctx.db.channel_subscription().insert(subscription);
}
/// Creates a new private server owned by the caller, with default channels
/// and roles.
///
/// The name must pass `validate_name` and the caller must have a `user` row;
/// both failures are reported under `"create_server"`.
///
/// On success the following rows exist:
/// * the server (private, owned by the caller),
/// * a membership row for the caller,
/// * a text channel "general" and a voice channel "voice", also listed in
///   the server's `channels` metadata,
/// * an explicit permission grant of all bits for the caller,
/// * the roles "Admin", "Moderator" and "Member" (positions 3, 2, 1), with
///   "Admin" assigned to the caller and "Member" set as the default role.
///
/// The server row returned by `insert` is mutated locally and written back
/// once at the end, instead of being re-fetched (with `unwrap`) and updated
/// twice.
#[spacetimedb::reducer]
pub fn create_server(ctx: &ReducerContext, name: String) {
    let creator = ctx.sender();
    if let Err(e) = validate_name(&name) {
        return report_error(&ctx.db, creator, "create_server", &e, ctx.timestamp);
    }
    let user = match ctx.db.user().identity().find(creator) {
        Some(u) => u,
        None => {
            return report_error(
                &ctx.db,
                creator,
                "create_server",
                "User not found",
                ctx.timestamp,
            );
        }
    };

    let mut server = ctx.db.server().insert(Server {
        id: 0,
        name,
        owner: Some(creator),
        avatar_id: None,
        channels: Vec::new(),
        public: false,
        default_role_id: None,
    });
    let server_id = server.id;

    ctx.db.server_member().insert(ServerMember {
        id: 0,
        identity: creator,
        server_id,
        name: user.name.clone(),
        avatar_id: user.avatar_id,
        online: user.online,
    });

    let text_channel = ctx.db.channel().insert(Channel {
        id: 0,
        server_id,
        name: "general".to_string(),
        kind: ChannelKind::Text,
    });
    let voice_channel = ctx.db.channel().insert(Channel {
        id: 0,
        server_id,
        name: "voice".to_string(),
        kind: ChannelKind::Voice,
    });

    // RBAC: Grant all permissions to the creator
    ctx.db.server_permission().insert(ServerPermission {
        id: 0,
        server_id,
        identity: creator,
        permissions: u128::MAX,
    });

    // Create Default Roles
    let admin_role = ctx.db.server_role().insert(ServerRole {
        id: 0,
        server_id,
        name: "Admin".to_string(),
        color: Some("#ed4245".to_string()), // Red
        icon_id: None,
        permissions: u128::MAX,
        position: 3,
    });
    ctx.db.server_role().insert(ServerRole {
        id: 0,
        server_id,
        name: "Moderator".to_string(),
        color: Some("#3498db".to_string()), // Blue
        icon_id: None,
        permissions: PERM_KICK_MEMBERS
            | PERM_BAN_MEMBERS
            | PERM_MODERATE_MESSAGES
            | PERM_USE_VOICE
            | PERM_SHARE_SCREEN
            | PERM_USE_THREADS,
        position: 2,
    });
    let member_role = ctx.db.server_role().insert(ServerRole {
        id: 0,
        server_id,
        name: "Member".to_string(),
        color: Some("#99aab5".to_string()), // Gray
        icon_id: None,
        permissions: PERM_USE_VOICE | PERM_SHARE_SCREEN | PERM_USE_THREADS,
        position: 1,
    });

    // Assign Admin role to creator
    ctx.db.member_role().insert(MemberRole {
        id: 0,
        server_id,
        identity: creator,
        role_id: admin_role.id,
    });

    // Record channel metadata and the default role, then write the server once.
    server.channels.push(ChannelMetadata {
        id: text_channel.id,
        name: text_channel.name,
        kind: text_channel.kind,
    });
    server.channels.push(ChannelMetadata {
        id: voice_channel.id,
        name: voice_channel.name,
        kind: voice_channel.kind,
    });
    server.default_role_id = Some(member_role.id);
    ctx.db.server().id().update(server);

    sync_server_access(&ctx.db, creator, server_id);
    report_success(&ctx.db, creator, "create_server", ctx.timestamp);
}
use rand::distributions::{Alphanumeric, DistString};
/// Creates an invite code for a server and reports the code to the caller.
///
/// * `max_uses` - optional number of times the invite may be redeemed.
/// * `expires_in_hrs` - optional lifetime in hours, counted from the reducer
///   timestamp. Values above 100 years are clamped to 100 years so that the
///   hour-to-microsecond conversion and the timestamp addition cannot
///   overflow `i64`.
///
/// Requires `PERM_CREATE_INVITES`; otherwise an error is reported under
/// `"create_invite"`. On success the generated code is sent back as the
/// payload of the success status.
#[spacetimedb::reducer]
pub fn create_invite(
    ctx: &ReducerContext,
    server_id: u64,
    max_uses: Option<u32>,
    expires_in_hrs: Option<u32>,
) {
    const MICROS_PER_HOUR: i64 = 3_600_000_000;
    // 100 years; anything longer is treated as this maximum.
    const MAX_EXPIRY_HOURS: u32 = 24 * 365 * 100;

    // RBAC: Only members with permission can invite
    if !has_permission(&ctx.db, ctx.sender(), server_id, PERM_CREATE_INVITES) {
        return report_error(
            &ctx.db,
            ctx.sender(),
            "create_invite",
            "Insufficient permissions",
            ctx.timestamp,
        );
    }

    // Generate a 12-character alphanumeric code using the RNG provided by the context
    let code = Alphanumeric.sample_string(&mut ctx.rng(), 12);

    let expires_at = expires_in_hrs.map(|hours| {
        let hours = i64::from(hours.min(MAX_EXPIRY_HOURS));
        ctx.timestamp + spacetimedb::TimeDuration::from_micros(hours * MICROS_PER_HOUR)
    });

    ctx.db.invite().insert(Invite {
        code: code.clone(),
        server_id,
        inviter: ctx.sender(),
        expires_at,
        uses_remaining: max_uses,
    });

    // Send the generated code back to the client via the success status
    report_success_with_payload(&ctx.db, ctx.sender(), "create_invite", &code, ctx.timestamp);
}
/// Adds the caller to a server, identified by `server_id` and/or an invite
/// code.
///
/// Resolution and checks, in order (each failure is reported under
/// `"join_server"` and stops the join):
/// 1. the caller has a `user` row;
/// 2. the target server is `server_id` if given, else the server of
///    `invite_code`; at least one of the two must be provided;
/// 3. the server exists;
/// 4. for a private server: an invite code is supplied, exists, belongs to
///    the target server, has not expired and has uses left;
/// 5. for a private server: the user has a `subject` or a display name;
/// 6. the caller is not already a member.
///
/// The invite's remaining-use counter is decremented only after all checks
/// have passed. The reducer returns normally on a reported error, so an
/// earlier decrement would be kept and a rejected join (for example "Already
/// a member") would still burn a use.
///
/// On success a membership row is inserted, the server's default role is
/// assigned if it is set and still exists on this server,
/// `sync_server_access` is called, and the server id is reported as the
/// success payload.
#[spacetimedb::reducer]
pub fn join_server(ctx: &ReducerContext, server_id: Option<u64>, invite_code: Option<String>) {
    let sender = ctx.sender();
    let fail = |reason: &str| report_error(&ctx.db, sender, "join_server", reason, ctx.timestamp);

    let user = match ctx.db.user().identity().find(sender) {
        Some(u) => u,
        None => return fail("User not found"),
    };

    // An explicit server id wins; otherwise the invite decides the target.
    let target_server_id = match (server_id, invite_code.as_ref()) {
        (Some(id), _) => id,
        (None, Some(code)) => match ctx.db.invite().code().find(code.clone()) {
            Some(invite) => invite.server_id,
            None => return fail("Invalid invite code"),
        },
        (None, None) => return fail("Either server_id or invite_code must be provided"),
    };

    let server = match ctx.db.server().id().find(target_server_id) {
        Some(s) => s,
        None => return fail("Server not found"),
    };

    // Invite (with its counter already decremented) to write back once the
    // join is certain to succeed. Stays `None` for public servers and for
    // invites without a use limit.
    let mut invite_to_consume = None;

    // Permission check: if private, must have a valid invite code
    if !server.public {
        let code = match invite_code {
            Some(code) => code,
            None => return fail("Invite code required for private server"),
        };
        let mut invite = match ctx.db.invite().code().find(code) {
            Some(i) => i,
            None => return fail("Invalid invite code"),
        };
        if invite.server_id != target_server_id {
            return fail("Invite code does not match server");
        }
        // Expiry check
        if let Some(expiry) = invite.expires_at {
            if ctx.timestamp > expiry {
                return fail("Invite code expired");
            }
        }
        // Uses check
        if let Some(uses) = invite.uses_remaining {
            if uses == 0 {
                return fail("Invite code usage limit reached");
            }
            invite.uses_remaining = Some(uses - 1);
            invite_to_consume = Some(invite);
        }
    }

    if !server.public && user.subject.is_none() && user.name.is_none() {
        return fail("DisplayName required for private server");
    }

    let already_member = ctx
        .db
        .server_member()
        .identity()
        .filter(sender)
        .any(|m| m.server_id == target_server_id);
    if already_member {
        return fail("Already a member of this server");
    }

    // All checks passed: only now spend one use of the invite.
    if let Some(invite) = invite_to_consume {
        ctx.db.invite().code().update(invite);
    }

    ctx.db.server_member().insert(ServerMember {
        id: 0,
        identity: sender,
        server_id: target_server_id,
        name: user.name.clone(),
        avatar_id: user.avatar_id,
        online: user.online,
    });

    // Auto-assign the default role, skipping a stale id whose role was
    // deleted or does not belong to this server.
    if let Some(default_role_id) = server.default_role_id {
        let role_is_valid = ctx
            .db
            .server_role()
            .id()
            .find(default_role_id)
            .map_or(false, |role| role.server_id == target_server_id);
        if role_is_valid {
            ctx.db.member_role().insert(MemberRole {
                id: 0,
                server_id: target_server_id,
                identity: sender,
                role_id: default_role_id,
            });
        }
    }

    sync_server_access(&ctx.db, sender, target_server_id);

    // Record success
    report_success_with_payload(
        &ctx.db,
        sender,
        "join_server",
        &target_server_id.to_string(),
        ctx.timestamp,
    );
}
/// Sets (or clears, with `None`) the role that `join_server` assigns to new
/// members.
///
/// Requires `PERM_MANAGE_ROLES`. When a role id is given it must exist and
/// belong to `server_id`. Failures are reported under `"set_default_role"`.
/// If the server row cannot be found nothing is changed and nothing is
/// reported.
#[spacetimedb::reducer]
pub fn set_default_role(ctx: &ReducerContext, server_id: u64, role_id: Option<u64>) {
    const OP: &str = "set_default_role";
    let caller = ctx.sender();
    let reject = |reason: &str| report_error(&ctx.db, caller, OP, reason, ctx.timestamp);

    if !has_permission(&ctx.db, caller, server_id, PERM_MANAGE_ROLES) {
        return reject("Insufficient permissions");
    }
    let mut server = match ctx.db.server().id().find(server_id) {
        Some(server) => server,
        None => return,
    };

    // Verify role exists and belongs to server if it's not None
    if let Some(rid) = role_id {
        match ctx.db.server_role().id().find(rid) {
            None => return reject("Role not found"),
            Some(role) if role.server_id != server_id => {
                return reject("Role does not belong to this server");
            }
            Some(_) => {}
        }
    }

    server.default_role_id = role_id;
    ctx.db.server().id().update(server);
    report_success(&ctx.db, caller, OP, ctx.timestamp);
}
/// Removes the caller from a server.
///
/// Deletes the caller's membership rows for `server_id`, their role
/// assignments on that server, and any invites they created for it, then
/// calls `revoke_server_access` and reports success under `"leave_server"`.
///
/// Role assignments are removed so that a former member does not keep roles
/// on a server they have left, and so that rejoining (which assigns the
/// default role again) does not create duplicate assignments.
#[spacetimedb::reducer]
pub fn leave_server(ctx: &ReducerContext, server_id: u64) {
    let sender = ctx.sender();

    // Keys are collected first so no table is mutated while being iterated.
    let member_ids: Vec<_> = ctx
        .db
        .server_member()
        .identity()
        .filter(sender)
        .filter(|m| m.server_id == server_id)
        .map(|m| m.id)
        .collect();
    for id in member_ids {
        ctx.db.server_member().id().delete(id);
    }

    // Drop the caller's role assignments on this server.
    let assignment_ids: Vec<_> = ctx
        .db
        .member_role()
        .identity()
        .filter(sender)
        .filter(|mr| mr.server_id == server_id)
        .map(|mr| mr.id)
        .collect();
    for id in assignment_ids {
        ctx.db.member_role().id().delete(id);
    }
    // NOTE(review): explicit `server_permission` grants for the caller are
    // left in place — confirm whether they should be removed on leave too.

    // Also delete any invites this user created for this server
    let invite_codes: Vec<_> = ctx
        .db
        .invite()
        .iter()
        .filter(|i| i.inviter == sender && i.server_id == server_id)
        .map(|i| i.code)
        .collect();
    for code in invite_codes {
        ctx.db.invite().code().delete(code);
    }

    revoke_server_access(&ctx.db, sender, server_id);
    report_success(&ctx.db, sender, "leave_server", ctx.timestamp);
}
/// Creates a text or voice channel on a server.
///
/// Checks, in order: the caller holds `PERM_MANAGE_CHANNELS`, the name passes
/// `validate_name`, and the server exists. Failures are reported under
/// `"create_channel"`.
///
/// The channel is inserted, appended to the server's `channels` metadata, and
/// `grant_user_channel_access` is called for every current member.
#[spacetimedb::reducer]
pub fn create_channel(ctx: &ReducerContext, name: String, server_id: u64, is_voice: bool) {
    const OP: &str = "create_channel";
    let caller = ctx.sender();

    if !has_permission(&ctx.db, caller, server_id, PERM_MANAGE_CHANNELS) {
        return report_error(&ctx.db, caller, OP, "Insufficient permissions", ctx.timestamp);
    }
    if let Err(reason) = validate_name(&name) {
        return report_error(&ctx.db, caller, OP, &reason, ctx.timestamp);
    }
    let mut server = match ctx.db.server().id().find(server_id) {
        Some(server) => server,
        None => return report_error(&ctx.db, caller, OP, "Server not found", ctx.timestamp),
    };

    let kind = if is_voice { ChannelKind::Voice } else { ChannelKind::Text };
    let created = ctx.db.channel().insert(Channel {
        id: 0,
        server_id,
        name: name.clone(),
        kind,
    });

    // Keep the server's channel list in step with the channel table.
    server.channels.push(ChannelMetadata {
        id: created.id,
        name,
        kind,
    });
    ctx.db.server().id().update(server);

    for member in ctx.db.server_member().server_id().filter(server_id) {
        grant_user_channel_access(&ctx.db, member.identity, created.id);
    }
    report_success(&ctx.db, caller, OP, ctx.timestamp);
}
/// Puts the caller into the voice channel `channel_id`.
///
/// The channel must exist. When the channel's `server_id` is non-zero the
/// caller must hold `PERM_USE_VOICE` on that server. Failures are reported
/// under `"join_voice"`.
///
/// * No `user_state` row yet: one is inserted with every flag off.
/// * Row exists for a different channel: signaling for the caller is cleared
///   and the row is moved to the new channel with screen sharing, talking and
///   watching reset (mute / deafen are kept).
/// * Row already points at this channel: nothing changes.
///
/// Success is reported in all three cases.
#[spacetimedb::reducer]
pub fn join_voice(ctx: &ReducerContext, channel_id: u64) {
    const OP: &str = "join_voice";
    let caller = ctx.sender();

    let channel = match ctx.db.channel().id().find(channel_id) {
        Some(channel) => channel,
        None => return report_error(&ctx.db, caller, OP, "Channel not found", ctx.timestamp),
    };

    // RBAC: Check voice permission if it's a server channel
    let is_server_channel = channel.server_id != 0;
    if is_server_channel && !has_permission(&ctx.db, caller, channel.server_id, PERM_USE_VOICE) {
        return report_error(&ctx.db, caller, OP, "Insufficient permissions", ctx.timestamp);
    }

    match ctx.db.user_state().identity().find(caller) {
        Some(mut state) => {
            if state.channel_id != channel_id {
                clear_signaling_for_user(&ctx.db, caller);
                state.channel_id = channel_id;
                state.is_sharing_screen = false;
                state.is_talking = false;
                state.watching = None;
                ctx.db.user_state().identity().update(state);
            }
        }
        None => {
            ctx.db.user_state().insert(UserState {
                identity: caller,
                channel_id,
                is_sharing_screen: false,
                is_muted: false,
                is_deafened: false,
                is_talking: false,
                watching: None,
            });
        }
    }
    report_success(&ctx.db, caller, OP, ctx.timestamp);
}
/// Sets the caller's screen-sharing flag.
///
/// When sharing is turned off, every user whose `watching` points at the
/// caller has it cleared, and signaling for the caller is cleared. Does
/// nothing if the caller has no `user_state` row.
#[spacetimedb::reducer]
pub fn set_sharing_screen(ctx: &ReducerContext, sharing: bool) {
    let sharer = ctx.sender();
    let mut state = match ctx.db.user_state().identity().find(sharer) {
        Some(state) => state,
        None => return,
    };
    state.is_sharing_screen = sharing;
    ctx.db.user_state().identity().update(state);

    if sharing {
        return;
    }

    // Collected first so the table is not mutated while it is being iterated.
    let viewers: Vec<_> = ctx
        .db
        .user_state()
        .iter()
        .filter(|other| other.watching == Some(sharer))
        .collect();
    for mut viewer in viewers {
        viewer.watching = None;
        ctx.db.user_state().identity().update(viewer);
    }
    clear_signaling_for_user(&ctx.db, sharer);
}
/// Sets the caller's mute flag. Does nothing if the caller has no
/// `user_state` row.
#[spacetimedb::reducer]
pub fn set_mute(ctx: &ReducerContext, muted: bool) {
    let mut state = match ctx.db.user_state().identity().find(ctx.sender()) {
        Some(state) => state,
        None => return,
    };
    state.is_muted = muted;
    ctx.db.user_state().identity().update(state);
}
/// Sets the caller's deafen flag. Does nothing if the caller has no
/// `user_state` row.
#[spacetimedb::reducer]
pub fn set_deafen(ctx: &ReducerContext, deafened: bool) {
    let mut state = match ctx.db.user_state().identity().find(ctx.sender()) {
        Some(state) => state,
        None => return,
    };
    state.is_deafened = deafened;
    ctx.db.user_state().identity().update(state);
}
/// Sets the caller's talking flag. Does nothing if the caller has no
/// `user_state` row.
#[spacetimedb::reducer]
pub fn set_talking(ctx: &ReducerContext, talking: bool) {
    let mut state = match ctx.db.user_state().identity().find(ctx.sender()) {
        Some(state) => state,
        None => return,
    };
    state.is_talking = talking;
    ctx.db.user_state().identity().update(state);
}
/// Marks the caller as watching `watchee`.
///
/// Watching yourself is ignored, as is a caller without a `user_state` row.
#[spacetimedb::reducer]
pub fn start_watching(ctx: &ReducerContext, watchee: Identity) {
    let viewer = ctx.sender();
    if viewer == watchee {
        return;
    }
    let mut state = match ctx.db.user_state().identity().find(viewer) {
        Some(state) => state,
        None => return,
    };
    state.watching = Some(watchee);
    ctx.db.user_state().identity().update(state);
}
/// Clears the caller's `watching` target. Does nothing if the caller has no
/// `user_state` row.
#[spacetimedb::reducer]
pub fn stop_watching(ctx: &ReducerContext) {
    let mut state = match ctx.db.user_state().identity().find(ctx.sender()) {
        Some(state) => state,
        None => return,
    };
    state.watching = None;
    ctx.db.user_state().identity().update(state);
}
/// Takes the caller out of voice by delegating to `clear_user_presence`.
#[spacetimedb::reducer]
pub fn leave_voice(ctx: &ReducerContext) {
    let leaver = ctx.sender();
    clear_user_presence(&ctx.db, leaver);
}
/// Queues a WebRTC signaling message from the caller to `receiver`.
///
/// The message is stored as a `webrtc_signal` row with the caller as sender;
/// all other fields are taken verbatim from the arguments.
#[spacetimedb::reducer]
pub fn send_webrtc_signal(
    ctx: &ReducerContext,
    receiver: Identity,
    signal_kind: SignalKind,
    media_type: MediaType,
    data: String,
    channel_id: u64,
) {
    let signal = WebRTCSignal {
        id: 0,
        sender: ctx.sender(),
        receiver,
        signal_kind,
        media_type,
        data,
        channel_id,
    };
    ctx.db.webrtc_signal().insert(signal);
}
/// Sends a message from the caller to `channel_id`, optionally inside a
/// thread.
///
/// Thin reducer wrapper: all work is delegated to `internal_send_message`
/// with the caller's identity and the reducer timestamp.
#[spacetimedb::reducer]
pub fn send_message(
    ctx: &ReducerContext,
    text: String,
    channel_id: u64,
    thread_id: Option<u64>,
    image_ids: Vec<u64>,
    is_encrypted: bool,
) {
    let author = ctx.sender();
    let sent_at = ctx.timestamp;
    internal_send_message(
        &ctx.db,
        author,
        channel_id,
        text,
        sent_at,
        thread_id,
        image_ids,
        is_encrypted,
    );
}
/// Sets a system configuration entry, overwriting the value if `key` already
/// exists and inserting a new row otherwise.
///
/// NOTE(review): no permission check is performed here, so any caller can
/// change system configuration — confirm whether this should be restricted.
#[spacetimedb::reducer]
pub fn set_configuration(ctx: &ReducerContext, key: String, value: String) {
    let already_set = ctx
        .db
        .system_configuration()
        .key()
        .find(key.clone())
        .is_some();
    let entry = SystemConfiguration { key, value };
    if already_set {
        ctx.db.system_configuration().key().update(entry);
    } else {
        ctx.db.system_configuration().insert(entry);
    }
}
#[spacetimedb::reducer]
pub fn create_thread(ctx: &ReducerContext, name: String, channel_id: u64, parent_message_id: u64) {
log::info!(
"create_thread START: name={}, parent_message_id={}",
name,
parent_message_id
);
let mut thread_name = name;
if thread_name.trim().is_empty() {
thread_name = ctx
.db
.message()
.id()
.find(parent_message_id)
.map(|m| {
if m.text.trim().is_empty() {
"New Thread".to_string()
} else {
m.text.chars().take(32).collect()
}
})
.unwrap_or("New Thread".to_string());
}
if let Err(e) = validate_name(&thread_name) {
return report_error(&ctx.db, ctx.sender(), "create_thread", &e, ctx.timestamp);
}
if let Some(mut pm) = ctx.db.message().id().find(parent_message_id) {
if pm.channel_id != channel_id {
return report_error(
&ctx.db,
ctx.sender(),
"create_thread",
"Channel ID mismatch",
ctx.timestamp,
);
}
pm.thread_name = Some(thread_name);
ctx.db.message().id().update(pm);
report_success(&ctx.db, ctx.sender(), "create_thread", ctx.timestamp);
} else {
report_error(
&ctx.db,
ctx.sender(),
"create_thread",
"Parent message not found",
ctx.timestamp,
);
}
}
/// Names a thread on an existing message and posts the first reply in it.
///
/// The name must pass `validate_name`, the parent message must exist, and
/// `channel_id` must match the parent's channel. Failures are reported under
/// `"create_thread"`.
///
/// On success the parent message gets the thread name and is passed to
/// `sync_recent_message`, then the reply is sent through
/// `internal_send_message` with the parent as its thread.
#[spacetimedb::reducer]
pub fn create_thread_with_message(
    ctx: &ReducerContext,
    name: String,
    channel_id: u64,
    parent_message_id: u64,
    text: String,
    image_ids: Vec<u64>,
    is_encrypted: bool,
) {
    const OP: &str = "create_thread";
    let caller = ctx.sender();

    log::info!(
        "create_thread_with_message START: name={}, parent_message_id={}",
        name,
        parent_message_id
    );

    if let Err(reason) = validate_name(&name) {
        return report_error(&ctx.db, caller, OP, &reason, ctx.timestamp);
    }

    let mut parent = match ctx.db.message().id().find(parent_message_id) {
        Some(parent) => parent,
        None => {
            return report_error(&ctx.db, caller, OP, "Parent message not found", ctx.timestamp);
        }
    };
    if parent.channel_id != channel_id {
        return report_error(&ctx.db, caller, OP, "Channel ID mismatch", ctx.timestamp);
    }

    parent.thread_name = Some(name);
    // Note: internal_send_message will increment the parent's thread_reply_count for us
    let stored_parent = ctx.db.message().id().update(parent);
    sync_recent_message(&ctx.db, stored_parent);

    internal_send_message(
        &ctx.db,
        caller,
        channel_id,
        text,
        ctx.timestamp,
        Some(parent_message_id),
        image_ids,
        is_encrypted,
    );
    report_success(&ctx.db, caller, OP, ctx.timestamp);
}
/// Rebuilds per-channel sequence numbers and per-user channel access from
/// scratch.
///
/// Steps, in order:
/// 1. delete every `channel_internal_state` row;
/// 2. delete every `user_channel_access` row;
/// 3. give every message a fresh `seq_id` from `get_next_seq_id`, in the
///    order the message table is iterated;
/// 4. call `sync_server_access` for every server membership;
/// 5. call `grant_user_channel_access` for both participants of every direct
///    message channel.
///
/// NOTE(review): no permission check is performed here, so any caller can
/// trigger a full rebuild — confirm whether this should be restricted.
#[spacetimedb::reducer]
pub fn bootstrap_sequences(ctx: &ReducerContext) {
    // Keys are collected first so the tables are not mutated while iterated.
    let state_channel_ids: Vec<_> = ctx
        .db
        .channel_internal_state()
        .iter()
        .map(|state| state.channel_id)
        .collect();
    for channel_id in state_channel_ids {
        ctx.db
            .channel_internal_state()
            .channel_id()
            .delete(channel_id);
    }

    let access_ids: Vec<_> = ctx
        .db
        .user_channel_access()
        .iter()
        .map(|access| access.id)
        .collect();
    for id in access_ids {
        ctx.db.user_channel_access().id().delete(id);
    }

    for mut message in ctx.db.message().iter() {
        message.seq_id = get_next_seq_id(&ctx.db, message.channel_id);
        ctx.db.message().id().update(message);
    }

    for member in ctx.db.server_member().iter() {
        sync_server_access(&ctx.db, member.identity.clone(), member.server_id);
    }

    for dm in ctx.db.direct_message().iter() {
        grant_user_channel_access(&ctx.db, dm.sender, dm.channel_id);
        grant_user_channel_access(&ctx.db, dm.recipient, dm.channel_id);
    }
}
/// Opens a direct-message conversation between the caller and `recipient`
/// via `internal_open_direct_message`. A conversation with yourself is
/// ignored.
#[spacetimedb::reducer]
pub fn open_direct_message(ctx: &ReducerContext, recipient: Identity) {
    let initiator = ctx.sender();
    if initiator == recipient {
        return;
    }
    internal_open_direct_message(&ctx.db, initiator, recipient);
}
/// Marks a direct-message conversation as closed for the caller.
///
/// Uses the first `direct_message` row for `channel_id`. The caller's own
/// "open" flag is cleared (`is_open_sender` or `is_open_recipient`); if the
/// caller is neither participant, the row is written back unchanged. Does
/// nothing when no row exists for the channel.
#[spacetimedb::reducer]
pub fn close_direct_message(ctx: &ReducerContext, channel_id: u64) {
    let caller = ctx.sender();
    let first_match = ctx
        .db
        .direct_message()
        .channel_id()
        .filter(channel_id)
        .next();
    let mut conversation = match first_match {
        Some(conversation) => conversation,
        None => return,
    };

    if conversation.sender == caller {
        conversation.is_open_sender = false;
    } else if conversation.recipient == caller {
        conversation.is_open_recipient = false;
    }
    ctx.db.direct_message().id().update(conversation);
}
/// Makes `thread_id` the caller's single open thread.
///
/// Each identity has at most one `thread_subscription` row: the previous
/// one, if any, is replaced.
#[spacetimedb::reducer]
pub fn open_thread(ctx: &ReducerContext, thread_id: u64) {
    let viewer = ctx.sender();
    let subscriptions = ctx.db.thread_subscription();
    subscriptions.identity().delete(viewer);
    subscriptions.insert(ThreadSubscription {
        identity: viewer,
        thread_id,
    });
}
/// Closes the caller's open thread by deleting their `thread_subscription`
/// row, if any.
#[spacetimedb::reducer]
pub fn close_thread(ctx: &ReducerContext) {
    let viewer = ctx.sender();
    ctx.db.thread_subscription().identity().delete(viewer);
}