Add mail tagging support

This commit is contained in:
2025-04-21 21:15:55 -07:00
parent 38e75ec251
commit 9178badfd0
9 changed files with 408 additions and 202 deletions

View File

@@ -27,7 +27,7 @@ css-inline = "0.14.0"
futures = "0.3.31"
headers = "0.4.0"
html-escape = "0.2.13"
letterbox-notmuch = { version = "0.15.11", path = "../notmuch", registry = "xinu" }
letterbox-notmuch = { version = "0.15.11", path = "../notmuch" }
letterbox-shared = { version = "0.15.11", path = "../shared", registry = "xinu" }
linkify = "0.10.0"
log = "0.4.17"

View File

@@ -0,0 +1,39 @@
use std::error::Error;
use clap::Parser;
use letterbox_notmuch::Notmuch;
use letterbox_server::nm::label_unprocessed;
use sqlx::postgres::PgPool;
use tracing::info;
#[derive(Parser)]
#[command(version, about, long_about = None)]
struct Cli {
#[arg(short, long, default_value = env!("DATABASE_URL"))]
newsreader_database_url: String,
#[arg(short, long, default_value = "10")]
/// Set to 0 to process all matches
messages_to_process: usize,
#[arg(short, long, default_value = "false")]
execute: bool,
/// Process messages matching this notmuch query
#[arg(short, long, default_value = "tag:unprocessed")]
query: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let cli = Cli::parse();
let _guard = xtracing::init(env!("CARGO_BIN_NAME"))?;
build_info::build_info!(fn bi);
info!("Build Info: {}", letterbox_shared::build_version(bi));
let pool = PgPool::connect(&cli.newsreader_database_url).await?;
let nm = Notmuch::default();
let limit = if cli.messages_to_process > 0 {
Some(cli.messages_to_process)
} else {
None
};
label_unprocessed(&nm, &pool, !cli.execute, limit, &cli.query).await?;
Ok(())
}

View File

@@ -17,7 +17,7 @@ use tracing::instrument;
#[cfg(feature = "tantivy")]
use crate::tantivy::TantivyConnection;
use crate::{newsreader, nm, Query};
use crate::{newsreader, nm, nm::label_unprocessed, Query};
/// # Number of seconds since the Epoch
pub type UnixTime = isize;
@@ -629,6 +629,10 @@ impl MutationRoot {
let pool = ctx.data_unchecked::<PgPool>();
info!("{}", String::from_utf8_lossy(&nm.new()?));
newsreader::refresh(pool, cacher).await?;
// Process email labels
label_unprocessed(&nm, &pool, false, Some(10), "tag:unprocessed").await?;
#[cfg(feature = "tantivy")]
{
let tantivy = ctx.data_unchecked::<TantivyConnection>();

View File

@@ -1,11 +1,14 @@
use std::{collections::HashMap, fs::File};
use std::{
collections::{HashMap, HashSet},
fs::File,
};
use letterbox_notmuch::Notmuch;
use letterbox_shared::compute_color;
use letterbox_shared::{compute_color, Rule};
use log::{error, info, warn};
use mailparse::{parse_content_type, parse_mail, MailHeader, MailHeaderMap, ParsedMail};
use memmap::MmapOptions;
use sqlx::PgPool;
use sqlx::{types::Json, PgPool};
use tracing::instrument;
use crate::{
@@ -925,3 +928,106 @@ WHERE
.await?;
Ok(row.map(|r| r.url))
}
/*
* grab email_rules table from sql
* For each message with `unprocessed` label
* parse the message
* pass headers for each message through a matcher using email rules
* for each match, add label to message
* if any matches were found, remove unprocessed
* TODO: how to handle inbox label
*/
#[instrument(name="nm::label_unprocessed", skip_all, fields(dryrun=dryrun, limit=?limit, query=%query))]
pub async fn label_unprocessed(
nm: &Notmuch,
pool: &PgPool,
dryrun: bool,
limit: Option<usize>,
query: &str,
) -> Result<(), ServerError> {
use futures::StreamExt;
let ids = nm.message_ids(query)?;
info!(
"Processing {limit:?} of {} messages with '{query}'",
ids.len()
);
let rules: Vec<_> = sqlx::query!(
r#"
SELECT rule as "rule: Json<Rule>"
FROM email_rule
ORDER BY sort_order
"#,
)
.fetch(pool)
.map(|r| r.unwrap().rule.0)
.collect()
.await;
/*
use letterbox_shared::{Match, MatchType};
let rules = vec![Rule {
stop_on_match: false,
matches: vec![Match {
match_type: MatchType::From,
needle: "eftours".to_string(),
}],
tag: "EFTours".to_string(),
}];
*/
info!("Loaded {} rules", rules.len());
let ids = if let Some(limit) = limit {
&ids[..limit]
} else {
&ids[..]
};
for id in ids {
let id = format!("id:{id}");
let files = nm.files(&id)?;
// Only process the first file path is multiple files have the same id
let path = files.iter().next().unwrap();
let file = File::open(&path)?;
let mmap = unsafe { MmapOptions::new().map(&file)? };
let m = parse_mail(&mmap)?;
let (matched_rule, add_tags) = find_tags(&rules, &m.headers);
if matched_rule {
if dryrun {
info!(
"\nAdd tags: {add_tags:?}\nTo: {} From: {} Subject: {}\n",
m.headers.get_first_value("to").expect("no from header"),
m.headers.get_first_value("from").expect("no from header"),
m.headers
.get_first_value("subject")
.expect("no subject header")
);
} else {
for t in &add_tags {
nm.tag_add(t, &id)?;
}
if !add_tags.contains("inbox") {
nm.tag_remove("inbox", &id)?;
}
nm.tag_remove("unprocessed", &id)?;
}
}
}
Ok(())
}
fn find_tags<'a, 'b>(rules: &'a [Rule], headers: &'b [MailHeader]) -> (bool, HashSet<&'a str>) {
let mut matched_rule = false;
let mut add_tags = HashSet::new();
for rule in rules {
for hdr in headers {
if rule.is_match(&hdr.get_key(), &hdr.get_value()) {
//info!("Matched {rule:?}");
matched_rule = true;
add_tags.insert(rule.tag.as_str());
if rule.stop_on_match {
return (true, add_tags);
}
}
}
}
return (matched_rule, add_tags);
}