#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Contact : liuyuqi.gov@msn.cn
@Time    : 2024/05/03
@License : Copyright © 2017-2024 liuyuqi. All Rights Reserved.
@Desc    : Main program for the product price crawler
"""
import argparse
import logging
import logging.config
import sys
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Any, Optional
from datetime import datetime

# Make the project's sibling packages importable when this script is run directly
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from config.settings import LOGGING_CONFIG
from models.product import init_db
from crawlers.taobao import TaobaoCrawler
from crawlers.jd import JdCrawler
from crawlers.alibaba1688 import Alibaba1688Crawler
from utils.db_utils import DBUtils
from utils.helpers import random_delay


def setup_logging():
    """Configure logging, creating the log directory if it does not exist."""
    log_dir = 'logs'
    os.makedirs(log_dir, exist_ok=True)
    logging.config.dictConfig(LOGGING_CONFIG)


def init_database():
    """Initialize the database, exiting on failure."""
    logger = logging.getLogger(__name__)
    logger.info("Initializing database...")
    try:
        init_db()
        logger.info("Database initialized successfully")
    except Exception as e:
        logger.error(f"Database initialization failed: {e}")
        sys.exit(1)


def get_crawler(platform: str, cookie: Optional[str] = None, proxy: Optional[str] = None):
    """
    Get a crawler instance for the given platform.
    :param platform: platform name: taobao, jd, alibaba1688
    :param cookie: cookie string
    :param proxy: proxy address
    :return: crawler instance
    """
    platform = platform.lower()

    if platform == 'taobao':
        return TaobaoCrawler(cookie=cookie, proxy=proxy)
    elif platform == 'jd':
        return JdCrawler(cookie=cookie, proxy=proxy)
    elif platform in ('alibaba1688', '1688', 'alibaba'):
        return Alibaba1688Crawler(cookie=cookie, proxy=proxy)
    else:
        raise ValueError(f"Unsupported platform: {platform}")
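
# Example usage (a minimal sketch; the cookie and proxy values are placeholders):
#   crawler = get_crawler('jd', cookie='your_cookie_here', proxy='http://127.0.0.1:7890')
#   products = crawler.search('手机', page=1)
#   crawler.close()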


def crawl_single_platform(platform: str, keywords: List[str], pages: int = 1,
                          cookie: Optional[str] = None, proxy: Optional[str] = None,
                          save_to_db: bool = True) -> List[Dict[str, Any]]:
    """
    Crawl products from a single platform.
    :param platform: platform name
    :param keywords: list of search keywords
    :param pages: number of pages to crawl per keyword
    :param cookie: cookie string
    :param proxy: proxy address
    :param save_to_db: whether to save results to the database
    :return: list of crawled products
    """
    logger = logging.getLogger(__name__)
    logger.info(f"Start crawling {platform}, keywords: {keywords}, pages: {pages}")

    all_products = []

    try:
        crawler = get_crawler(platform, cookie=cookie, proxy=proxy)
        try:
            for keyword in keywords:
                for page in range(1, pages + 1):
                    logger.info(f"Crawling {platform} - keyword: {keyword}, page {page}")

                    products = crawler.search(keyword, page=page)

                    for product in products:
                        product['keyword'] = keyword
                        product['page'] = page
                        product['crawl_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                        all_products.append(product)

                    logger.info(f"Got {len(products)} products from page {page}")

                    # Random delay between pages to reduce the risk of being rate-limited
                    if page < pages:
                        random_delay(1, 3)
        finally:
            # Release crawler resources even if a page request fails
            crawler.close()

        if save_to_db and all_products:
            save_products_to_db(all_products, platform)

    except Exception:
        logger.exception(f"Failed to crawl {platform}")

    logger.info(f"Finished crawling {platform}, {len(all_products)} products in total")
    return all_products
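
# Example usage (sequential crawl; requires network access, and a valid cookie for
# login-gated platforms; '手机' is just a sample keyword):
#   products = crawl_single_platform('jd', ['手机'], pages=2, save_to_db=False)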


def save_products_to_db(products: List[Dict[str, Any]], platform: str):
    """
    Save crawled products to the database.
    """
    logger = logging.getLogger(__name__)

    try:
        db = DBUtils()
        success_count = 0

        for product in products:
            product_data = {
                'product_id': str(product.get('product_id', '')),
                'name': product.get('name', ''),
                'url': product.get('url', ''),
                'image_url': product.get('image_url', ''),
                'shop_name': product.get('shop_name', ''),
                'platform': platform,
                'is_wholesale': 1 if product.get('is_wholesale', False) else 0,
                'min_order_quantity': product.get('min_order_quantity'),
                'unit': product.get('unit', ''),
                'current_price': product.get('price', 0),
                'original_price': product.get('original_price'),
                'sales_volume': product.get('sales_volume'),
                'crawl_time': datetime.now()
            }

            db_product = db.add_product(product_data)

            # Record a price-history entry only when the product row was saved
            if db_product:
                price_data = {
                    'product_id': str(product.get('product_id', '')),
                    'price': product.get('price', 0),
                    'original_price': product.get('original_price'),
                    'platform': platform,
                    'source_url': product.get('url', ''),
                    'price_type': 'wholesale' if product.get('is_wholesale', False) else 'retail',
                    'min_quantity': product.get('min_order_quantity')
                }
                db.add_price_history(price_data)
                success_count += 1

        logger.info(f"Saved {success_count}/{len(products)} products to the database")

    except Exception:
        logger.exception("Failed to save products to the database")
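
# Expected input shape, for reference (keys consumed above; values are illustrative):
#   save_products_to_db([{'product_id': '123', 'name': 'Sample', 'price': 9.9,
#                         'url': 'https://example.com/item/123',
#                         'is_wholesale': False}], 'taobao')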


def run_crawl(platforms: List[str], keywords: List[str], pages: int = 1,
              concurrent: bool = True, max_workers: int = 3,
              cookie: Optional[str] = None, proxy: Optional[str] = None) -> Dict[str, List[Dict[str, Any]]]:
    """
    Run a crawl job.
    :param platforms: list of platforms
    :param keywords: list of search keywords
    :param pages: number of pages to crawl per keyword
    :param concurrent: whether to crawl platforms concurrently
    :param max_workers: number of worker threads
    :param cookie: cookie string
    :param proxy: proxy address
    :return: product data keyed by platform
    """
    logger = logging.getLogger(__name__)
    logger.info(f"Starting crawl job, platforms: {platforms}, keywords: {keywords}, pages: {pages}")

    results = {}

    if concurrent and len(platforms) > 1:
        logger.info(f"Using concurrent mode, max workers: {max_workers}")

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # One task per platform; each future maps back to its platform name
            future_to_platform = {
                executor.submit(
                    crawl_single_platform,
                    platform, keywords, pages, cookie, proxy
                ): platform
                for platform in platforms
            }

            for future in as_completed(future_to_platform):
                platform = future_to_platform[future]
                try:
                    results[platform] = future.result()
                except Exception as e:
                    logger.error(f"Crawl failed for {platform}: {e}")
                    results[platform] = []
    else:
        for platform in platforms:
            results[platform] = crawl_single_platform(platform, keywords, pages, cookie, proxy)

    total_count = sum(len(products) for products in results.values())
    logger.info(f"Crawl job finished, {total_count} products in total")

    return results
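
# Example usage (two platforms crawled concurrently; keywords are sample values):
#   results = run_crawl(['taobao', 'jd'], ['手机'], pages=1, max_workers=2)
#   print({p: len(items) for p, items in results.items()})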


def search_products(keyword: str, platform: Optional[str] = None, limit: int = 50):
    """
    Search products in the local database.
    """
    logger = logging.getLogger(__name__)

    try:
        db = DBUtils()
        products = db.search_products(keyword, platform=platform, limit=limit)

        if products:
            logger.info(f"Found {len(products)} matching products:")
            for i, product in enumerate(products, 1):
                price_str = f"¥{product.get('current_price', 0):.2f}"
                wholesale = " [wholesale]" if product.get('is_wholesale') else ""
                name = (product.get('name') or '')[:50]
                logger.info(f"{i}. {name}... - {price_str}{wholesale}")
                logger.info(f"   Platform: {product.get('platform')}, Shop: {product.get('shop_name', 'unknown')}")
                logger.info(f"   URL: {product.get('url', 'N/A')}")
        else:
            logger.info("No matching products found")

    except Exception:
        logger.exception("Product search failed")
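
# Example usage (reads from the local database only; no network access needed):
#   search_products('手机', platform='jd', limit=10)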


def show_stats():
    """
    Show database statistics.
    """
    logger = logging.getLogger(__name__)

    try:
        db = DBUtils()

        total_count = db.get_product_count()
        taobao_count = db.get_product_count('taobao')
        jd_count = db.get_product_count('jd')
        alibaba_count = db.get_product_count('alibaba1688')

        logger.info("=" * 50)
        logger.info("Database statistics")
        logger.info("=" * 50)
        logger.info(f"Total products: {total_count}")
        logger.info(f"  Taobao: {taobao_count}")
        logger.info(f"  JD: {jd_count}")
        logger.info(f"  1688 wholesale: {alibaba_count}")
        logger.info("=" * 50)

        recent_products = db.get_all_products(limit=5)
        if recent_products:
            logger.info("\nRecently added products:")
            for i, product in enumerate(recent_products, 1):
                price_str = f"¥{product.get('current_price', 0):.2f}"
                name = (product.get('name') or '')[:40]
                logger.info(f"{i}. {name}... - {price_str} ({product.get('platform')})")

    except Exception:
        logger.exception("Failed to fetch statistics")


def main():
    parser = argparse.ArgumentParser(
        description='Product price crawler - supports Taobao, JD, and the 1688 wholesale platform',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Examples:
  # Initialize the database
  python main.py --mode init

  # Crawl "手机" (mobile phone) listings from Taobao and JD, 2 pages each
  python main.py --mode crawl --platforms taobao jd --keywords 手机 --pages 2

  # Crawl "服装" (clothing) listings from the 1688 wholesale platform
  python main.py --mode crawl --platforms alibaba1688 --keywords 服装 --pages 3

  # Use a cookie and a proxy
  python main.py --mode crawl --platforms taobao --keywords 电脑 --cookie "your_cookie_here" --proxy "http://127.0.0.1:7890"

  # Search the database for products
  python main.py --mode search --keyword 手机

  # Show statistics
  python main.py --mode stats
'''
    )

    parser.add_argument('--mode',
                        choices=['init', 'crawl', 'search', 'stats'],
                        default='stats',
                        help='run mode: init (initialize database), crawl, search, stats')

    parser.add_argument('--platforms',
                        nargs='+',
                        default=['taobao'],
                        help='target platforms: taobao, jd, alibaba1688 (one or more)')

    parser.add_argument('--keywords',
                        nargs='+',
                        default=['手机'],
                        help='search keywords (one or more)')

    parser.add_argument('--pages',
                        type=int,
                        default=1,
                        help='pages to crawl per keyword (default: 1)')

    parser.add_argument('--keyword',
                        type=str,
                        help='keyword for search mode')

    parser.add_argument('--limit',
                        type=int,
                        default=50,
                        help='maximum number of search results (default: 50)')

    # BooleanOptionalAction (Python 3.9+) also generates a --no-concurrent flag;
    # the previous store_true/default=True combination could never be turned off.
    parser.add_argument('--concurrent',
                        action=argparse.BooleanOptionalAction,
                        default=True,
                        help='crawl platforms concurrently (default: enabled)')

    parser.add_argument('--workers',
                        type=int,
                        default=3,
                        help='number of worker threads (default: 3)')

    parser.add_argument('--cookie',
                        type=str,
                        help='platform login cookie')

    parser.add_argument('--proxy',
                        type=str,
                        help='proxy server address, e.g. http://127.0.0.1:7890')

    args = parser.parse_args()

    setup_logging()
    logger = logging.getLogger(__name__)

    try:
        if args.mode == 'init':
            init_database()

        elif args.mode == 'crawl':
            init_database()
            run_crawl(
                platforms=args.platforms,
                keywords=args.keywords,
                pages=args.pages,
                concurrent=args.concurrent,
                max_workers=args.workers,
                cookie=args.cookie,
                proxy=args.proxy
            )

        elif args.mode == 'search':
            if not args.keyword:
                logger.error("search mode requires the --keyword argument")
                sys.exit(1)
            search_products(args.keyword, limit=args.limit)

        elif args.mode == 'stats':
            show_stats()

    except Exception:
        logger.exception("Program execution failed")
        sys.exit(1)


if __name__ == '__main__':
    main()