workflows_api.py•67.9 kB
# coding: utf-8
"""
Alteryx Server API V3
No description provided (generated by Swagger Codegen https://github.com/swagger-api/swagger-codegen) # noqa: E501
OpenAPI spec version: 3
Generated by: https://github.com/swagger-api/swagger-codegen.git
"""
from __future__ import absolute_import
import re # noqa: F401
# python 2 and python 3 compatibility library
import six
from src.server_client.api_client import ApiClient
class WorkflowsApi(object):
"""NOTE: This class is auto generated by the swagger code generator program.
Do not edit the class manually.
Ref: https://github.com/swagger-api/swagger-codegen
"""
def __init__(self, api_client=None):
if api_client is None:
api_client = ApiClient()
self.api_client = api_client
def workflows_add_version_to_workflow(
self,
workflow_id,
file,
name,
owner_id,
others_may_download,
others_can_execute,
execution_mode,
make_published,
workflow_credential_type,
**kwargs,
): # noqa: E501
"""Upload a new version of an existing workflow. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.workflows_add_version_to_workflow(workflow_id, file, name, owner_id, others_may_download,
others_can_execute, execution_mode, make_published, workflow_credential_type, async_req=True)
>>> result = thread.get()
:param async_req bool
:param str workflow_id: Identifier for an existing workflow or app (required)
:param file file: Must use a workflow package (.yxzp). (required)
:param str name: Name for the updated version of the workflow. (required)
:param str owner_id: Identifier for the owner of the new version of the workflow. (required)
:param bool others_may_download: (required)
:param bool others_can_execute: (required)
:param str execution_mode: (required)
:param bool make_published: Updates this new version as the published version of the workflow. (required)
:param str workflow_credential_type: (required)
:param bool has_private_data_exemption:
:param str comments:
:param str credential_id: Identifier for existing credentials used within the workflow.
:return: WorkflowView
If the method is called asynchronously,
returns the request thread.
"""
kwargs["_return_http_data_only"] = True
if kwargs.get("async_req"):
return self.workflows_add_version_to_workflow_with_http_info(
workflow_id,
file,
name,
owner_id,
others_may_download,
others_can_execute,
execution_mode,
make_published,
workflow_credential_type,
**kwargs,
) # noqa: E501
else:
(data) = self.workflows_add_version_to_workflow_with_http_info(
workflow_id,
file,
name,
owner_id,
others_may_download,
others_can_execute,
execution_mode,
make_published,
workflow_credential_type,
**kwargs,
) # noqa: E501
return data
def workflows_add_version_to_workflow_with_http_info(
self,
workflow_id,
file,
name,
owner_id,
others_may_download,
others_can_execute,
execution_mode,
make_published,
workflow_credential_type,
**kwargs,
): # noqa: E501
"""Upload a new version of an existing workflow. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.workflows_add_version_to_workflow_with_http_info(workflow_id, file, name, owner_id,
others_may_download, others_can_execute, execution_mode, make_published, workflow_credential_type,
async_req=True)
>>> result = thread.get()
:param async_req bool
:param str workflow_id: Identifier for an existing workflow or app (required)
:param file file: Must use a workflow package (.yxzp). (required)
:param str name: Name for the updated version of the workflow. (required)
:param str owner_id: Identifier for the owner of the new version of the workflow. (required)
:param bool others_may_download: (required)
:param bool others_can_execute: (required)
:param str execution_mode: (required)
:param bool make_published: Updates this new version as the published version of the workflow. (required)
:param str workflow_credential_type: (required)
:param bool has_private_data_exemption:
:param str comments:
:param str credential_id: Identifier for existing credentials used within the workflow.
:return: WorkflowView
If the method is called asynchronously,
returns the request thread.
"""
all_params = [
"workflow_id",
"file",
"name",
"owner_id",
"others_may_download",
"others_can_execute",
"execution_mode",
"make_published",
"workflow_credential_type",
"has_private_data_exemption",
"comments",
"credential_id",
] # noqa: E501
all_params.append("async_req")
all_params.append("_return_http_data_only")
all_params.append("_preload_content")
all_params.append("_request_timeout")
params = locals()
for key, val in six.iteritems(params["kwargs"]):
if key not in all_params:
raise TypeError(
"Got an unexpected keyword argument '%s' to method workflows_add_version_to_workflow" % key
)
params[key] = val
del params["kwargs"]
# verify the required parameter 'workflow_id' is set
if self.api_client.client_side_validation and ("workflow_id" not in params or params["workflow_id"] is None): # noqa: E501
raise ValueError(
"Missing the required parameter `workflow_id` when calling `workflows_add_version_to_workflow`"
) # noqa: E501
# verify the required parameter 'file' is set
if self.api_client.client_side_validation and ("file" not in params or params["file"] is None): # noqa: E501
raise ValueError("Missing the required parameter `file` when calling `workflows_add_version_to_workflow`") # noqa: E501
# verify the required parameter 'name' is set
if self.api_client.client_side_validation and ("name" not in params or params["name"] is None): # noqa: E501
raise ValueError("Missing the required parameter `name` when calling `workflows_add_version_to_workflow`") # noqa: E501
# verify the required parameter 'owner_id' is set
if self.api_client.client_side_validation and ("owner_id" not in params or params["owner_id"] is None): # noqa: E501
raise ValueError(
"Missing the required parameter `owner_id` when calling `workflows_add_version_to_workflow`"
) # noqa: E501
# verify the required parameter 'others_may_download' is set
if self.api_client.client_side_validation and (
"others_may_download" not in params or params["others_may_download"] is None
): # noqa: E501
raise ValueError(
"Missing the required parameter `others_may_download` when calling `workflows_add_version_to_workflow`"
) # noqa: E501
# verify the required parameter 'others_can_execute' is set
if self.api_client.client_side_validation and (
"others_can_execute" not in params or params["others_can_execute"] is None
): # noqa: E501
raise ValueError(
"Missing the required parameter `others_can_execute` when calling `workflows_add_version_to_workflow`"
) # noqa: E501
# verify the required parameter 'execution_mode' is set
if self.api_client.client_side_validation and (
"execution_mode" not in params or params["execution_mode"] is None
): # noqa: E501
raise ValueError(
"Missing the required parameter `execution_mode` when calling `workflows_add_version_to_workflow`"
) # noqa: E501
# verify the required parameter 'make_published' is set
if self.api_client.client_side_validation and (
"make_published" not in params or params["make_published"] is None
): # noqa: E501
raise ValueError(
"Missing the required parameter `make_published` when calling `workflows_add_version_to_workflow`"
) # noqa: E501
# verify the required parameter 'workflow_credential_type' is set
if self.api_client.client_side_validation and (
"workflow_credential_type" not in params or params["workflow_credential_type"] is None
): # noqa: E501
raise ValueError(
"Missing the required parameter `workflow_credential_type` when calling "
"`workflows_add_version_to_workflow`"
) # noqa: E501
collection_formats = {}
path_params = {}
if "workflow_id" in params:
path_params["workflowId"] = params["workflow_id"] # noqa: E501
query_params = []
header_params = {}
form_params = []
local_var_files = {}
if "file" in params:
local_var_files["file"] = params["file"] # noqa: E501
if "name" in params:
form_params.append(("name", params["name"])) # noqa: E501
if "owner_id" in params:
form_params.append(("ownerId", params["owner_id"])) # noqa: E501
if "others_may_download" in params:
form_params.append(("othersMayDownload", params["others_may_download"])) # noqa: E501
if "others_can_execute" in params:
form_params.append(("othersCanExecute", params["others_can_execute"])) # noqa: E501
if "execution_mode" in params:
form_params.append(("executionMode", params["execution_mode"])) # noqa: E501
if "has_private_data_exemption" in params:
form_params.append(("hasPrivateDataExemption", params["has_private_data_exemption"])) # noqa: E501
if "comments" in params:
form_params.append(("comments", params["comments"])) # noqa: E501
if "make_published" in params:
form_params.append(("makePublished", params["make_published"])) # noqa: E501
if "workflow_credential_type" in params:
form_params.append(("workflowCredentialType", params["workflow_credential_type"])) # noqa: E501
if "credential_id" in params:
form_params.append(("credentialId", params["credential_id"])) # noqa: E501
body_params = None
# HTTP header `Accept`
header_params["Accept"] = self.api_client.select_header_accept(
["application/json", "text/json", "application/xml", "text/xml"]
) # noqa: E501
# HTTP header `Content-Type`
header_params["Content-Type"] = self.api_client.select_header_content_type( # noqa: E501
["application/form-data"]
) # noqa: E501
# Authentication setting
auth_settings = ["oauth2"] # noqa: E501
return self.api_client.call_api(
"/v3/workflows/{workflowId}/versions",
"POST",
path_params,
query_params,
header_params,
body=body_params,
post_params=form_params,
files=local_var_files,
response_type="WorkflowView", # noqa: E501
auth_settings=auth_settings,
async_req=params.get("async_req"),
_return_http_data_only=params.get("_return_http_data_only"),
_preload_content=params.get("_preload_content", True),
_request_timeout=params.get("_request_timeout"),
collection_formats=collection_formats,
)
def workflows_create_workflow(
self,
file,
name,
owner_id,
is_public,
is_ready_for_migration,
others_may_download,
others_can_execute,
execution_mode,
workflow_credential_type,
**kwargs,
): # noqa: E501
"""Upload a new workflow package. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.workflows_create_workflow(file, name, owner_id, is_public, is_ready_for_migration,
others_may_download, others_can_execute, execution_mode, workflow_credential_type, async_req=True)
>>> result = thread.get()
:param async_req bool
:param file file: Must use a workflow package (.yxzp). (required)
:param str name: (required)
:param str owner_id: (required)
:param bool is_public: (required)
:param bool is_ready_for_migration: (required)
:param bool others_may_download: (required)
:param bool others_can_execute: (required)
:param str execution_mode: Accepted values are \"Safe\", \"SemiSafe\", \"Standard\", (required)
:param str workflow_credential_type: Accepted values are \"Default\", \"Required\", \"Specific\", (required)
:param str worker_tag:
:param str district_tags: Submit as JSON formatted array. IE: [\"id1\", \"id2\"]
:param str comments:
:param str source_app_id: Sets the source app id of a workflow. Can be used as the 'sourceId' reference for
the POST admin/v1/workflows API endpoint. Providing an pre existing sourceAppId will result
in an invalid request.
:param bool has_private_data_exemption:
:param str credential_id:
:param str collection_ids: Submit as JSON formatted array. IE: [\"id1\", \"id2\"]
:return: str
If the method is called asynchronously,
returns the request thread.
"""
kwargs["_return_http_data_only"] = True
if kwargs.get("async_req"):
return self.workflows_create_workflow_with_http_info(
file,
name,
owner_id,
is_public,
is_ready_for_migration,
others_may_download,
others_can_execute,
execution_mode,
workflow_credential_type,
**kwargs,
) # noqa: E501
else:
(data) = self.workflows_create_workflow_with_http_info(
file,
name,
owner_id,
is_public,
is_ready_for_migration,
others_may_download,
others_can_execute,
execution_mode,
workflow_credential_type,
**kwargs,
) # noqa: E501
return data
def workflows_create_workflow_with_http_info(
self,
file,
name,
owner_id,
is_public,
is_ready_for_migration,
others_may_download,
others_can_execute,
execution_mode,
workflow_credential_type,
**kwargs,
): # noqa: E501
"""Upload a new workflow package. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.workflows_create_workflow_with_http_info(file, name, owner_id, is_public,
is_ready_for_migration, others_may_download, others_can_execute, execution_mode, workflow_credential_type,
async_req=True)
>>> result = thread.get()
:param async_req bool
:param file file: Must use a workflow package (.yxzp). (required)
:param str name: (required)
:param str owner_id: (required)
:param bool is_public: (required)
:param bool is_ready_for_migration: (required)
:param bool others_may_download: (required)
:param bool others_can_execute: (required)
:param str execution_mode: Accepted values are \"Safe\", \"SemiSafe\", \"Standard\", (required)
:param str workflow_credential_type: Accepted values are \"Default\", \"Required\", \"Specific\", (required)
:param str worker_tag:
:param str district_tags: Submit as JSON formatted array. IE: [\"id1\", \"id2\"]
:param str comments:
:param str source_app_id: Sets the source app id of a workflow. Can be used as the 'sourceId' reference for
the POST admin/v1/workflows API endpoint. Providing an pre existing sourceAppId will result in an invalid
request.
:param bool has_private_data_exemption:
:param str credential_id:
:param str collection_ids: Submit as JSON formatted array. IE: [\"id1\", \"id2\"]
:return: str
If the method is called asynchronously,
returns the request thread.
"""
all_params = [
"file",
"name",
"owner_id",
"is_public",
"is_ready_for_migration",
"others_may_download",
"others_can_execute",
"execution_mode",
"workflow_credential_type",
"worker_tag",
"district_tags",
"comments",
"source_app_id",
"has_private_data_exemption",
"credential_id",
"collection_ids",
] # noqa: E501
all_params.append("async_req")
all_params.append("_return_http_data_only")
all_params.append("_preload_content")
all_params.append("_request_timeout")
params = locals()
for key, val in six.iteritems(params["kwargs"]):
if key not in all_params:
raise TypeError("Got an unexpected keyword argument '%s' to method workflows_create_workflow" % key)
params[key] = val
del params["kwargs"]
# verify the required parameter 'file' is set
if self.api_client.client_side_validation and ("file" not in params or params["file"] is None): # noqa: E501
raise ValueError("Missing the required parameter `file` when calling `workflows_create_workflow`") # noqa: E501
# verify the required parameter 'name' is set
if self.api_client.client_side_validation and ("name" not in params or params["name"] is None): # noqa: E501
raise ValueError("Missing the required parameter `name` when calling `workflows_create_workflow`") # noqa: E501
# verify the required parameter 'owner_id' is set
if self.api_client.client_side_validation and ("owner_id" not in params or params["owner_id"] is None): # noqa: E501
raise ValueError("Missing the required parameter `owner_id` when calling `workflows_create_workflow`") # noqa: E501
# verify the required parameter 'is_public' is set
if self.api_client.client_side_validation and ("is_public" not in params or params["is_public"] is None): # noqa: E501
raise ValueError("Missing the required parameter `is_public` when calling `workflows_create_workflow`") # noqa: E501
# verify the required parameter 'is_ready_for_migration' is set
if self.api_client.client_side_validation and (
"is_ready_for_migration" not in params or params["is_ready_for_migration"] is None
): # noqa: E501
raise ValueError(
"Missing the required parameter `is_ready_for_migration` when calling `workflows_create_workflow`"
) # noqa: E501
# verify the required parameter 'others_may_download' is set
if self.api_client.client_side_validation and (
"others_may_download" not in params or params["others_may_download"] is None
): # noqa: E501
raise ValueError(
"Missing the required parameter `others_may_download` when calling `workflows_create_workflow`"
) # noqa: E501
# verify the required parameter 'others_can_execute' is set
if self.api_client.client_side_validation and (
"others_can_execute" not in params or params["others_can_execute"] is None
): # noqa: E501
raise ValueError(
"Missing the required parameter `others_can_execute` when calling `workflows_create_workflow`"
) # noqa: E501
# verify the required parameter 'execution_mode' is set
if self.api_client.client_side_validation and (
"execution_mode" not in params or params["execution_mode"] is None
): # noqa: E501
raise ValueError("Missing the required parameter `execution_mode` when calling `workflows_create_workflow`") # noqa: E501
# verify the required parameter 'workflow_credential_type' is set
if self.api_client.client_side_validation and (
"workflow_credential_type" not in params or params["workflow_credential_type"] is None
): # noqa: E501
raise ValueError(
"Missing the required parameter `workflow_credential_type` when calling `workflows_create_workflow`"
) # noqa: E501
collection_formats = {}
path_params = {}
query_params = []
header_params = {}
form_params = []
local_var_files = {}
if "file" in params:
local_var_files["file"] = params["file"] # noqa: E501
if "name" in params:
form_params.append(("name", params["name"])) # noqa: E501
if "owner_id" in params:
form_params.append(("ownerId", params["owner_id"])) # noqa: E501
if "worker_tag" in params:
form_params.append(("workerTag", params["worker_tag"])) # noqa: E501
if "district_tags" in params:
form_params.append(("districtTags", params["district_tags"])) # noqa: E501
if "comments" in params:
form_params.append(("comments", params["comments"])) # noqa: E501
if "is_public" in params:
form_params.append(("isPublic", params["is_public"])) # noqa: E501
if "is_ready_for_migration" in params:
form_params.append(("isReadyForMigration", params["is_ready_for_migration"])) # noqa: E501
if "source_app_id" in params:
form_params.append(("sourceAppId", params["source_app_id"])) # noqa: E501
if "others_may_download" in params:
form_params.append(("othersMayDownload", params["others_may_download"])) # noqa: E501
if "others_can_execute" in params:
form_params.append(("othersCanExecute", params["others_can_execute"])) # noqa: E501
if "execution_mode" in params:
form_params.append(("executionMode", params["execution_mode"])) # noqa: E501
if "has_private_data_exemption" in params:
form_params.append(("hasPrivateDataExemption", params["has_private_data_exemption"])) # noqa: E501
if "workflow_credential_type" in params:
form_params.append(("workflowCredentialType", params["workflow_credential_type"])) # noqa: E501
if "credential_id" in params:
form_params.append(("credentialId", params["credential_id"])) # noqa: E501
if "collection_ids" in params:
form_params.append(("collectionIds", params["collection_ids"])) # noqa: E501
body_params = None
# HTTP header `Accept`
header_params["Accept"] = self.api_client.select_header_accept(
["application/json", "text/json", "application/xml", "text/xml"]
) # noqa: E501
# HTTP header `Content-Type`
header_params["Content-Type"] = self.api_client.select_header_content_type( # noqa: E501
["application/form-data"]
) # noqa: E501
# Authentication setting
auth_settings = ["oauth2"] # noqa: E501
return self.api_client.call_api(
"/v3/workflows",
"POST",
path_params,
query_params,
header_params,
body=body_params,
post_params=form_params,
files=local_var_files,
response_type="str", # noqa: E501
auth_settings=auth_settings,
async_req=params.get("async_req"),
_return_http_data_only=params.get("_return_http_data_only"),
_preload_content=params.get("_preload_content", True),
_request_timeout=params.get("_request_timeout"),
collection_formats=collection_formats,
)
def workflows_delete_workflow(self, workflow_id, **kwargs): # noqa: E501
"""Delete an existing workflow. # noqa: E501
Only Curators can use this API endpoint. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.workflows_delete_workflow(workflow_id, async_req=True)
>>> result = thread.get()
:param async_req bool
:param str workflow_id: Identifier for an existing workflow or app (required)
:param bool force: If the workflow is scheduled, setting force to True will delete all impacted
schedules before deleting the workflow.
:return: None
If the method is called asynchronously,
returns the request thread.
"""
kwargs["_return_http_data_only"] = True
if kwargs.get("async_req"):
return self.workflows_delete_workflow_with_http_info(workflow_id, **kwargs) # noqa: E501
else:
(data) = self.workflows_delete_workflow_with_http_info(workflow_id, **kwargs) # noqa: E501
return data
def workflows_delete_workflow_with_http_info(self, workflow_id, **kwargs): # noqa: E501
"""Delete an existing workflow. # noqa: E501
Only Curators can use this API endpoint. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.workflows_delete_workflow_with_http_info(workflow_id, async_req=True)
>>> result = thread.get()
:param async_req bool
:param str workflow_id: Identifier for an existing workflow or app (required)
:param bool force: If the workflow is scheduled, setting force to True will delete all impacted schedule
s before deleting the workflow.
:return: None
If the method is called asynchronously,
returns the request thread.
"""
all_params = ["workflow_id", "force"] # noqa: E501
all_params.append("async_req")
all_params.append("_return_http_data_only")
all_params.append("_preload_content")
all_params.append("_request_timeout")
params = locals()
for key, val in six.iteritems(params["kwargs"]):
if key not in all_params:
raise TypeError("Got an unexpected keyword argument '%s' to method workflows_delete_workflow" % key)
params[key] = val
del params["kwargs"]
# verify the required parameter 'workflow_id' is set
if self.api_client.client_side_validation and ("workflow_id" not in params or params["workflow_id"] is None): # noqa: E501
raise ValueError("Missing the required parameter `workflow_id` when calling `workflows_delete_workflow`") # noqa: E501
collection_formats = {}
path_params = {}
if "workflow_id" in params:
path_params["workflowId"] = params["workflow_id"] # noqa: E501
query_params = []
if "force" in params:
query_params.append(("force", params["force"])) # noqa: E501
header_params = {}
form_params = []
local_var_files = {}
body_params = None
# HTTP header `Accept`
header_params["Accept"] = self.api_client.select_header_accept(
["application/json", "text/json", "application/xml", "text/xml"]
) # noqa: E501
# Authentication setting
auth_settings = ["oauth2"] # noqa: E501
return self.api_client.call_api(
"/v3/workflows/{workflowId}",
"DELETE",
path_params,
query_params,
header_params,
body=body_params,
post_params=form_params,
files=local_var_files,
response_type=None, # noqa: E501
auth_settings=auth_settings,
async_req=params.get("async_req"),
_return_http_data_only=params.get("_return_http_data_only"),
_preload_content=params.get("_preload_content", True),
_request_timeout=params.get("_request_timeout"),
collection_formats=collection_formats,
)
def workflows_download_workflow(self, workflow_id, **kwargs): # noqa: E501
"""Download a copy of an existing workflow package. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.workflows_download_workflow(workflow_id, async_req=True)
>>> result = thread.get()
:param async_req bool
:param str workflow_id: Identifier for an existing workflow or app (required)
:param str version_id: A specific version number of the workflow. If no version is provided,
the published version will be downloaded.
:return: None
If the method is called asynchronously,
returns the request thread.
"""
kwargs["_return_http_data_only"] = True
if kwargs.get("async_req"):
return self.workflows_download_workflow_with_http_info(workflow_id, **kwargs) # noqa: E501
else:
(data) = self.workflows_download_workflow_with_http_info(workflow_id, **kwargs) # noqa: E501
return data
def workflows_download_workflow_with_http_info(self, workflow_id, **kwargs): # noqa: E501
"""Download a copy of an existing workflow package. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.workflows_download_workflow_with_http_info(workflow_id, async_req=True)
>>> result = thread.get()
:param async_req bool
:param str workflow_id: Identifier for an existing workflow or app (required)
:param str version_id: A specific version number of the workflow. If no version is provided,
the published version will be downloaded.
:return: None
If the method is called asynchronously,
returns the request thread.
"""
all_params = ["workflow_id", "version_id"] # noqa: E501
all_params.append("async_req")
all_params.append("_return_http_data_only")
all_params.append("_preload_content")
all_params.append("_request_timeout")
params = locals()
for key, val in six.iteritems(params["kwargs"]):
if key not in all_params:
raise TypeError("Got an unexpected keyword argument '%s' to method workflows_download_workflow" % key)
params[key] = val
del params["kwargs"]
# verify the required parameter 'workflow_id' is set
if self.api_client.client_side_validation and ("workflow_id" not in params or params["workflow_id"] is None): # noqa: E501
raise ValueError("Missing the required parameter `workflow_id` when calling `workflows_download_workflow`") # noqa: E501
collection_formats = {}
path_params = {}
if "workflow_id" in params:
path_params["workflowId"] = params["workflow_id"] # noqa: E501
query_params = []
if "version_id" in params:
query_params.append(("versionId", params["version_id"])) # noqa: E501
header_params = {}
form_params = []
local_var_files = {}
body_params = None
# HTTP header `Accept`
header_params["Accept"] = self.api_client.select_header_accept(["application/octet-stream"]) # noqa: E501
# Authentication setting
auth_settings = ["oauth2"] # noqa: E501
return self.api_client.call_api(
"/v3/workflows/{workflowId}/package",
"GET",
path_params,
query_params,
header_params,
body=body_params,
post_params=form_params,
files=local_var_files,
response_type=None, # noqa: E501
auth_settings=auth_settings,
async_req=params.get("async_req"),
_return_http_data_only=params.get("_return_http_data_only"),
_preload_content=params.get("_preload_content", True),
_request_timeout=params.get("_request_timeout"),
collection_formats=collection_formats,
)
def workflows_enqueue(self, workflow_id, contract, **kwargs): # noqa: E501
"""Creates a new job and adds it to the job execution queue. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.workflows_enqueue(workflow_id, contract, async_req=True)
>>> result = thread.get()
:param async_req bool
:param str workflow_id: Identifier for an existing workflow or app (required)
:param EnqueueJobContract contract: (required)
:return: JobView
If the method is called asynchronously,
returns the request thread.
"""
kwargs["_return_http_data_only"] = True
if kwargs.get("async_req"):
return self.workflows_enqueue_with_http_info(workflow_id, contract, **kwargs) # noqa: E501
else:
(data) = self.workflows_enqueue_with_http_info(workflow_id, contract, **kwargs) # noqa: E501
return data
def workflows_enqueue_with_http_info(self, workflow_id, contract, **kwargs): # noqa: E501
"""Creates a new job and adds it to the job execution queue. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.workflows_enqueue_with_http_info(workflow_id, contract, async_req=True)
>>> result = thread.get()
:param async_req bool
:param str workflow_id: Identifier for an existing workflow or app (required)
:param EnqueueJobContract contract: (required)
:return: JobView
If the method is called asynchronously,
returns the request thread.
"""
all_params = ["workflow_id", "contract"] # noqa: E501
all_params.append("async_req")
all_params.append("_return_http_data_only")
all_params.append("_preload_content")
all_params.append("_request_timeout")
params = locals()
for key, val in six.iteritems(params["kwargs"]):
if key not in all_params:
raise TypeError("Got an unexpected keyword argument '%s' to method workflows_enqueue" % key)
params[key] = val
del params["kwargs"]
# verify the required parameter 'workflow_id' is set
if self.api_client.client_side_validation and ("workflow_id" not in params or params["workflow_id"] is None): # noqa: E501
raise ValueError("Missing the required parameter `workflow_id` when calling `workflows_enqueue`") # noqa: E501
# verify the required parameter 'contract' is set
if self.api_client.client_side_validation and ("contract" not in params or params["contract"] is None): # noqa: E501
raise ValueError("Missing the required parameter `contract` when calling `workflows_enqueue`") # noqa: E501
collection_formats = {}
path_params = {}
if "workflow_id" in params:
path_params["workflowId"] = params["workflow_id"] # noqa: E501
query_params = []
header_params = {}
form_params = []
local_var_files = {}
body_params = None
if "contract" in params:
body_params = params["contract"]
# HTTP header `Accept`
header_params["Accept"] = self.api_client.select_header_accept(
["application/json", "text/json", "application/xml", "text/xml"]
) # noqa: E501
# HTTP header `Content-Type`
header_params["Content-Type"] = self.api_client.select_header_content_type( # noqa: E501
["application/json", "text/json", "application/xml", "text/xml", "application/x-www-form-urlencoded"]
) # noqa: E501
# Authentication setting
auth_settings = ["oauth2"] # noqa: E501
return self.api_client.call_api(
"/v3/workflows/{workflowId}/jobs",
"POST",
path_params,
query_params,
header_params,
body=body_params,
post_params=form_params,
files=local_var_files,
response_type="JobView", # noqa: E501
auth_settings=auth_settings,
async_req=params.get("async_req"),
_return_http_data_only=params.get("_return_http_data_only"),
_preload_content=params.get("_preload_content", True),
_request_timeout=params.get("_request_timeout"),
collection_formats=collection_formats,
)
def workflows_get_jobs_for_workflow(self, workflow_id, **kwargs): # noqa: E501
"""Retrieve a list of jobs for an existing workflow. # noqa: E501
A Job may be returned as Completed even if the query status is Error.
This would indicate an error occurred during execution, but the workflow was run. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.workflows_get_jobs_for_workflow(workflow_id, async_req=True)
>>> result = thread.get()
:param async_req bool
:param str workflow_id: Identifier for an existing workflow or app (required)
:param str sort_field:
:param str direction:
:param str offset:
:param str limit:
:param str status: The overall status of the job execution. A completed job may still have failed.
:param str result_code: The result code of the execution of a workflow. This can indicate a failed workflow,
but a successful job.
:return: list[WorkflowJobView]
If the method is called asynchronously,
returns the request thread.
"""
kwargs["_return_http_data_only"] = True
if kwargs.get("async_req"):
return self.workflows_get_jobs_for_workflow_with_http_info(workflow_id, **kwargs) # noqa: E501
else:
(data) = self.workflows_get_jobs_for_workflow_with_http_info(workflow_id, **kwargs) # noqa: E501
return data
def workflows_get_jobs_for_workflow_with_http_info(self, workflow_id, **kwargs): # noqa: E501
"""Retrieve a list of jobs for an existing workflow. # noqa: E501
A Job may be returned as Completed even if the query status is Error. This would indicate an error occurred
during execution, but the workflow was run. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.workflows_get_jobs_for_workflow_with_http_info(workflow_id, async_req=True)
>>> result = thread.get()
:param async_req bool
:param str workflow_id: Identifier for an existing workflow or app (required)
:param str sort_field:
:param str direction:
:param str offset:
:param str limit:
:param str status: The overall status of the job execution. A completed job may still have failed.
:param str result_code: The result code of the execution of a workflow. This can indicate a failed workflow,
but a successful job.
:return: list[WorkflowJobView]
If the method is called asynchronously,
returns the request thread.
"""
all_params = ["workflow_id", "sort_field", "direction", "offset", "limit", "status", "result_code"] # noqa: E501
all_params.append("async_req")
all_params.append("_return_http_data_only")
all_params.append("_preload_content")
all_params.append("_request_timeout")
params = locals()
for key, val in six.iteritems(params["kwargs"]):
if key not in all_params:
raise TypeError(
"Got an unexpected keyword argument '%s' to method workflows_get_jobs_for_workflow" % key
)
params[key] = val
del params["kwargs"]
# verify the required parameter 'workflow_id' is set
if self.api_client.client_side_validation and ("workflow_id" not in params or params["workflow_id"] is None): # noqa: E501
raise ValueError(
"Missing the required parameter `workflow_id` when calling `workflows_get_jobs_for_workflow`"
) # noqa: E501
collection_formats = {}
path_params = {}
if "workflow_id" in params:
path_params["workflowId"] = params["workflow_id"] # noqa: E501
query_params = []
if "sort_field" in params:
query_params.append(("sortField", params["sort_field"])) # noqa: E501
if "direction" in params:
query_params.append(("direction", params["direction"])) # noqa: E501
if "offset" in params:
query_params.append(("offset", params["offset"])) # noqa: E501
if "limit" in params:
query_params.append(("limit", params["limit"])) # noqa: E501
if "status" in params:
query_params.append(("status", params["status"])) # noqa: E501
if "result_code" in params:
query_params.append(("resultCode", params["result_code"])) # noqa: E501
header_params = {}
form_params = []
local_var_files = {}
body_params = None
# HTTP header `Accept`
header_params["Accept"] = self.api_client.select_header_accept(
["application/json", "text/json", "application/xml", "text/xml"]
) # noqa: E501
# Authentication setting
auth_settings = ["oauth2"] # noqa: E501
return self.api_client.call_api(
"/v3/workflows/{workflowId}/jobs",
"GET",
path_params,
query_params,
header_params,
body=body_params,
post_params=form_params,
files=local_var_files,
response_type="list[WorkflowJobView]", # noqa: E501
auth_settings=auth_settings,
async_req=params.get("async_req"),
_return_http_data_only=params.get("_return_http_data_only"),
_preload_content=params.get("_preload_content", True),
_request_timeout=params.get("_request_timeout"),
collection_formats=collection_formats,
)
def workflows_get_workflow(self, workflow_id, **kwargs): # noqa: E501
"""Retrieve details about an existing workflow. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.workflows_get_workflow(workflow_id, async_req=True)
>>> result = thread.get()
:param async_req bool
:param str workflow_id: Identifier for an existing workflow or app (required)
:return: WorkflowView
If the method is called asynchronously,
returns the request thread.
"""
kwargs["_return_http_data_only"] = True
if kwargs.get("async_req"):
return self.workflows_get_workflow_with_http_info(workflow_id, **kwargs) # noqa: E501
else:
(data) = self.workflows_get_workflow_with_http_info(workflow_id, **kwargs) # noqa: E501
return data
def workflows_get_workflow_with_http_info(self, workflow_id, **kwargs): # noqa: E501
"""Retrieve details about an existing workflow. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.workflows_get_workflow_with_http_info(workflow_id, async_req=True)
>>> result = thread.get()
:param async_req bool
:param str workflow_id: Identifier for an existing workflow or app (required)
:return: WorkflowView
If the method is called asynchronously,
returns the request thread.
"""
all_params = ["workflow_id"] # noqa: E501
all_params.append("async_req")
all_params.append("_return_http_data_only")
all_params.append("_preload_content")
all_params.append("_request_timeout")
params = locals()
for key, val in six.iteritems(params["kwargs"]):
if key not in all_params:
raise TypeError("Got an unexpected keyword argument '%s' to method workflows_get_workflow" % key)
params[key] = val
del params["kwargs"]
# verify the required parameter 'workflow_id' is set
if self.api_client.client_side_validation and ("workflow_id" not in params or params["workflow_id"] is None): # noqa: E501
raise ValueError("Missing the required parameter `workflow_id` when calling `workflows_get_workflow`") # noqa: E501
collection_formats = {}
path_params = {}
if "workflow_id" in params:
path_params["workflowId"] = params["workflow_id"] # noqa: E501
query_params = []
header_params = {}
form_params = []
local_var_files = {}
body_params = None
# HTTP header `Accept`
header_params["Accept"] = self.api_client.select_header_accept(
["application/json", "text/json", "application/xml", "text/xml"]
) # noqa: E501
# Authentication setting
auth_settings = ["oauth2"] # noqa: E501
return self.api_client.call_api(
"/v3/workflows/{workflowId}",
"GET",
path_params,
query_params,
header_params,
body=body_params,
post_params=form_params,
files=local_var_files,
response_type="WorkflowView", # noqa: E501
auth_settings=auth_settings,
async_req=params.get("async_req"),
_return_http_data_only=params.get("_return_http_data_only"),
_preload_content=params.get("_preload_content", True),
_request_timeout=params.get("_request_timeout"),
collection_formats=collection_formats,
)
def workflows_get_workflow_questions(self, workflow_id, **kwargs): # noqa: E501
"""Retrieve question information for an analytic app. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.workflows_get_workflow_questions(workflow_id, async_req=True)
>>> result = thread.get()
:param async_req bool
:param str workflow_id: Identifier for an existing workflow or app (required)
:param str version_id: A specific version number of the workflow to get questions of.
:return: list[WorkflowQuestionView]
If the method is called asynchronously,
returns the request thread.
"""
kwargs["_return_http_data_only"] = True
if kwargs.get("async_req"):
return self.workflows_get_workflow_questions_with_http_info(workflow_id, **kwargs) # noqa: E501
else:
(data) = self.workflows_get_workflow_questions_with_http_info(workflow_id, **kwargs) # noqa: E501
return data
def workflows_get_workflow_questions_with_http_info(self, workflow_id, **kwargs): # noqa: E501
"""Retrieve question information for an analytic app. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.workflows_get_workflow_questions_with_http_info(workflow_id, async_req=True)
>>> result = thread.get()
:param async_req bool
:param str workflow_id: Identifier for an existing workflow or app (required)
:param str version_id: A specific version number of the workflow to get questions of.
:return: list[WorkflowQuestionView]
If the method is called asynchronously,
returns the request thread.
"""
all_params = ["workflow_id", "version_id"] # noqa: E501
all_params.append("async_req")
all_params.append("_return_http_data_only")
all_params.append("_preload_content")
all_params.append("_request_timeout")
params = locals()
for key, val in six.iteritems(params["kwargs"]):
if key not in all_params:
raise TypeError(
"Got an unexpected keyword argument '%s' to method workflows_get_workflow_questions" % key
)
params[key] = val
del params["kwargs"]
# verify the required parameter 'workflow_id' is set
if self.api_client.client_side_validation and ("workflow_id" not in params or params["workflow_id"] is None): # noqa: E501
raise ValueError(
"Missing the required parameter `workflow_id` when calling `workflows_get_workflow_questions`"
) # noqa: E501
collection_formats = {}
path_params = {}
if "workflow_id" in params:
path_params["workflowId"] = params["workflow_id"] # noqa: E501
query_params = []
if "version_id" in params:
query_params.append(("versionId", params["version_id"])) # noqa: E501
header_params = {}
form_params = []
local_var_files = {}
body_params = None
# HTTP header `Accept`
header_params["Accept"] = self.api_client.select_header_accept(
["application/json", "text/json", "application/xml", "text/xml"]
) # noqa: E501
# Authentication setting
auth_settings = ["oauth2"] # noqa: E501
return self.api_client.call_api(
"/v3/workflows/{workflowId}/questions",
"GET",
path_params,
query_params,
header_params,
body=body_params,
post_params=form_params,
files=local_var_files,
response_type="list[WorkflowQuestionView]", # noqa: E501
auth_settings=auth_settings,
async_req=params.get("async_req"),
_return_http_data_only=params.get("_return_http_data_only"),
_preload_content=params.get("_preload_content", True),
_request_timeout=params.get("_request_timeout"),
collection_formats=collection_formats,
)
def workflows_get_workflows(self, **kwargs): # noqa: E501
"""Retrieve all accessible workflow records. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.workflows_get_workflows(async_req=True)
>>> result = thread.get()
:param async_req bool
:param str view:
:param str name: Filter results by a specific workflow name.
:param str owner_id: Filter results for a specific owner.
:param str created_after: Filter results based on the published revision's created date, inclusive.
:param str created_before: Filter results based on the published revision's created date, inclusive.
:return: list[ReducedWorkflowView]
If the method is called asynchronously,
returns the request thread.
"""
kwargs["_return_http_data_only"] = True
if kwargs.get("async_req"):
return self.workflows_get_workflows_with_http_info(**kwargs) # noqa: E501
else:
(data) = self.workflows_get_workflows_with_http_info(**kwargs) # noqa: E501
return data
def workflows_get_workflows_with_http_info(self, **kwargs): # noqa: E501
"""Retrieve all accessible workflow records. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.workflows_get_workflows_with_http_info(async_req=True)
>>> result = thread.get()
:param async_req bool
:param str view:
:param str name: Filter results by a specific workflow name.
:param str owner_id: Filter results for a specific owner.
:param str created_after: Filter results based on the published revision's created date, inclusive.
:param str created_before: Filter results based on the published revision's created date, inclusive.
:return: list[ReducedWorkflowView]
If the method is called asynchronously,
returns the request thread.
"""
all_params = ["view", "name", "owner_id", "created_after", "created_before"] # noqa: E501
all_params.append("async_req")
all_params.append("_return_http_data_only")
all_params.append("_preload_content")
all_params.append("_request_timeout")
params = locals()
for key, val in six.iteritems(params["kwargs"]):
if key not in all_params:
raise TypeError("Got an unexpected keyword argument '%s' to method workflows_get_workflows" % key)
params[key] = val
del params["kwargs"]
collection_formats = {}
path_params = {}
query_params = []
if "view" in params:
query_params.append(("view", params["view"])) # noqa: E501
if "name" in params:
query_params.append(("name", params["name"])) # noqa: E501
if "owner_id" in params:
query_params.append(("ownerId", params["owner_id"])) # noqa: E501
if "created_after" in params:
query_params.append(("createdAfter", params["created_after"])) # noqa: E501
if "created_before" in params:
query_params.append(("createdBefore", params["created_before"])) # noqa: E501
header_params = {}
form_params = []
local_var_files = {}
body_params = None
# HTTP header `Accept`
header_params["Accept"] = self.api_client.select_header_accept(
["application/json", "text/json", "application/xml", "text/xml"]
) # noqa: E501
# Authentication setting
auth_settings = ["oauth2"] # noqa: E501
return self.api_client.call_api(
"/v3/workflows",
"GET",
path_params,
query_params,
header_params,
body=body_params,
post_params=form_params,
files=local_var_files,
response_type="list[ReducedWorkflowView]", # noqa: E501
auth_settings=auth_settings,
async_req=params.get("async_req"),
_return_http_data_only=params.get("_return_http_data_only"),
_preload_content=params.get("_preload_content", True),
_request_timeout=params.get("_request_timeout"),
collection_formats=collection_formats,
)
def workflows_transfer_workflow(self, workflow_id, contract, **kwargs): # noqa: E501
"""Transfer specified workflow to specified owner and also schedules if desired # noqa: E501
If any of the workflows require DCM connections, server connections, or specific run as credentials to run,
these items will need to be updated before the workflow can run.
Additionally, if users are not in the same studio,
when a workflow is transferred to the new studio, all other users in the new owner's studio will also receive
access to the workflow, and all users from the old studio will lose access. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.workflows_transfer_workflow(workflow_id, contract, async_req=True)
>>> result = thread.get()
:param async_req bool
:param str workflow_id: Id of the workflow to transfer (required)
:param TransferWorkflowContract contract: (required)
:return: None
If the method is called asynchronously,
returns the request thread.
"""
kwargs["_return_http_data_only"] = True
if kwargs.get("async_req"):
return self.workflows_transfer_workflow_with_http_info(workflow_id, contract, **kwargs) # noqa: E501
else:
(data) = self.workflows_transfer_workflow_with_http_info(workflow_id, contract, **kwargs) # noqa: E501
return data
def workflows_transfer_workflow_with_http_info(self, workflow_id, contract, **kwargs): # noqa: E501
"""Transfer specified workflow to specified owner and also schedules if desired # noqa: E501
If any of the workflows require DCM connections, server connections, or specific run as credentials to run,
these items will need to be updated before the workflow can run. Additionally, if users are not in the same
studio, when a workflow is transferred to the new studio, all other users in the new owner's studio will also
receive access to the workflow, and all users from the old studio will lose access. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.workflows_transfer_workflow_with_http_info(workflow_id, contract, async_req=True)
>>> result = thread.get()
:param async_req bool
:param str workflow_id: Id of the workflow to transfer (required)
:param TransferWorkflowContract contract: (required)
:return: None
If the method is called asynchronously,
returns the request thread.
"""
all_params = ["workflow_id", "contract"] # noqa: E501
all_params.append("async_req")
all_params.append("_return_http_data_only")
all_params.append("_preload_content")
all_params.append("_request_timeout")
params = locals()
for key, val in six.iteritems(params["kwargs"]):
if key not in all_params:
raise TypeError("Got an unexpected keyword argument '%s' to method workflows_transfer_workflow" % key)
params[key] = val
del params["kwargs"]
# verify the required parameter 'workflow_id' is set
if self.api_client.client_side_validation and ("workflow_id" not in params or params["workflow_id"] is None): # noqa: E501
raise ValueError("Missing the required parameter `workflow_id` when calling `workflows_transfer_workflow`") # noqa: E501
# verify the required parameter 'contract' is set
if self.api_client.client_side_validation and ("contract" not in params or params["contract"] is None): # noqa: E501
raise ValueError("Missing the required parameter `contract` when calling `workflows_transfer_workflow`") # noqa: E501
collection_formats = {}
path_params = {}
if "workflow_id" in params:
path_params["workflowId"] = params["workflow_id"] # noqa: E501
query_params = []
header_params = {}
form_params = []
local_var_files = {}
body_params = None
if "contract" in params:
body_params = params["contract"]
# HTTP header `Accept`
header_params["Accept"] = self.api_client.select_header_accept(
["application/json", "text/json", "application/xml", "text/xml"]
) # noqa: E501
# HTTP header `Content-Type`
header_params["Content-Type"] = self.api_client.select_header_content_type( # noqa: E501
["application/json", "text/json", "application/xml", "text/xml", "application/x-www-form-urlencoded"]
) # noqa: E501
# Authentication setting
auth_settings = ["oauth2"] # noqa: E501
return self.api_client.call_api(
"/v3/workflows/{workflowId}/transfer",
"PUT",
path_params,
query_params,
header_params,
body=body_params,
post_params=form_params,
files=local_var_files,
response_type=None, # noqa: E501
auth_settings=auth_settings,
async_req=params.get("async_req"),
_return_http_data_only=params.get("_return_http_data_only"),
_preload_content=params.get("_preload_content", True),
_request_timeout=params.get("_request_timeout"),
collection_formats=collection_formats,
)
def workflows_update_workflow(self, workflow_id, update_workflow_contract, **kwargs): # noqa: E501
"""Update details of an existing workflow. # noqa: E501
Only Curators can use this API endpoint. To change the ownerId, the new owner must be in the same subscription
as the current owner. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.workflows_update_workflow(workflow_id, update_workflow_contract, async_req=True)
>>> result = thread.get()
:param async_req bool
:param str workflow_id: Identifier for an existing workflow or app (required)
:param UpdateWorkflowContract update_workflow_contract: (required)
:return: WorkflowView
If the method is called asynchronously,
returns the request thread.
"""
kwargs["_return_http_data_only"] = True
if kwargs.get("async_req"):
return self.workflows_update_workflow_with_http_info(workflow_id, update_workflow_contract, **kwargs) # noqa: E501
else:
(data) = self.workflows_update_workflow_with_http_info(workflow_id, update_workflow_contract, **kwargs) # noqa: E501
return data
def workflows_update_workflow_with_http_info(self, workflow_id, update_workflow_contract, **kwargs): # noqa: E501
"""Update details of an existing workflow. # noqa: E501
Only Curators can use this API endpoint. To change the ownerId, the new owner must be in the same subscription
as the current owner. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.workflows_update_workflow_with_http_info(workflow_id, update_workflow_contract, async_req=True)
>>> result = thread.get()
:param async_req bool
:param str workflow_id: Identifier for an existing workflow or app (required)
:param UpdateWorkflowContract update_workflow_contract: (required)
:return: WorkflowView
If the method is called asynchronously,
returns the request thread.
"""
all_params = ["workflow_id", "update_workflow_contract"] # noqa: E501
all_params.append("async_req")
all_params.append("_return_http_data_only")
all_params.append("_preload_content")
all_params.append("_request_timeout")
params = locals()
for key, val in six.iteritems(params["kwargs"]):
if key not in all_params:
raise TypeError("Got an unexpected keyword argument '%s' to method workflows_update_workflow" % key)
params[key] = val
del params["kwargs"]
# verify the required parameter 'workflow_id' is set
if self.api_client.client_side_validation and ("workflow_id" not in params or params["workflow_id"] is None): # noqa: E501
raise ValueError("Missing the required parameter `workflow_id` when calling `workflows_update_workflow`") # noqa: E501
# verify the required parameter 'update_workflow_contract' is set
if self.api_client.client_side_validation and (
"update_workflow_contract" not in params or params["update_workflow_contract"] is None
): # noqa: E501
raise ValueError(
"Missing the required parameter `update_workflow_contract` when calling `workflows_update_workflow`"
) # noqa: E501
collection_formats = {}
path_params = {}
if "workflow_id" in params:
path_params["workflowId"] = params["workflow_id"] # noqa: E501
query_params = []
header_params = {}
form_params = []
local_var_files = {}
body_params = None
if "update_workflow_contract" in params:
body_params = params["update_workflow_contract"]
# HTTP header `Accept`
header_params["Accept"] = self.api_client.select_header_accept(
["application/json", "text/json", "application/xml", "text/xml"]
) # noqa: E501
# HTTP header `Content-Type`
header_params["Content-Type"] = self.api_client.select_header_content_type( # noqa: E501
["application/json", "text/json", "application/xml", "text/xml", "application/x-www-form-urlencoded"]
) # noqa: E501
# Authentication setting
auth_settings = ["oauth2"] # noqa: E501
return self.api_client.call_api(
"/v3/workflows/{workflowId}",
"PUT",
path_params,
query_params,
header_params,
body=body_params,
post_params=form_params,
files=local_var_files,
response_type="WorkflowView", # noqa: E501
auth_settings=auth_settings,
async_req=params.get("async_req"),
_return_http_data_only=params.get("_return_http_data_only"),
_preload_content=params.get("_preload_content", True),
_request_timeout=params.get("_request_timeout"),
collection_formats=collection_formats,
)