from .utils import catch_error_and_return
from qdrant_client.models import VectorParams, Distance, PointStruct, PointIdsList
from qdrant_client import QdrantClient
from typing import List, Dict, Any
from dotenv import load_dotenv
from fastmcp import FastMCP
import numpy as np
import requests
import os
# Load variables from a .env file into the environment before the tools below read them via os.getenv.
load_dotenv()
# FastMCP server instance; the functions below register on it through the @qdrant_mcp.tool decorator.
qdrant_mcp = FastMCP(name="Qdrant server")
def create_connection_qdrant():
    """
    Build a Qdrant client from environment configuration
    :return: QdrantClient constructed with the `QDRANT_URL` and `QDRANT_API_KEY` environment values
    """
    return QdrantClient(
        url=os.getenv("QDRANT_URL"),
        api_key=os.getenv("QDRANT_API_KEY"),
    )
@qdrant_mcp.tool
@catch_error_and_return
def vector_create_collection(name: str, vector_size: int) -> str:
    """
    Create a vector collection unless one with this name already exists
    :param name: collection name
    :param vector_size: size of the vectors stored in the collection
    :return: message saying whether the collection was created or already existed
    """
    qdrant = create_connection_qdrant()

    # Guard clause: an existing collection is left untouched.
    if qdrant.collection_exists(name):
        return f"Collection `{name}` already exists!"

    # New collections are configured with cosine distance.
    params = VectorParams(size=vector_size, distance=Distance.COSINE)
    qdrant.create_collection(collection_name=name, vectors_config=params)
    return f"Collection `{name}` success created!"
@qdrant_mcp.tool
@catch_error_and_return
def vector_get_collection_info(name: str) -> str:
    """
    Describe an existing collection
    :param name: collection name
    :return: JSON dump of the collection info, or a message when the collection does not exist
    """
    qdrant = create_connection_qdrant()

    # Report a missing collection up front instead of querying it.
    if not qdrant.collection_exists(name):
        return f"Collection `{name}` doesn't exist!"

    return qdrant.get_collection(name).model_dump_json()
@qdrant_mcp.tool
@catch_error_and_return
def vector_upsert_points(
    collection_name: str,
    vectors: List[List[float]],
    data: List[Dict[str, Any]],
    batch_size: int = 50,
) -> str:
    """
    Add/update vectors in Qdrant collection
    :param collection_name: Name of the collection
    :param vectors: List of vectors (shape: [n_vectors, vector_dim]), one per item in `data`
    :param data: List of dictionaries with point data. Structure:
        [
            {
                "id": int,          # Unique point ID
                "payload": {        # Metadata for filtering/search
                    "text": str,    # Text content (required)
                    ...             # Any custom fields
                },
                # vector is passed separately in `vectors` param
            },
            ...
        ]
    :param batch_size: Batch size for insertion, must be >= 1. default 50
    :return: Operation status message, or an error description when the input is invalid
    """
    # A zero step makes range() raise and a negative step makes it empty, which
    # would report success without writing anything; reject both explicitly.
    if batch_size < 1:
        return str(
            ValueError(f"batch_size must be a positive integer, got {batch_size}")
        )

    # Going through numpy normalizes the input and exposes the number of vectors.
    vectors = np.array(vectors)
    if len(data) != vectors.shape[0]:
        return str(
            ValueError(
                f"Data/vectors length mismatch: {len(data)} items vs {vectors.shape[0]} vectors"
            )
        )

    # Convert the vectors back to a list of lists for the point structures.
    vectors_list = vectors.tolist()
    points = [
        PointStruct(id=item["id"], vector=vector, payload=item["payload"])
        for item, vector in zip(data, vectors_list)
    ]

    # Nothing is printed here: stdout may carry the MCP protocol stream.
    client = create_connection_qdrant()
    for start in range(0, len(points), batch_size):
        batch = points[start:start + batch_size]
        client.upsert(collection_name=collection_name, points=batch, wait=True)
    return f"Successfully upsert {len(data)} vectors"
@qdrant_mcp.tool
@catch_error_and_return
def vector_search(
    name_collection: str, query_vector: List[float], limit: int = 5
) -> list:
    """
    Find the points closest to a query vector
    :param name_collection: name of the collection to search
    :param query_vector: query vector
    :param limit: maximum number of closest points to return
    :return: list with one `model_dump()` result per hit returned by the search
    """
    qdrant = create_connection_qdrant()
    found = qdrant.search(
        collection_name=name_collection,
        # The query is handed to the client as a numpy array.
        query_vector=np.array(query_vector),
        limit=limit,
    )
    return [point.model_dump() for point in found]
@qdrant_mcp.tool
@catch_error_and_return
def vector_delete_points(name_collection: str, ids: List[int]) -> str:
    """
    Remove points from a collection by id
    :param name_collection: name of the collection
    :param ids: ids of the points to delete
    :return: status message
    """
    qdrant = create_connection_qdrant()
    # Wrap the raw ids in the selector type passed to the delete call.
    selector = PointIdsList(points=ids)
    qdrant.delete(collection_name=name_collection, points_selector=selector)
    return "Deleting vectors success!"
@qdrant_mcp.tool
def health_check():
    """
    health check for server
    :return: `{"status": "ok"}` when the endpoint answers with the expected text,
        otherwise `{"status": "Error ..."}` describing the failure
    """
    url = os.getenv("QDRANT_HEALTH_CHECK_URL")
    api_key = os.getenv("QDRANT_API_KEY")
    # Send credentials only when a key is configured; otherwise the header
    # would carry the literal text "Bearer None".
    headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
    try:
        # Bounded wait so an unreachable server cannot hang the tool call.
        result = requests.get(url, headers=headers, timeout=10)
    except Exception as ex:
        return {"status": f"Error {ex}"}

    if result.text.strip() == "healthz check passed":
        return {"status": "ok"}

    # The server answered but did not report healthy: say so explicitly
    # instead of returning None. The body is truncated to keep the message short.
    return {
        "status": f"Error unexpected response (HTTP {result.status_code}): {result.text[:200]}"
    }