Skip to main content

Graceful Degradation Patterns

Graceful degradation ensures your multi-agent system continues to provide value even when components fail or resources are constrained. This guide covers patterns for building resilient systems that fail gracefully.

Core Principles

Design for Partial Failure

  1. Service Continuity: Maintain core functionality when non-critical components fail
  2. Progressive Enhancement: Build from minimal viable functionality upward
  3. Fallback Strategies: Always have a Plan B (and C)
  4. User Communication: Keep users informed about degraded functionality
  5. Automatic Recovery: Self-heal when conditions improve

Degradation Patterns

1. Capability Degradation

Reduce functionality while maintaining core services:
from enum import Enum
from typing import Dict, List, Any, Optional
from abc import ABC, abstractmethod

class ServiceLevel(Enum):
    FULL = "full"
    DEGRADED = "degraded"
    MINIMAL = "minimal"
    OFFLINE = "offline"

class DegradableService(ABC):
    def __init__(self, name: str):
        self.name = name
        self.current_level = ServiceLevel.FULL
        self.capabilities = self._define_capabilities()
    
    @abstractmethod
    def _define_capabilities(self) -> Dict[ServiceLevel, List[str]]:
        """Define capabilities available at each service level"""
        pass
    
    def get_available_capabilities(self) -> List[str]:
        """Get currently available capabilities"""
        return self.capabilities.get(self.current_level, [])
    
    def degrade(self):
        """Degrade to next lower service level"""
        levels = [ServiceLevel.FULL, ServiceLevel.DEGRADED, 
                 ServiceLevel.MINIMAL, ServiceLevel.OFFLINE]
        
        current_index = levels.index(self.current_level)
        if current_index < len(levels) - 1:
            self.current_level = levels[current_index + 1]
            self._on_degrade()
    
    def restore(self):
        """Restore to next higher service level"""
        levels = [ServiceLevel.OFFLINE, ServiceLevel.MINIMAL,
                 ServiceLevel.DEGRADED, ServiceLevel.FULL]
        
        current_index = levels.index(self.current_level)
        if current_index < len(levels) - 1:
            self.current_level = levels[current_index + 1]
            self._on_restore()
    
    @abstractmethod
    def _on_degrade(self):
        """Hook for degradation actions"""
        pass
    
    @abstractmethod
    def _on_restore(self):
        """Hook for restoration actions"""
        pass

class IntelligentAssistant(DegradableService):
    def _define_capabilities(self) -> Dict[ServiceLevel, List[str]]:
        return {
            ServiceLevel.FULL: [
                "natural_language_understanding",
                "context_awareness",
                "multi_turn_conversation",
                "personalization",
                "proactive_suggestions",
                "complex_reasoning"
            ],
            ServiceLevel.DEGRADED: [
                "natural_language_understanding",
                "basic_context",
                "single_turn_responses",
                "simple_reasoning"
            ],
            ServiceLevel.MINIMAL: [
                "keyword_matching",
                "predefined_responses",
                "basic_commands"
            ],
            ServiceLevel.OFFLINE: []
        }
    
    def process_request(self, request: str) -> str:
        """Process request based on current service level"""
        capabilities = self.get_available_capabilities()
        
        if self.current_level == ServiceLevel.FULL:
            return self._full_processing(request)
        elif self.current_level == ServiceLevel.DEGRADED:
            return self._degraded_processing(request)
        elif self.current_level == ServiceLevel.MINIMAL:
            return self._minimal_processing(request)
        else:
            return "Service temporarily unavailable"
    
    def _full_processing(self, request: str) -> str:
        # Full NLU and reasoning
        return f"[FULL] Processed with all capabilities: {request}"
    
    def _degraded_processing(self, request: str) -> str:
        # Simplified processing
        return f"[DEGRADED] Basic response to: {request}"
    
    def _minimal_processing(self, request: str) -> str:
        # Keyword-based responses
        keywords = ["help", "status", "error"]
        for keyword in keywords:
            if keyword in request.lower():
                return f"[MINIMAL] Detected keyword '{keyword}'"
        return "[MINIMAL] Please try basic commands"
    
    def _on_degrade(self):
        print(f"Assistant degraded to {self.current_level.value}")
    
    def _on_restore(self):
        print(f"Assistant restored to {self.current_level.value}")

