server: move tantivy code to separate mod
This commit is contained in:
parent
d1604f8e70
commit
3c8d7d4f81
@ -23,9 +23,9 @@ use server::{
|
|||||||
error::ServerError,
|
error::ServerError,
|
||||||
graphql::{Attachment, GraphqlSchema, Mutation, QueryRoot},
|
graphql::{Attachment, GraphqlSchema, Mutation, QueryRoot},
|
||||||
nm::{attachment_bytes, cid_attachment_bytes},
|
nm::{attachment_bytes, cid_attachment_bytes},
|
||||||
|
tantivy::TantivyConnection,
|
||||||
};
|
};
|
||||||
use sqlx::postgres::PgPool;
|
use sqlx::postgres::PgPool;
|
||||||
use tantivy::{Index, IndexWriter};
|
|
||||||
|
|
||||||
#[get("/refresh")]
|
#[get("/refresh")]
|
||||||
async fn refresh(nm: &State<Notmuch>) -> Result<Json<String>, Debug<NotmuchError>> {
|
async fn refresh(nm: &State<Notmuch>) -> Result<Json<String>, Debug<NotmuchError>> {
|
||||||
@ -166,124 +166,19 @@ fn graphiql() -> content::RawHtml<String> {
|
|||||||
content::RawHtml(GraphiQLSource::build().endpoint("/api/graphql").finish())
|
content::RawHtml(GraphiQLSource::build().endpoint("/api/graphql").finish())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[rocket::post("/create-news-db")]
|
|
||||||
fn create_news_db(config: &State<Config>) -> Result<String, Debug<ServerError>> {
|
|
||||||
create_news_db_impl(config)?;
|
|
||||||
Ok(format!(
|
|
||||||
"DB created in {}\n",
|
|
||||||
config.newsreader_tantivy_db_path
|
|
||||||
))
|
|
||||||
}
|
|
||||||
fn create_news_db_impl(config: &Config) -> Result<(), ServerError> {
|
|
||||||
std::fs::remove_dir_all(&config.newsreader_tantivy_db_path).map_err(ServerError::from)?;
|
|
||||||
std::fs::create_dir_all(&config.newsreader_tantivy_db_path).map_err(ServerError::from)?;
|
|
||||||
use tantivy::schema::*;
|
|
||||||
let mut schema_builder = Schema::builder();
|
|
||||||
schema_builder.add_text_field("site", STRING | STORED);
|
|
||||||
schema_builder.add_text_field("title", TEXT | STORED);
|
|
||||||
schema_builder.add_text_field("summary", TEXT);
|
|
||||||
schema_builder.add_text_field("link", STRING | STORED);
|
|
||||||
schema_builder.add_date_field("date", FAST);
|
|
||||||
schema_builder.add_bool_field("is_read", FAST);
|
|
||||||
schema_builder.add_text_field("uid", STRING | STORED);
|
|
||||||
schema_builder.add_i64_field("id", FAST);
|
|
||||||
|
|
||||||
let schema = schema_builder.build();
|
|
||||||
Index::create_in_dir(&config.newsreader_tantivy_db_path, schema).map_err(ServerError::from)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[rocket::post("/reindex-news-db")]
|
#[rocket::post("/reindex-news-db")]
|
||||||
async fn reindex_news_db(
|
async fn reindex_news_db(
|
||||||
pool: &State<PgPool>,
|
pool: &State<PgPool>,
|
||||||
config: &State<Config>,
|
tantivy_conn: &State<TantivyConnection>,
|
||||||
) -> Result<String, Debug<ServerError>> {
|
) -> Result<String, Debug<ServerError>> {
|
||||||
use tantivy::{doc, Term};
|
tantivy_conn.reindex(pool).await?;
|
||||||
|
Ok(format!("Reindexed tantivy\n"))
|
||||||
let start_time = std::time::Instant::now();
|
|
||||||
let pool: &PgPool = pool;
|
|
||||||
|
|
||||||
let index =
|
|
||||||
Index::open_in_dir(&config.newsreader_tantivy_db_path).map_err(ServerError::from)?;
|
|
||||||
let mut index_writer = index.writer(50_000_000).map_err(ServerError::from)?;
|
|
||||||
let schema = index.schema();
|
|
||||||
let site = schema.get_field("site").map_err(ServerError::from)?;
|
|
||||||
let title = schema.get_field("title").map_err(ServerError::from)?;
|
|
||||||
let summary = schema.get_field("summary").map_err(ServerError::from)?;
|
|
||||||
let link = schema.get_field("link").map_err(ServerError::from)?;
|
|
||||||
let date = schema.get_field("date").map_err(ServerError::from)?;
|
|
||||||
let is_read = schema.get_field("is_read").map_err(ServerError::from)?;
|
|
||||||
let uid = schema.get_field("uid").map_err(ServerError::from)?;
|
|
||||||
let id = schema.get_field("id").map_err(ServerError::from)?;
|
|
||||||
|
|
||||||
let rows = sqlx::query_file!("sql/all-posts.sql")
|
|
||||||
.fetch_all(pool)
|
|
||||||
.await
|
|
||||||
.map_err(ServerError::from)?;
|
|
||||||
|
|
||||||
let total = rows.len();
|
|
||||||
for (i, r) in rows.into_iter().enumerate() {
|
|
||||||
if i % 10_000 == 0 {
|
|
||||||
info!(
|
|
||||||
"{i}/{total} processed, elapsed {:.2}s",
|
|
||||||
start_time.elapsed().as_secs_f32()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
let id_term = Term::from_field_text(uid, &r.uid);
|
|
||||||
index_writer.delete_term(id_term);
|
|
||||||
index_writer
|
|
||||||
.add_document(doc!(
|
|
||||||
site => r.site.expect("UNKOWN_SITE"),
|
|
||||||
title => r.title.expect("UNKOWN_TITLE"),
|
|
||||||
// TODO: clean and extract text from HTML
|
|
||||||
summary => r.summary.expect("UNKNOWN_SUMMARY"),
|
|
||||||
link => r.link.expect("link"),
|
|
||||||
date => tantivy::DateTime::from_primitive(r.date.expect("date")),
|
|
||||||
is_read => r.is_read.expect("is_read"),
|
|
||||||
uid => r.uid,
|
|
||||||
id => r.id as i64,
|
|
||||||
))
|
|
||||||
.map_err(ServerError::from)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
index_writer.commit().map_err(ServerError::from)?;
|
|
||||||
|
|
||||||
info!("took {:.2}s to reindex", start_time.elapsed().as_secs_f32());
|
|
||||||
Ok(format!(
|
|
||||||
"DB openned in {}\n",
|
|
||||||
config.newsreader_tantivy_db_path
|
|
||||||
))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[rocket::get("/search-news-db")]
|
#[rocket::get("/search-news-db")]
|
||||||
fn search_news_db(
|
fn search_news_db(tantivy_conn: &State<TantivyConnection>) -> Result<String, Debug<ServerError>> {
|
||||||
index: &State<tantivy::Index>,
|
let res = tantivy_conn.search().map_err(ServerError::from)?;
|
||||||
reader: &State<tantivy::IndexReader>,
|
Ok(format!("{}", res))
|
||||||
) -> Result<String, Debug<ServerError>> {
|
|
||||||
use tantivy::{collector::TopDocs, query::QueryParser, Document, TantivyDocument};
|
|
||||||
|
|
||||||
let searcher = reader.searcher();
|
|
||||||
let schema = index.schema();
|
|
||||||
let site = schema.get_field("site").map_err(ServerError::from)?;
|
|
||||||
let title = schema.get_field("title").map_err(ServerError::from)?;
|
|
||||||
let summary = schema.get_field("summary").map_err(ServerError::from)?;
|
|
||||||
let query_parser = QueryParser::for_index(&index, vec![site, title, summary]);
|
|
||||||
|
|
||||||
let query = query_parser
|
|
||||||
.parse_query("grapheme")
|
|
||||||
.map_err(ServerError::from)?;
|
|
||||||
let top_docs = searcher
|
|
||||||
.search(&query, &TopDocs::with_limit(10))
|
|
||||||
.map_err(ServerError::from)?;
|
|
||||||
let mut results = vec![];
|
|
||||||
info!("search found {} docs", top_docs.len());
|
|
||||||
for (_score, doc_address) in top_docs {
|
|
||||||
let retrieved_doc: TantivyDocument =
|
|
||||||
searcher.doc(doc_address).map_err(ServerError::from)?;
|
|
||||||
results.push(format!("{}", retrieved_doc.to_json(&schema)));
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(format!("{}", results.join(" ")))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[rocket::get("/graphql?<query..>")]
|
#[rocket::get("/graphql?<query..>")]
|
||||||
@ -328,7 +223,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
|||||||
.mount(
|
.mount(
|
||||||
shared::urls::MOUNT_POINT,
|
shared::urls::MOUNT_POINT,
|
||||||
routes![
|
routes![
|
||||||
create_news_db,
|
|
||||||
reindex_news_db,
|
reindex_news_db,
|
||||||
search_news_db,
|
search_news_db,
|
||||||
original,
|
original,
|
||||||
@ -352,14 +246,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
|||||||
std::fs::create_dir_all(&config.slurp_cache_path)?;
|
std::fs::create_dir_all(&config.slurp_cache_path)?;
|
||||||
}
|
}
|
||||||
let pool = PgPool::connect(&config.newsreader_database_url).await?;
|
let pool = PgPool::connect(&config.newsreader_database_url).await?;
|
||||||
let tantivy_newsreader_index = match Index::open_in_dir(&config.newsreader_tantivy_db_path) {
|
let tantivy_conn =
|
||||||
Ok(idx) => idx,
|
TantivyConnection::new(&config.newsreader_tantivy_db_path)?;
|
||||||
Err(_) => {
|
|
||||||
create_news_db_impl(&config)?;
|
|
||||||
Index::open_in_dir(&config.newsreader_tantivy_db_path)?
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let tantivy_newsreader_reader = tantivy_newsreader_index.reader()?;
|
|
||||||
let schema = Schema::build(QueryRoot, Mutation, EmptySubscription)
|
let schema = Schema::build(QueryRoot, Mutation, EmptySubscription)
|
||||||
.data(Notmuch::default())
|
.data(Notmuch::default())
|
||||||
.data(config)
|
.data(config)
|
||||||
@ -371,8 +260,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
|||||||
.manage(schema)
|
.manage(schema)
|
||||||
.manage(pool)
|
.manage(pool)
|
||||||
.manage(Notmuch::default())
|
.manage(Notmuch::default())
|
||||||
.manage(tantivy_newsreader_index)
|
.manage(tantivy_conn);
|
||||||
.manage(tantivy_newsreader_reader);
|
|
||||||
//.manage(Notmuch::with_config("../notmuch/testdata/notmuch.config"))
|
//.manage(Notmuch::with_config("../notmuch/testdata/notmuch.config"))
|
||||||
|
|
||||||
rkt.launch().await?;
|
rkt.launch().await?;
|
||||||
|
|||||||
@ -3,6 +3,7 @@ pub mod error;
|
|||||||
pub mod graphql;
|
pub mod graphql;
|
||||||
pub mod newsreader;
|
pub mod newsreader;
|
||||||
pub mod nm;
|
pub mod nm;
|
||||||
|
pub mod tantivy;
|
||||||
|
|
||||||
use std::{collections::HashMap, convert::Infallible, str::FromStr, sync::Arc};
|
use std::{collections::HashMap, convert::Infallible, str::FromStr, sync::Arc};
|
||||||
|
|
||||||
|
|||||||
111
server/src/tantivy.rs
Normal file
111
server/src/tantivy.rs
Normal file
@ -0,0 +1,111 @@
|
|||||||
|
use log::info;
|
||||||
|
use sqlx::postgres::PgPool;
|
||||||
|
use tantivy::{Index, IndexWriter, TantivyError};
|
||||||
|
|
||||||
|
use crate::error::ServerError;
|
||||||
|
|
||||||
|
pub struct TantivyConnection {
|
||||||
|
index: Index,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TantivyConnection {
|
||||||
|
pub fn new(tantivy_db_path: &str) -> Result<TantivyConnection, TantivyError> {
|
||||||
|
let index = match Index::open_in_dir(tantivy_db_path) {
|
||||||
|
Ok(idx) => idx,
|
||||||
|
Err(_) => {
|
||||||
|
create_news_db(tantivy_db_path)?;
|
||||||
|
Index::open_in_dir(tantivy_db_path)?
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Ok(TantivyConnection { index })
|
||||||
|
}
|
||||||
|
pub async fn reindex(&self, pool: &PgPool) -> Result<(), ServerError> {
|
||||||
|
use tantivy::{doc, Term};
|
||||||
|
|
||||||
|
let start_time = std::time::Instant::now();
|
||||||
|
let pool: &PgPool = pool;
|
||||||
|
|
||||||
|
let mut index_writer = self.index.writer(50_000_000)?;
|
||||||
|
let schema = self.index.schema();
|
||||||
|
let site = schema.get_field("site")?;
|
||||||
|
let title = schema.get_field("title")?;
|
||||||
|
let summary = schema.get_field("summary")?;
|
||||||
|
let link = schema.get_field("link")?;
|
||||||
|
let date = schema.get_field("date")?;
|
||||||
|
let is_read = schema.get_field("is_read")?;
|
||||||
|
let uid = schema.get_field("uid")?;
|
||||||
|
let id = schema.get_field("id")?;
|
||||||
|
|
||||||
|
let rows = sqlx::query_file!("sql/all-posts.sql")
|
||||||
|
.fetch_all(pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let total = rows.len();
|
||||||
|
for (i, r) in rows.into_iter().enumerate() {
|
||||||
|
if i % 10_000 == 0 {
|
||||||
|
info!(
|
||||||
|
"{i}/{total} processed, elapsed {:.2}s",
|
||||||
|
start_time.elapsed().as_secs_f32()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
let id_term = Term::from_field_text(uid, &r.uid);
|
||||||
|
index_writer.delete_term(id_term);
|
||||||
|
index_writer.add_document(doc!(
|
||||||
|
site => r.site.expect("UNKOWN_SITE"),
|
||||||
|
title => r.title.expect("UNKOWN_TITLE"),
|
||||||
|
// TODO: clean and extract text from HTML
|
||||||
|
summary => r.summary.expect("UNKNOWN_SUMMARY"),
|
||||||
|
link => r.link.expect("link"),
|
||||||
|
date => tantivy::DateTime::from_primitive(r.date.expect("date")),
|
||||||
|
is_read => r.is_read.expect("is_read"),
|
||||||
|
uid => r.uid,
|
||||||
|
id => r.id as i64,
|
||||||
|
))?;
|
||||||
|
}
|
||||||
|
|
||||||
|
index_writer.commit()?;
|
||||||
|
|
||||||
|
info!("took {:.2}s to reindex", start_time.elapsed().as_secs_f32());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
pub fn search(&self) -> Result<String, TantivyError> {
|
||||||
|
use tantivy::{collector::TopDocs, query::QueryParser, Document, TantivyDocument};
|
||||||
|
|
||||||
|
let reader = self.index.reader()?;
|
||||||
|
let schema = self.index.schema();
|
||||||
|
let searcher = reader.searcher();
|
||||||
|
let site = schema.get_field("site")?;
|
||||||
|
let title = schema.get_field("title")?;
|
||||||
|
let summary = schema.get_field("summary")?;
|
||||||
|
let query_parser = QueryParser::for_index(&self.index, vec![site, title, summary]);
|
||||||
|
|
||||||
|
let query = query_parser.parse_query("grapheme")?;
|
||||||
|
let top_docs = searcher.search(&query, &TopDocs::with_limit(10))?;
|
||||||
|
let mut results = vec![];
|
||||||
|
info!("search found {} docs", top_docs.len());
|
||||||
|
for (_score, doc_address) in top_docs {
|
||||||
|
let retrieved_doc: TantivyDocument = searcher.doc(doc_address)?;
|
||||||
|
results.push(format!("{}", retrieved_doc.to_json(&schema)));
|
||||||
|
}
|
||||||
|
Ok(results.join(" "))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_news_db(tantivy_db_path: &str) -> Result<(), TantivyError> {
|
||||||
|
std::fs::remove_dir_all(tantivy_db_path)?;
|
||||||
|
std::fs::create_dir_all(tantivy_db_path)?;
|
||||||
|
use tantivy::schema::*;
|
||||||
|
let mut schema_builder = Schema::builder();
|
||||||
|
schema_builder.add_text_field("site", STRING | STORED);
|
||||||
|
schema_builder.add_text_field("title", TEXT | STORED);
|
||||||
|
schema_builder.add_text_field("summary", TEXT);
|
||||||
|
schema_builder.add_text_field("link", STRING | STORED);
|
||||||
|
schema_builder.add_date_field("date", FAST);
|
||||||
|
schema_builder.add_bool_field("is_read", FAST);
|
||||||
|
schema_builder.add_text_field("uid", STRING | STORED);
|
||||||
|
schema_builder.add_i64_field("id", FAST);
|
||||||
|
|
||||||
|
let schema = schema_builder.build();
|
||||||
|
Index::create_in_dir(tantivy_db_path, schema)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user