This tutorial walks through building a Retrieval-Augmented Generation (RAG) system with cycls, chromadb, and openai, starting with a basic implementation and evolving it into an “Agentic” RAG that handles file uploads and dynamic visualization tools.
You will learn how to:
- Build a Basic RAG system with static documents.
- Create an Attachment Handler to process user uploads (PDFs, etc.).
- Implement an Agentic RAG that indexes content dynamically.
- Add Tooling capabilities to generate interactive charts.
Prerequisites
- Python 3.8+
- cycls package installed
- OpenAI API Key
- Docker installed (for local testing)
pip install cycls
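The examples below load credentials with python-dotenv and ship the file via copy=[".env"], so they assume a .env file next to your script. A minimal sketch (the key value is a placeholder, not a real key):

```shell
# .env — read by load_dotenv() at startup
OPENAI_API_KEY=sk-your-key-here
```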
Part 1: The Basic RAG
We start by creating a simple RAG system. This agent has a pre-defined “knowledge base” hardcoded into it, and it demonstrates the core loop of RAG: Embed -> Store -> Retrieve -> Generate. Create a file named basicrag.py.
Step 1.1: Setup and Dependencies
We initialize the agent with chromadb (the vector database) and openai (embeddings and generation).
import cycls
# Initialize the agent with dependencies
agent = cycls.Agent(
pip=["chromadb", "openai", "python-dotenv"],
copy=[".env"]
)
Step 1.2: The Core Logic
The handler function performs the RAG operations. Note that in this basic version, the documents are hardcoded.
@agent("chroma-agent", title="RAG Agent")
async def search_agent(context):
import os
from dotenv import load_dotenv
import chromadb
from chromadb.utils import embedding_functions
load_dotenv()
# 1. Setup OpenAI Embedding Function
openai_ef = embedding_functions.OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"),
model_name="text-embedding-3-small"
)
# 2. Initialize ChromaDB (Ephemeral in-memory for this demo)
client = chromadb.Client()
collection = client.get_or_create_collection(name="docs", embedding_function=openai_ef)
# 3. Ingest Static Knowledge
# In a real app, this would come from a database or file loader
collection.add(
documents=["I love cats", "I love dogs", "The weather is nice"],
ids=["1", "2", "3"]
)
# 4. Retrieve Context
query = context.messages[-1]["content"]
results = collection.query(query_texts=[query], n_results=1)
retrieved_doc = results['documents'][0][0]
# 5. Generate Response (Simple yield for demo)
yield f"Context: {retrieved_doc}"
# Run locally
agent.deploy(prod=False)
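The double indexing in `results['documents'][0][0]` reflects Chroma's return shape: results are nested first per query text, then per match. A plain-dict sketch of that shape (a stand-in for illustration, not a real Chroma call; the distance value is made up):

```python
# Shape of collection.query(...) results: one inner list per query text.
results = {
    "ids": [["2"]],                  # results["ids"][q][k] -> k-th match for query q
    "documents": [["I love dogs"]],
    "distances": [[0.31]],           # illustrative distance value
}

# With a single query and n_results=1, the top match is:
retrieved_doc = results["documents"][0][0]
print(retrieved_doc)  # -> "I love dogs"
```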
This basic version has clear limitations:
- The knowledge base is static and ephemeral (re-created on every run).
- It cannot handle user files or new data.
- It strictly retrieves text; it doesn’t do anything with it other than display it.
Part 2: Evolving to Agentic RAG
Now we move to agenticrag.py. An “Agentic” RAG doesn’t just look up info; it interacts with its environment. It will:
- Read files you upload (PDFs, etc.).
- Index them on the fly.
- Decide whether to answer with text or generate a visualization (Tool Use).
Helper Module: attach.py
To keep our agent clean, we move complex file handling to attach.py. This module handles:
- Downloading files from Cycls URLs.
- Extracting text from PDFs (PyPDF2).
- Formatting messages for the LLM.
attach.py lives in the same directory as agenticrag.py; its full listing appears in the Full Code Reference section.
Step 2.1: Agent Configuration
We need more dependencies now, including PyPDF2 for parsing and httpx for downloading.
agent = cycls.Agent(
pip=["chromadb", "openai", "python-dotenv", "httpx", "PyPDF2"],
copy=[".env", "attach.py"]  # ship the helper module alongside the agent
)
Step 2.2: Dynamic Indexing
Instead of hardcoded strings, we process the incoming message to find file content.
# Inside the agenticrag.py handler...
# Process Attachments using our helper
processed = await attach.process_messages_for_openai([context.messages[-1]])
last_msg_content = processed[0]['content']
full_text = last_msg_content if isinstance(last_msg_content, str) else last_msg_content[0]['text']
# Regex to find file content blocks formatted by attach.py
new_docs = re.findall(r'--- Content of .*? ---\n(.*?)\n--- End of file ---', full_text, re.DOTALL)
# Index new files immediately
if new_docs:
try:
collection.add(documents=new_docs, ids=[f"doc_{hash(d)}" for d in new_docs])
    except Exception: pass  # duplicate IDs from re-uploads are safe to skip here
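To see what the regex actually captures, here is a self-contained run against a string in the same `--- Content of ... ---` block format that attach.py emits (the sample text is made up):

```python
import re

# Sample message text containing one file block in attach.py's format
full_text = (
    "Please summarize this.\n\n"
    "--- Content of report.pdf ---\n"
    "Q1 revenue grew 12% year over year.\n"
    "--- End of file ---\n"
)

# Same pattern as in the handler: capture everything between the markers
new_docs = re.findall(
    r'--- Content of .*? ---\n(.*?)\n--- End of file ---',
    full_text,
    re.DOTALL,
)
print(new_docs)  # -> ['Q1 revenue grew 12% year over year.']
```

One caveat with the `doc_{hash(d)}` IDs: Python's `hash()` for strings changes between processes (hash randomization), which is fine for this ephemeral in-memory collection but would break deduplication in a persistent store; `hashlib.sha256(d.encode()).hexdigest()` is a stable alternative.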
Step 2.3: Tool Use (Chart Generation)
This is what makes it “Agentic”: the model evaluates the user query, and if the user asks for a “chart” or “plot”, it switches logic paths and generates HTML instead of plain text.
# Define the tool
async def generate_chart(description: str, data: str) -> str:
# ... (Prompt engineering to generate ApexCharts HTML) ...
# See agenticrag.py for full implementation
# Decision Logic
if any(w in query.lower() for w in ["chart", "plot", "graph", "visualize"]):
yield "Generating visualization..."
chart_html = await generate_chart(query, retrieved_context)
yield f'<iframe srcdoc="{chart_html}" ... ></iframe>'
else:
# Standard RAG Text Response
# ...
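The keyword check in the decision logic can live in a small helper, which keeps the branch readable and easy to extend (the function name and keyword tuple here are our own, not part of any library):

```python
# Keywords that signal the user wants a visualization rather than text
CHART_KEYWORDS = ("chart", "plot", "graph", "visualize")

def wants_chart(query: str) -> bool:
    """Return True when the user's query asks for a visualization."""
    q = query.lower()
    return any(word in q for word in CHART_KEYWORDS)

print(wants_chart("Plot revenue by quarter"))  # -> True
print(wants_chart("What was Q1 revenue?"))     # -> False
```

A keyword check is a deliberately simple router; a production agent would typically let the LLM choose via function calling, but the branch structure stays the same.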
Key Differences Summary
| Feature | Basic RAG (basicrag.py) | Agentic RAG (agenticrag.py) |
|---|---|---|
| Knowledge Base | Hardcoded, static strings. | Dynamic, built from user uploads. |
| Input Handling | Text only. | Text + Files (PDF, etc.) via attach.py. |
| Reasoning | Linear: Query -> Retrieve -> Answer. | Branching: Check intent -> (Visualize OR Answer). |
| Output | Plain Text. | Text or Interactive HTML Widgets. |
Full Code Reference
1. Basic RAG (basicrag.py)
import cycls
# Initialize the agent with dependencies
agent = cycls.Agent(
pip=["chromadb", "openai", "python-dotenv"],
copy=[".env"]
)
@agent("chroma-agent", title="RAG Agent")
async def search_agent(context):
import os
from dotenv import load_dotenv
import chromadb
from chromadb.utils import embedding_functions
load_dotenv()
# Setup OpenAI Embedding Function
openai_ef = embedding_functions.OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"),
model_name="text-embedding-3-small"
)
# Initialize ChromaDB client
client = chromadb.Client()
# Create collection with the embedding function
collection = client.get_or_create_collection(
name="docs",
embedding_function=openai_ef
)
# Add documents to the collection
collection.add(
documents=["I love cats", "I love dogs", "The weather is nice"],
ids=["1", "2", "3"]
)
# Query using the latest message
query = context.messages[-1]["content"]
results = collection.query(query_texts=[query], n_results=1)
# Return retrieved context
retrieved_doc = results['documents'][0][0]
yield f"Context: {retrieved_doc}"
# Run locally
agent.deploy(prod=False)
2. Agentic RAG (agenticrag.py)
import cycls
# Initialize agent with dependencies for RAG + Attachments + Charts
agent = cycls.Agent(
pip=["chromadb", "openai", "python-dotenv", "httpx", "PyPDF2"],
copy=[".env", "attach.py"]
)
@agent("agentic-rag", title="Agentic RAG")
async def chat(context):
import os
import re
from dotenv import load_dotenv
import chromadb
from chromadb.utils import embedding_functions
from openai import OpenAI, AsyncOpenAI
import attach
load_dotenv()
# 1. Setup ChromaDB
print("\n[DEBUG] 1. Setting up ChromaDB client and embedding function...")
openai_ef = embedding_functions.OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"),
model_name="text-embedding-3-small"
)
client = chromadb.Client()
collection = client.get_or_create_collection(name="docs", embedding_function=openai_ef)
print("[DEBUG] ChromaDB collection 'docs' ready.")
# 2. Process Attachments
print("[DEBUG] 2. Processing message for attachments...")
processed = await attach.process_messages_for_openai([context.messages[-1]], debug=True)
last_msg_content = processed[0]['content']
full_text = last_msg_content if isinstance(last_msg_content, str) else last_msg_content[0]['text']
# Index new files
new_docs = re.findall(r'--- Content of .*? ---\n(.*?)\n--- End of file ---', full_text, re.DOTALL)
if new_docs:
print(f"[DEBUG] Found {len(new_docs)} new document sections to index.")
try:
collection.add(documents=new_docs, ids=[f"doc_{hash(d)}" for d in new_docs])
print(f"[DEBUG] Successfully indexed {len(new_docs)} documents in ChromaDB.")
except Exception as e:
print(f"[DEBUG] Error indexing documents: {e}")
else:
print("[DEBUG] No new documents found to index.")
# 3. Identify User Query
original_content = context.messages[-1].get('content', '')
query = ""
if isinstance(original_content, str):
query = original_content
elif isinstance(original_content, list):
query = " ".join([p['text'] for p in original_content if p.get('type') == 'text'])
if not query.strip():
query = "Summarize the uploaded document."
print(f"[DEBUG] 3. User Query identified: '{query}'")
# 4. RAG Retrieval
print(f"[DEBUG] 4. Querying ChromaDB for top 1 result...")
results = collection.query(query_texts=[query], n_results=1)
retrieved_context = results['documents'][0][0] if results['documents'][0] else "No context found."
print(f"[DEBUG] Retrieved Context length: {len(retrieved_context)} chars")
print(f"[DEBUG] Context Preview: {retrieved_context[:100]}...")
# --- Chart Generation Tool ---
async def generate_chart(description: str, data: str) -> str:
"""Generates an HTML chart using ApexCharts via LLM."""
vis_prompt = """
You are a data visualization expert. Generate a single HTML file containing an interactive ApexCharts visualization.
REQUIREMENTS:
- Use the 'ApexCharts' library from CDN: <script src="https://cdn.jsdelivr.net/npm/apexcharts"></script>
- The container div MUST have id="chart".
- Initialize the chart in a <script> tag at the end of the body.
- Use a modern, clean color palette (e.g., blues, teals, grays).
- Ensure the chart is responsive (width: '100%').
- Add tooltips and clear axis labels.
- Title should be centered and styled appropriately.
- Font family should be sans-serif (e.g., Inter, Roboto, Arial).
Output ONLY the raw HTML code starting with <!DOCTYPE html>. Do not include markdown blocks (```html).
Ensure the chart is beautiful, professional, and uses the provided data accurately.
"""
llm_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")) # Re-using OpenAI for simplicity
response = await llm_client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": vis_prompt},
{"role": "user", "content": f"Description: {description}\nData: {data}"}
]
)
html = response.choices[0].message.content.strip()
# Cleanup markdown if present
if html.startswith("```"): html = html.split("\n", 1)[1].rsplit("\n", 1)[0]
# Compress and escape for iframe
        compressed = " ".join(html.split()).replace('"', "&quot;")
return compressed
# 5. Decide: Chart or Text?
if any(w in query.lower() for w in ["chart", "plot", "graph", "visualize"]):
print("[DEBUG] 5. Decision: Generating visualization (Tool Use)")
yield "Generating visualization..."
chart_html = await generate_chart(query, retrieved_context)
yield f'<iframe srcdoc="{chart_html}" width="100%" height="600" style="border:none; border-radius: 8px; box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1); width: 100%; min-width: 600px;"></iframe>'
else:
print("[DEBUG] 5. Decision: Generating text response")
# 6. Generate Text Answer
llm = OpenAI()
response = llm.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "You are a helpful RAG assistant. Answer using the provided context."},
{"role": "user", "content": f"Context: {retrieved_context}\n\nQuestion: {query}"}
]
)
yield response.choices[0].message.content
# Run
agent.deploy(prod=False)
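One fragile spot in the listing above is escaping the generated HTML for the iframe's srcdoc attribute: the manual replace only handles double quotes. The stdlib html.escape covers ampersands and quotes as well, and entities are decoded back when the browser parses the attribute. A sketch, independent of the agent:

```python
import html

# Generated chart document (illustrative content)
chart_html = '<!DOCTYPE html><div id="chart">A & B</div>'

# Escape &, <, >, and quotes so the document can sit inside srcdoc="..."
escaped = html.escape(chart_html, quote=True)
iframe = f'<iframe srcdoc="{escaped}"></iframe>'
print(iframe)
```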
3. Attachment Handler (attach.py)
"""
Cycls Attachment Handler Module
================================
Handles attached files (PDF, Images, Text, CSV) and converts them to OpenAI format.
Usage:
---------
from attach import process_messages_for_openai, has_visual_content
async def llm(context):
processed_messages = await process_messages_for_openai(context.messages)
# ... use processed_messages with OpenAI
"""
import os
import re
import base64
import httpx
def extract_file_info(content_item):
"""Extract file URL and type from cycls file content item"""
if content_item.get('type') != 'file':
return None
text = content_item.get('text', '')
# Extract URL using regex
url_match = re.search(r'File URL: (https?://[^\s\n]+)', text)
type_match = re.search(r'File type: ([^\s\n]+)', text)
name_match = re.search(r'\[File attached: ([^\]]+)\]', text)
if url_match:
return {
'url': url_match.group(1),
'type': type_match.group(1) if type_match else 'unknown',
'name': name_match.group(1) if name_match else 'file'
}
return None
async def download_file(url, timeout=60.0):
"""Download file from URL and return bytes"""
try:
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.get(url, follow_redirects=True)
if response.status_code == 200:
return response.content
print(f"[attach] Download failed with status {response.status_code}")
except Exception as e:
print(f"[attach] Download error: {e}")
return None
def extract_pdf_text(pdf_bytes):
"""Extract text from PDF bytes"""
try:
import PyPDF2 # type: ignore[import-untyped]
import io
pdf_file = io.BytesIO(pdf_bytes)
pdf_reader = PyPDF2.PdfReader(pdf_file)
text = ""
for page in pdf_reader.pages:
text += page.extract_text() + "\n"
return text.strip()
except Exception as e:
print(f"[attach] PDF extraction error: {e}")
return None
def get_mime_type(url_or_filename):
"""Detect mime type from URL or filename"""
lower = url_or_filename.lower()
if '.jpg' in lower or '.jpeg' in lower:
return 'image/jpeg'
elif '.gif' in lower:
return 'image/gif'
elif '.webp' in lower:
return 'image/webp'
elif '.png' in lower:
return 'image/png'
elif '.svg' in lower:
return 'image/svg+xml'
return 'image/png' # default
async def process_messages_for_openai(messages, debug=False):
"""
Process cycls messages and handle file attachments.
Args:
messages: List of message dicts from context.messages
debug: If True, print debug information
Returns:
List of processed messages ready for OpenAI API
"""
processed_messages = []
for msg in messages:
role = msg.get('role', 'user')
content = msg.get('content', '')
# If content is a string, keep it simple
if isinstance(content, str):
if content: # Skip empty messages
processed_messages.append({'role': role, 'content': content})
continue
# If content is a list, process each item
if isinstance(content, list):
new_content = []
file_texts = []
for item in content:
item_type = item.get('type', '')
if item_type == 'text':
text = item.get('text', '')
if text:
new_content.append({'type': 'text', 'text': text})
elif item_type == 'image_url':
# Download image and convert to base64
image_url = item.get('image_url', {}).get('url', '')
if image_url.startswith('data:'):
# Already base64, use as is
new_content.append(item)
elif image_url.startswith('http'):
if debug:
print(f"[attach] Downloading image...")
image_bytes = await download_file(image_url)
if image_bytes:
b64_data = base64.b64encode(image_bytes).decode('utf-8')
mime_type = get_mime_type(image_url)
new_content.append({
'type': 'image_url',
'image_url': {'url': f"data:{mime_type};base64,{b64_data}"}
})
if debug:
print(f"[attach] Image converted to base64 ({mime_type})")
else:
new_content.append({'type': 'text', 'text': '[Image could not be loaded]'})
else:
new_content.append(item)
elif item_type == 'file':
# Extract file info and process
file_info = extract_file_info(item)
if file_info:
if debug:
print(f"[attach] Processing file: {file_info['name']} ({file_info['type']})")
# Download the file
file_bytes = await download_file(file_info['url'])
if file_bytes:
file_type = file_info['type'].lower()
if 'pdf' in file_type:
# Extract text from PDF
pdf_text = extract_pdf_text(file_bytes)
if pdf_text:
file_texts.append(f"\n\n--- Content of {file_info['name']} ---\n{pdf_text}\n--- End of file ---\n")
if debug:
print(f"[attach] Extracted {len(pdf_text)} chars from PDF")
else:
file_texts.append(f"\n\n[Could not extract text from {file_info['name']}]\n")
elif 'image' in file_type:
# Convert image to base64
b64_data = base64.b64encode(file_bytes).decode('utf-8')
mime_type = file_info['type']
new_content.append({
'type': 'image_url',
'image_url': {'url': f"data:{mime_type};base64,{b64_data}"}
})
else:
# Try to read as text (txt, csv, json, etc.)
try:
text_content = file_bytes.decode('utf-8')
file_texts.append(f"\n\n--- Content of {file_info['name']} ---\n{text_content}\n--- End of file ---\n")
                                except UnicodeDecodeError:
                                    file_texts.append(f"\n\n[Could not read {file_info['name']}]\n")
# Combine text content with file texts
if file_texts:
combined_text = ""
for item in new_content:
if item.get('type') == 'text':
combined_text += item.get('text', '') + "\n"
combined_text += "".join(file_texts)
# If there are images, keep them and add combined text
image_items = [item for item in new_content if item.get('type') == 'image_url']
if image_items:
final_content = [{'type': 'text', 'text': combined_text.strip()}] + image_items
processed_messages.append({'role': role, 'content': final_content})
else:
processed_messages.append({'role': role, 'content': combined_text.strip()})
elif new_content:
# No files, just regular content
if len(new_content) == 1 and new_content[0].get('type') == 'text':
processed_messages.append({'role': role, 'content': new_content[0]['text']})
else:
processed_messages.append({'role': role, 'content': new_content})
return processed_messages
def has_visual_content(messages):
"""Check if processed messages contain images (need gpt-4o)"""
return any(
isinstance(msg.get('content'), list) and
any(item.get('type') == 'image_url' for item in msg.get('content', []))
for msg in messages
)