Documentation Index
Fetch the complete documentation index at: https://docs.praison.ai/llms.txt
Use this file to discover all available pages before exploring further.
Graceful Degradation Patterns
Graceful degradation ensures your multi-agent system continues to provide value even when components fail or resources are constrained. This guide covers patterns for building resilient systems that fail gracefully.
Core Principles
Design for Partial Failure
- Service Continuity: Maintain core functionality when non-critical components fail
- Progressive Enhancement: Build from minimal viable functionality upward
- Fallback Strategies: Always have a Plan B (and C)
- User Communication: Keep users informed about degraded functionality
- Automatic Recovery: Self-heal when conditions improve
Degradation Patterns
1. Capability Degradation
Reduce functionality while maintaining core services:

from enum import Enum
from typing import Dict, List, Any, Optional
from abc import ABC, abstractmethod
class ServiceLevel(Enum):
    """Operating tiers of a degradable service, listed from best to worst."""

    # Everything the service offers is available.
    FULL = "full"
    # A reduced feature set is available.
    DEGRADED = "degraded"
    # Only the bare essentials are available.
    MINIMAL = "minimal"
    # Nothing is available.
    OFFLINE = "offline"
class DegradableService(ABC):
    """Base class for a service that moves between ``ServiceLevel`` tiers.

    Subclasses provide the capability table and the two transition hooks.
    """

    def __init__(self, name: str):
        self.name = name
        # A freshly created service starts at the top tier.
        self.current_level = ServiceLevel.FULL
        self.capabilities = self._define_capabilities()

    @abstractmethod
    def _define_capabilities(self) -> Dict[ServiceLevel, List[str]]:
        """Return the capability names offered at each service level."""

    def get_available_capabilities(self) -> List[str]:
        """Return the capabilities of the current level (empty if unlisted)."""
        return self.capabilities.get(self.current_level, [])

    def degrade(self):
        """Step one tier down and fire ``_on_degrade``.

        Already being at OFFLINE is a no-op: the level stays put and the
        hook is not called.
        """
        ladder = (ServiceLevel.FULL, ServiceLevel.DEGRADED,
                  ServiceLevel.MINIMAL, ServiceLevel.OFFLINE)
        position = ladder.index(self.current_level)
        if position + 1 >= len(ladder):
            return
        self.current_level = ladder[position + 1]
        self._on_degrade()

    def restore(self):
        """Step one tier up and fire ``_on_restore``.

        Already being at FULL is a no-op: the level stays put and the hook
        is not called.
        """
        ladder = (ServiceLevel.FULL, ServiceLevel.DEGRADED,
                  ServiceLevel.MINIMAL, ServiceLevel.OFFLINE)
        position = ladder.index(self.current_level)
        if position == 0:
            return
        self.current_level = ladder[position - 1]
        self._on_restore()

    @abstractmethod
    def _on_degrade(self):
        """Hook invoked after the level has been lowered."""

    @abstractmethod
    def _on_restore(self):
        """Hook invoked after the level has been raised."""
