refactor(pushd): add non-panic error handling to all queue consumers

This commit is contained in:
izzy
2025-05-13 10:41:03 +01:00
parent 01e0f9e558
commit 2aff76c369
14 changed files with 460 additions and 184 deletions
+2 -2
View File
@@ -1,6 +1,6 @@
{
"editor.formatOnSave": true,
"rust-analyzer.checkOnSave.command": "clippy",
"rust-analyzer.check.command": "clippy",
"nixEnvSelector.suggestion": false,
"nixEnvSelector.nixFile": "${workspaceFolder}/default.nix"
}
}
Generated
+151 -30
View File
@@ -58,7 +58,7 @@ version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9"
dependencies = [
"getrandom",
"getrandom 0.2.15",
"once_cell",
"version_check",
]
@@ -70,7 +70,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011"
dependencies = [
"cfg-if",
"getrandom",
"getrandom 0.2.15",
"once_cell",
"version_check",
"zerocopy",
@@ -139,9 +139,12 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.95"
version = "1.0.98"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34ac096ce696dc2fcabef30516bb13c0a68a11d30131d3df6f04711467681b04"
checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"
dependencies = [
"backtrace",
]
[[package]]
name = "arbitrary"
@@ -2056,7 +2059,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9b3460f44bea8cd47f45a0c70892f1eff856d97cd55358b2f73f663789f6190"
dependencies = [
"ct-codecs",
"getrandom",
"getrandom 0.2.15",
]
[[package]]
@@ -2675,10 +2678,22 @@ dependencies = [
"cfg-if",
"js-sys",
"libc",
"wasi",
"wasi 0.11.0+wasi-snapshot-preview1",
"wasm-bindgen",
]
[[package]]
name = "getrandom"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4"
dependencies = [
"cfg-if",
"libc",
"r-efi",
"wasi 0.14.2+wasi-0.2.4",
]
[[package]]
name = "getset"
version = "0.1.3"
@@ -4313,7 +4328,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd"
dependencies = [
"libc",
"wasi",
"wasi 0.11.0+wasi-snapshot-preview1",
"windows-sys 0.52.0",
]
@@ -4933,7 +4948,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b7cafe60d6cf8e62e1b9b2ea516a089c008945bb5a275416789e7db0bc199dc"
dependencies = [
"memchr",
"thiserror 2.0.9",
"thiserror 2.0.12",
"ucd-trie",
]
@@ -5386,7 +5401,7 @@ dependencies = [
"libc",
"once_cell",
"raw-cpuid",
"wasi",
"wasi 0.11.0+wasi-snapshot-preview1",
"web-sys",
"winapi",
]
@@ -5430,6 +5445,12 @@ version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a3866219251662ec3b26fc217e3e05bf9c4f84325234dfb96bf0bf840889e49"
[[package]]
name = "r-efi"
version = "5.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5"
[[package]]
name = "radium"
version = "0.7.0"
@@ -5466,6 +5487,16 @@ dependencies = [
"rand_core 0.6.4",
]
[[package]]
name = "rand"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97"
dependencies = [
"rand_chacha 0.9.0",
"rand_core 0.9.3",
]
[[package]]
name = "rand_chacha"
version = "0.1.1"
@@ -5486,6 +5517,16 @@ dependencies = [
"rand_core 0.6.4",
]
[[package]]
name = "rand_chacha"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
dependencies = [
"ppv-lite86",
"rand_core 0.9.3",
]
[[package]]
name = "rand_core"
version = "0.3.1"
@@ -5507,7 +5548,16 @@ version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
"getrandom",
"getrandom 0.2.15",
]
[[package]]
name = "rand_core"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38"
dependencies = [
"getrandom 0.3.3",
]
[[package]]
@@ -5982,6 +6032,7 @@ dependencies = [
"pretty_env_logger",
"revolt-result",
"sentry",
"sentry-anyhow",
"serde",
]
@@ -6202,6 +6253,7 @@ name = "revolt-pushd"
version = "0.8.5"
dependencies = [
"amqprs",
"anyhow",
"async-trait",
"authifier",
"base64 0.22.1",
@@ -6214,6 +6266,7 @@ dependencies = [
"revolt-database",
"revolt-models",
"revolt-presence",
"revolt-result",
"revolt_a2",
"revolt_optional_struct",
"serde",
@@ -6356,7 +6409,7 @@ checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d"
dependencies = [
"cc",
"cfg-if",
"getrandom",
"getrandom 0.2.15",
"libc",
"spin",
"untrusted",
@@ -6918,9 +6971,9 @@ dependencies = [
"httpdate",
"native-tls",
"reqwest 0.11.27",
"sentry-backtrace",
"sentry-backtrace 0.31.8",
"sentry-contexts",
"sentry-core",
"sentry-core 0.31.8",
"sentry-debug-images",
"sentry-panic",
"sentry-tracing",
@@ -6928,6 +6981,17 @@ dependencies = [
"ureq",
]
[[package]]
name = "sentry-anyhow"
version = "0.38.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c93cfb800654dc846ccbeb35254a2f6f194aaf7e187223fd90aeb7afafe722c"
dependencies = [
"anyhow",
"sentry-backtrace 0.38.1",
"sentry-core 0.38.1",
]
[[package]]
name = "sentry-backtrace"
version = "0.31.8"
@@ -6937,7 +7001,18 @@ dependencies = [
"backtrace",
"once_cell",
"regex",
"sentry-core",
"sentry-core 0.31.8",
]
[[package]]
name = "sentry-backtrace"
version = "0.38.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8dace796060e4ad10e3d1405b122ae184a8b2e71dce05ae450e4f81b7686b0d9"
dependencies = [
"backtrace",
"regex",
"sentry-core 0.38.1",
]
[[package]]
@@ -6950,7 +7025,7 @@ dependencies = [
"libc",
"os_info",
"rustc_version",
"sentry-core",
"sentry-core 0.31.8",
"uname",
]
@@ -6962,7 +7037,18 @@ checksum = "901f761681f97db3db836ef9e094acdd8756c40215326c194201941947164ef1"
dependencies = [
"once_cell",
"rand 0.8.5",
"sentry-types",
"sentry-types 0.31.8",
"serde",
"serde_json",
]
[[package]]
name = "sentry-core"
version = "0.38.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7426d4beec270cfdbb50f85f0bb2ce176ea57eed0b11741182a163055a558187"
dependencies = [
"sentry-types 0.38.1",
"serde",
"serde_json",
]
@@ -6975,7 +7061,7 @@ checksum = "afdb263e73d22f39946f6022ed455b7561b22ff5553aca9be3c6a047fa39c328"
dependencies = [
"findshlibs",
"once_cell",
"sentry-core",
"sentry-core 0.31.8",
]
[[package]]
@@ -6984,8 +7070,8 @@ version = "0.31.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74fbf1c163f8b6a9d05912e1b272afa27c652e8b47ea60cb9a57ad5e481eea99"
dependencies = [
"sentry-backtrace",
"sentry-core",
"sentry-backtrace 0.31.8",
"sentry-core 0.31.8",
]
[[package]]
@@ -6994,8 +7080,8 @@ version = "0.31.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82eabcab0a047040befd44599a1da73d3adb228ff53b5ed9795ae04535577704"
dependencies = [
"sentry-backtrace",
"sentry-core",
"sentry-backtrace 0.31.8",
"sentry-core 0.31.8",
"tracing-core",
"tracing-subscriber",
]
@@ -7017,6 +7103,23 @@ dependencies = [
"uuid",
]
[[package]]
name = "sentry-types"
version = "0.38.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04b6c9287202294685cb1f749b944dbbce8160b81a1061ecddc073025fed129f"
dependencies = [
"debugid",
"hex",
"rand 0.9.1",
"serde",
"serde_json",
"thiserror 2.0.12",
"time",
"url",
"uuid",
]
[[package]]
name = "serde"
version = "1.0.217"
@@ -7647,7 +7750,7 @@ checksum = "9a8a559c81686f576e8cd0290cd2a24a2a9ad80c98b3478856500fcbd7acd704"
dependencies = [
"cfg-if",
"fastrand 2.3.0",
"getrandom",
"getrandom 0.2.15",
"once_cell",
"rustix",
"windows-sys 0.59.0",
@@ -7684,11 +7787,11 @@ dependencies = [
[[package]]
name = "thiserror"
version = "2.0.9"
version = "2.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f072643fd0190df67a8bab670c20ef5d8737177d6ac6b2e9a236cb096206b2cc"
checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708"
dependencies = [
"thiserror-impl 2.0.9",
"thiserror-impl 2.0.12",
]
[[package]]
@@ -7704,9 +7807,9 @@ dependencies = [
[[package]]
name = "thiserror-impl"
version = "2.0.9"
version = "2.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b50fa271071aae2e6ee85f842e2e28ba8cd2c5fb67f11fcb1fd70b276f9e7d4"
checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d"
dependencies = [
"proc-macro2",
"quote 1.0.38",
@@ -8199,7 +8302,7 @@ version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04f903f293d11f31c0c29e4148f6dc0d033a7f80cebc0282bea147611667d289"
dependencies = [
"getrandom",
"getrandom 0.2.15",
"rand 0.8.5",
"web-time",
]
@@ -8459,7 +8562,7 @@ version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a"
dependencies = [
"getrandom",
"getrandom 0.2.15",
"serde",
]
@@ -8606,13 +8709,22 @@ version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasi"
version = "0.14.2+wasi-0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3"
dependencies = [
"wit-bindgen-rt",
]
[[package]]
name = "wasix"
version = "0.12.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1fbb4ef9bbca0c1170e0b00dd28abc9e3b68669821600cad1caaed606583c6d"
dependencies = [
"wasi",
"wasi 0.11.0+wasi-snapshot-preview1",
]
[[package]]
@@ -9061,6 +9173,15 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "wit-bindgen-rt"
version = "0.39.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1"
dependencies = [
"bitflags 2.6.0",
]
[[package]]
name = "write16"
version = "1.0.0"
+3 -1
View File
@@ -9,9 +9,10 @@ description = "Revolt Backend: Configuration"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
anyhow = ["dep:sentry-anyhow"]
report-macros = ["revolt-result"]
test = ["async-std"]
default = ["test"]
default = ["test", "anyhow"]
[dependencies]
# Utility
@@ -32,6 +33,7 @@ pretty_env_logger = "0.4.0"
# Sentry
sentry = "0.31.5"
sentry-anyhow = { version = "0.38.1", optional = true }
# Core
revolt-result = { version = "0.8.5", path = "../result", optional = true }
+2
View File
@@ -7,6 +7,7 @@ use once_cell::sync::Lazy;
use serde::Deserialize;
pub use sentry::{capture_error, capture_message, Level};
pub use sentry_anyhow::capture_anyhow;
#[cfg(feature = "report-macros")]
#[macro_export]
@@ -343,6 +344,7 @@ pub struct Sentry {
pub events: String,
pub files: String,
pub proxy: String,
pub pushd: String,
pub crond: String,
}
+6 -1
View File
@@ -5,7 +5,10 @@ edition = "2021"
license = "AGPL-3.0-or-later"
[dependencies]
revolt-config = { version = "0.8.5", path = "../../core/config" }
revolt-result = { version = "0.8.5", path = "../../core/result" }
revolt-config = { version = "0.8.5", path = "../../core/config", features = [
"report-macros",
] }
revolt-database = { version = "0.8.5", path = "../../core/database" }
revolt-models = { version = "0.8.5", path = "../../core/models", features = [
"validator",
@@ -14,6 +17,8 @@ revolt-presence = { version = "0.8.5", path = "../../core/presence", features =
"redis-is-patched",
] }
anyhow = { version = "1.0.98" }
amqprs = { version = "1.7.0" }
fcm_v1 = "0.3.0"
web-push = "0.10.0"
@@ -7,6 +7,7 @@ use amqprs::{
consumer::AsyncConsumer,
BasicProperties, Deliver,
};
use anyhow::Result;
use async_trait::async_trait;
use log::debug;
use revolt_database::{events::rabbit::*, Database};
@@ -54,21 +55,16 @@ impl FRAcceptedConsumer {
channel: None,
}
}
}
#[allow(unused_variables)]
#[async_trait]
impl AsyncConsumer for FRAcceptedConsumer {
/// This consumer handles delegating messages into their respective platform queues.
async fn consume(
async fn consume_event(
&mut self,
channel: &Channel,
deliver: Deliver,
basic_properties: BasicProperties,
_channel: &Channel,
_deliver: Deliver,
_basic_properties: BasicProperties,
content: Vec<u8>,
) {
let content = String::from_utf8(content).unwrap();
let payload: FRAcceptedPayload = serde_json::from_str(content.as_str()).unwrap();
) -> Result<()> {
let content = String::from_utf8(content)?;
let payload: FRAcceptedPayload = serde_json::from_str(content.as_str())?;
debug!("Received FR accept event");
@@ -111,11 +107,34 @@ impl AsyncConsumer for FRAcceptedConsumer {
.insert("endpoint".to_string(), sub.endpoint.clone());
}
let payload = serde_json::to_string(&sendable).unwrap();
let payload = serde_json::to_string(&sendable)?;
publish_message(self, payload.into(), args).await;
}
}
}
Ok(())
}
}
#[allow(unused_variables)]
#[async_trait]
impl AsyncConsumer for FRAcceptedConsumer {
/// This consumer handles delegating messages into their respective platform queues.
async fn consume(
&mut self,
channel: &Channel,
deliver: Deliver,
basic_properties: BasicProperties,
content: Vec<u8>,
) {
if let Err(err) = self
.consume_event(channel, deliver, basic_properties, content)
.await
{
revolt_config::capture_anyhow(&err);
eprintln!("Failed to process friend request accepted event: {err:?}");
}
}
}
@@ -7,6 +7,7 @@ use amqprs::{
consumer::AsyncConsumer,
BasicProperties, Deliver,
};
use anyhow::Result;
use async_trait::async_trait;
use log::debug;
use revolt_database::{events::rabbit::*, Database};
@@ -54,21 +55,16 @@ impl FRReceivedConsumer {
channel: None,
}
}
}
#[allow(unused_variables)]
#[async_trait]
impl AsyncConsumer for FRReceivedConsumer {
/// This consumer handles delegating messages into their respective platform queues.
async fn consume(
async fn consume_event(
&mut self,
channel: &Channel,
deliver: Deliver,
basic_properties: BasicProperties,
_channel: &Channel,
_deliver: Deliver,
_basic_properties: BasicProperties,
content: Vec<u8>,
) {
let content = String::from_utf8(content).unwrap();
let payload: FRReceivedPayload = serde_json::from_str(content.as_str()).unwrap();
) -> Result<()> {
let content = String::from_utf8(content)?;
let payload: FRReceivedPayload = serde_json::from_str(content.as_str())?;
debug!("Received FR received event");
@@ -111,11 +107,34 @@ impl AsyncConsumer for FRReceivedConsumer {
.insert("endpoint".to_string(), sub.endpoint.clone());
}
let payload = serde_json::to_string(&sendable).unwrap();
let payload = serde_json::to_string(&sendable)?;
publish_message(self, payload.into(), args).await;
}
}
}
Ok(())
}
}
#[allow(unused_variables)]
#[async_trait]
impl AsyncConsumer for FRReceivedConsumer {
/// This consumer handles delegating messages into their respective platform queues.
async fn consume(
&mut self,
channel: &Channel,
deliver: Deliver,
basic_properties: BasicProperties,
content: Vec<u8>,
) {
if let Err(err) = self
.consume_event(channel, deliver, basic_properties, content)
.await
{
revolt_config::capture_anyhow(&err);
eprintln!("Failed to process friend request received event: {err:?}");
}
}
}
@@ -7,6 +7,7 @@ use amqprs::{
consumer::AsyncConsumer,
BasicProperties, Deliver,
};
use anyhow::Result;
use async_trait::async_trait;
use log::debug;
use revolt_database::{events::rabbit::*, Database};
@@ -54,21 +55,16 @@ impl GenericConsumer {
channel: None,
}
}
}
#[allow(unused_variables)]
#[async_trait]
impl AsyncConsumer for GenericConsumer {
/// This consumer handles delegating messages into their respective platform queues.
async fn consume(
async fn consume_event(
&mut self,
channel: &Channel,
deliver: Deliver,
basic_properties: BasicProperties,
_channel: &Channel,
_deliver: Deliver,
_basic_properties: BasicProperties,
content: Vec<u8>,
) {
let content = String::from_utf8(content).unwrap();
let payload: MessageSentPayload = serde_json::from_str(content.as_str()).unwrap();
) -> Result<()> {
let content = String::from_utf8(content)?;
let payload: MessageSentPayload = serde_json::from_str(content.as_str())?;
debug!("Received message event on origin");
@@ -117,11 +113,34 @@ impl AsyncConsumer for GenericConsumer {
.insert("endpoint".to_string(), sub.endpoint.clone());
}
let payload = serde_json::to_string(&sendable).unwrap();
let payload = serde_json::to_string(&sendable)?;
publish_message(self, payload.into(), args).await;
}
}
}
Ok(())
}
}
#[allow(unused_variables)]
#[async_trait]
impl AsyncConsumer for GenericConsumer {
/// This consumer handles delegating messages into their respective platform queues.
async fn consume(
&mut self,
channel: &Channel,
deliver: Deliver,
basic_properties: BasicProperties,
content: Vec<u8>,
) {
if let Err(err) = self
.consume_event(channel, deliver, basic_properties, content)
.await
{
revolt_config::capture_anyhow(&err);
eprintln!("Failed to process generic event: {err:?}");
}
}
}
@@ -10,6 +10,7 @@ use amqprs::{
consumer::AsyncConsumer,
BasicProperties, Deliver,
};
use anyhow::Result;
use async_trait::async_trait;
use revolt_database::{
events::rabbit::*, util::bulk_permissions::BulkDatabasePermissionQuery, Database, Member,
@@ -61,7 +62,11 @@ impl MassMessageConsumer {
}
}
async fn fire_notification_for_users(&mut self, push: &PushNotification, users: &[String]) {
async fn fire_notification_for_users(
&mut self,
push: &PushNotification,
users: &[String],
) -> Result<()> {
if let Ok(sessions) = self
.authifier_db
.find_sessions_with_subscription(users)
@@ -105,29 +110,26 @@ impl MassMessageConsumer {
.insert("endpoint".to_string(), sub.endpoint.clone());
}
let payload = serde_json::to_string(&sendable).unwrap();
let payload = serde_json::to_string(&sendable)?;
publish_message(self, payload.into(), args).await;
}
}
}
}
}
#[allow(unused_variables)]
#[async_trait]
impl AsyncConsumer for MassMessageConsumer {
/// This consumer handles adding mentions for all the users affected by a mass mention ping, and then sends out push notifications
async fn consume(
Ok(())
}
async fn consume_event(
&mut self,
channel: &Channel,
deliver: Deliver,
basic_properties: BasicProperties,
_channel: &Channel,
_deliver: Deliver,
_basic_properties: BasicProperties,
content: Vec<u8>,
) {
) -> Result<()> {
let config = revolt_config::config().await;
let content = String::from_utf8(content).unwrap();
let payload: MassMessageSentPayload = serde_json::from_str(content.as_str()).unwrap();
let content = String::from_utf8(content)?;
let payload: MassMessageSentPayload = serde_json::from_str(content.as_str())?;
debug!("Received mass message event");
@@ -159,8 +161,7 @@ impl AsyncConsumer for MassMessageConsumer {
let mut db_query = self
.db
.fetch_all_members_chunked(&payload.server_id)
.await
.expect("Failed to fetch members from database");
.await?;
let mut exhausted = false;
let ack_chnl = vec![push.channel.id().to_string()];
@@ -203,7 +204,8 @@ impl AsyncConsumer for MassMessageConsumer {
target_users, online_users
);
self.fire_notification_for_users(&push, &target_users).await;
self.fire_notification_for_users(&push, &target_users)
.await?;
if exhausted {
break;
@@ -211,19 +213,11 @@ impl AsyncConsumer for MassMessageConsumer {
}
} else if let Some(roles) = &push.message.role_mentions {
// role mentions
let _role_members = self
let mut role_members = self
.db
.fetch_all_members_with_roles_chunked(&payload.server_id, roles)
.await;
.await?;
debug!("role members: {:?}", _role_members);
if _role_members.is_err() {
revolt_config::capture_error(&_role_members.err().unwrap());
return;
}
let mut role_members = _role_members.unwrap();
let mut chunk = vec![];
let mut exhausted = false;
@@ -266,10 +260,33 @@ impl AsyncConsumer for MassMessageConsumer {
debug!("targets: {:?}", targets);
self.fire_notification_for_users(&push, &targets).await;
self.fire_notification_for_users(&push, &targets).await?;
}
}
}
}
Ok(())
}
}
#[allow(unused_variables)]
#[async_trait]
impl AsyncConsumer for MassMessageConsumer {
/// This consumer handles adding mentions for all the users affected by a mass mention ping, and then sends out push notifications
async fn consume(
&mut self,
channel: &Channel,
deliver: Deliver,
basic_properties: BasicProperties,
content: Vec<u8>,
) {
if let Err(err) = self
.consume_event(channel, deliver, basic_properties, content)
.await
{
revolt_config::capture_anyhow(&err);
eprintln!("Failed to process mass message event: {err:?}");
}
}
}
@@ -7,6 +7,7 @@ use amqprs::{
consumer::AsyncConsumer,
BasicProperties, Deliver,
};
use anyhow::Result;
use async_trait::async_trait;
use log::debug;
use revolt_database::{events::rabbit::*, Database};
@@ -54,21 +55,16 @@ impl MessageConsumer {
channel: None,
}
}
}
#[allow(unused_variables)]
#[async_trait]
impl AsyncConsumer for MessageConsumer {
/// This consumer handles delegating messages into their respective platform queues.
async fn consume(
async fn consume_event(
&mut self,
channel: &Channel,
deliver: Deliver,
basic_properties: BasicProperties,
_channel: &Channel,
_deliver: Deliver,
_basic_properties: BasicProperties,
content: Vec<u8>,
) {
let content = String::from_utf8(content).unwrap();
let payload: MessageSentPayload = serde_json::from_str(content.as_str()).unwrap();
) -> Result<()> {
let content = String::from_utf8(content)?;
let payload: MessageSentPayload = serde_json::from_str(content.as_str())?;
debug!("Received message event on origin");
@@ -117,11 +113,34 @@ impl AsyncConsumer for MessageConsumer {
.insert("endpoint".to_string(), sub.endpoint.clone());
}
let payload = serde_json::to_string(&sendable).unwrap();
let payload = serde_json::to_string(&sendable)?;
publish_message(self, payload.into(), args).await;
}
}
}
Ok(())
}
}
#[allow(unused_variables)]
#[async_trait]
impl AsyncConsumer for MessageConsumer {
/// This consumer handles delegating messages into their respective platform queues.
async fn consume(
&mut self,
channel: &Channel,
deliver: Deliver,
basic_properties: BasicProperties,
content: Vec<u8>,
) {
if let Err(err) = self
.consume_event(channel, deliver, basic_properties, content)
.await
{
revolt_config::capture_anyhow(&err);
eprintln!("Failed to process message event: {err:?}");
}
}
}
@@ -1,6 +1,7 @@
use std::{borrow::Cow, collections::BTreeMap, io::Cursor};
use amqprs::{channel::Channel as AmqpChannel, consumer::AsyncConsumer, BasicProperties, Deliver};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use base64::{
engine::{self},
@@ -122,20 +123,16 @@ impl ApnsOutboundConsumer {
Ok(ApnsOutboundConsumer { db, client })
}
}
#[allow(unused_variables)]
#[async_trait]
impl AsyncConsumer for ApnsOutboundConsumer {
async fn consume(
async fn consume_event(
&mut self,
channel: &AmqpChannel,
deliver: Deliver,
basic_properties: BasicProperties,
_channel: &AmqpChannel,
_deliver: Deliver,
_basic_properties: BasicProperties,
content: Vec<u8>,
) {
let content = String::from_utf8(content).unwrap();
let payload: PayloadToService = serde_json::from_str(content.as_str()).unwrap();
) -> Result<()> {
let content = String::from_utf8(content)?;
let payload: PayloadToService = serde_json::from_str(content.as_str())?;
let payload_options = NotificationOptions {
apns_id: None,
@@ -159,7 +156,7 @@ impl AsyncConsumer for ApnsOutboundConsumer {
alert.from_user.username, alert.from_user.discriminator
)))
.clone()
.unwrap(),
.ok_or_else(|| anyhow!("missing name"))?,
)];
let apn_payload = Payload {
@@ -205,7 +202,7 @@ impl AsyncConsumer for ApnsOutboundConsumer {
alert.accepted_user.username, alert.accepted_user.discriminator
)))
.clone()
.unwrap(),
.ok_or_else(|| anyhow!("missing name"))?,
)];
let apn_payload = Payload {
@@ -355,5 +352,27 @@ impl AsyncConsumer for ApnsOutboundConsumer {
}
}
}
Ok(())
}
}
#[allow(unused_variables)]
#[async_trait]
impl AsyncConsumer for ApnsOutboundConsumer {
async fn consume(
&mut self,
channel: &AmqpChannel,
deliver: Deliver,
basic_properties: BasicProperties,
content: Vec<u8>,
) {
if let Err(err) = self
.consume_event(channel, deliver, basic_properties, content)
.await
{
revolt_config::capture_anyhow(&err);
eprintln!("Failed to process APN event: {err:?}");
}
}
}
@@ -2,6 +2,7 @@ use std::{collections::HashMap, time::Duration};
use amqprs::{channel::Channel as AmqpChannel, consumer::AsyncConsumer, BasicProperties, Deliver};
use anyhow::{anyhow, bail, Result};
use async_trait::async_trait;
use fcm_v1::{
android::AndroidConfig,
@@ -64,22 +65,16 @@ impl FcmOutboundConsumer {
),
})
}
}
#[allow(unused_variables)]
#[async_trait]
impl AsyncConsumer for FcmOutboundConsumer {
async fn consume(
async fn consume_event(
&mut self,
channel: &AmqpChannel,
deliver: Deliver,
basic_properties: BasicProperties,
_channel: &AmqpChannel,
_deliver: Deliver,
_basic_properties: BasicProperties,
content: Vec<u8>,
) {
let content = String::from_utf8(content).unwrap();
let payload: PayloadToService = serde_json::from_str(content.as_str()).unwrap();
let config = revolt_config::config().await;
) -> Result<()> {
let content = String::from_utf8(content)?;
let payload: PayloadToService = serde_json::from_str(content.as_str())?;
#[allow(clippy::needless_late_init)]
let resp: Result<Message, FcmError>;
@@ -94,7 +89,7 @@ impl AsyncConsumer for FcmOutboundConsumer {
alert.from_user.username, alert.from_user.discriminator
)))
.clone()
.unwrap();
.ok_or_else(|| anyhow!("missing name"))?;
let mut data = HashMap::new();
data.insert(
@@ -122,7 +117,7 @@ impl AsyncConsumer for FcmOutboundConsumer {
alert.accepted_user.username, alert.accepted_user.discriminator
)))
.clone()
.unwrap();
.ok_or_else(|| anyhow!("missing name"))?;
let mut data: HashMap<String, Value> = HashMap::new();
data.insert(
@@ -175,7 +170,7 @@ impl AsyncConsumer for FcmOutboundConsumer {
}
PayloadKind::BadgeUpdate(_) => {
panic!("FCM cannot handle badge updates, and they should not be sent here.")
bail!("FCM cannot handle badge updates and they should not be sent here.");
}
}
@@ -195,5 +190,27 @@ impl AsyncConsumer for FcmOutboundConsumer {
}
}
}
Ok(())
}
}
#[allow(unused_variables)]
#[async_trait]
impl AsyncConsumer for FcmOutboundConsumer {
async fn consume(
&mut self,
channel: &AmqpChannel,
deliver: Deliver,
basic_properties: BasicProperties,
content: Vec<u8>,
) {
if let Err(err) = self
.consume_event(channel, deliver, basic_properties, content)
.await
{
revolt_config::capture_anyhow(&err);
eprintln!("Failed to process FCM event: {err:?}");
}
}
}
@@ -2,13 +2,13 @@ use std::collections::HashMap;
use amqprs::{channel::Channel as AmqpChannel, consumer::AsyncConsumer, BasicProperties, Deliver};
use anyhow::{anyhow, bail, Result};
use async_trait::async_trait;
use base64::{
engine::{self},
Engine as _,
};
use revolt_database::{events::rabbit::*, Database};
// use revolt_models::v0::{Channel, PushNotification};
use web_push::{
ContentEncoding, IsahcWebPushClient, SubscriptionInfo, SubscriptionKeys, VapidSignatureBuilder,
WebPushClient, WebPushError, WebPushMessageBuilder,
@@ -21,11 +21,11 @@ pub struct VapidOutboundConsumer {
}
impl VapidOutboundConsumer {
pub async fn new(db: Database) -> Result<VapidOutboundConsumer, &'static str> {
pub async fn new(db: Database) -> Result<VapidOutboundConsumer> {
let config = revolt_config::config().await;
if config.pushd.vapid.private_key.is_empty() | config.pushd.vapid.public_key.is_empty() {
return Err("No Vapid keys present");
bail!("no Vapid keys present");
}
let web_push_private_key = engine::general_purpose::URL_SAFE_NO_PAD
@@ -38,28 +38,30 @@ impl VapidOutboundConsumer {
pkey: web_push_private_key,
})
}
}
#[allow(unused_variables)]
#[async_trait]
impl AsyncConsumer for VapidOutboundConsumer {
async fn consume(
async fn consume_event(
&mut self,
channel: &AmqpChannel,
deliver: Deliver,
basic_properties: BasicProperties,
_channel: &AmqpChannel,
_deliver: Deliver,
_basic_properties: BasicProperties,
content: Vec<u8>,
) {
let content = String::from_utf8(content).unwrap();
let payload: PayloadToService = serde_json::from_str(content.as_str()).unwrap();
let config = revolt_config::config().await;
) -> Result<()> {
let content = String::from_utf8(content)?;
let payload: PayloadToService = serde_json::from_str(content.as_str())?;
let subscription = SubscriptionInfo {
endpoint: payload.extras.get("endpoint").unwrap().clone(),
endpoint: payload
.extras
.get("endpoint")
.ok_or_else(|| anyhow!("missing endpoint"))?
.clone(),
keys: SubscriptionKeys {
auth: payload.token,
p256dh: payload.extras.get("p256dh").unwrap().clone(),
p256dh: payload
.extras
.get("p256dh")
.ok_or_else(|| anyhow!("missing p256dh"))?
.clone(),
},
};
@@ -76,12 +78,12 @@ impl AsyncConsumer for VapidOutboundConsumer {
alert.from_user.username, alert.from_user.discriminator
)))
.clone()
.unwrap();
.ok_or_else(|| anyhow!("missing name"))?;
let mut body = HashMap::new();
body.insert("body", format!("{} sent you a friend request", name));
payload_body = serde_json::to_string(&body).unwrap();
payload_body = serde_json::to_string(&body)?;
}
PayloadKind::FRAccepted(alert) => {
let name = alert
@@ -92,21 +94,21 @@ impl AsyncConsumer for VapidOutboundConsumer {
alert.accepted_user.username, alert.accepted_user.discriminator
)))
.clone()
.unwrap();
.ok_or_else(|| anyhow!("missing name"))?;
let mut body = HashMap::new();
body.insert("body", format!("{} accepted your friend request", name));
payload_body = serde_json::to_string(&body).unwrap();
payload_body = serde_json::to_string(&body)?;
}
PayloadKind::Generic(alert) => {
payload_body = serde_json::to_string(&alert).unwrap();
payload_body = serde_json::to_string(&alert)?;
}
PayloadKind::MessageNotification(alert) => {
payload_body = serde_json::to_string(&alert).unwrap();
payload_body = serde_json::to_string(&alert)?;
}
PayloadKind::BadgeUpdate(_) => {
panic!("Vapid cannot handle badge updates, and they should not be sent here.")
bail!("Vapid cannot handle badge updates and they should not be sent here.");
}
}
@@ -122,28 +124,40 @@ impl AsyncConsumer for VapidOutboundConsumer {
Ok(msg) => {
if let Err(err) = self.client.send(msg).await {
if err == WebPushError::Unauthorized {
if let Err(err) = self
.db
self.db
.remove_push_subscription_by_session_id(&payload.session_id)
.await
{
revolt_config::capture_error(&err);
}
.await?;
}
}
Ok(())
}
Err(err) => {
revolt_config::capture_error(&err);
}
Err(err) => Err(err.into()),
}
}
Err(err) => {
revolt_config::capture_error(&err);
}
Err(err) => Err(err.into()),
},
Err(err) => {
revolt_config::capture_error(&err);
}
Err(err) => Err(err.into()),
}
}
}
#[allow(unused_variables)]
#[async_trait]
impl AsyncConsumer for VapidOutboundConsumer {
async fn consume(
&mut self,
channel: &AmqpChannel,
deliver: Deliver,
basic_properties: BasicProperties,
content: Vec<u8>,
) {
if let Err(err) = self
.consume_event(channel, deliver, basic_properties, content)
.await
{
revolt_config::capture_anyhow(&err);
eprintln!("Failed to process Vapid event: {err:?}");
}
}
}
+3
View File
@@ -27,6 +27,9 @@ async fn main() {
let config = config().await;
pretty_env_logger::init();
// Configure logging and environment
revolt_config::configure!(pushd);
// Setup database
let db = revolt_database::DatabaseInfo::Auto.connect().await.unwrap();
let authifier: authifier::Database;