In the previous section, we used coroutines inside multiple processes to run tasks asynchronously and to execute multi-step workflows in order. In this section we keep building on that code and add web-request functionality, turning it into a simple asynchronous crawler: for each new page you only need to write the network request, page parsing, and data handling, while the crawler itself takes care of multiprocessing and asynchronous requests.

Detailed flowchart

Required libraries

BeautifulSoup: a Python library for extracting data from HTML and XML files.

# Installation
cd AiospiderWorkshop
pipenv shell
pipenv install beautifulsoup4

Creating the downloader class Downloader

We will practice on the crawler training site https://scrape.center/ built by Cui Qingcai, using its simplest page, https://ssr1.scrape.center/page/1. Reading this section assumes basic familiarity with the BeautifulSoup and aiohttp libraries; a short refresher follows below.
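If you have not used them before, the sketch below shows the request-then-parse pattern this section builds on. It is a minimal standalone illustration rather than part of the crawler; the page URL and the m-b-sm class come from the practice site above.

import asyncio
from aiohttp import ClientSession
from bs4 import BeautifulSoup

async def fetch_titles():
    # Request one page asynchronously, then parse the movie titles out of its HTML
    async with ClientSession() as session:
        async with session.get("https://ssr1.scrape.center/page/1") as resp:
            html = await resp.text()
    soup = BeautifulSoup(html, "html.parser")
    for tag in soup.find_all(class_="m-b-sm"):
        print(tag.string)

asyncio.run(fetch_titles())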

Create a new .py file to verify that the downloader class Downloader works.

  1. Write a helper function that extracts movie names from a page and prints them to the screen.
from bs4 import BeautifulSoup

def extract_movie_name(html):
    soup = BeautifulSoup(html, "html.parser")
    name_tags = soup.find_all(class_="m-b-sm")
    for name_tag in name_tags:
        print(name_tag.string)
  2. Create the downloader class Downloader.

The Downloader class has two main methods, get_async and download:

download: opens a session and asynchronously requests every url in the url list.

get_async: requests a single page and returns its HTML.

import asyncio
from aiohttp import ClientSession

class Downloader:
    async def get_async(self, session, url):
        async with session.get(url=url) as resp:
            return await resp.text()

    async def download(self):
        async with ClientSession() as session:
            url_lst = [
                "https://ssr1.scrape.center/page/1",
                "https://ssr1.scrape.center/page/2"
            ]
            download_tasks = list()
            for url in url_lst:
                download_task = asyncio.create_task(self.get_async(session, url))
                download_tasks.append(download_task)
            for task in download_tasks:
                await task
                result = task.result()
                extract_movie_name(result)

    def async_run(self):
        asyncio.run(self.download())
  3. Write the main function.
if __name__ == "__main__":
    downloader = Downloader()
    downloader.async_run()

At this point, the downloader class runs as expected.

# Output
霸王别姬 - Farewell My Concubine
这个杀手不太冷 - Léon
肖申克的救赎 - The Shawshank Redemption
...

Integrating the downloader class

So far the downloader is still a standalone component. We now need to integrate the download method into the existing code, call it from its own process, and exchange data through a download queue.

  1. Modify Bridge

Add the download-queue functionality; the existing code stays unchanged.

download_queue: the download queue.

The roles of put_download_queue, get_download_queue and download_queue_empty are self-explanatory.

class Bridge:
    def __init__(self):
        manager = Manager()
        self.download_queue = manager.Queue()

    def put_download_queue(self, workshop):
        self.download_queue.put_nowait(workshop)

    def get_download_queue(self):
        return self.download_queue.get_nowait()

    def download_queue_empty(self):
        return self.download_queue.empty()
  2. Modify Workshop

Add three attributes: url, need_download and html.

class Workshop:
    def __init__(self, url, need_download):
        self.url = url
        self.need_download = need_download
        self.html = None
        self._next_process = None
  3. Modify MyWorkshop

Adapt the initialization code to the changes in Workshop, and replace the two mock steps from the previous section with a slightly adapted version of this section's extract_movie_name method.

class MyWorkshop(Workshop):
    def __init__(self, url, need_download):
        super().__init__(url, need_download)
        self.set_start_process(self.extract_movie_name)

    async def extract_movie_name(self):
        soup = BeautifulSoup(self.html, "html.parser")
        name_tags = soup.find_all(class_="m-b-sm")
        for name_tag in name_tags:
            print(name_tag.string)
        self.set_end()
  4. Modify Downloader

Modify the __init__ and async_run methods so that the class can receive and store the message-passing class Bridge.

Add a get_page method: it receives a workshop, takes its url and hands it to get_async for downloading, stores the downloaded html in the workshop's html attribute, sets the workshop's need_download attribute to False, and returns the workshop.

Modify the download method: like Works, it uses bridge.work_end() to decide when the program is finished; it takes workshops from the download queue download_queue, hands them to get_page, and puts each returned workshop into the work queue work_queue for the next processing step.

class Downloader:
    def __init__(self):
        self.bridge = None

    async def get_async(self, session, url):
        async with session.get(url=url) as resp:
            return await resp.text()

    async def get_page(self, session, workshop):
        workshop.html = await self.get_async(session, workshop.url)
        workshop.need_download = False
        return workshop

    async def download(self):
        while not self.bridge.work_end():
            async with ClientSession() as session:
                download_tasks = list()
                while not self.bridge.download_queue_empty():
                    workshop = self.bridge.get_download_queue()
                    task = asyncio.create_task(self.get_page(session, workshop))
                    download_tasks.append(task)
                for task in download_tasks:
                    await task
                    workshop = task.result()
                    self.bridge.put_work_queue(workshop)

    def async_run(self, bridge):
        self.bridge = bridge
        asyncio.run(self.download())
  5. Modify Works

Modify the run_works method: after taking a workshop from work_queue, check whether it needs downloading; if it does, push it into the download queue download_queue so the download process can fetch it.

Everything else stays unchanged.

class Works:
    async def run_works(self):
        self.bridge.flag_start()
        while not self.bridge.work_end():
            task_lst = list()
            while not self.bridge.work_queue_empty():
                workshop = self.bridge.get_work_queue()
                if workshop.need_download:
                    self.bridge.put_download_queue(workshop)
                    continue
                task = asyncio.create_task(workshop.run_next_process())
                task_lst.append(task)
            for task in task_lst:
                await task
                self.distribute_works(task)
  6. Modify App

The download step is launched as an additional process.

class App:
    def __init__(self):
        self.works = Works()
        self.bridge = Bridge()
        self.download = Downloader()

    def async_run(self, workshop_lst):
        self.bridge.init_works(workshop_lst)
        p_run_works = Process(target=self.works.async_run,
                              args=(self.bridge,))
        p_download = Process(target=self.download.async_run,
                             args=(self.bridge,))
        p_run_works.start()
        p_download.start()
        p_run_works.join()
        p_download.join()
  7. Modify the main function

In the main function, build the list of Workshop objects and hand it to App to run.

if __name__ == "__main__":
    work_lst = list()
    url_template = "https://ssr1.scrape.center/page/{}"
    for i in range(1, 11):
        url = url_template.format(str(i))
        work_lst.append(
            MyWorkshop(url=url, need_download=True)
        )
    app = App()
    app.async_run(work_lst)

At this point, the whole program runs correctly.

# Output
霸王别姬 - Farewell My Concubine
...
魂断蓝桥 - Waterloo Bridge
Run time: 2.26s

Complete code for this section

import asyncio
import time
from functools import wraps
from multiprocessing import Process
from multiprocessing import Manager
from aiohttp import ClientSession
from bs4 import BeautifulSoup


def print_run_time(func):
    # Decorator that records a function's run time
    @wraps(func)  # keep the decorated function's name; otherwise the multiprocessing call fails
    def wrap(*args, **kwargs):
        start = time.time()
        f = func(*args, **kwargs)
        end = time.time()
        print("Run time: {:.2f}s".format(end - start))
        return f
    return wrap


class Bridge:
    def __init__(self):
        manager = Manager()
        self.work_queue = manager.Queue()
        self.download_queue = manager.Queue()
        self.config_dict = manager.dict()
        self.init_config()

    def init_config(self):
        self.config_dict["running_work_cnt"] = 0
        self.config_dict["work_start_flag"] = False

    def init_works(self, workshop_lst):
        for workshop in workshop_lst:
            self.put_work_queue(workshop)
            self.work_cnt_increase()

    def flag_start(self):
        self.config_dict["work_start_flag"] = True

    def work_end(self):
        return self.config_dict["work_start_flag"]\
            and not self.config_dict["running_work_cnt"]

    def work_cnt_increase(self):
        self.config_dict["running_work_cnt"] += 1

    def work_cnt_decrease(self):
        self.config_dict["running_work_cnt"] -= 1

    def put_work_queue(self, workshop):
        self.work_queue.put_nowait(workshop)

    def get_work_queue(self):
        return self.work_queue.get_nowait()

    def work_queue_empty(self):
        return self.work_queue.empty()

    def put_download_queue(self, workshop):
        self.download_queue.put_nowait(workshop)

    def get_download_queue(self):
        return self.download_queue.get_nowait()

    def download_queue_empty(self):
        return self.download_queue.empty()


class Workshop:
    def __init__(self, url, need_download):
        self.url = url
        self.need_download = need_download
        self.html = None
        self._next_process = None

    def set_start_process(self, func):
        self._next_process = func

    def set_next_process(self, func):
        self._next_process = func

    def set_end(self):
        self._next_process = "/EOF"

    def is_end(self):
        return self._next_process == "/EOF"

    async def run_next_process(self):
        workshop = await self._next_process()
        if workshop:
            return workshop
        else:
            return self


class Works:
    def __init__(self):
        self.bridge = None

    def distribute_works(self, task):
        workshop = task.result()
        if not workshop.is_end():
            self.bridge.put_work_queue(workshop)
        else:
            self.bridge.work_cnt_decrease()

    async def run_works(self):
        self.bridge.flag_start()
        while not self.bridge.work_end():
            task_lst = list()
            while not self.bridge.work_queue_empty():
                workshop = self.bridge.get_work_queue()
                if workshop.need_download:
                    self.bridge.put_download_queue(workshop)
                    continue
                task = asyncio.create_task(workshop.run_next_process())
                task_lst.append(task)
            for task in task_lst:
                await task
                self.distribute_works(task)

    @print_run_time
    def async_run(self, bridge):
        self.bridge = bridge
        asyncio.run(self.run_works())


class Downloader:
    def __init__(self):
        self.bridge = None

    async def get_async(self, session, url):
        async with session.get(url=url) as resp:
            return await resp.text()

    async def get_page(self, session, workshop):
        workshop.html = await self.get_async(session, workshop.url)
        workshop.need_download = False
        return workshop

    async def download(self):
        while not self.bridge.work_end():
            async with ClientSession() as session:
                download_tasks = list()
                while not self.bridge.download_queue_empty():
                    workshop = self.bridge.get_download_queue()
                    task = asyncio.create_task(self.get_page(session, workshop))
                    download_tasks.append(task)
                for task in download_tasks:
                    await task
                    workshop = task.result()
                    self.bridge.put_work_queue(workshop)

    def async_run(self, bridge):
        self.bridge = bridge
        asyncio.run(self.download())


class App:
    def __init__(self):
        self.works = Works()
        self.bridge = Bridge()
        self.download = Downloader()

    def async_run(self, workshop_lst):
        self.bridge.init_works(workshop_lst)
        p_run_works = Process(target=self.works.async_run,
                              args=(self.bridge, ))
        p_download = Process(target=self.download.async_run,
                             args=(self.bridge,))
        p_run_works.start()
        p_download.start()
        p_run_works.join()
        p_download.join()


class MyWorkshop(Workshop):
    def __init__(self, url, need_download):
        super().__init__(url, need_download)
        self.set_start_process(self.extract_movie_name)

    async def extract_movie_name(self):
        soup = BeautifulSoup(self.html, "html.parser")
        name_tags = soup.find_all(class_="m-b-sm")
        for name_tag in name_tags:
            print(name_tag.string)
        self.set_end()


if __name__ == "__main__":
    work_lst = list()
    url_template = "https://ssr1.scrape.center/page/{}"
    for i in range(1, 11):
        url = url_template.format(str(i))
        work_lst.append(
            MyWorkshop(url=url, need_download=True)
        )
    app = App()
    app.async_run(work_lst)

Summary

With this section's changes we now have a simple asynchronous crawler. To crawl a new set of pages, you only need to subclass Workshop and implement your own crawling workflow; the crawler handles multiprocessing and asynchronous requests for you. Of course, it can only handle the simplest jobs so far: error handling, custom request parameters, proxies, logging and similar concerns have not been addressed, and they will need to be filled in gradually through real-world use. A sketch of what such a subclass might look like follows.
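The following is only a hypothetical sketch, not code from this series: the NewsWorkshop name, the h2.title selector and the save step are invented for the example. It shows how set_next_process could be used to chain two stages, so parsing and data handling run as separate steps on the existing crawler.

class NewsWorkshop(Workshop):
    # Hypothetical example: process a downloaded page in two chained steps.
    def __init__(self, url, need_download=True):
        super().__init__(url, need_download)
        self.titles = []
        self.set_start_process(self.parse_titles)

    async def parse_titles(self):
        # Step 1: parse the downloaded html, then register the next step
        soup = BeautifulSoup(self.html, "html.parser")
        self.titles = [tag.get_text(strip=True)
                       for tag in soup.find_all("h2", class_="title")]
        self.set_next_process(self.save_titles)

    async def save_titles(self):
        # Step 2: handle the parsed data (here we just print it), then mark the workshop as finished
        for title in self.titles:
            print(title)
        self.set_end()

Such a subclass would be scheduled exactly like MyWorkshop: build a list of NewsWorkshop objects in main (with need_download=True) and pass it to App.async_run.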