pipelines.py

# -*- coding: utf-8 -*-
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
from pymongo import MongoClient
from dateutil.parser import parse
import hashlib


class SpidernoticesPipeline(object):
    def process_item(self, item, spider):
        return item


class ItemToMongo(object):
    """Persist scraped items to MongoDB."""

    def __init__(self, uri, db_name):
        self.client = MongoClient(uri)
        self.db_name = db_name

    @classmethod
    def from_crawler(cls, crawler):
        # Read the connection URI and database name from the REMOTEMONGO setting.
        return cls(
            uri=crawler.settings.get('REMOTEMONGO')['uri'],
            db_name=crawler.settings.get('REMOTEMONGO')['notices']
        )

    def close_spider(self, spider):
        self.client.close()

    def process_item(self, item, spider):
        """Store items in MongoDB (database: aiStkNotices).

        Each stock gets its own collection, named by its six-digit
        security code (xxxxxx).
        """
        post = dict(item)
        coll = self.client[self.db_name][post['code']]
        # The site only displays the date part, so drop the time and timezone.
        temp = parse(post['ann_date']).strftime('%Y-%m-%d')
        post['ann_date'] = parse(temp)
        # Derive a deterministic _id from the announcement URL for deduplication.
        post['_id'] = hashlib.md5(post['href'].encode('utf8')).hexdigest()
        # coll.insert_one(post)  # would raise pymongo.errors.DuplicateKeyError on re-crawls
        coll.update_one({'_id': post.pop('_id')}, {'$set': post}, upsert=True)
        return item
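
For reference, a minimal sketch of the settings.py entries this pipeline assumes. Only the REMOTEMONGO key names 'uri' and 'notices' and the aiStkNotices database name come from the code above; the project module path and connection string are hypothetical:

# settings.py (sketch; module path and values are assumptions)
ITEM_PIPELINES = {
    'SpiderNotices.pipelines.ItemToMongo': 300,  # hypothetical project module path
}

# Custom setting read by ItemToMongo.from_crawler(); only the keys
# 'uri' and 'notices' are taken from the pipeline code.
REMOTEMONGO = {
    'uri': 'mongodb://localhost:27017',  # placeholder connection string
    'notices': 'aiStkNotices',           # database name from the docstring
}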
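
And a standalone sketch of the two transformations process_item applies, runnable without MongoDB; the href and ann_date values are made up for illustration:

from dateutil.parser import parse
import hashlib

href = 'http://example.com/notice/600000.html'  # hypothetical announcement URL
ann_date = '2020-03-01 09:30:00+08:00'          # hypothetical raw timestamp

day_only = parse(ann_date).strftime('%Y-%m-%d')      # keep only the date part
print(parse(day_only))                               # -> 2020-03-01 00:00:00 (naive datetime)
print(hashlib.md5(href.encode('utf8')).hexdigest())  # deterministic _id used for upsert dedup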