Skip to main content
Glama

YDB MCP

Official
by ydb-platform
Apache 2.0
22
  • Linux
  • Apple
test_mcp_server_integration.py41.5 kB
"""Integration tests for YDB MCP server.

These tests assume a YDB server is running at localhost:2136 with a
database /local, or will create a Docker container with YDB if none is
available. They directly test the YDBMCPServer methods without using HTTP.
"""

import datetime
import json
import logging
import time
import warnings
from decimal import Decimal

import pytest

# Fixtures are automatically imported by pytest from conftest.py
from tests.integration.conftest import call_mcp_tool

# Suppress the utcfromtimestamp deprecation warning from the YDB library
warnings.filterwarnings("ignore", message="datetime.datetime.utcfromtimestamp.*", category=DeprecationWarning)

# Table name used for tests - using timestamp to avoid conflicts
TEST_TABLE = f"mcp_integration_test_{int(time.time())}"

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Use loop_scope instead of scope for the asyncio marker
pytestmark = [pytest.mark.integration, pytest.mark.asyncio]


def _parse_tool_result(result):
    """Validate the MCP tool-call envelope and return its parsed JSON payload.

    Every tool call returns a list of content dicts whose first element
    carries the JSON-encoded response under the "text" key; this asserts
    that shape and decodes it.
    """
    assert isinstance(result, list) and len(result) > 0 and "text" in result[0], (
        f"Result should be a list of dicts with 'text': {result}"
    )
    return json.loads(result[0]["text"])


async def test_simple_query(mcp_server):
    """Test a basic YDB query."""
    result = await call_mcp_tool(mcp_server, "ydb_query", sql="SELECT 1+1 as result")
    parsed = _parse_tool_result(result)
    assert "result_sets" in parsed, f"No result_sets in parsed result: {parsed}"
    assert len(parsed["result_sets"]) == 1, f"Expected 1 result set, got {len(parsed['result_sets'])}"
    first_result = parsed["result_sets"][0]
    assert "columns" in first_result, f"No columns in result: {first_result}"
    assert "rows" in first_result, f"No rows in result: {first_result}"
    assert len(first_result["rows"]) > 0, f"Empty result set: {first_result}"
    assert first_result["columns"][0] == "result", f"Unexpected column name: {first_result['columns'][0]}"
    assert first_result["rows"][0][0] == 2, f"Unexpected result value: {first_result['rows'][0][0]}"


async def test_create_table_and_query(mcp_server):
    """Test creating a table and executing queries against it."""
    # Generate a unique table name to avoid conflicts with other tests
    test_table_name = f"temp_test_table_{int(time.time())}"
    try:
        # Create table
        create_result = await call_mcp_tool(
            mcp_server,
            "ydb_query",
            sql=f"""
            CREATE TABLE {test_table_name} (
                id Uint64,
                name Utf8,
                PRIMARY KEY (id)
            );
            """,
        )
        parsed = _parse_tool_result(create_result)
        assert "error" not in parsed, f"Error creating table: {parsed}"

        # Insert data
        insert_result = await call_mcp_tool(
            mcp_server,
            "ydb_query",
            sql=f"""
            UPSERT INTO {test_table_name} (id, name)
            VALUES (1, 'Test 1'), (2, 'Test 2'), (3, 'Test 3');
            """,
        )
        parsed = _parse_tool_result(insert_result)
        assert "error" not in parsed, f"Error inserting data: {parsed}"

        # Query data
        query_result = await call_mcp_tool(
            mcp_server, "ydb_query", sql=f"SELECT * FROM {test_table_name} ORDER BY id;"
        )
        parsed = _parse_tool_result(query_result)
        assert "result_sets" in parsed, f"No result_sets in parsed result: {parsed}"
        assert len(parsed["result_sets"]) == 1, f"Expected 1 result set, got {len(parsed['result_sets'])}"
        first_result = parsed["result_sets"][0]
        assert "columns" in first_result, f"No columns in result: {first_result}"
        assert "rows" in first_result, f"No rows in result: {first_result}"
        assert len(first_result["rows"]) == 3, f"Expected 3 rows, got {len(first_result['rows'])}"

        # Check if 'id' and 'name' columns are present
        assert "id" in first_result["columns"], f"Column 'id' not found in {first_result['columns']}"
        assert "name" in first_result["columns"], f"Column 'name' not found in {first_result['columns']}"

        # Get column indexes
        id_idx = first_result["columns"].index("id")
        name_idx = first_result["columns"].index("name")

        # Verify values
        assert first_result["rows"][0][id_idx] == 1, f"Expected id=1, got {first_result['rows'][0][id_idx]}"
        # YDB may return strings as bytes, so handle both cases
        name_value = first_result["rows"][0][name_idx]
        if isinstance(name_value, bytes):
            assert name_value.decode("utf-8") == "Test 1", (
                f"Expected name='Test 1', got {name_value.decode('utf-8')}"
            )
        else:
            assert name_value == "Test 1", f"Expected name='Test 1', got {name_value}"
    finally:
        # Cleanup - drop the table after test; never let cleanup failure
        # mask the actual test outcome
        try:
            cleanup_result = await call_mcp_tool(mcp_server, "ydb_query", sql=f"DROP TABLE {test_table_name};")
            logger.debug(f"Table cleanup result: {cleanup_result}")
        except Exception as e:
            logger.warning(f"Failed to clean up test table: {e}")


