# recipe.py
import json
from praisonaiagents import Agent, Task, AgentTeam
def run(input_data: dict, config: dict = None) -> dict:
    """Extract action items and formatted minutes from a meeting transcript.

    Builds three agents (analyst, action extractor, minutes formatter), chains
    one task per agent, and runs them as an ``AgentTeam``.

    Args:
        input_data: Request payload. Keys read here:
            ``transcript_text`` (required, non-blank),
            ``attendees`` (optional; a list of names, or a single name string),
            ``format`` (optional output format, defaults to ``"markdown"``).
        config: Accepted for interface compatibility; not read by this function.

    Returns:
        On success, a dict with ``ok=True``, ``minutes``, ``action_items``,
        ``artifacts`` and ``warnings``. On failure, a dict with ``ok=False``
        and ``error`` holding ``code`` (``MISSING_INPUT`` or
        ``PROCESSING_ERROR``) and ``message``. Exceptions raised while
        processing are reported through the error dict, not propagated.
    """
    # Only this many leading characters of the transcript are sent to the analyst.
    max_chars = 8000

    # Tolerate a missing payload so the caller still gets a structured error.
    payload = input_data or {}
    transcript_text = payload.get("transcript_text")
    attendees = payload.get("attendees", [])
    # `or` also covers an explicit None, which would otherwise be
    # interpolated into the prompts as the text "None".
    output_format = payload.get("format") or "markdown"

    # A whitespace-only transcript carries no content; treat it as missing.
    is_blank = isinstance(transcript_text, str) and not transcript_text.strip()
    if not transcript_text or is_blank:
        return {
            "ok": False,
            "error": {
                "code": "MISSING_INPUT",
                "message": "transcript_text is required",
            },
        }

    try:
        warning_messages = []

        # A bare string would otherwise be joined character by character.
        if isinstance(attendees, str):
            attendees = [attendees]
        if attendees:
            attendees_str = ", ".join(str(person) for person in attendees)
        else:
            attendees_str = "Not specified"

        # Surface the truncation instead of dropping text silently.
        if len(transcript_text) > max_chars:
            warning_messages.append(
                f"transcript_text truncated from {len(transcript_text)} "
                f"to {max_chars} characters"
            )

        # Create analyzer agent
        analyzer = Agent(
            name="Meeting Analyst",
            role="Meeting Content Specialist",
            goal="Analyze meeting content and identify key points",
            instructions=(
                "\n"
                "You are a meeting analysis expert.\n"
                "- Identify main topics discussed\n"
                "- Note decisions made\n"
                "- Find commitments and deadlines\n"
                "- Track who said what when relevant\n"
            ),
        )

        # Create action extractor
        extractor = Agent(
            name="Action Extractor",
            role="Action Item Specialist",
            goal="Extract clear, actionable items",
            instructions=(
                "\n"
                "You are an action item extraction expert.\n"
                f"Known attendees: {attendees_str}\n"
                "For each action item, identify:\n"
                "- Task description\n"
                "- Assignee (if mentioned)\n"
                "- Due date (if mentioned)\n"
                "- Priority (if implied)\n"
                "Output as JSON array.\n"
            ),
        )

        # Create formatter
        formatter = Agent(
            name="Minutes Formatter",
            role="Documentation Specialist",
            goal=f"Format minutes in {output_format}",
            instructions=(
                "\n"
                "You are a documentation specialist.\n"
                f"Format meeting minutes in {output_format} format.\n"
                "Include:\n"
                "- Meeting summary\n"
                "- Key discussion points\n"
                "- Decisions made\n"
                "- Action items section\n"
            ),
        )

        # Define tasks; each later task receives the earlier ones as context.
        analyze_task = Task(
            name="analyze_meeting",
            description=(
                "Analyze this meeting transcript:\n\n"
                f"{transcript_text[:max_chars]}"
            ),
            expected_output="Meeting analysis with key points",
            agent=analyzer,
        )
        extract_task = Task(
            name="extract_actions",
            description="Extract action items from the meeting analysis",
            expected_output='JSON array: [{"task": "...", "assignee": "...", "due": "...", "priority": "..."}]',
            agent=extractor,
            context=[analyze_task],
        )
        format_task = Task(
            name="format_minutes",
            description=f"Format complete meeting minutes in {output_format}",
            expected_output=f"Formatted meeting minutes in {output_format}",
            agent=formatter,
            context=[analyze_task, extract_task],
        )

        # Execute
        agents = AgentTeam(
            agents=[analyzer, extractor, formatter],
            tasks=[analyze_task, extract_task, format_task],
        )
        result = agents.start()

        # NOTE(review): assumes start() returns a mapping keyed by task name;
        # confirm against the installed praisonaiagents version. If it does
        # not, the resulting AttributeError is reported as PROCESSING_ERROR.
        actions_text = result.get("extract_actions", "[]")
        action_items = parse_action_items(actions_text)

        return {
            "ok": True,
            "minutes": result.get("format_minutes", ""),
            "action_items": action_items,
            "artifacts": [],
            "warnings": warning_messages,
        }
    except Exception as e:
        # Recipe boundary: report any failure as a structured error dict.
        return {"ok": False, "error": {"code": "PROCESSING_ERROR", "message": str(e)}}
def parse_action_items(text: str) -> list:
    """Parse action items from agent output.

    Strategies are tried in order, and the first one that yields a non-empty
    result wins:

    1. Decode the span from the first ``[`` to the last ``]`` as JSON.
    2. Decode a JSON array starting at each ``[`` in turn and take the first
       non-empty array whose elements are all objects. This recovers an
       array when other brackets in the text (a footnote such as ``[1]``)
       break the outer span.
    3. Treat lines starting with ``-``, ``*`` or ``•`` as bullet items.

    Args:
        text: Raw agent output. ``None`` yields ``[]``; a list is returned
            unchanged; any other non-string value is converted with ``str()``.

    Returns:
        The decoded JSON list, or a list of dicts built from bullet lines
        with keys ``task``, ``assignee``, ``due`` and ``priority`` (all but
        ``task`` set to ``None``). Empty list when nothing is found.
    """
    if text is None:
        return []
    if isinstance(text, list):
        # Already structured; nothing to parse.
        return text
    if not isinstance(text, str):
        text = str(text)

    # Strategy 1: outermost bracketed span (first '[' through last ']').
    first = text.find("[")
    last = text.rfind("]")
    if first != -1 and last > first:
        try:
            outer = json.loads(text[first:last + 1])
        except json.JSONDecodeError:
            outer = None
        # An empty array is inconclusive (e.g. a markdown checkbox "[ ]"),
        # so keep looking instead of returning it straight away.
        if outer:
            return outer

    # Strategy 2: try every '[' as the start of a self-contained array.
    decoder = json.JSONDecoder()
    start = first
    while start != -1:
        try:
            candidate, _ = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            candidate = None
        if candidate and all(isinstance(item, dict) for item in candidate):
            return candidate
        start = text.find("[", start + 1)

    # Strategy 3: bullet points. "priority" is included so fallback items
    # carry the same keys as the JSON schema requested from the extractor.
    items = []
    for line in text.split('\n'):
        line = line.strip()
        if line.startswith(('-', '*', '•')) and len(line) > 2:
            items.append({
                "task": line[1:].strip(),
                "assignee": None,
                "due": None,
                "priority": None,
            })
    return items