TLDR

This tutorial shows you how to build an AI-powered EHR integration system that can intelligently split mixed medical documents and extract structured data. First, use Cardinal’s /split endpoint to separate different document types, then extract specific pages into separate PDFs and apply targeted /extract schemas to each section for precise EHR data entry.

Building Your EHR Integration System with Proper Page Splitting

0) Install dependencies

# Install dependencies including PDF manipulation
# Note: `io` and `tempfile` are part of the Python standard library and must
# NOT be pip-installed — including them makes the install fail or pull in
# unrelated third-party packages that shadow the stdlib names.
!pip install -q requests python-dotenv pandas tqdm PyPDF2

1) Load environment variables

# Mount Google Drive so the .env file stored there can be read.
from google.colab import drive
drive.mount('/content/drive')

import os, dotenv
# Load CARDINAL_* secrets from the Drive-hosted .env into the environment.
dotenv.load_dotenv('/content/drive/MyDrive/.env')

# Cardinal API
# Base URL can be overridden via CARDINAL_URL; defaults to the public endpoint.
CARDINAL_URL = os.getenv("CARDINAL_URL", "https://api.trycardinal.ai")
CARDINAL_API_KEY = os.getenv("CARDINAL_API_KEY")

# Verify API key is loaded
if not CARDINAL_API_KEY:
    print("Warning: CARDINAL_API_KEY not found in environment variables")
else:
    print("Cardinal API key loaded successfully")

2) PDF Page Extraction Functions

import base64
import io
import tempfile
from typing import Any, Dict, List, Optional

import PyPDF2
import requests

def download_pdf_from_url(pdf_url: str) -> bytes:
    """Download PDF content from URL.

    Args:
        pdf_url: Direct URL to a PDF resource.

    Returns:
        The raw PDF bytes.

    Raises:
        Exception: If the request fails or returns a non-2xx status.
    """
    try:
        response = requests.get(pdf_url, timeout=30)
        response.raise_for_status()
        return response.content
    except requests.exceptions.RequestException as e:
        # Chain the original exception so the underlying cause (timeout,
        # DNS failure, HTTP status, ...) stays visible in the traceback.
        raise Exception(f"Failed to download PDF from {pdf_url}: {str(e)}") from e

def extract_pages_from_pdf(pdf_content: bytes, pages: List[int]) -> bytes:
    """
    Extract specific pages from PDF content and return as new PDF bytes.
    
    Args:
        pdf_content: Raw PDF bytes
        pages: List of page numbers (1-indexed) to extract
    
    Returns:
        New PDF content containing only the specified pages

    Raises:
        Exception: If no requested page is in range, or if PyPDF2 fails to
            parse the input or write the output.
    """
    try:
        # Create PDF reader from bytes
        pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_content))
    except Exception as e:
        raise Exception(f"Failed to extract pages {pages}: {str(e)}") from e

    # Validate page numbers, converting from 1-indexed to 0-indexed.
    total_pages = len(pdf_reader.pages)
    valid_pages = []
    for page_num in pages:
        if 1 <= page_num <= total_pages:
            valid_pages.append(page_num - 1)
        else:
            print(f"Warning: Page {page_num} is out of range (1-{total_pages}). Skipping.")

    if not valid_pages:
        # Raised outside the try blocks so the message is not re-wrapped
        # into a misleading "Failed to extract pages ..." error.
        raise Exception(f"No valid pages found in range 1-{total_pages}")

    try:
        # Copy the requested pages into a fresh document and serialize it.
        pdf_writer = PyPDF2.PdfWriter()
        for page_idx in valid_pages:
            pdf_writer.add_page(pdf_reader.pages[page_idx])

        output_buffer = io.BytesIO()
        pdf_writer.write(output_buffer)
        return output_buffer.getvalue()
    except Exception as e:
        raise Exception(f"Failed to extract pages {pages}: {str(e)}") from e

