Skip to main content
Glama
mcp-client.js26.5 kB
import { spawn } from "child_process"; import { createInterface } from "readline"; import dotenv from "dotenv"; import net from "net"; // Load environment variables dotenv.config(); /** * McpClient - A client for interacting with the Twitter MCP server * * This class provides methods for: * - Starting a new MCP server or connecting to an existing one * - Sending requests to the MCP server * - Handling responses from the MCP server * - Managing the lifecycle of the MCP server */ export class McpClient { /** * Create a new McpClient instance * * @param {Object} options - Configuration options * @param {number} options.port - The port to use for the MCP server (default: 3001) * @param {boolean} options.debug - Whether to enable debug logging (default: false) * @param {number} options.maxPortAttempts - Maximum number of port attempts (default: 10) * @param {number} options.portIncrement - Increment port by this amount on conflict (default: 1) * @param {boolean} options.startServer - Whether to start a new server or connect to existing (default: true) */ constructor(options = {}) { this.mcpProcess = null; this.rl = null; this.responseHandlers = new Map(); this.nextRequestId = 1; this.options = { port: options.port || 3001, debug: options.debug || false, maxPortAttempts: options.maxPortAttempts || 10, portIncrement: options.portIncrement || 1, startServer: options.startServer !== false, // Start the server by default }; this.isReady = false; this.isExiting = false; this.restartAttempts = 0; this.maxRestartAttempts = 3; this.currentPort = this.options.port; this.socket = null; // Socket for connecting to existing server } /** * Start the MCP server or connect to an existing one * * @returns {Promise<void>} A promise that resolves when the server is started or connected */ async start() { if (this.options.startServer) { return this.startServer(); } else { return this.connectToServer(); } } /** * Connect to an existing MCP server * * @returns {Promise<void>} A promise that resolves when connected to the server */ async connectToServer() { return new Promise((resolve, reject) => { try { console.log(`Connecting to MCP server on port ${this.currentPort}...`); // Create a socket connection to the server this.socket = new net.Socket(); this.socket.on("connect", () => { console.log(`Connected to MCP server on port ${this.currentPort}`); this.isReady = true; // Create readline interface for reading responses this.rl = createInterface({ input: this.socket, crlfDelay: Infinity, }); // Handle responses this.rl.on("line", (line) => { // Log the raw line for debugging if (this.options.debug) { console.log("Raw MCP output:", line); } try { // Try to parse the line as JSON const response = JSON.parse(line); if (this.options.debug) { console.log( "Received response:", JSON.stringify(response, null, 2) ); } // Check if this is a JSON-RPC 2.0 response if (response.jsonrpc === "2.0" && response.id) { // Find and call the appropriate response handler if (this.responseHandlers.has(response.id)) { const handler = this.responseHandlers.get(response.id); this.responseHandlers.delete(response.id); handler(response); } } } catch (error) { // Don't let parsing errors crash the application if (this.options.debug) { console.error("Error parsing MCP response:", error); console.error("Problematic line:", line); } } }); resolve(); }); this.socket.on("error", (error) => { console.error(`Error connecting to MCP server: ${error.message}`); reject(error); }); // Connect to the server this.socket.connect(this.currentPort, "localhost"); } catch (error) { reject(error); } }); } /** * Start a new MCP server * * @returns {Promise<void>} A promise that resolves when the server is started */ async startServer() { return new Promise((resolve, reject) => { try { console.log("Starting MCP server..."); // Create a custom environment with our port settings const env = { ...process.env }; // Set environment variables to disable HTTP server and configure port env.DISABLE_HTTP_SERVER = "true"; env.PORT = this.currentPort.toString(); if (this.options.debug) { console.log("DEBUG: Environment variables for MCP process:", { DISABLE_HTTP_SERVER: env.DISABLE_HTTP_SERVER, PORT: env.PORT, }); } // Start the MCP server process if (this.options.debug) { console.log("DEBUG: Spawning MCP process"); } this.mcpProcess = spawn( "node", ["node_modules/agent-twitter-client-mcp/build/index.js"], { env: env, stdio: ["pipe", "pipe", "pipe"], cwd: process.cwd(), // Use the current working directory } ); if (this.options.debug) { console.log( `DEBUG: MCP process spawned with PID ${this.mcpProcess.pid}` ); } // Set up error handling for the process this.mcpProcess.on("error", (err) => { console.error("ERROR: MCP process error:", err); // Don't reject the promise if we're already exiting if (!this.isExiting) { reject(err); } }); // Handle stderr output this.mcpProcess.stderr.on("data", (data) => { const stderr = data.toString(); console.error("MCP Error:", stderr); // If this is a port conflict error, set the flag if (stderr.includes("EADDRINUSE")) { if (this.options.debug) { console.log("DEBUG: Port conflict detected in stderr"); } } }); // Create readline interface for reading MCP responses this.rl = createInterface({ input: this.mcpProcess.stdout, crlfDelay: Infinity, }); if (this.options.debug) { console.log("DEBUG: Readline interface created"); } // Handle MCP responses this.rl.on("line", (line) => { // Log the raw line for debugging if (this.options.debug) { console.log("Raw MCP output:", line); } try { // Check if this is a log message (starts with timestamp) if (line.match(/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}/)) { // This is a log message, not a JSON response if (this.options.debug) { console.log("Log message:", line); } // Check if this indicates the server is ready if ( line.includes("Twitter MCP server running") || line.includes("Initial health check completed") ) { if (!this.isReady) { this.isReady = true; console.log("MCP client started successfully!"); resolve(); } } return; } // Try to parse the line as JSON const response = JSON.parse(line); if (this.options.debug) { console.log( "Received response:", JSON.stringify(response, null, 2) ); } // Check if this is a JSON-RPC 2.0 response if (response.jsonrpc === "2.0" && response.id) { // Find and call the appropriate response handler if (this.responseHandlers.has(response.id)) { const handler = this.responseHandlers.get(response.id); this.responseHandlers.delete(response.id); handler(response); } } } catch (error) { // Don't let parsing errors crash the application if ( this.options.debug && !line.match(/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}/) ) { console.error("Error parsing MCP response:", error); console.error("Problematic line:", line); } } }); // Handle MCP errors this.mcpProcess.stderr.on("data", (data) => { const errorMsg = data.toString(); console.error("MCP Error:", errorMsg); // Check for port conflict if (errorMsg.includes("EADDRINUSE")) { if (this.options.debug) { console.log( "DEBUG: Port conflict detected, but continuing with stdio interface" ); } // If we haven't started yet, mark as started since we can still use stdio if (!this.isReady) { this.isReady = true; console.log( "MCP client started successfully (using stdio only)!" ); resolve(); } } }); // Handle process exit this.mcpProcess.on("exit", (code, signal) => { if (this.options.debug) { console.log( `DEBUG: MCP process exited with code ${code} and signal ${signal}` ); } // Don't treat exit as an error during normal shutdown if (!this.isExiting) { console.log(`MCP process exited unexpectedly with code ${code}`); // Check if the process exited due to a port conflict if (code === 1) { // Check if we've tried too many ports if ( this.currentPort - this.options.port >= this.options.maxPortAttempts * this.options.portIncrement ) { console.error( `ERROR: Tried ${this.options.maxPortAttempts} different ports without success. Giving up.` ); reject( new Error( `Failed to start MCP server after trying ${this.options.maxPortAttempts} different ports` ) ); return; } // Try the next port this.currentPort += this.options.portIncrement; if (this.options.debug) { console.log( `DEBUG: Port conflict detected. Trying port ${this.currentPort}` ); } // Restart the process with the new port this.restartMcpProcess(); return; } // If we're not already exiting, set the process as null this.mcpProcess = null; // Don't reject the promise if we've already resolved it if (this.isReady) { // The server was running but exited unexpectedly // We could try to restart it here if needed if (this.restartAttempts < this.maxRestartAttempts) { console.log("Attempting to restart MCP process..."); this.restartMcpProcess(); } } else { reject(new Error(`MCP process exited with code ${code}`)); } } }); // Wait for the MCP server to start setTimeout(() => { if (!this.isReady) { this.isReady = true; console.log("MCP client started successfully (timeout)!"); resolve(); } }, 5000); // Increased timeout to 5 seconds } catch (error) { reject(error); } }); } /** * Restart the MCP process (used for recovery) */ restartMcpProcess() { if (this.restartAttempts >= this.maxRestartAttempts) { console.error( `ERROR: Maximum restart attempts (${this.maxRestartAttempts}) reached. Giving up.` ); return; } this.restartAttempts++; console.log(`Restarting MCP process (attempt ${this.restartAttempts})...`); // Clean up the old process if (this.mcpProcess) { try { this.mcpProcess.kill(); } catch (error) { // Ignore errors when killing the process } } // Close the readline interface if (this.rl) { try { this.rl.close(); } catch (error) { // Ignore errors when closing the readline interface } } // Reset state this.mcpProcess = null; this.rl = null; this.isReady = false; // Restart after a short delay setTimeout(() => { this.startServer().catch((error) => { console.error("ERROR: Failed to restart MCP process:", error); }); }, 1000); } /** * List available tools from the MCP server * * @returns {Promise<Object>} A promise that resolves with the list of tools */ async listTools() { return new Promise((resolve, reject) => { try { // Check if we have a connection to the MCP server if ( (this.options.startServer && (!this.mcpProcess || this.mcpProcess.killed || this.mcpProcess.exitCode !== null || !this.mcpProcess.stdin || !this.mcpProcess.stdin.writable)) || (!this.options.startServer && (!this.socket || !this.socket.writable)) ) { const error = new Error("No connection to MCP server available"); error.details = { startServer: this.options.startServer, mcpProcess: this.options.startServer ? { killed: this.mcpProcess?.killed, exitCode: this.mcpProcess?.exitCode, stdinWritable: this.mcpProcess?.stdin?.writable, } : null, socket: !this.options.startServer ? { writable: this.socket?.writable, } : null, }; console.error( "ERROR: Cannot list tools:", error.message, error.details ); if (this.options.startServer) { // Try to restart the MCP process this.restartMcpProcess(); } reject(error); return; } const requestId = this.nextRequestId++; // Create the request in JSON-RPC 2.0 format const request = { jsonrpc: "2.0", id: requestId.toString(), method: "tools/list", params: {}, }; if (this.options.debug) { console.log( "Sending tools/list request:", JSON.stringify(request, null, 2) ); } // Register response handler this.responseHandlers.set(requestId.toString(), (response) => { if (response.result) { resolve(response.result); } else if (response.error) { console.error( "ERROR: Error listing tools:", response.error.message || "Unknown error" ); reject(new Error(response.error.message || "Unknown error")); } else { console.error("ERROR: Invalid response from MCP server"); reject(new Error("Invalid response from MCP server")); } }); // Send the request this.sendRequest(request); } catch (error) { console.error("ERROR: Exception in listTools:", error); reject(error); } }); } /** * Send a tweet * * @param {string} text - The text of the tweet * @param {string|null} replyToTweetId - Optional ID of a tweet to reply to * @returns {Promise<Object>} A promise that resolves with the result of sending the tweet */ async sendTweet(text, replyToTweetId = null) { return new Promise((resolve, reject) => { try { // Check if we have a connection to the MCP server if ( (this.options.startServer && (!this.mcpProcess || this.mcpProcess.killed || this.mcpProcess.exitCode !== null || !this.mcpProcess.stdin || !this.mcpProcess.stdin.writable)) || (!this.options.startServer && (!this.socket || !this.socket.writable)) ) { const error = new Error("No connection to MCP server available"); error.details = { startServer: this.options.startServer, mcpProcess: this.options.startServer ? { killed: this.mcpProcess?.killed, exitCode: this.mcpProcess?.exitCode, stdinWritable: this.mcpProcess?.stdin?.writable, } : null, socket: !this.options.startServer ? { writable: this.socket?.writable, } : null, }; console.error( "ERROR: Cannot send tweet:", error.message, error.details ); if (this.options.startServer) { // Try to restart the MCP process this.restartMcpProcess(); } reject(error); return; } if (this.options.debug && this.options.startServer) { console.log("DEBUG: MCP process state before sending:", { pid: this.mcpProcess.pid, killed: this.mcpProcess.killed, exitCode: this.mcpProcess.exitCode, stdinWritable: this.mcpProcess.stdin.writable, }); } const requestId = this.nextRequestId++; // Create the request in JSON-RPC 2.0 format // Based on our findings, the correct format is: // - method: "tools/call" // - params.name: The name of the tool // - params.arguments: The arguments for the tool (not args) const request = { jsonrpc: "2.0", id: requestId.toString(), method: "tools/call", params: { name: "send_tweet", arguments: { text: text, }, }, }; // Add replyToTweetId if provided if (replyToTweetId) { request.params.arguments.replyToTweetId = replyToTweetId; } // Log the request for debugging if (this.options.debug) { console.log( "DEBUG: Sending tweet request:", JSON.stringify(request, null, 2) ); } // Register response handler this.responseHandlers.set(requestId.toString(), (response) => { if (this.options.debug) { console.log( "Received response:", JSON.stringify(response, null, 2) ); } // Check for error in the response content if ( response.result && response.result.content && response.result.content.length > 0 && response.result.content[0].isError ) { const errorMessage = response.result.content[0].text || "Unknown error in response"; console.error("ERROR: Error sending tweet:", errorMessage); reject(new Error(errorMessage)); return; } // Check for standard error format if (response.error) { console.error( "ERROR: Error sending tweet:", response.error.message || "Unknown error" ); reject(new Error(response.error.message || "Unknown error")); return; } // If we get here, the tweet was sent successfully if ( response.result && response.result.content && response.result.content.length > 0 ) { try { // Try to parse the tweet data from the response const contentText = response.result.content[0].text; if (contentText) { try { const tweetData = JSON.parse(contentText); if (tweetData && tweetData.tweet && tweetData.tweet.id) { if (this.options.debug) { console.log( "DEBUG: Successfully parsed tweet data:", tweetData ); } resolve(tweetData.tweet); return; } } catch (parseError) { console.error( "ERROR: Failed to parse tweet data:", parseError ); // Continue with the original response if parsing fails } } } catch (error) { console.error( "ERROR: Exception while processing response:", error ); // Continue with the original response if processing fails } // If we couldn't extract the tweet data, return the original response resolve(response.result); } else { console.error("ERROR: Invalid response from MCP server"); reject(new Error("Invalid response from MCP server")); } }); // Send the request this.sendRequest(request); } catch (error) { console.error("ERROR: Exception in sendTweet:", error); reject(error); } }); } /** * Stop the MCP client */ stop() { if (this.options.debug) { console.log("DEBUG: Stopping MCP client"); } // Clear any pending response handlers this.responseHandlers.clear(); // Close the readline interface if it exists if (this.rl) { if (this.options.debug) { console.log("DEBUG: Closing readline interface"); } this.rl.close(); this.rl = null; } // Kill the MCP process if we started it if (this.options.startServer && this.mcpProcess) { if (this.options.debug) { console.log("DEBUG: Killing MCP process"); } // Close stdin before killing to prevent EPIPE errors if (this.mcpProcess.stdin && this.mcpProcess.stdin.writable) { try { this.mcpProcess.stdin.end(); } catch (error) { if (this.options.debug) { console.log("DEBUG: Error closing stdin:", error.message); } } } try { // Use SIGTERM for a graceful shutdown this.mcpProcess.kill("SIGTERM"); // Set a timeout to force kill if it doesn't exit gracefully setTimeout(() => { if (this.mcpProcess && !this.mcpProcess.killed) { if (this.options.debug) { console.log("DEBUG: Force killing MCP process with SIGKILL"); } try { this.mcpProcess.kill("SIGKILL"); } catch (error) { if (this.options.debug) { console.log( "DEBUG: Error force killing process:", error.message ); } } } }, 1000); } catch (error) { if (this.options.debug) { console.log("DEBUG: Error killing MCP process:", error.message); } } // Set up a one-time listener for the exit event this.mcpProcess.once("exit", (code, signal) => { if (this.options.debug) { console.log( `DEBUG: MCP process exited with code ${code} and signal ${signal}` ); } this.mcpProcess = null; }); } else if (this.socket) { // Close the socket if it exists if (this.options.debug) { console.log("DEBUG: Closing socket connection"); } try { this.socket.end(); this.socket = null; } catch (error) { if (this.options.debug) { console.log("DEBUG: Error closing socket:", error.message); } } } return Promise.resolve(); } /** * Send a request to the MCP server * * @param {Object} request - The request to send */ sendRequest(request) { try { // Validate connection before sending if ( this.options.startServer && this.mcpProcess && this.mcpProcess.stdin && this.mcpProcess.stdin.writable ) { // Send the request to the MCP process's stdin try { this.mcpProcess.stdin.write(JSON.stringify(request) + "\n"); } catch (writeError) { if (writeError.code === "EPIPE") { console.error( "ERROR: The connection to the MCP process has been closed (EPIPE)." ); // Try to restart the MCP process this.restartMcpProcess(); throw new Error( "Connection to MCP server lost. Attempting to restart." ); } else { throw writeError; } } } else if (this.socket && this.socket.writable) { // Send the request to the socket try { this.socket.write(JSON.stringify(request) + "\n"); } catch (writeError) { if (writeError.code === "EPIPE") { console.error( "ERROR: The connection to the MCP socket has been closed (EPIPE)." ); throw new Error("Connection to MCP server lost. Please reconnect."); } else { throw writeError; } } } else { throw new Error("No connection to MCP server available"); } } catch (error) { if (this.options.debug) { console.error("DEBUG: Error in sendRequest:", error); } throw error; } } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ryanmac/agent-twitter-client-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server