create_file
Adds a new file to a QuantConnect project by specifying the project ID, file name, and content. Facilitates efficient strategy design and implementation.
Instructions
Create a new file in a QuantConnect project.
Args: `project_id`: ID of the project to add the file to; `name`: name of the file (e.g., "main.py", "algorithm.cs"); `content`: content of the file.
Returns: Dictionary containing file creation result
Input Schema
Table | JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| content | Yes | | |
| name | Yes | | |
| project_id | Yes | | |
Implementation Reference
- The core implementation of the `create_file` tool. This async function, decorated with `@mcp.tool()`, handles authentication, makes a POST request to QuantConnect's `/files/create` endpoint, and returns a structured success/error response.@mcp.tool() async def create_file(project_id: int, name: str, content: str) -> Dict[str, Any]: """ Create a new file in a QuantConnect project. Args: project_id: ID of the project to add the file to name: Name of the file (e.g., "main.py", "algorithm.cs") content: Content of the file Returns: Dictionary containing file creation result """ auth = get_auth_instance() if auth is None: return { "status": "error", "error": "QuantConnect authentication not configured. Use configure_auth() first.", } try: # Prepare request data request_data = {"projectId": project_id, "name": name, "content": content} # Make API request response = await auth.make_authenticated_request( endpoint="files/create", method="POST", json=request_data ) # Parse response if response.status_code == 200: data = response.json() if data.get("success", False): return { "status": "success", "project_id": project_id, "file_name": name, "content_length": len(content), "message": f"Successfully created file '{name}' in project {project_id}", } else: # API returned success=false errors = data.get("errors", ["Unknown error"]) return { "status": "error", "error": "File creation failed", "details": errors, "project_id": project_id, "file_name": name, } elif response.status_code == 401: return { "status": "error", "error": "Authentication failed. Check your credentials and ensure they haven't expired.", } else: return { "status": "error", "error": f"API request failed with status {response.status_code}", "response_text": ( response.text[:500] if hasattr(response, "text") else "No response text" ), } except Exception as e: return { "status": "error", "error": f"Failed to create file: {str(e)}", "project_id": project_id, "file_name": name, }
- quantconnect_mcp/src/server.py:76-76 (registration): Server configuration where `register_file_tools(mcp)` is called, triggering the definition and registration of the `create_file` tool (and other file tools) with the FastMCP instance. Code: `register_file_tools(mcp)`
- quantconnect_mcp/main.py:49-49 (registration): Main entrypoint script where `register_file_tools(mcp)` is called as part of the tool registration sequence. Code: `register_file_tools(mcp)`
- Type annotations and docstring defining the input schema (project_id: int, name: str, content: str) and output format (Dict[str, Any] with status, error/success details).async def create_file(project_id: int, name: str, content: str) -> Dict[str, Any]: """ Create a new file in a QuantConnect project. Args: project_id: ID of the project to add the file to name: Name of the file (e.g., "main.py", "algorithm.cs") content: Content of the file Returns: Dictionary containing file creation result """
- Helper function `register_file_tools` that defines all file-related tools including `create_file` when invoked with the MCP instance.def register_file_tools(mcp: FastMCP): """Register file management tools with the MCP server.""" @mcp.tool() async def create_file(project_id: int, name: str, content: str) -> Dict[str, Any]: """ Create a new file in a QuantConnect project. Args: project_id: ID of the project to add the file to name: Name of the file (e.g., "main.py", "algorithm.cs") content: Content of the file Returns: Dictionary containing file creation result """ auth = get_auth_instance() if auth is None: return { "status": "error", "error": "QuantConnect authentication not configured. Use configure_auth() first.", } try: # Prepare request data request_data = {"projectId": project_id, "name": name, "content": content} # Make API request response = await auth.make_authenticated_request( endpoint="files/create", method="POST", json=request_data ) # Parse response if response.status_code == 200: data = response.json() if data.get("success", False): return { "status": "success", "project_id": project_id, "file_name": name, "content_length": len(content), "message": f"Successfully created file '{name}' in project {project_id}", } else: # API returned success=false errors = data.get("errors", ["Unknown error"]) return { "status": "error", "error": "File creation failed", "details": errors, "project_id": project_id, "file_name": name, } elif response.status_code == 401: return { "status": "error", "error": "Authentication failed. 
Check your credentials and ensure they haven't expired.", } else: return { "status": "error", "error": f"API request failed with status {response.status_code}", "response_text": ( response.text[:500] if hasattr(response, "text") else "No response text" ), } except Exception as e: return { "status": "error", "error": f"Failed to create file: {str(e)}", "project_id": project_id, "file_name": name, } @mcp.tool() async def read_file(project_id: int, name: Optional[str] = None) -> Dict[str, Any]: """ Read a specific file from a project or all files if no name provided. Args: project_id: ID of the project to read files from name: Optional name of specific file to read. If not provided, reads all files. Returns: Dictionary containing file content(s) or error information """ auth = get_auth_instance() if auth is None: return { "status": "error", "error": "QuantConnect authentication not configured. Use configure_auth() first.", } try: # Prepare request data request_data: Dict[str, Any] = {"projectId": project_id} if name is not None: request_data["name"] = name # Make API request response = await auth.make_authenticated_request( endpoint="files/read", method="POST", json=request_data ) # Parse response if response.status_code == 200: data = response.json() if data.get("success", False): files = data.get("files", []) # If specific file was requested if name is not None: if files: file_data = files[0] return { "status": "success", "project_id": project_id, "file": file_data, "message": f"Successfully read file '{name}' from project {project_id}", } else: return { "status": "error", "error": f"File '{name}' not found in project {project_id}", } # If all files were requested else: return { "status": "success", "project_id": project_id, "files": files, "total_files": len(files), "message": f"Successfully read {len(files)} files from project {project_id}", } else: # API returned success=false errors = data.get("errors", ["Unknown error"]) return { "status": "error", "error": 
"File read failed", "details": errors, "project_id": project_id, "file_name": name, } elif response.status_code == 401: return { "status": "error", "error": "Authentication failed. Check your credentials and ensure they haven't expired.", } else: return { "status": "error", "error": f"API request failed with status {response.status_code}", "response_text": ( response.text[:500] if hasattr(response, "text") else "No response text" ), } except Exception as e: return { "status": "error", "error": f"Failed to read file(s): {str(e)}", "project_id": project_id, "file_name": name, } @mcp.tool() async def update_file_content( project_id: int, name: str, content: str ) -> Dict[str, Any]: """ Update the content of a file in a QuantConnect project. Args: project_id: ID of the project containing the file name: Name of the file to update content: New content for the file Returns: Dictionary containing update result """ auth = get_auth_instance() if auth is None: return { "status": "error", "error": "QuantConnect authentication not configured. Use configure_auth() first.", } try: # Prepare request data request_data = {"projectId": project_id, "name": name, "content": content} # Make API request response = await auth.make_authenticated_request( endpoint="files/update", method="POST", json=request_data ) # Parse response if response.status_code == 200: data = response.json() if data.get("success", False): return { "status": "success", "project_id": project_id, "file_name": name, "content_length": len(content), "message": f"Successfully updated content of file '{name}' in project {project_id}", } else: # API returned success=false errors = data.get("errors", ["Unknown error"]) return { "status": "error", "error": "File content update failed", "details": errors, "project_id": project_id, "file_name": name, } elif response.status_code == 401: return { "status": "error", "error": "Authentication failed. 
Check your credentials and ensure they haven't expired.", } else: return { "status": "error", "error": f"API request failed with status {response.status_code}", "response_text": ( response.text[:500] if hasattr(response, "text") else "No response text" ), } except Exception as e: return { "status": "error", "error": f"Failed to update file content: {str(e)}", "project_id": project_id, "file_name": name, } @mcp.tool() async def update_file_name( project_id: int, old_file_name: str, new_name: str ) -> Dict[str, Any]: """ Update the name of a file in a QuantConnect project. Args: project_id: ID of the project containing the file old_file_name: Current name of the file new_name: New name for the file Returns: Dictionary containing update result """ auth = get_auth_instance() if auth is None: return { "status": "error", "error": "QuantConnect authentication not configured. Use configure_auth() first.", } try: # Prepare request data request_data = { "projectId": project_id, "oldFileName": old_file_name, "newName": new_name, } # Make API request response = await auth.make_authenticated_request( endpoint="files/update", method="POST", json=request_data ) # Parse response if response.status_code == 200: data = response.json() if data.get("success", False): return { "status": "success", "project_id": project_id, "old_name": old_file_name, "new_name": new_name, "message": f"Successfully renamed file from '{old_file_name}' to '{new_name}' in project {project_id}", } else: # API returned success=false errors = data.get("errors", ["Unknown error"]) return { "status": "error", "error": "File name update failed", "details": errors, "project_id": project_id, "old_name": old_file_name, "new_name": new_name, } elif response.status_code == 401: return { "status": "error", "error": "Authentication failed. 
Check your credentials and ensure they haven't expired.", } else: return { "status": "error", "error": f"API request failed with status {response.status_code}", "response_text": ( response.text[:500] if hasattr(response, "text") else "No response text" ), } except Exception as e: return { "status": "error", "error": f"Failed to update file name: {str(e)}", "project_id": project_id, "old_name": old_file_name, "new_name": new_name, } @mcp.tool() async def delete_file(project_id: int, name: str) -> Dict[str, Any]: """ Delete a file from a QuantConnect project. Args: project_id: ID of the project containing the file to delete name: Name of the file to delete Returns: Dictionary containing deletion result """ auth = get_auth_instance() if auth is None: return { "status": "error", "error": "QuantConnect authentication not configured. Use configure_auth() first.", } try: # Prepare request data request_data = {"projectId": project_id, "name": name} # Make API request response = await auth.make_authenticated_request( endpoint="files/delete", method="POST", json=request_data ) # Parse response if response.status_code == 200: data = response.json() if data.get("success", False): return { "status": "success", "project_id": project_id, "file_name": name, "message": f"Successfully deleted file '{name}' from project {project_id}", } else: # API returned success=false errors = data.get("errors", ["Unknown error"]) return { "status": "error", "error": "File deletion failed", "details": errors, "project_id": project_id, "file_name": name, } elif response.status_code == 401: return { "status": "error", "error": "Authentication failed. 
Check your credentials and ensure they haven't expired.", } else: return { "status": "error", "error": f"API request failed with status {response.status_code}", "response_text": ( response.text[:500] if hasattr(response, "text") else "No response text" ), } except Exception as e: return { "status": "error", "error": f"Failed to delete file: {str(e)}", "project_id": project_id, "file_name": name, }