def upload_pdf_bytes_to_temp_url(pdf_bytes: bytes, filename: str = None) -> str:
    """
    Upload PDF bytes to a temporary URL that Cardinal can access.
    
    Note: This is a placeholder function. In a real implementation, you would:
    1. Upload to your cloud storage (S3, GCS, etc.)
    2. Return a publicly accessible URL
    3. Optionally set expiration time for security
    
    For this demo, we'll simulate by writing to a temporary file and returning a file:// URL

    Args:
        pdf_bytes: Raw PDF content to persist.
        filename: Logical object name; used as the cloud-storage key in a
            real implementation (only defaulted here, not otherwise used).

    Returns:
        A file:// URL pointing at the temporary file (a cloud URL in production).
    """
    if filename is None:
        filename = f"temp_extracted_pages_{id(pdf_bytes)}.pdf"
    
    # In a real implementation, upload to cloud storage here
    # For demo purposes, we'll create a temporary file
    temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.pdf', prefix='cardinal_')
    try:
        temp_file.write(pdf_bytes)
    finally:
        # Always close the handle, even if the write fails, so the file
        # descriptor is not leaked.
        temp_file.close()
    
    print(f"  📄 Created temporary PDF: {temp_file.name} ({len(pdf_bytes)} bytes)")
    
    # Return file path (in real implementation, return cloud storage URL)
    return f"file://{temp_file.name}"
    
    # REAL IMPLEMENTATION EXAMPLE:
    # import boto3
    # s3_client = boto3.client('s3')
    # bucket_name = "your-temp-bucket"
    # object_key = f"temp-pdfs/{filename}"
    # 
    # s3_client.put_object(
    #     Bucket=bucket_name,
    #     Key=object_key,
    #     Body=pdf_bytes,
    #     ContentType='application/pdf'
    # )
    # 
    # return f"https://{bucket_name}.s3.amazonaws.com/{object_key}"

def create_page_specific_pdf_url(original_pdf_url: str, pages: List[int], section_name: str) -> str:
    """
    Build a new PDF containing only the requested pages and return its URL.

    Args:
        original_pdf_url: URL of the original PDF
        pages: List of page numbers to extract
        section_name: Name of the document section (used in the filename)

    Returns:
        URL of the new PDF containing only the specified pages
    """
    try:
        print(f"  🔧 Extracting pages {pages} for {section_name}...")

        # Fetch the full source document.
        source_bytes = download_pdf_from_url(original_pdf_url)
        print(f"  📥 Downloaded original PDF ({len(source_bytes)} bytes)")

        # Carve out just the pages that belong to this section.
        subset_bytes = extract_pages_from_pdf(source_bytes, pages)
        print(f"  ✂️  Extracted {len(pages)} pages ({len(subset_bytes)} bytes)")

        # Persist the subset and hand back a URL Cardinal can read.
        page_suffix = '_'.join(str(p) for p in pages)
        return upload_pdf_bytes_to_temp_url(subset_bytes, f"{section_name}_pages_{page_suffix}.pdf")

    except Exception as e:
        print(f"  ❌ Failed to create page-specific PDF: {str(e)}")
        raise

3) Document Processing Functions

import json

# Keep the same document queries and schemas from before
# Each entry pairs a short machine name with a natural-language description;
# Cardinal's /split endpoint uses the descriptions to classify pages into
# partitions keyed by "name".
MEDICAL_DOCUMENT_QUERIES = [
    {
        "name": "patient_intake",
        "description": "Patient intake forms with personal information, medical history, symptoms, medications, and demographics"
    },
    {
        "name": "insurance_cards", 
        "description": "Insurance cards with member ID, group number, plan information, and insurance company details"
    },
    {
        "name": "consent_forms",
        "description": "Consent forms, HIPAA authorizations, treatment agreements, and signature pages"
    },
    {
        "name": "referral_letters",
        "description": "Physician referral letters, medical recommendations, and provider-to-provider communications"
    }
]

