Skip to main content
Glama

CentralMind/Gateway

connector_test.go6 kB
package mysql import ( "context" "path/filepath" "testing" "github.com/centralmind/gateway/connectors" "github.com/centralmind/gateway/model" "github.com/docker/go-connections/nat" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/modules/mysql" ) func TestConnector(t *testing.T) { ctx := context.Background() dbName := "testdb" dbUser := "user" dbPassword := "password" mysqlContainer, err := mysql.Run(ctx, "mysql:8.0", mysql.WithDatabase(dbName), mysql.WithUsername(dbUser), mysql.WithPassword(dbPassword), mysql.WithScripts(filepath.Join("testdata", "test_data.sql")), ) require.NoError(t, err) defer func() { require.NoError(t, testcontainers.TerminateContainer(mysqlContainer)) }() host, err := mysqlContainer.Host(ctx) require.NoError(t, err) port, err := mysqlContainer.MappedPort(ctx, nat.Port("3306/tcp")) require.NoError(t, err) logrus.Infof("mysql running on %s:%d", host, port.Int()) cfg := Config{ Host: host, Database: dbName, User: dbUser, Password: dbPassword, Port: port.Int(), } connector, err := connectors.New("mysql", cfg) assert.NoError(t, err) assert.NotNil(t, connector) t.Run("Ping Database", func(t *testing.T) { err := connector.Ping(ctx) assert.NoError(t, err) }) t.Run("Discovery Tables", func(t *testing.T) { tables, err := connector.Discovery(ctx, nil) assert.NoError(t, err) assert.NotEmpty(t, tables) }) t.Run("Read Endpoint", func(t *testing.T) { endpoint := model.Endpoint{ Query: "SELECT COUNT(*) AS total_count FROM gachi_teams", Params: []model.EndpointParams{}, } params := map[string]any{} rows, err := connector.Query(ctx, endpoint, params) assert.NoError(t, err) assert.NotEmpty(t, rows) }) t.Run("Query Endpoint", func(t *testing.T) { endpoint := model.Endpoint{ Query: `SELECT name, strength_level, special_move FROM gachi_personas WHERE strength_level > :min_strength ORDER BY strength_level DESC`, Params: []model.EndpointParams{ { Name: "min_strength", Type: "int", Required: true, }, }, } params := map[string]any{ "min_strength": 90, } rows, err := connector.Query(ctx, endpoint, params) assert.NoError(t, err) assert.NotEmpty(t, rows) // Verify results expected := []map[string]any{ {"name": "Billy Herrington", "strength_level": int64(100), "special_move": "Anvil Drop"}, {"name": "Van Darkholme", "strength_level": int64(95), "special_move": "Whip of Submission"}, } assert.Equal(t, len(expected), len(rows)) for i, exp := range expected { assert.Equal(t, exp["name"], rows[i]["name"]) assert.Equal(t, exp["strength_level"], rows[i]["strength_level"]) assert.Equal(t, exp["special_move"], rows[i]["special_move"]) } }) } func TestMySQLTypeMapping(t *testing.T) { c := &Connector{} tests := []struct { name string sqlType string expected model.ColumnType }{ // String types {"varchar", "VARCHAR", model.TypeString}, {"text", "TEXT", model.TypeString}, {"char", "CHAR", model.TypeString}, {"tinytext", "TINYTEXT", model.TypeString}, {"mediumtext", "MEDIUMTEXT", model.TypeString}, {"longtext", "LONGTEXT", model.TypeString}, {"enum", "ENUM", model.TypeString}, // Numeric types {"decimal", "DECIMAL", model.TypeNumber}, {"numeric", "NUMERIC", model.TypeNumber}, {"float", "FLOAT", model.TypeNumber}, {"double", "DOUBLE", model.TypeNumber}, // Integer types {"int", "INT", model.TypeInteger}, {"tinyint", "TINYINT", model.TypeInteger}, {"smallint", "SMALLINT", model.TypeInteger}, {"mediumint", "MEDIUMINT", model.TypeInteger}, {"bigint", "BIGINT", model.TypeInteger}, // Boolean type (MySQL uses TINYINT(1) for boolean) {"boolean", "BOOLEAN", model.TypeBoolean}, {"bool", "BOOL", model.TypeBoolean}, {"tinyint(1)", "TINYINT(1)", model.TypeBoolean}, // JSON type {"json", "JSON", model.TypeObject}, // Set type (mapped to array) {"set", "SET", model.TypeArray}, // Date/Time types {"date", "DATE", model.TypeDatetime}, {"time", "TIME", model.TypeDatetime}, {"datetime", "DATETIME", model.TypeDatetime}, {"timestamp", "TIMESTAMP", model.TypeDatetime}, {"year", "YEAR", model.TypeDatetime}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { result := c.GuessColumnType(tt.sqlType) assert.Equal(t, tt.expected, result, "Type mapping mismatch for %s", tt.sqlType) }) } } func TestLoadColumns_WithPrimaryKey(t *testing.T) { ctx := context.Background() dbName := "testdb" dbUser := "user" dbPassword := "password" mysqlContainer, err := mysql.Run(ctx, "mysql:8.0", mysql.WithDatabase(dbName), mysql.WithUsername(dbUser), mysql.WithPassword(dbPassword), mysql.WithScripts(filepath.Join("testdata", "test_schema.sql")), ) require.NoError(t, err) defer func() { require.NoError(t, testcontainers.TerminateContainer(mysqlContainer)) }() host, err := mysqlContainer.Host(ctx) require.NoError(t, err) port, err := mysqlContainer.MappedPort(ctx, nat.Port("3306/tcp")) require.NoError(t, err) cfg := Config{ Host: host, Database: dbName, User: dbUser, Password: dbPassword, Port: port.Int(), } connector, err := connectors.New("mysql", cfg) require.NoError(t, err) // Test primary key detection c := connector.(*Connector) columns, err := c.LoadsColumns(ctx, "test_table") require.NoError(t, err) // Verify results assert.Len(t, columns, 3) // Find and verify primary key column var foundPK bool for _, col := range columns { if col.Name == "id" { assert.True(t, col.PrimaryKey, "Column 'id' should be a primary key") foundPK = true } else { assert.False(t, col.PrimaryKey, "Column '%s' should not be a primary key", col.Name) } } assert.True(t, foundPK, "Primary key column 'id' not found") }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/centralmind/gateway'

If you have feedback or need assistance with the MCP directory API, please join our Discord server