| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- import os
- from sqlalchemy import Column, Integer, String, Text, DateTime, Float, create_engine
- from sqlalchemy.ext.declarative import declarative_base
- from sqlalchemy.orm import sessionmaker
- from datetime import datetime
- from config.settings import DATABASE_CONFIG
# Declarative base shared by the ORM models in this module (Product and
# PriceHistory subclass it); init_db() creates the tables registered on
# its metadata.
Base = declarative_base()
class Product(Base):
    """ORM model for the ``products`` table: one row per scraped product.

    Besides the column definitions, the class offers ``__repr__`` for
    debugging and ``to_dict`` for turning a row into a plain dictionary.
    """

    __tablename__ = 'products'

    # --- identity -----------------------------------------------------
    # Auto-incrementing surrogate primary key.
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Product ID as used on the source platform; unique and required.
    product_id = Column(
        String(100), unique=True, nullable=False, comment='来源平台的商品ID'
    )

    # --- descriptive fields -------------------------------------------
    # Product name (required).
    name = Column(String(500), nullable=False, comment='商品名称')
    # Link to the product page.
    url = Column(Text, comment='商品链接')
    # Link to the product image.
    image_url = Column(Text, comment='商品图片链接')
    # Name of the shop selling the product.
    shop_name = Column(String(200), comment='店铺名称')
    # Source platform (required); the column comment lists taobao, jd, alibaba1688.
    platform = Column(
        String(50), nullable=False, comment='来源平台: taobao, jd, alibaba1688'
    )
    # Product category.
    category = Column(String(200), comment='商品分类')
    # Brand.
    brand = Column(String(100), comment='品牌')

    # --- wholesale information ----------------------------------------
    # Wholesale flag stored as an integer: 0 = no, 1 = yes (defaults to 0).
    is_wholesale = Column(Integer, default=0, comment='是否为批发商品: 0-否, 1-是')
    # Minimum order quantity (wholesale).
    min_order_quantity = Column(Integer, comment='最小起订量(批发)')
    # Unit of sale.
    unit = Column(String(50), comment='单位')

    # --- pricing ------------------------------------------------------
    # Current price.
    current_price = Column(Float, comment='当前价格')
    # Original (list) price.
    original_price = Column(Float, comment='原价')
    # Currency code; defaults to 'CNY'.
    currency = Column(String(10), default='CNY', comment='货币单位')

    # --- popularity / availability ------------------------------------
    # Sales volume.
    sales_volume = Column(Integer, comment='销量')
    # Rating score.
    rating = Column(Float, comment='评分')
    # Number of reviews.
    review_count = Column(Integer, comment='评论数')
    # Stock level.
    stock = Column(Integer, comment='库存')
    # Free-text product description.
    description = Column(Text, comment='商品描述')

    # --- timestamps ---------------------------------------------------
    # Creation time; filled with datetime.now on insert.
    create_time = Column(DateTime, default=datetime.now, comment='创建时间')
    # Update time; filled with datetime.now on insert and again on update.
    update_time = Column(
        DateTime, default=datetime.now, onupdate=datetime.now, comment='更新时间'
    )
    # Time of the last crawl; no default is configured.
    crawl_time = Column(DateTime, comment='最后爬取时间')

    def __repr__(self):
        """Return a short debug string with product_id, name and platform."""
        return f"<Product(product_id='{self.product_id}', name='{self.name}', platform='{self.platform}')>"

    def to_dict(self):
        """Return the row as a plain ``dict`` keyed by column name.

        The three timestamp columns are rendered as
        ``'%Y-%m-%d %H:%M:%S'`` strings, or ``None`` when unset; every
        other column value is copied through unchanged.
        """

        def _stamp(moment):
            # Falsy (unset) timestamps are reported as None.
            if not moment:
                return None
            return moment.strftime('%Y-%m-%d %H:%M:%S')

        passthrough = (
            'id',
            'product_id',
            'name',
            'url',
            'image_url',
            'shop_name',
            'platform',
            'category',
            'brand',
            'is_wholesale',
            'min_order_quantity',
            'unit',
            'current_price',
            'original_price',
            'currency',
            'sales_volume',
            'rating',
            'review_count',
            'stock',
            'description',
        )
        payload = {field: getattr(self, field) for field in passthrough}

        # Timestamps come last, in the same order as the column definitions.
        payload['create_time'] = _stamp(self.create_time)
        payload['update_time'] = _stamp(self.update_time)
        payload['crawl_time'] = _stamp(self.crawl_time)
        return payload
