Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

brokkr-broker::utils::audit Rust

Async Audit Logger for Brokkr.

This module provides an asynchronous audit logging service that buffers audit entries and writes them to the database in batches to avoid impacting request latency.

Structs

brokkr-broker::utils::audit::AuditLoggerConfig

pub

Derives: Debug, Clone

Configuration for the audit logger.

Fields

| Name | Type | Description |
| --- | --- | --- |
| channel_size | usize | Channel buffer size. |
| batch_size | usize | Maximum batch size for writes. |
| flush_interval_ms | u64 | Flush interval in milliseconds. |

brokkr-broker::utils::audit::AuditLogger

pub

Derives: Clone

The async audit logger for buffering and batching audit entries.

Fields

| Name | Type | Description |
| --- | --- | --- |
| sender | `mpsc::Sender<NewAuditLog>` | Sender for emitting audit entries. |

Methods

new pub
#![allow(unused)]
fn main() {
fn new (dal : DAL) -> Self
}

Creates a new audit logger and starts the background writer.

Parameters:

| Name | Type | Description |
| --- | --- | --- |
| dal | - | The Data Access Layer for database operations. |

Returns:

An AuditLogger instance.

Source
#![allow(unused)]
fn main() {
    /// Builds an audit logger with the settings from `AuditLoggerConfig::default()`.
    ///
    /// This is a thin wrapper around [`AuditLogger::with_config`].
    ///
    /// # Parameters
    /// * `dal` - The Data Access Layer passed through to `with_config`.
    pub fn new(dal: DAL) -> Self {
        let default_config = AuditLoggerConfig::default();
        Self::with_config(dal, default_config)
    }
}
with_config pub
#![allow(unused)]
fn main() {
fn with_config (dal : DAL , config : AuditLoggerConfig) -> Self
}

Creates a new audit logger with custom configuration.

Parameters:

| Name | Type | Description |
| --- | --- | --- |
| dal | - | The Data Access Layer for database operations. |
| config | - | The logger configuration. |

Returns:

An AuditLogger instance.

Source
#![allow(unused)]
fn main() {
    /// Builds an audit logger from an explicit configuration.
    ///
    /// A bounded channel with room for `config.channel_size` entries is created. The receiving
    /// half is handed to `start_audit_writer` together with the batch size and flush interval,
    /// and the sending half is stored in the returned logger.
    ///
    /// # Parameters
    /// * `dal` - The Data Access Layer handed to the background writer.
    /// * `config` - Channel capacity, batch size and flush interval.
    ///
    /// NOTE(review): if `mpsc` is `tokio::sync::mpsc`, `channel` panics when the capacity
    /// is 0 - confirm and validate the configuration upstream if needed.
    pub fn with_config(dal: DAL, config: AuditLoggerConfig) -> Self {
        let capacity = config.channel_size;
        let batch = config.batch_size;
        let flush_ms = config.flush_interval_ms;

        let (sender, receiver) = mpsc::channel(capacity);

        // The writer takes ownership of the receiving half and of the DAL.
        start_audit_writer(dal, receiver, batch, flush_ms);

        info!(
            "Audit logger started (buffer: {}, batch: {}, flush: {}ms)",
            capacity, batch, flush_ms
        );

        Self { sender }
    }
}
log pub
#![allow(unused)]
fn main() {
fn log (& self , entry : NewAuditLog)
}

Logs an audit entry asynchronously (non-blocking).

If the channel is full, the entry is not dropped; a spawned task waits until capacity is available. If the channel is closed, the entry is lost and an error is logged.

Parameters:

| Name | Type | Description |
| --- | --- | --- |
| entry | - | The audit log entry to record. |
Source
#![allow(unused)]
fn main() {
    pub fn log(&self, entry: NewAuditLog) {
        let sender = self.sender.clone();
        let action = entry.action.clone();

        tokio::spawn(async move {
            match sender.send(entry).await {
                Ok(_) => {
                    debug!("Audit entry queued: {}", action);
                }
                Err(e) => {
                    error!(
                        "Failed to queue audit entry (action: {}): channel full or closed - {}",
                        action, e
                    );
                }
            }
        });
    }
}
log_async pub

async

#![allow(unused)]
fn main() {
async fn log_async (& self , entry : NewAuditLog ,) -> Result < () , mpsc :: error :: SendError < NewAuditLog > >
}

