TLDR
This tutorial shows you how to build an AI-powered EHR integration system that can intelligently split mixed medical documents and extract structured data. First, use Cardinal’s /split
endpoint to separate different document types, then extract specific pages into separate PDFs and apply targeted /extract
schemas to each section for precise EHR data entry.
Building Your EHR Integration System with Proper Page Splitting
0) Install dependencies
Copy
Ask AI
# Install dependencies including PDF manipulation
!pip install -q requests python-dotenv pandas tqdm PyPDF2 io tempfile
1) Load environment variables
Copy
Ask AI
# Mount Google Drive so the .env file stored there is reachable.
from google.colab import drive

drive.mount('/content/drive')

import os, dotenv

dotenv.load_dotenv('/content/drive/MyDrive/.env')

# Cardinal API configuration (URL falls back to the public endpoint).
CARDINAL_URL = os.getenv("CARDINAL_URL", "https://api.trycardinal.ai")
CARDINAL_API_KEY = os.getenv("CARDINAL_API_KEY")

# Confirm the API key was actually picked up from the environment.
if CARDINAL_API_KEY:
    print("Cardinal API key loaded successfully")
else:
    print("Warning: CARDINAL_API_KEY not found in environment variables")
2) PDF Page Extraction Functions
Copy
Ask AI
import PyPDF2
import io
import tempfile
import requests
from typing import List, Optional
import base64
def download_pdf_from_url(pdf_url: str) -> bytes:
    """Download PDF content from URL.

    Args:
        pdf_url: Direct URL to a PDF resource.

    Returns:
        Raw PDF bytes.

    Raises:
        Exception: If the request fails or returns a non-2xx status.
    """
    try:
        response = requests.get(pdf_url, timeout=30)
        response.raise_for_status()
        return response.content
    except requests.exceptions.RequestException as e:
        # Chain the original error (`from e`) so the root cause — DNS
        # failure, timeout, HTTP status — survives the re-wrapping.
        raise Exception(f"Failed to download PDF from {pdf_url}: {str(e)}") from e
def extract_pages_from_pdf(pdf_content: bytes, pages: List[int]) -> bytes:
    """
    Extract specific pages from PDF content and return as new PDF bytes.

    Args:
        pdf_content: Raw PDF bytes
        pages: List of page numbers (1-indexed) to extract

    Returns:
        New PDF content containing only the specified pages

    Raises:
        Exception: If the PDF cannot be parsed, no requested page is in
            range, or the page extraction itself fails.
    """
    try:
        pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_content))
    except Exception as e:
        raise Exception(f"Failed to extract pages {pages}: {str(e)}") from e

    # Validate page numbers, converting 1-indexed input to 0-indexed.
    total_pages = len(pdf_reader.pages)
    valid_pages = []
    for page_num in pages:
        if 1 <= page_num <= total_pages:
            valid_pages.append(page_num - 1)  # Convert to 0-indexed
        else:
            print(f"Warning: Page {page_num} is out of range (1-{total_pages}). Skipping.")

    # Raised OUTSIDE the try blocks so this error is not re-caught and
    # double-wrapped into a "Failed to extract pages ..." message.
    if not valid_pages:
        raise Exception(f"No valid pages found in range 1-{total_pages}")

    try:
        # Copy the selected pages into a fresh document and serialize it.
        pdf_writer = PyPDF2.PdfWriter()
        for page_idx in valid_pages:
            pdf_writer.add_page(pdf_reader.pages[page_idx])

        output_buffer = io.BytesIO()
        pdf_writer.write(output_buffer)
        return output_buffer.getvalue()
    except Exception as e:
        raise Exception(f"Failed to extract pages {pages}: {str(e)}") from e
