pyp6xer_batch_update
Update multiple project activities in one API call for efficient batch modifications in Primavera P6 XER files.
Instructions
Update multiple activities in a single call.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| updates | Yes | List of {task_code, ...fields} dicts, one per activity to update | |
| cache_key | No | Cache key identifying the loaded XER file (set when calling pyp6xer_load_file) | default |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes |
Implementation Reference
- server.py:1786-1824 (handler)Main handler for pyp6xer_batch_update tool. Accepts a list of update dicts, iterates over them applying each via _apply_activity_update helper, and returns a JSON summary of results and errors.
@mcp.tool(annotations=ToolAnnotations(readOnlyHint=False, destructiveHint=False, idempotentHint=True, openWorldHint=False)) def pyp6xer_batch_update( updates: Annotated[list, Field(description="List of {task_code, ...fields} dicts, one per activity to update")], cache_key: Annotated[str, Field(description="Cache key identifying the loaded XER file (set when calling pyp6xer_load_file)")] = "default", ctx: Context = None, ) -> str: """Update multiple activities in a single call. Args: updates: List of update dicts, each with: - task_code (str): Activity ID - proj_id (str, optional): Project filter - fields (dict): {field_name: new_value} cache_key: Cache key of the loaded file. Example updates list: [{"task_code": "A1000", "fields": {"status_code": "TK_Complete"}}, {"task_code": "A1010", "fields": {"phys_complete_pct": 75}}] """ entry = _get_cache(ctx, cache_key) results = [] errors = [] for item in updates: code = item.get("task_code") pid = item.get("proj_id") fields = item.get("fields", {}) try: applied = _apply_activity_update(entry, code, pid, fields) results.append({"task_code": code, "status": "updated", "changes": applied}) except Exception as e: errors.append({"task_code": code, "error": str(e)}) return json.dumps({ "updated": len(results), "errors": len(errors), "results": results, "error_details": errors, "note": "Call pyp6xer_write_file to persist. Omit output_path to overwrite the source file.", }, indent=2) - server.py:1682-1747 (helper)Helper function that performs the actual field updates on both the in-memory Xer task object and the raw_tables dict. Used by both pyp6xer_update_activity and pyp6xer_batch_update.
def _apply_activity_update(entry: dict, task_code: str, proj_id: str | None, updates: dict) -> dict: """ Apply updates to both the in-memory Xer object and the raw_tables dict. Returns a dict of {field: {from, to}} describing what changed. """ xer: Xer = entry["xer"] raw_tables: dict = entry["raw_tables"] # Find the in-memory task tasks = _get_tasks(xer, proj_id) task = next((t for t in tasks if t.task_code == task_code), None) if task is None: raise ValueError(f"Activity '{task_code}' not found.") # Find the raw TASK row task_rows = raw_tables.get("TASK", {}).get("rows", []) raw_row = next((r for r in task_rows if r.get("task_id") == task.uid), None) applied = {} for field, value in updates.items(): if field not in _UPDATABLE_FIELDS: raise ValueError( f"Field '{field}' is not updatable. " f"Updatable fields: {list(_UPDATABLE_FIELDS.keys())}" ) raw_field = _UPDATABLE_FIELDS[field] # Get old value for change log old_raw = raw_row.get(raw_field, "") if raw_row else "" # Update raw table row if raw_row is not None: if field in ("act_start_date", "act_end_date", "expect_end_date", "target_start_date", "target_end_date"): # Convert YYYY-MM-DD to XER datetime format if needed if value and ":" not in str(value): value_raw = f"{value} 00:00" else: value_raw = value or "" raw_row[raw_field] = value_raw else: raw_row[raw_field] = str(value) if value is not None else "" # Update in-memory object (best-effort) try: if field == "status_code": from xerparser.schemas.task import TASK as TaskClass task.status = TaskClass.TaskStatus[value] elif field == "phys_complete_pct": task.phys_complete_pct = float(value) / 100.0 elif field == "remain_drtn_hr_cnt": task.remain_drtn_hr_cnt = float(value) elif field in ("act_start_date", "act_end_date", "expect_end_date", "target_start_date", "target_end_date"): from xerparser.src.utils import optional_date if value: dt = datetime.strptime(str(value).strip(), "%Y-%m-%d") setattr(task, field, dt) else: setattr(task, field, None) except Exception: pass # Raw table update is authoritative; in-memory is best-effort applied[field] = {"from": old_raw, "to": str(value)} return applied - server.py:1668-1677 (schema)Schema/field mapping defining which fields are updatable via batch_update (and update_activity). Maps friendly names to raw XER table field names.
_UPDATABLE_FIELDS = { "status_code": "status_code", "phys_complete_pct": "phys_complete_pct", "remain_drtn_hr_cnt": "remain_drtn_hr_cnt", "act_start_date": "act_start_date", "act_end_date": "act_end_date", "expect_end_date": "expect_end_date", "target_start_date": "target_start_date", "target_end_date": "target_end_date", } - server.py:1786-1804 (registration)Registration of pyp6xer_batch_update as an MCP tool via the @mcp.tool decorator on line 1786.
@mcp.tool(annotations=ToolAnnotations(readOnlyHint=False, destructiveHint=False, idempotentHint=True, openWorldHint=False)) def pyp6xer_batch_update( updates: Annotated[list, Field(description="List of {task_code, ...fields} dicts, one per activity to update")], cache_key: Annotated[str, Field(description="Cache key identifying the loaded XER file (set when calling pyp6xer_load_file)")] = "default", ctx: Context = None, ) -> str: """Update multiple activities in a single call. Args: updates: List of update dicts, each with: - task_code (str): Activity ID - proj_id (str, optional): Project filter - fields (dict): {field_name: new_value} cache_key: Cache key of the loaded file. Example updates list: [{"task_code": "A1000", "fields": {"status_code": "TK_Complete"}}, {"task_code": "A1010", "fields": {"phys_complete_pct": 75}}] """