Compare commits

..

No commits in common. "f6d5d3755b5f82712ed3b010a459e7f37f838246" and "c10ad00ca77808294e714c10edb2a2b4291c5c39" have entirely different histories.

4 changed files with 15 additions and 45 deletions

10
Cargo.lock generated
View File

@ -3034,7 +3034,7 @@ dependencies = [
[[package]] [[package]]
name = "letterbox-notmuch" name = "letterbox-notmuch"
version = "0.17.5" version = "0.17.4"
dependencies = [ dependencies = [
"itertools", "itertools",
"log", "log",
@ -3049,7 +3049,7 @@ dependencies = [
[[package]] [[package]]
name = "letterbox-procmail2notmuch" name = "letterbox-procmail2notmuch"
version = "0.17.5" version = "0.17.4"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"clap", "clap",
@ -3062,7 +3062,7 @@ dependencies = [
[[package]] [[package]]
name = "letterbox-server" name = "letterbox-server"
version = "0.17.5" version = "0.17.4"
dependencies = [ dependencies = [
"ammonia", "ammonia",
"anyhow", "anyhow",
@ -3121,7 +3121,7 @@ dependencies = [
[[package]] [[package]]
name = "letterbox-shared" name = "letterbox-shared"
version = "0.17.5" version = "0.17.4"
dependencies = [ dependencies = [
"build-info", "build-info",
"letterbox-notmuch 0.17.3", "letterbox-notmuch 0.17.3",
@ -3134,7 +3134,7 @@ dependencies = [
[[package]] [[package]]
name = "letterbox-web" name = "letterbox-web"
version = "0.17.5" version = "0.17.4"
dependencies = [ dependencies = [
"build-info", "build-info",
"build-info-build", "build-info-build",

View File

@ -8,7 +8,7 @@ authors = ["Bill Thiede <git@xinu.tv>"]
edition = "2021" edition = "2021"
license = "UNLICENSED" license = "UNLICENSED"
publish = ["xinu"] publish = ["xinu"]
version = "0.17.5" version = "0.17.4"
repository = "https://git.z.xinu.tv/wathiede/letterbox" repository = "https://git.z.xinu.tv/wathiede/letterbox"
[profile.dev] [profile.dev]

View File

@ -21,7 +21,7 @@ use letterbox_notmuch::Notmuch;
use letterbox_server::tantivy::TantivyConnection; use letterbox_server::tantivy::TantivyConnection;
use letterbox_server::{ use letterbox_server::{
graphql::{compute_catchup_ids, Attachment, MutationRoot, QueryRoot, SubscriptionRoot}, graphql::{compute_catchup_ids, Attachment, MutationRoot, QueryRoot, SubscriptionRoot},
nm::{attachment_bytes, cid_attachment_bytes, label_unprocessed}, nm::{attachment_bytes, cid_attachment_bytes},
ws::ConnectionTracker, ws::ConnectionTracker,
}; };
use letterbox_shared::WebsocketMessage; use letterbox_shared::WebsocketMessage;
@ -29,7 +29,7 @@ use serde::Deserialize;
use sqlx::postgres::PgPool; use sqlx::postgres::PgPool;
use tokio::{net::TcpListener, sync::Mutex}; use tokio::{net::TcpListener, sync::Mutex};
use tower_http::trace::{DefaultMakeSpan, TraceLayer}; use tower_http::trace::{DefaultMakeSpan, TraceLayer};
use tracing::{error, info}; use tracing::info;
// Make our own error that wraps `ServerError`. // Make our own error that wraps `ServerError`.
struct AppError(letterbox_server::ServerError); struct AppError(letterbox_server::ServerError);
@ -176,15 +176,11 @@ async fn start_ws(
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
struct NotificationParams { struct NotificationParams {
delay_ms: Option<u64>, delay_ms: Option<u64>,
num_unprocessed: Option<usize>,
} }
async fn send_refresh_websocket_handler( async fn send_refresh_websocket_handler(
State(AppState { State(AppState {
nm, connection_tracker, ..
pool,
connection_tracker,
..
}): State<AppState>, }): State<AppState>,
params: Query<NotificationParams>, params: Query<NotificationParams>,
) -> impl IntoResponse { ) -> impl IntoResponse {
@ -194,15 +190,6 @@ async fn send_refresh_websocket_handler(
info!("sleeping {delay:?}"); info!("sleeping {delay:?}");
tokio::time::sleep(delay).await; tokio::time::sleep(delay).await;
} }
let limit = match params.num_unprocessed {
Some(0) => None,
Some(limit) => Some(limit),
None => Some(10),
};
if let Err(err) = label_unprocessed(&nm, &pool, false, limit, "tag:unprocessed").await {
error!("Failed to label_unprocessed: {err:?}");
};
connection_tracker connection_tracker
.lock() .lock()
.await .await
@ -217,33 +204,18 @@ async fn watch_new(
conn_tracker: Arc<Mutex<ConnectionTracker>>, conn_tracker: Arc<Mutex<ConnectionTracker>>,
poll_time: Duration, poll_time: Duration,
) -> Result<(), async_graphql::Error> { ) -> Result<(), async_graphql::Error> {
async fn watch_new_iteration( let mut old_ids = Vec::new();
nm: &Notmuch, loop {
pool: &PgPool,
conn_tracker: Arc<Mutex<ConnectionTracker>>,
old_ids: &[String],
) -> Result<Vec<String>, async_graphql::Error> {
let ids = compute_catchup_ids(&nm, &pool, "is:unread").await?; let ids = compute_catchup_ids(&nm, &pool, "is:unread").await?;
info!("old_ids: {} ids: {}", old_ids.len(), ids.len());
if old_ids != ids { if old_ids != ids {
label_unprocessed(&nm, &pool, false, Some(100), "tag:unprocessed").await?; info!("old_ids: {old_ids:?}\n ids: {ids:?}");
conn_tracker conn_tracker
.lock() .lock()
.await .await
.send_message_all(WebsocketMessage::RefreshMessages) .send_message_all(WebsocketMessage::RefreshMessages)
.await .await
} }
Ok(ids) old_ids = ids;
}
let mut old_ids = Vec::new();
loop {
old_ids = match watch_new_iteration(&nm, &pool, conn_tracker.clone(), &old_ids).await {
Ok(old_ids) => old_ids,
Err(err) => {
error!("watch_new_iteration failed: {err:?}");
continue;
}
};
tokio::time::sleep(poll_time).await; tokio::time::sleep(poll_time).await;
} }
} }
@ -251,7 +223,6 @@ async fn watch_new(
#[derive(Clone)] #[derive(Clone)]
struct AppState { struct AppState {
nm: Notmuch, nm: Notmuch,
pool: PgPool,
connection_tracker: Arc<Mutex<ConnectionTracker>>, connection_tracker: Arc<Mutex<ConnectionTracker>>,
} }
@ -292,7 +263,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let connection_tracker = Arc::new(Mutex::new(ConnectionTracker::default())); let connection_tracker = Arc::new(Mutex::new(ConnectionTracker::default()));
let ct = Arc::clone(&connection_tracker); let ct = Arc::clone(&connection_tracker);
let poll_time = Duration::from_secs(60); let poll_time = Duration::from_secs(60);
let _h = tokio::spawn(watch_new(nm.clone(), pool.clone(), ct, poll_time)); let _h = tokio::spawn(watch_new(nm.clone(), pool, ct, poll_time));
let api_routes = Router::new() let api_routes = Router::new()
.route( .route(
@ -317,7 +288,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
.nest("/notification", notification_routes) .nest("/notification", notification_routes)
.with_state(AppState { .with_state(AppState {
nm, nm,
pool,
connection_tracker, connection_tracker,
}) })
.layer( .layer(

View File

@ -671,7 +671,7 @@ pub fn update(msg: Msg, model: &mut Model, orders: &mut impl Orders<Msg>) {
} }
Msg::WebsocketMessage(msg) => { Msg::WebsocketMessage(msg) => {
match msg { match msg {
WebsocketMessage::RefreshMessages => orders.send_msg(Msg::Refresh), WebsocketMessage::RefreshMessages => orders.send_msg(Msg::RefreshStart),
}; };
} }
} }