"""
MCP Protocol Handler for Zintlr MCP Server.
Implements the Model Context Protocol (MCP) JSON-RPC 2.0 interface.
Handles initialization, tool listing, and tool execution.
Protocol Reference: https://modelcontextprotocol.io/specification/2025-06-18
"""
import json
from typing import Any
from fastapi import Request
from fastapi.responses import JSONResponse
from app.config import settings
from app.session import session_manager
from app.api_client import ZintlrAPIClient
# ============ Tool Definitions ============
# Catalogue of tools advertised to MCP clients.
# handle_tools_list returns this list unchanged as the "tools" array, and every
# "name" below has a matching branch in execute_tool. Each "inputSchema" is a
# JSON-Schema object describing the "arguments" a client sends with tools/call.
TOOL_DEFINITIONS = [
# --- search_prospects: main filtered search (tool_search_prospects) ---
# NOTE: normalize_company_filters / normalize_person_filters also read filter
# keys that are not listed in this schema (e.g. company_keywords,
# company_revenue, company_foundation, job_functions, contact_details).
{
"name": "search_prospects",
"description": """Search for people or companies using filters.
Use this tool to find prospects by location, industry, job title, company size, etc.
For searching by company name or domain, use search_by_company_name or search_by_company_domain first to get company_id, then use it in filters.""",
"inputSchema": {
"type": "object",
"properties": {
"search_query": {
"type": "string",
"description": "LinkedIn profile URL (person or company). Leave empty when using filters only."
},
"data_type": {
"type": "string",
"enum": ["person", "company"],
"description": "Type of search: 'person' for people, 'company' for companies",
"default": "person"
},
"filters": {
"type": "object",
"description": "Search filters with 'company' and 'person' sub-objects",
"properties": {
"company": {
"type": "object",
"properties": {
"company_id": {"type": "object", "properties": {"include": {"type": "array"}, "exclude": {"type": "array"}}},
"company_domains": {"type": "object", "properties": {"include": {"type": "array"}, "exclude": {"type": "array"}}},
"company_hq_location": {"type": "object"},
"company_industry": {"type": "object", "properties": {"include": {"type": "array"}, "exclude": {"type": "array"}}},
"company_size": {"type": "array", "description": "Size IDs: 1=10k+, 2=5k-10k, 3=1k-5k, 4=501-1k, 5=201-500, 6=51-200, 7=11-50, 8=1-10"},
"company_technology": {"type": "object", "properties": {"include": {"type": "array"}, "exclude": {"type": "array"}}}
}
},
"person": {
"type": "object",
"properties": {
"person_location": {"type": "object"},
"job_title_keyword": {"type": "object", "properties": {"include": {"type": "array"}, "exclude": {"type": "array"}}},
"seniority": {"type": "array", "description": "IDs: 0=Executive, 1=VP, 2=Director, 3=Manager, 4=Others"}
}
}
}
},
"page_num": {"type": "integer", "description": "Page number (0-indexed)", "default": 0},
# tool_search_prospects caps this value at 100 before calling the API.
"rows_per_page": {"type": "integer", "description": "Results per page (max 100)", "default": 10}
}
}
},
# --- Autocomplete / lookup helpers ---
{
"name": "search_by_company_name",
"description": "Search for companies by name (autocomplete). Returns company_id which can be used in search_prospects filters.",
"inputSchema": {
"type": "object",
"properties": {
"company_name": {"type": "string", "description": "Company name to search for"}
},
"required": ["company_name"]
}
},
{
"name": "search_by_company_domain",
"description": "Find a company by its website domain. Returns company info including company_id.",
"inputSchema": {
"type": "object",
"properties": {
"domain": {"type": "string", "description": "Company website domain (e.g., 'google.com')"}
},
"required": ["domain"]
}
},
{
"name": "search_by_job_title",
"description": "Search for job title suggestions (autocomplete). Returns job titles that can be used in filters.",
"inputSchema": {
"type": "object",
"properties": {
"job_title": {"type": "string", "description": "Job title keyword to search for"}
},
"required": ["job_title"]
}
},
{
"name": "search_by_location",
"description": "Search for location suggestions (autocomplete). Returns location names for filters.",
"inputSchema": {
"type": "object",
"properties": {
"location": {"type": "string", "description": "Location name to search for"},
# tool_search_by_location picks the API endpoint from this value.
"location_type": {"type": "string", "enum": ["person", "company"], "default": "person"}
},
"required": ["location"]
}
},
{
"name": "search_by_technology",
"description": "Search for technology/tech stack suggestions. Returns tech names for company_technology filter.",
"inputSchema": {
"type": "object",
"properties": {
"technology": {"type": "string", "description": "Technology name to search for"}
},
"required": ["technology"]
}
},
# --- Profile retrieval ---
{
"name": "get_person_profile",
"description": "Get detailed profile for a person. Use after finding them via search_prospects.",
"inputSchema": {
"type": "object",
"properties": {
"profile_id": {"type": "string", "description": "Person's _id from search_prospects results"}
},
"required": ["profile_id"]
}
},
# No "required" list here: tool_get_company_profile itself rejects calls that
# supply neither company_id nor domain, and prefers company_id when both are given.
{
"name": "get_company_profile",
"description": "Get detailed company profile including overview, tech stack, and employee info.",
"inputSchema": {
"type": "object",
"properties": {
"company_id": {"type": "string", "description": "Company's _id from search results"},
"domain": {"type": "string", "description": "Company domain (alternative to company_id)"}
}
}
},
# --- Contact unlock ---
# The 25-ID limit in the description is enforced in tool_unlock_contact_info.
{
"name": "unlock_contact_info",
"description": "Unlock contact information (email/phone) for person profiles. This consumes credits.",
"inputSchema": {
"type": "object",
"properties": {
"profile_ids": {
"type": "array",
"items": {"type": "string"},
"description": "List of person profile IDs to unlock (max 25)"
},
"unlock_type": {
"type": "string",
"enum": ["email", "phone", "both"],
"description": "Type of contact info to unlock",
"default": "both"
}
},
"required": ["profile_ids"]
}
},
# --- Account and saved-search tools ---
{
"name": "fetch_profile",
"description": "Fetch the currently logged-in user's profile information.",
"inputSchema": {
"type": "object",
"properties": {
"include_permissions": {"type": "boolean", "default": False},
"include_invitation_data": {"type": "boolean", "default": False},
"include_onboarding": {"type": "boolean", "default": False}
}
}
},
{
"name": "get_search_history",
"description": "Retrieve user's recent search history.",
"inputSchema": {"type": "object", "properties": {}}
},
{
"name": "get_saved_searches",
"description": "Retrieve user's saved search templates.",
"inputSchema": {"type": "object", "properties": {}}
},
{
"name": "save_search",
"description": "Save a search template for future reuse.",
"inputSchema": {
"type": "object",
"properties": {
"name": {"type": "string", "description": "Descriptive name for this search"},
"filters": {"type": "object", "description": "The filters object from a search"},
"data_type": {"type": "string", "enum": ["person", "company"], "default": "person"},
"search_query": {"type": "string", "default": ""}
},
"required": ["name", "filters"]
}
}
]
# ============ JSON-RPC Helpers ============
def jsonrpc_response(id: Any, result: Any) -> dict:
    """Build the envelope for a successful JSON-RPC 2.0 reply.

    Args:
        id: Request identifier echoed back under the ``id`` key.
        result: Payload stored under the ``result`` key.

    Returns:
        A new dict with the keys ``jsonrpc``, ``id`` and ``result``.
    """
    envelope = dict(jsonrpc="2.0", id=id)
    envelope["result"] = result
    return envelope
