Best practices for handling user input, validation, and safety in PraisonAI
from praisonaiagents import Agent
agent = Agent(
name="Assistant",
instructions="Help users with their queries"
)
# Basic input
user_input = input("What would you like help with? ")
response = agent.chat(user_input)
print(response)
import os
# Try environment variable first, fall back to prompt
api_key = os.getenv("API_KEY") or input("Enter API key: ")
agent = Agent(
name="SecureAgent",
instructions="Process secure requests",
llm_config={"api_key": api_key}
)
class InputCollector:
def __init__(self):
self.inputs = {}
def collect(self, fields):
for field in fields:
prompt = field.get("prompt", f"Enter {field['name']}: ")
validator = field.get("validator", lambda x: True)
while True:
value = input(prompt)
if validator(value):
self.inputs[field["name"]] = value
break
print("Invalid input, please try again.")
return self.inputs
# Usage
collector = InputCollector()
user_data = collector.collect([
{
"name": "email",
"prompt": "Enter email: ",
"validator": lambda x: "@" in x
},
{
"name": "age",
"prompt": "Enter age: ",
"validator": lambda x: x.isdigit() and 0 < int(x) < 150
}
])
from pydantic import BaseModel, validator, Field
from typing import Optional
import re
class UserInput(BaseModel):
email: str = Field(..., description="User email address")
age: int = Field(..., gt=0, lt=150)
query: str = Field(..., min_length=1, max_length=1000)
@validator("email")
def validate_email(cls, v):
if not re.match(r"[^@]+@[^@]+\.[^@]+", v):
raise ValueError("Invalid email format")
return v
@validator("query")
def sanitise_query(cls, v):
# Remove potentially harmful characters
return re.sub(r"[<>\"']", "", v)
# Use with agent
def process_request(raw_input: dict):
try:
# Validate and sanitise
validated = UserInput(**raw_input)
agent = Agent(
name="ValidatedAgent",
instructions="Process validated requests"
)
return agent.chat(validated.query)
except ValueError as e:
return f"Validation error: {e}"
from praisonaiagents import Agent, Task, Workflow
# Validation agent
validator = Agent(
name="Validator",
instructions="Validate user inputs and check for security issues"
)
# Processing agent
processor = Agent(
name="Processor",
instructions="Process validated requests"
)
# Create validation workflow
validation_task = Task(
description="Validate user input: {input}",
agent=validator,
expected_output="Validation result with any issues found"
)
processing_task = Task(
description="Process validated input",
agent=processor,
condition=lambda results: "valid" in results.get("validation_task", "")
)
workflow = Workflow(tasks=[validation_task, processing_task])
from praisonaiagents import Agent, approval
# Define risk levels
def classify_risk(action):
if any(word in action.lower() for word in ["delete", "remove", "destroy"]):
return "high"
elif any(word in action.lower() for word in ["modify", "update", "change"]):
return "medium"
return "low"
# Custom approval callback
def approval_callback(action, risk_level):
if risk_level == "high":
print(f"\n⚠️ HIGH RISK ACTION: {action}")
response = input("Type 'CONFIRM' to proceed: ")
return response == "CONFIRM"
elif risk_level == "medium":
response = input(f"\nConfirm action: {action} (y/n): ")
return response.lower() == "y"
return True # Auto-approve low risk
# Agent with approval system
agent = Agent(
name="SafeAgent",
instructions="Execute user commands safely",
tools=[file_operations, database_operations],
approval_callback=approval_callback,
risk_classifier=classify_risk
)
from praisonaiagents import Tool
# High-risk tool with mandatory approval
delete_tool = Tool(
name="delete_file",
description="Delete a file from the system",
function=delete_file_function,
requires_approval=True,
approval_prompt="This will permanently delete {filename}. Proceed?"
)
# Conditional approval based on parameters
def conditional_approval(params):
if params.get("force", False):
return input("Force delete requested. Are you sure? (yes/no): ") == "yes"
return True
modify_tool = Tool(
name="modify_config",
description="Modify system configuration",
function=modify_config_function,
approval_condition=conditional_approval
)
import streamlit as st
from praisonaiagents import Agent
# Input validation in Streamlit
def validate_streamlit_inputs():
with st.form("user_input"):
# Text input with validation
query = st.text_input(
"Enter your query",
max_chars=500,
help="Maximum 500 characters"
)
# Numeric input with constraints
temperature = st.slider(
"Response creativity",
min_value=0.0,
max_value=1.0,
value=0.7,
step=0.1
)
# Option selection
mode = st.selectbox(
"Processing mode",
["Safe", "Balanced", "Creative"],
index=1
)
submitted = st.form_submit_button("Submit")
if submitted:
if len(query.strip()) == 0:
st.error("Query cannot be empty")
return None
return {
"query": query.strip(),
"temperature": temperature,
"mode": mode
}
# Use validated input
if user_input := validate_streamlit_inputs():
agent = Agent(
name="UIAgent",
llm_config={"temperature": user_input["temperature"]}
)
with st.spinner("Processing..."):
response = agent.chat(user_input["query"])
st.success(response)
import gradio as gr
from praisonaiagents import Agent
def create_gradio_interface():
agent = Agent(name="GradioAgent")
def process_with_validation(text, temperature, max_length):
# Validate inputs
if not text or len(text.strip()) == 0:
return "Error: Input cannot be empty"
if len(text) > max_length:
return f"Error: Input exceeds {max_length} characters"
# Process with agent
try:
response = agent.chat(
text,
llm_config={"temperature": temperature}
)
return response
except Exception as e:
return f"Error processing request: {str(e)}"
interface = gr.Interface(
fn=process_with_validation,
inputs=[
gr.Textbox(
label="Your Query",
placeholder="Enter your question here",
lines=3
),
gr.Slider(
minimum=0,
maximum=1,
value=0.7,
label="Temperature"
),
gr.Number(
value=1000,
label="Max Input Length",
precision=0
)
],
outputs=gr.Textbox(label="Response"),
title="PraisonAI Assistant",
description="Ask questions with input validation"
)
return interface
from praisonaiagents import Agent, Session
import json
class TransactionalInput:
def __init__(self):
self.checkpoint = None
self.validated_inputs = []
def begin_transaction(self):
self.checkpoint = self.validated_inputs.copy()
def add_input(self, input_data, validator):
if validator(input_data):
self.validated_inputs.append(input_data)
return True
return False
def commit(self):
self.checkpoint = None
return self.validated_inputs
def rollback(self):
if self.checkpoint is not None:
self.validated_inputs = self.checkpoint
self.checkpoint = None
# Usage with agent
def process_multi_step_input(agent: Agent):
tx = TransactionalInput()
tx.begin_transaction()
try:
# Step 1: Collect user info
user_info = input("Enter user info (JSON): ")
if not tx.add_input(
json.loads(user_info),
lambda x: "user_id" in x and "email" in x
):
raise ValueError("Invalid user info")
# Step 2: Collect request
request = input("Enter request: ")
if not tx.add_input(
{"request": request},
lambda x: len(x.get("request", "")) > 0
):
raise ValueError("Empty request")
# Process all inputs
all_inputs = tx.commit()
return agent.chat(str(all_inputs))
except Exception as e:
tx.rollback()
return f"Transaction failed: {e}"
import re
import html
class InputSanitiser:
@staticmethod
def sanitise_text(text: str) -> str:
# Remove HTML tags
text = re.sub(r'<[^>]+>', '', text)
# Escape special characters
text = html.escape(text)
# Remove potential SQL injection patterns
text = re.sub(r'(;|--|\/\*|\*\/|xp_|sp_)', '', text)
# Remove potential command injection
text = re.sub(r'(\||&|;|`|\$\(|\))', '', text)
return text.strip()
@staticmethod
def sanitise_filename(filename: str) -> str:
# Remove path traversal attempts
filename = re.sub(r'\.\.[\\/]', '', filename)
# Keep only safe characters
filename = re.sub(r'[^a-zA-Z0-9._-]', '_', filename)
return filename
# Use with agent
def safe_file_operation(agent: Agent, user_input: str):
sanitised = InputSanitiser.sanitise_filename(user_input)
if sanitised != user_input:
print(f"Input sanitised: {user_input} -> {sanitised}")
return agent.chat(f"Process file: {sanitised}")
from datetime import datetime, timedelta
from collections import defaultdict
class RateLimiter:
def __init__(self, max_requests=10, window_seconds=60):
self.max_requests = max_requests
self.window = timedelta(seconds=window_seconds)
self.requests = defaultdict(list)
def is_allowed(self, user_id: str) -> bool:
now = datetime.now()
user_requests = self.requests[user_id]
# Remove old requests
user_requests[:] = [
req_time for req_time in user_requests
if now - req_time < self.window
]
# Check limit
if len(user_requests) >= self.max_requests:
return False
user_requests.append(now)
return True
# Use with agent
rate_limiter = RateLimiter(max_requests=5, window_seconds=60)
def handle_user_request(user_id: str, message: str):
if not rate_limiter.is_allowed(user_id):
return "Rate limit exceeded. Please try again later."
agent = Agent(name="RateLimitedAgent")
return agent.chat(message)
# 1. Environment variables (most secure)
api_key = os.getenv("API_KEY")
# 2. Configuration files
config = load_config("settings.json")
# 3. Command line arguments
parser.add_argument("--input", required=True)
# 4. Interactive prompts (least secure)
user_input = input("Enter value: ")
def input_pipeline(raw_input):
# 1. Type validation
validated = validate_types(raw_input)
# 2. Format validation
formatted = validate_format(validated)
# 3. Business logic validation
checked = validate_business_rules(formatted)
# 4. Sanitisation
sanitised = sanitise_input(checked)
# 5. Final approval
if requires_approval(sanitised):
approved = get_user_approval(sanitised)
return approved
return sanitised