异步协程爬虫实战—365淘房

前言

最近在学习如何使用Async/Aiohttp实现异步协程爬虫,基于cxa大佬分享的discogs爬虫源码,实现365淘房的通用爬虫,网站本身没有爬取难度,重点是通用性的实现。因此本文不会对分析过程做过多说明,主要分享通用爬虫思路和cxa大佬异步爬虫框架的使用。

需求分析

需求是爬取365淘房网上的二手房和租房的个人房源信息。以二手房为例,首先找到入口的url是http://hf.sell.house365.com/district_i1/dl_p1.html,分析下url的参数含义:

参数 含义
hf 城市缩写,代表合肥
sell 房屋类别,代表二手房
district_i1 房源类别,代表个人房源
dl_p1 翻页参数,代表第一页

试着把hf换成hz(杭州),发现杭州的页面跟合肥的页面不太一样,自然页面源码也不同。再试试把sell换成(rent),发现租房的页面跟二手房的页面也不一样。这意味着我们要根据不同城市、不同的房屋类别写不同的xpath解析,那么如何只写一份代码,就能实现所有城市所有类别的爬取就值得好好考虑了。

解决方案

虽然借鉴了大佬的代码,但在爬取思路上还是有较大差异的。discogs的爬取思路是:先把列表页中的详情url全部爬下来,记录状态为0(初始),再另写一个爬虫,取出初始状态的详情url进行爬取,取出时更新状态为1(开始下载),保存后更新状态为2(下载完了)。这样做的好处是:

  1. 爬取列表页需要发起的请求较少,可以快速爬完
  2. 爬取详情过程中出现意外中断,只要将状态为1的数据更新为0,就能继续爬取。
  3. 通过对状态字段的控制,可支持分布式爬取

由于只看了通用类和方法,具体爬虫代码并没有细看,因此理解可能有误,但大致思路是这样。

分析了365淘房后,发现这个网站的数据量并不大,每页列表有20条数据,而最热门的城市也只有100来页,甚至有些城市的最大页码是个位数。使用大佬的思路爬的话有点杀鸡用牛刀的感觉,所以我做了下简化:循环城市,将当前城市列表页中的详情url全部爬取,但不入库,而是查询数据库,过滤已存在的数据(避免对详情url的重复请求),然后爬取所有过滤后的详情url。相当于每爬完一个城市的列表页进行一次入库。

这样做的好处是:

  1. 列表页数少,即便意外中断,顶多浪费100多次请求次数。
  2. 不进行状态记录,意外中断无需运维。
  3. 单个爬虫,操作方便。

剩下就是通用性的实现。可以将xpath选择器抽象为一个类,它包含的属性为我们需要解析的字段。然后将网站上结构相同的城市归为一类,根据每种类型创建一个xpath选择器对象,这里用namedtuple来实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
XPATH_SELECTOR = namedtuple("XpathSelector", [
"max_page",
"info_list",
"house_url",
'title',
'name',
'phone'
])


SH_TYPE_0 = XPATH_SELECTOR(
max_page='//div[@class="lb_zdfy fr"]/p[@class="fl"]/span[2]/text()',
info_list='//div[@id="qy_list_cont"]/div[@class="info_list"]',
house_url='.//a[@class="title fl"]/@href',
title='.//a[@class="title fl"]/text()[last()]',
name='//div[@class="person_information"]//span[@class="p_name fl"]/text()',
phone='//div[@class="person_info fl"]//div[@class="gr_phone_div_fr fl"]/p[1]/text()'
)

SH_TYPE_1 = XPATH_SELECTOR(
max_page='//div[@class="triggerBox"]//p[@class="number"]/text()',
info_list='//div[@class="listPagBox"]/dl[@id="JS_listPag"]/dd',
house_url='./div[@class="info"]/h3[@class="name"]/a/@href',
title='./div[@class="info"]/h3[@class="name"]/a/text()[last()]',
name='//div[@id="personal"]//p[@class="name"]/text()',
phone='//div[@class="telephoneBox"]/div/text()'
)

再做个字典,键是城市,值为xpath选择器对象,之后在解析时,只要根据城市获取对应的选择器对象进行解析即可。

代码实现

项目源码已上传至GitHub

以下是对公共基类base_crawler中两段代码的说明。

请求公共方法

在get_session中实现异步请求的公共方法,_kwargs参数可以传入请求常用的参数(如headers,data等),也可根据需求增加自定义参数。比如该项目中请求回来的源码只能用gbk来解码,因此增加了一个encoding的参数,用于传入解码方式。

bound_get_session方法通过asyncio.Semaphore限制并发数。discogs中并未使用该方法,而是通过对异步生成器进行切片的方式来实现并发限制。通常使用Semaphore就可满足需求,但是当数据量较大时,为节省内存,需选择后者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
async def bound_get_session(self, url, _kwargs: dict = {}, source_type="text", status_code=200) -> Response:
async with sem:
res = await self.get_session(url, _kwargs, source_type, status_code)
await asyncio.sleep(random.random())
return res

@retry(attempts=MAX_RETRY_TIMES)
async def get_session(self, url, _kwargs: dict = {}, source_type="text", status_code=200) -> Response:
'''
:param kwargs:url,headers,data,params,etc,,
:param method: get post.
:param timeout: defalut 5s.
'''
# 使用marshal复制提高性能
kwargs = marshal.loads(marshal.dumps(_kwargs))

if USE_PROXY:
kwargs["proxy"] = await self.get_proxy()
method = kwargs.pop("method", "get")
timeout = kwargs.pop("timeout", 5)
encoding = kwargs.pop("encoding", None)
with async_timeout.timeout(timeout):
async with getattr(self.session, method)(url, **kwargs) as req:
status = req.status
if status in [status_code, 201]:
if source_type == "text":
source = await req.text(encoding=encoding,errors='ignore') if encoding else await req.text()
elif source_type == "buff":
source = await req.read()

crawler_log.debug(f"get url:{url},status:{status}")
res = Response(status=status, source=source)
return res

解析公共方法

_response参数可以是Response对象、页面源码字符串、甚至是xpath节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def xpath(self, _response, rule, _attr=None,clean_method=None):
if isinstance(_response, Response):
source = _response.text
root = html.fromstring(source)

elif isinstance(_response, str):
source = _response
root = html.fromstring(source)
else:
root = _response
nodes = root.xpath(rule)

if _attr:
if _attr == "text":
result = [entry.text for entry in nodes]
else:
result = [entry.get(_attr) for entry in nodes]
else:
result = nodes

if clean_method:
result[0] = clean_method(result[0])

return result