Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

brokkr-agent::k8s::api Rust

Structs

brokkr-agent::k8s::api::RetryConfig

private

Retry configuration for Kubernetes operations

Fields

NameTypeDescription
max_elapsed_timeDuration
initial_intervalDuration
max_intervalDuration
multiplierf64

Functions

brokkr-agent::k8s::api::is_retryable_error

private

#![allow(unused)]
fn main() {
fn is_retryable_error (error : & KubeError) -> bool
}

Determines if a Kubernetes error is retryable

Source
#![allow(unused)]
fn main() {
fn is_retryable_error(error: &KubeError) -> bool {
    match error {
        KubeError::Api(api_err) => {
            matches!(api_err.code, 429 | 500 | 503 | 504)
                || matches!(
                    api_err.reason.as_str(),
                    "ServiceUnavailable" | "InternalError" | "Timeout"
                )
        }
        _ => false,
    }
}
}

brokkr-agent::k8s::api::with_retries

private

#![allow(unused)]
fn main() {
async fn with_retries < F , Fut , T > (operation : F , config : RetryConfig ,) -> Result < T , Box < dyn std :: error :: Error > > where F : Fn () -> Fut , Fut : std :: future :: Future < Output = Result < T , KubeError > > ,
}

Executes a Kubernetes operation with retries

Source
#![allow(unused)]
fn main() {
async fn with_retries<F, Fut, T>(
    operation: F,
    config: RetryConfig,
) -> Result<T, Box<dyn std::error::Error>>
where
    F: Fn() -> Fut,
    Fut: std::future::Future<Output = Result<T, KubeError>>,
{
    let backoff = ExponentialBackoffBuilder::new()
        .with_initial_interval(config.initial_interval)
        .with_max_interval(config.max_interval)
        .with_multiplier(config.multiplier)
        .with_max_elapsed_time(Some(config.max_elapsed_time))
        .build();

    let operation_with_backoff = || async {
        match operation().await {
            Ok(value) => Ok(value),
            Err(error) => {
                if is_retryable_error(&error) {
                    warn!("Retryable error encountered: {}", error);
                    Err(backoff::Error::Transient {
                        err: error,
                        retry_after: None,
                    })
                } else {
                    error!("Non-retryable error encountered: {}", error);
                    Err(backoff::Error::Permanent(error))
                }
            }
        }
    };

    backoff::future::retry(backoff, operation_with_backoff)
        .await
        .map_err(|e| Box::new(e) as Box<dyn std::error::Error>)
}
}

brokkr-agent::k8s::api::apply_k8s_objects

pub

#![allow(unused)]
fn main() {
async fn apply_k8s_objects (k8s_objects : & [DynamicObject] , k8s_client : K8sClient , patch_params : PatchParams ,) -> Result < () , Box < dyn std :: error :: Error > >
}

Applies a list of Kubernetes objects to the cluster using server-side apply.

Parameters:

NameTypeDescription
k8s_objects-List of DynamicObjects to apply
discovery-Kubernetes Discovery client for API resource resolution
k8s_client-Kubernetes client for API interactions
patch_params-Parameters for the patch operation

Returns:

  • Result<(), Box<dyn std::error::Error>> - Success or error with message
