Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

brokkr-agent::webhooks Rust

Webhook delivery module for agent-side webhook processing.

This module provides functionality for agents to poll for pending webhooks assigned to them, deliver them via HTTP, and report results back to the broker.

Structs

brokkr-agent::webhooks::PendingWebhookDelivery

pub

Derives: Debug, Clone, Deserialize

Pending webhook delivery from the broker. Contains decrypted URL and auth header for delivery.

Fields

NameTypeDescription
idUuidDelivery ID.
subscription_idUuidSubscription ID.
event_typeStringEvent type being delivered.
payloadStringJSON-encoded event payload.
urlStringDecrypted webhook URL.
auth_headerOption < String >Decrypted Authorization header (if configured).
timeout_secondsi32HTTP timeout in seconds.
max_retriesi32Maximum retries for this subscription.
attemptsi32Current attempt number.

brokkr-agent::webhooks::DeliveryResultRequest

pub

Derives: Debug, Clone, Serialize

Request body for reporting delivery result to broker.

Fields

NameTypeDescription
successboolWhether delivery succeeded.
status_codeOption < i32 >HTTP status code (if available).
errorOption < String >Error message (if failed).
duration_msOption < i64 >Delivery duration in milliseconds.

brokkr-agent::webhooks::DeliveryResult

pub

Derives: Debug

Result of a webhook delivery attempt.

Fields

NameTypeDescription
successboolWhether delivery succeeded.
status_codeOption < i32 >HTTP status code (if available).
errorOption < String >Error message (if failed).
duration_msi64Delivery duration in milliseconds.

Functions

brokkr-agent::webhooks::status_u16

private

#![allow(unused)]
fn main() {
fn status_u16 (err : & BrokkrError) -> Option < u16 >
}
Source
#![allow(unused)]
fn main() {
fn status_u16(err: &BrokkrError) -> Option<u16> {
    err.status().map(|s| s.as_u16())
}
}

brokkr-agent::webhooks::convert

private

#![allow(unused)]
fn main() {
fn convert < F : Serialize , T : DeserializeOwned > (value : F) -> Result < T , serde_json :: Error >
}
Source
#![allow(unused)]
fn main() {
fn convert<F: Serialize, T: DeserializeOwned>(value: F) -> Result<T, serde_json::Error> {
    let v = serde_json::to_value(value)?;
    serde_json::from_value(v)
}
}

brokkr-agent::webhooks::boxed

private

#![allow(unused)]
fn main() {
fn boxed (prefix : & str , err : BrokkrError) -> Box < dyn std :: error :: Error >
}
Source
#![allow(unused)]
fn main() {
fn boxed(prefix: &str, err: BrokkrError) -> Box<dyn std::error::Error> {
    let msg = match status_u16(&err) {
        Some(s) => format!("{prefix}. Status: {s}, Error: {err}"),
        None => format!("{prefix}: {err}"),
    };
    msg.into()
}
}

brokkr-agent::webhooks::fetch_pending_webhooks

pub

#![allow(unused)]
fn main() {
async fn fetch_pending_webhooks (_config : & Settings , client : & BrokkrClient , agent : & Agent ,) -> Result < Vec < PendingWebhookDelivery > , Box < dyn std :: error :: Error > >
}

Fetches pending webhook deliveries for this agent from the broker.

Parameters:

NameTypeDescription
config-Application settings containing broker configuration
client-HTTP client for making requests
agent-Agent details

Returns:

Pending webhook deliveries or error

