Skip to main content
Glama
Kaiohz

Prospectio MCP API

leads_routes.py (8.42 kB)
import asyncio
import logging
import traceback
from typing import Union
from fastapi import APIRouter, HTTPException, Path
from application.requests.insert_leads import InsertLeadsRequest
from application.use_cases.generate_message import GenerateMessageUseCase
from application.use_cases.get_leads import GetLeadsUseCase
from domain.entities.company import CompanyEntity
from domain.entities.contact import ContactEntity
from domain.entities.job import JobEntity
from domain.entities.leads import Leads
from domain.entities.prospect_message import ProspectMessage
from domain.ports.compatibility_score import CompatibilityScorePort
from domain.ports.enrich_leads import EnrichLeadsPort
from domain.ports.generate_message import GenerateMessagePort
from domain.ports.profile_respository import ProfileRepositoryPort
from domain.ports.task_manager import TaskManagerPort
from domain.services.leads.leads_processor import LeadsProcessor
from prospectio_api_mcp.application.use_cases.insert_leads import (
    InsertLeadsUseCase,
)
from collections.abc import Callable
from domain.services.leads.strategy import LeadsStrategy
from domain.ports.leads_repository import LeadsRepositoryPort
from application.api.mcp_routes import mcp_prospectio
from uuid import uuid4
from domain.entities.task import Task

logger = logging.getLogger(__name__)


