TLDR
This tutorial shows you how to build an AI-powered search system for revenue cycle management (RCM) workflows. You’ll upload EOBs, billing statements, claims documents, and financial records, extract all the text with precise locations, and create a searchable knowledge base. When you need to resolve billing disputes, track claim statuses, or analyze payment patterns, you can instantly search across all your RCM data to find relevant payment information, denial reasons, procedure codes, and other financial details, complete with citations showing exactly where each piece of information came from in the original documents.
Building Your Financial Knowledge Base: Revenue Cycle Management Edition
Let’s begin with a practical example: processing revenue cycle management requests. This will serve as your proof of concept. At a high level, we’ll use the Cardinal API to extract text along with its bounding boxes from scanned PDFs. Those extracted chunks will then be embedded and stored in Weaviate’s vector database. From there, you can query Weaviate to retrieve the most relevant results for any search.
0) Install dependencies
# Install dependencies
!pip install -q "weaviate-client==4.6.5" python-dotenv boto3 requests tqdm pandas pillow PyMuPDF pdf2image
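Note: pdf2image depends on the system-level poppler binaries, which pip doesn’t install; on Colab you can add them with:
# poppler is required by pdf2image for PDF rasterization
!apt-get install -y -q poppler-utils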
1) Load environment variables
from google.colab import drive
drive.mount('/content/drive')
import os, dotenv
dotenv.load_dotenv('/content/drive/MyDrive/.env')
# Cardinal
CARDINAL_URL = os.getenv("CARDINAL_URL", "https://api.trycardinal.ai")
CARDINAL_API_KEY = os.getenv("CARDINAL_API_KEY")
# Weaviate
WEAVIATE_URL = os.getenv("WEAVIATE_URL")
WEAVIATE_API_KEY = os.getenv("WEAVIATE_API_KEY")
# OpenAI (for Weaviate to use for vectorizer/generative)
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# S3 (optional - we'll use this for our sample file)
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID") or os.getenv("AWS_KEY")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY") or os.getenv("AWS_SECRET")
AWS_S3_BUCKET = os.getenv("AWS_S3_BUCKET") or os.getenv("AWS_S3_NAME")
S3_PREFIX = os.getenv("S3_PREFIX", "")
AWS_REGION = os.getenv("AWS_REGION", "us-east-1")  # used by the S3 helpers below; change if your bucket lives elsewhere
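For reference, the .env file on your Drive might look like this (placeholder values; the AWS entries are only needed if you list documents from your own bucket):
# /content/drive/MyDrive/.env (placeholder values)
CARDINAL_API_KEY=your-cardinal-api-key
WEAVIATE_URL=https://your-cluster.weaviate.network
WEAVIATE_API_KEY=your-weaviate-api-key
OPENAI_API_KEY=sk-your-openai-key
# Optional S3 settings
AWS_ACCESS_KEY_ID=your-access-key-id
AWS_SECRET_ACCESS_KEY=your-secret-access-key
AWS_S3_BUCKET=s3://your-bucket
S3_PREFIX=billing/
AWS_REGION=us-east-1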
2) Connect to Weaviate
import weaviate
import weaviate.classes.config as wc
import os
# Check Weaviate client version
print(f"Weaviate client version: {weaviate.__version__}")
client = weaviate.connect_to_wcs(
cluster_url=WEAVIATE_URL,
auth_credentials=weaviate.auth.AuthApiKey(WEAVIATE_API_KEY),
headers={"X-OpenAI-Api-Key": OPENAI_API_KEY} if OPENAI_API_KEY else {}
)
print("Connected to Weaviate:", client.is_ready())
3) Create a Weaviate collection
We’re calling it “CardinalDemo”:
try:
client.collections.delete("CardinalDemo")
print("Deleted existing CardinalDemo collection")
except Exception as e:
print(f"No existing collection to delete: {e}")
# Create new collection
documents = client.collections.create(
name="CardinalDemo",
vectorizer_config=wc.Configure.Vectorizer.text2vec_openai(
model="text-embedding-3-small",
type_="text"
),
generative_config=wc.Configure.Generative.openai(
model="gpt-4"
),
properties=[
wc.Property(name="text", data_type=wc.DataType.TEXT),
wc.Property(name="type", data_type=wc.DataType.TEXT),
wc.Property(name="element_id", data_type=wc.DataType.TEXT, skip_vectorization=True),
wc.Property(name="page_number", data_type=wc.DataType.INT),
wc.Property(name="page_width_pts", data_type=wc.DataType.NUMBER, skip_vectorization=True),
wc.Property(name="page_height_pts", data_type=wc.DataType.NUMBER, skip_vectorization=True),
# Original bbox (inches) - Cardinal's native format
wc.Property(name="bbox_in", data_type=wc.DataType.OBJECT, nested_properties=[
wc.Property(name="min_x", data_type=wc.DataType.NUMBER),
wc.Property(name="min_y", data_type=wc.DataType.NUMBER),
wc.Property(name="max_x", data_type=wc.DataType.NUMBER),
wc.Property(name="max_y", data_type=wc.DataType.NUMBER),
], skip_vectorization=True),
# Derived bbox (points) - standard PDF coordinates
wc.Property(name="bbox_pts", data_type=wc.DataType.OBJECT, nested_properties=[
wc.Property(name="x", data_type=wc.DataType.NUMBER),
wc.Property(name="y", data_type=wc.DataType.NUMBER),
wc.Property(name="w", data_type=wc.DataType.NUMBER),
wc.Property(name="h", data_type=wc.DataType.NUMBER),
], skip_vectorization=True),
# Normalized bbox (%) - perfect for web overlays
wc.Property(name="bbox_norm", data_type=wc.DataType.OBJECT, nested_properties=[
wc.Property(name="left", data_type=wc.DataType.NUMBER),
wc.Property(name="top", data_type=wc.DataType.NUMBER),
wc.Property(name="width", data_type=wc.DataType.NUMBER),
wc.Property(name="height", data_type=wc.DataType.NUMBER),
], skip_vectorization=True),
# File metadata
wc.Property(name="filename", data_type=wc.DataType.TEXT),
wc.Property(name="filetype", data_type=wc.DataType.TEXT, skip_vectorization=True),
wc.Property(name="languages", data_type=wc.DataType.TEXT_ARRAY, skip_vectorization=True),
wc.Property(name="source_url", data_type=wc.DataType.TEXT, skip_vectorization=True),
],
)
print("Created CardinalDemo collection")
4) Helper functions
import json, requests, re, hashlib
from typing import Dict, Any, List, Optional
from urllib.parse import urlparse, quote
from tqdm import tqdm
INCHES_TO_POINTS = 72.0
def list_s3_urls(bucket_with_prefix: str, limit: Optional[int]=None) -> List[str]:
"""Return a list of s3://bucket/key URLs."""
if not bucket_with_prefix or not bucket_with_prefix.startswith("s3://"):
return []
import boto3
s3 = boto3.client(
"s3",
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
region_name=AWS_REGION
)
parsed = urlparse(bucket_with_prefix)
bucket = parsed.netloc
prefix = parsed.path.lstrip("/")
urls = []
paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
for obj in page.get("Contents", []):
key = obj["Key"]
if re.search(r"\.(pdf|PDF|png|jpg|jpeg)$", key):
urls.append(f"s3://{bucket}/{key}")
if limit and len(urls) >= limit:
return urls
return urls
def s3_to_https(s3_url: str) -> Optional[str]:
"""Convert s3://bucket/key to public HTTPS URL."""
if not s3_url.startswith("s3://"):
return s3_url
parsed = urlparse(s3_url)
bucket = parsed.netloc
key = parsed.path.lstrip("/")
encoded_key = quote(key, safe='/')
# Try region-specific format
return f"https://{bucket}.s3.{AWS_REGION}.amazonaws.com/{encoded_key}"
def process_with_cardinal(file_url: str) -> Dict[str, Any]:
"""Call Cardinal /rag, which will return Markdown content with their bounding boxes."""
url = f"{CARDINAL_URL.rstrip('/')}/rag"
form = {
"fileUrl": file_url
}
headers = {"x-api-key": CARDINAL_API_KEY}
print(f"Processing: {file_url}")
r = requests.post(url, data=form, headers=headers, timeout=180)
r.raise_for_status()
return r.json()
def bbox_in_to_pts(bbox_in: Dict[str, float]) -> Dict[str, float]:
x = bbox_in["min_x"] * INCHES_TO_POINTS
y = bbox_in["min_y"] * INCHES_TO_POINTS
w = (bbox_in["max_x"] - bbox_in["min_x"]) * INCHES_TO_POINTS
h = (bbox_in["max_y"] - bbox_in["min_y"]) * INCHES_TO_POINTS
return {"x": x, "y": y, "w": w, "h": h}
def bbox_pts_to_norm(bbox_pts: Dict[str, float],
page_w_pts: float,
page_h_pts: float) -> Dict[str, float]:
left = 100.0 * bbox_pts["x"] / max(page_w_pts, 1e-6)
top = 100.0 * bbox_pts["y"] / max(page_h_pts, 1e-6)
width = 100.0 * bbox_pts["w"] / max(page_w_pts, 1e-6)
height = 100.0 * bbox_pts["h"] / max(page_h_pts, 1e-6)
return {"left": left, "top": top, "width": width, "height": height}
def extract_filename_from_url(url: str) -> str:
    """Extract the filename from an s3:// or https:// URL."""
    parsed = urlparse(url)
    filename = os.path.basename(parsed.path)
    if not filename or filename == '/':
        filename = "unknown.pdf"
    return filename
def cardinal_to_weaviate_objects(cardinal_resp: Dict[str, Any],
source_url: str) -> List[Dict[str, Any]]:
"""Convert Cardinal response to Weaviate objects."""
objs = []
pages = cardinal_resp.get("pages", []) or []
    for page_num, p in enumerate(pages, start=1):
page_w_pts = float(p.get("width", 0))
page_h_pts = float(p.get("height", 0))
for bb in p.get("bounding_boxes", []) or []:
bbox_in = bb.get("bounding_box") or {}
content = (bb.get("content") or "").strip()
if not content:
continue
bbox_pts = bbox_in_to_pts(bbox_in)
bbox_norm = bbox_pts_to_norm(bbox_pts, page_w_pts, page_h_pts)
# Create properties dictionary - all field names must match the schema exactly
props = {
"text": content,
"type": "paragraph",
"element_id": f"{source_url}#p{page_num}:{hash(content) % (10**9)}",
"page_number": page_num,
"page_width_pts": page_w_pts,
"page_height_pts": page_h_pts,
"bbox_in": {
"min_x": float(bbox_in.get("min_x", 0.0)),
"min_y": float(bbox_in.get("min_y", 0.0)),
"max_x": float(bbox_in.get("max_x", 0.0)),
"max_y": float(bbox_in.get("max_y", 0.0)),
},
"bbox_pts": {
"x": float(bbox_pts["x"]),
"y": float(bbox_pts["y"]),
"w": float(bbox_pts["w"]),
"h": float(bbox_pts["h"])
},
"bbox_norm": {
"left": float(bbox_norm["left"]),
"top": float(bbox_norm["top"]),
"width": float(bbox_norm["width"]),
"height": float(bbox_norm["height"])
},
"filename": extract_filename_from_url(source_url),
"filetype": "pdf",
"languages": ["en"],
"source_url": source_url,
}
# Create the object with properties key for insert_many
obj = {"properties": props}
objs.append(obj)
return objs
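Based on how these helpers parse it, Cardinal’s /rag response looks roughly like this (illustrative values; page width/height are in points, bounding boxes in inches; check Cardinal’s docs for the authoritative schema):
{
  "pages": [
    {
      "width": 612,
      "height": 792,
      "bounding_boxes": [
        {
          "content": "CLAIM #12345 ... DENIED: CO-97",
          "bounding_box": {"min_x": 0.5, "min_y": 1.2, "max_x": 7.9, "max_y": 1.6}
        }
      ]
    }
  ]
}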
5) Get our sample document URL
Here’s where it gets practical: we’ll start with a billing statement as our sample document. In real workflows, you’d index EOBs, claims documents, payment records, and denial letters so you can pull the exact financial details needed to resolve billing disputes or track revenue patterns.
# Our sample file
urls = ["s3://public-cardinal-bucket/billing/Sample_EOB_Statement.pdf"]
print(f"Found {len(urls)} billing documents to process")
if urls:
print("Sample billing document:", urls[0])
6) Process the billing documents
all_objs = []
for raw_url in tqdm(urls, desc="Processing billing documents"):
try:
# Convert S3 URL to HTTPS
https_url = s3_to_https(raw_url) if raw_url.startswith("s3://") else raw_url
if not https_url:
print(f"Skipping invalid URL: {raw_url}")
continue
# Process with Cardinal - this is where the magic happens!
resp = process_with_cardinal(https_url)
# Debug: Check response structure
if not resp.get("pages"):
print(f"Warning: No pages found in response for {raw_url}")
continue
# Convert to Weaviate objects
objects = cardinal_to_weaviate_objects(resp, source_url=raw_url)
print(f"Extracted {len(objects)} billing items from {extract_filename_from_url(raw_url)}")
# Debug: Print first object structure
if objects and len(all_objs) == 0: # Only print for first file
print("\nFirst billing item structure:")
print(json.dumps(objects[0], indent=2)[:500] + "...")
all_objs.extend(objects)
except Exception as e:
print(f"Error processing {raw_url}: {e}")
continue
print(f"\nTotal billing items to insert: {len(all_objs)}")
7) Store all billing items in Weaviate
inserted = 0
if all_objs:
try:
# Use the batch method which is more reliable
with documents.batch.dynamic() as batch:
for obj in all_objs:
# Extract properties from the object
properties = obj["properties"]
batch.add_object(properties=properties)
# Check for errors periodically
if batch.number_errors > 10:
print(f"Stopping due to {batch.number_errors} errors")
break
# Check for failed objects
if documents.batch.failed_objects:
print(f"Failed to insert {len(documents.batch.failed_objects)} objects")
print(f"First failed object: {documents.batch.failed_objects[0]}")
else:
print(f"Successfully inserted all {len(all_objs)} billing items!")
except Exception as e:
print(f"Batch insert error: {e}")
print("Trying insert_many method as fallback...")
# Fallback to insert_many
BATCH_SIZE = 50 # Smaller batch size
for i in tqdm(range(0, len(all_objs), BATCH_SIZE), desc="Inserting to Weaviate"):
batch = all_objs[i:i+BATCH_SIZE]
try:
response = documents.data.insert_many(batch)
if response.errors:
print(f"Errors in batch {i//BATCH_SIZE}: {response.errors}")
else:
inserted += len(batch)
except Exception as e:
print(f"Error inserting batch {i//BATCH_SIZE}: {e}")
print(f"Successfully inserted {inserted} objects using insert_many")
else:
print("No objects to insert!")
8) Test your RCM knowledge base
Time to see if our billing search actually works! We’ll use hybrid search; alpha=0.5 gives keyword (BM25) and vector matches equal weight.
print("\n=== Testing Your RCM Knowledge Base ===")
try:
# First, check if there's any data
count_result = documents.aggregate.over_all(total_count=True)
print(f"Total billing items in collection: {count_result.total_count}")
if count_result.total_count == 0:
print("No billing items found! Check insertion process.")
else:
# Method 1: Try a simple fetch first to verify data structure
print("\n--- Sample billing items ---")
sample = documents.query.fetch_objects(
limit=2,
include_vector=False
)
if sample.objects:
for i, obj in enumerate(sample.objects, 1):
print(f"\nBilling Item {i}:")
if hasattr(obj, 'properties') and obj.properties:
props = obj.properties
print(f" Text: {props.get('text', 'MISSING')[:100] if props.get('text') else 'MISSING'}...")
print(f" Document Name: {props.get('filename', 'MISSING')}")
print(f" Page: {props.get('page_number', 'MISSING')}")
print(f" Field Type: {props.get('type', 'MISSING')}")
# Check bbox_norm structure
bbox_norm = props.get('bbox_norm', {})
if bbox_norm:
print(f" Location: left={bbox_norm.get('left', 0):.1f}%, "
f"top={bbox_norm.get('top', 0):.1f}%, "
f"width={bbox_norm.get('width', 0):.1f}%, "
f"height={bbox_norm.get('height', 0):.1f}%")
# Method 2: Try semantic search for billing information
print("\n--- Search Results: 'claim denial payment adjustment' ---")
res = documents.query.hybrid(
query="claim denial payment adjustment",
alpha=0.5,
limit=3,
include_vector=False,
return_metadata=['score']
)
if res.objects:
print(f"Found {len(res.objects)} matching billing entries:")
for i, obj in enumerate(res.objects, 1):
print(f"\n--- Billing Result {i} ---")
if hasattr(obj, 'properties') and obj.properties:
props = obj.properties
print(f"Text: {props.get('text', 'MISSING')[:200] if props.get('text') else 'MISSING'}...")
print(f"Document Name: {props.get('filename', 'MISSING')}")
print(f"Page: {props.get('page_number', 'MISSING')}")
# Show search score if available
if hasattr(obj, 'metadata') and obj.metadata:
if hasattr(obj.metadata, 'score') and obj.metadata.score is not None:
print(f"Relevance Score: {obj.metadata.score:.4f}")
else:
print("No results found for 'claim denial payment adjustment' search.")
except Exception as e:
print(f"Query error: {e}")
import traceback
traceback.print_exc()
# Clean up connection
client.close()
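Because every stored item carries its bounding boxes, you can highlight a search result on the original page. Here’s a minimal sketch using PyMuPDF (installed in step 0); highlight_result is a hypothetical helper, and it assumes you have the PDF locally and that Cardinal measures coordinates from the top-left of the page, matching PyMuPDF’s convention:
import fitz  # PyMuPDF

def highlight_result(pdf_path: str, props: dict, out_png: str = "highlight.png"):
    """Draw a red box around one result's bbox_pts and save the page as a PNG."""
    doc = fitz.open(pdf_path)
    page = doc[props["page_number"] - 1]  # page_number is 1-based
    bb = props["bbox_pts"]
    rect = fitz.Rect(bb["x"], bb["y"], bb["x"] + bb["w"], bb["y"] + bb["h"])
    page.draw_rect(rect, color=(1, 0, 0), width=2)  # red outline, 2pt
    page.get_pixmap(dpi=150).save(out_png)
    doc.close()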
What You Just Built
Congratulations! You’ve created a searchable revenue cycle management knowledge base from billing PDFs. Your system can now:
- Find specific payment information using natural language search
- Trace results back to exact locations on the original documents
- Handle multiple coordinate systems for different display needs
- Scale to process hundreds of billing documents with the same pipeline