Setup
Copy
# Create and activate an isolated virtual environment
python3 -m venv venv && source venv/bin/activate
# Install the packages used below: praisonaiagents (Python API) and praisonai (CLI)
pip install praisonaiagents praisonai
# Set API key (replace the placeholder with a real key)
export OPENAI_API_KEY="your-key"
Create Sample Data
Copy
# Create vulnerability patterns.
# Each "pattern" is a regular expression that scan_file applies to every line,
# case-insensitively. The quoted 'EOF' keeps the shell from expanding $ or \.
# - SQL_INJECTION: a SQL verb followed by a {...} or ${...} interpolation on the same line.
# - XSS: any assignment to innerHTML (same check as quick_vuln_check).
cat > patterns.json << 'EOF'
{
  "SQL_INJECTION": {"cwe": "CWE-89", "severity": "critical", "cvss_base": 9.8, "pattern": "\\b(SELECT|INSERT|UPDATE|DELETE)\\b.*\\$?\\{.*\\}"},
  "XSS": {"cwe": "CWE-79", "severity": "high", "cvss_base": 7.5, "pattern": "innerHTML\\s*="},
  "PATH_TRAVERSAL": {"cwe": "CWE-22", "severity": "high", "cvss_base": 7.0, "pattern": "\\.\\./"},
  "HARDCODED_SECRET": {"cwe": "CWE-798", "severity": "medium", "cvss_base": 5.5, "pattern": "(password|secret|key)\\s*=\\s*['\"]"}
}
EOF
# Create sample code to scan (deliberately vulnerable; line numbers are kept
# stable so reported findings still point at the same lines)
cat > sample_code.py << 'EOF'
def get_user(user_id):
    query = f"SELECT * FROM users WHERE id = ${user_id}" # SQL Injection
    return db.execute(query)
def render_comment(comment):
    element.innerHTML = comment # XSS vulnerability
API_KEY = "sk-secret123" # Hardcoded secret
EOF
Run: Python Code
Copy
from praisonaiagents import Agent, Agents, Task, tool
from pydantic import BaseModel
from typing import List
import json
import re
# Structured output
class VulnerabilityReport(BaseModel):
    """Structured output schema for the final report.

    Passed as ``output_pydantic`` to the remediation task so the last agent's
    answer is returned in this shape.
    """

    # File the report refers to.
    file_path: str
    # Overall number of findings, followed by per-severity counts.
    total_findings: int
    critical_count: int
    high_count: int
    medium_count: int
    # One dict per finding; the dict layout is not constrained by this model.
    findings: List[dict]
    # Remediation items as plain strings (presumably most urgent first).
    remediation_priority: List[str]
# Database persistence is configured via memory={} parameter
# Tools
@tool
def scan_file(file_path: str) -> str:
    """Scan a text file for known vulnerability patterns.

    Loads the pattern catalogue from ``patterns.json`` in the current working
    directory and tests every line of ``file_path`` against each pattern,
    case-insensitively.

    Args:
        file_path: Path of the file to scan.

    Returns:
        A JSON array (as a string) with one object per match, holding the
        keys ``type``, ``cwe``, ``severity``, ``cvss``, ``line`` (1-based) and
        ``code`` (the stripped line, truncated to 50 characters). Matches are
        grouped by pattern, in catalogue order.

    Raises:
        OSError: If ``patterns.json`` or ``file_path`` cannot be opened.
        json.JSONDecodeError: If ``patterns.json`` is not valid JSON.
        re.error: If a catalogue entry holds an invalid regular expression.
    """
    # Read as UTF-8 explicitly so results do not depend on the platform's
    # default locale encoding.
    with open("patterns.json", encoding="utf-8") as f:
        patterns = json.load(f)
    # Undecodable bytes in the scanned file are replaced rather than aborting
    # the scan: the target is arbitrary source code, not trusted input.
    with open(file_path, encoding="utf-8", errors="replace") as f:
        lines = f.read().split("\n")

    findings = []
    for vuln_type, vuln_info in patterns.items():
        # Compile once per pattern instead of looking it up for every line.
        matcher = re.compile(vuln_info["pattern"], re.IGNORECASE)
        for line_no, line in enumerate(lines, 1):
            if matcher.search(line):
                findings.append({
                    "type": vuln_type,
                    "cwe": vuln_info["cwe"],
                    "severity": vuln_info["severity"],
                    "cvss": vuln_info["cvss_base"],
                    "line": line_no,
                    # Keep the snippet short so the tool output stays compact.
                    "code": line.strip()[:50],
                })
    return json.dumps(findings)