async def test_parameterized_query(mcp_server):
    """Test a parameterized query using the parameters feature of YDB."""
    # Test with a simple parameterized query
    result = await call_mcp_tool(
        mcp_server,
        "ydb_query_with_params",
        sql="""
        DECLARE $answer AS Int32;
        DECLARE $greeting AS Utf8;
        SELECT $answer as answer, $greeting as greeting
        """,
        params=json.dumps(
            {"answer": [-42, "Int32"], "greeting": "hello"}  # Explicitly specify the type as Int32
        ),
    )
    parsed = _parse_tool_result(result)
    assert "result_sets" in parsed, f"No result_sets in parsed result: {parsed}"
    assert len(parsed["result_sets"]) == 1, f"Expected 1 result set, got {len(parsed['result_sets'])}"
    first_result = parsed["result_sets"][0]
    assert "columns" in first_result, f"No columns in result: {first_result}"
    assert "rows" in first_result, f"No rows in result: {first_result}"
    assert len(first_result["rows"]) > 0, f"Empty result set: {first_result}"

    # Check column names
    assert "answer" in first_result["columns"], f"Expected 'answer' column in result: {first_result['columns']}"
    assert "greeting" in first_result["columns"], f"Expected 'greeting' column in result: {first_result['columns']}"

    # Check values
    answer_idx = first_result["columns"].index("answer")
    greeting_idx = first_result["columns"].index("greeting")
    assert first_result["rows"][0][answer_idx] == -42, (
        f"Expected answer=-42, got {first_result['rows'][0][answer_idx]}"
    )
    # YDB may return strings either as bytes or as strings depending on context
    greeting_value = first_result["rows"][0][greeting_idx]
    if isinstance(greeting_value, bytes):
        # If bytes, decode to string
        assert greeting_value.decode("utf-8") == "hello", (
            f"Expected greeting to decode to 'hello', got {greeting_value.decode('utf-8')}"
        )
    else:
        # If already string
        assert greeting_value == "hello", f"Expected greeting to be 'hello', got {greeting_value}"


async def test_complex_query(mcp_server):
    """Test a more complex query with multiple result sets."""
    result = await call_mcp_tool(
        mcp_server,
        "ydb_query",
        sql="""
        SELECT 1 as value;
        SELECT 'test' as text, 2.5 as number;
        """,
    )
    # Check for result sets
    parsed = _parse_tool_result(result)
    assert "result_sets" in parsed, f"No result_sets in parsed result: {parsed}"

    # Verify first result set
    first_result = parsed["result_sets"][0]
    assert "columns" in first_result
    assert first_result["columns"][0] == "value"
    assert first_result["rows"][0][0] == 1

    # Verify second result set
    second_result = parsed["result_sets"][1]
    assert "columns" in second_result
    assert len(second_result["columns"]) == 2
    assert second_result["columns"][0] == "text"
    assert second_result["columns"][1] == "number"
    # The text value is returned as a binary string, need to handle this
    text_value = second_result["rows"][0][0]
    if isinstance(text_value, bytes):
        assert text_value.decode("utf-8") == "test"
    else:
        assert text_value == "test"
    assert second_result["rows"][0][1] == 2.5


