# mcp4amazon.py (5.93 kB)
# -*- coding: utf-8 -*-
# @Author : Jeremy-666
import asyncio
import json
import re
import time

from bs4 import BeautifulSoup
from mcp.server.fastmcp import FastMCP
from playwright._impl._errors import TimeoutError
from playwright.async_api import async_playwright
# Server instance named "mcp4amazon"; the functions decorated with
# @mcp.tool() in this file are attached to it.
mcp = FastMCP("mcp4amazon")
def _extract_variations(html: str) -> str:
    """Return the variation list embedded in a product page, one per line.

    Looks for the first <script> whose text contains
    'twister-js-init-dpx-data', pulls the "dimensionValuesDisplayData" JSON
    object out of it and formats every entry as '变体(<key>): <v1>-<v2>-...'.

    Args:
        html: Raw HTML of the product detail page.

    Returns:
        The formatted entries joined by newlines, or 'Not Found' when the
        script or the JSON object is absent from the page.
    """
    soup = BeautifulSoup(html, 'html.parser')
    script_text = next(
        (tag.text for tag in soup.find_all('script')
         if 'twister-js-init-dpx-data' in tag.text),
        None,
    )
    if script_text is None:
        return 'Not Found'

    # Non-greedy match: stops at the first '}' followed by ',', '}' or the end.
    pattern = r'"dimensionValuesDisplayData"\s*:\s*({.*?})\s*(?=,|}|$)'
    match = re.search(pattern, script_text, re.DOTALL)
    if match is None:
        return 'Not Found'

    display_data = json.loads(match.group(1))
    return '\n'.join(
        '变体(%s): %s' % (key, '-'.join(values))
        for key, values in display_data.items()
    )


# NOTE(review): FastMCP is expected to publish the docstring below as the tool
# description shown to clients -- confirm before rewording it further.
@mcp.tool()
async def get_product_info(asin: str) -> str:
    """Get the product information for an ASIN: title, description, variations.

    Args:
        asin: Amazon ASIN code.

    Returns:
        A text report with three sections (title, feature bullets, all
        variations). The variations section reads 'Not Found' when the page
        carries no variation data.
    """
    async with async_playwright() as playwright:
        # Attach to an already running Chrome through its CDP port 9222.
        browser = await playwright.chromium.connect_over_cdp("http://localhost:9222")
        page = await browser.contexts[0].new_page()
        try:
            await page.bring_to_front()
            # Open the product detail page and capture the document response.
            url = "https://www.amazon.com/dp/%s?th=1" % asin
            async with page.expect_response(url, timeout=0) as response:
                await page.goto(url, wait_until='commit')
            document = await response.value
            html = await document.text()

            title = await page.locator("xpath=//span[@id='productTitle']").inner_text()

            bullets = page.locator(
                "xpath=//ul[@class='a-unordered-list a-vertical a-spacing-mini']"
                "//span[@class='a-list-item']"
            )
            features = [await bullet.inner_text() for bullet in await bullets.all()]

            # Always yields a string, so the report below can never hit an
            # unbound variable when the page has no variation data.
            variations = _extract_variations(html)
        finally:
            # Close the tab even when navigation or scraping fails.
            await page.close()

    result = '产品标题:\n%s' % title
    result += '\n产品描述:\n%s' % '\n'.join(features)
    result += '\n所有变体信息:\n%s' % variations
    return result
