Add initial implementation of FHIR OCR POC

- Created a Docker Compose setup for the FHIR OCR application, including services for the main app, Keycloak for authentication, and a HAPI FHIR server.
- Added a README file detailing project structure, features, requirements, and setup instructions.
- Included necessary Python dependencies in requirements.txt.
- Implemented core modules for OCR processing, FHIR resource mapping, and security features.
- Developed test scripts for API security and OCR to FHIR flow.
- Established compliance and privacy modules for audit logging and data protection.
- Created sample data generation script for testing purposes.
- Set up a basic FastAPI application structure with endpoints for authentication and FHIR resource management.
main
jac is jake 1 year ago
commit c25548a2d7

@ -0,0 +1,172 @@
# FHIR-Based Healthcare Document Processing POC
This proof-of-concept application demonstrates a secure, FHIR-compliant system for processing healthcare documents with OCR and storing the extracted data via a FHIR API.
## Project Structure
- `ocr_module/`: OCR functionality using Tesseract for document text extraction
- `fhir_module/`: FHIR API implementation using HAPI FHIR for data storage and retrieval
- `security_module/`: Authentication and authorization using OAuth2/OpenID Connect
- `compliance_module/`: Audit logging and compliance features
- `api/`: Main application API that integrates all modules
- `docker/`: Docker and docker-compose configuration files
## Features
- Document processing with OCR to extract healthcare information
- FHIR-compliant data storage and retrieval
- OAuth2/OpenID Connect authentication
- Comprehensive audit logging
- Role-based access control
- Local deployment with Docker
## Requirements
- Docker and Docker Compose
- Python 3.9+
- Tesseract OCR engine (automatically installed in Docker)
## Getting Started
### Using Docker Compose (Recommended)
The easiest way to run the application is using Docker Compose:
1. Clone the repository
2. Navigate to the project root directory
3. Run Docker Compose:
```bash
docker-compose up
```
This will start the following services:
- FHIR OCR application at http://localhost:8000
- Keycloak authentication server at http://localhost:8181
- HAPI FHIR server at http://localhost:8090 (included but not integrated in the POC)
### Manual Setup
If you prefer to run the application without Docker:
1. Install Tesseract OCR engine on your system:
- **Ubuntu/Debian**: `sudo apt-get install tesseract-ocr`
- **macOS**: `brew install tesseract`
- **Windows**: Download installer from [Tesseract GitHub page](https://github.com/UB-Mannheim/tesseract/wiki)
2. Create and activate a Python virtual environment:
```bash
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
```
3. Install Python dependencies:
```bash
pip install -r requirements.txt
```
4. Run the application:
```bash
uvicorn api.app:app --host 0.0.0.0 --port 8000
```
## Usage
### 1. Authentication
To access the API, you need to obtain an authentication token:
```bash
curl -X POST http://localhost:8000/auth/token \
-H "Content-Type: application/json" \
-d '{"username": "admin", "password": "password"}'
```
This will return a JSON response with an access token:
```json
{
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"token_type": "bearer"
}
```
### 2. Processing a Document
Use the token to upload and process a document:
```bash
curl -X POST http://localhost:8000/ocr/process \
-H "Authorization: Bearer YOUR_TOKEN_HERE" \
-F "file=@/path/to/document.jpg" \
-F "process_as=insurance_card"
```
The API will return the OCR results and the IDs of the created FHIR resources.
### 3. Retrieving FHIR Resources
Get a patient resource:
```bash
curl -X GET http://localhost:8000/fhir/Patient/PATIENT_ID \
-H "Authorization: Bearer YOUR_TOKEN_HERE"
```
Get an observation resource:
```bash
curl -X GET http://localhost:8000/fhir/Observation/OBSERVATION_ID \
-H "Authorization: Bearer YOUR_TOKEN_HERE"
```
### 4. Testing the OCR Flow Locally
For testing the OCR to FHIR flow without the API, use the provided test script:
```bash
python test_ocr_flow.py --image sample_data/your_image.jpg --output test_results
```
## Keycloak Setup
The Docker Compose configuration includes a Keycloak server for authentication.
For production use, you would need to:
1. Access the Keycloak admin console at http://localhost:8181
2. Log in with username `admin` and password `admin`
3. Create a new realm (e.g., `fhir-ocr`)
4. Create a new client (e.g., `fhir-ocr-client`)
5. Configure client access type as "confidential"
6. Add redirect URIs for your application
7. Create roles (e.g., `user`, `admin`)
8. Create users and assign roles
## Security and Compliance
The application includes several security and compliance features:
- **Authentication**: OAuth2/OpenID Connect with Keycloak
- **Authorization**: Role-based access control
- **Audit Logging**: All API calls and data access are logged
- **Privacy Filtering**: Sensitive data can be masked or redacted
## Docker Environment Variables
The following environment variables can be set in the docker-compose.yml file:
- `ENVIRONMENT`: `development` or `production`
- `JWT_SECRET_KEY`: Secret key for JWT token signing
- `JWT_ALGORITHM`: Algorithm for JWT token signing
- `ACCESS_TOKEN_EXPIRE_MINUTES`: Token expiration time in minutes
## API Documentation
API documentation is available at http://localhost:8000/docs when the application is running.
## License
This project uses open-source components:
- Tesseract OCR (Apache License 2.0)
- HAPI FHIR (Apache License 2.0)
- Keycloak (Apache License 2.0)

@ -0,0 +1,3 @@
"""
Main API module that integrates all components of the FHIR OCR application.
"""

@ -0,0 +1,343 @@
import os
import logging
import tempfile
from typing import Dict, Any, List, Optional, Union
from pathlib import Path
import json
import uvicorn
from fastapi import FastAPI, Depends, File, UploadFile, HTTPException, Form, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import JSONResponse
from pydantic import BaseModel
# Import modules
from ocr_module.ocr_processor import OCRProcessor
from ocr_module.fhir_mapper import FHIRMapper
from fhir_module.fhir_repository import FHIRRepository
from security_module.auth import auth_handler
from compliance_module.audit_logger import audit_logger, AuditMiddleware
from compliance_module.privacy_filter import privacy_filter
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
# Create FastAPI app
app = FastAPI(
title="FHIR OCR API",
description="API for processing healthcare documents with OCR and storing extracted data via FHIR",
version="0.1.0"
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Adjust in production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Add audit middleware
app.add_middleware(AuditMiddleware)
# Initialize components
ocr_processor = OCRProcessor()
fhir_mapper = FHIRMapper()
fhir_repository = FHIRRepository(storage_dir=os.path.join(os.getcwd(), "fhir_storage"))
# Security scheme for Swagger UI
security_scheme = HTTPBearer()
# Pydantic models for API requests/responses
class TokenRequest(BaseModel):
username: str
password: str
class TokenResponse(BaseModel):
access_token: str
token_type: str = "bearer"
class OCRRequest(BaseModel):
image_url: Optional[str] = None
process_as: Optional[str] = "auto" # 'insurance_card', 'lab_result', etc.
class OCRResponse(BaseModel):
raw_text: str
structured_data: Dict[str, Any]
confidence: float
patient_id: Optional[str] = None
observation_id: Optional[str] = None
class ResourceResponse(BaseModel):
resource_type: str
id: str
data: Dict[str, Any]
# API routes
@app.post("/auth/token", response_model=TokenResponse, tags=["Authentication"])
async def login_for_access_token(form_data: TokenRequest):
"""
Get an access token for API authentication.
"""
# This is a simplified authentication for the POC
# In production, this would validate credentials against a user database
# For POC, we just check if the username is not empty
if not form_data.username:
raise HTTPException(status_code=400, detail="Invalid username")
# Determine roles based on username (for demonstration)
roles = ["user"]
if form_data.username == "admin":
roles.append("admin")
# Create access token
access_token = auth_handler.create_access_token(form_data.username, roles)
# Log the authentication
audit_logger.log_event(
event_type="authentication",
user_id=form_data.username,
action="login",
details={"roles": roles}
)
return {"access_token": access_token, "token_type": "bearer"}
@app.post("/ocr/process", response_model=OCRResponse, tags=["OCR"])
async def process_document(
file: UploadFile = File(...),
process_as: str = Form("auto"),
user: Dict[str, Any] = Depends(auth_handler.get_current_user)
):
"""
Process a document with OCR and extract healthcare data.
Optionally specify how to process the document (insurance_card, lab_result, etc.)
"""
try:
# Save uploaded file to temporary location
with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(file.filename)[1]) as temp_file:
temp_file.write(await file.read())
temp_file_path = temp_file.name
# Process image with OCR
ocr_result = ocr_processor.process_image(temp_file_path)
# If process_as is specified, override the detected document type
if process_as != "auto":
ocr_result["structured_data"]["document_type"] = process_as
# Map OCR data to FHIR resources
patient = fhir_mapper.map_to_patient(ocr_result)
# Create patient resource in FHIR repository
patient_data = fhir_repository.create_resource(patient)
# Log the creation
audit_logger.log_create(
user_id=user.get("user_id", "unknown"),
resource_type="Patient",
resource_id=patient.id,
details={"document_type": ocr_result["structured_data"]["document_type"]}
)
# Map to observation if applicable
observation_id = None
if ocr_result["structured_data"]["document_type"] in ["lab_result", "prescription"]:
observation = fhir_mapper.map_to_observation(ocr_result, patient.id)
if observation:
observation_data = fhir_repository.create_resource(observation)
observation_id = observation.id
# Log the creation
audit_logger.log_create(
user_id=user.get("user_id", "unknown"),
resource_type="Observation",
resource_id=observation.id,
details={"document_type": ocr_result["structured_data"]["document_type"]}
)
# Clean up temporary file
os.unlink(temp_file_path)
return {
"raw_text": ocr_result["raw_text"],
"structured_data": ocr_result["structured_data"],
"confidence": ocr_result["confidence"],
"patient_id": patient.id,
"observation_id": observation_id
}
except Exception as e:
logger.error(f"Error processing document: {str(e)}")
raise HTTPException(status_code=500, detail=f"Error processing document: {str(e)}")
@app.get("/fhir/Patient/{patient_id}", response_model=ResourceResponse, tags=["FHIR"])
async def get_patient(
patient_id: str,
mask_sensitive: bool = Query(False, description="Whether to mask sensitive information"),
user: Dict[str, Any] = Depends(auth_handler.has_role(["user", "admin"]))
):
"""
Get a patient resource by ID.
"""
try:
# Get patient resource
patient_data = fhir_repository.read_resource("Patient", patient_id)
# Log the access
audit_logger.log_access(
user_id=user.get("user_id", "unknown"),
resource_type="Patient",
resource_id=patient_id
)
# Apply privacy filter if requested
if mask_sensitive:
patient_data = privacy_filter.filter_resource(patient_data, "Patient")
return {
"resource_type": "Patient",
"id": patient_id,
"data": patient_data
}
except FileNotFoundError:
raise HTTPException(status_code=404, detail=f"Patient {patient_id} not found")
except Exception as e:
logger.error(f"Error retrieving patient: {str(e)}")
raise HTTPException(status_code=500, detail=f"Error retrieving patient: {str(e)}")
@app.get("/fhir/Observation/{observation_id}", response_model=ResourceResponse, tags=["FHIR"])
async def get_observation(
observation_id: str,
mask_sensitive: bool = Query(False, description="Whether to mask sensitive information"),
user: Dict[str, Any] = Depends(auth_handler.has_role(["user", "admin"]))
):
"""
Get an observation resource by ID.
"""
try:
# Get observation resource
observation_data = fhir_repository.read_resource("Observation", observation_id)
# Log the access
audit_logger.log_access(
user_id=user.get("user_id", "unknown"),
resource_type="Observation",
resource_id=observation_id
)
# Apply privacy filter if requested
if mask_sensitive:
observation_data = privacy_filter.filter_resource(observation_data, "Observation")
return {
"resource_type": "Observation",
"id": observation_id,
"data": observation_data
}
except FileNotFoundError:
raise HTTPException(status_code=404, detail=f"Observation {observation_id} not found")
except Exception as e:
logger.error(f"Error retrieving observation: {str(e)}")
raise HTTPException(status_code=500, detail=f"Error retrieving observation: {str(e)}")
@app.get("/fhir/Patient", response_model=List[ResourceResponse], tags=["FHIR"])
async def search_patients(
name: Optional[str] = None,
gender: Optional[str] = None,
mask_sensitive: bool = Query(False, description="Whether to mask sensitive information"),
user: Dict[str, Any] = Depends(auth_handler.has_role(["user", "admin"]))
):
"""
Search for patients.
"""
try:
# Build search parameters
params = {}
if name:
params["name.family"] = name
if gender:
params["gender"] = gender
# Search for patients
patients = fhir_repository.search_resources("Patient", params)
# Log the search
audit_logger.log_access(
user_id=user.get("user_id", "unknown"),
resource_type="Patient",
details={"search_params": params}
)
# Apply privacy filter if requested
if mask_sensitive:
patients = [privacy_filter.filter_resource(p, "Patient") for p in patients]
return [
{
"resource_type": "Patient",
"id": p.get("id", "unknown"),
"data": p
}
for p in patients
]
except Exception as e:
logger.error(f"Error searching patients: {str(e)}")
raise HTTPException(status_code=500, detail=f"Error searching patients: {str(e)}")
@app.delete("/fhir/Patient/{patient_id}", tags=["FHIR"])
async def delete_patient(
patient_id: str,
user: Dict[str, Any] = Depends(auth_handler.has_role(["admin"]))
):
"""
Delete a patient resource (admin only).
"""
try:
# Delete patient
success = fhir_repository.delete_resource("Patient", patient_id)
if not success:
raise HTTPException(status_code=404, detail=f"Patient {patient_id} not found")
# Log the deletion
audit_logger.log_delete(
user_id=user.get("user_id", "unknown"),
resource_type="Patient",
resource_id=patient_id
)
return {"detail": f"Patient {patient_id} deleted"}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error deleting patient: {str(e)}")
raise HTTPException(status_code=500, detail=f"Error deleting patient: {str(e)}")
@app.get("/health", tags=["System"])
async def health_check():
"""
Health check endpoint.
"""
return {"status": "OK", "version": app.version}
if __name__ == "__main__":
# Create FHIR storage directories
for resource_type in ["Patient", "Observation"]:
os.makedirs(os.path.join(os.getcwd(), "fhir_storage", resource_type), exist_ok=True)
# Run server
uvicorn.run(app, host="0.0.0.0", port=8000)

@ -0,0 +1,3 @@
"""
Compliance Module for audit logging and data privacy.
"""

@ -0,0 +1,260 @@
import os
import json
import logging
import datetime
import uuid
from typing import Dict, Any, Optional, List, Union
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from functools import wraps
class AuditLogger:
"""
Audit logger for HIPAA compliance.
Records all data access and modifications for compliance and security audit purposes.
"""
def __init__(self, log_file: str = None):
"""
Initialize the audit logger.
Args:
log_file: Optional path to the audit log file
"""
self.logger = logging.getLogger("audit")
# Configure audit logger if not already configured
if not self.logger.handlers:
# Create a separate handler for audit logs
if log_file:
handler = logging.FileHandler(log_file)
else:
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s [AUDIT] [%(levelname)s] %(message)s',
datefmt='%Y-%m-%dT%H:%M:%S%z'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)
# Ensure audit logs are always written, even if root logger level is higher
self.logger.propagate = False
def log_event(self, event_type: str, user_id: str, resource_type: str = None,
resource_id: str = None, action: str = None, details: Dict[str, Any] = None):
"""
Log an audit event.
Args:
event_type: Type of event (access, create, update, delete)
user_id: ID of the user performing the action
resource_type: Type of resource being accessed
resource_id: ID of the resource being accessed
action: Action being performed
details: Additional details about the event
"""
event = {
"timestamp": datetime.datetime.utcnow().isoformat(),
"event_id": str(uuid.uuid4()),
"event_type": event_type,
"user_id": user_id,
"resource_type": resource_type,
"resource_id": resource_id,
"action": action,
"details": details or {}
}
# Log the event
self.logger.info(json.dumps(event))
def log_access(self, user_id: str, resource_type: str, resource_id: str = None,
details: Dict[str, Any] = None):
"""
Log a resource access event.
Args:
user_id: ID of the user accessing the resource
resource_type: Type of resource being accessed
resource_id: ID of the resource being accessed
details: Additional details about the access
"""
self.log_event(
event_type="access",
user_id=user_id,
resource_type=resource_type,
resource_id=resource_id,
action="read",
details=details
)
def log_create(self, user_id: str, resource_type: str, resource_id: str,
details: Dict[str, Any] = None):
"""
Log a resource creation event.
Args:
user_id: ID of the user creating the resource
resource_type: Type of resource being created
resource_id: ID of the created resource
details: Additional details about the creation
"""
self.log_event(
event_type="create",
user_id=user_id,
resource_type=resource_type,
resource_id=resource_id,
action="create",
details=details
)
def log_update(self, user_id: str, resource_type: str, resource_id: str,
details: Dict[str, Any] = None):
"""
Log a resource update event.
Args:
user_id: ID of the user updating the resource
resource_type: Type of resource being updated
resource_id: ID of the updated resource
details: Additional details about the update
"""
self.log_event(
event_type="update",
user_id=user_id,
resource_type=resource_type,
resource_id=resource_id,
action="update",
details=details
)
def log_delete(self, user_id: str, resource_type: str, resource_id: str,
details: Dict[str, Any] = None):
"""
Log a resource deletion event.
Args:
user_id: ID of the user deleting the resource
resource_type: Type of resource being deleted
resource_id: ID of the deleted resource
details: Additional details about the deletion
"""
self.log_event(
event_type="delete",
user_id=user_id,
resource_type=resource_type,
resource_id=resource_id,
action="delete",
details=details
)
# Create a global audit logger instance
audit_logger = AuditLogger()
class AuditMiddleware(BaseHTTPMiddleware):
"""
Middleware for auditing API requests.
Records all API requests for compliance and security audit purposes.
"""
async def dispatch(self, request: Request, call_next):
"""
Process a request and log audit information.
Args:
request: The request object
call_next: The next middleware or route handler
Returns:
The response
"""
# Get start time
start_time = datetime.datetime.utcnow()
# Get request details
method = request.method
url = str(request.url)
client_host = request.client.host if request.client else "unknown"
# Get user ID from request if available
user_id = "unknown"
if hasattr(request.state, "user") and hasattr(request.state.user, "user_id"):
user_id = request.state.user.user_id
try:
# Call the next middleware or route handler
response = await call_next(request)
# Get response status code
status_code = response.status_code
# Log the request
audit_logger.log_event(
event_type="api_request",
user_id=user_id,
action=method,
details={
"url": url,
"status_code": status_code,
"client_host": client_host,
"duration_ms": int((datetime.datetime.utcnow() - start_time).total_seconds() * 1000)
}
)
return response
except Exception as e:
# Log the error
audit_logger.log_event(
event_type="api_error",
user_id=user_id,
action=method,
details={
"url": url,
"error": str(e),
"client_host": client_host,
"duration_ms": int((datetime.datetime.utcnow() - start_time).total_seconds() * 1000)
}
)
# Re-raise the exception
raise
def audit_access(resource_type: str):
"""
Decorator for auditing resource access.
Args:
resource_type: Type of resource being accessed
Returns:
Decorated function
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Get user ID if available
user_id = "unknown"
if "user" in kwargs and "user_id" in kwargs["user"]:
user_id = kwargs["user"]["user_id"]
# Get resource ID if available
resource_id = kwargs.get("resource_id", None)
# Log the access
audit_logger.log_access(
user_id=user_id,
resource_type=resource_type,
resource_id=resource_id
)
# Call the original function
return func(*args, **kwargs)
return wrapper
return decorator

@ -0,0 +1,230 @@
import re
import copy
import logging
from typing import Dict, Any, List, Set, Union, Optional
import json
class PrivacyFilter:
"""
Filter for handling sensitive healthcare data.
This class implements methods to redact, mask, or filter sensitive information
in accordance with privacy regulations.
"""
# Regular expressions for identifying sensitive data patterns
PATTERNS = {
"ssn": r"\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b",
"credit_card": r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b",
"phone": r"\b\d{3}[-\s.]?\d{3}[-\s.]?\d{4}\b",
"email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
"zipcode": r"\b\d{5}(?:[-\s]\d{4})?\b"
}
# FHIR fields that may contain sensitive data
SENSITIVE_FIELDS = {
"Patient": {
"identifier": ["value"],
"telecom": ["value"],
"address": ["line", "postalCode"],
"contact": ["telecom.value"],
"generalPractitioner": []
},
"Observation": {
"identifier": ["value"],
"performer": []
}
}
def __init__(self):
"""Initialize the privacy filter."""
self.logger = logging.getLogger(__name__)
def filter_resource(self, resource: Dict[str, Any],
resource_type: str,
redact_fields: List[str] = None,
mask_fields: List[str] = None) -> Dict[str, Any]:
"""
Filter sensitive data in a FHIR resource.
Args:
resource: FHIR resource data
resource_type: Type of FHIR resource
redact_fields: List of field paths to completely redact
mask_fields: List of field paths to mask (e.g., "XXXX1234")
Returns:
Filtered resource data
"""
# Make a deep copy to avoid modifying the original
filtered = copy.deepcopy(resource)
# Get default sensitive fields for this resource type
sensitive_fields = self.SENSITIVE_FIELDS.get(resource_type, {})
# Apply redaction for specified fields
if redact_fields:
for field_path in redact_fields:
self._apply_to_field(filtered, field_path, self._redact_value)
# Apply masking for specified fields
if mask_fields:
for field_path in mask_fields:
self._apply_to_field(filtered, field_path, self._mask_value)
# Apply default processing for known sensitive fields
for field, subfields in sensitive_fields.items():
if field in filtered:
if isinstance(filtered[field], list):
for i, item in enumerate(filtered[field]):
if isinstance(item, dict):
# Process subfields in list items
for subfield in subfields:
self._apply_to_field(item, subfield, self._mask_value)
elif isinstance(filtered[field], dict):
# Process subfields in dictionaries
for subfield in subfields:
self._apply_to_field(filtered[field], subfield, self._mask_value)
# Detect and mask potential PII that wasn't explicitly specified
self._detect_and_mask_patterns(filtered)
return filtered
def _apply_to_field(self, data: Dict[str, Any], field_path: str,
processor_func: callable) -> None:
"""
Apply a processing function to a field specified by path.
Args:
data: Data dictionary
field_path: Path to the field (dot notation)
processor_func: Function to apply to the field value
"""
if not data or not isinstance(data, dict):
return
# Handle dot notation for nested fields
parts = field_path.split('.')
current = data
# Navigate to the nested field
for i, part in enumerate(parts[:-1]):
if part in current:
if isinstance(current[part], dict):
current = current[part]
elif isinstance(current[part], list):
# Handle lists of objects
for item in current[part]:
if isinstance(item, dict):
# Recursively apply to each item in the list
self._apply_to_field(item, '.'.join(parts[i+1:]), processor_func)
return
else:
# Can't navigate further
return
else:
# Field doesn't exist
return
# Apply processor to the field
last_part = parts[-1]
if last_part in current:
current[last_part] = processor_func(current[last_part])
def _detect_and_mask_patterns(self, data: Dict[str, Any]) -> None:
"""
Recursively scan data for patterns that look like sensitive information.
Args:
data: Data to scan
"""
if isinstance(data, dict):
for key, value in data.items():
if isinstance(value, (dict, list)):
self._detect_and_mask_patterns(value)
elif isinstance(value, str):
# Scan string values for patterns
for pattern_name, pattern in self.PATTERNS.items():
if re.search(pattern, value):
data[key] = self._mask_pattern(value, pattern)
elif isinstance(data, list):
for i, item in enumerate(data):
if isinstance(item, (dict, list)):
self._detect_and_mask_patterns(item)
elif isinstance(item, str):
# Scan string values for patterns
for pattern_name, pattern in self.PATTERNS.items():
if re.search(pattern, item):
data[i] = self._mask_pattern(item, pattern)
def _mask_pattern(self, text: str, pattern: str) -> str:
"""
Mask a specific pattern in text.
Args:
text: Text to mask
pattern: Regex pattern to find
Returns:
Masked text
"""
def replace_match(match):
s = match.group(0)
# Keep first and last character, mask the rest
if len(s) > 2:
return s[0] + 'X' * (len(s) - 2) + s[-1]
else:
return 'X' * len(s)
return re.sub(pattern, replace_match, text)
def _redact_value(self, value: Any) -> str:
"""
Completely redact a value.
Args:
value: Value to redact
Returns:
Redacted value
"""
if isinstance(value, str):
return "[REDACTED]"
elif isinstance(value, (int, float)):
return 0
elif isinstance(value, list):
return []
elif isinstance(value, dict):
return {}
else:
return None
def _mask_value(self, value: Any) -> Any:
"""
Mask a value, preserving some information.
Args:
value: Value to mask
Returns:
Masked value
"""
if isinstance(value, str):
if len(value) <= 4:
return "X" * len(value)
else:
# Keep first and last character, mask the rest
return value[0] + "X" * (len(value) - 2) + value[-1]
elif isinstance(value, (int, float)):
# Mask numbers by rounding/truncating
return 0 # For simplicity in POC, in production this might be more sophisticated
elif isinstance(value, list):
return [self._mask_value(v) for v in value]
elif isinstance(value, dict):
return {k: self._mask_value(v) for k, v in value.items()}
else:
return value
# Create a global privacy filter instance
privacy_filter = PrivacyFilter()

@ -0,0 +1,66 @@
version: '3.8'
services:
# Main FHIR OCR application
fhir-ocr-app:
build:
context: .
dockerfile: docker/Dockerfile
ports:
- "8000:8000"
volumes:
- ./fhir_storage:/app/fhir_storage
- ./sample_data:/app/sample_data
environment:
- ENVIRONMENT=development
- JWT_SECRET_KEY=dev-secret-key-replace-in-production
- JWT_ALGORITHM=HS256
- ACCESS_TOKEN_EXPIRE_MINUTES=60
depends_on:
- keycloak
networks:
- fhir-ocr-network
restart: unless-stopped
# Keycloak for authentication
keycloak:
image: quay.io/keycloak/keycloak:20.0.2
ports:
- "8181:8080"
environment:
- KEYCLOAK_ADMIN=admin
- KEYCLOAK_ADMIN_PASSWORD=admin
- KC_HEALTH_ENABLED=true
- KC_METRICS_ENABLED=true
- KC_HTTP_ENABLED=true
- KC_DB=dev-file
command:
- start-dev
volumes:
- keycloak_data:/opt/keycloak/data
networks:
- fhir-ocr-network
restart: unless-stopped
# HAPI FHIR Server
# Note: This is included but not connected in this POC
# It can be used as an alternative to the local file storage
hapi-fhir:
image: hapiproject/hapi:latest
ports:
- "8090:8080"
environment:
- hapi.fhir.default_encoding=json
- hapi.fhir.allow_external_references=true
- hapi.fhir.allow_placeholder_references=true
- hapi.fhir.validation.requests_enabled=false
- hapi.fhir.validation.responses_enabled=false
networks:
- fhir-ocr-network
restart: unless-stopped
volumes:
keycloak_data:
networks:
fhir-ocr-network:

@ -0,0 +1,34 @@
FROM python:3.9-slim
# Install system dependencies including Tesseract OCR
RUN apt-get update && apt-get install -y \
tesseract-ocr \
libgl1-mesa-glx \
libglib2.0-0 \
&& rm -rf /var/lib/apt/lists/*
# Set working directory
WORKDIR /app
# Copy requirements first for better caching
COPY requirements.txt .
# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Create storage directories for FHIR resources
RUN mkdir -p fhir_storage/Patient fhir_storage/Observation
# Set environment variables
ENV PYTHONPATH=/app
ENV PYTHONUNBUFFERED=1
ENV ENVIRONMENT=development
# Expose port
EXPOSE 8000
# Run application
CMD ["uvicorn", "api.app:app", "--host", "0.0.0.0", "--port", "8000"]

@ -0,0 +1,3 @@
"""
FHIR Module for handling FHIR resources storage and retrieval.
"""

@ -0,0 +1,194 @@
import os
import json
import logging
import requests
from typing import Dict, Any, List, Optional, Union
from fhir.resources.resource import Resource
from fhir.resources.patient import Patient
from fhir.resources.observation import Observation
class FHIRClient:
"""
Client for interacting with a FHIR server.
"""
def __init__(self, base_url: str, auth_token: Optional[str] = None):
"""
Initialize the FHIR client.
Args:
base_url: Base URL of the FHIR server
auth_token: Optional authentication token
"""
self.base_url = base_url.rstrip('/')
self.auth_token = auth_token
self.logger = logging.getLogger(__name__)
# Verify FHIR server connection
try:
response = self._make_request('GET', f"{self.base_url}/metadata")
if response.status_code != 200:
self.logger.warning(f"FHIR server returned status {response.status_code} for metadata request")
else:
capability = response.json()
self.logger.info(f"Connected to FHIR server, version: {capability.get('software', {}).get('version', 'unknown')}")
except Exception as e:
self.logger.error(f"Error connecting to FHIR server: {str(e)}")
# Don't raise error here to allow for initialization even if server is not available
def _make_request(self, method: str, url: str, data: Optional[Dict[str, Any]] = None) -> requests.Response:
"""
Make an HTTP request to the FHIR server.
Args:
method: HTTP method (GET, POST, PUT, DELETE)
url: URL to request
data: Optional data to send
Returns:
HTTP response
"""
headers = {
'Content-Type': 'application/fhir+json',
'Accept': 'application/fhir+json'
}
# Add authentication if provided
if self.auth_token:
headers['Authorization'] = f"Bearer {self.auth_token}"
try:
response = requests.request(
method=method,
url=url,
headers=headers,
json=data
)
# Log request details
self.logger.debug(f"{method} {url}: {response.status_code}")
# Raise error for non-2xx responses
response.raise_for_status()
return response
except requests.exceptions.HTTPError as e:
self.logger.error(f"HTTP error: {str(e)}")
# Include response body in error log if available
if hasattr(e, 'response') and e.response is not None:
self.logger.error(f"Response: {e.response.text}")
raise
except Exception as e:
self.logger.error(f"Request error: {str(e)}")
raise
def create_resource(self, resource: Resource) -> Dict[str, Any]:
"""
Create a new FHIR resource.
Args:
resource: FHIR resource to create
Returns:
Created resource data
"""
resource_type = resource.resource_type
# Convert resource to JSON
resource_json = json.loads(resource.json())
# Make request to create resource
response = self._make_request('POST', f"{self.base_url}/{resource_type}", resource_json)
# Return created resource
return response.json()
def read_resource(self, resource_type: str, resource_id: str) -> Dict[str, Any]:
"""
Read a FHIR resource by ID.
Args:
resource_type: Type of resource (Patient, Observation, etc.)
resource_id: ID of the resource
Returns:
Resource data
"""
response = self._make_request('GET', f"{self.base_url}/{resource_type}/{resource_id}")
return response.json()
def update_resource(self, resource: Resource) -> Dict[str, Any]:
"""
Update an existing FHIR resource.
Args:
resource: FHIR resource to update
Returns:
Updated resource data
"""
resource_type = resource.resource_type
resource_id = resource.id
# Convert resource to JSON
resource_json = json.loads(resource.json())
# Make request to update resource
response = self._make_request(
'PUT',
f"{self.base_url}/{resource_type}/{resource_id}",
resource_json
)
# Return updated resource
return response.json()
def delete_resource(self, resource_type: str, resource_id: str) -> bool:
"""
Delete a FHIR resource.
Args:
resource_type: Type of resource
resource_id: ID of the resource
Returns:
True if deletion was successful
"""
response = self._make_request('DELETE', f"{self.base_url}/{resource_type}/{resource_id}")
return response.status_code in (200, 202, 204)
def search_resources(self, resource_type: str, params: Dict[str, Any] = None) -> List[Dict[str, Any]]:
"""
Search for FHIR resources.
Args:
resource_type: Type of resource to search for
params: Search parameters
Returns:
List of matching resources
"""
url = f"{self.base_url}/{resource_type}"
if params:
# Convert params to URL query string
query_params = []
for key, value in params.items():
if isinstance(value, list):
for v in value:
query_params.append(f"{key}={v}")
else:
query_params.append(f"{key}={value}")
url = f"{url}?{'&'.join(query_params)}"
response = self._make_request('GET', url)
bundle = response.json()
# Extract resources from bundle
resources = []
if 'entry' in bundle:
for entry in bundle['entry']:
if 'resource' in entry:
resources.append(entry['resource'])
return resources

@ -0,0 +1,244 @@
import os
import json
import uuid
import datetime
import logging
from typing import Dict, Any, List, Optional, Union
from pathlib import Path
from fhir.resources.resource import Resource
from fhir.resources.patient import Patient
from fhir.resources.observation import Observation
class FHIRRepository:
"""
Local storage repository for FHIR resources.
This is a simplified implementation for the POC, using file storage.
In production, this would be replaced with a database.
"""
def __init__(self, storage_dir: str = 'fhir_storage'):
"""
Initialize the FHIR repository.
Args:
storage_dir: Directory to store FHIR resources
"""
self.storage_dir = storage_dir
self.logger = logging.getLogger(__name__)
# Create storage directory if it doesn't exist
try:
os.makedirs(os.path.join(storage_dir, 'Patient'), exist_ok=True)
os.makedirs(os.path.join(storage_dir, 'Observation'), exist_ok=True)
self.logger.info(f"FHIR storage directories created in {storage_dir}")
except Exception as e:
self.logger.error(f"Error creating storage directories: {str(e)}")
raise
def _get_resource_path(self, resource_type: str, resource_id: str) -> str:
"""
Get the file path for a resource.
Args:
resource_type: Type of resource
resource_id: ID of the resource
Returns:
Path to the resource file
"""
return os.path.join(self.storage_dir, resource_type, f"{resource_id}.json")
def _read_resource_file(self, file_path: str) -> Dict[str, Any]:
"""
Read a resource file.
Args:
file_path: Path to the resource file
Returns:
Resource data as a dictionary
"""
try:
with open(file_path, 'r') as f:
return json.load(f)
except Exception as e:
self.logger.error(f"Error reading resource file: {str(e)}")
raise
def _write_resource_file(self, file_path: str, data: Dict[str, Any]) -> None:
"""
Write a resource file.
Args:
file_path: Path to the resource file
data: Resource data
"""
try:
with open(file_path, 'w') as f:
json.dump(data, f, indent=2)
except Exception as e:
self.logger.error(f"Error writing resource file: {str(e)}")
raise
def create_resource(self, resource: Resource) -> Dict[str, Any]:
"""
Create a new FHIR resource.
Args:
resource: FHIR resource to create
Returns:
Created resource data
"""
resource_type = resource.resource_type
# Ensure resource has an ID
if not resource.id:
resource.id = str(uuid.uuid4())
# Convert resource to dictionary
resource_data = json.loads(resource.json())
# Add metadata
resource_data['meta'] = resource_data.get('meta', {})
resource_data['meta']['lastUpdated'] = datetime.datetime.utcnow().isoformat()
# Save resource to file
file_path = self._get_resource_path(resource_type, resource.id)
self._write_resource_file(file_path, resource_data)
self.logger.info(f"Created {resource_type} resource with ID {resource.id}")
return resource_data
def read_resource(self, resource_type: str, resource_id: str) -> Dict[str, Any]:
"""
Read a FHIR resource by ID.
Args:
resource_type: Type of resource
resource_id: ID of the resource
Returns:
Resource data
"""
file_path = self._get_resource_path(resource_type, resource_id)
if not os.path.exists(file_path):
self.logger.error(f"Resource not found: {resource_type}/{resource_id}")
raise FileNotFoundError(f"Resource not found: {resource_type}/{resource_id}")
resource_data = self._read_resource_file(file_path)
self.logger.debug(f"Read {resource_type} resource with ID {resource_id}")
return resource_data
def update_resource(self, resource: Resource) -> Dict[str, Any]:
"""
Update an existing FHIR resource.
Args:
resource: FHIR resource to update
Returns:
Updated resource data
"""
resource_type = resource.resource_type
resource_id = resource.id
if not resource_id:
self.logger.error("Cannot update resource without ID")
raise ValueError("Resource must have an ID for update")
# Check if resource exists
file_path = self._get_resource_path(resource_type, resource_id)
if not os.path.exists(file_path):
self.logger.error(f"Resource not found for update: {resource_type}/{resource_id}")
raise FileNotFoundError(f"Resource not found for update: {resource_type}/{resource_id}")
# Convert resource to dictionary
resource_data = json.loads(resource.json())
# Update metadata
resource_data['meta'] = resource_data.get('meta', {})
resource_data['meta']['lastUpdated'] = datetime.datetime.utcnow().isoformat()
# Save updated resource
self._write_resource_file(file_path, resource_data)
self.logger.info(f"Updated {resource_type} resource with ID {resource_id}")
return resource_data
def delete_resource(self, resource_type: str, resource_id: str) -> bool:
"""
Delete a FHIR resource.
Args:
resource_type: Type of resource
resource_id: ID of the resource
Returns:
True if deletion was successful
"""
file_path = self._get_resource_path(resource_type, resource_id)
if not os.path.exists(file_path):
self.logger.error(f"Resource not found for deletion: {resource_type}/{resource_id}")
return False
try:
os.remove(file_path)
self.logger.info(f"Deleted {resource_type} resource with ID {resource_id}")
return True
except Exception as e:
self.logger.error(f"Error deleting resource: {str(e)}")
return False
def search_resources(self, resource_type: str, params: Dict[str, Any] = None) -> List[Dict[str, Any]]:
"""
Search for FHIR resources.
Args:
resource_type: Type of resource to search for
params: Search parameters
Returns:
List of matching resources
"""
# Check if resource type directory exists
resource_dir = os.path.join(self.storage_dir, resource_type)
if not os.path.exists(resource_dir):
self.logger.error(f"Resource type directory not found: {resource_type}")
return []
# Get all resource files of the specified type
resources = []
for file_name in os.listdir(resource_dir):
if file_name.endswith('.json'):
file_path = os.path.join(resource_dir, file_name)
resource = self._read_resource_file(file_path)
# Filter by parameters if provided
if params:
matches_all_params = True
for key, value in params.items():
# Handle nested properties with dot notation (e.g., "name.family")
parts = key.split('.')
resource_value = resource
for part in parts:
if isinstance(resource_value, dict) and part in resource_value:
resource_value = resource_value[part]
else:
resource_value = None
break
# Check if value matches
if resource_value != value:
matches_all_params = False
break
if not matches_all_params:
continue
resources.append(resource)
self.logger.debug(f"Found {len(resources)} {resource_type} resources matching search criteria")
return resources

@ -0,0 +1,3 @@
"""
OCR Module for extracting text from healthcare documents.
"""

@ -0,0 +1,172 @@
import uuid
import datetime
from typing import Dict, Any, Optional
from fhir.resources.patient import Patient
from fhir.resources.humanname import HumanName
from fhir.resources.identifier import Identifier
from fhir.resources.observation import Observation
from fhir.resources.codeableconcept import CodeableConcept
from fhir.resources.coding import Coding
from fhir.resources.reference import Reference
class FHIRMapper:
"""
Maps OCR extracted data to FHIR resources.
"""
def __init__(self):
"""Initialize the FHIR mapper."""
pass
def map_to_patient(self, ocr_data: Dict[str, Any]) -> Patient:
"""
Map OCR extracted data to a FHIR Patient resource.
Args:
ocr_data: Dictionary with OCR extracted data
Returns:
FHIR Patient resource
"""
# Extract patient data from OCR results
patient_data = ocr_data.get('structured_data', {}).get('patient', {})
# Create a unique ID for the patient
patient_id = str(uuid.uuid4())
# Parse name
name = None
if patient_data.get('name'):
# Simple parsing - in production would need more sophisticated name parsing
name_parts = patient_data['name'].split()
if len(name_parts) > 1:
given = name_parts[:-1]
family = name_parts[-1]
else:
given = name_parts
family = ""
name = HumanName(given=given, family=family, use="official")
# Parse DOB
birth_date = None
if patient_data.get('dob'):
# Try to parse date - this is simplified and would need better handling
try:
# Attempt to parse common date formats
for fmt in ('%Y-%m-%d', '%m/%d/%Y', '%d/%m/%Y', '%B %d, %Y'):
try:
birth_date = datetime.datetime.strptime(
patient_data['dob'], fmt
).strftime('%Y-%m-%d')
break
except ValueError:
continue
except Exception:
# If date parsing fails, store as is
birth_date = patient_data['dob']
# Create patient resource
patient = Patient(
id=patient_id,
identifier=[
Identifier(
system="http://example.org/fhir/ocr-extracted-mrn",
value=patient_data.get('id', f"OCR-{patient_id}")
)
],
active=True
)
# Add name if available
if name:
patient.name = [name]
# Add birth date if available
if birth_date:
patient.birthDate = birth_date
# Add gender if available
if patient_data.get('gender'):
# Map to FHIR gender values
gender_map = {
'male': 'male',
'm': 'male',
'female': 'female',
'f': 'female',
'other': 'other',
'unknown': 'unknown'
}
gender_value = patient_data['gender'].lower()
patient.gender = gender_map.get(gender_value, 'unknown')
return patient
def map_to_observation(self, ocr_data: Dict[str, Any], patient_id: str) -> Optional[Observation]:
"""
Map OCR extracted data to a FHIR Observation resource.
Args:
ocr_data: Dictionary with OCR extracted data
patient_id: ID of the associated patient
Returns:
FHIR Observation resource or None if no observation data found
"""
# This is a simplified example that would need to be expanded based on
# the specific type of document being processed
document_type = ocr_data.get('structured_data', {}).get('document_type')
# Only process certain document types for observations
if document_type not in ['lab_result', 'prescription', 'clinical_note']:
return None
# Create a unique ID for the observation
observation_id = str(uuid.uuid4())
# Create basic observation structure
observation = Observation(
id=observation_id,
status="final",
subject=Reference(reference=f"Patient/{patient_id}"),
effectiveDateTime=datetime.datetime.now().isoformat()
)
# Set category based on document type
if document_type == 'lab_result':
observation.category = [
CodeableConcept(
coding=[
Coding(
system="http://terminology.hl7.org/CodeSystem/observation-category",
code="laboratory",
display="Laboratory"
)
],
text="Laboratory"
)
]
elif document_type == 'prescription':
observation.category = [
CodeableConcept(
coding=[
Coding(
system="http://terminology.hl7.org/CodeSystem/observation-category",
code="medication",
display="Medication"
)
],
text="Medication"
)
]
# In a real implementation, we would extract specific lab values or medication
# information from the OCR data and populate the observation accordingly
# Example: store raw text in note for demonstration purposes
observation.note = [{
"text": f"OCR extracted text: {ocr_data.get('raw_text', '')[:200]}..."
}]
return observation

@ -0,0 +1,185 @@
import os
import logging
from typing import Dict, Any, List, Optional, Tuple
import cv2
import pytesseract
from PIL import Image
class OCRProcessor:
"""
OCR processor class for healthcare documents using Tesseract.
"""
def __init__(self, tesseract_cmd: Optional[str] = None):
"""
Initialize the OCR processor.
Args:
tesseract_cmd: Optional path to Tesseract executable
"""
if tesseract_cmd:
pytesseract.pytesseract.tesseract_cmd = tesseract_cmd
# Set up logging
self.logger = logging.getLogger(__name__)
# Verify Tesseract installation
try:
pytesseract.get_tesseract_version()
self.logger.info("Tesseract OCR initialized successfully")
except Exception as e:
self.logger.error(f"Failed to initialize Tesseract OCR: {str(e)}")
raise RuntimeError(f"Tesseract OCR not properly installed: {str(e)}")
def preprocess_image(self, image):
"""
Preprocess image to improve OCR accuracy.
Args:
image: OpenCV image object
Returns:
Preprocessed image
"""
# Convert to grayscale
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# Apply threshold to get black and white image
_, binary = cv2.threshold(gray, 150, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
# Invert back
binary = 255 - binary
return binary
def process_image(self, image_path: str) -> Dict[str, Any]:
"""
Process an image file and extract text using OCR.
Args:
image_path: Path to the image file
Returns:
Dictionary containing extracted text and metadata
"""
if not os.path.exists(image_path):
self.logger.error(f"Image file not found: {image_path}")
raise FileNotFoundError(f"Image file not found: {image_path}")
try:
# Read image using OpenCV
img = cv2.imread(image_path)
if img is None:
raise ValueError(f"Failed to read image: {image_path}")
# Preprocess image
preprocessed = self.preprocess_image(img)
# Apply OCR
raw_text = pytesseract.image_to_string(preprocessed)
# Get detailed OCR data including confidence levels
ocr_data = pytesseract.image_to_data(preprocessed, output_type=pytesseract.Output.DICT)
# Extract structured data
structured_data = self.extract_healthcare_data(raw_text)
result = {
"raw_text": raw_text,
"structured_data": structured_data,
"confidence": self._calculate_avg_confidence(ocr_data),
"metadata": {
"source_file": image_path,
"ocr_engine": "Tesseract",
"ocr_version": pytesseract.get_tesseract_version()
}
}
self.logger.info(f"Successfully processed image: {image_path}")
return result
except Exception as e:
self.logger.error(f"OCR processing error: {str(e)}")
raise RuntimeError(f"Failed to process image with OCR: {str(e)}")
def extract_healthcare_data(self, text: str) -> Dict[str, Any]:
"""
Extract structured healthcare data from OCR text.
Args:
text: Raw OCR text
Returns:
Dictionary containing extracted healthcare data fields
"""
# This is a simplified implementation that would need to be enhanced
# with more sophisticated extraction logic for a real-world application
lines = [line.strip() for line in text.split('\n') if line.strip()]
data = {
"patient": {
"name": None,
"dob": None,
"id": None,
"gender": None
},
"document_type": self._detect_document_type(text),
"extracted_fields": {}
}
# Simple extraction based on keywords (would need enhancement for production)
for line in lines:
if "name:" in line.lower() or "patient:" in line.lower():
data["patient"]["name"] = self._extract_after_colon(line)
elif "dob:" in line.lower() or "birth" in line.lower() or "born:" in line.lower():
data["patient"]["dob"] = self._extract_after_colon(line)
elif "id:" in line.lower() or "mrn:" in line.lower() or "record" in line.lower():
data["patient"]["id"] = self._extract_after_colon(line)
elif "gender:" in line.lower() or "sex:" in line.lower():
data["patient"]["gender"] = self._extract_after_colon(line)
return data
def _detect_document_type(self, text: str) -> str:
"""
Attempt to detect the type of healthcare document.
Args:
text: Raw OCR text
Returns:
Document type string
"""
text_lower = text.lower()
if "insurance" in text_lower and ("card" in text_lower or "policy" in text_lower):
return "insurance_card"
elif "prescription" in text_lower:
return "prescription"
elif "lab" in text_lower and ("result" in text_lower or "report" in text_lower):
return "lab_result"
elif "discharge" in text_lower and "summary" in text_lower:
return "discharge_summary"
else:
return "unknown"
def _extract_after_colon(self, text: str) -> str:
"""Extract the content after a colon in a string."""
if ":" in text:
return text.split(":", 1)[1].strip()
return text.strip()
def _calculate_avg_confidence(self, ocr_data: Dict) -> float:
"""
Calculate average confidence score from OCR data.
Args:
ocr_data: Dictionary containing OCR data from pytesseract
Returns:
Average confidence score as a percentage
"""
confidences = [conf for conf in ocr_data.get('conf', []) if conf != -1]
if not confidences:
return 0.0
return sum(confidences) / len(confidences)

@ -0,0 +1,137 @@
import os
import unittest
import logging
from unittest.mock import patch, MagicMock
import json
from .ocr_processor import OCRProcessor
from .fhir_mapper import FHIRMapper
# Set up logging for tests
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TestOCRModule(unittest.TestCase):
"""Test cases for OCR module."""
def setUp(self):
"""Set up test fixtures."""
# Mock Tesseract to avoid dependency on actual installation for tests
self.tesseract_patcher = patch('pytesseract.pytesseract.image_to_string')
self.mock_image_to_string = self.tesseract_patcher.start()
self.mock_image_to_string.return_value = """
Patient: John Doe
DOB: 01/15/1980
Sex: Male
MRN: 12345678
Insurance: HealthCorp
Policy #: HC987654321
"""
# Mock image_to_data to return confidence scores
self.data_patcher = patch('pytesseract.pytesseract.image_to_data')
self.mock_image_to_data = self.data_patcher.start()
self.mock_image_to_data.return_value = {
'conf': [90, 95, 85, 92, 88]
}
# Mock Tesseract version
self.version_patcher = patch('pytesseract.pytesseract.get_tesseract_version')
self.mock_get_version = self.version_patcher.start()
self.mock_get_version.return_value = '4.1.1'
# Create OCR processor with mocks
self.ocr = OCRProcessor()
# Create FHIR mapper
self.mapper = FHIRMapper()
def tearDown(self):
"""Tear down test fixtures."""
self.tesseract_patcher.stop()
self.data_patcher.stop()
self.version_patcher.stop()
@patch('cv2.imread')
@patch('cv2.cvtColor')
@patch('cv2.threshold')
@patch('os.path.exists')
def test_process_image(self, mock_exists, mock_threshold, mock_cvtcolor, mock_imread):
"""Test image processing and OCR extraction."""
# Set up mocks
mock_exists.return_value = True
mock_imread.return_value = MagicMock()
mock_cvtcolor.return_value = MagicMock()
mock_threshold.return_value = (None, MagicMock())
# Process a mock image
result = self.ocr.process_image("test_image.jpg")
# Verify results
self.assertIsNotNone(result)
self.assertIn("raw_text", result)
self.assertIn("structured_data", result)
self.assertIn("confidence", result)
self.assertIn("metadata", result)
# Check structured data extraction
patient_data = result["structured_data"]["patient"]
self.assertEqual(patient_data["name"], "John Doe")
self.assertEqual(patient_data["dob"], "01/15/1980")
self.assertEqual(patient_data["gender"], "Male")
self.assertEqual(patient_data["id"], "12345678")
# Check document type detection
self.assertEqual(result["structured_data"]["document_type"], "insurance_card")
def test_map_to_fhir_patient(self):
"""Test mapping OCR data to FHIR Patient resource."""
# Create sample OCR data
ocr_data = {
"raw_text": "Patient: John Doe\nDOB: 01/15/1980\nSex: Male\nMRN: 12345678",
"structured_data": {
"patient": {
"name": "John Doe",
"dob": "01/15/1980",
"gender": "Male",
"id": "12345678"
},
"document_type": "insurance_card"
}
}
# Map to FHIR Patient
patient = self.mapper.map_to_patient(ocr_data)
# Verify FHIR resource
self.assertIsNotNone(patient)
self.assertEqual(patient.gender, "male")
self.assertEqual(patient.name[0].family, "Doe")
self.assertEqual(patient.name[0].given[0], "John")
self.assertEqual(patient.birthDate, "1980-01-15")
self.assertEqual(patient.identifier[0].value, "12345678")
def test_map_to_fhir_observation(self):
"""Test mapping OCR data to FHIR Observation resource."""
# Create sample OCR data for a lab result
ocr_data = {
"raw_text": "Lab Result\nPatient: John Doe\nTest: Blood Glucose\nResult: 120 mg/dL",
"structured_data": {
"patient": {
"name": "John Doe",
"id": "12345678"
},
"document_type": "lab_result"
}
}
# Map to FHIR Observation
observation = self.mapper.map_to_observation(ocr_data, "patient-123")
# Verify FHIR resource
self.assertIsNotNone(observation)
self.assertEqual(observation.status, "final")
self.assertEqual(observation.subject.reference, "Patient/patient-123")
self.assertEqual(observation.category[0].coding[0].code, "laboratory")
if __name__ == "__main__":
unittest.main()

@ -0,0 +1,14 @@
pytesseract==0.3.10
pillow==10.0.0
opencv-python==4.8.0.74
numpy==1.24.0
fhir.resources==6.5.0
requests==2.31.0
fastapi==0.103.1
uvicorn==0.23.2
python-multipart==0.0.6
python-jose==3.3.0
PyJWT==2.8.0
python-dotenv==1.0.0
pydantic==1.10.8
pytest==7.4.0

@ -0,0 +1,77 @@
#!/usr/bin/env python
"""
Create a sample insurance card image for testing.
This script generates a simple image that resembles an insurance card
with synthetic patient data for OCR testing.
"""
import os
from PIL import Image, ImageDraw, ImageFont
import argparse
def create_insurance_card(output_path, patient_name="John Doe", dob="01/15/1980",
member_id="ABC12345678", plan="HealthPlus Gold"):
"""
Create a sample insurance card image.
Args:
output_path: Path to save the image
patient_name: Patient name
dob: Date of birth
member_id: Member ID
plan: Insurance plan
"""
# Create a blank image (standard card size in pixels at 300 DPI)
width, height = 1050, 650 # ~3.5" x 2.17"
image = Image.new('RGB', (width, height), color=(255, 255, 255))
draw = ImageDraw.Draw(image)
# Try to load a font, falling back to default if not available
try:
font_large = ImageFont.truetype("Arial", 36)
font_medium = ImageFont.truetype("Arial", 28)
font_small = ImageFont.truetype("Arial", 24)
except IOError:
# Use default font if Arial not available
font_large = ImageFont.load_default()
font_medium = ImageFont.load_default()
font_small = ImageFont.load_default()
# Draw a blue rectangle at the top (insurance company header)
draw.rectangle([(0, 0), (width, 120)], fill=(0, 82, 156))
# Add insurance company name
draw.text((50, 40), "HealthCorp Insurance", fill=(255, 255, 255), font=font_large)
# Add card details
draw.text((50, 150), f"Name: {patient_name}", fill=(0, 0, 0), font=font_medium)
draw.text((50, 200), f"DOB: {dob}", fill=(0, 0, 0), font=font_medium)
draw.text((50, 250), f"Member ID: {member_id}", fill=(0, 0, 0), font=font_medium)
draw.text((50, 300), f"Plan: {plan}", fill=(0, 0, 0), font=font_medium)
# Add gender field
draw.text((50, 350), "Gender: Male", fill=(0, 0, 0), font=font_medium)
# Add additional information
draw.text((50, 450), "Customer Service: 1-800-555-1234", fill=(0, 0, 0), font=font_small)
draw.text((50, 500), "Group #: HC987654", fill=(0, 0, 0), font=font_small)
draw.text((50, 550), "RxBIN: 123456 RxPCN: ABC", fill=(0, 0, 0), font=font_small)
# Add a border
draw.rectangle([(0, 0), (width-1, height-1)], outline=(0, 0, 0), width=2)
# Save the image
image.save(output_path)
print(f"Sample insurance card saved to: {output_path}")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Create a sample insurance card image')
parser.add_argument('--output', default='sample_insurance_card.png', help='Output image path')
parser.add_argument('--name', default='John Doe', help='Patient name')
parser.add_argument('--dob', default='01/15/1980', help='Date of birth')
parser.add_argument('--id', default='ABC12345678', help='Member ID')
parser.add_argument('--plan', default='HealthPlus Gold', help='Insurance plan')
args = parser.parse_args()
create_insurance_card(args.output, args.name, args.dob, args.id, args.plan)

@ -0,0 +1,3 @@
"""
Security Module for authentication and authorization.
"""

@ -0,0 +1,234 @@
import os
import time
import logging
from typing import Dict, Any, Optional, List, Union
import json
import jwt
from jwt.exceptions import PyJWTError
from fastapi import Request, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
# Security config - in production, these would be loaded from environment variables
JWT_SECRET_KEY = os.getenv("JWT_SECRET_KEY", "dev-secret-key-replace-in-production")
JWT_ALGORITHM = os.getenv("JWT_ALGORITHM", "HS256")
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", "30"))
# Set up security scheme for Swagger UI
security_scheme = HTTPBearer()
class AuthHandler:
"""
Authentication handler for JWT tokens.
"""
def __init__(self, secret_key: str = JWT_SECRET_KEY, algorithm: str = JWT_ALGORITHM):
"""
Initialize the authentication handler.
Args:
secret_key: Secret key for JWT token signing
algorithm: JWT algorithm
"""
self.secret_key = secret_key
self.algorithm = algorithm
self.logger = logging.getLogger(__name__)
def create_access_token(self, user_id: str, roles: List[str] = None) -> str:
"""
Create a new access token.
Args:
user_id: User ID
roles: List of roles
Returns:
JWT access token
"""
payload = {
"sub": user_id,
"roles": roles or [],
"exp": time.time() + (ACCESS_TOKEN_EXPIRE_MINUTES * 60),
"iat": time.time()
}
token = jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
self.logger.debug(f"Created access token for user {user_id}")
return token
def decode_token(self, token: str) -> Dict[str, Any]:
"""
Decode and verify a JWT token.
Args:
token: JWT token
Returns:
Token payload
Raises:
HTTPException: If token is invalid
"""
try:
payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
return payload
except PyJWTError as e:
self.logger.error(f"Token verification failed: {str(e)}")
raise HTTPException(status_code=401, detail="Invalid token")
def get_current_user(self, credentials: HTTPAuthorizationCredentials = Depends(security_scheme)) -> Dict[str, Any]:
"""
Get the current user from the token.
Args:
credentials: HTTP authorization credentials
Returns:
User info from token
Raises:
HTTPException: If token is invalid
"""
token = credentials.credentials
payload = self.decode_token(token)
# Check if token is expired
exp = payload.get("exp", 0)
if time.time() > exp:
self.logger.warning(f"Expired token for user {payload.get('sub')}")
raise HTTPException(status_code=401, detail="Token expired")
return {
"user_id": payload.get("sub"),
"roles": payload.get("roles", [])
}
def has_role(self, required_roles: List[str]) -> callable:
"""
Dependency for role-based access control.
Args:
required_roles: List of required roles
Returns:
Dependency function
"""
def check_roles(user: Dict[str, Any] = Depends(self.get_current_user)) -> Dict[str, Any]:
"""
Check if user has required roles.
Args:
user: User info from token
Returns:
User info
Raises:
HTTPException: If user does not have required roles
"""
user_roles = user.get("roles", [])
# Check if user has any of the required roles
if not any(role in user_roles for role in required_roles):
self.logger.warning(f"User {user['user_id']} does not have required roles: {required_roles}")
raise HTTPException(status_code=403, detail="Insufficient permissions")
return user
return check_roles
# For local development/testing, create a mock authentication handler
class MockAuthHandler:
"""
Mock authentication handler for local development/testing.
This should NOT be used in production.
"""
def __init__(self):
"""Initialize the mock authentication handler."""
self.logger = logging.getLogger(__name__)
self.logger.warning("Using mock authentication handler - NOT SECURE FOR PRODUCTION")
def create_access_token(self, user_id: str, roles: List[str] = None) -> str:
"""
Create a mock access token.
Args:
user_id: User ID
roles: List of roles
Returns:
Mock JWT token
"""
payload = {
"sub": user_id,
"roles": roles or [],
"exp": time.time() + 3600, # 1 hour
"iat": time.time()
}
# Use a simple JWT with an obvious test key
token = jwt.encode(payload, "test-key-not-for-production", algorithm="HS256")
self.logger.debug(f"Created mock access token for user {user_id}")
return token
def get_current_user(self, credentials: HTTPAuthorizationCredentials = Depends(security_scheme)) -> Dict[str, Any]:
"""
Get user info from mock token.
Args:
credentials: HTTP authorization credentials
Returns:
User info
"""
token = credentials.credentials
try:
payload = jwt.decode(token, "test-key-not-for-production", algorithms=["HS256"])
return {
"user_id": payload.get("sub"),
"roles": payload.get("roles", [])
}
except:
# For testing, allow a special "dev-token" that grants admin access
if token == "dev-token":
self.logger.warning("Using development token - NOT SECURE")
return {
"user_id": "dev-user",
"roles": ["admin"]
}
raise HTTPException(status_code=401, detail="Invalid token")
def has_role(self, required_roles: List[str]) -> callable:
"""
Dependency for mock role-based access control.
Args:
required_roles: List of required roles
Returns:
Dependency function
"""
def check_roles(user: Dict[str, Any] = Depends(self.get_current_user)) -> Dict[str, Any]:
user_roles = user.get("roles", [])
# For development, allow "admin" role to access anything
if "admin" in user_roles:
return user
# Otherwise, check required roles
if not any(role in user_roles for role in required_roles):
self.logger.warning(f"User {user['user_id']} does not have required roles: {required_roles}")
raise HTTPException(status_code=403, detail="Insufficient permissions")
return user
return check_roles
# Create either real or mock auth handler based on environment
if os.getenv("ENVIRONMENT", "development") == "production":
auth_handler = AuthHandler()
else:
auth_handler = MockAuthHandler()

@ -0,0 +1,108 @@
#!/usr/bin/env python
"""
Test script for API security.
This script tests the security features of the API by attempting to access endpoints
with and without proper authentication.
"""
import requests
import argparse
import logging
import json
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
def test_api_security(base_url):
"""
Test API security features.
Args:
base_url: Base URL of the API
"""
logger.info("Testing API security...")
# Test health endpoint (should be accessible without authentication)
logger.info("\nTesting health endpoint (public)...")
health_url = f"{base_url}/health"
response = requests.get(health_url)
logger.info(f"Status code: {response.status_code}")
if response.status_code == 200:
logger.info("Success! Health endpoint is publicly accessible as expected.")
else:
logger.error("Error: Health endpoint should be publicly accessible.")
# Test token endpoint
logger.info("\nTesting token endpoint...")
token_url = f"{base_url}/auth/token"
# Test with invalid credentials
logger.info("Testing with empty username...")
response = requests.post(token_url, json={"username": "", "password": "password"})
logger.info(f"Status code: {response.status_code}")
if response.status_code == 400:
logger.info("Success! Token endpoint rejected empty username as expected.")
else:
logger.error("Error: Token endpoint should reject empty username.")
# Test with valid credentials
logger.info("Testing with valid username...")
response = requests.post(token_url, json={"username": "admin", "password": "password"})
logger.info(f"Status code: {response.status_code}")
if response.status_code == 200:
token_data = response.json()
access_token = token_data.get("access_token")
logger.info("Success! Received valid token.")
# Test protected endpoint without token
logger.info("\nTesting protected endpoint without token...")
patients_url = f"{base_url}/fhir/Patient"
response = requests.get(patients_url)
logger.info(f"Status code: {response.status_code}")
if response.status_code in (401, 403):
logger.info("Success! Access denied without token as expected.")
else:
logger.error("Error: Protected endpoint should deny access without token.")
# Test protected endpoint with token
logger.info("\nTesting protected endpoint with token...")
headers = {"Authorization": f"Bearer {access_token}"}
response = requests.get(patients_url, headers=headers)
logger.info(f"Status code: {response.status_code}")
if response.status_code == 200:
logger.info("Success! Access granted with token as expected.")
else:
logger.error("Error: Protected endpoint should grant access with token.")
# Test admin-only endpoint with user token
logger.info("\nTesting role-based access control...")
# For this POC, we use a generic endpoint that requires admin role
delete_url = f"{base_url}/fhir/Patient/non-existent-id"
response = requests.delete(delete_url, headers=headers)
if response.status_code == 403:
logger.info("Success! Regular user denied access to admin endpoint as expected.")
elif response.status_code == 404:
logger.info("Success! Admin user granted access to admin endpoint as expected.")
else:
logger.error(f"Unexpected response code: {response.status_code}")
else:
logger.error("Error: Could not obtain token for testing.")
logger.info("\nAPI security testing completed.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Test API security')
parser.add_argument('--url', default='http://localhost:8000', help='Base URL of the API')
args = parser.parse_args()
test_api_security(args.url)

@ -0,0 +1,110 @@
#!/usr/bin/env python
"""
Test script for OCR to FHIR flow.
This script demonstrates the full flow of OCR processing and FHIR resource creation,
using the local implementation without requiring the API to be running.
"""
import os
import sys
import json
import logging
import argparse
from pathlib import Path
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
def main():
"""Run the OCR to FHIR test flow."""
# Parse command line arguments
parser = argparse.ArgumentParser(description='Test OCR to FHIR flow')
parser.add_argument('--image', required=True, help='Path to the image file to process')
parser.add_argument('--output', default='test_results', help='Directory to store results')
parser.add_argument('--tesseract', help='Path to Tesseract executable')
args = parser.parse_args()
# Verify image file exists
if not os.path.exists(args.image):
logger.error(f"Image file not found: {args.image}")
sys.exit(1)
# Create output directory
os.makedirs(args.output, exist_ok=True)
try:
# Import local modules
# Add the current directory to the path if running from a different directory
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from ocr_module.ocr_processor import OCRProcessor
from ocr_module.fhir_mapper import FHIRMapper
from fhir_module.fhir_repository import FHIRRepository
# Initialize components
logger.info("Initializing components...")
ocr_processor = OCRProcessor(tesseract_cmd=args.tesseract)
fhir_mapper = FHIRMapper()
fhir_repository = FHIRRepository(storage_dir=args.output)
# Process image with OCR
logger.info(f"Processing image: {args.image}")
ocr_result = ocr_processor.process_image(args.image)
# Save OCR results to file
ocr_output_file = os.path.join(args.output, 'ocr_result.json')
with open(ocr_output_file, 'w') as f:
json.dump(ocr_result, f, indent=2)
logger.info(f"OCR results saved to: {ocr_output_file}")
logger.info(f"Document type detected: {ocr_result['structured_data']['document_type']}")
logger.info(f"OCR confidence: {ocr_result['confidence']}%")
# Map OCR data to FHIR Patient resource
logger.info("Mapping OCR data to FHIR Patient resource...")
patient = fhir_mapper.map_to_patient(ocr_result)
# Create patient resource in FHIR repository
patient_data = fhir_repository.create_resource(patient)
logger.info(f"Patient resource created with ID: {patient.id}")
# Map to observation if applicable
if ocr_result["structured_data"]["document_type"] in ["lab_result", "prescription"]:
logger.info("Mapping OCR data to FHIR Observation resource...")
observation = fhir_mapper.map_to_observation(ocr_result, patient.id)
if observation:
observation_data = fhir_repository.create_resource(observation)
logger.info(f"Observation resource created with ID: {observation.id}")
# Print patient information
logger.info("\nExtracted Patient Information:")
patient_info = ocr_result["structured_data"]["patient"]
for key, value in patient_info.items():
if value:
logger.info(f" {key.capitalize()}: {value}")
# Test reading the patient from the repository
retrieved_patient = fhir_repository.read_resource("Patient", patient.id)
# Save retrieved patient to file
patient_output_file = os.path.join(args.output, 'patient_resource.json')
with open(patient_output_file, 'w') as f:
json.dump(retrieved_patient, f, indent=2)
logger.info(f"Patient resource saved to: {patient_output_file}")
logger.info("Test completed successfully!")
except Exception as e:
logger.error(f"Error in OCR to FHIR flow: {str(e)}")
sys.exit(1)
if __name__ == "__main__":
main()
Loading…
Cancel
Save