From ebf32a99053e9924da29dc447f6984cce9312be0 Mon Sep 17 00:00:00 2001
From: Bill Thiede
Date: Sat, 28 Sep 2024 11:17:52 -0700
Subject: [PATCH] server: WIP tantivy integration

---
 server/sql/all-posts.sql              |  1 +
 server/sql/threads-from-uid.sql       | 13 ++++
 server/src/bin/server.rs              | 27 +-------
 server/src/graphql.rs                 | 49 ++++++++++++-
 server/src/lib.rs                     | 61 ++++++++++++++++-
 server/src/newsreader.rs              | 75 +++++++-------------
 server/src/tantivy.rs                 | 99 +++++++++++++++++++++------
 server/static/graphql-playground.html | 59 ++++++++++++++++
 8 files changed, 285 insertions(+), 99 deletions(-)
 create mode 100644 server/sql/threads-from-uid.sql
 create mode 100644 server/static/graphql-playground.html

diff --git a/server/sql/all-posts.sql b/server/sql/all-posts.sql
index ea70efd..47e4c0f 100644
--- a/server/sql/all-posts.sql
+++ b/server/sql/all-posts.sql
@@ -8,3 +8,4 @@ SELECT
     uid,
     id
 FROM post
+WHERE title ILIKE '%grapheme%' OR summary ILIKE '%grapheme%';
diff --git a/server/sql/threads-from-uid.sql b/server/sql/threads-from-uid.sql
new file mode 100644
index 0000000..ba2eebc
--- /dev/null
+++ b/server/sql/threads-from-uid.sql
@@ -0,0 +1,13 @@
+SELECT
+    site,
+    date,
+    is_read,
+    title,
+    uid,
+    name
+FROM
+    post p
+    JOIN feed f ON p.site = f.slug
+WHERE
+    uid = ANY ($1)
+;
diff --git a/server/src/bin/server.rs b/server/src/bin/server.rs
index 3406a6f..e09ba9d 100644
--- a/server/src/bin/server.rs
+++ b/server/src/bin/server.rs
@@ -166,21 +166,6 @@ fn graphiql() -> content::RawHtml<String> {
     content::RawHtml(GraphiQLSource::build().endpoint("/api/graphql").finish())
 }
 
