#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Contact : liuyuqi.gov@msn.cn
@Time    : 2024/05/03
@License : Copyright © 2017-2024 liuyuqi. All Rights Reserved.
@Desc    : Product price crawler, main entry point.
"""
import argparse
import logging
import logging.config
import sys
import os
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Any, Optional
from datetime import datetime

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from config.settings import LOGGING_CONFIG
from models.product import init_db
from crawlers.taobao import TaobaoCrawler
from crawlers.jd import JdCrawler
from crawlers.alibaba1688 import Alibaba1688Crawler
from utils.db_utils import DBUtils
from utils.helpers import random_delay


def setup_logging():
    """Configure logging."""
    log_dir = 'logs'
    os.makedirs(log_dir, exist_ok=True)
    logging.config.dictConfig(LOGGING_CONFIG)


def init_database():
    """Initialize the database."""
    logger = logging.getLogger(__name__)
    logger.info("Initializing database...")
    try:
        init_db()
        logger.info("Database initialized successfully")
    except Exception as e:
        logger.error(f"Database initialization failed: {e}")
        sys.exit(1)


def get_crawler(platform: str, cookie: Optional[str] = None, proxy: Optional[str] = None):
    """
    Get a crawler instance for the given platform.
    :param platform: platform name: taobao, jd, alibaba1688
    :param cookie: cookie string
    :param proxy: proxy address
    :return: crawler instance
    """
    platform = platform.lower()
    if platform == 'taobao':
        return TaobaoCrawler(cookie=cookie, proxy=proxy)
    elif platform == 'jd':
        return JdCrawler(cookie=cookie, proxy=proxy)
    elif platform in ('alibaba1688', '1688', 'alibaba'):
        return Alibaba1688Crawler(cookie=cookie, proxy=proxy)
    else:
        raise ValueError(f"Unsupported platform: {platform}")


def crawl_single_platform(platform: str, keywords: List[str], pages: int = 1,
                          cookie: Optional[str] = None, proxy: Optional[str] = None,
                          save_to_db: bool = True) -> List[Dict[str, Any]]:
    """
    Crawl products from a single platform.
    :param platform: platform name
    :param keywords: list of search keywords
    :param pages: number of pages to crawl per keyword
    :param cookie: cookie
    :param proxy: proxy
    :param save_to_db: whether to save results to the database
    :return: list of crawled products
    """
    logger = logging.getLogger(__name__)
    logger.info(f"Crawling platform {platform}, keywords: {keywords}, pages: {pages}")
    all_products = []
    crawler = None
    try:
        crawler = get_crawler(platform, cookie=cookie, proxy=proxy)
        for keyword in keywords:
            for page in range(1, pages + 1):
                logger.info(f"Crawling {platform} - keyword: {keyword}, page {page}")
                products = crawler.search(keyword, page=page)
                for product in products:
                    product['keyword'] = keyword
                    product['page'] = page
                    product['crawl_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                    all_products.append(product)
                logger.info(f"Page {page} returned {len(products)} products")
                if page < pages:
                    # Random pause between pages to reduce the risk of being blocked.
                    random_delay(1, 3)
        if save_to_db and all_products:
            save_products_to_db(all_products, platform)
    except Exception as e:
        logger.error(f"Crawling platform {platform} failed: {e}")
        traceback.print_exc()
    finally:
        # Release crawler resources even if a keyword or page fails.
        if crawler is not None:
            crawler.close()
    logger.info(f"Finished crawling {platform}, {len(all_products)} products in total")
    return all_products
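
# A minimal usage sketch of the crawler API above, kept as an uncalled helper
# so importing this module stays side-effect free. The keyword and proxy
# values are illustrative; it assumes only the search(keyword, page=...) and
# close() methods already used by crawl_single_platform().
def _example_single_platform_crawl():  # pragma: no cover
    crawler = get_crawler('jd', proxy='http://127.0.0.1:7890')
    try:
        products = crawler.search('手机', page=1)
        print(f"fetched {len(products)} products")
    finally:
        crawler.close()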
def save_products_to_db(products: List[Dict[str, Any]], platform: str):
    """Save products to the database."""
    logger = logging.getLogger(__name__)
    try:
        db = DBUtils()
        success_count = 0
        for product in products:
            product_data = {
                'product_id': str(product.get('product_id', '')),
                'name': product.get('name', ''),
                'url': product.get('url', ''),
                'image_url': product.get('image_url', ''),
                'shop_name': product.get('shop_name', ''),
                'platform': platform,
                'is_wholesale': 1 if product.get('is_wholesale', False) else 0,
                'min_order_quantity': product.get('min_order_quantity'),
                'unit': product.get('unit', ''),
                'current_price': product.get('price', 0),
                'original_price': product.get('original_price'),
                'sales_volume': product.get('sales_volume'),
                'crawl_time': datetime.now()
            }
            db_product = db.add_product(product_data)
            if db_product:
                price_data = {
                    'product_id': str(product.get('product_id', '')),
                    'price': product.get('price', 0),
                    'original_price': product.get('original_price'),
                    'platform': platform,
                    'source_url': product.get('url', ''),
                    'price_type': 'wholesale' if product.get('is_wholesale', False) else 'retail',
                    'min_quantity': product.get('min_order_quantity')
                }
                db.add_price_history(price_data)
                success_count += 1
        logger.info(f"Saved {success_count}/{len(products)} products to the database")
    except Exception as e:
        logger.error(f"Failed to save products to the database: {e}")
        traceback.print_exc()


def run_crawl(platforms: List[str], keywords: List[str], pages: int = 1,
              concurrent: bool = True, max_workers: int = 3,
              cookie: Optional[str] = None,
              proxy: Optional[str] = None) -> Dict[str, List[Dict[str, Any]]]:
    """
    Run a crawl job.
    :param platforms: list of platforms
    :param keywords: list of keywords
    :param pages: pages to crawl per keyword
    :param concurrent: whether to crawl platforms concurrently
    :param max_workers: number of worker threads
    :param cookie: cookie
    :param proxy: proxy
    :return: products grouped by platform
    """
    logger = logging.getLogger(__name__)
    logger.info(f"Starting crawl job, platforms: {platforms}, keywords: {keywords}, pages: {pages}")
    results = {}
    if concurrent and len(platforms) > 1:
        logger.info(f"Concurrent mode, max workers: {max_workers}")
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # One future per platform; keywords and pages are iterated inside each task.
            future_to_platform = {
                executor.submit(crawl_single_platform, platform, keywords,
                                pages, cookie, proxy): platform
                for platform in platforms
            }
            for future in as_completed(future_to_platform):
                platform = future_to_platform[future]
                try:
                    results[platform] = future.result()
                except Exception as e:
                    logger.error(f"Crawling platform {platform} raised: {e}")
                    results[platform] = []
    else:
        for platform in platforms:
            results[platform] = crawl_single_platform(platform, keywords, pages, cookie, proxy)
    total_count = sum(len(products) for products in results.values())
    logger.info(f"Crawl job finished, {total_count} products in total")
    return results


def search_products(keyword: str, platform: Optional[str] = None, limit: int = 50):
    """Search products in the database."""
    logger = logging.getLogger(__name__)
    try:
        db = DBUtils()
        products = db.search_products(keyword, platform=platform, limit=limit)
        if products:
            logger.info(f"Found {len(products)} matching products:")
            for i, product in enumerate(products, 1):
                price_str = f"¥{product.get('current_price', 0):.2f}"
                wholesale = " [wholesale]" if product.get('is_wholesale') else ""
                name = product.get('name') or ''
                logger.info(f"{i}. {name[:50]}... - {price_str}{wholesale}")
                logger.info(f"   Platform: {product.get('platform')}, shop: {product.get('shop_name', 'unknown')}")
                logger.info(f"   URL: {product.get('url', 'N/A')}")
        else:
            logger.info("No matching products found")
    except Exception as e:
        logger.error(f"Product search failed: {e}")


def show_stats():
    """Show database statistics."""
    logger = logging.getLogger(__name__)
    try:
        db = DBUtils()
        total_count = db.get_product_count()
        taobao_count = db.get_product_count('taobao')
        jd_count = db.get_product_count('jd')
        alibaba_count = db.get_product_count('alibaba1688')
        logger.info("=" * 50)
        logger.info("Database statistics")
        logger.info("=" * 50)
        logger.info(f"Total products: {total_count}")
        logger.info(f"  Taobao: {taobao_count}")
        logger.info(f"  JD: {jd_count}")
        logger.info(f"  1688 wholesale: {alibaba_count}")
        logger.info("=" * 50)
        recent_products = db.get_all_products(limit=5)
        if recent_products:
            logger.info("\nRecently added products:")
            for i, product in enumerate(recent_products, 1):
                price_str = f"¥{product.get('current_price', 0):.2f}"
                name = product.get('name') or ''
                logger.info(f"{i}. {name[:40]}... - {price_str} ({product.get('platform')})")
    except Exception as e:
        logger.error(f"Failed to fetch statistics: {e}")
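
# Sketch of driving run_crawl() programmatically instead of through the CLI,
# again as an uncalled helper. Platform names and the keyword are
# illustrative; see main() below for the values the CLI accepts.
def _example_run_crawl():  # pragma: no cover
    setup_logging()
    init_database()
    results = run_crawl(platforms=['taobao', 'jd'], keywords=['手机'],
                        pages=1, concurrent=True, max_workers=2)
    for platform, products in results.items():
        print(f"{platform}: {len(products)} products")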
def main():
    parser = argparse.ArgumentParser(
        description='Product price crawler for Taobao, JD and the 1688 wholesale platform',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Examples:
  # Initialize the database
  python main.py --mode init

  # Crawl "手机" (mobile phone) listings from Taobao and JD, 2 pages each
  python main.py --mode crawl --platforms taobao jd --keywords 手机 --pages 2

  # Crawl "服装" (clothing) listings from the 1688 wholesale platform
  python main.py --mode crawl --platforms alibaba1688 --keywords 服装 --pages 3

  # Use a cookie and a proxy
  python main.py --mode crawl --platforms taobao --keywords 电脑 --cookie "your_cookie_here" --proxy "http://127.0.0.1:7890"

  # Search products stored in the database
  python main.py --mode search --keyword 手机

  # Show database statistics
  python main.py --mode stats
'''
    )
    parser.add_argument('--mode', choices=['init', 'crawl', 'search', 'stats'], default='stats',
                        help='run mode: init (initialize database), crawl, search, stats')
    parser.add_argument('--platforms', nargs='+', default=['taobao'],
                        help='target platforms: taobao, jd, alibaba1688 (one or more)')
    parser.add_argument('--keywords', nargs='+', default=['手机'],
                        help='search keywords (one or more)')
    parser.add_argument('--pages', type=int, default=1,
                        help='pages to crawl per keyword (default: 1)')
    parser.add_argument('--keyword', type=str, help='keyword for search mode')
    parser.add_argument('--limit', type=int, default=50,
                        help='maximum number of search results (default: 50)')
    # The original flag was `--concurrent` with action='store_true' and
    # default=True, which made it impossible to turn concurrency off;
    # expose the opt-out instead (concurrent stays the default).
    parser.add_argument('--no-concurrent', dest='concurrent', action='store_false',
                        help='disable concurrent crawling (concurrent by default)')
    parser.add_argument('--workers', type=int, default=3,
                        help='number of worker threads (default: 3)')
    parser.add_argument('--cookie', type=str, help='login cookie for the platform')
    parser.add_argument('--proxy', type=str,
                        help='proxy server address, e.g. http://127.0.0.1:7890')
    args = parser.parse_args()

    setup_logging()
    logger = logging.getLogger(__name__)
    try:
        if args.mode == 'init':
            init_database()
        elif args.mode == 'crawl':
            init_database()
            run_crawl(
                platforms=args.platforms,
                keywords=args.keywords,
                pages=args.pages,
                concurrent=args.concurrent,
                max_workers=args.workers,
                cookie=args.cookie,
                proxy=args.proxy
            )
        elif args.mode == 'search':
            if not args.keyword:
                logger.error("search mode requires the --keyword argument")
                sys.exit(1)
            search_products(args.keyword, limit=args.limit)
        elif args.mode == 'stats':
            show_stats()
    except Exception as e:
        logger.error(f"Program failed: {e}")
        traceback.print_exc()
        sys.exit(1)


if __name__ == '__main__':
    main()