Google Search MCP Server
by mixelpixx
Verified
from flask import Flask, request, jsonify, send_from_directory
import json
from googleapiclient.discovery import build
from typing import Dict, List
from flask_cors import CORS
import os
app = Flask(__name__, static_folder='static')
CORS(app) # Enable CORS for frontend development
class MCPServer:
def __init__(self):
self.api_key = None
self.cse_id = None
self.search_service = None
self.load_settings()
def load_settings(self):
"""Load Google API credentials from environment variables or api-keys.json"""
try:
# First try to get from environment variables
self.api_key = os.environ.get('GOOGLE_API_KEY')
self.cse_id = os.environ.get('GOOGLE_SEARCH_ENGINE_ID')
# If not found in environment variables, try api-keys.json
if not self.api_key or not self.cse_id:
with open('api-keys.json', 'r') as f:
config = json.load(f)
self.api_key = config.get('api_key')
self.cse_id = config.get('search_engine_id')
if not self.api_key or not self.cse_id:
raise ValueError("Missing API credentials. Please set GOOGLE_API_KEY and GOOGLE_SEARCH_ENGINE_ID environment variables or configure api-keys.json")
self.initialize_google_search()
except Exception as e:
raise ValueError(f"Failed to load API credentials: {str(e)}")
def initialize_google_search(self):
"""Initialize Google Custom Search API service"""
self.search_service = build("customsearch", "v1", developerKey=self.api_key)
def perform_search(self, query: str, num_results: int = 10, date_restrict: str = None,
language: str = None, country: str = None, safe_search: str = None) -> List[Dict]:
"""
Perform a Google search and return formatted results
Parameters:
- query: Search query string
- num_results: Number of results to return (max 10)
- date_restrict: Restrict results to a date range (e.g., 'd1', 'w2', 'm3', 'y1')
- language: Restrict results to a specific language (e.g., 'en', 'es', 'fr')
- country: Restrict results to a specific country (e.g., 'us', 'uk', 'ca')
- safe_search: Safe search level ('off', 'medium', 'high')
"""
if not self.search_service:
raise ValueError("Google Search API not initialized")
try:
# Build search parameters
search_params = {
'q': query,
'cx': self.cse_id,
'num': min(num_results, 10)
}
# Add optional parameters if provided
if date_restrict:
search_params['dateRestrict'] = date_restrict
if language:
search_params['lr'] = f'lang_{language}'
if country:
search_params['cr'] = f'country{country.upper()}'
if safe_search:
search_params['safe'] = safe_search
# Execute search with parameters
result = self.search_service.cse().list(**search_params).execute()
formatted_results = []
if 'items' in result:
for item in result['items']:
formatted_results.append({
'title': item.get('title', ''),
'link': item.get('link', ''),
'snippet': item.get('snippet', ''),
'pagemap': item.get('pagemap', {}),
'datePublished': item.get('pagemap', {}).get('metatags', [{}])[0].get('article:published_time', ''),
'source': 'google_search'
})
return formatted_results
except Exception as e:
print(f"Search error: {str(e)}")
return []
# Initialize the MCP server
mcp = MCPServer()
@app.route('/')
def serve_frontend():
"""Serve the frontend HTML"""
return send_from_directory('static', 'index.html')
@app.route('/search', methods=['POST'])
def search():
"""Endpoint to perform Google search"""
data = request.get_json()
if not data or 'query' not in data:
return jsonify({'error': 'Missing search query'}), 400
# Extract search parameters
query = data['query']
num_results = data.get('num_results', 10)
date_restrict = data.get('date_restrict')
language = data.get('language')
country = data.get('country')
safe_search = data.get('safe_search')
try:
results = mcp.perform_search(
query=query,
num_results=num_results,
date_restrict=date_restrict,
language=language,
country=country,
safe_search=safe_search
)
return jsonify({'results': results}), 200
except Exception as e:
return jsonify({'error': f'Search failed: {str(e)}'}), 500
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)