TLDR
This tutorial shows you how to build an AI-powered search system for prior authorization workflows. You’ll upload patient intake forms and medical documents, extract all the text with precise locations, and create a searchable knowledge base. When you need to fill out a prior auth form, you can instantly search across all your patient data to find relevant medical history, diagnoses, medications, and other required information—complete with citations showing exactly where each piece of information came from in the original documents.

Building Your Medical Knowledge Base: Prior Authorization Edition
Let’s begin with a practical example—processing prior authorization requests. This will serve as your proof of concept. At a high level, we’ll use the Cardinal API to extract text along with its bounding boxes from scanned PDFs. Those extracted chunks will then be embedded and stored in Weaviate’s vector database. From there, you can query Weaviate to retrieve the most relevant results for any search.

0) Install dependencies
Copy
Ask AI
# Install dependencies
# NOTE: weaviate-client is pinned to 4.6.5 because the v4 API surface used
# below (collections.create, batch.dynamic, insert_many) shifts between
# minor releases. PyMuPDF/pdf2image are only needed for optional rendering.
!pip install -q "weaviate-client==4.6.5" python-dotenv boto3 requests tqdm pandas pillow PyMuPDF pdf2image
1) Load environment variables & Connect to Weaviate
Copy
Ask AI
# Mount Google Drive and load secrets from a .env file kept outside the repo.
from google.colab import drive
drive.mount('/content/drive')

import os, dotenv
dotenv.load_dotenv('/content/drive/MyDrive/.env')

# Cardinal
CARDINAL_URL = os.getenv("CARDINAL_URL", "https://api.trycardinal.ai")
CARDINAL_API_KEY = os.getenv("CARDINAL_API_KEY")

# Weaviate
WEAVIATE_URL = os.getenv("WEAVIATE_URL")
WEAVIATE_API_KEY = os.getenv("WEAVIATE_API_KEY")

# OpenAI (for Weaviate to use for vectorizer/generative)
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# S3 (optional - we'll use this for our sample file)
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID") or os.getenv("AWS_KEY")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY") or os.getenv("AWS_SECRET")
AWS_S3_BUCKET = os.getenv("AWS_S3_BUCKET") or os.getenv("AWS_S3_NAME")
# BUG FIX: AWS_REGION is referenced by list_s3_urls() and s3_to_https()
# later in this notebook but was never defined, which raised a NameError
# at runtime. Default to us-east-1 when the env var is absent.
AWS_REGION = os.getenv("AWS_REGION", "us-east-1")
S3_PREFIX = os.getenv("S3_PREFIX", "")
2) Connect to Weaviate
Copy
Ask AI
import weaviate
import weaviate.classes.config as wc
import os

# Check Weaviate client version
print(f"Weaviate client version: {weaviate.__version__}")

# Forward the OpenAI key only when one is configured; Weaviate's
# text2vec-openai / generative-openai modules need it server-side.
extra_headers = {}
if OPENAI_API_KEY:
    extra_headers["X-OpenAI-Api-Key"] = OPENAI_API_KEY

client = weaviate.connect_to_wcs(
    cluster_url=WEAVIATE_URL,
    auth_credentials=weaviate.auth.AuthApiKey(WEAVIATE_API_KEY),
    headers=extra_headers,
)
print("Connected to Weaviate:", client.is_ready())
3) Create a Weaviate collection
We’re calling it “CardinalDemo”:
Copy
Ask AI
# Drop any previous run's collection so the schema below is authoritative.
try:
    client.collections.delete("CardinalDemo")
    print("Deleted existing CardinalDemo collection")
except Exception as e:
    print(f"No existing collection to delete: {e}")

