Audit Logging Tutorial for HeliosDB-Lite¶
Version: 1.0
Last Updated: 2025-12-01
Target Audience: Developers, DevOps Engineers, Compliance Teams
Estimated Time: 45-60 minutes
Table of Contents¶
- Introduction
- Configuration
- Event Types
- Querying Audit Logs
- Log Management
- Compliance Scenarios
- Integration
- Tamper Detection
1. Introduction¶
What is Audit Logging?¶
Audit logging is a comprehensive tracking mechanism that records all significant database operations, providing a tamper-evident trail of who did what, when, and whether it succeeded. In HeliosDB-Lite, audit logging is a built-in, high-performance feature that operates with sub-millisecond overhead.
Key characteristics of HeliosDB-Lite audit logging:
- Tamper-Evident: Append-only logs with cryptographic SHA-256 checksums that expose any modification
- High-Performance: Asynchronous buffered logging (<0.1ms overhead)
- Configurable: Fine-grained control over what operations are logged
- Queryable: Standard SQL interface for audit log analysis
- Embedded: Zero external dependencies or infrastructure
Compliance Requirements¶
Audit logging is critical for meeting regulatory compliance requirements:
| Regulation | Requirements | HeliosDB-Lite Support |
|---|---|---|
| SOC2 | Track who, what, when, and result of all data access | ✅ User, operation, timestamp, success tracking |
| HIPAA | 7-year retention of all PHI access and modifications | ✅ Configurable retention up to 7+ years |
| GDPR | Log all data processing activities, right to erasure | ✅ Comprehensive DML logging with metadata |
| PCI-DSS | Audit trail for all access to cardholder data | ✅ Tamper-proof logs with cryptographic integrity |
| FDA 21 CFR Part 11 | Electronic records with secure timestamps | ✅ Immutable timestamps with checksums |
Audit Trail Importance¶
Audit trails serve multiple critical business functions:
- Compliance: Meet regulatory requirements and pass audits
- Security: Detect and investigate unauthorized access or data breaches
- Forensics: Reconstruct events leading to data corruption or loss
- Debugging: Trace application behavior and identify root causes
- Accountability: Create a record of who made changes to sensitive data
- Legal Protection: Provide evidence for legal proceedings or disputes
Real-world example: A healthcare provider using HeliosDB-Lite reduced compliance audit preparation time from 40 hours to 8 hours by leveraging SQL-queryable audit logs with built-in tamper detection.
2. Configuration¶
Enabling Audit Logging¶
Audit logging can be enabled through configuration files or programmatically in your application code.
TOML Configuration File¶
Create a configuration file heliosdb.toml:
[database]
path = "/var/lib/heliosdb/myapp.db"
memory_limit_mb = 512
enable_wal = true
[audit]
enabled = true
log_ddl = true
log_dml = true
log_select = false
log_transactions = false
log_auth = true
retention_days = 90
async_buffer_size = 100
enable_checksums = true
max_query_length = 10000
[audit.capture_metadata]
capture_client_ip = true
capture_application_name = true
capture_database_name = true
capture_execution_time = true
capture_custom_fields = false
Programmatic Configuration (Rust)¶
use heliosdb_lite::{EmbeddedDatabase, Config};
use heliosdb_lite::audit::{AuditLogger, AuditConfig, MetadataCapture};
use std::sync::Arc;
fn setup_audit_logging() -> Result<AuditLogger, Box<dyn std::error::Error>> {
// Create database (not used directly below; storage is opened separately)
let config = Config::in_memory();
let _db = EmbeddedDatabase::new_in_memory()?;
// Get storage reference
let storage = Arc::new(
heliosdb_lite::storage::StorageEngine::open_in_memory(&config)?
);
// Custom audit configuration
let audit_config = AuditConfig {
enabled: true,
log_ddl: true,
log_dml: true,
log_select: false,
log_transactions: true,
log_auth: true,
retention_days: 90,
async_buffer_size: 100,
enable_checksums: true,
max_query_length: 10000,
capture_metadata: MetadataCapture {
capture_client_ip: true,
capture_application_name: true,
capture_database_name: true,
capture_execution_time: true,
capture_custom_fields: false,
},
};
// Initialize audit logger
let mut logger = AuditLogger::new(storage, audit_config)?;
// Set session context
logger.set_user("admin".to_string());
logger.set_session_id(uuid::Uuid::new_v4().to_string());
Ok(logger)
}
Log Destinations¶
HeliosDB-Lite stores audit logs in an internal table and supports multiple output formats:
Internal Table Storage (Default)¶
All audit events are stored in the __audit_log system table; the full schema is shown in section 4, Querying Audit Logs.
The __audit_log table is automatically created when audit logging is initialized.
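A quick sanity check that events are actually being recorded:

```sql
-- Confirm audit events are being written
SELECT COUNT(*) FROM __audit_log;
```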
File-Based Log Export¶
You can export audit logs to files for archival or external processing:
use heliosdb_lite::audit::AuditLogger;
use std::fs::File;
use std::io::Write;
fn export_audit_logs(logger: &AuditLogger) -> Result<(), Box<dyn std::error::Error>> {
// Query all audit events
let events = logger.query_audit_log("")?;
// Write to JSON file
let mut file = File::create("/var/log/heliosdb/audit.json")?;
for event in events {
// Convert tuple to JSON (pseudo-code)
let json = format_event_as_json(&event);
writeln!(file, "{}", json)?;
}
Ok(())
}
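The format_event_as_json helper above is left as pseudo-code. A minimal std-only sketch is shown below; the signature and field subset are illustrative (not part of the HeliosDB-Lite API), and only backslashes and quotes are escaped:

```rust
// Illustrative JSON-lines formatter for an audit event (std only).
// Field names mirror the __audit_log schema; the signature is hypothetical.
fn json_escape(s: &str) -> String {
    s.replace('\\', "\\\\").replace('"', "\\\"")
}

fn format_event_as_json(
    id: u64,
    timestamp: &str,
    user: &str,
    operation: &str,
    success: bool,
) -> String {
    format!(
        "{{\"id\":{},\"timestamp\":\"{}\",\"user\":\"{}\",\"operation\":\"{}\",\"success\":{}}}",
        id,
        json_escape(timestamp),
        json_escape(user),
        json_escape(operation),
        success
    )
}
```

In a real exporter you would serialize the full event with a proper JSON library; this sketch only shows the one-object-per-line shape the export loop expects.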
Database Table Export¶
Export audit logs to a separate archive table:
-- Create archive table
CREATE TABLE audit_log_archive (
id INT8 PRIMARY KEY,
timestamp TIMESTAMP,
session_id TEXT,
user TEXT,
operation TEXT,
target TEXT,
query TEXT,
affected_rows INT8,
success BOOLEAN,
error TEXT,
checksum TEXT
);
-- Copy old audit logs to archive
INSERT INTO audit_log_archive
SELECT * FROM __audit_log
WHERE timestamp < NOW() - INTERVAL '90 days';
Log Format Options¶
JSON Format¶
Export audit events as JSON for integration with external systems:
{
"id": 12345,
"timestamp": "2025-12-01T10:30:45.123Z",
"session_id": "550e8400-e29b-41d4-a716-446655440000",
"user": "admin",
"operation": "INSERT",
"target": "users",
"query": "INSERT INTO users (id, name, email) VALUES (1, 'Alice', 'alice@example.com')",
"affected_rows": 1,
"success": true,
"error": null,
"checksum": "a8f5f167f44f4964e6c998dee827110c062df690bcaf8e94e0c0e67c03c3e2fa"
}
CSV Format¶
Export audit events as CSV for spreadsheet analysis:
id,timestamp,user,operation,target,affected_rows,success
12345,2025-12-01T10:30:45.123Z,admin,INSERT,users,1,true
12346,2025-12-01T10:31:12.456Z,admin,UPDATE,users,1,true
12347,2025-12-01T10:32:05.789Z,admin,DELETE,users,1,true
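Rows in this format can be read back with plain std Rust. The sketch below is illustrative: it splits naively on commas, which is safe only because this export format omits the free-text query column:

```rust
// Parse one exported CSV audit row (illustrative, std only).
// Naive comma split: valid only because this export has no quoted or free-text fields.
#[derive(Debug)]
struct CsvAuditRow {
    id: u64,
    timestamp: String,
    user: String,
    operation: String,
    target: String,
    affected_rows: u64,
    success: bool,
}

fn parse_csv_row(line: &str) -> Option<CsvAuditRow> {
    let f: Vec<&str> = line.split(',').collect();
    if f.len() != 7 {
        return None;
    }
    Some(CsvAuditRow {
        id: f[0].parse().ok()?,
        timestamp: f[1].to_string(),
        user: f[2].to_string(),
        operation: f[3].to_string(),
        target: f[4].to_string(),
        affected_rows: f[5].parse().ok()?,
        success: f[6].parse().ok()?,
    })
}
```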
Configuration Presets¶
HeliosDB-Lite provides pre-configured audit profiles for common use cases:
Default Configuration¶
Balanced configuration for most applications:
let config = AuditConfig::default();
// Logs: DDL, DML (not SELECT), Auth
// Retention: 90 days
// Checksums: Enabled
Minimal Configuration¶
Lightweight configuration for low-overhead environments:
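Assuming the constructor follows the same naming pattern as the other presets (AuditConfig::minimal() is inferred from that pattern, not confirmed), with the settings from the comparison table below:

```rust
let config = AuditConfig::minimal();
// Logs: DDL only
// Retention: 30 days
// Checksums: Disabled
```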
Verbose Configuration¶
Comprehensive logging for debugging and forensics:
let config = AuditConfig::verbose();
// Logs: DDL, DML, SELECT, Transactions, Auth
// Retention: 365 days
// Checksums: Enabled
Compliance Configuration¶
Strict configuration for regulatory compliance:
let config = AuditConfig::compliance();
// Logs: DDL, DML, Transactions, Auth
// Retention: 2555 days (7 years)
// Checksums: Enabled (mandatory)
Configuration Comparison Table:
| Feature | Minimal | Default | Verbose | Compliance |
|---|---|---|---|---|
| DDL Logging | ✅ | ✅ | ✅ | ✅ |
| DML Logging | ❌ | ✅ | ✅ | ✅ |
| SELECT Logging | ❌ | ❌ | ✅ | ❌ |
| Transaction Logging | ❌ | ❌ | ✅ | ✅ |
| Auth Logging | ❌ | ✅ | ✅ | ✅ |
| Retention (days) | 30 | 90 | 365 | 2555 |
| Checksums | ❌ | ✅ | ✅ | ✅ |
| Overhead | <0.05ms | <0.1ms | <0.2ms | <0.1ms |
3. Event Types¶
HeliosDB-Lite audit logging tracks multiple categories of database operations.
DDL Events (Data Definition Language)¶
DDL events track changes to database schema:
| Operation | Description | Example |
|---|---|---|
| CREATE_TABLE | New table creation | CREATE TABLE users (id INT, name TEXT) |
| DROP_TABLE | Table deletion | DROP TABLE users |
| ALTER_TABLE | Table schema modification | ALTER TABLE users ADD COLUMN email TEXT |
| CREATE_INDEX | Index creation | CREATE INDEX idx_users_name ON users(name) |
| DROP_INDEX | Index deletion | DROP INDEX idx_users_name |
Example: Logging DDL Operations
use heliosdb_lite::audit::AuditLogger;
fn log_schema_change(logger: &AuditLogger) -> Result<(), Box<dyn std::error::Error>> {
// Log successful table creation
logger.log_ddl(
"CREATE TABLE",
"users",
"CREATE TABLE users (id INT PRIMARY KEY, name TEXT, email TEXT)",
true, // success
None, // no error
)?;
// Log failed index creation
logger.log_ddl(
"CREATE INDEX",
"users",
"CREATE INDEX idx_invalid ON nonexistent_table(name)",
false, // failed
Some("Table does not exist"),
)?;
Ok(())
}
DML Events (Data Manipulation Language)¶
DML events track data modifications:
| Operation | Description | Example |
|---|---|---|
| INSERT | New row insertion | INSERT INTO users VALUES (1, 'Alice') |
| UPDATE | Row modification | UPDATE users SET name='Bob' WHERE id=1 |
| DELETE | Row deletion | DELETE FROM users WHERE id=1 |
Example: Logging DML Operations
fn log_data_changes(logger: &AuditLogger) -> Result<(), Box<dyn std::error::Error>> {
// Log INSERT operation
logger.log_dml(
"INSERT",
"users",
"INSERT INTO users (id, name, email) VALUES (1, 'Alice', 'alice@example.com')",
1, // affected rows
true, // success
None, // no error
)?;
// Log UPDATE operation
logger.log_dml(
"UPDATE",
"users",
"UPDATE users SET email='alice.smith@example.com' WHERE id=1",
1, // affected rows
true, // success
None,
)?;
// Log DELETE operation
logger.log_dml(
"DELETE",
"users",
"DELETE FROM users WHERE id=1",
1, // affected rows
true, // success
None,
)?;
Ok(())
}
Security Events¶
Security events track authentication and authorization:
| Operation | Description | Example |
|---|---|---|
| LOGIN | User authentication | User login successful |
| LOGOUT | User session termination | User logout |
| GRANT | Permission granted | GRANT SELECT ON users TO analyst |
| REVOKE | Permission revoked | REVOKE DELETE ON users FROM analyst |
Example: Logging Security Events
fn log_auth_events(logger: &AuditLogger) -> Result<(), Box<dyn std::error::Error>> {
use heliosdb_lite::audit::OperationType;
use heliosdb_lite::audit::AuditMetadata;
// Log successful login
logger.log_operation(
OperationType::Login,
None, // no specific target
"User 'admin' logged in from 192.168.1.100",
0, // no rows affected
true, // success
None, // no error
AuditMetadata::default(),
)?;
// Log failed login attempt
logger.log_operation(
OperationType::Login,
None,
"User 'hacker' failed login attempt",
0,
false, // failed
Some("Invalid credentials"),
AuditMetadata::default(),
)?;
Ok(())
}
Query Events (SELECT)¶
Query events track data access (disabled by default due to verbosity):
Example: Logging SELECT Queries
fn log_query(logger: &AuditLogger) -> Result<(), Box<dyn std::error::Error>> {
// Enable SELECT logging first in config
logger.log_select(
"users",
"SELECT * FROM users WHERE email LIKE '%@example.com'",
15, // row count
Some(12), // execution time in milliseconds
)?;
Ok(())
}
Note: SELECT logging is disabled by default because it generates high log volume. Enable only when required for compliance or debugging.
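When compliance does require query-level tracking, flip the flag in the [audit] section of heliosdb.toml:

```toml
[audit]
log_select = true   # Expect significantly higher log volume
```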
Transaction Events¶
Transaction events track transaction boundaries:
| Operation | Description | Example |
|---|---|---|
| BEGIN | Transaction start | BEGIN TRANSACTION |
| COMMIT | Transaction commit | COMMIT |
| ROLLBACK | Transaction abort | ROLLBACK |
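Once transaction logging is enabled, these events can be pulled from the audit table like any other operation type:

```sql
-- Recent transaction boundaries
SELECT timestamp, user, operation, session_id
FROM __audit_log
WHERE operation IN ('BEGIN', 'COMMIT', 'ROLLBACK')
ORDER BY timestamp DESC;
```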
4. Querying Audit Logs¶
The __audit_log Table¶
All audit events are stored in the __audit_log system table with the following schema:
CREATE TABLE __audit_log (
id INT8 PRIMARY KEY, -- Unique event ID
timestamp TIMESTAMP, -- When operation occurred
session_id TEXT, -- Session identifier
user TEXT, -- User who performed operation
operation TEXT, -- Operation type (INSERT, UPDATE, etc.)
target TEXT, -- Target object (table name, etc.)
query TEXT, -- Full SQL query
affected_rows INT8, -- Number of rows affected
success BOOLEAN, -- Whether operation succeeded
error TEXT, -- Error message (if failed)
checksum TEXT -- SHA-256 tamper detection checksum
);
Basic Queries¶
View Recent Audit Events¶
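```sql
-- Most recent 50 audit events
SELECT * FROM __audit_log
ORDER BY timestamp DESC
LIMIT 50;
```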
View All Operations by a Specific User¶
-- Get all operations by user 'admin'
SELECT
timestamp,
operation,
target,
affected_rows,
success
FROM __audit_log
WHERE user = 'admin'
ORDER BY timestamp DESC;
View Failed Operations¶
-- Get all failed operations
SELECT
timestamp,
user,
operation,
target,
query,
error
FROM __audit_log
WHERE success = false
ORDER BY timestamp DESC;
Filtering by Event Type¶
-- Get all INSERT operations
SELECT * FROM __audit_log
WHERE operation = 'INSERT';
-- Get all DDL operations (schema changes)
SELECT * FROM __audit_log
WHERE operation IN ('CREATE_TABLE', 'DROP_TABLE', 'ALTER_TABLE', 'CREATE_INDEX', 'DROP_INDEX');
-- Get all DML operations (data changes)
SELECT * FROM __audit_log
WHERE operation IN ('INSERT', 'UPDATE', 'DELETE');
-- Get all authentication events
SELECT * FROM __audit_log
WHERE operation IN ('LOGIN', 'LOGOUT', 'GRANT', 'REVOKE');
Time-Based Queries¶
-- Events from the last 24 hours
SELECT * FROM __audit_log
WHERE timestamp >= NOW() - INTERVAL '24 hours'
ORDER BY timestamp DESC;
-- Events within a specific date range
SELECT * FROM __audit_log
WHERE timestamp >= '2025-12-01T00:00:00Z'
AND timestamp <= '2025-12-31T23:59:59Z'
ORDER BY timestamp;
-- Events grouped by hour
SELECT
DATE_TRUNC('hour', timestamp) as hour,
COUNT(*) as event_count,
SUM(CASE WHEN success THEN 1 ELSE 0 END) as successful,
SUM(CASE WHEN NOT success THEN 1 ELSE 0 END) as failed
FROM __audit_log
GROUP BY hour
ORDER BY hour DESC;
Advanced Queries¶
Most Active Users¶
SELECT
user,
COUNT(*) as total_operations,
SUM(CASE WHEN operation IN ('INSERT', 'UPDATE', 'DELETE') THEN 1 ELSE 0 END) as data_modifications,
SUM(CASE WHEN NOT success THEN 1 ELSE 0 END) as failed_operations
FROM __audit_log
GROUP BY user
ORDER BY total_operations DESC;
Most Modified Tables¶
SELECT
target as table_name,
COUNT(*) as modifications,
SUM(affected_rows) as total_rows_affected
FROM __audit_log
WHERE operation IN ('INSERT', 'UPDATE', 'DELETE')
AND target IS NOT NULL
GROUP BY target
ORDER BY modifications DESC;
Operations by Time of Day¶
SELECT
EXTRACT(HOUR FROM timestamp) as hour_of_day,
COUNT(*) as operation_count
FROM __audit_log
GROUP BY hour_of_day
ORDER BY hour_of_day;
Programmatic Query Interface (Rust)¶
use heliosdb_lite::audit::{AuditQuery, AuditFilter, OperationType};
fn query_audit_logs(logger: &AuditLogger) -> Result<(), Box<dyn std::error::Error>> {
// Build query using filter API
let query = AuditQuery::new()
.with_operation(OperationType::Insert)
.with_user("admin".to_string())
.with_success(true)
.limit(100)
.offset(0);
// Build SQL and execute
let sql = query.build_sql();
let tuples = logger.query_audit_log(&sql)?;
// Parse events
let events = AuditQuery::parse_events(tuples)?;
// Process results
for event in events {
println!("Event {}: {} on {} by {} at {}",
event.id,
event.operation,
event.target.unwrap_or_default(),
event.user,
event.timestamp
);
// Verify checksum
if !event.verify_checksum() {
eprintln!("WARNING: Event {} has invalid checksum!", event.id);
}
}
Ok(())
}
5. Log Management¶
Log Rotation¶
HeliosDB-Lite uses an append-only audit log model. For long-running systems, implement log rotation to manage storage:
Manual Rotation Script¶
#!/bin/bash
# rotate_audit_logs.sh - Daily audit log rotation script
DB_PATH="/var/lib/heliosdb/myapp.db"
ARCHIVE_DIR="/var/log/heliosdb/audit_archive"
RETENTION_DAYS=90
# Export old audit logs
heliosdb-cli -d "$DB_PATH" -e "
COPY (
SELECT * FROM __audit_log
WHERE timestamp < NOW() - INTERVAL '$RETENTION_DAYS days'
) TO '$ARCHIVE_DIR/audit_$(date +%Y%m%d).csv' WITH CSV HEADER;
"
# Delete old logs from active table
heliosdb-cli -d "$DB_PATH" -e "
DELETE FROM __audit_log
WHERE timestamp < NOW() - INTERVAL '$RETENTION_DAYS days';
"
echo "Audit log rotation complete: $(date)"
Programmatic Rotation (Rust)¶
use heliosdb_lite::EmbeddedDatabase;
use chrono::{Utc, Duration};
fn rotate_audit_logs(db: &EmbeddedDatabase, retention_days: i64)
-> Result<(), Box<dyn std::error::Error>>
{
let cutoff_date = Utc::now() - Duration::days(retention_days);
// Archive old logs to separate table
let archive_query = format!(
"INSERT INTO audit_log_archive \
SELECT * FROM __audit_log \
WHERE timestamp < '{}'",
cutoff_date.to_rfc3339()
);
db.execute(&archive_query)?;
// Delete archived logs from active table
let delete_query = format!(
"DELETE FROM __audit_log \
WHERE timestamp < '{}'",
cutoff_date.to_rfc3339()
);
db.execute(&delete_query)?;
println!("Rotated audit logs older than {} days", retention_days);
Ok(())
}
Archival Strategies¶
Compressed File Archive¶
use std::fs::File;
use std::io::Write;
use flate2::Compression;
use flate2::write::GzEncoder;
fn archive_to_compressed_file(
logger: &AuditLogger,
output_path: &str
) -> Result<(), Box<dyn std::error::Error>> {
// Query old audit events
let events = logger.query_audit_log(
"timestamp < NOW() - INTERVAL '90 days'"
)?;
// Create compressed file
let file = File::create(output_path)?;
let mut encoder = GzEncoder::new(file, Compression::default());
// Write events as JSON lines
for event in events {
let json = serde_json::to_string(&event)?;
writeln!(encoder, "{}", json)?;
}
encoder.finish()?;
Ok(())
}
Database Partition Archive¶
-- Create partitioned archive tables by year
CREATE TABLE audit_log_2024 (LIKE __audit_log);
CREATE TABLE audit_log_2025 (LIKE __audit_log);
-- Move 2024 logs to archive partition
INSERT INTO audit_log_2024
SELECT * FROM __audit_log
WHERE EXTRACT(YEAR FROM timestamp) = 2024;
-- Delete from active log
DELETE FROM __audit_log
WHERE EXTRACT(YEAR FROM timestamp) = 2024;
Cloud Storage Archive (AWS S3)¶
use rusoto_s3::{S3Client, PutObjectRequest, S3};
use rusoto_core::Region;
async fn archive_to_s3(
logger: &AuditLogger,
bucket: &str,
key: &str
) -> Result<(), Box<dyn std::error::Error>> {
let s3_client = S3Client::new(Region::UsEast1);
// Export audit logs to JSON
let events = logger.query_audit_log("")?;
let json_data = serde_json::to_vec(&events)?;
// Upload to S3
let put_request = PutObjectRequest {
bucket: bucket.to_string(),
key: key.to_string(),
body: Some(json_data.into()),
..Default::default()
};
s3_client.put_object(put_request).await?;
Ok(())
}
Retention Policies¶
Implement automated retention policies based on compliance requirements:
Policy Definition¶
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
struct RetentionPolicy {
// Active log retention
active_retention_days: u32,
// Archive retention (before permanent deletion)
archive_retention_days: u32,
// Operation-specific overrides
ddl_retention_days: Option<u32>,
security_event_retention_days: Option<u32>,
// Archive destination
archive_destination: ArchiveDestination,
}
#[derive(Debug, Serialize, Deserialize)]
enum ArchiveDestination {
LocalFile { path: String },
S3 { bucket: String, prefix: String },
Database { table: String },
}
Policy Enforcement¶
fn enforce_retention_policy(
logger: &AuditLogger,
policy: &RetentionPolicy
) -> Result<(), Box<dyn std::error::Error>> {
use chrono::{Utc, Duration};
let active_cutoff = Utc::now() - Duration::days(policy.active_retention_days as i64);
let archive_cutoff = Utc::now() - Duration::days(
(policy.active_retention_days + policy.archive_retention_days) as i64
);
// Archive logs older than active retention
let archive_query = format!(
"SELECT * FROM __audit_log WHERE timestamp < '{}'",
active_cutoff.to_rfc3339()
);
let events_to_archive = logger.query_audit_log(&archive_query)?;
// Write to archive destination
match &policy.archive_destination {
ArchiveDestination::Database { table } => {
// Move to archive table
println!("Archiving {} events to {}", events_to_archive.len(), table);
},
ArchiveDestination::S3 { bucket, prefix } => {
// Upload to S3
println!("Archiving {} events to s3://{}/{}",
events_to_archive.len(), bucket, prefix);
},
_ => {}
}
// Delete archived logs from active table
let delete_query = format!(
"DELETE FROM __audit_log WHERE timestamp < '{}'",
active_cutoff.to_rfc3339()
);
// Execute delete (implementation omitted)
Ok(())
}
Compliance-Specific Retention¶
fn get_compliance_retention_policy(compliance_type: &str) -> RetentionPolicy {
match compliance_type {
"SOC2" => RetentionPolicy {
active_retention_days: 90,
archive_retention_days: 275, // 1 year total
ddl_retention_days: Some(365),
security_event_retention_days: Some(730), // 2 years
archive_destination: ArchiveDestination::S3 {
bucket: "compliance-audit-logs".to_string(),
prefix: "soc2/".to_string(),
},
},
"HIPAA" => RetentionPolicy {
active_retention_days: 365,
archive_retention_days: 2190, // 7 years total (2555 days)
ddl_retention_days: Some(2555),
security_event_retention_days: Some(2555),
archive_destination: ArchiveDestination::S3 {
bucket: "hipaa-audit-logs".to_string(),
prefix: "phi-access/".to_string(),
},
},
"GDPR" => RetentionPolicy {
active_retention_days: 180,
archive_retention_days: 915, // 3 years total (1095 days)
ddl_retention_days: Some(365),
security_event_retention_days: Some(1095), // 3 years
archive_destination: ArchiveDestination::Database {
table: "audit_log_gdpr_archive".to_string(),
},
},
_ => RetentionPolicy {
active_retention_days: 90,
archive_retention_days: 275,
ddl_retention_days: None,
security_event_retention_days: None,
archive_destination: ArchiveDestination::LocalFile {
path: "/var/log/heliosdb/archive".to_string(),
},
},
}
}
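The day totals in the comments above are simply active plus archive retention; a tiny std-only helper makes the arithmetic explicit (this function is illustrative, not part of the crate):

```rust
// Total days an event is kept before permanent deletion (illustrative).
fn total_retention_days(active_days: u32, archive_days: u32) -> u32 {
    active_days + archive_days
}
```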
6. Compliance Scenarios¶
SOC2 Audit Trails¶
SOC2 compliance requires tracking who accessed what data, when, and whether it succeeded.
Configuration for SOC2¶
[audit]
enabled = true
log_ddl = true # Track schema changes
log_dml = true # Track data modifications
log_select = false # Optional (increases volume)
log_auth = true # Track authentication
retention_days = 90 # Minimum 90 days
enable_checksums = true # Tamper detection
SOC2 Audit Queries¶
-- 1. User access report
SELECT
user,
COUNT(*) as operations,
MIN(timestamp) as first_access,
MAX(timestamp) as last_access
FROM __audit_log
WHERE timestamp >= NOW() - INTERVAL '90 days'
GROUP BY user;
-- 2. Failed operations (security incidents)
SELECT
timestamp,
user,
operation,
target,
error
FROM __audit_log
WHERE success = false
AND timestamp >= NOW() - INTERVAL '90 days'
ORDER BY timestamp DESC;
-- 3. Data modifications by table
SELECT
target,
operation,
COUNT(*) as modifications,
SUM(affected_rows) as rows_affected
FROM __audit_log
WHERE operation IN ('INSERT', 'UPDATE', 'DELETE')
AND timestamp >= NOW() - INTERVAL '90 days'
GROUP BY target, operation
ORDER BY modifications DESC;
-- 4. Administrative changes (DDL)
SELECT
timestamp,
user,
operation,
target,
query
FROM __audit_log
WHERE operation IN ('CREATE_TABLE', 'DROP_TABLE', 'ALTER_TABLE')
AND timestamp >= NOW() - INTERVAL '90 days'
ORDER BY timestamp DESC;
SOC2 Compliance Report Generation¶
use heliosdb_lite::audit::{AuditLogger, AuditQuery};
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
struct SOC2Report {
report_period: String,
total_operations: u64,
unique_users: u64,
failed_operations: u64,
schema_changes: u64,
data_modifications: u64,
user_activity: Vec<UserActivity>,
}
#[derive(Debug, Serialize, Deserialize)]
struct UserActivity {
user: String,
operations: u64,
first_access: String,
last_access: String,
}
fn generate_soc2_report(logger: &AuditLogger) -> Result<SOC2Report, Box<dyn std::error::Error>> {
// Query audit logs for last 90 days
let events = logger.query_audit_log(
"timestamp >= NOW() - INTERVAL '90 days'"
)?;
// Parse and aggregate data
let parsed_events = AuditQuery::parse_events(events)?;
let total_operations = parsed_events.len() as u64;
let failed_operations = parsed_events.iter()
.filter(|e| !e.success)
.count() as u64;
// Count distinct users seen in the period
let unique_users = parsed_events.iter()
.map(|e| e.user.as_str())
.collect::<std::collections::HashSet<_>>()
.len() as u64;
// Generate report
let report = SOC2Report {
report_period: "Last 90 days".to_string(),
total_operations,
unique_users,
failed_operations,
schema_changes: 0, // Derive from DDL events as needed
data_modifications: 0, // Derive from DML events as needed
user_activity: vec![],
};
// Export to JSON
let json = serde_json::to_string_pretty(&report)?;
std::fs::write("/tmp/soc2_audit_report.json", json)?;
Ok(report)
}
HIPAA Access Logging¶
HIPAA requires 7-year retention of all access to Protected Health Information (PHI).
Configuration for HIPAA¶
[audit]
enabled = true
log_ddl = true
log_dml = true
log_select = true # Required for PHI access tracking
log_auth = true
retention_days = 2555 # 7 years
enable_checksums = true # Mandatory tamper detection
[audit.capture_metadata]
capture_client_ip = true # Track access origin
capture_application_name = true
capture_execution_time = true
HIPAA-Specific Queries¶
-- Track all access to patient records table
SELECT
timestamp,
user,
operation,
query,
affected_rows
FROM __audit_log
WHERE target = 'patient_records'
AND timestamp >= NOW() - INTERVAL '7 years'
ORDER BY timestamp DESC;
-- Audit trail for specific patient
SELECT
timestamp,
user,
operation,
query
FROM __audit_log
WHERE query LIKE '%patient_id=12345%'
ORDER BY timestamp;
-- Daily access summary for PHI tables
SELECT
DATE(timestamp) as access_date,
COUNT(*) as total_access,
COUNT(DISTINCT user) as unique_users
FROM __audit_log
WHERE target IN ('patient_records', 'medical_history', 'prescriptions')
GROUP BY access_date
ORDER BY access_date DESC;
GDPR Data Access Records¶
GDPR requires logging all data processing activities and providing access logs to data subjects.
Configuration for GDPR¶
[audit]
enabled = true
log_ddl = true
log_dml = true
log_select = true # Track data access for subject requests
retention_days = 1095 # 3 years typical
enable_checksums = true
[audit.capture_metadata]
capture_client_ip = true # Track processing location
capture_custom_fields = true # Legal basis, purpose
GDPR Subject Access Request¶
-- Generate access log for data subject (user email)
SELECT
timestamp as access_time,
user as accessor,
operation as activity,
query as details
FROM __audit_log
WHERE query LIKE '%user@example.com%'
OR query LIKE '%user_id=67890%'
ORDER BY timestamp DESC;
-- Data processing activities summary
SELECT
operation,
COUNT(*) as occurrences,
MIN(timestamp) as first_processed,
MAX(timestamp) as last_processed
FROM __audit_log
WHERE target IN ('users', 'user_preferences', 'user_data')
AND query LIKE '%user_id=67890%'
GROUP BY operation;
GDPR Right to Erasure Audit¶
-- Log data deletion for GDPR compliance
-- (Would be logged automatically by audit system)
-- Verify data deletion was logged
SELECT
timestamp,
user,
operation,
target,
query,
affected_rows,
checksum
FROM __audit_log
WHERE operation = 'DELETE'
AND query LIKE '%user_id=67890%'
ORDER BY timestamp DESC;
7. Integration¶
SIEM Integration¶
Integrate HeliosDB-Lite audit logs with Security Information and Event Management (SIEM) systems.
Splunk Integration¶
Export audit logs to Splunk using HTTP Event Collector:
use reqwest;
use serde_json::json;
async fn export_to_splunk(
logger: &AuditLogger,
splunk_url: &str,
splunk_token: &str
) -> Result<(), Box<dyn std::error::Error>> {
let client = reqwest::Client::new();
// Query recent audit events
let events = logger.query_audit_log(
"timestamp >= NOW() - INTERVAL '5 minutes'"
)?;
let parsed_events = AuditQuery::parse_events(events)?;
// Send each event to Splunk HEC
for event in parsed_events {
let payload = json!({
"event": {
"timestamp": event.timestamp,
"user": event.user,
"operation": event.operation.to_string(),
"target": event.target,
"query": event.query,
"success": event.success,
"checksum": event.checksum,
},
"sourcetype": "heliosdb:audit",
"index": "database_audit",
});
client.post(format!("{}/services/collector", splunk_url))
.header("Authorization", format!("Splunk {}", splunk_token))
.json(&payload)
.send()
.await?;
}
Ok(())
}
Elastic Stack (ELK) Integration¶
Send audit logs to Elasticsearch:
use elasticsearch::{Elasticsearch, http::transport::Transport};
use elasticsearch::IndexParts;
async fn export_to_elasticsearch(
logger: &AuditLogger,
es_url: &str
) -> Result<(), Box<dyn std::error::Error>> {
let transport = Transport::single_node(es_url)?;
let client = Elasticsearch::new(transport);
// Query audit events
let events = logger.query_audit_log(
"timestamp >= NOW() - INTERVAL '1 hour'"
)?;
let parsed_events = AuditQuery::parse_events(events)?;
// Index each event
for event in parsed_events {
let body = serde_json::json!({
"timestamp": event.timestamp,
"user": event.user,
"operation": event.operation.to_string(),
"target": event.target,
"query": event.query,
"affected_rows": event.affected_rows,
"success": event.success,
"error": event.error,
"checksum": event.checksum,
});
client.index(IndexParts::Index("heliosdb-audit"))
.body(body)
.send()
.await?;
}
Ok(())
}
Log Forwarding¶
Forward audit logs to external systems in real-time.
Syslog Forwarding¶
use syslog::Facility;
fn forward_to_syslog(event: &AuditEvent) -> Result<(), Box<dyn std::error::Error>> {
let formatter = syslog::Formatter3164 {
facility: Facility::LOG_AUTHPRIV, // no dedicated audit facility; authpriv is the closest standard one
hostname: None,
process: "heliosdb".to_string(),
pid: std::process::id(),
};
let mut writer = syslog::unix(formatter)?;
let message = format!(
"user={} operation={} target={} success={} checksum={}",
event.user,
event.operation,
event.target.as_ref().unwrap_or(&"N/A".to_string()),
event.success,
event.checksum
);
writer.info(message)?;
Ok(())
}
Kafka Streaming¶
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::config::ClientConfig;
async fn stream_to_kafka(
logger: &AuditLogger,
brokers: &str,
topic: &str
) -> Result<(), Box<dyn std::error::Error>> {
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("message.timeout.ms", "5000")
.create()?;
// Continuously stream audit events
loop {
let events = logger.query_audit_log(
"id > (SELECT COALESCE(MAX(id), 0) FROM kafka_audit_offset)"
)?;
let parsed_events = AuditQuery::parse_events(events)?;
for event in parsed_events {
let payload = serde_json::to_string(&event)?;
let record = FutureRecord::to(topic)
.key(&event.id.to_string())
.payload(&payload);
producer.send(record, std::time::Duration::from_secs(0)).await
.map_err(|(err, _msg)| err)?;
}
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
}
Alert Configuration¶
Configure real-time alerts based on audit events.
Failed Login Alerts¶
use lettre::transport::smtp::authentication::Credentials;
use lettre::{Message, SmtpTransport, Transport};
fn setup_failed_login_alert(logger: &AuditLogger) -> Result<(), Box<dyn std::error::Error>> {
// Monitor for failed login attempts
let failed_logins = logger.query_audit_log(
"operation = 'LOGIN' AND success = false AND timestamp >= NOW() - INTERVAL '5 minutes'"
)?;
let parsed_events = AuditQuery::parse_events(failed_logins)?;
if parsed_events.len() >= 5 {
// Send alert email
send_alert_email(
"security@example.com",
"Multiple Failed Login Attempts Detected",
&format!("{} failed login attempts in last 5 minutes", parsed_events.len())
)?;
}
Ok(())
}
fn send_alert_email(
to: &str,
subject: &str,
body: &str
) -> Result<(), Box<dyn std::error::Error>> {
let email = Message::builder()
.from("heliosdb@example.com".parse()?)
.to(to.parse()?)
.subject(subject)
.body(body.to_string())?;
let creds = Credentials::new(
"smtp_username".to_string(),
"smtp_password".to_string()
);
let mailer = SmtpTransport::relay("smtp.example.com")?
.credentials(creds)
.build();
mailer.send(&email)?;
Ok(())
}
Suspicious Activity Detection¶
fn detect_suspicious_activity(logger: &AuditLogger) -> Result<(), Box<dyn std::error::Error>> {
// Pattern 1: Mass deletion
let mass_deletes = logger.query_audit_log(
"operation = 'DELETE' AND affected_rows > 1000 AND timestamp >= NOW() - INTERVAL '1 hour'"
)?;
if !mass_deletes.is_empty() {
alert_security_team("Mass deletion detected", &mass_deletes)?;
}
// Pattern 2: After-hours access
let after_hours = logger.query_audit_log(
"EXTRACT(HOUR FROM timestamp) NOT BETWEEN 6 AND 22"
)?;
if !after_hours.is_empty() {
alert_security_team("After-hours database access", &after_hours)?;
}
// Pattern 3: Schema changes in production
let ddl_changes = logger.query_audit_log(
"operation IN ('DROP_TABLE', 'ALTER_TABLE') AND timestamp >= NOW() - INTERVAL '1 hour'"
)?;
if !ddl_changes.is_empty() {
alert_security_team("Schema changes detected", &ddl_changes)?;
}
Ok(())
}
fn alert_security_team(
alert_type: &str,
events: &[Tuple]
) -> Result<(), Box<dyn std::error::Error>> {
println!("SECURITY ALERT: {}", alert_type);
println!("Events: {}", events.len());
// Send notification (email, Slack, PagerDuty, etc.)
Ok(())
}
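`alert_security_team` above only prints; wiring it to a chat webhook is usually a single HTTP POST. The payload can be built and tested separately from the network call. This sketch uses Slack's incoming-webhook `"text"` field as an example shape — the field name and the escaping policy are our assumptions, not part of HeliosDB-Lite:

```rust
/// Build a minimal Slack-style webhook payload for a security alert.
/// Escaping here is deliberately minimal (quotes and backslashes only),
/// enough for alert strings we generate ourselves; use a JSON library
/// for arbitrary input.
fn build_alert_payload(alert_type: &str, event_count: usize) -> String {
    let escaped: String = alert_type
        .chars()
        .flat_map(|c| match c {
            '"' => vec!['\\', '"'],
            '\\' => vec!['\\', '\\'],
            _ => vec![c],
        })
        .collect();
    format!(
        "{{\"text\":\"SECURITY ALERT: {} ({} events)\"}}",
        escaped, event_count
    )
}
```

POSTing the resulting string with `Content-Type: application/json` to the webhook URL (via your HTTP client of choice) completes the notification path.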
8. Tamper Detection¶
Checksum Verification¶
Every audit event includes a SHA-256 checksum for tamper detection.
Understanding Checksums¶
The checksum is computed over the following event fields:
- Event ID
- Timestamp
- Session ID
- User
- Operation
- Target
- Query
- Affected rows
- Success status
- Error message
Example checksum calculation:
use sha2::{Sha256, Digest};
fn calculate_checksum(event: &AuditEvent) -> String {
let mut hasher = Sha256::new();
// Add all event fields to hash
hasher.update(event.id.to_string());
hasher.update(event.timestamp.to_rfc3339());
hasher.update(&event.session_id);
hasher.update(&event.user);
hasher.update(event.operation.to_string());
if let Some(target) = &event.target {
hasher.update(target);
}
hasher.update(&event.query);
hasher.update(event.affected_rows.to_string());
hasher.update(event.success.to_string());
if let Some(error) = &event.error {
hasher.update(error);
}
// Return hex-encoded hash
format!("{:x}", hasher.finalize())
}
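One caveat worth noting about the calculation above: hashing fields by plain concatenation is ambiguous. The field pairs ("ab", "c") and ("a", "bc") feed identical bytes to the hasher, and an optional field that is `None` is indistinguishable from an adjacent empty string. Length-prefixing each field (or any unambiguous delimiter scheme) removes the ambiguity. The sketch below demonstrates the encoding difference without pulling in `sha2`; in practice you would feed the length-prefixed string to `hasher.update` instead of the raw fields:

```rust
/// Naive encoding: fields are simply concatenated (ambiguous).
fn naive_encoding(fields: &[&str]) -> String {
    fields.concat()
}

/// Length-prefixed encoding: each field is preceded by its byte length,
/// so distinct field tuples always produce distinct byte streams.
fn length_prefixed_encoding(fields: &[&str]) -> String {
    fields
        .iter()
        .map(|f| format!("{}:{};", f.len(), f))
        .collect()
}
```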
Verifying Event Integrity¶
fn verify_audit_log_integrity(logger: &AuditLogger) -> Result<(), Box<dyn std::error::Error>> {
// Retrieve all audit events
let events = logger.query_audit_log("")?;
let parsed_events = AuditQuery::parse_events(events)?;
let mut tampered_count = 0;
for event in parsed_events {
if !event.verify_checksum() {
eprintln!("TAMPER DETECTED: Event {} has invalid checksum", event.id);
eprintln!(" User: {}", event.user);
eprintln!(" Operation: {}", event.operation);
eprintln!(" Timestamp: {}", event.timestamp);
eprintln!(" Stored checksum: {}", event.checksum);
tampered_count += 1;
}
}
if tampered_count > 0 {
eprintln!("WARNING: {} tampered audit events detected!", tampered_count);
} else {
println!("Audit log integrity verified: All checksums valid");
}
Ok(())
}
Batch Verification Query¶
-- Find events with potential tampering
-- (This requires implementing checksum verification in SQL)
SELECT
id,
timestamp,
user,
operation,
checksum
FROM __audit_log
WHERE LENGTH(checksum) != 64 -- SHA-256 is always 64 hex chars
OR checksum GLOB '*[^0-9a-f]*'; -- Contains a non-hex character
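If your code surfaces audit rows to Rust anyway, the same format check is a one-liner there and sidesteps SQL-dialect quirks around `GLOB`. The helper name below is ours, not a HeliosDB-Lite API:

```rust
/// True if `s` looks like a lowercase hex SHA-256 digest:
/// exactly 64 characters, all in [0-9a-f].
fn is_wellformed_sha256(s: &str) -> bool {
    s.len() == 64 && s.bytes().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
}
```

Note that a well-formed checksum is necessary but not sufficient: a sophisticated attacker can recompute a valid-looking hash, which is why the chained scheme in the next section matters.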
Chain of Custody¶
Implement a chain-of-custody model where each event's checksum includes the previous event's checksum:
use sha2::{Sha256, Digest};
fn calculate_chained_checksum(event: &AuditEvent, previous_checksum: &str) -> String {
let mut hasher = Sha256::new();
// Include previous checksum in hash
hasher.update(previous_checksum);
// Add current event fields
hasher.update(event.id.to_string());
hasher.update(event.timestamp.to_rfc3339());
hasher.update(&event.session_id);
hasher.update(&event.user);
hasher.update(event.operation.to_string());
if let Some(target) = &event.target {
hasher.update(target);
}
hasher.update(&event.query);
hasher.update(event.affected_rows.to_string());
hasher.update(event.success.to_string());
if let Some(error) = &event.error {
hasher.update(error);
}
format!("{:x}", hasher.finalize())
}
fn verify_chain_of_custody(logger: &AuditLogger) -> Result<bool, Box<dyn std::error::Error>> {
let events = logger.query_audit_log("")?;
let parsed_events = AuditQuery::parse_events(events)?;
if parsed_events.is_empty() {
return Ok(true);
}
// Verify first event
if !parsed_events[0].verify_checksum() {
eprintln!("Chain broken: First event checksum invalid");
return Ok(false);
}
// Verify chain integrity
for i in 1..parsed_events.len() {
let prev_checksum = &parsed_events[i - 1].checksum;
let current_event = &parsed_events[i];
let expected_checksum = calculate_chained_checksum(current_event, prev_checksum);
if expected_checksum != current_event.checksum {
eprintln!("Chain broken at event {}: Checksum mismatch", current_event.id);
eprintln!(" Expected: {}", expected_checksum);
eprintln!(" Found: {}", current_event.checksum);
return Ok(false);
}
}
println!("Chain of custody verified: All {} events linked correctly", parsed_events.len());
Ok(true)
}
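The property the chain buys you can be demonstrated in miniature: alter any event's payload and every recomputed link from that position onward changes, so the break is detectable at the tampered index. The toy below uses the standard library's `DefaultHasher` purely for illustration — it is not cryptographic and stands in for SHA-256 only to keep the sketch dependency-free:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Chain a list of event payloads: each link hashes (previous link, payload).
fn build_chain(events: &[&str]) -> Vec<u64> {
    let mut chain = Vec::with_capacity(events.len());
    let mut prev: u64 = 0; // genesis value for the first link
    for payload in events {
        let mut h = DefaultHasher::new();
        prev.hash(&mut h);
        payload.hash(&mut h);
        prev = h.finish();
        chain.push(prev);
    }
    chain
}

/// Return the index of the first link that does not match a recomputation,
/// or None if the stored chain is consistent with the payloads.
fn first_broken_link(events: &[&str], stored: &[u64]) -> Option<usize> {
    build_chain(events)
        .iter()
        .zip(stored)
        .position(|(expected, got)| expected != got)
}
```

The returned index is exactly what a forensic investigator needs: everything before it is still trustworthy, everything from it onward is suspect.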
Forensic Analysis¶
Use audit logs and checksums for forensic investigation:
Identify Tampering Window¶
-- Find the time range where tampering might have occurred
-- by identifying gaps in event IDs or timestamps.
-- Window functions are not allowed in WHERE, so compute them in a CTE first.
WITH sequenced AS (
    SELECT
        id,
        timestamp,
        LAG(id) OVER (ORDER BY id) as prev_id,
        LAG(timestamp) OVER (ORDER BY timestamp) as prev_timestamp,
        (id - LAG(id) OVER (ORDER BY id)) as id_gap,
        EXTRACT(EPOCH FROM (timestamp - LAG(timestamp) OVER (ORDER BY timestamp))) as time_gap_seconds
    FROM __audit_log
)
SELECT id, timestamp, prev_id, prev_timestamp, id_gap, time_gap_seconds
FROM sequenced
WHERE id_gap > 1            -- Missing IDs
   OR time_gap_seconds < 0  -- Time went backwards
ORDER BY id;
Reconstruct Missing Events¶
fn detect_missing_events(logger: &AuditLogger) -> Result<Vec<u64>, Box<dyn std::error::Error>> {
let events = logger.query_audit_log("ORDER BY id")?;
let parsed_events = AuditQuery::parse_events(events)?;
let mut missing_ids = Vec::new();
for i in 1..parsed_events.len() {
let prev_id = parsed_events[i - 1].id;
let curr_id = parsed_events[i].id;
// Check for gaps in ID sequence
if curr_id - prev_id > 1 {
for missing_id in (prev_id + 1)..curr_id {
eprintln!("WARNING: Event ID {} is missing from audit log", missing_id);
missing_ids.push(missing_id);
}
}
}
if !missing_ids.is_empty() {
eprintln!("Forensic analysis: {} events missing or deleted", missing_ids.len());
}
Ok(missing_ids)
}
Timeline Reconstruction¶
use chrono::{DateTime, Utc};
#[derive(Debug)]
struct AuditTimeline {
events: Vec<TimelineEvent>,
anomalies: Vec<String>,
}
#[derive(Debug)]
struct TimelineEvent {
id: u64,
timestamp: DateTime<Utc>,
user: String,
operation: String,
target: Option<String>,
integrity_verified: bool,
}
fn reconstruct_timeline(logger: &AuditLogger) -> Result<AuditTimeline, Box<dyn std::error::Error>> {
let events = logger.query_audit_log("ORDER BY timestamp")?;
let parsed_events = AuditQuery::parse_events(events)?;
let mut timeline = AuditTimeline {
events: Vec::new(),
anomalies: Vec::new(),
};
for event in parsed_events {
// Verify integrity
let integrity_verified = event.verify_checksum();
if !integrity_verified {
timeline.anomalies.push(
format!("Event {} has invalid checksum at {}", event.id, event.timestamp)
);
}
timeline.events.push(TimelineEvent {
id: event.id,
timestamp: event.timestamp,
user: event.user.clone(),
operation: event.operation.to_string(),
target: event.target.clone(),
integrity_verified,
});
}
// Detect time anomalies
for i in 1..timeline.events.len() {
if timeline.events[i].timestamp < timeline.events[i - 1].timestamp {
timeline.anomalies.push(
format!("Time went backwards between events {} and {}",
timeline.events[i - 1].id,
timeline.events[i].id)
);
}
}
Ok(timeline)
}
Summary¶
This tutorial covered:
- Introduction - Understanding audit logging, compliance requirements, and importance
- Configuration - Enabling and customizing audit logging with presets and TOML
- Event Types - DDL, DML, security, and query events
- Querying Audit Logs - SQL queries and programmatic interfaces
- Log Management - Rotation, archival, and retention policies
- Compliance Scenarios - SOC2, HIPAA, and GDPR implementations
- Integration - SIEM systems, log forwarding, and alerting
- Tamper Detection - Checksum verification, chain of custody, and forensic analysis
Next Steps¶
- Explore Examples: Run `cargo run --example audit_demo` to see audit logging in action
- Review Documentation: Read `/docs/features/AUDIT_LOGGING.md` for detailed API reference
- Run Tests: Execute `cargo test --test audit_tests` to understand test patterns
- Implement Compliance: Use configuration presets for your regulatory requirements
- Set Up Monitoring: Configure alerts and SIEM integration for your environment
Additional Resources¶
Questions or Feedback? Open an issue on the HeliosDB-Lite repository or consult the community forum.