get_jobs_raw_metadata
Retrieve raw metadata for specified LinkedIn job IDs, enabling detailed analysis and extraction of job-related information directly within the LinkedIn MCP Server.
Instructions
Gets the job raw metadata for the given job IDs passed as a parameter.

Args:
    job_ids: List of job IDs to get the job raw metadata for

Returns:
    Dict with job IDs as keys and the corresponding job metadata as values (each value is itself a dictionary)
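
For orientation, here is a minimal sketch of the call arguments and the shape of the returned value, based on the signature above. The metadata field names inside each job's dictionary are illustrative assumptions only; the actual keys depend on what the scraper extracts from LinkedIn. Job IDs whose metadata could not be found are omitted from the result.

```python
# Hypothetical tool arguments (the job IDs are made up for illustration).
arguments = {"job_ids": ["4011223344", "4055667788"]}

# Shape of the returned value: job ID -> metadata dictionary.
# The inner field names below are assumptions, not the server's schema.
result = {
    "4011223344": {
        "title": "Senior Data Engineer",
        "company": "Example Corp",
        "description": "Full raw description text scraped from the posting...",
    },
    # "4055667788" would simply be absent if its metadata could not be retrieved.
}
```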
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| job_ids | Yes | List of job IDs to get the job raw metadata for | |
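
The exact JSON Schema generated for this tool is not reproduced here. As an approximation, the input schema inferred from the Python signature `job_ids: List[str]` would look like the following, expressed as a Python dictionary mirroring JSON Schema; treat it as an assumption rather than the server's authoritative schema:

```python
# Approximate input schema inferred from the handler signature (assumption,
# not the schema actually emitted by the MCP framework).
input_schema = {
    "type": "object",
    "properties": {
        "job_ids": {
            "type": "array",
            "items": {"type": "string"},
            "description": "List of job IDs to get the job raw metadata for",
        },
    },
    "required": ["job_ids"],
}
```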
Implementation Reference
- `src/linkedin_mcp_server/main.py:138-160` (handler): MCP tool handler decorated with `@mcp.tool()`. Performs input validation on `job_ids` (limits the list to 20 IDs, strips and filters valid strings), delegates to `extractor.get_jobs_raw_metadata()`, logs progress, and returns the metadata dictionary. A client-side invocation sketch follows this list.

  ```python
  @mcp.tool()
  def get_jobs_raw_metadata(job_ids: List[str]) -> Dict[str, Any]:
      """
      Gets the job raw metadata for the given job IDs passed as parameter.

      Args:
          job_ids: List of job IDs to get the job raw metadata for (max 20 jobs recommended)

      Returns:
          Dict: Job IDs as keys, and the corresponding job metadata information as values
      """
      if len(job_ids) > 50:
          logger.warning(f"Large number of job IDs ({len(job_ids)}), limiting to first 20")
          job_ids = job_ids[:20]

      # Validate individual job IDs
      valid_job_ids = [
          job_id.strip()
          for job_id in job_ids
          if isinstance(job_id, str) and job_id.strip()
      ]

      logger.info(f"Retrieving metadata for {len(valid_job_ids)} job IDs")
      metadata = extractor.get_jobs_raw_metadata(valid_job_ids)
      logger.info(f"Successfully retrieved metadata for {len(metadata)} jobs")
      return metadata
  ```
- Core implementation in `JobPostingExtractor.get_jobs_raw_metadata()`. Identifies new `job_ids` not in the cache, triggers scraping for the new ones via `scrape_new_job_ids()`, then retrieves all available entries from the cache into a dict with `job_id` keys and metadata values.

  ```python
  def get_jobs_raw_metadata(self, job_ids: List[str]) -> Dict[str, Dict[str, Any]]:
      """
      Gets the job description from the cache or scrapes it if not found.

      Args:
          job_ids: List of job IDs to get the description for

      Returns:
          List of job descriptions
      """
      jobs_metadata: dict[str, Dict[str, Any]] = {}

      new_jobs = self.get_new_job_ids(job_ids)
      if new_jobs:
          self.scrape_new_job_ids(new_jobs)

      for job_id in job_ids:
          job_metadata = self._job_description_cache.get(job_id)
          if job_metadata is not None:
              jobs_metadata[job_id] = job_metadata
          else:
              logger.info(f"Job metadata not found for {job_id}")

      return jobs_metadata
  ```
- Helper method that performs parallel scraping of new job postings using a multiprocessing `Pool` with `extract_job_description_worker`, and caches successful results using `BasicInMemoryCache`. A sketch of the worker and cache contract follows this list.

  ```python
  def scrape_new_job_ids(self, new_job_ids: List[str], overwrite_cache_entries: bool = False) -> None:
      """
      Scrape job descriptions for new job IDs using multiprocessing.

      Args:
          new_job_ids: List of job IDs to scrape
          overwrite_cache_entries: Whether to overwrite existing cache entries
      """
      if not new_job_ids:
          logger.info("No new jobs to scrape")
          return

      logger.info(f"Scraping {len(new_job_ids)} new LinkedIn job IDs using multiprocessing")
      start_time = time.time()

      # Determine number of processes (use 75% of available CPUs)
      num_processes = 2  # max(1, int(cpu_count() * 0.75))
      logger.info(f"Using {num_processes} processes for parallel scraping")

      # Create a process pool and map the job IDs to worker processes
      with Pool(processes=num_processes) as pool:
          try:
              # Map job IDs to worker processes
              results = pool.map(extract_job_description_worker, new_job_ids)

              # Filter out empty results and save to cache
              valid_results = [job for job in results if job]
              logger.info(f"Successfully scraped {len(valid_results)} out of {len(new_job_ids)} jobs")

              # Save to cache
              for job in valid_results:
                  if self._job_description_cache is not None:
                      self._job_description_cache.put(job, overwrite=overwrite_cache_entries)
          except Exception as e:
              logger.error(f"Error in parallel job scraping: {e}")
              raise
          finally:
              # Clean up
              pool.close()
              pool.join()

      duration = time.time() - start_time
      logger.info(f"Completed parallel scraping in {duration:.2f} seconds")
  ```
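
To exercise the handler end to end, a client can call the tool over stdio with the official `mcp` Python SDK. This is a minimal sketch under stated assumptions: the launch command and entry-point path (`src/linkedin_mcp_server/main.py`) are taken from the handler reference above but may differ in a packaged install, and the job IDs are placeholders.

```python
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# Assumed launch command for the server; adjust to the actual entry point.
server_params = StdioServerParameters(
    command="python",
    args=["src/linkedin_mcp_server/main.py"],
)


async def main() -> None:
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # Placeholder job IDs for illustration.
            result = await session.call_tool(
                "get_jobs_raw_metadata",
                arguments={"job_ids": ["4011223344", "4055667788"]},
            )
            print(result.content)


asyncio.run(main())
```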
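
The reference code above also relies on two collaborators that are not reproduced in this section: the module-level `extract_job_description_worker` function mapped across the process pool, and `BasicInMemoryCache`. The sketch below captures only the contract implied by those calls; the worker body, the empty-result convention, and the use of a `job_id` field as the cache key are assumptions.

```python
from typing import Any, Dict, Optional


def extract_job_description_worker(job_id: str) -> Dict[str, Any]:
    """Hypothetical worker: scrape one job posting in a separate process.

    Returns the scraped metadata dict on success, or an empty dict on
    failure (falsy results are filtered out by the caller).
    """
    # The real implementation performs the LinkedIn scraping for `job_id`.
    return {"job_id": job_id, "description": "..."}  # placeholder result


class BasicInMemoryCache:
    """Minimal dict-backed sketch matching the get()/put() calls above."""

    def __init__(self) -> None:
        self._entries: Dict[str, Dict[str, Any]] = {}

    def get(self, job_id: str) -> Optional[Dict[str, Any]]:
        return self._entries.get(job_id)

    def put(self, job: Dict[str, Any], overwrite: bool = False) -> None:
        # Assumption: each scraped job dict carries its own "job_id" field,
        # which serves as the cache key.
        key = job["job_id"]
        if overwrite or key not in self._entries:
            self._entries[key] = job
```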