
Audit Logging Tutorial for HeliosDB-Lite

Version: 1.0
Last Updated: 2025-12-01
Target Audience: Developers, DevOps Engineers, Compliance Teams
Estimated Time: 45-60 minutes


Table of Contents

  1. Introduction
  2. Configuration
  3. Event Types
  4. Querying Audit Logs
  5. Log Management
  6. Compliance Scenarios
  7. Integration
  8. Tamper Detection

1. Introduction

What is Audit Logging?

Audit logging is a comprehensive tracking mechanism that records all significant database operations, providing a tamper-proof trail of who did what, when, and whether it succeeded. In HeliosDB-Lite, audit logging is a built-in, high-performance feature that operates with sub-millisecond overhead.

Key characteristics of HeliosDB-Lite audit logging:

  • Tamper-Proof: Append-only logs with cryptographic SHA-256 checksums
  • High-Performance: Asynchronous buffered logging (<0.1ms overhead)
  • Configurable: Fine-grained control over what operations are logged
  • Queryable: Standard SQL interface for audit log analysis
  • Embedded: Zero external dependencies or infrastructure

Compliance Requirements

Audit logging is critical for meeting regulatory compliance requirements:

| Regulation | Requirements | HeliosDB-Lite Support |
|------------|--------------|-----------------------|
| SOC2 | Track who, what, when, and result of all data access | ✅ User, operation, timestamp, success tracking |
| HIPAA | 7-year retention of all PHI access and modifications | ✅ Configurable retention up to 7+ years |
| GDPR | Log all data processing activities, right to erasure | ✅ Comprehensive DML logging with metadata |
| PCI-DSS | Audit trail for all access to cardholder data | ✅ Tamper-proof logs with cryptographic integrity |
| FDA 21 CFR Part 11 | Electronic records with secure timestamps | ✅ Immutable timestamps with checksums |

Audit Trail Importance

Audit trails serve multiple critical business functions:

  1. Compliance: Meet regulatory requirements and pass audits
  2. Security: Detect and investigate unauthorized access or data breaches
  3. Forensics: Reconstruct events leading to data corruption or loss
  4. Debugging: Trace application behavior and identify root causes
  5. Accountability: Create a record of who made changes to sensitive data
  6. Legal Protection: Provide evidence for legal proceedings or disputes

Real-world example: A healthcare provider using HeliosDB-Lite reduced compliance audit preparation time from 40 hours to 8 hours by leveraging SQL-queryable audit logs with built-in tamper detection.


2. Configuration

Enabling Audit Logging

Audit logging can be enabled through configuration files or programmatically in your application code.

TOML Configuration File

Create a configuration file heliosdb.toml:

[database]
path = "/var/lib/heliosdb/myapp.db"
memory_limit_mb = 512
enable_wal = true

[audit]
enabled = true
log_ddl = true
log_dml = true
log_select = false
log_transactions = false
log_auth = true
retention_days = 90
async_buffer_size = 100
enable_checksums = true
max_query_length = 10000

[audit.capture_metadata]
capture_client_ip = true
capture_application_name = true
capture_database_name = true
capture_execution_time = true
capture_custom_fields = false

Programmatic Configuration (Rust)

use heliosdb_lite::{EmbeddedDatabase, Config};
use heliosdb_lite::audit::{AuditLogger, AuditConfig, MetadataCapture};
use std::sync::Arc;

fn setup_audit_logging() -> Result<AuditLogger, Box<dyn std::error::Error>> {
    // Create database (underscore-prefixed: kept alive but not used directly here)
    let config = Config::in_memory();
    let _db = EmbeddedDatabase::new_in_memory()?;

    // Get storage reference
    let storage = Arc::new(
        heliosdb_lite::storage::StorageEngine::open_in_memory(&config)?
    );

    // Custom audit configuration
    let audit_config = AuditConfig {
        enabled: true,
        log_ddl: true,
        log_dml: true,
        log_select: false,
        log_transactions: true,
        log_auth: true,
        retention_days: 90,
        async_buffer_size: 100,
        enable_checksums: true,
        max_query_length: 10000,
        capture_metadata: MetadataCapture {
            capture_client_ip: true,
            capture_application_name: true,
            capture_database_name: true,
            capture_execution_time: true,
            capture_custom_fields: false,
        },
    };

    // Initialize audit logger
    let mut logger = AuditLogger::new(storage, audit_config)?;

    // Set session context
    logger.set_user("admin".to_string());
    logger.set_session_id(uuid::Uuid::new_v4().to_string());

    Ok(logger)
}

Log Destinations

HeliosDB-Lite stores audit logs in an internal table and supports multiple output formats:

Internal Table Storage (Default)

All audit events are stored in the __audit_log system table:

-- View audit log table schema
SELECT * FROM __audit_log LIMIT 1;

The __audit_log table is automatically created when audit logging is initialized.

File-Based Log Export

You can export audit logs to files for archival or external processing:

use heliosdb_lite::audit::{AuditLogger, AuditQuery};
use std::fs::File;
use std::io::Write;

fn export_audit_logs(logger: &AuditLogger) -> Result<(), Box<dyn std::error::Error>> {
    // Query all audit events (empty filter = no WHERE clause)
    let tuples = logger.query_audit_log("")?;
    let events = AuditQuery::parse_events(tuples)?;

    // Write one JSON object per line (JSON Lines)
    let mut file = File::create("/var/log/heliosdb/audit.json")?;

    for event in events {
        let json = serde_json::to_string(&event)?;
        writeln!(file, "{}", json)?;
    }

    Ok(())
}

Database Table Export

Export audit logs to a separate archive table:

-- Create archive table
CREATE TABLE audit_log_archive (
    id INT8 PRIMARY KEY,
    timestamp TIMESTAMP,
    session_id TEXT,
    user TEXT,
    operation TEXT,
    target TEXT,
    query TEXT,
    affected_rows INT8,
    success BOOLEAN,
    error TEXT,
    checksum TEXT
);

-- Copy old audit logs to archive
INSERT INTO audit_log_archive
SELECT * FROM __audit_log
WHERE timestamp < NOW() - INTERVAL '90 days';

Log Format Options

JSON Format

Export audit events as JSON for integration with external systems:

{
  "id": 12345,
  "timestamp": "2025-12-01T10:30:45.123Z",
  "session_id": "550e8400-e29b-41d4-a716-446655440000",
  "user": "admin",
  "operation": "INSERT",
  "target": "users",
  "query": "INSERT INTO users (id, name, email) VALUES (1, 'Alice', 'alice@example.com')",
  "affected_rows": 1,
  "success": true,
  "error": null,
  "checksum": "a8f5f167f44f4964e6c998dee827110c062df690bcaf8e94e0c0e67c03c3e2fa"
}

CSV Format

Export audit events as CSV for spreadsheet analysis:

id,timestamp,user,operation,target,affected_rows,success
12345,2025-12-01T10:30:45.123Z,admin,INSERT,users,1,true
12346,2025-12-01T10:31:12.456Z,admin,UPDATE,users,1,true
12347,2025-12-01T10:32:05.789Z,admin,DELETE,users,1,true
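Producing the CSV export itself takes only a few lines of standard Rust. The sketch below is self-contained and std-only: `CsvAuditRow` is an illustrative stand-in that mirrors the columns above, not the crate's real event type, and fields are quoted per RFC 4180 so commas or quotes inside a value cannot corrupt the row.

```rust
// Illustrative stand-in mirroring the CSV columns above (not the crate's real type).
struct CsvAuditRow {
    id: i64,
    timestamp: String,
    user: String,
    operation: String,
    target: String,
    affected_rows: i64,
    success: bool,
}

// Quote a field per RFC 4180 if it contains a comma, quote, or newline.
fn csv_escape(field: &str) -> String {
    if field.contains(|c| c == ',' || c == '"' || c == '\n') {
        format!("\"{}\"", field.replace('"', "\"\""))
    } else {
        field.to_string()
    }
}

// Render one audit event as a CSV line in the column order shown above.
fn to_csv_line(e: &CsvAuditRow) -> String {
    format!(
        "{},{},{},{},{},{},{}",
        e.id,
        csv_escape(&e.timestamp),
        csv_escape(&e.user),
        csv_escape(&e.operation),
        csv_escape(&e.target),
        e.affected_rows,
        e.success
    )
}

fn main() {
    let row = CsvAuditRow {
        id: 12345,
        timestamp: "2025-12-01T10:30:45.123Z".to_string(),
        user: "admin".to_string(),
        operation: "INSERT".to_string(),
        target: "users".to_string(),
        affected_rows: 1,
        success: true,
    };
    // Prints: 12345,2025-12-01T10:30:45.123Z,admin,INSERT,users,1,true
    println!("{}", to_csv_line(&row));
}
```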

Configuration Presets

HeliosDB-Lite provides pre-configured audit profiles for common use cases:

Default Configuration

Balanced configuration for most applications:

let config = AuditConfig::default();
// Logs: DDL, DML (not SELECT), Auth
// Retention: 90 days
// Checksums: Enabled

Minimal Configuration

Lightweight configuration for low-overhead environments:

let config = AuditConfig::minimal();
// Logs: DDL only
// Retention: 30 days
// Checksums: Disabled

Verbose Configuration

Comprehensive logging for debugging and forensics:

let config = AuditConfig::verbose();
// Logs: DDL, DML, SELECT, Transactions, Auth
// Retention: 365 days
// Checksums: Enabled

Compliance Configuration

Strict configuration for regulatory compliance:

let config = AuditConfig::compliance();
// Logs: DDL, DML, Transactions, Auth
// Retention: 2555 days (7 years)
// Checksums: Enabled (mandatory)

Configuration Comparison Table:

| Feature | Minimal | Default | Verbose | Compliance |
|---------|---------|---------|---------|------------|
| DDL Logging | ✅ | ✅ | ✅ | ✅ |
| DML Logging | ❌ | ✅ | ✅ | ✅ |
| SELECT Logging | ❌ | ❌ | ✅ | ❌ |
| Transaction Logging | ❌ | ❌ | ✅ | ✅ |
| Auth Logging | ❌ | ✅ | ✅ | ✅ |
| Retention (days) | 30 | 90 | 365 | 2555 |
| Checksums | ❌ | ✅ | ✅ | ✅ |
| Overhead | <0.05ms | <0.1ms | <0.2ms | <0.1ms |

3. Event Types

HeliosDB-Lite audit logging tracks multiple categories of database operations.

DDL Events (Data Definition Language)

DDL events track changes to database schema:

| Operation | Description | Example |
|-----------|-------------|---------|
| CREATE_TABLE | New table creation | CREATE TABLE users (id INT, name TEXT) |
| DROP_TABLE | Table deletion | DROP TABLE users |
| ALTER_TABLE | Table schema modification | ALTER TABLE users ADD COLUMN email TEXT |
| CREATE_INDEX | Index creation | CREATE INDEX idx_users_name ON users(name) |
| DROP_INDEX | Index deletion | DROP INDEX idx_users_name |

Example: Logging DDL Operations

use heliosdb_lite::audit::AuditLogger;

fn log_schema_change(logger: &AuditLogger) -> Result<(), Box<dyn std::error::Error>> {
    // Log successful table creation
    logger.log_ddl(
        "CREATE TABLE",
        "users",
        "CREATE TABLE users (id INT PRIMARY KEY, name TEXT, email TEXT)",
        true,  // success
        None,  // no error
    )?;

    // Log failed index creation
    logger.log_ddl(
        "CREATE INDEX",
        "users",
        "CREATE INDEX idx_invalid ON nonexistent_table(name)",
        false,  // failed
        Some("Table does not exist"),
    )?;

    Ok(())
}

DML Events (Data Manipulation Language)

DML events track data modifications:

| Operation | Description | Example |
|-----------|-------------|---------|
| INSERT | New row insertion | INSERT INTO users VALUES (1, 'Alice') |
| UPDATE | Row modification | UPDATE users SET name='Bob' WHERE id=1 |
| DELETE | Row deletion | DELETE FROM users WHERE id=1 |

Example: Logging DML Operations

fn log_data_changes(logger: &AuditLogger) -> Result<(), Box<dyn std::error::Error>> {
    // Log INSERT operation
    logger.log_dml(
        "INSERT",
        "users",
        "INSERT INTO users (id, name, email) VALUES (1, 'Alice', 'alice@example.com')",
        1,     // affected rows
        true,  // success
        None,  // no error
    )?;

    // Log UPDATE operation
    logger.log_dml(
        "UPDATE",
        "users",
        "UPDATE users SET email='alice.smith@example.com' WHERE id=1",
        1,     // affected rows
        true,  // success
        None,
    )?;

    // Log DELETE operation
    logger.log_dml(
        "DELETE",
        "users",
        "DELETE FROM users WHERE id=1",
        1,     // affected rows
        true,  // success
        None,
    )?;

    Ok(())
}

Security Events

Security events track authentication and authorization:

| Operation | Description | Example |
|-----------|-------------|---------|
| LOGIN | User authentication | User login successful |
| LOGOUT | User session termination | User logout |
| GRANT | Permission granted | GRANT SELECT ON users TO analyst |
| REVOKE | Permission revoked | REVOKE DELETE ON users FROM analyst |

Example: Logging Security Events

fn log_auth_events(logger: &AuditLogger) -> Result<(), Box<dyn std::error::Error>> {
    use heliosdb_lite::audit::OperationType;
    use heliosdb_lite::audit::AuditMetadata;

    // Log successful login
    logger.log_operation(
        OperationType::Login,
        None,  // no specific target
        "User 'admin' logged in from 192.168.1.100",
        0,     // no rows affected
        true,  // success
        None,  // no error
        AuditMetadata::default(),
    )?;

    // Log failed login attempt
    logger.log_operation(
        OperationType::Login,
        None,
        "User 'hacker' failed login attempt",
        0,
        false,  // failed
        Some("Invalid credentials"),
        AuditMetadata::default(),
    )?;

    Ok(())
}

Query Events (SELECT)

Query events track data access (disabled by default due to verbosity):

Example: Logging SELECT Queries

fn log_query(logger: &AuditLogger) -> Result<(), Box<dyn std::error::Error>> {
    // Enable SELECT logging first in config
    logger.log_select(
        "users",
        "SELECT * FROM users WHERE email LIKE '%@example.com'",
        15,         // row count
        Some(12),   // execution time in milliseconds
    )?;

    Ok(())
}

Note: SELECT logging is disabled by default because it generates high log volume. Enable only when required for compliance or debugging.

Transaction Events

Transaction events track transaction boundaries:

| Operation | Description | Example |
|-----------|-------------|---------|
| BEGIN | Transaction start | BEGIN TRANSACTION |
| COMMIT | Transaction commit | COMMIT |
| ROLLBACK | Transaction abort | ROLLBACK |
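A common application-side pattern is to wrap each unit of work so that BEGIN and the matching COMMIT or ROLLBACK are always logged as a pair. The sketch below is self-contained and std-only: `TxAuditLog` is an illustrative stand-in for the real `AuditLogger`, used here only to show the wrapping pattern.

```rust
// Illustrative stand-in for AuditLogger's transaction logging (not the crate's real type).
struct TxAuditLog {
    entries: Vec<String>,
}

impl TxAuditLog {
    fn log(&mut self, op: &str, success: bool) {
        self.entries.push(format!("{} success={}", op, success));
    }
}

// Run `work` inside audited transaction boundaries: BEGIN is always logged,
// followed by COMMIT if the closure succeeds or ROLLBACK if it fails.
fn with_audited_tx<T>(
    log: &mut TxAuditLog,
    work: impl FnOnce() -> Result<T, String>,
) -> Result<T, String> {
    log.log("BEGIN", true);
    match work() {
        Ok(v) => {
            log.log("COMMIT", true);
            Ok(v)
        }
        Err(e) => {
            log.log("ROLLBACK", true);
            Err(e)
        }
    }
}

fn main() {
    let mut log = TxAuditLog { entries: Vec::new() };
    let _ = with_audited_tx(&mut log, || Ok::<_, String>(42));
    let _ = with_audited_tx(&mut log, || Err::<i32, _>("constraint violation".to_string()));
    // Prints: ["BEGIN success=true", "COMMIT success=true", "BEGIN success=true", "ROLLBACK success=true"]
    println!("{:?}", log.entries);
}
```

This guarantees the audit trail never contains a dangling BEGIN without its terminating COMMIT or ROLLBACK, even when the transaction body returns early with an error.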

4. Querying Audit Logs

The __audit_log Table

All audit events are stored in the __audit_log system table with the following schema:

CREATE TABLE __audit_log (
    id INT8 PRIMARY KEY,              -- Unique event ID
    timestamp TIMESTAMP,               -- When operation occurred
    session_id TEXT,                   -- Session identifier
    user TEXT,                         -- User who performed operation
    operation TEXT,                    -- Operation type (INSERT, UPDATE, etc.)
    target TEXT,                       -- Target object (table name, etc.)
    query TEXT,                        -- Full SQL query
    affected_rows INT8,                -- Number of rows affected
    success BOOLEAN,                   -- Whether operation succeeded
    error TEXT,                        -- Error message (if failed)
    checksum TEXT                      -- SHA-256 tamper detection checksum
);

Basic Queries

View Recent Audit Events

-- Get last 100 audit events
SELECT * FROM __audit_log
ORDER BY id DESC
LIMIT 100;

View All Operations by a Specific User

-- Get all operations by user 'admin'
SELECT
    timestamp,
    operation,
    target,
    affected_rows,
    success
FROM __audit_log
WHERE user = 'admin'
ORDER BY timestamp DESC;

View Failed Operations

-- Get all failed operations
SELECT
    timestamp,
    user,
    operation,
    target,
    query,
    error
FROM __audit_log
WHERE success = false
ORDER BY timestamp DESC;

Filtering by Event Type

-- Get all INSERT operations
SELECT * FROM __audit_log
WHERE operation = 'INSERT';

-- Get all DDL operations (schema changes)
SELECT * FROM __audit_log
WHERE operation IN ('CREATE_TABLE', 'DROP_TABLE', 'ALTER_TABLE', 'CREATE_INDEX', 'DROP_INDEX');

-- Get all DML operations (data changes)
SELECT * FROM __audit_log
WHERE operation IN ('INSERT', 'UPDATE', 'DELETE');

-- Get all authentication events
SELECT * FROM __audit_log
WHERE operation IN ('LOGIN', 'LOGOUT', 'GRANT', 'REVOKE');

Time-Based Queries

-- Events from the last 24 hours
SELECT * FROM __audit_log
WHERE timestamp >= NOW() - INTERVAL '24 hours'
ORDER BY timestamp DESC;

-- Events within a specific date range
SELECT * FROM __audit_log
WHERE timestamp >= '2025-12-01T00:00:00Z'
  AND timestamp <= '2025-12-31T23:59:59Z'
ORDER BY timestamp;

-- Events grouped by hour
SELECT
    DATE_TRUNC('hour', timestamp) as hour,
    COUNT(*) as event_count,
    SUM(CASE WHEN success THEN 1 ELSE 0 END) as successful,
    SUM(CASE WHEN NOT success THEN 1 ELSE 0 END) as failed
FROM __audit_log
GROUP BY hour
ORDER BY hour DESC;

Advanced Queries

Most Active Users

SELECT
    user,
    COUNT(*) as total_operations,
    SUM(CASE WHEN operation IN ('INSERT', 'UPDATE', 'DELETE') THEN 1 ELSE 0 END) as data_modifications,
    SUM(CASE WHEN NOT success THEN 1 ELSE 0 END) as failed_operations
FROM __audit_log
GROUP BY user
ORDER BY total_operations DESC;

Most Modified Tables

SELECT
    target as table_name,
    COUNT(*) as modifications,
    SUM(affected_rows) as total_rows_affected
FROM __audit_log
WHERE operation IN ('INSERT', 'UPDATE', 'DELETE')
  AND target IS NOT NULL
GROUP BY target
ORDER BY modifications DESC;

Operations by Time of Day

SELECT
    EXTRACT(HOUR FROM timestamp) as hour_of_day,
    COUNT(*) as operation_count
FROM __audit_log
GROUP BY hour_of_day
ORDER BY hour_of_day;

Programmatic Query Interface (Rust)

use heliosdb_lite::audit::{AuditLogger, AuditQuery, OperationType};

fn query_audit_logs(logger: &AuditLogger) -> Result<(), Box<dyn std::error::Error>> {
    // Build query using filter API
    let query = AuditQuery::new()
        .with_operation(OperationType::Insert)
        .with_user("admin".to_string())
        .with_success(true)
        .limit(100)
        .offset(0);

    // Build SQL and execute
    let sql = query.build_sql();
    let tuples = logger.query_audit_log(&sql)?;

    // Parse events
    let events = AuditQuery::parse_events(tuples)?;

    // Process results
    for event in events {
        println!("Event {}: {} on {} by {} at {}",
            event.id,
            event.operation,
            event.target.unwrap_or_default(),
            event.user,
            event.timestamp
        );

        // Verify checksum
        if !event.verify_checksum() {
            eprintln!("WARNING: Event {} has invalid checksum!", event.id);
        }
    }

    Ok(())
}

5. Log Management

Log Rotation

HeliosDB-Lite uses an append-only audit log model. For long-running systems, implement log rotation to manage storage:

Manual Rotation Script

#!/bin/bash
# rotate_audit_logs.sh - Daily audit log rotation script

DB_PATH="/var/lib/heliosdb/myapp.db"
ARCHIVE_DIR="/var/log/heliosdb/audit_archive"
RETENTION_DAYS=90

# Export old audit logs
heliosdb-cli -d "$DB_PATH" -e "
    COPY (
        SELECT * FROM __audit_log
        WHERE timestamp < NOW() - INTERVAL '$RETENTION_DAYS days'
    ) TO '$ARCHIVE_DIR/audit_$(date +%Y%m%d).csv' WITH CSV HEADER;
"

# Delete old logs from active table
heliosdb-cli -d "$DB_PATH" -e "
    DELETE FROM __audit_log
    WHERE timestamp < NOW() - INTERVAL '$RETENTION_DAYS days';
"

echo "Audit log rotation complete: $(date)"
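To run the rotation script automatically, schedule it with cron. The entry below is a sketch: the install path `/usr/local/bin/rotate_audit_logs.sh` and the 02:00 schedule are assumptions to adapt to your environment.

```bash
# crontab entry: run audit log rotation daily at 02:00 (path is illustrative)
0 2 * * * /usr/local/bin/rotate_audit_logs.sh >> /var/log/heliosdb/rotation.log 2>&1
```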

Programmatic Rotation (Rust)

use heliosdb_lite::EmbeddedDatabase;
use chrono::{Utc, Duration};

fn rotate_audit_logs(db: &EmbeddedDatabase, retention_days: i64)
    -> Result<(), Box<dyn std::error::Error>>
{
    let cutoff_date = Utc::now() - Duration::days(retention_days);

    // Archive old logs to separate table
    let archive_query = format!(
        "INSERT INTO audit_log_archive \
         SELECT * FROM __audit_log \
         WHERE timestamp < '{}'",
        cutoff_date.to_rfc3339()
    );
    db.execute(&archive_query)?;

    // Delete archived logs from active table
    let delete_query = format!(
        "DELETE FROM __audit_log \
         WHERE timestamp < '{}'",
        cutoff_date.to_rfc3339()
    );
    db.execute(&delete_query)?;

    println!("Rotated audit logs older than {} days", retention_days);
    Ok(())
}

Archival Strategies

Compressed File Archive

use heliosdb_lite::audit::{AuditLogger, AuditQuery};
use std::fs::File;
use std::io::Write;
use flate2::Compression;
use flate2::write::GzEncoder;

fn archive_to_compressed_file(
    logger: &AuditLogger,
    output_path: &str
) -> Result<(), Box<dyn std::error::Error>> {
    // Query audit events older than the retention window
    let tuples = logger.query_audit_log(
        "timestamp < NOW() - INTERVAL '90 days'"
    )?;
    let events = AuditQuery::parse_events(tuples)?;

    // Create gzip-compressed output file
    let file = File::create(output_path)?;
    let mut encoder = GzEncoder::new(file, Compression::default());

    // Write events as JSON Lines
    for event in events {
        let json = serde_json::to_string(&event)?;
        writeln!(encoder, "{}", json)?;
    }

    encoder.finish()?;
    Ok(())
}

Database Partition Archive

-- Create partitioned archive tables by year
CREATE TABLE audit_log_2024 (LIKE __audit_log);
CREATE TABLE audit_log_2025 (LIKE __audit_log);

-- Move 2024 logs to archive partition
INSERT INTO audit_log_2024
SELECT * FROM __audit_log
WHERE EXTRACT(YEAR FROM timestamp) = 2024;

-- Delete from active log
DELETE FROM __audit_log
WHERE EXTRACT(YEAR FROM timestamp) = 2024;

Cloud Storage Archive (AWS S3)

use heliosdb_lite::audit::{AuditLogger, AuditQuery};
use rusoto_s3::{S3Client, PutObjectRequest, S3};
use rusoto_core::Region;

async fn archive_to_s3(
    logger: &AuditLogger,
    bucket: &str,
    key: &str
) -> Result<(), Box<dyn std::error::Error>> {
    let s3_client = S3Client::new(Region::UsEast1);

    // Export audit logs to JSON (parse raw tuples into events first)
    let tuples = logger.query_audit_log("")?;
    let events = AuditQuery::parse_events(tuples)?;
    let json_data = serde_json::to_vec(&events)?;

    // Upload to S3
    let put_request = PutObjectRequest {
        bucket: bucket.to_string(),
        key: key.to_string(),
        body: Some(json_data.into()),
        ..Default::default()
    };

    s3_client.put_object(put_request).await?;
    Ok(())
}

Retention Policies

Implement automated retention policies based on compliance requirements:

Policy Definition

use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct RetentionPolicy {
    // Active log retention
    active_retention_days: u32,

    // Archive retention (before permanent deletion)
    archive_retention_days: u32,

    // Operation-specific overrides
    ddl_retention_days: Option<u32>,
    security_event_retention_days: Option<u32>,

    // Archive destination
    archive_destination: ArchiveDestination,
}

#[derive(Debug, Serialize, Deserialize)]
enum ArchiveDestination {
    LocalFile { path: String },
    S3 { bucket: String, prefix: String },
    Database { table: String },
}

Policy Enforcement

fn enforce_retention_policy(
    logger: &AuditLogger,
    policy: &RetentionPolicy
) -> Result<(), Box<dyn std::error::Error>> {
    use chrono::{Utc, Duration};

    let active_cutoff = Utc::now() - Duration::days(policy.active_retention_days as i64);
    let archive_cutoff = Utc::now() - Duration::days(
        (policy.active_retention_days + policy.archive_retention_days) as i64
    );

    // Archive logs older than active retention
    let archive_query = format!(
        "SELECT * FROM __audit_log WHERE timestamp < '{}'",
        active_cutoff.to_rfc3339()
    );
    let events_to_archive = logger.query_audit_log(&archive_query)?;

    // Write to archive destination
    match &policy.archive_destination {
        ArchiveDestination::Database { table } => {
            // Move to archive table
            println!("Archiving {} events to {}", events_to_archive.len(), table);
        },
        ArchiveDestination::S3 { bucket, prefix } => {
            // Upload to S3
            println!("Archiving {} events to s3://{}/{}",
                events_to_archive.len(), bucket, prefix);
        },
        _ => {}
    }

    // Delete archived logs from active table
    let delete_query = format!(
        "DELETE FROM __audit_log WHERE timestamp < '{}'",
        active_cutoff.to_rfc3339()
    );
    // Execute delete (implementation omitted)

    Ok(())
}

Compliance-Specific Retention

fn get_compliance_retention_policy(compliance_type: &str) -> RetentionPolicy {
    match compliance_type {
        "SOC2" => RetentionPolicy {
            active_retention_days: 90,
            archive_retention_days: 275,  // 1 year total
            ddl_retention_days: Some(365),
            security_event_retention_days: Some(730),  // 2 years
            archive_destination: ArchiveDestination::S3 {
                bucket: "compliance-audit-logs".to_string(),
                prefix: "soc2/".to_string(),
            },
        },
        "HIPAA" => RetentionPolicy {
            active_retention_days: 365,
            archive_retention_days: 2190,  // 7 years total (2555 days)
            ddl_retention_days: Some(2555),
            security_event_retention_days: Some(2555),
            archive_destination: ArchiveDestination::S3 {
                bucket: "hipaa-audit-logs".to_string(),
                prefix: "phi-access/".to_string(),
            },
        },
        "GDPR" => RetentionPolicy {
            active_retention_days: 180,
            archive_retention_days: 915,  // 3 years total (1095 days)
            ddl_retention_days: Some(365),
            security_event_retention_days: Some(1095),  // 3 years
            archive_destination: ArchiveDestination::Database {
                table: "audit_log_gdpr_archive".to_string(),
            },
        },
        _ => RetentionPolicy {
            active_retention_days: 90,
            archive_retention_days: 275,
            ddl_retention_days: None,
            security_event_retention_days: None,
            archive_destination: ArchiveDestination::LocalFile {
                path: "/var/log/heliosdb/archive".to_string(),
            },
        },
    }
}

6. Compliance Scenarios

SOC2 Audit Trails

SOC2 compliance requires tracking who accessed what data, when, and whether it succeeded.

Configuration for SOC2

[audit]
enabled = true
log_ddl = true                  # Track schema changes
log_dml = true                  # Track data modifications
log_select = false              # Optional (increases volume)
log_auth = true                 # Track authentication
retention_days = 90             # Minimum 90 days
enable_checksums = true         # Tamper detection

SOC2 Audit Queries

-- 1. User access report
SELECT
    user,
    COUNT(*) as operations,
    MIN(timestamp) as first_access,
    MAX(timestamp) as last_access
FROM __audit_log
WHERE timestamp >= NOW() - INTERVAL '90 days'
GROUP BY user;

-- 2. Failed operations (security incidents)
SELECT
    timestamp,
    user,
    operation,
    target,
    error
FROM __audit_log
WHERE success = false
  AND timestamp >= NOW() - INTERVAL '90 days'
ORDER BY timestamp DESC;

-- 3. Data modifications by table
SELECT
    target,
    operation,
    COUNT(*) as modifications,
    SUM(affected_rows) as rows_affected
FROM __audit_log
WHERE operation IN ('INSERT', 'UPDATE', 'DELETE')
  AND timestamp >= NOW() - INTERVAL '90 days'
GROUP BY target, operation
ORDER BY modifications DESC;

-- 4. Administrative changes (DDL)
SELECT
    timestamp,
    user,
    operation,
    target,
    query
FROM __audit_log
WHERE operation IN ('CREATE_TABLE', 'DROP_TABLE', 'ALTER_TABLE')
  AND timestamp >= NOW() - INTERVAL '90 days'
ORDER BY timestamp DESC;

SOC2 Compliance Report Generation

use heliosdb_lite::audit::{AuditLogger, AuditQuery};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct SOC2Report {
    report_period: String,
    total_operations: u64,
    unique_users: u64,
    failed_operations: u64,
    schema_changes: u64,
    data_modifications: u64,
    user_activity: Vec<UserActivity>,
}

#[derive(Debug, Serialize, Deserialize)]
struct UserActivity {
    user: String,
    operations: u64,
    first_access: String,
    last_access: String,
}

fn generate_soc2_report(logger: &AuditLogger) -> Result<SOC2Report, Box<dyn std::error::Error>> {
    use std::collections::HashSet;

    // Query audit logs for last 90 days
    let tuples = logger.query_audit_log(
        "timestamp >= NOW() - INTERVAL '90 days'"
    )?;

    // Parse and aggregate data
    let parsed_events = AuditQuery::parse_events(tuples)?;

    let total_operations = parsed_events.len() as u64;
    let failed_operations = parsed_events.iter()
        .filter(|e| !e.success)
        .count() as u64;
    let unique_users = parsed_events.iter()
        .map(|e| e.user.as_str())
        .collect::<HashSet<_>>()
        .len() as u64;

    // Operation names as they appear in the __audit_log table
    const DDL_OPS: [&str; 5] = ["CREATE_TABLE", "DROP_TABLE", "ALTER_TABLE", "CREATE_INDEX", "DROP_INDEX"];
    const DML_OPS: [&str; 3] = ["INSERT", "UPDATE", "DELETE"];
    let schema_changes = parsed_events.iter()
        .filter(|e| DDL_OPS.contains(&e.operation.to_string().as_str()))
        .count() as u64;
    let data_modifications = parsed_events.iter()
        .filter(|e| DML_OPS.contains(&e.operation.to_string().as_str()))
        .count() as u64;

    // Generate report (per-user activity aggregation omitted for brevity)
    let report = SOC2Report {
        report_period: "Last 90 days".to_string(),
        total_operations,
        unique_users,
        failed_operations,
        schema_changes,
        data_modifications,
        user_activity: vec![],
    };

    // Export to JSON
    let json = serde_json::to_string_pretty(&report)?;
    std::fs::write("/tmp/soc2_audit_report.json", json)?;

    Ok(report)
}

HIPAA Access Logging

HIPAA requires 7-year retention of all access to Protected Health Information (PHI).

Configuration for HIPAA

[audit]
enabled = true
log_ddl = true
log_dml = true
log_select = true               # Required for PHI access tracking
log_auth = true
retention_days = 2555           # 7 years
enable_checksums = true         # Mandatory tamper detection

[audit.capture_metadata]
capture_client_ip = true        # Track access origin
capture_application_name = true
capture_execution_time = true

HIPAA-Specific Queries

-- Track all access to patient records table
SELECT
    timestamp,
    user,
    operation,
    query,
    affected_rows
FROM __audit_log
WHERE target = 'patient_records'
  AND timestamp >= NOW() - INTERVAL '7 years'
ORDER BY timestamp DESC;

-- Audit trail for specific patient
SELECT
    timestamp,
    user,
    operation,
    query
FROM __audit_log
WHERE query LIKE '%patient_id=12345%'
ORDER BY timestamp;

-- Daily access summary for PHI tables
SELECT
    DATE(timestamp) as access_date,
    COUNT(*) as total_access,
    COUNT(DISTINCT user) as unique_users
FROM __audit_log
WHERE target IN ('patient_records', 'medical_history', 'prescriptions')
GROUP BY access_date
ORDER BY access_date DESC;

GDPR Data Access Records

GDPR requires logging all data processing activities and providing access logs to data subjects.

Configuration for GDPR

[audit]
enabled = true
log_ddl = true
log_dml = true
log_select = true               # Track data access for subject requests
retention_days = 1095           # 3 years typical
enable_checksums = true

[audit.capture_metadata]
capture_client_ip = true        # Track processing location
capture_custom_fields = true    # Legal basis, purpose

GDPR Subject Access Request

-- Generate access log for data subject (user email)
SELECT
    timestamp as access_time,
    user as accessor,
    operation as activity,
    query as details
FROM __audit_log
WHERE query LIKE '%user@example.com%'
   OR query LIKE '%user_id=67890%'
ORDER BY timestamp DESC;

-- Data processing activities summary
SELECT
    operation,
    COUNT(*) as occurrences,
    MIN(timestamp) as first_processed,
    MAX(timestamp) as last_processed
FROM __audit_log
WHERE target IN ('users', 'user_preferences', 'user_data')
  AND query LIKE '%user_id=67890%'
GROUP BY operation;

GDPR Right to Erasure Audit

-- Log data deletion for GDPR compliance
-- (Would be logged automatically by audit system)

-- Verify data deletion was logged
SELECT
    timestamp,
    user,
    operation,
    target,
    query,
    affected_rows,
    checksum
FROM __audit_log
WHERE operation = 'DELETE'
  AND query LIKE '%user_id=67890%'
ORDER BY timestamp DESC;

7. Integration

SIEM Integration

Integrate HeliosDB-Lite audit logs with Security Information and Event Management (SIEM) systems.

Splunk Integration

Export audit logs to Splunk using HTTP Event Collector:

use heliosdb_lite::audit::{AuditLogger, AuditQuery};
use serde_json::json;

async fn export_to_splunk(
    logger: &AuditLogger,
    splunk_url: &str,
    splunk_token: &str
) -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::Client::new();

    // Query recent audit events
    let events = logger.query_audit_log(
        "timestamp >= NOW() - INTERVAL '5 minutes'"
    )?;

    let parsed_events = AuditQuery::parse_events(events)?;

    // Send each event to Splunk HEC
    for event in parsed_events {
        let payload = json!({
            "event": {
                "timestamp": event.timestamp,
                "user": event.user,
                "operation": event.operation.to_string(),
                "target": event.target,
                "query": event.query,
                "success": event.success,
                "checksum": event.checksum,
            },
            "sourcetype": "heliosdb:audit",
            "index": "database_audit",
        });

        client.post(format!("{}/services/collector", splunk_url))
            .header("Authorization", format!("Splunk {}", splunk_token))
            .json(&payload)
            .send()
            .await?;
    }

    Ok(())
}

Elastic Stack (ELK) Integration

Send audit logs to Elasticsearch:

use heliosdb_lite::audit::{AuditLogger, AuditQuery};
use elasticsearch::{Elasticsearch, http::transport::Transport};
use elasticsearch::IndexParts;

async fn export_to_elasticsearch(
    logger: &AuditLogger,
    es_url: &str
) -> Result<(), Box<dyn std::error::Error>> {
    let transport = Transport::single_node(es_url)?;
    let client = Elasticsearch::new(transport);

    // Query audit events
    let events = logger.query_audit_log(
        "timestamp >= NOW() - INTERVAL '1 hour'"
    )?;

    let parsed_events = AuditQuery::parse_events(events)?;

    // Index each event
    for event in parsed_events {
        let body = serde_json::json!({
            "timestamp": event.timestamp,
            "user": event.user,
            "operation": event.operation.to_string(),
            "target": event.target,
            "query": event.query,
            "affected_rows": event.affected_rows,
            "success": event.success,
            "error": event.error,
            "checksum": event.checksum,
        });

        client.index(IndexParts::Index("heliosdb-audit"))
            .body(body)
            .send()
            .await?;
    }

    Ok(())
}

Log Forwarding

Forward audit logs to external systems in real-time.

Syslog Forwarding

use syslog::{Facility, Formatter3164};

fn forward_to_syslog(event: &AuditEvent) -> Result<(), Box<dyn std::error::Error>> {
    // The syslog crate has no LOG_AUDIT facility; LOG_AUTHPRIV is the
    // conventional facility for security and audit messages
    let formatter = Formatter3164 {
        facility: Facility::LOG_AUTHPRIV,
        hostname: None,
        process: "heliosdb".to_string(),
        pid: std::process::id(),
    };

    let mut writer = syslog::unix(formatter)?;

    let message = format!(
        "user={} operation={} target={} success={} checksum={}",
        event.user,
        event.operation,
        event.target.as_deref().unwrap_or("N/A"),
        event.success,
        event.checksum
    );

    writer.info(&message)?;
    Ok(())
}

Kafka Streaming

use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::config::ClientConfig;

async fn stream_to_kafka(
    logger: &AuditLogger,
    brokers: &str,
    topic: &str
) -> Result<(), Box<dyn std::error::Error>> {
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("message.timeout.ms", "5000")
        .create()?;

    // Continuously stream audit events that have not yet been forwarded
    loop {
        let events = logger.query_audit_log(
            "id > (SELECT COALESCE(MAX(id), 0) FROM kafka_audit_offset)"
        )?;

        let parsed_events = AuditQuery::parse_events(events)?;

        for event in parsed_events {
            let payload = serde_json::to_string(&event)?;
            // Bind the key to a local so the record's borrow outlives the send
            let key = event.id.to_string();

            let record = FutureRecord::to(topic)
                .key(&key)
                .payload(&payload);

            // send() resolves to Result<_, (KafkaError, OwnedMessage)>, which is
            // not a std::error::Error; extract the KafkaError before using `?`
            producer.send(record, std::time::Duration::from_secs(0))
                .await
                .map_err(|(err, _msg)| err)?;

            // Persist event.id into kafka_audit_offset here so restarts
            // do not re-send already-forwarded events
        }

        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
    }
    }
}
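The query above assumes a `kafka_audit_offset` table recording the last forwarded event id, but the bookkeeping itself is left implicit. It can be sketched in plain Rust (the function and variable names here are illustrative, not part of the HeliosDB-Lite API):

```rust
// Sketch: keep only events newer than the stored offset, then advance it.
fn take_new_events(event_ids: &[u64], last_forwarded: &mut u64) -> Vec<u64> {
    let new: Vec<u64> = event_ids
        .iter()
        .copied()
        .filter(|id| *id > *last_forwarded)
        .collect();
    if let Some(max) = new.iter().max() {
        // In the real pipeline, write this value back to kafka_audit_offset
        // after the batch is acknowledged by the broker
        *last_forwarded = *max;
    }
    new
}

fn main() {
    let mut offset = 3u64;
    let batch = take_new_events(&[1, 2, 3, 4, 5], &mut offset);
    println!("{:?} offset={}", batch, offset); // forwards only 4 and 5
}
```

Persisting the offset only after the broker acknowledges the batch means a crash at worst re-sends a batch rather than silently dropping one.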

Alert Configuration

Configure real-time alerts based on audit events.

Failed Login Alerts

use lettre::transport::smtp::authentication::Credentials;
use lettre::{Message, SmtpTransport, Transport};

fn setup_failed_login_alert(logger: &AuditLogger) -> Result<(), Box<dyn std::error::Error>> {
    // Monitor for failed login attempts
    let failed_logins = logger.query_audit_log(
        "operation = 'LOGIN' AND success = false AND timestamp >= NOW() - INTERVAL '5 minutes'"
    )?;

    let parsed_events = AuditQuery::parse_events(failed_logins)?;

    if parsed_events.len() >= 5 {
        // Send alert email
        send_alert_email(
            "security@example.com",
            "Multiple Failed Login Attempts Detected",
            &format!("{} failed login attempts in last 5 minutes", parsed_events.len())
        )?;
    }

    Ok(())
}

fn send_alert_email(
    to: &str,
    subject: &str,
    body: &str
) -> Result<(), Box<dyn std::error::Error>> {
    let email = Message::builder()
        .from("heliosdb@example.com".parse()?)
        .to(to.parse()?)
        .subject(subject)
        .body(body.to_string())?;

    let creds = Credentials::new(
        "smtp_username".to_string(),
        "smtp_password".to_string()
    );

    let mailer = SmtpTransport::relay("smtp.example.com")?
        .credentials(creds)
        .build();

    mailer.send(&email)?;
    Ok(())
}
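Run on a fixed schedule, the check above re-alerts every time the 5-minute window still contains five or more failures, so one incident can generate a stream of identical emails. A simple cooldown gate suppresses the repeats (a sketch; the `AlertGate` type is illustrative, not part of the HeliosDB-Lite API):

```rust
use std::time::{Duration, Instant};

// Suppress repeat alerts for a cooldown period after one fires.
struct AlertGate {
    cooldown: Duration,
    last_fired: Option<Instant>,
}

impl AlertGate {
    fn should_fire(&mut self, failed_count: usize, threshold: usize) -> bool {
        let in_cooldown = self
            .last_fired
            .map_or(false, |t| t.elapsed() < self.cooldown);
        if failed_count >= threshold && !in_cooldown {
            self.last_fired = Some(Instant::now());
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut gate = AlertGate {
        cooldown: Duration::from_secs(300),
        last_fired: None,
    };
    println!("{}", gate.should_fire(7, 5)); // fires: over threshold, no cooldown
    println!("{}", gate.should_fire(9, 5)); // suppressed: still in cooldown
}
```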

Suspicious Activity Detection

fn detect_suspicious_activity(logger: &AuditLogger) -> Result<(), Box<dyn std::error::Error>> {
    // Pattern 1: Mass deletion
    let mass_deletes = logger.query_audit_log(
        "operation = 'DELETE' AND affected_rows > 1000 AND timestamp >= NOW() - INTERVAL '1 hour'"
    )?;

    if !mass_deletes.is_empty() {
        alert_security_team("Mass deletion detected", &mass_deletes)?;
    }

    // Pattern 2: After-hours access
    let after_hours = logger.query_audit_log(
        "EXTRACT(HOUR FROM timestamp) NOT BETWEEN 6 AND 22"
    )?;

    if !after_hours.is_empty() {
        alert_security_team("After-hours database access", &after_hours)?;
    }

    // Pattern 3: Schema changes in production
    let ddl_changes = logger.query_audit_log(
        "operation IN ('DROP_TABLE', 'ALTER_TABLE') AND timestamp >= NOW() - INTERVAL '1 hour'"
    )?;

    if !ddl_changes.is_empty() {
        alert_security_team("Schema changes detected", &ddl_changes)?;
    }

    Ok(())
}

fn alert_security_team(
    alert_type: &str,
    events: &[Tuple]
) -> Result<(), Box<dyn std::error::Error>> {
    println!("SECURITY ALERT: {}", alert_type);
    println!("Events: {}", events.len());
    // Send notification (email, Slack, PagerDuty, etc.)
    Ok(())
}

8. Tamper Detection

Checksum Verification

Every audit event includes a SHA-256 checksum for tamper detection.

Understanding Checksums

The checksum is computed over the following event fields:

  • Event ID
  • Timestamp
  • Session ID
  • User
  • Operation
  • Target
  • Query
  • Affected rows
  • Success status
  • Error message

Example checksum calculation:

use sha2::{Sha256, Digest};

fn calculate_checksum(event: &AuditEvent) -> String {
    let mut hasher = Sha256::new();

    // Add all event fields to hash
    hasher.update(event.id.to_string());
    hasher.update(event.timestamp.to_rfc3339());
    hasher.update(&event.session_id);
    hasher.update(&event.user);
    hasher.update(event.operation.to_string());

    if let Some(target) = &event.target {
        hasher.update(target);
    }

    hasher.update(&event.query);
    hasher.update(event.affected_rows.to_string());
    hasher.update(event.success.to_string());

    if let Some(error) = &event.error {
        hasher.update(error);
    }

    // Return hex-encoded hash
    format!("{:x}", hasher.finalize())
}

Verifying Event Integrity

fn verify_audit_log_integrity(logger: &AuditLogger) -> Result<(), Box<dyn std::error::Error>> {
    // Retrieve all audit events
    let events = logger.query_audit_log("")?;
    let parsed_events = AuditQuery::parse_events(events)?;

    let mut tampered_count = 0;

    for event in parsed_events {
        if !event.verify_checksum() {
            eprintln!("TAMPER DETECTED: Event {} has invalid checksum", event.id);
            eprintln!("  User: {}", event.user);
            eprintln!("  Operation: {}", event.operation);
            eprintln!("  Timestamp: {}", event.timestamp);
            eprintln!("  Stored checksum: {}", event.checksum);

            tampered_count += 1;
        }
    }

    if tampered_count > 0 {
        eprintln!("WARNING: {} tampered audit events detected!", tampered_count);
    } else {
        println!("Audit log integrity verified: All checksums valid");
    }

    Ok(())
}

Batch Verification Query

-- Find events with obviously malformed checksums
-- (full tamper detection recomputes SHA-256 in application code;
-- this query only catches checksums that cannot possibly be valid)
SELECT
    id,
    timestamp,
    user,
    operation,
    checksum
FROM __audit_log
WHERE LENGTH(checksum) != 64        -- SHA-256 digests are always 64 hex chars
   OR checksum GLOB '*[^0-9a-f]*';  -- contains a non-hex character anywhere
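The same sanity check is easy to mirror in application code when events arrive through an export pipeline rather than SQL (a minimal sketch; `looks_like_sha256` is a hypothetical helper, not part of the HeliosDB-Lite API):

```rust
// A SHA-256 hex digest is exactly 64 characters drawn from [0-9a-f].
fn looks_like_sha256(checksum: &str) -> bool {
    checksum.len() == 64
        && checksum
            .chars()
            .all(|c| c.is_ascii_hexdigit() && !c.is_ascii_uppercase())
}

fn main() {
    assert!(looks_like_sha256(&"a".repeat(64)));
    assert!(!looks_like_sha256("deadbeef"));      // too short
    assert!(!looks_like_sha256(&"G".repeat(64))); // not hex
    println!("format checks passed");
}
```

As with the SQL version, passing this check only means the checksum is well-formed; integrity still requires recomputing and comparing the digest.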

Chain of Custody

Implement a chain-of-custody model where each event's checksum includes the previous event's checksum:

use sha2::{Sha256, Digest};

fn calculate_chained_checksum(event: &AuditEvent, previous_checksum: &str) -> String {
    let mut hasher = Sha256::new();

    // Include previous checksum in hash
    hasher.update(previous_checksum);

    // Add current event fields
    hasher.update(event.id.to_string());
    hasher.update(event.timestamp.to_rfc3339());
    hasher.update(&event.session_id);
    hasher.update(&event.user);
    hasher.update(event.operation.to_string());

    if let Some(target) = &event.target {
        hasher.update(target);
    }

    hasher.update(&event.query);
    hasher.update(event.affected_rows.to_string());
    hasher.update(event.success.to_string());

    if let Some(error) = &event.error {
        hasher.update(error);
    }

    format!("{:x}", hasher.finalize())
}

fn verify_chain_of_custody(logger: &AuditLogger) -> Result<bool, Box<dyn std::error::Error>> {
    let events = logger.query_audit_log("")?;
    let parsed_events = AuditQuery::parse_events(events)?;

    if parsed_events.is_empty() {
        return Ok(true);
    }

    // Verify first event
    if !parsed_events[0].verify_checksum() {
        eprintln!("Chain broken: First event checksum invalid");
        return Ok(false);
    }

    // Verify chain integrity
    for i in 1..parsed_events.len() {
        let prev_checksum = &parsed_events[i - 1].checksum;
        let current_event = &parsed_events[i];

        let expected_checksum = calculate_chained_checksum(current_event, prev_checksum);

        if expected_checksum != current_event.checksum {
            eprintln!("Chain broken at event {}: Checksum mismatch", current_event.id);
            eprintln!("  Expected: {}", expected_checksum);
            eprintln!("  Found: {}", current_event.checksum);
            return Ok(false);
        }
    }

    println!("Chain of custody verified: All {} events linked correctly", parsed_events.len());
    Ok(true)
}
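To see why chaining catches edits anywhere upstream, here is a dependency-free illustration using std's `DefaultHasher` as a stand-in for SHA-256: altering one record changes every checksum from that point forward, so verification fails at the first tampered event.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Stand-in for SHA-256: fold each record's hash with the previous one.
fn chained_hash(prev: u64, record: &str) -> u64 {
    let mut h = DefaultHasher::new();
    prev.hash(&mut h);
    record.hash(&mut h);
    h.finish()
}

fn main() {
    let records = ["alice INSERT users", "bob DELETE orders", "carol SELECT logs"];

    // Build the chain over the original records.
    let mut chain = Vec::new();
    let mut prev = 0u64;
    for r in &records {
        prev = chained_hash(prev, r);
        chain.push(prev);
    }

    // Tamper with the middle record and re-verify the chain.
    let tampered = ["alice INSERT users", "bob DELETE nothing", "carol SELECT logs"];
    let mut prev = 0u64;
    for (i, r) in tampered.iter().enumerate() {
        prev = chained_hash(prev, r);
        if prev != chain[i] {
            println!("chain broken at event {}", i); // prints "chain broken at event 1"
            break;
        }
    }
}
```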

Forensic Analysis

Use audit logs and checksums for forensic investigation:

Identify Tampering Window

-- Find the time range where tampering might have occurred
-- by identifying gaps in event IDs or timestamps.
-- Window functions cannot appear in WHERE, so compute them in a CTE first.
WITH ordered AS (
    SELECT
        id,
        timestamp,
        LAG(id) OVER (ORDER BY id) AS prev_id,
        LAG(timestamp) OVER (ORDER BY timestamp) AS prev_timestamp
    FROM __audit_log
)
SELECT
    id,
    timestamp,
    prev_id,
    prev_timestamp,
    (id - prev_id) AS id_gap,
    EXTRACT(EPOCH FROM (timestamp - prev_timestamp)) AS time_gap_seconds
FROM ordered
WHERE (id - prev_id) > 1  -- Missing IDs
   OR EXTRACT(EPOCH FROM (timestamp - prev_timestamp)) < 0  -- Time went backwards
ORDER BY id;

Reconstruct Missing Events

fn detect_missing_events(logger: &AuditLogger) -> Result<Vec<u64>, Box<dyn std::error::Error>> {
    let events = logger.query_audit_log("ORDER BY id")?;
    let parsed_events = AuditQuery::parse_events(events)?;

    let mut missing_ids = Vec::new();

    for i in 1..parsed_events.len() {
        let prev_id = parsed_events[i - 1].id;
        let curr_id = parsed_events[i].id;

        // Check for gaps in ID sequence
        if curr_id - prev_id > 1 {
            for missing_id in (prev_id + 1)..curr_id {
                eprintln!("WARNING: Event ID {} is missing from audit log", missing_id);
                missing_ids.push(missing_id);
            }
        }
    }

    if !missing_ids.is_empty() {
        eprintln!("Forensic analysis: {} events missing or deleted", missing_ids.len());
    }

    Ok(missing_ids)
}

Timeline Reconstruction

use chrono::{DateTime, Utc};

#[derive(Debug)]
struct AuditTimeline {
    events: Vec<TimelineEvent>,
    anomalies: Vec<String>,
}

#[derive(Debug)]
struct TimelineEvent {
    id: u64,
    timestamp: DateTime<Utc>,
    user: String,
    operation: String,
    target: Option<String>,
    integrity_verified: bool,
}

fn reconstruct_timeline(logger: &AuditLogger) -> Result<AuditTimeline, Box<dyn std::error::Error>> {
    let events = logger.query_audit_log("ORDER BY timestamp")?;
    let parsed_events = AuditQuery::parse_events(events)?;

    let mut timeline = AuditTimeline {
        events: Vec::new(),
        anomalies: Vec::new(),
    };

    for event in parsed_events {
        // Verify integrity
        let integrity_verified = event.verify_checksum();

        if !integrity_verified {
            timeline.anomalies.push(
                format!("Event {} has invalid checksum at {}", event.id, event.timestamp)
            );
        }

        timeline.events.push(TimelineEvent {
            id: event.id,
            timestamp: event.timestamp,
            user: event.user.clone(),
            operation: event.operation.to_string(),
            target: event.target.clone(),
            integrity_verified,
        });
    }

    // Detect time anomalies
    for i in 1..timeline.events.len() {
        if timeline.events[i].timestamp < timeline.events[i - 1].timestamp {
            timeline.anomalies.push(
                format!("Time went backwards between events {} and {}",
                    timeline.events[i - 1].id,
                    timeline.events[i].id)
            );
        }
    }

    Ok(timeline)
}

Summary

This tutorial covered:

  1. Introduction - Understanding audit logging, compliance requirements, and importance
  2. Configuration - Enabling and customizing audit logging with presets and TOML
  3. Event Types - DDL, DML, security, and query events
  4. Querying Audit Logs - SQL queries and programmatic interfaces
  5. Log Management - Rotation, archival, and retention policies
  6. Compliance Scenarios - SOC2, HIPAA, and GDPR implementations
  7. Integration - SIEM systems, log forwarding, and alerting
  8. Tamper Detection - Checksum verification, chain of custody, and forensic analysis

Next Steps

  • Explore Examples: Run cargo run --example audit_demo to see audit logging in action
  • Review Documentation: Read /docs/features/AUDIT_LOGGING.md for detailed API reference
  • Run Tests: Execute cargo test --test audit_tests to understand test patterns
  • Implement Compliance: Use configuration presets for your regulatory requirements
  • Set Up Monitoring: Configure alerts and SIEM integration for your environment

Questions or Feedback? Open an issue on the HeliosDB-Lite repository or consult the community forum.