# Multi-service MCP Server
# by AdamPippert
# tools/puppeteer_tool.py
from flask import Blueprint, request, jsonify, current_app
import os
import json
import base64
import tempfile
import subprocess
from pathlib import Path
import asyncio
import threading
# Flask blueprint on which this module's /screenshot, /pdf and /extract routes are registered
puppeteer_routes = Blueprint('puppeteer', __name__)
# Path to the Node.js scripts: a 'node_scripts' directory located alongside
# the directory that contains this file
SCRIPT_DIR = Path(__file__).parent.parent / 'node_scripts'
def ensure_script_dir():
    """Make sure SCRIPT_DIR exists and holds the three Puppeteer helper scripts.

    The directory is created if needed, then screenshot.js, pdf.js and
    extract.js are written from the embedded sources below. A script that is
    already present on disk is left untouched.
    """
    # Navigates to args.url and saves a screenshot to args.outputPath.
    screenshot_source = """
const puppeteer = require('puppeteer');
const fs = require('fs');
(async () => {
const args = JSON.parse(process.argv[2]);
const browser = await puppeteer.launch({
headless: args.headless !== false,
executablePath: args.executablePath || null,
args: ['--no-sandbox', '--disable-setuid-sandbox']
});
const page = await browser.newPage();
if (args.viewport) {
await page.setViewport(args.viewport);
}
if (args.userAgent) {
await page.setUserAgent(args.userAgent);
}
await page.goto(args.url, {
waitUntil: args.waitUntil || 'networkidle2',
timeout: args.timeout || 30000
});
if (args.waitForSelector) {
await page.waitForSelector(args.waitForSelector, { timeout: args.selectorTimeout || 30000 });
}
if (args.waitTime) {
await new Promise(resolve => setTimeout(resolve, args.waitTime));
}
const screenshotOptions = {
path: args.outputPath,
fullPage: args.fullPage === true,
type: args.type || 'png',
quality: args.type === 'jpeg' ? (args.quality || 80) : undefined
};
await page.screenshot(screenshotOptions);
await browser.close();
console.log(JSON.stringify({ success: true, outputPath: args.outputPath }));
})().catch(err => {
console.error(JSON.stringify({ success: false, error: err.message }));
process.exit(1);
});
"""

    # Navigates to args.url and saves a PDF rendering to args.outputPath.
    pdf_source = """
const puppeteer = require('puppeteer');
const fs = require('fs');
(async () => {
const args = JSON.parse(process.argv[2]);
const browser = await puppeteer.launch({
headless: args.headless !== false,
executablePath: args.executablePath || null,
args: ['--no-sandbox', '--disable-setuid-sandbox']
});
const page = await browser.newPage();
if (args.viewport) {
await page.setViewport(args.viewport);
}
if (args.userAgent) {
await page.setUserAgent(args.userAgent);
}
await page.goto(args.url, {
waitUntil: args.waitUntil || 'networkidle2',
timeout: args.timeout || 30000
});
if (args.waitForSelector) {
await page.waitForSelector(args.waitForSelector, { timeout: args.selectorTimeout || 30000 });
}
if (args.waitTime) {
await new Promise(resolve => setTimeout(resolve, args.waitTime));
}
const pdfOptions = {
path: args.outputPath,
format: args.format || 'A4',
printBackground: args.printBackground !== false,
margin: args.margin || { top: '1cm', right: '1cm', bottom: '1cm', left: '1cm' }
};
await page.pdf(pdfOptions);
await browser.close();
console.log(JSON.stringify({ success: true, outputPath: args.outputPath }));
})().catch(err => {
console.error(JSON.stringify({ success: false, error: err.message }));
process.exit(1);
});
"""

    # Navigates to args.url and prints the extracted text/HTML as JSON on stdout.
    extract_source = """
const puppeteer = require('puppeteer');
const fs = require('fs');
(async () => {
const args = JSON.parse(process.argv[2]);
const browser = await puppeteer.launch({
headless: args.headless !== false,
executablePath: args.executablePath || null,
args: ['--no-sandbox', '--disable-setuid-sandbox']
});
const page = await browser.newPage();
if (args.userAgent) {
await page.setUserAgent(args.userAgent);
}
await page.goto(args.url, {
waitUntil: args.waitUntil || 'networkidle2',
timeout: args.timeout || 30000
});
if (args.waitForSelector) {
await page.waitForSelector(args.waitForSelector, { timeout: args.selectorTimeout || 30000 });
}
if (args.waitTime) {
await new Promise(resolve => setTimeout(resolve, args.waitTime));
}
let result;
if (args.selector) {
if (args.extractHtml) {
result = await page.evaluate((selector) => {
const elements = Array.from(document.querySelectorAll(selector));
return elements.map(el => el.outerHTML);
}, args.selector);
} else {
result = await page.evaluate((selector) => {
const elements = Array.from(document.querySelectorAll(selector));
return elements.map(el => el.textContent.trim());
}, args.selector);
}
} else {
if (args.extractHtml) {
result = await page.content();
} else {
result = await page.evaluate(() => document.body.innerText);
}
}
await browser.close();
console.log(JSON.stringify({ success: true, content: result }));
})().catch(err => {
console.error(JSON.stringify({ success: false, error: err.message }));
process.exit(1);
});
"""

    os.makedirs(SCRIPT_DIR, exist_ok=True)

    # Same order as the scripts are listed above; existing files are skipped.
    bundled_scripts = (
        ('screenshot.js', screenshot_source),
        ('pdf.js', pdf_source),
        ('extract.js', extract_source),
    )
    for script_name, script_source in bundled_scripts:
        script_file = SCRIPT_DIR / script_name
        if script_file.exists():
            continue
        with open(script_file, 'w') as out:
            out.write(script_source)