2. Resource-Based Degradation

Adjust behavior based on available resources:
import psutil
from dataclasses import dataclass
from typing import Callable

@dataclass
class ResourceThresholds:
    cpu_high: float = 80.0
    cpu_critical: float = 95.0
    memory_high: float = 80.0
    memory_critical: float = 95.0
    response_time_high: float = 2.0  # seconds
    response_time_critical: float = 5.0

class ResourceAwareDegradation:
    def __init__(self, thresholds: ResourceThresholds = None):
        self.thresholds = thresholds or ResourceThresholds()
        self.degradation_strategies = []
        self.current_degradations = set()
        self.metrics_history = []
    
    def add_degradation_strategy(self, name: str, 
                               condition: Callable[[Dict], bool],
                               apply: Callable[[], None],
                               revert: Callable[[], None]):
        """Add a degradation strategy"""
        self.degradation_strategies.append({
            "name": name,
            "condition": condition,
            "apply": apply,
            "revert": revert
        })
    
    def check_and_adjust(self):
        """Check resources and adjust degradation level"""
        metrics = self._collect_metrics()
        self.metrics_history.append(metrics)
        
        # Keep only last 10 metrics
        if len(self.metrics_history) > 10:
            self.metrics_history.pop(0)
        
        for strategy in self.degradation_strategies:
            should_degrade = strategy["condition"](metrics)
            is_degraded = strategy["name"] in self.current_degradations
            
            if should_degrade and not is_degraded:
                # Apply degradation
                strategy["apply"]()
                self.current_degradations.add(strategy["name"])
                print(f"Applied degradation: {strategy['name']}")
            
            elif not should_degrade and is_degraded:
                # Revert degradation
                strategy["revert"]()
                self.current_degradations.remove(strategy["name"])
                print(f"Reverted degradation: {strategy['name']}")
    
    def _collect_metrics(self) -> Dict[str, float]:
        """Collect system metrics"""
        return {
            "cpu_percent": psutil.cpu_percent(interval=1),
            "memory_percent": psutil.virtual_memory().percent,
            "disk_usage": psutil.disk_usage('/').percent,
            "active_threads": threading.active_count()
        }
    
    def get_health_status(self) -> Dict[str, Any]:
        """Get current health status"""
        if not self.metrics_history:
            return {"status": "unknown", "degradations": []}
        
        latest_metrics = self.metrics_history[-1]
        
        # Determine overall health
        if latest_metrics["cpu_percent"] > self.thresholds.cpu_critical or \
           latest_metrics["memory_percent"] > self.thresholds.memory_critical:
            status = "critical"
        elif latest_metrics["cpu_percent"] > self.thresholds.cpu_high or \
             latest_metrics["memory_percent"] > self.thresholds.memory_high:
            status = "degraded"
        else:
            status = "healthy"
        
        return {
            "status": status,
            "metrics": latest_metrics,
            "active_degradations": list(self.current_degradations)
        }

# Example usage
degradation_manager = ResourceAwareDegradation()

# Add degradation strategies
degradation_manager.add_degradation_strategy(
    name="disable_caching",
    condition=lambda m: m["memory_percent"] > 85,
    apply=lambda: print("Caching disabled"),
    revert=lambda: print("Caching enabled")
)

degradation_manager.add_degradation_strategy(
    name="reduce_concurrency",
    condition=lambda m: m["cpu_percent"] > 90,
    apply=lambda: print("Reduced concurrency"),
    revert=lambda: print("Normal concurrency")
)

3. Fallback Chain Pattern

Implement a chain of fallback options:
from typing import List, TypeVar, Generic, Optional

T = TypeVar('T')

