privateGPT MCP Server
by Fujitsu-AI
- agents
- AgentInterface
- Python
# agent.py
import logging
import json
import atexit
from .network import NetworkClient, NetworkError
from .color import Color
from .language import languages
class GroupValidationError(Exception):
"""Exception raised for errors in the group validation process."""
pass
class PrivateGPTAgent:
def __init__(self, config):
# mcp_server-Daten aus dem config-Objekt lesen
self.mcp_config = config.get("mcp_server")
# Lese host und port
self.mcp_host = self.mcp_config.get("host")
self.mcp_port = self.mcp_config.get("port")
self.server_ip = self.mcp_host
self.server_port = self.mcp_port
self.email = config.get("email")
self.password = config.get("password")
self.chosen_groups = config.get("groups", [])
self.language = config.get("language", "en") # Standard ist Englisch
if self.language not in languages:
self.language = "en"
logging.warning(f"Unsupported language '{config.get('language')}'. Falling back to English.")
self.lang = languages[self.language]
self.network_client = NetworkClient(self.server_ip, self.server_port, language=self.language)
self.token = None
atexit.register(self.logout)
# Initialer Login
self.login()
# Personalisierte Gruppen abholen
if self.token:
self.allowed_groups = self.list_personal_groups()
if not self.allowed_groups:
logging.warning(self.lang["no_personal_groups"])
print(self.lang["no_personal_groups"], flush=True)
self.allowed_groups = []
# Validierung der Gruppen
invalid = self.validate_groups(self.chosen_groups)
if invalid:
print(self.lang["invalid_group"].format(groups=invalid), flush=True)
logging.error(self.lang["invalid_group_error"])
raise GroupValidationError(self.lang["invalid_group"].format(groups=invalid))
else:
self.allowed_groups = []
# Lokale Wissensbasis (Beispiel)
self.knowledge_base = {
"What is AI?": self.lang["knowledge_ai"],
"Who created Python?": self.lang["knowledge_python"],
"What is Machine Learning?": self.lang["knowledge_ml"]
}
def get_lang_message(self, key, **kwargs):
message = self.lang.get(key, "Message not defined.")
try:
return message.format(**kwargs)
except KeyError as e:
logging.error(f"Missing placeholder in language file for key '{key}': {e}")
return message
def validate_groups(self, groups):
if groups is None:
return []
invalid = [g for g in groups if g not in self.allowed_groups]
if invalid:
logging.error(self.get_lang_message("group_validation_error", error=invalid))
return invalid
return []
def login(self):
payload = {
"command": "login",
"arguments": {
"email": self.email,
"password": self.password
}
}
logging.info(self.get_lang_message("login_attempt"))
try:
resp = self.network_client.send_request(payload)
#logging.info(self.get_lang_message("received_response", response=resp))
if resp.get("status") == 200 and resp.get("message") == "success":
self.token = resp.get("token")
logging.info(self.get_lang_message("login_success"))
return True
else:
msg = resp.get("message", self.get_lang_message("no_server_message"))
logging.error(self.get_lang_message("login_failed", message=msg))
return False
except NetworkError as e:
logging.error(self.get_lang_message("login_failed", message=str(e)))
return False
def list_personal_groups(self):
if not self.token:
logging.error(self.get_lang_message("authentication_failed"))
return []
payload = {
"command": "list_groups",
"token": self.token
}
try:
resp = self.network_client.send_request(payload)
data_block = resp.get("data")
if not data_block:
logging.warning(self.lang["no_data_in_response"].format(response=resp))
return []
if data_block.get("status") == 200 and data_block.get("message") == "success":
personal = data_block.get("personalGroups", [])
logging.info(self.lang["personal_groups"].format(groups=personal))
return personal
else:
logging.warning(self.lang["list_groups_failed"].format(
message=data_block.get("message", self.lang["no_server_message"])))
return []
except NetworkError as e:
logging.error(self.lang["list_groups_failed"].format(message=str(e)))
return []
def query_private_gpt(self, prompt, use_public=False, language="en", groups=None, _retry_on_token_expired=True):
if not self.token:
error_msg = self.get_lang_message("authentication_failed")
logging.error(error_msg)
return json.dumps({"error": error_msg})
if language not in languages:
language = 'en'
logging.warning(f"Unsupported language '{language}'. Falling back to English.")
lang = languages[language]
if groups is None:
groups = self.chosen_groups
else:
groups = [g.strip() for g in groups if g.strip()]
relevant_groups = [g for g in groups if g in self.allowed_groups]
payload = {
"command": "chat",
"token": self.token,
"arguments": {
"question": prompt,
"usePublic": use_public,
"groups": relevant_groups,
"language": language
}
}
#logging.info(lang["sending_payload"].format(payload=json.dumps(payload)))
try:
resp = self.network_client.send_request(payload)
#logging.info(lang["received_response"].format(response=resp))
# ─────────────────────────────────────────────────
# Token abgelaufen/ungültig => Re-Login
# ─────────────────────────────────────────────────
if (
(resp.get("status") in [401, 403])
or (resp.get("message") in ["token expired", "token invalid"])
):
if not _retry_on_token_expired:
return json.dumps({"error": "Token ungültig, Re-Login fehlgeschlagen."})
# Zusätzlicher Log-Eintrag, um sicher zu sehen, dass der Refresh hier wirklich passiert:
logging.warning("TOKEN REFRESH TRIGGERED! (401/403 or token expired/invalid recognized)")
old_token = self.token
self.token = None
if self.login():
return self.query_private_gpt(
prompt, use_public, language, groups,
_retry_on_token_expired=False
)
else:
return json.dumps({"error": "Automatischer Re-Login ist fehlgeschlagen."})
# Normaler Erfolgsfall
if resp.get("status") == 200 and resp.get("message") == "success":
content = resp.get("content", {})
answer = content.get("answer", lang["agent_error"].format(error=lang["no_answer_received"]))
return json.dumps({"answer": answer})
else:
return json.dumps({"error": resp.get("message", lang["agent_error"].format(error=lang["unknown_error"]))})
except NetworkError as e:
error_msg = lang["agent_error"].format(error=str(e))
logging.error(f"❌ {error_msg}")
return json.dumps({"error": error_msg})
def respond(self, user_input, groups=None):
response = self.knowledge_base.get(user_input, None)
if response:
#logging.info(self.get_lang_message("knowledge_response", input=user_input))
return json.dumps({"answer": response})
else:
return self.query_private_gpt(user_input, groups=groups)
def respond_with_context(self, messages):
user_input = f'{messages[-1].content}'
add_context = False
if add_context:
messages.pop()
user_input += "\nHere is some context about the previous conversation:\n"
for message in messages:
user_input += f"{message.role}: {message.content}\n"
result = self.query_private_gpt(user_input)
return json.loads(result)
def logout(self):
if not self.token:
logging.info(self.get_lang_message("no_token_logout"))
return
payload = {
"command": "logout",
"token": self.token
}
logging.info(self.get_lang_message("logout_attempt"))
try:
resp = self.network_client.send_request(payload)
logging.info(self.get_lang_message("received_response", response=resp))
if resp.get("status") == 200 and resp.get("message") == "success":
logging.info(self.get_lang_message("logout_success"))
self.token = None
else:
msg = resp.get("message", self.get_lang_message("no_server_message"))
logging.warning(self.get_lang_message("logout_failed", message=msg))
except NetworkError as e:
logging.error(self.get_lang_message("logout_failed", message=str(e)))
def run(self):
if not self.token:
logging.error(self.get_lang_message("authentication_failed"))
print(self.get_lang_message("authentication_failed"), flush=True)
return
welcome_msg = f"{Color.OKGREEN}{self.get_lang_message('welcome')}{Color.ENDC}"
print(welcome_msg, flush=True)
logging.info(self.get_lang_message("user_interface_started"))
while True:
try:
user_input = input(f"{Color.OKBLUE}{self.get_lang_message('user_question')}{Color.ENDC}")
if user_input.strip().lower() == "exit":
goodbye_msg = f"{Color.OKGREEN}{self.get_lang_message('goodbye')}{Color.ENDC}"
print(goodbye_msg, flush=True)
logging.info(self.get_lang_message("session_ended"))
break
elif not user_input.strip():
continue
result = self.respond(user_input)
parsed_result = json.loads(result)
if "answer" in parsed_result:
answer = parsed_result["answer"]
print(f"{Color.OKGREEN}{self.get_lang_message('agent_answer', answer=answer)}{Color.ENDC}", flush=True)
else:
error = parsed_result["error"]
print(f"{Color.FAIL}{self.get_lang_message('agent_error', error=error)}{Color.ENDC}", flush=True)
except (KeyboardInterrupt, EOFError):
goodbye_msg = f"{Color.OKGREEN}{self.get_lang_message('goodbye')}{Color.ENDC}"
print(goodbye_msg, flush=True)
logging.info(self.get_lang_message("session_interrupted"))
break