use std::{fmt, str::FromStr}; use async_graphql::{ connection::{self, Connection, Edge, OpaqueCursor}, Context, EmptySubscription, Enum, Error, FieldResult, InputObject, Object, Schema, SimpleObject, Union, }; use cacher::FilesystemCacher; use letterbox_notmuch::Notmuch; use log::info; use serde::{Deserialize, Serialize}; use sqlx::postgres::PgPool; use tokio::join; use tracing::instrument; #[cfg(feature = "tantivy")] use crate::tantivy::TantivyConnection; use crate::{newsreader, nm, Query}; /// # Number of seconds since the Epoch pub type UnixTime = isize; /// # Thread ID, sans "thread:" pub type ThreadId = String; #[derive(Debug, Enum, Copy, Clone, Eq, PartialEq)] pub enum Corpus { Notmuch, Newsreader, Tantivy, } impl FromStr for Corpus { type Err = String; fn from_str(s: &str) -> Result { Ok(match s { "notmuch" => Corpus::Notmuch, "newsreader" => Corpus::Newsreader, "tantivy" => Corpus::Tantivy, s => return Err(format!("unknown corpus: '{s}'")), }) } } // TODO: add is_read field and remove all use of 'tag:unread' #[derive(Debug, SimpleObject)] pub struct ThreadSummary { pub thread: ThreadId, pub timestamp: UnixTime, /// user-friendly timestamp pub date_relative: String, /// number of matched messages pub matched: isize, /// total messages in thread pub total: isize, /// comma-separated names with | between matched and unmatched pub authors: String, pub subject: String, pub tags: Vec, pub corpus: Corpus, } #[derive(Debug, Union)] pub enum Thread { Email(EmailThread), News(NewsPost), } #[derive(Debug, SimpleObject)] pub struct NewsPost { pub thread_id: String, pub is_read: bool, pub slug: String, pub site: String, pub title: String, pub body: String, pub url: String, pub timestamp: i64, } #[derive(Debug, SimpleObject)] pub struct EmailThread { pub thread_id: String, pub subject: String, pub messages: Vec, } #[derive(Debug, SimpleObject)] pub struct Message { // Message-ID for message, prepend `id:` to search in notmuch pub id: String, // First From header found in email pub from: Option, // All To headers found in email pub to: Vec, // All CC headers found in email pub cc: Vec, // First Subject header found in email pub subject: Option, // Parsed Date header, if found and valid pub timestamp: Option, // Headers pub headers: Vec
, // The body contents pub body: Body, // On disk location of message pub path: String, pub attachments: Vec, pub tags: Vec, } // Content-Type: image/jpeg; name="PXL_20231125_204826860.jpg" // Content-Disposition: attachment; filename="PXL_20231125_204826860.jpg" // Content-Transfer-Encoding: base64 // Content-ID: // X-Attachment-Id: f_lponoluo1 #[derive(Default, Debug, SimpleObject)] pub struct Attachment { pub id: String, pub idx: String, pub filename: Option, pub size: usize, pub content_type: Option, pub content_id: Option, pub disposition: DispositionType, pub bytes: Vec, } #[derive(Debug, Clone, Eq, PartialEq)] pub struct Disposition { pub r#type: DispositionType, pub filename: Option, pub size: Option, } #[derive(Debug, Enum, Copy, Clone, Eq, PartialEq)] pub enum DispositionType { Inline, Attachment, } impl From for DispositionType { fn from(value: mailparse::DispositionType) -> Self { match value { mailparse::DispositionType::Inline => DispositionType::Inline, mailparse::DispositionType::Attachment => DispositionType::Attachment, dt => panic!("unhandled DispositionType {dt:?}"), } } } impl Default for DispositionType { fn default() -> Self { DispositionType::Attachment } } #[derive(Debug, SimpleObject)] pub struct Header { pub key: String, pub value: String, } #[derive(Debug)] pub struct UnhandledContentType { pub text: String, pub content_tree: String, } #[Object] impl UnhandledContentType { async fn contents(&self) -> &str { &self.text } async fn content_tree(&self) -> &str { &self.content_tree } } #[derive(Debug)] pub struct PlainText { pub text: String, pub content_tree: String, } #[Object] impl PlainText { async fn contents(&self) -> &str { &self.text } async fn content_tree(&self) -> &str { &self.content_tree } } #[derive(Debug)] pub struct Html { pub html: String, pub content_tree: String, } #[Object] impl Html { async fn contents(&self) -> &str { &self.html } async fn content_tree(&self) -> &str { &self.content_tree } async fn headers(&self) -> Vec
{ Vec::new() } } #[derive(Debug, Union)] pub enum Body { UnhandledContentType(UnhandledContentType), PlainText(PlainText), Html(Html), } impl Body { pub fn html(html: String) -> Body { Body::Html(Html { html, content_tree: "".to_string(), }) } pub fn text(text: String) -> Body { Body::PlainText(PlainText { text, content_tree: "".to_string(), }) } } #[derive(Debug, SimpleObject)] pub struct Email { pub name: Option, pub addr: Option, pub photo_url: Option, } impl fmt::Display for Email { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { match (&self.name, &self.addr) { (Some(name), Some(addr)) => write!(f, "{name} <{addr}>")?, (Some(name), None) => write!(f, "{name}")?, (None, Some(addr)) => write!(f, "{addr}")?, (None, None) => write!(f, "")?, } Ok(()) } } #[derive(SimpleObject)] pub struct Tag { pub name: String, pub fg_color: String, pub bg_color: String, pub unread: usize, } #[derive(Serialize, Deserialize, Debug, InputObject)] struct SearchCursor { newsreader_offset: i32, notmuch_offset: i32, #[cfg(feature = "tantivy")] tantivy_offset: i32, } fn request_id() -> String { let now = std::time::SystemTime::now(); let nanos = now .duration_since(std::time::SystemTime::UNIX_EPOCH) .unwrap_or_default() .as_nanos(); format!("{nanos:x}") } pub struct QueryRoot; #[Object] impl QueryRoot { async fn version<'ctx>(&self, _ctx: &Context<'ctx>) -> Result { build_info::build_info!(fn bi); Ok(letterbox_shared::build_version(bi)) } #[instrument(skip_all, fields(query=query))] #[instrument(skip_all, fields(query=query, request_id=request_id()))] async fn count<'ctx>(&self, ctx: &Context<'ctx>, query: String) -> Result { let nm = ctx.data_unchecked::(); let pool = ctx.data_unchecked::(); #[cfg(feature = "tantivy")] let tantivy = ctx.data_unchecked::(); let newsreader_query: Query = query.parse()?; let newsreader_count = newsreader::count(pool, &newsreader_query).await?; let notmuch_count = nm::count(nm, &newsreader_query).await?; #[cfg(feature = "tantivy")] let tantivy_count = tantivy.count(&newsreader_query).await?; #[cfg(not(feature = "tantivy"))] let tantivy_count = 0; let total = newsreader_count + notmuch_count + tantivy_count; info!("count {newsreader_query:?} newsreader count {newsreader_count} notmuch count {notmuch_count} tantivy count {tantivy_count} total {total}"); Ok(total) } // TODO: this function doesn't get parallelism, possibly because notmuch is sync and blocks, // rewrite that with tokio::process:Command #[instrument(skip_all, fields(query=query, request_id=request_id()))] async fn search<'ctx>( &self, ctx: &Context<'ctx>, after: Option, before: Option, first: Option, last: Option, query: String, ) -> Result, ThreadSummary>, Error> { info!("search({after:?} {before:?} {first:?} {last:?} {query:?})",); let nm = ctx.data_unchecked::(); let pool = ctx.data_unchecked::(); #[cfg(feature = "tantivy")] let tantivy = ctx.data_unchecked::(); Ok(connection::query( after, before, first, last, |after: Option>, before: Option>, first: Option, last: Option| async move { info!( "search(after {:?} before {:?} first {first:?} last {last:?} query: {query:?})", after.as_ref().map(|v| &v.0), before.as_ref().map(|v| &v.0) ); let newsreader_after = after.as_ref().map(|sc| sc.newsreader_offset); let notmuch_after = after.as_ref().map(|sc| sc.notmuch_offset); #[cfg(feature = "tantivy")] let tantivy_after = after.as_ref().map(|sc| sc.tantivy_offset); let newsreader_before = before.as_ref().map(|sc| sc.newsreader_offset); let notmuch_before = before.as_ref().map(|sc| sc.notmuch_offset); #[cfg(feature = "tantivy")] let tantivy_before = before.as_ref().map(|sc| sc.tantivy_offset); let first = first.map(|v| v as i32); let last = last.map(|v| v as i32); let query: Query = query.parse()?; info!("newsreader_query {query:?}"); let newsreader_fut = newsreader_search( pool, newsreader_after, newsreader_before, first, last, &query, ); let notmuch_fut = notmuch_search(nm, notmuch_after, notmuch_before, first, last, &query); #[cfg(feature = "tantivy")] let tantivy_fut = tantivy_search( tantivy, pool, tantivy_after, tantivy_before, first, last, &query, ); #[cfg(not(feature = "tantivy"))] let tantivy_fut = async { Ok::, async_graphql::Error>(Vec::new()) }; let (newsreader_results, notmuch_results, tantivy_results) = join!(newsreader_fut, notmuch_fut, tantivy_fut); let newsreader_results = newsreader_results?; let notmuch_results = notmuch_results?; let tantivy_results = tantivy_results?; info!( "newsreader_results ({}) notmuch_results ({}) tantivy_results ({})", newsreader_results.len(), notmuch_results.len(), tantivy_results.len() ); let mut results: Vec<_> = newsreader_results .into_iter() .chain(notmuch_results) .chain(tantivy_results) .collect(); // The leading '-' is to reverse sort results.sort_by_key(|item| match item { ThreadSummaryCursor::Newsreader(_, ts) => -ts.timestamp, ThreadSummaryCursor::Notmuch(_, ts) => -ts.timestamp, #[cfg(feature = "tantivy")] ThreadSummaryCursor::Tantivy(_, ts) => -ts.timestamp, }); let mut has_next_page = before.is_some(); if let Some(first) = first { let first = first as usize; if results.len() > first { has_next_page = true; results.truncate(first); } } let mut has_previous_page = after.is_some(); if let Some(last) = last { let last = last as usize; if results.len() > last { has_previous_page = true; results.truncate(last); } } let mut connection = Connection::new(has_previous_page, has_next_page); // Set starting offset as the value from cursor to preserve state if no results from a corpus survived the truncation let mut newsreader_offset = after.as_ref().map(|sc| sc.newsreader_offset).unwrap_or(0); let mut notmuch_offset = after.as_ref().map(|sc| sc.notmuch_offset).unwrap_or(0); #[cfg(feature = "tantivy")] let tantivy_offset = after.as_ref().map(|sc| sc.tantivy_offset).unwrap_or(0); info!( "newsreader_offset ({}) notmuch_offset ({})", newsreader_offset, notmuch_offset, ); connection.edges.extend(results.into_iter().map(|item| { let thread_summary; match item { ThreadSummaryCursor::Newsreader(offset, ts) => { thread_summary = ts; newsreader_offset = offset; } ThreadSummaryCursor::Notmuch(offset, ts) => { thread_summary = ts; notmuch_offset = offset; } #[cfg(feature = "tantivy")] ThreadSummaryCursor::Tantivy(offset, ts) => { thread_summary = ts; tantivy_offset = offset; } } let cur = OpaqueCursor(SearchCursor { newsreader_offset, notmuch_offset, #[cfg(feature = "tantivy")] tantivy_offset, }); Edge::new(cur, thread_summary) })); Ok::<_, async_graphql::Error>(connection) }, ) .await?) } #[instrument(skip_all, fields(request_id=request_id()))] async fn tags<'ctx>(&self, ctx: &Context<'ctx>) -> FieldResult> { let nm = ctx.data_unchecked::(); let pool = ctx.data_unchecked::(); let needs_unread = ctx.look_ahead().field("unread").exists(); let mut tags = newsreader::tags(pool, needs_unread).await?; tags.append(&mut nm::tags(nm, needs_unread)?); Ok(tags) } #[instrument(skip_all, fields(thread_id=thread_id, request_id=request_id()))] async fn thread<'ctx>(&self, ctx: &Context<'ctx>, thread_id: String) -> Result { let nm = ctx.data_unchecked::(); let cacher = ctx.data_unchecked::(); let pool = ctx.data_unchecked::(); let debug_content_tree = ctx .look_ahead() .field("messages") .field("body") .field("contentTree") .exists(); if newsreader::is_newsreader_thread(&thread_id) { Ok(newsreader::thread(cacher, pool, thread_id).await?) } else { Ok(nm::thread(nm, pool, thread_id, debug_content_tree).await?) } } } #[derive(Debug)] enum ThreadSummaryCursor { Newsreader(i32, ThreadSummary), Notmuch(i32, ThreadSummary), #[cfg(feature = "tantivy")] Tantivy(i32, ThreadSummary), } async fn newsreader_search( pool: &PgPool, after: Option, before: Option, first: Option, last: Option, query: &Query, ) -> Result, async_graphql::Error> { Ok(newsreader::search(pool, after, before, first, last, &query) .await? .into_iter() .map(|(cur, ts)| ThreadSummaryCursor::Newsreader(cur, ts)) .collect()) } async fn notmuch_search( nm: &Notmuch, after: Option, before: Option, first: Option, last: Option, query: &Query, ) -> Result, async_graphql::Error> { Ok(nm::search(nm, after, before, first, last, &query) .await? .into_iter() .map(|(cur, ts)| ThreadSummaryCursor::Notmuch(cur, ts)) .collect()) } #[cfg(feature = "tantivy")] async fn tantivy_search( tantivy: &TantivyConnection, pool: &PgPool, after: Option, before: Option, first: Option, last: Option, query: &Query, ) -> Result, async_graphql::Error> { Ok(tantivy .search(pool, after, before, first, last, &query) .await? .into_iter() .map(|(cur, ts)| ThreadSummaryCursor::Tantivy(cur, ts)) .collect()) } pub struct Mutation; #[Object] impl Mutation { #[instrument(skip_all, fields(query=query, unread=unread, request_id=request_id()))] async fn set_read_status<'ctx>( &self, ctx: &Context<'ctx>, query: String, unread: bool, ) -> Result { let nm = ctx.data_unchecked::(); let pool = ctx.data_unchecked::(); #[cfg(feature = "tantivy")] let tantivy = ctx.data_unchecked::(); let query: Query = query.parse()?; newsreader::set_read_status(pool, &query, unread).await?; #[cfg(feature = "tantivy")] tantivy.reindex_thread(pool, &query).await?; nm::set_read_status(nm, &query, unread).await?; Ok(true) } #[instrument(skip_all, fields(query=query, tag=tag, request_id=request_id()))] async fn tag_add<'ctx>( &self, ctx: &Context<'ctx>, query: String, tag: String, ) -> Result { let nm = ctx.data_unchecked::(); info!("tag_add({tag}, {query})"); nm.tag_add(&tag, &query)?; Ok(true) } #[instrument(skip_all, fields(query=query, tag=tag, request_id=request_id()))] async fn tag_remove<'ctx>( &self, ctx: &Context<'ctx>, query: String, tag: String, ) -> Result { let nm = ctx.data_unchecked::(); info!("tag_remove({tag}, {query})"); nm.tag_remove(&tag, &query)?; Ok(true) } /// Drop and recreate tantivy index. Warning this is slow #[cfg(feature = "tantivy")] async fn drop_and_load_index<'ctx>(&self, ctx: &Context<'ctx>) -> Result { let tantivy = ctx.data_unchecked::(); let pool = ctx.data_unchecked::(); tantivy.drop_and_load_index()?; tantivy.reindex_all(pool).await?; Ok(true) } #[instrument(skip_all, fields(request_id=request_id()))] async fn refresh<'ctx>(&self, ctx: &Context<'ctx>) -> Result { let nm = ctx.data_unchecked::(); let cacher = ctx.data_unchecked::(); let pool = ctx.data_unchecked::(); info!("{}", String::from_utf8_lossy(&nm.new()?)); newsreader::refresh(pool, cacher).await?; #[cfg(feature = "tantivy")] { let tantivy = ctx.data_unchecked::(); // TODO: parallelize tantivy.refresh(pool).await?; } Ok(true) } } pub type GraphqlSchema = Schema;