def handle_action(action, parameters):
    """Dispatch an MCP action name to the matching Puppeteer handler.

    The helper scripts are provisioned via ensure_script_dir() before the
    action name is checked. Supported actions are "screenshot", "pdf" and
    "extract"; the handler's return value is passed back unchanged.

    Raises:
        ValueError: if ``action`` is not one of the supported names.
    """
    ensure_script_dir()

    dispatch = {
        "screenshot": take_screenshot,
        "pdf": generate_pdf,
        "extract": extract_content,
    }
    handler = dispatch.get(action)
    if handler is None:
        raise ValueError(f"Unknown action: {action}")
    return handler(parameters)
def take_screenshot(parameters):
    """Take a screenshot of a webpage and return it base64-encoded.

    Args:
        parameters: Mapping of request options (``None`` is treated as empty).
            ``url`` is required. ``fullPage`` (default ``False``) and ``type``
            (default ``'png'``) are read here; ``waitForSelector``,
            ``waitTime``, ``viewport``, ``userAgent`` and ``quality`` are
            forwarded to screenshot.js when present.

    Returns:
        dict: ``{'success': True, 'imageType': ..., 'base64Image': ...}`` on
        success, or ``{'success': False, 'error': ...}`` when the Node.js
        script fails or its output cannot be processed.

    Raises:
        ValueError: if ``url`` is missing/empty or ``type`` is not one of
            ``'png'``, ``'jpeg'``, ``'webp'``.
    """
    # A JSON body of `null` reaches us as None; treat it like an empty request
    # so the caller gets the regular "URL parameter is required" error.
    parameters = parameters or {}
    url = parameters.get('url')
    full_page = parameters.get('fullPage', False)
    image_type = parameters.get('type', 'png')
    if not url:
        raise ValueError("URL parameter is required")
    # `type` is caller-controlled and ends up in the temp-file suffix, so only
    # accept known image formats instead of arbitrary text (e.g. path parts).
    if image_type not in ('png', 'jpeg', 'webp'):
        raise ValueError(f"Unsupported image type: {image_type}")

    # Read app config before creating the temp file so a failure here
    # (e.g. no application context) cannot leave a stray file behind.
    headless = current_app.config.get('PUPPETEER_HEADLESS', True)
    executable_path = current_app.config.get('CHROME_PATH')

    # delete=False: the file is only reserved here; the Node.js script writes
    # to it by path after our handle has been closed.
    with tempfile.NamedTemporaryFile(suffix=f'.{image_type}', delete=False) as tmp_file:
        output_path = tmp_file.name

    try:
        # Prepare arguments for the Node.js script
        script_args = {
            'url': url,
            'outputPath': output_path,
            'fullPage': full_page,
            'type': image_type,
            'headless': headless,
            'executablePath': executable_path,
        }
        # Add optional parameters if provided
        for param in ('waitForSelector', 'waitTime', 'viewport', 'userAgent', 'quality'):
            if param in parameters:
                script_args[param] = parameters[param]

        # Execute the Node.js script; check=True turns a non-zero exit into
        # CalledProcessError, handled below.
        completed = subprocess.run(
            ['node', str(SCRIPT_DIR / 'screenshot.js'), json.dumps(script_args)],
            capture_output=True,
            text=True,
            check=True,
        )
        # The script reports its status as JSON on stdout; output that does
        # not parse is reported as a failure by the generic handler below.
        json.loads(completed.stdout)

        with open(output_path, 'rb') as f:
            image_data = f.read()
        return {
            'success': True,
            'imageType': image_type,
            'base64Image': base64.b64encode(image_data).decode('utf-8'),
        }
    except subprocess.CalledProcessError as e:
        # The script prints {"success": false, "error": ...} on stderr; fall
        # back to the raw stderr text when it is not such a JSON object.
        error_message = e.stderr
        try:
            error_data = json.loads(error_message)
        except (TypeError, ValueError):
            error_data = None
        if isinstance(error_data, dict):
            error_message = error_data.get('error', error_message)
        return {'success': False, 'error': error_message}
    except Exception as e:
        return {'success': False, 'error': str(e)}
    finally:
        # Best-effort cleanup on every path; a failed delete must not mask
        # the result or error computed above.
        try:
            os.unlink(output_path)
        except OSError:
            pass
