From 720621e19ee09038ff212a42210edd96e522f625 Mon Sep 17 00:00:00 2001 From: Bill Thiede Date: Sun, 31 Oct 2021 16:06:04 -0700 Subject: [PATCH] notmuch: add bulk version of allmail::parse. --- notmuch/Cargo.toml | 1 + notmuch/src/lib.rs | 3 +- notmuch/tests/allmail.rs | 70 ++++++++++++++++++++++++++++++++++++---- 3 files changed, 67 insertions(+), 7 deletions(-) diff --git a/notmuch/Cargo.toml b/notmuch/Cargo.toml index 7b61471..24442f1 100644 --- a/notmuch/Cargo.toml +++ b/notmuch/Cargo.toml @@ -12,5 +12,6 @@ serde_json = { version = "1.0", features = ["unbounded_depth"] } thiserror = "1.0.30" [dev-dependencies] +itertools = "0.10.1" pretty_assertions = "1" rayon = "1.5" diff --git a/notmuch/src/lib.rs b/notmuch/src/lib.rs index e31a104..051877e 100644 --- a/notmuch/src/lib.rs +++ b/notmuch/src/lib.rs @@ -480,7 +480,8 @@ impl Notmuch { pub fn show(&self, query: &str) -> Result { let slice = self.run_notmuch(["show", "--format=json", query])?; - // + // Notmuch returns JSON with invalid unicode. So we lossy convert it to a string here an + // use that for parsing in rust. let s = String::from_utf8_lossy(&slice); let mut deserializer = serde_json::Deserializer::from_str(&s); deserializer.disable_recursion_limit(); diff --git a/notmuch/tests/allmail.rs b/notmuch/tests/allmail.rs index 6210fb3..4abc676 100644 --- a/notmuch/tests/allmail.rs +++ b/notmuch/tests/allmail.rs @@ -4,12 +4,14 @@ use std::{ time::{Duration, Instant}, }; +use itertools::Itertools; use rayon::iter::{ParallelBridge, ParallelIterator}; use notmuch::{Notmuch, NotmuchError, SearchSummary, ThreadSet}; #[test] -fn parse() -> Result<(), Box> { +#[ignore] // it is too expensive +fn parse_one() -> Result<(), Box> { // take_hook() returns the default hook in case when a custom one is not set let orig_hook = std::panic::take_hook(); std::panic::set_hook(Box::new(move |panic_info| { @@ -19,7 +21,7 @@ fn parse() -> Result<(), Box> { })); let nm = Notmuch::default(); - let count = nm.count("*")? as f32; + let total = nm.count("*")? as f32; let start = Instant::now(); nm.message_ids("*")? .enumerate() @@ -32,12 +34,17 @@ fn parse() -> Result<(), Box> { //println!("{:?}", ts); if i > 0 && i % 1000 == 0 { let diff = start.elapsed(); - let percent = (i as f32 * 100.) / count; - let eta = diff.mul_f32(count as f32).div_f32(i as f32); + let progress = i as f32; + let percent = (progress * 100.) / total; + let duration_per_message = diff.div_f32(progress); + let total_runtime = duration_per_message.mul_f32(total as f32); + let eta = duration_per_message.mul_f32(total - progress); print!( - "\nElapsed {}s ETA {}s Percent {}% ", + "\nElapsed {}s ETA {}s Total {}s Time/msg {}us Percent {}% ", diff.as_secs_f32(), eta.as_secs_f32(), + total_runtime.as_secs_f32(), + duration_per_message.as_micros(), percent ); stdout().flush().expect("failed to flush stdout"); @@ -48,6 +55,57 @@ fn parse() -> Result<(), Box> { } }); println!("\n"); - assert!(false); + Ok(()) +} + +#[test] +#[ignore] // it is too expensive +fn parse_bulk() -> Result<(), Box> { + // take_hook() returns the default hook in case when a custom one is not set + let orig_hook = std::panic::take_hook(); + std::panic::set_hook(Box::new(move |panic_info| { + // invoke the default handler and exit the process + orig_hook(panic_info); + std::process::exit(1); + })); + + let nm = Notmuch::default(); + let total = nm.count("*")? as f32; + let start = Instant::now(); + let chunk_size = 1000; + nm.message_ids("*")? + .chunks(chunk_size) + .into_iter() + .enumerate() + //.par_bridge() + .for_each(|(i, chunk)| { + let msgs: Result, _> = chunk.collect(); + let msgs = msgs.expect("failed to unwrap msg"); + let query = msgs.join(" OR "); + let ts = nm + .show(&query) + .expect(&format!("failed to show msgs: {}", query)); + //println!("{:?}", ts); + if i > 0 && i % 10 == 0 { + let progress = (i * chunk_size) as f32; + let diff = start.elapsed(); + let percent = (progress * 100.) / total; + let duration_per_message = diff.div_f32(progress); + let total_runtime = duration_per_message.mul_f32(total as f32); + let eta = duration_per_message.mul_f32(total - progress); + print!( + "\nElapsed {}s ETA {}s Total {}s Time/msg {}us Percent {}% ", + diff.as_secs_f32(), + eta.as_secs_f32(), + total_runtime.as_secs_f32(), + duration_per_message.as_micros(), + percent + ); + stdout().flush().expect("failed to flush stdout"); + } + print!("."); + stdout().flush().expect("failed to flush stdout"); + }); + println!("\n"); Ok(()) }