// Rocket generates a lot of warnings for handlers // TODO: figure out why #![allow(unreachable_patterns)] use std::{error::Error, net::SocketAddr, sync::Arc, time::Duration}; use async_graphql::{extensions, http::GraphiQLSource, Schema}; use async_graphql_axum::{GraphQL, GraphQLSubscription}; //allows to extract the IP of connecting user use axum::extract::connect_info::ConnectInfo; use axum::{ extract::{self, ws::WebSocketUpgrade, Query, State}, http::{header, StatusCode}, response::{self, IntoResponse, Response}, routing::{any, get, post}, Router, }; use cacher::FilesystemCacher; use clap::Parser; use letterbox_notmuch::Notmuch; #[cfg(feature = "tantivy")] use letterbox_server::tantivy::TantivyConnection; use letterbox_server::{ graphql::{compute_catchup_ids, Attachment, MutationRoot, QueryRoot, SubscriptionRoot}, nm::{attachment_bytes, cid_attachment_bytes, label_unprocessed}, ws::ConnectionTracker, }; use letterbox_shared::WebsocketMessage; use serde::Deserialize; use sqlx::postgres::PgPool; use tokio::{net::TcpListener, sync::Mutex}; use tower_http::trace::{DefaultMakeSpan, TraceLayer}; use tracing::{error, info}; // Make our own error that wraps `ServerError`. struct AppError(letterbox_server::ServerError); // Tell axum how to convert `AppError` into a response. impl IntoResponse for AppError { fn into_response(self) -> Response { ( StatusCode::INTERNAL_SERVER_ERROR, format!("Something went wrong: {}", self.0), ) .into_response() } } // This enables using `?` on functions that return `Result<_, letterbox_server::Error>` to turn them into // `Result<_, AppError>`. That way you don't need to do that manually. impl From for AppError where E: Into, { fn from(err: E) -> Self { Self(err.into()) } } fn inline_attachment_response(attachment: Attachment) -> impl IntoResponse { info!("attachment filename {:?}", attachment.filename); let mut hdr_map = headers::HeaderMap::new(); if let Some(filename) = attachment.filename { hdr_map.insert( header::CONTENT_DISPOSITION, format!(r#"inline; filename="{}""#, filename) .parse() .unwrap(), ); } if let Some(ct) = attachment.content_type { hdr_map.insert(header::CONTENT_TYPE, ct.parse().unwrap()); } info!("hdr_map {hdr_map:?}"); (hdr_map, attachment.bytes).into_response() } fn download_attachment_response(attachment: Attachment) -> impl IntoResponse { info!("attachment filename {:?}", attachment.filename); let mut hdr_map = headers::HeaderMap::new(); if let Some(filename) = attachment.filename { hdr_map.insert( header::CONTENT_DISPOSITION, format!(r#"attachment; filename="{}""#, filename) .parse() .unwrap(), ); } if let Some(ct) = attachment.content_type { hdr_map.insert(header::CONTENT_TYPE, ct.parse().unwrap()); } info!("hdr_map {hdr_map:?}"); (hdr_map, attachment.bytes).into_response() } #[axum_macros::debug_handler] async fn view_attachment( State(AppState { nm, .. }): State, extract::Path((id, idx, _)): extract::Path<(String, String, String)>, ) -> Result { let mid = if id.starts_with("id:") { id.to_string() } else { format!("id:{}", id) }; info!("view attachment {mid} {idx}"); let idx: Vec<_> = idx .split('.') .map(|s| s.parse().expect("not a usize")) .collect(); let attachment = attachment_bytes(&nm, &mid, &idx)?; Ok(inline_attachment_response(attachment)) } async fn download_attachment( State(AppState { nm, .. }): State, extract::Path((id, idx, _)): extract::Path<(String, String, String)>, ) -> Result { let mid = if id.starts_with("id:") { id.to_string() } else { format!("id:{}", id) }; info!("download attachment {mid} {idx}"); let idx: Vec<_> = idx .split('.') .map(|s| s.parse().expect("not a usize")) .collect(); let attachment = attachment_bytes(&nm, &mid, &idx)?; Ok(download_attachment_response(attachment)) } async fn view_cid( State(AppState { nm, .. }): State, extract::Path((id, cid)): extract::Path<(String, String)>, ) -> Result { let mid = if id.starts_with("id:") { id.to_string() } else { format!("id:{}", id) }; info!("view cid attachment {mid} {cid}"); let attachment = cid_attachment_bytes(&nm, &mid, &cid)?; Ok(inline_attachment_response(attachment)) } // TODO make this work with gitea message ids like `wathiede/letterbox/pulls/91@git.z.xinu.tv` async fn view_original( State(AppState { nm, .. }): State, extract::Path(id): extract::Path, ) -> Result { info!("view_original {id}"); let bytes = nm.show_original(&id)?; let s = String::from_utf8_lossy(&bytes).to_string(); Ok(s.into_response()) } async fn graphiql() -> impl IntoResponse { response::Html( GraphiQLSource::build() .endpoint("/api/graphql/") .subscription_endpoint("/api/graphql/ws") .finish(), ) } async fn start_ws( ws: WebSocketUpgrade, ConnectInfo(addr): ConnectInfo, State(AppState { connection_tracker, .. }): State, ) -> impl IntoResponse { info!("intiating websocket connection for {addr}"); ws.on_upgrade(async move |socket| connection_tracker.lock().await.add_peer(socket, addr).await) } #[derive(Debug, Deserialize)] struct NotificationParams { delay_ms: Option, num_unprocessed: Option, } async fn send_refresh_websocket_handler( State(AppState { nm, pool, connection_tracker, .. }): State, params: Query, ) -> impl IntoResponse { info!("send_refresh_websocket_handler params {params:?}"); if let Some(delay_ms) = params.delay_ms { let delay = Duration::from_millis(delay_ms); info!("sleeping {delay:?}"); tokio::time::sleep(delay).await; } let limit = match params.num_unprocessed { Some(0) => None, Some(limit) => Some(limit), None => Some(10), }; let mut ids = None; match label_unprocessed(&nm, &pool, false, limit, "tag:unprocessed").await { Ok(i) => ids = Some(i), Err(err) => error!("Failed to label_unprocessed: {err:?}"), }; connection_tracker .lock() .await .send_message_all(WebsocketMessage::RefreshMessages) .await; if let Some(ids) = ids { format!("{ids:?}") } else { "refresh triggered".to_string() } } async fn watch_new( nm: Notmuch, pool: PgPool, conn_tracker: Arc>, poll_time: Duration, ) -> Result<(), async_graphql::Error> { async fn watch_new_iteration( nm: &Notmuch, pool: &PgPool, conn_tracker: Arc>, old_ids: &[String], ) -> Result, async_graphql::Error> { let ids = compute_catchup_ids(&nm, &pool, "is:unread").await?; info!("old_ids: {} ids: {}", old_ids.len(), ids.len()); if old_ids != ids { label_unprocessed(&nm, &pool, false, Some(100), "tag:unprocessed").await?; conn_tracker .lock() .await .send_message_all(WebsocketMessage::RefreshMessages) .await } Ok(ids) } let mut old_ids = Vec::new(); loop { old_ids = match watch_new_iteration(&nm, &pool, conn_tracker.clone(), &old_ids).await { Ok(old_ids) => old_ids, Err(err) => { error!("watch_new_iteration failed: {err:?}"); continue; } }; tokio::time::sleep(poll_time).await; } } #[derive(Clone)] struct AppState { nm: Notmuch, pool: PgPool, connection_tracker: Arc>, } #[derive(Parser)] #[command(version, about, long_about = None)] struct Cli { #[arg(short, long, default_value = "0.0.0.0:9345")] addr: SocketAddr, newsreader_database_url: String, newsreader_tantivy_db_path: String, slurp_cache_path: String, } #[tokio::main] async fn main() -> Result<(), Box> { let cli = Cli::parse(); let _guard = xtracing::init(env!("CARGO_BIN_NAME"))?; build_info::build_info!(fn bi); info!("Build Info: {}", letterbox_shared::build_version(bi)); if !std::fs::exists(&cli.slurp_cache_path)? { info!("Creating slurp cache @ '{}'", &cli.slurp_cache_path); std::fs::create_dir_all(&cli.slurp_cache_path)?; } let pool = PgPool::connect(&cli.newsreader_database_url).await?; let nm = Notmuch::default(); sqlx::migrate!("./migrations").run(&pool).await?; #[cfg(feature = "tantivy")] let tantivy_conn = TantivyConnection::new(&cli.newsreader_tantivy_db_path)?; let cacher = FilesystemCacher::new(&cli.slurp_cache_path)?; let schema = Schema::build(QueryRoot, MutationRoot, SubscriptionRoot) .data(nm.clone()) .data(cacher) .data(pool.clone()); let schema = schema.extension(extensions::Logger).finish(); let connection_tracker = Arc::new(Mutex::new(ConnectionTracker::default())); let ct = Arc::clone(&connection_tracker); let poll_time = Duration::from_secs(60); let _h = tokio::spawn(watch_new(nm.clone(), pool.clone(), ct, poll_time)); let api_routes = Router::new() .route( "/download/attachment/{id}/{idx}/{*rest}", get(download_attachment), ) .route("/view/attachment/{id}/{idx}/{*rest}", get(view_attachment)) .route("/original/{id}", get(view_original)) .route("/cid/{id}/{cid}", get(view_cid)) .route("/ws", any(start_ws)) .route_service("/graphql/ws", GraphQLSubscription::new(schema.clone())) .route( "/graphql/", get(graphiql).post_service(GraphQL::new(schema.clone())), ); let notification_routes = Router::new() .route("/mail", post(send_refresh_websocket_handler)) .route("/news", post(send_refresh_websocket_handler)); let app = Router::new() .nest("/api", api_routes) .nest("/notification", notification_routes) .with_state(AppState { nm, pool, connection_tracker, }) .layer( TraceLayer::new_for_http() .make_span_with(DefaultMakeSpan::default().include_headers(true)), ); let listener = TcpListener::bind(cli.addr).await.unwrap(); tracing::info!("listening on {}", listener.local_addr().unwrap()); axum::serve( listener, app.into_make_service_with_connect_info::(), ) .await .unwrap(); Ok(()) }