Graceful Degradation Patterns
Graceful degradation ensures your multi-agent system continues to provide value even when components fail or resources are constrained. This guide covers patterns for building resilient systems that fail gracefully.Core Principles
Design for Partial Failure
- Service Continuity: Maintain core functionality when non-critical components fail
- Progressive Enhancement: Build from minimal viable functionality upward
- Fallback Strategies: Always have a Plan B (and C)
- User Communication: Keep users informed about degraded functionality
- Automatic Recovery: Self-heal when conditions improve
Degradation Patterns
1. Capability Degradation
Reduce functionality while maintaining core services:Copy
from enum import Enum
from typing import Dict, List, Any, Optional
from abc import ABC, abstractmethod
class ServiceLevel(Enum):
FULL = "full"
DEGRADED = "degraded"
MINIMAL = "minimal"
OFFLINE = "offline"
class DegradableService(ABC):
def __init__(self, name: str):
self.name = name
self.current_level = ServiceLevel.FULL
self.capabilities = self._define_capabilities()
@abstractmethod
def _define_capabilities(self) -> Dict[ServiceLevel, List[str]]:
"""Define capabilities available at each service level"""
pass
def get_available_capabilities(self) -> List[str]:
"""Get currently available capabilities"""
return self.capabilities.get(self.current_level, [])
def degrade(self):
"""Degrade to next lower service level"""
levels = [ServiceLevel.FULL, ServiceLevel.DEGRADED,
ServiceLevel.MINIMAL, ServiceLevel.OFFLINE]
current_index = levels.index(self.current_level)
if current_index < len(levels) - 1:
self.current_level = levels[current_index + 1]
self._on_degrade()
def restore(self):
"""Restore to next higher service level"""
levels = [ServiceLevel.OFFLINE, ServiceLevel.MINIMAL,
ServiceLevel.DEGRADED, ServiceLevel.FULL]
current_index = levels.index(self.current_level)
if current_index < len(levels) - 1:
self.current_level = levels[current_index + 1]
self._on_restore()
@abstractmethod
def _on_degrade(self):
"""Hook for degradation actions"""
pass
@abstractmethod
def _on_restore(self):
"""Hook for restoration actions"""
pass
class IntelligentAssistant(DegradableService):
def _define_capabilities(self) -> Dict[ServiceLevel, List[str]]:
return {
ServiceLevel.FULL: [
"natural_language_understanding",
"context_awareness",
"multi_turn_conversation",
"personalization",
"proactive_suggestions",
"complex_reasoning"
],
ServiceLevel.DEGRADED: [
"natural_language_understanding",
"basic_context",
"single_turn_responses",
"simple_reasoning"
],
ServiceLevel.MINIMAL: [
"keyword_matching",
"predefined_responses",
"basic_commands"
],
ServiceLevel.OFFLINE: []
}
def process_request(self, request: str) -> str:
"""Process request based on current service level"""
capabilities = self.get_available_capabilities()
if self.current_level == ServiceLevel.FULL:
return self._full_processing(request)
elif self.current_level == ServiceLevel.DEGRADED:
return self._degraded_processing(request)
elif self.current_level == ServiceLevel.MINIMAL:
return self._minimal_processing(request)
else:
return "Service temporarily unavailable"
def _full_processing(self, request: str) -> str:
# Full NLU and reasoning
return f"[FULL] Processed with all capabilities: {request}"
def _degraded_processing(self, request: str) -> str:
# Simplified processing
return f"[DEGRADED] Basic response to: {request}"
def _minimal_processing(self, request: str) -> str:
# Keyword-based responses
keywords = ["help", "status", "error"]
for keyword in keywords:
if keyword in request.lower():
return f"[MINIMAL] Detected keyword '{keyword}'"
return "[MINIMAL] Please try basic commands"
def _on_degrade(self):
print(f"Assistant degraded to {self.current_level.value}")
def _on_restore(self):
print(f"Assistant restored to {self.current_level.value}")
2. Resource-Based Degradation
Adjust behavior based on available resources:Copy
import psutil
from dataclasses import dataclass
from typing import Callable
@dataclass
class ResourceThresholds:
cpu_high: float = 80.0
cpu_critical: float = 95.0
memory_high: float = 80.0
memory_critical: float = 95.0
response_time_high: float = 2.0 # seconds
response_time_critical: float = 5.0
class ResourceAwareDegradation:
def __init__(self, thresholds: ResourceThresholds = None):
self.thresholds = thresholds or ResourceThresholds()
self.degradation_strategies = []
self.current_degradations = set()
self.metrics_history = []
def add_degradation_strategy(self, name: str,
condition: Callable[[Dict], bool],
apply: Callable[[], None],
revert: Callable[[], None]):
"""Add a degradation strategy"""
self.degradation_strategies.append({
"name": name,
"condition": condition,
"apply": apply,
"revert": revert
})
def check_and_adjust(self):
"""Check resources and adjust degradation level"""
metrics = self._collect_metrics()
self.metrics_history.append(metrics)
# Keep only last 10 metrics
if len(self.metrics_history) > 10:
self.metrics_history.pop(0)
for strategy in self.degradation_strategies:
should_degrade = strategy["condition"](metrics)
is_degraded = strategy["name"] in self.current_degradations
if should_degrade and not is_degraded:
# Apply degradation
strategy["apply"]()
self.current_degradations.add(strategy["name"])
print(f"Applied degradation: {strategy['name']}")
elif not should_degrade and is_degraded:
# Revert degradation
strategy["revert"]()
self.current_degradations.remove(strategy["name"])
print(f"Reverted degradation: {strategy['name']}")
def _collect_metrics(self) -> Dict[str, float]:
"""Collect system metrics"""
return {
"cpu_percent": psutil.cpu_percent(interval=1),
"memory_percent": psutil.virtual_memory().percent,
"disk_usage": psutil.disk_usage('/').percent,
"active_threads": threading.active_count()
}
def get_health_status(self) -> Dict[str, Any]:
"""Get current health status"""
if not self.metrics_history:
return {"status": "unknown", "degradations": []}
latest_metrics = self.metrics_history[-1]
# Determine overall health
if latest_metrics["cpu_percent"] > self.thresholds.cpu_critical or \
latest_metrics["memory_percent"] > self.thresholds.memory_critical:
status = "critical"
elif latest_metrics["cpu_percent"] > self.thresholds.cpu_high or \
latest_metrics["memory_percent"] > self.thresholds.memory_high:
status = "degraded"
else:
status = "healthy"
return {
"status": status,
"metrics": latest_metrics,
"active_degradations": list(self.current_degradations)
}
# Example usage
degradation_manager = ResourceAwareDegradation()
# Add degradation strategies
degradation_manager.add_degradation_strategy(
name="disable_caching",
condition=lambda m: m["memory_percent"] > 85,
apply=lambda: print("Caching disabled"),
revert=lambda: print("Caching enabled")
)
degradation_manager.add_degradation_strategy(
name="reduce_concurrency",
condition=lambda m: m["cpu_percent"] > 90,
apply=lambda: print("Reduced concurrency"),
revert=lambda: print("Normal concurrency")
)
3. Fallback Chain Pattern
Implement a chain of fallback options:Copy
from typing import List, TypeVar, Generic, Optional
T = TypeVar('T')
class FallbackChain(Generic[T]):
def __init__(self):
self.handlers: List[Callable[..., T]] = []
self.fallback_metrics = {
"attempts": 0,
"failures_by_level": {}
}
def add_handler(self, handler: Callable[..., T], name: str = None):
"""Add a handler to the fallback chain"""
handler_name = name or handler.__name__
self.handlers.append((handler_name, handler))
self.fallback_metrics["failures_by_level"][handler_name] = 0
def execute(self, *args, **kwargs) -> Optional[T]:
"""Execute handlers in order until one succeeds"""
self.fallback_metrics["attempts"] += 1
for i, (name, handler) in enumerate(self.handlers):
try:
result = handler(*args, **kwargs)
# Log successful handler
if i > 0:
print(f"Succeeded with fallback handler: {name}")
return result
except Exception as e:
self.fallback_metrics["failures_by_level"][name] += 1
# Log failure and continue to next handler
print(f"Handler '{name}' failed: {str(e)}")
if i == len(self.handlers) - 1:
# Last handler failed
raise Exception("All handlers in fallback chain failed")
return None
def get_metrics(self) -> Dict[str, Any]:
"""Get fallback chain metrics"""
return {
**self.fallback_metrics,
"success_rate": 1 - (sum(self.fallback_metrics["failures_by_level"].values()) /
max(self.fallback_metrics["attempts"], 1))
}
# Example: Multi-level data retrieval
class DataRetriever:
def __init__(self):
self.fallback_chain = FallbackChain[Dict]()
self._setup_fallback_chain()
def _setup_fallback_chain(self):
"""Setup fallback chain for data retrieval"""
# Primary: Fast cache
self.fallback_chain.add_handler(
self._get_from_cache,
"cache"
)
# Secondary: Database
self.fallback_chain.add_handler(
self._get_from_database,
"database"
)
# Tertiary: External API
self.fallback_chain.add_handler(
self._get_from_api,
"external_api"
)
# Last resort: Default/cached data
self.fallback_chain.add_handler(
self._get_default_data,
"default"
)
def get_data(self, key: str) -> Dict:
"""Get data with automatic fallback"""
return self.fallback_chain.execute(key)
def _get_from_cache(self, key: str) -> Dict:
# Simulate cache lookup
if random.random() > 0.8: # 20% cache miss
raise Exception("Cache miss")
return {"source": "cache", "data": f"cached_{key}"}
def _get_from_database(self, key: str) -> Dict:
# Simulate database lookup
if random.random() > 0.9: # 10% failure
raise Exception("Database unavailable")
return {"source": "database", "data": f"db_{key}"}
def _get_from_api(self, key: str) -> Dict:
# Simulate API call
if random.random() > 0.7: # 30% failure
raise Exception("API timeout")
return {"source": "api", "data": f"api_{key}"}
def _get_default_data(self, key: str) -> Dict:
# Always succeeds with default data
return {"source": "default", "data": "default_value"}
4. Circuit Breaker with Degradation
Combine circuit breaker with graceful degradation:Copy
from datetime import datetime, timedelta
class DegradingCircuitBreaker:
def __init__(self, failure_threshold: int = 5,
recovery_timeout: int = 60,
degradation_levels: List[str] = None):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.degradation_levels = degradation_levels or [
"full", "partial", "minimal", "offline"
]
self.failure_count = 0
self.last_failure_time = None
self.current_level_index = 0
self.state = "closed" # closed, open, half-open
@property
def current_level(self) -> str:
"""Get current degradation level"""
return self.degradation_levels[self.current_level_index]
def call(self, func: Callable, fallback: Optional[Callable] = None,
*args, **kwargs) -> Any:
"""Execute function with circuit breaker protection"""
# Check if circuit should be reset
if self.state == "open":
if self._should_attempt_reset():
self.state = "half-open"
else:
# Circuit is open, use fallback or fail
if fallback:
return self._execute_with_degradation(fallback, *args, **kwargs)
raise Exception("Circuit breaker is open")
try:
# Attempt to execute function
result = func(*args, **kwargs)
# Success - reset on half-open
if self.state == "half-open":
self._reset()
return result
except Exception as e:
self._record_failure()
# Use fallback if available
if fallback:
return self._execute_with_degradation(fallback, *args, **kwargs)
raise e
def _record_failure(self):
"""Record a failure and potentially open circuit"""
self.failure_count += 1
self.last_failure_time = datetime.now()
if self.failure_count >= self.failure_threshold:
self.state = "open"
self._degrade()
def _should_attempt_reset(self) -> bool:
"""Check if enough time has passed to attempt reset"""
return (datetime.now() - self.last_failure_time).seconds >= self.recovery_timeout
def _reset(self):
"""Reset circuit breaker"""
self.failure_count = 0
self.last_failure_time = None
self.state = "closed"
self._restore()
def _degrade(self):
"""Move to next degradation level"""
if self.current_level_index < len(self.degradation_levels) - 1:
self.current_level_index += 1
print(f"Degraded to: {self.current_level}")
def _restore(self):
"""Move to previous degradation level"""
if self.current_level_index > 0:
self.current_level_index -= 1
print(f"Restored to: {self.current_level}")
def _execute_with_degradation(self, func: Callable, *args, **kwargs) -> Any:
"""Execute function with current degradation level"""
# Pass degradation level to function
if 'degradation_level' in inspect.signature(func).parameters:
kwargs['degradation_level'] = self.current_level
return func(*args, **kwargs)
5. Adaptive Timeout Pattern
Adjust timeouts based on system performance:Copy
import statistics
class AdaptiveTimeout:
def __init__(self, initial_timeout: float = 5.0,
min_timeout: float = 1.0,
max_timeout: float = 30.0):
self.initial_timeout = initial_timeout
self.min_timeout = min_timeout
self.max_timeout = max_timeout
self.current_timeout = initial_timeout
self.response_times = []
self.timeout_history = []
def execute_with_timeout(self, func: Callable, *args, **kwargs) -> Any:
"""Execute function with adaptive timeout"""
import signal
def timeout_handler(signum, frame):
raise TimeoutError(f"Operation timed out after {self.current_timeout}s")
# Set timeout
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(int(self.current_timeout))
start_time = time.time()
try:
result = func(*args, **kwargs)
# Record successful response time
response_time = time.time() - start_time
self._record_response_time(response_time)
return result
except TimeoutError:
# Increase timeout for next attempt
self._increase_timeout()
raise
finally:
# Cancel alarm
signal.alarm(0)
def _record_response_time(self, response_time: float):
"""Record response time and adjust timeout"""
self.response_times.append(response_time)
# Keep only last 100 response times
if len(self.response_times) > 100:
self.response_times.pop(0)
# Adjust timeout based on statistics
if len(self.response_times) >= 10:
# Calculate P95 response time
p95 = statistics.quantiles(self.response_times, n=20)[18] # 95th percentile
# Set timeout to P95 + 50% margin
new_timeout = p95 * 1.5
# Apply bounds
self.current_timeout = max(
self.min_timeout,
min(self.max_timeout, new_timeout)
)
self.timeout_history.append({
"timestamp": time.time(),
"timeout": self.current_timeout,
"based_on_p95": p95
})
def _increase_timeout(self):
"""Increase timeout after failure"""
self.current_timeout = min(
self.max_timeout,
self.current_timeout * 1.5
)
def get_stats(self) -> Dict[str, Any]:
"""Get timeout statistics"""
if not self.response_times:
return {"current_timeout": self.current_timeout}
return {
"current_timeout": self.current_timeout,
"avg_response_time": statistics.mean(self.response_times),
"p95_response_time": statistics.quantiles(self.response_times, n=20)[18],
"timeout_adjustments": len(self.timeout_history)
}
Implementation Strategies
1. Health-Based Routing
Route requests based on service health:Copy
class HealthBasedRouter:
def __init__(self):
self.services = {}
self.health_scores = {}
self.routing_stats = defaultdict(int)
def register_service(self, name: str, service: Any,
health_check: Callable[[], float]):
"""Register a service with health check"""
self.services[name] = {
"instance": service,
"health_check": health_check
}
def route_request(self, request: Any) -> Any:
"""Route request to healthiest service"""
# Update health scores
self._update_health_scores()
# Get services sorted by health
healthy_services = [
(name, score) for name, score in self.health_scores.items()
if score > 0.2 # Minimum health threshold
]
if not healthy_services:
raise Exception("No healthy services available")
# Sort by health score
healthy_services.sort(key=lambda x: x[1], reverse=True)
# Try services in order of health
for service_name, health_score in healthy_services:
try:
service = self.services[service_name]["instance"]
result = service.handle_request(request)
self.routing_stats[service_name] += 1
return result
except Exception as e:
print(f"Service {service_name} failed: {e}")
continue
raise Exception("All services failed")
def _update_health_scores(self):
"""Update health scores for all services"""
for name, service_info in self.services.items():
try:
score = service_info["health_check"]()
self.health_scores[name] = score
except:
self.health_scores[name] = 0.0
2. Load Shedding
Drop non-critical requests under load:Copy
from enum import Enum
import hashlib
class RequestPriority(Enum):
CRITICAL = 4
HIGH = 3
NORMAL = 2
LOW = 1
class LoadShedder:
def __init__(self, capacity: int = 1000):
self.capacity = capacity
self.current_load = 0
self.shed_threshold = 0.8
self.priority_thresholds = {
RequestPriority.LOW: 0.6,
RequestPriority.NORMAL: 0.8,
RequestPriority.HIGH: 0.9,
RequestPriority.CRITICAL: 1.0
}
self.stats = defaultdict(int)
def should_accept_request(self, request_id: str,
priority: RequestPriority) -> bool:
"""Determine if request should be accepted"""
load_ratio = self.current_load / self.capacity
# Always accept critical requests if possible
if priority == RequestPriority.CRITICAL and load_ratio < 1.0:
return True
# Check against priority threshold
threshold = self.priority_thresholds[priority]
if load_ratio >= threshold:
# Shed request
self.stats[f"shed_{priority.name}"] += 1
return False
# Probabilistic shedding for smoother degradation
if load_ratio > self.shed_threshold:
# Calculate shedding probability
shed_probability = (load_ratio - self.shed_threshold) / (1.0 - self.shed_threshold)
# Use request ID for deterministic random decision
hash_value = int(hashlib.md5(request_id.encode()).hexdigest(), 16)
if (hash_value % 100) / 100 < shed_probability:
self.stats[f"probabilistic_shed_{priority.name}"] += 1
return False
self.stats[f"accepted_{priority.name}"] += 1
return True
def update_load(self, current_load: int):
"""Update current load"""
self.current_load = current_load
def get_shedding_stats(self) -> Dict[str, Any]:
"""Get load shedding statistics"""
total_requests = sum(self.stats.values())
shed_requests = sum(v for k, v in self.stats.items() if 'shed' in k)
return {
"load_ratio": self.current_load / self.capacity,
"total_requests": total_requests,
"shed_requests": shed_requests,
"shed_rate": shed_requests / max(total_requests, 1),
"by_priority": dict(self.stats)
}
Monitoring and Alerting
Degradation Dashboard
Copy
class DegradationMonitor:
def __init__(self):
self.services = {}
self.degradation_events = []
self.alert_handlers = []
def register_service(self, service: DegradableService):
"""Register a service for monitoring"""
self.services[service.name] = service
def add_alert_handler(self, handler: Callable[[Dict], None]):
"""Add alert handler"""
self.alert_handlers.append(handler)
def check_services(self):
"""Check all services and generate alerts"""
for name, service in self.services.items():
previous_level = getattr(service, '_previous_level', service.current_level)
if service.current_level != previous_level:
event = {
"timestamp": datetime.now(),
"service": name,
"previous_level": previous_level.value,
"current_level": service.current_level.value,
"direction": "degraded" if service.current_level.value < previous_level.value else "restored"
}
self.degradation_events.append(event)
# Send alerts
for handler in self.alert_handlers:
handler(event)
service._previous_level = service.current_level
def get_system_status(self) -> Dict[str, Any]:
"""Get overall system status"""
service_levels = {}
degraded_count = 0
for name, service in self.services.items():
service_levels[name] = service.current_level.value
if service.current_level != ServiceLevel.FULL:
degraded_count += 1
return {
"overall_health": "healthy" if degraded_count == 0 else "degraded",
"degraded_services": degraded_count,
"total_services": len(self.services),
"service_levels": service_levels,
"recent_events": self.degradation_events[-10:]
}
Best Practices
-
Test Degradation Paths: Regularly test all degradation scenarios
Copy
def test_degradation_scenario(): service = IntelligentAssistant("test") # Test each level for level in [ServiceLevel.DEGRADED, ServiceLevel.MINIMAL]: service.degrade() response = service.process_request("test query") assert response is not None assert service.current_level == level -
Monitor Degradation Metrics: Track when and why degradation occurs
Copy
def log_degradation_metrics(service_name: str, reason: str, level: str): metrics = { "service": service_name, "reason": reason, "level": level, "timestamp": datetime.now(), "impact": calculate_impact(level) } # Log to monitoring system monitoring.record("degradation", metrics) -
Communicate Status: Keep users informed
Copy
def get_user_friendly_status(service_level: ServiceLevel) -> str: messages = { ServiceLevel.FULL: "All features available", ServiceLevel.DEGRADED: "Running with reduced features for stability", ServiceLevel.MINIMAL: "Basic features only - we're working on it", ServiceLevel.OFFLINE: "Service temporarily unavailable" } return messages.get(service_level, "Unknown status")
Testing Graceful Degradation
Copy
import pytest
from unittest.mock import Mock, patch
def test_capability_degradation():
assistant = IntelligentAssistant("test")
# Test full capabilities
assert "complex_reasoning" in assistant.get_available_capabilities()
# Test degradation
assistant.degrade()
assert assistant.current_level == ServiceLevel.DEGRADED
assert "complex_reasoning" not in assistant.get_available_capabilities()
assert "simple_reasoning" in assistant.get_available_capabilities()
def test_fallback_chain():
chain = FallbackChain[str]()
# Add handlers
chain.add_handler(lambda: Exception("Primary failed"), "primary")
chain.add_handler(lambda: "fallback_result", "fallback")
# Execute
result = chain.execute()
assert result == "fallback_result"
assert chain.get_metrics()["failures_by_level"]["primary"] == 1
@patch('psutil.cpu_percent')
@patch('psutil.virtual_memory')
def test_resource_degradation(mock_memory, mock_cpu):
# Simulate high CPU
mock_cpu.return_value = 95.0
mock_memory.return_value = Mock(percent=50.0)
manager = ResourceAwareDegradation()
# Add strategy
degraded = False
def set_degraded():
nonlocal degraded
degraded = True
manager.add_degradation_strategy(
"test",
lambda m: m["cpu_percent"] > 90,
set_degraded,
lambda: None
)
manager.check_and_adjust()
assert degraded
assert "test" in manager.current_degradations