class IntelligentAssistant(DegradableService):
    """Example assistant whose processing depth follows its service level."""

    def _define_capabilities(self) -> Dict[ServiceLevel, List[str]]:
        """Return the capability names offered at each service level."""
        return {
            ServiceLevel.FULL: [
                "natural_language_understanding",
                "context_awareness",
                "multi_turn_conversation",
                "personalization",
                "proactive_suggestions",
                "complex_reasoning"
            ],
            ServiceLevel.DEGRADED: [
                "natural_language_understanding",
                "basic_context",
                "single_turn_responses",
                "simple_reasoning"
            ],
            ServiceLevel.MINIMAL: [
                "keyword_matching",
                "predefined_responses",
                "basic_commands"
            ],
            ServiceLevel.OFFLINE: []
        }

    def process_request(self, request: str) -> str:
        """Answer ``request`` using the processor for the current level.

        Returns a fixed "unavailable" message for any level without a
        processor (OFFLINE).
        """
        processors = {
            ServiceLevel.FULL: self._full_processing,
            ServiceLevel.DEGRADED: self._degraded_processing,
            ServiceLevel.MINIMAL: self._minimal_processing,
        }
        processor = processors.get(self.current_level)
        if processor is None:
            return "Service temporarily unavailable"
        return processor(request)

    def _full_processing(self, request: str) -> str:
        """Placeholder for full NLU and reasoning."""
        return f"[FULL] Processed with all capabilities: {request}"

    def _degraded_processing(self, request: str) -> str:
        """Placeholder for simplified processing."""
        return f"[DEGRADED] Basic response to: {request}"

    def _minimal_processing(self, request: str) -> str:
        """Report the first known keyword found in ``request``, ignoring case."""
        # Lower-case once rather than on every loop iteration.
        lowered = request.lower()
        for keyword in ("help", "status", "error"):
            if keyword in lowered:
                return f"[MINIMAL] Detected keyword '{keyword}'"
        return "[MINIMAL] Please try basic commands"

    def _on_degrade(self):
        """Announce the new, lower level."""
        print(f"Assistant degraded to {self.current_level.value}")

    def _on_restore(self):
        """Announce the new, higher level."""
        print(f"Assistant restored to {self.current_level.value}")
2. Resource-Based Degradation
Adjust behavior based on available resources:

import psutil
from dataclasses import dataclass
from typing import Callable
@dataclass
class ResourceThresholds:
    """Limits that separate normal, high and critical resource pressure."""

    # CPU utilisation limits.
    cpu_high: float = 80.0
    cpu_critical: float = 95.0

    # Memory utilisation limits.
    memory_high: float = 80.0
    memory_critical: float = 95.0

    # Response-time limits, in seconds.
    response_time_high: float = 2.0
    response_time_critical: float = 5.0
class ResourceAwareDegradation:
    """Apply and revert named degradation strategies as system metrics change.

    Each strategy has a ``condition`` evaluated against the latest metrics;
    ``apply`` runs when the condition becomes true and ``revert`` when it
    becomes false again.
    """

    def __init__(self, thresholds: "Optional[ResourceThresholds]" = None):
        """Create a manager, using default ``ResourceThresholds`` if none given."""
        self.thresholds = thresholds if thresholds is not None else ResourceThresholds()
        self.degradation_strategies = []
        # Names of the strategies that are currently applied.
        self.current_degradations = set()
        self.metrics_history = []

    def add_degradation_strategy(self, name: str,
                                 condition: Callable[[Dict], bool],
                                 apply: Callable[[], None],
                                 revert: Callable[[], None]):
        """Register a strategy.

        Args:
            name: Identifier used to track whether the strategy is active.
            condition: Receives the metrics dict; True means "degrade".
            apply: Called when the strategy switches on.
            revert: Called when the strategy switches off.
        """
        self.degradation_strategies.append({
            "name": name,
            "condition": condition,
            "apply": apply,
            "revert": revert
        })

    def check_and_adjust(self):
        """Sample metrics once and toggle every strategy whose state changed."""
        metrics = self._collect_metrics()
        self.metrics_history.append(metrics)
        # Keep only last 10 metrics
        if len(self.metrics_history) > 10:
            self.metrics_history.pop(0)

        for strategy in self.degradation_strategies:
            should_degrade = strategy["condition"](metrics)
            is_degraded = strategy["name"] in self.current_degradations
            if should_degrade and not is_degraded:
                # Apply degradation
                strategy["apply"]()
                self.current_degradations.add(strategy["name"])
                print(f"Applied degradation: {strategy['name']}")
            elif not should_degrade and is_degraded:
                # Revert degradation
                strategy["revert"]()
                self.current_degradations.remove(strategy["name"])
                print(f"Reverted degradation: {strategy['name']}")

    def _collect_metrics(self) -> Dict[str, float]:
        """Collect CPU, memory, root-disk and thread-count metrics.

        ``psutil.cpu_percent`` is called with ``interval=1``.
        """
        # Imported here so the snippet does not depend on an import that the
        # surrounding page never provides.
        import threading

        return {
            "cpu_percent": psutil.cpu_percent(interval=1),
            "memory_percent": psutil.virtual_memory().percent,
            "disk_usage": psutil.disk_usage('/').percent,
            "active_threads": threading.active_count()
        }

    def get_health_status(self) -> Dict[str, Any]:
        """Summarise health from the most recent metrics sample.

        Returns a dict with ``status`` ("unknown", "healthy", "degraded" or
        "critical"), ``metrics`` and ``active_degradations``. Before the
        first sample, ``status`` is "unknown" and the legacy ``degradations``
        key is also included for existing callers.
        """
        active = list(self.current_degradations)
        if not self.metrics_history:
            return {
                "status": "unknown",
                "metrics": {},
                "active_degradations": active,
                # Legacy key kept so older callers keep working.
                "degradations": [],
            }

        latest_metrics = self.metrics_history[-1]
        cpu = latest_metrics["cpu_percent"]
        memory = latest_metrics["memory_percent"]

        # Determine overall health: the worst of CPU and memory wins.
        if cpu > self.thresholds.cpu_critical or memory > self.thresholds.memory_critical:
            status = "critical"
        elif cpu > self.thresholds.cpu_high or memory > self.thresholds.memory_high:
            status = "degraded"
        else:
            status = "healthy"

        return {
            "status": status,
            "metrics": latest_metrics,
            "active_degradations": active
        }
