Skip to main content
Glama

Ancestry MCP

by reeeeemo
from ast import Dict import asyncio import logging import json import os from pathlib import Path import mcp.types as types from mcp.server import Server from mcp.server.stdio import stdio_server from enum import Enum from pydantic import BaseModel import chardet ged_level_1_tags = ['BIRT', 'DEAT', 'MARR', 'BURI', 'DIV', 'OCCU', 'RESI', 'CHR'] # Tools schemas class ListFiles(BaseModel): name: str class RenameFiles(BaseModel): file_name: str new_name: str class ViewFiles(BaseModel): name: str # Tool names class AncestryTools(str, Enum): LIST_FILES = "list_files" RENAME_FILE = "rename_file" VIEW_FILES = "view_file" # Tool helper functions def find_files_with_name(name: str | None = None, path: Path | None = None) -> list[Path]: pattern = f"{name}.ged" if name is not None else "*.ged" return list(path.glob(pattern)) def rename_files(new_name: str | None = None, files: list[Path] | None = None) -> tuple[str, list[Dict], str]: try: renamed_files = [] for file in files: try: new_path = file.parent / f"{new_name.removesuffix('.ged')}.ged" if new_path.exists(): return [], f"Cannot rename, {new_path.name} already exists" file.rename(new_path) renamed_files.append(new_path) except PermissionError: return [], f'Permission denied: Cannot rename {file.name}. Check write perms' except OSError as e: return [], f'Error renaming {file.name}: {str(e)}' except Exception as e: return [], f'An unexpected error ocurred: {str(e)}. Please try again later or contact support.' return renamed_files, "" def parse_ged_file(files: list[Path] | None = None) -> tuple[list[Dict], str]: try: parsed_geds = {} for file in files: if not file.exists() or file.suffix.lower() != '.ged': continue parsed_geds[file.name] = [] # determine encoding raw_bytes = file.read_bytes() result = chardet.detect(raw_bytes) # open file, and parse ged data try: with file.open(encoding=result['encoding']) as ged: ged_obj = {} cur_lvl1_tag = None for line in ged: ''' Level 0: root records Level 1: main info about records Level 2: details about level 1 info ''' parts = line.strip().split(' ', 2) if not parts: continue level = int(parts[0]) tag = parts[1] value = parts[2] if len(parts) > 2 else '' if level == 0: # save prev obj if exists if ged_obj and 'type' in ged_obj: parsed_geds[file.name].append(ged_obj) ged_obj = {} if '@' in tag: # ID ged_obj['id'] = tag ged_obj['type'] = value elif level == 1: cur_lvl1_tag = tag if tag in ged_level_1_tags: ged_obj[tag] = {} else: ged_obj[tag] = value elif level == 2 and cur_lvl1_tag: # If parent is an event if cur_lvl1_tag in ged_level_1_tags: if cur_lvl1_tag not in ged_obj: ged_obj[cur_lvl1_tag] = {} ged_obj[cur_lvl1_tag][tag] = value elif cur_lvl1_tag == 'NAME': ged_obj[f'NAME_{tag}'] = value else: ged_obj[tag] = value if ged_obj and 'type' in ged_obj: parsed_geds[file.name].append(ged_obj) except UnicodeDecodeError: return [], f'File could not be decoded, please check encoding on the .ged' except Exception as e: return [], f'An unexpected error occured: {str(e)}. Please try again later or contact support.' return parsed_geds, "" # logging config logging.basicConfig( filename='mcp_ancestry.log', level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s' ) # server main code async def serve(gedcom_path: str | None = None) -> None: app = Server("ancestry") # Verification of GEDCOM path path = Path(gedcom_path) if not path.exists(): raise ValueError(f'Invalid path: {gedcom_path}') if not path.is_dir(): raise ValueError(f'GEDCOM path is not a directory: {gedcom_path}') if not os.access(path, os.R_OK): raise ValueError(f'GEDCOM path does not have read / write permissions: {gedcom_path}') # debug stuff ! logging.debug(f'Path exists and is valid: {path.absolute()}') logging.debug(f'Contents of directory: {list(path.iterdir())}') # makes GEDCOM files visible to Claude @app.list_resources() async def list_resources() -> list[types.Resource]: gedcom_files = list(path.glob("*.ged")) # scan gedcom path dir for .ged files return [ types.Resource( uri=f"gedcom://{file.name}", name=file.name, mimeType="application/x-gedcom" ) for file in gedcom_files ] @app.list_tools() async def list_tools() -> list[types.Tool]: return [ types.Tool( name=AncestryTools.LIST_FILES, description="List GEDCOM files", inputSchema=ListFiles.model_json_schema() ), types.Tool( name=AncestryTools.RENAME_FILE, description="Rename a GEDCOM file", inputSchema=RenameFiles.model_json_schema() ), types.Tool( name=AncestryTools.VIEW_FILES, description="View a GEDCOM file in plaintext format", inputSchema=ViewFiles.model_json_schema() ) ] @app.call_tool() async def call_tool(name: str, arguments: dict) -> list[types.TextContent]: match name: case AncestryTools.LIST_FILES: gedcom_files = find_files_with_name(arguments["name"].removesuffix('.ged'), path) return [ types.TextContent( type="text", text=f"File: {file.name}\nSize: {file.stat().st_size} bytes\nURI: gedcom://{file.name}" ) for file in gedcom_files ] case AncestryTools.RENAME_FILE: # get files, if none found tell server that gedcom_files = find_files_with_name(arguments["file_name"].removesuffix('.ged'), path) if not gedcom_files: return [ types.TextContent( type="text", text=f'No files found matching {arguments["file_name"]}' ) ] # rename files, if error message tell server renamed_files, message = rename_files(arguments["new_name"].removesuffix('.ged'), gedcom_files) if message: return [ types.TextContent( type="text", text=message ) ] return [ types.TextContent( type="text", text=f"{file.name}\nURI:gedcom://{file.name}" ) for file in renamed_files ] case AncestryTools.VIEW_FILES: # get files, if none found tell serve rthat gedcom_files = find_files_with_name(arguments["name"].removesuffix('.ged'), path) if not gedcom_files: return [ types.TextContent( type="text", text=f'No files found matching {arguments["name"]}' ) ] # show file, if error message tell server parsed_geds, message = parse_ged_file(gedcom_files) if message: return [ types.TextContent( type="text", text=message ) ] return [ types.TextContent( type="text", text=json.dumps({filename: data}, indent=2) ) for filename, data in parsed_geds.items() ] case _: raise ValueError(f"Unknown Tool: {name}") async with stdio_server() as streams: await app.run( streams[0], streams[1], app.create_initialization_options() ) if __name__ == "__main__": asyncio.run(serve())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/reeeeemo/ancestry-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server