brokkr-agent::k8s::api Rust
Structs
brokkr-agent::k8s::api::RetryConfig
private
Retry configuration for Kubernetes operations
Fields
| Name | Type | Description |
|---|---|---|
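max_elapsed_time | Duration | Passed to the backoff builder's `with_max_elapsed_time` (total retry budget) |
initial_interval | Duration | Passed to the backoff builder's `with_initial_interval` |
max_interval | Duration | Passed to the backoff builder's `with_max_interval` |
multiplier | f64 | Passed to the backoff builder's `with_multiplier` |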
Functions
brokkr-agent::k8s::api::is_retryable_error
private
#![allow(unused)]
fn main() {
fn is_retryable_error (error : & KubeError) -> bool
}
Determines if a Kubernetes error is retryable
Source
#![allow(unused)]
fn main() {
fn is_retryable_error(error: &KubeError) -> bool {
match error {
KubeError::Api(api_err) => {
matches!(api_err.code, 429 | 500 | 503 | 504)
|| matches!(
api_err.reason.as_str(),
"ServiceUnavailable" | "InternalError" | "Timeout"
)
}
_ => false,
}
}
}
brokkr-agent::k8s::api::with_retries
private
#![allow(unused)]
fn main() {
async fn with_retries < F , Fut , T > (operation : F , config : RetryConfig ,) -> Result < T , Box < dyn std :: error :: Error > > where F : Fn () -> Fut , Fut : std :: future :: Future < Output = Result < T , KubeError > > ,
}
Executes a Kubernetes operation with retries
Source
#![allow(unused)]
fn main() {
async fn with_retries<F, Fut, T>(
operation: F,
config: RetryConfig,
) -> Result<T, Box<dyn std::error::Error>>
where
F: Fn() -> Fut,
Fut: std::future::Future<Output = Result<T, KubeError>>,
{
let backoff = ExponentialBackoffBuilder::new()
.with_initial_interval(config.initial_interval)
.with_max_interval(config.max_interval)
.with_multiplier(config.multiplier)
.with_max_elapsed_time(Some(config.max_elapsed_time))
.build();
let operation_with_backoff = || async {
match operation().await {
Ok(value) => Ok(value),
Err(error) => {
if is_retryable_error(&error) {
warn!("Retryable error encountered: {}", error);
Err(backoff::Error::Transient {
err: error,
retry_after: None,
})
} else {
error!("Non-retryable error encountered: {}", error);
Err(backoff::Error::Permanent(error))
}
}
}
};
backoff::future::retry(backoff, operation_with_backoff)
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error>)
}
}
brokkr-agent::k8s::api::apply_k8s_objects
pub
#![allow(unused)]
fn main() {
async fn apply_k8s_objects (k8s_objects : & [DynamicObject] , k8s_client : K8sClient , patch_params : PatchParams ,) -> Result < () , Box < dyn std :: error :: Error > >
}
Applies a list of Kubernetes objects to the cluster using server-side apply.
Parameters:
| Name | Type | Description |
|---|---|---|
k8s_objects | - | List of DynamicObjects to apply |
discovery | - | Not a parameter: the Discovery client is created internally from `k8s_client` |
k8s_client | - | Kubernetes client for API interactions |
patch_params | - | Parameters for the patch operation |
Returns:
Result<(), Box<dyn std::error::Error>>- Success or error with message
Source
#![allow(unused)]
fn main() {
pub async fn apply_k8s_objects(
k8s_objects: &[DynamicObject],
k8s_client: K8sClient,
patch_params: PatchParams,
) -> Result<(), Box<dyn std::error::Error>> {
info!("Applying {} Kubernetes objects", k8s_objects.len());
let start = Instant::now();
let discovery = Discovery::new(k8s_client.clone())
.run()
.await
.map_err(|e| {
error!("Failed to create Kubernetes discovery client: {}", e);
metrics::kubernetes_operations_total()
.with_label_values(&["apply"])
.inc();
metrics::kubernetes_operation_duration_seconds()
.with_label_values(&["apply"])
.observe(start.elapsed().as_secs_f64());
e
})?;
for k8s_object in k8s_objects {
let default_namespace = &"default".to_string();
let namespace = k8s_object
.metadata
.namespace
.as_deref()
.unwrap_or(default_namespace);
let gvk = if let Some(tm) = &k8s_object.types {
GroupVersionKind::try_from(tm)?
} else {
error!(
"Cannot apply object without valid TypeMeta for object named '{}'",
k8s_object.name_any()
);
return Err(format!(
"Cannot apply object without valid TypeMeta for object named '{}'",
k8s_object.name_any()
)
.into());
};
if let Some((ar, caps)) = discovery.resolve_gvk(&gvk) {
let api = dynamic_api(ar, caps, k8s_client.clone(), Some(namespace), false);
info!(
"Applying {} '{}' in namespace '{}'",
gvk.kind,
k8s_object.name_any(),
namespace
);
debug!("Object content:\n{}", serde_yaml::to_string(&k8s_object)?);
let data = serde_json::to_value(k8s_object)?;
let name = k8s_object.name_any();
let name_for_error = name.clone();
let patch_params = patch_params.clone();
with_retries(
move || {
let api = api.clone();
let name = name.clone();
let data = data.clone();
let patch_params = patch_params.clone();
async move { api.patch(&name, &patch_params, &Patch::Apply(data)).await }
},
RetryConfig::default(),
)
.await
.map_err(|e| {
error!(
"Failed to apply {} '{}' in namespace '{}': {}",
gvk.kind, name_for_error, namespace, e
);
e
})?;
info!(
"Successfully applied {} '{}' in namespace '{}'",
gvk.kind, name_for_error, namespace
);
} else {
error!(
"Failed to resolve GroupVersionKind for {} '{}' in namespace '{}'",
gvk.kind,
k8s_object.name_any(),
namespace
);
return Err(format!(
"Failed to resolve GroupVersionKind for {} '{}' in namespace '{}'",
gvk.kind,
k8s_object.name_any(),
namespace
)
.into());
}
}
info!(
"Successfully applied all {} Kubernetes objects",
k8s_objects.len()
);
// Record metrics for successful apply
metrics::kubernetes_operations_total()
.with_label_values(&["apply"])
.inc();
metrics::kubernetes_operation_duration_seconds()
.with_label_values(&["apply"])
.observe(start.elapsed().as_secs_f64());
Ok(())
}
}
brokkr-agent::k8s::api::dynamic_api
pub
#![allow(unused)]
fn main() {
fn dynamic_api (ar : ApiResource , caps : ApiCapabilities , client : K8sClient , namespace : Option < & str > , all_namespaces : bool ,) -> Api < DynamicObject >
}
Creates a dynamic Kubernetes API client for a specific resource type
Parameters:
| Name | Type | Description |
|---|---|---|
ar | - | ApiResource describing the Kubernetes resource type |
caps | - | Capabilities of the API (e.g., list, watch, etc.) |
client | - | Kubernetes client instance |
namespace | - | Optional namespace to scope the API to |
all_namespaces | - | Whether to operate across all namespaces |
Returns:
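An `Api<DynamicObject>` scoped cluster-wide (for cluster-scoped resources or when `all_namespaces` is set), to the given `namespace`, or otherwise to the client's default namespace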
Source
#![allow(unused)]
fn main() {
/// Builds a dynamic `Api` handle for the resource type described by `ar`.
///
/// Scope selection, in order:
/// 1. cluster-wide when the resource is cluster-scoped or `all_namespaces` is set;
/// 2. the given `namespace` when one is supplied;
/// 3. the client's default namespace otherwise.
pub fn dynamic_api(
    ar: ApiResource,
    caps: ApiCapabilities,
    client: K8sClient,
    namespace: Option<&str>,
    all_namespaces: bool,
) -> Api<DynamicObject> {
    let cluster_wide = all_namespaces || caps.scope == Scope::Cluster;
    if cluster_wide {
        return Api::all_with(client, &ar);
    }

    match namespace {
        Some(ns) => Api::namespaced_with(client, ns, &ar),
        None => Api::default_namespaced_with(client, &ar),
    }
}
}
brokkr-agent::k8s::api::get_all_objects_by_annotation
pub
#![allow(unused)]
fn main() {
async fn get_all_objects_by_annotation (k8s_client : & K8sClient , annotation_key : & str , annotation_value : & str ,) -> Result < Vec < DynamicObject > , Box < dyn std :: error :: Error > >
}
Retrieves all Kubernetes objects with a specific annotation key-value pair.
Parameters:
| Name | Type | Description |
|---|---|---|
k8s_client | - | Kubernetes client |
discovery | - | Not a parameter: the Discovery client is created internally from `k8s_client` |
annotation_key | - | Annotation key to filter by |
annotation_value | - | Annotation value to filter by |
Returns:
Result<Vec<DynamicObject>, Box<dyn std::error::Error>>- List of matching objects or error
Source
#![allow(unused)]
fn main() {
pub async fn get_all_objects_by_annotation(
k8s_client: &K8sClient,
annotation_key: &str,
annotation_value: &str,
) -> Result<Vec<DynamicObject>, Box<dyn std::error::Error>> {
let mut results = Vec::new();
let discovery = Discovery::new(k8s_client.clone())
.run()
.await
.expect("Failed to create discovery client");
// Search through all API groups and resources
for group in discovery.groups() {
for (ar, caps) in group.recommended_resources() {
let api: Api<DynamicObject> =
dynamic_api(ar.clone(), caps.clone(), k8s_client.clone(), None, true);
match api.list(&Default::default()).await {
Ok(list) => {
let matching_objects = list
.items
.into_iter()
.filter(|obj| {
obj.metadata
.annotations
.as_ref()
.and_then(|annotations| annotations.get(annotation_key))
.is_some_and(|value| value == annotation_value)
})
.map(|mut obj| {
// Set TypeMeta directly
obj.types = Some(TypeMeta {
api_version: if ar.group.is_empty() {
ar.version.clone()
} else {
format!("{}/{}", ar.group, ar.version)
},
kind: ar.kind.clone(),
});
obj
});
results.extend(matching_objects);
}
Err(e) => warn!("Error listing resources for {:?}: {:?}", ar, e),
}
}
}
Ok(results)
}
}
brokkr-agent::k8s::api::delete_k8s_objects
pub
#![allow(unused)]
fn main() {
async fn delete_k8s_objects (k8s_objects : & [DynamicObject] , k8s_client : K8sClient , agent_id : & Uuid ,) -> Result < () , Box < dyn std :: error :: Error > >
}
Deletes a list of Kubernetes objects from the cluster.
Parameters:
| Name | Type | Description |
|---|---|---|
k8s_objects | - | List of DynamicObjects to delete |
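discovery | - | Not a parameter: the Discovery client is created internally from `k8s_client` |
k8s_client | - | Kubernetes client for API interactions |
agent_id | - | UUID of the calling agent; an object is only deleted if `verify_object_ownership` accepts it for this agent |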
Returns:
Result<(), Box<dyn std::error::Error>>- Success or error with message
Source
#![allow(unused)]
fn main() {
pub async fn delete_k8s_objects(
k8s_objects: &[DynamicObject],
k8s_client: K8sClient,
agent_id: &Uuid,
) -> Result<(), Box<dyn std::error::Error>> {
info!(
"Starting deletion of {} Kubernetes objects",
k8s_objects.len()
);
let discovery = Discovery::new(k8s_client.clone())
.run()
.await
.expect("Failed to create discovery client");
for k8s_object in k8s_objects {
// Verify ownership before attempting deletion
if !verify_object_ownership(k8s_object, agent_id) {
error!(
"Cannot delete object '{}' (kind: {}) as it is not owned by agent {}",
k8s_object.name_any(),
k8s_object.types.as_ref().map_or("unknown", |t| &t.kind),
agent_id
);
return Err(format!(
"Cannot delete object '{}' as it is not owned by this agent",
k8s_object.name_any()
)
.into());
}
debug!("Processing k8s object for deletion: {:?}", k8s_object);
let default_namespace = &"default".to_string();
let namespace = k8s_object
.metadata
.namespace
.as_ref()
.unwrap_or(default_namespace);
let gvk = if let Some(tm) = &k8s_object.types {
GroupVersionKind::try_from(tm)?
} else {
error!(
"Cannot delete object '{}' without valid TypeMeta",
k8s_object.name_any()
);
return Err(format!(
"Cannot delete object without valid TypeMeta {:?}",
k8s_object
)
.into());
};
if let Some((ar, caps)) = discovery.resolve_gvk(&gvk) {
let api = dynamic_api(ar, caps, k8s_client.clone(), Some(namespace), false);
let name = k8s_object.name_any();
let name_for_error = name.clone();
info!(
"Deleting {} '{}' in namespace '{}'",
gvk.kind, name, namespace
);
with_retries(
move || {
let api = api.clone();
let name = name.clone();
async move { api.delete(&name, &Default::default()).await }
},
RetryConfig::default(),
)
.await
.map_err(|e| {
error!(
"Failed to delete {} '{}' in namespace '{}': {}",
gvk.kind, name_for_error, namespace, e
);
e
})?;
info!(
"Successfully deleted {} '{}' in namespace '{}'",
gvk.kind, name_for_error, namespace
);
}
}
info!(
"Successfully deleted all {} Kubernetes objects",
k8s_objects.len()
);
Ok(())
}
}
brokkr-agent::k8s::api::validate_k8s_objects
pub
#![allow(unused)]
fn main() {
async fn validate_k8s_objects (k8s_objects : & [DynamicObject] , k8s_client : K8sClient ,) -> Result < () , Box < dyn std :: error :: Error > >
}
Validates Kubernetes objects against the API server without applying them.
Parameters:
| Name | Type | Description |
|---|---|---|
k8s_objects | - | List of DynamicObjects to validate |
k8s_client | - | Kubernetes client for API interactions |
Returns:
Result<(), Box<dyn std::error::Error>>- Success or error with validation message
Source
#![allow(unused)]
fn main() {
pub async fn validate_k8s_objects(
k8s_objects: &[DynamicObject],
k8s_client: K8sClient,
) -> Result<(), Box<dyn std::error::Error>> {
let mut validation_errors = Vec::new();
let discovery = Discovery::new(k8s_client.clone())
.run()
.await
.expect("Failed to create discovery client");
for k8s_object in k8s_objects {
let default_namespace = &"default".to_string();
let namespace = k8s_object
.metadata
.namespace
.as_deref()
.unwrap_or(default_namespace);
let gvk = if let Some(tm) = &k8s_object.types {
match GroupVersionKind::try_from(tm) {
Ok(gvk) => gvk,
Err(e) => {
validation_errors.push(format!(
"Invalid TypeMeta for object '{}': {}",
k8s_object.name_any(),
e
));
continue;
}
}
} else {
validation_errors.push(format!(
"Missing TypeMeta for object '{}'",
k8s_object.name_any()
));
continue;
};
if let Some((ar, caps)) = discovery.resolve_gvk(&gvk) {
let api = dynamic_api(ar, caps, k8s_client.clone(), Some(namespace), false);
match serde_json::to_value(k8s_object) {
Ok(data) => {
let mut patch_params = PatchParams::apply("validation");
patch_params = patch_params.dry_run();
patch_params.force = true;
match api
.patch(&k8s_object.name_any(), &patch_params, &Patch::Apply(data))
.await
{
Ok(_) => {
info!(
"Validation successful for {:?} '{}'",
gvk.kind,
k8s_object.name_any()
);
}
Err(e) => {
error!(
"Validation failed for {:?} '{}': {:?}",
gvk.kind,
k8s_object.name_any(),
e
);
validation_errors.push(format!(
"Validation failed for {} '{}': {}",
gvk.kind,
k8s_object.name_any(),
e
));
}
}
}
Err(e) => {
validation_errors.push(format!(
"Failed to serialize object '{}': {}",
k8s_object.name_any(),
e
));
}
}
} else {
validation_errors.push(format!(
"Unable to resolve GVK {:?} for object '{}'",
gvk,
k8s_object.name_any()
));
}
}
if validation_errors.is_empty() {
Ok(())
} else {
Err(validation_errors.join("\n").into())
}
}
}
brokkr-agent::k8s::api::apply_single_object
private
#![allow(unused)]
fn main() {
async fn apply_single_object (object : & DynamicObject , client : & Client , stack_id : & str , checksum : & str ,) -> Result < () , Box < dyn std :: error :: Error > >
}
Applies a single Kubernetes object with proper annotations.
Parameters:
| Name | Type | Description |
|---|---|---|
object | - | The DynamicObject to apply |
client | - | Kubernetes client |
stack_id | - | Stack ID for annotation |
checksum | - | Checksum for annotation |
Source
#![allow(unused)]
fn main() {
/// Applies a single Kubernetes object with the stack and checksum annotations.
///
/// A copy of `object` is annotated with `STACK_LABEL = stack_id` and
/// `CHECKSUM_ANNOTATION = checksum`, then sent as a forced server-side apply
/// patch under the field manager `"brokkr-controller"`. A missing namespace is
/// treated as `"default"`; a missing name is sent as an empty string.
///
/// Discovery is run on every call.
///
/// # Errors
///
/// Returns an error when the object has no (or an unparsable) `TypeMeta`,
/// discovery fails, the GroupVersionKind cannot be resolved, or the patch call
/// fails.
async fn apply_single_object(
    object: &DynamicObject,
    client: &Client,
    stack_id: &str,
    checksum: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    let kind = object
        .types
        .as_ref()
        .map(|t| t.kind.clone())
        .unwrap_or_default();
    let namespace = object
        .metadata
        .namespace
        .as_deref()
        .unwrap_or("default")
        .to_string();
    let name = object.metadata.name.as_deref().unwrap_or("").to_string();
    // Human-readable identifier used only in log and error messages.
    let key = format!("{}:{}@{}", kind, name, namespace);

    debug!(
        "Applying priority object: kind={}, namespace={}, name={}",
        kind, namespace, name
    );

    // Prepare object with annotations
    let mut object = object.clone();
    let annotations = object
        .metadata
        .annotations
        .get_or_insert_with(BTreeMap::new);
    annotations.insert(STACK_LABEL.to_string(), stack_id.to_string());
    annotations.insert(CHECKSUM_ANNOTATION.to_string(), checksum.to_string());

    let mut params = PatchParams::apply("brokkr-controller");
    params.force = true;

    let Some(type_meta) = object.types.as_ref() else {
        return Err(format!("Missing TypeMeta for {}", key).into());
    };
    let gvk = GroupVersionKind::try_from(type_meta)?;

    let discovery = Discovery::new(client.clone()).run().await?;
    let Some((ar, caps)) = discovery.resolve_gvk(&gvk) else {
        return Err(format!("Failed to resolve GVK for {}", key).into());
    };

    let api = dynamic_api(ar, caps, client.clone(), Some(&namespace), false);
    let patch = Patch::Apply(&object);
    match api.patch(&name, &params, &patch).await {
        Ok(_) => {
            debug!("Successfully applied priority object {}", key);
            Ok(())
        }
        Err(e) => {
            error!("Failed to apply priority object {}: {}", key, e);
            Err(Box::new(e))
        }
    }
}
}
brokkr-agent::k8s::api::rollback_namespaces
private
#![allow(unused)]
fn main() {
async fn rollback_namespaces (client : & Client , namespaces : & [String])
}
Rolls back namespaces that were created during a failed reconciliation.
Parameters:
| Name | Type | Description |
|---|---|---|
client | - | Kubernetes client |
namespaces | - | List of namespace names to delete |
Source
#![allow(unused)]
fn main() {
/// Deletes the given namespaces as best-effort cleanup after a failed reconciliation.
///
/// Does nothing when `namespaces` is empty. Each deletion is attempted
/// independently: a failure is logged at `warn` and the remaining namespaces
/// are still processed. The function never returns an error.
async fn rollback_namespaces(client: &Client, namespaces: &[String]) {
    if namespaces.is_empty() {
        return;
    }

    warn!(
        "Rolling back {} namespace(s) due to reconciliation failure",
        namespaces.len()
    );

    // The namespace API handle does not depend on the namespace being
    // deleted, so build it once for the whole rollback.
    let ns_api: Api<Namespace> = Api::all(client.clone());

    for ns_name in namespaces {
        info!("Deleting namespace '{}' as part of rollback", ns_name);
        match ns_api.delete(ns_name, &DeleteParams::default()).await {
            Ok(_) => {
                info!(
                    "Successfully deleted namespace '{}' during rollback",
                    ns_name
                );
            }
            Err(e) => {
                // Log but don't fail - best effort cleanup
                warn!(
                    "Failed to delete namespace '{}' during rollback: {}",
                    ns_name, e
                );
            }
        }
    }
}
}
brokkr-agent::k8s::api::reconcile_target_state
pub
#![allow(unused)]
fn main() {
async fn reconcile_target_state (objects : & [DynamicObject] , client : Client , stack_id : & str , checksum : & str ,) -> Result < () , Box < dyn std :: error :: Error > >
}
Reconciles the target state of Kubernetes objects for a stack.
This function:
- Applies priority resources (Namespaces, CRDs) first to ensure dependencies exist
- Validates remaining objects against the API server
- Applies all resources with server-side apply
- Prunes any objects that are no longer part of the desired state but belong to the same stack
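- Rolls back namespace creation (deletes the Namespaces listed in the desired state) when applying a priority resource, validating the remaining objects, or a server-side apply patch fails; failures during pruning do not trigger a rollback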
Parameters:
| Name | Type | Description |
|---|---|---|
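objects | - | List of DynamicObjects representing the desired state |
client | - | Kubernetes client for API interactions |
stack_id | - | Stack ID written to the stack annotation of every applied object and used to find existing objects to prune |
checksum | - | Checksum written to the checksum annotation of every applied object; existing stack objects with a different checksum are pruned |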
Returns:
Result<(), Box<dyn std::error::Error>>- Success or error with message
Source
#![allow(unused)]
fn main() {
pub async fn reconcile_target_state(
objects: &[DynamicObject],
client: Client,
stack_id: &str,
checksum: &str,
) -> Result<(), Box<dyn std::error::Error>> {
info!(
"Starting reconciliation with stack_id={}, checksum={}",
stack_id, checksum
);
// If we have objects to apply, handle them in dependency order
if !objects.is_empty() {
// Separate priority objects (Namespaces, CRDs) from regular objects
// Priority objects must be applied first before we can validate namespaced resources
let (priority_objects, regular_objects): (Vec<_>, Vec<_>) =
objects.iter().partition(|obj| {
obj.types
.as_ref()
.map(|t| t.kind == "Namespace" || t.kind == "CustomResourceDefinition")
.unwrap_or(false)
});
// Track namespaces we create so we can roll them back on failure
let mut created_namespaces: Vec<String> = Vec::new();
// Apply priority objects (Namespaces, CRDs) first without validation
// These are cluster-scoped and don't have namespace dependencies
if !priority_objects.is_empty() {
info!(
"Applying {} priority resources (Namespaces/CRDs) first",
priority_objects.len()
);
for object in &priority_objects {
// Track namespace names for potential rollback
if object
.types
.as_ref()
.map(|t| t.kind == "Namespace")
.unwrap_or(false)
{
if let Some(name) = &object.metadata.name {
created_namespaces.push(name.clone());
}
}
if let Err(e) = apply_single_object(object, &client, stack_id, checksum).await {
// Rollback: delete any namespaces we created
rollback_namespaces(&client, &created_namespaces).await;
return Err(e);
}
}
}
// Now validate remaining objects (namespaces exist at this point)
if !regular_objects.is_empty() {
debug!("Validating {} regular objects", regular_objects.len());
let regular_refs: Vec<DynamicObject> =
regular_objects.iter().map(|o| (*o).clone()).collect();
if let Err(e) = validate_k8s_objects(®ular_refs, client.clone()).await {
error!("Validation failed: {}", e);
// Rollback: delete any namespaces we created
rollback_namespaces(&client, &created_namespaces).await;
return Err(e);
}
debug!("All objects validated successfully");
}
// Apply all resources with server-side apply
// (Priority objects were already applied, but applying again is idempotent)
info!("Applying {} resources", objects.len());
for object in objects {
let kind = object
.types
.as_ref()
.map(|t| t.kind.clone())
.unwrap_or_default();
let namespace = object
.metadata
.namespace
.as_deref()
.unwrap_or("default")
.to_string();
let name = object.metadata.name.as_deref().unwrap_or("").to_string();
let key = format!("{}:{}@{}", kind, name, namespace);
debug!(
"Processing object: kind={}, namespace={}, name={}",
kind, namespace, name
);
// Prepare object with annotations
let mut object = object.clone();
let annotations = object
.metadata
.annotations
.get_or_insert_with(BTreeMap::new);
annotations.insert(STACK_LABEL.to_string(), stack_id.to_string());
annotations.insert(CHECKSUM_ANNOTATION.to_string(), checksum.to_string());
let mut params = PatchParams::apply("brokkr-controller");
params.force = true;
if let Some(gvk) = object.types.as_ref() {
let gvk = GroupVersionKind::try_from(gvk)?;
if let Some((ar, caps)) = Discovery::new(client.clone())
.run()
.await?
.resolve_gvk(&gvk)
{
let api = dynamic_api(ar, caps, client.clone(), Some(&namespace), false);
let patch = Patch::Apply(&object);
match api.patch(&name, ¶ms, &patch).await {
Ok(_) => debug!("Successfully applied {}", key),
Err(e) => {
error!("Failed to apply {}: {}", key, e);
// Rollback: delete any namespaces we created
rollback_namespaces(&client, &created_namespaces).await;
return Err(Box::new(e));
}
}
}
}
}
} else {
info!("No objects in desired state, will remove all existing objects in stack");
}
// Get existing resources with this stack ID after applying changes
debug!("Fetching existing resources for stack {}", stack_id);
let existing = get_all_objects_by_annotation(&client, STACK_LABEL, stack_id).await?;
debug!("Found {} existing resources", existing.len());
// Prune objects that are no longer in the desired state
for existing_obj in existing {
let kind = existing_obj
.types
.as_ref()
.map(|t| t.kind.clone())
.unwrap_or_default();
let namespace = existing_obj
.metadata
.namespace
.as_deref()
.unwrap_or("default")
.to_string();
let name = existing_obj
.metadata
.name
.as_deref()
.unwrap_or("")
.to_string();
let key = format!("{}:{}@{}", kind, name, namespace);
// Skip if object has owner references
if let Some(owner_refs) = &existing_obj.metadata.owner_references {
if !owner_refs.is_empty() {
debug!("Skipping object {} with owner references", key);
continue;
}
}
// Delete if checksum doesn't match the new checksum
let existing_checksum = existing_obj
.metadata
.annotations
.as_ref()
.and_then(|anns| anns.get(CHECKSUM_ANNOTATION))
.map_or("".to_string(), |v| v.to_string());
if existing_checksum != checksum {
info!(
"Deleting object {} (checksum mismatch: {} != {})",
key, existing_checksum, checksum
);
if let Some(gvk) = existing_obj.types.as_ref() {
let gvk = GroupVersionKind::try_from(gvk)?;
if let Some((ar, caps)) = Discovery::new(client.clone())
.run()
.await?
.resolve_gvk(&gvk)
{
let api = dynamic_api(ar, caps, client.clone(), Some(&namespace), false);
match api.delete(&name, &DeleteParams::default()).await {
Ok(_) => debug!("Successfully deleted {}", key),
Err(e) => {
error!("Failed to delete {}: {}", key, e);
return Err(Box::new(e));
}
}
}
}
} else {
debug!("Keeping object {} (checksum matches: {})", key, checksum);
}
}
info!("Reconciliation completed successfully");
Ok(())
}
}
brokkr-agent::k8s::api::create_k8s_client
pub
#![allow(unused)]
fn main() {
async fn create_k8s_client (kubeconfig_path : Option < & str > ,) -> Result < K8sClient , Box < dyn std :: error :: Error > >
}
Creates a Kubernetes client using either a provided kubeconfig path or default configuration.
Parameters:
| Name | Type | Description |
|---|---|---|
kubeconfig_path | - | Optional path to kubeconfig file |
Returns:
Result<K8sClient, Box<dyn std::error::Error>>- Kubernetes client or error
Source
#![allow(unused)]
fn main() {
/// Creates a Kubernetes client and verifies that the cluster is reachable.
///
/// When `kubeconfig_path` is given, the `KUBECONFIG` environment variable is
/// set to it before the client is built with the default configuration
/// lookup; note that this changes the environment of the whole process.
/// Connectivity is then checked by requesting the API server version.
///
/// # Errors
///
/// Returns an error when the client cannot be constructed or the API server
/// version request fails.
pub async fn create_k8s_client(
    kubeconfig_path: Option<&str>,
) -> Result<K8sClient, Box<dyn std::error::Error>> {
    // Set KUBECONFIG environment variable if path is provided
    match kubeconfig_path {
        Some(path) => {
            info!("Setting KUBECONFIG environment variable to: {}", path);
            std::env::set_var("KUBECONFIG", path);
        }
        None => {
            info!("Using default Kubernetes configuration");
        }
    }

    let client = K8sClient::try_default()
        .await
        .map_err(|e| format!("Failed to create Kubernetes client: {}", e))?;

    // Verify cluster connectivity using API server version (doesn't require RBAC permissions)
    let version = client.apiserver_version().await.map_err(|e| {
        error!("Failed to verify Kubernetes cluster connectivity: {}", e);
        format!("Failed to connect to Kubernetes cluster: {}", e)
    })?;
    info!(
        "Successfully connected to Kubernetes cluster (version: {}.{})",
        version.major, version.minor
    );

    Ok(client)
}
}