> ## Documentation Index
> Fetch the complete documentation index at: https://docs.praison.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# Graceful Degradation Patterns

> Design patterns for building resilient multi-agent systems that degrade gracefully under failure

# Graceful Degradation Patterns

Graceful degradation ensures your multi-agent system continues to provide value even when components fail or resources are constrained. This guide covers patterns for building resilient systems that fail gracefully.

## Core Principles

### Design for Partial Failure

1. **Service Continuity**: Maintain core functionality when non-critical components fail
2. **Progressive Enhancement**: Build from minimal viable functionality upward
3. **Fallback Strategies**: Always have a Plan B (and C)
4. **User Communication**: Keep users informed about degraded functionality
5. **Automatic Recovery**: Self-heal when conditions improve

## Degradation Patterns

### 1. Capability Degradation

Reduce functionality while maintaining core services:

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
from enum import Enum
from typing import Dict, List, Any, Optional
from abc import ABC, abstractmethod

class ServiceLevel(Enum):
    FULL = "full"
    DEGRADED = "degraded"
    MINIMAL = "minimal"
    OFFLINE = "offline"

class DegradableService(ABC):
    def __init__(self, name: str):
        self.name = name
        self.current_level = ServiceLevel.FULL
        self.capabilities = self._define_capabilities()
    
    @abstractmethod
    def _define_capabilities(self) -> Dict[ServiceLevel, List[str]]:
        """Define capabilities available at each service level"""
        pass
    
    def get_available_capabilities(self) -> List[str]:
        """Get currently available capabilities"""
        return self.capabilities.get(self.current_level, [])
    
    def degrade(self):
        """Degrade to next lower service level"""
        levels = [ServiceLevel.FULL, ServiceLevel.DEGRADED, 
                 ServiceLevel.MINIMAL, ServiceLevel.OFFLINE]
        
        current_index = levels.index(self.current_level)
        if current_index < len(levels) - 1:
            self.current_level = levels[current_index + 1]
            self._on_degrade()
    
    def restore(self):
        """Restore to next higher service level"""
        levels = [ServiceLevel.OFFLINE, ServiceLevel.MINIMAL,
                 ServiceLevel.DEGRADED, ServiceLevel.FULL]
        
        current_index = levels.index(self.current_level)
        if current_index < len(levels) - 1:
            self.current_level = levels[current_index + 1]
            self._on_restore()
    
    @abstractmethod
    def _on_degrade(self):
        """Hook for degradation actions"""
        pass
    
    @abstractmethod
    def _on_restore(self):
        """Hook for restoration actions"""
        pass

class IntelligentAssistant(DegradableService):
    def _define_capabilities(self) -> Dict[ServiceLevel, List[str]]:
        return {
            ServiceLevel.FULL: [
                "natural_language_understanding",
                "context_awareness",
                "multi_turn_conversation",
                "personalization",
                "proactive_suggestions",
                "complex_reasoning"
            ],
            ServiceLevel.DEGRADED: [
                "natural_language_understanding",
                "basic_context",
                "single_turn_responses",
                "simple_reasoning"
            ],
            ServiceLevel.MINIMAL: [
                "keyword_matching",
                "predefined_responses",
                "basic_commands"
            ],
            ServiceLevel.OFFLINE: []
        }
    
    def process_request(self, request: str) -> str:
        """Process request based on current service level"""
        capabilities = self.get_available_capabilities()
        
        if self.current_level == ServiceLevel.FULL:
            return self._full_processing(request)
        elif self.current_level == ServiceLevel.DEGRADED:
            return self._degraded_processing(request)
        elif self.current_level == ServiceLevel.MINIMAL:
            return self._minimal_processing(request)
        else:
            return "Service temporarily unavailable"
    
    def _full_processing(self, request: str) -> str:
        # Full NLU and reasoning
        return f"[FULL] Processed with all capabilities: {request}"
    
    def _degraded_processing(self, request: str) -> str:
        # Simplified processing
        return f"[DEGRADED] Basic response to: {request}"
    
    def _minimal_processing(self, request: str) -> str:
        # Keyword-based responses
        keywords = ["help", "status", "error"]
        for keyword in keywords:
            if keyword in request.lower():
                return f"[MINIMAL] Detected keyword '{keyword}'"
        return "[MINIMAL] Please try basic commands"
    
    def _on_degrade(self):
        print(f"Assistant degraded to {self.current_level.value}")
    
    def _on_restore(self):
        print(f"Assistant restored to {self.current_level.value}")
