Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

brokkr-agent::work_orders Rust

Functions

brokkr-agent::work_orders::is_error_retryable

private

#![allow(unused)]
fn main() {
fn is_error_retryable (error : & dyn std :: error :: Error) -> bool
}

Determines if an error is retryable by inspecting the error message.

Non-retryable errors include:

  • 404 NotFound (resource doesn’t exist)
  • 403 Forbidden (permission denied)
  • 400 BadRequest (malformed request)
  • Validation errors Retryable errors include:
  • 429 TooManyRequests
  • 500 InternalServerError
  • 503 ServiceUnavailable
  • 504 GatewayTimeout
  • Network/connectivity errors
Source
#![allow(unused)]
fn main() {
fn is_error_retryable(error: &dyn std::error::Error) -> bool {
    let error_str = error.to_string().to_lowercase();

    // Non-retryable patterns (permanent failures)
    let non_retryable_patterns = [
        "notfound",
        "not found",
        "forbidden",
        "unauthorized",
        "badrequest",
        "bad request",
        "invalid",
        "unprocessable",
        "conflict",
    ];

    for pattern in &non_retryable_patterns {
        if error_str.contains(pattern) {
            debug!(
                "Error classified as non-retryable (matched '{}'): {}",
                pattern, error
            );
            return false;
        }
    }

    // Retryable patterns (transient failures)
    let retryable_patterns = [
        "timeout",
        "unavailable",
        "connection",
        "network",
        "internal",
        "too many requests",
        "throttl",
    ];

    for pattern in &retryable_patterns {
        if error_str.contains(pattern) {
            debug!(
                "Error classified as retryable (matched '{}'): {}",
                pattern, error
            );
            return true;
        }
    }

    // Default to non-retryable for unknown errors
    // This prevents infinite retry loops for unhandled cases
    debug!(
        "Error classified as non-retryable (no pattern match): {}",
        error
    );
    false
}
}

brokkr-agent::work_orders::process_pending_work_orders

pub

#![allow(unused)]
fn main() {
async fn process_pending_work_orders (config : & Settings , http_client : & Client , k8s_client : & K8sClient , agent : & Agent ,) -> Result < usize , Box < dyn std :: error :: Error > >
}

Processes pending work orders for the agent.

This function:

  1. Fetches pending work orders from the broker
  2. Claims the first available work order
  3. Executes the work based on work type
  4. Reports completion to the broker

Parameters:

NameTypeDescription
config-Application settings
http_client-HTTP client for broker communication
k8s_client-Kubernetes client for resource operations
agent-Agent details

Returns:

Number of work orders processed

Source
#![allow(unused)]
fn main() {
pub async fn process_pending_work_orders(
    config: &Settings,
    http_client: &Client,
    k8s_client: &K8sClient,
    agent: &Agent,
) -> Result<usize, Box<dyn std::error::Error>> {
    // Fetch pending work orders
    let pending = broker::fetch_pending_work_orders(config, http_client, agent, None).await?;

    if pending.is_empty() {
        trace!("No pending work orders for agent {}", agent.name);
        return Ok(0);
    }

    info!(
        "Found {} pending work orders for agent {}",
        pending.len(),
        agent.name
    );

    let mut processed = 0;

    // Process one work order at a time
    // In the future, we could parallelize this based on work type
    for work_order in pending.iter().take(1) {
        match process_single_work_order(config, http_client, k8s_client, agent, work_order).await {
            Ok(_) => {
                processed += 1;
                info!(
                    "Successfully processed work order {} (type: {})",
                    work_order.id, work_order.work_type
                );
            }
            Err(e) => {
                error!(
                    "Failed to process work order {} (type: {}): {}",
                    work_order.id, work_order.work_type, e
                );
                // Continue with next work order instead of failing completely
            }
        }
    }

    Ok(processed)
}
}

brokkr-agent::work_orders::process_single_work_order

private

#![allow(unused)]
fn main() {
async fn process_single_work_order (config : & Settings , http_client : & Client , k8s_client : & K8sClient , agent : & Agent , work_order : & WorkOrder ,) -> Result < () , Box < dyn std :: error :: Error > >
}

Processes a single work order through its complete lifecycle.