# Create new collection
documents = client.collections.create(
    name="CardinalDemo",
    # Embeddings are generated server-side by Weaviate's OpenAI module.
    vectorizer_config=wc.Configure.Vectorizer.text2vec_openai(
        model="text-embedding-3-small",
        type_="text"
    ),
    generative_config=wc.Configure.Generative.openai(
        model="gpt-4"
    ),
    properties=[
        # The extracted text is the only field we want embedded; IDs and
        # geometry below set skip_vectorization=True to stay out of vectors.
        wc.Property(name="text", data_type=wc.DataType.TEXT),
        wc.Property(name="type", data_type=wc.DataType.TEXT),
        wc.Property(name="element_id", data_type=wc.DataType.TEXT, skip_vectorization=True),
        wc.Property(name="page_number", data_type=wc.DataType.INT),
        wc.Property(name="page_width_pts", data_type=wc.DataType.NUMBER, skip_vectorization=True),
        wc.Property(name="page_height_pts", data_type=wc.DataType.NUMBER, skip_vectorization=True),
        # Original bbox (inches) - Cardinal's native format
        wc.Property(name="bbox_in", data_type=wc.DataType.OBJECT, nested_properties=[
            wc.Property(name="min_x", data_type=wc.DataType.NUMBER),
            wc.Property(name="min_y", data_type=wc.DataType.NUMBER),
            wc.Property(name="max_x", data_type=wc.DataType.NUMBER),
            wc.Property(name="max_y", data_type=wc.DataType.NUMBER),
        ], skip_vectorization=True),
        # Derived bbox (points) - standard PDF coordinates
        wc.Property(name="bbox_pts", data_type=wc.DataType.OBJECT, nested_properties=[
            wc.Property(name="x", data_type=wc.DataType.NUMBER),
            wc.Property(name="y", data_type=wc.DataType.NUMBER),
            wc.Property(name="w", data_type=wc.DataType.NUMBER),
            wc.Property(name="h", data_type=wc.DataType.NUMBER),
        ], skip_vectorization=True),
        # Normalized bbox (%) - perfect for web overlays
        wc.Property(name="bbox_norm", data_type=wc.DataType.OBJECT, nested_properties=[
            wc.Property(name="left", data_type=wc.DataType.NUMBER),
            wc.Property(name="top", data_type=wc.DataType.NUMBER),
            wc.Property(name="width", data_type=wc.DataType.NUMBER),
            wc.Property(name="height", data_type=wc.DataType.NUMBER),
        ], skip_vectorization=True),
        # File metadata
        wc.Property(name="filename", data_type=wc.DataType.TEXT),
        wc.Property(name="filetype", data_type=wc.DataType.TEXT, skip_vectorization=True),
        wc.Property(name="languages", data_type=wc.DataType.TEXT_ARRAY, skip_vectorization=True),
        wc.Property(name="source_url", data_type=wc.DataType.TEXT, skip_vectorization=True),
    ],
)
print("Created CardinalDemo collection")
4) Helper functions
Copy
Ask AI
import json, requests, re
from typing import Dict, Any, List, Optional
from urllib.parse import urlparse, quote
from tqdm import tqdm
INCHES_TO_POINTS = 72.0
def list_s3_urls(bucket_with_prefix: str, limit: Optional[int] = None) -> List[str]:
    """Return a list of s3://bucket/key URLs for PDF/image objects.

    Args:
        bucket_with_prefix: An "s3://bucket/optional/prefix" URL. Anything
            else (including None or "") yields an empty list.
        limit: Optional cap on the number of URLs returned.
    """
    if not bucket_with_prefix or not bucket_with_prefix.startswith("s3://"):
        return []
    import boto3
    s3 = boto3.client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        # BUG FIX: AWS_REGION was referenced as a module global that is never
        # defined in this notebook (NameError at runtime). Read it from the
        # environment with a sensible default instead.
        region_name=os.getenv("AWS_REGION", "us-east-1"),
    )
    parsed = urlparse(bucket_with_prefix)
    bucket = parsed.netloc
    prefix = parsed.path.lstrip("/")
    urls: List[str] = []
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            # Case-insensitive match also accepts .PNG/.JPG/.Jpeg etc., which
            # the previous pattern (pdf|PDF|png|jpg|jpeg) silently skipped.
            if re.search(r"\.(pdf|png|jpg|jpeg)$", key, re.IGNORECASE):
                urls.append(f"s3://{bucket}/{key}")
            if limit and len(urls) >= limit:
                return urls
    return urls
def s3_to_https(s3_url: str) -> Optional[str]:
    """Convert s3://bucket/key to a public HTTPS URL.

    Non-S3 URLs are passed through unchanged. The key is percent-encoded
    (slashes preserved) so keys with spaces or special characters resolve.
    """
    if not s3_url.startswith("s3://"):
        return s3_url
    parsed = urlparse(s3_url)
    bucket = parsed.netloc
    key = parsed.path.lstrip("/")
    encoded_key = quote(key, safe='/')
    # BUG FIX: AWS_REGION was never defined in this notebook, so this line
    # raised NameError. Read it from the environment with a default instead.
    region = os.getenv("AWS_REGION", "us-east-1")
    # Region-specific virtual-hosted-style URL
    return f"https://{bucket}.s3.{region}.amazonaws.com/{encoded_key}"
