Skip to main content
Glama
vitalune

Personal Knowledge Assistant

by vitalune
linkedin_client.py37 kB
""" LinkedIn API Client Implementation This module provides comprehensive LinkedIn API integration with: - OAuth2 authentication - Profile data access and management - Post creation and management - Connection management - Company page operations (if applicable) - Analytics data retrieval - Professional network insights - Proper scope management - Compliance with LinkedIn's API usage policies """ import asyncio import json import urllib.parse from datetime import datetime, timezone, timedelta from typing import Dict, Any, List, Optional, Union from dataclasses import dataclass, field from pathlib import Path import httpx import structlog from .base_client import BaseAPIClient, RateLimitConfig, CircuitBreakerConfig from ..config.auth import AuthProvider, TokenType from ..utils.rate_limiter import get_rate_limit_manager logger = structlog.get_logger(__name__) @dataclass class LinkedInProfile: """LinkedIn profile data structure""" id: str first_name: str last_name: str email_address: Optional[str] = None headline: Optional[str] = None summary: Optional[str] = None location: Optional[Dict[str, Any]] = None industry: Optional[str] = None positions: List[Dict[str, Any]] = field(default_factory=list) educations: List[Dict[str, Any]] = field(default_factory=list) skills: List[Dict[str, Any]] = field(default_factory=list) profile_picture: Optional[str] = None public_profile_url: Optional[str] = None num_connections: Optional[int] = None num_connections_capped: bool = False @classmethod def from_api_response(cls, data: Dict[str, Any]) -> 'LinkedInProfile': """Create LinkedInProfile from API response""" return cls( id=data['id'], first_name=data.get('firstName', {}).get('localized', {}).get('en_US', ''), last_name=data.get('lastName', {}).get('localized', {}).get('en_US', ''), email_address=data.get('emailAddress'), headline=data.get('headline', {}).get('localized', {}).get('en_US'), summary=data.get('summary', {}).get('localized', {}).get('en_US'), 
location=data.get('location'), industry=data.get('industry'), positions=data.get('positions', {}).get('values', []), educations=data.get('educations', {}).get('values', []), skills=data.get('skills', {}).get('values', []), profile_picture=data.get('profilePicture', {}).get('displayImage'), public_profile_url=data.get('publicProfileUrl'), num_connections=data.get('numConnections'), num_connections_capped=data.get('numConnectionsCapped', False) ) @dataclass class LinkedInPost: """LinkedIn post data structure""" id: str author: str created_time: datetime last_modified_time: datetime text: Optional[str] = None visibility: str = "PUBLIC" lifecycle_state: str = "PUBLISHED" specific_content: Dict[str, Any] = field(default_factory=dict) activity: Dict[str, Any] = field(default_factory=dict) @classmethod def from_api_response(cls, data: Dict[str, Any]) -> 'LinkedInPost': """Create LinkedInPost from API response""" created_time = datetime.fromtimestamp( data['createdAt'] / 1000, timezone.utc ) if 'createdAt' in data else datetime.now(timezone.utc) last_modified_time = datetime.fromtimestamp( data['lastModifiedAt'] / 1000, timezone.utc ) if 'lastModifiedAt' in data else created_time # Extract text from specific content text = None specific_content = data.get('specificContent', {}) if 'com.linkedin.ugc.ShareContent' in specific_content: share_content = specific_content['com.linkedin.ugc.ShareContent'] share_commentary = share_content.get('shareCommentary', {}) text = share_commentary.get('text') return cls( id=data['id'], author=data.get('author', ''), created_time=created_time, last_modified_time=last_modified_time, text=text, visibility=data.get('visibility', {}).get('com.linkedin.ugc.MemberNetworkVisibility', 'PUBLIC'), lifecycle_state=data.get('lifecycleState', 'PUBLISHED'), specific_content=specific_content, activity=data.get('activity', {}) ) @dataclass class LinkedInConnection: """LinkedIn connection data structure""" id: str first_name: str last_name: str headline: 
Optional[str] = None location: Optional[str] = None industry: Optional[str] = None profile_picture: Optional[str] = None public_profile_url: Optional[str] = None @classmethod def from_api_response(cls, data: Dict[str, Any]) -> 'LinkedInConnection': """Create LinkedInConnection from API response""" return cls( id=data['id'], first_name=data.get('firstName', {}).get('localized', {}).get('en_US', ''), last_name=data.get('lastName', {}).get('localized', {}).get('en_US', ''), headline=data.get('headline', {}).get('localized', {}).get('en_US'), location=data.get('location', {}).get('name'), industry=data.get('industry'), profile_picture=data.get('profilePicture', {}).get('displayImage'), public_profile_url=data.get('publicProfileUrl') ) @dataclass class LinkedInCompany: """LinkedIn company data structure""" id: str name: str universal_name: Optional[str] = None description: Optional[str] = None website_url: Optional[str] = None industry: Optional[str] = None company_type: Optional[str] = None headquarters: Optional[Dict[str, Any]] = None logo: Optional[str] = None employee_count_range: Optional[Dict[str, Any]] = None founded: Optional[int] = None @classmethod def from_api_response(cls, data: Dict[str, Any]) -> 'LinkedInCompany': """Create LinkedInCompany from API response""" return cls( id=str(data['id']), name=data.get('name', {}).get('localized', {}).get('en_US', ''), universal_name=data.get('universalName'), description=data.get('description', {}).get('localized', {}).get('en_US'), website_url=data.get('websiteUrl'), industry=data.get('industry'), company_type=data.get('companyType'), headquarters=data.get('headquarters'), logo=data.get('logo', {}).get('original'), employee_count_range=data.get('employeeCountRange'), founded=data.get('foundedOn', {}).get('year') ) class LinkedInClient(BaseAPIClient): """LinkedIn API client with comprehensive functionality""" def __init__( self, client_id: str, client_secret: str, scopes: Optional[List[str]] = None, **kwargs ): # 
LinkedIn-specific rate limiting (more restrictive) rate_limit_config = RateLimitConfig( requests_per_minute=100, # Conservative limit requests_per_hour=1000, requests_per_day=10000, burst_size=3 ) super().__init__( provider=AuthProvider.LINKEDIN, base_url="https://api.linkedin.com/v2", rate_limit_config=rate_limit_config, **kwargs ) self.client_id = client_id self.client_secret = client_secret self.scopes = scopes or [ 'r_liteprofile', 'r_emailaddress', 'w_member_social' # Only for reading saved posts ] self.rate_limiter = get_rate_limit_manager() # User context (filled after authentication) self._authenticated_user: Optional[LinkedInProfile] = None def _format_auth_header(self, token: str) -> Dict[str, str]: """Format authentication header for LinkedIn API""" return {"Authorization": f"Bearer {token}"} async def authenticate(self, redirect_uri: str = "http://localhost:8080/oauth/callback") -> bool: """Authenticate with LinkedIn using OAuth2""" try: # Build authorization URL auth_params = { 'response_type': 'code', 'client_id': self.client_id, 'redirect_uri': redirect_uri, 'scope': ' '.join(self.scopes), 'state': self.auth_manager.create_session_state( provider=self.provider, redirect_uri=redirect_uri, scopes=self.scopes ) } auth_url = "https://www.linkedin.com/oauth/v2/authorization?" 
+ urllib.parse.urlencode(auth_params) logger.info( "LinkedIn OAuth flow initiated", auth_url=auth_url, state=auth_params['state'][:8] + "...", scopes=self.scopes ) raise Exception(f"Please visit this URL to authorize: {auth_url}") except Exception as e: logger.error(f"LinkedIn authentication failed", error=str(e)) return False async def handle_oauth_callback(self, authorization_code: str, state: str) -> bool: """Handle OAuth callback with authorization code""" try: # Validate session state session_data = self.auth_manager.consume_session_state(state) if not session_data: raise ValueError("Invalid or expired OAuth state") # Exchange authorization code for tokens token_data = { 'grant_type': 'authorization_code', 'code': authorization_code, 'redirect_uri': session_data["redirect_uri"], 'client_id': self.client_id, 'client_secret': self.client_secret } headers = { 'Content-Type': 'application/x-www-form-urlencoded' } response = await self._make_request( method="POST", endpoint="https://www.linkedin.com/oauth/v2/accessToken", data=token_data, headers=headers, authenticated=False ) token_response = response.json() # Store tokens securely access_token_id = await self.auth_manager.store_token( provider=self.provider, token_type=TokenType.ACCESS_TOKEN, token_value=token_response['access_token'], expires_in=token_response.get('expires_in', 5184000), # Default ~60 days scopes=self.scopes, subject=None, # Will be populated after getting user info client_id=self.client_id ) # LinkedIn typically doesn't provide refresh tokens # in the standard OAuth flow # Get authenticated user info await self._load_authenticated_user() logger.info( "LinkedIn authentication successful", access_token_id=access_token_id, user_id=self._authenticated_user.id if self._authenticated_user else None, scopes=self.scopes ) return True except Exception as e: logger.error(f"LinkedIn OAuth callback failed", error=str(e)) return False async def refresh_token(self) -> bool: """Refresh LinkedIn access 
token""" # LinkedIn typically doesn't support token refresh in the standard flow # Users need to re-authenticate when tokens expire logger.warning("LinkedIn token refresh not supported - re-authentication required") return False async def _load_authenticated_user(self): """Load authenticated user information""" try: profile_data = await self.get_profile() self._authenticated_user = LinkedInProfile.from_api_response(profile_data) except Exception as e: logger.warning(f"Failed to load authenticated user", error=str(e)) async def get_rate_limits(self) -> Dict[str, Any]: """Get current rate limit status for LinkedIn""" status = await self.rate_limiter.get_all_status() return status.get("linkedin", {}) async def get_profile(self, person_id: str = "~") -> Dict[str, Any]: """Get LinkedIn profile information""" status = await self.rate_limiter.is_allowed("linkedin", "profile") if not status.allowed: raise Exception(f"Rate limited. Retry after {status.retry_after}s") try: # Build projection to specify fields projection = "(id,firstName,lastName,headline,summary,location,industry,positions,educations,skills,profilePicture(displayImage~:playableStreams),publicProfileUrl,numConnections,numConnectionsCapped)" response = await self._make_request( method="GET", endpoint=f"/people/{person_id}", params={'projection': projection} ) await self.rate_limiter.record_response("linkedin", "profile", True) return response.json() except Exception as e: await self.rate_limiter.record_response("linkedin", "profile", False) logger.error(f"Failed to get LinkedIn profile", person_id=person_id, error=str(e)) raise async def get_email_address(self) -> Dict[str, Any]: """Get authenticated user's email address""" status = await self.rate_limiter.is_allowed("linkedin", "email") if not status.allowed: raise Exception(f"Rate limited. 
Retry after {status.retry_after}s") try: projection = "(elements*(handle~))" response = await self._make_request( method="GET", endpoint="/emailAddress", params={'q': 'members', 'projection': projection} ) await self.rate_limiter.record_response("linkedin", "email", True) result = response.json() elements = result.get('elements', []) if elements: email_info = elements[0].get('handle~', {}) return { 'email_address': email_info.get('emailAddress'), 'primary': True } return {} except Exception as e: await self.rate_limiter.record_response("linkedin", "email", False) logger.error(f"Failed to get LinkedIn email", error=str(e)) raise async def get_posts( self, author: str = "~", max_results: int = 20, start: int = 0 ) -> List[LinkedInPost]: """Get posts by author""" status = await self.rate_limiter.is_allowed("linkedin", "posts") if not status.allowed: raise Exception(f"Rate limited. Retry after {status.retry_after}s") try: params = { 'q': 'authors', 'authors': f'urn:li:person:{author}' if author != "~" else 'urn:li:person:~', 'count': min(max_results, 50), # LinkedIn limit 'start': start, 'sortBy': 'LAST_MODIFIED' } response = await self._make_request( method="GET", endpoint="/ugcPosts", params=params ) await self.rate_limiter.record_response("linkedin", "posts", True) result = response.json() posts = [LinkedInPost.from_api_response(post_data) for post_data in result.get('elements', [])] return posts except Exception as e: await self.rate_limiter.record_response("linkedin", "posts", False) logger.error(f"Failed to get LinkedIn posts", author=author, error=str(e)) raise async def get_post(self, post_id: str) -> LinkedInPost: """Get a specific post""" status = await self.rate_limiter.is_allowed("linkedin", "posts") if not status.allowed: raise Exception(f"Rate limited. 
Retry after {status.retry_after}s") try: response = await self._make_request( method="GET", endpoint=f"/ugcPosts/{post_id}" ) await self.rate_limiter.record_response("linkedin", "posts", True) return LinkedInPost.from_api_response(response.json()) except Exception as e: await self.rate_limiter.record_response("linkedin", "posts", False) logger.error(f"Failed to get LinkedIn post", post_id=post_id, error=str(e)) raise async def create_post( self, text: str, visibility: str = "PUBLIC", media_urls: Optional[List[str]] = None ) -> LinkedInPost: """Create a new LinkedIn post""" # Note: Requires write permissions status = await self.rate_limiter.is_allowed("linkedin", "create_post") if not status.allowed: raise Exception(f"Rate limited. Retry after {status.retry_after}s") try: # Build post data post_data = { 'author': f'urn:li:person:{self._authenticated_user.id}' if self._authenticated_user else 'urn:li:person:~', 'lifecycleState': 'PUBLISHED', 'specificContent': { 'com.linkedin.ugc.ShareContent': { 'shareCommentary': { 'text': text }, 'shareMediaCategory': 'NONE' } }, 'visibility': { 'com.linkedin.ugc.MemberNetworkVisibility': visibility } } # Add media if provided if media_urls: post_data['specificContent']['com.linkedin.ugc.ShareContent']['shareMediaCategory'] = 'IMAGE' post_data['specificContent']['com.linkedin.ugc.ShareContent']['media'] = [ { 'status': 'READY', 'description': { 'text': 'Shared image' }, 'media': url, 'title': { 'text': 'Image' } } for url in media_urls ] response = await self._make_request( method="POST", endpoint="/ugcPosts", data=post_data ) await self.rate_limiter.record_response("linkedin", "create_post", True) return LinkedInPost.from_api_response(response.json()) except Exception as e: await self.rate_limiter.record_response("linkedin", "create_post", False) logger.error(f"Failed to create LinkedIn post", error=str(e)) raise async def delete_post(self, post_id: str) -> bool: """Delete a LinkedIn post""" # Note: Requires write permissions 
status = await self.rate_limiter.is_allowed("linkedin", "delete_post") if not status.allowed: raise Exception(f"Rate limited. Retry after {status.retry_after}s") try: response = await self._make_request( method="DELETE", endpoint=f"/ugcPosts/{post_id}" ) await self.rate_limiter.record_response("linkedin", "delete_post", True) return response.status_code == 204 except Exception as e: await self.rate_limiter.record_response("linkedin", "delete_post", False) logger.error(f"Failed to delete LinkedIn post", post_id=post_id, error=str(e)) return False async def get_connections( self, max_results: int = 100, start: int = 0 ) -> List[LinkedInConnection]: """Get user's connections""" status = await self.rate_limiter.is_allowed("linkedin", "connections") if not status.allowed: raise Exception(f"Rate limited. Retry after {status.retry_after}s") try: projection = "(elements*(to~(id,firstName,lastName,headline,location,industry,profilePicture(displayImage~:playableStreams),publicProfileUrl)))" params = { 'q': 'viewer', 'start': start, 'count': min(max_results, 500), # LinkedIn limit 'projection': projection } response = await self._make_request( method="GET", endpoint="/connections", params=params ) await self.rate_limiter.record_response("linkedin", "connections", True) result = response.json() connections = [] for element in result.get('elements', []): connection_data = element.get('to~', {}) if connection_data: connections.append(LinkedInConnection.from_api_response(connection_data)) return connections except Exception as e: await self.rate_limiter.record_response("linkedin", "connections", False) logger.error(f"Failed to get LinkedIn connections", error=str(e)) raise async def search_people( self, keywords: Optional[str] = None, first_name: Optional[str] = None, last_name: Optional[str] = None, company: Optional[str] = None, title: Optional[str] = None, max_results: int = 25, start: int = 0 ) -> List[Dict[str, Any]]: """Search for people on LinkedIn""" status = await 
self.rate_limiter.is_allowed("linkedin", "search") if not status.allowed: raise Exception(f"Rate limited. Retry after {status.retry_after}s") try: params = { 'q': 'people', 'start': start, 'count': min(max_results, 50) # LinkedIn limit } if keywords: params['keywords'] = keywords if first_name: params['firstName'] = first_name if last_name: params['lastName'] = last_name if company: params['company'] = company if title: params['title'] = title response = await self._make_request( method="GET", endpoint="/peopleSearch", params=params ) await self.rate_limiter.record_response("linkedin", "search", True) result = response.json() return result.get('people', {}).get('values', []) except Exception as e: await self.rate_limiter.record_response("linkedin", "search", False) logger.error(f"Failed to search LinkedIn people", error=str(e)) raise async def get_companies( self, company_ids: Optional[List[str]] = None, universal_names: Optional[List[str]] = None, max_results: int = 50 ) -> List[LinkedInCompany]: """Get company information""" status = await self.rate_limiter.is_allowed("linkedin", "companies") if not status.allowed: raise Exception(f"Rate limited. 
Retry after {status.retry_after}s") try: params = {} if company_ids: params['ids'] = ','.join(company_ids) elif universal_names: params['universalNames'] = ','.join(universal_names) else: # Get user's current company if no specific companies requested if self._authenticated_user and self._authenticated_user.positions: current_position = self._authenticated_user.positions[0] # Most recent company_id = current_position.get('company', {}).get('id') if company_id: params['ids'] = str(company_id) if not params: return [] response = await self._make_request( method="GET", endpoint="/companies", params=params ) await self.rate_limiter.record_response("linkedin", "companies", True) result = response.json() companies = [] for company_data in result.get('values', []): companies.append(LinkedInCompany.from_api_response(company_data)) return companies[:max_results] except Exception as e: await self.rate_limiter.record_response("linkedin", "companies", False) logger.error(f"Failed to get LinkedIn companies", error=str(e)) raise async def get_company_updates( self, company_id: str, max_results: int = 20, start: int = 0 ) -> List[Dict[str, Any]]: """Get company updates/posts""" status = await self.rate_limiter.is_allowed("linkedin", "company_updates") if not status.allowed: raise Exception(f"Rate limited. 
Retry after {status.retry_after}s") try: params = { 'company': company_id, 'start': start, 'count': min(max_results, 50) } response = await self._make_request( method="GET", endpoint="/companyUpdates", params=params ) await self.rate_limiter.record_response("linkedin", "company_updates", True) result = response.json() return result.get('values', []) except Exception as e: await self.rate_limiter.record_response("linkedin", "company_updates", False) logger.error(f"Failed to get company updates", company_id=company_id, error=str(e)) raise async def get_analytics( self, post_id: Optional[str] = None, time_range: str = "LAST_30_DAYS" ) -> Dict[str, Any]: """Get analytics data for posts or profile""" status = await self.rate_limiter.is_allowed("linkedin", "analytics") if not status.allowed: raise Exception(f"Rate limited. Retry after {status.retry_after}s") try: if post_id: # Get post-specific analytics response = await self._make_request( method="GET", endpoint=f"/socialActions/{post_id}", params={'projection': '(numComments,numLikes,numShares)'} ) else: # Get profile analytics (if available) params = { 'q': 'company', 'company': f'urn:li:organization:{self._authenticated_user.id}' if self._authenticated_user else '', 'timeRange': time_range } response = await self._make_request( method="GET", endpoint="/networkSizes", params=params ) await self.rate_limiter.record_response("linkedin", "analytics", True) return response.json() except Exception as e: await self.rate_limiter.record_response("linkedin", "analytics", False) logger.error(f"Failed to get LinkedIn analytics", post_id=post_id, error=str(e)) raise async def send_invitation( self, person_id: str, message: Optional[str] = None ) -> bool: """Send connection invitation""" # Note: Requires write permissions and may have restrictions status = await self.rate_limiter.is_allowed("linkedin", "invitations") if not status.allowed: raise Exception(f"Rate limited. 
Retry after {status.retry_after}s") try: invitation_data = { 'recipients': [f'urn:li:person:{person_id}'], 'subject': 'I would like to connect with you', 'body': message or 'I would like to add you to my professional network on LinkedIn.' } response = await self._make_request( method="POST", endpoint="/invitations", data=invitation_data ) await self.rate_limiter.record_response("linkedin", "invitations", True) return response.status_code == 201 except Exception as e: await self.rate_limiter.record_response("linkedin", "invitations", False) logger.error(f"Failed to send LinkedIn invitation", person_id=person_id, error=str(e)) return False async def export_data( self, output_path: Path, data_types: List[str] = None, max_items: int = 500 ) -> int: """Export LinkedIn data""" if data_types is None: data_types = ['profile', 'posts', 'connections'] logger.info(f"Starting LinkedIn export", output_path=str(output_path), data_types=data_types) output_path.mkdir(parents=True, exist_ok=True) exported_count = 0 for data_type in data_types: try: output_file = output_path / f"{data_type}.json" if data_type == 'profile': # Export profile information profile_data = await self.get_profile() email_data = await self.get_email_address() combined_data = { 'profile': profile_data, 'email': email_data } with open(output_file, 'w', encoding='utf-8') as f: json.dump(combined_data, f, indent=2, ensure_ascii=False) exported_count += 1 elif data_type == 'posts': # Export user's posts posts = await self.get_posts(max_results=max_items) posts_data = [post.__dict__ for post in posts] # Convert datetime objects to ISO strings def serialize_datetime(obj): if isinstance(obj, datetime): return obj.isoformat() return obj serialized_data = json.loads(json.dumps(posts_data, default=serialize_datetime)) with open(output_file, 'w', encoding='utf-8') as f: json.dump(serialized_data, f, indent=2, ensure_ascii=False) exported_count += len(posts) elif data_type == 'connections': # Export connections 
connections = await self.get_connections(max_results=max_items) connections_data = [conn.__dict__ for conn in connections] with open(output_file, 'w', encoding='utf-8') as f: json.dump(connections_data, f, indent=2, ensure_ascii=False) exported_count += len(connections) else: logger.warning(f"Unknown data type: {data_type}") continue logger.info(f"Exported {data_type}", output_file=str(output_file)) except Exception as e: logger.error(f"Failed to export {data_type}", error=str(e)) continue logger.info(f"LinkedIn export completed", exported_count=exported_count) return exported_count async def get_saved_posts( self, max_results: int = 50, start: int = 0 ) -> List[Dict[str, Any]]: """Get user's saved posts/articles""" # Note: This functionality may be limited or require special permissions status = await self.rate_limiter.is_allowed("linkedin", "saved_posts") if not status.allowed: raise Exception(f"Rate limited. Retry after {status.retry_after}s") try: params = { 'q': 'saved', 'start': start, 'count': min(max_results, 50) } response = await self._make_request( method="GET", endpoint="/posts", params=params ) await self.rate_limiter.record_response("linkedin", "saved_posts", True) result = response.json() return result.get('elements', []) except Exception as e: await self.rate_limiter.record_response("linkedin", "saved_posts", False) logger.error(f"Failed to get saved posts", error=str(e)) # Return empty list if feature not available return [] async def get_network_insights(self) -> Dict[str, Any]: """Get professional network insights""" status = await self.rate_limiter.is_allowed("linkedin", "insights") if not status.allowed: raise Exception(f"Rate limited. 
Retry after {status.retry_after}s") try: # Get basic network statistics insights = {} # Profile data if self._authenticated_user: insights['profile'] = { 'num_connections': self._authenticated_user.num_connections, 'num_connections_capped': self._authenticated_user.num_connections_capped, 'industry': self._authenticated_user.industry, 'location': self._authenticated_user.location } # Recent posts performance posts = await self.get_posts(max_results=10) if posts: insights['recent_posts'] = { 'count': len(posts), 'latest_post_date': posts[0].created_time.isoformat() if posts else None } # Connections data connections = await self.get_connections(max_results=100) if connections: industries = {} locations = {} for conn in connections: if conn.industry: industries[conn.industry] = industries.get(conn.industry, 0) + 1 if conn.location: locations[conn.location] = locations.get(conn.location, 0) + 1 insights['network_analysis'] = { 'top_industries': sorted(industries.items(), key=lambda x: x[1], reverse=True)[:5], 'top_locations': sorted(locations.items(), key=lambda x: x[1], reverse=True)[:5], 'total_analyzed': len(connections) } await self.rate_limiter.record_response("linkedin", "insights", True) return insights except Exception as e: await self.rate_limiter.record_response("linkedin", "insights", False) logger.error(f"Failed to get network insights", error=str(e)) raise

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/vitalune/Nexus-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.