# LangChain

## LangChain Agents with Aiceberg

*This document explains how we monitor different event types across an agentic workflow in agents built with the LangChain framework.*

AicebergMiddleware provides real-time safety monitoring for your LangChain agents. It tracks user inputs, LLM calls, tool executions, and memory access to ensure safe and compliant agent behavior.

Simply add it to your LangChain agents with:

`middleware=[AicebergMiddleware()]`

and get instant visibility into the safety of your conversational and reasoning workflows.

## What is middleware, and how does it expose hooks in LangChain?

LangChain middleware is a feature that allows developers to **intercept and customize the agent's core loop**, which involves calling a language model and executing tools.

Middleware gives you tighter control over what happens inside the agent, letting you observe and customize execution at every step.

The core agent loop involves calling a model, letting it choose tools to execute, and then finishing when it calls no more tools.

Middleware exposes hooks before and after different event types. You can build custom middleware by implementing hooks that can be run at specific points in the agent execution flow. There are two types of middleware: decorator-based and class-based.

This guide uses class-based middleware because it is more powerful for complex middleware with multiple hooks; decorator-based middleware is useful for a single-hook middleware. Please refer to the appendix for more info.

### Aiceberg as a middleware

Aiceberg middleware in LangChain acts as an advanced, real-time control layer within the agent execution flow, enabling security, compliance, and observability features for generative and agentic AI systems. By plugging Aiceberg middleware into a LangChain agent, every prompt, tool call, and model response can be inspected and checked for policy violations, risks like prompt injection, and sensitive data leakage before reaching the model or returning to the user.

We use specific hooks for monitoring the respective event types in our custom AicebergMiddleware class.

| Hooks                            |                                                              Where | What events we monitor              |
| -------------------------------- | -----------------------------------------------------------------: | ----------------------------------- |
| before\_agent                    |                          Before agent starts (once per invocation) | User → Agent (forward)              |
| after\_agent                     |                  After agent completes (up to once per invocation) | Agent → User (backward)             |
| wrap\_model\_call                |                                             Around each model call | Agent ↔ LLM (forward & backward)    |
| wrap\_tool\_call                 |                                              Around each tool call | Agent ↔ Tools (forward & backward)  |
| before\_model / wrap\_tool\_call | Before model invocation starts / around the read/write memory tool | Agent ↔ Memory (forward & backward) |
| wrap\_tool\_call                 |                                         Around sub-agent as a tool | Agent → Agent                       |

Note: LangChain offers two kinds of hooks: node-style hooks and wrap-style hooks. We use wrap-style hooks (specifically `wrap_model_call` and `wrap_tool_call`) to observe the Agent-to-LLM and Agent-to-Tool event types. Wrap-style hooks intercept execution and give full control over whether and when the handler is called, including the ability to block the call. See the appendix for a detailed comparison.
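
The wrap-style control flow can be sketched in plain Python, without LangChain. Here `monitor` stands in for the Aiceberg call, and `Verdict`, `BLOCKED`, and `wrap_call` are hypothetical names used only for illustration. The point is the shape of a wrap hook: check the request, call the handler only if it is safe, then check the response linked to the forward event.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Verdict:
    """Hypothetical result of one monitoring call: an event id plus a flag."""
    event_id: str
    flagged: bool


BLOCKED = "I cannot process requests containing inappropriate content."


def wrap_call(
    request: str,
    handler: Callable[[str], str],
    monitor: Callable[[str, bool, Optional[str]], Verdict],
) -> str:
    """Wrap-style hook: inspect the request, maybe call the handler, inspect the response."""
    # Forward check runs before the wrapped call; a flag here skips the handler entirely.
    forward = monitor(request, True, None)
    if forward.flagged:
        return BLOCKED
    response = handler(request)
    # Backward check is linked to the forward event id so both sides correlate.
    backward = monitor(response, False, forward.event_id)
    if backward.flagged:
        return BLOCKED
    return response
```

Because one function sees both the request and the response, a single hook can link the forward and backward events and decide at either point whether the result is returned.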

***

## Events Aiceberg Monitors

### User-to-Agent (user\_agt)

The agent workflow is instrumented with two monitoring hooks, `before_agent` and `after_agent`, which capture **User-to-Agent (U2A)** events. These events represent both the **initial user query sent to the agent** and the **final agent response** produced after the full agentic reasoning process completes. Both hooks act as **gates** in the agent pipeline: if Aiceberg flags the content at either hook, the agent flow can be stopped immediately.

#### Hook: before\_agent

Monitors the user query being sent to the agent by forwarding it to Aiceberg. The Aiceberg response contains an `event_id` that is stored for correlation with the later final agent response.

Example:

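```python
from langchain.agents.middleware import AgentMiddleware, hook_config

# aiceberg_monitor, handle_aiceberg_flagged_event and _event_store are
# project helpers defined elsewhere (not shown here).


class AicebergMiddleware(AgentMiddleware):

    @hook_config(can_jump_to=["end"])
    def before_agent(self, state, runtime):
        print("____________Monitoring U2A event type: U2A forward___________")
        # The newest message is the user's query for this invocation
        # (index 0 would be the first turn once a checkpointer keeps history)
        msg = state['messages'][-1].content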
        # Send the user message to Aiceberg for monitoring
        response = aiceberg_monitor(is_input=True, content=msg, event_type="user_agt")
        _event_store.event_id = response.json().get("event_id")
        flagged_result = handle_aiceberg_flagged_event(response)
        # If flagged_result is not None, stop the agent and return the safe response
        if flagged_result:
            return flagged_result

        return None
```
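
The hook stores the Aiceberg `event_id` on `_event_store`, which this guide does not define. One plausible definition, assuming the agent may be invoked from several threads, is a `threading.local()` so concurrent invocations do not overwrite each other's ids. The `remember_event` and `recall_event` helpers and their slot names are hypothetical.

```python
import threading
from typing import Optional

# One namespace per thread: values set in one thread are invisible to others.
_event_store = threading.local()


def remember_event(slot: str, event_id: Optional[str]) -> None:
    """Save an Aiceberg event id in a named slot for the current thread."""
    setattr(_event_store, slot, event_id)


def recall_event(slot: str) -> Optional[str]:
    """Return the id saved in `slot` by this thread, or None if nothing was saved."""
    return getattr(_event_store, slot, None)
```

`before_agent` would call `remember_event("u2a", ...)` and `after_agent` would call `recall_event("u2a")`; giving other hooks their own slot names (for example `"mem"`) keeps them from overwriting the U2A id. Thread-local state does not separate concurrent asyncio tasks running on one thread; `contextvars.ContextVar` is the usual per-task equivalent.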

#### Hook: after\_agent

Monitors the agent’s response after it’s generated, linking it to the earlier user query using the stored event ID. If Aiceberg flags the content, the agent flow is halted or the output is redacted.

Example:

```python
@hook_config(can_jump_to=["end"])
def after_agent(self, state, runtime):
    print("___________Monitoring U2A event type: U2A backward___________")
    event_id = getattr(_event_store, "event_id", None)
    # Get the assistant's last message content
    content = state['messages'][-1].content
    # send monitoring event to Aiceberg
    response = aiceberg_monitor(
        is_input=False,
        content=content,
        event_type="agt_user",
        link_event_id=str(event_id)
    )
    # If flagged_result is not None, stop the agent and return the safe response
    flagged_result = handle_aiceberg_flagged_event(response)
    if flagged_result:
        return flagged_result

    return None
```
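
`handle_aiceberg_flagged_event` is also not shown in this guide. Below is a minimal sketch of what it could look like, assuming it receives the parsed JSON body (`response.json()`) and that, as in the `wrap_tool_call` example, a flagged event carries `"event_result": "flagged"`. It returns a state update that appends a safe reply and uses LangChain's `jump_to` key to end the run; `handle_flagged` and `SAFE_REPLY` are hypothetical names.

```python
from typing import Any, Optional

SAFE_REPLY = "I cannot process requests containing inappropriate content. Please rephrase your request."


def handle_flagged(result: dict[str, Any]) -> Optional[dict[str, Any]]:
    """Return a state update that ends the run if Aiceberg flagged the event, else None."""
    if result.get("event_result") != "flagged":
        return None  # not flagged: the hook returns None and the agent continues
    return {
        # LangGraph's messages reducer converts role/content dicts into message objects.
        "messages": [{"role": "assistant", "content": SAFE_REPLY}],
        # Honoured only in hooks declared with @hook_config(can_jump_to=["end"]).
        "jump_to": "end",
    }
```

In a hook you would return `handle_flagged(response.json())`. Both U2A hooks above are already declared with `can_jump_to=["end"]`, which is what allows the `jump_to` key to take effect.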

***

### Agent-to-LLM (agt\_llm)

The agent flow is instrumented with the `wrap_model_call` hook, which monitors the entire **Agent-to-LLM (A2M)** interaction lifecycle. This hook captures both the **request sent from the agent** to the model and the **response** returned from the model.

#### Hook: wrap\_model\_call

Monitors every LLM call initiated by the Agent, as well as the corresponding model response returned from the LLM. Interactions are sanitized (e.g., removing usage parameters) before sending to Aiceberg. If Aiceberg flags content, the call or response can be blocked or replaced with a safe message.

Example (abbreviated):

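```python
import json
from typing import Callable

from langchain.agents.middleware import ModelRequest, ModelResponse
from langchain_core.messages import AIMessage

# Method of AicebergMiddleware; aiceberg_monitor, handle_aiceberg_flagged_event,
# safe_get and write_log are project helpers (not shown).
def wrap_model_call(
    self,
    request: ModelRequest,
    handler: Callable[[ModelRequest], ModelResponse],
) -> ModelResponse: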
    """
    Monitors and wraps the model call (Agent ↔ LLM) to track A2M (agent-to-model)
    events via Aiceberg. Logs both forward (request) and backward (response) flows,
    and checks for flagged events using Aiceberg moderation.
    """

    # A2M Forward: Agent → LLM
    print("_____________Monitoring A2M event type: A2M forward___________")
    write_log("A2M_forward", str(request))

    # Convert request to a dictionary and remove internal/unnecessary fields
    data = request.__dict__.copy()
    for key in ["model", "response_format", "state", "runtime", "model_settings"]:
        data.pop(key, None)

    # Clean message objects before sending to Aiceberg
    cleaned_messages = []
    for msg in data.get("messages", []):
        msg_type = type(msg).__name__
        if msg_type == "HumanMessage":
            cleaned_messages.append({
                "type": msg_type,
                "content": safe_get(msg, "content", "")
            })
        elif msg_type == "AIMessage":
            cleaned_messages.append({
                "type": msg_type,
                "content": safe_get(msg, "content", ""),
                "tool_calls": safe_get(msg, "tool_calls", [])
            })
        elif msg_type == "ToolMessage":
            if not isinstance(msg, dict):
                msg = {k: v for k, v in msg.__dict__.items() if not k.startswith("_")}
            msg["type"] = msg_type
            cleaned_messages.append(msg)
        else:
            cleaned_messages.append({
                "type": msg_type,
                "content": safe_get(msg, "content", "")
            })

    data["messages"] = cleaned_messages
    serialized_data = json.dumps(data, default=str, indent=2)

    # Send to Aiceberg for monitoring (forward event)
    aiceberg_response = aiceberg_monitor(
        is_input=True,
        content=str(serialized_data),
        event_type="agt_llm"  # Agent → LLM event
    )
    print("AICEBERG RESPONSE:", aiceberg_response.json())

    # Handle flagged events before model call
    flagged_result = handle_aiceberg_flagged_event(aiceberg_response)
    if flagged_result:
        # Return a safe placeholder model response if input was flagged
        return ModelResponse(
            result=[AIMessage(content="I cannot process requests containing inappropriate content. Please rephrase your request.")],
            structured_response=None,
        )

    # Store event_id for backward linking
    event_id = aiceberg_response.json().get("event_id")

    # Proceed with the model call
    response = handler(request)

    # A2M Backward: LLM → Agent
    print("____________Monitoring A2M event type: A2M backward___________")

    minimal_response = []
    for msg in response.result:
        minimal_response.append({
            "content": msg.content,
            "tool_calls": [tc for tc in msg.tool_calls]
        })

    # Send model output to Aiceberg for post-check
    aiceberg_response = aiceberg_monitor(
        is_input=False,
        content=str(minimal_response),
        event_type="agt_llm",
        link_event_id=event_id
    )

    # Handle flagged output events
    flagged_result = handle_aiceberg_flagged_event(aiceberg_response)
    if flagged_result:
        return ModelResponse(
            result=[AIMessage(content="I cannot process requests containing inappropriate content. Please rephrase your request.")],
            structured_response=None,
        )

    # Return the original model response if everything is safe
    return response
```
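
The message-cleaning loop relies on a `safe_get` helper that is not defined here. A minimal version, assuming its only job is to read a field from either a plain dict or a message object without raising, could be:

```python
from typing import Any


def safe_get(obj: Any, key: str, default: Any = None) -> Any:
    """Read `key` from a dict, or the attribute `key` from any other object."""
    if isinstance(obj, dict):
        return obj.get(key, default)
    # Message objects expose fields as attributes; missing ones fall back to default.
    return getattr(obj, key, default)
```

Falling back to a default rather than raising keeps the forward monitoring call from failing on message types that lack a field such as `tool_calls`.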

***

### Agent-to-Tool (agt\_tool)

Every tool invocation passes through a monitoring wrapper (`wrap_tool_call`) that integrates with Aiceberg. Both the tool call and the tool output are tracked and sent to Aiceberg.

#### Hook: wrap\_tool\_call

Monitors every tool call made by the Agent and the corresponding tool output. If Aiceberg flags the tool input or output, the middleware can raise an exception and block execution.

Example:

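```python
from typing import Callable

from langchain.tools.tool_node import ToolCallRequest
from langchain_core.messages import ToolMessage
from langgraph.types import Command

# Method of AicebergMiddleware; aiceberg_monitor is a project helper (not shown).
def wrap_tool_call(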
    self,
    request: ToolCallRequest,
    handler: Callable[[ToolCallRequest], ToolMessage | Command],
) -> ToolMessage | Command:
    print("___________Monitoring A2T event type: TOOL_CALL___________")

    try:
        # Forward tool call to Aiceberg for monitoring
        aiceberg_response = aiceberg_monitor(
            is_input=True,
            content=str(request.tool_call),
            event_type="agt_tool"
        )
        event_id = aiceberg_response.json().get("event_id")

        # Raise exception if flagged by Aiceberg
        if aiceberg_response.json().get("event_result") == "flagged":
            raise RuntimeError("Tool execution blocked: flagged content detected by Aiceberg")

        # Proceed with the tool call if safe
        result = handler(request)

        # Log and send tool output to Aiceberg for monitoring
        aiceberg_response = aiceberg_monitor(
            is_input=False,
            content=str(result),
            event_type="agt_tool",
            link_event_id=event_id
        )

        # Block if the tool output itself is flagged
        if aiceberg_response.json().get("event_result") == "flagged":
            raise RuntimeError("Tool execution blocked: flagged content detected by Aiceberg")

        return result

    except Exception as e:
        print(f"Tool failed: {e}")
        raise
```

Can the tool call be blocked?

* Yes. By raising an exception when Aiceberg flags input/output, the middleware can block a tool call mid-execution.

Why tool blocking is not done by default:

* Blocking a tool mid-flight can disrupt the agent’s reasoning flow because the LLM expects tool outputs to continue processing. Without a proper result, you may: (1) send an error message as the tool result (confuses the LLM), (2) send empty/fake data (breaks logic), or (3) stop the entire agent (user gets incomplete response).

Recommended approach:

* Log all tool calls and outputs for audit visibility.
* If a tool produces unsafe content, handle it after execution (e.g., when the LLM processes the result or before displaying it to the user).
* Block tools selectively when the use case demands it (e.g., preventing unsafe DB writes). If blocking, ensure the aftermath is handled properly.
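
Selective blocking can be expressed as a small policy check called from `wrap_tool_call` once the Aiceberg verdict is known. In this sketch the tool names in `BLOCKING_TOOLS`, the `ToolBlockedError` class, and the in-memory audit list are hypothetical placeholders: every call is logged, and only flagged calls to tools on the list raise.

```python
# Hypothetical policy: tools whose flagged calls must be stopped, not just logged.
BLOCKING_TOOLS = frozenset({"write_to_database", "send_email"})


class ToolBlockedError(RuntimeError):
    """Raised when a flagged call targets a tool on the blocking list."""


def enforce_tool_policy(tool_name: str, flagged: bool, audit_log: list) -> None:
    """Record every decision for audit; raise only for flagged calls to blocking tools."""
    audit_log.append({"tool": tool_name, "flagged": flagged})
    if flagged and tool_name in BLOCKING_TOOLS:
        raise ToolBlockedError(f"{tool_name} blocked: flagged content detected by Aiceberg")
```

Inside `wrap_tool_call` the tool name is available as `request.tool_call["name"]`, so the forward check could call `enforce_tool_policy(request.tool_call["name"], aiceberg_response.json().get("event_result") == "flagged", audit_log)` instead of raising unconditionally.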

***

### Agent-to-Memory (agt\_mem)

LangChain’s agent state manages short-term memory, enabling the agent to retain and access context across conversation turns. The state is persisted with a checkpointer and updated automatically. You can monitor or manipulate this short-term memory using hooks or tools.

#### 1. Accessing Short-Term Memory with `@before_model`

Use the `@before_model` hook to inspect/modify the agent’s memory before the model is called. The state (including `messages`) is passed to this hook.

Example — trimming messages before model invocation:

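```python
from typing import Any

from langchain.agents.middleware import AgentState, before_model
from langchain_core.messages import RemoveMessage
from langgraph.graph.message import REMOVE_ALL_MESSAGES
from langgraph.runtime import Runtime


@before_model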
def trim_messages(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    """Keep only the last few messages to fit the model's context window."""
    messages = state["messages"]

    # Only trim if there are more than 3 messages
    if len(messages) <= 3:
        return None  # No changes needed

    first_msg = messages[0]
    # Keep the last few messages for context
    recent_messages = messages[-3:] if len(messages) % 2 == 0 else messages[-4:]
    new_messages = [first_msg] + recent_messages

    # Return the updated message list to overwrite memory
    return {
        "messages": [
            RemoveMessage(id=REMOVE_ALL_MESSAGES),
            *new_messages
        ]
    }
```
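
The parity rule in `trim_messages` is easiest to see on plain indices. This stand-alone function (no LangChain involved) applies the same slicing to a list of integers, so you can check which positions survive for a given history length:

```python
def trimmed_indices(n: int) -> list[int]:
    """Positions kept by the trim_messages rule for a history of n messages."""
    messages = list(range(n))
    if len(messages) <= 3:
        return messages  # short histories are left untouched
    # Same rule as above: last 3 for an even count, last 4 for an odd count.
    recent = messages[-3:] if len(messages) % 2 == 0 else messages[-4:]
    return [messages[0]] + recent
```

For 6 messages it keeps positions `[0, 3, 4, 5]`; for 7 it keeps `[0, 3, 4, 5, 6]`; at 5 or fewer messages nothing is actually dropped.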

Example — monitoring all conversation messages:

```python
@hook_config(can_jump_to=["end"])
def before_model(self, state, runtime):
    print("____________Monitoring All Conversation Messages as short term memory ___________")
    # Combine all message contents into one string (str() guards non-text content)
    all_messages = "\n".join(str(msg.content) for msg in state['messages'])

    # Send the combined message history to Aiceberg for monitoring
    response = aiceberg_monitor(
        is_input=True,
        content=all_messages,
        event_type="agt_mem"
    )
    # Use a separate slot so the U2A event_id saved in before_agent is not
    # overwritten before after_agent links to it
    _event_store.mem_event_id = response.json().get("event_id")
    flagged_result = handle_aiceberg_flagged_event(response)

    # If flagged_result is not None, stop model step and return the safe response
    if flagged_result:
        return flagged_result

    return None
```

Aiceberg can:

* Inspect short-term memory before the model uses it.
* Detect sensitive or policy-violating data.
* Detect and redact sensitive info (PII) before it reaches the model.
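
Message `content` in LangChain can be a plain string or a list of content blocks, and AI messages that only carry tool calls often have empty content, so joining `msg.content` directly can fail or produce blank lines. A minimal flattening sketch, assuming only text blocks (`{"type": "text", "text": ...}`) matter for monitoring and other block types such as images can be dropped; both helper names are hypothetical:

```python
from typing import Any, Iterable


def content_to_text(content: Any) -> str:
    """Flatten one message's content (a string or a list of content blocks) to text."""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts = []
        for block in content:
            if isinstance(block, str):
                parts.append(block)
            elif isinstance(block, dict) and block.get("type") == "text":
                parts.append(str(block.get("text", "")))
            # Non-text blocks (images, audio, ...) are skipped in this sketch.
        return " ".join(parts)
    return "" if content is None else str(content)


def history_to_text(contents: Iterable[Any]) -> str:
    """Join per-message contents with newlines, skipping messages with no text."""
    texts = (content_to_text(c) for c in contents)
    return "\n".join(t for t in texts if t)
```

In the `before_model` hook this would replace the join: `all_messages = history_to_text(msg.content for msg in state["messages"])`.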

#### 2. Accessing Short-Term Memory via Tools

Tools can read/write short-term memory via `runtime.state`.

Example — reading from short-term memory:

```python
from langchain.tools import ToolRuntime, tool


@tool
def get_user_info(runtime: ToolRuntime) -> str:
    """Retrieve user information from short-term memory."""
    user_id = runtime.state["user_id"]
    return "User is John Smith" if user_id == "user_123" else "Unknown user"
```

Example — writing to short-term memory:

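```python
from dataclasses import dataclass

from langchain.agents.middleware import AgentState
from langchain.tools import ToolRuntime, tool
from langchain_core.messages import ToolMessage
from langgraph.types import Command


@dataclass
class CustomContext:  # example runtime context schema
    user_id: str


class CustomState(AgentState):  # agent state extended with user_name
    user_name: str


@tool
def update_user_info(runtime: ToolRuntime[CustomContext, CustomState]) -> Command: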
    """Update user info and append to the message history."""
    user_id = runtime.context.user_id
    name = "John Smith" if user_id == "user_123" else "Unknown user"

    return Command(update={
        "user_name": name,
        # Update the short-term message history
        "messages": [
            ToolMessage(
                "Successfully looked up user information",
                tool_call_id=runtime.tool_call_id
            )
        ]
    })
```

Monitoring via `wrap_tool_call`:

* Aiceberg integrates with `wrap_tool_call` to monitor inputs and outputs of tool calls, which includes data read from / written to memory. This provides visibility into state modifications and allows detection of sensitive data exposure.
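
Memory-writing tools return a `Command` rather than a `ToolMessage`, so `str(result)` in `wrap_tool_call` sends Aiceberg an object repr instead of the data being written. The hypothetical `tool_result_payload` helper below duck-types on a `.update` dict (the attribute in which LangGraph's `Command` stores its state update) and otherwise falls back to `.content`, producing JSON that shows which state keys and messages the tool wrote:

```python
import json
from typing import Any


def tool_result_payload(result: Any) -> str:
    """Serialize a tool result for monitoring, covering Command-like and message-like results."""
    update = getattr(result, "update", None)
    if isinstance(update, dict):
        # Command-style result: report the state keys written plus any message contents.
        messages = update.get("messages", [])
        payload = {
            "state_update": {k: v for k, v in update.items() if k != "messages"},
            "messages": [getattr(m, "content", m) for m in messages],
        }
    else:
        # ToolMessage-style result: the content is what the model will read next.
        payload = {"content": getattr(result, "content", result)}
    return json.dumps(payload, default=str)
```

Passing `tool_result_payload(result)` as `content` in the backward `aiceberg_monitor` call would then expose `user_name` from the `update_user_info` example to Aiceberg.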

***

### Agent-to-Agent

Agent-to-agent communication occurs through two patterns: Tool Calling (supervisor agent calls sub-agents as tools) and Handoffs (control passes directly to another agent).

#### Tool Calling

A sub-agent can be called as a tool. The `wrap_tool_call` hook monitors the incoming request (what is passed to the sub-agent) and the returned response, enabling policy enforcement on the data exchanged.

Example of calling a sub-agent via a tool:

```python
from langchain.tools import tool
from langchain.agents import create_agent

subagent1 = create_agent(model="...", tools=[...])

@tool(
    "subagent1_name",
    description="subagent1_description"
)
def call_subagent1(query: str):
    result = subagent1.invoke({
        "messages": [{"role": "user", "content": query}]
    })
    return result["messages"][-1].content

agent = create_agent(model="...", tools=[call_subagent1])
```
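
When a sub-agent is exposed as a tool, the forward event in `wrap_tool_call` is simply the tool call the supervisor's model produced, and the text handed to the sub-agent sits in its `args`. LangChain represents a tool call as a dict with `name`, `args`, and `id` keys; the sketch below (hypothetical helper and output field names) serializes it as JSON rather than `str(request.tool_call)`, so the sub-agent's input reaches Aiceberg as structured data:

```python
import json
from typing import Any, Mapping


def tool_call_payload(tool_call: Mapping[str, Any]) -> str:
    """Serialize a tool call (name/args/id) as JSON for the forward monitoring check."""
    return json.dumps(
        {
            "tool": tool_call.get("name"),
            "args": tool_call.get("args", {}),
            "call_id": tool_call.get("id"),
        },
        default=str,  # fall back to str() for any non-JSON argument values
    )
```

For `call_subagent1`, `args` would be `{"query": ...}`, which is exactly the string passed to `subagent1.invoke`.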

#### Handoffs

Handoffs refer to passing control and state to another agent. The full implementation is still evolving in LangChain, but conceptual docs and LangGraph components indicate handoffs use a `Command` object to transfer control and state. Because state is updated and available to the `before_model` hook, Aiceberg can monitor or redact information passed during handoffs via that hook.

***

## Quick setup

{% stepper %}
{% step %}

### Install dependencies (once)

Run:

```bash
pip install -r requirements.txt
```

{% endstep %}

{% step %}

### Add environment variables (`.env`)

Example entries:

```
AICEBERG_API_URL= ...
DEFAULT_HEADERS={"Content-Type": "application/json"}
OPENAI_API_KEY=sk-...
PROFILE_ID=...
```
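
The middleware needs these values at runtime. A `.env` file is not read automatically; a common approach is calling `load_dotenv()` from the `python-dotenv` package before the agent starts. The sketch below assumes the variables are already in the environment and shows one way to validate them and parse `DEFAULT_HEADERS` as JSON; `AicebergConfig` and `load_aiceberg_config` are hypothetical names, not part of any Aiceberg SDK. `OPENAI_API_KEY` is left out because the OpenAI chat model integration reads it from the environment itself.

```python
import json
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class AicebergConfig:
    """Settings read from the environment variables listed above."""
    api_url: str
    headers: dict
    profile_id: str


def load_aiceberg_config(env: Mapping[str, str] = os.environ) -> AicebergConfig:
    """Fail fast on missing values and parse DEFAULT_HEADERS as a JSON object."""
    missing = [k for k in ("AICEBERG_API_URL", "PROFILE_ID") if not env.get(k)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return AicebergConfig(
        api_url=env["AICEBERG_API_URL"].strip(),
        headers=json.loads(env.get("DEFAULT_HEADERS", "{}")),
        profile_id=env["PROFILE_ID"].strip(),
    )
```

Failing at startup makes a missing `PROFILE_ID` visible immediately instead of surfacing later as a rejected monitoring call mid-conversation.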

{% endstep %}

{% step %}

### Register the Aiceberg middleware

Register `AicebergMiddleware` in your LangChain agent. Example weather agent:

```python
from dataclasses import dataclass
from typing import Any

from langchain.agents import create_agent
from langchain.chat_models import init_chat_model
from langchain.tools import ToolRuntime, tool
from langgraph.checkpoint.memory import InMemorySaver

from aiceberg_middleware import AicebergMiddleware

SYSTEM_PROMPT = """You are an expert weather forecaster, who speaks in puns.

You have access to two tools:

- get_weather_for_location: use this to get the weather for a specific location
- get_user_location: use this to get the user's location

If a user asks you for the weather, make sure you know the location. If you can tell from the question that they mean wherever they are, use the get_user_location tool to find their location."""

@tool
def get_weather_for_location(city: str) -> str:
    """Get weather for a given city."""
    return f"It's always sunny in {city}!"

@dataclass
class Context:
    """Custom runtime context schema."""
    user_id: str

@tool
def get_user_location(runtime: ToolRuntime[Context, Any]) -> str:
    """Retrieve user information based on user ID."""
    user_id = runtime.context.user_id
    return "Florida" if user_id == "1" else "SF"

model = init_chat_model(
    "gpt-4.1-mini",
    temperature=0.5,
    timeout=10,
    max_tokens=1000
)

agent = create_agent(
    model=model,
    system_prompt=SYSTEM_PROMPT,
    tools=[get_user_location, get_weather_for_location],
    context_schema=Context,
    middleware=[AicebergMiddleware()],
    # checkpointer=InMemorySaver(),  # uncomment so thread_id persists short-term memory across invocations
)

# `thread_id` is a unique identifier for a given conversation.
config = {"configurable": {"thread_id": "1"}}

response = agent.invoke(
    {"messages": [{"role": "user", "content": "what is the weather outside?"}]},
    config=config,
    context=Context(user_id="abc")
)
```

Run the agent and observe events on the Aiceberg dashboard.
{% endstep %}
{% endstepper %}

***

## Appendix

<details>

<summary>Detailed distinction between Node-Style hooks and Wrap-Style hooks</summary>

The main difference between node-style hooks and wrap-style hooks in LangChain centers on their execution model and the level of control they provide over the agent's operations.

Node-Style Hooks

* Run at specific, predefined points in the agent’s execution flow (e.g., before the agent starts, before or after model calls, or after the agent completes).
* Execute sequentially and are typically used for tasks like logging, validation, or updating state.
* Invocation order is fixed: before hooks run first to last, after hooks run last to first.

Wrap-Style Hooks

* Intercept the execution of specific calls, such as model calls (`wrap_model_call`) or tool calls (`wrap_tool_call`).
* Provide full control over when—or if—the underlying handler is called, allowing execution to be blocked, retried, or modified dynamically.
* These hooks nest like function calls, enabling advanced control flow like retries, fallback mechanisms, or blocking malicious inputs.
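
The nesting can be illustrated in plain Python (this is not LangChain's internal code). LangChain documents that with `middleware=[A, B]`, `A`'s wrap hook is outermost and `B`'s sits inside it around the actual call; `compose`, `make_tracer`, and `fake_model` below are hypothetical helpers that reproduce that order:

```python
from typing import Callable, List

Handler = Callable[[str], str]
Wrapper = Callable[[str, Handler], str]


def _bind(wrapper: Wrapper, inner: Handler) -> Handler:
    """Turn a wrapper plus the handler it wraps into a plain handler."""
    return lambda request: wrapper(request, inner)


def compose(wrappers: List[Wrapper], core: Handler) -> Handler:
    """Nest wrappers so the first in the list is outermost, like nested function calls."""
    handler = core
    # Build from the inside out: the last wrapper ends up closest to the core call.
    for wrapper in reversed(wrappers):
        handler = _bind(wrapper, handler)
    return handler


def make_tracer(name: str, trace: List[str]) -> Wrapper:
    """A wrapper that records entering and leaving around the inner handler."""
    def wrapper(request: str, handler: Handler) -> str:
        trace.append(f"{name}:before")
        result = handler(request)
        trace.append(f"{name}:after")
        return result
    return wrapper


trace: List[str] = []


def fake_model(request: str) -> str:
    """Stand-in for the real model call at the center of the nesting."""
    trace.append("model")
    return request.upper()


run = compose([make_tracer("A", trace), make_tracer("B", trace)], fake_model)
result = run("hi")
# result == "HI"
# trace == ["A:before", "B:before", "model", "B:after", "A:after"]
```

Each wrapper decides whether to call `handler` at all, which is what makes blocking, retries, and fallbacks possible.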

We chose wrap-style hooks (`wrap_model_call` and `wrap_tool_call`) to observe and control Agent-to-LLM and Agent-to-Tool interactions because they allow full interception of these calls. Node-style hooks are well suited for observing User-to-Agent and Agent-to-User events where simple observation suffices.

</details>

<details>

<summary>Detailed explanation of why we chose class-based middleware over decorator-based</summary>

Class-based middleware provides a centralized structure to manage multiple lifecycle hooks—such as `before_agent`, `before_model`, `wrap_model_call`, `after_agent`, `wrap_tool_call`, etc.—within one class. This is powerful for orchestrating multiple hooks cohesively and maintaining shared middleware state or configuration across those hooks.

Decorator-based middleware is simpler for single-hook logic but can become fragmented when multiple hook points and internal state management are required.

Using a single class-based middleware to implement all hooks provides:

* A unified and centralized way to monitor and control different event types across the entire agent execution flow.
* Ease of integration: create an instance of the middleware class and pass it to `middleware` when creating the agent.
* Automatic invocation of lifecycle hooks by LangChain during agent execution.

</details>

***


