File Finder MCP Server
from http.server import BaseHTTPRequestHandler, HTTPServer
import json
import os
import tempfile
import base64
from urllib.parse import parse_qs, urlparse
# Try to import faster-whisper, provide clear error if not installed
try:
from faster_whisper import WhisperModel
except ImportError:
print("Error: faster-whisper is not installed. Please install it with:")
print("pip install faster-whisper")
exit(1)
# Initialize the model (small model by default)
model_size = "small"
# Run on GPU if available
compute_type = "auto"
try:
model = WhisperModel(model_size, device="auto", compute_type=compute_type)
print(f"Loaded faster-whisper model: {model_size}")
except Exception as e:
print(f"Error loading model: {e}")
exit(1)
class WhisperHandler(BaseHTTPRequestHandler):
def do_POST(self):
if self.path.startswith("/transcribe"):
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length)
try:
data = json.loads(post_data.decode('utf-8'))
# Check if audio data is provided
if 'audio' not in data:
self.send_error(400, "Missing audio data")
return
# Get audio data from base64
audio_data = base64.b64decode(data['audio'])
# Get language if provided, otherwise auto-detect
language = data.get('language', None)
# Create a temporary file for the audio
with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as temp_file:
temp_filename = temp_file.name
temp_file.write(audio_data)
try:
# Transcribe the audio
segments, info = model.transcribe(
temp_filename,
language=language,
beam_size=5,
vad_filter=True
)
# Collect the segments
result = []
for segment in segments:
result.append({
"start": segment.start,
"end": segment.end,
"text": segment.text.strip()
})
# Prepare the response
response = {
"text": " ".join([segment["text"] for segment in result]),
"segments": result,
"language": info.language,
"language_probability": info.language_probability
}
# Send the response
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(response, indent=4).encode("utf-8"))
finally:
# Clean up the temporary file
if os.path.exists(temp_filename):
os.unlink(temp_filename)
except json.JSONDecodeError:
self.send_error(400, "Invalid JSON")
except Exception as e:
self.send_error(500, f"Error processing audio: {str(e)}")
elif self.path.startswith("/health"):
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"status": "ok", "model": model_size}).encode("utf-8"))
else:
self.send_error(404, "Not Found")
def do_GET(self):
if self.path.startswith("/health"):
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"status": "ok", "model": model_size}).encode("utf-8"))
else:
self.send_error(404, "Not Found")
if __name__ == "__main__":
server_address = ("localhost", 8081) # Using port 8081 to avoid conflict with file finder server
httpd = HTTPServer(server_address, WhisperHandler)
print("Whisper STT сервис запущен на http://localhost:8081")
httpd.serve_forever()