# bible.py • 20.3 kB
# bible.py
import hmac
import json
import os
from typing import Annotated
from typing import Any
from urllib.parse import quote

import httpx
from dotenv import load_dotenv
from fastmcp import FastMCP
from fastmcp.exceptions import ToolError
from fastmcp.server.auth.auth import OAuthProvider
from mcp.server.auth.provider import AccessToken, AuthorizationCode, AuthorizationParams, RefreshToken
from mcp.shared.auth import OAuthClientInformationFull, OAuthToken
from pydantic import Field
load_dotenv()
class SimpleTokenAuthProvider(OAuthProvider):
    """Simple token-based auth that checks against a single pre-shared token.

    Only ``load_access_token`` is functional. Every other provider hook
    (client lookup/registration, authorization-code flow, refresh-token flow,
    revocation) raises ``NotImplementedError``.
    """

    def __init__(self, expected_token: str):
        """Store the token that incoming requests must present.

        Args:
            expected_token: The only bearer token that will be accepted.
        """
        super().__init__(
            issuer_url="https://bible.jbatson.dev",
            client_registration_options=None,
            revocation_options=None,
        )
        self.expected_token = expected_token

    async def load_access_token(self, token: str) -> AccessToken | None:
        """Return an ``AccessToken`` when ``token`` equals the expected token.

        Args:
            token: The bearer token presented by the caller.

        Returns:
            An ``AccessToken`` with client id ``"bible-client"``, no scopes and
            no expiry when the token matches; ``None`` otherwise.
        """
        if not isinstance(token, str):
            return None
        # Compare in constant time so response timing does not reveal how many
        # leading characters of a guess were correct. Encode to bytes first:
        # compare_digest() rejects non-ASCII str arguments.
        presented = token.encode("utf-8", "surrogatepass")
        expected = self.expected_token.encode("utf-8", "surrogatepass")
        if hmac.compare_digest(presented, expected):
            return AccessToken(
                token=token,
                client_id="bible-client",
                scopes=[],
                expires_at=None,
            )
        return None

    async def get_client(self, client_id: str) -> OAuthClientInformationFull | None:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("Client management not supported")

    async def register_client(self, client_info: OAuthClientInformationFull) -> None:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("Client registration not supported")

    async def authorize(self, client: OAuthClientInformationFull, params: AuthorizationParams) -> str:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("Authorization flow not supported")

    async def load_authorization_code(self, client: OAuthClientInformationFull, authorization_code: str) -> AuthorizationCode | None:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("Authorization code flow not supported")

    async def exchange_authorization_code(self, client: OAuthClientInformationFull, authorization_code: AuthorizationCode) -> OAuthToken:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("Authorization code exchange not supported")

    async def load_refresh_token(self, client: OAuthClientInformationFull, refresh_token: str) -> RefreshToken | None:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("Refresh token flow not supported")

    async def exchange_refresh_token(self, client: OAuthClientInformationFull, refresh_token: RefreshToken, scopes: list[str]) -> OAuthToken:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("Refresh token exchange not supported")

    async def revoke_token(self, token: AccessToken | RefreshToken) -> None:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("Token revocation not supported")
# Root URL that every request URL in this module is built from.
BIBLE_API_BASE = "https://bible.helloao.org"

# Token auth is opt-in: a provider is created only when BIBLE_SERVER_TOKEN is
# set to a non-empty value; otherwise the server is created with auth=None.
auth_token = os.environ.get("BIBLE_SERVER_TOKEN")
if auth_token:
    auth = SimpleTokenAuthProvider(auth_token)
else:
    auth = None

mcp = FastMCP("bible", auth=auth)
def create_structured_error(full_details: str, user_message: str) -> str:
    """Serialize an error as a JSON object string.

    Args:
        full_details: Technical description of what went wrong.
        user_message: Friendly text suitable for showing to the end user.

    Returns:
        JSON text for an object with the keys ``"full_details"`` and
        ``"user_message"``, in that order.
    """
    payload = {}
    payload["full_details"] = full_details
    payload["user_message"] = user_message
    return json.dumps(payload)
