brokkr-broker::utils::background_tasks Rust
Background tasks for the Brokkr Broker.
This module contains background tasks that run periodically to maintain system health and cleanup expired data.
Structs
brokkr-broker::utils::background_tasks::DiagnosticCleanupConfig
pub
Configuration for diagnostic cleanup task.
Fields
| Name | Type | Description |
|---|---|---|
| interval_seconds | u64 | How often to run the cleanup (in seconds). |
| max_age_hours | i64 | Maximum age for completed/expired diagnostics before deletion (in hours). |
brokkr-broker::utils::background_tasks::WorkOrderMaintenanceConfig
pub
Configuration for work order maintenance task.
Fields
| Name | Type | Description |
|---|---|---|
| interval_seconds | u64 | How often to run the maintenance (in seconds). |
brokkr-broker::utils::background_tasks::WebhookDeliveryConfig
pub
Configuration for webhook delivery worker.
Fields
| Name | Type | Description |
|---|---|---|
| interval_seconds | u64 | How often to poll for pending deliveries (in seconds). |
| batch_size | i64 | Maximum number of deliveries to process per interval. |
brokkr-broker::utils::background_tasks::WebhookCleanupConfig
pub
Configuration for webhook cleanup task.
Fields
| Name | Type | Description |
|---|---|---|
| interval_seconds | u64 | How often to run the cleanup (in seconds). |
| retention_days | i64 | Number of days to retain completed/dead deliveries. |
brokkr-broker::utils::background_tasks::AuditLogCleanupConfig
pub
Configuration for audit log cleanup task.
Fields
| Name | Type | Description |
|---|---|---|
| interval_seconds | u64 | How often to run the cleanup (in seconds). |
| retention_days | i64 | Number of days to retain audit logs. |
Functions
brokkr-broker::utils::background_tasks::start_diagnostic_cleanup_task
pub
#![allow(unused)]
fn main() {
fn start_diagnostic_cleanup_task(dal: DAL, config: DiagnosticCleanupConfig)
}
Starts the diagnostic cleanup background task.
This task periodically:
- Expires pending diagnostic requests that have passed their expiry time
- Deletes old completed/expired/failed diagnostic requests and their results
Parameters:
| Name | Type | Description |
|---|---|---|
| dal | - | The Data Access Layer instance |
| config | - | Configuration for the cleanup task |
Source
#![allow(unused)]
fn main() {
pub fn start_diagnostic_cleanup_task(dal: DAL, config: DiagnosticCleanupConfig) {
info!(
"Starting diagnostic cleanup task (interval: {}s, max_age: {}h)",
config.interval_seconds, config.max_age_hours
);
tokio::spawn(async move {
let mut ticker = interval(Duration::from_secs(config.interval_seconds));
loop {
ticker.tick().await;
// Expire pending requests that have passed their expiry time
match dal.diagnostic_requests().expire_old_requests() {
Ok(expired) => {
if expired > 0 {
info!("Expired {} pending diagnostic requests", expired);
}
}
Err(e) => {
error!("Failed to expire diagnostic requests: {:?}", e);
}
}
// Delete old completed/expired/failed requests (cascades to results)
match dal
.diagnostic_requests()
.cleanup_old_requests(config.max_age_hours)
{
Ok(deleted) => {
if deleted > 0 {
info!(
"Cleaned up {} old diagnostic requests (age > {}h)",
deleted, config.max_age_hours
);
}
}
Err(e) => {
error!("Failed to cleanup old diagnostic requests: {:?}", e);
}
}
}
});
}
}
brokkr-broker::utils::background_tasks::start_work_order_maintenance_task
pub
#![allow(unused)]
fn main() {
fn start_work_order_maintenance_task(dal: DAL, config: WorkOrderMaintenanceConfig)
}
Starts the work order maintenance background task.
This task periodically:
- Moves RETRY_PENDING work orders back to PENDING when their backoff has elapsed
- Reclaims stale CLAIMED work orders that have timed out
Parameters:
| Name | Type | Description |
|---|---|---|
| dal | - | The Data Access Layer instance |
| config | - | Configuration for the maintenance task |
Source
#![allow(unused)]
fn main() {
pub fn start_work_order_maintenance_task(dal: DAL, config: WorkOrderMaintenanceConfig) {
info!(
"Starting work order maintenance task (interval: {}s)",
config.interval_seconds
);
tokio::spawn(async move {
let mut ticker = interval(Duration::from_secs(config.interval_seconds));
loop {
ticker.tick().await;
// Process RETRY_PENDING work orders whose backoff has elapsed
match dal.work_orders().process_retry_pending() {
Ok(count) => {
if count > 0 {
info!("Reset {} RETRY_PENDING work orders to PENDING", count);
}
}
Err(e) => {
error!("Failed to process retry pending work orders: {:?}", e);
}
}
// Reclaim stale CLAIMED work orders
match dal.work_orders().process_stale_claims() {
Ok(count) => {
if count > 0 {
info!("Released {} stale claimed work orders", count);
}
}
Err(e) => {
error!("Failed to process stale claims: {:?}", e);
}
}
}
});
}
}
brokkr-broker::utils::background_tasks::start_webhook_delivery_task
pub
#![allow(unused)]
fn main() {
fn start_webhook_delivery_task(dal: DAL, config: WebhookDeliveryConfig)
}
Starts the webhook delivery worker background task.
This task periodically:
- Releases expired acquired deliveries back to pending
- Moves failed deliveries with elapsed backoff back to pending
- Claims pending deliveries for broker (target_labels is NULL)
- Attempts to deliver each via HTTP POST
- Marks deliveries as success or failure (with retry scheduling)
Parameters:
| Name | Type | Description |
|---|---|---|
| dal | - | The Data Access Layer instance |
| config | - | Configuration for the delivery worker |
Source
#![allow(unused)]
fn main() {
/// Spawns the webhook delivery worker as a detached Tokio task.
///
/// On every tick the worker:
/// 1. Releases expired "acquired" delivery claims back to pending.
/// 2. Moves failed deliveries whose retry backoff has elapsed back to pending.
/// 3. Claims up to `config.batch_size` pending broker-targeted deliveries
///    (target_labels is NULL) and attempts each via HTTP POST.
/// 4. Records each attempt as success, retryable failure, or dead
///    (a `max_retries` of 0 passed to `mark_failed` forces dead immediately).
///
/// Parameters:
/// - `dal`: Data Access Layer handle, moved into the spawned task.
/// - `config`: poll interval and per-tick claim batch size.
///
/// NOTE(review): the `dal.*` calls return `Result` synchronously, so they
/// appear to perform blocking database I/O on the async runtime thread —
/// confirm whether they should be wrapped in `spawn_blocking`.
pub fn start_webhook_delivery_task(dal: DAL, config: WebhookDeliveryConfig) {
    info!(
        "Starting webhook delivery worker (interval: {}s, batch_size: {})",
        config.interval_seconds, config.batch_size
    );
    // One shared HTTP client with a 30s per-request timeout. Builder failure
    // can only happen at startup (e.g. TLS backend init), so panicking is
    // acceptable here.
    let client = reqwest::Client::builder()
        .timeout(std::time::Duration::from_secs(30))
        .build()
        .expect("Failed to create HTTP client");
    tokio::spawn(async move {
        let mut ticker = interval(Duration::from_secs(config.interval_seconds));
        loop {
            ticker.tick().await;
            // First, release any expired acquired deliveries so they can be
            // re-claimed this tick.
            match dal.webhook_deliveries().release_expired() {
                Ok(count) => {
                    if count > 0 {
                        debug!("Released {} expired webhook delivery claims", count);
                    }
                }
                Err(e) => {
                    error!("Failed to release expired webhook deliveries: {:?}", e);
                }
            }
            // Move failed deliveries back to pending if retry time has elapsed.
            match dal.webhook_deliveries().process_retries() {
                Ok(count) => {
                    if count > 0 {
                        debug!(
                            "Moved {} webhook deliveries from failed to pending for retry",
                            count
                        );
                    }
                }
                Err(e) => {
                    error!("Failed to process webhook retries: {:?}", e);
                }
            }
            // Claim pending broker deliveries (target_labels is NULL). On a
            // claim error we skip the whole tick rather than retry inline.
            let deliveries = match dal
                .webhook_deliveries()
                .claim_for_broker(config.batch_size, None)
            {
                Ok(d) => d,
                Err(e) => {
                    error!("Failed to claim pending webhook deliveries: {:?}", e);
                    continue;
                }
            };
            if deliveries.is_empty() {
                continue;
            }
            debug!("Processing {} claimed webhook deliveries", deliveries.len());
            for delivery in deliveries {
                // Get the subscription to retrieve URL and auth header.
                let subscription = match dal.webhook_subscriptions().get(delivery.subscription_id) {
                    Ok(Some(sub)) => sub,
                    Ok(None) => {
                        // Subscription vanished: no point retrying, mark dead.
                        warn!(
                            "Subscription {} not found for delivery {}, marking as dead",
                            delivery.subscription_id, delivery.id
                        );
                        let _ = dal.webhook_deliveries().mark_failed(
                            delivery.id,
                            "Subscription not found",
                            0, // Force dead
                        );
                        continue;
                    }
                    Err(e) => {
                        // Transient lookup failure: leave the claim; it will be
                        // released by release_expired() on a later tick.
                        error!(
                            "Failed to get subscription {} for delivery {}: {:?}",
                            delivery.subscription_id, delivery.id, e
                        );
                        continue;
                    }
                };
                // Decrypt URL and auth header; decryption failures are
                // permanent, so force the delivery dead (max_retries 0).
                let url = match super::encryption::decrypt_string(&subscription.url_encrypted) {
                    Ok(u) => u,
                    Err(e) => {
                        error!(
                            "Failed to decrypt URL for subscription {}: {}",
                            subscription.id, e
                        );
                        let _ = dal.webhook_deliveries().mark_failed(
                            delivery.id,
                            &format!("Failed to decrypt URL: {}", e),
                            0,
                        );
                        continue;
                    }
                };
                // Auth header is optional; transpose turns
                // Option<Result<..>> into Result<Option<..>>.
                let auth_header = subscription
                    .auth_header_encrypted
                    .as_ref()
                    .map(|encrypted| super::encryption::decrypt_string(encrypted))
                    .transpose();
                let auth_header = match auth_header {
                    Ok(h) => h,
                    Err(e) => {
                        error!(
                            "Failed to decrypt auth header for subscription {}: {}",
                            subscription.id, e
                        );
                        let _ = dal.webhook_deliveries().mark_failed(
                            delivery.id,
                            &format!("Failed to decrypt auth header: {}", e),
                            0,
                        );
                        continue;
                    }
                };
                // Attempt delivery via HTTP POST.
                let result =
                    attempt_delivery(&client, &url, auth_header.as_deref(), &delivery.payload)
                        .await;
                match result {
                    Ok(_) => match dal.webhook_deliveries().mark_success(delivery.id) {
                        Ok(_) => {
                            debug!(
                                "Webhook delivery {} succeeded for subscription {}",
                                delivery.id, subscription.id
                            );
                        }
                        Err(e) => {
                            error!(
                                "Failed to mark delivery {} as success: {:?}",
                                delivery.id, e
                            );
                        }
                    },
                    Err(error) => {
                        // mark_failed decides retry vs. dead based on the
                        // subscription's max_retries and reports the new state.
                        match dal.webhook_deliveries().mark_failed(
                            delivery.id,
                            &error,
                            subscription.max_retries,
                        ) {
                            Ok(updated) => {
                                if updated.status == "dead" {
                                    warn!(
                                        "Webhook delivery {} dead after {} attempts: {}",
                                        delivery.id, updated.attempts, error
                                    );
                                } else {
                                    debug!(
                                        "Webhook delivery {} failed (attempt {}), will retry: {}",
                                        delivery.id, updated.attempts, error
                                    );
                                }
                            }
                            Err(e) => {
                                error!(
                                    "Failed to mark delivery {} as failed: {:?}",
                                    delivery.id, e
                                );
                            }
                        }
                    }
                }
            }
        }
    });
}
}
brokkr-broker::utils::background_tasks::attempt_delivery
private
#![allow(unused)]
fn main() {
async fn attempt_delivery(client: &reqwest::Client, url: &str, auth_header: Option<&str>, payload: &str) -> Result<(), String>
}
Attempts to deliver a webhook payload via HTTP POST.
Source
#![allow(unused)]
fn main() {
/// Attempts to deliver a webhook payload via HTTP POST.
///
/// Sends `payload` as a JSON body to `url`, attaching `auth_header` as the
/// `Authorization` header when present. Transport errors and non-2xx
/// responses are reported as `Err` with a short message; response bodies
/// are truncated to 200 characters.
async fn attempt_delivery(
    client: &reqwest::Client,
    url: &str,
    auth_header: Option<&str>,
    payload: &str,
) -> Result<(), String> {
    let base = client
        .post(url)
        .header("Content-Type", "application/json")
        .body(payload.to_string());
    let request = match auth_header {
        Some(auth) => base.header("Authorization", auth),
        None => base,
    };
    let response = match request.send().await {
        Ok(resp) => resp,
        Err(e) => return Err(format!("Request failed: {}", e)),
    };
    let status = response.status();
    if !status.is_success() {
        let body = response.text().await.unwrap_or_default();
        let snippet: String = body.chars().take(200).collect();
        return Err(format!("HTTP {}: {}", status, snippet));
    }
    Ok(())
}
}
brokkr-broker::utils::background_tasks::start_webhook_cleanup_task
pub
#![allow(unused)]
fn main() {
fn start_webhook_cleanup_task(dal: DAL, config: WebhookCleanupConfig)
}
Starts the webhook cleanup background task.
This task periodically deletes old completed/dead deliveries based on the retention policy.
Parameters:
| Name | Type | Description |
|---|---|---|
| dal | - | The Data Access Layer instance |
| config | - | Configuration for the cleanup task |
Source
#![allow(unused)]
fn main() {
pub fn start_webhook_cleanup_task(dal: DAL, config: WebhookCleanupConfig) {
info!(
"Starting webhook cleanup task (interval: {}s, retention: {}d)",
config.interval_seconds, config.retention_days
);
tokio::spawn(async move {
let mut ticker = interval(Duration::from_secs(config.interval_seconds));
loop {
ticker.tick().await;
match dal.webhook_deliveries().cleanup_old(config.retention_days) {
Ok(deleted) => {
if deleted > 0 {
info!(
"Cleaned up {} old webhook deliveries (age > {}d)",
deleted, config.retention_days
);
}
}
Err(e) => {
error!("Failed to cleanup old webhook deliveries: {:?}", e);
}
}
}
});
}
}
brokkr-broker::utils::background_tasks::start_audit_log_cleanup_task
pub
#![allow(unused)]
fn main() {
fn start_audit_log_cleanup_task(dal: DAL, config: AuditLogCleanupConfig)
}
Starts the audit log cleanup background task.
This task periodically deletes old audit log entries based on the configured retention policy.
Parameters:
| Name | Type | Description |
|---|---|---|
| dal | - | The Data Access Layer instance |
| config | - | Configuration for the cleanup task |
Source
#![allow(unused)]
fn main() {
pub fn start_audit_log_cleanup_task(dal: DAL, config: AuditLogCleanupConfig) {
info!(
"Starting audit log cleanup task (interval: {}s, retention: {}d)",
config.interval_seconds, config.retention_days
);
tokio::spawn(async move {
let mut ticker = interval(Duration::from_secs(config.interval_seconds));
loop {
ticker.tick().await;
match dal.audit_logs().cleanup_old_logs(config.retention_days) {
Ok(deleted) => {
if deleted > 0 {
info!(
"Cleaned up {} old audit logs (age > {}d)",
deleted, config.retention_days
);
}
}
Err(e) => {
error!("Failed to cleanup old audit logs: {:?}", e);
}
}
}
});
}
}