def process_with_cardinal(file_url: str) -> Dict[str, Any]:
    """Call Cardinal /rag, which will return Markdown content with their bounding boxes."""
    endpoint = f"{CARDINAL_URL.rstrip('/')}/rag"
    payload = {"fileUrl": file_url}
    auth_headers = {"x-api-key": CARDINAL_API_KEY}
    print(f"Processing: {file_url}")
    # Generous timeout: OCR/extraction on multi-page scans can be slow.
    response = requests.post(endpoint, data=payload, headers=auth_headers, timeout=180)
    response.raise_for_status()
    return response.json()
def bbox_in_to_pts(bbox_in: Dict[str, float]) -> Dict[str, float]:
    """Convert an inch-based min/max bbox to PDF points as (x, y, w, h)."""
    min_x, min_y = bbox_in["min_x"], bbox_in["min_y"]
    max_x, max_y = bbox_in["max_x"], bbox_in["max_y"]
    return {
        "x": min_x * INCHES_TO_POINTS,
        "y": min_y * INCHES_TO_POINTS,
        "w": (max_x - min_x) * INCHES_TO_POINTS,
        "h": (max_y - min_y) * INCHES_TO_POINTS,
    }
def bbox_pts_to_norm(bbox_pts: Dict[str, float],
                     page_w_pts: float,
                     page_h_pts: float) -> Dict[str, float]:
    """Express a point-based bbox as percentages of the page dimensions."""
    # Clamp page size to a tiny positive value to avoid division by zero.
    safe_w = max(page_w_pts, 1e-6)
    safe_h = max(page_h_pts, 1e-6)
    return {
        "left": 100.0 * bbox_pts["x"] / safe_w,
        "top": 100.0 * bbox_pts["y"] / safe_h,
        "width": 100.0 * bbox_pts["w"] / safe_w,
        "height": 100.0 * bbox_pts["h"] / safe_h,
    }
def extract_filename_from_url(url: str) -> str:
    """Extract the trailing filename from an s3:// or http(s):// URL.

    Falls back to "unknown.pdf" when the URL has no path component.
    """
    # FIX: the original if/else branches for s3:// vs other schemes were
    # byte-identical — urlparse handles both the same way, so one path is enough.
    filename = os.path.basename(urlparse(url).path)
    if not filename or filename == '/':
        filename = "unknown.pdf"
    return filename
def cardinal_to_weaviate_objects(cardinal_resp: Dict[str, Any],
                                 source_url: str) -> List[Dict[str, Any]]:
    """Convert Cardinal response to Weaviate objects.

    Each non-empty bounding box becomes one {"properties": {...}} dict
    matching the CardinalDemo schema: the text plus the bbox in three
    coordinate systems (inches, PDF points, normalized percentages).
    """
    import hashlib  # for a run-stable content hash (see element_id below)

    objs: List[Dict[str, Any]] = []
    pages = cardinal_resp.get("pages", []) or []
    # Idiomatic enumerate (1-based) instead of range(len(pages)).
    for page_num, p in enumerate(pages, start=1):
        page_w_pts = float(p.get("width", 0))
        page_h_pts = float(p.get("height", 0))
        for bb in p.get("bounding_boxes", []) or []:
            bbox_in = bb.get("bounding_box") or {}
            content = (bb.get("content") or "").strip()
            if not content:
                continue  # skip empty extraction artifacts
            bbox_pts = bbox_in_to_pts(bbox_in)
            bbox_norm = bbox_pts_to_norm(bbox_pts, page_w_pts, page_h_pts)
            # BUG FIX: the builtin hash() is salted per process
            # (PYTHONHASHSEED), so element_id changed on every run, defeating
            # dedup/traceability. Use a stable MD5-derived value instead.
            content_hash = int(hashlib.md5(content.encode("utf-8")).hexdigest(), 16) % (10**9)
            # Create properties dictionary - all field names must match the schema exactly
            props = {
                "text": content,
                "type": "paragraph",
                "element_id": f"{source_url}#p{page_num}:{content_hash}",
                "page_number": page_num,
                "page_width_pts": page_w_pts,
                "page_height_pts": page_h_pts,
                "bbox_in": {
                    "min_x": float(bbox_in.get("min_x", 0.0)),
                    "min_y": float(bbox_in.get("min_y", 0.0)),
                    "max_x": float(bbox_in.get("max_x", 0.0)),
                    "max_y": float(bbox_in.get("max_y", 0.0)),
                },
                "bbox_pts": {
                    "x": float(bbox_pts["x"]),
                    "y": float(bbox_pts["y"]),
                    "w": float(bbox_pts["w"]),
                    "h": float(bbox_pts["h"]),
                },
                "bbox_norm": {
                    "left": float(bbox_norm["left"]),
                    "top": float(bbox_norm["top"]),
                    "width": float(bbox_norm["width"]),
                    "height": float(bbox_norm["height"]),
                },
                "filename": extract_filename_from_url(source_url),
                "filetype": "pdf",
                "languages": ["en"],
                "source_url": source_url,
            }
            # Wrap under "properties" for the batch/insert step downstream.
            objs.append({"properties": props})
    return objs
