Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

brokkr-broker::cli::commands Rust

Structs

brokkr-broker::cli::commands::Count

private

Derives: QueryableByName, Debug

Fields

Name | Type | Description
count | i64 |

Functions

brokkr-broker::cli::commands::serve

pub

#![allow(unused)]
fn main() {
async fn serve(config: &Settings) -> Result<(), Box<dyn std::error::Error>>
}

Function to start the Brokkr Broker server

This function initializes the database, runs migrations, checks for first-time setup, configures API routes, and starts the server with graceful shutdown support.

Source
#![allow(unused)]
fn main() {
/// Starts the Brokkr Broker server.
///
/// Sequence: build the DB connection pool, optionally set up a tenant
/// schema, run pending migrations, detect/perform first-time setup, build
/// the DAL, start background maintenance tasks, wire hot-reloadable config,
/// configure API routes, then serve with graceful shutdown on ctrl+c.
///
/// # Errors
/// Returns an error from first-run setup, binding the TCP listener, or the
/// server loop. NOTE(review): pool/schema/migration/init-check failures use
/// `expect` and panic instead of returning `Err` — deliberate fail-fast at
/// startup, but inconsistent with the `Result` return type.
pub async fn serve(config: &Settings) -> Result<(), Box<dyn std::error::Error>> {
    info!("Starting Brokkr Broker application");

    // Create database connection pool
    // Pool size needs to accommodate:
    // - 5 background tasks (diagnostic cleanup, work order maintenance, webhook delivery/cleanup, audit cleanup)
    // - HTTP requests (middleware holds 1 connection while DAL methods need another)
    // - Concurrent request handling and webhook event emission
    info!("Creating database connection pool");
    let connection_pool = create_shared_connection_pool(
        &config.database.url,
        "brokkr",
        50,
        config.database.schema.as_deref(),
    );
    info!("Database connection pool created successfully");

    // Set up schema if configured (for multi-tenant deployments)
    if let Some(ref schema) = config.database.schema {
        info!("Setting up schema: {}", schema);
        connection_pool
            .setup_schema(schema)
            .expect("Failed to set up schema");
        info!("Schema '{}' set up successfully", schema);
    }

    // Run pending migrations
    info!("Running pending database migrations");
    // This connection is reused below for the first-run check and setup.
    let mut conn = connection_pool.get().expect("Failed to get DB connection");

    conn.run_pending_migrations(MIGRATIONS)
        .expect("Failed to run migrations");
    info!("Database migrations completed successfully");

    // Check if this is the first time running the application
    // Check-and-insert runs inside one transaction so concurrent starters
    // cannot both observe count == 0.
    let is_first_run = conn
        .transaction(|conn| {
            let result: Count =
                sql_query("SELECT COUNT(*) as count FROM app_initialization").get_result(conn)?;
            if result.count == 0 {
                // If it's the first run, insert a record into app_initialization
                sql_query("INSERT INTO app_initialization DEFAULT VALUES").execute(conn)?;
                Ok::<bool, DieselError>(true)
            } else {
                Ok::<bool, DieselError>(false)
            }
        })
        .expect("Failed to check initialization status");

    // Perform first-time setup if necessary
    if is_first_run {
        info!("First time application startup detected. Creating admin role...");
        utils::first_startup(&mut conn, config)?;
    } else {
        info!("Existing application detected. Proceeding with normal startup.");
    }

    // Initialize Data Access Layer
    info!("Initializing Data Access Layer");
    // TTL of 0 disables the auth cache (see log line below); default 60s.
    let auth_cache_ttl = config.broker.auth_cache_ttl_seconds.unwrap_or(60);
    let dal = DAL::new_with_auth_cache(connection_pool.clone(), auth_cache_ttl);
    info!(
        "Auth cache TTL: {}s ({})",
        auth_cache_ttl,
        if auth_cache_ttl > 0 {
            "enabled"
        } else {
            "disabled"
        }
    );

    // Initialize encryption key for webhooks
    info!("Initializing encryption key");
    utils::encryption::init_encryption_key(config.broker.webhook_encryption_key.as_deref())
        .expect("Failed to initialize encryption key");

    // Initialize audit logger for compliance tracking
    info!("Initializing audit logger");
    utils::audit::init_audit_logger(dal.clone()).expect("Failed to initialize audit logger");

    // Start background tasks
    info!("Starting background tasks");
    let cleanup_config = utils::background_tasks::DiagnosticCleanupConfig {
        interval_seconds: config
            .broker
            .diagnostic_cleanup_interval_seconds
            .unwrap_or(900),
        max_age_hours: config.broker.diagnostic_max_age_hours.unwrap_or(1),
    };
    utils::background_tasks::start_diagnostic_cleanup_task(dal.clone(), cleanup_config);

    // Start work order maintenance task (retry processing and stale claim detection)
    let work_order_config = utils::background_tasks::WorkOrderMaintenanceConfig::default();
    utils::background_tasks::start_work_order_maintenance_task(dal.clone(), work_order_config);

    // Start webhook delivery worker
    let webhook_delivery_config = utils::background_tasks::WebhookDeliveryConfig {
        interval_seconds: config.broker.webhook_delivery_interval_seconds.unwrap_or(5),
        batch_size: config.broker.webhook_delivery_batch_size.unwrap_or(50),
    };
    utils::background_tasks::start_webhook_delivery_task(dal.clone(), webhook_delivery_config);

    // Start webhook cleanup task
    let webhook_cleanup_config = utils::background_tasks::WebhookCleanupConfig {
        interval_seconds: 3600, // Every hour
        retention_days: config.broker.webhook_cleanup_retention_days.unwrap_or(7),
    };
    utils::background_tasks::start_webhook_cleanup_task(dal.clone(), webhook_cleanup_config);

    // Start audit log cleanup task
    let audit_cleanup_config = utils::background_tasks::AuditLogCleanupConfig {
        interval_seconds: 86400, // Daily
        retention_days: config.broker.audit_log_retention_days.unwrap_or(90),
    };
    utils::background_tasks::start_audit_log_cleanup_task(dal.clone(), audit_cleanup_config);

    // Create reloadable configuration for hot-reload support
    info!("Initializing reloadable configuration");
    let reloadable_config = ReloadableConfig::from_settings(config.clone(), None);

    // Start ConfigMap watcher for Kubernetes hot-reload (if running in K8s)
    // from_environment() returns None outside Kubernetes, so the watcher is
    // simply skipped for non-K8s deployments.
    if let Some(watcher_config) = utils::config_watcher::ConfigWatcherConfig::from_environment() {
        utils::config_watcher::start_config_watcher(reloadable_config.clone(), watcher_config);
    }

    // Configure API routes
    info!("Configuring API routes");
    let app = api::configure_api_routes(dal.clone(), &config.cors, Some(reloadable_config))
        .with_state(dal);

    // Set up the server address
    // NOTE(review): bind address/port are hard-coded — consider making them
    // configurable via Settings.
    let addr = "0.0.0.0:3000";
    info!("Starting server on {}", addr);
    let listener = tokio::net::TcpListener::bind(addr).await?;

    // Set up shutdown signal handler
    // The oneshot channel bridges the ctrl+c signal task to axum's graceful
    // shutdown future; send() errors are ignored because the receiver may
    // already be gone if the server exited first.
    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
    tokio::spawn(async move {
        signal::ctrl_c().await.expect("Failed to listen for ctrl+c");
        shutdown_tx.send(()).ok();
    });

    // Start the server with graceful shutdown
    info!("Brokkr Broker is now running");
    axum::serve(listener, app)
        .with_graceful_shutdown(utils::shutdown(shutdown_rx))
        .await?;

    Ok(())
}
}