# Example usage: a manager with default thresholds and two strategies.
degradation_manager = ResourceAwareDegradation()

# Turn caching off while memory usage is above 85.
degradation_manager.add_degradation_strategy(
    name="disable_caching",
    condition=lambda metrics: metrics["memory_percent"] > 85,
    apply=lambda: print("Caching disabled"),
    revert=lambda: print("Caching enabled"),
)

# Lower concurrency while CPU usage is above 90.
degradation_manager.add_degradation_strategy(
    name="reduce_concurrency",
    condition=lambda metrics: metrics["cpu_percent"] > 90,
    apply=lambda: print("Reduced concurrency"),
    revert=lambda: print("Normal concurrency"),
)
3. Fallback Chain Pattern
Implement a chain of fallback options:

from typing import List, TypeVar, Generic, Optional
T = TypeVar('T')


class FallbackChain(Generic[T]):
    """Try handlers in registration order and return the first result.

    A handler "fails" when it raises ``Exception``; the next handler is then
    tried with the same arguments.
    """

    def __init__(self):
        # (name, handler) pairs in the order they will be tried.
        self.handlers: List[tuple] = []
        self.fallback_metrics = {
            "attempts": 0,
            # Number of execute() calls in which some handler returned.
            "successes": 0,
            "failures_by_level": {}
        }

    def add_handler(self, handler: Callable[..., T], name: Optional[str] = None):
        """Append ``handler`` to the chain.

        Args:
            handler: Callable invoked with the arguments given to ``execute``.
            name: Label used in logs and metrics; defaults to the handler's
                ``__name__`` (or its ``repr`` when it has none).
        """
        handler_name = name or getattr(handler, "__name__", repr(handler))
        self.handlers.append((handler_name, handler))
        # setdefault: re-using a name must not wipe failures already recorded.
        self.fallback_metrics["failures_by_level"].setdefault(handler_name, 0)

    def execute(self, *args, **kwargs) -> Optional[T]:
        """Run the handlers in order until one returns.

        Returns:
            The first handler result, or None when the chain has no handlers.

        Raises:
            Exception: Every handler raised; the last handler's error is
                attached as ``__cause__``.
        """
        self.fallback_metrics["attempts"] += 1
        last_error = None

        for position, (name, handler) in enumerate(self.handlers):
            try:
                result = handler(*args, **kwargs)
            except Exception as error:
                self.fallback_metrics["failures_by_level"][name] += 1
                # Log failure and continue to next handler
                print(f"Handler '{name}' failed: {str(error)}")
                last_error = error
                continue

            # Log successful handler (only interesting when it was a fallback)
            if position > 0:
                print(f"Succeeded with fallback handler: {name}")
            self.fallback_metrics["successes"] += 1
            return result

        if last_error is not None:
            raise Exception("All handlers in fallback chain failed") from last_error
        return None

    def get_metrics(self) -> Dict[str, Any]:
        """Return the raw counters plus ``success_rate``.

        ``success_rate`` is successful ``execute`` calls divided by all
        ``execute`` calls, so it stays within 0..1 however many handlers
        failed along the way. It is 1.0 before the first attempt.
        """
        attempts = self.fallback_metrics["attempts"]
        successes = self.fallback_metrics["successes"]
        return {
            **self.fallback_metrics,
            "success_rate": successes / attempts if attempts else 1.0
        }
