Skip to main content
Glama
Kaiohz

Prospectio MCP API

jsearch.py (7.33 kB)
import asyncio
from datetime import datetime
from typing import Any, TypeVar
from uuid import uuid4

import httpx

from prospectio_api_mcp.domain.ports.fetch_leads import FetchLeadsPort
from infrastructure.dto.rapidapi.jsearch import JSearchResponseDTO
from config import JsearchConfig
from infrastructure.api.client import BaseApiClient
from domain.entities.company import Company, CompanyEntity
from domain.entities.job import Job, JobEntity
from prospectio_api_mcp.domain.entities.leads import Leads

T = TypeVar("T")


class JsearchAPI(FetchLeadsPort):
    """
    Adapter for the JSearch API to fetch job data.

    Each search request is sent with one of the configured RapidAPI keys; when
    a key is answered with HTTP 429 the next key is tried.
    """

    # Only the first MAX_JOB_TITLES entries of ``job_title`` are searched.
    MAX_JOB_TITLES = 2
    # Seconds waited before a request; fetch_leads multiplies it by the
    # request position so concurrent searches do not fire at the same time.
    REQUEST_SPACING_SECONDS = 1.0

    def __init__(self, config: JsearchConfig) -> None:
        """
        Initialize JsearchAPI with configuration.

        Args:
            config (JsearchConfig): JSearch API configuration object. Its
                ``JSEARCH_API_URL`` and ``RAPIDAPI_API_KEY`` attributes are read.
        """
        self.api_base = config.JSEARCH_API_URL
        # Indexed and measured with len() below, so this is used as a
        # sequence of keys.
        self.api_keys = config.RAPIDAPI_API_KEY
        self.search_endpoint = "/search"

    def _get_headers(self, api_key: str) -> dict[str, str]:
        """
        Get headers for API requests with the specified API key.

        Args:
            api_key (str): The RapidAPI key to use.

        Returns:
            dict[str, str]: Headers for the API request.
        """
        # Host = what follows the last "//" up to the first "/".
        host = self.api_base.split("//")[-1].split("/")[0]
        return {
            "accept": "application/json",
            "x-rapidapi-host": host,
            "x-rapidapi-key": api_key,
        }

    async def _make_request_with_retry(
        self, endpoint: str, params: dict[str, Any], key_index: int = 0
    ) -> JSearchResponseDTO:
        """
        Make an API request, moving on to the next API key on 429 responses.

        Args:
            endpoint (str): The API endpoint to call.
            params (dict): Request parameters.
            key_index (int): Index of the first key to try.

        Returns:
            JSearchResponseDTO: The parsed response data.

        Raises:
            Exception: If all API keys are exhausted or other errors occur.
        """
        for index in range(key_index, len(self.api_keys)):
            headers = self._get_headers(self.api_keys[index])
            client = BaseApiClient(self.api_base, headers)
            try:
                result = await client.get(endpoint, params)
            except BaseException:
                # Do not leak the client when the request itself fails or the
                # task is cancelled; the error is re-raised untouched.
                await client.close()
                raise
            if result.status_code == 429:
                # Rate limited with this key: release the client, try the next.
                await client.close()
                continue
            # _check_error takes over ownership of the client and closes it.
            return await self._check_error(client, result, JSearchResponseDTO)
        raise Exception("All API keys exhausted due to rate limiting")

    async def _check_error(
        self, client: BaseApiClient, result: httpx.Response, dto_type: type[T]
    ) -> T:
        """
        Check the HTTP response for errors and parse it into the given DTO type.

        The client is closed exactly once on every path, including when JSON
        decoding or DTO construction fails.

        Args:
            client (BaseApiClient): The API client instance to close.
            result (httpx.Response): The HTTP response from the API call.
            dto_type (type[T]): The DTO class to parse the response into.

        Raises:
            Exception: If the response status code is not 200.

        Returns:
            T: An instance of the DTO type with the response data.
        """
        try:
            if result.status_code != 200:
                raise Exception(f"Failed to fetch leads: {result.text}")
            return dto_type(**result.json())
        finally:
            await client.close()

    async def to_company_entity(
        self, dto: JSearchResponseDTO
    ) -> tuple[CompanyEntity, list[str]]:
        """
        Convert JSearch response DTO to CompanyEntity.

        One company is created per entry of ``dto.data`` (no de-duplication),
        each with a freshly generated UUID.

        Args:
            dto (JSearchResponseDTO): The JSearch API response data.

        Returns:
            tuple[CompanyEntity, list[str]]: The companies built from the
            response and their ids, in the same order as ``dto.data``.
        """
        companies: list[Company] = []
        ids: list[str] = []
        for jsearch_company in dto.data or []:
            company_id = str(uuid4())
            company = Company(  # type: ignore
                id=company_id,
                name=jsearch_company.employer_name,
                source="jsearch",
            )
            # Fall back to the id handed to the constructor rather than a new
            # UUID, so the id given to jobs always belongs to this company.
            ids.append(company.id or company_id)
            companies.append(company)
        return CompanyEntity(companies=companies), ids  # type: ignore

    async def to_job_entity(self, dto: JSearchResponseDTO, ids: list[str]) -> JobEntity:
        """
        Convert JSearch response DTO to JobEntity.

        Args:
            dto (JSearchResponseDTO): The JSearch API response data.
            ids (list[str]): Company ids; ``ids[i]`` is attached to the job
                built from ``dto.data[i]``, so it must be at least as long as
                ``dto.data`` (as returned by ``to_company_entity``).

        Returns:
            JobEntity: Entity containing jobs from JSearch data.
        """
        jobs: list[Job] = []
        for index, job in enumerate(dto.data or []):
            # The DTO's job_id is overwritten with a generated UUID, which
            # also becomes the id of the Job.
            job.job_id = str(uuid4())
            if job.job_posted_at_datetime_utc:
                # Trailing "Z" is dropped from the posted timestamp.
                date_creation = job.job_posted_at_datetime_utc.rstrip("Z")
            else:
                date_creation = datetime.now().isoformat()
            job_entity = Job(  # type: ignore
                id=job.job_id,
                company_id=ids[index],
                date_creation=date_creation,
                description=job.job_description,
                job_title=job.job_title,
                location=job.job_location,
                salary=f"{job.job_min_salary or ''} - {job.job_max_salary or ''}",
                job_type=job.job_employment_type,
                apply_url=[job.job_apply_link or "", job.job_google_link or ""],
            )
            jobs.append(job_entity)
        return JobEntity(jobs=jobs)  # type: ignore

    async def process_results(
        self,
        company_result: CompanyEntity,
        job_result: JobEntity,
        params: dict[str, Any],
        delay: float = REQUEST_SPACING_SECONDS,
    ) -> None:
        """
        Run one search and append its companies and jobs to the accumulators.

        Args:
            company_result (CompanyEntity): Accumulator; its ``companies`` list
                is extended in place.
            job_result (JobEntity): Accumulator; its ``jobs`` list is extended
                in place.
            params (dict): Query parameters for the search endpoint.
            delay (float): Seconds to wait before sending the request.

        Raises:
            Exception: Propagated from the request when it fails or every API
                key is rate limited.
        """
        await asyncio.sleep(delay)  # Rate limiting to avoid hitting API limits
        jsearch = await self._make_request_with_retry(self.search_endpoint, params)
        company_entity, ids = await self.to_company_entity(jsearch)
        job_entity = await self.to_job_entity(jsearch, ids)
        company_result.companies.extend(company_entity.companies)
        job_result.jobs.extend(job_entity.jobs)

    async def fetch_leads(self, location: str, job_title: list[str]) -> Leads:
        """
        Fetch jobs from the JSearch API based on search parameters.

        Only the first ``MAX_JOB_TITLES`` titles are searched. The searches run
        concurrently but each waits one more ``REQUEST_SPACING_SECONDS`` than
        the previous one, so the requests are spread out instead of all being
        sent after the same pause.

        Args:
            location (str): The location to search jobs in.
            job_title (list[str]): List of job titles to search for.

        Returns:
            Leads: The leads containing companies and jobs data
            (``contacts`` is None).
        """
        company_result: CompanyEntity = CompanyEntity(companies=[])  # type: ignore
        job_result: JobEntity = JobEntity(jobs=[])  # type: ignore

        # NOTE(review): "country" is the first two characters of ``location``
        # lower-cased; this presumably expects a location that starts with the
        # country code — confirm against callers.
        country = location[0:2].lower()
        params_list = [
            {
                "query": f"{title} in {location}",
                "page": 1,
                "num_pages": 1,
                "date_posted": "month",
                "country": country,
            }
            for title in job_title[: self.MAX_JOB_TITLES]
        ]

        await asyncio.gather(
            *(
                self.process_results(
                    company_result,
                    job_result,
                    params,
                    delay=self.REQUEST_SPACING_SECONDS * position,
                )
                for position, params in enumerate(params_list, start=1)
            )
        )

        # Combine results into a Leads object
        leads = Leads(
            companies=company_result,
            jobs=job_result,
            contacts=None,
        )  # type: ignore
        return leads

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Kaiohz/prospectio-api-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server