Compare commits
No commits in common. "b8ef753f85d7ab43e8d260179d915b931e6156a5" and "b11f6b51495f330e2bbe40be7ec52dac42b6faf1" have entirely different histories.
b8ef753f85
...
b11f6b5149
724
Cargo.lock
generated
724
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@ -8,7 +8,7 @@ authors = ["Bill Thiede <git@xinu.tv>"]
|
|||||||
edition = "2021"
|
edition = "2021"
|
||||||
license = "UNLICENSED"
|
license = "UNLICENSED"
|
||||||
publish = ["xinu"]
|
publish = ["xinu"]
|
||||||
version = "0.15.0"
|
version = "0.12.1"
|
||||||
repository = "https://git.z.xinu.tv/wathiede/letterbox"
|
repository = "https://git.z.xinu.tv/wathiede/letterbox"
|
||||||
|
|
||||||
[profile.dev]
|
[profile.dev]
|
||||||
|
|||||||
@ -470,7 +470,7 @@ pub enum NotmuchError {
|
|||||||
MailParseError(#[from] mailparse::MailParseError),
|
MailParseError(#[from] mailparse::MailParseError),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Default)]
|
#[derive(Default)]
|
||||||
pub struct Notmuch {
|
pub struct Notmuch {
|
||||||
config_path: Option<PathBuf>,
|
config_path: Option<PathBuf>,
|
||||||
}
|
}
|
||||||
|
|||||||
22
server/.sqlx/query-dabd12987369cb273c0191d46645c376439d246d5a697340574c6afdac93d2cc.json
generated
Normal file
22
server/.sqlx/query-dabd12987369cb273c0191d46645c376439d246d5a697340574c6afdac93d2cc.json
generated
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
{
|
||||||
|
"db_name": "PostgreSQL",
|
||||||
|
"query": "\nSELECT id\nFROM feed\nWHERE slug = $1\n ",
|
||||||
|
"describe": {
|
||||||
|
"columns": [
|
||||||
|
{
|
||||||
|
"ordinal": 0,
|
||||||
|
"name": "id",
|
||||||
|
"type_info": "Int4"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"parameters": {
|
||||||
|
"Left": [
|
||||||
|
"Text"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"nullable": [
|
||||||
|
false
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"hash": "dabd12987369cb273c0191d46645c376439d246d5a697340574c6afdac93d2cc"
|
||||||
|
}
|
||||||
24
server/.sqlx/query-e2a448aaf4fe92fc1deda10bf844f6b9225d35758cba7c9f337c1a730aee41bd.json
generated
Normal file
24
server/.sqlx/query-e2a448aaf4fe92fc1deda10bf844f6b9225d35758cba7c9f337c1a730aee41bd.json
generated
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
{
|
||||||
|
"db_name": "PostgreSQL",
|
||||||
|
"query": "\nINSERT INTO feed ( name, slug, url, homepage, selector )\nVALUES ( $1, $2, $3, '', '' )\nRETURNING id\n ",
|
||||||
|
"describe": {
|
||||||
|
"columns": [
|
||||||
|
{
|
||||||
|
"ordinal": 0,
|
||||||
|
"name": "id",
|
||||||
|
"type_info": "Int4"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"parameters": {
|
||||||
|
"Left": [
|
||||||
|
"Text",
|
||||||
|
"Text",
|
||||||
|
"Text"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"nullable": [
|
||||||
|
false
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"hash": "e2a448aaf4fe92fc1deda10bf844f6b9225d35758cba7c9f337c1a730aee41bd"
|
||||||
|
}
|
||||||
@ -15,19 +15,15 @@ version.workspace = true
|
|||||||
ammonia = "4.0.0"
|
ammonia = "4.0.0"
|
||||||
anyhow = "1.0.79"
|
anyhow = "1.0.79"
|
||||||
async-graphql = { version = "7", features = ["log"] }
|
async-graphql = { version = "7", features = ["log"] }
|
||||||
async-graphql-axum = "7.0.15"
|
async-graphql-rocket = "7"
|
||||||
async-trait = "0.1.81"
|
async-trait = "0.1.81"
|
||||||
axum = { version = "0.8.3", features = ["ws"] }
|
|
||||||
axum-macros = "0.5.0"
|
|
||||||
build-info = "0.0.40"
|
build-info = "0.0.40"
|
||||||
cacher = { version = "0.2.0", registry = "xinu" }
|
cacher = { version = "0.2.0", registry = "xinu" }
|
||||||
chrono = "0.4.39"
|
chrono = "0.4.39"
|
||||||
|
clap = { version = "4.5.23", features = ["derive"] }
|
||||||
css-inline = "0.14.0"
|
css-inline = "0.14.0"
|
||||||
futures = "0.3.31"
|
futures = "0.3.31"
|
||||||
headers = "0.4.0"
|
|
||||||
html-escape = "0.2.13"
|
html-escape = "0.2.13"
|
||||||
letterbox-notmuch = { version = "0.15.0", path = "../notmuch", registry = "xinu" }
|
|
||||||
letterbox-shared = { version = "0.15.0", path = "../shared", registry = "xinu" }
|
|
||||||
linkify = "0.10.0"
|
linkify = "0.10.0"
|
||||||
log = "0.4.17"
|
log = "0.4.17"
|
||||||
lol_html = "2.0.0"
|
lol_html = "2.0.0"
|
||||||
@ -36,6 +32,8 @@ maplit = "1.0.2"
|
|||||||
memmap = "0.7.0"
|
memmap = "0.7.0"
|
||||||
regex = "1.11.1"
|
regex = "1.11.1"
|
||||||
reqwest = { version = "0.12.7", features = ["blocking"] }
|
reqwest = { version = "0.12.7", features = ["blocking"] }
|
||||||
|
rocket = { version = "0.5.0-rc.2", features = ["json"] }
|
||||||
|
rocket_cors = "0.6.0"
|
||||||
scraper = "0.23.0"
|
scraper = "0.23.0"
|
||||||
serde = { version = "1.0.147", features = ["derive"] }
|
serde = { version = "1.0.147", features = ["derive"] }
|
||||||
serde_json = "1.0.87"
|
serde_json = "1.0.87"
|
||||||
@ -43,13 +41,14 @@ sqlx = { version = "0.8.2", features = ["postgres", "runtime-tokio", "time"] }
|
|||||||
tantivy = { version = "0.24.0", optional = true }
|
tantivy = { version = "0.24.0", optional = true }
|
||||||
thiserror = "2.0.0"
|
thiserror = "2.0.0"
|
||||||
tokio = "1.26.0"
|
tokio = "1.26.0"
|
||||||
tower-http = { version = "0.6.2", features = ["trace"] }
|
|
||||||
tracing = "0.1.41"
|
tracing = "0.1.41"
|
||||||
url = "2.5.2"
|
url = "2.5.2"
|
||||||
urlencoding = "2.1.3"
|
urlencoding = "2.1.3"
|
||||||
#xtracing = { git = "http://git-private.h.xinu.tv/wathiede/xtracing.git" }
|
|
||||||
#xtracing = { path = "../../xtracing" }
|
#xtracing = { path = "../../xtracing" }
|
||||||
|
#xtracing = { git = "http://git-private.h.xinu.tv/wathiede/xtracing.git" }
|
||||||
xtracing = { version = "0.3.0", registry = "xinu" }
|
xtracing = { version = "0.3.0", registry = "xinu" }
|
||||||
|
letterbox-notmuch = { version = "0.12.1", path = "../notmuch", registry = "xinu" }
|
||||||
|
letterbox-shared = { version = "0.12.1", path = "../shared", registry = "xinu" }
|
||||||
|
|
||||||
[build-dependencies]
|
[build-dependencies]
|
||||||
build-info-build = "0.0.40"
|
build-info-build = "0.0.40"
|
||||||
|
|||||||
@ -5,6 +5,7 @@ newsreader_database_url = "postgres://newsreader@nixos-07.h.xinu.tv/newsreader"
|
|||||||
newsreader_tantivy_db_path = "../target/database/newsreader"
|
newsreader_tantivy_db_path = "../target/database/newsreader"
|
||||||
|
|
||||||
[debug]
|
[debug]
|
||||||
|
address = "0.0.0.0"
|
||||||
port = 9345
|
port = 9345
|
||||||
# Uncomment to make it production like.
|
# Uncomment to make it production like.
|
||||||
#log_level = "critical"
|
#log_level = "critical"
|
||||||
|
|||||||
22
server/src/bin/email2db.rs
Normal file
22
server/src/bin/email2db.rs
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
use clap::Parser;
|
||||||
|
use letterbox_server::mail::read_mail_to_db;
|
||||||
|
use sqlx::postgres::PgPool;
|
||||||
|
|
||||||
|
/// Add certain emails as posts in newsfeed app.
|
||||||
|
#[derive(Parser, Debug)]
|
||||||
|
#[command(author, version, about, long_about = None)]
|
||||||
|
struct Args {
|
||||||
|
/// DB URL, something like postgres://newsreader@nixos-07.h.xinu.tv/newsreader
|
||||||
|
#[arg(short, long)]
|
||||||
|
db_url: String,
|
||||||
|
/// path to parse
|
||||||
|
path: String,
|
||||||
|
}
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
let _guard = xtracing::init(env!("CARGO_BIN_NAME"))?;
|
||||||
|
let args = Args::parse();
|
||||||
|
let pool = PgPool::connect(&args.db_url).await?;
|
||||||
|
read_mail_to_db(&pool, &args.path).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
@ -1,100 +1,114 @@
|
|||||||
// Rocket generates a lot of warnings for handlers
|
// Rocket generates a lot of warnings for handlers
|
||||||
// TODO: figure out why
|
// TODO: figure out why
|
||||||
#![allow(unreachable_patterns)]
|
#![allow(unreachable_patterns)]
|
||||||
use std::{error::Error, net::SocketAddr, sync::Arc, time::Duration};
|
#[macro_use]
|
||||||
|
extern crate rocket;
|
||||||
|
use std::{error::Error, io::Cursor, str::FromStr};
|
||||||
|
|
||||||
use async_graphql::{extensions, http::GraphiQLSource, Schema};
|
use async_graphql::{extensions, http::GraphiQLSource, EmptySubscription, Schema};
|
||||||
use async_graphql_axum::{GraphQL, GraphQLSubscription};
|
use async_graphql_rocket::{GraphQLQuery, GraphQLRequest, GraphQLResponse};
|
||||||
//allows to extract the IP of connecting user
|
|
||||||
use axum::extract::connect_info::ConnectInfo;
|
|
||||||
use axum::{
|
|
||||||
extract::{self, ws::WebSocketUpgrade, State},
|
|
||||||
http::{header, StatusCode},
|
|
||||||
response::{self, IntoResponse, Response},
|
|
||||||
routing::{any, get},
|
|
||||||
Router,
|
|
||||||
};
|
|
||||||
use cacher::FilesystemCacher;
|
use cacher::FilesystemCacher;
|
||||||
use letterbox_notmuch::Notmuch;
|
use letterbox_notmuch::{Notmuch, NotmuchError, ThreadSet};
|
||||||
#[cfg(feature = "tantivy")]
|
#[cfg(feature = "tantivy")]
|
||||||
use letterbox_server::tantivy::TantivyConnection;
|
use letterbox_server::tantivy::TantivyConnection;
|
||||||
use letterbox_server::{
|
use letterbox_server::{
|
||||||
config::Config,
|
config::Config,
|
||||||
graphql::{compute_catchup_ids, Attachment, MutationRoot, QueryRoot, SubscriptionRoot},
|
error::ServerError,
|
||||||
|
graphql::{Attachment, GraphqlSchema, Mutation, QueryRoot},
|
||||||
nm::{attachment_bytes, cid_attachment_bytes},
|
nm::{attachment_bytes, cid_attachment_bytes},
|
||||||
ws::ConnectionTracker,
|
|
||||||
};
|
};
|
||||||
use letterbox_shared::WebsocketMessage;
|
use rocket::{
|
||||||
|
fairing::AdHoc,
|
||||||
|
http::{ContentType, Header},
|
||||||
|
request::Request,
|
||||||
|
response::{content, Debug, Responder},
|
||||||
|
serde::json::Json,
|
||||||
|
Response, State,
|
||||||
|
};
|
||||||
|
use rocket_cors::{AllowedHeaders, AllowedOrigins};
|
||||||
use sqlx::postgres::PgPool;
|
use sqlx::postgres::PgPool;
|
||||||
use tokio::{net::TcpListener, sync::Mutex};
|
|
||||||
use tower_http::trace::{DefaultMakeSpan, TraceLayer};
|
|
||||||
use tracing::info;
|
|
||||||
|
|
||||||
// Make our own error that wraps `anyhow::Error`.
|
#[get("/show/<query>/pretty")]
|
||||||
struct AppError(letterbox_server::ServerError);
|
async fn show_pretty(
|
||||||
|
nm: &State<Notmuch>,
|
||||||
// Tell axum how to convert `AppError` into a response.
|
query: &str,
|
||||||
impl IntoResponse for AppError {
|
) -> Result<Json<ThreadSet>, Debug<ServerError>> {
|
||||||
fn into_response(self) -> Response {
|
let query = urlencoding::decode(query).map_err(|e| ServerError::from(NotmuchError::from(e)))?;
|
||||||
(
|
let res = nm.show(&query).map_err(ServerError::from)?;
|
||||||
StatusCode::INTERNAL_SERVER_ERROR,
|
Ok(Json(res))
|
||||||
format!("Something went wrong: {}", self.0),
|
|
||||||
)
|
|
||||||
.into_response()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// This enables using `?` on functions that return `Result<_, letterbox_server::Error>` to turn them into
|
|
||||||
// `Result<_, AppError>`. That way you don't need to do that manually.
|
#[get("/show/<query>")]
|
||||||
impl<E> From<E> for AppError
|
async fn show(nm: &State<Notmuch>, query: &str) -> Result<Json<ThreadSet>, Debug<NotmuchError>> {
|
||||||
where
|
let query = urlencoding::decode(query).map_err(NotmuchError::from)?;
|
||||||
E: Into<letterbox_server::ServerError>,
|
let res = nm.show(&query)?;
|
||||||
{
|
Ok(Json(res))
|
||||||
fn from(err: E) -> Self {
|
}
|
||||||
Self(err.into())
|
|
||||||
|
struct InlineAttachmentResponder(Attachment);
|
||||||
|
|
||||||
|
impl<'r, 'o: 'r> Responder<'r, 'o> for InlineAttachmentResponder {
|
||||||
|
fn respond_to(self, _: &'r Request<'_>) -> rocket::response::Result<'o> {
|
||||||
|
let mut resp = Response::build();
|
||||||
|
if let Some(filename) = self.0.filename {
|
||||||
|
resp.header(Header::new(
|
||||||
|
"Content-Disposition",
|
||||||
|
format!(r#"inline; filename="{}""#, filename),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
if let Some(content_type) = self.0.content_type {
|
||||||
|
if let Some(ct) = ContentType::parse_flexible(&content_type) {
|
||||||
|
resp.header(ct);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
resp.sized_body(self.0.bytes.len(), Cursor::new(self.0.bytes))
|
||||||
|
.ok()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn inline_attachment_response(attachment: Attachment) -> impl IntoResponse {
|
struct DownloadAttachmentResponder(Attachment);
|
||||||
info!("attachment filename {:?}", attachment.filename);
|
|
||||||
let mut hdr_map = headers::HeaderMap::new();
|
impl<'r, 'o: 'r> Responder<'r, 'o> for DownloadAttachmentResponder {
|
||||||
if let Some(filename) = attachment.filename {
|
fn respond_to(self, _: &'r Request<'_>) -> rocket::response::Result<'o> {
|
||||||
hdr_map.insert(
|
let mut resp = Response::build();
|
||||||
header::CONTENT_DISPOSITION,
|
if let Some(filename) = self.0.filename {
|
||||||
format!(r#"inline; filename="{}""#, filename)
|
resp.header(Header::new(
|
||||||
.parse()
|
"Content-Disposition",
|
||||||
.unwrap(),
|
format!(r#"attachment; filename="{}""#, filename),
|
||||||
);
|
));
|
||||||
|
}
|
||||||
|
if let Some(content_type) = self.0.content_type {
|
||||||
|
if let Some(ct) = ContentType::parse_flexible(&content_type) {
|
||||||
|
resp.header(ct);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
resp.sized_body(self.0.bytes.len(), Cursor::new(self.0.bytes))
|
||||||
|
.ok()
|
||||||
}
|
}
|
||||||
if let Some(ct) = attachment.content_type {
|
|
||||||
hdr_map.insert(header::CONTENT_TYPE, ct.parse().unwrap());
|
|
||||||
}
|
|
||||||
info!("hdr_map {hdr_map:?}");
|
|
||||||
(hdr_map, attachment.bytes).into_response()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn download_attachment_response(attachment: Attachment) -> impl IntoResponse {
|
#[get("/cid/<id>/<cid>")]
|
||||||
info!("attachment filename {:?}", attachment.filename);
|
async fn view_cid(
|
||||||
let mut hdr_map = headers::HeaderMap::new();
|
nm: &State<Notmuch>,
|
||||||
if let Some(filename) = attachment.filename {
|
id: &str,
|
||||||
hdr_map.insert(
|
cid: &str,
|
||||||
header::CONTENT_DISPOSITION,
|
) -> Result<InlineAttachmentResponder, Debug<ServerError>> {
|
||||||
format!(r#"attachment; filename="{}""#, filename)
|
let mid = if id.starts_with("id:") {
|
||||||
.parse()
|
id.to_string()
|
||||||
.unwrap(),
|
} else {
|
||||||
);
|
format!("id:{}", id)
|
||||||
}
|
};
|
||||||
if let Some(ct) = attachment.content_type {
|
info!("view cid attachment {mid} {cid}");
|
||||||
hdr_map.insert(header::CONTENT_TYPE, ct.parse().unwrap());
|
let attachment = cid_attachment_bytes(nm, &mid, &cid)?;
|
||||||
}
|
Ok(InlineAttachmentResponder(attachment))
|
||||||
info!("hdr_map {hdr_map:?}");
|
|
||||||
(hdr_map, attachment.bytes).into_response()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[axum_macros::debug_handler]
|
#[get("/view/attachment/<id>/<idx>/<_>")]
|
||||||
async fn view_attachment(
|
async fn view_attachment(
|
||||||
State(AppState { nm, .. }): State<AppState>,
|
nm: &State<Notmuch>,
|
||||||
extract::Path((id, idx, _)): extract::Path<(String, String, String)>,
|
id: &str,
|
||||||
) -> Result<impl IntoResponse, AppError> {
|
idx: &str,
|
||||||
|
) -> Result<InlineAttachmentResponder, Debug<ServerError>> {
|
||||||
let mid = if id.starts_with("id:") {
|
let mid = if id.starts_with("id:") {
|
||||||
id.to_string()
|
id.to_string()
|
||||||
} else {
|
} else {
|
||||||
@ -105,14 +119,16 @@ async fn view_attachment(
|
|||||||
.split('.')
|
.split('.')
|
||||||
.map(|s| s.parse().expect("not a usize"))
|
.map(|s| s.parse().expect("not a usize"))
|
||||||
.collect();
|
.collect();
|
||||||
let attachment = attachment_bytes(&nm, &mid, &idx)?;
|
let attachment = attachment_bytes(nm, &mid, &idx)?;
|
||||||
Ok(inline_attachment_response(attachment))
|
Ok(InlineAttachmentResponder(attachment))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[get("/download/attachment/<id>/<idx>/<_>")]
|
||||||
async fn download_attachment(
|
async fn download_attachment(
|
||||||
State(AppState { nm, .. }): State<AppState>,
|
nm: &State<Notmuch>,
|
||||||
extract::Path((id, idx, _)): extract::Path<(String, String, String)>,
|
id: &str,
|
||||||
) -> Result<impl IntoResponse, AppError> {
|
idx: &str,
|
||||||
|
) -> Result<DownloadAttachmentResponder, Debug<ServerError>> {
|
||||||
let mid = if id.starts_with("id:") {
|
let mid = if id.starts_with("id:") {
|
||||||
id.to_string()
|
id.to_string()
|
||||||
} else {
|
} else {
|
||||||
@ -123,154 +139,102 @@ async fn download_attachment(
|
|||||||
.split('.')
|
.split('.')
|
||||||
.map(|s| s.parse().expect("not a usize"))
|
.map(|s| s.parse().expect("not a usize"))
|
||||||
.collect();
|
.collect();
|
||||||
let attachment = attachment_bytes(&nm, &mid, &idx)?;
|
let attachment = attachment_bytes(nm, &mid, &idx)?;
|
||||||
Ok(download_attachment_response(attachment))
|
Ok(DownloadAttachmentResponder(attachment))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn view_cid(
|
#[get("/original/<id>")]
|
||||||
State(AppState { nm, .. }): State<AppState>,
|
async fn original(
|
||||||
extract::Path((id, cid)): extract::Path<(String, String)>,
|
nm: &State<Notmuch>,
|
||||||
) -> Result<impl IntoResponse, AppError> {
|
id: &str,
|
||||||
|
) -> Result<(ContentType, Vec<u8>), Debug<NotmuchError>> {
|
||||||
let mid = if id.starts_with("id:") {
|
let mid = if id.starts_with("id:") {
|
||||||
id.to_string()
|
id.to_string()
|
||||||
} else {
|
} else {
|
||||||
format!("id:{}", id)
|
format!("id:{}", id)
|
||||||
};
|
};
|
||||||
info!("view cid attachment {mid} {cid}");
|
let res = nm.show_original(&mid)?;
|
||||||
let attachment = cid_attachment_bytes(&nm, &mid, &cid)?;
|
Ok((ContentType::Plain, res))
|
||||||
Ok(inline_attachment_response(attachment))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn graphiql() -> impl IntoResponse {
|
#[rocket::get("/")]
|
||||||
response::Html(
|
fn graphiql() -> content::RawHtml<String> {
|
||||||
GraphiQLSource::build()
|
content::RawHtml(GraphiQLSource::build().endpoint("/api/graphql").finish())
|
||||||
.endpoint("/api/graphql/")
|
|
||||||
.subscription_endpoint("/api/graphql/ws")
|
|
||||||
.finish(),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn start_ws(
|
#[rocket::get("/graphql?<query..>")]
|
||||||
ws: WebSocketUpgrade,
|
async fn graphql_query(schema: &State<GraphqlSchema>, query: GraphQLQuery) -> GraphQLResponse {
|
||||||
ConnectInfo(addr): ConnectInfo<SocketAddr>,
|
query.execute(schema.inner()).await
|
||||||
State(AppState {
|
|
||||||
connection_tracker, ..
|
|
||||||
}): State<AppState>,
|
|
||||||
) -> impl IntoResponse {
|
|
||||||
ws.on_upgrade(async move |socket| connection_tracker.lock().await.add_peer(socket, addr).await)
|
|
||||||
}
|
|
||||||
#[axum_macros::debug_handler]
|
|
||||||
async fn test_handler(
|
|
||||||
State(AppState {
|
|
||||||
connection_tracker, ..
|
|
||||||
}): State<AppState>,
|
|
||||||
) -> impl IntoResponse {
|
|
||||||
connection_tracker
|
|
||||||
.lock()
|
|
||||||
.await
|
|
||||||
.send_message_all(WebsocketMessage::RefreshMessages)
|
|
||||||
.await;
|
|
||||||
"test triggered"
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn watch_new(
|
#[rocket::post("/graphql", data = "<request>", format = "application/json")]
|
||||||
nm: Notmuch,
|
async fn graphql_request(
|
||||||
pool: PgPool,
|
schema: &State<GraphqlSchema>,
|
||||||
conn_tracker: Arc<Mutex<ConnectionTracker>>,
|
request: GraphQLRequest,
|
||||||
poll_time: Duration,
|
) -> GraphQLResponse {
|
||||||
) -> Result<(), async_graphql::Error> {
|
request.execute(schema.inner()).await
|
||||||
let mut old_ids = Vec::new();
|
|
||||||
loop {
|
|
||||||
let ids = compute_catchup_ids(&nm, &pool, "is:unread").await?;
|
|
||||||
if old_ids != ids {
|
|
||||||
info!("old_ids: {old_ids:?}\n ids: {ids:?}");
|
|
||||||
conn_tracker
|
|
||||||
.lock()
|
|
||||||
.await
|
|
||||||
.send_message_all(WebsocketMessage::RefreshMessages)
|
|
||||||
.await
|
|
||||||
}
|
|
||||||
old_ids = ids;
|
|
||||||
tokio::time::sleep(poll_time).await;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[rocket::main]
|
||||||
struct AppState {
|
|
||||||
nm: Notmuch,
|
|
||||||
connection_tracker: Arc<Mutex<ConnectionTracker>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::main]
|
|
||||||
async fn main() -> Result<(), Box<dyn Error>> {
|
async fn main() -> Result<(), Box<dyn Error>> {
|
||||||
let _guard = xtracing::init(env!("CARGO_BIN_NAME"))?;
|
let _guard = xtracing::init(env!("CARGO_BIN_NAME"))?;
|
||||||
build_info::build_info!(fn bi);
|
build_info::build_info!(fn bi);
|
||||||
info!("Build Info: {}", letterbox_shared::build_version(bi));
|
info!("Build Info: {}", letterbox_shared::build_version(bi));
|
||||||
// TODO: move these to config
|
let allowed_origins = AllowedOrigins::all();
|
||||||
let port = 9345;
|
let cors = rocket_cors::CorsOptions {
|
||||||
let config = Config {
|
allowed_origins,
|
||||||
newsreader_database_url: "postgres://newsreader@nixos-07.h.xinu.tv/newsreader".to_string(),
|
allowed_methods: vec!["Get"]
|
||||||
newsreader_tantivy_db_path: "../target/database/newsreader".to_string(),
|
.into_iter()
|
||||||
slurp_cache_path: "/tmp/letterbox/slurp".to_string(),
|
.map(|s| FromStr::from_str(s).unwrap())
|
||||||
};
|
.collect(),
|
||||||
|
allowed_headers: AllowedHeaders::some(&["Authorization", "Accept"]),
|
||||||
|
allow_credentials: true,
|
||||||
|
..Default::default()
|
||||||
|
}
|
||||||
|
.to_cors()?;
|
||||||
|
|
||||||
|
let rkt = rocket::build()
|
||||||
|
.mount(
|
||||||
|
letterbox_shared::urls::MOUNT_POINT,
|
||||||
|
routes![
|
||||||
|
original,
|
||||||
|
show_pretty,
|
||||||
|
show,
|
||||||
|
graphql_query,
|
||||||
|
graphql_request,
|
||||||
|
graphiql,
|
||||||
|
view_cid,
|
||||||
|
view_attachment,
|
||||||
|
download_attachment,
|
||||||
|
],
|
||||||
|
)
|
||||||
|
.attach(cors)
|
||||||
|
.attach(AdHoc::config::<Config>());
|
||||||
|
|
||||||
|
let config: Config = rkt.figment().extract()?;
|
||||||
if !std::fs::exists(&config.slurp_cache_path)? {
|
if !std::fs::exists(&config.slurp_cache_path)? {
|
||||||
info!("Creating slurp cache @ '{}'", &config.slurp_cache_path);
|
info!("Creating slurp cache @ '{}'", &config.slurp_cache_path);
|
||||||
std::fs::create_dir_all(&config.slurp_cache_path)?;
|
std::fs::create_dir_all(&config.slurp_cache_path)?;
|
||||||
}
|
}
|
||||||
let pool = PgPool::connect(&config.newsreader_database_url).await?;
|
let pool = PgPool::connect(&config.newsreader_database_url).await?;
|
||||||
let nm = Notmuch::default();
|
|
||||||
sqlx::migrate!("./migrations").run(&pool).await?;
|
sqlx::migrate!("./migrations").run(&pool).await?;
|
||||||
#[cfg(feature = "tantivy")]
|
#[cfg(feature = "tantivy")]
|
||||||
let tantivy_conn = TantivyConnection::new(&config.newsreader_tantivy_db_path)?;
|
let tantivy_conn = TantivyConnection::new(&config.newsreader_tantivy_db_path)?;
|
||||||
|
|
||||||
let cacher = FilesystemCacher::new(&config.slurp_cache_path)?;
|
let cacher = FilesystemCacher::new(&config.slurp_cache_path)?;
|
||||||
let schema = Schema::build(QueryRoot, MutationRoot, SubscriptionRoot)
|
let schema = Schema::build(QueryRoot, Mutation, EmptySubscription)
|
||||||
.data(nm.clone())
|
.data(Notmuch::default())
|
||||||
.data(cacher)
|
.data(cacher)
|
||||||
.data(pool.clone());
|
.data(pool.clone());
|
||||||
|
|
||||||
|
#[cfg(feature = "tantivy")]
|
||||||
|
let schema = schema.data(tantivy_conn);
|
||||||
|
|
||||||
let schema = schema.extension(extensions::Logger).finish();
|
let schema = schema.extension(extensions::Logger).finish();
|
||||||
|
|
||||||
let connection_tracker = Arc::new(Mutex::new(ConnectionTracker::default()));
|
let rkt = rkt.manage(schema).manage(pool).manage(Notmuch::default());
|
||||||
let ct = Arc::clone(&connection_tracker);
|
//.manage(Notmuch::with_config("../notmuch/testdata/notmuch.config"))
|
||||||
let poll_time = Duration::from_secs(10);
|
|
||||||
let _h = tokio::spawn(watch_new(nm.clone(), pool, ct, poll_time));
|
|
||||||
|
|
||||||
let app = Router::new()
|
rkt.launch().await?;
|
||||||
.route("/test", get(test_handler))
|
|
||||||
.route(
|
|
||||||
"/api/download/attachment/{id}/{idx}/{*rest}",
|
|
||||||
get(download_attachment),
|
|
||||||
)
|
|
||||||
.route(
|
|
||||||
"/api/view/attachment/{id}/{idx}/{*rest}",
|
|
||||||
get(view_attachment),
|
|
||||||
)
|
|
||||||
.route("/api/cid/{id}/{cid}", get(view_cid))
|
|
||||||
.route("/api/ws", any(start_ws))
|
|
||||||
.route_service("/api/graphql/ws", GraphQLSubscription::new(schema.clone()))
|
|
||||||
.route(
|
|
||||||
"/api/graphql/",
|
|
||||||
get(graphiql).post_service(GraphQL::new(schema.clone())),
|
|
||||||
)
|
|
||||||
.with_state(AppState {
|
|
||||||
nm,
|
|
||||||
connection_tracker,
|
|
||||||
})
|
|
||||||
.layer(
|
|
||||||
TraceLayer::new_for_http()
|
|
||||||
.make_span_with(DefaultMakeSpan::default().include_headers(true)),
|
|
||||||
);
|
|
||||||
|
|
||||||
let listener = TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port)))
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
tracing::info!("listening on {}", listener.local_addr().unwrap());
|
|
||||||
axum::serve(
|
|
||||||
listener,
|
|
||||||
app.into_make_service_with_connect_info::<SocketAddr>(),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,12 +2,10 @@ use std::{fmt, str::FromStr};
|
|||||||
|
|
||||||
use async_graphql::{
|
use async_graphql::{
|
||||||
connection::{self, Connection, Edge, OpaqueCursor},
|
connection::{self, Connection, Edge, OpaqueCursor},
|
||||||
futures_util::Stream,
|
Context, EmptySubscription, Enum, Error, FieldResult, InputObject, Object, Schema,
|
||||||
Context, Enum, Error, FieldResult, InputObject, Object, Schema, SimpleObject, Subscription,
|
SimpleObject, Union,
|
||||||
Union,
|
|
||||||
};
|
};
|
||||||
use cacher::FilesystemCacher;
|
use cacher::FilesystemCacher;
|
||||||
use futures::stream;
|
|
||||||
use letterbox_notmuch::Notmuch;
|
use letterbox_notmuch::Notmuch;
|
||||||
use log::info;
|
use log::info;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
@ -291,6 +289,7 @@ impl QueryRoot {
|
|||||||
build_info::build_info!(fn bi);
|
build_info::build_info!(fn bi);
|
||||||
Ok(letterbox_shared::build_version(bi))
|
Ok(letterbox_shared::build_version(bi))
|
||||||
}
|
}
|
||||||
|
#[instrument(skip_all, fields(query=query))]
|
||||||
#[instrument(skip_all, fields(query=query, rid=request_id()))]
|
#[instrument(skip_all, fields(query=query, rid=request_id()))]
|
||||||
async fn count<'ctx>(&self, ctx: &Context<'ctx>, query: String) -> Result<usize, Error> {
|
async fn count<'ctx>(&self, ctx: &Context<'ctx>, query: String) -> Result<usize, Error> {
|
||||||
let nm = ctx.data_unchecked::<Notmuch>();
|
let nm = ctx.data_unchecked::<Notmuch>();
|
||||||
@ -311,7 +310,6 @@ impl QueryRoot {
|
|||||||
info!("count {newsreader_query:?} newsreader count {newsreader_count} notmuch count {notmuch_count} tantivy count {tantivy_count} total {total}");
|
info!("count {newsreader_query:?} newsreader count {newsreader_count} notmuch count {notmuch_count} tantivy count {tantivy_count} total {total}");
|
||||||
Ok(total)
|
Ok(total)
|
||||||
}
|
}
|
||||||
#[instrument(skip_all, fields(query=query, rid=request_id()))]
|
|
||||||
async fn catchup<'ctx>(
|
async fn catchup<'ctx>(
|
||||||
&self,
|
&self,
|
||||||
ctx: &Context<'ctx>,
|
ctx: &Context<'ctx>,
|
||||||
@ -319,7 +317,37 @@ impl QueryRoot {
|
|||||||
) -> Result<Vec<String>, Error> {
|
) -> Result<Vec<String>, Error> {
|
||||||
let nm = ctx.data_unchecked::<Notmuch>();
|
let nm = ctx.data_unchecked::<Notmuch>();
|
||||||
let pool = ctx.data_unchecked::<PgPool>();
|
let pool = ctx.data_unchecked::<PgPool>();
|
||||||
compute_catchup_ids(nm, pool, &query).await
|
let query: Query = query.parse()?;
|
||||||
|
// TODO: implement optimized versions of fetching just IDs
|
||||||
|
let newsreader_fut = newsreader_search(pool, None, None, None, None, &query);
|
||||||
|
let notmuch_fut = notmuch_search(nm, None, None, None, None, &query);
|
||||||
|
let (newsreader_results, notmuch_results) = join!(newsreader_fut, notmuch_fut);
|
||||||
|
|
||||||
|
let newsreader_results = newsreader_results?;
|
||||||
|
let notmuch_results = notmuch_results?;
|
||||||
|
info!(
|
||||||
|
"newsreader_results ({}) notmuch_results ({})",
|
||||||
|
newsreader_results.len(),
|
||||||
|
notmuch_results.len(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut results: Vec<_> = newsreader_results
|
||||||
|
.into_iter()
|
||||||
|
.chain(notmuch_results)
|
||||||
|
.collect();
|
||||||
|
// The leading '-' is to reverse sort
|
||||||
|
results.sort_by_key(|item| match item {
|
||||||
|
ThreadSummaryCursor::Newsreader(_, ts) => -ts.timestamp,
|
||||||
|
ThreadSummaryCursor::Notmuch(_, ts) => -ts.timestamp,
|
||||||
|
});
|
||||||
|
let ids = results
|
||||||
|
.into_iter()
|
||||||
|
.map(|r| match r {
|
||||||
|
ThreadSummaryCursor::Newsreader(_, ts) => ts.thread,
|
||||||
|
ThreadSummaryCursor::Notmuch(_, ts) => ts.thread,
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
Ok(ids)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: this function doesn't get parallelism, possibly because notmuch is sync and blocks,
|
// TODO: this function doesn't get parallelism, possibly because notmuch is sync and blocks,
|
||||||
@ -565,9 +593,9 @@ async fn tantivy_search(
|
|||||||
.collect())
|
.collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct MutationRoot;
|
pub struct Mutation;
|
||||||
#[Object]
|
#[Object]
|
||||||
impl MutationRoot {
|
impl Mutation {
|
||||||
#[instrument(skip_all, fields(query=query, unread=unread, rid=request_id()))]
|
#[instrument(skip_all, fields(query=query, unread=unread, rid=request_id()))]
|
||||||
async fn set_read_status<'ctx>(
|
async fn set_read_status<'ctx>(
|
||||||
&self,
|
&self,
|
||||||
@ -639,51 +667,4 @@ impl MutationRoot {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct SubscriptionRoot;
|
pub type GraphqlSchema = Schema<QueryRoot, Mutation, EmptySubscription>;
|
||||||
#[Subscription]
|
|
||||||
impl SubscriptionRoot {
|
|
||||||
async fn values(&self, _ctx: &Context<'_>) -> Result<impl Stream<Item = usize>, Error> {
|
|
||||||
Ok(stream::iter(0..10))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub type GraphqlSchema = Schema<QueryRoot, MutationRoot, SubscriptionRoot>;
|
|
||||||
|
|
||||||
#[instrument(skip_all, fields(query=query))]
|
|
||||||
pub async fn compute_catchup_ids(
|
|
||||||
nm: &Notmuch,
|
|
||||||
pool: &PgPool,
|
|
||||||
query: &str,
|
|
||||||
) -> Result<Vec<String>, Error> {
|
|
||||||
let query: Query = query.parse()?;
|
|
||||||
// TODO: implement optimized versions of fetching just IDs
|
|
||||||
let newsreader_fut = newsreader_search(pool, None, None, None, None, &query);
|
|
||||||
let notmuch_fut = notmuch_search(nm, None, None, None, None, &query);
|
|
||||||
let (newsreader_results, notmuch_results) = join!(newsreader_fut, notmuch_fut);
|
|
||||||
|
|
||||||
let newsreader_results = newsreader_results?;
|
|
||||||
let notmuch_results = notmuch_results?;
|
|
||||||
info!(
|
|
||||||
"newsreader_results ({}) notmuch_results ({})",
|
|
||||||
newsreader_results.len(),
|
|
||||||
notmuch_results.len(),
|
|
||||||
);
|
|
||||||
|
|
||||||
let mut results: Vec<_> = newsreader_results
|
|
||||||
.into_iter()
|
|
||||||
.chain(notmuch_results)
|
|
||||||
.collect();
|
|
||||||
// The leading '-' is to reverse sort
|
|
||||||
results.sort_by_key(|item| match item {
|
|
||||||
ThreadSummaryCursor::Newsreader(_, ts) => -ts.timestamp,
|
|
||||||
ThreadSummaryCursor::Notmuch(_, ts) => -ts.timestamp,
|
|
||||||
});
|
|
||||||
let ids = results
|
|
||||||
.into_iter()
|
|
||||||
.map(|r| match r {
|
|
||||||
ThreadSummaryCursor::Newsreader(_, ts) => ts.thread,
|
|
||||||
ThreadSummaryCursor::Notmuch(_, ts) => ts.thread,
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
Ok(ids)
|
|
||||||
}
|
|
||||||
|
|||||||
@ -1,10 +1,9 @@
|
|||||||
pub mod config;
|
pub mod config;
|
||||||
pub mod error;
|
pub mod error;
|
||||||
pub mod graphql;
|
pub mod graphql;
|
||||||
|
pub mod mail;
|
||||||
pub mod newsreader;
|
pub mod newsreader;
|
||||||
pub mod nm;
|
pub mod nm;
|
||||||
pub mod ws;
|
|
||||||
|
|
||||||
#[cfg(feature = "tantivy")]
|
#[cfg(feature = "tantivy")]
|
||||||
pub mod tantivy;
|
pub mod tantivy;
|
||||||
|
|
||||||
@ -19,7 +18,6 @@ use std::{
|
|||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use cacher::{Cacher, FilesystemCacher};
|
use cacher::{Cacher, FilesystemCacher};
|
||||||
use css_inline::{CSSInliner, InlineError, InlineOptions};
|
use css_inline::{CSSInliner, InlineError, InlineOptions};
|
||||||
pub use error::ServerError;
|
|
||||||
use linkify::{LinkFinder, LinkKind};
|
use linkify::{LinkFinder, LinkKind};
|
||||||
use log::{debug, error, info, warn};
|
use log::{debug, error, info, warn};
|
||||||
use lol_html::{
|
use lol_html::{
|
||||||
@ -35,6 +33,7 @@ use thiserror::Error;
|
|||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
error::ServerError,
|
||||||
graphql::{Corpus, ThreadSummary},
|
graphql::{Corpus, ThreadSummary},
|
||||||
newsreader::is_newsreader_thread,
|
newsreader::is_newsreader_thread,
|
||||||
nm::is_notmuch_thread_or_id,
|
nm::is_notmuch_thread_or_id,
|
||||||
@ -444,16 +443,19 @@ pub fn sanitize_html(
|
|||||||
let mut element_content_handlers = vec![
|
let mut element_content_handlers = vec![
|
||||||
// Remove width and height attributes on elements
|
// Remove width and height attributes on elements
|
||||||
element!("[width],[height]", |el| {
|
element!("[width],[height]", |el| {
|
||||||
|
println!("width or height {el:?}");
|
||||||
el.remove_attribute("width");
|
el.remove_attribute("width");
|
||||||
el.remove_attribute("height");
|
el.remove_attribute("height");
|
||||||
Ok(())
|
Ok(())
|
||||||
}),
|
}),
|
||||||
// Remove width and height values from inline styles
|
// Remove width and height values from inline styles
|
||||||
element!("[style]", |el| {
|
element!("[style]", |el| {
|
||||||
|
println!("style {el:?}");
|
||||||
let style = el.get_attribute("style").unwrap();
|
let style = el.get_attribute("style").unwrap();
|
||||||
let style = style
|
let style = style
|
||||||
.split(";")
|
.split(";")
|
||||||
.filter(|s| {
|
.filter(|s| {
|
||||||
|
println!("s {s}");
|
||||||
let Some((k, _)) = s.split_once(':') else {
|
let Some((k, _)) = s.split_once(':') else {
|
||||||
return true;
|
return true;
|
||||||
};
|
};
|
||||||
@ -465,6 +467,7 @@ pub fn sanitize_html(
|
|||||||
})
|
})
|
||||||
.collect::<Vec<_>>()
|
.collect::<Vec<_>>()
|
||||||
.join(";");
|
.join(";");
|
||||||
|
println!("style: {style}");
|
||||||
if let Err(e) = el.set_attribute("style", &style) {
|
if let Err(e) = el.set_attribute("style", &style) {
|
||||||
error!("Failed to set style attribute: {e}");
|
error!("Failed to set style attribute: {e}");
|
||||||
}
|
}
|
||||||
|
|||||||
113
server/src/mail.rs
Normal file
113
server/src/mail.rs
Normal file
@ -0,0 +1,113 @@
|
|||||||
|
use std::{fs::File, io::Read};
|
||||||
|
|
||||||
|
use mailparse::{
|
||||||
|
addrparse_header, dateparse, parse_mail, MailHeaderMap, MailParseError, ParsedMail,
|
||||||
|
};
|
||||||
|
use sqlx::postgres::PgPool;
|
||||||
|
use thiserror::Error;
|
||||||
|
use tracing::info;
|
||||||
|
|
||||||
|
#[derive(Error, Debug)]
|
||||||
|
pub enum MailError {
|
||||||
|
#[error("missing from header")]
|
||||||
|
MissingFrom,
|
||||||
|
#[error("missing from header display name")]
|
||||||
|
MissingFromDisplayName,
|
||||||
|
#[error("missing subject header")]
|
||||||
|
MissingSubject,
|
||||||
|
#[error("missing html part")]
|
||||||
|
MissingHtmlPart,
|
||||||
|
#[error("missing message ID")]
|
||||||
|
MissingMessageId,
|
||||||
|
#[error("missing date")]
|
||||||
|
MissingDate,
|
||||||
|
#[error("DB error {0}")]
|
||||||
|
SqlxError(#[from] sqlx::Error),
|
||||||
|
#[error("IO error {0}")]
|
||||||
|
IOError(#[from] std::io::Error),
|
||||||
|
#[error("mail parse error {0}")]
|
||||||
|
MailParseError(#[from] MailParseError),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn read_mail_to_db(pool: &PgPool, path: &str) -> Result<(), MailError> {
|
||||||
|
let mut file = File::open(path)?;
|
||||||
|
let mut buffer = Vec::new();
|
||||||
|
file.read_to_end(&mut buffer)?;
|
||||||
|
let m = parse_mail(&buffer)?;
|
||||||
|
|
||||||
|
let subject = m
|
||||||
|
.headers
|
||||||
|
.get_first_value("subject")
|
||||||
|
.ok_or(MailError::MissingSubject)?;
|
||||||
|
|
||||||
|
let from = addrparse_header(
|
||||||
|
m.headers
|
||||||
|
.get_first_header("from")
|
||||||
|
.ok_or(MailError::MissingFrom)?,
|
||||||
|
)?;
|
||||||
|
let from = from.extract_single_info().ok_or(MailError::MissingFrom)?;
|
||||||
|
let name = from.display_name.ok_or(MailError::MissingFromDisplayName)?;
|
||||||
|
let slug = name.to_lowercase().replace(' ', "-");
|
||||||
|
let url = from.addr;
|
||||||
|
let message_id = m
|
||||||
|
.headers
|
||||||
|
.get_first_value("Message-ID")
|
||||||
|
.ok_or(MailError::MissingMessageId)?;
|
||||||
|
let uid = &message_id;
|
||||||
|
let feed_id = find_feed(&pool, &name, &slug, &url).await?;
|
||||||
|
let date = dateparse(
|
||||||
|
&m.headers
|
||||||
|
.get_first_value("Date")
|
||||||
|
.ok_or(MailError::MissingDate)?,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
println!("Feed: {feed_id} Subject: {}", subject);
|
||||||
|
|
||||||
|
if let Some(_m) = first_html(&m) {
|
||||||
|
info!("add email {slug} {subject} {message_id} {date} {uid} {url}");
|
||||||
|
} else {
|
||||||
|
return Err(MailError::MissingHtmlPart.into());
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
fn first_html<'m>(m: &'m ParsedMail<'m>) -> Option<&'m ParsedMail<'m>> {
|
||||||
|
for ele in m.parts() {
|
||||||
|
if ele.ctype.mimetype == "text/html" {
|
||||||
|
return Some(ele);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
async fn find_feed(pool: &PgPool, name: &str, slug: &str, url: &str) -> Result<i32, MailError> {
|
||||||
|
match sqlx::query!(
|
||||||
|
r#"
|
||||||
|
SELECT id
|
||||||
|
FROM feed
|
||||||
|
WHERE slug = $1
|
||||||
|
"#,
|
||||||
|
slug
|
||||||
|
)
|
||||||
|
.fetch_one(pool)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Err(sqlx::Error::RowNotFound) => {
|
||||||
|
let rec = sqlx::query!(
|
||||||
|
r#"
|
||||||
|
INSERT INTO feed ( name, slug, url, homepage, selector )
|
||||||
|
VALUES ( $1, $2, $3, '', '' )
|
||||||
|
RETURNING id
|
||||||
|
"#,
|
||||||
|
name,
|
||||||
|
slug,
|
||||||
|
url
|
||||||
|
)
|
||||||
|
.fetch_one(pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
return Ok(rec.id);
|
||||||
|
}
|
||||||
|
Ok(rec) => return Ok(rec.id),
|
||||||
|
Err(e) => return Err(e.into()),
|
||||||
|
};
|
||||||
|
}
|
||||||
@ -338,7 +338,7 @@ pub async fn thread(
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn email_addresses(
|
fn email_addresses(
|
||||||
_path: &str,
|
path: &str,
|
||||||
m: &ParsedMail,
|
m: &ParsedMail,
|
||||||
header_name: &str,
|
header_name: &str,
|
||||||
) -> Result<Vec<Email>, ServerError> {
|
) -> Result<Vec<Email>, ServerError> {
|
||||||
@ -349,7 +349,9 @@ fn email_addresses(
|
|||||||
for ma in mal.into_inner() {
|
for ma in mal.into_inner() {
|
||||||
match ma {
|
match ma {
|
||||||
mailparse::MailAddr::Group(gi) => {
|
mailparse::MailAddr::Group(gi) => {
|
||||||
if !gi.group_name.contains("ndisclosed") {}
|
if !gi.group_name.contains("ndisclosed") {
|
||||||
|
println!("[{path}][{header_name}] Group: {gi}");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
mailparse::MailAddr::Single(s) => addrs.push(Email {
|
mailparse::MailAddr::Single(s) => addrs.push(Email {
|
||||||
name: s.display_name,
|
name: s.display_name,
|
||||||
|
|||||||
@ -1,35 +0,0 @@
|
|||||||
use std::{collections::HashMap, net::SocketAddr};
|
|
||||||
|
|
||||||
use axum::extract::ws::{Message, WebSocket};
|
|
||||||
use letterbox_shared::WebsocketMessage;
|
|
||||||
use tracing::{info, warn};
|
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
pub struct ConnectionTracker {
|
|
||||||
peers: HashMap<SocketAddr, WebSocket>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ConnectionTracker {
|
|
||||||
pub async fn add_peer(&mut self, socket: WebSocket, who: SocketAddr) {
|
|
||||||
warn!("adding {who:?} to connection tracker");
|
|
||||||
self.peers.insert(who, socket);
|
|
||||||
self.send_message_all(WebsocketMessage::RefreshMessages)
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
pub async fn send_message_all(&mut self, msg: WebsocketMessage) {
|
|
||||||
info!("send_message_all {msg}");
|
|
||||||
let m = serde_json::to_string(&msg).expect("failed to json encode WebsocketMessage");
|
|
||||||
let mut bad_peers = Vec::new();
|
|
||||||
for (who, socket) in &mut self.peers.iter_mut() {
|
|
||||||
if let Err(e) = socket.send(Message::Text(m.clone().into())).await {
|
|
||||||
warn!("{:?} is bad, scheduling for removal: {e}", who);
|
|
||||||
bad_peers.push(who.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for b in bad_peers {
|
|
||||||
info!("removing bad peer {b:?}");
|
|
||||||
self.peers.remove(&b);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -12,6 +12,5 @@ version.workspace = true
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
build-info = "0.0.40"
|
build-info = "0.0.40"
|
||||||
letterbox-notmuch = { version = "0.15.0", path = "../notmuch", registry = "xinu" }
|
letterbox-notmuch = { version = "0.12.1", path = "../notmuch", registry = "xinu" }
|
||||||
serde = { version = "1.0.147", features = ["derive"] }
|
serde = { version = "1.0.147", features = ["derive"] }
|
||||||
strum_macros = "0.27.1"
|
|
||||||
|
|||||||
@ -13,10 +13,8 @@ pub struct SearchResult {
|
|||||||
pub total: usize,
|
pub total: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, strum_macros::Display)]
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
pub enum WebsocketMessage {
|
pub struct Message {}
|
||||||
RefreshMessages,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub mod urls {
|
pub mod urls {
|
||||||
pub const MOUNT_POINT: &'static str = "/api";
|
pub const MOUNT_POINT: &'static str = "/api";
|
||||||
|
|||||||
@ -33,13 +33,10 @@ wasm-bindgen = "=0.2.100"
|
|||||||
uuid = { version = "1.13.1", features = [
|
uuid = { version = "1.13.1", features = [
|
||||||
"js",
|
"js",
|
||||||
] } # direct dep to set js feature, prevents Rng issues
|
] } # direct dep to set js feature, prevents Rng issues
|
||||||
letterbox-shared = { version = "0.15.0", path = "../shared", registry = "xinu" }
|
letterbox-shared = { version = "0.12.1", path = "../shared", registry = "xinu" }
|
||||||
letterbox-notmuch = { version = "0.15.0", path = "../notmuch", registry = "xinu" }
|
letterbox-notmuch = { version = "0.12.1", path = "../notmuch", registry = "xinu" }
|
||||||
seed_hooks = { version = "0.4.0", registry = "xinu" }
|
seed_hooks = { version = "0.4.0", registry = "xinu" }
|
||||||
strum_macros = "0.27.1"
|
strum_macros = "0.27.1"
|
||||||
gloo-console = "0.3.0"
|
|
||||||
[target.'cfg(target_arch = "wasm32")'.dependencies]
|
|
||||||
wasm-sockets = "1.0.0"
|
|
||||||
|
|
||||||
[package.metadata.wasm-pack.profile.release]
|
[package.metadata.wasm-pack.profile.release]
|
||||||
wasm-opt = ['-Os']
|
wasm-opt = ['-Os']
|
||||||
|
|||||||
@ -6,10 +6,6 @@ release = false
|
|||||||
address = "0.0.0.0"
|
address = "0.0.0.0"
|
||||||
port = 6758
|
port = 6758
|
||||||
|
|
||||||
[[proxy]]
|
|
||||||
ws = true
|
|
||||||
backend = "ws://localhost:9345/api/ws"
|
|
||||||
|
|
||||||
[[proxy]]
|
[[proxy]]
|
||||||
backend = "http://localhost:9345/api/"
|
backend = "http://localhost:9345/api/"
|
||||||
|
|
||||||
|
|||||||
@ -2,8 +2,6 @@
|
|||||||
// - it's useful when you want to check your code with `cargo make verify`
|
// - it's useful when you want to check your code with `cargo make verify`
|
||||||
// but some rules are too "annoying" or are not applicable for your case.)
|
// but some rules are too "annoying" or are not applicable for your case.)
|
||||||
#![allow(clippy::wildcard_imports)]
|
#![allow(clippy::wildcard_imports)]
|
||||||
// Until https://github.com/rust-lang/rust/issues/138762 is addressed in dependencies
|
|
||||||
#![allow(wasm_c_abi)]
|
|
||||||
|
|
||||||
use log::Level;
|
use log::Level;
|
||||||
use seed::App;
|
use seed::App;
|
||||||
@ -13,7 +11,6 @@ mod consts;
|
|||||||
mod graphql;
|
mod graphql;
|
||||||
mod state;
|
mod state;
|
||||||
mod view;
|
mod view;
|
||||||
mod websocket;
|
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
// This provides better error messages in debug mode.
|
// This provides better error messages in debug mode.
|
||||||
|
|||||||
@ -1,7 +1,6 @@
|
|||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
|
|
||||||
use graphql_client::GraphQLQuery;
|
use graphql_client::GraphQLQuery;
|
||||||
use letterbox_shared::WebsocketMessage;
|
|
||||||
use log::{debug, error, info, warn};
|
use log::{debug, error, info, warn};
|
||||||
use seed::{prelude::*, *};
|
use seed::{prelude::*, *};
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
@ -12,7 +11,6 @@ use crate::{
|
|||||||
consts::SEARCH_RESULTS_PER_PAGE,
|
consts::SEARCH_RESULTS_PER_PAGE,
|
||||||
graphql,
|
graphql,
|
||||||
graphql::{front_page_query::*, send_graphql, show_thread_query::*},
|
graphql::{front_page_query::*, send_graphql, show_thread_query::*},
|
||||||
websocket,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Used to fake the unread string while in development
|
/// Used to fake the unread string while in development
|
||||||
@ -40,11 +38,11 @@ pub fn init(url: Url, orders: &mut impl Orders<Msg>) -> Model {
|
|||||||
if url.hash().is_none() {
|
if url.hash().is_none() {
|
||||||
orders.request_url(urls::search(unread_query(), 0));
|
orders.request_url(urls::search(unread_query(), 0));
|
||||||
} else {
|
} else {
|
||||||
orders.request_url(url.clone());
|
orders.request_url(url);
|
||||||
};
|
};
|
||||||
// TODO(wathiede): only do this while viewing the index? Or maybe add a new message that force
|
// TODO(wathiede): only do this while viewing the index? Or maybe add a new message that force
|
||||||
// 'notmuch new' on the server periodically?
|
// 'notmuch new' on the server periodically?
|
||||||
//orders.stream(streams::interval(30_000, || Msg::RefreshStart));
|
orders.stream(streams::interval(30_000, || Msg::RefreshStart));
|
||||||
orders.subscribe(Msg::OnUrlChanged);
|
orders.subscribe(Msg::OnUrlChanged);
|
||||||
orders.stream(streams::window_event(Ev::Scroll, |_| Msg::WindowScrolled));
|
orders.stream(streams::window_event(Ev::Scroll, |_| Msg::WindowScrolled));
|
||||||
|
|
||||||
@ -62,7 +60,6 @@ pub fn init(url: Url, orders: &mut impl Orders<Msg>) -> Model {
|
|||||||
},
|
},
|
||||||
catchup: None,
|
catchup: None,
|
||||||
last_url: Url::current(),
|
last_url: Url::current(),
|
||||||
websocket: websocket::init(&mut orders.proxy(Msg::WebSocket)),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -662,18 +659,6 @@ pub fn update(msg: Msg, model: &mut Model, orders: &mut impl Orders<Msg>) {
|
|||||||
orders.send_msg(Msg::ScrollToTop);
|
orders.send_msg(Msg::ScrollToTop);
|
||||||
model.catchup = None;
|
model.catchup = None;
|
||||||
}
|
}
|
||||||
|
|
||||||
Msg::WebSocket(ws) => {
|
|
||||||
websocket::update(ws, &mut model.websocket, &mut orders.proxy(Msg::WebSocket));
|
|
||||||
while let Some(msg) = model.websocket.updates.pop_front() {
|
|
||||||
orders.send_msg(Msg::WebsocketMessage(msg));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Msg::WebsocketMessage(msg) => {
|
|
||||||
match msg {
|
|
||||||
WebsocketMessage::RefreshMessages => orders.send_msg(Msg::Refresh),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -706,7 +691,6 @@ pub struct Model {
|
|||||||
pub versions: Version,
|
pub versions: Version,
|
||||||
pub catchup: Option<Catchup>,
|
pub catchup: Option<Catchup>,
|
||||||
pub last_url: Url,
|
pub last_url: Url,
|
||||||
pub websocket: websocket::Model,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@ -838,7 +822,4 @@ pub enum Msg {
|
|||||||
CatchupMarkAsRead,
|
CatchupMarkAsRead,
|
||||||
CatchupNext,
|
CatchupNext,
|
||||||
CatchupExit,
|
CatchupExit,
|
||||||
|
|
||||||
WebSocket(websocket::Msg),
|
|
||||||
WebsocketMessage(WebsocketMessage),
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -12,7 +12,7 @@ use web_sys::{HtmlElement, HtmlInputElement};
|
|||||||
use crate::{
|
use crate::{
|
||||||
api::urls,
|
api::urls,
|
||||||
graphql::{front_page_query::*, show_thread_query::*},
|
graphql::{front_page_query::*, show_thread_query::*},
|
||||||
state::{unread_query, CatchupItem, Context, Model, Msg, RefreshingState, Tag, Version},
|
state::{CatchupItem, Context, Model, Msg, RefreshingState, Tag, Version},
|
||||||
};
|
};
|
||||||
|
|
||||||
// TODO(wathiede): create a QueryString enum that wraps single and multiple message ids and thread
|
// TODO(wathiede): create a QueryString enum that wraps single and multiple message ids and thread
|
||||||
@ -1210,18 +1210,13 @@ fn view_header(
|
|||||||
false
|
false
|
||||||
};
|
};
|
||||||
let query = Url::decode_uri_component(query).unwrap_or("".to_string());
|
let query = Url::decode_uri_component(query).unwrap_or("".to_string());
|
||||||
let filter_all = String::new();
|
|
||||||
let filter_unread = unread_query().to_string();
|
|
||||||
let filter_news = "is:unread is:news".to_string();
|
|
||||||
let filter_mail = "is:unread is:mail".to_string();
|
|
||||||
let highlight_color = "bg-sky-800";
|
|
||||||
|
|
||||||
nav![
|
nav![
|
||||||
C!["flex", "flex-col"],
|
C!["flex", "flex-col"],
|
||||||
div![
|
div![
|
||||||
C!["flex-auto", "flex"],
|
C!["flex-auto", "flex"],
|
||||||
button![
|
button![
|
||||||
C![IF!(is_error => "bg-red-500"), "rounded-none"],
|
C![IF![is_error => "bg-red-500"], "rounded-none"],
|
||||||
tw_classes::button(),
|
tw_classes::button(),
|
||||||
span![i![C![
|
span![i![C![
|
||||||
"fa-solid",
|
"fa-solid",
|
||||||
@ -1231,35 +1226,35 @@ fn view_header(
|
|||||||
ev(Ev::Click, |_| Msg::RefreshStart),
|
ev(Ev::Click, |_| Msg::RefreshStart),
|
||||||
],
|
],
|
||||||
button![
|
button![
|
||||||
IF!(query == filter_all => C![highlight_color]),
|
|
||||||
tw_classes::button(),
|
tw_classes::button(),
|
||||||
C!["grow", "rounded-none"],
|
C!["grow", "rounded-none"],
|
||||||
"All",
|
"All",
|
||||||
ev(Ev::Click, |_| Msg::SearchQuery(filter_all)),
|
ev(Ev::Click, |_| Msg::SearchQuery(String::new())),
|
||||||
],
|
],
|
||||||
button![
|
button![
|
||||||
IF!(query == filter_unread => C![highlight_color]),
|
|
||||||
tw_classes::button(),
|
tw_classes::button(),
|
||||||
C!["grow", "rounded-none"],
|
C!["grow", "rounded-none"],
|
||||||
span![i![C!["far", "fa-envelope"]]],
|
span![i![C!["far", "fa-envelope"]]],
|
||||||
" Unread",
|
" Unread",
|
||||||
ev(Ev::Click, |_| Msg::SearchQuery(filter_unread)),
|
ev(Ev::Click, |_| Msg::SearchQuery("is:unread".to_string())),
|
||||||
],
|
],
|
||||||
button![
|
button![
|
||||||
IF!(query == filter_news => C![highlight_color]),
|
|
||||||
tw_classes::button(),
|
tw_classes::button(),
|
||||||
C!["grow", "rounded-none"],
|
C!["grow", "rounded-none"],
|
||||||
span![i![C!["far", "fa-envelope"]]],
|
span![i![C!["far", "fa-envelope"]]],
|
||||||
" News",
|
" News",
|
||||||
ev(Ev::Click, |_| Msg::SearchQuery(filter_news)),
|
ev(Ev::Click, |_| Msg::SearchQuery(
|
||||||
|
"is:unread is:news".to_string()
|
||||||
|
)),
|
||||||
],
|
],
|
||||||
button![
|
button![
|
||||||
IF!(query == filter_mail => C![highlight_color]),
|
|
||||||
tw_classes::button(),
|
tw_classes::button(),
|
||||||
C!["grow", "rounded-none"],
|
C!["grow", "rounded-none"],
|
||||||
span![i![C!["far", "fa-envelope"]]],
|
span![i![C!["far", "fa-envelope"]]],
|
||||||
" Mail",
|
" Mail",
|
||||||
ev(Ev::Click, |_| Msg::SearchQuery(filter_mail)),
|
ev(Ev::Click, |_| Msg::SearchQuery(
|
||||||
|
"is:unread is:mail".to_string()
|
||||||
|
)),
|
||||||
],
|
],
|
||||||
],
|
],
|
||||||
div![
|
div![
|
||||||
|
|||||||
@ -1,218 +0,0 @@
|
|||||||
use std::{collections::VecDeque, rc::Rc};
|
|
||||||
|
|
||||||
use letterbox_shared::WebsocketMessage;
|
|
||||||
use log::{error, info};
|
|
||||||
use seed::prelude::*;
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
#[cfg(not(target_arch = "wasm32"))]
|
|
||||||
#[allow(dead_code)]
|
|
||||||
mod wasm_sockets {
|
|
||||||
use std::{cell::RefCell, rc::Rc};
|
|
||||||
|
|
||||||
use thiserror::Error;
|
|
||||||
use web_sys::{CloseEvent, ErrorEvent};
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct JsValue;
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum ConnectionStatus {
|
|
||||||
/// Connecting to a server
|
|
||||||
Connecting,
|
|
||||||
/// Connected to a server
|
|
||||||
Connected,
|
|
||||||
/// Disconnected from a server due to an error
|
|
||||||
Error,
|
|
||||||
/// Disconnected from a server without an error
|
|
||||||
Disconnected,
|
|
||||||
}
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct EventClient {
|
|
||||||
pub status: Rc<RefCell<ConnectionStatus>>,
|
|
||||||
}
|
|
||||||
impl EventClient {
|
|
||||||
pub fn new(_: &str) -> Result<Self, WebSocketError> {
|
|
||||||
todo!("this is a mock")
|
|
||||||
}
|
|
||||||
pub fn send_string(&self, _essage: &str) -> Result<(), JsValue> {
|
|
||||||
todo!("this is a mock")
|
|
||||||
}
|
|
||||||
pub fn set_on_error(&mut self, _: Option<Box<dyn Fn(ErrorEvent)>>) {
|
|
||||||
todo!("this is a mock")
|
|
||||||
}
|
|
||||||
pub fn set_on_connection(&mut self, _: Option<Box<dyn Fn(&EventClient)>>) {
|
|
||||||
todo!("this is a mock")
|
|
||||||
}
|
|
||||||
pub fn set_on_close(&mut self, _: Option<Box<dyn Fn(CloseEvent)>>) {
|
|
||||||
todo!("this is a mock")
|
|
||||||
}
|
|
||||||
pub fn set_on_message(&mut self, _: Option<Box<dyn Fn(&EventClient, Message)>>) {
|
|
||||||
todo!("this is a mock")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub enum Message {
|
|
||||||
Text(String),
|
|
||||||
Binary(Vec<u8>),
|
|
||||||
}
|
|
||||||
#[derive(Debug, Clone, Error)]
|
|
||||||
pub enum WebSocketError {}
|
|
||||||
}
|
|
||||||
#[cfg(not(target_arch = "wasm32"))]
|
|
||||||
use wasm_sockets::{ConnectionStatus, EventClient, Message, WebSocketError};
|
|
||||||
#[cfg(target_arch = "wasm32")]
|
|
||||||
use wasm_sockets::{ConnectionStatus, EventClient, Message, WebSocketError};
|
|
||||||
use web_sys::CloseEvent;
|
|
||||||
|
|
||||||
/// Message from the server to the client.
|
|
||||||
#[derive(Serialize, Deserialize)]
|
|
||||||
pub struct ServerMessage {
|
|
||||||
pub id: usize,
|
|
||||||
pub text: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Message from the client to the server.
|
|
||||||
#[derive(Serialize, Deserialize)]
|
|
||||||
pub struct ClientMessage {
|
|
||||||
pub text: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
//const WS_URL: &str = "wss://9000.z.xinu.tv/api/ws";
|
|
||||||
//const WS_URL: &str = "wss://9345.z.xinu.tv/api/graphql/ws";
|
|
||||||
const WS_URL: &str = "wss://6758.z.xinu.tv/api/ws";
|
|
||||||
|
|
||||||
// ------ ------
|
|
||||||
// Model
|
|
||||||
// ------ ------
|
|
||||||
|
|
||||||
pub struct Model {
|
|
||||||
web_socket: EventClient,
|
|
||||||
web_socket_reconnector: Option<StreamHandle>,
|
|
||||||
pub updates: VecDeque<WebsocketMessage>,
|
|
||||||
}
|
|
||||||
|
|
||||||
// ------ ------
|
|
||||||
// Init
|
|
||||||
// ------ ------
|
|
||||||
|
|
||||||
pub fn init(orders: &mut impl Orders<Msg>) -> Model {
|
|
||||||
Model {
|
|
||||||
web_socket: create_websocket(orders).unwrap(),
|
|
||||||
web_socket_reconnector: None,
|
|
||||||
updates: VecDeque::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ------ ------
|
|
||||||
// Update
|
|
||||||
// ------ ------
|
|
||||||
|
|
||||||
pub enum Msg {
|
|
||||||
WebSocketOpened,
|
|
||||||
TextMessageReceived(WebsocketMessage),
|
|
||||||
WebSocketClosed(CloseEvent),
|
|
||||||
WebSocketFailed,
|
|
||||||
ReconnectWebSocket(usize),
|
|
||||||
#[allow(dead_code)]
|
|
||||||
SendMessage(ClientMessage),
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn update(msg: Msg, model: &mut Model, orders: &mut impl Orders<Msg>) {
|
|
||||||
match msg {
|
|
||||||
Msg::WebSocketOpened => {
|
|
||||||
model.web_socket_reconnector = None;
|
|
||||||
info!("WebSocket connection is open now");
|
|
||||||
}
|
|
||||||
Msg::TextMessageReceived(msg) => {
|
|
||||||
model.updates.push_back(msg);
|
|
||||||
}
|
|
||||||
Msg::WebSocketClosed(close_event) => {
|
|
||||||
info!(
|
|
||||||
r#"==================
|
|
||||||
WebSocket connection was closed:
|
|
||||||
Clean: {0}
|
|
||||||
Code: {1}
|
|
||||||
Reason: {2}
|
|
||||||
=================="#,
|
|
||||||
close_event.was_clean(),
|
|
||||||
close_event.code(),
|
|
||||||
close_event.reason()
|
|
||||||
);
|
|
||||||
|
|
||||||
// Chrome doesn't invoke `on_error` when the connection is lost.
|
|
||||||
if !close_event.was_clean() && model.web_socket_reconnector.is_none() {
|
|
||||||
model.web_socket_reconnector = Some(
|
|
||||||
orders.stream_with_handle(streams::backoff(None, Msg::ReconnectWebSocket)),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Msg::WebSocketFailed => {
|
|
||||||
info!("WebSocket failed");
|
|
||||||
if model.web_socket_reconnector.is_none() {
|
|
||||||
model.web_socket_reconnector = Some(
|
|
||||||
orders.stream_with_handle(streams::backoff(None, Msg::ReconnectWebSocket)),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Msg::ReconnectWebSocket(retries) => {
|
|
||||||
info!("Reconnect attempt: {}", retries);
|
|
||||||
model.web_socket = create_websocket(orders).unwrap();
|
|
||||||
}
|
|
||||||
Msg::SendMessage(msg) => {
|
|
||||||
let txt = serde_json::to_string(&msg).unwrap();
|
|
||||||
model.web_socket.send_string(&txt).unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn create_websocket(orders: &impl Orders<Msg>) -> Result<EventClient, WebSocketError> {
|
|
||||||
let msg_sender = orders.msg_sender();
|
|
||||||
|
|
||||||
let mut client = EventClient::new(WS_URL)?;
|
|
||||||
|
|
||||||
client.set_on_error(Some(Box::new(|error| {
|
|
||||||
gloo_console::error!("WS: ", error);
|
|
||||||
})));
|
|
||||||
|
|
||||||
let send = msg_sender.clone();
|
|
||||||
client.set_on_connection(Some(Box::new(move |client: &EventClient| {
|
|
||||||
info!("{:#?}", client.status);
|
|
||||||
let msg = match *client.status.borrow() {
|
|
||||||
ConnectionStatus::Connecting => {
|
|
||||||
info!("Connecting...");
|
|
||||||
None
|
|
||||||
}
|
|
||||||
ConnectionStatus::Connected => Some(Msg::WebSocketOpened),
|
|
||||||
ConnectionStatus::Error => Some(Msg::WebSocketFailed),
|
|
||||||
ConnectionStatus::Disconnected => {
|
|
||||||
info!("Disconnected");
|
|
||||||
None
|
|
||||||
}
|
|
||||||
};
|
|
||||||
send(msg);
|
|
||||||
})));
|
|
||||||
|
|
||||||
let send = msg_sender.clone();
|
|
||||||
client.set_on_close(Some(Box::new(move |ev| {
|
|
||||||
info!("WS: Connection closed");
|
|
||||||
send(Some(Msg::WebSocketClosed(ev)));
|
|
||||||
})));
|
|
||||||
|
|
||||||
let send = msg_sender.clone();
|
|
||||||
client.set_on_message(Some(Box::new(move |_: &EventClient, msg: Message| {
|
|
||||||
decode_message(msg, Rc::clone(&send))
|
|
||||||
})));
|
|
||||||
|
|
||||||
Ok(client)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn decode_message(message: Message, msg_sender: Rc<dyn Fn(Option<Msg>)>) {
|
|
||||||
match message {
|
|
||||||
Message::Text(txt) => {
|
|
||||||
let msg: WebsocketMessage = serde_json::from_str(&txt).unwrap_or_else(|e| {
|
|
||||||
panic!("failed to parse json into WebsocketMessage: {e}\n'{txt}'")
|
|
||||||
});
|
|
||||||
msg_sender(Some(Msg::TextMessageReceived(msg)));
|
|
||||||
}
|
|
||||||
m => error!("unexpected message type received of {m:?}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Loading…
x
Reference in New Issue
Block a user