"""Crawler for the 1688 wholesale marketplace (tiered prices, minimum order quantity)."""

import json
import logging
import re
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlencode, quote, urlparse, parse_qs

from .base import BaseCrawler
from utils.helpers import clean_price, parse_sales_volume, extract_numbers

# Strips any markup left inside a captured text fragment.
_TAG_RE = re.compile(r'<[^>]+>')

# Page <title> element of a detail page.
_TITLE_TAG_RE = re.compile(r'<title[^>]*>([^<]+)</title>', re.IGNORECASE)

# Start-of-card marker in the search result HTML: the attribute carrying the
# offer id, up to the end of the tag it belongs to.
_OFFER_MARKER_RE = re.compile(r'data-offerid="(\d+)"[^>]*>', re.IGNORECASE)

# Patterns applied to a single offer fragment in the HTML fallback parser.
# "First text node" patterns skip any run of nested opening tags and capture
# the first text that follows the element carrying the class.
_HTML_TITLE_ATTR_RE = re.compile(r'title="([^"]+)"')
_HTML_TITLE_NODE_RE = re.compile(
    r'class="[^"]*title[^"]*"[^>]*>(?:\s*<[^>]+>)*\s*([^<]+)'
)
_HTML_PRICE_NODE_RE = re.compile(
    r'class="[^"]*price[^"]*"[^>]*>.*?>([\d¥.,]+)',
    re.DOTALL | re.IGNORECASE,
)
_HTML_PRICE_TEXT_RE = re.compile(r'¥([\d.]+)')
_HTML_IMG_LAZY_RE = re.compile(r'data-src="([^"]+)"')
_HTML_IMG_SRC_RE = re.compile(r'src="([^"]+)"')
_HTML_SHOP_RE = re.compile(
    r'class="[^"]*company[^"]*"[^>]*>(?:\s*<[^>]+>)*\s*([^<]+)',
    re.IGNORECASE,
)
_HTML_SALES_NODE_RE = re.compile(
    r'class="[^"]*sales[^"]*"[^>]*>([^<]+)', re.IGNORECASE
)
# Both the ASCII and the fullwidth colon are accepted after the label.
_HTML_SALES_TEXT_RE = re.compile(r'成交量[::]\s*([\d万+]+)')
_HTML_MOQ_TEXT_RE = re.compile(r'起订量?[::]\s*([\d]+)')
_HTML_MOQ_NODE_RE = re.compile(
    r'class="[^"]*moq[^"]*"[^>]*>([^<]+)', re.IGNORECASE
)

# Last-resort price lookups on a detail page.
_DETAIL_PRICE_KEY_RE = re.compile(
    r'["\']price["\']\s*:\s*["\']?([\d.]+)["\']?'
)


