Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

brokkr-broker::utils::config_watcher Rust

Configuration file watcher for hot-reload support.

This module provides functionality to watch for changes to the broker’s configuration file and trigger configuration reloads automatically.

Structs

brokkr-broker::utils::config_watcher::ConfigWatcherConfig

pub

Derives: Debug, Clone

Configuration for the file watcher.

Fields

| Name | Type | Description |
| --- | --- | --- |
| config_file_path | String | Path to the configuration file to watch. |
| debounce_duration | Duration | Debounce duration to prevent rapid successive reloads. |
| enabled | bool | Whether the watcher is enabled. |

Methods

from_environment pub
#![allow(unused)]
fn main() {
fn from_environment () -> Option < Self >
}

Creates a new ConfigWatcherConfig from environment variables.

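Looks for the BROKKR_CONFIG_FILE environment variable to determine the config file path. Returns None (watcher disabled) if the variable is unset or empty, if the file does not exist, or if BROKKR_CONFIG_WATCHER_ENABLED is set to "false" or "0". The debounce interval is read from BROKKR_CONFIG_WATCHER_DEBOUNCE_SECONDS and defaults to 5 seconds.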

Source
#![allow(unused)]
fn main() {
    /// Builds a `ConfigWatcherConfig` from environment variables.
    ///
    /// Environment variables consulted:
    ///
    /// * `BROKKR_CONFIG_FILE` - path of the configuration file to watch.
    /// * `BROKKR_CONFIG_WATCHER_ENABLED` - `false` (any case) or `0` disables
    ///   the watcher; any other value, or an unset variable, leaves it enabled.
    /// * `BROKKR_CONFIG_WATCHER_DEBOUNCE_SECONDS` - debounce interval in whole
    ///   seconds; defaults to 5 when unset or not a valid unsigned integer.
    ///
    /// Returns `None` (watcher disabled) when the path variable is unset or
    /// empty, when the file does not exist, or when the watcher is explicitly
    /// disabled. Otherwise returns a config with `enabled` set to `true`.
    pub fn from_environment() -> Option<Self> {
        /// Debounce applied when the environment does not provide a usable value.
        const DEFAULT_DEBOUNCE_SECS: u64 = 5;

        // Check if config file path is specified
        let config_file_path = match std::env::var("BROKKR_CONFIG_FILE") {
            Ok(path) if !path.is_empty() => path,
            _ => {
                debug!("BROKKR_CONFIG_FILE not set, config file watcher disabled");
                return None;
            }
        };

        // Verify the file exists
        if !Path::new(&config_file_path).exists() {
            warn!(
                "Config file '{}' does not exist, config file watcher disabled",
                config_file_path
            );
            return None;
        }

        // Check if watcher is explicitly disabled
        let enabled = std::env::var("BROKKR_CONFIG_WATCHER_ENABLED")
            .map(|v| v.to_lowercase() != "false" && v != "0")
            .unwrap_or(true);

        if !enabled {
            info!("Config file watcher explicitly disabled via environment variable");
            return None;
        }

        // Get debounce duration from environment (in seconds). A value that is
        // present but unparsable is reported instead of being silently replaced,
        // so a typo in the deployment configuration is visible in the logs.
        let debounce_secs = match std::env::var("BROKKR_CONFIG_WATCHER_DEBOUNCE_SECONDS") {
            Ok(raw) => match raw.trim().parse::<u64>() {
                Ok(secs) => secs,
                Err(_) => {
                    warn!(
                        "Invalid BROKKR_CONFIG_WATCHER_DEBOUNCE_SECONDS value '{}', using default of {}s",
                        raw, DEFAULT_DEBOUNCE_SECS
                    );
                    DEFAULT_DEBOUNCE_SECS
                }
            },
            Err(_) => DEFAULT_DEBOUNCE_SECS,
        };

        Some(Self {
            config_file_path,
            debounce_duration: Duration::from_secs(debounce_secs),
            enabled: true,
        })
    }
}

Functions

brokkr-broker::utils::config_watcher::start_config_watcher

pub

#![allow(unused)]
fn main() {
fn start_config_watcher (config : ReloadableConfig , watcher_config : ConfigWatcherConfig ,) -> Option < tokio :: task :: JoinHandle < () > >
}

Starts the configuration file watcher as a background task.

This function spawns a tokio task that watches for changes to the specified configuration file and triggers configuration reloads with debouncing.

Parameters:

| Name | Type | Description |
| --- | --- | --- |
| config | ReloadableConfig | The ReloadableConfig instance to reload on changes. |
| watcher_config | ConfigWatcherConfig | Configuration for the watcher. |

Returns:

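A handle to the spawned task, or None if the watcher is disabled in watcher_config. Errors that occur while the watcher is running are logged by the background task rather than returned.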

Source
#![allow(unused)]
fn main() {
/// Launches the configuration file watcher on a background tokio task.
///
/// # Parameters
///
/// * `config` - the reloadable configuration handed to the watcher loop.
/// * `watcher_config` - watcher settings (file path, debounce, enabled flag).
///
/// # Returns
///
/// `Some(handle)` for the spawned task, or `None` when
/// `watcher_config.enabled` is `false`. An error returned by the watcher
/// loop is logged inside the task; it is not surfaced through the handle.
pub fn start_config_watcher(
    config: ReloadableConfig,
    watcher_config: ConfigWatcherConfig,
) -> Option<tokio::task::JoinHandle<()>> {
    if watcher_config.enabled {
        let debounce_secs = watcher_config.debounce_duration.as_secs();
        info!(
            "Starting config file watcher for '{}' with {}s debounce",
            watcher_config.config_file_path,
            debounce_secs
        );

        // The task owns both arguments for as long as the watcher runs.
        let watcher_task = async move {
            match run_config_watcher(config, watcher_config).await {
                Ok(()) => {}
                Err(e) => {
                    error!("Config file watcher error: {}", e);
                }
            }
        };

        Some(tokio::spawn(watcher_task))
    } else {
        info!("Config file watcher is disabled");
        None
    }
}
}

brokkr-broker::utils::config_watcher::run_config_watcher

private

#![allow(unused)]
fn main() {
async fn run_config_watcher (config : ReloadableConfig , watcher_config : ConfigWatcherConfig ,) -> Result < () , Box < dyn std :: error :: Error + Send + Sync > >
}

Internal function that runs the configuration file watcher loop.

Source
#![allow(unused)]
fn main() {
async fn run_config_watcher(
    config: ReloadableConfig,
    watcher_config: ConfigWatcherConfig,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let config_path = watcher_config.config_file_path.clone();
    let debounce_duration = watcher_config.debounce_duration;

    // Create a channel for file events
    let (tx, rx) = mpsc::channel();

    // Create a file watcher
    let mut watcher: RecommendedWatcher =
        notify::recommended_watcher(move |res: Result<Event, notify::Error>| {
            if let Ok(event) = res {
                // Only send for modify/create events
                if event.kind.is_modify() || event.kind.is_create() {
                    let _ = tx.send(());
                }
            }
        })?;

    // Watch the config file's parent directory (some editors replace files atomically)
    let config_path_ref = Path::new(&config_path);
    let watch_path = config_path_ref.parent().unwrap_or(config_path_ref);

    watcher.watch(watch_path, RecursiveMode::NonRecursive)?;

    info!("Config file watcher started for '{}'", config_path);

    // Track last reload time for debouncing
    let mut last_reload: Option<Instant> = None;

    // Process events
    loop {
        // Block waiting for events with a timeout
        match rx.recv_timeout(Duration::from_secs(60)) {
            Ok(()) => {
                // Check debounce
                let should_reload = match last_reload {
                    Some(last) => last.elapsed() >= debounce_duration,
                    None => true,
                };

                if should_reload {
                    // Wait for debounce period to catch rapid successive changes
                    tokio::time::sleep(debounce_duration).await;

                    // Drain any additional events that came in
                    while rx.try_recv().is_ok() {}

                    debug!("Config file change detected, reloading...");
                    last_reload = Some(Instant::now());

                    // Perform the reload
                    match config.reload() {
                        Ok(changes) => {
                            if changes.is_empty() {
                                debug!("Config file changed but no configuration changes detected");
                            } else {
                                info!(
                                    "Config file watcher triggered configuration reload with {} change(s):",
                                    changes.len()
                                );
                                for change in &changes {
                                    info!(
                                        "  - {}: '{}' -> '{}'",
                                        change.key, change.old_value, change.new_value
                                    );
                                }
                            }
                        }
                        Err(e) => {
                            error!("Failed to reload configuration from file change: {}", e);
                        }
                    }
                } else {
                    debug!(
                        "Debouncing config file change (last reload {}ms ago)",
                        last_reload.map(|l| l.elapsed().as_millis()).unwrap_or(0)
                    );
                }
            }
            Err(mpsc::RecvTimeoutError::Timeout) => {
                // No events, continue watching
                continue;
            }
            Err(mpsc::RecvTimeoutError::Disconnected) => {
                warn!("Config file watcher channel disconnected");
                break;
            }
        }
    }

    Ok(())
}
}