get_job_output_data
Retrieve output data file paths from completed Alteryx jobs to access and analyze workflow results stored in the server's temporary directory.
Instructions
Get the output data generated by a job. This will return a list of file paths to the output data. The output data is stored in the temp directory of the server.
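Although the instructions describe the result as a list of file paths, the handler listed under Implementation Reference returns a single formatted string (`Output files saved to: [...]`), or a string starting with `Error:` on failure. A minimal sketch of how a caller might recover the paths from that text follows. The helper name `parse_output_paths` is hypothetical and not part of the server, and it assumes the result text reaches the client unchanged.

```python
import ast
from typing import List

# Prefix used by the handler's success message (taken from the handler source).
PREFIX = "Output files saved to: "


def parse_output_paths(result: str) -> List[str]:
    """Hypothetical client-side helper: extract saved paths from the tool's text result."""
    if not result.startswith(PREFIX):
        # Failures come back as plain strings such as "Error: Job is not completed".
        raise ValueError(result.strip())
    # The remainder is a pprint-formatted Python list literal, which may span lines.
    return ast.literal_eval(result[len(PREFIX):].strip())
```

Using `ast.literal_eval` rather than `eval` keeps the parsing limited to Python literals, which is all the success message contains.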
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| job_id | Yes | | |
Implementation Reference
- src/tools.py:518-587 (handler)
  The main handler function that implements the logic for get_job_output_data. It checks if the job exists and is completed, then downloads each output file from the Alteryx API, saves them to the temp directory with appropriate extensions based on available formats, and returns the list of saved file paths.

  ```python
  def get_job_output_data(self, job_id: str):
      """Get the output data for a job"""
      try:
          # check if job exists
          job = self.jobs_api.jobs_get_job_v3(job_id)
          if not job:
              return "Error: Job not found"
          # check if job is completed
          if job.status != "Completed":
              return "Error: Job is not completed"

          temp_directory = self.configuration.temp_directory
          # normalize the temp directory
          temp_directory = os.path.normpath(temp_directory)

          if not os.path.exists(temp_directory):
              os.makedirs(temp_directory)

          all_output_files = []
          for output in job.outputs:
              output_id = output.id
              file_name = output.file_name
              available_output_types = output.available_formats

              # get file name with extension from file_name
              # Extract base name without extension if it exists
              base_name = os.path.splitext(os.path.basename(file_name))[0]

              # Get the file extension from the file name
              raw_file_extension = os.path.splitext(os.path.basename(file_name))[1]

              # Map output format to file extension
              format_extension_map = {
                  'Raw': raw_file_extension if raw_file_extension else '.txt',
                  'Yxdb': '.yxdb',
                  'Shp': '.shp',
                  'Kml': '.kml',
                  'Tab': '.tab',
                  'Mif': '.mif',
                  'Dbf': '.dbf',
                  'Csv': '.csv',
                  'Pdf': '.pdf',
                  'Docx': '.docx',
                  'Xlsx': '.xlsx',
                  'Html': '.html',
                  'Tde': '.tde',
                  'Zip': '.zip'
              }

              # Get the extension for the first available format
              output_format = available_output_types[0] if available_output_types else 'Raw'
              file_extension = format_extension_map.get(output_format, raw_file_extension)
              file_name_with_extension = base_name + file_extension

              # get the output data
              api_response = self.jobs_api.jobs_get_output_file(job_id, output_id, output_format)

              # Convert to bytes if it's a string
              if isinstance(api_response, str):
                  api_response_bytes = api_response.encode('utf-8')
              else:
                  api_response_bytes = api_response

              with open(f"{temp_directory}/{job_id}_{output_id}_{file_name_with_extension}", "wb") as f:
                  f.write(api_response_bytes)
              all_output_files.append(f"{temp_directory}/{job_id}_{output_id}_{file_name_with_extension}")

          return f"Output files saved to: {pprint.pformat(all_output_files)} \n\n"
      except ApiException as e:
          return f"Error: {e}"
  ```
- src/mcp_server.py:297-302 (registration)
  The MCP tool registration using @self.app.tool() decorator. This wrapper function delegates the call to the underlying tools instance's get_job_output_data method.

  ```python
  @self.app.tool()
  def get_job_output_data(job_id: str):
      """Get the output data generated by a job. This will return a list of file paths to the output data. The output data is stored in the temp directory of the server."""
      return self.tools.get_job_output_data(job_id)
  ```
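The handler's file-naming rule can be isolated and exercised without an Alteryx server. The sketch below restates that rule as a standalone function. The names `choose_output_name` and `FORMAT_EXTENSIONS` are illustrative only and do not appear in the source, and the behavior is inferred from the handler code shown above rather than from any Alteryx documentation.

```python
import os
from typing import List, Tuple

# Extensions for the non-Raw formats, copied from the handler's mapping.
FORMAT_EXTENSIONS = {
    "Yxdb": ".yxdb", "Shp": ".shp", "Kml": ".kml", "Tab": ".tab",
    "Mif": ".mif", "Dbf": ".dbf", "Csv": ".csv", "Pdf": ".pdf",
    "Docx": ".docx", "Xlsx": ".xlsx", "Html": ".html", "Tde": ".tde",
    "Zip": ".zip",
}


def choose_output_name(file_name: str, available_formats: List[str]) -> Tuple[str, str]:
    """Illustrative helper: return (format to request, file name to save as)."""
    base_name, raw_ext = os.path.splitext(os.path.basename(file_name))
    # The handler always takes the first available format, falling back to Raw.
    output_format = available_formats[0] if available_formats else "Raw"
    if output_format == "Raw":
        # Raw keeps the original extension, or uses .txt when there is none.
        extension = raw_ext or ".txt"
    else:
        # Unknown formats fall back to the original extension.
        extension = FORMAT_EXTENSIONS.get(output_format, raw_ext)
    return output_format, base_name + extension
```

For example, `choose_output_name("results.yxdb", ["Csv", "Yxdb"])` yields `("Csv", "results.csv")`: the saved file follows the first listed format, which is not necessarily the format the workflow originally wrote. The handler then prefixes this name with the job ID and output ID when building the path.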