Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

brokkr-broker::utils::background_tasks Rust

Background tasks for the Brokkr Broker.

This module contains background tasks that run periodically to maintain system health and cleanup expired data.

Structs

brokkr-broker::utils::background_tasks::DiagnosticCleanupConfig

pub

Configuration for diagnostic cleanup task.

Fields

| Name | Type | Description |
| --- | --- | --- |
| `interval_seconds` | `u64` | How often to run the cleanup (in seconds). |
| `max_age_hours` | `i64` | Maximum age for completed/expired diagnostics before deletion (in hours). |

brokkr-broker::utils::background_tasks::WorkOrderMaintenanceConfig

pub

Configuration for work order maintenance task.

Fields

| Name | Type | Description |
| --- | --- | --- |
| `interval_seconds` | `u64` | How often to run the maintenance (in seconds). |

brokkr-broker::utils::background_tasks::WebhookDeliveryConfig

pub

Configuration for webhook delivery worker.

Fields

| Name | Type | Description |
| --- | --- | --- |
| `interval_seconds` | `u64` | How often to poll for pending deliveries (in seconds). |
| `batch_size` | `i64` | Maximum number of deliveries to process per interval. |

brokkr-broker::utils::background_tasks::WebhookCleanupConfig

pub

Configuration for webhook cleanup task.

Fields

| Name | Type | Description |
| --- | --- | --- |
| `interval_seconds` | `u64` | How often to run the cleanup (in seconds). |
| `retention_days` | `i64` | Number of days to retain completed/dead deliveries. |

brokkr-broker::utils::background_tasks::AuditLogCleanupConfig

pub

Configuration for audit log cleanup task.

Fields

| Name | Type | Description |
| --- | --- | --- |
| `interval_seconds` | `u64` | How often to run the cleanup (in seconds). |
| `retention_days` | `i64` | Number of days to retain audit logs. |

Functions

brokkr-broker::utils::background_tasks::start_diagnostic_cleanup_task

pub

#![allow(unused)]
fn main() {
fn start_diagnostic_cleanup_task(dal: DAL, config: DiagnosticCleanupConfig)
}

Starts the diagnostic cleanup background task.

This task periodically:

  1. Expires pending diagnostic requests that have passed their expiry time
  2. Deletes old completed/expired/failed diagnostic requests and their results

Parameters:

| Name | Type | Description |
| --- | --- | --- |
| `dal` | - | The Data Access Layer instance |
| `config` | - | Configuration for the cleanup task |
Source
#![allow(unused)]
fn main() {
/// Starts the diagnostic cleanup background task.
///
/// Spawns a detached Tokio task that, on every tick of its interval timer:
///
/// 1. Expires pending diagnostic requests that have passed their expiry time.
/// 2. Deletes old completed/expired/failed diagnostic requests older than
///    `config.max_age_hours`.
///
/// A failure in either step is logged and the loop keeps running, so one bad
/// pass never stops future cleanups.
///
/// # Arguments
///
/// * `dal` - The Data Access Layer instance; it is moved into the spawned task.
/// * `config` - Configuration for the cleanup task. An `interval_seconds` of `0`
///   is treated as `1`, because `tokio::time::interval` panics on a zero period.
///
/// # Panics
///
/// Panics if called outside of a Tokio runtime (a requirement of `tokio::spawn`).
pub fn start_diagnostic_cleanup_task(dal: DAL, config: DiagnosticCleanupConfig) {
    // A zero period would panic inside the spawned task and silently kill it,
    // so clamp to the smallest period the timer accepts at this granularity.
    let interval_seconds = config.interval_seconds.max(1);

    info!(
        "Starting diagnostic cleanup task (interval: {}s, max_age: {}h)",
        interval_seconds, config.max_age_hours
    );

    tokio::spawn(async move {
        let mut ticker = interval(Duration::from_secs(interval_seconds));

        // NOTE(review): the DAL calls below are invoked synchronously from the async
        // task; if they block on database I/O, consider `tokio::task::spawn_blocking`
        // -- confirm against the DAL implementation.
        loop {
            ticker.tick().await;

            // Expire pending requests that have passed their expiry time
            match dal.diagnostic_requests().expire_old_requests() {
                Ok(expired) if expired > 0 => {
                    info!("Expired {} pending diagnostic requests", expired);
                }
                Ok(_) => {}
                Err(e) => {
                    error!("Failed to expire diagnostic requests: {:?}", e);
                }
            }

            // Delete old completed/expired/failed requests (cascades to results)
            match dal
                .diagnostic_requests()
                .cleanup_old_requests(config.max_age_hours)
            {
                Ok(deleted) if deleted > 0 => {
                    info!(
                        "Cleaned up {} old diagnostic requests (age > {}h)",
                        deleted, config.max_age_hours
                    );
                }
                Ok(_) => {}
                Err(e) => {
                    error!("Failed to cleanup old diagnostic requests: {:?}", e);
                }
            }
        }
    });
}
}

