import httpx
import json
class SimpleMCPHttpClient:
def __init__(self, base_url):
self.base_url = base_url
async def call_tool(self, name, args):
payload = {
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": name,
"arguments": args
},
"id": 1
}
async with httpx.AsyncClient() as client:
resp = await client.post(self.base_url, json=payload, headers={"Accept": "application/json, text/event-stream"})
if resp.status_code != 200:
return f"Error: {resp.status_code} - {resp.text}"
# Parse SSE response
lines = resp.text.strip().split('\n')
for line in lines:
if line.startswith('data: '):
data = line[6:] # remove 'data: '
json_data = json.loads(data)
if 'result' in json_data and 'content' in json_data['result']:
content = json_data['result']['content']
if content and content[0]['type'] == 'text':
return content[0]['text']
return resp.text # fallback
async def list_tools(self):
payload = {
"jsonrpc": "2.0",
"method": "tools/list",
"params": {},
"id": 2
}
async with httpx.AsyncClient() as client:
resp = await client.post(self.base_url, json=payload, headers={"Accept": "application/json, text/event-stream"})
if resp.status_code != 200:
return f"Error: {resp.status_code} - {resp.text}"
# Parse SSE response
lines = resp.text.strip().split('\n')
for line in lines:
if line.startswith('data: '):
data = line[6:] # remove 'data: '
json_data = json.loads(data)
if 'result' in json_data and 'tools' in json_data['result']:
return json_data['result']['tools']
return [] # fallback