Source
#![allow(unused)]
fn main() {
pub async fn apply_k8s_objects(
    k8s_objects: &[DynamicObject],
    k8s_client: K8sClient,
    patch_params: PatchParams,
) -> Result<(), Box<dyn std::error::Error>> {
    info!("Applying {} Kubernetes objects", k8s_objects.len());
    let start = Instant::now();

    let discovery = Discovery::new(k8s_client.clone())
        .run()
        .await
        .map_err(|e| {
            error!("Failed to create Kubernetes discovery client: {}", e);
            metrics::kubernetes_operations_total()
                .with_label_values(&["apply"])
                .inc();
            metrics::kubernetes_operation_duration_seconds()
                .with_label_values(&["apply"])
                .observe(start.elapsed().as_secs_f64());
            e
        })?;

    for k8s_object in k8s_objects {
        let default_namespace = &"default".to_string();
        let namespace = k8s_object
            .metadata
            .namespace
            .as_deref()
            .unwrap_or(default_namespace);

        let gvk = if let Some(tm) = &k8s_object.types {
            GroupVersionKind::try_from(tm)?
        } else {
            error!(
                "Cannot apply object without valid TypeMeta for object named '{}'",
                k8s_object.name_any()
            );
            return Err(format!(
                "Cannot apply object without valid TypeMeta for object named '{}'",
                k8s_object.name_any()
            )
            .into());
        };

        if let Some((ar, caps)) = discovery.resolve_gvk(&gvk) {
            let api = dynamic_api(ar, caps, k8s_client.clone(), Some(namespace), false);
            info!(
                "Applying {} '{}' in namespace '{}'",
                gvk.kind,
                k8s_object.name_any(),
                namespace
            );
            debug!("Object content:\n{}", serde_yaml::to_string(&k8s_object)?);

            let data = serde_json::to_value(k8s_object)?;
            let name = k8s_object.name_any();
            let name_for_error = name.clone();
            let patch_params = patch_params.clone();

            with_retries(
                move || {
                    let api = api.clone();
                    let name = name.clone();
                    let data = data.clone();
                    let patch_params = patch_params.clone();
                    async move { api.patch(&name, &patch_params, &Patch::Apply(data)).await }
                },
                RetryConfig::default(),
            )
            .await
            .map_err(|e| {
                error!(
                    "Failed to apply {} '{}' in namespace '{}': {}",
                    gvk.kind, name_for_error, namespace, e
                );
                e
            })?;

            info!(
                "Successfully applied {} '{}' in namespace '{}'",
                gvk.kind, name_for_error, namespace
            );
        } else {
            error!(
                "Failed to resolve GroupVersionKind for {} '{}' in namespace '{}'",
                gvk.kind,
                k8s_object.name_any(),
                namespace
            );
            return Err(format!(
                "Failed to resolve GroupVersionKind for {} '{}' in namespace '{}'",
                gvk.kind,
                k8s_object.name_any(),
                namespace
            )
            .into());
        }
    }

    info!(
        "Successfully applied all {} Kubernetes objects",
        k8s_objects.len()
    );

    // Record metrics for successful apply
    metrics::kubernetes_operations_total()
        .with_label_values(&["apply"])
        .inc();
    metrics::kubernetes_operation_duration_seconds()
        .with_label_values(&["apply"])
        .observe(start.elapsed().as_secs_f64());

    Ok(())
}
}

brokkr-agent::k8s::api::dynamic_api

pub

#![allow(unused)]
fn main() {
fn dynamic_api (ar : ApiResource , caps : ApiCapabilities , client : K8sClient , namespace : Option < & str > , all_namespaces : bool ,) -> Api < DynamicObject >
}

Creates a dynamic Kubernetes API client for a specific resource type

Parameters:

NameTypeDescription
ar-ApiResource describing the Kubernetes resource type
caps-Capabilities of the API (e.g., list, watch, etc.)
client-Kubernetes client instance
namespace-Optional namespace to scope the API to
all_namespaces-Whether to operate across all namespaces

Returns:

An Api instance configured for the specified resource type

Source
#![allow(unused)]
fn main() {
pub fn dynamic_api(
    ar: ApiResource,
    caps: ApiCapabilities,
    client: K8sClient,
    namespace: Option<&str>,
    all_namespaces: bool,
) -> Api<DynamicObject> {
    if caps.scope == Scope::Cluster || all_namespaces {
        Api::all_with(client, &ar)
    } else if let Some(namespace) = namespace {
        Api::namespaced_with(client, namespace, &ar)
    } else {
        Api::default_namespaced_with(client, &ar)
    }
}
}

brokkr-agent::k8s::api::get_all_objects_by_annotation

pub

#![allow(unused)]
fn main() {
async fn get_all_objects_by_annotation (k8s_client : & K8sClient , annotation_key : & str , annotation_value : & str ,) -> Result < Vec < DynamicObject > , Box < dyn std :: error :: Error > >
}

Retrieves all Kubernetes objects with a specific annotation key-value pair.

Parameters:

NameTypeDescription
k8s_client-Kubernetes client
discovery-Kubernetes Discovery client
annotation_key-Annotation key to filter by
annotation_value-Annotation value to filter by

