brokkr-agent::broker Rust
Broker communication module for agent-broker interaction.
For detailed documentation, see the Brokkr Documentation.
Functions
brokkr-agent::broker::wait_for_broker_ready
pub
#![allow(unused)]
fn main() {
async fn wait_for_broker_ready (config : & Settings)
}
Waits for the broker service to become ready.
Parameters:
| Name | Type | Description |
|---|---|---|
| config | `&Settings` | Application settings containing broker configuration |
Source
#![allow(unused)]
fn main() {
/// Blocks until the broker's `/readyz` endpoint answers with a success status.
///
/// The endpoint is polled up to `config.agent.max_retries` times, pausing one
/// second between attempts. If no attempt succeeds the failure is logged and
/// the process is terminated with exit code 1, so this function only ever
/// returns once the broker has reported ready.
///
/// # Parameters
///
/// * `config` - Application settings; `agent.broker_url` and
///   `agent.max_retries` are read.
pub async fn wait_for_broker_ready(config: &Settings) {
    let max_attempts = config.agent.max_retries;
    let readyz_url = format!("{}/readyz", config.agent.broker_url);
    let http = Client::new();

    for attempt in 1..=max_attempts {
        let outcome = http.get(&readyz_url).send().await;
        match outcome {
            Ok(reply) if reply.status().is_success() => {
                info!("Successfully connected to broker at {}", readyz_url);
                return;
            }
            Ok(reply) => {
                warn!(
                    "Broker at {} returned non-success status: {}",
                    readyz_url,
                    reply.status()
                );
            }
            Err(e) => {
                warn!(
                    "Failed to connect to broker at {} (attempt {}/{}): {:?}",
                    readyz_url, attempt, max_attempts, e
                );
            }
        }

        // No point in waiting after the final attempt; fall through to the
        // failure handling below instead.
        let is_last_attempt = attempt >= max_attempts;
        if !is_last_attempt {
            info!(
                "Waiting for broker to be ready at {} (attempt {}/{})",
                readyz_url, attempt, max_attempts
            );
            sleep(Duration::from_secs(1)).await;
        }
    }

    error!(
        "Failed to connect to broker at {} after {} attempts. Exiting.",
        readyz_url, max_attempts
    );
    std::process::exit(1);
}
}
brokkr-agent::broker::verify_agent_pak
pub
#![allow(unused)]
fn main() {
async fn verify_agent_pak (config : & Settings) -> Result < () , Box < dyn std :: error :: Error > >
}
Verifies the agent’s Personal Access Key (PAK) with the broker.
Parameters:
| Name | Type | Description |
|---|---|---|
| config | `&Settings` | Application settings containing the PAK |
Returns:
Result<(), Box<dyn std::error::Error>>- Success or error with message
Source
#![allow(unused)]
fn main() {
/// Checks the agent's Personal Access Key (PAK) against the broker.
///
/// Sends `POST {broker_url}/api/v1/auth/pak` with an empty JSON object as the
/// body and the PAK as a bearer token, using a freshly created HTTP client.
///
/// # Parameters
///
/// * `config` - Application settings; `agent.broker_url` and `agent.pak` are read.
///
/// # Errors
///
/// Returns an error when the request cannot be sent, when the broker answers
/// `401 Unauthorized` ("Invalid agent PAK"), or when it answers with any other
/// status than `200 OK` (the status and response body are included in the message).
pub async fn verify_agent_pak(config: &Settings) -> Result<(), Box<dyn std::error::Error>> {
    let url = format!("{}/api/v1/auth/pak", config.agent.broker_url);
    debug!("Verifying agent PAK at {}", url);

    let bearer = format!("Bearer {}", config.agent.pak);
    let request = reqwest::Client::new()
        .post(&url)
        .header("Content-Type", "application/json")
        .header("Authorization", bearer)
        .body("{}"); // Empty JSON body

    let response = match request.send().await {
        Ok(response) => response,
        Err(e) => {
            error!("Failed to send PAK verification request: {}", e);
            return Err(e.into());
        }
    };

    let status = response.status();
    if status == StatusCode::OK {
        info!("Successfully verified agent PAK");
        return Ok(());
    }
    if status == StatusCode::UNAUTHORIZED {
        error!("Agent PAK verification failed: unauthorized");
        return Err("Invalid agent PAK".into());
    }

    // Any other status: surface whatever the broker sent back (best effort).
    let error_body = response.text().await.unwrap_or_default();
    error!(
        "PAK verification failed with status {}: {}",
        status, error_body
    );
    let message = format!(
        "PAK verification failed. Status: {}, Body: {}",
        status, error_body
    );
    Err(message.into())
}
}
brokkr-agent::broker::fetch_agent_details
pub
#![allow(unused)]
fn main() {
async fn fetch_agent_details (config : & Settings , client : & Client ,) -> Result < Agent , Box < dyn std :: error :: Error > >
}
Fetches the details of the agent from the broker.
Parameters:
| Name | Type | Description |
|---|---|---|
| config | `&Settings` | Application settings containing broker configuration |
| client | `&Client` | HTTP client for making requests to the broker |
Returns:
Result<Agent, Box<dyn std::error::Error>>- Agent details or error
Source
#![allow(unused)]
fn main() {
pub async fn fetch_agent_details(
config: &Settings,
client: &Client,
) -> Result<Agent, Box<dyn std::error::Error>> {
let url = format!(
"{}/api/v1/agents/?name={}&cluster_name={}",
config.agent.broker_url, config.agent.agent_name, config.agent.cluster_name
);
debug!("Fetching agent details from {}", url);
let response = client
.get(&url)
.header("Authorization", format!("Bearer {}", config.agent.pak))
.send()
.await
.map_err(|e| {
error!("Failed to fetch agent details: {}", e);
Box::new(e) as Box<dyn std::error::Error>
})?;
match response.status() {
StatusCode::OK => {
let agent: Agent = response.json().await.map_err(|e| {
error!("Failed to deserialize agent details: {}", e);
Box::new(e) as Box<dyn std::error::Error>
})?;
info!(
"Successfully fetched details for agent {} in cluster {}",
agent.name, agent.cluster_name
);
Ok(agent)
}
StatusCode::NOT_FOUND => {
error!(
"Agent not found: name={}, cluster={}",
config.agent.agent_name, config.agent.cluster_name
);
Err("Agent not found".into())
}
status => {
let error_body = response.text().await.unwrap_or_default();
error!(
"Failed to fetch agent details. Status {}: {}",
status, error_body
);
Err(format!(
"Failed to fetch agent details. Status: {}, Body: {}",
status, error_body
)
.into())
}
}
}
}
brokkr-agent::broker::fetch_and_process_deployment_objects
pub
#![allow(unused)]
fn main() {
async fn fetch_and_process_deployment_objects (config : & Settings , client : & Client , agent : & Agent ,) -> Result < Vec < DeploymentObject > , Box < dyn std :: error :: Error > >
}
Fetches the agent's target-state deployment objects from the broker and records poll metrics.
Parameters:
| Name | Type | Description |
|---|---|---|
| config | `&Settings` | Application settings containing configuration parameters |
| client | `&Client` | HTTP client for making API requests |
| agent | `&Agent` | Agent instance containing runtime context |
Returns:
Result<Vec<DeploymentObject>, Box<dyn std::error::Error>> - A vector of deployment objects if successful
Raises:
| Exception | Description |
|---|---|
| Error | The request to the broker could not be sent, or the broker returned a non-OK status |
| Error | The response body could not be deserialized into deployment objects |
Source
#![allow(unused)]
fn main() {
/// Fetches the agent's target-state deployment objects from the broker.
///
/// Sends `GET {broker_url}/api/v1/agents/{agent.id}/target-state`, authenticated
/// with the agent's PAK as a bearer token, and deserializes the response body
/// into a list of [`DeploymentObject`]s.
///
/// Poll metrics are recorded on every exit path: `poll_requests_total` is
/// incremented with the label `"success"` or `"error"`, and the request
/// duration is observed in `poll_duration_seconds`. On success
/// `last_successful_poll_timestamp` is set to the current Unix time in seconds.
///
/// # Parameters
///
/// * `config` - Application settings; `agent.broker_url` and `agent.pak` are read.
/// * `client` - HTTP client used to make the request.
/// * `agent` - Agent whose `id` selects the target state (its `name` is only logged).
///
/// # Errors
///
/// Returns an error when the request cannot be sent, the broker answers with a
/// status other than `200 OK`, or the response body cannot be deserialized.
pub async fn fetch_and_process_deployment_objects(
    config: &Settings,
    client: &Client,
    agent: &Agent,
) -> Result<Vec<DeploymentObject>, Box<dyn std::error::Error>> {
    let url = format!(
        "{}/api/v1/agents/{}/target-state",
        config.agent.broker_url, agent.id
    );
    debug!("Fetching deployment objects from {}", url);

    let start = Instant::now();
    let response = client
        .get(&url)
        .header("Authorization", format!("Bearer {}", config.agent.pak))
        .send()
        .await
        .map_err(|e| {
            error!("Failed to send request to broker: {}", e);
            metrics::poll_requests_total()
                .with_label_values(&["error"])
                .inc();
            metrics::poll_duration_seconds()
                .with_label_values(&[])
                .observe(start.elapsed().as_secs_f64());
            Box::new(e) as Box<dyn std::error::Error>
        })?;
    // Time until the response headers arrived; reading the body is not included.
    let duration = start.elapsed().as_secs_f64();

    match response.status() {
        StatusCode::OK => {
            let deployment_objects: Vec<DeploymentObject> =
                response.json().await.map_err(|e| {
                    error!("Failed to deserialize deployment objects: {}", e);
                    // An unreadable body is still a failed poll; record it so
                    // this path shows up in the metrics like every other failure.
                    metrics::poll_requests_total()
                        .with_label_values(&["error"])
                        .inc();
                    metrics::poll_duration_seconds()
                        .with_label_values(&[])
                        .observe(duration);
                    Box::new(e) as Box<dyn std::error::Error>
                })?;
            info!(
                "Successfully fetched {} deployment objects for agent {}",
                deployment_objects.len(),
                agent.name
            );
            metrics::poll_requests_total()
                .with_label_values(&["success"])
                .inc();
            metrics::poll_duration_seconds()
                .with_label_values(&[])
                .observe(duration);
            metrics::last_successful_poll_timestamp().set(
                std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    // A clock set before the epoch yields 0 rather than a panic.
                    .unwrap_or_default()
                    .as_secs_f64(),
            );
            Ok(deployment_objects)
        }
        status => {
            // Include the broker's response body (best effort) to aid debugging.
            let error_body = response.text().await.unwrap_or_default();
            error!(
                "Broker request failed with status {}: {}",
                status, error_body
            );
            metrics::poll_requests_total()
                .with_label_values(&["error"])
                .inc();
            metrics::poll_duration_seconds()
                .with_label_values(&[])
                .observe(duration);
            Err(format!(
                "Broker request failed. Status: {}, Body: {}",
                status, error_body
            )
            .into())
        }
    }
}
}
brokkr-agent::broker::send_success_event
pub
#![allow(unused)]
fn main() {
async fn send_success_event (config : & Settings , client : & Client , agent : & Agent , deployment_object_id : Uuid , message : Option < String > ,) -> Result < () , Box < dyn std :: error :: Error > >
}
Sends a success event to the broker for the given deployment object.
Parameters:
| Name | Type | Description |
|---|---|---|
| config | `&Settings` | Application settings containing broker configuration |
| client | `&Client` | HTTP client for making requests to the broker |
| agent | `&Agent` | Agent details |
| deployment_object_id | `Uuid` | ID of the deployment object |
| message | `Option<String>` | Optional message to include in the event |
Returns:
Result<(), Box<dyn std::error::Error>>- Success or error with message
Source
#![allow(unused)]
fn main() {
/// Reports a successful deployment of a deployment object to the broker.
///
/// Posts a [`NewAgentEvent`] with `event_type` `"DEPLOY"` and `status`
/// `"SUCCESS"` to `{broker_url}/api/v1/agents/{agent.id}/events`, authenticated
/// with the agent's PAK as a bearer token.
///
/// # Parameters
///
/// * `config` - Application settings; `agent.broker_url` and `agent.pak` are read.
/// * `client` - HTTP client used to make the request.
/// * `agent` - Agent reporting the event; its `id` is used in the URL and payload.
/// * `deployment_object_id` - ID of the deployment object that was deployed.
/// * `message` - Optional message attached to the event.
///
/// # Errors
///
/// Returns an error when the request cannot be sent or the broker answers with
/// a status other than `200 OK` or `201 Created`.
pub async fn send_success_event(
    config: &Settings,
    client: &Client,
    agent: &Agent,
    deployment_object_id: Uuid,
    message: Option<String>,
) -> Result<(), Box<dyn std::error::Error>> {
    let url = format!(
        "{}/api/v1/agents/{}/events",
        config.agent.broker_url, agent.id
    );
    debug!(
        "Sending success event for deployment {} to {}",
        deployment_object_id, url
    );

    let payload = NewAgentEvent {
        agent_id: agent.id,
        deployment_object_id,
        event_type: "DEPLOY".to_string(),
        status: "SUCCESS".to_string(),
        message,
    };
    let bearer = format!("Bearer {}", config.agent.pak);

    let sent = client
        .post(&url)
        .header("Authorization", bearer)
        .json(&payload)
        .send()
        .await;
    let response = match sent {
        Ok(response) => response,
        Err(e) => {
            error!("Failed to send success event: {}", e);
            return Err(e.into());
        }
    };

    let status = response.status();
    if status == StatusCode::OK || status == StatusCode::CREATED {
        info!(
            "Successfully reported deployment success for object {}",
            deployment_object_id
        );
        return Ok(());
    }

    // Rejected by the broker: pass on its response body (best effort).
    let error_body = response.text().await.unwrap_or_default();
    error!(
        "Failed to send success event. Status {}: {}",
        status, error_body
    );
    let failure = format!(
        "Failed to send success event. Status: {}, Body: {}",
        status, error_body
    );
    Err(failure.into())
}
}
brokkr-agent::broker::send_failure_event
pub
#![allow(unused)]
fn main() {
async fn send_failure_event (config : & Settings , client : & Client , agent : & Agent , deployment_object_id : Uuid , error_message : String ,) -> Result < () , Box < dyn std :: error :: Error > >
}
Sends a failure event to the broker for the given deployment object.
Parameters:
| Name | Type | Description |
|---|---|---|
| config | `&Settings` | Application settings containing broker configuration |
| client | `&Client` | HTTP client for making requests to the broker |
| agent | `&Agent` | Agent details |
| deployment_object_id | `Uuid` | ID of the deployment object |
| error_message | `String` | Message to include in the event |
Returns:
Result<(), Box<dyn std::error::Error>>- Success or error with message
Source
#![allow(unused)]
fn main() {
/// Reports a failed deployment of a deployment object to the broker.
///
/// Posts a [`NewAgentEvent`] with `event_type` `"DEPLOY"`, `status` `"FAILURE"`
/// and the given error message to
/// `{broker_url}/api/v1/agents/{agent.id}/events`, authenticated with the
/// agent's PAK as a bearer token.
///
/// # Parameters
///
/// * `config` - Application settings; `agent.broker_url` and `agent.pak` are read.
/// * `client` - HTTP client used to make the request.
/// * `agent` - Agent reporting the event; its `id` is used in the URL and payload.
/// * `deployment_object_id` - ID of the deployment object that failed to deploy.
/// * `error_message` - Message describing the failure; always sent with the event.
///
/// # Errors
///
/// Returns an error when the request cannot be sent or the broker answers with
/// a status other than `200 OK` or `201 Created`.
pub async fn send_failure_event(
    config: &Settings,
    client: &Client,
    agent: &Agent,
    deployment_object_id: Uuid,
    error_message: String,
) -> Result<(), Box<dyn std::error::Error>> {
    let url = format!(
        "{}/api/v1/agents/{}/events",
        config.agent.broker_url, agent.id
    );
    debug!(
        "Sending failure event for deployment {} to {}",
        deployment_object_id, url
    );

    let failure_event = NewAgentEvent {
        agent_id: agent.id,
        deployment_object_id,
        event_type: "DEPLOY".to_string(),
        status: "FAILURE".to_string(),
        message: Some(error_message),
    };

    let request = client
        .post(&url)
        .header("Authorization", format!("Bearer {}", config.agent.pak))
        .json(&failure_event);
    let response = match request.send().await {
        Ok(response) => response,
        Err(e) => {
            error!(
                "Failed to send failure event for deployment {}: {}",
                deployment_object_id, e
            );
            return Err(e.into());
        }
    };

    let status = response.status();
    let accepted = matches!(status, StatusCode::OK | StatusCode::CREATED);
    if !accepted {
        // Rejected by the broker: pass on its response body (best effort).
        let error_body = response.text().await.unwrap_or_default();
        error!(
            "Failed to send failure event. Status {}: {}",
            status, error_body
        );
        return Err(format!(
            "Failed to send failure event. Status: {}, Body: {}",
            status, error_body
        )
        .into());
    }

    info!(
        "Successfully reported deployment failure for object {}",
        deployment_object_id
    );
    Ok(())
}
}
brokkr-agent::broker::send_heartbeat
pub
#![allow(unused)]
fn main() {
async fn send_heartbeat (config : & Settings , client : & Client , agent : & Agent ,) -> Result < () , Box < dyn std :: error :: Error > >
}
Sends a heartbeat event to the broker for the given agent.
Parameters:
| Name | Type | Description |
|---|---|---|
| config | `&Settings` | Application settings containing broker configuration |
| client | `&Client` | HTTP client for making requests to the broker |
| agent | `&Agent` | Agent details |
Returns:
Result<(), Box<dyn std::error::Error>>- Success or error with message
Source
#![allow(unused)]
fn main() {
/// Sends a heartbeat to the broker for the given agent.
///
/// Issues a body-less `POST {broker_url}/api/v1/agents/{agent.id}/heartbeat`,
/// authenticated with the agent's PAK as a bearer token. On success the
/// `heartbeat_sent_total` counter is incremented and
/// `last_successful_poll_timestamp` is set to the current Unix time in seconds.
///
/// # Parameters
///
/// * `config` - Application settings; `agent.broker_url` and `agent.pak` are read.
/// * `client` - HTTP client used to make the request.
/// * `agent` - Agent the heartbeat is for; its `id` is used in the URL and its
///   `name` in log messages.
///
/// # Errors
///
/// Returns an error when the request cannot be sent, the broker answers
/// `401 Unauthorized`, or it answers with any status other than `200 OK` or
/// `204 No Content`.
pub async fn send_heartbeat(
    config: &Settings,
    client: &Client,
    agent: &Agent,
) -> Result<(), Box<dyn std::error::Error>> {
    let url = format!(
        "{}/api/v1/agents/{}/heartbeat",
        config.agent.broker_url, agent.id
    );

    let response = client
        .post(&url)
        .header("Authorization", format!("Bearer {}", config.agent.pak))
        .send()
        .await
        .map_err(|e| {
            error!("Failed to send heartbeat for agent {}: {}", agent.name, e);
            // NOTE(review): the counter is also bumped when the request could
            // not be sent, but not when the broker answers with a non-success
            // status. Confirm whether it is meant to count attempts or
            // delivered heartbeats.
            metrics::heartbeat_sent_total().inc();
            Box::new(e) as Box<dyn std::error::Error>
        })?;

    match response.status() {
        StatusCode::OK | StatusCode::NO_CONTENT => {
            trace!("Heartbeat sent successfully for agent {}", agent.name);
            metrics::heartbeat_sent_total().inc();
            metrics::last_successful_poll_timestamp().set(
                std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    // A clock set before the epoch yields 0 rather than a panic.
                    .unwrap_or_default()
                    .as_secs_f64(),
            );
            Ok(())
        }
        StatusCode::UNAUTHORIZED => {
            error!("Heartbeat unauthorized for agent {}", agent.name);
            Err("Unauthorized: Invalid agent PAK".into())
        }
        status => {
            // Include the broker's response body (best effort) to aid debugging.
            let error_body = response.text().await.unwrap_or_default();
            error!(
                "Heartbeat failed for agent {}. Status {}: {}",
                agent.name, status, error_body
            );
            Err(format!("Heartbeat failed. Status: {}, Body: {}", status, error_body).into())
        }
    }
}
}
brokkr-agent::broker::send_health_status
pub
#![allow(unused)]
fn main() {
async fn send_health_status (config : & Settings , client : & Client , agent : & Agent , health_updates : Vec < DeploymentObjectHealthUpdate > ,) -> Result < () , Box < dyn std :: error :: Error > >
}
Sends health status updates for deployment objects to the broker.
Parameters:
| Name | Type | Description |
|---|---|---|
| config | `&Settings` | Application settings containing broker configuration |
| client | `&Client` | HTTP client for making requests to the broker |
| agent | `&Agent` | Agent details |
| health_updates | `Vec<DeploymentObjectHealthUpdate>` | List of deployment object health updates |
Returns:
Result<(), Box<dyn std::error::Error>>- Success or error with message
Source
#![allow(unused)]
fn main() {
/// Pushes health status updates for deployment objects to the broker.
///
/// Does nothing (and returns `Ok`) when `health_updates` is empty. Otherwise
/// the updates are wrapped in a [`HealthStatusUpdate`] and sent with
/// `PATCH {broker_url}/api/v1/agents/{agent.id}/health-status`, authenticated
/// with the agent's PAK as a bearer token.
///
/// # Parameters
///
/// * `config` - Application settings; `agent.broker_url` and `agent.pak` are read.
/// * `client` - HTTP client used to make the request.
/// * `agent` - Agent the updates belong to; its `id` is used in the URL and its
///   `name` in log messages.
/// * `health_updates` - Health updates to send; consumed by the call.
///
/// # Errors
///
/// Returns an error when the request cannot be sent, the broker answers
/// `401 Unauthorized`, or it answers with any status other than `200 OK` or
/// `204 No Content`.
pub async fn send_health_status(
    config: &Settings,
    client: &Client,
    agent: &Agent,
    health_updates: Vec<DeploymentObjectHealthUpdate>,
) -> Result<(), Box<dyn std::error::Error>> {
    // Nothing to report: skip the round trip entirely.
    let update_count = health_updates.len();
    if update_count == 0 {
        return Ok(());
    }

    let url = format!(
        "{}/api/v1/agents/{}/health-status",
        config.agent.broker_url, agent.id
    );
    debug!(
        "Sending health status update for {} deployment objects for agent {}",
        update_count, agent.name
    );

    let payload = HealthStatusUpdate {
        deployment_objects: health_updates,
    };
    let bearer = format!("Bearer {}", config.agent.pak);
    let sent = client
        .patch(&url)
        .header("Authorization", bearer)
        .json(&payload)
        .send()
        .await;
    let response = match sent {
        Ok(response) => response,
        Err(e) => {
            error!(
                "Failed to send health status for agent {}: {}",
                agent.name, e
            );
            return Err(e.into());
        }
    };

    let status = response.status();
    if status == StatusCode::OK || status == StatusCode::NO_CONTENT {
        debug!(
            "Successfully sent health status for {} deployment objects",
            update_count
        );
        return Ok(());
    }
    if status == StatusCode::UNAUTHORIZED {
        error!("Health status update unauthorized for agent {}", agent.name);
        return Err("Unauthorized: Invalid agent PAK".into());
    }

    // Any other status: surface whatever the broker sent back (best effort).
    let error_body = response.text().await.unwrap_or_default();
    error!(
        "Health status update failed for agent {}. Status {}: {}",
        agent.name, status, error_body
    );
    Err(format!(
        "Health status update failed. Status: {}, Body: {}",
        status, error_body
    )
    .into())
}
}
brokkr-agent::broker::fetch_pending_diagnostics
pub
#![allow(unused)]
fn main() {
async fn fetch_pending_diagnostics (config : & Settings , client : & Client , agent : & Agent ,) -> Result < Vec < DiagnosticRequest > , Box < dyn std :: error :: Error > >
}
Fetches pending diagnostic requests for the agent.
Parameters:
| Name | Type | Description |
|---|---|---|
| config | `&Settings` | Application settings containing broker configuration |
| client | `&Client` | HTTP client for making requests to the broker |
| agent | `&Agent` | Agent details |
Returns:
Result<Vec<DiagnosticRequest>, Box<dyn std::error::Error>>- Pending diagnostic requests
Source
#![allow(unused)]
fn main() {
/// Retrieves the diagnostic requests that are pending for the agent.
///
/// Sends `GET {broker_url}/api/v1/agents/{agent.id}/diagnostics/pending`,
/// authenticated with the agent's PAK as a bearer token, and deserializes the
/// response body into a list of [`DiagnosticRequest`]s.
///
/// # Parameters
///
/// * `config` - Application settings; `agent.broker_url` and `agent.pak` are read.
/// * `client` - HTTP client used to make the request.
/// * `agent` - Agent whose pending requests are fetched.
///
/// # Errors
///
/// Returns an error when the request cannot be sent, the broker answers with a
/// status other than `200 OK`, or the response body cannot be deserialized.
pub async fn fetch_pending_diagnostics(
    config: &Settings,
    client: &Client,
    agent: &Agent,
) -> Result<Vec<DiagnosticRequest>, Box<dyn std::error::Error>> {
    let url = format!(
        "{}/api/v1/agents/{}/diagnostics/pending",
        config.agent.broker_url, agent.id
    );
    debug!("Fetching pending diagnostics from {}", url);

    let bearer = format!("Bearer {}", config.agent.pak);
    let response = match client.get(&url).header("Authorization", bearer).send().await {
        Ok(response) => response,
        Err(e) => {
            error!("Failed to fetch pending diagnostics: {}", e);
            return Err(e.into());
        }
    };

    // Handle the failure case first so the happy path reads straight through.
    let status = response.status();
    if status != StatusCode::OK {
        let error_body = response.text().await.unwrap_or_default();
        error!(
            "Failed to fetch pending diagnostics. Status {}: {}",
            status, error_body
        );
        return Err(format!(
            "Failed to fetch pending diagnostics. Status: {}, Body: {}",
            status, error_body
        )
        .into());
    }

    let requests = match response.json::<Vec<DiagnosticRequest>>().await {
        Ok(parsed) => parsed,
        Err(e) => {
            error!("Failed to deserialize diagnostic requests: {}", e);
            return Err(e.into());
        }
    };

    // Stay quiet when there is nothing pending.
    let pending = requests.len();
    if pending > 0 {
        debug!(
            "Found {} pending diagnostic requests for agent {}",
            pending, agent.name
        );
    }
    Ok(requests)
}
}
brokkr-agent::broker::claim_diagnostic_request
pub
#![allow(unused)]
fn main() {
async fn claim_diagnostic_request (config : & Settings , client : & Client , request_id : Uuid ,) -> Result < DiagnosticRequest , Box < dyn std :: error :: Error > >
}
Claims a diagnostic request for processing.
Parameters:
| Name | Type | Description |
|---|---|---|
| config | `&Settings` | Application settings containing broker configuration |
| client | `&Client` | HTTP client for making requests to the broker |
| request_id | `Uuid` | ID of the diagnostic request to claim |
Returns:
Result<DiagnosticRequest, Box<dyn std::error::Error>>- The claimed request
Source
#![allow(unused)]
fn main() {
/// Claims a diagnostic request so that it can be processed.
///
/// Issues a body-less `POST {broker_url}/api/v1/diagnostics/{request_id}/claim`,
/// authenticated with the agent's PAK as a bearer token, and deserializes the
/// response body into the claimed [`DiagnosticRequest`].
///
/// # Parameters
///
/// * `config` - Application settings; `agent.broker_url` and `agent.pak` are read.
/// * `client` - HTTP client used to make the request.
/// * `request_id` - ID of the diagnostic request to claim.
///
/// # Errors
///
/// Returns an error when the request cannot be sent, the broker answers
/// `409 Conflict` (request already claimed or completed), the broker answers
/// with any other non-`200` status, or the body of a `200` response cannot be
/// deserialized.
pub async fn claim_diagnostic_request(
    config: &Settings,
    client: &Client,
    request_id: Uuid,
) -> Result<DiagnosticRequest, Box<dyn std::error::Error>> {
    let url = format!(
        "{}/api/v1/diagnostics/{}/claim",
        config.agent.broker_url, request_id
    );
    debug!("Claiming diagnostic request {}", request_id);

    let bearer = format!("Bearer {}", config.agent.pak);
    let response = match client.post(&url).header("Authorization", bearer).send().await {
        Ok(response) => response,
        Err(e) => {
            error!("Failed to claim diagnostic request {}: {}", request_id, e);
            return Err(e.into());
        }
    };

    let status = response.status();
    if status == StatusCode::OK {
        let claimed = match response.json::<DiagnosticRequest>().await {
            Ok(claimed) => claimed,
            Err(e) => {
                error!("Failed to deserialize claimed diagnostic request: {}", e);
                return Err(e.into());
            }
        };
        info!("Successfully claimed diagnostic request {}", request_id);
        return Ok(claimed);
    }

    if status == StatusCode::CONFLICT {
        // Logged at warn level only, unlike the other failure paths.
        warn!(
            "Diagnostic request {} already claimed or completed",
            request_id
        );
        return Err(format!(
            "Diagnostic request {} already claimed or completed",
            request_id
        )
        .into());
    }

    // Any other status: surface whatever the broker sent back (best effort).
    let error_body = response.text().await.unwrap_or_default();
    error!(
        "Failed to claim diagnostic request {}. Status {}: {}",
        request_id, status, error_body
    );
    Err(format!(
        "Failed to claim diagnostic request. Status: {}, Body: {}",
        status, error_body
    )
    .into())
}
}
brokkr-agent::broker::submit_diagnostic_result
pub
#![allow(unused)]
fn main() {
async fn submit_diagnostic_result (config : & Settings , client : & Client , request_id : Uuid , result : SubmitDiagnosticResult ,) -> Result < () , Box < dyn std :: error :: Error > >
}
Submits diagnostic results for a request.
Parameters:
| Name | Type | Description |
|---|---|---|
| config | `&Settings` | Application settings containing broker configuration |
| client | `&Client` | HTTP client for making requests to the broker |
| request_id | `Uuid` | ID of the diagnostic request |
| result | `SubmitDiagnosticResult` | The diagnostic result to submit |
Returns:
Result<(), Box<dyn std::error::Error>>- Success or error
Source
#![allow(unused)]
fn main() {
/// Submits the result of a diagnostic request to the broker.
///
/// Posts `result` as JSON to
/// `{broker_url}/api/v1/diagnostics/{request_id}/result`, authenticated with
/// the agent's PAK as a bearer token.
///
/// # Parameters
///
/// * `config` - Application settings; `agent.broker_url` and `agent.pak` are read.
/// * `client` - HTTP client used to make the request.
/// * `request_id` - ID of the diagnostic request the result belongs to.
/// * `result` - The diagnostic result to submit.
///
/// # Errors
///
/// Returns an error when the request cannot be sent or the broker answers with
/// a status other than `200 OK` or `201 Created`.
pub async fn submit_diagnostic_result(
    config: &Settings,
    client: &Client,
    request_id: Uuid,
    result: SubmitDiagnosticResult,
) -> Result<(), Box<dyn std::error::Error>> {
    let url = format!(
        "{}/api/v1/diagnostics/{}/result",
        config.agent.broker_url, request_id
    );
    debug!("Submitting diagnostic result for request {}", request_id);

    let request = client
        .post(&url)
        .header("Authorization", format!("Bearer {}", config.agent.pak))
        .json(&result);
    let response = match request.send().await {
        Ok(response) => response,
        Err(e) => {
            error!(
                "Failed to submit diagnostic result for request {}: {}",
                request_id, e
            );
            return Err(e.into());
        }
    };

    let status = response.status();
    let accepted = matches!(status, StatusCode::OK | StatusCode::CREATED);
    if !accepted {
        // Rejected by the broker: pass on its response body (best effort).
        let error_body = response.text().await.unwrap_or_default();
        error!(
            "Failed to submit diagnostic result for request {}. Status {}: {}",
            request_id, status, error_body
        );
        return Err(format!(
            "Failed to submit diagnostic result. Status: {}, Body: {}",
            status, error_body
        )
        .into());
    }

    info!(
        "Successfully submitted diagnostic result for request {}",
        request_id
    );
    Ok(())
}
}