Skip to main content
Multi-agent emergency response: incident assessment → severity routing → resource dispatch → response monitoring. Includes SQLite incident logging, structured alerts, and API deployment.

Setup

# Create environment
python3 -m venv venv && source venv/bin/activate

# Install packages
pip install praisonaiagents praisonai

# Set API key
export OPENAI_API_KEY="your-key"

Create Sample Data

# Create incident reports
cat > incidents.json << 'EOF'
[
  {"id": "INC001", "type": "fire", "location": "123 Main St", "casualties": 0, "severity": "high"},
  {"id": "INC002", "type": "medical", "location": "456 Oak Ave", "casualties": 1, "severity": "critical"},
  {"id": "INC003", "type": "traffic", "location": "Highway 101", "casualties": 0, "severity": "medium"}
]
EOF

# Create resource inventory
cat > resources.json << 'EOF'
{
  "fire_units": [{"id": "FD01", "status": "available"}, {"id": "FD02", "status": "deployed"}],
  "ambulances": [{"id": "AMB01", "status": "available"}, {"id": "AMB02", "status": "available"}],
  "police": [{"id": "PD01", "status": "available"}, {"id": "PD02", "status": "available"}]
}
EOF

Run: Python Code

from praisonaiagents import Agent, Agents, Task, tool
from praisonaiagents import Agent, Agents, Task, tool, db
from pydantic import BaseModel
from typing import List
import json

# Structured output
class DispatchOrder(BaseModel):
    # Schema for the final structured answer; handed to coordinate_task
    # below via output_pydantic=DispatchOrder.
    incident_id: str  # ids in the sample incidents.json look like "INC002"
    severity: str  # plain string; sample data uses "critical"/"high"/"medium"
    units_dispatched: List[str]  # presumably unit ids such as "AMB01" — confirm
    eta_minutes: int  # dispatch_unit reports eta_minutes as an integer
    coordinator: str  # NOTE(review): meaning is not defined anywhere in this file

# Database persistence
# Backing store is the SQLite file emergency.db (the Cleanup step removes it
# from the working directory). Only the assessor agent below receives it.
db_instance = db(database_url="sqlite:///emergency.db")

# Tools
@tool
def get_incident(incident_id: str) -> str:
    """Get incident details.

    Args:
        incident_id: Identifier of the incident to look up, e.g. "INC002".

    Returns:
        The matching incident record serialized as a JSON object, or
        {"error": "Incident not found"} when no record has that id.
    """
    # JSON is UTF-8 by specification; pin the encoding so the lookup does
    # not depend on the platform's default locale.
    with open("incidents.json", encoding="utf-8") as f:
        incidents = json.load(f)
    # .get() lets a record without an "id" key be skipped instead of
    # aborting the whole lookup with KeyError.
    match = next((inc for inc in incidents if inc.get("id") == incident_id), None)
    if match is None:
        return json.dumps({"error": "Incident not found"})
    return json.dumps(match)

@tool
def check_available_resources(resource_type: str) -> str:
    """Check available resources by type.

    Args:
        resource_type: A top-level key of resources.json. With the sample
            inventory these are "fire_units", "ambulances" and "police".

    Returns:
        A JSON object with "type", "available" (units whose status is
        "available") and "count". When resource_type is not a key of the
        inventory, the object additionally carries "error" and
        "valid_types" so the caller can tell a wrong type name apart from
        a type that simply has no free units.
    """
    with open("resources.json", encoding="utf-8") as f:
        resources = json.load(f)
    units = resources.get(resource_type, [])
    # .get() so a unit entry without a "status" key counts as unavailable
    # rather than raising KeyError.
    available = [unit for unit in units if unit.get("status") == "available"]
    report = {"type": resource_type, "available": available, "count": len(available)}
    if resource_type not in resources:
        # Same three keys as before, plus a hint on how to correct the call.
        report["error"] = f"Unknown resource type: {resource_type!r}"
        report["valid_types"] = sorted(resources)
    return json.dumps(report)

@tool
def dispatch_unit(unit_id: str, incident_id: str) -> str:
    """Dispatch a unit to an incident."""
    # Stub: nothing is persisted and resources.json is not modified; the
    # confirmation always carries the same fixed ETA.
    confirmation = dict(
        unit_id=unit_id,
        incident_id=incident_id,
        status="dispatched",
        eta_minutes=8,
    )
    return json.dumps(confirmation)

# Agents
# Looks up the incident via get_incident. This is the only agent here that
# is given db/session_id; the other two are constructed without them.
assessor = Agent(
    name="IncidentAssessor",
    instructions="Assess incident severity and type. Use get_incident tool.",
    tools=[get_incident],
    db=db_instance,  # the sqlite:///emergency.db handle created above
    session_id="emergency-dispatch"
)

# The exact resource_type values are spelled out because
# check_available_resources looks the type up verbatim as a key of
# resources.json; a loose name such as "fire" or "ambulance" matches no key
# and is reported as zero available units.
dispatcher = Agent(
    name="ResourceDispatcher",
    instructions="""Dispatch appropriate resources:
    - critical: ambulance + fire + police
    - high: fire + police
    - medium: police
    Use check_available_resources and dispatch_unit tools.
    When calling check_available_resources, resource_type must be exactly one of:
    "ambulances" (ambulance), "fire_units" (fire), "police" (police).
    Only dispatch units that are reported as available.""",
    tools=[check_available_resources, dispatch_unit]
)

