pyp6xer_export_csv
Export project activities from a loaded Primavera P6 XER file as CSV. The file is selected by cache key; a project ID and a subset of fields can optionally be supplied.
Instructions
Export activities to CSV format (returned as a string).
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| cache_key | No | Cache key identifying the loaded XER file (set when calling pyp6xer_load_file) | default |
| proj_id | No | Project ID or short name; uses first project if omitted | |
| fields | No | Subset of field names to return; call pyp6xer_get_activity_schema to see available names. Defaults to the standard summary fields if omitted | |
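
As a rough illustration of the inputs above, the sketch below builds an arguments object and wraps it in a JSON-RPC `tools/call` request of the kind MCP clients send. The chosen field subset and the request `id` are arbitrary examples, and how the request reaches the server depends on the client and is not shown.

```python
import json

# Arguments for pyp6xer_export_csv. Every key is optional:
# cache_key falls back to "default", proj_id to the first project,
# and fields to the tool's standard summary columns.
arguments = {
    "cache_key": "default",
    "fields": ["task_code", "name", "start", "finish"],  # example subset
}

# MCP tool invocations use the JSON-RPC method "tools/call".
request = {
    "jsonrpc": "2.0",
    "id": 1,  # arbitrary request id chosen for this example
    "method": "tools/call",
    "params": {"name": "pyp6xer_export_csv", "arguments": arguments},
}

print(json.dumps(request, indent=2))
```

Leaving `proj_id` out, as here, means the first project in the loaded file is exported.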
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes | JSON string with format, activity_count, fields, and csv_content | |
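
Because the CSV text arrives wrapped in a JSON string, a caller has to decode it in two steps. The following is a minimal sketch of that, assuming the result has the `format`, `activity_count`, `fields`, and `csv_content` keys built by the handler. The helper name `parse_export` is hypothetical and the sample payload is hand-written, not real tool output.

```python
import csv
import io
import json


def parse_export(result: str) -> list[dict[str, str]]:
    """Decode the tool's JSON result and return the CSV rows as dicts."""
    payload = json.loads(result)
    reader = csv.DictReader(io.StringIO(payload["csv_content"]))
    return list(reader)


# Hand-written sample shaped like the tool's result; the values are made up.
sample_result = json.dumps({
    "format": "csv",
    "activity_count": 2,
    "fields": ["task_code", "name"],
    "csv_content": "task_code,name\r\nA1000,Mobilize\r\nA1010,Excavate\r\n",
})

rows = parse_export(sample_result)
print(rows[0]["task_code"], rows[1]["name"])
```

Every value comes back as a string after the CSV round trip, so numeric columns such as `total_float_days` would need converting on the caller's side.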
Implementation Reference
- server.py:1548-1554 (registration) The tool 'pyp6xer_export_csv' is registered as an MCP tool via the @mcp.tool decorator on line 1548.

```python
@mcp.tool(annotations=ToolAnnotations(readOnlyHint=True, destructiveHint=False, idempotentHint=True, openWorldHint=False))
def pyp6xer_export_csv(
    cache_key: Annotated[str, Field(description="Cache key identifying the loaded XER file (set when calling pyp6xer_load_file)")] = "default",
    proj_id: Annotated[str | None, Field(description="Project ID or short name; uses first project if omitted")] = None,
    fields: Annotated[list[str] | None, Field(description="Subset of field names to return; call pyp6xer_get_activity_schema to see available names")] = None,
    ctx: Context = None,
) -> str:
```

- server.py:1549-1587 (handler) The handler function for pyp6xer_export_csv: loads XER data, serializes activities to CSV using csv.DictWriter, and returns the CSV content as a JSON response.

```python
def pyp6xer_export_csv(
    cache_key: Annotated[str, Field(description="Cache key identifying the loaded XER file (set when calling pyp6xer_load_file)")] = "default",
    proj_id: Annotated[str | None, Field(description="Project ID or short name; uses first project if omitted")] = None,
    fields: Annotated[list[str] | None, Field(description="Subset of field names to return; call pyp6xer_get_activity_schema to see available names")] = None,
    ctx: Context = None,
) -> str:
    """Export activities to CSV format (returned as a string).

    Args:
        cache_key: Cache key of the loaded file.
        proj_id: Optional project filter.
        fields: Column names to include. Call pyp6xer_get_activity_schema for available names.
            Defaults to all standard summary fields.
    """
    xer = _get_xer(ctx, cache_key)
    tasks = _get_tasks(xer, proj_id)

    default_fields = [
        "task_code", "name", "status", "type", "wbs", "wbs_name",
        "start", "finish", "target_start", "target_finish",
        "original_duration_days", "remaining_duration_days",
        "total_float_days", "is_critical", "percent_complete",
        "budgeted_cost", "actual_cost", "remaining_cost",
    ]
    cols = fields or default_fields

    output = io.StringIO()
    writer = csv.DictWriter(output, fieldnames=cols, extrasaction="ignore")
    writer.writeheader()
    for t in tasks:
        row = _task_to_dict(t)
        writer.writerow(row)

    return json.dumps({
        "format": "csv",
        "activity_count": len(tasks),
        "fields": cols,
        "csv_content": output.getvalue(),
    }, indent=2)
```

- server.py:1549-1553 (schema) Input parameters: cache_key (str), proj_id (optional str), fields (optional list of str to select columns), and ctx (Context). Output is a JSON string with format, activity_count, fields, and csv_content.

```python
def pyp6xer_export_csv(
    cache_key: Annotated[str, Field(description="Cache key identifying the loaded XER file (set when calling pyp6xer_load_file)")] = "default",
    proj_id: Annotated[str | None, Field(description="Project ID or short name; uses first project if omitted")] = None,
    fields: Annotated[list[str] | None, Field(description="Subset of field names to return; call pyp6xer_get_activity_schema to see available names")] = None,
    ctx: Context = None,
```

- server.py:175-211 (helper) The _task_to_dict helper function is used by the export CSV handler to convert each task to a dict before writing CSV rows.

```python
def _task_to_dict(task, fields: list[str] | None = None) -> dict:
    """Standard activity summary dict. Pass fields to project to a subset."""
    try:
        start = _fmt_date(task.start)
    except (ValueError, AttributeError):
        start = ""
    try:
        finish = _fmt_date(task.finish)
    except (ValueError, AttributeError):
        finish = ""

    full = {
        "task_id": task.uid,
        "task_code": task.task_code,
        "name": task.name,
        "status": task.status.value,
        "type": task.type.value,
        "wbs": task.wbs.full_code if task.wbs else "",
        "wbs_name": task.wbs.name if task.wbs else "",
        "start": start,
        "finish": finish,
        "target_start": _fmt_date(task.target_start_date),
        "target_finish": _fmt_date(task.target_end_date),
        "original_duration_days": task.original_duration,
        "remaining_duration_days": task.remaining_duration,
        "total_float_days": task.total_float,
        "free_float_days": task.free_float,
        "is_critical": task.is_critical,
        "is_longest_path": task.is_longest_path,
        "percent_complete": round(task.percent_complete * 100, 1),
        "budgeted_cost": task.budgeted_cost,
        "actual_cost": task.actual_cost,
        "remaining_cost": task.remaining_cost,
    }
    if fields:
        return {k: v for k, v in full.items() if k in fields}
    return full
```
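
The handler selects columns by passing the requested names to `csv.DictWriter` with `extrasaction="ignore"`, not by filtering each task dict first. The self-contained sketch below isolates that technique. The function name `rows_to_csv` and the sample rows are illustrative stand-ins for the handler and for `_task_to_dict` output, not code from server.py.

```python
import csv
import io


def rows_to_csv(rows: list[dict], cols: list[str]) -> str:
    """Write only the requested columns, in the requested order."""
    output = io.StringIO()
    # extrasaction="ignore" drops dict keys that are not in cols; keys in
    # cols that a row lacks are written as empty strings (DictWriter's
    # default restval).
    writer = csv.DictWriter(output, fieldnames=cols, extrasaction="ignore")
    writer.writeheader()
    for row in rows:
        writer.writerow(row)
    return output.getvalue()


# Made-up rows standing in for _task_to_dict output.
rows = [
    {"task_code": "A1000", "name": "Mobilize", "is_critical": True},
    {"task_code": "A1010", "name": "Excavate", "is_critical": False},
]

csv_text = rows_to_csv(rows, ["name", "task_code", "no_such_field"])
print(csv_text)
```

With this approach an unrecognized column name does not raise an error; it produces a header with an empty column. That suggests a misspelled entry in `fields` would pass silently, so checking names against pyp6xer_get_activity_schema first is worthwhile.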