| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259 |
- import logging
- import time
- from abc import ABC, abstractmethod
- from typing import Dict, List, Optional, Any
- from datetime import datetime
- import requests
- from requests.adapters import HTTPAdapter
- from urllib3.util.retry import Retry
- from config.settings import CRAWLER_CONFIG
- from utils.helpers import retry, random_delay, clean_price
class BaseCrawler(ABC):
    """
    Base class for all platform crawlers.

    Subclasses must set :attr:`platform` and implement the abstract
    methods :meth:`search`, :meth:`get_product_detail` and
    :meth:`get_price`. The base class provides a retrying HTTP session,
    polite random delays between requests, and helpers that normalize
    crawled products and persist them to the database.

    Instances can be used as context managers; the underlying HTTP
    session is closed on exit.
    """

    # Key used to look up this crawler's settings in CRAWLER_CONFIG.
    platform: str = ''

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        # Per-platform configuration; every setting falls back to a
        # default when the platform key is missing from CRAWLER_CONFIG.
        self.config = CRAWLER_CONFIG.get(self.platform, {})
        self.headers = self.config.get('headers', {})
        self.timeout = self.config.get('timeout', 30)
        self.retry_times = self.config.get('retry_times', 3)
        self.retry_delay = self.config.get('retry_delay', 2)
        # (min, max) seconds slept between keyword crawls; see _delay().
        self.delay_range = self.config.get('delay_range', (1, 3))

        self.session = self._create_session()

    def _create_session(self) -> requests.Session:
        """
        Create a :class:`requests.Session` with automatic retries.

        Retries apply to GET/POST on transient HTTP errors (429 and
        5xx) with exponential backoff driven by ``retry_delay``.

        :return: a configured session with the default headers applied
        """
        session = requests.Session()
        session.headers.update(self.headers)

        retry_strategy = Retry(
            total=self.retry_times,
            backoff_factor=self.retry_delay,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "POST"]
        )

        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)

        return session

    def _make_request(self, method: str, url: str, params: Optional[Dict] = None,
                      data: Optional[Dict] = None, json: Optional[Dict] = None,
                      headers: Optional[Dict] = None) -> Optional[requests.Response]:
        """
        Send an HTTP request through the retrying session.

        :param method: HTTP method; GET is special-cased, anything else
                       is sent as POST
        :param url: request URL
        :param params: query-string parameters
        :param data: form payload (POST only)
        :param json: JSON payload (POST only)
        :param headers: extra headers merged over the configured defaults
        :return: the Response on success (2xx), or None on any failure
        """
        try:
            # Merge per-call headers over the configured defaults
            # without mutating self.headers.
            request_headers = self.headers.copy()
            if headers:
                request_headers.update(headers)

            self.logger.debug("发送 %s 请求: %s", method, url)

            if method.upper() == 'GET':
                response = self.session.get(
                    url,
                    params=params,
                    headers=request_headers,
                    timeout=self.timeout
                )
            else:
                response = self.session.post(
                    url,
                    params=params,
                    data=data,
                    json=json,
                    headers=request_headers,
                    timeout=self.timeout
                )

            # Treat non-2xx as failure so callers only see good responses.
            response.raise_for_status()
            return response

        except requests.exceptions.RequestException as e:
            self.logger.error("请求失败: %s - %s", url, e)
            return None

    def _delay(self, min_seconds: Optional[float] = None,
               max_seconds: Optional[float] = None) -> float:
        """
        Sleep for a random interval to avoid hammering the target site.

        :param min_seconds: minimum delay; defaults to the configured
                            ``delay_range`` lower bound
        :param max_seconds: maximum delay; defaults to the configured
                            ``delay_range`` upper bound
        :return: the number of seconds actually slept
        """
        min_sec = min_seconds if min_seconds is not None else self.delay_range[0]
        max_sec = max_seconds if max_seconds is not None else self.delay_range[1]
        return random_delay(min_sec, max_sec)

    @abstractmethod
    def search(self, keyword: str, **kwargs) -> List[Dict[str, Any]]:
        """
        Search products by keyword (must be implemented by subclasses).

        :param keyword: search keyword
        :param kwargs: platform-specific options
        :return: list of product dicts
        """
        pass

    @abstractmethod
    def get_product_detail(self, product_id: str, **kwargs) -> Optional[Dict[str, Any]]:
        """
        Fetch full details for one product (must be implemented by subclasses).

        :param product_id: platform product identifier
        :param kwargs: platform-specific options
        :return: product detail dict, or None if unavailable
        """
        pass

    @abstractmethod
    def get_price(self, product_id: str, **kwargs) -> Optional[Dict[str, Any]]:
        """
        Fetch current price info for one product (must be implemented by subclasses).

        :param product_id: platform product identifier
        :param kwargs: platform-specific options
        :return: price info dict, or None if unavailable
        """
        pass

    def crawl_products(self, keywords: List[str], save_to_db: bool = True,
                       **kwargs) -> List[Dict[str, Any]]:
        """
        Crawl products for a list of keywords.

        Each product is tagged with the keyword that found it and the
        crawl timestamp. A random delay is inserted between keywords.

        :param keywords: keywords to search for
        :param save_to_db: persist results via _save_products_to_db
        :param kwargs: forwarded to :meth:`search`
        :return: all crawled products across every keyword
        """
        all_products = []

        for keyword in keywords:
            self.logger.info("开始搜索关键词: %s", keyword)

            products = self.search(keyword, **kwargs)
            self.logger.info("搜索 '%s' 找到 %d 个商品", keyword, len(products))

            for product in products:
                product['keyword'] = keyword
                product['crawl_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                all_products.append(product)

            # Be polite between keyword searches.
            self._delay()

        if save_to_db:
            self._save_products_to_db(all_products)

        return all_products

    def _save_products_to_db(self, products: List[Dict[str, Any]]) -> int:
        """
        Persist products (and their price history) to the database.

        Failures are isolated per product so one malformed record does
        not abort the whole batch or discard already-saved rows.

        :param products: product dicts as produced by crawl_products
        :return: number of products successfully saved
        """
        try:
            # Imported lazily so crawler modules can be used without a
            # database configured.
            from utils.db_utils import DBUtils

            db = DBUtils()
        except Exception as e:
            self.logger.error("保存商品到数据库失败: %s", e)
            return 0

        success_count = 0

        for product in products:
            try:
                product_data = self._prepare_product_data(product)
                db_product = db.add_product(product_data)

                if db_product:
                    # Price history is optional: only recorded when a
                    # positive price could be extracted.
                    price_data = self._prepare_price_data(product)
                    if price_data:
                        db.add_price_history(price_data)
                    success_count += 1
            except Exception as e:
                self.logger.error(
                    "Failed to save product %s: %s",
                    product.get('product_id'), e
                )

        self.logger.info("成功保存 %d/%d 个商品到数据库", success_count, len(products))
        return success_count

    def _prepare_product_data(self, product: Dict[str, Any]) -> Dict[str, Any]:
        """
        Convert a raw crawled product into the database row format.

        :param product: raw product dict from a platform crawler
        :return: normalized dict keyed by the products-table columns
        """
        return {
            'product_id': str(product.get('product_id', '')),
            'name': product.get('name', ''),
            'url': product.get('url', ''),
            'image_url': product.get('image_url', ''),
            'shop_name': product.get('shop_name', ''),
            'platform': self.platform,
            'category': product.get('category', ''),
            'brand': product.get('brand', ''),
            # Stored as 0/1 flags rather than booleans.
            'is_wholesale': 1 if product.get('is_wholesale', False) else 0,
            'min_order_quantity': product.get('min_order_quantity'),
            'unit': product.get('unit', ''),
            'current_price': clean_price(str(product.get('price', 0))),
            'original_price': clean_price(str(product.get('original_price', 0))) if product.get('original_price') else None,
            'currency': product.get('currency', 'CNY'),
            'sales_volume': product.get('sales_volume'),
            'rating': product.get('rating'),
            'review_count': product.get('review_count'),
            'stock': product.get('stock'),
            'description': product.get('description', ''),
            'crawl_time': datetime.now()
        }

    def _prepare_price_data(self, product: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Build a price-history row for a product.

        :param product: raw product dict from a platform crawler
        :return: price-history dict, or None when no positive price
                 could be extracted
        """
        price = clean_price(str(product.get('price', 0)))
        if price <= 0:
            # No usable price — skip recording history for this product.
            return None

        return {
            'product_id': str(product.get('product_id', '')),
            'price': price,
            'original_price': clean_price(str(product.get('original_price', 0))) if product.get('original_price') else None,
            'currency': product.get('currency', 'CNY'),
            'platform': self.platform,
            'source_url': product.get('url', ''),
            'crawl_time': datetime.now(),
            'price_type': 'wholesale' if product.get('is_wholesale', False) else 'retail',
            'min_quantity': product.get('min_order_quantity'),
            'max_quantity': product.get('max_order_quantity')
        }

    def close(self):
        """
        Close the underlying HTTP session, releasing its connections.
        """
        # Guard against partially-constructed instances (e.g. __init__
        # raised before the session was created).
        if hasattr(self, 'session'):
            self.session.close()

    def __enter__(self):
        """Enter the context manager; returns self."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit the context manager, closing the session."""
        self.close()
|