
Tavily Web Search MCP Server

by UpendraNath

organize_and_categorize

Clean, deduplicate, and categorize bookmark and browser-history data, organizing it into structured, per-category files.

Instructions

Clean, deduplicate, and categorize bookmark data

Input Schema

Name            Required  Description  Default
data_folder     No                     data
output_folder   No                     organized
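
Both parameters are optional. As a sketch, a client call would pass arguments matching the schema above (the values below are illustrative):

    # Hypothetical tool-call arguments for organize_and_categorize
    arguments = {
        "data_folder": "data",         # folder containing the raw bookmark/history exports
        "output_folder": "organized",  # destination for the per-category JSON files
    }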

Implementation Reference

  • server.py:38-53 (handler)
    The MCP tool handler function for 'organize_and_categorize'. It uses the BookmarkOrganizer class to perform cleaning, deduplication, categorization, organization, saving, and reporting of bookmark data.
    @mcp.tool()
    def organize_and_categorize(data_folder: str = "data", output_folder: str = "organized") -> str:
        """Clean, deduplicate, and categorize bookmark data"""
        try:
            from bookmark_organizer import BookmarkOrganizer
            organizer = BookmarkOrganizer(data_folder)
            organizer.load_data()
            organizer.clean_and_deduplicate()
            organizer.categorize_and_tag()
            organized_data = organizer.organize_by_category()
            organizer.save_organized_data(organized_data, output_folder)
            
            report = organizer.generate_report(organized_data)
            return f"Organization complete!\n\n{report}"
        except Exception as e:
            return f"Error organizing data: {str(e)}"
  • The BookmarkOrganizer class provides all supporting utility methods used by the tool handler: load_data(), clean_and_deduplicate(), categorize_and_tag(), organize_by_category(), generate_report(), and save_organized_data().
    # Imports this class depends on
    from collections import Counter, defaultdict
    from datetime import datetime
    from pathlib import Path
    from typing import Dict, List
    import json

    class BookmarkOrganizer:
        """Main organizer class"""
        
        def __init__(self, data_folder: str = "data"):
            self.data_folder = Path(data_folder)
            self.parser = BookmarkParser()
            self.analyzer = ContentAnalyzer()
            self.deduplicator = Deduplicator()
            self.all_items = []
        
        def load_data(self):
            """Load all bookmark and history data"""
            print("Loading data from files...")
            
            # Load bookmarks
            bookmark_file = self.data_folder / "bookmarks_10_26_25.html"
            if bookmark_file.exists():
                bookmarks = self.parser.parse_netscape_html(str(bookmark_file))
                self.all_items.extend(bookmarks)
                print(f"Loaded {len(bookmarks)} bookmarks")
            
            # Load history
            history_file = self.data_folder / "BrowserHistory_10_26_25.csv"
            if history_file.exists():
                history = self.parser.parse_csv_history(str(history_file))
                self.all_items.extend(history)
                print(f"Loaded {len(history)} history entries")
            
            print(f"Total items loaded: {len(self.all_items)}")
        
        def clean_and_deduplicate(self):
            """Remove duplicates and clean data"""
            print("Cleaning and deduplicating...")
            
            cleaned_items = []
            duplicates_removed = 0
            
            for item in self.all_items:
                if not self.deduplicator.is_duplicate(item['url'], item.get('title', '')):
                    cleaned_items.append(item)
                else:
                    duplicates_removed += 1
            
            self.all_items = cleaned_items
            print(f"Removed {duplicates_removed} duplicates")
            print(f"Clean items remaining: {len(self.all_items)}")
        
        def categorize_and_tag(self):
            """Categorize items and add keywords"""
            print("Categorizing and tagging items...")
            
            for item in self.all_items:
                item['category'] = self.analyzer.categorize_url(item['url'], item.get('title', ''))
                item['keywords'] = self.analyzer.extract_keywords(item['url'], item.get('title', ''))
        
        def organize_by_category(self) -> Dict[str, List[Dict]]:
            """Organize items by category"""
            organized = defaultdict(list)
            
            for item in self.all_items:
                organized[item['category']].append(item)
            
            # Sort each category by domain frequency
            for category, items in organized.items():
                domain_counts = Counter(item['domain'] for item in items)
                items.sort(key=lambda x: domain_counts[x['domain']], reverse=True)
            
            return dict(organized)
        
        def generate_report(self, organized_data: Dict[str, List[Dict]]) -> str:
            """Generate a summary report"""
            report = []
            report.append("=" * 60)
            report.append("BOOKMARK & HISTORY ORGANIZATION REPORT")
            report.append("=" * 60)
            report.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            report.append("")
            
            # Summary statistics
            total_items = sum(len(items) for items in organized_data.values())
            report.append(f"Total organized items: {total_items}")
            report.append(f"Categories: {len(organized_data)}")
            report.append("")
            
            # Category breakdown
            report.append("CATEGORY BREAKDOWN:")
            report.append("-" * 30)
            
            for category, items in sorted(organized_data.items(), key=lambda x: len(x[1]), reverse=True):
                report.append(f"{category.upper()}: {len(items)} items")
                
                # Top domains in this category
                domain_counts = Counter(item['domain'] for item in items)
                top_domains = domain_counts.most_common(3)
                for domain, count in top_domains:
                    report.append(f"  - {domain}: {count} items")
                report.append("")
            
            return "\n".join(report)
        
        def save_organized_data(self, organized_data: Dict[str, List[Dict]], output_folder: str = "organized"):
            """Save organized data to files"""
            output_path = Path(output_folder)
            output_path.mkdir(exist_ok=True)
            
            print(f"Saving organized data to {output_path}...")
            
            # Save each category to separate JSON file
            for category, items in organized_data.items():
                category_file = output_path / f"{category}.json"
                with open(category_file, 'w', encoding='utf-8') as f:
                    json.dump(items, f, indent=2, ensure_ascii=False)
            
            # Save complete organized data
            complete_file = output_path / "complete_organized.json"
            with open(complete_file, 'w', encoding='utf-8') as f:
                json.dump(organized_data, f, indent=2, ensure_ascii=False)
            
            # Save report
            report_file = output_path / "organization_report.txt"
            report = self.generate_report(organized_data)
            with open(report_file, 'w', encoding='utf-8') as f:
                f.write(report)
            
            print(f"Saved {len(organized_data)} category files")
            print(f"Report saved to {report_file}")
  • server.py:38-38 (registration)
    The @mcp.tool() decorator registers the organize_and_categorize function as an MCP tool.
    @mcp.tool()
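  • For context, a minimal sketch of how such a server is typically wired up with FastMCP from the official MCP Python SDK. The server name and entry point below are assumptions for illustration, not taken from this repository.
    from mcp.server.fastmcp import FastMCP

    mcp = FastMCP("bookmark-organizer")  # hypothetical server name

    @mcp.tool()
    def organize_and_categorize(data_folder: str = "data", output_folder: str = "organized") -> str:
        """Clean, deduplicate, and categorize bookmark data"""
        ...  # body as shown in the handler excerpt above

    if __name__ == "__main__":
        mcp.run()  # serves over stdio by default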
  • BookmarkParser class with methods to parse bookmark HTML and history CSV files.
    # Imports this class depends on
    import csv
    import html
    import re
    from datetime import datetime
    from typing import Dict, List
    from urllib.parse import urlparse

    class BookmarkParser:
        """Parse different bookmark formats"""
        
        @staticmethod
        def parse_netscape_html(file_path: str) -> List[Dict]:
            """Parse Netscape bookmark HTML format"""
            bookmarks = []
            
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            
            # Extract bookmark links using regex
            pattern = r'<A HREF="([^"]+)"[^>]*>([^<]+)</A>'
            matches = re.findall(pattern, content)
            
            for url, title in matches:
                # Decode HTML entities
                title = html.unescape(title)
                url = html.unescape(url)
                
                bookmarks.append({
                    'url': url,
                    'title': title,
                    'source': 'bookmarks',
                    'domain': urlparse(url).netloc,
                    'added_date': datetime.now().isoformat()
                })
            
            return bookmarks
        
        @staticmethod
        def parse_csv_history(file_path: str) -> List[Dict]:
            """Parse CSV browser history format"""
            history = []
            
            with open(file_path, 'r', encoding='utf-8-sig') as f:  # Use utf-8-sig to handle BOM
                reader = csv.DictReader(f)
                for row in reader:
                    try:
                        # Fall back to a BOM-prefixed column name if one slipped through
                        datetime_key = 'DateTime' if 'DateTime' in row else '\ufeffDateTime'
                        history.append({
                            'url': row['NavigatedToUrl'],
                            'title': row['PageTitle'],
                            'source': 'history',
                            'domain': urlparse(row['NavigatedToUrl']).netloc,
                            'visit_date': row[datetime_key]
                        })
                    except KeyError as e:
                        print(f"Warning: Missing column {e} in CSV file")
                        continue
            
            return history
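  • Input sketch: the shape of a Netscape bookmark export line that the regex in parse_netscape_html is written to match. The sample below is illustrative, not taken from the repository's data.
    import re

    sample = '<DT><A HREF="https://github.com/" ADD_DATE="1700000000">GitHub</A>'
    pattern = r'<A HREF="([^"]+)"[^>]*>([^<]+)</A>'
    print(re.findall(pattern, sample))  # [('https://github.com/', 'GitHub')]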
  • Deduplicator class for cleaning duplicate and similar entries.
    # Imports this class depends on
    from urllib.parse import parse_qs, urlparse

    class Deduplicator:
        """Remove duplicate and redundant URLs"""
        
        def __init__(self):
            self.seen_urls = set()
            self.seen_titles = set()
        
        def normalize_url(self, url: str) -> str:
            """Normalize URL for comparison"""
            parsed = urlparse(url)
            
            # Remove common tracking parameters
            query_params = parse_qs(parsed.query)
            filtered_params = {}
            
            for key, value in query_params.items():
                if key.lower() not in ['utm_source', 'utm_medium', 'utm_campaign', 'utm_content', 'utm_term', 'fbclid', 'gclid']:
                    filtered_params[key] = value
            
            # Rebuild the URL, keeping only the first value of each remaining parameter
            new_query = '&'.join([f"{k}={v[0]}" for k, v in filtered_params.items()])
            normalized = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
            if new_query:
                normalized += f"?{new_query}"
            
            return normalized
        
        def is_duplicate(self, url: str, title: str = "") -> bool:
            """Check if URL is a duplicate"""
            normalized_url = self.normalize_url(url)
            
            # Check for exact URL match
            if normalized_url in self.seen_urls:
                return True
            
            # Check for similar titles (fuzzy matching)
            if title:
                title_lower = title.lower().strip()
                for seen_title in self.seen_titles:
                    if self._similarity(title_lower, seen_title.lower()) > 0.8:
                        return True
            
            # Add to seen sets
            self.seen_urls.add(normalized_url)
            if title:
                self.seen_titles.add(title.lower().strip())
            
            return False
        
        def _similarity(self, s1: str, s2: str) -> float:
            """Calculate similarity between two strings"""
            if not s1 or not s2:
                return 0.0
            
            # Simple Jaccard similarity
            set1 = set(s1.split())
            set2 = set(s2.split())
            
            intersection = len(set1.intersection(set2))
            union = len(set1.union(set2))
            
            return intersection / union if union > 0 else 0.0
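  • Worked example of the deduplication behaviour (URLs and titles are illustrative): tracking parameters are stripped before comparison, so two URLs that differ only in utm_* parameters collide.
    dedup = Deduplicator()

    url_a = "https://example.com/post?id=7&utm_source=newsletter"
    url_b = "https://example.com/post?id=7"

    print(dedup.normalize_url(url_a))            # https://example.com/post?id=7
    print(dedup.is_duplicate(url_a, "My Post"))  # False - first sighting is recorded
    print(dedup.is_duplicate(url_b, "My Post"))  # True  - same normalized URL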
  • ContentAnalyzer class for categorization and keyword extraction.
    # Imports this class depends on
    import re
    from collections import defaultdict
    from typing import List
    from urllib.parse import urlparse

    class ContentAnalyzer:
        """Analyze content and categorize URLs"""
        
        def __init__(self):
            self.categories = {
                'programming': {
                    'keywords': ['github', 'stackoverflow', 'coding', 'programming', 'developer', 'api', 'documentation', 'tutorial', 'code', 'python', 'javascript', 'java', 'react', 'node', 'git'],
                    'domains': ['github.com', 'stackoverflow.com', 'developer.mozilla.org', 'docs.python.org', 'nodejs.org', 'reactjs.org']
                },
                'data_science': {
                    'keywords': ['data', 'analytics', 'machine learning', 'ai', 'pandas', 'numpy', 'tensorflow', 'pytorch', 'jupyter', 'databricks', 'spark', 'sql', 'database'],
                    'domains': ['kaggle.com', 'databricks.com', 'pandas.pydata.org', 'numpy.org', 'tensorflow.org', 'pytorch.org']
                },
                'education': {
                    'keywords': ['course', 'learn', 'education', 'university', 'college', 'tutorial', 'lesson', 'study', 'academic', 'research'],
                    'domains': ['coursera.org', 'edx.org', 'udemy.com', 'khanacademy.org', 'mit.edu', 'stanford.edu']
                },
                'news': {
                    'keywords': ['news', 'article', 'blog', 'post', 'medium', 'substack', 'newsletter'],
                    'domains': ['bbc.com', 'cnn.com', 'reuters.com', 'medium.com', 'substack.com']
                },
                'social': {
                    'keywords': ['twitter', 'facebook', 'instagram', 'linkedin', 'reddit', 'social', 'community'],
                    'domains': ['twitter.com', 'x.com', 'facebook.com', 'instagram.com', 'linkedin.com', 'reddit.com']
                },
                'tools': {
                    'keywords': ['tool', 'utility', 'converter', 'calculator', 'generator', 'editor', 'design', 'productivity'],
                    'domains': ['canva.com', 'figma.com', 'notion.so', 'trello.com', 'slack.com']
                },
                'entertainment': {
                    'keywords': ['video', 'music', 'game', 'movie', 'entertainment', 'youtube', 'netflix', 'spotify'],
                    'domains': ['youtube.com', 'netflix.com', 'spotify.com', 'twitch.tv', 'steam.com']
                },
                'shopping': {
                    'keywords': ['shop', 'buy', 'store', 'amazon', 'ebay', 'price', 'deal', 'discount'],
                    'domains': ['amazon.com', 'ebay.com', 'etsy.com', 'shopify.com']
                }
            }
        
        def categorize_url(self, url: str, title: str = "") -> str:
            """Categorize a URL based on domain and content"""
            domain = urlparse(url).netloc.lower()
            text_to_analyze = f"{url} {title}".lower()
            
            scores = defaultdict(int)
            
            for category, data in self.categories.items():
                # Check domain matches
                for cat_domain in data['domains']:
                    if cat_domain in domain:
                        scores[category] += 3
                
                # Check keyword matches
                for keyword in data['keywords']:
                    if keyword in text_to_analyze:
                        scores[category] += 1
            
            if scores:
                return max(scores, key=scores.get)
            return 'uncategorized'
        
        def extract_keywords(self, url: str, title: str = "") -> List[str]:
            """Extract relevant keywords from URL and title"""
            keywords = set()
            
            # Extract from URL path
            parsed = urlparse(url)
            path_parts = [part for part in parsed.path.split('/') if part and len(part) > 2]
            keywords.update(path_parts)
            
            # Extract from title
            title_words = re.findall(r'\b\w+\b', title.lower())
            keywords.update([word for word in title_words if len(word) > 3])
            
            # Extract from domain
            domain_parts = parsed.netloc.split('.')
            keywords.update([part for part in domain_parts if len(part) > 2 and part not in ['com', 'org', 'net', 'edu']])
            
            return list(keywords)[:10]  # Limit to 10 keywords
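  • Scoring sketch for categorize_url (example values are illustrative): a domain match contributes 3 points and each keyword hit 1 point, so a GitHub URL mentioning pandas still lands in 'programming' (github.com domain +3, 'github' and 'git' keyword hits +1 each, total 5) rather than 'data_science' ('data' and 'pandas' hits, total 2).
    analyzer = ContentAnalyzer()
    url = "https://github.com/pandas-dev/pandas"
    print(analyzer.categorize_url(url, "pandas: data analysis"))    # programming
    print(analyzer.extract_keywords(url, "pandas: data analysis"))  # unordered, e.g. ['pandas', 'github', 'data', ...]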
