// index.ts (11.1 kB)
import axios from 'axios';
import { parseStringPromise } from 'xml2js';
import fs from 'fs';
import path from 'path';
import os from 'os';
import { FeedItem, ParsedFeed, DbFeed, DbFeedItem, ApiResponse } from 'shared-types';
import db from 'data-access';
// Resolve where the database lives (the DB_DIR environment variable wins when
// set to a non-empty value, otherwise ~/.mcp-rss-crawler) and create that
// directory, including any missing parents, if nothing exists at that path yet.
const DB_DIR = process.env.DB_DIR
  ? process.env.DB_DIR
  : path.join(os.homedir(), '.mcp-rss-crawler');
if (fs.existsSync(DB_DIR) === false) {
  fs.mkdirSync(DB_DIR, { recursive: true });
}
/**
 * HTTP timeout for feed downloads, in milliseconds. axios waits forever by
 * default, so without a bound a single unresponsive host can hang the caller.
 */
const RSS_FETCH_TIMEOUT_MS = 30000;

/**
 * Return the text content of an xml2js node parsed with
 * `{ explicitArray: false, mergeAttrs: true }`.
 *
 * Under those options an element that carries attributes, such as
 * `<guid isPermaLink="false">abc</guid>` or `<title type="html">…</title>`,
 * becomes an object whose text sits under the `_` key instead of a plain
 * string. Repeated elements become arrays, and the first element is used.
 * A node with no text content yields ''.
 */
function rssNodeText(node: unknown): string {
  if (typeof node === 'string') {
    return node;
  }
  if (typeof node === 'number') {
    return String(node);
  }
  if (Array.isArray(node)) {
    return node.length > 0 ? rssNodeText(node[0]) : '';
  }
  if (node !== null && typeof node === 'object') {
    const text = (node as { _?: unknown })._;
    return typeof text === 'string' ? text : '';
  }
  return '';
}

/**
 * Wrap a possibly-repeated xml2js value in an array. Falsy values give [].
 * The values are untyped xml2js output, hence `any`.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
function rssAsArray(value: any): any[] {
  if (!value) {
    return [];
  }
  return Array.isArray(value) ? value : [value];
}

/**
 * Resolve a `<link>` node to a URL string.
 *
 * RSS uses a text node (`<link>https://…</link>`). Atom uses one or more
 * `<link href="…" rel="…"/>` elements. A link with no `rel` or with
 * `rel="alternate"` is preferred over others such as `rel="self"`.
 */
function rssLinkHref(node: unknown): string {
  const links = rssAsArray(node);
  const preferred =
    links.find((l) => typeof l === 'string' || !l?.rel || l.rel === 'alternate') ?? links[0];
  if (preferred !== null && typeof preferred === 'object' && typeof preferred.href === 'string') {
    return preferred.href;
  }
  return rssNodeText(preferred);
}

/**
 * Parse a date-like xml2js node to whole Unix seconds.
 *
 * Returns undefined when the node is missing or not a parseable date. That
 * lets callers fall through to the next candidate instead of storing NaN.
 */
function rssTimestamp(node: unknown): number | undefined {
  const text = rssNodeText(node).trim();
  if (text === '') {
    return undefined;
  }
  const ms = new Date(text).getTime();
  return Number.isNaN(ms) ? undefined : Math.floor(ms / 1000);
}

/**
 * Return an author display name. Handles Atom `<author><name>…</name></author>`
 * as well as a plain text node (RSS `<author>` / `<dc:creator>`). When there
 * are several authors, the first one is used.
 */
function rssAuthorName(node: unknown): string {
  const first = rssAsArray(node)[0];
  if (first !== null && typeof first === 'object' && 'name' in first) {
    return rssNodeText(first.name);
  }
  return rssNodeText(first);
}