Source
#![allow(unused)]
fn main() {
pub async fn fetch_pending_webhooks(
    _config: &Settings,
    client: &BrokkrClient,
    agent: &Agent,
) -> Result<Vec<PendingWebhookDelivery>, Box<dyn std::error::Error>> {
    debug!("Fetching pending webhooks for agent {}", agent.name);

    match client
        .api()
        .get_pending_agent_webhooks()
        .agent_id(agent.id)
        .send()
        .await
    {
        Ok(rv) => {
            let deliveries: Vec<PendingWebhookDelivery> =
                convert(rv.into_inner()).map_err(|e| {
                    error!("Failed to convert pending webhooks: {}", e);
                    Box::new(e) as Box<dyn std::error::Error>
                })?;
            if !deliveries.is_empty() {
                debug!(
                    "Fetched {} pending webhook deliveries for agent {}",
                    deliveries.len(),
                    agent.name
                );
            }
            Ok(deliveries)
        }
        Err(raw) => {
            let wrapped = BrokkrError::from(raw);
            error!("Failed to fetch pending webhooks: {}", wrapped);
            Err(boxed("Failed to fetch pending webhooks", wrapped))
        }
    }
}
}

brokkr-agent::webhooks::report_delivery_result

pub

#![allow(unused)]
fn main() {
async fn report_delivery_result (_config : & Settings , client : & BrokkrClient , delivery_id : Uuid , result : & DeliveryResult ,) -> Result < () , Box < dyn std :: error :: Error > >
}

Reports the result of a webhook delivery attempt to the broker.

Parameters:

NameTypeDescription
config-Application settings containing broker configuration
client-HTTP client for making requests
delivery_id-ID of the delivery being reported
result-The delivery result

Returns:

Success or error

Source
#![allow(unused)]
fn main() {
pub async fn report_delivery_result(
    _config: &Settings,
    client: &BrokkrClient,
    delivery_id: Uuid,
    result: &DeliveryResult,
) -> Result<(), Box<dyn std::error::Error>> {
    debug!("Reporting delivery result for {}", delivery_id);

    let request_body = DeliveryResultRequest {
        success: result.success,
        status_code: result.status_code,
        error: result.error.clone(),
        duration_ms: Some(result.duration_ms),
    };
    let sdk_body: brokkr_client::types::DeliveryResultRequest =
        convert(request_body).map_err(|e| {
            error!("Failed to convert DeliveryResultRequest: {}", e);
            Box::new(e) as Box<dyn std::error::Error>
        })?;

    match client
        .api()
        .report_delivery_result()
        .id(delivery_id)
        .body(sdk_body)
        .send()
        .await
    {
        Ok(_) => {
            debug!("Successfully reported delivery result for {}", delivery_id);
            Ok(())
        }
        Err(raw) => {
            let wrapped = BrokkrError::from(raw);
            error!(
                "Failed to report delivery result for {}: {}",
                delivery_id, wrapped
            );
            Err(boxed("Failed to report delivery result", wrapped))
        }
    }
}
}

brokkr-agent::webhooks::deliver_webhook

pub

#![allow(unused)]
fn main() {
async fn deliver_webhook (delivery : & PendingWebhookDelivery) -> DeliveryResult
}

Delivers a webhook via HTTP POST.

Parameters:

NameTypeDescription
delivery-The pending webhook delivery with URL and payload

Returns:

DeliveryResult with success/failure info and timing

