app.py•6.72 kB
def search_jobs(keywords, location, locale="en_US", affid="371d48447450886ce16b718533cca6f2", user_ip="127.0.0.1", user_agent="Mozilla/5.0", url="http://example.com", **kwargs):
"""
Search for jobs using Careerjet API.
Args:
keywords (str): Keywords to match job titles, content or company names
location (str): Location of requested jobs
locale (str): Locale code (default: en_US)
affid (str): Affiliate ID (required by Careerjet)
user_ip (str): IP address of the end-user
user_agent (str): User agent of the end-user's browser
url (str): URL of page that will display the search results
**kwargs: Additional search parameters (sort, start_num, pagesize, etc.)
Returns:
dict: Search results from Careerjet API
"""
try:
# First try the official client
try:
from careerjet_api_client import CareerjetAPIClient
# Initialize Careerjet API client
cj = CareerjetAPIClient(locale)
# Prepare search parameters
search_params = {
'keywords': keywords,
'location': location,
'affid': affid,
'user_ip': user_ip,
'user_agent': user_agent,
'url': url
}
# Add optional parameters
for key, value in kwargs.items():
if key in ['sort', 'start_num', 'pagesize', 'page', 'contracttype', 'contractperiod']:
search_params[key] = value
# Perform search
result = cj.search(search_params)
return result
except Exception as client_error:
# Fallback to direct HTTP API call
return search_jobs_direct_api(keywords, location, locale, affid, user_ip, user_agent, url, **kwargs)
except Exception as e:
return {"error": f"Failed to search jobs: {str(e)}"}
def search_jobs_direct_api(keywords, location, locale="en_US", affid="371d48447450886ce16b718533cca6f2", user_ip="127.0.0.1", user_agent="Mozilla/5.0", url="http://example.com", **kwargs):
"""
Direct HTTP API call to Careerjet.
"""
import requests
# Use the official API endpoint based on locale
if locale.startswith('tr'):
api_url = "https://public-api.careerjet.net/jobs"
elif locale.startswith('en_GB'):
api_url = "https://public-api.careerjet.co.uk/jobs"
elif locale.startswith('en_US'):
api_url = "https://public-api.careerjet.com/jobs"
elif locale.startswith('de'):
api_url = "https://public-api.careerjet.de/jobs"
elif locale.startswith('fr'):
api_url = "https://public-api.careerjet.fr/jobs"
else:
api_url = "https://public-api.careerjet.com/jobs"
# Prepare parameters for the API
params = {
'keywords': keywords,
'location': location,
'locale_code': locale,
'affid': affid,
'user_ip': user_ip,
'user_agent': user_agent,
'url': url
}
# Add optional parameters
for key, value in kwargs.items():
if key in ['sort', 'start_num', 'pagesize', 'page', 'contracttype', 'contractperiod'] and value is not None:
params[key] = value
try:
# Make API request
headers = {
'User-Agent': user_agent,
'Accept': 'application/json'
}
response = requests.get(api_url, params=params, headers=headers, timeout=30)
if response.status_code == 200:
try:
data = response.json()
if isinstance(data, dict):
return data
else:
return {
"error": "Invalid response format",
"raw_response": str(data)[:500]
}
except ValueError as json_error:
return {
"error": "Invalid JSON response from API",
"details": str(json_error),
"raw_response": response.text[:500],
"status_code": response.status_code
}
elif response.status_code == 401:
return {
"error": "Authentication failed - Invalid API key",
"message": "The provided affiliate ID (affid) is not valid. Please check your Careerjet API credentials.",
"status_code": 401,
"affid_used": affid
}
elif response.status_code == 403:
return {
"error": "Access forbidden",
"message": "API access denied. Please check your affiliate ID and permissions.",
"status_code": 403
}
elif response.status_code == 429:
return {
"error": "Rate limit exceeded",
"message": "Too many requests. Please wait before making another request.",
"status_code": 429
}
else:
return {
"error": f"API request failed with status {response.status_code}",
"message": response.text[:200] if response.text else "No response content",
"url": response.url,
"status_code": response.status_code
}
except requests.exceptions.Timeout:
return {
"error": "Request timeout",
"message": "The API request timed out. Please try again later."
}
except requests.exceptions.ConnectionError:
return {
"error": "Connection error",
"message": "Unable to connect to the Careerjet API. Please check your internet connection."
}
except requests.exceptions.RequestException as e:
return {
"error": "Request failed",
"message": f"API request failed: {str(e)}"
}
def get_job_details(job_url, locale="en_US"):
"""
Get detailed information about a specific job.
Args:
job_url (str): URL of the job posting
locale (str): Locale code (default: en_US)
Returns:
dict: Job details or error message
"""
try:
import requests
# Simple job details extraction (basic implementation)
# In a real scenario, you might want to scrape the job page or use additional APIs
return {
"job_url": job_url,
"message": "Job details retrieval would require additional implementation",
"suggestion": "Use the search_jobs function to get job listings with basic details"
}
except Exception as e:
return {"error": f"Failed to get job details: {str(e)}"}