LinkedIn Browser MCP Server
by alinaqi
from fastmcp import FastMCP, Context
from playwright.async_api import async_playwright
import asyncio
import os
import json
from dotenv import load_dotenv
from cryptography.fernet import Fernet
import time
import logging
import sys
from pathlib import Path
# Set up logging to stderr only
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler(sys.stderr)]
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
def setup_sessions_directory():
"""Set up the sessions directory with proper permissions"""
try:
sessions_dir = Path(__file__).parent / 'sessions'
sessions_dir.mkdir(mode=0o777, parents=True, exist_ok=True)
# Ensure the directory has the correct permissions even if it already existed
os.chmod(sessions_dir, 0o777)
logger.debug(f"Sessions directory set up at {sessions_dir} with full permissions")
return True
except Exception as e:
logger.error(f"Failed to set up sessions directory: {str(e)}")
return False
# Load environment variables
env_path = Path(__file__).parent / '.env'
if env_path.exists():
load_dotenv(env_path)
logger.debug(f"Loaded environment from {env_path}")
else:
logger.warning(f"No .env file found at {env_path}")
# Create MCP server with required dependencies
mcp = FastMCP(
"linkedin",
dependencies=[
"playwright==1.40.0",
"python-dotenv>=0.19.0",
"cryptography>=35.0.0",
"httpx>=0.24.0"
],
debug=True # Enable debug mode for better error reporting
)
def report_progress(ctx, current, total, message=None):
"""Helper function to report progress with proper validation"""
try:
progress = min(1.0, current / total) if total > 0 else 0
if message:
ctx.info(message)
logger.debug(f"Progress: {progress:.2%} - {message if message else ''}")
except Exception as e:
logger.error(f"Error reporting progress: {str(e)}")
def handle_notification(ctx, notification_type, params=None):
"""Helper function to handle notifications with proper validation"""
try:
if notification_type == "initialized":
logger.info("MCP Server initialized")
if ctx: # Only call ctx.info if ctx is provided
ctx.info("Server initialized and ready")
elif notification_type == "cancelled":
reason = params.get("reason", "Unknown reason")
logger.warning(f"Operation cancelled: {reason}")
if ctx:
ctx.warning(f"Operation cancelled: {reason}")
else:
logger.debug(f"Notification: {notification_type} - {params}")
except Exception as e:
logger.error(f"Error handling notification: {str(e)}")
# Helper to save cookies between sessions
async def save_cookies(page, platform):
"""Save cookies with proper directory permissions"""
try:
cookies = await page.context.cookies()
# Validate cookies
if not cookies or not isinstance(cookies, list):
raise ValueError("Invalid cookie format")
# Add timestamp for expiration check
cookie_data = {
"timestamp": int(time.time()),
"cookies": cookies
}
# Ensure sessions directory exists with proper permissions
if not setup_sessions_directory():
raise Exception("Failed to set up sessions directory")
# Encrypt cookies before saving
key = os.getenv('COOKIE_ENCRYPTION_KEY', Fernet.generate_key())
f = Fernet(key)
encrypted_data = f.encrypt(json.dumps(cookie_data).encode())
cookie_file = Path(__file__).parent / 'sessions' / f'{platform}_cookies.json'
with open(cookie_file, 'wb') as f:
f.write(encrypted_data)
# Set file permissions to 666 (rw-rw-rw-)
os.chmod(cookie_file, 0o666)
except Exception as e:
raise Exception(f"Failed to save cookies: {str(e)}")
# Helper to load cookies
async def load_cookies(context, platform):
try:
with open(f'sessions/{platform}_cookies.json', 'rb') as f:
encrypted_data = f.read()
# Decrypt cookies
key = os.getenv('COOKIE_ENCRYPTION_KEY')
if not key:
return False
f = Fernet(key)
cookie_data = json.loads(f.decrypt(encrypted_data))
# Check cookie expiration (24 hours)
if int(time.time()) - cookie_data["timestamp"] > 86400:
os.remove(f'sessions/{platform}_cookies.json')
return False
await context.add_cookies(cookie_data["cookies"])
return True
except FileNotFoundError:
return False
except Exception as e:
# If there's any error loading cookies, delete the file and start fresh
try:
os.remove(f'sessions/{platform}_cookies.json')
except:
pass
return False
class BrowserSession:
"""Context manager for browser sessions with cookie persistence"""
def __init__(self, platform='linkedin', headless=True, launch_timeout=30000, max_retries=3):
logger.info(f"Initializing {platform} browser session (headless: {headless})")
self.platform = platform
self.headless = headless
self.launch_timeout = launch_timeout
self.max_retries = max_retries
self.playwright = None
self.browser = None
self.context = None
self._closed = False
async def __aenter__(self):
retry_count = 0
last_error = None
# Ensure sessions directory exists with proper permissions
if not setup_sessions_directory():
raise Exception("Failed to set up sessions directory with proper permissions")
while retry_count < self.max_retries and not self._closed:
try:
logger.info(f"Starting Playwright (attempt {retry_count + 1}/{self.max_retries})")
# Ensure clean state
await self._cleanup()
# Initialize Playwright with timeout
self.playwright = await asyncio.wait_for(
async_playwright().start(),
timeout=self.launch_timeout/1000
)
# Launch browser with more generous timeout and retry logic
launch_success = False
for attempt in range(3):
try:
logger.info(f"Launching browser (sub-attempt {attempt + 1}/3)")
self.browser = await self.playwright.chromium.launch(
headless=self.headless,
timeout=self.launch_timeout,
args=[
'--disable-dev-shm-usage',
'--no-sandbox',
'--disable-blink-features=AutomationControlled', # Try to avoid detection
'--start-maximized' # Start with maximized window
]
)
launch_success = True
break
except Exception as e:
logger.error(f"Browser launch sub-attempt {attempt + 1} failed: {str(e)}")
await asyncio.sleep(2) # Increased delay between attempts
if not launch_success:
raise Exception("Failed to launch browser after 3 attempts")
logger.info("Creating browser context")
self.context = await self.browser.new_context(
viewport={'width': 1280, 'height': 800},
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36'
)
# Try to load existing session
logger.info("Attempting to load existing session")
try:
session_loaded = await load_cookies(self.context, self.platform)
if session_loaded:
logger.info("Existing session loaded successfully")
else:
logger.info("No existing session found or session expired")
except Exception as cookie_error:
logger.warning(f"Error loading cookies: {str(cookie_error)}")
# Continue even if cookie loading fails
return self
except Exception as e:
last_error = e
retry_count += 1
logger.error(f"Browser session initialization attempt {retry_count} failed: {str(e)}")
# Cleanup on failure
await self._cleanup()
if retry_count < self.max_retries and not self._closed:
await asyncio.sleep(2 * retry_count) # Exponential backoff
else:
logger.error("All browser session initialization attempts failed")
raise Exception(f"Failed to initialize browser after {self.max_retries} attempts. Last error: {str(last_error)}")
async def _cleanup(self):
"""Clean up browser resources"""
if self.browser:
try:
await self.browser.close()
except Exception as e:
logger.error(f"Error closing browser: {str(e)}")
if self.playwright:
try:
await self.playwright.stop()
except Exception as e:
logger.error(f"Error stopping playwright: {str(e)}")
self.browser = None
self.playwright = None
self.context = None
async def __aexit__(self, exc_type, exc_val, exc_tb):
logger.info("Closing browser session")
self._closed = True
await self._cleanup()
async def new_page(self, url=None):
if self._closed:
raise Exception("Browser session has been closed")
page = await self.context.new_page()
if url:
try:
await page.goto(url, wait_until='networkidle', timeout=30000)
except Exception as e:
logger.error(f"Error navigating to {url}: {str(e)}")
raise
return page
async def save_session(self, page):
if self._closed:
raise Exception("Browser session has been closed")
try:
await save_cookies(page, self.platform)
except Exception as e:
logger.error(f"Error saving session: {str(e)}")
raise
@mcp.tool()
async def login_linkedin(username: str | None = None, password: str | None = None, ctx: Context | None = None) -> dict:
"""Open LinkedIn login page in browser for manual login.
Username and password are optional - if not provided, user will need to enter them manually."""
logger.info("Starting LinkedIn login with browser for manual login")
# Create browser session with explicit window size and position
async with BrowserSession(platform='linkedin', headless=False) as session:
try:
# Configure browser window
page = await session.new_page()
await page.set_viewport_size({'width': 1280, 'height': 800})
# Navigate to LinkedIn login
await page.goto('https://www.linkedin.com/login', wait_until='networkidle')
# Check if already logged in
if 'feed' in page.url:
await session.save_session(page)
return {"status": "success", "message": "Already logged in"}
if ctx:
ctx.info("Please log in manually through the browser window...")
ctx.info("The browser will wait for up to 5 minutes for you to complete the login.")
logger.info("Waiting for manual login...")
# Pre-fill credentials if provided
try:
if username:
await page.fill('#username', username)
if password:
await page.fill('#password', password)
except Exception as e:
logger.warning(f"Failed to pre-fill credentials: {str(e)}")
# Continue anyway - user can enter manually
# Wait for successful login (feed page)
try:
await page.wait_for_url('**/feed/**', timeout=300000) # 5 minutes timeout
if ctx:
ctx.info("Login successful!")
logger.info("Manual login successful")
await session.save_session(page)
# Keep browser open for a moment to show success
await asyncio.sleep(3)
return {"status": "success", "message": "Manual login successful"}
except Exception as e:
logger.error(f"Login timeout: {str(e)}")
return {
"status": "error",
"message": "Login timeout. Please try again and complete login within 5 minutes."
}
except Exception as e:
logger.error(f"Login process error: {str(e)}")
return {"status": "error", "message": f"Login process error: {str(e)}"}
@mcp.tool()
async def login_linkedin_secure(ctx: Context | None = None) -> dict:
"""Open LinkedIn login page in browser for manual login using environment credentials as default values.
Optional environment variables:
- LINKEDIN_USERNAME: Your LinkedIn email/username (will be pre-filled if provided)
- LINKEDIN_PASSWORD: Your LinkedIn password (will be pre-filled if provided)
Returns:
dict: Login status and message
"""
logger.info("Starting secure LinkedIn login")
username = os.getenv('LINKEDIN_USERNAME', '').strip()
password = os.getenv('LINKEDIN_PASSWORD', '').strip()
# We'll pass the credentials to pre-fill them, but user can still modify them
return await login_linkedin(username if username else None, password if password else None, ctx)
@mcp.tool()
async def get_linkedin_profile(username: str, ctx: Context) -> dict:
"""Get LinkedIn profile information"""
async with BrowserSession(platform='linkedin', headless=False) as session:
page = await session.new_page(f'https://www.linkedin.com/in/{username}')
# Check if profile page loaded
if 'profile' not in page.url:
return {"status": "error", "message": "Profile page not found"}
@mcp.tool()
async def browse_linkedin_feed(ctx: Context, count: int = 5) -> dict:
"""Browse LinkedIn feed and return recent posts
Args:
ctx: MCP context for logging and progress reporting
count: Number of posts to retrieve (default: 5)
Returns:
dict: Contains status, posts array, and any errors
"""
posts = []
errors = []
async with BrowserSession(platform='linkedin') as session:
try:
page = await session.new_page('https://www.linkedin.com/feed/')
# Check if we're logged in
if 'login' in page.url:
return {
"status": "error",
"message": "Not logged in. Please run login_linkedin tool first"
}
ctx.info(f"Browsing feed for {count} posts...")
# Scroll to load content
for i in range(min(count, 20)): # Limit to reasonable number
report_progress(ctx, i, count, f"Loading post {i+1}/{count}")
try:
# Wait for posts to be visible
await page.wait_for_selector('.feed-shared-update-v2', timeout=5000)
# Extract visible posts
new_posts = await page.evaluate('''() => {
return Array.from(document.querySelectorAll('.feed-shared-update-v2'))
.map(post => {
try {
return {
author: post.querySelector('.feed-shared-actor__name')?.innerText?.trim() || 'Unknown',
headline: post.querySelector('.feed-shared-actor__description')?.innerText?.trim() || '',
content: post.querySelector('.feed-shared-text')?.innerText?.trim() || '',
timestamp: post.querySelector('.feed-shared-actor__sub-description')?.innerText?.trim() || '',
likes: post.querySelector('.social-details-social-counts__reactions-count')?.innerText?.trim() || '0'
};
} catch (e) {
return null;
}
})
.filter(p => p !== null);
}''')
# Add new posts to our collection, avoiding duplicates
for post in new_posts:
if post not in posts:
posts.append(post)
if len(posts) >= count:
break
# Scroll down to load more content
await page.evaluate('window.scrollBy(0, 800)')
await page.wait_for_timeout(1000) # Wait for content to load
except Exception as scroll_error:
errors.append(f"Error during scroll {i}: {str(scroll_error)}")
continue
# Save session cookies
await session.save_session(page)
return {
"status": "success",
"posts": posts[:count],
"count": len(posts),
"errors": errors if errors else None
}
except Exception as e:
return {
"status": "error",
"message": f"Failed to browse feed: {str(e)}",
"posts": posts,
"errors": errors
}
@mcp.tool()
async def search_linkedin_profiles(query: str, ctx: Context, count: int = 5) -> dict:
"""Search for LinkedIn profiles matching a query"""
async with BrowserSession(platform='linkedin') as session:
try:
search_url = f'https://www.linkedin.com/search/results/people/?keywords={query}'
page = await session.new_page(search_url)
# Check if we're logged in
if 'login' in page.url:
return {
"status": "error",
"message": "Not logged in. Please run login_linkedin tool first"
}
ctx.info(f"Searching for profiles matching: {query}")
report_progress(ctx, 20, 100, "Loading search results...")
# Wait for search results
await page.wait_for_selector('.reusable-search__result-container', timeout=10000)
ctx.info("Search results loaded")
report_progress(ctx, 50, 100, "Extracting profile data...")
# Extract profile data
profiles = await page.evaluate('''(count) => {
const results = [];
const profileCards = document.querySelectorAll('.reusable-search__result-container');
for (let i = 0; i < Math.min(profileCards.length, count); i++) {
const card = profileCards[i];
try {
const profile = {
name: card.querySelector('.entity-result__title-text a')?.innerText?.trim() || 'Unknown',
headline: card.querySelector('.entity-result__primary-subtitle')?.innerText?.trim() || '',
location: card.querySelector('.entity-result__secondary-subtitle')?.innerText?.trim() || '',
profileUrl: card.querySelector('.app-aware-link')?.href || '',
connectionDegree: card.querySelector('.dist-value')?.innerText?.trim() || '',
snippet: card.querySelector('.entity-result__summary')?.innerText?.trim() || ''
};
results.push(profile);
} catch (e) {
console.error("Error extracting profile", e);
}
}
return results;
}''', count)
report_progress(ctx, 90, 100, "Saving session...")
await session.save_session(page)
report_progress(ctx, 100, 100, "Search complete")
return {
"status": "success",
"profiles": profiles,
"count": len(profiles),
"query": query
}
except Exception as e:
ctx.error(f"Profile search failed: {str(e)}")
return {
"status": "error",
"message": f"Failed to search profiles: {str(e)}"
}
@mcp.tool()
async def view_linkedin_profile(profile_url: str, ctx: Context) -> dict:
"""Visit and extract data from a specific LinkedIn profile"""
if not ('linkedin.com/in/' in profile_url):
return {
"status": "error",
"message": "Invalid LinkedIn profile URL. Should contain 'linkedin.com/in/'"
}
async with BrowserSession(platform='linkedin') as session:
try:
page = await session.new_page(profile_url)
# Check if we're logged in
if 'login' in page.url:
return {
"status": "error",
"message": "Not logged in. Please run login_linkedin tool first"
}
ctx.info(f"Viewing profile: {profile_url}")
# Wait for profile to load
await page.wait_for_selector('.pv-top-card', timeout=10000)
await ctx.report_progress(0.5, 1.0)
# Extract profile information
profile_data = await page.evaluate('''() => {
const getData = (selector, property = 'innerText') => {
const element = document.querySelector(selector);
return element ? element[property].trim() : null;
};
return {
name: getData('.pv-top-card--list .text-heading-xlarge'),
headline: getData('.pv-top-card--list .text-body-medium'),
location: getData('.pv-top-card--list .text-body-small:not(.inline)'),
connectionDegree: getData('.pv-top-card__connections-count .t-black--light'),
about: getData('.pv-shared-text-with-see-more .inline-show-more-text'),
experience: Array.from(document.querySelectorAll('#experience-section .pv-entity__summary-info'))
.map(exp => ({
title: exp.querySelector('h3')?.innerText?.trim() || '',
company: exp.querySelector('.pv-entity__secondary-title')?.innerText?.trim() || '',
duration: exp.querySelector('.pv-entity__date-range span:not(.visually-hidden)')?.innerText?.trim() || ''
})),
education: Array.from(document.querySelectorAll('#education-section .pv-education-entity'))
.map(edu => ({
school: edu.querySelector('.pv-entity__school-name')?.innerText?.trim() || '',
degree: edu.querySelector('.pv-entity__degree-name .pv-entity__comma-item')?.innerText?.trim() || '',
field: edu.querySelector('.pv-entity__fos .pv-entity__comma-item')?.innerText?.trim() || '',
dates: edu.querySelector('.pv-entity__dates span:not(.visually-hidden)')?.innerText?.trim() || ''
}))
};
}''')
await ctx.report_progress(1.0, 1.0)
await session.save_session(page)
return {
"status": "success",
"profile": profile_data,
"url": profile_url
}
except Exception as e:
ctx.error(f"Profile viewing failed: {str(e)}")
return {
"status": "error",
"message": f"Failed to extract profile data: {str(e)}"
}
@mcp.tool()
async def interact_with_linkedin_post(post_url: str, ctx: Context, action: str = "like", comment: str = None) -> dict:
"""Interact with a LinkedIn post (like, comment)"""
if not ('linkedin.com/posts/' in post_url or 'linkedin.com/feed/update/' in post_url):
return {
"status": "error",
"message": "Invalid LinkedIn post URL"
}
valid_actions = ["like", "comment", "read"]
if action not in valid_actions:
return {
"status": "error",
"message": f"Invalid action. Choose from: {', '.join(valid_actions)}"
}
async with BrowserSession(platform='linkedin', headless=False) as session:
try:
page = await session.new_page(post_url)
# Check if we're logged in
if 'login' in page.url:
return {
"status": "error",
"message": "Not logged in. Please run login_linkedin tool first"
}
# Wait for post to load
await page.wait_for_selector('.feed-shared-update-v2', timeout=10000)
ctx.info(f"Post loaded, performing action: {action}")
# Read post content
post_content = await page.evaluate('''() => {
const post = document.querySelector('.feed-shared-update-v2');
return {
author: post.querySelector('.feed-shared-actor__name')?.innerText?.trim() || 'Unknown',
content: post.querySelector('.feed-shared-text')?.innerText?.trim() || '',
engagementCount: post.querySelector('.social-details-social-counts__reactions-count')?.innerText?.trim() || '0'
};
}''')
# Perform the requested action
if action == "like":
# Find and click like button if not already liked
liked = await page.evaluate('''() => {
const likeButton = document.querySelector('button.react-button__trigger');
const isLiked = likeButton.getAttribute('aria-pressed') === 'true';
if (!isLiked) {
likeButton.click();
return true;
}
return false;
}''')
result = {
"status": "success",
"action": "like",
"performed": liked,
"message": "Successfully liked the post" if liked else "Post was already liked"
}
elif action == "comment" and comment:
# Add comment to the post
await page.click('button.comments-comment-box__trigger') # Open comment box
await page.fill('.ql-editor', comment)
await page.click('button.comments-comment-box__submit-button') # Submit comment
# Wait for comment to appear
await page.wait_for_timeout(2000)
result = {
"status": "success",
"action": "comment",
"message": "Comment posted successfully"
}
else: # action == "read"
result = {
"status": "success",
"action": "read",
"post": post_content
}
await session.save_session(page)
return result
except Exception as e:
ctx.error(f"Post interaction failed: {str(e)}")
return {
"status": "error",
"message": f"Failed to interact with post: {str(e)}"
}
if __name__ == "__main__":
try:
logger.debug("Starting LinkedIn MCP Server with debug logging")
# Initialize MCP server with simple configuration
try:
handle_notification(None, "initialized") # Pass None for ctx during initialization
mcp.run(transport='stdio')
except KeyboardInterrupt:
handle_notification(None, "cancelled", {"reason": "Server stopped by user"})
logger.info("Server stopped by user")
except Exception as e:
handle_notification(None, "cancelled", {"reason": str(e)})
logger.error(f"Server error: {str(e)}", exc_info=True)
sys.exit(1)
except Exception as e:
logger.error(f"Startup error: {str(e)}", exc_info=True)
sys.exit(1)