def jsonrpc_error(id: Any, code: int, message: str, data: Any = None) -> dict:
    """Build the envelope for a JSON-RPC 2.0 error reply.

    Args:
        id: Request identifier echoed back under the ``id`` key.
        code: Numeric error code.
        message: Human-readable error description.
        data: Optional extra detail; the ``data`` key is only present in the
            error object when this is not ``None``.

    Returns:
        A new dict with the keys ``jsonrpc``, ``id`` and ``error``.
    """
    detail: dict = {"code": code, "message": message}
    if data is not None:
        detail.update(data=data)
    return dict(jsonrpc="2.0", id=id, error=detail)
# Standard JSON-RPC error codes
# These are passed as the ``code`` argument of jsonrpc_error. Within this module
# only INVALID_PARAMS (handle_tool_call) and METHOD_NOT_FOUND (handle_mcp_request)
# are referenced; the others are presumably used by callers elsewhere — verify
# before removing.
PARSE_ERROR = -32700
INVALID_REQUEST = -32600
METHOD_NOT_FOUND = -32601
INVALID_PARAMS = -32602
INTERNAL_ERROR = -32603
# Custom error codes
# UNAUTHORIZED: returned by handle_mcp_request when the session is missing or
# invalid, and by handle_tool_call when tool execution raises ValueError.
UNAUTHORIZED = -32001
# TOOL_EXECUTION_ERROR: returned by handle_tool_call for any other exception
# raised while running a tool.
TOOL_EXECUTION_ERROR = -32002
# ============ MCP Handlers ============
async def handle_initialize(request_id: Any, params: dict) -> dict:
    """Answer the MCP ``initialize`` request.

    The reply is assembled purely from ``settings``; ``params`` is accepted
    for a uniform handler signature but is not read.

    Args:
        request_id: JSON-RPC id of the request being answered.
        params: Request parameters (unused).

    Returns:
        A JSON-RPC success envelope holding the protocol version, the
        advertised capabilities and the server name and version.
    """
    protocol_version = settings.mcp_protocol_version
    # Only the tools capability is advertised, without list-change notifications.
    capabilities = {"tools": {"listChanged": False}}
    server_info = {
        "name": settings.mcp_server_name,
        "version": settings.mcp_server_version,
    }
    result = {
        "protocolVersion": protocol_version,
        "capabilities": capabilities,
        "serverInfo": server_info,
    }
    return jsonrpc_response(request_id, result)
