miniflux-discord/src/main.rs

241 lines
8.3 KiB
Rust

use std::{convert::Infallible, fs, net::SocketAddr, path::PathBuf};
use config::Config;
use http_body_util::{BodyExt, Full};
use hyper::{
body::{self, Body, Bytes},
server::conn::http1,
service::service_fn,
Method, Request, Response, StatusCode,
};
use hyper_util::rt::TokioIo;
use miniflux_requests::MinifluxEvent;
use ring::hmac;
use serenity::{all::GatewayIntents, Client};
use thiserror::Error;
use tokio::net::TcpListener;
use crate::discord::DiscordHolder;
mod config;
mod discord;
mod miniflux_requests;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let path_opt = {
let default_path = dirs::config_dir().map(|p| p.join("miniflux_discord.json"));
let path = std::env::var("CONFIG_PATH").ok().map(|s| PathBuf::from(s)).or(default_path);
path
};
let path = match path_opt {
Some(p) => p,
None => panic!("config path could not be resolved! Use CONFIG_PATH environment variable"),
};
let res: Result<_, StartingError> = fs::read(path)
.map_err(|e| e.into())
.and_then(|s| serde_json::from_slice(&s).map_err(|e| e.into()));
let config: Config = match res {
Ok(c) => c,
Err(e) => panic!("{}", e),
};
stderrlog::new()
.module(module_path!())
.verbosity(config.log_level)
.timestamp(stderrlog::Timestamp::Second)
.init()
.expect("stderrlog could not be initialized!");
match start_serving(config).await {
Ok(_) => Ok(()),
Err(e) => {
log::error!("{}", e);
Ok(())
}
}
}
async fn start_serving(config: Config) -> Result<(), StartingError> {
let mut client = Client::builder(&config.discord_token, GatewayIntents::empty()).await?;
let addr = SocketAddr::from((config.host, config.port));
let listener = TcpListener::bind(addr).await?;
log::info!("Binded to {}:{}", config.host, config.port);
let cache = client.cache.clone();
let http = client.http.clone();
let holder = DiscordHolder { cache, http, config };
let client_handle = tokio::task::spawn(async move {
log::info!("Discord client started, should show up online now!");
client.start().await.expect("Failed starting Discord client!");
});
let loop_handle = tokio::task::spawn(async move {
log::info!("Listening to TCP/HTTP connections now..");
loop {
let Ok((stream, addr)) = listener.accept().await else {
log::warn!("Failed to accept TCP stream");
continue;
};
log::debug!("=> {}", addr);
let io = TokioIo::new(stream);
let holder = holder.clone();
// Spawn a tokio task to serve multiple connections concurrently
tokio::task::spawn(async move {
let service = service_fn(|req: Request<body::Incoming>| {
let holder = holder.clone();
async move {
Ok::<_, Infallible>(
process_webhook(req, &holder).await.unwrap_or_else(|e| e.into()),
)
}
});
let result = http1::Builder::new().serve_connection(io, service).await;
if let Err(err) = result {
log::warn!("Error serving connection {}: {:?}", addr, err);
}
});
}
});
let (res1, res2) = tokio::join!(client_handle, loop_handle);
res1.expect("Failed unwrapping client handle result!");
res2.expect("Failed unwrapping loop handle result!");
Ok(())
}
/// Errors that can occur while loading the configuration and starting the service.
///
/// The `#[from]` attributes generate `From` conversions, so the wrapped error types can be
/// propagated with `?` or `.into()`.
#[derive(Error, Debug)]
pub enum StartingError {
/// No configuration file path could be determined.
#[error("configuration file path not found!")]
ConfigPathNotFound,
/// An I/O operation failed (wraps `std::io::Error`).
#[error("IO Error: {0}")]
IOError(#[from] std::io::Error),
/// The configuration could not be deserialized from JSON (wraps `serde_json::Error`).
#[error("Config Json deserialization error: {0}")]
ConfigJsonError(#[from] serde_json::Error),
/// Installing the logger failed (wraps `log::SetLoggerError`).
#[error("Logger init failed: {0}")]
LoggerError(#[from] log::SetLoggerError),
/// The Discord client reported an error (wraps `serenity::Error`).
#[error("Discord Client Error: {0}")]
DiscordClientError(#[from] serenity::Error),
}
async fn process_webhook(
req: Request<body::Incoming>,
holder: &DiscordHolder,
) -> Result<Response<Full<Bytes>>, ServingError> {
let method = req.method();
if method != Method::POST {
Err(ServingError::InvalidMethod(method.clone()))?;
}
let userid: u64 =
req.uri().path().split("/").nth(1).ok_or(ServingError::IdPathNotFound)?.parse()?;
if !holder.config.whitelisted_user_ids.contains(&userid) {
Err(ServingError::NotWhitelistedUser(userid))?;
}
let headers = req.headers();
let signature_bytes =
headers.get("x-miniflux-signature").ok_or(ServingError::SignatureMissing)?.as_bytes();
let signature = data_encoding::HEXLOWER.decode(signature_bytes)?;
let event_type =
headers.get("x-miniflux-event-type").ok_or(ServingError::EventTypeMissing)?.clone();
let upper = req.body().size_hint().upper().unwrap_or(u64::MAX);
if upper > holder.config.payload_max_size {
Err(ServingError::PayloadTooLarge(upper))?;
}
let whole_body = req.collect().await?.to_bytes();
let bytes = whole_body.iter().cloned().collect::<Vec<u8>>();
let key = hmac::Key::new(hmac::HMAC_SHA256, &holder.config.miniflux_webhook_secret.as_bytes());
let () =
hmac::verify(&key, &bytes, &signature).map_err(|_| ServingError::HmacValidationError)?;
let event: MinifluxEvent = serde_json::from_slice(&bytes)?;
match event {
MinifluxEvent::New(_) => assert!(event_type == "new_entries"),
MinifluxEvent::Save(_) => assert!(event_type == "save_entry"),
}
log::info!("received {} from miniflux!", event_type.to_str().unwrap_or_default());
let res = holder.send(userid, event).await;
if let Err(err) = res {
log::error!("Error while trying to send discord message {}", err);
}
Ok(Response::new(Full::new(vec![].into())))
}
/// Reasons a webhook request is rejected by `process_webhook`.
///
/// A value of this type can be converted into an HTTP response with `.into()`; the `Display`
/// text defined by the `#[error]` attributes becomes the response body.
#[derive(Error, Debug)]
pub enum ServingError {
/// The request path has no second `/`-separated segment to read the user id from.
#[error("id path not found")]
IdPathNotFound,
/// The request used a method other than `POST`; carries the offending method.
#[error("invalid method: {0}")]
InvalidMethod(Method),
/// The user id path segment could not be parsed as an unsigned integer.
#[error("parse int error: {0}")]
ParseIntError(#[from] std::num::ParseIntError),
/// The user id from the path is not in the configured whitelist; carries that id.
#[error("{0} not allowed!")]
NotWhitelistedUser(u64),
/// The `x-miniflux-signature` header is absent.
#[error("signature missing")]
SignatureMissing,
/// The `x-miniflux-event-type` header is absent.
#[error("event_type missing")]
EventTypeMissing,
/// The signature header is not valid lowercase hexadecimal.
#[error("signature decoding error: {0}")]
DecodeError(#[from] data_encoding::DecodeError),
/// The body's size-hint upper bound exceeds the configured maximum; carries that bound
/// (`u64::MAX` when the body reports no upper bound).
#[error("payload too large ({0})")]
PayloadTooLarge(u64),
/// Reading the request body failed (wraps `hyper::Error`).
#[error("Hyper Error: {0}")]
HyperError(#[from] hyper::Error),
/// The HMAC of the body does not match the supplied signature.
#[error("HMAC signature validation failed")]
HmacValidationError,
/// The body could not be deserialized as a Miniflux event (wraps `serde_json::Error`).
#[error("JSON deserialization failed: {0}")]
JsonError(#[from] serde_json::Error),
}
impl Into<Response<Full<Bytes>>> for ServingError {
fn into(self) -> Response<Full<Bytes>> {
match &self {
ServingError::JsonError(_) => log::error!("{}", self),
ServingError::HmacValidationError => log::warn!("{}", self),
ServingError::HyperError(_) => log::warn!("{}", self),
ServingError::PayloadTooLarge(_) => log::warn!("{}", self),
ServingError::SignatureMissing => log::warn!("{}", self),
ServingError::EventTypeMissing => log::warn!("{}", self),
_ => log::debug!("{}", self),
}
let mut resp = Response::new(Full::from(Bytes::from(self.to_string())));
let status_code = match self {
ServingError::IdPathNotFound => StatusCode::NOT_FOUND,
ServingError::InvalidMethod(_) => StatusCode::METHOD_NOT_ALLOWED,
ServingError::ParseIntError(_) => StatusCode::BAD_REQUEST,
ServingError::NotWhitelistedUser(_) => StatusCode::FORBIDDEN,
ServingError::SignatureMissing => StatusCode::BAD_REQUEST,
ServingError::EventTypeMissing => StatusCode::BAD_REQUEST,
ServingError::DecodeError(_) => StatusCode::BAD_REQUEST,
ServingError::PayloadTooLarge(_) => StatusCode::PAYLOAD_TOO_LARGE,
ServingError::HyperError(_) => StatusCode::INTERNAL_SERVER_ERROR,
ServingError::HmacValidationError => StatusCode::FORBIDDEN,
ServingError::JsonError(_) => StatusCode::BAD_REQUEST,
};
*resp.status_mut() = status_code;
resp
}
}