def generate_pdf(parameters):
    """Render a webpage to PDF and return it base64-encoded.

    Args:
        parameters: Mapping of request options (``None`` is treated as empty).
            ``url`` is required. ``printBackground`` defaults to ``True``;
            ``format``, ``margin``, ``waitForSelector``, ``waitTime``,
            ``viewport`` and ``userAgent`` are forwarded to pdf.js when
            present.

    Returns:
        dict: ``{'success': True, 'base64Pdf': ...}`` on success, or
        ``{'success': False, 'error': ...}`` when the Node.js script fails or
        its output cannot be processed.

    Raises:
        ValueError: if ``url`` is missing or empty.
    """
    # A JSON body of `null` reaches us as None; treat it like an empty request
    # so the caller gets the regular "URL parameter is required" error.
    parameters = parameters or {}
    url = parameters.get('url')
    print_background = parameters.get('printBackground', True)
    if not url:
        raise ValueError("URL parameter is required")

    # Read app config before creating the temp file so a failure here
    # (e.g. no application context) cannot leave a stray file behind.
    headless = current_app.config.get('PUPPETEER_HEADLESS', True)
    executable_path = current_app.config.get('CHROME_PATH')

    # delete=False: the file is only reserved here; the Node.js script writes
    # to it by path after our handle has been closed.
    with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as tmp_file:
        output_path = tmp_file.name

    try:
        # Prepare arguments for the Node.js script
        script_args = {
            'url': url,
            'outputPath': output_path,
            'printBackground': print_background,
            'headless': headless,
            'executablePath': executable_path,
        }
        # Add optional parameters if provided
        for param in ('format', 'margin', 'waitForSelector', 'waitTime', 'viewport', 'userAgent'):
            if param in parameters:
                script_args[param] = parameters[param]

        # Execute the Node.js script; check=True turns a non-zero exit into
        # CalledProcessError, handled below.
        completed = subprocess.run(
            ['node', str(SCRIPT_DIR / 'pdf.js'), json.dumps(script_args)],
            capture_output=True,
            text=True,
            check=True,
        )
        # The script reports its status as JSON on stdout; output that does
        # not parse is reported as a failure by the generic handler below.
        json.loads(completed.stdout)

        with open(output_path, 'rb') as f:
            pdf_data = f.read()
        return {
            'success': True,
            'base64Pdf': base64.b64encode(pdf_data).decode('utf-8'),
        }
    except subprocess.CalledProcessError as e:
        # The script prints {"success": false, "error": ...} on stderr; fall
        # back to the raw stderr text when it is not such a JSON object.
        error_message = e.stderr
        try:
            error_data = json.loads(error_message)
        except (TypeError, ValueError):
            error_data = None
        if isinstance(error_data, dict):
            error_message = error_data.get('error', error_message)
        return {'success': False, 'error': error_message}
    except Exception as e:
        return {'success': False, 'error': str(e)}
    finally:
        # Best-effort cleanup on every path; a failed delete must not mask
        # the result or error computed above.
        try:
            os.unlink(output_path)
        except OSError:
            pass
