#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Contact : liuyuqi.gov@msn.cn
@Time : 2024/05/03
@License : Copyright © 2017-2024 liuyuqi. All Rights Reserved.
@Desc : Main entry point for the product price crawler.
"""
import argparse
import logging
import logging.config
import os
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from typing import Any, Dict, List, Optional

# Make the project-local packages importable when the script is run directly.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from config.settings import LOGGING_CONFIG
from crawlers.alibaba1688 import Alibaba1688Crawler
from crawlers.jd import JdCrawler
from crawlers.taobao import TaobaoCrawler
from models.product import init_db
from utils.db_utils import DBUtils
from utils.helpers import random_delay


def setup_logging():
    """Configure logging."""
    log_dir = 'logs'
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)
    logging.config.dictConfig(LOGGING_CONFIG)
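
# LOGGING_CONFIG must follow the logging.config.dictConfig schema; the logs/
# directory is created up front, presumably so any file handlers configured
# there can write on startup.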


def init_database():
    """Initialize the database."""
    logger = logging.getLogger(__name__)
    logger.info("Initializing database...")
    try:
        init_db()
        logger.info("Database initialized successfully")
    except Exception as e:
        logger.error(f"Database initialization failed: {e}")
        sys.exit(1)


def get_crawler(platform: str, cookie: Optional[str] = None, proxy: Optional[str] = None):
    """
    Return the crawler instance for the given platform.
    :param platform: platform name: taobao, jd, alibaba1688
    :param cookie: cookie string
    :param proxy: proxy address
    :return: crawler instance
    """
    platform = platform.lower()
    if platform == 'taobao':
        return TaobaoCrawler(cookie=cookie, proxy=proxy)
    elif platform == 'jd':
        return JdCrawler(cookie=cookie, proxy=proxy)
    elif platform in ('alibaba1688', '1688', 'alibaba'):
        return Alibaba1688Crawler(cookie=cookie, proxy=proxy)
    raise ValueError(f"Unsupported platform: {platform}")
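
# The crawler classes are assumed to share the interface used below:
# search(keyword, page=...) returning a list of product dicts, and close()
# to release the underlying session.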


def crawl_single_platform(platform: str, keywords: List[str], pages: int = 1,
                          cookie: Optional[str] = None, proxy: Optional[str] = None,
                          save_to_db: bool = True) -> List[Dict[str, Any]]:
    """
    Crawl products from a single platform.
    :param platform: platform name
    :param keywords: list of search keywords
    :param pages: number of pages to crawl per keyword
    :param cookie: cookie string
    :param proxy: proxy address
    :param save_to_db: whether to persist results to the database
    :return: list of crawled products
    """
    logger = logging.getLogger(__name__)
    logger.info(f"Start crawling {platform}, keywords: {keywords}, pages: {pages}")
    all_products = []
    try:
        crawler = get_crawler(platform, cookie=cookie, proxy=proxy)
        for keyword in keywords:
            for page in range(1, pages + 1):
                logger.info(f"Crawling {platform} - keyword: {keyword}, page {page}")
                products = crawler.search(keyword, page=page)
                for product in products:
                    product['keyword'] = keyword
                    product['page'] = page
                    product['crawl_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                    all_products.append(product)
                logger.info(f"Got {len(products)} products from page {page}")
                # Back off between pages to reduce the chance of being rate-limited.
                if page < pages:
                    random_delay(1, 3)
        crawler.close()
        if save_to_db and all_products:
            save_products_to_db(all_products, platform)
    except Exception as e:
        logger.exception(f"Failed to crawl {platform}: {e}")
    logger.info(f"Finished crawling {platform}, {len(all_products)} products in total")
    return all_products
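
# A minimal usage sketch (hypothetical values, database writes disabled):
#   products = crawl_single_platform('jd', ['手机'], pages=2, save_to_db=False)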


def save_products_to_db(products: List[Dict[str, Any]], platform: str):
    """Persist crawled products and their price snapshots to the database."""
    logger = logging.getLogger(__name__)
    try:
        db = DBUtils()
        success_count = 0
        for product in products:
            product_data = {
                'product_id': str(product.get('product_id', '')),
                'name': product.get('name', ''),
                'url': product.get('url', ''),
                'image_url': product.get('image_url', ''),
                'shop_name': product.get('shop_name', ''),
                'platform': platform,
                'is_wholesale': 1 if product.get('is_wholesale', False) else 0,
                'min_order_quantity': product.get('min_order_quantity'),
                'unit': product.get('unit', ''),
                'current_price': product.get('price', 0),
                'original_price': product.get('original_price'),
                'sales_volume': product.get('sales_volume'),
                'crawl_time': datetime.now()
            }
            db_product = db.add_product(product_data)
            if db_product:
                price_data = {
                    'product_id': str(product.get('product_id', '')),
                    'price': product.get('price', 0),
                    'original_price': product.get('original_price'),
                    'platform': platform,
                    'source_url': product.get('url', ''),
                    'price_type': 'wholesale' if product.get('is_wholesale', False) else 'retail',
                    'min_quantity': product.get('min_order_quantity')
                }
                db.add_price_history(price_data)
                success_count += 1
        logger.info(f"Saved {success_count}/{len(products)} products to the database")
    except Exception as e:
        logger.exception(f"Failed to save products to the database: {e}")
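
# add_product is assumed to insert-or-update the product row and return a
# truthy value on success; add_price_history then records one price snapshot
# per crawl, so price history accumulates across runs.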


def run_crawl(platforms: List[str], keywords: List[str], pages: int = 1,
              concurrent: bool = True, max_workers: int = 3,
              cookie: Optional[str] = None, proxy: Optional[str] = None) -> Dict[str, List[Dict[str, Any]]]:
    """
    Run the crawl job.
    :param platforms: list of platforms
    :param keywords: list of search keywords
    :param pages: pages to crawl per keyword
    :param concurrent: whether to crawl platforms concurrently
    :param max_workers: number of worker threads
    :param cookie: cookie string
    :param proxy: proxy address
    :return: products keyed by platform
    """
    logger = logging.getLogger(__name__)
    logger.info(f"Starting crawl job, platforms: {platforms}, keywords: {keywords}, pages: {pages}")
    results = {}
    if concurrent and len(platforms) > 1:
        logger.info(f"Concurrent mode, max workers: {max_workers}")
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # One future per platform; each worker crawls all keywords/pages.
            future_to_platform = {
                executor.submit(
                    crawl_single_platform,
                    platform, keywords, pages, cookie, proxy
                ): platform
                for platform in platforms
            }
            for future in as_completed(future_to_platform):
                platform = future_to_platform[future]
                try:
                    results[platform] = future.result()
                except Exception as e:
                    logger.error(f"Crawl failed for {platform}: {e}")
                    results[platform] = []
    else:
        for platform in platforms:
            results[platform] = crawl_single_platform(platform, keywords, pages, cookie, proxy)
    total_count = sum(len(products) for products in results.values())
    logger.info(f"Crawl job finished, {total_count} products in total")
    return results
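
# A minimal usage sketch (hypothetical values, platforms crawled in parallel):
#   results = run_crawl(['taobao', 'jd'], ['手机'], pages=2, max_workers=2)
#   print(len(results['jd']))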


def search_products(keyword: str, platform: Optional[str] = None, limit: int = 50):
    """Search products already stored in the database."""
    logger = logging.getLogger(__name__)
    try:
        db = DBUtils()
        products = db.search_products(keyword, platform=platform, limit=limit)
        if not products:
            logger.info("No matching products found")
            return
        logger.info(f"Found {len(products)} matching products:")
        for i, product in enumerate(products, 1):
            price_str = f"¥{product.get('current_price', 0):.2f}"
            wholesale = " [wholesale]" if product.get('is_wholesale') else ""
            name = (product.get('name') or '')[:50]
            logger.info(f"{i}. {name}... - {price_str}{wholesale}")
            logger.info(f"   Platform: {product.get('platform')}, shop: {product.get('shop_name', 'unknown')}")
            logger.info(f"   URL: {product.get('url', 'N/A')}")
    except Exception as e:
        logger.error(f"Product search failed: {e}")


def show_stats():
    """Print database statistics."""
    logger = logging.getLogger(__name__)
    try:
        db = DBUtils()
        total_count = db.get_product_count()
        taobao_count = db.get_product_count('taobao')
        jd_count = db.get_product_count('jd')
        alibaba_count = db.get_product_count('alibaba1688')
        logger.info("=" * 50)
        logger.info("Database statistics")
        logger.info("=" * 50)
        logger.info(f"Total products: {total_count}")
        logger.info(f"  Taobao: {taobao_count}")
        logger.info(f"  JD: {jd_count}")
        logger.info(f"  1688 wholesale: {alibaba_count}")
        logger.info("=" * 50)
        recent_products = db.get_all_products(limit=5)
        if recent_products:
            logger.info("\nRecently added products:")
            for i, product in enumerate(recent_products, 1):
                price_str = f"¥{product.get('current_price', 0):.2f}"
                name = (product.get('name') or '')[:40]
                logger.info(f"{i}. {name}... - {price_str} ({product.get('platform')})")
    except Exception as e:
        logger.error(f"Failed to fetch statistics: {e}")


def main():
    parser = argparse.ArgumentParser(
        description='Product price crawler - supports Taobao, JD and the 1688 wholesale platform',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Examples:
  # Initialize the database
  python main.py --mode init

  # Crawl "手机" (phones) on Taobao and JD, 2 pages each
  python main.py --mode crawl --platforms taobao jd --keywords 手机 --pages 2

  # Crawl "服装" (clothing) on the 1688 wholesale platform
  python main.py --mode crawl --platforms alibaba1688 --keywords 服装 --pages 3

  # Use a cookie and a proxy
  python main.py --mode crawl --platforms taobao --keywords 电脑 --cookie "your_cookie_here" --proxy "http://127.0.0.1:7890"

  # Search products in the database
  python main.py --mode search --keyword 手机

  # Show statistics
  python main.py --mode stats
'''
    )
    parser.add_argument('--mode',
                        choices=['init', 'crawl', 'search', 'stats'],
                        default='stats',
                        help='run mode: init (initialize database), crawl, search, stats')
    parser.add_argument('--platforms',
                        nargs='+',
                        default=['taobao'],
                        help='target platforms: taobao, jd, alibaba1688 (multiple allowed)')
    parser.add_argument('--keywords',
                        nargs='+',
                        default=['手机'],
                        help='search keywords (multiple allowed)')
    parser.add_argument('--pages',
                        type=int,
                        default=1,
                        help='pages to crawl per keyword (default: 1)')
    parser.add_argument('--keyword',
                        type=str,
                        help='keyword for search mode')
    parser.add_argument('--limit',
                        type=int,
                        default=50,
                        help='maximum number of search results (default: 50)')
    parser.add_argument('--no-concurrent',
                        dest='concurrent',
                        action='store_false',
                        help='disable concurrent crawling (enabled by default)')
    parser.add_argument('--workers',
                        type=int,
                        default=3,
                        help='number of worker threads (default: 3)')
    parser.add_argument('--cookie',
                        type=str,
                        help='login cookie for the platform')
    parser.add_argument('--proxy',
                        type=str,
                        help='proxy server address, e.g. http://127.0.0.1:7890')
    args = parser.parse_args()
    setup_logging()
    logger = logging.getLogger(__name__)
    try:
        if args.mode == 'init':
            init_database()
        elif args.mode == 'crawl':
            init_database()
            run_crawl(
                platforms=args.platforms,
                keywords=args.keywords,
                pages=args.pages,
                concurrent=args.concurrent,
                max_workers=args.workers,
                cookie=args.cookie,
                proxy=args.proxy
            )
        elif args.mode == 'search':
            if not args.keyword:
                logger.error("search mode requires the --keyword argument")
                sys.exit(1)
            search_products(args.keyword, limit=args.limit)
        elif args.mode == 'stats':
            show_stats()
    except Exception as e:
        logger.exception(f"Program failed: {e}")
        sys.exit(1)


if __name__ == '__main__':
    main()