diff --git a/Cargo.lock b/Cargo.lock index 356774a..fc48420 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3090,6 +3090,7 @@ dependencies = [ "build-info", "letterbox-notmuch", "serde", + "strum_macros 0.27.1", ] [[package]] diff --git a/notmuch/src/lib.rs b/notmuch/src/lib.rs index f90b7e9..350aad0 100644 --- a/notmuch/src/lib.rs +++ b/notmuch/src/lib.rs @@ -470,7 +470,7 @@ pub enum NotmuchError { MailParseError(#[from] mailparse::MailParseError), } -#[derive(Default)] +#[derive(Clone, Default)] pub struct Notmuch { config_path: Option, } diff --git a/server/src/bin/letterbox-server.rs b/server/src/bin/letterbox-server.rs index 58dfcb1..56b46ac 100644 --- a/server/src/bin/letterbox-server.rs +++ b/server/src/bin/letterbox-server.rs @@ -1,7 +1,7 @@ // Rocket generates a lot of warnings for handlers // TODO: figure out why #![allow(unreachable_patterns)] -use std::{error::Error, io::Cursor, net::SocketAddr, str::FromStr, sync::Arc}; +use std::{error::Error, io::Cursor, net::SocketAddr, str::FromStr, sync::Arc, time::Duration}; use async_graphql::{extensions, http::GraphiQLSource, Schema}; use async_graphql_axum::{GraphQL, GraphQLSubscription}; @@ -21,7 +21,9 @@ use letterbox_server::tantivy::TantivyConnection; use letterbox_server::{ config::Config, error::ServerError, - graphql::{Attachment, GraphqlSchema, MutationRoot, QueryRoot, SubscriptionRoot}, + graphql::{ + compute_catchup_ids, Attachment, GraphqlSchema, MutationRoot, QueryRoot, SubscriptionRoot, + }, nm::{attachment_bytes, cid_attachment_bytes}, ws::ConnectionTracker, }; @@ -294,19 +296,44 @@ async fn main() -> Result<(), Box> { std::fs::create_dir_all(&config.slurp_cache_path)?; } let pool = PgPool::connect(&config.newsreader_database_url).await?; + let nm = Notmuch::default(); sqlx::migrate!("./migrations").run(&pool).await?; #[cfg(feature = "tantivy")] let tantivy_conn = TantivyConnection::new(&config.newsreader_tantivy_db_path)?; let cacher = FilesystemCacher::new(&config.slurp_cache_path)?; let schema = Schema::build(QueryRoot, MutationRoot, SubscriptionRoot) - .data(Notmuch::default()) + .data(nm.clone()) .data(cacher) .data(pool.clone()); let schema = schema.extension(extensions::Logger).finish(); let conn_tracker = Arc::new(Mutex::new(ConnectionTracker::default())); + async fn watch_new( + nm: Notmuch, + pool: PgPool, + conn_tracker: Arc>, + poll_time: Duration, + ) -> Result<(), async_graphql::Error> { + let mut old_ids = Vec::new(); + loop { + let ids = compute_catchup_ids(&nm, &pool, "is:unread").await?; + if old_ids != ids { + info!("old_ids: {old_ids:?}\n ids: {ids:?}"); + conn_tracker + .lock() + .await + .send_message_all(WebsocketMessage::RefreshMessages) + .await + } + old_ids = ids; + tokio::time::sleep(poll_time).await; + } + } + let ct = Arc::clone(&conn_tracker); + let poll_time = Duration::from_secs(10); + let _h = tokio::spawn(watch_new(nm, pool, ct, poll_time)); let app = Router::new() .route("/test", get(test_handler)) diff --git a/server/src/graphql.rs b/server/src/graphql.rs index b1f74bc..2371dc2 100644 --- a/server/src/graphql.rs +++ b/server/src/graphql.rs @@ -319,37 +319,7 @@ impl QueryRoot { ) -> Result, Error> { let nm = ctx.data_unchecked::(); let pool = ctx.data_unchecked::(); - let query: Query = query.parse()?; - // TODO: implement optimized versions of fetching just IDs - let newsreader_fut = newsreader_search(pool, None, None, None, None, &query); - let notmuch_fut = notmuch_search(nm, None, None, None, None, &query); - let (newsreader_results, notmuch_results) = join!(newsreader_fut, notmuch_fut); - - let newsreader_results = newsreader_results?; - let notmuch_results = notmuch_results?; - info!( - "newsreader_results ({}) notmuch_results ({})", - newsreader_results.len(), - notmuch_results.len(), - ); - - let mut results: Vec<_> = newsreader_results - .into_iter() - .chain(notmuch_results) - .collect(); - // The leading '-' is to reverse sort - results.sort_by_key(|item| match item { - ThreadSummaryCursor::Newsreader(_, ts) => -ts.timestamp, - ThreadSummaryCursor::Notmuch(_, ts) => -ts.timestamp, - }); - let ids = results - .into_iter() - .map(|r| match r { - ThreadSummaryCursor::Newsreader(_, ts) => ts.thread, - ThreadSummaryCursor::Notmuch(_, ts) => ts.thread, - }) - .collect(); - Ok(ids) + compute_catchup_ids(nm, pool, &query).await } // TODO: this function doesn't get parallelism, possibly because notmuch is sync and blocks, @@ -649,7 +619,6 @@ impl MutationRoot { tantivy.drop_and_load_index()?; tantivy.reindex_all(pool).await?; - println("hit"); Ok(true) } @@ -679,3 +648,42 @@ impl SubscriptionRoot { } pub type GraphqlSchema = Schema; + +#[instrument(skip_all, fields(query=query))] +pub async fn compute_catchup_ids( + nm: &Notmuch, + pool: &PgPool, + query: &str, +) -> Result, Error> { + let query: Query = query.parse()?; + // TODO: implement optimized versions of fetching just IDs + let newsreader_fut = newsreader_search(pool, None, None, None, None, &query); + let notmuch_fut = notmuch_search(nm, None, None, None, None, &query); + let (newsreader_results, notmuch_results) = join!(newsreader_fut, notmuch_fut); + + let newsreader_results = newsreader_results?; + let notmuch_results = notmuch_results?; + info!( + "newsreader_results ({}) notmuch_results ({})", + newsreader_results.len(), + notmuch_results.len(), + ); + + let mut results: Vec<_> = newsreader_results + .into_iter() + .chain(notmuch_results) + .collect(); + // The leading '-' is to reverse sort + results.sort_by_key(|item| match item { + ThreadSummaryCursor::Newsreader(_, ts) => -ts.timestamp, + ThreadSummaryCursor::Notmuch(_, ts) => -ts.timestamp, + }); + let ids = results + .into_iter() + .map(|r| match r { + ThreadSummaryCursor::Newsreader(_, ts) => ts.thread, + ThreadSummaryCursor::Notmuch(_, ts) => ts.thread, + }) + .collect(); + Ok(ids) +} diff --git a/server/src/lib.rs b/server/src/lib.rs index 1cd0d80..a2d20d4 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -445,19 +445,16 @@ pub fn sanitize_html( let mut element_content_handlers = vec![ // Remove width and height attributes on elements element!("[width],[height]", |el| { - println!("width or height {el:?}"); el.remove_attribute("width"); el.remove_attribute("height"); Ok(()) }), // Remove width and height values from inline styles element!("[style]", |el| { - println!("style {el:?}"); let style = el.get_attribute("style").unwrap(); let style = style .split(";") .filter(|s| { - println!("s {s}"); let Some((k, _)) = s.split_once(':') else { return true; }; @@ -469,7 +466,6 @@ pub fn sanitize_html( }) .collect::>() .join(";"); - println!("style: {style}"); if let Err(e) = el.set_attribute("style", &style) { error!("Failed to set style attribute: {e}"); } diff --git a/server/src/mail.rs b/server/src/mail.rs index bb369a3..19bd583 100644 --- a/server/src/mail.rs +++ b/server/src/mail.rs @@ -61,8 +61,6 @@ pub async fn read_mail_to_db(pool: &PgPool, path: &str) -> Result<(), MailError> .ok_or(MailError::MissingDate)?, )?; - println!("Feed: {feed_id} Subject: {}", subject); - if let Some(_m) = first_html(&m) { info!("add email {slug} {subject} {message_id} {date} {uid} {url}"); } else { diff --git a/server/src/nm.rs b/server/src/nm.rs index a9fe62d..2e1947d 100644 --- a/server/src/nm.rs +++ b/server/src/nm.rs @@ -349,9 +349,7 @@ fn email_addresses( for ma in mal.into_inner() { match ma { mailparse::MailAddr::Group(gi) => { - if !gi.group_name.contains("ndisclosed") { - println!("[{path}][{header_name}] Group: {gi}"); - } + if !gi.group_name.contains("ndisclosed") {} } mailparse::MailAddr::Single(s) => addrs.push(Email { name: s.display_name, diff --git a/server/src/ws.rs b/server/src/ws.rs index e4dba8e..bf81620 100644 --- a/server/src/ws.rs +++ b/server/src/ws.rs @@ -13,8 +13,10 @@ impl ConnectionTracker { pub fn add_peer(&mut self, socket: WebSocket, who: SocketAddr) { warn!("adding {who:?} to connection tracker"); self.peers.insert(who, socket); + self.send_message_all(WebsocketMessage::RefreshMessages); } pub async fn send_message_all(&mut self, msg: WebsocketMessage) { + info!("send_message_all {msg}"); let m = serde_json::to_string(&msg).expect("failed to json encode WebsocketMessage"); let mut bad_peers = Vec::new(); for (who, socket) in &mut self.peers.iter_mut() { diff --git a/shared/Cargo.toml b/shared/Cargo.toml index 9a596a8..578dfca 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -14,3 +14,4 @@ version.workspace = true build-info = "0.0.40" letterbox-notmuch = { version = "0.12.1", path = "../notmuch", registry = "xinu" } serde = { version = "1.0.147", features = ["derive"] } +strum_macros = "0.27.1" diff --git a/shared/src/lib.rs b/shared/src/lib.rs index 3724eb6..5b97d7a 100644 --- a/shared/src/lib.rs +++ b/shared/src/lib.rs @@ -13,7 +13,7 @@ pub struct SearchResult { pub total: usize, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, strum_macros::Display)] pub enum WebsocketMessage { RefreshMessages, }