Skip to main content
Multi-agent security scanning: code analysis → vulnerability detection → severity scoring → remediation planning. Includes SQLite findings database, structured output, and API deployment.

Setup

# Create environment
# (isolated virtualenv in ./venv; `source venv/bin/activate` is the POSIX shell form)
python3 -m venv venv && source venv/bin/activate

# Install packages
pip install praisonaiagents praisonai

# Set API key
# (replace the placeholder value with a real key before running the examples)
export OPENAI_API_KEY="your-key"

Create Sample Data

# Create vulnerability patterns
# Each "pattern" is a Python regex that scan_file applies line by line,
# case-insensitively. The quoted 'EOF' keeps the shell from touching the
# backslashes, so "\\" below reaches the JSON parser unchanged.
#  - SQL_INJECTION: a SQL verb followed by a {...} interpolation on the same line
#    (no longer depends on the word "SQL" appearing after the interpolation).
#  - XSS: any assignment to innerHTML (the right-hand side need not contain "user").
cat > patterns.json << 'EOF'
{
  "SQL_INJECTION": {"cwe": "CWE-89", "severity": "critical", "cvss_base": 9.8, "pattern": "(SELECT|INSERT|UPDATE|DELETE)\\b.*\\{.*\\}"},
  "XSS": {"cwe": "CWE-79", "severity": "high", "cvss_base": 7.5, "pattern": "innerHTML\\s*="},
  "PATH_TRAVERSAL": {"cwe": "CWE-22", "severity": "high", "cvss_base": 7.0, "pattern": "\\.\\./"},
  "HARDCODED_SECRET": {"cwe": "CWE-798", "severity": "medium", "cvss_base": 5.5, "pattern": "(password|secret|key)\\s*=\\s*['\"]"}
}
EOF

# Create sample code to scan
# The file is intentionally vulnerable. The quoted 'EOF' delimiter disables shell
# expansion, so ${user_id} is written literally; everything up to EOF (including
# the trailing "# ..." remarks) becomes part of the text that gets scanned.
cat > sample_code.py << 'EOF'
def get_user(user_id):
    query = f"SELECT * FROM users WHERE id = ${user_id}"  # SQL Injection
    return db.execute(query)

def render_comment(comment):
    element.innerHTML = comment  # XSS vulnerability

API_KEY = "sk-secret123"  # Hardcoded secret
EOF

Run: Python Code

from praisonaiagents import Agent, Agents, Task, tool
from pydantic import BaseModel
from typing import List
import json
import re

# Structured output
class VulnerabilityReport(BaseModel):
    # Schema of the final structured report; it is handed to the remediation
    # task through output_pydantic.
    # NOTE(review): the field meanings below are read off the names — confirm.
    file_path: str  # presumably the path of the scanned file
    total_findings: int  # presumably the number of entries in `findings`
    critical_count: int  # per-severity tallies
    high_count: int
    medium_count: int
    findings: List[dict]  # free-form dicts; no per-finding schema is enforced here
    remediation_priority: List[str]  # presumably ordered, most urgent first — confirm

# Database persistence is configured via memory={} parameter

# Tools
@tool
def scan_file(file_path: str) -> str:
    """Scan file for vulnerability patterns.

    Every regex found in ``patterns.json`` (read from the current working
    directory) is searched case-insensitively in each line of the file.

    Args:
        file_path: Path of the source file to scan.

    Returns:
        A JSON array encoded as a string. Each finding carries the keys
        ``type``, ``cwe``, ``severity``, ``cvss``, ``line`` (1-based) and
        ``code`` (the stripped line, cut to 50 characters). Findings are
        grouped by pattern, then ordered by line number.

    Raises:
        OSError: If ``patterns.json`` or ``file_path`` cannot be opened.
        json.JSONDecodeError: If ``patterns.json`` is not valid JSON.
        re.error: If a configured pattern is not a valid regular expression.
    """
    # Explicit UTF-8 keeps results independent of the platform's default encoding.
    with open("patterns.json", encoding="utf-8") as f:
        patterns = json.load(f)

    # errors="replace": one undecodable byte must not abort the whole scan.
    with open(file_path, encoding="utf-8", errors="replace") as f:
        lines = f.read().split('\n')

    findings = []
    for vuln_type, vuln_info in patterns.items():
        # Compile once per pattern rather than resolving it again for every line.
        matcher = re.compile(vuln_info["pattern"], re.IGNORECASE)
        for line_no, line in enumerate(lines, 1):
            if matcher.search(line):
                findings.append({
                    "type": vuln_type,
                    "cwe": vuln_info["cwe"],
                    "severity": vuln_info["severity"],
                    "cvss": vuln_info["cvss_base"],
                    "line": line_no,
                    "code": line.strip()[:50],
                })

    return json.dumps(findings)

