brokkr-broker::utils::audit Rust
Async Audit Logger for Brokkr.
This module provides an asynchronous audit logging service that buffers audit entries and writes them to the database in batches to avoid impacting request latency.
Structs
brokkr-broker::utils::audit::AuditLoggerConfig
pub
Derives: Debug, Clone
Configuration for the audit logger.
Fields
| Name | Type | Description |
|---|---|---|
| channel_size | usize | Channel buffer size. |
| batch_size | usize | Maximum batch size for writes. |
| flush_interval_ms | u64 | Flush interval in milliseconds. |
brokkr-broker::utils::audit::AuditLogger
pub
Derives: Clone
The async audit logger for buffering and batching audit entries.
Fields
| Name | Type | Description |
|---|---|---|
| sender | `mpsc::Sender<NewAuditLog>` | Sender for emitting audit entries. |
Methods
new pub
#![allow(unused)]
fn main() {
fn new (dal : DAL) -> Self
}
Creates a new audit logger and starts the background writer.
Parameters:
| Name | Type | Description |
|---|---|---|
| dal | DAL | The Data Access Layer for database operations. |
Returns:
An AuditLogger instance.
Source
#![allow(unused)]
fn main() {
/// Builds an audit logger from [`AuditLoggerConfig::default`].
///
/// Delegates to [`AuditLogger::with_config`]; see that constructor for what is
/// set up.
pub fn new(dal: DAL) -> Self {
    let default_config = AuditLoggerConfig::default();
    Self::with_config(dal, default_config)
}
}
with_config pub
#![allow(unused)]
fn main() {
fn with_config (dal : DAL , config : AuditLoggerConfig) -> Self
}
Creates a new audit logger with custom configuration.
Parameters:
| Name | Type | Description |
|---|---|---|
| dal | DAL | The Data Access Layer for database operations. |
| config | AuditLoggerConfig | The logger configuration. |
Returns:
An AuditLogger instance.
Source
#![allow(unused)]
fn main() {
pub fn with_config(dal: DAL, config: AuditLoggerConfig) -> Self {
let (sender, receiver) = mpsc::channel(config.channel_size);
// Start the background writer task
start_audit_writer(dal, receiver, config.batch_size, config.flush_interval_ms);
info!(
"Audit logger started (buffer: {}, batch: {}, flush: {}ms)",
config.channel_size, config.batch_size, config.flush_interval_ms
);
Self { sender }
}
}
log pub
#![allow(unused)]
fn main() {
fn log (& self , entry : NewAuditLog)
}
Logs an audit entry asynchronously (non-blocking).
If the channel is full, the entry waits in a spawned background task until capacity frees up; it is dropped (and an error logged) only if the channel is closed.
Parameters:
| Name | Type | Description |
|---|---|---|
| entry | NewAuditLog | The audit log entry to record. |
Source
#![allow(unused)]
fn main() {
pub fn log(&self, entry: NewAuditLog) {
let sender = self.sender.clone();
let action = entry.action.clone();
tokio::spawn(async move {
match sender.send(entry).await {
Ok(_) => {
debug!("Audit entry queued: {}", action);
}
Err(e) => {
error!(
"Failed to queue audit entry (action: {}): channel full or closed - {}",
action, e
);
}
}
});
}
}
log_async pub
async
#![allow(unused)]
fn main() {
async fn log_async (& self , entry : NewAuditLog ,) -> Result < () , mpsc :: error :: SendError < NewAuditLog > >
}
Logs an audit entry, waiting for it to be accepted.
Parameters:
| Name | Type | Description |
|---|---|---|
| entry | NewAuditLog | The audit log entry to record. |
Returns:
Ok if the entry was accepted, Err if the channel is closed.
Source
#![allow(unused)]
fn main() {
pub async fn log_async(
&self,
entry: NewAuditLog,
) -> Result<(), mpsc::error::SendError<NewAuditLog>> {
let action = entry.action.clone();
self.sender.send(entry).await.map_err(|e| {
error!("Failed to queue audit entry (action: {}): {}", action, e);
e
})?;
debug!("Audit entry queued (async): {}", action);
Ok(())
}
}
try_log pub
#![allow(unused)]
fn main() {
fn try_log (& self , entry : NewAuditLog) -> bool
}
Tries to log an audit entry without blocking.
Parameters:
| Name | Type | Description |
|---|---|---|
| entry | NewAuditLog | The audit log entry to record. |
Returns:
true if the entry was queued, false if the channel is full or closed.
Source
#![allow(unused)]
fn main() {
pub fn try_log(&self, entry: NewAuditLog) -> bool {
match self.sender.try_send(entry) {
Ok(_) => true,
Err(mpsc::error::TrySendError::Full(_)) => {
warn!("Audit log channel full, entry dropped");
false
}
Err(mpsc::error::TrySendError::Closed(_)) => {
error!("Audit log channel closed");
false
}
}
}
}
Functions
brokkr-broker::utils::audit::init_audit_logger
pub
#![allow(unused)]
fn main() {
fn init_audit_logger (dal : DAL) -> Result < () , String >
}
Initializes the global audit logger.
This should be called once during broker startup.
Parameters:
| Name | Type | Description |
|---|---|---|
| dal | DAL | The Data Access Layer for database operations. |
Returns:
Ok(()) if initialization succeeded, Err if already initialized.
Source
#![allow(unused)]
fn main() {
/// Initializes the global audit logger using [`AuditLoggerConfig::default`].
///
/// Delegates to [`init_audit_logger_with_config`] and returns its result
/// unchanged.
pub fn init_audit_logger(dal: DAL) -> Result<(), String> {
    let default_config = AuditLoggerConfig::default();
    init_audit_logger_with_config(dal, default_config)
}
}
brokkr-broker::utils::audit::init_audit_logger_with_config
pub
#![allow(unused)]
fn main() {
fn init_audit_logger_with_config (dal : DAL , config : AuditLoggerConfig) -> Result < () , String >
}
Initializes the global audit logger with custom configuration.
Parameters:
| Name | Type | Description |
|---|---|---|
| dal | DAL | The Data Access Layer for database operations. |
| config | AuditLoggerConfig | The logger configuration. |
Returns:
Ok(()) if initialization succeeded, Err if already initialized.
Source
#![allow(unused)]
fn main() {
/// Initializes the global audit logger with a custom configuration.
///
/// # Parameters
/// * `dal` - The Data Access Layer for database operations.
/// * `config` - The logger configuration.
///
/// # Errors
/// Returns `Err("Audit logger already initialized")` when the global logger
/// has already been set.
pub fn init_audit_logger_with_config(dal: DAL, config: AuditLoggerConfig) -> Result<(), String> {
    // Building an `AuditLogger` starts a background writer, so bail out early
    // instead of creating one that would be discarded straight away.
    if AUDIT_LOGGER.get().is_some() {
        return Err("Audit logger already initialized".to_string());
    }
    let logger = AuditLogger::with_config(dal, config);
    // `set` remains the authoritative check in case another caller won the
    // race between the test above and this point.
    AUDIT_LOGGER
        .set(Arc::new(logger))
        .map_err(|_| "Audit logger already initialized".to_string())
}
}
brokkr-broker::utils::audit::get_audit_logger
pub
#![allow(unused)]
fn main() {
fn get_audit_logger () -> Option < Arc < AuditLogger > >
}
Gets the global audit logger.
Returns:
The audit logger, or None if not initialized.
Source
#![allow(unused)]
fn main() {
/// Returns a shared handle to the global audit logger.
///
/// Yields `None` when the global has not been set.
pub fn get_audit_logger() -> Option<Arc<AuditLogger>> {
    // Cloning the `Arc` only bumps the reference count.
    AUDIT_LOGGER.get().map(Arc::clone)
}
}
brokkr-broker::utils::audit::log
pub
#![allow(unused)]
fn main() {
fn log (entry : NewAuditLog)
}
Logs an audit entry to the global audit logger.
This is a convenience function for logging without needing to get the logger directly.
Parameters:
| Name | Type | Description |
|---|---|---|
| entry | NewAuditLog | The audit log entry to record. |
Source
#![allow(unused)]
fn main() {
/// Forwards an audit entry to the global audit logger.
///
/// When no global logger has been set, the entry is discarded and a warning
/// naming its action is emitted.
pub fn log(entry: NewAuditLog) {
    match get_audit_logger() {
        Some(logger) => {
            logger.log(entry);
        }
        None => {
            warn!(
                "Audit logger not initialized, entry dropped: {}",
                entry.action
            );
        }
    }
}
}
brokkr-broker::utils::audit::try_log
pub
#![allow(unused)]
fn main() {
fn try_log (entry : NewAuditLog) -> bool
}
Tries to log an audit entry without blocking.
Parameters:
| Name | Type | Description |
|---|---|---|
| entry | NewAuditLog | The audit log entry to record. |
Returns:
true if logged, false if the channel is full or closed, or if the logger is not initialized.
Source
#![allow(unused)]
fn main() {
/// Offers an audit entry to the global audit logger without waiting.
///
/// # Returns
/// The result of the logger's own `try_log`, or `false` (with a warning) when
/// no global logger has been set.
pub fn try_log(entry: NewAuditLog) -> bool {
    match get_audit_logger() {
        Some(logger) => logger.try_log(entry),
        None => {
            warn!(
                "Audit logger not initialized, entry dropped: {}",
                entry.action
            );
            false
        }
    }
}
}
brokkr-broker::utils::audit::start_audit_writer
private
#![allow(unused)]
fn main() {
fn start_audit_writer (dal : DAL , mut receiver : mpsc :: Receiver < NewAuditLog > , batch_size : usize , flush_interval_ms : u64 ,)
}
Starts the background audit writer task.
This task receives audit entries from the channel and writes them to the database in batches for efficiency.
Source
#![allow(unused)]
fn main() {
/// Spawns the background task that drains the audit channel into the database.
///
/// Entries are collected in a buffer and flushed when the buffer reaches
/// `batch_size` or when the flush ticker fires, whichever comes first. Once
/// every sender is gone and the channel is drained, the task performs a final
/// flush and exits.
///
/// # Parameters
/// * `dal` - Data Access Layer used by `flush_buffer`.
/// * `receiver` - Receiving half of the audit channel.
/// * `batch_size` - Buffer length that triggers an immediate flush.
/// * `flush_interval_ms` - Period of the flush ticker in milliseconds; `0` is
///   treated as `1` because `tokio::time::interval` panics on a zero period.
fn start_audit_writer(
    dal: DAL,
    mut receiver: mpsc::Receiver<NewAuditLog>,
    batch_size: usize,
    flush_interval_ms: u64,
) {
    tokio::spawn(async move {
        info!("Audit writer started");
        let mut buffer: Vec<NewAuditLog> = Vec::with_capacity(batch_size);
        // Guard against a zero period, which would panic inside this task and
        // silently stop the writer.
        let mut ticker = interval(Duration::from_millis(flush_interval_ms.max(1)));
        loop {
            tokio::select! {
                // The `None` case must be handled here. A `Some(..)` pattern
                // would merely disable this branch, and because the ticker
                // branch always matches, an `else` arm would never run and the
                // task would spin on the ticker forever after the channel closed.
                received = receiver.recv() => {
                    match received {
                        Some(entry) => {
                            buffer.push(entry);
                            // Flush if buffer is full
                            if buffer.len() >= batch_size {
                                flush_buffer(&dal, &mut buffer);
                            }
                        }
                        None => {
                            // Channel closed: final flush before shutdown
                            if !buffer.is_empty() {
                                flush_buffer(&dal, &mut buffer);
                            }
                            warn!("Audit writer stopped - channel closed");
                            break;
                        }
                    }
                }
                // Periodic flush
                _ = ticker.tick() => {
                    if !buffer.is_empty() {
                        flush_buffer(&dal, &mut buffer);
                    }
                }
            }
        }
    });
}
}
brokkr-broker::utils::audit::flush_buffer
private
#![allow(unused)]
fn main() {
fn flush_buffer (dal : & DAL , buffer : & mut Vec < NewAuditLog >)
}
Flushes the buffer to the database.
Source
#![allow(unused)]
fn main() {
/// Writes the buffered audit entries to the database in one batch.
///
/// The buffer is always emptied afterwards. A failed write is not retried:
/// the failure is logged, followed by one "Lost audit entry" line per entry,
/// and the entries are discarded.
fn flush_buffer(dal: &DAL, buffer: &mut Vec<NewAuditLog>) {
    let pending = buffer.len();
    if pending == 0 {
        return;
    }
    match dal.audit_logs().create_batch(buffer) {
        Err(e) => {
            error!(
                "Failed to flush {} audit entries to database: {:?}",
                pending, e
            );
            // Record what is being discarded so the loss can be traced.
            buffer.iter().for_each(|lost| {
                error!(
                    "Lost audit entry: {} ({})",
                    lost.action, lost.resource_type
                );
            });
        }
        Ok(inserted) => {
            debug!("Flushed {} audit entries to database", inserted);
        }
    }
    // Cleared on success and on failure alike.
    buffer.clear();
}
}
brokkr-broker::utils::audit::log_action
pub
#![allow(unused)]
fn main() {
fn log_action (actor_type : & str , actor_id : Option < uuid :: Uuid > , action : & str , resource_type : & str , resource_id : Option < uuid :: Uuid > , details : Option < serde_json :: Value > , ip_address : Option < String > , user_agent : Option < String > ,)
}
Helper to create and log an audit entry in one call.
Parameters:
| Name | Type | Description |
|---|---|---|
| actor_type | &str | Type of actor (admin, agent, generator, system). |
| actor_id | `Option<uuid::Uuid>` | ID of the actor. |
| action | &str | The action performed. |
| resource_type | &str | Type of resource affected. |
| resource_id | `Option<uuid::Uuid>` | ID of the affected resource. |
| details | `Option<serde_json::Value>` | Optional additional details. |
| ip_address | `Option<String>` | Optional client IP address. |
| user_agent | `Option<String>` | Optional client user agent. |
Source
#![allow(unused)]
fn main() {
/// Builds an audit entry from its parts and hands it to the global `log`.
///
/// The optional `details`, `ip_address` and `user_agent` are attached only
/// when present. If `NewAuditLog::new` rejects the inputs, the error is logged
/// and nothing is queued.
pub fn log_action(
    actor_type: &str,
    actor_id: Option<uuid::Uuid>,
    action: &str,
    resource_type: &str,
    resource_id: Option<uuid::Uuid>,
    details: Option<serde_json::Value>,
    ip_address: Option<String>,
    user_agent: Option<String>,
) {
    let entry = match NewAuditLog::new(actor_type, actor_id, action, resource_type, resource_id) {
        Ok(created) => created,
        Err(e) => {
            error!("Failed to create audit entry: {}", e);
            return;
        }
    };
    // Attach each optional attribute in turn, leaving the entry untouched
    // when the attribute is absent.
    let entry = match details {
        Some(d) => entry.with_details(d),
        None => entry,
    };
    let entry = match ip_address {
        Some(ip) => entry.with_ip_address(ip),
        None => entry,
    };
    let entry = match user_agent {
        Some(ua) => entry.with_user_agent(ua),
        None => entry,
    };
    log(entry);
}
}