Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

brokkr-agent::broker Rust

Broker communication module for agent-broker interaction.

For detailed documentation, see the Brokkr Documentation.

Functions

brokkr-agent::broker::wait_for_broker_ready

pub

#![allow(unused)]
fn main() {
async fn wait_for_broker_ready (config : & Settings)
}

Waits for the broker service to become ready.

Parameters:

NameTypeDescription
config-Application settings containing broker configuration
Source
#![allow(unused)]
fn main() {
/// Polls the broker's `/readyz` endpoint until it answers with a success status.
///
/// At most `config.agent.max_retries` probes are made, with a one-second pause
/// between consecutive probes. The function returns as soon as one probe
/// succeeds.
///
/// Failure is not reported to the caller: once every probe has failed, an
/// error is logged and the process is terminated with exit code 1.
///
/// # Arguments
///
/// * `config` - Application settings; `agent.broker_url` and
///   `agent.max_retries` are read.
pub async fn wait_for_broker_ready(config: &Settings) {
    let max_attempts = config.agent.max_retries;
    let probe_url = format!("{}/readyz", config.agent.broker_url);
    let http = Client::new();

    for attempt in 1..=max_attempts {
        match http.get(&probe_url).send().await {
            Ok(reply) if reply.status().is_success() => {
                info!("Successfully connected to broker at {}", probe_url);
                return;
            }
            Ok(reply) => {
                warn!(
                    "Broker at {} returned non-success status: {}",
                    probe_url,
                    reply.status()
                );
            }
            Err(cause) => {
                warn!(
                    "Failed to connect to broker at {} (attempt {}/{}): {:?}",
                    probe_url, attempt, max_attempts, cause
                );
            }
        }

        // Skip the pause after the final probe and go straight to the failure path.
        if attempt >= max_attempts {
            break;
        }
        info!(
            "Waiting for broker to be ready at {} (attempt {}/{})",
            probe_url, attempt, max_attempts
        );
        sleep(Duration::from_secs(1)).await;
    }

    error!(
        "Failed to connect to broker at {} after {} attempts. Exiting.",
        probe_url, max_attempts
    );
    std::process::exit(1);
}
}

brokkr-agent::broker::verify_agent_pak

pub

#![allow(unused)]
fn main() {
async fn verify_agent_pak (config : & Settings) -> Result < () , Box < dyn std :: error :: Error > >
}

Verifies the agent’s Personal Access Key (PAK) with the broker.

Parameters:

NameTypeDescription
config-Application settings containing the PAK

Returns:

  • Result<(), Box<dyn std::error::Error>> - Success or error with message
Source
#![allow(unused)]
fn main() {
/// Checks the agent's Personal Access Key (PAK) against the broker.
///
/// Sends a `POST` with an empty JSON object to `{broker_url}/api/v1/auth/pak`,
/// carrying the PAK as a bearer token.
///
/// # Arguments
///
/// * `config` - Application settings; `agent.broker_url` and `agent.pak` are read.
///
/// # Errors
///
/// Returns an error when the request cannot be sent, when the broker answers
/// `401 Unauthorized` (`"Invalid agent PAK"`), or when it answers with any
/// status other than `200 OK` (the status and response body are included in
/// the message).
pub async fn verify_agent_pak(config: &Settings) -> Result<(), Box<dyn std::error::Error>> {
    let endpoint = format!("{}/api/v1/auth/pak", config.agent.broker_url);
    debug!("Verifying agent PAK at {}", endpoint);

    let request = reqwest::Client::new()
        .post(&endpoint)
        .header("Content-Type", "application/json")
        .header("Authorization", format!("Bearer {}", config.agent.pak))
        .body("{}"); // Empty JSON body

    let reply = match request.send().await {
        Ok(reply) => reply,
        Err(e) => {
            error!("Failed to send PAK verification request: {}", e);
            return Err(e.into());
        }
    };

    let status = reply.status();
    if status == StatusCode::OK {
        info!("Successfully verified agent PAK");
        return Ok(());
    }
    if status == StatusCode::UNAUTHORIZED {
        error!("Agent PAK verification failed: unauthorized");
        return Err("Invalid agent PAK".into());
    }

    // Any other status: surface whatever body the broker sent (empty if unreadable).
    let error_body = reply.text().await.unwrap_or_default();
    error!(
        "PAK verification failed with status {}: {}",
        status, error_body
    );
    Err(format!(
        "PAK verification failed. Status: {}, Body: {}",
        status, error_body
    )
    .into())
}
}

brokkr-agent::broker::fetch_agent_details

pub

#![allow(unused)]
fn main() {
async fn fetch_agent_details (config : & Settings , client : & Client ,) -> Result < Agent , Box < dyn std :: error :: Error > >
}

Fetches the details of the agent from the broker.

Parameters:

NameTypeDescription
config-Application settings containing broker configuration
client-HTTP client for making requests to the broker

Returns:

  • Result<Agent, Box<dyn std::error::Error>> - Agent details or error
Source
#![allow(unused)]
fn main() {
/// Looks up this agent's record on the broker by agent name and cluster name.
///
/// Issues `GET {broker_url}/api/v1/agents/` with `name` and `cluster_name`
/// query parameters and the PAK as a bearer token. The parameters are
/// percent-encoded by the HTTP client, so names containing characters such as
/// `&`, `#`, `+` or spaces are transmitted intact instead of corrupting the
/// query string.
///
/// # Arguments
///
/// * `config` - Application settings; `agent.broker_url`, `agent.agent_name`,
///   `agent.cluster_name` and `agent.pak` are read.
/// * `client` - HTTP client used to perform the request.
///
/// # Errors
///
/// Returns an error when the request cannot be built or sent, when the broker
/// answers `404 Not Found` (`"Agent not found"`), when it answers with any
/// other non-`200` status, or when the `200` response body cannot be
/// deserialized into an [`Agent`].
pub async fn fetch_agent_details(
    config: &Settings,
    client: &Client,
) -> Result<Agent, Box<dyn std::error::Error>> {
    let endpoint = format!("{}/api/v1/agents/", config.agent.broker_url);

    // Build the request first so the query parameters are URL-encoded by the
    // client and the final URL can be logged exactly as it will be sent.
    let request = client
        .get(&endpoint)
        .query(&[
            ("name", &config.agent.agent_name),
            ("cluster_name", &config.agent.cluster_name),
        ])
        .header("Authorization", format!("Bearer {}", config.agent.pak))
        .build()
        .map_err(|e| {
            error!("Failed to fetch agent details: {}", e);
            Box::new(e) as Box<dyn std::error::Error>
        })?;
    debug!("Fetching agent details from {}", request.url());

    let response = client.execute(request).await.map_err(|e| {
        error!("Failed to fetch agent details: {}", e);
        Box::new(e) as Box<dyn std::error::Error>
    })?;

    match response.status() {
        StatusCode::OK => {
            let agent: Agent = response.json().await.map_err(|e| {
                error!("Failed to deserialize agent details: {}", e);
                Box::new(e) as Box<dyn std::error::Error>
            })?;

            info!(
                "Successfully fetched details for agent {} in cluster {}",
                agent.name, agent.cluster_name
            );

            Ok(agent)
        }
        StatusCode::NOT_FOUND => {
            error!(
                "Agent not found: name={}, cluster={}",
                config.agent.agent_name, config.agent.cluster_name
            );
            Err("Agent not found".into())
        }
        status => {
            // Include whatever body the broker sent (empty if unreadable).
            let error_body = response.text().await.unwrap_or_default();
            error!(
                "Failed to fetch agent details. Status {}: {}",
                status, error_body
            );
            Err(format!(
                "Failed to fetch agent details. Status: {}, Body: {}",
                status, error_body
            )
            .into())
        }
    }
}
}

brokkr-agent::broker::fetch_and_process_deployment_objects

pub

#![allow(unused)]
fn main() {
async fn fetch_and_process_deployment_objects (config : & Settings , client : & Client , agent : & Agent ,) -> Result < Vec < DeploymentObject > , Box < dyn std :: error :: Error > >
}

Fetches the agent's target-state deployment objects from the broker and records poll metrics.

Parameters:

NameTypeDescription
config-Application settings containing configuration parameters
client-HTTP client for making API requests
agent-Agent instance containing runtime context

Returns:

  • Result<Vec<DeploymentObject>> - A vector of processed deployment objects if successful

Raises:

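Exception | Description
Error | Returned if the request to the broker fails or the broker responds with a non-OK status (failed to fetch deployment objects)
Error | Returned if the response body cannot be deserialized into deployment objects (failed to process deployment objects)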
Source
#![allow(unused)]
fn main() {
/// Fetches the agent's target-state deployment objects from the broker.
///
/// Issues `GET {broker_url}/api/v1/agents/{agent.id}/target-state` with the
/// PAK as a bearer token and deserializes a `200 OK` body into a list of
/// [`DeploymentObject`]s.
///
/// Every exit path records poll metrics: `poll_requests_total` is incremented
/// with the label `"success"` or `"error"`, and `poll_duration_seconds`
/// observes the elapsed time. On success `last_successful_poll_timestamp` is
/// also set to the current Unix time in seconds.
///
/// # Arguments
///
/// * `config` - Application settings; `agent.broker_url` and `agent.pak` are read.
/// * `client` - HTTP client used to perform the request.
/// * `agent` - Agent whose `id` selects the target state and whose `name` is
///   used in log output.
///
/// # Errors
///
/// Returns an error when the request cannot be sent, when the broker answers
/// with a non-`200` status (status and body are included in the message), or
/// when the response body cannot be deserialized.
pub async fn fetch_and_process_deployment_objects(
    config: &Settings,
    client: &Client,
    agent: &Agent,
) -> Result<Vec<DeploymentObject>, Box<dyn std::error::Error>> {
    let url = format!(
        "{}/api/v1/agents/{}/target-state",
        config.agent.broker_url, agent.id
    );

    debug!("Fetching deployment objects from {}", url);

    // Single place that records the outcome counter and the duration
    // histogram, so no exit path can forget one of them.
    let record_poll = |outcome: &str, seconds: f64| {
        metrics::poll_requests_total()
            .with_label_values(&[outcome])
            .inc();
        metrics::poll_duration_seconds()
            .with_label_values(&[])
            .observe(seconds);
    };

    let start = Instant::now();
    let response = client
        .get(&url)
        .header("Authorization", format!("Bearer {}", config.agent.pak))
        .send()
        .await
        .map_err(|e| {
            error!("Failed to send request to broker: {}", e);
            record_poll("error", start.elapsed().as_secs_f64());
            Box::new(e) as Box<dyn std::error::Error>
        })?;

    // Time until the response head arrived; reading the body is not included.
    let duration = start.elapsed().as_secs_f64();
    match response.status() {
        StatusCode::OK => {
            let deployment_objects: Vec<DeploymentObject> = response.json().await.map_err(|e| {
                error!("Failed to deserialize deployment objects: {}", e);
                // A poll whose body cannot be decoded is a failed poll; count
                // it like the other error paths instead of leaving a gap.
                record_poll("error", duration);
                Box::new(e) as Box<dyn std::error::Error>
            })?;

            info!(
                "Successfully fetched {} deployment objects for agent {}",
                deployment_objects.len(),
                agent.name
            );

            record_poll("success", duration);
            metrics::last_successful_poll_timestamp().set(
                std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs_f64(),
            );

            Ok(deployment_objects)
        }
        status => {
            // Include whatever body the broker sent (empty if unreadable).
            let error_body = response.text().await.unwrap_or_default();
            error!(
                "Broker request failed with status {}: {}",
                status, error_body
            );
            record_poll("error", duration);
            Err(format!(
                "Broker request failed. Status: {}, Body: {}",
                status, error_body
            )
            .into())
        }
    }
}
}

brokkr-agent::broker::send_success_event

pub

#![allow(unused)]
fn main() {
async fn send_success_event (config : & Settings , client : & Client , agent : & Agent , deployment_object_id : Uuid , message : Option < String > ,) -> Result < () , Box < dyn std :: error :: Error > >
}

Sends a success event to the broker for the given deployment object.

Parameters:

NameTypeDescription
config-Application settings containing broker configuration
client-HTTP client for making requests to the broker
agent-Agent details
deployment_object_id-ID of the deployment object
message-Optional message to include in the event

Returns:

  • Result<(), Box<dyn std::error::Error>> - Success or error with message
Source
#![allow(unused)]
fn main() {
/// Reports a successful deployment of one deployment object to the broker.
///
/// Posts a `DEPLOY` / `SUCCESS` [`NewAgentEvent`] as JSON to
/// `{broker_url}/api/v1/agents/{agent.id}/events`, authenticated with the PAK
/// as a bearer token.
///
/// # Arguments
///
/// * `config` - Application settings; `agent.broker_url` and `agent.pak` are read.
/// * `client` - HTTP client used to perform the request.
/// * `agent` - Agent whose `id` is used in the URL and in the event.
/// * `deployment_object_id` - ID of the deployment object the event refers to.
/// * `message` - Optional message stored in the event.
///
/// # Errors
///
/// Returns an error when the request cannot be sent or when the broker answers
/// with a status other than `200 OK` or `201 Created`.
pub async fn send_success_event(
    config: &Settings,
    client: &Client,
    agent: &Agent,
    deployment_object_id: Uuid,
    message: Option<String>,
) -> Result<(), Box<dyn std::error::Error>> {
    let events_url = format!(
        "{}/api/v1/agents/{}/events",
        config.agent.broker_url, agent.id
    );
    debug!(
        "Sending success event for deployment {} to {}",
        deployment_object_id, events_url
    );

    let payload = NewAgentEvent {
        agent_id: agent.id,
        deployment_object_id,
        event_type: "DEPLOY".to_string(),
        status: "SUCCESS".to_string(),
        message,
    };

    let request = client
        .post(&events_url)
        .header("Authorization", format!("Bearer {}", config.agent.pak))
        .json(&payload);

    let reply = match request.send().await {
        Ok(reply) => reply,
        Err(e) => {
            error!("Failed to send success event: {}", e);
            return Err(e.into());
        }
    };

    let status = reply.status();
    if matches!(status, StatusCode::OK | StatusCode::CREATED) {
        info!(
            "Successfully reported deployment success for object {}",
            deployment_object_id
        );
        return Ok(());
    }

    // Rejected by the broker: surface its body (empty if unreadable).
    let error_body = reply.text().await.unwrap_or_default();
    error!(
        "Failed to send success event. Status {}: {}",
        status, error_body
    );
    Err(format!(
        "Failed to send success event. Status: {}, Body: {}",
        status, error_body
    )
    .into())
}
}

brokkr-agent::broker::send_failure_event

pub

#![allow(unused)]
fn main() {
async fn send_failure_event (config : & Settings , client : & Client , agent : & Agent , deployment_object_id : Uuid , error_message : String ,) -> Result < () , Box < dyn std :: error :: Error > >
}

Sends a failure event to the broker for the given deployment object.

Parameters:

NameTypeDescription
config-Application settings containing broker configuration
client-HTTP client for making requests to the broker
agent-Agent details
deployment_object_id-ID of the deployment object
error_message-Message to include in the event

Returns:

  • Result<(), Box<dyn std::error::Error>> - Success or error with message
Source
#![allow(unused)]
fn main() {
/// Reports a failed deployment of one deployment object to the broker.
///
/// Posts a `DEPLOY` / `FAILURE` [`NewAgentEvent`] as JSON to
/// `{broker_url}/api/v1/agents/{agent.id}/events`, authenticated with the PAK
/// as a bearer token.
///
/// # Arguments
///
/// * `config` - Application settings; `agent.broker_url` and `agent.pak` are read.
/// * `client` - HTTP client used to perform the request.
/// * `agent` - Agent whose `id` is used in the URL and in the event.
/// * `deployment_object_id` - ID of the deployment object the event refers to.
/// * `error_message` - Message stored in the event.
///
/// # Errors
///
/// Returns an error when the request cannot be sent or when the broker answers
/// with a status other than `200 OK` or `201 Created`.
pub async fn send_failure_event(
    config: &Settings,
    client: &Client,
    agent: &Agent,
    deployment_object_id: Uuid,
    error_message: String,
) -> Result<(), Box<dyn std::error::Error>> {
    let events_url = format!(
        "{}/api/v1/agents/{}/events",
        config.agent.broker_url, agent.id
    );
    debug!(
        "Sending failure event for deployment {} to {}",
        deployment_object_id, events_url
    );

    let payload = NewAgentEvent {
        agent_id: agent.id,
        deployment_object_id,
        event_type: "DEPLOY".to_string(),
        status: "FAILURE".to_string(),
        message: Some(error_message),
    };

    let request = client
        .post(&events_url)
        .header("Authorization", format!("Bearer {}", config.agent.pak))
        .json(&payload);

    let reply = match request.send().await {
        Ok(reply) => reply,
        Err(e) => {
            error!(
                "Failed to send failure event for deployment {}: {}",
                deployment_object_id, e
            );
            return Err(e.into());
        }
    };

    let status = reply.status();
    if matches!(status, StatusCode::OK | StatusCode::CREATED) {
        info!(
            "Successfully reported deployment failure for object {}",
            deployment_object_id
        );
        return Ok(());
    }

    // Rejected by the broker: surface its body (empty if unreadable).
    let error_body = reply.text().await.unwrap_or_default();
    error!(
        "Failed to send failure event. Status {}: {}",
        status, error_body
    );
    Err(format!(
        "Failed to send failure event. Status: {}, Body: {}",
        status, error_body
    )
    .into())
}
}

brokkr-agent::broker::send_heartbeat

pub

#![allow(unused)]
fn main() {
async fn send_heartbeat (config : & Settings , client : & Client , agent : & Agent ,) -> Result < () , Box < dyn std :: error :: Error > >
}

Sends a heartbeat event to the broker for the given agent.

Parameters:

NameTypeDescription
config-Application settings containing broker configuration
client-HTTP client for making requests to the broker
agent-Agent details

Returns:

  • Result<(), Box<dyn std::error::Error>> - Success or error with message
Source
#![allow(unused)]
fn main() {
/// Sends a heartbeat for the given agent to the broker.
///
/// Issues `POST {broker_url}/api/v1/agents/{agent.id}/heartbeat` with the PAK
/// as a bearer token and no body.
///
/// On `200 OK` or `204 No Content` the `heartbeat_sent_total` counter is
/// incremented and `last_successful_poll_timestamp` is set to the current Unix
/// time in seconds.
///
/// # Arguments
///
/// * `config` - Application settings; `agent.broker_url` and `agent.pak` are read.
/// * `client` - HTTP client used to perform the request.
/// * `agent` - Agent whose `id` is used in the URL and whose `name` is used in
///   log output.
///
/// # Errors
///
/// Returns an error when the request cannot be sent, when the broker answers
/// `401 Unauthorized` (`"Unauthorized: Invalid agent PAK"`), or when it
/// answers with any other status besides `200`/`204`.
pub async fn send_heartbeat(
    config: &Settings,
    client: &Client,
    agent: &Agent,
) -> Result<(), Box<dyn std::error::Error>> {
    let url = format!(
        "{}/api/v1/agents/{}/heartbeat",
        config.agent.broker_url, agent.id
    );

    let response = client
        .post(&url)
        .header("Authorization", format!("Bearer {}", config.agent.pak))
        .send()
        .await
        .map_err(|e| {
            error!("Failed to send heartbeat for agent {}: {}", agent.name, e);
            // NOTE(review): the counter is bumped on transport failure but not
            // when the broker rejects the heartbeat (401 / other statuses
            // below). Confirm whether `heartbeat_sent_total` is meant to count
            // attempts or deliveries and make the paths consistent.
            metrics::heartbeat_sent_total().inc();
            Box::new(e) as Box<dyn std::error::Error>
        })?;

    match response.status() {
        StatusCode::OK | StatusCode::NO_CONTENT => {
            trace!("Heartbeat sent successfully for agent {}", agent.name);
            metrics::heartbeat_sent_total().inc();
            metrics::last_successful_poll_timestamp().set(
                std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs_f64(),
            );
            Ok(())
        }
        StatusCode::UNAUTHORIZED => {
            error!("Heartbeat unauthorized for agent {}", agent.name);
            Err("Unauthorized: Invalid agent PAK".into())
        }
        status => {
            // Include whatever body the broker sent (empty if unreadable).
            let error_body = response.text().await.unwrap_or_default();
            error!(
                "Heartbeat failed for agent {}. Status {}: {}",
                agent.name, status, error_body
            );
            Err(format!("Heartbeat failed. Status: {}, Body: {}", status, error_body).into())
        }
    }
}
}

brokkr-agent::broker::send_health_status

pub

#![allow(unused)]
fn main() {
async fn send_health_status (config : & Settings , client : & Client , agent : & Agent , health_updates : Vec < DeploymentObjectHealthUpdate > ,) -> Result < () , Box < dyn std :: error :: Error > >
}

Sends health status updates for deployment objects to the broker.

Parameters:

NameTypeDescription
config-Application settings containing broker configuration
client-HTTP client for making requests to the broker
agent-Agent details
health_updates-List of deployment object health updates

Returns:

  • Result<(), Box<dyn std::error::Error>> - Success or error with message
Source
#![allow(unused)]
fn main() {
/// Pushes health status updates for deployment objects to the broker.
///
/// When `health_updates` is empty nothing is sent and `Ok(())` is returned.
/// Otherwise the updates are wrapped in a [`HealthStatusUpdate`] and sent as
/// JSON via `PATCH {broker_url}/api/v1/agents/{agent.id}/health-status`, with
/// the PAK as a bearer token.
///
/// # Arguments
///
/// * `config` - Application settings; `agent.broker_url` and `agent.pak` are read.
/// * `client` - HTTP client used to perform the request.
/// * `agent` - Agent whose `id` is used in the URL and whose `name` is used in
///   log output.
/// * `health_updates` - Health updates to report.
///
/// # Errors
///
/// Returns an error when the request cannot be sent, when the broker answers
/// `401 Unauthorized` (`"Unauthorized: Invalid agent PAK"`), or when it
/// answers with any other status besides `200`/`204`.
pub async fn send_health_status(
    config: &Settings,
    client: &Client,
    agent: &Agent,
    health_updates: Vec<DeploymentObjectHealthUpdate>,
) -> Result<(), Box<dyn std::error::Error>> {
    // Nothing to report: skip the round trip entirely.
    if health_updates.is_empty() {
        return Ok(());
    }

    let endpoint = format!(
        "{}/api/v1/agents/{}/health-status",
        config.agent.broker_url, agent.id
    );

    debug!(
        "Sending health status update for {} deployment objects for agent {}",
        health_updates.len(),
        agent.name
    );

    let payload = HealthStatusUpdate {
        deployment_objects: health_updates,
    };

    let request = client
        .patch(&endpoint)
        .header("Authorization", format!("Bearer {}", config.agent.pak))
        .json(&payload);

    let reply = match request.send().await {
        Ok(reply) => reply,
        Err(e) => {
            error!(
                "Failed to send health status for agent {}: {}",
                agent.name, e
            );
            return Err(e.into());
        }
    };

    let status = reply.status();
    if matches!(status, StatusCode::OK | StatusCode::NO_CONTENT) {
        debug!(
            "Successfully sent health status for {} deployment objects",
            payload.deployment_objects.len()
        );
        return Ok(());
    }
    if status == StatusCode::UNAUTHORIZED {
        error!("Health status update unauthorized for agent {}", agent.name);
        return Err("Unauthorized: Invalid agent PAK".into());
    }

    // Any other status: surface whatever body the broker sent (empty if unreadable).
    let error_body = reply.text().await.unwrap_or_default();
    error!(
        "Health status update failed for agent {}. Status {}: {}",
        agent.name, status, error_body
    );
    Err(format!(
        "Health status update failed. Status: {}, Body: {}",
        status, error_body
    )
    .into())
}
}

brokkr-agent::broker::fetch_pending_diagnostics

pub

#![allow(unused)]
fn main() {
async fn fetch_pending_diagnostics (config : & Settings , client : & Client , agent : & Agent ,) -> Result < Vec < DiagnosticRequest > , Box < dyn std :: error :: Error > >
}

Fetches pending diagnostic requests for the agent.

Parameters:

NameTypeDescription
config-Application settings containing broker configuration
client-HTTP client for making requests to the broker
agent-Agent details

Returns:

  • Result<Vec<DiagnosticRequest>, Box<dyn std::error::Error>> - Pending diagnostic requests
Source
#![allow(unused)]
fn main() {
/// Retrieves the diagnostic requests currently pending for the agent.
///
/// Issues `GET {broker_url}/api/v1/agents/{agent.id}/diagnostics/pending` with
/// the PAK as a bearer token and deserializes a `200 OK` body into a list of
/// [`DiagnosticRequest`]s.
///
/// # Arguments
///
/// * `config` - Application settings; `agent.broker_url` and `agent.pak` are read.
/// * `client` - HTTP client used to perform the request.
/// * `agent` - Agent whose `id` is used in the URL and whose `name` is used in
///   log output.
///
/// # Errors
///
/// Returns an error when the request cannot be sent, when the broker answers
/// with a non-`200` status, or when the response body cannot be deserialized.
pub async fn fetch_pending_diagnostics(
    config: &Settings,
    client: &Client,
    agent: &Agent,
) -> Result<Vec<DiagnosticRequest>, Box<dyn std::error::Error>> {
    let endpoint = format!(
        "{}/api/v1/agents/{}/diagnostics/pending",
        config.agent.broker_url, agent.id
    );

    debug!("Fetching pending diagnostics from {}", endpoint);

    let request = client
        .get(&endpoint)
        .header("Authorization", format!("Bearer {}", config.agent.pak));

    let reply = match request.send().await {
        Ok(reply) => reply,
        Err(e) => {
            error!("Failed to fetch pending diagnostics: {}", e);
            return Err(e.into());
        }
    };

    let status = reply.status();
    if status != StatusCode::OK {
        // Rejected by the broker: surface its body (empty if unreadable).
        let error_body = reply.text().await.unwrap_or_default();
        error!(
            "Failed to fetch pending diagnostics. Status {}: {}",
            status, error_body
        );
        return Err(format!(
            "Failed to fetch pending diagnostics. Status: {}, Body: {}",
            status, error_body
        )
        .into());
    }

    let pending: Vec<DiagnosticRequest> = match reply.json().await {
        Ok(pending) => pending,
        Err(e) => {
            error!("Failed to deserialize diagnostic requests: {}", e);
            return Err(e.into());
        }
    };

    // Only log when there is something to do, to keep idle polls quiet.
    if !pending.is_empty() {
        debug!(
            "Found {} pending diagnostic requests for agent {}",
            pending.len(),
            agent.name
        );
    }

    Ok(pending)
}
}

brokkr-agent::broker::claim_diagnostic_request

pub

#![allow(unused)]
fn main() {
async fn claim_diagnostic_request (config : & Settings , client : & Client , request_id : Uuid ,) -> Result < DiagnosticRequest , Box < dyn std :: error :: Error > >
}

Claims a diagnostic request for processing.

Parameters:

NameTypeDescription
config-Application settings containing broker configuration
client-HTTP client for making requests to the broker
request_id-ID of the diagnostic request to claim

Returns:

  • Result<DiagnosticRequest, Box<dyn std::error::Error>> - The claimed request
Source
#![allow(unused)]
fn main() {
/// Claims a diagnostic request so this agent can process it.
///
/// Issues `POST {broker_url}/api/v1/diagnostics/{request_id}/claim` with the
/// PAK as a bearer token and deserializes a `200 OK` body into the claimed
/// [`DiagnosticRequest`].
///
/// # Arguments
///
/// * `config` - Application settings; `agent.broker_url` and `agent.pak` are read.
/// * `client` - HTTP client used to perform the request.
/// * `request_id` - ID of the diagnostic request to claim.
///
/// # Errors
///
/// Returns an error when the request cannot be sent, when the broker answers
/// `409 Conflict` (the request was already claimed or completed), when it
/// answers with any other non-`200` status, or when the response body cannot
/// be deserialized.
pub async fn claim_diagnostic_request(
    config: &Settings,
    client: &Client,
    request_id: Uuid,
) -> Result<DiagnosticRequest, Box<dyn std::error::Error>> {
    let endpoint = format!(
        "{}/api/v1/diagnostics/{}/claim",
        config.agent.broker_url, request_id
    );

    debug!("Claiming diagnostic request {}", request_id);

    let request = client
        .post(&endpoint)
        .header("Authorization", format!("Bearer {}", config.agent.pak));

    let reply = match request.send().await {
        Ok(reply) => reply,
        Err(e) => {
            error!("Failed to claim diagnostic request {}: {}", request_id, e);
            return Err(e.into());
        }
    };

    let status = reply.status();

    if status == StatusCode::OK {
        let claimed: DiagnosticRequest = match reply.json().await {
            Ok(claimed) => claimed,
            Err(e) => {
                error!("Failed to deserialize claimed diagnostic request: {}", e);
                return Err(e.into());
            }
        };
        info!("Successfully claimed diagnostic request {}", request_id);
        return Ok(claimed);
    }

    if status == StatusCode::CONFLICT {
        // Logged as a warning rather than an error; the request is no longer claimable.
        warn!(
            "Diagnostic request {} already claimed or completed",
            request_id
        );
        return Err(format!(
            "Diagnostic request {} already claimed or completed",
            request_id
        )
        .into());
    }

    // Any other status: surface whatever body the broker sent (empty if unreadable).
    let error_body = reply.text().await.unwrap_or_default();
    error!(
        "Failed to claim diagnostic request {}. Status {}: {}",
        request_id, status, error_body
    );
    Err(format!(
        "Failed to claim diagnostic request. Status: {}, Body: {}",
        status, error_body
    )
    .into())
}
}

brokkr-agent::broker::submit_diagnostic_result

pub

#![allow(unused)]
fn main() {
async fn submit_diagnostic_result (config : & Settings , client : & Client , request_id : Uuid , result : SubmitDiagnosticResult ,) -> Result < () , Box < dyn std :: error :: Error > >
}

Submits diagnostic results for a request.

Parameters:

NameTypeDescription
config-Application settings containing broker configuration
client-HTTP client for making requests to the broker
request_id-ID of the diagnostic request
result-The diagnostic result to submit

Returns:

  • Result<(), Box<dyn std::error::Error>> - Success or error
Source
#![allow(unused)]
fn main() {
/// Submits the result of a diagnostic request to the broker.
///
/// Sends `result` as JSON via
/// `POST {broker_url}/api/v1/diagnostics/{request_id}/result`, with the PAK as
/// a bearer token.
///
/// # Arguments
///
/// * `config` - Application settings; `agent.broker_url` and `agent.pak` are read.
/// * `client` - HTTP client used to perform the request.
/// * `request_id` - ID of the diagnostic request the result belongs to.
/// * `result` - The diagnostic result to submit.
///
/// # Errors
///
/// Returns an error when the request cannot be sent or when the broker answers
/// with a status other than `200 OK` or `201 Created`.
pub async fn submit_diagnostic_result(
    config: &Settings,
    client: &Client,
    request_id: Uuid,
    result: SubmitDiagnosticResult,
) -> Result<(), Box<dyn std::error::Error>> {
    let endpoint = format!(
        "{}/api/v1/diagnostics/{}/result",
        config.agent.broker_url, request_id
    );

    debug!("Submitting diagnostic result for request {}", request_id);

    let request = client
        .post(&endpoint)
        .header("Authorization", format!("Bearer {}", config.agent.pak))
        .json(&result);

    let reply = match request.send().await {
        Ok(reply) => reply,
        Err(e) => {
            error!(
                "Failed to submit diagnostic result for request {}: {}",
                request_id, e
            );
            return Err(e.into());
        }
    };

    let status = reply.status();
    if matches!(status, StatusCode::OK | StatusCode::CREATED) {
        info!(
            "Successfully submitted diagnostic result for request {}",
            request_id
        );
        return Ok(());
    }

    // Rejected by the broker: surface its body (empty if unreadable).
    let error_body = reply.text().await.unwrap_or_default();
    error!(
        "Failed to submit diagnostic result for request {}. Status {}: {}",
        request_id, status, error_body
    );
    Err(format!(
        "Failed to submit diagnostic result. Status: {}, Body: {}",
        status, error_body
    )
    .into())
}
}