import logging
import time
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Dict, List, Optional

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from config.settings import CRAWLER_CONFIG
from utils.helpers import retry, random_delay, clean_price


class BaseCrawler(ABC):
    """
    Base class for platform crawlers.

    Every platform-specific crawler should inherit from this class, set the
    ``platform`` class attribute (used as the key into ``CRAWLER_CONFIG``),
    and implement :meth:`search`, :meth:`get_product_detail` and
    :meth:`get_price`.

    Each instance owns a ``requests.Session``; use the crawler as a context
    manager or call :meth:`close` when finished to release it.
    """

    # Platform identifier; also the lookup key into CRAWLER_CONFIG.
    platform: str = ''

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        # Missing platform config falls back to the defaults below.
        self.config = CRAWLER_CONFIG.get(self.platform, {})
        self.headers = self.config.get('headers', {})
        self.timeout = self.config.get('timeout', 30)
        self.retry_times = self.config.get('retry_times', 3)
        self.retry_delay = self.config.get('retry_delay', 2)
        self.delay_range = self.config.get('delay_range', (1, 3))
        self.session = self._create_session()

    def _create_session(self) -> requests.Session:
        """
        Create a session with the configured headers and automatic retries.

        Retries are performed by urllib3 at the transport level for GET and
        POST on HTTP 429/500/502/503/504, up to ``retry_times`` attempts,
        with exponential backoff scaled by ``retry_delay``.

        :return: the configured ``requests.Session``
        """
        session = requests.Session()
        session.headers.update(self.headers)

        # NOTE(review): POST is not idempotent; retrying it may duplicate
        # side effects on the server. Kept as-is for compatibility.
        retry_strategy = Retry(
            total=self.retry_times,
            backoff_factor=self.retry_delay,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "POST"],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        return session

    def _make_request(self, method: str, url: str,
                      params: Optional[Dict] = None,
                      data: Optional[Dict] = None,
                      json: Optional[Dict] = None,
                      headers: Optional[Dict] = None) -> Optional[requests.Response]:
        """
        Send an HTTP request through the crawler's session.

        :param method: HTTP method name (GET, POST, PUT, ...); case-insensitive
        :param url: request URL
        :param params: query-string parameters
        :param data: form body
        :param json: JSON body
        :param headers: extra headers merged over the configured headers
        :return: the Response on a 2xx/3xx status, or None on any
            ``requests`` error (connection failure, timeout, exhausted
            retries, or an HTTP error status)
        """
        try:
            request_headers = self.headers.copy()
            if headers:
                request_headers.update(headers)

            self.logger.debug(f"发送 {method} 请求: {url}")

            # Dispatch on the actual method instead of treating every
            # non-GET method as POST.
            response = self.session.request(
                method.upper(),
                url,
                params=params,
                data=data,
                json=json,
                headers=request_headers,
                timeout=self.timeout,
            )
            response.raise_for_status()
            return response

        except requests.exceptions.RequestException as e:
            self.logger.error(f"请求失败: {url} - {e}")
            return None

    def _delay(self, min_seconds: Optional[float] = None,
               max_seconds: Optional[float] = None) -> float:
        """
        Sleep for a random interval (delegates to ``random_delay``).

        :param min_seconds: lower bound; defaults to ``delay_range[0]``
        :param max_seconds: upper bound; defaults to ``delay_range[1]``
        :return: whatever ``random_delay`` returns (presumably the actual
            delay in seconds)
        """
        min_sec = min_seconds if min_seconds is not None else self.delay_range[0]
        max_sec = max_seconds if max_seconds is not None else self.delay_range[1]
        return random_delay(min_sec, max_sec)

    @abstractmethod
    def search(self, keyword: str, **kwargs) -> List[Dict[str, Any]]:
        """
        Search for products (abstract; subclasses must implement).

        :param keyword: search keyword
        :param kwargs: platform-specific options
        :return: list of product dicts
        """
        pass

    @abstractmethod
    def get_product_detail(self, product_id: str, **kwargs) -> Optional[Dict[str, Any]]:
        """
        Fetch product details (abstract; subclasses must implement).

        :param product_id: product identifier
        :param kwargs: platform-specific options
        :return: product detail dict, or None
        """
        pass

    @abstractmethod
    def get_price(self, product_id: str, **kwargs) -> Optional[Dict[str, Any]]:
        """
        Fetch a product's price (abstract; subclasses must implement).

        :param product_id: product identifier
        :param kwargs: platform-specific options
        :return: price info dict, or None
        """
        pass

    def crawl_products(self, keywords: List[str], save_to_db: bool = True,
                       **kwargs) -> List[Dict[str, Any]]:
        """
        Search every keyword and collect the results.

        Each returned product dict is annotated in place with the ``keyword``
        that found it and a ``crawl_time`` string (``%Y-%m-%d %H:%M:%S``).
        A random delay follows each keyword's search.

        :param keywords: keywords to search
        :param save_to_db: whether to persist the collected products
        :param kwargs: forwarded to :meth:`search`
        :return: all collected products
        """
        all_products = []

        for keyword in keywords:
            self.logger.info(f"开始搜索关键词: {keyword}")
            # Tolerate subclasses that return None for "no results".
            products = self.search(keyword, **kwargs) or []
            self.logger.info(f"搜索 '{keyword}' 找到 {len(products)} 个商品")

            crawl_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            for product in products:
                product['keyword'] = keyword
                product['crawl_time'] = crawl_time
                all_products.append(product)

            self._delay()

        if save_to_db:
            self._save_products_to_db(all_products)

        return all_products

    def _save_products_to_db(self, products: List[Dict[str, Any]]) -> int:
        """
        Persist products (and their price history) to the database.

        A failure on one product is logged and does not prevent the rest
        from being saved. A product counts as saved once ``add_product``
        returns a truthy value and its price history (if any) was written.

        :param products: product dicts to save
        :return: number of products saved successfully
        """
        if not products:
            return 0

        try:
            # Imported lazily so crawling works without the DB layer.
            from utils.db_utils import DBUtils
            db = DBUtils()
        except Exception as e:
            self.logger.error(f"保存商品到数据库失败: {e}")
            return 0

        success_count = 0
        for product in products:
            try:
                product_data = self._prepare_product_data(product)
                db_product = db.add_product(product_data)
                if not db_product:
                    continue

                price_data = self._prepare_price_data(product)
                if price_data:
                    db.add_price_history(price_data)
                success_count += 1
            except Exception as e:
                self.logger.error(f"保存商品失败: {product.get('product_id', '')} - {e}")

        self.logger.info(f"成功保存 {success_count}/{len(products)} 个商品到数据库")
        return success_count

    @staticmethod
    def _to_price(value: Any) -> Any:
        """
        Normalize a raw price value with ``clean_price``.

        ``None`` is treated as ``0`` so the literal string ``'None'`` is never
        handed to the price parser.

        :param value: raw price (number, string, or None)
        :return: result of ``clean_price``
        """
        return clean_price(str(0 if value is None else value))

    def _prepare_product_data(self, product: Dict[str, Any]) -> Dict[str, Any]:
        """
        Convert a raw product dict into the database row format.

        :param product: raw product dict as produced by :meth:`search`
        :return: formatted product data
        """
        original_price = product.get('original_price')
        return {
            'product_id': str(product.get('product_id', '')),
            'name': product.get('name', ''),
            'url': product.get('url', ''),
            'image_url': product.get('image_url', ''),
            'shop_name': product.get('shop_name', ''),
            'platform': self.platform,
            'category': product.get('category', ''),
            'brand': product.get('brand', ''),
            'is_wholesale': 1 if product.get('is_wholesale', False) else 0,
            'min_order_quantity': product.get('min_order_quantity'),
            'unit': product.get('unit', ''),
            'current_price': self._to_price(product.get('price')),
            'original_price': self._to_price(original_price) if original_price else None,
            'currency': product.get('currency', 'CNY'),
            'sales_volume': product.get('sales_volume'),
            'rating': product.get('rating'),
            'review_count': product.get('review_count'),
            'stock': product.get('stock'),
            'description': product.get('description', ''),
            'crawl_time': datetime.now(),
        }

    def _prepare_price_data(self, product: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Build a price-history row for a product.

        :param product: raw product dict
        :return: price data dict, or None when no positive price is available
        """
        price = self._to_price(product.get('price'))
        # clean_price's contract is not visible here; treat None (e.g. an
        # unparseable value) the same as a non-positive price.
        if price is None or price <= 0:
            return None

        original_price = product.get('original_price')
        return {
            'product_id': str(product.get('product_id', '')),
            'price': price,
            'original_price': self._to_price(original_price) if original_price else None,
            'currency': product.get('currency', 'CNY'),
            'platform': self.platform,
            'source_url': product.get('url', ''),
            'crawl_time': datetime.now(),
            'price_type': 'wholesale' if product.get('is_wholesale', False) else 'retail',
            'min_quantity': product.get('min_order_quantity'),
            'max_quantity': product.get('max_order_quantity'),
        }

    def close(self) -> None:
        """
        Close the HTTP session.

        Safe to call on a partially constructed instance (no ``session`` yet).
        """
        if hasattr(self, 'session'):
            self.session.close()

    def __enter__(self) -> 'BaseCrawler':
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # Always release the session; exceptions are not suppressed.
        self.close()