async def handle_tools_list(request_id: Any, params: dict) -> dict:
    """Answer the MCP ``tools/list`` request.

    Args:
        request_id: JSON-RPC id of the request being answered.
        params: Request parameters (unused; the full list is always returned).

    Returns:
        A JSON-RPC success envelope whose result carries ``TOOL_DEFINITIONS``
        under the ``tools`` key.
    """
    listing = {"tools": TOOL_DEFINITIONS}
    return jsonrpc_response(request_id, listing)
async def handle_tool_call(
    request_id: Any,
    params: dict,
    user_tokens: dict[str, str],
    client_ip: str,
) -> dict:
    """Handle MCP tools/call request.

    Validates the request, runs the named tool through ``execute_tool`` and
    wraps its output as a single text content item.

    Args:
        request_id: JSON-RPC id of the request being answered.
        params: Request parameters; ``name`` selects the tool and the optional
            ``arguments`` object is passed to it.
        user_tokens: Tokens handed to ``ZintlrAPIClient``.
        client_ip: Client address handed to ``ZintlrAPIClient``.

    Returns:
        A JSON-RPC success envelope with the tool output, or an error envelope:
        ``INVALID_PARAMS`` for a missing or unknown tool name or non-object
        arguments, ``UNAUTHORIZED`` when execution raises ``ValueError``, and
        ``TOOL_EXECUTION_ERROR`` for any other exception.
    """
    tool_name = params.get("name")
    if not tool_name:
        return jsonrpc_error(request_id, INVALID_PARAMS, "Missing tool name")

    # Reject unknown tools before doing any work. Otherwise execute_tool's
    # ValueError would be caught below and misreported as UNAUTHORIZED.
    known_tools = {tool["name"] for tool in TOOL_DEFINITIONS}
    if not isinstance(tool_name, str) or tool_name not in known_tools:
        return jsonrpc_error(request_id, INVALID_PARAMS, f"Unknown tool: {tool_name}")

    # "arguments" is optional; an explicit null is treated like an omitted
    # value, while any other non-object is a malformed request.
    tool_args = params.get("arguments")
    if tool_args is None:
        tool_args = {}
    elif not isinstance(tool_args, dict):
        return jsonrpc_error(request_id, INVALID_PARAMS, "Tool arguments must be an object")

    try:
        # Create API client with user's tokens
        client = ZintlrAPIClient(user_tokens, client_ip)
        # Execute the appropriate tool
        result = await execute_tool(client, tool_name, tool_args)
    except ValueError as e:
        # NOTE(review): presumably ZintlrAPIClient raises ValueError for
        # credential problems — confirm against the client implementation.
        return jsonrpc_error(request_id, UNAUTHORIZED, str(e))
    except Exception as e:
        # Top-level boundary: any tool failure becomes a JSON-RPC error
        # instead of propagating to the HTTP layer.
        return jsonrpc_error(request_id, TOOL_EXECUTION_ERROR, f"Tool execution failed: {str(e)}")

    text = result if isinstance(result, str) else json.dumps(result, indent=2)
    return jsonrpc_response(request_id, {
        "content": [
            {
                "type": "text",
                "text": text
            }
        ]
    })