# Example: Multi-level data retrieval
import random


class DataRetriever:
    """Look data up through cache, database, API and finally a default.

    The lookups are simulations that fail at random; only the default
    handler always succeeds.
    """

    def __init__(self):
        self.fallback_chain = FallbackChain[Dict]()
        self._setup_fallback_chain()

    def _setup_fallback_chain(self):
        """Register the four sources, fastest first."""
        # Primary: Fast cache
        self.fallback_chain.add_handler(self._get_from_cache, "cache")
        # Secondary: Database
        self.fallback_chain.add_handler(self._get_from_database, "database")
        # Tertiary: External API
        self.fallback_chain.add_handler(self._get_from_api, "external_api")
        # Last resort: Default/cached data
        self.fallback_chain.add_handler(self._get_default_data, "default")

    def get_data(self, key: str) -> Dict:
        """Return the record for ``key`` from the first source that answers."""
        return self.fallback_chain.execute(key)

    def _get_from_cache(self, key: str) -> Dict:
        """Simulated cache lookup."""
        if random.random() > 0.8:  # 20% cache miss
            raise Exception("Cache miss")
        return {"source": "cache", "data": f"cached_{key}"}

    def _get_from_database(self, key: str) -> Dict:
        """Simulated database lookup."""
        if random.random() > 0.9:  # 10% failure
            raise Exception("Database unavailable")
        return {"source": "database", "data": f"db_{key}"}

    def _get_from_api(self, key: str) -> Dict:
        """Simulated external API call."""
        if random.random() > 0.7:  # 30% failure
            raise Exception("API timeout")
        return {"source": "api", "data": f"api_{key}"}

    def _get_default_data(self, key: str) -> Dict:
        """Return a fixed default record; never raises."""
        return {"source": "default", "data": "default_value"}
PraisonAI now ships a built-in tool circuit breaker that wraps every tool call automatically. See Tool Circuit Breaker. The examples below show how to extend or customise that pattern.
4. Circuit Breaker with Degradation
Combine circuit breaker with graceful degradation:

from datetime import datetime, timedelta
import inspect


