use std::{
collections::{HashMap, HashSet},
fs::File,
io::{Cursor, Read},
};
use letterbox_notmuch::Notmuch;
use letterbox_shared::{compute_color, Rule};
use mailparse::{parse_mail, MailHeader, MailHeaderMap};
use memmap::MmapOptions;
use sqlx::{types::Json, PgPool};
use tracing::{error, info, info_span, instrument, warn};
use zip::ZipArchive;
use crate::{
compute_offset_limit,
email_extract::*,
error::ServerError,
graphql::{
Attachment, Body, Corpus, EmailThread, Header, Html, Message, PlainText, Tag, Thread,
ThreadSummary, UnhandledContentType,
},
linkify_html, InlineStyle, Query, SanitizeHtml, Transformer,
};
const APPLICATION_GZIP: &str = "application/gzip";
const APPLICATION_ZIP: &str = "application/zip";
const MULTIPART_REPORT: &str = "multipart/report";
const MAX_RAW_MESSAGE_SIZE: usize = 100_000;
fn is_notmuch_query(query: &Query) -> bool {
query.is_notmuch || query.corpus == Some(Corpus::Notmuch)
}
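/// Returns true if `id` is a notmuch-style identifier, i.e. one prefixed with
/// "id:" or "thread:".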
pub fn is_notmuch_thread_or_id(id: &str) -> bool {
id.starts_with("id:") || id.starts_with("thread:")
}
// TODO(wathiede): decide good error type
pub fn threadset_to_messages(
thread_set: letterbox_notmuch::ThreadSet,
) -> Result<Vec<Message>, ServerError> {
    // Stub: walks the thread set but does not yet convert thread nodes into
    // Message values.
    for t in thread_set.0 {
        for _tn in t.0 {}
    }
Ok(Vec::new())
}
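/// Returns the number of messages matching `query`, or 0 if the query does not
/// target the notmuch corpus.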
#[instrument(name="nm::count", skip_all, fields(query=%query))]
pub async fn count(nm: &Notmuch, query: &Query) -> Result<usize, ServerError> {
if !is_notmuch_query(query) {
return Ok(0);
}
let query = query.to_notmuch();
Ok(nm.count(&query)?)
}
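/// Searches notmuch and returns `(absolute offset, ThreadSummary)` pairs for
/// matching threads, honoring cursor-style pagination (after/before/first/last).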
#[instrument(name="nm::search", skip_all, fields(query=%query))]
pub async fn search(
nm: &Notmuch,
after: Option<i32>,
before: Option<i32>,
first: Option<i32>,
last: Option<i32>,
query: &Query,
) -> Result<Vec<(i32, ThreadSummary)>, async_graphql::Error> {
if !is_notmuch_query(query) {
return Ok(Vec::new());
}
let query = query.to_notmuch();
let (offset, mut limit) = compute_offset_limit(after, before, first, last);
    if before.is_none() {
        // When searching forward, the +1 lets us see whether more pages of
        // data are available: if `limit + 1` rows come back, a next page
        // exists. Searching backwards implies there are more pages forward,
        // because the value represented by `before` is on the next page.
        limit += 1;
    }
Ok(nm
.search(&query, offset as usize, limit as usize)?
.0
.into_iter()
.enumerate()
.map(|(i, ts)| {
(
offset + i as i32,
ThreadSummary {
thread: format!("thread:{}", ts.thread),
timestamp: ts.timestamp,
date_relative: ts.date_relative,
matched: ts.matched,
total: ts.total,
authors: ts.authors,
subject: ts.subject,
tags: ts.tags,
corpus: Corpus::Notmuch,
},
)
})
.collect())
}
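/// Returns all notmuch tags plus synthetic per-recipient tags for unread mail,
/// each colored deterministically via compute_color. Unread counts are only
/// tallied when `needs_unread` is set.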
#[instrument(name="nm::tags", skip_all, fields(needs_unread=needs_unread))]
pub fn tags(nm: &Notmuch, needs_unread: bool) -> Result<Vec<Tag>, ServerError> {
    let unread_msg_cnt: HashMap<String, usize> = if needs_unread {
        // 10000 is an arbitrary cap; if there are more than 10k unread
        // messages, the counts will be inaccurate.
nm.search("is:unread", 0, 10000)?
.0
.iter()
.fold(HashMap::new(), |mut m, ts| {
ts.tags.iter().for_each(|t| {
m.entry(t.clone()).and_modify(|c| *c += 1).or_insert(1);
});
m
})
} else {
HashMap::new()
};
let tags: Vec<_> = nm
.tags()?
.into_iter()
.map(|tag| {
let hex = compute_color(&tag);
let unread = if needs_unread {
*unread_msg_cnt.get(&tag).unwrap_or(&0)
} else {
0
};
Tag {
name: tag,
fg_color: "white".to_string(),
bg_color: hex,
unread,
}
})
.chain(
nm.unread_recipients()?
.into_iter()
                .filter_map(|(name, unread)| {
                    let idx = name.find('@')?;
                    // Rewrite "local@domain" as "@domain/local".
                    let name = format!("{}/{}", &name[idx..], &name[..idx]);
let bg_color = compute_color(&name);
Some(Tag {
name,
fg_color: "white".to_string(),
bg_color,
unread,
})
}),
)
.collect();
Ok(tags)
}
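/// Loads and renders every message in `thread_id`: parses each file, resolves
/// the sender's photo URL, linkifies and sanitizes the body, and appends
/// rendered TLS-RPT and DMARC report summaries (with pretty-printed raw
/// JSON/XML) where present.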
#[instrument(name="nm::thread", skip_all, fields(thread_id=thread_id))]
pub async fn thread(
nm: &Notmuch,
pool: &PgPool,
thread_id: String,
debug_content_tree: bool,
) -> Result<Thread, ServerError> {
// TODO(wathiede): normalize all email addresses through an address book with preferred
// display names (that default to the most commonly seen name).
let mut messages = Vec::new();
for (path, id) in std::iter::zip(nm.files(&thread_id)?, nm.message_ids(&thread_id)?) {
let mut html_report_summary: Option<String> = None;
let tags = nm.tags_for_query(&format!("id:{}", id))?;
let file = File::open(&path)?;
let mmap = unsafe { MmapOptions::new().map(&file)? };
let m = parse_mail(&mmap)?;
let from = email_addresses(&path, &m, "from")?;
let mut from = match from.len() {
0 => None,
1 => from.into_iter().next(),
_ => {
warn!(
"Got {} from addresses in message, truncating: {:?}",
from.len(),
from
);
from.into_iter().next()
}
};
        if let Some(from) = from.as_mut() {
            if let Some(addr) = from.addr.as_mut() {
                let photo_url = photo_url_for_email_address(&pool, &addr).await?;
                from.photo_url = photo_url;
            }
        }
let to = email_addresses(&path, &m, "to")?;
let cc = email_addresses(&path, &m, "cc")?;
let delivered_to = email_addresses(&path, &m, "delivered-to")?.pop();
let x_original_to = email_addresses(&path, &m, "x-original-to")?.pop();
let subject = m.headers.get_first_value("subject");
let timestamp = m
.headers
.get_first_value("date")
.and_then(|d| mailparse::dateparse(&d).ok());
let cid_prefix = letterbox_shared::urls::cid_prefix(None, &id);
let base_url = None;
        let mut part_addr = vec![id.to_string()];
let body = match extract_body(&m, &mut part_addr)? {
Body::PlainText(PlainText { text, content_tree }) => {
                let text = if text.len() > MAX_RAW_MESSAGE_SIZE {
                    // Back up to a UTF-8 character boundary so the slice below
                    // cannot panic mid-codepoint.
                    let mut end = MAX_RAW_MESSAGE_SIZE;
                    while !text.is_char_boundary(end) {
                        end -= 1;
                    }
                    format!(
                        "{}...\n\nMESSAGE WAS TRUNCATED @ {} bytes",
                        &text[..end],
                        MAX_RAW_MESSAGE_SIZE
                    )
                } else {
                    text
                };
Body::Html(Html {
html: {
                        let body_transformers: Vec<Box<dyn Transformer>> = vec![
Box::new(InlineStyle),
Box::new(SanitizeHtml {
cid_prefix: &cid_prefix,
base_url: &base_url,
}),
];
let mut html = linkify_html(&text.trim_matches('\n'));
                        for t in body_transformers.iter() {
if t.should_run(&None, &html) {
html = t.transform(&None, &html).await?;
}
}
                        format!(
                            r#"<p class="view-part-text-plain font-mono whitespace-pre-line">{}</p>"#,
                            // Trim newlines to prevent excessive white space at
                            // the beginning/end of the presentation. Leave tabs
                            // and spaces in case the plain text attempts to
                            // center a header on the first line.
                            html
                        )
},
content_tree: if debug_content_tree {
render_content_type_tree(&m)
} else {
content_tree
},
})
}
Body::Html(Html {
mut html,
content_tree,
}) => Body::Html(Html {
html: {
                    let body_transformers: Vec<Box<dyn Transformer>> = vec![
// TODO: this breaks things like emails from calendar
//Box::new(InlineStyle),
Box::new(SanitizeHtml {
cid_prefix: &cid_prefix,
base_url: &base_url,
}),
];
                    for t in body_transformers.iter() {
if t.should_run(&None, &html) {
html = t.transform(&None, &html).await?;
}
}
html
},
content_tree: if debug_content_tree {
render_content_type_tree(&m)
} else {
content_tree
},
}),
Body::UnhandledContentType(UnhandledContentType { content_tree, .. }) => {
                let body_start = mmap
                    .windows(2)
                    .take(20_000)
                    .position(|w| w == b"\n\n")
                    // Skip past the blank line separating headers from body; if
                    // no separator is found in the first 20k bytes, fall back
                    // to the start of the file.
                    .map(|pos| pos + 2)
                    .unwrap_or(0);
                let body = mmap[body_start..].to_vec();
Body::UnhandledContentType(UnhandledContentType {
text: String::from_utf8(body)?,
content_tree: if debug_content_tree {
render_content_type_tree(&m)
} else {
content_tree
},
})
}
};
let headers = m
.headers
.iter()
.map(|h| Header {
key: h.get_key(),
value: h.get_value(),
})
.collect();
// TODO(wathiede): parse message and fill out attachments
let attachments = extract_attachments(&m, &id)?;
let mut final_body = body;
let mut raw_report_content: Option<String> = None;
// Append TLS report if available
if m.ctype.mimetype.as_str() == MULTIPART_REPORT {
if let Ok(Body::Html(_html_body)) = extract_report(&m, &mut part_addr) {
// Extract raw JSON for pretty printing
if let Some(sp) = m
.subparts
.iter()
.find(|sp| sp.ctype.mimetype.as_str() == "application/tlsrpt+gzip")
{
if let Ok(gz_bytes) = sp.get_body_raw() {
let mut decoder = flate2::read::GzDecoder::new(&gz_bytes[..]);
let mut buffer = Vec::new();
if decoder.read_to_end(&mut buffer).is_ok() {
if let Ok(json_str) = String::from_utf8(buffer) {
raw_report_content = Some(json_str);
}
}
}
}
}
}
// Append DMARC report if available
if m.ctype.mimetype.as_str() == APPLICATION_ZIP {
if let Ok(Body::Html(html_body)) = extract_zip(&m) {
html_report_summary = Some(html_body.html);
// Extract raw XML for pretty printing
if let Ok(zip_bytes) = m.get_body_raw() {
if let Ok(mut archive) = ZipArchive::new(Cursor::new(&zip_bytes)) {
for i in 0..archive.len() {
if let Ok(mut file) = archive.by_index(i) {
let name = file.name().to_lowercase();
if is_dmarc_report_filename(&name) {
                                    let mut xml = String::new();
                                    if file.read_to_string(&mut xml).is_ok() {
raw_report_content = Some(xml);
}
}
}
}
}
}
}
}
if m.ctype.mimetype.as_str() == APPLICATION_GZIP {
// Call extract_gzip to get the HTML summary and also to determine if it's a DMARC report
if let Ok((Body::Html(html_body), _)) = extract_gzip(&m) {
html_report_summary = Some(html_body.html);
// If extract_gzip successfully parsed a DMARC report, then extract the raw content
if let Ok(gz_bytes) = m.get_body_raw() {
let mut decoder = flate2::read::GzDecoder::new(&gz_bytes[..]);
                    let mut xml = String::new();
                    if decoder.read_to_string(&mut xml).is_ok() {
raw_report_content = Some(xml);
}
}
}
}
let mut current_html = final_body.to_html().unwrap_or_default();
if let Some(html_summary) = html_report_summary {
current_html.push_str(&html_summary);
}
        info!(
            "mimetype {} raw_report_content.is_some() {}",
            m.ctype.mimetype.as_str(),
            raw_report_content.is_some()
        );
if let Some(raw_content) = raw_report_content {
let pretty_printed_content = if m.ctype.mimetype.as_str() == MULTIPART_REPORT {
// Pretty print JSON
if let Ok(parsed_json) = serde_json::from_str::<serde_json::Value>(&raw_content) {
serde_json::to_string_pretty(&parsed_json).unwrap_or(raw_content)
} else {
raw_content
}
} else {
// DMARC reports are XML
// Pretty print XML
match pretty_print_xml_with_trimming(&raw_content) {
Ok(pretty_xml) => pretty_xml,
Err(e) => {
error!("Failed to pretty print XML: {:?}", e);
raw_content
}
}
};
current_html.push_str(&format!(
"\n<pre>{}</pre>",
html_escape::encode_text(&pretty_printed_content)
));
}
final_body = Body::Html(Html {
html: current_html,
content_tree: final_body.to_html_content_tree().unwrap_or_default(),
});
messages.push(Message {
id: format!("id:{}", id),
from,
to,
cc,
subject,
tags,
timestamp,
headers,
body: final_body,
path,
attachments,
delivered_to,
x_original_to,
});
}
messages.reverse();
    // Find the first subject that's set. After reversing the vec, this should
    // be the oldest message.
    let subject: String = messages
        .iter()
        .find(|m| m.subject.is_some())
        .and_then(|m| m.subject.clone())
        .unwrap_or_else(|| "(NO SUBJECT)".to_string());
Ok(Thread::Email(EmailThread {
thread_id,
subject,
messages,
}))
}
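/// Finds the attachment in message `id` whose Content-ID matches `cid`,
/// searching the first file on disk for that message.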
pub fn cid_attachment_bytes(nm: &Notmuch, id: &str, cid: &str) -> Result<Attachment, ServerError> {
let files = nm.files(id)?;
let Some(path) = files.first() else {
warn!("failed to find files for message {}", id);
return Err(ServerError::PartNotFound);
};
let file = File::open(&path)?;
let mmap = unsafe { MmapOptions::new().map(&file)? };
let m = parse_mail(&mmap)?;
if let Some(attachment) = walk_attachments(&m, |sp, _cur_idx| {
info!("{} {:?}", cid, get_content_id(&sp.headers));
if let Some(h_cid) = get_content_id(&sp.headers) {
            // Content-IDs are conventionally wrapped in angle brackets; strip
            // them without panicking on malformed values.
            let h_cid = h_cid.trim_start_matches('<').trim_end_matches('>');
if h_cid == cid {
                let attachment = extract_attachment(&sp, id, &[]).unwrap_or_default();
return Some(attachment);
}
}
None
}) {
return Ok(attachment);
}
Err(ServerError::PartNotFound)
}
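/// Finds the attachment in message `id` at the multipart index path `idx`,
/// searching the first file on disk for that message.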
pub fn attachment_bytes(nm: &Notmuch, id: &str, idx: &[usize]) -> Result<Attachment, ServerError> {
let files = nm.files(id)?;
let Some(path) = files.first() else {
warn!("failed to find files for message {}", id);
return Err(ServerError::PartNotFound);
};
let file = File::open(&path)?;
let mmap = unsafe { MmapOptions::new().map(&file)? };
let m = parse_mail(&mmap)?;
if let Some(attachment) = walk_attachments(&m, |sp, cur_idx| {
if cur_idx == idx {
            let attachment = extract_attachment(&sp, id, idx).unwrap_or_default();
return Some(attachment);
}
None
}) {
return Ok(attachment);
}
Err(ServerError::PartNotFound)
}
#[instrument(name="nm::set_read_status", skip_all, fields(query=%query, unread=unread))]
pub async fn set_read_status(
nm: &Notmuch,
query: &Query,
unread: bool,
) -> Result<bool, ServerError> {
let uids: Vec<_> = query
.uids
.iter()
.filter(|uid| is_notmuch_thread_or_id(uid))
.collect();
info!("set_read_status({} {:?})", unread, uids);
for uid in uids {
if unread {
nm.tag_add("unread", uid)?;
} else {
nm.tag_remove("unread", uid)?;
}
}
Ok(true)
}
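/// Looks up a stored photo URL for `addr` via sql/photo_url_for_email_address.sql.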
async fn photo_url_for_email_address(
pool: &PgPool,
addr: &str,
) -> Result<Option<String>, ServerError> {
let row =
sqlx::query_as::<_, (String,)>(include_str!("../sql/photo_url_for_email_address.sql"))
.bind(addr)
.fetch_optional(pool)
.await?;
Ok(row.map(|r| r.0))
}
/*
 * Grab the email_rules table from SQL.
 * For each message with the `unprocessed` label:
 *   parse the message,
 *   pass its headers through a matcher built from the email rules,
 *   for each match, add the matching rule's label to the message,
 *   and if any matches were found, remove `unprocessed`.
 * TODO: how to handle the inbox label
 */
#[instrument(name="nm::label_unprocessed", skip_all, fields(dryrun=dryrun, limit=?limit, query=%query))]
pub async fn label_unprocessed(
nm: &Notmuch,
pool: &PgPool,
dryrun: bool,
limit: Option<usize>,
query: &str,
) -> Result<Box<[String]>, ServerError> {
use futures::StreamExt;
let ids = nm.message_ids(query)?;
info!(
"Processing {:?} of {} messages with '{}'",
limit,
ids.len(),
query
);
let rules: Vec<_> =
sqlx::query_as::<_, (Json<Rule>,)>(include_str!("../sql/label_unprocessed.sql"))
.fetch(pool)
.map(|r| r.unwrap().0 .0)
.collect()
.await;
/*
use letterbox_shared::{Match, MatchType};
let rules = vec![Rule {
stop_on_match: false,
matches: vec![Match {
match_type: MatchType::From,
needle: "eftours".to_string(),
}],
tag: "EFTours".to_string(),
}];
*/
info!("Loaded {} rules", rules.len());
    let limit = limit.unwrap_or(ids.len()).min(ids.len());
let ids = &ids[..limit];
let mut add_mutations = HashMap::new();
let mut rm_mutations = HashMap::new();
for id in ids {
let id = format!("id:{}", id);
let files = nm.files(&id)?;
        // Only process the first file path if multiple files share the same id.
        let Some(path) = files.first() else {
error!("No files for message-ID {}", id);
let t = "Letterbox/Bad";
nm.tag_add(t, &id)?;
let t = "unprocessed";
nm.tag_remove(t, &id)?;
continue;
};
let file = File::open(&path)?;
info!("parsing {}", path);
let mmap = unsafe { MmapOptions::new().map(&file)? };
let m = match info_span!("parse_mail", path = path).in_scope(|| parse_mail(&mmap)) {
Ok(m) => m,
Err(err) => {
error!("Failed to parse {}: {}", path, err);
let t = "Letterbox/Bad";
nm.tag_add(t, &id)?;
let t = "unprocessed";
nm.tag_remove(t, &id)?;
continue;
}
};
let (matched_rule, add_tags) = find_tags(&rules, &m.headers);
if matched_rule {
if dryrun {
                info!(
                    "\nAdd tags: {:?}\nTo: {} From: {} Subject: {}\n",
                    add_tags,
                    m.headers.get_first_value("to").expect("no to header"),
                    m.headers.get_first_value("from").expect("no from header"),
                    m.headers
                        .get_first_value("subject")
                        .expect("no subject header")
                );
}
for t in &add_tags {
//nm.tag_add(t, &id)?;
add_mutations
.entry(t.to_string())
                    .or_default()
.push(id.clone());
}
if add_tags.contains("spam") || add_tags.contains("Spam") {
//nm.tag_remove("unread", &id)?;
let t = "unread".to_string();
rm_mutations
.entry(t)
                    .or_default()
.push(id.clone());
}
if !add_tags.contains("inbox") {
//nm.tag_remove("inbox", &id)?;
let t = "inbox".to_string();
rm_mutations
.entry(t)
                    .or_default()
.push(id.clone());
}
//nm.tag_remove("unprocessed", &id)?;
} else {
if add_tags.is_empty() {
let t = "Grey".to_string();
add_mutations
.entry(t)
                    .or_default()
.push(id.clone());
}
//nm.tag_remove("inbox", &id)?;
let t = "inbox".to_string();
rm_mutations
.entry(t)
                .or_default()
.push(id.clone());
}
let t = "unprocessed".to_string();
rm_mutations
.entry(t)
            .or_default()
.push(id.clone());
}
info!("Adding {} distinct labels", add_mutations.len());
for (tag, ids) in add_mutations.iter() {
info!(" {}: {}", tag, ids.len());
if !dryrun {
let ids: Vec<_> = ids.iter().map(|s| s.as_str()).collect();
info_span!("tags_add", tag = tag, count = ids.len())
.in_scope(|| nm.tags_add(tag, &ids))?;
}
}
info!("Removing {} distinct labels", rm_mutations.len());
for (tag, ids) in rm_mutations.iter() {
info!(" {}: {}", tag, ids.len());
if !dryrun {
let ids: Vec<_> = ids.iter().map(|s| s.as_str()).collect();
info_span!("tags_remove", tag = tag, count = ids.len())
.in_scope(|| nm.tags_remove(tag, &ids))?;
}
}
Ok(ids.into())
}
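/// Runs every rule against every header, collecting the tags of all matching
/// rules. Returns whether any rule matched and the set of tags to add; a rule
/// with `stop_on_match` set short-circuits further matching.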
fn find_tags<'a>(rules: &'a [Rule], headers: &[MailHeader]) -> (bool, HashSet<&'a str>) {
let mut matched_rule = false;
let mut add_tags = HashSet::new();
for rule in rules {
for hdr in headers {
if rule.is_match(&hdr.get_key(), &hdr.get_value()) {
//info!("Matched {:?}", rule);
matched_rule = true;
add_tags.insert(rule.tag.as_str());
if rule.stop_on_match {
return (true, add_tags);
}
}
}
}
(matched_rule, add_tags)
}
#[cfg(test)]
mod tests {
use super::*;
const REPORT_V1: &str = r#"
{
"organization-name": "Google Inc.",
"date-range": {
"start-datetime": "2025-08-09T00:00:00Z",
"end-datetime": "2025-08-09T23:59:59Z"
},
"contact-info": "smtp-tls-reporting@google.com",
"report-id": "2025-08-09T00:00:00Z_xinu.tv",
"policies": [
{
"policy": {
"policy-type": "sts",
"policy-string": [
"version: STSv1",
"mode: testing",
"mx: mail.xinu.tv",
"max_age: 86400"
],
"policy-domain": "xinu.tv"
},
"summary": {
"total-successful-session-count": 20,
"total-failure-session-count": 0
}
}
]
}
"#;
// The following constants are kept for future test expansion, but are currently unused.
/*
const REPORT_V2: &str = r#"
{
"organization-name": "Google Inc.",
"date-range": {
"start-datetime": "2025-08-09T00:00:00Z",
"end-datetime": "2025-08-09T23:59:59Z"
},
"contact-info": "smtp-tls-reporting@google.com",
"report-id": "2025-08-09T00:00:00Z_xinu.tv",
"policies": [
{
"policy": {
"policy-type": "sts",
"policy-string": [
"version: STSv1",
"mode": "testing",
"mx": "mail.xinu.tv",
"max_age": "86400"
],
"policy-domain": "xinu.tv",
"mx-host": [
"mail.xinu.tv"
]
},
"summary": {
"total-successful-session-count": 3,
"total-failure-session-count": 0
}
}
]
}
"#;
const REPORT_V3: &str = r#"
{
"organization-name": "Google Inc.",
"date-range": {
"start-datetime": "2025-08-09T00:00:00Z",
"end-datetime": "2025-08-09T23:59:59Z"
},
"contact-info": "smtp-tls-reporting@google.com",
"report-id": "2025-08-09T00:00:00Z_xinu.tv",
"policies": [
{
"policy": {
"policy-type": "sts",
"policy-string": [
"version: STSv1",
"mode": "testing",
"mx": "mail.xinu.tv",
"max_age": "86400"
],
"policy-domain": "xinu.tv",
"mx-host": [
{
"hostname": "mail.xinu.tv",
"failure-count": 0,
"result-type": "success"
}
]
},
"summary": {
"total-successful-session-count": 3,
"total-failure-session-count": 0
}
}
]
}
"#;
*/
#[test]
fn test_parse_tls_report_v1() {
        let _report: TlsRpt = serde_json::from_str(REPORT_V1).unwrap();
}
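    #[test]
    fn test_is_notmuch_thread_or_id() {
        // Identifiers below are illustrative, not real message ids.
        assert!(is_notmuch_thread_or_id("id:abc@example.com"));
        assert!(is_notmuch_thread_or_id("thread:0000000000001234"));
        assert!(!is_notmuch_thread_or_id("imap:12345"));
    }
    // A minimal sketch of find_tags, assuming `Rule::is_match` matches a From
    // header containing the needle; the rule mirrors the commented-out example
    // in label_unprocessed above.
    #[test]
    fn test_find_tags_matches_from_header() {
        use letterbox_shared::{Match, MatchType};
        let rules = vec![Rule {
            stop_on_match: false,
            matches: vec![Match {
                match_type: MatchType::From,
                needle: "eftours".to_string(),
            }],
            tag: "EFTours".to_string(),
        }];
        let (headers, _) =
            mailparse::parse_headers(b"From: trips@eftours.com\r\nSubject: Hi\r\n\r\n").unwrap();
        let (matched_rule, add_tags) = find_tags(&rules, &headers);
        assert!(matched_rule);
        assert!(add_tags.contains("EFTours"));
    }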
}