async def handle_notifications_initialized(request_id: Any, params: dict) -> None:
    """Acknowledge ``notifications/initialized``.

    Notifications get no JSON-RPC reply, so nothing is computed here and both
    arguments are ignored.
    """
    return
# ============ Tool Execution ============
async def execute_tool(client: ZintlrAPIClient, tool_name: str, args: dict) -> Any:
    """Run the tool called *tool_name* and hand back whatever it returns.

    Args:
        client: API client passed through to the tool implementation.
        tool_name: Name of the tool to run.
        args: Tool arguments passed through unchanged.

    Returns:
        The awaited result of the matching ``tool_*`` coroutine.

    Raises:
        ValueError: If *tool_name* matches none of the known tools.
    """
    # Ordered (name, implementation) pairs. They are scanned with ``==`` so a
    # name of any type is compared, never hashed.
    handlers = (
        ("search_prospects", tool_search_prospects),
        ("search_by_company_name", tool_search_by_company_name),
        ("search_by_company_domain", tool_search_by_company_domain),
        ("search_by_job_title", tool_search_by_job_title),
        ("search_by_location", tool_search_by_location),
        ("search_by_technology", tool_search_by_technology),
        ("get_person_profile", tool_get_person_profile),
        ("get_company_profile", tool_get_company_profile),
        ("unlock_contact_info", tool_unlock_contact_info),
        ("fetch_profile", tool_fetch_profile),
        ("get_search_history", tool_get_search_history),
        ("get_saved_searches", tool_get_saved_searches),
        ("save_search", tool_save_search),
    )
    for known_name, implementation in handlers:
        if tool_name == known_name:
            return await implementation(client, args)
    raise ValueError(f"Unknown tool: {tool_name}")
