Skip to main content
Hook events are triggered at specific points in the agent lifecycle, allowing you to intercept, modify, or block operations.

Quick Start

1

Import Hook Components

from praisonaiagents.hooks import HookRegistry, HookEvent, HookResult
2

Register a Hook

registry = HookRegistry()

@registry.on(HookEvent.BEFORE_TOOL)
def log_tool(event_data):
    print(f"Tool: {event_data.tool_name}")
    return HookResult.allow()
3

Use with Agent

from praisonaiagents import Agent

agent = Agent(
    name="MyAgent",
    instructions="You are helpful",
    hooks=registry
)

Core Events

Tool Events

EventTriggerInput TypeUse Case
BEFORE_TOOLBefore tool executionBeforeToolInputSecurity checks, logging
AFTER_TOOLAfter tool executionAfterToolInputResult validation, logging
@registry.on(HookEvent.BEFORE_TOOL)
def before_tool(event_data):
    print(f"Calling: {event_data.tool_name}")
    print(f"Args: {event_data.arguments}")
    return HookResult.allow()

@registry.on(HookEvent.AFTER_TOOL)
def after_tool(event_data):
    print(f"Result: {event_data.result}")
    return HookResult.allow()

Agent Events

EventTriggerInput TypeUse Case
BEFORE_AGENTBefore agent runsBeforeAgentInputSetup, initialization
AFTER_AGENTAfter agent completesAfterAgentInputCleanup, reporting
@registry.on(HookEvent.BEFORE_AGENT)
def before_agent(event_data):
    print(f"Agent starting: {event_data.agent_name}")
    return HookResult.allow()

@registry.on(HookEvent.AFTER_AGENT)
def after_agent(event_data):
    print(f"Agent completed: {event_data.result}")
    return HookResult.allow()

LLM Events

EventTriggerInput TypeUse Case
BEFORE_LLMBefore LLM API callBeforeLLMInputRequest modification
AFTER_LLMAfter LLM responseAfterLLMInputResponse validation
@registry.on(HookEvent.BEFORE_LLM)
def before_llm(event_data):
    print(f"Model: {event_data.model}")
    print(f"Messages: {len(event_data.messages)}")
    return HookResult.allow()

@registry.on(HookEvent.AFTER_LLM)
def after_llm(event_data):
    print(f"Response length: {len(event_data.response)}")
    print(f"Tokens: {event_data.usage}")
    return HookResult.allow()

Session Events

EventTriggerInput TypeUse Case
SESSION_STARTWhen session startsSessionStartInputSession initialization
SESSION_ENDWhen session endsSessionEndInputSession cleanup
@registry.on(HookEvent.SESSION_START)
def session_start(event_data):
    print(f"Session: {event_data.session_id}")
    return HookResult.allow()

@registry.on(HookEvent.SESSION_END)
def session_end(event_data):
    print(f"Session ended: {event_data.session_id}")
    return HookResult.allow()

Error Events

EventTriggerInput TypeUse Case
ON_ERRORWhen error occursOnErrorInputError handling
ON_RETRYBefore retry attemptOnRetryInputRetry logic
@registry.on(HookEvent.ON_ERROR)
def on_error(event_data):
    print(f"Error: {event_data.error}")
    # Log to external service
    return HookResult.allow()

@registry.on(HookEvent.ON_RETRY)
def on_retry(event_data):
    # New tool-level fields
    print(f"[retry] {event_data.tool_name} attempt {event_data.attempt}/{event_data.max_attempts}")
    print(f"Error type: {event_data.error_type}, delay: {event_data.delay_ms}ms")
    
    # Legacy fields still available for backward compatibility
    # event_data.retry_count, event_data.max_retries, event_data.error_message
    
    if event_data.attempt > 2:
        return HookResult.deny("Too many retries")
    return HookResult.allow()

OnRetryInput Fields