def _parse_review(block):
    """Convert one review container into a dict, or None if it is not a review.

    Args:
        block: A BeautifulSoup tag for a 'div.a-section.celwidget' element.

    Returns:
        A dict with the keys 'Name', 'Star', 'Property', 'Title' and
        'Content', or None when any mandatory element is missing from the
        container.
    """
    name_tag = block.find('span', class_='a-profile-name')
    star_tag = block.find('span', class_='a-icon-alt')
    title_link = block.find(
        'a',
        class_='a-size-base a-link-normal review-title a-color-base review-title-content a-text-bold',
    )
    body_tag = block.find('span', class_='a-size-base review-text review-text-content')
    if name_tag is None or star_tag is None or title_link is None or body_tag is None:
        return None

    title_spans = title_link.find_all('span')
    body_span = body_tag.find('span')
    if not title_spans or body_span is None:
        return None

    variant_link = block.find('a', class_='a-size-mini a-link-normal a-color-secondary')
    if variant_link is None:
        variant = 'Unknown'
    else:
        # Replace the <i> tags inside the link with a visible ' | ' delimiter.
        for icon in variant_link.find_all('i'):
            icon.replace_with(' | ')
        variant = variant_link.get_text().strip()

    return {
        'Name': name_tag.get_text().strip(),
        # Only the first character of the rating text is kept.
        'Star': star_tag.get_text()[0:1],
        'Property': variant,
        'Title': title_spans[-1].get_text().strip(),
        'Content': body_span.get_text().strip(),
    }


# NOTE(review): FastMCP is expected to publish the docstring below as the tool
# description shown to clients -- confirm before rewording it further.
@mcp.tool()
async def get_product_review(asin: str, max_length: int) -> str:
    """Get the reviews for an ASIN; the total character count stays within max_length.

    Args:
        asin: Amazon ASIN code.
        max_length: Total number of characters.

    Returns:
        A JSON array (indent=4) of review objects, most recent first as
        requested by the 'sortBy=recent' URL parameter. A review that would
        push the serialized output past max_length is left out.
    """
    async with async_playwright() as playwright:
        # Attach to an already running Chrome through its CDP port 9222.
        browser = await playwright.chromium.connect_over_cdp("http://localhost:9222")
        page = await browser.contexts[0].new_page()
        try:
            await page.bring_to_front()
            url = "https://www.amazon.com/product-reviews/%s/ref=cm_cr_arp_d_viewopt_srt?ie=UTF8&reviewerType=all_reviews&sortBy=recent&pageNumber=1" % asin
            # Open the first review page and capture the document response.
            async with page.expect_response(url, timeout=0) as response:
                await page.goto(url, wait_until='commit')
            document = await response.value
            html = await document.text()

            reviews = []
            next_index = 2  # page number expected in the next AJAX response URL
            budget_exhausted = False
            while True:
                if html:
                    soup = BeautifulSoup(html, 'html.parser')
                    for block in soup.find_all('div', class_='a-section celwidget'):
                        review = _parse_review(block)
                        if review is None:
                            continue
                        # Check the size before appending so the returned JSON
                        # never exceeds max_length.
                        if len(json.dumps(reviews + [review], indent=4)) > max_length:
                            budget_exhausted = True
                            break
                        reviews.append(review)
                if budget_exhausted:
                    break
                if html:
                    # Pause between pages without blocking the event loop.
                    await asyncio.sleep(1)

                next_url = re.compile(r"https://www.amazon.com/hz/reviews-render/ajax/reviews/get/ref=.*next_%s" % next_index)
                next_button = page.get_by_text('Next page')
                if await next_button.get_attribute('class') == 'a-disabled a-last':
                    break
                await next_button.scroll_into_view_if_needed()
                try:
                    async with page.expect_response(next_url, timeout=5000) as response:
                        await next_button.click()
                    ajax = await response.value
                    payload = await ajax.text()
                except TimeoutError:
                    # TimeoutError is the Playwright class imported at the top
                    # of the file. No HTML this round; the loop clicks again.
                    # NOTE(review): repeated timeouts are retried without a
                    # cap -- consider bounding the number of attempts.
                    html = ''
                    continue

                # The body is a series of JSON arrays separated by '&&&'; the
                # piece after the last separator is discarded.
                operations = [
                    json.loads(chunk)
                    for chunk in payload.split("&&&")[0:-1]
                    if chunk.strip()
                ]
                html = "".join(
                    op[2] for op in operations
                    if op[0] == "append" and op[1] == "#cm_cr-review_list"
                )
                next_index += 1
        finally:
            # Close the tab even when navigation or scraping fails.
            await page.close()
    return json.dumps(reviews, indent=4)
# Script entry point: run the server over the stdio transport.
if __name__ == "__main__":
    mcp.run(transport="stdio")