Compare commits

..

No commits in common. "b8ef753f85d7ab43e8d260179d915b931e6156a5" and "b11f6b51495f330e2bbe40be7ec52dac42b6faf1" have entirely different histories.

22 changed files with 923 additions and 784 deletions

724
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -8,7 +8,7 @@ authors = ["Bill Thiede <git@xinu.tv>"]
edition = "2021"
license = "UNLICENSED"
publish = ["xinu"]
version = "0.15.0"
version = "0.12.1"
repository = "https://git.z.xinu.tv/wathiede/letterbox"
[profile.dev]

View File

@ -470,7 +470,7 @@ pub enum NotmuchError {
MailParseError(#[from] mailparse::MailParseError),
}
#[derive(Clone, Default)]
#[derive(Default)]
pub struct Notmuch {
config_path: Option<PathBuf>,
}

View File

@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "\nSELECT id\nFROM feed\nWHERE slug = $1\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Int4"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
false
]
},
"hash": "dabd12987369cb273c0191d46645c376439d246d5a697340574c6afdac93d2cc"
}

View File

@ -0,0 +1,24 @@
{
"db_name": "PostgreSQL",
"query": "\nINSERT INTO feed ( name, slug, url, homepage, selector )\nVALUES ( $1, $2, $3, '', '' )\nRETURNING id\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Int4"
}
],
"parameters": {
"Left": [
"Text",
"Text",
"Text"
]
},
"nullable": [
false
]
},
"hash": "e2a448aaf4fe92fc1deda10bf844f6b9225d35758cba7c9f337c1a730aee41bd"
}

View File

@ -15,19 +15,15 @@ version.workspace = true
ammonia = "4.0.0"
anyhow = "1.0.79"
async-graphql = { version = "7", features = ["log"] }
async-graphql-axum = "7.0.15"
async-graphql-rocket = "7"
async-trait = "0.1.81"
axum = { version = "0.8.3", features = ["ws"] }
axum-macros = "0.5.0"
build-info = "0.0.40"
cacher = { version = "0.2.0", registry = "xinu" }
chrono = "0.4.39"
clap = { version = "4.5.23", features = ["derive"] }
css-inline = "0.14.0"
futures = "0.3.31"
headers = "0.4.0"
html-escape = "0.2.13"
letterbox-notmuch = { version = "0.15.0", path = "../notmuch", registry = "xinu" }
letterbox-shared = { version = "0.15.0", path = "../shared", registry = "xinu" }
linkify = "0.10.0"
log = "0.4.17"
lol_html = "2.0.0"
@ -36,6 +32,8 @@ maplit = "1.0.2"
memmap = "0.7.0"
regex = "1.11.1"
reqwest = { version = "0.12.7", features = ["blocking"] }
rocket = { version = "0.5.0-rc.2", features = ["json"] }
rocket_cors = "0.6.0"
scraper = "0.23.0"
serde = { version = "1.0.147", features = ["derive"] }
serde_json = "1.0.87"
@ -43,13 +41,14 @@ sqlx = { version = "0.8.2", features = ["postgres", "runtime-tokio", "time"] }
tantivy = { version = "0.24.0", optional = true }
thiserror = "2.0.0"
tokio = "1.26.0"
tower-http = { version = "0.6.2", features = ["trace"] }
tracing = "0.1.41"
url = "2.5.2"
urlencoding = "2.1.3"
#xtracing = { git = "http://git-private.h.xinu.tv/wathiede/xtracing.git" }
#xtracing = { path = "../../xtracing" }
#xtracing = { git = "http://git-private.h.xinu.tv/wathiede/xtracing.git" }
xtracing = { version = "0.3.0", registry = "xinu" }
letterbox-notmuch = { version = "0.12.1", path = "../notmuch", registry = "xinu" }
letterbox-shared = { version = "0.12.1", path = "../shared", registry = "xinu" }
[build-dependencies]
build-info-build = "0.0.40"

View File

@ -5,6 +5,7 @@ newsreader_database_url = "postgres://newsreader@nixos-07.h.xinu.tv/newsreader"
newsreader_tantivy_db_path = "../target/database/newsreader"
[debug]
address = "0.0.0.0"
port = 9345
# Uncomment to make it production like.
#log_level = "critical"

View File

@ -0,0 +1,22 @@
use clap::Parser;
use letterbox_server::mail::read_mail_to_db;
use sqlx::postgres::PgPool;
/// Add certain emails as posts in newsfeed app.
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// DB URL, something like postgres://newsreader@nixos-07.h.xinu.tv/newsreader
#[arg(short, long)]
db_url: String,
/// path to parse
path: String,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let _guard = xtracing::init(env!("CARGO_BIN_NAME"))?;
let args = Args::parse();
let pool = PgPool::connect(&args.db_url).await?;
read_mail_to_db(&pool, &args.path).await?;
Ok(())
}

View File

