Agent Retry Strategies
Implementing robust retry strategies is crucial for building resilient multi-agent systems that handle transient failures gracefully. This guide covers common retry patterns and their implementation.
Retry Strategy Fundamentals
When to Retry
- Transient Network Errors: Temporary connectivity issues
- Rate Limiting: API throttling responses
- Temporary Resource Unavailability: Database locks, service restarts
- Timeout Errors: Slow responses that exceed limits
- Partial Failures: When part of an operation succeeds
When NOT to Retry
- Authentication Failures: Invalid credentials
- Authorization Errors: Insufficient permissions
- Invalid Input: Malformed requests
- Business Logic Errors: Domain-specific failures
- Resource Not Found: 404 errors
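These rules can be captured in a small predicate. The sketch below is illustrative and mirrors the error-classification helper shown later under Best Practices; the `response.status_code` attribute is an assumption about your HTTP client's error type:

```python
# Illustrative sketch: classify an exception as transient (retryable) or not.
def should_retry(error: Exception) -> bool:
    # Transient network and timeout errors are worth retrying
    if isinstance(error, (ConnectionError, TimeoutError)):
        return True
    # HTTP-style errors: retry throttling (429) and server errors (5xx),
    # never auth, permission, validation, or not-found responses
    status = getattr(getattr(error, "response", None), "status_code", None)
    if status is not None:
        return status == 429 or status >= 500
    return False
```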
Retry Patterns
1. Exponential Backoff with Jitter
Prevent thundering herd problems with randomized delays:

```python
import random
import time
from typing import TypeVar, Callable, Optional, Any
from dataclasses import dataclass
import logging

T = TypeVar('T')

@dataclass
class RetryConfig:
    max_attempts: int = 3
    initial_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter: bool = True

class ExponentialBackoffRetry:
    def __init__(self, config: Optional[RetryConfig] = None):
        self.config = config or RetryConfig()
        self.logger = logging.getLogger(__name__)

    def execute(self,
                func: Callable[..., T],
                *args,
                retryable_exceptions: tuple = (Exception,),
                on_retry: Optional[Callable[[int, Exception], None]] = None,
                **kwargs) -> T:
        """Execute function with exponential backoff retry"""
        last_exception = None
        for attempt in range(self.config.max_attempts):
            try:
                return func(*args, **kwargs)
            except retryable_exceptions as e:
                last_exception = e
                if attempt == self.config.max_attempts - 1:
                    # Last attempt failed
                    self.logger.error(f"All {self.config.max_attempts} attempts failed")
                    raise
                # Calculate delay
                delay = self._calculate_delay(attempt)
                # Call retry callback if provided
                if on_retry:
                    on_retry(attempt + 1, e)
                self.logger.warning(
                    f"Attempt {attempt + 1} failed: {str(e)}. "
                    f"Retrying in {delay:.2f} seconds..."
                )
                time.sleep(delay)
        raise last_exception

    def _calculate_delay(self, attempt: int) -> float:
        """Calculate delay with exponential backoff and jitter"""
        # Exponential delay
        delay = self.config.initial_delay * (self.config.exponential_base ** attempt)
        # Cap at max delay
        delay = min(delay, self.config.max_delay)
        # Add jitter
        if self.config.jitter:
            # Full jitter strategy
            delay = random.uniform(0, delay)
        return delay

# Async version
import asyncio

class AsyncExponentialBackoffRetry:
    def __init__(self, config: Optional[RetryConfig] = None):
        self.config = config or RetryConfig()
        self.logger = logging.getLogger(__name__)

    async def execute(self,
                      func: Callable[..., T],
                      *args,
                      retryable_exceptions: tuple = (Exception,),
                      on_retry: Optional[Callable[[int, Exception], None]] = None,
                      **kwargs) -> T:
        """Execute async function with exponential backoff retry"""
        last_exception = None
        for attempt in range(self.config.max_attempts):
            try:
                return await func(*args, **kwargs)
            except retryable_exceptions as e:
                last_exception = e
                if attempt == self.config.max_attempts - 1:
                    self.logger.error(f"All {self.config.max_attempts} attempts failed")
                    raise
                delay = self._calculate_delay(attempt)
                if on_retry:
                    on_retry(attempt + 1, e)
                self.logger.warning(
                    f"Attempt {attempt + 1} failed: {str(e)}. "
                    f"Retrying in {delay:.2f} seconds..."
                )
                await asyncio.sleep(delay)
        raise last_exception

    def _calculate_delay(self, attempt: int) -> float:
        """Calculate delay with exponential backoff and jitter"""
        delay = self.config.initial_delay * (self.config.exponential_base ** attempt)
        delay = min(delay, self.config.max_delay)
        if self.config.jitter:
            delay = random.uniform(0, delay)
        return delay
```
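A usage sketch, assuming a flaky `fetch_data` callable of your own:

```python
# Hypothetical usage: fetch_data stands in for any callable that can fail transiently.
retry = ExponentialBackoffRetry(RetryConfig(max_attempts=4, initial_delay=0.5))

def fetch_data(url: str) -> str:
    ...  # e.g. an HTTP call that may raise ConnectionError

result = retry.execute(
    fetch_data,
    "https://example.com/api",  # illustrative URL
    retryable_exceptions=(ConnectionError, TimeoutError),
    on_retry=lambda attempt, exc: print(f"retry #{attempt}: {exc}"),
)
```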
PraisonAI now ships a built-in tool circuit breaker that wraps every tool call automatically. See Tool Circuit Breaker. The examples below show how to extend or customise that pattern.
2. Circuit Breaker with Retry
Combine the circuit breaker pattern with intelligent retry:

```python
import threading
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreakerRetry:
    def __init__(self,
                 failure_threshold: int = 5,
                 recovery_timeout: int = 60,
                 half_open_max_calls: int = 3):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None
        self.half_open_calls = 0
        self._lock = threading.Lock()
        self.retry_strategy = ExponentialBackoffRetry()
        # Metrics
        self.metrics = {
            "total_calls": 0,
            "successful_calls": 0,
            "failed_calls": 0,
            "rejected_calls": 0
        }

    def execute(self, func: Callable[..., T], *args, **kwargs) -> T:
        """Execute function with circuit breaker and retry logic"""
        with self._lock:
            self.metrics["total_calls"] += 1
            if self.state == CircuitState.OPEN:
                if self._should_attempt_reset():
                    self.state = CircuitState.HALF_OPEN
                    self.half_open_calls = 0
                else:
                    self.metrics["rejected_calls"] += 1
                    raise Exception("Circuit breaker is OPEN")
            if self.state == CircuitState.HALF_OPEN:
                if self.half_open_calls >= self.half_open_max_calls:
                    self.metrics["rejected_calls"] += 1
                    raise Exception("Circuit breaker is HALF_OPEN, max calls reached")
                self.half_open_calls += 1
        try:
            # Use retry strategy when circuit is closed or half-open
            result = self.retry_strategy.execute(
                func, *args,
                on_retry=self._on_retry,
                **kwargs
            )
            with self._lock:
                self._on_success()
                self.metrics["successful_calls"] += 1
            return result
        except Exception:
            with self._lock:
                self._on_failure()
                self.metrics["failed_calls"] += 1
            raise

    def _should_attempt_reset(self) -> bool:
        """Check if circuit should attempt reset"""
        return (
            self.last_failure_time and
            datetime.now() - self.last_failure_time > timedelta(seconds=self.recovery_timeout)
        )

    def _on_success(self):
        """Handle successful call"""
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None

    def _on_failure(self):
        """Handle failed call"""
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

    def _on_retry(self, attempt: int, exception: Exception):
        """Called when a retry attempt is made"""
        # Could implement additional logic here
        pass

    def get_state(self) -> Dict[str, Any]:
        """Get current circuit breaker state"""
        with self._lock:
            return {
                "state": self.state.value,
                "failure_count": self.failure_count,
                "metrics": self.metrics.copy()
            }
```
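A usage sketch, assuming an `unreliable_service` callable of your own:

```python
# Hypothetical usage: unreliable_service stands in for any call guarded by the breaker.
breaker = CircuitBreakerRetry(failure_threshold=3, recovery_timeout=30)

def unreliable_service() -> str:
    ...  # e.g. a remote call that sometimes raises

try:
    print(breaker.execute(unreliable_service))
except Exception as exc:
    print(f"Call rejected or failed: {exc}")

print(breaker.get_state())  # e.g. {'state': 'closed', 'failure_count': 0, ...}
```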
3. Adaptive Retry Strategy
Adjust retry behavior based on success patterns:

```python
import statistics
from collections import deque

class AdaptiveRetryStrategy:
    def __init__(self,
                 min_attempts: int = 1,
                 max_attempts: int = 5,
                 success_threshold: float = 0.8,
                 window_size: int = 100):
        self.min_attempts = min_attempts
        self.max_attempts = max_attempts
        self.success_threshold = success_threshold
        self.window_size = window_size
        self.current_max_attempts = max_attempts
        self.results = deque(maxlen=window_size)
        self.attempt_counts = deque(maxlen=window_size)
        self._lock = threading.Lock()

    def execute(self, func: Callable[..., T], *args, **kwargs) -> T:
        """Execute with adaptive retry"""
        last_exception = None
        attempts = 0
        for attempt in range(1, self.current_max_attempts + 1):
            attempts = attempt
            try:
                result = func(*args, **kwargs)
                self._record_success(attempts)
                return result
            except Exception as e:
                last_exception = e
                if attempt < self.current_max_attempts:
                    # Adaptive delay based on attempt number
                    delay = self._calculate_adaptive_delay(attempt)
                    time.sleep(delay)
                else:
                    self._record_failure(attempts)
                    raise
        raise last_exception

    def _calculate_adaptive_delay(self, attempt: int) -> float:
        """Calculate delay based on recent performance"""
        base_delay = 1.0
        with self._lock:
            if len(self.results) >= 10:
                # Adjust delay based on recent success rate
                success_rate = sum(self.results) / len(self.results)
                if success_rate < 0.3:
                    # High failure rate - increase delays
                    base_delay *= 2
                elif success_rate > 0.8:
                    # High success rate - decrease delays
                    base_delay *= 0.5
        # Add some randomness
        delay = base_delay * (2 ** (attempt - 1))
        return min(delay + random.uniform(-0.5, 0.5), 30.0)

    def _record_success(self, attempts: int):
        """Record successful execution"""
        with self._lock:
            self.results.append(True)
            self.attempt_counts.append(attempts)
            self._adapt_strategy()

    def _record_failure(self, attempts: int):
        """Record failed execution"""
        with self._lock:
            self.results.append(False)
            self.attempt_counts.append(attempts)
            self._adapt_strategy()

    def _adapt_strategy(self):
        """Adapt retry strategy based on recent performance"""
        if len(self.results) < 20:
            return
        success_rate = sum(self.results) / len(self.results)
        avg_attempts = statistics.mean(self.attempt_counts)
        if success_rate > self.success_threshold and avg_attempts < 2:
            # High success with few retries - can reduce max attempts
            self.current_max_attempts = max(
                self.min_attempts,
                self.current_max_attempts - 1
            )
        elif success_rate < 0.5 and avg_attempts > 3:
            # Low success with many retries - increase max attempts
            self.current_max_attempts = min(
                self.max_attempts,
                self.current_max_attempts + 1
            )

    def get_stats(self) -> Dict[str, Any]:
        """Get adaptive strategy statistics"""
        with self._lock:
            if not self.results:
                return {"current_max_attempts": self.current_max_attempts}
            return {
                "current_max_attempts": self.current_max_attempts,
                "success_rate": sum(self.results) / len(self.results),
                "avg_attempts": statistics.mean(self.attempt_counts),
                "sample_size": len(self.results)
            }
```
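A usage sketch showing how the strategy adapts over many calls (`flaky_op` is hypothetical):

```python
# Hypothetical usage: flaky_op stands in for an operation with varying reliability.
adaptive = AdaptiveRetryStrategy(max_attempts=5)

def flaky_op() -> None:
    ...  # raises on transient failures

for _ in range(50):
    try:
        adaptive.execute(flaky_op)
    except Exception:
        pass  # failures are recorded and shape future retry limits

print(adaptive.get_stats())  # e.g. {'current_max_attempts': 4, 'success_rate': 0.82, ...}
```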
4. Retry with Fallback
Implement retry with progressive fallback options:

```python
@dataclass
class RetryWithFallbackConfig:
    primary_retry_attempts: int = 3
    fallback_retry_attempts: int = 2
    use_cache_on_failure: bool = True

class RetryWithFallback:
    def __init__(self, config: Optional[RetryWithFallbackConfig] = None):
        self.config = config or RetryWithFallbackConfig()
        self.cache = {}
        self.primary_retry = ExponentialBackoffRetry(
            RetryConfig(max_attempts=self.config.primary_retry_attempts)
        )
        self.fallback_retry = ExponentialBackoffRetry(
            RetryConfig(max_attempts=self.config.fallback_retry_attempts)
        )

    def execute(self,
                primary_func: Callable[..., T],
                *args,
                fallback_func: Optional[Callable[..., T]] = None,
                cache_key: Optional[str] = None,
                **kwargs) -> T:
        """Execute with retry and fallback.

        fallback_func and cache_key are keyword-only so that positional
        arguments are passed through to the wrapped functions themselves.
        """
        # Try primary function with retry
        try:
            result = self.primary_retry.execute(primary_func, *args, **kwargs)
            # Cache successful result
            if cache_key:
                self.cache[cache_key] = {
                    "value": result,
                    "timestamp": time.time()
                }
            return result
        except Exception as primary_error:
            logging.warning(f"Primary function failed: {primary_error}")
            # Try fallback if available
            if fallback_func:
                try:
                    return self.fallback_retry.execute(fallback_func, *args, **kwargs)
                except Exception as fallback_error:
                    logging.warning(f"Fallback function failed: {fallback_error}")
            # Try cache if enabled
            if self.config.use_cache_on_failure and cache_key and cache_key in self.cache:
                cached = self.cache[cache_key]
                logging.info(f"Returning cached result from {time.time() - cached['timestamp']:.1f}s ago")
                return cached["value"]
            # All options exhausted
            raise primary_error
```
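A usage sketch; the primary and backup fetchers are hypothetical stand-ins:

```python
# Hypothetical usage: primary/backup fetchers are stand-ins for your own callables.
fallback = RetryWithFallback()

def fetch_from_primary() -> dict:
    ...  # main data source, may raise

def fetch_from_backup() -> dict:
    ...  # secondary data source

data = fallback.execute(
    fetch_from_primary,
    fallback_func=fetch_from_backup,
    cache_key="user:42",  # enables the cached-result fallback
)
```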
5. Contextual Retry Strategy
Apply different retry strategies based on context:

```python
class ContextualRetryStrategy:
    def __init__(self):
        self.strategies = {}
        self.default_strategy = ExponentialBackoffRetry()

    def register_strategy(self, context: str, strategy: Any):
        """Register a retry strategy for a specific context"""
        self.strategies[context] = strategy

    def execute(self,
                func: Callable[..., T],
                context: str,
                *args, **kwargs) -> T:
        """Execute with context-appropriate retry strategy"""
        # Select strategy based on context
        strategy = self.strategies.get(context, self.default_strategy)
        # Add context-specific error handling.
        # DBConnectionError, ModelLoadError and InferenceError are placeholders
        # for your driver/framework exception types; RequestException and
        # HTTPError can come from requests.exceptions.
        if context == "database":
            retryable_exceptions = (DBConnectionError, TimeoutError)
        elif context == "api":
            retryable_exceptions = (RequestException, HTTPError)
        elif context == "ml_model":
            retryable_exceptions = (ModelLoadError, InferenceError)
        else:
            retryable_exceptions = (Exception,)
        return strategy.execute(
            func, *args,
            retryable_exceptions=retryable_exceptions,
            **kwargs
        )

# Usage example
retry_manager = ContextualRetryStrategy()

# Register specific strategies
retry_manager.register_strategy(
    "database",
    ExponentialBackoffRetry(RetryConfig(
        max_attempts=5,
        initial_delay=0.1,
        max_delay=10.0
    ))
)
retry_manager.register_strategy(
    "api",
    ExponentialBackoffRetry(RetryConfig(
        max_attempts=3,
        initial_delay=1.0,
        max_delay=30.0,
        jitter=True
    ))
)
```
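Executing through the manager then looks like this sketch (`query_users` is hypothetical):

```python
# Hypothetical call through the contextual manager: the "database" strategy
# and its retryable exceptions are selected automatically.
def query_users() -> list:
    ...  # database read that may raise a transient error

users = retry_manager.execute(query_users, context="database")
```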
Advanced Retry Patterns
1. Bulkhead Retry Pattern
Isolate retry resources to prevent cascade failures:

```python
from concurrent.futures import ThreadPoolExecutor, Future
import queue

class BulkheadRetry:
    def __init__(self,
                 max_concurrent_retries: int = 10,
                 queue_size: int = 100):
        self.max_concurrent_retries = max_concurrent_retries
        self.executor = ThreadPoolExecutor(max_workers=max_concurrent_retries)
        self.retry_queue = queue.Queue(maxsize=queue_size)
        self.active_retries = 0
        self._lock = threading.Lock()

    def execute_with_bulkhead(self,
                              func: Callable[..., T],
                              *args,
                              retry_config: Optional[RetryConfig] = None,
                              **kwargs) -> "Future[T]":
        """Execute with bulkhead isolation"""
        retry_config = retry_config or RetryConfig()
        # Check if we can accept more retries
        with self._lock:
            if self.active_retries >= self.max_concurrent_retries:
                try:
                    # Try to queue
                    self.retry_queue.put_nowait((func, args, kwargs, retry_config))
                    return self._create_pending_future()
                except queue.Full:
                    raise Exception("Retry bulkhead is full")
            self.active_retries += 1
        # Submit retry task
        future = self.executor.submit(
            self._execute_with_retry,
            func, args, kwargs, retry_config
        )
        # Decrement counter when done
        future.add_done_callback(lambda f: self._on_retry_complete())
        return future

    def _execute_with_retry(self, func, args, kwargs, retry_config):
        """Execute function with retry"""
        retry_strategy = ExponentialBackoffRetry(retry_config)
        return retry_strategy.execute(func, *args, **kwargs)

    def _on_retry_complete(self):
        """Called when retry completes"""
        with self._lock:
            self.active_retries -= 1
            # Process queued retries
            if not self.retry_queue.empty():
                try:
                    func, args, kwargs, retry_config = self.retry_queue.get_nowait()
                    self.active_retries += 1
                    future = self.executor.submit(
                        self._execute_with_retry,
                        func, args, kwargs, retry_config
                    )
                    future.add_done_callback(lambda f: self._on_retry_complete())
                except queue.Empty:
                    pass

    def _create_pending_future(self) -> "Future[T]":
        """Create a future that will be resolved when the queued retry executes"""
        future = Future()
        # Implementation depends on your needs. Note: as written, this future is
        # never resolved; wiring it up to the queued task (so callers can await
        # queued work) is left as an exercise.
        return future
```
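A usage sketch; `call_downstream` is a hypothetical operation guarded by the bulkhead:

```python
# Hypothetical usage: the bulkhead caps how many operations may be retrying at once.
bulkhead = BulkheadRetry(max_concurrent_retries=4)

def call_downstream() -> str:
    ...  # operation that may fail transiently

future = bulkhead.execute_with_bulkhead(
    call_downstream,
    retry_config=RetryConfig(max_attempts=3, initial_delay=0.2),
)
print(future.result(timeout=30))  # blocks until the retried call completes
```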
2. Hedged Requests Pattern
Send multiple requests and use the first successful response:

```python
import asyncio
from typing import List

class HedgedRetryStrategy:
    def __init__(self,
                 hedge_after_ms: int = 100,
                 max_hedges: int = 2):
        self.hedge_after_ms = hedge_after_ms
        self.max_hedges = max_hedges

    async def execute(self,
                      func: Callable[..., T],
                      *args, **kwargs) -> T:
        """Execute with hedged requests"""
        tasks = []
        results = []
        errors = []
        # Start first request
        task = asyncio.create_task(self._execute_with_tracking(
            func, args, kwargs, 0, results, errors
        ))
        tasks.append(task)
        # Start hedge timers
        for hedge_num in range(1, self.max_hedges + 1):
            hedge_task = asyncio.create_task(
                self._start_hedge_after_delay(
                    func, args, kwargs, hedge_num,
                    results, errors, tasks
                )
            )
            tasks.append(hedge_task)
        # Wait for first success or all failures
        while True:
            if results:
                # Cancel remaining tasks
                for task in tasks:
                    if not task.done():
                        task.cancel()
                return results[0]
            if all(task.done() for task in tasks):
                # All tasks completed without success
                raise Exception(f"All hedged requests failed: {errors}")
            await asyncio.sleep(0.01)

    async def _execute_with_tracking(self,
                                     func: Callable,
                                     args: tuple,
                                     kwargs: dict,
                                     request_num: int,
                                     results: List,
                                     errors: List):
        """Execute function and track results"""
        try:
            result = await func(*args, **kwargs)
            results.append(result)
            logging.info(f"Hedged request {request_num} succeeded")
        except Exception as e:
            errors.append((request_num, str(e)))
            logging.warning(f"Hedged request {request_num} failed: {e}")

    async def _start_hedge_after_delay(self,
                                       func: Callable,
                                       args: tuple,
                                       kwargs: dict,
                                       hedge_num: int,
                                       results: List,
                                       errors: List,
                                       tasks: List):
        """Start hedge request after a staggered delay"""
        # Stagger each hedge by hedge_num * hedge_after_ms so hedges fire one
        # after another rather than all at once
        await asyncio.sleep(self.hedge_after_ms * hedge_num / 1000.0)
        if not results:  # Only start if no success yet
            task = asyncio.create_task(self._execute_with_tracking(
                func, args, kwargs, hedge_num, results, errors
            ))
            tasks.append(task)
```
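A usage sketch, assuming a hypothetical async `fetch_quote` callable:

```python
# Hypothetical usage: fetch_quote stands in for any slow or flaky coroutine.
async def fetch_quote() -> str:
    ...  # slow or flaky async call

async def main():
    hedged = HedgedRetryStrategy(hedge_after_ms=100, max_hedges=2)
    print(await hedged.execute(fetch_quote))

asyncio.run(main())
```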
Monitoring and Metrics
Retry Metrics Collector
```python
from collections import defaultdict

@dataclass
class RetryMetrics:
    total_attempts: int = 0
    successful_attempts: int = 0
    failed_attempts: int = 0
    retry_counts: Optional[Dict[int, int]] = None  # attempts -> count
    error_types: Optional[Dict[str, int]] = None
    total_retry_time: float = 0

    def __post_init__(self):
        if self.retry_counts is None:
            self.retry_counts = defaultdict(int)
        if self.error_types is None:
            self.error_types = defaultdict(int)

class MonitoredRetryStrategy:
    def __init__(self, base_strategy: Any):
        self.base_strategy = base_strategy
        self.metrics = RetryMetrics()
        self._lock = threading.Lock()

    def execute(self, func: Callable[..., T], *args, **kwargs) -> T:
        """Execute with metrics collection"""
        start_time = time.time()
        attempts = 0
        last_error = None

        def on_retry(attempt: int, exception: Exception):
            nonlocal attempts, last_error
            attempts = attempt
            last_error = exception
            with self._lock:
                self.metrics.error_types[type(exception).__name__] += 1

        try:
            # Chain with any caller-provided on_retry callback
            if 'on_retry' in kwargs:
                original_on_retry = kwargs['on_retry']
                def combined_on_retry(attempt, exception):
                    on_retry(attempt, exception)
                    original_on_retry(attempt, exception)
                kwargs['on_retry'] = combined_on_retry
            else:
                kwargs['on_retry'] = on_retry
            result = self.base_strategy.execute(func, *args, **kwargs)
            with self._lock:
                self.metrics.total_attempts += 1
                self.metrics.successful_attempts += 1
                self.metrics.retry_counts[attempts] += 1
                self.metrics.total_retry_time += time.time() - start_time
            return result
        except Exception as e:
            with self._lock:
                self.metrics.total_attempts += 1
                self.metrics.failed_attempts += 1
                self.metrics.retry_counts[attempts] += 1
                self.metrics.total_retry_time += time.time() - start_time
                # Count the exception that finally escaped (retried errors were
                # already counted in on_retry)
                self.metrics.error_types[type(e).__name__] += 1
            raise

    def get_metrics_summary(self) -> Dict[str, Any]:
        """Get metrics summary"""
        with self._lock:
            if self.metrics.total_attempts == 0:
                return {"message": "No retry attempts yet"}
            success_rate = self.metrics.successful_attempts / self.metrics.total_attempts
            avg_retry_time = self.metrics.total_retry_time / self.metrics.total_attempts
            # Calculate retry distribution
            retry_distribution = dict(self.metrics.retry_counts)
            return {
                "total_attempts": self.metrics.total_attempts,
                "success_rate": success_rate,
                "failure_rate": 1 - success_rate,
                "avg_retry_time": avg_retry_time,
                "retry_distribution": retry_distribution,
                "common_errors": dict(self.metrics.error_types),
                "avg_retries_per_attempt": sum(
                    k * v for k, v in retry_distribution.items()
                ) / self.metrics.total_attempts
            }
```
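A usage sketch wrapping one of the strategies above (`sometimes_fails` is hypothetical):

```python
# Hypothetical usage: wrap any strategy from this guide to collect metrics.
monitored = MonitoredRetryStrategy(ExponentialBackoffRetry())

def sometimes_fails() -> str:
    ...  # raises transiently

try:
    monitored.execute(sometimes_fails)
except Exception:
    pass

print(monitored.get_metrics_summary())
```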
Best Practices
- Idempotency: Ensure operations can be safely retried.

```python
def make_idempotent_request(request_id: str, data: Dict):
    # Use request_id to prevent duplicate processing.
    # request_already_processed, get_previous_result, process_request and
    # store_result are application-specific helpers.
    if request_already_processed(request_id):
        return get_previous_result(request_id)
    result = process_request(data)
    store_result(request_id, result)
    return result
```

- Retry Budgets: Limit total retry time.

```python
class RetryBudget:
    def __init__(self, max_retry_seconds: float = 300):
        self.max_retry_seconds = max_retry_seconds
        self.start_time = None

    def can_retry(self) -> bool:
        if self.start_time is None:
            self.start_time = time.time()
            return True
        elapsed = time.time() - self.start_time
        return elapsed < self.max_retry_seconds
```

- Error Classification: Retry only appropriate errors (a sketch combining the budget and classification follows this list).

```python
RETRYABLE_HTTP_CODES = {408, 429, 500, 502, 503, 504}

def is_retryable_error(error: Exception) -> bool:
    # HTTPError here refers to your HTTP client's error type
    # (e.g. requests.exceptions.HTTPError)
    if isinstance(error, HTTPError):
        return error.response.status_code in RETRYABLE_HTTP_CODES
    elif isinstance(error, ConnectionError):
        return True
    elif isinstance(error, TimeoutError):
        return True
    else:
        return False
```
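Putting the budget and classification together, a minimal sketch (assuming the `RetryBudget` and `is_retryable_error` helpers above):

```python
# Minimal sketch: retry only classified-retryable errors, within a time budget.
def call_with_budget(func, *args, budget=None, **kwargs):
    budget = budget or RetryBudget(max_retry_seconds=60)
    while True:
        try:
            return func(*args, **kwargs)
        except Exception as e:
            if not is_retryable_error(e) or not budget.can_retry():
                raise
            time.sleep(1.0)  # fixed pause; swap in exponential backoff from above
```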
Testing Retry Strategies
```python
import pytest
from unittest.mock import Mock

def test_exponential_backoff():
    retry = ExponentialBackoffRetry(RetryConfig(
        max_attempts=3,
        initial_delay=0.1,
        jitter=False
    ))
    # Mock function that fails twice then succeeds
    mock_func = Mock(side_effect=[Exception("Fail 1"), Exception("Fail 2"), "Success"])
    result = retry.execute(mock_func)
    assert result == "Success"
    assert mock_func.call_count == 3

def test_circuit_breaker_retry():
    cb_retry = CircuitBreakerRetry(failure_threshold=2)
    # Function that always fails
    failing_func = Mock(side_effect=Exception("Always fails"))
    # First two calls should retry and fail
    for _ in range(2):
        with pytest.raises(Exception):
            cb_retry.execute(failing_func)
    # Circuit should now be open
    assert cb_retry.state == CircuitState.OPEN
    # Next call should fail immediately
    with pytest.raises(Exception, match="Circuit breaker is OPEN"):
        cb_retry.execute(failing_func)

@pytest.mark.asyncio  # requires the pytest-asyncio plugin
async def test_hedged_requests():
    hedge_retry = HedgedRetryStrategy(hedge_after_ms=50, max_hedges=2)
    call_count = 0

    async def slow_then_fast():
        nonlocal call_count
        call_count += 1
        if call_count == 1:
            await asyncio.sleep(0.2)  # Slow first request
            return "slow"
        else:
            await asyncio.sleep(0.01)  # Fast hedged request
            return "fast"

    result = await hedge_retry.execute(slow_then_fast)
    assert result == "fast"  # Should get the fast response
    assert call_count == 2  # Both requests started
```
Built-in Task Guardrail Retries
PraisonAI provides built-in retry functionality specifically for guardrail validation failures, distinct from the generic ExponentialBackoffRetry patterns above.
Guardrail retries are handled automatically by the executor when Task(guardrail=..., max_retries=...) is configured. This is separate from manual retry implementations.

```python
from praisonaiagents import Agent, Task, PraisonAIAgents

def validate_content(output):
    """Built-in guardrail with retry support"""
    if len(output.raw.split()) < 50:
        return False, "Content too short - needs at least 50 words"
    return True, output

agent = Agent(instructions="Write detailed, well-structured explanations")  # example agent

task = Task(
    description="Write a detailed explanation",
    agent=agent,
    guardrail=validate_content,
    max_retries=3,  # Built-in executor-level retry
    retry_with_feedback=True
)

# The executor automatically handles:
# - Guardrail validation
# - Retry logic on failure
# - Feedback to agent on retry
# - Final failure after max_retries
```

