The MCP SSE (Server-Sent Events) Transport module provides real-time communication between PraisonAI agents and MCP servers using Server-Sent Events. This transport layer enables agents to use tools from MCP servers that communicate via SSE instead of stdio.
from praisonaiagents.mcp import MCP

# Create MCP client with SSE transport
mcp = MCP(
    server_url="http://localhost:8080/sse",
    transport="sse",
)

# The client automatically connects and discovers available tools
3
Use MCP tools with agents
Copy
from praisonaiagents import Agent

# Create agent with MCP tools (uses the `mcp` client created in the previous step)
agent = Agent(
    name="Assistant",
    role="AI Assistant with MCP tools",
    tools=mcp.get_tools(),
)

# Agent can now use all tools from the MCP server
response = agent.chat("Search for Python tutorials")
async def connect(self) -> None:
    """
    Establish the SSE connection and discover server capabilities.
    """
discover_tools()
Copy
async def discover_tools(self) -> List[Dict[str, Any]]:
    """
    Discover the tools that the MCP server makes available.

    Returns:
        List of tool definitions with schemas
    """
invoke_tool()
Copy
async def invoke_tool(
    self,
    name: str,
    arguments: Dict[str, Any],
) -> Any:
    """
    Invoke a tool on the MCP server.

    Args:
        name: Tool name
        arguments: Tool arguments

    Returns:
        Tool execution result
    """
from praisonaiagents.mcp import MCP
from praisonaiagents import Agent

# Connect to an SSE MCP server
mcp = MCP(
    server_url="http://localhost:3000/sse",
    transport="sse",
)

# Create agent with MCP tools
agent = Agent(
    name="SearchAgent",
    role="Web Search Specialist",
    tools=mcp.get_tools(),
)

# Use the tools
result = agent.chat("Find the latest news about AI")
# SSE transport automatically handles streaming responses
agent = Agent(
    name="StreamingAgent",
    tools=mcp.get_tools(),
)

# Tool responses can include streamed data
response = agent.chat("Generate a long report about climate change")

# The SSE transport will handle incremental updates automatically
class StreamProcessor(SSEMCPClient):
    """SSE client that buffers "stream" events and processes them in batches.

    Events whose ``type`` is ``"stream"`` are collected in ``stream_buffer``;
    every other event is forwarded unchanged to the parent class handler.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the parent client and start with an empty buffer."""
        super().__init__(*args, **kwargs)
        self.stream_buffer = []

    async def _handle_event(self, event: Dict[str, Any]):
        """Buffer a "stream" event, or delegate any other event to the parent.

        Args:
            event: Event dictionary; its ``"type"`` and ``"data"`` keys are read.
        """
        if event.get("type") == "stream":
            # Buffer streaming data
            self.stream_buffer.append(event.get("data"))

            # Process when buffer is full
            if len(self.stream_buffer) >= 10:
                await self._process_buffer()
        else:
            await super()._handle_event(event)

    async def _process_buffer(self):
        """Combine the buffered stream data, process it, and empty the buffer."""
        # Process buffered stream data.
        # NOTE(review): str.join assumes every buffered item is a str; a
        # missing "data" key would have buffered None - confirm event schema.
        combined_data = "".join(self.stream_buffer)
        # ... process combined data ...
        self.stream_buffer.clear()