async def make_bible_request(url: str) -> dict[str, Any]:
    """Make a GET request to the Bible API and return the decoded JSON body.

    Args:
        url: Absolute URL to fetch.

    Returns:
        The value produced by decoding the response body as JSON.

    Raises:
        ToolError: On timeout, on a non-success HTTP status, on any other
            failure while performing the request, or when the body is not
            valid JSON. The message is the JSON string produced by
            ``create_structured_error``; the original exception is chained
            as ``__cause__``.
    """
    headers = {
        "Accept": "application/json"
    }
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(url, headers=headers, timeout=30.0)
            response.raise_for_status()
        except httpx.TimeoutException as e:
            raise ToolError(create_structured_error(
                f"Bible API request timed out: {str(e)}",
                "The request took too long. Please try again."
            )) from e
        except httpx.HTTPStatusError as e:
            status = e.response.status_code
            if status == 404:
                raise ToolError(create_structured_error(
                    f"Bible API 404 error: {url}",
                    "The requested Bible content was not found. Please check your selection and try again."
                )) from e
            if status >= 500:
                raise ToolError(create_structured_error(
                    f"Bible API server error {status}: {url}",
                    "The Bible service is temporarily unavailable. Please try again in a few moments."
                )) from e
            raise ToolError(create_structured_error(
                f"Bible API HTTP error {status}: {url}",
                "There was a problem accessing the Bible content. Please try again."
            )) from e
        except Exception as e:
            # Deliberate catch-all for the request itself, so every transport
            # failure reaches the caller as a ToolError.
            raise ToolError(create_structured_error(
                f"Bible API connection error: {str(e)}",
                "Unable to connect to the Bible service. Please check your internet connection and try again."
            )) from e

    # Decode outside the request's try block: a malformed body is a service
    # problem, not a connectivity problem, and should not tell the user to
    # check their internet connection.
    try:
        return response.json()
    except ValueError as e:
        raise ToolError(create_structured_error(
            f"Bible API returned invalid JSON for {url}: {str(e)}",
            "The Bible service returned an unexpected response. Please try again."
        )) from e
# Maintainer notes (kept out of the docstring, which is left as written for
# tool clients):
# - `language` is matched case-insensitively and exactly against each
#   translation's "language", "languageEnglishName" and "languageName".
# - Raises ValueError for a blank `language`; raises ToolError when the API
#   response has no "translations" field.
@mcp.tool()
async def get_translations(
    language: Annotated[str, Field(description="Language to filter for.")]
) -> list[dict]:
    """Get list of available Bible translations. The user will see the response of this tool call.
    """
    if not language or not language.strip():
        raise ValueError(create_structured_error(
            "get_translations called with empty language parameter",
            "Please specify a language to search for Bible translations."
        ))
    url = f"{BIBLE_API_BASE}/api/available_translations.json"
    data = await make_bible_request(url)
    if "translations" not in data:
        raise ToolError(create_structured_error(
            "Bible API response missing 'translations' field",
            "Unable to retrieve Bible translations. Please try again."
        ))

    # `language` is guaranteed non-blank here, so filtering always applies.
    wanted = language.strip().lower()
    match_fields = ("language", "languageEnglishName", "languageName")

    def _matches(entry: dict) -> bool:
        # A missing or null field becomes "", which can never equal the
        # non-empty `wanted`, so incomplete entries are skipped instead of
        # raising KeyError.
        return any(wanted == str(entry.get(field) or "").lower() for field in match_fields)

    # A tuple (not a set) keeps the key order of each result stable between runs.
    keys_to_keep = ("id", "name", "englishName", "language", "numberOfBooks", "languageName", "languageEnglishName")
    return [
        {key: entry.get(key) for key in keys_to_keep}
        for entry in data["translations"]
        if isinstance(entry, dict) and _matches(entry)
    ]
