Skip to main content
Glama
UpendraNath

Tavily Web Search MCP Server

by UpendraNath

organize_and_categorize

Clean, deduplicate, and categorize bookmark data to organize web search results into structured folders.

Instructions

Clean, deduplicate, and categorize bookmark data

Input Schema

Table | JSON Schema
Name | Required | Description | Default
data_folder | No | — | data
output_folder | No | — | organized

Output Schema

Table | JSON Schema
Name | Required | Description | Default
result | Yes | — | —

Implementation Reference

  • server.py:38-53 (handler)
    The MCP tool handler function for 'organize_and_categorize'. It uses the BookmarkOrganizer class to perform cleaning, deduplication, categorization, organization, saving, and reporting of bookmark data.
    @mcp.tool()
    def organize_and_categorize(data_folder: str = "data", output_folder: str = "organized") -> str:
        """Clean, deduplicate, and categorize bookmark data"""
        try:
            from bookmark_organizer import BookmarkOrganizer

            # Full pipeline: load -> dedupe -> tag -> group -> persist -> report.
            organizer = BookmarkOrganizer(data_folder)
            organizer.load_data()
            organizer.clean_and_deduplicate()
            organizer.categorize_and_tag()
            grouped = organizer.organize_by_category()
            organizer.save_organized_data(grouped, output_folder)
            summary = organizer.generate_report(grouped)
        except Exception as e:
            # Report failures as a readable string instead of crashing the MCP call.
            return f"Error organizing data: {str(e)}"
        return f"Organization complete!\n\n{summary}"
  • The BookmarkOrganizer class provides all supporting utility methods used by the tool handler: load_data(), clean_and_deduplicate(), categorize_and_tag(), organize_by_category(), generate_report(), and save_organized_data().
    class BookmarkOrganizer:
        """Main organizer class.

        Orchestrates the pipeline: load raw bookmark/history exports, drop
        duplicates, categorize and tag each item, group items by category,
        and write the organized output plus a summary report.
        """
        
        def __init__(self, data_folder: str = "data"):
            self.data_folder = Path(data_folder)
            self.parser = BookmarkParser()
            self.analyzer = ContentAnalyzer()
            self.deduplicator = Deduplicator()
            # Flat list of item dicts (url, title, source, domain, ...)
            self.all_items: List[Dict] = []
        
        def load_data(self):
            """Load all bookmark and history data.

            Every *.html file in the data folder is parsed as a Netscape
            bookmark export and every *.csv file as a browser-history export.
            (Previously only the hard-coded files bookmarks_10_26_25.html and
            BrowserHistory_10_26_25.csv were read; globbing generalizes this
            while still picking up those files.)
            """
            print("Loading data from files...")
            
            # Load bookmarks: any Netscape-format HTML export in the folder.
            for bookmark_file in sorted(self.data_folder.glob("*.html")):
                bookmarks = self.parser.parse_netscape_html(str(bookmark_file))
                self.all_items.extend(bookmarks)
                print(f"Loaded {len(bookmarks)} bookmarks")
            
            # Load history: any CSV export in the folder.
            for history_file in sorted(self.data_folder.glob("*.csv")):
                history = self.parser.parse_csv_history(str(history_file))
                self.all_items.extend(history)
                print(f"Loaded {len(history)} history entries")
            
            print(f"Total items loaded: {len(self.all_items)}")
        
        def clean_and_deduplicate(self):
            """Remove duplicates and clean data.

            Keeps the first occurrence of each URL/title (as judged by the
            Deduplicator) and rebinds self.all_items to the cleaned list.
            """
            print("Cleaning and deduplicating...")
            
            cleaned_items = []
            duplicates_removed = 0
            
            for item in self.all_items:
                if not self.deduplicator.is_duplicate(item['url'], item.get('title', '')):
                    cleaned_items.append(item)
                else:
                    duplicates_removed += 1
            
            self.all_items = cleaned_items
            print(f"Removed {duplicates_removed} duplicates")
            print(f"Clean items remaining: {len(self.all_items)}")
        
        def categorize_and_tag(self):
            """Categorize items and add keywords.

            Mutates each item in place, adding 'category' and 'keywords' keys.
            """
            print("Categorizing and tagging items...")
            
            for item in self.all_items:
                item['category'] = self.analyzer.categorize_url(item['url'], item.get('title', ''))
                item['keywords'] = self.analyzer.extract_keywords(item['url'], item.get('title', ''))
        
        def organize_by_category(self) -> Dict[str, List[Dict]]:
            """Group items by their 'category' key.

            Within each category, items are sorted so that the most frequent
            domains come first (stable sort, so ties keep insertion order).
            """
            organized = defaultdict(list)
            
            for item in self.all_items:
                organized[item['category']].append(item)
            
            # Sort each category by domain frequency (most common domain first).
            for category, items in organized.items():
                domain_counts = Counter(item['domain'] for item in items)
                items.sort(key=lambda x: domain_counts[x['domain']], reverse=True)
            
            return dict(organized)
        
        def generate_report(self, organized_data: Dict[str, List[Dict]]) -> str:
            """Generate a plain-text summary report.

            Includes totals, category counts (largest first), and the top
            three domains per category.
            """
            report = []
            report.append("=" * 60)
            report.append("BOOKMARK & HISTORY ORGANIZATION REPORT")
            report.append("=" * 60)
            report.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            report.append("")
            
            # Summary statistics
            total_items = sum(len(items) for items in organized_data.values())
            report.append(f"Total organized items: {total_items}")
            report.append(f"Categories: {len(organized_data)}")
            report.append("")
            
            # Category breakdown, biggest category first.
            report.append("CATEGORY BREAKDOWN:")
            report.append("-" * 30)
            
            for category, items in sorted(organized_data.items(), key=lambda x: len(x[1]), reverse=True):
                report.append(f"{category.upper()}: {len(items)} items")
                
                # Top domains in this category
                domain_counts = Counter(item['domain'] for item in items)
                top_domains = domain_counts.most_common(3)
                for domain, count in top_domains:
                    report.append(f"  - {domain}: {count} items")
                report.append("")
            
            return "\n".join(report)
        
        def save_organized_data(self, organized_data: Dict[str, List[Dict]], output_folder: str = "organized"):
            """Save organized data to files.

            Writes one JSON file per category, a combined JSON file, and a
            text report into output_folder (created if missing).
            """
            output_path = Path(output_folder)
            # parents=True: don't fail when intermediate directories are missing.
            output_path.mkdir(parents=True, exist_ok=True)
            
            print(f"Saving organized data to {output_path}...")
            
            # Save each category to separate JSON file
            for category, items in organized_data.items():
                category_file = output_path / f"{category}.json"
                with open(category_file, 'w', encoding='utf-8') as f:
                    json.dump(items, f, indent=2, ensure_ascii=False)
            
            # Save complete organized data
            complete_file = output_path / "complete_organized.json"
            with open(complete_file, 'w', encoding='utf-8') as f:
                json.dump(organized_data, f, indent=2, ensure_ascii=False)
            
            # Save report
            report_file = output_path / "organization_report.txt"
            report = self.generate_report(organized_data)
            with open(report_file, 'w', encoding='utf-8') as f:
                f.write(report)
            
            print(f"Saved {len(organized_data)} category files")
            print(f"Report saved to {report_file}")
  • server.py:38-38 (registration)
    The @mcp.tool() decorator registers the organize_and_categorize function as an MCP tool.
    @mcp.tool()
  • BookmarkParser class with methods to parse bookmark HTML and history CSV files.
    class BookmarkParser:
        """Parse different bookmark formats"""
        
        @staticmethod
        def parse_netscape_html(file_path: str) -> List[Dict]:
            """Parse Netscape bookmark HTML format.

            Returns one dict per <A HREF=...> link with keys: url, title,
            source, domain, added_date.
            """
            bookmarks = []
            
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            
            # Extract bookmark links using regex. IGNORECASE also accepts
            # lowercase <a href=...> produced by some exporters.
            pattern = r'<A HREF="([^"]+)"[^>]*>([^<]+)</A>'
            matches = re.findall(pattern, content, flags=re.IGNORECASE)
            
            for url, title in matches:
                # Decode HTML entities (&amp; etc.) in both fields.
                title = html.unescape(title)
                url = html.unescape(url)
                
                bookmarks.append({
                    'url': url,
                    'title': title,
                    'source': 'bookmarks',
                    'domain': urlparse(url).netloc,
                    # No timestamp is read from the export; record import time.
                    'added_date': datetime.now().isoformat()
                })
            
            return bookmarks
        
        @staticmethod
        def parse_csv_history(file_path: str) -> List[Dict]:
            """Parse CSV browser history format.

            Expects columns DateTime, NavigatedToUrl, PageTitle; rows with
            missing columns are skipped with a warning.
            """
            history = []
            
            with open(file_path, 'r', encoding='utf-8-sig') as f:  # utf-8-sig strips a leading BOM
                reader = csv.DictReader(f)
                for row in reader:
                    try:
                        # Fall back to a BOM-prefixed header name when the BOM
                        # survived into the first column. (The previous code's
                        # conditional picked 'DateTime' in both branches, so it
                        # never actually handled the BOM case.)
                        datetime_key = 'DateTime' if 'DateTime' in row else '\ufeffDateTime'
                        history.append({
                            'url': row['NavigatedToUrl'],
                            'title': row['PageTitle'],
                            'source': 'history',
                            'domain': urlparse(row['NavigatedToUrl']).netloc,
                            'visit_date': row[datetime_key]
                        })
                    except KeyError as e:
                        print(f"Warning: Missing column {e} in CSV file")
                        continue
            
            return history
  • Deduplicator class for cleaning duplicate and similar entries.
    class Deduplicator:
        """Remove duplicate and redundant URLs"""
        
        def __init__(self):
            # Normalized URLs and lowercased titles observed so far.
            self.seen_urls = set()
            self.seen_titles = set()
        
        def normalize_url(self, url: str) -> str:
            """Normalize URL for comparison"""
            parts = urlparse(url)
            
            # Drop common ad/analytics tracking parameters before comparing.
            tracking = {'utm_source', 'utm_medium', 'utm_campaign',
                        'utm_content', 'utm_term', 'fbclid', 'gclid'}
            kept = {k: v for k, v in parse_qs(parts.query).items()
                    if k.lower() not in tracking}
            
            # Rebuild: scheme://host/path plus the surviving parameters
            # (only the first value of each parameter is retained).
            base = f"{parts.scheme}://{parts.netloc}{parts.path}"
            query = '&'.join(f"{k}={vals[0]}" for k, vals in kept.items())
            return f"{base}?{query}" if query else base
        
        def is_duplicate(self, url: str, title: str = "") -> bool:
            """Check if URL is a duplicate"""
            canonical = self.normalize_url(url)
            
            # Exact match on the normalized URL?
            if canonical in self.seen_urls:
                return True
            
            # Fuzzy title comparison against everything recorded so far.
            if title:
                cleaned = title.lower().strip()
                for known in self.seen_titles:
                    if self._similarity(cleaned, known.lower()) > 0.8:
                        return True
            
            # First sighting: remember it for future checks.
            self.seen_urls.add(canonical)
            if title:
                self.seen_titles.add(title.lower().strip())
            
            return False
        
        def _similarity(self, s1: str, s2: str) -> float:
            """Calculate similarity between two strings"""
            if not s1 or not s2:
                return 0.0
            
            # Jaccard similarity over whitespace-separated tokens.
            tokens_a = set(s1.split())
            tokens_b = set(s2.split())
            overlap = len(tokens_a & tokens_b)
            total = len(tokens_a | tokens_b)
            return overlap / total if total else 0.0
  • ContentAnalyzer class for categorization and keyword extraction.
    class ContentAnalyzer:
        """Analyze content and categorize URLs"""
        
        def __init__(self):
            # Per-category signals: substring keywords and known domains.
            self.categories = {
                'programming': {
                    'keywords': ['github', 'stackoverflow', 'coding', 'programming', 'developer', 'api', 'documentation', 'tutorial', 'code', 'python', 'javascript', 'java', 'react', 'node', 'git'],
                    'domains': ['github.com', 'stackoverflow.com', 'developer.mozilla.org', 'docs.python.org', 'nodejs.org', 'reactjs.org']
                },
                'data_science': {
                    'keywords': ['data', 'analytics', 'machine learning', 'ai', 'pandas', 'numpy', 'tensorflow', 'pytorch', 'jupyter', 'databricks', 'spark', 'sql', 'database'],
                    'domains': ['kaggle.com', 'databricks.com', 'pandas.pydata.org', 'numpy.org', 'tensorflow.org', 'pytorch.org']
                },
                'education': {
                    'keywords': ['course', 'learn', 'education', 'university', 'college', 'tutorial', 'lesson', 'study', 'academic', 'research'],
                    'domains': ['coursera.org', 'edx.org', 'udemy.com', 'khanacademy.org', 'mit.edu', 'stanford.edu']
                },
                'news': {
                    'keywords': ['news', 'article', 'blog', 'post', 'medium', 'substack', 'newsletter'],
                    'domains': ['bbc.com', 'cnn.com', 'reuters.com', 'medium.com', 'substack.com']
                },
                'social': {
                    'keywords': ['twitter', 'facebook', 'instagram', 'linkedin', 'reddit', 'social', 'community'],
                    'domains': ['twitter.com', 'x.com', 'facebook.com', 'instagram.com', 'linkedin.com', 'reddit.com']
                },
                'tools': {
                    'keywords': ['tool', 'utility', 'converter', 'calculator', 'generator', 'editor', 'design', 'productivity'],
                    'domains': ['canva.com', 'figma.com', 'notion.so', 'trello.com', 'slack.com']
                },
                'entertainment': {
                    'keywords': ['video', 'music', 'game', 'movie', 'entertainment', 'youtube', 'netflix', 'spotify'],
                    'domains': ['youtube.com', 'netflix.com', 'spotify.com', 'twitch.tv', 'steam.com']
                },
                'shopping': {
                    'keywords': ['shop', 'buy', 'store', 'amazon', 'ebay', 'price', 'deal', 'discount'],
                    'domains': ['amazon.com', 'ebay.com', 'etsy.com', 'shopify.com']
                }
            }
        
        def categorize_url(self, url: str, title: str = "") -> str:
            """Categorize a URL based on domain and content"""
            host = urlparse(url).netloc.lower()
            blob = f"{url} {title}".lower()
            
            # Score each category: +3 per matching domain, +1 per keyword hit.
            scores = {}
            for name, spec in self.categories.items():
                points = 3 * sum(1 for cat_domain in spec['domains'] if cat_domain in host)
                points += sum(1 for keyword in spec['keywords'] if keyword in blob)
                if points:
                    scores[name] = points
            
            # Highest score wins; ties resolve to the earliest-scored category.
            return max(scores, key=scores.get) if scores else 'uncategorized'
        
        def extract_keywords(self, url: str, title: str = "") -> List[str]:
            """Extract relevant keywords from URL and title"""
            parsed = urlparse(url)
            found = set()
            
            # Path segments longer than two characters.
            found.update(seg for seg in parsed.path.split('/') if len(seg) > 2)
            
            # Title words longer than three characters.
            found.update(w for w in re.findall(r'\b\w+\b', title.lower()) if len(w) > 3)
            
            # Domain labels, skipping short ones and common suffixes.
            common = ['com', 'org', 'net', 'edu']
            found.update(p for p in parsed.netloc.split('.')
                         if len(p) > 2 and p not in common)
            
            return list(found)[:10]  # Limit to 10 keywords

Tool Definition Quality

Score is being calculated. Check back soon.

Install Server

Other Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/UpendraNath/MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server