def upload_pdf_bytes_to_temp_url(pdf_bytes: bytes, filename: str = None) -> str:
    """
    Upload PDF bytes to a temporary URL that Cardinal can access.

    Note: This is a placeholder function. In a real implementation, you would:
    1. Upload to your cloud storage (S3, GCS, etc.)
    2. Return a publicly accessible URL
    3. Optionally set expiration time for security

    For this demo, we'll simulate by writing to a temporary file and
    returning a file:// URL.

    Args:
        pdf_bytes: The PDF content to store.
        filename: Optional logical filename; its stem is embedded in the
            temp-file name so the file is identifiable on disk. (Previously
            this argument was computed but never used — bug fix.)

    Returns:
        A file:// URL pointing at the written temp file.
    """
    if filename is None:
        filename = f"temp_extracted_pages_{id(pdf_bytes)}.pdf"

    # In a real implementation, upload to cloud storage here.
    # For demo purposes, we create a temporary file whose name carries the
    # logical filename's stem.
    stem = os.path.splitext(os.path.basename(filename))[0]
    temp_file = tempfile.NamedTemporaryFile(
        delete=False, suffix='.pdf', prefix=f'cardinal_{stem}_'
    )
    temp_file.write(pdf_bytes)
    temp_file.close()

    print(f" 📄 Created temporary PDF: {temp_file.name} ({len(pdf_bytes)} bytes)")

    # Return file path (in real implementation, return cloud storage URL)
    return f"file://{temp_file.name}"

    # REAL IMPLEMENTATION EXAMPLE:
    # import boto3
    # s3_client = boto3.client('s3')
    # bucket_name = "your-temp-bucket"
    # object_key = f"temp-pdfs/{filename}"
    #
    # s3_client.put_object(
    #     Bucket=bucket_name,
    #     Key=object_key,
    #     Body=pdf_bytes,
    #     ContentType='application/pdf'
    # )
    #
    # return f"https://{bucket_name}.s3.amazonaws.com/{object_key}"
def create_page_specific_pdf_url(original_pdf_url: str, pages: List[int], section_name: str) -> str:
    """
    Create a new PDF containing only the specified pages and return its URL.

    Args:
        original_pdf_url: URL of the original PDF
        pages: List of page numbers to extract
        section_name: Name of the document section (for filename)

    Returns:
        URL of the new PDF containing only the specified pages
    """
    try:
        print(f" 🔧 Extracting pages {pages} for {section_name}...")

        # Fetch the full document, carve out the requested pages, then
        # publish the trimmed PDF so Cardinal can reach it by URL.
        source_bytes = download_pdf_from_url(original_pdf_url)
        print(f" 📥 Downloaded original PDF ({len(source_bytes)} bytes)")

        trimmed_bytes = extract_pages_from_pdf(source_bytes, pages)
        print(f" ✂️ Extracted {len(pages)} pages ({len(trimmed_bytes)} bytes)")

        page_suffix = '_'.join(map(str, pages))
        return upload_pdf_bytes_to_temp_url(
            trimmed_bytes, f"{section_name}_pages_{page_suffix}.pdf"
        )
    except Exception as e:
        print(f" ❌ Failed to create page-specific PDF: {str(e)}")
        raise