async def _get_books(translation_id: str) -> tuple[list[dict], dict]:
    """Fetch the book list and translation metadata for one translation.

    Args:
        translation_id: Translation id used as a path segment of the request.

    Returns:
        A pair ``(books, translation_info)``. ``books`` holds one dict per
        book, reduced to the keys ``id``, ``name``, ``commonName``, ``title``
        and ``numberOfChapters``. ``translation_info`` is the response's
        ``"translation"`` value, passed through unchanged.

    Raises:
        ToolError: When the response lacks ``"books"`` or ``"translation"``,
            or when the underlying request fails.
    """
    # The id comes from the tool caller; percent-encode it so characters such
    # as "/", "?" or "#" cannot change the path or query of the request.
    safe_id = quote(translation_id, safe="")
    url = f"{BIBLE_API_BASE}/api/{safe_id}/books.json"
    data = await make_bible_request(url)
    if "books" not in data or "translation" not in data:
        raise ToolError(create_structured_error(
            f"Bible API response missing 'books' or 'translation' fields for translation_id: {translation_id}",
            "Unable to retrieve books for this Bible translation. Please try a different translation."
        ))
    # A tuple (not a set) keeps the key order of each book dict stable between runs.
    keys_to_keep = ("id", "name", "commonName", "title", "numberOfChapters")
    formatted_data = [{key: entry[key] for key in keys_to_keep} for entry in data["books"]]
    return formatted_data, data["translation"]
# Maintainer notes (kept out of the docstring, which is left as written for
# tool clients):
# - Raises ValueError for a blank `translation_id`.
# - The returned "translation_id" is the whitespace-stripped id that was
#   actually used for the lookup.
@mcp.tool()
async def get_books(translation_id: Annotated[str, Field(description="The translation ID to get books for")]) -> dict:
    """Get list of books available in a specific Bible translation. The user will see the response of this tool call.
    Args:
        translation_id: The id for the translation to get books from
    """
    if not translation_id or not translation_id.strip():
        raise ValueError(create_structured_error(
            "get_books called with empty translation_id parameter",
            "Please specify a Bible translation to view its books."
        ))
    # Normalize once so the id echoed back matches the id that was queried.
    normalized_id = translation_id.strip()
    books, translation_info = await _get_books(normalized_id)
    return {
        "translation_id": normalized_id,
        "translation_name": translation_info.get("name", normalized_id),
        "language": translation_info.get("languageName", "Unknown"),
        "total_books": len(books),
        "books": books
    }
def _get_book_id(book: str, books: list[dict]) -> str:
    """Resolve a user-supplied book reference to a book id.

    Args:
        book: Text to match, compared case-insensitively and exactly against
            each entry's ``name``, ``commonName`` and ``id``.
        books: Book dicts, each providing ``id``, ``name`` and ``commonName``.

    Returns:
        The ``id`` value of the single matching entry.

    Raises:
        ValueError: When no entry matches, or when more than one does. The
            message is the JSON string from ``create_structured_error``.
    """
    wanted = book.lower()
    matches = [
        entry for entry in books
        if wanted == entry["name"].lower()
        or wanted == entry["commonName"].lower()
        or wanted == entry["id"].lower()
    ]
    if not matches:
        raise ValueError(create_structured_error(
            f"No book found with provided input: {book} (try using get_books - this could be language specific)",
            f"The book '{book}' was not found. Please check the spelling or try a different book name."
        ))
    if len(matches) > 1:
        book_names = ", ".join(entry["commonName"] for entry in matches)
        raise ValueError(create_structured_error(
            f"Multiple books found with provided input: {book_names}",
            f"Multiple books match '{book}'. Please be more specific."
        ))
    return matches[0]["id"]
