Example

import asyncio
import logging

import aiohttp
from bs4 import BeautifulSoup


class AsnycGrab(object):

    def __init__(self, url_list, max_threads):
        self.urls = url_list
        self.results = {}
        self.max_threads = max_threads

    def __parse_results(self, url, html):
        try:
            soup = BeautifulSoup(html, 'html.parser')
            title = soup.find('title').get_text()
        except Exception as e:
            raise e

        if title:
            self.results[url] = title

    async def get_body(self, url):
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=30) as response:
                assert response.status == 200
                html = await response.read()
                return response.url, html

    async def get_results(self, url):
        url, html = await self.get_body(url)
        self.__parse_results(url, html)
        return 'Completed'

    async def handle_tasks(self, task_id, work_queue):
        while not work_queue.empty():
            current_url = await work_queue.get()
            try:
                task_status = await self.get_results(current_url)
            except Exception as e:
                logging.exception('Error for {}'.format(current_url), exc_info=True)

    def eventloop(self):
        q = asyncio.Queue()
        for url in self.urls:
            q.put_nowait(url)
        loop = asyncio.get_event_loop()
        tasks = [self.handle_tasks(task_id, q) for task_id in range(self.max_threads)]
        loop.run_until_complete(asyncio.wait(tasks))
        loop.close()


if __name__ == '__main__':
    async_example = AsnycGrab(['http://edmundmartin.com',
                               'https://www.udemy.com',
                               'https://github.com/',
                               'https://zhangslob.github.io/',
                               'https://www.zhihu.com/'], 5)
    async_example.eventloop()
    print(async_example.results)

NhentaiSpider

import re
import logging
import asyncio

import aiohttp
from pyquery import PyQuery as pq


class NhentaiSpider:
    def __init__(self, language, max_pages, max_threads):
        self.language = language
        self.max_pages = max_pages
        self.max_threads = max_threads

        self.results = []
        self.q = asyncio.Queue()

        self.headers = {'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) \
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36'}
        self.detail_url_pattern = re.compile(r'galleries\/(\d+)\/(\d+)t', re.S)
        self.language_pattern_dict = {
            'All': r'^.*$',
            'Chinese': r'^.*([Cc]hinese|汉化|漢化).*$',
            'English': r'^.*([Ee]nglish).*$',
            'Japan': r'^((?<![Cc]hinese|[Ee]nglish).)*$'
        }

    async def __request_dom(self, url):
        async with aiohttp.ClientSession() as session:
            try:
                async with session.get(url=url, headers=self.headers) as resp:
                    if resp.status == 200:
                        return await resp.text()
                    else:
                        print('request status error')
            except aiohttp.ClientConnectionError:
                print('request connection error')

    def __parse_index(self, index_html):
        pattern = re.compile(self.language_pattern_dict[self.language], re.S)
        div_tags = pq(index_html)('.container.index-container .gallery').items()
        for div_tag in div_tags:
            try:
                title = pattern.search(div_tag('.caption').text()).group()
            except AttributeError:
                pass
            else:
                x = {
                    'title': title,
                    'commic_url': 'https://nhentai.net' + div_tag('a[class="cover"]').attr('href'),
                    'face_img_url': div_tag('noscript img').attr('src'),
                    'pages_url': [],
                }
                self.results.append(x)

    def __parse_detail(self, detail_html):
        div_tags = pq(detail_html)('.thumb-container a img').items()
        commic_pages_list = []
        for div_tag in div_tags:
            png_url = div_tag.attr('data-src')
            if png_url:
                result_url = self.detail_url_pattern.search(png_url).groups()
                img_url = 'https://i.nhentai.net/galleries/{}/{}.jpg'.format(*result_url)
                commic_pages_list.append(img_url)
        return commic_pages_list

    async def get_results(self, url):
        """Crawl a single index url, then fetch the page list for each comic."""
        html = await self.__request_dom(url)
        if html:
            self.__parse_index(html)

        for each_commic in self.results:
            each_commic_url = each_commic['commic_url']
            detail_html = await self.__request_dom(each_commic_url)
            commic_pages_list = self.__parse_detail(detail_html)
            each_commic['pages_url'] = commic_pages_list

    async def handle_tasks(self, task_id):
        while not self.q.empty():
            current_url = await self.q.get()
            try:
                task_status = await self.get_results(current_url)
            except Exception as e:
                logging.exception('Error for {}'.format(current_url), exc_info=True)

    def run(self):
        for page in range(1, self.max_pages + 1):
            url = f'https://nhentai.net/?page={page}'
            self.q.put_nowait(url)

        loop = asyncio.get_event_loop()
        tasks = [self.handle_tasks(task_id) for task_id in range(self.max_threads)]
        loop.run_until_complete(asyncio.wait(tasks))


def main():
    # language of the comics to keep (others are dropped): All / Chinese / Japan / English
    language = 'Chinese'
    max_page = 5
    max_threads = 5

    nhentaispider = NhentaiSpider(language, max_page, max_threads)
    nhentaispider.run()
    print(nhentaispider.results)


if __name__ == '__main__':
    main()
  • Three steps (a distilled sketch of the pattern follows this list):
    1. run:
      build every url that needs crawling, push them into the queue, turn handle_tasks into tasks, and finally start the event loop
    2. handle_tasks:
      keep pulling urls off the queue and call get_results on each one
    3. get_results:
      take a single url and produce its result; it ties together request_dom, parse_index and parse_detail
  • The parsing functions never await anything, so they don't need async; they can stay plain methods (or generators that yield their results)
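
Here is a minimal sketch of that three-step pattern with the site-specific parts stripped out. The example.com pages and the asyncio.sleep calls are placeholders for the real fetching and parsing, and it uses asyncio.run/gather rather than the older loop.run_until_complete / asyncio.wait style used above:

import asyncio
import logging


async def get_results(url, results):
    # step 3: fetch a single url and hand the body to a plain (synchronous) parser
    await asyncio.sleep(0.1)                      # stand-in for the real aiohttp request
    results[url] = f'parsed {url}'                # stand-in for the parse_* methods


async def handle_tasks(task_id, queue, results):
    # step 2: keep pulling urls off the queue until it is empty
    while not queue.empty():
        url = await queue.get()
        try:
            await get_results(url, results)
        except Exception:
            logging.exception('Error for %s', url)


async def run(urls, max_workers=5):
    # step 1: build every url, fill the queue, spawn the workers, wait for them to finish
    queue = asyncio.Queue()
    for url in urls:
        queue.put_nowait(url)
    results = {}
    await asyncio.gather(*(handle_tasks(i, queue, results) for i in range(max_workers)))
    return results


if __name__ == '__main__':
    pages = [f'https://example.com/?page={i}' for i in range(1, 6)]
    print(asyncio.run(run(pages)))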

IrmemberSpider

import asyncio
import json

import aiohttp
import motor.motor_asyncio
from pyquery import PyQuery as pq


class IrmemberSpider:
    def __init__(self, start_url):
        self.start_url = start_url

        self.headers = {'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) \
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36'}

    async def request_dom(self, url):
        async with aiohttp.ClientSession() as session:
            try:
                async with session.get(url, headers=self.headers) as resp:
                    if resp.status == 200:
                        return json.loads(await resp.text())
                    else:
                        print('status error :', resp.status)
            except aiohttp.ClientConnectionError as e:
                print(e)

    def parse_json_file(self, file):
        try:
            posts = file.get('data').get('posts')
        except AttributeError:
            pass
        else:
            return [
                {'created_time': post.get('created_at'),
                 'id': post.get('id'),
                 'img': post.get('img'),
                 'name': post.get('name'),
                 'text': post.get('text'),
                 }
                for post in posts]

    def put_next_url(self, last_id):
        next_url = f'http://i-remember.fr/api/search-posts?ln=en&lastId={last_id}'
        self.q.put_nowait(next_url)

    def connect_to_mongo(self):
        client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017')
        db = client['aiohttp_iremember']
        self.collection = db['posts']

    async def save_to_mongo(self, post):
        result = await self.collection.insert_many(post)
        if result:
            print('---------- saved : {} - {} ----------'.format(post[0]['id'], post[-1]['id']))
        else:
            print('---------- save failed : {} - {} ----------'.format(post[0]['id'], post[-1]['id']))

    async def scrapy_one_page(self, url):
        print(f'---------- processing : {url} ----------')
        json_file = await self.request_dom(url)
        posts_list = self.parse_json_file(json_file)
        self.put_next_url(posts_list[-1]['id'])
        await self.save_to_mongo(posts_list)

    async def handle_tasks(self):
        while not self.q.empty():
            next_url = await self.q.get()
            await self.scrapy_one_page(next_url)

    def run(self):
        self.connect_to_mongo()
        self.q = asyncio.Queue()
        self.q.put_nowait(self.start_url)

        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.handle_tasks())


def main():
    start_url = 'http://i-remember.fr/api/search-posts?ln=en'
    spider = IrmemberSpider(start_url)
    spider.run()


if __name__ == '__main__':
    main()

Asynchronous storage

import asyncio
import json
import datetime

import aiohttp
import motor.motor_asyncio
from pyquery import PyQuery as pq


class IrmemberSpider:
    def __init__(self, start_url):
        self.start_url = start_url
        self.max_threads = 5

        self.headers = {'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) \
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36'}

    async def request_dom(self, url):
        async with aiohttp.ClientSession() as session:
            try:
                async with session.get(url, headers=self.headers) as resp:
                    if resp.status == 200:
                        return json.loads(await resp.text())
                    else:
                        print('status error :', resp.status)
            except aiohttp.ClientConnectionError as e:
                print(e)

    def parse_json_file(self, file):
        try:
            posts = file.get('data').get('posts')
        except AttributeError:
            pass
        else:
            return [
                {'created_time': post.get('created_at'),
                 'id': post.get('id'),
                 'img': post.get('img'),
                 'name': post.get('name'),
                 'text': post.get('text'),
                 }
                for post in posts]

    def put_next_url(self, last_id):
        next_url = f'http://i-remember.fr/api/search-posts?ln=en&lastId={last_id}'
        self.q.put_nowait(next_url)

    def connect_to_mongo(self):
        client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017')
        db = client['aiohttp_iremember']
        self.collection = db['posts']

    async def save_to_mongo(self, post, task_id):
        print('--- {} task {} saving : {} - {}'.format(
            datetime.datetime.now().strftime('%H:%M:%S'), task_id, post[-1]['id'], post[0]['id']))
        await self.collection.insert_many(post)

    async def scrapy_one_page(self, url, task_id):
        now = datetime.datetime.now().strftime('%H:%M:%S')
        print(f'--- {now} task {task_id} processing : {url}')
        json_file = await self.request_dom(url)
        posts_list = self.parse_json_file(json_file)
        self.put_next_url(posts_list[-1]['id'])
        await self.save_to_mongo(posts_list, task_id)

    async def handle_tasks(self, task_id):
        while True:
            next_url = await self.q.get()
            await self.scrapy_one_page(next_url, task_id)

    def run(self):
        self.connect_to_mongo()
        self.q = asyncio.Queue()
        self.q.put_nowait(self.start_url)

        loop = asyncio.get_event_loop()
        tasks = [self.handle_tasks(task_id) for task_id in range(1, self.max_threads + 1)]
        loop.run_until_complete(asyncio.wait(tasks))


def main():
    start_url = 'http://i-remember.fr/api/search-posts?ln=en'
    spider = IrmemberSpider(start_url)
    spider.run()


if __name__ == '__main__':
    main()

For adding tasks dynamically, one simple approach is:

  • Don't add worker tasks on the fly; create them all up front, but let each one block on queue.get():

    while True:
        await queue.get()
  • Then just call queue.put() whenever new work turns up (see the sketch after this list)
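
A minimal sketch of that idea, with a hypothetical producer that discovers one example.com url per second and three workers that were created before any work existed:

import asyncio


async def worker(name, queue):
    # created up front; blocks on queue.get() until work actually appears
    while True:
        url = await queue.get()
        print(f'{name} handling {url}')
        await asyncio.sleep(0.5)                  # stand-in for the real fetch + parse
        queue.task_done()


async def producer(queue):
    # "call queue.put() at an appropriate time": here, one new url per second
    for page in range(1, 6):
        await queue.put(f'https://example.com/?page={page}')
        await asyncio.sleep(1)


async def main():
    queue = asyncio.Queue()
    workers = [asyncio.create_task(worker(f'worker-{i}', queue)) for i in range(3)]
    await producer(queue)
    await queue.join()                            # wait until every queued url is processed
    for w in workers:                             # the workers loop forever, so cancel them
        w.cancel()


if __name__ == '__main__':
    asyncio.run(main())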

Of course, we can also take the orthodox route:

  • Spawn a child thread that creates a new event loop B and runs it with run_forever
  • In the main thread, run some task on event loop A
  • That task keeps submitting new tasks to event loop B
import asyncio
from threading import Thread


async def create_task(event_loop):
    i = 0
    while True:
        # produce one new task per second and submit it to the loop running in the other thread
        asyncio.run_coroutine_threadsafe(production(i), event_loop)
        await asyncio.sleep(1)
        i += 1


async def production(i):
    while True:
        print("coroutine task #{}".format(i))
        await asyncio.sleep(1)


def start_loop(loop):
    # run the given event loop forever inside this thread
    asyncio.set_event_loop(loop)
    loop.run_forever()


thread_loop = asyncio.new_event_loop()                             # create event loop B
run_loop_thread = Thread(target=start_loop, args=(thread_loop,))   # run it in a separate thread so it doesn't block the main thread
run_loop_thread.start()                                            # start the thread, i.e. start that event loop

main_loop = asyncio.new_event_loop()                               # event loop A in the main thread
main_loop.run_until_complete(create_task(thread_loop))             # the main thread keeps creating coroutine objects for loop B