server: fetch search summaries in parallel

Bill Thiede 2025-01-29 15:43:46 -08:00
parent d9d58afed9
commit ab47f32b52
4 changed files with 46 additions and 15 deletions
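
The parallelism comes from futures' FuturesUnordered: each pending row is mapped to a future up front, the futures are collected into one set, and the set is drained in completion order. Below is a minimal, self-contained sketch of that pattern; the fetch_one() helper, the 1..=10 range, and the block_on executor are illustrative stand-ins, not the server's actual code.

    use futures::{stream::FuturesUnordered, StreamExt};

    // Hypothetical per-item work; the real code runs transformers and an UPDATE.
    async fn fetch_one(id: u32) -> Result<String, String> {
        Ok(format!("summary for post {id}"))
    }

    fn main() {
        futures::executor::block_on(async {
            // Map every item to a future, then collect them into FuturesUnordered
            // so they all make progress concurrently instead of one at a time.
            let mut pending: FuturesUnordered<_> = (1u32..=10).map(fetch_one).collect();

            // Drain results as they finish, in whatever order they complete,
            // logging failures without aborting the remaining work.
            while let Some(res) = pending.next().await {
                match res {
                    Ok(summary) => println!("{summary}"),
                    Err(err) => eprintln!("failed: {err:?}"),
                }
            }
        });
    }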

Cargo.lock (generated)

@@ -2949,6 +2949,7 @@ dependencies = [
 "chrono",
 "clap",
 "css-inline",
+"futures 0.3.31",
 "html-escape",
 "linkify",
 "log",

Cargo.toml

@@ -17,6 +17,7 @@ cacher = { version = "0.1.0", registry = "xinu" }
 chrono = "0.4.39"
 clap = { version = "4.5.23", features = ["derive"] }
 css-inline = "0.13.0"
+futures = "0.3.31"
 html-escape = "0.2.13"
 linkify = "0.10.0"
 log = "0.4.17"

sql/need-search-summary.sql

@@ -3,6 +3,7 @@ SELECT
     link,
     clean_summary
 FROM
+    -- Remove tablesample when db sufficiently indexed
     post AS p TABLESAMPLE SYSTEM (.1)
     INNER JOIN feed AS f ON p.site = f.slug -- necessary to weed out nzb posts
 WHERE search_summary IS NULL;
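
For reference, the columns visible here plus the r.id / r.link / r.clean_summary accesses in the Rust diff below imply each fetched row carries roughly the following shape. This is an illustrative struct only; sqlx::query_file! actually generates an anonymous record type.

    // Illustrative row shape for sql/need-search-summary.sql, inferred from the
    // fields the Rust code reads; not the actual sqlx-generated type.
    struct NeedSearchSummaryRow {
        id: i32,                       // post primary key, used in the UPDATE
        link: String,                  // parsed with Url::parse
        clean_summary: Option<String>, // nullable, hence unwrap_or("NO SUMMARY")
    }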

(server Rust source, newsreader module)

@@ -1,6 +1,7 @@
 use std::collections::HashMap;
 use cacher::FilesystemCacher;
+use futures::{stream::FuturesUnordered, StreamExt};
 use log::info;
 use maplit::hashmap;
 use scraper::Selector;
@@ -254,6 +255,13 @@ pub async fn set_read_status<'ctx>(
 }
 #[instrument(name = "newsreader::refresh", skip_all)]
 pub async fn refresh<'ctx>(pool: &PgPool, cacher: &FilesystemCacher) -> Result<bool, ServerError> {
+    async fn update_search_summary(
+        pool: &PgPool,
+        cacher: &FilesystemCacher,
+        link: Url,
+        body: String,
+        id: i32,
+    ) -> Result<(), ServerError> {
     let body_transformers: Vec<Box<dyn Transformer>> = vec![
         Box::new(SlurpContents {
             cacher,
@@ -263,14 +271,9 @@ pub async fn refresh<'ctx>(pool: &PgPool, cacher: &FilesystemCacher) -> Result<b
         Box::new(StripHtml),
     ];
-    let rows = sqlx::query_file!("sql/need-search-summary.sql",)
-        .fetch_all(pool)
-        .await?;
-    for r in rows {
-        let link = Url::parse(&r.link)?;
         info!("adding {link} to search index");
+        let mut body = body;
         let link = Some(link);
-        let mut body = r.clean_summary.unwrap_or("NO SUMMARY".to_string());
         for t in body_transformers.iter() {
             if t.should_run(&link, &body) {
                 body = t.transform(&link, &body).await?;
@@ -279,10 +282,35 @@ pub async fn refresh<'ctx>(pool: &PgPool, cacher: &FilesystemCacher) -> Result<b
         sqlx::query!(
             "UPDATE post SET search_summary = $1 WHERE id = $2",
             body,
-            r.id
+            id
         )
         .execute(pool)
         .await?;
+        Ok(())
+    }
+    let mut unordered: FuturesUnordered<_> = sqlx::query_file!("sql/need-search-summary.sql",)
+        .fetch_all(pool)
+        .await?
+        .into_iter()
+        .map(|r| {
+            let link = Url::parse(&r.link).expect("failed to parse link");
+            let body = r.clean_summary.unwrap_or("NO SUMMARY".to_string());
+            let id = r.id;
+            update_search_summary(pool, cacher, link, body, id)
+        })
+        .collect();
+    while let Some(res) = unordered.next().await {
+        //let res = res;
+        match res {
+            Ok(()) => {}
+            Err(err) => {
+                info!("failed refresh {err:?}");
+                // TODO:
+                //fd.error = Some(err);
+            }
+        };
     }
     Ok(true)
 }
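
One property of the new loop worth noting: every row returned by need-search-summary.sql becomes an in-flight future immediately, so a large backlog means that many concurrent fetches and UPDATEs at once. If that ever needs throttling, the same futures crate offers buffer_unordered for a bounded variant. A hedged, self-contained sketch follows; the 8-task limit and the update_one() helper are illustrative and not part of this commit.

    use futures::{stream, StreamExt};

    // Hypothetical stand-in for update_search_summary().
    async fn update_one(id: u32) -> Result<(), String> {
        println!("updated post {id}");
        Ok(())
    }

    fn main() {
        futures::executor::block_on(async {
            // At most 8 updates run concurrently, instead of the whole backlog.
            let mut results = stream::iter(1u32..=100)
                .map(update_one)
                .buffer_unordered(8);

            while let Some(res) = results.next().await {
                if let Err(err) = res {
                    eprintln!("failed refresh {err:?}");
                }
            }
        });
    }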