Social Listening MCP Server

#!/usr/bin/env node
/**
 * Social Listening MCP server.
 *
 * Exposes MCP tools over stdio that pull mentions from a Syften JSON feed,
 * optionally score them through the Syften AI endpoint, persist them in a
 * local SQLite database and answer simple analytics queries on that data.
 */
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
  CallToolRequestSchema,
  ErrorCode,
  ListToolsRequestSchema,
  McpError,
} from '@modelcontextprotocol/sdk/types.js';
import axios from 'axios';
import Database from 'better-sqlite3';
import fs from 'fs';
import path from 'path';
import { fileURLToPath } from 'url';
import { processNaturalLanguage } from './nlp.js';

const __dirname = path.dirname(fileURLToPath(import.meta.url));

// NOTE(review): the fallback URL embeds what looks like a feed auth token.
// Prefer supplying SYFTEN_FEED_URL via the environment and rotating/removing
// the hard-coded value. `||` (not `??`) so an empty variable also falls back.
const FEED_URL =
  process.env.SYFTEN_FEED_URL ||
  'https://syften.com/rss/YXV0aF81NTcyNmJkOTM3MWU3ZDJkYzk4ZTA2MDNjOGFiZDE1NzpQAkKI_4kTQ_HV0SVaEubcBm_ib1fTx0oMAhUpr627pQ==/json';

const AI_ANALYZE_URL = 'https://api.syften.com/v1/ai/analyze';

/** Confidence threshold used when none has been configured. */
const DEFAULT_MIN_CONFIDENCE = 0.7;

/** Categories advertised in the tool input schemas. */
const AI_CATEGORIES: string[] = [
  'spam',
  'promotional',
  'support',
  'feedback',
  'inquiry',
  'bug_report',
  'feature_request',
  'complaint',
  'praise',
  'question',
];

const GROUP_BY_VALUES: string[] = ['day', 'week', 'month'];

interface SyftenResponse {
  items: Mention[];
}

interface Mention {
  url: string;
  title: string;
  summary: string;
  date_published: string;
  author?: {
    name: string;
  };
  ai_score?: number;
  ai_categories?: string[];
  webhook_event_id?: string;
  webhook_received_at?: string;
}

interface WebhookConfig {
  endpoint_url: string;
  secret_token: string;
  enabled: boolean;
  last_ping?: string;
}

interface AIFilterConfig {
  enabled: boolean;
  min_confidence: number;
  categories: string[];
  webhook_url?: string;
  webhook_secret?: string;
}

interface SyftenAIResponse {
  score: number;
  categories: string[];
}

interface BackfillMonthArgs {
  year: number;
  month: number;
}

interface AnalyzeTrendsArgs {
  start_date: string;
  end_date: string;
  group_by?: 'day' | 'week' | 'month';
  tags?: string[];
}

interface TopSourcesArgs {
  start_date: string;
  end_date: string;
  limit?: number;
}

interface SyncState {
  last_sync: string;
}

/** `ai_filter_config` row exactly as SQLite returns it (flags as integers, JSON as text). */
interface AIFilterConfigRow {
  enabled: number;
  min_confidence: number | null;
  categories: string | null;
  webhook_url: string | null;
  webhook_secret: string | null;
}

/** `webhook_config` row plus the computed event counter selected alongside it. */
type WebhookStatusRow = Pick<WebhookConfig, 'endpoint_url'> & {
  enabled: number;
  last_ping: string | null;
  total_events: number;
};

/** Columns of `mentions` read back by `get_ai_filtered_mentions`. */
interface MentionRow {
  url: string;
  title: string;
  summary: string | null;
  date_published: string;
  author: string | null;
  site: string | null;
  tags: string | null;
  ai_score: number | null;
  ai_categories: string | null;
}

// Type aliases (not interfaces) so the values stay assignable to the SDK's
// open-ended result objects.
type ToolTextContent = { type: 'text'; text: string };
type ToolResult = { content: ToolTextContent[]; isError?: boolean };

/** Builds a successful single-text tool result. */
function textResult(text: string): ToolResult {
  return { content: [{ type: 'text', text }] };
}

/** Builds a single-text tool result flagged as an error. */
function errorResult(text: string): ToolResult {
  return { content: [{ type: 'text', text }], isError: true };
}

/** Extracts a printable message from anything that was thrown. */
function errorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/** Appends a failure count to a summary message when anything failed. */
function withFailures(message: string, failed: number): string {
  return failed > 0 ? `${message} (${failed} failed)` : message;
}

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null;
}