async def _get_chapter_data(translation_id: str, book: str, chapter: int) -> dict[str, Any]:
    """Get raw chapter data from the API.

    Fetches the translation's book list, resolves ``book`` to a book id, then
    requests that chapter.

    Args:
        translation_id: Translation id used as a path segment of the request.
        book: Book name, common name or id, resolved via ``_get_book_id``.
        chapter: Chapter number, used as the final path segment.

    Returns:
        The decoded response from ``make_bible_request``, unmodified.

    Raises:
        ValueError: When ``book`` matches no book or more than one.
        ToolError: When either request fails.
    """
    books, _ = await _get_books(translation_id)
    book_id = _get_book_id(book, books)
    # Percent-encode the path segments so characters such as "/", "?" or "#"
    # in an id cannot change the path or query of the request.
    safe_translation = quote(translation_id, safe="")
    safe_book = quote(str(book_id), safe="")
    url = f"{BIBLE_API_BASE}/api/{safe_translation}/{safe_book}/{chapter}.json"
    return await make_bible_request(url)
def _extract_verse_text(verse_content):
    """Flatten a verse's content into a single string.

    Accepts either a content list or a dict carrying that list under
    ``"content"``. Anything else yields ``""``. List items contribute:

    * a plain string - the string itself;
    * a dict with ``"text"`` - that text, preceded by a newline when its
      ``"poem"`` value differs from the previous text item's and both are
      not ``None``;
    * a dict with a truthy ``"lineBreak"`` - a newline;
    * a dict with ``"noteId"`` - a ``" [<noteId>]"`` marker;
    * any other item - nothing.

    Newlines are never emitted before the first piece of output.
    """
    # Unwrap a verse object down to its content list.
    if isinstance(verse_content, dict) and "content" in verse_content:
        verse_content = verse_content["content"]
    if not isinstance(verse_content, list):
        return ""

    pieces = []
    last_poem = None
    for part in verse_content:
        if isinstance(part, str):
            pieces.append(part)
            continue
        if not isinstance(part, dict):
            # Neither text nor a recognised marker: ignore.
            continue

        if "text" in part:
            # e.g. {'text': 'Take heed that ye do not...', 'wordsOfJesus': True}
            poem = part.get("poem")
            level_changed = (
                last_poem is not None
                and poem is not None
                and poem != last_poem
            )
            if level_changed and pieces:
                pieces.append("\n")
            pieces.append(part["text"])
            last_poem = poem
        elif "lineBreak" in part and part["lineBreak"]:
            # A break before any output would only add a leading blank line.
            if pieces:
                pieces.append("\n")
        elif "noteId" in part:
            pieces.append(f' [{part["noteId"]}]')

    return "".join(pieces)
def _extract_verses_structured(chapter_data, verse_start: int, verse_end: int | None = None):
    """Extract structured verse data from chapter data.

    Args:
        chapter_data: Mapping whose ``["chapter"]["content"]`` is the list of
            chapter items. Only dict items with ``type == "verse"`` are used.
        verse_start: Lowest verse number to include (inclusive).
        verse_end: Highest verse number to include (inclusive). ``None``
            means no upper bound, i.e. through the end of the chapter.

    Returns:
        A list of ``{"number", "text", "metadata"}`` dicts in chapter order.
        Verses whose extracted text is empty are omitted. ``metadata`` holds
        ``wordsOfJesus`` taken from the first content item that carries it,
        and is empty otherwise.

    Raises:
        KeyError: When ``chapter_data`` lacks ``"chapter"`` or ``"content"``.
    """
    chapter_content = chapter_data["chapter"]["content"]

    verses = []
    for verse_item in chapter_content:
        # Skip headings and any other non-verse content.
        if not (isinstance(verse_item, dict) and verse_item.get("type") == "verse"):
            continue

        verse_number = verse_item.get("number", 0)
        if verse_number < verse_start:
            continue
        # Do not derive a default upper bound from the number of verse items:
        # that is a count, not a verse number, so a chapter whose numbering
        # has a gap would lose its trailing verses.
        if verse_end is not None and verse_number > verse_end:
            continue

        verse_text = _extract_verse_text(verse_item)
        if not verse_text:
            continue

        # Extract metadata like wordsOfJesus.
        metadata = {}
        content = verse_item.get("content")
        if isinstance(content, list):
            for part in content:
                if isinstance(part, dict) and "wordsOfJesus" in part:
                    metadata["wordsOfJesus"] = part["wordsOfJesus"]
                    break

        verses.append({
            "number": verse_number,
            "text": verse_text,
            "metadata": metadata
        })
    return verses
