Skip to main content
Multi-agent predictive maintenance: sensor monitoring → anomaly detection → failure prediction → maintenance scheduling. Includes SQLite persistence, structured alerts, and API deployment.

Setup

# Create environment
python3 -m venv venv && source venv/bin/activate

# Install packages
pip install praisonaiagents praisonai

# Set API key
export OPENAI_API_KEY="your-key"

Create Sample Data

# Create sensor readings
cat > sensors.json << 'EOF'
[
  {"equipment_id": "PUMP-001", "temperature": 95, "vibration": 1.5, "pressure": 150, "runtime_hours": 8500},
  {"equipment_id": "MOTOR-002", "temperature": 72, "vibration": 0.3, "pressure": 100, "runtime_hours": 2000},
  {"equipment_id": "COMPRESSOR-003", "temperature": 88, "vibration": 1.2, "pressure": 180, "runtime_hours": 12000}
]
EOF

Run: Python Code

from praisonaiagents import Agent, Agents, Task, tool
from pydantic import BaseModel
from typing import List
import json

# Structured output
class MaintenanceAlert(BaseModel):
    equipment_id: str
    risk_level: str  # critical, high, medium, low
    failure_probability: float
    predicted_failure_type: str
    recommended_action: str
    scheduled_date: str

# Database persistence is configured via memory={} parameter

# Tools
@tool
def read_sensor_data(equipment_id: str) -> str:
    """Read current sensor data for equipment."""
    with open("sensors.json") as f:
        sensors = json.load(f)
    for s in sensors:
        if s["equipment_id"] == equipment_id:
            return json.dumps(s)
    return json.dumps({"error": "Equipment not found"})

@tool
def check_maintenance_history(equipment_id: str) -> str:
    """Check maintenance history for equipment."""
    history = {
        "PUMP-001": {"last_maintenance": "2024-01-15", "issues": ["bearing_wear", "seal_leak"]},
        "MOTOR-002": {"last_maintenance": "2024-06-01", "issues": []},
        "COMPRESSOR-003": {"last_maintenance": "2023-09-20", "issues": ["valve_failure"]}
    }
    return json.dumps(history.get(equipment_id, {"last_maintenance": "unknown", "issues": []}))

@tool
def schedule_maintenance(equipment_id: str, priority: str, action: str) -> str:
    """Schedule maintenance for equipment."""
    import datetime
    days = {"critical": 1, "high": 3, "medium": 7, "low": 14}
    date = datetime.date.today() + datetime.timedelta(days=days.get(priority, 7))
    return json.dumps({"equipment_id": equipment_id, "scheduled_date": str(date), "action": action, "status": "scheduled"})

# Agents
monitor = Agent(
    name="SensorMonitor",
    instructions="Read and analyze sensor data. Flag abnormal readings.",
    tools=[read_sensor_data],
    memory={
        "db": "sqlite:///maintenance.db",
        "session_id": "maintenance-monitor"
    }
)

analyzer = Agent(
    name="AnomalyAnalyzer",
    instructions="""Analyze sensor data for anomalies:
    - Temperature > 90°C = critical
    - Vibration > 1.0 = warning
    - Runtime > 10000 hours = inspection needed""",
    tools=[check_maintenance_history]
)

scheduler = Agent(
    name="MaintenanceScheduler",
    instructions="Schedule maintenance based on risk level. Use schedule_maintenance tool.",
    tools=[schedule_maintenance]
)

# Tasks
monitor_task = Task(
    description="Read sensor data for equipment: {equipment_id}",
    agent=monitor,
    expected_output="Current sensor readings with status"
)

analyze_task = Task(
    description="Analyze for anomalies and predict failure risk",
    agent=analyzer,
    expected_output="Risk assessment with failure probability"
)

schedule_task = Task(
    description="Schedule maintenance if needed",
    agent=scheduler,
    expected_output="Maintenance schedule or no action needed",
    output_pydantic=MaintenanceAlert
)

# Run for each equipment
with open("sensors.json") as f:
    equipment_list = json.load(f)

for equip in equipment_list:
    agents = Agents(agents=[monitor, analyzer, scheduler], tasks=[monitor_task, analyze_task, schedule_task])
    result = agents.start(equipment_id=equip["equipment_id"])
    print(f"{equip['equipment_id']}: {result}")

Run: CLI

# Check single equipment
praisonai "Analyze PUMP-001 sensor data and predict maintenance needs" --verbose

# With persistence
praisonai "Monitor all equipment for anomalies" --memory --user-id maintenance_team

# Batch analysis
praisonai "Analyze sensors.json and schedule maintenance" --file sensors.json

Run: agents.yaml

Create agents.yaml:
framework: praisonai
topic: "predictive maintenance system"
roles:
  monitor:
    role: Sensor Monitor
    goal: Collect and validate sensor readings
    backstory: Expert at IoT sensor data analysis
    tasks:
      read_sensors:
        description: |
          Read sensor data and flag:
          - Temperature anomalies
          - Vibration patterns
          - Pressure deviations
        expected_output: Sensor status report
        
  analyzer:
    role: Failure Predictor
    goal: Predict equipment failures
    backstory: Expert at predictive analytics for industrial equipment
    tasks:
      predict_failure:
        description: |
          Analyze patterns and predict:
          - Failure type (bearing, motor, seal, etc.)
          - Probability (0-100%)
          - Time to failure (hours/days)
        expected_output: Failure prediction with confidence
        
  scheduler:
    role: Maintenance Planner
    goal: Optimize maintenance schedules
    backstory: Expert at maintenance planning and resource allocation
    tasks:
      schedule:
        description: |
          Schedule maintenance:
          - Critical: within 24 hours
          - High: within 3 days
          - Medium: within 1 week
          - Low: next scheduled window
        expected_output: Maintenance schedule with parts needed
Run:
praisonai agents.yaml --verbose

Monitor & Verify

# View maintenance history
praisonai --history 20 --user-id maintenance_team

# Check metrics
praisonai --metrics

# Export schedule
praisonai --save maintenance_schedule

Serve API

from praisonaiagents import Agent, tool
import json

@tool
def quick_health_check(temperature: float, vibration: float, runtime_hours: int) -> str:
    """Quick equipment health assessment."""
    score = 100
    issues = []
    
    if temperature > 90:
        score -= 30
        issues.append("high_temperature")
    if vibration > 1.0:
        score -= 25
        issues.append("excessive_vibration")
    if runtime_hours > 10000:
        score -= 20
        issues.append("overdue_inspection")
    
    status = "critical" if score < 50 else "warning" if score < 75 else "healthy"
    return json.dumps({"health_score": score, "status": status, "issues": issues})

agent = Agent(
    name="MaintenanceAPI",
    instructions="Assess equipment health from sensor readings.",
    tools=[quick_health_check]
)

agent.launch(path="/health-check", port=8000)
Test:
curl -X POST http://localhost:8000/health-check \
  -H "Content-Type: application/json" \
  -d '{"message": "Check health: temperature=95, vibration=1.5, runtime=8500"}'

Cleanup

rm -f maintenance.db sensors.json
deactivate

Features Demonstrated

FeatureImplementation
Multi-agentMonitor → Analyzer → Scheduler
Structured OutputPydantic MaintenanceAlert
Custom ToolsSensor reading, history check
DB PersistenceSQLite via db()
CLI--file for batch processing
YAML Config3-agent maintenance pipeline
API Endpointagent.launch()
Health Scoring0-100 with thresholds