Agent Retry Strategies

Implementing robust retry strategies is crucial for building resilient multi-agent systems that can handle transient failures gracefully. This guide covers various retry patterns and their implementation.

Retry Strategy Fundamentals

When to Retry

  1. Transient Network Errors: Temporary connectivity issues
  2. Rate Limiting: API throttling responses
  3. Temporary Resource Unavailability: Database locks, service restarts
  4. Timeout Errors: Slow responses that exceed limits
  5. Partial Failures: When part of an operation succeeds

When NOT to Retry

  1. Authentication Failures: Invalid credentials
  2. Authorization Errors: Insufficient permissions
  3. Invalid Input: Malformed requests
  4. Business Logic Errors: Domain-specific failures
  5. Resource Not Found: 404 errors
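
As a concrete sketch, these categories might map onto a classification helper like the one below (the exception types and status codes assume a requests-based HTTP client; adapt them to your stack):
import requests

# Transient HTTP statuses: timeout, throttling, server-side errors
RETRYABLE_STATUS = {408, 429, 500, 502, 503, 504}

def is_transient(error: Exception) -> bool:
    """Return True if the error is transient and worth retrying."""
    if isinstance(error, requests.exceptions.HTTPError) and error.response is not None:
        # Auth failures, bad input and 404s fall outside this set
        return error.response.status_code in RETRYABLE_STATUS
    # Connection problems and timeouts are usually transient
    return isinstance(error, (requests.exceptions.ConnectionError,
                              requests.exceptions.Timeout))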

Retry Patterns

1. Exponential Backoff with Jitter

Prevent thundering herd problems with randomized delays:
import random
import time
from typing import TypeVar, Callable, Optional, Any, Dict
from dataclasses import dataclass
import logging

T = TypeVar('T')

@dataclass
class RetryConfig:
    max_attempts: int = 3
    initial_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter: bool = True
    
class ExponentialBackoffRetry:
    def __init__(self, config: Optional[RetryConfig] = None):
        self.config = config or RetryConfig()
        self.logger = logging.getLogger(__name__)
        
    def execute(self, 
                func: Callable[..., T],
                *args,
                retryable_exceptions: tuple = (Exception,),
                on_retry: Optional[Callable[[int, Exception], None]] = None,
                **kwargs) -> T:
        """Execute function with exponential backoff retry"""
        
        last_exception = None
        
        for attempt in range(self.config.max_attempts):
            try:
                return func(*args, **kwargs)
                
            except retryable_exceptions as e:
                last_exception = e
                
                if attempt == self.config.max_attempts - 1:
                    # Last attempt failed
                    self.logger.error(f"All {self.config.max_attempts} attempts failed")
                    raise
                
                # Calculate delay
                delay = self._calculate_delay(attempt)
                
                # Call retry callback if provided
                if on_retry:
                    on_retry(attempt + 1, e)
                
                self.logger.warning(
                    f"Attempt {attempt + 1} failed: {str(e)}. "
                    f"Retrying in {delay:.2f} seconds..."
                )
                
                time.sleep(delay)
        
        raise last_exception
    
    def _calculate_delay(self, attempt: int) -> float:
        """Calculate delay with exponential backoff and jitter"""
        # Exponential delay
        delay = self.config.initial_delay * (self.config.exponential_base ** attempt)
        
        # Cap at max delay
        delay = min(delay, self.config.max_delay)
        
        # Add jitter
        if self.config.jitter:
            # Full jitter strategy
            delay = random.uniform(0, delay)
        
        return delay

# Async version
import asyncio

class AsyncExponentialBackoffRetry:
    def __init__(self, config: Optional[RetryConfig] = None):
        self.config = config or RetryConfig()
        self.logger = logging.getLogger(__name__)
    
    async def execute(self,
                     func: Callable[..., T],
                     *args,
                     retryable_exceptions: tuple = (Exception,),
                     on_retry: Optional[Callable[[int, Exception], None]] = None,
                     **kwargs) -> T:
        """Execute async function with exponential backoff retry"""
        
        last_exception = None
        
        for attempt in range(self.config.max_attempts):
            try:
                return await func(*args, **kwargs)
                
            except retryable_exceptions as e:
                last_exception = e
                
                if attempt == self.config.max_attempts - 1:
                    self.logger.error(f"All {self.config.max_attempts} attempts failed")
                    raise
                
                delay = self._calculate_delay(attempt)
                
                if on_retry:
                    on_retry(attempt + 1, e)
                
                self.logger.warning(
                    f"Attempt {attempt + 1} failed: {str(e)}. "
                    f"Retrying in {delay:.2f} seconds..."
                )
                
                await asyncio.sleep(delay)
        
        raise last_exception
    
    def _calculate_delay(self, attempt: int) -> float:
        """Calculate delay with exponential backoff and jitter"""
        delay = self.config.initial_delay * (self.config.exponential_base ** attempt)
        delay = min(delay, self.config.max_delay)
        
        if self.config.jitter:
            delay = random.uniform(0, delay)
        
        return delay
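
A quick usage sketch (call_llm here is a hypothetical flaky callable; any function works):
def call_llm(prompt: str) -> str:
    """Hypothetical operation that may raise transient errors."""
    ...

retry = ExponentialBackoffRetry(RetryConfig(max_attempts=4, initial_delay=0.5))
answer = retry.execute(
    call_llm,
    "Summarise the incident report",
    retryable_exceptions=(ConnectionError, TimeoutError),
    on_retry=lambda attempt, exc: print(f"Retry {attempt} after {exc!r}"),
)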

PraisonAI now ships a built-in tool circuit breaker that wraps every tool call automatically. See Tool Circuit Breaker. The examples below show how to extend or customise that pattern.

2. Circuit Breaker with Retry

Combine circuit breaker pattern with intelligent retry:
from datetime import datetime, timedelta
from enum import Enum
import threading

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreakerRetry:
    def __init__(self,
                 failure_threshold: int = 5,
                 recovery_timeout: int = 60,
                 half_open_max_calls: int = 3):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls
        
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None
        self.half_open_calls = 0
        
        self._lock = threading.Lock()
        self.retry_strategy = ExponentialBackoffRetry()
        
        # Metrics
        self.metrics = {
            "total_calls": 0,
            "successful_calls": 0,
            "failed_calls": 0,
            "rejected_calls": 0
        }
    
    def execute(self, func: Callable[..., T], *args, **kwargs) -> T:
        """Execute function with circuit breaker and retry logic"""
        with self._lock:
            self.metrics["total_calls"] += 1
            
            if self.state == CircuitState.OPEN:
                if self._should_attempt_reset():
                    self.state = CircuitState.HALF_OPEN
                    self.half_open_calls = 0
                else:
                    self.metrics["rejected_calls"] += 1
                    raise Exception("Circuit breaker is OPEN")
            
            if self.state == CircuitState.HALF_OPEN:
                if self.half_open_calls >= self.half_open_max_calls:
                    self.metrics["rejected_calls"] += 1
                    raise Exception("Circuit breaker is HALF_OPEN, max calls reached")
                self.half_open_calls += 1
        
        try:
            # Use retry strategy when circuit is closed or half-open
            result = self.retry_strategy.execute(
                func, *args,
                on_retry=self._on_retry,
                **kwargs
            )
            
            with self._lock:
                self._on_success()
                self.metrics["successful_calls"] += 1
            
            return result
            
        except Exception as e:
            with self._lock:
                self._on_failure()
                self.metrics["failed_calls"] += 1
            raise
    
    def _should_attempt_reset(self) -> bool:
        """Check if circuit should attempt reset"""
        return (
            self.last_failure_time and
            datetime.now() - self.last_failure_time > timedelta(seconds=self.recovery_timeout)
        )
    
    def _on_success(self):
        """Handle successful call"""
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED
            self.failure_count = 0
            self.last_failure_time = None
    
    def _on_failure(self):
        """Handle failed call"""
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
    
    def _on_retry(self, attempt: int, exception: Exception):
        """Called when retry attempt is made"""
        # Could implement additional logic here
        pass
    
    def get_state(self) -> Dict[str, Any]:
        """Get current circuit breaker state"""
        with self._lock:
            return {
                "state": self.state.value,
                "failure_count": self.failure_count,
                "metrics": self.metrics.copy()
            }
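
A usage sketch, assuming flaky_service is a hypothetical callable guarded by the breaker:
breaker = CircuitBreakerRetry(failure_threshold=3, recovery_timeout=30)

def flaky_service() -> str:
    """Hypothetical downstream call."""
    ...

try:
    result = breaker.execute(flaky_service)
except Exception as e:
    # Either retries were exhausted or the circuit rejected the call outright
    print(f"Call failed or rejected: {e}")

print(breaker.get_state())  # e.g. {"state": "closed", "failure_count": 0, ...}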

3. Adaptive Retry Strategy

Adjust retry behavior based on success patterns:
import statistics
from collections import deque

class AdaptiveRetryStrategy:
    def __init__(self,
                 min_attempts: int = 1,
                 max_attempts: int = 5,
                 success_threshold: float = 0.8,
                 window_size: int = 100):
        self.min_attempts = min_attempts
        self.max_attempts = max_attempts
        self.success_threshold = success_threshold
        self.window_size = window_size
        
        self.current_max_attempts = max_attempts
        self.results = deque(maxlen=window_size)
        self.attempt_counts = deque(maxlen=window_size)
        
        self._lock = threading.Lock()
    
    def execute(self, func: Callable[..., T], *args, **kwargs) -> T:
        """Execute with adaptive retry"""
        last_exception = None
        attempts = 0
        
        for attempt in range(1, self.current_max_attempts + 1):
            attempts = attempt
            
            try:
                result = func(*args, **kwargs)
                self._record_success(attempts)
                return result
                
            except Exception as e:
                last_exception = e
                
                if attempt < self.current_max_attempts:
                    # Adaptive delay based on attempt number
                    delay = self._calculate_adaptive_delay(attempt)
                    time.sleep(delay)
                else:
                    self._record_failure(attempts)
                    raise
        
        raise last_exception
    
    def _calculate_adaptive_delay(self, attempt: int) -> float:
        """Calculate delay based on recent performance"""
        base_delay = 1.0
        
        with self._lock:
            if len(self.results) >= 10:
                # Adjust delay based on recent success rate
                success_rate = sum(self.results) / len(self.results)
                
                if success_rate < 0.3:
                    # High failure rate - increase delays
                    base_delay *= 2
                elif success_rate > 0.8:
                    # High success rate - decrease delays
                    base_delay *= 0.5
            
            # Add some randomness
            delay = base_delay * (2 ** (attempt - 1))
            return min(delay + random.uniform(-0.5, 0.5), 30.0)
    
    def _record_success(self, attempts: int):
        """Record successful execution"""
        with self._lock:
            self.results.append(True)
            self.attempt_counts.append(attempts)
            self._adapt_strategy()
    
    def _record_failure(self, attempts: int):
        """Record failed execution"""
        with self._lock:
            self.results.append(False)
            self.attempt_counts.append(attempts)
            self._adapt_strategy()
    
    def _adapt_strategy(self):
        """Adapt retry strategy based on recent performance"""
        if len(self.results) < 20:
            return
        
        success_rate = sum(self.results) / len(self.results)
        avg_attempts = statistics.mean(self.attempt_counts)
        
        if success_rate > self.success_threshold and avg_attempts < 2:
            # High success with few retries - can reduce max attempts
            self.current_max_attempts = max(
                self.min_attempts,
                self.current_max_attempts - 1
            )
        elif success_rate < 0.5 and avg_attempts > 3:
            # Low success with many retries - increase max attempts
            self.current_max_attempts = min(
                self.max_attempts,
                self.current_max_attempts + 1
            )
    
    def get_stats(self) -> Dict[str, Any]:
        """Get adaptive strategy statistics"""
        with self._lock:
            if not self.results:
                return {"current_max_attempts": self.current_max_attempts}
            
            return {
                "current_max_attempts": self.current_max_attempts,
                "success_rate": sum(self.results) / len(self.results),
                "avg_attempts": statistics.mean(self.attempt_counts),
                "sample_size": len(self.results)
            }
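
A usage sketch (unstable_call is a hypothetical operation whose reliability varies over time):
adaptive = AdaptiveRetryStrategy(max_attempts=5)

def unstable_call() -> dict:
    """Hypothetical operation with fluctuating reliability."""
    ...

for _ in range(50):
    try:
        adaptive.execute(unstable_call)
    except Exception:
        pass  # failures also feed the adaptation window

print(adaptive.get_stats())
# e.g. {"current_max_attempts": 4, "success_rate": 0.82, ...}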

4. Retry with Fallback

Implement retry with progressive fallback options:
@dataclass
class RetryWithFallbackConfig:
    primary_retry_attempts: int = 3
    fallback_retry_attempts: int = 2
    use_cache_on_failure: bool = True

class RetryWithFallback:
    def __init__(self, config: Optional[RetryWithFallbackConfig] = None):
        self.config = config or RetryWithFallbackConfig()
        self.cache = {}
        self.primary_retry = ExponentialBackoffRetry(
            RetryConfig(max_attempts=self.config.primary_retry_attempts)
        )
        self.fallback_retry = ExponentialBackoffRetry(
            RetryConfig(max_attempts=self.config.fallback_retry_attempts)
        )
    
    def execute(self,
                primary_func: Callable[..., T],
                *args,
                fallback_func: Optional[Callable[..., T]] = None,
                cache_key: Optional[str] = None,
                **kwargs) -> T:
        """Execute with retry and fallback (fallback_func and cache_key are
        keyword-only so positional args pass through to primary_func)"""
        
        # Try primary function with retry
        try:
            result = self.primary_retry.execute(primary_func, *args, **kwargs)
            
            # Cache successful result
            if cache_key:
                self.cache[cache_key] = {
                    "value": result,
                    "timestamp": time.time()
                }
            
            return result
            
        except Exception as primary_error:
            logging.warning(f"Primary function failed: {primary_error}")
            
            # Try fallback if available
            if fallback_func:
                try:
                    return self.fallback_retry.execute(fallback_func, *args, **kwargs)
                except Exception as fallback_error:
                    logging.warning(f"Fallback function failed: {fallback_error}")
            
            # Try cache if enabled
            if self.config.use_cache_on_failure and cache_key and cache_key in self.cache:
                cached = self.cache[cache_key]
                logging.info(f"Returning cached result from {time.time() - cached['timestamp']:.1f}s ago")
                return cached["value"]
            
            # All options exhausted
            raise primary_error
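
A usage sketch with hypothetical primary and fallback fetchers:
fallback_runner = RetryWithFallback()

def fetch_live_price(symbol: str) -> float:
    """Hypothetical primary data source."""
    ...

def fetch_delayed_price(symbol: str) -> float:
    """Hypothetical secondary data source."""
    ...

price = fallback_runner.execute(
    fetch_live_price,
    "ACME",
    fallback_func=fetch_delayed_price,
    cache_key="price:ACME",
)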

5. Contextual Retry Strategy

Different retry strategies based on context:
class ContextualRetryStrategy:
    def __init__(self):
        self.strategies = {}
        self.default_strategy = ExponentialBackoffRetry()
        
    def register_strategy(self, context: str, strategy: Any):
        """Register a retry strategy for a specific context"""
        self.strategies[context] = strategy
    
    def execute(self,
                func: Callable[..., T],
                context: str,
                *args, **kwargs) -> T:
        """Execute with context-appropriate retry strategy"""
        
        # Select strategy based on context
        strategy = self.strategies.get(context, self.default_strategy)
        
        # Map the context to retryable exception types. The exception names
        # below are placeholders; substitute the classes from your own
        # database driver, HTTP client, and model-serving libraries.
        if context == "database":
            retryable_exceptions = (DBConnectionError, TimeoutError)
        elif context == "api":
            retryable_exceptions = (RequestException, HTTPError)
        elif context == "ml_model":
            retryable_exceptions = (ModelLoadError, InferenceError)
        else:
            retryable_exceptions = (Exception,)
        
        return strategy.execute(
            func, *args,
            retryable_exceptions=retryable_exceptions,
            **kwargs
        )

# Usage example
retry_manager = ContextualRetryStrategy()

# Register specific strategies
retry_manager.register_strategy(
    "database",
    ExponentialBackoffRetry(RetryConfig(
        max_attempts=5,
        initial_delay=0.1,
        max_delay=10.0
    ))
)

retry_manager.register_strategy(
    "api",
    ExponentialBackoffRetry(RetryConfig(
        max_attempts=3,
        initial_delay=1.0,
        max_delay=30.0,
        jitter=True
    ))
)
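
# Execute a call through the context-appropriate strategy
# (fetch_user is a hypothetical database access function)
user = retry_manager.execute(fetch_user, "database", user_id=42)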

Advanced Retry Patterns

1. Bulkhead Retry Pattern

Isolate retry resources to prevent cascade failures:
from concurrent.futures import ThreadPoolExecutor, Future
import queue

class BulkheadRetry:
    def __init__(self,
                 max_concurrent_retries: int = 10,
                 queue_size: int = 100):
        self.max_concurrent_retries = max_concurrent_retries
        self.executor = ThreadPoolExecutor(max_workers=max_concurrent_retries)
        self.retry_queue = queue.Queue(maxsize=queue_size)
        self.active_retries = 0
        self._lock = threading.Lock()
        
    def execute_with_bulkhead(self,
                             func: Callable[..., T],
                             *args,
                             retry_config: Optional[RetryConfig] = None,
                             **kwargs) -> Future[T]:
        """Execute with bulkhead isolation"""
        
        retry_config = retry_config or RetryConfig()
        
        # Check if we can accept more retries
        with self._lock:
            if self.active_retries >= self.max_concurrent_retries:
                pending_future: Future = Future()
                try:
                    # Queue the call along with a future that the queued
                    # execution will resolve later
                    self.retry_queue.put_nowait(
                        (func, args, kwargs, retry_config, pending_future)
                    )
                    return pending_future
                except queue.Full:
                    raise Exception("Retry bulkhead is full")
            
            self.active_retries += 1
        
        # Submit retry task
        future = self.executor.submit(
            self._execute_with_retry,
            func, args, kwargs, retry_config
        )
        
        # Decrement counter when done
        future.add_done_callback(lambda f: self._on_retry_complete())
        
        return future
    
    def _execute_with_retry(self, func, args, kwargs, retry_config):
        """Execute function with retry"""
        retry_strategy = ExponentialBackoffRetry(retry_config)
        return retry_strategy.execute(func, *args, **kwargs)
    
    def _on_retry_complete(self):
        """Called when retry completes"""
        with self._lock:
            self.active_retries -= 1
            
            # Process queued retries
            if not self.retry_queue.empty():
                try:
                    func, args, kwargs, retry_config, pending_future = \
                        self.retry_queue.get_nowait()
                    self.active_retries += 1
                    
                    future = self.executor.submit(
                        self._execute_with_retry,
                        func, args, kwargs, retry_config
                    )
                    future.add_done_callback(lambda f: self._on_retry_complete())
                    # Resolve the future handed out when the call was queued
                    future.add_done_callback(
                        lambda f, p=pending_future: (
                            p.set_exception(f.exception())
                            if f.exception() is not None
                            else p.set_result(f.result())
                        )
                    )
                except queue.Empty:
                    pass
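
A usage sketch submitting several hypothetical calls through the bulkhead:
bulkhead = BulkheadRetry(max_concurrent_retries=4)

def fetch_document(doc_id: int) -> str:
    """Hypothetical I/O-bound call."""
    ...

futures = [bulkhead.execute_with_bulkhead(fetch_document, i) for i in range(10)]
results = [f.result() for f in futures]  # blocks until each retry completes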

2. Hedged Requests Pattern

Send multiple requests and use the first successful response:
import asyncio
from typing import List

class HedgedRetryStrategy:
    def __init__(self,
                 hedge_after_ms: int = 100,
                 max_hedges: int = 2):
        self.hedge_after_ms = hedge_after_ms
        self.max_hedges = max_hedges
        
    async def execute(self,
                     func: Callable[..., T],
                     *args, **kwargs) -> T:
        """Execute with hedged requests"""
        
        tasks = []
        results = []
        errors = []
        
        # Start first request
        task = asyncio.create_task(self._execute_with_tracking(
            func, args, kwargs, 0, results, errors
        ))
        tasks.append(task)
        
        # Start hedge timers
        for hedge_num in range(1, self.max_hedges + 1):
            hedge_task = asyncio.create_task(
                self._start_hedge_after_delay(
                    func, args, kwargs, hedge_num,
                    results, errors, tasks
                )
            )
            tasks.append(hedge_task)
        
        # Wait for first success or all failures
        while True:
            if results:
                # Cancel remaining tasks
                for task in tasks:
                    if not task.done():
                        task.cancel()
                
                return results[0]
            
            if all(task.done() for task in tasks):
                # All tasks completed without success
                raise Exception(f"All hedged requests failed: {errors}")
            
            await asyncio.sleep(0.01)
    
    async def _execute_with_tracking(self,
                                   func: Callable,
                                   args: tuple,
                                   kwargs: dict,
                                   request_num: int,
                                   results: List,
                                   errors: List):
        """Execute function and track results"""
        try:
            result = await func(*args, **kwargs)
            results.append(result)
            logging.info(f"Hedged request {request_num} succeeded")
        except Exception as e:
            errors.append((request_num, str(e)))
            logging.warning(f"Hedged request {request_num} failed: {e}")
    
    async def _start_hedge_after_delay(self,
                                     func: Callable,
                                     args: tuple,
                                     kwargs: dict,
                                     hedge_num: int,
                                     results: List,
                                     errors: List,
                                     tasks: List):
        """Start hedge request after delay"""
        await asyncio.sleep(self.hedge_after_ms / 1000.0)
        
        if not results:  # Only start if no success yet
            task = asyncio.create_task(self._execute_with_tracking(
                func, args, kwargs, hedge_num, results, errors
            ))
            tasks.append(task)
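
A usage sketch (fetch_replica is a hypothetical async call with variable latency):
async def main() -> str:
    hedged = HedgedRetryStrategy(hedge_after_ms=100, max_hedges=2)

    async def fetch_replica() -> str:
        """Hypothetical request to one of several replicas."""
        ...

    return await hedged.execute(fetch_replica)

# asyncio.run(main())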

Monitoring and Metrics

Retry Metrics Collector

Wrap any retry strategy to collect success, failure and timing statistics:
from collections import defaultdict

@dataclass
class RetryMetrics:
    total_attempts: int = 0
    successful_attempts: int = 0
    failed_attempts: int = 0
    retry_counts: Optional[Dict[int, int]] = None  # attempts -> count
    error_types: Optional[Dict[str, int]] = None  # exception name -> count
    total_retry_time: float = 0
    
    def __post_init__(self):
        if self.retry_counts is None:
            self.retry_counts = defaultdict(int)
        if self.error_types is None:
            self.error_types = defaultdict(int)

class MonitoredRetryStrategy:
    def __init__(self, base_strategy: Any):
        self.base_strategy = base_strategy
        self.metrics = RetryMetrics()
        self._lock = threading.Lock()
        
    def execute(self, func: Callable[..., T], *args, **kwargs) -> T:
        """Execute with metrics collection"""
        start_time = time.time()
        attempts = 0
        last_error = None
        
        def on_retry(attempt: int, exception: Exception):
            nonlocal attempts, last_error
            attempts = attempt
            last_error = exception
            
            with self._lock:
                self.metrics.error_types[type(exception).__name__] += 1
        
        try:
            # Pass our on_retry callback
            if 'on_retry' in kwargs:
                original_on_retry = kwargs['on_retry']
                def combined_on_retry(attempt, exception):
                    on_retry(attempt, exception)
                    original_on_retry(attempt, exception)
                kwargs['on_retry'] = combined_on_retry
            else:
                kwargs['on_retry'] = on_retry
            
            result = self.base_strategy.execute(func, *args, **kwargs)
            
            with self._lock:
                self.metrics.total_attempts += 1
                self.metrics.successful_attempts += 1
                self.metrics.retry_counts[attempts] += 1
                self.metrics.total_retry_time += time.time() - start_time
            
            return result
            
        except Exception as e:
            with self._lock:
                self.metrics.total_attempts += 1
                self.metrics.failed_attempts += 1
                self.metrics.retry_counts[attempts] += 1
                self.metrics.total_retry_time += time.time() - start_time
                
                if last_error:
                    self.metrics.error_types[type(last_error).__name__] += 1
                
            raise
    
    def get_metrics_summary(self) -> Dict[str, Any]:
        """Get metrics summary"""
        with self._lock:
            if self.metrics.total_attempts == 0:
                return {"message": "No retry attempts yet"}
            
            success_rate = self.metrics.successful_attempts / self.metrics.total_attempts
            avg_retry_time = self.metrics.total_retry_time / self.metrics.total_attempts
            
            # Calculate retry distribution
            retry_distribution = dict(self.metrics.retry_counts)
            
            return {
                "total_attempts": self.metrics.total_attempts,
                "success_rate": success_rate,
                "failure_rate": 1 - success_rate,
                "avg_retry_time": avg_retry_time,
                "retry_distribution": retry_distribution,
                "common_errors": dict(self.metrics.error_types),
                "avg_retries_per_attempt": sum(
                    k * v for k, v in retry_distribution.items()
                ) / self.metrics.total_attempts
            }
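
Wiring the monitor around a strategy (flaky_api is a hypothetical callable):
monitored = MonitoredRetryStrategy(ExponentialBackoffRetry())

def flaky_api() -> dict:
    """Hypothetical call being observed."""
    ...

try:
    monitored.execute(flaky_api)
except Exception:
    pass  # failures are recorded in the metrics too

print(monitored.get_metrics_summary())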

Best Practices

  1. Idempotency: Ensure operations can be safely retried
    def make_idempotent_request(request_id: str, data: Dict):
        # Use request_id to prevent duplicate processing.
        # request_already_processed, get_previous_result, process_request and
        # store_result are placeholder helpers backed by your own storage.
        if request_already_processed(request_id):
            return get_previous_result(request_id)
        
        result = process_request(data)
        store_result(request_id, result)
        return result
    
  2. Retry Budgets: Limit total retry time
    class RetryBudget:
        def __init__(self, max_retry_seconds: float = 300):
            self.max_retry_seconds = max_retry_seconds
            self.start_time = None
            
        def can_retry(self) -> bool:
            if self.start_time is None:
                self.start_time = time.time()
                return True
                
            elapsed = time.time() - self.start_time
            return elapsed < self.max_retry_seconds
    
  3. Error Classification: Retry only appropriate errors (a sketch combining this with a retry budget follows the list)
    RETRYABLE_HTTP_CODES = {408, 429, 500, 502, 503, 504}
    
    def is_retryable_error(error: Exception) -> bool:
        if isinstance(error, HTTPError):
            return error.response.status_code in RETRYABLE_HTTP_CODES
        elif isinstance(error, ConnectionError):
            return True
        elif isinstance(error, TimeoutError):
            return True
        else:
            return False
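
A minimal sketch combining a retry budget with error classification in one loop (call_with_budget is illustrative and builds on the RetryBudget and is_retryable_error examples above):
def call_with_budget(func: Callable[[], T], budget: RetryBudget,
                     max_attempts: int = 5) -> T:
    """Retry func while errors are retryable and the budget allows it."""
    for attempt in range(max_attempts):
        try:
            return func()
        except Exception as e:
            last_attempt = attempt == max_attempts - 1
            if last_attempt or not is_retryable_error(e) or not budget.can_retry():
                raise  # permanent error, exhausted budget, or out of attempts
            time.sleep(min(2 ** attempt, 10))  # capped exponential backoff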
    

Testing Retry Strategies

import pytest
from unittest.mock import Mock

def test_exponential_backoff():
    retry = ExponentialBackoffRetry(RetryConfig(
        max_attempts=3,
        initial_delay=0.1,
        jitter=False
    ))
    
    # Mock function that fails twice then succeeds
    mock_func = Mock(side_effect=[Exception("Fail 1"), Exception("Fail 2"), "Success"])
    
    result = retry.execute(mock_func)
    
    assert result == "Success"
    assert mock_func.call_count == 3

def test_circuit_breaker_retry():
    cb_retry = CircuitBreakerRetry(failure_threshold=2)
    
    # Function that always fails
    failing_func = Mock(side_effect=Exception("Always fails"))
    
    # First two calls should retry and fail
    for _ in range(2):
        with pytest.raises(Exception):
            cb_retry.execute(failing_func)
    
    # Circuit should now be open
    assert cb_retry.state == CircuitState.OPEN
    
    # Next call should fail immediately
    with pytest.raises(Exception, match="Circuit breaker is OPEN"):
        cb_retry.execute(failing_func)

@pytest.mark.asyncio  # requires the pytest-asyncio plugin
async def test_hedged_requests():
    hedge_retry = HedgedRetryStrategy(hedge_after_ms=50, max_hedges=2)
    
    call_count = 0
    
    async def slow_then_fast():
        nonlocal call_count
        call_count += 1
        
        if call_count == 1:
            await asyncio.sleep(0.2)  # Slow first request
            return "slow"
        else:
            await asyncio.sleep(0.01)  # Fast hedged request
            return "fast"
    
    result = await hedge_retry.execute(slow_then_fast)
    
    assert result == "fast"  # Should get fast response
    assert call_count == 2  # Both requests started

Built-in Task Guardrail Retries

PraisonAI provides built-in retry functionality specifically for guardrail validation failures, distinct from the generic ExponentialBackoffRetry patterns above. Guardrail retries are handled automatically by the executor when Task(guardrail=..., max_retries=...) is configured, so no manual retry implementation is needed:
from praisonaiagents import Agent, Task, PraisonAIAgents

def validate_content(output):
    """Built-in guardrail with retry support"""
    if len(output.raw.split()) < 50:
        return False, "Content too short - needs at least 50 words"
    return True, output

# Any configured agent works here; a minimal, assumed example:
agent = Agent(instructions="Write detailed, well-structured explanations")

task = Task(
    description="Write a detailed explanation",
    agent=agent,
    guardrail=validate_content,
    max_retries=3,  # Built-in executor-level retry
    retry_with_feedback=True
)

# The executor automatically handles:
# - Guardrail validation
# - Retry logic on failure  
# - Feedback to agent on retry
# - Final failure after max_retries

This differs from manual retry strategies because it is integrated into the task execution workflow and handles the retry loop at the executor level.

Conclusion

Implementing robust retry strategies is essential for building resilient multi-agent systems. By choosing the appropriate retry pattern and configuring it correctly, you can handle transient failures gracefully while avoiding issues like retry storms and cascading failures.