/**
 * Parse an RSS feed and convert it to the standard format.
 *
 * Fetches `feedUrl` (bounded by RSS_FETCH_TIMEOUT_MS), parses it with
 * xml2js, and recognises three layouts:
 * - RSS 2.0 (`rss > channel > item`)
 * - Atom (`feed > entry`)
 * - RSS 1.0 / RDF (`rdf:RDF > item`)
 *
 * This function does not throw. A fetch or parse failure is logged and
 * resolves to `{ title: 'Error', items: [] }`. An unrecognised document, or
 * one without items, resolves to `{ title: 'Unknown Feed', items: [] }`.
 *
 * @param feedUrl URL of the feed to fetch.
 */
export async function parseRssFeed(feedUrl: string): Promise<ParsedFeed> {
  try {
    // Fetch the RSS feed
    const response = await axios.get(feedUrl, {
      headers: {
        'User-Agent': 'Mozilla/5.0 (compatible; RSSManager/1.0)'
      },
      timeout: RSS_FETCH_TIMEOUT_MS
    });
    // Parse the XML
    const result = await parseStringPromise(response.data, {
      explicitArray: false,
      mergeAttrs: true
    });

    // Locate the channel-level node and the list of entries for each format.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    let channel: any;
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    let items: any[] = [];
    let descriptionKey = 'description';
    if (result?.rss?.channel) {
      // RSS 2.0
      channel = result.rss.channel;
      items = rssAsArray(channel.item);
    } else if (result?.feed) {
      // Atom: entries live directly under <feed>; the description is <subtitle>
      channel = result.feed;
      items = rssAsArray(channel.entry);
      descriptionKey = 'subtitle';
    } else if (result?.rdf?.channel) {
      // Root key literally named "rdf".
      // NOTE(review): xml2js keeps namespace prefixes, so RDF feeds normally
      // hit the 'rdf:RDF' branch below. Kept for compatibility.
      channel = result.rdf.channel;
      items = rssAsArray(result.rdf.item);
    } else if (result?.['rdf:RDF']) {
      // RSS 1.0 / RDF: items are siblings of <channel>, not children
      const rdf = result['rdf:RDF'];
      channel = rdf.channel || rdf['channel:channel'] || {};
      items = rssAsArray(rdf.item);
    }

    if (!channel || items.length === 0) {
      console.error('Unsupported RSS format or no items found:', Object.keys(result ?? {}));
      return { title: 'Unknown Feed', items: [] };
    }

    const feedTitle = rssNodeText(channel.title);
    const feedDescription = rssNodeText(channel[descriptionKey]);
    const feedLink = rssLinkHref(channel.link);

    // Loop-invariant values, computed once per feed.
    // NOTE(review): truncating the base64 to 20 chars means URLs sharing their
    // first 15 bytes (e.g. the same host) get the same streamId. Left as-is in
    // case other code depends on the current format.
    const streamId = `feed/${Buffer.from(feedUrl).toString('base64').substring(0, 20)}`;
    const nowSeconds = Math.floor(Date.now() / 1000);

    // Convert items to standard format
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const standardItems = items.map((item: any) => {
      const title = rssNodeText(item.title);

      // The first parseable date wins. Unparseable values are skipped rather
      // than becoming NaN; if none parse, fall back to "now".
      const published =
        rssTimestamp(item.pubDate) ??
        rssTimestamp(item.published) ??
        rssTimestamp(item.updated) ??
        rssTimestamp(item['dc:date']) ??
        nowSeconds;
      const updated = rssTimestamp(item.updated) ?? published;

      // The first non-empty text among the usual summary/content elements.
      const summary =
        [item.description, item.summary, item.content, item['content:encoded']]
          .map((node) => rssNodeText(node))
          .find((text) => text !== '') ?? '';

      const author = rssAuthorName(item.author) || rssAuthorName(item['dc:creator']);
      const link = rssLinkHref(item.link);

      // RSS categories are text nodes (possibly with a `domain` attribute);
      // Atom categories carry the value in a `term` attribute.
      const categories: string[] = rssAsArray(item.category)
        .map((cat): string =>
          rssNodeText(cat) || (typeof cat?.term === 'string' ? cat.term : '')
        )
        .filter(Boolean);

      return {
        // <guid isPermaLink="…"> parses to an object, so extract its text
        id: rssNodeText(item.guid) || rssNodeText(item.id) || `${feedUrl}/${title}`,
        title,
        published,
        updated,
        summary: {
          direction: 'ltr',
          content: summary
        },
        author,
        categories,
        origin: {
          streamId,
          title: feedTitle,
          htmlUrl: feedLink
        },
        alternate: [{
          href: link,
          type: 'text/html'
        }],
        link
      };
    });
    return {
      title: feedTitle,
      description: feedDescription,
      link: feedLink,
      items: standardItems
    };
  } catch (error) {
    console.error('Error parsing RSS feed:', error);
    return { title: 'Error', items: [] };
  }
}
/**
 * Fetch feeds from all configured RSS sources and save to the database.
 *
 * Feeds are processed one at a time, in the order returned by
 * `db.feeds.getAllFeeds`. Feeds that yield no items are skipped and not
 * counted. For each remaining feed, up to `itemsPerFeed` items, their
 * categories and the feed's `last_updated` stamp are written inside one
 * `writeDb.transaction(...)` call. An error for one feed is logged and the
 * loop continues with the next feed.
 * NOTE(review): this presumes the data-access handle rolls back the whole
 * transaction when the callback throws (better-sqlite3 semantics). Confirm.
 *
 * @param itemsPerFeed Number of items to fetch per feed (default: 20)
 * @param dbOptions Database connection options
 * @returns `success: false` with zero counts when no feeds are configured or
 *   an unexpected error escapes the loop. Otherwise `success: true` with the
 *   number of feeds written and the total number of items stored.
 */
