Skip to main content

Agent Retry Strategies

Implementing robust retry strategies is crucial for building resilient multi-agent systems that can handle transient failures gracefully. This guide covers various retry patterns and their implementation.

Retry Strategy Fundamentals

When to Retry

  1. Transient Network Errors: Temporary connectivity issues
  2. Rate Limiting: API throttling responses
  3. Temporary Resource Unavailability: Database locks, service restarts
  4. Timeout Errors: Slow responses that exceed limits
  5. Partial Failures: When part of an operation succeeds

When NOT to Retry

  1. Authentication Failures: Invalid credentials
  2. Authorization Errors: Insufficient permissions
  3. Invalid Input: Malformed requests
  4. Business Logic Errors: Domain-specific failures
  5. Resource Not Found: 404 errors

Retry Patterns

1. Exponential Backoff with Jitter

Prevent thundering herd problems with randomized delays:
import random
import time
from typing import TypeVar, Callable, Optional, Any
from dataclasses import dataclass
import logging

T = TypeVar('T')

@dataclass
class RetryConfig:
    max_attempts: int = 3
    initial_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter: bool = True
    
class ExponentialBackoffRetry:
    def __init__(self, config: RetryConfig = None):
        self.config = config or RetryConfig()
        self.logger = logging.getLogger(__name__)
        
    def execute(self, 
                func: Callable[..., T],
                *args,
                retryable_exceptions: tuple = (Exception,),
                on_retry: Optional[Callable[[int, Exception], None]] = None,
                **kwargs) -> T:
        """Execute function with exponential backoff retry"""
        
        last_exception = None
        
        for attempt in range(self.config.max_attempts):
            try:
                return func(*args, **kwargs)
                
            except retryable_exceptions as e:
                last_exception = e
                
                if attempt == self.config.max_attempts - 1:
                    # Last attempt failed
                    self.logger.error(f"All {self.config.max_attempts} attempts failed")
                    raise
                
                # Calculate delay
                delay = self._calculate_delay(attempt)
                
                # Call retry callback if provided
                if on_retry:
                    on_retry(attempt + 1, e)
                
                self.logger.warning(
                    f"Attempt {attempt + 1} failed: {str(e)}. "
                    f"Retrying in {delay:.2f} seconds..."
                )
                
                time.sleep(delay)
        
        raise last_exception
    
    def _calculate_delay(self, attempt: int) -> float:
        """Calculate delay with exponential backoff and jitter"""
        # Exponential delay
        delay = self.config.initial_delay * (self.config.exponential_base ** attempt)
        
        # Cap at max delay
        delay = min(delay, self.config.max_delay)
        
        # Add jitter
        if self.config.jitter:
            # Full jitter strategy
            delay = random.uniform(0, delay)
        
        return delay

# Async version
import asyncio

class AsyncExponentialBackoffRetry:
    def __init__(self, config: RetryConfig = None):
        self.config = config or RetryConfig()
        self.logger = logging.getLogger(__name__)
    
    async def execute(self,
                     func: Callable[..., T],
                     *args,
                     retryable_exceptions: tuple = (Exception,),
                     on_retry: Optional[Callable[[int, Exception], None]] = None,
                     **kwargs) -> T:
        """Execute async function with exponential backoff retry"""
        
        last_exception = None
        
        for attempt in range(self.config.max_attempts):
            try:
                return await func(*args, **kwargs)
                
            except retryable_exceptions as e:
                last_exception = e
                
                if attempt == self.config.max_attempts - 1:
                    self.logger.error(f"All {self.config.max_attempts} attempts failed")
                    raise
                
                delay = self._calculate_delay(attempt)
                
                if on_retry:
                    on_retry(attempt + 1, e)
                
                self.logger.warning(
                    f"Attempt {attempt + 1} failed: {str(e)}. "
                    f"Retrying in {delay:.2f} seconds..."
                )
                
                await asyncio.sleep(delay)
        
        raise last_exception
    
    def _calculate_delay(self, attempt: int) -> float:
        """Calculate delay with exponential backoff and jitter"""
        delay = self.config.initial_delay * (self.config.exponential_base ** attempt)
        delay = min(delay, self.config.max_delay)
        
        if self.config.jitter:
            delay = random.uniform(0, delay)
        
        return delay