class FallbackChain(Generic[T]):
    def __init__(self):
        self.handlers: List[Callable[..., T]] = []
        self.fallback_metrics = {
            "attempts": 0,
            "failures_by_level": {}
        }
    
    def add_handler(self, handler: Callable[..., T], name: str = None):
        """Add a handler to the fallback chain"""
        handler_name = name or handler.__name__
        self.handlers.append((handler_name, handler))
        self.fallback_metrics["failures_by_level"][handler_name] = 0
    
    def execute(self, *args, **kwargs) -> Optional[T]:
        """Execute handlers in order until one succeeds"""
        self.fallback_metrics["attempts"] += 1
        
        for i, (name, handler) in enumerate(self.handlers):
            try:
                result = handler(*args, **kwargs)
                
                # Log successful handler
                if i > 0:
                    print(f"Succeeded with fallback handler: {name}")
                
                return result
                
            except Exception as e:
                self.fallback_metrics["failures_by_level"][name] += 1
                
                # Log failure and continue to next handler
                print(f"Handler '{name}' failed: {str(e)}")
                
                if i == len(self.handlers) - 1:
                    # Last handler failed
                    raise Exception("All handlers in fallback chain failed")
        
        return None
    
    def get_metrics(self) -> Dict[str, Any]:
        """Get fallback chain metrics"""
        return {
            **self.fallback_metrics,
            "success_rate": 1 - (sum(self.fallback_metrics["failures_by_level"].values()) / 
                               max(self.fallback_metrics["attempts"], 1))
        }

# Example: Multi-level data retrieval
class DataRetriever:
    def __init__(self):
        self.fallback_chain = FallbackChain[Dict]()
        self._setup_fallback_chain()
    
    def _setup_fallback_chain(self):
        """Setup fallback chain for data retrieval"""
        # Primary: Fast cache
        self.fallback_chain.add_handler(
            self._get_from_cache,
            "cache"
        )
        
        # Secondary: Database
        self.fallback_chain.add_handler(
            self._get_from_database,
            "database"
        )
        
        # Tertiary: External API
        self.fallback_chain.add_handler(
            self._get_from_api,
            "external_api"
        )
        
        # Last resort: Default/cached data
        self.fallback_chain.add_handler(
            self._get_default_data,
            "default"
        )
    
    def get_data(self, key: str) -> Dict:
        """Get data with automatic fallback"""
        return self.fallback_chain.execute(key)
    
    def _get_from_cache(self, key: str) -> Dict:
        # Simulate cache lookup
        if random.random() > 0.8:  # 20% cache miss
            raise Exception("Cache miss")
        return {"source": "cache", "data": f"cached_{key}"}
    
    def _get_from_database(self, key: str) -> Dict:
        # Simulate database lookup
        if random.random() > 0.9:  # 10% failure
            raise Exception("Database unavailable")
        return {"source": "database", "data": f"db_{key}"}
    
    def _get_from_api(self, key: str) -> Dict:
        # Simulate API call
        if random.random() > 0.7:  # 30% failure
            raise Exception("API timeout")
        return {"source": "api", "data": f"api_{key}"}
    
    def _get_default_data(self, key: str) -> Dict:
        # Always succeeds with default data
        return {"source": "default", "data": "default_value"}

4. Circuit Breaker with Degradation

Combine circuit breaker with graceful degradation:
from datetime import datetime, timedelta