Source
#![allow(unused)]
fn main() {
async fn process_single_work_order(
    config: &Settings,
    http_client: &Client,
    k8s_client: &K8sClient,
    agent: &Agent,
    work_order: &WorkOrder,
) -> Result<(), Box<dyn std::error::Error>> {
    info!(
        "Processing work order {} (type: {}, status: {})",
        work_order.id, work_order.work_type, work_order.status
    );

    // Claim the work order
    let claimed = broker::claim_work_order(config, http_client, agent, work_order.id).await?;
    info!("Successfully claimed work order {}", claimed.id);

    // Execute based on work type
    let result = match work_order.work_type.as_str() {
        "build" => execute_build_work_order(config, http_client, k8s_client, agent, &claimed).await,
        "custom" => execute_custom_work_order(k8s_client, agent, &claimed).await,
        unknown => Err(format!("Unknown work type: {}", unknown).into()),
    };

    // Report completion
    match result {
        Ok(message) => {
            broker::complete_work_order(config, http_client, work_order.id, true, message, true)
                .await?;
            info!("Work order {} completed successfully", work_order.id);
        }
        Err(e) => {
            let error_msg = e.to_string();
            let retryable = is_error_retryable(e.as_ref());
            if retryable {
                warn!(
                    "Work order {} failed with retryable error: {}",
                    work_order.id, e
                );
            } else {
                error!(
                    "Work order {} failed with non-retryable error: {}",
                    work_order.id, e
                );
            }
            broker::complete_work_order(
                config,
                http_client,
                work_order.id,
                false,
                Some(error_msg),
                retryable,
            )
            .await?;
            return Err(e);
        }
    }

    Ok(())
}
}

brokkr-agent::work_orders::execute_build_work_order

private

#![allow(unused)]
fn main() {
async fn execute_build_work_order (_config : & Settings , _http_client : & Client , k8s_client : & K8sClient , agent : & Agent , work_order : & WorkOrder ,) -> Result < Option < String > , Box < dyn std :: error :: Error > >
}

Executes a build work order using Shipwright.

Source
#![allow(unused)]
fn main() {
async fn execute_build_work_order(
    _config: &Settings,
    _http_client: &Client,
    k8s_client: &K8sClient,
    agent: &Agent,
    work_order: &WorkOrder,
) -> Result<Option<String>, Box<dyn std::error::Error>> {
    info!(
        "Executing build work order {} for agent {}",
        work_order.id, agent.name
    );

    // Parse the YAML content to extract Build and WorkOrder resources
    let yaml_docs = crate::utils::multidoc_deserialize(&work_order.yaml_content)?;

    if yaml_docs.is_empty() {
        return Err("Work order YAML content is empty".into());
    }

    // Apply all K8s resources from the YAML
    // The YAML should contain Shipwright Build + brokkr WorkOrder CRD
    for _doc in &yaml_docs {
        debug!("Applying K8s resource from work order YAML");
        // We'll implement the actual application in the build module
    }

    // Execute the build using the build handler
    let result = build::execute_build(
        k8s_client,
        &work_order.yaml_content,
        &work_order.id.to_string(),
    )
    .await?;

    Ok(result)
}
}

brokkr-agent::work_orders::execute_custom_work_order

private

#![allow(unused)]
fn main() {
async fn execute_custom_work_order (k8s_client : & K8sClient , agent : & Agent , work_order : & WorkOrder ,) -> Result < Option < String > , Box < dyn std :: error :: Error > >
}

Executes a custom work order by applying YAML resources to the cluster.

Source
#![allow(unused)]
fn main() {
async fn execute_custom_work_order(
    k8s_client: &K8sClient,
    agent: &Agent,
    work_order: &WorkOrder,
) -> Result<Option<String>, Box<dyn std::error::Error>> {
    use kube::api::{DynamicObject, PatchParams};

    info!(
        "Executing custom work order {} for agent {}",
        work_order.id, agent.name
    );

    // Parse the YAML content
    let yaml_docs = crate::utils::multidoc_deserialize(&work_order.yaml_content)?;

    if yaml_docs.is_empty() {
        return Err("Work order YAML content is empty".into());
    }

    // Convert YAML docs to DynamicObjects
    let mut objects: Vec<DynamicObject> = Vec::new();
    for yaml_doc in &yaml_docs {
        // Skip null documents
        if yaml_doc.is_null() {
            continue;
        }

        let object: DynamicObject = serde_yaml::from_value(yaml_doc.clone())?;
        let gvk = object
            .types
            .as_ref()
            .ok_or("Object missing type metadata")?;
        debug!(
            "Parsed {} {}/{}",
            gvk.kind,
            object.metadata.namespace.as_deref().unwrap_or("cluster"),
            object.metadata.name.as_deref().unwrap_or("unnamed")
        );
        objects.push(object);
    }

    if objects.is_empty() {
        return Err("No valid Kubernetes objects found in YAML".into());
    }

    info!(
        "Applying {} resource(s) from custom work order {}",
        objects.len(),
        work_order.id
    );

    // Apply all resources using server-side apply
    let patch_params = PatchParams::apply("brokkr-agent").force();
    crate::k8s::api::apply_k8s_objects(&objects, k8s_client.clone(), patch_params).await?;

    Ok(Some(format!(
        "Successfully applied {} resource(s)",
        objects.len()
    )))
}
}