import json
import os
import pickle
import sys
from typing import Dict, List, Any, Tuple

import numpy as np
from sentence_transformers import SentenceTransformer

from mcp.server.fastmcp import FastMCP
# Initialize the FastMCP server (tool registry + stdio/SSE transport host)
mcp = FastMCP("MobiFone RAG Server")
# Paths to the source catalogue JSON and the pickled embeddings cache.
# NOTE(review): hard-coded absolute Windows paths — consider deriving from
# __file__ so the server runs from any checkout location.
DATA_FILE_PATH = r"d:\windifybot\mcp-english-tutor\mcp-english-tutor\mobifone_introduce.json"
EMBEDDINGS_FILE_PATH = r"d:\windifybot\mcp-english-tutor\mcp-english-tutor\embeddings.pkl"
# Initialize embedding model (free multilingual model)
# This model works well with Vietnamese text
embedding_model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
class ServiceEmbeddings:
    """Semantic search index over the MobiFone service catalogue.

    Loads the catalogue JSON, flattens it into one record per service,
    embeds each record with the module-level SentenceTransformer, and
    answers queries by cosine similarity. Embeddings are cached on disk
    (pickle) so the model only runs once per catalogue version.
    """

    def __init__(self) -> None:
        # service name -> raw details dict (with the owning '_chapter' added)
        self.services: Dict[str, Dict[str, Any]] = {}
        # service name -> embedding vector
        self.embeddings: Dict[str, np.ndarray] = {}
        # service name -> flattened text that was embedded
        self.service_texts: Dict[str, str] = {}

    def load_data(self) -> Dict[str, Any]:
        """Load the MobiFone catalogue JSON.

        Returns:
            The parsed document; {} if the file is missing; or
            {"error": <message>} on a read/parse failure (kept as a dict
            for backward compatibility — callers inspect, not catch).
        """
        if not os.path.exists(DATA_FILE_PATH):
            return {}
        try:
            with open(DATA_FILE_PATH, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            return {"error": str(e)}

    def _flatten_services(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Flatten chapter -> service nesting into a name -> details map.

        Skips the table of contents ("Mục lục") and the contact section
        ("Thông tin liên hệ"); the owning chapter is recorded under the
        '_chapter' key. Each service dict is shallow-copied so the
        caller's document is not mutated (the original wrote '_chapter'
        into the input).
        """
        root = data.get("Tài liệu giới thiệu hệ sinh thái giải pháp số MobiFone", {})
        skip = {"Mục lục", "Thông tin liên hệ"}
        services: Dict[str, Any] = {}
        for chapter_name, chapter_content in root.items():
            if chapter_name in skip or not isinstance(chapter_content, dict):
                continue
            for service_name, service_details in chapter_content.items():
                if isinstance(service_details, dict):
                    services[service_name] = {**service_details, '_chapter': chapter_name}
        return services

    def get_toc(self) -> Dict[str, Any]:
        """Return the "Mục lục" (table of contents) section, or {} if absent."""
        data = self.load_data()
        root = data.get("Tài liệu giới thiệu hệ sinh thái giải pháp số MobiFone", {})
        return root.get("Mục lục", {})

    def _service_to_text(self, service_name: str, service_details: Dict[str, Any]) -> str:
        """Render one service's details into a single searchable text blob."""
        text_parts = [f"Tên dịch vụ: {service_name}"]
        for key, value in service_details.items():
            if key == '_chapter':
                text_parts.append(f"Chương: {value}")
            elif isinstance(value, str):
                text_parts.append(f"{key}: {value}")
            elif isinstance(value, (int, float)):
                # Numeric fields were silently dropped before; keep them searchable.
                text_parts.append(f"{key}: {value}")
            elif isinstance(value, list):
                text_parts.append(f"{key}: {' '.join(str(v) for v in value)}")
            elif isinstance(value, dict):
                # One level of nesting: keep scalar leaves only.
                dict_text = ' '.join(
                    f"{k}: {v}" for k, v in value.items()
                    if isinstance(v, (str, int, float))
                )
                text_parts.append(f"{key}: {dict_text}")
        return "\n".join(text_parts)

    def build_embeddings(self) -> None:
        """Embed every service and persist the result to disk.

        Progress is logged to stderr: with the MCP stdio transport,
        stdout carries protocol frames and must stay clean.
        """
        print("Loading data...", file=sys.stderr)
        data = self.load_data()
        self.services = self._flatten_services(data)
        print(f"Found {len(self.services)} services", file=sys.stderr)
        self.service_texts = {
            name: self._service_to_text(name, details)
            for name, details in self.services.items()
        }
        names = list(self.service_texts.keys())
        texts = list(self.service_texts.values())
        print("Generating embeddings...", file=sys.stderr)
        vectors = embedding_model.encode(texts, show_progress_bar=True)
        self.embeddings = dict(zip(names, vectors))
        print("Saving embeddings...", file=sys.stderr)
        self.save_embeddings()
        print("Done!", file=sys.stderr)

    def save_embeddings(self) -> None:
        """Pickle embeddings, texts, and service details to EMBEDDINGS_FILE_PATH."""
        data_to_save = {
            'embeddings': self.embeddings,
            'service_texts': self.service_texts,
            'services': self.services,
        }
        with open(EMBEDDINGS_FILE_PATH, 'wb') as f:
            pickle.dump(data_to_save, f)

    def load_embeddings(self) -> None:
        """Load the cached index from disk, building it on first run.

        NOTE: pickle.load must only ever see the locally produced cache
        file — unpickling untrusted data executes arbitrary code.
        """
        if not os.path.exists(EMBEDDINGS_FILE_PATH):
            print("Embeddings file not found. Building embeddings...", file=sys.stderr)
            self.build_embeddings()
            return
        print("Loading embeddings from file...", file=sys.stderr)
        with open(EMBEDDINGS_FILE_PATH, 'rb') as f:
            data = pickle.load(f)
        self.embeddings = data['embeddings']
        self.service_texts = data['service_texts']
        self.services = data['services']
        print(f"Loaded {len(self.embeddings)} service embeddings", file=sys.stderr)

    def search(self, query: str, top_k: int = 3) -> List[Tuple[str, float, Dict[str, Any]]]:
        """Return the top_k services most similar to `query`.

        Results are (name, cosine_similarity, details) tuples sorted by
        similarity descending. Returns [] when the index is empty or
        top_k <= 0 (the original produced a nonsensical slice for
        negative top_k).
        """
        if not self.embeddings or top_k <= 0:
            return []
        query_embedding = np.asarray(embedding_model.encode([query])[0], dtype=float)
        names = list(self.embeddings.keys())
        matrix = np.stack([np.asarray(self.embeddings[n], dtype=float) for n in names])
        # Vectorized cosine similarity. The query norm is hoisted (the
        # original recomputed it once per service) and zero norms are
        # guarded to avoid division by zero.
        denom = np.linalg.norm(matrix, axis=1) * np.linalg.norm(query_embedding)
        denom[denom == 0] = 1e-12
        sims = (matrix @ query_embedding) / denom
        top = np.argsort(sims)[::-1][:top_k]
        return [(names[i], float(sims[i]), self.services[names[i]]) for i in top]
# Initialize the embeddings system
# Module-level singleton shared by the MCP tools below; populated lazily
# via load_embeddings() in the __main__ block.
service_embeddings = ServiceEmbeddings()
@mcp.tool()
def get_mobi_toc() -> str:
    """
    Tool này dùng để lấy toàn bộ danh mục (Mục lục) các dịch vụ và giải pháp của MobiFone.
    Giúp bạn có cái nhìn tổng quan về các nhóm sản phẩm (Cloud, Giải pháp số, Nội dung số) và danh sách các dịch vụ đi kèm.
    """
    # NOTE: the Vietnamese docstring above is kept verbatim — FastMCP
    # exposes it to the client as the tool description.
    # Serialize inside the try so a serialization failure also becomes
    # an error payload rather than an exception.
    try:
        return json.dumps(service_embeddings.get_toc(), ensure_ascii=False, indent=2)
    except Exception as exc:
        return json.dumps({"error": str(exc)}, ensure_ascii=False, indent=2)
@mcp.tool()
def search_services(query: str, top_k: int = 5) -> str:
    """
    Tool này dùng để tìm kiếm thông tin chi tiết về các dịch vụ hoặc giải pháp số của mobifone.
    query: câu hỏi của khách hàng
    top_k: mặc định bằng 5
    Trả về: 1 json chứa thông tin về dịch vụ mà khách hàng muốn biết
    """
    # NOTE: the Vietnamese docstring above is kept verbatim — FastMCP
    # exposes it to the client as the tool description.
    # Clamp to 1..5: the original only capped the upper bound, so a
    # non-positive top_k produced an empty (0) or nonsensical negative
    # slice in the underlying search.
    top_k = max(1, min(top_k, 5))
    try:
        results = service_embeddings.search(query, top_k)
        output: Dict[str, Any] = {"query": query, "results": []}
        for service_name, score, service_details in results:
            output["results"].append({
                "service_name": service_name,
                "relevance_score": round(score, 4),
                "details": service_details,
            })
        return json.dumps(output, ensure_ascii=False, indent=2)
    except Exception as e:
        # Return the failure as JSON so the client model sees a uniform shape.
        return json.dumps({
            "error": str(e),
            "query": query
        }, ensure_ascii=False, indent=2)
if __name__ == "__main__":
    # Warm the index: loads the pickle cache, or builds it on first run.
    service_embeddings.load_embeddings()
    # NOTE(review): debug smoke test left in the production entry point —
    # it runs a sample query and writes output.txt in the CWD before
    # serving. Consider removing or guarding behind a flag.
    test_query = " phân phối lưu lượng"
    result = search_services(test_query, top_k=1)
    with open("output.txt", "w", encoding="utf-8") as f:
        f.write(result)
    # NOTE(review): load_embeddings() prints to stdout; with the stdio
    # transport stdout is the MCP protocol channel — confirm this does
    # not corrupt message framing (logging to stderr is the convention).
    mcp.run(transport="stdio")