Add websocket handler on server, connect from client

Additionally add /test handler that triggers server->client WS message
This commit is contained in:
Bill Thiede 2025-04-14 20:46:52 -07:00
parent b2c73ffa15
commit f2042f284e
9 changed files with 357 additions and 19 deletions

75
Cargo.lock generated
View File

@ -448,6 +448,40 @@ dependencies = [
"tracing",
]
[[package]]
name = "axum-extra"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "45bf463831f5131b7d3c756525b305d40f1185b688565648a92e1392ca35713d"
dependencies = [
"axum 0.8.3",
"axum-core 0.5.2",
"bytes 1.10.1",
"futures-util",
"headers",
"http 1.3.1",
"http-body 1.0.1",
"http-body-util",
"mime",
"pin-project-lite",
"rustversion",
"serde",
"tower 0.5.2",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum-macros"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "604fde5e028fea851ce1d8570bbdc034bec850d157f7569d10f347d06808c05c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.100",
]
[[package]]
name = "backtrace"
version = "0.3.74"
@ -1952,6 +1986,19 @@ version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2"
[[package]]
name = "gloo-console"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a17868f56b4a24f677b17c8cb69958385102fa879418052d60b50bc1727e261"
dependencies = [
"gloo-utils 0.2.0",
"js-sys",
"serde",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "gloo-events"
version = "0.1.2"
@ -2228,6 +2275,30 @@ dependencies = [
"num-traits",
]
[[package]]
name = "headers"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "322106e6bd0cba2d5ead589ddb8150a13d7c4217cf80d7c4f682ca994ccc6aa9"
dependencies = [
"base64 0.21.7",
"bytes 1.10.1",
"headers-core",
"http 1.3.1",
"httpdate",
"mime",
"sha1",
]
[[package]]
name = "headers-core"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4"
dependencies = [
"http 1.3.1",
]
[[package]]
name = "heck"
version = "0.4.1"
@ -2977,6 +3048,8 @@ dependencies = [
"async-graphql-axum",
"async-trait",
"axum 0.8.3",
"axum-extra",
"axum-macros",
"build-info",
"build-info-build",
"cacher",
@ -2984,6 +3057,7 @@ dependencies = [
"clap",
"css-inline",
"futures 0.3.31",
"headers",
"html-escape",
"letterbox-notmuch",
"letterbox-shared",
@ -3027,6 +3101,7 @@ dependencies = [
"chrono",
"console_error_panic_hook",
"console_log",
"gloo-console",
"gloo-net",
"graphql_client",
"human_format",

View File

@ -17,13 +17,16 @@ anyhow = "1.0.79"
async-graphql = { version = "7", features = ["log"] }
async-graphql-axum = "7.0.15"
async-trait = "0.1.81"
axum = "0.8.1"
axum = { version = "0.8.3", features = ["ws"] }
axum-extra = { version = "0.10.1", features = ["typed-header"] }
axum-macros = "0.5.0"
build-info = "0.0.40"
cacher = { version = "0.2.0", registry = "xinu" }
chrono = "0.4.39"
clap = { version = "4.5.23", features = ["derive"] }
css-inline = "0.14.0"
futures = "0.3.31"
headers = "0.4.0"
html-escape = "0.2.13"
letterbox-notmuch = { version = "0.12.1", path = "../notmuch", registry = "xinu" }
letterbox-shared = { version = "0.12.1", path = "../shared", registry = "xinu" }

View File

@ -1,15 +1,19 @@
// Rocket generates a lot of warnings for handlers
// TODO: figure out why
#![allow(unreachable_patterns)]
use std::{error::Error, io::Cursor, str::FromStr};
use std::{error::Error, io::Cursor, net::SocketAddr, str::FromStr, sync::Arc};
use async_graphql::{extensions, http::GraphiQLSource, Schema};
use async_graphql_axum::{GraphQL, GraphQLSubscription};
//allows to extract the IP of connecting user
use axum::extract::connect_info::ConnectInfo;
use axum::{
extract::{ws::WebSocketUpgrade, State},
response::{self, IntoResponse},
routing::get,
routing::{any, get},
Router,
};
use axum_extra::TypedHeader;
use cacher::FilesystemCacher;
use letterbox_notmuch::{Notmuch, NotmuchError, ThreadSet};
#[cfg(feature = "tantivy")]
@ -19,10 +23,13 @@ use letterbox_server::{
error::ServerError,
graphql::{Attachment, GraphqlSchema, MutationRoot, QueryRoot, SubscriptionRoot},
nm::{attachment_bytes, cid_attachment_bytes},
ws::ConnectionTracker,
};
use letterbox_shared::WebsocketMessage;
use sqlx::postgres::PgPool;
use tokio::net::TcpListener;
use tower_http::trace::TraceLayer;
use tokio::{net::TcpListener, sync::Mutex};
use tower_http::trace::{DefaultMakeSpan, TraceLayer};
use tracing::{error, info};
/*
#[get("/show/<query>/pretty")]
@ -245,36 +252,85 @@ async fn main() -> Result<(), Box<dyn Error>> {
async fn graphiql() -> impl IntoResponse {
response::Html(
GraphiQLSource::build()
.endpoint("/api/")
.subscription_endpoint("/api/ws")
.endpoint("/api/graphql/")
.subscription_endpoint("/api/graphql/ws")
.finish(),
)
}
async fn start_ws(
ws: WebSocketUpgrade,
ConnectInfo(addr): ConnectInfo<SocketAddr>,
State(connection_tracker): State<Arc<Mutex<ConnectionTracker>>>,
) -> impl IntoResponse {
ws.on_upgrade(async move |socket| connection_tracker.lock().await.add_peer(socket, addr))
}
#[axum_macros::debug_handler]
async fn test_handler(
State(connection_tracker): State<Arc<Mutex<ConnectionTracker>>>,
) -> impl IntoResponse {
connection_tracker
.lock()
.await
.send_message_all(WebsocketMessage::RefreshMessages)
.await;
"test triggered"
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let _guard = xtracing::init(env!("CARGO_BIN_NAME"))?;
build_info::build_info!(fn bi);
info!("Build Info: {}", letterbox_shared::build_version(bi));
// TODO: move these to config
let port = 9345;
let config = Config {
newsreader_database_url: "postgres://newsreader@nixos-07.h.xinu.tv/newsreader".to_string(),
newsreader_tantivy_db_path: "../target/database/newsreader".to_string(),
slurp_cache_path: "/tmp/letterbox/slurp".to_string(),
};
if !std::fs::exists(&config.slurp_cache_path)? {
info!("Creating slurp cache @ '{}'", &config.slurp_cache_path);
std::fs::create_dir_all(&config.slurp_cache_path)?;
}
let pool = PgPool::connect(&config.newsreader_database_url).await?;
sqlx::migrate!("./migrations").run(&pool).await?;
#[cfg(feature = "tantivy")]
let tantivy_conn = TantivyConnection::new(&config.newsreader_tantivy_db_path)?;
let cacher = FilesystemCacher::new(&config.slurp_cache_path)?;
let schema = Schema::build(QueryRoot, MutationRoot, SubscriptionRoot)
//.data(Storage::default())
.finish();
.data(Notmuch::default())
.data(cacher)
.data(pool.clone());
let schema = schema.extension(extensions::Logger).finish();
let conn_tracker = Arc::new(Mutex::new(ConnectionTracker::default()));
let app = Router::new()
.route("/test", get(test_handler))
.route("/api/ws", any(start_ws))
.route_service("/api/graphql/ws", GraphQLSubscription::new(schema.clone()))
.route(
"/api/",
"/api/graphql/",
get(graphiql).post_service(GraphQL::new(schema.clone())),
)
.route_service("/api/ws", GraphQLSubscription::new(schema))
.with_state(conn_tracker)
.layer(
TraceLayer::new_for_http()
.on_request(tower_http::trace::DefaultOnRequest::new().level(tracing::Level::INFO))
.on_response(
tower_http::trace::DefaultOnResponse::new().level(tracing::Level::INFO),
)
.on_failure(tower_http::trace::DefaultOnFailure::new().level(tracing::Level::WARN)),
.make_span_with(DefaultMakeSpan::default().include_headers(true)),
);
axum::serve(TcpListener::bind("0.0.0.0:9345").await.unwrap(), app)
let listener = TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port)))
.await
.unwrap();
tracing::info!("listening on {}", listener.local_addr().unwrap());
axum::serve(
listener,
app.into_make_service_with_connect_info::<SocketAddr>(),
)
.await
.unwrap();
Ok(())
}

View File

@ -4,6 +4,8 @@ pub mod graphql;
pub mod mail;
pub mod newsreader;
pub mod nm;
pub mod ws;
#[cfg(feature = "tantivy")]
pub mod tantivy;

32
server/src/ws.rs Normal file
View File

@ -0,0 +1,32 @@
use std::{collections::HashMap, net::SocketAddr};
use axum::extract::ws::{Message, WebSocket};
use letterbox_shared::WebsocketMessage;
use tracing::{info, warn};
#[derive(Default)]
pub struct ConnectionTracker {
peers: HashMap<SocketAddr, WebSocket>,
}
impl ConnectionTracker {
pub fn add_peer(&mut self, socket: WebSocket, who: SocketAddr) {
warn!("adding {who:?} to connection tracker");
self.peers.insert(who, socket);
}
pub async fn send_message_all(&mut self, msg: WebsocketMessage) {
let m = serde_json::to_string(&msg).expect("failed to json encode WebsocketMessage");
let mut bad_peers = Vec::new();
for (who, socket) in &mut self.peers.iter_mut() {
if let Err(e) = socket.send(Message::Text(m.clone().into())).await {
warn!("{:?} is bad, scheduling for removal: {e}", who);
bad_peers.push(who.clone());
}
}
for b in bad_peers {
info!("removing bad peer {b:?}");
self.peers.remove(&b);
}
}
}

View File

@ -14,7 +14,9 @@ pub struct SearchResult {
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Message {}
pub enum WebsocketMessage {
RefreshMessages,
}
pub mod urls {
pub const MOUNT_POINT: &'static str = "/api";

View File

@ -38,6 +38,7 @@ letterbox-notmuch = { version = "0.12.1", path = "../notmuch", registry = "xinu"
seed_hooks = { version = "0.4.0", registry = "xinu" }
strum_macros = "0.27.1"
wasm-sockets = "1.0.0"
gloo-console = "0.3.0"
[package.metadata.wasm-pack.profile.release]
wasm-opt = ['-Os']

View File

@ -43,7 +43,7 @@ pub fn init(url: Url, orders: &mut impl Orders<Msg>) -> Model {
};
// TODO(wathiede): only do this while viewing the index? Or maybe add a new message that force
// 'notmuch new' on the server periodically?
orders.stream(streams::interval(30_000, || Msg::RefreshStart));
//orders.stream(streams::interval(30_000, || Msg::RefreshStart));
orders.subscribe(Msg::OnUrlChanged);
orders.stream(streams::window_event(Ev::Scroll, |_| Msg::WindowScrolled));

167
web/src/websocket.rs Normal file
View File

@ -0,0 +1,167 @@
use std::rc::Rc;
use letterbox_shared::WebsocketMessage;
use log::{error, info};
use seed::{prelude::*, *};
use serde::{Deserialize, Serialize};
use wasm_sockets::{self, ConnectionStatus, EventClient, Message, WebSocketError};
use web_sys::CloseEvent;
/// Message from the server to the client.
#[derive(Serialize, Deserialize)]
pub struct ServerMessage {
pub id: usize,
pub text: String,
}
/// Message from the client to the server.
#[derive(Serialize, Deserialize)]
pub struct ClientMessage {
pub text: String,
}
//const WS_URL: &str = "wss://9000.z.xinu.tv/api/ws";
//const WS_URL: &str = "wss://9345.z.xinu.tv/api/graphql/ws";
const WS_URL: &str = "wss://6758.z.xinu.tv/api/ws";
// ------ ------
// Model
// ------ ------
pub struct Model {
web_socket: EventClient,
web_socket_reconnector: Option<StreamHandle>,
}
// ------ ------
// Init
// ------ ------
pub fn init(_: Url, orders: &mut impl Orders<Msg>) -> Model {
Model {
web_socket: create_websocket(orders).unwrap(),
web_socket_reconnector: None,
}
}
// ------ ------
// Update
// ------ ------
pub enum Msg {
WebSocketOpened,
TextMessageReceived(String),
BinaryMessageReceived(ServerMessage),
CloseWebSocket,
WebSocketClosed(CloseEvent),
WebSocketFailed,
ReconnectWebSocket(usize),
SendMessage(ClientMessage),
SendBinaryMessage(ClientMessage),
}
pub fn update(msg: Msg, mut model: &mut Model, orders: &mut impl Orders<Msg>) {
match msg {
Msg::WebSocketOpened => {
model.web_socket_reconnector = None;
info!("WebSocket connection is open now");
}
Msg::TextMessageReceived(msg) => {
info!("recieved text {}", msg);
}
Msg::BinaryMessageReceived(message) => {
error!("Client received binary message");
}
Msg::CloseWebSocket => {
model.web_socket_reconnector = None;
model.web_socket.close().unwrap();
}
Msg::WebSocketClosed(close_event) => {
info!("==================");
info!("WebSocket connection was closed:");
info!("Clean: {}", close_event.was_clean());
info!("Code: {}", close_event.code());
info!("Reason: {}", close_event.reason());
info!("==================");
// Chrome doesn't invoke `on_error` when the connection is lost.
if !close_event.was_clean() && model.web_socket_reconnector.is_none() {
model.web_socket_reconnector = Some(
orders.stream_with_handle(streams::backoff(None, Msg::ReconnectWebSocket)),
);
}
}
Msg::WebSocketFailed => {
info!("WebSocket failed");
if model.web_socket_reconnector.is_none() {
model.web_socket_reconnector = Some(
orders.stream_with_handle(streams::backoff(None, Msg::ReconnectWebSocket)),
);
}
}
Msg::ReconnectWebSocket(retries) => {
info!("Reconnect attempt: {}", retries);
model.web_socket = create_websocket(orders).unwrap();
}
Msg::SendMessage(msg) => {
let txt = serde_json::to_string(&msg).unwrap();
model.web_socket.send_string(&txt).unwrap();
}
Msg::SendBinaryMessage(_msg) => {
error!("Attempt to send binary message, unsupported");
}
}
}
fn create_websocket(orders: &impl Orders<Msg>) -> Result<EventClient, WebSocketError> {
let msg_sender = orders.msg_sender();
let mut client = EventClient::new(WS_URL)?;
client.set_on_error(Some(Box::new(|error| {
gloo_console::error!("WS: ", error);
})));
let send = msg_sender.clone();
client.set_on_connection(Some(Box::new(move |client: &EventClient| {
info!("{:#?}", client.status);
let msg = match *client.status.borrow() {
ConnectionStatus::Connecting => {
info!("Connecting...");
None
}
ConnectionStatus::Connected => Some(Msg::WebSocketOpened),
ConnectionStatus::Error => Some(Msg::WebSocketFailed),
ConnectionStatus::Disconnected => {
info!("Disconnected");
None
}
};
send(msg);
})));
let send = msg_sender.clone();
client.set_on_close(Some(Box::new(move |ev| {
info!("WS: Connection closed");
send(Some(Msg::WebSocketClosed(ev)));
})));
let send = msg_sender.clone();
client.set_on_message(Some(Box::new(
move |_: &EventClient, msg: wasm_sockets::Message| decode_message(msg, Rc::clone(&send)),
)));
Ok(client)
}
fn decode_message(message: Message, msg_sender: Rc<dyn Fn(Option<Msg>)>) {
match message {
Message::Text(txt) => {
let msg: WebsocketMessage = serde_json::from_str(&txt).unwrap_or_else(|e| {
panic!("failed to parse json into WebsocketMessage: {e}\n'{txt}'")
});
msg_sender(Some(Msg::TextMessageReceived(txt)));
}
m => error!("unexpected message type received of {m:?}"),
}
}