server: add label_unprocessed method, and implement wake
This commit is contained in:
parent
2e43700cd7
commit
df356e8711
@ -676,6 +676,18 @@ impl MutationRoot {
|
|||||||
|
|
||||||
Ok(true)
|
Ok(true)
|
||||||
}
|
}
|
||||||
|
#[instrument(skip_all, fields(rid=request_id()))]
|
||||||
|
async fn label_unprocessed<'ctx>(
|
||||||
|
&self,
|
||||||
|
ctx: &Context<'ctx>,
|
||||||
|
limit: Option<usize>,
|
||||||
|
) -> Result<bool, Error> {
|
||||||
|
let nm = ctx.data_unchecked::<Notmuch>();
|
||||||
|
let pool = ctx.data_unchecked::<PgPool>();
|
||||||
|
label_unprocessed(&nm, &pool, false, limit, "tag:unprocessed").await?;
|
||||||
|
Ok(true)
|
||||||
|
}
|
||||||
|
|
||||||
#[instrument(skip_all, fields(rid=request_id()))]
|
#[instrument(skip_all, fields(rid=request_id()))]
|
||||||
async fn refresh<'ctx>(&self, ctx: &Context<'ctx>) -> Result<bool, Error> {
|
async fn refresh<'ctx>(&self, ctx: &Context<'ctx>) -> Result<bool, Error> {
|
||||||
let nm = ctx.data_unchecked::<Notmuch>();
|
let nm = ctx.data_unchecked::<Notmuch>();
|
||||||
@ -687,6 +699,9 @@ impl MutationRoot {
|
|||||||
// Process email labels
|
// Process email labels
|
||||||
label_unprocessed(&nm, &pool, false, Some(1000), "tag:unprocessed").await?;
|
label_unprocessed(&nm, &pool, false, Some(1000), "tag:unprocessed").await?;
|
||||||
|
|
||||||
|
// Look for snoozed messages and mark unread
|
||||||
|
wakeup(&nm, &pool).await?;
|
||||||
|
|
||||||
#[cfg(feature = "tantivy")]
|
#[cfg(feature = "tantivy")]
|
||||||
{
|
{
|
||||||
let tantivy = ctx.data_unchecked::<TantivyConnection>();
|
let tantivy = ctx.data_unchecked::<TantivyConnection>();
|
||||||
@ -707,6 +722,33 @@ impl SubscriptionRoot {
|
|||||||
|
|
||||||
pub type GraphqlSchema = Schema<QueryRoot, MutationRoot, SubscriptionRoot>;
|
pub type GraphqlSchema = Schema<QueryRoot, MutationRoot, SubscriptionRoot>;
|
||||||
|
|
||||||
|
#[instrument(name = "wakeup", skip_all)]
|
||||||
|
pub async fn wakeup(nm: &Notmuch, pool: &PgPool) -> Result<(), Error> {
|
||||||
|
for row in sqlx::query!(
|
||||||
|
r#"
|
||||||
|
SELECT id, message_id
|
||||||
|
FROM snooze
|
||||||
|
WHERE wake < NOW();
|
||||||
|
"#
|
||||||
|
)
|
||||||
|
.fetch_all(pool)
|
||||||
|
.await?
|
||||||
|
{
|
||||||
|
let query: Query = row.message_id.parse()?;
|
||||||
|
info!("need to wake {query}");
|
||||||
|
let unread = true;
|
||||||
|
newsreader::set_read_status(pool, &query, unread).await?;
|
||||||
|
#[cfg(feature = "tantivy")]
|
||||||
|
tantivy.reindex_thread(pool, &query).await?;
|
||||||
|
nm::set_read_status(nm, &query, unread).await?;
|
||||||
|
|
||||||
|
sqlx::query!("DELETE FROM snooze WHERE id = $1", row.id)
|
||||||
|
.execute(pool)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
#[instrument(skip_all, fields(query=query))]
|
#[instrument(skip_all, fields(query=query))]
|
||||||
pub async fn compute_catchup_ids(
|
pub async fn compute_catchup_ids(
|
||||||
nm: &Notmuch,
|
nm: &Notmuch,
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user