Skip to main content
Glama
applier.py14.7 kB
""" LinkedIn自动申请模块 """ import asyncio import logging import random from typing import Dict, Optional, List from playwright.async_api import Page, Browser, async_playwright from src.utils.logger import get_logger logger = get_logger(__name__) class LinkedInApplier: """LinkedIn自动申请器""" def __init__(self, browser_config: Dict): self.config = browser_config self.browser: Optional[Browser] = None self.page: Optional[Page] = None self.is_logged_in = False async def initialize(self): """初始化浏览器""" try: self.playwright = await async_playwright().start() self.browser = await self.playwright.chromium.launch( headless=self.config.get('headless', True), slow_mo=self.config.get('slow_mo', 500) ) context = await self.browser.new_context( viewport={ 'width': self.config.get('window_width', 1920), 'height': self.config.get('window_height', 1080) }, user_agent=self.config.get('user_agent') ) self.page = await context.new_page() self.page.set_default_timeout(self.config.get('page_timeout', 30000)) logger.info("LinkedIn applier initialized") except Exception as e: logger.error(f"初始化浏览器失败: {e}") raise async def apply_to_job(self, job_url: str, cover_letter: str = "", custom_answers: Dict = None) -> Dict: """申请LinkedIn职位 Args: job_url: 职位URL cover_letter: 求职信 custom_answers: 自定义问题答案 Returns: 申请结果 """ if not self.page: await self.initialize() if custom_answers is None: custom_answers = {} try: logger.info(f"开始申请LinkedIn职位: {job_url}") # 导航到职位页面 await self.page.goto(job_url, wait_until='networkidle') # 检查是否需要登录 if await self._need_login(): return { "status": "failed", "message": "需要先登录LinkedIn账号" } # 检查是否有Easy Apply按钮 easy_apply_button = await self.page.query_selector('button[data-tracking-control-name*="easy_apply"]') if not easy_apply_button: return { "status": "failed", "message": "此职位不支持Easy Apply" } # 点击Easy Apply按钮 await easy_apply_button.click() await asyncio.sleep(2) # 等待申请模态框出现 await self.page.wait_for_selector('.jobs-easy-apply-modal', timeout=10000) # 填写申请表单 form_result = await self._fill_application_form(cover_letter, custom_answers) if not form_result["success"]: return { "status": "failed", "message": f"填写表单失败: {form_result['message']}" } # 提交申请 submit_result = await self._submit_application() if submit_result: logger.info(f"成功申请职位: {job_url}") return { "status": "success", "message": "申请已成功提交", "job_url": job_url } else: return { "status": "failed", "message": "申请提交失败" } except Exception as e: logger.error(f"申请职位失败 {job_url}: {e}") return { "status": "failed", "message": f"申请过程中出错: {str(e)}" } async def _need_login(self) -> bool: """检查是否需要登录""" try: # 检查是否存在登录相关元素 login_elements = await self.page.query_selector_all('a[href*="login"]') return len(login_elements) > 0 except Exception: return True async def _fill_application_form(self, cover_letter: str, custom_answers: Dict) -> Dict: """填写申请表单 Args: cover_letter: 求职信 custom_answers: 自定义问题答案 Returns: 填写结果 """ try: # 处理多步骤表单 step = 1 max_steps = 5 while step <= max_steps: logger.info(f"处理申请表单步骤 {step}") # 检查当前步骤的表单元素 form_filled = await self._fill_current_step(cover_letter, custom_answers, step) # 查找下一步按钮 next_button = await self.page.query_selector('button[aria-label="继续到下一步骤"]') if not next_button: next_button = await self.page.query_selector('button:has-text("下一步")') if not next_button: next_button = await self.page.query_selector('button:has-text("Next")') if next_button: await next_button.click() await asyncio.sleep(2) step += 1 else: # 没有下一步按钮,表单填写完成 break # 避免无限循环 if step > max_steps: logger.warning("申请表单步骤过多,可能存在问题") break return {"success": True, "message": "表单填写完成"} except Exception as e: logger.error(f"填写申请表单失败: {e}") return {"success": False, "message": str(e)} async def _fill_current_step(self, cover_letter: str, custom_answers: Dict, step: int) -> bool: """填写当前步骤的表单""" try: filled_any = False # 填写求职信 if cover_letter and step == 1: cover_letter_field = await self.page.query_selector('textarea[name*="coverLetter"]') if not cover_letter_field: cover_letter_field = await self.page.query_selector('textarea[placeholder*="求职信"]') if not cover_letter_field: cover_letter_field = await self.page.query_selector('textarea[placeholder*="cover letter"]') if cover_letter_field: await cover_letter_field.fill(cover_letter) filled_any = True logger.info("已填写求职信") # 处理文件上传 await self._handle_file_uploads() # 处理问答题 filled_questions = await self._handle_questions(custom_answers) filled_any = filled_any or filled_questions # 处理选择题 filled_selections = await self._handle_selections(custom_answers) filled_any = filled_any or filled_selections # 随机延迟模拟人工操作 await asyncio.sleep(random.uniform(1, 3)) return filled_any except Exception as e: logger.error(f"填写表单步骤失败: {e}") return False async def _handle_file_uploads(self): """处理文件上传""" try: # 查找文件上传字段 file_inputs = await self.page.query_selector_all('input[type="file"]') for file_input in file_inputs: # 检查是否已有文件上传 upload_status = await self.page.query_selector('.file-upload__status') if upload_status: continue # 已有文件,跳过 # 这里可以上传简历文件 # 暂时跳过文件上传,因为需要具体的文件路径 logger.info("检测到文件上传字段,但暂未处理") except Exception as e: logger.warning(f"处理文件上传失败: {e}") async def _handle_questions(self, custom_answers: Dict) -> bool: """处理问答题""" try: filled_any = False # 查找文本输入框 text_inputs = await self.page.query_selector_all('input[type="text"], textarea') for input_elem in text_inputs: # 获取问题文本 question_text = await self._get_question_text(input_elem) if question_text and question_text in custom_answers: await input_elem.fill(custom_answers[question_text]) filled_any = True logger.info(f"回答问题: {question_text[:50]}...") # 处理常见问题 elif question_text: answer = await self._get_default_answer(question_text) if answer: await input_elem.fill(answer) filled_any = True logger.info(f"使用默认答案回答: {question_text[:50]}...") return filled_any except Exception as e: logger.error(f"处理问答题失败: {e}") return False async def _handle_selections(self, custom_answers: Dict) -> bool: """处理选择题""" try: filled_any = False # 处理下拉选择 selects = await self.page.query_selector_all('select') for select in selects: options = await select.query_selector_all('option') if len(options) > 1: # 有选项可选 # 选择第一个非空选项 await select.select_option(index=1) filled_any = True # 处理单选按钮 radio_groups = await self.page.query_selector_all('input[type="radio"]') processed_names = set() for radio in radio_groups: name = await radio.get_attribute('name') if name and name not in processed_names: # 选择第一个选项 await radio.click() processed_names.add(name) filled_any = True return filled_any except Exception as e: logger.error(f"处理选择题失败: {e}") return False async def _get_question_text(self, input_elem) -> Optional[str]: """获取问题文本""" try: # 尝试多种方式获取问题文本 label = await input_elem.get_attribute('aria-label') if label: return label placeholder = await input_elem.get_attribute('placeholder') if placeholder: return placeholder # 查找相关的label元素 input_id = await input_elem.get_attribute('id') if input_id: label_elem = await self.page.query_selector(f'label[for="{input_id}"]') if label_elem: return await label_elem.inner_text() return None except Exception: return None async def _get_default_answer(self, question_text: str) -> Optional[str]: """获取问题的默认答案""" question_lower = question_text.lower() # 常见问题的默认答案 default_answers = { "experience": "5+ years", "visa": "Authorized to work", "salary": "Competitive", "start": "Available immediately", "relocate": "Yes", "remote": "Yes", "willing": "Yes" } for keyword, answer in default_answers.items(): if keyword in question_lower: return answer return None async def _submit_application(self) -> bool: """提交申请""" try: # 查找提交按钮 submit_button = await self.page.query_selector('button[data-tracking-control-name*="submit"]') if not submit_button: submit_button = await self.page.query_selector('button:has-text("提交申请")') if not submit_button: submit_button = await self.page.query_selector('button:has-text("Submit application")') if submit_button: await submit_button.click() # 等待提交完成 - 查找成功消息或跳转 try: await self.page.wait_for_selector('.jobs-apply-success', timeout=10000) return True except Exception: # 检查URL是否发生变化(通常表示提交成功) await asyncio.sleep(3) current_url = self.page.url if 'applied' in current_url or 'success' in current_url: return True return False except Exception as e: logger.error(f"提交申请失败: {e}") return False async def close(self): """关闭浏览器""" try: if self.browser: await self.browser.close() if hasattr(self, 'playwright'): await self.playwright.stop() logger.info("LinkedIn applier closed") except Exception as e: logger.error(f"关闭浏览器失败: {e}") async def __aenter__(self): """异步上下文管理器入口""" await self.initialize() return self async def __aexit__(self, exc_type, exc_val, exc_tb): """异步上下文管理器出口""" await self.close() if __name__ == "__main__": async def test_linkedin_applier(): """测试LinkedIn申请器""" config = { 'headless': False, 'slow_mo': 1000, 'window_width': 1920, 'window_height': 1080 } async with LinkedInApplier(config) as applier: # 测试申请(需要有效的job_url) test_url = "https://www.linkedin.com/jobs/view/test-job-id" cover_letter = "I am interested in this position..." result = await applier.apply_to_job(test_url, cover_letter) print(f"申请结果: {result}") # asyncio.run(test_linkedin_applier())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/guangliangyang/mcp4Interview'

If you have feedback or need assistance with the MCP directory API, please join our Discord server