Skip to main content
Glama

Poe Proxy MCP Server

openai_client.py16.4 kB
""" OpenAI-compatible POE API client for dual-client architecture. This module provides OpenAI SDK-based client for POE's new REST API endpoint, supporting function calling, advanced parameters, and proper error handling. """ import os import json import time import uuid import asyncio from typing import Dict, List, Optional, Any, AsyncGenerator, Union from concurrent.futures import ThreadPoolExecutor import httpx import openai from openai import AsyncOpenAI, OpenAI from pydantic import BaseModel, Field from loguru import logger from utils import ( PoeProxyError, AuthenticationError, PoeApiError, handle_exception, ) # Tool execution registry TOOL_REGISTRY: Dict[str, callable] = {} class OpenAITool(BaseModel): """OpenAI-compatible tool definition.""" type: str = "function" function: Dict[str, Any] class OpenAIMessage(BaseModel): """OpenAI-compatible message format.""" role: str content: Optional[str] = None name: Optional[str] = None tool_calls: Optional[List[Dict[str, Any]]] = None tool_call_id: Optional[str] = None class OpenAICompletionRequest(BaseModel): """OpenAI-compatible chat completion request.""" model: str messages: List[Dict[str, Any]] tools: Optional[List[Dict[str, Any]]] = None tool_choice: Optional[Union[str, Dict[str, Any]]] = None parallel_tool_calls: Optional[bool] = None max_tokens: Optional[int] = None temperature: Optional[float] = Field(None, ge=0, le=2) top_p: Optional[float] = Field(None, ge=0, le=1) stop: Optional[Union[str, List[str]]] = None stream: Optional[bool] = False stream_options: Optional[Dict[str, Any]] = None user: Optional[str] = None class PoeOpenAIClient: """ OpenAI-compatible client for POE API. This client uses the OpenAI SDK to interact with POE's REST API, providing full support for function calling, streaming, and advanced parameters. """ def __init__( self, api_key: str, base_url: str = "https://api.poe.com/v1", async_mode: bool = True, debug_mode: bool = False ): """ Initialize the OpenAI-compatible POE client. Args: api_key: POE API key base_url: Base URL for POE API (default: https://api.poe.com/v1) async_mode: Whether to use async client debug_mode: Enable debug logging """ if not api_key: raise AuthenticationError( "POE API key is required. Get your API key from https://poe.com/api_key" ) self.api_key = api_key self.base_url = base_url self.debug_mode = debug_mode # Initialize OpenAI clients if async_mode: self.client = AsyncOpenAI( api_key=api_key, base_url=base_url, timeout=60.0, max_retries=3, ) else: self.client = OpenAI( api_key=api_key, base_url=base_url, timeout=60.0, max_retries=3, ) self.async_mode = async_mode logger.info(f"POE OpenAI client initialized (async={async_mode})") def register_tool(self, name: str, func: callable, description: str = "", parameters: Dict = None): """ Register a tool for function calling. Args: name: Tool name func: Callable function description: Tool description parameters: JSON schema for parameters """ TOOL_REGISTRY[name] = func logger.debug(f"Registered tool: {name}") def get_tool_definition(self, name: str, description: str, parameters: Dict) -> Dict: """ Create OpenAI-compatible tool definition. Args: name: Tool name description: Tool description parameters: JSON schema for parameters Returns: OpenAI tool definition """ return { "type": "function", "function": { "name": name, "description": description, "parameters": parameters } } async def execute_tool_call(self, tool_call: Dict) -> Dict: """ Execute a single tool call. Args: tool_call: Tool call from OpenAI response Returns: Tool execution result """ name = tool_call["function"]["name"] try: args = json.loads(tool_call["function"]["arguments"]) if name not in TOOL_REGISTRY: raise ValueError(f"Unknown tool: {name}") func = TOOL_REGISTRY[name] # Handle async and sync functions if asyncio.iscoroutinefunction(func): result = await func(**args) else: result = func(**args) return { "tool_call_id": tool_call["id"], "output": json.dumps(result) if not isinstance(result, str) else result } except Exception as e: logger.error(f"Tool execution failed for {name}: {str(e)}") return { "tool_call_id": tool_call["id"], "output": json.dumps({"error": str(e)}) } async def process_tool_calls(self, tool_calls: List[Dict]) -> List[Dict]: """ Process multiple tool calls, potentially in parallel. Args: tool_calls: List of tool calls from OpenAI response Returns: List of tool execution results """ if not tool_calls: return [] # Execute tool calls concurrently tasks = [self.execute_tool_call(call) for call in tool_calls] results = await asyncio.gather(*tasks) return results async def chat_completion( self, model: str, messages: List[Dict[str, Any]], tools: Optional[List[Dict[str, Any]]] = None, tool_choice: Optional[Union[str, Dict[str, Any]]] = None, parallel_tool_calls: Optional[bool] = None, max_tokens: Optional[int] = None, temperature: Optional[float] = None, top_p: Optional[float] = None, stop: Optional[Union[str, List[str]]] = None, stream: bool = False, stream_options: Optional[Dict[str, Any]] = None, user: Optional[str] = None, auto_execute_tools: bool = True, ) -> Union[Dict, AsyncGenerator]: """ Create a chat completion with OpenAI-compatible API. Args: model: Model name (e.g., "Claude-Opus-4.1", "Gemini-2.5-Pro") messages: Conversation messages tools: Tool definitions tool_choice: Tool selection strategy parallel_tool_calls: Enable parallel tool execution max_tokens: Maximum tokens to generate temperature: Sampling temperature (0-2) top_p: Nucleus sampling parameter stop: Stop sequences stream: Enable streaming response stream_options: Streaming configuration user: User identifier for sessions auto_execute_tools: Automatically execute tool calls Returns: Chat completion response or stream """ try: # Build request parameters params = { "model": model, "messages": messages, "stream": stream, } # Add optional parameters if tools: params["tools"] = tools if tool_choice is not None: params["tool_choice"] = tool_choice if parallel_tool_calls is not None: params["parallel_tool_calls"] = parallel_tool_calls if max_tokens is not None: params["max_tokens"] = max_tokens if temperature is not None: params["temperature"] = temperature if top_p is not None: params["top_p"] = top_p if stop is not None: params["stop"] = stop if stream_options: params["stream_options"] = stream_options if user: params["user"] = user if self.debug_mode: logger.debug(f"Chat completion request: model={model}, stream={stream}, tools={bool(tools)}") # Make the API call if self.async_mode: response = await self.client.chat.completions.create(**params) else: response = self.client.chat.completions.create(**params) # Handle streaming response if stream: return self._handle_stream(response) # Handle tool calls if present and auto-execution is enabled if auto_execute_tools and hasattr(response.choices[0].message, 'tool_calls') and response.choices[0].message.tool_calls: tool_calls = [ { "id": tc.id, "function": { "name": tc.function.name, "arguments": tc.function.arguments } } for tc in response.choices[0].message.tool_calls ] # Execute tools tool_results = await self.process_tool_calls(tool_calls) # Add tool results to messages for result in tool_results: messages.append({ "role": "tool", "tool_call_id": result["tool_call_id"], "content": result["output"] }) # Make another call to get final response return await self.chat_completion( model=model, messages=messages, tools=tools, max_tokens=max_tokens, temperature=temperature, top_p=top_p, stop=stop, stream=stream, user=user, auto_execute_tools=False # Prevent infinite recursion ) # Convert response to dict for compatibility return self._response_to_dict(response) except openai.AuthenticationError as e: raise AuthenticationError(f"POE API authentication failed: {str(e)}") except openai.RateLimitError as e: # Extract retry-after header if available retry_after = getattr(e, 'retry_after', None) raise PoeApiError(f"Rate limit exceeded. Retry after: {retry_after}") except openai.APIError as e: raise PoeApiError(f"POE API error: {str(e)}") except Exception as e: logger.error(f"Unexpected error in chat completion: {str(e)}") raise PoeProxyError(f"Chat completion failed: {str(e)}") async def _handle_stream(self, stream) -> AsyncGenerator: """Handle streaming response.""" try: async for chunk in stream: yield { "id": chunk.id, "object": "chat.completion.chunk", "created": chunk.created, "model": chunk.model, "choices": [ { "index": choice.index, "delta": { "content": choice.delta.content, "tool_calls": choice.delta.tool_calls if hasattr(choice.delta, 'tool_calls') else None }, "finish_reason": choice.finish_reason } for choice in chunk.choices ] } except Exception as e: logger.error(f"Streaming error: {str(e)}") raise PoeApiError(f"Streaming failed: {str(e)}") def _response_to_dict(self, response) -> Dict: """Convert OpenAI response object to dictionary.""" return { "id": response.id, "object": response.object, "created": response.created, "model": response.model, "choices": [ { "index": choice.index, "message": { "role": choice.message.role, "content": choice.message.content, "tool_calls": [ { "id": tc.id, "type": tc.type, "function": { "name": tc.function.name, "arguments": tc.function.arguments } } for tc in (choice.message.tool_calls or []) ] if hasattr(choice.message, 'tool_calls') else None }, "finish_reason": choice.finish_reason } for choice in response.choices ], "usage": { "prompt_tokens": response.usage.prompt_tokens if response.usage else 0, "completion_tokens": response.usage.completion_tokens if response.usage else 0, "total_tokens": response.usage.total_tokens if response.usage else 0, } if hasattr(response, 'usage') else None } @staticmethod def map_error_to_openai_format(error: Exception) -> Dict: """ Map internal errors to OpenAI error format. Args: error: Exception to map Returns: OpenAI-compatible error response """ error_mapping = { AuthenticationError: ("authentication_error", 401), PoeApiError: ("api_error", 500), ValueError: ("invalid_request_error", 400), } error_type, status_code = error_mapping.get( type(error), ("internal_error", 500) ) return { "error": { "message": str(error), "type": error_type, "code": status_code, } } # Example tool implementations def example_get_weather(city: str) -> Dict: """Example weather tool.""" return {"weather": f"Sunny in {city}", "temperature": "72°F"} def example_calculate(expression: str) -> Dict: """Example calculator tool.""" try: result = eval(expression, {"__builtins__": {}}, {}) return {"result": result} except Exception as e: return {"error": str(e)} # Tool definitions for examples EXAMPLE_TOOL_DEFINITIONS = [ { "type": "function", "function": { "name": "get_weather", "description": "Get weather information for a city", "parameters": { "type": "object", "properties": { "city": { "type": "string", "description": "City name" } }, "required": ["city"] } } }, { "type": "function", "function": { "name": "calculate", "description": "Perform mathematical calculation", "parameters": { "type": "object", "properties": { "expression": { "type": "string", "description": "Mathematical expression to evaluate" } }, "required": ["expression"] } } } ]

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Anansitrading/poe-proxy-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server