@tool
def calculate_cvss(base_score: float, exploitability: str, impact: str) -> str:
    """Adjust a CVSS base score by exploitability and impact and rate it.

    Args:
        base_score: Base score; the adjusted result is clamped to 0.0-10.0.
        exploitability: "easy", "medium" or "hard" (case and surrounding
            whitespace are ignored). Unknown values use the "medium" weight.
        impact: "high", "medium" or "low" (case and surrounding whitespace
            are ignored). Unknown values use the "medium" weight.

    Returns:
        A JSON object (as a string) with ``final_score`` (rounded to one
        decimal) and ``rating`` ("Critical", "High", "Medium" or "Low").
    """
    exploit_weights = {"easy": 1.0, "medium": 0.9, "hard": 0.7}
    impact_weights = {"high": 1.0, "medium": 0.8, "low": 0.5}

    # Normalise the qualifiers so "Easy" or " HIGH " are not silently treated
    # as unknown values.
    exploit_mod = exploit_weights.get(str(exploitability).strip().lower(), 0.9)
    impact_mod = impact_weights.get(str(impact).strip().lower(), 0.8)

    raw_score = float(base_score) * exploit_mod * impact_mod
    # Clamp to the 0-10 range, then round BEFORE rating so the reported score
    # and the rating can never disagree (e.g. 8.96 -> 9.0 must be "Critical").
    final_score = round(min(10.0, max(0.0, raw_score)), 1)

    if final_score >= 9.0:
        rating = "Critical"
    elif final_score >= 7.0:
        rating = "High"
    elif final_score >= 4.0:
        rating = "Medium"
    else:
        rating = "Low"
    return json.dumps({"final_score": final_score, "rating": rating})
@tool
def get_remediation(vuln_type: str) -> str:
    """Return remediation guidance for a vulnerability type.

    Args:
        vuln_type: Vulnerability identifier such as "SQL_INJECTION". Case,
            surrounding whitespace, and spaces or hyphens used in place of
            underscores are tolerated ("sql injection", "path-traversal").

    Returns:
        A JSON object (as a string) with ``vulnerability`` (the value exactly
        as passed in) and ``remediation``. Unrecognised types get the generic
        advice "Review security best practices".
    """
    remediations = {
        "SQL_INJECTION": "Use parameterized queries or prepared statements",
        "XSS": "Sanitize user input and use Content-Security-Policy headers",
        "PATH_TRAVERSAL": "Validate and sanitize file paths, use allowlists",
        "HARDCODED_SECRET": "Use environment variables or secret management service",
    }
    # Normalise only the lookup key; the caller's spelling is echoed back.
    lookup_key = str(vuln_type).strip().upper().replace(" ", "_").replace("-", "_")
    advice = remediations.get(lookup_key, "Review security best practices")
    return json.dumps({"vulnerability": vuln_type, "remediation": advice})
# Agents
# Scanner: has the scan_file tool; its memory settings point at a SQLite
# database file and a fixed session id.
scanner = Agent(
    name="VulnerabilityScanner",
    instructions="Scan code for vulnerabilities. Use scan_file tool.",
    tools=[scan_file],
    memory={
        "db": "sqlite:///security_scan.db",
        "session_id": "security-scan",
    },
)

# Analyzer: has the calculate_cvss tool for scoring findings.
analyzer = Agent(
    name="SeverityAnalyzer",
    instructions="Analyze severity. Use calculate_cvss tool.",
    tools=[calculate_cvss],
)

# Advisor: has the get_remediation tool for fix guidance.
advisor = Agent(
    name="RemediationAdvisor",
    instructions="Provide remediation guidance. Use get_remediation tool.",
    tools=[get_remediation],
)
# Tasks
# One task per agent: scan, then score, then report.
scan_task = Task(
    description="Scan file: {file_path}",
    agent=scanner,
    expected_output="List of vulnerabilities found",
)

analyze_task = Task(
    description="Analyze severity of each finding",
    agent=analyzer,
    expected_output="CVSS scores and ratings",
)