# ============ Tool Implementations ============
def ensure_include_exclude(filter_val: dict | None) -> dict:
"""Ensure filter has both include and exclude arrays."""
if not filter_val or not isinstance(filter_val, dict):
return {"include": [], "exclude": []}
return {
"include": filter_val.get("include", []),
"exclude": filter_val.get("exclude", [])
}
def normalize_company_filters(filters: dict) -> dict:
"""Normalize company filters to proper API format."""
provided_location = filters.get("company_hq_location", {})
merged_location = {
"continent": ensure_include_exclude(provided_location.get("continent")),
"country": ensure_include_exclude(provided_location.get("country")),
"state": ensure_include_exclude(provided_location.get("state")),
"city": ensure_include_exclude(provided_location.get("city")),
"zipcode": ensure_include_exclude(provided_location.get("zipcode"))
}
return {
"company_id": ensure_include_exclude(filters.get("company_id")),
"company_domains": ensure_include_exclude(filters.get("company_domains")),
"company_hq_location": merged_location,
"company_industry": ensure_include_exclude(filters.get("company_industry")),
"company_technology": ensure_include_exclude(filters.get("company_technology")),
"company_keywords": ensure_include_exclude(filters.get("company_keywords")),
"company_size": filters.get("company_size", []),
"company_revenue": filters.get("company_revenue", []),
"company_foundation": filters.get("company_foundation", {"start": "", "end": ""}),
"company_desc": ensure_include_exclude(filters.get("company_desc"))
}
def normalize_person_filters(filters: dict) -> dict:
"""Normalize person filters to proper API format."""
provided_location = filters.get("person_location", {})
merged_location = {
"continent": ensure_include_exclude(provided_location.get("continent")),
"country": ensure_include_exclude(provided_location.get("country")),
"state": ensure_include_exclude(provided_location.get("state")),
"city": ensure_include_exclude(provided_location.get("city")),
"zipcode": ensure_include_exclude(provided_location.get("zipcode"))
}
return {
"name": filters.get("name", ""),
"contact_details": filters.get("contact_details", []),
"job_functions": filters.get("job_functions", []),
"seniority": filters.get("seniority", []),
"person_location": merged_location,
"job_title": ensure_include_exclude(filters.get("job_title")),
"job_title_keyword": ensure_include_exclude(filters.get("job_title_keyword")),
"industry": ensure_include_exclude(filters.get("industry")),
"person_desc": ensure_include_exclude(filters.get("person_desc"))
}
async def tool_search_prospects(client: ZintlrAPIClient, args: dict) -> str:
    """Search for people or companies.

    Builds the ``prospecting/search`` payload from *args*, tolerating
    explicit nulls and wrongly typed values, and formats the reply as text.

    Args:
        client: API client used to POST the search.
        args: Tool arguments: ``search_query``, ``data_type``, ``filters``,
            ``page_num`` and ``rows_per_page``, all optional.

    Returns:
        A short summary followed by the prospects as indented JSON, or a
        ``Search failed: ...`` message when the API does not report success.
    """
    data_type_map = {"person": 1, "company": 2}
    data_type_val = data_type_map.get(args.get("data_type", "person"), 1)

    # Clients may send explicit nulls; treat any non-object as "no filters".
    filters = args.get("filters")
    if not isinstance(filters, dict):
        filters = {}
    company_filters = filters.get("company")
    person_filters = filters.get("person")
    normalized_filters = {
        "company": normalize_company_filters(
            company_filters if isinstance(company_filters, dict) else {}
        ),
        "person": normalize_person_filters(
            person_filters if isinstance(person_filters, dict) else {}
        ),
    }

    try:
        page_num = max(0, int(args.get("page_num", 0)))
    except (TypeError, ValueError):
        page_num = 0
    try:
        rows_per_page = int(args.get("rows_per_page", 10))
    except (TypeError, ValueError):
        rows_per_page = 10
    # Keep the page size within 1..100 (the schema documents max 100).
    rows_per_page = max(1, min(rows_per_page, 100))

    payload = {
        # The query is truncated to 100 characters; null becomes "".
        "search_query": str(args.get("search_query") or "")[:100],
        "filters": normalized_filters,
        "data_type": data_type_val,
        "page_num": page_num,
        "rows_per_page": rows_per_page,
        "refresh": 0,
        "skip_count": 0,
        "filter_type": 1,
        "source": "DB"
    }
    response = await client.post("prospecting/search", payload)
    if not response.get("success"):
        return f"Search failed: {response.get('message', 'Unknown error')}"

    result = response.get("data") or {}
    prospects = result.get("data") or []
    # Metadata is read from the first element of a list; anything else is ignored.
    metadata_list = result.get("metadata")
    metadata = {}
    if isinstance(metadata_list, list) and metadata_list and isinstance(metadata_list[0], dict):
        metadata = metadata_list[0]

    summary = f"Found {metadata.get('total', len(prospects))} results\n"
    summary += f"Page {metadata.get('page', 1)} of {metadata.get('total_pages', 1)}\n\n"
    return summary + json.dumps(prospects, indent=2)
async def tool_search_by_company_name(client: ZintlrAPIClient, args: dict) -> str:
    """Look up company-name suggestions.

    Posts ``args["company_name"]`` (default ``""``) as ``search_query`` and
    returns the reply's ``data`` as indented JSON, or a failure message when
    the API does not report success.
    """
    reply = await client.post(
        "prospecting/company/search/name",
        {"search_query": args.get("company_name", "")},
    )
    if not reply.get("success"):
        return f"Search failed: {reply.get('message', 'Unknown error')}"
    return json.dumps(reply.get("data", []), indent=2)
async def tool_search_by_company_domain(client: ZintlrAPIClient, args: dict) -> str:
    """Look up a company by website domain.

    Posts ``args["domain"]`` (default ``""``) as ``search_query`` and returns
    the reply's ``data`` as indented JSON, or a failure message when the API
    does not report success.
    """
    reply = await client.post(
        "prospecting/company/search/domain",
        {"search_query": args.get("domain", "")},
    )
    if not reply.get("success"):
        return f"Search failed: {reply.get('message', 'Unknown error')}"
    return json.dumps(reply.get("data", []), indent=2)
async def tool_search_by_job_title(client: ZintlrAPIClient, args: dict) -> str:
    """Look up job-title suggestions.

    The keyword travels in ``search_keyword``; ``search_query`` is always sent
    empty. Returns the reply's ``data`` as indented JSON, or a failure message
    when the API does not report success.
    """
    keyword = args.get("job_title", "")
    reply = await client.post(
        "prospecting/user/search/jobtitle",
        {"search_query": "", "search_keyword": keyword},
    )
    if not reply.get("success"):
        return f"Search failed: {reply.get('message', 'Unknown error')}"
    return json.dumps(reply.get("data", []), indent=2)
