server: use fetched contents of news for search index

Bill Thiede 2025-01-29 14:08:20 -08:00
parent c7aa32b922
commit 12c8e0e33b
12 changed files with 168 additions and 87 deletions

Cargo.lock generated
View File

@ -2958,6 +2958,7 @@ dependencies = [
"memmap",
"notmuch",
"opentelemetry",
"regex",
"reqwest 0.12.9",
"rocket",
"rocket_cors",

View File

@ -26,6 +26,7 @@ maplit = "1.0.2"
memmap = "0.7.0"
notmuch = { path = "../notmuch" }
opentelemetry = "0.27.1"
regex = "1.11.1"
reqwest = { version = "0.12.7", features = ["blocking"] }
rocket = { version = "0.5.0-rc.2", features = ["json"] }
rocket_cors = "0.6.0"

View File

@ -0,0 +1,15 @@
-- Add down migration script here
BEGIN;
DROP INDEX IF EXISTS post_search_summary_idx;
ALTER TABLE post DROP search_summary;
-- CREATE INDEX post_summary_idx ON post USING gin (to_tsvector(
-- 'english',
-- regexp_replace(
-- regexp_replace(summary, '<[^>]+>', ' ', 'g'),
-- '\s+',
-- ' ',
-- 'g'
-- )
-- ));
COMMIT;

View File

@ -0,0 +1,14 @@
-- Add up migration script here
BEGIN;
DROP INDEX IF EXISTS post_summary_idx;
ALTER TABLE post ADD search_summary TEXT;
CREATE INDEX post_search_summary_idx ON post USING gin (
to_tsvector('english', search_summary)
);
UPDATE post SET search_summary = regexp_replace(
regexp_replace(summary, '<[^>]+>', ' ', 'g'),
'\s+',
' ',
'g'
);
COMMIT;
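
The GIN index above is what the reworked search queries lean on. As a rough sketch of the lookup shape (the real statements live in the .sql files further down; the helper name and the i32 id type here are assumptions):

// Hypothetical helper mirroring the WHERE clause used by the count and
// search queries in this commit; assumes an integer primary key on post.
async fn matching_post_ids(pool: &sqlx::PgPool, needle: &str) -> Result<Vec<i32>, sqlx::Error> {
    sqlx::query_scalar(
        "SELECT id FROM post \
         WHERE to_tsvector('english', search_summary) \
               @@ websearch_to_tsquery('english', $1)",
    )
    .bind(needle)
    .fetch_all(pool)
    .await
}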

View File

@ -1,5 +1,4 @@
SELECT
COUNT(*) count
SELECT COUNT(*) AS count
FROM
post
WHERE
@ -13,5 +12,6 @@ WHERE
)
AND (
$3::text IS NULL
OR to_tsvector('english', summary) @@ websearch_to_tsquery('english', $3)
OR TO_TSVECTOR('english', search_summary)
@@ WEBSEARCH_TO_TSQUERY('english', $3)
)

View File

@ -0,0 +1,8 @@
SELECT
p.id,
link,
clean_summary
FROM
post AS p
INNER JOIN feed AS f ON p.site = f.slug -- necessary to weed out nzb posts
WHERE search_summary IS NULL;

View File

@ -16,7 +16,7 @@ WHERE
)
AND (
$5 :: text IS NULL
OR to_tsvector('english', summary) @@ websearch_to_tsquery('english', $5)
OR to_tsvector('english', search_summary) @@ websearch_to_tsquery('english', $5)
)
ORDER BY
date DESC,

View File

@ -7,6 +7,7 @@ use std::{error::Error, io::Cursor, str::FromStr};
use async_graphql::{extensions, http::GraphiQLSource, EmptySubscription, Schema};
use async_graphql_rocket::{GraphQLQuery, GraphQLRequest, GraphQLResponse};
use cacher::FilesystemCacher;
#[cfg(feature = "tantivy")]
use letterbox_server::tantivy::TantivyConnection;
use letterbox_server::{
@ -220,9 +221,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
#[cfg(feature = "tantivy")]
let tantivy_conn = TantivyConnection::new(&config.newsreader_tantivy_db_path)?;
let cacher = FilesystemCacher::new(&config.slurp_cache_path)?;
let schema = Schema::build(QueryRoot, Mutation, EmptySubscription)
.data(Notmuch::default())
.data(config)
.data(cacher)
.data(pool.clone());
#[cfg(feature = "tantivy")]

View File

@ -5,6 +5,7 @@ use async_graphql::{
Context, EmptySubscription, Enum, Error, FieldResult, InputObject, Object, Schema,
SimpleObject, Union,
};
use cacher::FilesystemCacher;
use log::info;
use notmuch::Notmuch;
use serde::{Deserialize, Serialize};
@ -14,7 +15,7 @@ use tracing::instrument;
#[cfg(feature = "tantivy")]
use crate::tantivy::TantivyConnection;
use crate::{config::Config, newsreader, nm, Query};
use crate::{newsreader, nm, Query};
/// # Number of seconds since the Epoch
pub type UnixTime = isize;
@ -478,8 +479,8 @@ impl QueryRoot {
#[instrument(skip_all, fields(thread_id=thread_id, request_id=request_id()))]
async fn thread<'ctx>(&self, ctx: &Context<'ctx>, thread_id: String) -> Result<Thread, Error> {
let nm = ctx.data_unchecked::<Notmuch>();
let cacher = ctx.data_unchecked::<FilesystemCacher>();
let pool = ctx.data_unchecked::<PgPool>();
let config = ctx.data_unchecked::<Config>();
let debug_content_tree = ctx
.look_ahead()
.field("messages")
@ -487,7 +488,7 @@ impl QueryRoot {
.field("contentTree")
.exists();
if newsreader::is_newsreader_thread(&thread_id) {
Ok(newsreader::thread(config, pool, thread_id).await?)
Ok(newsreader::thread(cacher, pool, thread_id).await?)
} else {
Ok(nm::thread(nm, pool, thread_id, debug_content_tree).await?)
}
@ -609,11 +610,13 @@ impl Mutation {
#[instrument(skip_all, fields(request_id=request_id()))]
async fn refresh<'ctx>(&self, ctx: &Context<'ctx>) -> Result<bool, Error> {
let nm = ctx.data_unchecked::<Notmuch>();
let cacher = ctx.data_unchecked::<FilesystemCacher>();
let pool = ctx.data_unchecked::<PgPool>();
info!("{}", String::from_utf8_lossy(&nm.new()?));
newsreader::refresh(pool, cacher).await?;
#[cfg(feature = "tantivy")]
{
let tantivy = ctx.data_unchecked::<TantivyConnection>();
let pool = ctx.data_unchecked::<PgPool>();
// TODO: parallelize
tantivy.refresh(pool).await?;
}
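
End to end, a refresh now runs notmuch new, backfills search_summary through newsreader::refresh, and then, when the feature is enabled, rebuilds the tantivy index. A minimal way to exercise the whole path, assuming the schema wired up in main.rs above (test-harness details omitted):

// Sketch: drive the mutation straight through the async-graphql schema;
// every .data(...) handle registered in main.rs must be present first.
let response = schema.execute("mutation { refresh }").await;
assert!(response.errors.is_empty());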

View File

@ -19,6 +19,7 @@ use lol_html::{
RewriteStrSettings,
};
use maplit::{hashmap, hashset};
use regex::Regex;
use scraper::{Html, Selector};
use sqlx::types::time::PrimitiveDateTime;
use thiserror::Error;
@ -105,6 +106,8 @@ impl Transformer for StripHtml {
..RewriteStrSettings::default()
},
)?;
let re = Regex::new(r"\s+").expect("failed to parse regex");
let text = re.replace_all(&text, " ").to_string();
Ok(text)
}
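
The extra pass matters because tag stripping leaves the surrounding newlines and indentation behind, which would otherwise end up verbatim in search_summary. A small illustration of just the collapse step (input string invented for the example):

// Illustrative only: runs of whitespace left over after tag stripping are
// collapsed to single spaces before the text is stored for indexing.
let re = regex::Regex::new(r"\s+").expect("failed to parse regex");
let text = re.replace_all("Hello,\n   world\n\nagain", " ").to_string();
assert_eq!(text, "Hello, world again");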
@ -250,13 +253,13 @@ impl Transformer for AddOutlink {
}
}
struct SlurpContents {
cacher: Arc<Mutex<FilesystemCacher>>,
struct SlurpContents<'c> {
cacher: &'c FilesystemCacher,
inline_css: bool,
site_selectors: HashMap<String, Vec<Selector>>,
}
impl SlurpContents {
impl<'c> SlurpContents<'c> {
fn get_selectors(&self, link: &Url) -> Option<&[Selector]> {
for (host, selector) in self.site_selectors.iter() {
if link.host_str().map(|h| h.contains(host)).unwrap_or(false) {
@ -268,7 +271,7 @@ impl SlurpContents {
}
#[async_trait]
impl Transformer for SlurpContents {
impl<'c> Transformer for SlurpContents<'c> {
fn should_run(&self, link: &Option<Url>, html: &str) -> bool {
let mut will_slurp = false;
if let Some(link) = link {
@ -294,7 +297,7 @@ impl Transformer for SlurpContents {
let Some(selectors) = self.get_selectors(&link) else {
return Ok(html.to_string());
};
let cacher = self.cacher.lock().await;
let cacher = self.cacher;
let body = if let Some(body) = cacher.get(link.as_str()) {
String::from_utf8_lossy(&body).to_string()
} else {

View File

@ -1,4 +1,4 @@
use std::sync::Arc;
use std::collections::HashMap;
use cacher::FilesystemCacher;
use log::info;
@ -6,17 +6,15 @@ use maplit::hashmap;
use scraper::Selector;
use shared::compute_color;
use sqlx::postgres::PgPool;
use tokio::sync::Mutex;
use tracing::instrument;
use url::Url;
use crate::{
clean_title, compute_offset_limit,
config::Config,
error::ServerError,
graphql::{Corpus, NewsPost, Tag, Thread, ThreadSummary},
thread_summary_from_row, AddOutlink, FrameImages, Query, SanitizeHtml, SlurpContents,
ThreadSummaryRecord, Transformer, NEWSREADER_TAG_PREFIX, NEWSREADER_THREAD_PREFIX,
StripHtml, ThreadSummaryRecord, Transformer, NEWSREADER_TAG_PREFIX, NEWSREADER_THREAD_PREFIX,
};
pub fn is_newsreader_query(query: &Query) -> bool {
@ -173,7 +171,7 @@ pub async fn tags(pool: &PgPool, _needs_unread: bool) -> Result<Vec<Tag>, Server
#[instrument(name = "newsreader::thread", skip_all, fields(thread_id=%thread_id))]
pub async fn thread(
config: &Config,
cacher: &FilesystemCacher,
pool: &PgPool,
thread_id: String,
) -> Result<Thread, ServerError> {
@ -191,12 +189,106 @@ pub async fn thread(
// TODO: remove the various places that have this as an Option
let link = Some(Url::parse(&r.link)?);
let mut body = r.clean_summary.unwrap_or("NO SUMMARY".to_string());
let cacher = Arc::new(Mutex::new(FilesystemCacher::new(&config.slurp_cache_path)?));
let body_tranformers: Vec<Box<dyn Transformer>> = vec![
let body_transformers: Vec<Box<dyn Transformer>> = vec![
Box::new(SlurpContents {
cacher,
inline_css: true,
site_selectors: hashmap![
site_selectors: slurp_contents_selectors(),
}),
Box::new(FrameImages),
Box::new(AddOutlink),
// TODO: causes doubling of images in cloudflare blogs
//Box::new(EscapeHtml),
Box::new(SanitizeHtml {
cid_prefix: "",
base_url: &link,
}),
];
for t in body_transformers.iter() {
if t.should_run(&link, &body) {
body = t.transform(&link, &body).await?;
}
}
let title = clean_title(&r.title.unwrap_or("NO TITLE".to_string())).await?;
let is_read = r.is_read.unwrap_or(false);
let timestamp = r
.date
.expect("post missing date")
.assume_utc()
.unix_timestamp();
Ok(Thread::News(NewsPost {
thread_id,
is_read,
slug,
site,
title,
body,
url: link
.as_ref()
.map(|url| url.to_string())
.unwrap_or("NO URL".to_string()),
timestamp,
}))
}
#[instrument(name = "newsreader::set_read_status", skip_all, fields(query=%query,unread=%unread))]
pub async fn set_read_status<'ctx>(
pool: &PgPool,
query: &Query,
unread: bool,
) -> Result<bool, ServerError> {
// TODO: make single query when query.uids.len() > 1
let uids: Vec<_> = query
.uids
.iter()
.filter(|uid| is_newsreader_thread(uid))
.map(
|uid| extract_thread_id(uid), // TODO strip prefix
)
.collect();
for uid in uids {
sqlx::query_file!("sql/set_unread.sql", !unread, uid)
.execute(pool)
.await?;
}
Ok(true)
}
#[instrument(name = "newsreader::refresh", skip_all)]
pub async fn refresh<'ctx>(pool: &PgPool, cacher: &FilesystemCacher) -> Result<bool, ServerError> {
let body_transformers: Vec<Box<dyn Transformer>> = vec![
Box::new(SlurpContents {
cacher,
inline_css: true,
site_selectors: slurp_contents_selectors(),
}),
Box::new(StripHtml),
];
let rows = sqlx::query_file!("sql/need-search-summary.sql",)
.fetch_all(pool)
.await?;
for r in rows {
let link = Url::parse(&r.link)?;
info!("adding {link} to search index");
let link = Some(link);
let mut body = r.clean_summary.unwrap_or("NO SUMMARY".to_string());
for t in body_transformers.iter() {
if t.should_run(&link, &body) {
body = t.transform(&link, &body).await?;
}
}
sqlx::query!(
"UPDATE post SET search_summary = $1 WHERE id = $2",
body,
r.id
)
.execute(pool)
.await?;
}
Ok(true)
}
fn slurp_contents_selectors() -> HashMap<String, Vec<Selector>> {
hashmap![
"atmeta.com".to_string() => vec![
Selector::parse("div.entry-content").unwrap(),
],
@ -257,62 +349,5 @@ pub async fn thread(
Selector::parse("img#cc-comic").unwrap(),
Selector::parse("div#aftercomic img").unwrap(),
],
],
}),
Box::new(FrameImages),
Box::new(AddOutlink),
// TODO: causes doubling of images in cloudflare blogs
//Box::new(EscapeHtml),
Box::new(SanitizeHtml {
cid_prefix: "",
base_url: &link,
}),
];
for t in body_tranformers.iter() {
if t.should_run(&link, &body) {
body = t.transform(&link, &body).await?;
}
}
let title = clean_title(&r.title.unwrap_or("NO TITLE".to_string())).await?;
let is_read = r.is_read.unwrap_or(false);
let timestamp = r
.date
.expect("post missing date")
.assume_utc()
.unix_timestamp();
Ok(Thread::News(NewsPost {
thread_id,
is_read,
slug,
site,
title,
body,
url: link
.as_ref()
.map(|url| url.to_string())
.unwrap_or("NO URL".to_string()),
timestamp,
}))
}
#[instrument(name = "newsreader::set_read_status", skip_all, fields(query=%query,unread=%unread))]
pub async fn set_read_status<'ctx>(
pool: &PgPool,
query: &Query,
unread: bool,
) -> Result<bool, ServerError> {
// TODO: make single query when query.uids.len() > 1
let uids: Vec<_> = query
.uids
.iter()
.filter(|uid| is_newsreader_thread(uid))
.map(
|uid| extract_thread_id(uid), // TODO strip prefix
)
.collect();
for uid in uids {
sqlx::query_file!("sql/set_unread.sql", !unread, uid)
.execute(pool)
.await?;
}
Ok(true)
]
}
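
For completeness, the backfill in refresh above can also be driven outside the GraphQL mutation; a minimal sketch, where database_url and slurp_cache_path stand in for the real Config fields and the call uses the crate-internal path from this file:

// Hypothetical one-off driver: connect, build the cacher, and let refresh()
// slurp and strip every post whose search_summary is still NULL.
async fn backfill(database_url: &str, slurp_cache_path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let pool = sqlx::postgres::PgPool::connect(database_url).await?;
    let cacher = cacher::FilesystemCacher::new(slurp_cache_path)?;
    newsreader::refresh(&pool, &cacher).await?;
    Ok(())
}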

View File

@ -696,7 +696,6 @@ fn walk_attachments_inner<T, F: Fn(&ParsedMail, &[usize]) -> Option<T> + Copy>(
fn extract_attachments(m: &ParsedMail, id: &str) -> Result<Vec<Attachment>, ServerError> {
let mut attachments = Vec::new();
for (idx, sp) in m.subparts.iter().enumerate() {
info!("sp: {:?}", sp.headers);
if let Some(attachment) = extract_attachment(sp, id, &[idx]) {
// Filter out inline attachments, they're flattened into the body of the message.
if attachment.disposition == DispositionType::Attachment {