notmuch: add bulk version of allmail::parse.
This commit is contained in:
parent
448cef15a8
commit
720621e19e
@ -12,5 +12,6 @@ serde_json = { version = "1.0", features = ["unbounded_depth"] }
|
|||||||
thiserror = "1.0.30"
|
thiserror = "1.0.30"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
itertools = "0.10.1"
|
||||||
pretty_assertions = "1"
|
pretty_assertions = "1"
|
||||||
rayon = "1.5"
|
rayon = "1.5"
|
||||||
|
|||||||
@ -480,7 +480,8 @@ impl Notmuch {
|
|||||||
|
|
||||||
pub fn show(&self, query: &str) -> Result<ThreadSet, NotmuchError> {
|
pub fn show(&self, query: &str) -> Result<ThreadSet, NotmuchError> {
|
||||||
let slice = self.run_notmuch(["show", "--format=json", query])?;
|
let slice = self.run_notmuch(["show", "--format=json", query])?;
|
||||||
//
|
// Notmuch returns JSON with invalid unicode. So we lossy convert it to a string here an
|
||||||
|
// use that for parsing in rust.
|
||||||
let s = String::from_utf8_lossy(&slice);
|
let s = String::from_utf8_lossy(&slice);
|
||||||
let mut deserializer = serde_json::Deserializer::from_str(&s);
|
let mut deserializer = serde_json::Deserializer::from_str(&s);
|
||||||
deserializer.disable_recursion_limit();
|
deserializer.disable_recursion_limit();
|
||||||
|
|||||||
@ -4,12 +4,14 @@ use std::{
|
|||||||
time::{Duration, Instant},
|
time::{Duration, Instant},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use itertools::Itertools;
|
||||||
use rayon::iter::{ParallelBridge, ParallelIterator};
|
use rayon::iter::{ParallelBridge, ParallelIterator};
|
||||||
|
|
||||||
use notmuch::{Notmuch, NotmuchError, SearchSummary, ThreadSet};
|
use notmuch::{Notmuch, NotmuchError, SearchSummary, ThreadSet};
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn parse() -> Result<(), Box<dyn Error>> {
|
#[ignore] // it is too expensive
|
||||||
|
fn parse_one() -> Result<(), Box<dyn Error>> {
|
||||||
// take_hook() returns the default hook in case when a custom one is not set
|
// take_hook() returns the default hook in case when a custom one is not set
|
||||||
let orig_hook = std::panic::take_hook();
|
let orig_hook = std::panic::take_hook();
|
||||||
std::panic::set_hook(Box::new(move |panic_info| {
|
std::panic::set_hook(Box::new(move |panic_info| {
|
||||||
@ -19,7 +21,7 @@ fn parse() -> Result<(), Box<dyn Error>> {
|
|||||||
}));
|
}));
|
||||||
|
|
||||||
let nm = Notmuch::default();
|
let nm = Notmuch::default();
|
||||||
let count = nm.count("*")? as f32;
|
let total = nm.count("*")? as f32;
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
nm.message_ids("*")?
|
nm.message_ids("*")?
|
||||||
.enumerate()
|
.enumerate()
|
||||||
@ -32,12 +34,17 @@ fn parse() -> Result<(), Box<dyn Error>> {
|
|||||||
//println!("{:?}", ts);
|
//println!("{:?}", ts);
|
||||||
if i > 0 && i % 1000 == 0 {
|
if i > 0 && i % 1000 == 0 {
|
||||||
let diff = start.elapsed();
|
let diff = start.elapsed();
|
||||||
let percent = (i as f32 * 100.) / count;
|
let progress = i as f32;
|
||||||
let eta = diff.mul_f32(count as f32).div_f32(i as f32);
|
let percent = (progress * 100.) / total;
|
||||||
|
let duration_per_message = diff.div_f32(progress);
|
||||||
|
let total_runtime = duration_per_message.mul_f32(total as f32);
|
||||||
|
let eta = duration_per_message.mul_f32(total - progress);
|
||||||
print!(
|
print!(
|
||||||
"\nElapsed {}s ETA {}s Percent {}% ",
|
"\nElapsed {}s ETA {}s Total {}s Time/msg {}us Percent {}% ",
|
||||||
diff.as_secs_f32(),
|
diff.as_secs_f32(),
|
||||||
eta.as_secs_f32(),
|
eta.as_secs_f32(),
|
||||||
|
total_runtime.as_secs_f32(),
|
||||||
|
duration_per_message.as_micros(),
|
||||||
percent
|
percent
|
||||||
);
|
);
|
||||||
stdout().flush().expect("failed to flush stdout");
|
stdout().flush().expect("failed to flush stdout");
|
||||||
@ -48,6 +55,57 @@ fn parse() -> Result<(), Box<dyn Error>> {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
println!("\n");
|
println!("\n");
|
||||||
assert!(false);
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[ignore] // it is too expensive
|
||||||
|
fn parse_bulk() -> Result<(), Box<dyn Error>> {
|
||||||
|
// take_hook() returns the default hook in case when a custom one is not set
|
||||||
|
let orig_hook = std::panic::take_hook();
|
||||||
|
std::panic::set_hook(Box::new(move |panic_info| {
|
||||||
|
// invoke the default handler and exit the process
|
||||||
|
orig_hook(panic_info);
|
||||||
|
std::process::exit(1);
|
||||||
|
}));
|
||||||
|
|
||||||
|
let nm = Notmuch::default();
|
||||||
|
let total = nm.count("*")? as f32;
|
||||||
|
let start = Instant::now();
|
||||||
|
let chunk_size = 1000;
|
||||||
|
nm.message_ids("*")?
|
||||||
|
.chunks(chunk_size)
|
||||||
|
.into_iter()
|
||||||
|
.enumerate()
|
||||||
|
//.par_bridge()
|
||||||
|
.for_each(|(i, chunk)| {
|
||||||
|
let msgs: Result<Vec<_>, _> = chunk.collect();
|
||||||
|
let msgs = msgs.expect("failed to unwrap msg");
|
||||||
|
let query = msgs.join(" OR ");
|
||||||
|
let ts = nm
|
||||||
|
.show(&query)
|
||||||
|
.expect(&format!("failed to show msgs: {}", query));
|
||||||
|
//println!("{:?}", ts);
|
||||||
|
if i > 0 && i % 10 == 0 {
|
||||||
|
let progress = (i * chunk_size) as f32;
|
||||||
|
let diff = start.elapsed();
|
||||||
|
let percent = (progress * 100.) / total;
|
||||||
|
let duration_per_message = diff.div_f32(progress);
|
||||||
|
let total_runtime = duration_per_message.mul_f32(total as f32);
|
||||||
|
let eta = duration_per_message.mul_f32(total - progress);
|
||||||
|
print!(
|
||||||
|
"\nElapsed {}s ETA {}s Total {}s Time/msg {}us Percent {}% ",
|
||||||
|
diff.as_secs_f32(),
|
||||||
|
eta.as_secs_f32(),
|
||||||
|
total_runtime.as_secs_f32(),
|
||||||
|
duration_per_message.as_micros(),
|
||||||
|
percent
|
||||||
|
);
|
||||||
|
stdout().flush().expect("failed to flush stdout");
|
||||||
|
}
|
||||||
|
print!(".");
|
||||||
|
stdout().flush().expect("failed to flush stdout");
|
||||||
|
});
|
||||||
|
println!("\n");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user