Logs an audit entry, waiting for it to be accepted.

Parameters:

| Name | Type | Description |
| --- | --- | --- |
| entry | - | The audit log entry to record. |

Returns:

Ok if the entry was accepted, Err if the channel is closed.

Source
#![allow(unused)]
fn main() {
    pub async fn log_async(
        &self,
        entry: NewAuditLog,
    ) -> Result<(), mpsc::error::SendError<NewAuditLog>> {
        let action = entry.action.clone();

        self.sender.send(entry).await.map_err(|e| {
            error!("Failed to queue audit entry (action: {}): {}", action, e);
            e
        })?;

        debug!("Audit entry queued (async): {}", action);
        Ok(())
    }
}
try_log pub
#![allow(unused)]
fn main() {
fn try_log (& self , entry : NewAuditLog) -> bool
}

Tries to log an audit entry without blocking.

Parameters:

| Name | Type | Description |
| --- | --- | --- |
| entry | - | The audit log entry to record. |

Returns:

true if the entry was queued, false if the channel is full or closed.

Source
#![allow(unused)]
fn main() {
    pub fn try_log(&self, entry: NewAuditLog) -> bool {
        match self.sender.try_send(entry) {
            Ok(_) => true,
            Err(mpsc::error::TrySendError::Full(_)) => {
                warn!("Audit log channel full, entry dropped");
                false
            }
            Err(mpsc::error::TrySendError::Closed(_)) => {
                error!("Audit log channel closed");
                false
            }
        }
    }
}

Functions

brokkr-broker::utils::audit::init_audit_logger

pub

#![allow(unused)]
fn main() {
fn init_audit_logger (dal : DAL) -> Result < () , String >
}

Initializes the global audit logger.

This should be called once during broker startup.

Parameters:

| Name | Type | Description |
| --- | --- | --- |
| dal | - | The Data Access Layer for database operations. |

Returns:

Ok(()) if initialization succeeded, Err if already initialized.

Source
#![allow(unused)]
fn main() {
/// Initializes the global audit logger with `AuditLoggerConfig::default()`.
///
/// Delegates to `init_audit_logger_with_config`.
///
/// # Parameters
/// * `dal` - The Data Access Layer for database operations.
///
/// # Errors
/// Returns whatever error `init_audit_logger_with_config` reports.
pub fn init_audit_logger(dal: DAL) -> Result<(), String> {
    let default_config = AuditLoggerConfig::default();
    init_audit_logger_with_config(dal, default_config)
}
}

brokkr-broker::utils::audit::init_audit_logger_with_config

pub

#![allow(unused)]
fn main() {
fn init_audit_logger_with_config (dal : DAL , config : AuditLoggerConfig) -> Result < () , String >
}

Initializes the global audit logger with custom configuration.

Parameters:

| Name | Type | Description |
| --- | --- | --- |
| dal | - | The Data Access Layer for database operations. |
| config | - | The logger configuration. |

Returns:

Ok(()) if initialization succeeded, Err if already initialized.

Source
#![allow(unused)]
fn main() {
/// Initializes the global audit logger with a custom configuration.
///
/// # Parameters
/// * `dal` - The Data Access Layer for database operations.
/// * `config` - The logger configuration.
///
/// # Errors
/// Returns `Err("Audit logger already initialized")` when the global logger has already
/// been set.
pub fn init_audit_logger_with_config(dal: DAL, config: AuditLoggerConfig) -> Result<(), String> {
    const ALREADY_INITIALIZED: &str = "Audit logger already initialized";

    // Check before constructing: `AuditLogger::with_config` starts a background writer
    // task, which would be left running for nothing if the global is already set.
    if AUDIT_LOGGER.get().is_some() {
        return Err(ALREADY_INITIALIZED.to_string());
    }

    let logger = AuditLogger::with_config(dal, config);

    // `set` still rejects the value if another initializer won the race after the check.
    AUDIT_LOGGER
        .set(Arc::new(logger))
        .map_err(|_| ALREADY_INITIALIZED.to_string())
}
}

brokkr-broker::utils::audit::get_audit_logger

pub

#![allow(unused)]
fn main() {
fn get_audit_logger () -> Option < Arc < AuditLogger > >
}

Gets the global audit logger.

Returns:

The audit logger, or None if not initialized.