async def test_multiple_resultsets_with_tables(mcp_server):
    """Test multiple result sets involving table creation and queries."""
    # Create unique table names with timestamp to avoid conflicts
    test_table1 = f"temp_test_table1_{int(time.time())}"
    test_table2 = f"temp_test_table2_{int(time.time())}"
    try:
        # First, create tables - schema operations need to be separate
        setup_result = await call_mcp_tool(
            mcp_server,
            "ydb_query",
            sql=f"""
            CREATE TABLE {test_table1} (id Uint64, name Utf8, PRIMARY KEY (id));
            CREATE TABLE {test_table2} (id Uint64, value Double, PRIMARY KEY (id));
            """,
        )
        parsed = _parse_tool_result(setup_result)
        assert "error" not in parsed, f"Error creating tables: {parsed}"

        # Then insert data - separate operation
        insert_result = await call_mcp_tool(
            mcp_server,
            "ydb_query",
            sql=f"""
            UPSERT INTO {test_table1} (id, name)
            VALUES (1, 'First'), (2, 'Second'), (3, 'Third');
            """,
        )
        parsed = _parse_tool_result(insert_result)
        assert "error" not in parsed, f"Error inserting data into table1: {parsed}"

        insert_result2 = await call_mcp_tool(
            mcp_server,
            "ydb_query",
            sql=f"""
            UPSERT INTO {test_table2} (id, value)
            VALUES (1, 10.5), (2, 20.75), (3, 30.25);
            """,
        )
        parsed = _parse_tool_result(insert_result2)
        assert "error" not in parsed, f"Error inserting data into table2: {parsed}"

        # Now query both tables in a single request to test multiple result sets
        result = await call_mcp_tool(
            mcp_server,
            "ydb_query",
            sql=f"""
            SELECT * FROM {test_table1} ORDER BY id;
            SELECT * FROM {test_table2} ORDER BY id;
            """,
        )
        # Verify we have all result sets
        parsed = _parse_tool_result(result)
        assert "result_sets" in parsed, f"No result_sets in parsed result: {parsed}"

        # Check first table results
        first_result = parsed["result_sets"][0]
        assert len(first_result["rows"]) == 3, "Expected 3 rows in first table"
        assert "id" in first_result["columns"], f"Expected 'id' column in first table, got {first_result['columns']}"
        assert "name" in first_result["columns"], (
            f"Expected 'name' column in first table, got {first_result['columns']}"
        )

        # Check second table results
        second_result = parsed["result_sets"][1]
        assert len(second_result["rows"]) == 3, "Expected 3 rows in second table"
        assert "id" in second_result["columns"], (
            f"Expected 'id' column in second table, got {second_result['columns']}"
        )
        assert "value" in second_result["columns"], (
            f"Expected 'value' column in second table, got {second_result['columns']}"
        )

        # Now test a join query - should return a single result set
        join_result = await call_mcp_tool(
            mcp_server,
            "ydb_query",
            sql=f"""
            SELECT t1.id, t1.name, t2.value
            FROM {test_table1} t1
            JOIN {test_table2} t2 ON t1.id = t2.id
            ORDER BY t1.id;
            """,
        )
        # Validate join results
        parsed = _parse_tool_result(join_result)
        assert "result_sets" in parsed, "Join query should return result_sets"
        assert len(parsed["result_sets"]) == 1, f"Expected 1 result set for join, got {len(parsed['result_sets'])}"
        first_join_result = parsed["result_sets"][0]
        assert "columns" in first_join_result, "Join query should return columns"
        assert "rows" in first_join_result, "Join query should return rows"
        assert len(first_join_result["rows"]) == 3, (
            f"Expected 3 rows in join result, got {len(first_join_result['rows'])}"
        )
        assert len(first_join_result["columns"]) == 3, (
            f"Expected 3 columns in join result, got {len(first_join_result['columns'])}"
        )
    finally:
        # Cleanup - drop the tables after test
        try:
            cleanup_result = await call_mcp_tool(mcp_server, "ydb_query", sql=f"DROP TABLE {test_table1};")
            logger.debug(f"Table1 cleanup result: {cleanup_result}")
            cleanup_result2 = await call_mcp_tool(mcp_server, "ydb_query", sql=f"DROP TABLE {test_table2};")
            logger.debug(f"Table2 cleanup result: {cleanup_result2}")
        except Exception as e:
            logger.warning(f"Failed to clean up test tables: {e}")


async def test_single_resultset_format(mcp_server):
    """Test that single result set queries use the new format with result_sets list."""
    result = await call_mcp_tool(mcp_server, "ydb_query", sql="SELECT 42 as answer")

    # Single result set should have result_sets key with one item
    parsed = _parse_tool_result(result)
    assert "result_sets" in parsed, "Single result should include result_sets key"
    assert len(parsed["result_sets"]) == 1, f"Expected 1 result set, got {len(parsed['result_sets'])}"
    first_result = parsed["result_sets"][0]
    assert "columns" in first_result, "Should have columns in result set"
    assert "rows" in first_result, "Should have rows in result set"
    assert first_result["columns"][0] == "answer", "Expected 'answer' column"
    assert first_result["rows"][0][0] == 42, "Expected value 42"