5) Get our sample document URL
Here’s where it gets practical—we’ll start with a patient intake form as our sample document. In real workflows, you’d encode intake forms, medical histories, and other records so you can pull the exact details needed to complete a prior authorization request.
Copy
Ask AI
# Our sample file
# A single publicly readable intake-form PDF; to process a whole bucket,
# replace this literal list with list_s3_urls(f"s3://{AWS_S3_BUCKET}/{S3_PREFIX}").
urls = ["s3://public-cardinal-bucket/forms/Sample_Patient_Intake_Form.pdf"]
print(f"Found {len(urls)} forms to process")
if urls:
    print("Sample form:", urls[0])
6) Process the form
Copy
Ask AI
# Extract every form into Weaviate-ready objects, accumulating across files.
all_objs = []
for raw_url in tqdm(urls, desc="Processing form files"):
    try:
        # Convert S3 URL to HTTPS
        if raw_url.startswith("s3://"):
            https_url = s3_to_https(raw_url)
        else:
            https_url = raw_url
        if not https_url:
            print(f"Skipping invalid URL: {raw_url}")
            continue

        # Process with Cardinal - this is where the magic happens!
        resp = process_with_cardinal(https_url)

        # Debug: Check response structure
        if not resp.get("pages"):
            print(f"Warning: No pages found in response for {raw_url}")
            continue

        # Convert to Weaviate objects
        form_items = cardinal_to_weaviate_objects(resp, source_url=raw_url)
        print(f"Extracted {len(form_items)} form items from {extract_filename_from_url(raw_url)}")

        # Debug: Print first object structure
        if form_items and not all_objs:  # Only print for first file
            print("\nFirst form item structure:")
            print(json.dumps(form_items[0], indent=2)[:500] + "...")

        all_objs.extend(form_items)
    except Exception as e:
        print(f"Error processing {raw_url}: {e}")
        continue
print(f"\nTotal form items to insert: {len(all_objs)}")
7) Store all form items in Weaviate
Copy
Ask AI
# Insert the extracted objects: prefer the dynamic batcher, fall back to
# insert_many in fixed-size chunks if batching fails outright.
inserted = 0
if all_objs:
    try:
        # Use the batch method which is more reliable
        with documents.batch.dynamic() as batch:
            for obj in all_objs:
                # Extract properties from the object
                properties = obj["properties"]
                batch.add_object(properties=properties)
                # Check for errors periodically
                if batch.number_errors > 10:
                    print(f"Stopping due to {batch.number_errors} errors")
                    break
        # Check for failed objects
        if documents.batch.failed_objects:
            print(f"Failed to insert {len(documents.batch.failed_objects)} objects")
            print(f"First failed object: {documents.batch.failed_objects[0]}")
        else:
            print(f"Successfully inserted all {len(all_objs)} form items!")
    except Exception as e:
        print(f"Batch insert error: {e}")
        print("Trying insert_many method as fallback...")
        # Fallback to insert_many
        BATCH_SIZE = 50  # Smaller batch size
        for i in tqdm(range(0, len(all_objs), BATCH_SIZE), desc="Inserting to Weaviate"):
            # Renamed from `batch` to avoid shadowing the batcher above.
            chunk = all_objs[i:i+BATCH_SIZE]
            try:
                # BUG FIX: insert_many expects a list of property dicts (or
                # DataObjects), not the {"properties": {...}} wrappers built
                # earlier — passing the wrappers would create objects with a
                # single bogus "properties" field. Unwrap before inserting.
                response = documents.data.insert_many([o["properties"] for o in chunk])
                if response.errors:
                    print(f"Errors in batch {i//BATCH_SIZE}: {response.errors}")
                else:
                    inserted += len(chunk)
            except Exception as e:
                print(f"Error inserting batch {i//BATCH_SIZE}: {e}")
        print(f"Successfully inserted {inserted} objects using insert_many")
