Api.py•18.5 kB
import json
import os
import posixpath
from pathlib import Path
from time import sleep
import paramiko
import requests
import urllib3
import base64
from httpcore import NetworkError
from clients.Gradio.config import Config
# The sessions built by initialize_session() set ``verify = False``; silence the
# InsecureRequestWarning that urllib3 would otherwise emit for such requests.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def initialize_session(proxy_user, proxy_password, access_header):
    """Build the HTTP session used for all API calls.

    The session always sends JSON ``Accept``/``Content-Type`` headers and has
    certificate verification switched off.

    Args:
        proxy_user: User name for proxy basic authentication, or None.
        proxy_password: Password for proxy basic authentication, or None.
        access_header: Value for the ``X-Custom-Header`` header, or None.
            When given, it takes precedence over the proxy credentials.

    Returns:
        requests.Session: The configured session.
    """
    default_headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
    }
    has_proxy_credentials = proxy_user is not None and proxy_password is not None

    if access_header is not None:
        default_headers['X-Custom-Header'] = access_header
    elif has_proxy_credentials:
        # Basic auth: base64 of "user:password".
        raw_credentials = f"{proxy_user}:{proxy_password}".encode()
        encoded_credentials = base64.b64encode(raw_credentials).decode()
        default_headers['Authorization'] = f'Basic {encoded_credentials}'

    http_session = requests.Session()
    http_session.verify = False
    http_session.headers.update(default_headers)
    return http_session
