Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 128 additions & 32 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use tracing::{error, info, warn};

/// Returns true for Discord errors that are permanent configuration failures
/// (invalid token, disallowed intents). These should not be retried.
fn is_fatal_discord_error(e: &serenity::Error) -> bool {
matches!(
e,
serenity::Error::Gateway(GatewayError::InvalidAuthentication)
| serenity::Error::Gateway(GatewayError::DisallowedGatewayIntents)
)
}

/// Wait for SIGINT (ctrl_c) or, on unix, SIGTERM. SIGTERM is what Kubernetes
/// sends during pod termination, so handling it lets us run the full cleanup
/// path (shard manager, ACP pool drain) instead of getting SIGKILL'd after the
Expand Down Expand Up @@ -152,8 +162,16 @@ async fn main() -> anyhow::Result<()> {
cfg.pool.liveness_check_secs,
));

// Shutdown signal for Slack adapter
// Shared shutdown signal for all adapters. A single OS signal listener flips
// this watch channel; foreground adapters consume it without registering
// their own signal handlers on each reconnect.
let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
let signal_shutdown_tx = shutdown_tx.clone();
let shutdown_signal_handle = tokio::spawn(async move {
shutdown_signal().await;
info!("shutdown signal received");
let _ = signal_shutdown_tx.send(true);
});

// Dispatcher handles tracked here so SIGTERM cleanup can call shutdown() on each (ADR §6.8).
// Also shared with the cleanup task for periodic stale-entry sweeping.
Expand Down Expand Up @@ -363,6 +381,7 @@ async fn main() -> anyhow::Result<()> {
};

// Run Discord adapter (foreground, blocking) or wait for ctrl_c
let mut fatal_exit = false;
if let Some(discord_cfg) = cfg.discord {
let allow_all_channels = config::resolve_allow_all(
discord_cfg.allow_all_channels,
Expand Down Expand Up @@ -443,48 +462,122 @@ async fn main() -> anyhow::Result<()> {
| GatewayIntents::GUILDS
| GatewayIntents::DIRECT_MESSAGES;

let mut client = Client::builder(&discord_cfg.bot_token, intents)
.event_handler(handler)
.await?;
let handler = Arc::new(handler);
let mut backoff_secs = 1u64;
const MAX_DISCORD_BACKOFF: u64 = 30;
let mut discord_shutdown_rx = shutdown_rx.clone();

// Graceful Discord shutdown on ctrl_c
let shard_manager = client.shard_manager.clone();
tokio::spawn(async move {
shutdown_signal().await;
info!("shutdown signal received");
shard_manager.shutdown_all().await;
});
loop {
if *discord_shutdown_rx.borrow() {
break;
}

let mut client = match Client::builder(&discord_cfg.bot_token, intents)
.event_handler_arc(Arc::clone(&handler))
.await
{
Ok(c) => c,
Err(e) => {
if is_fatal_discord_error(&e) {
error!(err = %e, "discord client build failed with fatal error, not retrying");
fatal_exit = true;
break;
}
error!(err = %e, backoff = backoff_secs, "discord client build failed, retrying");
if *discord_shutdown_rx.borrow() {
break;
}
tokio::select! {
_ = tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)) => {}
_ = discord_shutdown_rx.changed() => { break; }
}
backoff_secs = (backoff_secs * 2).min(MAX_DISCORD_BACKOFF);
continue;
}
};

let shard_manager = client.shard_manager.clone();

info!("discord bot connecting");
let started_at = tokio::time::Instant::now();
let start = client.start();
tokio::pin!(start);
let fatal = tokio::select! {
result = &mut start => {
if started_at.elapsed() >= std::time::Duration::from_secs(60) {
backoff_secs = 1;
}
match result {
Err(e) if is_fatal_discord_error(&e) => {
match &e {
serenity::Error::Gateway(GatewayError::DisallowedGatewayIntents) => {
error!(
"Discord rejected privileged intents. \
Enable MESSAGE CONTENT INTENT at: \
https://discord.com/developers/applications → Bot → Privileged Gateway Intents"
);
}
serenity::Error::Gateway(GatewayError::InvalidAuthentication) => {
error!(
"Discord rejected bot token. \
Verify your bot_token in config.toml is correct and has not been reset."
);
}
_ => error!(err = %e, "discord fatal gateway error, not retrying"),
}
true
}
Err(e) => {
error!(err = %e, backoff = backoff_secs, "discord gateway error, reconnecting");
false
}
Ok(()) => {
warn!(backoff = backoff_secs, "discord gateway disconnected cleanly, reconnecting");
false
}
}
}
_ = discord_shutdown_rx.changed() => {
let _ = tokio::time::timeout(
std::time::Duration::from_secs(5),
async {
shard_manager.shutdown_all().await;
let _ = start.await;
},
).await;
break;
}
};

if fatal {
fatal_exit = true;
break;
}

info!("discord bot running");
match client.start().await {
Err(serenity::Error::Gateway(GatewayError::DisallowedGatewayIntents)) => {
error!(
"Discord rejected privileged intents. \
Enable MESSAGE CONTENT INTENT at: \
https://discord.com/developers/applications → Bot → Privileged Gateway Intents"
);
std::process::exit(1);
// Check if shutdown was requested before sleeping
if *discord_shutdown_rx.borrow() {
break;
}
Err(serenity::Error::Gateway(GatewayError::InvalidAuthentication)) => {
error!(
"Discord rejected bot token. \
Verify your bot_token in config.toml is correct and has not been reset."
);
std::process::exit(1);

tokio::select! {
_ = tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)) => {}
_ = discord_shutdown_rx.changed() => { break; }
}
Err(e) => return Err(e.into()),
Ok(_) => {}
backoff_secs = (backoff_secs * 2).min(MAX_DISCORD_BACKOFF);
}
} else {
// No Discord — wait for SIGINT or SIGTERM
// No Discord — wait for SIGINT or SIGTERM via the shared shutdown watcher.
info!("running without discord, press ctrl+c to stop");
shutdown_signal().await;
info!("shutdown signal received");
let mut shutdown_rx = shutdown_rx.clone();
if !*shutdown_rx.borrow() {
let _ = shutdown_rx.changed().await;
}
}

// Cleanup
shutdown_signal_handle.abort();
cleanup_handle.abort();
// Signal Slack adapter to shut down gracefully
// Signal background adapters to shut down gracefully.
let _ = shutdown_tx.send(true);
if let Some(handle) = slack_handle {
let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
Expand All @@ -503,6 +596,9 @@ async fn main() -> anyhow::Result<()> {
let shutdown_pool = pool;
shutdown_pool.shutdown().await;
info!("openab shut down");
if fatal_exit {
anyhow::bail!("discord fatal error: invalid token or disallowed intents");
}
Ok(())
}

Expand Down
Loading