flink-mcp
Provides tools to interact with Apache Flink SQL Gateway, enabling session management, SQL query execution, streaming query results, and job control.
Click on "Install Server".
Wait a few minutes for the server to deploy. Once ready, it will show a "Started" state.
In the chat, type @ followed by the MCP server name and your instructions, e.g., "@flink-mcp Run a SQL query to select from the 'orders' table and show the first 5 rows."
That's it! The server will respond to your query, and you can continue using it as needed.
flink-mcp — Flink MCP Server
This project provides an MCP server that connects to Apache Flink SQL Gateway.
Prerequisites
A running Apache Flink cluster and SQL Gateway
Start cluster:
./bin/start-cluster.sh
Start gateway:
./bin/sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=localhost
Verify:
curl http://localhost:8083/v3/info
Configure environment:
Set SQL_GATEWAY_API_BASE_URL (default: http://localhost:8083). You can use a .env file at the repo root.
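The base URL lookup described above can be sketched as follows. This is only an illustration, not the project's own code: the helper name gateway_info_url is hypothetical, and loading a .env file is left out.

```python
import os
from typing import Mapping, Optional

# Default taken from the README; used when the variable is unset or empty.
DEFAULT_BASE_URL = "http://localhost:8083"


def gateway_info_url(env: Optional[Mapping[str, str]] = None) -> str:
    """Return the /v3/info URL for the configured SQL Gateway (hypothetical helper)."""
    env = os.environ if env is None else env
    base = env.get("SQL_GATEWAY_API_BASE_URL") or DEFAULT_BASE_URL
    # Strip a trailing slash so the joined URL has exactly one separator.
    return base.rstrip("/") + "/v3/info"


if __name__ == "__main__":
    print(gateway_info_url())
```

The printed URL is the same one the curl check in the prerequisites targets when the default is in effect.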
Run
Install and run via the console script:
pip install -e .
flink-mcp
MCP clients should launch the server over stdio with the command flink-mcp.
Ensure SQL_GATEWAY_API_BASE_URL is set in your environment or .env.
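As a rough illustration of a client-side launch entry, the sketch below builds a configuration dictionary and prints it as JSON. The "mcpServers" layout is an assumption borrowed from several MCP clients, not something this project specifies, and client_config is a hypothetical helper; check your client's documentation for the exact format.

```python
import json


def client_config(base_url: str = "http://localhost:8083") -> dict:
    """Build a stdio launch entry for flink-mcp (layout is an assumption)."""
    return {
        "mcpServers": {  # assumed top-level key; varies by client
            "flink-mcp": {
                "command": "flink-mcp",  # console script installed by pip
                "env": {"SQL_GATEWAY_API_BASE_URL": base_url},
            }
        }
    }


if __name__ == "__main__":
    print(json.dumps(client_config(), indent=2))
```

Passing the base URL through the entry's env block is one way to satisfy the requirement that SQL_GATEWAY_API_BASE_URL be set when the client starts the server.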
Tools (v0.2.5)
flink_info (resource): returns cluster info from /v3/info.
open_new_session(properties?: dict) -> { sessionHandle, ... }.
get_config(sessionHandle: str): returns session configuration.
configure_session(sessionHandle: str, statement: str): apply session-scoped DDL/config (CREATE/USE/SET/RESET/LOAD/UNLOAD/ADD JAR).
run_query_collect_and_stop(sessionHandle: str, query: str, max_rows: int=5, max_seconds: float=15.0): execute, fetch up to N rows within T seconds, then STOP the job if a jobID is present; closes the operation.
run_query_stream_start(sessionHandle: str, query: str): execute a streaming query and return { jobID, operationHandle }; the job is left running.
fetch_result_page(sessionHandle: str, operationHandle: str, token: int): fetch a single page; returns { page, nextToken, isEnd }.
cancel_job(sessionHandle: str, jobId: str): issue STOP JOB '<jobId>', wait until DESCRIBE JOB status is not RUNNING; returns { jobID, status, jobGone, jobStatus }.
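The tools above chain together by passing handles from one call to the next. The sketch below shows one possible client-side flow. It assumes a hypothetical call_tool(name, arguments) function supplied by your MCP client, assumes the first page is requested with token 0, and treats each page as opaque because its shape is not documented here.

```python
from typing import Any, Callable, Dict, List

# Hypothetical adapter: call_tool(tool_name, arguments) -> result dict.
ToolCaller = Callable[[str, Dict[str, Any]], Dict[str, Any]]


def stream_pages(call_tool: ToolCaller, query: str, max_pages: int = 3) -> List[Any]:
    """Start a streaming query, read up to max_pages pages, then cancel the job."""
    session = call_tool("open_new_session", {})
    handle = session["sessionHandle"]

    started = call_tool(
        "run_query_stream_start", {"sessionHandle": handle, "query": query}
    )
    job_id = started.get("jobID")
    operation = started["operationHandle"]

    pages: List[Any] = []
    token = 0  # assumption: paging starts at token 0
    try:
        for _ in range(max_pages):
            result = call_tool(
                "fetch_result_page",
                {"sessionHandle": handle, "operationHandle": operation, "token": token},
            )
            pages.append(result["page"])
            if result.get("isEnd") or result.get("nextToken") is None:
                break
            token = result["nextToken"]
    finally:
        # The streaming job is left running by design, so stop it explicitly.
        if job_id:
            call_tool("cancel_job", {"sessionHandle": handle, "jobId": job_id})
    return pages
```

Capping the number of pages matters because a streaming query may never report isEnd; the finally block makes sure the job is cancelled even if a fetch fails.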
Notes
Tools are stateless; clients manage and pass session/operation handles explicitly.
run_query_stream_start returns both jobID and operationHandle; use fetch_result_page to stream results.
cancel_job issues STOP and waits using DESCRIBE JOB; close_operation is invoked internally where appropriate.
Endpoints target SQL Gateway v3-style paths.
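The "wait until the status is not RUNNING" behaviour of cancel_job amounts to a polling loop. The sketch below illustrates the idea only and is not the server's implementation: get_status is a hypothetical callable standing in for a DESCRIBE JOB lookup, and the timeout and poll interval are assumed values.

```python
import time
from typing import Callable


def wait_until_not_running(
    get_status: Callable[[], str],
    timeout_s: float = 30.0,       # assumed upper bound on the wait
    poll_interval_s: float = 0.5,  # assumed delay between status checks
) -> str:
    """Poll get_status until it stops reporting RUNNING or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    status = get_status()
    while status == "RUNNING" and time.monotonic() < deadline:
        time.sleep(poll_interval_s)
        status = get_status()
    # Returns the last observed status; it is still "RUNNING" on timeout.
    return status
```

A caller can compare the returned value with "RUNNING" to tell a completed stop from a timeout.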