From c25548a2d74479acae780e4f2f78b375614f319e Mon Sep 17 00:00:00 2001 From: jac is jake Date: Wed, 19 Mar 2025 18:52:12 -0600 Subject: [PATCH] Add initial implementation of FHIR OCR POC - Created a Docker Compose setup for the FHIR OCR application, including services for the main app, Keycloak for authentication, and a HAPI FHIR server. - Added a README file detailing project structure, features, requirements, and setup instructions. - Included necessary Python dependencies in requirements.txt. - Implemented core modules for OCR processing, FHIR resource mapping, and security features. - Developed test scripts for API security and OCR to FHIR flow. - Established compliance and privacy modules for audit logging and data protection. - Created sample data generation script for testing purposes. - Set up a basic FastAPI application structure with endpoints for authentication and FHIR resource management. --- FHIR_OCR_POC/README.md | 172 +++++++++ FHIR_OCR_POC/api/__init__.py | 3 + FHIR_OCR_POC/api/app.py | 343 ++++++++++++++++++ FHIR_OCR_POC/compliance_module/__init__.py | 3 + .../compliance_module/audit_logger.py | 260 +++++++++++++ .../compliance_module/privacy_filter.py | 230 ++++++++++++ FHIR_OCR_POC/docker-compose.yml | 66 ++++ FHIR_OCR_POC/docker/Dockerfile | 34 ++ FHIR_OCR_POC/fhir_module/__init__.py | 3 + FHIR_OCR_POC/fhir_module/fhir_client.py | 194 ++++++++++ FHIR_OCR_POC/fhir_module/fhir_repository.py | 244 +++++++++++++ FHIR_OCR_POC/ocr_module/__init__.py | 3 + FHIR_OCR_POC/ocr_module/fhir_mapper.py | 172 +++++++++ FHIR_OCR_POC/ocr_module/ocr_processor.py | 185 ++++++++++ FHIR_OCR_POC/ocr_module/test_ocr.py | 137 +++++++ FHIR_OCR_POC/requirements.txt | 14 + .../sample_data/create_sample_image.py | 77 ++++ FHIR_OCR_POC/security_module/__init__.py | 3 + FHIR_OCR_POC/security_module/auth.py | 234 ++++++++++++ FHIR_OCR_POC/test_api_security.py | 108 ++++++ FHIR_OCR_POC/test_ocr_flow.py | 110 ++++++ 21 files changed, 2595 insertions(+) create mode 100644 
FHIR_OCR_POC/README.md create mode 100644 FHIR_OCR_POC/api/__init__.py create mode 100644 FHIR_OCR_POC/api/app.py create mode 100644 FHIR_OCR_POC/compliance_module/__init__.py create mode 100644 FHIR_OCR_POC/compliance_module/audit_logger.py create mode 100644 FHIR_OCR_POC/compliance_module/privacy_filter.py create mode 100644 FHIR_OCR_POC/docker-compose.yml create mode 100644 FHIR_OCR_POC/docker/Dockerfile create mode 100644 FHIR_OCR_POC/fhir_module/__init__.py create mode 100644 FHIR_OCR_POC/fhir_module/fhir_client.py create mode 100644 FHIR_OCR_POC/fhir_module/fhir_repository.py create mode 100644 FHIR_OCR_POC/ocr_module/__init__.py create mode 100644 FHIR_OCR_POC/ocr_module/fhir_mapper.py create mode 100644 FHIR_OCR_POC/ocr_module/ocr_processor.py create mode 100644 FHIR_OCR_POC/ocr_module/test_ocr.py create mode 100644 FHIR_OCR_POC/requirements.txt create mode 100644 FHIR_OCR_POC/sample_data/create_sample_image.py create mode 100644 FHIR_OCR_POC/security_module/__init__.py create mode 100644 FHIR_OCR_POC/security_module/auth.py create mode 100644 FHIR_OCR_POC/test_api_security.py create mode 100644 FHIR_OCR_POC/test_ocr_flow.py diff --git a/FHIR_OCR_POC/README.md b/FHIR_OCR_POC/README.md new file mode 100644 index 0000000..740cad6 --- /dev/null +++ b/FHIR_OCR_POC/README.md @@ -0,0 +1,172 @@ +# FHIR-Based Healthcare Document Processing POC + +This proof-of-concept application demonstrates a secure, FHIR-compliant system for processing healthcare documents with OCR and storing the extracted data via a FHIR API. 
+ +## Project Structure + +- `ocr_module/`: OCR functionality using Tesseract for document text extraction +- `fhir_module/`: FHIR API implementation using HAPI FHIR for data storage and retrieval +- `security_module/`: Authentication and authorization using OAuth2/OpenID Connect +- `compliance_module/`: Audit logging and compliance features +- `api/`: Main application API that integrates all modules +- `docker/`: Docker and docker-compose configuration files + +## Features + +- Document processing with OCR to extract healthcare information +- FHIR-compliant data storage and retrieval +- OAuth2/OpenID Connect authentication +- Comprehensive audit logging +- Role-based access control +- Local deployment with Docker + +## Requirements + +- Docker and Docker Compose +- Python 3.9+ +- Tesseract OCR engine (automatically installed in Docker) + +## Getting Started + +### Using Docker Compose (Recommended) + +The easiest way to run the application is using Docker Compose: + +1. Clone the repository +2. Navigate to the project root directory +3. Run Docker Compose: + +```bash +docker-compose up +``` + +This will start the following services: +- FHIR OCR application at http://localhost:8000 +- Keycloak authentication server at http://localhost:8181 +- HAPI FHIR server at http://localhost:8090 (included but not integrated in the POC) + +### Manual Setup + +If you prefer to run the application without Docker: + +1. Install Tesseract OCR engine on your system: + - **Ubuntu/Debian**: `sudo apt-get install tesseract-ocr` + - **macOS**: `brew install tesseract` + - **Windows**: Download installer from [Tesseract GitHub page](https://github.com/UB-Mannheim/tesseract/wiki) + +2. Create and activate a Python virtual environment: +```bash +python -m venv venv +source venv/bin/activate # On Windows: venv\Scripts\activate +``` + +3. Install Python dependencies: +```bash +pip install -r requirements.txt +``` + +4. 
Run the application: +```bash +uvicorn api.app:app --host 0.0.0.0 --port 8000 +``` + +## Usage + +### 1. Authentication + +To access the API, you need to obtain an authentication token: + +```bash +curl -X POST http://localhost:8000/auth/token \ + -H "Content-Type: application/json" \ + -d '{"username": "admin", "password": "password"}' +``` + +This will return a JSON response with an access token: + +```json +{ + "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...", + "token_type": "bearer" +} +``` + +### 2. Processing a Document + +Use the token to upload and process a document: + +```bash +curl -X POST http://localhost:8000/ocr/process \ + -H "Authorization: Bearer YOUR_TOKEN_HERE" \ + -F "file=@/path/to/document.jpg" \ + -F "process_as=insurance_card" +``` + +The API will return the OCR results and the IDs of the created FHIR resources. + +### 3. Retrieving FHIR Resources + +Get a patient resource: + +```bash +curl -X GET http://localhost:8000/fhir/Patient/PATIENT_ID \ + -H "Authorization: Bearer YOUR_TOKEN_HERE" +``` + +Get an observation resource: + +```bash +curl -X GET http://localhost:8000/fhir/Observation/OBSERVATION_ID \ + -H "Authorization: Bearer YOUR_TOKEN_HERE" +``` + +### 4. Testing the OCR Flow Locally + +For testing the OCR to FHIR flow without the API, use the provided test script: + +```bash +python test_ocr_flow.py --image sample_data/your_image.jpg --output test_results +``` + +## Keycloak Setup + +The Docker Compose configuration includes a Keycloak server for authentication. +For production use, you would need to: + +1. Access the Keycloak admin console at http://localhost:8181 +2. Log in with username `admin` and password `admin` +3. Create a new realm (e.g., `fhir-ocr`) +4. Create a new client (e.g., `fhir-ocr-client`) +5. Configure client access type as "confidential" +6. Add redirect URIs for your application +7. Create roles (e.g., `user`, `admin`) +8. 
Create users and assign roles + +## Security and Compliance + +The application includes several security and compliance features: + +- **Authentication**: OAuth2/OpenID Connect with Keycloak +- **Authorization**: Role-based access control +- **Audit Logging**: All API calls and data access are logged +- **Privacy Filtering**: Sensitive data can be masked or redacted + +## Docker Environment Variables + +The following environment variables can be set in the docker-compose.yml file: + +- `ENVIRONMENT`: `development` or `production` +- `JWT_SECRET_KEY`: Secret key for JWT token signing +- `JWT_ALGORITHM`: Algorithm for JWT token signing +- `ACCESS_TOKEN_EXPIRE_MINUTES`: Token expiration time in minutes + +## API Documentation + +API documentation is available at http://localhost:8000/docs when the application is running. + +## License + +This project uses open-source components: +- Tesseract OCR (Apache License 2.0) +- HAPI FHIR (Apache License 2.0) +- Keycloak (Apache License 2.0) \ No newline at end of file diff --git a/FHIR_OCR_POC/api/__init__.py b/FHIR_OCR_POC/api/__init__.py new file mode 100644 index 0000000..86d73ee --- /dev/null +++ b/FHIR_OCR_POC/api/__init__.py @@ -0,0 +1,3 @@ +""" +Main API module that integrates all components of the FHIR OCR application. 
+""" \ No newline at end of file diff --git a/FHIR_OCR_POC/api/app.py b/FHIR_OCR_POC/api/app.py new file mode 100644 index 0000000..3edebd3 --- /dev/null +++ b/FHIR_OCR_POC/api/app.py @@ -0,0 +1,343 @@ +import os +import logging +import tempfile +from typing import Dict, Any, List, Optional, Union +from pathlib import Path +import json +import uvicorn + +from fastapi import FastAPI, Depends, File, UploadFile, HTTPException, Form, Query +from fastapi.middleware.cors import CORSMiddleware +from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials +from fastapi.responses import JSONResponse +from pydantic import BaseModel + +# Import modules +from ocr_module.ocr_processor import OCRProcessor +from ocr_module.fhir_mapper import FHIRMapper +from fhir_module.fhir_repository import FHIRRepository +from security_module.auth import auth_handler +from compliance_module.audit_logger import audit_logger, AuditMiddleware +from compliance_module.privacy_filter import privacy_filter + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' +) +logger = logging.getLogger(__name__) + +# Create FastAPI app +app = FastAPI( + title="FHIR OCR API", + description="API for processing healthcare documents with OCR and storing extracted data via FHIR", + version="0.1.0" +) + +# Add CORS middleware +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], # Adjust in production + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +# Add audit middleware +app.add_middleware(AuditMiddleware) + +# Initialize components +ocr_processor = OCRProcessor() +fhir_mapper = FHIRMapper() +fhir_repository = FHIRRepository(storage_dir=os.path.join(os.getcwd(), "fhir_storage")) + +# Security scheme for Swagger UI +security_scheme = HTTPBearer() + +# Pydantic models for API requests/responses +class TokenRequest(BaseModel): + username: str + password: str + +class 
@app.post("/ocr/process", response_model=OCRResponse, tags=["OCR"])
async def process_document(
    file: UploadFile = File(...),
    process_as: str = Form("auto"),
    user: Dict[str, Any] = Depends(auth_handler.get_current_user)
):
    """
    Process a document with OCR and extract healthcare data.

    The upload is spooled to a temporary file, run through the OCR processor,
    mapped to a FHIR Patient (and, for lab results and prescriptions, an
    Observation), and each created resource is recorded in the audit log.

    Args:
        file: Uploaded document image.
        process_as: Document type override (insurance_card, lab_result, etc.);
            "auto" keeps the type detected by the OCR step.
        user: Authenticated user payload supplied by the auth dependency.

    Returns:
        Raw OCR text, structured data, confidence and the IDs of the created
        FHIR resources (observation_id is None when no Observation was made).

    Raises:
        HTTPException: 500 when any processing step fails.
    """
    # Tracked outside the try block so the finally clause can always clean up.
    temp_file_path = None
    try:
        # Uploads may arrive without a filename; fall back to no suffix.
        suffix = os.path.splitext(file.filename or "")[1]

        # Save uploaded file to temporary location
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
            temp_file_path = temp_file.name
            temp_file.write(await file.read())

        # Process image with OCR
        ocr_result = ocr_processor.process_image(temp_file_path)

        # If process_as is specified, override the detected document type
        if process_as != "auto":
            ocr_result["structured_data"]["document_type"] = process_as

        # Map OCR data to a FHIR Patient and persist it
        patient = fhir_mapper.map_to_patient(ocr_result)
        fhir_repository.create_resource(patient)

        document_type = ocr_result["structured_data"]["document_type"]
        user_id = user.get("user_id", "unknown")

        # Log the creation
        audit_logger.log_create(
            user_id=user_id,
            resource_type="Patient",
            resource_id=patient.id,
            details={"document_type": document_type}
        )

        # Map to observation if applicable
        observation_id = None
        if document_type in ["lab_result", "prescription"]:
            observation = fhir_mapper.map_to_observation(ocr_result, patient.id)
            if observation:
                fhir_repository.create_resource(observation)
                observation_id = observation.id

                # Log the creation
                audit_logger.log_create(
                    user_id=user_id,
                    resource_type="Observation",
                    resource_id=observation.id,
                    details={"document_type": document_type}
                )

        return {
            "raw_text": ocr_result["raw_text"],
            "structured_data": ocr_result["structured_data"],
            "confidence": ocr_result["confidence"],
            "patient_id": patient.id,
            "observation_id": observation_id
        }

    except Exception as e:
        logger.error(f"Error processing document: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error processing document: {str(e)}") from e
    finally:
        # The temporary file holds the uploaded document (potentially PHI), so
        # it must be removed on failure paths as well as on success.
        if temp_file_path is not None:
            try:
                os.unlink(temp_file_path)
            except OSError as cleanup_error:
                logger.warning(
                    f"Could not remove temporary file {temp_file_path}: {cleanup_error}"
                )
@app.get("/fhir/Observation/{observation_id}", response_model=ResourceResponse, tags=["FHIR"])
async def get_observation(
    observation_id: str,
    mask_sensitive: bool = Query(False, description="Whether to mask sensitive information"),
    user: Dict[str, Any] = Depends(auth_handler.has_role(["user", "admin"]))
):
    """
    Fetch a single Observation resource by its ID.

    The read is recorded in the audit log, and the privacy filter is applied
    to the payload when mask_sensitive is true.

    Args:
        observation_id: ID of the Observation to read.
        mask_sensitive: Whether to mask sensitive information in the result.
        user: Authenticated user payload supplied by the role dependency.

    Raises:
        HTTPException: 404 when the repository raises FileNotFoundError,
            500 for any other failure.
    """
    try:
        resource = fhir_repository.read_resource("Observation", observation_id)

        # Record who read which resource
        requester = user.get("user_id", "unknown")
        audit_logger.log_access(
            user_id=requester,
            resource_type="Observation",
            resource_id=observation_id
        )

        # Only run the privacy filter when the caller asked for masking
        payload = (
            privacy_filter.filter_resource(resource, "Observation")
            if mask_sensitive
            else resource
        )

        return dict(resource_type="Observation", id=observation_id, data=payload)

    except FileNotFoundError:
        raise HTTPException(status_code=404, detail=f"Observation {observation_id} not found")
    except Exception as exc:
        message = f"Error retrieving observation: {exc!s}"
        logger.error(message)
        raise HTTPException(status_code=500, detail=message)
@app.delete("/fhir/Patient/{patient_id}", tags=["FHIR"])
async def delete_patient(
    patient_id: str,
    user: Dict[str, Any] = Depends(auth_handler.has_role(["admin"]))
):
    """
    Remove a Patient resource; restricted to the admin role.

    Args:
        patient_id: ID of the Patient to delete.
        user: Authenticated user payload supplied by the role dependency.

    Returns:
        A detail message confirming the deletion.

    Raises:
        HTTPException: 404 when the repository reports nothing was deleted,
            500 for any other failure.
    """
    try:
        deleted = fhir_repository.delete_resource("Patient", patient_id)
        if not deleted:
            raise HTTPException(status_code=404, detail=f"Patient {patient_id} not found")

        # Only successful deletions are written to the audit trail
        requester = user.get("user_id", "unknown")
        audit_logger.log_delete(
            user_id=requester,
            resource_type="Patient",
            resource_id=patient_id
        )

        return {"detail": f"Patient {patient_id} deleted"}

    except HTTPException:
        # Let the 404 raised above through instead of converting it to a 500
        raise
    except Exception as exc:
        message = f"Error deleting patient: {exc!s}"
        logger.error(message)
        raise HTTPException(status_code=500, detail=message)
class AuditLogger:
    """
    Audit logger for HIPAA compliance.

    Records data access and modifications as one JSON document per log line
    on the dedicated "audit" logger.
    """

    def __init__(self, log_file: Optional[str] = None):
        """
        Initialize the audit logger.

        The "audit" logger is process-wide, so it is configured only the first
        time; later instances reuse the existing handler and their log_file
        argument has no effect.

        Args:
            log_file: Optional path to the audit log file. When omitted, events
                go to a StreamHandler.
        """
        self.logger = logging.getLogger("audit")

        # Configure audit logger if not already configured
        if not self.logger.handlers:
            # Create a separate handler for audit logs
            if log_file:
                handler = logging.FileHandler(log_file)
            else:
                handler = logging.StreamHandler()

            formatter = logging.Formatter(
                '%(asctime)s [AUDIT] [%(levelname)s] %(message)s',
                datefmt='%Y-%m-%dT%H:%M:%S%z'
            )
            handler.setFormatter(formatter)

            self.logger.addHandler(handler)
            self.logger.setLevel(logging.INFO)

            # Keep audit records on their own handler rather than the root logger's
            self.logger.propagate = False

    def log_event(self, event_type: str, user_id: str, resource_type: Optional[str] = None,
                  resource_id: Optional[str] = None, action: Optional[str] = None,
                  details: Optional[Dict[str, Any]] = None):
        """
        Log an audit event.

        Args:
            event_type: Type of event (access, create, update, delete)
            user_id: ID of the user performing the action
            resource_type: Type of resource being accessed
            resource_id: ID of the resource being accessed
            action: Action being performed
            details: Additional details about the event
        """
        # Same naive-UTC ISO string that utcnow().isoformat() produced, without
        # calling the deprecated datetime.utcnow().
        timestamp = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

        event = {
            "timestamp": timestamp.isoformat(),
            "event_id": str(uuid.uuid4()),
            "event_type": event_type,
            "user_id": user_id,
            "resource_type": resource_type,
            "resource_id": resource_id,
            "action": action,
            "details": details or {}
        }

        # default=str is deliberate: a non-JSON-native value in `details`
        # (datetime, Path, set, ...) must not raise and drop the audit record.
        self.logger.info(json.dumps(event, default=str))

    def log_access(self, user_id: str, resource_type: str, resource_id: Optional[str] = None,
                   details: Optional[Dict[str, Any]] = None):
        """
        Log a resource access event.

        Args:
            user_id: ID of the user accessing the resource
            resource_type: Type of resource being accessed
            resource_id: ID of the resource being accessed
            details: Additional details about the access
        """
        self.log_event(
            event_type="access",
            user_id=user_id,
            resource_type=resource_type,
            resource_id=resource_id,
            action="read",
            details=details
        )

    def log_create(self, user_id: str, resource_type: str, resource_id: str,
                   details: Optional[Dict[str, Any]] = None):
        """
        Log a resource creation event.

        Args:
            user_id: ID of the user creating the resource
            resource_type: Type of resource being created
            resource_id: ID of the created resource
            details: Additional details about the creation
        """
        self.log_event(
            event_type="create",
            user_id=user_id,
            resource_type=resource_type,
            resource_id=resource_id,
            action="create",
            details=details
        )

    def log_update(self, user_id: str, resource_type: str, resource_id: str,
                   details: Optional[Dict[str, Any]] = None):
        """
        Log a resource update event.

        Args:
            user_id: ID of the user updating the resource
            resource_type: Type of resource being updated
            resource_id: ID of the updated resource
            details: Additional details about the update
        """
        self.log_event(
            event_type="update",
            user_id=user_id,
            resource_type=resource_type,
            resource_id=resource_id,
            action="update",
            details=details
        )

    def log_delete(self, user_id: str, resource_type: str, resource_id: str,
                   details: Optional[Dict[str, Any]] = None):
        """
        Log a resource deletion event.

        Args:
            user_id: ID of the user deleting the resource
            resource_type: Type of resource being deleted
            resource_id: ID of the deleted resource
            details: Additional details about the deletion
        """
        self.log_event(
            event_type="delete",
            user_id=user_id,
            resource_type=resource_type,
            resource_id=resource_id,
            action="delete",
            details=details
        )


# Create a global audit logger instance
audit_logger = AuditLogger()
class PrivacyFilter:
    """
    Filter for handling sensitive healthcare data.

    Masks or redacts values in FHIR resource dictionaries, both at known
    sensitive field paths and wherever a string matches one of PATTERNS.
    """

    # Regular expressions for identifying sensitive data patterns
    PATTERNS = {
        "ssn": r"\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b",
        "credit_card": r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b",
        "phone": r"\b\d{3}[-\s.]?\d{3}[-\s.]?\d{4}\b",
        # TLD class is [A-Za-z]; a "|" inside a character class is a literal pipe.
        "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
        "zipcode": r"\b\d{5}(?:[-\s]\d{4})?\b"
    }

    # FHIR fields that may contain sensitive data, as
    # resource type -> top-level field -> subfield paths to mask.
    # An empty list means no subfield of that field is masked by default.
    SENSITIVE_FIELDS = {
        "Patient": {
            "identifier": ["value"],
            "telecom": ["value"],
            "address": ["line", "postalCode"],
            "contact": ["telecom.value"],
            "generalPractitioner": []
        },
        "Observation": {
            "identifier": ["value"],
            "performer": []
        }
    }

    def __init__(self):
        """Initialize the privacy filter."""
        self.logger = logging.getLogger(__name__)

    def filter_resource(self, resource: Dict[str, Any],
                        resource_type: str,
                        redact_fields: Optional[List[str]] = None,
                        mask_fields: Optional[List[str]] = None) -> Dict[str, Any]:
        """
        Filter sensitive data in a FHIR resource.

        Steps run in this order: explicit redactions, explicit masks, default
        masks from SENSITIVE_FIELDS, then a pattern scan over every remaining
        string value.

        Args:
            resource: FHIR resource data
            resource_type: Type of FHIR resource
            redact_fields: List of field paths to completely redact
            mask_fields: List of field paths to mask (e.g., "XXXX1234")

        Returns:
            Filtered deep copy of the resource; the input is not modified.
        """
        # Make a deep copy to avoid modifying the original
        filtered = copy.deepcopy(resource)

        # Apply redaction for specified fields
        for field_path in redact_fields or []:
            self._apply_to_field(filtered, field_path, self._redact_value)

        # Apply masking for specified fields
        for field_path in mask_fields or []:
            self._apply_to_field(filtered, field_path, self._mask_value)

        # Apply default processing for known sensitive fields
        sensitive_fields = self.SENSITIVE_FIELDS.get(resource_type, {})
        for field, subfields in sensitive_fields.items():
            if field not in filtered:
                continue
            container = filtered[field]
            # A field may hold one object or a list of objects
            targets = container if isinstance(container, list) else [container]
            for target in targets:
                if isinstance(target, dict):
                    for subfield in subfields:
                        self._apply_to_field(target, subfield, self._mask_value)

        # Detect and mask potential PII that wasn't explicitly specified
        self._detect_and_mask_patterns(filtered)

        return filtered

    def _apply_to_field(self, data: Dict[str, Any], field_path: str,
                        processor_func: callable) -> None:
        """
        Apply a processing function to a field specified by path.

        Missing or non-navigable path segments are ignored silently.

        Args:
            data: Data dictionary (modified in place)
            field_path: Path to the field (dot notation)
            processor_func: Function to apply to the field value
        """
        if not data or not isinstance(data, dict):
            return

        # Handle dot notation for nested fields
        parts = field_path.split('.')
        current = data

        # Navigate to the nested field
        for i, part in enumerate(parts[:-1]):
            if part not in current:
                # Field doesn't exist
                return
            child = current[part]
            if isinstance(child, dict):
                current = child
            elif isinstance(child, list):
                # Fan out: apply the rest of the path to each object in the list
                remainder = '.'.join(parts[i + 1:])
                for item in child:
                    if isinstance(item, dict):
                        self._apply_to_field(item, remainder, processor_func)
                return
            else:
                # Can't navigate further
                return

        # Apply processor to the field
        last_part = parts[-1]
        if last_part in current:
            current[last_part] = processor_func(current[last_part])

    def _mask_all_patterns(self, text: str) -> str:
        """Return text with every PATTERNS match masked, applied cumulatively."""
        masked = text
        for pattern in self.PATTERNS.values():
            # Each pattern runs on the already-masked text so one pattern's
            # result never overwrites another's.
            masked = self._mask_pattern(masked, pattern)
        return masked

    def _detect_and_mask_patterns(self, data: Union[Dict[str, Any], List[Any]]) -> None:
        """
        Recursively scan data for patterns that look like sensitive information.

        String values in dicts and lists, at any depth, are replaced in place.
        Non-container input is ignored.

        Args:
            data: Dict or list to scan (modified in place)
        """
        if isinstance(data, dict):
            entries = list(data.items())
        elif isinstance(data, list):
            entries = list(enumerate(data))
        else:
            return

        for key, value in entries:
            if isinstance(value, (dict, list)):
                self._detect_and_mask_patterns(value)
            elif isinstance(value, str):
                data[key] = self._mask_all_patterns(value)

    def _mask_pattern(self, text: str, pattern: str) -> str:
        """
        Mask a specific pattern in text.

        Args:
            text: Text to mask
            pattern: Regex pattern to find

        Returns:
            Text where each match keeps its first and last character and has
            the rest replaced by "X"; matches of one or two characters are
            fully replaced.
        """
        def replace_match(match):
            s = match.group(0)
            # Keep first and last character, mask the rest
            if len(s) > 2:
                return s[0] + 'X' * (len(s) - 2) + s[-1]
            return 'X' * len(s)

        return re.sub(pattern, replace_match, text)

    def _redact_value(self, value: Any) -> Any:
        """
        Completely redact a value.

        Args:
            value: Value to redact

        Returns:
            A type-matched placeholder: "[REDACTED]" for strings, 0 for
            numbers, an empty list or dict for containers, None otherwise.
        """
        if isinstance(value, str):
            return "[REDACTED]"
        elif isinstance(value, (int, float)):
            return 0
        elif isinstance(value, list):
            return []
        elif isinstance(value, dict):
            return {}
        else:
            return None

    def _mask_value(self, value: Any) -> Any:
        """
        Mask a value, preserving some information.

        Args:
            value: Value to mask

        Returns:
            Masked value. Strings of up to four characters become all "X";
            longer strings keep their first and last character. Numbers
            become 0. Lists and dicts are masked element by element. Other
            types are returned unchanged.
        """
        if isinstance(value, str):
            if len(value) <= 4:
                return "X" * len(value)
            # Keep first and last character, mask the rest
            return value[0] + "X" * (len(value) - 2) + value[-1]
        elif isinstance(value, (int, float)):
            # For simplicity in the POC, numbers are zeroed
            return 0
        elif isinstance(value, list):
            return [self._mask_value(v) for v in value]
        elif isinstance(value, dict):
            return {k: self._mask_value(v) for k, v in value.items()}
        else:
            return value


# Create a global privacy filter instance
privacy_filter = PrivacyFilter()
+ dockerfile: docker/Dockerfile + ports: + - "8000:8000" + volumes: + - ./fhir_storage:/app/fhir_storage + - ./sample_data:/app/sample_data + environment: + - ENVIRONMENT=development + - JWT_SECRET_KEY=dev-secret-key-replace-in-production + - JWT_ALGORITHM=HS256 + - ACCESS_TOKEN_EXPIRE_MINUTES=60 + depends_on: + - keycloak + networks: + - fhir-ocr-network + restart: unless-stopped + + # Keycloak for authentication + keycloak: + image: quay.io/keycloak/keycloak:20.0.2 + ports: + - "8181:8080" + environment: + - KEYCLOAK_ADMIN=admin + - KEYCLOAK_ADMIN_PASSWORD=admin + - KC_HEALTH_ENABLED=true + - KC_METRICS_ENABLED=true + - KC_HTTP_ENABLED=true + - KC_DB=dev-file + command: + - start-dev + volumes: + - keycloak_data:/opt/keycloak/data + networks: + - fhir-ocr-network + restart: unless-stopped + + # HAPI FHIR Server + # Note: This is included but not connected in this POC + # It can be used as an alternative to the local file storage + hapi-fhir: + image: hapiproject/hapi:latest + ports: + - "8090:8080" + environment: + - hapi.fhir.default_encoding=json + - hapi.fhir.allow_external_references=true + - hapi.fhir.allow_placeholder_references=true + - hapi.fhir.validation.requests_enabled=false + - hapi.fhir.validation.responses_enabled=false + networks: + - fhir-ocr-network + restart: unless-stopped + +volumes: + keycloak_data: + +networks: + fhir-ocr-network: \ No newline at end of file diff --git a/FHIR_OCR_POC/docker/Dockerfile b/FHIR_OCR_POC/docker/Dockerfile new file mode 100644 index 0000000..1ce0d4c --- /dev/null +++ b/FHIR_OCR_POC/docker/Dockerfile @@ -0,0 +1,34 @@ +FROM python:3.9-slim + +# Install system dependencies including Tesseract OCR +RUN apt-get update && apt-get install -y \ + tesseract-ocr \ + libgl1-mesa-glx \ + libglib2.0-0 \ + && rm -rf /var/lib/apt/lists/* + +# Set working directory +WORKDIR /app + +# Copy requirements first for better caching +COPY requirements.txt . 
+ +# Install Python dependencies +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY . . + +# Create storage directories for FHIR resources +RUN mkdir -p fhir_storage/Patient fhir_storage/Observation + +# Set environment variables +ENV PYTHONPATH=/app +ENV PYTHONUNBUFFERED=1 +ENV ENVIRONMENT=development + +# Expose port +EXPOSE 8000 + +# Run application +CMD ["uvicorn", "api.app:app", "--host", "0.0.0.0", "--port", "8000"] \ No newline at end of file diff --git a/FHIR_OCR_POC/fhir_module/__init__.py b/FHIR_OCR_POC/fhir_module/__init__.py new file mode 100644 index 0000000..42d2833 --- /dev/null +++ b/FHIR_OCR_POC/fhir_module/__init__.py @@ -0,0 +1,3 @@ +""" +FHIR Module for handling FHIR resources storage and retrieval. +""" \ No newline at end of file diff --git a/FHIR_OCR_POC/fhir_module/fhir_client.py b/FHIR_OCR_POC/fhir_module/fhir_client.py new file mode 100644 index 0000000..b6279d6 --- /dev/null +++ b/FHIR_OCR_POC/fhir_module/fhir_client.py @@ -0,0 +1,194 @@ +import os +import json +import logging +import requests +from typing import Dict, Any, List, Optional, Union +from fhir.resources.resource import Resource +from fhir.resources.patient import Patient +from fhir.resources.observation import Observation + +class FHIRClient: + """ + Client for interacting with a FHIR server. + """ + + def __init__(self, base_url: str, auth_token: Optional[str] = None): + """ + Initialize the FHIR client. 
+ + Args: + base_url: Base URL of the FHIR server + auth_token: Optional authentication token + """ + self.base_url = base_url.rstrip('/') + self.auth_token = auth_token + self.logger = logging.getLogger(__name__) + + # Verify FHIR server connection + try: + response = self._make_request('GET', f"{self.base_url}/metadata") + if response.status_code != 200: + self.logger.warning(f"FHIR server returned status {response.status_code} for metadata request") + else: + capability = response.json() + self.logger.info(f"Connected to FHIR server, version: {capability.get('software', {}).get('version', 'unknown')}") + except Exception as e: + self.logger.error(f"Error connecting to FHIR server: {str(e)}") + # Don't raise error here to allow for initialization even if server is not available + + def _make_request(self, method: str, url: str, data: Optional[Dict[str, Any]] = None) -> requests.Response: + """ + Make an HTTP request to the FHIR server. + + Args: + method: HTTP method (GET, POST, PUT, DELETE) + url: URL to request + data: Optional data to send + + Returns: + HTTP response + """ + headers = { + 'Content-Type': 'application/fhir+json', + 'Accept': 'application/fhir+json' + } + + # Add authentication if provided + if self.auth_token: + headers['Authorization'] = f"Bearer {self.auth_token}" + + try: + response = requests.request( + method=method, + url=url, + headers=headers, + json=data + ) + + # Log request details + self.logger.debug(f"{method} {url}: {response.status_code}") + + # Raise error for non-2xx responses + response.raise_for_status() + + return response + except requests.exceptions.HTTPError as e: + self.logger.error(f"HTTP error: {str(e)}") + # Include response body in error log if available + if hasattr(e, 'response') and e.response is not None: + self.logger.error(f"Response: {e.response.text}") + raise + except Exception as e: + self.logger.error(f"Request error: {str(e)}") + raise + + def create_resource(self, resource: Resource) -> Dict[str, Any]: 
+ """ + Create a new FHIR resource. + + Args: + resource: FHIR resource to create + + Returns: + Created resource data + """ + resource_type = resource.resource_type + + # Convert resource to JSON + resource_json = json.loads(resource.json()) + + # Make request to create resource + response = self._make_request('POST', f"{self.base_url}/{resource_type}", resource_json) + + # Return created resource + return response.json() + + def read_resource(self, resource_type: str, resource_id: str) -> Dict[str, Any]: + """ + Read a FHIR resource by ID. + + Args: + resource_type: Type of resource (Patient, Observation, etc.) + resource_id: ID of the resource + + Returns: + Resource data + """ + response = self._make_request('GET', f"{self.base_url}/{resource_type}/{resource_id}") + return response.json() + + def update_resource(self, resource: Resource) -> Dict[str, Any]: + """ + Update an existing FHIR resource. + + Args: + resource: FHIR resource to update + + Returns: + Updated resource data + """ + resource_type = resource.resource_type + resource_id = resource.id + + # Convert resource to JSON + resource_json = json.loads(resource.json()) + + # Make request to update resource + response = self._make_request( + 'PUT', + f"{self.base_url}/{resource_type}/{resource_id}", + resource_json + ) + + # Return updated resource + return response.json() + + def delete_resource(self, resource_type: str, resource_id: str) -> bool: + """ + Delete a FHIR resource. + + Args: + resource_type: Type of resource + resource_id: ID of the resource + + Returns: + True if deletion was successful + """ + response = self._make_request('DELETE', f"{self.base_url}/{resource_type}/{resource_id}") + return response.status_code in (200, 202, 204) + + def search_resources(self, resource_type: str, params: Dict[str, Any] = None) -> List[Dict[str, Any]]: + """ + Search for FHIR resources. 
+ + Args: + resource_type: Type of resource to search for + params: Search parameters + + Returns: + List of matching resources + """ + url = f"{self.base_url}/{resource_type}" + if params: + # Convert params to URL query string + query_params = [] + for key, value in params.items(): + if isinstance(value, list): + for v in value: + query_params.append(f"{key}={v}") + else: + query_params.append(f"{key}={value}") + + url = f"{url}?{'&'.join(query_params)}" + + response = self._make_request('GET', url) + bundle = response.json() + + # Extract resources from bundle + resources = [] + if 'entry' in bundle: + for entry in bundle['entry']: + if 'resource' in entry: + resources.append(entry['resource']) + + return resources \ No newline at end of file diff --git a/FHIR_OCR_POC/fhir_module/fhir_repository.py b/FHIR_OCR_POC/fhir_module/fhir_repository.py new file mode 100644 index 0000000..5b96c1e --- /dev/null +++ b/FHIR_OCR_POC/fhir_module/fhir_repository.py @@ -0,0 +1,244 @@ +import os +import json +import uuid +import datetime +import logging +from typing import Dict, Any, List, Optional, Union +from pathlib import Path +from fhir.resources.resource import Resource +from fhir.resources.patient import Patient +from fhir.resources.observation import Observation + +class FHIRRepository: + """ + Local storage repository for FHIR resources. + This is a simplified implementation for the POC, using file storage. + In production, this would be replaced with a database. + """ + + def __init__(self, storage_dir: str = 'fhir_storage'): + """ + Initialize the FHIR repository. 
+ + Args: + storage_dir: Directory to store FHIR resources + """ + self.storage_dir = storage_dir + self.logger = logging.getLogger(__name__) + + # Create storage directory if it doesn't exist + try: + os.makedirs(os.path.join(storage_dir, 'Patient'), exist_ok=True) + os.makedirs(os.path.join(storage_dir, 'Observation'), exist_ok=True) + self.logger.info(f"FHIR storage directories created in {storage_dir}") + except Exception as e: + self.logger.error(f"Error creating storage directories: {str(e)}") + raise + + def _get_resource_path(self, resource_type: str, resource_id: str) -> str: + """ + Get the file path for a resource. + + Args: + resource_type: Type of resource + resource_id: ID of the resource + + Returns: + Path to the resource file + """ + return os.path.join(self.storage_dir, resource_type, f"{resource_id}.json") + + def _read_resource_file(self, file_path: str) -> Dict[str, Any]: + """ + Read a resource file. + + Args: + file_path: Path to the resource file + + Returns: + Resource data as a dictionary + """ + try: + with open(file_path, 'r') as f: + return json.load(f) + except Exception as e: + self.logger.error(f"Error reading resource file: {str(e)}") + raise + + def _write_resource_file(self, file_path: str, data: Dict[str, Any]) -> None: + """ + Write a resource file. + + Args: + file_path: Path to the resource file + data: Resource data + """ + try: + with open(file_path, 'w') as f: + json.dump(data, f, indent=2) + except Exception as e: + self.logger.error(f"Error writing resource file: {str(e)}") + raise + + def create_resource(self, resource: Resource) -> Dict[str, Any]: + """ + Create a new FHIR resource. 
+ + Args: + resource: FHIR resource to create + + Returns: + Created resource data + """ + resource_type = resource.resource_type + + # Ensure resource has an ID + if not resource.id: + resource.id = str(uuid.uuid4()) + + # Convert resource to dictionary + resource_data = json.loads(resource.json()) + + # Add metadata + resource_data['meta'] = resource_data.get('meta', {}) + resource_data['meta']['lastUpdated'] = datetime.datetime.utcnow().isoformat() + + # Save resource to file + file_path = self._get_resource_path(resource_type, resource.id) + self._write_resource_file(file_path, resource_data) + + self.logger.info(f"Created {resource_type} resource with ID {resource.id}") + return resource_data + + def read_resource(self, resource_type: str, resource_id: str) -> Dict[str, Any]: + """ + Read a FHIR resource by ID. + + Args: + resource_type: Type of resource + resource_id: ID of the resource + + Returns: + Resource data + """ + file_path = self._get_resource_path(resource_type, resource_id) + + if not os.path.exists(file_path): + self.logger.error(f"Resource not found: {resource_type}/{resource_id}") + raise FileNotFoundError(f"Resource not found: {resource_type}/{resource_id}") + + resource_data = self._read_resource_file(file_path) + self.logger.debug(f"Read {resource_type} resource with ID {resource_id}") + return resource_data + + def update_resource(self, resource: Resource) -> Dict[str, Any]: + """ + Update an existing FHIR resource. 
+ + Args: + resource: FHIR resource to update + + Returns: + Updated resource data + """ + resource_type = resource.resource_type + resource_id = resource.id + + if not resource_id: + self.logger.error("Cannot update resource without ID") + raise ValueError("Resource must have an ID for update") + + # Check if resource exists + file_path = self._get_resource_path(resource_type, resource_id) + if not os.path.exists(file_path): + self.logger.error(f"Resource not found for update: {resource_type}/{resource_id}") + raise FileNotFoundError(f"Resource not found for update: {resource_type}/{resource_id}") + + # Convert resource to dictionary + resource_data = json.loads(resource.json()) + + # Update metadata + resource_data['meta'] = resource_data.get('meta', {}) + resource_data['meta']['lastUpdated'] = datetime.datetime.utcnow().isoformat() + + # Save updated resource + self._write_resource_file(file_path, resource_data) + + self.logger.info(f"Updated {resource_type} resource with ID {resource_id}") + return resource_data + + def delete_resource(self, resource_type: str, resource_id: str) -> bool: + """ + Delete a FHIR resource. + + Args: + resource_type: Type of resource + resource_id: ID of the resource + + Returns: + True if deletion was successful + """ + file_path = self._get_resource_path(resource_type, resource_id) + + if not os.path.exists(file_path): + self.logger.error(f"Resource not found for deletion: {resource_type}/{resource_id}") + return False + + try: + os.remove(file_path) + self.logger.info(f"Deleted {resource_type} resource with ID {resource_id}") + return True + except Exception as e: + self.logger.error(f"Error deleting resource: {str(e)}") + return False + + def search_resources(self, resource_type: str, params: Dict[str, Any] = None) -> List[Dict[str, Any]]: + """ + Search for FHIR resources. 
+ + Args: + resource_type: Type of resource to search for + params: Search parameters + + Returns: + List of matching resources + """ + # Check if resource type directory exists + resource_dir = os.path.join(self.storage_dir, resource_type) + if not os.path.exists(resource_dir): + self.logger.error(f"Resource type directory not found: {resource_type}") + return [] + + # Get all resource files of the specified type + resources = [] + for file_name in os.listdir(resource_dir): + if file_name.endswith('.json'): + file_path = os.path.join(resource_dir, file_name) + resource = self._read_resource_file(file_path) + + # Filter by parameters if provided + if params: + matches_all_params = True + for key, value in params.items(): + # Handle nested properties with dot notation (e.g., "name.family") + parts = key.split('.') + resource_value = resource + for part in parts: + if isinstance(resource_value, dict) and part in resource_value: + resource_value = resource_value[part] + else: + resource_value = None + break + + # Check if value matches + if resource_value != value: + matches_all_params = False + break + + if not matches_all_params: + continue + + resources.append(resource) + + self.logger.debug(f"Found {len(resources)} {resource_type} resources matching search criteria") + return resources \ No newline at end of file diff --git a/FHIR_OCR_POC/ocr_module/__init__.py b/FHIR_OCR_POC/ocr_module/__init__.py new file mode 100644 index 0000000..ec52a4a --- /dev/null +++ b/FHIR_OCR_POC/ocr_module/__init__.py @@ -0,0 +1,3 @@ +""" +OCR Module for extracting text from healthcare documents. 
+""" \ No newline at end of file diff --git a/FHIR_OCR_POC/ocr_module/fhir_mapper.py b/FHIR_OCR_POC/ocr_module/fhir_mapper.py new file mode 100644 index 0000000..0978e5d --- /dev/null +++ b/FHIR_OCR_POC/ocr_module/fhir_mapper.py @@ -0,0 +1,172 @@ +import uuid +import datetime +from typing import Dict, Any, Optional +from fhir.resources.patient import Patient +from fhir.resources.humanname import HumanName +from fhir.resources.identifier import Identifier +from fhir.resources.observation import Observation +from fhir.resources.codeableconcept import CodeableConcept +from fhir.resources.coding import Coding +from fhir.resources.reference import Reference + +class FHIRMapper: + """ + Maps OCR extracted data to FHIR resources. + """ + + def __init__(self): + """Initialize the FHIR mapper.""" + pass + + def map_to_patient(self, ocr_data: Dict[str, Any]) -> Patient: + """ + Map OCR extracted data to a FHIR Patient resource. + + Args: + ocr_data: Dictionary with OCR extracted data + + Returns: + FHIR Patient resource + """ + # Extract patient data from OCR results + patient_data = ocr_data.get('structured_data', {}).get('patient', {}) + + # Create a unique ID for the patient + patient_id = str(uuid.uuid4()) + + # Parse name + name = None + if patient_data.get('name'): + # Simple parsing - in production would need more sophisticated name parsing + name_parts = patient_data['name'].split() + if len(name_parts) > 1: + given = name_parts[:-1] + family = name_parts[-1] + else: + given = name_parts + family = "" + + name = HumanName(given=given, family=family, use="official") + + # Parse DOB + birth_date = None + if patient_data.get('dob'): + # Try to parse date - this is simplified and would need better handling + try: + # Attempt to parse common date formats + for fmt in ('%Y-%m-%d', '%m/%d/%Y', '%d/%m/%Y', '%B %d, %Y'): + try: + birth_date = datetime.datetime.strptime( + patient_data['dob'], fmt + ).strftime('%Y-%m-%d') + break + except ValueError: + continue + except 
Exception: + # If date parsing fails, store as is + birth_date = patient_data['dob'] + + # Create patient resource + patient = Patient( + id=patient_id, + identifier=[ + Identifier( + system="http://example.org/fhir/ocr-extracted-mrn", + value=patient_data.get('id', f"OCR-{patient_id}") + ) + ], + active=True + ) + + # Add name if available + if name: + patient.name = [name] + + # Add birth date if available + if birth_date: + patient.birthDate = birth_date + + # Add gender if available + if patient_data.get('gender'): + # Map to FHIR gender values + gender_map = { + 'male': 'male', + 'm': 'male', + 'female': 'female', + 'f': 'female', + 'other': 'other', + 'unknown': 'unknown' + } + gender_value = patient_data['gender'].lower() + patient.gender = gender_map.get(gender_value, 'unknown') + + return patient + + def map_to_observation(self, ocr_data: Dict[str, Any], patient_id: str) -> Optional[Observation]: + """ + Map OCR extracted data to a FHIR Observation resource. + + Args: + ocr_data: Dictionary with OCR extracted data + patient_id: ID of the associated patient + + Returns: + FHIR Observation resource or None if no observation data found + """ + # This is a simplified example that would need to be expanded based on + # the specific type of document being processed + + document_type = ocr_data.get('structured_data', {}).get('document_type') + + # Only process certain document types for observations + if document_type not in ['lab_result', 'prescription', 'clinical_note']: + return None + + # Create a unique ID for the observation + observation_id = str(uuid.uuid4()) + + # Create basic observation structure + observation = Observation( + id=observation_id, + status="final", + subject=Reference(reference=f"Patient/{patient_id}"), + effectiveDateTime=datetime.datetime.now().isoformat() + ) + + # Set category based on document type + if document_type == 'lab_result': + observation.category = [ + CodeableConcept( + coding=[ + Coding( + 
system="http://terminology.hl7.org/CodeSystem/observation-category", + code="laboratory", + display="Laboratory" + ) + ], + text="Laboratory" + ) + ] + elif document_type == 'prescription': + observation.category = [ + CodeableConcept( + coding=[ + Coding( + system="http://terminology.hl7.org/CodeSystem/observation-category", + code="medication", + display="Medication" + ) + ], + text="Medication" + ) + ] + + # In a real implementation, we would extract specific lab values or medication + # information from the OCR data and populate the observation accordingly + + # Example: store raw text in note for demonstration purposes + observation.note = [{ + "text": f"OCR extracted text: {ocr_data.get('raw_text', '')[:200]}..." + }] + + return observation \ No newline at end of file diff --git a/FHIR_OCR_POC/ocr_module/ocr_processor.py b/FHIR_OCR_POC/ocr_module/ocr_processor.py new file mode 100644 index 0000000..73d7e87 --- /dev/null +++ b/FHIR_OCR_POC/ocr_module/ocr_processor.py @@ -0,0 +1,185 @@ +import os +import logging +from typing import Dict, Any, List, Optional, Tuple +import cv2 +import pytesseract +from PIL import Image + +class OCRProcessor: + """ + OCR processor class for healthcare documents using Tesseract. + """ + + def __init__(self, tesseract_cmd: Optional[str] = None): + """ + Initialize the OCR processor. + + Args: + tesseract_cmd: Optional path to Tesseract executable + """ + if tesseract_cmd: + pytesseract.pytesseract.tesseract_cmd = tesseract_cmd + + # Set up logging + self.logger = logging.getLogger(__name__) + + # Verify Tesseract installation + try: + pytesseract.get_tesseract_version() + self.logger.info("Tesseract OCR initialized successfully") + except Exception as e: + self.logger.error(f"Failed to initialize Tesseract OCR: {str(e)}") + raise RuntimeError(f"Tesseract OCR not properly installed: {str(e)}") + + def preprocess_image(self, image): + """ + Preprocess image to improve OCR accuracy. 
+ + Args: + image: OpenCV image object + + Returns: + Preprocessed image + """ + # Convert to grayscale + gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) + + # Apply threshold to get black and white image + _, binary = cv2.threshold(gray, 150, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU) + + # Invert back + binary = 255 - binary + + return binary + + def process_image(self, image_path: str) -> Dict[str, Any]: + """ + Process an image file and extract text using OCR. + + Args: + image_path: Path to the image file + + Returns: + Dictionary containing extracted text and metadata + """ + if not os.path.exists(image_path): + self.logger.error(f"Image file not found: {image_path}") + raise FileNotFoundError(f"Image file not found: {image_path}") + + try: + # Read image using OpenCV + img = cv2.imread(image_path) + if img is None: + raise ValueError(f"Failed to read image: {image_path}") + + # Preprocess image + preprocessed = self.preprocess_image(img) + + # Apply OCR + raw_text = pytesseract.image_to_string(preprocessed) + + # Get detailed OCR data including confidence levels + ocr_data = pytesseract.image_to_data(preprocessed, output_type=pytesseract.Output.DICT) + + # Extract structured data + structured_data = self.extract_healthcare_data(raw_text) + + result = { + "raw_text": raw_text, + "structured_data": structured_data, + "confidence": self._calculate_avg_confidence(ocr_data), + "metadata": { + "source_file": image_path, + "ocr_engine": "Tesseract", + "ocr_version": pytesseract.get_tesseract_version() + } + } + + self.logger.info(f"Successfully processed image: {image_path}") + return result + + except Exception as e: + self.logger.error(f"OCR processing error: {str(e)}") + raise RuntimeError(f"Failed to process image with OCR: {str(e)}") + + def extract_healthcare_data(self, text: str) -> Dict[str, Any]: + """ + Extract structured healthcare data from OCR text. 
+ + Args: + text: Raw OCR text + + Returns: + Dictionary containing extracted healthcare data fields + """ + # This is a simplified implementation that would need to be enhanced + # with more sophisticated extraction logic for a real-world application + + lines = [line.strip() for line in text.split('\n') if line.strip()] + data = { + "patient": { + "name": None, + "dob": None, + "id": None, + "gender": None + }, + "document_type": self._detect_document_type(text), + "extracted_fields": {} + } + + # Simple extraction based on keywords (would need enhancement for production) + for line in lines: + if "name:" in line.lower() or "patient:" in line.lower(): + data["patient"]["name"] = self._extract_after_colon(line) + elif "dob:" in line.lower() or "birth" in line.lower() or "born:" in line.lower(): + data["patient"]["dob"] = self._extract_after_colon(line) + elif "id:" in line.lower() or "mrn:" in line.lower() or "record" in line.lower(): + data["patient"]["id"] = self._extract_after_colon(line) + elif "gender:" in line.lower() or "sex:" in line.lower(): + data["patient"]["gender"] = self._extract_after_colon(line) + + return data + + def _detect_document_type(self, text: str) -> str: + """ + Attempt to detect the type of healthcare document. 
+ + Args: + text: Raw OCR text + + Returns: + Document type string + """ + text_lower = text.lower() + + if "insurance" in text_lower and ("card" in text_lower or "policy" in text_lower): + return "insurance_card" + elif "prescription" in text_lower: + return "prescription" + elif "lab" in text_lower and ("result" in text_lower or "report" in text_lower): + return "lab_result" + elif "discharge" in text_lower and "summary" in text_lower: + return "discharge_summary" + else: + return "unknown" + + def _extract_after_colon(self, text: str) -> str: + """Extract the content after a colon in a string.""" + if ":" in text: + return text.split(":", 1)[1].strip() + return text.strip() + + def _calculate_avg_confidence(self, ocr_data: Dict) -> float: + """ + Calculate average confidence score from OCR data. + + Args: + ocr_data: Dictionary containing OCR data from pytesseract + + Returns: + Average confidence score as a percentage + """ + confidences = [conf for conf in ocr_data.get('conf', []) if conf != -1] + if not confidences: + return 0.0 + return sum(confidences) / len(confidences) \ No newline at end of file diff --git a/FHIR_OCR_POC/ocr_module/test_ocr.py b/FHIR_OCR_POC/ocr_module/test_ocr.py new file mode 100644 index 0000000..099374b --- /dev/null +++ b/FHIR_OCR_POC/ocr_module/test_ocr.py @@ -0,0 +1,137 @@ +import os +import unittest +import logging +from unittest.mock import patch, MagicMock +import json +from .ocr_processor import OCRProcessor +from .fhir_mapper import FHIRMapper + +# Set up logging for tests +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +class TestOCRModule(unittest.TestCase): + """Test cases for OCR module.""" + + def setUp(self): + """Set up test fixtures.""" + # Mock Tesseract to avoid dependency on actual installation for tests + self.tesseract_patcher = patch('pytesseract.pytesseract.image_to_string') + self.mock_image_to_string = self.tesseract_patcher.start() + self.mock_image_to_string.return_value = 
""" + Patient: John Doe + DOB: 01/15/1980 + Sex: Male + MRN: 12345678 + Insurance: HealthCorp + Policy #: HC987654321 + """ + + # Mock image_to_data to return confidence scores + self.data_patcher = patch('pytesseract.pytesseract.image_to_data') + self.mock_image_to_data = self.data_patcher.start() + self.mock_image_to_data.return_value = { + 'conf': [90, 95, 85, 92, 88] + } + + # Mock Tesseract version + self.version_patcher = patch('pytesseract.pytesseract.get_tesseract_version') + self.mock_get_version = self.version_patcher.start() + self.mock_get_version.return_value = '4.1.1' + + # Create OCR processor with mocks + self.ocr = OCRProcessor() + + # Create FHIR mapper + self.mapper = FHIRMapper() + + def tearDown(self): + """Tear down test fixtures.""" + self.tesseract_patcher.stop() + self.data_patcher.stop() + self.version_patcher.stop() + + @patch('cv2.imread') + @patch('cv2.cvtColor') + @patch('cv2.threshold') + @patch('os.path.exists') + def test_process_image(self, mock_exists, mock_threshold, mock_cvtcolor, mock_imread): + """Test image processing and OCR extraction.""" + # Set up mocks + mock_exists.return_value = True + mock_imread.return_value = MagicMock() + mock_cvtcolor.return_value = MagicMock() + mock_threshold.return_value = (None, MagicMock()) + + # Process a mock image + result = self.ocr.process_image("test_image.jpg") + + # Verify results + self.assertIsNotNone(result) + self.assertIn("raw_text", result) + self.assertIn("structured_data", result) + self.assertIn("confidence", result) + self.assertIn("metadata", result) + + # Check structured data extraction + patient_data = result["structured_data"]["patient"] + self.assertEqual(patient_data["name"], "John Doe") + self.assertEqual(patient_data["dob"], "01/15/1980") + self.assertEqual(patient_data["gender"], "Male") + self.assertEqual(patient_data["id"], "12345678") + + # Check document type detection + self.assertEqual(result["structured_data"]["document_type"], "insurance_card") + + def 
test_map_to_fhir_patient(self): + """Test mapping OCR data to FHIR Patient resource.""" + # Create sample OCR data + ocr_data = { + "raw_text": "Patient: John Doe\nDOB: 01/15/1980\nSex: Male\nMRN: 12345678", + "structured_data": { + "patient": { + "name": "John Doe", + "dob": "01/15/1980", + "gender": "Male", + "id": "12345678" + }, + "document_type": "insurance_card" + } + } + + # Map to FHIR Patient + patient = self.mapper.map_to_patient(ocr_data) + + # Verify FHIR resource + self.assertIsNotNone(patient) + self.assertEqual(patient.gender, "male") + self.assertEqual(patient.name[0].family, "Doe") + self.assertEqual(patient.name[0].given[0], "John") + self.assertEqual(patient.birthDate, "1980-01-15") + self.assertEqual(patient.identifier[0].value, "12345678") + + def test_map_to_fhir_observation(self): + """Test mapping OCR data to FHIR Observation resource.""" + # Create sample OCR data for a lab result + ocr_data = { + "raw_text": "Lab Result\nPatient: John Doe\nTest: Blood Glucose\nResult: 120 mg/dL", + "structured_data": { + "patient": { + "name": "John Doe", + "id": "12345678" + }, + "document_type": "lab_result" + } + } + + # Map to FHIR Observation + observation = self.mapper.map_to_observation(ocr_data, "patient-123") + + # Verify FHIR resource + self.assertIsNotNone(observation) + self.assertEqual(observation.status, "final") + self.assertEqual(observation.subject.reference, "Patient/patient-123") + self.assertEqual(observation.category[0].coding[0].code, "laboratory") + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/FHIR_OCR_POC/requirements.txt b/FHIR_OCR_POC/requirements.txt new file mode 100644 index 0000000..f4224f6 --- /dev/null +++ b/FHIR_OCR_POC/requirements.txt @@ -0,0 +1,14 @@ +pytesseract==0.3.10 +pillow==10.0.0 +opencv-python==4.8.0.74 +numpy==1.24.0 +fhir.resources==6.5.0 +requests==2.31.0 +fastapi==0.103.1 +uvicorn==0.23.2 +python-multipart==0.0.6 +python-jose==3.3.0 +PyJWT==2.8.0 +python-dotenv==1.0.0 
def _load_card_fonts():
    """
    Load the three font sizes used on the card.

    Returns:
        Tuple of (large, medium, small) fonts. Arial is used at 36/28/24 pt
        when it can be opened; otherwise Pillow's built-in default font is
        returned for all three sizes.
    """
    try:
        return (
            ImageFont.truetype("Arial", 36),
            ImageFont.truetype("Arial", 28),
            ImageFont.truetype("Arial", 24),
        )
    except OSError:
        # truetype() raises OSError (IOError is an alias) when the font
        # cannot be opened, e.g. on systems without Arial installed.
        fallback = ImageFont.load_default()
        return fallback, fallback, fallback