async def test_data_types(mcp_server):
    """Test a very basic query with simple parameter types."""
    # Execute a simple query that doesn't use complex parameter types
    result = await call_mcp_tool(mcp_server, "ydb_query", sql="SELECT 1 AS value, 'test' AS text")

    # Basic result checks
    parsed = _parse_tool_result(result)
    assert "result_sets" in parsed, f"No result_sets in parsed result: {parsed}"
    assert len(parsed["result_sets"]) == 1, f"Expected 1 result set, got {len(parsed['result_sets'])}"
    first_result = parsed["result_sets"][0]
    assert "columns" in first_result, f"No columns in result: {first_result}"
    assert "rows" in first_result, f"No rows in result: {first_result}"
    assert len(first_result["columns"]) == 2, f"Expected 2 columns, got {len(first_result['columns'])}"
    assert len(first_result["rows"]) == 1, f"Expected 1 row, got {len(first_result['rows'])}"

    # Verify column names
    assert "value" in first_result["columns"], f"Expected column 'value', got {first_result['columns']}"
    assert "text" in first_result["columns"], f"Expected column 'text', got {first_result['columns']}"

    # Verify values
    row = first_result["rows"][0]
    value_idx = first_result["columns"].index("value")
    text_idx = first_result["columns"].index("text")
    assert row[value_idx] == 1, f"Expected value=1, got {row[value_idx]}"
    # Accept both bytes and string for text columns
    if isinstance(row[text_idx], bytes):
        assert row[text_idx] == b"test", f"Expected text=b'test', got {row[text_idx]}"
    else:
        assert row[text_idx] == "test", f"Expected text='test', got {row[text_idx]}"


