Skip to main content
Glama
client.py (23.4 kB)
import httpx
import json
import os
import asyncio
import logging
from typing import Dict, Any, Optional, List, Callable

# Set up module-level logger
logger = logging.getLogger(__name__)


class WordwareClient:
    """Client for making requests to the Wordware API.

    Wraps the Wordware "runs" API: a run is started with a POST, and its
    results are consumed from a Server-Sent Events (SSE) stream whose URL
    comes back in the POST response.
    """

    # Timeout (seconds) for HTTP requests; research runs can take minutes.
    REQUEST_TIMEOUT = 300.0
    # Maximum time (seconds) to spend consuming a single SSE stream.
    MAX_STREAM_WAIT = 180.0

    def __init__(self, api_key: Optional[str] = None, api_url: Optional[str] = None):
        """Initialize the Wordware client.

        Args:
            api_key: API key for the Wordware service. If not provided,
                read from the WORDWARE_API_KEY environment variable.
            api_url: Base URL for the Wordware API. If not provided,
                read from WORDWARE_API_URL (default https://api.wordware.ai).
        """
        self.api_key = api_key or os.environ.get("WORDWARE_API_KEY", "")
        self.api_url = api_url or os.environ.get("WORDWARE_API_URL", "https://api.wordware.ai")
        # Long timeout: research flows may stream for several minutes.
        self.client = httpx.AsyncClient(timeout=self.REQUEST_TIMEOUT)
        self.config = self._load_config()
        logger.info(f"WordwareClient initialized with API URL: {self.api_url}")

    def _load_config(self) -> Dict[str, Any]:
        """Load configuration from config.json (path from CONFIG_PATH env var).

        Returns an empty dict if the file is missing or malformed, so the
        client still works with its hard-coded flow-ID defaults.
        """
        config_path = os.environ.get("CONFIG_PATH", "./config.json")
        try:
            with open(config_path, "r") as f:
                config = json.load(f)
            logger.info(f"Configuration loaded from {config_path}")
            return config
        except (FileNotFoundError, json.JSONDecodeError) as e:
            logger.error(f"Error loading config: {e}")
            return {}

    def _headers(self, json_content: bool = True) -> Dict[str, str]:
        """Build the standard auth headers for Wordware API calls."""
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Accept": "*/*",
        }
        if json_content:
            headers["Content-Type"] = "application/json"
        return headers

    @staticmethod
    def _build_run_payload(inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Wrap flow inputs in the JSON:API-style payload the runs endpoint expects."""
        return {
            "data": {
                "type": "runs",
                "attributes": {
                    "version": "1.0",
                    "inputs": inputs,
                    "webhooks": [],
                },
            }
        }

    async def _run_flow(self, flow_id: str, inputs: Dict[str, Any], description: str) -> Dict[str, Any]:
        """Start a Wordware run and stream its results.

        Shared implementation behind research_person / save_to_notion.

        Args:
            flow_id: The Wordware app/flow ID to run.
            inputs: Input parameters for the flow.
            description: Short human-readable label used in log messages.

        Returns:
            The collected stream output, or a dict with an "error" key.
        """
        try:
            response = await self.client.post(
                f"{self.api_url}/v1/apps/{flow_id}/runs",
                headers=self._headers(),
                json=self._build_run_payload(inputs),
            )
            response.raise_for_status()
            run_data = response.json()

            # The run response carries the SSE stream URL under data.links.stream.
            stream_url = run_data.get("data", {}).get("links", {}).get("stream")
            if not stream_url:
                return {"error": "No stream URL in response", "response": run_data}

            logger.info(f"Processing stream for {description}")
            return await self._process_sse_stream(stream_url)
        except httpx.HTTPStatusError as e:
            logger.error(f"Error calling Wordware API: {e.response.status_code} {e.response.text}")
            return {"error": f"API error: {e.response.status_code}", "message": e.response.text}
        except json.JSONDecodeError as e:
            return {"error": f"Failed to parse Wordware API response: {e}"}
        except Exception as e:
            return {"error": f"Unexpected error: {e}"}

    async def _process_sse_stream(self, stream_url: str) -> Dict[str, Any]:
        """Process an SSE stream using the client's shared HTTP client.

        Args:
            stream_url: The URL of the SSE stream.

        Returns:
            Dictionary containing all value events from the stream.
        """
        return await self._process_sse_stream_with_client(stream_url, self.client)

    async def _process_sse_stream_with_client(self, stream_url: str, client: httpx.AsyncClient) -> Dict[str, Any]:
        """Process a Server-Sent Events stream to collect all value events.

        Collects "value" events keyed by path. "delta" text events are
        accumulated separately and used as a fallback output if no value
        events arrive before the stream completes.

        Args:
            stream_url: The URL of the SSE stream.
            client: HTTPX async client to use for the streaming request.

        Returns:
            {"output": {...}} with collected values, or {"error": ...} on failure.
        """
        value_events: Dict[str, Any] = {}   # path -> final value
        delta_paths: Dict[str, str] = {}    # path -> accumulated delta text
        loop = asyncio.get_running_loop()
        deadline = loop.time() + self.MAX_STREAM_WAIT

        logger.info(f"Starting SSE stream processing from: {stream_url}")
        try:
            async with client.stream('GET', stream_url) as response:
                response.raise_for_status()
                logger.info("SSE stream connection established successfully")
                async for line in response.aiter_lines():
                    # Give up if the stream has run longer than allowed.
                    # NOTE: this is only checked between lines; a silent
                    # stream is bounded by the HTTP client timeout instead.
                    if loop.time() > deadline:
                        logger.warning(f"Maximum wait time ({self.MAX_STREAM_WAIT}s) exceeded. Breaking.")
                        break
                    if not line.startswith('data:'):
                        continue
                    data = line[5:].strip()
                    if not data:
                        continue
                    try:
                        event_data = json.loads(data)
                    except json.JSONDecodeError as e:
                        logger.error(f"Failed to parse event data: {data}, error: {e}")
                        continue

                    event_type = event_data.get("type", "unknown")
                    if event_type == "value":
                        # Final value for a path; overwrites any earlier value.
                        path = event_data.get("path", "unknown_path")
                        value_events[path] = event_data.get("value", {})
                        logger.debug(f"Received value event for path: {path}")
                    elif event_type == "delta":
                        # Incremental text; accumulated as a fallback output.
                        path = event_data.get("path", "")
                        delta = event_data.get("delta", {})
                        if path not in delta_paths:
                            delta_paths[path] = ""
                        if delta.get("type") == "text" and isinstance(delta_paths[path], str):
                            delta_paths[path] += delta.get("value", "")
                            logger.debug(f"Updated delta for path: {path}")
                    elif event_type == "status" and event_data.get("status") == "completed":
                        logger.info("Stream completed status received.")
                        break
                    elif event_type == "ai.wordware.run.completed.v1":
                        logger.info("Completion event received.")
                        if "data" in event_data and "output" in event_data["data"]:
                            # Attach completion output data if available.
                            value_events["completion_output"] = event_data["data"]["output"]
                        break
        except httpx.HTTPStatusError as e:
            logger.error(f"Error processing SSE stream: {e}")
            return {"error": f"Stream error: {e}"}
        except Exception as e:
            logger.error(f"Unexpected error during SSE stream processing: {str(e)}", exc_info=True)
            return {"error": f"Stream error: {str(e)}"}

        # Fall back to accumulated deltas when no value events arrived.
        if not value_events and delta_paths:
            logger.info("No value events found, using accumulated deltas instead")
            for path, content in delta_paths.items():
                value_events[path] = content

        logger.info(f"Stream processing complete. Collected {len(value_events)} values")
        return {"output": value_events}

    async def research_person(self, full_name: str, company: str = "", url: str = "") -> Dict[str, Any]:
        """Research a person using Wordware's specialized research agent.

        Args:
            full_name: The full name of the person to research.
            company: The company the person is associated with (optional).
            url: A relevant URL for additional context (optional).

        Returns:
            Research results from Wordware, or a dict with an "error" key.
        """
        flow_id = self.config.get("founder_research_flow_id", "2ef1755d-febd-47d6-b96d-b35e719da0f9")
        inputs = {
            "Full Name": full_name,
            "Company": company,
            "URL": url,
        }
        return await self._run_flow(flow_id, inputs, f"founder research: {full_name}")

    async def research_topic(self, query: str) -> Dict[str, Any]:
        """Research a topic using Wordware's general research capabilities.

        Args:
            query: The topic or question to research.

        Returns:
            Research results from Wordware.
        """
        # No flow ID exists yet for general topic research, so this
        # returns mock data until a real API call can be implemented.
        return {
            "output": {
                "research": f"Research on topic: {query}\n\n"
                            f"This is a mock response. In a real implementation, "
                            f"this would call the Wordware API with the appropriate flow ID "
                            f"for general topic research."
            }
        }

    async def save_to_notion(self, title: str, body: str) -> Dict[str, Any]:
        """Save content to Notion using Wordware's integration.

        Args:
            title: The title of the Notion page.
            body: The content body of the Notion page.

        Returns:
            Result of the operation, or a dict with an "error" key.
        """
        flow_id = self.config.get("notion_integration_flow_id", "55921f92-9374-444b-879a-3a7820a29850")
        inputs = {
            "title": title,
            "body": body,
        }
        return await self._run_flow(flow_id, inputs, f"Notion save: {title}")

    async def get_tool_metadata(self, tool_id: str) -> Dict[str, Any]:
        """Get metadata for a Wordware tool by its ID.

        Args:
            tool_id: The ID of the Wordware tool.

        Returns:
            Metadata about the tool including input/output schema,
            or a dict with an "error" key.
        """
        try:
            response = await self.client.get(
                f"{self.api_url}/v1/apps/{tool_id}",
                headers=self._headers(json_content=False),
            )
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as e:
            logger.error(f"Error retrieving tool metadata: {e.response.status_code} {e.response.text}")
            return {"error": f"API error: {e.response.status_code}", "message": e.response.text}
        except json.JSONDecodeError as e:
            return {"error": f"Failed to parse Wordware API response: {e}"}
        except Exception as e:
            return {"error": f"Unexpected error: {e}"}

    async def run_generic_tool(self, tool_id: str, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Run a generic Wordware tool with the given inputs.

        Uses short-lived HTTP clients (one for the POST, one for the
        stream) rather than the shared client, so a run survives even if
        the shared client's event-loop binding is stale.

        Args:
            tool_id: The ID of the Wordware tool to run.
            inputs: Dictionary of input parameters for the tool. If given
                as {"kwargs": {...}}, the inner dict is used.

        Returns:
            Result from running the tool, or a dict with an "error" key.
        """
        logger.info(f"Starting generic tool execution for tool_id: {tool_id}")
        logger.info(f"Input parameters: {inputs}")

        # Unwrap {"kwargs": {...}} wrappers produced by some callers.
        actual_inputs = inputs
        if len(inputs) == 1 and "kwargs" in inputs and isinstance(inputs["kwargs"], dict):
            actual_inputs = inputs["kwargs"]
            logger.info(f"Extracted actual inputs from kwargs: {actual_inputs}")

        payload = self._build_run_payload(actual_inputs)
        logger.info(f"Prepared payload with inputs: {payload['data']['attributes']['inputs']}")

        try:
            # Fresh client per request to avoid event-loop lifetime issues.
            async with httpx.AsyncClient(timeout=self.REQUEST_TIMEOUT) as request_client:
                logger.info(f"Sending POST request to: {self.api_url}/v1/apps/{tool_id}/runs")
                response = await request_client.post(
                    f"{self.api_url}/v1/apps/{tool_id}/runs",
                    headers=self._headers(),
                    json=payload,
                )
                logger.info(f"Received response with status code: {response.status_code}")
                response.raise_for_status()
                run_data = response.json()
                logger.debug(f"Run data response: {json.dumps(run_data, indent=2)}")

                stream_url = run_data.get("data", {}).get("links", {}).get("stream")
                if not stream_url:
                    logger.error("No stream URL in response")
                    return {"error": "No stream URL in response", "response": run_data}

                logger.info(f"Processing stream for tool {tool_id} at: {stream_url}")
                async with httpx.AsyncClient(timeout=self.REQUEST_TIMEOUT) as stream_client:
                    result = await self._process_sse_stream_with_client(stream_url, stream_client)
                logger.info(f"Stream processing completed for tool {tool_id}")
                return result
        except httpx.HTTPStatusError as e:
            error_msg = f"HTTP error {e.response.status_code}: {e.response.text}"
            logger.error(error_msg)
            return {"error": error_msg}
        except json.JSONDecodeError as e:
            error_msg = f"Failed to parse Wordware API response: {e}"
            logger.error(error_msg)
            return {"error": error_msg}
        except asyncio.CancelledError as e:
            error_msg = f"Operation was cancelled: {str(e)}"
            logger.error(error_msg)
            return {"error": error_msg}
        except RuntimeError as e:
            if "Event loop is closed" in str(e):
                logger.error(f"Event loop error: {str(e)}")
                return {"error": "Event loop is closed. This typically happens when the server is shutting down or when there are nested async operations."}
            error_msg = f"Runtime error: {str(e)}"
            logger.error(error_msg, exc_info=True)
            return {"error": error_msg}
        except Exception as e:
            error_msg = f"Unexpected error: {str(e)}"
            logger.error(error_msg, exc_info=True)
            return {"error": error_msg}

    async def close(self):
        """Close the shared HTTP client."""
        logger.info("Closing HTTP client")
        await self.client.aclose()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aleksandrkrivolap/wordware-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.