Skip to main content
A2A push notifications enable proactive task status updates via webhooks, while async processing with return_immediately allows non-blocking agent interactions.

Quick Start

1

Async Task Processing with return_immediately

Send requests that return immediately while the agent processes in the background.
import asyncio
from praisonaiagents import Agent
from praisonaiagents.ui.a2a import A2AClient

agent = Agent(
    name="Research Agent",
    instructions="Provide detailed research on any topic",
    model="gpt-4o"
)

async def main():
    async with A2AClient("http://localhost:8000") as client:
        # Non-blocking request - returns immediately
        result = await client.send_message(
            "Write a detailed report on AI trends",
            configuration={"returnImmediately": True}
        )
        
        task_id = result["result"]["id"]
        print(f"Task submitted: {task_id}")
        
        # Poll for completion
        while True:
            task = await client.get_task(task_id)
            state = task["result"]["status"]["state"]
            if state in ("completed", "failed", "cancelled"):
                break
            await asyncio.sleep(1)
        
        print(f"Task finished with state: {state}")

asyncio.run(main())
2

Setting Up Push Notifications

Configure webhook notifications for automatic task status updates.
from praisonaiagents import Agent
from praisonaiagents.ui.a2a import A2A, TaskPushNotificationConfig, AuthenticationInfo

class PushEnabledA2A(A2A):
    """A2A server with push notification support."""
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._push_configs = {}
    
    def _handle_push_notification(self, request_id, method, params):
        if method == "tasks/pushNotificationConfig/set":
            task_id = params.get("taskId")
            url = params.get("url")
            token = params.get("token")
            
            # Store configuration
            self._push_configs[task_id] = {
                "url": url,
                "token": token,
                "params": params
            }
            
            return JSONResponse(content={
                "jsonrpc": "2.0",
                "id": request_id,
                "result": {"taskId": task_id, "url": url},
            })
        
        return super()._handle_push_notification(request_id, method, params)

agent = Agent(name="Webhook Agent", instructions="Helper agent")
a2a = PushEnabledA2A(agent=agent)
a2a.serve(port=8000)

How It Works

ComponentPurposeBenefit
return_immediatelyNon-blocking task submissionImproved client responsiveness
Push NotificationsProactive status updatesEliminates need for constant polling
Webhook ConfigurationFlexible delivery endpointsIntegration with existing systems

Configuration Options

TaskPushNotificationConfig

Configure webhook delivery for task status updates.
OptionTypeDefaultDescription
idstrNoneConfiguration identifier
task_idstrRequiredTask to monitor for updates
urlstrRequiredWebhook endpoint URL
tokenstrNoneVerification token
authenticationAuthenticationInfoNoneAuth configuration

AuthenticationInfo

Authentication details for webhook requests.
OptionTypeDefaultDescription
schemestrRequiredAuth scheme (e.g., “Bearer”)
credentialsstrNoneAuth credentials/token

return_immediately Configuration

Blocking vs Non-blocking Requests

Default Behavior (Blocking):
# Request blocks until agent completes
response = await client.send_message("Complex analysis task")
# Response contains completed result
Non-blocking with return_immediately:
# Request returns immediately
response = await client.send_message(
    "Complex analysis task",
    configuration={"returnImmediately": True}
)
# Response contains task ID in "submitted" state

Polling Pattern

async def wait_for_completion(client, task_id, timeout=300):
    """Poll task until completion with timeout."""
    start_time = asyncio.get_event_loop().time()
    
    while True:
        if asyncio.get_event_loop().time() - start_time > timeout:
            raise TimeoutError("Task did not complete within timeout")
        
        task = await client.get_task(task_id)
        state = task["result"]["status"]["state"]
        
        if state == "completed":
            return task["result"]
        elif state in ("failed", "cancelled"):
            raise RuntimeError(f"Task {state}: {task['result']['status'].get('message', '')}")
        
        await asyncio.sleep(1)

Push Notification JSON-RPC Methods

Available Methods

MethodPurposeSupport
tasks/pushNotificationConfig/setRegister webhook configExtension required
tasks/pushNotificationConfig/getRetrieve webhook configExtension required
tasks/pushNotificationConfig/listList all webhook configsExtension required
tasks/pushNotificationConfig/deleteRemove webhook configExtension required

Default Error Response