async def test_all_data_types(mcp_server):
    """Test all supported YDB data types to ensure proper round-trip processing."""
    # Construct a query with literals of all supported data types
    result = await call_mcp_tool(
        mcp_server,
        "ydb_query",
        sql="""
        SELECT
            -- Boolean type
            true AS bool_true,
            false AS bool_false,

            -- Integer types (signed)
            -128 AS int8_min,
            127 AS int8_max,
            -32768 AS int16_min,
            32767 AS int16_max,
            -2147483648 AS int32_min,
            2147483647 AS int32_max,
            -9223372036854775808 AS int64_min,
            9223372036854775807 AS int64_max,

            -- Integer types (unsigned)
            0 AS uint8_min,
            255 AS uint8_max,
            0 AS uint16_min,
            65535 AS uint16_max,
            0 AS uint32_min,
            4294967295 AS uint32_max,
            0 AS uint64_min,
            18446744073709551615 AS uint64_max,

            -- Floating point types
            3.14 AS float_value,
            2.7182818284590452 AS double_value,

            -- String types
            "Hello, World!" AS string_value,
            "UTF8 строка" AS utf8_value,
            "00000000-0000-0000-0000-000000000000" AS uuid_value,
            '{"key": "value"}' AS json_value,

            -- Date and time types
            Date("2023-07-15") AS date_value,
            Datetime("2023-07-15T12:30:45Z") AS datetime_value,
            Timestamp("2023-07-15T12:30:45.123456Z") AS timestamp_value,
            INTERVAL("P1DT2H3M4.567S") AS interval_value,

            -- Decimal
            CAST("123.456789" AS Decimal(22,9)) AS decimal_value,

            -- Container types
            -- List containers
            AsList(1, 2, 3) AS int_list,
            AsList("a", "b", "c") AS string_list,

            -- Struct containers (similar to tuples)
            AsStruct(1 AS a, "x" AS b) AS simple_struct,

            -- Dictionary containers
            AsDict(
                AsTuple("key1", 1),
                AsTuple("key2", 2),
                AsTuple("key3", 3)
            ) AS string_to_int_dict,

            -- Nested containers - list of structs
            AsList(
                AsStruct(1 AS id, "Alice" AS name),
                AsStruct(2 AS id, "Bob" AS name),
                AsStruct(3 AS id, "Charlie" AS name)
            ) AS list_of_structs,

            -- Nested containers - struct with list
            AsStruct(
                "users" AS collection_name,
                AsList(1, 2, 3) AS ids,
                true AS active
            ) AS struct_with_list,

            -- Dict with complex values
            AsDict(
                AsTuple("person1", AsStruct(1 AS id, "Alice" AS name, AsList(25, 30, 28) AS scores)),
                AsTuple("person2", AsStruct(2 AS id, "Bob" AS name, AsList(22, 27, 29) AS scores))
            ) AS complex_dict,

            -- Triple-nested container: list of structs with lists
            AsList(
                AsStruct(
                    1 AS id,
                    "Team A" AS name,
                    AsList("Alice", "Bob") AS members
                ),
                AsStruct(
                    2 AS id,
                    "Team B" AS name,
                    AsList("Charlie", "David") AS members
                )
            ) AS nested_list_struct_list,

            -- Tuple containers
            AsTuple(1, "a", true) AS mixed_tuple
        """,
    )

    # Basic result checks
    parsed = _parse_tool_result(result)
    assert "result_sets" in parsed, f"No result_sets in parsed result: {parsed}"
    assert len(parsed["result_sets"]) == 1, f"Expected 1 result set, got {len(parsed['result_sets'])}"
    first_result = parsed["result_sets"][0]
    assert "columns" in first_result, f"No columns in result: {first_result}"
    assert "rows" in first_result, f"No rows in result: {first_result}"
    assert len(first_result["rows"]) == 1, f"Expected 1 row, got {len(first_result['rows'])}"

    # Get the row
    row = first_result["rows"][0]

    # Helper function to get column index and value
    def get_value(column_name):
        try:
            idx = first_result["columns"].index(column_name)
            return row[idx]
        except ValueError:
            return None

    # Test each data type
    # Boolean values
    assert get_value("bool_true") is True, f"Expected bool_true to be True, got {get_value('bool_true')}"
    assert get_value("bool_false") is False, f"Expected bool_false to be False, got {get_value('bool_false')}"

    # Integer types (signed)
    assert get_value("int8_min") == -128, f"Expected int8_min to be -128, got {get_value('int8_min')}"
    assert get_value("int8_max") == 127, f"Expected int8_max to be 127, got {get_value('int8_max')}"
    assert get_value("int16_min") == -32768, f"Expected int16_min to be -32768, got {get_value('int16_min')}"
    assert get_value("int16_max") == 32767, f"Expected int16_max to be 32767, got {get_value('int16_max')}"
    assert get_value("int32_min") == -2147483648, (
        f"Expected int32_min to be -2147483648, got {get_value('int32_min')}"
    )
    assert get_value("int32_max") == 2147483647, (
        f"Expected int32_max to be 2147483647, got {get_value('int32_max')}"
    )
    assert get_value("int64_min") == -9223372036854775808, (
        f"Expected int64_min to be -9223372036854775808, got {get_value('int64_min')}"
    )
    assert get_value("int64_max") == 9223372036854775807, (
        f"Expected int64_max to be 9223372036854775807, got {get_value('int64_max')}"
    )

    # Integer types (unsigned)
    assert get_value("uint8_min") == 0, f"Expected uint8_min to be 0, got {get_value('uint8_min')}"
    assert get_value("uint8_max") == 255, f"Expected uint8_max to be 255, got {get_value('uint8_max')}"
    assert get_value("uint16_min") == 0, f"Expected uint16_min to be 0, got {get_value('uint16_min')}"
    assert get_value("uint16_max") == 65535, f"Expected uint16_max to be 65535, got {get_value('uint16_max')}"
    assert get_value("uint32_min") == 0, f"Expected uint32_min to be 0, got {get_value('uint32_min')}"
    assert get_value("uint32_max") == 4294967295, (
        f"Expected uint32_max to be 4294967295, got {get_value('uint32_max')}"
    )
    assert get_value("uint64_min") == 0, f"Expected uint64_min to be 0, got {get_value('uint64_min')}"
    assert get_value("uint64_max") == 18446744073709551615, (
        f"Expected uint64_max to be 18446744073709551615, got {get_value('uint64_max')}"
    )

    # Floating point types
    assert abs(get_value("float_value") - 3.14) < 0.0001, (
        f"Expected float_value to be close to 3.14, got {get_value('float_value')}"
    )
    assert abs(get_value("double_value") - 2.7182818284590452) < 0.0000000000000001, (
        f"Expected double_value to be close to 2.7182818284590452, got {get_value('double_value')}"
    )

    # String types - expect only str, not bytes
    string_value = get_value("string_value")
    assert string_value == "Hello, World!", f"Expected string_value to be 'Hello, World!', got {string_value}"
    utf8_value = get_value("utf8_value")
    assert utf8_value == "UTF8 строка", f"Expected utf8_value to be 'UTF8 строка', got {utf8_value}"
    uuid_value = get_value("uuid_value")
    assert uuid_value == "00000000-0000-0000-0000-000000000000", (
        f"Expected uuid_value to be '00000000-0000-0000-0000-000000000000', got {uuid_value}"
    )
    json_value = get_value("json_value")
    assert json_value == '{"key": "value"}', f"Expected json_value to be '{{'key': 'value'}}', got {json_value}"

    # Date and time types - YDB returns these as Python datetime objects
    date_value = get_value("date_value")
    if isinstance(date_value, str):
        # Parse string to date
        parsed_date = datetime.date.fromisoformat(date_value)
        assert parsed_date == datetime.date(2023, 7, 15), f"Expected date_value to be 2023-07-15, got {parsed_date}"
    else:
        assert isinstance(date_value, datetime.date), (
            f"Expected date_value to be datetime.date, got {type(date_value)}"
        )
        assert date_value == datetime.date(2023, 7, 15), f"Expected date_value to be 2023-07-15, got {date_value}"

    datetime_value = get_value("datetime_value")
    if isinstance(datetime_value, str):
        # Parse string to datetime
        parsed_dt = datetime.datetime.fromisoformat(datetime_value.replace("Z", "+00:00"))
        expected_datetime = datetime.datetime(2023, 7, 15, 12, 30, 45, tzinfo=datetime.timezone.utc)
        if parsed_dt.tzinfo is None:
            parsed_dt = parsed_dt.replace(tzinfo=datetime.timezone.utc)
        assert parsed_dt == expected_datetime, f"Expected datetime_value to be {expected_datetime}, got {parsed_dt}"
    else:
        assert isinstance(datetime_value, datetime.datetime), (
            f"Expected datetime_value to be datetime.datetime, got {type(datetime_value)}"
        )
        expected_datetime = datetime.datetime(2023, 7, 15, 12, 30, 45, tzinfo=datetime.timezone.utc)
        if datetime_value.tzinfo is None:
            datetime_value = datetime_value.replace(tzinfo=datetime.timezone.utc)
        assert datetime_value == expected_datetime, (
            f"Expected datetime_value to be {expected_datetime}, got {datetime_value}"
        )

    timestamp_value = get_value("timestamp_value")
    if isinstance(timestamp_value, str):
        parsed_ts = datetime.datetime.fromisoformat(timestamp_value.replace("Z", "+00:00"))
        expected_timestamp = datetime.datetime(2023, 7, 15, 12, 30, 45, 123456, tzinfo=datetime.timezone.utc)
        if parsed_ts.tzinfo is None:
            parsed_ts = parsed_ts.replace(tzinfo=datetime.timezone.utc)
        assert parsed_ts == expected_timestamp, (
            f"Expected timestamp_value to be {expected_timestamp}, got {parsed_ts}"
        )
    else:
        assert isinstance(timestamp_value, datetime.datetime), (
            f"Expected timestamp_value to be datetime.datetime, got {type(timestamp_value)}"
        )
        expected_timestamp = datetime.datetime(2023, 7, 15, 12, 30, 45, 123456, tzinfo=datetime.timezone.utc)
        if timestamp_value.tzinfo is None:
            timestamp_value = timestamp_value.replace(tzinfo=datetime.timezone.utc)
        assert timestamp_value == expected_timestamp, (
            f"Expected timestamp_value to be {expected_timestamp}, got {timestamp_value}"
        )

    interval_value = get_value("interval_value")
    # Accept both string and timedelta for interval_value
    expected_interval = datetime.timedelta(days=1, hours=2, minutes=3, seconds=4, microseconds=567000)
    if isinstance(interval_value, str):
        # Parse string like '93784.567s' to seconds
        if interval_value.endswith("s"):
            seconds = float(interval_value[:-1])
            parsed_interval = datetime.timedelta(seconds=seconds)
            assert parsed_interval.total_seconds() == expected_interval.total_seconds(), (
                f"Expected interval_value to be {expected_interval}, got {parsed_interval}"
            )
        else:
            # pytest.fail instead of `assert False` so the check survives `python -O`
            pytest.fail(f"Unexpected interval string format: {interval_value}")
    else:
        assert isinstance(interval_value, datetime.timedelta), (
            f"Expected interval_value to be datetime.timedelta, got {type(interval_value)}"
        )
        assert interval_value.total_seconds() == expected_interval.total_seconds(), (
            f"Expected interval_value to be {expected_interval}, got {interval_value}"
        )

    # Decimal - YDB returns Decimal objects
    decimal_value = get_value("decimal_value")
    if isinstance(decimal_value, str):
        parsed_decimal = Decimal(decimal_value)
        assert parsed_decimal == Decimal("123.456789"), (
            f"Expected decimal_value to be Decimal('123.456789'), got {parsed_decimal}"
        )
    else:
        assert isinstance(decimal_value, Decimal), (
            f"Expected decimal_value to be Decimal, got {type(decimal_value)}"
        )
        assert decimal_value == Decimal("123.456789"), (
            f"Expected decimal_value to be Decimal('123.456789'), got {decimal_value}"
        )

    # Container types
    # List containers
    int_list = get_value("int_list")
    assert isinstance(int_list, list), f"Expected int_list to be a list, got {type(int_list)}"
    assert int_list == [1, 2, 3], f"Expected int_list to be [1, 2, 3], got {int_list}"

    string_list = get_value("string_list")
    assert isinstance(string_list, list), f"Expected string_list to be a list, got {type(string_list)}"
    expected = ["a", "b", "c"]
    for actual, exp in zip(string_list, expected):
        assert actual == exp, f"Expected {exp}, got {actual} in string_list"

    # Struct containers (similar to Python dictionaries)
    simple_struct = get_value("simple_struct")
    assert isinstance(simple_struct, dict), f"Expected simple_struct to be a dict, got {type(simple_struct)}"
    assert "a" in simple_struct and "b" in simple_struct, (
        f"Expected simple_struct to have keys 'a' and 'b', got {simple_struct}"
    )
    assert simple_struct["a"] == 1, f"Expected simple_struct['a'] to be 1, got {simple_struct['a']}"
    assert simple_struct["b"] == "x", f"Expected simple_struct['b'] to be 'x', got {simple_struct['b']}"

    # Dictionary containers
    string_to_int_dict = get_value("string_to_int_dict")
    assert isinstance(string_to_int_dict, dict), (
        f"Expected string_to_int_dict to be a dict, got {type(string_to_int_dict)}"
    )
    # Accept both string keys and stringified bytes keys
    expected_dict = {"key1": 1, "key2": 2, "key3": 3}
    expected_bytes_dict = {f"b'{k}'": v for k, v in expected_dict.items()}
    assert string_to_int_dict == expected_dict or string_to_int_dict == expected_bytes_dict, (
        f"Expected dict to be {expected_dict} or {expected_bytes_dict}, got {string_to_int_dict}"
    )

    # Nested containers - list of structs
    list_of_structs = get_value("list_of_structs")
    assert isinstance(list_of_structs, list), f"Expected list_of_structs to be a list, got {type(list_of_structs)}"
    assert len(list_of_structs) == 3, f"Expected list_of_structs to have 3 items, got {len(list_of_structs)}"

    # Check first item in list of structs
    first_struct = list_of_structs[0]
    assert isinstance(first_struct, dict), f"Expected first_struct to be a dict, got {type(first_struct)}"
    assert first_struct == {
        "id": 1,
        "name": "Alice",
    }, f"Expected first_struct to be {{'id': 1, 'name': 'Alice'}}, got {first_struct}"

    # Struct with list
    struct_with_list = get_value("struct_with_list")
    assert isinstance(struct_with_list, dict), (
        f"Expected struct_with_list to be a dict, got {type(struct_with_list)}"
    )
    assert struct_with_list == {
        "collection_name": "users",
        "ids": [1, 2, 3],
        "active": True,
    }, (
        f"Expected struct_with_list to be {{'collection_name': 'users', 'ids': [1, 2, 3], 'active': True}}, "
        f"got {struct_with_list}"
    )

    # Complex dict
    complex_dict = get_value("complex_dict")
    assert isinstance(complex_dict, dict), f"Expected complex_dict to be a dict, got {type(complex_dict)}"
    expected_complex_dict = {
        "person1": {"id": 1, "name": "Alice", "scores": [25, 30, 28]},
        "person2": {"id": 2, "name": "Bob", "scores": [22, 27, 29]},
    }
    expected_bytes_complex_dict = {f"b'{k}'": v for k, v in expected_complex_dict.items()}
    assert complex_dict == expected_complex_dict or complex_dict == expected_bytes_complex_dict, (
        f"Expected complex_dict to be {expected_complex_dict} or {expected_bytes_complex_dict}, got {complex_dict}"
    )

    # Triple-nested list
    nested_list = get_value("nested_list_struct_list")
    assert isinstance(nested_list, list), f"Expected nested_list to be a list, got {type(nested_list)}"
    assert len(nested_list) == 2, f"Expected nested_list to have 2 items, got {len(nested_list)}"
    expected_nested_list = [
        {"id": 1, "name": "Team A", "members": ["Alice", "Bob"]},
        {"id": 2, "name": "Team B", "members": ["Charlie", "David"]},
    ]
    assert nested_list == expected_nested_list, (
        f"Expected nested_list to be {expected_nested_list}, got {nested_list}"
    )

    # Tuple containers
    mixed_tuple = get_value("mixed_tuple")
    assert isinstance(mixed_tuple, (list, tuple)), (
        f"Expected mixed_tuple to be a list or tuple, got {type(mixed_tuple)}"
    )
    assert len(mixed_tuple) == 3, f"Expected mixed_tuple to have 3 items, got {len(mixed_tuple)}"
    expected_tuple = (1, "a", True)
    # Convert to tuple if it's a list for comparison
    if isinstance(mixed_tuple, list):
        mixed_tuple = tuple(mixed_tuple)
    assert mixed_tuple == expected_tuple, f"Expected mixed_tuple to be {expected_tuple}, got {mixed_tuple}"