class PrivateGPTAPI:
    def __init__(self, config, client_api_key=None):
        """Initialize the chat client with proxy authentication and log in.

        Args:
            config: Settings object exposing ``get(key)`` / ``get(key, default)``.
                Keys read: ``base_url``, ``proxy_user``, ``proxy_password``,
                ``access_header``, ``groups``, ``language``, ``use_public``,
                ``whitelist_keys``, ``email``, ``password``, ``ftp_password``,
                ``ftp_host``, ``ftp_port``, ``ftp_folder``, ``ftp_subfolder``.
            client_api_key: Optional key that ``decrypt_api_key`` turns into an
                e-mail/password pair. When given, that pair is used instead of
                the ``email``/``password`` entries of ``config``.
        """
        self.token = None
        self.chat_id = None
        self.logged_in = False
        self.base_url = config.get("base_url")

        # An empty string in the configuration means "not set".
        for option in ("proxy_user", "proxy_password", "access_header"):
            value = config.get(option, None)
            setattr(self, option, None if value == "" else value)

        self.chosen_groups = config.get("groups", [])
        self.language = config.get("language", "en")
        self.use_public = config.get("use_public", False)
        self.whitelist_keys = config.get("whitelist_keys", [])

        if client_api_key is not None:
            self.email, self.password = decrypt_api_key(client_api_key)
            # NOTE(review): a key missing from a non-empty whitelist is only
            # reported here; construction and login still proceed. Confirm
            # whether the whitelist is meant to block access.
            if self.whitelist_keys and client_api_key not in self.whitelist_keys:
                print("not authorized")
        else:
            self.email = config.get("email", None)
            self.password = config.get("password", None)

        # The FTP settings are read in every case. Previously ftp_password was
        # only assigned when no client_api_key was passed, so the attribute
        # was missing (AttributeError) whenever a key was used.
        self.ftp_password = config.get("ftp_password", None)
        self.ftp_host = config.get("ftp_host", None)
        self.ftp_port = config.get("ftp_port", None)
        self.ftp_folder = config.get("ftp_folder", "/")
        self.ftp_subfolder = config.get("ftp_subfolder", "temp")

        self.session = initialize_session(self.proxy_user, self.proxy_password, self.access_header)
        if self.login():
            self.logged_in = True
    def login(self):
        """Authenticate the user and retrieve the token."""
        url = f"{self.base_url}/login"
        payload = {"email": self.email, "password": self.password}
        try:
            response = self.session.post(url, json=payload)
            print(response.content)
            response.raise_for_status()
            data = response.json()
            self.token = data['data']['token']
            # Prüfen, ob der Header bereits existiert
            if 'Authorization' in self.session.headers:
                self.session.headers['Authorization'] += f', Bearer {self.token}'
            else:
                self.session.headers['Authorization'] = f'Bearer {self.token}'
            self.chat_id = None
            print("✅ Login successful.")
            return True
        except requests.exceptions.RequestException as e:
            print(f"❌ Login failed: {e}")
        return False
    def create_chat(self, user_input):
        """Start a new chat session.
        This method sends a POST request to the '/chats' endpoint with the provided parameters.
        It initializes a new chat session and stores the chat ID for future use.
        """
        url = f"{self.base_url}/chats"
        payload = {
            "language": self.language,
            "question": user_input,  # Initial question to start the chat
            "usePublic": self.use_public,
            "groups": self.chosen_groups
        }
        try:
            response = self.session.post(url, json=payload)
            response.raise_for_status()  # Raise an exception if the response was not successful
            data = response.json()
            self.chat_id = data['data']['chatId']  # Store the chat ID for future use
            print("✅ Chat initialized.")
            resp = response.json()
            try:
                answer = resp.get('data', None).get('answer', "error")
            except:
                print(response.json())
                resp = {"data":
                            {"answer": "error"}
                        }
                answer = "error"
            if answer.startswith("{\"role\":"):
                answerj = json.loads(answer)
                resp["data"]["answer"] = answerj["content"]
                resp["data"]["chatId"] = "0"
            print(f"💡 Response: {answer}")
            return resp
        except requests.exceptions.RequestException as e:
            # It seems we get disconnections from time to time..
            # print(f"⚠️ Failed to get response on first try, trying again..: {e}")
            try:
                response = self.session.patch(url, json=payload)
                response.raise_for_status()
                data = response.json()
                answer = data.get('data', {}).get('answer', "No answer provided.")
                print(f"💡 Response: {answer}")
                return data
            except:
                print(f"❌ Failed to get response: {e}")
                return {"error": f"❌ Failed to get response: {e}"}
    def list_personal_groups(self):
        url = f"{self.base_url}/groups"
        try:
            resp = self.session.get(url)
            try:
                j = json.loads(resp.content)
                data_block = j["data"]
                if not data_block:
                    return []
                personal = data_block.get("personalGroups", [])
                return personal
            except:
                return []
        except NetworkError as e:
            return []
    def get_document_info(self, id):
        url = f"{self.base_url}/sources/{id }"
        try:
            resp = self.session.get(url)
            j = json.loads(resp.content)
            data_block = j["data"]
            if not data_block:
                return []
            return data_block
        except NetworkError as e:
            return []
    def query_private_gpt(self, user_input) -> json:
        """Send a question to the chat and retrieve the response."""
        if not self.chat_id:
            print("❌ Chat session not initialized.")
            return False
        url = f"{self.base_url}/chats/{self.chat_id}"
        payload = {"question": user_input}
        try:
            response = self.session.patch(url, json=payload)
            # response.raise_for_status()
            resp = response.json()
            try:
                answer = resp.get('data', None).get('answer', "error")
            except:
                print(response.json())
                resp = {"data":
                            {"answer": "error"}
                        }
                answer = "error"
            if answer.startswith("{\"role\":"):
                answerj = json.loads(answer)
                resp["data"]["answer"] = answerj["content"]
                resp["data"]["chatId"] = "0"
            print(f"💡 Response: {answer}")
            return resp
        except requests.exceptions.RequestException as e:
            # It seems we get disconnections from time to time..
            # print(f"⚠️ Failed to get response on first try, trying again..: {e}")
            try:
                response = self.session.patch(url, json=payload)
                response.raise_for_status()
                data = response.json()
                answer = data.get('data', {}).get('answer', "No answer provided.")
                print(f"💡 Response: {answer}")
                return data
            except:
                print(f"❌ Failed to get response: {e}")
                return {"error": f"❌ Failed to get response: {e}"}
    def add_source(self, markdown, groups, name):
        """Send a source id to retrieve details. Working with version 1.3.3 and newer"""
        url = f"{self.base_url}/sources"
        try:
            payload = {
                "name": name,
                "groups": groups,
                "content": markdown
            }
            resp = self.session.post(url, json=payload)
            j = json.loads(resp.content)
            data_block = j["data"]
            if not data_block:
                return []
            return data_block
        except requests.exceptions.RequestException as e:
            print(f"❌ Failed to get response: {e}")
            return {"error": f"❌ Failed to get response: {e}"}
    def update_source(self, source_id, markdown=None, groups=None, name=None):
        """Edit an existing Source"""
        url = f"{self.base_url}/sources/{source_id}"
        try:
            payload = {}
            if groups is None:
                existing_groups = self.get_document_info(source_id)["groups"]
                payload["groups"] = existing_groups
            else:
                payload["groups"] = groups
            if markdown is not None:
                payload["content"] = markdown
            if name is not None:
                payload["name"] = name
            resp = self.session.patch(url, json=payload)
            j = json.loads(resp.content)
            data_block = j["data"]
            if not data_block:
                return []
            return data_block
        except requests.exceptions.RequestException as e:
            print(f"❌ Failed to get response: {e}")
            return {"error": f"❌ Failed to get response: {e}"}
    def delete_source(self, source_id):
        """Send a source id to retrieve details. Working with version 1.3.3 and newer"""
        url = f"{self.base_url}/sources/{source_id}"
        try:
            resp = self.session.delete(url)
            j = json.loads(resp.content)
            message = j["message"]
            if not message:
                return "failed"
            return message
        except requests.exceptions.RequestException as e:
            print(f"❌ Failed to get response: {e}")
            return {"error": f"❌ Failed to get response: {e}"}
    def upload_sftp(self, file_path, poll_interval=2, max_polls=None):
        """Upload a local file over SFTP, then wait for sources to show up.

        The file is placed in ``ftp_folder/ftp_subfolder`` on the server;
        missing directories on that path are created. After a successful
        upload the group ``"temp"`` is polled through
        ``get_sources_from_group`` until it returns at least one source.

        Args:
            file_path: Path of the local file to upload.
            poll_interval: Seconds to sleep before each status check.
            max_polls: Maximum number of status checks, or None to keep
                checking until sources appear (the previous behaviour).

        Returns:
            list: The sources returned by the first non-empty status check,
            or an empty list when the upload failed or ``max_polls`` checks
            found nothing.

        Raises:
            Errors from connecting, authenticating or creating the remote
            directories propagate to the caller; the connection is closed
            first. Errors from the file transfer itself are printed.
        """
        remote_base_dir = posixpath.join(self.ftp_folder, self.ftp_subfolder)
        remote_path = posixpath.join(remote_base_dir, os.path.basename(file_path))

        uploaded = False
        transport = paramiko.Transport((self.ftp_host, self.ftp_port))
        try:
            transport.connect(username=self.email, password=self.ftp_password)
            sftp = paramiko.SFTPClient.from_transport(transport)
            try:
                # Create missing directories one level at a time. Paths are
                # built from the root of remote_base_dir (keeping a leading
                # "/") and the working directory is never changed, so every
                # path below means the same thing to the server.
                current = "/" if remote_base_dir.startswith("/") else ""
                for part in remote_base_dir.strip("/").split("/"):
                    if not part:
                        continue
                    current = posixpath.join(current, part)
                    try:
                        sftp.stat(current)
                    except IOError:
                        sftp.mkdir(current)
                # Upload the file
                try:
                    sftp.put(file_path, remote_path)
                    print(f"Uploaded {file_path} to {remote_path} successfully.")
                    uploaded = True
                except Exception as e:
                    print(e)
            finally:
                sftp.close()
        finally:
            transport.close()
            print("Connection closed")

        if not uploaded:
            # Nothing was transferred, so there is nothing to wait for.
            return []

        # NOTE(review): the polled group name is the literal "temp", not
        # self.ftp_subfolder — confirm which one the server uses.
        polls = 0
        while max_polls is None or polls < max_polls:
            print("Checking file status")
            sleep(poll_interval)
            sources = self.get_sources_from_group("temp")
            if sources:
                return sources
            polls += 1
        return []
    def get_sources_from_group(self, group):
        """Send a source id to retrieve details. Working with version 1.3.3 and newer"""
        url = f"{self.base_url}/sources/groups"
        try:
            payload = {
                "groupName": group
            }
            resp = self.session.post(url, json=payload)
            j = json.loads(resp.content)
            data_block = j["data"]
            if not data_block:
                return []
            sources = []
            for source in data_block["sources"]:
                doc = self.get_document_info(source)
                sources.append(doc)
            return sources
        except requests.exceptions.RequestException as e:
            print(f"❌ Failed to get response: {e}")
            return []
    def respond_with_context(self, messages, response_format=None, request_tools=None):
        last_user_message = next((p for p in reversed(messages) if p["role"] == "user"), None)
        user_input = ""
        for message in messages:
            if message["role"] == "system":
                user_input = str(message) + "\n"
        if last_user_message is not None:
            user_input += last_user_message["content"]
        last_assistant_message = next((p for p in reversed(messages) if p["role"] == "assistant"), None)
        last_tool_message = next((p for p in reversed(messages) if p["role"] == "tool"), None)
        hastoolresult = False
        if last_tool_message is not None and last_assistant_message is not None and last_assistant_message.tool_calls is not None and len(
                last_assistant_message.tool_calls) > 0:
            user_input += "\nYou called the tool: " + str(
                last_assistant_message.tool_calls[0]) + ". The result was: " + last_tool_message.content
            hastoolresult = True
        print(f"💁 Request: " + user_input)
        # PGPT manages history and context itself so we don't need to forward the history.
        add_context = False
        if add_context:
            messages.pop()
            user_input += "\nHere is some context about the previous conversation:\n"
            for message in messages:
                user_input += f"{message.role}: {message.content}\n"
        if response_format is not None:
            print("Response format: " + str(response_format))
            user_input += add_response_format(response_format)
        if request_tools is not None and not hastoolresult:
            user_input += add_tools(request_tools, last_tool_message)
        if not self.logged_in:
            self.login()
        else:
            if self.chat_id is None:
                result = self.create_chat(user_input)
            else:
                result = self.query_private_gpt(user_input)
            if 'data' in result:
                response_data = result.get("data")
                if request_tools is not None and not hastoolresult and is_json(
                        clean_response(response_data.get("answer"))):
                    response_data["tool_call"] = clean_response(response_data.get("answer", ""))
                return response_data
            elif 'error' in result:
                # Try to login again and send the query once more on error.
                if self.login():
                    if self.chat_id is None:
                        result = self.create_chat(user_input)
                    else:
                        result = self.query_private_gpt(user_input)
                    if 'data' in result:
                        return result['data']
                    else:
                        return result
            else:
                return result