brokkr-broker::utils::background_tasks::start_work_order_maintenance_task

pub

#![allow(unused)]
fn main() {
fn start_work_order_maintenance_task(dal: DAL, config: WorkOrderMaintenanceConfig)
}

Starts the work order maintenance background task.

This task periodically:

  1. Moves RETRY_PENDING work orders back to PENDING when their backoff has elapsed
  2. Reclaims stale CLAIMED work orders that have timed out

Parameters:

| Name | Type | Description |
| --- | --- | --- |
| `dal` | - | The Data Access Layer instance |
| `config` | - | Configuration for the maintenance task |
Source
#![allow(unused)]
fn main() {
/// Starts the work order maintenance background task.
///
/// Spawns a detached Tokio task that, on every tick of its interval timer:
///
/// 1. Moves RETRY_PENDING work orders back to PENDING when their backoff has elapsed.
/// 2. Reclaims stale CLAIMED work orders that have timed out.
///
/// A failure in either step is logged and the loop keeps running.
///
/// # Arguments
///
/// * `dal` - The Data Access Layer instance; it is moved into the spawned task.
/// * `config` - Configuration for the maintenance task. An `interval_seconds` of
///   `0` is treated as `1`, because `tokio::time::interval` panics on a zero period.
///
/// # Panics
///
/// Panics if called outside of a Tokio runtime (a requirement of `tokio::spawn`).
pub fn start_work_order_maintenance_task(dal: DAL, config: WorkOrderMaintenanceConfig) {
    // A zero period would panic inside the spawned task and silently kill it,
    // so clamp to the smallest period the timer accepts at this granularity.
    let interval_seconds = config.interval_seconds.max(1);

    info!(
        "Starting work order maintenance task (interval: {}s)",
        interval_seconds
    );

    tokio::spawn(async move {
        let mut ticker = interval(Duration::from_secs(interval_seconds));

        // NOTE(review): the DAL calls below are invoked synchronously from the async
        // task; if they block on database I/O, consider `tokio::task::spawn_blocking`
        // -- confirm against the DAL implementation.
        loop {
            ticker.tick().await;

            // Process RETRY_PENDING work orders whose backoff has elapsed
            match dal.work_orders().process_retry_pending() {
                Ok(count) if count > 0 => {
                    info!("Reset {} RETRY_PENDING work orders to PENDING", count);
                }
                Ok(_) => {}
                Err(e) => {
                    error!("Failed to process retry pending work orders: {:?}", e);
                }
            }

            // Reclaim stale CLAIMED work orders
            match dal.work_orders().process_stale_claims() {
                Ok(count) if count > 0 => {
                    info!("Released {} stale claimed work orders", count);
                }
                Ok(_) => {}
                Err(e) => {
                    error!("Failed to process stale claims: {:?}", e);
                }
            }
        }
    });
}
}

brokkr-broker::utils::background_tasks::start_webhook_delivery_task

pub

#![allow(unused)]
fn main() {
fn start_webhook_delivery_task(dal: DAL, config: WebhookDeliveryConfig)
}

Starts the webhook delivery worker background task.

This task periodically:

  1. Releases expired acquired deliveries back to pending
  2. Moves failed deliveries with elapsed backoff back to pending
  3. Claims pending deliveries for broker (target_labels is NULL)
  4. Attempts to deliver each via HTTP POST
  5. Marks deliveries as success or failure (with retry scheduling)

Parameters:

| Name | Type | Description |
| --- | --- | --- |
| `dal` | - | The Data Access Layer instance |
| `config` | - | Configuration for the delivery worker |
Source
#![allow(unused)]
fn main() {
/// Starts the webhook delivery worker background task.
///
/// Builds one shared HTTP client (30-second request timeout) and spawns a
/// detached Tokio task that, on every tick of its interval timer:
///
/// 1. Releases expired acquired deliveries back to pending.
/// 2. Moves failed deliveries with elapsed backoff back to pending.
/// 3. Claims up to `config.batch_size` pending deliveries for the broker
///    (target_labels is NULL).
/// 4. Attempts to deliver each one via HTTP POST.
/// 5. Marks each delivery as success or failure (with retry scheduling).
///
/// Deliveries in a batch are processed one after another. Errors from any
/// step are logged; a single failing delivery never aborts the batch.
///
/// # Arguments
///
/// * `dal` - The Data Access Layer instance; it is moved into the spawned task.
/// * `config` - Configuration for the delivery worker. An `interval_seconds` of
///   `0` is treated as `1`, because `tokio::time::interval` panics on a zero period.
///
/// # Panics
///
/// Panics if the HTTP client cannot be constructed, or if called outside of a
/// Tokio runtime (a requirement of `tokio::spawn`).
pub fn start_webhook_delivery_task(dal: DAL, config: WebhookDeliveryConfig) {
    // A zero period would panic inside the spawned task and silently kill it,
    // so clamp to the smallest period the timer accepts at this granularity.
    let interval_seconds = config.interval_seconds.max(1);

    info!(
        "Starting webhook delivery worker (interval: {}s, batch_size: {})",
        interval_seconds, config.batch_size
    );

    // One client is shared by every delivery so connections can be reused.
    let client = reqwest::Client::builder()
        .timeout(std::time::Duration::from_secs(30))
        .build()
        .expect("Failed to create HTTP client");

    tokio::spawn(async move {
        let mut ticker = interval(Duration::from_secs(interval_seconds));

        // NOTE(review): the DAL calls below are invoked synchronously from the async
        // task; if they block on database I/O, consider `tokio::task::spawn_blocking`
        // -- confirm against the DAL implementation.
        loop {
            ticker.tick().await;

            // First, release any expired acquired deliveries
            match dal.webhook_deliveries().release_expired() {
                Ok(count) if count > 0 => {
                    debug!("Released {} expired webhook delivery claims", count);
                }
                Ok(_) => {}
                Err(e) => {
                    error!("Failed to release expired webhook deliveries: {:?}", e);
                }
            }

            // Move failed deliveries back to pending if retry time has elapsed
            match dal.webhook_deliveries().process_retries() {
                Ok(count) if count > 0 => {
                    debug!(
                        "Moved {} webhook deliveries from failed to pending for retry",
                        count
                    );
                }
                Ok(_) => {}
                Err(e) => {
                    error!("Failed to process webhook retries: {:?}", e);
                }
            }

            // Claim pending broker deliveries (target_labels is NULL)
            let deliveries = match dal
                .webhook_deliveries()
                .claim_for_broker(config.batch_size, None)
            {
                Ok(d) => d,
                Err(e) => {
                    error!("Failed to claim pending webhook deliveries: {:?}", e);
                    continue;
                }
            };

            if deliveries.is_empty() {
                continue;
            }

            debug!("Processing {} claimed webhook deliveries", deliveries.len());

            for delivery in deliveries {
                // Get the subscription to retrieve URL and auth header
                let subscription = match dal.webhook_subscriptions().get(delivery.subscription_id) {
                    Ok(Some(sub)) => sub,
                    Ok(None) => {
                        warn!(
                            "Subscription {} not found for delivery {}, marking as dead",
                            delivery.subscription_id, delivery.id
                        );
                        // max_retries = 0 forces the delivery straight to dead.
                        // Log (rather than discard) a failure to persist that state.
                        if let Err(mark_err) = dal.webhook_deliveries().mark_failed(
                            delivery.id,
                            "Subscription not found",
                            0,
                        ) {
                            error!(
                                "Failed to mark delivery {} as failed: {:?}",
                                delivery.id, mark_err
                            );
                        }
                        continue;
                    }
                    Err(e) => {
                        // The delivery is left as claimed here; it is not marked failed.
                        error!(
                            "Failed to get subscription {} for delivery {}: {:?}",
                            delivery.subscription_id, delivery.id, e
                        );
                        continue;
                    }
                };

                // Decrypt URL and auth header
                let url = match super::encryption::decrypt_string(&subscription.url_encrypted) {
                    Ok(u) => u,
                    Err(e) => {
                        error!(
                            "Failed to decrypt URL for subscription {}: {}",
                            subscription.id, e
                        );
                        // Retrying cannot fix a decryption failure, so force dead (0 retries).
                        if let Err(mark_err) = dal.webhook_deliveries().mark_failed(
                            delivery.id,
                            &format!("Failed to decrypt URL: {}", e),
                            0,
                        ) {
                            error!(
                                "Failed to mark delivery {} as failed: {:?}",
                                delivery.id, mark_err
                            );
                        }
                        continue;
                    }
                };

                // The auth header is optional: `transpose` turns
                // Option<Result<_, _>> into Result<Option<_>, _> so a missing
                // header is Ok(None) and only a real decrypt failure is Err.
                let auth_header = subscription
                    .auth_header_encrypted
                    .as_ref()
                    .map(|encrypted| super::encryption::decrypt_string(encrypted))
                    .transpose();

                let auth_header = match auth_header {
                    Ok(h) => h,
                    Err(e) => {
                        error!(
                            "Failed to decrypt auth header for subscription {}: {}",
                            subscription.id, e
                        );
                        if let Err(mark_err) = dal.webhook_deliveries().mark_failed(
                            delivery.id,
                            &format!("Failed to decrypt auth header: {}", e),
                            0,
                        ) {
                            error!(
                                "Failed to mark delivery {} as failed: {:?}",
                                delivery.id, mark_err
                            );
                        }
                        continue;
                    }
                };

                // Attempt delivery
                let result =
                    attempt_delivery(&client, &url, auth_header.as_deref(), &delivery.payload)
                        .await;

                match result {
                    Ok(_) => match dal.webhook_deliveries().mark_success(delivery.id) {
                        Ok(_) => {
                            debug!(
                                "Webhook delivery {} succeeded for subscription {}",
                                delivery.id, subscription.id
                            );
                        }
                        Err(e) => {
                            error!(
                                "Failed to mark delivery {} as success: {:?}",
                                delivery.id, e
                            );
                        }
                    },
                    Err(error) => {
                        // The subscription's own retry budget decides between
                        // scheduling another attempt and going dead.
                        match dal.webhook_deliveries().mark_failed(
                            delivery.id,
                            &error,
                            subscription.max_retries,
                        ) {
                            Ok(updated) => {
                                if updated.status == "dead" {
                                    warn!(
                                        "Webhook delivery {} dead after {} attempts: {}",
                                        delivery.id, updated.attempts, error
                                    );
                                } else {
                                    debug!(
                                        "Webhook delivery {} failed (attempt {}), will retry: {}",
                                        delivery.id, updated.attempts, error
                                    );
                                }
                            }
                            Err(e) => {
                                error!(
                                    "Failed to mark delivery {} as failed: {:?}",
                                    delivery.id, e
                                );
                            }
                        }
                    }
                }
            }
        }
    });
}
}