async def test_list_directory_integration(mcp_server):
    """Test listing directory contents in YDB."""
    # List the contents of the root directory - this should always exist
    result = await call_mcp_tool(mcp_server, "ydb_list_directory", path="/")

    # Parse the JSON result
    parsed = _parse_tool_result(result)

    # Verify the structure
    assert "path" in parsed, f"Missing 'path' field in dir_data: {parsed}"
    assert parsed["path"] == "/", f"Expected path to be '/', got {parsed['path']}"
    assert "items" in parsed, f"Missing 'items' field in dir_data: {parsed}"

    # Root directory should have at least some items
    assert isinstance(parsed["items"], list), f"Expected items to be a list, got {type(parsed['items'])}"
    assert len(parsed["items"]) > 0, f"Expected non-empty directory, got {parsed['items']}"

    # Verify at least one item has expected properties
    assert "name" in parsed["items"][0], f"Missing 'name' field in item: {parsed['items'][0]}"
    assert "type" in parsed["items"][0], f"Missing 'type' field in item: {parsed['items'][0]}"
    assert "owner" in parsed["items"][0], f"Missing 'owner' field in item: {parsed['items'][0]}"

    logger.debug(f"Found {len(parsed['items'])} items in root directory")
    for item in parsed["items"]:
        logger.debug(f"Item: {item['name']}, Type: {item['type']}")