# [Include all the schema definitions from the previous version here]
# JSON Schema (draft 2020-12) describing the structured fields to pull from
# patient-intake pages; serialized and sent to Cardinal's /extract endpoint.
# Only "patient_demographics" is required — the other sections are best-effort.
PATIENT_INTAKE_SCHEMA = {
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "title": "PatientIntakeForm",
    "type": "object",
    "properties": {
        "patient_demographics": {
            "type": "object",
            "properties": {
                "first_name": {"type": "string"},
                "last_name": {"type": "string"},
                "date_of_birth": {"type": "string"},
                "gender": {"type": "string"},
                "address": {
                    "type": "object",
                    "properties": {
                        "street": {"type": "string"},
                        "city": {"type": "string"},
                        "state": {"type": "string"},
                        "zip_code": {"type": "string"}
                    }
                },
                "phone_number": {"type": "string"},
                "email": {"type": "string"},
                "emergency_contact": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "relationship": {"type": "string"},
                        "phone": {"type": "string"}
                    }
                }
            }
        },
        "medical_history": {
            "type": "object",
            "properties": {
                "current_medications": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "medication_name": {"type": "string"},
                            "dosage": {"type": "string"},
                            "frequency": {"type": "string"}
                        }
                    }
                },
                "allergies": {
                    "type": "array",
                    "items": {"type": "string"}
                },
                "medical_conditions": {
                    "type": "array", 
                    "items": {"type": "string"}
                },
                "previous_surgeries": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "procedure": {"type": "string"},
                            "date": {"type": "string"}
                        }
                    }
                },
                "family_history": {
                    "type": "array",
                    "items": {
                        "type": "object", 
                        "properties": {
                            "condition": {"type": "string"},
                            "relationship": {"type": "string"}
                        }
                    }
                }
            }
        },
        "chief_complaint": {
            "type": "object",
            "properties": {
                "primary_symptoms": {"type": "string"},
                "symptom_duration": {"type": "string"},
                # pain_level is constrained to the standard 0-10 clinical scale
                "pain_level": {"type": "integer", "minimum": 0, "maximum": 10},
                "additional_concerns": {"type": "string"}
            }
        }
    },
    "required": ["patient_demographics"]
}

# [Include other schemas - INSURANCE_CARD_SCHEMA, CONSENT_FORM_SCHEMA, etc.]

# Maps each /split partition name to its extraction schema. Partitions with
# no entry here are skipped by the processing pipeline.
SCHEMA_MAPPING = {
    "patient_intake": PATIENT_INTAKE_SCHEMA,
    # Add other schemas here
}

def extract_from_page_specific_pdf(page_specific_url: str,
                                  schema: Dict[str, Any],
                                  section_name: str,
                                  original_pages: List[int],
                                  use_fast_mode: bool = False) -> Dict[str, Any]:
    """
    Extract structured data from a page-specific PDF via Cardinal's /extract.

    Args:
        page_specific_url: URL to the PDF containing only the relevant pages
        schema: JSON schema for extraction
        section_name: Name of the document section being processed
        original_pages: Original page numbers from the full document
        use_fast_mode: Use fast extraction mode

    Returns:
        Dictionary containing extracted structured data (or error details
        under "error" with "success": False).
    """
    endpoint = f"{CARDINAL_URL.rstrip('/')}/extract"

    # Section-specific hints forwarded to the model as extraction context.
    context_map = {
        "patient_intake": "Patient intake form with demographics, medical history, current symptoms, and medications",
        "insurance_cards": "Insurance card with member ID, group numbers, copay amounts, and coverage details", 
        "consent_forms": "Medical consent forms with HIPAA authorizations, signatures, and communication preferences",
        "referral_letters": "Medical referral letter from one provider to another with patient information and recommendations"
    }
    custom_context = context_map.get(section_name, f"Medical document section: {section_name}")

    request_fields = {
        "fileUrl": page_specific_url,
        "schema": json.dumps(schema),
        "fast": str(use_fast_mode).lower(),
        "customContext": custom_context
    }
    auth_headers = {
        "x-api-key": CARDINAL_API_KEY
    }

    print(f"  🔍 Extracting {section_name} data from page-specific PDF...")

    try:
        response = requests.post(endpoint, data=request_fields, headers=auth_headers, timeout=180)
        response.raise_for_status()
        result = response.json()

        # The "response" field may arrive as a JSON string or as an object.
        payload = result.get("response", {})
        if isinstance(payload, str):
            payload = json.loads(payload)

        return {
            "success": True,
            "section_name": section_name,
            "original_pages": original_pages,
            "page_specific_url": page_specific_url,
            "data": payload,
            "method": result.get("method", "unknown"),
            "pages_processed": result.get("pages_processed"),
            "raw_response": result
        }

    except requests.exceptions.RequestException as e:
        print(f"  ❌ Request error during {section_name} extraction: {e}")
        return {"success": False, "section_name": section_name, "error": str(e)}
    except json.JSONDecodeError as e:
        print(f"  ❌ JSON parsing error for {section_name}: {e}")
        return {"success": False, "section_name": section_name, "error": f"Failed to parse response: {e}"}
    except Exception as e:
        print(f"  ❌ Unexpected error during {section_name} extraction: {e}")
        return {"success": False, "section_name": section_name, "error": str(e)}