@ -1,100 +1,114 @@
// Rocket generates a lot of warnings for handlers
// TODO: figure out why
#![allow(unreachable_patterns)]
use std::{error::Error, net::SocketAddr, sync::Arc, time::Duration};
#[macro_use]
extern crate rocket;
use std::{error::Error, io::Cursor, str::FromStr};
use async_graphql::{extensions, http::GraphiQLSource, Schema};
use async_graphql_axum::{GraphQL, GraphQLSubscription};
//allows to extract the IP of connecting user
use axum::extract::connect_info::ConnectInfo;
use axum::{
extract::{self, ws::WebSocketUpgrade, State},
http::{header, StatusCode},
response::{self, IntoResponse, Response},
routing::{any, get},
Router,
};
use async_graphql::{extensions, http::GraphiQLSource, EmptySubscription, Schema};
use async_graphql_rocket::{GraphQLQuery, GraphQLRequest, GraphQLResponse};
use cacher::FilesystemCacher;
use letterbox_notmuch::Notmuch;
use letterbox_notmuch::{Notmuch, NotmuchError, ThreadSet};
#[cfg(feature = "tantivy")]
use letterbox_server::tantivy::TantivyConnection;
use letterbox_server::{
config::Config,
graphql::{compute_catchup_ids, Attachment, MutationRoot, QueryRoot, SubscriptionRoot},
error::ServerError,
graphql::{Attachment, GraphqlSchema, Mutation, QueryRoot},
nm::{attachment_bytes, cid_attachment_bytes},
ws::ConnectionTracker,
};
use letterbox_shared::WebsocketMessage;
use rocket::{
fairing::AdHoc,
http::{ContentType, Header},
request::Request,
response::{content, Debug, Responder},
serde::json::Json,
Response, State,
};
use rocket_cors::{AllowedHeaders, AllowedOrigins};
use sqlx::postgres::PgPool;
use tokio::{net::TcpListener, sync::Mutex};
use tower_http::trace::{DefaultMakeSpan, TraceLayer};
use tracing::info;
// Make our own error that wraps `anyhow::Error`.
struct AppError(letterbox_server::ServerError);
#[get("/show/<query>/pretty")]
async fn show_pretty(
nm: &State<Notmuch>,
query: &str,
) -> Result<Json<ThreadSet>, Debug<ServerError>> {
let query = urlencoding::decode(query).map_err(|e| ServerError::from(NotmuchError::from(e)))?;
let res = nm.show(&query).map_err(ServerError::from)?;
Ok(Json(res))
}
// Tell axum how to convert `AppError` into a response.
impl IntoResponse for AppError {
fn into_response(self) -> Response {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Something went wrong: {}", self.0),
)
.into_response()
#[get("/show/<query>")]
async fn show(nm: &State<Notmuch>, query: &str) -> Result<Json<ThreadSet>, Debug<NotmuchError>> {
let query = urlencoding::decode(query).map_err(NotmuchError::from)?;
let res = nm.show(&query)?;
Ok(Json(res))
}
struct InlineAttachmentResponder(Attachment);
impl<'r, 'o: 'r> Responder<'r, 'o> for InlineAttachmentResponder {
fn respond_to(self, _: &'r Request<'_>) -> rocket::response::Result<'o> {
let mut resp = Response::build();
if let Some(filename) = self.0.filename {
resp.header(Header::new(
"Content-Disposition",
format!(r#"inline; filename="{}""#, filename),
));
}
if let Some(content_type) = self.0.content_type {
if let Some(ct) = ContentType::parse_flexible(&content_type) {
resp.header(ct);
}
}
// This enables using `?` on functions that return `Result<_, letterbox_server::Error>` to turn them into
// `Result<_, AppError>`. That way you don't need to do that manually.
impl<E> From<E> for AppError
where
E: Into<letterbox_server::ServerError>,
{
fn from(err: E) -> Self {
Self(err.into())
resp.sized_body(self.0.bytes.len(), Cursor::new(self.0.bytes))
.ok()
}
}
fn inline_attachment_response(attachment: Attachment) -> impl IntoResponse {
info!("attachment filename {:?}", attachment.filename);
let mut hdr_map = headers::HeaderMap::new();
if let Some(filename) = attachment.filename {
hdr_map.insert(
header::CONTENT_DISPOSITION,
format!(r#"inline; filename="{}""#, filename)
.parse()
.unwrap(),
);
struct DownloadAttachmentResponder(Attachment);
impl<'r, 'o: 'r> Responder<'r, 'o> for DownloadAttachmentResponder {
fn respond_to(self, _: &'r Request<'_>) -> rocket::response::Result<'o> {
let mut resp = Response::build();
if let Some(filename) = self.0.filename {
resp.header(Header::new(
"Content-Disposition",
format!(r#"attachment; filename="{}""#, filename),
));
}
if let Some(ct) = attachment.content_type {
hdr_map.insert(header::CONTENT_TYPE, ct.parse().unwrap());
if let Some(content_type) = self.0.content_type {
if let Some(ct) = ContentType::parse_flexible(&content_type) {
resp.header(ct);
}
}
resp.sized_body(self.0.bytes.len(), Cursor::new(self.0.bytes))
.ok()
}
info!("hdr_map {hdr_map:?}");
(hdr_map, attachment.bytes).into_response()
}
fn download_attachment_response(attachment: Attachment) -> impl IntoResponse {
info!("attachment filename {:?}", attachment.filename);
let mut hdr_map = headers::HeaderMap::new();
if let Some(filename) = attachment.filename {
hdr_map.insert(
header::CONTENT_DISPOSITION,
format!(r#"attachment; filename="{}""#, filename)
.parse()
.unwrap(),
);
}
if let Some(ct) = attachment.content_type {
hdr_map.insert(header::CONTENT_TYPE, ct.parse().unwrap());
}
info!("hdr_map {hdr_map:?}");
(hdr_map, attachment.bytes).into_response()
#[get("/cid/<id>/<cid>")]
async fn view_cid(
nm: &State<Notmuch>,
id: &str,
cid: &str,
) -> Result<InlineAttachmentResponder, Debug<ServerError>> {
let mid = if id.starts_with("id:") {
id.to_string()
} else {
format!("id:{}", id)
};
info!("view cid attachment {mid} {cid}");
let attachment = cid_attachment_bytes(nm, &mid, &cid)?;
Ok(InlineAttachmentResponder(attachment))
}
#[axum_macros::debug_handler]
#[get("/view/attachment/<id>/<idx>/<_>")]
async fn view_attachment(
State(AppState { nm, .. }): State<AppState>,
extract::Path((id, idx, _)): extract::Path<(String, String, String)>,
) -> Result<impl IntoResponse, AppError> {
nm: &State<Notmuch>,
id: &str,
idx: &str,
) -> Result<InlineAttachmentResponder, Debug<ServerError>> {
let mid = if id.starts_with("id:") {
id.to_string()
} else {
@ -105,14 +119,16 @@ async fn view_attachment(
.split('.')
.map(|s| s.parse().expect("not a usize"))
.collect();
let attachment = attachment_bytes(&nm, &mid, &idx)?;
Ok(inline_attachment_response(attachment))
let attachment = attachment_bytes(nm, &mid, &idx)?;
Ok(InlineAttachmentResponder(attachment))
}
#[get("/download/attachment/<id>/<idx>/<_>")]
async fn download_attachment(
State(AppState { nm, .. }): State<AppState>,
extract::Path((id, idx, _)): extract::Path<(String, String, String)>,
) -> Result<impl IntoResponse, AppError> {
nm: &State<Notmuch>,
id: &str,
idx: &str,
) -> Result<DownloadAttachmentResponder, Debug<ServerError>> {
let mid = if id.starts_with("id:") {
id.to_string()
} else {
@ -123,154 +139,102 @@ async fn download_attachment(
.split('.')
.map(|s| s.parse().expect("not a usize"))
.collect();
let attachment = attachment_bytes(&nm, &mid, &idx)?;
Ok(download_attachment_response(attachment))
let attachment = attachment_bytes(nm, &mid, &idx)?;
Ok(DownloadAttachmentResponder(attachment))
}
async fn view_cid(
State(AppState { nm, .. }): State<AppState>,
extract::Path((id, cid)): extract::Path<(String, String)>,
) -> Result<impl IntoResponse, AppError> {
#[get("/original/<id>")]
async fn original(
nm: &State<Notmuch>,
id: &str,
) -> Result<(ContentType, Vec<u8>), Debug<NotmuchError>> {
let mid = if id.starts_with("id:") {
id.to_string()
} else {
format!("id:{}", id)
};
info!("view cid attachment {mid} {cid}");
let attachment = cid_attachment_bytes(&nm, &mid, &cid)?;
Ok(inline_attachment_response(attachment))
let res = nm.show_original(&mid)?;
Ok((ContentType::Plain, res))
}
async fn graphiql() -> impl IntoResponse {
response::Html(
GraphiQLSource::build()
.endpoint("/api/graphql/")
.subscription_endpoint("/api/graphql/ws")
.finish(),
)
#[rocket::get("/")]
fn graphiql() -> content::RawHtml<String> {
content::RawHtml(GraphiQLSource::build().endpoint("/api/graphql").finish())
}
async fn start_ws(
ws: WebSocketUpgrade,
ConnectInfo(addr): ConnectInfo<SocketAddr>,
State(AppState {
connection_tracker, ..
}): State<AppState>,
) -> impl IntoResponse {
ws.on_upgrade(async move |socket| connection_tracker.lock().await.add_peer(socket, addr).await)
}
#[axum_macros::debug_handler]
async fn test_handler(
State(AppState {
connection_tracker, ..
}): State<AppState>,
) -> impl IntoResponse {
connection_tracker
.lock()
.await
.send_message_all(WebsocketMessage::RefreshMessages)
.await;
"test triggered"
#[rocket::get("/graphql?<query..>")]
async fn graphql_query(schema: &State<GraphqlSchema>, query: GraphQLQuery) -> GraphQLResponse {
query.execute(schema.inner()).await
}
async fn watch_new(
nm: Notmuch,
pool: PgPool,
conn_tracker: Arc<Mutex<ConnectionTracker>>,
poll_time: Duration,
) -> Result<(), async_graphql::Error> {
let mut old_ids = Vec::new();
loop {
let ids = compute_catchup_ids(&nm, &pool, "is:unread").await?;
if old_ids != ids {
info!("old_ids: {old_ids:?}\n ids: {ids:?}");
conn_tracker
.lock()
.await
.send_message_all(WebsocketMessage::RefreshMessages)
.await
}
old_ids = ids;
tokio::time::sleep(poll_time).await;
}
#[rocket::post("/graphql", data = "<request>", format = "application/json")]
async fn graphql_request(
schema: &State<GraphqlSchema>,
request: GraphQLRequest,
) -> GraphQLResponse {
request.execute(schema.inner()).await
}
#[derive(Clone)]
struct AppState {
nm: Notmuch,
connection_tracker: Arc<Mutex<ConnectionTracker>>,
}
#[tokio::main]
#[rocket::main]
async fn main() -> Result<(), Box<dyn Error>> {
let _guard = xtracing::init(env!("CARGO_BIN_NAME"))?;
build_info::build_info!(fn bi);
info!("Build Info: {}", letterbox_shared::build_version(bi));
// TODO: move these to config
let port = 9345;
let config = Config {
newsreader_database_url: "postgres://newsreader@nixos-07.h.xinu.tv/newsreader".to_string(),
newsreader_tantivy_db_path: "../target/database/newsreader".to_string(),
slurp_cache_path: "/tmp/letterbox/slurp".to_string(),
};
let allowed_origins = AllowedOrigins::all();
let cors = rocket_cors::CorsOptions {
allowed_origins,
allowed_methods: vec!["Get"]
.into_iter()
.map(|s| FromStr::from_str(s).unwrap())
.collect(),
allowed_headers: AllowedHeaders::some(&["Authorization", "Accept"]),
allow_credentials: true,
..Default::default()
}
.to_cors()?;
let rkt = rocket::build()
.mount(
letterbox_shared::urls::MOUNT_POINT,
routes![
original,
show_pretty,
show,
graphql_query,
graphql_request,
graphiql,
view_cid,
view_attachment,
download_attachment,
],
)
.attach(cors)
.attach(AdHoc::config::<Config>());
let config: Config = rkt.figment().extract()?;
if !std::fs::exists(&config.slurp_cache_path)? {
info!("Creating slurp cache @ '{}'", &config.slurp_cache_path);
std::fs::create_dir_all(&config.slurp_cache_path)?;
}
let pool = PgPool::connect(&config.newsreader_database_url).await?;
let nm = Notmuch::default();
sqlx::migrate!("./migrations").run(&pool).await?;
#[cfg(feature = "tantivy")]
let tantivy_conn = TantivyConnection::new(&config.newsreader_tantivy_db_path)?;
let cacher = FilesystemCacher::new(&config.slurp_cache_path)?;
let schema = Schema::build(QueryRoot, MutationRoot, SubscriptionRoot)
.data(nm.clone())
let schema = Schema::build(QueryRoot, Mutation, EmptySubscription)
.data(Notmuch::default())
.data(cacher)
.data(pool.clone());
#[cfg(feature = "tantivy")]
let schema = schema.data(tantivy_conn);
let schema = schema.extension(extensions::Logger).finish();
let connection_tracker = Arc::new(Mutex::new(ConnectionTracker::default()));
let ct = Arc::clone(&connection_tracker);
let poll_time = Duration::from_secs(10);
let _h = tokio::spawn(watch_new(nm.clone(), pool, ct, poll_time));
let rkt = rkt.manage(schema).manage(pool).manage(Notmuch::default());
//.manage(Notmuch::with_config("../notmuch/testdata/notmuch.config"))
let app = Router::new()
.route("/test", get(test_handler))
.route(
"/api/download/attachment/{id}/{idx}/{*rest}",
get(download_attachment),
)
.route(
"/api/view/attachment/{id}/{idx}/{*rest}",
get(view_attachment),
)
.route("/api/cid/{id}/{cid}", get(view_cid))
.route("/api/ws", any(start_ws))
.route_service("/api/graphql/ws", GraphQLSubscription::new(schema.clone()))
.route(
"/api/graphql/",
get(graphiql).post_service(GraphQL::new(schema.clone())),
)
.with_state(AppState {
nm,
connection_tracker,
})
.layer(
TraceLayer::new_for_http()
.make_span_with(DefaultMakeSpan::default().include_headers(true)),
);
let listener = TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port)))
.await
.unwrap();
tracing::info!("listening on {}", listener.local_addr().unwrap());
axum::serve(
listener,
app.into_make_service_with_connect_info::<SocketAddr>(),
)
.await
.unwrap();
rkt.launch().await?;
Ok(())
}

View File

@ -2,12 +2,10 @@ use std::{fmt, str::FromStr};
use async_graphql::{
connection::{self, Connection, Edge, OpaqueCursor},
futures_util::Stream,
Context, Enum, Error, FieldResult, InputObject, Object, Schema, SimpleObject, Subscription,
Union,
Context, EmptySubscription, Enum, Error, FieldResult, InputObject, Object, Schema,
SimpleObject, Union,
};
use cacher::FilesystemCacher;
use futures::stream;
use letterbox_notmuch::Notmuch;
use log::info;
use serde::{Deserialize, Serialize};
@ -291,6 +289,7 @@ impl QueryRoot {
build_info::build_info!(fn bi);
Ok(letterbox_shared::build_version(bi))
}
#[instrument(skip_all, fields(query=query))]
#[instrument(skip_all, fields(query=query, rid=request_id()))]
async fn count<'ctx>(&self, ctx: &Context<'ctx>, query: String) -> Result<usize, Error> {
let nm = ctx.data_unchecked::<Notmuch>();
@ -311,7 +310,6 @@ impl QueryRoot {
info!("count {newsreader_query:?} newsreader count {newsreader_count} notmuch count {notmuch_count} tantivy count {tantivy_count} total {total}");
Ok(total)
}
#[instrument(skip_all, fields(query=query, rid=request_id()))]
async fn catchup<'ctx>(
&self,
ctx: &Context<'ctx>,
@ -319,7 +317,37 @@ impl QueryRoot {
) -> Result<Vec<String>, Error> {
let nm = ctx.data_unchecked::<Notmuch>();
let pool = ctx.data_unchecked::<PgPool>();
compute_catchup_ids(nm, pool, &query).await
let query: Query = query.parse()?;
// TODO: implement optimized versions of fetching just IDs
let newsreader_fut = newsreader_search(pool, None, None, None, None, &query);
let notmuch_fut = notmuch_search(nm, None, None, None, None, &query);
let (newsreader_results, notmuch_results) = join!(newsreader_fut, notmuch_fut);
let newsreader_results = newsreader_results?;
let notmuch_results = notmuch_results?;
info!(
"newsreader_results ({}) notmuch_results ({})",
newsreader_results.len(),
notmuch_results.len(),
);
let mut results: Vec<_> = newsreader_results
.into_iter()
.chain(notmuch_results)
.collect();
// The leading '-' is to reverse sort
results.sort_by_key(|item| match item {
ThreadSummaryCursor::Newsreader(_, ts) => -ts.timestamp,
ThreadSummaryCursor::Notmuch(_, ts) => -ts.timestamp,
});
let ids = results
.into_iter()
.map(|r| match r {
ThreadSummaryCursor::Newsreader(_, ts) => ts.thread,
ThreadSummaryCursor::Notmuch(_, ts) => ts.thread,
})
.collect();
Ok(ids)
}
// TODO: this function doesn't get parallelism, possibly because notmuch is sync and blocks,
@ -565,9 +593,9 @@ async fn tantivy_search(
.collect())
}
pub struct MutationRoot;
pub struct Mutation;
#[Object]
impl MutationRoot {
impl Mutation {
#[instrument(skip_all, fields(query=query, unread=unread, rid=request_id()))]
async fn set_read_status<'ctx>(
&self,
@ -639,51 +667,4 @@ impl MutationRoot {
}
}
pub struct SubscriptionRoot;
#[Subscription]
impl SubscriptionRoot {
async fn values(&self, _ctx: &Context<'_>) -> Result<impl Stream<Item = usize>, Error> {
Ok(stream::iter(0..10))
}
}
pub type GraphqlSchema = Schema<QueryRoot, MutationRoot, SubscriptionRoot>;
#[instrument(skip_all, fields(query=query))]
pub async fn compute_catchup_ids(
nm: &Notmuch,
pool: &PgPool,
query: &str,
) -> Result<Vec<String>, Error> {
let query: Query = query.parse()?;
// TODO: implement optimized versions of fetching just IDs
let newsreader_fut = newsreader_search(pool, None, None, None, None, &query);
let notmuch_fut = notmuch_search(nm, None, None, None, None, &query);
let (newsreader_results, notmuch_results) = join!(newsreader_fut, notmuch_fut);
let newsreader_results = newsreader_results?;
let notmuch_results = notmuch_results?;
info!(
"newsreader_results ({}) notmuch_results ({})",
newsreader_results.len(),
notmuch_results.len(),
);
let mut results: Vec<_> = newsreader_results
.into_iter()
.chain(notmuch_results)
.collect();
// The leading '-' is to reverse sort
results.sort_by_key(|item| match item {
ThreadSummaryCursor::Newsreader(_, ts) => -ts.timestamp,
ThreadSummaryCursor::Notmuch(_, ts) => -ts.timestamp,
});
let ids = results
.into_iter()
.map(|r| match r {
ThreadSummaryCursor::Newsreader(_, ts) => ts.thread,
ThreadSummaryCursor::Notmuch(_, ts) => ts.thread,
})
.collect();
Ok(ids)
}
pub type GraphqlSchema = Schema<QueryRoot, Mutation, EmptySubscription>;

View File

@ -1,10 +1,9 @@
pub mod config;
pub mod error;
pub mod graphql;
pub mod mail;
pub mod newsreader;
pub mod nm;
pub mod ws;
#[cfg(feature = "tantivy")]
pub mod tantivy;
@ -19,7 +18,6 @@ use std::{
use async_trait::async_trait;
use cacher::{Cacher, FilesystemCacher};
use css_inline::{CSSInliner, InlineError, InlineOptions};
pub use error::ServerError;
use linkify::{LinkFinder, LinkKind};
use log::{debug, error, info, warn};
use lol_html::{
@ -35,6 +33,7 @@ use thiserror::Error;
use url::Url;
use crate::{
error::ServerError,
graphql::{Corpus, ThreadSummary},
newsreader::is_newsreader_thread,
nm::is_notmuch_thread_or_id,
@ -444,16 +443,19 @@ pub fn sanitize_html(
let mut element_content_handlers = vec![
// Remove width and height attributes on elements
element!("[width],[height]", |el| {
println!("width or height {el:?}");
el.remove_attribute("width");
el.remove_attribute("height");
Ok(())
}),
// Remove width and height values from inline styles
element!("[style]", |el| {
println!("style {el:?}");
let style = el.get_attribute("style").unwrap();
let style = style
.split(";")
.filter(|s| {
println!("s {s}");
let Some((k, _)) = s.split_once(':') else {
return true;
};
@ -465,6 +467,7 @@ pub fn sanitize_html(
})
.collect::<Vec<_>>()
.join(";");
println!("style: {style}");
if let Err(e) = el.set_attribute("style", &style) {
error!("Failed to set style attribute: {e}");
}

113
server/src/mail.rs Normal file
View File

@ -0,0 +1,113 @@
use std::{fs::File, io::Read};
use mailparse::{
addrparse_header, dateparse, parse_mail, MailHeaderMap, MailParseError, ParsedMail,
};
use sqlx::postgres::PgPool;
use thiserror::Error;
use tracing::info;
#[derive(Error, Debug)]
pub enum MailError {
#[error("missing from header")]
MissingFrom,
#[error("missing from header display name")]
MissingFromDisplayName,
#[error("missing subject header")]
MissingSubject,
#[error("missing html part")]
MissingHtmlPart,
#[error("missing message ID")]
MissingMessageId,
#[error("missing date")]
MissingDate,
#[error("DB error {0}")]
SqlxError(#[from] sqlx::Error),
#[error("IO error {0}")]
IOError(#[from] std::io::Error),
#[error("mail parse error {0}")]
MailParseError(#[from] MailParseError),
}
pub async fn read_mail_to_db(pool: &PgPool, path: &str) -> Result<(), MailError> {
let mut file = File::open(path)?;
let mut buffer = Vec::new();
file.read_to_end(&mut buffer)?;
let m = parse_mail(&buffer)?;
let subject = m
.headers
.get_first_value("subject")
.ok_or(MailError::MissingSubject)?;
let from = addrparse_header(
m.headers
.get_first_header("from")
.ok_or(MailError::MissingFrom)?,
)?;
let from = from.extract_single_info().ok_or(MailError::MissingFrom)?;
let name = from.display_name.ok_or(MailError::MissingFromDisplayName)?;
let slug = name.to_lowercase().replace(' ', "-");
let url = from.addr;
let message_id = m
.headers
.get_first_value("Message-ID")
.ok_or(MailError::MissingMessageId)?;
let uid = &message_id;
let feed_id = find_feed(&pool, &name, &slug, &url).await?;
let date = dateparse(
&m.headers
.get_first_value("Date")
.ok_or(MailError::MissingDate)?,
)?;
println!("Feed: {feed_id} Subject: {}", subject);
if let Some(_m) = first_html(&m) {
info!("add email {slug} {subject} {message_id} {date} {uid} {url}");
} else {
return Err(MailError::MissingHtmlPart.into());
}
Ok(())
}
fn first_html<'m>(m: &'m ParsedMail<'m>) -> Option<&'m ParsedMail<'m>> {
for ele in m.parts() {
if ele.ctype.mimetype == "text/html" {
return Some(ele);
}
}
None
}
async fn find_feed(pool: &PgPool, name: &str, slug: &str, url: &str) -> Result<i32, MailError> {
match sqlx::query!(
r#"
SELECT id
FROM feed
WHERE slug = $1
"#,
slug
)
.fetch_one(pool)
.await
{
Err(sqlx::Error::RowNotFound) => {
let rec = sqlx::query!(
r#"
INSERT INTO feed ( name, slug, url, homepage, selector )
VALUES ( $1, $2, $3, '', '' )
RETURNING id
"#,
name,
slug,
url
)
.fetch_one(pool)
.await?;
return Ok(rec.id);
}
Ok(rec) => return Ok(rec.id),
Err(e) => return Err(e.into()),
};
}

View File

@ -338,7 +338,7 @@ pub async fn thread(
}
fn email_addresses(
_path: &str,
path: &str,
m: &ParsedMail,
header_name: &str,
) -> Result<Vec<Email>, ServerError> {
@ -349,7 +349,9 @@ fn email_addresses(
for ma in mal.into_inner() {
match ma {
mailparse::MailAddr::Group(gi) => {
if !gi.group_name.contains("ndisclosed") {}
if !gi.group_name.contains("ndisclosed") {
println!("[{path}][{header_name}] Group: {gi}");
}
}
mailparse::MailAddr::Single(s) => addrs.push(Email {
name: s.display_name,

View File

@ -1,35 +0,0 @@
use std::{collections::HashMap, net::SocketAddr};
use axum::extract::ws::{Message, WebSocket};
use letterbox_shared::WebsocketMessage;
use tracing::{info, warn};
#[derive(Default)]
pub struct ConnectionTracker {
peers: HashMap<SocketAddr, WebSocket>,
}
impl ConnectionTracker {
pub async fn add_peer(&mut self, socket: WebSocket, who: SocketAddr) {
warn!("adding {who:?} to connection tracker");
self.peers.insert(who, socket);
self.send_message_all(WebsocketMessage::RefreshMessages)
.await;
}
pub async fn send_message_all(&mut self, msg: WebsocketMessage) {
info!("send_message_all {msg}");
let m = serde_json::to_string(&msg).expect("failed to json encode WebsocketMessage");
let mut bad_peers = Vec::new();
for (who, socket) in &mut self.peers.iter_mut() {
if let Err(e) = socket.send(Message::Text(m.clone().into())).await {
warn!("{:?} is bad, scheduling for removal: {e}", who);
bad_peers.push(who.clone());
}
}
for b in bad_peers {
info!("removing bad peer {b:?}");
self.peers.remove(&b);
}
}
}

View File

@ -12,6 +12,5 @@ version.workspace = true
[dependencies]
build-info = "0.0.40"
letterbox-notmuch = { version = "0.15.0", path = "../notmuch", registry = "xinu" }
letterbox-notmuch = { version = "0.12.1", path = "../notmuch", registry = "xinu" }
serde = { version = "1.0.147", features = ["derive"] }
strum_macros = "0.27.1"

View File

@ -13,10 +13,8 @@ pub struct SearchResult {
pub total: usize,
}
#[derive(Serialize, Deserialize, Debug, strum_macros::Display)]
pub enum WebsocketMessage {
RefreshMessages,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Message {}
pub mod urls {
pub const MOUNT_POINT: &'static str = "/api";

View File

@ -33,13 +33,10 @@ wasm-bindgen = "=0.2.100"
uuid = { version = "1.13.1", features = [
"js",
] } # direct dep to set js feature, prevents Rng issues
letterbox-shared = { version = "0.15.0", path = "../shared", registry = "xinu" }
letterbox-notmuch = { version = "0.15.0", path = "../notmuch", registry = "xinu" }
letterbox-shared = { version = "0.12.1", path = "../shared", registry = "xinu" }
letterbox-notmuch = { version = "0.12.1", path = "../notmuch", registry = "xinu" }
seed_hooks = { version = "0.4.0", registry = "xinu" }
strum_macros = "0.27.1"
gloo-console = "0.3.0"
[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-sockets = "1.0.0"
[package.metadata.wasm-pack.profile.release]
wasm-opt = ['-Os']

View File

@ -6,10 +6,6 @@ release = false
address = "0.0.0.0"
port = 6758
[[proxy]]
ws = true
backend = "ws://localhost:9345/api/ws"
[[proxy]]
backend = "http://localhost:9345/api/"

View File

@ -2,8 +2,6 @@
// - it's useful when you want to check your code with `cargo make verify`
// but some rules are too "annoying" or are not applicable for your case.)
#![allow(clippy::wildcard_imports)]
// Until https://github.com/rust-lang/rust/issues/138762 is addressed in dependencies
#![allow(wasm_c_abi)]
use log::Level;
use seed::App;
@ -13,7 +11,6 @@ mod consts;
mod graphql;
mod state;
mod view;
mod websocket;
fn main() {
// This provides better error messages in debug mode.

View File

@ -1,7 +1,6 @@
use std::collections::HashSet;
use graphql_client::GraphQLQuery;
use letterbox_shared::WebsocketMessage;
use log::{debug, error, info, warn};
use seed::{prelude::*, *};
use thiserror::Error;
@ -12,7 +11,6 @@ use crate::{
consts::SEARCH_RESULTS_PER_PAGE,
graphql,
graphql::{front_page_query::*, send_graphql, show_thread_query::*},
websocket,
};
/// Used to fake the unread string while in development
@ -40,11 +38,11 @@ pub fn init(url: Url, orders: &mut impl Orders<Msg>) -> Model {
if url.hash().is_none() {
orders.request_url(urls::search(unread_query(), 0));
} else {
orders.request_url(url.clone());
orders.request_url(url);
};
// TODO(wathiede): only do this while viewing the index? Or maybe add a new message that force
// 'notmuch new' on the server periodically?
//orders.stream(streams::interval(30_000, || Msg::RefreshStart));
orders.stream(streams::interval(30_000, || Msg::RefreshStart));
orders.subscribe(Msg::OnUrlChanged);
orders.stream(streams::window_event(Ev::Scroll, |_| Msg::WindowScrolled));
@ -62,7 +60,6 @@ pub fn init(url: Url, orders: &mut impl Orders<Msg>) -> Model {
},
catchup: None,
last_url: Url::current(),
websocket: websocket::init(&mut orders.proxy(Msg::WebSocket)),
}
}
@ -662,18 +659,6 @@ pub fn update(msg: Msg, model: &mut Model, orders: &mut impl Orders<Msg>) {
orders.send_msg(Msg::ScrollToTop);
model.catchup = None;
}
Msg::WebSocket(ws) => {
websocket::update(ws, &mut model.websocket, &mut orders.proxy(Msg::WebSocket));
while let Some(msg) = model.websocket.updates.pop_front() {
orders.send_msg(Msg::WebsocketMessage(msg));
}
}
Msg::WebsocketMessage(msg) => {
match msg {
WebsocketMessage::RefreshMessages => orders.send_msg(Msg::Refresh),
};
}
}
}
@ -706,7 +691,6 @@ pub struct Model {
pub versions: Version,
pub catchup: Option<Catchup>,
pub last_url: Url,
pub websocket: websocket::Model,
}
#[derive(Debug)]
@ -838,7 +822,4 @@ pub enum Msg {
CatchupMarkAsRead,
CatchupNext,
CatchupExit,
WebSocket(websocket::Msg),
WebsocketMessage(WebsocketMessage),
}

View File

@ -12,7 +12,7 @@ use web_sys::{HtmlElement, HtmlInputElement};
use crate::{
api::urls,
graphql::{front_page_query::*, show_thread_query::*},
state::{unread_query, CatchupItem, Context, Model, Msg, RefreshingState, Tag, Version},
state::{CatchupItem, Context, Model, Msg, RefreshingState, Tag, Version},
};
// TODO(wathiede): create a QueryString enum that wraps single and multiple message ids and thread
@ -1210,18 +1210,13 @@ fn view_header(
false
};
let query = Url::decode_uri_component(query).unwrap_or("".to_string());
let filter_all = String::new();
let filter_unread = unread_query().to_string();
let filter_news = "is:unread is:news".to_string();
let filter_mail = "is:unread is:mail".to_string();
let highlight_color = "bg-sky-800";
nav![
C!["flex", "flex-col"],
div![
C!["flex-auto", "flex"],
button![
C![IF!(is_error => "bg-red-500"), "rounded-none"],
C![IF![is_error => "bg-red-500"], "rounded-none"],
tw_classes::button(),
span![i![C![
"fa-solid",
@ -1231,35 +1226,35 @@ fn view_header(
ev(Ev::Click, |_| Msg::RefreshStart),
],
button![
IF!(query == filter_all => C![highlight_color]),
tw_classes::button(),
C!["grow", "rounded-none"],
"All",
ev(Ev::Click, |_| Msg::SearchQuery(filter_all)),
ev(Ev::Click, |_| Msg::SearchQuery(String::new())),
],
button![
IF!(query == filter_unread => C![highlight_color]),
tw_classes::button(),
C!["grow", "rounded-none"],
span![i![C!["far", "fa-envelope"]]],
" Unread",
ev(Ev::Click, |_| Msg::SearchQuery(filter_unread)),
ev(Ev::Click, |_| Msg::SearchQuery("is:unread".to_string())),
],
button![
IF!(query == filter_news => C![highlight_color]),
tw_classes::button(),
C!["grow", "rounded-none"],
span![i![C!["far", "fa-envelope"]]],
" News",
ev(Ev::Click, |_| Msg::SearchQuery(filter_news)),
ev(Ev::Click, |_| Msg::SearchQuery(
"is:unread is:news".to_string()
)),
],
button![
IF!(query == filter_mail => C![highlight_color]),
tw_classes::button(),
C!["grow", "rounded-none"],
span![i![C!["far", "fa-envelope"]]],
" Mail",
ev(Ev::Click, |_| Msg::SearchQuery(filter_mail)),
ev(Ev::Click, |_| Msg::SearchQuery(
"is:unread is:mail".to_string()
)),
],
],
div![

View File

@ -1,218 +0,0 @@
use std::{collections::VecDeque, rc::Rc};
use letterbox_shared::WebsocketMessage;
use log::{error, info};
use seed::prelude::*;
use serde::{Deserialize, Serialize};
#[cfg(not(target_arch = "wasm32"))]
#[allow(dead_code)]
mod wasm_sockets {
use std::{cell::RefCell, rc::Rc};
use thiserror::Error;
use web_sys::{CloseEvent, ErrorEvent};
#[derive(Debug)]
pub struct JsValue;
#[derive(Debug)]
pub enum ConnectionStatus {
/// Connecting to a server
Connecting,
/// Connected to a server
Connected,
/// Disconnected from a server due to an error
Error,
/// Disconnected from a server without an error
Disconnected,
}
#[derive(Debug)]
pub struct EventClient {
pub status: Rc<RefCell<ConnectionStatus>>,
}
impl EventClient {
pub fn new(_: &str) -> Result<Self, WebSocketError> {
todo!("this is a mock")
}
pub fn send_string(&self, _essage: &str) -> Result<(), JsValue> {
todo!("this is a mock")
}
pub fn set_on_error(&mut self, _: Option<Box<dyn Fn(ErrorEvent)>>) {
todo!("this is a mock")
}
pub fn set_on_connection(&mut self, _: Option<Box<dyn Fn(&EventClient)>>) {
todo!("this is a mock")
}
pub fn set_on_close(&mut self, _: Option<Box<dyn Fn(CloseEvent)>>) {
todo!("this is a mock")
}
pub fn set_on_message(&mut self, _: Option<Box<dyn Fn(&EventClient, Message)>>) {
todo!("this is a mock")
}
}
#[derive(Debug, Clone)]
pub enum Message {
Text(String),
Binary(Vec<u8>),
}
#[derive(Debug, Clone, Error)]
pub enum WebSocketError {}
}
#[cfg(not(target_arch = "wasm32"))]
use wasm_sockets::{ConnectionStatus, EventClient, Message, WebSocketError};
#[cfg(target_arch = "wasm32")]
use wasm_sockets::{ConnectionStatus, EventClient, Message, WebSocketError};
use web_sys::CloseEvent;
/// Message from the server to the client.
#[derive(Serialize, Deserialize)]
pub struct ServerMessage {
pub id: usize,
pub text: String,
}
/// Message from the client to the server.
#[derive(Serialize, Deserialize)]
pub struct ClientMessage {
pub text: String,
}
//const WS_URL: &str = "wss://9000.z.xinu.tv/api/ws";
//const WS_URL: &str = "wss://9345.z.xinu.tv/api/graphql/ws";
const WS_URL: &str = "wss://6758.z.xinu.tv/api/ws";
// ------ ------
// Model
// ------ ------
pub struct Model {
web_socket: EventClient,
web_socket_reconnector: Option<StreamHandle>,
pub updates: VecDeque<WebsocketMessage>,
}
// ------ ------
// Init
// ------ ------
pub fn init(orders: &mut impl Orders<Msg>) -> Model {
Model {
web_socket: create_websocket(orders).unwrap(),
web_socket_reconnector: None,
updates: VecDeque::new(),
}
}
// ------ ------
// Update
// ------ ------
pub enum Msg {
WebSocketOpened,
TextMessageReceived(WebsocketMessage),
WebSocketClosed(CloseEvent),
WebSocketFailed,
ReconnectWebSocket(usize),
#[allow(dead_code)]
SendMessage(ClientMessage),
}
pub fn update(msg: Msg, model: &mut Model, orders: &mut impl Orders<Msg>) {
match msg {
Msg::WebSocketOpened => {
model.web_socket_reconnector = None;
info!("WebSocket connection is open now");
}
Msg::TextMessageReceived(msg) => {
model.updates.push_back(msg);
}
Msg::WebSocketClosed(close_event) => {
info!(
r#"==================
WebSocket connection was closed:
Clean: {0}
Code: {1}
Reason: {2}
=================="#,
close_event.was_clean(),
close_event.code(),
close_event.reason()
);
// Chrome doesn't invoke `on_error` when the connection is lost.
if !close_event.was_clean() && model.web_socket_reconnector.is_none() {
model.web_socket_reconnector = Some(
orders.stream_with_handle(streams::backoff(None, Msg::ReconnectWebSocket)),
);
}
}
Msg::WebSocketFailed => {
info!("WebSocket failed");
if model.web_socket_reconnector.is_none() {
model.web_socket_reconnector = Some(
orders.stream_with_handle(streams::backoff(None, Msg::ReconnectWebSocket)),
);
}
}
Msg::ReconnectWebSocket(retries) => {
info!("Reconnect attempt: {}", retries);
model.web_socket = create_websocket(orders).unwrap();
}
Msg::SendMessage(msg) => {
let txt = serde_json::to_string(&msg).unwrap();
model.web_socket.send_string(&txt).unwrap();
}
}
}
fn create_websocket(orders: &impl Orders<Msg>) -> Result<EventClient, WebSocketError> {
let msg_sender = orders.msg_sender();
let mut client = EventClient::new(WS_URL)?;
client.set_on_error(Some(Box::new(|error| {
gloo_console::error!("WS: ", error);
})));
let send = msg_sender.clone();
client.set_on_connection(Some(Box::new(move |client: &EventClient| {
info!("{:#?}", client.status);
let msg = match *client.status.borrow() {
ConnectionStatus::Connecting => {
info!("Connecting...");
None
}
ConnectionStatus::Connected => Some(Msg::WebSocketOpened),
ConnectionStatus::Error => Some(Msg::WebSocketFailed),
ConnectionStatus::Disconnected => {
info!("Disconnected");
None
}
};
send(msg);
})));
let send = msg_sender.clone();
client.set_on_close(Some(Box::new(move |ev| {
info!("WS: Connection closed");
send(Some(Msg::WebSocketClosed(ev)));
})));
let send = msg_sender.clone();
client.set_on_message(Some(Box::new(move |_: &EventClient, msg: Message| {
decode_message(msg, Rc::clone(&send))
})));
Ok(client)
}
fn decode_message(message: Message, msg_sender: Rc<dyn Fn(Option<Msg>)>) {
match message {
Message::Text(txt) => {
let msg: WebsocketMessage = serde_json::from_str(&txt).unwrap_or_else(|e| {
panic!("failed to parse json into WebsocketMessage: {e}\n'{txt}'")
});
msg_sender(Some(Msg::TextMessageReceived(msg)));
}
m => error!("unexpected message type received of {m:?}"),
}
}