async def test_list_directory_nonexistent_integration(mcp_server):
    """Test listing a nonexistent directory in YDB."""
    # Generate a random path that should not exist
    nonexistent_path = f"/nonexistent_{int(time.time())}"

    # Try to list a nonexistent directory
    result = await call_mcp_tool(mcp_server, "ydb_list_directory", path=nonexistent_path)

    # Parse the result
    parsed = _parse_tool_result(result)

    # Should contain an error message
    assert "error" in parsed, f"Expected error message, got: {parsed}"


async def test_describe_path_integration(mcp_server):
    """Test describing paths in YDB."""
    # 1. First create a test table to describe
    test_table_name = f"describe_test_table_{int(time.time())}"
    try:
        # Create a table with various column types
        create_result = await call_mcp_tool(
            mcp_server,
            "ydb_query",
            sql=f"""
            CREATE TABLE {test_table_name} (
                id Uint64,
                name Utf8,
                value Double,
                created Timestamp,
                PRIMARY KEY (id)
            );
            """,
        )
        parsed = _parse_tool_result(create_result)
        assert "error" not in parsed, f"Error creating table: {parsed}"

        # Wait a moment for the table to be fully created
        time.sleep(1)

        # 2. Now describe the table
        result = await call_mcp_tool(mcp_server, "ydb_describe_path", path=f"/{test_table_name}")

        # Parse the JSON result
        parsed = _parse_tool_result(result)

        # Only check for path if not error
        if "error" not in parsed:
            assert parsed["path"] == f"/{test_table_name}", (
                f"Expected path to be '/{test_table_name}', got {parsed['path']}"
            )
            assert "type" in parsed, f"Missing 'type' field in path_data: {parsed}"
            assert parsed["type"] == "TABLE", f"Expected type to be 'TABLE', got {parsed['type']}"
            # Verify table information
            assert "table" in parsed, f"Missing 'table' field in path_data: {parsed}"
    finally:
        # Clean up - drop the table even if test fails; never let cleanup
        # failure mask the actual test outcome
        try:
            cleanup_result = await call_mcp_tool(mcp_server, "ydb_query", sql=f"DROP TABLE {test_table_name};")
            logger.debug(f"Table cleanup result: {cleanup_result}")
        except Exception as e:
            logger.warning(f"Failed to clean up test table: {e}")


