Compare commits
No commits in common. "f6d5d3755b5f82712ed3b010a459e7f37f838246" and "c10ad00ca77808294e714c10edb2a2b4291c5c39" have entirely different histories.
f6d5d3755b
...
c10ad00ca7
10
Cargo.lock
generated
10
Cargo.lock
generated
@ -3034,7 +3034,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "letterbox-notmuch"
|
name = "letterbox-notmuch"
|
||||||
version = "0.17.5"
|
version = "0.17.4"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"itertools",
|
"itertools",
|
||||||
"log",
|
"log",
|
||||||
@ -3049,7 +3049,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "letterbox-procmail2notmuch"
|
name = "letterbox-procmail2notmuch"
|
||||||
version = "0.17.5"
|
version = "0.17.4"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"clap",
|
"clap",
|
||||||
@ -3062,7 +3062,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "letterbox-server"
|
name = "letterbox-server"
|
||||||
version = "0.17.5"
|
version = "0.17.4"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"ammonia",
|
"ammonia",
|
||||||
"anyhow",
|
"anyhow",
|
||||||
@ -3121,7 +3121,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "letterbox-shared"
|
name = "letterbox-shared"
|
||||||
version = "0.17.5"
|
version = "0.17.4"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"build-info",
|
"build-info",
|
||||||
"letterbox-notmuch 0.17.3",
|
"letterbox-notmuch 0.17.3",
|
||||||
@ -3134,7 +3134,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "letterbox-web"
|
name = "letterbox-web"
|
||||||
version = "0.17.5"
|
version = "0.17.4"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"build-info",
|
"build-info",
|
||||||
"build-info-build",
|
"build-info-build",
|
||||||
|
|||||||
@ -8,7 +8,7 @@ authors = ["Bill Thiede <git@xinu.tv>"]
|
|||||||
edition = "2021"
|
edition = "2021"
|
||||||
license = "UNLICENSED"
|
license = "UNLICENSED"
|
||||||
publish = ["xinu"]
|
publish = ["xinu"]
|
||||||
version = "0.17.5"
|
version = "0.17.4"
|
||||||
repository = "https://git.z.xinu.tv/wathiede/letterbox"
|
repository = "https://git.z.xinu.tv/wathiede/letterbox"
|
||||||
|
|
||||||
[profile.dev]
|
[profile.dev]
|
||||||
|
|||||||
@ -21,7 +21,7 @@ use letterbox_notmuch::Notmuch;
|
|||||||
use letterbox_server::tantivy::TantivyConnection;
|
use letterbox_server::tantivy::TantivyConnection;
|
||||||
use letterbox_server::{
|
use letterbox_server::{
|
||||||
graphql::{compute_catchup_ids, Attachment, MutationRoot, QueryRoot, SubscriptionRoot},
|
graphql::{compute_catchup_ids, Attachment, MutationRoot, QueryRoot, SubscriptionRoot},
|
||||||
nm::{attachment_bytes, cid_attachment_bytes, label_unprocessed},
|
nm::{attachment_bytes, cid_attachment_bytes},
|
||||||
ws::ConnectionTracker,
|
ws::ConnectionTracker,
|
||||||
};
|
};
|
||||||
use letterbox_shared::WebsocketMessage;
|
use letterbox_shared::WebsocketMessage;
|
||||||
@ -29,7 +29,7 @@ use serde::Deserialize;
|
|||||||
use sqlx::postgres::PgPool;
|
use sqlx::postgres::PgPool;
|
||||||
use tokio::{net::TcpListener, sync::Mutex};
|
use tokio::{net::TcpListener, sync::Mutex};
|
||||||
use tower_http::trace::{DefaultMakeSpan, TraceLayer};
|
use tower_http::trace::{DefaultMakeSpan, TraceLayer};
|
||||||
use tracing::{error, info};
|
use tracing::info;
|
||||||
|
|
||||||
// Make our own error that wraps `ServerError`.
|
// Make our own error that wraps `ServerError`.
|
||||||
struct AppError(letterbox_server::ServerError);
|
struct AppError(letterbox_server::ServerError);
|
||||||
@ -176,15 +176,11 @@ async fn start_ws(
|
|||||||
#[derive(Debug, Deserialize)]
|
#[derive(Debug, Deserialize)]
|
||||||
struct NotificationParams {
|
struct NotificationParams {
|
||||||
delay_ms: Option<u64>,
|
delay_ms: Option<u64>,
|
||||||
num_unprocessed: Option<usize>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_refresh_websocket_handler(
|
async fn send_refresh_websocket_handler(
|
||||||
State(AppState {
|
State(AppState {
|
||||||
nm,
|
connection_tracker, ..
|
||||||
pool,
|
|
||||||
connection_tracker,
|
|
||||||
..
|
|
||||||
}): State<AppState>,
|
}): State<AppState>,
|
||||||
params: Query<NotificationParams>,
|
params: Query<NotificationParams>,
|
||||||
) -> impl IntoResponse {
|
) -> impl IntoResponse {
|
||||||
@ -194,15 +190,6 @@ async fn send_refresh_websocket_handler(
|
|||||||
info!("sleeping {delay:?}");
|
info!("sleeping {delay:?}");
|
||||||
tokio::time::sleep(delay).await;
|
tokio::time::sleep(delay).await;
|
||||||
}
|
}
|
||||||
let limit = match params.num_unprocessed {
|
|
||||||
Some(0) => None,
|
|
||||||
Some(limit) => Some(limit),
|
|
||||||
None => Some(10),
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Err(err) = label_unprocessed(&nm, &pool, false, limit, "tag:unprocessed").await {
|
|
||||||
error!("Failed to label_unprocessed: {err:?}");
|
|
||||||
};
|
|
||||||
connection_tracker
|
connection_tracker
|
||||||
.lock()
|
.lock()
|
||||||
.await
|
.await
|
||||||
@ -217,33 +204,18 @@ async fn watch_new(
|
|||||||
conn_tracker: Arc<Mutex<ConnectionTracker>>,
|
conn_tracker: Arc<Mutex<ConnectionTracker>>,
|
||||||
poll_time: Duration,
|
poll_time: Duration,
|
||||||
) -> Result<(), async_graphql::Error> {
|
) -> Result<(), async_graphql::Error> {
|
||||||
async fn watch_new_iteration(
|
let mut old_ids = Vec::new();
|
||||||
nm: &Notmuch,
|
loop {
|
||||||
pool: &PgPool,
|
|
||||||
conn_tracker: Arc<Mutex<ConnectionTracker>>,
|
|
||||||
old_ids: &[String],
|
|
||||||
) -> Result<Vec<String>, async_graphql::Error> {
|
|
||||||
let ids = compute_catchup_ids(&nm, &pool, "is:unread").await?;
|
let ids = compute_catchup_ids(&nm, &pool, "is:unread").await?;
|
||||||
info!("old_ids: {} ids: {}", old_ids.len(), ids.len());
|
|
||||||
if old_ids != ids {
|
if old_ids != ids {
|
||||||
label_unprocessed(&nm, &pool, false, Some(100), "tag:unprocessed").await?;
|
info!("old_ids: {old_ids:?}\n ids: {ids:?}");
|
||||||
conn_tracker
|
conn_tracker
|
||||||
.lock()
|
.lock()
|
||||||
.await
|
.await
|
||||||
.send_message_all(WebsocketMessage::RefreshMessages)
|
.send_message_all(WebsocketMessage::RefreshMessages)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
Ok(ids)
|
old_ids = ids;
|
||||||
}
|
|
||||||
let mut old_ids = Vec::new();
|
|
||||||
loop {
|
|
||||||
old_ids = match watch_new_iteration(&nm, &pool, conn_tracker.clone(), &old_ids).await {
|
|
||||||
Ok(old_ids) => old_ids,
|
|
||||||
Err(err) => {
|
|
||||||
error!("watch_new_iteration failed: {err:?}");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
tokio::time::sleep(poll_time).await;
|
tokio::time::sleep(poll_time).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -251,7 +223,6 @@ async fn watch_new(
|
|||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct AppState {
|
struct AppState {
|
||||||
nm: Notmuch,
|
nm: Notmuch,
|
||||||
pool: PgPool,
|
|
||||||
connection_tracker: Arc<Mutex<ConnectionTracker>>,
|
connection_tracker: Arc<Mutex<ConnectionTracker>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -292,7 +263,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
|||||||
let connection_tracker = Arc::new(Mutex::new(ConnectionTracker::default()));
|
let connection_tracker = Arc::new(Mutex::new(ConnectionTracker::default()));
|
||||||
let ct = Arc::clone(&connection_tracker);
|
let ct = Arc::clone(&connection_tracker);
|
||||||
let poll_time = Duration::from_secs(60);
|
let poll_time = Duration::from_secs(60);
|
||||||
let _h = tokio::spawn(watch_new(nm.clone(), pool.clone(), ct, poll_time));
|
let _h = tokio::spawn(watch_new(nm.clone(), pool, ct, poll_time));
|
||||||
|
|
||||||
let api_routes = Router::new()
|
let api_routes = Router::new()
|
||||||
.route(
|
.route(
|
||||||
@ -317,7 +288,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
|||||||
.nest("/notification", notification_routes)
|
.nest("/notification", notification_routes)
|
||||||
.with_state(AppState {
|
.with_state(AppState {
|
||||||
nm,
|
nm,
|
||||||
pool,
|
|
||||||
connection_tracker,
|
connection_tracker,
|
||||||
})
|
})
|
||||||
.layer(
|
.layer(
|
||||||
|
|||||||
@ -671,7 +671,7 @@ pub fn update(msg: Msg, model: &mut Model, orders: &mut impl Orders<Msg>) {
|
|||||||
}
|
}
|
||||||
Msg::WebsocketMessage(msg) => {
|
Msg::WebsocketMessage(msg) => {
|
||||||
match msg {
|
match msg {
|
||||||
WebsocketMessage::RefreshMessages => orders.send_msg(Msg::Refresh),
|
WebsocketMessage::RefreshMessages => orders.send_msg(Msg::RefreshStart),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user