taobao.py

import re
import json
import logging
from typing import Dict, List, Optional, Any
from urllib.parse import urlencode, urlparse, parse_qs
from .base import BaseCrawler
from utils.helpers import clean_price, parse_sales_volume


class TaobaoCrawler(BaseCrawler):
    """
    Taobao platform crawler.
    Note: Taobao has strong anti-scraping measures; a login Cookie or a proxy is usually required.
    """
    platform = 'taobao'

    def __init__(self, cookie: Optional[str] = None, proxy: Optional[str] = None):
        """
        Initialize the Taobao crawler.
        :param cookie: Cookie string captured after logging in to Taobao
        :param proxy: proxy server address, e.g. 'http://127.0.0.1:7890'
        """
        super().__init__()
        self.cookie = cookie
        self.proxy = proxy
        if cookie:
            self.headers['Cookie'] = cookie
        if proxy:
            self.session.proxies = {
                'http': proxy,
                'https': proxy
            }

    def search(self, keyword: str, page: int = 1, sort: str = 'default', **kwargs) -> List[Dict[str, Any]]:
        """
        Search Taobao products.
        :param keyword: search keyword
        :param page: page number, starting from 1
        :param sort: sort order: default (relevance), sale-desc (sales), price-asc (price ascending), price-desc (price descending)
        :return: list of products
        """
        self.logger.info(f"Searching Taobao: keyword={keyword}, page={page}")
        params = {
            'q': keyword,
            's': (page - 1) * 44,  # Taobao search results are paginated 44 items per page
            'sort': sort
        }
        search_url = f"{self.config.get('search_url')}?{urlencode(params)}"
        self.logger.debug(f"Search URL: {search_url}")
        response = self._make_request('GET', search_url)
        if not response:
            self.logger.warning(f"Search request failed: {keyword}")
            return []
        products = self._parse_search_result(response.text, search_url)
        self.logger.info(f"Parsed {len(products)} products from the search results")
        return products

    def _parse_search_result(self, html: str, source_url: str) -> List[Dict[str, Any]]:
        """
        Parse a Taobao search results page.
        Note: Taobao search results are mostly rendered by JavaScript, so the data is
        extracted from the g_page_config JSON embedded in the page.
        """
        products = []
        g_page_config_match = re.search(r'g_page_config\s*=\s*({.*?});', html, re.DOTALL)
        if g_page_config_match:
            try:
                g_page_config = json.loads(g_page_config_match.group(1))
                auctions = g_page_config.get('mods', {}).get('itemlist', {}).get('data', {}).get('auctions', [])
                for auction in auctions:
                    product = self._parse_auction_item(auction, source_url)
                    if product:
                        products.append(product)
            except json.JSONDecodeError as e:
                self.logger.error(f"Failed to parse g_page_config: {e}")
        if not products:
            # Fall back to parsing the raw HTML if the embedded JSON is missing or empty
            products = self._parse_html_directly(html, source_url)
        return products

    def _parse_auction_item(self, auction: Dict, source_url: str) -> Optional[Dict[str, Any]]:
        """
        Parse a single auction (product) entry from the search JSON.
        """
        try:
            nid = auction.get('nid', '')
            if not nid:
                return None
            title = auction.get('raw_title', '') or auction.get('title', '')
            if not title:
                return None
            price = clean_price(auction.get('view_price', '0'))
            original_price = clean_price(auction.get('view_fee', '0')) or None
            pic_url = auction.get('pic_url', '')
            if pic_url and not pic_url.startswith('http'):
                pic_url = 'https:' + pic_url
            detail_url = auction.get('detail_url', '')
            if detail_url and not detail_url.startswith('http'):
                detail_url = 'https:' + detail_url
            sales_str = auction.get('view_sales', '')
            sales_volume = parse_sales_volume(sales_str)
            shop_name = auction.get('nick', '')
            is_tmall = auction.get('shopcard', {}).get('isTmall', False) if auction.get('shopcard') else False
            return {
                'product_id': str(nid),
                'name': title.strip(),
                'price': price,
                'original_price': original_price,
                'image_url': pic_url,
                'url': detail_url,
                'shop_name': shop_name,
                'sales_volume': sales_volume,
                'is_tmall': is_tmall,
                'source_url': source_url,
                'is_wholesale': False,
                'currency': 'CNY'
            }
        except Exception as e:
            self.logger.error(f"Failed to parse product data: {e}")
            return None

    def _parse_html_directly(self, html: str, source_url: str) -> List[Dict[str, Any]]:
        """
        Parse products directly from the HTML (fallback method).
        """
        products = []
        item_pattern = r'<div[^>]*class="[^"]*item[^"]*"[^>]*data-id="(\d+)"[^>]*>(.*?)</div>\s*</div>\s*</div>'
        items = re.findall(item_pattern, html, re.DOTALL | re.IGNORECASE)
        for item_id, item_html in items:
            try:
                title_match = re.search(r'<a[^>]*class="[^"]*J_ClickStat[^"]*"[^>]*>(.*?)</a>', item_html, re.DOTALL)
                title = title_match.group(1) if title_match else ''
                title = re.sub(r'<[^>]+>', '', title).strip() if title else ''
                price_match = re.search(r'<strong[^>]*data-price="([\d.]+)"', item_html)
                price = clean_price(price_match.group(1)) if price_match else 0
                sales_match = re.search(r'<div[^>]*class="deal-cnt"[^>]*>([^<]+)</div>', item_html)
                sales_str = sales_match.group(1) if sales_match else ''
                sales_volume = parse_sales_volume(sales_str)
                shop_match = re.search(r'<a[^>]*class="shopname[^"]*"[^>]*>(.*?)</a>', item_html, re.DOTALL)
                shop_name = shop_match.group(1) if shop_match else ''
                shop_name = re.sub(r'<[^>]+>', '', shop_name).strip() if shop_name else ''
                img_match = re.search(r'<img[^>]*data-src="([^"]+)"', item_html)
                img_url = img_match.group(1) if img_match else ''
                if img_url and not img_url.startswith('http'):
                    img_url = 'https:' + img_url
                if title and item_id:
                    products.append({
                        'product_id': str(item_id),
                        'name': title,
                        'price': price,
                        'image_url': img_url,
                        'shop_name': shop_name,
                        'sales_volume': sales_volume,
                        'source_url': source_url,
                        'is_wholesale': False,
                        'currency': 'CNY'
                    })
            except Exception as e:
                self.logger.error(f"Failed to parse product from HTML: {e}")
                continue
        return products

    def get_product_detail(self, product_id: str, **kwargs) -> Optional[Dict[str, Any]]:
        """
        Fetch Taobao product details.
        :param product_id: product ID (nid)
        :return: product details
        """
        self.logger.info(f"Fetching Taobao product detail: product_id={product_id}")
        detail_url = f"https://item.taobao.com/item.htm?id={product_id}"
        response = self._make_request('GET', detail_url)
        if not response:
            self.logger.warning(f"Failed to fetch product detail: {product_id}")
            return None
        return self._parse_product_detail(response.text, product_id, detail_url)

    def _parse_product_detail(self, html: str, product_id: str, source_url: str) -> Optional[Dict[str, Any]]:
        """
        Parse a product detail page.
        """
        try:
            title_match = re.search(r'<title>([^<]+)</title>', html)
            title = title_match.group(1).split('-')[0].strip() if title_match else ''
            price_match = re.search(r'"price"\s*:\s*"([\d.]+)"', html) or \
                re.search(r'"defaultItemPrice"\s*:\s*"([\d.]+)"', html)
            price = clean_price(price_match.group(1)) if price_match else 0
            shop_match = re.search(r'"nick"\s*:\s*"([^"]+)"', html) or \
                re.search(r'shopName["\']\s*[:=]\s*["\']([^"\']+)["\']', html)
            shop_name = shop_match.group(1) if shop_match else ''
            sales_match = re.search(r'"sellCount"\s*:\s*(\d+)', html) or \
                re.search(r'"totalSoldQuantity"\s*:\s*(\d+)', html)
            sales_volume = int(sales_match.group(1)) if sales_match else None
            return {
                'product_id': str(product_id),
                'name': title,
                'price': price,
                'url': source_url,
                'shop_name': shop_name,
                'sales_volume': sales_volume,
                'is_wholesale': False,
                'currency': 'CNY'
            }
        except Exception as e:
            self.logger.error(f"Failed to parse product detail: {e}")
            return None

    def get_price(self, product_id: str, **kwargs) -> Optional[Dict[str, Any]]:
        """
        Get the price of a product.
        :param product_id: product ID
        :return: price information
        """
        detail = self.get_product_detail(product_id, **kwargs)
        if detail:
            return {
                'product_id': str(product_id),
                'price': detail.get('price', 0),
                'original_price': detail.get('original_price'),
                'currency': 'CNY',
                'platform': self.platform,
                'source_url': detail.get('url', ''),
                'price_type': 'retail'
            }
        return None

    def set_cookie(self, cookie: str):
        """
        Set the Cookie.
        :param cookie: Cookie string
        """
        self.cookie = cookie
        self.headers['Cookie'] = cookie

    def set_proxy(self, proxy: str):
        """
        Set the proxy.
        :param proxy: proxy server address
        """
        self.proxy = proxy
        self.session.proxies = {
            'http': proxy,
            'https': proxy
        }
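

# Minimal usage sketch (an assumption, not part of the original module): the package
# path `crawlers.taobao` and the cookie/proxy values below are placeholders, and
# BaseCrawler is assumed to provide `session`, `headers`, `logger`, `config['search_url']`
# and `_make_request` as the code above relies on.
#
#   from crawlers.taobao import TaobaoCrawler   # hypothetical import path
#
#   crawler = TaobaoCrawler(cookie='<logged-in Taobao cookie>',
#                           proxy='http://127.0.0.1:7890')
#   results = crawler.search('bluetooth earphones', page=1, sort='sale-desc')
#   for item in results[:5]:
#       print(item['product_id'], item['name'], item['price'])
#   if results:
#       print(crawler.get_price(results[0]['product_id']))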