```

### 2. Resource-Based Degradation

Adjust behavior based on available resources:

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
import psutil
from dataclasses import dataclass
from typing import Callable

@dataclass
class ResourceThresholds:
    cpu_high: float = 80.0
    cpu_critical: float = 95.0
    memory_high: float = 80.0
    memory_critical: float = 95.0
    response_time_high: float = 2.0  # seconds
    response_time_critical: float = 5.0

class ResourceAwareDegradation:
    def __init__(self, thresholds: ResourceThresholds = None):
        self.thresholds = thresholds or ResourceThresholds()
        self.degradation_strategies = []
        self.current_degradations = set()
        self.metrics_history = []
    
    def add_degradation_strategy(self, name: str, 
                               condition: Callable[[Dict], bool],
                               apply: Callable[[], None],
                               revert: Callable[[], None]):
        """Add a degradation strategy"""
        self.degradation_strategies.append({
            "name": name,
            "condition": condition,
            "apply": apply,
            "revert": revert
        })
    
    def check_and_adjust(self):
        """Check resources and adjust degradation level"""
        metrics = self._collect_metrics()
        self.metrics_history.append(metrics)
        
        # Keep only last 10 metrics
        if len(self.metrics_history) > 10:
            self.metrics_history.pop(0)
        
        for strategy in self.degradation_strategies:
            should_degrade = strategy["condition"](metrics)
            is_degraded = strategy["name"] in self.current_degradations
            
            if should_degrade and not is_degraded:
                # Apply degradation
                strategy["apply"]()
                self.current_degradations.add(strategy["name"])
                print(f"Applied degradation: {strategy['name']}")
            
            elif not should_degrade and is_degraded:
                # Revert degradation
                strategy["revert"]()
                self.current_degradations.remove(strategy["name"])
                print(f"Reverted degradation: {strategy['name']}")
    
    def _collect_metrics(self) -> Dict[str, float]:
        """Collect system metrics"""
        return {
            "cpu_percent": psutil.cpu_percent(interval=1),
            "memory_percent": psutil.virtual_memory().percent,
            "disk_usage": psutil.disk_usage('/').percent,
            "active_threads": threading.active_count()
        }
    
    def get_health_status(self) -> Dict[str, Any]:
        """Get current health status"""
        if not self.metrics_history:
            return {"status": "unknown", "degradations": []}
        
        latest_metrics = self.metrics_history[-1]
        
        # Determine overall health
        if latest_metrics["cpu_percent"] > self.thresholds.cpu_critical or \
           latest_metrics["memory_percent"] > self.thresholds.memory_critical:
            status = "critical"
        elif latest_metrics["cpu_percent"] > self.thresholds.cpu_high or \
             latest_metrics["memory_percent"] > self.thresholds.memory_high:
            status = "degraded"
        else:
            status = "healthy"
        
        return {
            "status": status,
            "metrics": latest_metrics,
            "active_degradations": list(self.current_degradations)
        }

# Example usage
degradation_manager = ResourceAwareDegradation()

# Add degradation strategies
degradation_manager.add_degradation_strategy(
    name="disable_caching",
    condition=lambda m: m["memory_percent"] > 85,
    apply=lambda: print("Caching disabled"),
    revert=lambda: print("Caching enabled")
)

degradation_manager.add_degradation_strategy(
    name="reduce_concurrency",
    condition=lambda m: m["cpu_percent"] > 90,
    apply=lambda: print("Reduced concurrency"),
    revert=lambda: print("Normal concurrency")
)
```

### 3. Fallback Chain Pattern

Implement a chain of fallback options:

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
from typing import List, TypeVar, Generic, Optional

T = TypeVar('T')