def split_medical_document(file_url: str, 
                          queries: List[Dict[str, str]] = None) -> Dict[str, Any]:
    """
    Split a mixed medical document into different types using Cardinal's /split endpoint.

    Args:
        file_url: Direct URL to the PDF file
        queries: List of query objects for document splitting
            (defaults to MEDICAL_DOCUMENT_QUERIES)

    Returns:
        Dictionary containing split results with pages for each document type
    """
    # Avoid a mutable/shared default by resolving None at call time.
    queries = MEDICAL_DOCUMENT_QUERIES if queries is None else queries

    endpoint = f"{CARDINAL_URL.rstrip('/')}/split"
    payload = {
        "fileUrl": file_url,
        "queries": json.dumps(queries)
    }
    auth_headers = {
        "x-api-key": CARDINAL_API_KEY
    }

    print(f"📋 Splitting medical document: {file_url}")
    print(f"🔍 Looking for {len(queries)} document types...")

    try:
        response = requests.post(endpoint, data=payload, headers=auth_headers, timeout=180)
        response.raise_for_status()
        result = response.json()

        # Guard clause: surface API-level failures without raising.
        if not result.get("success"):
            return {
                "success": False,
                "error": "Split operation failed",
                "raw_response": result
            }

        print(f"✅ Successfully split document into {len(result.get('partitions', []))} sections")

        # Print summary of what was found
        for partition in result.get("partitions", []):
            page_count = len(partition.get("pages", []))
            print(f"  📄 {partition['name']}: {page_count} pages {partition.get('pages', [])}")

        return {
            "success": True,
            "split_result": result
        }

    except requests.exceptions.RequestException as e:
        print(f"❌ Request error during document split: {e}")
        return {"success": False, "error": str(e)}
    except Exception as e:
        print(f"❌ Unexpected error during document split: {e}")
        return {"success": False, "error": str(e)}

4) Updated Complete EHR Integration Pipeline

def process_mixed_medical_document_with_page_splitting(file_url: str, use_fast_mode: bool = False) -> Dict[str, Any]:
    """
    Complete pipeline: split the document, carve each section's pages into a
    standalone PDF, then run schema-driven extraction on every section PDF.

    Args:
        file_url: URL to the mixed medical document
        use_fast_mode: Whether to use fast extraction mode

    Returns:
        Dictionary containing all extracted EHR data organized by document type
    """
    print(f"\n🏥 PROCESSING MIXED MEDICAL DOCUMENT WITH PAGE SPLITTING")
    print(f"📄 Document: {file_url}")
    print(f"⚡ Mode: {'Fast' if use_fast_mode else 'Standard'}")
    print("="*60)

    # Step 1: Split the document
    print("\n📋 Step 1: Splitting document into sections...")
    split_outcome = split_medical_document(file_url)

    if not split_outcome.get("success"):
        print("❌ Document splitting failed!")
        return {"success": False, "error": "Failed to split document", "details": split_outcome}

    # Step 2: Create page-specific PDFs and extract data
    print(f"\n🔍 Step 2: Creating page-specific PDFs and extracting data...")

    section_results = {}
    generated_files = []  # temp-file URLs, tracked so they can be cleaned up later
    split_payload = split_outcome["split_result"]

    for partition in split_payload.get("partitions", []):
        section = partition["name"]
        page_numbers = partition.get("pages", [])

        # Guard clauses: nothing to do for empty or unmapped partitions.
        if not page_numbers:
            print(f"⚠️  Skipping {section} (no pages found)")
            continue

        section_schema = SCHEMA_MAPPING.get(section)
        if not section_schema:
            print(f"⚠️  No schema defined for {section}, skipping...")
            continue

        print(f"\n📑 Processing {section} (pages {page_numbers})...")

        try:
            # Step 2a: Create page-specific PDF
            subset_url = create_page_specific_pdf_url(
                original_pdf_url=file_url,
                pages=page_numbers,
                section_name=section
            )
            generated_files.append(subset_url)

            # Step 2b: Extract structured data from page-specific PDF
            outcome = extract_from_page_specific_pdf(
                page_specific_url=subset_url,
                schema=section_schema,
                section_name=section,
                original_pages=page_numbers,
                use_fast_mode=use_fast_mode
            )
            section_results[section] = outcome

            if outcome.get("success"):
                print(f"  ✅ Successfully extracted {section} data")
            else:
                print(f"  ❌ Failed to extract {section} data: {outcome.get('error')}")

        except Exception as e:
            # A failure in one section must not abort the remaining sections.
            print(f"  ❌ Error processing {section}: {str(e)}")
            section_results[section] = {
                "success": False,
                "section_name": section,
                "error": str(e)
            }

    # Step 3: Compile final results
    ok_count = len([r for r in section_results.values() if r.get("success")])

    print(f"\n📊 PROCESSING COMPLETE:")
    print(f"   Document sections found: {len(split_payload.get('partitions', []))}")
    print(f"   Page-specific PDFs created: {len(generated_files)}")
    print(f"   Successful extractions: {ok_count}")
    print(f"   Failed extractions: {len(section_results) - ok_count}")

    # Temp files are left on disk here; remove them with cleanup_temp_files().
    print(f"\n🧹 Cleanup: Created {len(generated_files)} temporary files")

    return {
        "success": True,
        "document_url": file_url,
        "split_results": split_payload,
        "extraction_results": section_results,
        "temp_files_created": generated_files,
        "summary": {
            "total_sections": len(split_payload.get("partitions", [])),
            "successful_extractions": ok_count,
            "processing_mode": "fast" if use_fast_mode else "standard",
            "page_splitting_enabled": True
        }
    }

