Skip to main content
Multi-agent DeFi trading: market analysis → arbitrage detection → liquidity optimization → risk assessment → trade execution. Includes SQLite trade logs, structured output, and API deployment.

Setup

# Create environment
python3 -m venv venv && source venv/bin/activate

# Install packages
pip install praisonaiagents praisonai

# Set API key
export OPENAI_API_KEY="your-key"

Create Sample Data

# Create liquidity pools
cat > pools.json << 'EOF'
[
  {"id": "UNISWAP-ETH-USDC", "dex": "Uniswap", "token0": "ETH", "token1": "USDC", "reserve0": 10000, "reserve1": 20000000, "fee": 0.003},
  {"id": "SUSHI-ETH-USDC", "dex": "Sushiswap", "token0": "ETH", "token1": "USDC", "reserve0": 8000, "reserve1": 16200000, "fee": 0.003},
  {"id": "CURVE-USDC-USDT", "dex": "Curve", "token0": "USDC", "token1": "USDT", "reserve0": 50000000, "reserve1": 50100000, "fee": 0.0004}
]
EOF

# Create market data
cat > market.json << 'EOF'
{
  "ETH": {"price_usd": 2000, "volatility_24h": 0.05},
  "USDC": {"price_usd": 1.0, "volatility_24h": 0.001},
  "USDT": {"price_usd": 0.999, "volatility_24h": 0.002},
  "gas_price_gwei": 25
}
EOF

Run: Python Code

from praisonaiagents import Agent, Agents, Task, tool
from pydantic import BaseModel
from typing import List
import json

# Structured output
class TradeSignal(BaseModel):
    opportunity_type: str  # arbitrage, liquidity_provision
    pools_involved: List[str]
    expected_profit_usd: float
    risk_level: str  # low, medium, high
    gas_cost_usd: float
    net_profit_usd: float
    recommendation: str  # execute, skip, monitor

# Database persistence is configured via memory={} parameter

# Tools
@tool
def get_pool_data(pool_id: str) -> str:
    """Get liquidity pool data."""
    with open("pools.json") as f:
        pools = json.load(f)
    for p in pools:
        if p["id"] == pool_id:
            return json.dumps(p)
    return json.dumps({"error": "Pool not found"})

@tool
def get_market_data() -> str:
    """Get current market data."""
    with open("market.json") as f:
        return f.read()

@tool
def calculate_arbitrage(pool1_id: str, pool2_id: str, amount: float) -> str:
    """Calculate arbitrage opportunity between two pools."""
    with open("pools.json") as f:
        pools = {p["id"]: p for p in json.load(f)}
    
    p1, p2 = pools.get(pool1_id), pools.get(pool2_id)
    if not p1 or not p2:
        return json.dumps({"error": "Pool not found"})
    
    # Calculate price difference
    price1 = p1["reserve1"] / p1["reserve0"]
    price2 = p2["reserve1"] / p2["reserve0"]
    spread = abs(price1 - price2) / min(price1, price2)
    
    # Estimate profit (simplified)
    gross_profit = amount * spread
    fees = amount * (p1["fee"] + p2["fee"])
    gas_cost = 25 * 200000 / 1e9 * 2000  # gas_price * gas_limit / 1e9 * eth_price
    net_profit = gross_profit - fees - gas_cost
    
    return json.dumps({
        "spread_pct": round(spread * 100, 4),
        "gross_profit": round(gross_profit, 2),
        "fees": round(fees, 2),
        "gas_cost": round(gas_cost, 2),
        "net_profit": round(net_profit, 2),
        "profitable": net_profit > 0
    })

@tool
def assess_risk(volatility: float, liquidity: float, gas_price: int) -> str:
    """Assess trade risk."""
    risk_score = volatility * 100 + (50 / liquidity) + (gas_price / 10)
    
    if risk_score > 15:
        level = "high"
    elif risk_score > 8:
        level = "medium"
    else:
        level = "low"
    
    return json.dumps({"risk_score": round(risk_score, 2), "level": level})

# Agents
market_analyzer = Agent(
    name="MarketAnalyzer",
    instructions="Analyze market conditions. Use get_market_data tool.",
    tools=[get_market_data],
    memory={
        "db": "sqlite:///defi_trades.db",
        "session_id": "defi-trading"
    }
)

