
Langchain Agents with Aiceberg

This document explains how we monitor the different event types across an agentic workflow in agents built with the LangChain framework.

AicebergMiddleware provides real-time safety monitoring for your LangChain Agents. It tracks user inputs, LLM calls, tool executions, and memory to ensure safe and compliant agent behavior.

Simply add it to your LangChain Agents with:

middleware=[AicebergMiddleware()]

and get instant visibility into the safety of your conversational and reasoning workflows.

What is middleware, and how does it expose hooks in LangChain?

LangChain middleware is a feature that allows developers to intercept and customize the agent's core loop of calling a language model and executing tools.

Middleware is used to control and customize agent execution at every step, providing a way to more tightly control what happens inside the agent.

The core agent loop involves calling a model, letting it choose tools to execute, and then finishing when it calls no more tools.

Middleware exposes hooks before and after different event types. You can build custom middleware by implementing hooks that can be run at specific points in the agent execution flow. There are two types of middleware: decorator-based and class-based.

This guide uses class-based middleware because it is better suited to complex middleware with multiple hooks; decorator-based middleware is convenient when a single hook suffices. Please refer to the appendix for more detail.
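
For orientation, the two styles look roughly like this (a minimal sketch; hook bodies are elided, and the decorator import reflects the usage shown later in this guide):

from langchain.agents.middleware import AgentMiddleware, before_model

# Class-based: several lifecycle hooks grouped in one class (the approach used in this guide)
class AicebergMiddleware(AgentMiddleware):
    def before_agent(self, state, runtime): ...
    def after_agent(self, state, runtime): ...
    def wrap_model_call(self, request, handler): ...
    def wrap_tool_call(self, request, handler): ...

# Decorator-based: convenient when a single hook is enough
@before_model
def log_model_step(state, runtime):
    print("about to call the model")
    return None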

Aiceberg as a middleware

Aiceberg middleware in LangChain acts as an advanced, real-time control layer within the agent execution flow, enabling security, compliance, and observability features for generative and agentic AI systems. Plugging Aiceberg middleware into a LangChain agent lets every prompt, tool call, and model response be inspected for policy violations, risks such as prompt injection, and sensitive data leakage before content reaches the model or returns to the user.

We use specific hooks for monitoring the respective event types in our custom AicebergMiddleware class.

| Hooks | Where | What events we monitor |
| --- | --- | --- |
| before_agent | Before the agent starts (once per invocation) | User → Agent (forward) |
| after_agent | After the agent completes (up to once per invocation) | Agent → User (backward) |
| wrap_model_call | Around each model call | Agent ↔ LLM (forward & backward) |
| wrap_tool_call | Around each tool call | Agent ↔ Tools (forward & backward) |
| before_model / wrap_tool_call | Before each model invocation / around the read/write memory tool | Agent ↔ Memory (forward & backward) |
| wrap_tool_call | Around a sub-agent called as a tool | Agent → Agent |

Note: LangChain generally offers two kinds of hooks: node-style hooks and wrap-style hooks. We use wrap-style hooks (specifically wrap_model_call and wrap_tool_call) to observe the Agent-to-LLM and Agent-to-Tool event types, because wrap-style hooks intercept execution with full control over the handler call, including the ability to block it. See the appendix for a detailed distinction.


Events Aiceberg Monitors

User-to-Agent (user_agt)

The agent workflow is instrumented with two monitoring hooks — before_agent and after_agent — which capture User-to-Agent (U2A) events. These events represent both the initial user query sent to the agent and the final agent response produced after completing the full agentic reasoning process. Both hooks act as gates in the agent pipeline. If either hook detects a signal or receives a flagged response from Aiceberg, the entire agent flow can be blocked immediately.

Hook: before_agent

Monitors the user query being sent to the agent by forwarding it to Aiceberg. The Aiceberg response contains an event_id that is stored for correlation with the later final agent response.

Example:

class AicebergMiddleware(AgentMiddleware):

    @hook_config(can_jump_to=["end"])
    def before_agent(self, state, runtime):
        print("____________Monitoring U2A event type: U2A forward___________")
        # Extract the user’s message from the current conversation state
        msg = state['messages'][0].content
        # Send the user message to Aiceberg for monitoring
        response = aiceberg_monitor(is_input=True, content=msg, event_type="user_agt")
        _event_store.event_id = response.json().get("event_id")
        flagged_result = handle_aiceberg_flagged_event(response)
        # If flagged_result is not None, stop the agent and return the safe response
        if flagged_result:
            return flagged_result

        return None
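
The examples in this guide call two helpers from aiceberg_middleware: aiceberg_monitor (sends an event to the Aiceberg API) and handle_aiceberg_flagged_event (converts a flagged verdict into a safe terminating response). Their exact implementations are not shown here; the sketch below is one plausible shape, assuming the AICEBERG_API_URL, DEFAULT_HEADERS, and PROFILE_ID variables from the Quick setup section, and hypothetical payload field names (profile_id, direction):

import json
import os

import requests

def aiceberg_monitor(is_input, content, event_type, link_event_id=None):
    """Sketch: forward one monitoring event to the Aiceberg API."""
    payload = {
        "profile_id": os.environ["PROFILE_ID"],          # hypothetical field name
        "event_type": event_type,                        # "user_agt", "agt_llm", "agt_tool", ...
        "direction": "input" if is_input else "output",  # hypothetical field name
        "content": content,
    }
    if link_event_id:
        payload["link_event_id"] = link_event_id  # correlate forward and backward events
    headers = json.loads(os.environ.get("DEFAULT_HEADERS", "{}"))
    return requests.post(os.environ["AICEBERG_API_URL"], headers=headers, json=payload)

def handle_aiceberg_flagged_event(response):
    """Sketch: return a safe terminating response if Aiceberg flagged the event, else None."""
    if response.json().get("event_result") == "flagged":
        return {
            "messages": [{"role": "assistant",
                          "content": "This request was blocked by safety policy."}],
            "jump_to": "end",  # assumes the hook is configured with can_jump_to=["end"]
        }
    return None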

Hook: after_agent

Monitors the agent’s response after it’s generated, linking it to the earlier user query using the stored event ID. If Aiceberg flags the content, the agent flow is halted or the output is redacted.

Example:

@hook_config(can_jump_to=["end"])
def after_agent(self, state, runtime):
    print("___________Monitoring U2A event type: U2A backward___________")
    event_id = getattr(_event_store, "event_id", None)
    # Get the assistant's last message content
    content = state['messages'][-1].content
    # send monitoring event to Aiceberg
    response = aiceberg_monitor(
        is_input=False,
        content=content,
        event_type="agt_user",
        link_event_id=str(event_id)
    )
    # If flagged_result is not None, stop the agent and return the safe response
    flagged_result = handle_aiceberg_flagged_event(response)
    if flagged_result:
        return flagged_result

    return None

Agent-to-LLM (agt_llm)

The agent flow is instrumented with the wrap_model_call hook, which monitors the entire Agent-to-LLM (A2M) interaction lifecycle. This hook captures both the request sent from the agent to the model and the response returned from the model.

Hook: wrap_model_call

Monitors every LLM call initiated by the Agent, as well as the corresponding model response returned from the LLM. Requests are sanitized (internal fields such as the model handle and model settings are removed) before being sent to Aiceberg. If Aiceberg flags content, the call or response can be blocked or replaced with a safe message.

Example (abbreviated):

def wrap_model_call(self, request: ModelRequest, handler: Callable[[ModelRequest], ModelResponse]) -> ModelResponse:
    """
    Monitors and wraps the model call (Agent ↔ LLM) to track A2M (agent-to-model)
    events via Aiceberg. Logs both forward (request) and backward (response) flows,
    and checks for flagged events using Aiceberg moderation.
    """

    # A2M Forward: Agent → LLM
    print("_____________Monitoring A2M event type: A2M forward___________")
    write_log("A2M_forward", str(request))

    # Convert request to a dictionary and remove internal/unnecessary fields
    data = request.__dict__.copy()
    for key in ["model", "response_format", "state", "runtime", "model_settings"]:
        data.pop(key, None)

    # Clean message objects before sending to Aiceberg
    cleaned_messages = []
    for msg in data.get("messages", []):
        msg_type = type(msg).__name__
        if msg_type == "HumanMessage":
            cleaned_messages.append({
                "type": msg_type,
                "content": safe_get(msg, "content", "")
            })
        elif msg_type == "AIMessage":
            cleaned_messages.append({
                "type": msg_type,
                "content": safe_get(msg, "content", ""),
                "tool_calls": safe_get(msg, "tool_calls", [])
            })
        elif msg_type == "ToolMessage":
            if not isinstance(msg, dict):
                msg = {k: v for k, v in msg.__dict__.items() if not k.startswith("_")}
            msg["type"] = msg_type
            cleaned_messages.append(msg)
        else:
            cleaned_messages.append({
                "type": msg_type,
                "content": safe_get(msg, "content", "")
            })

    data["messages"] = cleaned_messages
    serialized_data = json.dumps(data, default=str, indent=2)

    # Send to Aiceberg for monitoring (forward event)
    aiceberg_response = aiceberg_monitor(
        is_input=True,
        content=str(serialized_data),
        event_type="agt_llm"  # Agent → LLM event
    )
    print("AICEBERG RESPONSE:", aiceberg_response.json())

    # Handle flagged events before model call
    flagged_result = handle_aiceberg_flagged_event(aiceberg_response)
    if flagged_result:
        # Return a safe placeholder model response if input was flagged
        return ModelResponse(
            result=[AIMessage(content="I cannot process requests containing inappropriate content. Please rephrase your request.")],
            structured_response=None,
        )

    # Store event_id for backward linking
    event_id = aiceberg_response.json().get("event_id")

    # Proceed with the model call
    response = handler(request)

    # A2M Backward: LLM → Agent
    print("____________Monitoring A2M event type: A2M backward___________")

    minimal_response = []
    for msg in response.result:
        minimal_response.append({
            "content": msg.content,
            # Not every message type carries tool_calls; default to an empty list
            "tool_calls": list(getattr(msg, "tool_calls", [])),
        })

    # Send model output to Aiceberg for post-check
    aiceberg_response = aiceberg_monitor(
        is_input=False,
        content=str(minimal_response),
        event_type="agt_llm",
        link_event_id=event_id
    )

    # Handle flagged output events
    flagged_result = handle_aiceberg_flagged_event(aiceberg_response)
    if flagged_result:
        return ModelResponse(
            result=[AIMessage(content="I cannot process requests containing inappropriate content. Please rephrase your request.")],
            structured_response=None,
        )

    # Return the original model response if everything is safe
    return response

Agent-to-Tool (agt_tool)

Every tool invocation passes through a monitoring wrapper (wrap_tool_call) that integrates with Aiceberg. Both the tool call and the tool output are tracked and sent to Aiceberg.

Hook: wrap_tool_call

Monitors every tool call made by the Agent and the corresponding tool output. If Aiceberg flags the tool input or output, the middleware can raise an exception and block execution.

Example:

def wrap_tool_call(
    self,
    request: ToolCallRequest,
    handler: Callable[[ToolCallRequest], ToolMessage | Command],
) -> ToolMessage | Command:
    print("___________Monitoring A2T event type: TOOL_CALL___________")

    try:
        # Forward tool call to Aiceberg for monitoring
        aiceberg_response = aiceberg_monitor(
            is_input=True,
            content=str(request.tool_call),
            event_type="agt_tool"
        )
        event_id = aiceberg_response.json().get("event_id")

        # Raise exception if flagged by Aiceberg
        if aiceberg_response.json().get("event_result") == "flagged":
            raise RuntimeError("Tool execution blocked: flagged content detected by Aiceberg")

        # Proceed with the tool call if safe
        result = handler(request)

        # Log and send tool output to Aiceberg for monitoring
        aiceberg_response = aiceberg_monitor(
            is_input=False,
            content=str(result),
            event_type="agt_tool",
            link_event_id=event_id
        )

        if aiceberg_response.json().get("event_result") == "flagged":
            raise RuntimeError("Tool execution blocked: flagged content detected by Aiceberg")

        return result

    except Exception as e:
        print(f"Tool failed: {e}")
        raise

Can the tool call be blocked?

  • Yes. By raising an exception when Aiceberg flags input/output, the middleware can block a tool call mid-execution.

Why tool blocking is not done by default:

  • Blocking a tool mid-flight can disrupt the agent’s reasoning flow because the LLM expects tool outputs to continue processing. Without a proper result, you may: (1) send an error message as the tool result (confuses the LLM), (2) send empty/fake data (breaks logic), or (3) stop the entire agent (user gets incomplete response).

Recommended approach:

  • Log all tool calls and outputs for audit visibility.

  • If a tool produces unsafe content, handle it after execution (e.g., when the LLM processes the result or before displaying it to the user).

  • Block tools selectively when the use case demands it (e.g., preventing unsafe DB writes). If blocking, ensure the aftermath is handled properly; one option is sketched after this list.
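
For example, instead of raising an exception, a wrapper can substitute a well-formed ToolMessage so the LLM receives a usable tool result and can recover gracefully. A minimal sketch (not the default AicebergMiddleware behavior):

from langchain_core.messages import ToolMessage

def wrap_tool_call(self, request, handler):
    aiceberg_response = aiceberg_monitor(
        is_input=True,
        content=str(request.tool_call),
        event_type="agt_tool",
    )
    if aiceberg_response.json().get("event_result") == "flagged":
        # Return a valid tool result instead of raising, so the LLM can
        # acknowledge the block and keep the conversation coherent.
        return ToolMessage(
            content="This tool call was blocked by safety policy.",
            tool_call_id=request.tool_call["id"],
        )
    return handler(request)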


Agent-to-Memory (agt_mem)

LangChain’s agent state manages short-term memory, enabling the agent to retain and access context across conversation turns. The state is persisted with a checkpointer and updated automatically. You can monitor or manipulate this short-term memory using hooks or tools.
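
Persistence across turns requires a checkpointer. As a quick illustration, here is a minimal sketch reusing the Weather Agent names defined in the Quick setup section below, with InMemorySaver enabled:

from langgraph.checkpoint.memory import InMemorySaver

agent = create_agent(
    model=model,
    tools=[get_user_location, get_weather_for_location],
    middleware=[AicebergMiddleware()],
    checkpointer=InMemorySaver(),  # persists agent state per thread_id
)

config = {"configurable": {"thread_id": "1"}}
agent.invoke({"messages": [{"role": "user", "content": "hi, I'm in Paris"}]}, config=config)
# Same thread_id, so the agent still has the earlier turn in short-term memory
agent.invoke({"messages": [{"role": "user", "content": "what's the weather here?"}]}, config=config)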

1. Accessing Short-Term Memory with @before_model

Use the @before_model hook to inspect/modify the agent’s memory before the model is called. The state (including messages) is passed to this hook.

Example — trimming messages before model invocation:

@before_model
def trim_messages(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    """Keep only the last few messages to fit the model's context window."""
    messages = state["messages"]

    # Only trim if there are more than 3 messages
    if len(messages) <= 3:
        return None  # No changes needed

    first_msg = messages[0]
    # Keep the last few messages for context
    recent_messages = messages[-3:] if len(messages) % 2 == 0 else messages[-4:]
    new_messages = [first_msg] + recent_messages

    # Return the updated message list to overwrite memory
    return {
        "messages": [
            RemoveMessage(id=REMOVE_ALL_MESSAGES),
            *new_messages
        ]
    }

Example — monitoring all conversation messages:

@hook_config(can_jump_to=["end"])
def before_model(self, state, runtime):
    print("____________Monitoring All Conversation Messages as short term memory ___________")
    # Extract all messages' contents as a combined string
    all_messages = "\n".join(str(msg.content) for msg in state['messages'])

    # Send the combined message history to Aiceberg for monitoring
    response = aiceberg_monitor(
        is_input=True,
        content=all_messages,
        event_type="agt_mem"
    )
    _event_store.event_id = response.json().get("event_id")
    flagged_result = handle_aiceberg_flagged_event(response)

    # If flagged_result is not None, stop model step and return the safe response
    if flagged_result:
        return flagged_result

    return None

Aiceberg can:

  • Inspect short-term memory before the model uses it.

  • Detect policy-violating content and redact sensitive data (e.g., PII) before it reaches the model, as sketched below.
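
As a concrete illustration of the redaction case, here is a hypothetical before_model sketch that scrubs email addresses with a simple regex before the model sees them (in practice, the redaction verdict would come from Aiceberg rather than a local pattern):

import re

from langchain_core.messages import HumanMessage, RemoveMessage
from langgraph.graph.message import REMOVE_ALL_MESSAGES

EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+\.\w+")  # simplistic; real PII detection is harder

@hook_config(can_jump_to=["end"])
def before_model(self, state, runtime):
    """Scrub email addresses from user messages before the model call."""
    changed = False
    new_messages = []
    for msg in state["messages"]:
        if isinstance(msg, HumanMessage) and isinstance(msg.content, str) and EMAIL_RE.search(msg.content):
            new_messages.append(HumanMessage(content=EMAIL_RE.sub("[REDACTED]", msg.content)))
            changed = True
        else:
            new_messages.append(msg)
    if changed:
        # Overwrite the short-term message history with the redacted copy
        return {"messages": [RemoveMessage(id=REMOVE_ALL_MESSAGES), *new_messages]}
    return None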

2. Accessing Short-Term Memory via Tools

Tools can read/write short-term memory via runtime.state.

Example — reading from short-term memory:

@tool
def get_user_info(runtime: ToolRuntime) -> str:
    """Retrieve user information from short-term memory."""
    user_id = runtime.state["user_id"]
    return "User is John Smith" if user_id == "user_123" else "Unknown user"

Example — writing to short-term memory:

@tool
def update_user_info(runtime: ToolRuntime[CustomContext, CustomState]) -> Command:
    """Update user info and append to the message history."""
    user_id = runtime.context.user_id
    name = "John Smith" if user_id == "user_123" else "Unknown user"

    return Command(update={
        "user_name": name,
        # Update the short-term message history
        "messages": [
            ToolMessage(
                "Successfully looked up user information",
                tool_call_id=runtime.tool_call_id
            )
        ]
    })

Monitoring via wrap_tool_call:

  • Aiceberg integrates with wrap_tool_call to monitor inputs and outputs of tool calls, which includes data read from / written to memory. This provides visibility into state modifications and allows detection of sensitive data exposure.


Agent-to-Agent

Agent-to-agent communication occurs through two patterns: Tool Calling (supervisor agent calls sub-agents as tools) and Handoffs (control passes directly to another agent).

Tool Calling

A sub-agent can be called as a tool. The wrap_tool_call hook monitors the incoming request (what is passed to the sub-agent) and the returned response, enabling policy enforcement on the data exchanged.

Example of calling a sub-agent via a tool:

from langchain.tools import tool
from langchain.agents import create_agent

subagent1 = create_agent(model="...", tools=[...])

@tool(
    "subagent1_name",
    description="subagent1_description"
)
def call_subagent1(query: str):
    result = subagent1.invoke({
        "messages": [{"role": "user", "content": query}]
    })
    return result["messages"][-1].content

agent = create_agent(model="...", tools=[call_subagent1])

Handoffs

Handoffs refer to passing control and state to another agent. The full implementation is still evolving in LangChain, but the conceptual docs and LangGraph components indicate that handoffs use a Command object to transfer control and state, as sketched below. Because the state is updated and available to the before_model hook, Aiceberg can monitor or redact information passed during handoffs via that hook.
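
A conceptual sketch of a handoff implemented as a tool, following LangGraph's Command pattern (billing_agent is a hypothetical node name in a parent multi-agent graph):

from langchain.tools import tool, ToolRuntime
from langchain_core.messages import ToolMessage
from langgraph.types import Command

@tool
def transfer_to_billing_agent(runtime: ToolRuntime) -> Command:
    """Hand the conversation off to the billing agent."""
    return Command(
        goto="billing_agent",   # target agent's node name (hypothetical)
        graph=Command.PARENT,   # navigate within the parent multi-agent graph
        update={"messages": [ToolMessage(
            "Transferred to billing agent",
            tool_call_id=runtime.tool_call_id,
        )]},
    )

Because a handoff tool is still a tool call, wrap_tool_call observes it as well, giving Aiceberg a second monitoring point alongside before_model.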


Quick setup

1. Install dependencies (once)

Run:

pip install -r requirements.txt

2. Add environment variables (.env)

Example entries:

AICEBERG_API_URL= ...
DEFAULT_HEADERS={"Content-Type": "application/json"}
OPENAI_API_KEY=sk-...
PROFILE_ID=...
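
Assuming the project loads these with python-dotenv (an assumption about this setup; adjust to however your application loads configuration), the middleware helpers can then read them from the environment:

from dotenv import load_dotenv
import os

load_dotenv()  # read .env from the working directory

AICEBERG_API_URL = os.environ["AICEBERG_API_URL"]
PROFILE_ID = os.environ["PROFILE_ID"]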
3. Register the Aiceberg middleware

Register AicebergMiddleware in your LangChain Agent. Example Weather Agent:

from dataclasses import dataclass, asdict
from typing import Any, Callable
import json
import os
import re
import requests
import threading

from langchain.agents import create_agent
from langchain.agents.middleware import AgentMiddleware, AgentState, ModelRequest, ModelResponse
from langchain.chat_models import init_chat_model
from langchain.tools import tool, ToolRuntime
from langchain.tools.tool_node import ToolCallRequest
from langchain_core.messages import ToolMessage
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.runtime import Runtime
from langgraph.types import Command

from monitoring import write_log
from aiceberg_middleware import *  # provides AicebergMiddleware and its helpers

SYSTEM_PROMPT = """You are an expert weather forecaster, who speaks in puns.

You have access to two tools:

- get_weather_for_location: use this to get the weather for a specific location
- get_user_location: use this to get the user's location

If a user asks you for the weather, make sure you know the location. If you can tell from the question that they mean wherever they are, use the get_user_location tool to find their location."""

@tool
def get_weather_for_location(city: str) -> str:
    """Get weather for a given city."""
    return f"It's always sunny in {city}!"

@dataclass
class Context:
    """Custom runtime context schema."""
    user_id: str

@tool
def get_user_location(runtime: ToolRuntime[Context, Any]) -> str:
    """Retrieve user information based on user ID."""
    user_id = runtime.context.user_id
    return "Florida" if user_id == "1" else "SF"

model = init_chat_model(
    "gpt-4.1-mini",
    temperature=0.5,
    timeout=10,
    max_tokens=1000
)

agent = create_agent(
    model=model,
    system_prompt=SYSTEM_PROMPT,
    tools=[get_user_location, get_weather_for_location],
    context_schema=Context,
    middleware=[AicebergMiddleware()],
    #checkpointer=InMemorySaver(),
)

# `thread_id` is a unique identifier for a given conversation.
config = {"configurable": {"thread_id": "1"}}

response = agent.invoke(
    {"messages": [{"role": "user", "content": "what is the weather outside?"}]},
    config=config,
    context=Context(user_id="abc")
)

Run the agent and observe events on the Aiceberg dashboard.


Appendix

Detailed distinction between Node-Style hooks and Wrap-Style hooks

The main difference between node-style hooks and wrap-style hooks in LangChain centers on their execution model and the level of control they provide over the agent's operations.

Node-Style Hooks

  • Run at specific, predefined points in the agent’s execution flow (e.g., before the agent starts, before or after model calls, or after the agent completes).

  • Execute sequentially and are typically used for tasks like logging, validation, or updating state.

  • Invocation order is fixed: before hooks run first to last, after hooks run last to first.

Wrap-Style Hooks

  • Intercept the execution of specific calls, such as model calls (wrap_model_call) or tool calls (wrap_tool_call).

  • Provide full control over when—or if—the underlying handler is called, allowing execution to be blocked, retried, or modified dynamically.

  • These hooks nest like function calls, enabling advanced control flow such as retries, fallback mechanisms, or blocking malicious inputs, as sketched below.
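
For example, a wrap-style hook can retry a failing model call, something a node-style hook cannot express. A minimal sketch:

from langchain.agents.middleware import AgentMiddleware

class RetryMiddleware(AgentMiddleware):
    def wrap_model_call(self, request, handler):
        # Full control over the handler: call it up to three times before giving up
        last_error = None
        for attempt in range(3):
            try:
                return handler(request)
            except Exception as exc:  # in practice, catch specific transient errors
                last_error = exc
        raise last_error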

We chose wrap-style hooks (wrap_model_call and wrap_tool_call) to observe and control Agent-to-LLM and Agent-to-Tool interactions because they allow full interception of these calls. Node-style hooks are well suited for observing User-to-Agent and Agent-to-User events where simple observation suffices.

Detailed explanation of why we chose class-based middleware over decorator-based

Class-based middleware provides a centralized structure to manage multiple lifecycle hooks—such as before_agent, before_model, wrap_model_call, after_agent, wrap_tool_call, etc.—within one class. This is powerful for orchestrating multiple hooks cohesively and maintaining shared middleware state or configuration across those hooks.

Decorator-based middleware is simpler for single-hook logic but can become fragmented when multiple hook points and internal state management are required.

Using a single class-based middleware to implement all hooks provides:

  • A unified and centralized way to monitor and control different event types across the entire agent execution flow.

  • Ease of integration: create an instance of the middleware class and pass it to the middleware parameter when creating the agent.

  • Automatic invocation of lifecycle hooks by LangChain during agent execution.

