Manage context size and information flow in long workflows
Use the retain_full_context parameter to control how much of the previous tasks' output each task carries forward:
from praisonaiagents import Agent, Task, PraisonAIAgents
# Create an agent
processor = Agent(
name="DataProcessor",
role="Data analysis expert",
goal="Process and analyze data efficiently"
)
# Task with minimal context (only passes its output)
task1 = Task(
name="extract_data",
description="Extract data from source files",
expected_output="Extracted data in JSON format",
agent=processor,
retain_full_context=False # Only pass this task's output forward
)
# Task that needs full historical context
task2 = Task(
name="analyze_trends",
description="Analyze trends across all previous data",
expected_output="Comprehensive trend analysis",
agent=processor,
retain_full_context=True # Include all previous outputs
)
# Create and run workflow
agents = PraisonAIAgents(
agents=[processor],
tasks=[task1, task2],
process="sequential"
)
result = agents.start()
from praisonaiagents import Agent, Task, PraisonAIAgents
# Define agents for the pipeline
data_loader = Agent(
name="DataLoader",
role="Data loading specialist",
goal="Load and prepare data efficiently"
)
data_cleaner = Agent(
name="DataCleaner",
role="Data preprocessing expert",
goal="Clean and preprocess data"
)
analyst = Agent(
name="Analyst",
role="Statistical analyst",
goal="Perform comprehensive data analysis"
)
# Data processing pipeline with selective context
raw_data_task = Task(
name="load_raw_data",
description="Load 10GB of raw data",
expected_output="Data loading summary",
agent=data_loader,
retain_full_context=False # Don't pass raw data forward
)
clean_data_task = Task(
name="clean_data",
description="Clean and preprocess data",
expected_output="Cleaned data statistics",
agent=data_cleaner,
retain_full_context=False # Only pass statistics forward
)
analyze_task = Task(
name="analyze_data",
description="Perform statistical analysis",
expected_output="Analysis results",
agent=analyst,
retain_full_context=True # Need all previous summaries
)
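To run this pipeline end to end, the tasks can be wired into a sequential workflow just like the first example. A minimal sketch, assuming the same PraisonAIAgents setup shown above (the pipeline variable name is illustrative):
pipeline = PraisonAIAgents(
    agents=[data_loader, data_cleaner, analyst],
    tasks=[raw_data_task, clean_data_task, analyze_task],
    process="sequential"
)
result = pipeline.start()
# Only the loading and cleaning summaries reach the analysis step;
# the raw data itself is never carried forward as context.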
# Define the summarizer agent
summarizer = Agent(
name="Summarizer",
role="Content summarizer",
goal="Create concise and accurate summaries"
)
# Configure context window sizes
summary_task = Task(
name="summarize_chapter",
description="Summarize this book chapter",
expected_output="Chapter summary",
agent=summarizer,
retain_full_context=False,
max_context_length=2000 # Limit input context size
)
# Task that needs more context
final_summary = Task(
name="create_book_summary",
description="Create overall book summary from chapter summaries",
expected_output="Complete book summary",
agent=summarizer,
retain_full_context=True,
max_context_length=8000 # Larger context for all summaries
)
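Chapter-level tasks can also be generated in a loop and followed by the overall summary. A brief sketch, assuming the imports and summarizer agent from the earlier examples and an illustrative chapter count of five:
chapter_tasks = [
    Task(
        name=f"summarize_chapter_{i}",
        description=f"Summarize book chapter {i}",
        expected_output=f"Summary of chapter {i}",
        agent=summarizer,
        retain_full_context=False,
        max_context_length=2000  # Same per-chapter limit as above
    )
    for i in range(1, 6)
]
book_workflow = PraisonAIAgents(
    agents=[summarizer],
    tasks=chapter_tasks + [final_summary],
    process="sequential"
)
book_workflow.start()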
from praisonaiagents import Task, ContextFilter
# Custom context filter
class DataContextFilter(ContextFilter):
    def filter(self, context):
        """Keep only specific data types"""
        filtered = {}
        for key, value in context.items():
            if key in ['summary', 'metrics', 'errors']:
                filtered[key] = value
        return filtered
# Apply filter to task
analysis_task = Task(
name="final_analysis",
description="Analyze filtered results",
expected_output="Analysis report",
agent=analyst,
context_filter=DataContextFilter()
)
class DynamicContextWorkflow:
    def __init__(self):
        self.context_size = 0
        self.max_context = 4000

    def create_task(self, name, description, agent):
        # Dynamically decide context retention
        retain_full = self.context_size < self.max_context
        task = Task(
            name=name,
            description=description,
            expected_output=f"Output of {name}",
            agent=agent,
            retain_full_context=retain_full
        )
        # Estimate context growth
        self.context_size += len(description) + 500
        return task
# Define the processor agent
processor = Agent(
name="Processor",
role="Data processor",
goal="Process information efficiently"
)
# Usage
workflow = DynamicContextWorkflow()
tasks = []
for i in range(10):
    task = workflow.create_task(
        name=f"step_{i}",
        description=f"Process step {i}",
        agent=processor
    )
    tasks.append(task)
from praisonaiagents import Agent, Task, PraisonAIAgents
# Summarizer agent
summarizer = Agent(
name="ContextSummarizer",
role="Information summarizer",
goal="Create concise summaries of large contexts"
)
# Regular processing agent
processor = Agent(
name="Processor",
role="Data processor",
goal="Process information efficiently"
)
# Create tasks with periodic summarization
tasks = []
for i in range(20):
    if i % 5 == 4:  # Every 5th task
        # Insert summarization task
        summary_task = Task(
            name=f"summarize_up_to_{i}",
            description="Summarize all previous outputs",
            expected_output="Concise summary of progress",
            agent=summarizer,
            retain_full_context=True  # Get all context
        )
        tasks.append(summary_task)
        # Next task gets only summary
        process_task = Task(
            name=f"process_{i}",
            description=f"Continue processing step {i}",
            expected_output=f"Result of step {i}",
            agent=processor,
            retain_full_context=False  # Only summary
        )
    else:
        # Regular task
        process_task = Task(
            name=f"process_{i}",
            description=f"Process step {i}",
            expected_output=f"Result of step {i}",
            agent=processor,
            retain_full_context=False
        )
    tasks.append(process_task)
from praisonaiagents import Agent, Task
# Define the required agents
extractor = Agent(
name="DataExtractor",
role="Data extraction specialist",
goal="Extract specific information from data"
)
analyst = Agent(
name="Analyst",
role="Data analyst",
goal="Analyze customer data"
)
# Task that extracts specific information
extract_task = Task(
name="extract_key_data",
description="Extract only customer IDs and totals",
expected_output="List of customer IDs and order totals",
agent=extractor,
retain_full_context=False,
output_filter=lambda x: {
"customer_ids": x.get("customer_ids", []),
"totals": x.get("totals", [])
}
)
# Task that uses filtered information
process_task = Task(
name="process_customers",
description="Process customer data",
expected_output="Customer analysis",
agent=analyst,
# Only receives customer_ids and totals from previous task
)
# Define pipeline agents
fetcher = Agent(
name="DataFetcher",
role="API data retrieval specialist",
goal="Fetch data from external APIs"
)
validator = Agent(
name="DataValidator",
role="Data validation expert",
goal="Ensure data quality and format"
)
transformer = Agent(
name="DataTransformer",
role="Data transformation specialist",
goal="Transform data to target format"
)
storer = Agent(
name="DataStorer",
role="Database storage specialist",
goal="Store data securely in database"
)
# Each task only passes essential data forward
pipeline_tasks = [
Task(
name="fetch_data",
description="Fetch data from API",
expected_output="Data fetched confirmation",
agent=fetcher,
retain_full_context=False
),
Task(
name="validate_data",
description="Validate data format",
expected_output="Validation report",
agent=validator,
retain_full_context=False
),
Task(
name="transform_data",
description="Transform to target format",
expected_output="Transformation complete",
agent=transformer,
retain_full_context=False
),
Task(
name="store_data",
description="Store in database",
expected_output="Storage confirmation",
agent=storer,
retain_full_context=False
)
]
# Define processor agent
processor = Agent(
name="Processor",
role="Task processor",
goal="Process tasks efficiently"
)
checkpoint_interval = 5
tasks = []
for i in range(20):
    is_checkpoint = (i % checkpoint_interval == 0)
    task = Task(
        name=f"task_{i}",
        description=f"Process item {i}",
        expected_output=f"Result {i}",
        agent=processor,
        retain_full_context=is_checkpoint,
        metadata={"checkpoint": is_checkpoint}
    )
    tasks.append(task)
# Define agents for hierarchical processing
detail_processor = Agent(
name="DetailProcessor",
role="Detail-level processor",
goal="Process individual records efficiently"
)
summarizer = Agent(
name="Summarizer",
role="Batch summarizer",
goal="Create concise batch summaries"
)
analyst = Agent(
name="Analyst",
role="Comprehensive analyst",
goal="Perform deep analysis on all data"
)
# Detail tasks - minimal context
detail_task = Task(
name="process_detail",
description="Process individual record",
expected_output="Processed record",
agent=detail_processor,
retain_full_context=False
)
# Summary tasks - medium context
summary_task = Task(
name="create_summary",
description="Summarize batch of records",
expected_output="Batch summary",
agent=summarizer,
retain_full_context=False,
include_previous_n_tasks=5 # Last 5 tasks only
)
# Analysis tasks - full context
analysis_task = Task(
name="analyze_all",
description="Comprehensive analysis",
expected_output="Full analysis report",
agent=analyst,
retain_full_context=True
)
# Document context flow
workflow_plan = """
1. Load Data (Large) -> retain_full_context=False
2. Clean Data -> retain_full_context=False
3. Extract Features -> retain_full_context=False
4. Summarize Features -> retain_full_context=True
5. Train Model -> retain_full_context=False
6. Evaluate Model -> retain_full_context=True
"""
from praisonaiagents.callbacks import Callback
class ContextMonitor(Callback):
    def on_task_start(self, task, context, **kwargs):
        context_size = len(str(context))
        print(f"Task {task.name} context size: {context_size} chars")
        if context_size > 10000:
            print(f"WARNING: Large context for {task.name}")
# Use monitor
agents = PraisonAIAgents(
agents=[processor],
tasks=tasks,
callbacks=[ContextMonitor()]
)
def create_summary_checkpoint(previous_tasks, agent):
    """Create a summary checkpoint after a batch of tasks"""
    return Task(
        name=f"summary_checkpoint_{len(previous_tasks)}",
        description=f"Summarize outputs from the last {len(previous_tasks)} tasks",
        expected_output="Concise summary of key findings",
        agent=agent,
        retain_full_context=True,  # Get all context
        next_retain_full_context=False  # But don't pass it all forward
    )
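One way to use this helper is to interleave a checkpoint after every batch of regular tasks. A sketch, assuming the processor and summarizer agents defined earlier and an illustrative batch size of four:
batch = []
all_tasks = []
for i in range(12):
    step = Task(
        name=f"step_{i}",
        description=f"Process step {i}",
        expected_output=f"Result of step {i}",
        agent=processor,
        retain_full_context=False
    )
    batch.append(step)
    all_tasks.append(step)
    if len(batch) == 4:  # Checkpoint after each batch of four tasks
        all_tasks.append(create_summary_checkpoint(batch, summarizer))
        batch = []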
# Calculate optimal context retention
def should_retain_full_context(task_index, total_tasks):
    """Determine if task should retain full context"""
    # Key decision points need full context
    decision_points = [0, total_tasks // 2, total_tasks - 1]
    # Summary tasks need full context
    is_summary = (task_index % 10 == 9)
    return task_index in decision_points or is_summary
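A hypothetical loop that applies this policy while building the task list (the agent and task names are illustrative):
total = 20
tasks = [
    Task(
        name=f"step_{i}",
        description=f"Process step {i}",
        expected_output=f"Result of step {i}",
        agent=processor,
        retain_full_context=should_retain_full_context(i, total)
    )
    for i in range(total)
]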
from praisonaiagents import Task
# Clear unnecessary data
class MemoryEfficientTask(Task):
    def __init__(self, *args, **kwargs):
        # Pop the custom flag so it is not forwarded to Task.__init__
        self.clear_on_complete = kwargs.pop('clear_on_complete', True)
        super().__init__(*args, **kwargs)

    def on_complete(self, result):
        if self.clear_on_complete and not self.retain_full_context:
            # Clear large data after task completes
            self.context = {"summary": result.summary}
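Assuming Task accepts the same keyword arguments as in the earlier examples, the subclass can be used as a drop-in replacement; the clear_on_complete flag is handled by the subclass itself, not by the library:
cleanup_task = MemoryEfficientTask(
    name="transform_records",
    description="Transform records into the target schema",
    expected_output="Transformation summary",
    agent=processor,
    retain_full_context=False,
    clear_on_complete=True  # Custom flag consumed in __init__ above
)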
Use retain_full_context=False for most tasks.
# Debug context flow
agents = PraisonAIAgents(
agents=[processor],
tasks=tasks,
verbose=True, # Show context at each step
debug_context=True # Additional context debugging
)