import re
import json
import logging
import random
import time
from typing import Dict, List, Optional, Any
from urllib.parse import urlencode, quote

from .base import BaseCrawler
from utils.helpers import clean_price, parse_sales_volume


class JdCrawler(BaseCrawler):
    """
    Crawler for JD.com (京东).

    NOTE: JD has strong anti-bot protection; a logged-in Cookie and/or a
    proxy may be required for reliable results.
    """

    platform = 'jd'

    def __init__(self, cookie: Optional[str] = None, proxy: Optional[str] = None):
        """
        Initialize the JD crawler.

        :param cookie: Cookie string captured from a logged-in JD session.
        :param proxy: proxy server URL, applied to both http and https traffic.
        """
        super().__init__()
        self.cookie = cookie
        self.proxy = proxy
        if cookie:
            self.headers['Cookie'] = cookie
        if proxy:
            self.session.proxies = {
                'http': proxy,
                'https': proxy
            }

    def search(self, keyword: str, page: int = 1,
               sort: str = 'sort_totalsales15_desc', **kwargs) -> List[Dict[str, Any]]:
        """
        Search JD products.

        :param keyword: search keyword
        :param page: 1-based page number
        :param sort: sort key:
            - sort_totalsales15_desc (sales volume)
            - sort_price_asc (price ascending)
            - sort_price_desc (price descending)
            - sort_discount_desc (discount)
        :return: list of parsed product dicts (possibly empty)
        """
        self.logger.info(f"搜索京东商品: keyword={keyword}, page={page}")
        params = {
            'keyword': keyword,
            'wq': keyword,
            'pvid': self._generate_pvid(),
            'page': page,
            's': (page - 1) * 30 + 1,  # result offset: 30 items per page
        }
        if sort and sort != 'default':
            params['psort'] = sort
        search_url = f"{self.config.get('search_url')}?{urlencode(params)}"
        self.logger.debug(f"搜索URL: {search_url}")

        response = self._make_request('GET', search_url)
        if not response:
            self.logger.warning(f"搜索请求失败: {keyword}")
            return []

        products = self._parse_search_result(response.text, search_url)
        if products:
            # Search HTML often omits prices (loaded asynchronously on the
            # real site); fetch them in bulk from the price API.
            self._fill_product_prices(products)
        self.logger.info(f"从搜索结果中解析到 {len(products)} 个商品")
        return products

    def _generate_pvid(self) -> str:
        """Generate a pseudo pvid: millisecond timestamp + 6 random digits."""
        timestamp = int(time.time() * 1000)
        random_num = random.randint(100000, 999999)
        return f"{timestamp}{random_num}"

    def _parse_search_result(self, html: str, source_url: str) -> List[Dict[str, Any]]:
        """
        Parse the JD search result page.

        Tries the server-rendered ``<li data-sku="...">`` items first; when
        none match, falls back to the embedded JSON blob.
        """
        products = []
        # NOTE(review): pattern reconstructed for JD's search-result markup
        # (<li ... data-sku="12345">...</li>) — verify against a live page.
        item_pattern = r'<li[^>]*data-sku="(\d+)"[^>]*>(.*?)</li>'
        items = re.findall(item_pattern, html, re.DOTALL | re.IGNORECASE)
        self.logger.debug(f"找到 {len(items)} 个商品项")

        for sku_id, item_html in items:
            try:
                product = self._parse_item_html(sku_id, item_html, source_url)
                if product:
                    products.append(product)
            except Exception as e:
                # One bad item must not abort the whole page.
                self.logger.error(f"解析商品失败: {e}")
                continue

        if not products:
            products = self._parse_from_json(html, source_url)
        return products

    def _parse_item_html(self, sku_id: str, item_html: str,
                         source_url: str) -> Optional[Dict[str, Any]]:
        """
        Parse a single search-result item's HTML fragment.

        Returns None when no title can be extracted (ad slots / placeholders).
        NOTE(review): the tag names in these patterns (<em>, <i>, <a>, <img>)
        were reconstructed from JD's typical markup — confirm on a live page.
        """
        try:
            # Title: <div class="p-name ..."> ... <em>title markup</em>
            title_match = re.search(
                r'<div[^>]*class="p-name[^"]*"[^>]*>.*?<em[^>]*>(.*?)</em>',
                item_html, re.DOTALL | re.IGNORECASE)
            title = ''
            if title_match:
                title = title_match.group(1)
                title = re.sub(r'<[^>]+>', '', title).strip()  # strip inner tags
                title = re.sub(r'\s+', ' ', title)
            if not title:
                return None

            # Price: may be empty when rendered asynchronously; 0 is filled
            # in later by _fill_product_prices.
            price_match = re.search(
                r'<div[^>]*class="p-price[^"]*"[^>]*>.*?<i[^>]*>([\d.]*)</i>',
                item_html, re.DOTALL | re.IGNORECASE)
            price = clean_price(price_match.group(1)) if price_match else 0

            shop_match = re.search(
                r'<div[^>]*class="p-shop[^"]*"[^>]*>.*?<a[^>]*>(.*?)</a>',
                item_html, re.DOTALL | re.IGNORECASE)
            shop_name = ''
            if shop_match:
                shop_name = re.sub(r'<[^>]+>', '', shop_match.group(1)).strip()

            # Review/sales count lives in the p-commit block.
            sales_match = re.search(
                r'<div[^>]*class="p-commit[^"]*"[^>]*>.*?<a[^>]*>([^<]+)',
                item_html, re.DOTALL | re.IGNORECASE)
            sales_str = sales_match.group(1) if sales_match else ''
            sales_volume = parse_sales_volume(sales_str)

            # Image: lazy-loaded attribute first, plain src as fallback.
            img_match = (
                re.search(
                    r'<div[^>]*class="p-img[^"]*"[^>]*>.*?<img[^>]*data-lazy-img="([^"]+)"',
                    item_html, re.DOTALL | re.IGNORECASE)
                or re.search(
                    r'<div[^>]*class="p-img[^"]*"[^>]*>.*?<img[^>]*src="([^"]+)"',
                    item_html, re.DOTALL | re.IGNORECASE)
            )
            img_url = img_match.group(1) if img_match else ''
            if img_url and not img_url.startswith('http'):
                img_url = 'https:' + img_url  # protocol-relative URL

            detail_url = f"https://item.jd.com/{sku_id}.html"

            # Explicit parentheses (precedence was implicit before):
            # self-operated if the shop name says so, or the icon strip
            # carries the 自营 badge.
            is_self = ('京东自营' in shop_name
                       or ('p-icons' in item_html and '自营' in item_html))

            return {
                'product_id': str(sku_id),
                'name': title,
                'price': price,
                'image_url': img_url,
                'url': detail_url,
                'shop_name': shop_name,
                'sales_volume': sales_volume,
                'is_jd_self': is_self,
                'source_url': source_url,
                'is_wholesale': False,
                'currency': 'CNY'
            }
        except Exception as e:
            self.logger.error(f"解析商品HTML失败: {e}")
            return None

    def _parse_from_json(self, html: str, source_url: str) -> List[Dict[str, Any]]:
        """Fallback: parse products from the page's embedded JSON state."""
        products = []
        json_pattern = r'window\.__SEARCH_RESULT__\s*=\s*({.*?});'
        json_match = re.search(json_pattern, html, re.DOTALL)
        if json_match:
            try:
                data = json.loads(json_match.group(1))
                items = data.get('wareList', {}).get('wareInfo', [])
                for item in items:
                    product = self._parse_json_item(item, source_url)
                    if product:
                        products.append(product)
            except json.JSONDecodeError as e:
                self.logger.error(f"解析JSON失败: {e}")
        return products

    def _parse_json_item(self, item: Dict, source_url: str) -> Optional[Dict[str, Any]]:
        """Parse one JSON product entry into a product dict."""
        try:
            # BUG FIX: 'wname' is the product *name*; the SKU id lives in
            # 'wareId'. Using wname produced invalid item.jd.com URLs.
            sku_id = item.get('wareId', '')
            if not sku_id:
                return None

            title = item.get('wname', '').strip()
            if not title:
                return None

            price = clean_price(str(item.get('price', '0')))
            original_price = clean_price(str(item.get('oprice', '0'))) or None

            img_url = item.get('imgurl', '')
            if img_url and not img_url.startswith('http'):
                img_url = 'https:' + img_url

            shop_name = (item.get('goodShop', {}).get('shopName', '')
                         if item.get('goodShop') else '')

            # 'reviews' is a review count — used here as a sales proxy.
            sales_volume = parse_sales_volume(item.get('reviews', ''))

            detail_url = f"https://item.jd.com/{sku_id}.html"
            return {
                'product_id': str(sku_id),
                'name': title,
                'price': price,
                'original_price': original_price,
                'image_url': img_url,
                'url': detail_url,
                'shop_name': shop_name,
                'sales_volume': sales_volume,
                'source_url': source_url,
                'is_wholesale': False,
                'currency': 'CNY'
            }
        except Exception as e:
            self.logger.error(f"解析JSON商品失败: {e}")
            return None

    def _fill_product_prices(self, products: List[Dict[str, Any]]) -> None:
        """
        Batch-fetch prices from JD's price API (p.3.cn) for products whose
        price is still missing/zero, filling them in place.
        """
        if not products:
            return
        sku_ids = [p.get('product_id') for p in products
                   if p.get('product_id') and p.get('price', 0) <= 0]
        if not sku_ids:
            return
        self.logger.debug(f"批量获取 {len(sku_ids)} 个商品的价格")

        sku_str = ','.join([f'J_{sku}' for sku in sku_ids])
        price_url = f"https://p.3.cn/prices/mgets?skuIds={sku_str}&type=1"
        response = self._make_request('GET', price_url, headers={
            'Referer': 'https://www.jd.com'
        })
        if not response:
            return
        try:
            # Response is a JSON list: [{"id": "J_<sku>", "p": "...", "op": "..."}]
            price_data = response.json()
            price_map = {}
            for item in price_data:
                sku = item.get('id', '').replace('J_', '')
                price = clean_price(item.get('p', '0'))
                original_price = clean_price(item.get('op', '0')) or None
                price_map[sku] = {'price': price, 'original_price': original_price}
            for product in products:
                sku = product.get('product_id')
                if sku in price_map:
                    if product.get('price', 0) <= 0:
                        product['price'] = price_map[sku]['price']
                    if not product.get('original_price'):
                        product['original_price'] = price_map[sku]['original_price']
        except json.JSONDecodeError as e:
            self.logger.error(f"解析价格数据失败: {e}")

    def get_product_detail(self, product_id: str, **kwargs) -> Optional[Dict[str, Any]]:
        """
        Fetch and parse a JD product detail page.

        :param product_id: product id (sku)
        :return: detail dict, or None on request/parse failure
        """
        self.logger.info(f"获取京东商品详情: product_id={product_id}")
        detail_url = f"https://item.jd.com/{product_id}.html"
        response = self._make_request('GET', detail_url)
        if not response:
            self.logger.warning(f"获取商品详情失败: {product_id}")
            return None
        return self._parse_product_detail(response.text, product_id, detail_url)

    def _parse_product_detail(self, html: str, product_id: str,
                              source_url: str) -> Optional[Dict[str, Any]]:
        """
        Parse a product detail page into a detail dict.

        NOTE(review): regex tag names reconstructed from JD's typical
        detail-page markup — confirm against a live page.
        """
        try:
            title_match = (re.search(r'<title>([^<]+)</title>', html)
                           or re.search(
                               r'<div[^>]*class="sku-name[^"]*"[^>]*>(.*?)</div>',
                               html, re.DOTALL | re.IGNORECASE))
            title = ''
            if title_match:
                title = re.sub(r'<[^>]+>', '', title_match.group(1)).strip()
                # <title> typically looks like "名称-京东"; keep the part
                # before the first dash.
                title = title.split('-')[0].strip() if '-' in title else title

            shop_match = (re.search(
                r'<div[^>]*class="name[^"]*"[^>]*>.*?<a[^>]*>(.*?)</a>',
                html, re.DOTALL | re.IGNORECASE)
                or re.search(r'shopName\s*:\s*"([^"]+)"', html))
            shop_name = shop_match.group(1).strip() if shop_match else ''

            # BUG FIX: the price fetched here was previously discarded —
            # keep the dict so the filled-in values reach the return value.
            price_info: Dict[str, Any] = {'product_id': product_id, 'price': 0}
            self._fill_product_prices([price_info])

            return {
                'product_id': str(product_id),
                'name': title,
                'price': price_info.get('price', 0),
                'original_price': price_info.get('original_price'),
                'url': source_url,
                'shop_name': shop_name,
                'is_wholesale': False,
                'currency': 'CNY'
            }
        except Exception as e:
            self.logger.error(f"解析商品详情失败: {e}")
            return None

    def get_price(self, product_id: str, **kwargs) -> Optional[Dict[str, Any]]:
        """
        Fetch the current price of one product from the price API.

        :param product_id: product id (sku)
        :return: price dict, or None on failure
        """
        self.logger.info(f"获取京东商品价格: product_id={product_id}")
        sku_str = f'J_{product_id}'
        price_url = f"https://p.3.cn/prices/mgets?skuIds={sku_str}&type=1"
        response = self._make_request('GET', price_url, headers={
            'Referer': 'https://www.jd.com'
        })
        if response:
            try:
                price_data = response.json()
                if price_data and len(price_data) > 0:
                    item = price_data[0]
                    price = clean_price(item.get('p', '0'))
                    original_price = clean_price(item.get('op', '0')) or None
                    return {
                        'product_id': str(product_id),
                        'price': price,
                        'original_price': original_price,
                        'currency': 'CNY',
                        'platform': self.platform,
                        'source_url': f"https://item.jd.com/{product_id}.html",
                        'price_type': 'retail'
                    }
            except json.JSONDecodeError as e:
                self.logger.error(f"解析价格数据失败: {e}")
        return None

    def set_cookie(self, cookie: str) -> None:
        """Set / replace the session Cookie."""
        self.cookie = cookie
        self.headers['Cookie'] = cookie

    def set_proxy(self, proxy: str) -> None:
        """Set / replace the proxy for both http and https."""
        self.proxy = proxy
        self.session.proxies = {
            'http': proxy,
            'https': proxy
        }