# LlamaIndex

## Overview

`SimpleAicebergHandler` mirrors key LlamaIndex agent events into Aiceberg without touching your core RAG code.

Gain transparent query, retrieval, and LLM spans plus moderation checks with one drop-in callback.

## Why callbacks instead of instrumentation or workflows?

### What are callbacks?

Callbacks are LlamaIndex's original observability mechanism. When your agent executes a query, retrieves documents, or calls an LLM, LlamaIndex emits events at the start and end of each operation. A callback handler—like `SimpleAicebergHandler`—listens for these events and can inspect or react to the data flowing through your RAG pipeline.

The key thing: callbacks are synchronous and can interrupt the flow. When we receive an event, we can send data to Aiceberg, check the moderation response, and raise an error if needed. The agent stops right there, before continuing to the next step.

### Why not instrumentation?

LlamaIndex is replacing callbacks with a new instrumentation module that provides better tracing and observability. Instrumentation uses events and spans to track execution across distributed systems, similar to OpenTelemetry. It's great for debugging, performance monitoring, and visualizing what your agent did after the fact.

But instrumentation is observation-only. You can watch what happens, log it, send it to your monitoring backend—but you can't stop the workflow mid-execution. If Aiceberg detects a policy violation in the LLM output, instrumentation can't prevent that output from reaching the user. It can only record that it happened.

We need the ability to block unsafe responses before they leave the system. Callbacks give us that: when `_raise_if_blocked` throws a `RuntimeError`, the agent halts immediately and your application can show a safe fallback message instead of the flagged content.
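On the application side, the fallback pattern described above can be sketched as follows. This is a minimal illustration, not part of the handler: `answer_safely` and `FALLBACK_MESSAGE` are hypothetical names, and the check assumes that the raised message contains the phrase "blocked by Aiceberg" (as the block messages in this page do).

```python
from typing import Callable

# Hypothetical wording; use whatever fits your product.
FALLBACK_MESSAGE = "Sorry, I can't help with that request."


def answer_safely(run_query: Callable[[str], object], question: str) -> str:
    """Run the agent and swap in a fallback if the handler blocks the request."""
    try:
        return str(run_query(question))
    except RuntimeError as exc:
        # Assumption: moderation blocks carry "blocked by Aiceberg" in the message.
        # Any other RuntimeError is re-raised so real bugs stay visible.
        if "blocked by Aiceberg" not in str(exc):
            raise
        return FALLBACK_MESSAGE
```

With a LlamaIndex query engine you would pass its `query` method as `run_query`, for example `answer_safely(query_engine.query, "What's the return policy?")`.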

As LlamaIndex moves toward deprecating callbacks, we'll need to revisit this design—possibly combining instrumentation for tracing with a separate validation layer. For now, callbacks are the cleanest way to enforce real-time moderation.

### Why not workflows?

LlamaIndex workflows are an event-driven abstraction for building complex, multi-step processes. Instead of relying on LlamaIndex's built-in query engine, you define explicit steps that handle specific events and emit new events to trigger the next step. This gives you fine-grained control over the execution flow—you decide when to retrieve, when to call the LLM, how to handle tool calls, etc.

Workflows are powerful for custom RAG pipelines or agentic systems where you need non-standard logic (multi-stage retrieval, conditional branching, parallel tool execution). But they require you to rebuild your agent from scratch using workflow primitives.

Our goal was to add monitoring and moderation to existing LlamaIndex agents without rewriting them. Most teams already have a working query engine or chat engine; they just want to layer Aiceberg on top. Callbacks let you do that—register the handler, done. No code changes to your core RAG logic, no migration to a new execution model.

If your team is building a new agent from scratch and wants workflows for other reasons (better observability, explicit step control), you could integrate Aiceberg calls directly into your workflow steps. But for retrofitting monitoring onto existing agents, callbacks are the simplest path.

### The tradeoff

Callbacks are synchronous and blocking. Every Aiceberg API call happens inline during your agent's execution, which adds latency (typically \~50-200ms per event depending on network conditions). For high-throughput production systems, this could be a bottleneck.

If latency becomes an issue, you'd want to move to async callbacks or batch events in the background. But for most RAG use cases—chatbots, internal tools, search augmentation—the added milliseconds are negligible compared to the LLM call itself (which usually takes 1-3 seconds). The ability to enforce policies in real-time is worth the tradeoff.
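One possible shape for background batching is sketched below. It is an illustration only: `BackgroundEventSender` and the `send_batch` callable are hypothetical, and this page does not describe a batch endpoint. Note the cost of this approach: events sent in the background are observed after the fact, so they can no longer block a response.

```python
import queue
import threading
from typing import Any, Callable, Dict, List


class BackgroundEventSender:
    """Queue events and deliver them in batches from a worker thread."""

    def __init__(
        self,
        send_batch: Callable[[List[Dict[str, Any]]], None],  # hypothetical transport
        batch_size: int = 10,
    ) -> None:
        self._send_batch = send_batch
        self._batch_size = batch_size
        self._queue = queue.Queue()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def submit(self, event: Dict[str, Any]) -> None:
        """Return immediately; the worker thread does the sending."""
        self._queue.put(event)

    def close(self) -> None:
        """Flush whatever is still queued and stop the worker."""
        self._queue.put(None)  # sentinel
        self._worker.join()

    def _flush(self, batch: List[Dict[str, Any]]) -> None:
        try:
            self._send_batch(batch)
        except Exception:
            # Fail open: a monitoring failure should not take the agent down.
            pass

    def _run(self) -> None:
        batch: List[Dict[str, Any]] = []
        while True:
            item = self._queue.get()
            if item is None:
                break
            batch.append(item)
            if len(batch) >= self._batch_size:
                self._flush(batch)
                batch = []
        if batch:
            self._flush(batch)
```

A mixed design is also possible: keep the synchronous path for events that must be able to block (for example LLM output) and queue the rest.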

## Why we built this

Deep instrumentation across the RAG workflow was noisy and brittle.

LlamaIndex already emits callbacks, so we forward only the highlights to Aiceberg.

Aiceberg can now block unsafe answers before they leave the agent, and if Aiceberg itself is unreachable the handler fails open so the chat continues.

## Callback goodness in real scenarios

| Scenario                           |                                       What you do | What you get                                                                                     |
| ---------------------------------- | ------------------------------------------------: | ------------------------------------------------------------------------------------------------ |
| Track a user conversation          | Register the handler and run your agent normally. | Every query and final reply is logged under the user\_agt profile.                               |
| Inspect retrieval quality          |                             Keep the same wiring. | Aiceberg stores the raw query and retrieved chunks under the agt\_mem profile.                   |
| Watch LLM output for policy issues |                       Leave the handler in place. | A blocked or rejected moderation result raises a `RuntimeError` so you can show a safe fallback. |
| Run locally without credentials    |                          Skip `AICEBERG_API_KEY`. | Handler returns `{"event_result": "passed"}` so your dev loop stays fast.                        |

## How it works

`SimpleAicebergHandler` extends LlamaIndex's `BaseCallbackHandler` and listens for three specific events: `QUERY`, `RETRIEVE`, and `LLM`. When you register it with your agent's callback manager, LlamaIndex automatically calls our handler at the start and end of each operation.

### The basic flow

When a user asks a question, LlamaIndex fires `CBEventType.QUERY` (start), then `CBEventType.RETRIEVE` (start/end) to fetch relevant docs, then `CBEventType.LLM` (start/end) to generate an answer, and finally `CBEventType.QUERY` (end) with the complete response.

Our handler catches each of these moments and forwards the data to Aiceberg:

* Start events send the input (question, retrieval query, or prompt) to Aiceberg and get back an `event_id`.
* End events send the output (answer, retrieved docs, or LLM response) and link it back to the start event using that `event_id`.

If Aiceberg's moderation policy flags something as "blocked" or "rejected", we raise a `RuntimeError` immediately so your application can handle it gracefully—show a fallback message, log the issue, whatever makes sense for your use case.

The handler runs synchronously on every event, which keeps the code simple but does mean each Aiceberg call blocks your agent briefly. For high-throughput scenarios you'd want to batch or go async, but for most RAG use cases the added latency is negligible.
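The routing described in this section can be sketched without LlamaIndex installed. In the sketch below, `EventKind` is a stand-in for `CBEventType`, `DispatchSketch` is a hypothetical class, and the handler only records which `_handle_*` method it would call; the real handler's internals may differ.

```python
from enum import Enum
from typing import Any, Dict, List, Optional


class EventKind(Enum):
    """Stand-in for llama_index's CBEventType so the sketch runs on its own."""
    QUERY = "query"
    RETRIEVE = "retrieve"
    LLM = "llm"
    EMBEDDING = "embedding"


class DispatchSketch:
    # Only these three event families are forwarded; everything else is ignored.
    MONITORED = (EventKind.QUERY, EventKind.RETRIEVE, EventKind.LLM)

    def __init__(self) -> None:
        self.seen: List[str] = []

    def on_event_start(
        self,
        event_type: EventKind,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        **kwargs: Any,
    ) -> str:
        if event_type in self.MONITORED:
            self._route(event_type, "start", payload or {})
        return event_id

    def on_event_end(
        self,
        event_type: EventKind,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        **kwargs: Any,
    ) -> None:
        if event_type in self.MONITORED:
            self._route(event_type, "end", payload or {})

    def _route(self, event_type: EventKind, phase: str, payload: Dict[str, Any]) -> None:
        # The real handler would call the matching method here;
        # this sketch just records its name.
        self.seen.append(f"_handle_{event_type.value}_{phase}")
```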

## Walking through a query

Let's trace what happens when a user asks: "What's the return policy for electronics?"

{% stepper %}
{% step %}

### Query starts (user → agent)

LlamaIndex calls `on_event_start` with `CBEventType.QUERY`. We grab the user's question from `payload[EventPayload.QUERY_STR]` and send it to Aiceberg:

```python
def _handle_query_start(self, payload: Dict[str, Any]) -> None:
    user_query = payload.get(EventPayload.QUERY_STR, "")
    self._log_debug(f"\nQUERY START (user→agent): {user_query}")
    self._send_event(
        event_name="query",
        content=user_query,
        is_input=True,
        block_message="Query input blocked by Aiceberg",
    )
```

Aiceberg receives:

`{"profile_id": "<AB_MONITORING_PROFILE_U2A>", "event_type": "user_agt", "forward_to_llm": false, "input": "What's the return policy for electronics?"}`

Response includes `event_id: "evt_abc123"`, which we store in `self._event_links["query"]` for later.
{% endstep %}

{% step %}

### Retrieval starts (agent → memory)

Your agent queries the vector store. LlamaIndex fires `CBEventType.RETRIEVE` (start). We capture the retrieval query—often the same as the user question, but sometimes rewritten:

```python
def _handle_retrieve_start(self, payload: Dict[str, Any]) -> None:
    query = payload.get(EventPayload.QUERY_STR, "")
    self._log_debug(f"\nRETRIEVE START (agent→mem): {query}")

    self._send_event(
        event_name="retrieve",
        content=query,
        is_input=True,
        block_message="Retrieve input blocked by Aiceberg",
    )
```

Aiceberg sees this under the `AB_MONITORING_PROFILE_A2MEM` profile as `agt_mem` type.
{% endstep %}

{% step %}

### Retrieval ends (memory → agent)

The vector store returns matching chunks. LlamaIndex calls `on_event_end` with `CBEventType.RETRIEVE` and a list of nodes. We flatten those nodes into plain text:

```python
def _handle_retrieve_end(self, payload: Dict[str, Any]) -> None:
    nodes = payload.get(EventPayload.NODES, [])
    retrieved_content = self._extract_retrieved_content(nodes)

    self._log_debug(f"\nRETRIEVE END (mem→agent): {len(nodes)} documents, {len(retrieved_content)} chars")

    self._send_event(
        event_name="retrieve",
        content=retrieved_content,
        is_input=False,
        block_message="Retrieve output blocked by Aiceberg",
        link_to_start=True,
    )
```

The helper `_extract_retrieved_content` walks each node looking for `.text` or `.node.text` and joins them with newlines. Aiceberg gets the full concatenated context that will go into the LLM prompt.
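A minimal sketch of that flattening logic, written as a standalone function, might look like this. It is an approximation based on the description above, not the handler's actual code, and the `SimpleNamespace` objects merely stand in for LlamaIndex nodes.

```python
from types import SimpleNamespace
from typing import Any, Iterable


def extract_retrieved_content(nodes: Iterable[Any]) -> str:
    """Join the text of each retrieved node with newlines."""
    texts = []
    for item in nodes:
        # Try the node's own .text first, then the wrapped .node.text.
        text = getattr(item, "text", None)
        if not text:
            inner = getattr(item, "node", None)
            text = getattr(inner, "text", None)
        if text:
            texts.append(str(text))
    return "\n".join(texts)


# Stand-in nodes: one with .text, one with .node.text, one with neither.
demo_nodes = [
    SimpleNamespace(text="Electronics: 30-day returns."),
    SimpleNamespace(node=SimpleNamespace(text="Original packaging required.")),
    SimpleNamespace(),
]
print(extract_retrieved_content(demo_nodes))
```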
{% endstep %}

{% step %}

### LLM starts (agent → LLM)

Now your agent builds a prompt from the system instructions, retrieved context, and user question. LlamaIndex fires `CBEventType.LLM` (start) with a `messages` list:

```python
def _handle_llm_start(self, payload: Dict[str, Any]) -> None:
    messages = payload.get(EventPayload.MESSAGES, [])
    self._log_debug(f"\nLLM START (agent→llm): {len(messages)} messages")

    self._latest_llm_output = ""

    prompt_text = self._extract_prompt_text(messages)
    if not prompt_text:
        return

    self._send_event(
        event_name="llm",
        content=prompt_text,
        is_input=True,
        block_message="LLM input blocked by Aiceberg",
    )
```

`_extract_prompt_text` iterates over message blocks, pulling out the text content and prefixing it with the role (system/user/assistant). The result is a single string showing exactly what goes to the model.
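As a rough illustration, the extraction could be written as below. The `role: text` line format, the fallback to `.content`, and the `SimpleNamespace` stand-ins for chat messages are assumptions made for this sketch.

```python
from types import SimpleNamespace
from typing import Any, Iterable


def extract_prompt_text(messages: Iterable[Any]) -> str:
    """Flatten chat messages into one string, one 'role: text' line per message."""
    lines = []
    for message in messages:
        role = getattr(message, "role", "unknown")
        role = getattr(role, "value", role)  # unwrap enum-style roles
        blocks = getattr(message, "blocks", None) or []
        parts = [str(b.text) for b in blocks if getattr(b, "text", None)]
        # Assumed fallback for messages that expose plain .content only.
        if not parts and getattr(message, "content", None):
            parts = [str(message.content)]
        if parts:
            text = " ".join(parts)
            lines.append(f"{role}: {text}")
    return "\n".join(lines)


demo_messages = [
    SimpleNamespace(
        role=SimpleNamespace(value="system"),
        blocks=[SimpleNamespace(text="Be brief.")],
    ),
    SimpleNamespace(role="user", content="What's the return policy?"),
]
print(extract_prompt_text(demo_messages))
```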
{% endstep %}

{% step %}

### LLM ends (LLM → agent)

The model responds. LlamaIndex calls `on_event_end` with `CBEventType.LLM`:

```python
def _handle_llm_end(self, payload: Dict[str, Any]) -> None:
    response = payload.get(EventPayload.RESPONSE, "")
    llm_answer = self._extract_llm_response(response)
    self._log_debug(f"\nLLM END (llm→agent): {llm_answer[:100]}...")

    self._latest_llm_output = llm_answer

    self._send_event(
        event_name="llm",
        content=llm_answer,
        is_input=False,
        block_message="LLM output blocked by Aiceberg",
        link_to_start=True,
    )
```

We extract the model's text (checking for `.message.blocks`, `.message.content`, or `.content` depending on the LLM integration), store it in `self._latest_llm_output`, and send it to Aiceberg linked to the LLM start event.
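The attribute checks can be sketched as a standalone function. The order follows the sentence above; joining blocks with newlines and the `SimpleNamespace` stand-ins are assumptions for illustration.

```python
from types import SimpleNamespace
from typing import Any


def extract_llm_response(response: Any) -> str:
    """Pull the model's text out of whichever response shape we were given."""
    if response is None:
        return ""
    message = getattr(response, "message", None)
    if message is not None:
        # 1. Block-based messages: collect every block that carries text.
        blocks = getattr(message, "blocks", None) or []
        texts = [str(b.text) for b in blocks if getattr(b, "text", None)]
        if texts:
            return "\n".join(texts)
        # 2. Plain message content.
        content = getattr(message, "content", None)
        if content:
            return str(content)
    # 3. Content directly on the response object.
    content = getattr(response, "content", None)
    if content:
        return str(content)
    # 4. Last resort: the string form of whatever we received.
    return str(response)


demo_response = SimpleNamespace(
    message=SimpleNamespace(blocks=[SimpleNamespace(text="30 days.")], content=None)
)
print(extract_llm_response(demo_response))
```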
{% endstep %}

{% step %}

### Query ends (agent → user)

Finally, the agent wraps everything up and returns the answer to the user. LlamaIndex fires `CBEventType.QUERY` (end):

```python
def _handle_query_end(self, payload: Dict[str, Any]) -> None:
    response = payload.get(EventPayload.RESPONSE, "")
    final_answer = self._extract_query_response(response)

    self._log_debug(f"\nQUERY END (agent→user): {final_answer[:100]}...")

    self._send_event(
        event_name="query",
        content=final_answer,
        is_input=False,
        block_message="Query output blocked by Aiceberg",
        link_to_start=True,
    )

    # Sync LLM output if needed
    if self.profile_agent_llm:
        llm_output_to_sync = final_answer or self._latest_llm_output
        if llm_output_to_sync:
            self._send_event(
                event_name="llm",
                content=llm_output_to_sync,
                is_input=False,
                block_message="LLM output blocked by Aiceberg",
                link_to_start=True,
            )
```

This sends the final answer to Aiceberg as the output of the user-to-agent event and, when an agent-to-LLM profile is configured, also re-sends it as the LLM output. The sync ensures that even if the agent post-processes the LLM response (formatting, citations, etc.), Aiceberg sees the final text the user receives.
{% endstep %}
{% endstepper %}

## The dashboard view

After this flow completes, you'll see three conversation pairs in Aiceberg:

1. User to Agent (profile: U2A)
   * Input: "What's the return policy for electronics?"
   * Output: "Electronics can be returned within 30 days with original packaging..."
2. Agent to Memory (profile: A2MEM)
   * Input: "What's the return policy for electronics?"
   * Output: \[concatenated text from retrieved docs]
3. Agent to LLM (profile: A2M)
   * Input: \[system instruction + context + query as one prompt]
   * Output: \[raw model response]

Each pair is linked by the `event_id`, so you can trace a single user question through the entire RAG pipeline.

## Key design choices

### Why only QUERY, RETRIEVE, and LLM?

These three cover the core RAG workflow: what the user asked, what context was fetched, and what the model said. LlamaIndex emits other events (`EMBEDDING`, `SYNTHESIZE`, etc.), but they're either redundant (synthesis is captured in QUERY end) or not directly relevant for conversation monitoring (embedding generation). If your team needs tool calls or sub-questions, you'd add handlers for `CBEventType.TOOL` or `CBEventType.SUB_QUESTION`.

### Why three separate profiles?

Each conversation type (user↔agent, agent↔memory, agent↔LLM) has different moderation needs. You might allow certain language from users but block it in retrieval results, or apply stricter policies to LLM inputs vs outputs. Separate profiles give you that flexibility without complicated conditional logic.

## What the code does

`_send_event` is the common path for all Aiceberg calls. It looks up the profile ID and event type from `_event_config`, builds the payload, calls `send_to_aiceberg`, checks for blocks, and stores the `event_id` for linking start/end pairs.
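The sequence can be approximated with the sketch below. It is not the handler's source: `SendEventSketch` is hypothetical, the transport is injected so the example runs offline, the profile IDs are placeholders, and the `output` and `event_id` request fields are assumed names (only `profile_id`, `event_type`, `forward_to_llm`, and `input` appear in this page).

```python
from typing import Any, Callable, Dict

Transport = Callable[[Dict[str, Any]], Dict[str, Any]]


class SendEventSketch:
    def __init__(self, transport: Transport) -> None:
        self._transport = transport  # stand-in for send_to_aiceberg
        self._event_links: Dict[str, str] = {}
        # event name -> (profile id, Aiceberg event type); ids are placeholders
        self._event_config = {
            "query": ("<U2A profile id>", "user_agt"),
            "retrieve": ("<A2MEM profile id>", "agt_mem"),
            "llm": ("<A2M profile id>", "agt_llm"),
        }

    def send_event(
        self,
        event_name: str,
        content: str,
        is_input: bool,
        block_message: str,
        link_to_start: bool = False,
    ) -> Dict[str, Any]:
        # 1. Look up the profile and event type for this event family.
        profile_id, event_type = self._event_config[event_name]
        # 2. Build the payload.
        payload: Dict[str, Any] = {
            "profile_id": profile_id,
            "event_type": event_type,
            "forward_to_llm": False,
        }
        payload["input" if is_input else "output"] = content  # "output" is assumed
        if link_to_start and event_name in self._event_links:
            payload["event_id"] = self._event_links[event_name]  # assumed field
        # 3. Send it.
        response = self._transport(payload)
        # 4. Stop the agent if moderation flagged the content.
        if response.get("event_result") in ("blocked", "rejected"):
            raise RuntimeError(block_message)
        # 5. Remember the start event's id so the end event can link to it.
        if is_input and response.get("event_id"):
            self._event_links[event_name] = response["event_id"]
        return response
```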

`send_to_aiceberg` does the HTTP POST to `base_url/eap/v0/event` with your API key. If there's no key in the environment, it returns `{"event_result": "passed"}` so local dev works without credentials. Network errors also return "passed" so monitoring failures don't break your agent.
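The fail-open behaviour can be sketched with the standard library alone. This is an illustration under assumptions, not the shipped function: the real code may use a different HTTP client, and passing the key verbatim in an `Authorization` header, the `base_url` parameter, and the 5-second timeout are all guesses.

```python
import json
import os
import urllib.request
from typing import Any, Dict


def send_to_aiceberg(
    payload: Dict[str, Any], base_url: str, timeout: float = 5.0
) -> Dict[str, Any]:
    """POST one event to Aiceberg, returning 'passed' whenever sending fails."""
    api_key = os.environ.get("AICEBERG_API_KEY")
    if not api_key:
        # No credentials: behave as if moderation passed so local dev keeps working.
        return {"event_result": "passed"}
    try:
        request = urllib.request.Request(
            base_url.rstrip("/") + "/eap/v0/event",
            data=json.dumps(payload).encode("utf-8"),
            # Assumption: the key already includes the "Bearer " prefix.
            headers={"Authorization": api_key, "Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return json.loads(response.read().decode("utf-8"))
    except (OSError, ValueError):
        # OSError covers connection errors, HTTP errors, and timeouts;
        # ValueError covers a malformed URL or a non-JSON response body.
        return {"event_result": "passed"}
```

Failing open keeps the agent available during an outage, at the cost of unmoderated responses while Aiceberg is unreachable.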

`_raise_if_blocked` checks the response from Aiceberg. If `event_result` is "blocked" or "rejected", it raises `RuntimeError` with a descriptive message. This stops the agent immediately so you can catch the exception and show a safe fallback.

The extract helpers (`_extract_prompt_text`, `_extract_retrieved_content`, etc.) handle the messy details of LlamaIndex's payload shapes. Different LLM integrations return responses in different formats, so these functions check for various attributes and fall back to `str()` if needed.

## Quick setup (5 minutes)

Install deps (once):

```bash
pip install -r requirements.txt
```

Add environment variables (`.env` works well):

* `AICEBERG_API_KEY=Bearer ...`
* `AB_MONITORING_PROFILE_U2A=...` (query events)
* `AB_MONITORING_PROFILE_A2MEM=...` (retrieval events)
* `AB_MONITORING_PROFILE_A2M=...` (LLM events)

Register the handler:

```python
from llama_index.core import Settings
from llama_index.core.callbacks import CallbackManager
from llama_index_rag_engine.aiceberg_callback_monitor import SimpleAicebergHandler

handler = SimpleAicebergHandler(debug=True)
Settings.callback_manager = CallbackManager([handler])
```

Run your usual agent script (for example `python run_rag.py`).

Check the console for debug previews and confirm the events in the Aiceberg dashboard.

## Event flow at a glance

| LlamaIndex event       | Input we send   | Output we send              | Aiceberg type |
| ---------------------- | --------------- | --------------------------- | ------------- |
| `CBEventType.QUERY`    | User question   | Final answer                | `user_agt`    |
| `CBEventType.RETRIEVE` | Retrieval query | Flattened document snippets | `agt_mem`     |
| `CBEventType.LLM`      | Prompt blocks   | Model response text         | `agt_llm`     |

## Logging & observability

Startup banner shows credential detection and monitored event families.

Each event prints a short preview (message counts, doc totals, output length) when `debug=True`.

Successful sends log the number of characters delivered to Aiceberg.

Blocked or rejected events raise clear messages such as "LLM output blocked by Aiceberg".


