LlamaIndex
Overview
SimpleAicebergHandler mirrors key LlamaIndex agent events into Aiceberg without touching your core RAG code.
Gain transparent query, retrieval, and LLM spans plus moderation checks with one drop-in callback.
Why callbacks instead of instrumentation or workflows?
What are callbacks?
Callbacks are LlamaIndex's original observability mechanism. When your agent executes a query, retrieves documents, or calls an LLM, LlamaIndex emits events at the start and end of each operation. A callback handler—like SimpleAicebergHandler—listens for these events and can inspect or react to the data flowing through your RAG pipeline.
The key thing: callbacks are synchronous and can interrupt the flow. When we receive an event, we can send data to Aiceberg, check the moderation response, and raise an error if needed. The agent stops right there, before continuing to the next step.
Why not instrumentation?
LlamaIndex is replacing callbacks with a new instrumentation module that provides better tracing and observability. Instrumentation uses events and spans to track execution across distributed systems, similar to OpenTelemetry. It's great for debugging, performance monitoring, and visualizing what your agent did after the fact.
But instrumentation is observation-only. You can watch what happens, log it, send it to your monitoring backend—but you can't stop the workflow mid-execution. If Aiceberg detects a policy violation in the LLM output, instrumentation can't prevent that output from reaching the user. It can only record that it happened.
We need the ability to block unsafe responses before they leave the system. Callbacks give us that: when _raise_if_blocked throws a RuntimeError, the agent halts immediately and your application can show a safe fallback message instead of the flagged content.
As LlamaIndex moves toward deprecating callbacks, we'll need to revisit this design—possibly combining instrumentation for tracing with a separate validation layer. For now, callbacks are the cleanest way to enforce real-time moderation.
Why not workflows?
LlamaIndex workflows are an event-driven abstraction for building complex, multi-step processes. Instead of relying on LlamaIndex's built-in query engine, you define explicit steps that handle specific events and emit new events to trigger the next step. This gives you fine-grained control over the execution flow—you decide when to retrieve, when to call the LLM, how to handle tool calls, etc.
Workflows are powerful for custom RAG pipelines or agentic systems where you need non-standard logic (multi-stage retrieval, conditional branching, parallel tool execution). But they require you to rebuild your agent from scratch using workflow primitives.
Our goal was to add monitoring and moderation to existing LlamaIndex agents without rewriting them. Most teams already have a working query engine or chat engine; they just want to layer Aiceberg on top. Callbacks let you do that—register the handler, done. No code changes to your core RAG logic, no migration to a new execution model.
If your team is building a new agent from scratch and wants workflows for other reasons (better observability, explicit step control), you could integrate Aiceberg calls directly into your workflow steps. But for retrofitting monitoring onto existing agents, callbacks are the simplest path.
The tradeoff
Callbacks are synchronous and blocking. Every Aiceberg API call happens inline during your agent's execution, which adds latency (typically ~50-200ms per event depending on network conditions). For high-throughput production systems, this could be a bottleneck.
If latency becomes an issue, you'd want to move to async callbacks or batch events in the background. But for most RAG use cases—chatbots, internal tools, search augmentation—the added milliseconds are negligible compared to the LLM call itself (which usually takes 1-3 seconds). The ability to enforce policies in real-time is worth the tradeoff.
Why we built this
Deep instrumentation across the RAG workflow was noisy and brittle.
LlamaIndex already emits callbacks, so we forward only the highlights to Aiceberg.
Aiceberg can now block unsafe answers before they leave the agent while letting the chat continue during outages.
Callback goodness in real scenarios
Track a user conversation
Register the handler and run your agent normally.
Every query and final reply is logged under the user_agt profile.
Inspect retrieval quality
Keep the same wiring.
Aiceberg stores the raw query and retrieved chunks under the agt_mem profile.
Watch LLM output for policy issues
Leave the handler in place.
A blocked or rejected moderation result raises a RuntimeError so you can show a safe fallback.
Run locally without credentials
Skip AICEBERG_API_KEY.
Handler returns {"event_result": "passed"} so your dev loop stays fast.
How it works
SimpleAicebergHandler extends LlamaIndex's BaseCallbackHandler and listens for three specific events: QUERY, RETRIEVE, and LLM. When you register it with your agent's callback manager, LlamaIndex automatically calls our handler at the start and end of each operation.
The basic flow
When a user asks a question, LlamaIndex fires CBEventType.QUERY (start), then CBEventType.RETRIEVE (start/end) to fetch relevant docs, then CBEventType.LLM (start/end) to generate an answer, and finally CBEventType.QUERY (end) with the complete response.
Our handler catches each of these moments and forwards the data to Aiceberg:
Start events send the input (question, retrieval query, or prompt) to Aiceberg and get back an
event_id.End events send the output (answer, retrieved docs, or LLM response) and link it back to the start event using that
event_id.
If Aiceberg's moderation policy flags something as "blocked" or "rejected", we raise a RuntimeError immediately so your application can handle it gracefully—show a fallback message, log the issue, whatever makes sense for your use case.
The handler runs synchronously on every event, which keeps the code simple but does mean each Aiceberg call blocks your agent briefly. For high-throughput scenarios you'd want to batch or go async, but for most RAG use cases the added latency is negligible.
Walking through a query
Let's trace what happens when a user asks: "What's the return policy for electronics?"
Query starts (user → agent)
LlamaIndex calls on_event_start with CBEventType.QUERY. We grab the user's question from payload[EventPayload.QUERY_STR] and send it to Aiceberg:
def _handle_query_start(self, payload: Dict[str, Any]) -> None:
user_query = payload.get(EventPayload.QUERY_STR, "")
self._log_debug(f"\nQUERY START (user→agent): {user_query}")
self._send_event(
event_name="query",
content=user_query,
is_input=True,
block_message="Query input blocked by Aiceberg")Aiceberg receives:
{ "profile_id": "<AB_MONITORING_PROFILE_U2A>", "event_type": "user_agt", "forward_to_llm": false, "input": "What's the return policy for electronics?" }
Response includes event_id: "evt_abc123", which we store in self._event_links["query"] for later.
Retrieval starts (agent → memory)
Your agent queries the vector store. LlamaIndex fires CBEventType.RETRIEVE (start). We capture the retrieval query—often the same as the user question, but sometimes rewritten:
def _handle_retrieve_start(self, payload: Dict[str, Any]) -> None:
query = payload.get(EventPayload.QUERY_STR, "")
self._log_debug(f"\nRETRIEVE START (agent→mem): {query}")
self._send_event(
event_name="retrieve",
content=query,
is_input=True,
block_message="Retrieve input blocked by Aiceberg",
)Aiceberg sees this under the AB_MONITORING_PROFILE_A2MEM profile as agt_mem type.
Retrieval ends (memory → agent)
The vector store returns matching chunks. LlamaIndex calls on_event_end with CBEventType.RETRIEVE and a list of nodes. We flatten those nodes into plain text:
def _handle_retrieve_end(self, payload: Dict[str, Any]) -> None:
nodes = payload.get(EventPayload.NODES, [])
retrieved_content = self._extract_retrieved_content(nodes)
self._log_debug(f"\nRETRIEVE END (mem→agent): {len(nodes)} documents, {len(retrieved_content)} chars")
self._send_event(
event_name="retrieve",
content=retrieved_content,
is_input=False,
block_message="Retrieve output blocked by Aiceberg",
link_to_start=True,
)The helper _extract_retrieved_content walks each node looking for .text or .node.text and joins them with newlines. Aiceberg gets the full concatenated context that will go into the LLM prompt.
LLM starts (agent → LLM)
Now your agent builds a prompt from the system instructions, retrieved context, and user question. LlamaIndex fires CBEventType.LLM (start) with a messages list:
def _handle_llm_start(self, payload: Dict[str, Any]) -> None:
messages = payload.get(EventPayload.MESSAGES, [])
self._log_debug(f"\nLLM START (agent→llm): {len(messages)} messages")
self._latest_llm_output = ""
prompt_text = self._extract_prompt_text(messages)
if not prompt_text:
return
self._send_event(
event_name="llm",
content=prompt_text,
is_input=True,
block_message="LLM input blocked by Aiceberg",
)_extract_prompt_text iterates over message blocks, pulling out the text content and prefixing it with the role (system/user/assistant). The result is a single string showing exactly what goes to the model.
LLM ends (LLM → agent)
The model responds. LlamaIndex calls on_event_end with CBEventType.LLM:
def _handle_llm_end(self, payload: Dict[str, Any]) -> None:
response = payload.get(EventPayload.RESPONSE, "")
llm_answer = self._extract_llm_response(response)
self._log_debug(f"\nLLM END (llm→agent): {llm_answer[:100]}...")
self._latest_llm_output = llm_answer
self._send_event(
event_name="llm",
content=llm_answer,
is_input=False,
block_message="LLM output blocked by Aiceberg",
link_to_start=True,
)We extract the model's text (checking for .message.blocks, .message.content, or .content depending on the LLM integration), store it in self._latest_llm_output, and send it to Aiceberg linked to the LLM start event.
Query ends (agent → user)
Finally, the agent wraps everything up and returns the answer to the user. LlamaIndex fires CBEventType.QUERY (end):
def _handle_query_end(self, payload: Dict[str, Any]) -> None:
response = payload.get(EventPayload.RESPONSE, "")
final_answer = self._extract_query_response(response)
self._log_debug(f"\nQUERY END (agent→user): {final_answer[:100]}...")
self._send_event(
event_name="query",
content=final_answer,
is_input=False,
block_message="Query output blocked by Aiceberg",
link_to_start=True,
)
# Sync LLM output if needed
if self.profile_agent_llm:
llm_output_to_sync = final_answer or self._latest_llm_output
if llm_output_to_sync:
self._send_event(
event_name="llm",
content=llm_output_to_sync,
is_input=False,
block_message="LLM output blocked by Aiceberg",
link_to_start=True,
)This sends the final answer to the user and also syncs it as the LLM output to Aiceberg. The sync ensures that even if the agent post-processes the LLM response (formatting, citations, etc.), Aiceberg sees the final text the user receives.
The dashboard view
After this flow completes, you'll see three conversation pairs in Aiceberg:
User to Agent (profile: U2A)
Input: "What's the return policy for electronics?"
Output: "Electronics can be returned within 30 days with original packaging..."
Agent to Memory (profile: A2MEM)
Input: "What's the return policy for electronics?"
Output: [concatenated text from retrieved docs]
Agent to LLM (profile: A2M)
Input: [system instruction + context + query as one prompt]
Output: [raw model response]
Each pair is linked by the event_id, so you can trace a single user question through the entire RAG pipeline.
Key design choices
Why only QUERY, RETRIEVE, and LLM?
These three cover the core RAG workflow: what the user asked, what context was fetched, and what the model said. LlamaIndex emits other events (EMBEDDING, SYNTHESIZE, etc.), but they're either redundant (synthesis is captured in QUERY end) or not directly relevant for conversation monitoring (embedding generation). If your team needs tool calls or sub-questions, you'd add handlers for CBEventType.TOOL or CBEventType.SUB_QUESTION.
Why three separate profiles?
Each conversation type (user↔agent, agent↔memory, agent↔LLM) has different moderation needs. You might allow certain language from users but block it in retrieval results, or apply stricter policies to LLM inputs vs outputs. Separate profiles give you that flexibility without complicated conditional logic.
What the code does
_send_event is the common path for all Aiceberg calls. It looks up the profile ID and event type from _event_config, builds the payload, calls send_to_aiceberg, checks for blocks, and stores the event_id for linking start/end pairs.
send_to_aiceberg does the HTTP POST to base_url/eap/v0/event with your API key. If there's no key in the environment, it returns {"event_result": "passed"} so local dev works without credentials. Network errors also return "passed" so monitoring failures don't break your agent.
_raise_if_blocked checks the response from Aiceberg. If event_result is "blocked" or "rejected", it raises RuntimeError with a descriptive message. This stops the agent immediately so you can catch the exception and show a safe fallback.
The extract helpers (_extract_prompt_text, _extract_retrieved_content, etc.) handle the messy details of LlamaIndex's payload shapes. Different LLM integrations return responses in different formats, so these functions check for various attributes and fallback to str() if needed.
Quick setup (5 minutes)
Install deps (once):
pip install -r requirements.txtAdd environment variables (.env works well):
AICEBERG_API_KEY=Bearer ...AB_MONITORING_PROFILE_U2A=...(query events)AB_MONITORING_PROFILE_A2MEM=...(retrieval events)AB_MONITORING_PROFILE_A2M=...(LLM events)
Register the handler:
from llama_index.core import Settings
from llama_index.core.callbacks import CallbackManager
from llama_index_rag_engine.aiceberg_callback_monitor import SimpleAicebergHandler
handler = SimpleAicebergHandler(debug=True)
Settings.callback_manager = CallbackManager([handler])Run your usual agent script (for example python run_rag.py).
Check the console for debug previews and confirm the events in the Aiceberg dashboard.
Event flow at a glance
CBEventType.QUERY
User question
Final answer
user_agt
CBEventType.RETRIEVE
Retrieval query
Flattened document snippets
agt_mem
CBEventType.LLM
Prompt blocks
Model response text
agt_llm
Logging & observability
Startup banner shows credential detection and monitored event families.
Each event prints a short preview (message counts, doc totals, output length) when debug=True.
Successful sends log the number of characters delivered to Aiceberg.
Blocked or rejected events raise clear messages such as "LLM output blocked by Aiceberg".
Last updated