2. Circuit Breaker with Retry

Combine circuit breaker pattern with intelligent retry:
from datetime import datetime, timedelta
from enum import Enum
import threading

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreakerRetry:
    def __init__(self,
                 failure_threshold: int = 5,
                 recovery_timeout: int = 60,
                 half_open_max_calls: int = 3):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls
        
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None
        self.half_open_calls = 0
        
        self._lock = threading.Lock()
        self.retry_strategy = ExponentialBackoffRetry()
        
        # Metrics
        self.metrics = {
            "total_calls": 0,
            "successful_calls": 0,
            "failed_calls": 0,
            "rejected_calls": 0
        }
    
    def execute(self, func: Callable[..., T], *args, **kwargs) -> T:
        """Execute function with circuit breaker and retry logic"""
        with self._lock:
            self.metrics["total_calls"] += 1
            
            if self.state == CircuitState.OPEN:
                if self._should_attempt_reset():
                    self.state = CircuitState.HALF_OPEN
                    self.half_open_calls = 0
                else:
                    self.metrics["rejected_calls"] += 1
                    raise Exception("Circuit breaker is OPEN")
            
            if self.state == CircuitState.HALF_OPEN:
                if self.half_open_calls >= self.half_open_max_calls:
                    self.metrics["rejected_calls"] += 1
                    raise Exception("Circuit breaker is HALF_OPEN, max calls reached")
                self.half_open_calls += 1
        
        try:
            # Use retry strategy when circuit is closed or half-open
            result = self.retry_strategy.execute(
                func, *args,
                on_retry=self._on_retry,
                **kwargs
            )
            
            with self._lock:
                self._on_success()
                self.metrics["successful_calls"] += 1
            
            return result
            
        except Exception as e:
            with self._lock:
                self._on_failure()
                self.metrics["failed_calls"] += 1
            raise
    
    def _should_attempt_reset(self) -> bool:
        """Check if circuit should attempt reset"""
        return (
            self.last_failure_time and
            datetime.now() - self.last_failure_time > timedelta(seconds=self.recovery_timeout)
        )
    
    def _on_success(self):
        """Handle successful call"""
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED
            self.failure_count = 0
            self.last_failure_time = None
    
    def _on_failure(self):
        """Handle failed call"""
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
    
    def _on_retry(self, attempt: int, exception: Exception):
        """Called when retry attempt is made"""
        # Could implement additional logic here
        pass
    
    def get_state(self) -> Dict[str, Any]:
        """Get current circuit breaker state"""
        with self._lock:
            return {
                "state": self.state.value,
                "failure_count": self.failure_count,
                "metrics": self.metrics.copy()
            }

3. Adaptive Retry Strategy

Adjust retry behavior based on success patterns:
import statistics
from collections import deque

