This guide serves as the foundation for a workshop on Retrieval-Augmented Generation (RAG). You will learn how to build a RAG system using cycls, chromadb, and openai, starting with a basic implementation and evolving it into an “Agentic” RAG that handles file uploads and dynamic visualization tools. You will learn how to:
  • Build a Basic RAG system with static documents.
  • Create an Attachment Handler to process user uploads (PDFs, etc.).
  • Implement an Agentic RAG that indexes content dynamically.
  • Add Tooling capabilities to generate interactive charts.

Prerequisites

  • Python 3.8+
  • cycls package installed
  • OpenAI API Key
  • Docker installed (for local testing)
pip install cycls
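Both agents load the OpenAI key from a `.env` file via `python-dotenv`, and the `copy=[".env"]` option ships it into the build. A minimal `.env` looks like this (the value shown is a placeholder — substitute your real key):

```shell
# .env — placeholder value; replace with your actual OpenAI API key
OPENAI_API_KEY=your-openai-api-key-here
```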

Part 1: The Basic RAG

We start by creating a simple RAG system. This agent will have a pre-defined “knowledge base” hardcoded into it. It demonstrates the core loop of RAG: Embed -> Store -> Retrieve -> Generate. Create a file named basicrag.py.
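Before wiring in chromadb and openai, the loop itself can be illustrated with a toy, dependency-free sketch. The "embedding" here is a naive bag-of-words set and retrieval is word overlap — purely for illustration; the real agent below uses OpenAI embeddings and a vector database:

```python
# Toy illustration of the RAG core loop: Embed -> Store -> Retrieve -> Generate.
# The "embedding" is a bag-of-words set; real systems use model embeddings.

def embed(text):
    return set(text.lower().split())

# Store: a tiny in-memory stand-in for a vector database
store = {doc: embed(doc) for doc in ["I love cats", "I love dogs", "The weather is nice"]}

def retrieve(query):
    q = embed(query)
    # Retrieve: pick the stored document with the highest word overlap
    return max(store, key=lambda doc: len(store[doc] & q))

query = "tell me about cats"
context = retrieve(query)
# Generate: the basic agent simply yields the retrieved context
print(f"Context: {context}")
```

The real pipeline swaps `embed` for `text-embedding-3-small` and `store`/`retrieve` for a ChromaDB collection, but the shape of the loop is the same.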

Step 1.1: Setup and Dependencies

We initialize the agent with chromadb (vector database) and openai (embeddings and generation).
import cycls

# Initialize the agent with dependencies
agent = cycls.Agent(
    pip=["chromadb", "openai", "python-dotenv"],
    copy=[".env"]
)

Step 1.2: The Core Logic

The handler function performs the RAG operations. Note that in this basic version, documents are hardcoded.
@agent("chroma-agent", title="RAG Agent")
async def search_agent(context):
    import os
    from dotenv import load_dotenv
    import chromadb
    from chromadb.utils import embedding_functions

    load_dotenv()
    
    # 1. Setup OpenAI Embedding Function
    openai_ef = embedding_functions.OpenAIEmbeddingFunction(
        api_key=os.getenv("OPENAI_API_KEY"),
        model_name="text-embedding-3-small"
    )
    
    # 2. Initialize ChromaDB (Ephemeral in-memory for this demo)
    client = chromadb.Client()
    collection = client.get_or_create_collection(name="docs", embedding_function=openai_ef)

    # 3. Ingest Static Knowledge
    # In a real app, this would come from a database or file loader
    collection.add(
        documents=["I love cats", "I love dogs", "The weather is nice"],
        ids=["1", "2", "3"]
    )

    # 4. Retrieve Context
    query = context.messages[-1]["content"]
    results = collection.query(query_texts=[query], n_results=1)
    retrieved_doc = results['documents'][0][0]

    # 5. Generate Response (Simple yield for demo)
    yield f"Context: {retrieved_doc}"

# Run locally
agent.deploy(prod=False)
Why this is “Basic”:
  • The knowledge base is static and ephemeral (re-created on every run).
  • It cannot handle user files or new data.
  • It only retrieves text and echoes it back; there is no generation step that uses the context to answer the question.

Part 2: Evolving to Agentic RAG

Now we move to agenticrag.py. An “Agentic” RAG doesn’t just look up info; it interacts with the environment. It will:
  1. Read files you upload (PDFs, etc.).
  2. Index them on the fly.
  3. Decide whether to answer with text or generate a visualization (Tool Use).

Helper Module: attach.py

To keep our agent clean, we move complex file handling to attach.py. This module handles:
  • Downloading files from Cycls URLs.
  • Extracting text from PDFs (PyPDF2).
  • Formatting messages for the LLM.
Ensure attach.py is in the same directory.

Step 2.1: Agent Configuration

We need more dependencies now, including PyPDF2 for parsing and httpx for downloading.
agent = cycls.Agent(
    pip=["chromadb", "openai", "python-dotenv", "httpx", "PyPDF2"],
    copy=[".env", "attach.py"]  # ship the helper module into the build alongside .env
)

Step 2.2: Dynamic Indexing

Instead of hardcoded strings, we process the incoming message to find file content.
# Inside agenticrag.py handler...

# Process Attachments using our helper
processed = await attach.process_messages_for_openai([context.messages[-1]])
last_msg_content = processed[0]['content']
full_text = last_msg_content if isinstance(last_msg_content, str) else last_msg_content[0]['text']

# Regex to find file content blocks formatted by attach.py
new_docs = re.findall(r'--- Content of .*? ---\n(.*?)\n--- End of file ---', full_text, re.DOTALL)

# Index new files immediately
if new_docs:
    try:
        collection.add(documents=new_docs, ids=[f"doc_{hash(d)}" for d in new_docs])
    except Exception:
        pass  # duplicate IDs (e.g. a re-uploaded file) are safe to ignore
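The marker format that attach.py wraps file text in can be checked in isolation. Here is the same regex applied to a hand-constructed sample (the message and file contents are invented for illustration):

```python
import re

# Sample of the block format attach.py emits around extracted file text
full_text = (
    "Summarize this report.\n\n"
    "--- Content of report.pdf ---\n"
    "Q1 revenue grew 12% year over year.\n"
    "--- End of file ---\n"
)

# Same pattern as in the handler: capture each file body between the markers
new_docs = re.findall(r'--- Content of .*? ---\n(.*?)\n--- End of file ---', full_text, re.DOTALL)
print(new_docs)  # one extracted document body per attached file
```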

Step 2.3: Tool Use (Chart Generation)

This is what makes it “Agentic”. The handler inspects the user query with a simple keyword check: if it mentions a “chart” or “plot”, it switches to a chart-generation path that returns interactive HTML instead of plain text.
# Define the tool
async def generate_chart(description: str, data: str) -> str:
    # ... (prompt engineering to generate ApexCharts HTML) ...
    # See agenticrag.py below for the full implementation
    ...

# Decision Logic
if any(w in query.lower() for w in ["chart", "plot", "graph", "visualize"]):
    yield "Generating visualization..."
    chart_html = await generate_chart(query, retrieved_context)
    yield f'<iframe srcdoc="{chart_html}" ... ></iframe>'
else:
    # Standard RAG text response
    ...  # see agenticrag.py below for the full text-generation path
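Because the chart is embedded via an `<iframe srcdoc="...">`, the generated HTML must be whitespace-compressed and have its double quotes escaped before it can sit inside the attribute. This is the same transformation `generate_chart` applies at the end (the sample HTML is a trivial stand-in):

```python
# Stand-in for LLM-generated chart HTML
html = '<!DOCTYPE html>\n<html>\n  <body>\n    <div id="chart"></div>\n  </body>\n</html>'

# Collapse all whitespace runs and escape double quotes so the markup
# can be placed inside the srcdoc="..." attribute of an <iframe>
compressed = " ".join(html.split()).replace('"', "&quot;")
print(compressed)
```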

Key Differences Summary

Feature         | Basic RAG (basicrag.py)              | Agentic RAG (agenticrag.py)
----------------|--------------------------------------|--------------------------------------------------
Knowledge Base  | Hardcoded, static strings.           | Dynamic, built from user uploads.
Input Handling  | Text only.                           | Text + files (PDF, etc.) via attach.py.
Reasoning       | Linear: Query -> Retrieve -> Answer. | Branching: Check intent -> (Visualize OR Answer).
Output          | Plain text.                          | Text or interactive HTML widgets.

Full Code Reference

1. Basic RAG (basicrag.py)

import cycls

# Initialize the agent with dependencies
agent = cycls.Agent(
    pip=["chromadb", "openai", "python-dotenv"],
    copy=[".env"]
)

@agent("chroma-agent", title="RAG Agent")
async def search_agent(context):
    import os
    from dotenv import load_dotenv
    import chromadb
    from chromadb.utils import embedding_functions

    load_dotenv()
    
    # Setup OpenAI Embedding Function
    openai_ef = embedding_functions.OpenAIEmbeddingFunction(
        api_key=os.getenv("OPENAI_API_KEY"),
        model_name="text-embedding-3-small"
    )
    
    # Initialize ChromaDB client
    client = chromadb.Client()
    
    # Create collection with the embedding function
    collection = client.get_or_create_collection(
        name="docs", 
        embedding_function=openai_ef
    )

    # Add documents to the collection
    collection.add(
        documents=["I love cats", "I love dogs", "The weather is nice"],
        ids=["1", "2", "3"]
    )

    # Query using the latest message
    query = context.messages[-1]["content"]
    results = collection.query(query_texts=[query], n_results=1)

    # Return retrieved context
    retrieved_doc = results['documents'][0][0]
    yield f"Context: {retrieved_doc}"

# Run locally
agent.deploy(prod=False)

2. Agentic RAG (agenticrag.py)

import cycls

# Initialize agent with dependencies for RAG + Attachments + Charts
agent = cycls.Agent(
    pip=["chromadb", "openai", "python-dotenv", "httpx", "PyPDF2"],
    copy=[".env", "attach.py"]
)

@agent("agentic-rag", title="Agentic RAG")
async def chat(context):
    import os
    import re
    from dotenv import load_dotenv
    import chromadb
    from chromadb.utils import embedding_functions
    from openai import OpenAI, AsyncOpenAI
    import attach 

    load_dotenv()
    
    # 1. Setup ChromaDB
    print("\n[DEBUG] 1. Setting up ChromaDB client and embedding function...")
    openai_ef = embedding_functions.OpenAIEmbeddingFunction(
        api_key=os.getenv("OPENAI_API_KEY"),
        model_name="text-embedding-3-small"
    )
    client = chromadb.Client()
    collection = client.get_or_create_collection(name="docs", embedding_function=openai_ef)
    print("[DEBUG] ChromaDB collection 'docs' ready.")

    # 2. Process Attachments
    print("[DEBUG] 2. Processing message for attachments...")
    processed = await attach.process_messages_for_openai([context.messages[-1]], debug=True)
    last_msg_content = processed[0]['content']
    full_text = last_msg_content if isinstance(last_msg_content, str) else last_msg_content[0]['text']
    
    # Index new files
    new_docs = re.findall(r'--- Content of .*? ---\n(.*?)\n--- End of file ---', full_text, re.DOTALL)
    if new_docs:
        print(f"[DEBUG] Found {len(new_docs)} new document sections to index.")
        try:
            collection.add(documents=new_docs, ids=[f"doc_{hash(d)}" for d in new_docs])
            print(f"[DEBUG] Successfully indexed {len(new_docs)} documents in ChromaDB.")
        except Exception as e:
            print(f"[DEBUG] Error indexing documents: {e}")
    else:
        print("[DEBUG] No new documents found to index.")

    # 3. Identify User Query
    original_content = context.messages[-1].get('content', '')
    query = ""
    if isinstance(original_content, str):
        query = original_content
    elif isinstance(original_content, list):
        query = " ".join([p['text'] for p in original_content if p.get('type') == 'text'])
    
    if not query.strip():
        query = "Summarize the uploaded document."
    
    print(f"[DEBUG] 3. User Query identified: '{query}'")

    # 4. RAG Retrieval
    print(f"[DEBUG] 4. Querying ChromaDB for top 1 result...")
    results = collection.query(query_texts=[query], n_results=1)
    retrieved_context = results['documents'][0][0] if results['documents'][0] else "No context found."
    print(f"[DEBUG] Retrieved Context length: {len(retrieved_context)} chars")
    print(f"[DEBUG] Context Preview: {retrieved_context[:100]}...")

    # --- Chart Generation Tool ---
    async def generate_chart(description: str, data: str) -> str:
        """Generates an HTML chart using ApexCharts via LLM."""
        vis_prompt = """
        You are a data visualization expert. Generate a single HTML file containing an interactive ApexCharts visualization.
        
        REQUIREMENTS:
        - Use the 'ApexCharts' library from CDN: <script src="https://cdn.jsdelivr.net/npm/apexcharts"></script>
        - The container div MUST have id="chart".
        - Initialize the chart in a <script> tag at the end of the body.
        - Use a modern, clean color palette (e.g., blues, teals, grays).
        - Ensure the chart is responsive (width: '100%').
        - Add tooltips and clear axis labels.
        - Title should be centered and styled appropriately.
        - Font family should be sans-serif (e.g., Inter, Roboto, Arial).
        
        Output ONLY the raw HTML code starting with <!DOCTYPE html>. Do not include markdown blocks (```html).
        Ensure the chart is beautiful, professional, and uses the provided data accurately.
        """
        llm_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")) # Re-using OpenAI for simplicity
        response = await llm_client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": vis_prompt},
                {"role": "user", "content": f"Description: {description}\nData: {data}"}
            ]
        )
        html = response.choices[0].message.content.strip()
        # Cleanup markdown if present
        if html.startswith("```"): html = html.split("\n", 1)[1].rsplit("\n", 1)[0]
        # Compress and escape for iframe
        compressed = " ".join(html.split()).replace('"', "&quot;")
        return compressed

    # 5. Decide: Chart or Text?
    if any(w in query.lower() for w in ["chart", "plot", "graph", "visualize"]):
        print("[DEBUG] 5. Decision: Generating visualization (Tool Use)")
        yield "Generating visualization..."
        chart_html = await generate_chart(query, retrieved_context)
        yield f'<iframe srcdoc="{chart_html}" width="100%" height="600" style="border:none; border-radius: 8px; box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1); width: 100%; min-width: 600px;"></iframe>'
    else:
        print("[DEBUG] 5. Decision: Generating text response")
        # 6. Generate Text Answer
        llm = OpenAI()
        response = llm.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": "You are a helpful RAG assistant. Answer using the provided context."},
                {"role": "user", "content": f"Context: {retrieved_context}\n\nQuestion: {query}"}
            ]
        )
        yield response.choices[0].message.content

# Run
agent.deploy(prod=False)

3. Attachment Handler (attach.py)

"""
Cycls Attachment Handler Module
================================
Handles attached files (PDF, Images, Text, CSV) and converts them to OpenAI format.

Usage:
---------
from attach import process_messages_for_openai, has_visual_content

async def llm(context):
    processed_messages = await process_messages_for_openai(context.messages)
    # ... use processed_messages with OpenAI
"""

import os
import re
import base64
import httpx


def extract_file_info(content_item):
    """Extract file URL and type from cycls file content item"""
    if content_item.get('type') != 'file':
        return None
    
    text = content_item.get('text', '')
    
    # Extract URL using regex
    url_match = re.search(r'File URL: (https?://[^\s\n]+)', text)
    type_match = re.search(r'File type: ([^\s\n]+)', text)
    name_match = re.search(r'\[File attached: ([^\]]+)\]', text)
    
    if url_match:
        return {
            'url': url_match.group(1),
            'type': type_match.group(1) if type_match else 'unknown',
            'name': name_match.group(1) if name_match else 'file'
        }
    return None


async def download_file(url, timeout=60.0):
    """Download file from URL and return bytes"""
    try:
        async with httpx.AsyncClient(timeout=timeout) as client:
            response = await client.get(url, follow_redirects=True)
            if response.status_code == 200:
                return response.content
            print(f"[attach] Download failed with status {response.status_code}")
    except Exception as e:
        print(f"[attach] Download error: {e}")
    return None


def extract_pdf_text(pdf_bytes):
    """Extract text from PDF bytes"""
    try:
        import PyPDF2  # type: ignore[import-untyped]
        import io
        
        pdf_file = io.BytesIO(pdf_bytes)
        pdf_reader = PyPDF2.PdfReader(pdf_file)
        
        text = ""
        for page in pdf_reader.pages:
            text += page.extract_text() + "\n"
        
        return text.strip()
    except Exception as e:
        print(f"[attach] PDF extraction error: {e}")
        return None


def get_mime_type(url_or_filename):
    """Detect mime type from URL or filename"""
    lower = url_or_filename.lower()
    if '.jpg' in lower or '.jpeg' in lower:
        return 'image/jpeg'
    elif '.gif' in lower:
        return 'image/gif'
    elif '.webp' in lower:
        return 'image/webp'
    elif '.png' in lower:
        return 'image/png'
    elif '.svg' in lower:
        return 'image/svg+xml'
    return 'image/png'  # default


async def process_messages_for_openai(messages, debug=False):
    """
    Process cycls messages and handle file attachments.
    
    Args:
        messages: List of message dicts from context.messages
        debug: If True, print debug information
    
    Returns:
        List of processed messages ready for OpenAI API
    """
    processed_messages = []
    
    for msg in messages:
        role = msg.get('role', 'user')
        content = msg.get('content', '')
        
        # If content is a string, keep it simple
        if isinstance(content, str):
            if content:  # Skip empty messages
                processed_messages.append({'role': role, 'content': content})
            continue
        
        # If content is a list, process each item
        if isinstance(content, list):
            new_content = []
            file_texts = []
            
            for item in content:
                item_type = item.get('type', '')
                
                if item_type == 'text':
                    text = item.get('text', '')
                    if text:
                        new_content.append({'type': 'text', 'text': text})
                
                elif item_type == 'image_url':
                    # Download image and convert to base64
                    image_url = item.get('image_url', {}).get('url', '')
                    
                    if image_url.startswith('data:'):
                        # Already base64, use as is
                        new_content.append(item)
                    elif image_url.startswith('http'):
                        if debug:
                            print(f"[attach] Downloading image...")
                        image_bytes = await download_file(image_url)
                        if image_bytes:
                            b64_data = base64.b64encode(image_bytes).decode('utf-8')
                            mime_type = get_mime_type(image_url)
                            new_content.append({
                                'type': 'image_url',
                                'image_url': {'url': f"data:{mime_type};base64,{b64_data}"}
                            })
                            if debug:
                                print(f"[attach] Image converted to base64 ({mime_type})")
                        else:
                            new_content.append({'type': 'text', 'text': '[Image could not be loaded]'})
                    else:
                        new_content.append(item)
                
                elif item_type == 'file':
                    # Extract file info and process
                    file_info = extract_file_info(item)
                    if file_info:
                        if debug:
                            print(f"[attach] Processing file: {file_info['name']} ({file_info['type']})")
                        
                        # Download the file
                        file_bytes = await download_file(file_info['url'])
                        
                        if file_bytes:
                            file_type = file_info['type'].lower()
                            
                            if 'pdf' in file_type:
                                # Extract text from PDF
                                pdf_text = extract_pdf_text(file_bytes)
                                if pdf_text:
                                    file_texts.append(f"\n\n--- Content of {file_info['name']} ---\n{pdf_text}\n--- End of file ---\n")
                                    if debug:
                                        print(f"[attach] Extracted {len(pdf_text)} chars from PDF")
                                else:
                                    file_texts.append(f"\n\n[Could not extract text from {file_info['name']}]\n")
                            
                            elif 'image' in file_type:
                                # Convert image to base64
                                b64_data = base64.b64encode(file_bytes).decode('utf-8')
                                mime_type = file_info['type']
                                new_content.append({
                                    'type': 'image_url',
                                    'image_url': {'url': f"data:{mime_type};base64,{b64_data}"}
                                })
                            
                            else:
                                # Try to read as text (txt, csv, json, etc.)
                                try:
                                    text_content = file_bytes.decode('utf-8')
                                    file_texts.append(f"\n\n--- Content of {file_info['name']} ---\n{text_content}\n--- End of file ---\n")
                                except Exception:
                                    file_texts.append(f"\n\n[Could not read {file_info['name']}]\n")
            
            # Combine text content with file texts
            if file_texts:
                combined_text = ""
                for item in new_content:
                    if item.get('type') == 'text':
                        combined_text += item.get('text', '') + "\n"
                
                combined_text += "".join(file_texts)
                
                # If there are images, keep them and add combined text
                image_items = [item for item in new_content if item.get('type') == 'image_url']
                if image_items:
                    final_content = [{'type': 'text', 'text': combined_text.strip()}] + image_items
                    processed_messages.append({'role': role, 'content': final_content})
                else:
                    processed_messages.append({'role': role, 'content': combined_text.strip()})
            elif new_content:
                # No files, just regular content
                if len(new_content) == 1 and new_content[0].get('type') == 'text':
                    processed_messages.append({'role': role, 'content': new_content[0]['text']})
                else:
                    processed_messages.append({'role': role, 'content': new_content})
    
    return processed_messages


def has_visual_content(messages):
    """Check if processed messages contain images (need gpt-4o)"""
    return any(
        isinstance(msg.get('content'), list) and 
        any(item.get('type') == 'image_url' for item in msg.get('content', []))
        for msg in messages
    )