brokkr-broker::cli::commands::rotate_admin

pub

#![allow(unused)]
fn main() {
fn rotate_admin(config: &Settings) -> Result<(), Box<dyn std::error::Error>>
}

Function to rotate the admin key

This function generates a new admin key and updates it in the database.

Source
#![allow(unused)]
fn main() {
/// Rotates the admin key: generates a new key and upserts it in the
/// database.
///
/// # Errors
/// Returns an error if the database connection cannot be established or if
/// the key upsert fails.
pub fn rotate_admin(config: &Settings) -> Result<(), Box<dyn std::error::Error>> {
    info!("Rotating admin key");

    // Create database connection.
    // Propagate the failure instead of panicking — this function already
    // returns Result, so callers can handle the error.
    // NOTE(review): unlike the other commands, this connects directly and
    // does not pass config.database.schema — confirm whether a schema
    // search_path is needed here.
    let mut conn = PgConnection::establish(&config.database.url)
        .map_err(|e| format!("Failed to establish database connection: {}", e))?;

    // Generate a new admin key and upsert it (replaces any existing key).
    utils::upsert_admin(&mut conn, config)?;

    info!("Admin key rotated successfully");
    Ok(())
}
}

brokkr-broker::cli::commands::rotate_agent_key

pub

#![allow(unused)]
fn main() {
fn rotate_agent_key(config: &Settings, uuid: Uuid) -> Result<(), Box<dyn std::error::Error>>
}
Source
#![allow(unused)]
fn main() {
/// Rotates the PAK for the agent identified by `uuid`, storing the new hash
/// and printing the new key to stdout.
///
/// # Errors
/// Returns an error if the agent does not exist, key generation fails, or
/// the database update fails.
pub fn rotate_agent_key(config: &Settings, uuid: Uuid) -> Result<(), Box<dyn std::error::Error>> {
    info!("Rotating agent key");

    let pool = create_shared_connection_pool(
        &config.database.url,
        "brokkr",
        1,
        config.database.schema.as_deref(),
    );
    let dal = DAL::new(pool.clone());

    let agent = dal.agents().get(uuid)?.ok_or("Agent not found")?;

    // Bug fix: the previous implementation kept only the hash
    // (`create_pak()?.1`) and discarded the PAK itself, so the rotated key
    // was never revealed to the operator — making the rotation unusable.
    let (new_pak, new_pak_hash) = utils::pak::create_pak()?;
    dal.agents().update_pak_hash(agent.id, new_pak_hash)?;

    info!("Agent key rotated successfully for agent: {}", agent.name);
    // Print the new key once, as create_agent does for the initial PAK.
    println!("New PAK: {}", new_pak);
    Ok(())
}
}

