server: WIP tantivy integration

This commit is contained in:
2024-09-28 11:17:52 -07:00
parent 005a457348
commit ebf32a9905
8 changed files with 285 additions and 99 deletions

View File

@@ -8,7 +8,7 @@ use notmuch::Notmuch;
use serde::{Deserialize, Serialize};
use sqlx::postgres::PgPool;
use crate::{config::Config, newsreader, nm, Query};
use crate::{config::Config, newsreader, nm, tantivy::TantivyConnection, Query};
/// # Number of seconds since the Epoch
///
/// Backed by `isize`, so the value is signed and its width follows the target's pointer size.
pub type UnixTime = isize;
@@ -224,6 +224,7 @@ pub struct Tag {
/// Pagination position for a search that merges several backends.
///
/// Holds one `i32` offset per backend. The search resolver wraps a value of this type in an
/// `OpaqueCursor` for every edge it returns, and reads the individual fields back out of the
/// `after` / `before` cursors it receives.
struct SearchCursor {
/// Offset associated with the newsreader results.
newsreader_offset: i32,
/// Offset associated with the notmuch results.
notmuch_offset: i32,
/// Offset associated with the tantivy results.
tantivy_offset: i32,
}
/// Query root type: it is the first type parameter of `GraphqlSchema`, and its `impl` block
/// holds the search logic.
pub struct QueryRoot;
@@ -258,10 +259,13 @@ impl QueryRoot {
info!("search({after:?} {before:?} {first:?} {last:?} {query:?})",);
let nm = ctx.data_unchecked::<Notmuch>();
let pool = ctx.data_unchecked::<PgPool>();
let tantivy = ctx.data_unchecked::<TantivyConnection>();
/// A `ThreadSummary` tagged with the backend that produced it.
///
/// Each variant carries the `i32` cursor value that came with the summary, so results from
/// all backends can be chained into one list, sorted together, and still have the right
/// per-backend offset updated when the edges are built.
#[derive(Debug)]
enum ThreadSummaryCursor {
/// Result that came from the newsreader backend.
Newsreader(i32, ThreadSummary),
/// Result that came from the notmuch backend.
Notmuch(i32, ThreadSummary),
/// Result that came from the tantivy backend.
Tantivy(i32, ThreadSummary),
}
Ok(connection::query(
after,
@@ -279,8 +283,11 @@ impl QueryRoot {
);
let newsreader_after = after.as_ref().map(|sc| sc.newsreader_offset);
let notmuch_after = after.as_ref().map(|sc| sc.notmuch_offset);
let tantivy_after = after.as_ref().map(|sc| sc.tantivy_offset);
let newsreader_before = before.as_ref().map(|sc| sc.newsreader_offset);
let notmuch_before = before.as_ref().map(|sc| sc.notmuch_offset);
let tantivy_before = before.as_ref().map(|sc| sc.tantivy_offset);
let newsreader_query: Query = query.parse()?;
info!("newsreader_query {newsreader_query:?}");
@@ -318,15 +325,39 @@ impl QueryRoot {
Vec::new()
};
let tantivy_results = if newsreader_query.is_tantivy {
tantivy
.search(
pool,
tantivy_after,
tantivy_before,
first.map(|v| v as i32),
last.map(|v| v as i32),
&newsreader_query,
)
.await?
.into_iter()
.map(|(cur, ts)| ThreadSummaryCursor::Tantivy(cur, ts))
.collect()
} else {
Vec::new()
};
info!(
"tantivy results:\nis_tantivy:{} {tantivy_results:#?}",
newsreader_query.is_tantivy
);
let mut results: Vec<_> = newsreader_results
.into_iter()
.chain(notmuch_results)
.chain(tantivy_results)
.collect();
// The leading '-' is to reverse sort
results.sort_by_key(|item| match item {
ThreadSummaryCursor::Newsreader(_, ts) => -ts.timestamp,
ThreadSummaryCursor::Notmuch(_, ts) => -ts.timestamp,
ThreadSummaryCursor::Tantivy(_, ts) => -ts.timestamp,
});
let mut has_next_page = before.is_some();
@@ -348,6 +379,7 @@ impl QueryRoot {
let mut connection = Connection::new(has_previous_page, has_next_page);
let mut newsreader_offset = 0;
let mut notmuch_offset = 0;
let mut tantivy_offset = 0;
connection.edges.extend(results.into_iter().map(|item| {
let thread_summary;
@@ -360,10 +392,15 @@ impl QueryRoot {
thread_summary = ts;
notmuch_offset = offset;
}
ThreadSummaryCursor::Tantivy(offset, ts) => {
thread_summary = ts;
tantivy_offset = offset;
}
}
let cur = OpaqueCursor(SearchCursor {
newsreader_offset,
notmuch_offset,
tantivy_offset,
});
Edge::new(cur, thread_summary)
}));
@@ -443,6 +480,16 @@ impl Mutation {
nm.tag_remove(&tag, &query)?;
Ok(true)
}
/// Drop and recreate tantivy index. Warning this is slow
async fn drop_and_load_index<'ctx>(&self, ctx: &Context<'ctx>) -> Result<bool, Error> {
    // Look up the tantivy handle and the Postgres pool stored in the request context.
    let index = ctx.data_unchecked::<TantivyConnection>();
    let db = ctx.data_unchecked::<PgPool>();

    // Step 1: throw away the current index and load a fresh one (synchronous call).
    index.drop_and_load_index()?;

    // Step 2: repopulate it, handing the pool to the reindexer. Either step failing
    // is propagated to the caller as this resolver's error.
    index.reindex(db).await?;

    // Reaching this point means both steps succeeded.
    Ok(true)
}
}
/// Concrete schema type for this module: `QueryRoot` handles queries, `Mutation` handles
/// mutations, and `EmptySubscription` fills the subscription parameter.
pub type GraphqlSchema = Schema<QueryRoot, Mutation, EmptySubscription>;