test_server.pyโข2.96 kB
import json
import uuid
import time
import threading
from sseclient import SSEClient
from openai import OpenAI
import os
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
VECTOR_STORE_ID = os.getenv("VECTOR_STORE_ID")
SERVER_URL = "http://localhost:8000/sse"
client = OpenAI(api_key=OPENAI_API_KEY)
# ---- Utility: vector store file list ----
def list_vector_store_files():
print("\n๐ Loading vector store files...\n")
files = client.vector_stores.files.list(vector_store_id=VECTOR_STORE_ID)
items = files.data
if not items:
print("โ No files found")
return None
for f in items:
print(f"- {f.id} | {f.filename}")
return items[0].id # return first file id
# ---- Send event to MCP server ----
def send_mcp_message(sse_connection, message_type, payload):
"""
FastMCP๋ SSE ์ก์ ์ ์ํด connection ๊ฐ์ฒด์ send_event ์ฌ์ฉ
๋จ, SSEClient๋ read-only ์ด๋ฏ๋ก ๋ณ๋ thread๋ก ์๋ฒ์ ์ด๋ฒคํธ POST
FastMCP์์๋ /sse endpoint๋ก ์ด๋ฒคํธ POST ๊ฐ๋ฅ
"""
import requests
data = {
"type": message_type,
**payload,
"message_id": str(uuid.uuid4())
}
print(f"\nโก๏ธ Sending MCP message ({message_type}):")
print(json.dumps(data, indent=2, ensure_ascii=False))
r = requests.post(
SERVER_URL,
json=data,
headers={"Content-Type": "application/json"}
)
print("POST status:", r.status_code)
if r.status_code >= 400:
print("Server returned error:")
print(r.text)
# ---- Main logic ----
def run_test():
print("\n๐ Connecting to MCP server SSE stream...\n")
messages = SSEClient(SERVER_URL)
# Start receiving in another thread
def listen():
for msg in messages:
if not msg.data.strip():
continue
print("\n๐ฅ MCP EVENT RECEIVED:")
try:
parsed = json.loads(msg.data)
print(json.dumps(parsed, indent=2, ensure_ascii=False))
except:
print(msg.data)
listener = threading.Thread(target=listen, daemon=True)
listener.start()
time.sleep(1)
# Search test
send_mcp_message(
messages,
"call_tool",
{
"name": "search",
"arguments": {"query": "example"}
}
)
time.sleep(2)
# Fetch test (auto detect file)
file_id = list_vector_store_files()
if file_id:
send_mcp_message(
messages,
"call_tool",
{
"name": "fetch",
"arguments": {"id": file_id}
}
)
else:
print("\nโ ๏ธ No vector store files โ skipping fetch test")
# Keep listening a bit longer
print("\nโ Waiting for responses...")
time.sleep(10)
if __name__ == "__main__":
run_test()