Api.py•18.4 kB
import json
import os
import posixpath
from pathlib import Path
from time import sleep
import paramiko
import requests
import urllib3
import base64
from httpcore import NetworkError
from .config import Config
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def initialize_session(proxy_user, proxy_password, access_header):
"""Set up the session with proxy authentication."""
session = requests.Session()
session.verify = False
headers = {
'Accept': 'application/json',
'Content-Type': 'application/json',
}
if access_header is not None:
headers['X-Custom-Header'] = access_header
elif proxy_user is not None and proxy_password is not None:
auth = base64.b64encode(f"{proxy_user}:{proxy_password}".encode()).decode()
headers['Authorization'] = f'Basic {auth}'
session.headers.update(headers)
return session
class PrivateGPTAPI:
def __init__(self, config, client_api_key=None):
"""Initialize the chat client with proxy authentication."""
self.token = None
self.chat_id = None
self.base_url = config.get("base_url")
self.proxy_user = config.get("proxy_user", None)
if self.proxy_user == "":
self.proxy_user = None
self.proxy_password = config.get("proxy_password", None)
if self.proxy_password == "":
self.proxy_password = None
self.access_header = config.get("access_header", None)
if self.access_header == "":
self.access_header = None
self.chosen_groups = config.get("groups", [])
self.language = config.get("language", "en")
self.use_public = config.get("use_public", True)
self.whitelist_keys = config.get("whitelist_keys", [])
self.logged_in = False
if client_api_key is not None:
self.email, self.password = decrypt_api_key(client_api_key)
if len(self.whitelist_keys) > 0:
if client_api_key not in self.whitelist_keys:
print("not authorized")
else:
self.email = config.get("email", None)
self.password = config.get("password", None)
self.ftp_password = config.get("ftp_password", None)
self.session = initialize_session(self.proxy_user, self.proxy_password, self.access_header)
if self.login():
self.logged_in = True
if self.ftp_password is not None:
self.ftp_host = config.get("ftp_host", None)
self.ftp_port = config.get("ftp_port", None)
self.ftp_folder = config.get("ftp_folder", "/")
self.ftp_subfolder = config.get("ftp_subfolder", "temp")
def login(self):
"""Authenticate the user and retrieve the token."""
url = f"{self.base_url}/login"
payload = {"email": self.email, "password": self.password}
try:
response = self.session.post(url, json=payload)
print(response.content)
response.raise_for_status()
data = response.json()
self.token = data['data']['token']
# Prüfen, ob der Header bereits existiert
if 'Authorization' in self.session.headers:
self.session.headers['Authorization'] += f', Bearer {self.token}'
else:
self.session.headers['Authorization'] = f'Bearer {self.token}'
self.chat_id = None
print("✅ Login successful.")
return True
except requests.exceptions.RequestException as e:
print(f"❌ Login failed: {e}")
return False
def create_chat(self, user_input):
"""Start a new chat session.
This method sends a POST request to the '/chats' endpoint with the provided parameters.
It initializes a new chat session and stores the chat ID for future use.
"""
url = f"{self.base_url}/chats"
payload = {
"language": self.language,
"question": user_input, # Initial question to start the chat
"usePublic": self.use_public,
"groups": self.chosen_groups
}
try:
response = self.session.post(url, json=payload)
response.raise_for_status() # Raise an exception if the response was not successful
data = response.json()
self.chat_id = data['data']['chatId'] # Store the chat ID for future use
print("✅ Chat initialized.")
resp = response.json()
try:
answer = resp.get('data', None).get('answer', "error")
except:
print(response.json())
resp = {"data":
{"answer": "error"}
}
answer = "error"
if answer.startswith("{\"role\":"):
answerj = json.loads(answer)
resp["data"]["answer"] = answerj["content"]
resp["data"]["chatId"] = "0"
print(f"💡 Response: {answer}")
return resp
except requests.exceptions.RequestException as e:
# It seems we get disconnections from time to time..
# print(f"⚠️ Failed to get response on first try, trying again..: {e}")
try:
response = self.session.patch(url, json=payload)
response.raise_for_status()
data = response.json()
answer = data.get('data', {}).get('answer', "No answer provided.")
print(f"💡 Response: {answer}")
return data
except:
print(f"❌ Failed to get response: {e}")
return {"error": f"❌ Failed to get response: {e}"}
def list_personal_groups(self):
url = f"{self.base_url}/groups"
try:
resp = self.session.get(url)
try:
j = json.loads(resp.content)
data_block = j["data"]
if not data_block:
return []
personal = data_block.get("personalGroups", [])
return personal
except:
return []
except NetworkError as e:
return []
def get_document_info(self, id):
url = f"{self.base_url}/sources/{id }"
try:
resp = self.session.get(url)
j = json.loads(resp.content)
data_block = j["data"]
if not data_block:
return []
return data_block
except NetworkError as e:
return []
def query_private_gpt(self, user_input) -> json:
"""Send a question to the chat and retrieve the response."""
if not self.chat_id:
print("❌ Chat session not initialized.")
return False
url = f"{self.base_url}/chats/{self.chat_id}"
payload = {"question": user_input}
try:
response = self.session.patch(url, json=payload)
# response.raise_for_status()
resp = response.json()
try:
answer = resp.get('data', None).get('answer', "error")
except:
print(response.json())
resp = {"data":
{"answer": "error"}
}
answer = "error"
if answer.startswith("{\"role\":"):
answerj = json.loads(answer)
resp["data"]["answer"] = answerj["content"]
resp["data"]["chatId"] = "0"
print(f"💡 Response: {answer}")
return resp
except requests.exceptions.RequestException as e:
# It seems we get disconnections from time to time..
# print(f"⚠️ Failed to get response on first try, trying again..: {e}")
try:
response = self.session.patch(url, json=payload)
response.raise_for_status()
data = response.json()
answer = data.get('data', {}).get('answer', "No answer provided.")
print(f"💡 Response: {answer}")
return data
except:
print(f"❌ Failed to get response: {e}")
return {"error": f"❌ Failed to get response: {e}"}
def add_source(self, markdown, groups, name):
"""Send a source id to retrieve details. Working with version 1.3.3 and newer"""
url = f"{self.base_url}/sources"
try:
payload = {
"name": name,
"groups": groups,
"content": markdown
}
resp = self.session.post(url, json=payload)
j = json.loads(resp.content)
data_block = j["data"]
if not data_block:
return []
return data_block
except requests.exceptions.RequestException as e:
print(f"❌ Failed to get response: {e}")
return {"error": f"❌ Failed to get response: {e}"}
def update_source(self, source_id, markdown=None, groups=None, name=None):
"""Edit an existing Source"""
url = f"{self.base_url}/sources/{source_id}"
try:
payload = {}
if groups is None:
existing_groups = self.get_document_info(source_id)["groups"]
payload["groups"] = existing_groups
else:
payload["groups"] = groups
if markdown is not None:
payload["content"] = markdown
if name is not None:
payload["name"] = name
resp = self.session.patch(url, json=payload)
j = json.loads(resp.content)
data_block = j["data"]
if not data_block:
return []
return data_block
except requests.exceptions.RequestException as e:
print(f"❌ Failed to get response: {e}")
return {"error": f"❌ Failed to get response: {e}"}
def delete_source(self, source_id):
"""Send a source id to retrieve details. Working with version 1.3.3 and newer"""
url = f"{self.base_url}/sources/{source_id}"
try:
resp = self.session.delete(url)
j = json.loads(resp.content)
message = j["message"]
if not message:
return "failed"
return message
except requests.exceptions.RequestException as e:
print(f"❌ Failed to get response: {e}")
return {"error": f"❌ Failed to get response: {e}"}
def upload_sftp(self, file_path):
# Connect to SFTP to determine existing suffixes
transport = paramiko.Transport((self.ftp_host, self.ftp_port))
transport.connect(username=self.email, password=self.ftp_password)
sftp = paramiko.SFTPClient.from_transport(transport)
remote_base_dir = posixpath.join(self.ftp_folder, self.ftp_subfolder)
# Ensure the remote directory exists
try:
sftp.chdir(remote_base_dir)
except IOError:
# Create remote dirs if missing
parts = remote_base_dir.strip("/").split("/")
path = ""
for part in parts:
path = posixpath.join(path, part)
try:
sftp.chdir(path)
except IOError:
sftp.mkdir(path)
sftp.chdir(path)
# Determine remote file name
remote_filename = os.path.basename(file_path)
remote_path = posixpath.join(remote_base_dir, remote_filename)
# Upload the file
try:
sftp.put(file_path, remote_path)
print(f"Uploaded {file_path} to {remote_path} successfully.")
except Exception as e:
print(e)
finally:
sftp.close()
transport.close()
print(f"Connection closed")
sources = []
while len(sources) == 0:
print(f"Checking file status")
sleep(2)
sources = self.get_sources_from_group("temp")
return sources
def get_sources_from_group(self, group):
"""Send a source id to retrieve details. Working with version 1.3.3 and newer"""
url = f"{self.base_url}/sources/groups"
try:
payload = {
"groupName": group
}
resp = self.session.post(url, json=payload)
j = json.loads(resp.content)
data_block = j["data"]
if not data_block:
return []
sources = []
for source in data_block["sources"]:
doc = self.get_document_info(source)
sources.append(doc)
return sources
except requests.exceptions.RequestException as e:
print(f"❌ Failed to get response: {e}")
return []
def respond_with_context(self, messages, response_format=None, request_tools=None):
last_user_message = next((p for p in reversed(messages) if p["role"] == "user"), None)
user_input = ""
for message in messages:
if message["role"] == "system":
user_input = str(message) + "\n"
if last_user_message is not None:
user_input += last_user_message["content"]
last_assistant_message = next((p for p in reversed(messages) if p["role"] == "assistant"), None)
last_tool_message = next((p for p in reversed(messages) if p["role"] == "tool"), None)
hastoolresult = False
if last_tool_message is not None and last_assistant_message is not None and last_assistant_message.tool_calls is not None and len(
last_assistant_message.tool_calls) > 0:
user_input += "\nYou called the tool: " + str(
last_assistant_message.tool_calls[0]) + ". The result was: " + last_tool_message.content
hastoolresult = True
print(f"💁 Request: " + user_input)
# PGPT manages history and context itself so we don't need to forward the history.
add_context = False
if add_context:
messages.pop()
user_input += "\nHere is some context about the previous conversation:\n"
for message in messages:
user_input += f"{message.role}: {message.content}\n"
if response_format is not None:
print("Response format: " + str(response_format))
user_input += add_response_format(response_format)
if request_tools is not None and not hastoolresult:
user_input += add_tools(request_tools, last_tool_message)
if not self.logged_in:
self.login()
else:
if self.chat_id is None:
result = self.create_chat(user_input)
else:
result = self.query_private_gpt(user_input)
if 'data' in result:
response_data = result.get("data")
if request_tools is not None and not hastoolresult and is_json(
clean_response(response_data.get("answer"))):
response_data["tool_call"] = clean_response(response_data.get("answer", ""))
return response_data
elif 'error' in result:
# Try to login again and send the query once more on error.
if self.login():
if self.chat_id is None:
result = self.create_chat(user_input)
else:
result = self.query_private_gpt(user_input)
if 'data' in result:
return result['data']
else:
return result
else:
return result
def is_json(myjson):
try:
json.loads(myjson)
except ValueError as e:
return False
return True
def add_response_format(response_format):
# prompt = "\nPlease fill in the following template with realistic and appropriate information. Be creative. The field 'type' defines the output format. In your reply, only return the generated json\n"
prompt = "\nPlease fill in the following json template with realistic and appropriate information. In your reply, only return the generated json. If you can't answer return an empty json.\n"
prompt += json.dumps(response_format)
return prompt
def add_tools(response_tools, last_tool_message):
prompt = "\nPlease select the fitting provided tool to create your answer. Only return the generated result of the tool. Do not describe what you are doing, just return the json.\n"
index = 1
for tool in response_tools:
prompt += "\n" + json.dumps(tool) + "\n"
index += 1
return prompt
def clean_response(response):
# Remove artefacts from reply here
response = response.replace("[TOOL_CALLS]", "")
return response
def decrypt_api_key(api_key):
"""
This is PoC code and methods should be replaced with a more secure way to deal with credentials (e.g. in a db)
"""
try:
base64_bytes = api_key.encode("ascii")
decoded_string_bytes = base64.b64decode(base64_bytes)
decoded_key = decoded_string_bytes.decode("ascii")
except Exception as e:
print(e)
decoded_key = "invalid:invalid"
return decoded_key.split(":")[0], decoded_key.split(":")[1]
def main():
"""Main function to run the chat application."""
config_file = Path.absolute(Path(__file__).parent.parent / "pgpt_openai_api_proxy.json")
config = Config(config_file=config_file, required_fields=["base_url"])
chat = PrivateGPTAPI(config)
print("Type your questions below. Type 'quit' to exit.")
while True:
try:
question = input("❓ Question: ").strip()
if question.lower() == 'quit':
break
if question:
chat.query_private_gpt(question)
except KeyboardInterrupt:
print("\nExiting chat...")
break
except Exception as e:
print(f"❌ Error: {str(e)}")
break
if __name__ == "__main__":
main()