LangChain
LangChain Agents with Aiceberg
This document explains how we monitor the different event types across an agentic workflow in agents built with the LangChain framework.
AicebergMiddleware provides real-time safety monitoring for your LangChain agents. It tracks user inputs, LLM calls, tool executions, and memory access to ensure safe and compliant agent behavior.
Simply add it to your LangChain Agents with:
middleware=[AicebergMiddleware()]
and get instant visibility into the safety of your conversational and reasoning workflows.
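For example, registration looks like this (a minimal sketch; AicebergMiddleware is imported from the aiceberg_middleware module shown in the Quick setup section at the end of this guide, and the model and tools are placeholders):
from langchain.agents import create_agent
from aiceberg_middleware import AicebergMiddleware  # module name taken from Quick setup below

agent = create_agent(
    model="gpt-4.1-mini",  # any chat model of your choice
    tools=[],              # your agent's tools go here
    middleware=[AicebergMiddleware()],  # Aiceberg hooks now run on every step
)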
What is middleware, and how does it expose hooks in LangChain?
Langchain middleware is a feature that allows developers to intercept and customize the agent's core loop, which involves calling a language model and executing tools.
Middleware is used to control and customize agent execution at every step, giving you tighter control over what happens inside the agent.
The core agent loop involves calling a model, letting it choose tools to execute, and then finishing when it calls no more tools.
Middleware exposes hooks before and after different event types. You can build custom middleware by implementing hooks that can be run at specific points in the agent execution flow. There are two types of middleware: decorator-based and class-based.
This guide uses class-based middleware because it is more powerful for complex middleware with multiple hooks; decorator-based middleware is convenient when you only need a single hook. Please refer to the appendix for more info.
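To illustrate the difference, here is a minimal sketch of both styles (assuming the before_model decorator and the AgentMiddleware base class exported by langchain.agents.middleware, as used elsewhere in this guide; the hook bodies are placeholders):
from langchain.agents.middleware import AgentMiddleware, before_model

# Decorator-based: the quickest way to add a single hook.
@before_model
def log_before_model(state, runtime):
    print("about to call the model")
    return None  # no state changes

# Class-based: groups several hooks in one place (the style used in this guide).
class AuditMiddleware(AgentMiddleware):
    def before_agent(self, state, runtime):
        print("agent run starting")
        return None

    def after_agent(self, state, runtime):
        print("agent run finished")
        return None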
Aiceberg as a middleware
Aiceberg middleware in LangChain acts as an advanced, real-time control layer within the agent execution flow, enabling security, compliance, and observability features for generative and agentic AI systems. By plugging Aiceberg middleware into a LangChain agent, every prompt, tool call, and model response can be inspected and checked for policy violations, risks like prompt injection, and sensitive data leakage before reaching the model or returning to the user.
We use specific hooks for monitoring the respective event types in our custom AicebergMiddleware class.
Hook | When it runs | Event type monitored
before_agent | Before the agent starts (once per invocation) | User → Agent (forward)
after_agent | After the agent completes (up to once per invocation) | Agent → User (backward)
wrap_model_call | Around each model call | Agent ↔ LLM (forward & backward)
wrap_tool_call | Around each tool call | Agent ↔ Tools (forward & backward)
before_model / wrap_tool_call | Before model invocation starts / around the read/write memory tool | Agent ↔ Memory (forward & backward)
wrap_tool_call | Around a sub-agent called as a tool | Agent → Agent
Note: LangChain generally offers two types of hooks: node-style hooks and wrap-style hooks. We use wrap-style hooks, specifically wrap_model_call and wrap_tool_call, to observe the Agent-to-LLM and Agent-to-Tool event types. Wrap-style hooks intercept execution with full control over handler calls (including the ability to block them). See the appendix for a detailed distinction.
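As a sketch of what that control allows (the policy check and safe fallback below are hypothetical placeholders, not part of the integration):
class BlockingSketchMiddleware(AgentMiddleware):
    def wrap_model_call(self, request, handler):
        # Wrap-style hooks sit around the handler, so we decide whether it runs at all.
        if violates_policy(request):        # hypothetical policy check
            return build_safe_response()    # hypothetical safe fallback; no LLM call is made
        response = handler(request)         # normal path: actually call the model
        return response                     # the response could also be inspected or replaced here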
Events Aiceberg Monitors
User-to-Agent (user_agt)
The agent workflow is instrumented with two monitoring hooks — before_agent and after_agent — which capture User-to-Agent (U2A) events. These events represent both the initial user query sent to the agent and the final agent response produced after completing the full agentic reasoning process. Both hooks act as gates in the agent pipeline. If either hook detects a signal or receives a flagged response from Aiceberg, the entire agent flow can be blocked immediately.
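The examples below rely on two helpers that ship with the integration, aiceberg_monitor and handle_aiceberg_flagged_event. Their real implementations live in the aiceberg_middleware module; the following is only a sketch of their shape (the endpoint URL and payload field names are assumptions, while the event_id and event_result response fields are the ones used throughout this guide):
import requests
from langchain_core.messages import AIMessage

AICEBERG_API_URL = "https://api.aiceberg.example/v1/events"  # hypothetical endpoint

def aiceberg_monitor(is_input, content, event_type, link_event_id=None):
    """Send one monitoring event to Aiceberg and return the raw HTTP response."""
    payload = {
        "direction": "input" if is_input else "output",  # field names are illustrative
        "content": content,
        "event_type": event_type,
    }
    if link_event_id:
        payload["link_event_id"] = link_event_id  # correlates a response with its request
    return requests.post(AICEBERG_API_URL, json=payload, timeout=10)

def handle_aiceberg_flagged_event(response):
    """Return a safe state update that ends the run if Aiceberg flagged the event."""
    if response.json().get("event_result") == "flagged":
        return {
            "messages": [AIMessage(content="This request was blocked by policy.")],
            "jump_to": "end",  # permitted because the hooks declare can_jump_to=["end"]
        }
    return None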
Hook: before_agent
Monitors the user query being sent to the agent by forwarding it to Aiceberg. The Aiceberg response contains an event_id that is stored for correlation with the later final agent response.
Example:
class AicebergMiddleware(AgentMiddleware):
    @hook_config(can_jump_to=["end"])
    def before_agent(self, state, runtime):
        print("____________Monitoring U2A event type: U2A forward___________")
        # Extract the user's message from the current conversation state
        msg = state["messages"][0].content
        # Send the user message to Aiceberg for monitoring
        response = aiceberg_monitor(is_input=True, content=msg, event_type="user_agt")
        # Store the event_id so the final response can be linked back to this query
        _event_store.event_id = response.json().get("event_id")
        flagged_result = handle_aiceberg_flagged_event(response)
        # If flagged_result is not None, stop the agent and return the safe response
        if flagged_result:
            return flagged_result
        return None
Hook: after_agent
Monitors the agent’s response after it’s generated, linking it to the earlier user query using the stored event ID. If Aiceberg flags the content, the agent flow is halted or the output is redacted.
Example:
    @hook_config(can_jump_to=["end"])
    def after_agent(self, state, runtime):
        print("___________Monitoring U2A event type: U2A backward___________")
        event_id = getattr(_event_store, "event_id", None)
        # Get the assistant's last message content
        content = state["messages"][-1].content
        # Send the monitoring event to Aiceberg, linked to the original query
        response = aiceberg_monitor(
            is_input=False,
            content=content,
            event_type="agt_user",
            link_event_id=str(event_id)
        )
        # If flagged_result is not None, stop the agent and return the safe response
        flagged_result = handle_aiceberg_flagged_event(response)
        if flagged_result:
            return flagged_result
        return None
Agent-to-LLM (agt_llm)
The agent flow is instrumented with the wrap_model_call hook, which monitors the entire Agent-to-LLM (A2M) interaction lifecycle. This hook captures both the request sent from the agent to the model and the response returned from the model.
Hook: wrap_model_call
Monitors every LLM call initiated by the Agent, as well as the corresponding model response returned from the LLM. Interactions are sanitized (e.g., removing usage parameters) before sending to Aiceberg. If Aiceberg flags content, the call or response can be blocked or replaced with a safe message.
Example (abbreviated):
    def wrap_model_call(
        self,
        request: ModelRequest,
        handler: Callable[[ModelRequest], ModelResponse],
    ) -> ModelResponse:
        """
        Monitors and wraps the model call (Agent ↔ LLM) to track A2M (agent-to-model)
        events via Aiceberg. Logs both forward (request) and backward (response) flows,
        and checks for flagged events using Aiceberg moderation.
        """
        # A2M Forward: Agent → LLM
        print("_____________Monitoring A2M event type: A2M forward___________")
        write_log("A2M_forward", str(request))
        # Convert the request to a dictionary and remove internal/unnecessary fields
        data = request.__dict__.copy()
        for key in ["model", "response_format", "state", "runtime", "model_settings"]:
            data.pop(key, None)
        # Clean message objects before sending to Aiceberg
        cleaned_messages = []
        for msg in data.get("messages", []):
            msg_type = type(msg).__name__
            if msg_type == "HumanMessage":
                cleaned_messages.append({
                    "type": msg_type,
                    "content": safe_get(msg, "content", "")
                })
            elif msg_type == "AIMessage":
                cleaned_messages.append({
                    "type": msg_type,
                    "content": safe_get(msg, "content", ""),
                    "tool_calls": safe_get(msg, "tool_calls", [])
                })
            elif msg_type == "ToolMessage":
                if not isinstance(msg, dict):
                    msg = {k: v for k, v in msg.__dict__.items() if not k.startswith("_")}
                msg["type"] = msg_type
                cleaned_messages.append(msg)
            else:
                cleaned_messages.append({
                    "type": msg_type,
                    "content": safe_get(msg, "content", "")
                })
        data["messages"] = cleaned_messages
        serialized_data = json.dumps(data, default=str, indent=2)
        # Send to Aiceberg for monitoring (forward event)
        aiceberg_response = aiceberg_monitor(
            is_input=True,
            content=str(serialized_data),
            event_type="agt_llm"  # Agent → LLM event
        )
        print("AICEBERG RESPONSE:", aiceberg_response.json())
        # Handle flagged events before the model call
        flagged_result = handle_aiceberg_flagged_event(aiceberg_response)
        if flagged_result:
            # Return a safe placeholder model response if the input was flagged
            return ModelResponse(
                result=[AIMessage(content="I cannot process requests containing inappropriate content. Please rephrase your request.")],
                structured_response=None,
            )
        # Store event_id for backward linking
        event_id = aiceberg_response.json().get("event_id")
        # Proceed with the model call
        response = handler(request)
        # A2M Backward: LLM → Agent
        print("____________Monitoring A2M event type: A2M backward___________")
        minimal_response = []
        for msg in response.result:
            minimal_response.append({
                "content": msg.content,
                "tool_calls": [tc for tc in msg.tool_calls]
            })
        # Send the model output to Aiceberg for a post-check
        aiceberg_response = aiceberg_monitor(
            is_input=False,
            content=str(minimal_response),
            event_type="agt_llm",
            link_event_id=event_id
        )
        # Handle flagged output events
        flagged_result = handle_aiceberg_flagged_event(aiceberg_response)
        if flagged_result:
            return ModelResponse(
                result=[AIMessage(content="I cannot process requests containing inappropriate content. Please rephrase your request.")],
                structured_response=None,
            )
        # Return the original model response if everything is safe
        return response
Agent-to-Tool (agt_tool)
Every tool invocation passes through a monitoring wrapper (wrap_tool_call) that integrates with Aiceberg. Both the tool call and the tool output are tracked and sent to Aiceberg.
Hook: wrap_tool_call
Monitors every tool call made by the Agent and the corresponding tool output. If Aiceberg flags the tool input or output, the middleware can raise an exception and block execution.
Example:
    def wrap_tool_call(
        self,
        request: ToolCallRequest,
        handler: Callable[[ToolCallRequest], ToolMessage | Command],
    ) -> ToolMessage | Command:
        print("___________Monitoring A2T event type: TOOL_CALL___________")
        try:
            # Forward the tool call to Aiceberg for monitoring
            aiceberg_response = aiceberg_monitor(
                is_input=True,
                content=str(request.tool_call),
                event_type="agt_tool"
            )
            event_id = aiceberg_response.json().get("event_id")
            # Raise an exception if flagged by Aiceberg
            if aiceberg_response.json().get("event_result") == "flagged":
                raise RuntimeError("Tool execution blocked: flagged content detected by Aiceberg")
            # Proceed with the tool call if safe
            result = handler(request)
            # Log and send the tool output to Aiceberg for monitoring
            aiceberg_response = aiceberg_monitor(
                is_input=False,
                content=str(result),
                event_type="agt_tool",
                link_event_id=event_id
            )
            if aiceberg_response.json().get("event_result") == "flagged":
                raise RuntimeError("Tool execution blocked: flagged content detected by Aiceberg")
            return result
        except Exception as e:
            print(f"Tool failed: {e}")
            raise
Can the tool call be blocked?
Yes. By raising an exception when Aiceberg flags input/output, the middleware can block a tool call mid-execution.
Why tool blocking is not done by default:
Blocking a tool mid-flight can disrupt the agent’s reasoning flow because the LLM expects tool outputs to continue processing. Without a proper result, you may: (1) send an error message as the tool result (confuses the LLM), (2) send empty/fake data (breaks logic), or (3) stop the entire agent (user gets incomplete response).
Recommended approach:
Log all tool calls and outputs for audit visibility.
If a tool produces unsafe content, handle it after execution (e.g., when the LLM processes the result or before displaying it to the user); see the sketch after this list.
Block tools selectively when the use case demands it (e.g., preventing unsafe DB writes). If blocking, ensure the aftermath is handled properly.
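For reference, a sketch of the "handle it after execution" option inside the middleware class: the tool always runs, and a flagged output is swapped for a neutral, well-formed ToolMessage instead of raising, so the LLM still receives a usable tool result (the placeholder text is illustrative):
class PostCheckToolMiddleware(AgentMiddleware):
    def wrap_tool_call(self, request, handler):
        result = handler(request)  # always execute the tool
        response = aiceberg_monitor(
            is_input=False,
            content=str(result),
            event_type="agt_tool"
        )
        if response.json().get("event_result") == "flagged":
            # Replace the unsafe output rather than interrupting the agent loop
            return ToolMessage(
                content="[tool output withheld by policy]",
                tool_call_id=request.tool_call["id"],
            )
        return result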
Agent-to-Memory (agt_mem)
LangChain’s agent state manages short-term memory, enabling the agent to retain and access context across conversation turns. The state is persisted with a checkpointer and updated automatically. You can monitor or manipulate this short-term memory using hooks or tools.
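For example, attaching a checkpointer when creating the agent persists this state across turns of the same thread (a minimal sketch using the InMemorySaver that also appears in Quick setup; the model and tools are placeholders):
from langgraph.checkpoint.memory import InMemorySaver
from langchain.agents import create_agent

agent = create_agent(
    model="gpt-4.1-mini",
    tools=[],
    middleware=[AicebergMiddleware()],
    checkpointer=InMemorySaver(),  # short-term memory survives across turns of a thread
)

# Reusing the same thread_id on the next invocation restores the persisted state.
config = {"configurable": {"thread_id": "1"}}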
1. Accessing Short-Term Memory with @before_model
Use the @before_model hook to inspect/modify the agent's memory before the model is called. The state (including messages) is passed to this hook.
Example — trimming messages before model invocation:
@before_model
def trim_messages(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    """Keep only the last few messages to fit the model's context window."""
    messages = state["messages"]
    # Only trim if there are more than 3 messages
    if len(messages) <= 3:
        return None  # No changes needed
    first_msg = messages[0]
    # Keep the last few messages for context
    recent_messages = messages[-3:] if len(messages) % 2 == 0 else messages[-4:]
    new_messages = [first_msg] + recent_messages
    # Return the updated message list to overwrite memory
    return {
        "messages": [
            RemoveMessage(id=REMOVE_ALL_MESSAGES),
            *new_messages
        ]
    }
Example — monitoring all conversation messages:
    @hook_config(can_jump_to=["end"])
    def before_model(self, state, runtime):
        print("____________Monitoring All Conversation Messages as short term memory ___________")
        # Combine the contents of every message into a single string
        all_messages = "\n".join(msg.content for msg in state["messages"])
        # Send the combined message history to Aiceberg for monitoring
        response = aiceberg_monitor(
            is_input=True,
            content=all_messages,
            event_type="agt_mem"
        )
        _event_store.event_id = response.json().get("event_id")
        flagged_result = handle_aiceberg_flagged_event(response)
        # If flagged_result is not None, stop the model step and return the safe response
        if flagged_result:
            return flagged_result
        return None
Aiceberg can:
Inspect short-term memory before the model uses it.
Detect sensitive or policy-violating data.
Detect and redact sensitive info (PII) before it reaches the model.
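As a sketch of the redaction option (the regex and masking logic are illustrative; the hook overwrites short-term memory by returning RemoveMessage(id=REMOVE_ALL_MESSAGES) plus the rewritten history, as in the trimming example above):
import re
from langchain.agents.middleware import before_model
from langchain_core.messages import RemoveMessage
from langgraph.graph.message import REMOVE_ALL_MESSAGES

EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+\.\w[\w.]*")  # illustrative PII pattern

@before_model
def redact_pii(state, runtime):
    """Mask email addresses in the message history before the model sees them."""
    redacted, changed = [], False
    for msg in state["messages"]:
        text = msg.content if isinstance(msg.content, str) else ""
        if text and EMAIL_RE.search(text):
            msg = msg.model_copy(update={"content": EMAIL_RE.sub("[REDACTED]", text)})
            changed = True
        redacted.append(msg)
    if not changed:
        return None  # leave memory untouched
    # Overwrite the stored history with the redacted copy
    return {"messages": [RemoveMessage(id=REMOVE_ALL_MESSAGES), *redacted]}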
2. Accessing Short-Term Memory via Tools
Tools can read/write short-term memory via runtime.state.
Example — reading from short-term memory:
@tool
def get_user_info(runtime: ToolRuntime) -> str:
    """Retrieve user information from short-term memory."""
    user_id = runtime.state["user_id"]
    return "User is John Smith" if user_id == "user_123" else "Unknown user"
Example — writing to short-term memory:
@tool
def update_user_info(runtime: ToolRuntime[CustomContext, CustomState]) -> Command:
    """Update user info and append to the message history."""
    user_id = runtime.context.user_id
    name = "John Smith" if user_id == "user_123" else "Unknown user"
    return Command(update={
        "user_name": name,
        # Update the short-term message history
        "messages": [
            ToolMessage(
                "Successfully looked up user information",
                tool_call_id=runtime.tool_call_id
            )
        ]
    })
Monitoring via wrap_tool_call:
Aiceberg integrates with wrap_tool_call to monitor the inputs and outputs of tool calls, which includes data read from and written to memory. This provides visibility into state modifications and allows detection of sensitive data exposure.
Agent-to-Agent
Agent-to-agent communication occurs through two patterns: Tool Calling (supervisor agent calls sub-agents as tools) and Handoffs (control passes directly to another agent).
Tool Calling
A sub-agent can be called as a tool. The wrap_tool_call hook monitors the incoming request (what is passed to the sub-agent) and the returned response, enabling policy enforcement on the data exchanged.
Example of calling a sub-agent via a tool:
from langchain.tools import tool
from langchain.agents import create_agent
subagent1 = create_agent(model="...", tools=[...])
@tool(
"subagent1_name",
description="subagent1_description"
)
def call_subagent1(query: str):
result = subagent1.invoke({
"messages": [{"role": "user", "content": query}]
})
return result["messages"][-1].content
agent = create_agent(model="...", tools=[call_subagent1])Handoffs
Handoffs refer to passing control and state to another agent. The full implementation is still evolving in LangChain, but conceptual docs and LangGraph components indicate handoffs use a Command object to transfer control and state. Because state is updated and available to the before_model hook, Aiceberg can monitor or redact information passed during handoffs via that hook.
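A conceptual sketch of a handoff tool, based on the LangGraph Command pattern (the target agent name and message text are illustrative, and Command / Command.PARENT come from langgraph.types). Because the Command's update lands in shared state, the before_model monitoring hook shown earlier can observe it on the receiving agent's next model call:
from langchain.tools import tool, ToolRuntime
from langchain_core.messages import ToolMessage
from langgraph.types import Command

@tool
def transfer_to_billing_agent(runtime: ToolRuntime) -> Command:
    """Hand the conversation off to the billing agent."""
    return Command(
        goto="billing_agent",  # illustrative target agent/node name
        update={
            "messages": [
                ToolMessage(
                    "Transferred to billing agent",
                    tool_call_id=runtime.tool_call_id,
                )
            ]
        },
        graph=Command.PARENT,  # route the handoff in the parent graph
    )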
Quick setup
Register the Aiceberg middleware
Register AicebergMiddleware in your LangChain Agent. Example Weather Agent:
import json
import os
import re
import threading
from dataclasses import asdict, dataclass
from typing import Any, Callable

import requests
from langchain.agents import create_agent
from langchain.agents.middleware import AgentMiddleware, AgentState, ModelRequest, ModelResponse
from langchain.chat_models import init_chat_model
from langchain.tools import tool, ToolRuntime
from langchain.tools.tool_node import ToolCallRequest
from langchain_core.messages import ToolMessage
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.runtime import Runtime
from langgraph.types import Command

from aiceberg_middleware import *
from monitoring import write_log

SYSTEM_PROMPT = """You are an expert weather forecaster, who speaks in puns.
You have access to two tools:
- get_weather_for_location: use this to get the weather for a specific location
- get_user_location: use this to get the user's location
If a user asks you for the weather, make sure you know the location. If you can tell from the question that they mean wherever they are, use the get_user_location tool to find their location."""

@tool
def get_weather_for_location(city: str) -> str:
    """Get weather for a given city."""
    return f"It's always sunny in {city}!"

@dataclass
class Context:
    """Custom runtime context schema."""
    user_id: str

@tool
def get_user_location(runtime: ToolRuntime[Context, Any]) -> str:
    """Retrieve user information based on user ID."""
    user_id = runtime.context.user_id
    return "Florida" if user_id == "1" else "SF"

model = init_chat_model(
    "gpt-4.1-mini",
    temperature=0.5,
    timeout=10,
    max_tokens=1000
)

agent = create_agent(
    model=model,
    system_prompt=SYSTEM_PROMPT,
    tools=[get_user_location, get_weather_for_location],
    context_schema=Context,
    middleware=[AicebergMiddleware()],
    # checkpointer=InMemorySaver(),
)

# `thread_id` is a unique identifier for a given conversation.
config = {"configurable": {"thread_id": "1"}}

response = agent.invoke(
    {"messages": [{"role": "user", "content": "what is the weather outside?"}]},
    config=config,
    context=Context(user_id="abc")
)
Run the agent and observe events on the Aiceberg dashboard.
Appendix