diff --git a/server/src/bin/letterbox-server.rs b/server/src/bin/letterbox-server.rs index 2af5147..37d656f 100644 --- a/server/src/bin/letterbox-server.rs +++ b/server/src/bin/letterbox-server.rs @@ -21,7 +21,7 @@ use letterbox_notmuch::Notmuch; use letterbox_server::tantivy::TantivyConnection; use letterbox_server::{ graphql::{compute_catchup_ids, Attachment, MutationRoot, QueryRoot, SubscriptionRoot}, - nm::{attachment_bytes, cid_attachment_bytes}, + nm::{attachment_bytes, cid_attachment_bytes, label_unprocessed}, ws::ConnectionTracker, }; use letterbox_shared::WebsocketMessage; @@ -29,7 +29,7 @@ use serde::Deserialize; use sqlx::postgres::PgPool; use tokio::{net::TcpListener, sync::Mutex}; use tower_http::trace::{DefaultMakeSpan, TraceLayer}; -use tracing::info; +use tracing::{error, info}; // Make our own error that wraps `ServerError`. struct AppError(letterbox_server::ServerError); @@ -176,11 +176,15 @@ async fn start_ws( #[derive(Debug, Deserialize)] struct NotificationParams { delay_ms: Option, + num_unprocessed: Option, } async fn send_refresh_websocket_handler( State(AppState { - connection_tracker, .. + nm, + pool, + connection_tracker, + .. }): State, params: Query, ) -> impl IntoResponse { @@ -190,6 +194,15 @@ async fn send_refresh_websocket_handler( info!("sleeping {delay:?}"); tokio::time::sleep(delay).await; } + let limit = match params.num_unprocessed { + Some(0) => None, + Some(limit) => Some(limit), + None => Some(10), + }; + + if let Err(err) = label_unprocessed(&nm, &pool, false, limit, "tag:unprocessed").await { + error!("Failed to label_unprocessed: {err:?}"); + }; connection_tracker .lock() .await @@ -204,18 +217,33 @@ async fn watch_new( conn_tracker: Arc>, poll_time: Duration, ) -> Result<(), async_graphql::Error> { - let mut old_ids = Vec::new(); - loop { + async fn watch_new_iteration( + nm: &Notmuch, + pool: &PgPool, + conn_tracker: Arc>, + old_ids: &[String], + ) -> Result, async_graphql::Error> { let ids = compute_catchup_ids(&nm, &pool, "is:unread").await?; + info!("old_ids: {} ids: {}", old_ids.len(), ids.len()); if old_ids != ids { - info!("old_ids: {old_ids:?}\n ids: {ids:?}"); + label_unprocessed(&nm, &pool, false, Some(100), "tag:unprocessed").await?; conn_tracker .lock() .await .send_message_all(WebsocketMessage::RefreshMessages) .await } - old_ids = ids; + Ok(ids) + } + let mut old_ids = Vec::new(); + loop { + old_ids = match watch_new_iteration(&nm, &pool, conn_tracker.clone(), &old_ids).await { + Ok(old_ids) => old_ids, + Err(err) => { + error!("watch_new_iteration failed: {err:?}"); + continue; + } + }; tokio::time::sleep(poll_time).await; } } @@ -223,6 +251,7 @@ async fn watch_new( #[derive(Clone)] struct AppState { nm: Notmuch, + pool: PgPool, connection_tracker: Arc>, } @@ -263,7 +292,7 @@ async fn main() -> Result<(), Box> { let connection_tracker = Arc::new(Mutex::new(ConnectionTracker::default())); let ct = Arc::clone(&connection_tracker); let poll_time = Duration::from_secs(60); - let _h = tokio::spawn(watch_new(nm.clone(), pool, ct, poll_time)); + let _h = tokio::spawn(watch_new(nm.clone(), pool.clone(), ct, poll_time)); let api_routes = Router::new() .route( @@ -288,6 +317,7 @@ async fn main() -> Result<(), Box> { .nest("/notification", notification_routes) .with_state(AppState { nm, + pool, connection_tracker, }) .layer( diff --git a/web/src/state.rs b/web/src/state.rs index d68425d..2a902ad 100644 --- a/web/src/state.rs +++ b/web/src/state.rs @@ -671,7 +671,7 @@ pub fn update(msg: Msg, model: &mut Model, orders: &mut impl Orders) { } Msg::WebsocketMessage(msg) => { match msg { - WebsocketMessage::RefreshMessages => orders.send_msg(Msg::RefreshStart), + WebsocketMessage::RefreshMessages => orders.send_msg(Msg::Refresh), }; } }