def create_insurance_card(output_path, patient_name="John Doe", dob="01/15/1980",
                          member_id="ABC12345678", plan="HealthPlus Gold",
                          gender="Male"):
    """
    Create a sample insurance card image.

    Args:
        output_path: Path to save the image; missing parent directories
            are created
        patient_name: Patient name
        dob: Date of birth
        member_id: Member ID
        plan: Insurance plan
        gender: Gender printed on the card (defaults to the previously
            hard-coded value "Male")
    """
    black = (0, 0, 0)
    white = (255, 255, 255)

    # Create a blank image: 1050 x 650 px is ~3.5" x 2.17" at 300 DPI
    width, height = 1050, 650
    image = Image.new('RGB', (width, height), color=white)
    draw = ImageDraw.Draw(image)

    font_large, font_medium, font_small = _load_card_fonts()

    # Draw a blue rectangle at the top (insurance company header)
    draw.rectangle([(0, 0), (width, 120)], fill=(0, 82, 156))
    draw.text((50, 40), "HealthCorp Insurance", fill=white, font=font_large)

    # Card details, one row every 50 px starting at y=150
    detail_rows = [
        f"Name: {patient_name}",
        f"DOB: {dob}",
        f"Member ID: {member_id}",
        f"Plan: {plan}",
        f"Gender: {gender}",
    ]
    for row, text in enumerate(detail_rows):
        draw.text((50, 150 + 50 * row), text, fill=black, font=font_medium)

    # Additional (fixed) information in the smaller font
    extra_rows = [
        "Customer Service: 1-800-555-1234",
        "Group #: HC987654",
        "RxBIN: 123456 RxPCN: ABC",
    ]
    for row, text in enumerate(extra_rows):
        draw.text((50, 450 + 50 * row), text, fill=black, font=font_small)

    # Add a border
    draw.rectangle([(0, 0), (width - 1, height - 1)], outline=black, width=2)

    # Make sure the target directory exists so save() does not fail on a
    # nested output path.
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # Save the image
    image.save(output_path)
    print(f"Sample insurance card saved to: {output_path}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Create a sample insurance card image')
    parser.add_argument('--output', default='sample_insurance_card.png', help='Output image path')
    parser.add_argument('--name', default='John Doe', help='Patient name')
    parser.add_argument('--dob', default='01/15/1980', help='Date of birth')
    parser.add_argument('--id', default='ABC12345678', help='Member ID')
    parser.add_argument('--plan', default='HealthPlus Gold', help='Insurance plan')
    parser.add_argument('--gender', default='Male', help='Gender')
    args = parser.parse_args()

    create_insurance_card(args.output, args.name, args.dob, args.id, args.plan, args.gender)
# Security config, read from environment variables at import time.
# The fallback key is publicly known and must never sign real tokens.
_DEFAULT_JWT_SECRET_KEY = "dev-secret-key-replace-in-production"
JWT_SECRET_KEY = os.getenv("JWT_SECRET_KEY", _DEFAULT_JWT_SECRET_KEY)
JWT_ALGORITHM = os.getenv("JWT_ALGORITHM", "HS256")
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", "30"))

# Set up security scheme for Swagger UI
security_scheme = HTTPBearer()


class AuthHandler:
    """
    Authentication handler for JWT tokens.
    """

    def __init__(self, secret_key: str = JWT_SECRET_KEY, algorithm: str = JWT_ALGORITHM):
        """
        Initialize the authentication handler.

        Args:
            secret_key: Secret key for JWT token signing
            algorithm: JWT algorithm
        """
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.logger = logging.getLogger(__name__)

        if self.secret_key == _DEFAULT_JWT_SECRET_KEY:
            # Anyone who knows the built-in key can forge tokens, so make
            # the misconfiguration visible instead of running silently.
            self.logger.warning(
                "JWT_SECRET_KEY is not set; using the built-in development key - "
                "NOT SECURE FOR PRODUCTION"
            )

    def create_access_token(self, user_id: str, roles: Optional[List[str]] = None) -> str:
        """
        Create a new access token.

        Args:
            user_id: User ID, stored in the "sub" claim
            roles: List of roles, stored in the "roles" claim (empty if omitted)

        Returns:
            JWT access token
        """
        # Take one timestamp so "iat" and "exp" describe the same instant.
        issued_at = time.time()
        payload = {
            "sub": user_id,
            "roles": roles or [],
            "exp": issued_at + (ACCESS_TOKEN_EXPIRE_MINUTES * 60),
            "iat": issued_at
        }

        token = jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
        self.logger.debug("Created access token for user %s", user_id)
        return token

    def decode_token(self, token: str) -> Dict[str, Any]:
        """
        Decode and verify a JWT token.

        Args:
            token: JWT token

        Returns:
            Token payload

        Raises:
            HTTPException: 401 "Token expired" if the token's "exp" claim is
                in the past, 401 "Invalid token" for any other verification
                failure
        """
        try:
            return jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
        except jwt.ExpiredSignatureError as exc:
            # jwt.decode() validates "exp" itself; report expiry distinctly
            # rather than folding it into the generic "Invalid token".
            self.logger.warning("Token verification failed: %s", exc)
            raise HTTPException(status_code=401, detail="Token expired") from exc
        except PyJWTError as exc:
            self.logger.error("Token verification failed: %s", exc)
            raise HTTPException(status_code=401, detail="Invalid token") from exc

    def get_current_user(self, credentials: HTTPAuthorizationCredentials = Depends(security_scheme)) -> Dict[str, Any]:
        """
        Get the current user from the token.

        Args:
            credentials: HTTP authorization credentials

        Returns:
            User info from token: {"user_id": <sub claim>, "roles": <roles claim>}

        Raises:
            HTTPException: If token is invalid or expired
        """
        payload = self.decode_token(credentials.credentials)

        # decode_token() already rejects tokens whose "exp" has passed; this
        # check additionally rejects tokens that carry no "exp" claim at all
        # (treated as expiring at time 0).
        exp = payload.get("exp", 0)
        if time.time() > exp:
            self.logger.warning("Expired token for user %s", payload.get("sub"))
            raise HTTPException(status_code=401, detail="Token expired")

        return {
            "user_id": payload.get("sub"),
            "roles": payload.get("roles", [])
        }

    def has_role(self, required_roles: List[str]) -> callable:
        """
        Dependency for role-based access control.

        Args:
            required_roles: List of roles; holding any one of them is sufficient

        Returns:
            Dependency function
        """
        def check_roles(user: Dict[str, Any] = Depends(self.get_current_user)) -> Dict[str, Any]:
            """
            Check if user has required roles.

            Args:
                user: User info from token

            Returns:
                User info

            Raises:
                HTTPException: 403 if user holds none of the required roles
            """
            user_roles = user.get("roles", [])

            # Check if user has any of the required roles
            if not any(role in user_roles for role in required_roles):
                self.logger.warning(
                    "User %s does not have required roles: %s",
                    user['user_id'], required_roles
                )
                raise HTTPException(status_code=403, detail="Insufficient permissions")

            return user

        return check_roles
# For local development/testing, create a mock authentication handler
class MockAuthHandler:
    """
    Mock authentication handler for local development/testing.
    This should NOT be used in production.
    """

    # Deliberately obvious signing parameters shared by encode and decode.
    _TEST_KEY = "test-key-not-for-production"
    _TEST_ALGORITHM = "HS256"

    def __init__(self):
        """Initialize the mock authentication handler."""
        self.logger = logging.getLogger(__name__)
        self.logger.warning("Using mock authentication handler - NOT SECURE FOR PRODUCTION")

    def create_access_token(self, user_id: str, roles: Optional[List[str]] = None) -> str:
        """
        Create a mock access token.

        Args:
            user_id: User ID
            roles: List of roles (empty if omitted)

        Returns:
            Mock JWT token, valid for one hour
        """
        issued_at = time.time()
        payload = {
            "sub": user_id,
            "roles": roles or [],
            "exp": issued_at + 3600,  # 1 hour
            "iat": issued_at
        }

        # Use a simple JWT with an obvious test key
        token = jwt.encode(payload, self._TEST_KEY, algorithm=self._TEST_ALGORITHM)
        self.logger.debug("Created mock access token for user %s", user_id)
        return token

    def get_current_user(self, credentials: HTTPAuthorizationCredentials = Depends(security_scheme)) -> Dict[str, Any]:
        """
        Get user info from mock token.

        Args:
            credentials: HTTP authorization credentials

        Returns:
            User info: {"user_id": ..., "roles": [...]}

        Raises:
            HTTPException: 401 if the token is neither a valid mock JWT nor
                the literal "dev-token"
        """
        token = credentials.credentials

        try:
            payload = jwt.decode(token, self._TEST_KEY, algorithms=[self._TEST_ALGORITHM])
        except PyJWTError as exc:
            # Only JWT verification failures are handled here; a bare
            # "except" would also swallow unrelated errors and interrupts.
            # For testing, allow a special "dev-token" that grants admin access
            if token == "dev-token":
                self.logger.warning("Using development token - NOT SECURE")
                return {
                    "user_id": "dev-user",
                    "roles": ["admin"]
                }
            raise HTTPException(status_code=401, detail="Invalid token") from exc

        return {
            "user_id": payload.get("sub"),
            "roles": payload.get("roles", [])
        }

    def has_role(self, required_roles: List[str]) -> callable:
        """
        Dependency for mock role-based access control.

        Args:
            required_roles: List of roles; holding any one of them is sufficient

        Returns:
            Dependency function
        """
        def check_roles(user: Dict[str, Any] = Depends(self.get_current_user)) -> Dict[str, Any]:
            """
            Check roles, letting the "admin" role through unconditionally.

            Raises:
                HTTPException: 403 if user is not admin and holds none of
                    the required roles
            """
            user_roles = user.get("roles", [])

            # For development, allow "admin" role to access anything
            if "admin" in user_roles:
                return user

            # Otherwise, check required roles
            if not any(role in user_roles for role in required_roles):
                self.logger.warning(
                    "User %s does not have required roles: %s",
                    user['user_id'], required_roles
                )
                raise HTTPException(status_code=403, detail="Insufficient permissions")

            return user

        return check_roles


# Create either real or mock auth handler based on environment.
# The value is normalised so that e.g. "Production" or " production " does
# not silently fall through to the insecure mock handler.
# NOTE(review): an unset ENVIRONMENT still selects the mock handler (original
# behaviour, kept for local development) - confirm deployments always set it.
if os.getenv("ENVIRONMENT", "development").strip().lower() == "production":
    auth_handler = AuthHandler()
else:
    auth_handler = MockAuthHandler()
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)


