I dug through my old computer and moved over these notes from N years ago. They are from when I was first learning Scrapy, kept here for posterity.

Middleware

Scrapy middlewares and pipelines

start_requests

# Default implementation inherited from scrapy.Spider (older versions); Request is scrapy.Request
def start_requests(self):
    for url in self.start_urls:
        yield self.make_requests_from_url(url)

def make_requests_from_url(self, url):
    return Request(url, dont_filter=True)
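
A minimal sketch of overriding start_requests in your own spider, e.g. to build the initial URLs yourself instead of relying on start_urls (the spider name and URLs are made up for illustration):

import scrapy

class MySpider(scrapy.Spider):  # hypothetical example spider
    name = 'example'

    def start_requests(self):
        # build the initial requests by hand instead of using start_urls
        for page in range(1, 4):
            url = 'http://example.com/list?page={}'.format(page)
            yield scrapy.Request(url, callback=self.parse, dont_filter=True)

    def parse(self, response):
        self.logger.info('got %s', response.url)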

ImagesPipeline

class ImagesPipeline(FilesPipeline):
    """Abstract pipeline that implement the image thumbnail generation logic

    """

    MEDIA_NAME = 'image'
    MIN_WIDTH = 0
    MIN_HEIGHT = 0
    THUMBS = {}
    DEFAULT_IMAGES_URLS_FIELD = 'image_urls'
    DEFAULT_IMAGES_RESULT_FIELD = 'images'

    @classmethod
    def from_settings(cls, settings):
        cls.MIN_WIDTH = settings.getint('IMAGES_MIN_WIDTH', 0)
        cls.MIN_HEIGHT = settings.getint('IMAGES_MIN_HEIGHT', 0)
        cls.EXPIRES = settings.getint('IMAGES_EXPIRES', 90)
        cls.THUMBS = settings.get('IMAGES_THUMBS', {})
        s3store = cls.STORE_SCHEMES['s3']
        s3store.AWS_ACCESS_KEY_ID = settings['AWS_ACCESS_KEY_ID']
        s3store.AWS_SECRET_ACCESS_KEY = settings['AWS_SECRET_ACCESS_KEY']

        cls.IMAGES_URLS_FIELD = settings.get('IMAGES_URLS_FIELD', cls.DEFAULT_IMAGES_URLS_FIELD)
        cls.IMAGES_RESULT_FIELD = settings.get('IMAGES_RESULT_FIELD', cls.DEFAULT_IMAGES_RESULT_FIELD)
        store_uri = settings['IMAGES_STORE']
        return cls(store_uri)

    def file_downloaded(self, response, request, info):
        return self.image_downloaded(response, request, info)

    def image_downloaded(self, response, request, info):
        checksum = None
        for path, image, buf in self.get_images(response, request, info):
            if checksum is None:
                buf.seek(0)
                checksum = md5sum(buf)
            width, height = image.size
            self.store.persist_file(
                path, buf, info,
                meta={'width': width, 'height': height},
                headers={'Content-Type': 'image/jpeg'})
        return checksum

    def get_images(self, response, request, info):
        path = self.file_path(request, response=response, info=info)
        orig_image = Image.open(StringIO(response.body))

        width, height = orig_image.size
        if width < self.MIN_WIDTH or height < self.MIN_HEIGHT:
            raise ImageException("Image too small (%dx%d < %dx%d)" %
                                 (width, height, self.MIN_WIDTH, self.MIN_HEIGHT))

        image, buf = self.convert_image(orig_image)
        yield path, image, buf

        for thumb_id, size in self.THUMBS.iteritems():
            thumb_path = self.thumb_path(request, thumb_id, response=response, info=info)
            thumb_image, thumb_buf = self.convert_image(image, size)
            yield thumb_path, thumb_image, thumb_buf

    def convert_image(self, image, size=None):
        if image.format == 'PNG' and image.mode == 'RGBA':
            background = Image.new('RGBA', image.size, (255, 255, 255))
            background.paste(image, image)
            image = background.convert('RGB')
        elif image.mode != 'RGB':
            image = image.convert('RGB')

        if size:
            image = image.copy()
            image.thumbnail(size, Image.ANTIALIAS)

        buf = StringIO()
        image.save(buf, 'JPEG')
        return image, buf

    def get_media_requests(self, item, info):
        return [Request(x) for x in item.get(self.IMAGES_URLS_FIELD, [])]

    def item_completed(self, results, item, info):
        if self.IMAGES_RESULT_FIELD in item.fields:
            item[self.IMAGES_RESULT_FIELD] = [x for ok, x in results if ok]
        return item

    def file_path(self, request, response=None, info=None):
        ## start of deprecation warning block (can be removed in the future)
        def _warn():
            from scrapy.exceptions import ScrapyDeprecationWarning
            import warnings
            warnings.warn('ImagesPipeline.image_key(url) and file_key(url) methods are deprecated, '
                          'please use file_path(request, response=None, info=None) instead',
                          category=ScrapyDeprecationWarning, stacklevel=1)

        # check if called from image_key or file_key with url as first argument
        if not isinstance(request, Request):
            _warn()
            url = request
        else:
            url = request.url

        # detect if file_key() or image_key() methods have been overridden
        if not hasattr(self.file_key, '_base'):
            _warn()
            return self.file_key(url)
        elif not hasattr(self.image_key, '_base'):
            _warn()
            return self.image_key(url)
        ## end of deprecation warning block

        image_guid = hashlib.sha1(url).hexdigest()  # change to request.url after deprecation
        return 'full/%s.jpg' % (image_guid)

    def thumb_path(self, request, thumb_id, response=None, info=None):
        ## start of deprecation warning block (can be removed in the future)
        def _warn():
            from scrapy.exceptions import ScrapyDeprecationWarning
            import warnings
            warnings.warn('ImagesPipeline.thumb_key(url) method is deprecated, please use '
                          'thumb_path(request, thumb_id, response=None, info=None) instead',
                          category=ScrapyDeprecationWarning, stacklevel=1)

        # check if called from thumb_key with url as first argument
        if not isinstance(request, Request):
            _warn()
            url = request
        else:
            url = request.url

        # detect if thumb_key() method has been overridden
        if not hasattr(self.thumb_key, '_base'):
            _warn()
            return self.thumb_key(url, thumb_id)
        ## end of deprecation warning block

        thumb_guid = hashlib.sha1(url).hexdigest()  # change to request.url after deprecation
        return 'thumbs/%s/%s.jpg' % (thumb_id, thumb_guid)

    # deprecated
    def file_key(self, url):
        return self.image_key(url)
    file_key._base = True

    # deprecated
    def image_key(self, url):
        return self.file_path(url)
    image_key._base = True

    # deprecated
    def thumb_key(self, url, thumb_id):
        return self.thumb_path(url, thumb_id)
    thumb_key._base = True
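
The class above is essentially the built-in ImagesPipeline from an older, Python 2 era Scrapy release (hence StringIO and iteritems), copied here for study. A minimal sketch of enabling the built-in pipeline, assuming a recent Scrapy where it lives at scrapy.pipelines.images.ImagesPipeline; the store path and sizes below are placeholders:

# settings.py (sketch)
ITEM_PIPELINES = {
    'scrapy.pipelines.images.ImagesPipeline': 1,
}
IMAGES_STORE = '/path/to/images'   # where downloaded images are persisted
IMAGES_MIN_WIDTH = 100             # skip images smaller than this
IMAGES_MIN_HEIGHT = 100
IMAGES_THUMBS = {                  # thumbnails generated for every image
    'small': (50, 50),
    'big': (270, 270),
}
# items need an `image_urls` field (input) and an `images` field (results)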

JsonWriterPipeline

# Write items to a JSON Lines file
import json

class JsonWriterPipeline(object):

    def open_spider(self, spider):
        # open in text mode so the json.dumps strings can be written directly (Python 3)
        self.file = open('items.jl', 'w')

    def close_spider(self, spider):
        self.file.close()

    def process_item(self, item, spider):
        line = json.dumps(dict(item)) + "\n"
        self.file.write(line)
        return item
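
Like any item pipeline, it only runs once registered in ITEM_PIPELINES; a minimal sketch, where the module path is illustrative:

# settings.py (sketch)
ITEM_PIPELINES = {
    'myproject.pipelines.JsonWriterPipeline': 300,  # lower number = runs earlier
}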

MongoPipeline

# Write items to MongoDB
# The MongoDB address and database name are specified in the Scrapy settings;
# the MongoDB collection is named after the item class.
# The point of this example is to show how to use from_crawler() and how to clean up resources properly.
import pymongo

class MongoPipeline(object):

    collection_name = 'scrapy_items'

    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mongo_uri=crawler.settings.get('MONGO_URI'),
            mongo_db=crawler.settings.get('MONGO_DATABASE', 'items')
        )

    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def close_spider(self, spider):
        self.client.close()

    def process_item(self, item, spider):
        # insert_one() replaces the insert() call that is deprecated in pymongo 3+
        self.db[self.collection_name].insert_one(dict(item))
        return item
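
from_crawler() reads the connection details from the project settings, so something like the following is expected (values and module path are placeholders):

# settings.py (sketch)
MONGO_URI = 'mongodb://localhost:27017'
MONGO_DATABASE = 'items'
ITEM_PIPELINES = {
    'myproject.pipelines.MongoPipeline': 300,
}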

MysqlPipeline

# Write items to MySQL

import pymysql

class MysqlPipeline(object):
    def __init__(self, db, table, user, password):
        self.db = db
        self.table = table
        self.user = user
        self.password = password

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            db=crawler.settings.get('DB'),
            table=crawler.settings.get('TABLE'),
            user=crawler.settings.get('USER'),
            password=crawler.settings.get('PASSWORD'),
        )

    def open_spider(self, spider):
        try:
            # connect to the database
            self.db_conn = pymysql.connect(host='localhost', user=self.user,
                                           password=self.password, port=3306, db=self.db)
        # if the database does not exist yet
        except pymysql.err.InternalError:
            # connect to MySQL without selecting a database
            self.db_conn = pymysql.connect(host='localhost', user=self.user,
                                           password=self.password, port=3306)
            self.cursor = self.db_conn.cursor()
            # create the database
            self.cursor.execute('CREATE DATABASE {} DEFAULT CHARACTER SET utf8'.format(self.db))
            # reconnect, this time selecting the new database
            self.db_conn = pymysql.connect(host='localhost', user=self.user,
                                           password=self.password, port=3306, db=self.db)
        finally:
            # cursor on the selected database
            self.cursor = self.db_conn.cursor()
            # create the table if it does not exist
            sql = 'CREATE TABLE IF NOT EXISTS {} \
                   (title VARCHAR(255) NOT NULL,\
                   sort VARCHAR(255) NOT NULL,\
                   size VARCHAR(255) NOT NULL,\
                   href VARCHAR(255) NOT NULL,\
                   releasetime VARCHAR(255) NOT NULL,\
                   seed_num SMALLINT NOT NULL,\
                   download_time SMALLINT NOT NULL,\
                   publisher VARCHAR(255) NOT NULL,\
                   PRIMARY KEY (href))'.format(self.table)
            self.cursor.execute(sql)

    def process_item(self, item, spider):
        keys = ','.join(item.keys())
        values = ','.join(['%s'] * len(item))

        # upsert: insert the row, or update every column if the primary key already exists
        sql = 'INSERT INTO {table}({keys}) VALUES({values}) ON DUPLICATE KEY UPDATE '\
            .format(table=self.table, keys=keys, values=values)
        update = ','.join(["{key} = %s".format(key=key) for key in item])
        sql += update

        try:
            # the value tuple is doubled: once for the INSERT part, once for the UPDATE part
            self.cursor.execute(sql, tuple(item.values()) * 2)
            self.db_conn.commit()
            print('succeed', item['title'])
            return item
        except Exception:
            print('----------------', 'error', item['title'])
            self.db_conn.rollback()

    def close_spider(self, spider):
        self.db_conn.close()
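
The corresponding settings consumed by from_crawler(); all values and the module path are placeholders:

# settings.py (sketch)
DB = 'scrapy_data'
TABLE = 'movies'
USER = 'root'
PASSWORD = 'your_password'
ITEM_PIPELINES = {
    'myproject.pipelines.MysqlPipeline': 300,
}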

ScreenshotPipeline

# Take a screenshot of each item's page
# This example shows how to return a Deferred from process_item().
# It uses Splash to render a screenshot of the item's URL; the pipeline sends the request
# to a locally running Splash instance. Once the request is downloaded and the Deferred
# callback fires, it saves the screenshot to a file and adds the filename to the item.
import scrapy
import hashlib
from urllib.parse import quote


class ScreenshotPipeline(object):
    """Pipeline that uses Splash to render screenshot of
    every Scrapy item."""

    SPLASH_URL = "http://localhost:8050/render.png?url={}"

    def process_item(self, item, spider):
        encoded_item_url = quote(item["url"])
        screenshot_url = self.SPLASH_URL.format(encoded_item_url)
        request = scrapy.Request(screenshot_url)
        dfd = spider.crawler.engine.download(request, spider)
        dfd.addBoth(self.return_item, item)
        return dfd

    def return_item(self, response, item):
        if response.status != 200:
            return item  # Error happened, return item.

        # Save screenshot to file, filename will be hash of url.
        url = item["url"]
        url_hash = hashlib.md5(url.encode("utf8")).hexdigest()
        filename = "{}.png".format(url_hash)
        with open(filename, "wb") as f:
            f.write(response.body)

        # Store filename in item.
        item["screenshot_filename"] = filename
        return item
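
The pipeline assumes a Splash instance listening on localhost:8050. A sketch of the settings side; the module path is illustrative, and the docker command in the comment is the usual way to start Splash:

# settings.py (sketch)
# Start Splash first, e.g.: docker run -p 8050:8050 scrapinghub/splash
ITEM_PIPELINES = {
    'myproject.pipelines.ScreenshotPipeline': 300,
}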

DuplicatesPipeline: item deduplication

# Duplicates filter
# A filter that looks for duplicate items and drops those that have already been processed.
# Assume our items have a unique id, but our spider returns multiple items with the same id:

from scrapy.exceptions import DropItem

class DuplicatesPipeline(object):

    def __init__(self):
        self.ids_seen = set()

    def process_item(self, item, spider):
        if item['id'] in self.ids_seen:
            raise DropItem("Duplicate item found: %s" % item)
        else:
            self.ids_seen.add(item['id'])  # replace 'id' with your own item field
            return item
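
A hypothetical item definition carrying the unique `id` field the pipeline relies on (field names are illustrative):

import scrapy

class MyItem(scrapy.Item):  # hypothetical item
    id = scrapy.Field()     # unique identifier used for deduplication
    name = scrapy.Field()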

RandomUserAgentMiddleware

from fake_useragent import UserAgent

class RandomUserAgentMiddleware(object):

    def process_request(self, request, spider):
        # set a random Chrome User-Agent for every request
        request.headers.setdefault('User-Agent', UserAgent().chrome)

# Scrapy ships with a built-in UserAgentMiddleware, so disable it when enabling this one:
# 'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,
# '……………….RandomUserAgentMiddleware': 501,
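
Spelled out as a settings sketch (the project module path is illustrative):

# settings.py (sketch)
DOWNLOADER_MIDDLEWARES = {
    'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,  # disable the built-in one
    'myproject.middlewares.RandomUserAgentMiddleware': 501,
}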

ProxyMiddleware

class ProxyMiddleware(object):
    def process_request(self, request, spider):
        # attach a proxy to the request
        request.meta['proxy'] = 'http://118.190.95.35:9001'

    def process_response(self, request, response, spider):
        # demo of modifying the response before it reaches the spider
        response.status = 201
        return response

    def process_exception(self, request, exception, spider):
        print('get exception')
        # switch to a backup proxy and reschedule the request
        request.meta['proxy'] = 'https://45.76.96.148:12211'
        return request
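
The proxy addresses above are hard-coded samples; in practice one would usually pick from a pool. A minimal sketch of that variant, where the class name and proxy list are made up:

import random

class RandomProxyMiddleware(object):  # hypothetical variant of the middleware above
    PROXIES = [
        'http://118.190.95.35:9001',   # sample entries, replace with live proxies
        'http://45.76.96.148:12211',
    ]

    def process_request(self, request, spider):
        # rotate proxies per request
        request.meta['proxy'] = random.choice(self.PROXIES)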

ChromeDownloaderMiddleware: integrating Selenium

from scrapy.http import HtmlResponse
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from gp.configs import *


class ChromeDownloaderMiddleware(object):

    def __init__(self):
        options = webdriver.ChromeOptions()
        options.add_argument('--headless')  # run Chrome headless
        if CHROME_PATH:
            options.binary_location = CHROME_PATH
        if CHROME_DRIVER_PATH:
            self.driver = webdriver.Chrome(chrome_options=options, executable_path=CHROME_DRIVER_PATH)  # initialize the Chrome driver
        else:
            self.driver = webdriver.Chrome(chrome_options=options)  # initialize the Chrome driver

    def __del__(self):
        self.driver.close()

    def process_request(self, request, spider):
        try:
            print('Chrome driver begin...')
            self.driver.get(request.url)  # load the requested URL
            return HtmlResponse(url=request.url, body=self.driver.page_source, request=request, encoding='utf-8',
                                status=200)  # return the rendered HTML
        except TimeoutException:
            return HtmlResponse(url=request.url, request=request, encoding='utf-8', status=500)
        finally:
            print('Chrome driver end...')
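
The `from gp.configs import *` line pulls CHROME_PATH and CHROME_DRIVER_PATH from a project-local config module that is not shown; a guess at its shape, with placeholder paths:

# gp/configs.py (assumed shape, paths are placeholders)
CHROME_PATH = ''                                     # leave empty to use the default Chrome binary
CHROME_DRIVER_PATH = '/usr/local/bin/chromedriver'   # path to the chromedriver executable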

TaobaoSpider

# -*- coding: utf-8 -*-
from scrapy import Request, Spider
from urllib.parse import quote
from scrapyseleniumtest.items import ProductItem


class TaobaoSpider(Spider):
    name = 'taobao'
    allowed_domains = ['www.taobao.com']
    base_url = 'https://s.taobao.com/search?q='

    def start_requests(self):
        for keyword in self.settings.get('KEYWORDS'):
            for page in range(1, self.settings.get('MAX_PAGE') + 1):
                url = self.base_url + quote(keyword)
                # the page number travels in meta; the Selenium middleware below uses it to paginate
                yield Request(url=url, callback=self.parse, meta={'page': page}, dont_filter=True)

    def parse(self, response):
        products = response.xpath(
            '//div[@id="mainsrp-itemlist"]//div[@class="items"][1]//div[contains(@class, "item")]')
        for product in products:
            item = ProductItem()
            item['price'] = ''.join(product.xpath('.//div[contains(@class, "price")]//text()').extract()).strip()
            item['title'] = ''.join(product.xpath('.//div[contains(@class, "title")]//text()').extract()).strip()
            item['shop'] = ''.join(product.xpath('.//div[contains(@class, "shop")]//text()').extract()).strip()
            item['image'] = ''.join(product.xpath('.//div[@class="pic"]//img[contains(@class, "img")]/@data-src').extract()).strip()
            item['deal'] = product.xpath('.//div[contains(@class, "deal-cnt")]//text()').extract_first()
            item['location'] = product.xpath('.//div[contains(@class, "location")]//text()').extract_first()
            yield item


####################################################################################################################

# -*- coding: utf-8 -*-

from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from scrapy.http import HtmlResponse
from logging import getLogger


class SeleniumMiddleware():
    def __init__(self, timeout=None, service_args=[]):
        self.logger = getLogger(__name__)
        self.timeout = timeout
        self.browser = webdriver.PhantomJS(service_args=service_args)
        self.browser.set_window_size(1400, 700)
        self.browser.set_page_load_timeout(self.timeout)
        self.wait = WebDriverWait(self.browser, self.timeout)

    def __del__(self):
        self.browser.close()

    def process_request(self, request, spider):
        """
        Render the page with PhantomJS
        :param request: Request object
        :param spider: Spider object
        :return: HtmlResponse
        """
        self.logger.debug('PhantomJS is Starting')
        page = request.meta.get('page', 1)
        try:
            self.browser.get(request.url)
            if page > 1:
                # type the target page number into the pager and submit
                input = self.wait.until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, '#mainsrp-pager div.form > input')))
                submit = self.wait.until(
                    EC.element_to_be_clickable((By.CSS_SELECTOR, '#mainsrp-pager div.form > span.btn.J_Submit')))
                input.clear()
                input.send_keys(page)
                submit.click()
            # wait until the active page number matches and the item list has rendered
            self.wait.until(
                EC.text_to_be_present_in_element((By.CSS_SELECTOR, '#mainsrp-pager li.item.active > span'), str(page)))
            self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '.m-itemlist .items .item')))
            return HtmlResponse(url=request.url, body=self.browser.page_source, request=request, encoding='utf-8',
                                status=200)
        except TimeoutException:
            return HtmlResponse(url=request.url, status=500, request=request)

    @classmethod
    def from_crawler(cls, crawler):
        return cls(timeout=crawler.settings.get('SELENIUM_TIMEOUT'),
                   service_args=crawler.settings.get('PHANTOMJS_SERVICE_ARGS'))
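
The spider and middleware together rely on a few custom settings (KEYWORDS, MAX_PAGE, SELENIUM_TIMEOUT, PHANTOMJS_SERVICE_ARGS) plus registering the middleware. A sketch with placeholder values; the middleware module path is an assumption based on the project name seen in the imports:

# settings.py (sketch, values are placeholders)
KEYWORDS = ['iPad']
MAX_PAGE = 100
SELENIUM_TIMEOUT = 20
PHANTOMJS_SERVICE_ARGS = ['--load-images=false', '--disk-cache=true']

DOWNLOADER_MIDDLEWARES = {
    'scrapyseleniumtest.middlewares.SeleniumMiddleware': 543,
}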