TLDR

This tutorial shows you how to build an AI-powered search system for revenue cycle management workflows. You’ll upload EOBs, billing statements, claims documents, and financial records, extract all the text with precise locations, and create a searchable knowledge base. When you need to resolve billing disputes, track claim statuses, or analyze payment patterns, you can instantly search across all your RCM data to find relevant payment information, denial reasons, procedure codes, and other financial details—complete with citations showing exactly where each piece of information came from in the original documents.

Building Your Financial Knowledge Base: Revenue Cycle Management Edition

Let’s begin with a practical example—processing revenue cycle management documents. This will serve as your proof of concept. At a high level, we’ll use the Cardinal API to extract text along with its bounding boxes from scanned PDFs. Those extracted chunks will then be embedded and stored in Weaviate’s vector database. From there, you can query Weaviate to retrieve the most relevant results for any search.

0) Install dependencies

# Install dependencies
!pip install -q "weaviate-client==4.6.5" python-dotenv boto3 requests tqdm pandas pillow PyMuPDF pdf2image

1) Load environment variables

from google.colab import drive
drive.mount('/content/drive')

import os, dotenv
dotenv.load_dotenv('/content/drive/MyDrive/.env')

# Cardinal
CARDINAL_URL = os.getenv("CARDINAL_URL", "https://api.trycardinal.ai")
CARDINAL_API_KEY = os.getenv("CARDINAL_API_KEY")

# Weaviate
WEAVIATE_URL = os.getenv("WEAVIATE_URL")
WEAVIATE_API_KEY = os.getenv("WEAVIATE_API_KEY")

# OpenAI (for Weaviate to use for vectorizer/generative)
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# S3 (optional - we'll use this for our sample file)
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID") or os.getenv("AWS_KEY")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY") or os.getenv("AWS_SECRET")
AWS_S3_BUCKET = os.getenv("AWS_S3_BUCKET") or os.getenv("AWS_S3_NAME")
AWS_REGION = os.getenv("AWS_REGION", "us-east-1")  # used by the S3 helpers below
S3_PREFIX = os.getenv("S3_PREFIX", "")
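
Optionally, fail fast if a required variable didn’t load. A minimal sanity check over the names defined above:

# Optional sanity check: stop early if a required variable is missing
required = {
    "CARDINAL_API_KEY": CARDINAL_API_KEY,
    "WEAVIATE_URL": WEAVIATE_URL,
    "WEAVIATE_API_KEY": WEAVIATE_API_KEY,
}
missing = [name for name, value in required.items() if not value]
assert not missing, f"Missing environment variables: {missing}"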

2) Connect to Weaviate

import weaviate
import weaviate.classes.config as wc
import os

# Check Weaviate client version
print(f"Weaviate client version: {weaviate.__version__}")

client = weaviate.connect_to_wcs(
    cluster_url=WEAVIATE_URL,
    auth_credentials=weaviate.auth.AuthApiKey(WEAVIATE_API_KEY),
    headers={"X-OpenAI-Api-Key": OPENAI_API_KEY} if OPENAI_API_KEY else {}
)
print("Connected to Weaviate:", client.is_ready())

3) Create a Weaviate collection

We’re calling it “CardinalDemo”:
try:
    client.collections.delete("CardinalDemo")
    print("Deleted existing CardinalDemo collection")
except Exception as e:
    print(f"No existing collection to delete: {e}")

# Create new collection
documents = client.collections.create(
    name="CardinalDemo",
    vectorizer_config=wc.Configure.Vectorizer.text2vec_openai(
        model="text-embedding-3-small",
        type_="text"
    ),
    generative_config=wc.Configure.Generative.openai(
        model="gpt-4"
    ),
    properties=[
        wc.Property(name="text", data_type=wc.DataType.TEXT),
        wc.Property(name="type", data_type=wc.DataType.TEXT),
        wc.Property(name="element_id", data_type=wc.DataType.TEXT, skip_vectorization=True),
        wc.Property(name="page_number", data_type=wc.DataType.INT),
        wc.Property(name="page_width_pts", data_type=wc.DataType.NUMBER, skip_vectorization=True),
        wc.Property(name="page_height_pts", data_type=wc.DataType.NUMBER, skip_vectorization=True),

        # Original bbox (inches) - Cardinal's native format
        wc.Property(name="bbox_in", data_type=wc.DataType.OBJECT, nested_properties=[
            wc.Property(name="min_x", data_type=wc.DataType.NUMBER),
            wc.Property(name="min_y", data_type=wc.DataType.NUMBER),
            wc.Property(name="max_x", data_type=wc.DataType.NUMBER),
            wc.Property(name="max_y", data_type=wc.DataType.NUMBER),
        ], skip_vectorization=True),

        # Derived bbox (points) - standard PDF coordinates
        wc.Property(name="bbox_pts", data_type=wc.DataType.OBJECT, nested_properties=[
            wc.Property(name="x", data_type=wc.DataType.NUMBER),
            wc.Property(name="y", data_type=wc.DataType.NUMBER),
            wc.Property(name="w", data_type=wc.DataType.NUMBER),
            wc.Property(name="h", data_type=wc.DataType.NUMBER),
        ], skip_vectorization=True),

        # Normalized bbox (%) - perfect for web overlays
        wc.Property(name="bbox_norm", data_type=wc.DataType.OBJECT, nested_properties=[
            wc.Property(name="left", data_type=wc.DataType.NUMBER),
            wc.Property(name="top", data_type=wc.DataType.NUMBER),
            wc.Property(name="width", data_type=wc.DataType.NUMBER),
            wc.Property(name="height", data_type=wc.DataType.NUMBER),
        ], skip_vectorization=True),

        # File metadata
        wc.Property(name="filename", data_type=wc.DataType.TEXT),
        wc.Property(name="filetype", data_type=wc.DataType.TEXT, skip_vectorization=True),
        wc.Property(name="languages", data_type=wc.DataType.TEXT_ARRAY, skip_vectorization=True),
        wc.Property(name="source_url", data_type=wc.DataType.TEXT, skip_vectorization=True),
    ],
)

print("Created CardinalDemo collection")

4) Helper functions

import hashlib, json, requests, re
from typing import Dict, Any, List, Optional
from urllib.parse import urlparse, quote
from tqdm import tqdm

INCHES_TO_POINTS = 72.0

def list_s3_urls(bucket_with_prefix: str, limit: Optional[int]=None) -> List[str]:
    """Return a list of s3://bucket/key URLs."""
    if not bucket_with_prefix or not bucket_with_prefix.startswith("s3://"):
        return []

    import boto3
    s3 = boto3.client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        region_name=AWS_REGION
    )
    parsed = urlparse(bucket_with_prefix)
    bucket = parsed.netloc
    prefix = parsed.path.lstrip("/")

    urls = []
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if re.search(r"\.(pdf|png|jpe?g)$", key, re.IGNORECASE):
                urls.append(f"s3://{bucket}/{key}")
                if limit and len(urls) >= limit:
                    return urls
    return urls

def s3_to_https(s3_url: str) -> Optional[str]:
    """Convert s3://bucket/key to public HTTPS URL."""
    if not s3_url.startswith("s3://"):
        return s3_url

    parsed = urlparse(s3_url)
    bucket = parsed.netloc
    key = parsed.path.lstrip("/")
    encoded_key = quote(key, safe='/')

    # Try region-specific format
    return f"https://{bucket}.s3.{AWS_REGION}.amazonaws.com/{encoded_key}"

def process_with_cardinal(file_url: str) -> Dict[str, Any]:
    """Call Cardinal /rag, which will return Markdown content with their bounding boxes."""
    url = f"{CARDINAL_URL.rstrip('/')}/rag"
    form = {
        "fileUrl": file_url
    }
    headers = {"x-api-key": CARDINAL_API_KEY}

    print(f"Processing: {file_url}")
    r = requests.post(url, data=form, headers=headers, timeout=180)
    r.raise_for_status()
    return r.json()
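
# Scanned PDFs can take a while; a light retry wrapper for transient network
# errors (a sketch, not part of Cardinal's API; use it in place of
# process_with_cardinal if you see timeouts):
import time

def process_with_retry(file_url: str, attempts: int = 3) -> Dict[str, Any]:
    """Retry process_with_cardinal with exponential backoff (sketch)."""
    for attempt in range(1, attempts + 1):
        try:
            return process_with_cardinal(file_url)
        except requests.RequestException as e:
            if attempt == attempts:
                raise
            wait = 2 ** attempt
            print(f"Attempt {attempt} failed ({e}); retrying in {wait}s")
            time.sleep(wait)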

def bbox_in_to_pts(bbox_in: Dict[str, float]) -> Dict[str, float]:
    x = bbox_in["min_x"] * INCHES_TO_POINTS
    y = bbox_in["min_y"] * INCHES_TO_POINTS
    w = (bbox_in["max_x"] - bbox_in["min_x"]) * INCHES_TO_POINTS
    h = (bbox_in["max_y"] - bbox_in["min_y"]) * INCHES_TO_POINTS
    return {"x": x, "y": y, "w": w, "h": h}

def bbox_pts_to_norm(bbox_pts: Dict[str, float],
                     page_w_pts: float,
                     page_h_pts: float) -> Dict[str, float]:
    left = 100.0 * bbox_pts["x"] / max(page_w_pts, 1e-6)
    top = 100.0 * bbox_pts["y"] / max(page_h_pts, 1e-6)
    width = 100.0 * bbox_pts["w"] / max(page_w_pts, 1e-6)
    height = 100.0 * bbox_pts["h"] / max(page_h_pts, 1e-6)
    return {"left": left, "top": top, "width": width, "height": height}

def extract_filename_from_url(url: str) -> str:
    """Extract the filename from an s3:// or https:// URL."""
    parsed = urlparse(url)
    filename = os.path.basename(parsed.path)

    if not filename or filename == '/':
        filename = "unknown.pdf"

    return filename

def cardinal_to_weaviate_objects(cardinal_resp: Dict[str, Any],
                                 source_url: str) -> List[Dict[str, Any]]:
    """Convert Cardinal response to Weaviate objects."""
    objs = []
    pages = cardinal_resp.get("pages", []) or []

    for page_num, p in enumerate(pages, start=1):
        page_w_pts = float(p.get("width", 0))
        page_h_pts = float(p.get("height", 0))

        for bb in p.get("bounding_boxes", []) or []:
            bbox_in = bb.get("bounding_box") or {}
            content = (bb.get("content") or "").strip()
            if not content:
                continue

            bbox_pts = bbox_in_to_pts(bbox_in)
            bbox_norm = bbox_pts_to_norm(bbox_pts, page_w_pts, page_h_pts)

            # Create properties dictionary - all field names must match the schema exactly
            props = {
                "text": content,
                "type": "paragraph",
                "element_id": f"{source_url}#p{page_num}:{hash(content) % (10**9)}",
                "page_number": page_num,
                "page_width_pts": page_w_pts,
                "page_height_pts": page_h_pts,
                "bbox_in": {
                    "min_x": float(bbox_in.get("min_x", 0.0)),
                    "min_y": float(bbox_in.get("min_y", 0.0)),
                    "max_x": float(bbox_in.get("max_x", 0.0)),
                    "max_y": float(bbox_in.get("max_y", 0.0)),
                },
                "bbox_pts": {
                    "x": float(bbox_pts["x"]),
                    "y": float(bbox_pts["y"]),
                    "w": float(bbox_pts["w"]),
                    "h": float(bbox_pts["h"])
                },
                "bbox_norm": {
                    "left": float(bbox_norm["left"]),
                    "top": float(bbox_norm["top"]),
                    "width": float(bbox_norm["width"]),
                    "height": float(bbox_norm["height"])
                },
                "filename": extract_filename_from_url(source_url),
                "filetype": "pdf",
                "languages": ["en"],
                "source_url": source_url,
            }

            # Wrap the properties dict; the insert step below unwraps it
            obj = {"properties": props}
            objs.append(obj)

    return objs
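
For reference, the parser above assumes a response shaped roughly like this (an illustrative sketch inferred from the fields the code reads, not Cardinal’s full schema):

# Illustrative shape of a Cardinal /rag response (inferred from the fields read above)
example_resp = {
    "pages": [
        {
            "width": 612.0,   # page width in points
            "height": 792.0,  # page height in points
            "bounding_boxes": [
                {
                    "content": "Claim 12345 denied: CO-97 payment adjusted",
                    "bounding_box": {"min_x": 1.0, "min_y": 2.5,
                                     "max_x": 4.0, "max_y": 2.8},
                }
            ],
        }
    ]
}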

5) Get our sample document URL

Here’s where it gets practical—we’ll start with a billing statement as our sample document. In real workflows, you’d ingest EOBs, claims documents, payment records, and denial letters so you can pull the exact financial details needed to resolve billing disputes or track revenue patterns.
# Our sample file
urls = ["s3://public-cardinal-bucket/billing/Sample_EOB_Statement.pdf"]

print(f"Found {len(urls)} billing documents to process")
if urls:
    print("Sample billing document:", urls[0])

6) Process the billing documents

all_objs = []

for raw_url in tqdm(urls, desc="Processing billing documents"):
    try:
        # Convert S3 URL to HTTPS
        https_url = s3_to_https(raw_url) if raw_url.startswith("s3://") else raw_url
        if not https_url:
            print(f"Skipping invalid URL: {raw_url}")
            continue

        # Process with Cardinal - this is where the magic happens!
        resp = process_with_cardinal(https_url)

        # Debug: Check response structure
        if not resp.get("pages"):
            print(f"Warning: No pages found in response for {raw_url}")
            continue

        # Convert to Weaviate objects
        objects = cardinal_to_weaviate_objects(resp, source_url=raw_url)
        print(f"Extracted {len(objects)} billing items from {extract_filename_from_url(raw_url)}")

        # Debug: Print first object structure
        if objects and len(all_objs) == 0:  # Only print for first file
            print("\nFirst billing item structure:")
            print(json.dumps(objects[0], indent=2)[:500] + "...")

        all_objs.extend(objects)

    except Exception as e:
        print(f"Error processing {raw_url}: {e}")
        continue

print(f"\nTotal billing items to insert: {len(all_objs)}")

7) Store all billing items in Weaviate

inserted = 0

if all_objs:
    try:
        # Use the batch method which is more reliable
        with documents.batch.dynamic() as batch:
            for obj in all_objs:
                # Extract properties from the object
                properties = obj["properties"]
                batch.add_object(properties=properties)

                # Check for errors periodically
                if batch.number_errors > 10:
                    print(f"Stopping due to {batch.number_errors} errors")
                    break

        # Check for failed objects
        if documents.batch.failed_objects:
            print(f"Failed to insert {len(documents.batch.failed_objects)} objects")
            print(f"First failed object: {documents.batch.failed_objects[0]}")
        else:
            print(f"Successfully inserted all {len(all_objs)} billing items!")

    except Exception as e:
        print(f"Batch insert error: {e}")
        print("Trying insert_many method as fallback...")

        # Fallback to insert_many (it expects plain property dicts, so unwrap)
        BATCH_SIZE = 50  # Smaller batch size
        for i in tqdm(range(0, len(all_objs), BATCH_SIZE), desc="Inserting to Weaviate"):
            batch = [o["properties"] for o in all_objs[i:i+BATCH_SIZE]]
            try:
                response = documents.data.insert_many(batch)
                if response.errors:
                    print(f"Errors in batch {i//BATCH_SIZE}: {response.errors}")
                else:
                    inserted += len(batch)
            except Exception as e:
                print(f"Error inserting batch {i//BATCH_SIZE}: {e}")

        print(f"Successfully inserted {inserted} objects using insert_many")
else:
    print("No objects to insert!")

8) Test your RCM knowledge base

Time to see if our billing search actually works!
print("\n=== Testing Your RCM Knowledge Base ===")
try:
    # First, check if there's any data
    count_result = documents.aggregate.over_all(total_count=True)
    print(f"Total billing items in collection: {count_result.total_count}")
    if count_result.total_count == 0:
        print("No billing items found! Check insertion process.")
    else:
        # Method 1: Try a simple fetch first to verify data structure
        print("\n--- Sample billing items ---")
        sample = documents.query.fetch_objects(
            limit=2,
            include_vector=False
        )
        if sample.objects:
            for i, obj in enumerate(sample.objects, 1):
                print(f"\nBilling Item {i}:")
                if hasattr(obj, 'properties') and obj.properties:
                    props = obj.properties
                    print(f"  Text: {props.get('text', 'MISSING')[:100] if props.get('text') else 'MISSING'}...")
                    print(f"  Document Name: {props.get('filename', 'MISSING')}")
                    print(f"  Page: {props.get('page_number', 'MISSING')}")
                    print(f"  Field Type: {props.get('type', 'MISSING')}")
                    # Check bbox_norm structure
                    bbox_norm = props.get('bbox_norm', {})
                    if bbox_norm:
                        print(f"  Location: left={bbox_norm.get('left', 0):.1f}%, "
                              f"top={bbox_norm.get('top', 0):.1f}%, "
                              f"width={bbox_norm.get('width', 0):.1f}%, "
                              f"height={bbox_norm.get('height', 0):.1f}%")

        # Method 2: Try semantic search for billing information
        print("\n--- Search Results: 'claim denial payment adjustment' ---")
        res = documents.query.hybrid(
            query="claim denial payment adjustment",
            alpha=0.5,
            limit=3,
            include_vector=False,
            return_metadata=['score']
        )
        if res.objects:
            print(f"Found {len(res.objects)} matching billing entries:")
            for i, obj in enumerate(res.objects, 1):
                print(f"\n--- Billing Result {i} ---")
                if hasattr(obj, 'properties') and obj.properties:
                    props = obj.properties
                    print(f"Text: {props.get('text', 'MISSING')[:200] if props.get('text') else 'MISSING'}...")
                    print(f"Document Name: {props.get('filename', 'MISSING')}")
                    print(f"Page: {props.get('page_number', 'MISSING')}")
                    # Show search score if available
                    if hasattr(obj, 'metadata') and obj.metadata:
                        if hasattr(obj.metadata, 'score') and obj.metadata.score is not None:
                            print(f"Relevance Score: {obj.metadata.score:.4f}")
        else:
            print("No results found for 'claim denial payment adjustment' search.")

except Exception as e:
    print(f"Query error: {e}")
    import traceback
    traceback.print_exc()

# Clean up connection
client.close()

What You Just Built

Congratulations! You’ve created a searchable revenue cycle management knowledge base from billing PDFs. Your system can now:
  • Find specific payment information using natural language search
  • Trace results back to exact locations on the original documents
  • Handle multiple coordinate systems for different display needs
  • Scale to process hundreds of billing documents with the same pipeline
Your billing statements are just the beginning: imagine having every EOB, denial letter, and payment record searchable in seconds to streamline revenue cycle operations!
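
Here’s a minimal sketch of that scale-up, reusing the helpers above to walk an entire S3 prefix (the bucket path is hypothetical, and you’d re-open the Weaviate client first since we closed it above):

# Hypothetical scale-up: index every billing document under an S3 prefix
urls = list_s3_urls("s3://my-rcm-bucket/billing/", limit=500)

all_objs = []
for raw_url in tqdm(urls, desc="Indexing billing documents"):
    try:
        resp = process_with_cardinal(s3_to_https(raw_url))
        all_objs.extend(cardinal_to_weaviate_objects(resp, source_url=raw_url))
    except Exception as e:
        print(f"Error processing {raw_url}: {e}")

with documents.batch.dynamic() as batch:
    for obj in all_objs:
        batch.add_object(properties=obj["properties"])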