3) Document Processing Functions
Copy
Ask AI
import json
# Keep the same document queries and schemas from before
# Queries for Cardinal's /split endpoint: each entry names a section type
# and describes how to recognize it within a mixed patient packet.
MEDICAL_DOCUMENT_QUERIES = [
    {
        "name": "patient_intake",
        "description": (
            "Patient intake forms with personal information, medical history, "
            "symptoms, medications, and demographics"
        ),
    },
    {
        "name": "insurance_cards",
        "description": (
            "Insurance cards with member ID, group number, plan information, "
            "and insurance company details"
        ),
    },
    {
        "name": "consent_forms",
        "description": (
            "Consent forms, HIPAA authorizations, treatment agreements, "
            "and signature pages"
        ),
    },
    {
        "name": "referral_letters",
        "description": (
            "Physician referral letters, medical recommendations, "
            "and provider-to-provider communications"
        ),
    },
]
# [Include all the schema definitions from the previous version here]
# JSON Schema for /extract on patient-intake pages: demographics,
# medical history, and the chief complaint. Only demographics is required.
PATIENT_INTAKE_SCHEMA = {
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "title": "PatientIntakeForm",
    "type": "object",
    "properties": {
        # Who the patient is and how to reach them.
        "patient_demographics": {
            "type": "object",
            "properties": {
                "first_name": {"type": "string"},
                "last_name": {"type": "string"},
                "date_of_birth": {"type": "string"},
                "gender": {"type": "string"},
                "address": {
                    "type": "object",
                    "properties": {
                        "street": {"type": "string"},
                        "city": {"type": "string"},
                        "state": {"type": "string"},
                        "zip_code": {"type": "string"},
                    },
                },
                "phone_number": {"type": "string"},
                "email": {"type": "string"},
                "emergency_contact": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "relationship": {"type": "string"},
                        "phone": {"type": "string"},
                    },
                },
            },
        },
        # Medications, allergies, conditions, surgeries, and family history.
        "medical_history": {
            "type": "object",
            "properties": {
                "current_medications": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "medication_name": {"type": "string"},
                            "dosage": {"type": "string"},
                            "frequency": {"type": "string"},
                        },
                    },
                },
                "allergies": {"type": "array", "items": {"type": "string"}},
                "medical_conditions": {"type": "array", "items": {"type": "string"}},
                "previous_surgeries": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "procedure": {"type": "string"},
                            "date": {"type": "string"},
                        },
                    },
                },
                "family_history": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "condition": {"type": "string"},
                            "relationship": {"type": "string"},
                        },
                    },
                },
            },
        },
        # Why the patient is here today; pain level is a 0-10 integer scale.
        "chief_complaint": {
            "type": "object",
            "properties": {
                "primary_symptoms": {"type": "string"},
                "symptom_duration": {"type": "string"},
                "pain_level": {"type": "integer", "minimum": 0, "maximum": 10},
                "additional_concerns": {"type": "string"},
            },
        },
    },
    "required": ["patient_demographics"],
}
# Map each /split partition name to the /extract schema for that section.
# Only patient_intake is wired up here; add the remaining schemas
# (INSURANCE_CARD_SCHEMA, CONSENT_FORM_SCHEMA, etc.) as you define them.
SCHEMA_MAPPING = {
    "patient_intake": PATIENT_INTAKE_SCHEMA,
}
def extract_from_page_specific_pdf(page_specific_url: str,
                                   schema: dict,
                                   section_name: str,
                                   original_pages: list,
                                   use_fast_mode: bool = False) -> dict:
    """
    Extract structured data from a page-specific PDF.

    Note: annotations use builtin generics (``dict``/``list``) because the
    original ``Dict[str, Any]`` hints referenced ``Dict``/``Any`` that were
    never imported (only ``List, Optional`` are), which raised a NameError
    at definition time.

    Args:
        page_specific_url: URL to the PDF containing only the relevant pages
        schema: JSON schema for extraction
        section_name: Name of the document section being processed
        original_pages: Original page numbers from the full document
        use_fast_mode: Use fast extraction mode

    Returns:
        Dictionary containing extracted structured data; on failure, a dict
        with ``success: False`` and an ``error`` message.
    """
    url = f"{CARDINAL_URL.rstrip('/')}/extract"

    # Create context specific to the document section to guide extraction.
    context_map = {
        "patient_intake": "Patient intake form with demographics, medical history, current symptoms, and medications",
        "insurance_cards": "Insurance card with member ID, group numbers, copay amounts, and coverage details",
        "consent_forms": "Medical consent forms with HIPAA authorizations, signatures, and communication preferences",
        "referral_letters": "Medical referral letter from one provider to another with patient information and recommendations"
    }
    custom_context = context_map.get(section_name, f"Medical document section: {section_name}")

    # Prepare form data; the schema is sent as a JSON string and the fast
    # flag as a lowercase string ("true"/"false") per the API's form format.
    form_data = {
        "fileUrl": page_specific_url,
        "schema": json.dumps(schema),
        "fast": str(use_fast_mode).lower(),
        "customContext": custom_context
    }
    headers = {
        "x-api-key": CARDINAL_API_KEY
    }

    print(f" 🔍 Extracting {section_name} data from page-specific PDF...")
    try:
        response = requests.post(url, data=form_data, headers=headers, timeout=180)
        response.raise_for_status()
        result = response.json()

        # The "response" field may arrive either as a JSON string or as an
        # already-parsed object — handle both.
        if isinstance(result.get("response"), str):
            extracted_data = json.loads(result["response"])
        else:
            extracted_data = result.get("response", {})

        return {
            "success": True,
            "section_name": section_name,
            "original_pages": original_pages,
            "page_specific_url": page_specific_url,
            "data": extracted_data,
            "method": result.get("method", "unknown"),
            "pages_processed": result.get("pages_processed"),
            "raw_response": result
        }
    except requests.exceptions.RequestException as e:
        print(f" ❌ Request error during {section_name} extraction: {e}")
        return {"success": False, "section_name": section_name, "error": str(e)}
    except json.JSONDecodeError as e:
        print(f" ❌ JSON parsing error for {section_name}: {e}")
        return {"success": False, "section_name": section_name, "error": f"Failed to parse response: {e}"}
    except Exception as e:
        print(f" ❌ Unexpected error during {section_name} extraction: {e}")
        return {"success": False, "section_name": section_name, "error": str(e)}