class FallbackChain(Generic[T]):
    def __init__(self):
        self.handlers: List[Callable[..., T]] = []
        self.fallback_metrics = {
            "attempts": 0,
            "failures_by_level": {}
        }
    
    def add_handler(self, handler: Callable[..., T], name: str = None):
        """Add a handler to the fallback chain"""
        handler_name = name or handler.__name__
        self.handlers.append((handler_name, handler))
        self.fallback_metrics["failures_by_level"][handler_name] = 0
    
    def execute(self, *args, **kwargs) -> Optional[T]:
        """Execute handlers in order until one succeeds"""
        self.fallback_metrics["attempts"] += 1
        
        for i, (name, handler) in enumerate(self.handlers):
            try:
                result = handler(*args, **kwargs)
                
                # Log successful handler
                if i > 0:
                    print(f"Succeeded with fallback handler: {name}")
                
                return result
                
            except Exception as e:
                self.fallback_metrics["failures_by_level"][name] += 1
                
                # Log failure and continue to next handler
                print(f"Handler '{name}' failed: {str(e)}")
                
                if i == len(self.handlers) - 1:
                    # Last handler failed
                    raise Exception("All handlers in fallback chain failed")
        
        return None
    
    def get_metrics(self) -> Dict[str, Any]:
        """Get fallback chain metrics"""
        return {
            **self.fallback_metrics,
            "success_rate": 1 - (sum(self.fallback_metrics["failures_by_level"].values()) / 
                               max(self.fallback_metrics["attempts"], 1))
        }

# Example: Multi-level data retrieval
class DataRetriever:
    def __init__(self):
        self.fallback_chain = FallbackChain[Dict]()
        self._setup_fallback_chain()
    
    def _setup_fallback_chain(self):
        """Setup fallback chain for data retrieval"""
        # Primary: Fast cache
        self.fallback_chain.add_handler(
            self._get_from_cache,
            "cache"
        )
        
        # Secondary: Database
        self.fallback_chain.add_handler(
            self._get_from_database,
            "database"
        )
        
        # Tertiary: External API
        self.fallback_chain.add_handler(
            self._get_from_api,
            "external_api"
        )
        
        # Last resort: Default/cached data
        self.fallback_chain.add_handler(
            self._get_default_data,
            "default"
        )
    
    def get_data(self, key: str) -> Dict:
        """Get data with automatic fallback"""
        return self.fallback_chain.execute(key)
    
    def _get_from_cache(self, key: str) -> Dict:
        # Simulate cache lookup
        if random.random() > 0.8:  # 20% cache miss
            raise Exception("Cache miss")
        return {"source": "cache", "data": f"cached_{key}"}
    
    def _get_from_database(self, key: str) -> Dict:
        # Simulate database lookup
        if random.random() > 0.9:  # 10% failure
            raise Exception("Database unavailable")
        return {"source": "database", "data": f"db_{key}"}
    
    def _get_from_api(self, key: str) -> Dict:
        # Simulate API call
        if random.random() > 0.7:  # 30% failure
            raise Exception("API timeout")
        return {"source": "api", "data": f"api_{key}"}
    
    def _get_default_data(self, key: str) -> Dict:
        # Always succeeds with default data
        return {"source": "default", "data": "default_value"}