def is_json(myjson):
    """Return True when ``myjson`` can be parsed by ``json.loads``.

    Args:
        myjson: Candidate JSON text. Values that ``json.loads`` does not
            accept at all (for example ``None``) count as "not JSON".

    Returns:
        bool: True if parsing succeeds, False otherwise.
    """
    try:
        json.loads(myjson)
    except (ValueError, TypeError):
        # ValueError: malformed text. TypeError: not str/bytes/bytearray.
        return False
    return True
def add_response_format(response_format):
    """Build the prompt suffix asking the model to fill in a JSON template.

    Args:
        response_format: JSON-serializable template to append.

    Returns:
        str: The instruction text followed by the serialized template.
    """
    instruction = "\nPlease fill in the following json template with realistic and appropriate information. In your reply, only return the generated json. If you can't answer return an empty json.\n"
    serialized_template = json.dumps(response_format)
    return "".join((instruction, serialized_template))
def add_tools(response_tools, last_tool_message):
    """Build the prompt suffix that offers the available tools to the model.

    Args:
        response_tools: Iterable of JSON-serializable tool definitions.
        last_tool_message: Not used by this function.

    Returns:
        str: The instruction text followed by one serialized tool per block,
        each surrounded by newlines.
    """
    instruction = "\nPlease select the fitting provided tool to create your answer. Only return the generated result of the tool. Do not describe what you are doing, just return the json.\n"
    tool_blocks = [f"\n{json.dumps(tool_definition)}\n" for tool_definition in response_tools]
    return instruction + "".join(tool_blocks)