# Constructed without tools; its task (coordinate_task below) requests the
# result in the DispatchOrder schema.
coordinator = Agent(
    name="ResponseCoordinator",
    instructions="Coordinate response and provide ETA. Generate structured dispatch order."
)

# Tasks
# "{incident_id}" is a placeholder; the run step calls
# agents.start(incident_id="INC002") — presumably the framework fills it in
# from that keyword argument (TODO confirm against the praisonaiagents docs).
assess_task = Task(
    description="Assess incident: {incident_id}",
    agent=assessor,
    expected_output="Incident details with severity"
)

# No explicit link to assess_task is declared here.
# NOTE(review): presumably relies on the framework passing earlier task
# output forward — confirm.
dispatch_task = Task(
    description="Dispatch appropriate resources based on severity",
    agent=dispatcher,
    expected_output="List of dispatched units"
)

# The only task with output_pydantic: its result is requested in the
# DispatchOrder schema defined above.
coordinate_task = Task(
    description="Coordinate response and generate dispatch order",
    agent=coordinator,
    expected_output="Structured dispatch order",
    output_pydantic=DispatchOrder
)

# Run
# Agents and tasks are listed in the order assess -> dispatch -> coordinate
# (presumably also the execution order — confirm the framework's default).
agents = Agents(agents=[assessor, dispatcher, coordinator], tasks=[assess_task, dispatch_task, coordinate_task])
# INC002 is the "critical" medical incident in the sample incidents.json.
result = agents.start(incident_id="INC002")
print(result)

Run: CLI

# Single incident
praisonai "Respond to fire emergency at 123 Main St" --verbose

# With persistence
praisonai "Medical emergency with casualties" --memory --user-id dispatch_center

# Interactive dispatch mode
praisonai --chat-mode --memory --user-id emergency_ops

Run: agents.yaml

Create agents.yaml:
framework: praisonai
topic: "emergency response coordination"
roles:
  assessor:
    role: Incident Assessor
    goal: Evaluate incident severity
    backstory: Expert at emergency triage
    tasks:
      assess:
        description: |
          Assess incident:
          - Type (fire, medical, traffic, crime)
          - Severity (critical, high, medium, low)
          - Casualties
          - Location
        expected_output: Incident assessment with severity
        
  dispatcher:
    role: Resource Dispatcher
    goal: Deploy appropriate resources
    backstory: Expert at resource allocation
    tasks:
      dispatch:
        description: |
          Dispatch based on severity:
          - Critical: all units
          - High: 2+ units
          - Medium: 1 unit
          - Low: advisory only
        expected_output: Dispatch orders with ETAs
        
  coordinator:
    role: Response Coordinator
    goal: Coordinate multi-agency response
    backstory: Expert at emergency coordination
    tasks:
      coordinate:
        description: |
          Coordinate response:
          - Assign incident commander
          - Set up communication
          - Track unit status
        expected_output: Coordination plan
Run:
praisonai agents.yaml --verbose

Monitor & Verify

# View dispatch history
praisonai --history 20 --user-id dispatch_center

# Check metrics
praisonai --metrics

# Export incident log
praisonai --save incident_log

Serve API

from praisonaiagents import Agent, tool
import json

@tool
def quick_dispatch(incident_type: str, severity: str) -> str:
    """Quick dispatch recommendation.

    Args:
        incident_type: Kind of incident, e.g. "fire", "medical" or "traffic".
            Matched case-insensitively, ignoring surrounding whitespace.
        severity: Severity level, e.g. "critical", "high" or "medium".
            Matched case-insensitively, ignoring surrounding whitespace.

    Returns:
        A JSON object with "units" (list of unit ids) and "eta_minutes".
        Combinations that are not in the matrix fall back to ["PD01"].
    """
    dispatch_matrix = {
        ("fire", "critical"): ["FD01", "FD02", "AMB01", "PD01"],
        ("fire", "high"): ["FD01", "PD01"],
        ("medical", "critical"): ["AMB01", "AMB02"],
        ("medical", "high"): ["AMB01"],
        ("traffic", "medium"): ["PD01"]
    }
    # Callers may pass "Fire" or " critical "; normalize so such inputs hit
    # the intended row instead of silently taking the police-only fallback.
    key = (str(incident_type).strip().lower(), str(severity).strip().lower())
    units = dispatch_matrix.get(key, ["PD01"])
    return json.dumps({"units": units, "eta_minutes": 8})

# Single agent whose only tool is the quick_dispatch lookup above.
agent = Agent(
    name="EmergencyAPI",
    instructions="Provide dispatch recommendations for emergencies.",
    tools=[quick_dispatch]
)

# Exposes the agent at /dispatch on port 8000; the curl example that follows
# POSTs a JSON body with a "message" field to http://localhost:8000/dispatch.
agent.launch(path="/dispatch", port=8000)
Test:
curl -X POST http://localhost:8000/dispatch \
  -H "Content-Type: application/json" \
  -d '{"message": "Fire emergency, severity critical"}'

Cleanup

rm -f emergency.db incidents.json resources.json
deactivate

Features Demonstrated

| Feature | Implementation |
| --- | --- |
| Multi-agent | Assessor → Dispatcher → Coordinator |
| Structured Output | Pydantic DispatchOrder |
| Resource Management | Unit availability checking |
| DB Persistence | SQLite via db() |
| CLI | --chat-mode for interactive use |
| YAML Config | 3-agent response pipeline |
| API Endpoint | agent.launch() |
| Severity Routing | Priority-based dispatch |