Skip to main content
Glama
Kaiohz

Prospectio MCP API

profile_database.py3.92 kB
from typing import Optional, List from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker from sqlalchemy.dialects.postgresql import insert from sqlalchemy import select from domain.ports.profile_respository import ProfileRepositoryPort from domain.entities.profile import Profile from domain.entities.work_experience import WorkExperience from infrastructure.dto.database.profile import ProfileDTO class ProfileDatabase(ProfileRepositoryPort): """ SQLAlchemy implementation of the profile repository port. Handles inserting and updating profile data in the database using async operations. """ def __init__(self, database_url: str): """ Initialize the profile database repository. Args: database_url (str): Async database connection URL (should start with postgresql+asyncpg://). """ self.database_url = database_url self.engine = create_async_engine(database_url) self.async_session = async_sessionmaker(self.engine) def _convert_dto_to_entity(self, profile_dto: ProfileDTO) -> Profile: """ Convert ProfileDTO to Profile entity. Args: profile_dto (ProfileDTO): The database DTO to convert. Returns: Profile: The converted domain entity. """ work_experiences = [] if profile_dto.work_experience: work_experiences = [ WorkExperience(**exp) for exp in profile_dto.work_experience ] return Profile( job_title=profile_dto.job_title, location=profile_dto.location, bio=profile_dto.bio, work_experience=work_experiences, technos=profile_dto.technos or [] ) async def get_profile(self) -> Optional[Profile]: """ Retrieve a profile from the database by ID. Args: profile_id (int): The profile ID to retrieve. Defaults to 1. Returns: Optional[Profile]: The profile entity if found, None otherwise. """ async with self.async_session() as session: try: stmt = select(ProfileDTO).where(ProfileDTO.id == 1) result = await session.execute(stmt) profile_dto = result.scalar_one_or_none() if profile_dto: return self._convert_dto_to_entity(profile_dto) return None except Exception as e: raise e async def upsert_profile(self, profile: Profile) -> ProfileDTO: """ Insert or update a profile in the database. Args: profile: The profile entity to upsert. Returns: ProfileDTO: The created or updated profile with database ID. """ async with self.async_session() as session: try: # Convert work_experience to JSON format profile_data = profile.model_dump() if profile_data.get("work_experience"): profile_data["work_experience"] = [ exp.model_dump() if hasattr(exp, "model_dump") else exp for exp in profile_data["work_experience"] ] # Update existing profile stmt = ( insert(ProfileDTO) .values(id=1, **profile_data) .on_conflict_do_update(index_elements=["id"], set_=profile_data) .returning(ProfileDTO) ) result = await session.execute(stmt) profile_dto = result.scalar_one() await session.commit() await session.refresh(profile_dto) return profile_dto except Exception as e: await session.rollback() raise e

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Kaiohz/prospectio-api-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server