很多人入门 Python 是从爬虫开始的,笔者也不例外。爬取大量网页需要用到多进程、多线程、协程等等特性,而这类代码的编写往往比较繁琐,如果经常需要爬取不同的网页,我们往往会用到 scrapy 等爬虫框架以减少工作量。笔者最近正好需要大量爬取一些内容,本着学习的目的,我们不使用爬虫框架,从零开始搭建一个简单的异步爬虫。

目标

  1. 利用协程异步请求网页。
  2. 利用多进程加快爬取速度。
  3. 提供需要请求的链接,设定好处理流程后,程序自动按顺序处理。

简易流程图

Blog-异步爬虫结构

需要用到的库

  1. pipenv 创建虚拟环境,以免影响原有环境
  2. aiohttp 是基于 asyncio 实现的HTTP框架

环境搭建

  1. 安装 pipenv(参考官方教程
  2. 打开终端,在合适的位置创建文件夹,并进入文件夹
1
2
mkdir AiospiderWorkshop
cd AiospiderWorkshop
  1. 创建虚拟环境
1
pipenv install --python 3.8
  1. 更换 pipenv 源为阿里云源
1
2
3
4
5
6
# 打开 Pipfile 文件,修改 source 部分如下
[[source]]
url = "https://mirrors.aliyun.com/pypi/simple"
verify_ssl = true
name = "aliyun"
# 保存后重启终端(否则换源可能未生效)
  1. 进入虚拟环境,并安装 aiohttp
1
2
3
4
# 进入刚才创建的文件夹,并进入环境安装 aiohttp
cd AiospiderWorkshop
pipenv shell
pipenv install aiohttp

创建任务类 Workshop 和任务分配类 Works

此部分需要你对 Pythonasyncio 库有基本了解即可。

  1. 导入需要的库
1
2
3
4
import asyncio
import time
from random import randint
from functools import wraps
  1. 首先创建一个任务类 Workshop,函数 async_simulation 用于模拟异步操作
1
2
3
4
5
6
class Workshop:
async def async_simulation(self):
# 模拟一个异步操作,随机停顿1-3秒
sleep_time = randint(1, 3)
print("等待{}秒".format(sleep_time))
await asyncio.sleep(sleep_time)
  1. 运行上述模拟异步操作,查看运行情况

在模拟运行之前,先创建一个装饰器用于记录程序运行时间

1
2
3
4
5
6
7
8
9
10
def print_run_time(func):
# 记录运行程序运行时间的装饰器
@wraps(func) # 保持被装饰的函数名不变,否则多进程调用出错
def wrap(*args, **kwargs):
start = time.time()
f = func(*args, **kwargs)
end = time.time()
print("运行时间:{:.2f}s".format(end - start))
return f
return wrap

运行查看结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Works:
async def run_works(self):
workshop = Workshop()
for i in range(3):
await workshop.async_simulation()

@print_run_time
def async_run(self):
asyncio.run(self.run_works())


if __name__ == "__main__":
works = Works()
works.async_run()
1
2
3
4
5
# 运行结果
等待3秒
等待2秒
等待3秒
运行时间:8.01s

我们看到程序并没有异步执行,要实现异步执行,我们需要对 Works 类的 run_works 函数做如下修改

1
2
3
4
5
6
7
8
9
class Works:
async def run_works(self):
task_lst = list()
for i in range(3):
workshop = Workshop()
task = asyncio.create_task(workshop.async_simulation())
task_lst.append(task)
for task in task_lst:
await task
1
2
3
4
5
# 运行结果
等待3秒
等待1秒
等待2秒
运行时间:3.01s

程序成功实现异步执行

在多进程中调用任务分配类

此部分需要你对 Pythonmultiprocessing 库有基本了解

程序多进程化非常简单,进行如下修改即可

1
2
3
4
5
6
7
from multiprocessing import Process

if __name__ == "__main__":
works = Works()
p_run_works = Process(target=works.async_run)
p_run_works.start()
p_run_works.join()

创建 App 类

如果程序初始化和多进程逻辑都放在 main 主函数中执行,程序功能越复杂, main 主函数也会越复杂,因此创建一个 App 类负责程序的初始化运行。对代码如下部分做修改。

1
2
3
4
5
6
7
8
9
10
11
12
class App:
def __init__(self):
self.works = Works()

def async_run(self):
p_run_works = Process(target=self.works.async_run)
p_run_works.start()
p_run_works.join()

if __name__ == "__main__":
app = App()
app.async_run()

创建信息传递类 Bridge

从前面的简易流程图可以看出,我们程序的运行依赖多个队列传输数据,因此,我们创建一个 Bridge 类负责所有共享数据的保存和操作。

Bridge 类中的 init_works 方法。将 workshop_lst 中的任务传入任务队列 work_queue 中,用于程序开始运行时任务初始化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from multiprocessing import Manager

class Bridge:
def __init__(self):
manager = Manager()
self.work_queue = manager.Queue()

def put_work_queue(self, workshop):
self.work_queue.put_nowait(workshop)

def get_work_queue(self):
return self.work_queue.get_nowait()

def work_queue_empty(self):
return self.work_queue.empty()

def init_works(self, workshop_lst):
for workshop in workshop_lst:
self.put_work_queue(workshop)
  1. 修改 Works 类,使其可以利用信息传递类中的队列等共享数据。

修改 async_run 函数。调用 Works 类中的 async_run 函数时需要传入 bridgebridge 为信息传递类 Bridge 的实例,可通过其操作任务队列。

修改 run_works 方法,其运行时,如果任务队列 work_queue 不为空,则从队列中取出所有任务异步执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class Works:
def __init__(self):
self.bridge = None

async def run_works(self):
task_lst = list()
while not self.bridge.work_queue_empty():
workshop = self.bridge.get_work_queue()
task = asyncio.create_task(workshop.async_simulation())
task_lst.append(task)
for task in task_lst:
await task

@print_run_time
def async_run(self, bridge):
self.bridge = bridge
asyncio.run(self.run_works())
  1. 修改 App 类,使其可以传入 bridge。并且在 async_run 方法中调用 bridge.init_works(workshop_lst) 初始化任务队列。
1
2
3
4
5
6
7
8
9
10
11
class App:
def __init__(self):
self.works = Works()
self.bridge = Bridge()

def async_run(self, workshop_lst):
self.bridge.init_works(workshop_lst)
p_run_works = Process(target=self.works.async_run,
args=(self.bridge, ))
p_run_works.start()
p_run_works.join()
  1. 修改主函数 main 传入任务列表
1
2
3
4
5
6
if __name__ == "__main__":
work_lst = list()
for _ in range(3):
work_lst.append(Workshop())
app = App()
app.async_run(work_lst)

修改至此,程序已可以正常运行

增加按步骤运行函数的能力

到目前为止,我们的程序已经实现了传入任务列表并利用协程异步执行的功能,但还有一个问题需要解决:只能运行特定的方法,而爬取数据的处理往往需要经过很多步骤。

为了解决这个问题,我们在 Workshop 类中提供一个统一的函数运行接口 run_next_process,每个函数运行结束后,设定好下次运行的函数,并返回实例自身或者需要运行的其他实例。同时修改 Works 类,使其获得返回的实例后将返回的实例再次传入队列中,以便下次调用。

  1. 修改 Workshop

增加了 _next_process 属性、set_start_process 方法、set_next_process 方法、set_end 方法、is_end 方法、run_next_process 方法。

_next_process 属性:用于存储下一步需要调用的函数,初始化为 None

set_start_process 方法:需要在实例化同时调用,设定程序第一个调用的函数。

set_next_process 方法:在程序运行时调用,设定下一个调用的函数。

set_end 方法:将 _next_process 属性设置为 \EOF,标记所有流程均完成。

is_end 方法:返回流程是否全部结束。

run_next_process 方法:调用当前设定的函数,得到异步调用返回的实例并返回,如果无返回值,则返回当前实例自身。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Workshop:
def __init__(self):
self._next_process = None

def set_start_process(self, func):
self._next_process = func

def set_next_process(self, func):
self._next_process = func

def set_end(self):
self._next_process = "/EOF"

def is_end(self):
return self._next_process == "/EOF"

async def run_next_process(self):
workshop = await self._next_process()
if workshop:
return workshop
else:
return self
  1. 继承 Workshop 类创建 MyWorkshop 类,在其中编写业务业务代码,控制流程。

async_simulation 方法:模拟第一个异步操作,在其中调用set_next_process 方法设定下一个需要调用的函数。

print_end 方法:模拟第二步操作,在其中调用 set_end 方法设定流程结束。

注意:所有的业务流程函数均需要为异步函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class MyWorkshop(Workshop):
def __init__(self):
super().__init__()
self.set_start_process(self.async_simulation)

async def async_simulation(self):
# 模拟一个异步操作,随机停顿1-3秒
sleep_time = randint(1, 3)
print("等待{}秒".format(sleep_time))
await asyncio.sleep(sleep_time)
self.set_next_process(self.print_end)

async def print_end(self):
# 模拟第二步操作
print("End")
self.set_end()
  1. 修改 Bridge

因为任务 workshop 需要再次进入任务队列中,所以在 Works 类中不能再用 work_queue 是否为空来判断程序是否应该结束了,因此对 Bridge 类进行一些改造,添加运行任务计数,并提供是否应该结束程序的判断函数。

在保持原有属性和方法不变的情况下,增加 config_dict 属性、init_config 方法、flag_start 方法、work_end 方法、work_cnt_increase 方法、work_cnt_decrease 方法,修改 add_works 方法。

config_dict 属性:记录运行过程中的一些关键数据,如正在运行的任务数量 running_work_cnt、任务开始标记 work_start_flag,通过 init_config 方法初始化。因为这些常数数据在进入不同进程时会复制一份新数据,因此常数数据共享应该在可变数据类型下共享,比如这里采用 dict 共享数据。

flag_start 方法:在程序最开始运行时调用,标记程序已开始运行

work_end 方法:返回程序是否完全结束,判断方法为 work_start_flag 标记为 Truerunning_work_cnt0

work_cnt_increase 方法:标记正在运行中的任务增加。

work_cnt_decrease 方法:标记正在运行中的任务减少。

修改 init_works 方法:接收 workshop 列表,将其压入任务队列,入队时调用 self.bridge.work_cnt_increase() 方法增加正在运行的程序计数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Bridge:
def __init__(self):
manager = Manager()
self.work_queue = manager.Queue()
self.config_dict = manager.dict()
self.init_config()

def init_works(self, workshop_lst):
for workshop in workshop_lst:
self.put_work_queue(workshop)
self.work_cnt_increase()

def init_config(self):
self.config_dict["running_work_cnt"] = 0
self.config_dict["work_start_flag"] = False

def flag_start(self):
self.config_dict["work_start_flag"] = True

def work_end(self):
return self.config_dict["work_start_flag"]\
and not self.config_dict["running_work_cnt"]

def work_cnt_increase(self):
self.config_dict["running_work_cnt"] += 1

def work_cnt_decrease(self):
self.config_dict["running_work_cnt"] -= 1
  1. 修改 Works

我们已经修改 Workshop 类使其每次运行完一个方法后,返回实例自身,因此需要修改 run_works 方法,拿到返回实例,并视情况将其重新加入队列。增加 distribute_works 方法处理返回的实例。并在 distribute_works 方法中处理运行中程序的计数问题。

run_works 方法:修改循环条件,由 not bridge.work_queue_empty() 变为 not bridge.work_end()await task 后调用 distribute_works 方法处理返回的实例。asyncio.create_task 中调用 Workshoprun_next_process() 方法。

distribute_works 方法:调用 Workshopis_end() 方法判断是否已完成所有流程,如未完成,任务重新入队,如已完成,调用 bridge.work_cnt_decrease() 减少正在运行的程序计数。

最后,在 run_works 方法中调用 bridge.flag_start() 方法标记程序开始运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class Works:
def distribute_works(self, task):
workshop = task.result()
if not workshop.is_end():
self.bridge.put_work_queue(workshop)
else:
self.bridge.work_cnt_decrease()

async def run_works(self):
self.bridge.flag_start()
while not self.bridge.work_end():
task_lst = list()
while not self.bridge.work_queue_empty():
workshop = self.bridge.get_work_queue()
task = asyncio.create_task(workshop.run_next_process())
task_lst.append(task)
for task in task_lst:
await task
self.distribute_works(task)
  1. 修改 main 主函数

MyWorkshop() 代替 Workshop()

1
2
3
4
5
6
if __name__ == "__main__":
work_lst = list()
for _ in range(3):
work_lst.append(MyWorkshop())
app = App()
app.async_run(work_lst)

此时,程序能够正常运行,并按顺序调用业务流程函数。

1
2
3
4
5
6
7
8
# 运行结果
等待1秒
等待1秒
等待3秒
End
End
End
运行时间:3.01s

至此,我们成功的在多进程中利用协程实现了多任务异步执行和多流程次序执行,将来可以通过增减进程的方式灵活的调节速率。

本节完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import asyncio
import time
from random import randint
from functools import wraps
from multiprocessing import Process
from multiprocessing import Manager


def print_run_time(func):
# 记录运行程序运行时间的装饰器
@wraps(func) # 保持被装饰的函数名不变,否则多进程调用出错
def wrap(*args, **kwargs):
start = time.time()
f = func(*args, **kwargs)
end = time.time()
print("运行时间:{:.2f}s".format(end - start))
return f
return wrap


class Bridge:
def __init__(self):
manager = Manager()
self.work_queue = manager.Queue()
self.config_dict = manager.dict()
self.init_config()

def init_works(self, workshop_lst):
for workshop in workshop_lst:
self.put_work_queue(workshop)
self.work_cnt_increase()

def init_config(self):
self.config_dict["running_work_cnt"] = 0
self.config_dict["work_start_flag"] = False

def flag_start(self):
self.config_dict["work_start_flag"] = True

def work_end(self):
return self.config_dict["work_start_flag"] \
and not self.config_dict["running_work_cnt"]

def work_cnt_increase(self):
self.config_dict["running_work_cnt"] += 1

def work_cnt_decrease(self):
self.config_dict["running_work_cnt"] -= 1

def put_work_queue(self, workshop):
self.work_queue.put_nowait(workshop)

def get_work_queue(self):
return self.work_queue.get_nowait()

def work_queue_empty(self):
return self.work_queue.empty()


class Workshop:
def __init__(self):
self._next_process = None

def set_start_process(self, func):
self._next_process = func

def set_next_process(self, func):
self._next_process = func

def set_end(self):
self._next_process = "/EOF"

def is_end(self):
return self._next_process == "/EOF"

async def run_next_process(self):
workshop = await self._next_process()
if workshop:
return workshop
else:
return self


class MyWorkshop(Workshop):
def __init__(self):
super().__init__()
self.set_start_process(self.async_simulation)

async def async_simulation(self):
# 模拟一个异步操作,随机停顿1-3秒
sleep_time = randint(1, 3)
print("等待{}秒".format(sleep_time))
await asyncio.sleep(sleep_time)
self.set_next_process(self.print_end)

async def print_end(self):
# 模拟第二步操作
print("End")
self.set_end()



class Works:
def __init__(self):
self.bridge = None

def distribute_works(self, task):
workshop = task.result()
if not workshop.is_end():
self.bridge.put_work_queue(workshop)
else:
self.bridge.work_cnt_decrease()

async def run_works(self):
self.bridge.flag_start()
while not self.bridge.work_end():
task_lst = list()
while not self.bridge.work_queue_empty():
workshop = self.bridge.get_work_queue()
task = asyncio.create_task(workshop.run_next_process())
task_lst.append(task)
for task in task_lst:
await task
self.distribute_works(task)

@print_run_time
def async_run(self, bridge):
self.bridge = bridge
asyncio.run(self.run_works())


class App:
def __init__(self):
self.works = Works()
self.bridge = Bridge()

def async_run(self, workshop_lst):
self.bridge.init_works(workshop_lst)
p_run_works = Process(target=self.works.async_run,
args=(self.bridge, ))
p_run_works.start()
p_run_works.join()


if __name__ == "__main__":
work_lst = list()
for _ in range(3):
work_lst.append(MyWorkshop())
app = App()
app.async_run(work_lst)

本节总结

在本节中,我们成功的在多进程中利用协程实现了多任务异步执行和多流程按次序执行,将来可以通过增减进程的方式灵活的调节速率,本文开头的目标第2、3点已基本完成。下一节将继续改造代码,增加网页请求功能,实现一个简单的异步爬虫,实现每次爬新网页只需要关注网络请求、网页解析和数据处理,多进程和异步请求部分由爬虫自身处理。