Skip to main content
Glama
run_project_sse.py2.93 kB
"""Test module for the agentskills-mcp MCP service over HTTP. This module exercises the agentskills-mcp MCP service by: 1. Starting the service with the given configuration using :class:`AgentskillsMcpServiceRunner`. 2. Connecting to the service via :class:`FastMcpClient`. 3. Listing available tools exposed by the MCP server. 4. Invoking a selection of tools and asserting that each call succeeds. It is intended as an integration/diagnostic script rather than a unit test. """ import sys import json import asyncio from loguru import logger from fastmcp.client.client import CallToolResult from flowllm.core.utils.fastmcp_client import FastMcpClient from agentskills_mcp.core.utils.service_runner import AgentSkillsMcpServiceRunner async def test_mcp_service(mcp_config) -> None: """Connect to the MCP service, list tools, and run sample tool calls.""" # Connect to the MCP service using FastMcpClient async with FastMcpClient( name="agentskills-mcp-test", config=mcp_config, max_retries=1, ) as client: # List available tools print("=" * 50) print("Getting available MCP tools...") tool_calls = await client.list_tool_calls() print(f"Found {len(tool_calls)} tools:") for tool_call in tool_calls: tool_info = tool_call.simple_input_dump() print(json.dumps(tool_info, ensure_ascii=False)) for tool_name, test_arguments in [ ("load_skill_metadata", {}), ("load_skill", {"skill_name": "pdf"}), ("read_reference_file", {"skill_name": "pdf", "file_name": "reference.md"}), ("run_shell_command", {"skill_name": "pdf", "command": "ls -l"}), ]: result: CallToolResult = await client.call_tool(tool_name, test_arguments) result_content = result.content[0].text success = not result.is_error print(f"Tool call result: {tool_name}, success: {success}, content: {result_content}") assert success def main(skill_dir: str) -> None: """Run the MCP service in-process and execute the async test routine.""" # Service configuration service_args = [ "agentskills-mcp", "config=default", "mcp.transport=sse", f"metadata.skill_dir={skill_dir}", ] # MCP client configuration host = "0.0.0.0" port = 8150 mcp_config = { "type": "sse", "url": f"http://{host}:{port}/sse", } with AgentSkillsMcpServiceRunner( service_args, host=host, port=port, ) as service: logger.info(f"Service is running on port {service.port}") asyncio.run(test_mcp_service(mcp_config)) if __name__ == "__main__": if len(sys.argv) != 2: print("Please provide the skill directory as a command line argument.") sys.exit(1) skill_dir = sys.argv[1] main(skill_dir)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/zouyingcao/agentskills-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server