Memory Cleanup for Long-Running Apps
Long-running multi-agent applications can accumulate memory over time, leading to performance degradation and potential crashes. This guide covers best practices for effective memory management.

Understanding Memory Issues
Common Memory Problems
- Memory Leaks: Unreleased references to objects
- Conversation History Accumulation: Growing chat histories
- Cache Overflow: Unbounded caching
- Circular References: Objects referencing each other
- Resource Handles: Unclosed files, connections, etc.
Memory Management Strategies
1. Conversation History Management
Implement sliding-window or summary-based history management:
from collections import deque
from typing import List, Dict, Any
import hashlib
class MemoryEfficientConversationManager:
    """Keep conversation history bounded via a sliding window plus summaries.

    Recent messages live in a fixed-size deque; once the summary threshold
    is reached, the older half of the threshold window is condensed into a
    summary entry so long conversations do not grow memory without bound.
    """

    def __init__(self, max_history_length: int = 100, summary_threshold: int = 50):
        self.max_history_length = max_history_length
        # Clamp: a deque with maxlen=max_history_length can never hold more
        # than max_history_length messages, so a larger threshold would be
        # unreachable and summaries would never be created.
        self.summary_threshold = min(summary_threshold, max_history_length)
        self.conversation_history = deque(maxlen=max_history_length)
        self.summaries = []

    def add_message(self, message: Dict[str, Any]):
        """Add a message to conversation history with automatic cleanup."""
        self.conversation_history.append(message)
        # Create summary when threshold is reached
        if len(self.conversation_history) >= self.summary_threshold:
            self._create_summary()

    def _create_summary(self):
        """Condense the oldest half of the threshold window into one summary."""
        messages_to_summarize = list(self.conversation_history)[:self.summary_threshold // 2]
        if not messages_to_summarize:
            return  # threshold of 0 or 1 yields nothing to summarize
        # In production, use an LLM to create actual summaries
        summary = {
            "type": "summary",
            "message_count": len(messages_to_summarize),
            # .get avoids KeyError for messages stored without a timestamp
            "timestamp": messages_to_summarize[0].get("timestamp"),
            "key_points": self._extract_key_points(messages_to_summarize),
        }
        self.summaries.append(summary)
        # Remove the messages that were just summarized
        for _ in range(len(messages_to_summarize)):
            self.conversation_history.popleft()

    def _extract_key_points(self, messages: List[Dict]) -> List[str]:
        """Extract key points from messages (simplified version).

        In production, use NLP or an LLM for better extraction.
        """
        return [msg.get("content", "")[:50] + "..." for msg in messages[-3:]]

    def get_context(self, last_n: int = 10) -> List[Dict]:
        """Return recent context: up to 2 latest summaries + last_n messages."""
        context = []
        if self.summaries:
            context.extend(self.summaries[-2:])  # Last 2 summaries
        context.extend(list(self.conversation_history)[-last_n:])
        return context

    def cleanup(self):
        """Explicitly release all stored history and summaries."""
        self.conversation_history.clear()
        self.summaries.clear()
2. Agent Memory Management
Implement memory limits and cleanup for agents:
import gc
import weakref
from datetime import datetime, timedelta
class MemoryManagedAgent:
    """Agent with a soft memory budget and eviction of stored memories."""

    def __init__(self, name: str, memory_limit_mb: int = 100):
        self.name = name
        self.memory_limit_mb = memory_limit_mb
        self.created_at = datetime.now()
        self.last_cleanup = datetime.now()
        self._memory_store = {}
        # Weak refs let cached objects be garbage-collected when no one
        # else holds them, so the agent never keeps them alive on its own.
        self._weak_refs = weakref.WeakValueDictionary()

    def store_memory(self, key: str, value: Any, weak: bool = False):
        """Store data; weak=True keeps only a weak reference."""
        if weak:
            self._weak_refs[key] = value
        else:
            self._memory_store[key] = value
        # Enforce the soft budget after every strong store
        if self._estimate_memory_usage() > self.memory_limit_mb:
            self._cleanup_old_memories()

    def _estimate_memory_usage(self) -> float:
        """Estimate memory usage of the strong store in MB.

        NOTE: sys.getsizeof is shallow — it does not follow references into
        contained objects, so nested structures are under-counted.
        """
        import sys
        total_size = sum(sys.getsizeof(obj) for obj in self._memory_store.values())
        return total_size / (1024 * 1024)

    def _cleanup_old_memories(self):
        """Evict the least important ~20% of stored memories (at least one)."""
        if not self._memory_store:
            return
        if hasattr(self, 'memory_importance'):
            # Lowest-importance keys sort first and are evicted first
            sorted_keys = sorted(
                self._memory_store.keys(),
                key=lambda k: self.memory_importance.get(k, 0)
            )
        else:
            # Insertion order stands in for age when no importance map exists
            sorted_keys = list(self._memory_store.keys())
        # len // 5 is 0 for stores with fewer than 5 entries, which would make
        # cleanup a no-op even while over budget — always evict at least one.
        remove_count = max(1, len(sorted_keys) // 5)
        for key in sorted_keys[:remove_count]:
            del self._memory_store[key]
        gc.collect()  # reclaim the evicted objects promptly
        self.last_cleanup = datetime.now()

    def periodic_cleanup(self):
        """Run cleanup if more than 30 minutes passed since the last one."""
        if datetime.now() - self.last_cleanup > timedelta(minutes=30):
            self._cleanup_old_memories()
            gc.collect()
3. Resource Pool Management
Implement resource pooling to prevent resource exhaustion:
from contextlib import contextmanager
import threading
from queue import Empty, Full, Queue
class ResourcePool:
    """Bounded pool of reusable resources (connections, handles, ...).

    factory creates a new resource; cleanup_func (optional) disposes of one.
    At most max_size resources exist at any time.
    """

    def __init__(self, factory, max_size: int = 10, cleanup_func=None):
        self.factory = factory
        self.max_size = max_size
        self.cleanup_func = cleanup_func
        self.pool = Queue(maxsize=max_size)
        self.size = 0  # total resources in existence (idle + checked out)
        self.lock = threading.Lock()

    @contextmanager
    def acquire(self, timeout: float = 30):
        """Yield a pooled resource, creating one if under the size limit.

        Raises TimeoutError if the pool is at capacity and no resource is
        returned within `timeout` seconds.
        """
        resource = None
        try:
            try:
                # Grab an idle resource immediately if one is available.
                # (A blocking get here would stall a fresh, empty pool for
                # the full timeout before ever creating a resource.)
                resource = self.pool.get_nowait()
            except Empty:
                # Reserve a creation slot under the lock so concurrent
                # callers cannot exceed max_size.
                with self.lock:
                    can_create = self.size < self.max_size
                    if can_create:
                        self.size += 1
                if can_create:
                    try:
                        resource = self.factory()
                    except BaseException:
                        with self.lock:
                            self.size -= 1  # release the reserved slot
                        raise
                else:
                    # At capacity: wait for a resource to be returned
                    try:
                        resource = self.pool.get(timeout=timeout)
                    except Empty:
                        raise TimeoutError("Resource pool exhausted") from None
            yield resource
        finally:
            # Return the resource to the pool
            if resource is not None:
                try:
                    self.pool.put_nowait(resource)
                except Full:
                    # Pool is full (e.g. after cleanup_all); dispose instead
                    if self.cleanup_func:
                        self.cleanup_func(resource)
                    with self.lock:
                        self.size -= 1

    def cleanup_all(self):
        """Dispose of every idle resource currently sitting in the pool.

        Checked-out resources are untouched; `size` is decremented per
        disposed resource rather than reset to zero, so outstanding
        resources keep their slots until they are returned.
        """
        while True:
            try:
                resource = self.pool.get_nowait()
            except Empty:
                break
            if self.cleanup_func:
                self.cleanup_func(resource)
            with self.lock:
                self.size -= 1
4. Cache Management
Implement an LRU cache with memory limits:
from functools import lru_cache
import sys
from collections import OrderedDict
class MemoryBoundedLRUCache:
    """LRU cache bounded both by item count and (approximate) memory.

    Memory accounting uses sys.getsizeof, which is shallow — nested
    containers are under-counted.
    """

    def __init__(self, max_memory_mb: int = 50, max_items: int = 1000):
        self.max_memory_mb = max_memory_mb
        self.max_items = max_items
        self.cache = OrderedDict()  # insertion order == recency order
        self.memory_usage = 0  # bytes currently accounted for

    def get(self, key):
        """Return the cached value or None; refreshes recency on hit."""
        if key in self.cache:
            self.cache.move_to_end(key)  # mark as most recently used
            return self.cache[key]
        return None

    def put(self, key, value):
        """Insert or replace an item, evicting LRU entries to stay in budget."""
        value_size = sys.getsizeof(value)
        # Replacing an existing key: retire the old entry's accounting first,
        # otherwise its size is double-counted and its stale position
        # corrupts the recency order.
        if key in self.cache:
            old_value = self.cache.pop(key)
            self.memory_usage -= sys.getsizeof(old_value)
        # Evict least-recently-used entries until the new item fits
        while (self.memory_usage + value_size > self.max_memory_mb * 1024 * 1024
               or len(self.cache) >= self.max_items):
            if not self.cache:
                break  # a single over-budget item is still admitted
            oldest_key, oldest_value = self.cache.popitem(last=False)
            self.memory_usage -= sys.getsizeof(oldest_value)
        # Add the new item as most recently used
        self.cache[key] = value
        self.memory_usage += value_size

    def clear(self):
        """Drop every entry and reset the memory accounting."""
        self.cache.clear()
        self.memory_usage = 0
Memory Monitoring
1. Memory Usage Tracking
Example:
import psutil
import os
from datetime import datetime
class MemoryMonitor:
    """Track this process's memory over time and flag unhealthy patterns."""

    def __init__(self, alert_threshold_percent: float = 80):
        self.alert_threshold_percent = alert_threshold_percent
        self.process = psutil.Process(os.getpid())
        self.memory_history = []

    def get_memory_info(self) -> Dict[str, Any]:
        """Sample current memory usage and append it to the rolling history."""
        mem = self.process.memory_info()
        percent = self.process.memory_percent()
        sample = {
            "timestamp": datetime.now(),
            "rss_mb": mem.rss / 1024 / 1024,
            "vms_mb": mem.vms / 1024 / 1024,
            "percent": percent,
            "available_mb": psutil.virtual_memory().available / 1024 / 1024,
        }
        self.memory_history.append(sample)
        # Retain only the most recent hour of samples
        cutoff = datetime.now() - timedelta(hours=1)
        self.memory_history = [
            s for s in self.memory_history if s["timestamp"] > cutoff
        ]
        return sample

    def check_memory_health(self) -> Tuple[bool, str]:
        """Return (healthy, reason); unhealthy on high usage or rapid growth."""
        sample = self.get_memory_info()
        if sample["percent"] > self.alert_threshold_percent:
            return False, f"Memory usage critical: {sample['percent']:.1f}%"
        # Look for a sustained upward RSS trend across the last 10 samples
        if len(self.memory_history) > 10:
            window = self.memory_history[-10:]
            growth_rate = (window[-1]["rss_mb"] - window[0]["rss_mb"]) / len(window)
            if growth_rate > 10:  # growing faster than 10 MB per check
                return False, f"Memory growing rapidly: {growth_rate:.1f}MB/check"
        return True, "Memory usage normal"
2. Automatic Garbage Collection
Example:
import gc
import schedule
from typing import Callable
class AutomaticMemoryManager:
    """Coordinates registered cleanup callbacks and forced GC passes."""

    def __init__(self):
        self.cleanup_callbacks: List[Callable] = []
        self.last_gc_stats = None

    def register_cleanup(self, callback: Callable):
        """Register a zero-argument cleanup callback to run on each pass."""
        self.cleanup_callbacks.append(callback)

    def aggressive_cleanup(self):
        """Run all cleanup callbacks, force a full GC, and record stats.

        Returns the stats dict (also stored in last_gc_stats).
        """
        for callback in self.cleanup_callbacks:
            try:
                callback()
            except Exception as e:
                # Best-effort: one failing callback must not stop the rest
                print(f"Cleanup callback failed: {e}")
        # gc.collect(2) runs a full collection and returns the number of
        # unreachable objects found; gc.get_count() (used previously) only
        # reports per-generation allocation counters, not what was collected.
        collected = gc.collect(2)
        self.last_gc_stats = {
            "collected": collected,
            "uncollectable": len(gc.garbage),
            "timestamp": datetime.now()
        }
        return self.last_gc_stats

    def setup_automatic_cleanup(self, interval_minutes: int = 30):
        """Schedule periodic cleanup plus a memory-pressure-triggered pass."""
        schedule.every(interval_minutes).minutes.do(self.aggressive_cleanup)

        # Also cleanup on high memory usage
        def conditional_cleanup():
            memory_monitor = MemoryMonitor()
            healthy, _ = memory_monitor.check_memory_health()
            if not healthy:
                self.aggressive_cleanup()

        schedule.every(5).minutes.do(conditional_cleanup)
Best Practices
1. Use Context Managers
Always use context managers for resource management:
class ManagedAgentSession:
    """Context manager that guarantees cleanup of every agent it created.

    On exit, each tracked agent's cleanup() runs (even if the body raised),
    the tracking list is emptied, and a GC pass reclaims released objects.
    """

    def __init__(self, agent_factory):
        self.agent_factory = agent_factory
        self.agents = []

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Release every agent created during the session
        for tracked in self.agents:
            tracked.cleanup()
        self.agents.clear()
        gc.collect()

    def create_agent(self, *args, **kwargs):
        """Build an agent via the factory and track it for cleanup on exit."""
        new_agent = self.agent_factory(*args, **kwargs)
        self.agents.append(new_agent)
        return new_agent
2. Implement Memory Budgets
Set memory budgets for different components:
class MemoryBudgetManager:
    """Divide a fixed total memory budget among named components."""

    def __init__(self, total_budget_mb: int = 1000):
        self.total_budget_mb = total_budget_mb
        self.allocations = {}  # component name -> allocated budget in MB

    def allocate(self, component: str, budget_mb: int) -> bool:
        """Allocate (or re-allocate) a budget; False if it would overflow.

        A component's existing allocation is excluded from the running
        total so that re-allocating replaces its budget instead of
        stacking on top of it (the previous version double-counted).
        """
        other_allocated = sum(
            mb for name, mb in self.allocations.items() if name != component
        )
        if other_allocated + budget_mb > self.total_budget_mb:
            return False
        self.allocations[component] = budget_mb
        return True

    def check_usage(self, component: str, current_usage_mb: float) -> bool:
        """True if the component has a budget and is currently within it."""
        if component not in self.allocations:
            return False
        return current_usage_mb <= self.allocations[component]
3. Profile Memory Usage
Regular profiling helps identify memory issues:
import tracemalloc
from typing import List, Tuple
class MemoryProfiler:
    """Thin wrapper around tracemalloc for labeled snapshot comparisons."""

    def __init__(self):
        self.snapshots = []  # list of (label, tracemalloc snapshot) pairs

    def start_profiling(self):
        """Begin tracing allocations and record a baseline snapshot."""
        tracemalloc.start()
        self.take_snapshot("start")

    def take_snapshot(self, label: str):
        """Record the current allocation state under the given label."""
        self.snapshots.append((label, tracemalloc.take_snapshot()))

    def get_top_allocations(self, limit: int = 10) -> List[Tuple[str, float]]:
        """Compare the two most recent snapshots.

        Returns up to `limit` (source location, size-diff-in-MB) pairs;
        empty when fewer than two snapshots exist.
        """
        if len(self.snapshots) < 2:
            return []
        (_, earlier), (_, later) = self.snapshots[-2], self.snapshots[-1]
        diffs = later.compare_to(earlier, 'lineno')
        return [
            (
                f"{d.traceback[0].filename}:{d.traceback[0].lineno}",
                d.size_diff / 1024 / 1024,  # bytes -> MB
            )
            for d in diffs[:limit]
        ]

    def stop_profiling(self):
        """Stop tracing and discard all snapshots."""
        tracemalloc.stop()
        self.snapshots.clear()
Common Pitfalls
- Unbounded Collections: Always set limits on collections
- Circular References: Use weak references where appropriate
- Global State: Minimize global state that accumulates data
- Event Listeners: Always unregister event listeners
- Thread Local Storage: Clean up thread-local data
Testing Memory Management
Example:
import pytest
import time
def test_memory_bounded_cache():
    """Eviction must kick in once inserts exceed the 1 MB memory budget."""
    cache = MemoryBoundedLRUCache(max_memory_mb=1, max_items=100)
    payload = "x" * 10000  # ~10KB per item; 200 items far exceed 1 MB
    for i in range(200):
        cache.put(f"key_{i}", payload)
    # Old entries must have been evicted to stay within budget
    assert len(cache.cache) < 200
    assert cache.get("key_0") is None  # oldest evicted
    assert cache.get("key_199") is not None  # newest retained
def test_conversation_manager_cleanup():
    """History stays bounded and summaries are produced as messages roll over."""
    # summary_threshold must not exceed max_history_length: the deque caps
    # the history at 10 entries, so the default threshold of 50 would never
    # be reached, no summaries would ever be created, and the final assert
    # would fail. Pass an explicit reachable threshold.
    manager = MemoryEfficientConversationManager(
        max_history_length=10, summary_threshold=5
    )
    # Add many messages
    for i in range(20):
        manager.add_message({"content": f"Message {i}", "timestamp": time.time()})
    # Should not exceed max length
    assert len(manager.conversation_history) <= 10
    # Should have created summaries
    assert len(manager.summaries) > 0

