server_utils.py•2.89 kB
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import httpx
from awslabs.aws_documentation_mcp_server.util import (
extract_content_from_html,
format_documentation_result,
is_html_content,
)
from importlib.metadata import version
from loguru import logger
from mcp.server.fastmcp import Context
try:
__version__ = version('awslabs.aws-documentation-mcp-server')
except Exception:
from . import __version__
DEFAULT_USER_AGENT = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36 ModelContextProtocol/{__version__} (AWS Documentation Server)'
async def read_documentation_impl(
ctx: Context,
url_str: str,
max_length: int,
start_index: int,
session_uuid: str,
) -> str:
"""The implementation of the read_documentation tool."""
logger.debug(f'Fetching documentation from {url_str}')
url_with_session = f'{url_str}?session={session_uuid}'
async with httpx.AsyncClient() as client:
try:
response = await client.get(
url_with_session,
follow_redirects=True,
headers={
'User-Agent': DEFAULT_USER_AGENT,
'X-MCP-Session-Id': session_uuid,
},
timeout=30,
)
except httpx.HTTPError as e:
error_msg = f'Failed to fetch {url_str}: {str(e)}'
logger.error(error_msg)
await ctx.error(error_msg)
return error_msg
if response.status_code >= 400:
error_msg = f'Failed to fetch {url_str} - status code {response.status_code}'
logger.error(error_msg)
await ctx.error(error_msg)
return error_msg
page_raw = response.text
content_type = response.headers.get('content-type', '')
if is_html_content(page_raw, content_type):
content = extract_content_from_html(page_raw)
else:
content = page_raw
result = format_documentation_result(url_str, content, start_index, max_length)
# Log if content was truncated
if len(content) > start_index + max_length:
logger.debug(
f'Content truncated at {start_index + max_length} of {len(content)} characters'
)
return result