brokkr-agent::webhooks Rust
Webhook delivery module for agent-side webhook processing.
This module provides functionality for agents to poll for pending webhooks assigned to them, deliver them via HTTP, and report results back to the broker.
Structs
brokkr-agent::webhooks::PendingWebhookDelivery
pub
Derives: Debug, Clone, Deserialize
Pending webhook delivery from the broker. Contains decrypted URL and auth header for delivery.
Fields
| Name | Type | Description |
|---|---|---|
id | Uuid | Delivery ID. |
subscription_id | Uuid | Subscription ID. |
event_type | String | Event type being delivered. |
payload | String | JSON-encoded event payload. |
url | String | Decrypted webhook URL. |
auth_header | Option < String > | Decrypted Authorization header (if configured). |
timeout_seconds | i32 | HTTP timeout in seconds. |
max_retries | i32 | Maximum retries for this subscription. |
attempts | i32 | Current attempt number. |
brokkr-agent::webhooks::DeliveryResultRequest
pub
Derives: Debug, Clone, Serialize
Request body for reporting delivery result to broker.
Fields
| Name | Type | Description |
|---|---|---|
success | bool | Whether delivery succeeded. |
status_code | Option < i32 > | HTTP status code (if available). |
error | Option < String > | Error message (if failed). |
duration_ms | Option < i64 > | Delivery duration in milliseconds. |
brokkr-agent::webhooks::DeliveryResult
pub
Derives: Debug
Result of a webhook delivery attempt.
Fields
| Name | Type | Description |
|---|---|---|
success | bool | Whether delivery succeeded. |
status_code | Option < i32 > | HTTP status code (if available). |
error | Option < String > | Error message (if failed). |
duration_ms | i64 | Delivery duration in milliseconds. |
Functions
brokkr-agent::webhooks::fetch_pending_webhooks
pub
#![allow(unused)]
fn main() {
async fn fetch_pending_webhooks (config : & Settings , client : & Client , agent : & Agent ,) -> Result < Vec < PendingWebhookDelivery > , Box < dyn std :: error :: Error > >
}
Fetches pending webhook deliveries for this agent from the broker.
Parameters:
| Name | Type | Description |
|---|---|---|
config | - | Application settings containing broker configuration |
client | - | HTTP client for making requests |
agent | - | Agent details |
Returns:
Pending webhook deliveries or error
Source
#![allow(unused)]
fn main() {
pub async fn fetch_pending_webhooks(
config: &Settings,
client: &Client,
agent: &Agent,
) -> Result<Vec<PendingWebhookDelivery>, Box<dyn std::error::Error>> {
let url = format!(
"{}/api/v1/agents/{}/webhooks/pending",
config.agent.broker_url, agent.id
);
debug!("Fetching pending webhooks from {}", url);
let response = client
.get(&url)
.header("Authorization", format!("Bearer {}", config.agent.pak))
.send()
.await
.map_err(|e| {
error!("Failed to fetch pending webhooks: {}", e);
Box::new(e) as Box<dyn std::error::Error>
})?;
match response.status() {
StatusCode::OK => {
let deliveries: Vec<PendingWebhookDelivery> = response.json().await.map_err(|e| {
error!("Failed to deserialize pending webhooks: {}", e);
Box::new(e) as Box<dyn std::error::Error>
})?;
if !deliveries.is_empty() {
debug!(
"Fetched {} pending webhook deliveries for agent {}",
deliveries.len(),
agent.name
);
}
Ok(deliveries)
}
status => {
let error_body = response.text().await.unwrap_or_default();
error!(
"Failed to fetch pending webhooks. Status {}: {}",
status, error_body
);
Err(format!(
"Failed to fetch pending webhooks. Status: {}, Body: {}",
status, error_body
)
.into())
}
}
}
}
brokkr-agent::webhooks::report_delivery_result
pub
#![allow(unused)]
fn main() {
async fn report_delivery_result (config : & Settings , client : & Client , delivery_id : Uuid , result : & DeliveryResult ,) -> Result < () , Box < dyn std :: error :: Error > >
}
Reports the result of a webhook delivery attempt to the broker.
Parameters:
| Name | Type | Description |
|---|---|---|
config | - | Application settings containing broker configuration |
client | - | HTTP client for making requests |
delivery_id | - | ID of the delivery being reported |
result | - | The delivery result |
Returns:
Success or error
Source
#![allow(unused)]
fn main() {
pub async fn report_delivery_result(
config: &Settings,
client: &Client,
delivery_id: Uuid,
result: &DeliveryResult,
) -> Result<(), Box<dyn std::error::Error>> {
let url = format!(
"{}/api/v1/webhook-deliveries/{}/result",
config.agent.broker_url, delivery_id
);
debug!("Reporting delivery result for {} to {}", delivery_id, url);
let request_body = DeliveryResultRequest {
success: result.success,
status_code: result.status_code,
error: result.error.clone(),
duration_ms: Some(result.duration_ms),
};
let response = client
.post(&url)
.header("Authorization", format!("Bearer {}", config.agent.pak))
.json(&request_body)
.send()
.await
.map_err(|e| {
error!("Failed to report delivery result: {}", e);
Box::new(e) as Box<dyn std::error::Error>
})?;
match response.status() {
StatusCode::OK => {
debug!("Successfully reported delivery result for {}", delivery_id);
Ok(())
}
status => {
let error_body = response.text().await.unwrap_or_default();
error!(
"Failed to report delivery result for {}. Status {}: {}",
delivery_id, status, error_body
);
Err(format!(
"Failed to report delivery result. Status: {}, Body: {}",
status, error_body
)
.into())
}
}
}
}
brokkr-agent::webhooks::deliver_webhook
pub
#![allow(unused)]
fn main() {
async fn deliver_webhook (delivery : & PendingWebhookDelivery) -> DeliveryResult
}
Delivers a webhook via HTTP POST.
Parameters:
| Name | Type | Description |
|---|---|---|
delivery | - | The pending webhook delivery with URL and payload |
Returns:
DeliveryResult with success/failure info and timing
Source
#![allow(unused)]
fn main() {
pub async fn deliver_webhook(delivery: &PendingWebhookDelivery) -> DeliveryResult {
let start = Instant::now();
// Build HTTP client with timeout
let client = match reqwest::Client::builder()
.timeout(Duration::from_secs(delivery.timeout_seconds as u64))
.build()
{
Ok(c) => c,
Err(e) => {
return DeliveryResult {
success: false,
status_code: None,
error: Some(format!("Failed to create HTTP client: {}", e)),
duration_ms: start.elapsed().as_millis() as i64,
};
}
};
// Build the request
let mut request = client
.post(&delivery.url)
.header("Content-Type", "application/json")
.header("X-Brokkr-Event-Type", &delivery.event_type)
.header("X-Brokkr-Delivery-Id", delivery.id.to_string())
.body(delivery.payload.clone());
// Add authorization header if present
if let Some(ref auth) = delivery.auth_header {
request = request.header("Authorization", auth);
}
// Send the request
match request.send().await {
Ok(response) => {
let status_code = response.status().as_u16() as i32;
let duration_ms = start.elapsed().as_millis() as i64;
if response.status().is_success() {
debug!(
"Webhook delivery {} succeeded with status {} in {}ms",
delivery.id, status_code, duration_ms
);
DeliveryResult {
success: true,
status_code: Some(status_code),
error: None,
duration_ms,
}
} else {
// Get error body for context (limit to 500 chars)
let error_body = response
.text()
.await
.unwrap_or_else(|_| "Failed to read response body".to_string());
let error_preview: String = error_body.chars().take(500).collect();
warn!(
"Webhook delivery {} failed with status {}: {}",
delivery.id, status_code, error_preview
);
DeliveryResult {
success: false,
status_code: Some(status_code),
error: Some(format!("HTTP {}: {}", status_code, error_preview)),
duration_ms,
}
}
}
Err(e) => {
let duration_ms = start.elapsed().as_millis() as i64;
let error_msg = classify_error(&e);
warn!(
"Webhook delivery {} failed after {}ms: {}",
delivery.id, duration_ms, error_msg
);
DeliveryResult {
success: false,
status_code: None,
error: Some(error_msg),
duration_ms,
}
}
}
}
}
brokkr-agent::webhooks::classify_error
private
#![allow(unused)]
fn main() {
fn classify_error (error : & reqwest :: Error) -> String
}
Classifies request errors for logging and retry decisions.
Source
#![allow(unused)]
fn main() {
fn classify_error(error: &reqwest::Error) -> String {
if error.is_timeout() {
"Request timed out".to_string()
} else if error.is_connect() {
"Connection failed".to_string()
} else if error.is_request() {
format!("Request error: {}", error)
} else {
format!("Error: {}", error)
}
}
}
brokkr-agent::webhooks::process_pending_webhooks
pub
#![allow(unused)]
fn main() {
async fn process_pending_webhooks (config : & Settings , client : & Client , agent : & Agent ,) -> Result < usize , Box < dyn std :: error :: Error > >
}
Processes all pending webhook deliveries for this agent.
This function:
- Fetches pending webhooks from the broker
- Delivers each webhook via HTTP
- Reports results back to the broker
Parameters:
| Name | Type | Description |
|---|---|---|
config | - | Application settings |
client | - | HTTP client for broker communication |
agent | - | Agent details |
Returns:
Number of webhooks processed or error
Source
#![allow(unused)]
fn main() {
pub async fn process_pending_webhooks(
config: &Settings,
client: &Client,
agent: &Agent,
) -> Result<usize, Box<dyn std::error::Error>> {
// Fetch pending deliveries from broker
let deliveries = fetch_pending_webhooks(config, client, agent).await?;
if deliveries.is_empty() {
return Ok(0);
}
info!(
"Processing {} pending webhook deliveries for agent {}",
deliveries.len(),
agent.name
);
let mut processed = 0;
for delivery in deliveries {
debug!(
"Delivering webhook {} (event: {}, attempt: {})",
delivery.id,
delivery.event_type,
delivery.attempts + 1
);
// Deliver the webhook
let result = deliver_webhook(&delivery).await;
// Report result to broker
if let Err(e) = report_delivery_result(config, client, delivery.id, &result).await {
error!(
"Failed to report delivery result for {}: {}",
delivery.id, e
);
// Continue processing other deliveries even if reporting fails
}
processed += 1;
if result.success {
info!(
"Webhook delivery {} succeeded in {}ms",
delivery.id, result.duration_ms
);
} else {
warn!(
"Webhook delivery {} failed: {:?}",
delivery.id,
result.error.as_deref().unwrap_or("unknown error")
);
}
}
Ok(processed)
}
}