privateGPT MCP Server
by Fujitsu-AI
- agents
- SourceManagerAgent
- Python
# Python/api_server.py
import ssl
from pathlib import Path
from flask import Flask, request, jsonify
import logging
import json
import threading
from waitress import serve
from flask_cors import CORS # Import von Flask-CORS
from agents.AgentInterface.Python.color import Color
from agents.SourceManagerAgent.Python.file_tools.loader_factory import LoadersFactory
from agents.SourceManagerAgent.Python.local_db import create_sql_table, list_db, add_to_sql_table, \
delete_from_sql_table, \
get_from_sql_table, get_all_db_entries
from ...AgentInterface.Python.agent import PrivateGPTAgent
from ...AgentInterface.Python.config import Config, ConfigError
import os
import platform
import socket
#os.environ.setdefault("USER_AGENT", "PGPT")
app = Flask(__name__)
# Konfiguration von CORS
# Erlaubt spezifische Origins, z.B., 'http://localhost:5500'
# Ändern Sie die Origins entsprechend Ihrer tatsächlichen Frontend-URL
#CORS(app, resources={r"/*": {"origins": "http://192.168.100.185:5500"}}, supports_credentials=False)
# Konfiguration laden
import os
class FileUploadAgent(PrivateGPTAgent):
# Konfigurieren von Logging für Flask (Werkzeug)
werkzeug_logger = logging.getLogger('werkzeug')
werkzeug_logger.setLevel(logging.ERROR) # Nur Fehler, keine Warnungen
werkzeug_handler = logging.FileHandler('flask.log') # Logs in separate Datei
werkzeug_handler.setLevel(logging.ERROR)
werkzeug_formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
werkzeug_handler.setFormatter(werkzeug_formatter)
werkzeug_logger.addHandler(werkzeug_handler)
def display_startup_header(self):
server_ip = config.get("api_ip", "0.0.0.0")
server_port = config.get("api_port", 8000)
api_key_status = "✔️ Set" if self.api_key != "default_api_key" else "❌ Not Set"
header = f"""
────────────────────────────────────────────────
Fujitsu PrivateGPT ChatBot Agent - Startup
────────────────────────────────────────────────
System Information:
- Hostname : {socket.gethostname()}
- Operating Sys : {platform.system()} {platform.release()}
- Python Version: {platform.python_version()}
Server Configuration:
- API Endpoint : http://{server_ip}:{server_port}
- API Key Status: {api_key_status}
Logs:
- Flask Log : flask.log
- Agent Log : agent.log
────────────────────────────────────────────────
🚀 Ready to serve requests!
"""
print(header)
print(f"Current working directory: {os.getcwd()}")
def display_help_header(self):
header = f"""
────────────────────────────────────────────────
Fujitsu PrivateGPT SourceManager Agent - Commands
────────────────────────────────────────────────
Upload:
- upload file: <Filepath> Uploads the content of a file (pdf, csv, xdls, md)
- upload content: <Content> Uploads content from user input
List:
- list: pgpt Lists documents on PGPT server
- list: db Lists documents known in local database
- info: <ID> Shows info for an id from the local database
Delete:
- delete: <ID> Deletes a Document from PGPT and local DB
- delete: unknown Deletes all Documents on PGPT that are not in the local DB, and delete documents locally that are not on PGPT
- delete: all Deletes all Documents in the selected groups
────────────────────────────────────────────────
"""
print(header)
print(f"Current working directory: {os.getcwd()}")
@app.before_request
def authenticate(self):
# Erlaube OPTIONS-Anfragen ohne Authentifizierung
if request.method == 'OPTIONS':
return
if request.endpoint != 'status': # Optional: Status-Endpoint ohne Auth
provided_key = request.headers.get('X-API-KEY')
if not provided_key or provided_key != self.api_key:
return jsonify({"error": "Unauthorized"}), 401
@app.route('/logs', methods=['GET'])
def view_logs(self):
"""
Endpoint, um das Flask-Log per Browser anzuzeigen.
"""
try:
with open('flask.log', 'r') as log_file:
log_content = log_file.read()
# Rückgabe des Logs als Text
return f"<pre>{log_content}</pre>", 200 # <pre> für eine schönere Darstellung
except FileNotFoundError:
return "Log file not found.", 404
except Exception as e:
return f"An error occurred: {str(e)}", 500
@app.route('/status', methods=['GET'])
def status(self):
"""
Endpoint zur Überprüfung des Serverstatus.
"""
return jsonify({"status": "PrivateGPT Agent is running."}), 200
def run_api_server(self):
# Flask-Server konfigurieren
server_ip = config.get("api_ip", "0.0.0.0")
server_port = config.get("api_port", 5001)
serve(app, host=server_ip, port=int(server_port))
def delete_source(self, source_id, use_ssl=False, accept_self_signed=False):
"""
Sends a request to the MCP server to delete an existing source.
:param server_ip: IP address of the MCP server
:param server_port: Port number of the MCP server
:param token: Authorization token
:param source_id: ID of the source to delete
:param use_ssl: Whether to use SSL/TLS for the connection
:param accept_self_signed: Whether to accept self-signed certificates
:return: Response from the server
"""
payload = {
"command": "delete_source",
"token": self.token,
"arguments": {
"sourceId": source_id
}
}
payload_json = json.dumps(payload)
raw_socket = None
client_socket = None
try:
raw_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
raw_socket.settimeout(10)
if use_ssl:
context = ssl.create_default_context()
if accept_self_signed:
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
client_socket = context.wrap_socket(raw_socket, server_hostname=self.server_ip)
else:
client_socket = raw_socket
client_socket.connect((self.server_ip, self.server_port))
client_socket.sendall(payload_json.encode('utf-8'))
response = b""
while True:
part = client_socket.recv(4096)
if not part:
break
response += part
return response.decode('utf-8')
except ssl.SSLError:
return "Error: Server and/or client may require TLS encryption. Please enable SSL/TLS."
except Exception as e:
return f"Error: {e}"
finally:
if client_socket is not None:
try:
client_socket.shutdown(socket.SHUT_RDWR)
except:
pass
client_socket.close()
def send_list_sources_request(self, group_name, use_ssl=False, accept_self_signed=False):
"""
Sends a request to list sources in a specific group to the MCP server.
:param server_ip: IP address of the MCP server
:param server_port: Port number of the MCP server
:param token: Authentication token
:param group_name: Name of the group to list sources from
:param use_ssl: Whether to use SSL/TLS for the connection
:param accept_self_signed: Whether to accept self-signed certificates
:return: Response from the server
"""
payload = {
"command": "list_sources",
"token": self.token,
"attributes": {
"groupName": group_name
}
}
payload_json = json.dumps(payload)
raw_socket = None
client_socket = None
try:
raw_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
raw_socket.settimeout(10)
if use_ssl:
context = ssl.create_default_context()
if accept_self_signed:
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
client_socket = context.wrap_socket(raw_socket, server_hostname=self.server_ip)
else:
client_socket = raw_socket
client_socket.connect((self.server_ip, self.server_port))
client_socket.sendall(payload_json.encode('utf-8'))
response = b""
while True:
part = client_socket.recv(4096)
if not part:
break
response += part
return response.decode('utf-8')
except ssl.SSLError:
return "Error: Server and/or client may require TLS encryption. Please enable SSL/TLS."
except Exception as e:
return f"Error: {e}"
finally:
if client_socket is not None:
try:
client_socket.shutdown(socket.SHUT_RDWR)
except:
pass
client_socket.close()
def send_create_source_request(self, name, content, groups, use_ssl=False,
accept_self_signed=False):
"""
Sends a request to create a new source to the MCP server.
:param server_ip: IP address of the MCP server
:param server_port: Port number of the MCP server
:param token: Authentication token
:param name: Name of the new source
:param content: Content to be formatted as markdown
:param groups: List of groups to assign the source to
:param use_ssl: Whether to use SSL/TLS for the connection
:param accept_self_signed: Whether to accept self-signed certificates
:return: Response from the server
"""
payload = {
"command": "create_source",
"token": self.token,
"arguments": {
"name": name,
"content": content,
"groups": groups or []
}
}
# Convert the payload to a JSON string
payload_json = json.dumps(payload)
raw_socket = None
client_socket = None
try:
# Create a socket object
raw_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
raw_socket.settimeout(10)
# Establish SSL/TLS connection if required
if use_ssl:
context = ssl.create_default_context()
if accept_self_signed:
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
client_socket = context.wrap_socket(raw_socket, server_hostname=self.server_ip)
else:
client_socket = raw_socket
# Connect to the server
client_socket.connect((self.server_ip, self.server_port))
# Send the request
client_socket.sendall(payload_json.encode('utf-8'))
# Receive the response
response = b""
while True:
part = client_socket.recv(4096)
if not part:
break
response += part
# Decode the response
return response.decode('utf-8')
except ssl.SSLError:
return "Error: Server and/or client may require TLS encryption. Please enable SSL/TLS."
except Exception as e:
return f"Error: {e}"
finally:
if client_socket is not None:
try:
client_socket.shutdown(socket.SHUT_RDWR)
except:
pass
client_socket.close()
def run(self):
if not self.token:
logging.error(self.get_lang_message("authentication_failed"))
print(self.get_lang_message("authentication_failed"), flush=True)
return
# personal_groups = self.list_personal_groups()
groups = self.chosen_groups
#for group in groups:
db = Path.absolute(Path(__file__).parent.parent / f"database/documents.sql")
create_sql_table(db) #will only be created if not existent
welcome_msg = f"{Color.OKGREEN}{self.get_lang_message('welcome')}{Color.ENDC}"
print(welcome_msg, flush=True)
logging.info(self.get_lang_message("user_interface_started"))
while True:
try:
user_input = input(f"{Color.OKBLUE}{self.get_lang_message('user_question')}{Color.ENDC}")
if user_input.strip().lower() == "delete: unknown":
available_documents = self.send_list_sources_request(groups[0])
sources = json.loads(available_documents)["sources"]
for sourceid in sources:
db_entry = get_from_sql_table(db, sourceid)
if db_entry is None:
print("deleting: " + sourceid)
response = self.delete_source(sourceid)
print(response)
else:
print("keeping: " + sourceid + " " + db_entry.file + " " + db_entry.content[:100])
print("Cleaning local database of deleted entries...")
available_documents = self.send_list_sources_request(groups[0])
sources = json.loads(available_documents)["sources"]
db_entries = get_all_db_entries(db)
for document in db_entries:
if document.id not in sources:
delete_from_sql_table(db, document.id)
print(".. done.")
elif user_input.strip().lower() == "delete: all":
available_documents = self.send_list_sources_request(groups[0])
sources = json.loads(available_documents)["sources"]
for sourceid in sources:
response = self.delete_source(sourceid)
print(response)
elif user_input.strip().lower().startswith("delete: "):
sourceid = user_input.strip().lower()[8:]
print(sourceid)
#if success delete from db...
response = self.delete_source(sourceid)
delete_from_sql_table(db, sourceid)
elif user_input.strip().lower() == "list: pgpt":
print("Documents in " + groups[0] + ":")
available_documents = self.send_list_sources_request(groups[0])
sources = json.loads(available_documents)["sources"]
print(sources)
elif user_input.strip().lower() == "list: db":
list_db(db)
elif user_input.strip().startswith("info: "):
sourceid = user_input.strip().lower()[6:]
document = get_from_sql_table(db, sourceid)
if document is not None:
print("Id: " + document.id + " File: " + document.file + " User: " + document.user + " Group: " + document.groups + " Content: " + document.content )
else:
print("No entry found in local database")
elif user_input.strip().lower() == "exit":
goodbye_msg = f"{Color.OKGREEN}{self.get_lang_message('goodbye')}{Color.ENDC}"
print(goodbye_msg, flush=True)
logging.info(self.get_lang_message("session_ended"))
break
elif user_input.strip().lower().startswith("upload file: "):
file_path = user_input.strip()[13:].replace("\\", "\\\\").replace("\"", "")
# Get the file extension
file_extension = os.path.splitext(file_path)[1]
print(f"File Extension: {file_extension}")
content = ""
if file_extension == ".pdf":
content = LoadersFactory().pdf(file_path)
# todo pgpt is not happy with all formats
elif file_extension == ".csv":
content = LoadersFactory().csv(file_path)
elif file_extension == ".xlsx":
content = LoadersFactory().xlsx(file_path)
elif file_extension == ".md":
content = LoadersFactory().markdown(file_path)
#todo add more sources
markdown = LoadersFactory().convert_documents_to_markdown(content)
print(markdown)
result = self.send_create_source_request(os.path.basename(file_path), markdown, groups)
parsed_result = json.loads(result)
if "documentId" in parsed_result:
answer = parsed_result["documentId"]
head, tail = os.path.split(file_path)
file = tail
print(file)
add_to_sql_table(db, parsed_result["documentId"], markdown, str(groups), file, self.email)
print(f"{Color.OKGREEN}{self.get_lang_message('agent_answer', answer=answer)}{Color.ENDC}",
flush=True)
else:
error = parsed_result["error"]
print(f"{Color.FAIL}{self.get_lang_message('agent_error', error=error)}{Color.ENDC}",
flush=True)
elif user_input.strip().lower().startswith("upload content: "):
result = self.send_create_source_request(user_input, user_input, groups)
parsed_result = json.loads(result)
if "documentId" in parsed_result:
answer = parsed_result["documentId"]
file = "User Input"
add_to_sql_table(db, parsed_result["documentId"], user_input, str(groups), file, self.email)
print(f"{Color.OKGREEN}{self.get_lang_message('agent_answer', answer=answer)}{Color.ENDC}", flush=True)
else:
error = parsed_result["error"]
print(f"{Color.FAIL}{self.get_lang_message('agent_error', error=error)}{Color.ENDC}", flush=True)
except (KeyboardInterrupt, EOFError):
goodbye_msg = f"{Color.OKGREEN}{self.get_lang_message('goodbye')}{Color.ENDC}"
print(goodbye_msg, flush=True)
logging.info(self.get_lang_message("session_interrupted"))
break
if __name__ == '__main__':
# Starten des API-Servers in einem separaten Daemon-Thread
try:
config_file = Path.absolute(Path(__file__).parent.parent / "config.json")
config = Config(config_file=config_file, required_fields=["server_ip", "server_port", "email", "password"])
except ConfigError as e:
logging.error(f"Configuration Error: {e}")
exit(1)
agent = FileUploadAgent(config=config)
api_thread = threading.Thread(target=agent.run_api_server)
api_thread.daemon = True # Daemon-Thread
api_thread.start()
# Call this function right before the server starts
agent.display_startup_header()
agent.display_help_header()
# Starten des manuellen Chat-Interfaces
agent.run()