| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387 |
- import re
- import json
- import logging
- from typing import Dict, List, Optional, Any
- from urllib.parse import urlencode, quote
- from .base import BaseCrawler
- from utils.helpers import clean_price, parse_sales_volume
class JdCrawler(BaseCrawler):
    """
    Crawler for the JD.com platform.

    Note: JD has fairly strong anti-crawling measures, so a logged-in Cookie
    and/or a proxy may need to be configured.
    """

    platform = 'jd'

    # Patterns applied once per search-result item; compiled once at class
    # creation instead of being looked up on every loop iteration.
    _RE_FLAGS = re.DOTALL | re.IGNORECASE
    _ITEM_RE = re.compile(r'<li[^>]*data-sku="(\d+)"[^>]*>(.*?)</li>', _RE_FLAGS)
    _ITEM_TITLE_RE = re.compile(
        r'<div[^>]*class="p-name[^"]*"[^>]*>.*?<a[^>]*>(.*?)</a>', _RE_FLAGS)
    _ITEM_PRICE_RE = re.compile(
        r'<div[^>]*class="p-price[^"]*"[^>]*>.*?<i[^>]*>([\d.]*)</i>', _RE_FLAGS)
    _ITEM_SHOP_RE = re.compile(
        r'<div[^>]*class="p-shop[^"]*"[^>]*>.*?<a[^>]*>(.*?)</a>', _RE_FLAGS)
    _ITEM_SALES_RE = re.compile(
        r'<div[^>]*class="p-commit[^"]*"[^>]*>.*?<a[^>]*>([^<]+)</a>', _RE_FLAGS)
    _ITEM_LAZY_IMG_RE = re.compile(
        r'<div[^>]*class="p-img[^"]*"[^>]*>.*?<img[^>]*data-lazy-img="([^"]+)"', _RE_FLAGS)
    _ITEM_SRC_IMG_RE = re.compile(
        r'<div[^>]*class="p-img[^"]*"[^>]*>.*?<img[^>]*src="([^"]+)"', _RE_FLAGS)
    _TAG_RE = re.compile(r'<[^>]+>')
    _WHITESPACE_RE = re.compile(r'\s+')
    _SEARCH_JSON_MARKER_RE = re.compile(r'window\.__SEARCH_RESULT__\s*=\s*')

    def __init__(self, cookie: Optional[str] = None, proxy: Optional[str] = None):
        """
        Initialise the JD crawler.

        :param cookie: Cookie string of a logged-in JD session (optional)
        :param proxy: proxy server address used for both http and https (optional)
        """
        super().__init__()
        self.cookie = cookie
        self.proxy = proxy

        if cookie:
            self.headers['Cookie'] = cookie

        if proxy:
            self.session.proxies = {
                'http': proxy,
                'https': proxy,
            }

    # ------------------------------------------------------------------
    # Small private helpers
    # ------------------------------------------------------------------

    @classmethod
    def _strip_tags(cls, fragment: str) -> str:
        """Remove HTML tags from ``fragment`` and trim surrounding whitespace."""
        return cls._TAG_RE.sub('', fragment).strip()

    @staticmethod
    def _normalize_image_url(img_url: Optional[str]) -> str:
        """
        Return ``img_url`` with an explicit scheme.

        Values that do not already start with ``http`` get ``https:``
        prepended, which is correct for protocol-relative ``//host/path``
        URLs.  NOTE(review): other relative forms are not handled -- confirm
        whether JD ever returns them.
        """
        if not img_url:
            return ''
        if img_url.startswith('http'):
            return img_url
        return 'https:' + img_url

    @staticmethod
    def _needs_price(product: Dict[str, Any]) -> bool:
        """Return True when ``product`` has no positive price yet (None-safe)."""
        return (product.get('price') or 0) <= 0

    def _fetch_price_items(self, sku_ids: List[Any]) -> List[Dict[str, Any]]:
        """
        Query the JD price endpoint for ``sku_ids``.

        :param sku_ids: SKU ids to look up
        :return: the dict entries of the response list; an empty list when the
                 request fails, the body is not JSON, or the payload is not a list
        """
        if not sku_ids:
            return []

        sku_str = ','.join(f'J_{sku}' for sku in sku_ids)
        price_url = f"https://p.3.cn/prices/mgets?skuIds={sku_str}&type=1"

        response = self._make_request('GET', price_url, headers={
            'Referer': 'https://www.jd.com'
        })
        if not response:
            return []

        try:
            price_data = response.json()
        except ValueError as e:
            # json.JSONDecodeError and the decode errors raised by HTTP
            # client libraries are ValueError subclasses.
            self.logger.error(f"解析价格数据失败: {e}")
            return []

        if not isinstance(price_data, list):
            # An error payload (e.g. a dict) must not crash the caller.
            self.logger.error(f"价格数据格式异常: {type(price_data).__name__}")
            return []

        return [entry for entry in price_data if isinstance(entry, dict)]

    # ------------------------------------------------------------------
    # Search
    # ------------------------------------------------------------------

    def search(self, keyword: str, page: int = 1, sort: str = 'sort_totalsales15_desc', **kwargs) -> List[Dict[str, Any]]:
        """
        Search JD products.

        :param keyword: search keyword
        :param page: page number, starting from 1 (smaller values are treated as 1)
        :param sort: sort order, sent as the ``psort`` query parameter:
            - sort_totalsales15_desc (sales volume)
            - sort_price_asc (price ascending)
            - sort_price_desc (price descending)
            - sort_discount_desc (discount)
            - 'default' or an empty value omits ``psort``
        :return: list of product dicts; empty when the request fails
        """
        self.logger.info(f"搜索京东商品: keyword={keyword}, page={page}")

        # Pages are 1-based; clamp so the result offset `s` never goes negative.
        page = max(page, 1)

        params = {
            'keyword': keyword,
            'wq': keyword,
            'pvid': self._generate_pvid(),
            'page': page,
            's': (page - 1) * 30 + 1,
        }

        if sort and sort != 'default':
            params['psort'] = sort

        search_url = f"{self.config.get('search_url')}?{urlencode(params)}"

        self.logger.debug(f"搜索URL: {search_url}")

        response = self._make_request('GET', search_url)

        if not response:
            self.logger.warning(f"搜索请求失败: {keyword}")
            return []

        products = self._parse_search_result(response.text, search_url)

        if products:
            # Items whose price could not be parsed from the page are filled
            # from the price endpoint.
            self._fill_product_prices(products)

        self.logger.info(f"从搜索结果中解析到 {len(products)} 个商品")

        return products

    def _generate_pvid(self) -> str:
        """
        Generate a value for the ``pvid`` query parameter.

        :return: millisecond timestamp followed by a random 6-digit number
        """
        import time
        import random
        timestamp = int(time.time() * 1000)
        random_num = random.randint(100000, 999999)
        return f"{timestamp}{random_num}"

    def _parse_search_result(self, html: str, source_url: str) -> List[Dict[str, Any]]:
        """
        Parse a JD search-result page.

        Tries the ``<li data-sku=...>`` markup first and falls back to the
        JSON embedded in the page when no product could be extracted.

        :param html: page HTML
        :param source_url: URL the page was fetched from (stored on each product)
        :return: list of product dicts
        """
        products = []

        items = self._ITEM_RE.findall(html)

        self.logger.debug(f"找到 {len(items)} 个商品项")

        for sku_id, item_html in items:
            try:
                product = self._parse_item_html(sku_id, item_html, source_url)
            except Exception as e:
                # One malformed item must not abort the whole page.
                self.logger.error(f"解析商品失败: {e}")
                continue
            if product:
                products.append(product)

        if not products:
            products = self._parse_from_json(html, source_url)

        return products

    def _parse_item_html(self, sku_id: str, item_html: str, source_url: str) -> Optional[Dict[str, Any]]:
        """
        Parse the HTML of a single search-result item.

        :param sku_id: SKU id taken from the item's ``data-sku`` attribute
        :param item_html: inner HTML of the item's ``<li>`` element
        :param source_url: URL of the search page
        :return: product dict, or None when no title is found or parsing fails
        """
        try:
            title = ''
            title_match = self._ITEM_TITLE_RE.search(item_html)
            if title_match:
                title = self._strip_tags(title_match.group(1))
                title = self._WHITESPACE_RE.sub(' ', title)

            if not title:
                return None

            price_match = self._ITEM_PRICE_RE.search(item_html)
            price = clean_price(price_match.group(1)) if price_match else 0

            shop_name = ''
            shop_match = self._ITEM_SHOP_RE.search(item_html)
            if shop_match:
                shop_name = self._strip_tags(shop_match.group(1))

            sales_match = self._ITEM_SALES_RE.search(item_html)
            sales_str = sales_match.group(1) if sales_match else ''
            sales_volume = parse_sales_volume(sales_str)

            # Prefer the lazy-load attribute; fall back to a plain src.
            img_match = (self._ITEM_LAZY_IMG_RE.search(item_html)
                         or self._ITEM_SRC_IMG_RE.search(item_html))
            img_url = self._normalize_image_url(img_match.group(1) if img_match else '')

            detail_url = f"https://item.jd.com/{sku_id}.html"

            # Explicit parentheses: `and` binds tighter than `or`.
            is_self = ('京东自营' in shop_name
                       or ('p-icons' in item_html and '自营' in item_html))

            return {
                'product_id': str(sku_id),
                'name': title,
                'price': price,
                'image_url': img_url,
                'url': detail_url,
                'shop_name': shop_name,
                'sales_volume': sales_volume,
                'is_jd_self': is_self,
                'source_url': source_url,
                'is_wholesale': False,
                'currency': 'CNY'
            }

        except Exception as e:
            self.logger.error(f"解析商品HTML失败: {e}")
            return None

    def _parse_from_json(self, html: str, source_url: str) -> List[Dict[str, Any]]:
        """
        Parse products from the ``window.__SEARCH_RESULT__`` JSON in the page.

        :param html: page HTML
        :param source_url: URL of the search page
        :return: list of product dicts; empty when the JSON is absent or invalid
        """
        products = []

        marker = self._SEARCH_JSON_MARKER_RE.search(html)
        if not marker:
            return products

        try:
            # raw_decode reads exactly one JSON value starting at the given
            # offset, so a "};" inside the payload cannot truncate it the way
            # a non-greedy regex capture would.
            data, _ = json.JSONDecoder().raw_decode(html, marker.end())
        except json.JSONDecodeError as e:
            self.logger.error(f"解析JSON失败: {e}")
            return products

        if not isinstance(data, dict):
            return products

        # Either level may be present but null.
        items = (data.get('wareList') or {}).get('wareInfo') or []

        for item in items:
            if not isinstance(item, dict):
                continue
            product = self._parse_json_item(item, source_url)
            if product:
                products.append(product)

        return products

    def _parse_json_item(self, item: Dict, source_url: str) -> Optional[Dict[str, Any]]:
        """
        Parse one product entry of the embedded search JSON.

        :param item: one element of ``wareList.wareInfo``
        :param source_url: URL of the search page
        :return: product dict, or None when the id or title is missing or
                 parsing fails
        """
        try:
            # The SKU id comes from 'wareId'; 'wname' is the product title and
            # must not be used as the id.
            sku_id = item.get('wareId')
            if not sku_id:
                return None

            title = (item.get('wname') or '').strip()
            if not title:
                return None

            price = clean_price(str(item.get('price', '0')))
            original_price = clean_price(str(item.get('oprice', '0'))) or None

            img_url = self._normalize_image_url(item.get('imgurl', ''))

            shop_name = (item.get('goodShop') or {}).get('shopName', '')

            sales_str = item.get('reviews', '')
            sales_volume = parse_sales_volume(sales_str)

            detail_url = f"https://item.jd.com/{sku_id}.html"

            return {
                'product_id': str(sku_id),
                'name': title,
                'price': price,
                'original_price': original_price,
                'image_url': img_url,
                'url': detail_url,
                'shop_name': shop_name,
                'sales_volume': sales_volume,
                'source_url': source_url,
                'is_wholesale': False,
                'currency': 'CNY'
            }

        except Exception as e:
            self.logger.error(f"解析JSON商品失败: {e}")
            return None

    # ------------------------------------------------------------------
    # Prices
    # ------------------------------------------------------------------

    def _fill_product_prices(self, products: List[Dict[str, Any]]) -> None:
        """
        Fetch prices in one batch and fill them into ``products`` in place.

        Only products without a positive ``price`` trigger a lookup.
        ``original_price`` is set only when the product does not have one.

        :param products: product dicts carrying at least ``product_id``
        """
        if not products:
            return

        # Ids are normalised to str so they match the keys of price_map.
        sku_ids = [
            str(p['product_id'])
            for p in products
            if p.get('product_id') and self._needs_price(p)
        ]

        if not sku_ids:
            return

        self.logger.debug(f"批量获取 {len(sku_ids)} 个商品的价格")

        price_map = {}
        for item in self._fetch_price_items(sku_ids):
            sku = str(item.get('id', '')).replace('J_', '')
            price_map[sku] = {
                'price': clean_price(item.get('p', '0')),
                'original_price': clean_price(item.get('op', '0')) or None,
            }

        for product in products:
            quoted = price_map.get(str(product.get('product_id')))
            if quoted is None:
                continue
            if self._needs_price(product):
                product['price'] = quoted['price']
            if not product.get('original_price'):
                product['original_price'] = quoted['original_price']

    def get_price(self, product_id: str, **kwargs) -> Optional[Dict[str, Any]]:
        """
        Get the price of a product.

        :param product_id: product id (SKU)
        :return: price info dict, or None when the lookup fails
        """
        self.logger.info(f"获取京东商品价格: product_id={product_id}")

        items = self._fetch_price_items([product_id])
        if not items:
            return None

        item = items[0]
        price = clean_price(item.get('p', '0'))
        original_price = clean_price(item.get('op', '0')) or None

        return {
            'product_id': str(product_id),
            'price': price,
            'original_price': original_price,
            'currency': 'CNY',
            'platform': self.platform,
            'source_url': f"https://item.jd.com/{product_id}.html",
            'price_type': 'retail'
        }

    # ------------------------------------------------------------------
    # Product detail
    # ------------------------------------------------------------------

    def get_product_detail(self, product_id: str, **kwargs) -> Optional[Dict[str, Any]]:
        """
        Get the detail of a JD product.

        :param product_id: product id (SKU)
        :return: product detail dict, or None when the request or parsing fails
        """
        self.logger.info(f"获取京东商品详情: product_id={product_id}")

        detail_url = f"https://item.jd.com/{product_id}.html"

        response = self._make_request('GET', detail_url)

        if not response:
            self.logger.warning(f"获取商品详情失败: {product_id}")
            return None

        return self._parse_product_detail(response.text, product_id, detail_url)

    def _parse_product_detail(self, html: str, product_id: str, source_url: str) -> Optional[Dict[str, Any]]:
        """
        Parse a product detail page.

        :param html: page HTML
        :param product_id: product id (SKU)
        :param source_url: URL of the detail page
        :return: product detail dict including ``price`` / ``original_price``
                 from the price endpoint (``price`` stays 0 when that lookup
                 fails), or None when parsing fails
        """
        try:
            title_match = re.search(r'<title>([^<]+)</title>', html) or \
                re.search(r'<div[^>]*class="sku-name[^"]*"[^>]*>(.*?)</div>', html, re.DOTALL | re.IGNORECASE)
            title = ''
            if title_match:
                title = self._strip_tags(title_match.group(1))
                # Keep only the text before the first '-'.
                # NOTE(review): this also truncates product names that contain
                # a hyphen themselves -- confirm this is intended.
                title = title.split('-', 1)[0].strip()

            shop_match = re.search(r'<div[^>]*class="name[^"]*"[^>]*>.*?<a[^>]*>(.*?)</a>', html, re.DOTALL | re.IGNORECASE) or \
                re.search(r'shopName\s*:\s*"([^"]+)"', html)
            shop_name = self._strip_tags(shop_match.group(1)) if shop_match else ''

            detail = {
                'product_id': str(product_id),
                'name': title,
                'price': 0,
                'original_price': None,
                'url': source_url,
                'shop_name': shop_name,
                'is_wholesale': False,
                'currency': 'CNY'
            }

            # Fill the dict that is actually returned, so the fetched price
            # is not thrown away.
            self._fill_product_prices([detail])

            return detail

        except Exception as e:
            self.logger.error(f"解析商品详情失败: {e}")
            return None

    # ------------------------------------------------------------------
    # Runtime configuration
    # ------------------------------------------------------------------

    def set_cookie(self, cookie: str):
        """
        Set the Cookie sent with requests.

        :param cookie: Cookie string; an empty value removes the header
        """
        self.cookie = cookie
        if cookie:
            self.headers['Cookie'] = cookie
        else:
            self.headers.pop('Cookie', None)

    def set_proxy(self, proxy: str):
        """
        Set the proxy used for both http and https.

        :param proxy: proxy server address; an empty value clears the proxies
        """
        self.proxy = proxy
        if proxy:
            self.session.proxies = {
                'http': proxy,
                'https': proxy,
            }
        else:
            self.session.proxies = {}
|