def extract_content(parameters):
    """Extract text or HTML content from a webpage.

    Args:
        parameters: Mapping of request options (``None`` is treated as empty).
            ``url`` is required. ``selector`` (default ``None``) and
            ``extractHtml`` (default ``False``) are always forwarded to
            extract.js; ``waitForSelector``, ``waitTime`` and ``userAgent``
            are forwarded when present.

    Returns:
        dict: ``{'success': True, 'content': ...}`` with the ``content`` value
        reported by the script, or ``{'success': False, 'error': ...}`` when
        the Node.js script fails or its output cannot be parsed.

    Raises:
        ValueError: if ``url`` is missing or empty.
    """
    # A JSON body of `null` reaches us as None; treat it like an empty request
    # so the caller gets the regular "URL parameter is required" error.
    parameters = parameters or {}
    url = parameters.get('url')
    selector = parameters.get('selector')
    extract_html = parameters.get('extractHtml', False)
    if not url:
        raise ValueError("URL parameter is required")

    # Prepare arguments for the Node.js script
    script_args = {
        'url': url,
        'selector': selector,
        'extractHtml': extract_html,
        'headless': current_app.config.get('PUPPETEER_HEADLESS', True),
        'executablePath': current_app.config.get('CHROME_PATH'),
    }
    # Add optional parameters if provided
    for param in ('waitForSelector', 'waitTime', 'userAgent'):
        if param in parameters:
            script_args[param] = parameters[param]

    try:
        # Execute the Node.js script; check=True turns a non-zero exit into
        # CalledProcessError, handled below.
        completed = subprocess.run(
            ['node', str(SCRIPT_DIR / 'extract.js'), json.dumps(script_args)],
            capture_output=True,
            text=True,
            check=True,
        )
        # The script prints {"success": true, "content": ...} on stdout.
        result = json.loads(completed.stdout)
        return {
            'success': True,
            'content': result.get('content'),
        }
    except subprocess.CalledProcessError as e:
        # The script prints {"success": false, "error": ...} on stderr; fall
        # back to the raw stderr text when it is not such a JSON object.
        error_message = e.stderr
        try:
            error_data = json.loads(error_message)
        except (TypeError, ValueError):
            error_data = None
        if isinstance(error_data, dict):
            error_message = error_data.get('error', error_message)
        return {'success': False, 'error': error_message}
    except Exception as e:
        return {'success': False, 'error': str(e)}
# API routes for direct access (not through MCP gateway)
@puppeteer_routes.route('/screenshot', methods=['POST'])
def api_screenshot():
    """Serve POST requests on the blueprint's /screenshot route.

    The request's JSON body is handed to take_screenshot() and its result is
    returned as JSON. Any exception is reported as a JSON error payload with
    HTTP status 400.
    """
    try:
        return jsonify(take_screenshot(request.get_json()))
    except Exception as exc:
        failure = {'success': False, 'error': str(exc)}
        return jsonify(failure), 400
@puppeteer_routes.route('/pdf', methods=['POST'])
def api_pdf():
    """Serve POST requests on the blueprint's /pdf route.

    The request's JSON body is handed to generate_pdf() and its result is
    returned as JSON. Any exception is reported as a JSON error payload with
    HTTP status 400.
    """
    try:
        return jsonify(generate_pdf(request.get_json()))
    except Exception as exc:
        failure = {'success': False, 'error': str(exc)}
        return jsonify(failure), 400
@puppeteer_routes.route('/extract', methods=['POST'])
def api_extract():
    """Serve POST requests on the blueprint's /extract route.

    The request's JSON body is handed to extract_content() and its result is
    returned as JSON. Any exception is reported as a JSON error payload with
    HTTP status 400.
    """
    try:
        return jsonify(extract_content(request.get_json()))
    except Exception as exc:
        failure = {'success': False, 'error': str(exc)}
        return jsonify(failure), 400