You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
NORM/compliance_module/audit_logger.py

260 lines
8.3 KiB
Python

import datetime
import inspect
import json
import logging
import os
import time
import uuid
from functools import wraps
from typing import Dict, Any, Optional, List, Union

from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
class AuditLogger:
    """
    Audit logger for HIPAA compliance.

    Records data access and modification events as one JSON document per log
    record on the ``"audit"`` logger.
    """

    def __init__(self, log_file: Optional[str] = None):
        """
        Initialize the audit logger.

        The underlying ``logging.getLogger("audit")`` object is process-wide,
        so it is only configured the first time (while it has no handlers).
        Later instances reuse that configuration and their ``log_file``
        argument has no effect.

        Args:
            log_file: Optional path to the audit log file. When omitted,
                records are written through a ``StreamHandler``.
        """
        self.logger = logging.getLogger("audit")

        # Configure the audit logger only if nobody has done so already;
        # adding a handler on every instantiation would duplicate records.
        if not self.logger.handlers:
            if log_file:
                handler = logging.FileHandler(log_file)
            else:
                handler = logging.StreamHandler()

            handler.setFormatter(
                logging.Formatter(
                    '%(asctime)s [AUDIT] [%(levelname)s] %(message)s',
                    datefmt='%Y-%m-%dT%H:%M:%S%z',
                )
            )
            self.logger.addHandler(handler)

            # An explicit level keeps INFO audit records flowing even when the
            # root logger is configured with a higher threshold.
            self.logger.setLevel(logging.INFO)

            # Audit records are handled only by the handler above, not by
            # whatever handlers are attached to ancestor loggers.
            self.logger.propagate = False

    def log_event(self, event_type: str, user_id: str, resource_type: str = None,
                  resource_id: str = None, action: str = None,
                  details: Dict[str, Any] = None):
        """
        Log an audit event.

        The event is serialized to JSON and emitted at INFO level. Each event
        gets a fresh UUID4 ``event_id`` and a timezone-aware UTC ``timestamp``
        in ISO 8601 form (with an explicit ``+00:00`` offset).

        Args:
            event_type: Type of event (access, create, update, delete)
            user_id: ID of the user performing the action
            resource_type: Type of resource being accessed
            resource_id: ID of the resource being accessed
            action: Action being performed
            details: Additional details about the event; ``None`` is recorded
                as an empty object
        """
        event = {
            # Timezone-aware so the record is unambiguous without relying on
            # the reader knowing the value is UTC.
            "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
            "event_id": str(uuid.uuid4()),
            "event_type": event_type,
            "user_id": user_id,
            "resource_type": resource_type,
            "resource_id": resource_id,
            "action": action,
            "details": details or {},
        }

        # default=str is deliberate: an audit record must not be dropped (nor
        # the caller crashed) because ``details`` carries a value such as a
        # datetime or UUID that json cannot encode natively.
        self.logger.info(json.dumps(event, default=str))

    def log_access(self, user_id: str, resource_type: str, resource_id: str = None,
                   details: Dict[str, Any] = None):
        """
        Log a resource access event (``event_type="access"``, ``action="read"``).

        Args:
            user_id: ID of the user accessing the resource
            resource_type: Type of resource being accessed
            resource_id: ID of the resource being accessed
            details: Additional details about the access
        """
        self.log_event(
            event_type="access",
            user_id=user_id,
            resource_type=resource_type,
            resource_id=resource_id,
            action="read",
            details=details,
        )

    def log_create(self, user_id: str, resource_type: str, resource_id: str,
                   details: Dict[str, Any] = None):
        """
        Log a resource creation event (``event_type="create"``).

        Args:
            user_id: ID of the user creating the resource
            resource_type: Type of resource being created
            resource_id: ID of the created resource
            details: Additional details about the creation
        """
        self.log_event(
            event_type="create",
            user_id=user_id,
            resource_type=resource_type,
            resource_id=resource_id,
            action="create",
            details=details,
        )

    def log_update(self, user_id: str, resource_type: str, resource_id: str,
                   details: Dict[str, Any] = None):
        """
        Log a resource update event (``event_type="update"``).

        Args:
            user_id: ID of the user updating the resource
            resource_type: Type of resource being updated
            resource_id: ID of the updated resource
            details: Additional details about the update
        """
        self.log_event(
            event_type="update",
            user_id=user_id,
            resource_type=resource_type,
            resource_id=resource_id,
            action="update",
            details=details,
        )

    def log_delete(self, user_id: str, resource_type: str, resource_id: str,
                   details: Dict[str, Any] = None):
        """
        Log a resource deletion event (``event_type="delete"``).

        Args:
            user_id: ID of the user deleting the resource
            resource_type: Type of resource being deleted
            resource_id: ID of the deleted resource
            details: Additional details about the deletion
        """
        self.log_event(
            event_type="delete",
            user_id=user_id,
            resource_type=resource_type,
            resource_id=resource_id,
            action="delete",
            details=details,
        )
