# src/obenan_login/server.py
import asyncio
import os
import platform
import time
import traceback
import requests
import json
from typing import Any
import mcp.types as types
from mcp.server import Server, NotificationOptions
from mcp.server.models import InitializationOptions
import mcp.server.stdio
# 1) Instantiate your MCP server
server = Server("obenan-mcp-server")
# 2) Declare the "login" tool
@server.list_tools()
async def list_tools() -> list[types.Tool]:
return [
types.Tool(
name="fetch_my_locations",
description="Fetch locations from Obenan API using the access token",
inputSchema={
"type": "object",
"properties": {
"access_token": {"type": "string", "description": "Access token for Obenan API. Optional if OBENAN_LOGIN_ACCESS_TOKEN environment variable is set."},
"group_id": {"type": "string", "description": "Group ID to filter locations by. Optional."}
},
"required": []
},
),
types.Tool(
name="get_location_details",
description="Get detailed information about a specific location by ID",
inputSchema={
"type": "object",
"properties": {
"location_id": {"type": "string", "description": "ID of the location to get details for"},
"access_token": {"type": "string", "description": "Access token for Obenan API. Optional if OBENAN_LOGIN_ACCESS_TOKEN environment variable is set."}
},
"required": ["location_id"]
},
),
types.Tool(
name="search_locations_by_name",
description="Search for locations containing a specific name and allow selection to get details",
inputSchema={
"type": "object",
"properties": {
"search_term": {"type": "string", "description": "Term to search for in location names"},
"access_token": {"type": "string", "description": "Access token for Obenan API. Optional if OBENAN_LOGIN_ACCESS_TOKEN environment variable is set."}
},
"required": ["search_term"]
},
),
types.Tool(
name="review_analyzer",
description="Analyze reviews using the Obenan Review Analyzer API",
inputSchema={
"type": "object",
"properties": {
"prompt": {"type": "string", "description": "Question or prompt for the review analyzer"}
},
"required": ["prompt"]
},
)
]
# 3) Handle calls to the tools
@server.call_tool()
async def call_tool(
name: str,
arguments: dict[str, Any] | None
) -> list[types.TextContent]:
if name == "fetch_my_locations":
return await handle_fetch_locations(arguments)
elif name == "get_location_details":
return await handle_location_details(arguments)
elif name == "search_locations_by_name":
return await handle_search_locations_by_name(arguments)
elif name == "review_analyzer":
return await handle_review_analyzer(arguments)
else:
raise RuntimeError(f"Unknown tool: {name}")
# Handle fetch_my_locations tool
async def handle_fetch_locations(
arguments: dict[str, Any] | None
) -> list[types.TextContent]:
# Get token from environment variable or from arguments
import os
access_token = os.environ.get("OBENAN_LOGIN_ACCESS_TOKEN")
# Add debug info about the token (just showing first/last few characters for security)
token_debug = ""
if access_token:
if len(access_token) > 10:
token_debug = f"{access_token[:5]}...{access_token[-5:]}"
else:
token_debug = "[too short to truncate safely]"
else:
token_debug = "[None - Token not found]"
try:
# Direct API call with simplified URL
url = "https://stagingapi.obenan.com/api/v1/location/search?isLocationPage=false&isListingPage=true"
headers = {"Authorization": f"Bearer {access_token}"}
response = requests.get(url, headers=headers)
if response.status_code == 200:
data = response.json()
# Simple output format focusing just on location names
formatted_response = f"β
Location Names: (Token: {token_debug})\n\n"
# Check for data.results path
if data and isinstance(data, dict) and "data" in data:
if isinstance(data["data"], dict) and "results" in data["data"]:
locations = data["data"]["results"]
if locations and isinstance(locations, list):
for i, loc in enumerate(locations):
name = loc.get("name", "Unknown")
formatted_response += f"{i+1}. {name}\n"
else:
formatted_response += "No locations found in results.\n"
else:
formatted_response += "Results field not found in data structure.\n"
else:
formatted_response += "No data field found in response.\n"
# Add response debug info
formatted_response += f"\n===DEBUG INFO===\n"
formatted_response += f"Response Keys: {list(data.keys()) if isinstance(data, dict) else 'Not a dict'}\n"
if isinstance(data, dict) and "data" in data:
data_keys = list(data["data"].keys()) if isinstance(data["data"], dict) else "Not a dict"
formatted_response += f"Data Keys: {data_keys}\n"
if isinstance(data["data"], dict) and "results" in data["data"]:
location_count = len(data["data"]["results"]) if isinstance(data["data"]["results"], list) else 0
formatted_response += f"Location Count: {location_count}\n"
return [types.TextContent(type="text", text=formatted_response)]
else:
error_msg = f"β Failed to fetch locations: HTTP {response.status_code}\n{response.text[:500]}"
return [types.TextContent(type="text", text=error_msg)]
except Exception as e:
error_trace = traceback.format_exc()
return [types.TextContent(
type="text",
text=f"π¨ Error fetching locations: {str(e)}\n\n{error_trace[:500]}"
)]
# Handle get_location_details tool
async def handle_location_details(
arguments: dict[str, Any] | None
) -> list[types.TextContent]:
# Get location ID from arguments
location_id = arguments.get("location_id")
if not location_id:
return [types.TextContent(
type="text",
text="β Error: Location ID is required to get location details"
)]
# Get token from environment variable or from arguments
access_token = os.environ.get("OBENAN_LOGIN_ACCESS_TOKEN")
if not access_token:
return [types.TextContent(
type="text",
text="β Error: Access token is required. Either provide it as an argument or set the OBENAN_LOGIN_ACCESS_TOKEN environment variable."
)]
try:
# Make API call to get location details - updated URL to correct endpoint
url = f"https://stagingapi.obenan.com/api/v1/location/{location_id}"
headers = {
"Authorization": f"Bearer {access_token}",
"Origin": "https://stagingapp.obenan.com"
}
response = requests.get(url, headers=headers)
if response.status_code == 200:
data = response.json()
# Format the response for better readability
formatted_response = f"β
Location Details for ID: {location_id}\n\n"
# Extract and format key information based on the actual response structure
if data and isinstance(data, dict) and "data" in data:
# Check if data.data is a dict with location fields or if it has a nested 'result' field
location_data = data["data"]
# If location_data has a 'result' field, use that instead
if isinstance(location_data, dict) and "result" in location_data:
location_data = location_data["result"]
# Basic information
name = location_data.get("name", "Unknown")
formatted_response += f"Name: {name}\n"
internal_name = location_data.get("internalName", "Unknown")
formatted_response += f"Internal Name: {internal_name}\n"
# Address information
address_line1 = location_data.get("addressLine1", "")
address_line2 = location_data.get("addressLine2", "")
city = location_data.get("city", "")
postal_code = location_data.get("postalCode", "")
country_code = location_data.get("countryCode", "")
address_parts = []
if address_line1:
address_parts.append(address_line1)
if address_line2 and address_line2 != "null":
address_parts.append(address_line2)
if city:
address_parts.append(city)
if postal_code:
address_parts.append(postal_code)
if country_code:
address_parts.append(country_code)
address_str = ", ".join(filter(None, address_parts))
formatted_response += f"Address: {address_str}\n"
# Contact information
formatted_response += f"Phone: {location_data.get('telephone', 'N/A')}\n"
formatted_response += f"Email: {location_data.get('businessEmail', 'N/A')}\n"
formatted_response += f"Website: {location_data.get('website', 'N/A')}\n\n"
# Status and dates
formatted_response += f"Status: {location_data.get('status', 'N/A')}\n"
formatted_response += f"Created: {location_data.get('createdAt', 'N/A')}\n"
formatted_response += f"Updated: {location_data.get('updatedAt', 'N/A')}\n\n"
# Category information
formatted_response += f"Category: {location_data.get('category', 'N/A')}\n"
formatted_response += f"Google Category: {location_data.get('googleCategory', 'N/A')}\n\n"
# Additional details
formatted_response += f"Reporting Enabled: {location_data.get('reportingEnabled', 'N/A')}\n"
formatted_response += f"SoConnect Status: {location_data.get('soconnect_connectivity_status', 'N/A')}\n"
# Company information if available
company = location_data.get('company', {})
if isinstance(company, dict) and company.get('name'):
formatted_response += f"Company: {company.get('name', 'N/A')}\n\n"
# Add debug information about the data structure
formatted_response += "===DATA STRUCTURE===\n"
if "result" in data.get("data", {}):
formatted_response += "Response contains nested 'result' object\n\n"
else:
formatted_response += "Response contains direct location data\n\n"
# Include full JSON response for reference (truncated for readability)
formatted_response += f"===FULL RESPONSE (TRUNCATED)===\n{json.dumps(data, indent=2)[:1000]}...\n"
else:
formatted_response += "No data found in response.\n"
return [types.TextContent(type="text", text=formatted_response)]
else:
# Include full error response
error_msg = f"β Failed to fetch location details: HTTP {response.status_code}\n{response.text}"
return [types.TextContent(type="text", text=error_msg)]
except Exception as e:
error_trace = traceback.format_exc()
return [types.TextContent(
type="text",
text=f"π¨ Error fetching location details: {str(e)}\n\nURL: {url}\n\n{error_trace}"
)]
# Handle search_locations_by_name tool
async def handle_search_locations_by_name(
arguments: dict[str, Any] | None
) -> list[types.TextContent]:
# Get search term from arguments
search_term = arguments.get("search_term")
if not search_term:
return [types.TextContent(
type="text",
text="β Error: Search term is required to search for locations"
)]
# Get token from environment variable or from arguments
access_token = os.environ.get("OBENAN_LOGIN_ACCESS_TOKEN")
if not access_token:
return [types.TextContent(
type="text",
text="β Error: Access token is required. Either provide it as an argument or set the OBENAN_LOGIN_ACCESS_TOKEN environment variable."
)]
try:
# First fetch all locations
url = "https://stagingapi.obenan.com/api/v1/location/search?isLocationPage=false&isListingPage=true"
headers = {"Authorization": f"Bearer {access_token}"}
response = requests.get(url, headers=headers)
if response.status_code == 200:
data = response.json()
# Format the response for better readability
formatted_response = f"π Searching for locations containing: '{search_term}'\n\n"
# Extract and filter locations
matching_locations = []
if data and isinstance(data, dict) and "data" in data:
if isinstance(data["data"], dict) and "results" in data["data"]:
locations = data["data"]["results"]
if locations and isinstance(locations, list):
# Filter locations by search term
for loc in locations:
name = loc.get("name", "")
if search_term.lower() in name.lower():
matching_locations.append(loc)
# Display matching locations
if matching_locations:
formatted_response += f"Found {len(matching_locations)} matching location(s):\n\n"
for i, loc in enumerate(matching_locations):
name = loc.get("name", "Unknown")
location_id = loc.get("id", "Unknown")
formatted_response += f"{i+1}. {name} (ID: {location_id})\n"
# Add instructions for getting details
formatted_response += "\nπ To get details for a specific location, use the get_location_details tool with the location ID.\n"
formatted_response += "Example: get_location_details({\"location_id\": \"[ID_FROM_ABOVE]\"})"
# If there's only one result, automatically show details
if len(matching_locations) == 1:
location_id = matching_locations[0].get("id")
formatted_response += f"\n\nAutomatically fetching details for the only match: {matching_locations[0].get('name')}\n"
formatted_response += "==============================================\n\n"
# Get location details
detail_url = f"https://stagingapi.obenan.com/api/v1/location/{location_id}"
detail_headers = {
"Authorization": f"Bearer {access_token}",
"Origin": "https://stagingapp.obenan.com"
}
detail_response = requests.get(detail_url, headers=detail_headers)
if detail_response.status_code == 200:
detail_data = detail_response.json()
# Format the details
if detail_data and isinstance(detail_data, dict) and "data" in detail_data:
# Check if data.data is a dict with location fields or if it has a nested 'result' field
location_data = detail_data["data"]
# If location_data has a 'result' field, use that instead
if isinstance(location_data, dict) and "result" in location_data:
location_data = location_data["result"]
# Basic information
name = location_data.get("name", "Unknown")
formatted_response += f"Name: {name}\n"
internal_name = location_data.get("internalName", "Unknown")
formatted_response += f"Internal Name: {internal_name}\n"
# Address information
address_line1 = location_data.get("addressLine1", "")
address_line2 = location_data.get("addressLine2", "")
city = location_data.get("city", "")
postal_code = location_data.get("postalCode", "")
country_code = location_data.get("countryCode", "")
address_parts = []
if address_line1:
address_parts.append(address_line1)
if address_line2 and address_line2 != "null":
address_parts.append(address_line2)
if city:
address_parts.append(city)
if postal_code:
address_parts.append(postal_code)
if country_code:
address_parts.append(country_code)
address_str = ", ".join(filter(None, address_parts))
formatted_response += f"Address: {address_str}\n"
# Contact information
formatted_response += f"Phone: {location_data.get('telephone', 'N/A')}\n"
formatted_response += f"Email: {location_data.get('businessEmail', 'N/A')}\n"
formatted_response += f"Website: {location_data.get('website', 'N/A')}\n\n"
else:
formatted_response += f"\nCould not fetch details automatically: HTTP {detail_response.status_code}\n"
else:
formatted_response += f"No locations found matching '{search_term}'.\n"
return [types.TextContent(type="text", text=formatted_response)]
else:
error_msg = f"β Failed to search locations: HTTP {response.status_code}\n{response.text[:500]}"
return [types.TextContent(type="text", text=error_msg)]
except Exception as e:
error_trace = traceback.format_exc()
return [types.TextContent(
type="text",
text=f"π¨ Error searching locations: {str(e)}\n\n{error_trace[:500]}"
)]
# Handle review analyzer tool
async def handle_review_analyzer(
arguments: dict[str, Any] | None
) -> list[types.TextContent]:
prompt = arguments.get("prompt")
if not prompt:
return [types.TextContent(
type="text",
text="β Error: Prompt is required for review analysis"
)]
try:
# Use access token from environment for authorization if needed
access_token = os.environ.get("OBENAN_LOGIN_ACCESS_TOKEN")
# Prepare the payload with user prompt and hardcoded values
payload = {
"prompt": prompt,
"location_id": [471, 472, 475],
"thirdPartyReviewSourcesId": [69],
"companyId": [175]
}
# Set up headers if token is available
headers = {}
if access_token:
headers["Authorization"] = f"Bearer {access_token}"
headers["Content-Type"] = "application/json"
# Make the POST request
url = "https://reviewanalyser.obenan.com/chat"
response = requests.post(url, json=payload, headers=headers)
if response.status_code == 200:
data = response.json()
# Format the response for better readability
formatted_response = f"β
Review Analysis Result\n\n"
# Add text response if available
if "response" in data:
formatted_response += f"Analysis: {data['response']}\n\n"
# Format graph data if available
if "graph_response" in data and isinstance(data["graph_response"], dict):
graph = data["graph_response"]
formatted_response += f"Chart Type: {graph.get('chart_type', 'Unknown')}\n"
# Handle columns
if "columns" in graph and isinstance(graph["columns"], list):
formatted_response += f"Columns: {', '.join(graph['columns'])}\n\n"
# Handle data
if "data" in graph and isinstance(graph["data"], list):
formatted_response += f"Data:\n"
for item in graph["data"]:
for key, value in item.items():
formatted_response += f" {key}: {value}\n"
formatted_response += "\n"
# Include full JSON response for reference
formatted_response += f"\n===FULL RESPONSE===\n{json.dumps(data, indent=2)}"
return [types.TextContent(type="text", text=formatted_response)]
else:
error_msg = f"β Failed to analyze reviews: HTTP {response.status_code}\n{response.text[:500]}"
return [types.TextContent(type="text", text=error_msg)]
except Exception as e:
error_trace = traceback.format_exc()
return [types.TextContent(
type="text",
text=f"π¨ Error analyzing reviews: {str(e)}\n\n{error_trace[:500]}"
)]
# 10) Wire up the stdio transport and entrypoint
async def main() -> None:
async with mcp.server.stdio.stdio_server() as (reader, writer):
await server.run(
reader,
writer,
InitializationOptions(
server_name="obenan-mcp-server",
server_version="0.1.0",
capabilities=server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
),
)