async def tool_search_by_location(client: ZintlrAPIClient, args: dict) -> str:
    """Look up location suggestions.

    ``location_type`` (default ``"person"``) selects the endpoint: ``"person"``
    uses the ``profile`` route and every other value uses ``company``. Returns
    the reply's ``data`` as indented JSON, or a failure message when the API
    does not report success.
    """
    if args.get("location_type", "person") == "person":
        route = "profile"
    else:
        route = "company"
    body = {"search_query": args.get("location", "")}
    reply = await client.post(f"prospecting/{route}/search/location", body)
    if not reply.get("success"):
        return f"Search failed: {reply.get('message', 'Unknown error')}"
    return json.dumps(reply.get("data", []), indent=2)
async def tool_search_by_technology(client: ZintlrAPIClient, args: dict) -> str:
    """Look up technology suggestions.

    The keyword travels in ``search_keyword``; ``search_query`` is always sent
    empty. Returns the reply's ``data`` as indented JSON, or a failure message
    when the API does not report success.
    """
    keyword = args.get("technology", "")
    reply = await client.post(
        "prospecting/company/search/technology",
        {"search_query": "", "search_keyword": keyword},
    )
    if not reply.get("success"):
        return f"Search failed: {reply.get('message', 'Unknown error')}"
    return json.dumps(reply.get("data", []), indent=2)
async def tool_get_person_profile(client: ZintlrAPIClient, args: dict) -> str:
    """Fetch the detailed profile of one person.

    Posts ``args["profile_id"]`` (default ``""``) and returns the reply's
    ``data`` as indented JSON, or a failure message when the API does not
    report success.
    """
    reply = await client.post(
        "prospecting/person/profile",
        {"profile_id": args.get("profile_id", "")},
    )
    if not reply.get("success"):
        return f"Failed to get profile: {reply.get('message', 'Unknown error')}"
    return json.dumps(reply.get("data", {}), indent=2)
async def tool_get_company_profile(client: ZintlrAPIClient, args: dict) -> str:
    """Fetch the detailed profile of one company.

    The company is identified by ``company_id`` or, when that is absent, by
    ``domain``. If neither is supplied, a message is returned without calling
    the API. Otherwise the reply's ``data`` is returned as indented JSON, or a
    failure message when the API does not report success.
    """
    company_id, domain = args.get("company_id"), args.get("domain")
    if not (company_id or domain):
        return "Either company_id or domain is required"

    # company_id wins when both identifiers are present.
    body = {"tab": 1}
    if company_id:
        body["company_id"] = company_id
    else:
        body["domain"] = domain

    reply = await client.post("prospecting/company/profile", body)
    if not reply.get("success"):
        return f"Failed to get company profile: {reply.get('message', 'Unknown error')}"
    return json.dumps(reply.get("data", {}), indent=2)
async def tool_unlock_contact_info(client: ZintlrAPIClient, args: dict) -> str:
    """Unlock contact information for profiles.

    Sends one ``subscriptions/unlock`` request per distinct profile ID and
    formats the unlocked profiles as text.

    Args:
        client: API client used to POST the unlock requests.
        args: Tool arguments: ``profile_ids`` (list of at most 25 IDs) and the
            optional ``unlock_type`` (``"email"``, ``"phone"`` or ``"both"``;
            unknown values behave like ``"both"``).

    Returns:
        A validation message, a "nothing unlocked" message, or a report
        listing every unlocked profile with its unmasked emails and phones.

    Raises:
        Exception: Whatever ``client.post`` raises, but only while no profile
            has been unlocked yet. Later errors are counted as failures so
            that results already obtained are still reported.
    """
    profile_ids = args.get("profile_ids") or []
    if not profile_ids:
        return "At least one profile_id is required"
    # A bare string would otherwise be iterated character by character,
    # sending one unlock request per character.
    if not isinstance(profile_ids, (list, tuple)):
        return "profile_ids must be a list of profile IDs"
    if len(profile_ids) > 25:
        return "Maximum 25 profiles can be unlocked at once"

    # Drop repeated IDs (order preserved) so the same unlock is not requested
    # twice; the tool description states that unlocking consumes credits.
    unique_ids = []
    for profile_id in profile_ids:
        if profile_id not in unique_ids:
            unique_ids.append(profile_id)

    unlock_type_map = {
        "email": ["EMAIL"],
        "phone": ["PHONE"],
        "both": ["EMAIL", "PHONE"]
    }
    unlock_type_value = unlock_type_map.get(args.get("unlock_type", "both"), ["EMAIL", "PHONE"])

    # Unlock profiles one by one (bulk unlock may fail)
    unlocked = []
    failed_count = 0
    for profile_id in unique_ids:
        payload = {
            "profile_ids": [profile_id],
            "unlock_type": unlock_type_value,
        }
        try:
            response = await client.post("subscriptions/unlock", payload)
        except Exception:
            # With nothing unlocked there is nothing to lose: let the caller
            # map the error. Otherwise keep going so that profiles already
            # unlocked are still reported.
            if not unlocked:
                raise
            failed_count += 1
            continue

        profiles = []
        if response.get("success"):
            profiles = (response.get("data") or {}).get("unlockedProfiles") or []
        if profiles:
            unlocked.extend(profiles)
        else:
            failed_count += 1

    if not unlocked:
        return f"No profiles were unlocked. {failed_count} request(s) failed."

    # Format output
    header = f"Unlocked {len(unlocked)} profile(s)"
    if failed_count > 0:
        header += f" ({failed_count} failed)"
    sections = [header, "\n\n"]
    sections.extend(_format_unlocked_profile(profile) for profile in unlocked)
    return "".join(sections)


