Skip to main content
Glama
main.swift26.7 kB
import Logging import MCP import Network import ServiceLifecycle import SystemPackage import struct Foundation.Data import class Foundation.RunLoop var log = Logger(label: "me.mattt.iMCP.server") { StreamLogHandler.standardError(label: $0) } #if DEBUG log.logLevel = .debug #else log.logLevel = .warning #endif // Network setup let serviceType = "_mcp._tcp" let parameters = NWParameters.tcp parameters.acceptLocalOnly = true parameters.includePeerToPeer = false if let tcpOptions = parameters.defaultProtocolStack.internetProtocol as? NWProtocolIP.Options { tcpOptions.version = .v4 } // Create browser at top level let browser = NWBrowser( for: .bonjour(type: serviceType, domain: nil), using: parameters ) log.info("Created Bonjour browser for service type: \(serviceType)") actor ConnectionState { private var hasResumed = false func checkAndSetResumed() -> Bool { if !hasResumed { hasResumed = true return true } return false } } /// An actor that provides a configurable proxy between standard I/O and network connections actor StdioProxy { // Connection configuration private let endpoint: NWEndpoint private let parameters: NWParameters private let stdinBufferSize: Int private let networkBufferSize: Int // Connection state private var connection: NWConnection? private var isRunning = false // Message buffering for proper JSON-RPC message boundaries private var networkToStdoutBuffer = Data() /// Creates a new StdioProxy with the specified network configuration /// - Parameters: /// - endpoint: The network endpoint to connect to /// - parameters: Network connection parameters /// - stdinBufferSize: Buffer size for reading from stdin (default: 4096) /// - networkBufferSize: Buffer size for reading from network (default: 4096) init( endpoint: NWEndpoint, parameters: NWParameters = .tcp, stdinBufferSize: Int = 10 * 1024 * 1024, networkBufferSize: Int = 10 * 1024 * 1024 ) { self.endpoint = endpoint self.parameters = parameters self.stdinBufferSize = stdinBufferSize self.networkBufferSize = networkBufferSize } /// Starts the proxy func start() async throws { guard !isRunning else { return } isRunning = true // Create the connection let connection = NWConnection(to: endpoint, using: parameters) self.connection = connection // Start the connection connection.start(queue: .main) // Set up state monitoring for the entire lifetime of the connection connection.stateUpdateHandler = { state in Task { [weak self] in await self?.handleConnectionState(state, continuation: nil) } } // Wait for the connection to become ready try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Swift.Error>) in connection.stateUpdateHandler = { state in Task { [weak self] in await self?.handleConnectionState(state, continuation: continuation) } } } // Create a structured concurrency task group for handling I/O try await withThrowingTaskGroup(of: Void.self) { group in // Add task for handling stdin to network group.addTask { [stdinBufferSize] in do { try await self.handleStdinToNetwork(bufferSize: stdinBufferSize) } catch { await log.error("Stdin handler failed: \(error)") throw error } } // Add task for handling network to stdout group.addTask { [networkBufferSize] in do { try await self.handleNetworkToStdout(bufferSize: networkBufferSize) } catch { await log.error("Network handler failed: \(error)") throw error } } // Wait for any task to complete (or fail) try await group.next() await log.debug("A task completed, cancelling remaining tasks") // If we get here, one of the tasks completed or failed // Cancel all remaining tasks group.cancelAll() // Stop the proxy await self.stop() } } /// Stops the proxy and cleans up resources func stop() async { isRunning = false connection?.cancel() connection = nil } /// Handles connection state changes private func handleConnectionState( _ state: NWConnection.State, continuation: CheckedContinuation<Void, Swift.Error>? ) async { switch state { case .ready: await log.debug("Connection established to \(endpoint)") continuation?.resume() case .failed(let error): await log.debug("Connection failed: \(error)") if let continuation = continuation { continuation.resume(throwing: error) } await stop() case .cancelled: await log.debug("Connection cancelled") if let continuation = continuation { continuation.resume(throwing: CancellationError()) } await stop() case .waiting(let error): await log.debug("Connection waiting: \(error)") case .preparing: await log.debug("Connection preparing...") case .setup: await log.debug("Connection setup...") @unknown default: await log.debug("Unknown connection state") } } private func setNonBlocking(fileDescriptor: FileDescriptor) throws { let flags = fcntl(fileDescriptor.rawValue, F_GETFL) guard flags >= 0 else { throw MCPError.transportError(Errno.badFileDescriptor) } let result = fcntl(fileDescriptor.rawValue, F_SETFL, flags | O_NONBLOCK) guard result >= 0 else { throw MCPError.transportError(Errno.badFileDescriptor) } } /// Handles forwarding data from stdin to the network private func handleStdinToNetwork(bufferSize: Int) async throws { let stdin = FileDescriptor.standardInput try setNonBlocking(fileDescriptor: stdin) var buffer = [UInt8](repeating: 0, count: bufferSize) var pendingData = Data() while true { // Check connection state at the beginning of each loop iteration guard isRunning, let connection = self.connection else { await log.debug("Connection no longer active, stopping stdin handler") throw StdioProxyError.connectionClosed } // Also check connection state if connection.state != .ready && connection.state != .preparing { await log.debug( "Connection state changed to \(connection.state), stopping stdin handler") throw StdioProxyError.connectionClosed } do { // Read data from stdin using SystemPackage approach let bytesRead = try buffer.withUnsafeMutableBufferPointer { pointer in try stdin.read(into: UnsafeMutableRawBufferPointer(pointer)) } if bytesRead == 0 { // EOF reached await log.debug("EOF reached on stdin, stopping stdin handler") break } if bytesRead > 0 { // Append the read bytes to pending data pendingData.append(contentsOf: buffer[0..<bytesRead]) // Check if the data is only whitespace let isOnlyWhitespace = pendingData.allSatisfy { let char = Character(UnicodeScalar($0)) return char.isWhitespace || char.isNewline } // Only send if we have non-whitespace content if !isOnlyWhitespace && !pendingData.isEmpty { // Send data to the network connection try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Swift.Error>) in connection.send( content: pendingData, completion: .contentProcessed { error in if let error = error { continuation.resume(throwing: error) } else { continuation.resume() } }) } await log.debug("Sent \(pendingData.count) bytes to network") } else if isOnlyWhitespace && !pendingData.isEmpty { await log.trace( "Skipping send of \(pendingData.count) whitespace-only bytes") } // Clear pending data after processing pendingData.removeAll(keepingCapacity: true) } } catch { if let posixError = error as? Errno, posixError == .wouldBlock { try await Task.sleep(for: .milliseconds(10)) // Keep the sleep to yield CPU continue } await log.error("Error in stdin handler: \(error)") throw error } } await log.debug("Stdin handler task completed") } /// Handles forwarding data from the network to stdout private func handleNetworkToStdout(bufferSize: Int) async throws { let stdout = FileDescriptor.standardOutput var consecutiveEmptyReads = 0 let maxConsecutiveEmptyReads = 100 // After this many consecutive empty reads, we'll check connection state while true { // Check connection state at the beginning of each loop iteration guard isRunning, let connection = self.connection else { await log.debug("Connection no longer active, stopping network handler") throw StdioProxyError.connectionClosed } // Also check connection state if connection.state != .ready && connection.state != .preparing { await log.debug( "Connection state changed to \(connection.state), stopping network handler") throw StdioProxyError.connectionClosed } do { // Check connection state periodically if we're getting consecutive empty reads if consecutiveEmptyReads > 0 && consecutiveEmptyReads % maxConsecutiveEmptyReads == 0 { // If we've had too many empty reads, consider it a timeout if consecutiveEmptyReads > maxConsecutiveEmptyReads * 10 { await log.warning( "Network read timed out after \(consecutiveEmptyReads) consecutive empty reads" ) throw StdioProxyError.networkTimeout } } // Receive data from the network connection with timeout let receiveTask = Task { try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Data, Swift.Error>) in connection.receive(minimumIncompleteLength: 1, maximumLength: bufferSize) { data, _, isComplete, error in if let error = error { continuation.resume(throwing: error) return } if let data = data { continuation.resume(returning: data) } else if isComplete { log.debug("Network connection complete") continuation.resume(throwing: StdioProxyError.connectionClosed) } else { continuation.resume(returning: Data()) } } } } // Add a timeout for the receive operation let timeoutTask = Task { try await Task.sleep(for: .seconds(5)) receiveTask.cancel() return Data() } // Wait for either the receive task or the timeout let data: Data do { data = try await receiveTask.value timeoutTask.cancel() } catch is CancellationError { // Receive was cancelled by timeout consecutiveEmptyReads += 10 // Accelerate the empty read counter try await Task.sleep(for: .milliseconds(10)) continue } var processedData = data // Check for and filter out heartbeat messages using MCP.NetworkTransport.Heartbeat // Assuming MCP module and NetworkTransport.Heartbeat are available if NetworkTransport.Heartbeat.isHeartbeat(processedData) { await log.debug( "Heartbeat signature detected in received network data using MCP definition." ) // Try to parse a full heartbeat. MCP.NetworkTransport.Heartbeat.from(data:) checks for minimum length internally. if let heartbeat = NetworkTransport.Heartbeat.from(data: processedData) { let heartbeatLength = heartbeat.rawValue.count // This should typically be 12 await log.debug( "Full MCP heartbeat message (\(heartbeatLength) bytes) received from network, skipping output." ) // Remove the full heartbeat from the data processedData = processedData.dropFirst(heartbeatLength) } else { // MCP.NetworkTransport.Heartbeat.isHeartbeat was true, but .from(data:) failed. // This means we have the magic bytes but not the full message (e.g., data length < 12 but >= 4). let expectedHeartbeatLength = MCP.NetworkTransport.Heartbeat().rawValue .count // Get expected length (12) await log.debug( "Partial MCP heartbeat message (<\(expectedHeartbeatLength) bytes) received, discarding this chunk to prevent garbled output." ) processedData = Data() // Discard the chunk } } if processedData.isEmpty { // No data available (or entire chunk was a heartbeat), yield to other tasks // If original data was not empty, but processedData is, it means it was a heartbeat. if !data.isEmpty { // Original data was not empty, so this was a heartbeat consecutiveEmptyReads = 0 // Reset counter as we did receive something (a heartbeat) } else { consecutiveEmptyReads += 1 } try await Task.sleep(for: .milliseconds(10)) continue } else { // Reset counter when we get actual data (not just a heartbeat) consecutiveEmptyReads = 0 await log.debug( "Received \(processedData.count) bytes of application data from network" ) } // Add data to buffer for message assembly networkToStdoutBuffer.append(processedData) // Process complete messages (delimited by newlines) while let newlineIndex = networkToStdoutBuffer.firstIndex(of: UInt8(ascii: "\n")) { let messageData = networkToStdoutBuffer[..<newlineIndex] var messageWithNewline = Data(messageData) messageWithNewline.append(UInt8(ascii: "\n")) // Remove processed message from buffer networkToStdoutBuffer = networkToStdoutBuffer[(newlineIndex + 1)...] // Write complete message to stdout var remainingDataToWrite = messageWithNewline while !remainingDataToWrite.isEmpty { let bytesWritten: Int = try remainingDataToWrite.withUnsafeBytes { buffer in try stdout.write(UnsafeRawBufferPointer(buffer)) } if bytesWritten < remainingDataToWrite.count { await log.debug( "Partial write: \(bytesWritten) of \(remainingDataToWrite.count) bytes" ) // Remove the bytes that were written remainingDataToWrite = remainingDataToWrite.dropFirst(bytesWritten) } else { // All bytes were written remainingDataToWrite.removeAll() } // If we still have data to write, give a small delay to allow the system to process if !remainingDataToWrite.isEmpty { try await Task.sleep(for: .milliseconds(1)) } } } } catch let error as NWError where error.errorCode == 96 { // Handle "No message available on STREAM" error await log.debug("Network read yielded no data, waiting...") consecutiveEmptyReads += 1 try await Task.sleep(for: .milliseconds(100)) } catch { // Check if the connection was cancelled or closed if let nwError = error as? NWError, nwError.errorCode == 57 // Socket is not connected || nwError.errorCode == 54 // Connection reset by peer { await log.debug("Connection closed by peer: \(error)") throw StdioProxyError.connectionClosed } if error is StdioProxyError { throw error } await log.error("Error in network handler: \(error)") throw error } } } } // Define custom errors for the StdioProxy enum StdioProxyError: Swift.Error { case networkTimeout case connectionClosed } // Create MCPService class to manage lifecycle actor MCPService: Service { let browser: NWBrowser private var currentProxy: StdioProxy? init() { self.browser = NWBrowser( for: .bonjour(type: serviceType, domain: nil), using: parameters ) } func run() async throws { while true { do { await log.info("Starting Bonjour service discovery...") // Find and connect to iMCP app with improved reliability let endpoint: NWEndpoint = try await withCheckedThrowingContinuation { continuation in let connectionState = ConnectionState() // Set up a timeout task to ensure we don't wait forever let timeoutTask = Task { // Allow 30 seconds to find the service try await Task.sleep(for: .seconds(30)) // If we haven't found a service by now, resume with an error if await connectionState.checkAndSetResumed() { await log.error("Bonjour service discovery timed out after 30 seconds") continuation.resume( throwing: MCPError.internalError("Service discovery timeout")) } } // Convert async handlers to sync handlers browser.stateUpdateHandler = { state in Task { switch state { case .failed(let error): await log.error("Browser failed: \(error)") if await connectionState.checkAndSetResumed() { timeoutTask.cancel() continuation.resume(throwing: error) } case .ready: await log.info("Browser is ready and searching for services") case .waiting(let error): await log.warning("Browser is waiting: \(error)") default: await log.debug("Browser state changed: \(state)") } } } browser.browseResultsChangedHandler = { results, changes in Task { await log.debug("Found \(results.count) Bonjour services") // Log all discovered services for debugging for (index, result) in results.enumerated() { await log.debug("Service \(index + 1): \(result.endpoint)") } // If we have results, select the most appropriate one if !results.isEmpty { // First, try to find a service with "iMCP" in the endpoint description let imcpServices = results.filter { String(describing: $0.endpoint).contains("iMCP") } let selectedService: NWBrowser.Result if !imcpServices.isEmpty { // Prefer services with iMCP in the description selectedService = imcpServices.first! await log.info( "Selected iMCP service: \(selectedService.endpoint)") } else { // Fall back to the first available service selectedService = results.first! await log.info( "No specific iMCP service found, using: \(selectedService.endpoint)" ) } if await connectionState.checkAndSetResumed() { timeoutTask.cancel() await log.info("Selected endpoint: \(selectedService.endpoint)") continuation.resume(returning: selectedService.endpoint) } } } } Task { await log.info("Starting Bonjour browser to discover MCP services...") } browser.start(queue: .main) } await log.info("Creating connection to endpoint...") // Create the proxy let proxy = StdioProxy( endpoint: endpoint, parameters: parameters, stdinBufferSize: 10 * 1024 * 1024, // 10MB for large responses networkBufferSize: 10 * 1024 * 1024 // 10MB for large responses ) self.currentProxy = proxy do { try await proxy.start() } catch let error as StdioProxyError { switch error { // Removed stdinTimeout case as it's no longer thrown // case .stdinTimeout: // await log.info("Stdin timed out, will reconnect...") // try await Task.sleep(for: .seconds(1)) // continue case .networkTimeout: await log.info("Network timed out, will reconnect...") try await Task.sleep(for: .seconds(1)) continue case .connectionClosed: await log.critical("Connection closed, terminating...") return } } catch let error as NWError where error.errorCode == 54 || error.errorCode == 57 { // Handle connection reset by peer (54) or socket not connected (57) await log.critical("Network connection terminated: \(error), shutting down...") return } catch { // Rethrow other errors to be handled by the outer catch block throw error } } catch { // Handle all other errors with retry await log.error("Connection error: \(error)") await log.info("Will retry connection in 5 seconds...") try await Task.sleep(for: .seconds(5)) } } } func shutdown() async throws { browser.cancel() if let proxy = currentProxy { await proxy.stop() } } } // Update the ServiceLifecycle initialization let lifecycle = ServiceGroup( configuration: .init( services: [MCPService()], logger: log ) ) try await lifecycle.run()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mattt/iMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server