def split_medical_document(file_url: str,
                           queries: Optional[list] = None) -> dict:
    """
    Split a mixed medical document into different types using Cardinal's /split endpoint.

    Note: the annotation uses ``Optional[list]`` because the original
    ``List[Dict[str, str]]`` hint referenced an unimported ``Dict`` (only
    ``List, Optional`` are imported), which raised a NameError at
    definition time.

    Args:
        file_url: Direct URL to the PDF file
        queries: List of query objects for document splitting; defaults to
            MEDICAL_DOCUMENT_QUERIES when None.

    Returns:
        Dictionary containing split results with pages for each document type,
        or ``success: False`` plus an ``error`` message on failure.
    """
    if queries is None:
        queries = MEDICAL_DOCUMENT_QUERIES

    url = f"{CARDINAL_URL.rstrip('/')}/split"

    # Prepare form data; queries are serialized to a JSON string.
    form_data = {
        "fileUrl": file_url,
        "queries": json.dumps(queries)
    }
    headers = {
        "x-api-key": CARDINAL_API_KEY
    }

    print(f"📋 Splitting medical document: {file_url}")
    print(f"🔍 Looking for {len(queries)} document types...")

    try:
        response = requests.post(url, data=form_data, headers=headers, timeout=180)
        response.raise_for_status()
        result = response.json()

        if result.get("success"):
            print(f"✅ Successfully split document into {len(result.get('partitions', []))} sections")

            # Print summary of what was found in each partition.
            for partition in result.get("partitions", []):
                page_count = len(partition.get("pages", []))
                print(f" 📄 {partition['name']}: {page_count} pages {partition.get('pages', [])}")

            return {
                "success": True,
                "split_result": result
            }
        else:
            return {
                "success": False,
                "error": "Split operation failed",
                "raw_response": result
            }
    except requests.exceptions.RequestException as e:
        print(f"❌ Request error during document split: {e}")
        return {"success": False, "error": str(e)}
    except Exception as e:
        print(f"❌ Unexpected error during document split: {e}")
        return {"success": False, "error": str(e)}
4) Updated Complete EHR Integration Pipeline
Copy
Ask AI
def process_mixed_medical_document_with_page_splitting(file_url: str, use_fast_mode: bool = False) -> dict:
    """
    Complete pipeline: Split document, extract specific pages, then extract
    structured data from each page-specific PDF.

    Note: the return annotation is ``dict`` because the original
    ``Dict[str, Any]`` referenced names never imported in this file
    (only ``List, Optional``), raising a NameError at definition time.

    Args:
        file_url: URL to the mixed medical document
        use_fast_mode: Whether to use fast extraction mode

    Returns:
        Dictionary containing all extracted EHR data organized by document
        type, plus the list of temp files created (pass it to
        cleanup_temp_files when done — nothing is deleted here).
    """
    print(f"\n🏥 PROCESSING MIXED MEDICAL DOCUMENT WITH PAGE SPLITTING")
    print(f"📄 Document: {file_url}")
    print(f"⚡ Mode: {'Fast' if use_fast_mode else 'Standard'}")
    print("="*60)

    # Step 1: Split the document into typed sections via /split.
    print("\n📋 Step 1: Splitting document into sections...")
    split_result = split_medical_document(file_url)

    if not split_result.get("success"):
        print("❌ Document splitting failed!")
        return {"success": False, "error": "Failed to split document", "details": split_result}

    # Step 2: For each section, build a page-specific PDF and extract data.
    print(f"\n🔍 Step 2: Creating page-specific PDFs and extracting data...")
    extraction_results = {}
    temp_files_created = []  # Keep track for cleanup

    split_data = split_result["split_result"]
    for partition in split_data.get("partitions", []):
        section_name = partition["name"]
        pages = partition.get("pages", [])

        if not pages:  # Skip empty partitions
            print(f"⚠️ Skipping {section_name} (no pages found)")
            continue

        # Get the appropriate schema for this document type; sections with
        # no schema cannot be extracted and are skipped.
        schema = SCHEMA_MAPPING.get(section_name)
        if not schema:
            print(f"⚠️ No schema defined for {section_name}, skipping...")
            continue

        print(f"\n📑 Processing {section_name} (pages {pages})...")
        try:
            # Step 2a: Create page-specific PDF
            page_specific_url = create_page_specific_pdf_url(
                original_pdf_url=file_url,
                pages=pages,
                section_name=section_name
            )
            temp_files_created.append(page_specific_url)

            # Step 2b: Extract structured data from page-specific PDF
            extraction_result = extract_from_page_specific_pdf(
                page_specific_url=page_specific_url,
                schema=schema,
                section_name=section_name,
                original_pages=pages,
                use_fast_mode=use_fast_mode
            )
            extraction_results[section_name] = extraction_result

            if extraction_result.get("success"):
                print(f" ✅ Successfully extracted {section_name} data")
            else:
                print(f" ❌ Failed to extract {section_name} data: {extraction_result.get('error')}")
        except Exception as e:
            # One failing section must not abort the remaining sections.
            print(f" ❌ Error processing {section_name}: {str(e)}")
            extraction_results[section_name] = {
                "success": False,
                "section_name": section_name,
                "error": str(e)
            }

    # Step 3: Compile final results.
    successful_extractions = sum(1 for result in extraction_results.values()
                                 if result.get("success"))

    print(f"\n📊 PROCESSING COMPLETE:")
    print(f" Document sections found: {len(split_data.get('partitions', []))}")
    print(f" Page-specific PDFs created: {len(temp_files_created)}")
    print(f" Successful extractions: {successful_extractions}")
    print(f" Failed extractions: {len(extraction_results) - successful_extractions}")

    # Cleanup temporary files (optional - in production you might want to keep them)
    print(f"\n🧹 Cleanup: Created {len(temp_files_created)} temporary files")

    return {
        "success": True,
        "document_url": file_url,
        "split_results": split_data,
        "extraction_results": extraction_results,
        "temp_files_created": temp_files_created,
        "summary": {
            "total_sections": len(split_data.get("partitions", [])),
            "successful_extractions": successful_extractions,
            "processing_mode": "fast" if use_fast_mode else "standard",
            "page_splitting_enabled": True
        }
    }