Source
#![allow(unused)]
fn main() {
/// Returns a shared handle to the global audit logger.
///
/// # Returns
/// `Some` with a cloned `Arc` when `AUDIT_LOGGER` holds a logger, `None` otherwise.
pub fn get_audit_logger() -> Option<Arc<AuditLogger>> {
    AUDIT_LOGGER.get().map(Arc::clone)
}
}

brokkr-broker::utils::audit::log

pub

#![allow(unused)]
fn main() {
fn log (entry : NewAuditLog)
}

Logs an audit entry to the global audit logger.

This is a convenience function for logging without needing to get the logger directly.

Parameters:

| Name | Type | Description |
| --- | --- | --- |
| entry | - | The audit log entry to record. |
Source
#![allow(unused)]
fn main() {
/// Forwards an audit entry to the global audit logger's `log` method.
///
/// When no global logger is available the entry is discarded and a warning naming its
/// action is emitted.
///
/// # Parameters
/// * `entry` - The audit log entry to record.
pub fn log(entry: NewAuditLog) {
    match get_audit_logger() {
        Some(logger) => {
            logger.log(entry);
        }
        None => {
            warn!(
                "Audit logger not initialized, entry dropped: {}",
                entry.action
            );
        }
    }
}
}

brokkr-broker::utils::audit::try_log

pub

#![allow(unused)]
fn main() {
fn try_log (entry : NewAuditLog) -> bool
}

Tries to log an audit entry without blocking.

Parameters:

| Name | Type | Description |
| --- | --- | --- |
| entry | - | The audit log entry to record. |

Returns:

true if logged, false if the channel is full or closed, or if the logger is not initialized.

Source
#![allow(unused)]
fn main() {
/// Forwards an audit entry to the global audit logger's `try_log` method.
///
/// # Parameters
/// * `entry` - The audit log entry to record.
///
/// # Returns
/// The result of the logger's `try_log`, or `false` (after logging a warning that names
/// the entry's action) when no global logger is available.
pub fn try_log(entry: NewAuditLog) -> bool {
    match get_audit_logger() {
        Some(logger) => logger.try_log(entry),
        None => {
            warn!(
                "Audit logger not initialized, entry dropped: {}",
                entry.action
            );
            false
        }
    }
}
}

brokkr-broker::utils::audit::start_audit_writer

private

#![allow(unused)]
fn main() {
fn start_audit_writer (dal : DAL , mut receiver : mpsc :: Receiver < NewAuditLog > , batch_size : usize , flush_interval_ms : u64 ,)
}

Starts the background audit writer task.

This task receives audit entries from the channel and writes them to the database in batches for efficiency.

Source
#![allow(unused)]
fn main() {
/// Spawns the background task that drains the audit channel into the database.
///
/// Entries are collected in a buffer which is flushed when it reaches `batch_size`, on
/// every tick of the flush interval, and one final time when the channel closes. After
/// that final flush the task exits.
///
/// # Parameters
/// * `dal` - The Data Access Layer passed to `flush_buffer`.
/// * `receiver` - Receiving half of the audit channel.
/// * `batch_size` - Buffer length that triggers an immediate flush.
/// * `flush_interval_ms` - Period of the timed flush in milliseconds; values below 1 are
///   treated as 1.
///
/// NOTE(review): `flush_buffer` is a plain (non-async) call made from inside the task;
/// if the DAL call blocks, it blocks this task's worker thread - confirm this is acceptable.
fn start_audit_writer(
    dal: DAL,
    mut receiver: mpsc::Receiver<NewAuditLog>,
    batch_size: usize,
    flush_interval_ms: u64,
) {
    tokio::spawn(async move {
        info!("Audit writer started");

        let mut buffer: Vec<NewAuditLog> = Vec::with_capacity(batch_size);
        // A zero period makes `interval` panic, which would silently kill this task and
        // close the channel; clamp to the smallest usable period instead.
        let mut ticker = interval(Duration::from_millis(flush_interval_ms.max(1)));

        loop {
            tokio::select! {
                // Match on the `Option` explicitly. A `Some(..) = recv()` pattern only
                // disables that branch on `None`; because the ticker branch always stays
                // enabled, an `else` arm would never run and the task would never stop.
                received = receiver.recv() => {
                    match received {
                        Some(entry) => {
                            buffer.push(entry);

                            // Flush if buffer is full
                            if buffer.len() >= batch_size {
                                flush_buffer(&dal, &mut buffer);
                            }
                        }
                        None => {
                            // Channel closed: final flush before shutdown
                            if !buffer.is_empty() {
                                flush_buffer(&dal, &mut buffer);
                            }
                            warn!("Audit writer stopped - channel closed");
                            break;
                        }
                    }
                }

                // Periodic flush
                _ = ticker.tick() => {
                    if !buffer.is_empty() {
                        flush_buffer(&dal, &mut buffer);
                    }
                }
            }
        }
    });
}
}

