"""
SearchEngine - Stateless search logic for TimeLooker Lambda function.
"""
import hashlib
import json
import logging
import os
import re
from datetime import datetime
from typing import Any, Dict, List, Tuple

import openai
from openai import OpenAI
class SearchEngine:
"""
    Handles the core search functionality without persisting state between
    invocations. The engine holds only an OpenAI client, so it is safe to use
    inside Lambda functions.
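
    Example (illustrative; assumes OPENAI_API_KEY is set in the environment):

        engine = SearchEngine()
        current, new = engine.search_and_compare(
            "remote Python developer jobs", previous_items=[]
        )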
"""
def __init__(self):
"""
Initialize SearchEngine with OpenAI client.
"""
        # Set up the logger first so initialization failures are reported through it
        self.logger = logging.getLogger(__name__)

        # Initialize OpenAI client from environment variable
        try:
            api_key = os.getenv("OPENAI_API_KEY")
            if not api_key:
                raise ValueError("OPENAI_API_KEY environment variable is required")
            # Pass only the API key to avoid client-version compatibility issues
            self.openai_client = OpenAI(api_key=api_key)
            self.logger.info("OpenAI client initialized successfully")
        except Exception as e:
            self.logger.error(f"Failed to initialize OpenAI client: {e}")
            self.logger.error(f"OpenAI module version: {getattr(openai, '__version__', 'unknown')}")
            raise
def execute_search_task(self, task_description: str) -> str:
"""
Execute the search task using OpenAI Responses API with web search.
Args:
task_description: Natural language description of what to search for
Returns:
Raw response from OpenAI API
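
        Example (illustrative; the exact payload depends on the model):

            raw = engine.execute_search_task("new ML engineer roles in Berlin")
            # raw should be a JSON array string such as
            # '[{"name": "...", "description": "...", "url": "...", ...}]'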
"""
prompt = f"""
Search the web for current information about: {task_description}
Please return ALL available results, not just a limited number. Return your response as a valid JSON array where each item is an object with the following structure:
{{
"name": "Title or name of the item found",
"description": "Brief description of the item",
"url": "Direct link to the source (if available)",
"source": "Source name or organization",
"location": "Location or context (if applicable)",
"additional_info": "Any other relevant details"
}}
If no items are found, return an empty JSON array: []
Make sure to return ONLY valid JSON, no additional text or formatting.
"""
try:
response = self.openai_client.responses.create(
model="gpt-4.1",
tools=[{
"type": "web_search_preview",
"search_context_size": "high"
}],
input=prompt
)
return response.output_text.strip()
        except Exception as e:
            self.logger.error(f"Error executing search task: {e}")
            # Return an empty JSON array so downstream parsing degrades cleanly
            return "[]"
def parse_search_results(self, raw_response: str) -> List[Dict[str, Any]]:
"""
Parse the OpenAI JSON response into structured data for comparison.
Args:
raw_response: Raw JSON response from OpenAI API
Returns:
List of parsed items with hashes and timestamps
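
        Example (illustrative):

            items = engine.parse_search_results('[{"name": "A", "source": "B"}]')
            # -> [{"name": "A", "source": "B", "hash": "<md5>", "timestamp": "<iso>"}]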
"""
try:
            # Remove markdown code fences if present
            response_text = raw_response.strip()
            if response_text.startswith('```json'):
                response_text = response_text[7:]  # Remove ```json
            elif response_text.startswith('```'):
                response_text = response_text[3:]  # Remove bare ```
            if response_text.endswith('```'):
                response_text = response_text[:-3]  # Remove closing ```
            response_text = response_text.strip()
            # Repair a truncated array: responses sometimes cut off mid-object
            if not response_text.endswith(']'):
                last_complete_brace = response_text.rfind('}')
                if last_complete_brace != -1:
                    # Keep everything up to the last complete object, then close the array
                    response_text = response_text[:last_complete_brace + 1] + '\n]'
                else:
                    # No complete objects found: close the current object and the array
                    response_text = response_text.rstrip().rstrip(',') + '\n}\n]'
# Parse JSON response
items_data = json.loads(response_text)
            if not isinstance(items_data, list) or not items_data:
                return []
items = []
seen_hashes = set() # Track duplicates
for item_data in items_data:
# Skip items with missing essential fields
if not item_data.get('name') or not item_data.get('source'):
continue
# Create a hash of the essential content for comparison
essential_content = f"{item_data.get('name', '')}-{item_data.get('source', '')}-{item_data.get('url', '')}"
content_hash = hashlib.md5(essential_content.encode()).hexdigest()
# Skip duplicates within the same response
if content_hash in seen_hashes:
continue
seen_hashes.add(content_hash)
# Add hash and timestamp to the item
item_data['hash'] = content_hash
item_data['timestamp'] = datetime.now().isoformat()
items.append(item_data)
return items
except json.JSONDecodeError as e:
self.logger.error(f"Failed to parse JSON response: {e}")
self.logger.debug(f"Attempted to parse: {response_text[:500]}...")
# Fallback: try to extract individual items manually
try:
return self._manual_parse_fallback(raw_response)
except Exception as fallback_error:
self.logger.error(f"Fallback parsing also failed: {fallback_error}")
return []
def _manual_parse_fallback(self, raw_response: str) -> List[Dict[str, Any]]:
"""
Fallback method to manually extract items from malformed JSON.
Args:
raw_response: Raw response that failed JSON parsing
Returns:
List of manually extracted items
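
        Example (illustrative): given a truncated response such as

            '[{"name": "A", "description": "d", "url": "u", "source": "s",
              "location": "l", "additional_info": "i"}, {"name": "B"'

        the regex pass recovers the complete first object and drops the fragment.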
"""
items = []
seen_hashes = set() # Track duplicates in fallback too
        # Remove markdown code fences
        response_text = raw_response.strip()
        if response_text.startswith('```json'):
            response_text = response_text[7:]
        elif response_text.startswith('```'):
            response_text = response_text[3:]
        if response_text.endswith('```'):
            response_text = response_text[:-3]
# Split by object boundaries and try to parse individual objects
# Use a more precise regex pattern
object_pattern = r'\{\s*"name":\s*"[^"]*",\s*"description":\s*"[^"]*",\s*"url":\s*"[^"]*",\s*"source":\s*"[^"]*",\s*"location":\s*"[^"]*",\s*"additional_info":\s*"[^"]*"\s*\}'
matches = re.findall(object_pattern, response_text, re.DOTALL)
# If the precise pattern doesn't work, try a more flexible one
if not matches:
object_pattern = r'\{[^{}]*?"name"[^{}]*?"description"[^{}]*?"url"[^{}]*?"source"[^{}]*?\}'
matches = re.findall(object_pattern, response_text, re.DOTALL)
for match in matches:
try:
# Try to parse individual object
item_data = json.loads(match)
if item_data.get('name') and item_data.get('source'):
essential_content = f"{item_data.get('name', '')}-{item_data.get('source', '')}-{item_data.get('url', '')}"
content_hash = hashlib.md5(essential_content.encode()).hexdigest()
# Skip duplicates
if content_hash in seen_hashes:
continue
seen_hashes.add(content_hash)
item_data['hash'] = content_hash
item_data['timestamp'] = datetime.now().isoformat()
items.append(item_data)
except json.JSONDecodeError:
continue
self.logger.info(f"Fallback parsing extracted {len(items)} items")
return items
def find_new_items(self, current_items: List[Dict[str, Any]], previous_items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Compare current results with previous results to find new items using AI.
Args:
current_items: List of current search results
previous_items: List of previously seen search results
Returns:
List of new items not seen before
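
        Example (illustrative):

            new = engine.find_new_items(current_items=parsed, previous_items=stored)
            # With stored == [] (first run) this returns [] by design.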
"""
        if not previous_items:
            # First run: nothing to compare against, so report no new items
            # (avoids notifying on the initial crawl)
            return []
# First, do a quick hash-based filter to catch obvious duplicates
previous_hashes = {item['hash'] for item in previous_items}
potentially_new = [item for item in current_items if item['hash'] not in previous_hashes]
if not potentially_new:
return []
# Use AI to do intelligent comparison for potentially new items
truly_new_items = self._ai_compare_items(previous_items, potentially_new)
return truly_new_items
def _ai_compare_items(self, previous_items: List[Dict[str, Any]], potentially_new_items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Use OpenAI to intelligently compare items and identify truly new ones.
Args:
previous_items: List of previously seen items
potentially_new_items: List of items that might be new
Returns:
List of genuinely new items
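
        Example (illustrative): if the model replies "[0, 2]", the items at
        indices 0 and 2 of potentially_new_items are returned. On any API or
        parsing error, all potentially new items are returned as a safe fallback.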
"""
try:
# Prepare comparison data
previous_summary = []
for item in previous_items:
previous_summary.append({
"name": item.get('name', ''),
"source": item.get('source', ''),
"location": item.get('location', ''),
"url": item.get('url', '')
})
            potentially_new_summary = []
            for index, item in enumerate(potentially_new_items):
                potentially_new_summary.append({
                    "name": item.get('name', ''),
                    "source": item.get('source', ''),
                    "location": item.get('location', ''),
                    "url": item.get('url', ''),
                    "original_index": index  # Position in potentially_new_items
                })
prompt = f"""
Compare these two lists and identify which items from the "potentially_new" list are genuinely NEW and not present in the "previous" list.
Consider items the SAME if they have:
- Same or very similar job title/name (ignore minor formatting differences)
- Same company/organization (source)
- Same or very similar location
Previous items seen before:
{json.dumps(previous_summary, indent=2)}
Potentially new items to check:
{json.dumps(potentially_new_summary, indent=2)}
Return ONLY a JSON array containing the original_index numbers of items that are genuinely NEW (not duplicates).
If no items are new, return an empty array [].
Example response: [0, 2, 5] (meaning items at indices 0, 2, and 5 are genuinely new)
"""
response = self.openai_client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "You are an expert at comparing job listings and identifying duplicates. Return only valid JSON."},
{"role": "user", "content": prompt}
],
temperature=0.1,
max_tokens=500
)
# Parse the response to get indices of new items
response_text = response.choices[0].message.content.strip()
            # Remove markdown fences if present
            if response_text.startswith('```json'):
                response_text = response_text[7:]
            elif response_text.startswith('```'):
                response_text = response_text[3:]
            if response_text.endswith('```'):
                response_text = response_text[:-3]
            response_text = response_text.strip()
            new_indices = json.loads(response_text)
            # Return the genuinely new items, keeping only valid in-range integer indices
            truly_new = [
                potentially_new_items[i]
                for i in new_indices
                if isinstance(i, int) and 0 <= i < len(potentially_new_items)
            ]
self.logger.info(f"AI comparison: {len(potentially_new_items)} potentially new items -> {len(truly_new)} genuinely new items")
return truly_new
except Exception as e:
self.logger.error(f"Error in AI comparison: {e}")
# Fallback to hash-based comparison if AI fails
self.logger.warning("Falling back to hash-based comparison")
return potentially_new_items
    def search_and_compare(self, task_description: str, previous_items: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
"""
Execute a complete search cycle: search, parse, and compare results.
Args:
task_description: Natural language description of what to search for
previous_items: List of previously seen search results
Returns:
Tuple of (all_current_items, new_items_only)
"""
# Execute search
raw_response = self.execute_search_task(task_description)
# Parse results
current_items = self.parse_search_results(raw_response)
# Find new items
new_items = self.find_new_items(current_items, previous_items)
self.logger.info(f"Search completed: {len(current_items)} total items, {len(new_items)} new items")
return current_items, new_items
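

if __name__ == "__main__":
    # Minimal local smoke test (illustrative; not the Lambda entry point).
    # Assumes OPENAI_API_KEY is set and the OpenAI API is reachable.
    logging.basicConfig(level=logging.INFO)
    engine = SearchEngine()
    current, new = engine.search_and_compare(
        "new senior Python developer job postings in Seattle",
        previous_items=[],
    )
    print(f"Found {len(current)} items ({len(new)} new)")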