brokkr-agent::broker Rust
Broker communication module for agent-broker interaction.
All v1 API calls go through [brokkr_client::BrokkrClient] (T-D1
migration). The only remaining reqwest::Client usage is in
[wait_for_broker_ready] for the /readyz health probe, which lives
outside the v1 surface and is therefore not part of the generated SDK.
Public function signatures continue to exchange brokkr-models types
(Agent, DeploymentObject, …) so callers (cli/commands.rs) didn’t
have to change. Inside each function we JSON-round-trip between the
SDK’s generated types and the workspace’s brokkr-models types — the
wire form is byte-identical and the conversion is cheap on the
frequencies we operate at (seconds-scale).
Functions
brokkr-agent::broker::status_u16
private
#![allow(unused)]
fn main() {
fn status_u16 (err : & BrokkrError) -> Option < u16 >
}
HTTP status helper. The agent and brokkr-client link different reqwest majors (0.11 vs 0.13), so we never compare StatusCode values directly — always go through as_u16().
Source
#![allow(unused)]
fn main() {
fn status_u16(err: &BrokkrError) -> Option<u16> {
err.status().map(|s| s.as_u16())
}
}
brokkr-agent::broker::convert
private
#![allow(unused)]
fn main() {
fn convert < From : Serialize , To : DeserializeOwned > (value : From) -> Result < To , serde_json :: Error >
}
JSON-round-trip between two serde-compatible types. Used to bridge the SDK’s brokkr_client::types::* and the workspace’s brokkr_models types, which have identical wire formats but distinct Rust identities.
Source
#![allow(unused)]
fn main() {
fn convert<From: Serialize, To: DeserializeOwned>(value: From) -> Result<To, serde_json::Error> {
let v = serde_json::to_value(value)?;
serde_json::from_value(v)
}
}
brokkr-agent::broker::boxed
private
#![allow(unused)]
fn main() {
fn boxed (prefix : & str , err : BrokkrError) -> Box < dyn std :: error :: Error >
}
Map a BrokkrError into the agent’s historical Box<dyn Error> shape with a stable prefix. Status-aware mapping happens at the call site.
Source
#![allow(unused)]
fn main() {
fn boxed(prefix: &str, err: BrokkrError) -> Box<dyn std::error::Error> {
let msg = match status_u16(&err) {
Some(s) => format!("{prefix}. Status: {s}, Error: {err}"),
None => format!("{prefix}: {err}"),
};
msg.into()
}
}
brokkr-agent::broker::wait_for_broker_ready
pub
#![allow(unused)]
fn main() {
async fn wait_for_broker_ready (config : & Settings)
}
Waits for the broker service to become ready.
Not migrated to the SDK: /readyz is outside the v1 spec surface.
Source
#![allow(unused)]
fn main() {
pub async fn wait_for_broker_ready(config: &Settings) {
let client = reqwest::Client::new();
let readyz_url = format!("{}/readyz", config.agent.broker_url);
for attempt in 1..=config.agent.max_retries {
match client.get(&readyz_url).send().await {
Ok(response) => {
if response.status().is_success() {
info!("Successfully connected to broker at {}", readyz_url);
return;
}
warn!(
"Broker at {} returned non-success status: {}",
readyz_url,
response.status()
);
}
Err(e) => {
warn!(
"Failed to connect to broker at {} (attempt {}/{}): {:?}",
readyz_url, attempt, config.agent.max_retries, e
);
}
}
if attempt < config.agent.max_retries {
info!(
"Waiting for broker to be ready at {} (attempt {}/{})",
readyz_url, attempt, config.agent.max_retries
);
sleep(Duration::from_secs(1)).await;
}
}
error!(
"Failed to connect to broker at {} after {} attempts. Exiting.",
readyz_url, config.agent.max_retries
);
std::process::exit(1);
}
}
brokkr-agent::broker::verify_agent_pak
pub
#![allow(unused)]
fn main() {
async fn verify_agent_pak (config : & Settings) -> Result < () , Box < dyn std :: error :: Error > >
}
Verifies the agent’s Personal Access Key (PAK) with the broker.
Source
#![allow(unused)]
fn main() {
pub async fn verify_agent_pak(config: &Settings) -> Result<(), Box<dyn std::error::Error>> {
debug!("Verifying agent PAK at {}", config.agent.broker_url);
let sdk = crate::broker_sdk::build_client(config)
.map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
match sdk.api().verify_pak().send().await {
Ok(_) => {
info!("Successfully verified agent PAK");
Ok(())
}
Err(raw) => {
let wrapped = BrokkrError::from(raw);
if status_u16(&wrapped) == Some(401) {
error!("Agent PAK verification failed: unauthorized");
Err("Invalid agent PAK".into())
} else {
error!("PAK verification failed: {}", wrapped);
Err(boxed("PAK verification failed", wrapped))
}
}
}
}
}
brokkr-agent::broker::fetch_agent_details
pub
#![allow(unused)]
fn main() {
async fn fetch_agent_details (config : & Settings , client : & BrokkrClient ,) -> Result < Agent , Box < dyn std :: error :: Error > >
}
Fetches the details of the agent from the broker.
Source
#![allow(unused)]
fn main() {
pub async fn fetch_agent_details(
config: &Settings,
client: &BrokkrClient,
) -> Result<Agent, Box<dyn std::error::Error>> {
debug!(
"Fetching agent details for name={} cluster={}",
config.agent.agent_name, config.agent.cluster_name
);
let result = client
.api()
.search_agent()
.name(&config.agent.agent_name)
.cluster_name(&config.agent.cluster_name)
.send()
.await;
match result {
Ok(rv) => {
let agent: Agent = convert(rv.into_inner()).map_err(|e| {
error!("Failed to convert agent details: {}", e);
Box::new(e) as Box<dyn std::error::Error>
})?;
info!(
"Successfully fetched details for agent {} in cluster {}",
agent.name, agent.cluster_name
);
Ok(agent)
}
Err(raw) => {
let wrapped = BrokkrError::from(raw);
if status_u16(&wrapped) == Some(404) {
error!(
"Agent not found: name={}, cluster={}",
config.agent.agent_name, config.agent.cluster_name
);
Err("Agent not found".into())
} else {
error!("Failed to fetch agent details: {}", wrapped);
Err(boxed("Failed to fetch agent details", wrapped))
}
}
}
}
}
brokkr-agent::broker::fetch_and_process_deployment_objects
pub
#![allow(unused)]
fn main() {
async fn fetch_and_process_deployment_objects (_config : & Settings , client : & BrokkrClient , agent : & Agent ,) -> Result < Vec < DeploymentObject > , Box < dyn std :: error :: Error > >
}
Fetches deployment objects to apply from the broker’s target-state view.
Source
#![allow(unused)]
fn main() {
pub async fn fetch_and_process_deployment_objects(
_config: &Settings,
client: &BrokkrClient,
agent: &Agent,
) -> Result<Vec<DeploymentObject>, Box<dyn std::error::Error>> {
debug!("Fetching target-state deployment objects for agent {}", agent.name);
let start = Instant::now();
let result = client.api().get_target_state().id(agent.id).send().await;
let duration = start.elapsed().as_secs_f64();
match result {
Ok(rv) => {
let objects: Vec<DeploymentObject> = convert(rv.into_inner()).map_err(|e| {
error!("Failed to convert deployment objects: {}", e);
metrics::poll_requests_total()
.with_label_values(&["error"])
.inc();
metrics::poll_duration_seconds()
.with_label_values(&[])
.observe(duration);
Box::new(e) as Box<dyn std::error::Error>
})?;
info!(
"Successfully fetched {} deployment objects for agent {}",
objects.len(),
agent.name
);
metrics::poll_requests_total()
.with_label_values(&["success"])
.inc();
metrics::poll_duration_seconds()
.with_label_values(&[])
.observe(duration);
Ok(objects)
}
Err(raw) => {
let wrapped = BrokkrError::from(raw);
error!("Failed to fetch deployment objects: {}", wrapped);
metrics::poll_requests_total()
.with_label_values(&["error"])
.inc();
metrics::poll_duration_seconds()
.with_label_values(&[])
.observe(duration);
Err(boxed("Failed to fetch deployment objects", wrapped))
}
}
}
}
brokkr-agent::broker::send_success_event
pub
#![allow(unused)]
fn main() {
async fn send_success_event (_config : & Settings , client : & BrokkrClient , agent : & Agent , deployment_object_id : Uuid , message : Option < String > ,) -> Result < () , Box < dyn std :: error :: Error > >
}
Sends a success event to the broker for the given deployment object.
Source
#![allow(unused)]
fn main() {
pub async fn send_success_event(
_config: &Settings,
client: &BrokkrClient,
agent: &Agent,
deployment_object_id: Uuid,
message: Option<String>,
) -> Result<(), Box<dyn std::error::Error>> {
debug!(
"Sending success event for deployment {} for agent {}",
deployment_object_id, agent.name
);
let event = NewAgentEvent {
agent_id: agent.id,
deployment_object_id,
event_type: "DEPLOY".to_string(),
status: "SUCCESS".to_string(),
message,
};
let sdk_event: brokkr_client::types::NewAgentEvent = convert(event).map_err(|e| {
error!("Failed to convert NewAgentEvent: {}", e);
Box::new(e) as Box<dyn std::error::Error>
})?;
match client
.api()
.create_event()
.id(agent.id)
.body(sdk_event)
.send()
.await
{
Ok(_) => {
info!(
"Successfully reported deployment success for object {}",
deployment_object_id
);
Ok(())
}
Err(raw) => {
let wrapped = BrokkrError::from(raw);
error!("Failed to send success event: {}", wrapped);
Err(boxed("Failed to send success event", wrapped))
}
}
}
}
brokkr-agent::broker::send_failure_event
pub
#![allow(unused)]
fn main() {
async fn send_failure_event (_config : & Settings , client : & BrokkrClient , agent : & Agent , deployment_object_id : Uuid , error_message : String ,) -> Result < () , Box < dyn std :: error :: Error > >
}
Sends a failure event to the broker for the given deployment object.
Source
#![allow(unused)]
fn main() {
pub async fn send_failure_event(
_config: &Settings,
client: &BrokkrClient,
agent: &Agent,
deployment_object_id: Uuid,
error_message: String,
) -> Result<(), Box<dyn std::error::Error>> {
debug!(
"Sending failure event for deployment {} for agent {}",
deployment_object_id, agent.name
);
let event = NewAgentEvent {
agent_id: agent.id,
deployment_object_id,
event_type: "DEPLOY".to_string(),
status: "FAILURE".to_string(),
message: Some(error_message),
};
let sdk_event: brokkr_client::types::NewAgentEvent = convert(event).map_err(|e| {
error!("Failed to convert NewAgentEvent: {}", e);
Box::new(e) as Box<dyn std::error::Error>
})?;
match client
.api()
.create_event()
.id(agent.id)
.body(sdk_event)
.send()
.await
{
Ok(_) => {
info!(
"Successfully reported deployment failure for object {}",
deployment_object_id
);
Ok(())
}
Err(raw) => {
let wrapped = BrokkrError::from(raw);
error!(
"Failed to send failure event for deployment {}: {}",
deployment_object_id, wrapped
);
Err(boxed("Failed to send failure event", wrapped))
}
}
}
}
brokkr-agent::broker::send_heartbeat
pub
#![allow(unused)]
fn main() {
async fn send_heartbeat (_config : & Settings , client : & BrokkrClient , agent : & Agent ,) -> Result < () , Box < dyn std :: error :: Error > >
}
Sends a heartbeat to the broker for the given agent.
Source
#![allow(unused)]
fn main() {
pub async fn send_heartbeat(
_config: &Settings,
client: &BrokkrClient,
agent: &Agent,
) -> Result<(), Box<dyn std::error::Error>> {
match client.api().record_heartbeat().id(agent.id).send().await {
Ok(_) => {
trace!("Heartbeat sent successfully for agent {}", agent.name);
metrics::heartbeat_sent_total().inc();
metrics::last_successful_poll_timestamp().set(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs_f64(),
);
Ok(())
}
Err(raw) => {
let wrapped = BrokkrError::from(raw);
metrics::heartbeat_sent_total().inc();
if status_u16(&wrapped) == Some(401) {
error!("Heartbeat unauthorized for agent {}", agent.name);
Err("Unauthorized: Invalid agent PAK".into())
} else {
error!(
"Heartbeat failed for agent {}: {}",
agent.name, wrapped
);
Err(boxed("Heartbeat failed", wrapped))
}
}
}
}
}
brokkr-agent::broker::send_health_status
pub
#![allow(unused)]
fn main() {
async fn send_health_status (_config : & Settings , client : & BrokkrClient , agent : & Agent , health_updates : Vec < DeploymentObjectHealthUpdate > ,) -> Result < () , Box < dyn std :: error :: Error > >
}
Sends health status updates for deployment objects to the broker.
Source
#![allow(unused)]
fn main() {
pub async fn send_health_status(
_config: &Settings,
client: &BrokkrClient,
agent: &Agent,
health_updates: Vec<DeploymentObjectHealthUpdate>,
) -> Result<(), Box<dyn std::error::Error>> {
if health_updates.is_empty() {
return Ok(());
}
debug!(
"Sending health status update for {} deployment objects for agent {}",
health_updates.len(),
agent.name
);
let update = HealthStatusUpdate {
deployment_objects: health_updates,
};
let count = update.deployment_objects.len();
let sdk_update: brokkr_client::types::HealthStatusUpdate = convert(update).map_err(|e| {
error!("Failed to convert HealthStatusUpdate: {}", e);
Box::new(e) as Box<dyn std::error::Error>
})?;
match client
.api()
.update_health_status()
.id(agent.id)
.body(sdk_update)
.send()
.await
{
Ok(_) => {
debug!(
"Successfully sent health status for {} deployment objects",
count
);
Ok(())
}
Err(raw) => {
let wrapped = BrokkrError::from(raw);
if status_u16(&wrapped) == Some(401) {
error!("Health status update unauthorized for agent {}", agent.name);
Err("Unauthorized: Invalid agent PAK".into())
} else {
error!(
"Health status update failed for agent {}: {}",
agent.name, wrapped
);
Err(boxed("Health status update failed", wrapped))
}
}
}
}
}
brokkr-agent::broker::fetch_pending_diagnostics
pub
#![allow(unused)]
fn main() {
async fn fetch_pending_diagnostics (_config : & Settings , client : & BrokkrClient , agent : & Agent ,) -> Result < Vec < DiagnosticRequest > , Box < dyn std :: error :: Error > >
}
Fetches pending diagnostic requests for the agent.
Source
#![allow(unused)]
fn main() {
pub async fn fetch_pending_diagnostics(
_config: &Settings,
client: &BrokkrClient,
agent: &Agent,
) -> Result<Vec<DiagnosticRequest>, Box<dyn std::error::Error>> {
debug!("Fetching pending diagnostics for agent {}", agent.name);
match client
.api()
.get_pending_diagnostics()
.id(agent.id)
.send()
.await
{
Ok(rv) => {
let requests: Vec<DiagnosticRequest> = convert(rv.into_inner()).map_err(|e| {
error!("Failed to convert diagnostic requests: {}", e);
Box::new(e) as Box<dyn std::error::Error>
})?;
if !requests.is_empty() {
debug!(
"Found {} pending diagnostic requests for agent {}",
requests.len(),
agent.name
);
}
Ok(requests)
}
Err(raw) => {
let wrapped = BrokkrError::from(raw);
error!("Failed to fetch pending diagnostics: {}", wrapped);
Err(boxed("Failed to fetch pending diagnostics", wrapped))
}
}
}
}
brokkr-agent::broker::claim_diagnostic_request
pub
#![allow(unused)]
fn main() {
async fn claim_diagnostic_request (_config : & Settings , client : & BrokkrClient , request_id : Uuid ,) -> Result < DiagnosticRequest , Box < dyn std :: error :: Error > >
}
Claims a diagnostic request for processing.
Source
#![allow(unused)]
fn main() {
pub async fn claim_diagnostic_request(
_config: &Settings,
client: &BrokkrClient,
request_id: Uuid,
) -> Result<DiagnosticRequest, Box<dyn std::error::Error>> {
debug!("Claiming diagnostic request {}", request_id);
match client.api().claim_diagnostic().id(request_id).send().await {
Ok(rv) => {
let request: DiagnosticRequest = convert(rv.into_inner()).map_err(|e| {
error!("Failed to convert claimed diagnostic request: {}", e);
Box::new(e) as Box<dyn std::error::Error>
})?;
info!("Successfully claimed diagnostic request {}", request_id);
Ok(request)
}
Err(raw) => {
let wrapped = BrokkrError::from(raw);
if status_u16(&wrapped) == Some(409) {
warn!(
"Diagnostic request {} already claimed or completed",
request_id
);
Err(format!(
"Diagnostic request {request_id} already claimed or completed"
)
.into())
} else {
error!(
"Failed to claim diagnostic request {}: {}",
request_id, wrapped
);
Err(boxed("Failed to claim diagnostic request", wrapped))
}
}
}
}
}
brokkr-agent::broker::submit_diagnostic_result
pub
#![allow(unused)]
fn main() {
async fn submit_diagnostic_result (_config : & Settings , client : & BrokkrClient , request_id : Uuid , result : SubmitDiagnosticResult ,) -> Result < () , Box < dyn std :: error :: Error > >
}
Submits diagnostic results for a request.
Source
#![allow(unused)]
fn main() {
pub async fn submit_diagnostic_result(
_config: &Settings,
client: &BrokkrClient,
request_id: Uuid,
result: SubmitDiagnosticResult,
) -> Result<(), Box<dyn std::error::Error>> {
debug!("Submitting diagnostic result for request {}", request_id);
let sdk_result: brokkr_client::types::SubmitDiagnosticResult =
convert(result).map_err(|e| {
error!("Failed to convert SubmitDiagnosticResult: {}", e);
Box::new(e) as Box<dyn std::error::Error>
})?;
match client
.api()
.submit_diagnostic_result()
.id(request_id)
.body(sdk_result)
.send()
.await
{
Ok(_) => {
info!(
"Successfully submitted diagnostic result for request {}",
request_id
);
Ok(())
}
Err(raw) => {
let wrapped = BrokkrError::from(raw);
error!(
"Failed to submit diagnostic result for request {}: {}",
request_id, wrapped
);
Err(boxed("Failed to submit diagnostic result", wrapped))
}
}
}
}