Auto-instrumentation lets the SDK automatically capture LLM calls without modifying your agent logic. There are two approaches: instance-level patching with `watch()`, and class-level patching with instrumentors.
## watch() — Instance-Level Patching

`watch()` monkey-patches a single LLM client instance. Only that specific object is instrumented:
```python
from openai import OpenAI
from infinium.integrations import watch

openai = watch(OpenAI())  # This instance is patched
other = OpenAI()          # This instance is NOT patched
```
`watch()` returns the same client instance, so you can wrap construction and assignment in one line:
```python
openai = watch(OpenAI(api_key="..."), capture_content=True)
```
### Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| `client` | LLM client | required | An LLM provider client instance |
| `capture_content` | `bool` | `False` | Capture input/output previews (truncated to 500 chars) |
### Supported Providers

| Provider | Sync Client | Async Client | Patched Method |
|---|---|---|---|
| OpenAI | `openai.OpenAI` | `openai.AsyncOpenAI` | `chat.completions.create` |
| Anthropic | `anthropic.Anthropic` | `anthropic.AsyncAnthropic` | `messages.create` |
| Google Gemini | `generativeai.GenerativeModel` | same | `generate_content` / `generate_content_async` |
| xAI (Grok) | `openai.OpenAI(base_url=...)` | `openai.AsyncOpenAI(base_url=...)` | `chat.completions.create` |
xAI/Grok is auto-detected when the OpenAI client's `base_url` contains `x.ai`, `grok`, or `xai`.
### What Gets Captured

With `capture_content=False` (the default), the following fields are recorded:
| Field | Description |
|---|---|
| `provider` | `"openai"`, `"anthropic"`, `"google"`, or `"xai"` |
| `model` | Model name from the API call |
| `prompt_tokens` | Input token count |
| `completion_tokens` | Output token count |
| `temperature` | Temperature parameter (if provided) |
| `latency_ns` | Call duration in nanoseconds |
| `error_type` | Exception class name (on failure) |
| `error_message` | Exception message (on failure) |
With `capture_content=True`, the following are additionally captured:

| Field | Description |
|---|---|
| `input_preview` | Last 2 messages, truncated to 500 chars |
| `output_preview` | Response text, truncated to 500 chars |
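Taken together, a captured call can be pictured as a simple record. The dataclass below mirrors the fields in the tables above as a sketch only; the SDK's actual class layout may differ:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class CapturedLlmCall:
    # Illustrative shape: field names follow the capture tables above,
    # but this is not the SDK's real class definition.
    provider: str
    model: str
    prompt_tokens: int
    completion_tokens: int
    latency_ns: int
    temperature: Optional[float] = None
    error_type: Optional[str] = None
    error_message: Optional[str] = None
    input_preview: Optional[str] = None   # only with capture_content=True
    output_preview: Optional[str] = None  # only with capture_content=True
```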
### Privacy

Content capture is opt-in to protect sensitive data: previews are recorded only when you pass `capture_content=True`, and they are always truncated to 500 characters. Enable it only when you need to inspect what was sent to and from the LLM.
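The truncation guarantee can be sketched as a one-line helper (`make_preview` and `PREVIEW_LIMIT` are hypothetical names for illustration, not SDK functions):

```python
PREVIEW_LIMIT = 500  # matches the 500-character truncation described above

def make_preview(text: str, limit: int = PREVIEW_LIMIT) -> str:
    # Hypothetical helper: previews never exceed the limit, so full
    # prompts and responses are never stored verbatim.
    return text[:limit]
```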
### Streaming Support

All providers handle streaming transparently. When you pass `stream=True`, the SDK wraps the response iterator to accumulate chunks and extract token counts:
```python
openai = watch(OpenAI())

# Streaming works exactly the same -- no code changes needed
response = openai.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Tell me a story"}],
    stream=True,
)

for chunk in response:
    print(chunk.choices[0].delta.content or "", end="")

# Tokens and latency are captured when the stream completes
```
The stream wrapper:

- Returns a wrapper object that mimics the original iterator
- Accumulates chunks to extract total token counts from the final chunk
- Records latency from start to end of stream consumption
- Uses a `_finalised` guard to prevent double-recording when the iterator is fully consumed and then `__exit__` is called
- Async streams (`async for chunk in response:`) work identically
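The pattern described above can be sketched as a minimal wrapper class. This is an illustration of the technique, assuming a simple `on_complete` callback, and is not the SDK's actual implementation:

```python
import time

class StreamWrapper:
    """Sketch of the stream-wrapping pattern: forward chunks unchanged,
    then record usage exactly once when the stream ends."""

    def __init__(self, inner, on_complete):
        self._inner = iter(inner)
        self._on_complete = on_complete  # hypothetical recording callback
        self._chunks = []
        self._start = time.monotonic_ns()
        self._finalised = False

    def __iter__(self):
        return self

    def __next__(self):
        try:
            chunk = next(self._inner)
        except StopIteration:
            self._finalise()  # stream fully consumed
            raise
        self._chunks.append(chunk)
        return chunk

    def _finalise(self):
        if self._finalised:  # guard against double-recording
            return
        self._finalised = True
        latency_ns = time.monotonic_ns() - self._start
        self._on_complete(self._chunks, latency_ns)

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self._finalise()  # no-op if the iterator already finalised
        return False
```

Consuming the iterator and then exiting a `with` block records usage only once, because the `_finalised` guard makes the second call a no-op.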
## Instrumentors — Class-Level Patching

Instrumentors patch at the class level, affecting all instances created after `instrument()` is called:
```python
from infinium.integrations import OpenAIInstrumentor, AnthropicInstrumentor, GoogleInstrumentor

# All OpenAI clients created after this are instrumented
OpenAIInstrumentor(capture_content=False).instrument()

# All Anthropic clients created after this are instrumented
AnthropicInstrumentor(capture_content=True).instrument()

# All Gemini models created after this are instrumented
GoogleInstrumentor().instrument()
```
### Reverting

Call `uninstrument()` to restore the original methods:
```python
instrumentor = OpenAIInstrumentor()
instrumentor.instrument()

# ... later ...
instrumentor.uninstrument()  # Restores the original chat.completions.create
```
## Instance vs Class-Level

| | `watch(client)` | `XxxInstrumentor().instrument()` |
|---|---|---|
| Scope | Single instance | All future instances |
| Explicit | Yes — you see which client is patched | No — implicit global effect |
| Reversible | Replace the variable | Call `uninstrument()` |
| Best for | Most use cases | Framework initialization, middleware |
**Recommendation:** Use `watch()` unless you have a specific reason to patch globally.
## Combining with Traces

Auto-captured calls are stored in a `TraceContext` (backed by `contextvars.ContextVar`). They are automatically incorporated into the active trace when the call happens inside `@trace_agent`, `@async_trace_agent`, or `@client.trace()`:
```python
from openai import OpenAI
from infinium import InfiniumClient
from infinium.integrations import watch

client = InfiniumClient(agent_id="...", agent_secret="...")
openai = watch(OpenAI())

@client.trace("Summarize Article")
def summarize(article: str) -> str:
    # This LLM call is auto-captured into the active trace
    resp = openai.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Summarize in 3 bullets."},
            {"role": "user", "content": article},
        ],
    )
    return resp.choices[0].message.content

# The trace includes the LLM call with model, tokens, and latency
result = summarize("The Federal Reserve announced...")
```
### How It Works

- `@client.trace()` opens a `TraceContext` and stores it in a `ContextVar`
- `watch()`-patched methods check for an active `TraceContext`
- If one exists, they record a `CapturedLlmCall` into it
- When the decorated function returns, `TraceBuilder._incorporate_captured_calls()` converts captured calls into `ExecutionStep` objects and aggregates `LlmUsage`
- The trace is auto-sent to the API
This is async-safe and thread-safe because `contextvars` provides per-task isolation.
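The per-task isolation can be demonstrated with plain `contextvars` and `asyncio`, independent of the SDK. Each task sees its own copy of the variable, so concurrent traces never mix:

```python
import asyncio
import contextvars

# Each asyncio task runs in a copy of the current context, so a
# ContextVar set inside one task is invisible to the others.
trace_ctx: contextvars.ContextVar = contextvars.ContextVar("trace_ctx")

async def record(name: str) -> list:
    trace_ctx.set([])             # this task's independent call list
    trace_ctx.get().append(name)  # simulated captured LLM call
    await asyncio.sleep(0)        # yield so the tasks interleave
    return trace_ctx.get()

async def main():
    return await asyncio.gather(record("a"), record("b"))

results = asyncio.run(main())
```

Despite interleaving, each task's list contains only its own entry.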
## Provider-Specific Notes

### OpenAI
```python
from openai import OpenAI

openai = watch(OpenAI(api_key="sk-..."))

response = openai.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello"}],
)
```
Token counts are extracted from `response.usage.prompt_tokens` and `response.usage.completion_tokens`.
### Anthropic
```python
from anthropic import Anthropic

anthropic = watch(Anthropic(api_key="sk-ant-..."))

response = anthropic.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}],
)
```
Token counts are extracted from `response.usage.input_tokens` and `response.usage.output_tokens`. Streaming parses `message_start`, `message_delta`, and `content_block_delta` events.
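The event-parsing step can be sketched as follows. In Anthropic's streaming format, input tokens arrive on `message_start` and output tokens on `message_delta`; the events below are simplified dicts rather than the real SDK's typed event objects, and `accumulate_usage` is a hypothetical name:

```python
def accumulate_usage(events):
    # Sketch of the accumulation logic described above: read input tokens
    # from message_start, output tokens from message_delta, and ignore
    # content_block_delta (text chunks carry no usage data).
    input_tokens = output_tokens = 0
    for event in events:
        if event["type"] == "message_start":
            input_tokens = event["message"]["usage"]["input_tokens"]
        elif event["type"] == "message_delta":
            output_tokens = event["usage"]["output_tokens"]
    return input_tokens, output_tokens
```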
### Google Gemini
```python
import google.generativeai as genai

model = watch(genai.GenerativeModel("gemini-2.0-flash"))
response = model.generate_content("Explain quantum computing")
```
Token counts are extracted from `response.usage_metadata`. The model name is read from the `GenerativeModel` instance.
### xAI (Grok)
```python
from openai import OpenAI

xai = watch(OpenAI(base_url="https://api.x.ai/v1", api_key="xai-..."))

response = xai.chat.completions.create(
    model="grok-2",
    messages=[{"role": "user", "content": "Hello"}],
)
```
xAI uses the OpenAI SDK with a custom `base_url`. The SDK detects this automatically and records the provider as `"xai"`.