web & server: using tantivy for news post search

This commit is contained in:
2024-09-29 16:28:05 -07:00
parent f36d1e0c29
commit 3ec1741f10
22 changed files with 737 additions and 170 deletions

View File

@@ -1,11 +1,26 @@
use log::info;
use sqlx::postgres::PgPool;
use tantivy::{schema::Value, Index, TantivyError};
use std::collections::HashSet;
use crate::{
error::ServerError, graphql::ThreadSummary, thread_summary_from_row, Query, ThreadSummaryRecord,
use log::{debug, error, info};
use sqlx::{postgres::PgPool, types::time::PrimitiveDateTime};
use tantivy::{
collector::{DocSetCollector, TopDocs},
query,
query::{AllQuery, BooleanQuery, Occur, QueryParser, TermQuery},
schema::{Facet, IndexRecordOption, Value},
DocAddress, Index, Searcher, TantivyDocument, TantivyError, Term,
};
use crate::{
compute_offset_limit,
error::ServerError,
graphql::{Corpus, ThreadSummary},
newsreader::{extract_thread_id, is_newsreader_thread},
thread_summary_from_row, Query, ThreadSummaryRecord,
};
pub fn is_tantivy_query(query: &Query) -> bool {
query.is_tantivy || query.corpus == Some(Corpus::Tantivy)
}
pub struct TantivyConnection {
db_path: String,
//index: Index,
@@ -27,7 +42,67 @@ impl TantivyConnection {
db_path: tantivy_db_path.to_string(),
})
}
pub async fn reindex(&self, pool: &PgPool) -> Result<(), ServerError> {
pub async fn refresh(&self, pool: &PgPool) -> Result<(), ServerError> {
let start_time = std::time::Instant::now();
let p_uids: Vec<_> = sqlx::query_file!("sql/all-uids.sql")
.fetch_all(pool)
.await?
.into_iter()
.map(|r| r.uid)
.collect();
info!(
"refresh from postgres got {} uids in {}",
p_uids.len(),
start_time.elapsed().as_secs_f32()
);
let start_time = std::time::Instant::now();
let (searcher, _query) = self.searcher_and_query("")?;
let docs = searcher.search(&AllQuery, &DocSetCollector)?;
let uid = self.get_index()?.schema().get_field("uid")?;
let t_uids: Vec<_> = docs
.into_iter()
.map(|doc_address| {
searcher
.doc(doc_address)
.map(|doc: TantivyDocument| {
debug!("doc: {doc:#?}");
doc.get_first(uid)
.expect("uid")
.as_str()
.expect("as_str")
.to_string()
})
.expect("searcher.doc")
})
.collect();
info!(
"refresh tantivy got {} uids in {}",
t_uids.len(),
start_time.elapsed().as_secs_f32()
);
let t_set: HashSet<_> = t_uids.into_iter().collect();
let need: Vec<_> = p_uids
.into_iter()
.filter(|uid| !t_set.contains(uid.as_str()))
.collect();
if !need.is_empty() {
info!(
"need to reindex {} uids: {:?}...",
need.len(),
&need[..need.len().min(10)]
);
}
let batch_size = 1000;
let uids: Vec<_> = need[..need.len().min(batch_size)]
.into_iter()
.cloned()
.collect();
self.reindex_uids(pool, &uids).await
}
async fn reindex_uids(&self, pool: &PgPool, uids: &[String]) -> Result<(), ServerError> {
// TODO: add SlurpContents and convert HTML to text
use tantivy::{doc, Term};
let start_time = std::time::Instant::now();
@@ -44,11 +119,20 @@ impl TantivyConnection {
let is_read = schema.get_field("is_read")?;
let uid = schema.get_field("uid")?;
let id = schema.get_field("id")?;
let tag = schema.get_field("tag")?;
let rows = sqlx::query_file!("sql/all-posts.sql")
info!("reindexing {} posts", uids.len());
let rows = sqlx::query_file_as!(PostgresDoc, "sql/posts-from-uids.sql", uids)
.fetch_all(pool)
.await?;
if uids.len() != rows.len() {
error!(
"Had {} uids and only got {} rows: uids {uids:?}",
uids.len(),
rows.len()
);
}
let total = rows.len();
for (i, r) in rows.into_iter().enumerate() {
if i % 10_000 == 0 {
@@ -57,26 +141,76 @@ impl TantivyConnection {
start_time.elapsed().as_secs_f32()
);
}
let id_term = Term::from_field_text(uid, &r.uid);
index_writer.delete_term(id_term);
let slug = r.site;
let tag_facet = Facet::from(&format!("/News/{slug}"));
index_writer.add_document(doc!(
site => r.site.expect("UNKOWN_SITE"),
title => r.title.expect("UNKOWN_TITLE"),
site => slug.clone(),
title => r.title,
// TODO: clean and extract text from HTML
summary => r.summary.expect("UNKNOWN_SUMMARY"),
link => r.link.expect("link"),
date => tantivy::DateTime::from_primitive(r.date.expect("date")),
is_read => r.is_read.expect("is_read"),
summary => r.summary,
link => r.link,
date => tantivy::DateTime::from_primitive(r.date),
is_read => r.is_read,
uid => r.uid,
id => r.id as i64,
id => r.id as u64,
tag => tag_facet,
))?;
}
index_writer.commit()?;
info!("took {:.2}s to reindex", start_time.elapsed().as_secs_f32());
index_writer.commit()?;
Ok(())
}
pub async fn reindex_thread(&self, pool: &PgPool, query: &Query) -> Result<(), ServerError> {
let uids: Vec<_> = query
.uids
.iter()
.filter(|uid| is_newsreader_thread(uid))
.map(|uid| extract_thread_id(uid).to_string())
.collect();
Ok(self.reindex_uids(pool, &uids).await?)
}
pub async fn reindex_all(&self, pool: &PgPool) -> Result<(), ServerError> {
let rows = sqlx::query_file!("sql/all-posts.sql")
.fetch_all(pool)
.await?;
let uids: Vec<String> = rows.into_iter().map(|r| r.uid).collect();
self.reindex_uids(pool, &uids).await?;
Ok(())
}
fn searcher_and_query(
&self,
term: &str,
) -> Result<(Searcher, Box<dyn query::Query>), ServerError> {
let index = self.get_index()?;
let reader = index.reader()?;
let schema = index.schema();
let searcher = reader.searcher();
let title = schema.get_field("title")?;
let summary = schema.get_field("summary")?;
let query_parser = QueryParser::for_index(&index, vec![title, summary]);
// Tantivy uses '*' to match all docs, not empty string
let term = if term.is_empty() { "*" } else { term };
info!("query_parser('{term}')");
let query = query_parser.parse_query(&term)?;
Ok((searcher, query))
}
pub async fn count(&self, query: &Query) -> Result<usize, ServerError> {
if !is_tantivy_query(query) {
return Ok(0);
}
use tantivy::collector::Count;
let term = query.remainder.join(" ");
let (searcher, query) = self.searcher_and_query(&term)?;
Ok(searcher.search(&query, &Count)?)
}
pub async fn search(
&self,
pool: &PgPool,
@@ -86,28 +220,51 @@ impl TantivyConnection {
last: Option<i32>,
query: &Query,
) -> Result<Vec<(i32, ThreadSummary)>, async_graphql::Error> {
use tantivy::{collector::TopDocs, query::QueryParser, Document, TantivyDocument};
// TODO: set based on function parameters
let offset = 0;
if !is_tantivy_query(query) {
return Ok(Vec::new());
}
let (offset, mut limit) = compute_offset_limit(after, before, first, last);
if before.is_none() {
// When searching forward, the +1 is to see if there are more pages of data available.
// Searching backwards implies there's more pages forward, because the value represented by
// `before` is on the next page.
limit = limit + 1;
}
let index = self.get_index()?;
let reader = index.reader()?;
let schema = index.schema();
let searcher = reader.searcher();
let site = schema.get_field("site")?;
let uid = schema.get_field("uid")?;
let title = schema.get_field("title")?;
let summary = schema.get_field("summary")?;
let date = schema.get_field("date")?;
let query_parser = QueryParser::for_index(&index, vec![title, summary]);
let query = query_parser.parse_query(&query.remainder.join(" "))?;
let top_docs = searcher.search(&query, &TopDocs::with_limit(10))?;
let term = query.remainder.join(" ");
let (searcher, tantivy_query) = self.searcher_and_query(&term)?;
let tag = self.get_index()?.schema().get_field("tag")?;
let is_read = self.get_index()?.schema().get_field("is_read")?;
let mut terms = vec![(Occur::Must, tantivy_query)];
for t in &query.tags {
let facet = Facet::from(&format!("/{t}"));
let facet_term = Term::from_facet(tag, &facet);
let facet_term_query = Box::new(TermQuery::new(facet_term, IndexRecordOption::Basic));
terms.push((Occur::Must, facet_term_query));
}
if query.unread_only {
info!("searching for unread only");
let term = Term::from_field_bool(is_read, false);
terms.push((
Occur::Must,
Box::new(TermQuery::new(term, IndexRecordOption::Basic)),
));
}
let search_query = BooleanQuery::new(terms);
info!("Tantivy::search(term '{term}', off {offset}, lim {limit}, search_query {search_query:?})");
let top_docs = searcher.search(
&search_query,
&TopDocs::with_limit(limit as usize)
.and_offset(offset as usize)
.order_by_u64_field("date", tantivy::index::Order::Desc),
)?;
info!("search found {} docs", top_docs.len());
let uid = self.get_index()?.schema().get_field("uid")?;
let uids = top_docs
.into_iter()
.map(|(_, doc_address)| {
.map(|(_, doc_address): (u64, DocAddress)| {
searcher.doc(doc_address).map(|doc: TantivyDocument| {
debug!("doc: {doc:#?}");
doc.get_first(uid)
.expect("doc missing uid")
.as_str()
@@ -134,6 +291,7 @@ impl TantivyConnection {
title: r.title,
uid: r.uid,
name: r.name,
corpus: Corpus::Tantivy,
})
.await,
));
@@ -157,11 +315,23 @@ fn create_news_db(tantivy_db_path: &str) -> Result<(), TantivyError> {
schema_builder.add_text_field("summary", TEXT);
schema_builder.add_text_field("link", STRING | STORED);
schema_builder.add_date_field("date", FAST | INDEXED | STORED);
schema_builder.add_bool_field("is_read", FAST);
schema_builder.add_bool_field("is_read", FAST | INDEXED | STORED);
schema_builder.add_text_field("uid", STRING | STORED);
schema_builder.add_i64_field("id", FAST);
schema_builder.add_u64_field("id", FAST);
schema_builder.add_facet_field("tag", FacetOptions::default());
let schema = schema_builder.build();
Index::create_in_dir(tantivy_db_path, schema)?;
Ok(())
}
struct PostgresDoc {
site: String,
title: String,
summary: String,
link: String,
date: PrimitiveDateTime,
is_read: bool,
uid: String,
id: i32,
}