Skip to main content
Glama

privateGPT MCP Server

by Fujitsu-AI
Api.py18.5 kB
import json import os import posixpath from pathlib import Path from time import sleep import paramiko import requests import urllib3 import base64 from httpcore import NetworkError from clients.Gradio.config import Config urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) def initialize_session(proxy_user, proxy_password, access_header): """Set up the session with proxy authentication.""" session = requests.Session() session.verify = False headers = { 'Accept': 'application/json', 'Content-Type': 'application/json', } if access_header is not None: headers['X-Custom-Header'] = access_header elif proxy_user is not None and proxy_password is not None: auth = base64.b64encode(f"{proxy_user}:{proxy_password}".encode()).decode() headers['Authorization'] = f'Basic {auth}' session.headers.update(headers) return session class PrivateGPTAPI: def __init__(self, config, client_api_key=None): """Initialize the chat client with proxy authentication.""" self.token = None self.chat_id = None self.base_url = config.get("base_url") self.proxy_user = config.get("proxy_user", None) if self.proxy_user == "": self.proxy_user = None self.proxy_password = config.get("proxy_password", None) if self.proxy_password == "": self.proxy_password = None self.access_header = config.get("access_header", None) if self.access_header == "": self.access_header = None self.chosen_groups = config.get("groups", []) self.language = config.get("language", "en") self.use_public = config.get("use_public", False) self.whitelist_keys = config.get("whitelist_keys", []) self.logged_in = False if client_api_key is not None: self.email, self.password = decrypt_api_key(client_api_key) if len(self.whitelist_keys) > 0: if client_api_key not in self.whitelist_keys: print("not authorized") else: self.email = config.get("email", None) self.password = config.get("password", None) self.ftp_password = config.get("ftp_password", None) self.session = initialize_session(self.proxy_user, self.proxy_password, self.access_header) if self.login(): self.logged_in = True if self.ftp_password is not None: self.ftp_host = config.get("ftp_host", None) self.ftp_port = config.get("ftp_port", None) self.ftp_folder = config.get("ftp_folder", "/") self.ftp_subfolder = config.get("ftp_subfolder", "temp") def login(self): """Authenticate the user and retrieve the token.""" url = f"{self.base_url}/login" payload = {"email": self.email, "password": self.password} try: response = self.session.post(url, json=payload) print(response.content) response.raise_for_status() data = response.json() self.token = data['data']['token'] # Prüfen, ob der Header bereits existiert if 'Authorization' in self.session.headers: self.session.headers['Authorization'] += f', Bearer {self.token}' else: self.session.headers['Authorization'] = f'Bearer {self.token}' self.chat_id = None print("✅ Login successful.") return True except requests.exceptions.RequestException as e: print(f"❌ Login failed: {e}") return False def create_chat(self, user_input): """Start a new chat session. This method sends a POST request to the '/chats' endpoint with the provided parameters. It initializes a new chat session and stores the chat ID for future use. """ url = f"{self.base_url}/chats" payload = { "language": self.language, "question": user_input, # Initial question to start the chat "usePublic": self.use_public, "groups": self.chosen_groups } try: response = self.session.post(url, json=payload) response.raise_for_status() # Raise an exception if the response was not successful data = response.json() self.chat_id = data['data']['chatId'] # Store the chat ID for future use print("✅ Chat initialized.") resp = response.json() try: answer = resp.get('data', None).get('answer', "error") except: print(response.json()) resp = {"data": {"answer": "error"} } answer = "error" if answer.startswith("{\"role\":"): answerj = json.loads(answer) resp["data"]["answer"] = answerj["content"] resp["data"]["chatId"] = "0" print(f"💡 Response: {answer}") return resp except requests.exceptions.RequestException as e: # It seems we get disconnections from time to time.. # print(f"⚠️ Failed to get response on first try, trying again..: {e}") try: response = self.session.patch(url, json=payload) response.raise_for_status() data = response.json() answer = data.get('data', {}).get('answer', "No answer provided.") print(f"💡 Response: {answer}") return data except: print(f"❌ Failed to get response: {e}") return {"error": f"❌ Failed to get response: {e}"} def list_personal_groups(self): url = f"{self.base_url}/groups" try: resp = self.session.get(url) try: j = json.loads(resp.content) data_block = j["data"] if not data_block: return [] personal = data_block.get("personalGroups", []) return personal except: return [] except NetworkError as e: return [] def get_document_info(self, id): url = f"{self.base_url}/sources/{id }" try: resp = self.session.get(url) j = json.loads(resp.content) data_block = j["data"] if not data_block: return [] return data_block except NetworkError as e: return [] def query_private_gpt(self, user_input) -> json: """Send a question to the chat and retrieve the response.""" if not self.chat_id: print("❌ Chat session not initialized.") return False url = f"{self.base_url}/chats/{self.chat_id}" payload = {"question": user_input} try: response = self.session.patch(url, json=payload) # response.raise_for_status() resp = response.json() try: answer = resp.get('data', None).get('answer', "error") except: print(response.json()) resp = {"data": {"answer": "error"} } answer = "error" if answer.startswith("{\"role\":"): answerj = json.loads(answer) resp["data"]["answer"] = answerj["content"] resp["data"]["chatId"] = "0" print(f"💡 Response: {answer}") return resp except requests.exceptions.RequestException as e: # It seems we get disconnections from time to time.. # print(f"⚠️ Failed to get response on first try, trying again..: {e}") try: response = self.session.patch(url, json=payload) response.raise_for_status() data = response.json() answer = data.get('data', {}).get('answer', "No answer provided.") print(f"💡 Response: {answer}") return data except: print(f"❌ Failed to get response: {e}") return {"error": f"❌ Failed to get response: {e}"} def add_source(self, markdown, groups, name): """Send a source id to retrieve details. Working with version 1.3.3 and newer""" url = f"{self.base_url}/sources" try: payload = { "name": name, "groups": groups, "content": markdown } resp = self.session.post(url, json=payload) j = json.loads(resp.content) data_block = j["data"] if not data_block: return [] return data_block except requests.exceptions.RequestException as e: print(f"❌ Failed to get response: {e}") return {"error": f"❌ Failed to get response: {e}"} def update_source(self, source_id, markdown=None, groups=None, name=None): """Edit an existing Source""" url = f"{self.base_url}/sources/{source_id}" try: payload = {} if groups is None: existing_groups = self.get_document_info(source_id)["groups"] payload["groups"] = existing_groups else: payload["groups"] = groups if markdown is not None: payload["content"] = markdown if name is not None: payload["name"] = name resp = self.session.patch(url, json=payload) j = json.loads(resp.content) data_block = j["data"] if not data_block: return [] return data_block except requests.exceptions.RequestException as e: print(f"❌ Failed to get response: {e}") return {"error": f"❌ Failed to get response: {e}"} def delete_source(self, source_id): """Send a source id to retrieve details. Working with version 1.3.3 and newer""" url = f"{self.base_url}/sources/{source_id}" try: resp = self.session.delete(url) j = json.loads(resp.content) message = j["message"] if not message: return "failed" return message except requests.exceptions.RequestException as e: print(f"❌ Failed to get response: {e}") return {"error": f"❌ Failed to get response: {e}"} def upload_sftp(self, file_path): # Connect to SFTP to determine existing suffixes transport = paramiko.Transport((self.ftp_host, self.ftp_port)) transport.connect(username=self.email, password=self.ftp_password) sftp = paramiko.SFTPClient.from_transport(transport) remote_base_dir = posixpath.join(self.ftp_folder, self.ftp_subfolder) # Ensure the remote directory exists try: sftp.chdir(remote_base_dir) except IOError: # Create remote dirs if missing parts = remote_base_dir.strip("/").split("/") path = "" for part in parts: path = posixpath.join(path, part) try: sftp.chdir(path) except IOError: sftp.mkdir(path) sftp.chdir(path) # Determine remote file name remote_filename = os.path.basename(file_path) remote_path = posixpath.join(remote_base_dir, remote_filename) # Upload the file try: sftp.put(file_path, remote_path) print(f"Uploaded {file_path} to {remote_path} successfully.") except Exception as e: print(e) finally: sftp.close() transport.close() print(f"Connection closed") sources = [] while len(sources) == 0: print(f"Checking file status") sleep(2) sources = self.get_sources_from_group("temp") return sources def get_sources_from_group(self, group): """Send a source id to retrieve details. Working with version 1.3.3 and newer""" url = f"{self.base_url}/sources/groups" try: payload = { "groupName": group } resp = self.session.post(url, json=payload) j = json.loads(resp.content) data_block = j["data"] if not data_block: return [] sources = [] for source in data_block["sources"]: doc = self.get_document_info(source) sources.append(doc) return sources except requests.exceptions.RequestException as e: print(f"❌ Failed to get response: {e}") return [] def respond_with_context(self, messages, response_format=None, request_tools=None): last_user_message = next((p for p in reversed(messages) if p["role"] == "user"), None) user_input = "" for message in messages: if message["role"] == "system": user_input = str(message) + "\n" if last_user_message is not None: user_input += last_user_message["content"] last_assistant_message = next((p for p in reversed(messages) if p["role"] == "assistant"), None) last_tool_message = next((p for p in reversed(messages) if p["role"] == "tool"), None) hastoolresult = False if last_tool_message is not None and last_assistant_message is not None and last_assistant_message.tool_calls is not None and len( last_assistant_message.tool_calls) > 0: user_input += "\nYou called the tool: " + str( last_assistant_message.tool_calls[0]) + ". The result was: " + last_tool_message.content hastoolresult = True print(f"💁 Request: " + user_input) # PGPT manages history and context itself so we don't need to forward the history. add_context = False if add_context: messages.pop() user_input += "\nHere is some context about the previous conversation:\n" for message in messages: user_input += f"{message.role}: {message.content}\n" if response_format is not None: print("Response format: " + str(response_format)) user_input += add_response_format(response_format) if request_tools is not None and not hastoolresult: user_input += add_tools(request_tools, last_tool_message) if not self.logged_in: self.login() else: if self.chat_id is None: result = self.create_chat(user_input) else: result = self.query_private_gpt(user_input) if 'data' in result: response_data = result.get("data") if request_tools is not None and not hastoolresult and is_json( clean_response(response_data.get("answer"))): response_data["tool_call"] = clean_response(response_data.get("answer", "")) return response_data elif 'error' in result: # Try to login again and send the query once more on error. if self.login(): if self.chat_id is None: result = self.create_chat(user_input) else: result = self.query_private_gpt(user_input) if 'data' in result: return result['data'] else: return result else: return result def is_json(myjson): try: json.loads(myjson) except ValueError as e: return False return True def add_response_format(response_format): # prompt = "\nPlease fill in the following template with realistic and appropriate information. Be creative. The field 'type' defines the output format. In your reply, only return the generated json\n" prompt = "\nPlease fill in the following json template with realistic and appropriate information. In your reply, only return the generated json. If you can't answer return an empty json.\n" prompt += json.dumps(response_format) return prompt def add_tools(response_tools, last_tool_message): prompt = "\nPlease select the fitting provided tool to create your answer. Only return the generated result of the tool. Do not describe what you are doing, just return the json.\n" index = 1 for tool in response_tools: prompt += "\n" + json.dumps(tool) + "\n" index += 1 return prompt def clean_response(response): # Remove artefacts from reply here response = response.replace("[TOOL_CALLS]", "") return response def decrypt_api_key(api_key): """ This is PoC code and methods should be replaced with a more secure way to deal with credentials (e.g. in a db) """ try: base64_bytes = api_key.encode("ascii") decoded_string_bytes = base64.b64decode(base64_bytes) decoded_key = decoded_string_bytes.decode("ascii") except Exception as e: print(e) decoded_key = "invalid:invalid" return decoded_key.split(":")[0], decoded_key.split(":")[1] def main(): """Main function to run the chat application.""" config_file = Path.absolute(Path(__file__).parent / "config.json") config = Config(config_file=config_file, required_fields=["base_url"]) chat = PrivateGPTAPI(config) print("Type your questions below. Type 'quit' to exit.") while True: try: question = input("❓ Question: ").strip() if question.lower() == 'quit': break if question: if chat.chat_id is None: chat.create_chat(question) else: chat.query_private_gpt(question) except KeyboardInterrupt: print("\nExiting chat...") break except Exception as e: print(f"❌ Error: {str(e)}") break if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Fujitsu-AI/MCP-Server-for-MAS-Developments'

If you have feedback or need assistance with the MCP directory API, please join our Discord server