class AdaptiveRetryStrategy:
    def __init__(self,
                 min_attempts: int = 1,
                 max_attempts: int = 5,
                 success_threshold: float = 0.8,
                 window_size: int = 100):
        self.min_attempts = min_attempts
        self.max_attempts = max_attempts
        self.success_threshold = success_threshold
        self.window_size = window_size
        
        self.current_max_attempts = max_attempts
        self.results = deque(maxlen=window_size)
        self.attempt_counts = deque(maxlen=window_size)
        
        self._lock = threading.Lock()
    
    def execute(self, func: Callable[..., T], *args, **kwargs) -> T:
        """Execute with adaptive retry"""
        last_exception = None
        attempts = 0
        
        for attempt in range(1, self.current_max_attempts + 1):
            attempts = attempt
            
            try:
                result = func(*args, **kwargs)
                self._record_success(attempts)
                return result
                
            except Exception as e:
                last_exception = e
                
                if attempt < self.current_max_attempts:
                    # Adaptive delay based on attempt number
                    delay = self._calculate_adaptive_delay(attempt)
                    time.sleep(delay)
                else:
                    self._record_failure(attempts)
                    raise
        
        raise last_exception
    
    def _calculate_adaptive_delay(self, attempt: int) -> float:
        """Calculate delay based on recent performance"""
        base_delay = 1.0
        
        with self._lock:
            if len(self.results) >= 10:
                # Adjust delay based on recent success rate
                success_rate = sum(self.results) / len(self.results)
                
                if success_rate < 0.3:
                    # High failure rate - increase delays
                    base_delay *= 2
                elif success_rate > 0.8:
                    # High success rate - decrease delays
                    base_delay *= 0.5
            
            # Add some randomness
            delay = base_delay * (2 ** (attempt - 1))
            return min(delay + random.uniform(-0.5, 0.5), 30.0)
    
    def _record_success(self, attempts: int):
        """Record successful execution"""
        with self._lock:
            self.results.append(True)
            self.attempt_counts.append(attempts)
            self._adapt_strategy()
    
    def _record_failure(self, attempts: int):
        """Record failed execution"""
        with self._lock:
            self.results.append(False)
            self.attempt_counts.append(attempts)
            self._adapt_strategy()
    
    def _adapt_strategy(self):
        """Adapt retry strategy based on recent performance"""
        if len(self.results) < 20:
            return
        
        success_rate = sum(self.results) / len(self.results)
        avg_attempts = statistics.mean(self.attempt_counts)
        
        if success_rate > self.success_threshold and avg_attempts < 2:
            # High success with few retries - can reduce max attempts
            self.current_max_attempts = max(
                self.min_attempts,
                self.current_max_attempts - 1
            )
        elif success_rate < 0.5 and avg_attempts > 3:
            # Low success with many retries - increase max attempts
            self.current_max_attempts = min(
                self.max_attempts,
                self.current_max_attempts + 1
            )
    
    def get_stats(self) -> Dict[str, Any]:
        """Get adaptive strategy statistics"""
        with self._lock:
            if not self.results:
                return {"current_max_attempts": self.current_max_attempts}
            
            return {
                "current_max_attempts": self.current_max_attempts,
                "success_rate": sum(self.results) / len(self.results),
                "avg_attempts": statistics.mean(self.attempt_counts),
                "sample_size": len(self.results)
            }

4. Retry with Fallback

Implement retry with progressive fallback options:
@dataclass
class RetryWithFallbackConfig:
    primary_retry_attempts: int = 3
    fallback_retry_attempts: int = 2
    use_cache_on_failure: bool = True

class RetryWithFallback:
    def __init__(self, config: RetryWithFallbackConfig = None):
        self.config = config or RetryWithFallbackConfig()
        self.cache = {}
        self.primary_retry = ExponentialBackoffRetry(
            RetryConfig(max_attempts=self.config.primary_retry_attempts)
        )
        self.fallback_retry = ExponentialBackoffRetry(
            RetryConfig(max_attempts=self.config.fallback_retry_attempts)
        )
    
    def execute(self,
                primary_func: Callable[..., T],
                fallback_func: Optional[Callable[..., T]] = None,
                cache_key: Optional[str] = None,
                *args, **kwargs) -> T:
        """Execute with retry and fallback"""
        
        # Try primary function with retry
        try:
            result = self.primary_retry.execute(primary_func, *args, **kwargs)
            
            # Cache successful result
            if cache_key:
                self.cache[cache_key] = {
                    "value": result,
                    "timestamp": time.time()
                }
            
            return result
            
        except Exception as primary_error:
            logging.warning(f"Primary function failed: {primary_error}")
            
            # Try fallback if available
            if fallback_func:
                try:
                    return self.fallback_retry.execute(fallback_func, *args, **kwargs)
                except Exception as fallback_error:
                    logging.warning(f"Fallback function failed: {fallback_error}")
            
            # Try cache if enabled
            if self.config.use_cache_on_failure and cache_key and cache_key in self.cache:
                cached = self.cache[cache_key]
                logging.info(f"Returning cached result from {time.time() - cached['timestamp']:.1f}s ago")
                return cached["value"]
            
            # All options exhausted
            raise primary_error

5. Contextual Retry Strategy

