The Infinium SDK provides full async support through AsyncInfiniumClient. Every method available on the sync client has an async equivalent.
AsyncInfiniumClient
import asyncio
from infinium import AsyncInfiniumClient

async def main():
    client = AsyncInfiniumClient(
        agent_id="your-agent-id",
        agent_secret="your-agent-secret",
    )
    await client.send_task(
        name="Moderate content",
        description="Check user content for policy violations.",
        duration=1.8,
    )
    await client.close()

asyncio.run(main())
Context Manager
Use async with for automatic cleanup:
async with AsyncInfiniumClient(agent_id="...", agent_secret="...") as client:
    await client.send_task(name="...", description="...", duration=1.0)
    # client.close() is called automatically
Async Trace Sending
send_task()
response = await client.send_task(
    name="Summarize document",
    description="Generated a 2-sentence summary.",
    duration=2.1,
    llm_usage={"model": "gpt-4o", "prompt_tokens": 500, "completion_tokens": 80},
)
send_task_data()
td = AsyncInfiniumClient.create_task_data(
    name="Research task",
    description="Multi-step research pipeline.",
    duration=5.4,
    steps=steps,
)
response = await client.send_task_data(td)
send_tasks_batch()
The async batch method supports a fail_fast parameter:
result = await client.send_tasks_batch(
    tasks,
    max_concurrent=5,
    fail_fast=True,  # Stop on first error (async only)
)
print(f"Sent {result.successful}/{result.successful + result.failed}")
Async Decorators
@client.trace() — Auto-Detects
@client.trace("Async Classifier")
async def classify(text: str) -> dict:
    resp = await async_openai.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": text}],
    )
    return json.loads(resp.choices[0].message.content)
@async_trace_agent — Explicit
from infinium import async_trace_agent

@async_trace_agent("Content Moderator", client)
async def moderate(content: str) -> str:
    resp = await async_anthropic.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=256,
        messages=[{"role": "user", "content": content}],
    )
    return resp.content[0].text
Async with watch()
watch() supports async LLM clients:
from openai import AsyncOpenAI
from anthropic import AsyncAnthropic
from infinium.integrations import watch

async_openai = watch(AsyncOpenAI())
async_anthropic = watch(AsyncAnthropic())

@client.trace("Multi-Provider Agent")
async def process(query: str) -> str:
    # Both calls are auto-captured
    resp1 = await async_openai.chat.completions.create(
        model="gpt-4o", messages=[{"role": "user", "content": query}],
    )
    resp2 = await async_anthropic.messages.create(
        model="claude-3-5-sonnet-20241022", max_tokens=1024,
        messages=[{"role": "user", "content": query}],
    )
    return f"OpenAI: {resp1.choices[0].message.content}\nAnthropic: {resp2.content[0].text}"
Async TraceBuilder
TraceBuilder works the same in async contexts. The step() context manager is synchronous (uses with, not async with), but you can await inside it:
from infinium import TraceBuilder

trace = TraceBuilder("Async Research", "Research with async LLM calls")

with trace.step("llm_inference", "Analyze data") as step:
    resp = await async_openai.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": data}],
    )
    step.set_output(resp.choices[0].message.content[:500])
    step.record_llm_call(
        "gpt-4o",
        prompt_tokens=resp.usage.prompt_tokens,
        completion_tokens=resp.usage.completion_tokens,
    )

task_data = trace.build()
await client.send_task_data(task_data)
Async Maestro Polling
response = await client.send_task_data(task_data)
trace_id = response.data.get("traceId")

interpretation = await client.wait_for_interpretation(
    trace_id, timeout=120, poll_interval=3,
)
print(interpretation.data["interpretedTraceResult"])
Async Prompt Studio
prompt = await client.get_prompt(
    prompt_id="...",
    prompt_key="...",
    variables={"tone": "professional"},
)