arbitrage_detector = Agent(
    name="ArbitrageDetector",
    instructions="Find arbitrage opportunities. Use get_pool_data and calculate_arbitrage tools.",
    tools=[get_pool_data, calculate_arbitrage]
)

risk_assessor = Agent(
    name="RiskAssessor",
    instructions="Assess trade risks. Use assess_risk tool.",
    tools=[assess_risk]
)

# Tasks
market_task = Task(
    description="Analyze current market conditions",
    agent=market_analyzer,
    expected_output="Market data with prices and volatility"
)

arbitrage_task = Task(
    description="Find arbitrage between UNISWAP-ETH-USDC and SUSHI-ETH-USDC",
    agent=arbitrage_detector,
    expected_output="Arbitrage opportunity analysis"
)

risk_task = Task(
    description="Assess risk for the trade opportunity",
    agent=risk_assessor,
    expected_output="Risk assessment",
    output_pydantic=TradeSignal
)

# Run
agents = Agents(agents=[market_analyzer, arbitrage_detector, risk_assessor], tasks=[market_task, arbitrage_task, risk_task])
result = agents.start()
print(result)

Run: CLI

# Analyze single pool
praisonai "Analyze UNISWAP-ETH-USDC liquidity pool" --verbose

# With persistence
praisonai "Find arbitrage opportunities across DEXes" --memory --user-id trader1

# Monitor mode
praisonai "Monitor DeFi markets for trading opportunities" --telemetry

Run: agents.yaml

Create agents.yaml:
framework: praisonai
topic: "DeFi market making"
roles:
  analyzer:
    role: Market Analyst
    goal: Monitor DeFi markets
    backstory: Expert at on-chain data analysis
    tasks:
      analyze:
        description: |
          Analyze markets:
          - Token prices
          - Pool liquidity
          - Gas prices
          - Volume trends
        expected_output: Market analysis
        
  arbitrage:
    role: Arbitrage Hunter
    goal: Find profitable opportunities
    backstory: Expert at MEV and arbitrage
    tasks:
      find:
        description: |
          Find opportunities:
          - Cross-DEX arbitrage
          - Triangular arbitrage
          - Flash loan opportunities
        expected_output: Arbitrage opportunities
        
  risk:
    role: Risk Manager
    goal: Assess and manage risk
    backstory: Expert at DeFi risk management
    tasks:
      assess:
        description: |
          Assess risks:
          - Impermanent loss
          - Smart contract risk
          - Slippage risk
          - Gas price volatility
        expected_output: Risk assessment with recommendations
Run:
praisonai agents.yaml --verbose

Monitor & Verify

# View trade history
praisonai --history 10 --user-id trader1

# Check metrics
praisonai --metrics

# Export trades
praisonai --save trade_log

Serve API

from praisonaiagents import Agent, tool
import json

@tool
def quick_arb_check(price1: float, price2: float, amount: float, gas_gwei: int) -> str:
    """Quick arbitrage profitability check."""
    spread = abs(price1 - price2) / min(price1, price2)
    gross = amount * spread
    gas_cost = gas_gwei * 200000 / 1e9 * 2000
    fees = amount * 0.006  # 0.3% each way
    net = gross - gas_cost - fees
    
    return json.dumps({
        "spread_pct": round(spread * 100, 4),
        "net_profit": round(net, 2),
        "profitable": net > 0,
        "recommendation": "execute" if net > 10 else "skip"
    })

agent = Agent(
    name="DeFiAPI",
    instructions="Check DeFi arbitrage opportunities.",
    tools=[quick_arb_check]
)

agent.launch(path="/arb-check", port=8000)
Test:
curl -X POST http://localhost:8000/arb-check \
  -H "Content-Type: application/json" \
  -d '{"message": "Check arb: price1=2000, price2=2010, amount=10000, gas=25"}'

Cleanup

rm -f defi_trades.db pools.json market.json
deactivate

Features Demonstrated

FeatureImplementation
Multi-agentAnalyzer → Arbitrage → Risk
Structured OutputPydantic TradeSignal
Arbitrage CalcCross-DEX spread analysis
Risk ScoringVolatility + liquidity + gas
DB PersistenceSQLite via db()
CLI--telemetry for monitoring
YAML Config3-agent trading pipeline
API Endpointagent.launch()