The OnRetryInput event includes both new tool-specific fields and legacy fields for backward compatibility: New fields (recommended):
  • tool_name: Name of the failing tool
  • attempt: Current attempt number (1-based)
  • max_attempts: Maximum attempts configured
  • delay_ms: Delay before this retry in milliseconds
  • error_type: Classified error type (timeout, rate_limit, connection_error, unknown)
  • error: Original exception object
Legacy fields (for backward compatibility):
  • retry_count: Same as attempt
  • max_retries: Same as max_attempts
  • error_message: String representation of error

Agent-Level Error Callbacks

In addition to hook events, agents support a direct on_error callback for LLM failures:
from praisonaiagents import Agent
from praisonaiagents.errors import LLMError

def agent_error_handler(error):
    """Called when LLM chat completion fails"""
    print(f"Agent {error.agent_id} LLM error: {error.message}")
    print(f"Model: {error.model_name}")
    print(f"Retryable: {error.is_retryable}")
    
    # Custom error recovery logic
    if error.is_retryable:
        print("Will be retried by orchestration")
    else:
        print("Fatal error - check configuration")

# Agent-specific error handling
agent = Agent(
    name="Error Aware Agent",
    instructions="Process user requests",
    on_error=agent_error_handler  # Direct callback, not a hook
)

Hook Events vs Agent Callbacks

ApproachScopeReturn ValueWhen Called
HookEvent.ON_ERRORGlobal, all agentsHookResultAny hook system error
agent.on_errorSingle agentNoneLLM chat completion errors
# Using both together for comprehensive error handling
registry = HookRegistry()

@registry.on(HookEvent.ON_ERROR)
def global_hook_error(event_data):
    """Catches all hook system errors"""
    return HookResult.allow()

def llm_specific_error(error):
    """Catches LLM errors for this agent only"""
    pass

agent = Agent(
    name="Comprehensive Error Handling",
    hooks=registry,           # Global error hooks
    on_error=llm_specific_error  # Agent-specific LLM errors
)

Extended Events

User Interaction Events

EventTriggerUse Case
USER_PROMPT_SUBMITUser submits promptInput validation, logging
NOTIFICATIONNotification sentAlert routing
@registry.on(HookEvent.USER_PROMPT_SUBMIT)
def on_prompt(event_data):
    print(f"User prompt: {event_data.prompt}")
    # Validate or modify input
    return HookResult.allow()

@registry.on(HookEvent.NOTIFICATION)
def on_notification(event_data):
    print(f"Notification: {event_data.message}")
    # Route to external service
    return HookResult.allow()

Subagent Events

EventTriggerUse Case
SUBAGENT_STOPSubagent completesResult handling
@registry.on(HookEvent.SUBAGENT_STOP)
def on_subagent_stop(event_data):
    print(f"Subagent completed: {event_data.agent_name}")
    print(f"Result: {event_data.result}")
    return HookResult.allow()

System Events

EventTriggerUse Case
SETUPInitialization/maintenanceConfig loading
BEFORE_COMPACTIONBefore context compactionPre-compaction hooks
AFTER_COMPACTIONAfter context compactionPost-compaction validation
@registry.on(HookEvent.SETUP)
def on_setup(event_data):
    print("System initializing...")
    # Load configuration
    return HookResult.allow()

@registry.on(HookEvent.BEFORE_COMPACTION)
def before_compaction(event_data):
    print(f"Compacting context: {event_data.token_count} tokens")
    return HookResult.allow()

@registry.on(HookEvent.AFTER_COMPACTION)
def after_compaction(event_data):
    print(f"Compacted to: {event_data.new_token_count} tokens")
    return HookResult.allow()

Message Events

EventTriggerUse Case
MESSAGE_RECEIVEDMessage receivedPreprocessing
MESSAGE_SENDINGBefore message sentModification
MESSAGE_SENTAfter message sentConfirmation
@registry.on(HookEvent.MESSAGE_RECEIVED)
def on_message_received(event_data):
    print(f"Received: {event_data.message}")
    return HookResult.allow()