Different retry strategies based on context:
class ContextualRetryStrategy:
    def __init__(self):
        self.strategies = {}
        self.default_strategy = ExponentialBackoffRetry()
        
    def register_strategy(self, context: str, strategy: Any):
        """Register a retry strategy for a specific context"""
        self.strategies[context] = strategy
    
    def execute(self,
                func: Callable[..., T],
                context: str,
                *args, **kwargs) -> T:
        """Execute with context-appropriate retry strategy"""
        
        # Select strategy based on context
        strategy = self.strategies.get(context, self.default_strategy)
        
        # Add context-specific error handling
        if context == "database":
            retryable_exceptions = (DBConnectionError, TimeoutError)
        elif context == "api":
            retryable_exceptions = (RequestException, HTTPError)
        elif context == "ml_model":
            retryable_exceptions = (ModelLoadError, InferenceError)
        else:
            retryable_exceptions = (Exception,)
        
        return strategy.execute(
            func, *args,
            retryable_exceptions=retryable_exceptions,
            **kwargs
        )

# Usage example
retry_manager = ContextualRetryStrategy()

# Register specific strategies
retry_manager.register_strategy(
    "database",
    ExponentialBackoffRetry(RetryConfig(
        max_attempts=5,
        initial_delay=0.1,
        max_delay=10.0
    ))
)

retry_manager.register_strategy(
    "api",
    ExponentialBackoffRetry(RetryConfig(
        max_attempts=3,
        initial_delay=1.0,
        max_delay=30.0,
        jitter=True
    ))
)

Advanced Retry Patterns

1. Bulkhead Retry Pattern

Isolate retry resources to prevent cascade failures:
from concurrent.futures import ThreadPoolExecutor, Future
import queue

class BulkheadRetry:
    def __init__(self,
                 max_concurrent_retries: int = 10,
                 queue_size: int = 100):
        self.max_concurrent_retries = max_concurrent_retries
        self.executor = ThreadPoolExecutor(max_workers=max_concurrent_retries)
        self.retry_queue = queue.Queue(maxsize=queue_size)
        self.active_retries = 0
        self._lock = threading.Lock()
        
    def execute_with_bulkhead(self,
                             func: Callable[..., T],
                             *args,
                             retry_config: RetryConfig = None,
                             **kwargs) -> Future[T]:
        """Execute with bulkhead isolation"""
        
        retry_config = retry_config or RetryConfig()
        
        # Check if we can accept more retries
        with self._lock:
            if self.active_retries >= self.max_concurrent_retries:
                try:
                    # Try to queue
                    self.retry_queue.put_nowait((func, args, kwargs, retry_config))
                    return self._create_pending_future()
                except queue.Full:
                    raise Exception("Retry bulkhead is full")
            
            self.active_retries += 1
        
        # Submit retry task
        future = self.executor.submit(
            self._execute_with_retry,
            func, args, kwargs, retry_config
        )
        
        # Decrement counter when done
        future.add_done_callback(lambda f: self._on_retry_complete())
        
        return future
    
    def _execute_with_retry(self, func, args, kwargs, retry_config):
        """Execute function with retry"""
        retry_strategy = ExponentialBackoffRetry(retry_config)
        return retry_strategy.execute(func, *args, **kwargs)
    
    def _on_retry_complete(self):
        """Called when retry completes"""
        with self._lock:
            self.active_retries -= 1
            
            # Process queued retries
            if not self.retry_queue.empty():
                try:
                    func, args, kwargs, retry_config = self.retry_queue.get_nowait()
                    self.active_retries += 1
                    
                    future = self.executor.submit(
                        self._execute_with_retry,
                        func, args, kwargs, retry_config
                    )
                    future.add_done_callback(lambda f: self._on_retry_complete())
                except queue.Empty:
                    pass
    
    def _create_pending_future(self) -> Future[T]:
        """Create a future that will be resolved when retry executes"""
        future = Future()
        # Implementation depends on your needs
        return future

2. Hedged Requests Pattern

Send multiple requests and use the first successful response:
import asyncio
from typing import List

