Skip to main content
Glama

Model Context Protocol Server

rest.py1.74 kB
from typing import Dict, Any from .base import BaseContextProvider import aiohttp class RESTContextProvider(BaseContextProvider): def __init__(self, base_url: str, headers: Dict[str, str] = None): self.base_url = base_url self.headers = headers or {} self.session = None async def initialize(self): """Initialize the HTTP session""" if not self.session: self.session = aiohttp.ClientSession(headers=self.headers) async def get_context(self, query: str, **kwargs) -> Dict[str, Any]: """Get context from REST API""" if not self.session: await self.initialize() # Example implementation - modify based on your needs endpoint = f"{self.base_url}/search" params = { "q": query, **kwargs } async with self.session.get(endpoint, params=params) as response: result = await response.json() return { "data": result, "source": "rest", "query": query } async def validate_connection(self) -> bool: """Validate the REST API connection""" try: if not self.session: await self.initialize() # Try to access a health check or status endpoint async with self.session.get(f"{self.base_url}/health") as response: return response.status == 200 except Exception as e: print(f"REST API connection error: {e}") return False async def close(self): """Close the HTTP session""" if self.session: await self.session.close() self.session = None

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Shekharmaheswari85/MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server