# Example usage: run the full pipeline against a sample mixed packet.
SAMPLE_EHR_DOCUMENT = "https://example-medical-docs.s3.amazonaws.com/mixed_patient_packet.pdf"

print("🚀 Starting EHR Integration Pipeline with Page Splitting...")

# Process with proper page splitting (standard, not fast, mode).
ehr_results = process_mixed_medical_document_with_page_splitting(
    file_url=SAMPLE_EHR_DOCUMENT,
    use_fast_mode=False,
)

if not ehr_results.get("success"):
    print(f"❌ Processing failed: {ehr_results.get('error')}")
else:
    print(f"\n🎉 SUCCESS! Processed document with page splitting:")
    print(f" • Split into {ehr_results['summary']['total_sections']} sections")
    print(f" • Created {len(ehr_results['temp_files_created'])} page-specific PDFs")
    print(f" • Successfully extracted {ehr_results['summary']['successful_extractions']} data sets")
5) Cleanup Function
Copy
Ask AI
import os
def cleanup_temp_files(temp_file_urls: list) -> None:
    """Clean up temporary PDF files created during processing.

    Args:
        temp_file_urls: URLs returned by the pipeline. Only ``file://``
            URLs refer to local files and are deleted; any other scheme
            is ignored.
    """
    cleaned_count = 0
    for url in temp_file_urls:
        if url.startswith("file://"):
            # removeprefix (not replace) so only the leading scheme is
            # stripped — replace() would also mangle a path that happens
            # to contain "file://" elsewhere in it.
            file_path = url.removeprefix("file://")
            try:
                if os.path.exists(file_path):
                    os.remove(file_path)
                    cleaned_count += 1
                    print(f" 🗑️ Deleted: {file_path}")
            except Exception as e:
                print(f" ⚠️ Failed to delete {file_path}: {e}")
    print(f"🧹 Cleanup complete: {cleaned_count} temporary files removed")
# Example cleanup: only runs when the pipeline above produced results
# and actually created temporary page-specific PDFs.
if 'ehr_results' in locals() and ehr_results.get("temp_files_created"):
    print("\n🧹 CLEANING UP TEMPORARY FILES...")
    cleanup_temp_files(ehr_results["temp_files_created"])
What You Just Built
Congratulations! You’ve created an intelligent EHR integration system that can split mixed medical documents and extract structured data. Your system can now: - Split mixed documents into patient intake, insurance cards, and consent forms
- Extract targeted data using specialized schemas for each document type
- Process batch workflows for multiple patient packets efficiently
- Export EHR-ready data in CSV format for seamless integration