server: recursively descend email threads to find all unread recipients

This commit is contained in:
Bill Thiede 2025-02-22 17:58:07 -08:00
parent 52e24437bd
commit 00f61cf6be
2 changed files with 44 additions and 33 deletions

View File

@ -628,33 +628,44 @@ impl Notmuch {
let ts: ThreadSet = serde::de::Deserialize::deserialize(&mut deserializer)?;
deserializer.end()?;
let mut r = HashMap::new();
fn collect_from_thread_node(
r: &mut HashMap<String, usize>,
tn: &ThreadNode,
) -> Result<(), NotmuchError> {
let Some(msg) = &tn.0 else {
return Ok(());
};
let mut addrs = vec![];
let hdr = &msg.headers.to;
if let Some(to) = hdr {
addrs.push(to);
};
let hdr = &msg.headers.cc;
if let Some(cc) = hdr {
addrs.push(cc);
};
for recipient in addrs {
mailparse::addrparse(&recipient)?
.into_inner()
.iter()
.for_each(|a| {
let mailparse::MailAddr::Single(si) = a else {
return;
};
let addr = &si.addr;
if addr == "couchmoney@gmail.com" || addr.ends_with("@xinu.tv") {
*r.entry(addr.to_lowercase()).or_default() += 1;
}
});
}
Ok(())
}
for t in ts.0 {
for tn in t.0 {
let Some(msg) = tn.0 else {
continue;
};
let mut addrs = vec![];
let hdr = msg.headers.to;
if let Some(to) = hdr {
addrs.push(to);
};
let hdr = msg.headers.cc;
if let Some(cc) = hdr {
addrs.push(cc);
};
for recipient in addrs {
mailparse::addrparse(&recipient)?
.into_inner()
.iter()
.for_each(|a| {
let mailparse::MailAddr::Single(si) = a else {
return;
};
let addr = &si.addr;
if addr == "couchmoney@gmail.com" || addr.ends_with("@xinu.tv") {
*r.entry(addr.to_lowercase()).or_default() += 1;
}
});
collect_from_thread_node(&mut r, &tn)?;
for sub_tn in tn.1 {
collect_from_thread_node(&mut r, &sub_tn)?;
}
}
}

View File

@ -286,7 +286,7 @@ impl QueryRoot {
Ok(letterbox_shared::build_version(bi))
}
#[instrument(skip_all, fields(query=query))]
#[instrument(skip_all, fields(query=query, request_id=request_id()))]
#[instrument(skip_all, fields(query=query, rid=request_id()))]
async fn count<'ctx>(&self, ctx: &Context<'ctx>, query: String) -> Result<usize, Error> {
let nm = ctx.data_unchecked::<Notmuch>();
let pool = ctx.data_unchecked::<PgPool>();
@ -309,7 +309,7 @@ impl QueryRoot {
// TODO: this function doesn't get parallelism, possibly because notmuch is sync and blocks,
// rewrite that with tokio::process:Command
#[instrument(skip_all, fields(query=query, request_id=request_id()))]
#[instrument(skip_all, fields(query=query, rid=request_id()))]
async fn search<'ctx>(
&self,
ctx: &Context<'ctx>,
@ -467,7 +467,7 @@ impl QueryRoot {
.await?)
}
#[instrument(skip_all, fields(request_id=request_id()))]
#[instrument(skip_all, fields(rid=request_id()))]
async fn tags<'ctx>(&self, ctx: &Context<'ctx>) -> FieldResult<Vec<Tag>> {
let nm = ctx.data_unchecked::<Notmuch>();
let pool = ctx.data_unchecked::<PgPool>();
@ -476,7 +476,7 @@ impl QueryRoot {
tags.append(&mut nm::tags(nm, needs_unread)?);
Ok(tags)
}
#[instrument(skip_all, fields(thread_id=thread_id, request_id=request_id()))]
#[instrument(skip_all, fields(thread_id=thread_id, rid=request_id()))]
async fn thread<'ctx>(&self, ctx: &Context<'ctx>, thread_id: String) -> Result<Thread, Error> {
let nm = ctx.data_unchecked::<Notmuch>();
let cacher = ctx.data_unchecked::<FilesystemCacher>();
@ -553,7 +553,7 @@ async fn tantivy_search(
pub struct Mutation;
#[Object]
impl Mutation {
#[instrument(skip_all, fields(query=query, unread=unread, request_id=request_id()))]
#[instrument(skip_all, fields(query=query, unread=unread, rid=request_id()))]
async fn set_read_status<'ctx>(
&self,
ctx: &Context<'ctx>,
@ -572,7 +572,7 @@ impl Mutation {
nm::set_read_status(nm, &query, unread).await?;
Ok(true)
}
#[instrument(skip_all, fields(query=query, tag=tag, request_id=request_id()))]
#[instrument(skip_all, fields(query=query, tag=tag, rid=request_id()))]
async fn tag_add<'ctx>(
&self,
ctx: &Context<'ctx>,
@ -584,7 +584,7 @@ impl Mutation {
nm.tag_add(&tag, &query)?;
Ok(true)
}
#[instrument(skip_all, fields(query=query, tag=tag, request_id=request_id()))]
#[instrument(skip_all, fields(query=query, tag=tag, rid=request_id()))]
async fn tag_remove<'ctx>(
&self,
ctx: &Context<'ctx>,
@ -607,7 +607,7 @@ impl Mutation {
Ok(true)
}
#[instrument(skip_all, fields(request_id=request_id()))]
#[instrument(skip_all, fields(rid=request_id()))]
async fn refresh<'ctx>(&self, ctx: &Context<'ctx>) -> Result<bool, Error> {
let nm = ctx.data_unchecked::<Notmuch>();
let cacher = ctx.data_unchecked::<FilesystemCacher>();