brokkr-broker::utils::background_tasks::attempt_delivery

private

#![allow(unused)]
fn main() {
async fn attempt_delivery(client: &reqwest::Client, url: &str, auth_header: Option<&str>, payload: &str) -> Result<(), String>
}

Attempts to deliver a webhook payload via HTTP POST.

Source
#![allow(unused)]
fn main() {
/// Attempts to deliver a webhook payload via HTTP POST.
///
/// The payload is sent as the request body with a `Content-Type` of
/// `application/json`. When `auth_header` is `Some`, its value is sent as the
/// `Authorization` header.
///
/// # Errors
///
/// * `"Request failed: <error>"` when the request could not be sent or no
///   response was received.
/// * `"HTTP <status>: <body>"` when the response status is not a success
///   status; the body is limited to its first 200 characters, and is empty if
///   it could not be read.
async fn attempt_delivery(
    client: &reqwest::Client,
    url: &str,
    auth_header: Option<&str>,
    payload: &str,
) -> Result<(), String> {
    let base = client
        .post(url)
        .header("Content-Type", "application/json")
        .body(payload.to_string());

    // Only attach credentials when the subscription has them configured.
    let request = match auth_header {
        Some(auth) => base.header("Authorization", auth),
        None => base,
    };

    let response = match request.send().await {
        Ok(resp) => resp,
        Err(e) => return Err(format!("Request failed: {}", e)),
    };

    let status = response.status();
    if !status.is_success() {
        // Keep only a short excerpt of the body so stored errors stay small.
        let body = response.text().await.unwrap_or_default();
        let excerpt: String = body.chars().take(200).collect();
        return Err(format!("HTTP {}: {}", status, excerpt));
    }

    Ok(())
}
}

brokkr-broker::utils::background_tasks::start_webhook_cleanup_task

pub

#![allow(unused)]
fn main() {
fn start_webhook_cleanup_task(dal: DAL, config: WebhookCleanupConfig)
}