When push notifications are not supported:
curl -X POST http://localhost:8000/a2a \
  -H "Content-Type: application/json" \
  -d '{
    "jsonrpc": "2.0",
    "method": "tasks/pushNotificationConfig/set",
    "id": "1",
    "params": {
      "taskId": "task-123",
      "url": "https://my-app.com/webhooks/a2a"
    }
  }'

# Response:
# {
#   "jsonrpc": "2.0", 
#   "id": "1",
#   "error": {
#     "code": -32601,
#     "message": "Push notifications not supported: tasks/pushNotificationConfig/set"
#   }
# }

Custom Push Notification Implementation

Complete Implementation Example

from fastapi.responses import JSONResponse
from praisonaiagents import Agent
from praisonaiagents.ui.a2a import A2A
import httpx
import asyncio

class WebhookA2A(A2A):
    """Full-featured A2A with webhook push notifications."""
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._push_configs = {}
    
    def get_agent_card(self):
        """Override to advertise push notification support."""
        card = super().get_agent_card()
        card.capabilities.push_notifications = True
        return card
    
    def _handle_push_notification(self, request_id, method, params):
        """Handle all push notification methods."""
        
        if method == "tasks/pushNotificationConfig/set":
            return self._set_push_config(request_id, params)
        elif method == "tasks/pushNotificationConfig/get":
            return self._get_push_config(request_id, params)
        elif method == "tasks/pushNotificationConfig/list":
            return self._list_push_configs(request_id, params)
        elif method == "tasks/pushNotificationConfig/delete":
            return self._delete_push_config(request_id, params)
        
        return super()._handle_push_notification(request_id, method, params)
    
    def _set_push_config(self, request_id, params):
        task_id = params.get("taskId")
        url = params.get("url")
        token = params.get("token")
        auth = params.get("authentication", {})
        
        if not task_id or not url:
            return self._error_response(request_id, -32602, "taskId and url required")
        
        config = {
            "taskId": task_id,
            "url": url,
            "token": token,
            "authentication": auth
        }
        
        self._push_configs[task_id] = config
        
        return JSONResponse(content={
            "jsonrpc": "2.0",
            "id": request_id,
            "result": config
        })
    
    def _get_push_config(self, request_id, params):
        task_id = params.get("taskId")
        config = self._push_configs.get(task_id)
        
        if not config:
            return self._error_response(request_id, -32000, "Config not found")
        
        return JSONResponse(content={
            "jsonrpc": "2.0",
            "id": request_id,
            "result": config
        })
    
    def _list_push_configs(self, request_id, params):
        configs = list(self._push_configs.values())
        
        return JSONResponse(content={
            "jsonrpc": "2.0",
            "id": request_id,
            "result": {"configs": configs}
        })
    
    def _delete_push_config(self, request_id, params):
        task_id = params.get("taskId")
        
        if task_id in self._push_configs:
            del self._push_configs[task_id]
            return JSONResponse(content={
                "jsonrpc": "2.0",
                "id": request_id,
                "result": {"deleted": True}
            })
        
        return self._error_response(request_id, -32000, "Config not found")
    
    def _error_response(self, request_id, code, message):
        return JSONResponse(content={
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {"code": code, "message": message}
        })
    
    async def _notify_webhook(self, task_id, task_data):
        """Send webhook notification for task update."""
        config = self._push_configs.get(task_id)
        if not config:
            return
        
        headers = {"Content-Type": "application/json"}
        
        # Add authentication if configured
        auth = config.get("authentication", {})
        if auth.get("scheme") and auth.get("credentials"):
            headers["Authorization"] = f"{auth['scheme']} {auth['credentials']}"
        
        # Add verification token if configured
        if config.get("token"):
            headers["X-Webhook-Token"] = config["token"]
        
        payload = {
            "taskId": task_id,
            "status": task_data.get("status"),
            "timestamp": task_data.get("timestamp")
        }
        
        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    config["url"],
                    json=payload,
                    headers=headers,
                    timeout=10.0
                )
                response.raise_for_status()
        except Exception as e:
            # Log webhook delivery failure
            print(f"Webhook delivery failed for task {task_id}: {e}")

# Usage
agent = Agent(name="Webhook Agent", instructions="Helper")
a2a = WebhookA2A(agent=agent)
a2a.serve(port=8000)

