Auto-instrumentation lets the SDK automatically capture LLM calls without modifying your agent logic. There are two approaches: instance-level patching with watch(), and class-level patching with instrumentors.

watch() — Instance-Level Patching

watch() monkey-patches a single LLM client instance. Only that specific object is instrumented:

from openai import OpenAI
from infinium.integrations import watch

openai = watch(OpenAI())  # This instance is patched
other = OpenAI()           # This instance is NOT patched

watch() returns the same client object, so you can wrap construction inline:

openai = watch(OpenAI(api_key="..."), capture_content=True)

Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| `client` | LLM client | required | An LLM provider client instance |
| `capture_content` | `bool` | `False` | Capture input/output previews (truncated to 500 chars) |

Supported Providers

| Provider | Sync Client | Async Client | Patched Method |
|---|---|---|---|
| OpenAI | `openai.OpenAI` | `openai.AsyncOpenAI` | `chat.completions.create` |
| Anthropic | `anthropic.Anthropic` | `anthropic.AsyncAnthropic` | `messages.create` |
| Google Gemini | `generativeai.GenerativeModel` | same | `generate_content` / `generate_content_async` |
| xAI (Grok) | `openai.OpenAI(base_url=...)` | `openai.AsyncOpenAI(base_url=...)` | `chat.completions.create` |

xAI/Grok is auto-detected when the OpenAI client’s base_url contains x.ai, grok, or xai.
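The detection rule above can be sketched as a small pure function. This is an illustration of the documented behavior, not the SDK's actual internal code; the function name `detect_provider` is hypothetical:

```python
def detect_provider(base_url):
    """Return "xai" when the base_url looks like an xAI endpoint,
    otherwise treat the client as plain OpenAI.

    Mirrors the documented rule: the provider is recorded as "xai"
    when base_url contains "x.ai", "grok", or "xai".
    """
    if base_url and any(marker in base_url.lower() for marker in ("x.ai", "grok", "xai")):
        return "xai"
    return "openai"
```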

What Gets Captured

With capture_content=False (default):

| Field | Description |
|---|---|
| `provider` | `"openai"`, `"anthropic"`, `"google"`, `"xai"` |
| `model` | Model name from the API call |
| `prompt_tokens` | Input token count |
| `completion_tokens` | Output token count |
| `temperature` | Temperature parameter (if provided) |
| `latency_ns` | Call duration in nanoseconds |
| `error_type` | Exception class name (on failure) |
| `error_message` | Exception message (on failure) |

With capture_content=True, additionally:

| Field | Description |
|---|---|
| `input_preview` | Last 2 messages, truncated to 500 chars |
| `output_preview` | Response text, truncated to 500 chars |

Privacy

Content capture is opt-in (capture_content defaults to False) to protect sensitive data. Only enable it when you need to see what was sent to and from the LLM. Previews are always truncated to 500 characters.
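The preview rules above (last two messages, 500-character cap) can be approximated with a short helper. This is a sketch of the documented behavior under assumed formatting, not the SDK's actual implementation:

```python
def input_preview(messages, limit=500):
    """Build a preview of chat messages: keep only the last two,
    render them as "role: content", and truncate to `limit` chars.
    The "role: content | ..." layout is an assumption for illustration.
    """
    tail = messages[-2:]
    joined = " | ".join(f"{m['role']}: {m['content']}" for m in tail)
    return joined[:limit]
```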


Streaming Support

All providers handle streaming transparently. When you pass stream=True, the SDK wraps the response iterator to accumulate chunks and extract token counts:

openai = watch(OpenAI())

# Streaming works exactly the same -- no code changes needed
response = openai.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Tell me a story"}],
    stream=True,
)

for chunk in response:
    print(chunk.choices[0].delta.content or "", end="")

# Tokens and latency are captured when the stream completes

The stream wrapper:

  • Returns a wrapper object that mimics the original iterator
  • Accumulates chunks to extract total token counts from the final chunk
  • Records latency from start to end of stream consumption
  • Uses a _finalised guard to prevent double-recording when the iterator is fully consumed and then __exit__ is called
  • Async streams (async for chunk in response:) work identically
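The bullets above can be sketched as a minimal iterator wrapper. This is an assumed shape, not the SDK's actual class; `on_complete` stands in for whatever recording hook the SDK uses internally:

```python
import time


class StreamWrapper:
    """Minimal sketch of a stream wrapper: mimics the original
    iterator, accumulates chunks, records latency once."""

    def __init__(self, inner, on_complete):
        self._inner = iter(inner)
        self._on_complete = on_complete  # called once with (chunks, latency_ns)
        self._chunks = []
        self._start = time.perf_counter_ns()
        self._finalised = False

    def __iter__(self):
        return self

    def __next__(self):
        try:
            chunk = next(self._inner)
        except StopIteration:
            self._finalise()  # stream fully consumed
            raise
        self._chunks.append(chunk)
        return chunk

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        # Safe even after full consumption: the guard below prevents
        # double-recording.
        self._finalise()
        return False

    def _finalise(self):
        if self._finalised:
            return
        self._finalised = True
        self._on_complete(self._chunks, time.perf_counter_ns() - self._start)
```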

Instrumentors — Class-Level Patching

Instrumentors patch at the class level, affecting all instances created after instrument() is called:

from infinium.integrations import OpenAIInstrumentor, AnthropicInstrumentor, GoogleInstrumentor

# All OpenAI clients created after this are instrumented
OpenAIInstrumentor(capture_content=False).instrument()

# All Anthropic clients created after this are instrumented
AnthropicInstrumentor(capture_content=True).instrument()

# All Gemini models created after this are instrumented
GoogleInstrumentor().instrument()

Reverting

Call uninstrument() to restore the original methods:

instrumentor = OpenAIInstrumentor()
instrumentor.instrument()

# ... later ...
instrumentor.uninstrument()  # Restores original chat.completions.create
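If you want the patching to be scoped and exception-safe, you can wrap the instrument/uninstrument pair in a context manager. This helper is not part of the SDK; it is a small convenience sketch over the documented instrument()/uninstrument() API:

```python
from contextlib import contextmanager


@contextmanager
def instrumented(instrumentor):
    """Hypothetical helper: scope class-level patching so that
    uninstrument() always runs, even if the body raises."""
    instrumentor.instrument()
    try:
        yield instrumentor
    finally:
        instrumentor.uninstrument()
```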

Instance vs Class-Level

| | `watch(client)` | `XxxInstrumentor().instrument()` |
|---|---|---|
| Scope | Single instance | All future instances |
| Explicit | Yes: you see which client is patched | No: implicit global effect |
| Reversible | Replace the variable | Call `uninstrument()` |
| Best for | Most use cases | Framework initialization, middleware |

Recommendation: Use watch() unless you have a specific reason to patch globally.


Combining with Traces

Auto-captured calls are stored in a TraceContext (backed by contextvars.ContextVar). They’re automatically incorporated when used inside @trace_agent, @async_trace_agent, or @client.trace():

from openai import OpenAI
from infinium import InfiniumClient
from infinium.integrations import watch

client = InfiniumClient(agent_id="...", agent_secret="...")
openai = watch(OpenAI())

@client.trace("Summarize Article")
def summarize(article: str) -> str:
    # This LLM call is auto-captured into the active trace
    resp = openai.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Summarize in 3 bullets."},
            {"role": "user", "content": article},
        ],
    )
    return resp.choices[0].message.content

# The trace includes the LLM call with model, tokens, and latency
result = summarize("The Federal Reserve announced...")

How It Works

  1. @client.trace() opens a TraceContext and stores it in a ContextVar
  2. watch()-patched methods check for an active TraceContext
  3. If one exists, they record a CapturedLlmCall into it
  4. When the decorated function returns, TraceBuilder._incorporate_captured_calls() converts captured calls into ExecutionStep objects and aggregates LlmUsage
  5. The trace is auto-sent to the API

This is async-safe and thread-safe because contextvars provides per-task isolation.


Provider-Specific Notes

OpenAI

from openai import OpenAI
openai = watch(OpenAI(api_key="sk-..."))

response = openai.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello"}],
)

Token counts are extracted from response.usage.prompt_tokens and response.usage.completion_tokens.

Anthropic

from anthropic import Anthropic
anthropic = watch(Anthropic(api_key="sk-ant-..."))

response = anthropic.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}],
)

Token counts are extracted from response.usage.input_tokens and response.usage.output_tokens. Streaming parses message_start, message_delta, and content_block_delta events.

Google Gemini

import google.generativeai as genai
model = watch(genai.GenerativeModel("gemini-2.0-flash"))

response = model.generate_content("Explain quantum computing")

Token counts are extracted from response.usage_metadata. The model name is read from the GenerativeModel instance.
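The per-provider usage fields named in these notes differ only in naming, so extraction reduces to a small dispatch. This is a sketch over plain dicts; the SDK reads the same fields off real response objects, and the Gemini field names (`prompt_token_count`, `candidates_token_count`) are an assumption based on the usage_metadata shape of google.generativeai:

```python
def extract_usage(provider, usage):
    """Return (input_tokens, output_tokens) from a provider-specific
    usage mapping, using the field names documented above."""
    if provider == "anthropic":
        return usage.get("input_tokens"), usage.get("output_tokens")
    if provider == "google":
        # Assumed field names from Gemini's response.usage_metadata.
        return usage.get("prompt_token_count"), usage.get("candidates_token_count")
    # "openai" and "xai" share the OpenAI response shape.
    return usage.get("prompt_tokens"), usage.get("completion_tokens")
```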

xAI (Grok)

from openai import OpenAI
xai = watch(OpenAI(base_url="https://api.x.ai/v1", api_key="xai-..."))

response = xai.chat.completions.create(
    model="grok-2",
    messages=[{"role": "user", "content": "Hello"}],
)

xAI uses the OpenAI SDK with a custom base_url. The SDK detects this automatically and records the provider as "xai".