# configuration.py
# coding: utf-8
"""
Alteryx Server API V3
OpenAPI spec version: 3
"""
from __future__ import absolute_import
import copy
import logging
import multiprocessing
import sys
import urllib3
import os
import tempfile
import six
import http.client as httplib
class Configuration(object):
    """Settings holder for the Alteryx Server API V3 client.

    NOTE: This class is auto generated by the swagger code generator program.
    Ref: https://github.com/swagger-api/swagger-codegen
    Do not edit the class manually.

    An instance stores the base URL, credentials, TLS options, logging
    handlers and connection-pool size.  Several values are seeded from
    ``ALTERYX_*`` environment variables at construction time.  A
    process-wide template can be registered with :meth:`set_default`; every
    instance created afterwards starts as a shallow copy of that template.
    """

    # Template instance copied by __init__ when set (see set_default).
    _default = None

    def __init__(self):
        """Populate the configuration, either from ``_default`` or from
        built-in defaults and environment variables.
        """
        if self._default:
            # Copy the raw attribute dict so the property setters (which
            # attach logging handlers) are not triggered again.
            for key, value in self._default.__dict__.items():
                self.__dict__[key] = copy.copy(value)
            return

        # Default Base url
        self.host = os.getenv("ALTERYX_API_HOST", "http://localhost/webapi/")
        # Temp file folder for downloading files
        self.temp_folder_path = None

        # Authentication Settings
        # dict to store API key(s)
        self.api_key = {}
        # dict to store API prefix (e.g. Bearer)
        self.api_key_prefix = {}
        # function to refresh API key if expired
        self.refresh_api_key_hook = None
        # Username for HTTP basic authentication
        self.username = os.getenv("ALTERYX_USERNAME", "")
        # Password for HTTP basic authentication
        self.password = os.getenv("ALTERYX_PASSWORD", "")
        # Client ID for OAuth2 client credentials
        self.client_id = os.getenv("ALTERYX_CLIENT_ID", "")
        # Client secret for OAuth2 client credentials
        self.client_secret = os.getenv("ALTERYX_CLIENT_SECRET", "")
        # OAuth2 access token
        self.access_token = ""
        # OAuth2 token endpoint (appended to ``host``)
        self.token_url = "/oauth2/token"
        # Temp Directory
        self.temp_directory = os.getenv(
            "ALTERYX_TEMP_DIRECTORY", tempfile.gettempdir()
        )

        # Logging Settings
        # NOTE: ``logger`` is a dict of named loggers, not a single logger.
        self.logger = {}
        self.logger["package_logger"] = logging.getLogger("server_client")
        self.logger["urllib3_logger"] = logging.getLogger("urllib3")
        # Log format (the setter also builds ``logger_formatter``)
        self.logger_format = "%(asctime)s %(levelname)s %(message)s"
        # Log stream handler
        self.logger_stream_handler = None
        # Log file handler
        self.logger_file_handler = None
        # Debug file location (the setter installs the matching handler, so
        # the formatter and both handler slots must already exist)
        self.logger_file = None
        # Debug switch (the setter adjusts the level of every logger above)
        self.debug = False

        # SSL/TLS verification
        # Set this to false to skip verifying SSL certificate when calling API
        # from https server.  Enabled unless ALTERYX_VERIFY_SSL is "0",
        # "false" or "no" (case-insensitive).
        self.verify_ssl = os.getenv("ALTERYX_VERIFY_SSL", "1").lower() not in (
            "0",
            "false",
            "no",
        )
        # Set this to customize the certificate file to verify the peer.
        self.ssl_ca_cert = None
        # client certificate file
        self.cert_file = None
        # client key file
        self.key_file = None
        # Set this to True/False to enable/disable SSL hostname verification.
        self.assert_hostname = None

        # urllib3 connection pool's maximum number of connections saved
        # per pool. urllib3 uses 1 connection as default value, but this is
        # not the best value when you are making a lot of possibly parallel
        # requests to the same host, which is often the case here.
        # cpu_count * 5 is used as default value to increase performance.
        self.connection_pool_maxsize = multiprocessing.cpu_count() * 5

        # Proxy URL
        self.proxy = None
        # Safe chars for path_param
        self.safe_chars_for_path_param = ""
        # Disable client side validation
        self.client_side_validation = False

    @classmethod
    def set_default(cls, default):
        """Register ``default`` as the template for future instances.

        :param default: A Configuration whose attributes are shallow-copied
            into every instance created afterwards; a falsy value restores
            the built-in defaults.
        """
        cls._default = default

    @property
    def logger_file(self):
        """The logger file path, or None when logging goes to a stream.

        :type: str
        """
        return self.__logger_file

    @logger_file.setter
    def logger_file(self, value):
        """Set the logger file and switch the installed log handler.

        If the value is falsy, a stream handler is added to every logger and
        the file handler (if any) is removed.  Otherwise a file handler for
        the given path is added and the stream handler (if any) is removed.

        :param value: The logger_file path.
        :type: str
        """
        self.__logger_file = value
        if self.__logger_file:
            # If set logging file,
            # then add file handler and remove stream handler.
            self.logger_file_handler = logging.FileHandler(self.__logger_file)
            self.logger_file_handler.setFormatter(self.logger_formatter)
            for logger in self.logger.values():
                logger.addHandler(self.logger_file_handler)
                if self.logger_stream_handler:
                    logger.removeHandler(self.logger_stream_handler)
        else:
            # If not set logging file,
            # then add stream handler and remove file handler.
            self.logger_stream_handler = logging.StreamHandler()
            self.logger_stream_handler.setFormatter(self.logger_formatter)
            for logger in self.logger.values():
                logger.addHandler(self.logger_stream_handler)
                if self.logger_file_handler:
                    logger.removeHandler(self.logger_file_handler)

    @property
    def debug(self):
        """Debug status, True or False.

        :type: bool
        """
        return self.__debug

    @debug.setter
    def debug(self, value):
        """Set the debug status and adjust log verbosity accordingly.

        A truthy value sets every configured logger to DEBUG and enables
        ``http.client`` wire debugging; a falsy value sets the loggers to
        WARNING and disables it.  The ``http.client`` switch is a class
        attribute, so it applies to all connections in the process.

        :param value: The debug status, True or False.
        :type: bool
        """
        self.__debug = value
        if self.__debug:
            level, http_debuglevel = logging.DEBUG, 1
        else:
            # default log level is `logging.WARNING`
            level, http_debuglevel = logging.WARNING, 0
        for logger in self.logger.values():
            logger.setLevel(level)
        # turn httplib debug on/off
        httplib.HTTPConnection.debuglevel = http_debuglevel

    @property
    def logger_format(self):
        """The logger format string.

        :type: str
        """
        return self.__logger_format

    @logger_format.setter
    def logger_format(self, value):
        """Set the logger format and rebuild ``logger_formatter`` from it.

        :param value: The format string.
        :type: str
        """
        self.__logger_format = value
        self.logger_formatter = logging.Formatter(self.__logger_format)

    def get_api_key_with_prefix(self, identifier):
        """Gets API key (with prefix if set).

        ``refresh_api_key_hook`` is called with this configuration first,
        when one is set.

        :param identifier: The identifier of apiKey.
        :return: ``"<prefix> <key>"`` when a prefix is configured, the bare
            key when it is not, or None when no key is stored.
        """
        if self.refresh_api_key_hook:
            self.refresh_api_key_hook(self)
        key = self.api_key.get(identifier)
        if not key:
            return None
        prefix = self.api_key_prefix.get(identifier)
        if prefix:
            return "%s %s" % (prefix, key)
        return key

    def get_basic_auth_token(self):
        """Gets HTTP basic authentication header (string).

        :return: The ``authorization`` header value built by urllib3 from
            ``username:password``, or an empty string when both are empty.
        """
        token = ""
        if self.username or self.password:
            token = urllib3.util.make_headers(
                basic_auth=self.username + ":" + self.password
            ).get("authorization")
        return token

    def get_client_credentials_token(self):
        """Gets client credentials token.

        :return: ``"<client_id>:<client_secret>"``, or None when either
            value is missing.
        """
        if not self.client_id or not self.client_secret:
            return None
        return self.client_id + ":" + self.client_secret

    def get_access_token(self):
        """Gets OAuth2 access token using client credentials.

        Posts a ``client_credentials`` grant to ``host`` + ``token_url`` with
        the client id/secret as an HTTP Basic header and stores the returned
        ``access_token`` on this instance.

        :return: The access token string, or None when credentials are
            missing or the request/response handling fails.
        :rtype: str
        """
        if not self.client_id or not self.client_secret:
            return None

        import requests
        from base64 import b64encode

        # Create basic auth header from client credentials
        credentials = f"{self.client_id}:{self.client_secret}"
        auth_header = b64encode(credentials.encode()).decode()
        headers = {
            "Authorization": f"Basic {auth_header}",
            "Content-Type": "application/x-www-form-urlencoded",
        }
        data = {"grant_type": "client_credentials"}
        url = self.host.rstrip("/") + self.token_url

        try:
            response = requests.post(
                url, headers=headers, data=data, verify=self.verify_ssl
            )
            response.raise_for_status()
            token_data = response.json()
            self.access_token = token_data.get("access_token")
            return self.access_token
        except Exception as e:
            # Deliberately best-effort: any failure is logged and reported as
            # None.  ``self.logger`` is a dict of loggers, so the message has
            # to go through a concrete entry; calling ``.error`` on the dict
            # would raise AttributeError from inside this handler.
            self.logger["package_logger"].error(
                "Error getting access token: %s", e
            )
            return None

    def auth_settings(self):
        """Gets Auth Settings dict for api client.

        Fetches an access token first when client credentials are configured
        but no token is stored yet.

        :return: The Auth Settings information dict; the ``oauth2`` entry
            carries the ``Authorization`` header value (``"Bearer "`` followed
            by the token, or by nothing when no token is available).
        """
        # Try to get access token if we have client credentials but no token
        if not self.access_token and self.client_id and self.client_secret:
            self.get_access_token()

        return {
            "oauth2": {
                "type": "oauth2",
                "in": "header",
                "key": "Authorization",
                "value": "Bearer " + (self.access_token or ""),
            }
        }

    def to_debug_report(self):
        """Gets the essential information for debugging.

        :return: A multi-line report with the platform, the Python version,
            the API version and the SDK package version.
        """
        return (
            "Python SDK Debug Report:\n"
            "OS: {env}\n"
            "Python Version: {pyversion}\n"
            "Version of the API: 3\n"
            "SDK Package Version: 1.0.0".format(
                env=sys.platform, pyversion=sys.version
            )
        )