class DegradingCircuitBreaker:
    """Circuit breaker that also steps through named degradation levels.

    States are "closed" (calls pass through), "open" (calls are
    short-circuited to the fallback) and "half-open" (one trial call is
    allowed after ``recovery_timeout`` seconds).
    """

    def __init__(self, failure_threshold: int = 5,
                 recovery_timeout: int = 60,
                 degradation_levels: Optional[List[str]] = None):
        """Create a closed breaker at the first degradation level.

        Args:
            failure_threshold: Recorded failures needed to open the circuit.
            recovery_timeout: Seconds to wait after the last failure before
                a trial call is allowed.
            degradation_levels: Level names ordered best to worst; defaults
                to full/partial/minimal/offline.
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.degradation_levels = degradation_levels or [
            "full", "partial", "minimal", "offline"
        ]
        self.failure_count = 0
        self.last_failure_time = None
        self.current_level_index = 0
        self.state = "closed"  # closed, open, half-open

    @property
    def current_level(self) -> str:
        """Name of the current degradation level."""
        return self.degradation_levels[self.current_level_index]

    def call(self, func: Callable, fallback: Optional[Callable] = None,
             *args, **kwargs) -> Any:
        """Run ``func`` under breaker protection.

        Note that ``fallback`` is the second positional parameter: to pass
        positional arguments to ``func`` you must supply ``fallback``
        (possibly ``None``) first. The same arguments go to the fallback.

        Returns:
            The result of ``func``, or of ``fallback`` when ``func`` fails
            or the circuit is open.

        Raises:
            Exception: The circuit is open and no fallback was given.
            Exception: Whatever ``func`` raised, when no fallback was given.
        """
        # Check if circuit should be reset
        if self.state == "open":
            if self._should_attempt_reset():
                self.state = "half-open"
            else:
                # Circuit is open, use fallback or fail
                if fallback:
                    return self._execute_with_degradation(fallback, *args, **kwargs)
                raise Exception("Circuit breaker is open")

        try:
            result = func(*args, **kwargs)
        except Exception:
            self._record_failure()
            # Use fallback if available
            if fallback:
                return self._execute_with_degradation(fallback, *args, **kwargs)
            # Bare raise keeps the original traceback intact.
            raise

        # Success - a passing trial call closes the circuit again
        if self.state == "half-open":
            self._reset()
        return result

    def _record_failure(self):
        """Count a failure; at or past the threshold, open and degrade."""
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        if self.failure_count >= self.failure_threshold:
            self.state = "open"
            self._degrade()

    def _should_attempt_reset(self) -> bool:
        """Return True once ``recovery_timeout`` seconds have elapsed."""
        if self.last_failure_time is None:
            # Nothing recorded to wait for.
            return True
        elapsed = datetime.now() - self.last_failure_time
        # total_seconds(), not .seconds: .seconds drops whole days and so
        # wraps back to 0 every 24 hours.
        return elapsed.total_seconds() >= self.recovery_timeout

    def _reset(self):
        """Close the circuit, clear the failure record and restore one level."""
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "closed"
        self._restore()

    def _degrade(self):
        """Move to the next worse level, if there is one."""
        if self.current_level_index < len(self.degradation_levels) - 1:
            self.current_level_index += 1
            print(f"Degraded to: {self.current_level}")

    def _restore(self):
        """Move to the next better level, if there is one."""
        if self.current_level_index > 0:
            self.current_level_index -= 1
            print(f"Restored to: {self.current_level}")

    def _execute_with_degradation(self, func: Callable, *args, **kwargs) -> Any:
        """Call ``func``, passing ``degradation_level`` if it accepts one."""
        try:
            parameters = inspect.signature(func).parameters
        except (TypeError, ValueError):
            # Some callables expose no introspectable signature; call them
            # without the extra keyword.
            parameters = {}
        if 'degradation_level' in parameters:
            kwargs['degradation_level'] = self.current_level
        return func(*args, **kwargs)
5. Adaptive Timeout Pattern
Adjust timeouts based on system performance:

import statistics
import time


class AdaptiveTimeout:
    """Run callables under a timeout that tracks observed response times.

    The timeout is enforced with ``SIGALRM``, so ``execute_with_timeout``
    needs a platform that provides that signal and must be called from the
    main thread.
    """

    def __init__(self, initial_timeout: float = 5.0,
                 min_timeout: float = 1.0,
                 max_timeout: float = 30.0):
        """Store the starting timeout and the bounds it may move between."""
        self.initial_timeout = initial_timeout
        self.min_timeout = min_timeout
        self.max_timeout = max_timeout
        self.current_timeout = initial_timeout
        self.response_times = []
        self.timeout_history = []

    def execute_with_timeout(self, func: Callable, *args, **kwargs) -> Any:
        """Call ``func(*args, **kwargs)`` under the current timeout.

        Returns:
            Whatever ``func`` returns.

        Raises:
            TimeoutError: The call ran longer than the current timeout (the
                timeout is increased before re-raising). A ``TimeoutError``
                raised by ``func`` itself is treated the same way.
        """
        import signal

        timeout = self.current_timeout

        def timeout_handler(signum, frame):
            raise TimeoutError(f"Operation timed out after {timeout}s")

        previous_handler = signal.signal(signal.SIGALRM, timeout_handler)
        # setitimer accepts fractional seconds; alarm(int(...)) truncated
        # them and a sub-second timeout became alarm(0), i.e. no timeout.
        signal.setitimer(signal.ITIMER_REAL, timeout)
        started = time.monotonic()
        try:
            result = func(*args, **kwargs)
            elapsed = time.monotonic() - started
        except TimeoutError:
            # Increase timeout for next attempt
            self._increase_timeout()
            raise
        finally:
            # Cancel the timer and put back whatever handler was installed.
            signal.setitimer(signal.ITIMER_REAL, 0)
            if previous_handler is not None:
                signal.signal(signal.SIGALRM, previous_handler)

        # Record successful response time (after the timer is cancelled).
        self._record_response_time(elapsed)
        return result

    def _p95(self) -> float:
        """Return the 95th percentile of the recorded response times."""
        if len(self.response_times) < 2:
            # statistics.quantiles needs at least two data points.
            return self.response_times[0]
        return statistics.quantiles(self.response_times, n=20)[18]

    def _record_response_time(self, response_time: float):
        """Store ``response_time`` and re-derive the timeout from the P95."""
        self.response_times.append(response_time)
        # Keep only last 100 response times
        if len(self.response_times) > 100:
            self.response_times.pop(0)

        # Adjust timeout only once there are enough samples.
        if len(self.response_times) >= 10:
            p95 = self._p95()
            # Set timeout to P95 + 50% margin, then apply bounds.
            new_timeout = p95 * 1.5
            self.current_timeout = max(
                self.min_timeout,
                min(self.max_timeout, new_timeout)
            )
            self.timeout_history.append({
                "timestamp": time.time(),
                "timeout": self.current_timeout,
                "based_on_p95": p95
            })

    def _increase_timeout(self):
        """Grow the timeout by 50%, capped at ``max_timeout``."""
        self.current_timeout = min(
            self.max_timeout,
            self.current_timeout * 1.5
        )

    def get_stats(self) -> Dict[str, Any]:
        """Return the current timeout plus response-time statistics.

        With no samples only ``current_timeout`` is returned.
        """
        if not self.response_times:
            return {"current_timeout": self.current_timeout}
        return {
            "current_timeout": self.current_timeout,
            "avg_response_time": statistics.mean(self.response_times),
            "p95_response_time": self._p95(),
            "timeout_adjustments": len(self.timeout_history)
        }
Implementation Strategies
1. Health-Based Routing
Route requests based on service health:class HealthBasedRouter:
def __init__(self):
self.services = {}
self.health_scores = {}
self.routing_stats = defaultdict(int)
def register_service(self, name: str, service: Any,
health_check: Callable[[], float]):
"""Register a service with health check"""
self.services[name] = {
"instance": service,
"health_check": health_check
}
def route_request(self, request: Any) -> Any:
"""Route request to healthiest service"""
# Update health scores
self._update_health_scores()
# Get services sorted by health
healthy_services = [
(name, score) for name, score in self.health_scores.items()
if score > 0.2 # Minimum health threshold
]
if not healthy_services:
raise Exception("No healthy services available")
# Sort by health score
healthy_services.sort(key=lambda x: x[1], reverse=True)
# Try services in order of health
for service_name, health_score in healthy_services:
try:
service = self.services[service_name]["instance"]
result = service.handle_request(request)
self.routing_stats[service_name] += 1
return result
except Exception as e:
print(f"Service {service_name} failed: {e}")
continue
raise Exception("All services failed")
def _update_health_scores(self):
"""Update health scores for all services"""
for name, service_info in self.services.items():
try:
score = service_info["health_check"]()
self.health_scores[name] = score
except:
self.health_scores[name] = 0.0
2. Load Shedding
Drop non-critical requests under load:

from enum import Enum
import hashlib
class RequestPriority(Enum):
    """Request priorities, numbered from 4 (CRITICAL) down to 1 (LOW)."""

    CRITICAL = 4
    HIGH = 3
    NORMAL = 2
    LOW = 1
from collections import defaultdict


class LoadShedder:
    """Decide which requests to accept as load approaches capacity.

    Each priority has a load-ratio threshold at or above which its requests
    are shed; lower priorities are shed first.
    """

    def __init__(self, capacity: int = 1000):
        """Create a shedder for ``capacity`` units of load.

        Raises:
            ValueError: ``capacity`` is not positive (the load ratio would
                be undefined).
        """
        if capacity <= 0:
            raise ValueError("capacity must be a positive number")
        self.capacity = capacity
        self.current_load = 0
        # Load ratio above which probabilistic shedding starts.
        self.shed_threshold = 0.8
        self.priority_thresholds = {
            RequestPriority.LOW: 0.6,
            RequestPriority.NORMAL: 0.8,
            RequestPriority.HIGH: 0.9,
            RequestPriority.CRITICAL: 1.0
        }
        self.stats = defaultdict(int)

    def should_accept_request(self, request_id: str,
                              priority: RequestPriority) -> bool:
        """Return True if the request should be processed.

        The outcome is recorded in ``stats`` under ``accepted_<NAME>``,
        ``shed_<NAME>`` or ``probabilistic_shed_<NAME>``.
        """
        load_ratio = self.current_load / self.capacity

        # Always accept critical requests if possible
        if priority == RequestPriority.CRITICAL and load_ratio < 1.0:
            # Count it, so totals and shed rate include critical traffic.
            self.stats[f"accepted_{priority.name}"] += 1
            return True

        # Check against priority threshold
        if load_ratio >= self.priority_thresholds[priority]:
            self.stats[f"shed_{priority.name}"] += 1
            return False

        # Probabilistic shedding for smoother degradation
        if load_ratio > self.shed_threshold:
            shed_probability = (load_ratio - self.shed_threshold) / (1.0 - self.shed_threshold)
            # Hash the request ID so the same request always gets the same
            # decision at a given load.
            hash_value = int(hashlib.md5(request_id.encode()).hexdigest(), 16)
            if (hash_value % 100) / 100 < shed_probability:
                self.stats[f"probabilistic_shed_{priority.name}"] += 1
                return False

        self.stats[f"accepted_{priority.name}"] += 1
        return True

    def update_load(self, current_load: int):
        """Record the latest load measurement."""
        self.current_load = current_load

    def get_shedding_stats(self) -> Dict[str, Any]:
        """Return the load ratio, request totals, shed rate and raw counters."""
        total_requests = sum(self.stats.values())
        # Matches both "shed_*" and "probabilistic_shed_*" counters.
        shed_requests = sum(v for k, v in self.stats.items() if 'shed' in k)
        return {
            "load_ratio": self.current_load / self.capacity,
            "total_requests": total_requests,
            "shed_requests": shed_requests,
            "shed_rate": shed_requests / max(total_requests, 1),
            "by_priority": dict(self.stats)
        }
Monitoring and Alerting
Degradation Dashboard
class DegradationMonitor:
    """Watch registered services and raise an event when their level changes."""

    def __init__(self):
        self.services = {}
        self.degradation_events = []
        self.alert_handlers = []

    def register_service(self, service: DegradableService):
        """Start monitoring ``service``, keyed by its ``name``."""
        self.services[service.name] = service

    def add_alert_handler(self, handler: Callable[[Dict], None]):
        """Register a callable that receives each level-change event dict."""
        self.alert_handlers.append(handler)

    def check_services(self):
        """Record and alert on every service whose level changed.

        The level seen at the previous check is remembered on the service
        as ``_previous_level``, so the first check of a service never
        reports a change.
        """
        # Best to worst. Direction must come from this order: the enum
        # values are strings, and comparing them alphabetically mislabels
        # transitions such as DEGRADED -> MINIMAL.
        severity = [ServiceLevel.FULL, ServiceLevel.DEGRADED,
                    ServiceLevel.MINIMAL, ServiceLevel.OFFLINE]

        for name, service in self.services.items():
            current_level = service.current_level
            previous_level = getattr(service, '_previous_level', current_level)

            if current_level != previous_level:
                worsened = severity.index(current_level) > severity.index(previous_level)
                event = {
                    "timestamp": datetime.now(),
                    "service": name,
                    "previous_level": previous_level.value,
                    "current_level": current_level.value,
                    "direction": "degraded" if worsened else "restored"
                }
                self.degradation_events.append(event)
                # Send alerts
                for handler in self.alert_handlers:
                    handler(event)

            service._previous_level = current_level

    def get_system_status(self) -> Dict[str, Any]:
        """Return overall health, per-service levels and the last 10 events."""
        service_levels = {}
        degraded_count = 0
        for name, service in self.services.items():
            service_levels[name] = service.current_level.value
            if service.current_level != ServiceLevel.FULL:
                degraded_count += 1

        return {
            "overall_health": "healthy" if degraded_count == 0 else "degraded",
            "degraded_services": degraded_count,
            "total_services": len(self.services),
            "service_levels": service_levels,
            "recent_events": self.degradation_events[-10:]
        }
Best Practices
-
Test Degradation Paths: Regularly test all degradation scenarios
def test_degradation_scenario(): service = IntelligentAssistant("test") # Test each level for level in [ServiceLevel.DEGRADED, ServiceLevel.MINIMAL]: service.degrade() response = service.process_request("test query") assert response is not None assert service.current_level == level -
Monitor Degradation Metrics: Track when and why degradation occurs
def log_degradation_metrics(service_name: str, reason: str, level: str): metrics = { "service": service_name, "reason": reason, "level": level, "timestamp": datetime.now(), "impact": calculate_impact(level) } # Log to monitoring system monitoring.record("degradation", metrics) -
Communicate Status: Keep users informed
def get_user_friendly_status(service_level: ServiceLevel) -> str:
    """Return the user-facing message for ``service_level``.

    Any level without a message yields "Unknown status".
    """
    status_text = {
        ServiceLevel.FULL: "All features available",
        ServiceLevel.DEGRADED: "Running with reduced features for stability",
        ServiceLevel.MINIMAL: "Basic features only - we're working on it",
        ServiceLevel.OFFLINE: "Service temporarily unavailable",
    }
    if service_level in status_text:
        return status_text[service_level]
    return "Unknown status"
Testing Graceful Degradation
import pytest
from unittest.mock import Mock, patch
def test_capability_degradation():
    """One degrade() step swaps complex reasoning for simple reasoning."""
    service = IntelligentAssistant("test")

    # At the full level the richest capability is present.
    assert "complex_reasoning" in service.get_available_capabilities()

    # One step down: the level changes and so does the capability list.
    service.degrade()
    assert service.current_level == ServiceLevel.DEGRADED
    remaining = service.get_available_capabilities()
    assert "complex_reasoning" not in remaining
    assert "simple_reasoning" in remaining
def test_fallback_chain():
    """A failing primary handler hands over to the fallback handler."""
    chain = FallbackChain[str]()

    def failing_primary():
        # Must raise: a handler that merely returns an Exception object
        # counts as a success and the fallback would never run.
        raise Exception("Primary failed")

    # Add handlers
    chain.add_handler(failing_primary, "primary")
    chain.add_handler(lambda: "fallback_result", "fallback")

    # Execute
    result = chain.execute()
    assert result == "fallback_result"
    assert chain.get_metrics()["failures_by_level"]["primary"] == 1
@patch('psutil.cpu_percent')
@patch('psutil.virtual_memory')
def test_resource_degradation(mock_memory, mock_cpu):
    """High CPU readings switch the registered strategy on."""
    # Simulate high CPU with moderate memory use.
    mock_cpu.return_value = 95.0
    mock_memory.return_value = Mock(percent=50.0)

    manager = ResourceAwareDegradation()

    # Track whether the strategy's apply callback ran.
    outcome = {"degraded": False}

    def set_degraded():
        outcome["degraded"] = True

    manager.add_degradation_strategy(
        "test",
        lambda metrics: metrics["cpu_percent"] > 90,
        set_degraded,
        lambda: None,
    )

    manager.check_and_adjust()

    assert outcome["degraded"]
    assert "test" in manager.current_degradations