# Example usage
# NOTE(review): placeholder URL — replace with a real, publicly reachable PDF
# before running; the pipeline downloads it directly.
SAMPLE_EHR_DOCUMENT = "https://example-medical-docs.s3.amazonaws.com/mixed_patient_packet.pdf"

print("🚀 Starting EHR Integration Pipeline with Page Splitting...")

# Process with proper page splitting
ehr_results = process_mixed_medical_document_with_page_splitting(
    file_url=SAMPLE_EHR_DOCUMENT,
    use_fast_mode=False
)

# Summarize the run using the "summary" block returned by the pipeline.
if ehr_results.get("success"):
    print(f"\n🎉 SUCCESS! Processed document with page splitting:")
    print(f"   • Split into {ehr_results['summary']['total_sections']} sections")
    print(f"   • Created {len(ehr_results['temp_files_created'])} page-specific PDFs")
    print(f"   • Successfully extracted {ehr_results['summary']['successful_extractions']} data sets")
else:
    print(f"❌ Processing failed: {ehr_results.get('error')}")

5) Cleanup Function

import os

def cleanup_temp_files(temp_file_urls: List[str]):
    """Clean up temporary PDF files created during processing.

    Only file:// URLs are handled; cloud-storage URLs are left untouched.
    Deletion failures are reported but never raised.

    Args:
        temp_file_urls: URLs as returned by upload_pdf_bytes_to_temp_url().
    """
    cleaned_count = 0

    for url in temp_file_urls:
        if url.startswith("file://"):
            # Strip only the leading scheme. str.replace() would also mangle
            # any later occurrence of "file://" inside the path itself.
            file_path = url[len("file://"):]
            try:
                if os.path.exists(file_path):
                    os.remove(file_path)
                    cleaned_count += 1
                    print(f"  🗑️  Deleted: {file_path}")
            except Exception as e:
                print(f"  ⚠️  Failed to delete {file_path}: {e}")

    print(f"🧹 Cleanup complete: {cleaned_count} temporary files removed")

# Example cleanup
# Guard on locals() so this cell is a safe no-op when the pipeline cell
# above was not run (ehr_results undefined) or produced no temp files.
if 'ehr_results' in locals() and ehr_results.get("temp_files_created"):
    print("\n🧹 CLEANING UP TEMPORARY FILES...")
    cleanup_temp_files(ehr_results["temp_files_created"])

What You Just Built

Congratulations! You’ve created an intelligent EHR integration system that can split mixed medical documents and extract structured data. Your system can now:
  • Split mixed documents into patient intake, insurance cards, and consent forms
  • Extract targeted data using specialized schemas for each document type
  • Process batch workflows for multiple patient packets efficiently
  • Export EHR-ready data in CSV format for seamless integration
Your sample patient packet is just the beginning - imagine processing hundreds of patient documents automatically for instant EHR data entry!