LinkedIn MCP Server

web_scrapper.py (23.3 kB)
import asyncio
import os
import random
import time
from dataclasses import dataclass
from datetime import datetime
from functools import partial
from multiprocessing import Pool, cpu_count
from typing import Any, Dict, List, Set, Tuple

import bs4
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from loguru import logger
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

from linkedin_mcp_server.cache import BasicInMemoryCache

# These two URLs are the best ones to retrieve jobs without connecting to LinkedIn with your account
JOB_RETRIEVAL_URL = "https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search-results/?distance=25&geoId=102277331&keywords=Looking%20for%20Research%20Enginer%2FMachine%20Learning%2FAI%20Engineer%20jobs%20in%20San%20Francisco"
JOB_URL: str = "https://www.linkedin.com/jobs-guest/jobs/api/jobPosting/{job_id}"

# Load environment variables from .env file
# env_loaded: bool = load_dotenv()
# if not env_loaded:
#     raise Exception("Failed to load environment variables from .env file")


def setup_webdriver() -> WebDriver:
    """
    Set up a Chrome webdriver with headless mode and common options

    Returns:
        Configured Chrome webdriver
    """
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    try:
        return webdriver.Chrome(options=chrome_options)
    except Exception as e:
        logger.error(f"Failed to initialize webdriver: {e}")
        raise


def extract_job_description_worker(job_id: str) -> Dict[str, Any]:
    """
    Worker function for extracting a job description in a separate process.

    Args:
        job_id: LinkedIn job ID to process

    Returns:
        Dict containing job details
    """
    # Each worker needs its own WebDriver instance
    driver = setup_webdriver()

    try:
        url = JOB_URL.format(job_id=job_id)
        logger.info(f"Worker processing job ID {job_id}")

        # Initialize job details with default values and URL
        job_details = {
            "linkedin_job_id": job_id,  # Using a fixed key name instead of class attribute
            "url": url,
            "source": "linkedin",
            "scraped_at": datetime.now().isoformat(),
            "title": "N/A",
            "location": "N/A",
            "company": "N/A",
            "posted_date": "N/A",
            "number_of_applicants": "N/A",
            "raw_description": "N/A",
            "employment_type": "N/A",
            "seniority_level": "N/A",
            "job_function": "N/A",
            "industries": "N/A",
            "skills": "N/A",
            "company_details": "N/A"
        }

        driver.get(url)

        # Extract job title
        try:
            title_elem = driver.find_element(
                By.CSS_SELECTOR,
                ".top-card-layout__title, .topcard__title"
            )
            job_details["title"] = title_elem.text.strip()
            logger.info(f"Extracted job title: {job_details['title']}")
        except Exception as e:
            logger.error(f"Failed to extract job title: {e}")

        # Extract company name and URL
        try:
            company_elem = driver.find_element(
                By.CSS_SELECTOR,
                "a.topcard__org-name-link, .topcard__flavor--black-link"
            )
            job_details["company"] = company_elem.text.strip()
            company_url = company_elem.get_attribute('href')
            if company_url:
                job_details["company_url"] = company_url.split('?')[0]  # Remove tracking params
            logger.info(f"Extracted company: {job_details['company']}")
        except Exception as e:
            logger.error(f"Failed to extract company name: {e}")

        # Extract location
        try:
            location_elem = driver.find_element(
                By.CSS_SELECTOR,
                ".topcard__flavor--bullet, .topcard__flavor:not(.topcard__flavor--black-link)"
            )
            job_details["location"] = location_elem.text.strip()
            logger.info(f"Extracted location: {job_details['location']}")
        except Exception as e:
            logger.error(f"Failed to extract location: {e}")

        # Extract job description HTML and text
        try:
            # Get the full description HTML
            desc_elem = WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((
                    By.CSS_SELECTOR,
                    ".show-more-less-html__markup, " \
                    ".description__text, " \
                    ".jobs-box__html-content"
                ))
            )

            # Get the raw description content (including HTML)
            raw_description = desc_elem.get_attribute('outerHTML')
            if not raw_description or len(raw_description.strip()) < 10:
                # Fallback to inner HTML if outer is empty
                raw_description = desc_elem.get_attribute('innerHTML')

            if raw_description and len(raw_description.strip()) > 10:  # Ensure we have meaningful content
                job_details["raw_description"] = raw_description.strip()
                logger.info(f"Extracted raw job description with {len(raw_description)} characters")
            else:
                logger.warning("Could not extract meaningful job description content")
        except Exception as e:
            logger.error(f"Failed to extract job description: {e}")

        # Extract number of applicants
        try:
            # Try to find the number of applicants in the top card
            applicants_elem = driver.find_elements(
                By.CSS_SELECTOR,
                ".num-applicants__caption, " \
                "[data-tracking-control-name='public_jobs_topcard-applicant-count'], " \
                "figcaption.num-applicants__caption"
            )
            if applicants_elem:
                applicants_text = applicants_elem[0].text.strip().lower()
                # Extract numeric value from text like "Over 200 applicants" or "200+ applicants"
                import re
                match = re.search(r'(\d+\+?|over\s+\d+)', applicants_text)
                if match:
                    num_applicants = match.group(1).replace('+', '').replace('over', '').strip()
                    if num_applicants.isdigit():
                        job_details["number_of_applicants"] = match.group(1).strip()
                        logger.info(f"Found {match.group(1).strip()} applicants")
        except Exception as e:
            logger.warning(f"Could not extract number of applicants: {e}")

        # Extract job metadata (posted date, job type, etc.)
        try:
            # First try to extract from the top card (newer layout)
            meta_items = driver.find_elements(
                By.CSS_SELECTOR,
                ".posted-time-ago__text, " \
                ".jobs-unified-top-card__job-insight, " \
                ".job-flavors__label, " \
                ".topcard__flavor--metadata, " \
                ".description__job-criteria-item, " \
                ".jobs-description-details__list-item, " \
                ".description__job-criteria"
            )

            # Try to find the job criteria section (newer layout)
            try:
                criteria_section = driver.find_element(
                    By.CSS_SELECTOR,
                    ".description__job-criteria"
                )
                criteria_items = criteria_section.find_elements(
                    By.CSS_SELECTOR,
                    ".description__job-criteria-item"
                )

                for item in criteria_items:
                    try:
                        label_elem = item.find_element(
                            By.CSS_SELECTOR,
                            ".description__job-criteria-subheader"
                        )
                        value_elem = item.find_element(
                            By.CSS_SELECTOR,
                            ".description__job-criteria-text"
                        )

                        if label_elem and value_elem:
                            label = label_elem.text.strip().lower()
                            value = value_elem.text.strip()

                            if 'seniority' in label or 'level' in label:
                                job_details["seniority_level"] = value
                            elif 'employment type' in label or 'job type' in label:
                                job_details["employment_type"] = value
                            elif 'skill' in label.lower():
                                job_details["skills"] = str(value) if value is not None else "N/A"
                            elif 'industr' in label.lower():
                                # Store industries as a single string
                                industries_text = value.strip()
                                job_details["industries"] = industries_text
                                logger.info(f"Extracted industries: {industries_text}")
                            elif 'posted' in label and 'date' in label:
                                job_details["posted_date"] = value
                    except Exception as e:
                        logger.debug(f"Error extracting criteria item: {e}")
                        continue
            except Exception as e:
                logger.debug(f"Could not find job criteria section: {e}")

            # Process any additional metadata from the top card
            for item in meta_items:
                text = item.text.strip().lower()
                if not text:
                    continue

                # Extract posted date if we haven't found it yet
                if not job_details.get("posted_date") or job_details["posted_date"] == "N/A":
                    if any(x in text for x in ['day', 'week', 'month', 'year', 'hour', 'minute', 'second', 'just now']):
                        job_details["posted_date"] = item.text.strip()
                        continue

                # Try to extract employment type if not found yet
                if not job_details.get("employment_type") or job_details["employment_type"] == "N/A":
                    if any(x in text for x in ['full-time', 'part-time', 'contract', 'temporary', 'internship', 'apprenticeship']):
                        job_details["employment_type"] = item.text.strip()
                        continue

                # Try to extract seniority level if not found yet
                if not job_details.get("seniority_level") or job_details["seniority_level"] == "N/A":
                    if any(x in text for x in ['entry', 'associate', 'mid', 'senior', 'lead', 'principal', 'director', 'vp', 'c-level', 'executive']):
                        job_details["seniority_level"] = item.text.strip()
                        continue

                # Check for employment type
                if any(term in text for term in ['full-time', 'part-time', 'contract', 'internship', 'temporary']):
                    job_details["employment_type"] = text
                # Check for seniority level
                elif any(term in text for term in ['entry', 'associate', 'mid-senior', 'director', 'executive']):
                    job_details["seniority_level"] = text
                # Check for job function
                elif any(term in text for term in ['engineering', 'product', 'design', 'marketing', 'sales']):
                    job_details["job_function"] = text
                # Check for industries
                elif len(text.split(',')) > 1:  # Likely industries
                    job_details["industries"] = str(text)

            logger.info(f"Extracted metadata: {job_details.get('employment_type')}, {job_details.get('seniority_level')}")
        except Exception as e:
            logger.error(f"Failed to extract metadata: {e}")

        # Extract skills if available
        try:
            skills_section = driver.find_elements(
                By.CSS_SELECTOR,
                ".job-details-skill-match-status-list"
            )
            if skills_section:
                try:
                    skill_elems = driver.find_elements(
                        By.CSS_SELECTOR,
                        ".description__job-criteria-item--skills .description__job-criteria-text, " \
                        ".job-details-skill-match-status-list__text"
                    )
                    if skill_elems:
                        skills = ", ".join(elem.text.strip() for elem in skill_elems if elem.text.strip())
                        job_details["skills"] = skills
                        logger.info(f"Extracted skills: {skills}")
                except Exception as e:
                    logger.warning(f"Could not extract skills: {e}")
        except Exception as e:
            logger.debug(f"No skills section found or error extracting skills: {e}")

        # Extract company details if available
        try:
            company_section = driver.find_elements(
                By.CSS_SELECTOR,
                ".company-info"
            )
            if company_section:
                company_details = []

                # Company size
                size_elem = company_section[0].find_elements(
                    By.CSS_SELECTOR,
                    "[data-test='company-size']"
                )
                if size_elem:
                    size_text = size_elem[0].text.strip()
                    if size_text:
                        company_details.append(f"Size: {size_text}")

                # Company website
                website_elem = company_section[0].find_elements(
                    By.CSS_SELECTOR,
                    "a[data-test='company-website']"
                )
                if website_elem:
                    website = website_elem[0].get_attribute('href')
                    if website:
                        company_details.append(f"Website: {website}")

                # Join all details with semicolons
                company_details_str = "; ".join(company_details) if company_details else "N/A"
                job_details["company_details"] = company_details_str
                logger.info(f"Extracted company details: {company_details_str}")
        except Exception as e:
            logger.debug(f"No company details section found or error extracting: {e}")

        logger.info(f"Successfully extracted job details for: {job_details.get('title', 'N/A')} at {job_details.get('company', 'N/A')}")
        return job_details

    except Exception as e:
        logger.error(f"Error extracting job description for {job_id}: {str(e)}", exc_info=True)
        return {}
    finally:
        # Clean up the WebDriver
        try:
            driver.quit()
        except Exception as e:
            logger.error(f"Error closing WebDriver: {e}")


@dataclass
class JobPostingExtractor:
    """
    A robust class for extracting job description details from various job posting URLs.
    Supports multiple job platforms with fallback mechanisms.
    """

    _job_description_cache: BasicInMemoryCache | None = None
    linkedin_cache_key_name = "linkedin_job_id"

    def __post_init__(self):
        if not self._job_description_cache:
            self._job_description_cache = BasicInMemoryCache(
                "linkedin-mcp",
                "raw_job_description_cache",
                "raw_job_descriptions.jsonl",
                cache_key_name=self.linkedin_cache_key_name,
                base_cache_dir=f"{os.environ.get('HOME')}/.cache"
            )
        logger.info(f"Raw Description Cache initialized in {self._job_description_cache.cache_file}")

    def scrape_new_job_ids(self, new_job_ids: List[str], overwrite_cache_entries: bool = False) -> None:
        """
        Scrape job descriptions for new job IDs using multiprocessing.

        Args:
            new_job_ids: List of job IDs to scrape
            overwrite_cache_entries: Whether to overwrite existing cache entries
        """
        if not new_job_ids:
            logger.info("No new jobs to scrape")
            return

        logger.info(f"Scraping {len(new_job_ids)} new LinkedIn job IDs using multiprocessing")
        start_time = time.time()

        # Determine number of processes (use 75% of available CPUs)
        num_processes = 2  # max(1, int(cpu_count() * 0.75))
        logger.info(f"Using {num_processes} processes for parallel scraping")

        # Create a process pool and map the job IDs to worker processes
        with Pool(processes=num_processes) as pool:
            try:
                # Map job IDs to worker processes
                results = pool.map(extract_job_description_worker, new_job_ids)

                # Filter out empty results and save to cache
                valid_results = [job for job in results if job]
                logger.info(f"Successfully scraped {len(valid_results)} out of {len(new_job_ids)} jobs")

                # Save to cache
                for job in valid_results:
                    if self._job_description_cache is not None:
                        self._job_description_cache.put(job, overwrite=overwrite_cache_entries)
            except Exception as e:
                logger.error(f"Error in parallel job scraping: {e}")
                raise
            finally:
                # Clean up
                pool.close()
                pool.join()

        duration = time.time() - start_time
        logger.info(f"Completed parallel scraping in {duration:.2f} seconds")

    def get_scraped_job_ids(self) -> List[str]:
        """Get a list of all job IDs that have already been scraped."""
        if self._job_description_cache is None:
            return []
        return list(self._job_description_cache._cache.keys())

    def get_new_job_ids(self, job_ids: List[str]) -> List[str]:
        """
        Filter out job IDs that have already been scraped.

        Args:
            job_ids: List of job IDs to check

        Returns:
            List of job IDs that haven't been scraped yet
        """
        scraped_ids = set(self.get_scraped_job_ids())
        logger.info(f"Found {len(scraped_ids)} scraped job IDs")
        logger.debug(f"Scraped job IDs: {scraped_ids}")
        new_job_ids = [job_id for job_id in job_ids if job_id not in scraped_ids]
        logger.info(f"Found {len(new_job_ids)} new jobs out of {len(job_ids)} total")
        return new_job_ids

    def retrieve_job_ids_from_linkedin(self, base_url: str = JOB_RETRIEVAL_URL, max_pages: int = 5) -> List[str]:
        """
        Retrieve job IDs from LinkedIn using requests and BeautifulSoup.

        Args:
            base_url: Guest search URL to paginate over
            max_pages: Maximum number of pages to scrape

        Returns:
            List of job IDs found
        """
        logger.info(f"Starting job retrieval from LinkedIn\n({JOB_RETRIEVAL_URL})")
        start_time = time.time()
        all_job_ids: Set[str] = set()
        jobs_per_page = 10
        url_with_pagination = base_url + "&start={}"

        for page in range(max_pages):
            try:
                start_idx = page * jobs_per_page
                url = url_with_pagination.format(start_idx)
                logger.info(f"Scraping job listings page {page + 1}: {url}")

                # Add random delay between requests
                time.sleep(random.uniform(1, 3))

                res = requests.get(url)
                soup = BeautifulSoup(res.text, 'html.parser')

                for element in soup.find_all(attrs={"data-entity-urn": True}):
                    if not isinstance(element, bs4.element.Tag):
                        continue
                    entity_urn = element.attrs.get("data-entity-urn")
                    if isinstance(entity_urn, str) and entity_urn.startswith("urn:li:jobPosting:"):
                        job_id = entity_urn.split(":")[-1]
                        if job_id.isdigit():
                            all_job_ids.add(job_id)
                            logger.info(f"Found job ID: {job_id}")
            except Exception as e:
                logger.error(f"Error scraping job listings page {page + 1}: {e}")
                continue

        duration = time.time() - start_time
        logger.info(f"Found {len(all_job_ids)} unique job IDs in {duration:.2f} seconds")
        return list(all_job_ids)

    def get_jobs_raw_metadata(self, job_ids: List[str]) -> Dict[str, Dict[str, Any]]:
        """
        Gets the job descriptions from the cache, scraping any that are not found.

        Args:
            job_ids: List of job IDs to get the description for

        Returns:
            Dict mapping each job ID to its raw metadata
        """
        jobs_metadata: dict[str, Dict[str, Any]] = {}
        new_jobs = self.get_new_job_ids(job_ids)
        if new_jobs:
            self.scrape_new_job_ids(new_jobs)

        for job_id in job_ids:
            job_metadata = self._job_description_cache.get(job_id)
            if job_metadata is not None:
                jobs_metadata[job_id] = job_metadata
            else:
                logger.info(f"Job metadata not found for {job_id}")
        return jobs_metadata


if __name__ == "__main__":
    extractor = JobPostingExtractor()

    # Get all job IDs from LinkedIn
    logger.info("Fetching job listings from LinkedIn...")
    all_job_ids = extractor.retrieve_job_ids_from_linkedin(max_pages=2)

    # Find only the new job IDs we haven't scraped yet
    new_job_ids = extractor.get_new_job_ids(all_job_ids)
    logger.info(f"Found {len(new_job_ids)} new jobs to process")

    # test_job_url = JOB_URL.format(job_id="4024185558")
    # test_job_url = JOB_URL.format(job_id="4051266841")
    # test_job_url = JOB_URL.format(job_id="4051266841")
    # new_job_ids = ["4024185558"]  # , "4051266841", "4051266841"]

    extractor.scrape_new_job_ids(new_job_ids)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/francisco-perez-sorrosal/linkedin-mcp'
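The same endpoint can be queried from code. The snippet below is a minimal sketch using Python's requests library; it assumes the endpoint is public and returns a JSON document, which you should confirm against the API documentation before relying on specific fields.

import requests

# Hypothetical example: fetch this server's directory entry from the Glama MCP API.
# Assumes no authentication is required and the response body is JSON.
url = "https://glama.ai/api/mcp/v1/servers/francisco-perez-sorrosal/linkedin-mcp"
response = requests.get(url, timeout=10)
response.raise_for_status()
server_info = response.json()
print(server_info)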

If you have feedback or need assistance with the MCP directory API, please join our Discord server.