Compare commits
14 Commits
letterbox-
...
letterbox-
| Author | SHA1 | Date | |
|---|---|---|---|
| 4244fa0d82 | |||
| 4b15e71893 | |||
| 1bbebad01b | |||
| 27edffd090 | |||
| 08212a9f78 | |||
| 877ec6c4b0 | |||
| 3ce92d6bdf | |||
| 1a28bb2021 | |||
| b86f72f75c | |||
| 1a8b98d420 | |||
| 383a7d800f | |||
| 453561140a | |||
| f6d5d3755b | |||
| 5226fe090e |
33
Cargo.lock
generated
33
Cargo.lock
generated
@@ -3020,9 +3020,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "letterbox-notmuch"
|
||||
version = "0.17.3"
|
||||
version = "0.17.9"
|
||||
source = "sparse+https://git.z.xinu.tv/api/packages/wathiede/cargo/"
|
||||
checksum = "660e35d98139603d764aba884faea20b6ebd43864afd8a70ee0864d161cb7089"
|
||||
checksum = "aea61000b7ea6d2cca754dea71bafb54aa913a98f30f7b01836cce1414af5614"
|
||||
dependencies = [
|
||||
"log",
|
||||
"mailparse",
|
||||
@@ -3034,7 +3034,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "letterbox-notmuch"
|
||||
version = "0.17.4"
|
||||
version = "0.17.11"
|
||||
dependencies = [
|
||||
"itertools",
|
||||
"log",
|
||||
@@ -3049,12 +3049,12 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "letterbox-procmail2notmuch"
|
||||
version = "0.17.4"
|
||||
version = "0.17.11"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"clap",
|
||||
"letterbox-notmuch 0.17.3",
|
||||
"letterbox-shared 0.17.3",
|
||||
"letterbox-notmuch 0.17.9",
|
||||
"letterbox-shared 0.17.9",
|
||||
"serde",
|
||||
"sqlx",
|
||||
"tokio 1.44.2",
|
||||
@@ -3062,7 +3062,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "letterbox-server"
|
||||
version = "0.17.4"
|
||||
version = "0.17.11"
|
||||
dependencies = [
|
||||
"ammonia",
|
||||
"anyhow",
|
||||
@@ -3080,8 +3080,8 @@ dependencies = [
|
||||
"futures 0.3.31",
|
||||
"headers",
|
||||
"html-escape",
|
||||
"letterbox-notmuch 0.17.3",
|
||||
"letterbox-shared 0.17.3",
|
||||
"letterbox-notmuch 0.17.11",
|
||||
"letterbox-shared 0.17.11",
|
||||
"linkify",
|
||||
"log",
|
||||
"lol_html",
|
||||
@@ -3106,12 +3106,12 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "letterbox-shared"
|
||||
version = "0.17.3"
|
||||
version = "0.17.9"
|
||||
source = "sparse+https://git.z.xinu.tv/api/packages/wathiede/cargo/"
|
||||
checksum = "e30cefa3eebb0b15b077527072a4a53dbde8dd6cb513b20e78f036a84c86329a"
|
||||
checksum = "359b4c3ab6b8a91d9a66798b3ee87285f102b7820d38f1d5d3f4be4ea7480803"
|
||||
dependencies = [
|
||||
"build-info",
|
||||
"letterbox-notmuch 0.17.3",
|
||||
"letterbox-notmuch 0.17.9",
|
||||
"regex",
|
||||
"serde",
|
||||
"sqlx",
|
||||
@@ -3121,10 +3121,10 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "letterbox-shared"
|
||||
version = "0.17.4"
|
||||
version = "0.17.11"
|
||||
dependencies = [
|
||||
"build-info",
|
||||
"letterbox-notmuch 0.17.3",
|
||||
"letterbox-notmuch 0.17.11",
|
||||
"regex",
|
||||
"serde",
|
||||
"sqlx",
|
||||
@@ -3134,7 +3134,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "letterbox-web"
|
||||
version = "0.17.4"
|
||||
version = "0.17.11"
|
||||
dependencies = [
|
||||
"build-info",
|
||||
"build-info-build",
|
||||
@@ -3146,8 +3146,7 @@ dependencies = [
|
||||
"graphql_client",
|
||||
"human_format",
|
||||
"itertools",
|
||||
"letterbox-notmuch 0.17.3",
|
||||
"letterbox-shared 0.17.3",
|
||||
"letterbox-shared 0.17.9",
|
||||
"log",
|
||||
"seed",
|
||||
"seed_hooks",
|
||||
|
||||
@@ -8,7 +8,7 @@ authors = ["Bill Thiede <git@xinu.tv>"]
|
||||
edition = "2021"
|
||||
license = "UNLICENSED"
|
||||
publish = ["xinu"]
|
||||
version = "0.17.4"
|
||||
version = "0.17.11"
|
||||
repository = "https://git.z.xinu.tv/wathiede/letterbox"
|
||||
|
||||
[profile.dev]
|
||||
|
||||
@@ -503,15 +503,28 @@ impl Notmuch {
|
||||
self.tags_for_query("*")
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(tag=tag,search_term=search_term))]
|
||||
pub fn tag_add(&self, tag: &str, search_term: &str) -> Result<(), NotmuchError> {
|
||||
self.run_notmuch(["tag", &format!("+{tag}"), search_term])?;
|
||||
self.tags_add(tag, &[search_term])
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(tag=tag,search_term=?search_term))]
|
||||
pub fn tags_add(&self, tag: &str, search_term: &[&str]) -> Result<(), NotmuchError> {
|
||||
let tag = format!("+{tag}");
|
||||
let mut args = vec!["tag", &tag];
|
||||
args.extend(search_term);
|
||||
self.run_notmuch(&args)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(tag=tag,search_term=search_term))]
|
||||
pub fn tag_remove(&self, tag: &str, search_term: &str) -> Result<(), NotmuchError> {
|
||||
self.run_notmuch(["tag", &format!("-{tag}"), search_term])?;
|
||||
self.tags_remove(tag, &[search_term])
|
||||
}
|
||||
#[instrument(skip_all, fields(tag=tag,search_term=?search_term))]
|
||||
pub fn tags_remove(&self, tag: &str, search_term: &[&str]) -> Result<(), NotmuchError> {
|
||||
let tag = format!("-{tag}");
|
||||
let mut args = vec!["tag", &tag];
|
||||
args.extend(search_term);
|
||||
self.run_notmuch(&args)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -13,8 +13,8 @@ version.workspace = true
|
||||
[dependencies]
|
||||
anyhow = "1.0.98"
|
||||
clap = { version = "4.5.37", features = ["derive", "env"] }
|
||||
letterbox-notmuch = { version = "0.17.2", registry = "xinu" }
|
||||
letterbox-shared = { version = "0.17.2", registry = "xinu" }
|
||||
letterbox-notmuch = { version = "0.17.9", registry = "xinu" }
|
||||
letterbox-shared = { version = "0.17.9", registry = "xinu" }
|
||||
serde = { version = "1.0.219", features = ["derive"] }
|
||||
sqlx = { version = "0.8.5", features = ["postgres", "runtime-tokio"] }
|
||||
tokio = { version = "1.44.2", features = ["rt", "macros", "rt-multi-thread"] }
|
||||
|
||||
@@ -27,8 +27,8 @@ css-inline = "0.14.4"
|
||||
futures = "0.3.31"
|
||||
headers = "0.4.0"
|
||||
html-escape = "0.2.13"
|
||||
letterbox-notmuch = { version = "0.17.2", registry = "xinu" }
|
||||
letterbox-shared = { version = "0.17.2", registry = "xinu" }
|
||||
letterbox-notmuch = { path = "../notmuch", version = "0.17.11", registry = "xinu" }
|
||||
letterbox-shared = { path = "../shared", version = "0.17.11", registry = "xinu" }
|
||||
linkify = "0.10.0"
|
||||
log = "0.4.27"
|
||||
lol_html = "2.3.0"
|
||||
|
||||
@@ -21,7 +21,7 @@ use letterbox_notmuch::Notmuch;
|
||||
use letterbox_server::tantivy::TantivyConnection;
|
||||
use letterbox_server::{
|
||||
graphql::{compute_catchup_ids, Attachment, MutationRoot, QueryRoot, SubscriptionRoot},
|
||||
nm::{attachment_bytes, cid_attachment_bytes},
|
||||
nm::{attachment_bytes, cid_attachment_bytes, label_unprocessed},
|
||||
ws::ConnectionTracker,
|
||||
};
|
||||
use letterbox_shared::WebsocketMessage;
|
||||
@@ -29,7 +29,7 @@ use serde::Deserialize;
|
||||
use sqlx::postgres::PgPool;
|
||||
use tokio::{net::TcpListener, sync::Mutex};
|
||||
use tower_http::trace::{DefaultMakeSpan, TraceLayer};
|
||||
use tracing::info;
|
||||
use tracing::{error, info};
|
||||
|
||||
// Make our own error that wraps `ServerError`.
|
||||
struct AppError(letterbox_server::ServerError);
|
||||
@@ -176,11 +176,15 @@ async fn start_ws(
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct NotificationParams {
|
||||
delay_ms: Option<u64>,
|
||||
num_unprocessed: Option<usize>,
|
||||
}
|
||||
|
||||
async fn send_refresh_websocket_handler(
|
||||
State(AppState {
|
||||
connection_tracker, ..
|
||||
nm,
|
||||
pool,
|
||||
connection_tracker,
|
||||
..
|
||||
}): State<AppState>,
|
||||
params: Query<NotificationParams>,
|
||||
) -> impl IntoResponse {
|
||||
@@ -190,6 +194,15 @@ async fn send_refresh_websocket_handler(
|
||||
info!("sleeping {delay:?}");
|
||||
tokio::time::sleep(delay).await;
|
||||
}
|
||||
let limit = match params.num_unprocessed {
|
||||
Some(0) => None,
|
||||
Some(limit) => Some(limit),
|
||||
None => Some(10),
|
||||
};
|
||||
|
||||
if let Err(err) = label_unprocessed(&nm, &pool, false, limit, "tag:unprocessed").await {
|
||||
error!("Failed to label_unprocessed: {err:?}");
|
||||
};
|
||||
connection_tracker
|
||||
.lock()
|
||||
.await
|
||||
@@ -204,18 +217,33 @@ async fn watch_new(
|
||||
conn_tracker: Arc<Mutex<ConnectionTracker>>,
|
||||
poll_time: Duration,
|
||||
) -> Result<(), async_graphql::Error> {
|
||||
let mut old_ids = Vec::new();
|
||||
loop {
|
||||
async fn watch_new_iteration(
|
||||
nm: &Notmuch,
|
||||
pool: &PgPool,
|
||||
conn_tracker: Arc<Mutex<ConnectionTracker>>,
|
||||
old_ids: &[String],
|
||||
) -> Result<Vec<String>, async_graphql::Error> {
|
||||
let ids = compute_catchup_ids(&nm, &pool, "is:unread").await?;
|
||||
info!("old_ids: {} ids: {}", old_ids.len(), ids.len());
|
||||
if old_ids != ids {
|
||||
info!("old_ids: {old_ids:?}\n ids: {ids:?}");
|
||||
label_unprocessed(&nm, &pool, false, Some(100), "tag:unprocessed").await?;
|
||||
conn_tracker
|
||||
.lock()
|
||||
.await
|
||||
.send_message_all(WebsocketMessage::RefreshMessages)
|
||||
.await
|
||||
}
|
||||
old_ids = ids;
|
||||
Ok(ids)
|
||||
}
|
||||
let mut old_ids = Vec::new();
|
||||
loop {
|
||||
old_ids = match watch_new_iteration(&nm, &pool, conn_tracker.clone(), &old_ids).await {
|
||||
Ok(old_ids) => old_ids,
|
||||
Err(err) => {
|
||||
error!("watch_new_iteration failed: {err:?}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
tokio::time::sleep(poll_time).await;
|
||||
}
|
||||
}
|
||||
@@ -223,6 +251,7 @@ async fn watch_new(
|
||||
#[derive(Clone)]
|
||||
struct AppState {
|
||||
nm: Notmuch,
|
||||
pool: PgPool,
|
||||
connection_tracker: Arc<Mutex<ConnectionTracker>>,
|
||||
}
|
||||
|
||||
@@ -263,7 +292,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
||||
let connection_tracker = Arc::new(Mutex::new(ConnectionTracker::default()));
|
||||
let ct = Arc::clone(&connection_tracker);
|
||||
let poll_time = Duration::from_secs(60);
|
||||
let _h = tokio::spawn(watch_new(nm.clone(), pool, ct, poll_time));
|
||||
let _h = tokio::spawn(watch_new(nm.clone(), pool.clone(), ct, poll_time));
|
||||
|
||||
let api_routes = Router::new()
|
||||
.route(
|
||||
@@ -288,6 +317,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
||||
.nest("/notification", notification_routes)
|
||||
.with_state(AppState {
|
||||
nm,
|
||||
pool,
|
||||
connection_tracker,
|
||||
})
|
||||
.layer(
|
||||
|
||||
@@ -982,6 +982,8 @@ pub async fn label_unprocessed(
|
||||
} else {
|
||||
&ids[..]
|
||||
};
|
||||
let mut add_mutations = HashMap::new();
|
||||
let mut rm_mutations = HashMap::new();
|
||||
for id in ids {
|
||||
let id = format!("id:{id}");
|
||||
let files = nm.files(&id)?;
|
||||
@@ -1001,20 +1003,63 @@ pub async fn label_unprocessed(
|
||||
.get_first_value("subject")
|
||||
.expect("no subject header")
|
||||
);
|
||||
} else {
|
||||
for t in &add_tags {
|
||||
nm.tag_add(t, &id)?;
|
||||
}
|
||||
if add_tags.contains("spam") || add_tags.contains("Spam") {
|
||||
nm.tag_remove("unread", &id)?;
|
||||
}
|
||||
if !add_tags.contains("inbox") {
|
||||
nm.tag_remove("inbox", &id)?;
|
||||
}
|
||||
nm.tag_remove("unprocessed", &id)?;
|
||||
}
|
||||
for t in &add_tags {
|
||||
//nm.tag_add(t, &id)?;
|
||||
add_mutations
|
||||
.entry(t.to_string())
|
||||
.or_insert_with(|| Vec::new())
|
||||
.push(id.clone());
|
||||
}
|
||||
if add_tags.contains("spam") || add_tags.contains("Spam") {
|
||||
//nm.tag_remove("unread", &id)?;
|
||||
let t = "unread".to_string();
|
||||
rm_mutations
|
||||
.entry(t)
|
||||
.or_insert_with(|| Vec::new())
|
||||
.push(id.clone());
|
||||
}
|
||||
if !add_tags.contains("inbox") {
|
||||
//nm.tag_remove("inbox", &id)?;
|
||||
let t = "inbox".to_string();
|
||||
rm_mutations
|
||||
.entry(t)
|
||||
.or_insert_with(|| Vec::new())
|
||||
.push(id.clone());
|
||||
}
|
||||
//nm.tag_remove("unprocessed", &id)?;
|
||||
} else {
|
||||
if add_tags.is_empty() {
|
||||
let t = "Grey".to_string();
|
||||
add_mutations
|
||||
.entry(t)
|
||||
.or_insert_with(|| Vec::new())
|
||||
.push(id.clone());
|
||||
}
|
||||
let t = "unprocessed".to_string();
|
||||
rm_mutations
|
||||
.entry(t)
|
||||
.or_insert_with(|| Vec::new())
|
||||
.push(id.clone());
|
||||
}
|
||||
}
|
||||
println!("Adding {} distinct labels", add_mutations.len());
|
||||
for (tag, ids) in add_mutations.iter() {
|
||||
println!(" {tag}: {}", ids.len());
|
||||
if !dryrun {
|
||||
let ids: Vec<_> = ids.iter().map(|s| s.as_str()).collect();
|
||||
nm.tags_add(tag, &ids)?;
|
||||
}
|
||||
}
|
||||
println!("Removing {} distinct labels", rm_mutations.len());
|
||||
for (tag, ids) in rm_mutations.iter() {
|
||||
println!(" {tag}: {}", ids.len());
|
||||
if !dryrun {
|
||||
let ids: Vec<_> = ids.iter().map(|s| s.as_str()).collect();
|
||||
nm.tags_remove(tag, &ids)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
fn find_tags<'a, 'b>(rules: &'a [Rule], headers: &'b [MailHeader]) -> (bool, HashSet<&'a str>) {
|
||||
|
||||
@@ -12,7 +12,7 @@ version.workspace = true
|
||||
|
||||
[dependencies]
|
||||
build-info = "0.0.40"
|
||||
letterbox-notmuch = { version = "0.17.2", registry = "xinu" }
|
||||
letterbox-notmuch = { path = "../notmuch", version = "0.17.11", registry = "xinu" }
|
||||
regex = "1.11.1"
|
||||
serde = { version = "1.0.219", features = ["derive"] }
|
||||
sqlx = "0.8.5"
|
||||
|
||||
@@ -33,8 +33,7 @@ wasm-bindgen = "=0.2.100"
|
||||
uuid = { version = "1.16.0", features = [
|
||||
"js",
|
||||
] } # direct dep to set js feature, prevents Rng issues
|
||||
letterbox-shared = { version = "0.17.2", registry = "xinu" }
|
||||
letterbox-notmuch = { version = "0.17.2", registry = "xinu" }
|
||||
letterbox-shared = { version = "0.17.9", registry = "xinu" }
|
||||
seed_hooks = { version = "0.4.1", registry = "xinu" }
|
||||
strum_macros = "0.27.1"
|
||||
gloo-console = "0.3.0"
|
||||
|
||||
@@ -671,7 +671,7 @@ pub fn update(msg: Msg, model: &mut Model, orders: &mut impl Orders<Msg>) {
|
||||
}
|
||||
Msg::WebsocketMessage(msg) => {
|
||||
match msg {
|
||||
WebsocketMessage::RefreshMessages => orders.send_msg(Msg::RefreshStart),
|
||||
WebsocketMessage::RefreshMessages => orders.send_msg(Msg::Refresh),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user