# Module-level AuditLogger instance used by AuditMiddleware and the
# audit_access decorator in this module. It is constructed without a log_file,
# so unless the "audit" logger already has handlers, events are written
# through a StreamHandler.
audit_logger = AuditLogger()
class AuditMiddleware(BaseHTTPMiddleware):
    """
    Middleware for auditing API requests.

    Emits one audit event per request through the module-level
    ``audit_logger``: ``api_request`` when the downstream handler returns a
    response, ``api_error`` when it raises.
    """

    @staticmethod
    def _resolve_user_id(request: Request) -> str:
        """Return ``request.state.user.user_id``, or ``"unknown"`` if absent."""
        user = getattr(request.state, "user", None)
        return getattr(user, "user_id", "unknown")

    def _audit(self, request: Request, event_type: str, outcome: Dict[str, Any],
               started: float, user_id_before: str) -> None:
        """Write one audit event for ``request`` with the given outcome entry."""
        # Re-read the user after the handler has run: a user attached to
        # request.state while the request was being processed is not visible
        # at the start of dispatch. Fall back to the value seen up front.
        # NOTE(review): assumes downstream code shares this request's state —
        # confirm for the Starlette version in use.
        user_id = self._resolve_user_id(request)
        if user_id == "unknown":
            user_id = user_id_before

        # NOTE(review): the full URL, including any query string, is recorded;
        # confirm query parameters never carry sensitive data.
        details = {"url": str(request.url)}
        details.update(outcome)
        details["client_host"] = request.client.host if request.client else "unknown"
        # Monotonic clock: unaffected by wall-clock adjustments, so the
        # duration can never come out negative or wildly off.
        details["duration_ms"] = int((time.monotonic() - started) * 1000)

        audit_logger.log_event(
            event_type=event_type,
            user_id=user_id,
            action=request.method,
            details=details,
        )

    async def dispatch(self, request: Request, call_next):
        """
        Process a request and log audit information.

        Args:
            request: The request object
            call_next: The next middleware or route handler

        Returns:
            The response produced by ``call_next``

        Raises:
            Exception: Whatever ``call_next`` raises is re-raised unchanged
                after an ``api_error`` event has been logged.
        """
        started = time.monotonic()
        user_id_before = self._resolve_user_id(request)

        try:
            response = await call_next(request)
        except Exception as exc:
            self._audit(request, "api_error", {"error": str(exc)},
                        started, user_id_before)
            raise

        # Logged outside the try block so that a failure while writing the
        # audit record is not itself reported as an API error.
        self._audit(request, "api_request", {"status_code": response.status_code},
                    started, user_id_before)
        return response
def _audit_user_id(user: Any) -> Any:
    """Extract ``user_id`` from a mapping-style or attribute-style user object."""
    if user is None:
        return "unknown"
    try:
        if "user_id" in user:
            return user["user_id"]
    except TypeError:
        # Not a container (or not indexable by key): try attribute access.
        pass
    return getattr(user, "user_id", "unknown")


def audit_access(resource_type: str):
    """
    Decorator for auditing resource access.

    Before the wrapped callable runs, an access event is written through the
    module-level ``audit_logger``. The user is taken from the ``user`` keyword
    argument (either ``user["user_id"]`` or ``user.user_id``; ``"unknown"``
    when neither is available) and the resource from the ``resource_id``
    keyword argument (``None`` when absent). Positional arguments are not
    inspected.

    Coroutine functions are wrapped in a coroutine function, so the decorated
    callable is still recognised as async by ``inspect.iscoroutinefunction``.

    Args:
        resource_type: Type of resource being accessed

    Returns:
        Decorated function
    """
    def decorator(func):
        def record(kwargs):
            audit_logger.log_access(
                user_id=_audit_user_id(kwargs.get("user")),
                resource_type=resource_type,
                resource_id=kwargs.get("resource_id", None),
            )

        if inspect.iscoroutinefunction(func):
            @wraps(func)
            async def async_wrapper(*args, **kwargs):
                record(kwargs)
                return await func(*args, **kwargs)
            return async_wrapper

        @wraps(func)
        def wrapper(*args, **kwargs):
            record(kwargs)
            return func(*args, **kwargs)
        return wrapper
    return decorator