letterbox/server/src/tantivy.rs

354 lines
13 KiB
Rust

use std::collections::HashSet;
use log::{debug, error, info, warn};
use sqlx::{postgres::PgPool, types::time::PrimitiveDateTime};
use tantivy::{
collector::{DocSetCollector, TopDocs},
doc, query,
query::{AllQuery, BooleanQuery, Occur, QueryParser, TermQuery},
schema::{Facet, IndexRecordOption, Value},
DocAddress, Index, IndexReader, Searcher, TantivyDocument, TantivyError, Term,
};
use tracing::{info_span, instrument, Instrument};
use crate::{
compute_offset_limit,
error::ServerError,
graphql::{Corpus, ThreadSummary},
newsreader::{extract_thread_id, is_newsreader_thread},
thread_summary_from_row, Query, ThreadSummaryRecord,
};
/// Reports whether `query` should be answered from the Tantivy index.
///
/// That is the case when the query's `is_tantivy` flag is set or when its
/// corpus is explicitly `Corpus::Tantivy`.
pub fn is_tantivy_query(query: &Query) -> bool {
    if query.is_tantivy {
        return true;
    }
    matches!(query.corpus, Some(Corpus::Tantivy))
}
/// Handle bundling an opened Tantivy index with the reader used to search it.
pub struct TantivyConnection {
// Directory the index lives in; passed to `create_news_db` when the index is recreated.
db_path: String,
// The opened index; source of the schema and of index writers.
index: Index,
// Reader created once in `new`; searchers are acquired from it and it is
// reloaded after each reindex commit.
reader: IndexReader,
}
/// Opens the Tantivy index stored in the directory `db_path`.
///
/// If the first open attempt fails for any reason, the failure is logged and
/// the index is rebuilt from scratch with `create_news_db` (which wipes the
/// directory) before a second open attempt. An error is returned only when
/// the rebuild or the second open fails.
fn get_index(db_path: &str) -> Result<Index, TantivyError> {
    let err = match Index::open_in_dir(db_path) {
        Ok(index) => return Ok(index),
        Err(err) => err,
    };
    warn!("Failed to open {db_path}: {err}");
    create_news_db(db_path)?;
    Index::open_in_dir(db_path)
}
impl TantivyConnection {
/// Opens the index at `tantivy_db_path` (via `get_index`) and creates a
/// reader over it.
///
/// # Errors
/// Returns the underlying `TantivyError` if the index cannot be opened or the
/// reader cannot be created.
pub fn new(tantivy_db_path: &str) -> Result<TantivyConnection, TantivyError> {
    let db_path = tantivy_db_path.to_string();
    let index = get_index(tantivy_db_path)?;
    let reader = index.reader()?;
    Ok(TantivyConnection {
        db_path,
        index,
        reader,
    })
}
#[instrument(name = "tantivy::refresh", skip_all)]
pub async fn refresh(&self, pool: &PgPool) -> Result<(), ServerError> {
let start_time = std::time::Instant::now();
let p_uids: Vec<_> = sqlx::query_file!("sql/all-uids.sql")
.fetch_all(pool)
.instrument(info_span!("postgres query"))
.await?
.into_iter()
.map(|r| r.uid)
.collect();
info!(
"refresh from postgres got {} uids in {}",
p_uids.len(),
start_time.elapsed().as_secs_f32()
);
let t_span = info_span!("tantivy query");
let _enter = t_span.enter();
let start_time = std::time::Instant::now();
let (searcher, _query) = self.searcher_and_query(&Query::default())?;
let docs = searcher.search(&AllQuery, &DocSetCollector)?;
let uid = self.index.schema().get_field("uid")?;
let t_uids: Vec<_> = docs
.into_iter()
.map(|doc_address| {
searcher
.doc(doc_address)
.map(|doc: TantivyDocument| {
debug!("doc: {doc:#?}");
doc.get_first(uid)
.expect("uid")
.as_str()
.expect("as_str")
.to_string()
})
.expect("searcher.doc")
})
.collect();
drop(_enter);
info!(
"refresh tantivy got {} uids in {}",
t_uids.len(),
start_time.elapsed().as_secs_f32()
);
let t_set: HashSet<_> = t_uids.into_iter().collect();
let need: Vec<_> = p_uids
.into_iter()
.filter(|uid| !t_set.contains(uid.as_str()))
.collect();
if !need.is_empty() {
info!(
"need to reindex {} uids: {:?}...",
need.len(),
&need[..need.len().min(10)]
);
}
let batch_size = 1000;
let uids: Vec<_> = need[..need.len().min(batch_size)]
.into_iter()
.cloned()
.collect();
self.reindex_uids(pool, &uids).await
}
#[instrument(skip(self, pool))]
async fn reindex_uids(&self, pool: &PgPool, uids: &[String]) -> Result<(), ServerError> {
if uids.is_empty() {
return Ok(());
}
// TODO: add SlurpContents and convert HTML to text
let pool: &PgPool = pool;
let mut index_writer = self.index.writer(50_000_000)?;
let schema = self.index.schema();
let site = schema.get_field("site")?;
let title = schema.get_field("title")?;
let summary = schema.get_field("summary")?;
let link = schema.get_field("link")?;
let date = schema.get_field("date")?;
let is_read = schema.get_field("is_read")?;
let uid = schema.get_field("uid")?;
let id = schema.get_field("id")?;
let tag = schema.get_field("tag")?;
info!("reindexing {} posts", uids.len());
let rows = sqlx::query_file_as!(PostgresDoc, "sql/posts-from-uids.sql", uids)
.fetch_all(pool)
.await?;
if uids.len() != rows.len() {
error!(
"Had {} uids and only got {} rows: uids {uids:?}",
uids.len(),
rows.len()
);
}
for r in rows {
let id_term = Term::from_field_text(uid, &r.uid);
index_writer.delete_term(id_term);
let slug = r.site;
let tag_facet = Facet::from(&format!("/News/{slug}"));
index_writer.add_document(doc!(
site => slug.clone(),
title => r.title,
// TODO: clean and extract text from HTML
summary => r.summary,
link => r.link,
date => tantivy::DateTime::from_primitive(r.date),
is_read => r.is_read,
uid => r.uid,
id => r.id as u64,
tag => tag_facet,
))?;
}
info_span!("IndexWriter.commit").in_scope(|| index_writer.commit())?;
info_span!("IndexReader.reload").in_scope(|| self.reader.reload())?;
Ok(())
}
/// Reindexes the newsreader threads referenced by `query.uids`.
///
/// Uids that `is_newsreader_thread` rejects are ignored; the remaining ones
/// are reduced to their thread id with `extract_thread_id` and passed to
/// `reindex_uids`.
#[instrument(name = "tantivy::reindex_thread", skip_all, fields(query=%query))]
pub async fn reindex_thread(&self, pool: &PgPool, query: &Query) -> Result<(), ServerError> {
    let mut thread_ids = Vec::new();
    for uid in query.uids.iter() {
        if is_newsreader_thread(uid) {
            thread_ids.push(extract_thread_id(uid).to_string());
        }
    }
    self.reindex_uids(pool, &thread_ids).await
}
/// Reindexes every post returned by `sql/all-posts.sql`.
///
/// Only the `uid` of each row is used here; the documents themselves are
/// loaded and written by `reindex_uids`.
#[instrument(name = "tantivy::reindex_all", skip_all)]
pub async fn reindex_all(&self, pool: &PgPool) -> Result<(), ServerError> {
    let posts = sqlx::query_file!("sql/all-posts.sql")
        .fetch_all(pool)
        .await?;
    let mut uids: Vec<String> = Vec::with_capacity(posts.len());
    uids.extend(posts.into_iter().map(|post| post.uid));
    self.reindex_uids(pool, &uids).await
}
/// Acquires a searcher and builds the Tantivy query corresponding to `query`.
///
/// The free-text part (`query.remainder`, joined with spaces) is parsed
/// against the `title` and `summary` fields; an empty or whitespace-only
/// remainder becomes `*` so that it matches every document. Each entry in
/// `query.tags` adds a required facet term on `tag`, and `query.unread_only`
/// adds a required `is_read == false` term.
///
/// # Errors
/// Returns an error if a schema field is missing or the free-text part
/// cannot be parsed.
fn searcher_and_query(
    &self,
    query: &Query,
) -> Result<(Searcher, Box<dyn query::Query>), ServerError> {
    // Following https://tantivy-search.github.io/examples/basic_search.html:
    // "For a search server you will typically create one reader for the entire lifetime of
    // your program, and acquire a new searcher for every single request."
    //
    // The reader lives on `self`; it is reloaded after each reindex commit, so
    // only a searcher is acquired here.
    let schema = self.index.schema();
    let searcher = self.reader.searcher();
    let title = schema.get_field("title")?;
    let summary = schema.get_field("summary")?;
    let tag = schema.get_field("tag")?;
    let is_read = schema.get_field("is_read")?;
    let query_parser = QueryParser::for_index(&self.index, vec![title, summary]);

    // Tantivy uses '*' to match all docs, not empty string. Trim first so a
    // remainder made only of whitespace is treated as empty too.
    let joined = query.remainder.join(" ");
    let term = match joined.trim() {
        "" => "*",
        trimmed => trimmed,
    };
    info!("query_parser('{term}')");
    let tantivy_query = query_parser.parse_query(term)?;

    let mut terms = vec![(Occur::Must, tantivy_query)];
    for t in &query.tags {
        let facet = Facet::from(&format!("/{t}"));
        let facet_term = Term::from_facet(tag, &facet);
        let facet_term_query = Box::new(TermQuery::new(facet_term, IndexRecordOption::Basic));
        terms.push((Occur::Must, facet_term_query));
    }
    if query.unread_only {
        info!("searching for unread only");
        let term = Term::from_field_bool(is_read, false);
        terms.push((
            Occur::Must,
            Box::new(TermQuery::new(term, IndexRecordOption::Basic)),
        ));
    }
    let search_query = BooleanQuery::new(terms);
    Ok((searcher, Box::new(search_query)))
}
/// Counts the documents matching `query`.
///
/// Queries that are not Tantivy queries (per `is_tantivy_query`) count as 0
/// without touching the index.
#[instrument(name="tantivy::count", skip_all, fields(query=%query))]
pub async fn count(&self, query: &Query) -> Result<usize, ServerError> {
    if is_tantivy_query(query) {
        info!("tantivy::count {query:?}");
        let (searcher, tantivy_query) = self.searcher_and_query(query)?;
        let hits = searcher.search(&tantivy_query, &tantivy::collector::Count)?;
        Ok(hits)
    } else {
        Ok(0)
    }
}
#[instrument(name="tantivy::search", skip_all, fields(query=%query))]
pub async fn search(
&self,
pool: &PgPool,
after: Option<i32>,
before: Option<i32>,
first: Option<i32>,
last: Option<i32>,
query: &Query,
) -> Result<Vec<(i32, ThreadSummary)>, async_graphql::Error> {
if !is_tantivy_query(query) {
return Ok(Vec::new());
}
let (offset, mut limit) = compute_offset_limit(after, before, first, last);
if before.is_none() {
// When searching forward, the +1 is to see if there are more pages of data available.
// Searching backwards implies there's more pages forward, because the value represented by
// `before` is on the next page.
limit = limit + 1;
}
let (searcher, search_query) = self.searcher_and_query(&query)?;
info!("Tantivy::search(query '{query:?}', off {offset}, lim {limit}, search_query {search_query:?})");
let top_docs = searcher.search(
&search_query,
&TopDocs::with_limit(limit as usize)
.and_offset(offset as usize)
.order_by_u64_field("date", tantivy::index::Order::Desc),
)?;
info!("search found {} docs", top_docs.len());
let uid = self.index.schema().get_field("uid")?;
let uids = top_docs
.into_iter()
.map(|(_, doc_address): (u64, DocAddress)| {
searcher.doc(doc_address).map(|doc: TantivyDocument| {
debug!("doc: {doc:#?}");
doc.get_first(uid)
.expect("doc missing uid")
.as_str()
.expect("doc str missing")
.to_string()
})
})
.collect::<Result<Vec<String>, TantivyError>>()?;
//let uids = format!("'{}'", uids.join("','"));
info!("uids {uids:?}");
let rows = sqlx::query_file!("sql/threads-from-uid.sql", &uids as &[String])
.fetch_all(pool)
.await?;
let mut res = Vec::new();
info!("found {} hits joining w/ tantivy", rows.len());
for (i, r) in rows.into_iter().enumerate() {
res.push((
i as i32 + offset,
thread_summary_from_row(ThreadSummaryRecord {
site: r.site,
date: r.date,
is_read: r.is_read,
title: r.title,
uid: r.uid,
name: r.name,
corpus: Corpus::Tantivy,
})
.await,
));
}
Ok(res)
}
/// Recreates an empty index on disk in this connection's directory by
/// delegating to `create_news_db`.
///
/// NOTE(review): this method itself does not reassign `self.index` or
/// `self.reader`; confirm callers do not rely on that.
pub fn drop_and_load_index(&self) -> Result<(), TantivyError> {
    let db_path: &str = &self.db_path;
    create_news_db(db_path)
}
}
/// Creates a fresh, empty news index in the directory `tantivy_db_path`.
///
/// Any existing directory at that path is removed first, then the directory
/// is recreated and an index with the news schema (`site`, `title`,
/// `summary`, `link`, `date`, `is_read`, `uid`, `id`, `tag`) is created in it.
///
/// # Errors
/// Returns an error if the old directory exists but cannot be removed, if the
/// directory cannot be created, or if Tantivy fails to create the index.
fn create_news_db(tantivy_db_path: &str) -> Result<(), TantivyError> {
    use tantivy::schema::*;

    info!("create_news_db");
    // Don't care if directory didn't exist; any other removal failure would
    // leave stale index files behind, so report it instead of hiding it.
    match std::fs::remove_dir_all(tantivy_db_path) {
        Ok(()) => {}
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
        Err(err) => return Err(err.into()),
    }
    std::fs::create_dir_all(tantivy_db_path)?;

    let mut schema_builder = Schema::builder();
    schema_builder.add_text_field("site", STRING | STORED);
    schema_builder.add_text_field("title", TEXT | STORED);
    schema_builder.add_text_field("summary", TEXT);
    schema_builder.add_text_field("link", STRING | STORED);
    schema_builder.add_date_field("date", FAST | INDEXED | STORED);
    schema_builder.add_bool_field("is_read", FAST | INDEXED | STORED);
    schema_builder.add_text_field("uid", STRING | STORED);
    schema_builder.add_u64_field("id", FAST);
    schema_builder.add_facet_field("tag", FacetOptions::default());
    let schema = schema_builder.build();
    Index::create_in_dir(tantivy_db_path, schema)?;
    Ok(())
}
/// One post row as loaded by `sql/posts-from-uids.sql` in `reindex_uids`;
/// each field is copied into the index field of the same name.
struct PostgresDoc {
// Site slug; also used to build the `/News/{site}` tag facet.
site: String,
title: String,
// Indexed as-is (the indexing code carries a TODO to extract text from HTML).
summary: String,
link: String,
// Converted with `tantivy::DateTime::from_primitive` when indexed.
date: PrimitiveDateTime,
is_read: bool,
// Used as the delete key so re-adding a post replaces its previous document.
uid: String,
// Signed in Postgres; stored in the index's unsigned `id` field.
id: i32,
}