Finish up here

This commit is contained in:
Sofia 2024-03-16 17:43:55 +02:00
parent 21c4b82658
commit a1c1bc5809
5 changed files with 274 additions and 110 deletions

59
Cargo.lock generated
View File

@ -717,6 +717,17 @@ version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3"
[[package]]
name = "is-terminal"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f23ff5ef2b80d608d61efee834934d862cd92461afc0560dedf493e4c033738b"
dependencies = [
"hermit-abi",
"libc",
"windows-sys 0.52.0",
]
[[package]]
name = "itoa"
version = "1.0.10"
@ -765,6 +776,9 @@ name = "log"
version = "0.4.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c"
dependencies = [
"serde",
]
[[package]]
name = "memchr"
@ -812,10 +826,13 @@ dependencies = [
"http-body-util",
"hyper 1.2.0",
"hyper-util",
"log",
"ring",
"serde",
"serde_json",
"serenity",
"stderrlog",
"thiserror",
"tokio",
]
@ -1280,6 +1297,15 @@ dependencies = [
"digest",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1"
dependencies = [
"libc",
]
[[package]]
name = "skeptic"
version = "0.13.7"
@ -1332,6 +1358,19 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "stderrlog"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61c910772f992ab17d32d6760e167d2353f4130ed50e796752689556af07dc6b"
dependencies = [
"chrono",
"is-terminal",
"log",
"termcolor",
"thread_local",
]
[[package]]
name = "subtle"
version = "2.5.0"
@ -1405,6 +1444,15 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "termcolor"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755"
dependencies = [
"winapi-util",
]
[[package]]
name = "thiserror"
version = "1.0.58"
@ -1425,6 +1473,16 @@ dependencies = [
"syn 2.0.48",
]
[[package]]
name = "thread_local"
version = "1.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c"
dependencies = [
"cfg-if",
"once_cell",
]
[[package]]
name = "time"
version = "0.3.34"
@ -1483,6 +1541,7 @@ dependencies = [
"mio",
"num_cpus",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.48.0",

View File

@ -7,7 +7,7 @@ edition = "2021"
[dependencies]
hyper = { version = "1", features = ["server", "http1", "http2"] }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
tokio = { version = "1", features = ["macros", "rt-multi-thread", "signal"] }
http-body-util = "0.1"
hyper-util = { version = "0.1", features = ["tokio"] }
serde = {version = "1.0", features = ["derive"]}
@ -15,4 +15,7 @@ serde_json = "1.0"
chrono = { version = "0.4.35", features = ["serde"] }
serenity = { version = "0.12" }
ring = { version = "0.17.8" }
data-encoding = "2.5"
data-encoding = "2.5"
stderrlog = "0.6.0"
log = { version = "0.4.21", features = ["serde"] }
thiserror = "1.0.37"

View File

@ -1,11 +1,11 @@
use std::net::IpAddr;
use serde::Deserialize;
use std::net::IpAddr;
#[derive(Deserialize, Debug, Clone)]
pub struct Config {
pub host: IpAddr,
pub port: u16,
pub log_level: log::Level,
pub discord_token: String,
pub miniflux_base_url: String,
pub miniflux_webhook_secret: String,

82
src/discord.rs Normal file
View File

@ -0,0 +1,82 @@
use std::sync::Arc;
use super::config::Config;
use super::miniflux_requests::{Entry, Feed, MinifluxEvent, NewEntries};
use serenity::all::{
Cache, Color, CreateButton, CreateEmbed, CreateEmbedAuthor, CreateEmbedFooter, CreateMessage,
Http, UserId,
};
use thiserror::Error;
#[derive(Clone)]
pub struct DiscordHolder {
pub cache: Arc<Cache>,
pub http: Arc<Http>,
pub config: Config,
}
#[derive(Error, Debug)]
pub enum DiscordError {
#[error("Discord API error: {0}")]
DiscordApiError(#[from] serenity::Error),
}
impl DiscordHolder {
pub async fn send(&self, userid: u64, event: MinifluxEvent) -> Result<(), DiscordError> {
let user = self.http.get_user(UserId::new(userid)).await?;
match event {
MinifluxEvent::New(NewEntries { feed, entries }) => {
for entry in entries {
user.direct_message(
(&self.cache, self.http.as_ref()),
self.message_from_entry(&entry, &feed)?,
)
.await?;
log::info!("Entry {} sent to user {}", entry.id, user.name);
}
}
_ => {}
};
Ok(())
}
fn message_from_entry(
&self,
entry: &Entry,
feed: &Feed,
) -> Result<CreateMessage, DiscordError> {
let author = CreateEmbedAuthor::new(&feed.title).url(&feed.site_url);
let footer = CreateEmbedFooter::new(format!("{} minutes", entry.reading_time.to_string()));
let miniflux_url =
format!("{}/feed/{}/entry/{}", self.config.miniflux_base_url, feed.id, entry.id);
let mut embed = CreateEmbed::new()
.title(&entry.title)
.url(&entry.url)
.footer(footer)
.timestamp(entry.published_at)
.author(author)
.color(Color::from_rgb(
(feed.id % 256) as u8,
((feed.id * feed.id) % 256) as u8,
((feed.id * feed.id * feed.id) % 256) as u8,
))
.description(&entry.content.chars().take(200).collect::<String>());
if entry.tags.len() > 0 {
embed = embed.field("Tags", entry.tags.join(","), true)
}
if let Some(enclosure) = entry.enclosures.iter().find(|e| e.mime_type.starts_with("image/"))
{
embed = embed.image(&enclosure.url);
}
let external_button = CreateButton::new_link(&entry.url).label("external").emoji('📤');
let miniflux_button = CreateButton::new_link(miniflux_url).label("miniflux").emoji('📩');
Ok(CreateMessage::new().embed(embed).button(external_button).button(miniflux_button))
}
}

View File

@ -1,26 +1,24 @@
use std::{fs, net::SocketAddr};
use std::{convert::Infallible, fs, net::SocketAddr};
use config::Config;
use http_body_util::{combinators::BoxBody, BodyExt, Full};
use http_body_util::{BodyExt, Full};
use hyper::{
body::{self, Body, Bytes},
server::conn::http1,
service::service_fn,
Error, Request, Response,
Method, Request, Response, StatusCode,
};
use hyper_util::rt::TokioIo;
use miniflux_requests::{Entry, Feed, MinifluxEvent, NewEntries};
use miniflux_requests::MinifluxEvent;
use ring::hmac;
use serenity::{
all::{
CacheHttp, Color, CreateButton, CreateEmbed, CreateEmbedAuthor, CreateEmbedFooter,
CreateMessage, EmbedMessageBuilding, GatewayIntents, MessageBuilder, User, UserId,
},
Client,
};
use serenity::{all::GatewayIntents, Client};
use thiserror::Error;
use tokio::net::TcpListener;
use crate::discord::DiscordHolder;
mod config;
mod discord;
mod miniflux_requests;
//FIXME!
@ -30,155 +28,177 @@ const CONFIG_PATH: &'static str = "./config.json";
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let config: Config = serde_json::from_slice(&fs::read(CONFIG_PATH).unwrap()).unwrap();
stderrlog::new()
.module(module_path!())
.verbosity(config.log_level)
.timestamp(stderrlog::Timestamp::Second)
.init()
.unwrap();
let mut client = Client::builder(&config.discord_token, GatewayIntents::empty())
.await
.expect("Err creating client");
.expect("Error creating Discord client");
let addr = SocketAddr::from((config.host, config.port));
let listener = TcpListener::bind(addr).await?;
log::info!("Binded to {}:{}", config.host, config.port);
let cache = client.cache.clone();
let http = client.http.clone();
let holder = DiscordHolder { cache, http, config };
let client_handle = tokio::task::spawn(async move {
client.start().await.unwrap();
log::info!("Discord client started, should show up online now!");
client.start().await.expect("Failed starting Discord client!");
});
let loop_handle = tokio::task::spawn(async move {
log::info!("Listening to TCP/HTTP connections now..");
loop {
let (stream, _) = listener.accept().await.unwrap();
let Ok((stream, addr)) = listener.accept().await else {
log::warn!("Failed to accept TCP stream");
continue;
};
log::debug!("=> {}", addr);
let io = TokioIo::new(stream);
let cache = cache.clone();
let http = http.clone();
let config = config.clone();
let holder = holder.clone();
// Spawn a tokio task to serve multiple connections concurrently
tokio::task::spawn(async move {
if let Err(err) = http1::Builder::new()
// `service_fn` converts our function in a `Service`
.serve_connection(
io,
service_fn(|req: Request<body::Incoming>| {
let cache = cache.clone();
let http = http.clone();
let config = config.clone();
async move { hello(req, (&cache, http.as_ref()), &config).await }
}),
)
.await
{
println!("Error serving connection: {:?}", err);
let service = service_fn(|req: Request<body::Incoming>| {
let holder = holder.clone();
async move {
Ok::<_, Infallible>(
process_webhook(req, &holder).await.unwrap_or_else(|e| e.into()),
)
}
});
let result = http1::Builder::new().serve_connection(io, service).await;
if let Err(err) = result {
log::warn!("Error serving connection {}: {:?}", addr, err);
}
});
}
});
let (res1, res2) = tokio::join!(client_handle, loop_handle);
res1.unwrap();
res2.unwrap();
res1.expect("Failed unwrapping client handle result!");
res2.expect("Failed unwrapping loop handle result!");
Ok(())
}
async fn hello(
async fn process_webhook(
req: Request<body::Incoming>,
ctx: impl CacheHttp + Copy,
config: &Config,
) -> Result<Response<BoxBody<hyper::body::Bytes, hyper::Error>>, Error> {
// todo check method
let _method = req.method();
let headers = req.headers();
// todo fix unwrap
let userid = req.uri().path().split("/").nth(1).unwrap().parse::<u64>().unwrap();
if !config.whitelisted_user_ids.contains(&userid) {
// Fixme!
panic!("{} not allowed!", userid);
holder: &DiscordHolder,
) -> Result<Response<Full<Bytes>>, CustomError> {
let method = req.method();
if method != Method::POST {
Err(CustomError::InvalidMethod(method.clone()))?;
}
let user = ctx.http().get_user(UserId::new(userid)).await.unwrap();
let userid: u64 =
req.uri().path().split("/").nth(1).ok_or(CustomError::IdPathNotFound)?.parse()?;
if !holder.config.whitelisted_user_ids.contains(&userid) {
Err(CustomError::NotWhitelistedUser(userid))?;
}
let headers = req.headers();
// Todo remove expect
// Todo make sure contents match signature
let signature_bytes =
headers.get("x-miniflux-signature").expect("expected signature").as_bytes();
let signature = data_encoding::HEXLOWER.decode(signature_bytes).unwrap();
let event_type = headers.get("x-miniflux-event-type").expect("expected event type").clone();
headers.get("x-miniflux-signature").ok_or(CustomError::SignatureMissing)?.as_bytes();
let signature = data_encoding::HEXLOWER.decode(signature_bytes)?;
let event_type =
headers.get("x-miniflux-event-type").ok_or(CustomError::EventTypeMissing)?.clone();
let upper = req.body().size_hint().upper().unwrap_or(u64::MAX);
if upper > config.payload_max_size {
let mut resp = Response::new(full("Body too big"));
*resp.status_mut() = hyper::StatusCode::PAYLOAD_TOO_LARGE;
dbg!("Got message, too big!");
return Ok(resp);
if upper > holder.config.payload_max_size {
Err(CustomError::PayloadTooLarge(upper))?;
}
let whole_body = req.collect().await?.to_bytes();
let bytes = whole_body.iter().cloned().collect::<Vec<u8>>();
let key = hmac::Key::new(hmac::HMAC_SHA256, &config.miniflux_webhook_secret.as_bytes());
let key = hmac::Key::new(hmac::HMAC_SHA256, &holder.config.miniflux_webhook_secret.as_bytes());
// TODO! Remove unwrap!
hmac::verify(&key, &bytes, &signature).unwrap();
let () =
hmac::verify(&key, &bytes, &signature).map_err(|_| CustomError::HmacValidationError)?;
let event: MinifluxEvent = serde_json::from_slice(&bytes).unwrap();
let event: MinifluxEvent = serde_json::from_slice(&bytes)?;
match event {
MinifluxEvent::New(_) => assert!(event_type == "new_entries"),
MinifluxEvent::Save(_) => assert!(event_type == "save_entry"),
}
send(user, ctx, event, config).await;
log::info!("received {} from miniflux!", event_type.to_str().unwrap_or_default());
Ok(Response::new(full(vec![])))
let res = holder.send(userid, event).await;
if let Err(err) = res {
log::error!("Error while trying to send discord message {}", err);
}
Ok(Response::new(Full::new(vec![].into())))
}
fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
Full::new(chunk.into()).map_err(|never| match never {}).boxed()
#[derive(Error, Debug)]
pub enum CustomError {
#[error("id path not found")]
IdPathNotFound,
#[error("invalid method: {0}")]
InvalidMethod(Method),
#[error("parse int error: {0}")]
ParseIntError(#[from] std::num::ParseIntError),
#[error("{0} not allowed!")]
NotWhitelistedUser(u64),
#[error("signature missing")]
SignatureMissing,
#[error("event_type missing")]
EventTypeMissing,
#[error("signature decoding error: {0}")]
DecodeError(#[from] data_encoding::DecodeError),
#[error("payload too large ({0})")]
PayloadTooLarge(u64),
#[error("Hyper Error: {0}")]
HyperError(#[from] hyper::Error),
#[error("HMAC signature validation failed")]
HmacValidationError,
#[error("JSON deserialization failed: {0}")]
JsonError(#[from] serde_json::Error),
}
async fn send(user: User, ctx: impl CacheHttp + Copy, event: MinifluxEvent, config: &Config) {
match event {
MinifluxEvent::New(NewEntries { feed, entries }) => {
for entry in entries {
user.direct_message(ctx, message_from_entry(&entry, &feed, config)).await.unwrap();
}
impl Into<Response<Full<Bytes>>> for CustomError {
fn into(self) -> Response<Full<Bytes>> {
match &self {
CustomError::JsonError(_) => log::error!("{}", self),
CustomError::HmacValidationError => log::warn!("{}", self),
CustomError::HyperError(_) => log::warn!("{}", self),
CustomError::PayloadTooLarge(_) => log::warn!("{}", self),
CustomError::SignatureMissing => log::warn!("{}", self),
CustomError::EventTypeMissing => log::warn!("{}", self),
_ => log::debug!("{}", self),
}
_ => {}
let mut resp = Response::new(Full::from(Bytes::from(self.to_string())));
let status_code = match self {
CustomError::IdPathNotFound => StatusCode::NOT_FOUND,
CustomError::InvalidMethod(_) => StatusCode::METHOD_NOT_ALLOWED,
CustomError::ParseIntError(_) => StatusCode::BAD_REQUEST,
CustomError::NotWhitelistedUser(_) => StatusCode::FORBIDDEN,
CustomError::SignatureMissing => StatusCode::BAD_REQUEST,
CustomError::EventTypeMissing => StatusCode::BAD_REQUEST,
CustomError::DecodeError(_) => StatusCode::BAD_REQUEST,
CustomError::PayloadTooLarge(_) => StatusCode::PAYLOAD_TOO_LARGE,
CustomError::HyperError(_) => StatusCode::INTERNAL_SERVER_ERROR,
CustomError::HmacValidationError => StatusCode::FORBIDDEN,
CustomError::JsonError(_) => StatusCode::BAD_REQUEST,
};
*resp.status_mut() = status_code;
resp
}
}
fn message_from_entry(entry: &Entry, feed: &Feed, config: &Config) -> CreateMessage {
let author = CreateEmbedAuthor::new(&feed.title).url(&feed.site_url);
let footer = CreateEmbedFooter::new(format!("{} minutes", entry.reading_time.to_string()));
let minreq_url = format!("{}/feed/{}/entry/{}", config.miniflux_base_url, feed.id, entry.id);
let mut embed = CreateEmbed::new()
.title(&entry.title)
.url(&entry.url)
.footer(footer)
.timestamp(entry.published_at)
.author(author)
.color(Color::from_rgb(
(feed.id % 256) as u8,
((feed.id * feed.id) % 256) as u8,
((feed.id * feed.id * feed.id) % 256) as u8,
))
.description(&entry.content.chars().take(200).collect::<String>());
if entry.tags.len() > 0 {
embed = embed.field("Tags", entry.tags.join(","), true)
}
if let Some(enclosure) = entry.enclosures.iter().find(|e| e.mime_type.starts_with("image/")) {
embed = embed.image(&enclosure.url);
}
let external_button = CreateButton::new_link(&entry.url).label("external").emoji('📤');
let minreq_button = CreateButton::new_link(minreq_url).label("minreq").emoji('📩');
CreateMessage::new().embed(embed).button(external_button).button(minreq_button)
}