Returns:

  • Result<Vec<DynamicObject>, Box<dyn std::error::Error>> - List of matching objects or error
Source
#![allow(unused)]
fn main() {
pub async fn get_all_objects_by_annotation(
    k8s_client: &K8sClient,
    annotation_key: &str,
    annotation_value: &str,
) -> Result<Vec<DynamicObject>, Box<dyn std::error::Error>> {
    let mut results = Vec::new();

    let discovery = Discovery::new(k8s_client.clone())
        .run()
        .await
        .expect("Failed to create discovery client");

    // Search through all API groups and resources
    for group in discovery.groups() {
        for (ar, caps) in group.recommended_resources() {
            let api: Api<DynamicObject> =
                dynamic_api(ar.clone(), caps.clone(), k8s_client.clone(), None, true);

            match api.list(&Default::default()).await {
                Ok(list) => {
                    let matching_objects = list
                        .items
                        .into_iter()
                        .filter(|obj| {
                            obj.metadata
                                .annotations
                                .as_ref()
                                .and_then(|annotations| annotations.get(annotation_key))
                                .is_some_and(|value| value == annotation_value)
                        })
                        .map(|mut obj| {
                            // Set TypeMeta directly
                            obj.types = Some(TypeMeta {
                                api_version: if ar.group.is_empty() {
                                    ar.version.clone()
                                } else {
                                    format!("{}/{}", ar.group, ar.version)
                                },
                                kind: ar.kind.clone(),
                            });
                            obj
                        });
                    results.extend(matching_objects);
                }
                Err(e) => warn!("Error listing resources for {:?}: {:?}", ar, e),
            }
        }
    }

    Ok(results)
}
}

brokkr-agent::k8s::api::delete_k8s_objects

pub

#![allow(unused)]
fn main() {
async fn delete_k8s_objects (k8s_objects : & [DynamicObject] , k8s_client : K8sClient , agent_id : & Uuid ,) -> Result < () , Box < dyn std :: error :: Error > >
}

Deletes a list of Kubernetes objects from the cluster.

Parameters:

NameTypeDescription
k8s_objects-List of DynamicObjects to delete
discovery-Kubernetes Discovery client for API resource resolution
k8s_client-Kubernetes client for API interactions

Returns:

  • Result<(), Box<dyn std::error::Error>> - Success or error with message
Source
#![allow(unused)]
fn main() {
pub async fn delete_k8s_objects(
    k8s_objects: &[DynamicObject],
    k8s_client: K8sClient,
    agent_id: &Uuid,
) -> Result<(), Box<dyn std::error::Error>> {
    info!(
        "Starting deletion of {} Kubernetes objects",
        k8s_objects.len()
    );
    let discovery = Discovery::new(k8s_client.clone())
        .run()
        .await
        .expect("Failed to create discovery client");

    for k8s_object in k8s_objects {
        // Verify ownership before attempting deletion
        if !verify_object_ownership(k8s_object, agent_id) {
            error!(
                "Cannot delete object '{}' (kind: {}) as it is not owned by agent {}",
                k8s_object.name_any(),
                k8s_object.types.as_ref().map_or("unknown", |t| &t.kind),
                agent_id
            );
            return Err(format!(
                "Cannot delete object '{}' as it is not owned by this agent",
                k8s_object.name_any()
            )
            .into());
        }

        debug!("Processing k8s object for deletion: {:?}", k8s_object);
        let default_namespace = &"default".to_string();
        let namespace = k8s_object
            .metadata
            .namespace
            .as_ref()
            .unwrap_or(default_namespace);

        let gvk = if let Some(tm) = &k8s_object.types {
            GroupVersionKind::try_from(tm)?
        } else {
            error!(
                "Cannot delete object '{}' without valid TypeMeta",
                k8s_object.name_any()
            );
            return Err(format!(
                "Cannot delete object without valid TypeMeta {:?}",
                k8s_object
            )
            .into());
        };

        if let Some((ar, caps)) = discovery.resolve_gvk(&gvk) {
            let api = dynamic_api(ar, caps, k8s_client.clone(), Some(namespace), false);
            let name = k8s_object.name_any();
            let name_for_error = name.clone();
            info!(
                "Deleting {} '{}' in namespace '{}'",
                gvk.kind, name, namespace
            );

            with_retries(
                move || {
                    let api = api.clone();
                    let name = name.clone();
                    async move { api.delete(&name, &Default::default()).await }
                },
                RetryConfig::default(),
            )
            .await
            .map_err(|e| {
                error!(
                    "Failed to delete {} '{}' in namespace '{}': {}",
                    gvk.kind, name_for_error, namespace, e
                );
                e
            })?;

            info!(
                "Successfully deleted {} '{}' in namespace '{}'",
                gvk.kind, name_for_error, namespace
            );
        }
    }

    info!(
        "Successfully deleted all {} Kubernetes objects",
        k8s_objects.len()
    );
    Ok(())
}
}