def clean_response(response):
    """Strip known artefacts from a model reply.

    Args:
        response: The reply text.

    Returns:
        str: The reply with every ``[TOOL_CALLS]`` marker removed.
    """
    artefact = "[TOOL_CALLS]"
    return response.replace(artefact, "")
def decrypt_api_key(api_key):
    """Decode a base64 ``email:password`` API key.

    This is PoC code and methods should be replaced with a more secure way to
    deal with credentials (e.g. in a db). The key is only base64-encoded, not
    encrypted.

    Args:
        api_key: Base64 text whose decoded ASCII form is ``email:password``.

    Returns:
        tuple: ``(email, password)``. The password is everything after the
        first ``:``, so it may itself contain colons. ``("invalid",
        "invalid")`` is returned when the key cannot be decoded or contains
        no ``:``.
    """
    try:
        decoded_key = base64.b64decode(api_key.encode("ascii")).decode("ascii")
    except Exception as e:
        print(e)
        return "invalid", "invalid"
    # Split on the first colon only; a key without one is treated as invalid
    # instead of raising IndexError.
    email, separator, password = decoded_key.partition(":")
    if not separator:
        return "invalid", "invalid"
    return email, password
def main():
    """Run an interactive question/answer loop on the console.

    Loads ``config.json`` from the directory of this file (``base_url`` is a
    required field), creates the client and forwards every non-empty line to
    the chat until the user types 'quit', presses Ctrl+C, or an error occurs.
    """
    config_path = (Path(__file__).parent / "config.json").absolute()
    settings = Config(config_file=config_path, required_fields=["base_url"])
    chat = PrivateGPTAPI(settings)
    print("Type your questions below. Type 'quit' to exit.")
    while True:
        try:
            question = input("❓ Question: ").strip()
            if question.lower() == 'quit':
                break
            if not question:
                continue
            # The first question opens the chat; later ones continue it.
            ask = chat.create_chat if chat.chat_id is None else chat.query_private_gpt
            ask(question)
        except KeyboardInterrupt:
            print("\nExiting chat...")
            break
        except Exception as error:
            print(f"❌ Error: {str(error)}")
            break
# Start the interactive chat loop only when this file is run as a script.
if __name__ == "__main__":
    main()