def _report(passed, success_message, failure_message):
    """Log the outcome of one check: INFO when it passed, ERROR otherwise."""
    if passed:
        logger.info(success_message)
    else:
        logger.error(failure_message)


def _check_protected_endpoints(base_url, access_token, timeout):
    """Exercise the Patient endpoints without a token, with one, and via DELETE."""
    patients_url = f"{base_url}/fhir/Patient"
    headers = {"Authorization": f"Bearer {access_token}"}

    # Test protected endpoint without token
    logger.info("\nTesting protected endpoint without token...")
    response = requests.get(patients_url, timeout=timeout)
    logger.info("Status code: %s", response.status_code)
    _report(
        response.status_code in (401, 403),
        "Success! Access denied without token as expected.",
        "Error: Protected endpoint should deny access without token.",
    )

    # Test protected endpoint with token
    logger.info("\nTesting protected endpoint with token...")
    response = requests.get(patients_url, headers=headers, timeout=timeout)
    logger.info("Status code: %s", response.status_code)
    _report(
        response.status_code == 200,
        "Success! Access granted with token as expected.",
        "Error: Protected endpoint should grant access with token.",
    )

    # Test role-based access control.
    # For this POC, we use a generic endpoint that requires admin role.
    logger.info("\nTesting role-based access control...")
    delete_url = f"{base_url}/fhir/Patient/non-existent-id"
    response = requests.delete(delete_url, headers=headers, timeout=timeout)
    logger.info("Status code: %s", response.status_code)

    if response.status_code == 403:
        logger.info("Success! Regular user denied access to admin endpoint as expected.")
    elif response.status_code == 404:
        # The request got past authorization; the resource simply does not exist.
        logger.info("Success! Admin user granted access to admin endpoint as expected.")
    else:
        logger.error(f"Unexpected response code: {response.status_code}")