@tool
def calculate_cvss(base_score: float, exploitability: str, impact: str) -> str:
    """Calculate CVSS score with environmental factors.

    The base score is scaled by an exploitability modifier and an impact
    modifier, bounded to the 0.0-10.0 range, rounded to one decimal and then
    mapped to a qualitative rating.

    Args:
        base_score: Base score of the finding.
        exploitability: "easy", "medium" or "hard" (case and surrounding
            whitespace are ignored); anything else is treated as "medium".
        impact: "high", "medium" or "low" (case and surrounding whitespace
            are ignored); anything else is treated as "medium".

    Returns:
        A JSON object encoded as a string with the keys ``final_score``
        (one decimal) and ``rating`` ("Critical", "High", "Medium" or "Low").
    """
    exploit_mods = {"easy": 1.0, "medium": 0.9, "hard": 0.7}
    impact_mods = {"high": 1.0, "medium": 0.8, "low": 0.5}

    # Normalise the labels so "HIGH" or " Easy " do not silently fall back
    # to the default modifier.
    exploit_mod = exploit_mods.get(str(exploitability).strip().lower(), 0.9)
    impact_mod = impact_mods.get(str(impact).strip().lower(), 0.8)

    # Bound to the CVSS scale, then round BEFORE rating so the reported
    # number and the reported rating can never disagree (e.g. 9.0 / "High").
    raw_score = base_score * exploit_mod * impact_mod
    final_score = round(min(max(raw_score, 0.0), 10.0), 1)

    if final_score >= 9.0:
        rating = "Critical"
    elif final_score >= 7.0:
        rating = "High"
    elif final_score >= 4.0:
        rating = "Medium"
    else:
        rating = "Low"

    return json.dumps({"final_score": final_score, "rating": rating})

@tool
def get_remediation(vuln_type: str) -> str:
    """Get remediation guidance for vulnerability type.

    Args:
        vuln_type: Vulnerability identifier, matched exactly (for example
            "SQL_INJECTION").

    Returns:
        A JSON object encoded as a string with the keys ``vulnerability``
        (the identifier as given) and ``remediation`` (the matching advice,
        or a generic hint when the identifier is not known).
    """
    guidance_by_type = {
        "SQL_INJECTION": "Use parameterized queries or prepared statements",
        "XSS": "Sanitize user input and use Content-Security-Policy headers",
        "PATH_TRAVERSAL": "Validate and sanitize file paths, use allowlists",
        "HARDCODED_SECRET": "Use environment variables or secret management service",
    }

    try:
        advice = guidance_by_type[vuln_type]
    except KeyError:
        # Unknown identifiers get the generic fallback instead of an error.
        advice = "Review security best practices"

    payload = {"vulnerability": vuln_type, "remediation": advice}
    return json.dumps(payload)

# Agents
# Three single-purpose agents; each one is given exactly one of the tools above
# and is told in its instructions which tool to use.
scanner = Agent(
    name="VulnerabilityScanner",
    instructions="Scan code for vulnerabilities. Use scan_file tool.",
    tools=[scan_file],
    # Persistence settings: a SQLite URL plus a fixed session id.
    # NOTE(review): how the library interprets these keys is not visible here — confirm.
    memory={
        "db": "sqlite:///security_scan.db",
        "session_id": "security-scan"
    }
)

# Only the scanner is configured with memory; the next two agents are not.
analyzer = Agent(
    name="SeverityAnalyzer",
    instructions="Analyze severity. Use calculate_cvss tool.",
    tools=[calculate_cvss]
)

advisor = Agent(
    name="RemediationAdvisor",
    instructions="Provide remediation guidance. Use get_remediation tool.",
    tools=[get_remediation]
)

# Tasks
# One task per agent, declared in pipeline order: scan -> analyze -> remediate.
scan_task = Task(
    # {file_path} is a placeholder; presumably it is filled from the keyword
    # argument given to agents.start(...) — confirm against the library.
    description="Scan file: {file_path}",
    agent=scanner,
    expected_output="List of vulnerabilities found"
)

analyze_task = Task(
    description="Analyze severity of each finding",
    agent=analyzer,
    expected_output="CVSS scores and ratings"
)

remediate_task = Task(
    description="Generate remediation report",
    agent=advisor,
    expected_output="Vulnerability report with remediation",
    # Request the final answer in the VulnerabilityReport schema.
    output_pydantic=VulnerabilityReport
)

# Run
# Agents and tasks are passed in matching order; the result of the run is printed as-is.
agents = Agents(agents=[scanner, analyzer, advisor], tasks=[scan_task, analyze_task, remediate_task])
# file_path names the sample file written during "Create Sample Data".
result = agents.start(file_path="sample_code.py")
print(result)

