scrapy redis 增量爬虫

各种技术贴满满的copy, paste味道,真遇到实际问题却很少提及,比如最常见的增量爬虫。scrapy redis在第一次爬取时可以用自带的驱虫中间件,但是增量爬虫不行,这里我写了个中间件可以实现增量爬取,前提是目标url会变化。如果是url不变但是网页内容变化的增量爬虫,就需要新的解决方案了。

虽然配合redis,scrapy理论上可以暂停重启,但是有时候程序可能有问题一次ctrl c无法停止,强制两次ctrl C又会破坏scrapy redis的重启机制。很多时候dupefilter里的url指纹和mysql数据库里的url可能不一致,甚至有些情况下你必须清空dupefilter重新运行爬虫(强制停止爬虫时很多requests虽然没有爬取成功但是指纹却放到了dupefilter,下一次爬取时就会忽略这些url),这个中间件就能解决你的问题。

方案很简单,就是从mysql读取所有的url放到redis,每次request之前都会检查下所要请求的url是否已经爬取了,redis比mysql效率高很多,非常适合这种秒级应用。另外,如果url很多,可以用md5来节省redis内存,这里没有用,很容易实现。

class DedupeMiddleware(object): # 定制去重中间件

    logger = logging.getLogger(__name__) # 利用这个可以方便进行scrapy 的log输出
    client = redis.Redis(host=settings.REDIS_HOST, port=6379, db=0, password=settings.REDIS_PWD)
    if client:
        logger.info('redis connected for dedupe')
    else:
        logger.info('redis connect failed for dedupe') 

    def process_request(self, request, spider):
        # Called for each request that goes through the downloader
        # middleware.

        # Must either:
        # - return None: continue processing this request
        # - or return a Response object
        # - or return a Request object
        # - or raise IgnoreRequest: process_exception() methods of
        #   installed downloader middleware will be called
        if self.client.hexists('existing_url', request.url): #取item里的url和key里的字段对比,看是否存在,存在就丢掉这个item。不存在返回item给后面的函数处理
            self.logger.info(f'{request.url} already in mysql')
            raise IgnoreRequest(f'{request.url} already in mysql')
        return None

Docker swarm部署splash服务集群+HAProxy负载均衡

docker stack deploy建立多服务集群并使用HAProxy搭建负载均衡

github中有一个非常好的单机多开splash Docker container的方案,但是,它仅仅适用于单机,如果是集群,它就无法应付了。这篇文章主要是解决集群部署splash的方案,Google半天也没有现成的方案,自己琢磨了下搞定了。

https://github.com/TeamHG-Memex/aquarium

适用平台: windows 10, centos 7.6

技术方案:利用yaml搭建Docker Swarm集群并部署多个服务(Visualizer, splash 3.5.0, HAproxy)

为什么用yml呢?看下下面docker service create,一次只能搭建一个服务,想搭建多个就不太方便,也不容易定制一些参数

集群建设

(boss) [chen@VM_0_3_centos aquarium]$ docker service create -p 8050:8050 –replicas 6 –name splash scrapinghub/splash /bin/bash
j3qtsdci0qum515nzoxpxu7u4
overall progress: 6 out of 6 tasks
1/6: running [==================================================>]
2/6: running [==================================================>]
3/6: running [==================================================>]
4/6: running [==================================================>]
5/6: running [==================================================>]
6/6: running [==================================================>]
verify: Service converged

Prerequisite: docker-ce (Linux平台需要单独安装,只安装docker是不够的) 或者Docker for windows, windows平台是集成化。

version: “3”

services:
visualizer:
image: dockersamples/visualizer
volumes:
– “/var/run/docker.sock:/var/run/docker.sock”
ports:
– 8080:8080
deploy:
placement:
constraints: [node.role == manager]
# labels:
# – com.df.notify=true
# – com.df.serviceDomain=visualizer.youclk.com
# – com.df.port=8080
# – com.df.usersSecret=admin

yml文件如下:

splash:
    image: scrapinghub/splash
    ports:
        - 8050:8050

    deploy:
        mode: replicated
        replicas: 2
        labels: [APP=SPLASH]
        # service resource management
        resources:
            # Hard limit - Docker does not allow to allocate more
            limits:
                cpus: '0.25'
                memory: 2048M
            # Soft limit - Docker makes best effort to return to it
            reservations:
                cpus: '0.25'
                memory: 2560M
        # service restart policy
        restart_policy:
            condition: any
            delay: 5s
            max_attempts: 3
            window: 120s

        # placement constraint - in this case on 'worker' nodes only
        placement:
            constraints: [node.role == worker]

(boss) [chen@VM_0_3_centos aquarium]$ docker stack deploy -c docker-compose.yml splash
Updating service splash_visualizer (id: lcfw3l45xkvewmly5yz7ukywh)
Updating service splash_splash (id: v1s5gbl9nfzmmp1hct6c8okce)

如果之前运行过上述命令,那么重新运行时会更新服务,方便扩展。因为replicas是2,每个服务器只创建了一个splash container。

然后我把replica改成4,Visualizer监控界面就看到了4个container:

来看一下其中一个服务器,能看到两个不同时间创建的container:

(boss) [chen@VM_0_2_centos ~]$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
0ae3f202164d scrapinghub/splash:latest “python3 /app/bin/sp…” 3 minutes ago Up 3 minutes 8050/tcp splash_splash.1.kpj3rv7piojx4yq3h5w3vamtv
9d353fe1bd6a scrapinghub/splash:latest “python3 /app/bin/sp…” 27 minutes ago Up 26 minutes 8050/tcp splash_splash.3.bl3amsvyticoje9vv3yethp4c

那么,为什么说它是高可用呢?我们重启或者停下VM_0_2_centos下的一个container,我们看到集群重新建了一个container:

(boss) [chen@VM_0_2_centos ~]$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
0ae3f202164d scrapinghub/splash:latest “python3 /app/bin/sp…” 3 minutes ago Up 3 minutes 8050/tcp splash_splash.1.kpj3rv7piojx4yq3h5w3vamtv
9d353fe1bd6a scrapinghub/splash:latest “python3 /app/bin/sp…” 27 minutes ago Up 26 minutes 8050/tcp splash_splash.3.bl3amsvyticoje9vv3yethp4c
(boss) [chen@VM_0_2_centos ~]$ docker stop 0ae3f202164d
0ae3f202164d
(boss) [chen@VM_0_2_centos ~]$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
d9253824289f scrapinghub/splash:latest “python3 /app/bin/sp…” 39 seconds ago Up 33 seconds 8050/tcp splash_splash.1.fo051grhynfybzbh407a1hith
9d353fe1bd6a scrapinghub/splash:latest “python3 /app/bin/sp…” 30 minutes ago Up 29 minutes 8050/tcp splash_splash.3.bl3amsvyticoje9vv3yethp4c

这个集群会努力保持集群中始终有4个container,如果一个死掉或者重启了会立即重建一个container,如果一个服务器宕机了,那么会在其他服务器重建4个container。当然,前提是你的服务器没有全部GG了

文中没有细说集群建设,网络中到处是这种帖子,简单说下:

创建管理节点manager node:

docker swarm init –advertise-addr=Manager IP

运行这个命令后会提示一个token,用这个token在worker node上运行下面命令即可加入集群,非常方便,不需要考虑网络问题:

docker swarm join –token SWMTKN-1-0zhhp71dfw77axxiuoixrechl8kcd9tapi9tbqskgtvt9yxxxxxxxxxxxxxxxxxxxxxxx –advertise-addr=WORKER IP:8050

HAproxy负载均衡

虽然swarm据说也有负载均衡,但是我测试了下,它的负载均衡仅仅限于一个节点上,外面还要套一层负载均衡才能对外服务,这个可以参考Youtube上的微软工程师的视频: https://www.youtube.com/watch?v=ZfMV5JmkWCY&t=170s

视频一共三个,第三个提到了负载均衡提供对外服务。这里我用一个性能很弱的腾讯云服务器做的负载均衡,也是docker集群的管理节点。

HAProxy监控界面
# HAProxy 1.7 config for Splash. It assumes Splash instances are executed
# on the same machine and connected to HAProxy using Docker links.
global
    # raise it if necessary
    maxconn 512
    # required for stats page
    stats socket /tmp/haproxy

userlist users
    user USER insecure-password PASSWD