function isStringArray(value: unknown): value is string[] {
  return Array.isArray(value) && value.every((item) => typeof item === 'string');
}

function isBackfillMonthArgs(args: unknown): args is BackfillMonthArgs {
  return isRecord(args) && typeof args.year === 'number' && typeof args.month === 'number';
}

function isAnalyzeTrendsArgs(args: unknown): args is AnalyzeTrendsArgs {
  if (!isRecord(args)) {
    return false;
  }
  const groupBy = args.group_by;
  const groupByOk =
    groupBy === undefined || (typeof groupBy === 'string' && GROUP_BY_VALUES.includes(groupBy));
  return (
    typeof args.start_date === 'string' &&
    typeof args.end_date === 'string' &&
    groupByOk &&
    (args.tags === undefined || isStringArray(args.tags))
  );
}

function isTopSourcesArgs(args: unknown): args is TopSourcesArgs {
  return (
    isRecord(args) &&
    typeof args.start_date === 'string' &&
    typeof args.end_date === 'string' &&
    (args.limit === undefined || typeof args.limit === 'number')
  );
}

/**
 * Normalises an optional string argument for storage.
 *
 * @returns the string, or `null` when the value is absent or empty.
 * @throws McpError (InvalidParams) when a non-string value is supplied.
 */
function optionalString(value: unknown, name: string): string | null {
  if (value == null || value === '') {
    return null;
  }
  if (typeof value !== 'string') {
    throw new McpError(ErrorCode.InvalidParams, `${name} must be a string`);
  }
  return value;
}

/** Parses a JSON-encoded string array, returning `[]` for null, malformed or mistyped input. */
function parseStringArray(raw: string | null | undefined): string[] {
  if (!raw) {
    return [];
  }
  try {
    const parsed: unknown = JSON.parse(raw);
    return isStringArray(parsed) ? parsed : [];
  } catch {
    return [];
  }
}

/**
 * Parses a timestamp read from `sync_state`.
 *
 * Older rows were written with SQLite's `datetime('now')`, which yields
 * `YYYY-MM-DD HH:MM:SS` in UTC without a zone marker; `new Date()` would read
 * that as local time, so it is rewritten to ISO-8601 UTC first.
 *
 * @returns the parsed date, or `undefined` when the value cannot be parsed.
 */
function parseStoredTimestamp(value: string): Date | undefined {
  const normalized = /^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$/.test(value)
    ? `${value.replace(' ', 'T')}Z`
    : value;
  const parsed = new Date(normalized);
  return Number.isNaN(parsed.getTime()) ? undefined : parsed;
}

/** Produces the human-readable reason shown when the webhook test ping fails. */
function describePingFailure(error: unknown): string {
  if (error && typeof error === 'object' && 'isAxiosError' in error) {
    const axiosError = error as {
      response?: { data: unknown };
      message?: string;
      code?: string;
    };
    return axiosError.response?.data
      ? JSON.stringify(axiosError.response.data)
      : axiosError.message || axiosError.code || 'Network error';
  }
  if (error instanceof Error) {
    return error.message;
  }
  return error ? String(error) : 'Unknown error';
}

class SocialListeningServer {
  private server: Server;
  private db: Database.Database;

  constructor() {
    this.server = new Server(
      {
        name: 'social-listening',
        version: '0.1.0',
      },
      {
        capabilities: {
          tools: {},
        },
      }
    );

    // Initialize database; create the data directory first so a fresh
    // checkout does not fail when opening the file.
    const dataDir = path.join(__dirname, '..', 'data');
    fs.mkdirSync(dataDir, { recursive: true });
    this.db = new Database(path.join(dataDir, 'mentions.db'));
    this.initializeDatabase();

    this.setupToolHandlers();

    // Error handling
    this.server.onerror = (error) => console.error('[MCP Error]', error);
    process.on('SIGINT', async () => {
      try {
        await this.server.close();
      } finally {
        this.db.close();
        process.exit(0);
      }
    });
  }