brokkr-agent::k8s::api::validate_k8s_objects

pub

#![allow(unused)]
fn main() {
async fn validate_k8s_objects (k8s_objects : & [DynamicObject] , k8s_client : K8sClient ,) -> Result < () , Box < dyn std :: error :: Error > >
}

Validates Kubernetes objects against the API server without applying them.

Parameters:

NameTypeDescription
k8s_objects-List of DynamicObjects to validate
k8s_client-Kubernetes client for API interactions

Returns:

  • Result<(), Box<dyn std::error::Error>> - Success or error with validation message
Source
#![allow(unused)]
fn main() {
pub async fn validate_k8s_objects(
    k8s_objects: &[DynamicObject],
    k8s_client: K8sClient,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut validation_errors = Vec::new();

    let discovery = Discovery::new(k8s_client.clone())
        .run()
        .await
        .expect("Failed to create discovery client");

    for k8s_object in k8s_objects {
        let default_namespace = &"default".to_string();
        let namespace = k8s_object
            .metadata
            .namespace
            .as_deref()
            .unwrap_or(default_namespace);

        let gvk = if let Some(tm) = &k8s_object.types {
            match GroupVersionKind::try_from(tm) {
                Ok(gvk) => gvk,
                Err(e) => {
                    validation_errors.push(format!(
                        "Invalid TypeMeta for object '{}': {}",
                        k8s_object.name_any(),
                        e
                    ));
                    continue;
                }
            }
        } else {
            validation_errors.push(format!(
                "Missing TypeMeta for object '{}'",
                k8s_object.name_any()
            ));
            continue;
        };

        if let Some((ar, caps)) = discovery.resolve_gvk(&gvk) {
            let api = dynamic_api(ar, caps, k8s_client.clone(), Some(namespace), false);

            match serde_json::to_value(k8s_object) {
                Ok(data) => {
                    let mut patch_params = PatchParams::apply("validation");
                    patch_params = patch_params.dry_run();
                    patch_params.force = true;

                    match api
                        .patch(&k8s_object.name_any(), &patch_params, &Patch::Apply(data))
                        .await
                    {
                        Ok(_) => {
                            info!(
                                "Validation successful for {:?} '{}'",
                                gvk.kind,
                                k8s_object.name_any()
                            );
                        }
                        Err(e) => {
                            error!(
                                "Validation failed for {:?} '{}': {:?}",
                                gvk.kind,
                                k8s_object.name_any(),
                                e
                            );
                            validation_errors.push(format!(
                                "Validation failed for {} '{}': {}",
                                gvk.kind,
                                k8s_object.name_any(),
                                e
                            ));
                        }
                    }
                }
                Err(e) => {
                    validation_errors.push(format!(
                        "Failed to serialize object '{}': {}",
                        k8s_object.name_any(),
                        e
                    ));
                }
            }
        } else {
            validation_errors.push(format!(
                "Unable to resolve GVK {:?} for object '{}'",
                gvk,
                k8s_object.name_any()
            ));
        }
    }

    if validation_errors.is_empty() {
        Ok(())
    } else {
        Err(validation_errors.join("\n").into())
    }
}
}

brokkr-agent::k8s::api::apply_single_object

private

#![allow(unused)]
fn main() {
async fn apply_single_object (object : & DynamicObject , client : & Client , stack_id : & str , checksum : & str ,) -> Result < () , Box < dyn std :: error :: Error > >
}

