privateGPT MCP Server

# Python/api_server.py import ssl from pathlib import Path from flask import Flask, request, jsonify import logging import json import threading from waitress import serve from flask_cors import CORS # Import von Flask-CORS from agents.AgentInterface.Python.color import Color from agents.SourceManagerAgent.Python.file_tools.loader_factory import LoadersFactory from agents.SourceManagerAgent.Python.local_db import create_sql_table, list_db, add_to_sql_table, \ delete_from_sql_table, \ get_from_sql_table, get_all_db_entries from ...AgentInterface.Python.agent import PrivateGPTAgent from ...AgentInterface.Python.config import Config, ConfigError import os import platform import socket #os.environ.setdefault("USER_AGENT", "PGPT") app = Flask(__name__) # Konfiguration von CORS # Erlaubt spezifische Origins, z.B., 'http://localhost:5500' # Ändern Sie die Origins entsprechend Ihrer tatsächlichen Frontend-URL #CORS(app, resources={r"/*": {"origins": "http://192.168.100.185:5500"}}, supports_credentials=False) # Konfiguration laden import os class FileUploadAgent(PrivateGPTAgent): # Konfigurieren von Logging für Flask (Werkzeug) werkzeug_logger = logging.getLogger('werkzeug') werkzeug_logger.setLevel(logging.ERROR) # Nur Fehler, keine Warnungen werkzeug_handler = logging.FileHandler('flask.log') # Logs in separate Datei werkzeug_handler.setLevel(logging.ERROR) werkzeug_formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s') werkzeug_handler.setFormatter(werkzeug_formatter) werkzeug_logger.addHandler(werkzeug_handler) def display_startup_header(self): server_ip = config.get("api_ip", "0.0.0.0") server_port = config.get("api_port", 8000) api_key_status = "✔️ Set" if self.api_key != "default_api_key" else "❌ Not Set" header = f""" ──────────────────────────────────────────────── Fujitsu PrivateGPT ChatBot Agent - Startup ──────────────────────────────────────────────── System Information: - Hostname : {socket.gethostname()} - Operating Sys : {platform.system()} {platform.release()} - Python Version: {platform.python_version()} Server Configuration: - API Endpoint : http://{server_ip}:{server_port} - API Key Status: {api_key_status} Logs: - Flask Log : flask.log - Agent Log : agent.log ──────────────────────────────────────────────── 🚀 Ready to serve requests! """ print(header) print(f"Current working directory: {os.getcwd()}") def display_help_header(self): header = f""" ──────────────────────────────────────────────── Fujitsu PrivateGPT SourceManager Agent - Commands ──────────────────────────────────────────────── Upload: - upload file: <Filepath> Uploads the content of a file (pdf, csv, xdls, md) - upload content: <Content> Uploads content from user input List: - list: pgpt Lists documents on PGPT server - list: db Lists documents known in local database - info: <ID> Shows info for an id from the local database Delete: - delete: <ID> Deletes a Document from PGPT and local DB - delete: unknown Deletes all Documents on PGPT that are not in the local DB, and delete documents locally that are not on PGPT - delete: all Deletes all Documents in the selected groups ──────────────────────────────────────────────── """ print(header) print(f"Current working directory: {os.getcwd()}") @app.before_request def authenticate(self): # Erlaube OPTIONS-Anfragen ohne Authentifizierung if request.method == 'OPTIONS': return if request.endpoint != 'status': # Optional: Status-Endpoint ohne Auth provided_key = request.headers.get('X-API-KEY') if not provided_key or provided_key != self.api_key: return jsonify({"error": "Unauthorized"}), 401 @app.route('/logs', methods=['GET']) def view_logs(self): """ Endpoint, um das Flask-Log per Browser anzuzeigen. """ try: with open('flask.log', 'r') as log_file: log_content = log_file.read() # Rückgabe des Logs als Text return f"<pre>{log_content}</pre>", 200 # <pre> für eine schönere Darstellung except FileNotFoundError: return "Log file not found.", 404 except Exception as e: return f"An error occurred: {str(e)}", 500 @app.route('/status', methods=['GET']) def status(self): """ Endpoint zur Überprüfung des Serverstatus. """ return jsonify({"status": "PrivateGPT Agent is running."}), 200 def run_api_server(self): # Flask-Server konfigurieren server_ip = config.get("api_ip", "0.0.0.0") server_port = config.get("api_port", 5001) serve(app, host=server_ip, port=int(server_port)) def delete_source(self, source_id, use_ssl=False, accept_self_signed=False): """ Sends a request to the MCP server to delete an existing source. :param server_ip: IP address of the MCP server :param server_port: Port number of the MCP server :param token: Authorization token :param source_id: ID of the source to delete :param use_ssl: Whether to use SSL/TLS for the connection :param accept_self_signed: Whether to accept self-signed certificates :return: Response from the server """ payload = { "command": "delete_source", "token": self.token, "arguments": { "sourceId": source_id } } payload_json = json.dumps(payload) raw_socket = None client_socket = None try: raw_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) raw_socket.settimeout(10) if use_ssl: context = ssl.create_default_context() if accept_self_signed: context.check_hostname = False context.verify_mode = ssl.CERT_NONE client_socket = context.wrap_socket(raw_socket, server_hostname=self.server_ip) else: client_socket = raw_socket client_socket.connect((self.server_ip, self.server_port)) client_socket.sendall(payload_json.encode('utf-8')) response = b"" while True: part = client_socket.recv(4096) if not part: break response += part return response.decode('utf-8') except ssl.SSLError: return "Error: Server and/or client may require TLS encryption. Please enable SSL/TLS." except Exception as e: return f"Error: {e}" finally: if client_socket is not None: try: client_socket.shutdown(socket.SHUT_RDWR) except: pass client_socket.close() def send_list_sources_request(self, group_name, use_ssl=False, accept_self_signed=False): """ Sends a request to list sources in a specific group to the MCP server. :param server_ip: IP address of the MCP server :param server_port: Port number of the MCP server :param token: Authentication token :param group_name: Name of the group to list sources from :param use_ssl: Whether to use SSL/TLS for the connection :param accept_self_signed: Whether to accept self-signed certificates :return: Response from the server """ payload = { "command": "list_sources", "token": self.token, "attributes": { "groupName": group_name } } payload_json = json.dumps(payload) raw_socket = None client_socket = None try: raw_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) raw_socket.settimeout(10) if use_ssl: context = ssl.create_default_context() if accept_self_signed: context.check_hostname = False context.verify_mode = ssl.CERT_NONE client_socket = context.wrap_socket(raw_socket, server_hostname=self.server_ip) else: client_socket = raw_socket client_socket.connect((self.server_ip, self.server_port)) client_socket.sendall(payload_json.encode('utf-8')) response = b"" while True: part = client_socket.recv(4096) if not part: break response += part return response.decode('utf-8') except ssl.SSLError: return "Error: Server and/or client may require TLS encryption. Please enable SSL/TLS." except Exception as e: return f"Error: {e}" finally: if client_socket is not None: try: client_socket.shutdown(socket.SHUT_RDWR) except: pass client_socket.close() def send_create_source_request(self, name, content, groups, use_ssl=False, accept_self_signed=False): """ Sends a request to create a new source to the MCP server. :param server_ip: IP address of the MCP server :param server_port: Port number of the MCP server :param token: Authentication token :param name: Name of the new source :param content: Content to be formatted as markdown :param groups: List of groups to assign the source to :param use_ssl: Whether to use SSL/TLS for the connection :param accept_self_signed: Whether to accept self-signed certificates :return: Response from the server """ payload = { "command": "create_source", "token": self.token, "arguments": { "name": name, "content": content, "groups": groups or [] } } # Convert the payload to a JSON string payload_json = json.dumps(payload) raw_socket = None client_socket = None try: # Create a socket object raw_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) raw_socket.settimeout(10) # Establish SSL/TLS connection if required if use_ssl: context = ssl.create_default_context() if accept_self_signed: context.check_hostname = False context.verify_mode = ssl.CERT_NONE client_socket = context.wrap_socket(raw_socket, server_hostname=self.server_ip) else: client_socket = raw_socket # Connect to the server client_socket.connect((self.server_ip, self.server_port)) # Send the request client_socket.sendall(payload_json.encode('utf-8')) # Receive the response response = b"" while True: part = client_socket.recv(4096) if not part: break response += part # Decode the response return response.decode('utf-8') except ssl.SSLError: return "Error: Server and/or client may require TLS encryption. Please enable SSL/TLS." except Exception as e: return f"Error: {e}" finally: if client_socket is not None: try: client_socket.shutdown(socket.SHUT_RDWR) except: pass client_socket.close() def run(self): if not self.token: logging.error(self.get_lang_message("authentication_failed")) print(self.get_lang_message("authentication_failed"), flush=True) return # personal_groups = self.list_personal_groups() groups = self.chosen_groups #for group in groups: db = Path.absolute(Path(__file__).parent.parent / f"database/documents.sql") create_sql_table(db) #will only be created if not existent welcome_msg = f"{Color.OKGREEN}{self.get_lang_message('welcome')}{Color.ENDC}" print(welcome_msg, flush=True) logging.info(self.get_lang_message("user_interface_started")) while True: try: user_input = input(f"{Color.OKBLUE}{self.get_lang_message('user_question')}{Color.ENDC}") if user_input.strip().lower() == "delete: unknown": available_documents = self.send_list_sources_request(groups[0]) sources = json.loads(available_documents)["sources"] for sourceid in sources: db_entry = get_from_sql_table(db, sourceid) if db_entry is None: print("deleting: " + sourceid) response = self.delete_source(sourceid) print(response) else: print("keeping: " + sourceid + " " + db_entry.file + " " + db_entry.content[:100]) print("Cleaning local database of deleted entries...") available_documents = self.send_list_sources_request(groups[0]) sources = json.loads(available_documents)["sources"] db_entries = get_all_db_entries(db) for document in db_entries: if document.id not in sources: delete_from_sql_table(db, document.id) print(".. done.") elif user_input.strip().lower() == "delete: all": available_documents = self.send_list_sources_request(groups[0]) sources = json.loads(available_documents)["sources"] for sourceid in sources: response = self.delete_source(sourceid) print(response) elif user_input.strip().lower().startswith("delete: "): sourceid = user_input.strip().lower()[8:] print(sourceid) #if success delete from db... response = self.delete_source(sourceid) delete_from_sql_table(db, sourceid) elif user_input.strip().lower() == "list: pgpt": print("Documents in " + groups[0] + ":") available_documents = self.send_list_sources_request(groups[0]) sources = json.loads(available_documents)["sources"] print(sources) elif user_input.strip().lower() == "list: db": list_db(db) elif user_input.strip().startswith("info: "): sourceid = user_input.strip().lower()[6:] document = get_from_sql_table(db, sourceid) if document is not None: print("Id: " + document.id + " File: " + document.file + " User: " + document.user + " Group: " + document.groups + " Content: " + document.content ) else: print("No entry found in local database") elif user_input.strip().lower() == "exit": goodbye_msg = f"{Color.OKGREEN}{self.get_lang_message('goodbye')}{Color.ENDC}" print(goodbye_msg, flush=True) logging.info(self.get_lang_message("session_ended")) break elif user_input.strip().lower().startswith("upload file: "): file_path = user_input.strip()[13:].replace("\\", "\\\\").replace("\"", "") # Get the file extension file_extension = os.path.splitext(file_path)[1] print(f"File Extension: {file_extension}") content = "" if file_extension == ".pdf": content = LoadersFactory().pdf(file_path) # todo pgpt is not happy with all formats elif file_extension == ".csv": content = LoadersFactory().csv(file_path) elif file_extension == ".xlsx": content = LoadersFactory().xlsx(file_path) elif file_extension == ".md": content = LoadersFactory().markdown(file_path) #todo add more sources markdown = LoadersFactory().convert_documents_to_markdown(content) print(markdown) result = self.send_create_source_request(os.path.basename(file_path), markdown, groups) parsed_result = json.loads(result) if "documentId" in parsed_result: answer = parsed_result["documentId"] head, tail = os.path.split(file_path) file = tail print(file) add_to_sql_table(db, parsed_result["documentId"], markdown, str(groups), file, self.email) print(f"{Color.OKGREEN}{self.get_lang_message('agent_answer', answer=answer)}{Color.ENDC}", flush=True) else: error = parsed_result["error"] print(f"{Color.FAIL}{self.get_lang_message('agent_error', error=error)}{Color.ENDC}", flush=True) elif user_input.strip().lower().startswith("upload content: "): result = self.send_create_source_request(user_input, user_input, groups) parsed_result = json.loads(result) if "documentId" in parsed_result: answer = parsed_result["documentId"] file = "User Input" add_to_sql_table(db, parsed_result["documentId"], user_input, str(groups), file, self.email) print(f"{Color.OKGREEN}{self.get_lang_message('agent_answer', answer=answer)}{Color.ENDC}", flush=True) else: error = parsed_result["error"] print(f"{Color.FAIL}{self.get_lang_message('agent_error', error=error)}{Color.ENDC}", flush=True) except (KeyboardInterrupt, EOFError): goodbye_msg = f"{Color.OKGREEN}{self.get_lang_message('goodbye')}{Color.ENDC}" print(goodbye_msg, flush=True) logging.info(self.get_lang_message("session_interrupted")) break if __name__ == '__main__': # Starten des API-Servers in einem separaten Daemon-Thread try: config_file = Path.absolute(Path(__file__).parent.parent / "config.json") config = Config(config_file=config_file, required_fields=["server_ip", "server_port", "email", "password"]) except ConfigError as e: logging.error(f"Configuration Error: {e}") exit(1) agent = FileUploadAgent(config=config) api_thread = threading.Thread(target=agent.run_api_server) api_thread.daemon = True # Daemon-Thread api_thread.start() # Call this function right before the server starts agent.display_startup_header() agent.display_help_header() # Starten des manuellen Chat-Interfaces agent.run()