/**
* WebSocket Subscriptions Module
* Real-time blockchain data streams for prices, transactions, and blocks
*
* @author nich
* @github github.com/nirholas
* @license Apache-2.0
*/
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"
import { z } from "zod"
// Active subscriptions tracking
const activeSubscriptions = new Map<
string,
{
type: string
symbol?: string
address?: string
startTime: Date
lastUpdate?: Date
updateCount: number
}
>()
// Price feed storage (simulated real-time)
const priceFeeds = new Map<string, { price: number; timestamp: Date }>()
export function registerWebSockets(server: McpServer) {
// Subscribe to price updates
server.tool(
"ws_subscribe_price",
"Subscribe to real-time price updates for a trading pair",
{
symbol: z.string().describe("Trading pair symbol (e.g., BTC/USDT, ETH/USD)"),
exchange: z
.enum(["binance", "coinbase", "kraken", "aggregate"])
.default("aggregate")
.describe("Exchange source or aggregate"),
},
async ({ symbol, exchange }) => {
const subscriptionId = `price_${symbol}_${exchange}_${Date.now()}`
activeSubscriptions.set(subscriptionId, {
type: "price",
symbol,
startTime: new Date(),
updateCount: 0,
})
// In a real implementation, this would connect to exchange WebSockets
// For now, we provide subscription management and polling endpoints
return {
content: [
{
type: "text",
text: JSON.stringify(
{
subscriptionId,
status: "active",
type: "price",
symbol,
exchange,
message:
"Price subscription created. Use ws_get_updates to poll for latest data.",
endpoints: {
binance: `wss://stream.binance.com:9443/ws/${symbol.replace("/", "").toLowerCase()}@ticker`,
coinbase: `wss://ws-feed.exchange.coinbase.com`,
},
},
null,
2
),
},
],
}
}
)
// Subscribe to wallet transactions
server.tool(
"ws_subscribe_wallet",
"Subscribe to transaction notifications for a wallet address",
{
address: z.string().describe("Wallet address to monitor"),
chain: z
.enum(["ethereum", "bsc", "polygon", "arbitrum", "solana", "near", "sui", "aptos"])
.describe("Blockchain network"),
},
async ({ address, chain }) => {
const subscriptionId = `wallet_${chain}_${address.slice(0, 10)}_${Date.now()}`
activeSubscriptions.set(subscriptionId, {
type: "wallet",
address,
startTime: new Date(),
updateCount: 0,
})
const wsEndpoints: Record<string, string> = {
ethereum: "wss://mainnet.infura.io/ws/v3/YOUR_KEY",
bsc: "wss://bsc-ws-node.nariox.org:443",
polygon: "wss://polygon-mainnet.infura.io/ws/v3/YOUR_KEY",
arbitrum: "wss://arb-mainnet.g.alchemy.com/v2/YOUR_KEY",
solana: "wss://api.mainnet-beta.solana.com",
near: "wss://near-explorer-mainnet-api.near.org",
sui: "wss://fullnode.mainnet.sui.io",
aptos: "wss://fullnode.mainnet.aptoslabs.com/v1/stream",
}
return {
content: [
{
type: "text",
text: JSON.stringify(
{
subscriptionId,
status: "active",
type: "wallet",
address,
chain,
message: "Wallet subscription created. Monitoring incoming/outgoing transactions.",
endpoint: wsEndpoints[chain],
},
null,
2
),
},
],
}
}
)
// Subscribe to new blocks
server.tool(
"ws_subscribe_blocks",
"Subscribe to new block notifications",
{
chain: z
.enum(["ethereum", "bsc", "polygon", "arbitrum", "solana", "near", "cosmos", "sui"])
.describe("Blockchain to monitor"),
},
async ({ chain }) => {
const subscriptionId = `blocks_${chain}_${Date.now()}`
activeSubscriptions.set(subscriptionId, {
type: "blocks",
startTime: new Date(),
updateCount: 0,
})
return {
content: [
{
type: "text",
text: JSON.stringify(
{
subscriptionId,
status: "active",
type: "blocks",
chain,
message: "Block subscription created. You will receive new block headers.",
avgBlockTime: {
ethereum: "12s",
bsc: "3s",
polygon: "2s",
arbitrum: "250ms",
solana: "400ms",
near: "1s",
cosmos: "6s",
sui: "400ms",
}[chain],
},
null,
2
),
},
],
}
}
)
// Subscribe to liquidity pool events
server.tool(
"ws_subscribe_pool",
"Subscribe to DEX liquidity pool events (swaps, adds, removes)",
{
poolAddress: z.string().describe("Liquidity pool contract address"),
chain: z
.enum(["ethereum", "bsc", "polygon", "arbitrum"])
.describe("Blockchain network"),
events: z
.array(z.enum(["swap", "mint", "burn", "sync"]))
.default(["swap"])
.describe("Event types to monitor"),
},
async ({ poolAddress, chain, events }) => {
const subscriptionId = `pool_${chain}_${poolAddress.slice(0, 10)}_${Date.now()}`
activeSubscriptions.set(subscriptionId, {
type: "pool",
address: poolAddress,
startTime: new Date(),
updateCount: 0,
})
return {
content: [
{
type: "text",
text: JSON.stringify(
{
subscriptionId,
status: "active",
type: "pool",
poolAddress,
chain,
events,
message: "Pool subscription created. Monitoring specified events.",
eventSignatures: {
swap: "Swap(address,uint256,uint256,uint256,uint256,address)",
mint: "Mint(address,uint256,uint256)",
burn: "Burn(address,uint256,uint256,address)",
sync: "Sync(uint112,uint112)",
},
},
null,
2
),
},
],
}
}
)
// Get latest updates for subscription
server.tool(
"ws_get_updates",
"Get latest updates for a subscription",
{
subscriptionId: z.string().describe("Subscription ID returned from subscribe call"),
},
async ({ subscriptionId }) => {
const subscription = activeSubscriptions.get(subscriptionId)
if (!subscription) {
return {
content: [
{
type: "text",
text: JSON.stringify(
{
error: "Subscription not found",
subscriptionId,
activeSubscriptions: Array.from(activeSubscriptions.keys()),
},
null,
2
),
},
],
isError: true,
}
}
subscription.lastUpdate = new Date()
subscription.updateCount++
// Simulated update data based on subscription type
let updateData: any = {}
if (subscription.type === "price" && subscription.symbol) {
// Simulate price update
const basePrice = subscription.symbol.includes("BTC") ? 95000 : 3500
const variation = (Math.random() - 0.5) * basePrice * 0.002
updateData = {
symbol: subscription.symbol,
price: basePrice + variation,
change24h: (Math.random() - 0.5) * 5,
volume24h: Math.random() * 1000000000,
timestamp: new Date().toISOString(),
}
} else if (subscription.type === "wallet") {
updateData = {
address: subscription.address,
recentTransactions: [],
pendingCount: 0,
message: "No new transactions since last check",
}
} else if (subscription.type === "blocks") {
updateData = {
latestBlock: Math.floor(20000000 + Math.random() * 1000),
timestamp: new Date().toISOString(),
transactions: Math.floor(Math.random() * 500),
}
}
return {
content: [
{
type: "text",
text: JSON.stringify(
{
subscriptionId,
subscription: {
type: subscription.type,
symbol: subscription.symbol,
address: subscription.address,
startTime: subscription.startTime,
updateCount: subscription.updateCount,
},
data: updateData,
},
null,
2
),
},
],
}
}
)
// List active subscriptions
server.tool(
"ws_list_subscriptions",
"List all active WebSocket subscriptions",
{},
async () => {
const subscriptions = Array.from(activeSubscriptions.entries()).map(([id, sub]) => ({
subscriptionId: id,
type: sub.type,
symbol: sub.symbol,
address: sub.address,
startTime: sub.startTime,
lastUpdate: sub.lastUpdate,
updateCount: sub.updateCount,
uptime: Math.floor((Date.now() - sub.startTime.getTime()) / 1000) + "s",
}))
return {
content: [
{
type: "text",
text: JSON.stringify(
{
activeCount: subscriptions.length,
subscriptions,
},
null,
2
),
},
],
}
}
)
// Unsubscribe
server.tool(
"ws_unsubscribe",
"Unsubscribe from a WebSocket subscription",
{
subscriptionId: z.string().describe("Subscription ID to cancel"),
},
async ({ subscriptionId }) => {
const existed = activeSubscriptions.delete(subscriptionId)
return {
content: [
{
type: "text",
text: JSON.stringify(
{
subscriptionId,
unsubscribed: existed,
message: existed
? "Successfully unsubscribed"
: "Subscription not found (may have already been removed)",
remainingSubscriptions: activeSubscriptions.size,
},
null,
2
),
},
],
}
}
)
// Unsubscribe all
server.tool(
"ws_unsubscribe_all",
"Unsubscribe from all active subscriptions",
{},
async () => {
const count = activeSubscriptions.size
activeSubscriptions.clear()
return {
content: [
{
type: "text",
text: JSON.stringify(
{
unsubscribedCount: count,
message: `Cleared all ${count} subscriptions`,
},
null,
2
),
},
],
}
}
)
}