class HedgedRetryStrategy:
    def __init__(self,
                 hedge_after_ms: int = 100,
                 max_hedges: int = 2):
        self.hedge_after_ms = hedge_after_ms
        self.max_hedges = max_hedges
        
    async def execute(self,
                     func: Callable[..., T],
                     *args, **kwargs) -> T:
        """Execute with hedged requests"""
        
        tasks = []
        results = []
        errors = []
        
        # Start first request
        task = asyncio.create_task(self._execute_with_tracking(
            func, args, kwargs, 0, results, errors
        ))
        tasks.append(task)
        
        # Start hedge timers
        for hedge_num in range(1, self.max_hedges + 1):
            hedge_task = asyncio.create_task(
                self._start_hedge_after_delay(
                    func, args, kwargs, hedge_num,
                    results, errors, tasks
                )
            )
            tasks.append(hedge_task)
        
        # Wait for first success or all failures
        while True:
            if results:
                # Cancel remaining tasks
                for task in tasks:
                    if not task.done():
                        task.cancel()
                
                return results[0]
            
            if all(task.done() for task in tasks):
                # All tasks completed without success
                raise Exception(f"All hedged requests failed: {errors}")
            
            await asyncio.sleep(0.01)
    
    async def _execute_with_tracking(self,
                                   func: Callable,
                                   args: tuple,
                                   kwargs: dict,
                                   request_num: int,
                                   results: List,
                                   errors: List):
        """Execute function and track results"""
        try:
            result = await func(*args, **kwargs)
            results.append(result)
            logging.info(f"Hedged request {request_num} succeeded")
        except Exception as e:
            errors.append((request_num, str(e)))
            logging.warning(f"Hedged request {request_num} failed: {e}")
    
    async def _start_hedge_after_delay(self,
                                     func: Callable,
                                     args: tuple,
                                     kwargs: dict,
                                     hedge_num: int,
                                     results: List,
                                     errors: List,
                                     tasks: List):
        """Start hedge request after delay"""
        await asyncio.sleep(self.hedge_after_ms / 1000.0)
        
        if not results:  # Only start if no success yet
            task = asyncio.create_task(self._execute_with_tracking(
                func, args, kwargs, hedge_num, results, errors
            ))
            tasks.append(task)

Monitoring and Metrics

Retry Metrics Collector

@dataclass
class RetryMetrics:
    total_attempts: int = 0
    successful_attempts: int = 0
    failed_attempts: int = 0
    retry_counts: Dict[int, int] = None  # attempts -> count
    error_types: Dict[str, int] = None
    total_retry_time: float = 0
    
    def __post_init__(self):
        if self.retry_counts is None:
            self.retry_counts = defaultdict(int)
        if self.error_types is None:
            self.error_types = defaultdict(int)

class MonitoredRetryStrategy:
    def __init__(self, base_strategy: Any):
        self.base_strategy = base_strategy
        self.metrics = RetryMetrics()
        self._lock = threading.Lock()
        
    def execute(self, func: Callable[..., T], *args, **kwargs) -> T:
        """Execute with metrics collection"""
        start_time = time.time()
        attempts = 0
        last_error = None
        
        def on_retry(attempt: int, exception: Exception):
            nonlocal attempts, last_error
            attempts = attempt
            last_error = exception
            
            with self._lock:
                self.metrics.error_types[type(exception).__name__] += 1
        
        try:
            # Pass our on_retry callback
            if 'on_retry' in kwargs:
                original_on_retry = kwargs['on_retry']
                def combined_on_retry(attempt, exception):
                    on_retry(attempt, exception)
                    original_on_retry(attempt, exception)
                kwargs['on_retry'] = combined_on_retry
            else:
                kwargs['on_retry'] = on_retry
            
            result = self.base_strategy.execute(func, *args, **kwargs)
            
            with self._lock:
                self.metrics.total_attempts += 1
                self.metrics.successful_attempts += 1
                self.metrics.retry_counts[attempts] += 1
                self.metrics.total_retry_time += time.time() - start_time
            
            return result
            
        except Exception as e:
            with self._lock:
                self.metrics.total_attempts += 1
                self.metrics.failed_attempts += 1
                self.metrics.retry_counts[attempts] += 1
                self.metrics.total_retry_time += time.time() - start_time
                
                if last_error:
                    self.metrics.error_types[type(last_error).__name__] += 1
                
            raise
    
    def get_metrics_summary(self) -> Dict[str, Any]:
        """Get metrics summary"""
        with self._lock:
            if self.metrics.total_attempts == 0:
                return {"message": "No retry attempts yet"}
            
            success_rate = self.metrics.successful_attempts / self.metrics.total_attempts
            avg_retry_time = self.metrics.total_retry_time / self.metrics.total_attempts
            
            # Calculate retry distribution
            retry_distribution = dict(self.metrics.retry_counts)
            
            return {
                "total_attempts": self.metrics.total_attempts,
                "success_rate": success_rate,
                "failure_rate": 1 - success_rate,
                "avg_retry_time": avg_retry_time,
                "retry_distribution": retry_distribution,
                "common_errors": dict(self.metrics.error_types),
                "avg_retries_per_attempt": sum(
                    k * v for k, v in retry_distribution.items()
                ) / self.metrics.total_attempts
            }

