"""
Local AI Model Manager for MCP Data Manager
Supports various types of local AI models for data analysis and processing
"""
import os
import json
import asyncio
from typing import List, Dict, Any, Optional, Union
from pathlib import Path
import numpy as np
from datetime import datetime
# AI/ML imports
# These third-party packages are optional. If any of them fails to import,
# AI_AVAILABLE is set to False and a warning is printed; LocalAIModel checks
# this flag and skips model loading when it is False.
try:
    from transformers import pipeline, AutoTokenizer, AutoModel
    from sentence_transformers import SentenceTransformer
    import torch
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.metrics.pairwise import cosine_similarity
    AI_AVAILABLE = True
except ImportError:
    # Only ImportError is handled; any other exception raised while importing
    # these packages propagates to whoever imports this module.
    AI_AVAILABLE = False
    print("Warning: AI dependencies not available. Install with: pip install transformers torch sentence-transformers scikit-learn")
class LocalAIModel:
    """Manage one locally-run AI model and expose text-analysis helpers.

    Supported ``model_type`` values:

    * ``"sentence_transformer"`` - embeddings via ``SentenceTransformer``
    * ``"text_generation"``, ``"text_classification"``,
      ``"sentiment_analysis"`` - Hugging Face ``pipeline`` tasks
    * ``"tfidf"`` - scikit-learn ``TfidfVectorizer``

    The analysis coroutines report failures as ``{"error": "..."}`` dicts
    instead of raising. They call the model synchronously (nothing is
    awaited while the model runs).
    """

    # model_type -> (pipeline task name, human-readable label used in logs)
    _PIPELINE_TASKS = {
        "text_generation": ("text-generation", "text generation"),
        "text_classification": ("text-classification", "text classification"),
        "sentiment_analysis": ("sentiment-analysis", "sentiment analysis"),
    }

    def __init__(self, model_type: str = "sentence_transformer", model_name: str = "all-MiniLM-L6-v2"):
        """Store the model configuration and load the model if possible.

        Args:
            model_type: One of the supported model types listed on the class.
            model_name: Model identifier handed to the underlying library.
        """
        self.model_type = model_type
        self.model_name = model_name
        self.model = None
        self.tokenizer = None
        self.vectorizer = None
        self.is_loaded = False
        # Model cache directory: "<three levels above this file>/models".
        self.cache_dir = Path(__file__).parent.parent.parent / "models"
        # parents=True so a missing intermediate directory does not abort
        # construction.
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        # Load model if AI dependencies are available
        if AI_AVAILABLE:
            self._load_model()

    @staticmethod
    def _item_text(item: Dict[str, Any]) -> str:
        """Join the non-empty name, description and category of *item*.

        Values are passed through ``str`` so a non-string field (e.g. a
        numeric category) does not make the join fail.
        """
        return ' '.join(
            str(item[field])
            for field in ("name", "description", "category")
            if item.get(field)
        )

    def _load_model(self):
        """Load the model described by ``model_type`` / ``model_name``.

        Sets ``is_loaded`` to True on success. Any failure, including an
        unsupported ``model_type``, is printed and leaves ``is_loaded`` False.
        """
        try:
            cache_dir = str(self.cache_dir)
            if self.model_type == "sentence_transformer":
                print(f"Loading sentence transformer model: {self.model_name}")
                self.model = SentenceTransformer(self.model_name, cache_folder=cache_dir)
            elif self.model_type in self._PIPELINE_TASKS:
                task, label = self._PIPELINE_TASKS[self.model_type]
                print(f"Loading {label} model: {self.model_name}")
                # cache_dir is a from_pretrained() option, so it is routed
                # through model_kwargs rather than passed as a pipeline kwarg.
                self.model = pipeline(
                    task,
                    model=self.model_name,
                    model_kwargs={"cache_dir": cache_dir},
                )
            elif self.model_type == "tfidf":
                print("Initializing TF-IDF vectorizer")
                self.vectorizer = TfidfVectorizer(max_features=1000, stop_words='english')
            else:
                # Previously an unknown type fell through and still printed
                # the success message although nothing had been loaded.
                raise ValueError(f"Unsupported model type: {self.model_type}")
            self.is_loaded = True
            print(f"✅ Model loaded successfully: {self.model_type}")
        except Exception as e:
            print(f"❌ Error loading model: {e}")
            self.is_loaded = False

    async def analyze_text(self, text: str) -> Dict[str, Any]:
        """Analyze a single text with the loaded model.

        Args:
            text: The text to analyze.

        Returns:
            A dict whose keys depend on ``model_type`` (``embeddings``,
            ``generated_text``, ``classification``, ``sentiment`` or
            ``tfidf_scores``) plus ``model``; or ``{"error": ...}`` when the
            model is not loaded, the type is unsupported, or analysis fails.
        """
        if not self.is_loaded:
            return {"error": "Model not loaded"}
        try:
            if self.model_type == "sentence_transformer":
                # Get embeddings
                embeddings = self.model.encode(text)
                return {
                    "embeddings": embeddings.tolist(),
                    "embedding_dim": len(embeddings),
                    "model": self.model_name
                }
            if self.model_type == "text_generation":
                # Generate text continuation
                result = self.model(text, max_length=100, num_return_sequences=1)
                return {
                    "generated_text": result[0]["generated_text"],
                    "model": self.model_name
                }
            if self.model_type == "text_classification":
                # Classify text
                return {
                    "classification": self.model(text),
                    "model": self.model_name
                }
            if self.model_type == "sentiment_analysis":
                # Analyze sentiment
                return {
                    "sentiment": self.model(text),
                    "model": self.model_name
                }
            if self.model_type == "tfidf":
                # TF-IDF analysis; the vectorizer is refit on this one text.
                tfidf_matrix = self.vectorizer.fit_transform([text])
                feature_names = self.vectorizer.get_feature_names_out()
                # .tolist() yields plain Python str/float values.
                return {
                    "tfidf_scores": dict(zip(feature_names.tolist(), tfidf_matrix.toarray()[0].tolist())),
                    "model": "TF-IDF"
                }
            # Explicit result instead of implicitly returning None.
            return {"error": f"Unsupported model type: {self.model_type}"}
        except Exception as e:
            return {"error": f"Analysis failed: {str(e)}"}

    async def analyze_items(self, items: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Analyze a list of items.

        Each item's text is its non-empty ``name``, ``description`` and
        ``category`` values joined by spaces.

        Args:
            items: Item dicts; ``id`` is echoed back in per-item results.

        Returns:
            For ``sentence_transformer``: embeddings and the pairwise cosine
            similarity matrix. For ``tfidf``: the TF-IDF matrix and feature
            names. For other types: one ``analyze_text`` result per item with
            non-blank text. An empty ``items`` list yields empty results.
            ``{"error": ...}`` when the model is not loaded or analysis fails.
        """
        if not self.is_loaded:
            return {"error": "Model not loaded"}
        try:
            # Extract text from items
            texts = [self._item_text(item) for item in items]
            if self.model_type == "sentence_transformer":
                if not texts:
                    # Nothing to encode; cosine_similarity rejects empty input.
                    return {
                        "embeddings": [],
                        "similarities": [],
                        "item_count": 0,
                        "model": self.model_name
                    }
                # Get embeddings for all items
                embeddings = self.model.encode(texts)
                # Calculate similarities
                similarities = cosine_similarity(embeddings)
                return {
                    "embeddings": embeddings.tolist(),
                    "similarities": similarities.tolist(),
                    "item_count": len(items),
                    "model": self.model_name
                }
            if self.model_type == "tfidf":
                if not texts:
                    # Fitting a vectorizer on zero documents fails.
                    return {
                        "tfidf_matrix": [],
                        "feature_names": [],
                        "item_count": 0,
                        "model": "TF-IDF"
                    }
                # TF-IDF analysis for all items
                tfidf_matrix = self.vectorizer.fit_transform(texts)
                feature_names = self.vectorizer.get_feature_names_out()
                return {
                    "tfidf_matrix": tfidf_matrix.toarray().tolist(),
                    "feature_names": feature_names.tolist(),
                    "item_count": len(items),
                    "model": "TF-IDF"
                }
            # Analyze each item individually; items with blank text are skipped.
            results = []
            for i, text in enumerate(texts):
                if text.strip():
                    analysis = await self.analyze_text(text)
                    results.append({
                        "item_index": i,
                        "item_id": items[i].get('id'),
                        "analysis": analysis
                    })
            return {
                "item_analyses": results,
                "item_count": len(items),
                "model": self.model_name
            }
        except Exception as e:
            return {"error": f"Batch analysis failed: {str(e)}"}

    async def find_similar_items(self, query: str, items: List[Dict[str, Any]], top_k: int = 5) -> Dict[str, Any]:
        """Find the items most similar to a query.

        Args:
            query: Free-text query.
            items: Candidate item dicts (same text extraction as
                ``analyze_items``).
            top_k: Maximum number of results; values below 0 are treated as 0.

        Returns:
            ``{"query", "similar_items", "model"}`` where ``similar_items`` is
            ordered by descending cosine similarity and each entry carries
            ``item``, ``similarity_score`` and a 1-based ``rank``; or
            ``{"error": ...}`` when the model is not loaded, is not a sentence
            transformer, or the search fails.
        """
        if not self.is_loaded:
            return {"error": "Model not loaded"}
        try:
            if self.model_type != "sentence_transformer":
                return {"error": "Similarity search only supported with sentence transformers"}
            # A negative top_k would otherwise slice from the end and silently
            # drop the tail of the ranking.
            limit = max(top_k, 0)
            if not items or limit == 0:
                return {
                    "query": query,
                    "similar_items": [],
                    "model": self.model_name
                }
            # Get embeddings for query and items
            query_embedding = self.model.encode([query])
            item_embeddings = self.model.encode([self._item_text(item) for item in items])
            # Calculate similarities
            similarities = cosine_similarity(query_embedding, item_embeddings)[0]
            # Get top-k similar items (argsort is ascending, so reverse it)
            top_indices = np.argsort(similarities)[::-1][:limit]
            results = [
                {
                    "item": items[int(idx)],
                    "similarity_score": float(similarities[idx]),
                    "rank": rank
                }
                for rank, idx in enumerate(top_indices, start=1)
            ]
            return {
                "query": query,
                "similar_items": results,
                "model": self.model_name
            }
        except Exception as e:
            return {"error": f"Similarity search failed: {str(e)}"}

    def get_model_info(self) -> Dict[str, Any]:
        """Return the current model configuration and load status."""
        return {
            "model_type": self.model_type,
            "model_name": self.model_name,
            "is_loaded": self.is_loaded,
            "ai_available": AI_AVAILABLE,
            "cache_dir": str(self.cache_dir)
        }

    async def change_model(self, model_type: str, model_name: str) -> Dict[str, Any]:
        """Switch to a different model and return the resulting model info.

        The previous model is discarded before the new one is loaded, so if
        loading fails the instance is left with ``is_loaded`` False.
        """
        self.model_type = model_type
        self.model_name = model_name
        self.model = None
        self.tokenizer = None
        self.vectorizer = None
        self.is_loaded = False
        if AI_AVAILABLE:
            self._load_model()
        return self.get_model_info()
# Global AI model instance
# Created at import time with the constructor defaults
# (model_type="sentence_transformer", model_name="all-MiniLM-L6-v2"). When the
# AI dependencies imported successfully, the constructor loads the model here,
# so importing this module triggers that load.
ai_model = LocalAIModel()