class DegradingCircuitBreaker:
    def __init__(self, failure_threshold: int = 5, 
                 recovery_timeout: int = 60,
                 degradation_levels: List[str] = None):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.degradation_levels = degradation_levels or [
            "full", "partial", "minimal", "offline"
        ]
        
        self.failure_count = 0
        self.last_failure_time = None
        self.current_level_index = 0
        self.state = "closed"  # closed, open, half-open
    
    @property
    def current_level(self) -> str:
        """Get current degradation level"""
        return self.degradation_levels[self.current_level_index]
    
    def call(self, func: Callable, fallback: Optional[Callable] = None, 
             *args, **kwargs) -> Any:
        """Execute function with circuit breaker protection"""
        # Check if circuit should be reset
        if self.state == "open":
            if self._should_attempt_reset():
                self.state = "half-open"
            else:
                # Circuit is open, use fallback or fail
                if fallback:
                    return self._execute_with_degradation(fallback, *args, **kwargs)
                raise Exception("Circuit breaker is open")
        
        try:
            # Attempt to execute function
            result = func(*args, **kwargs)
            
            # Success - reset on half-open
            if self.state == "half-open":
                self._reset()
            
            return result
            
        except Exception as e:
            self._record_failure()
            
            # Use fallback if available
            if fallback:
                return self._execute_with_degradation(fallback, *args, **kwargs)
            
            raise e
    
    def _record_failure(self):
        """Record a failure and potentially open circuit"""
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        
        if self.failure_count >= self.failure_threshold:
            self.state = "open"
            self._degrade()
    
    def _should_attempt_reset(self) -> bool:
        """Check if enough time has passed to attempt reset"""
        return (datetime.now() - self.last_failure_time).seconds >= self.recovery_timeout
    
    def _reset(self):
        """Reset circuit breaker"""
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "closed"
        self._restore()
    
    def _degrade(self):
        """Move to next degradation level"""
        if self.current_level_index < len(self.degradation_levels) - 1:
            self.current_level_index += 1
            print(f"Degraded to: {self.current_level}")
    
    def _restore(self):
        """Move to previous degradation level"""
        if self.current_level_index > 0:
            self.current_level_index -= 1
            print(f"Restored to: {self.current_level}")
    
    def _execute_with_degradation(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function with current degradation level"""
        # Pass degradation level to function
        if 'degradation_level' in inspect.signature(func).parameters:
            kwargs['degradation_level'] = self.current_level
        
        return func(*args, **kwargs)

5. Adaptive Timeout Pattern

Adjust timeouts based on system performance:
import statistics

class AdaptiveTimeout:
    def __init__(self, initial_timeout: float = 5.0,
                 min_timeout: float = 1.0,
                 max_timeout: float = 30.0):
        self.initial_timeout = initial_timeout
        self.min_timeout = min_timeout
        self.max_timeout = max_timeout
        
        self.current_timeout = initial_timeout
        self.response_times = []
        self.timeout_history = []
    
    def execute_with_timeout(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function with adaptive timeout"""
        import signal
        
        def timeout_handler(signum, frame):
            raise TimeoutError(f"Operation timed out after {self.current_timeout}s")
        
        # Set timeout
        signal.signal(signal.SIGALRM, timeout_handler)
        signal.alarm(int(self.current_timeout))
        
        start_time = time.time()
        
        try:
            result = func(*args, **kwargs)
            
            # Record successful response time
            response_time = time.time() - start_time
            self._record_response_time(response_time)
            
            return result
            
        except TimeoutError:
            # Increase timeout for next attempt
            self._increase_timeout()
            raise
            
        finally:
            # Cancel alarm
            signal.alarm(0)
    
    def _record_response_time(self, response_time: float):
        """Record response time and adjust timeout"""
        self.response_times.append(response_time)
        
        # Keep only last 100 response times
        if len(self.response_times) > 100:
            self.response_times.pop(0)
        
        # Adjust timeout based on statistics
        if len(self.response_times) >= 10:
            # Calculate P95 response time
            p95 = statistics.quantiles(self.response_times, n=20)[18]  # 95th percentile
            
            # Set timeout to P95 + 50% margin
            new_timeout = p95 * 1.5
            
            # Apply bounds
            self.current_timeout = max(
                self.min_timeout,
                min(self.max_timeout, new_timeout)
            )
            
            self.timeout_history.append({
                "timestamp": time.time(),
                "timeout": self.current_timeout,
                "based_on_p95": p95
            })
    
    def _increase_timeout(self):
        """Increase timeout after failure"""
        self.current_timeout = min(
            self.max_timeout,
            self.current_timeout * 1.5
        )
    
    def get_stats(self) -> Dict[str, Any]:
        """Get timeout statistics"""
        if not self.response_times:
            return {"current_timeout": self.current_timeout}
        
        return {
            "current_timeout": self.current_timeout,
            "avg_response_time": statistics.mean(self.response_times),
            "p95_response_time": statistics.quantiles(self.response_times, n=20)[18],
            "timeout_adjustments": len(self.timeout_history)
        }

Implementation Strategies

1. Health-Based Routing

Route requests based on service health:
class HealthBasedRouter:
    def __init__(self):
        self.services = {}
        self.health_scores = {}
        self.routing_stats = defaultdict(int)
    
    def register_service(self, name: str, service: Any, 
                        health_check: Callable[[], float]):
        """Register a service with health check"""
        self.services[name] = {
            "instance": service,
            "health_check": health_check
        }
    
    def route_request(self, request: Any) -> Any:
        """Route request to healthiest service"""
        # Update health scores
        self._update_health_scores()
        
        # Get services sorted by health
        healthy_services = [
            (name, score) for name, score in self.health_scores.items()
            if score > 0.2  # Minimum health threshold
        ]
        
        if not healthy_services:
            raise Exception("No healthy services available")
        
        # Sort by health score
        healthy_services.sort(key=lambda x: x[1], reverse=True)
        
        # Try services in order of health
        for service_name, health_score in healthy_services:
            try:
                service = self.services[service_name]["instance"]
                result = service.handle_request(request)
                
                self.routing_stats[service_name] += 1
                return result
                
            except Exception as e:
                print(f"Service {service_name} failed: {e}")
                continue
        
        raise Exception("All services failed")
    
    def _update_health_scores(self):
        """Update health scores for all services"""
        for name, service_info in self.services.items():
            try:
                score = service_info["health_check"]()
                self.health_scores[name] = score
            except:
                self.health_scores[name] = 0.0

2. Load Shedding

Drop non-critical requests under load:
from enum import Enum
import hashlib

class RequestPriority(Enum):
    CRITICAL = 4
    HIGH = 3
    NORMAL = 2
    LOW = 1

class LoadShedder:
    def __init__(self, capacity: int = 1000):
        self.capacity = capacity
        self.current_load = 0
        self.shed_threshold = 0.8
        self.priority_thresholds = {
            RequestPriority.LOW: 0.6,
            RequestPriority.NORMAL: 0.8,
            RequestPriority.HIGH: 0.9,
            RequestPriority.CRITICAL: 1.0
        }
        self.stats = defaultdict(int)
    
    def should_accept_request(self, request_id: str, 
                            priority: RequestPriority) -> bool:
        """Determine if request should be accepted"""
        load_ratio = self.current_load / self.capacity
        
        # Always accept critical requests if possible
        if priority == RequestPriority.CRITICAL and load_ratio < 1.0:
            return True
        
        # Check against priority threshold
        threshold = self.priority_thresholds[priority]
        
        if load_ratio >= threshold:
            # Shed request
            self.stats[f"shed_{priority.name}"] += 1
            return False
        
        # Probabilistic shedding for smoother degradation
        if load_ratio > self.shed_threshold:
            # Calculate shedding probability
            shed_probability = (load_ratio - self.shed_threshold) / (1.0 - self.shed_threshold)
            
            # Use request ID for deterministic random decision
            hash_value = int(hashlib.md5(request_id.encode()).hexdigest(), 16)
            if (hash_value % 100) / 100 < shed_probability:
                self.stats[f"probabilistic_shed_{priority.name}"] += 1
                return False
        
        self.stats[f"accepted_{priority.name}"] += 1
        return True
    
    def update_load(self, current_load: int):
        """Update current load"""
        self.current_load = current_load
    
    def get_shedding_stats(self) -> Dict[str, Any]:
        """Get load shedding statistics"""
        total_requests = sum(self.stats.values())
        shed_requests = sum(v for k, v in self.stats.items() if 'shed' in k)
        
        return {
            "load_ratio": self.current_load / self.capacity,
            "total_requests": total_requests,
            "shed_requests": shed_requests,
            "shed_rate": shed_requests / max(total_requests, 1),
            "by_priority": dict(self.stats)
        }

Monitoring and Alerting

Degradation Dashboard

class DegradationMonitor:
    def __init__(self):
        self.services = {}
        self.degradation_events = []
        self.alert_handlers = []
    
    def register_service(self, service: DegradableService):
        """Register a service for monitoring"""
        self.services[service.name] = service
    
    def add_alert_handler(self, handler: Callable[[Dict], None]):
        """Add alert handler"""
        self.alert_handlers.append(handler)
    
    def check_services(self):
        """Check all services and generate alerts"""
        for name, service in self.services.items():
            previous_level = getattr(service, '_previous_level', service.current_level)
            
            if service.current_level != previous_level:
                event = {
                    "timestamp": datetime.now(),
                    "service": name,
                    "previous_level": previous_level.value,
                    "current_level": service.current_level.value,
                    "direction": "degraded" if service.current_level.value < previous_level.value else "restored"
                }
                
                self.degradation_events.append(event)
                
                # Send alerts
                for handler in self.alert_handlers:
                    handler(event)
            
            service._previous_level = service.current_level
    
    def get_system_status(self) -> Dict[str, Any]:
        """Get overall system status"""
        service_levels = {}
        degraded_count = 0
        
        for name, service in self.services.items():
            service_levels[name] = service.current_level.value
            if service.current_level != ServiceLevel.FULL:
                degraded_count += 1
        
        return {
            "overall_health": "healthy" if degraded_count == 0 else "degraded",
            "degraded_services": degraded_count,
            "total_services": len(self.services),
            "service_levels": service_levels,
            "recent_events": self.degradation_events[-10:]
        }

Best Practices

  1. Test Degradation Paths: Regularly test all degradation scenarios
    def test_degradation_scenario():
        service = IntelligentAssistant("test")
        
        # Test each level
        for level in [ServiceLevel.DEGRADED, ServiceLevel.MINIMAL]:
            service.degrade()
            response = service.process_request("test query")
            assert response is not None
            assert service.current_level == level
    
  2. Monitor Degradation Metrics: Track when and why degradation occurs
    def log_degradation_metrics(service_name: str, reason: str, level: str):
        metrics = {
            "service": service_name,
            "reason": reason,
            "level": level,
            "timestamp": datetime.now(),
            "impact": calculate_impact(level)
        }
        
        # Log to monitoring system
        monitoring.record("degradation", metrics)
    
  3. Communicate Status: Keep users informed
    def get_user_friendly_status(service_level: ServiceLevel) -> str:
        messages = {
            ServiceLevel.FULL: "All features available",
            ServiceLevel.DEGRADED: "Running with reduced features for stability",
            ServiceLevel.MINIMAL: "Basic features only - we're working on it",
            ServiceLevel.OFFLINE: "Service temporarily unavailable"
        }
        return messages.get(service_level, "Unknown status")
    

Testing Graceful Degradation

import pytest
from unittest.mock import Mock, patch

def test_capability_degradation():
    assistant = IntelligentAssistant("test")
    
    # Test full capabilities
    assert "complex_reasoning" in assistant.get_available_capabilities()
    
    # Test degradation
    assistant.degrade()
    assert assistant.current_level == ServiceLevel.DEGRADED
    assert "complex_reasoning" not in assistant.get_available_capabilities()
    assert "simple_reasoning" in assistant.get_available_capabilities()

def test_fallback_chain():
    chain = FallbackChain[str]()
    
    # Add handlers
    chain.add_handler(lambda: Exception("Primary failed"), "primary")
    chain.add_handler(lambda: "fallback_result", "fallback")
    
    # Execute
    result = chain.execute()
    
    assert result == "fallback_result"
    assert chain.get_metrics()["failures_by_level"]["primary"] == 1

@patch('psutil.cpu_percent')
@patch('psutil.virtual_memory')
def test_resource_degradation(mock_memory, mock_cpu):
    # Simulate high CPU
    mock_cpu.return_value = 95.0
    mock_memory.return_value = Mock(percent=50.0)
    
    manager = ResourceAwareDegradation()
    
    # Add strategy
    degraded = False
    def set_degraded(): 
        nonlocal degraded
        degraded = True
    
    manager.add_degradation_strategy(
        "test",
        lambda m: m["cpu_percent"] > 90,
        set_degraded,
        lambda: None
    )
    
    manager.check_and_adjust()
    
    assert degraded
    assert "test" in manager.current_degradations

Conclusion

Graceful degradation is essential for building resilient multi-agent systems. By implementing these patterns, your system can maintain service availability even under adverse conditions, providing a better user experience and operational stability.