defaults
    log global
    mode http

    # remove requests from a queue when clients disconnect;
    # see https://cbonte.github.io/haproxy-dconv/1.7/configuration.html#4.2-option%20abortonclose
    option abortonclose

    # gzip can save quite a lot of traffic with json, html or base64 data
    # compression algo gzip
    compression type text/html text/plain application/json

    # increase these values if you want to
    # allow longer request queues in HAProxy
    timeout connect 3600s
    timeout client 3600s
    timeout server 3600s


# visit 0.0.0.0:8036 to see HAProxy stats page
listen stats
    bind *:8036
    mode http
    stats enable
    stats hide-version
    stats show-legends
    stats show-desc Splash Cluster
    stats uri /
    stats refresh 10s
    stats realm Haproxy\ Statistics
    stats auth    admin:adminpass


# Splash Cluster configuration
# 代理服务器监听全局的8050端口
frontend http-in
    bind *:8050
    # 如果你需要开启Splash的访问认证
    # 则注释default_backend splash-cluster
    # 并放开其余default_backend splash-cluster 之上的其余注释
    # 账号密码为user  userpass
    acl auth_ok http_auth(users)
    http-request auth realm Splash if !auth_ok
    http-request allow if auth_ok
    http-request deny

    acl staticfiles path_beg /_harviewer/
    acl misc path / /info /_debug /debug

    use_backend splash-cluster if auth_ok !staticfiles !misc
    use_backend splash-misc if auth_ok staticfiles
    use_backend splash-misc if auth_ok misc
    default_backend splash-cluster


backend splash-cluster
    option httpchk GET /
    balance leastconn

    # try another instance when connection is dropped
    retries 2
    option redispatch
    # 将下面IP地址替换为你自己的Splash服务IP和端口
    # 按照以下格式一次增加其余的Splash服务器
    server splash-0 SPLASH0_IP:8050 check maxconn 50 inter 2s fall 10 observe layer4
    server splash-1 SPLASH1_IP:8050 check maxconn 50 inter 2s fall 10 observe layer4

backend splash-misc
    balance roundrobin
    # 将下面IP地址替换为你自己的Splash服务IP和端口
    # 按照以下格式一次增加其余的Splash服务器
    server splash-0 SPLASH0_IP:8050 check fall 15
    server splash-1 SPLASH1_IP:8050 check fall 15

重磅: scrapy crawl SPIDER -a http_user=’USER’ -a http_pass=’PASSWD’

splash加密访问之后,如何在scrapy项目中利用是个问题,这个命令行解决了一大难题。网络上多数帖子都是抄袭的,这种关键问题却是没几个人提到。如果像我一样你想用多进程,一个服务器开多个scrapy进程,那么下面的脚本能解决你的问题:

from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from multiprocessing import Pool
import os, time, random, multiprocessing

def long_time_task(name):
    # 如下代码中加入异常捕捉后可以正常返回结果,防止主进程一直被阻塞
    pid=os.getpid()
    try:
        print('pid:%d'%pid)
        print('Run task %s (%s)...' % (name, os.getpid()))
        start = time.time()
        process = CrawlerProcess(get_project_settings())
        process.crawl('lagouspd', domain='lagou.com', http_user='USER', http_pass='PASSWD')
        process.start(stop_after_crawl = False) # the script will block here until the crawling is finished       
        end = time.time()
        print('Task %s runs %0.2f seconds.' % (name, (end - start)))       
        time.sleep(2)
    except KeyboardInterrupt:
        print('进程%d被中断...'%pid)



if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    try:
        p = Pool(multiprocessing.cpu_count()*4)
        for i in range(multiprocessing.cpu_count()*4):
            p.apply_async(long_time_task, args=(i,))
            time.sleep(random.random() * 3)
        print('Waiting for all subprocesses done...')
        p.close()
        p.join()
        print('All subprocesses done.')
    except KeyboardInterrupt:
        print('catch keyboardinterupterror')
        pid=os.getpid()
        os.popen('taskkill.exe /f /pid:%d'%pid) #在unix下无需此命令,但在windows下需要此命令强制退出当前进程
    except Exception as e:
        print(e)
    else:
        print('quit normally')        

脚本中的用户名密码就是上面的USER, PASSWD,不然splash没法使用。该脚本还解决了multiprocessing多进程用进程池时没法CTRL C暂停爬虫的问题