Applies a single Kubernetes object with proper annotations.

Parameters:

NameTypeDescription
object-The DynamicObject to apply
client-Kubernetes client
stack_id-Stack ID for annotation
checksum-Checksum for annotation
Source
#![allow(unused)]
fn main() {
async fn apply_single_object(
    object: &DynamicObject,
    client: &Client,
    stack_id: &str,
    checksum: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    let kind = object
        .types
        .as_ref()
        .map(|t| t.kind.clone())
        .unwrap_or_default();
    let namespace = object
        .metadata
        .namespace
        .as_deref()
        .unwrap_or("default")
        .to_string();
    let name = object.metadata.name.as_deref().unwrap_or("").to_string();
    let key = format!("{}:{}@{}", kind, name, namespace);

    debug!(
        "Applying priority object: kind={}, namespace={}, name={}",
        kind, namespace, name
    );

    // Prepare object with annotations
    let mut object = object.clone();
    let annotations = object
        .metadata
        .annotations
        .get_or_insert_with(BTreeMap::new);
    annotations.insert(STACK_LABEL.to_string(), stack_id.to_string());
    annotations.insert(CHECKSUM_ANNOTATION.to_string(), checksum.to_string());

    let mut params = PatchParams::apply("brokkr-controller");
    params.force = true;

    if let Some(gvk) = object.types.as_ref() {
        let gvk = GroupVersionKind::try_from(gvk)?;
        if let Some((ar, caps)) = Discovery::new(client.clone())
            .run()
            .await?
            .resolve_gvk(&gvk)
        {
            let api = dynamic_api(ar, caps, client.clone(), Some(&namespace), false);

            let patch = Patch::Apply(&object);
            match api.patch(&name, &params, &patch).await {
                Ok(_) => {
                    debug!("Successfully applied priority object {}", key);
                    Ok(())
                }
                Err(e) => {
                    error!("Failed to apply priority object {}: {}", key, e);
                    Err(Box::new(e))
                }
            }
        } else {
            Err(format!("Failed to resolve GVK for {}", key).into())
        }
    } else {
        Err(format!("Missing TypeMeta for {}", key).into())
    }
}
}

brokkr-agent::k8s::api::rollback_namespaces

private

#![allow(unused)]
fn main() {
async fn rollback_namespaces (client : & Client , namespaces : & [String])
}

Rolls back namespaces that were created during a failed reconciliation.

Parameters:

NameTypeDescription
client-Kubernetes client
namespaces-List of namespace names to delete
Source
#![allow(unused)]
fn main() {
async fn rollback_namespaces(client: &Client, namespaces: &[String]) {
    if namespaces.is_empty() {
        return;
    }

    warn!(
        "Rolling back {} namespace(s) due to reconciliation failure",
        namespaces.len()
    );

    for ns_name in namespaces {
        info!("Deleting namespace '{}' as part of rollback", ns_name);

        // Create a namespace API
        let ns_api: Api<Namespace> = Api::all(client.clone());

        match ns_api.delete(ns_name, &DeleteParams::default()).await {
            Ok(_) => {
                info!(
                    "Successfully deleted namespace '{}' during rollback",
                    ns_name
                );
            }
            Err(e) => {
                // Log but don't fail - best effort cleanup
                warn!(
                    "Failed to delete namespace '{}' during rollback: {}",
                    ns_name, e
                );
            }
        }
    }
}
}

brokkr-agent::k8s::api::reconcile_target_state

pub

#![allow(unused)]
fn main() {
async fn reconcile_target_state (objects : & [DynamicObject] , client : Client , stack_id : & str , checksum : & str ,) -> Result < () , Box < dyn std :: error :: Error > >
}

Reconciles the target state of Kubernetes objects for a stack.

This function:

  1. Applies priority resources (Namespaces, CRDs) first to ensure dependencies exist
  2. Validates remaining objects against the API server
  3. Applies all resources with server-side apply
  4. Prunes any objects that are no longer part of the desired state but belong to the same stack
  5. Rolls back namespace creation if any part of the reconciliation fails

Parameters:

NameTypeDescription
k8s_objects-List of DynamicObjects representing the desired state
k8s_client-Kubernetes client for API interactions

Returns:

  • Result<(), Box<dyn std::error::Error>> - Success or error with message
Source
#![allow(unused)]
fn main() {
pub async fn reconcile_target_state(
    objects: &[DynamicObject],
    client: Client,
    stack_id: &str,
    checksum: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    info!(
        "Starting reconciliation with stack_id={}, checksum={}",
        stack_id, checksum
    );

    // If we have objects to apply, handle them in dependency order
    if !objects.is_empty() {
        // Separate priority objects (Namespaces, CRDs) from regular objects
        // Priority objects must be applied first before we can validate namespaced resources
        let (priority_objects, regular_objects): (Vec<_>, Vec<_>) =
            objects.iter().partition(|obj| {
                obj.types
                    .as_ref()
                    .map(|t| t.kind == "Namespace" || t.kind == "CustomResourceDefinition")
                    .unwrap_or(false)
            });

        // Track namespaces we create so we can roll them back on failure
        let mut created_namespaces: Vec<String> = Vec::new();

        // Apply priority objects (Namespaces, CRDs) first without validation
        // These are cluster-scoped and don't have namespace dependencies
        if !priority_objects.is_empty() {
            info!(
                "Applying {} priority resources (Namespaces/CRDs) first",
                priority_objects.len()
            );
            for object in &priority_objects {
                // Track namespace names for potential rollback
                if object
                    .types
                    .as_ref()
                    .map(|t| t.kind == "Namespace")
                    .unwrap_or(false)
                {
                    if let Some(name) = &object.metadata.name {
                        created_namespaces.push(name.clone());
                    }
                }

                if let Err(e) = apply_single_object(object, &client, stack_id, checksum).await {
                    // Rollback: delete any namespaces we created
                    rollback_namespaces(&client, &created_namespaces).await;
                    return Err(e);
                }
            }
        }

        // Now validate remaining objects (namespaces exist at this point)
        if !regular_objects.is_empty() {
            debug!("Validating {} regular objects", regular_objects.len());
            let regular_refs: Vec<DynamicObject> =
                regular_objects.iter().map(|o| (*o).clone()).collect();
            if let Err(e) = validate_k8s_objects(&regular_refs, client.clone()).await {
                error!("Validation failed: {}", e);
                // Rollback: delete any namespaces we created
                rollback_namespaces(&client, &created_namespaces).await;
                return Err(e);
            }
            debug!("All objects validated successfully");
        }

        // Apply all resources with server-side apply
        // (Priority objects were already applied, but applying again is idempotent)
        info!("Applying {} resources", objects.len());
        for object in objects {
            let kind = object
                .types
                .as_ref()
                .map(|t| t.kind.clone())
                .unwrap_or_default();
            let namespace = object
                .metadata
                .namespace
                .as_deref()
                .unwrap_or("default")
                .to_string();
            let name = object.metadata.name.as_deref().unwrap_or("").to_string();
            let key = format!("{}:{}@{}", kind, name, namespace);

            debug!(
                "Processing object: kind={}, namespace={}, name={}",
                kind, namespace, name
            );

            // Prepare object with annotations
            let mut object = object.clone();
            let annotations = object
                .metadata
                .annotations
                .get_or_insert_with(BTreeMap::new);
            annotations.insert(STACK_LABEL.to_string(), stack_id.to_string());
            annotations.insert(CHECKSUM_ANNOTATION.to_string(), checksum.to_string());

            let mut params = PatchParams::apply("brokkr-controller");
            params.force = true;

            if let Some(gvk) = object.types.as_ref() {
                let gvk = GroupVersionKind::try_from(gvk)?;
                if let Some((ar, caps)) = Discovery::new(client.clone())
                    .run()
                    .await?
                    .resolve_gvk(&gvk)
                {
                    let api = dynamic_api(ar, caps, client.clone(), Some(&namespace), false);

                    let patch = Patch::Apply(&object);
                    match api.patch(&name, &params, &patch).await {
                        Ok(_) => debug!("Successfully applied {}", key),
                        Err(e) => {
                            error!("Failed to apply {}: {}", key, e);
                            // Rollback: delete any namespaces we created
                            rollback_namespaces(&client, &created_namespaces).await;
                            return Err(Box::new(e));
                        }
                    }
                }
            }
        }
    } else {
        info!("No objects in desired state, will remove all existing objects in stack");
    }

    // Get existing resources with this stack ID after applying changes
    debug!("Fetching existing resources for stack {}", stack_id);
    let existing = get_all_objects_by_annotation(&client, STACK_LABEL, stack_id).await?;
    debug!("Found {} existing resources", existing.len());

    // Prune objects that are no longer in the desired state
    for existing_obj in existing {
        let kind = existing_obj
            .types
            .as_ref()
            .map(|t| t.kind.clone())
            .unwrap_or_default();
        let namespace = existing_obj
            .metadata
            .namespace
            .as_deref()
            .unwrap_or("default")
            .to_string();
        let name = existing_obj
            .metadata
            .name
            .as_deref()
            .unwrap_or("")
            .to_string();
        let key = format!("{}:{}@{}", kind, name, namespace);

        // Skip if object has owner references
        if let Some(owner_refs) = &existing_obj.metadata.owner_references {
            if !owner_refs.is_empty() {
                debug!("Skipping object {} with owner references", key);
                continue;
            }
        }

        // Delete if checksum doesn't match the new checksum
        let existing_checksum = existing_obj
            .metadata
            .annotations
            .as_ref()
            .and_then(|anns| anns.get(CHECKSUM_ANNOTATION))
            .map_or("".to_string(), |v| v.to_string());

        if existing_checksum != checksum {
            info!(
                "Deleting object {} (checksum mismatch: {} != {})",
                key, existing_checksum, checksum
            );
            if let Some(gvk) = existing_obj.types.as_ref() {
                let gvk = GroupVersionKind::try_from(gvk)?;
                if let Some((ar, caps)) = Discovery::new(client.clone())
                    .run()
                    .await?
                    .resolve_gvk(&gvk)
                {
                    let api = dynamic_api(ar, caps, client.clone(), Some(&namespace), false);
                    match api.delete(&name, &DeleteParams::default()).await {
                        Ok(_) => debug!("Successfully deleted {}", key),
                        Err(e) => {
                            error!("Failed to delete {}: {}", key, e);
                            return Err(Box::new(e));
                        }
                    }
                }
            }
        } else {
            debug!("Keeping object {} (checksum matches: {})", key, checksum);
        }
    }

    info!("Reconciliation completed successfully");
    Ok(())
}
}