def _unmasked_contact_lines(entries: Any, keys: tuple) -> list:
    """Return one bullet line per contact entry that is not masked with 'xxxxx'."""
    lines = []
    for entry in entries:
        if isinstance(entry, dict):
            # The first key present in the entry supplies the value.
            value = ""
            for key in keys:
                if key in entry:
                    value = entry[key]
                    break
            if value and "xxxxx" not in str(value).lower():
                lines.append(f" - {value}\n")
        elif isinstance(entry, str) and "xxxxx" not in entry.lower():
            lines.append(f" - {entry}\n")
    return lines


def _format_unlocked_profile(profile: dict) -> str:
    """Render one unlocked profile (name, company, title, contacts) as text."""
    name = profile.get('full_name', f"{profile.get('first_name', '')} {profile.get('last_name', '')}")
    parts = [
        f"**{name}**\n",
        f" Company: {profile.get('company_name', 'N/A')}\n",
        f" Title: {profile.get('job_title', 'N/A')}\n",
    ]
    emails = profile.get("emails", [])
    if emails:
        parts.append(" Emails:\n")
        parts.extend(_unmasked_contact_lines(emails, ("address", "email")))
    phones = profile.get("phone", [])
    if phones:
        parts.append(" Phones:\n")
        parts.extend(_unmasked_contact_lines(phones, ("ph", "phone", "number")))
    parts.append("\n")
    return "".join(parts)
async def tool_fetch_profile(client: ZintlrAPIClient, args: dict) -> str:
    """Fetch the profile of the currently authenticated user.

    Each ``include_*`` argument is turned into a 1/0 query flag. Returns the
    reply's ``data`` as indented JSON, or a failure message when the API does
    not report success.
    """
    # (query parameter, tool argument) pairs, in the order they are sent.
    flag_sources = (
        ("permissions", "include_permissions"),
        ("get_invitation_data", "include_invitation_data"),
        ("onboarding", "include_onboarding"),
    )
    query = {
        api_key: int(bool(args.get(arg_key)))
        for api_key, arg_key in flag_sources
    }
    reply = await client.get("auth/fetchprofile", query)
    if not reply.get("success"):
        return f"Failed to fetch profile: {reply.get('message', 'Unknown error')}"
    return json.dumps(reply.get("data", {}), indent=2)
async def tool_get_search_history(client: ZintlrAPIClient, args: dict) -> str:
    """Fetch the user's search history.

    *args* is not read. Returns the reply's ``data`` as indented JSON, or a
    failure message when the API does not report success.
    """
    # NOTE(review): the endpoint is spelled "searchistory" (single "h") —
    # confirm that this matches the backend route.
    reply = await client.get("prospecting/user/searchistory")
    if not reply.get("success"):
        return f"Failed to get search history: {reply.get('message', 'Unknown error')}"
    return json.dumps(reply.get("data", []), indent=2)