Best Practices

  1. Idempotency: Ensure operations can be safely retried
    def make_idempotent_request(request_id: str, data: Dict):
        # Use request_id to prevent duplicate processing
        if request_already_processed(request_id):
            return get_previous_result(request_id)
        
        result = process_request(data)
        store_result(request_id, result)
        return result
    
  2. Retry Budgets: Limit total retry time
    class RetryBudget:
        def __init__(self, max_retry_seconds: float = 300):
            self.max_retry_seconds = max_retry_seconds
            self.start_time = None
            
        def can_retry(self) -> bool:
            if self.start_time is None:
                self.start_time = time.time()
                return True
                
            elapsed = time.time() - self.start_time
            return elapsed < self.max_retry_seconds
    
  3. Error Classification: Retry only appropriate errors
    RETRYABLE_HTTP_CODES = {408, 429, 500, 502, 503, 504}
    
    def is_retryable_error(error: Exception) -> bool:
        if isinstance(error, HTTPError):
            return error.response.status_code in RETRYABLE_HTTP_CODES
        elif isinstance(error, ConnectionError):
            return True
        elif isinstance(error, TimeoutError):
            return True
        else:
            return False
    

Testing Retry Strategies

import pytest
from unittest.mock import Mock, call

def test_exponential_backoff():
    retry = ExponentialBackoffRetry(RetryConfig(
        max_attempts=3,
        initial_delay=0.1,
        jitter=False
    ))
    
    # Mock function that fails twice then succeeds
    mock_func = Mock(side_effect=[Exception("Fail 1"), Exception("Fail 2"), "Success"])
    
    result = retry.execute(mock_func)
    
    assert result == "Success"
    assert mock_func.call_count == 3

def test_circuit_breaker_retry():
    cb_retry = CircuitBreakerRetry(failure_threshold=2)
    
    # Function that always fails
    failing_func = Mock(side_effect=Exception("Always fails"))
    
    # First two calls should retry and fail
    for _ in range(2):
        with pytest.raises(Exception):
            cb_retry.execute(failing_func)
    
    # Circuit should now be open
    assert cb_retry.state == CircuitState.OPEN
    
    # Next call should fail immediately
    with pytest.raises(Exception, match="Circuit breaker is OPEN"):
        cb_retry.execute(failing_func)

async def test_hedged_requests():
    hedge_retry = HedgedRetryStrategy(hedge_after_ms=50, max_hedges=2)
    
    call_count = 0
    
    async def slow_then_fast():
        nonlocal call_count
        call_count += 1
        
        if call_count == 1:
            await asyncio.sleep(0.2)  # Slow first request
            return "slow"
        else:
            await asyncio.sleep(0.01)  # Fast hedged request
            return "fast"
    
    result = await hedge_retry.execute(slow_then_fast)
    
    assert result == "fast"  # Should get fast response
    assert call_count == 2  # Both requests started

Conclusion

Implementing robust retry strategies is essential for building resilient multi-agent systems. By choosing the appropriate retry pattern and configuring it correctly, you can handle transient failures gracefully while avoiding issues like retry storms and cascading failures.