brokkr-broker::utils::audit::flush_buffer

private

#![allow(unused)]
fn main() {
fn flush_buffer (dal : & DAL , buffer : & mut Vec < NewAuditLog >)
}

Flushes the buffer to the database.

Source
#![allow(unused)]
fn main() {
/// Writes the buffered audit entries to the database in one batch and empties the buffer.
///
/// Does nothing when the buffer is empty. The buffer is cleared whether or not the write
/// succeeds: a failed batch is not retried, and each entry in it is reported in the error
/// log as lost.
///
/// # Parameters
/// * `dal` - The Data Access Layer used for the batch insert.
/// * `buffer` - Pending entries; always empty when this function returns.
fn flush_buffer(dal: &DAL, buffer: &mut Vec<NewAuditLog>) {
    if buffer.is_empty() {
        return;
    }

    // Taken before the write so the failure message can report the batch size.
    let pending = buffer.len();

    match dal.audit_logs().create_batch(buffer) {
        Err(err) => {
            error!(
                "Failed to flush {} audit entries to database: {:?}",
                pending, err
            );
            // The batch is discarded below, so record which entries were lost.
            buffer.iter().for_each(|lost| {
                error!(
                    "Lost audit entry: {} ({})",
                    lost.action, lost.resource_type
                );
            });
        }
        Ok(written) => {
            debug!("Flushed {} audit entries to database", written);
        }
    }

    // Cleared unconditionally so a failing batch cannot be retried forever.
    buffer.clear();
}
}

brokkr-broker::utils::audit::log_action

pub

#![allow(unused)]
fn main() {
fn log_action (actor_type : & str , actor_id : Option < uuid :: Uuid > , action : & str , resource_type : & str , resource_id : Option < uuid :: Uuid > , details : Option < serde_json :: Value > , ip_address : Option < String > , user_agent : Option < String > ,)
}

Helper to create and log an audit entry in one call.

Parameters:

| Name | Type | Description |
| --- | --- | --- |
| actor_type | - | Type of actor (admin, agent, generator, system). |
| actor_id | - | ID of the actor. |
| action | - | The action performed. |
| resource_type | - | Type of resource affected. |
| resource_id | - | ID of the affected resource. |
| details | - | Optional additional details. |
| ip_address | - | Optional client IP address. |
| user_agent | - | Optional client user agent. |
Source
#![allow(unused)]
fn main() {
/// Builds an audit entry from its parts and passes it to `log`.
///
/// If `NewAuditLog::new` rejects the arguments, the error is logged and nothing is queued.
///
/// # Parameters
/// * `actor_type` - Type of actor (admin, agent, generator, system).
/// * `actor_id` - ID of the actor.
/// * `action` - The action performed.
/// * `resource_type` - Type of resource affected.
/// * `resource_id` - ID of the affected resource.
/// * `details` - Optional additional details.
/// * `ip_address` - Optional client IP address.
/// * `user_agent` - Optional client user agent.
pub fn log_action(
    actor_type: &str,
    actor_id: Option<uuid::Uuid>,
    action: &str,
    resource_type: &str,
    resource_id: Option<uuid::Uuid>,
    details: Option<serde_json::Value>,
    ip_address: Option<String>,
    user_agent: Option<String>,
) {
    let mut entry = match NewAuditLog::new(actor_type, actor_id, action, resource_type, resource_id)
    {
        Ok(created) => created,
        Err(e) => {
            error!("Failed to create audit entry: {}", e);
            return;
        }
    };

    // Attach only the optional attributes that were actually supplied.
    if let Some(extra) = details {
        entry = entry.with_details(extra);
    }
    if let Some(addr) = ip_address {
        entry = entry.with_ip_address(addr);
    }
    if let Some(agent) = user_agent {
        entry = entry.with_user_agent(agent);
    }

    log(entry);
}
}