# The final task asks for its output in the VulnerabilityReport schema.
remediate_task = Task(
    description="Generate remediation report",
    agent=advisor,
    expected_output="Vulnerability report with remediation",
    output_pydantic=VulnerabilityReport,
)
# Run
# Wire the three agents to their tasks and run the pipeline.
agents = Agents(
    agents=[scanner, analyzer, advisor],
    tasks=[scan_task, analyze_task, remediate_task],
)
# NOTE(review): file_path is presumably substituted into the scan task's
# "{file_path}" placeholder — confirm against the praisonaiagents docs.
result = agents.start(file_path="sample_code.py")
print(result)
Run: CLI
Copy
# Scan single file
praisonai "Scan sample_code.py for security vulnerabilities" --verbose
# With persistence (memory enabled, scoped to a user ID)
praisonai "Security audit of Python codebase" --memory --user-id security_team
# Look for SQL injection and XSS with telemetry enabled
praisonai "Find all SQL injection and XSS vulnerabilities" --telemetry
Run: agents.yaml
Create agents.yaml:
Copy
# Three-role security pipeline: scanner -> analyzer -> advisor.
# Nesting matters: each role sits under `roles`, each task under that role's
# `tasks`, and every `description: |` block must be indented below its key.
framework: praisonai
topic: "vulnerability detection"
roles:
  scanner:
    role: Security Scanner
    goal: Detect vulnerabilities
    backstory: Expert at static code analysis
    tasks:
      scan:
        description: |
          Scan for:
          - SQL Injection (CWE-89)
          - XSS (CWE-79)
          - Path Traversal (CWE-22)
          - Hardcoded Secrets (CWE-798)
        expected_output: Vulnerability findings
  analyzer:
    role: Severity Analyst
    goal: Score vulnerabilities
    backstory: Expert at CVSS scoring
    tasks:
      analyze:
        description: |
          Calculate CVSS:
          - Base score
          - Exploitability
          - Impact assessment
        expected_output: Severity ratings
  advisor:
    role: Remediation Advisor
    goal: Provide fix guidance
    backstory: Expert at secure coding
    tasks:
      advise:
        description: |
          Recommend fixes:
          - Code changes
          - Security controls
          - Priority order
        expected_output: Remediation plan
Copy
# Run the pipeline defined in agents.yaml with verbose output
praisonai agents.yaml --verbose
Monitor & Verify
Copy
# View scan history for the security_team user (10 is presumably the number of entries to show — confirm)
praisonai --history 10 --user-id security_team
# Check metrics
praisonai --metrics
# Export report (presumably saved under the name security_report — confirm)
praisonai --save security_report
Serve API
Copy
from praisonaiagents import Agent, tool
import json
import re
@tool
def quick_vuln_check(code_snippet: str) -> str:
    """Run a quick regex-based vulnerability check on a code snippet.

    Three heuristics are applied to the whole snippet:

    * SQL injection: an f-string containing ``SELECT`` (any case) followed by
      an interpolation brace on the same line.
    * XSS: an assignment to ``innerHTML``.
    * Hardcoded secret: ``password``/``secret``/``key`` (any case) assigned a
      non-empty quoted literal.

    Args:
        code_snippet: Source text to inspect.

    Returns:
        A JSON object (as a string) with ``findings`` (a list of
        ``{"type", "severity"}`` objects, at most one per heuristic),
        ``total`` (their count) and ``safe`` (true when nothing matched).
    """
    # (type, severity, pattern, flags). SQL keywords are case-insensitive, so
    # the SQL check ignores case like the secrets check does; the DOM
    # property name innerHTML is matched exactly.
    checks = (
        ("SQL_INJECTION", "critical", r'f["\'].*SELECT.*\{', re.IGNORECASE),
        ("XSS", "high", r'innerHTML\s*=', 0),
        ("HARDCODED_SECRET", "medium",
         r'(password|secret|key)\s*=\s*["\'][^"\']+["\']', re.IGNORECASE),
    )
    findings = [
        {"type": vuln_type, "severity": severity}
        for vuln_type, severity, pattern, flags in checks
        if re.search(pattern, code_snippet, flags)
    ]
    return json.dumps({
        "findings": findings,
        "total": len(findings),
        "safe": not findings,
    })
# Agent exposing the quick check as its only tool.
agent = Agent(
    name="SecurityAPI",
    instructions="Check code for security vulnerabilities.",
    tools=[quick_vuln_check],
)

# Serve the agent at /vuln-check on port 8000.
agent.launch(path="/vuln-check", port=8000)
Copy
# POST a code snippet as JSON to the /vuln-check endpoint on port 8000
curl -X POST http://localhost:8000/vuln-check \
-H "Content-Type: application/json" \
-d '{"message": "Check this code: query = f\"SELECT * FROM users WHERE id = {user_id}\""}'
Cleanup
Copy
# Remove every file this walkthrough created, including agents.yaml
rm -f security_scan.db patterns.json sample_code.py agents.yaml
# Leave the virtual environment (the venv/ directory itself is kept;
# delete it manually if it is no longer needed)
deactivate
Features Demonstrated
| Feature | Implementation |
|---|---|
| Multi-agent | Scanner → Analyzer → Advisor |
| Structured Output | Pydantic VulnerabilityReport |
| Pattern Matching | Regex-based detection |
| CVSS Scoring | Base + environmental factors |
| DB Persistence | SQLite via memory={"db": ...} |
| CLI | --telemetry for tracking |
| YAML Config | 3-agent security pipeline |
| API Endpoint | agent.launch() |