Run: CLI

# Scan single file (verbose output)
praisonai "Scan sample_code.py for security vulnerabilities" --verbose

# With persistence (memory enabled, runs tagged with a user id)
praisonai "Security audit of Python codebase" --memory --user-id security_team

# SQL injection / XSS search with the --telemetry flag
praisonai "Find all SQL injection and XSS vulnerabilities" --telemetry

Run: agents.yaml

Create agents.yaml:
# Three roles (scanner, analyzer, advisor), each with a single task.
framework: praisonai
topic: "vulnerability detection"
roles:
  # Role 1: finds the four vulnerability classes listed in its task description.
  scanner:
    role: Security Scanner
    goal: Detect vulnerabilities
    backstory: Expert at static code analysis
    tasks:
      scan:
        description: |
          Scan for:
          - SQL Injection (CWE-89)
          - XSS (CWE-79)
          - Path Traversal (CWE-22)
          - Hardcoded Secrets (CWE-798)
        expected_output: Vulnerability findings
        
  # Role 2: assigns severity ratings.
  analyzer:
    role: Severity Analyst
    goal: Score vulnerabilities
    backstory: Expert at CVSS scoring
    tasks:
      analyze:
        description: |
          Calculate CVSS:
          - Base score
          - Exploitability
          - Impact assessment
        expected_output: Severity ratings
        
  # Role 3: turns the findings into a remediation plan.
  advisor:
    role: Remediation Advisor
    goal: Provide fix guidance
    backstory: Expert at secure coding
    tasks:
      advise:
        description: |
          Recommend fixes:
          - Code changes
          - Security controls
          - Priority order
        expected_output: Remediation plan
Run:
praisonai agents.yaml --verbose

Monitor & Verify

# View scan history (same --user-id as the persistence example)
praisonai --history 10 --user-id security_team

# Check metrics
praisonai --metrics

# Export report (the argument is the name to save under)
praisonai --save security_report

Serve API

from praisonaiagents import Agent, tool
import json
import re

@tool
def quick_vuln_check(code_snippet: str) -> str:
    """Quick vulnerability check on code snippet.

    Runs three regex checks over the whole snippet and reports each check
    that matches at most once.

    Args:
        code_snippet: Source text to inspect.

    Returns:
        A JSON object encoded as a string with the keys ``findings`` (list of
        ``{"type", "severity"}`` objects), ``total`` (number of findings) and
        ``safe`` (true when nothing matched).
    """
    # (finding type, severity, regex, regex flags), evaluated in this order.
    checks = (
        # f-string containing SELECT followed by an interpolation
        ("SQL_INJECTION", "critical", r'f["\'].*SELECT.*\{', 0),
        # direct assignment to innerHTML
        ("XSS", "high", r'innerHTML\s*=', 0),
        # password/secret/key assigned a non-empty quoted literal (case-insensitive)
        ("HARDCODED_SECRET", "medium", r'(password|secret|key)\s*=\s*["\'][^"\']+["\']', re.I),
    )

    hits = [
        {"type": kind, "severity": level}
        for kind, level, pattern, flags in checks
        if re.search(pattern, code_snippet, flags)
    ]

    report = {"findings": hits, "total": len(hits), "safe": not hits}
    return json.dumps(report)

# Single agent that wraps the quick_vuln_check tool.
agent = Agent(
    name="SecurityAPI",
    instructions="Check code for security vulnerabilities.",
    tools=[quick_vuln_check]
)

# Launch the agent with path /vuln-check and port 8000 (the address the curl test uses).
agent.launch(path="/vuln-check", port=8000)
Test:
# POST a JSON body with a "message" field; the single quotes keep the \" escapes intact for the JSON parser.
curl -X POST http://localhost:8000/vuln-check \
  -H "Content-Type: application/json" \
  -d '{"message": "Check this code: query = f\"SELECT * FROM users WHERE id = {user_id}\""}'

Cleanup

# Remove every file the walkthrough created (agents.yaml included)
rm -f security_scan.db patterns.json sample_code.py agents.yaml
# Leave the virtual environment; delete the venv/ directory too if it is no longer needed
deactivate

Features Demonstrated

| Feature | Implementation |
|---|---|
| Multi-agent | Scanner → Analyzer → Advisor |
| Structured Output | Pydantic `VulnerabilityReport` |
| Pattern Matching | Regex-based detection |
| CVSS Scoring | Base + environmental factors |
| DB Persistence | SQLite via the agent's `memory={"db": ...}` parameter |
| CLI | `--telemetry` for tracking |
| YAML Config | 3-agent security pipeline |
| API Endpoint | `agent.launch()` |