Learn how to create AI agents that handle repetitive tasks through automated loops.

Repetitive agents follow a workflow pattern in which an agent loops over a set of task items, handling each one in turn so that many instances are processed efficiently and consistently.
Set your OpenAI API key as an environment variable in your terminal:
```bash
export OPENAI_API_KEY=your_api_key_here
```
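If you prefer not to export the variable in your shell, you can also set it for the current process from Python before any agents are created (a minimal sketch using only the standard library; replace the placeholder with your real key handling):

```python
import os

# Set the key for this process only; in practice, load it from a secrets store
os.environ["OPENAI_API_KEY"] = "your_api_key_here"
```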
3. Create a file
Create a new file repetitive_agent.py with the basic setup:
```python
from praisonaiagents import Agent, Workflow, WorkflowContext, StepResult
from praisonaiagents import loop

# Create processor agent
processor = Agent(
    name="Processor",
    role="Task Processor",
    goal="Process each task item thoroughly",
    instructions="Process the given task. Provide a detailed response for each item."
)

# Create summarizer agent
summarizer = Agent(
    name="Summarizer",
    role="Results Summarizer",
    goal="Summarize all processed results",
    instructions="Summarize all the processed results into a final report."
)

# Create workflow with loop - processor handles each item
workflow = Workflow(
    steps=[
        loop(processor, over="topics"),  # Agent processes each topic
        summarizer                       # Summarize all results
    ],
    variables={"topics": ["AI Ethics", "Machine Learning", "Neural Networks"]}
)

result = workflow.start("Research and summarize these AI topics")
print(f"Final Summary: {result['output'][:500]}...")
```
4. Start Workflow
Type this in your terminal to run your workflow:
```bash
python repetitive_agent.py
```
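Once the run finishes, the CSV examples further down this page read per-item results from result["variables"] under the loop_outputs key; assuming the same structure applies to this workflow, you could inspect each topic's output like this (a hedged sketch):

```python
# Assumed key, mirroring the CSV loop examples below
for item_output in result["variables"].get("loop_outputs", []):
    print(item_output[:200])
```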
Requirements
Python 3.10 or higher
OpenAI API key. Generate an OpenAI API key here. To use other models, see this guide.
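The agents in the examples below accept an llm parameter, so switching providers is usually just a different model string (a hedged sketch; the exact model identifiers depend on the guide above and on which provider keys you have set):

```python
from praisonaiagents import Agent

# "gpt-4o-mini" appears in the examples below; swap in another provider's
# model string (per the guide above) to use a different model
agent = Agent(
    name="Processor",
    role="Task Processor",
    goal="Process each task item thoroughly",
    llm="gpt-4o-mini"
)
```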
Loop tasks can automatically process CSV and text files to create dynamic subtasks. This powerful feature enables batch processing of data without manual task creation.
```python
from praisonaiagents import Workflow, WorkflowContext, StepResult, Agent
from praisonaiagents import loop

# Create a CSV file with customer issues
with open("customers.csv", "w") as f:
    f.write("name,issue\n")
    f.write("John,Billing problem with subscription\n")
    f.write("Jane,Technical issue with login\n")
    f.write("Sarah,Request for feature enhancement\n")

# Create specialized support agent
support_agent = Agent(
    name="Support Agent",
    role="Customer support specialist",
    goal="Resolve customer issues efficiently",
    llm="gpt-4o-mini"
)

# Process each customer using the agent
def handle_customer(ctx: WorkflowContext) -> StepResult:
    row = ctx.variables.get("item", {})
    name = row.get("name", "unknown")
    issue = row.get("issue", "unknown")
    # Use agent to handle the issue
    response = support_agent.chat(f"Help {name} with: {issue}")
    return StepResult(output=f"{name}: {response}")

# Create workflow with loop over CSV
workflow = Workflow(
    steps=[loop(handle_customer, from_csv="customers.csv")]
)

# Start processing
result = workflow.start("Process all customer issues")

# Print results
print("Customer Support Results:")
for output in result["variables"].get("loop_outputs", []):
    print(f" {output}")
```
Process batches of tasks from CSV or other structured files:
```python
from praisonaiagents import Workflow, WorkflowContext, StepResult, Agent
from praisonaiagents import loop

# Create agent for processing questions
qa_agent = Agent(
    name="QA Bot",
    role="Answer questions",
    goal="Provide accurate answers to user questions"
)

# Process each question using the agent
def answer_question(ctx: WorkflowContext) -> StepResult:
    row = ctx.variables.get("item", {})
    question = row.get("question", "")
    answer = qa_agent.chat(question)
    return StepResult(output=f"Q: {question}\nA: {answer}")

# Create workflow with loop over CSV
workflow = Workflow(
    steps=[loop(answer_question, from_csv="questions.csv")]
)

# Run the batch processing
result = workflow.start("Process all questions")
print(result["variables"]["loop_outputs"])
```
The input CSV file should have headers that correspond to task parameters:
```csv
question,context,priority
"What is Python?","Programming language context","high"
"Explain machine learning","AI and ML context","medium"
"How does Docker work?","Container technology context","high"
```
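For a quick local test, you can generate a matching questions.csv with Python's built-in csv module (a minimal sketch; the rows mirror the sample above):

```python
import csv

# Rows matching the header layout shown above
rows = [
    {"question": "What is Python?", "context": "Programming language context", "priority": "high"},
    {"question": "Explain machine learning", "context": "AI and ML context", "priority": "medium"},
    {"question": "How does Docker work?", "context": "Container technology context", "priority": "high"},
]

with open("questions.csv", "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=["question", "context", "priority"])
    writer.writeheader()
    writer.writerows(rows)
```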
You can track progress across batch items with a custom callback:

```python
from praisonaiagents.callbacks import Callback

class BatchProgressTracker(Callback):
    def __init__(self):
        self.processed = 0
        self.total = 0

    def on_task_start(self, task, **kwargs):
        if task.task_type == "loop" and self.total == 0:
            # Count total items by counting lines, minus 1 for the header row
            try:
                with open(task.input_file, "r", encoding="utf-8") as f:
                    self.total = sum(1 for _ in f) - 1
            except FileNotFoundError:
                print(f"Warning: Input file not found at {task.input_file}. Progress will not be shown.")
                self.total = 0

    def on_subtask_complete(self, subtask, result, **kwargs):
        self.processed += 1
        if self.total > 0:
            print(f"Progress: {self.processed}/{self.total} ({self.processed / self.total * 100:.1f}%)")
        else:
            print(f"Progress: {self.processed} items processed")

# Use progress tracker
agents = Agents(
    agents=[qa_agent],
    tasks=[loop_task],
    callbacks=[BatchProgressTracker()]
)
```
File Validation: Always validate input files before processing
```python
import os
import csv

def validate_input_file(filepath):
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"Input file not found: {filepath}")
    with open(filepath, 'r') as f:
        reader = csv.reader(f)
        headers = next(reader, None)
        if not headers:
            raise ValueError("CSV file is empty or has no headers")
    return True
```
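If your loop reads specific columns, it is worth extending the check to required headers too (a small sketch; required_columns is whatever your handler expects):

```python
import csv

def validate_columns(filepath, required_columns):
    # Verify the CSV declares every column the loop will read
    with open(filepath, "r", newline="") as f:
        headers = next(csv.reader(f), None) or []
        missing = set(required_columns) - set(headers)
        if missing:
            raise ValueError(f"CSV is missing required columns: {sorted(missing)}")
    return True

validate_columns("questions.csv", ["question", "context", "priority"])
```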
Memory Management: For large files, use streaming
```python
loop_task = Task(
    name="process_large_file",
    description="Process item",
    expected_output="Result",
    agent=processor,
    task_type="loop",
    input_file="large_data.csv",
    streaming=True,  # Process one item at a time
    chunk_size=100   # Read 100 rows at a time
)
```
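The same principle applies if you pre-process large files yourself: iterate over rows in fixed-size chunks instead of loading the whole file into memory. A standard-library sketch (the function name is illustrative):

```python
import csv

def iter_chunks(filepath, chunk_size=100):
    # Yield lists of rows so only one chunk is held in memory at a time
    with open(filepath, "r", newline="") as f:
        reader = csv.DictReader(f)
        chunk = []
        for row in reader:
            chunk.append(row)
            if len(chunk) == chunk_size:
                yield chunk
                chunk = []
        if chunk:
            yield chunk

for chunk in iter_chunks("large_data.csv"):
    print(f"Processing {len(chunk)} rows")
```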
Result Storage: Save results progressively
```python
loop_task = Task(
    name="process_and_save",
    description="Process and save",
    expected_output="Saved result",
    agent=processor,
    task_type="loop",
    input_file="data.csv",
    output_file="results.csv",  # Save results to file
    append_mode=True            # Append results as processed
)
```
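If you manage the output file yourself instead, the equivalent pattern is appending each result as soon as it completes, so an interrupted run never loses finished work (a stdlib sketch; the column names are placeholders):

```python
import csv
import os

def append_result(filepath, row, fieldnames=("name", "result")):
    # Append one processed row; write the header only when creating the file
    write_header = not os.path.exists(filepath)
    with open(filepath, "a", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(fieldnames))
        if write_header:
            writer.writeheader()
        writer.writerow(row)

append_result("results.csv", {"name": "John", "result": "Issue resolved"})
```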