diff --git a/Cargo.lock b/Cargo.lock
index 3ce8f19..71ee0e5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2949,6 +2949,7 @@ dependencies = [
  "chrono",
  "clap",
  "css-inline",
+ "futures 0.3.31",
  "html-escape",
  "linkify",
  "log",
diff --git a/server/Cargo.toml b/server/Cargo.toml
index 9f0a090..4ad24ba 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -17,6 +17,7 @@ cacher = { version = "0.1.0", registry = "xinu" }
 chrono = "0.4.39"
 clap = { version = "4.5.23", features = ["derive"] }
 css-inline = "0.13.0"
+futures = "0.3.31"
 html-escape = "0.2.13"
 linkify = "0.10.0"
 log = "0.4.17"
diff --git a/server/sql/need-search-summary.sql b/server/sql/need-search-summary.sql
index 6ae95eb..9dad53d 100644
--- a/server/sql/need-search-summary.sql
+++ b/server/sql/need-search-summary.sql
@@ -3,6 +3,7 @@ SELECT
     link,
     clean_summary
 FROM
+-- Remove tablesample when db sufficiently indexed
 post AS p TABLESAMPLE SYSTEM (.1)
 INNER JOIN feed AS f ON p.site = f.slug -- necessary to weed out nzb posts
 WHERE search_summary IS NULL;
diff --git a/server/src/newsreader.rs b/server/src/newsreader.rs
index fcd3be0..692d4b6 100644
--- a/server/src/newsreader.rs
+++ b/server/src/newsreader.rs
@@ -1,6 +1,7 @@
 use std::collections::HashMap;
 
 use cacher::FilesystemCacher;
+use futures::{stream::FuturesUnordered, StreamExt};
 use log::info;
 use maplit::hashmap;
 use scraper::Selector;
@@ -254,23 +255,25 @@ pub async fn set_read_status<'ctx>(
 }
 
 #[instrument(name = "newsreader::refresh", skip_all)]
 pub async fn refresh<'ctx>(pool: &PgPool, cacher: &FilesystemCacher) -> Result {
-    let body_transformers: Vec> = vec![
-        Box::new(SlurpContents {
-            cacher,
-            inline_css: true,
-            site_selectors: slurp_contents_selectors(),
-        }),
-        Box::new(StripHtml),
-    ];
+    async fn update_search_summary(
+        pool: &PgPool,
+        cacher: &FilesystemCacher,
+        link: Url,
+        body: String,
+        id: i32,
+    ) -> Result<(), ServerError> {
+        let body_transformers: Vec> = vec![
+            Box::new(SlurpContents {
+                cacher,
+                inline_css: true,
+                site_selectors: slurp_contents_selectors(),
+            }),
+            Box::new(StripHtml),
+        ];
-    let rows = sqlx::query_file!("sql/need-search-summary.sql",)
-        .fetch_all(pool)
-        .await?;
-    for r in rows {
-        let link = Url::parse(&r.link)?;
         info!("adding {link} to search index");
+        let mut body = body;
         let link = Some(link);
-        let mut body = r.clean_summary.unwrap_or("NO SUMMARY".to_string());
         for t in body_transformers.iter() {
             if t.should_run(&link, &body) {
                 body = t.transform(&link, &body).await?;
@@ -279,10 +282,35 @@ pub async fn refresh<'ctx>(pool: &PgPool, cacher: &FilesystemCacher) -> Result
+    let mut unordered: FuturesUnordered<_> = sqlx::query_file!("sql/need-search-summary.sql",)
+        .fetch_all(pool)
+        .await?
+        .into_iter()
+        .map(|r| {
+            let link = Url::parse(&r.link).expect("failed to parse link");
+            let body = r.clean_summary.unwrap_or("NO SUMMARY".to_string());
+            let id = r.id;
+            update_search_summary(pool, cacher, link, body, id)
+        })
+        .collect();
+
+    while let Some(res) = unordered.next().await {
+        match res {
+            Ok(()) => {}
+            Err(err) => {
+                info!("failed refresh {err:?}");
+                // TODO:
+                //fd.error = Some(err);
+            }
+        };
     }
     Ok(true)
 }
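
Review note on the concurrency pattern above: the new `refresh()` builds one `update_search_summary()` future per row returned by the query and drains them through a `FuturesUnordered`, so results arrive in completion order rather than query order. Below is a minimal standalone sketch of that pattern, assuming only the `futures` crate added in this PR; the `work()` helper, the integer ids, and the `String` error type are placeholders for illustration, not code from this repo.

```rust
use futures::{executor::block_on, stream::FuturesUnordered, StreamExt};

// Placeholder for the per-row work; the diff's refresh() instead collects one
// update_search_summary(pool, cacher, link, body, id) future per row.
async fn work(id: i32) -> Result<i32, String> {
    Ok(id * 2)
}

fn main() {
    block_on(async {
        // Build one future per item up front; FuturesUnordered polls them all
        // and yields each result as soon as it completes, in any order.
        let mut unordered: FuturesUnordered<_> = (0..10).map(work).collect();

        // Drain the stream, logging failures without aborting the remaining
        // work, mirroring the match on `res` in the diff.
        while let Some(res) = unordered.next().await {
            match res {
                Ok(n) => println!("done: {n}"),
                Err(err) => eprintln!("failed: {err:?}"),
            }
        }
    });
}
```

One property worth keeping in mind: `FuturesUnordered` only makes progress while it is being polled, so all the per-row futures run concurrently inside the single task that awaits `next()`, which keeps the shared borrows of `pool` and `cacher` straightforward.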