diff --git a/.vscode/settings.json b/.vscode/settings.json index 7f8ecfcd..707af02f 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,6 +1,6 @@ { "editor.formatOnSave": true, - "rust-analyzer.checkOnSave.command": "clippy", + "rust-analyzer.check.command": "clippy", "nixEnvSelector.suggestion": false, "nixEnvSelector.nixFile": "${workspaceFolder}/default.nix" -} \ No newline at end of file +} diff --git a/Cargo.lock b/Cargo.lock index 6d6ef809..ae209196 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -58,7 +58,7 @@ version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9" dependencies = [ - "getrandom", + "getrandom 0.2.15", "once_cell", "version_check", ] @@ -70,7 +70,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", - "getrandom", + "getrandom 0.2.15", "once_cell", "version_check", "zerocopy", @@ -139,9 +139,12 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.95" +version = "1.0.98" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34ac096ce696dc2fcabef30516bb13c0a68a11d30131d3df6f04711467681b04" +checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" +dependencies = [ + "backtrace", +] [[package]] name = "arbitrary" @@ -2056,7 +2059,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e9b3460f44bea8cd47f45a0c70892f1eff856d97cd55358b2f73f663789f6190" dependencies = [ "ct-codecs", - "getrandom", + "getrandom 0.2.15", ] [[package]] @@ -2675,10 +2678,22 @@ dependencies = [ "cfg-if", "js-sys", "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "wasm-bindgen", ] +[[package]] +name = "getrandom" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasi 0.14.2+wasi-0.2.4", +] + [[package]] name = "getset" version = "0.1.3" @@ -4313,7 +4328,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" dependencies = [ "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys 0.52.0", ] @@ -4933,7 +4948,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b7cafe60d6cf8e62e1b9b2ea516a089c008945bb5a275416789e7db0bc199dc" dependencies = [ "memchr", - "thiserror 2.0.9", + "thiserror 2.0.12", "ucd-trie", ] @@ -5386,7 +5401,7 @@ dependencies = [ "libc", "once_cell", "raw-cpuid", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "web-sys", "winapi", ] @@ -5430,6 +5445,12 @@ version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a3866219251662ec3b26fc217e3e05bf9c4f84325234dfb96bf0bf840889e49" +[[package]] +name = "r-efi" +version = "5.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" + [[package]] name = "radium" version = "0.7.0" @@ -5466,6 +5487,16 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "rand" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97" +dependencies = [ + "rand_chacha 0.9.0", + "rand_core 0.9.3", +] + [[package]] name = "rand_chacha" version = "0.1.1" @@ -5486,6 +5517,16 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core 0.9.3", +] + [[package]] name = "rand_core" version = "0.3.1" @@ -5507,7 +5548,16 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ - "getrandom", + "getrandom 0.2.15", +] + +[[package]] +name = "rand_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" +dependencies = [ + "getrandom 0.3.3", ] [[package]] @@ -5982,6 +6032,7 @@ dependencies = [ "pretty_env_logger", "revolt-result", "sentry", + "sentry-anyhow", "serde", ] @@ -6202,6 +6253,7 @@ name = "revolt-pushd" version = "0.8.5" dependencies = [ "amqprs", + "anyhow", "async-trait", "authifier", "base64 0.22.1", @@ -6214,6 +6266,7 @@ dependencies = [ "revolt-database", "revolt-models", "revolt-presence", + "revolt-result", "revolt_a2", "revolt_optional_struct", "serde", @@ -6356,7 +6409,7 @@ checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d" dependencies = [ "cc", "cfg-if", - "getrandom", + "getrandom 0.2.15", "libc", "spin", "untrusted", @@ -6918,9 +6971,9 @@ dependencies = [ "httpdate", "native-tls", "reqwest 0.11.27", - "sentry-backtrace", + "sentry-backtrace 0.31.8", "sentry-contexts", - "sentry-core", + "sentry-core 0.31.8", "sentry-debug-images", "sentry-panic", "sentry-tracing", @@ -6928,6 +6981,17 @@ dependencies = [ "ureq", ] +[[package]] +name = "sentry-anyhow" +version = "0.38.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c93cfb800654dc846ccbeb35254a2f6f194aaf7e187223fd90aeb7afafe722c" +dependencies = [ + "anyhow", + "sentry-backtrace 0.38.1", + "sentry-core 0.38.1", +] + [[package]] name = "sentry-backtrace" version = "0.31.8" @@ -6937,7 +7001,18 @@ dependencies = [ "backtrace", "once_cell", "regex", - "sentry-core", + "sentry-core 0.31.8", +] + +[[package]] +name = "sentry-backtrace" +version = "0.38.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8dace796060e4ad10e3d1405b122ae184a8b2e71dce05ae450e4f81b7686b0d9" +dependencies = [ + "backtrace", + "regex", + "sentry-core 0.38.1", ] [[package]] @@ -6950,7 +7025,7 @@ dependencies = [ "libc", "os_info", "rustc_version", - "sentry-core", + "sentry-core 0.31.8", "uname", ] @@ -6962,7 +7037,18 @@ checksum = "901f761681f97db3db836ef9e094acdd8756c40215326c194201941947164ef1" dependencies = [ "once_cell", "rand 0.8.5", - "sentry-types", + "sentry-types 0.31.8", + "serde", + "serde_json", +] + +[[package]] +name = "sentry-core" +version = "0.38.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7426d4beec270cfdbb50f85f0bb2ce176ea57eed0b11741182a163055a558187" +dependencies = [ + "sentry-types 0.38.1", "serde", "serde_json", ] @@ -6975,7 +7061,7 @@ checksum = "afdb263e73d22f39946f6022ed455b7561b22ff5553aca9be3c6a047fa39c328" dependencies = [ "findshlibs", "once_cell", - "sentry-core", + "sentry-core 0.31.8", ] [[package]] @@ -6984,8 +7070,8 @@ version = "0.31.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74fbf1c163f8b6a9d05912e1b272afa27c652e8b47ea60cb9a57ad5e481eea99" dependencies = [ - "sentry-backtrace", - "sentry-core", + "sentry-backtrace 0.31.8", + "sentry-core 0.31.8", ] [[package]] @@ -6994,8 +7080,8 @@ version = "0.31.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "82eabcab0a047040befd44599a1da73d3adb228ff53b5ed9795ae04535577704" dependencies = [ - "sentry-backtrace", - "sentry-core", + "sentry-backtrace 0.31.8", + "sentry-core 0.31.8", "tracing-core", "tracing-subscriber", ] @@ -7017,6 +7103,23 @@ dependencies = [ "uuid", ] +[[package]] +name = "sentry-types" +version = "0.38.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04b6c9287202294685cb1f749b944dbbce8160b81a1061ecddc073025fed129f" +dependencies = [ + "debugid", + "hex", + "rand 0.9.1", + "serde", + "serde_json", + "thiserror 2.0.12", + "time", + "url", + "uuid", +] + [[package]] name = "serde" version = "1.0.217" @@ -7647,7 +7750,7 @@ checksum = "9a8a559c81686f576e8cd0290cd2a24a2a9ad80c98b3478856500fcbd7acd704" dependencies = [ "cfg-if", "fastrand 2.3.0", - "getrandom", + "getrandom 0.2.15", "once_cell", "rustix", "windows-sys 0.59.0", @@ -7684,11 +7787,11 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.9" +version = "2.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f072643fd0190df67a8bab670c20ef5d8737177d6ac6b2e9a236cb096206b2cc" +checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708" dependencies = [ - "thiserror-impl 2.0.9", + "thiserror-impl 2.0.12", ] [[package]] @@ -7704,9 +7807,9 @@ dependencies = [ [[package]] name = "thiserror-impl" -version = "2.0.9" +version = "2.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b50fa271071aae2e6ee85f842e2e28ba8cd2c5fb67f11fcb1fd70b276f9e7d4" +checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d" dependencies = [ "proc-macro2", "quote 1.0.38", @@ -8199,7 +8302,7 @@ version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04f903f293d11f31c0c29e4148f6dc0d033a7f80cebc0282bea147611667d289" dependencies = [ - "getrandom", + "getrandom 0.2.15", "rand 0.8.5", "web-time", ] @@ -8459,7 +8562,7 @@ version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" dependencies = [ - "getrandom", + "getrandom 0.2.15", "serde", ] @@ -8606,13 +8709,22 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasi" +version = "0.14.2+wasi-0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3" +dependencies = [ + "wit-bindgen-rt", +] + [[package]] name = "wasix" version = "0.12.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1fbb4ef9bbca0c1170e0b00dd28abc9e3b68669821600cad1caaed606583c6d" dependencies = [ - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", ] [[package]] @@ -9061,6 +9173,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "wit-bindgen-rt" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" +dependencies = [ + "bitflags 2.6.0", +] + [[package]] name = "write16" version = "1.0.0" diff --git a/crates/core/config/Cargo.toml b/crates/core/config/Cargo.toml index 2411d099..a2beff9a 100644 --- a/crates/core/config/Cargo.toml +++ b/crates/core/config/Cargo.toml @@ -9,9 +9,10 @@ description = "Revolt Backend: Configuration" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] +anyhow = ["dep:sentry-anyhow"] report-macros = ["revolt-result"] test = ["async-std"] -default = ["test"] +default = ["test", "anyhow"] [dependencies] # Utility @@ -32,6 +33,7 @@ pretty_env_logger = "0.4.0" # Sentry sentry = "0.31.5" +sentry-anyhow = { version = "0.38.1", optional = true } # Core revolt-result = { version = "0.8.5", path = "../result", optional = true } diff --git a/crates/core/config/src/lib.rs b/crates/core/config/src/lib.rs index 8f67c76a..b4b3bed2 100644 --- a/crates/core/config/src/lib.rs +++ b/crates/core/config/src/lib.rs @@ -7,6 +7,7 @@ use once_cell::sync::Lazy; use serde::Deserialize; pub use sentry::{capture_error, capture_message, Level}; +pub use sentry_anyhow::capture_anyhow; #[cfg(feature = "report-macros")] #[macro_export] @@ -343,6 +344,7 @@ pub struct Sentry { pub events: String, pub files: String, pub proxy: String, + pub pushd: String, pub crond: String, } diff --git a/crates/daemons/pushd/Cargo.toml b/crates/daemons/pushd/Cargo.toml index 6ad8bc1a..699654de 100644 --- a/crates/daemons/pushd/Cargo.toml +++ b/crates/daemons/pushd/Cargo.toml @@ -5,7 +5,10 @@ edition = "2021" license = "AGPL-3.0-or-later" [dependencies] -revolt-config = { version = "0.8.5", path = "../../core/config" } +revolt-result = { version = "0.8.5", path = "../../core/result" } +revolt-config = { version = "0.8.5", path = "../../core/config", features = [ + "report-macros", +] } revolt-database = { version = "0.8.5", path = "../../core/database" } revolt-models = { version = "0.8.5", path = "../../core/models", features = [ "validator", @@ -14,6 +17,8 @@ revolt-presence = { version = "0.8.5", path = "../../core/presence", features = "redis-is-patched", ] } +anyhow = { version = "1.0.98" } + amqprs = { version = "1.7.0" } fcm_v1 = "0.3.0" web-push = "0.10.0" diff --git a/crates/daemons/pushd/src/consumers/inbound/fr_accepted.rs b/crates/daemons/pushd/src/consumers/inbound/fr_accepted.rs index ff084a83..d525138c 100644 --- a/crates/daemons/pushd/src/consumers/inbound/fr_accepted.rs +++ b/crates/daemons/pushd/src/consumers/inbound/fr_accepted.rs @@ -7,6 +7,7 @@ use amqprs::{ consumer::AsyncConsumer, BasicProperties, Deliver, }; +use anyhow::Result; use async_trait::async_trait; use log::debug; use revolt_database::{events::rabbit::*, Database}; @@ -54,21 +55,16 @@ impl FRAcceptedConsumer { channel: None, } } -} -#[allow(unused_variables)] -#[async_trait] -impl AsyncConsumer for FRAcceptedConsumer { - /// This consumer handles delegating messages into their respective platform queues. - async fn consume( + async fn consume_event( &mut self, - channel: &Channel, - deliver: Deliver, - basic_properties: BasicProperties, + _channel: &Channel, + _deliver: Deliver, + _basic_properties: BasicProperties, content: Vec, - ) { - let content = String::from_utf8(content).unwrap(); - let payload: FRAcceptedPayload = serde_json::from_str(content.as_str()).unwrap(); + ) -> Result<()> { + let content = String::from_utf8(content)?; + let payload: FRAcceptedPayload = serde_json::from_str(content.as_str())?; debug!("Received FR accept event"); @@ -111,11 +107,34 @@ impl AsyncConsumer for FRAcceptedConsumer { .insert("endpoint".to_string(), sub.endpoint.clone()); } - let payload = serde_json::to_string(&sendable).unwrap(); + let payload = serde_json::to_string(&sendable)?; publish_message(self, payload.into(), args).await; } } } + + Ok(()) + } +} + +#[allow(unused_variables)] +#[async_trait] +impl AsyncConsumer for FRAcceptedConsumer { + /// This consumer handles delegating messages into their respective platform queues. + async fn consume( + &mut self, + channel: &Channel, + deliver: Deliver, + basic_properties: BasicProperties, + content: Vec, + ) { + if let Err(err) = self + .consume_event(channel, deliver, basic_properties, content) + .await + { + revolt_config::capture_anyhow(&err); + eprintln!("Failed to process friend request accepted event: {err:?}"); + } } } diff --git a/crates/daemons/pushd/src/consumers/inbound/fr_received.rs b/crates/daemons/pushd/src/consumers/inbound/fr_received.rs index c52dfec1..f64bc92b 100644 --- a/crates/daemons/pushd/src/consumers/inbound/fr_received.rs +++ b/crates/daemons/pushd/src/consumers/inbound/fr_received.rs @@ -7,6 +7,7 @@ use amqprs::{ consumer::AsyncConsumer, BasicProperties, Deliver, }; +use anyhow::Result; use async_trait::async_trait; use log::debug; use revolt_database::{events::rabbit::*, Database}; @@ -54,21 +55,16 @@ impl FRReceivedConsumer { channel: None, } } -} -#[allow(unused_variables)] -#[async_trait] -impl AsyncConsumer for FRReceivedConsumer { - /// This consumer handles delegating messages into their respective platform queues. - async fn consume( + async fn consume_event( &mut self, - channel: &Channel, - deliver: Deliver, - basic_properties: BasicProperties, + _channel: &Channel, + _deliver: Deliver, + _basic_properties: BasicProperties, content: Vec, - ) { - let content = String::from_utf8(content).unwrap(); - let payload: FRReceivedPayload = serde_json::from_str(content.as_str()).unwrap(); + ) -> Result<()> { + let content = String::from_utf8(content)?; + let payload: FRReceivedPayload = serde_json::from_str(content.as_str())?; debug!("Received FR received event"); @@ -111,11 +107,34 @@ impl AsyncConsumer for FRReceivedConsumer { .insert("endpoint".to_string(), sub.endpoint.clone()); } - let payload = serde_json::to_string(&sendable).unwrap(); + let payload = serde_json::to_string(&sendable)?; publish_message(self, payload.into(), args).await; } } } + + Ok(()) + } +} + +#[allow(unused_variables)] +#[async_trait] +impl AsyncConsumer for FRReceivedConsumer { + /// This consumer handles delegating messages into their respective platform queues. + async fn consume( + &mut self, + channel: &Channel, + deliver: Deliver, + basic_properties: BasicProperties, + content: Vec, + ) { + if let Err(err) = self + .consume_event(channel, deliver, basic_properties, content) + .await + { + revolt_config::capture_anyhow(&err); + eprintln!("Failed to process friend request received event: {err:?}"); + } } } diff --git a/crates/daemons/pushd/src/consumers/inbound/generic.rs b/crates/daemons/pushd/src/consumers/inbound/generic.rs index 58070f66..aa3950d7 100644 --- a/crates/daemons/pushd/src/consumers/inbound/generic.rs +++ b/crates/daemons/pushd/src/consumers/inbound/generic.rs @@ -7,6 +7,7 @@ use amqprs::{ consumer::AsyncConsumer, BasicProperties, Deliver, }; +use anyhow::Result; use async_trait::async_trait; use log::debug; use revolt_database::{events::rabbit::*, Database}; @@ -54,21 +55,16 @@ impl GenericConsumer { channel: None, } } -} -#[allow(unused_variables)] -#[async_trait] -impl AsyncConsumer for GenericConsumer { - /// This consumer handles delegating messages into their respective platform queues. - async fn consume( + async fn consume_event( &mut self, - channel: &Channel, - deliver: Deliver, - basic_properties: BasicProperties, + _channel: &Channel, + _deliver: Deliver, + _basic_properties: BasicProperties, content: Vec, - ) { - let content = String::from_utf8(content).unwrap(); - let payload: MessageSentPayload = serde_json::from_str(content.as_str()).unwrap(); + ) -> Result<()> { + let content = String::from_utf8(content)?; + let payload: MessageSentPayload = serde_json::from_str(content.as_str())?; debug!("Received message event on origin"); @@ -117,11 +113,34 @@ impl AsyncConsumer for GenericConsumer { .insert("endpoint".to_string(), sub.endpoint.clone()); } - let payload = serde_json::to_string(&sendable).unwrap(); + let payload = serde_json::to_string(&sendable)?; publish_message(self, payload.into(), args).await; } } } + + Ok(()) + } +} + +#[allow(unused_variables)] +#[async_trait] +impl AsyncConsumer for GenericConsumer { + /// This consumer handles delegating messages into their respective platform queues. + async fn consume( + &mut self, + channel: &Channel, + deliver: Deliver, + basic_properties: BasicProperties, + content: Vec, + ) { + if let Err(err) = self + .consume_event(channel, deliver, basic_properties, content) + .await + { + revolt_config::capture_anyhow(&err); + eprintln!("Failed to process generic event: {err:?}"); + } } } diff --git a/crates/daemons/pushd/src/consumers/inbound/mass_mention.rs b/crates/daemons/pushd/src/consumers/inbound/mass_mention.rs index a44dcb1e..bf5dd453 100644 --- a/crates/daemons/pushd/src/consumers/inbound/mass_mention.rs +++ b/crates/daemons/pushd/src/consumers/inbound/mass_mention.rs @@ -10,6 +10,7 @@ use amqprs::{ consumer::AsyncConsumer, BasicProperties, Deliver, }; +use anyhow::Result; use async_trait::async_trait; use revolt_database::{ events::rabbit::*, util::bulk_permissions::BulkDatabasePermissionQuery, Database, Member, @@ -61,7 +62,11 @@ impl MassMessageConsumer { } } - async fn fire_notification_for_users(&mut self, push: &PushNotification, users: &[String]) { + async fn fire_notification_for_users( + &mut self, + push: &PushNotification, + users: &[String], + ) -> Result<()> { if let Ok(sessions) = self .authifier_db .find_sessions_with_subscription(users) @@ -105,29 +110,26 @@ impl MassMessageConsumer { .insert("endpoint".to_string(), sub.endpoint.clone()); } - let payload = serde_json::to_string(&sendable).unwrap(); + let payload = serde_json::to_string(&sendable)?; publish_message(self, payload.into(), args).await; } } } - } -} -#[allow(unused_variables)] -#[async_trait] -impl AsyncConsumer for MassMessageConsumer { - /// This consumer handles adding mentions for all the users affected by a mass mention ping, and then sends out push notifications - async fn consume( + Ok(()) + } + + async fn consume_event( &mut self, - channel: &Channel, - deliver: Deliver, - basic_properties: BasicProperties, + _channel: &Channel, + _deliver: Deliver, + _basic_properties: BasicProperties, content: Vec, - ) { + ) -> Result<()> { let config = revolt_config::config().await; - let content = String::from_utf8(content).unwrap(); - let payload: MassMessageSentPayload = serde_json::from_str(content.as_str()).unwrap(); + let content = String::from_utf8(content)?; + let payload: MassMessageSentPayload = serde_json::from_str(content.as_str())?; debug!("Received mass message event"); @@ -159,8 +161,7 @@ impl AsyncConsumer for MassMessageConsumer { let mut db_query = self .db .fetch_all_members_chunked(&payload.server_id) - .await - .expect("Failed to fetch members from database"); + .await?; let mut exhausted = false; let ack_chnl = vec![push.channel.id().to_string()]; @@ -203,7 +204,8 @@ impl AsyncConsumer for MassMessageConsumer { target_users, online_users ); - self.fire_notification_for_users(&push, &target_users).await; + self.fire_notification_for_users(&push, &target_users) + .await?; if exhausted { break; @@ -211,19 +213,11 @@ impl AsyncConsumer for MassMessageConsumer { } } else if let Some(roles) = &push.message.role_mentions { // role mentions - let _role_members = self + let mut role_members = self .db .fetch_all_members_with_roles_chunked(&payload.server_id, roles) - .await; + .await?; - debug!("role members: {:?}", _role_members); - - if _role_members.is_err() { - revolt_config::capture_error(&_role_members.err().unwrap()); - return; - } - - let mut role_members = _role_members.unwrap(); let mut chunk = vec![]; let mut exhausted = false; @@ -266,10 +260,33 @@ impl AsyncConsumer for MassMessageConsumer { debug!("targets: {:?}", targets); - self.fire_notification_for_users(&push, &targets).await; + self.fire_notification_for_users(&push, &targets).await?; } } } } + + Ok(()) + } +} + +#[allow(unused_variables)] +#[async_trait] +impl AsyncConsumer for MassMessageConsumer { + /// This consumer handles adding mentions for all the users affected by a mass mention ping, and then sends out push notifications + async fn consume( + &mut self, + channel: &Channel, + deliver: Deliver, + basic_properties: BasicProperties, + content: Vec, + ) { + if let Err(err) = self + .consume_event(channel, deliver, basic_properties, content) + .await + { + revolt_config::capture_anyhow(&err); + eprintln!("Failed to process mass message event: {err:?}"); + } } } diff --git a/crates/daemons/pushd/src/consumers/inbound/message.rs b/crates/daemons/pushd/src/consumers/inbound/message.rs index 99e61cbd..6b9d00fe 100644 --- a/crates/daemons/pushd/src/consumers/inbound/message.rs +++ b/crates/daemons/pushd/src/consumers/inbound/message.rs @@ -7,6 +7,7 @@ use amqprs::{ consumer::AsyncConsumer, BasicProperties, Deliver, }; +use anyhow::Result; use async_trait::async_trait; use log::debug; use revolt_database::{events::rabbit::*, Database}; @@ -54,21 +55,16 @@ impl MessageConsumer { channel: None, } } -} -#[allow(unused_variables)] -#[async_trait] -impl AsyncConsumer for MessageConsumer { - /// This consumer handles delegating messages into their respective platform queues. - async fn consume( + async fn consume_event( &mut self, - channel: &Channel, - deliver: Deliver, - basic_properties: BasicProperties, + _channel: &Channel, + _deliver: Deliver, + _basic_properties: BasicProperties, content: Vec, - ) { - let content = String::from_utf8(content).unwrap(); - let payload: MessageSentPayload = serde_json::from_str(content.as_str()).unwrap(); + ) -> Result<()> { + let content = String::from_utf8(content)?; + let payload: MessageSentPayload = serde_json::from_str(content.as_str())?; debug!("Received message event on origin"); @@ -117,11 +113,34 @@ impl AsyncConsumer for MessageConsumer { .insert("endpoint".to_string(), sub.endpoint.clone()); } - let payload = serde_json::to_string(&sendable).unwrap(); + let payload = serde_json::to_string(&sendable)?; publish_message(self, payload.into(), args).await; } } } + + Ok(()) + } +} + +#[allow(unused_variables)] +#[async_trait] +impl AsyncConsumer for MessageConsumer { + /// This consumer handles delegating messages into their respective platform queues. + async fn consume( + &mut self, + channel: &Channel, + deliver: Deliver, + basic_properties: BasicProperties, + content: Vec, + ) { + if let Err(err) = self + .consume_event(channel, deliver, basic_properties, content) + .await + { + revolt_config::capture_anyhow(&err); + eprintln!("Failed to process message event: {err:?}"); + } } } diff --git a/crates/daemons/pushd/src/consumers/outbound/apn.rs b/crates/daemons/pushd/src/consumers/outbound/apn.rs index f9a09cc8..7cc43c76 100644 --- a/crates/daemons/pushd/src/consumers/outbound/apn.rs +++ b/crates/daemons/pushd/src/consumers/outbound/apn.rs @@ -1,6 +1,7 @@ use std::{borrow::Cow, collections::BTreeMap, io::Cursor}; use amqprs::{channel::Channel as AmqpChannel, consumer::AsyncConsumer, BasicProperties, Deliver}; +use anyhow::{anyhow, Result}; use async_trait::async_trait; use base64::{ engine::{self}, @@ -122,20 +123,16 @@ impl ApnsOutboundConsumer { Ok(ApnsOutboundConsumer { db, client }) } -} -#[allow(unused_variables)] -#[async_trait] -impl AsyncConsumer for ApnsOutboundConsumer { - async fn consume( + async fn consume_event( &mut self, - channel: &AmqpChannel, - deliver: Deliver, - basic_properties: BasicProperties, + _channel: &AmqpChannel, + _deliver: Deliver, + _basic_properties: BasicProperties, content: Vec, - ) { - let content = String::from_utf8(content).unwrap(); - let payload: PayloadToService = serde_json::from_str(content.as_str()).unwrap(); + ) -> Result<()> { + let content = String::from_utf8(content)?; + let payload: PayloadToService = serde_json::from_str(content.as_str())?; let payload_options = NotificationOptions { apns_id: None, @@ -159,7 +156,7 @@ impl AsyncConsumer for ApnsOutboundConsumer { alert.from_user.username, alert.from_user.discriminator ))) .clone() - .unwrap(), + .ok_or_else(|| anyhow!("missing name"))?, )]; let apn_payload = Payload { @@ -205,7 +202,7 @@ impl AsyncConsumer for ApnsOutboundConsumer { alert.accepted_user.username, alert.accepted_user.discriminator ))) .clone() - .unwrap(), + .ok_or_else(|| anyhow!("missing name"))?, )]; let apn_payload = Payload { @@ -355,5 +352,27 @@ impl AsyncConsumer for ApnsOutboundConsumer { } } } + + Ok(()) + } +} + +#[allow(unused_variables)] +#[async_trait] +impl AsyncConsumer for ApnsOutboundConsumer { + async fn consume( + &mut self, + channel: &AmqpChannel, + deliver: Deliver, + basic_properties: BasicProperties, + content: Vec, + ) { + if let Err(err) = self + .consume_event(channel, deliver, basic_properties, content) + .await + { + revolt_config::capture_anyhow(&err); + eprintln!("Failed to process APN event: {err:?}"); + } } } diff --git a/crates/daemons/pushd/src/consumers/outbound/fcm.rs b/crates/daemons/pushd/src/consumers/outbound/fcm.rs index 61700bbb..1e3f1d2a 100644 --- a/crates/daemons/pushd/src/consumers/outbound/fcm.rs +++ b/crates/daemons/pushd/src/consumers/outbound/fcm.rs @@ -2,6 +2,7 @@ use std::{collections::HashMap, time::Duration}; use amqprs::{channel::Channel as AmqpChannel, consumer::AsyncConsumer, BasicProperties, Deliver}; +use anyhow::{anyhow, bail, Result}; use async_trait::async_trait; use fcm_v1::{ android::AndroidConfig, @@ -64,22 +65,16 @@ impl FcmOutboundConsumer { ), }) } -} -#[allow(unused_variables)] -#[async_trait] -impl AsyncConsumer for FcmOutboundConsumer { - async fn consume( + async fn consume_event( &mut self, - channel: &AmqpChannel, - deliver: Deliver, - basic_properties: BasicProperties, + _channel: &AmqpChannel, + _deliver: Deliver, + _basic_properties: BasicProperties, content: Vec, - ) { - let content = String::from_utf8(content).unwrap(); - let payload: PayloadToService = serde_json::from_str(content.as_str()).unwrap(); - - let config = revolt_config::config().await; + ) -> Result<()> { + let content = String::from_utf8(content)?; + let payload: PayloadToService = serde_json::from_str(content.as_str())?; #[allow(clippy::needless_late_init)] let resp: Result; @@ -94,7 +89,7 @@ impl AsyncConsumer for FcmOutboundConsumer { alert.from_user.username, alert.from_user.discriminator ))) .clone() - .unwrap(); + .ok_or_else(|| anyhow!("missing name"))?; let mut data = HashMap::new(); data.insert( @@ -122,7 +117,7 @@ impl AsyncConsumer for FcmOutboundConsumer { alert.accepted_user.username, alert.accepted_user.discriminator ))) .clone() - .unwrap(); + .ok_or_else(|| anyhow!("missing name"))?; let mut data: HashMap = HashMap::new(); data.insert( @@ -175,7 +170,7 @@ impl AsyncConsumer for FcmOutboundConsumer { } PayloadKind::BadgeUpdate(_) => { - panic!("FCM cannot handle badge updates, and they should not be sent here.") + bail!("FCM cannot handle badge updates and they should not be sent here."); } } @@ -195,5 +190,27 @@ impl AsyncConsumer for FcmOutboundConsumer { } } } + + Ok(()) + } +} + +#[allow(unused_variables)] +#[async_trait] +impl AsyncConsumer for FcmOutboundConsumer { + async fn consume( + &mut self, + channel: &AmqpChannel, + deliver: Deliver, + basic_properties: BasicProperties, + content: Vec, + ) { + if let Err(err) = self + .consume_event(channel, deliver, basic_properties, content) + .await + { + revolt_config::capture_anyhow(&err); + eprintln!("Failed to process FCM event: {err:?}"); + } } } diff --git a/crates/daemons/pushd/src/consumers/outbound/vapid.rs b/crates/daemons/pushd/src/consumers/outbound/vapid.rs index fb735d5e..92ce6e56 100644 --- a/crates/daemons/pushd/src/consumers/outbound/vapid.rs +++ b/crates/daemons/pushd/src/consumers/outbound/vapid.rs @@ -2,13 +2,13 @@ use std::collections::HashMap; use amqprs::{channel::Channel as AmqpChannel, consumer::AsyncConsumer, BasicProperties, Deliver}; +use anyhow::{anyhow, bail, Result}; use async_trait::async_trait; use base64::{ engine::{self}, Engine as _, }; use revolt_database::{events::rabbit::*, Database}; -// use revolt_models::v0::{Channel, PushNotification}; use web_push::{ ContentEncoding, IsahcWebPushClient, SubscriptionInfo, SubscriptionKeys, VapidSignatureBuilder, WebPushClient, WebPushError, WebPushMessageBuilder, @@ -21,11 +21,11 @@ pub struct VapidOutboundConsumer { } impl VapidOutboundConsumer { - pub async fn new(db: Database) -> Result { + pub async fn new(db: Database) -> Result { let config = revolt_config::config().await; if config.pushd.vapid.private_key.is_empty() | config.pushd.vapid.public_key.is_empty() { - return Err("No Vapid keys present"); + bail!("no Vapid keys present"); } let web_push_private_key = engine::general_purpose::URL_SAFE_NO_PAD @@ -38,28 +38,30 @@ impl VapidOutboundConsumer { pkey: web_push_private_key, }) } -} -#[allow(unused_variables)] -#[async_trait] -impl AsyncConsumer for VapidOutboundConsumer { - async fn consume( + async fn consume_event( &mut self, - channel: &AmqpChannel, - deliver: Deliver, - basic_properties: BasicProperties, + _channel: &AmqpChannel, + _deliver: Deliver, + _basic_properties: BasicProperties, content: Vec, - ) { - let content = String::from_utf8(content).unwrap(); - let payload: PayloadToService = serde_json::from_str(content.as_str()).unwrap(); - - let config = revolt_config::config().await; + ) -> Result<()> { + let content = String::from_utf8(content)?; + let payload: PayloadToService = serde_json::from_str(content.as_str())?; let subscription = SubscriptionInfo { - endpoint: payload.extras.get("endpoint").unwrap().clone(), + endpoint: payload + .extras + .get("endpoint") + .ok_or_else(|| anyhow!("missing endpoint"))? + .clone(), keys: SubscriptionKeys { auth: payload.token, - p256dh: payload.extras.get("p256dh").unwrap().clone(), + p256dh: payload + .extras + .get("p256dh") + .ok_or_else(|| anyhow!("missing p256dh"))? + .clone(), }, }; @@ -76,12 +78,12 @@ impl AsyncConsumer for VapidOutboundConsumer { alert.from_user.username, alert.from_user.discriminator ))) .clone() - .unwrap(); + .ok_or_else(|| anyhow!("missing name"))?; let mut body = HashMap::new(); body.insert("body", format!("{} sent you a friend request", name)); - payload_body = serde_json::to_string(&body).unwrap(); + payload_body = serde_json::to_string(&body)?; } PayloadKind::FRAccepted(alert) => { let name = alert @@ -92,21 +94,21 @@ impl AsyncConsumer for VapidOutboundConsumer { alert.accepted_user.username, alert.accepted_user.discriminator ))) .clone() - .unwrap(); + .ok_or_else(|| anyhow!("missing name"))?; let mut body = HashMap::new(); body.insert("body", format!("{} accepted your friend request", name)); - payload_body = serde_json::to_string(&body).unwrap(); + payload_body = serde_json::to_string(&body)?; } PayloadKind::Generic(alert) => { - payload_body = serde_json::to_string(&alert).unwrap(); + payload_body = serde_json::to_string(&alert)?; } PayloadKind::MessageNotification(alert) => { - payload_body = serde_json::to_string(&alert).unwrap(); + payload_body = serde_json::to_string(&alert)?; } PayloadKind::BadgeUpdate(_) => { - panic!("Vapid cannot handle badge updates, and they should not be sent here.") + bail!("Vapid cannot handle badge updates and they should not be sent here."); } } @@ -122,28 +124,40 @@ impl AsyncConsumer for VapidOutboundConsumer { Ok(msg) => { if let Err(err) = self.client.send(msg).await { if err == WebPushError::Unauthorized { - if let Err(err) = self - .db + self.db .remove_push_subscription_by_session_id(&payload.session_id) - .await - { - revolt_config::capture_error(&err); - } + .await?; } } + + Ok(()) } - Err(err) => { - revolt_config::capture_error(&err); - } + Err(err) => Err(err.into()), } } - Err(err) => { - revolt_config::capture_error(&err); - } + Err(err) => Err(err.into()), }, - Err(err) => { - revolt_config::capture_error(&err); - } + Err(err) => Err(err.into()), + } + } +} + +#[allow(unused_variables)] +#[async_trait] +impl AsyncConsumer for VapidOutboundConsumer { + async fn consume( + &mut self, + channel: &AmqpChannel, + deliver: Deliver, + basic_properties: BasicProperties, + content: Vec, + ) { + if let Err(err) = self + .consume_event(channel, deliver, basic_properties, content) + .await + { + revolt_config::capture_anyhow(&err); + eprintln!("Failed to process Vapid event: {err:?}"); } } } diff --git a/crates/daemons/pushd/src/main.rs b/crates/daemons/pushd/src/main.rs index 1560b049..57be4d89 100644 --- a/crates/daemons/pushd/src/main.rs +++ b/crates/daemons/pushd/src/main.rs @@ -27,6 +27,9 @@ async fn main() { let config = config().await; pretty_env_logger::init(); + // Configure logging and environment + revolt_config::configure!(pushd); + // Setup database let db = revolt_database::DatabaseInfo::Auto.connect().await.unwrap(); let authifier: authifier::Database;