
feat: implement a multi-platform product price crawler system

Add crawler implementations for Taobao, JD, and the 1688 wholesale platform
Include database models, utility functions, and configuration files
Support concurrent crawling, price-history tracking, and database storage
Expand the README with usage instructions and the project structure
liuyuqi-cnb committed 4 days ago
parent commit e68ff1aaac
17 changed files with 2696 additions and 1 deletion
  1. .env.example (+15, -0)
  2. .gitignore (+43, -0)
  3. README.md (+264, -1)
  4. config/__init__.py (+1, -0)
  5. config/settings.py (+91, -0)
  6. crawlers/__init__.py (+4, -0)
  7. crawlers/alibaba1688.py (+453, -0)
  8. crawlers/base.py (+259, -0)
  9. crawlers/jd.py (+387, -0)
  10. crawlers/taobao.py (+285, -0)
  11. main.py (+391, -0)
  12. models/__init__.py (+1, -0)
  13. models/product.py (+122, -0)
  14. requirements.txt (+6, -0)
  15. utils/__init__.py (+2, -0)
  16. utils/db_utils.py (+193, -0)
  17. utils/helpers.py (+179, -0)

+ 15 - 0
.env.example

@@ -0,0 +1,15 @@
+# Database configuration
+DB_HOST=127.0.0.1
+DB_PORT=3306
+DB_NAME=price_crawler
+DB_USER=root
+DB_PASSWORD=your_password
+
+# Optional: proxy configuration
+# HTTP_PROXY=http://127.0.0.1:7890
+# HTTPS_PROXY=http://127.0.0.1:7890
+
+# Optional: per-platform cookie configuration
+# TAOBAO_COOKIE=your_taobao_cookie_here
+# JD_COOKIE=your_jd_cookie_here
+# ALIBABA1688_COOKIE=your_1688_cookie_here

+ 43 - 0
.gitignore

@@ -0,0 +1,43 @@
+# Environment variables
+.env
+.env.local
+
+# Log directory
+logs/
+
+# Data directory
+data/
+
+# Python
+__pycache__/
+*.py[cod]
+*$py.class
+*.so
+.Python
+*.egg
+*.egg-info/
+dist/
+build/
+eggs/
+.eggs/
+lib/
+lib64/
+
+# IDE
+.idea/
+.vscode/
+*.swp
+*.swo
+*~
+
+# Tests
+.pytest_cache/
+.coverage
+htmlcov/
+.tox/
+
+# Virtual environments
+venv/
+env/
+.venv/
+.env/

+ 264 - 1
README.md

@@ -1,3 +1,266 @@
 # crawl_price
 # crawl_price
 
 
-Fetch product prices, including wholesale prices, from multiple platforms
+Fetch product prices, including wholesale prices, from multiple platforms
+
+## Features
+
+- Multi-platform support: Taobao, JD, and the 1688 wholesale platform
+- Wholesale price retrieval (1688)
+- Concurrent crawling
+- Cookie and proxy configuration
+- Data persistence to a MySQL database
+- Price-history tracking
+
+## Project Structure
+
+```
+crawl_price/
+├── config/                 # Configuration
+│   ├── __init__.py
+│   └── settings.py         # Database, crawler, and logging settings
+├── crawlers/               # Crawler implementations
+│   ├── __init__.py
+│   ├── base.py             # Crawler base class
+│   ├── taobao.py           # Taobao crawler
+│   ├── jd.py               # JD crawler
+│   └── alibaba1688.py      # 1688 wholesale platform crawler
+├── models/                 # Database models
+│   ├── __init__.py
+│   └── product.py          # Product and price-history models
+├── utils/                  # Utility functions
+│   ├── __init__.py
+│   ├── db_utils.py         # Database helpers
+│   └── helpers.py          # General helper functions
+├── logs/                   # Log directory
+├── main.py                 # Main entry point
+├── requirements.txt        # Dependency list
+├── .env.example            # Environment variable template
+└── .gitignore
+```
+
+## Installation
+
+### 1. Install dependencies
+
+```bash
+pip install -r requirements.txt
+```
+
+### 2. Configure environment variables
+
+Copy `.env.example` to `.env` and adjust the settings:
+
+```bash
+cp .env.example .env
+```
+
+Edit the `.env` file:
+
+```env
+# Database configuration
+DB_HOST=127.0.0.1
+DB_PORT=3306
+DB_NAME=price_crawler
+DB_USER=root
+DB_PASSWORD=your_password
+
+# Optional: proxy configuration
+# HTTP_PROXY=http://127.0.0.1:7890
+# HTTPS_PROXY=http://127.0.0.1:7890
+```
+
+### 3. Create the database
+
+```sql
+CREATE DATABASE price_crawler CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
+```
+
+## Usage
+
+### Show help
+
+```bash
+python main.py --help
+```
+
+### Initialize the database
+
+```bash
+python main.py --mode init
+```
+
+### Crawl products
+
+#### Basic usage
+
+```bash
+# Crawl Taobao for the keyword 手机 (mobile phone)
+python main.py --mode crawl --platforms taobao --keywords 手机
+
+# Crawl Taobao and JD for 手机, 2 pages each
+python main.py --mode crawl --platforms taobao jd --keywords 手机 --pages 2
+
+# Crawl the 1688 wholesale platform for 服装 (clothing)
+python main.py --mode crawl --platforms alibaba1688 --keywords 服装 --pages 3
+
+# Multiple keywords
+python main.py --mode crawl --platforms taobao --keywords 手机 电脑 平板 --pages 1
+```
+
+#### Concurrent crawling
+
+```bash
+# Crawl multiple platforms concurrently with 3 worker threads
+python main.py --mode crawl --platforms taobao jd alibaba1688 --keywords 手机 --pages 2 --workers 3
+```
+
+#### Using cookies and proxies
+
+Because Taobao, JD, 1688, and similar platforms have strong anti-crawling measures, using a cookie or a proxy is recommended:
+
+```bash
+# With a cookie
+python main.py --mode crawl --platforms taobao --keywords 手机 --cookie "your_cookie_here"
+
+# With a proxy
+python main.py --mode crawl --platforms taobao --keywords 手机 --proxy "http://127.0.0.1:7890"
+
+# With both a cookie and a proxy
+python main.py --mode crawl --platforms taobao --keywords 手机 --cookie "your_cookie" --proxy "http://127.0.0.1:7890"
+```
+
+### Search products in the database
+
+```bash
+# Search all platforms for 手机
+python main.py --mode search --keyword 手机
+
+# Restrict the search to one platform
+python main.py --mode search --keyword 手机 --platform taobao
+
+# Limit the number of results
+python main.py --mode search --keyword 手机 --limit 20
+```
+
+### Show statistics
+
+```bash
+python main.py --mode stats
+```
+
+## Database Schema
+
+### products table
+
+| Field | Type | Description |
+|------|------|------|
+| id | Integer | Primary key |
+| product_id | String(100) | Platform product ID |
+| name | String(500) | Product name |
+| url | Text | Product URL |
+| image_url | Text | Product image URL |
+| shop_name | String(200) | Shop name |
+| platform | String(50) | Platform: taobao/jd/alibaba1688 |
+| is_wholesale | Integer | Wholesale flag: 0 = no, 1 = yes |
+| min_order_quantity | Integer | Minimum order quantity (wholesale) |
+| current_price | Float | Current price |
+| original_price | Float | Original price |
+| sales_volume | Integer | Sales volume |
+| create_time | DateTime | Created at |
+| update_time | DateTime | Updated at |
+| crawl_time | DateTime | Crawled at |
+
+### price_history table
+
+| Field | Type | Description |
+|------|------|------|
+| id | Integer | Primary key |
+| product_id | String(100) | Product ID |
+| price | Float | Price |
+| original_price | Float | Original price |
+| platform | String(50) | Platform |
+| price_type | String(50) | Price type: retail/wholesale |
+| min_quantity | Integer | Minimum order quantity |
+| crawl_time | DateTime | Crawled at |
+
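+The SQLAlchemy models in `models/product.py` follow this schema. A minimal sketch of what such models can look like (the committed definitions may differ in detail):
+
+```python
+from sqlalchemy import Column, DateTime, Float, Integer, String, Text
+from sqlalchemy.orm import declarative_base
+
+Base = declarative_base()
+
+
+class Product(Base):
+    __tablename__ = 'products'
+
+    id = Column(Integer, primary_key=True)
+    product_id = Column(String(100), index=True)   # platform product ID
+    name = Column(String(500))
+    url = Column(Text)
+    image_url = Column(Text)
+    shop_name = Column(String(200))
+    platform = Column(String(50))                  # taobao / jd / alibaba1688
+    is_wholesale = Column(Integer, default=0)      # 0 = no, 1 = yes
+    min_order_quantity = Column(Integer, nullable=True)
+    current_price = Column(Float)
+    original_price = Column(Float, nullable=True)
+    sales_volume = Column(Integer, nullable=True)
+    create_time = Column(DateTime)
+    update_time = Column(DateTime)
+    crawl_time = Column(DateTime)
+
+
+class PriceHistory(Base):
+    __tablename__ = 'price_history'
+
+    id = Column(Integer, primary_key=True)
+    product_id = Column(String(100), index=True)
+    price = Column(Float)
+    original_price = Column(Float, nullable=True)
+    platform = Column(String(50))
+    price_type = Column(String(50))                # retail / wholesale
+    min_quantity = Column(Integer, nullable=True)
+    crawl_time = Column(DateTime)
+```
+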
+## Platform Notes
+
+### Taobao
+- Search and product-detail retrieval
+- A cookie should be configured for better crawling results
+- Prices are retail prices
+
+### JD
+- Search and product-detail retrieval
+- Has a dedicated price API endpoint
+- Prices are retail prices
+
+### 1688 Wholesale Platform (Alibaba1688)
+- Search and product-detail retrieval
+- **Key feature: wholesale price tiers** (the price depends on the order quantity)
+- Minimum order quantity retrieval
+- Prices are wholesale prices
+
+## Code Examples
+
+### Basic usage
+
+```python
+from crawlers.taobao import TaobaoCrawler
+from crawlers.jd import JdCrawler
+from crawlers.alibaba1688 import Alibaba1688Crawler
+
+# Taobao crawler
+with TaobaoCrawler(cookie='your_cookie', proxy='http://127.0.0.1:7890') as crawler:
+    products = crawler.search('手机', page=1)
+    for product in products:
+        print(f"{product['name']}: ¥{product['price']}")
+
+# 1688 wholesale crawler
+with Alibaba1688Crawler() as crawler:
+    products = crawler.search('服装', page=1)
+    for product in products:
+        print(f"{product['name']}")
+        print(f"  Min order qty: {product.get('min_order_quantity', 'N/A')}")
+        print(f"  Price tiers: {product.get('price_ranges', [])}")
+```
+
+### Batch crawl and save to the database
+
+```python
+from main import run_crawl, init_database
+
+# Initialize the database
+init_database()
+
+# Crawl multiple platforms concurrently
+results = run_crawl(
+    platforms=['taobao', 'jd', 'alibaba1688'],
+    keywords=['手机', '电脑'],
+    pages=2,
+    concurrent=True,
+    max_workers=3
+)
+
+print(f"Taobao: {len(results.get('taobao', []))} products")
+print(f"JD: {len(results.get('jd', []))} products")
+print(f"1688: {len(results.get('alibaba1688', []))} products")
+```
+
+## Notes
+
+1. **Anti-crawling measures**: Taobao, JD, and 1688 all employ strong anti-crawling measures. It is recommended to:
+   - configure a valid cookie
+   - use a pool of proxy IPs
+   - space requests with reasonable intervals
+   - avoid large bursts of requests in a short time
+
+2. **Obtaining cookies**: log in to the platform, then copy the cookie from the browser's developer tools.
+
+3. **Rate control**: a random-delay mechanism is built in (a sketch of such a helper appears below), but crawl frequency should still be kept moderate to avoid bans.
+
+4. **Data freshness**: prices can change at any time; re-crawl periodically to keep them current.
+
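+The built-in random delay mentioned above is provided by `utils/helpers.py` (part of this commit, not shown here). A minimal sketch of such a helper, for illustration only; the committed implementation may differ:
+
+```python
+import random
+import time
+
+
+def random_delay(min_seconds: float = 1.0, max_seconds: float = 3.0) -> float:
+    """Sleep for a random interval and return the delay actually used."""
+    delay = random.uniform(min_seconds, max_seconds)
+    time.sleep(delay)
+    return delay
+```
+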
+## License
+
+MIT License

+ 1 - 0
config/__init__.py

@@ -0,0 +1 @@
+from .settings import DATABASE_CONFIG, CRAWLER_CONFIG, LOGGING_CONFIG

+ 91 - 0
config/settings.py

@@ -0,0 +1,91 @@
+import os
+from dotenv import load_dotenv
+
+load_dotenv()
+
+DATABASE_CONFIG = {
+    'host': os.getenv('DB_HOST', 'localhost'),
+    'port': int(os.getenv('DB_PORT', 3306)),
+    'user': os.getenv('DB_USER', 'root'),
+    'password': os.getenv('DB_PASSWORD', ''),
+    'database': os.getenv('DB_NAME', 'price_crawler'),
+    'charset': 'utf8mb4'
+}
+
+CRAWLER_CONFIG = {
+    'taobao': {
+        'base_url': 'https://s.taobao.com',
+        'search_url': 'https://s.taobao.com/search',
+        'headers': {
+            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
+            'Referer': 'https://www.taobao.com',
+            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
+            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
+        },
+        'timeout': 30,
+        'retry_times': 3,
+        'retry_delay': 2,
+        'delay_range': (1, 3),
+    },
+    'jd': {
+        'base_url': 'https://search.jd.com',
+        'search_url': 'https://search.jd.com/Search',
+        'headers': {
+            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
+            'Referer': 'https://www.jd.com',
+            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
+            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
+        },
+        'timeout': 30,
+        'retry_times': 3,
+        'retry_delay': 2,
+        'delay_range': (1, 3),
+    },
+    'alibaba1688': {
+        'base_url': 'https://s.1688.com',
+        'search_url': 'https://s.1688.com/selloffer/offer_search.htm',
+        'headers': {
+            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
+            'Referer': 'https://www.1688.com',
+            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
+            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
+        },
+        'timeout': 30,
+        'retry_times': 3,
+        'retry_delay': 2,
+        'delay_range': (1, 3),
+    }
+}
+
+LOGGING_CONFIG = {
+    'version': 1,
+    'disable_existing_loggers': False,
+    'formatters': {
+        'standard': {
+            'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+        },
+    },
+    'handlers': {
+        'console': {
+            'class': 'logging.StreamHandler',
+            'level': 'INFO',
+            'formatter': 'standard'
+        },
+        'file': {
+            'class': 'logging.handlers.RotatingFileHandler',
+            'level': 'INFO',
+            'formatter': 'standard',
+            'filename': 'logs/crawler.log',
+            'maxBytes': 1024*1024*10,
+            'backupCount': 5,
+            'encoding': 'utf-8'
+        }
+    },
+    'loggers': {
+        '': {
+            'handlers': ['console', 'file'],
+            'level': 'INFO',
+            'propagate': True
+        }
+    }
+}

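Note that `LOGGING_CONFIG` above routes a `RotatingFileHandler` to `logs/crawler.log`, so the `logs/` directory must exist before the configuration is applied. A hedged sketch of how a caller (for example `main.py`, not shown in this section) might apply it:

```python
import logging
import logging.config
import os

from config.settings import LOGGING_CONFIG

# The rotating file handler writes to logs/crawler.log; dictConfig raises if the
# directory is missing, so create it before applying the configuration.
os.makedirs('logs', exist_ok=True)
logging.config.dictConfig(LOGGING_CONFIG)

logger = logging.getLogger(__name__)
logger.info("Logging configured")
```
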
+ 4 - 0
crawlers/__init__.py

@@ -0,0 +1,4 @@
+from .base import BaseCrawler
+from .taobao import TaobaoCrawler
+from .jd import JdCrawler
+from .alibaba1688 import Alibaba1688Crawler

+ 453 - 0
crawlers/alibaba1688.py

@@ -0,0 +1,453 @@
+import re
+import json
+import logging
+from typing import Dict, List, Optional, Any, Tuple
+from urllib.parse import urlencode, quote, urlparse, parse_qs
+
+from .base import BaseCrawler
+from utils.helpers import clean_price, parse_sales_volume, extract_numbers
+
+
+class Alibaba1688Crawler(BaseCrawler):
+    """
+    1688批发平台爬虫
+    重点关注批发价格、起订量等批发相关信息
+    """
+    
+    platform = 'alibaba1688'
+    
+    def __init__(self, cookie: str = None, proxy: str = None):
+        """
+        初始化1688爬虫
+        :param cookie: 1688登录后的 Cookie 字符串
+        :param proxy: 代理服务器地址
+        """
+        super().__init__()
+        self.cookie = cookie
+        self.proxy = proxy
+        
+        if cookie:
+            self.headers['Cookie'] = cookie
+        
+        if proxy:
+            self.session.proxies = {
+                'http': proxy,
+                'https': proxy
+            }
+    
+    def search(self, keyword: str, page: int = 1, sort: str = 'default', 
+               price_range: Tuple[float, float] = None, **kwargs) -> List[Dict[str, Any]]:
+        """
+        搜索1688商品
+        :param keyword: 搜索关键词
+        :param page: 页码,从1开始
+        :param sort: 排序方式: 
+            - default (综合)
+            - va (销量)
+            - price_asc (价格升序)
+            - price_desc (价格降序)
+            - bookTime (最新发布)
+        :param price_range: 价格范围 (min_price, max_price)
+        :return: 商品列表
+        """
+        self.logger.info(f"搜索1688商品: keyword={keyword}, page={page}")
+        
+        params = {
+            'keywords': keyword,
+            'pageSize': 40,
+            'beginPage': page,
+        }
+        
+        if sort and sort != 'default':
+            params['sortType'] = sort
+        
+        if price_range:
+            min_price, max_price = price_range
+            if min_price:
+                params['filtPriceMin'] = min_price
+            if max_price:
+                params['filtPriceMax'] = max_price
+        
+        search_url = f"{self.config.get('search_url')}?{urlencode(params)}"
+        
+        self.logger.debug(f"搜索URL: {search_url}")
+        
+        response = self._make_request('GET', search_url)
+        
+        if not response:
+            self.logger.warning(f"搜索请求失败: {keyword}")
+            return []
+        
+        products = self._parse_search_result(response.text, search_url)
+        
+        self.logger.info(f"从搜索结果中解析到 {len(products)} 个商品")
+        
+        return products
+    
+    def _parse_search_result(self, html: str, source_url: str) -> List[Dict[str, Any]]:
+        """
+        解析1688搜索结果页面
+        """
+        products = []
+        
+        json_pattern = r'window\.pageData\s*=\s*({.*?});\s*</script>'
+        json_match = re.search(json_pattern, html, re.DOTALL)
+        
+        if json_match:
+            try:
+                page_data = json.loads(json_match.group(1))
+                items = page_data.get('data', {}).get('searchList', []) or \
+                        page_data.get('data', {}).get('offerList', []) or \
+                        page_data.get('offerList', [])
+                
+                for item in items:
+                    product = self._parse_json_item(item, source_url)
+                    if product:
+                        products.append(product)
+                        
+            except json.JSONDecodeError as e:
+                self.logger.error(f"解析pageData失败: {e}")
+        
+        if not products:
+            products = self._parse_html_directly(html, source_url)
+        
+        return products
+    
+    def _parse_json_item(self, item: Dict, source_url: str) -> Optional[Dict[str, Any]]:
+        """
+        解析JSON格式的商品数据
+        """
+        try:
+            offer_id = item.get('offerId', '') or item.get('id', '')
+            if not offer_id:
+                return None
+            
+            title = item.get('subject', '') or item.get('title', '')
+            title = title.strip()
+            if not title:
+                return None
+            
+            price_info = self._extract_price_info(item)
+            
+            image_url = item.get('imgUrl', '') or item.get('imageUrl', '')
+            if image_url and not image_url.startswith('http'):
+                image_url = 'https:' + image_url
+            
+            detail_url = item.get('detailUrl', '') or f"https://detail.1688.com/offer/{offer_id}.html"
+            if detail_url and not detail_url.startswith('http'):
+                detail_url = 'https:' + detail_url
+            
+            shop_name = item.get('companyName', '') or item.get('shopName', '')
+            
+            sales_str = item.get('soldQuantity', '') or item.get('salesNum', '') or str(item.get('bookedCount', ''))
+            sales_volume = parse_sales_volume(str(sales_str)) if sales_str else None
+            
+            min_order = item.get('minOrder', '') or item.get('startQuantity', '')
+            min_order_quantity = self._parse_quantity(min_order)
+            
+            unit = item.get('unit', '') or item.get('quantityUnit', '件')
+            
+            is_wholesale = True
+            
+            return {
+                'product_id': str(offer_id),
+                'name': title,
+                'price': price_info.get('min_price', 0),
+                'original_price': price_info.get('original_price'),
+                'price_ranges': price_info.get('price_ranges', []),
+                'image_url': image_url,
+                'url': detail_url,
+                'shop_name': shop_name,
+                'sales_volume': sales_volume,
+                'min_order_quantity': min_order_quantity,
+                'unit': unit,
+                'source_url': source_url,
+                'is_wholesale': is_wholesale,
+                'currency': 'CNY'
+            }
+            
+        except Exception as e:
+            self.logger.error(f"解析JSON商品失败: {e}")
+            return None
+    
+    def _extract_price_info(self, item: Dict) -> Dict[str, Any]:
+        """
+        从商品数据中提取价格信息
+        1688的价格通常是区间价格,根据起订量不同价格不同
+        """
+        price_info = {
+            'min_price': 0,
+            'max_price': 0,
+            'original_price': None,
+            'price_ranges': []
+        }
+        
+        price_ranges = item.get('priceRanges', []) or item.get('priceRangeList', [])
+        
+        if price_ranges:
+            prices = []
+            for pr in price_ranges:
+                price = clean_price(str(pr.get('price', '0')))
+                min_quantity = pr.get('quantity', 0) or pr.get('startQuantity', 0)
+                max_quantity = pr.get('endQuantity')
+                
+                prices.append(price)
+                price_info['price_ranges'].append({
+                    'min_quantity': min_quantity,
+                    'max_quantity': max_quantity,
+                    'price': price
+                })
+            
+            if prices:
+                price_info['min_price'] = min(prices)
+                price_info['max_price'] = max(prices)
+        else:
+            price_str = item.get('price', '') or item.get('displayPrice', '')
+            if price_str:
+                price = clean_price(str(price_str))
+                price_info['min_price'] = price
+                price_info['max_price'] = price
+        
+        original_price_str = item.get('originalPrice', '') or item.get('marketPrice', '')
+        if original_price_str:
+            price_info['original_price'] = clean_price(str(original_price_str))
+        
+        return price_info
+    
+    def _parse_quantity(self, quantity_str: Any) -> Optional[int]:
+        """
+        解析数量字符串
+        """
+        if not quantity_str:
+            return None
+        
+        quantity_str = str(quantity_str).strip()
+        numbers = extract_numbers(quantity_str)
+        if numbers:
+            return int(numbers[0])
+        return None
+    
+    def _parse_html_directly(self, html: str, source_url: str) -> List[Dict[str, Any]]:
+        """
+        直接从HTML解析商品(备用方法)
+        """
+        products = []
+        
+        offer_pattern = r'data-offerid="(\d+)"[^>]*>(.*?)</div>\s*</div>\s*</div>'
+        offers = re.findall(offer_pattern, html, re.DOTALL | re.IGNORECASE)
+        
+        for offer_id, offer_html in offers:
+            try:
+                title_match = re.search(r'<a[^>]*title="([^"]+)"', offer_html) or \
+                             re.search(r'<div[^>]*class="[^"]*title[^"]*"[^>]*>.*?<a[^>]*>(.*?)</a>', offer_html, re.DOTALL)
+                title = ''
+                if title_match:
+                    title = title_match.group(1)
+                    title = re.sub(r'<[^>]+>', '', title).strip()
+                
+                if not title:
+                    continue
+                
+                price_match = re.search(r'<div[^>]*class="[^"]*price[^"]*"[^>]*>.*?<span[^>]*>([\d¥.,]+)</span>', offer_html, re.DOTALL | re.IGNORECASE) or \
+                             re.search(r'¥([\d.]+)', offer_html)
+                price = clean_price(price_match.group(1)) if price_match else 0
+                
+                img_match = re.search(r'<img[^>]*data-src="([^"]+)"', offer_html) or \
+                           re.search(r'<img[^>]*src="([^"]+)"', offer_html)
+                img_url = img_match.group(1) if img_match else ''
+                if img_url and not img_url.startswith('http'):
+                    img_url = 'https:' + img_url
+                
+                shop_match = re.search(r'<div[^>]*class="[^"]*company[^"]*"[^>]*>.*?<a[^>]*>(.*?)</a>', offer_html, re.DOTALL | re.IGNORECASE)
+                shop_name = shop_match.group(1).strip() if shop_match else ''
+                shop_name = re.sub(r'<[^>]+>', '', shop_name)
+                
+                sales_match = re.search(r'<span[^>]*class="[^"]*sales[^"]*"[^>]*>([^<]+)</span>', offer_html, re.IGNORECASE) or \
+                             re.search(r'成交量[::]\s*([\d万+]+)', offer_html)
+                sales_str = sales_match.group(1) if sales_match else ''
+                sales_volume = parse_sales_volume(sales_str)
+                
+                min_order_match = re.search(r'起订量?[::]\s*([\d]+)', offer_html) or \
+                                 re.search(r'<span[^>]*class="[^"]*moq[^"]*"[^>]*>([^<]+)</span>', offer_html, re.IGNORECASE)
+                min_order_quantity = self._parse_quantity(min_order_match.group(1)) if min_order_match else None
+                
+                detail_url = f"https://detail.1688.com/offer/{offer_id}.html"
+                
+                products.append({
+                    'product_id': str(offer_id),
+                    'name': title,
+                    'price': price,
+                    'image_url': img_url,
+                    'url': detail_url,
+                    'shop_name': shop_name,
+                    'sales_volume': sales_volume,
+                    'min_order_quantity': min_order_quantity,
+                    'source_url': source_url,
+                    'is_wholesale': True,
+                    'currency': 'CNY'
+                })
+                
+            except Exception as e:
+                self.logger.error(f"解析HTML商品失败: {e}")
+                continue
+        
+        return products
+    
+    def get_product_detail(self, product_id: str, **kwargs) -> Optional[Dict[str, Any]]:
+        """
+        获取1688商品详情
+        :param product_id: 商品ID (offerId)
+        :return: 商品详情
+        """
+        self.logger.info(f"获取1688商品详情: product_id={product_id}")
+        
+        detail_url = f"https://detail.1688.com/offer/{product_id}.html"
+        
+        response = self._make_request('GET', detail_url)
+        
+        if not response:
+            self.logger.warning(f"获取商品详情失败: {product_id}")
+            return None
+        
+        return self._parse_product_detail(response.text, product_id, detail_url)
+    
+    def _parse_product_detail(self, html: str, product_id: str, source_url: str) -> Optional[Dict[str, Any]]:
+        """
+        解析商品详情页面
+        """
+        try:
+            title_match = re.search(r'<title>([^<]+)</title>', html)
+            title = title_match.group(1).split('-')[0].strip() if title_match else ''
+            
+            json_pattern = r'window\.iDetailData\s*=\s*({.*?});\s*</script>'
+            json_match = re.search(json_pattern, html, re.DOTALL)
+            
+            price_info = {'min_price': 0, 'price_ranges': []}
+            shop_name = ''
+            min_order_quantity = None
+            unit = '件'
+            sales_volume = None
+            
+            if json_match:
+                try:
+                    detail_data = json.loads(json_match.group(1))
+                    
+                    if not title:
+                        title = detail_data.get('subject', '') or detail_data.get('title', '')
+                    
+                    price_module = detail_data.get('price', {})
+                    if price_module:
+                        price_ranges = price_module.get('priceRanges', [])
+                        if price_ranges:
+                            prices = []
+                            for pr in price_ranges:
+                                price = clean_price(str(pr.get('price', '0')))
+                                min_qty = pr.get('quantity', 0)
+                                max_qty = pr.get('endQuantity')
+                                prices.append(price)
+                                price_info['price_ranges'].append({
+                                    'min_quantity': min_qty,
+                                    'max_quantity': max_qty,
+                                    'price': price
+                                })
+                            if prices:
+                                price_info['min_price'] = min(prices)
+                        else:
+                            price_str = price_module.get('showPrice', '') or price_module.get('price', '')
+                            if price_str:
+                                price_info['min_price'] = clean_price(str(price_str))
+                    
+                    shop_name = detail_data.get('companyName', '') or detail_data.get('shopName', '')
+                    
+                    sales_data = detail_data.get('trade', {})
+                    sales_volume = sales_data.get('soldQuantity') or sales_data.get('totalSoldQuantity')
+                    
+                    moq_data = detail_data.get('moq', {})
+                    min_order_quantity = moq_data.get('minOrderQuantity')
+                    unit = moq_data.get('unit', '件')
+                    
+                except json.JSONDecodeError as e:
+                    self.logger.error(f"解析商品详情JSON失败: {e}")
+            
+            if price_info['min_price'] <= 0:
+                price_match = re.search(r'["\']price["\']\s*:\s*["\']?([\d.]+)["\']?', html) or \
+                             re.search(r'¥([\d.]+)', html)
+                if price_match:
+                    price_info['min_price'] = clean_price(price_match.group(1))
+            
+            return {
+                'product_id': str(product_id),
+                'name': title,
+                'price': price_info['min_price'],
+                'price_ranges': price_info.get('price_ranges', []),
+                'url': source_url,
+                'shop_name': shop_name,
+                'sales_volume': sales_volume,
+                'min_order_quantity': min_order_quantity,
+                'unit': unit,
+                'is_wholesale': True,
+                'currency': 'CNY'
+            }
+            
+        except Exception as e:
+            self.logger.error(f"解析商品详情失败: {e}")
+            return None
+    
+    def get_price(self, product_id: str, **kwargs) -> Optional[Dict[str, Any]]:
+        """
+        获取商品价格
+        :param product_id: 商品ID
+        :return: 价格信息
+        """
+        detail = self.get_product_detail(product_id, **kwargs)
+        if detail:
+            return {
+                'product_id': str(product_id),
+                'price': detail.get('price', 0),
+                'original_price': detail.get('original_price'),
+                'currency': 'CNY',
+                'platform': self.platform,
+                'source_url': detail.get('url', ''),
+                'price_type': 'wholesale',
+                'price_ranges': detail.get('price_ranges', []),
+                'min_quantity': detail.get('min_order_quantity')
+            }
+        return None
+    
+    def get_wholesale_prices(self, product_id: str, **kwargs) -> List[Dict[str, Any]]:
+        """
+        获取批发价格区间
+        :param product_id: 商品ID
+        :return: 价格区间列表
+        """
+        detail = self.get_product_detail(product_id, **kwargs)
+        if detail:
+            price_ranges = detail.get('price_ranges', [])
+            if price_ranges:
+                return price_ranges
+            
+            price = detail.get('price', 0)
+            min_qty = detail.get('min_order_quantity', 1)
+            return [{
+                'min_quantity': min_qty,
+                'max_quantity': None,
+                'price': price
+            }]
+        return []
+    
+    def set_cookie(self, cookie: str):
+        """
+        设置 Cookie
+        """
+        self.cookie = cookie
+        self.headers['Cookie'] = cookie
+    
+    def set_proxy(self, proxy: str):
+        """
+        设置代理
+        """
+        self.proxy = proxy
+        self.session.proxies = {
+            'http': proxy,
+            'https': proxy
+        }

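The wholesale price tiers returned by `get_wholesale_prices` above are easiest to see from the caller's side. A short usage sketch; the offer ID is a placeholder, and live results depend on cookies, proxies, and 1688's anti-crawling measures:

```python
from crawlers.alibaba1688 import Alibaba1688Crawler

# '1234567890' below is a placeholder offer ID, not a real listing.
with Alibaba1688Crawler() as crawler:
    tiers = crawler.get_wholesale_prices('1234567890')
    for tier in tiers:
        upper = tier['max_quantity'] or 'no upper bound'
        print(f"{tier['min_quantity']}+ units: {tier['price']} CNY (up to {upper})")
```
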
+ 259 - 0
crawlers/base.py

@@ -0,0 +1,259 @@
+import logging
+import time
+from abc import ABC, abstractmethod
+from typing import Dict, List, Optional, Any
+from datetime import datetime
+
+import requests
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
+
+from config.settings import CRAWLER_CONFIG
+from utils.helpers import retry, random_delay, clean_price
+
+
+class BaseCrawler(ABC):
+    """
+    爬虫基类
+    所有平台爬虫都应该继承这个类
+    """
+    
+    platform: str = ''
+    
+    def __init__(self):
+        self.logger = logging.getLogger(__name__)
+        self.config = CRAWLER_CONFIG.get(self.platform, {})
+        self.headers = self.config.get('headers', {})
+        self.timeout = self.config.get('timeout', 30)
+        self.retry_times = self.config.get('retry_times', 3)
+        self.retry_delay = self.config.get('retry_delay', 2)
+        self.delay_range = self.config.get('delay_range', (1, 3))
+        
+        self.session = self._create_session()
+        
+    def _create_session(self) -> requests.Session:
+        """
+        创建带有重试机制的会话
+        """
+        session = requests.Session()
+        session.headers.update(self.headers)
+        
+        retry_strategy = Retry(
+            total=self.retry_times,
+            backoff_factor=self.retry_delay,
+            status_forcelist=[429, 500, 502, 503, 504],
+            allowed_methods=["GET", "POST"]
+        )
+        
+        adapter = HTTPAdapter(max_retries=retry_strategy)
+        session.mount("http://", adapter)
+        session.mount("https://", adapter)
+        
+        return session
+    
+    def _make_request(self, method: str, url: str, params: Optional[Dict] = None,
+                      data: Optional[Dict] = None, json: Optional[Dict] = None,
+                      headers: Optional[Dict] = None) -> Optional[requests.Response]:
+        """
+        发送HTTP请求
+        :param method: HTTP方法 GET/POST
+        :param url: 请求URL
+        :param params: URL参数
+        :param data: 表单数据
+        :param json: JSON数据
+        :param headers: 额外的请求头
+        :return: Response对象或None
+        """
+        try:
+            request_headers = self.headers.copy()
+            if headers:
+                request_headers.update(headers)
+            
+            self.logger.debug(f"发送 {method} 请求: {url}")
+            
+            if method.upper() == 'GET':
+                response = self.session.get(
+                    url, 
+                    params=params, 
+                    headers=request_headers,
+                    timeout=self.timeout
+                )
+            else:
+                response = self.session.post(
+                    url, 
+                    params=params,
+                    data=data,
+                    json=json,
+                    headers=request_headers,
+                    timeout=self.timeout
+                )
+            
+            response.raise_for_status()
+            return response
+            
+        except requests.exceptions.RequestException as e:
+            self.logger.error(f"请求失败: {url} - {e}")
+            return None
+    
+    def _delay(self, min_seconds: float = None, max_seconds: float = None) -> float:
+        """
+        随机延迟
+        :param min_seconds: 最小延迟时间,默认使用配置中的值
+        :param max_seconds: 最大延迟时间,默认使用配置中的值
+        :return: 实际延迟时间
+        """
+        min_sec = min_seconds if min_seconds is not None else self.delay_range[0]
+        max_sec = max_seconds if max_seconds is not None else self.delay_range[1]
+        return random_delay(min_sec, max_sec)
+    
+    @abstractmethod
+    def search(self, keyword: str, **kwargs) -> List[Dict[str, Any]]:
+        """
+        搜索商品(抽象方法,子类必须实现)
+        :param keyword: 搜索关键词
+        :param kwargs: 其他参数
+        :return: 商品列表
+        """
+        pass
+    
+    @abstractmethod
+    def get_product_detail(self, product_id: str, **kwargs) -> Optional[Dict[str, Any]]:
+        """
+        获取商品详情(抽象方法,子类必须实现)
+        :param product_id: 商品ID
+        :param kwargs: 其他参数
+        :return: 商品详情
+        """
+        pass
+    
+    @abstractmethod
+    def get_price(self, product_id: str, **kwargs) -> Optional[Dict[str, Any]]:
+        """
+        获取商品价格(抽象方法,子类必须实现)
+        :param product_id: 商品ID
+        :param kwargs: 其他参数
+        :return: 价格信息
+        """
+        pass
+    
+    def crawl_products(self, keywords: List[str], save_to_db: bool = True, **kwargs) -> List[Dict[str, Any]]:
+        """
+        爬取多个关键词的商品
+        :param keywords: 关键词列表
+        :param save_to_db: 是否保存到数据库
+        :param kwargs: 其他参数
+        :return: 所有爬取的商品列表
+        """
+        all_products = []
+        
+        for keyword in keywords:
+            self.logger.info(f"开始搜索关键词: {keyword}")
+            
+            products = self.search(keyword, **kwargs)
+            self.logger.info(f"搜索 '{keyword}' 找到 {len(products)} 个商品")
+            
+            for product in products:
+                product['keyword'] = keyword
+                product['crawl_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+                all_products.append(product)
+            
+            self._delay()
+        
+        if save_to_db:
+            self._save_products_to_db(all_products)
+        
+        return all_products
+    
+    def _save_products_to_db(self, products: List[Dict[str, Any]]) -> int:
+        """
+        保存商品到数据库
+        :param products: 商品列表
+        :return: 成功保存的数量
+        """
+        try:
+            from utils.db_utils import DBUtils
+            
+            db = DBUtils()
+            success_count = 0
+            
+            for product in products:
+                product_data = self._prepare_product_data(product)
+                db_product = db.add_product(product_data)
+                
+                if db_product:
+                    price_data = self._prepare_price_data(product)
+                    if price_data:
+                        db.add_price_history(price_data)
+                    success_count += 1
+            
+            self.logger.info(f"成功保存 {success_count}/{len(products)} 个商品到数据库")
+            return success_count
+            
+        except Exception as e:
+            self.logger.error(f"保存商品到数据库失败: {e}")
+            return 0
+    
+    def _prepare_product_data(self, product: Dict[str, Any]) -> Dict[str, Any]:
+        """
+        准备商品数据,转换为数据库格式
+        :param product: 原始商品数据
+        :return: 格式化后的商品数据
+        """
+        return {
+            'product_id': str(product.get('product_id', '')),
+            'name': product.get('name', ''),
+            'url': product.get('url', ''),
+            'image_url': product.get('image_url', ''),
+            'shop_name': product.get('shop_name', ''),
+            'platform': self.platform,
+            'category': product.get('category', ''),
+            'brand': product.get('brand', ''),
+            'is_wholesale': 1 if product.get('is_wholesale', False) else 0,
+            'min_order_quantity': product.get('min_order_quantity'),
+            'unit': product.get('unit', ''),
+            'current_price': clean_price(str(product.get('price', 0))),
+            'original_price': clean_price(str(product.get('original_price', 0))) if product.get('original_price') else None,
+            'currency': product.get('currency', 'CNY'),
+            'sales_volume': product.get('sales_volume'),
+            'rating': product.get('rating'),
+            'review_count': product.get('review_count'),
+            'stock': product.get('stock'),
+            'description': product.get('description', ''),
+            'crawl_time': datetime.now()
+        }
+    
+    def _prepare_price_data(self, product: Dict[str, Any]) -> Optional[Dict[str, Any]]:
+        """
+        准备价格历史数据
+        :param product: 商品数据
+        :return: 价格数据字典或None
+        """
+        price = clean_price(str(product.get('price', 0)))
+        if price <= 0:
+            return None
+        
+        return {
+            'product_id': str(product.get('product_id', '')),
+            'price': price,
+            'original_price': clean_price(str(product.get('original_price', 0))) if product.get('original_price') else None,
+            'currency': product.get('currency', 'CNY'),
+            'platform': self.platform,
+            'source_url': product.get('url', ''),
+            'crawl_time': datetime.now(),
+            'price_type': 'wholesale' if product.get('is_wholesale', False) else 'retail',
+            'min_quantity': product.get('min_order_quantity'),
+            'max_quantity': product.get('max_order_quantity')
+        }
+    
+    def close(self):
+        """
+        关闭会话
+        """
+        if hasattr(self, 'session'):
+            self.session.close()
+    
+    def __enter__(self):
+        return self
+    
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()

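The three abstract methods above define the contract each platform module fills in. A minimal sketch of a new platform crawler built on `BaseCrawler`; the platform key is hypothetical and is not part of this commit:

```python
from typing import Any, Dict, List, Optional

from crawlers.base import BaseCrawler


class ExampleCrawler(BaseCrawler):
    """Hypothetical crawler illustrating the contract defined by BaseCrawler."""

    # Hypothetical platform key; a real crawler would also add an entry to
    # CRAWLER_CONFIG in config/settings.py so headers and timeouts are picked up.
    platform = 'example'

    def search(self, keyword: str, **kwargs) -> List[Dict[str, Any]]:
        # A real implementation would call self._make_request() and parse the response.
        return []

    def get_product_detail(self, product_id: str, **kwargs) -> Optional[Dict[str, Any]]:
        return None

    def get_price(self, product_id: str, **kwargs) -> Optional[Dict[str, Any]]:
        return None
```
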
+ 387 - 0
crawlers/jd.py

@@ -0,0 +1,387 @@
+import re
+import json
+import logging
+from typing import Dict, List, Optional, Any
+from urllib.parse import urlencode, quote
+
+from .base import BaseCrawler
+from utils.helpers import clean_price, parse_sales_volume
+
+
+class JdCrawler(BaseCrawler):
+    """
+    京东平台爬虫
+    注意:京东有较强的反爬机制,可能需要配置 Cookie 或使用代理
+    """
+    
+    platform = 'jd'
+    
+    def __init__(self, cookie: str = None, proxy: str = None):
+        """
+        初始化京东爬虫
+        :param cookie: 京东登录后的 Cookie 字符串
+        :param proxy: 代理服务器地址
+        """
+        super().__init__()
+        self.cookie = cookie
+        self.proxy = proxy
+        
+        if cookie:
+            self.headers['Cookie'] = cookie
+        
+        if proxy:
+            self.session.proxies = {
+                'http': proxy,
+                'https': proxy
+            }
+    
+    def search(self, keyword: str, page: int = 1, sort: str = 'sort_totalsales15_desc', **kwargs) -> List[Dict[str, Any]]:
+        """
+        Search JD products
+        :param keyword: search keyword
+        :param page: page number, starting from 1
+        :param sort: sort order:
+            - sort_totalsales15_desc (sales volume)
+            - sort_price_asc (price ascending)
+            - sort_price_desc (price descending)
+            - sort_discount_desc (discount)
+        :return: list of products
+        """
+        self.logger.info(f"搜索京东商品: keyword={keyword}, page={page}")
+        
+        params = {
+            'keyword': keyword,
+            'wq': keyword,
+            'pvid': self._generate_pvid(),
+            'page': page,
+            's': (page - 1) * 30 + 1,
+        }
+        
+        if sort and sort != 'default':
+            params['psort'] = sort
+        
+        search_url = f"{self.config.get('search_url')}?{urlencode(params)}"
+        
+        self.logger.debug(f"搜索URL: {search_url}")
+        
+        response = self._make_request('GET', search_url)
+        
+        if not response:
+            self.logger.warning(f"搜索请求失败: {keyword}")
+            return []
+        
+        products = self._parse_search_result(response.text, search_url)
+        
+        if products:
+            self._fill_product_prices(products)
+        
+        self.logger.info(f"从搜索结果中解析到 {len(products)} 个商品")
+        
+        return products
+    
+    def _generate_pvid(self) -> str:
+        """
+        生成京东 pvid
+        """
+        import time
+        import random
+        timestamp = int(time.time() * 1000)
+        random_num = random.randint(100000, 999999)
+        return f"{timestamp}{random_num}"
+    
+    def _parse_search_result(self, html: str, source_url: str) -> List[Dict[str, Any]]:
+        """
+        解析京东搜索结果页面
+        """
+        products = []
+        
+        item_pattern = r'<li[^>]*data-sku="(\d+)"[^>]*>(.*?)</li>'
+        items = re.findall(item_pattern, html, re.DOTALL | re.IGNORECASE)
+        
+        self.logger.debug(f"找到 {len(items)} 个商品项")
+        
+        for sku_id, item_html in items:
+            try:
+                product = self._parse_item_html(sku_id, item_html, source_url)
+                if product:
+                    products.append(product)
+            except Exception as e:
+                self.logger.error(f"解析商品失败: {e}")
+                continue
+        
+        if not products:
+            products = self._parse_from_json(html, source_url)
+        
+        return products
+    
+    def _parse_item_html(self, sku_id: str, item_html: str, source_url: str) -> Optional[Dict[str, Any]]:
+        """
+        解析单个商品HTML
+        """
+        try:
+            title_match = re.search(r'<div[^>]*class="p-name[^"]*"[^>]*>.*?<a[^>]*>(.*?)</a>', item_html, re.DOTALL | re.IGNORECASE)
+            title = ''
+            if title_match:
+                title = title_match.group(1)
+                title = re.sub(r'<[^>]+>', '', title).strip()
+                title = re.sub(r'\s+', ' ', title)
+            
+            if not title:
+                return None
+            
+            price_match = re.search(r'<div[^>]*class="p-price[^"]*"[^>]*>.*?<i[^>]*>([\d.]*)</i>', item_html, re.DOTALL | re.IGNORECASE)
+            price = clean_price(price_match.group(1)) if price_match else 0
+            
+            shop_match = re.search(r'<div[^>]*class="p-shop[^"]*"[^>]*>.*?<a[^>]*>(.*?)</a>', item_html, re.DOTALL | re.IGNORECASE)
+            shop_name = ''
+            if shop_match:
+                shop_name = shop_match.group(1)
+                shop_name = re.sub(r'<[^>]+>', '', shop_name).strip()
+            
+            sales_match = re.search(r'<div[^>]*class="p-commit[^"]*"[^>]*>.*?<a[^>]*>([^<]+)</a>', item_html, re.DOTALL | re.IGNORECASE)
+            sales_str = sales_match.group(1) if sales_match else ''
+            sales_volume = parse_sales_volume(sales_str)
+            
+            img_match = re.search(r'<div[^>]*class="p-img[^"]*"[^>]*>.*?<img[^>]*data-lazy-img="([^"]+)"', item_html, re.DOTALL | re.IGNORECASE) or \
+                       re.search(r'<div[^>]*class="p-img[^"]*"[^>]*>.*?<img[^>]*src="([^"]+)"', item_html, re.DOTALL | re.IGNORECASE)
+            img_url = img_match.group(1) if img_match else ''
+            if img_url and not img_url.startswith('http'):
+                img_url = 'https:' + img_url
+            
+            detail_url = f"https://item.jd.com/{sku_id}.html"
+            
+            is_self = '京东自营' in shop_name or 'p-icons' in item_html and '自营' in item_html
+            
+            return {
+                'product_id': str(sku_id),
+                'name': title,
+                'price': price,
+                'image_url': img_url,
+                'url': detail_url,
+                'shop_name': shop_name,
+                'sales_volume': sales_volume,
+                'is_jd_self': is_self,
+                'source_url': source_url,
+                'is_wholesale': False,
+                'currency': 'CNY'
+            }
+            
+        except Exception as e:
+            self.logger.error(f"解析商品HTML失败: {e}")
+            return None
+    
+    def _parse_from_json(self, html: str, source_url: str) -> List[Dict[str, Any]]:
+        """
+        从页面中的 JSON 数据解析商品
+        """
+        products = []
+        
+        json_pattern = r'window\.__SEARCH_RESULT__\s*=\s*({.*?});'
+        json_match = re.search(json_pattern, html, re.DOTALL)
+        
+        if json_match:
+            try:
+                data = json.loads(json_match.group(1))
+                items = data.get('wareList', {}).get('wareInfo', [])
+                
+                for item in items:
+                    product = self._parse_json_item(item, source_url)
+                    if product:
+                        products.append(product)
+                        
+            except json.JSONDecodeError as e:
+                self.logger.error(f"解析JSON失败: {e}")
+        
+        return products
+    
+    def _parse_json_item(self, item: Dict, source_url: str) -> Optional[Dict[str, Any]]:
+        """
+        解析 JSON 格式的商品数据
+        """
+        try:
+            # 'wname' is the product title; the SKU comes from 'wareId'
+            sku_id = item.get('wareId', '')
+            if not sku_id:
+                return None
+            
+            title = item.get('wname', '').strip()
+            if not title:
+                return None
+            
+            price = clean_price(str(item.get('price', '0')))
+            original_price = clean_price(str(item.get('oprice', '0'))) or None
+            
+            img_url = item.get('imgurl', '')
+            if img_url and not img_url.startswith('http'):
+                img_url = 'https:' + img_url
+            
+            shop_name = item.get('goodShop', {}).get('shopName', '') if item.get('goodShop') else ''
+            
+            sales_str = item.get('reviews', '')
+            sales_volume = parse_sales_volume(sales_str)
+            
+            detail_url = f"https://item.jd.com/{sku_id}.html"
+            
+            return {
+                'product_id': str(sku_id),
+                'name': title,
+                'price': price,
+                'original_price': original_price,
+                'image_url': img_url,
+                'url': detail_url,
+                'shop_name': shop_name,
+                'sales_volume': sales_volume,
+                'source_url': source_url,
+                'is_wholesale': False,
+                'currency': 'CNY'
+            }
+            
+        except Exception as e:
+            self.logger.error(f"解析JSON商品失败: {e}")
+            return None
+    
+    def _fill_product_prices(self, products: List[Dict[str, Any]]):
+        """
+        批量获取商品价格(京东价格接口)
+        """
+        if not products:
+            return
+        
+        sku_ids = [p.get('product_id') for p in products if p.get('product_id') and p.get('price', 0) <= 0]
+        
+        if not sku_ids:
+            return
+        
+        self.logger.debug(f"批量获取 {len(sku_ids)} 个商品的价格")
+        
+        sku_str = ','.join([f'J_{sku}' for sku in sku_ids])
+        price_url = f"https://p.3.cn/prices/mgets?skuIds={sku_str}&type=1"
+        
+        response = self._make_request('GET', price_url, headers={
+            'Referer': 'https://www.jd.com'
+        })
+        
+        if response:
+            try:
+                price_data = response.json()
+                price_map = {}
+                for item in price_data:
+                    sku = item.get('id', '').replace('J_', '')
+                    price = clean_price(item.get('p', '0'))
+                    original_price = clean_price(item.get('op', '0')) or None
+                    price_map[sku] = {'price': price, 'original_price': original_price}
+                
+                for product in products:
+                    sku = product.get('product_id')
+                    if sku in price_map:
+                        if product.get('price', 0) <= 0:
+                            product['price'] = price_map[sku]['price']
+                        if not product.get('original_price'):
+                            product['original_price'] = price_map[sku]['original_price']
+                            
+            except json.JSONDecodeError as e:
+                self.logger.error(f"解析价格数据失败: {e}")
+    
+    def get_product_detail(self, product_id: str, **kwargs) -> Optional[Dict[str, Any]]:
+        """
+        获取京东商品详情
+        :param product_id: 商品ID (sku)
+        :return: 商品详情
+        """
+        self.logger.info(f"获取京东商品详情: product_id={product_id}")
+        
+        detail_url = f"https://item.jd.com/{product_id}.html"
+        
+        response = self._make_request('GET', detail_url)
+        
+        if not response:
+            self.logger.warning(f"获取商品详情失败: {product_id}")
+            return None
+        
+        return self._parse_product_detail(response.text, product_id, detail_url)
+    
+    def _parse_product_detail(self, html: str, product_id: str, source_url: str) -> Optional[Dict[str, Any]]:
+        """
+        解析商品详情页面
+        """
+        try:
+            title_match = re.search(r'<title>([^<]+)</title>', html) or \
+                         re.search(r'<div[^>]*class="sku-name[^"]*"[^>]*>(.*?)</div>', html, re.DOTALL | re.IGNORECASE)
+            title = ''
+            if title_match:
+                title = title_match.group(1)
+                title = re.sub(r'<[^>]+>', '', title).strip()
+                title = title.split('-')[0].strip() if '-' in title else title
+            
+            shop_match = re.search(r'<div[^>]*class="name[^"]*"[^>]*>.*?<a[^>]*>(.*?)</a>', html, re.DOTALL | re.IGNORECASE) or \
+                         re.search(r'shopName\s*:\s*"([^"]+)"', html)
+            shop_name = shop_match.group(1).strip() if shop_match else ''
+            
+            # _fill_product_prices mutates the dicts in place, so capture the filled price
+            price_stub = {'product_id': product_id, 'price': 0}
+            self._fill_product_prices([price_stub])
+            
+            return {
+                'product_id': str(product_id),
+                'name': title,
+                'price': price_stub.get('price', 0),
+                'url': source_url,
+                'shop_name': shop_name,
+                'is_wholesale': False,
+                'currency': 'CNY'
+            }
+            
+        except Exception as e:
+            self.logger.error(f"解析商品详情失败: {e}")
+            return None
+    
+    def get_price(self, product_id: str, **kwargs) -> Optional[Dict[str, Any]]:
+        """
+        获取商品价格
+        :param product_id: 商品ID
+        :return: 价格信息
+        """
+        self.logger.info(f"获取京东商品价格: product_id={product_id}")
+        
+        sku_str = f'J_{product_id}'
+        price_url = f"https://p.3.cn/prices/mgets?skuIds={sku_str}&type=1"
+        
+        response = self._make_request('GET', price_url, headers={
+            'Referer': 'https://www.jd.com'
+        })
+        
+        if response:
+            try:
+                price_data = response.json()
+                if price_data and len(price_data) > 0:
+                    item = price_data[0]
+                    price = clean_price(item.get('p', '0'))
+                    original_price = clean_price(item.get('op', '0')) or None
+                    
+                    return {
+                        'product_id': str(product_id),
+                        'price': price,
+                        'original_price': original_price,
+                        'currency': 'CNY',
+                        'platform': self.platform,
+                        'source_url': f"https://item.jd.com/{product_id}.html",
+                        'price_type': 'retail'
+                    }
+            except json.JSONDecodeError as e:
+                self.logger.error(f"解析价格数据失败: {e}")
+        
+        return None
+    
+    def set_cookie(self, cookie: str):
+        """
+        设置 Cookie
+        """
+        self.cookie = cookie
+        self.headers['Cookie'] = cookie
+    
+    def set_proxy(self, proxy: str):
+        """
+        设置代理
+        """
+        self.proxy = proxy
+        self.session.proxies = {
+            'http': proxy,
+            'https': proxy
+        }

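For completeness, a usage sketch of the JD price lookup above; the SKU is a placeholder, and the `p.3.cn` price endpoint may be rate-limited or unavailable without valid cookies:

```python
from crawlers.jd import JdCrawler

# '100012043978' is a placeholder SKU for illustration only.
with JdCrawler() as crawler:
    price_info = crawler.get_price('100012043978')
    if price_info:
        print(f"JD price: {price_info['price']} {price_info['currency']}")
    else:
        print("Price lookup failed (blocked, invalid SKU, or endpoint unavailable)")
```
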
+ 285 - 0
crawlers/taobao.py

@@ -0,0 +1,285 @@
+import re
+import json
+import logging
+from typing import Dict, List, Optional, Any
+from urllib.parse import urlencode, urlparse, parse_qs
+
+from .base import BaseCrawler
+from utils.helpers import clean_price, parse_sales_volume
+
+
+class TaobaoCrawler(BaseCrawler):
+    """
+    淘宝平台爬虫
+    注意:淘宝有较强的反爬机制,需要配置 Cookie 或使用代理
+    """
+    
+    platform = 'taobao'
+    
+    def __init__(self, cookie: str = None, proxy: str = None):
+        """
+        初始化淘宝爬虫
+        :param cookie: 淘宝登录后的 Cookie 字符串
+        :param proxy: 代理服务器地址,如 'http://127.0.0.1:7890'
+        """
+        super().__init__()
+        self.cookie = cookie
+        self.proxy = proxy
+        
+        if cookie:
+            self.headers['Cookie'] = cookie
+        
+        if proxy:
+            self.session.proxies = {
+                'http': proxy,
+                'https': proxy
+            }
+    
+    def search(self, keyword: str, page: int = 1, sort: str = 'default', **kwargs) -> List[Dict[str, Any]]:
+        """
+        搜索淘宝商品
+        :param keyword: 搜索关键词
+        :param page: 页码,从1开始
+        :param sort: 排序方式: default(综合), sale-desc(销量), price-asc(价格升序), price-desc(价格降序)
+        :return: 商品列表
+        """
+        self.logger.info(f"搜索淘宝商品: keyword={keyword}, page={page}")
+        
+        params = {
+            'q': keyword,
+            's': (page - 1) * 44,
+            'sort': sort
+        }
+        
+        search_url = f"{self.config.get('search_url')}?{urlencode(params)}"
+        
+        self.logger.debug(f"搜索URL: {search_url}")
+        
+        response = self._make_request('GET', search_url)
+        
+        if not response:
+            self.logger.warning(f"搜索请求失败: {keyword}")
+            return []
+        
+        products = self._parse_search_result(response.text, search_url)
+        
+        self.logger.info(f"从搜索结果中解析到 {len(products)} 个商品")
+        
+        return products
+    
+    def _parse_search_result(self, html: str, source_url: str) -> List[Dict[str, Any]]:
+        """
+        解析淘宝搜索结果页面
+        注意:淘宝搜索结果主要通过 JavaScript 渲染,需要从页面中的 JSON 数据提取
+        """
+        products = []
+        
+        g_page_config_match = re.search(r'g_page_config\s*=\s*({.*?});', html, re.DOTALL)
+        if g_page_config_match:
+            try:
+                g_page_config = json.loads(g_page_config_match.group(1))
+                auctions = g_page_config.get('mods', {}).get('itemlist', {}).get('data', {}).get('auctions', [])
+                
+                for auction in auctions:
+                    product = self._parse_auction_item(auction, source_url)
+                    if product:
+                        products.append(product)
+                        
+            except json.JSONDecodeError as e:
+                self.logger.error(f"解析 g_page_config 失败: {e}")
+        
+        if not products:
+            products = self._parse_html_directly(html, source_url)
+        
+        return products
+    
+    def _parse_auction_item(self, auction: Dict, source_url: str) -> Optional[Dict[str, Any]]:
+        """
+        解析单个拍卖商品数据
+        """
+        try:
+            nid = auction.get('nid', '')
+            if not nid:
+                return None
+            
+            title = auction.get('raw_title', '') or auction.get('title', '')
+            if not title:
+                return None
+            
+            price = clean_price(auction.get('view_price', '0'))
+            original_price = clean_price(auction.get('view_fee', '0')) or None
+            
+            pic_url = auction.get('pic_url', '')
+            if pic_url and not pic_url.startswith('http'):
+                pic_url = 'https:' + pic_url
+            
+            detail_url = auction.get('detail_url', '')
+            if detail_url and not detail_url.startswith('http'):
+                detail_url = 'https:' + detail_url
+            
+            sales_str = auction.get('view_sales', '')
+            sales_volume = parse_sales_volume(sales_str)
+            
+            shop_name = auction.get('nick', '')
+            
+            is_tmall = auction.get('shopcard', {}).get('isTmall', False) if auction.get('shopcard') else False
+            
+            return {
+                'product_id': str(nid),
+                'name': title.strip(),
+                'price': price,
+                'original_price': original_price,
+                'image_url': pic_url,
+                'url': detail_url,
+                'shop_name': shop_name,
+                'sales_volume': sales_volume,
+                'is_tmall': is_tmall,
+                'source_url': source_url,
+                'is_wholesale': False,
+                'currency': 'CNY'
+            }
+            
+        except Exception as e:
+            self.logger.error(f"解析商品数据失败: {e}")
+            return None
+    
+    def _parse_html_directly(self, html: str, source_url: str) -> List[Dict[str, Any]]:
+        """
+        直接从 HTML 中解析商品(备用方法)
+        """
+        products = []
+        
+        item_pattern = r'<div[^>]*class="[^"]*item[^"]*"[^>]*data-id="(\d+)"[^>]*>(.*?)</div>\s*</div>\s*</div>'
+        items = re.findall(item_pattern, html, re.DOTALL | re.IGNORECASE)
+        
+        for item_id, item_html in items:
+            try:
+                title_match = re.search(r'<a[^>]*class="[^"]*J_ClickStat[^"]*"[^>]*>(.*?)</a>', item_html, re.DOTALL)
+                title = title_match.group(1) if title_match else ''
+                title = re.sub(r'<[^>]+>', '', title).strip() if title else ''
+                
+                price_match = re.search(r'<strong[^>]*data-price="([\d.]+)"', item_html)
+                price = clean_price(price_match.group(1)) if price_match else 0
+                
+                sales_match = re.search(r'<div[^>]*class="deal-cnt"[^>]*>([^<]+)</div>', item_html)
+                sales_str = sales_match.group(1) if sales_match else ''
+                sales_volume = parse_sales_volume(sales_str)
+                
+                shop_match = re.search(r'<a[^>]*class="shopname[^"]*"[^>]*>(.*?)</a>', item_html, re.DOTALL)
+                shop_name = shop_match.group(1) if shop_match else ''
+                shop_name = re.sub(r'<[^>]+>', '', shop_name).strip() if shop_name else ''
+                
+                img_match = re.search(r'<img[^>]*data-src="([^"]+)"', item_html)
+                img_url = img_match.group(1) if img_match else ''
+                if img_url and not img_url.startswith('http'):
+                    img_url = 'https:' + img_url
+                
+                if title and item_id:
+                    products.append({
+                        'product_id': str(item_id),
+                        'name': title,
+                        'price': price,
+                        'image_url': img_url,
+                        'shop_name': shop_name,
+                        'sales_volume': sales_volume,
+                        'source_url': source_url,
+                        'is_wholesale': False,
+                        'currency': 'CNY'
+                    })
+                    
+            except Exception as e:
+                self.logger.error(f"解析HTML商品失败: {e}")
+                continue
+        
+        return products
+    
+    def get_product_detail(self, product_id: str, **kwargs) -> Optional[Dict[str, Any]]:
+        """
+        获取淘宝商品详情
+        :param product_id: 商品ID (nid)
+        :return: 商品详情
+        """
+        self.logger.info(f"获取淘宝商品详情: product_id={product_id}")
+        
+        detail_url = f"https://item.taobao.com/item.htm?id={product_id}"
+        
+        response = self._make_request('GET', detail_url)
+        
+        if not response:
+            self.logger.warning(f"获取商品详情失败: {product_id}")
+            return None
+        
+        return self._parse_product_detail(response.text, product_id, detail_url)
+    
+    def _parse_product_detail(self, html: str, product_id: str, source_url: str) -> Optional[Dict[str, Any]]:
+        """
+        解析商品详情页面
+        """
+        try:
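+            # 详情页数据内嵌在页面脚本中,以下正则为启发式提取,页面改版后可能失效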
+            title_match = re.search(r'<title>([^<]+)</title>', html)
+            title = title_match.group(1).split('-')[0].strip() if title_match else ''
+            
+            price_match = re.search(r'"price"\s*:\s*"([\d.]+)"', html) or \
+                          re.search(r'"defaultItemPrice"\s*:\s*"([\d.]+)"', html)
+            price = clean_price(price_match.group(1)) if price_match else 0
+            
+            shop_match = re.search(r'"nick"\s*:\s*"([^"]+)"', html) or \
+                         re.search(r'shopName["\']\s*[:=]\s*["\']([^"\']+)["\']', html)
+            shop_name = shop_match.group(1) if shop_match else ''
+            
+            sales_match = re.search(r'"sellCount"\s*:\s*(\d+)', html) or \
+                          re.search(r'"totalSoldQuantity"\s*:\s*(\d+)', html)
+            sales_volume = int(sales_match.group(1)) if sales_match else None
+            
+            return {
+                'product_id': str(product_id),
+                'name': title,
+                'price': price,
+                'url': source_url,
+                'shop_name': shop_name,
+                'sales_volume': sales_volume,
+                'is_wholesale': False,
+                'currency': 'CNY'
+            }
+            
+        except Exception as e:
+            self.logger.error(f"解析商品详情失败: {e}")
+            return None
+    
+    def get_price(self, product_id: str, **kwargs) -> Optional[Dict[str, Any]]:
+        """
+        获取商品价格
+        :param product_id: 商品ID
+        :return: 价格信息
+        """
+        detail = self.get_product_detail(product_id, **kwargs)
+        if detail:
+            return {
+                'product_id': str(product_id),
+                'price': detail.get('price', 0),
+                'original_price': detail.get('original_price'),
+                'currency': 'CNY',
+                'platform': self.platform,
+                'source_url': detail.get('url', ''),
+                'price_type': 'retail'
+            }
+        return None
+    
+    def set_cookie(self, cookie: str):
+        """
+        设置 Cookie
+        :param cookie: Cookie 字符串
+        """
+        self.cookie = cookie
+        self.headers['Cookie'] = cookie
+    
+    def set_proxy(self, proxy: str):
+        """
+        设置代理
+        :param proxy: 代理服务器地址
+        """
+        self.proxy = proxy
+        self.session.proxies = {
+            'http': proxy,
+            'https': proxy
+        }

+ 391 - 0
main.py

@@ -0,0 +1,391 @@
+#!/usr/bin/env python
+# -*- encoding: utf-8 -*-
+"""
+@Contact :   liuyuqi.gov@msn.cn
+@Time    :   2024/05/03
+@License :   Copyright © 2017-2024 liuyuqi. All Rights Reserved.
+@Desc    :   商品价格爬虫主程序
+"""
+import argparse
+import logging
+import logging.config
+import sys
+import os
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from typing import List, Dict, Any, Optional
+from datetime import datetime
+
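+# 将项目根目录加入模块搜索路径,便于以脚本方式直接运行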
+sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
+
+from config.settings import LOGGING_CONFIG
+from models.product import init_db
+from crawlers.taobao import TaobaoCrawler
+from crawlers.jd import JdCrawler
+from crawlers.alibaba1688 import Alibaba1688Crawler
+from utils.db_utils import DBUtils
+
+
+def setup_logging():
+    """配置日志"""
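+    # 先创建日志目录,避免文件日志处理器初始化失败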
+    log_dir = 'logs'
+    os.makedirs(log_dir, exist_ok=True)
+    logging.config.dictConfig(LOGGING_CONFIG)
+
+
+def init_database():
+    """初始化数据库"""
+    logger = logging.getLogger(__name__)
+    logger.info("初始化数据库...")
+    try:
+        init_db()
+        logger.info("数据库初始化成功")
+    except Exception as e:
+        logger.error(f"数据库初始化失败: {e}")
+        sys.exit(1)
+
+
+def get_crawler(platform: str, cookie: str = None, proxy: str = None):
+    """
+    获取对应平台的爬虫实例
+    :param platform: 平台名称: taobao, jd, alibaba1688
+    :param cookie: Cookie字符串
+    :param proxy: 代理地址
+    :return: 爬虫实例
+    """
+    platform = platform.lower()
+    
+    if platform == 'taobao':
+        return TaobaoCrawler(cookie=cookie, proxy=proxy)
+    elif platform == 'jd':
+        return JdCrawler(cookie=cookie, proxy=proxy)
+    elif platform in ['alibaba1688', '1688', 'alibaba']:
+        return Alibaba1688Crawler(cookie=cookie, proxy=proxy)
+    else:
+        raise ValueError(f"不支持的平台: {platform}")
+
+
+def crawl_single_platform(platform: str, keywords: List[str], pages: int = 1, 
+                          cookie: str = None, proxy: str = None,
+                          save_to_db: bool = True) -> List[Dict[str, Any]]:
+    """
+    爬取单个平台的商品
+    :param platform: 平台名称
+    :param keywords: 关键词列表
+    :param pages: 爬取页数
+    :param cookie: Cookie
+    :param proxy: 代理
+    :param save_to_db: 是否保存到数据库
+    :return: 爬取的商品列表
+    """
+    logger = logging.getLogger(__name__)
+    logger.info(f"开始爬取 {platform} 平台,关键词: {keywords}, 页数: {pages}")
+    
+    all_products = []
+    
+    try:
+        crawler = get_crawler(platform, cookie=cookie, proxy=proxy)
+        
+        for keyword in keywords:
+            for page in range(1, pages + 1):
+                logger.info(f"爬取 {platform} - 关键词: {keyword}, 第 {page} 页")
+                
+                products = crawler.search(keyword, page=page)
+                
+                for product in products:
+                    product['keyword'] = keyword
+                    product['page'] = page
+                    product['crawl_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+                    all_products.append(product)
+                
+                logger.info(f"第 {page} 页获取到 {len(products)} 个商品")
+                
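+                # 翻页之间随机等待,降低触发平台风控的概率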
+                if page < pages:
+                    from utils.helpers import random_delay
+                    random_delay(1, 3)
+        
+        crawler.close()
+        
+        if save_to_db and all_products:
+            save_products_to_db(all_products, platform)
+        
+    except Exception as e:
+        logger.error(f"爬取 {platform} 平台失败: {e}")
+        import traceback
+        traceback.print_exc()
+    
+    logger.info(f"{platform} 平台爬取完成,共获取 {len(all_products)} 个商品")
+    return all_products
+
+
+def save_products_to_db(products: List[Dict[str, Any]], platform: str):
+    """
+    保存商品到数据库
+    """
+    logger = logging.getLogger(__name__)
+    
+    try:
+        db = DBUtils()
+        success_count = 0
+        
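+        # 每个商品先写入/更新 products 表,再追加一条 price_history 记录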
+        for product in products:
+            product_data = {
+                'product_id': str(product.get('product_id', '')),
+                'name': product.get('name', ''),
+                'url': product.get('url', ''),
+                'image_url': product.get('image_url', ''),
+                'shop_name': product.get('shop_name', ''),
+                'platform': platform,
+                'is_wholesale': 1 if product.get('is_wholesale', False) else 0,
+                'min_order_quantity': product.get('min_order_quantity'),
+                'unit': product.get('unit', ''),
+                'current_price': product.get('price', 0),
+                'original_price': product.get('original_price'),
+                'sales_volume': product.get('sales_volume'),
+                'crawl_time': datetime.now()
+            }
+            
+            db_product = db.add_product(product_data)
+            
+            if db_product:
+                price_data = {
+                    'product_id': str(product.get('product_id', '')),
+                    'price': product.get('price', 0),
+                    'original_price': product.get('original_price'),
+                    'platform': platform,
+                    'source_url': product.get('url', ''),
+                    'price_type': 'wholesale' if product.get('is_wholesale', False) else 'retail',
+                    'min_quantity': product.get('min_order_quantity')
+                }
+                db.add_price_history(price_data)
+                success_count += 1
+        
+        logger.info(f"成功保存 {success_count}/{len(products)} 个商品到数据库")
+        
+    except Exception as e:
+        logger.error(f"保存商品到数据库失败: {e}")
+        import traceback
+        traceback.print_exc()
+
+
+def run_crawl(platforms: List[str], keywords: List[str], pages: int = 1,
+              concurrent: bool = True, max_workers: int = 3,
+              cookie: str = None, proxy: str = None) -> Dict[str, List[Dict[str, Any]]]:
+    """
+    运行爬取任务
+    :param platforms: 平台列表
+    :param keywords: 关键词列表
+    :param pages: 每个关键词爬取的页数
+    :param concurrent: 是否并发爬取
+    :param max_workers: 并发线程数
+    :param cookie: Cookie
+    :param proxy: 代理
+    :return: 各平台的商品数据
+    """
+    logger = logging.getLogger(__name__)
+    logger.info(f"开始爬取任务,平台: {platforms}, 关键词: {keywords}, 页数: {pages}")
+    
+    results = {}
+    
+    if concurrent and len(platforms) > 1:
+        logger.info(f"使用并发模式,最大线程数: {max_workers}")
+        
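+        # 每个平台提交为独立任务,线程内各自创建爬虫实例,互不共享会话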
+        with ThreadPoolExecutor(max_workers=max_workers) as executor:
+            future_to_platform = {
+                executor.submit(
+                    crawl_single_platform, 
+                    platform, keywords, pages, cookie, proxy
+                ): platform 
+                for platform in platforms
+            }
+            
+            for future in as_completed(future_to_platform):
+                platform = future_to_platform[future]
+                try:
+                    products = future.result()
+                    results[platform] = products
+                except Exception as e:
+                    logger.error(f"{platform} 平台爬取出错: {e}")
+                    results[platform] = []
+    else:
+        for platform in platforms:
+            products = crawl_single_platform(platform, keywords, pages, cookie, proxy)
+            results[platform] = products
+    
+    total_count = sum(len(products) for products in results.values())
+    logger.info(f"爬取任务完成,共获取 {total_count} 个商品")
+    
+    return results
+
+
+def search_products(keyword: str, platform: str = None, limit: int = 50):
+    """
+    从数据库搜索商品
+    """
+    logger = logging.getLogger(__name__)
+    
+    try:
+        db = DBUtils()
+        products = db.search_products(keyword, platform=platform, limit=limit)
+        
+        if products:
+            logger.info(f"找到 {len(products)} 个相关商品:")
+            for i, product in enumerate(products, 1):
+                price_str = f"¥{(product.get('current_price') or 0):.2f}"
+                wholesale = " [批发]" if product.get('is_wholesale') else ""
+                logger.info(f"{i}. {product.get('name')[:50]}... - {price_str}{wholesale}")
+                logger.info(f"   平台: {product.get('platform')}, 店铺: {product.get('shop_name', '未知')}")
+                logger.info(f"   链接: {product.get('url', 'N/A')}")
+        else:
+            logger.info("未找到相关商品")
+            
+    except Exception as e:
+        logger.error(f"搜索商品失败: {e}")
+
+
+def show_stats():
+    """
+    显示数据库统计信息
+    """
+    logger = logging.getLogger(__name__)
+    
+    try:
+        db = DBUtils()
+        
+        total_count = db.get_product_count()
+        taobao_count = db.get_product_count('taobao')
+        jd_count = db.get_product_count('jd')
+        alibaba_count = db.get_product_count('alibaba1688')
+        
+        logger.info("=" * 50)
+        logger.info("数据库统计信息")
+        logger.info("=" * 50)
+        logger.info(f"总商品数: {total_count}")
+        logger.info(f"  淘宝: {taobao_count}")
+        logger.info(f"  京东: {jd_count}")
+        logger.info(f"  1688批发: {alibaba_count}")
+        logger.info("=" * 50)
+        
+        recent_products = db.get_all_products(limit=5)
+        if recent_products:
+            logger.info("\n最近添加的商品:")
+            for i, product in enumerate(recent_products, 1):
+                price_str = f"¥{(product.get('current_price') or 0):.2f}"
+                logger.info(f"{i}. {product.get('name')[:40]}... - {price_str} ({product.get('platform')})")
+                
+    except Exception as e:
+        logger.error(f"获取统计信息失败: {e}")
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description='商品价格爬虫 - 支持淘宝、京东、1688批发平台',
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog='''
+使用示例:
+  # 初始化数据库
+  python main.py --mode init
+  
+  # 爬取淘宝和京东的"手机"商品,各爬取2页
+  python main.py --mode crawl --platforms taobao jd --keywords 手机 --pages 2
+  
+  # 爬取1688批发平台的"服装"商品
+  python main.py --mode crawl --platforms alibaba1688 --keywords 服装 --pages 3
+  
+  # 使用Cookie和代理
+  python main.py --mode crawl --platforms taobao --keywords 电脑 --cookie "your_cookie_here" --proxy "http://127.0.0.1:7890"
+  
+  # 搜索数据库中的商品
+  python main.py --mode search --keyword 手机
+  
+  # 显示统计信息
+  python main.py --mode stats
+        '''
+    )
+    
+    parser.add_argument('--mode', 
+                        choices=['init', 'crawl', 'search', 'stats'],
+                        default='stats',
+                        help='运行模式: init(初始化数据库), crawl(爬取), search(搜索), stats(统计)')
+    
+    parser.add_argument('--platforms', 
+                        nargs='+',
+                        default=['taobao'],
+                        help='目标平台: taobao, jd, alibaba1688 (可多选)')
+    
+    parser.add_argument('--keywords', 
+                        nargs='+',
+                        default=['手机'],
+                        help='搜索关键词 (可多选)')
+    
+    parser.add_argument('--pages', 
+                        type=int,
+                        default=1,
+                        help='每个关键词爬取的页数 (默认: 1)')
+    
+    parser.add_argument('--keyword',
+                        type=str,
+                        help='搜索模式下的关键词')
+    
+    parser.add_argument('--limit',
+                        type=int,
+                        default=50,
+                        help='搜索结果数量限制 (默认: 50)')
+    
+    parser.add_argument('--concurrent',
+                        action=argparse.BooleanOptionalAction,
+                        default=True,
+                        help='是否并发爬取 (默认开启,可用 --no-concurrent 关闭)')
+    
+    parser.add_argument('--workers',
+                        type=int,
+                        default=3,
+                        help='并发线程数 (默认: 3)')
+    
+    parser.add_argument('--cookie',
+                        type=str,
+                        help='平台登录Cookie')
+    
+    parser.add_argument('--proxy',
+                        type=str,
+                        help='代理服务器地址,如 http://127.0.0.1:7890')
+    
+    args = parser.parse_args()
+    
+    setup_logging()
+    logger = logging.getLogger(__name__)
+    
+    try:
+        if args.mode == 'init':
+            init_database()
+            
+        elif args.mode == 'crawl':
+            init_database()
+            run_crawl(
+                platforms=args.platforms,
+                keywords=args.keywords,
+                pages=args.pages,
+                concurrent=args.concurrent,
+                max_workers=args.workers,
+                cookie=args.cookie,
+                proxy=args.proxy
+            )
+            
+        elif args.mode == 'search':
+            if not args.keyword:
+                logger.error("search模式需要提供 --keyword 参数")
+                sys.exit(1)
+            search_products(args.keyword, limit=args.limit)
+            
+        elif args.mode == 'stats':
+            show_stats()
+            
+    except Exception as e:
+        logger.error(f"程序执行出错: {e}")
+        import traceback
+        traceback.print_exc()
+        sys.exit(1)
+
+
+if __name__ == '__main__':
+    main()

+ 1 - 0
models/__init__.py

@@ -0,0 +1 @@
+from .product import Product, PriceHistory, get_engine, init_db, get_session

+ 122 - 0
models/product.py

@@ -0,0 +1,122 @@
+from sqlalchemy import Column, Integer, String, Text, DateTime, Float, UniqueConstraint, create_engine
+from sqlalchemy.orm import declarative_base, sessionmaker
+from datetime import datetime
+
+from config.settings import DATABASE_CONFIG
+
+Base = declarative_base()
+
+
+class Product(Base):
+    __tablename__ = 'products'
+    # 同一商品ID在不同平台可能重复,唯一约束使用 (product_id, platform) 组合
+    __table_args__ = (
+        UniqueConstraint('product_id', 'platform', name='uq_product_platform'),
+    )
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    product_id = Column(String(100), nullable=False, comment='来源平台的商品ID')
+    name = Column(String(500), nullable=False, comment='商品名称')
+    url = Column(Text, comment='商品链接')
+    image_url = Column(Text, comment='商品图片链接')
+    shop_name = Column(String(200), comment='店铺名称')
+    platform = Column(String(50), nullable=False, comment='来源平台: taobao, jd, alibaba1688')
+    category = Column(String(200), comment='商品分类')
+    brand = Column(String(100), comment='品牌')
+    is_wholesale = Column(Integer, default=0, comment='是否为批发商品: 0-否, 1-是')
+    min_order_quantity = Column(Integer, comment='最小起订量(批发)')
+    unit = Column(String(50), comment='单位')
+    current_price = Column(Float, comment='当前价格')
+    original_price = Column(Float, comment='原价')
+    currency = Column(String(10), default='CNY', comment='货币单位')
+    sales_volume = Column(Integer, comment='销量')
+    rating = Column(Float, comment='评分')
+    review_count = Column(Integer, comment='评论数')
+    stock = Column(Integer, comment='库存')
+    description = Column(Text, comment='商品描述')
+    create_time = Column(DateTime, default=datetime.now, comment='创建时间')
+    update_time = Column(DateTime, default=datetime.now, onupdate=datetime.now, comment='更新时间')
+    crawl_time = Column(DateTime, comment='最后爬取时间')
+
+    def __repr__(self):
+        return f"<Product(product_id='{self.product_id}', name='{self.name}', platform='{self.platform}')>"
+
+    def to_dict(self):
+        return {
+            'id': self.id,
+            'product_id': self.product_id,
+            'name': self.name,
+            'url': self.url,
+            'image_url': self.image_url,
+            'shop_name': self.shop_name,
+            'platform': self.platform,
+            'category': self.category,
+            'brand': self.brand,
+            'is_wholesale': self.is_wholesale,
+            'min_order_quantity': self.min_order_quantity,
+            'unit': self.unit,
+            'current_price': self.current_price,
+            'original_price': self.original_price,
+            'currency': self.currency,
+            'sales_volume': self.sales_volume,
+            'rating': self.rating,
+            'review_count': self.review_count,
+            'stock': self.stock,
+            'description': self.description,
+            'create_time': self.create_time.strftime('%Y-%m-%d %H:%M:%S') if self.create_time else None,
+            'update_time': self.update_time.strftime('%Y-%m-%d %H:%M:%S') if self.update_time else None,
+            'crawl_time': self.crawl_time.strftime('%Y-%m-%d %H:%M:%S') if self.crawl_time else None
+        }
+
+
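+# 价格历史通过 product_id + platform 与商品记录关联,未使用外键约束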
+class PriceHistory(Base):
+    __tablename__ = 'price_history'
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    product_id = Column(String(100), nullable=False, comment='商品ID')
+    price = Column(Float, nullable=False, comment='价格')
+    original_price = Column(Float, comment='原价')
+    currency = Column(String(10), default='CNY', comment='货币单位')
+    platform = Column(String(50), nullable=False, comment='来源平台')
+    source_url = Column(Text, comment='来源页面URL')
+    crawl_time = Column(DateTime, default=datetime.now, comment='爬取时间')
+    price_type = Column(String(50), default='retail', comment='价格类型: retail(零售), wholesale(批发)')
+    min_quantity = Column(Integer, comment='最小起订量(批发价格)')
+    max_quantity = Column(Integer, comment='最大起订量(批发价格)')
+
+    def __repr__(self):
+        return f"<PriceHistory(product_id='{self.product_id}', price={self.price}, crawl_time='{self.crawl_time}')>"
+
+    def to_dict(self):
+        return {
+            'id': self.id,
+            'product_id': self.product_id,
+            'price': self.price,
+            'original_price': self.original_price,
+            'currency': self.currency,
+            'platform': self.platform,
+            'source_url': self.source_url,
+            'crawl_time': self.crawl_time.strftime('%Y-%m-%d %H:%M:%S') if self.crawl_time else None,
+            'price_type': self.price_type,
+            'min_quantity': self.min_quantity,
+            'max_quantity': self.max_quantity
+        }
+
+
+def get_engine():
+    db_config = DATABASE_CONFIG
+    connection_string = (
+        f"mysql+pymysql://{db_config['user']}:{db_config['password']}"
+        f"@{db_config['host']}:{db_config['port']}/{db_config['database']}"
+        f"?charset={db_config['charset']}"
+    )
+    return create_engine(connection_string, echo=False)
+
+
+def init_db():
+    engine = get_engine()
+    Base.metadata.create_all(engine)
+    print("数据库初始化完成")
+
+
+def get_session():
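+    # 每次调用都会创建新的 engine 与 Session,使用方负责在用完后关闭 session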
+    engine = get_engine()
+    Session = sessionmaker(bind=engine)
+    return Session()

+ 6 - 0
requirements.txt

@@ -0,0 +1,6 @@
+requests>=2.28.0
+sqlalchemy>=2.0.0
+pymysql>=1.0.0
+lxml>=4.9.0
+beautifulsoup4>=4.12.0
+python-dotenv>=1.0.0

+ 2 - 0
utils/__init__.py

@@ -0,0 +1,2 @@
+from .db_utils import DBUtils
+from .helpers import retry, random_delay, clean_price, extract_numbers

+ 193 - 0
utils/db_utils.py

@@ -0,0 +1,193 @@
+import logging
+from typing import Dict, List, Optional
+from datetime import datetime
+
+from sqlalchemy.exc import IntegrityError
+from models.product import Product, PriceHistory, get_session
+
+
+class DBUtils:
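+    """数据库操作工具类:封装商品与价格历史的写入和查询"""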
+    def __init__(self):
+        self.logger = logging.getLogger(__name__)
+        self.session = get_session()
+
+    def __del__(self):
+        if hasattr(self, 'session'):
+            self.session.close()
+
+    def add_product(self, product_data: Dict) -> Optional[Product]:
+        """
+        添加或更新商品信息
+        :param product_data: 商品数据字典
+        :return: Product对象或None
+        """
+        try:
+            existing = self.session.query(Product).filter(
+                Product.product_id == product_data.get('product_id'),
+                Product.platform == product_data.get('platform')
+            ).first()
+
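+            # 已存在则只用非 None 字段更新,避免本次缺失的字段覆盖历史数据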
+            if existing:
+                for key, value in product_data.items():
+                    if hasattr(existing, key) and value is not None:
+                        setattr(existing, key, value)
+                existing.update_time = datetime.now()
+                self.session.commit()
+                self.logger.info(f"更新商品信息: {product_data.get('product_id')}")
+                return existing
+            else:
+                product = Product(**product_data)
+                self.session.add(product)
+                self.session.commit()
+                self.logger.info(f"添加新商品: {product_data.get('product_id')}")
+                return product
+        except IntegrityError as e:
+            self.session.rollback()
+            self.logger.warning(f"商品已存在,跳过: {product_data.get('product_id')} - {e}")
+            return None
+        except Exception as e:
+            self.session.rollback()
+            self.logger.error(f"添加商品失败: {e}")
+            return None
+
+    def add_products_batch(self, products_data: List[Dict]) -> int:
+        """
+        批量添加商品
+        :param products_data: 商品数据列表
+        :return: 成功添加的数量
+        """
+        success_count = 0
+        for product_data in products_data:
+            if self.add_product(product_data):
+                success_count += 1
+        return success_count
+
+    def add_price_history(self, price_data: Dict) -> Optional[PriceHistory]:
+        """
+        添加价格历史记录
+        :param price_data: 价格数据字典
+        :return: PriceHistory对象或None
+        """
+        try:
+            price_history = PriceHistory(**price_data)
+            self.session.add(price_history)
+            self.session.commit()
+            self.logger.debug(f"添加价格历史: product_id={price_data.get('product_id')}, price={price_data.get('price')}")
+            return price_history
+        except Exception as e:
+            self.session.rollback()
+            self.logger.error(f"添加价格历史失败: {e}")
+            return None
+
+    def add_price_history_batch(self, prices_data: List[Dict]) -> int:
+        """
+        批量添加价格历史
+        :param prices_data: 价格数据列表
+        :return: 成功添加的数量
+        """
+        success_count = 0
+        for price_data in prices_data:
+            if self.add_price_history(price_data):
+                success_count += 1
+        return success_count
+
+    def get_product_by_id(self, product_id: str, platform: str) -> Optional[Product]:
+        """
+        根据商品ID和平台获取商品
+        :param product_id: 商品ID
+        :param platform: 平台
+        :return: Product对象或None
+        """
+        try:
+            return self.session.query(Product).filter(
+                Product.product_id == product_id,
+                Product.platform == platform
+            ).first()
+        except Exception as e:
+            self.logger.error(f"获取商品失败: {e}")
+            return None
+
+    def get_products_by_platform(self, platform: str, limit: int = 100) -> List[Dict]:
+        """
+        获取指定平台的商品
+        :param platform: 平台
+        :param limit: 返回数量限制
+        :return: 商品列表
+        """
+        try:
+            products = self.session.query(Product).filter(
+                Product.platform == platform
+            ).order_by(Product.update_time.desc()).limit(limit).all()
+            return [p.to_dict() for p in products]
+        except Exception as e:
+            self.logger.error(f"获取商品列表失败: {e}")
+            return []
+
+    def get_product_count(self, platform: str = None) -> int:
+        """
+        获取商品总数
+        :param platform: 平台,可选
+        :return: 商品数量
+        """
+        try:
+            query = self.session.query(Product)
+            if platform:
+                query = query.filter(Product.platform == platform)
+            return query.count()
+        except Exception as e:
+            self.logger.error(f"获取商品数量失败: {e}")
+            return 0
+
+    def get_price_history(self, product_id: str, platform: str, limit: int = 30) -> List[Dict]:
+        """
+        获取商品的价格历史
+        :param product_id: 商品ID
+        :param platform: 平台
+        :param limit: 返回数量限制
+        :return: 价格历史列表
+        """
+        try:
+            history = self.session.query(PriceHistory).filter(
+                PriceHistory.product_id == product_id,
+                PriceHistory.platform == platform
+            ).order_by(PriceHistory.crawl_time.desc()).limit(limit).all()
+            return [h.to_dict() for h in history]
+        except Exception as e:
+            self.logger.error(f"获取价格历史失败: {e}")
+            return []
+
+    def get_all_products(self, limit: int = 100, offset: int = 0) -> List[Dict]:
+        """
+        获取所有商品
+        :param limit: 返回数量限制
+        :param offset: 偏移量
+        :return: 商品列表
+        """
+        try:
+            products = self.session.query(Product).order_by(
+                Product.update_time.desc()
+            ).offset(offset).limit(limit).all()
+            return [p.to_dict() for p in products]
+        except Exception as e:
+            self.logger.error(f"获取所有商品失败: {e}")
+            return []
+
+    def search_products(self, keyword: str, platform: str = None, limit: int = 50) -> List[Dict]:
+        """
+        搜索商品
+        :param keyword: 搜索关键词
+        :param platform: 平台,可选
+        :param limit: 返回数量限制
+        :return: 商品列表
+        """
+        try:
+            query = self.session.query(Product).filter(
+                Product.name.like(f'%{keyword}%')
+            )
+            if platform:
+                query = query.filter(Product.platform == platform)
+            products = query.order_by(Product.update_time.desc()).limit(limit).all()
+            return [p.to_dict() for p in products]
+        except Exception as e:
+            self.logger.error(f"搜索商品失败: {e}")
+            return []

+ 179 - 0
utils/helpers.py

@@ -0,0 +1,179 @@
+import re
+import time
+import random
+import logging
+from functools import wraps
+from typing import Any, Callable, TypeVar, Tuple
+
+T = TypeVar('T')
+
+
+def retry(times: int = 3, delay: float = 2.0, backoff: float = 2.0, exceptions: Tuple = (Exception,)) -> Callable:
+    """
+    重试装饰器
+    :param times: 重试次数
+    :param delay: 初始延迟时间(秒)
+    :param backoff: 延迟倍数
+    :param exceptions: 需要捕获的异常类型
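+
+    示例:
+        @retry(times=3, delay=1.0, exceptions=(ValueError,))
+        def may_fail():
+            ...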
+    """
+    def decorator(func: Callable[..., T]) -> Callable[..., T]:
+        @wraps(func)
+        def wrapper(*args, **kwargs) -> T:
+            logger = logging.getLogger(__name__)
+            _delay = delay
+            for attempt in range(times):
+                try:
+                    return func(*args, **kwargs)
+                except exceptions as e:
+                    logger.warning(f"函数 {func.__name__} 执行失败 (尝试 {attempt + 1}/{times}): {e}")
+                    if attempt < times - 1:
+                        logger.info(f"等待 {_delay} 秒后重试...")
+                        time.sleep(_delay)
+                        _delay *= backoff
+                    else:
+                        logger.error(f"函数 {func.__name__} 已达到最大重试次数 {times}")
+                        raise
+        return wrapper
+    return decorator
+
+
+def random_delay(min_seconds: float = 1.0, max_seconds: float = 3.0) -> float:
+    """
+    随机延迟
+    :param min_seconds: 最小延迟时间
+    :param max_seconds: 最大延迟时间
+    :return: 实际延迟时间
+    """
+    delay = random.uniform(min_seconds, max_seconds)
+    time.sleep(delay)
+    return delay
+
+
+def clean_price(price_str: str) -> float:
+    """
+    清洗价格字符串,转换为浮点数
+    :param price_str: 价格字符串,如 "¥123.45", "123.45元", "1,234.56"
+    :return: 价格浮点数,解析失败返回 0.0
+    """
+    if not price_str:
+        return 0.0
+    
+    price_str = str(price_str).strip()
+    
+    price_str = price_str.replace('¥', '').replace('¥', '').replace('元', '')
+    price_str = price_str.replace(',', '').replace(',', '')
+    
+    match = re.search(r'(\d+\.?\d*)', price_str)
+    if match:
+        try:
+            return float(match.group(1))
+        except (ValueError, TypeError):
+            return 0.0
+    return 0.0
+
+
+def extract_numbers(text: str) -> list:
+    """
+    从文本中提取所有数字
+    :param text: 输入文本
+    :return: 数字列表
+    """
+    if not text:
+        return []
+    return [float(num) if '.' in num else int(num) for num in re.findall(r'\d+\.?\d*', str(text))]
+
+
+def parse_sales_volume(sales_str: str) -> int:
+    """
+    解析销量字符串
+    :param sales_str: 销量字符串,如 "1.5万+", "1000+", "500"
+    :return: 销量整数
+    """
+    if not sales_str:
+        return 0
+    
+    sales_str = str(sales_str).strip().lower()
+    
+    multiplier = 1
+    if '万' in sales_str or 'w' in sales_str:
+        multiplier = 10000
+        sales_str = sales_str.replace('万', '').replace('w', '')
+    
+    sales_str = sales_str.replace('+', '').replace('人', '').replace('付款', '')
+    
+    numbers = extract_numbers(sales_str)
+    if numbers:
+        return int(numbers[0] * multiplier)
+    return 0
+
+
+def format_price(price: float, currency: str = 'CNY') -> str:
+    """
+    格式化价格显示
+    :param price: 价格数值
+    :param currency: 货币单位
+    :return: 格式化后的价格字符串
+    """
+    if currency == 'CNY':
+        return f"¥{price:.2f}"
+    elif currency == 'USD':
+        return f"${price:.2f}"
+    else:
+        return f"{price:.2f} {currency}"
+
+
+def is_valid_url(url: str) -> bool:
+    """
+    检查URL是否有效
+    :param url: URL字符串
+    :return: 是否有效
+    """
+    if not url:
+        return False
+    pattern = re.compile(
+        r'^https?://'
+        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|'
+        r'localhost|'
+        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
+        r'(?::\d+)?'
+        r'(?:/?|[/?]\S+)$', re.IGNORECASE)
+    return bool(pattern.match(str(url)))
+
+
+def truncate_text(text: str, max_length: int, suffix: str = '...') -> str:
+    """
+    截断文本
+    :param text: 原始文本
+    :param max_length: 最大长度
+    :param suffix: 截断后缀
+    :return: 截断后的文本
+    """
+    if not text:
+        return ''
+    text = str(text)
+    if len(text) <= max_length:
+        return text
+    return text[:max_length - len(suffix)] + suffix
+
+
+def remove_html_tags(text: str) -> str:
+    """
+    移除HTML标签
+    :param text: 包含HTML标签的文本
+    :return: 纯文本
+    """
+    if not text:
+        return ''
+    clean = re.compile('<.*?>')
+    return re.sub(clean, '', str(text))
+
+
+def normalize_whitespace(text: str) -> str:
+    """
+    规范化空白字符
+    :param text: 原始文本
+    :return: 规范化后的文本
+    """
+    if not text:
+        return ''
+    return ' '.join(str(text).split())