// Rocket generates a lot of warnings for handlers
// TODO: figure out why
#![allow(unreachable_patterns)]

use std::{error::Error, io::Cursor, net::SocketAddr, str::FromStr, sync::Arc, time::Duration};

use async_graphql::{extensions, http::GraphiQLSource, Schema};
use async_graphql_axum::{GraphQL, GraphQLSubscription};
// allows extracting the IP of the connecting user
use axum::extract::connect_info::ConnectInfo;
use axum::{
    extract::{ws::WebSocketUpgrade, State},
    response::{self, IntoResponse},
    routing::{any, get},
    Router,
};
use axum_extra::TypedHeader;
use cacher::FilesystemCacher;
use letterbox_notmuch::{Notmuch, NotmuchError, ThreadSet};
#[cfg(feature = "tantivy")]
use letterbox_server::tantivy::TantivyConnection;
use letterbox_server::{
    config::Config,
    error::ServerError,
    graphql::{
        compute_catchup_ids, Attachment, GraphqlSchema, MutationRoot, QueryRoot, SubscriptionRoot,
    },
    nm::{attachment_bytes, cid_attachment_bytes},
    ws::ConnectionTracker,
};
use letterbox_shared::WebsocketMessage;
use sqlx::postgres::PgPool;
use tokio::{net::TcpListener, sync::Mutex};
use tower_http::trace::{DefaultMakeSpan, TraceLayer};
use tracing::{error, info};
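
// NOTE: the commented-out block below is the previous Rocket implementation;
// the live axum handlers start after the closing `*/`.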
/*
#[get("/show/<query>/pretty")]
async fn show_pretty(
    nm: &State<Notmuch>,
    query: &str,
) -> Result<Json<ThreadSet>, Debug<ServerError>> {
    let query = urlencoding::decode(query).map_err(|e| ServerError::from(NotmuchError::from(e)))?;
    let res = nm.show(&query).map_err(ServerError::from)?;
    Ok(Json(res))
}

#[get("/show/<query>")]
async fn show(nm: &State<Notmuch>, query: &str) -> Result<Json<ThreadSet>, Debug<ServerError>> {
    let query = urlencoding::decode(query).map_err(NotmuchError::from)?;
    let res = nm.show(&query)?;
    Ok(Json(res))
}

struct InlineAttachmentResponder(Attachment);

impl<'r, 'o: 'r> Responder<'r, 'o> for InlineAttachmentResponder {
    fn respond_to(self, _: &'r Request<'_>) -> rocket::response::Result<'o> {
        let mut resp = Response::build();
        if let Some(filename) = self.0.filename {
            resp.header(Header::new(
                "Content-Disposition",
                format!(r#"inline; filename="{}""#, filename),
            ));
        }
        if let Some(content_type) = self.0.content_type {
            if let Some(ct) = ContentType::parse_flexible(&content_type) {
                resp.header(ct);
            }
        }
        resp.sized_body(self.0.bytes.len(), Cursor::new(self.0.bytes))
            .ok()
    }
}

struct DownloadAttachmentResponder(Attachment);

impl<'r, 'o: 'r> Responder<'r, 'o> for DownloadAttachmentResponder {
    fn respond_to(self, _: &'r Request<'_>) -> rocket::response::Result<'o> {
        let mut resp = Response::build();
        if let Some(filename) = self.0.filename {
            resp.header(Header::new(
                "Content-Disposition",
                format!(r#"attachment; filename="{}""#, filename),
            ));
        }
        if let Some(content_type) = self.0.content_type {
            if let Some(ct) = ContentType::parse_flexible(&content_type) {
                resp.header(ct);
            }
        }
        resp.sized_body(self.0.bytes.len(), Cursor::new(self.0.bytes))
            .ok()
    }
}

#[get("/cid/<id>/<cid>")]
async fn view_cid(
    nm: &State<Notmuch>,
    id: &str,
    cid: &str,
) -> Result<InlineAttachmentResponder, Debug<ServerError>> {
    let mid = if id.starts_with("id:") {
        id.to_string()
    } else {
        format!("id:{}", id)
    };
    info!("view cid attachment {mid} {cid}");
    let attachment = cid_attachment_bytes(nm, &mid, &cid)?;
    Ok(InlineAttachmentResponder(attachment))
}

#[get("/view/attachment/<id>/<idx>/<_>")]
async fn view_attachment(
    nm: &State<Notmuch>,
    id: &str,
    idx: &str,
) -> Result<InlineAttachmentResponder, Debug<ServerError>> {
    let mid = if id.starts_with("id:") {
        id.to_string()
    } else {
        format!("id:{}", id)
    };
    info!("view attachment {mid} {idx}");
    let idx: Vec<_> = idx
        .split('.')
        .map(|s| s.parse().expect("not a usize"))
        .collect();
    let attachment = attachment_bytes(nm, &mid, &idx)?;
    Ok(InlineAttachmentResponder(attachment))
}

#[get("/download/attachment/<id>/<idx>/<_>")]
async fn download_attachment(
    nm: &State<Notmuch>,
    id: &str,
    idx: &str,
) -> Result<DownloadAttachmentResponder, Debug<ServerError>> {
    let mid = if id.starts_with("id:") {
        id.to_string()
    } else {
        format!("id:{}", id)
    };
    info!("download attachment {mid} {idx}");
    let idx: Vec<_> = idx
        .split('.')
        .map(|s| s.parse().expect("not a usize"))
        .collect();
    let attachment = attachment_bytes(nm, &mid, &idx)?;
    Ok(DownloadAttachmentResponder(attachment))
}

#[get("/original/<id>")]
async fn original(
    nm: &State<Notmuch>,
    id: &str,
) -> Result<(ContentType, Vec<u8>), Debug<ServerError>> {
    let mid = if id.starts_with("id:") {
        id.to_string()
    } else {
        format!("id:{}", id)
    };
    let res = nm.show_original(&mid)?;
    Ok((ContentType::Plain, res))
}

#[rocket::get("/")]
fn graphiql() -> content::RawHtml<String> {
    content::RawHtml(
        GraphiQLSource::build()
            .endpoint("/api/graphql")
            .subscription_endpoint("/api/graphql")
            .finish(),
    )
}

#[rocket::get("/graphql?<query..>")]
async fn graphql_query(schema: &State<GraphqlSchema>, query: GraphQLQuery) -> GraphQLResponse {
    query.execute(schema.inner()).await
}

#[rocket::post("/graphql", data = "<request>", format = "application/json")]
async fn graphql_request(
    schema: &State<GraphqlSchema>,
    request: GraphQLRequest,
) -> GraphQLResponse {
    request.execute(schema.inner()).await
}

#[rocket::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let _guard = xtracing::init(env!("CARGO_BIN_NAME"))?;
    build_info::build_info!(fn bi);
    info!("Build Info: {}", letterbox_shared::build_version(bi));

    let allowed_origins = AllowedOrigins::all();
    let cors = rocket_cors::CorsOptions {
        allowed_origins,
        allowed_methods: vec!["Get"]
            .into_iter()
            .map(|s| FromStr::from_str(s).unwrap())
            .collect(),
        allowed_headers: AllowedHeaders::some(&["Authorization", "Accept"]),
        allow_credentials: true,
        ..Default::default()
    }
    .to_cors()?;

    let rkt = rocket::build()
        .mount(
            letterbox_shared::urls::MOUNT_POINT,
            routes![
                original,
                show_pretty,
                show,
                graphql_query,
                graphql_request,
                graphiql,
                view_cid,
                view_attachment,
                download_attachment,
            ],
        )
        .attach(cors)
        .attach(AdHoc::config::<Config>());

    let config: Config = rkt.figment().extract()?;
    if !std::fs::exists(&config.slurp_cache_path)? {
        info!("Creating slurp cache @ '{}'", &config.slurp_cache_path);
        std::fs::create_dir_all(&config.slurp_cache_path)?;
    }

    let pool = PgPool::connect(&config.newsreader_database_url).await?;
    sqlx::migrate!("./migrations").run(&pool).await?;

    #[cfg(feature = "tantivy")]
    let tantivy_conn = TantivyConnection::new(&config.newsreader_tantivy_db_path)?;

    let cacher = FilesystemCacher::new(&config.slurp_cache_path)?;

    let schema = Schema::build(QueryRoot, Mutation, Subscription)
        .data(Notmuch::default())
        .data(cacher)
        .data(pool.clone());
    #[cfg(feature = "tantivy")]
    let schema = schema.data(tantivy_conn);
    let schema = schema.extension(extensions::Logger).finish();

    let rkt = rkt.manage(schema).manage(pool).manage(Notmuch::default());
    //.manage(Notmuch::with_config("../notmuch/testdata/notmuch.config"))
    rkt.launch().await?;

    Ok(())
}
*/

async fn graphiql() -> impl IntoResponse {
    response::Html(
        GraphiQLSource::build()
            .endpoint("/api/graphql/")
            .subscription_endpoint("/api/graphql/ws")
            .finish(),
    )
}
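
/// Upgrades the HTTP request to a WebSocket and hands the socket, together
/// with the peer's address, to the shared `ConnectionTracker`.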
async fn start_ws(
    ws: WebSocketUpgrade,
    ConnectInfo(addr): ConnectInfo<SocketAddr>,
    State(connection_tracker): State<Arc<Mutex<ConnectionTracker>>>,
) -> impl IntoResponse {
    ws.on_upgrade(async move |socket| connection_tracker.lock().await.add_peer(socket, addr))
}

#[axum_macros::debug_handler]
async fn test_handler(
    State(connection_tracker): State<Arc<Mutex<ConnectionTracker>>>,
) -> impl IntoResponse {
    connection_tracker
        .lock()
        .await
        .send_message_all(WebsocketMessage::RefreshMessages)
        .await;
    "test triggered"
}
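
/// Entry point: builds the GraphQL schema, spawns the unread-message poller,
/// and serves the GraphQL, GraphiQL, and WebSocket routes over axum.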
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let _guard = xtracing::init(env!("CARGO_BIN_NAME"))?;
    build_info::build_info!(fn bi);
    info!("Build Info: {}", letterbox_shared::build_version(bi));

    // TODO: move these to config
    let port = 9345;
    let config = Config {
        newsreader_database_url: "postgres://newsreader@nixos-07.h.xinu.tv/newsreader".to_string(),
        newsreader_tantivy_db_path: "../target/database/newsreader".to_string(),
        slurp_cache_path: "/tmp/letterbox/slurp".to_string(),
    };

    if !std::fs::exists(&config.slurp_cache_path)? {
        info!("Creating slurp cache @ '{}'", &config.slurp_cache_path);
        std::fs::create_dir_all(&config.slurp_cache_path)?;
    }

    let pool = PgPool::connect(&config.newsreader_database_url).await?;
    let nm = Notmuch::default();
    sqlx::migrate!("./migrations").run(&pool).await?;

    #[cfg(feature = "tantivy")]
    let tantivy_conn = TantivyConnection::new(&config.newsreader_tantivy_db_path)?;

    let cacher = FilesystemCacher::new(&config.slurp_cache_path)?;

    let schema = Schema::build(QueryRoot, MutationRoot, SubscriptionRoot)
        .data(nm.clone())
        .data(cacher)
        .data(pool.clone());
    let schema = schema.extension(extensions::Logger).finish();

    let conn_tracker = Arc::new(Mutex::new(ConnectionTracker::default()));

    // Poll notmuch for changes to the unread set and tell every connected
    // websocket client to refresh when the set of catchup ids changes.
    async fn watch_new(
        nm: Notmuch,
        pool: PgPool,
        conn_tracker: Arc<Mutex<ConnectionTracker>>,
        poll_time: Duration,
    ) -> Result<(), async_graphql::Error> {
        let mut old_ids = Vec::new();
        loop {
            let ids = compute_catchup_ids(&nm, &pool, "is:unread").await?;
            if old_ids != ids {
                info!("old_ids: {old_ids:?}\n ids: {ids:?}");
                conn_tracker
                    .lock()
                    .await
                    .send_message_all(WebsocketMessage::RefreshMessages)
                    .await
            }
            old_ids = ids;
            tokio::time::sleep(poll_time).await;
        }
    }

    let ct = Arc::clone(&conn_tracker);
    let poll_time = Duration::from_secs(10);
    let _h = tokio::spawn(watch_new(nm, pool, ct, poll_time));

    let app = Router::new()
        .route("/test", get(test_handler))
        .route("/api/ws", any(start_ws))
        .route_service("/api/graphql/ws", GraphQLSubscription::new(schema.clone()))
        .route(
            "/api/graphql/",
            get(graphiql).post_service(GraphQL::new(schema.clone())),
        )
        .with_state(conn_tracker)
        .layer(
            TraceLayer::new_for_http()
                .make_span_with(DefaultMakeSpan::default().include_headers(true)),
        );

    let listener = TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port)))
        .await
        .unwrap();
    tracing::info!("listening on {}", listener.local_addr().unwrap());
    axum::serve(
        listener,
        app.into_make_service_with_connect_info::<SocketAddr>(),
    )
    .await
    .unwrap();

    Ok(())
}