def test_api_security(base_url, timeout=10):
    """
    Test API security features.

    Results are reported through the module logger; nothing is returned.

    Args:
        base_url: Base URL of the API (a trailing slash is ignored)
        timeout: Per-request timeout in seconds, so an unresponsive server
            cannot hang the script indefinitely

    Raises:
        requests.RequestException: If a request fails at the network level
            or exceeds the timeout
    """
    # Avoid "//" in the URLs built below when the caller passes "http://host/".
    base_url = base_url.rstrip("/")

    logger.info("Testing API security...")

    # Test health endpoint (should be accessible without authentication)
    logger.info("\nTesting health endpoint (public)...")
    response = requests.get(f"{base_url}/health", timeout=timeout)
    logger.info("Status code: %s", response.status_code)
    _report(
        response.status_code == 200,
        "Success! Health endpoint is publicly accessible as expected.",
        "Error: Health endpoint should be publicly accessible.",
    )

    # Test token endpoint
    logger.info("\nTesting token endpoint...")
    token_url = f"{base_url}/auth/token"

    # Test with invalid credentials
    logger.info("Testing with empty username...")
    response = requests.post(
        token_url, json={"username": "", "password": "password"}, timeout=timeout
    )
    logger.info("Status code: %s", response.status_code)
    _report(
        response.status_code == 400,
        "Success! Token endpoint rejected empty username as expected.",
        "Error: Token endpoint should reject empty username.",
    )

    # Test with valid credentials
    logger.info("Testing with valid username...")
    response = requests.post(
        token_url, json={"username": "admin", "password": "password"}, timeout=timeout
    )
    logger.info("Status code: %s", response.status_code)

    access_token = None
    if response.status_code == 200:
        access_token = response.json().get("access_token")

    if access_token:
        logger.info("Success! Received valid token.")
        _check_protected_endpoints(base_url, access_token, timeout)
    else:
        # Covers both a non-200 reply and a 200 reply without "access_token";
        # the remaining checks are meaningless without a token.
        logger.error("Error: Could not obtain token for testing.")

    logger.info("\nAPI security testing completed.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Test API security')
    parser.add_argument('--url', default='http://localhost:8000', help='Base URL of the API')
    args = parser.parse_args()

    test_api_security(args.url)
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)