brokkr-broker::cli::commands::rotate_generator_key

pub

#![allow(unused)]
fn main() {
fn rotate_generator_key(config: &Settings, uuid: Uuid) -> Result<(), Box<dyn std::error::Error>>
}
Source
#![allow(unused)]
fn main() {
/// Rotates the PAK for the generator identified by `uuid`, storing the new
/// hash and printing the new key to stdout.
///
/// # Errors
/// Returns an error if the generator does not exist, key generation fails,
/// or the database update fails.
pub fn rotate_generator_key(
    config: &Settings,
    uuid: Uuid,
) -> Result<(), Box<dyn std::error::Error>> {
    info!("Rotating generator key");

    let pool = create_shared_connection_pool(
        &config.database.url,
        "brokkr",
        1,
        config.database.schema.as_deref(),
    );
    let dal = DAL::new(pool.clone());

    let generator = dal.generators().get(uuid)?.ok_or("Generator not found")?;

    // Bug fix: the previous implementation kept only the hash
    // (`create_pak()?.1`) and discarded the PAK itself, so the rotated key
    // was never revealed to the operator — making the rotation unusable.
    let (new_pak, new_pak_hash) = utils::pak::create_pak()?;
    dal.generators()
        .update_pak_hash(generator.id, new_pak_hash)?;

    info!(
        "Generator key rotated successfully for generator: {}",
        generator.name
    );
    // Print the new key once, as create_generator does for the initial PAK.
    println!("New PAK: {}", new_pak);
    Ok(())
}
}

brokkr-broker::cli::commands::create_agent

pub

#![allow(unused)]
fn main() {
fn create_agent(config: &Settings, name: String, cluster_name: String) -> Result<(), Box<dyn std::error::Error>>
}
Source
#![allow(unused)]
fn main() {
/// Creates a new agent and provisions its initial PAK.
///
/// On success, prints the agent's id, name, cluster, and the one-time
/// initial PAK to stdout.
///
/// # Errors
/// Returns an error if the agent spec is invalid, key generation fails, or
/// any database operation fails.
pub fn create_agent(
    config: &Settings,
    name: String,
    cluster_name: String,
) -> Result<(), Box<dyn std::error::Error>> {
    info!("Creating new agent: {}", name);

    // Use pool size 2 because agent creation emits webhook events
    // which require a second connection while the first is still held
    let pool = create_shared_connection_pool(
        &config.database.url,
        "brokkr",
        2,
        config.database.schema.as_deref(),
    );
    let dal = DAL::new(pool.clone());

    // Validate the spec up front; surface a readable message on failure.
    let spec = match NewAgent::new(name, cluster_name) {
        Ok(spec) => spec,
        Err(e) => return Err(format!("Failed to create NewAgent: {}", e).into()),
    };

    // Generate the key pair: the plain PAK is shown once, the hash is stored.
    let (pak, pak_hash) = pak::create_pak()?;

    let agent = dal.agents().create(&spec)?;
    dal.agents().update_pak_hash(agent.id, pak_hash)?;

    info!("Successfully created agent with ID: {}", agent.id);
    println!("Agent created successfully:");
    println!("ID: {}", agent.id);
    println!("Name: {}", agent.name);
    println!("Cluster: {}", agent.cluster_name);
    println!("Initial PAK: {}", pak);

    Ok(())
}
}

brokkr-broker::cli::commands::create_generator

pub

#![allow(unused)]
fn main() {
fn create_generator(config: &Settings, name: String, description: Option&lt;String&gt;) -> Result<(), Box<dyn std::error::Error>>
}
Source
#![allow(unused)]
fn main() {
/// Creates a new generator and provisions its initial PAK.
///
/// On success, prints the generator's id, name, and the one-time initial
/// PAK to stdout.
///
/// # Errors
/// Returns an error if the generator spec is invalid, key generation fails,
/// or any database operation fails.
pub fn create_generator(
    config: &Settings,
    name: String,
    description: Option<String>,
) -> Result<(), Box<dyn std::error::Error>> {
    info!("Creating new generator: {}", name);

    let pool = create_shared_connection_pool(
        &config.database.url,
        "brokkr",
        1,
        config.database.schema.as_deref(),
    );
    let dal = DAL::new(pool.clone());

    // Validate the spec up front; surface a readable message on failure.
    let spec = match NewGenerator::new(name, description) {
        Ok(spec) => spec,
        Err(e) => return Err(format!("Failed to create NewGenerator: {}", e).into()),
    };

    // Generate the key pair: the plain PAK is shown once, the hash is stored.
    let (pak, pak_hash) = pak::create_pak()?;

    let generator = dal.generators().create(&spec)?;
    dal.generators().update_pak_hash(generator.id, pak_hash)?;

    info!("Successfully created generator with ID: {}", generator.id);
    println!("Generator created successfully:");
    println!("ID: {}", generator.id);
    println!("Name: {}", generator.name);
    println!("Initial PAK: {}", pak);

    Ok(())
}
}