
Tavily Web Search MCP Server

by UpendraNath

organize_and_categorize

Clean, deduplicate, and categorize exported bookmark and browser-history data, organizing it into structured category folders.

Instructions

Clean, deduplicate, and categorize bookmark data

Input Schema

Name            Required    Description    Default
data_folder     No                         data
output_folder   No                         organized
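
Both parameters are optional. An MCP client invokes the tool over JSON-RPC with a tools/call request; a minimal sketch of such a request, using the documented defaults, is shown below (the request id and folder names are placeholders):

    # Hedged sketch: a JSON-RPC 2.0 tools/call request an MCP client might send.
    tool_call_request = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "tools/call",
        "params": {
            "name": "organize_and_categorize",
            "arguments": {
                "data_folder": "data",        # folder holding the exported bookmark/history files
                "output_folder": "organized"  # folder where categorized JSON files are written
            }
        }
    }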

Implementation Reference

  • server.py:38-53 (handler)
    The MCP tool handler function for 'organize_and_categorize'. It uses the BookmarkOrganizer class to perform cleaning, deduplication, categorization, organization, saving, and reporting of bookmark data.
    @mcp.tool()
    def organize_and_categorize(data_folder: str = "data", output_folder: str = "organized") -> str:
        """Clean, deduplicate, and categorize bookmark data"""
        try:
            from bookmark_organizer import BookmarkOrganizer

            organizer = BookmarkOrganizer(data_folder)
            organizer.load_data()
            organizer.clean_and_deduplicate()
            organizer.categorize_and_tag()
            organized_data = organizer.organize_by_category()
            organizer.save_organized_data(organized_data, output_folder)
            report = organizer.generate_report(organized_data)

            return f"Organization complete!\n\n{report}"
        except Exception as e:
            return f"Error organizing data: {str(e)}"
  • The BookmarkOrganizer class provides all supporting utility methods used by the tool handler: load_data(), clean_and_deduplicate(), categorize_and_tag(), organize_by_category(), generate_report(), and save_organized_data().
    class BookmarkOrganizer:
        """Main organizer class"""

        def __init__(self, data_folder: str = "data"):
            self.data_folder = Path(data_folder)
            self.parser = BookmarkParser()
            self.analyzer = ContentAnalyzer()
            self.deduplicator = Deduplicator()
            self.all_items = []

        def load_data(self):
            """Load all bookmark and history data"""
            print("Loading data from files...")

            # Load bookmarks
            bookmark_file = self.data_folder / "bookmarks_10_26_25.html"
            if bookmark_file.exists():
                bookmarks = self.parser.parse_netscape_html(str(bookmark_file))
                self.all_items.extend(bookmarks)
                print(f"Loaded {len(bookmarks)} bookmarks")

            # Load history
            history_file = self.data_folder / "BrowserHistory_10_26_25.csv"
            if history_file.exists():
                history = self.parser.parse_csv_history(str(history_file))
                self.all_items.extend(history)
                print(f"Loaded {len(history)} history entries")

            print(f"Total items loaded: {len(self.all_items)}")

        def clean_and_deduplicate(self):
            """Remove duplicates and clean data"""
            print("Cleaning and deduplicating...")

            cleaned_items = []
            duplicates_removed = 0

            for item in self.all_items:
                if not self.deduplicator.is_duplicate(item['url'], item.get('title', '')):
                    cleaned_items.append(item)
                else:
                    duplicates_removed += 1

            self.all_items = cleaned_items
            print(f"Removed {duplicates_removed} duplicates")
            print(f"Clean items remaining: {len(self.all_items)}")

        def categorize_and_tag(self):
            """Categorize items and add keywords"""
            print("Categorizing and tagging items...")

            for item in self.all_items:
                item['category'] = self.analyzer.categorize_url(item['url'], item.get('title', ''))
                item['keywords'] = self.analyzer.extract_keywords(item['url'], item.get('title', ''))

        def organize_by_category(self) -> Dict[str, List[Dict]]:
            """Organize items by category"""
            organized = defaultdict(list)

            for item in self.all_items:
                organized[item['category']].append(item)

            # Sort each category by domain frequency
            for category, items in organized.items():
                domain_counts = Counter(item['domain'] for item in items)
                items.sort(key=lambda x: domain_counts[x['domain']], reverse=True)

            return dict(organized)

        def generate_report(self, organized_data: Dict[str, List[Dict]]) -> str:
            """Generate a summary report"""
            report = []
            report.append("=" * 60)
            report.append("BOOKMARK & HISTORY ORGANIZATION REPORT")
            report.append("=" * 60)
            report.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            report.append("")

            # Summary statistics
            total_items = sum(len(items) for items in organized_data.values())
            report.append(f"Total organized items: {total_items}")
            report.append(f"Categories: {len(organized_data)}")
            report.append("")

            # Category breakdown
            report.append("CATEGORY BREAKDOWN:")
            report.append("-" * 30)
            for category, items in sorted(organized_data.items(), key=lambda x: len(x[1]), reverse=True):
                report.append(f"{category.upper()}: {len(items)} items")

                # Top domains in this category
                domain_counts = Counter(item['domain'] for item in items)
                top_domains = domain_counts.most_common(3)
                for domain, count in top_domains:
                    report.append(f" - {domain}: {count} items")
                report.append("")

            return "\n".join(report)

        def save_organized_data(self, organized_data: Dict[str, List[Dict]], output_folder: str = "organized"):
            """Save organized data to files"""
            output_path = Path(output_folder)
            output_path.mkdir(exist_ok=True)

            print(f"Saving organized data to {output_path}...")

            # Save each category to separate JSON file
            for category, items in organized_data.items():
                category_file = output_path / f"{category}.json"
                with open(category_file, 'w', encoding='utf-8') as f:
                    json.dump(items, f, indent=2, ensure_ascii=False)

            # Save complete organized data
            complete_file = output_path / "complete_organized.json"
            with open(complete_file, 'w', encoding='utf-8') as f:
                json.dump(organized_data, f, indent=2, ensure_ascii=False)

            # Save report
            report_file = output_path / "organization_report.txt"
            report = self.generate_report(organized_data)
            with open(report_file, 'w', encoding='utf-8') as f:
                f.write(report)

            print(f"Saved {len(organized_data)} category files")
            print(f"Report saved to {report_file}")
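    save_organized_data() writes one JSON file per category plus complete_organized.json and organization_report.txt into the output folder. A minimal sketch of inspecting that output after a run (file names come from the code above; the category names depend on ContentAnalyzer):

    import json
    from pathlib import Path

    # Hedged sketch: summarize the files written by save_organized_data().
    output_path = Path("organized")  # default output_folder
    complete = json.loads((output_path / "complete_organized.json").read_text(encoding="utf-8"))

    for category, items in sorted(complete.items(), key=lambda kv: len(kv[1]), reverse=True):
        print(f"{category}: {len(items)} items")

    # The plain-text summary produced by generate_report() is also on disk:
    print((output_path / "organization_report.txt").read_text(encoding="utf-8"))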
  • server.py:38-38 (registration)
    The @mcp.tool() decorator registers the organize_and_categorize function as an MCP tool.
    @mcp.tool()
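    The decorator implies a FastMCP instance named mcp defined earlier in server.py. A minimal sketch of how such an instance is typically created and run, assuming the official MCP Python SDK (the actual server name and transport in server.py may differ):

    from mcp.server.fastmcp import FastMCP

    # Hedged sketch: the `mcp` object the decorator refers to is presumably
    # constructed near the top of server.py. The server name here is a guess.
    mcp = FastMCP("Tavily Web Search")

    @mcp.tool()
    def organize_and_categorize(data_folder: str = "data", output_folder: str = "organized") -> str:
        ...

    if __name__ == "__main__":
        mcp.run()  # defaults to the stdio transport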
  • BookmarkParser class with methods to parse bookmark HTML and history CSV files.
    class BookmarkParser:
        """Parse different bookmark formats"""

        @staticmethod
        def parse_netscape_html(file_path: str) -> List[Dict]:
            """Parse Netscape bookmark HTML format"""
            bookmarks = []

            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            # Extract bookmark links using regex
            pattern = r'<A HREF="([^"]+)"[^>]*>([^<]+)</A>'
            matches = re.findall(pattern, content)

            for url, title in matches:
                # Decode HTML entities
                title = html.unescape(title)
                url = html.unescape(url)

                bookmarks.append({
                    'url': url,
                    'title': title,
                    'source': 'bookmarks',
                    'domain': urlparse(url).netloc,
                    'added_date': datetime.now().isoformat()
                })

            return bookmarks

        @staticmethod
        def parse_csv_history(file_path: str) -> List[Dict]:
            """Parse CSV browser history format"""
            history = []

            # Use utf-8-sig to handle BOM
            with open(file_path, 'r', encoding='utf-8-sig') as f:
                reader = csv.DictReader(f)
                for row in reader:
                    try:
                        # Handle BOM in column names (fall back to the BOM-prefixed key)
                        datetime_key = 'DateTime' if 'DateTime' in row else '\ufeffDateTime'
                        history.append({
                            'url': row['NavigatedToUrl'],
                            'title': row['PageTitle'],
                            'source': 'history',
                            'domain': urlparse(row['NavigatedToUrl']).netloc,
                            'visit_date': row[datetime_key]
                        })
                    except KeyError as e:
                        print(f"Warning: Missing column {e} in CSV file")
                        continue

            return history
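    A minimal usage sketch for both parsers, assuming BookmarkParser lives in bookmark_organizer.py alongside BookmarkOrganizer (as the handler's import suggests); the sample files are invented here purely to mirror the formats the parsers expect:

    from bookmark_organizer import BookmarkParser  # assumed module location

    # Hedged sketch: a tiny Netscape-format export matching the regex above.
    sample_html = '<DT><A HREF="https://github.com/modelcontextprotocol" ADD_DATE="1730000000">MCP on GitHub</A>'
    with open("sample_bookmarks.html", "w", encoding="utf-8") as f:
        f.write(sample_html)

    bookmarks = BookmarkParser.parse_netscape_html("sample_bookmarks.html")
    print(bookmarks[0]["domain"])  # -> github.com

    # The history parser expects columns named DateTime, NavigatedToUrl, PageTitle.
    with open("sample_history.csv", "w", encoding="utf-8") as f:
        f.write("DateTime,NavigatedToUrl,PageTitle\n"
                "2025-10-26 09:00:00,https://stackoverflow.com/questions/1,Example question\n")

    history = BookmarkParser.parse_csv_history("sample_history.csv")
    print(history[0]["visit_date"])  # -> 2025-10-26 09:00:00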
  • Deduplicator class for cleaning duplicate and similar entries.
    class Deduplicator:
        """Remove duplicate and redundant URLs"""

        def __init__(self):
            self.seen_urls = set()
            self.seen_titles = set()

        def normalize_url(self, url: str) -> str:
            """Normalize URL for comparison"""
            parsed = urlparse(url)

            # Remove common tracking parameters
            query_params = parse_qs(parsed.query)
            filtered_params = {}
            for key, value in query_params.items():
                if key.lower() not in ['utm_source', 'utm_medium', 'utm_campaign',
                                       'utm_content', 'utm_term', 'fbclid', 'gclid']:
                    filtered_params[key] = value

            # Rebuild URL
            new_query = '&'.join([f"{k}={v[0]}" for k, v in filtered_params.items()])
            normalized = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
            if new_query:
                normalized += f"?{new_query}"

            return normalized

        def is_duplicate(self, url: str, title: str = "") -> bool:
            """Check if URL is a duplicate"""
            normalized_url = self.normalize_url(url)

            # Check for exact URL match
            if normalized_url in self.seen_urls:
                return True

            # Check for similar titles (fuzzy matching)
            if title:
                title_lower = title.lower().strip()
                for seen_title in self.seen_titles:
                    if self._similarity(title_lower, seen_title.lower()) > 0.8:
                        return True

            # Add to seen sets
            self.seen_urls.add(normalized_url)
            if title:
                self.seen_titles.add(title.lower().strip())

            return False

        def _similarity(self, s1: str, s2: str) -> float:
            """Calculate similarity between two strings"""
            if not s1 or not s2:
                return 0.0

            # Simple Jaccard similarity
            set1 = set(s1.split())
            set2 = set(s2.split())

            intersection = len(set1.intersection(set2))
            union = len(set1.union(set2))

            return intersection / union if union > 0 else 0.0
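    A short sketch of the deduplication behavior: tracking parameters are stripped before comparison, so the same article reached via two campaign links collapses to one entry (module location is assumed, as above):

    from bookmark_organizer import Deduplicator  # assumed module location

    dedup = Deduplicator()

    # utm_* and fbclid parameters are dropped, so these URLs normalize identically.
    print(dedup.normalize_url("https://medium.com/post/abc?utm_source=newsletter&utm_medium=email"))
    # -> https://medium.com/post/abc

    print(dedup.is_duplicate("https://medium.com/post/abc?utm_source=newsletter", "Great MCP article"))  # False (first sighting)
    print(dedup.is_duplicate("https://medium.com/post/abc?fbclid=XYZ", "Great MCP article"))             # True (same normalized URL)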
  • ContentAnalyzer class for categorization and keyword extraction.
    class ContentAnalyzer:
        """Analyze content and categorize URLs"""

        def __init__(self):
            self.categories = {
                'programming': {
                    'keywords': ['github', 'stackoverflow', 'coding', 'programming', 'developer',
                                 'api', 'documentation', 'tutorial', 'code', 'python', 'javascript',
                                 'java', 'react', 'node', 'git'],
                    'domains': ['github.com', 'stackoverflow.com', 'developer.mozilla.org',
                                'docs.python.org', 'nodejs.org', 'reactjs.org']
                },
                'data_science': {
                    'keywords': ['data', 'analytics', 'machine learning', 'ai', 'pandas', 'numpy',
                                 'tensorflow', 'pytorch', 'jupyter', 'databricks', 'spark', 'sql', 'database'],
                    'domains': ['kaggle.com', 'databricks.com', 'pandas.pydata.org', 'numpy.org',
                                'tensorflow.org', 'pytorch.org']
                },
                'education': {
                    'keywords': ['course', 'learn', 'education', 'university', 'college', 'tutorial',
                                 'lesson', 'study', 'academic', 'research'],
                    'domains': ['coursera.org', 'edx.org', 'udemy.com', 'khanacademy.org', 'mit.edu', 'stanford.edu']
                },
                'news': {
                    'keywords': ['news', 'article', 'blog', 'post', 'medium', 'substack', 'newsletter'],
                    'domains': ['bbc.com', 'cnn.com', 'reuters.com', 'medium.com', 'substack.com']
                },
                'social': {
                    'keywords': ['twitter', 'facebook', 'instagram', 'linkedin', 'reddit', 'social', 'community'],
                    'domains': ['twitter.com', 'x.com', 'facebook.com', 'instagram.com', 'linkedin.com', 'reddit.com']
                },
                'tools': {
                    'keywords': ['tool', 'utility', 'converter', 'calculator', 'generator', 'editor',
                                 'design', 'productivity'],
                    'domains': ['canva.com', 'figma.com', 'notion.so', 'trello.com', 'slack.com']
                },
                'entertainment': {
                    'keywords': ['video', 'music', 'game', 'movie', 'entertainment', 'youtube', 'netflix', 'spotify'],
                    'domains': ['youtube.com', 'netflix.com', 'spotify.com', 'twitch.tv', 'steam.com']
                },
                'shopping': {
                    'keywords': ['shop', 'buy', 'store', 'amazon', 'ebay', 'price', 'deal', 'discount'],
                    'domains': ['amazon.com', 'ebay.com', 'etsy.com', 'shopify.com']
                }
            }

        def categorize_url(self, url: str, title: str = "") -> str:
            """Categorize a URL based on domain and content"""
            domain = urlparse(url).netloc.lower()
            text_to_analyze = f"{url} {title}".lower()

            scores = defaultdict(int)

            for category, data in self.categories.items():
                # Check domain matches
                for cat_domain in data['domains']:
                    if cat_domain in domain:
                        scores[category] += 3

                # Check keyword matches
                for keyword in data['keywords']:
                    if keyword in text_to_analyze:
                        scores[category] += 1

            if scores:
                return max(scores, key=scores.get)
            return 'uncategorized'

        def extract_keywords(self, url: str, title: str = "") -> List[str]:
            """Extract relevant keywords from URL and title"""
            keywords = set()

            # Extract from URL path
            parsed = urlparse(url)
            path_parts = [part for part in parsed.path.split('/') if part and len(part) > 2]
            keywords.update(path_parts)

            # Extract from title
            title_words = re.findall(r'\b\w+\b', title.lower())
            keywords.update([word for word in title_words if len(word) > 3])

            # Extract from domain
            domain_parts = parsed.netloc.split('.')
            keywords.update([part for part in domain_parts
                             if len(part) > 2 and part not in ['com', 'org', 'net', 'edu']])

            return list(keywords)[:10]  # Limit to 10 keywords
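    A brief sketch of the scoring: a domain hit is worth 3 points and each keyword hit 1, so a GitHub URL lands in 'programming' even when the title carries no category keywords (module location is assumed, as above):

    from bookmark_organizer import ContentAnalyzer  # assumed module location

    analyzer = ContentAnalyzer()

    # 'github.com' is in the programming domain list (+3) and 'github'/'git'
    # also match programming keywords in the URL text, so programming wins.
    print(analyzer.categorize_url("https://github.com/jlowin/fastmcp", "FastMCP repository"))
    # -> programming

    print(analyzer.categorize_url("https://example.com/recipes", "Best pasta recipes"))
    # -> uncategorized (no domain or keyword match)

    print(analyzer.extract_keywords("https://docs.python.org/3/library/json.html", "json - JSON encoder and decoder"))
    # -> up to 10 keywords drawn from the path, title, and domain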