```

<Note>
  PraisonAI now ships a **built-in tool circuit breaker** that wraps every tool call automatically. See [Tool Circuit Breaker](/features/tool-circuit-breaker). The examples below show how to extend or customise that pattern.
</Note>

### 4. Circuit Breaker with Degradation

Combine circuit breaker with graceful degradation:

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
from datetime import datetime, timedelta

class DegradingCircuitBreaker:
    def __init__(self, failure_threshold: int = 5, 
                 recovery_timeout: int = 60,
                 degradation_levels: List[str] = None):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.degradation_levels = degradation_levels or [
            "full", "partial", "minimal", "offline"
        ]
        
        self.failure_count = 0
        self.last_failure_time = None
        self.current_level_index = 0
        self.state = "closed"  # closed, open, half-open
    
    @property
    def current_level(self) -> str:
        """Get current degradation level"""
        return self.degradation_levels[self.current_level_index]
    
    def call(self, func: Callable, fallback: Optional[Callable] = None, 
             *args, **kwargs) -> Any:
        """Execute function with circuit breaker protection"""
        # Check if circuit should be reset
        if self.state == "open":
            if self._should_attempt_reset():
                self.state = "half-open"
            else:
                # Circuit is open, use fallback or fail
                if fallback:
                    return self._execute_with_degradation(fallback, *args, **kwargs)
                raise Exception("Circuit breaker is open")
        
        try:
            # Attempt to execute function
            result = func(*args, **kwargs)
            
            # Success - reset on half-open
            if self.state == "half-open":
                self._reset()
            
            return result
            
        except Exception as e:
            self._record_failure()
            
            # Use fallback if available
            if fallback:
                return self._execute_with_degradation(fallback, *args, **kwargs)
            
            raise e
    
    def _record_failure(self):
        """Record a failure and potentially open circuit"""
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        
        if self.failure_count >= self.failure_threshold:
            self.state = "open"
            self._degrade()
    
    def _should_attempt_reset(self) -> bool:
        """Check if enough time has passed to attempt reset"""
        return (datetime.now() - self.last_failure_time).seconds >= self.recovery_timeout
    
    def _reset(self):
        """Reset circuit breaker"""
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "closed"
        self._restore()
    
    def _degrade(self):
        """Move to next degradation level"""
        if self.current_level_index < len(self.degradation_levels) - 1:
            self.current_level_index += 1
            print(f"Degraded to: {self.current_level}")
    
    def _restore(self):
        """Move to previous degradation level"""
        if self.current_level_index > 0:
            self.current_level_index -= 1
            print(f"Restored to: {self.current_level}")
    
    def _execute_with_degradation(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function with current degradation level"""
        # Pass degradation level to function
        if 'degradation_level' in inspect.signature(func).parameters:
            kwargs['degradation_level'] = self.current_level
        
        return func(*args, **kwargs)
```

### 5. Adaptive Timeout Pattern

Adjust timeouts based on system performance:

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
import statistics