class Alibaba1688Crawler(BaseCrawler):
    """
    Crawler for the 1688 wholesale platform.

    Focuses on wholesale-specific data: tiered prices and minimum order
    quantity.
    """

    platform = 'alibaba1688'

    def __init__(self, cookie: Optional[str] = None, proxy: Optional[str] = None):
        """
        Initialise the crawler.

        :param cookie: Cookie string of a logged-in 1688 session.
        :param proxy: Proxy server address, used for both http and https.
        """
        super().__init__()

        self.cookie = cookie
        self.proxy = proxy

        if cookie:
            self.headers['Cookie'] = cookie

        if proxy:
            self.session.proxies = {
                'http': proxy,
                'https': proxy
            }

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _absolute_url(url: str) -> str:
        """Prefix ``https:`` to a non-empty URL that does not start with ``http``."""
        if url and not url.startswith('http'):
            return 'https:' + url
        return url

    @staticmethod
    def _extract_js_object(html: str, variable: str) -> Optional[Dict[str, Any]]:
        """
        Decode the JSON object assigned to ``window.<variable>`` in *html*.

        The object is decoded from its opening brace with
        ``JSONDecoder.raw_decode``, which stops at the end of the first
        complete JSON value. A lazy ``{.*?};`` regex would instead cut the
        payload at the first ``};``, even when that sequence sits inside a
        string value.

        :return: The decoded object, or None when no such assignment exists.
        :raises json.JSONDecodeError: when the assigned text is not valid JSON.
        """
        anchor = re.search(
            r'window\.' + re.escape(variable) + r'\s*=\s*(?=\{)', html
        )
        if anchor is None:
            return None
        decoded, _ = json.JSONDecoder().raw_decode(html[anchor.end():])
        return decoded

    @staticmethod
    def _collect_price_tiers(raw_tiers: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[Any]]:
        """
        Normalise a list of raw price tiers.

        :return: ``(tiers, prices)`` where *tiers* holds dicts with the keys
                 ``min_quantity`` / ``max_quantity`` / ``price`` and *prices*
                 is the list of cleaned prices in the same order.
        """
        tiers = []
        prices = []
        for tier in raw_tiers:
            price = clean_price(str(tier.get('price', '0')))
            prices.append(price)
            tiers.append({
                'min_quantity': tier.get('quantity', 0) or tier.get('startQuantity', 0),
                'max_quantity': tier.get('endQuantity'),
                'price': price
            })
        return tiers, prices

    # ------------------------------------------------------------------
    # Search
    # ------------------------------------------------------------------

    def search(self, keyword: str, page: int = 1,
               sort: str = 'default',
               price_range: Optional[Tuple[Optional[float], Optional[float]]] = None,
               **kwargs) -> List[Dict[str, Any]]:
        """
        Search 1688 for products.

        :param keyword: Search keyword.
        :param page: Page number, starting at 1.
        :param sort: Sort order:
            - default (relevance)
            - va (sales volume)
            - price_asc (price ascending)
            - price_desc (price descending)
            - bookTime (most recently published)
        :param price_range: ``(min_price, max_price)``; a falsy bound is not
                            sent to the server.
        :return: List of product dicts; empty when the request fails.
        """
        self.logger.info(f"搜索1688商品: keyword={keyword}, page={page}")

        params = {
            'keywords': keyword,
            'pageSize': 40,
            'beginPage': page,
        }

        if sort and sort != 'default':
            params['sortType'] = sort

        if price_range:
            min_price, max_price = price_range
            if min_price:
                params['filtPriceMin'] = min_price
            if max_price:
                params['filtPriceMax'] = max_price

        search_url = f"{self.config.get('search_url')}?{urlencode(params)}"
        self.logger.debug(f"搜索URL: {search_url}")

        response = self._make_request('GET', search_url)

        if not response:
            self.logger.warning(f"搜索请求失败: {keyword}")
            return []

        products = self._parse_search_result(response.text, search_url)
        self.logger.info(f"从搜索结果中解析到 {len(products)} 个商品")

        return products

    def _parse_search_result(self, html: str, source_url: str) -> List[Dict[str, Any]]:
        """
        Parse a search result page.

        Reads the ``window.pageData`` JSON payload first and falls back to
        scraping the HTML when that yields no product.
        """
        products = []

        try:
            page_data = self._extract_js_object(html, 'pageData')
        except json.JSONDecodeError as e:
            self.logger.error(f"解析pageData失败: {e}")
            page_data = None

        if isinstance(page_data, dict):
            # ``"data": null`` must not break the lookup chain.
            data_section = page_data.get('data') or {}
            items = []
            if isinstance(data_section, dict):
                items = data_section.get('searchList') or data_section.get('offerList') or []
            items = items or page_data.get('offerList') or []

            for item in items:
                product = self._parse_json_item(item, source_url)
                if product:
                    products.append(product)

        if not products:
            products = self._parse_html_directly(html, source_url)

        return products

    def _parse_json_item(self, item: Dict, source_url: str) -> Optional[Dict[str, Any]]:
        """
        Convert one JSON offer entry into a product dict.

        :return: The product dict, or None when the entry has no id or title
                 or cannot be parsed.
        """
        try:
            offer_id = item.get('offerId', '') or item.get('id', '')
            if not offer_id:
                return None

            # ``or ''`` keeps an explicit null title from raising on strip().
            title = (item.get('subject') or item.get('title') or '').strip()
            if not title:
                return None

            price_info = self._extract_price_info(item)

            image_url = self._absolute_url(
                item.get('imgUrl', '') or item.get('imageUrl', '')
            )
            detail_url = self._absolute_url(
                item.get('detailUrl', '') or f"https://detail.1688.com/offer/{offer_id}.html"
            )

            shop_name = item.get('companyName', '') or item.get('shopName', '')

            # Pick the first truthy counter; stringify only afterwards so an
            # explicit null never becomes the truthy string 'None'.
            sales_raw = (item.get('soldQuantity') or item.get('salesNum')
                         or item.get('bookedCount'))
            sales_volume = parse_sales_volume(str(sales_raw)) if sales_raw else None

            min_order = item.get('minOrder', '') or item.get('startQuantity', '')
            min_order_quantity = self._parse_quantity(min_order)

            unit = item.get('unit') or item.get('quantityUnit') or '件'

            return {
                'product_id': str(offer_id),
                'name': title,
                'price': price_info.get('min_price', 0),
                'original_price': price_info.get('original_price'),
                'price_ranges': price_info.get('price_ranges', []),
                'image_url': image_url,
                'url': detail_url,
                'shop_name': shop_name,
                'sales_volume': sales_volume,
                'min_order_quantity': min_order_quantity,
                'unit': unit,
                'source_url': source_url,
                'is_wholesale': True,
                'currency': 'CNY'
            }

        except Exception as e:
            self.logger.error(f"解析JSON商品失败: {e}")
            return None

    def _extract_price_info(self, item: Dict) -> Dict[str, Any]:
        """
        Extract price data from a JSON offer entry.

        Prices are usually tiered by order quantity. When tiers are present
        the min/max are taken over them; otherwise the single ``price`` /
        ``displayPrice`` value is used for both.

        :return: Dict with ``min_price``, ``max_price``, ``original_price``
                 and ``price_ranges``.
        """
        price_info = {
            'min_price': 0,
            'max_price': 0,
            'original_price': None,
            'price_ranges': []
        }

        raw_tiers = item.get('priceRanges', []) or item.get('priceRangeList', [])

        if raw_tiers:
            tiers, prices = self._collect_price_tiers(raw_tiers)
            price_info['price_ranges'] = tiers
            if prices:
                price_info['min_price'] = min(prices)
                price_info['max_price'] = max(prices)
        else:
            price_str = item.get('price', '') or item.get('displayPrice', '')
            if price_str:
                price = clean_price(str(price_str))
                price_info['min_price'] = price
                price_info['max_price'] = price

        original_price_str = item.get('originalPrice', '') or item.get('marketPrice', '')
        if original_price_str:
            price_info['original_price'] = clean_price(str(original_price_str))

        return price_info

    def _parse_quantity(self, quantity_str: Any) -> Optional[int]:
        """
        Parse a quantity value into an int.

        :param quantity_str: Raw value (any type); falsy values yield None.
        :return: The first number found in the text as an int, or None when
                 there is none or it cannot be converted.
        """
        if not quantity_str:
            return None

        numbers = extract_numbers(str(quantity_str).strip())
        if not numbers:
            return None

        first = numbers[0]
        try:
            return int(first)
        except (TypeError, ValueError):
            # Tolerate decimal-looking tokens such as '2.0'.
            try:
                return int(float(first))
            except (TypeError, ValueError):
                return None

    # ------------------------------------------------------------------
    # HTML fallback for search results
    # ------------------------------------------------------------------

    @staticmethod
    def _split_offer_fragments(html: str) -> List[Tuple[str, str]]:
        """
        Cut *html* into ``(offer_id, fragment)`` pairs.

        Each fragment runs from the end of the tag carrying a
        ``data-offerid`` attribute up to the tag that carries the next
        *different* offer id (or the end of the document). Consecutive
        markers repeating the same id are folded into one fragment.
        """
        boundaries = []  # (offer_id, marker_start, fragment_start)
        for marker in _OFFER_MARKER_RE.finditer(html):
            offer_id = marker.group(1)
            if boundaries and boundaries[-1][0] == offer_id:
                continue
            boundaries.append((offer_id, marker.start(), marker.end()))

        fragments = []
        for index, (offer_id, _, fragment_start) in enumerate(boundaries):
            if index + 1 < len(boundaries):
                next_marker_start = boundaries[index + 1][1]
                # Back up to the '<' opening the next card's tag so its
                # attributes do not leak into this fragment.
                tag_open = html.rfind('<', fragment_start, next_marker_start)
                fragment_end = tag_open if tag_open != -1 else next_marker_start
            else:
                fragment_end = len(html)
            fragments.append((offer_id, html[fragment_start:fragment_end]))
        return fragments

    def _parse_html_offer(self, offer_id: str, offer_html: str,
                          source_url: str) -> Optional[Dict[str, Any]]:
        """
        Build a product dict from one offer fragment.

        :return: The product dict, or None when no title can be found.
        """
        title_match = (_HTML_TITLE_ATTR_RE.search(offer_html)
                       or _HTML_TITLE_NODE_RE.search(offer_html))
        title = ''
        if title_match:
            title = _TAG_RE.sub('', title_match.group(1)).strip()
        if not title:
            return None

        price_match = (_HTML_PRICE_NODE_RE.search(offer_html)
                       or _HTML_PRICE_TEXT_RE.search(offer_html))
        price = clean_price(price_match.group(1)) if price_match else 0

        # Prefer the lazy-load attribute; plain src may be a placeholder.
        img_match = (_HTML_IMG_LAZY_RE.search(offer_html)
                     or _HTML_IMG_SRC_RE.search(offer_html))
        img_url = self._absolute_url(img_match.group(1) if img_match else '')

        shop_match = _HTML_SHOP_RE.search(offer_html)
        shop_name = shop_match.group(1).strip() if shop_match else ''
        shop_name = _TAG_RE.sub('', shop_name)

        sales_match = (_HTML_SALES_NODE_RE.search(offer_html)
                       or _HTML_SALES_TEXT_RE.search(offer_html))
        sales_str = sales_match.group(1) if sales_match else ''
        sales_volume = parse_sales_volume(sales_str)

        min_order_match = (_HTML_MOQ_TEXT_RE.search(offer_html)
                           or _HTML_MOQ_NODE_RE.search(offer_html))
        min_order_quantity = (
            self._parse_quantity(min_order_match.group(1))
            if min_order_match else None
        )

        return {
            'product_id': str(offer_id),
            'name': title,
            'price': price,
            # The three keys below mirror the JSON-derived product schema.
            'original_price': None,
            'price_ranges': [],
            'image_url': img_url,
            'url': f"https://detail.1688.com/offer/{offer_id}.html",
            'shop_name': shop_name,
            'sales_volume': sales_volume,
            'min_order_quantity': min_order_quantity,
            'unit': '件',
            'source_url': source_url,
            'is_wholesale': True,
            'currency': 'CNY'
        }

    def _parse_html_directly(self, html: str, source_url: str) -> List[Dict[str, Any]]:
        """
        Scrape products straight from the search result HTML (fallback).

        A fragment that fails to parse is logged and skipped. Each offer id
        is emitted at most once.
        """
        products = []
        seen_ids = set()

        for offer_id, offer_html in self._split_offer_fragments(html):
            if offer_id in seen_ids:
                continue
            try:
                product = self._parse_html_offer(offer_id, offer_html, source_url)
            except Exception as e:
                self.logger.error(f"解析HTML商品失败: {e}")
                continue
            if product:
                seen_ids.add(offer_id)
                products.append(product)

        return products

    # ------------------------------------------------------------------
    # Product detail
    # ------------------------------------------------------------------

    def get_product_detail(self, product_id: str, **kwargs) -> Optional[Dict[str, Any]]:
        """
        Fetch and parse the detail page of an offer.

        :param product_id: Offer id (offerId).
        :return: Detail dict, or None when the request or parsing fails.
        """
        self.logger.info(f"获取1688商品详情: product_id={product_id}")

        detail_url = f"https://detail.1688.com/offer/{product_id}.html"

        response = self._make_request('GET', detail_url)

        if not response:
            self.logger.warning(f"获取商品详情失败: {product_id}")
            return None

        return self._parse_product_detail(response.text, product_id, detail_url)

    def _parse_product_detail(self, html: str, product_id: str, source_url: str) -> Optional[Dict[str, Any]]:
        """
        Parse a detail page.

        The title comes from the ``<title>`` element (text before the first
        '-'), everything else from the ``window.iDetailData`` JSON payload.
        When no positive price is found there, the raw HTML is searched for a
        ``"price": ...`` pair or a ``¥`` amount.

        :return: Detail dict, or None when parsing raises.
        """
        try:
            title_match = _TITLE_TAG_RE.search(html)
            title = title_match.group(1).split('-')[0].strip() if title_match else ''

            price_info = {'min_price': 0, 'price_ranges': []}
            shop_name = ''
            min_order_quantity = None
            unit = '件'
            sales_volume = None

            try:
                detail_data = self._extract_js_object(html, 'iDetailData')
            except json.JSONDecodeError as e:
                self.logger.error(f"解析商品详情JSON失败: {e}")
                detail_data = None

            if isinstance(detail_data, dict):
                if not title:
                    title = detail_data.get('subject', '') or detail_data.get('title', '')

                price_module = detail_data.get('price') or {}
                if isinstance(price_module, dict):
                    raw_tiers = price_module.get('priceRanges', [])
                    if raw_tiers:
                        tiers, prices = self._collect_price_tiers(raw_tiers)
                        price_info['price_ranges'] = tiers
                        if prices:
                            price_info['min_price'] = min(prices)
                    else:
                        price_str = price_module.get('showPrice', '') or price_module.get('price', '')
                        if price_str:
                            price_info['min_price'] = clean_price(str(price_str))
                else:
                    # A bare scalar under 'price' is used as the price itself.
                    price_info['min_price'] = clean_price(str(price_module))

                shop_name = detail_data.get('companyName', '') or detail_data.get('shopName', '')

                # ``or {}`` tolerates explicit nulls for these sections.
                trade = detail_data.get('trade') or {}
                sales_volume = trade.get('soldQuantity') or trade.get('totalSoldQuantity')

                moq = detail_data.get('moq') or {}
                min_order_quantity = moq.get('minOrderQuantity')
                unit = moq.get('unit') or '件'

            if price_info['min_price'] <= 0:
                price_match = (_DETAIL_PRICE_KEY_RE.search(html)
                               or _HTML_PRICE_TEXT_RE.search(html))
                if price_match:
                    price_info['min_price'] = clean_price(price_match.group(1))

            return {
                'product_id': str(product_id),
                'name': title,
                'price': price_info['min_price'],
                'price_ranges': price_info.get('price_ranges', []),
                'url': source_url,
                'shop_name': shop_name,
                'sales_volume': sales_volume,
                'min_order_quantity': min_order_quantity,
                'unit': unit,
                'is_wholesale': True,
                'currency': 'CNY'
            }

        except Exception as e:
            self.logger.error(f"解析商品详情失败: {e}")
            return None

    def get_price(self, product_id: str, **kwargs) -> Optional[Dict[str, Any]]:
        """
        Fetch the price data of an offer.

        :param product_id: Offer id.
        :return: Price dict, or None when the detail cannot be obtained.
        """
        detail = self.get_product_detail(product_id, **kwargs)
        if not detail:
            return None

        return {
            'product_id': str(product_id),
            'price': detail.get('price', 0),
            # The detail dict built by _parse_product_detail has no
            # 'original_price' key, so this is None for such details.
            'original_price': detail.get('original_price'),
            'currency': 'CNY',
            'platform': self.platform,
            'source_url': detail.get('url', ''),
            'price_type': 'wholesale',
            'price_ranges': detail.get('price_ranges', []),
            'min_quantity': detail.get('min_order_quantity')
        }

    def get_wholesale_prices(self, product_id: str, **kwargs) -> List[Dict[str, Any]]:
        """
        Fetch the tiered wholesale prices of an offer.

        :param product_id: Offer id.
        :return: List of tier dicts (``min_quantity`` / ``max_quantity`` /
                 ``price``). When the offer has no tiers, a single open-ended
                 tier is synthesised from its price and minimum order
                 quantity. Empty when the detail cannot be obtained.
        """
        detail = self.get_product_detail(product_id, **kwargs)
        if not detail:
            return []

        price_ranges = detail.get('price_ranges', [])
        if price_ranges:
            return price_ranges

        # The key is always present (possibly None), so a .get() default
        # would never apply; fall back explicitly.
        min_qty = detail.get('min_order_quantity') or 1
        return [{
            'min_quantity': min_qty,
            'max_quantity': None,
            'price': detail.get('price', 0)
        }]

    def set_cookie(self, cookie: str):
        """
        Store *cookie* and send it as the ``Cookie`` request header.
        """
        self.cookie = cookie
        self.headers['Cookie'] = cookie

    def set_proxy(self, proxy: str):
        """
        Store *proxy* and route both http and https traffic through it.
        """
        self.proxy = proxy
        self.session.proxies = {
            'http': proxy,
            'https': proxy
        }