Starts the webhook cleanup background task.

This task periodically deletes old completed/dead deliveries based on the retention policy.

Parameters:

| Name | Type | Description |
| --- | --- | --- |
| `dal` | - | The Data Access Layer instance |
| `config` | - | Configuration for the cleanup task |
Source
#![allow(unused)]
fn main() {
/// Starts the webhook cleanup background task.
///
/// Spawns a detached Tokio task that, on every tick of its interval timer,
/// deletes old completed/dead deliveries older than `config.retention_days`.
/// A failed cleanup pass is logged and the loop keeps running.
///
/// # Arguments
///
/// * `dal` - The Data Access Layer instance; it is moved into the spawned task.
/// * `config` - Configuration for the cleanup task. An `interval_seconds` of `0`
///   is treated as `1`, because `tokio::time::interval` panics on a zero period.
///
/// # Panics
///
/// Panics if called outside of a Tokio runtime (a requirement of `tokio::spawn`).
pub fn start_webhook_cleanup_task(dal: DAL, config: WebhookCleanupConfig) {
    // A zero period would panic inside the spawned task and silently kill it,
    // so clamp to the smallest period the timer accepts at this granularity.
    let interval_seconds = config.interval_seconds.max(1);

    info!(
        "Starting webhook cleanup task (interval: {}s, retention: {}d)",
        interval_seconds, config.retention_days
    );

    tokio::spawn(async move {
        let mut ticker = interval(Duration::from_secs(interval_seconds));

        // NOTE(review): the DAL call below is invoked synchronously from the async
        // task; if it blocks on database I/O, consider `tokio::task::spawn_blocking`
        // -- confirm against the DAL implementation.
        loop {
            ticker.tick().await;

            match dal.webhook_deliveries().cleanup_old(config.retention_days) {
                Ok(deleted) if deleted > 0 => {
                    info!(
                        "Cleaned up {} old webhook deliveries (age > {}d)",
                        deleted, config.retention_days
                    );
                }
                Ok(_) => {}
                Err(e) => {
                    error!("Failed to cleanup old webhook deliveries: {:?}", e);
                }
            }
        }
    });
}
}

brokkr-broker::utils::background_tasks::start_audit_log_cleanup_task

pub

#![allow(unused)]
fn main() {
fn start_audit_log_cleanup_task(dal: DAL, config: AuditLogCleanupConfig)
}

Starts the audit log cleanup background task.

This task periodically deletes old audit log entries based on the configured retention policy.

Parameters:

| Name | Type | Description |
| --- | --- | --- |
| `dal` | - | The Data Access Layer instance |
| `config` | - | Configuration for the cleanup task |
Source
#![allow(unused)]
fn main() {
/// Starts the audit log cleanup background task.
///
/// Spawns a detached Tokio task that, on every tick of its interval timer,
/// deletes audit log entries older than `config.retention_days`. A failed
/// cleanup pass is logged and the loop keeps running.
///
/// # Arguments
///
/// * `dal` - The Data Access Layer instance; it is moved into the spawned task.
/// * `config` - Configuration for the cleanup task. An `interval_seconds` of `0`
///   is treated as `1`, because `tokio::time::interval` panics on a zero period.
///
/// # Panics
///
/// Panics if called outside of a Tokio runtime (a requirement of `tokio::spawn`).
pub fn start_audit_log_cleanup_task(dal: DAL, config: AuditLogCleanupConfig) {
    // A zero period would panic inside the spawned task and silently kill it,
    // so clamp to the smallest period the timer accepts at this granularity.
    let interval_seconds = config.interval_seconds.max(1);

    info!(
        "Starting audit log cleanup task (interval: {}s, retention: {}d)",
        interval_seconds, config.retention_days
    );

    tokio::spawn(async move {
        let mut ticker = interval(Duration::from_secs(interval_seconds));

        // NOTE(review): the DAL call below is invoked synchronously from the async
        // task; if it blocks on database I/O, consider `tokio::task::spawn_blocking`
        // -- confirm against the DAL implementation.
        loop {
            ticker.tick().await;

            match dal.audit_logs().cleanup_old_logs(config.retention_days) {
                Ok(deleted) if deleted > 0 => {
                    info!(
                        "Cleaned up {} old audit logs (age > {}d)",
                        deleted, config.retention_days
                    );
                }
                Ok(_) => {}
                Err(e) => {
                    error!("Failed to cleanup old audit logs: {:?}", e);
                }
            }
        }
    });
}
}