# Maintainer notes (kept out of the docstring, which is left as written for
# tool clients):
# - Raises ValueError for blank ids/names, out-of-range numbers, an unknown or
#   ambiguous book, or a chapter beyond the book's chapter count.
# - When `verse_end` is omitted, only the single verse `verse_start` is returned.
# - The returned translation id is the whitespace-stripped id that was queried.
@mcp.tool()
async def get_verses(translation_id: str, book: str, chapter: int, verse_start: int, verse_end: int | None = None) -> dict:
    """Get the verse(s) of a chapter from the specific translation. The user will see the response of this tool call - do not list the result.
    Args:
        translation_id: The id for the translation to look up
        book: The book name, tag, or id
        chapter: The chapter number
        verse_start: The verse number to start at, inclusive
        verse_end: The verse number to end at, inclusive (optional)
    """
    # Input validation
    if not translation_id or not translation_id.strip():
        raise ValueError(create_structured_error(
            "get_verses called with empty translation_id parameter",
            "Please specify a Bible translation."
        ))
    if not book or not book.strip():
        raise ValueError(create_structured_error(
            "get_verses called with empty book parameter",
            "Please specify a book name (e.g., 'Genesis', 'Matthew')."
        ))
    if chapter < 1:
        raise ValueError(create_structured_error(
            f"get_verses called with invalid chapter number: {chapter}",
            "Please enter a valid chapter number (1 or higher)."
        ))
    if verse_start < 1:
        raise ValueError(create_structured_error(
            f"get_verses called with invalid verse_start: {verse_start}",
            "Please enter a valid starting verse number (1 or higher)."
        ))
    if verse_end is not None and verse_end < verse_start:
        raise ValueError(create_structured_error(
            f"get_verses called with verse_end ({verse_end}) < verse_start ({verse_start})",
            "The ending verse number must be greater than or equal to the starting verse."
        ))

    # Normalize once so lookups and the echoed values agree.
    normalized_id = translation_id.strip()
    book_query = book.strip()

    # Get book metadata to validate chapter number before the chapter request
    books, translation_info = await _get_books(normalized_id)
    book_id = _get_book_id(book_query, books)
    book_info = next((b for b in books if b["id"] == book_id), {})

    # Validate chapter number against book metadata
    max_chapters = book_info.get("numberOfChapters", 0)
    if chapter > max_chapters:
        raise ValueError(create_structured_error(
            f"get_verses called with chapter {chapter} but {book_info.get('commonName', book)} only has {max_chapters} chapters",
            f"{book_info.get('commonName', book)} only has {max_chapters} chapters. Please enter a valid chapter number (1-{max_chapters})."
        ))

    # Request the chapter directly with the book id resolved above. Going
    # through _get_chapter_data would download the book list a second time.
    safe_translation = quote(normalized_id, safe="")
    safe_book = quote(str(book_id), safe="")
    data = await make_bible_request(
        f"{BIBLE_API_BASE}/api/{safe_translation}/{safe_book}/{chapter}.json"
    )

    # If verse_end is not provided for get_verses, just get the single verse
    if verse_end is None:
        verse_end = verse_start
    verses = _extract_verses_structured(data, verse_start, verse_end)
    return {
        "translation": {
            "id": normalized_id,
            "name": translation_info.get("name", normalized_id),
            "language": translation_info.get("languageName", "Unknown"),
            "language_english_name": translation_info.get("languageEnglishName", "Unknown")
        },
        "book": {
            "id": book_id,
            "name": book_info.get("name", book),
            "common_name": book_info.get("commonName", book),
            "number_of_chapters": book_info.get("numberOfChapters", 0)
        },
        "chapter": chapter,
        "verse_start": verse_start,
        "verse_end": verse_end,
        "verses": verses
    }