brokkr-agent::k8s::api::create_k8s_client

pub

#![allow(unused)]
fn main() {
async fn create_k8s_client (kubeconfig_path : Option < & str > ,) -> Result < K8sClient , Box < dyn std :: error :: Error > >
}

Creates a Kubernetes client using either a provided kubeconfig path or default configuration.

Parameters:

NameTypeDescription
kubeconfig_path-Optional path to kubeconfig file

Returns:

  • Result<K8sClient, Box<dyn std::error::Error>> - Kubernetes client or error
Source
#![allow(unused)]
fn main() {
pub async fn create_k8s_client(
    kubeconfig_path: Option<&str>,
) -> Result<K8sClient, Box<dyn std::error::Error>> {
    // Set KUBECONFIG environment variable if path is provided
    if let Some(path) = kubeconfig_path {
        info!("Setting KUBECONFIG environment variable to: {}", path);
        std::env::set_var("KUBECONFIG", path);
    } else {
        info!("Using default Kubernetes configuration");
    }

    let client = K8sClient::try_default()
        .await
        .map_err(|e| format!("Failed to create Kubernetes client: {}", e))?;

    // Verify cluster connectivity using API server version (doesn't require RBAC permissions)
    match client.apiserver_version().await {
        Ok(version) => {
            info!(
                "Successfully connected to Kubernetes cluster (version: {}.{})",
                version.major, version.minor
            );
        }
        Err(e) => {
            error!("Failed to verify Kubernetes cluster connectivity: {}", e);
            return Err(format!("Failed to connect to Kubernetes cluster: {}", e).into());
        }
    }

    Ok(client)
}
}