search_events
Search calendar events within a specified date range using the Microsoft MCP server. Retrieve relevant events by query and account ID, with customizable parameters for days ahead, days back, and result limits.
Instructions
Search calendar events using the modern search API.
Input Schema
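| Name | Required | Description | Default |
|---|---|---|---|
| account_id | Yes | Account to search; passed through to the Microsoft Graph request. | |
| days_ahead | No | Days after the current time to include. Date filtering runs only when this or days_back differs from 365. | 365 |
| days_back | No | Days before the current time to include. Date filtering runs only when this or days_ahead differs from 365. | 365 |
| limit | No | Maximum number of events requested from the search, applied before any date filtering. | 50 |
| query | Yes | Search query string. | |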
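To illustrate how the parameters above fit together, the following sketch builds a JSON-RPC `tools/call` request of the kind an MCP client sends to invoke a tool. The helper name `build_search_events_call`, the request id, and the example account ID are hypothetical; the default values mirror the handler's signature.

```python
import json
from typing import Any


def build_search_events_call(
    query: str,
    account_id: str,
    days_ahead: int = 365,
    days_back: int = 365,
    limit: int = 50,
    request_id: int = 1,
) -> dict[str, Any]:
    """Build a JSON-RPC 2.0 tools/call request for search_events (hypothetical helper)."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "search_events",
            "arguments": {
                "query": query,
                "account_id": account_id,
                "days_ahead": days_ahead,
                "days_back": days_back,
                "limit": limit,
            },
        },
    }


if __name__ == "__main__":
    # "example-account-id" is a placeholder, not a real account.
    call = build_search_events_call(
        "quarterly review", "example-account-id", days_ahead=30
    )
    print(json.dumps(call, indent=2))
```

Because `days_ahead` is set to something other than 365 here, the handler would apply its date filter to the search results.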
Implementation Reference
- src/microsoft_mcp/tools.py:888-920 (handler) The handler function implementing the search_events tool logic, including registration via @mcp.tool decorator. Searches events via graph.search_query and applies optional date filtering.

  ```python
  @mcp.tool
  def search_events(
      query: str,
      account_id: str,
      days_ahead: int = 365,
      days_back: int = 365,
      limit: int = 50,
  ) -> list[dict[str, Any]]:
      """Search calendar events using the modern search API."""
      events = list(graph.search_query(query, ["event"], account_id, limit))

      # Filter by date range if needed
      if days_ahead != 365 or days_back != 365:
          now = dt.datetime.now(dt.timezone.utc)
          start = now - dt.timedelta(days=days_back)
          end = now + dt.timedelta(days=days_ahead)

          filtered_events = []
          for event in events:
              event_start = dt.datetime.fromisoformat(
                  event.get("start", {}).get("dateTime", "").replace("Z", "+00:00")
              )
              event_end = dt.datetime.fromisoformat(
                  event.get("end", {}).get("dateTime", "").replace("Z", "+00:00")
              )

              if event_start <= end and event_end >= start:
                  filtered_events.append(event)

          return filtered_events

      return events
  ```
- src/microsoft_mcp/graph.py:277-331 (helper) Supporting utility function that performs the actual search query to Microsoft Graph /search/query endpoint, handling pagination and yielding results. Called by search_events.

  ```python
  def search_query(
      query: str,
      entity_types: list[str],
      account_id: str | None = None,
      limit: int = 50,
      fields: list[str] | None = None,
  ) -> Iterator[dict[str, Any]]:
      """Use the modern /search/query API endpoint"""
      payload = {
          "requests": [
              {
                  "entityTypes": entity_types,
                  "query": {"queryString": query},
                  "size": min(limit, 25),
                  "from": 0,
              }
          ]
      }

      if fields:
          payload["requests"][0]["fields"] = fields

      items_returned = 0

      while True:
          result = request("POST", "/search/query", account_id, json=payload)

          if not result or "value" not in result:
              break

          for response in result["value"]:
              if "hitsContainers" in response:
                  for container in response["hitsContainers"]:
                      if "hits" in container:
                          for hit in container["hits"]:
                              if limit and items_returned >= limit:
                                  return

                              yield hit["resource"]
                              items_returned += 1

          if "@odata.nextLink" in result:
              break

          has_more = False
          for response in result.get("value", []):
              for container in response.get("hitsContainers", []):
                  if container.get("moreResultsAvailable"):
                      has_more = True
                      break

          if not has_more:
              break

          payload["requests"][0]["from"] += payload["requests"][0]["size"]
  ```
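The date filter in the handler keeps an event when its interval overlaps the window running from `days_back` days before now to `days_ahead` days after now. A minimal standalone sketch of that overlap check follows. The names `parse_event_datetime` and `overlaps_window` are hypothetical, and two behaviors are assumptions of this sketch rather than something the source states: timestamps without a UTC offset are treated as UTC, and events missing a timestamp are skipped instead of being parsed.

```python
import datetime as dt
from typing import Any


def parse_event_datetime(value: str) -> dt.datetime:
    """Parse an ISO 8601 string; offset-less values are assumed to be UTC."""
    parsed = dt.datetime.fromisoformat(value.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        # Assumption: a naive timestamp is interpreted as UTC.
        parsed = parsed.replace(tzinfo=dt.timezone.utc)
    return parsed


def overlaps_window(
    event: dict[str, Any],
    now: dt.datetime,
    days_back: int,
    days_ahead: int,
) -> bool:
    """Return True when the event interval intersects [now - back, now + ahead]."""
    window_start = now - dt.timedelta(days=days_back)
    window_end = now + dt.timedelta(days=days_ahead)

    start_raw = event.get("start", {}).get("dateTime")
    end_raw = event.get("end", {}).get("dateTime")
    if not start_raw or not end_raw:
        # Choice made in this sketch: skip events without usable timestamps.
        return False

    event_start = parse_event_datetime(start_raw)
    event_end = parse_event_datetime(end_raw)
    # Two intervals overlap when each one starts before the other ends.
    return event_start <= window_end and event_end >= window_start
```

Passing `now` explicitly, rather than reading the clock inside the function, keeps the check deterministic and easy to test.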
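The helper pages through results by advancing the request's `from` offset by `size` for as long as a hits container reports `moreResultsAvailable`, stopping once `limit` items have been yielded. The sketch below reproduces that offset-based loop against an in-memory stand-in for the HTTP call. `paged_hits`, `fake_search`, and the sample data are hypothetical, and no real endpoint is contacted.

```python
from typing import Any, Callable, Iterator


def paged_hits(
    fetch: Callable[[int, int], dict[str, Any]],
    limit: int = 50,
    page_size: int = 25,
) -> Iterator[Any]:
    """Yield up to `limit` resources, requesting pages of at most `page_size`."""
    if limit <= 0:
        return
    size = min(limit, page_size)
    offset = 0
    returned = 0
    while True:
        page = fetch(offset, size)
        containers = [
            container
            for response in page.get("value", [])
            for container in response.get("hitsContainers", [])
        ]
        for container in containers:
            for hit in container.get("hits", []):
                if returned >= limit:
                    return
                yield hit["resource"]
                returned += 1
        # Stop when no container signals that further results exist.
        if not any(c.get("moreResultsAvailable") for c in containers):
            return
        offset += size


def fake_search(items: list[Any]) -> Callable[[int, int], dict[str, Any]]:
    """Hypothetical stand-in that serves `items` in search-response-shaped pages."""
    def fetch(offset: int, size: int) -> dict[str, Any]:
        chunk = items[offset:offset + size]
        container = {
            "hits": [{"resource": item} for item in chunk],
            "moreResultsAvailable": offset + size < len(items),
        }
        return {"value": [{"hitsContainers": [container]}]}
    return fetch


if __name__ == "__main__":
    results = list(paged_hits(fake_search(list(range(60))), limit=50))
    print(len(results))
```

Injecting `fetch` as a callable is a design choice of this sketch so the paging logic can be exercised without network access.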