Webhook Client Configuration

import asyncio
from praisonaiagents.ui.a2a import A2AClient

async def configure_webhook():
    async with A2AClient("http://localhost:8000") as client:
        # Set webhook for task notifications
        config_response = await client.call_method(
            "tasks/pushNotificationConfig/set",
            {
                "taskId": "task-123",
                "url": "https://my-app.com/webhooks/a2a",
                "token": "webhook-secret",
                "authentication": {
                    "scheme": "Bearer",
                    "credentials": "my-auth-token"
                }
            }
        )
        
        print("Webhook configured:", config_response)

asyncio.run(configure_webhook())

Agent Card Capabilities

Push Notification Advertisement

The Agent Card declares push notification support:
{
  "name": "Webhook Agent",
  "url": "http://localhost:8000/a2a",
  "version": "1.0.0",
  "capabilities": {
    "streaming": true,
    "pushNotifications": true
  }
}
When pushNotifications is false (default), all push notification methods return -32601 errors.

Common Patterns

async def async_task_with_polling():
    async with A2AClient("http://localhost:8000") as client:
        # Submit task immediately
        result = await client.send_message(
            "Generate a 50-page report",
            configuration={"returnImmediately": True}
        )
        
        task_id = result["result"]["id"]
        
        # Poll with exponential backoff
        delay = 1
        while True:
            task = await client.get_task(task_id)
            state = task["result"]["status"]["state"]
            
            if state in ("completed", "failed", "cancelled"):
                return task["result"]
            
            await asyncio.sleep(min(delay, 30))
            delay *= 1.5

Best Practices

Implement proper webhook handling with retries and failure logging.
async def reliable_webhook_delivery(url, payload, max_retries=3):
    for attempt in range(max_retries):
        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(url, json=payload, timeout=30)
                response.raise_for_status()
                return True
        except Exception as e:
            if attempt == max_retries - 1:
                # Log final failure
                logger.error(f"Webhook failed after {max_retries} attempts: {e}")
                return False
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
Set appropriate timeouts for long-running tasks.
async def task_with_timeout(client, message, timeout=600):
    result = await client.send_message(
        message,
        configuration={"returnImmediately": True}
    )
    
    task_id = result["result"]["id"]
    start_time = asyncio.get_event_loop().time()
    
    while True:
        elapsed = asyncio.get_event_loop().time() - start_time
        if elapsed > timeout:
            # Cancel the task
            await client.call_method("tasks/cancel", {"id": task_id})
            raise TimeoutError(f"Task {task_id} timed out after {timeout}s")
        
        task = await client.get_task(task_id)
        if task["result"]["status"]["state"] in ("completed", "failed", "cancelled"):
            break
        
        await asyncio.sleep(5)
    
    return task["result"]
Always validate webhook authenticity and implement proper error handling.
class SecureWebhookHandler:
    def __init__(self, secret_token: str):
        self.secret_token = secret_token
    
    def verify_token(self, request_token: str) -> bool:
        return hmac.compare_digest(self.secret_token, request_token or "")
    
    async def handle_webhook(self, payload: dict, token: str):
        if not self.verify_token(token):
            raise ValueError("Invalid webhook token")
        
        task_id = payload.get("taskId")
        if not task_id:
            raise ValueError("Missing taskId in webhook payload")
        
        # Process webhook safely
        await self.process_task_update(task_id, payload)
Design your application to work with or without push notifications.
class AdaptiveA2AClient:
    def __init__(self, url: str):
        self.client = A2AClient(url)
        self.supports_push = None
    
    async def check_push_support(self):
        try:
            # Try to set a test webhook config
            await self.client.call_method("tasks/pushNotificationConfig/set", {
                "taskId": "test",
                "url": "https://test.example.com"
            })
            self.supports_push = True
        except:
            self.supports_push = False
    
    async def submit_task(self, message: str):
        if self.supports_push is None:
            await self.check_push_support()
        
        if self.supports_push:
            # Use webhooks for notification
            return await self._submit_with_webhook(message)
        else:
            # Fall back to polling
            return await self._submit_with_polling(message)

A2A Protocol

Core A2A server setup and configuration

A2A Tasks

Task lifecycle and state management

A2A Client

Client SDK with polling examples

Async Features

General async processing patterns