else:
    print("No objects to insert!")
8) Test your form knowledge base
Time to see if our form search actually works!
Copy
Ask AI
# Smoke-test the knowledge base: count objects, fetch a couple of samples to
# verify the stored structure, then run a hybrid (BM25 + vector) search.
print("\n=== Testing Your Prior Auth Knowledge Base ===")
try:
    # First, check if there's any data
    count_result = documents.aggregate.over_all(total_count=True)
    print(f"Total patient intake items in collection: {count_result.total_count}")
    if count_result.total_count == 0:
        print("No patient intake items found! Check insertion process.")
    else:
        # Method 1: Try a simple fetch first to verify data structure
        print("\n--- Sample patient intake items ---")
        sample = documents.query.fetch_objects(
            limit=2,
            include_vector=False
        )
        if sample.objects:
            for i, obj in enumerate(sample.objects, 1):
                print(f"\nPatient Form Item {i}:")
                if hasattr(obj, 'properties') and obj.properties:
                    props = obj.properties
                    # Guard every field with .get(): partial inserts should
                    # print MISSING rather than raise.
                    print(f" Text: {props.get('text', 'MISSING')[:100] if props.get('text') else 'MISSING'}...")
                    print(f" Form Name: {props.get('filename', 'MISSING')}")
                    print(f" Page: {props.get('page_number', 'MISSING')}")
                    print(f" Field Type: {props.get('type', 'MISSING')}")
                    # Check bbox_norm structure
                    bbox_norm = props.get('bbox_norm', {})
                    if bbox_norm:
                        print(f" Location: left={bbox_norm.get('left', 0):.1f}%, "
                              f"top={bbox_norm.get('top', 0):.1f}%, "
                              f"width={bbox_norm.get('width', 0):.1f}%, "
                              f"height={bbox_norm.get('height', 0):.1f}%")
        # Method 2: Try semantic search for patient information
        print("\n--- Search Results: 'Patient medical history diabetes' ---")
        # alpha=0.5 weighs keyword (BM25) and vector scores equally.
        res = documents.query.hybrid(
            query="Patient medical history diabetes",
            alpha=0.5,
            limit=3,
            include_vector=False,
            return_metadata=['score']
        )
        if res.objects:
            print(f"Found {len(res.objects)} matching patient data entries:")
            for i, obj in enumerate(res.objects, 1):
                print(f"\n--- Patient Data Result {i} ---")
                if hasattr(obj, 'properties') and obj.properties:
                    props = obj.properties
                    print(f"Text: {props.get('text', 'MISSING')[:200] if props.get('text') else 'MISSING'}...")
                    print(f"Form Name: {props.get('filename', 'MISSING')}")
                    print(f"Page: {props.get('page_number', 'MISSING')}")
                # Show search score if available
                if hasattr(obj, 'metadata') and obj.metadata:
                    if hasattr(obj.metadata, 'score') and obj.metadata.score is not None:
                        print(f"Relevance Score: {obj.metadata.score:.4f}")
        else:
            print("No results found for 'Patient medical history diabetes' search.")
except Exception as e:
    print(f"Query error: {e}")
    import traceback
    traceback.print_exc()

# Clean up connection
client.close()
What You Just Built
Congratulations! You’ve created a searchable prior authorization knowledge base from patient intake PDFs. Your system can now:
- Find specific patient information using natural language search
- Trace results back to exact locations on the original forms
- Handle multiple coordinate systems for different display needs
- Scale to process hundreds of patient forms with the same pipeline