Skip to main content
Glama
Kaiohz

Prospectio MCP API

active_jobs_db.py6.99 kB
from re import A import httpx from uuid import uuid4 from typing import TypeVar from prospectio_api_mcp.domain.ports.fetch_leads import FetchLeadsPort from infrastructure.dto.rapidapi.active_jobs_db import ActiveJobsResponseDTO from config import ActiveJobsDBConfig from infrastructure.api.client import BaseApiClient from domain.entities.company import Company, CompanyEntity from domain.entities.job import Job, JobEntity from domain.entities.leads import Leads from datetime import datetime T = TypeVar("T") class ActiveJobsDBAPI(FetchLeadsPort): """ Adapter for the Active Jobs DB API to fetch job data. """ def __init__(self, config: ActiveJobsDBConfig) -> None: """ Initialize ActiveJobsDBAPI with configuration. Args: config (ActiveJobsDBConfig): Active Jobs DB API configuration object. """ self.api_base = config.ACTIVE_JOBS_DB_URL self.api_keys = config.RAPIDAPI_API_KEY self.endpoint = "/active-ats-7d" async def _check_error( self, client: BaseApiClient, result: httpx.Response, dto_type: type[T] ) -> T: """ Check the HTTP response for errors and parse the response into the given DTO type. Closes the client after processing. Args: client (BaseApiClient): The API client instance to close. result (httpx.Response): The HTTP response from the API call. dto_type (type[T]): The DTO class to parse the response into. Raises: Exception: If the response status code is not 200. Returns: T: An instance of the DTO type with the response data. """ if result.status_code != 200: await client.close() raise Exception(f"Failed to fetch jobs: {result.text}") dto = dto_type(**{"active_jobs": result.json()}) await client.close() return dto def _get_headers(self, api_key: str) -> dict[str, str]: """ Get headers for API requests with the specified API key. Args: api_key (str): The RapidAPI key to use. Returns: dict[str, str]: Headers for the API request. """ return { "accept": "application/json", "x-rapidapi-host": self.api_base.split("//")[-1].split("/")[0], "x-rapidapi-key": api_key, } async def _make_request_with_retry(self, endpoint: str, params: dict, key_index: int = 0) -> ActiveJobsResponseDTO: """ Make API request with recursive retry logic for different API keys on 429 errors. Args: endpoint (str): The API endpoint to call. params (dict): Request parameters. key_index (int): Current key index to try. Returns: ActiveJobsResponseDTO: The parsed response data. Raises: Exception: If all API keys are exhausted or other errors occur. """ if key_index >= len(self.api_keys): raise Exception("All API keys exhausted due to rate limiting") headers = self._get_headers(self.api_keys[key_index]) client = BaseApiClient(self.api_base, headers) result = await client.get(endpoint, params) if result.status_code == 429: await client.close() return await self._make_request_with_retry(endpoint, params, key_index + 1) return await self._check_error(client, result, ActiveJobsResponseDTO) async def to_company_entity( self, dto: ActiveJobsResponseDTO ) -> tuple[CompanyEntity, list[str]]: """ Convert Active Jobs DB response DTO to CompanyEntity. Args: dto (ActiveJobsResponseDTO): The Active Jobs DB API response data. Returns: tuple[CompanyEntity, list[str]]: Entity containing companies and their IDs. """ companies: list[Company] = [] ids: list[str] = [] for active_job in dto.active_jobs if dto.active_jobs else []: company_id = str(uuid4()) company = Company( # type: ignore id=company_id, name=active_job.organization, source="active_jobs_db", website=active_job.organization_url, ) ids.append(company_id) companies.append(company) return CompanyEntity(companies=companies), ids # type: ignore async def to_job_entity( self, dto: ActiveJobsResponseDTO, ids: list[str] ) -> JobEntity: """ Convert Active Jobs DB response DTO to JobEntity. Args: dto (ActiveJobsResponseDTO): The Active Jobs DB API response data. ids (list[str]): List of company IDs to associate with jobs. Returns: JobEntity: Entity containing jobs from Active Jobs DB data. """ jobs: list[Job] = [] for index, active_job in enumerate(dto.active_jobs) if dto.active_jobs else []: active_job.id = str(uuid4()) job_entity = Job( # type: ignore id=active_job.id, company_id=ids[index] if index < len(ids) else str(uuid4()), date_creation=active_job.date_posted or datetime.now().isoformat(), description=active_job.description_text, job_title=active_job.title, location=( ", ".join(active_job.locations_derived) if active_job.locations_derived else None ), salary=str(active_job.salary_raw) if active_job.salary_raw else None, job_type=( ", ".join(active_job.employment_type) if active_job.employment_type else None ), apply_url=[active_job.url or ""], ) jobs.append(job_entity) return JobEntity(jobs=jobs) # type: ignore async def fetch_leads(self, location: str, job_title: list[str]) -> Leads: """ Fetch jobs from the Active Jobs DB API based on search parameters. Args: location (str): The location to search jobs in. job_title (list[str]): List of job titles to search for. Returns: Leads: The leads containing companies and jobs data. """ params = { "limit": 10, "offset": 0, "advanced_title_filter": f"{' | '.join(job_title)}", "location_filter": location, "description_type": "text", } active_jobs = await self._make_request_with_retry(self.endpoint, params) company_entity, ids = await self.to_company_entity(active_jobs) job_entity = await self.to_job_entity(active_jobs, ids) return Leads(companies=company_entity, jobs=job_entity, contacts=None) # type: ignore

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Kaiohz/prospectio-api-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server