Scraping second-hand housing listings from 120 Lianjia cities with scrapy-redis

Using scrapy-redis to crawl Lianjia second-hand listings across 120 cities in a distributed fashion, storing the data in MongoDB for further analysis.

With the Lianjia transaction-record crawl mostly done, I spent an afternoon writing the spider for Lianjia's second-hand (for-sale) listings. Here is a look at the fields it collects:

{
	"_id" : "101105654937",
	"index_url" : "https://bj.lianjia.com/ershoufang/chaoyang/pg4p5l1a3/",
	"detail_url" : "https://bj.lianjia.com/ershoufang/101105654937.html",
	"house_code" : "101105654937",
	"detail_title" : "满五年唯一户型方方正正落地窗很亮堂",
	"detail_houseInfo" : "1室1厅 | 73.08平米 | 西 | 简装 | 低楼层(共14层) | 2004年建 | 板楼",
	"detail_totalPrice" : "500",
	"detail_unitPrice" : "单价68419元/平米",
	"detail_sellpoint" : "VR看装修",
	"detail_agentName" : "张朝飞",
	"detail_agentId" : "1000000010088095",
	"aroundInfo" : {
		"小区名称" : "远洋天地三期",
		"所在区域" : "朝阳四惠四至五环",
		"看房时间" : "提前预约随时可看",
		"链家编号" : "101105654937"
	},
	"detail_dealBread" : "北京房产网北京二手房朝阳二手房四惠二手房远洋天地三期二手房",
	"detail_followers" : "143",
	"detail_basic_info" : {
		"房屋户型" : "1室1厅1厨1卫",
		"所在楼层" : "低楼层 (共14层)",
		"建筑面积" : "73.08㎡",
		"户型结构" : "平层",
		"套内面积" : "54.18㎡",
		"建筑类型" : "板楼",
		"房屋朝向" : "西",
		"建筑结构" : "钢混结构",
		"装修情况" : "简装",
		"梯户比例" : "两梯八户",
		"供暖方式" : "集中供暖",
		"配备电梯" : "有"
	},
	"detail_transaction_info" : {
		"挂牌时间" : "2019-09-04",
		"交易权属" : "商品房",
		"上次交易" : "2005-11-11",
		"房屋用途" : "普通住宅",
		"房屋年限" : "满五年",
		"产权所属" : "非共有",
		"抵押信息" : "有抵押 10万元 建设 业主自还",
		"房本备件" : "已上传房本照片"
	},
	"community_name" : "远洋天地三期",
	"community_url" : "https://bj.lianjia.com/xiaoqu/1111027382215/",
	"community_info" : {
		
	},
	"detail_features" : {
		"房源标签" : "VR看装修房本满五年",
		"核心卖点" : "此房满五年唯一,户型方方正正落地窗采光不错",
		"小区介绍" : "远洋天地是中远地产2004开发,物业是小区自有物业远洋亿家物业管理有限公司。小区环境优美,生活气息浓郁。24小时保安服务,24小时监控,进门需要刷门禁卡。",
		"户型介绍" : "房子户型格局方正,客厅西向落地窗,采光好,卧室朝西,餐厅位置可以做衣帽间或者书房。厨房带阳台,可以放置洗衣机。",
		"周边配套" : "小区是成熟社区,内部商业成熟,生活方便。交通方便,出西门就是四环路,北门是朝阳路,本房距离一号线四惠站520米(此数据来源百度地图)。商场有未来汇和苏宁生活广场,成龙耀莱影城。红领巾公园,朝阳公园。"
	},
	"resblockPosition" : "116.498731,39.918076",
	"city_id" : "110000",
	"city_name" : "city_name: '北京'",
	"resblockId" : "1111027382215"
}
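
The fields above correspond one-to-one to the item definition. items.py isn't included in this post; a minimal sketch of what ErshoufangItem could look like, with the field names taken from the output above and everything else assumed:

# items.py (sketch) -- field names mirror the MongoDB document shown above
import scrapy

class ErshoufangItem(scrapy.Item):
    _id = scrapy.Field()
    index_url = scrapy.Field()
    detail_url = scrapy.Field()
    house_code = scrapy.Field()
    detail_title = scrapy.Field()
    detail_houseInfo = scrapy.Field()
    detail_totalPrice = scrapy.Field()
    detail_unitPrice = scrapy.Field()
    detail_sellpoint = scrapy.Field()
    detail_agentName = scrapy.Field()
    detail_agentId = scrapy.Field()
    aroundInfo = scrapy.Field()
    detail_dealBread = scrapy.Field()
    detail_followers = scrapy.Field()
    detail_basic_info = scrapy.Field()
    detail_transaction_info = scrapy.Field()
    community_name = scrapy.Field()
    community_url = scrapy.Field()
    community_info = scrapy.Field()
    detail_features = scrapy.Field()
    resblockPosition = scrapy.Field()
    city_id = scrapy.Field()
    city_name = scrapy.Field()
    resblockId = scrapy.Field()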

Note that Lianjia's anti-scraping is mild on list pages but real on each listing's detail page. In my tests a page normally comes back within 2 seconds, so I set the timeout to 3 seconds but made the retry count very large (15), since a list page that never returns would lose a lot of data. I used 6 proxy servers that re-dial every 20 seconds, which is plenty for Lianjia, but not always enough for the JD crawl I'm running in parallel, especially JD comments, where more than 3 concurrent requests per IP tend to come back empty.
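
For reference, those knobs map onto ordinary Scrapy settings. A minimal sketch of the relevant part of settings.py, assuming defaults everywhere else (the concurrency figures are the ones quoted for the transaction crawl later in this post):

# settings.py (sketch) -- fail fast, retry hard, keep cookies and redirects out of the picture
DOWNLOAD_TIMEOUT = 3            # most pages answer within ~2 s
RETRY_TIMES = 15                # retry aggressively rather than lose a list page
COOKIES_ENABLED = False         # cookies are not needed for these pages
REDIRECT_ENABLED = False        # the spiders handle 301/302 themselves via dont_redirect
CONCURRENT_REQUESTS = 80        # per-machine cap used for the transaction crawl
CONCURRENT_REQUESTS_PER_IP = 16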

While scraping the transaction records I found that the community-introduction block on those detail pages is loaded via JS, so I skipped analyzing it; on the for-sale detail pages, however, the same block is served without JS. Lianjia's JS is plain text anyway and easy to analyze; I may post a decoding walkthrough when I have time.

The core code is as follows:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import logging
import uuid
import pickle
import scrapy
from scrapy_redis import spiders
from scrapy.utils.project import get_project_settings
from scrapy_redis.utils import bytes_to_str
import redis
import random
from scrapy_redis.spiders import RedisSpider
from lianjia.items import LianjiaItem, ErshoufangItem
from lianjia.log import logger
import re
import sys

class ErshoufangSpider(RedisSpider):
    name = 'ershoufang'
    allowed_domains = ['lianjia.com']
    # start_urls = ['http://lianjia.com/']
    redis_key = 'lianjia_ershoufang:start_urls'
    
    def __init__(self, *args, **kwargs):
        super(ErshoufangSpider, self).__init__(*args, **kwargs)
            
    def parse(self, response):
        index_url = response.url
        if response.xpath('//h2[@class="total fl"]/span/text()'):
            num_found = int(response.xpath('//h2[@class="total fl"]/span/text()').extract_first())
            logger.info(f'num of apartments found in {index_url}: {num_found}')        
            if num_found in range(1,3001) or 'de' in index_url[:-6]:
                try:
                    logger.debug(f'index request.meta: {response.request.meta} {index_url}')
                    logger.debug(f'index request.headers: {response.request.headers} {index_url}')            
                    total_pages = -(-num_found // 30)  # ceil(num_found / 30), 30 listings per page
                    aprt_list = response.xpath('//ul[@class="sellListContent"]/li')
                    logger.info(f'num of apartments in the current_pgNum: {len(aprt_list)}')
                    pattern = re.compile(r'"curPage":\d+')
                    curPage_ = re.search(pattern, response.text)[0]
                    patternd = re.compile(r'\d+')
                    current_pgNum = int(re.search(patternd, curPage_)[0])
                    logger.info(f'curPage matched: {current_pgNum}')
                    logger.debug(f'debug index_url: {index_url}')
                    # current_pgNum = int(response.xpath('//div[@class="contentBottom clear"]/div[@class="page-box fr"]/div[@class="page-box house-lst-page-box"]/a[@class="on"]/text()').extract_first())            
                    for li in aprt_list:
                        aprt_link = self.eleMissing(li.xpath('./div[@class="info clear"]/div[@class="title"]/a/@href').extract_first())
                        aprt_house_code = self.eleMissing(li.xpath('./div[@class="info clear"]/div[@class="title"]/a/@data-housecode').extract_first())
                        aprt_title = self.eleMissing(self.strJoin(li.xpath('./div[@class="info clear"]/div[@class="title"]/a/text()').extract()))
                        houseInfo = self.eleMissing(self.strJoin(li.xpath('./div[@class="info clear"]/div[@class="address"]/div[@class="houseInfo"]/text()').extract()))
                        aprt_totalPrice =  self.eleMissing(li.xpath('./div[@class="info clear"]/div[@class="priceInfo"]/div[@class="totalPrice"]/span/text()').extract_first())
                        aprt_unitPrice = self.eleMissing(li.xpath('./div[@class="info clear"]/div[@class="priceInfo"]/div[@class="unitPrice"]/span/text()').extract_first())
                        aprt_features = self.eleMissing(li.xpath('./div[@class="info clear"]/div[@class="tag"]/span/text()').extract_first())                
                        yield scrapy.Request(url=aprt_link, meta={'detail_url': aprt_link, 'house_code': aprt_house_code, 'detail_title': aprt_title, 'detail_houseInfo': houseInfo,
                        'detail_totalPrice': aprt_totalPrice, 'detail_unitPrice': aprt_unitPrice,
                        'detail_sellpoint': aprt_features, 'index_url': index_url,
                        'dont_redirect': True,'handle_httpstatus_list': [301,302], 'referer': index_url}, callback=self.parse_item, dont_filter=False) #'dont_redirect': True, 'handle_httpstatus_list': [302],
                    if current_pgNum < total_pages:
                        pg = 'pg' + str(current_pgNum)
                        next_url = re.sub(f'/{pg}', f'/pg{current_pgNum + 1}', index_url)
                        logger.debug(f'next_url: {next_url}')
                        yield scrapy.Request(url=next_url, callback=self.parse, dont_filter=True, meta={'referer': index_url, 'dont_redirect': True, 'handle_httpstatus_list': [301,302]})
                except Exception as e:
                    logger.info(e)
                    # logger.info(response.text)
                    # sys.exit()
            elif num_found > 3000:
            
                dir_filter = self.filter_extract(response.xpath('//div[@class="list-more"]/dl[4]/dd/a/@href')) #direction
                floor_filter = self.filter_extract(response.xpath('//div[@class="list-more"]/dl[5]/dd/a/@href')) #floor
                decoration_filter = self.filter_extract(response.xpath('//div[@class="list-more"]/dl[7]/dd/a/@href')) #decoration
                list_dir_floor = [(x+y+z) for x in dir_filter for y in floor_filter for z in decoration_filter]
                for dir in list_dir_floor:
                    dir_url = index_url[:-1] + dir + '/'
                    yield scrapy.Request(url=dir_url, callback=self.parse, dont_filter=True, meta={'referer': index_url, 'dont_redirect': True, 'handle_httpstatus_list': [301,302]})
            else:
                logger.warning(f'data ignored: no listings found at {index_url}')
    def parse_item(self, response):    
        logger.debug(f'request.meta: {response.request.meta} {response.url}')
        logger.debug(f'request.headers: {response.request.headers} {response.url}')     
        item = ErshoufangItem()
        item['index_url'] = response.meta['index_url']
        item['detail_url'] = response.meta['detail_url']
        item['house_code'] = response.meta['house_code']
        item['_id'] = item['house_code']
        item['detail_title'] = response.meta['detail_title']
        item['detail_houseInfo'] = response.meta['detail_houseInfo'] 
        item['detail_totalPrice'] = response.meta['detail_totalPrice']
        item['detail_unitPrice'] = response.meta['detail_unitPrice']
        item['detail_sellpoint'] = response.meta['detail_sellpoint']
        item['detail_agentName'] = response.xpath('//div[@class="brokerName"]/a/text()').extract_first()
        item['detail_agentId'] = self.eleMissing(response.xpath('//div[@class="brokerName"]/a/@href').extract_first()).split('/')[-1]
        around_names = self.eleMissing(response.xpath('//div[@class="aroundInfo"]/div/span[@class="label"]/text()').extract())[:-1]
        community_name = self.eleMissing(response.xpath('//div[@class="aroundInfo"]/div[@class="communityName"]/a[1]/text()').extract_first())
        areaName = self.strJoin(self.eleMissing(response.xpath('//div[@class="aroundInfo"]/div[@class="areaName"]/span[@class="info"]/a/text()').extract()))\
        + self.eleMissing(self.strJoin(response.xpath('//div[@class="aroundInfo"]/div[@class="areaName"]/span[@class="info"]/text()').extract()))
        visitTime = self.eleMissing(response.xpath('//div[@class="aroundInfo"]/div[@class="visitTime"]/span[@class="info"]/text()').extract_first())
        houseRecord = self.eleMissing(response.xpath('//div[@class="aroundInfo"]/div[@class="houseRecord"]/span[@class="info"]/text()').extract_first())
        around_values = [community_name, areaName, visitTime, houseRecord]
        item['aroundInfo'] = dict(zip(around_names, around_values))
        item['detail_dealBread'] = self.strJoin(self.strJoin(response.xpath('//div[@class="fl l-txt"]/a/text()').extract()))
        # ajax
        # item['detail_visitTimes'] = self.eleMissing(response.xpath('//section[@class="wrapper"]//div[@class="msg"]/span[4]/label/text()').extract_first())
        item['detail_followers'] = self.eleMissing(response.xpath('//span[@id="favCount"]/text()').extract_first())
        basic_info_names = self.stripList(response.xpath('//div[@class="base"]/div[@class="content"]/ul/li/span/text()').extract())
        basic_info_values = self.stripList(response.xpath('//div[@class="base"]/div[@class="content"]/ul/li/text()').extract())
        item['detail_basic_info'] = dict(zip(basic_info_names, basic_info_values))
        transaction_info_names = self.stripList(response.xpath('//div[@class="transaction"]//div[@class="content"]/ul/li/span[contains(@class, "label")]/text()').extract())
        transaction_info_values = self.stripList(response.xpath('//div[@class="transaction"]//div[@class="content"]/ul/li/span[2]/text()').extract())        
        item['detail_transaction_info'] = dict(zip(transaction_info_names, transaction_info_values))   
        # item['community_name'] = self.eleMissing(response.xpath('//*[@id="resblockCardContainer"]/div[@class="newwrap"]/div[@class="xiaoquCard"]/div[@class="xiaoqu_header clear"]/h3/span/text()').extract_first())[:-2]
        item['community_name'] = community_name
        # item['community_url'] = response.xpath('//*[@id="resblockCardContainer"]/div[@class="newwrap"]/div[@class="xiaoquCard"]/div[@class="xiaoqu_header clear"]/a/@href').extract_first()
        # pattern_url = re.compile(r'https://bj.lianjia.com/chengjiao/c\d+')
        pattern = re.compile(r'.*?(?=/ershoufang)')         
        item['community_url'] = re.search(pattern, item['index_url'])[0] + self.eleMissing(response.xpath('//div[@class="aroundInfo"]/div[@class="communityName"]/a[1]/@href').extract_first())
        community_info_label = response.xpath('//*[@id="resblockCardContainer"]/div[@class="newwrap"]/div[@class="xiaoquCard"]/div[@class="xiaoqu_content clear"]/div[@class="xiaoqu_main fl"]/div/label/text()').extract()
        community_info_value = response.xpath('//*[@id="resblockCardContainer"]/div[@class="newwrap"]/div[@class="xiaoquCard"]/div[@class="xiaoqu_content clear"]/div[@class="xiaoqu_main fl"]/div/span/text()').extract()
        item['community_info'] = dict(zip(self.stripList(community_info_label), self.stripList(community_info_value)))
        feature_label = self.eleMissing(response.xpath('//div[@class="introContent showbasemore"]/div/div[@class="name"]/text()').extract())
        tags = [self.strJoin(response.xpath('//div[@class="introContent showbasemore"]/div[@class="tags clear"]/div[@class="content"]/a/text()').extract())]
        tags_ = self.eleMissing(response.xpath('//div[@class="introContent showbasemore"]/div[not(contains(@class,"tags clear"))]/div[@class="content"]/text()').extract())
        item['detail_features'] = dict(zip(self.stripList(feature_label), self.stripList(tags + tags_)))
        # positionInfo: 
        pattern_pos = re.compile(r"resblockPosition:'\d+.\d+,\d+.\d+")
        pos_ = re.search(pattern_pos, response.text)[0]
        item['resblockPosition'] = self.eleMissing(re.search(r'\d+.\d+,\d+.\d+', pos_))[0]
        # city_id:
        pattern_cityId = re.compile(r"city_id: '\d+")
        cityId_ = re.search(pattern_cityId, response.text)[0]
        item['city_id'] = self.eleMissing(re.search(r'\d+', cityId_))[0]
        # city_name
        pattern_cityName = re.compile(r"city_name: '.*'")
        item['city_name'] = self.eleMissing(re.search(pattern_cityName, response.text))[0]
        # resblockId
        pattern_resblockId = re.compile(r"resblockId:'\d+'")
        resblockId_ = re.search(pattern_resblockId, response.text)
        logger.debug(f'resblockId_: {resblockId_}')
        resblockId_ = resblockId_[0]
        item['resblockId'] = self.eleMissing(re.search(r'\d+', resblockId_))[0]
        yield item
    def strJoin(self, element_list):
        return ''.join(i.strip() for i in element_list)
    def eleMissing(self, element):
        if element is None:
            return " "
        else:
            return element
    def stripList(self, eleList):
        return [i.strip() for i in eleList]
    # extract the filter codes from the filter links
    def filter_extract(self, filter_list):
        filters = []
        for i in filter_list:
            print(i.split('/')[-2])
            filters.append(i.split('/')[-2])
        return filters
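
One thing the spider code doesn't show: as a RedisSpider it idles until start URLs appear under its redis_key ('lianjia_ershoufang:start_urls'). A minimal sketch of how the per-city entry pages could be seeded; the city subdomains and connection details here are placeholders, not the full 120-city list:

# seed_ershoufang.py (sketch) -- push city entry pages into the spider's redis_key
import redis

r = redis.Redis(host='127.0.0.1', port=6379, db=0)  # placeholder connection details

cities = ['bj', 'sh', 'gz', 'sz']  # example subdomains only; the real run covers ~120 cities
for city in cities:
    r.lpush('lianjia_ershoufang:start_urls', f'https://{city}.lianjia.com/ershoufang/')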

The middlewares are what really matter: proxies, retries, incremental dedupe, and request headers (I'll push them to GitHub when I have time).

# Define here the models for your spider middleware
#
# See documentation in:
# https://docs.scrapy.org/en/latest/topics/spider-middleware.html

from scrapy import signals

# useful for handling different item types with a single interface
from itemadapter import is_item, ItemAdapter

from random import choice
from user_agent import generate_user_agent
from scrapy import signals
from scrapy.exceptions import NotConfigured
from scrapy.http.headers import Headers
from scrapy.utils.project import get_project_settings
import os
import time
import re
import random
import redis
import sys

# useful for handling different item types with a single interface
from itemadapter import is_item, ItemAdapter
import datetime
from lianjia.log import logger
from scrapy.downloadermiddlewares.retry import RetryMiddleware
import json
import jsonpath
# exception handling
from twisted.internet import defer
from twisted.internet.error import TimeoutError, DNSLookupError, \
    ConnectionRefusedError, ConnectionDone, ConnectError, \
    ConnectionLost, TCPTimedOutError
from scrapy.http import HtmlResponse
from twisted.web.client import ResponseFailed
from scrapy.core.downloader.handlers.http11 import TunnelError
from scrapy.utils.httpobj import urlparse_cached
from scrapy.exceptions import IgnoreRequest

settings = get_project_settings()

class DedupeMiddleware(object):  # incremental dedupe middleware

    client = redis.Redis(host=settings['REDIS_HOST'], port=7379, db=0, password=settings['REDIS_PWD'])
    try:
        client.ping()
        logger.info('redis connected for dedupe')
    except redis.ConnectionError:
        logger.warning('redis connect failed for dedupe')

    def process_request(self, request, spider):
        # Called for each request that goes through the downloader
        # middleware.

        # Must either:
        # - return None: continue processing this request
        # - or return a Response object
        # - or return a Request object
        # - or raise IgnoreRequest: process_exception() methods of
        #   installed downloader middleware will be called 
        # detail-page URLs end in the house code; list pages (containing 'pg') are never deduped
        if 'pg' not in request.url:
            sku = request.url.split('/')[-1].split('.')[0]
            key = 'lianjia_transaction_existing_skus' if 'chengjiao' in request.url else 'lianjiaErshoufang_existing_skus'
            # compare the house code against the redis hash of collected listings; drop the request if it already exists
            if self.client.hexists(key, sku):
                logger.info(f'{request.url} already collected')
                raise IgnoreRequest(f'{request.url} already collected')

class LocalRetryMiddleware(RetryMiddleware):
    ALL_EXCEPTIONS = (defer.TimeoutError, TimeoutError, DNSLookupError,
                      ConnectionRefusedError, ConnectionDone, ConnectError,
                      ConnectionLost, TCPTimedOutError, ResponseFailed,
                      IOError, TunnelError)
    pool = redis.ConnectionPool(
        host=settings['REDIS_HOST'], port=settings['REDIS_PORT'], password=settings['REDIS_PWD'], db=0)
    db = redis.Redis(connection_pool=pool)    
    def process_response(self, request, response, spider):
        # re-decode the body so banned or truncated pages can be inspected
        new_response = HtmlResponse(response.url, body=response.body, encoding="utf-8")
        if len(new_response.text) < 100:
            logger.info(f'task {os.getpid()} check url: {new_response.text[:150]} {response.url}, {response.status}')
            ip = request.meta["proxy"].split('//')[1].split(':')[0]
            reason = 'ip banned: ' + str(request.meta["proxy"])
            self.remove_proxy(ip, response.url)
            logger.warning('abnormal response body, retrying...')
            return self._retry(request, reason, spider) or response
        elif 'ershoufang' in response.url or 'chengjiao' in response.url:
            # these pages are expected to contain the "total" counter; if it is missing, treat the IP as blocked
            if not new_response.xpath('//div[@class="total fl"]/span/text() | //h2[@class="total fl"]/span/text()'):
                logger.info(f'task {os.getpid()} check url: {new_response.text[:150]} {response.url}, {response.status}')
                ip = request.meta["proxy"].split('//')[1].split(':')[0]
                reason = 'ip banned: ' + str(request.meta["proxy"])
                self.remove_proxy(ip, response.url)
                logger.warning('abnormal response body, retrying...')
                return self._retry(request, reason, spider) or response
        return response
        
    def process_exception(self, request, exception, spider):
        if isinstance(exception, self.EXCEPTIONS_TO_RETRY) \
                and not request.meta.get('dont_retry', False):
            # drop the proxy that caused the exception
            ip = request.meta["proxy"].split('//')[1].split(':')[0]
            reason = 'ip banned: ' + str(request.meta["proxy"])
            self.remove_proxy(ip, request.url)
            logger.warning('connection error, retrying...')
            return self._retry(request, exception, spider)
    def remove_proxy(self, ip, url):
        """
        Remove a proxy from the pool.
        :return: None
        In practice, after several consecutive dial failures the VPS needs a reboot; it can no longer dial into the internet on its own.
        """
        logger.info(f'task {os.getpid()} Removing {ip}...')
        proxies = self.db.hvals('adsl')
        names = ['adsl1', 'adsl2', 'adsl3', 'adsl4', 'adsl5', 'adsl6', 'adsl7']
        if proxies:
            for name in names:
                x = self.db.hget(('adsl'), name) #get specified proxy
                if x:
                    proxy = re.search('\d+.\d+.\d+.\d+', str(x))[0] # ip digit only                                
                    try:
                        if proxy == ip:                
                            self.db.hdel('adsl', name)                    
                            logger.info(f'task {os.getpid()} Removed {name} {proxy} successfully for {url}')
                        else:
                            logger.info(f'task {os.getpid()} proxy to remove {ip}; ip from redis: {proxy};  for {url}')
                    except redis.ConnectionError:
                        logger.info(f'task {os.getpid()} Remove {ip} failed for {url}')  
            logger.info(f'{proxy} is already gone')
            
class process_ProxiesMiddlewares(object):
    pool = redis.ConnectionPool(
        host=settings['REDIS_HOST'], port=settings['REDIS_PORT'], password=settings['REDIS_PWD'], db=0)
    server = redis.Redis(connection_pool=pool)
    if not os.path.exists("used_IP_list.txt"):
        with open("used_IP_list.txt", "w") as f:
            f.write('')
    def process_request(self, request, spider):
        # proxy with known ip and port (from the ADSL pool)
        proxies = 'http://use:password@' + self.get_proxy() + ':3389'
        # logger.info(f'proxies: {proxies}')
        request.meta['proxy'] = proxies
    # def process_response(self, request, response, spider):
        # # 返回数据的处理
        # if response.url == 'https://www.jd.com/?ds':
            # logger.info(f'task {os.getpid()} 商品可能不存在: {response.url}')
        # return response

    # def process_exception(self, request, exception, spider):
        # # Called when a download handler or a process_request()
        # # (from other downloader middleware) raises an exception.
 
        # # Must either:
        # # - return None: continue processing this exception
        # # - return a Response object: stops process_exception() chain
        # # - return a Request object: stops process_exception() chain
        # if isinstance(exception,TimeoutError):
            # request.meta['dont_filter'] = True
            # return request
            
    def get_proxy(self):
        """ ADSL ip代理配置 """
        while True:
            # now = datetime.datetime.now()
            # if datetime.datetime.now().hour%3 == 0 and datetime.datetime.now().minute == 0:
                # time.sleep(120)
            proxies = self.server.hvals('adsl')
            if proxies:
                proxy_ = str(random.choice(proxies))
                y = re.search('\d+.\d+.\d+.\d+', proxy_)
                proxy = y[0]        
                # logger.info(proxy_.split('_'))
                # logger.info(int(proxy_.split('_')[1].replace("'", "")))
                # logger.info(int(time.time()) - int(proxy_.split('_')[1].replace("'", "")))
                proxy_life = 20  # proxy lifetime in seconds (could live in settings); dialing itself takes ~6 s
                proxy_elapsed_time = (time.time() - int(proxy_.split('_')[1].replace("'", "")))
                # time_to_life = proxy_life - proxy_elapsed_time                
                if  proxy_elapsed_time <= 15:
                    # with open("used_IP_list.txt", "rb") as f:
                        # lines = [line.rstrip() for line in f]
                        # lines = list(set(lines))
                    # if proxy in lines:
                        # # logger.info(f'task {os.getpid()} --该IP已经使用过: %s', proxy)  
                        # with open("used_IP_list.txt", "a+") as file:
                            # file.write('\n' + proxy)                         
                    # else:
                        # logger.info(f'task {os.getpid()} --proxy_elapsed_time: %s', proxy_elapsed_time)
                        # with open("used_IP_list.txt", "a+") as file:
                            # file.write('\n' + proxy)                                                             
                        break
                else:
                    continue                
            else:
                time.sleep(0.3)
                logger.info(f'task {os.getpid()} --waiting for a proxy......')
            # logger.info(f'task {os.getpid()} -- proxy info from redis: %s', proxy_)                  
        # logger.info(f'task {os.getpid()} -- proxy to be used: %s', proxy)      
        return proxy 
        
class UserAgentMiddleware(object):
    def __init__(self, user_agent=''):
        # with open('/home/chen/jd_spider/jd/jd/ua.txt', 'r') as f:
            # uastring = random.choice(f.readlines())
            # print(uastring.strip())   
        # self.user_agent = random.choice(agents)
        self.user_agent = generate_user_agent(os='win', navigator='chrome', device_type='desktop')
        self.headers = {
        "User-Agent": self.user_agent, #random.choice(agents),
        # 'Referer': 'https://bj.lianjia.com/chengjiao/',
        # 'Cookie': 'lianjia_uuid=a5b828d2-1f33-433e-aa90-b4be7385065f; Hm_lvt_9152f8221cb6243a53c83b956842be8a=1609347650; _jzqc=1; UM_distinctid=176b498526b4b-0157ca1b93e587-c791039-144000-176b498526c159; _smt_uid=5fecb242.4f86c74c; _qzjc=1; sensorsdata2015jssdkcross=%7B%22distinct_id%22%3A%22176b49854ae792-0f2edd1abf8f6e-c791039-1327104-176b49854af9f0%22%2C%22%24device_id%22%3A%22176b49854ae792-0f2edd1abf8f6e-c791039-1327104-176b49854af9f0%22%2C%22props%22%3A%7B%22%24latest_traffic_source_type%22%3A%22%E7%9B%B4%E6%8E%A5%E6%B5%81%E9%87%8F%22%2C%22%24latest_referrer%22%3A%22%22%2C%22%24latest_referrer_host%22%3A%22%22%2C%22%24latest_search_keyword%22%3A%22%E6%9C%AA%E5%8F%96%E5%88%B0%E5%80%BC_%E7%9B%B4%E6%8E%A5%E6%89%93%E5%BC%80%22%7D%7D; _ga=GA1.2.1425138003.1609347653; _gid=GA1.2.359986535.1609347653; select_city=110000; _jzqckmp=1; _jzqx=1.1609379391.1609442428.5.jzqsr=bj%2Elianjia%2Ecom|jzqct=/chengjiao/.jzqsr=bj%2Elianjia%2Ecom|jzqct=/chengjiao/dongcheng/; CNZZDATA1253477573=882224077-1609347648-%7C1609462712; CNZZDATA1254525948=1134180723-1609344452-%7C1609464147; CNZZDATA1255633284=154247568-1609343258-%7C1609462671; CNZZDATA1255604082=740153180-1609347648-%7C1609464929; _jzqa=1.2258303215606024700.1609347650.1609442428.1609465530.11; Hm_lpvt_9152f8221cb6243a53c83b956842be8a=1609466930; _qzja=1.2109333563.1609347650261.1609442427936.1609465530222.1609466756818.1609466929958.0.0.0.105.11; _qzjto=23.3.0; srcid=eyJ0Ijoie1wiZGF0YVwiOlwiZjNiZTFiYzMwNmE5NmY3Njc5NDAzYzJmNmNlZDgxYTY0NmRkZDNiNzFhNzdlZjBjNGEyMWYxY2JjNGY4YjJiYWI4NTIyMTdhNWNjNGM0NzMzMzliNTMyMDIwNmNhMTVjOTM2OTBjYzEyNmVkMWVmMzZlMWYyY2UwYjU2MWUzNjRmZWM3MjcyNWM0OTI4NTZlYTU5NDhhNDI5NWRhNmE3ZDA5MDRmYjY0NTcyYjJiOTQxMmUwOGRlMGEyNjVkMzI2ZWViZTQ1ZDRkZTQxZDI1OGE3YmJmOWY2YTU1NGUxYTE4ZGU1ODVhM2Y2NzhhYzM3ZWRmZmE0Nzg2ZmViZWVkYjMzOTMyMTdlYTkzMTVjMzFkMGZlMDlkYzNmM2M4ZDU4NTgxMmI0ZmNmYjhmNzI5YmM4MTcwOTQwMDZlOTkyYjc4OWIwMzUzMWFkYzViMjU0MzY0ODQ1MmE1ZDM2MTkyNlwiLFwia2V5X2lkXCI6XCIxXCIsXCJzaWduXCI6XCI0MGJmOTg4MlwifSIsInIiOiJodHRwczovL2JqLmxpYW5qaWEuY29tL2NoZW5namlhby8xMDExMDU3NTk3MTEuaHRtbCIsIm9zIjoid2ViIiwidiI6IjAuMSJ9',              
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
        "Accept-Encoding": "gzip, deflate, br",
        "Accept-Language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7",
        "Connection": "keep-alive"          
        }
        #"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
        #"Accept-Encoding": "gzip, deflate, br",
        #"Accept-Language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7"        
        self.ios = ['iOS 12',
                    'iOS 12.0.1',
                    'iOS 12.1',
                    'iOS 12.1.1',
                    'iOS 12.1.2',
                    'iOS 12.1.3',
                    'iOS 12.1.4',
                    'iOS 12.2',
                    'iOS 12.3',
                    'iOS 12.3.1',
                    'iOS 12.3.2',
                    'iOS 12.4',
                    'iOS 12.4.1',
                    'iOS 12.4.2',
                    'iOS 12.4.3',
                    'iOS 12.4.4',
                    'iOS 12.4.5',
                    'iOS 12.4.6',
                    'iOS 12.4.7',
                    'iOS 12.4.8',
                    'iOS 12.4.9']
    def process_request(self, request, spider):  
        request.headers = Headers(self.headers)    
        # referer=request.url
        # if referer:
            # request.headers["referer"] = referer 
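
For completeness, these middlewares have to be registered in settings to take effect. The class names below are the ones defined above; the module path and priority numbers are my assumptions, not the project's actual configuration:

# settings.py (sketch) -- wiring up the custom downloader middlewares
DOWNLOADER_MIDDLEWARES = {
    'lianjia.middlewares.UserAgentMiddleware': 400,          # fresh headers on every request
    'lianjia.middlewares.DedupeMiddleware': 450,             # drop detail pages already collected
    'lianjia.middlewares.process_ProxiesMiddlewares': 500,   # attach an ADSL proxy to every request
    'scrapy.downloadermiddlewares.retry.RetryMiddleware': None,  # replaced by the subclass below
    'lianjia.middlewares.LocalRetryMiddleware': 550,
}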
           

Scraping a million Lianjia transaction records with scrapy-redis

As the New Year bells rang, I also finalized the spider for Lianjia's transaction records, starting with Beijing, which alone has close to a million deals. The code took two days in total: the core was finished on the 31st, and on New Year's Day I completed the rest and patched a few small bugs. Here is a look at the fields it collects:

{
	"_id" : "101100313085",
	"index_url" : "https://bj.lianjia.com/chengjiao/dongcheng/pg1p2l2a2/",
	"detail_url" : "https://bj.lianjia.com/chengjiao/101100313085.html",
	"house_code" : "101100313085",
	"detail_title" : "定安里 2室1厅 54.34平米",
	"detail_dirFurnish" : "南 北 | 简装",
	"detail_dealDate" : "2016.07.21",
	"detail_floor" : "中楼层(共6层) 1980年建板楼",
	"detail_totalPrice" : "240",
	"detail_unitPrice" : "44167",
	"detail_listPrice" : "挂牌240万",
	"aprt_dealCycle" : "成交周期7天",
	"detail_agentName" : "李玉军",
	"detail_agentId" : "1000000010080328",
	"detail_dealInfo" : "定安里 2室1厅 54.34平米2016.07.21 成交",
	"detail_dealBread" : "北京房产网北京二手房成交东城二手房成交永定门二手房成交定安里二手房成交",
	"detail_priceChangeTimes" : "0",
	"detail_visitTimes" : "",
	"detail_followers" : "7",
	"detail_viewTimes" : "66",
	"detail_basic_info" : {
		"房屋户型" : "2室1厅1厨1卫",
		"所在楼层" : "中楼层(共6层)",
		"建筑面积" : "54.34㎡",
		"户型结构" : "平层",
		"套内面积" : "暂无数据",
		"建筑类型" : "板楼",
		"房屋朝向" : "南 北",
		"建成年代" : "1980",
		"装修情况" : "简装",
		"建筑结构" : "混合结构",
		"供暖方式" : "集中供暖",
		"梯户比例" : "一梯两户",
		"配备电梯" : "无"
	},
	"detail_transaction_info" : {
		"链家编号" : "101100313085",
		"交易权属" : "商品房",
		"挂牌时间" : "2016-07-15",
		"房屋用途" : "普通住宅",
		"房屋年限" : "满五年",
		"房权所属" : "非共有"
	},
	"detail_transaction_history" : "240万单价44167元/平,2016-07成交",
	"community_name" : "定安里",
	"community_url" : "https://bj.lianjia.com/chengjiao/c1111027376735",
	"community_info" : {
		
	},
	"detail_features" : {
		"房源标签" : "房本满五年"
	},
	"resblockPosition" : "116.418443,39.866651",
	"city_id" : "110000",
	"city_name" : "city_name: '北京'",
	"resblockId" : "1111027376735"
}

In a real production project you would also archive the raw HTML, but my server has a mere 50 GB of disk with only a little over 10 GB free, which can't handle that, so instead I extract as many fields as possible at crawl time.

The transaction-record pages are protected against scraping, so a large pool of proxy IPs is needed; otherwise the crawl rate is severely limited.

  • https://bj.lianjia.com/chengjiao/ is the entry point. Each page shows only 30 listings, and the site caps any result set at 100 pages, so full coverage requires slicing by filters. I wrote a separate script that combines district, price, and layout filters to construct every entry URL (see the sketch after this list); for example, https://bj.lianjia.com/chengjiao/haidian/l3a3p4/ is Haidian, 3-4 million RMB, three bedrooms, 70-90 m², and URLs like it can be built by parsing the page below.
(Screenshot: Lianjia transaction listings page)
  • Disable cookies and redirects, set timeout=3 and retry_times=5, allow 16 concurrent requests per IP, and cap per-machine concurrency at 80. With 6 ADSL dial-up servers as proxies, each re-dialing every 20 seconds, Scrapy sustains about 10 listings per second, so the whole Beijing transaction history takes a little over a day.
  • I've covered ADSL VPS on this blog many times: even with only a handful, use VPSes from different regions or even provinces to keep the IP pool diverse and highly available. In my experience, providers around Jiangsu/Zhejiang and Guangdong have the most IPs (network infrastructure tracks how developed the local internet industry is), so cities like Hangzhou, Jingdezhen, and Zhongshan are good choices.
  • The key spider code is below, right after the URL-builder sketch. In practice the middlewares matter most: request headers, referer, and proxies must be configured properly and thoroughly tested.
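Here is a minimal sketch of the URL-construction idea from the first bullet: combine district, layout, area, and price filter codes so that every slice of /chengjiao/ stays under the 100-page cap. The district list and the filter-code ranges are illustrative guesses on my part; only the l3a3p4 example is taken from above:

# build_chengjiao_urls.py (sketch) -- slice /chengjiao/ by district x layout x area x price
import redis

districts = ['dongcheng', 'xicheng', 'chaoyang', 'haidian']   # sample of Beijing districts
layouts = [f'l{i}' for i in range(1, 7)]   # l3 = three bedrooms (per the example URL above)
areas = [f'a{i}' for i in range(1, 9)]     # a3 = 70-90 m² (per the example URL above)
prices = [f'p{i}' for i in range(1, 9)]    # p4 = 3-4 million (per the example URL above)

r = redis.Redis(host='127.0.0.1', port=6379, db=0)  # placeholder connection details
for d in districts:
    for l in layouts:
        for a in areas:
            for p in prices:
                # follows the pattern of https://bj.lianjia.com/chengjiao/haidian/l3a3p4/
                r.lpush('lianjia:start_urls', f'https://bj.lianjia.com/chengjiao/{d}/{l}{a}{p}/')
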
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import logging
import uuid
import pickle
import scrapy
from scrapy_redis import spiders
from scrapy.utils.project import get_project_settings
from scrapy_redis.utils import bytes_to_str
import redis
import random
from scrapy_redis.spiders import RedisSpider
from lianjia.items import LianjiaItem
from lianjia.log import logger
import re
import sys

class DealsSpider(RedisSpider):
    name = 'deals'
    allowed_domains = ['lianjia.com']
    # start_urls = ['http://lianjia.com/']
    redis_key = 'lianjia:start_urls'
    
    def __init__(self, *args, **kwargs):
        super(DealsSpider, self).__init__(*args, **kwargs)
            
    def parse(self, response):
        index_url = response.url
        num_found = int(response.xpath('//div[@class="total fl"]/span/text()').extract_first())
        logger.info(f'num of apartments found in {index_url}: {num_found}')        
        if num_found > 0:
            try:
                logger.debug(f'index request.meta: {response.request.meta} {index_url}')
                logger.debug(f'index request.headers: {response.request.headers} {index_url}')            
                total_pages = -(-num_found // 30)  # ceil(num_found / 30), 30 listings per page
                aprt_list = response.xpath('//ul[@class="listContent"]/li')
                logger.info(f'num of apartments in the current_pgNum: {len(aprt_list)}')
                pattern = re.compile(r'"curPage":\d+')
                curPage_ = re.search(pattern, response.text)[0]
                patternd = re.compile(r'\d+')
                current_pgNum = int(re.search(patternd, curPage_)[0])
                logger.info(f'curPage matched: {current_pgNum}')
                logger.debug(f'debug index_url: {index_url}')
                # current_pgNum = int(response.xpath('//div[@class="contentBottom clear"]/div[@class="page-box fr"]/div[@class="page-box house-lst-page-box"]/a[@class="on"]/text()').extract_first())            
                for li in aprt_list:

                    aprt_link = self.eleMissing(li.xpath('./a/@href').extract_first())
                   
                    aprt_title = self.eleMissing(self.strJoin(li.xpath('./div[@class="info"]/div[@class="title"]/a/text()').extract()))
                    aprt_dirFurnish = self.eleMissing(self.strJoin(li.xpath('./div[@class="info"]/div[@class="address"]/div[@class="houseInfo"]/text()').extract()))
                    aprt_dealDate = self.eleMissing(self.strJoin(li.xpath('./div[@class="info"]//div[@class="dealDate"]/text()').extract()))
                    aprt_floor = self.eleMissing(self.strJoin(li.xpath('./div[@class="info"]/div[@class="flood"]/div[@class="positionInfo"]/text()').extract()))                
                    aprt_totalPrice =  self.eleMissing(li.xpath('./div[@class="info"]/div[@class="address"]/div[@class="totalPrice"]/span[@class="number"]/text()').extract_first())
                    aprt_unitPrice = self.eleMissing(li.xpath('./div[@class="info"]/div[@class="flood"]/div[@class="unitPrice"]/span[@class="number"]/text()').extract_first())
                    aprt_features = self.eleMissing(li.xpath('./div[@class="info"]/div[@class="dealHouseInfo"]/span[@class="dealHouseTxt"]/span/text()').extract_first())                
                    aprt_listPrice = self.eleMissing(self.strJoin(li.xpath('./div[@class="info"]/div[@class="dealCycleeInfo"]/span[@class="dealCycleTxt"]/span[1]/text()').extract()))
                    aprt_dealCycle = self.eleMissing(li.xpath('./div[@class="info"]/div[@class="dealCycleeInfo"]/span[@class="dealCycleTxt"]/span[2]/text()').extract_first())
                    aprt_agent_name = self.eleMissing(li.xpath('./div[@class="info"]/div[@class="agentInfoList"]/a/text()').extract_first())
                    aprt_agent_id = self.eleMissing(li.xpath('./div[@class="info"]/div[@class="agentInfoList"]/div[@class="agent_chat_btn im-talk LOGCLICKDATA"]/@data-lj_action_agent_id').extract_first())                    
                    yield scrapy.Request(url=aprt_link, meta={'detail_url': aprt_link, 'detail_title': aprt_title, 'detail_dirFurnish': aprt_dirFurnish,
                    'detail_dealDate': aprt_dealDate, 'detail_floor': aprt_floor, 'detail_totalPrice': aprt_totalPrice, 'detail_unitPrice': aprt_unitPrice,
                    'detail_sellpoint': aprt_features, 'detail_listPrice': aprt_listPrice, 'aprt_dealCycle': aprt_dealCycle, 'index_url': index_url,
                    'detail_agent_name': aprt_agent_name, 'detail_agent_id': aprt_agent_id, 'dont_redirect': True, 'referer': index_url}, callback=self.parse_item, dont_filter=False)
                if current_pgNum < total_pages:
                    pg = 'pg' + str(current_pgNum)
                    next_url = re.sub(f'/{pg}', f'/pg{current_pgNum + 1}', index_url)
                    logger.debug(f'next_url: {next_url}')
                    yield scrapy.Request(url=next_url, callback=self.parse, dont_filter=False, meta={'dont_redirect': True, 'referer': index_url})
            except Exception as e:
                logger.info(e)
                # logger.info(response.text)
                # sys.exit()
    def parse_item(self, response):    
        logger.debug(f'request.meta: {response.request.meta} {response.url}')
        logger.debug(f'request.headers: {response.request.headers} {response.url}')     
        item = LianjiaItem()
        item['index_url'] = response.meta['index_url']
        item['detail_url'] = response.meta['detail_url']
        item['house_code'] = response.meta['detail_url'].split('/')[-1].split('.')[0]
        item['_id'] = item['house_code']
        item['detail_title'] = response.meta['detail_title']
        item['detail_dirFurnish'] = response.meta['detail_dirFurnish'] 
        item['detail_dealDate'] = response.meta['detail_dealDate']
        item['detail_floor'] = response.meta['detail_floor']
        item['detail_totalPrice'] = response.meta['detail_totalPrice']
        item['detail_unitPrice'] = response.meta['detail_unitPrice']
        # item['detail_sellpoint'] = response.meta['detail_sellpoint']
        item['detail_listPrice'] = response.meta['detail_listPrice']
        if len(item['detail_listPrice']) == 0:
            item['detail_listPrice'] = self.eleMissing(response.xpath('//section[@class="wrapper"]//div[@class="msg"]/span[1]/label/text()').extract_first())
        item['aprt_dealCycle'] = response.meta['aprt_dealCycle']
        # Not all aprt_agent_id exist
        item['detail_agentName'] = response.meta['detail_agent_name']
        item['detail_agentId'] = response.meta['detail_agent_id']        
        item['detail_dealInfo'] = self.eleMissing(response.xpath('//div[@class="wrapper"]/text()').extract_first()) + self.eleMissing(response.xpath('//div[@class="wrapper"]/span/text()').extract_first())  # wrap each part so a missing node cannot raise TypeError
        item['detail_dealBread'] = self.eleMissing(self.strJoin(response.xpath('//section[@class="wrapper"]/div[@class="deal-bread"]/a/text()').extract()))
        item['detail_priceChangeTimes'] = self.eleMissing(response.xpath('//section[@class="wrapper"]//div[@class="msg"]/span[3]/label/text()').extract_first())
        item['detail_visitTimes'] = self.eleMissing(response.xpath('//section[@class="wrapper"]//div[@class="msg"]/span[4]/label/text()').extract_first())
        item['detail_followers'] = self.eleMissing(response.xpath('//section[@class="wrapper"]//div[@class="msg"]/span[5]/label/text()').extract_first())
        item['detail_viewTimes'] = self.eleMissing(response.xpath('//section[@class="wrapper"]//div[@class="msg"]/span[6]/label/text()').extract_first())
        basic_info_names = self.stripList(response.xpath('//section[@class="houseContentBox"]//div[@class="base"]/div[@class="content"]/ul/li/span/text()').extract())
        basic_info_values = self.stripList(response.xpath('//section[@class="houseContentBox"]//div[@class="base"]/div[@class="content"]/ul/li/text()').extract())
        item['detail_basic_info'] = dict(zip(basic_info_names, basic_info_values))
        transaction_info_names = self.stripList(response.xpath('//div[@class="transaction"]//div[@class="content"]/ul/li/span/text()').extract())
        transaction_info_values = self.stripList(response.xpath('//div[@class="transaction"]//div[@class="content"]/ul/li/text()').extract())        
        item['detail_transaction_info'] = dict(zip(transaction_info_names, transaction_info_values))   
        item['detail_transaction_history'] = self.eleMissing(self.strJoin(response.xpath('//*[@id="chengjiao_record"]/ul/li//text()').extract()))       
        # item['community_name'] = self.eleMissing(response.xpath('//*[@id="resblockCardContainer"]/div[@class="newwrap"]/div[@class="xiaoquCard"]/div[@class="xiaoqu_header clear"]/h3/span/text()').extract_first())[:-2]
        item['community_name'] = item['detail_title'].split(' ')[0]
        # item['community_url'] = response.xpath('//*[@id="resblockCardContainer"]/div[@class="newwrap"]/div[@class="xiaoquCard"]/div[@class="xiaoqu_header clear"]/a/@href').extract_first()
        pattern_url = re.compile(r'https://bj.lianjia.com/chengjiao/c\d+')
        item['community_url'] = self.eleMissing(re.search(pattern_url, response.text)[0])
        community_info_label = response.xpath('//*[@id="resblockCardContainer"]/div[@class="newwrap"]/div[@class="xiaoquCard"]/div[@class="xiaoqu_content clear"]/div[@class="xiaoqu_main fl"]/div/label/text()').extract()
        community_info_value = response.xpath('//*[@id="resblockCardContainer"]/div[@class="newwrap"]/div[@class="xiaoquCard"]/div[@class="xiaoqu_content clear"]/div[@class="xiaoqu_main fl"]/div/span/text()').extract()
        item['community_info'] = dict(zip(self.stripList(community_info_label), self.stripList(community_info_value)))
        feature_label = self.eleMissing(response.xpath('//*[@id="house_feature"]/div[@class="introContent showbasemore"]/div/div[@class="name"]/text()').extract())
        feature_value = self.eleMissing(response.xpath('//*[@id="house_feature"]/div[@class="introContent showbasemore"]/div/div[@class="content"]/a/text()').extract())
        item['detail_features'] = dict(zip(self.stripList(feature_label), self.stripList(feature_value)))
        # positionInfo: 
        pattern_pos = re.compile(r"resblockPosition:'\d+.\d+,\d+.\d+")
        pos_ = re.search(pattern_pos, response.text)[0]
        item['resblockPosition'] = self.eleMissing(re.search(r'\d+.\d+,\d+.\d+', pos_)[0])
        # city_id:
        pattern_cityId = re.compile(r"city_id: '\d+")
        cityId_ = re.search(pattern_cityId, response.text)[0]
        item['city_id'] = self.eleMissing(re.search(r'\d+', cityId_)[0])
        # city_name
        pattern_cityName = re.compile(r"city_name: '.*'")
        item['city_name'] = self.eleMissing((re.search(pattern_cityName, response.text)[0]))
        # resblockId
        pattern_resblockId = re.compile(r"resblockId:'\d+'")
        resblockId_ = re.search(pattern_resblockId, response.text)[0]
        item['resblockId'] = self.eleMissing(re.search(r'\d+', resblockId_)[0])
        yield item
    def strJoin(self, element_list):
        return ''.join(i for i in element_list)
    def eleMissing(self, element):
        if element is None:
            return ""
        else:
            return element
    def stripList(self, eleList):
        return [i.strip() for i in eleList]
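
Both spiders yield items keyed by _id = house_code, the intro mentioned the data ends up in MongoDB, and the DedupeMiddleware checks a Redis hash of already-collected house codes. The pipeline itself isn't shown in this post; a minimal sketch of what it could look like (collection names, hash values, and connection details are all assumptions), enabled via ITEM_PIPELINES in settings.py:

# pipelines.py (sketch) -- upsert into MongoDB and mark the house code for the DedupeMiddleware
import pymongo
import redis


class MongoPipeline(object):
    def open_spider(self, spider):
        self.mongo = pymongo.MongoClient('mongodb://127.0.0.1:27017')   # placeholder URI
        self.coll = self.mongo['lianjia'][spider.name]                  # one collection per spider
        self.redis = redis.Redis(host='127.0.0.1', port=6379, db=0)     # same instance the middlewares use

    def process_item(self, item, spider):
        # _id is the house code, so a re-crawl overwrites instead of duplicating
        self.coll.replace_one({'_id': item['_id']}, dict(item), upsert=True)
        # record the house code in the hash the DedupeMiddleware checks before downloading
        key = 'lianjia_transaction_existing_skus' if spider.name == 'deals' else 'lianjiaErshoufang_existing_skus'
        self.redis.hset(key, item['_id'], 1)
        return item

    def close_spider(self, spider):
        self.mongo.close()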