-#[rocket::post("/reindex-news-db")]
-async fn reindex_news_db(
-    pool: &State<PgPool>,
-    tantivy_conn: &State<TantivyConnection>,
-) -> Result<String, Debug<ServerError>> {
-    tantivy_conn.reindex(pool).await?;
-    Ok(format!("Reindexed tantivy\n"))
-}
-
-#[rocket::get("/search-news-db")]
-fn search_news_db(tantivy_conn: &State<TantivyConnection>) -> Result<String, Debug<ServerError>> {
-    let res = tantivy_conn.search().map_err(ServerError::from)?;
-    Ok(format!("{}", res))
-}
-
 #[rocket::get("/graphql?<query..>")]
 async fn graphql_query(schema: &State<GraphqlSchema>, query: GraphQLQuery) -> GraphQLResponse {
     query.execute(schema.inner()).await
@@ -223,8 +208,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         .mount(
             shared::urls::MOUNT_POINT,
             routes![
-                reindex_news_db,
-                search_news_db,
                 original,
                 refresh,
                 show_pretty,
@@ -246,21 +229,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         std::fs::create_dir_all(&config.slurp_cache_path)?;
     }
     let pool = PgPool::connect(&config.newsreader_database_url).await?;
-    let tantivy_conn =
-        TantivyConnection::new(&config.newsreader_tantivy_db_path)?;
+    let tantivy_conn = TantivyConnection::new(&config.newsreader_tantivy_db_path)?;
 
     let schema = Schema::build(QueryRoot, Mutation, EmptySubscription)
        .data(Notmuch::default())
        .data(config)
        .data(pool.clone())
+       .data(tantivy_conn)
        .extension(async_graphql::extensions::Logger)
        .finish();
 
-    let rkt = rkt
-        .manage(schema)
-        .manage(pool)
-        .manage(Notmuch::default())
-        .manage(tantivy_conn);
+    let rkt = rkt.manage(schema).manage(pool).manage(Notmuch::default());
     //.manage(Notmuch::with_config("../notmuch/testdata/notmuch.config"))
     rkt.launch().await?;
 
diff --git a/server/src/graphql.rs b/server/src/graphql.rs
index a9ae478..10d8267 100644
--- a/server/src/graphql.rs
+++ b/server/src/graphql.rs
@@ -8,7 +8,7 @@ use notmuch::Notmuch;
 use serde::{Deserialize, Serialize};
 use sqlx::postgres::PgPool;
 
-use crate::{config::Config, newsreader, nm, Query};
+use crate::{config::Config, newsreader, nm, tantivy::TantivyConnection, Query};
 
 /// # Number of seconds since the Epoch
 pub type UnixTime = isize;
@@ -224,6 +224,7 @@ pub struct Tag {
 struct SearchCursor {
     newsreader_offset: i32,
     notmuch_offset: i32,
+    tantivy_offset: i32,
 }
 
 pub struct QueryRoot;
@@ -258,10 +259,13 @@ impl QueryRoot {
         info!("search({after:?} {before:?} {first:?} {last:?} {query:?})",);
         let nm = ctx.data_unchecked::<Notmuch>();
         let pool = ctx.data_unchecked::<PgPool>();
+        let tantivy = ctx.data_unchecked::<TantivyConnection>();
+        #[derive(Debug)]
         enum ThreadSummaryCursor {
             Newsreader(i32, ThreadSummary),
             Notmuch(i32, ThreadSummary),
+            Tantivy(i32, ThreadSummary),
         }
 
         Ok(connection::query(
             after,
@@ -279,8 +283,11 @@
                 );
                 let newsreader_after = after.as_ref().map(|sc| sc.newsreader_offset);
                 let notmuch_after = after.as_ref().map(|sc| sc.notmuch_offset);
+                let tantivy_after = after.as_ref().map(|sc| sc.tantivy_offset);
+
                 let newsreader_before = before.as_ref().map(|sc| sc.newsreader_offset);
                 let notmuch_before = before.as_ref().map(|sc| sc.notmuch_offset);
+                let tantivy_before = before.as_ref().map(|sc| sc.tantivy_offset);
 
                 let newsreader_query: Query = query.parse()?;
                 info!("newsreader_query {newsreader_query:?}");
@@ -318,15 +325,39 @@
                     Vec::new()
                 };
 
+                let tantivy_results = if newsreader_query.is_tantivy {
+                    tantivy
+                        .search(
+                            pool,
+                            tantivy_after,
+                            tantivy_before,
+                            first.map(|v| v as i32),
+                            last.map(|v| v as i32),
+                            &newsreader_query,
+                        )
+                        .await?
+                        .into_iter()
+                        .map(|(cur, ts)| ThreadSummaryCursor::Tantivy(cur, ts))
+                        .collect()
+                } else {
+                    Vec::new()
+                };
+
+                info!(
+                    "tantivy results:\nis_tantivy:{} {tantivy_results:#?}",
+                    newsreader_query.is_tantivy
+                );
                 let mut results: Vec<_> = newsreader_results
                     .into_iter()
                     .chain(notmuch_results)
+                    .chain(tantivy_results)
                     .collect();
 
                 // The leading '-' is to reverse sort
                 results.sort_by_key(|item| match item {
                     ThreadSummaryCursor::Newsreader(_, ts) => -ts.timestamp,
                     ThreadSummaryCursor::Notmuch(_, ts) => -ts.timestamp,
+                    ThreadSummaryCursor::Tantivy(_, ts) => -ts.timestamp,
                 });
 
                 let mut has_next_page = before.is_some();
@@ -348,6 +379,7 @@
                 let mut connection = Connection::new(has_previous_page, has_next_page);
                 let mut newsreader_offset = 0;
                 let mut notmuch_offset = 0;
+                let mut tantivy_offset = 0;
 
                 connection.edges.extend(results.into_iter().map(|item| {
                     let thread_summary;
@@ -360,10 +392,15 @@
                             thread_summary = ts;
                             notmuch_offset = offset;
                         }
+                        ThreadSummaryCursor::Tantivy(offset, ts) => {
+                            thread_summary = ts;
+                            tantivy_offset = offset;
+                        }
                     }
                     let cur = OpaqueCursor(SearchCursor {
                         newsreader_offset,
                         notmuch_offset,
+                        tantivy_offset,
                     });
                     Edge::new(cur, thread_summary)
                 }));
@@ -443,6 +480,16 @@ impl Mutation {
         nm.tag_remove(&tag, &query)?;
         Ok(true)
     }
+    /// Drop and recreate tantivy index. Warning: this is slow.
+    async fn drop_and_load_index<'ctx>(&self, ctx: &Context<'ctx>) -> Result<bool> {
+        let tantivy = ctx.data_unchecked::<TantivyConnection>();
+        let pool = ctx.data_unchecked::<PgPool>();
+
+        tantivy.drop_and_load_index()?;
+        tantivy.reindex(pool).await?;
+
+        Ok(true)
+    }
 }
 
 pub type GraphqlSchema = Schema<QueryRoot, Mutation, EmptySubscription>;
