
LinkedIn MCP Server

get_jobs_raw_metadata

Retrieve raw metadata for specified LinkedIn job IDs, enabling detailed analysis and extraction of job-related information directly within the LinkedIn MCP Server.

Instructions

Gets the raw job metadata for the given job IDs.

Args:
    job_ids: List of job IDs to retrieve raw metadata for

Returns:
    Dict with job IDs as keys and the corresponding job metadata as values
    (each value is itself a dictionary)
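
A minimal sketch of calling this tool from Python with the MCP client SDK over stdio; the launch command ("linkedin-mcp") and the job IDs below are placeholders, not taken from this server's documentation:

    import asyncio

    from mcp import ClientSession, StdioServerParameters
    from mcp.client.stdio import stdio_client

    # Assumption: the server is started over stdio; replace the command with
    # however the LinkedIn MCP Server is launched in your setup.
    server_params = StdioServerParameters(command="linkedin-mcp", args=[])

    async def main() -> None:
        async with stdio_client(server_params) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()
                # The job IDs below are placeholders.
                result = await session.call_tool(
                    "get_jobs_raw_metadata",
                    arguments={"job_ids": ["4012345678", "4098765432"]},
                )
                print(result)

    asyncio.run(main())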

Input Schema

Name       Required   Description                                       Default
job_ids    Yes        List of job IDs to retrieve raw metadata for      (none)
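
The JSON Schema view is not reproduced here; a plausible reconstruction, inferred from the job_ids: List[str] parameter (the schema the server actually generates may include extra fields such as titles), expressed as a Python dict:

    input_schema = {
        "type": "object",
        "properties": {
            "job_ids": {"type": "array", "items": {"type": "string"}},
        },
        "required": ["job_ids"],
    }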

Implementation Reference

  • MCP tool handler decorated with @mcp.tool(). Validates job_ids (caps the list at 20, strips whitespace, and drops non-string or empty entries), delegates to extractor.get_jobs_raw_metadata(), logs progress, and returns the metadata dictionary.
    @mcp.tool()
    def get_jobs_raw_metadata(job_ids: List[str]) -> Dict[str, Any]:
        """
        Gets the job raw metadata for the given job IDs passed as parameter.
        
        Args:
            job_ids: List of job IDs to get the job raw metadata for (max 20 jobs recommended)
            
        Returns:
            Dict: Job IDs as keys, and the corresponding job metadata information as values
        """
        if len(job_ids) > 20:
            logger.warning(f"Large number of job IDs ({len(job_ids)}), limiting to first 20")
            job_ids = job_ids[:20]
        
        # Validate individual job IDs
        valid_job_ids = [job_id.strip() for job_id in job_ids if isinstance(job_id, str) and job_id.strip()]
        
        logger.info(f"Retrieving metadata for {len(valid_job_ids)} job IDs")
        
        metadata = extractor.get_jobs_raw_metadata(valid_job_ids)
        logger.info(f"Successfully retrieved metadata for {len(metadata)} jobs")
        return metadata
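    For illustration, the validation step keeps only non-empty strings and trims whitespace; a quick check with made-up values (not from the server) behaves like this:
        job_ids = ["  3954432109 ", "", 12345, "3951118844"]
        valid_job_ids = [j.strip() for j in job_ids if isinstance(j, str) and j.strip()]
        # valid_job_ids == ["3954432109", "3951118844"]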
  • Core implementation in JobPostingExtractor.get_jobs_raw_metadata(). Identifies new job_ids not in cache, triggers scraping for new ones via scrape_new_job_ids(), then retrieves all from cache into a dict with job_id keys and metadata values.
    def get_jobs_raw_metadata(self, job_ids: List[str]) -> Dict[str, Dict[str, Any]]:
        """
        Gets the job description from the cache or scrapes it if not found.
        
        Args:
            job_ids: List of job IDs to get the description for
            
        Returns:
            List of job descriptions
        """
        
        jobs_metadata: dict[str, Dict[str, Any]] = {}
        
        new_jobs = self.get_new_job_ids(job_ids)
        
        if new_jobs:
            self.scrape_new_job_ids(new_jobs)
        
        for job_id in job_ids:
            job_metadata = self._job_description_cache.get(job_id)
            if job_metadata is not None:
                jobs_metadata[job_id] = job_metadata
            else:
                logger.info(f"Job metadata not found for {job_id}")
                
        return jobs_metadata
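    get_new_job_ids() is referenced above but not shown on this page; going by the description ("identifies new job_ids not in cache"), a minimal sketch of its likely behavior (the cache-lookup semantics are an assumption) is:
        def get_new_job_ids(self, job_ids: List[str]) -> List[str]:
            # Assumed behavior: a job ID is "new" if the description cache has no entry for it.
            return [job_id for job_id in job_ids if self._job_description_cache.get(job_id) is None]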
  • Helper method that scrapes new job postings in parallel using a multiprocessing Pool with extract_job_description_worker, then caches successful results via BasicInMemoryCache (a sketch of the assumed cache interface follows below).
    def scrape_new_job_ids(self, new_job_ids: List[str], overwrite_cache_entries: bool = False) -> None:
        """
        Scrape job descriptions for new job IDs using multiprocessing.
        
        Args:
            new_job_ids: List of job IDs to scrape
            overwrite_cache_entries: Whether to overwrite existing cache entries
        """
        if not new_job_ids:
            logger.info("No new jobs to scrape")
            return
            
        logger.info(f"Scraping {len(new_job_ids)} new LinkedIn job IDs using multiprocessing")
        start_time = time.time()
        
        # Number of worker processes (hardcoded to 2; the 75%-of-CPUs heuristic is left commented out)
        num_processes = 2  # max(1, int(cpu_count() * 0.75))
        logger.info(f"Using {num_processes} processes for parallel scraping")
        
        # Create a process pool and map the job IDs to worker processes
        with Pool(processes=num_processes) as pool:
            try:
                # Map job IDs to worker processes
                results = pool.map(extract_job_description_worker, new_job_ids)
                
                # Filter out empty results and save to cache
                valid_results = [job for job in results if job]
                logger.info(f"Successfully scraped {len(valid_results)} out of {len(new_job_ids)} jobs")
                
                # Save to cache
                for job in valid_results:
                    if self._job_description_cache is not None:
                        self._job_description_cache.put(job, overwrite=overwrite_cache_entries)
                
            except Exception as e:
                logger.error(f"Error in parallel job scraping: {e}")
                raise
            finally:
                # Clean up
                pool.close()
                pool.join()
        
        duration = time.time() - start_time
        logger.info(f"Completed parallel scraping in {duration:.2f} seconds")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/francisco-perez-sorrosal/linkedin-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.