@registry.on(HookEvent.MESSAGE_SENDING)
def on_message_sending(event_data):
    print(f"Sending: {event_data.message}")
    return HookResult.allow()

Gateway Events

EventTriggerUse Case
GATEWAY_STARTGateway startsInitialization
GATEWAY_STOPGateway stopsCleanup
@registry.on(HookEvent.GATEWAY_START)
def on_gateway_start(event_data):
    print("Gateway starting...")
    return HookResult.allow()

@registry.on(HookEvent.GATEWAY_STOP)
def on_gateway_stop(event_data):
    print("Gateway stopping...")
    return HookResult.allow()

Storage Events

EventTriggerUse Case
TOOL_RESULT_PERSISTBefore result storageResult modification
@registry.on(HookEvent.TOOL_RESULT_PERSIST)
def on_persist(event_data):
    print(f"Persisting: {event_data.tool_name}")
    # Modify or filter result before storage
    return HookResult.allow()

Kanban Events

EventInput TypeUse Case
KANBAN_TASK_CREATEDKanbanHookInputTask added to board
KANBAN_TASK_CLAIMEDKanbanHookInputTask assigned
KANBAN_TASK_MOVEDKanbanHookInputTask status changed
KANBAN_TASK_DONEKanbanHookInputTask completed
KANBAN_TASK_BLOCKEDKanbanHookInputTask blocked
KANBAN_TASK_FAILEDKanbanHookInputTask failed
@registry.on(HookEvent.KANBAN_TASK_MOVED)
def track_progress(event_data):
    print(f"Task {event_data.task_id}: {event_data.from_status}{event_data.to_status}")
    return HookResult.allow()

@registry.on(HookEvent.KANBAN_TASK_BLOCKED)
def handle_blocked(event_data):
    print(f"🔒 Task blocked: {event_data.task_id}")
    # Send alert to Slack, email, etc.
    return HookResult.allow()

Complete Event Reference

EventCategoryDescription
BEFORE_TOOLToolBefore tool execution
AFTER_TOOLToolAfter tool execution
BEFORE_AGENTAgentBefore agent runs
AFTER_AGENTAgentAfter agent completes
BEFORE_LLMLLMBefore LLM API call
AFTER_LLMLLMAfter LLM response
SESSION_STARTSessionSession starts
SESSION_ENDSessionSession ends
ON_ERRORErrorError occurs
ON_RETRYErrorBefore retry
USER_PROMPT_SUBMITUserUser submits prompt
NOTIFICATIONUserNotification sent
SUBAGENT_STOPSubagentSubagent completes
SETUPSystemInitialization
BEFORE_COMPACTIONContextBefore compaction
AFTER_COMPACTIONContextAfter compaction
MESSAGE_RECEIVEDMessageMessage received
MESSAGE_SENDINGMessageBefore message sent
MESSAGE_SENTMessageAfter message sent
GATEWAY_STARTGatewayGateway starts
GATEWAY_STOPGatewayGateway stops
TOOL_RESULT_PERSISTStorageBefore result storage
KANBAN_TASK_CREATEDKanbanTask added to board
KANBAN_TASK_CLAIMEDKanbanTask assigned
KANBAN_TASK_MOVEDKanbanTask status changed
KANBAN_TASK_DONEKanbanTask completed
KANBAN_TASK_BLOCKEDKanbanTask blocked
KANBAN_TASK_FAILEDKanbanTask failed

Best Practices

Hooks run synchronously. Avoid heavy operations that could slow down agent execution.
Use pattern matchers to only run hooks for specific tools or operations.
Return HookResult.allow() quickly for non-matching cases to minimize overhead.
Wrap hook logic in try/except to prevent breaking agent execution.

Hooks

Hook system overview

Kanban Tasks

Kanban hook events and lifecycle

Plugins

Plugin system with hooks