diff --git a/server/src/lib.rs b/server/src/lib.rs
index 816ccd0..19692a2 100644
--- a/server/src/lib.rs
+++ b/server/src/lib.rs
@@ -18,11 +18,19 @@ use lol_html::{
 };
 use maplit::{hashmap, hashset};
 use scraper::{Html, Selector};
+use sqlx::{postgres::PgPool, types::time::PrimitiveDateTime};
 use thiserror::Error;
 use tokio::sync::Mutex;
 use url::Url;
 
-use crate::newsreader::{extract_thread_id, is_newsreader_thread};
+use crate::{
+    error::ServerError,
+    graphql::ThreadSummary,
+    newsreader::{extract_thread_id, is_newsreader_thread},
+};
+
+const NEWSREADER_TAG_PREFIX: &'static str = "News/";
+const NEWSREADER_THREAD_PREFIX: &'static str = "news:";
 
 // TODO: figure out how to use Cow
 #[async_trait]
@@ -604,6 +612,7 @@ pub struct Query {
     pub remainder: Vec<String>,
     pub is_notmuch: bool,
     pub is_newsreader: bool,
+    pub is_tantivy: bool,
 }
 
 impl Query {
@@ -638,6 +647,7 @@ impl FromStr for Query {
         let mut remainder = Vec::new();
         let mut is_notmuch = false;
         let mut is_newsreader = false;
+        let mut is_tantivy = false;
         for word in s.split_whitespace() {
             if word == "is:unread" {
                 unread_only = true
@@ -664,6 +674,8 @@ impl FromStr for Query {
                 is_newsreader = true;
                 is_notmuch = true;
             }
         }
+        // TODO: decide if tantivy gets its own life or replaces newsreader
+        is_tantivy = is_newsreader;
         Ok(Query {
             unread_only,
             tag,
@@ -671,6 +683,53 @@ impl FromStr for Query {
             remainder,
             is_notmuch,
             is_newsreader,
+            is_tantivy,
         })
     }
 }
+
+pub struct ThreadSummaryRecord {
+    pub site: Option<String>,
+    pub date: Option<PrimitiveDateTime>,
+    pub is_read: Option<bool>,
+    pub title: Option<String>,
+    pub uid: String,
+    pub name: Option<String>,
+}
+async fn thread_summary_from_row(r: ThreadSummaryRecord) -> ThreadSummary {
+    let site = r.site.unwrap_or("UNKNOWN TAG".to_string());
+    let mut tags = vec![format!("{NEWSREADER_TAG_PREFIX}{site}")];
+    if !r.is_read.unwrap_or(true) {
+        tags.push("unread".to_string());
+    };
+    let mut title = r.title.unwrap_or("NO TITLE".to_string());
+    title = clean_title(&title).await.expect("failed to clean title");
+    ThreadSummary {
+        thread: format!("{NEWSREADER_THREAD_PREFIX}{}", r.uid),
+        timestamp: r
+            .date
+            .expect("post missing date")
+            .assume_utc()
+            .unix_timestamp() as isize,
+        date_relative: "TODO date_relative".to_string(),
+        matched: 0,
+        total: 1,
+        authors: r.name.unwrap_or_else(|| site.clone()),
+        subject: title,
+        tags,
+    }
+}
+async fn clean_title(title: &str) -> Result<String, ServerError> {
+    // Make title HTML so html parsers work
+    let mut title = format!("<html>{title}</html>");
+    let title_transformers: Vec<Box<dyn Transformer>> =
+        vec![Box::new(EscapeHtml), Box::new(StripHtml)];
+    // Make title HTML so html parsers work
+    title = format!("<html>{title}</html>");
+    for t in title_transformers.iter() {
+        if t.should_run(&None, &title) {
+            title = t.transform(&None, &title).await?;
+        }
+    }
+    Ok(title)
+}
diff --git a/server/src/newsreader.rs b/server/src/newsreader.rs
index 5dc8c53..0bf1461 100644
--- a/server/src/newsreader.rs
+++ b/server/src/newsreader.rs
@@ -10,35 +10,33 @@ use tokio::sync::Mutex;
 use url::Url;
 
 use crate::{
-    compute_offset_limit,
+    clean_title, compute_offset_limit,
     config::Config,
     error::ServerError,
     graphql::{NewsPost, Tag, Thread, ThreadSummary},
-    AddOutlink, EscapeHtml, FrameImages, InlineStyle, Query, SanitizeHtml, SlurpContents,
-    StripHtml, Transformer,
+    thread_summary_from_row, AddOutlink, EscapeHtml, FrameImages, InlineStyle, Query, SanitizeHtml,
+    SlurpContents, StripHtml, ThreadSummaryRecord, Transformer, NEWSREADER_TAG_PREFIX,
+    NEWSREADER_THREAD_PREFIX,
 };
 
-const TAG_PREFIX: &'static str = "News/";
-const THREAD_PREFIX: &'static str = "news:";
-
 pub fn is_newsreader_search(query: &str) -> bool {
-    query.contains(TAG_PREFIX)
+    query.contains(NEWSREADER_TAG_PREFIX)
 }
 
 pub fn is_newsreader_thread(query: &str) -> bool {
-    query.starts_with(THREAD_PREFIX)
+    query.starts_with(NEWSREADER_THREAD_PREFIX)
 }
 
 pub fn extract_thread_id(query: &str) -> &str {
-    &query[THREAD_PREFIX.len()..]
+    &query[NEWSREADER_THREAD_PREFIX.len()..]
 }
 
 pub fn extract_site(tag: &str) -> &str {
-    &tag[TAG_PREFIX.len()..]
+    &tag[NEWSREADER_TAG_PREFIX.len()..]
 }
 
 pub fn make_news_tag(tag: &str) -> String {
-    format!("tag:{TAG_PREFIX}{tag}")
+    format!("tag:{NEWSREADER_TAG_PREFIX}{tag}")
 }
 
 pub async fn count(pool: &PgPool, query: &Query) -> Result<usize, ServerError> {
@@ -93,37 +91,23 @@ pub async fn search(
     )
     .fetch_all(pool)
     .await?;
-
     let mut res = Vec::new();
     for (i, r) in rows.into_iter().enumerate() {
-        let site = r.site.unwrap_or("UNKOWN TAG".to_string());
-        let mut tags = vec![format!("{TAG_PREFIX}{site}")];
-        if !r.is_read.unwrap_or(true) {
-            tags.push("unread".to_string());
-        };
-        let mut title = r.title.unwrap_or("NO TITLE".to_string());
-        title = clean_title(&title).await.expect("failed to clean title");
         res.push((
             i as i32 + offset,
-            ThreadSummary {
-                thread: format!("{THREAD_PREFIX}{}", r.uid),
-                timestamp: r
-                    .date
-                    .expect("post missing date")
-                    .assume_utc()
-                    .unix_timestamp() as isize,
-                date_relative: "TODO date_relative".to_string(),
-                matched: 0,
-                total: 1,
-                authors: r.name.unwrap_or_else(|| site.clone()),
-                subject: title,
-                tags,
-            },
+            thread_summary_from_row(ThreadSummaryRecord {
+                site: r.site,
+                date: r.date,
+                is_read: r.is_read,
+                title: r.title,
+                uid: r.uid,
+                name: r.name,
+            })
+            .await,
         ));
     }
     Ok(res)
 }
-
 pub async fn tags(pool: &PgPool, _needs_unread: bool) -> Result<Vec<Tag>, ServerError> {
     // TODO: optimize query by using needs_unread
     let tags = sqlx::query_file!("sql/tags.sql").fetch_all(pool).await?;
@@ -131,7 +115,10 @@ pub async fn tags(pool: &PgPool, _needs_unread: bool) -> Result<Vec<Tag>, ServerError> {
         .into_iter()
         .map(|tag| {
             let unread = tag.unread.unwrap_or(0).try_into().unwrap_or(0);
-            let name = format!("{TAG_PREFIX}{}", tag.site.expect("tag must have site"));
+            let name = format!(
+                "{NEWSREADER_TAG_PREFIX}{}",
+                tag.site.expect("tag must have site")
+            );
             let hex = compute_color(&name);
             Tag {
                 name,
@@ -150,8 +137,8 @@ pub async fn thread(
     thread_id: String,
 ) -> Result<Thread, ServerError> {
     let id = thread_id
-        .strip_prefix(THREAD_PREFIX)
-        .expect("news thread doesn't start with '{THREAD_PREFIX}'")
+        .strip_prefix(NEWSREADER_THREAD_PREFIX)
+        .expect("news thread doesn't start with '{NEWSREADER_THREAD_PREFIX}'")
        .to_string();
 
     let r = sqlx::query_file!("sql/thread.sql", id)
@@ -265,17 +252,3 @@ pub async fn set_read_status<'ctx>(
     .await?;
     Ok(true)
 }
-async fn clean_title(title: &str) -> Result<String, ServerError> {
-    // Make title HTML so html parsers work
-    let mut title = format!("<html>{title}</html>");
-    let title_tranformers: Vec<Box<dyn Transformer>> =
-        vec![Box::new(EscapeHtml), Box::new(StripHtml)];
-    // Make title HTML so html parsers work
-    title = format!("<html>{title}</html>");
-    for t in title_tranformers.iter() {
-        if t.should_run(&None, &title) {
-            title = t.transform(&None, &title).await?;
-        }
-    }
-    Ok(title)
-}
diff --git a/server/src/tantivy.rs b/server/src/tantivy.rs
index 031d11d..e46391c 100644
--- a/server/src/tantivy.rs
+++ b/server/src/tantivy.rs
@@ -1,23 +1,31 @@
 use log::info;
 use sqlx::postgres::PgPool;
-use tantivy::{Index, IndexWriter, TantivyError};
+use tantivy::{schema::Value, Index, TantivyError};
 
-use crate::error::ServerError;
+use crate::{
+    error::ServerError, graphql::ThreadSummary, thread_summary_from_row, Query, ThreadSummaryRecord,
+};
 
 pub struct TantivyConnection {
-    index: Index,
+    db_path: String,
+    //index: Index,
 }
 
 impl TantivyConnection {
-    pub fn new(tantivy_db_path: &str) -> Result<TantivyConnection, ServerError> {
-        let index = match Index::open_in_dir(tantivy_db_path) {
+    fn get_index(&self) -> Result<Index, ServerError> {
+        Ok(match Index::open_in_dir(&self.db_path) {
             Ok(idx) => idx,
             Err(_) => {
-                create_news_db(tantivy_db_path)?;
-                Index::open_in_dir(tantivy_db_path)?
+                create_news_db(&self.db_path)?;
+                Index::open_in_dir(&self.db_path)?
             }
-        };
-        Ok(TantivyConnection { index })
+        })
+    }
+
+    pub fn new(tantivy_db_path: &str) -> Result<TantivyConnection, ServerError> {
+        Ok(TantivyConnection {
+            db_path: tantivy_db_path.to_string(),
+        })
     }
     pub async fn reindex(&self, pool: &PgPool) -> Result<(), ServerError> {
         use tantivy::{doc, Term};
@@ -25,8 +33,9 @@ impl TantivyConnection {
         let start_time = std::time::Instant::now();
 
         let pool: &PgPool = pool;
-        let mut index_writer = self.index.writer(50_000_000)?;
-        let schema = self.index.schema();
+        let index = self.get_index()?;
+        let mut index_writer = index.writer(50_000_000)?;
+        let schema = index.schema();
         let site = schema.get_field("site")?;
         let title = schema.get_field("title")?;
         let summary = schema.get_field("summary")?;
@@ -68,30 +77,76 @@ impl TantivyConnection {
         info!("took {:.2}s to reindex", start_time.elapsed().as_secs_f32());
         Ok(())
     }
-    pub fn search(&self) -> Result<String, TantivyError> {
+    pub async fn search(
+        &self,
+        pool: &PgPool,
+        after: Option<i32>,
+        before: Option<i32>,
+        first: Option<i32>,
+        last: Option<i32>,
+        query: &Query,
+    ) -> Result<Vec<(i32, ThreadSummary)>, async_graphql::Error> {
         use tantivy::{collector::TopDocs, query::QueryParser, Document, TantivyDocument};
+        // TODO: set based on function parameters
+        let offset = 0;
 
-        let reader = self.index.reader()?;
-        let schema = self.index.schema();
+        let index = self.get_index()?;
+        let reader = index.reader()?;
+        let schema = index.schema();
         let searcher = reader.searcher();
 
         let site = schema.get_field("site")?;
+        let uid = schema.get_field("uid")?;
         let title = schema.get_field("title")?;
         let summary = schema.get_field("summary")?;
-        let query_parser = QueryParser::for_index(&self.index, vec![site, title, summary]);
+        let date = schema.get_field("date")?;
+        let query_parser = QueryParser::for_index(&index, vec![title, summary]);
 
-        let query = query_parser.parse_query("grapheme")?;
+        let query = query_parser.parse_query(&query.remainder.join(" "))?;
         let top_docs = searcher.search(&query, &TopDocs::with_limit(10))?;
-        let mut results = vec![];
         info!("search found {} docs", top_docs.len());
-        for (_score, doc_address) in top_docs {
-            let retrieved_doc: TantivyDocument = searcher.doc(doc_address)?;
-            results.push(format!("{}", retrieved_doc.to_json(&schema)));
+        let uids = top_docs
+            .into_iter()
+            .map(|(_, doc_address)| {
+                searcher.doc(doc_address).map(|doc: TantivyDocument| {
+                    doc.get_first(uid)
+                        .expect("doc missing uid")
+                        .as_str()
+                        .expect("doc str missing")
+                        .to_string()
+                })
+            })
+            .collect::<Result<Vec<String>, TantivyError>>()?;
+
+        //let uids = format!("'{}'", uids.join("','"));
+        info!("uids {uids:?}");
+        // Use the stored uids to pull the full post rows back out of Postgres.
+        let rows = sqlx::query_file!("sql/threads-from-uid.sql", &uids as &[String])
+            .fetch_all(pool)
+            .await?;
+        let mut res = Vec::new();
+        info!("found {} hits joining w/ tantivy", rows.len());
+        for (i, r) in rows.into_iter().enumerate() {
+            res.push((
+                i as i32 + offset,
+                thread_summary_from_row(ThreadSummaryRecord {
+                    site: r.site,
+                    date: r.date,
+                    is_read: r.is_read,
+                    title: r.title,
+                    uid: r.uid,
+                    name: r.name,
+                })
+                .await,
+            ));
         }
-        Ok(results.join(" "))
+        Ok(res)
+    }
+    pub fn drop_and_load_index(&self) -> Result<(), TantivyError> {
+        create_news_db(&self.db_path)
     }
 }
 fn create_news_db(tantivy_db_path: &str) -> Result<(), TantivyError> {
+    info!("create_news_db");
     std::fs::remove_dir_all(tantivy_db_path)?;
     std::fs::create_dir_all(tantivy_db_path)?;
     use tantivy::schema::*;
@@ -100,7 +155,7 @@ fn create_news_db(tantivy_db_path: &str) -> Result<(), TantivyError> {
     schema_builder.add_text_field("title", TEXT | STORED);
     schema_builder.add_text_field("summary", TEXT);
     schema_builder.add_text_field("link", STRING | STORED);
-    schema_builder.add_date_field("date", FAST);
+    schema_builder.add_date_field("date", FAST | INDEXED | STORED);
     schema_builder.add_bool_field("is_read", FAST);
     schema_builder.add_text_field("uid", STRING | STORED);
     schema_builder.add_i64_field("id", FAST);
diff --git a/server/static/graphql-playground.html b/server/static/graphql-playground.html
new file mode 100644
index 0000000..e7c27e5
--- /dev/null
+++ b/server/static/graphql-playground.html
@@ -0,0 +1,59 @@
+<!DOCTYPE html>
+<html>
+
+<head>
+  <meta charset=utf-8 />
+  <meta name="viewport" content="user-scalable=no, initial-scale=1.0, minimum-scale=1.0, maximum-scale=1.0, minimal-ui">
+  <title>GraphQL Playground</title>
+  <link rel="stylesheet" href="//cdn.jsdelivr.net/npm/graphql-playground-react/build/static/css/index.css" />
+  <link rel="shortcut icon" href="//cdn.jsdelivr.net/npm/graphql-playground-react/build/favicon.png" />
+  <script src="//cdn.jsdelivr.net/npm/graphql-playground-react/build/static/js/middleware.js"></script>
+</head>
+
+<body>
+  <div id="root">
+    <style>
+      body {
+        background-color: rgb(23, 42, 58);
+        font-family: Open Sans, sans-serif;
+        height: 90vh;
+      }
+
+      #root {
+        height: 100%;
+        width: 100%;
+        display: flex;
+        align-items: center;
+        justify-content: center;
+      }
+
+      .loading {
+        font-size: 32px;
+        font-weight: 200;
+        color: rgba(255, 255, 255, .6);
+        margin-left: 20px;
+      }
+
+      img {
+        width: 78px;
+        height: 78px;
+      }
+
+      .title {
+        font-weight: 400;
+      }
+    </style>
+    <img src='//cdn.jsdelivr.net/npm/graphql-playground-react/build/logo.png' alt=''>
+    <div class="loading"> Loading
+      <span class="title">GraphQL Playground</span>
+    </div>
+  </div>
+
+  <script>window.addEventListener('load', function (event) {
+      GraphQLPlayground.init(document.getElementById('root'), {
+        // options as 'endpoint' belong here
+      })
+    })</script>
+</body>
+
+</html>