scrapy redis bloomfilter去重

scrapy redis bloomfilter去重

今天出现了两个错误,一是忘记了scrapyd重启后重置Open file limit,导致京东爬虫刷完了所有start_urls而没有获取什么数据;二是redis内存爆了,设置了3G最大内存,结果因为自己对redis的利用不够优化内存超限爬虫失败。

第一个问题可以写个自动化脚本定期reset,第二个问题的解决办法可以用bloomfilter代替scrapy redis中的去重,可以大幅度节省内存,对于数据采集这种容错率高的应用来说足够了。bloomfilter高效又节省内存,有点小小的失误率没什么关系。

  • 安装scrapy-redis-bloomfilter
  • 修改配置文件
pip install scrapy-redis-bloomfilter

# scrapy redis 配置
SCHEDULER = "scrapy_redis_bloomfilter.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis_bloomfilter.dupefilter.RFPDupeFilter"

# redis://[user:pass]@host:port/db
REDIS_URL = 'redis://@localhost:6379'

# Schedule requests using a priority queue. (default)
#SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'

# Alternative queues.
#SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.FifoQueue'
#SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.LifoQueue'

# 持久化,不自动清空
SCHEDULER_PERSIST = True

# Store scraped item in redis for post-processing.
# ITEM_PIPELINES = {
#     'scrapy_redis.pipelines.RedisPipeline': 300
# }

# Number of Hash Functions to use, defaults to 6
BLOOMFILTER_HASH_NUMBER = 6
# Redis Memory Bit of Bloomfilter Usage, 30 means 2^30 = 128MB, defaults to 30
BLOOMFILTER_BIT = 30

我这里用了31,258Mb内存可以去重2个亿吧,足够了。但是,经验告诉我,不能完全依赖这个,最好还是自己存个requests的记录,方便以后增量去重,可以用mysql, mongodb, 甚至redis。

redis快速遍历超级列表

redis快速遍历大列表而不撑爆内存

有时候我们需要遍历redis中的list,但是这个list超级大(比如千万以上),服务器一下子读到内存中操作有点吃不消。这时候就需要循环遍历了,当然不是一个个循环,不然循环到老了。这时候就用到了scan, python中就是hscan,可以设置每次读取的数量(这个根据本机器的配置测试),可以快速循环又不至于撑爆机器。注意,下面的cursor=0一定要在循环之外,放在循环中就成了死循环了。我在笔记本遍历远程服务器上的一个900多万的列表,花了800多秒,每次读取10000.

import redis
import time
r = redis.Redis(host=redis_host, port=7379, db=0, password=redis_pwd, encoding='utf-8', decode_responses=True)
# url_ = str(random.choice(client.hvals('adsl')))
# print('str_key1值字节长度:{}'.format(r.llen('lagouspd:start_urls')))
# # 获取元素:切片方式
# print(r.lrange('lagoutest:start_urls',0, 1), r.lrange('lagouspd:start_urls',1,2))
# list_redis = r.lrange('lagoutest:start_urls',0, -1)

# set_redis = {""}
# set_redis.update(list_redis)
# r.lpush('lagouspd:start_urls', 'https://www.lagou.com/wuhan/')
t1 = time.time()
cursor=0
while True:
    cursor, data = r.hscan('existing_skus', cursor, match=None, count=10000)
    print(f'cursor: {cursor}')
    list_redis = list(data.keys())
    set_redis = {""}
    set_redis.update(list_redis)
    r.lpush('jd_comment:skus', *set_redis)
    if cursor==0:
        break
t2 = time.time() 
print('time elapsed:', t2-t1)
# while cursor != 0:
#     cursor, keys = r.scan(cursor=cursor, count=1000000)
#     values = client.mget(*keys)
#     values = [value for value in values if not value == None]
#     data.update(dict(zip(keys, values)))

# for item in r.hscan_iter('xdaili'):
#     print(item)
# print(r.hscan_iter("xdaili"))    # 生成器内存地址