export async function fetchAllFeeds(
  itemsPerFeed: number = 20, 
  dbOptions = {}
): Promise<ApiResponse<{
  feedsProcessed: number;
  totalItems: number;
}>> {
  // Normalise an item's `published` value to whole Unix seconds.
  // - Strings are parsed as dates.
  // - Missing, zero, non-finite or unparseable values fall back to "now".
  // This keeps fractional seconds and NaN out of the `published` column.
  const toUnixSeconds = (value: unknown): number => {
    const nowSeconds = Math.floor(Date.now() / 1000);
    if (typeof value === 'string') {
      const ms = new Date(value).getTime();
      return Number.isNaN(ms) ? nowSeconds : Math.floor(ms / 1000);
    }
    if (typeof value === 'number' && Number.isFinite(value) && value !== 0) {
      return Math.floor(value);
    }
    return nowSeconds;
  };

  try {
    console.log(`Fetching feeds with ${itemsPerFeed} items per feed...`);
    
    // Get all feeds from the database
    const feeds = db.feeds.getAllFeeds(dbOptions);
    
    if (!feeds || feeds.length === 0) {
      console.log('No feeds found in the database');
      return { 
        success: false, 
        error: 'No feeds found in the database',
        data: { feedsProcessed: 0, totalItems: 0 }
      };
    }
    
    console.log(`Found ${feeds.length} feeds to process`);
    
    let totalItems = 0;
    let processedFeeds = 0;
    
    // Process each feed
    for (const feed of feeds) {
      try {
        console.log(`Processing feed: ${feed.name} (${feed.url})`);
        
        // Parse the RSS feed
        const parsedFeed = await parseRssFeed(feed.url);
        
        if (!parsedFeed || !parsedFeed.items || parsedFeed.items.length === 0) {
          console.log(`No items found in feed: ${feed.url}`);
          continue;
        }
        
        // Limit items to the specified number
        const limitedItems = parsedFeed.items.slice(0, itemsPerFeed);
        console.log(`Fetched ${limitedItems.length} items from feed: ${feed.url}`);
        
        const feedId = feed.id;
        const writeDb = db.getWriteDb(dbOptions);

        // Compile each statement once per feed rather than once per row.
        const insertItem = writeDb.prepare(`
          INSERT OR REPLACE INTO items (id, feed_id, title, link, summary, content, published, author)
          VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        `);
        const insertCategory = writeDb.prepare(`
          INSERT OR REPLACE INTO categories (item_id, category)
          VALUES (?, ?)
        `);
        const touchFeed = writeDb.prepare(`
          UPDATE feeds SET last_updated = ? WHERE id = ?
        `);

        // Write all items of this feed in a single transaction
        writeDb.transaction(() => {
          for (const item of limitedItems) {
            const publishedTime = toUnixSeconds(item.published);
            
            // Summary/content may be plain strings or { content } wrappers
            const summary = typeof item.summary === 'string' 
              ? item.summary 
              : item.summary?.content || '';
            
            const content = typeof item.content === 'string'
              ? item.content
              : item.content?.content || '';
            
            const link = typeof item.link === 'string'
              ? item.link
              : item.alternate?.[0]?.href || '';
            
            insertItem.run(
              item.id,
              feedId,
              item.title || 'Untitled',
              link,
              summary,
              content,
              publishedTime,
              item.author || ''
            );
            
            if (Array.isArray(item.categories)) {
              for (const category of item.categories) {
                if (category) {
                  insertCategory.run(item.id, category);
                }
              }
            }
          }
          
          // Note: Date.now() is milliseconds, unlike `published` (seconds).
          // addFeed also stores last_updated in milliseconds, so this is kept.
          touchFeed.run(Date.now(), feedId);
        })();
        
        totalItems += limitedItems.length;
        processedFeeds++;
      } catch (error) {
        console.error(`Error processing feed ${feed.url}:`, error);
      }
    }
    
    console.log(`Processed ${processedFeeds} feeds with a total of ${totalItems} items`);
    
    return {
      success: true,
      data: {
        feedsProcessed: processedFeeds,
        totalItems
      }
    };
  } catch (error) {
    console.error('Error fetching feeds:', error);
    return {
      success: false,
      error: error instanceof Error ? error.message : String(error),
      data: {
        feedsProcessed: 0,
        totalItems: 0
      }
    };
  }
}
/**
 * Add a new feed to the database.
 *
 * The feed is fetched and parsed first so that its title can be used as the
 * stored name. The URL is rejected when parsing fails.
 *
 * @param feedUrl URL of the feed to subscribe to.
 * @param category Optional category; an empty or missing value is stored as null.
 * @param dbOptions Database connection options passed through to `db.feeds.addFeed`.
 * @returns The stored feed (including the id returned by `db.feeds.addFeed`)
 *   on success, or `success: false` with an error message.
 */