async def test_describe_nonexistent_path_integration(mcp_server):
    """Test describing a nonexistent path in YDB."""
    # Generate a random path that should not exist
    nonexistent_path = f"/nonexistent_{int(time.time())}"

    # Try to describe a nonexistent path
    result = await call_mcp_tool(mcp_server, "ydb_describe_path", path=nonexistent_path)

    # Parse the result
    parsed = _parse_tool_result(result)

    # Should contain an error message
    assert "error" in parsed, f"Expected error message, got: {parsed}"


async def test_ydb_status_integration(mcp_server):
    """Test getting YDB connection status."""
    result = await call_mcp_tool(mcp_server, "ydb_status")

    # Parse the JSON result
    parsed = _parse_tool_result(result)

    # Verify the structure
    assert "status" in parsed, f"Missing 'status' field in status_data: {parsed}"
    assert "ydb_endpoint" in parsed, f"Missing 'ydb_endpoint' field in status_data: {parsed}"
    assert "ydb_database" in parsed, f"Missing 'ydb_database' field in status_data: {parsed}"
    assert "auth_mode" in parsed, f"Missing 'auth_mode' field in status_data: {parsed}"
    assert "ydb_connection" in parsed, f"Missing 'ydb_connection' field in status_data: {parsed}"

    # For a successful test run, we expect to be connected
    assert parsed["status"] == "running", f"Expected status to be 'running', got {parsed['status']}"
    assert parsed["ydb_connection"] == "connected", (
        f"Expected ydb_connection to be 'connected', got {parsed['ydb_connection']}"
    )
    assert parsed["error"] is None, f"Expected no error, got: {parsed.get('error')}"
    logger.info(f"YDB status check successful: {parsed}")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ydb-platform/ydb-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.