Source
#![allow(unused)]
fn main() {
pub async fn deliver_webhook(delivery: &PendingWebhookDelivery) -> DeliveryResult {
    let start = Instant::now();

    // Build HTTP client with timeout
    let client = match reqwest::Client::builder()
        .timeout(Duration::from_secs(delivery.timeout_seconds as u64))
        .build()
    {
        Ok(c) => c,
        Err(e) => {
            return DeliveryResult {
                success: false,
                status_code: None,
                error: Some(format!("Failed to create HTTP client: {}", e)),
                duration_ms: start.elapsed().as_millis() as i64,
            };
        }
    };

    // Build the request
    let mut request = client
        .post(&delivery.url)
        .header("Content-Type", "application/json")
        .header("X-Brokkr-Event-Type", &delivery.event_type)
        .header("X-Brokkr-Delivery-Id", delivery.id.to_string())
        .body(delivery.payload.clone());

    // Add authorization header if present
    if let Some(ref auth) = delivery.auth_header {
        request = request.header("Authorization", auth);
    }

    // Send the request
    match request.send().await {
        Ok(response) => {
            let status_code = response.status().as_u16() as i32;
            let duration_ms = start.elapsed().as_millis() as i64;

            if response.status().is_success() {
                debug!(
                    "Webhook delivery {} succeeded with status {} in {}ms",
                    delivery.id, status_code, duration_ms
                );
                DeliveryResult {
                    success: true,
                    status_code: Some(status_code),
                    error: None,
                    duration_ms,
                }
            } else {
                // Get error body for context (limit to 500 chars)
                let error_body = response
                    .text()
                    .await
                    .unwrap_or_else(|_| "Failed to read response body".to_string());
                let error_preview: String = error_body.chars().take(500).collect();

                warn!(
                    "Webhook delivery {} failed with status {}: {}",
                    delivery.id, status_code, error_preview
                );

                DeliveryResult {
                    success: false,
                    status_code: Some(status_code),
                    error: Some(format!("HTTP {}: {}", status_code, error_preview)),
                    duration_ms,
                }
            }
        }
        Err(e) => {
            let duration_ms = start.elapsed().as_millis() as i64;
            let error_msg = classify_error(&e);

            warn!(
                "Webhook delivery {} failed after {}ms: {}",
                delivery.id, duration_ms, error_msg
            );

            DeliveryResult {
                success: false,
                status_code: None,
                error: Some(error_msg),
                duration_ms,
            }
        }
    }
}
}

brokkr-agent::webhooks::classify_error

private

#![allow(unused)]
fn main() {
fn classify_error (error : & reqwest :: Error) -> String
}

Classifies request errors for logging and retry decisions.

Source
#![allow(unused)]
fn main() {
fn classify_error(error: &reqwest::Error) -> String {
    if error.is_timeout() {
        "Request timed out".to_string()
    } else if error.is_connect() {
        "Connection failed".to_string()
    } else if error.is_request() {
        format!("Request error: {}", error)
    } else {
        format!("Error: {}", error)
    }
}
}

brokkr-agent::webhooks::process_pending_webhooks

pub

#![allow(unused)]
fn main() {
async fn process_pending_webhooks (config : & Settings , client : & BrokkrClient , agent : & Agent ,) -> Result < usize , Box < dyn std :: error :: Error > >
}

Processes all pending webhook deliveries for this agent.

This function:

  1. Fetches pending webhooks from the broker
  2. Delivers each webhook via HTTP
  3. Reports results back to the broker

Parameters:

NameTypeDescription
config-Application settings
client-HTTP client for broker communication
agent-Agent details

Returns:

Number of webhooks processed or error

Source
#![allow(unused)]
fn main() {
pub async fn process_pending_webhooks(
    config: &Settings,
    client: &BrokkrClient,
    agent: &Agent,
) -> Result<usize, Box<dyn std::error::Error>> {
    // Fetch pending deliveries from broker
    let deliveries = fetch_pending_webhooks(config, client, agent).await?;

    if deliveries.is_empty() {
        return Ok(0);
    }

    info!(
        "Processing {} pending webhook deliveries for agent {}",
        deliveries.len(),
        agent.name
    );

    let mut processed = 0;

    for delivery in deliveries {
        debug!(
            "Delivering webhook {} (event: {}, attempt: {})",
            delivery.id,
            delivery.event_type,
            delivery.attempts + 1
        );

        // Deliver the webhook
        let result = deliver_webhook(&delivery).await;

        // Report result to broker
        if let Err(e) = report_delivery_result(config, client, delivery.id, &result).await {
            error!(
                "Failed to report delivery result for {}: {}",
                delivery.id, e
            );
            // Continue processing other deliveries even if reporting fails
        }

        processed += 1;

        if result.success {
            info!(
                "Webhook delivery {} succeeded in {}ms",
                delivery.id, result.duration_ms
            );
        } else {
            warn!(
                "Webhook delivery {} failed: {:?}",
                delivery.id,
                result.error.as_deref().unwrap_or("unknown error")
            );
        }
    }

    Ok(processed)
}
}