from fastapi import FastAPI, HTTPException, Depends, Header
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
import requests
import os
from typing import Dict, Any, List, Optional
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Get MCP server URL from environment variables
MCP_SERVER_URL = os.getenv("MCP_SERVER_URL", "http://localhost:8888/mcp")
app = FastAPI(
title="LinkedIn MCP Client API",
description="A specialized FastAPI client for LinkedIn integration via Model Context Protocol (MCP)",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc"
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Pydantic models for LinkedIn-specific requests
class ProfileRequest(BaseModel):
person_id: Optional[str] = Field(None, description="LinkedIn person ID (optional, defaults to current user)")
access_token: Optional[str] = Field(None, description="LinkedIn access token (optional if set in environment)")
class PostRequest(BaseModel):
content: str = Field(..., description="Post content text")
visibility: Optional[str] = Field("PUBLIC", description="Post visibility: PUBLIC or CONNECTIONS")
access_token: Optional[str] = Field(None, description="LinkedIn access token")
class PostsRequest(BaseModel):
count: Optional[int] = Field(50, description="Number of posts to retrieve (max: 100)")
access_token: Optional[str] = Field(None, description="LinkedIn access token")
class CompanyRequest(BaseModel):
company_id: str = Field(..., description="LinkedIn company ID")
access_token: Optional[str] = Field(None, description="LinkedIn access token")
class CompanySearchRequest(BaseModel):
keywords: str = Field(..., description="Search keywords")
count: Optional[int] = Field(10, description="Number of results (max: 50)")
access_token: Optional[str] = Field(None, description="LinkedIn access token")
class ConnectionsRequest(BaseModel):
start: Optional[int] = Field(0, description="Starting position")
count: Optional[int] = Field(50, description="Number of connections (max: 500)")
access_token: Optional[str] = Field(None, description="LinkedIn access token")
class ConnectionRequest(BaseModel):
person_id: str = Field(..., description="LinkedIn person ID to connect with")
message: Optional[str] = Field("", description="Optional connection message")
access_token: Optional[str] = Field(None, description="LinkedIn access token")
class MessagesRequest(BaseModel):
conversation_id: Optional[str] = Field(None, description="Specific conversation ID")
count: Optional[int] = Field(20, description="Number of messages (max: 100)")
access_token: Optional[str] = Field(None, description="LinkedIn access token")
class SendMessageRequest(BaseModel):
recipients: List[str] = Field(..., description="Array of LinkedIn person IDs")
message: str = Field(..., description="Message content")
access_token: Optional[str] = Field(None, description="LinkedIn access token")
class NetworkAnalysisRequest(BaseModel):
analysis_type: str = Field(..., description="Type of analysis: connections, posts, industry, activity")
access_token: Optional[str] = Field(None, description="LinkedIn access token")
# Helper function to make MCP requests
async def call_mcp_server(method: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
if params is None:
params = {}
payload = {
"jsonrpc": "2.0",
"method": method,
"params": params,
"id": "1"
}
try:
response = requests.post(MCP_SERVER_URL, json=payload, timeout=30)
response.raise_for_status()
result = response.json()
if "error" in result:
raise HTTPException(status_code=400, detail=result["error"]["message"])
return result.get("result", {})
except requests.RequestException as e:
raise HTTPException(status_code=500, detail=f"Failed to communicate with MCP server: {str(e)}")
# Helper function to call LinkedIn tools
async def call_linkedin_tool(tool_name: str, args: Dict[str, Any]) -> Any:
result = await call_mcp_server("mcp/callTool", {
"name": tool_name,
"args": args
})
# Parse the response content
if "content" in result and result["content"]:
content_text = result["content"][0].get("text", "{}")
try:
import json
return json.loads(content_text)
except json.JSONDecodeError:
return {"raw_response": content_text}
return result
# Authentication dependency
def get_access_token(access_token: Optional[str] = None) -> str:
"""Get access token from request or environment"""
token = access_token or os.getenv("LINKEDIN_ACCESS_TOKEN")
if not token:
raise HTTPException(
status_code=401,
detail="LinkedIn access token is required. Set LINKEDIN_ACCESS_TOKEN environment variable or provide in request."
)
return token
# API Routes
@app.get("/", tags=["Info"])
async def root():
return {
"name": "LinkedIn MCP Client API",
"description": "Specialized FastAPI client for LinkedIn integration via MCP",
"version": "1.0.0",
"mcp_server": MCP_SERVER_URL,
"endpoints": {
"profile": "/profile - Get LinkedIn profile information",
"posts": "/posts - Manage LinkedIn posts",
"companies": "/companies - Company information and search",
"connections": "/connections - Network management",
"messages": "/messages - LinkedIn messaging",
"analytics": "/analytics - Network analysis"
}
}
@app.get("/server/info", tags=["Server"])
async def get_server_info():
"""Get MCP server information"""
return await call_mcp_server("mcp/init")
@app.get("/tools", tags=["Server"])
async def list_tools():
"""List all available LinkedIn tools"""
return await call_mcp_server("mcp/listTools")
@app.get("/resources", tags=["Server"])
async def list_resources():
"""List all available documentation resources"""
return await call_mcp_server("mcp/listResources")
@app.get("/resources/{resource_name}", tags=["Server"])
async def get_resource(resource_name: str):
"""Get specific documentation resource"""
return await call_mcp_server("mcp/readResource", {
"uri": f"docs://{resource_name}"
})
# Profile Management
@app.get("/profile", tags=["Profile"])
async def get_profile(person_id: Optional[str] = None, access_token: Optional[str] = Header(None)):
"""Get LinkedIn profile information for current user or specified person"""
token = get_access_token(access_token)
args = {"accessToken": token}
if person_id:
args["personId"] = person_id
return await call_linkedin_tool("get-profile", args)
@app.post("/profile", tags=["Profile"])
async def get_profile_post(request: ProfileRequest):
"""Get LinkedIn profile information (POST version for complex requests)"""
token = get_access_token(request.access_token)
args = {"accessToken": token}
if request.person_id:
args["personId"] = request.person_id
return await call_linkedin_tool("get-profile", args)
# Posts Management
@app.post("/posts", tags=["Posts"])
async def create_post(request: PostRequest):
"""Create a new LinkedIn post"""
token = get_access_token(request.access_token)
args = {
"accessToken": token,
"content": request.content,
"visibility": request.visibility
}
return await call_linkedin_tool("create-post", args)
@app.get("/posts", tags=["Posts"])
async def get_posts(count: int = 50, access_token: Optional[str] = Header(None)):
"""Get user's LinkedIn posts"""
token = get_access_token(access_token)
args = {
"accessToken": token,
"count": min(count, 100)
}
return await call_linkedin_tool("get-posts", args)
@app.post("/posts/list", tags=["Posts"])
async def get_posts_post(request: PostsRequest):
"""Get user's LinkedIn posts (POST version)"""
token = get_access_token(request.access_token)
args = {
"accessToken": token,
"count": min(request.count, 100)
}
return await call_linkedin_tool("get-posts", args)
# Company Management
@app.get("/companies/{company_id}", tags=["Companies"])
async def get_company(company_id: str, access_token: Optional[str] = Header(None)):
"""Get company information by ID"""
token = get_access_token(access_token)
args = {
"accessToken": token,
"companyId": company_id
}
return await call_linkedin_tool("get-company", args)
@app.post("/companies/info", tags=["Companies"])
async def get_company_post(request: CompanyRequest):
"""Get company information (POST version)"""
token = get_access_token(request.access_token)
args = {
"accessToken": token,
"companyId": request.company_id
}
return await call_linkedin_tool("get-company", args)
@app.post("/companies/search", tags=["Companies"])
async def search_companies(request: CompanySearchRequest):
"""Search for companies by keywords"""
token = get_access_token(request.access_token)
args = {
"accessToken": token,
"keywords": request.keywords,
"count": min(request.count, 50)
}
return await call_linkedin_tool("search-companies", args)
# Connections Management
@app.get("/connections", tags=["Connections"])
async def get_connections(start: int = 0, count: int = 50, access_token: Optional[str] = Header(None)):
"""Get user's LinkedIn connections"""
token = get_access_token(access_token)
args = {
"accessToken": token,
"start": start,
"count": min(count, 500)
}
return await call_linkedin_tool("get-connections", args)
@app.post("/connections/list", tags=["Connections"])
async def get_connections_post(request: ConnectionsRequest):
"""Get user's LinkedIn connections (POST version)"""
token = get_access_token(request.access_token)
args = {
"accessToken": token,
"start": request.start,
"count": min(request.count, 500)
}
return await call_linkedin_tool("get-connections", args)
@app.post("/connections/request", tags=["Connections"])
async def send_connection_request(request: ConnectionRequest):
"""Send a connection request to another LinkedIn user"""
token = get_access_token(request.access_token)
args = {
"accessToken": token,
"personId": request.person_id,
"message": request.message
}
return await call_linkedin_tool("send-connection-request", args)
# Messaging
@app.get("/messages", tags=["Messages"])
async def get_messages(conversation_id: Optional[str] = None, count: int = 20, access_token: Optional[str] = Header(None)):
"""Get LinkedIn messages/conversations"""
token = get_access_token(access_token)
args = {
"accessToken": token,
"count": min(count, 100)
}
if conversation_id:
args["conversationId"] = conversation_id
return await call_linkedin_tool("get-messages", args)
@app.post("/messages/list", tags=["Messages"])
async def get_messages_post(request: MessagesRequest):
"""Get LinkedIn messages/conversations (POST version)"""
token = get_access_token(request.access_token)
args = {
"accessToken": token,
"count": min(request.count, 100)
}
if request.conversation_id:
args["conversationId"] = request.conversation_id
return await call_linkedin_tool("get-messages", args)
@app.post("/messages/send", tags=["Messages"])
async def send_message(request: SendMessageRequest):
"""Send a message to LinkedIn connections"""
token = get_access_token(request.access_token)
args = {
"accessToken": token,
"recipients": request.recipients,
"message": request.message
}
return await call_linkedin_tool("send-message", args)
# Analytics
@app.post("/analytics/network", tags=["Analytics"])
async def analyze_network(request: NetworkAnalysisRequest):
"""Analyze LinkedIn network and provide insights"""
token = get_access_token(request.access_token)
args = {
"accessToken": token,
"analysisType": request.analysis_type
}
return await call_linkedin_tool("analyze-network", args)
@app.get("/analytics/network/{analysis_type}", tags=["Analytics"])
async def analyze_network_get(analysis_type: str, access_token: Optional[str] = Header(None)):
"""Analyze LinkedIn network (GET version)"""
token = get_access_token(access_token)
args = {
"accessToken": token,
"analysisType": analysis_type
}
return await call_linkedin_tool("analyze-network", args)
# Health check
@app.get("/health", tags=["Health"])
async def health_check():
"""Check if the service and MCP server are healthy"""
try:
server_info = await call_mcp_server("mcp/init")
return {
"status": "healthy",
"mcp_server": "connected",
"server_info": server_info
}
except Exception as e:
return JSONResponse(
status_code=503,
content={
"status": "unhealthy",
"mcp_server": "disconnected",
"error": str(e)
}
)
if __name__ == "__main__":
import uvicorn
uvicorn.run("linkedin_client:app", host="0.0.0.0", port=8002, reload=True)