use std::collections::HashSet;

use log::{debug, error, info, warn};
use sqlx::{postgres::PgPool, types::time::PrimitiveDateTime};
use tantivy::{
    collector::{DocSetCollector, TopDocs},
    doc, query,
    query::{AllQuery, BooleanQuery, Occur, QueryParser, TermQuery},
    schema::{Facet, IndexRecordOption, Value},
    DocAddress, Index, IndexReader, Searcher, TantivyDocument, TantivyError, Term,
};
use tracing::{info_span, instrument, Instrument};

use crate::{
    compute_offset_limit,
    error::ServerError,
    graphql::{Corpus, ThreadSummary},
    newsreader::{extract_thread_id, is_newsreader_thread},
    thread_summary_from_row, Query, ThreadSummaryRecord,
};

pub fn is_tantivy_query(query: &Query) -> bool {
    query.is_tantivy || query.corpus == Some(Corpus::Tantivy)
}

pub struct TantivyConnection {
    db_path: String,
    index: Index,
    reader: IndexReader,
}

fn get_index(db_path: &str) -> Result<Index, TantivyError> {
    Ok(match Index::open_in_dir(db_path) {
        Ok(idx) => idx,
        Err(err) => {
            warn!("Failed to open {db_path}: {err}");
            create_news_db(db_path)?;
            Index::open_in_dir(db_path)?
        }
    })
}

impl TantivyConnection {
    pub fn new(tantivy_db_path: &str) -> Result<TantivyConnection, TantivyError> {
        let index = get_index(tantivy_db_path)?;
        let reader = index.reader()?;
        Ok(TantivyConnection {
            db_path: tantivy_db_path.to_string(),
            index,
            reader,
        })
    }

    #[instrument(name = "tantivy::refresh", skip_all)]
    pub async fn refresh(&self, pool: &PgPool) -> Result<(), ServerError> {
        let start_time = std::time::Instant::now();
        let p_uids: Vec<_> = sqlx::query_file!("sql/all-uids.sql")
            .fetch_all(pool)
            .instrument(info_span!("postgres query"))
            .await?
            .into_iter()
            .map(|r| r.uid)
            .collect();
        info!(
            "refresh from postgres got {} uids in {}",
            p_uids.len(),
            start_time.elapsed().as_secs_f32()
        );

        let t_span = info_span!("tantivy query");
        let _enter = t_span.enter();
        let start_time = std::time::Instant::now();
        let (searcher, _query) = self.searcher_and_query(&Query::default())?;
        let docs = searcher.search(&AllQuery, &DocSetCollector)?;
        let uid = self.index.schema().get_field("uid")?;
        let t_uids: Vec<_> = docs
            .into_iter()
            .map(|doc_address| {
                searcher
                    .doc(doc_address)
                    .map(|doc: TantivyDocument| {
                        debug!("doc: {doc:#?}");
                        doc.get_first(uid)
                            .expect("uid")
                            .as_str()
                            .expect("as_str")
                            .to_string()
                    })
                    .expect("searcher.doc")
            })
            .collect();
        drop(_enter);
        info!(
            "refresh tantivy got {} uids in {}",
            t_uids.len(),
            start_time.elapsed().as_secs_f32()
        );

        let t_set: HashSet<_> = t_uids.into_iter().collect();
        let need: Vec<_> = p_uids
            .into_iter()
            .filter(|uid| !t_set.contains(uid.as_str()))
            .collect();
        if !need.is_empty() {
            info!(
                "need to reindex {} uids: {:?}...",
                need.len(),
                &need[..need.len().min(10)]
            );
        }
        let batch_size = 1000;
        let uids: Vec<_> = need[..need.len().min(batch_size)]
            .iter()
            .cloned()
            .collect();
        self.reindex_uids(pool, &uids).await
    }

    #[instrument(skip(self, pool))]
    async fn reindex_uids(&self, pool: &PgPool, uids: &[String]) -> Result<(), ServerError> {
        if uids.is_empty() {
            return Ok(());
        }
        // TODO: add SlurpContents and convert HTML to text
        let pool: &PgPool = pool;
        let mut index_writer = self.index.writer(50_000_000)?;
        let schema = self.index.schema();
        let site = schema.get_field("site")?;
        let title = schema.get_field("title")?;
        let summary = schema.get_field("summary")?;
        let link = schema.get_field("link")?;
        let date = schema.get_field("date")?;
        let is_read = schema.get_field("is_read")?;
        let uid = schema.get_field("uid")?;
        let id = schema.get_field("id")?;
        let tag = schema.get_field("tag")?;

        info!("reindexing {} posts", uids.len());
        let rows = sqlx::query_file_as!(
            PostgresDoc,
"sql/posts-from-uids.sql", uids) .fetch_all(pool) .await?; if uids.len() != rows.len() { error!( "Had {} uids and only got {} rows: uids {uids:?}", uids.len(), rows.len() ); } for r in rows { let id_term = Term::from_field_text(uid, &r.uid); index_writer.delete_term(id_term); let slug = r.site; let tag_facet = Facet::from(&format!("/News/{slug}")); index_writer.add_document(doc!( site => slug.clone(), title => r.title, // TODO: clean and extract text from HTML summary => r.summary, link => r.link, date => tantivy::DateTime::from_primitive(r.date), is_read => r.is_read, uid => r.uid, id => r.id as u64, tag => tag_facet, ))?; } info_span!("IndexWriter.commit").in_scope(|| index_writer.commit())?; info_span!("IndexReader.reload").in_scope(|| self.reader.reload())?; Ok(()) } #[instrument(name = "tantivy::reindex_thread", skip_all, fields(query=%query))] pub async fn reindex_thread(&self, pool: &PgPool, query: &Query) -> Result<(), ServerError> { let uids: Vec<_> = query .uids .iter() .filter(|uid| is_newsreader_thread(uid)) .map(|uid| extract_thread_id(uid).to_string()) .collect(); Ok(self.reindex_uids(pool, &uids).await?) } #[instrument(name = "tantivy::reindex_all", skip_all)] pub async fn reindex_all(&self, pool: &PgPool) -> Result<(), ServerError> { let rows = sqlx::query_file!("sql/all-posts.sql") .fetch_all(pool) .await?; let uids: Vec = rows.into_iter().map(|r| r.uid).collect(); self.reindex_uids(pool, &uids).await?; Ok(()) } fn searcher_and_query( &self, query: &Query, ) -> Result<(Searcher, Box), ServerError> { // TODO: only create one reader // From https://tantivy-search.github.io/examples/basic_search.html // "For a search server you will typically create one reader for the entire lifetime of // your program, and acquire a new searcher for every single request." // // I think there's some challenge in making the reader work if we reindex, so reader my // need to be stored indirectly, and be recreated on reindex // I think creating a reader takes 200-300 ms. let schema = self.index.schema(); let searcher = self.reader.searcher(); let title = schema.get_field("title")?; let summary = schema.get_field("summary")?; let query_parser = QueryParser::for_index(&self.index, vec![title, summary]); // Tantivy uses '*' to match all docs, not empty string let term = &query.remainder.join(" "); let term = if term.is_empty() { "*" } else { term }; info!("query_parser('{term}')"); let tantivy_query = query_parser.parse_query(&term)?; let tag = schema.get_field("tag")?; let is_read = schema.get_field("is_read")?; let mut terms = vec![(Occur::Must, tantivy_query)]; for t in &query.tags { let facet = Facet::from(&format!("/{t}")); let facet_term = Term::from_facet(tag, &facet); let facet_term_query = Box::new(TermQuery::new(facet_term, IndexRecordOption::Basic)); terms.push((Occur::Must, facet_term_query)); } if query.unread_only { info!("searching for unread only"); let term = Term::from_field_bool(is_read, false); terms.push(( Occur::Must, Box::new(TermQuery::new(term, IndexRecordOption::Basic)), )); } let search_query = BooleanQuery::new(terms); Ok((searcher, Box::new(search_query))) } #[instrument(name="tantivy::count", skip_all, fields(query=%query))] pub async fn count(&self, query: &Query) -> Result { if !is_tantivy_query(query) { return Ok(0); } info!("tantivy::count {query:?}"); use tantivy::collector::Count; let (searcher, query) = self.searcher_and_query(&query)?; Ok(searcher.search(&query, &Count)?) 
    }

    #[instrument(name = "tantivy::search", skip_all, fields(query=%query))]
    pub async fn search(
        &self,
        pool: &PgPool,
        after: Option<i32>,
        before: Option<i32>,
        first: Option<i32>,
        last: Option<i32>,
        query: &Query,
    ) -> Result<Vec<(i32, ThreadSummary)>, async_graphql::Error> {
        if !is_tantivy_query(query) {
            return Ok(Vec::new());
        }
        let (offset, mut limit) = compute_offset_limit(after, before, first, last);
        if before.is_none() {
            // When searching forward, the +1 is to see if there are more pages of data
            // available. Searching backwards implies there are more pages forward, because the
            // value represented by `before` is on the next page.
            limit += 1;
        }
        let (searcher, search_query) = self.searcher_and_query(query)?;
        info!(
            "Tantivy::search(query '{query:?}', off {offset}, lim {limit}, search_query {search_query:?})"
        );
        let top_docs = searcher.search(
            &search_query,
            &TopDocs::with_limit(limit as usize)
                .and_offset(offset as usize)
                .order_by_u64_field("date", tantivy::index::Order::Desc),
        )?;
        info!("search found {} docs", top_docs.len());
        let uid = self.index.schema().get_field("uid")?;
        let uids = top_docs
            .into_iter()
            .map(|(_, doc_address): (u64, DocAddress)| {
                searcher.doc(doc_address).map(|doc: TantivyDocument| {
                    debug!("doc: {doc:#?}");
                    doc.get_first(uid)
                        .expect("doc missing uid")
                        .as_str()
                        .expect("doc str missing")
                        .to_string()
                })
            })
            .collect::<Result<Vec<_>, TantivyError>>()?;
        // let uids = format!("'{}'", uids.join("','"));
        info!("uids {uids:?}");
        let rows = sqlx::query_file!("sql/threads-from-uid.sql", &uids as &[String])
            .fetch_all(pool)
            .await?;
        let mut res = Vec::new();
        info!("found {} hits joining w/ tantivy", rows.len());
        for (i, r) in rows.into_iter().enumerate() {
            res.push((
                i as i32 + offset,
                thread_summary_from_row(ThreadSummaryRecord {
                    site: r.site,
                    date: r.date,
                    is_read: r.is_read,
                    title: r.title,
                    uid: r.uid,
                    name: r.name,
                    corpus: Corpus::Tantivy,
                })
                .await,
            ));
        }
        Ok(res)
    }

    pub fn drop_and_load_index(&self) -> Result<(), TantivyError> {
        create_news_db(&self.db_path)
    }
}

fn create_news_db(tantivy_db_path: &str) -> Result<(), TantivyError> {
    info!("create_news_db");
    // Don't care if the directory didn't exist.
    let _ = std::fs::remove_dir_all(tantivy_db_path);
    std::fs::create_dir_all(tantivy_db_path)?;

    use tantivy::schema::*;
    let mut schema_builder = Schema::builder();
    schema_builder.add_text_field("site", STRING | STORED);
    schema_builder.add_text_field("title", TEXT | STORED);
    schema_builder.add_text_field("summary", TEXT);
    schema_builder.add_text_field("link", STRING | STORED);
    schema_builder.add_date_field("date", FAST | INDEXED | STORED);
    schema_builder.add_bool_field("is_read", FAST | INDEXED | STORED);
    schema_builder.add_text_field("uid", STRING | STORED);
    schema_builder.add_u64_field("id", FAST);
    schema_builder.add_facet_field("tag", FacetOptions::default());
    let schema = schema_builder.build();
    Index::create_in_dir(tantivy_db_path, schema)?;
    Ok(())
}

struct PostgresDoc {
    site: String,
    title: String,
    summary: String,
    link: String,
    date: PrimitiveDateTime,
    is_read: bool,
    uid: String,
    id: i32,
}
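
// A minimal smoke-test sketch, not part of the original module: it assumes unit tests live
// alongside the code and that `Query::default()` yields an empty query (as `refresh` above
// relies on). The test name and temp-directory name are illustrative only. It checks that a
// fresh index can be created on disk and that an empty query falls back to the '*' match-all
// term without erroring.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn creates_index_and_builds_match_all_query() {
        let dir = std::env::temp_dir().join("tantivy_connection_smoke_test");
        let conn = TantivyConnection::new(dir.to_str().expect("temp dir path is valid UTF-8"))
            .expect("index should open or be created in the temp directory");
        // An empty remainder should produce a query (the '*' fallback) rather than an error.
        assert!(conn.searcher_and_query(&Query::default()).is_ok());
    }
}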