diff --git a/Cargo.lock b/Cargo.lock index cd7a683..356774a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -448,6 +448,40 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-extra" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45bf463831f5131b7d3c756525b305d40f1185b688565648a92e1392ca35713d" +dependencies = [ + "axum 0.8.3", + "axum-core 0.5.2", + "bytes 1.10.1", + "futures-util", + "headers", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "serde", + "tower 0.5.2", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-macros" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "604fde5e028fea851ce1d8570bbdc034bec850d157f7569d10f347d06808c05c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -1952,6 +1986,19 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2" +[[package]] +name = "gloo-console" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a17868f56b4a24f677b17c8cb69958385102fa879418052d60b50bc1727e261" +dependencies = [ + "gloo-utils 0.2.0", + "js-sys", + "serde", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "gloo-events" version = "0.1.2" @@ -2228,6 +2275,30 @@ dependencies = [ "num-traits", ] +[[package]] +name = "headers" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "322106e6bd0cba2d5ead589ddb8150a13d7c4217cf80d7c4f682ca994ccc6aa9" +dependencies = [ + "base64 0.21.7", + "bytes 1.10.1", + "headers-core", + "http 1.3.1", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4" +dependencies = [ + "http 1.3.1", +] + [[package]] name = "heck" version = "0.4.1" @@ -2977,6 +3048,8 @@ dependencies = [ "async-graphql-axum", "async-trait", "axum 0.8.3", + "axum-extra", + "axum-macros", "build-info", "build-info-build", "cacher", @@ -2984,6 +3057,7 @@ dependencies = [ "clap", "css-inline", "futures 0.3.31", + "headers", "html-escape", "letterbox-notmuch", "letterbox-shared", @@ -3027,6 +3101,7 @@ dependencies = [ "chrono", "console_error_panic_hook", "console_log", + "gloo-console", "gloo-net", "graphql_client", "human_format", diff --git a/server/Cargo.toml b/server/Cargo.toml index f131767..c62e115 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -17,13 +17,16 @@ anyhow = "1.0.79" async-graphql = { version = "7", features = ["log"] } async-graphql-axum = "7.0.15" async-trait = "0.1.81" -axum = "0.8.1" +axum = { version = "0.8.3", features = ["ws"] } +axum-extra = { version = "0.10.1", features = ["typed-header"] } +axum-macros = "0.5.0" build-info = "0.0.40" cacher = { version = "0.2.0", registry = "xinu" } chrono = "0.4.39" clap = { version = "4.5.23", features = ["derive"] } css-inline = "0.14.0" futures = "0.3.31" +headers = "0.4.0" html-escape = "0.2.13" letterbox-notmuch = { version = "0.12.1", path = "../notmuch", registry = "xinu" } letterbox-shared = { version = "0.12.1", path = "../shared", registry = "xinu" } diff --git a/server/src/bin/letterbox-server.rs b/server/src/bin/letterbox-server.rs index dce9a9b..58dfcb1 100644 --- a/server/src/bin/letterbox-server.rs +++ b/server/src/bin/letterbox-server.rs @@ -1,15 +1,19 @@ // Rocket generates a lot of warnings for handlers // TODO: figure out why #![allow(unreachable_patterns)] -use std::{error::Error, io::Cursor, str::FromStr}; +use std::{error::Error, io::Cursor, net::SocketAddr, str::FromStr, sync::Arc}; use async_graphql::{extensions, http::GraphiQLSource, Schema}; use async_graphql_axum::{GraphQL, GraphQLSubscription}; +//allows to extract the IP of connecting user +use axum::extract::connect_info::ConnectInfo; use axum::{ + extract::{ws::WebSocketUpgrade, State}, response::{self, IntoResponse}, - routing::get, + routing::{any, get}, Router, }; +use axum_extra::TypedHeader; use cacher::FilesystemCacher; use letterbox_notmuch::{Notmuch, NotmuchError, ThreadSet}; #[cfg(feature = "tantivy")] @@ -19,10 +23,13 @@ use letterbox_server::{ error::ServerError, graphql::{Attachment, GraphqlSchema, MutationRoot, QueryRoot, SubscriptionRoot}, nm::{attachment_bytes, cid_attachment_bytes}, + ws::ConnectionTracker, }; +use letterbox_shared::WebsocketMessage; use sqlx::postgres::PgPool; -use tokio::net::TcpListener; -use tower_http::trace::TraceLayer; +use tokio::{net::TcpListener, sync::Mutex}; +use tower_http::trace::{DefaultMakeSpan, TraceLayer}; +use tracing::{error, info}; /* #[get("/show//pretty")] @@ -245,36 +252,85 @@ async fn main() -> Result<(), Box> { async fn graphiql() -> impl IntoResponse { response::Html( GraphiQLSource::build() - .endpoint("/api/") - .subscription_endpoint("/api/ws") + .endpoint("/api/graphql/") + .subscription_endpoint("/api/graphql/ws") .finish(), ) } +async fn start_ws( + ws: WebSocketUpgrade, + ConnectInfo(addr): ConnectInfo, + State(connection_tracker): State>>, +) -> impl IntoResponse { + ws.on_upgrade(async move |socket| connection_tracker.lock().await.add_peer(socket, addr)) +} +#[axum_macros::debug_handler] +async fn test_handler( + State(connection_tracker): State>>, +) -> impl IntoResponse { + connection_tracker + .lock() + .await + .send_message_all(WebsocketMessage::RefreshMessages) + .await; + "test triggered" +} + #[tokio::main] async fn main() -> Result<(), Box> { let _guard = xtracing::init(env!("CARGO_BIN_NAME"))?; + build_info::build_info!(fn bi); + info!("Build Info: {}", letterbox_shared::build_version(bi)); + // TODO: move these to config + let port = 9345; + let config = Config { + newsreader_database_url: "postgres://newsreader@nixos-07.h.xinu.tv/newsreader".to_string(), + newsreader_tantivy_db_path: "../target/database/newsreader".to_string(), + slurp_cache_path: "/tmp/letterbox/slurp".to_string(), + }; + if !std::fs::exists(&config.slurp_cache_path)? { + info!("Creating slurp cache @ '{}'", &config.slurp_cache_path); + std::fs::create_dir_all(&config.slurp_cache_path)?; + } + let pool = PgPool::connect(&config.newsreader_database_url).await?; + sqlx::migrate!("./migrations").run(&pool).await?; + #[cfg(feature = "tantivy")] + let tantivy_conn = TantivyConnection::new(&config.newsreader_tantivy_db_path)?; + + let cacher = FilesystemCacher::new(&config.slurp_cache_path)?; let schema = Schema::build(QueryRoot, MutationRoot, SubscriptionRoot) - //.data(Storage::default()) - .finish(); + .data(Notmuch::default()) + .data(cacher) + .data(pool.clone()); + + let schema = schema.extension(extensions::Logger).finish(); + + let conn_tracker = Arc::new(Mutex::new(ConnectionTracker::default())); let app = Router::new() + .route("/test", get(test_handler)) + .route("/api/ws", any(start_ws)) + .route_service("/api/graphql/ws", GraphQLSubscription::new(schema.clone())) .route( - "/api/", + "/api/graphql/", get(graphiql).post_service(GraphQL::new(schema.clone())), ) - .route_service("/api/ws", GraphQLSubscription::new(schema)) + .with_state(conn_tracker) .layer( TraceLayer::new_for_http() - .on_request(tower_http::trace::DefaultOnRequest::new().level(tracing::Level::INFO)) - .on_response( - tower_http::trace::DefaultOnResponse::new().level(tracing::Level::INFO), - ) - .on_failure(tower_http::trace::DefaultOnFailure::new().level(tracing::Level::WARN)), + .make_span_with(DefaultMakeSpan::default().include_headers(true)), ); - axum::serve(TcpListener::bind("0.0.0.0:9345").await.unwrap(), app) + let listener = TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))) .await .unwrap(); + tracing::info!("listening on {}", listener.local_addr().unwrap()); + axum::serve( + listener, + app.into_make_service_with_connect_info::(), + ) + .await + .unwrap(); Ok(()) } diff --git a/server/src/lib.rs b/server/src/lib.rs index 418c437..1cd0d80 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -4,6 +4,8 @@ pub mod graphql; pub mod mail; pub mod newsreader; pub mod nm; +pub mod ws; + #[cfg(feature = "tantivy")] pub mod tantivy; diff --git a/server/src/ws.rs b/server/src/ws.rs new file mode 100644 index 0000000..e4dba8e --- /dev/null +++ b/server/src/ws.rs @@ -0,0 +1,32 @@ +use std::{collections::HashMap, net::SocketAddr}; + +use axum::extract::ws::{Message, WebSocket}; +use letterbox_shared::WebsocketMessage; +use tracing::{info, warn}; + +#[derive(Default)] +pub struct ConnectionTracker { + peers: HashMap, +} + +impl ConnectionTracker { + pub fn add_peer(&mut self, socket: WebSocket, who: SocketAddr) { + warn!("adding {who:?} to connection tracker"); + self.peers.insert(who, socket); + } + pub async fn send_message_all(&mut self, msg: WebsocketMessage) { + let m = serde_json::to_string(&msg).expect("failed to json encode WebsocketMessage"); + let mut bad_peers = Vec::new(); + for (who, socket) in &mut self.peers.iter_mut() { + if let Err(e) = socket.send(Message::Text(m.clone().into())).await { + warn!("{:?} is bad, scheduling for removal: {e}", who); + bad_peers.push(who.clone()); + } + } + + for b in bad_peers { + info!("removing bad peer {b:?}"); + self.peers.remove(&b); + } + } +} diff --git a/shared/src/lib.rs b/shared/src/lib.rs index 9c41b0d..3724eb6 100644 --- a/shared/src/lib.rs +++ b/shared/src/lib.rs @@ -14,7 +14,9 @@ pub struct SearchResult { } #[derive(Serialize, Deserialize, Debug)] -pub struct Message {} +pub enum WebsocketMessage { + RefreshMessages, +} pub mod urls { pub const MOUNT_POINT: &'static str = "/api"; diff --git a/web/Cargo.toml b/web/Cargo.toml index 72cde9a..73a8f6a 100644 --- a/web/Cargo.toml +++ b/web/Cargo.toml @@ -38,6 +38,7 @@ letterbox-notmuch = { version = "0.12.1", path = "../notmuch", registry = "xinu" seed_hooks = { version = "0.4.0", registry = "xinu" } strum_macros = "0.27.1" wasm-sockets = "1.0.0" +gloo-console = "0.3.0" [package.metadata.wasm-pack.profile.release] wasm-opt = ['-Os'] diff --git a/web/src/state.rs b/web/src/state.rs index 12d7750..b3c0f35 100644 --- a/web/src/state.rs +++ b/web/src/state.rs @@ -43,7 +43,7 @@ pub fn init(url: Url, orders: &mut impl Orders) -> Model { }; // TODO(wathiede): only do this while viewing the index? Or maybe add a new message that force // 'notmuch new' on the server periodically? - orders.stream(streams::interval(30_000, || Msg::RefreshStart)); + //orders.stream(streams::interval(30_000, || Msg::RefreshStart)); orders.subscribe(Msg::OnUrlChanged); orders.stream(streams::window_event(Ev::Scroll, |_| Msg::WindowScrolled)); diff --git a/web/src/websocket.rs b/web/src/websocket.rs new file mode 100644 index 0000000..c4b32f3 --- /dev/null +++ b/web/src/websocket.rs @@ -0,0 +1,167 @@ +use std::rc::Rc; + +use letterbox_shared::WebsocketMessage; +use log::{error, info}; +use seed::{prelude::*, *}; +use serde::{Deserialize, Serialize}; +use wasm_sockets::{self, ConnectionStatus, EventClient, Message, WebSocketError}; +use web_sys::CloseEvent; + +/// Message from the server to the client. +#[derive(Serialize, Deserialize)] +pub struct ServerMessage { + pub id: usize, + pub text: String, +} + +/// Message from the client to the server. +#[derive(Serialize, Deserialize)] +pub struct ClientMessage { + pub text: String, +} + +//const WS_URL: &str = "wss://9000.z.xinu.tv/api/ws"; +//const WS_URL: &str = "wss://9345.z.xinu.tv/api/graphql/ws"; +const WS_URL: &str = "wss://6758.z.xinu.tv/api/ws"; + +// ------ ------ +// Model +// ------ ------ + +pub struct Model { + web_socket: EventClient, + web_socket_reconnector: Option, +} + +// ------ ------ +// Init +// ------ ------ + +pub fn init(_: Url, orders: &mut impl Orders) -> Model { + Model { + web_socket: create_websocket(orders).unwrap(), + web_socket_reconnector: None, + } +} + +// ------ ------ +// Update +// ------ ------ + +pub enum Msg { + WebSocketOpened, + TextMessageReceived(String), + BinaryMessageReceived(ServerMessage), + CloseWebSocket, + WebSocketClosed(CloseEvent), + WebSocketFailed, + ReconnectWebSocket(usize), + SendMessage(ClientMessage), + SendBinaryMessage(ClientMessage), +} + +pub fn update(msg: Msg, mut model: &mut Model, orders: &mut impl Orders) { + match msg { + Msg::WebSocketOpened => { + model.web_socket_reconnector = None; + info!("WebSocket connection is open now"); + } + Msg::TextMessageReceived(msg) => { + info!("recieved text {}", msg); + } + Msg::BinaryMessageReceived(message) => { + error!("Client received binary message"); + } + Msg::CloseWebSocket => { + model.web_socket_reconnector = None; + model.web_socket.close().unwrap(); + } + Msg::WebSocketClosed(close_event) => { + info!("=================="); + info!("WebSocket connection was closed:"); + info!("Clean: {}", close_event.was_clean()); + info!("Code: {}", close_event.code()); + info!("Reason: {}", close_event.reason()); + info!("=================="); + + // Chrome doesn't invoke `on_error` when the connection is lost. + if !close_event.was_clean() && model.web_socket_reconnector.is_none() { + model.web_socket_reconnector = Some( + orders.stream_with_handle(streams::backoff(None, Msg::ReconnectWebSocket)), + ); + } + } + Msg::WebSocketFailed => { + info!("WebSocket failed"); + if model.web_socket_reconnector.is_none() { + model.web_socket_reconnector = Some( + orders.stream_with_handle(streams::backoff(None, Msg::ReconnectWebSocket)), + ); + } + } + Msg::ReconnectWebSocket(retries) => { + info!("Reconnect attempt: {}", retries); + model.web_socket = create_websocket(orders).unwrap(); + } + Msg::SendMessage(msg) => { + let txt = serde_json::to_string(&msg).unwrap(); + model.web_socket.send_string(&txt).unwrap(); + } + Msg::SendBinaryMessage(_msg) => { + error!("Attempt to send binary message, unsupported"); + } + } +} + +fn create_websocket(orders: &impl Orders) -> Result { + let msg_sender = orders.msg_sender(); + + let mut client = EventClient::new(WS_URL)?; + + client.set_on_error(Some(Box::new(|error| { + gloo_console::error!("WS: ", error); + }))); + + let send = msg_sender.clone(); + client.set_on_connection(Some(Box::new(move |client: &EventClient| { + info!("{:#?}", client.status); + let msg = match *client.status.borrow() { + ConnectionStatus::Connecting => { + info!("Connecting..."); + None + } + ConnectionStatus::Connected => Some(Msg::WebSocketOpened), + ConnectionStatus::Error => Some(Msg::WebSocketFailed), + ConnectionStatus::Disconnected => { + info!("Disconnected"); + None + } + }; + send(msg); + }))); + + let send = msg_sender.clone(); + client.set_on_close(Some(Box::new(move |ev| { + info!("WS: Connection closed"); + send(Some(Msg::WebSocketClosed(ev))); + }))); + + let send = msg_sender.clone(); + client.set_on_message(Some(Box::new( + move |_: &EventClient, msg: wasm_sockets::Message| decode_message(msg, Rc::clone(&send)), + ))); + + Ok(client) +} + +fn decode_message(message: Message, msg_sender: Rc)>) { + match message { + Message::Text(txt) => { + let msg: WebsocketMessage = serde_json::from_str(&txt).unwrap_or_else(|e| { + panic!("failed to parse json into WebsocketMessage: {e}\n'{txt}'") + }); + msg_sender(Some(Msg::TextMessageReceived(txt))); + } + m => error!("unexpected message type received of {m:?}"), + } +}