base.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. import logging
  2. import time
  3. from abc import ABC, abstractmethod
  4. from typing import Dict, List, Optional, Any
  5. from datetime import datetime
  6. import requests
  7. from requests.adapters import HTTPAdapter
  8. from urllib3.util.retry import Retry
  9. from config.settings import CRAWLER_CONFIG
  10. from utils.helpers import retry, random_delay, clean_price
  11. class BaseCrawler(ABC):
  12. """
  13. 爬虫基类
  14. 所有平台爬虫都应该继承这个类
  15. """
  16. platform: str = ''
  17. def __init__(self):
  18. self.logger = logging.getLogger(__name__)
  19. self.config = CRAWLER_CONFIG.get(self.platform, {})
  20. self.headers = self.config.get('headers', {})
  21. self.timeout = self.config.get('timeout', 30)
  22. self.retry_times = self.config.get('retry_times', 3)
  23. self.retry_delay = self.config.get('retry_delay', 2)
  24. self.delay_range = self.config.get('delay_range', (1, 3))
  25. self.session = self._create_session()
  26. def _create_session(self) -> requests.Session:
  27. """
  28. 创建带有重试机制的会话
  29. """
  30. session = requests.Session()
  31. session.headers.update(self.headers)
  32. retry_strategy = Retry(
  33. total=self.retry_times,
  34. backoff_factor=self.retry_delay,
  35. status_forcelist=[429, 500, 502, 503, 504],
  36. allowed_methods=["GET", "POST"]
  37. )
  38. adapter = HTTPAdapter(max_retries=retry_strategy)
  39. session.mount("http://", adapter)
  40. session.mount("https://", adapter)
  41. return session
  42. def _make_request(self, method: str, url: str, params: Optional[Dict] = None,
  43. data: Optional[Dict] = None, json: Optional[Dict] = None,
  44. headers: Optional[Dict] = None) -> Optional[requests.Response]:
  45. """
  46. 发送HTTP请求
  47. :param method: HTTP方法 GET/POST
  48. :param url: 请求URL
  49. :param params: URL参数
  50. :param data: 表单数据
  51. :param json: JSON数据
  52. :param headers: 额外的请求头
  53. :return: Response对象或None
  54. """
  55. try:
  56. request_headers = self.headers.copy()
  57. if headers:
  58. request_headers.update(headers)
  59. self.logger.debug(f"发送 {method} 请求: {url}")
  60. if method.upper() == 'GET':
  61. response = self.session.get(
  62. url,
  63. params=params,
  64. headers=request_headers,
  65. timeout=self.timeout
  66. )
  67. else:
  68. response = self.session.post(
  69. url,
  70. params=params,
  71. data=data,
  72. json=json,
  73. headers=request_headers,
  74. timeout=self.timeout
  75. )
  76. response.raise_for_status()
  77. return response
  78. except requests.exceptions.RequestException as e:
  79. self.logger.error(f"请求失败: {url} - {e}")
  80. return None
  81. def _delay(self, min_seconds: float = None, max_seconds: float = None) -> float:
  82. """
  83. 随机延迟
  84. :param min_seconds: 最小延迟时间,默认使用配置中的值
  85. :param max_seconds: 最大延迟时间,默认使用配置中的值
  86. :return: 实际延迟时间
  87. """
  88. min_sec = min_seconds if min_seconds is not None else self.delay_range[0]
  89. max_sec = max_seconds if max_seconds is not None else self.delay_range[1]
  90. return random_delay(min_sec, max_sec)
  91. @abstractmethod
  92. def search(self, keyword: str, **kwargs) -> List[Dict[str, Any]]:
  93. """
  94. 搜索商品(抽象方法,子类必须实现)
  95. :param keyword: 搜索关键词
  96. :param kwargs: 其他参数
  97. :return: 商品列表
  98. """
  99. pass
  100. @abstractmethod
  101. def get_product_detail(self, product_id: str, **kwargs) -> Optional[Dict[str, Any]]:
  102. """
  103. 获取商品详情(抽象方法,子类必须实现)
  104. :param product_id: 商品ID
  105. :param kwargs: 其他参数
  106. :return: 商品详情
  107. """
  108. pass
  109. @abstractmethod
  110. def get_price(self, product_id: str, **kwargs) -> Optional[Dict[str, Any]]:
  111. """
  112. 获取商品价格(抽象方法,子类必须实现)
  113. :param product_id: 商品ID
  114. :param kwargs: 其他参数
  115. :return: 价格信息
  116. """
  117. pass
  118. def crawl_products(self, keywords: List[str], save_to_db: bool = True, **kwargs) -> List[Dict[str, Any]]:
  119. """
  120. 爬取多个关键词的商品
  121. :param keywords: 关键词列表
  122. :param save_to_db: 是否保存到数据库
  123. :param kwargs: 其他参数
  124. :return: 所有爬取的商品列表
  125. """
  126. all_products = []
  127. for keyword in keywords:
  128. self.logger.info(f"开始搜索关键词: {keyword}")
  129. products = self.search(keyword, **kwargs)
  130. self.logger.info(f"搜索 '{keyword}' 找到 {len(products)} 个商品")
  131. for product in products:
  132. product['keyword'] = keyword
  133. product['crawl_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  134. all_products.append(product)
  135. self._delay()
  136. if save_to_db:
  137. self._save_products_to_db(all_products)
  138. return all_products
  139. def _save_products_to_db(self, products: List[Dict[str, Any]]) -> int:
  140. """
  141. 保存商品到数据库
  142. :param products: 商品列表
  143. :return: 成功保存的数量
  144. """
  145. try:
  146. from utils.db_utils import DBUtils
  147. db = DBUtils()
  148. success_count = 0
  149. for product in products:
  150. product_data = self._prepare_product_data(product)
  151. db_product = db.add_product(product_data)
  152. if db_product:
  153. price_data = self._prepare_price_data(product)
  154. if price_data:
  155. db.add_price_history(price_data)
  156. success_count += 1
  157. self.logger.info(f"成功保存 {success_count}/{len(products)} 个商品到数据库")
  158. return success_count
  159. except Exception as e:
  160. self.logger.error(f"保存商品到数据库失败: {e}")
  161. return 0
  162. def _prepare_product_data(self, product: Dict[str, Any]) -> Dict[str, Any]:
  163. """
  164. 准备商品数据,转换为数据库格式
  165. :param product: 原始商品数据
  166. :return: 格式化后的商品数据
  167. """
  168. return {
  169. 'product_id': str(product.get('product_id', '')),
  170. 'name': product.get('name', ''),
  171. 'url': product.get('url', ''),
  172. 'image_url': product.get('image_url', ''),
  173. 'shop_name': product.get('shop_name', ''),
  174. 'platform': self.platform,
  175. 'category': product.get('category', ''),
  176. 'brand': product.get('brand', ''),
  177. 'is_wholesale': 1 if product.get('is_wholesale', False) else 0,
  178. 'min_order_quantity': product.get('min_order_quantity'),
  179. 'unit': product.get('unit', ''),
  180. 'current_price': clean_price(str(product.get('price', 0))),
  181. 'original_price': clean_price(str(product.get('original_price', 0))) if product.get('original_price') else None,
  182. 'currency': product.get('currency', 'CNY'),
  183. 'sales_volume': product.get('sales_volume'),
  184. 'rating': product.get('rating'),
  185. 'review_count': product.get('review_count'),
  186. 'stock': product.get('stock'),
  187. 'description': product.get('description', ''),
  188. 'crawl_time': datetime.now()
  189. }
  190. def _prepare_price_data(self, product: Dict[str, Any]) -> Optional[Dict[str, Any]]:
  191. """
  192. 准备价格历史数据
  193. :param product: 商品数据
  194. :return: 价格数据字典或None
  195. """
  196. price = clean_price(str(product.get('price', 0)))
  197. if price <= 0:
  198. return None
  199. return {
  200. 'product_id': str(product.get('product_id', '')),
  201. 'price': price,
  202. 'original_price': clean_price(str(product.get('original_price', 0))) if product.get('original_price') else None,
  203. 'currency': product.get('currency', 'CNY'),
  204. 'platform': self.platform,
  205. 'source_url': product.get('url', ''),
  206. 'crawl_time': datetime.now(),
  207. 'price_type': 'wholesale' if product.get('is_wholesale', False) else 'retail',
  208. 'min_quantity': product.get('min_order_quantity'),
  209. 'max_quantity': product.get('max_order_quantity')
  210. }
  211. def close(self):
  212. """
  213. 关闭会话
  214. """
  215. if hasattr(self, 'session'):
  216. self.session.close()
  217. def __enter__(self):
  218. return self
  219. def __exit__(self, exc_type, exc_val, exc_tb):
  220. self.close()