Asynchronous Execution¶
OpenTrace v0.2 introduces comprehensive asynchronous execution support, enabling high-performance parallel processing and concurrent optimization workflows.
🚀 Overview¶
Asynchronous execution in OpenTrace v0.2 provides:
- Concurrent Operations: Run multiple optimizations simultaneously
- Non-blocking Execution: Continue processing while waiting for results
- Scalable Performance: Handle thousands of concurrent requests
- Resource Efficiency: Better CPU and memory utilization
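To make the difference between blocking and non-blocking execution concrete, here is a minimal sketch that uses only the standard-library `asyncio` module, not OpenTrace itself. `fake_llm_call` is a hypothetical stand-in for any I/O-bound call, and the 0.05-second delay is an arbitrary choice for illustration.

```python
import asyncio
import time


async def fake_llm_call(query: str) -> str:
    """Hypothetical stand-in for an I/O-bound call such as an LLM request."""
    await asyncio.sleep(0.05)  # simulated network latency
    return f"answer to {query}"


async def run_sequentially(queries: list[str]) -> list[str]:
    # Each call waits for the previous one to finish.
    return [await fake_llm_call(q) for q in queries]


async def run_concurrently(queries: list[str]) -> list[str]:
    # All calls are in flight at once; gather preserves input order.
    return await asyncio.gather(*(fake_llm_call(q) for q in queries))


def timed(coro) -> tuple[list[str], float]:
    """Run a coroutine to completion and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start


queries = [f"q{i}" for i in range(5)]
sequential_result, sequential_time = timed(run_sequentially(queries))
concurrent_result, concurrent_time = timed(run_concurrently(queries))
print(f"sequential: {sequential_time:.2f}s, concurrent: {concurrent_time:.2f}s")
```

Both versions return the same answers in the same order. The concurrent version finishes sooner because the simulated waits overlap instead of adding up.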
⚡ Key Features¶
Async Trace Decorators¶
Transform any function into an asynchronous trace:
import asyncio
import opto
@opto.trace.async_trace
async def async_agent(query: str) -> str:
    # Async LLM call (`llm` is assumed to be an async-capable client defined elsewhere)
    response = await llm.acomplete(query)
    return response
# Usage
async def main():
    result = await async_agent("What is machine learning?")
    print(result)
asyncio.run(main())
Parallel Optimization¶
Run multiple optimization tasks concurrently:
import asyncio
from opto import trace
@trace.optimize(strategy="async_beam_search")
async def my_workflow(input_data):
    # Your async workflow
    results = await process_async(input_data)
    return results
# Parallel execution
async def optimize_multiple(inputs):
    # One coroutine per input; gather runs them concurrently
    # and returns the results in input order
    tasks = [my_workflow(data) for data in inputs]
    results = await asyncio.gather(*tasks)
    return results
Async Batch Processing¶
Process large datasets efficiently with async batching:
from opto.trace import AsyncBatchProcessor
async def process_batch(data_stream):
    processor = AsyncBatchProcessor(
        batch_size=100,
        max_concurrent=10
    )
    async for batch_result in processor.process(data_stream):
        # Process results as they arrive
        await handle_batch_result(batch_result)
🔄 Concurrency Patterns¶
Producer-Consumer Pattern¶
import asyncio
from opto.trace import AsyncQueue
async def producer(queue, num_consumers):
    for i in range(100):
        task = create_optimization_task(i)
        await queue.put(task)
    # One sentinel per consumer; with a single sentinel, only one
    # consumer would stop and the others would wait forever
    for _ in range(num_consumers):
        await queue.put(None)
async def consumer(queue):
    while True:
        task = await queue.get()
        if task is None:
            break
        result = await optimize_task(task)
        await process_result(result)
async def main():
    queue = AsyncQueue(maxsize=10)
    await asyncio.gather(
        producer(queue, num_consumers=2),
        consumer(queue),
        consumer(queue),  # Multiple consumers
    )
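As a runnable point of comparison, the same pattern can be sketched with the standard-library `asyncio.Queue`. This is an illustration under stated assumptions, not the OpenTrace `AsyncQueue` API: the squaring step is a hypothetical placeholder for real optimization work, and shutdown uses `queue.join()` plus task cancellation rather than sentinel values.

```python
import asyncio


async def producer(queue: asyncio.Queue, n_items: int) -> None:
    for i in range(n_items):
        await queue.put(i)  # blocks while the queue is full (backpressure)


async def consumer(queue: asyncio.Queue, results: list[int]) -> None:
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0)  # placeholder for real async work
            results.append(item * item)
        finally:
            queue.task_done()  # lets queue.join() know this item is finished


async def run_pipeline(n_items: int = 20, n_consumers: int = 3) -> list[int]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=5)
    results: list[int] = []
    workers = [
        asyncio.create_task(consumer(queue, results)) for _ in range(n_consumers)
    ]
    await producer(queue, n_items)
    await queue.join()  # wait until every queued item has been processed
    for worker in workers:
        worker.cancel()  # consumers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(results)


squares = asyncio.run(run_pipeline())
print(squares)
```

Waiting on `queue.join()` avoids having to match the number of sentinels to the number of consumers, at the cost of an explicit cancellation step once the queue is drained.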
Streaming Results¶
Process optimization results as they become available:
import opto
@opto.trace.async_stream
async def streaming_optimizer(data_stream):
    async for data_chunk in data_stream:
        result = await optimize_chunk(data_chunk)
        yield result
# Usage (`async for` must run inside a coroutine)
async def main():
    async for result in streaming_optimizer(data_stream):
        await handle_result(result)
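The streaming idea itself does not depend on any particular decorator. The sketch below uses plain async generators from the standard library; `chunk_source` and `streaming_sum` are hypothetical names, and summing a chunk stands in for whatever per-chunk optimization a real workflow would perform.

```python
import asyncio
from typing import AsyncIterator


async def chunk_source(n_chunks: int) -> AsyncIterator[list[int]]:
    """Hypothetical data stream: yields small lists of integers."""
    for i in range(n_chunks):
        await asyncio.sleep(0)  # yield control, as a real source would on I/O
        yield [i, i + 1, i + 2]


async def streaming_sum(chunks: AsyncIterator[list[int]]) -> AsyncIterator[int]:
    """Yield one result per chunk as soon as that chunk has been processed."""
    async for chunk in chunks:
        yield sum(chunk)


async def collect() -> list[int]:
    totals = []
    async for total in streaming_sum(chunk_source(4)):
        totals.append(total)  # each total arrives before later chunks are produced
    return totals


totals = asyncio.run(collect())
print(totals)
```

Because each stage is a generator, no stage holds more than one chunk at a time, which is the property the streaming approach relies on.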
🎯 Performance Benefits¶
Benchmark Comparisons¶
In the benchmarks below, asynchronous execution runs roughly 6x to 10x faster than the synchronous equivalent:
| Operation | Synchronous | Asynchronous | Improvement | 
|---|---|---|---|
| 100 LLM calls | 250 seconds | 25 seconds | 10x faster | 
| Batch optimization | 120 seconds | 20 seconds | 6x faster | 
| Concurrent workflows | 300 seconds | 45 seconds | 6.7x faster | 
Memory Efficiency¶
# Traditional approach - every result is held in memory until the loop ends
results = []
for item in large_dataset:
    result = optimize(item)  # Blocking
    results.append(result)
# Async streaming approach - each result is handled, then released
async def process_efficiently():
    async for result in async_optimize_stream(large_dataset):
        await process_result(result)  # Process immediately
🔧 Advanced Configuration¶
Custom Event Loops¶
import asyncio
import opto
# Custom event loop with specific settings
loop = asyncio.new_event_loop()
loop.set_debug(True)
@opto.trace.async_trace(loop=loop)
async def custom_agent(query):
    return await process_with_custom_loop(query)
Async Context Management¶
from opto.trace import AsyncTraceContext
async def optimized_workflow():
    async with AsyncTraceContext(
        max_concurrent=20,
        timeout=30.0,
        retry_policy="exponential_backoff"
    ) as ctx:
        results = await ctx.run_parallel([
            task1(), task2(), task3()
        ])
    return results
Error Handling and Retries¶
import asyncio
import opto
@opto.trace.async_trace(
    retry_attempts=3,
    retry_delay=1.0,
    timeout=10.0
)
async def robust_async_agent(query):
    try:
        response = await llm.acomplete(query)
        return response
    except asyncio.TimeoutError:
        return "Request timed out"
    except Exception as e:
        # Automatic retry handling; `from e` keeps the original traceback
        raise opto.RetryableError(f"Temporary failure: {e}") from e
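For readers who want to see what retry-with-timeout involves underneath, here is a minimal standard-library sketch. It is not how the decorator above is implemented; `retry_async` and `flaky_call` are hypothetical helpers, and treating `ConnectionError` as the retryable error type is an assumption made for the example.

```python
import asyncio


async def retry_async(make_call, attempts: int = 3, base_delay: float = 0.01,
                      timeout: float = 1.0):
    """Await make_call() with a per-attempt timeout and exponential backoff."""
    for attempt in range(attempts):
        try:
            return await asyncio.wait_for(make_call(), timeout=timeout)
        except (asyncio.TimeoutError, ConnectionError):
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            # Delay doubles after each failed attempt: 1x, 2x, 4x, ...
            await asyncio.sleep(base_delay * 2 ** attempt)


call_count = 0


async def flaky_call() -> str:
    """Hypothetical call that fails twice before succeeding."""
    global call_count
    call_count += 1
    if call_count < 3:
        raise ConnectionError("temporary failure")
    return "ok"


result = asyncio.run(retry_async(flaky_call))
print(result, call_count)
```

Passing a factory (`make_call`) rather than a coroutine object matters here: a coroutine can only be awaited once, so each retry needs a fresh one.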
🌊 Real-world Examples¶
Async Web Scraping Agent¶
import asyncio
import aiohttp
import opto
@opto.trace.async_trace
async def scrape_and_analyze(urls):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            task = asyncio.create_task(scrape_url(session, url))
            tasks.append(task)
        contents = await asyncio.gather(*tasks)
        # Analyze content in parallel
        analysis_tasks = [analyze_content(content) for content in contents]
        analyses = await asyncio.gather(*analysis_tasks)
        return combine_analyses(analyses)
Async Multi-Model Ensemble¶
import asyncio
import opto
@opto.trace.async_trace
async def ensemble_prediction(query):
    # Query multiple models simultaneously
    model_tasks = [
        model_gpt4.apredict(query),
        model_claude.apredict(query),
        model_local.apredict(query)
    ]
    predictions = await asyncio.gather(*model_tasks, return_exceptions=True)
    # Filter successful predictions (BaseException also covers cancellations)
    valid_predictions = [p for p in predictions if not isinstance(p, BaseException)]
    # Combine results
    return ensemble_combine(valid_predictions)
📊 Monitoring and Debugging¶
Async Performance Metrics¶
import opto
from opto.trace import AsyncMetrics
@opto.trace.async_trace
async def monitored_agent(query):
    async with AsyncMetrics.monitor("agent_performance") as metrics:
        result = await process_query(query)
        # Metrics automatically captured:
        # - Execution time
        # - Concurrency level  
        # - Queue wait time
        # - Resource usage
        return result
# View metrics
print(AsyncMetrics.get_summary("agent_performance"))
Debugging Async Traces¶
import asyncio
import opto
# Enable async debugging
opto.trace.set_async_debug(True)
@opto.trace.async_trace(debug=True)
async def debug_agent(query):
    # Detailed logging of async operations
    await asyncio.sleep(0.1)  # Simulated async work
    return f"Processed: {query}"
🎯 Best Practices¶
- Use Async for I/O-bound Tasks: Perfect for LLM calls, web requests, file operations
- Limit Concurrency: Use semaphores to prevent overwhelming external services
- Handle Exceptions Gracefully: Implement proper error handling and recovery
- Monitor Resource Usage: Track memory and connection usage in production
- Test Thoroughly: Async code can have subtle race conditions
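The advice on limiting concurrency can be sketched with `asyncio.Semaphore` from the standard library. `bounded_gather` and `fake_request` are hypothetical helpers written for this example, and the limit of 3 is arbitrary; a real limit would depend on the external service's rate limits.

```python
import asyncio


async def bounded_gather(coro_factories, limit: int):
    """Run zero-argument coroutine factories, at most `limit` at a time."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(factory):
        async with semaphore:  # waits here once `limit` calls are in flight
            return await factory()

    return await asyncio.gather(*(run_one(f) for f in coro_factories))


in_flight = 0
peak_in_flight = 0


async def fake_request(i: int) -> int:
    """Hypothetical external call that records how many run at once."""
    global in_flight, peak_in_flight
    in_flight += 1
    peak_in_flight = max(peak_in_flight, in_flight)
    await asyncio.sleep(0.01)  # simulated network latency
    in_flight -= 1
    return i * 2


async def main() -> list[int]:
    # `i=i` binds the current loop value to each lambda
    factories = [lambda i=i: fake_request(i) for i in range(10)]
    return await bounded_gather(factories, limit=3)


doubled = asyncio.run(main())
print(doubled, peak_in_flight)
```

All ten requests are scheduled up front, but the semaphore ensures that no more than three are past the `async with` line at any moment.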
⚠️ Migration Guide¶
Converting existing synchronous traces to async:
# Before (synchronous)
@opto.trace
def sync_agent(query):
    response = llm.complete(query)
    return response
# After (asynchronous) 
@opto.trace.async_trace
async def async_agent(query):
    response = await llm.acomplete(query)  # Note: await
    return response
# Usage change
# Before: result = sync_agent("query")
# After: result = await async_agent("query")  # inside a coroutine
# At top level: result = asyncio.run(async_agent("query"))
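When a dependency offers only a synchronous call, one common bridge during migration is `asyncio.to_thread` (Python 3.9+), which runs the blocking function in a worker thread. The sketch below is a general standard-library technique, not an OpenTrace feature; `blocking_complete` is a hypothetical stand-in for a client with no async method.

```python
import asyncio
import time


def blocking_complete(query: str) -> str:
    """Hypothetical synchronous client call with no async equivalent."""
    time.sleep(0.05)  # simulated blocking I/O
    return f"response to {query}"


async def acomplete(query: str) -> str:
    # Run the blocking call in a worker thread so the event loop stays free.
    return await asyncio.to_thread(blocking_complete, query)


async def main() -> list[str]:
    # The three blocking calls overlap because each runs in its own thread.
    return await asyncio.gather(acomplete("a"), acomplete("b"), acomplete("c"))


responses = asyncio.run(main())
print(responses)
```

This keeps the rest of the workflow async while the underlying client is migrated. It suits I/O-bound calls; CPU-bound work in threads is still constrained by the interpreter lock.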
📚 Learn More¶
- Async API Reference
- Performance Optimization Guide
- Concurrency Patterns Tutorial
- Error Handling Guide
Ready to supercharge your workflows with async execution? Start with our optimization tutorial to see async in action!