def main(argv=None):
    """
    Run the OCR to FHIR test flow.

    Processes one image with OCR, maps the result to a FHIR Patient (and an
    Observation for lab results / prescriptions), stores the resources in a
    file-backed repository and writes `ocr_result.json` and
    `patient_resource.json` into the output directory.

    Args:
        argv: Command line arguments to parse; defaults to `sys.argv[1:]`
            when None

    Raises:
        SystemExit: With code 1 if the image is not an existing file or any
            step of the flow fails
    """
    # Parse command line arguments
    parser = argparse.ArgumentParser(description='Test OCR to FHIR flow')
    parser.add_argument('--image', required=True, help='Path to the image file to process')
    parser.add_argument('--output', default='test_results', help='Directory to store results')
    parser.add_argument('--tesseract', help='Path to Tesseract executable')
    args = parser.parse_args(argv)

    # Verify image file exists (isfile, so a directory path is rejected here
    # instead of failing later inside the OCR step)
    if not os.path.isfile(args.image):
        logger.error(f"Image file not found: {args.image}")
        sys.exit(1)

    # Create output directory
    os.makedirs(args.output, exist_ok=True)

    try:
        # Import local modules.
        # Add this script's directory to the path in case we are running
        # from a different directory.
        script_dir = os.path.dirname(os.path.abspath(__file__))
        if script_dir not in sys.path:
            sys.path.append(script_dir)

        from ocr_module.ocr_processor import OCRProcessor
        from ocr_module.fhir_mapper import FHIRMapper
        from fhir_module.fhir_repository import FHIRRepository

        # Initialize components
        logger.info("Initializing components...")
        ocr_processor = OCRProcessor(tesseract_cmd=args.tesseract)
        fhir_mapper = FHIRMapper()
        fhir_repository = FHIRRepository(storage_dir=args.output)

        # Process image with OCR
        logger.info(f"Processing image: {args.image}")
        ocr_result = ocr_processor.process_image(args.image)
        structured_data = ocr_result["structured_data"]
        document_type = structured_data["document_type"]

        # Save OCR results to file
        ocr_output_file = os.path.join(args.output, 'ocr_result.json')
        with open(ocr_output_file, 'w', encoding='utf-8') as f:
            json.dump(ocr_result, f, indent=2)

        logger.info(f"OCR results saved to: {ocr_output_file}")
        logger.info(f"Document type detected: {document_type}")
        logger.info(f"OCR confidence: {ocr_result['confidence']}%")

        # Map OCR data to FHIR Patient resource
        logger.info("Mapping OCR data to FHIR Patient resource...")
        patient = fhir_mapper.map_to_patient(ocr_result)

        # Create patient resource in FHIR repository
        fhir_repository.create_resource(patient)
        logger.info(f"Patient resource created with ID: {patient.id}")

        # Map to observation if applicable
        if document_type in ["lab_result", "prescription"]:
            logger.info("Mapping OCR data to FHIR Observation resource...")
            observation = fhir_mapper.map_to_observation(ocr_result, patient.id)

            if observation:
                fhir_repository.create_resource(observation)
                logger.info(f"Observation resource created with ID: {observation.id}")

        # Print patient information (only the fields OCR actually found)
        logger.info("\nExtracted Patient Information:")
        for key, value in structured_data["patient"].items():
            if value:
                logger.info(f"  {key.capitalize()}: {value}")

        # Test reading the patient from the repository
        retrieved_patient = fhir_repository.read_resource("Patient", patient.id)

        # Save retrieved patient to file
        patient_output_file = os.path.join(args.output, 'patient_resource.json')
        with open(patient_output_file, 'w', encoding='utf-8') as f:
            json.dump(retrieved_patient, f, indent=2)

        logger.info(f"Patient resource saved to: {patient_output_file}")
        logger.info("Test completed successfully!")

    except Exception as e:
        # Top-level boundary of the script: log with the full traceback so
        # the failing step can be identified, then signal failure.
        logger.exception(f"Error in OCR to FHIR flow: {str(e)}")
        sys.exit(1)


if __name__ == "__main__":
    main()