  /** Creates the tables and indexes used by the server if they do not exist yet. */
  private initializeDatabase() {
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS mentions (
        url TEXT PRIMARY KEY,
        title TEXT,
        summary TEXT,
        date_published TEXT,
        author TEXT,
        tags TEXT,
        site TEXT,
        created_at TEXT,
        ai_score REAL,
        ai_categories TEXT,
        webhook_event_id TEXT,
        webhook_received_at TEXT
      );
      CREATE INDEX IF NOT EXISTS idx_date_published ON mentions(date_published);
      CREATE INDEX IF NOT EXISTS idx_tags ON mentions(tags);
      CREATE INDEX IF NOT EXISTS idx_ai_score ON mentions(ai_score);
      CREATE INDEX IF NOT EXISTS idx_webhook_event ON mentions(webhook_event_id);

      CREATE TABLE IF NOT EXISTS sync_state (
        last_sync TEXT
      );

      CREATE TABLE IF NOT EXISTS webhook_config (
        endpoint_url TEXT PRIMARY KEY,
        secret_token TEXT,
        enabled INTEGER DEFAULT 1,
        last_ping TEXT
      );

      CREATE TABLE IF NOT EXISTS ai_filter_config (
        enabled INTEGER DEFAULT 1,
        min_confidence REAL DEFAULT 0.7,
        categories TEXT,
        webhook_url TEXT,
        webhook_secret TEXT,
        updated_at TEXT DEFAULT CURRENT_TIMESTAMP
      );
    `);
  }

  /**
   * Pulls the terms out of the first `matched: "..."` marker found in the
   * title (or, failing that, the summary). Empty tokens and tokens starting
   * with `NOT` are dropped.
   */
  private extractTags(title: string, summary: string): string[] {
    const marker = /matched: "(.*?)"/;
    const found = marker.exec(title) ?? (summary ? marker.exec(summary) : null);
    if (!found) {
      return [];
    }
    return found[1].split(' ').filter((tag) => !tag.startsWith('NOT') && tag);
  }

  /**
   * Derives a source label from a mention URL: `reddit/r/<sub>` (or `reddit`)
   * for Reddit links, otherwise the hostname. The second parameter is unused
   * and kept only for signature compatibility.
   */
  private extractSite(url: string, _author: string): string {
    try {
      if (url.includes('reddit.com')) {
        const match = /reddit\.com\/r\/([^\/]+)/.exec(url);
        return match ? `reddit/r/${match[1]}` : 'reddit';
      }
      return new URL(url).hostname;
    } catch (e) {
      console.error('Error extracting site from URL:', url, e);
      return url.split('/')[2] || 'unknown';
    }
  }

  /**
   * Downloads the feed and returns the valid mentions inside the optional
   * inclusive date window.
   *
   * @throws Error when the request fails or the payload has no `items` array.
   */
  private async fetchMentions(startDate?: Date, endDate?: Date): Promise<Mention[]> {
    try {
      console.error('Fetching mentions from:', FEED_URL);
      const response = await axios.get<SyftenResponse>(FEED_URL);

      if (!response.data || !Array.isArray(response.data.items)) {
        console.error('Invalid feed response:', response.data);
        throw new Error('Invalid feed format');
      }

      console.error('Total mentions in feed:', response.data.items.length);

      // Validate and filter mentions
      const mentions = response.data.items.filter((m) => {
        if (!m.date_published || !m.url || !m.title) {
          console.error('Invalid mention:', m);
          return false;
        }

        const mentionDate = new Date(m.date_published);
        if (isNaN(mentionDate.getTime())) {
          console.error('Invalid date:', m.date_published);
          return false;
        }

        if (startDate && mentionDate < startDate) {
          return false;
        }
        if (endDate && mentionDate > endDate) {
          return false;
        }
        return true;
      });

      console.error('Filtered mentions count:', mentions.length);
      if (startDate) console.error('Start date:', startDate.toISOString());
      if (endDate) console.error('End date:', endDate.toISOString());

      return mentions;
    } catch (error) {
      console.error('Error fetching mentions:', error);
      if (error && typeof error === 'object' && 'isAxiosError' in error) {
        const axiosError = error as { response?: { data: unknown } };
        console.error('Response:', axiosError.response?.data);
      }
      throw new Error(errorMessage(error));
    }
  }

  /** Reads the single AI filter configuration row, decoded into typed form. */
  private loadAIFilterConfig(): AIFilterConfig | undefined {
    const row = this.db
      .prepare(
        'SELECT enabled, min_confidence, categories, webhook_url, webhook_secret FROM ai_filter_config LIMIT 1'
      )
      .get() as AIFilterConfigRow | undefined;
    if (!row) {
      return undefined;
    }
    return {
      enabled: Boolean(row.enabled),
      // `??` so an explicitly configured threshold of 0 is honoured.
      min_confidence: row.min_confidence ?? DEFAULT_MIN_CONFIDENCE,
      categories: parseStringArray(row.categories),
      webhook_url: row.webhook_url ?? undefined,
      webhook_secret: row.webhook_secret ?? undefined,
    };
  }

  /** Best-effort notification of the AI filter webhook; failures are logged only. */
  private async notifyAIWebhook(
    mentionUrl: string,
    analysis: SyftenAIResponse,
    config: AIFilterConfig
  ): Promise<void> {
    if (!config.webhook_url) {
      return;
    }
    try {
      await axios.post(
        config.webhook_url,
        {
          mention_url: mentionUrl,
          ai_score: analysis.score,
          ai_categories: analysis.categories,
          timestamp: new Date().toISOString(),
        },
        {
          headers: {
            // Only send the secret header when a secret is configured.
            ...(config.webhook_secret ? { 'X-Webhook-Secret': config.webhook_secret } : {}),
            'Content-Type': 'application/json',
          },
        }
      );
    } catch (webhookError) {
      console.error('Webhook notification failed:', webhookError);
    }
  }

  /**
   * Scores one mention through the AI endpoint.
   *
   * @returns the score and categories when the score reaches the configured
   *   threshold; `undefined` when it does not or when the request fails.
   */
  private async analyzeMention(
    mention: Mention,
    site: string,
    config: AIFilterConfig,
    apiKey: string
  ): Promise<SyftenAIResponse | undefined> {
    try {
      const response = await axios.post<SyftenAIResponse>(
        AI_ANALYZE_URL,
        {
          text: `${mention.title}\n\n${mention.summary ?? ''}`,
          categories: config.categories,
          context: {
            url: mention.url,
            author: mention.author?.name,
            source: site,
          },
          min_confidence: config.min_confidence,
        },
        {
          headers: {
            Authorization: `Bearer ${apiKey}`,
            'Content-Type': 'application/json',
          },
        }
      );

      const { score, categories } = response.data;
      // Written as a negated `>=` so a missing or NaN score is rejected too.
      if (!(score >= config.min_confidence)) {
        return undefined;
      }

      const analysis: SyftenAIResponse = {
        score,
        categories: isStringArray(categories) ? categories : [],
      };
      // If webhook is configured, send notification
      await this.notifyAIWebhook(mention.url, analysis, config);
      return analysis;
    } catch (aiError) {
      console.error('AI analysis failed:', aiError);
      return undefined;
    }
  }

  /**
   * Upserts mentions into the database, running AI analysis first when it is
   * enabled. A failure on one mention is logged and does not stop the others.
   *
   * @returns how many mentions were stored and how many failed.
   */
  private async storeMentions(mentions: Mention[]): Promise<{ stored: number; failed: number }> {
    const insert = this.db.prepare(`
      INSERT OR REPLACE INTO mentions
      (url, title, summary, date_published, author, tags, site, created_at, ai_score, ai_categories)
      VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'), ?, ?)
    `);

    // The configuration is the same for every mention, so read it once.
    const aiConfig = this.loadAIFilterConfig();
    const apiKey = process.env.SYFTEN_API_KEY;
    if (aiConfig?.enabled && !apiKey) {
      console.error('AI filtering is enabled but SYFTEN_API_KEY is not set; skipping AI analysis');
    }

    const outcomes = await Promise.all(
      mentions.map(async (mention) => {
        try {
          const tags = this.extractTags(mention.title, mention.summary);
          const site = this.extractSite(mention.url, mention.author?.name || '');

          const analysis =
            aiConfig?.enabled && apiKey
              ? await this.analyzeMention(mention, site, aiConfig, apiKey)
              : undefined;

          insert.run(
            mention.url,
            mention.title,
            // The feed filter does not require a summary; store NULL if absent.
            mention.summary ?? null,
            mention.date_published,
            mention.author?.name || '',
            tags.join(','),
            site,
            analysis ? analysis.score : null,
            analysis && analysis.categories.length > 0 ? JSON.stringify(analysis.categories) : null
          );
          return true;
        } catch (error) {
          console.error('Error processing mention:', error);
          return false;
        }
      })
    );

    const stored = outcomes.filter(Boolean).length;
    return { stored, failed: outcomes.length - stored };
  }

  /** Registers the tool catalogue and the dispatcher that routes tool calls. */
  private setupToolHandlers() {
    this.server.setRequestHandler(ListToolsRequestSchema, async () => ({
      tools: [
        {
          name: 'configure_ai_filter',
          description: 'Configure AI filtering settings',
          inputSchema: {
            type: 'object',
            properties: {
              enabled: {
                type: 'boolean',
                description: 'Enable/disable AI filtering',
              },
              min_confidence: {
                type: 'number',
                description: 'Minimum confidence score (0-1)',
                minimum: 0,
                maximum: 1,
              },
              categories: {
                type: 'array',
                items: {
                  type: 'string',
                  enum: AI_CATEGORIES,
                },
                description: 'Categories to filter',
              },
              webhook_url: {
                type: 'string',
                description: 'Optional webhook URL for real-time AI filter notifications',
              },
              webhook_secret: {
                type: 'string',
                description: 'Secret token for webhook authentication',
              },
            },
            required: ['enabled'],
          },
        },
        {
          name: 'setup_webhook',
          description: 'Configure webhook endpoint for real-time updates',
          inputSchema: {
            type: 'object',
            properties: {
              endpoint_url: {
                type: 'string',
                description: 'HTTPS URL to receive webhook events',
              },
              secret_token: {
                type: 'string',
                description: 'Secret token for webhook authentication',
              },
              enabled: {
                type: 'boolean',
                description: 'Enable/disable webhook',
              },
            },
            required: ['endpoint_url', 'secret_token'],
          },
        },
        {
          name: 'get_webhook_status',
          description: 'Check webhook configuration and health',
          inputSchema: {
            type: 'object',
            properties: {},
          },
        },
        {
          name: 'backfill_month',
          description: 'Backfill mentions for a specific month',
          inputSchema: {
            type: 'object',
            properties: {
              year: { type: 'number' },
              month: { type: 'number' },
            },
            required: ['year', 'month'],
          },
        },
        {
          name: 'sync_latest',
          description: 'Sync new mentions since last update',
          inputSchema: {
            type: 'object',
            properties: {},
          },
        },
        {
          name: 'analyze_trends',
          description: 'Analyze mention trends over time',
          inputSchema: {
            type: 'object',
            properties: {
              start_date: { type: 'string' },
              end_date: { type: 'string' },
              group_by: { type: 'string', enum: ['day', 'week', 'month'] },
              tags: { type: 'array', items: { type: 'string' } },
            },
            required: ['start_date', 'end_date'],
          },
        },
        {
          name: 'get_top_sources',
          description: 'Get top mention sources/authors',
          inputSchema: {
            type: 'object',
            properties: {
              start_date: { type: 'string' },
              end_date: { type: 'string' },
              limit: { type: 'number' },
            },
            required: ['start_date', 'end_date'],
          },
        },
        {
          name: 'nlp_prompt',
          description:
            'Process a natural language prompt to interact with the social listening tools',
          inputSchema: {
            type: 'object',
            properties: {
              prompt: {
                type: 'string',
                description: 'Natural language prompt describing what you want to do',
              },
            },
            required: ['prompt'],
          },
        },
        {
          name: 'get_ai_filtered_mentions',
          description: 'Get mentions that have been processed by AI filtering',
          inputSchema: {
            type: 'object',
            properties: {
              start_date: { type: 'string' },
              end_date: { type: 'string' },
              min_confidence: {
                type: 'number',
                description: 'Minimum confidence score (0-1)',
                minimum: 0,
                maximum: 1,
              },
              categories: {
                type: 'array',
                items: {
                  type: 'string',
                  enum: AI_CATEGORIES,
                },
              },
              limit: {
                type: 'number',
                description: 'Maximum number of results',
              },
            },
            required: ['start_date', 'end_date'],
          },
        },
      ],
    }));

    this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
      const { name, arguments: args } = request.params;
      switch (name) {
        case 'configure_ai_filter':
          return this.configureAIFilter(args);
        case 'setup_webhook':
          return this.setupWebhook(args);
        case 'get_webhook_status':
          return this.getWebhookStatus();
        case 'backfill_month':
          return this.backfillMonth(args);
        case 'sync_latest':
          return this.syncLatest();
        case 'analyze_trends':
          return this.analyzeTrends(args);
        case 'get_top_sources':
          return this.getTopSources(args);
        case 'nlp_prompt':
          return this.runNlpPrompt(args);
        case 'get_ai_filtered_mentions':
          return this.getAIFilteredMentions(args);
        default:
          throw new McpError(ErrorCode.MethodNotFound, `Unknown tool: ${name}`);
      }
    });
  }

  /**
   * `configure_ai_filter`: replaces the stored AI filter configuration.
   *
   * @throws McpError (InvalidParams) when an argument has the wrong type or range.
   */
  private configureAIFilter(rawArgs: unknown): ToolResult {
    // A call without arguments must yield InvalidParams, not a TypeError.
    const args = isRecord(rawArgs) ? rawArgs : {};

    const enabled = args.enabled;
    if (typeof enabled !== 'boolean') {
      throw new McpError(ErrorCode.InvalidParams, 'enabled must be a boolean');
    }

    let minConfidence = DEFAULT_MIN_CONFIDENCE;
    const rawMinConfidence = args.min_confidence;
    if (rawMinConfidence !== undefined) {
      if (typeof rawMinConfidence !== 'number' || rawMinConfidence < 0 || rawMinConfidence > 1) {
        throw new McpError(ErrorCode.InvalidParams, 'min_confidence must be between 0 and 1');
      }
      // Assigned as-is so that 0 is kept rather than replaced by the default.
      minConfidence = rawMinConfidence;
    }

    let categories: string[] = [];
    const rawCategories = args.categories;
    if (rawCategories !== undefined) {
      if (!isStringArray(rawCategories)) {
        throw new McpError(ErrorCode.InvalidParams, 'categories must be an array of strings');
      }
      categories = rawCategories;
    }

    const webhookUrl = optionalString(args.webhook_url, 'webhook_url');
    const webhookSecret = optionalString(args.webhook_secret, 'webhook_secret');

    try {
      // Delete + insert in one transaction so a failed insert cannot leave
      // the table without any configuration.
      const replaceConfig = this.db.transaction(() => {
        this.db.prepare('DELETE FROM ai_filter_config').run();
        this.db
          .prepare(
            `
            INSERT INTO ai_filter_config
            (enabled, min_confidence, categories, webhook_url, webhook_secret)
            VALUES (?, ?, ?, ?, ?)
          `
          )
          .run(enabled ? 1 : 0, minConfidence, JSON.stringify(categories), webhookUrl, webhookSecret);
      });
      replaceConfig();

      return textResult('AI filter configuration updated successfully');
    } catch (error) {
      return errorResult(`Error updating AI filter config: ${errorMessage(error)}`);
    }
  }

  /**
   * `setup_webhook`: stores the webhook endpoint and sends it a test ping.
   *
   * @throws McpError (InvalidParams) for a non-HTTPS URL or a missing secret.
   */
  private async setupWebhook(rawArgs: unknown): Promise<ToolResult> {
    const args = isRecord(rawArgs) ? rawArgs : {};

    const endpointUrl = args.endpoint_url;
    if (typeof endpointUrl !== 'string' || !endpointUrl.startsWith('https://')) {
      throw new McpError(ErrorCode.InvalidParams, 'endpoint_url must be a valid HTTPS URL');
    }
    const secretToken = args.secret_token;
    if (typeof secretToken !== 'string' || !secretToken) {
      throw new McpError(ErrorCode.InvalidParams, 'secret_token is required');
    }

    try {
      this.db
        .prepare(
          `
          INSERT OR REPLACE INTO webhook_config
          (endpoint_url, secret_token, enabled, last_ping)
          VALUES (?, ?, ?, datetime('now'))
        `
        )
        .run(endpointUrl, secretToken, args.enabled === false ? 0 : 1);

      // Test the webhook
      try {
        await axios.post(
          endpointUrl,
          {
            type: 'ping',
            timestamp: new Date().toISOString(),
          },
          {
            headers: {
              'X-Webhook-Secret': secretToken,
            },
          }
        );
      } catch (error) {
        return errorResult(
          `Webhook configured but test ping failed: ${describePingFailure(error)}`
        );
      }

      return textResult('Webhook configured and tested successfully');
    } catch (error) {
      return errorResult(`Error configuring webhook: ${errorMessage(error)}`);
    }
  }

  /** `get_webhook_status`: reports the stored webhook and the webhook event count. */
  private getWebhookStatus(): ToolResult {
    try {
      const config = this.db
        .prepare(
          `
          SELECT
            endpoint_url,
            enabled,
            last_ping,
            (
              SELECT COUNT(*)
              FROM mentions
              WHERE webhook_event_id IS NOT NULL
            ) as total_events
          FROM webhook_config
          LIMIT 1
        `
        )
        .get() as WebhookStatusRow | undefined;

      if (!config) {
        return textResult('No webhook configured');
      }

      return textResult(
        JSON.stringify(
          {
            endpoint_url: config.endpoint_url,
            enabled: Boolean(config.enabled),
            last_ping: config.last_ping,
            total_events: config.total_events,
          },
          null,
          2
        )
      );
    } catch (error) {
      return errorResult(`Error getting webhook status: ${errorMessage(error)}`);
    }
  }

  /**
   * `backfill_month`: stores every feed mention published in the given month
   * (local time).
   *
   * @throws McpError (InvalidParams) for missing or out-of-range arguments.
   */
  private async backfillMonth(rawArgs: unknown): Promise<ToolResult> {
    if (
      !isBackfillMonthArgs(rawArgs) ||
      !Number.isInteger(rawArgs.year) ||
      !Number.isInteger(rawArgs.month) ||
      rawArgs.month < 1 ||
      rawArgs.month > 12
    ) {
      // Out-of-range months would silently roll over into another month.
      throw new McpError(ErrorCode.InvalidParams, 'Invalid arguments for backfill_month');
    }

    const { year, month } = rawArgs;
    const startDate = new Date(year, month - 1, 1);
    const endDate = new Date(year, month, 0, 23, 59, 59, 999); // End of last day

    try {
      const mentions = await this.fetchMentions(startDate, endDate);
      console.error(`Found ${mentions.length} mentions for ${year}-${month}`);

      const { stored, failed } = await this.storeMentions(mentions);
      return textResult(
        withFailures(`Successfully processed ${stored} mentions for ${year}-${month}`, failed)
      );
    } catch (error) {
      return errorResult(`Error processing mentions: ${errorMessage(error)}`);
    }
  }

  /** `sync_latest`: stores mentions published since the previous sync. */
  private async syncLatest(): Promise<ToolResult> {
    try {
      // Captured before fetching so mentions published while this run is in
      // progress are picked up by the next one.
      const syncStartedAt = new Date().toISOString();

      const lastSync = this.db.prepare('SELECT last_sync FROM sync_state LIMIT 1').get() as
        | SyncState
        | undefined;
      const startDate = lastSync ? parseStoredTimestamp(lastSync.last_sync) : undefined;

      const mentions = await this.fetchMentions(startDate);
      const { stored, failed } = await this.storeMentions(mentions);

      // Update last sync time. The value is bound as a parameter instead of
      // relying on a double-quoted SQL string, and written atomically.
      const saveSyncState = this.db.transaction((timestamp: string) => {
        this.db.prepare('DELETE FROM sync_state').run();
        this.db.prepare('INSERT INTO sync_state (last_sync) VALUES (?)').run(timestamp);
      });
      saveSyncState(syncStartedAt);

      return textResult(withFailures(`Successfully synced ${stored} new mentions`, failed));
    } catch (error) {
      return errorResult(`Error syncing mentions: ${errorMessage(error)}`);
    }
  }

  /**
   * `analyze_trends`: counts mentions per day, week or month, optionally
   * restricted to mentions whose tags contain any of the given strings.
   *
   * @throws McpError (InvalidParams) when the arguments are malformed.
   */
  private analyzeTrends(rawArgs: unknown): ToolResult {
    if (!isAnalyzeTrendsArgs(rawArgs)) {
      throw new McpError(ErrorCode.InvalidParams, 'Invalid arguments for analyze_trends');
    }

    const { start_date, end_date, group_by = 'day', tags = [] } = rawArgs;

    let query = `
      SELECT
        CASE
          WHEN ? = 'week' THEN strftime('%Y-%W', date_published)
          WHEN ? = 'month' THEN strftime('%Y-%m', date_published)
          ELSE date(date_published)
        END as period,
        COUNT(*) as count
      FROM mentions
      WHERE date_published BETWEEN ? AND ?
    `;
    const params: string[] = [group_by, group_by, start_date, end_date];

    if (tags.length > 0) {
      query += ` AND (${tags.map(() => 'tags LIKE ?').join(' OR ')})`;
      params.push(...tags.map((tag) => `%${tag}%`));
    }

    query += ` GROUP BY period ORDER BY period`;

    try {
      const results = this.db.prepare(query).all(...params);
      return textResult(JSON.stringify(results, null, 2));
    } catch (error) {
      return errorResult(`Error analyzing trends: ${errorMessage(error)}`);
    }
  }

  /**
   * `get_top_sources`: lists the sites with the most mentions in a date
   * range, with the distinct tags and authors seen for each.
   *
   * @throws McpError (InvalidParams) when the arguments are malformed.
   */
  private getTopSources(rawArgs: unknown): ToolResult {
    if (!isTopSourcesArgs(rawArgs)) {
      throw new McpError(ErrorCode.InvalidParams, 'Invalid arguments for get_top_sources');
    }

    const { start_date, end_date, limit = 10 } = rawArgs;

    try {
      interface SourceResult {
        site: string;
        count: number;
        all_tags: string | null;
        authors: string | null;
      }

      const results = this.db
        .prepare(
          `
          SELECT
            site,
            COUNT(*) as count,
            GROUP_CONCAT(DISTINCT tags) as all_tags,
            GROUP_CONCAT(DISTINCT author) as authors
          FROM mentions
          WHERE date_published BETWEEN ? AND ?
          GROUP BY site
          HAVING count > 0
          ORDER BY count DESC
          LIMIT ?
        `
        )
        .all(start_date, end_date, limit) as SourceResult[];

      // Post-process to split concatenated fields
      const processed = results.map((row) => ({
        site: row.site,
        count: row.count,
        all_tags: row.all_tags ? Array.from(new Set(row.all_tags.split(','))) : [],
        authors: row.authors ? Array.from(new Set(row.authors.split(','))) : [],
      }));

      return textResult(JSON.stringify(processed, null, 2));
    } catch (error) {
      return errorResult(`Error getting top sources: ${errorMessage(error)}`);
    }
  }

  /**
   * `nlp_prompt`: turns a free-text prompt into tool calls and reports the
   * outcome of each.
   *
   * @throws McpError (InvalidParams) when `prompt` is missing or not a string.
   */
  private async runNlpPrompt(rawArgs: unknown): Promise<ToolResult> {
    const prompt = isRecord(rawArgs) ? rawArgs.prompt : undefined;
    if (!prompt || typeof prompt !== 'string') {
      throw new McpError(ErrorCode.InvalidParams, 'prompt must be a string');
    }

    try {
      // Process the natural language prompt
      const toolCalls = processNaturalLanguage(prompt);

      // Execute each tool call
      const results = await Promise.all(
        toolCalls.map(async (call) => {
          try {
            const result = await this.executeToolCall(call.name, call.arguments);
            return {
              tool: call.name,
              success: true,
              result: result,
            };
          } catch (error) {
            return {
              tool: call.name,
              success: false,
              error: errorMessage(error),
            };
          }
        })
      );

      return textResult(JSON.stringify(results, null, 2));
    } catch (error) {
      return errorResult(`Error processing natural language prompt: ${errorMessage(error)}`);
    }
  }

  /**
   * `get_ai_filtered_mentions`: returns stored mentions that carry an AI
   * score, optionally filtered by minimum score and categories.
   *
   * @throws McpError (InvalidParams) when a date bound is missing.
   */
  private getAIFilteredMentions(rawArgs: unknown): ToolResult {
    const args = isRecord(rawArgs) ? rawArgs : {};
    if (!args.start_date || !args.end_date) {
      throw new McpError(ErrorCode.InvalidParams, 'start_date and end_date are required');
    }

    try {
      let query = `
        SELECT
          m.*,
          json_extract(m.ai_categories, '$') as categories_array
        FROM mentions m
        WHERE date_published BETWEEN ? AND ?
        AND ai_score IS NOT NULL
      `;
      const params: unknown[] = [args.start_date, args.end_date];

      if (args.min_confidence !== undefined) {
        query += ` AND ai_score >= ?`;
        params.push(args.min_confidence);
      }

      // An empty list means "no category filter"; emitting `AND ()` for it
      // would be a SQL syntax error.
      const categories = args.categories;
      if (Array.isArray(categories) && categories.length > 0) {
        const categoryConditions = categories
          .map(() => `json_extract(ai_categories, '$') LIKE ?`)
          .join(' OR ');
        query += ` AND (${categoryConditions})`;
        for (const category of categories) {
          params.push(`%${category}%`);
        }
      }

      query += ` ORDER BY date_published DESC`;

      if (args.limit) {
        query += ` LIMIT ?`;
        params.push(args.limit);
      }

      const results = this.db.prepare(query).all(...params) as MentionRow[];

      // Process results to parse JSON fields
      const processed = results.map((row) => ({
        url: row.url,
        title: row.title,
        summary: row.summary,
        date_published: row.date_published,
        author: row.author,
        site: row.site,
        ai_score: row.ai_score,
        ai_categories: parseStringArray(row.ai_categories),
        tags: row.tags ? row.tags.split(',') : [],
      }));

      return textResult(JSON.stringify(processed, null, 2));
    } catch (error) {
      return errorResult(`Error getting AI filtered mentions: ${errorMessage(error)}`);
    }
  }

  /**
   * Placeholder used by `nlp_prompt`.
   *
   * NOTE(review): this does not run the named tool; it only echoes the call
   * back as a mock success response.
   */
  private async executeToolCall(name: string, args: Record<string, any>) {
    // Just return a mock response since we're not actually executing the tools here
    return textResult(
      JSON.stringify({
        success: true,
        message: `${name} executed successfully`,
        args,
      })
    );
  }

  /** Connects the server to stdio and starts serving requests. */
  async run() {
    const transport = new StdioServerTransport();
    await this.server.connect(transport);
    console.error('Social Listening MCP server running on stdio');
  }
}

const server = new SocialListeningServer();
server.run().catch(console.error);