# Maintainer notes (kept out of the docstring, which is left as written for
# tool clients):
# - Raises ValueError for blank ids/names, a chapter below 1, an unknown or
#   ambiguous book, or a chapter beyond the book's chapter count.
# - "verse_end" in the result is the number of the last verse returned
#   (0 when no verses were returned).
# - The returned translation id is the whitespace-stripped id that was queried.
@mcp.tool()
async def get_chapter(translation_id: str, book: str, chapter: int) -> dict:
    """Get the chapter of a book from the specific translation, including all of the verse text. The user will see the response of this tool call.
    Args:
        translation_id: The id for the translation to look up
        book: The book name, tag, or id
        chapter: The chapter number
    """
    # Input validation
    if not translation_id or not translation_id.strip():
        raise ValueError(create_structured_error(
            "get_chapter called with empty translation_id parameter",
            "Please specify a Bible translation."
        ))
    if not book or not book.strip():
        raise ValueError(create_structured_error(
            "get_chapter called with empty book parameter",
            "Please specify a book name (e.g., 'Genesis', 'Matthew')."
        ))
    if chapter < 1:
        raise ValueError(create_structured_error(
            f"get_chapter called with invalid chapter number: {chapter}",
            "Please enter a valid chapter number (1 or higher)."
        ))

    # Normalize once so lookups and the echoed values agree.
    normalized_id = translation_id.strip()
    book_query = book.strip()

    # Get book metadata to validate chapter number before the chapter request
    books, translation_info = await _get_books(normalized_id)
    book_id = _get_book_id(book_query, books)
    book_info = next((b for b in books if b["id"] == book_id), {})

    # Validate chapter number against book metadata
    max_chapters = book_info.get("numberOfChapters", 0)
    if chapter > max_chapters:
        raise ValueError(create_structured_error(
            f"get_chapter called with chapter {chapter} but {book_info.get('commonName', book)} only has {max_chapters} chapters",
            f"{book_info.get('commonName', book)} only has {max_chapters} chapters. Please enter a valid chapter number (1-{max_chapters})."
        ))

    # Request the chapter directly with the book id resolved above. Going
    # through _get_chapter_data would download the book list a second time.
    safe_translation = quote(normalized_id, safe="")
    safe_book = quote(str(book_id), safe="")
    data = await make_bible_request(
        f"{BIBLE_API_BASE}/api/{safe_translation}/{safe_book}/{chapter}.json"
    )

    # Get all verses in the chapter (start from verse 1, no explicit end)
    verses = _extract_verses_structured(data, verse_start=1)

    # Report the number of the last verse returned. The count of returned
    # verses differs from it whenever a verse number is skipped or a verse
    # with empty text was dropped.
    last_verse_number = verses[-1]["number"] if verses else 0
    return {
        "translation": {
            "id": normalized_id,
            "name": translation_info.get("name", normalized_id),
            "language": translation_info.get("languageName", "Unknown"),
            "language_english_name": translation_info.get("languageEnglishName", "Unknown")
        },
        "book": {
            "id": book_id,
            "name": book_info.get("name", book),
            "common_name": book_info.get("commonName", book),
            "number_of_chapters": book_info.get("numberOfChapters", 0)
        },
        "chapter": chapter,
        "verse_start": 1,
        "verse_end": last_verse_number,
        "verses": verses
    }
if __name__ == "__main__":
    # Start the server over streamable HTTP. The bind address and port are
    # read from MCP_HOST / MCP_PORT; the defaults listen on every interface
    # (so the server is reachable from outside a container) on port 8000.
    host = os.environ.get("MCP_HOST", "0.0.0.0")
    port = int(os.environ.get("MCP_PORT", "8000"))
    mcp.run(transport="streamable-http", host=host, port=port)