# recipe.py
import os
import json
import re
from praisonaiagents import Agent, Task, AgentTeam
def run(input_data: dict, config: dict = None) -> dict:
"""Plan video highlight clips from transcript."""
transcript_text = input_data.get("transcript_text")
video_path = input_data.get("video_path")
highlight_goal = input_data.get("highlight_goal", "engaging")
max_clips = input_data.get("max_clips", 10)
if not transcript_text and not video_path:
return {"ok": False, "error": {"code": "MISSING_INPUT", "message": "Either transcript_text or video_path is required"}}
if video_path and not transcript_text:
if not os.path.exists(video_path):
return {"ok": False, "error": {"code": "FILE_NOT_FOUND", "message": f"Video not found: {video_path}"}}
return {"ok": False, "error": {"code": "NOT_IMPLEMENTED", "message": "Video transcription not implemented. Provide transcript_text."}}
try:
goal_guidelines = {
"engaging": "Find moments that grab attention, surprising reveals, or compelling stories",
"educational": "Find clear explanations, key insights, and teachable moments",
"funny": "Find humorous moments, jokes, and entertaining segments",
"emotional": "Find touching moments, inspiring stories, and emotional peaks",
"actionable": "Find practical tips, how-to segments, and advice"
}
# Create highlight finder agent
finder = Agent(
name="Highlight Finder",
role="Video Content Analyst",
goal=f"Find the most {highlight_goal} moments",
instructions=f"""
You are a video content analyst specializing in finding highlights.
Goal: {highlight_goal} - {goal_guidelines[highlight_goal]}
For each highlight:
- Identify start and end timestamps
- Explain why it's highlight-worthy
- Suggest a clip duration (15-60 seconds ideal)
- Rate engagement potential (1-10)
Find up to {max_clips} highlights.
""",
)
# Create title generator
titler = Agent(
name="Title Generator",
role="Social Media Specialist",
goal="Create compelling clip titles",
instructions="""
You are a social media specialist.
- Create catchy, clickable titles
- Keep titles under 60 characters
- Use power words and hooks
- Match the content tone
""",
)
# Define tasks
find_task = Task(
name="find_highlights",
description=f"""
Analyze this transcript and find up to {max_clips} {highlight_goal} highlights:
{transcript_text[:10000]}
Output as JSON array:
[{{
"start_time": "MM:SS",
"end_time": "MM:SS",
"description": "What happens",
"why_highlight": "Why it's engaging",
"engagement_score": 8
}}]
""",
expected_output="JSON array of highlight clips",
agent=finder,
)
title_task = Task(
name="generate_titles",
description="Generate compelling titles for each highlight clip",
expected_output="List of titles matching each clip",
agent=titler,
context=[find_task],
)
# Execute
agents = AgentTeam(
agents=[finder, titler],
tasks=[find_task, title_task],
)
result = agents.start()
# Parse clips
clips_text = result.get("find_highlights", "[]")
clips = parse_clips(clips_text)
# Parse titles
titles_text = result.get("generate_titles", "")
titles = [t.strip() for t in titles_text.split("\n") if t.strip() and len(t.strip()) > 5]
return {
"ok": True,
"clip_plan_json": clips[:max_clips],
"title_suggestions": titles[:max_clips],
"artifacts": [],
"warnings": [],
}
except Exception as e:
return {"ok": False, "error": {"code": "PROCESSING_ERROR", "message": str(e)}}
def parse_clips(text: str) -> list:
"""Parse clip plan from agent output."""
try:
match = re.search(r'\[.*\]', text, re.DOTALL)
if match:
return json.loads(match.group())
except json.JSONDecodeError:
pass
# Fallback parsing
clips = []
current_clip = {}
for line in text.split('\n'):
if 'start' in line.lower() and ':' in line:
time_match = re.search(r'(\d{1,2}:\d{2}(?::\d{2})?)', line)
if time_match:
current_clip['start_time'] = time_match.group(1)
elif 'end' in line.lower() and ':' in line:
time_match = re.search(r'(\d{1,2}:\d{2}(?::\d{2})?)', line)
if time_match:
current_clip['end_time'] = time_match.group(1)
elif 'description' in line.lower():
current_clip['description'] = line.split(':', 1)[-1].strip()
if current_clip.get('start_time') and current_clip.get('end_time'):
clips.append(current_clip)
current_clip = {}
return clips