export async function addFeed(
  feedUrl: string, 
  category?: string, 
  dbOptions = {}
): Promise<ApiResponse<DbFeed>> {
  try {
    // Parse the feed to get its title
    const parsedFeed = await parseRssFeed(feedUrl);

    // parseRssFeed (in this file) catches its own errors and resolves to the
    // sentinel { title: 'Error', items: [] }. That title is truthy, so it must
    // be rejected explicitly; otherwise a broken URL is stored as a feed
    // named "Error".
    const hasNoItems = !parsedFeed?.items || parsedFeed.items.length === 0;
    const parseFailed =
      !parsedFeed ||
      !parsedFeed.title ||
      (parsedFeed.title === 'Error' && hasNoItems);

    if (parseFailed) {
      return {
        success: false,
        error: 'Failed to parse feed or feed has no title',
        data: undefined
      };
    }
    
    // Add the feed to the database
    const feedData = {
      url: feedUrl,
      name: parsedFeed.title,
      category: category || null,
      last_updated: Date.now()
    };
    
    const feedId = db.feeds.addFeed(feedData, dbOptions);
    
    // Return the new feed
    return {
      success: true,
      data: {
        id: feedId,
        ...feedData
      }
    };
  } catch (error) {
    console.error('Error adding feed:', error);
    return {
      success: false,
      error: error instanceof Error ? error.message : String(error),
      data: undefined
    };
  }
}
// Default export: a single object that bundles the module's public functions,
// so callers can use e.g. `rss.parseRssFeed(url)` after a default import.
const rssService = {
  parseRssFeed: parseRssFeed,
  fetchAllFeeds: fetchAllFeeds,
  addFeed: addFeed,
};

export default rssService;