server & notmuch: more attachment WIP, stop leaking notmuch processes
This commit is contained in:
parent
28d5562491
commit
f6bdf302fe
@ -208,9 +208,9 @@
|
|||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
ffi::OsStr,
|
ffi::OsStr,
|
||||||
io::{self, BufRead, BufReader, Lines},
|
io::{self},
|
||||||
path::{Path, PathBuf},
|
path::{Path, PathBuf},
|
||||||
process::{Child, ChildStdout, Command, Stdio},
|
process::Command,
|
||||||
};
|
};
|
||||||
|
|
||||||
use log::info;
|
use log::info;
|
||||||
@ -556,14 +556,14 @@ impl Notmuch {
|
|||||||
Ok(res)
|
Ok(res)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn message_ids(&self, query: &str) -> Result<Lines<BufReader<ChildStdout>>, NotmuchError> {
|
pub fn message_ids(&self, query: &str) -> Result<Vec<String>, NotmuchError> {
|
||||||
let mut child = self.run_notmuch_pipe(["search", "--output=messages", query])?;
|
let res = self.run_notmuch(["search", "--output=messages", "--format=json", query])?;
|
||||||
Ok(BufReader::new(child.stdout.take().unwrap()).lines())
|
Ok(serde_json::from_slice(&res)?)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn files(&self, query: &str) -> Result<Lines<BufReader<ChildStdout>>, NotmuchError> {
|
pub fn files(&self, query: &str) -> Result<Vec<String>, NotmuchError> {
|
||||||
let mut child = self.run_notmuch_pipe(["search", "--output=files", query])?;
|
let res = self.run_notmuch(["search", "--output=files", "--format=json", query])?;
|
||||||
Ok(BufReader::new(child.stdout.take().unwrap()).lines())
|
Ok(serde_json::from_slice(&res)?)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run_notmuch<I, S>(&self, args: I) -> Result<Vec<u8>, NotmuchError>
|
fn run_notmuch<I, S>(&self, args: I) -> Result<Vec<u8>, NotmuchError>
|
||||||
@ -580,21 +580,6 @@ impl Notmuch {
|
|||||||
let out = cmd.output()?;
|
let out = cmd.output()?;
|
||||||
Ok(out.stdout)
|
Ok(out.stdout)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run_notmuch_pipe<I, S>(&self, args: I) -> Result<Child, NotmuchError>
|
|
||||||
where
|
|
||||||
I: IntoIterator<Item = S>,
|
|
||||||
S: AsRef<OsStr>,
|
|
||||||
{
|
|
||||||
let mut cmd = Command::new("notmuch");
|
|
||||||
if let Some(config_path) = &self.config_path {
|
|
||||||
cmd.arg("--config").arg(config_path);
|
|
||||||
}
|
|
||||||
cmd.args(args);
|
|
||||||
info!("{:?}", &cmd);
|
|
||||||
let child = cmd.stdout(Stdio::piped()).spawn()?;
|
|
||||||
Ok(child)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
@ -1,18 +1,20 @@
|
|||||||
use std::{
|
use std::{
|
||||||
|
collections::HashMap,
|
||||||
fs::File,
|
fs::File,
|
||||||
hash::{DefaultHasher, Hash, Hasher},
|
hash::{DefaultHasher, Hash, Hasher},
|
||||||
|
str::FromStr,
|
||||||
};
|
};
|
||||||
|
|
||||||
use async_graphql::{
|
use async_graphql::{
|
||||||
connection::{self, Connection, Edge},
|
connection::{self, Connection, Edge},
|
||||||
Context, EmptyMutation, EmptySubscription, Error, FieldResult, Object, Schema, SimpleObject,
|
Context, EmptyMutation, EmptySubscription, Enum, Error, FieldResult, Object, Schema,
|
||||||
Union,
|
SimpleObject, Union,
|
||||||
};
|
};
|
||||||
use log::{error, info, warn};
|
use log::{error, info, warn};
|
||||||
use mailparse::{parse_mail, MailHeaderMap, ParsedMail};
|
use mailparse::{parse_mail, MailHeader, MailHeaderMap, ParsedMail};
|
||||||
use memmap::MmapOptions;
|
use memmap::MmapOptions;
|
||||||
use notmuch::Notmuch;
|
use notmuch::Notmuch;
|
||||||
use rayon::prelude::*;
|
use rocket::time::Instant;
|
||||||
|
|
||||||
pub struct QueryRoot;
|
pub struct QueryRoot;
|
||||||
|
|
||||||
@ -46,6 +48,8 @@ pub struct Thread {
|
|||||||
|
|
||||||
#[derive(Debug, SimpleObject)]
|
#[derive(Debug, SimpleObject)]
|
||||||
pub struct Message {
|
pub struct Message {
|
||||||
|
// Message-ID for message, prepend `id:<id>` to search in notmuch
|
||||||
|
pub id: String,
|
||||||
// First From header found in email
|
// First From header found in email
|
||||||
pub from: Option<Email>,
|
pub from: Option<Email>,
|
||||||
// All To headers found in email
|
// All To headers found in email
|
||||||
@ -56,10 +60,50 @@ pub struct Message {
|
|||||||
pub subject: Option<String>,
|
pub subject: Option<String>,
|
||||||
// Parsed Date header, if found and valid
|
// Parsed Date header, if found and valid
|
||||||
pub timestamp: Option<i64>,
|
pub timestamp: Option<i64>,
|
||||||
|
// Headers
|
||||||
|
pub headers: Vec<Header>,
|
||||||
// The body contents
|
// The body contents
|
||||||
pub body: Body,
|
pub body: Body,
|
||||||
// On disk location of message
|
// On disk location of message
|
||||||
pub path: String,
|
pub path: String,
|
||||||
|
pub attachments: Vec<Attachment>,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Content-Type: image/jpeg; name="PXL_20231125_204826860.jpg"
|
||||||
|
// Content-Disposition: attachment; filename="PXL_20231125_204826860.jpg"
|
||||||
|
// Content-Transfer-Encoding: base64
|
||||||
|
// Content-ID: <f_lponoluo1>
|
||||||
|
// X-Attachment-Id: f_lponoluo1
|
||||||
|
#[derive(Debug, SimpleObject)]
|
||||||
|
pub struct Attachment {
|
||||||
|
filename: String,
|
||||||
|
content_type: Option<String>,
|
||||||
|
content_id: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Enum, Copy, Clone, Eq, PartialEq)]
|
||||||
|
enum DispositionType {
|
||||||
|
Inline,
|
||||||
|
Attachment,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FromStr for DispositionType {
|
||||||
|
type Err = String;
|
||||||
|
|
||||||
|
// Required method
|
||||||
|
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||||
|
Ok(match s {
|
||||||
|
"inline" => DispositionType::Inline,
|
||||||
|
"attachment" => DispositionType::Attachment,
|
||||||
|
c => return Err(format!("unknown disposition type: {c}")),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, SimpleObject)]
|
||||||
|
pub struct Header {
|
||||||
|
key: String,
|
||||||
|
value: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@ -104,6 +148,9 @@ impl Html {
|
|||||||
async fn content_tree(&self) -> &str {
|
async fn content_tree(&self) -> &str {
|
||||||
&self.content_tree
|
&self.content_tree
|
||||||
}
|
}
|
||||||
|
async fn headers(&self) -> Vec<Header> {
|
||||||
|
Vec::new()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Union)]
|
#[derive(Debug, Union)]
|
||||||
@ -214,15 +261,32 @@ impl QueryRoot {
|
|||||||
|
|
||||||
async fn tags<'ctx>(&self, ctx: &Context<'ctx>) -> FieldResult<Vec<Tag>> {
|
async fn tags<'ctx>(&self, ctx: &Context<'ctx>) -> FieldResult<Vec<Tag>> {
|
||||||
let nm = ctx.data_unchecked::<Notmuch>();
|
let nm = ctx.data_unchecked::<Notmuch>();
|
||||||
Ok(nm
|
let now = Instant::now();
|
||||||
|
let needs_unread = ctx.look_ahead().field("unread").exists();
|
||||||
|
let unread_msg_cnt: HashMap<String, usize> = if needs_unread {
|
||||||
|
// 10000 is an arbitrary number, if there's more than 10k unread messages, we'll
|
||||||
|
// get an inaccurate count.
|
||||||
|
nm.search("is:unread", 0, 10000)?
|
||||||
|
.0
|
||||||
|
.iter()
|
||||||
|
.fold(HashMap::new(), |mut m, ts| {
|
||||||
|
ts.tags.iter().for_each(|t| {
|
||||||
|
m.entry(t.clone()).and_modify(|c| *c += 1).or_insert(1);
|
||||||
|
});
|
||||||
|
m
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
HashMap::new()
|
||||||
|
};
|
||||||
|
let tags = nm
|
||||||
.tags()?
|
.tags()?
|
||||||
.into_par_iter()
|
.into_iter()
|
||||||
.map(|tag| {
|
.map(|tag| {
|
||||||
let mut hasher = DefaultHasher::new();
|
let mut hasher = DefaultHasher::new();
|
||||||
tag.hash(&mut hasher);
|
tag.hash(&mut hasher);
|
||||||
let hex = format!("#{:06x}", hasher.finish() % (1 << 24));
|
let hex = format!("#{:06x}", hasher.finish() % (1 << 24));
|
||||||
let unread = if ctx.look_ahead().field("unread").exists() {
|
let unread = if needs_unread {
|
||||||
nm.count(&format!("tag:{tag} is:unread")).unwrap_or(0)
|
*unread_msg_cnt.get(&tag).unwrap_or(&0)
|
||||||
} else {
|
} else {
|
||||||
0
|
0
|
||||||
};
|
};
|
||||||
@ -233,7 +297,9 @@ impl QueryRoot {
|
|||||||
unread,
|
unread,
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.collect())
|
.collect();
|
||||||
|
info!("Fetching tags took {}", now.elapsed());
|
||||||
|
Ok(tags)
|
||||||
}
|
}
|
||||||
async fn thread<'ctx>(&self, ctx: &Context<'ctx>, thread_id: String) -> Result<Thread, Error> {
|
async fn thread<'ctx>(&self, ctx: &Context<'ctx>, thread_id: String) -> Result<Thread, Error> {
|
||||||
// TODO(wathiede): normalize all email addresses through an address book with preferred
|
// TODO(wathiede): normalize all email addresses through an address book with preferred
|
||||||
@ -246,8 +312,8 @@ impl QueryRoot {
|
|||||||
.field("contentTree")
|
.field("contentTree")
|
||||||
.exists();
|
.exists();
|
||||||
let mut messages = Vec::new();
|
let mut messages = Vec::new();
|
||||||
for path in nm.files(&thread_id)? {
|
for (path, id) in std::iter::zip(nm.files(&thread_id)?, nm.message_ids(&thread_id)?) {
|
||||||
let path = path?;
|
info!("{id}\nfile: {path}");
|
||||||
let file = File::open(&path)?;
|
let file = File::open(&path)?;
|
||||||
let mmap = unsafe { MmapOptions::new().map(&file)? };
|
let mmap = unsafe { MmapOptions::new().map(&file)? };
|
||||||
let m = parse_mail(&mmap)?;
|
let m = parse_mail(&mmap)?;
|
||||||
@ -290,14 +356,27 @@ impl QueryRoot {
|
|||||||
}),
|
}),
|
||||||
b => b,
|
b => b,
|
||||||
};
|
};
|
||||||
|
let headers = m
|
||||||
|
.headers
|
||||||
|
.iter()
|
||||||
|
.map(|h| Header {
|
||||||
|
key: h.get_key(),
|
||||||
|
value: h.get_value(),
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
// TODO(wathiede): parse message and fill out attachments
|
||||||
|
let attachments = extract_attachments(&m)?;
|
||||||
messages.push(Message {
|
messages.push(Message {
|
||||||
|
id,
|
||||||
from,
|
from,
|
||||||
to,
|
to,
|
||||||
cc,
|
cc,
|
||||||
subject,
|
subject,
|
||||||
timestamp,
|
timestamp,
|
||||||
|
headers,
|
||||||
body,
|
body,
|
||||||
path,
|
path,
|
||||||
|
attachments,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
messages.reverse();
|
messages.reverse();
|
||||||
@ -339,6 +418,10 @@ fn extract_unhandled(m: &ParsedMail) -> Result<Body, Error> {
|
|||||||
text: msg,
|
text: msg,
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// multipart/alternative defines multiple representations of the same message, and clients should
|
||||||
|
// show the fanciest they can display. For this program, the priority is text/html, text/plain,
|
||||||
|
// then give up.
|
||||||
fn extract_alternative(m: &ParsedMail) -> Result<Body, Error> {
|
fn extract_alternative(m: &ParsedMail) -> Result<Body, Error> {
|
||||||
for sp in &m.subparts {
|
for sp in &m.subparts {
|
||||||
if sp.ctype.mimetype == "text/html" {
|
if sp.ctype.mimetype == "text/html" {
|
||||||
@ -355,6 +438,8 @@ fn extract_alternative(m: &ParsedMail) -> Result<Body, Error> {
|
|||||||
Err("extract_alternative".into())
|
Err("extract_alternative".into())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// multipart/mixed defines multiple types of context all of which should be presented to the user
|
||||||
|
// 'serially'.
|
||||||
fn extract_mixed(m: &ParsedMail) -> Result<Body, Error> {
|
fn extract_mixed(m: &ParsedMail) -> Result<Body, Error> {
|
||||||
for sp in &m.subparts {
|
for sp in &m.subparts {
|
||||||
if sp.ctype.mimetype == "multipart/alternative" {
|
if sp.ctype.mimetype == "multipart/alternative" {
|
||||||
@ -394,21 +479,92 @@ fn extract_related(m: &ParsedMail) -> Result<Body, Error> {
|
|||||||
Err("extract_related".into())
|
Err("extract_related".into())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(wathiede): make this walk_attachments that takes a closure.
|
||||||
|
// Then implement one closure for building `Attachment` and imlement another that can be used to
|
||||||
|
// get the bytes for serving attachments of HTTP
|
||||||
|
fn extract_attachments(m: &ParsedMail) -> Result<Vec<Attachment>, Error> {
|
||||||
|
let mut attachements = Vec::new();
|
||||||
|
for sp in &m.subparts {
|
||||||
|
for h in &sp.headers {
|
||||||
|
if h.get_key() == "Content-Disposition" {
|
||||||
|
let v = h.get_value();
|
||||||
|
if let Some(idx) = v.find(";") {
|
||||||
|
let dt = &v[..idx];
|
||||||
|
match DispositionType::from_str(dt) {
|
||||||
|
Ok(DispositionType::Attachment) => {
|
||||||
|
attachements.push(Attachment {
|
||||||
|
filename: get_attachment_filename(&v).to_string(),
|
||||||
|
content_type: get_content_type(&sp.headers),
|
||||||
|
content_id: get_content_id(&sp.headers),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Ok(DispositionType::Inline) => continue,
|
||||||
|
Err(e) => {
|
||||||
|
warn!("failed to parse Content-Disposition type '{}'", e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
warn!("header has Content-Disposition missing ';'");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(attachements)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_attachment_filename(header_value: &str) -> &str {
|
||||||
|
// Strip last "
|
||||||
|
let v = &header_value[..header_value.len() - 1];
|
||||||
|
if let Some(idx) = v.rfind('"') {
|
||||||
|
&v[idx + 1..]
|
||||||
|
} else {
|
||||||
|
""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_content_type<'a>(headers: &[MailHeader<'a>]) -> Option<String> {
|
||||||
|
for h in headers {
|
||||||
|
if h.get_key() == "Content-Type" {
|
||||||
|
let v = h.get_value();
|
||||||
|
if let Some(idx) = v.find(';') {
|
||||||
|
return Some(v[..idx].to_string());
|
||||||
|
} else {
|
||||||
|
return Some(v);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_content_id<'a>(headers: &[MailHeader<'a>]) -> Option<String> {
|
||||||
|
for h in headers {
|
||||||
|
if h.get_key() == "Content-ID" {
|
||||||
|
return Some(h.get_value());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
fn render_content_type_tree(m: &ParsedMail) -> String {
|
fn render_content_type_tree(m: &ParsedMail) -> String {
|
||||||
const WIDTH: usize = 4;
|
const WIDTH: usize = 4;
|
||||||
fn render_rec(m: &ParsedMail, depth: usize) -> String {
|
fn render_rec(m: &ParsedMail, depth: usize) -> String {
|
||||||
let mut parts = Vec::new();
|
let mut parts = Vec::new();
|
||||||
let msg = format!("{} {}", "-".repeat(depth * WIDTH), m.ctype.mimetype);
|
let msg = format!("{} {}", "-".repeat(depth * WIDTH), m.ctype.mimetype);
|
||||||
parts.push(msg);
|
parts.push(msg);
|
||||||
|
let indent = " ".repeat(depth * WIDTH);
|
||||||
if !m.ctype.charset.is_empty() {
|
if !m.ctype.charset.is_empty() {
|
||||||
parts.push(format!(
|
parts.push(format!("{indent} Character Set: {}", m.ctype.charset));
|
||||||
"{} Character Set: {}",
|
|
||||||
" ".repeat(depth * WIDTH),
|
|
||||||
m.ctype.charset
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
for (k, v) in m.ctype.params.iter() {
|
for (k, v) in m.ctype.params.iter() {
|
||||||
parts.push(format!("{} {k}: {v}", " ".repeat(depth * WIDTH),));
|
parts.push(format!("{indent} {k}: {v}"));
|
||||||
|
}
|
||||||
|
if !m.headers.is_empty() {
|
||||||
|
parts.push(format!("{indent} == headers =="));
|
||||||
|
for h in &m.headers {
|
||||||
|
parts.push(format!("{indent} {}: {}", h.get_key(), h.get_value()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
for sp in &m.subparts {
|
for sp in &m.subparts {
|
||||||
parts.push(render_rec(sp, depth + 1))
|
parts.push(render_rec(sp, depth + 1))
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user