Agent Retry Strategies
Implementing robust retry strategies is crucial for building resilient multi-agent systems that can handle transient failures gracefully. This guide covers various retry patterns and their implementation.

Retry Strategy Fundamentals
When to Retry
- Transient Network Errors: Temporary connectivity issues
- Rate Limiting: API throttling responses
- Temporary Resource Unavailability: Database locks, service restarts
- Timeout Errors: Slow responses that exceed limits
- Partial Failures: When part of a multi-step operation succeeds and the remainder can be safely reattempted
When NOT to Retry
- Authentication Failures: Invalid credentials
- Authorization Errors: Insufficient permissions
- Invalid Input: Malformed requests
- Business Logic Errors: Domain-specific failures
- Resource Not Found: 404 errors
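These categories can be folded into a single predicate that the strategies below can use alongside retryable_exceptions. A minimal sketch, assuming errors expose a status_code attribute; the exception and status-code choices are illustrative, not exhaustive:

RETRYABLE_STATUS = {408, 429, 500, 502, 503, 504}  # timeouts, throttling, server-side faults

def should_retry(exc: Exception) -> bool:
    # Transient connectivity and timeout problems are worth retrying
    if isinstance(exc, (ConnectionError, TimeoutError)):
        return True
    # Assumes the error exposes an HTTP status_code attribute
    status = getattr(exc, "status_code", None)
    if status is not None:
        return status in RETRYABLE_STATUS
    # Everything else (auth, validation, business logic, 404s) is not retried
    return False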
Retry Patterns
1. Exponential Backoff with Jitter
Prevent thundering herd problems with randomized delays:
import random
import time
from typing import TypeVar, Callable, Optional, Any, Dict
from dataclasses import dataclass
import logging
T = TypeVar('T')
@dataclass
class RetryConfig:
max_attempts: int = 3
initial_delay: float = 1.0
max_delay: float = 60.0
exponential_base: float = 2.0
jitter: bool = True
class ExponentialBackoffRetry:
def __init__(self, config: RetryConfig = None):
self.config = config or RetryConfig()
self.logger = logging.getLogger(__name__)
def execute(self,
func: Callable[..., T],
*args,
retryable_exceptions: tuple = (Exception,),
on_retry: Optional[Callable[[int, Exception], None]] = None,
**kwargs) -> T:
"""Execute function with exponential backoff retry"""
last_exception = None
for attempt in range(self.config.max_attempts):
try:
return func(*args, **kwargs)
except retryable_exceptions as e:
last_exception = e
if attempt == self.config.max_attempts - 1:
# Last attempt failed
self.logger.error(f"All {self.config.max_attempts} attempts failed")
raise
# Calculate delay
delay = self._calculate_delay(attempt)
# Call retry callback if provided
if on_retry:
on_retry(attempt + 1, e)
self.logger.warning(
f"Attempt {attempt + 1} failed: {str(e)}. "
f"Retrying in {delay:.2f} seconds..."
)
time.sleep(delay)
raise last_exception
def _calculate_delay(self, attempt: int) -> float:
"""Calculate delay with exponential backoff and jitter"""
# Exponential delay
delay = self.config.initial_delay * (self.config.exponential_base ** attempt)
# Cap at max delay
delay = min(delay, self.config.max_delay)
# Add jitter
if self.config.jitter:
# Full jitter strategy
delay = random.uniform(0, delay)
return delay
# Async version
import asyncio
class AsyncExponentialBackoffRetry:
def __init__(self, config: RetryConfig = None):
self.config = config or RetryConfig()
self.logger = logging.getLogger(__name__)
async def execute(self,
func: Callable[..., T],
*args,
retryable_exceptions: tuple = (Exception,),
on_retry: Optional[Callable[[int, Exception], None]] = None,
**kwargs) -> T:
"""Execute async function with exponential backoff retry"""
last_exception = None
for attempt in range(self.config.max_attempts):
try:
return await func(*args, **kwargs)
except retryable_exceptions as e:
last_exception = e
if attempt == self.config.max_attempts - 1:
self.logger.error(f"All {self.config.max_attempts} attempts failed")
raise
delay = self._calculate_delay(attempt)
if on_retry:
on_retry(attempt + 1, e)
self.logger.warning(
f"Attempt {attempt + 1} failed: {str(e)}. "
f"Retrying in {delay:.2f} seconds..."
)
await asyncio.sleep(delay)
raise last_exception
def _calculate_delay(self, attempt: int) -> float:
"""Calculate delay with exponential backoff and jitter"""
delay = self.config.initial_delay * (self.config.exponential_base ** attempt)
delay = min(delay, self.config.max_delay)
if self.config.jitter:
delay = random.uniform(0, delay)
return delay
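A minimal usage sketch for the synchronous version; fetch_profile is a hypothetical network call, and the exception tuple is just one reasonable choice:

def fetch_profile(user_id: str) -> dict:
    ...  # hypothetical call that may raise ConnectionError or TimeoutError

retry = ExponentialBackoffRetry(RetryConfig(max_attempts=4, initial_delay=0.5))
profile = retry.execute(
    fetch_profile, "user-123",
    retryable_exceptions=(ConnectionError, TimeoutError),
    on_retry=lambda attempt, exc: logging.warning("retry %d after %s", attempt, exc),
)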
2. Circuit Breaker with Retry
Combine the circuit breaker pattern with intelligent retry:
from datetime import datetime, timedelta
from enum import Enum
import threading
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
class CircuitBreakerRetry:
def __init__(self,
failure_threshold: int = 5,
recovery_timeout: int = 60,
half_open_max_calls: int = 3):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.half_open_max_calls = half_open_max_calls
self.state = CircuitState.CLOSED
self.failure_count = 0
self.last_failure_time = None
self.half_open_calls = 0
self._lock = threading.Lock()
self.retry_strategy = ExponentialBackoffRetry()
# Metrics
self.metrics = {
"total_calls": 0,
"successful_calls": 0,
"failed_calls": 0,
"rejected_calls": 0
}
def execute(self, func: Callable[..., T], *args, **kwargs) -> T:
"""Execute function with circuit breaker and retry logic"""
with self._lock:
self.metrics["total_calls"] += 1
if self.state == CircuitState.OPEN:
if self._should_attempt_reset():
self.state = CircuitState.HALF_OPEN
self.half_open_calls = 0
else:
self.metrics["rejected_calls"] += 1
raise Exception("Circuit breaker is OPEN")
if self.state == CircuitState.HALF_OPEN:
if self.half_open_calls >= self.half_open_max_calls:
self.metrics["rejected_calls"] += 1
raise Exception("Circuit breaker is HALF_OPEN, max calls reached")
self.half_open_calls += 1
try:
# Use retry strategy when circuit is closed or half-open
result = self.retry_strategy.execute(
func, *args,
on_retry=self._on_retry,
**kwargs
)
with self._lock:
self._on_success()
self.metrics["successful_calls"] += 1
return result
except Exception as e:
with self._lock:
self._on_failure()
self.metrics["failed_calls"] += 1
raise
def _should_attempt_reset(self) -> bool:
"""Check if circuit should attempt reset"""
return (
self.last_failure_time and
datetime.now() - self.last_failure_time > timedelta(seconds=self.recovery_timeout)
)
def _on_success(self):
"""Handle successful call"""
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.CLOSED
self.failure_count = 0
self.last_failure_time = None
def _on_failure(self):
"""Handle failed call"""
self.failure_count += 1
self.last_failure_time = datetime.now()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
def _on_retry(self, attempt: int, exception: Exception):
"""Called when retry attempt is made"""
# Could implement additional logic here
pass
def get_state(self) -> Dict[str, Any]:
"""Get current circuit breaker state"""
with self._lock:
return {
"state": self.state.value,
"failure_count": self.failure_count,
"metrics": self.metrics.copy()
}
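A usage sketch, assuming a hypothetical call_payment_api function:

def call_payment_api(order_id: str) -> dict:
    ...  # hypothetical downstream call

cb = CircuitBreakerRetry(failure_threshold=5, recovery_timeout=30)
try:
    receipt = cb.execute(call_payment_api, order_id="ord-42")
except Exception as exc:
    # Raised when retries are exhausted or the breaker rejects the call outright
    logging.error("Payment call failed: %s", exc)

print(cb.get_state())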
3. Adaptive Retry Strategy
Adjust retry behavior based on success patterns:
import statistics
from collections import deque
class AdaptiveRetryStrategy:
def __init__(self,
min_attempts: int = 1,
max_attempts: int = 5,
success_threshold: float = 0.8,
window_size: int = 100):
self.min_attempts = min_attempts
self.max_attempts = max_attempts
self.success_threshold = success_threshold
self.window_size = window_size
self.current_max_attempts = max_attempts
self.results = deque(maxlen=window_size)
self.attempt_counts = deque(maxlen=window_size)
self._lock = threading.Lock()
def execute(self, func: Callable[..., T], *args, **kwargs) -> T:
"""Execute with adaptive retry"""
last_exception = None
attempts = 0
for attempt in range(1, self.current_max_attempts + 1):
attempts = attempt
try:
result = func(*args, **kwargs)
self._record_success(attempts)
return result
except Exception as e:
last_exception = e
if attempt < self.current_max_attempts:
# Adaptive delay based on attempt number
delay = self._calculate_adaptive_delay(attempt)
time.sleep(delay)
else:
self._record_failure(attempts)
raise
raise last_exception
def _calculate_adaptive_delay(self, attempt: int) -> float:
"""Calculate delay based on recent performance"""
base_delay = 1.0
with self._lock:
if len(self.results) >= 10:
# Adjust delay based on recent success rate
success_rate = sum(self.results) / len(self.results)
if success_rate < 0.3:
# High failure rate - increase delays
base_delay *= 2
elif success_rate > 0.8:
# High success rate - decrease delays
base_delay *= 0.5
# Add some randomness
delay = base_delay * (2 ** (attempt - 1))
return min(delay + random.uniform(-0.5, 0.5), 30.0)
def _record_success(self, attempts: int):
"""Record successful execution"""
with self._lock:
self.results.append(True)
self.attempt_counts.append(attempts)
self._adapt_strategy()
def _record_failure(self, attempts: int):
"""Record failed execution"""
with self._lock:
self.results.append(False)
self.attempt_counts.append(attempts)
self._adapt_strategy()
def _adapt_strategy(self):
"""Adapt retry strategy based on recent performance"""
if len(self.results) < 20:
return
success_rate = sum(self.results) / len(self.results)
avg_attempts = statistics.mean(self.attempt_counts)
if success_rate > self.success_threshold and avg_attempts < 2:
# High success with few retries - can reduce max attempts
self.current_max_attempts = max(
self.min_attempts,
self.current_max_attempts - 1
)
elif success_rate < 0.5 and avg_attempts > 3:
# Low success with many retries - increase max attempts
self.current_max_attempts = min(
self.max_attempts,
self.current_max_attempts + 1
)
def get_stats(self) -> Dict[str, Any]:
"""Get adaptive strategy statistics"""
with self._lock:
if not self.results:
return {"current_max_attempts": self.current_max_attempts}
return {
"current_max_attempts": self.current_max_attempts,
"success_rate": sum(self.results) / len(self.results),
"avg_attempts": statistics.mean(self.attempt_counts),
"sample_size": len(self.results)
}
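A usage sketch; index_document and the document IDs are hypothetical:

def index_document(doc_id: str) -> None:
    ...  # hypothetical flaky operation

adaptive = AdaptiveRetryStrategy(min_attempts=1, max_attempts=5)
for doc_id in ["doc-1", "doc-2", "doc-3"]:
    try:
        adaptive.execute(index_document, doc_id)
    except Exception:
        logging.warning("Giving up on %s", doc_id)

print(adaptive.get_stats())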
4. Retry with Fallback
Implement retry with progressive fallback options:
@dataclass
class RetryWithFallbackConfig:
primary_retry_attempts: int = 3
fallback_retry_attempts: int = 2
use_cache_on_failure: bool = True
class RetryWithFallback:
def __init__(self, config: RetryWithFallbackConfig = None):
self.config = config or RetryWithFallbackConfig()
self.cache = {}
self.primary_retry = ExponentialBackoffRetry(
RetryConfig(max_attempts=self.config.primary_retry_attempts)
)
self.fallback_retry = ExponentialBackoffRetry(
RetryConfig(max_attempts=self.config.fallback_retry_attempts)
)
    def execute(self,
                primary_func: Callable[..., T],
                *args,
                fallback_func: Optional[Callable[..., T]] = None,
                cache_key: Optional[str] = None,
                **kwargs) -> T:
        """Execute with retry and fallback.

        fallback_func and cache_key are keyword-only so positional
        arguments pass straight through to primary_func.
        """
# Try primary function with retry
try:
result = self.primary_retry.execute(primary_func, *args, **kwargs)
# Cache successful result
if cache_key:
self.cache[cache_key] = {
"value": result,
"timestamp": time.time()
}
return result
except Exception as primary_error:
logging.warning(f"Primary function failed: {primary_error}")
# Try fallback if available
if fallback_func:
try:
return self.fallback_retry.execute(fallback_func, *args, **kwargs)
except Exception as fallback_error:
logging.warning(f"Fallback function failed: {fallback_error}")
# Try cache if enabled
if self.config.use_cache_on_failure and cache_key and cache_key in self.cache:
cached = self.cache[cache_key]
logging.info(f"Returning cached result from {time.time() - cached['timestamp']:.1f}s ago")
return cached["value"]
# All options exhausted
raise primary_error
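A usage sketch with hypothetical primary and secondary data sources:

def fetch_live_quote(symbol: str) -> dict:
    ...  # hypothetical primary source

def fetch_cached_quote_service(symbol: str) -> dict:
    ...  # hypothetical secondary source

rf = RetryWithFallback()
quote = rf.execute(
    fetch_live_quote,
    fallback_func=fetch_cached_quote_service,
    cache_key="quote:AAPL",
    symbol="AAPL",
)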
5. Contextual Retry Strategy
Apply different retry strategies based on the call context:
class ContextualRetryStrategy:
def __init__(self):
self.strategies = {}
self.default_strategy = ExponentialBackoffRetry()
def register_strategy(self, context: str, strategy: Any):
"""Register a retry strategy for a specific context"""
self.strategies[context] = strategy
def execute(self,
func: Callable[..., T],
context: str,
*args, **kwargs) -> T:
"""Execute with context-appropriate retry strategy"""
# Select strategy based on context
strategy = self.strategies.get(context, self.default_strategy)
        # Add context-specific error handling. The exception classes below are
        # placeholders; substitute the ones raised by your DB driver, HTTP
        # client (e.g. requests), or model runtime.
        if context == "database":
            retryable_exceptions = (DBConnectionError, TimeoutError)
        elif context == "api":
            retryable_exceptions = (RequestException, HTTPError)
        elif context == "ml_model":
            retryable_exceptions = (ModelLoadError, InferenceError)
        else:
            retryable_exceptions = (Exception,)
return strategy.execute(
func, *args,
retryable_exceptions=retryable_exceptions,
**kwargs
)
# Usage example
retry_manager = ContextualRetryStrategy()
# Register specific strategies
retry_manager.register_strategy(
"database",
ExponentialBackoffRetry(RetryConfig(
max_attempts=5,
initial_delay=0.1,
max_delay=10.0
))
)
retry_manager.register_strategy(
"api",
ExponentialBackoffRetry(RetryConfig(
max_attempts=3,
initial_delay=1.0,
max_delay=30.0,
jitter=True
))
)
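Dispatching through the manager might then look like this; run_query is hypothetical, and DBConnectionError is defined locally only because the context table above treats it as a placeholder:

class DBConnectionError(Exception):
    """Stand-in for your database driver's connection error."""

def run_query(sql: str):
    ...  # hypothetical database call

rows = retry_manager.execute(run_query, context="database", sql="SELECT 1")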
Advanced Retry Patterns
1. Bulkhead Retry Pattern
Isolate retry resources to prevent cascade failures:
from concurrent.futures import ThreadPoolExecutor, Future
import queue
class BulkheadRetry:
def __init__(self,
max_concurrent_retries: int = 10,
queue_size: int = 100):
self.max_concurrent_retries = max_concurrent_retries
self.executor = ThreadPoolExecutor(max_workers=max_concurrent_retries)
self.retry_queue = queue.Queue(maxsize=queue_size)
self.active_retries = 0
self._lock = threading.Lock()
def execute_with_bulkhead(self,
func: Callable[..., T],
*args,
retry_config: RetryConfig = None,
**kwargs) -> Future[T]:
"""Execute with bulkhead isolation"""
retry_config = retry_config or RetryConfig()
# Check if we can accept more retries
with self._lock:
if self.active_retries >= self.max_concurrent_retries:
try:
# Try to queue
self.retry_queue.put_nowait((func, args, kwargs, retry_config))
return self._create_pending_future()
except queue.Full:
raise Exception("Retry bulkhead is full")
self.active_retries += 1
# Submit retry task
future = self.executor.submit(
self._execute_with_retry,
func, args, kwargs, retry_config
)
# Decrement counter when done
future.add_done_callback(lambda f: self._on_retry_complete())
return future
def _execute_with_retry(self, func, args, kwargs, retry_config):
"""Execute function with retry"""
retry_strategy = ExponentialBackoffRetry(retry_config)
return retry_strategy.execute(func, *args, **kwargs)
def _on_retry_complete(self):
"""Called when retry completes"""
with self._lock:
self.active_retries -= 1
# Process queued retries
if not self.retry_queue.empty():
try:
func, args, kwargs, retry_config = self.retry_queue.get_nowait()
self.active_retries += 1
future = self.executor.submit(
self._execute_with_retry,
func, args, kwargs, retry_config
)
future.add_done_callback(lambda f: self._on_retry_complete())
except queue.Empty:
pass
def _create_pending_future(self) -> Future[T]:
"""Create a future that will be resolved when retry executes"""
future = Future()
# Implementation depends on your needs
return future
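A usage sketch; sync_agent_call is hypothetical, and the loop stays within the concurrency limit because the queued path above only returns a placeholder Future that this sketch does not resolve:

def sync_agent_call(task_id: int) -> str:
    ...  # hypothetical blocking agent invocation

bulkhead = BulkheadRetry(max_concurrent_retries=5)
futures = [bulkhead.execute_with_bulkhead(sync_agent_call, i) for i in range(5)]
results = [f.result() for f in futures]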
2. Hedged Requests Pattern
Send multiple requests and use the first successful response:
import asyncio
from typing import List
class HedgedRetryStrategy:
def __init__(self,
hedge_after_ms: int = 100,
max_hedges: int = 2):
self.hedge_after_ms = hedge_after_ms
self.max_hedges = max_hedges
async def execute(self,
func: Callable[..., T],
*args, **kwargs) -> T:
"""Execute with hedged requests"""
tasks = []
results = []
errors = []
# Start first request
task = asyncio.create_task(self._execute_with_tracking(
func, args, kwargs, 0, results, errors
))
tasks.append(task)
# Start hedge timers
for hedge_num in range(1, self.max_hedges + 1):
hedge_task = asyncio.create_task(
self._start_hedge_after_delay(
func, args, kwargs, hedge_num,
results, errors, tasks
)
)
tasks.append(hedge_task)
# Wait for first success or all failures
while True:
if results:
# Cancel remaining tasks
for task in tasks:
if not task.done():
task.cancel()
return results[0]
if all(task.done() for task in tasks):
# All tasks completed without success
raise Exception(f"All hedged requests failed: {errors}")
await asyncio.sleep(0.01)
async def _execute_with_tracking(self,
func: Callable,
args: tuple,
kwargs: dict,
request_num: int,
results: List,
errors: List):
"""Execute function and track results"""
try:
result = await func(*args, **kwargs)
results.append(result)
logging.info(f"Hedged request {request_num} succeeded")
except Exception as e:
errors.append((request_num, str(e)))
logging.warning(f"Hedged request {request_num} failed: {e}")
async def _start_hedge_after_delay(self,
func: Callable,
args: tuple,
kwargs: dict,
hedge_num: int,
results: List,
errors: List,
tasks: List):
"""Start hedge request after delay"""
await asyncio.sleep(self.hedge_after_ms / 1000.0)
if not results: # Only start if no success yet
task = asyncio.create_task(self._execute_with_tracking(
func, args, kwargs, hedge_num, results, errors
))
tasks.append(task)
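A usage sketch; query_replica stands in for a call to one of several equivalent replicas:

async def query_replica() -> dict:
    ...  # hypothetical request that is occasionally slow

async def main():
    hedged = HedgedRetryStrategy(hedge_after_ms=100, max_hedges=1)
    return await hedged.execute(query_replica)

result = asyncio.run(main())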
Monitoring and Metrics
Retry Metrics Collector
from collections import defaultdict

@dataclass
class RetryMetrics:
total_attempts: int = 0
successful_attempts: int = 0
failed_attempts: int = 0
    retry_counts: Optional[Dict[int, int]] = None  # attempts -> count
    error_types: Optional[Dict[str, int]] = None
total_retry_time: float = 0
def __post_init__(self):
if self.retry_counts is None:
self.retry_counts = defaultdict(int)
if self.error_types is None:
self.error_types = defaultdict(int)
class MonitoredRetryStrategy:
def __init__(self, base_strategy: Any):
self.base_strategy = base_strategy
self.metrics = RetryMetrics()
self._lock = threading.Lock()
def execute(self, func: Callable[..., T], *args, **kwargs) -> T:
"""Execute with metrics collection"""
start_time = time.time()
attempts = 0
last_error = None
def on_retry(attempt: int, exception: Exception):
nonlocal attempts, last_error
attempts = attempt
last_error = exception
with self._lock:
self.metrics.error_types[type(exception).__name__] += 1
try:
# Pass our on_retry callback
if 'on_retry' in kwargs:
original_on_retry = kwargs['on_retry']
def combined_on_retry(attempt, exception):
on_retry(attempt, exception)
original_on_retry(attempt, exception)
kwargs['on_retry'] = combined_on_retry
else:
kwargs['on_retry'] = on_retry
result = self.base_strategy.execute(func, *args, **kwargs)
with self._lock:
self.metrics.total_attempts += 1
self.metrics.successful_attempts += 1
self.metrics.retry_counts[attempts] += 1
self.metrics.total_retry_time += time.time() - start_time
return result
except Exception as e:
with self._lock:
self.metrics.total_attempts += 1
self.metrics.failed_attempts += 1
self.metrics.retry_counts[attempts] += 1
self.metrics.total_retry_time += time.time() - start_time
                # Count the final exception type; retried ones were already
                # recorded by the on_retry callback
                self.metrics.error_types[type(e).__name__] += 1
raise
def get_metrics_summary(self) -> Dict[str, Any]:
"""Get metrics summary"""
with self._lock:
if self.metrics.total_attempts == 0:
return {"message": "No retry attempts yet"}
success_rate = self.metrics.successful_attempts / self.metrics.total_attempts
avg_retry_time = self.metrics.total_retry_time / self.metrics.total_attempts
# Calculate retry distribution
retry_distribution = dict(self.metrics.retry_counts)
return {
"total_attempts": self.metrics.total_attempts,
"success_rate": success_rate,
"failure_rate": 1 - success_rate,
"avg_retry_time": avg_retry_time,
"retry_distribution": retry_distribution,
"common_errors": dict(self.metrics.error_types),
"avg_retries_per_attempt": sum(
k * v for k, v in retry_distribution.items()
) / self.metrics.total_attempts
}
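Wrapping a strategy might look like this; flaky_downstream_call is hypothetical:

def flaky_downstream_call() -> str:
    ...  # hypothetical call that sometimes fails

monitored = MonitoredRetryStrategy(ExponentialBackoffRetry(RetryConfig(max_attempts=3)))
try:
    monitored.execute(flaky_downstream_call)
except Exception:
    pass  # the failure is already reflected in the metrics

print(monitored.get_metrics_summary())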
Best Practices
- Idempotency: Ensure operations can be safely retried

def make_idempotent_request(request_id: str, data: Dict):
    # Use request_id to prevent duplicate processing
    if request_already_processed(request_id):
        return get_previous_result(request_id)
    result = process_request(data)
    store_result(request_id, result)
    return result

- Retry Budgets: Limit total retry time

class RetryBudget:
    def __init__(self, max_retry_seconds: float = 300):
        self.max_retry_seconds = max_retry_seconds
        self.start_time = None

    def can_retry(self) -> bool:
        if self.start_time is None:
            self.start_time = time.time()
            return True
        elapsed = time.time() - self.start_time
        return elapsed < self.max_retry_seconds

- Error Classification: Retry only appropriate errors

RETRYABLE_HTTP_CODES = {408, 429, 500, 502, 503, 504}

def is_retryable_error(error: Exception) -> bool:
    if isinstance(error, HTTPError):
        return error.response.status_code in RETRYABLE_HTTP_CODES
    elif isinstance(error, ConnectionError):
        return True
    elif isinstance(error, TimeoutError):
        return True
    else:
        return False
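These pieces compose naturally. A minimal sketch that layers a retry budget on top of the exponential backoff strategy; call_downstream is hypothetical:

def call_downstream() -> str:
    ...  # hypothetical operation that may raise ConnectionError or TimeoutError

budget = RetryBudget(max_retry_seconds=120)

def guarded_call() -> str:
    # Give up once the overall budget is spent, regardless of per-call attempts
    if not budget.can_retry():
        raise RuntimeError("Retry budget exhausted")
    return call_downstream()

retry = ExponentialBackoffRetry(RetryConfig(max_attempts=5))
retry.execute(guarded_call, retryable_exceptions=(ConnectionError, TimeoutError))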
Testing Retry Strategies
import pytest
from unittest.mock import Mock, call
def test_exponential_backoff():
retry = ExponentialBackoffRetry(RetryConfig(
max_attempts=3,
initial_delay=0.1,
jitter=False
))
# Mock function that fails twice then succeeds
mock_func = Mock(side_effect=[Exception("Fail 1"), Exception("Fail 2"), "Success"])
result = retry.execute(mock_func)
assert result == "Success"
assert mock_func.call_count == 3
def test_circuit_breaker_retry():
cb_retry = CircuitBreakerRetry(failure_threshold=2)
# Function that always fails
failing_func = Mock(side_effect=Exception("Always fails"))
# First two calls should retry and fail
for _ in range(2):
with pytest.raises(Exception):
cb_retry.execute(failing_func)
# Circuit should now be open
assert cb_retry.state == CircuitState.OPEN
# Next call should fail immediately
with pytest.raises(Exception, match="Circuit breaker is OPEN"):
cb_retry.execute(failing_func)
@pytest.mark.asyncio  # requires the pytest-asyncio plugin
async def test_hedged_requests():
hedge_retry = HedgedRetryStrategy(hedge_after_ms=50, max_hedges=2)
call_count = 0
async def slow_then_fast():
nonlocal call_count
call_count += 1
if call_count == 1:
await asyncio.sleep(0.2) # Slow first request
return "slow"
else:
await asyncio.sleep(0.01) # Fast hedged request
return "fast"
result = await hedge_retry.execute(slow_then_fast)
    assert result == "fast"  # Should get the fast hedged response
    assert call_count >= 2  # Primary request plus at least one hedge started
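One more useful check is to assert the computed delays directly by stubbing time.sleep; this sketch assumes time is imported in the test module and uses pytest's monkeypatch fixture:

def test_backoff_delays_are_exponential(monkeypatch):
    sleeps = []
    monkeypatch.setattr(time, "sleep", lambda s: sleeps.append(s))
    retry = ExponentialBackoffRetry(RetryConfig(max_attempts=3, initial_delay=0.1, jitter=False))
    mock_func = Mock(side_effect=[Exception("fail 1"), Exception("fail 2"), "ok"])
    assert retry.execute(mock_func) == "ok"
    assert sleeps == [0.1, 0.2]  # 0.1 * 2**0, then 0.1 * 2**1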

