From 3ec1741f1066c3cc3455cde36be366663b8d6bba Mon Sep 17 00:00:00 2001 From: Bill Thiede Date: Sun, 29 Sep 2024 16:28:05 -0700 Subject: [PATCH] web & server: using tantivy for news post search --- ...1b523e6a804ae9569da18455eb380eadb515.json} | 4 +- ...4b99308a74db33cdfb91baaa1bb98dc53a82.json} | 4 +- ...501576a38db222380c034ae7bc030ba6a915e.json | 64 +++++ ...af16eed687c74d9d77957ad08aaaeefcd1ff2.json | 20 ++ ...93e5eac874eb8f95f55a27859537c51bca4f4.json | 52 ++++ server/sql/all-posts.sql | 9 +- server/sql/all-uids.sql | 6 + server/sql/count.sql | 5 +- server/sql/posts-from-uids.sql | 14 + server/sql/threads-from-uid.sql | 3 +- server/src/bin/server.rs | 6 - server/src/graphql.rs | 204 +++++++++------ server/src/lib.rs | 59 +++-- server/src/newsreader.rs | 67 ++++- server/src/nm.rs | 47 +++- server/src/tantivy.rs | 240 +++++++++++++++--- web/graphql/front_page.graphql | 1 + web/graphql/refresh.graphql | 3 + web/graphql/schema.json | 77 ++++++ web/src/graphql.rs | 8 + web/src/state.rs | 12 +- web/src/view/mod.rs | 2 +- 22 files changed, 737 insertions(+), 170 deletions(-) rename server/.sqlx/{query-e28b890e308f483aa6bd08617548ae66294ae1e99b1cab49f5f4211e0fd7d419.json => query-13a9193d91264b678c18df032cc61b523e6a804ae9569da18455eb380eadb515.json} (63%) rename server/.sqlx/{query-1b2244c9b9b64a1395d8d266f5df5352242bbe5efe481b0852e1c1d4b40584a7.json => query-4cbb6252a0b76f6a452ee09abb9f4b99308a74db33cdfb91baaa1bb98dc53a82.json} (80%) create mode 100644 server/.sqlx/query-700757f6fb175b1aa246816ac60501576a38db222380c034ae7bc030ba6a915e.json create mode 100644 server/.sqlx/query-c16821d20c1e6012d6079008889af16eed687c74d9d77957ad08aaaeefcd1ff2.json create mode 100644 server/.sqlx/query-ed34545b5fc681e9df9fc47c6ff93e5eac874eb8f95f55a27859537c51bca4f4.json create mode 100644 server/sql/all-uids.sql create mode 100644 server/sql/posts-from-uids.sql create mode 100644 web/graphql/refresh.graphql diff --git a/server/.sqlx/query-e28b890e308f483aa6bd08617548ae66294ae1e99b1cab49f5f4211e0fd7d419.json b/server/.sqlx/query-13a9193d91264b678c18df032cc61b523e6a804ae9569da18455eb380eadb515.json similarity index 63% rename from server/.sqlx/query-e28b890e308f483aa6bd08617548ae66294ae1e99b1cab49f5f4211e0fd7d419.json rename to server/.sqlx/query-13a9193d91264b678c18df032cc61b523e6a804ae9569da18455eb380eadb515.json index 40a833a..ee2494b 100644 --- a/server/.sqlx/query-e28b890e308f483aa6bd08617548ae66294ae1e99b1cab49f5f4211e0fd7d419.json +++ b/server/.sqlx/query-13a9193d91264b678c18df032cc61b523e6a804ae9569da18455eb380eadb515.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT\n COUNT(*) count\nFROM\n post\nWHERE\n ($1::text IS NULL OR site = $1)\n AND (\n NOT $2\n OR NOT is_read\n )\n", + "query": "SELECT\n COUNT(*) count\nFROM\n post\nWHERE\n (\n $1 :: text IS NULL\n OR site = $1\n )\n AND (\n NOT $2\n OR NOT is_read\n )\n", "describe": { "columns": [ { @@ -19,5 +19,5 @@ null ] }, - "hash": "e28b890e308f483aa6bd08617548ae66294ae1e99b1cab49f5f4211e0fd7d419" + "hash": "13a9193d91264b678c18df032cc61b523e6a804ae9569da18455eb380eadb515" } diff --git a/server/.sqlx/query-1b2244c9b9b64a1395d8d266f5df5352242bbe5efe481b0852e1c1d4b40584a7.json b/server/.sqlx/query-4cbb6252a0b76f6a452ee09abb9f4b99308a74db33cdfb91baaa1bb98dc53a82.json similarity index 80% rename from server/.sqlx/query-1b2244c9b9b64a1395d8d266f5df5352242bbe5efe481b0852e1c1d4b40584a7.json rename to server/.sqlx/query-4cbb6252a0b76f6a452ee09abb9f4b99308a74db33cdfb91baaa1bb98dc53a82.json index 15ec083..6540d94 100644 --- 
a/server/.sqlx/query-1b2244c9b9b64a1395d8d266f5df5352242bbe5efe481b0852e1c1d4b40584a7.json +++ b/server/.sqlx/query-4cbb6252a0b76f6a452ee09abb9f4b99308a74db33cdfb91baaa1bb98dc53a82.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT\n site,\n title,\n summary,\n link,\n date,\n is_read,\n uid,\n id\nFROM post\n", + "query": "SELECT\n site,\n title,\n summary,\n link,\n date,\n is_read,\n uid,\n p.id id\nFROM\n post AS p\n JOIN feed AS f ON p.site = f.slug -- necessary to weed out nzb posts\nORDER BY\n date DESC;\n", "describe": { "columns": [ { @@ -58,5 +58,5 @@ false ] }, - "hash": "1b2244c9b9b64a1395d8d266f5df5352242bbe5efe481b0852e1c1d4b40584a7" + "hash": "4cbb6252a0b76f6a452ee09abb9f4b99308a74db33cdfb91baaa1bb98dc53a82" } diff --git a/server/.sqlx/query-700757f6fb175b1aa246816ac60501576a38db222380c034ae7bc030ba6a915e.json b/server/.sqlx/query-700757f6fb175b1aa246816ac60501576a38db222380c034ae7bc030ba6a915e.json new file mode 100644 index 0000000..42721fe --- /dev/null +++ b/server/.sqlx/query-700757f6fb175b1aa246816ac60501576a38db222380c034ae7bc030ba6a915e.json @@ -0,0 +1,64 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n site AS \"site!\",\n title AS \"title!\",\n summary AS \"summary!\",\n link AS \"link!\",\n date AS \"date!\",\n is_read AS \"is_read!\",\n uid AS \"uid!\",\n p.id id\nFROM\n post p\n JOIN feed f ON p.site = f.slug\nWHERE\n uid = ANY ($1);\n", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "site!", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "title!", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "summary!", + "type_info": "Text" + }, + { + "ordinal": 3, + "name": "link!", + "type_info": "Text" + }, + { + "ordinal": 4, + "name": "date!", + "type_info": "Timestamp" + }, + { + "ordinal": 5, + "name": "is_read!", + "type_info": "Bool" + }, + { + "ordinal": 6, + "name": "uid!", + "type_info": "Text" + }, + { + "ordinal": 7, + "name": "id", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "TextArray" + ] + }, + "nullable": [ + true, + true, + true, + true, + true, + true, + false, + false + ] + }, + "hash": "700757f6fb175b1aa246816ac60501576a38db222380c034ae7bc030ba6a915e" +} diff --git a/server/.sqlx/query-c16821d20c1e6012d6079008889af16eed687c74d9d77957ad08aaaeefcd1ff2.json b/server/.sqlx/query-c16821d20c1e6012d6079008889af16eed687c74d9d77957ad08aaaeefcd1ff2.json new file mode 100644 index 0000000..f34cad1 --- /dev/null +++ b/server/.sqlx/query-c16821d20c1e6012d6079008889af16eed687c74d9d77957ad08aaaeefcd1ff2.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n uid\nFROM\n post AS p\n JOIN feed AS f ON p.site = f.slug -- necessary to weed out nzb posts\n;\n", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "uid", + "type_info": "Text" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false + ] + }, + "hash": "c16821d20c1e6012d6079008889af16eed687c74d9d77957ad08aaaeefcd1ff2" +} diff --git a/server/.sqlx/query-ed34545b5fc681e9df9fc47c6ff93e5eac874eb8f95f55a27859537c51bca4f4.json b/server/.sqlx/query-ed34545b5fc681e9df9fc47c6ff93e5eac874eb8f95f55a27859537c51bca4f4.json new file mode 100644 index 0000000..b568a0a --- /dev/null +++ b/server/.sqlx/query-ed34545b5fc681e9df9fc47c6ff93e5eac874eb8f95f55a27859537c51bca4f4.json @@ -0,0 +1,52 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n site,\n date,\n is_read,\n title,\n uid,\n name\nFROM\n post p\n JOIN feed f ON p.site = f.slug\nWHERE\n uid = ANY ($1)\nORDER BY\n date DESC;\n", + "describe": { + "columns": 
[ + { + "ordinal": 0, + "name": "site", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "date", + "type_info": "Timestamp" + }, + { + "ordinal": 2, + "name": "is_read", + "type_info": "Bool" + }, + { + "ordinal": 3, + "name": "title", + "type_info": "Text" + }, + { + "ordinal": 4, + "name": "uid", + "type_info": "Text" + }, + { + "ordinal": 5, + "name": "name", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "TextArray" + ] + }, + "nullable": [ + true, + true, + true, + true, + false, + true + ] + }, + "hash": "ed34545b5fc681e9df9fc47c6ff93e5eac874eb8f95f55a27859537c51bca4f4" +} diff --git a/server/sql/all-posts.sql b/server/sql/all-posts.sql index 47e4c0f..95a4bf8 100644 --- a/server/sql/all-posts.sql +++ b/server/sql/all-posts.sql @@ -6,6 +6,9 @@ SELECT date, is_read, uid, - id -FROM post -WHERE title ILIKE '%grapheme%' OR summary ILIKE '%grapheme%'; + p.id id +FROM + post AS p + JOIN feed AS f ON p.site = f.slug -- necessary to weed out nzb posts +ORDER BY + date DESC; diff --git a/server/sql/all-uids.sql b/server/sql/all-uids.sql new file mode 100644 index 0000000..d46fa94 --- /dev/null +++ b/server/sql/all-uids.sql @@ -0,0 +1,6 @@ +SELECT + uid +FROM + post AS p + JOIN feed AS f ON p.site = f.slug -- necessary to weed out nzb posts +; diff --git a/server/sql/count.sql b/server/sql/count.sql index 69e1941..4801fdb 100644 --- a/server/sql/count.sql +++ b/server/sql/count.sql @@ -3,7 +3,10 @@ SELECT FROM post WHERE - ($1::text IS NULL OR site = $1) + ( + $1 :: text IS NULL + OR site = $1 + ) AND ( NOT $2 OR NOT is_read diff --git a/server/sql/posts-from-uids.sql b/server/sql/posts-from-uids.sql new file mode 100644 index 0000000..73a8551 --- /dev/null +++ b/server/sql/posts-from-uids.sql @@ -0,0 +1,14 @@ +SELECT + site AS "site!", + title AS "title!", + summary AS "summary!", + link AS "link!", + date AS "date!", + is_read AS "is_read!", + uid AS "uid!", + p.id id +FROM + post p + JOIN feed f ON p.site = f.slug +WHERE + uid = ANY ($1); diff --git a/server/sql/threads-from-uid.sql b/server/sql/threads-from-uid.sql index ba2eebc..ae893c7 100644 --- a/server/sql/threads-from-uid.sql +++ b/server/sql/threads-from-uid.sql @@ -10,4 +10,5 @@ FROM JOIN feed f ON p.site = f.slug WHERE uid = ANY ($1) -; +ORDER BY + date DESC; diff --git a/server/src/bin/server.rs b/server/src/bin/server.rs index e09ba9d..90e91de 100644 --- a/server/src/bin/server.rs +++ b/server/src/bin/server.rs @@ -27,11 +27,6 @@ use server::{ }; use sqlx::postgres::PgPool; -#[get("/refresh")] -async fn refresh(nm: &State) -> Result, Debug> { - Ok(Json(String::from_utf8_lossy(&nm.new()?).to_string())) -} - #[get("/show//pretty")] async fn show_pretty( nm: &State, @@ -209,7 +204,6 @@ async fn main() -> Result<(), Box> { shared::urls::MOUNT_POINT, routes![ original, - refresh, show_pretty, show, graphql_query, diff --git a/server/src/graphql.rs b/server/src/graphql.rs index 10d8267..c11ce2c 100644 --- a/server/src/graphql.rs +++ b/server/src/graphql.rs @@ -1,3 +1,5 @@ +use std::str::FromStr; + use async_graphql::{ connection::{self, Connection, Edge, OpaqueCursor}, Context, EmptySubscription, Enum, Error, FieldResult, InputObject, Object, Schema, @@ -16,6 +18,26 @@ pub type UnixTime = isize; /// # Thread ID, sans "thread:" pub type ThreadId = String; +#[derive(Debug, Enum, Copy, Clone, Eq, PartialEq)] +pub enum Corpus { + Notmuch, + Newsreader, + Tantivy, +} + +impl FromStr for Corpus { + type Err = String; + fn from_str(s: &str) -> Result { + Ok(match s { + "notmuch" => Corpus::Notmuch, + "newsreader" => 
Corpus::Newsreader, + "tantivy" => Corpus::Tantivy, + s => return Err(format!("unknown corpus: '{s}'")), + }) + } +} + +// TODO: add is_read field and remove all use of 'tag:unread' #[derive(Debug, SimpleObject)] pub struct ThreadSummary { pub thread: ThreadId, @@ -30,6 +52,7 @@ pub struct ThreadSummary { pub authors: String, pub subject: String, pub tags: Vec<String>, + pub corpus: Corpus, } #[derive(Debug, Union)] @@ -237,13 +260,16 @@ impl QueryRoot { async fn count<'ctx>(&self, ctx: &Context<'ctx>, query: String) -> Result { let nm = ctx.data_unchecked::<Notmuch>(); let pool = ctx.data_unchecked::<PgPool>(); + let tantivy = ctx.data_unchecked::<TantivyConnection>(); let newsreader_query: Query = query.parse()?; let newsreader_count = newsreader::count(pool, &newsreader_query).await?; - let notmuch_count = nm::count(nm, &newsreader_query.to_notmuch()).await?; - info!("count {newsreader_query:?} newsreader count {newsreader_count} notmuch count {notmuch_count}"); - Ok(newsreader_count + notmuch_count) + let notmuch_count = nm::count(nm, &newsreader_query).await?; + let tantivy_count = tantivy.count(&newsreader_query).await?; + let total = newsreader_count + notmuch_count + tantivy_count; + info!("count {newsreader_query:?} newsreader count {newsreader_count} notmuch count {notmuch_count} tantivy count {tantivy_count} total {total}"); + Ok(total) } async fn search<'ctx>( @@ -255,18 +281,11 @@ impl QueryRoot { last: Option, query: String, ) -> Result, ThreadSummary>, Error> { - // TODO: add keywords to limit search to one corpus, i.e. is:news or is:mail info!("search({after:?} {before:?} {first:?} {last:?} {query:?})",); let nm = ctx.data_unchecked::<Notmuch>(); let pool = ctx.data_unchecked::<PgPool>(); let tantivy = ctx.data_unchecked::<TantivyConnection>(); - #[derive(Debug)] - enum ThreadSummaryCursor { - Newsreader(i32, ThreadSummary), - Notmuch(i32, ThreadSummary), - Tantivy(i32, ThreadSummary), - } Ok(connection::query( after, before, @@ -277,7 +296,7 @@ impl QueryRoot { first: Option, last: Option| async move { info!( - "search({:?} {:?} {first:?} {last:?} {query:?})", + "search(after {:?} before {:?} first {first:?} last {last:?} query: {query:?})", after.as_ref().map(|v| &v.0), before.as_ref().map(|v| &v.0) ); @@ -288,65 +307,40 @@ impl QueryRoot { let newsreader_before = before.as_ref().map(|sc| sc.newsreader_offset); let notmuch_before = before.as_ref().map(|sc| sc.notmuch_offset); let tantivy_before = before.as_ref().map(|sc| sc.tantivy_offset); + let first = first.map(|v| v as i32); + let last = last.map(|v| v as i32); - let newsreader_query: Query = query.parse()?; - info!("newsreader_query {newsreader_query:?}"); - let newsreader_results = if newsreader_query.is_newsreader { - newsreader::search( - pool, - newsreader_after, - newsreader_before, - first.map(|v| v as i32), - last.map(|v| v as i32), - &newsreader_query, - ) - .await? - .into_iter() - .map(|(cur, ts)| ThreadSummaryCursor::Newsreader(cur, ts)) - .collect() - } else { - Vec::new() - }; - - let notmuch_results = if newsreader_query.is_notmuch { - nm::search( - nm, - notmuch_after, - notmuch_before, - first.map(|v| v as i32), - last.map(|v| v as i32), - newsreader_query.to_notmuch(), - ) - .await? - .into_iter() - .map(|(cur, ts)| ThreadSummaryCursor::Notmuch(cur, ts)) - .collect() - } else { - Vec::new() - }; - - let tantivy_results = if newsreader_query.is_tantivy { - tantivy - .search( - pool, - tantivy_after, - tantivy_before, - first.map(|v| v as i32), - last.map(|v| v as i32), - &newsreader_query, - ) - .await?
- .into_iter() - .map(|(cur, ts)| ThreadSummaryCursor::Tantivy(cur, ts)) - .collect() - } else { - Vec::new() - }; + let query: Query = query.parse()?; + info!("newsreader_query {query:?}"); + let newsreader_results = newsreader_search( + pool, + newsreader_after, + newsreader_before, + first, + last, + &query, + ) + .await?; + let notmuch_results = + notmuch_search(nm, notmuch_after, notmuch_before, first, last, &query).await?; + let tantivy_results = tantivy_search( + tantivy, + pool, + tantivy_after, + tantivy_before, + first, + last, + &query, + ) + .await?; info!( - "tantivy results:\nis_tantivy:{} {tantivy_results:#?}", - newsreader_query.is_tantivy + "newsreader_results ({}) notmuch_results ({}) tantivy_results ({})", + newsreader_results.len(), + notmuch_results.len(), + tantivy_results.len() ); + let mut results: Vec<_> = newsreader_results .into_iter() .chain(notmuch_results) @@ -362,6 +356,7 @@ impl QueryRoot { let mut has_next_page = before.is_some(); if let Some(first) = first { + let first = first as usize; if results.len() > first { has_next_page = true; results.truncate(first); @@ -370,6 +365,7 @@ impl QueryRoot { let mut has_previous_page = after.is_some(); if let Some(last) = last { + let last = last as usize; if results.len() > last { has_previous_page = true; results.truncate(last); @@ -437,6 +433,59 @@ impl QueryRoot { } } +#[derive(Debug)] +enum ThreadSummaryCursor { + Newsreader(i32, ThreadSummary), + Notmuch(i32, ThreadSummary), + Tantivy(i32, ThreadSummary), +} +async fn newsreader_search( + pool: &PgPool, + after: Option<i32>, + before: Option<i32>, + first: Option<i32>, + last: Option<i32>, + query: &Query, +) -> Result<Vec<ThreadSummaryCursor>, async_graphql::Error> { + Ok(newsreader::search(pool, after, before, first, last, &query) + .await? + .into_iter() + .map(|(cur, ts)| ThreadSummaryCursor::Newsreader(cur, ts)) + .collect()) +} + +async fn notmuch_search( + nm: &Notmuch, + after: Option<i32>, + before: Option<i32>, + first: Option<i32>, + last: Option<i32>, + query: &Query, +) -> Result<Vec<ThreadSummaryCursor>, async_graphql::Error> { + Ok(nm::search(nm, after, before, first, last, &query) + .await? + .into_iter() + .map(|(cur, ts)| ThreadSummaryCursor::Notmuch(cur, ts)) + .collect()) +} + +async fn tantivy_search( + tantivy: &TantivyConnection, + pool: &PgPool, + after: Option<i32>, + before: Option<i32>, + first: Option<i32>, + last: Option<i32>, + query: &Query, +) -> Result<Vec<ThreadSummaryCursor>, async_graphql::Error> { + Ok(tantivy + .search(pool, after, before, first, last, &query) + .await?
+ .into_iter() + .map(|(cur, ts)| ThreadSummaryCursor::Tantivy(cur, ts)) + .collect()) +} + pub struct Mutation; #[Object] impl Mutation { @@ -448,14 +497,12 @@ impl Mutation { ) -> Result { let nm = ctx.data_unchecked::<Notmuch>(); let pool = ctx.data_unchecked::<PgPool>(); + let tantivy = ctx.data_unchecked::<TantivyConnection>(); - for q in query.split_whitespace() { - if newsreader::is_newsreader_thread(&q) { - newsreader::set_read_status(pool, &q, unread).await?; - } else { - nm::set_read_status(nm, q, unread).await?; - } - } + let query: Query = query.parse()?; + newsreader::set_read_status(pool, &query, unread).await?; + tantivy.reindex_thread(pool, &query).await?; + nm::set_read_status(nm, &query, unread).await?; Ok(true) } async fn tag_add<'ctx>( @@ -486,10 +533,19 @@ impl Mutation { let pool = ctx.data_unchecked::<PgPool>(); tantivy.drop_and_load_index()?; - tantivy.reindex(pool).await?; + tantivy.reindex_all(pool).await?; Ok(true) } + async fn refresh<'ctx>(&self, ctx: &Context<'ctx>) -> Result { + let nm = ctx.data_unchecked::<Notmuch>(); + let tantivy = ctx.data_unchecked::<TantivyConnection>(); + let pool = ctx.data_unchecked::<PgPool>(); + // TODO: parallelize + info!("{}", String::from_utf8_lossy(&nm.new()?)); + tantivy.refresh(pool).await?; + Ok(true) + } } pub type GraphqlSchema = Schema<QueryRoot, Mutation, EmptySubscription>; diff --git a/server/src/lib.rs b/server/src/lib.rs index 19692a2..81445c2 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -18,15 +18,16 @@ use lol_html::{ }; use maplit::{hashmap, hashset}; use scraper::{Html, Selector}; -use sqlx::{postgres::PgPool, types::time::PrimitiveDateTime}; +use sqlx::types::time::PrimitiveDateTime; use thiserror::Error; use tokio::sync::Mutex; use url::Url; use crate::{ error::ServerError, - graphql::ThreadSummary, - newsreader::{extract_thread_id, is_newsreader_thread}, + graphql::{Corpus, ThreadSummary}, + newsreader::is_newsreader_thread, + nm::is_notmuch_thread_or_id, }; const NEWSREADER_TAG_PREFIX: &'static str = "News/"; @@ -607,12 +608,13 @@ fn compute_offset_limit( #[derive(Debug)] pub struct Query { pub unread_only: bool, - pub tag: Option<String>, - pub uid: Option<String>, + pub tags: Vec<String>, + pub uids: Vec<String>, pub remainder: Vec<String>, pub is_notmuch: bool, pub is_newsreader: bool, pub is_tantivy: bool, + pub corpus: Option<Corpus>, } impl Query { @@ -627,10 +629,10 @@ impl Query { if self.unread_only { parts.push("is:unread".to_string()); } - if let Some(site) = &self.tag { - parts.push(format!("tag:{site}")); + for tag in &self.tags { + parts.push(format!("tag:{tag}")); } - if let Some(uid) = &self.uid { + for uid in &self.uids { parts.push(uid.clone()); } parts.extend(self.remainder.clone()); @@ -642,48 +644,60 @@ impl FromStr for Query { type Err = Infallible; fn from_str(s: &str) -> Result<Self, Self::Err> { let mut unread_only = false; - let mut tag = None; - let mut uid = None; + let mut tags = Vec::new(); + let mut uids = Vec::new(); let mut remainder = Vec::new(); let mut is_notmuch = false; - let mut is_newsreader = false; + let is_newsreader = false; let mut is_tantivy = false; + let mut corpus = None; for word in s.split_whitespace() { if word == "is:unread" { unread_only = true } else if word.starts_with("tag:") { - tag = Some(word["tag:".len()..].to_string()) + tags.push(word["tag:".len()..].to_string()); + /* } else if word.starts_with("tag:") { // Any tag that doesn't match site_prefix should explicitly set the site to something not in the // database site = Some(NON_EXISTENT_SITE_NAME.to_string()); */ + } else if word.starts_with("corpus:") { + let c = word["corpus:".len()..].to_string(); + corpus = c.parse::<Corpus>().map(|c| Some(c)).unwrap_or_else(|e| { +
warn!("Error parsing corpus '{c}': {e:?}"); + None + }); } else if is_newsreader_thread(word) { - uid = Some(extract_thread_id(word).to_string()) + uids.push(word.to_string()); + } else if is_notmuch_thread_or_id(word) { + uids.push(word.to_string()); } else if word == "is:mail" || word == "is:email" || word == "is:notmuch" { is_notmuch = true; } else if word == "is:news" || word == "is:newsreader" { - is_newsreader = true; + is_tantivy = true; } else { remainder.push(word.to_string()); } } // If we don't see any explicit filters for a corpus, flip them all on - if !(is_notmuch || is_newsreader) { - is_newsreader = true; + if corpus.is_none() && !(is_newsreader || is_notmuch || is_tantivy) { + // Don't set is_newsreader unless debugging, assume tantivy can handle it. + // Explicitly setting corpus:newsreader will bypass this logic + // is_newsreader = true; is_notmuch = true; + is_tantivy = true; } - // TODO: decide if tantivy gets it's own life or replaces newsreader - is_tantivy = is_newsreader; Ok(Query { unread_only, - tag, - uid, + tags, + uids, remainder, is_notmuch, is_newsreader, is_tantivy, + corpus, }) } } @@ -694,6 +708,7 @@ pub struct ThreadSummaryRecord { pub title: Option<String>, pub uid: String, pub name: Option<String>, + pub corpus: Corpus, } async fn thread_summary_from_row(r: ThreadSummaryRecord) -> ThreadSummary { @@ -711,12 +726,14 @@ async fn thread_summary_from_row(r: ThreadSummaryRecord) -> ThreadSummary { .expect("post missing date") .assume_utc() .unix_timestamp() as isize, - date_relative: "TODO date_relative".to_string(), + date_relative: format!("{:?}", r.date), + //date_relative: "TODO date_relative".to_string(), matched: 0, total: 1, authors: r.name.unwrap_or_else(|| site.clone()), subject: title, tags, + corpus: r.corpus, } } async fn clean_title(title: &str) -> Result { diff --git a/server/src/newsreader.rs b/server/src/newsreader.rs index 0bf1461..a15c45e 100644 --- a/server/src/newsreader.rs +++ b/server/src/newsreader.rs @@ -13,14 +13,14 @@ use crate::{ clean_title, compute_offset_limit, config::Config, error::ServerError, - graphql::{NewsPost, Tag, Thread, ThreadSummary}, + graphql::{Corpus, NewsPost, Tag, Thread, ThreadSummary}, thread_summary_from_row, AddOutlink, EscapeHtml, FrameImages, InlineStyle, Query, SanitizeHtml, - SlurpContents, StripHtml, ThreadSummaryRecord, Transformer, NEWSREADER_TAG_PREFIX, + SlurpContents, ThreadSummaryRecord, Transformer, NEWSREADER_TAG_PREFIX, NEWSREADER_THREAD_PREFIX, }; -pub fn is_newsreader_search(query: &str) -> bool { - query.contains(NEWSREADER_TAG_PREFIX) +pub fn is_newsreader_query(query: &Query) -> bool { + query.is_newsreader || query.corpus == Some(Corpus::Newsreader) } pub fn is_newsreader_thread(query: &str) -> bool { @@ -28,7 +28,11 @@ pub fn is_newsreader_thread(query: &str) -> bool { } pub fn extract_thread_id(query: &str) -> &str { - &query[NEWSREADER_THREAD_PREFIX.len()..] + if query.starts_with(NEWSREADER_THREAD_PREFIX) { + &query[NEWSREADER_THREAD_PREFIX.len()..]
+ } else { + query + } } pub fn extract_site(tag: &str) -> &str { @@ -39,14 +43,33 @@ pub fn make_news_tag(tag: &str) -> String { format!("tag:{NEWSREADER_TAG_PREFIX}{tag}") } +fn site_from_tags(tags: &[String]) -> Option { + for t in tags { + if t.starts_with(NEWSREADER_TAG_PREFIX) { + return Some(extract_site(t).to_string()); + } + } + None +} + pub async fn count(pool: &PgPool, query: &Query) -> Result { + if !is_newsreader_query(query) { + return Ok(0); + } if !query.remainder.is_empty() { // TODO: handle full text search against all sites, for now, early return if search words // are specified. return Ok(0); } - let row = sqlx::query_file!("sql/count.sql", query.tag, query.unread_only) + let site = site_from_tags(&query.tags); + if !query.tags.is_empty() && site.is_none() { + // Newsreader can only handle all sites read/unread queries, anything with a non-site tag + // isn't supported + return Ok(0); + } + + let row = sqlx::query_file!("sql/count.sql", site, query.unread_only) .fetch_one(pool) .await?; Ok(row.count.unwrap_or(0).try_into().unwrap_or(0)) @@ -61,12 +84,22 @@ pub async fn search( query: &Query, ) -> Result, async_graphql::Error> { info!("search({after:?} {before:?} {first:?} {last:?} {query:?}"); + if !is_newsreader_query(query) { + return Ok(Vec::new()); + } if !query.remainder.is_empty() { // TODO: handle full text search against all sites, for now, early return if search words // are specified. return Ok(Vec::new()); } + let site = site_from_tags(&query.tags); + if !query.tags.is_empty() && site.is_none() { + // Newsreader can only handle all sites read/unread queries, anything with a non-site tag + // isn't supported + return Ok(Vec::new()); + } + let (offset, mut limit) = compute_offset_limit(after, before, first, last); if before.is_none() { // When searching forward, the +1 is to see if there are more pages of data available. 
@@ -75,7 +108,6 @@ pub async fn search( limit = limit + 1; } - let site = query.tag.as_ref().map(|t| extract_site(&t).to_string()); info!( "search offset {offset} limit {limit} site {site:?} unread_only {}", query.unread_only @@ -102,6 +134,7 @@ pub async fn search( title: r.title, uid: r.uid, name: r.name, + corpus: Corpus::Newsreader, }) .await, )); @@ -243,12 +276,22 @@ pub async fn thread( } pub async fn set_read_status<'ctx>( pool: &PgPool, - query: &str, + query: &Query, unread: bool, ) -> Result { - let query: Query = query.parse()?; - sqlx::query_file!("sql/set_unread.sql", !unread, query.uid) - .execute(pool) - .await?; + // TODO: make single query when query.uids.len() > 1 + let uids: Vec<_> = query + .uids + .iter() + .filter(|uid| is_newsreader_thread(uid)) + .map( + |uid| extract_thread_id(uid), // TODO strip prefix + ) + .collect(); + for uid in uids { + sqlx::query_file!("sql/set_unread.sql", !unread, uid) + .execute(pool) + .await?; + } Ok(true) } diff --git a/server/src/nm.rs b/server/src/nm.rs index 1cbd3b6..1d2563e 100644 --- a/server/src/nm.rs +++ b/server/src/nm.rs @@ -14,10 +14,10 @@ use crate::{ compute_offset_limit, error::ServerError, graphql::{ - Attachment, Body, DispositionType, Email, EmailThread, Header, Html, Message, PlainText, - Tag, Thread, ThreadSummary, UnhandledContentType, + Attachment, Body, Corpus, DispositionType, Email, EmailThread, Header, Html, Message, + PlainText, Tag, Thread, ThreadSummary, UnhandledContentType, }, - linkify_html, InlineStyle, SanitizeHtml, Transformer, + linkify_html, InlineStyle, Query, SanitizeHtml, Transformer, }; const TEXT_PLAIN: &'static str = "text/plain"; @@ -31,6 +31,14 @@ const MULTIPART_RELATED: &'static str = "multipart/related"; const MAX_RAW_MESSAGE_SIZE: usize = 100_000; +fn is_notmuch_query(query: &Query) -> bool { + query.is_notmuch || query.corpus == Some(Corpus::Notmuch) +} + +pub fn is_notmuch_thread_or_id(id: &str) -> bool { + id.starts_with("id:") || id.starts_with("thread:") +} + // TODO(wathiede): decide good error type pub fn threadset_to_messages(thread_set: notmuch::ThreadSet) -> Result, ServerError> { for t in thread_set.0 { @@ -39,8 +47,12 @@ pub fn threadset_to_messages(thread_set: notmuch::ThreadSet) -> Result Result { - Ok(nm.count(query)?) +pub async fn count(nm: &Notmuch, query: &Query) -> Result { + if !is_notmuch_query(query) { + return Ok(0); + } + let query = query.to_notmuch(); + Ok(nm.count(&query)?) } pub async fn search( @@ -49,8 +61,12 @@ pub async fn search( before: Option, first: Option, last: Option, - query: String, + query: &Query, ) -> Result, async_graphql::Error> { + if !is_notmuch_query(query) { + return Ok(Vec::new()); + } + let query = query.to_notmuch(); let (offset, mut limit) = compute_offset_limit(after, before, first, last); if before.is_none() { // When searching forward, the +1 is to see if there are more pages of data available. 
@@ -75,6 +91,7 @@ pub async fn search( authors: ts.authors, subject: ts.subject, tags: ts.tags, + corpus: Corpus::Notmuch, }, ) }) @@ -770,13 +787,21 @@ fn render_content_type_tree(m: &ParsedMail) -> String { pub async fn set_read_status<'ctx>( nm: &Notmuch, - query: &str, + query: &Query, unread: bool, ) -> Result { - if unread { - nm.tag_add("unread", &format!("{query}"))?; - } else { - nm.tag_remove("unread", &format!("{query}"))?; + let uids: Vec<_> = query + .uids + .iter() + .filter(|uid| is_notmuch_thread_or_id(uid)) + .collect(); + info!("set_read_status({unread} {uids:?})"); + for uid in uids { + if unread { + nm.tag_add("unread", uid)?; + } else { + nm.tag_remove("unread", uid)?; + } } Ok(true) } diff --git a/server/src/tantivy.rs b/server/src/tantivy.rs index ea93e52..f5ac0b7 100644 --- a/server/src/tantivy.rs +++ b/server/src/tantivy.rs @@ -1,11 +1,26 @@ -use log::info; -use sqlx::postgres::PgPool; -use tantivy::{schema::Value, Index, TantivyError}; +use std::collections::HashSet; -use crate::{ - error::ServerError, graphql::ThreadSummary, thread_summary_from_row, Query, ThreadSummaryRecord, +use log::{debug, error, info}; +use sqlx::{postgres::PgPool, types::time::PrimitiveDateTime}; +use tantivy::{ + collector::{DocSetCollector, TopDocs}, + query, + query::{AllQuery, BooleanQuery, Occur, QueryParser, TermQuery}, + schema::{Facet, IndexRecordOption, Value}, + DocAddress, Index, Searcher, TantivyDocument, TantivyError, Term, }; +use crate::{ + compute_offset_limit, + error::ServerError, + graphql::{Corpus, ThreadSummary}, + newsreader::{extract_thread_id, is_newsreader_thread}, + thread_summary_from_row, Query, ThreadSummaryRecord, +}; + +pub fn is_tantivy_query(query: &Query) -> bool { + query.is_tantivy || query.corpus == Some(Corpus::Tantivy) +} pub struct TantivyConnection { db_path: String, //index: Index, @@ -27,7 +42,67 @@ impl TantivyConnection { db_path: tantivy_db_path.to_string(), }) } - pub async fn reindex(&self, pool: &PgPool) -> Result<(), ServerError> { + pub async fn refresh(&self, pool: &PgPool) -> Result<(), ServerError> { + let start_time = std::time::Instant::now(); + let p_uids: Vec<_> = sqlx::query_file!("sql/all-uids.sql") + .fetch_all(pool) + .await? 
+ .into_iter() + .map(|r| r.uid) + .collect(); + info!( + "refresh from postgres got {} uids in {}", + p_uids.len(), + start_time.elapsed().as_secs_f32() + ); + + let start_time = std::time::Instant::now(); + let (searcher, _query) = self.searcher_and_query("")?; + let docs = searcher.search(&AllQuery, &DocSetCollector)?; + let uid = self.get_index()?.schema().get_field("uid")?; + let t_uids: Vec<_> = docs + .into_iter() + .map(|doc_address| { + searcher + .doc(doc_address) + .map(|doc: TantivyDocument| { + debug!("doc: {doc:#?}"); + doc.get_first(uid) + .expect("uid") + .as_str() + .expect("as_str") + .to_string() + }) + .expect("searcher.doc") + }) + .collect(); + + info!( + "refresh tantivy got {} uids in {}", + t_uids.len(), + start_time.elapsed().as_secs_f32() + ); + let t_set: HashSet<_> = t_uids.into_iter().collect(); + let need: Vec<_> = p_uids + .into_iter() + .filter(|uid| !t_set.contains(uid.as_str())) + .collect(); + if !need.is_empty() { + info!( + "need to reindex {} uids: {:?}...", + need.len(), + &need[..need.len().min(10)] + ); + } + let batch_size = 1000; + let uids: Vec<_> = need[..need.len().min(batch_size)] + .into_iter() + .cloned() + .collect(); + self.reindex_uids(pool, &uids).await + } + async fn reindex_uids(&self, pool: &PgPool, uids: &[String]) -> Result<(), ServerError> { + // TODO: add SlurpContents and convert HTML to text use tantivy::{doc, Term}; let start_time = std::time::Instant::now(); @@ -44,11 +119,20 @@ impl TantivyConnection { let is_read = schema.get_field("is_read")?; let uid = schema.get_field("uid")?; let id = schema.get_field("id")?; + let tag = schema.get_field("tag")?; - let rows = sqlx::query_file!("sql/all-posts.sql") + info!("reindexing {} posts", uids.len()); + let rows = sqlx::query_file_as!(PostgresDoc, "sql/posts-from-uids.sql", uids) .fetch_all(pool) .await?; + if uids.len() != rows.len() { + error!( + "Had {} uids and only got {} rows: uids {uids:?}", + uids.len(), + rows.len() + ); + } let total = rows.len(); for (i, r) in rows.into_iter().enumerate() { if i % 10_000 == 0 { @@ -57,26 +141,76 @@ impl TantivyConnection { start_time.elapsed().as_secs_f32() ); } + let id_term = Term::from_field_text(uid, &r.uid); index_writer.delete_term(id_term); + let slug = r.site; + let tag_facet = Facet::from(&format!("/News/{slug}")); index_writer.add_document(doc!( - site => r.site.expect("UNKOWN_SITE"), - title => r.title.expect("UNKOWN_TITLE"), + site => slug.clone(), + title => r.title, // TODO: clean and extract text from HTML - summary => r.summary.expect("UNKNOWN_SUMMARY"), - link => r.link.expect("link"), - date => tantivy::DateTime::from_primitive(r.date.expect("date")), - is_read => r.is_read.expect("is_read"), + summary => r.summary, + link => r.link, + date => tantivy::DateTime::from_primitive(r.date), + is_read => r.is_read, uid => r.uid, - id => r.id as i64, + id => r.id as u64, + tag => tag_facet, ))?; } - index_writer.commit()?; - info!("took {:.2}s to reindex", start_time.elapsed().as_secs_f32()); + + index_writer.commit()?; Ok(()) } + pub async fn reindex_thread(&self, pool: &PgPool, query: &Query) -> Result<(), ServerError> { + let uids: Vec<_> = query + .uids + .iter() + .filter(|uid| is_newsreader_thread(uid)) + .map(|uid| extract_thread_id(uid).to_string()) + .collect(); + Ok(self.reindex_uids(pool, &uids).await?) 
+ } + pub async fn reindex_all(&self, pool: &PgPool) -> Result<(), ServerError> { + let rows = sqlx::query_file!("sql/all-posts.sql") + .fetch_all(pool) + .await?; + + let uids: Vec<String> = rows.into_iter().map(|r| r.uid).collect(); + self.reindex_uids(pool, &uids).await?; + Ok(()) + } + fn searcher_and_query( + &self, + term: &str, + ) -> Result<(Searcher, Box<dyn query::Query>), ServerError> { + let index = self.get_index()?; + let reader = index.reader()?; + let schema = index.schema(); + let searcher = reader.searcher(); + let title = schema.get_field("title")?; + let summary = schema.get_field("summary")?; + let query_parser = QueryParser::for_index(&index, vec![title, summary]); + // Tantivy uses '*' to match all docs, not empty string + let term = if term.is_empty() { "*" } else { term }; + + info!("query_parser('{term}')"); + let query = query_parser.parse_query(&term)?; + Ok((searcher, query)) + } + + pub async fn count(&self, query: &Query) -> Result { + if !is_tantivy_query(query) { + return Ok(0); + } + use tantivy::collector::Count; + let term = query.remainder.join(" "); + let (searcher, query) = self.searcher_and_query(&term)?; + Ok(searcher.search(&query, &Count)?) + } pub async fn search( &self, pool: &PgPool, @@ -86,28 +220,51 @@ impl TantivyConnection { last: Option<i32>, query: &Query, ) -> Result<Vec<(i32, ThreadSummary)>, async_graphql::Error> { - use tantivy::{collector::TopDocs, query::QueryParser, Document, TantivyDocument}; - // TODO: set based on function parameters - let offset = 0; + if !is_tantivy_query(query) { + return Ok(Vec::new()); + } + let (offset, mut limit) = compute_offset_limit(after, before, first, last); + if before.is_none() { + // When searching forward, the +1 is to see if there are more pages of data available. + // Searching backwards implies there's more pages forward, because the value represented by + // `before` is on the next page.
+ limit = limit + 1; + } - let index = self.get_index()?; - let reader = index.reader()?; - let schema = index.schema(); - let searcher = reader.searcher(); - let site = schema.get_field("site")?; - let uid = schema.get_field("uid")?; - let title = schema.get_field("title")?; - let summary = schema.get_field("summary")?; - let date = schema.get_field("date")?; - let query_parser = QueryParser::for_index(&index, vec![title, summary]); - - let query = query_parser.parse_query(&query.remainder.join(" "))?; - let top_docs = searcher.search(&query, &TopDocs::with_limit(10))?; + let term = query.remainder.join(" "); + let (searcher, tantivy_query) = self.searcher_and_query(&term)?; + let tag = self.get_index()?.schema().get_field("tag")?; + let is_read = self.get_index()?.schema().get_field("is_read")?; + let mut terms = vec![(Occur::Must, tantivy_query)]; + for t in &query.tags { + let facet = Facet::from(&format!("/{t}")); + let facet_term = Term::from_facet(tag, &facet); + let facet_term_query = Box::new(TermQuery::new(facet_term, IndexRecordOption::Basic)); + terms.push((Occur::Must, facet_term_query)); + } + if query.unread_only { + info!("searching for unread only"); + let term = Term::from_field_bool(is_read, false); + terms.push(( + Occur::Must, + Box::new(TermQuery::new(term, IndexRecordOption::Basic)), + )); + } + let search_query = BooleanQuery::new(terms); + info!("Tantivy::search(term '{term}', off {offset}, lim {limit}, search_query {search_query:?})"); + let top_docs = searcher.search( + &search_query, + &TopDocs::with_limit(limit as usize) + .and_offset(offset as usize) + .order_by_u64_field("date", tantivy::index::Order::Desc), + )?; info!("search found {} docs", top_docs.len()); + let uid = self.get_index()?.schema().get_field("uid")?; let uids = top_docs .into_iter() - .map(|(_, doc_address)| { + .map(|(_, doc_address): (u64, DocAddress)| { searcher.doc(doc_address).map(|doc: TantivyDocument| { + debug!("doc: {doc:#?}"); doc.get_first(uid) .expect("doc missing uid") .as_str() @@ -134,6 +291,7 @@ impl TantivyConnection { title: r.title, uid: r.uid, name: r.name, + corpus: Corpus::Tantivy, }) .await, )); @@ -157,11 +315,23 @@ fn create_news_db(tantivy_db_path: &str) -> Result<(), TantivyError> { schema_builder.add_text_field("summary", TEXT); schema_builder.add_text_field("link", STRING | STORED); schema_builder.add_date_field("date", FAST | INDEXED | STORED); - schema_builder.add_bool_field("is_read", FAST); + schema_builder.add_bool_field("is_read", FAST | INDEXED | STORED); schema_builder.add_text_field("uid", STRING | STORED); - schema_builder.add_i64_field("id", FAST); + schema_builder.add_u64_field("id", FAST); + schema_builder.add_facet_field("tag", FacetOptions::default()); let schema = schema_builder.build(); Index::create_in_dir(tantivy_db_path, schema)?; Ok(()) } + +struct PostgresDoc { + site: String, + title: String, + summary: String, + link: String, + date: PrimitiveDateTime, + is_read: bool, + uid: String, + id: i32, +} diff --git a/web/graphql/front_page.graphql b/web/graphql/front_page.graphql index 79c056c..8520ee9 100644 --- a/web/graphql/front_page.graphql +++ b/web/graphql/front_page.graphql @@ -14,6 +14,7 @@ query FrontPageQuery($query: String!, $after: String $before: String, $first: In subject authors tags + corpus } } tags { diff --git a/web/graphql/refresh.graphql b/web/graphql/refresh.graphql new file mode 100644 index 0000000..2e73c6c --- /dev/null +++ b/web/graphql/refresh.graphql @@ -0,0 +1,3 @@ +mutation RefreshMutation { + refresh +} diff --git 
a/web/graphql/schema.json b/web/graphql/schema.json index 23400d9..80ea28f 100644 --- a/web/graphql/schema.json +++ b/web/graphql/schema.json @@ -232,6 +232,35 @@ "name": "Boolean", "possibleTypes": null }, + { + "description": null, + "enumValues": [ + { + "deprecationReason": null, + "description": null, + "isDeprecated": false, + "name": "NOTMUCH" + }, + { + "deprecationReason": null, + "description": null, + "isDeprecated": false, + "name": "NEWSREADER" + }, + { + "deprecationReason": null, + "description": null, + "isDeprecated": false, + "name": "TANTIVY" + } + ], + "fields": null, + "inputFields": null, + "interfaces": null, + "kind": "ENUM", + "name": "Corpus", + "possibleTypes": null + }, { "description": null, "enumValues": [ @@ -850,6 +879,38 @@ "ofType": null } } + }, + { + "args": [], + "deprecationReason": null, + "description": "Drop and recreate tantivy index. Warning this is slow", + "isDeprecated": false, + "name": "dropAndLoadIndex", + "type": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "SCALAR", + "name": "Boolean", + "ofType": null + } + } + }, + { + "args": [], + "deprecationReason": null, + "description": null, + "isDeprecated": false, + "name": "refresh", + "type": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "SCALAR", + "name": "Boolean", + "ofType": null + } + } } ], "inputFields": null, @@ -1536,6 +1597,22 @@ } } } + }, + { + "args": [], + "deprecationReason": null, + "description": null, + "isDeprecated": false, + "name": "corpus", + "type": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "ENUM", + "name": "Corpus", + "ofType": null + } + } } ], "inputFields": null, diff --git a/web/src/graphql.rs b/web/src/graphql.rs index 7f3c924..351833c 100644 --- a/web/src/graphql.rs +++ b/web/src/graphql.rs @@ -44,6 +44,14 @@ pub struct AddTagMutation; )] pub struct RemoveTagMutation; +#[derive(GraphQLQuery)] +#[graphql( + schema_path = "graphql/schema.json", + query_path = "graphql/refresh.graphql", + response_derives = "Debug" +)] +pub struct RefreshMutation; + pub async fn send_graphql(body: Body) -> Result, Error> where Body: Serialize, diff --git a/web/src/state.rs b/web/src/state.rs index 9e3b5be..0930633 100644 --- a/web/src/state.rs +++ b/web/src/state.rs @@ -111,7 +111,17 @@ pub fn update(msg: Msg, model: &mut Model, orders: &mut impl Orders) { Msg::Noop => {} Msg::RefreshStart => { model.refreshing_state = RefreshingState::Loading; - orders.perform_cmd(async move { Msg::RefreshDone(api::refresh_request().await.err()) }); + orders.perform_cmd(async move { + Msg::RefreshDone( + send_graphql::<_, graphql::refresh_mutation::ResponseData>( + graphql::RefreshMutation::build_query( + graphql::refresh_mutation::Variables {}, + ), + ) + .await + .err(), + ) + }); } Msg::RefreshDone(err) => { model.refreshing_state = if let Some(err) = err { diff --git a/web/src/view/mod.rs b/web/src/view/mod.rs index 3a7ac00..3126e66 100644 --- a/web/src/view/mod.rs +++ b/web/src/view/mod.rs @@ -203,7 +203,7 @@ fn view_search_results( }), ]], td![ - C!["from"], + C!["from", format!("corpus-{:?} ", r.corpus)], a![ C!["has-text-light", "text"], attrs! {