class PriceHistory(Base):
    """ORM model for the ``price_history`` table: one recorded price point.

    ``product_id`` is stored as a plain string column (no foreign-key
    constraint is declared here).
    """

    __tablename__ = 'price_history'

    # Auto-incrementing surrogate primary key.
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Product ID this price belongs to (required).
    product_id = Column(String(100), nullable=False, comment='商品ID')

    # Recorded price (required).
    price = Column(Float, nullable=False, comment='价格')
    # Original (list) price.
    original_price = Column(Float, comment='原价')
    # Currency code; defaults to 'CNY'.
    currency = Column(String(10), default='CNY', comment='货币单位')

    # Source platform (required).
    platform = Column(String(50), nullable=False, comment='来源平台')
    # URL of the page the price was taken from.
    source_url = Column(Text, comment='来源页面URL')
    # Crawl time; filled with datetime.now on insert.
    crawl_time = Column(DateTime, default=datetime.now, comment='爬取时间')

    # Price type: 'retail' (the default) or 'wholesale' per the column comment.
    price_type = Column(
        String(50),
        default='retail',
        comment='价格类型: retail(零售), wholesale(批发)',
    )
    # Minimum order quantity (wholesale price).
    min_quantity = Column(Integer, comment='最小起订量(批发价格)')
    # Maximum order quantity (wholesale price).
    max_quantity = Column(Integer, comment='最大起订量(批发价格)')

    def __repr__(self):
        """Return a short debug string with product_id, price and crawl_time."""
        return f"<PriceHistory(product_id='{self.product_id}', price={self.price}, crawl_time='{self.crawl_time}')>"

    def to_dict(self):
        """Return the row as a plain ``dict`` keyed by column name.

        ``crawl_time`` is rendered as a ``'%Y-%m-%d %H:%M:%S'`` string, or
        ``None`` when unset; all other values are copied through unchanged.
        """
        crawled_at = self.crawl_time
        if crawled_at:
            crawled_text = crawled_at.strftime('%Y-%m-%d %H:%M:%S')
        else:
            crawled_text = None

        return dict(
            id=self.id,
            product_id=self.product_id,
            price=self.price,
            original_price=self.original_price,
            currency=self.currency,
            platform=self.platform,
            source_url=self.source_url,
            crawl_time=crawled_text,
            price_type=self.price_type,
            min_quantity=self.min_quantity,
            max_quantity=self.max_quantity,
        )
def get_engine():
    """Build a SQLAlchemy engine from ``DATABASE_CONFIG``.

    The backend is selected by ``DATABASE_CONFIG['type']`` (``'sqlite'``
    when the key is missing):

    * ``'sqlite'`` -- uses ``DATABASE_CONFIG['sqlite']['path']`` (default
      ``'data/price_crawler.db'``) and creates the parent directory if the
      path has one. ``check_same_thread`` is passed as ``False`` to the
      SQLite driver.
    * ``'mysql'`` -- builds a ``mysql+pymysql://`` URL from the ``user``,
      ``password``, ``host``, ``port``, ``database`` and ``charset`` keys of
      ``DATABASE_CONFIG['mysql']``, each with a fallback default.

    Returns:
        A new engine with ``echo=False``.

    Raises:
        ValueError: if the configured type is neither ``'sqlite'`` nor
            ``'mysql'``.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import quote

    db_type = DATABASE_CONFIG.get('type', 'sqlite')

    if db_type == 'sqlite':
        sqlite_config = DATABASE_CONFIG.get('sqlite', {})
        db_path = sqlite_config.get('path', 'data/price_crawler.db')

        # A bare filename has no directory component; nothing to create then.
        db_dir = os.path.dirname(db_path)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)

        connection_string = f"sqlite:///{db_path}"
        return create_engine(
            connection_string,
            echo=False,
            connect_args={'check_same_thread': False},
        )

    if db_type == 'mysql':
        mysql_config = DATABASE_CONFIG.get('mysql', {})

        # Credentials are percent-encoded so characters that are URL
        # delimiters ('@', ':', '/', '%', '#', ...) cannot corrupt the
        # connection string. safe='' makes '/' get encoded as well.
        user = quote(str(mysql_config.get('user', 'root')), safe='')
        password = quote(str(mysql_config.get('password', '')), safe='')

        connection_string = (
            f"mysql+pymysql://{user}:{password}"
            f"@{mysql_config.get('host', 'localhost')}:{mysql_config.get('port', 3306)}/{mysql_config.get('database', 'price_crawler')}"
            f"?charset={mysql_config.get('charset', 'utf8mb4')}"
        )
        return create_engine(connection_string, echo=False)

    raise ValueError(f"不支持的数据库类型: {db_type}")
def init_db():
    """Create all tables registered on ``Base`` and print a completion message.

    The engine comes from ``get_engine()``; the message goes to stdout.
    """
    Base.metadata.create_all(get_engine())
    print("数据库初始化完成")
# Lazily created session factory, shared by every get_session() call so the
# process keeps a single engine (and connection pool) instead of building a
# new one per session.
_session_factory = None


def get_session():
    """Return a new ORM session bound to the configured database.

    The first call builds an engine via ``get_engine()`` and wraps it in a
    ``sessionmaker``; later calls reuse that factory. Configuration changes
    made after the first call are therefore not picked up.

    Returns:
        A fresh session object; the caller is responsible for closing it.
    """
    global _session_factory
    if _session_factory is None:
        _session_factory = sessionmaker(bind=get_engine())
    return _session_factory()
|