class AdaptiveTimeout:
    def __init__(self, initial_timeout: float = 5.0,
                 min_timeout: float = 1.0,
                 max_timeout: float = 30.0):
        self.initial_timeout = initial_timeout
        self.min_timeout = min_timeout
        self.max_timeout = max_timeout
        
        self.current_timeout = initial_timeout
        self.response_times = []
        self.timeout_history = []
    
    def execute_with_timeout(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function with adaptive timeout"""
        import signal
        
        def timeout_handler(signum, frame):
            raise TimeoutError(f"Operation timed out after {self.current_timeout}s")
        
        # Set timeout
        signal.signal(signal.SIGALRM, timeout_handler)
        signal.alarm(int(self.current_timeout))
        
        start_time = time.time()
        
        try:
            result = func(*args, **kwargs)
            
            # Record successful response time
            response_time = time.time() - start_time
            self._record_response_time(response_time)
            
            return result
            
        except TimeoutError:
            # Increase timeout for next attempt
            self._increase_timeout()
            raise
            
        finally:
            # Cancel alarm
            signal.alarm(0)
    
    def _record_response_time(self, response_time: float):
        """Record response time and adjust timeout"""
        self.response_times.append(response_time)
        
        # Keep only last 100 response times
        if len(self.response_times) > 100:
            self.response_times.pop(0)
        
        # Adjust timeout based on statistics
        if len(self.response_times) >= 10:
            # Calculate P95 response time
            p95 = statistics.quantiles(self.response_times, n=20)[18]  # 95th percentile
            
            # Set timeout to P95 + 50% margin
            new_timeout = p95 * 1.5
            
            # Apply bounds
            self.current_timeout = max(
                self.min_timeout,
                min(self.max_timeout, new_timeout)
            )
            
            self.timeout_history.append({
                "timestamp": time.time(),
                "timeout": self.current_timeout,
                "based_on_p95": p95
            })
    
    def _increase_timeout(self):
        """Increase timeout after failure"""
        self.current_timeout = min(
            self.max_timeout,
            self.current_timeout * 1.5
        )
    
    def get_stats(self) -> Dict[str, Any]:
        """Get timeout statistics"""
        if not self.response_times:
            return {"current_timeout": self.current_timeout}
        
        return {
            "current_timeout": self.current_timeout,
            "avg_response_time": statistics.mean(self.response_times),
            "p95_response_time": statistics.quantiles(self.response_times, n=20)[18],
            "timeout_adjustments": len(self.timeout_history)
        }
```

## Implementation Strategies

### 1. Health-Based Routing

Route requests based on service health:

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
class HealthBasedRouter:
    def __init__(self):
        self.services = {}
        self.health_scores = {}
        self.routing_stats = defaultdict(int)
    
    def register_service(self, name: str, service: Any, 
                        health_check: Callable[[], float]):
        """Register a service with health check"""
        self.services[name] = {
            "instance": service,
            "health_check": health_check
        }
    
    def route_request(self, request: Any) -> Any:
        """Route request to healthiest service"""
        # Update health scores
        self._update_health_scores()
        
        # Get services sorted by health
        healthy_services = [
            (name, score) for name, score in self.health_scores.items()
            if score > 0.2  # Minimum health threshold
        ]
        
        if not healthy_services:
            raise Exception("No healthy services available")
        
        # Sort by health score
        healthy_services.sort(key=lambda x: x[1], reverse=True)
        
        # Try services in order of health
        for service_name, health_score in healthy_services:
            try:
                service = self.services[service_name]["instance"]
                result = service.handle_request(request)
                
                self.routing_stats[service_name] += 1
                return result
                
            except Exception as e:
                print(f"Service {service_name} failed: {e}")
                continue
        
        raise Exception("All services failed")
    
    def _update_health_scores(self):
        """Update health scores for all services"""
        for name, service_info in self.services.items():
            try:
                score = service_info["health_check"]()
                self.health_scores[name] = score
            except:
                self.health_scores[name] = 0.0
```

### 2. Load Shedding

Drop non-critical requests under load:

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
from enum import Enum
import hashlib

class RequestPriority(Enum):
    CRITICAL = 4
    HIGH = 3
    NORMAL = 2
    LOW = 1

class LoadShedder:
    def __init__(self, capacity: int = 1000):
        self.capacity = capacity
        self.current_load = 0
        self.shed_threshold = 0.8
        self.priority_thresholds = {
            RequestPriority.LOW: 0.6,
            RequestPriority.NORMAL: 0.8,
            RequestPriority.HIGH: 0.9,
            RequestPriority.CRITICAL: 1.0
        }
        self.stats = defaultdict(int)
    
    def should_accept_request(self, request_id: str, 
                            priority: RequestPriority) -> bool:
        """Determine if request should be accepted"""
        load_ratio = self.current_load / self.capacity
        
        # Always accept critical requests if possible
        if priority == RequestPriority.CRITICAL and load_ratio < 1.0:
            return True
        
        # Check against priority threshold
        threshold = self.priority_thresholds[priority]
        
        if load_ratio >= threshold:
            # Shed request
            self.stats[f"shed_{priority.name}"] += 1
            return False
        
        # Probabilistic shedding for smoother degradation
        if load_ratio > self.shed_threshold:
            # Calculate shedding probability
            shed_probability = (load_ratio - self.shed_threshold) / (1.0 - self.shed_threshold)
            
            # Use request ID for deterministic random decision
            hash_value = int(hashlib.md5(request_id.encode()).hexdigest(), 16)
            if (hash_value % 100) / 100 < shed_probability:
                self.stats[f"probabilistic_shed_{priority.name}"] += 1
                return False
        
        self.stats[f"accepted_{priority.name}"] += 1
        return True
    
    def update_load(self, current_load: int):
        """Update current load"""
        self.current_load = current_load
    
    def get_shedding_stats(self) -> Dict[str, Any]:
        """Get load shedding statistics"""
        total_requests = sum(self.stats.values())
        shed_requests = sum(v for k, v in self.stats.items() if 'shed' in k)
        
        return {
            "load_ratio": self.current_load / self.capacity,
            "total_requests": total_requests,
            "shed_requests": shed_requests,
            "shed_rate": shed_requests / max(total_requests, 1),
            "by_priority": dict(self.stats)
        }
```

## Monitoring and Alerting

### Degradation Dashboard

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
class DegradationMonitor:
    def __init__(self):
        self.services = {}
        self.degradation_events = []
        self.alert_handlers = []
    
    def register_service(self, service: DegradableService):
        """Register a service for monitoring"""
        self.services[service.name] = service
    
    def add_alert_handler(self, handler: Callable[[Dict], None]):
        """Add alert handler"""
        self.alert_handlers.append(handler)
    
    def check_services(self):
        """Check all services and generate alerts"""
        for name, service in self.services.items():
            previous_level = getattr(service, '_previous_level', service.current_level)
            
            if service.current_level != previous_level:
                event = {
                    "timestamp": datetime.now(),
                    "service": name,
                    "previous_level": previous_level.value,
                    "current_level": service.current_level.value,
                    "direction": "degraded" if service.current_level.value < previous_level.value else "restored"
                }
                
                self.degradation_events.append(event)
                
                # Send alerts
                for handler in self.alert_handlers:
                    handler(event)
            
            service._previous_level = service.current_level
    
    def get_system_status(self) -> Dict[str, Any]:
        """Get overall system status"""
        service_levels = {}
        degraded_count = 0
        
        for name, service in self.services.items():
            service_levels[name] = service.current_level.value
            if service.current_level != ServiceLevel.FULL:
                degraded_count += 1
        
        return {
            "overall_health": "healthy" if degraded_count == 0 else "degraded",
            "degraded_services": degraded_count,
            "total_services": len(self.services),
            "service_levels": service_levels,
            "recent_events": self.degradation_events[-10:]
        }
```

## Best Practices

1. **Test Degradation Paths**: Regularly test all degradation scenarios
   ```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
   def test_degradation_scenario():
       service = IntelligentAssistant("test")
       
       # Test each level
       for level in [ServiceLevel.DEGRADED, ServiceLevel.MINIMAL]:
           service.degrade()
           response = service.process_request("test query")
           assert response is not None
           assert service.current_level == level
   ```

2. **Monitor Degradation Metrics**: Track when and why degradation occurs
   ```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
   def log_degradation_metrics(service_name: str, reason: str, level: str):
       metrics = {
           "service": service_name,
           "reason": reason,
           "level": level,
           "timestamp": datetime.now(),
           "impact": calculate_impact(level)
       }
       
       # Log to monitoring system
       monitoring.record("degradation", metrics)
   ```

3. **Communicate Status**: Keep users informed
   ```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
   def get_user_friendly_status(service_level: ServiceLevel) -> str:
       messages = {
           ServiceLevel.FULL: "All features available",
           ServiceLevel.DEGRADED: "Running with reduced features for stability",
           ServiceLevel.MINIMAL: "Basic features only - we're working on it",
           ServiceLevel.OFFLINE: "Service temporarily unavailable"
       }
       return messages.get(service_level, "Unknown status")
   ```

## Testing Graceful Degradation

```python theme={"theme":{"light":"vitesse-light","dark":"vitesse-dark"}}
import pytest
from unittest.mock import Mock, patch

def test_capability_degradation():
    assistant = IntelligentAssistant("test")
    
    # Test full capabilities
    assert "complex_reasoning" in assistant.get_available_capabilities()
    
    # Test degradation
    assistant.degrade()
    assert assistant.current_level == ServiceLevel.DEGRADED
    assert "complex_reasoning" not in assistant.get_available_capabilities()
    assert "simple_reasoning" in assistant.get_available_capabilities()

def test_fallback_chain():
    chain = FallbackChain[str]()
    
    # Add handlers
    chain.add_handler(lambda: Exception("Primary failed"), "primary")
    chain.add_handler(lambda: "fallback_result", "fallback")
    
    # Execute
    result = chain.execute()
    
    assert result == "fallback_result"
    assert chain.get_metrics()["failures_by_level"]["primary"] == 1

@patch('psutil.cpu_percent')
@patch('psutil.virtual_memory')
def test_resource_degradation(mock_memory, mock_cpu):
    # Simulate high CPU
    mock_cpu.return_value = 95.0
    mock_memory.return_value = Mock(percent=50.0)
    
    manager = ResourceAwareDegradation()
    
    # Add strategy
    degraded = False
    def set_degraded(): 
        nonlocal degraded
        degraded = True
    
    manager.add_degradation_strategy(
        "test",
        lambda m: m["cpu_percent"] > 90,
        set_degraded,
        lambda: None
    )
    
    manager.check_and_adjust()
    
    assert degraded
    assert "test" in manager.current_degradations
```

## Conclusion

Graceful degradation is essential for building resilient multi-agent systems. By implementing these patterns, your system can maintain service availability even under adverse conditions, providing a better user experience and operational stability.
