use std::collections::HashSet;

use log::{debug, error, info, warn};
use sqlx::{postgres::PgPool, types::time::PrimitiveDateTime};
use tantivy::{
    collector::{DocSetCollector, TopDocs},
    doc, query,
    query::{AllQuery, BooleanQuery, Occur, QueryParser, TermQuery},
    schema::{Facet, IndexRecordOption, Value},
    DocAddress, Index, IndexReader, Searcher, TantivyDocument, TantivyError, Term,
};
use tracing::{info_span, instrument, Instrument};

use crate::{
    compute_offset_limit,
    error::ServerError,
    graphql::{Corpus, ThreadSummary},
    newsreader::{extract_thread_id, is_newsreader_thread},
    thread_summary_from_row, Query, ThreadSummaryRecord,
};

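/// Returns true if `query` should be served from the Tantivy index, either
/// because it is explicitly flagged as a Tantivy query or because it targets
/// the Tantivy corpus.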
pub fn is_tantivy_query(query: &Query) -> bool {
    query.is_tantivy || query.corpus == Some(Corpus::Tantivy)
}

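/// Wraps the on-disk Tantivy index: the directory it lives in, the opened
/// [`Index`], and a long-lived [`IndexReader`] used to serve searches.
///
/// A minimal usage sketch (the index path is illustrative):
///
/// ```ignore
/// let conn = TantivyConnection::new("/var/lib/news/tantivy")?;
/// conn.refresh(&pool).await?;
/// let hits = conn.count(&Query::default()).await?;
/// ```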
pub struct TantivyConnection {
    db_path: String,
    index: Index,
    reader: IndexReader,
}

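/// Opens the index at `db_path`, creating a fresh one (with the newsreader
/// schema) if it cannot be opened.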
fn get_index(db_path: &str) -> Result<Index, TantivyError> {
    Ok(match Index::open_in_dir(db_path) {
        Ok(idx) => idx,
        Err(err) => {
            warn!("Failed to open {db_path}: {err}");
            create_news_db(db_path)?;
            Index::open_in_dir(db_path)?
        }
    })
}

impl TantivyConnection {
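    /// Opens (or creates) the index at `tantivy_db_path` and sets up a
    /// long-lived reader for it.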
    pub fn new(tantivy_db_path: &str) -> Result<TantivyConnection, TantivyError> {
        let index = get_index(tantivy_db_path)?;
        let reader = index.reader()?;

        Ok(TantivyConnection {
            db_path: tantivy_db_path.to_string(),
            index,
            reader,
        })
    }

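    /// Compares the uids stored in Postgres against the documents already in
    /// the Tantivy index and reindexes up to one `batch_size` batch of the
    /// missing uids per call.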
#[instrument(name = "tantivy::refresh", skip_all)]
|
|
pub async fn refresh(&self, pool: &PgPool) -> Result<(), ServerError> {
|
|
let start_time = std::time::Instant::now();
|
|
let p_uids: Vec<_> = sqlx::query_file!("sql/all-uids.sql")
|
|
.fetch_all(pool)
|
|
.instrument(info_span!("postgres query"))
|
|
.await?
|
|
.into_iter()
|
|
.map(|r| r.uid)
|
|
.collect();
|
|
info!(
|
|
"refresh from postgres got {} uids in {}",
|
|
p_uids.len(),
|
|
start_time.elapsed().as_secs_f32()
|
|
);
|
|
|
|
let t_span = info_span!("tantivy query");
|
|
let _enter = t_span.enter();
|
|
let start_time = std::time::Instant::now();
|
|
let (searcher, _query) = self.searcher_and_query(&Query::default())?;
|
|
let docs = searcher.search(&AllQuery, &DocSetCollector)?;
|
|
let uid = self.index.schema().get_field("uid")?;
|
|
let t_uids: Vec<_> = docs
|
|
.into_iter()
|
|
.map(|doc_address| {
|
|
searcher
|
|
.doc(doc_address)
|
|
.map(|doc: TantivyDocument| {
|
|
debug!("doc: {doc:#?}");
|
|
doc.get_first(uid)
|
|
.expect("uid")
|
|
.as_str()
|
|
.expect("as_str")
|
|
.to_string()
|
|
})
|
|
.expect("searcher.doc")
|
|
})
|
|
.collect();
|
|
drop(_enter);
|
|
|
|
info!(
|
|
"refresh tantivy got {} uids in {}",
|
|
t_uids.len(),
|
|
start_time.elapsed().as_secs_f32()
|
|
);
|
|
let t_set: HashSet<_> = t_uids.into_iter().collect();
|
|
let need: Vec<_> = p_uids
|
|
.into_iter()
|
|
.filter(|uid| !t_set.contains(uid.as_str()))
|
|
.collect();
|
|
if !need.is_empty() {
|
|
info!(
|
|
"need to reindex {} uids: {:?}...",
|
|
need.len(),
|
|
&need[..need.len().min(10)]
|
|
);
|
|
}
|
|
let batch_size = 1000;
|
|
let uids: Vec<_> = need[..need.len().min(batch_size)]
|
|
.into_iter()
|
|
.cloned()
|
|
.collect();
|
|
self.reindex_uids(pool, &uids).await
|
|
}
|
|
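    /// Fetches the given uids from Postgres and writes one Tantivy document
    /// per row, deleting any stale copy of the document first, then commits
    /// and reloads the reader so the new documents are visible to searches.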
    #[instrument(skip(self, pool))]
    async fn reindex_uids(&self, pool: &PgPool, uids: &[String]) -> Result<(), ServerError> {
        if uids.is_empty() {
            return Ok(());
        }
        // TODO: add SlurpContents and convert HTML to text

        let mut index_writer = self.index.writer(50_000_000)?;
        let schema = self.index.schema();
        let site = schema.get_field("site")?;
        let title = schema.get_field("title")?;
        let summary = schema.get_field("summary")?;
        let link = schema.get_field("link")?;
        let date = schema.get_field("date")?;
        let is_read = schema.get_field("is_read")?;
        let uid = schema.get_field("uid")?;
        let id = schema.get_field("id")?;
        let tag = schema.get_field("tag")?;

        info!("reindexing {} posts", uids.len());
        let rows = sqlx::query_file_as!(PostgresDoc, "sql/posts-from-uids.sql", uids)
            .fetch_all(pool)
            .await?;

        if uids.len() != rows.len() {
            error!(
                "Had {} uids and only got {} rows: uids {uids:?}",
                uids.len(),
                rows.len()
            );
        }
        for r in rows {
            let id_term = Term::from_field_text(uid, &r.uid);
            index_writer.delete_term(id_term);
            let slug = r.site;
            let tag_facet = Facet::from(&format!("/News/{slug}"));
            index_writer.add_document(doc!(
                site => slug.clone(),
                title => r.title,
                // TODO: clean and extract text from HTML
                summary => r.summary,
                link => r.link,
                date => tantivy::DateTime::from_primitive(r.date),
                is_read => r.is_read,
                uid => r.uid,
                id => r.id as u64,
                tag => tag_facet,
            ))?;
        }

        info_span!("IndexWriter.commit").in_scope(|| index_writer.commit())?;
        info_span!("IndexReader.reload").in_scope(|| self.reader.reload())?;
        Ok(())
    }

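    /// Reindexes the posts referenced by `query.uids`, keeping only
    /// newsreader thread uids and mapping each one to its thread id.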
#[instrument(name = "tantivy::reindex_thread", skip_all, fields(query=%query))]
|
|
pub async fn reindex_thread(&self, pool: &PgPool, query: &Query) -> Result<(), ServerError> {
|
|
let uids: Vec<_> = query
|
|
.uids
|
|
.iter()
|
|
.filter(|uid| is_newsreader_thread(uid))
|
|
.map(|uid| extract_thread_id(uid).to_string())
|
|
.collect();
|
|
Ok(self.reindex_uids(pool, &uids).await?)
|
|
}
|
|
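    /// Reindexes every post currently stored in Postgres.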
#[instrument(name = "tantivy::reindex_all", skip_all)]
|
|
pub async fn reindex_all(&self, pool: &PgPool) -> Result<(), ServerError> {
|
|
let rows = sqlx::query_file!("sql/all-posts.sql")
|
|
.fetch_all(pool)
|
|
.await?;
|
|
|
|
let uids: Vec<String> = rows.into_iter().map(|r| r.uid).collect();
|
|
self.reindex_uids(pool, &uids).await?;
|
|
Ok(())
|
|
}
|
|
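    /// Builds a searcher plus the combined Tantivy query for `query`: the
    /// parsed free-text terms, one facet term per requested tag, and an
    /// optional `is_read == false` filter, all joined with `Occur::Must`.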
    fn searcher_and_query(
        &self,
        query: &Query,
    ) -> Result<(Searcher, Box<dyn query::Query>), ServerError> {
        // TODO: only create one reader
        // From https://tantivy-search.github.io/examples/basic_search.html
        // "For a search server you will typically create one reader for the entire lifetime of
        // your program, and acquire a new searcher for every single request."
        //
        // I think there's some challenge in making the reader work if we reindex, so the reader
        // may need to be stored indirectly and recreated on reindex.
        // I think creating a reader takes 200-300 ms.
        let schema = self.index.schema();
        let searcher = self.reader.searcher();
        let title = schema.get_field("title")?;
        let summary = schema.get_field("summary")?;
        let query_parser = QueryParser::for_index(&self.index, vec![title, summary]);
        // Tantivy uses '*' to match all docs, not the empty string.
        let term = &query.remainder.join(" ");
        let term = if term.is_empty() { "*" } else { term };
        info!("query_parser('{term}')");

        let tantivy_query = query_parser.parse_query(term)?;

        let tag = schema.get_field("tag")?;
        let is_read = schema.get_field("is_read")?;
        let mut terms = vec![(Occur::Must, tantivy_query)];
        for t in &query.tags {
            let facet = Facet::from(&format!("/{t}"));
            let facet_term = Term::from_facet(tag, &facet);
            let facet_term_query = Box::new(TermQuery::new(facet_term, IndexRecordOption::Basic));
            terms.push((Occur::Must, facet_term_query));
        }
        if query.unread_only {
            info!("searching for unread only");
            let term = Term::from_field_bool(is_read, false);
            terms.push((
                Occur::Must,
                Box::new(TermQuery::new(term, IndexRecordOption::Basic)),
            ));
        }
        let search_query = BooleanQuery::new(terms);
        Ok((searcher, Box::new(search_query)))
    }

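    /// Counts the documents matching `query`, or returns 0 for queries that
    /// are not served by Tantivy.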
#[instrument(name="tantivy::count", skip_all, fields(query=%query))]
|
|
pub async fn count(&self, query: &Query) -> Result<usize, ServerError> {
|
|
if !is_tantivy_query(query) {
|
|
return Ok(0);
|
|
}
|
|
info!("tantivy::count {query:?}");
|
|
use tantivy::collector::Count;
|
|
let (searcher, query) = self.searcher_and_query(&query)?;
|
|
Ok(searcher.search(&query, &Count)?)
|
|
}
|
|
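    /// Runs `query` against the index ordered by date descending, applies the
    /// offset/limit derived from the `after`/`before`/`first`/`last` cursor
    /// arguments, and joins the matching uids back to Postgres to build the
    /// thread summaries.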
#[instrument(name="tantivy::search", skip_all, fields(query=%query))]
|
|
pub async fn search(
|
|
&self,
|
|
pool: &PgPool,
|
|
after: Option<i32>,
|
|
before: Option<i32>,
|
|
first: Option<i32>,
|
|
last: Option<i32>,
|
|
query: &Query,
|
|
) -> Result<Vec<(i32, ThreadSummary)>, async_graphql::Error> {
|
|
if !is_tantivy_query(query) {
|
|
return Ok(Vec::new());
|
|
}
|
|
let (offset, mut limit) = compute_offset_limit(after, before, first, last);
|
|
if before.is_none() {
|
|
// When searching forward, the +1 is to see if there are more pages of data available.
|
|
// Searching backwards implies there's more pages forward, because the value represented by
|
|
// `before` is on the next page.
|
|
limit = limit + 1;
|
|
}
|
|
|
|
let (searcher, search_query) = self.searcher_and_query(&query)?;
|
|
info!("Tantivy::search(query '{query:?}', off {offset}, lim {limit}, search_query {search_query:?})");
|
|
let top_docs = searcher.search(
|
|
&search_query,
|
|
&TopDocs::with_limit(limit as usize)
|
|
.and_offset(offset as usize)
|
|
.order_by_u64_field("date", tantivy::index::Order::Desc),
|
|
)?;
|
|
info!("search found {} docs", top_docs.len());
|
|
let uid = self.index.schema().get_field("uid")?;
|
|
let uids = top_docs
|
|
.into_iter()
|
|
.map(|(_, doc_address): (u64, DocAddress)| {
|
|
searcher.doc(doc_address).map(|doc: TantivyDocument| {
|
|
debug!("doc: {doc:#?}");
|
|
doc.get_first(uid)
|
|
.expect("doc missing uid")
|
|
.as_str()
|
|
.expect("doc str missing")
|
|
.to_string()
|
|
})
|
|
})
|
|
.collect::<Result<Vec<String>, TantivyError>>()?;
|
|
|
|
//let uids = format!("'{}'", uids.join("','"));
|
|
info!("uids {uids:?}");
|
|
let rows = sqlx::query_file!("sql/threads-from-uid.sql", &uids as &[String])
|
|
.fetch_all(pool)
|
|
.await?;
|
|
let mut res = Vec::new();
|
|
info!("found {} hits joining w/ tantivy", rows.len());
|
|
for (i, r) in rows.into_iter().enumerate() {
|
|
res.push((
|
|
i as i32 + offset,
|
|
thread_summary_from_row(ThreadSummaryRecord {
|
|
site: r.site,
|
|
date: r.date,
|
|
is_read: r.is_read,
|
|
title: r.title,
|
|
uid: r.uid,
|
|
name: r.name,
|
|
corpus: Corpus::Tantivy,
|
|
})
|
|
.await,
|
|
));
|
|
}
|
|
Ok(res)
|
|
}
|
|
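    /// Drops the on-disk index and recreates it empty with the current
    /// schema; despite the name, nothing is re-loaded here, so callers must
    /// trigger reindexing separately.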
    pub fn drop_and_load_index(&self) -> Result<(), TantivyError> {
        create_news_db(&self.db_path)
    }
}

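/// Deletes any existing index directory and creates a fresh, empty index
/// with the newsreader schema.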
fn create_news_db(tantivy_db_path: &str) -> Result<(), TantivyError> {
    info!("create_news_db");
    // Don't care if the directory didn't exist.
    let _ = std::fs::remove_dir_all(tantivy_db_path);
    std::fs::create_dir_all(tantivy_db_path)?;
    use tantivy::schema::*;
    let mut schema_builder = Schema::builder();
    schema_builder.add_text_field("site", STRING | STORED);
    schema_builder.add_text_field("title", TEXT | STORED);
    schema_builder.add_text_field("summary", TEXT);
    schema_builder.add_text_field("link", STRING | STORED);
    schema_builder.add_date_field("date", FAST | INDEXED | STORED);
    schema_builder.add_bool_field("is_read", FAST | INDEXED | STORED);
    schema_builder.add_text_field("uid", STRING | STORED);
    schema_builder.add_u64_field("id", FAST);
    schema_builder.add_facet_field("tag", FacetOptions::default());

    let schema = schema_builder.build();
    Index::create_in_dir(tantivy_db_path, schema)?;
    Ok(())
}

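/// Shape of one row returned by `sql/posts-from-uids.sql`, as loaded via
/// `sqlx::query_file_as!` in `reindex_uids`; its fields map one-to-one onto
/// the Tantivy schema built in `create_news_db`.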
struct PostgresDoc {
    site: String,
    title: String,
    summary: String,
    link: String,
    date: PrimitiveDateTime,
    is_read: bool,
    uid: String,
    id: i32,
}