diff --git a/server/src/bin/email2db.rs b/server/src/bin/email2db.rs deleted file mode 100644 index ed4503a..0000000 --- a/server/src/bin/email2db.rs +++ /dev/null @@ -1,22 +0,0 @@ -use clap::Parser; -use letterbox_server::mail::read_mail_to_db; -use sqlx::postgres::PgPool; - -/// Add certain emails as posts in newsfeed app. -#[derive(Parser, Debug)] -#[command(author, version, about, long_about = None)] -struct Args { - /// DB URL, something like postgres://newsreader@nixos-07.h.xinu.tv/newsreader - #[arg(short, long)] - db_url: String, - /// path to parse - path: String, -} -#[tokio::main] -async fn main() -> anyhow::Result<()> { - let _guard = xtracing::init(env!("CARGO_BIN_NAME"))?; - let args = Args::parse(); - let pool = PgPool::connect(&args.db_url).await?; - read_mail_to_db(&pool, &args.path).await?; - Ok(()) -} diff --git a/server/src/bin/letterbox-server.rs b/server/src/bin/letterbox-server.rs index 56b46ac..4d03437 100644 --- a/server/src/bin/letterbox-server.rs +++ b/server/src/bin/letterbox-server.rs @@ -1,29 +1,26 @@ // Rocket generates a lot of warnings for handlers // TODO: figure out why #![allow(unreachable_patterns)] -use std::{error::Error, io::Cursor, net::SocketAddr, str::FromStr, sync::Arc, time::Duration}; +use std::{error::Error, net::SocketAddr, sync::Arc, time::Duration}; use async_graphql::{extensions, http::GraphiQLSource, Schema}; use async_graphql_axum::{GraphQL, GraphQLSubscription}; //allows to extract the IP of connecting user use axum::extract::connect_info::ConnectInfo; use axum::{ - extract::{ws::WebSocketUpgrade, State}, - response::{self, IntoResponse}, + extract::{self, ws::WebSocketUpgrade, State}, + http::{header, StatusCode}, + response::{self, IntoResponse, Response}, routing::{any, get}, Router, }; -use axum_extra::TypedHeader; use cacher::FilesystemCacher; -use letterbox_notmuch::{Notmuch, NotmuchError, ThreadSet}; +use letterbox_notmuch::Notmuch; #[cfg(feature = "tantivy")] use letterbox_server::tantivy::TantivyConnection; use letterbox_server::{ config::Config, - error::ServerError, - graphql::{ - compute_catchup_ids, Attachment, GraphqlSchema, MutationRoot, QueryRoot, SubscriptionRoot, - }, + graphql::{compute_catchup_ids, Attachment, MutationRoot, QueryRoot, SubscriptionRoot}, nm::{attachment_bytes, cid_attachment_bytes}, ws::ConnectionTracker, }; @@ -31,90 +28,73 @@ use letterbox_shared::WebsocketMessage; use sqlx::postgres::PgPool; use tokio::{net::TcpListener, sync::Mutex}; use tower_http::trace::{DefaultMakeSpan, TraceLayer}; -use tracing::{error, info}; +use tracing::info; -/* -#[get("/show//pretty")] -async fn show_pretty( - nm: &State, - query: &str, -) -> Result, Debug> { - let query = urlencoding::decode(query).map_err(|e| ServerError::from(NotmuchError::from(e)))?; - let res = nm.show(&query).map_err(ServerError::from)?; - Ok(Json(res)) +// Make our own error that wraps `anyhow::Error`. +struct AppError(letterbox_server::ServerError); + +// Tell axum how to convert `AppError` into a response. +impl IntoResponse for AppError { + fn into_response(self) -> Response { + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Something went wrong: {}", self.0), + ) + .into_response() + } } - -#[get("/show/")] -async fn show(nm: &State, query: &str) -> Result, Debug> { - let query = urlencoding::decode(query).map_err(NotmuchError::from)?; - let res = nm.show(&query)?; - Ok(Json(res)) -} - -struct InlineAttachmentResponder(Attachment); - -impl<'r, 'o: 'r> Responder<'r, 'o> for InlineAttachmentResponder { - fn respond_to(self, _: &'r Request<'_>) -> rocket::response::Result<'o> { - let mut resp = Response::build(); - if let Some(filename) = self.0.filename { - resp.header(Header::new( - "Content-Disposition", - format!(r#"inline; filename="{}""#, filename), - )); - } - if let Some(content_type) = self.0.content_type { - if let Some(ct) = ContentType::parse_flexible(&content_type) { - resp.header(ct); - } - } - resp.sized_body(self.0.bytes.len(), Cursor::new(self.0.bytes)) - .ok() +// This enables using `?` on functions that return `Result<_, letterbox_server::Error>` to turn them into +// `Result<_, AppError>`. That way you don't need to do that manually. +impl From for AppError +where + E: Into, +{ + fn from(err: E) -> Self { + Self(err.into()) } } -struct DownloadAttachmentResponder(Attachment); - -impl<'r, 'o: 'r> Responder<'r, 'o> for DownloadAttachmentResponder { - fn respond_to(self, _: &'r Request<'_>) -> rocket::response::Result<'o> { - let mut resp = Response::build(); - if let Some(filename) = self.0.filename { - resp.header(Header::new( - "Content-Disposition", - format!(r#"attachment; filename="{}""#, filename), - )); - } - if let Some(content_type) = self.0.content_type { - if let Some(ct) = ContentType::parse_flexible(&content_type) { - resp.header(ct); - } - } - resp.sized_body(self.0.bytes.len(), Cursor::new(self.0.bytes)) - .ok() +fn inline_attachment_response(attachment: Attachment) -> impl IntoResponse { + info!("attachment filename {:?}", attachment.filename); + let mut hdr_map = headers::HeaderMap::new(); + if let Some(filename) = attachment.filename { + hdr_map.insert( + header::CONTENT_DISPOSITION, + format!(r#"inline; filename="{}""#, filename) + .parse() + .unwrap(), + ); } + if let Some(ct) = attachment.content_type { + hdr_map.insert(header::CONTENT_TYPE, ct.parse().unwrap()); + } + info!("hdr_map {hdr_map:?}"); + (hdr_map, attachment.bytes).into_response() } -#[get("/cid//")] -async fn view_cid( - nm: &State, - id: &str, - cid: &str, -) -> Result> { - let mid = if id.starts_with("id:") { - id.to_string() - } else { - format!("id:{}", id) - }; - info!("view cid attachment {mid} {cid}"); - let attachment = cid_attachment_bytes(nm, &mid, &cid)?; - Ok(InlineAttachmentResponder(attachment)) +fn download_attachment_response(attachment: Attachment) -> impl IntoResponse { + info!("attachment filename {:?}", attachment.filename); + let mut hdr_map = headers::HeaderMap::new(); + if let Some(filename) = attachment.filename { + hdr_map.insert( + header::CONTENT_DISPOSITION, + format!(r#"attachment; filename="{}""#, filename) + .parse() + .unwrap(), + ); + } + if let Some(ct) = attachment.content_type { + hdr_map.insert(header::CONTENT_TYPE, ct.parse().unwrap()); + } + info!("hdr_map {hdr_map:?}"); + (hdr_map, attachment.bytes).into_response() } -#[get("/view/attachment///<_>")] +#[axum_macros::debug_handler] async fn view_attachment( - nm: &State, - id: &str, - idx: &str, -) -> Result> { + State(AppState { nm, .. }): State, + extract::Path((id, idx, _)): extract::Path<(String, String, String)>, +) -> Result { let mid = if id.starts_with("id:") { id.to_string() } else { @@ -125,16 +105,14 @@ async fn view_attachment( .split('.') .map(|s| s.parse().expect("not a usize")) .collect(); - let attachment = attachment_bytes(nm, &mid, &idx)?; - Ok(InlineAttachmentResponder(attachment)) + let attachment = attachment_bytes(&nm, &mid, &idx)?; + Ok(inline_attachment_response(attachment)) } -#[get("/download/attachment///<_>")] async fn download_attachment( - nm: &State, - id: &str, - idx: &str, -) -> Result> { + State(AppState { nm, .. }): State, + extract::Path((id, idx, _)): extract::Path<(String, String, String)>, +) -> Result { let mid = if id.starts_with("id:") { id.to_string() } else { @@ -145,112 +123,24 @@ async fn download_attachment( .split('.') .map(|s| s.parse().expect("not a usize")) .collect(); - let attachment = attachment_bytes(nm, &mid, &idx)?; - Ok(DownloadAttachmentResponder(attachment)) + let attachment = attachment_bytes(&nm, &mid, &idx)?; + Ok(download_attachment_response(attachment)) } -#[get("/original/")] -async fn original( - nm: &State, - id: &str, -) -> Result<(ContentType, Vec), Debug> { +async fn view_cid( + State(AppState { nm, .. }): State, + extract::Path((id, cid)): extract::Path<(String, String)>, +) -> Result { let mid = if id.starts_with("id:") { id.to_string() } else { format!("id:{}", id) }; - let res = nm.show_original(&mid)?; - Ok((ContentType::Plain, res)) + info!("view cid attachment {mid} {cid}"); + let attachment = cid_attachment_bytes(&nm, &mid, &cid)?; + Ok(inline_attachment_response(attachment)) } -#[rocket::get("/")] -fn graphiql() -> content::RawHtml { - content::RawHtml( - GraphiQLSource::build() - .endpoint("/api/graphql") - .subscription_endpoint("/api/graphql") - .finish(), - ) -} - -#[rocket::get("/graphql?")] -async fn graphql_query(schema: &State, query: GraphQLQuery) -> GraphQLResponse { - query.execute(schema.inner()).await -} - -#[rocket::post("/graphql", data = "", format = "application/json")] -async fn graphql_request( - schema: &State, - request: GraphQLRequest, -) -> GraphQLResponse { - request.execute(schema.inner()).await -} - -#[rocket::main] -async fn main() -> Result<(), Box> { - let _guard = xtracing::init(env!("CARGO_BIN_NAME"))?; - build_info::build_info!(fn bi); - info!("Build Info: {}", letterbox_shared::build_version(bi)); - let allowed_origins = AllowedOrigins::all(); - let cors = rocket_cors::CorsOptions { - allowed_origins, - allowed_methods: vec!["Get"] - .into_iter() - .map(|s| FromStr::from_str(s).unwrap()) - .collect(), - allowed_headers: AllowedHeaders::some(&["Authorization", "Accept"]), - allow_credentials: true, - ..Default::default() - } - .to_cors()?; - - let rkt = rocket::build() - .mount( - letterbox_shared::urls::MOUNT_POINT, - routes![ - original, - show_pretty, - show, - graphql_query, - graphql_request, - graphiql, - view_cid, - view_attachment, - download_attachment, - ], - ) - .attach(cors) - .attach(AdHoc::config::()); - - let config: Config = rkt.figment().extract()?; - if !std::fs::exists(&config.slurp_cache_path)? { - info!("Creating slurp cache @ '{}'", &config.slurp_cache_path); - std::fs::create_dir_all(&config.slurp_cache_path)?; - } - let pool = PgPool::connect(&config.newsreader_database_url).await?; - sqlx::migrate!("./migrations").run(&pool).await?; - #[cfg(feature = "tantivy")] - let tantivy_conn = TantivyConnection::new(&config.newsreader_tantivy_db_path)?; - - let cacher = FilesystemCacher::new(&config.slurp_cache_path)?; - let schema = Schema::build(QueryRoot, Mutation, Subscription) - .data(Notmuch::default()) - .data(cacher) - .data(pool.clone()); - - #[cfg(feature = "tantivy")] - let schema = schema.data(tantivy_conn); - - let schema = schema.extension(extensions::Logger).finish(); - - let rkt = rkt.manage(schema).manage(pool).manage(Notmuch::default()); - //.manage(Notmuch::with_config("../notmuch/testdata/notmuch.config")) - - rkt.launch().await?; - Ok(()) -} -*/ - async fn graphiql() -> impl IntoResponse { response::Html( GraphiQLSource::build() @@ -263,13 +153,17 @@ async fn graphiql() -> impl IntoResponse { async fn start_ws( ws: WebSocketUpgrade, ConnectInfo(addr): ConnectInfo, - State(connection_tracker): State>>, + State(AppState { + connection_tracker, .. + }): State, ) -> impl IntoResponse { - ws.on_upgrade(async move |socket| connection_tracker.lock().await.add_peer(socket, addr)) + ws.on_upgrade(async move |socket| connection_tracker.lock().await.add_peer(socket, addr).await) } #[axum_macros::debug_handler] async fn test_handler( - State(connection_tracker): State>>, + State(AppState { + connection_tracker, .. + }): State, ) -> impl IntoResponse { connection_tracker .lock() @@ -279,6 +173,34 @@ async fn test_handler( "test triggered" } +async fn watch_new( + nm: Notmuch, + pool: PgPool, + conn_tracker: Arc>, + poll_time: Duration, +) -> Result<(), async_graphql::Error> { + let mut old_ids = Vec::new(); + loop { + let ids = compute_catchup_ids(&nm, &pool, "is:unread").await?; + if old_ids != ids { + info!("old_ids: {old_ids:?}\n ids: {ids:?}"); + conn_tracker + .lock() + .await + .send_message_all(WebsocketMessage::RefreshMessages) + .await + } + old_ids = ids; + tokio::time::sleep(poll_time).await; + } +} + +#[derive(Clone)] +struct AppState { + nm: Notmuch, + connection_tracker: Arc>, +} + #[tokio::main] async fn main() -> Result<(), Box> { let _guard = xtracing::init(env!("CARGO_BIN_NAME"))?; @@ -309,41 +231,32 @@ async fn main() -> Result<(), Box> { let schema = schema.extension(extensions::Logger).finish(); - let conn_tracker = Arc::new(Mutex::new(ConnectionTracker::default())); - async fn watch_new( - nm: Notmuch, - pool: PgPool, - conn_tracker: Arc>, - poll_time: Duration, - ) -> Result<(), async_graphql::Error> { - let mut old_ids = Vec::new(); - loop { - let ids = compute_catchup_ids(&nm, &pool, "is:unread").await?; - if old_ids != ids { - info!("old_ids: {old_ids:?}\n ids: {ids:?}"); - conn_tracker - .lock() - .await - .send_message_all(WebsocketMessage::RefreshMessages) - .await - } - old_ids = ids; - tokio::time::sleep(poll_time).await; - } - } - let ct = Arc::clone(&conn_tracker); + let connection_tracker = Arc::new(Mutex::new(ConnectionTracker::default())); + let ct = Arc::clone(&connection_tracker); let poll_time = Duration::from_secs(10); - let _h = tokio::spawn(watch_new(nm, pool, ct, poll_time)); + let _h = tokio::spawn(watch_new(nm.clone(), pool, ct, poll_time)); let app = Router::new() .route("/test", get(test_handler)) + .route( + "/api/download/attachment/{id}/{idx}/{*rest}", + get(download_attachment), + ) + .route( + "/api/view/attachment/{id}/{idx}/{*rest}", + get(view_attachment), + ) + .route("/api/cid/{id}/{cid}", get(view_cid)) .route("/api/ws", any(start_ws)) .route_service("/api/graphql/ws", GraphQLSubscription::new(schema.clone())) .route( "/api/graphql/", get(graphiql).post_service(GraphQL::new(schema.clone())), ) - .with_state(conn_tracker) + .with_state(AppState { + nm, + connection_tracker, + }) .layer( TraceLayer::new_for_http() .make_span_with(DefaultMakeSpan::default().include_headers(true)), diff --git a/server/src/graphql.rs b/server/src/graphql.rs index 2371dc2..8d58f6e 100644 --- a/server/src/graphql.rs +++ b/server/src/graphql.rs @@ -2,7 +2,7 @@ use std::{fmt, str::FromStr}; use async_graphql::{ connection::{self, Connection, Edge, OpaqueCursor}, - futures_util::{Stream, StreamExt}, + futures_util::Stream, Context, Enum, Error, FieldResult, InputObject, Object, Schema, SimpleObject, Subscription, Union, }; @@ -642,7 +642,7 @@ impl MutationRoot { pub struct SubscriptionRoot; #[Subscription] impl SubscriptionRoot { - async fn values(&self, ctx: &Context<'_>) -> Result, Error> { + async fn values(&self, _ctx: &Context<'_>) -> Result, Error> { Ok(stream::iter(0..10)) } } diff --git a/server/src/lib.rs b/server/src/lib.rs index a2d20d4..99352e8 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -1,7 +1,6 @@ pub mod config; pub mod error; pub mod graphql; -pub mod mail; pub mod newsreader; pub mod nm; pub mod ws; @@ -20,6 +19,7 @@ use std::{ use async_trait::async_trait; use cacher::{Cacher, FilesystemCacher}; use css_inline::{CSSInliner, InlineError, InlineOptions}; +pub use error::ServerError; use linkify::{LinkFinder, LinkKind}; use log::{debug, error, info, warn}; use lol_html::{ @@ -35,7 +35,6 @@ use thiserror::Error; use url::Url; use crate::{ - error::ServerError, graphql::{Corpus, ThreadSummary}, newsreader::is_newsreader_thread, nm::is_notmuch_thread_or_id, diff --git a/server/src/mail.rs b/server/src/mail.rs deleted file mode 100644 index 19bd583..0000000 --- a/server/src/mail.rs +++ /dev/null @@ -1,111 +0,0 @@ -use std::{fs::File, io::Read}; - -use mailparse::{ - addrparse_header, dateparse, parse_mail, MailHeaderMap, MailParseError, ParsedMail, -}; -use sqlx::postgres::PgPool; -use thiserror::Error; -use tracing::info; - -#[derive(Error, Debug)] -pub enum MailError { - #[error("missing from header")] - MissingFrom, - #[error("missing from header display name")] - MissingFromDisplayName, - #[error("missing subject header")] - MissingSubject, - #[error("missing html part")] - MissingHtmlPart, - #[error("missing message ID")] - MissingMessageId, - #[error("missing date")] - MissingDate, - #[error("DB error {0}")] - SqlxError(#[from] sqlx::Error), - #[error("IO error {0}")] - IOError(#[from] std::io::Error), - #[error("mail parse error {0}")] - MailParseError(#[from] MailParseError), -} - -pub async fn read_mail_to_db(pool: &PgPool, path: &str) -> Result<(), MailError> { - let mut file = File::open(path)?; - let mut buffer = Vec::new(); - file.read_to_end(&mut buffer)?; - let m = parse_mail(&buffer)?; - - let subject = m - .headers - .get_first_value("subject") - .ok_or(MailError::MissingSubject)?; - - let from = addrparse_header( - m.headers - .get_first_header("from") - .ok_or(MailError::MissingFrom)?, - )?; - let from = from.extract_single_info().ok_or(MailError::MissingFrom)?; - let name = from.display_name.ok_or(MailError::MissingFromDisplayName)?; - let slug = name.to_lowercase().replace(' ', "-"); - let url = from.addr; - let message_id = m - .headers - .get_first_value("Message-ID") - .ok_or(MailError::MissingMessageId)?; - let uid = &message_id; - let feed_id = find_feed(&pool, &name, &slug, &url).await?; - let date = dateparse( - &m.headers - .get_first_value("Date") - .ok_or(MailError::MissingDate)?, - )?; - - if let Some(_m) = first_html(&m) { - info!("add email {slug} {subject} {message_id} {date} {uid} {url}"); - } else { - return Err(MailError::MissingHtmlPart.into()); - } - - Ok(()) -} -fn first_html<'m>(m: &'m ParsedMail<'m>) -> Option<&'m ParsedMail<'m>> { - for ele in m.parts() { - if ele.ctype.mimetype == "text/html" { - return Some(ele); - } - } - None -} -async fn find_feed(pool: &PgPool, name: &str, slug: &str, url: &str) -> Result { - match sqlx::query!( - r#" -SELECT id -FROM feed -WHERE slug = $1 - "#, - slug - ) - .fetch_one(pool) - .await - { - Err(sqlx::Error::RowNotFound) => { - let rec = sqlx::query!( - r#" -INSERT INTO feed ( name, slug, url, homepage, selector ) -VALUES ( $1, $2, $3, '', '' ) -RETURNING id - "#, - name, - slug, - url - ) - .fetch_one(pool) - .await?; - - return Ok(rec.id); - } - Ok(rec) => return Ok(rec.id), - Err(e) => return Err(e.into()), - }; -} diff --git a/server/src/nm.rs b/server/src/nm.rs index 2e1947d..dfc22a0 100644 --- a/server/src/nm.rs +++ b/server/src/nm.rs @@ -338,7 +338,7 @@ pub async fn thread( } fn email_addresses( - path: &str, + _path: &str, m: &ParsedMail, header_name: &str, ) -> Result, ServerError> { diff --git a/server/src/ws.rs b/server/src/ws.rs index bf81620..ab8a748 100644 --- a/server/src/ws.rs +++ b/server/src/ws.rs @@ -10,10 +10,11 @@ pub struct ConnectionTracker { } impl ConnectionTracker { - pub fn add_peer(&mut self, socket: WebSocket, who: SocketAddr) { + pub async fn add_peer(&mut self, socket: WebSocket, who: SocketAddr) { warn!("adding {who:?} to connection tracker"); self.peers.insert(who, socket); - self.send_message_all(WebsocketMessage::RefreshMessages); + self.send_message_all(WebsocketMessage::RefreshMessages) + .await; } pub async fn send_message_all(&mut self, msg: WebsocketMessage) { info!("send_message_all {msg}");