Setup
Copy
# Create environment
python3 -m venv venv && source venv/bin/activate
# Install packages
pip install praisonaiagents praisonai
# Set API key
export OPENAI_API_KEY="your-key"
Create Sample Data
Copy
# Create incident reports
cat > incidents.json << 'EOF'
[
{"id": "INC001", "type": "fire", "location": "123 Main St", "casualties": 0, "severity": "high"},
{"id": "INC002", "type": "medical", "location": "456 Oak Ave", "casualties": 1, "severity": "critical"},
{"id": "INC003", "type": "traffic", "location": "Highway 101", "casualties": 0, "severity": "medium"}
]
EOF
# Create resource inventory
cat > resources.json << 'EOF'
{
"fire_units": [{"id": "FD01", "status": "available"}, {"id": "FD02", "status": "deployed"}],
"ambulances": [{"id": "AMB01", "status": "available"}, {"id": "AMB02", "status": "available"}],
"police": [{"id": "PD01", "status": "available"}, {"id": "PD02", "status": "available"}]
}
EOF
Run: Python Code
Copy
from praisonaiagents import Agent, Agents, Task, tool
from praisonaiagents import Agent, Agents, Task, tool, db
from pydantic import BaseModel
from typing import List
import json
# Structured output
class DispatchOrder(BaseModel):
incident_id: str
severity: str
units_dispatched: List[str]
eta_minutes: int
coordinator: str
# Database persistence
db_instance = db(database_url="sqlite:///emergency.db")
# Tools
@tool
def get_incident(incident_id: str) -> str:
"""Get incident details."""
with open("incidents.json") as f:
incidents = json.load(f)
for inc in incidents:
if inc["id"] == incident_id:
return json.dumps(inc)
return json.dumps({"error": "Incident not found"})
@tool
def check_available_resources(resource_type: str) -> str:
"""Check available resources by type."""
with open("resources.json") as f:
resources = json.load(f)
available = [r for r in resources.get(resource_type, []) if r["status"] == "available"]
return json.dumps({"type": resource_type, "available": available, "count": len(available)})
@tool
def dispatch_unit(unit_id: str, incident_id: str) -> str:
"""Dispatch a unit to an incident."""
return json.dumps({"unit_id": unit_id, "incident_id": incident_id, "status": "dispatched", "eta_minutes": 8})
# Agents
assessor = Agent(
name="IncidentAssessor",
instructions="Assess incident severity and type. Use get_incident tool.",
tools=[get_incident],
db=db_instance,
session_id="emergency-dispatch"
)
dispatcher = Agent(
name="ResourceDispatcher",
instructions="""Dispatch appropriate resources:
- critical: ambulance + fire + police
- high: fire + police
- medium: police
Use check_available_resources and dispatch_unit tools.""",
tools=[check_available_resources, dispatch_unit]
)
coordinator = Agent(
name="ResponseCoordinator",
instructions="Coordinate response and provide ETA. Generate structured dispatch order."
)
# Tasks
assess_task = Task(
description="Assess incident: {incident_id}",
agent=assessor,
expected_output="Incident details with severity"
)
dispatch_task = Task(
description="Dispatch appropriate resources based on severity",
agent=dispatcher,
expected_output="List of dispatched units"
)
coordinate_task = Task(
description="Coordinate response and generate dispatch order",
agent=coordinator,
expected_output="Structured dispatch order",
output_pydantic=DispatchOrder
)
# Run
agents = Agents(agents=[assessor, dispatcher, coordinator], tasks=[assess_task, dispatch_task, coordinate_task])
result = agents.start(incident_id="INC002")
print(result)
Run: CLI
Copy
# Single incident
praisonai "Respond to fire emergency at 123 Main St" --verbose
# With persistence
praisonai "Medical emergency with casualties" --memory --user-id dispatch_center
# Interactive dispatch mode
praisonai --chat-mode --memory --user-id emergency_ops
Run: agents.yaml
Createagents.yaml:
Copy
framework: praisonai
topic: "emergency response coordination"
roles:
assessor:
role: Incident Assessor
goal: Evaluate incident severity
backstory: Expert at emergency triage
tasks:
assess:
description: |
Assess incident:
- Type (fire, medical, traffic, crime)
- Severity (critical, high, medium, low)
- Casualties
- Location
expected_output: Incident assessment with severity
dispatcher:
role: Resource Dispatcher
goal: Deploy appropriate resources
backstory: Expert at resource allocation
tasks:
dispatch:
description: |
Dispatch based on severity:
- Critical: all units
- High: 2+ units
- Medium: 1 unit
- Low: advisory only
expected_output: Dispatch orders with ETAs
coordinator:
role: Response Coordinator
goal: Coordinate multi-agency response
backstory: Expert at emergency coordination
tasks:
coordinate:
description: |
Coordinate response:
- Assign incident commander
- Set up communication
- Track unit status
expected_output: Coordination plan
Copy
praisonai agents.yaml --verbose
Monitor & Verify
Copy
# View dispatch history
praisonai --history 20 --user-id dispatch_center
# Check metrics
praisonai --metrics
# Export incident log
praisonai --save incident_log
Serve API
Copy
from praisonaiagents import Agent, tool
import json
@tool
def quick_dispatch(incident_type: str, severity: str) -> str:
"""Quick dispatch recommendation."""
dispatch_matrix = {
("fire", "critical"): ["FD01", "FD02", "AMB01", "PD01"],
("fire", "high"): ["FD01", "PD01"],
("medical", "critical"): ["AMB01", "AMB02"],
("medical", "high"): ["AMB01"],
("traffic", "medium"): ["PD01"]
}
units = dispatch_matrix.get((incident_type, severity), ["PD01"])
return json.dumps({"units": units, "eta_minutes": 8})
agent = Agent(
name="EmergencyAPI",
instructions="Provide dispatch recommendations for emergencies.",
tools=[quick_dispatch]
)
agent.launch(path="/dispatch", port=8000)
Copy
curl -X POST http://localhost:8000/dispatch \
-H "Content-Type: application/json" \
-d '{"message": "Fire emergency, severity critical"}'
Cleanup
Copy
rm -f emergency.db incidents.json resources.json
deactivate
Features Demonstrated
| Feature | Implementation |
|---|---|
| Multi-agent | Assessor → Dispatcher → Coordinator |
| Structured Output | Pydantic DispatchOrder |
| Resource Management | Unit availability checking |
| DB Persistence | SQLite via db() |
| CLI | --chat-mode for interactive |
| YAML Config | 3-agent response pipeline |
| API Endpoint | agent.launch() |
| Severity Routing | Priority-based dispatch |

