feat: handle reaction remove, fix deadlock
All checks were successful
Build / Build (push) Successful in 2m1s
Build / docker (push) Successful in 20s

This commit is contained in:
파링 2026-01-07 15:42:47 +09:00
parent f3eb1dc188
commit c0296f698a
Signed by: paring
SSH key fingerprint: SHA256:8uCHhCpn/gVOLEaTolmbub9kfM6XBxWkIWmHxUZoWWk
4 changed files with 61 additions and 35 deletions

View file

@ -1,4 +1,4 @@
use std::{borrow::Cow, collections::HashMap}; use std::collections::HashMap;
use anyhow::anyhow; use anyhow::anyhow;
use serenity::all::{ use serenity::all::{
@ -59,8 +59,6 @@ impl Handler {
return Ok(()); return Ok(());
} }
// if let CommandType interaction.data.kind {}
for option in interaction.data.options() { for option in interaction.data.options() {
if option.name == "script" { if option.name == "script" {
let script = get_guild(&self.db, guild_id) let script = get_guild(&self.db, guild_id)
@ -171,6 +169,7 @@ if reactions["⭐"] >= 3 {
.guild() .guild()
.and_then(|x| x.parent_id) .and_then(|x| x.parent_id)
.map(|x| x.to_string()), .map(|x| x.to_string()),
false,
) )
.unwrap_or_else(|err| (None, format!("run failed: {err:?}"))); .unwrap_or_else(|err| (None, format!("run failed: {err:?}")));

View file

@ -15,6 +15,7 @@ use serenity::{
small_fixed_array::{FixedArray, FixedString}, small_fixed_array::{FixedArray, FixedString},
}; };
use sqlx::{PgExecutor, PgPool}; use sqlx::{PgExecutor, PgPool};
use tokio::sync::Semaphore;
use crate::{ use crate::{
commands, commands,
@ -49,7 +50,7 @@ impl From<MessageSnapshot> for CloneBuilderMessage {
pub struct Handler { pub struct Handler {
pub db: PgPool, pub db: PgPool,
pub message_lock: Arc<DashMap<MessageId, Arc<Mutex<()>>>>, pub message_lock: Arc<DashMap<MessageId, Arc<Semaphore>>>,
} }
#[async_trait] #[async_trait]
@ -71,25 +72,12 @@ impl EventHandler for Handler {
} }
} }
FullEvent::ReactionAdd { add_reaction, .. } => { FullEvent::ReactionAdd { add_reaction, .. } => {
let lock = self self.process_reaction(context, add_reaction, false).await;
.message_lock
.entry(add_reaction.message_id)
.or_insert_with(|| Arc::new(Mutex::new(())));
{
let _guard = lock.lock().await;
if let Err(e) = self.reaction_add(context, add_reaction).await {
error!("Error while processing reaction add event: {e:?}");
}
}
if Arc::strong_count(&lock) == 2 {
self.message_lock
.remove_if(&add_reaction.message_id, |_, val| {
Arc::strong_count(val) <= 2
});
} }
FullEvent::ReactionRemove {
removed_reaction, ..
} => {
self.process_reaction(context, removed_reaction, true).await;
} }
FullEvent::InteractionCreate { interaction } => { FullEvent::InteractionCreate { interaction } => {
if let Err(e) = self.interaction_create(context, interaction).await { if let Err(e) = self.interaction_create(context, interaction).await {
@ -129,8 +117,37 @@ impl Handler {
Ok(()) Ok(())
} }
async fn reaction_add(&self, ctx: &Context, reaction: &Reaction) -> anyhow::Result<()> { async fn process_reaction(&self, ctx: &Context, reaction: &Reaction, is_remove: bool) {
if reaction.user(&ctx.http).await?.bot() { let semaphore = {
self.message_lock
.entry(reaction.message_id)
.or_insert_with(|| Arc::new(Semaphore::new(1)))
.value()
.clone()
};
let _permit = match semaphore.acquire().await {
Ok(p) => p,
Err(_) => return,
};
if let Err(e) = self.process_reaction_inner(ctx, reaction, is_remove).await {
error!("Error while processing reaction add event: {e:?}");
}
if Arc::strong_count(&semaphore) <= 2 {
self.message_lock
.remove_if(&reaction.message_id, |_, val| Arc::strong_count(val) <= 2);
}
}
async fn process_reaction_inner(
&self,
ctx: &Context,
reaction: &Reaction,
is_remove: bool,
) -> anyhow::Result<()> {
if reaction.user((ctx.cache(), ctx.http())).await?.bot() {
return Ok(()); return Ok(());
} }
let Some(guild_id) = reaction.guild_id else { let Some(guild_id) = reaction.guild_id else {
@ -145,7 +162,11 @@ impl Handler {
let existing_message = get_message(&self.db, reaction.message_id).await?; let existing_message = get_message(&self.db, reaction.message_id).await?;
let msg = reaction.message(&ctx.http).await?; if is_remove && existing_message.is_none() {
return Ok(());
}
let msg = reaction.message((ctx.cache(), ctx.http())).await?;
let reactions: HashMap<String, i64> = msg let reactions: HashMap<String, i64> = msg
.reactions .reactions
@ -159,7 +180,7 @@ impl Handler {
CloneBuilderMessage::from(msg.clone()) CloneBuilderMessage::from(msg.clone())
}; };
let channel = reaction.channel(&ctx.http).await?; let channel = reaction.channel((ctx.cache(), ctx.http())).await?;
debug!("channel: {channel:?}"); debug!("channel: {channel:?}");
@ -171,6 +192,7 @@ impl Handler {
.guild() .guild()
.and_then(|x| x.parent_id) .and_then(|x| x.parent_id)
.map(|x| x.to_string()), .map(|x| x.to_string()),
is_remove,
) )
.map(|x| x.0) .map(|x| x.0)
.inspect_err(|res| { .inspect_err(|res| {
@ -200,7 +222,7 @@ impl Handler {
.parse() .parse()
.context("unable to parse counter channel id")?; .context("unable to parse counter channel id")?;
let existing_message = ctx.http.get_message(chn_id, msg_id).await?; let existing_message = chn_id.message((ctx.cache(), ctx.http()), msg_id).await?;
for comp in existing_message for comp in existing_message
.components .components
@ -311,7 +333,7 @@ impl Handler {
EditWebhookMessage::new().components(components), EditWebhookMessage::new().components(components),
) )
.await?; .await?;
} else { } else if !is_remove {
let counter_msg = webhook let counter_msg = webhook
.execute( .execute(
&ctx.http, &ctx.http,

View file

@ -57,7 +57,9 @@ async fn main() -> anyhow::Result<()> {
Token::from_str(config.bot.token.expose_secret()).unwrap(), Token::from_str(config.bot.token.expose_secret()).unwrap(),
GatewayIntents::GUILD_MESSAGES GatewayIntents::GUILD_MESSAGES
| GatewayIntents::GUILD_MESSAGE_REACTIONS | GatewayIntents::GUILD_MESSAGE_REACTIONS
| GatewayIntents::GUILDS, | GatewayIntents::GUILDS
| GatewayIntents::GUILD_MESSAGES
| GatewayIntents::MESSAGE_CONTENT,
) )
.event_handler(Arc::new(Handler { .event_handler(Arc::new(Handler {
db, db,

View file

@ -20,19 +20,21 @@ pub fn check(
reactions: HashMap<String, i64>, reactions: HashMap<String, i64>,
channel: String, channel: String,
category: Option<String>, category: Option<String>,
is_removed: bool,
) -> anyhow::Result<(Option<ReactionResult>, String)> { ) -> anyhow::Result<(Option<ReactionResult>, String)> {
debug!("script: {script}"); debug!("script: {script}");
let mut engine = Engine::new(); let mut engine = Engine::new();
let buffer = Arc::new(RwLock::new(String::new())); let buffer = Arc::new(RwLock::new(String::new()));
engine.set_max_operations(1000); engine.set_max_operations(1000);
engine.register_type::<ReactionResult>(); engine.register_type::<ReactionResult>();
engine.register_fn("result", |webhook_url: String, count: i64, icon: String| { engine.register_fn(
ReactionResult { "result",
|webhook_url: String, count: Dynamic, icon: String| ReactionResult {
channel_id: webhook_url, channel_id: webhook_url,
count, count: count.as_int().unwrap_or(0),
icon, icon,
} },
}); );
let logger = buffer.clone(); let logger = buffer.clone();
@ -56,6 +58,7 @@ pub fn check(
let mut scope = Scope::new(); let mut scope = Scope::new();
scope.set_value("reactions", Dynamic::from(emotes_input)); scope.set_value("reactions", Dynamic::from(emotes_input));
scope.set_value("channel", Dynamic::from(channel)); scope.set_value("channel", Dynamic::from(channel));
scope.set_value("removed", Dynamic::from(is_removed));
scope.set_value( scope.set_value(
"category", "category",