def leads_router(
    jobs_strategy: dict[str, Callable[[str, list[str]], LeadsStrategy]],
    repository: LeadsRepositoryPort,
    compatibility: CompatibilityScorePort,
    profile_repository: ProfileRepositoryPort,
    enrich_port: EnrichLeadsPort,
    message_port: GenerateMessagePort,
    task_manager: TaskManagerPort,
) -> APIRouter:
    """
    Create an APIRouter exposing the leads endpoints with injected dependencies.

    Every endpoint is a closure over the arguments below, so the router can be
    built with different adapters without touching the handlers.

    Args:
        jobs_strategy (dict[str, callable]): Mapping of source name to a
            strategy factory called as ``factory(location, job_params)``.
        repository (LeadsRepositoryPort): Repository handed to the get-leads,
            insert-leads and generate-message use cases.
        compatibility (CompatibilityScorePort): Port used to build the
            LeadsProcessor for lead insertion.
        profile_repository (ProfileRepositoryPort): Repository handed to the
            insert-leads and generate-message use cases.
        enrich_port (EnrichLeadsPort): Port handed to the insert-leads use case.
        message_port (GenerateMessagePort): Port handed to the
            generate-message use case.
        task_manager (TaskManagerPort): Manager queried for task status and
            used to remove finished or aborted tasks.

    Returns:
        APIRouter: Configured router with endpoints.
    """
    router = APIRouter()

    # The event loop only keeps weak references to tasks. Holding a strong
    # reference here prevents a running insertion from being garbage-collected
    # before it finishes; the done-callback releases it afterwards.
    background_tasks: set = set()

    @router.get("/leads/{type}/{offset}/{limit}")
    @mcp_prospectio.tool(
        description="ALWAYS USE THIS FIRST to retrieve existing data from the database before searching for new opportunities. "
        "Returns companies, jobs, contacts or leads that are already stored in the database. "
        "This endpoint is paginated: use the 'offset' parameter to paginate through results. Offset begins at 0. "
        "Pagination size: 5 for companies, 10 for contacts, 3 for jobs. "
        "Use this tool when the user wants to see existing leads, companies, jobs, or contacts. "
        "Only use the insert/leads endpoint when the user specifically asks for new opportunities or when no relevant data is found in the database. "
        "The parameter 'type' can be: 'companies', 'jobs', 'contacts', or 'leads'. "
        "Example: GET /get/leads/companies/0 to get the first 5 companies, /get/leads/companies/5 for the next 5, /get/leads/contacts/0 for the first 10 contacts, /get/leads/jobs/0 for the first 3 jobs, etc."
    )
    async def get_leads(
        type: str = Path(..., description="Lead source"),
        offset: int = Path(..., description="Offset for pagination"),
        limit: int = Path(..., description="Limit for pagination"),
    ) -> Union[Leads, CompanyEntity, JobEntity, ContactEntity]:
        """
        Retrieve stored data of the requested type from the repository.

        Args:
            type (str): Kind of data to fetch, forwarded to GetLeadsUseCase.
            offset (int): Pagination offset.
            limit (int): Pagination limit.

        Returns:
            The result of GetLeadsUseCase.get_leads for the given window.

        Raises:
            HTTPException: 500 when the use case fails.
        """
        try:
            return await GetLeadsUseCase(type, repository).get_leads(offset, limit)
        except Exception as e:
            logger.error(f"Error in get leads: {e}\n{traceback.format_exc()}")
            raise HTTPException(status_code=500, detail=str(e)) from e

    @router.post("/insert/leads")
    @mcp_prospectio.tool(
        description="Use this ONLY when the user asks for NEW opportunities/leads or when get/leads returns insufficient data. "
        "This tool searches external sources and inserts NEW leads into the database. "
        "Sources available: 'jsearch', 'active_jobs_db'. "
        "If a source does not work or returns no data, try another one. \n"
        "Requires location (country code) and job titles as technologies or job titles (e.g., 'Python', 'AI', 'RAG', 'LLM', 'Tech lead', 'Software Engineer'). Focus on technologies found on profile. "
        "IMPORTANT: Before using this, check the user profile or ask to update it if missing. "
        'Example JSON: {"source": "jsearch", "location": "FR", "job_params": ["Python", "AI", "RAG", "LLM", "Tech lead", "Software Engineer"]}'
    )
    async def insert_leads(
        request: InsertLeadsRequest
    ) -> Task:
        """
        Start a background lead insertion from the requested source.

        The insertion itself runs as an asyncio task; this handler returns
        immediately with a Task whose id can be polled for progress.

        Args:
            request (InsertLeadsRequest): Carries ``source`` (key of the
                strategy to use), ``location`` (country code) and
                ``job_params`` (job titles / technologies to search for).

        Returns:
            Task: Task descriptor with status "processing" and the new task id.

        Raises:
            HTTPException: 500 when the source is unknown or the insertion
                could not be scheduled.
        """
        # Bound before the try block so the error handler can always tell
        # whether a task id was generated yet.
        task_uuid = None
        try:
            source = request.source
            if source not in jobs_strategy:
                # Fall back to the normalized spelling so that e.g. " JSearch "
                # resolves to the same strategy as "jsearch".
                source = source.strip().lower()
            if source not in jobs_strategy:
                raise ValueError(f"Unknown source: {request.source}")

            job_params = [title.strip().lower() for title in request.job_params]
            location = request.location.strip().lower()
            task_uuid = str(uuid4())
            strategy = jobs_strategy[source](location, job_params)
            processor = LeadsProcessor(compatibility)

            insertion = asyncio.create_task(
                InsertLeadsUseCase(
                    task_uuid,
                    strategy,
                    repository,
                    processor,
                    profile_repository,
                    enrich_port,
                    task_manager,
                ).insert_leads()
            )
            background_tasks.add(insertion)
            insertion.add_done_callback(background_tasks.discard)

            return Task(
                task_id=task_uuid,
                message=f"Lead insertion started for source '{request.source}' in location '{location}'",
                status="processing",
            )
        except Exception as e:
            # Only clean up when a task id exists; failures before that point
            # (such as an unknown source) have nothing to remove.
            if task_uuid is not None:
                await task_manager.remove_task(task_uuid)
            logger.error(f"Error in insert leads: {e}\n{traceback.format_exc()}")
            raise HTTPException(status_code=500, detail=str(e)) from e

    @router.get("/task/{task_id}")
    @mcp_prospectio.tool(
        description="Check the status and progress of a background task by its unique ID. "
        "Essential for monitoring long-running operations like lead insertion, enrichment, or data processing. \n"
        "Returns current status ('processing', 'completed', 'failed'), progress information, and any error details. "
        "Use the task_id returned from operations like '/insert/leads' to track their execution. "
        "Completed or failed tasks are automatically cleaned up after status retrieval. "
        "Example: After starting lead insertion, use this to monitor when new leads are ready in the database."
    )
    async def get_task_status(task_id: str) -> Task:
        """
        Get the status of a task by its ID.

        A task whose status is "completed" or "failed" is removed from the
        task manager once its status has been read.

        Args:
            task_id (str): The ID of the task to retrieve.

        Returns:
            Task: The task with its current status.

        Raises:
            HTTPException: 404 when no task exists for ``task_id``; 500 on any
                other failure.
        """
        try:
            task = await task_manager.get_task_status(task_id)
            if not task:
                raise HTTPException(status_code=404, detail="Task not found")
            if task.status in ("completed", "failed"):
                await task_manager.remove_task(task_id)
            return task
        except HTTPException:
            # Let the deliberate 404 through instead of rewrapping it as a 500.
            raise
        except Exception as e:
            logger.error(f"Error in get task status: {e}\n{traceback.format_exc()}")
            raise HTTPException(status_code=500, detail=str(e)) from e

    @router.get("/generate/message/{contact_id}")
    async def generate_prospecting_message(contact_id: str) -> ProspectMessage:
        """
        Generate a prospecting message for a contact by its ID.

        Args:
            contact_id (str): The ID of the contact to generate a message for.

        Returns:
            ProspectMessage: The generated prospecting message.

        Raises:
            HTTPException: 500 when message generation fails.
        """
        try:
            return await GenerateMessageUseCase(
                repository, profile_repository, message_port
            ).generate_message(contact_id)
        except Exception as e:
            logger.error(f"Error in generate prospecting message: {e}\n{traceback.format_exc()}")
            raise HTTPException(status_code=500, detail=str(e)) from e

    return router

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Kaiohz/prospectio-api-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.