async def tool_get_saved_searches(client: ZintlrAPIClient, args: dict) -> str:
    """Fetch the user's saved searches.

    *args* is not read. Returns the reply's ``data`` as indented JSON, or a
    failure message when the API does not report success.
    """
    reply = await client.get("prospecting/user/savedsearch")
    if not reply.get("success"):
        return f"Failed to get saved searches: {reply.get('message', 'Unknown error')}"
    return json.dumps(reply.get("data", []), indent=2)
async def tool_save_search(client: ZintlrAPIClient, args: dict) -> str:
    """Save a search template.

    Args:
        client: API client used to POST the template.
        args: Tool arguments: ``name``, ``filters`` and the optional
            ``data_type`` and ``search_query``.

    Returns:
        A confirmation message, or a failure message when the API does not
        report success.
    """
    # Same mapping as tool_search_prospects: a missing, null or unknown
    # data_type falls back to "person" (1) rather than silently meaning
    # "company".
    data_type_map = {"person": 1, "company": 2}
    data_type_val = data_type_map.get(args.get("data_type", "person"), 1)

    # Clients may send explicit nulls; treat any non-object as "no filters".
    filters = args.get("filters")
    if not isinstance(filters, dict):
        filters = {}
    company_filters = filters.get("company")
    person_filters = filters.get("person")
    normalized_filters = {
        "company": normalize_company_filters(
            company_filters if isinstance(company_filters, dict) else {}
        ),
        "person": normalize_person_filters(
            person_filters if isinstance(person_filters, dict) else {}
        ),
    }

    payload = {
        "label_name": args.get("name", ""),
        "filter_params": normalized_filters,
        "search_query": args.get("search_query", ""),
        "data_type": data_type_val
    }
    response = await client.post("prospecting/user/savedsearch/add", payload)
    if response.get("success"):
        return f"Search saved successfully: {args.get('name')}"
    return f"Failed to save search: {response.get('message', 'Unknown error')}"
# ============ Main Request Handler ============
async def handle_mcp_request(
    request: Request,
    body: dict,
    session_id: str | None = None,
) -> JSONResponse:
    """
    Handle an MCP JSON-RPC request.

    ``initialize``, ``notifications/initialized`` and ``tools/list`` are served
    without authentication. Every other method requires a session that
    resolves to user tokens. Only ``tools/call`` is implemented beyond that.

    Args:
        request: FastAPI request object
        body: Parsed JSON-RPC request body
        session_id: User's session ID (from Authorization header)
    Returns:
        JSON-RPC response. Authentication failures use HTTP status 401 and the
        ``notifications/initialized`` acknowledgement uses 202.
    """
    request_id = body.get("id")
    method = body.get("method", "")
    # "params" is optional and may be an explicit null. Anything that is not
    # an object is treated as empty so handlers can safely call .get on it.
    params = body.get("params")
    if not isinstance(params, dict):
        params = {}

    # Handle methods that don't require auth
    if method == "initialize":
        return JSONResponse(await handle_initialize(request_id, params))
    if method == "notifications/initialized":
        # Notifications don't get responses
        await handle_notifications_initialized(request_id, params)
        # NOTE(review): this sends a "{}" body with the 202. The MCP HTTP
        # transport describes 202 with no body — confirm whether clients care.
        return JSONResponse(status_code=202, content={})
    if method == "tools/list":
        return JSONResponse(await handle_tools_list(request_id, params))

    # Methods below require authentication
    if not session_id:
        return JSONResponse(
            jsonrpc_error(request_id, UNAUTHORIZED, "Authentication required"),
            status_code=401
        )
    # Get user tokens from session
    user_tokens = await session_manager.get_user_tokens(session_id)
    if not user_tokens:
        return JSONResponse(
            jsonrpc_error(request_id, UNAUTHORIZED, "Invalid or expired session"),
            status_code=401
        )

    # Get client IP: first X-Forwarded-For entry, else the socket peer. An
    # empty or blank header falls back to the peer as well.
    # NOTE(review): X-Forwarded-For is client-supplied. It is only trustworthy
    # behind a proxy that overwrites it.
    fallback_ip = request.client.host if request.client else "Unknown"
    forwarded_for = request.headers.get("x-forwarded-for") or ""
    client_ip = forwarded_for.split(",")[0].strip() or fallback_ip

    # Handle tool calls
    if method == "tools/call":
        return JSONResponse(await handle_tool_call(request_id, params, user_tokens, client_ip))

    # Unknown method
    return JSONResponse(
        jsonrpc_error(request_id, METHOD_NOT_FOUND, f"Unknown method: {method}")
    )