服务端

用aiohttp构建我们的第一个应用程序

快速开始

走个流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from aiohttp import web

# 创建app
app = web.Application()

# 创建视图函数
async def index(request):
return web.Response(text='hello world')

# 绑定url到视图函数
app.router.add_get('/',index)

# 运行app
web.run_app(app, host='127.0.0.1', port=8080)

创建app – 绑定路由 – 路由绑定视图函数 – 最后运行app

创建应用程序

aiohttp的服务端程序都是 aiohttp.web.Application实例对象,也就是app对象。用于创建信号,连接路由等。

目录结构:

1
2
3
4
polls_app
├── main.py
├── routes.py
└── views.py
1
2
3
4
5
6
# polls_app/main.py
from aiohttp import web

# 创建app
app = web.Application()
web.run_app(app, host='127.0.0.1', port=8080)

创建视图

1
2
3
4
5
# polls_app/views.py
from aiohttp import web

async def index(request):
return web.Response(text='Hello Aiohttp!')

创建路由(url分发器)

1
2
3
4
5
6
7
# polls_app/routes.py

# 就像Django中一样,要将视图函数导入urls.py
from views import index

def setup_routes(app):
app.router.add_get('/', index)

绑定routes和views

在Django中,Django会帮我们自动绑定urls.py和views.py.
现在我们需要手动绑定,将路由连接到视图函数上.

在main.py中调用routes.py里的setup_routes函数。将main函数修改为 :

1
2
3
4
5
6
7
8
# polls_app/main.py
from aiohttp import web
from routes import setup_routes

app = web.Application()
# 给app绑定一个url
setup_routes(app)
web.run_app(app, host='127.0.0.1', port=8080)

流程分析:

  • 创建app

    1
    app = web.Application()
  • 为这个app绑定一个url,并且调用视图函数

    1
    app.router.add_get('/', index)
  • 这个视图函数返回一个Response对象

    1
    2
    async def index(request):
    return web.Response(text='Hello Aiohttp!')
  • 运行这个app:

    1
    web.run_app(app, host='127.0.0.1', port=8080)

构建数据库

使用SQLAlchemy来写数据库架构

新增一个db.py(相当于Django里的Model.py)
当前目录结果:

1
2
3
4
5
polls_app
├── main.py
├── db.py
├── routes.py
└── views.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# db.py
import sqlalchemy as sa
import aiopg

meta = sa.MetaData()

question = sa.Table(
'question', meta,
sa.Column('id', sa.Integer, nullable=False),
sa.Column('question_text', sa.String(200), nullable=False),
sa.Column('pub_date', sa.Date, nullable=False),
# Indexes
sa.PrimaryKeyConstraint('id', name='question_id_pkey')
)

choice = sa.Table(
'choice', meta,
sa.Column('id', sa.Integer, nullable=False),
sa.Column('question_id', sa.Integer, nullable=False),
sa.Column('choice_text', sa.String(200), nullable=False),
sa.Column('votes', server_default="0", nullable=False),
# Indexes #
sa.PrimayKeyConstraint('id', name='choice_id_pkey'),
sa.ForeignKeyContraint(['question_id'], [question.c.id],
name='choice_question_id_fkey',
ondelete='CASCADE'),
)


async def init_pg(app):
conf = app['config']
engine = await aiopg.sa.create_engine(
database=conf['database'],
user=conf['user'],
password=conf['password'],
host=conf['host'],
port=conf['host'],
minsize=conf['minsize'],
maxsize=conf['maxsize'])

app['db'] = engine

关闭数据库

程序退出时一块关闭所有的资源接口是一个很好的做法。
使用on_cleanup信号来关闭数据库接口:

1
2
3
4
5
async def close_pg(app):
app['db'].close()
await app['db'].wait_closed()

app.on_cleanup.append(close_pg)

使用模板

1
2
3
4
5
6
7
8
9
10
11
12
@aiohttp_jinja2.template('detail.html')
async def poll(request):
async with request['db'].acquire() as conn:
question_id = request.match_info['question_id']
try:
question, choices = await db.get_question(conn,question_id)
except db.RecordNotFound as e:
raise web.HTTPNotFound(text=str(e))
return {
'question': question,
'choices': choices
}

之后还要注册模板

1
2
3
4
import aiohttp_jinja2
import jinja2

aiohttp_jinja2.setup(app, loader=jinja2.PackageLoader('aiohttpdemo_polls', 'templates'))

静态文件

1
2
3
app.router.add_static('/static/',
path=str(project_root / 'static'),
name='static')

使用中间件

1
2
3
4
def setup_middlewares(app):
error_middleware = error_pages({404: handle_404,
500: handle_500})
app.middlewares.append(error_middleware)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def error_pages(overrides):
async def middleware(app, handler):
async def middleware_handler(request):
try:
response = await handler(request)
override = overrides.get(response.status)
if override is None:
return response
else:
return await override(request, response)
except web.HTTPException as ex:
override = overrides.get(ex.status)
if override is None:
raise
else:
return await override(request, ex)
return middleware_handler
return middleware

这些overrides(handle_404和handle_500)只是简单的用Jinja2模板渲染:

1
2
3
4
5
6
7
8
async def handle_404(request, response):
response = aiohttp_jinja2.render_template('404.html',request,{})
return response


async def handle_500(request, response):
response = aiohttp_jinja2.render_template('500.html',request,{})
return response

启动一个简单的Web服务器

部署web服务器首先要创建一个 请求处理器(request handler)。
请求处理器可以是协程方法也可以是普通方法,它只有一个用于接受Request实例对象的参数,之后会返回Response实例对象:

1
2
3
4
from aiohttp import web

async def hello(request):
return web.Response(text="Hello, world")

hello(request)这个协程就像是Django中的视图函数,在这里被称为请求处理器

创建应用(Appliaction)并将请求处理器配置到应用的路由

1
2
app = web.Application()
app.router.add_get('/', hello)

调用run_app()来启动应用

1
web.run_app(app)

使用命令行接口(CLI)

aiohttp.web带有一个基于TCP/IP的基本命令行接口,用于快速启动一个应用(Application)。

1
python -m aiohttp.web -H localhost -P 8080 package.module:init_func

package.module:init_func应是一个可导入,调用的对象,有一个接受包含命令行参数列表的参数,配置好之后返回Application对象

简单来说,有两种启动服务器方式:

  • 使用命令行:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    # test.py
    from aiohttp import web

    async def index(request):
    return web.Response(text='hello world2')

    def init_func(argv):
    app = web.Application()
    app.router.add_get("/", index)
    return app

    执行:

    1
    python -m aiohttp.web -H localhost -P 8081 test:init_func
  • 使用python模块:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    # test.py
    from aiohttp import web

    # 创建app
    app = web.Application()

    # 创建视图函数
    async def index(request):
    return web.Response(text='hello world2')

    # 绑定url到视图函数
    app.router.add_get('/',index)

    # 运行app
    web.run_app(app, host='127.0.0.1', port=8080)

    之后执行:

    1
    python tesy.py

使用处理器

就像前面说的,aiohttp的处理器就像是Django里的视图函数.

处理器对象可以被任意调用,它只接受Request实例对象,并且返回StreamResponse的派生对象实例(如Response):

处理器可以是函数也可以是协程

1
2
3
4
5
6
def handler(request):
return web.Response()

# 或者
async def handler(request):
return web.Response()

处理器必须被注册在路由上才能使用:

1
2
3
app.router.add_get('/', handler)
app.router.add_post('/post', post_handler)
app.router.add_put('/put', put_handler)

add_route()同样支持使用通配符方法:

1
app.router.add_route('*', '/path', all_handler)

可以使用Request.method来获知请求所使用的HTTP方法。

使用资源和路由

  • app.router.add_getapp.router.add_post其实是app.router.add_route(method, handler)的子类方法
  • 同样,app.router.add_route(method, handler)app.router..add_resource(path, name=name)的子类方法
1
2
3
4
5
app.router.add_get('/{name}',index)

# 等价于
resource = app.router.add_resource('/{name}')
resource.add_route('GET', index)

使用路径通配

比如,如果某一路径是'/a/{name}/c',那'/a/b/c', '/a/1/c','/a/etc/c',这样的路径就都可以匹配到。
使用Request.match_info来完成这一操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
from aiohttp import web

app = web.Application()

# 创建视图函数
async def index(request):
return web.Response(text="Hello, {}".format(request.match_info['name']))

# 使用通配
app.router.add_get('/{name}',index)

# 运行app
web.run_app(app, host='127.0.0.1', port=8080)

{name}的背后其实就是正则表达式,
所以,我们也可以像Django一样,使用自定义正则表达式:

1
app.router.add_get(r'/{name:\d+}')

url的反向解析

和django一样,aiohttp同样支持url反向解析

1
app.router.add_get('/', name='index')

之后可以访问和构建此资源的URL:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from aiohttp import web


app = web.Application()
async def info(request):
print('***********************************')
print(request.app.router['info'])
print(request.app.router['info'].url_for())
print(request.app.router['info'].url_for().with_query({"a": "b", "c": "d"}))
print('***********************************')
return web.Response(text="Hello, world")

app.router.add_get(r'/info',info,name='info')
web.run_app(app, host='127.0.0.1', port=8080)

# 访问http://127.0.0.1:8080/info
# ***********************************
# <PlainResource 'info' /info>
# /info
# /info?a=b&c=d
# ***********************************
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from aiohttp import web


app = web.Application()
async def info(request):
print('***********************************')
print(request.app.router['name'])
print(request.app.router['name'].url_for(name='hyl'))
print(request.app.router['name'].url_for(name='hyl').with_query({"a": "b", "c": "d"}))
print('***********************************')
return web.Response(text="Hello, world")

app.router.add_get('/{name}',info,name='name')
web.run_app(app, host='127.0.0.1', port=8080)

# 访问http://127.0.0.1:8080/asdasdada
# ***********************************
# <PlainResource 'name' /asdasdada>
# /hyl
# /hyl?a=b&c=d
# ***********************************

将处理器放到类中使用

将具有同样逻辑的一组路由放到类中:

1
2
3
async def hello(request):
return web.Response(text="Hello, world")
app.router.add_get('/', hello)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Handler:

def __init__(self):
pass

def handle_intro(self, request):
return web.Response(text="Hello, world")

async def handle_greeting(self, request):
name = request.match_info.get('name', "Anonymous")
txt = "Hello, {}".format(name)
return web.Response(text=txt)

handler = Handler()
app.router.add_get('/intro', handler.handle_intro)
app.router.add_get('/greet/{name}', handler.handle_greeting)

注意 :

  • 这与Django不太一致,Django的视图类是针对同一个url请求,分成不同的方法(get,post)等等
  • 但是aiohttp已经使用add_get来明确标明请求方法,所以这个类只是将一些url的处理函数组合起来而已.

但是,如果不使用add_get方法,而是使用add_route,就可以使用类似django风格的基础视图类。

1
2
3
4
5
6
7
8
9
class MyView(web.View):
async def get(self):
return await get_resp(self.request)

async def post(self):
return await post_resp(self.request)

# 第一参数是请求方法,'*'表示任何方法都行,第二参数是路径,第三参数是处理器
app.router.add_route('*', '/path/to', MyView)

此时的处理器必须是一个协程方法,且只接受self参数

资源视图

所有在路由中注册的资源都可使用UrlDispatcher.routes()查看:

1
2
for resource in app.router.routes():
print(resource)

name的资源可以用UrlDispatcher.named_routes()来查看:

1
2
for name, resource in app.router.named_routes().items():
print(name, resource)

其他注册路由的方式

  • 路由表
    (Django风格)
  • 路由装饰器
    (Flask风格)
1
2
3
4
5
6
7
8
9
10
# 路由表
async def handle_get(request):
...


async def handle_post(request):
...

app.router.add_routes([web.get('/echo', handle_get),
web.post('/{name}', handle_post),
1
2
3
4
5
6
7
8
9
10
11
12
13
# 装饰器
routes = web.RouteTableDef()

@routes.get('/get')
async def handle_get(request):
...


@routes.post('/post')
async def handle_post(request):
...

app.router.add_routes(routes)

Web 处理器中的取消操作

web处理器中每一条await语句都可被取消,这种情况一般发生在客户端还没有完全读取响应体然后中断了连接。
与著名WSGI架构 如Flask和Django在这点上有所不同。

  • 在处理GET请求时,代码可能会从数据库或其他web资源中获取数据,这些查询可能很慢。
  • 这时候取消查询是最好的: 该连接已经被抛弃了,没有理由再浪费时间和资源(内存等)进行查询,已经没有机会响应了。

为了预防被取消掉,生成一个新的任务不靠谱,这样不能等待:

1
2
3
async def handler(request):
request.loop.create_task(write_to_redis(request))
return web.Response('OK')

预防被取消掉可以用下列几种方法:

  • 使用 asyncio.shield() 来进行存进数据库的处理。
  • 开启一个存入数据库的协程任务。
  • 使用aiojobs或其他第三方库。

asyncio.shield() 挺不错的,保护协同程序不会被退出。唯一的缺点是你需要分清哪些是需要得到保护的代码哪些不是。

使用@atomic装饰器可以使整个处理器都防止有取消操作:

1
2
3
4
5
6
7
8
9
10
from aiojobs.aiohttp import atomic

@atomic
async def handler(request):
await write_to_db()
return web.Response()

app = web.Application()
setup(app)
app.router.add_post('/', handler)

自定义路由准则

下面代码就是:
对应首页网址,只接受请求头的accept为'application/json''application/xml'的请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8
class AcceptChooser:

def __init__(self):
self._accepts = {}

async def do_route(self, request):
# 获取accept请求头的参数
for accept in request.headers.getall('ACCEPT', []):
# 获取处理器
acceptor = self._accepts.get(accept)
# 如果存在该处理器
if acceptor is not None:
return (await acceptor(request))
raise HTTPNotAcceptable()

# 添加处理器
def reg_acceptor(self, accept, handler):
self._accepts[accept] = handler


async def handle_json(request):
# do json handling
pass

async def handle_xml(request):
# do xml handling
pass

chooser = AcceptChooser()

chooser.reg_acceptor('application/json', handle_json)
chooser.reg_acceptor('application/xml', handle_xml)

app.router.add_get('/', chooser.do_route)

静态文件的处理

处理静态文件( 图片,JavaScripts, CSS文件等)最好的方法是使用反向代理,像是nginx或CDN服务。

  • 通过 UrlDispatcher.add_static()注册个新的静态路由:

    1
    app.router.add_static('/prefix', path_to_static_folder)
  • 当访问静态文件的目录时,默认服务器会返回 HTTP/403 Forbidden。
    使用show_index并将其设置为True可以显示出索引:

    1
    app.router.add_static('/prefix', path_to_static_folder, show_index=True)
  • 当从静态文件目录访问一个符号链接(软链接)时,默认服务器会响应 HTTP/404 Not Found(未找到)。
    使用follow_symlinks并将其设置为True可以让服务器使用符号链接:

    1
    app.router.add_static('/prefix', path_to_static_folder, follow_symlinks=True)
  • 如果你想允许缓存清除,使用append_version并设为True

    缓存清除会对资源文件像JavaScript 和 CSS文件等的文件名上添加一个hash后的版本。这样的好处是我们可以让浏览器无限期缓存这些文件而不用担心这些文件是否发布了新版本。

    1
    app.router.add_static('/prefix', path_to_static_folder, append_version=True)

模板

使用第三方库 aiohttp_jinja2

首先我们用aiohttp_jinja2.setup()来设置下jinja2环境:

1
2
3
4
import aiohttp_jinja2

app = web.Application()
aiohttp_jinja2.setup(app,loader=jinja2.FileSystemLoader('/path/to/templates/folder'))

然后将模板引擎应用到处理器中。最简单的方式是使用aiohttp_jinja2.templates()装饰器:

1
2
3
@aiohttp_jinja2.template('tmpl.jinja2')
def handler(request):
return {'name': 'Andrew', 'surname': 'Svetlov'}

返回JSON 响应

使用 aiohttp.web.json_response()可以返回JSON响应:

1
2
3
def handler(request):
data = {'some': 'data'}
return web.json_response(data)

类似于Django的return JsonResponse({...}, safe=False)

这个方法返回的是aiohttp.web.Response实例对象,所以你可以做些其他的事,比如设置cookies。

处理用户会话

通过请求存储用户数据的仓库。一般简称为会话。

aiohttp.web没有内置会话,不过你可以使用第三方库aiohttp_session来提供会话支持:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import asyncio
import time
import base64
from cryptography import fernet

from aiohttp import web
from aiohttp_session import setup, get_session, session_middleware
from aiohttp_session.cookie_storage import EncryptedCookieStorage

async def handler(request):
# 获取session
session = await get_session(request)
last_visit = session['last_visit'] if 'last_visit' in session else None
text = 'Last visited: {}'.format(last_visit)
return web.Response(text=text)

def make_app():
app = web.Application()
# 设置secret_key
fernet_key = fernet.Fernet.generate_key()
# secret_key必须是32位的url安全的经过base64编码的字节
secret_key = base64.urlsafe_b64decode(fernet_key)

setup(app, EncryptedCookieStorage(secret_key))

app.router.add_route('GET', '/', handler)
return app

web.run_app(make_app())

处理HTTP表单

  • 如果表单的方法是 “GET”,要使用Request.query获取数据。

  • 如果是“POST”则用Request.post()Request.multipart() 获取数据。

    • Request.post()接受标明为'application/x-www-form-urlencoded''multipart/form-data' 的数据()。
    • 它会将数据存进一个临时字典中。如果指定了client_max_size,超出了的话会抛出ValueError异常,
    • 这时使用Request.multipart()是更好的选择,尤其是在上传大文件时。

由以下表单发送的数据:

1
2
3
4
5
6
7
8
9
10
<form action="/login" method="post" accept-charset="utf-8"
enctype="application/x-www-form-urlencoded">

<label for="login">Login</label>
<input id="login" name="login" type="text" value="" autofocus/>
<label for="password">Password</label>
<input id="password" name="password" type="password" value=""/>

<input type="submit" value="login"/>
</form>

可以用以下方法获取:

1
2
3
4
async def do_login(request):
data = await request.post()
login = data['login']
password = data['password']

文件上传

首先呢我们要确保标签的有enctype元素并且设置为了"multipart/form-data"

1
2
3
4
5
6
7
8
<form action="/store/mp3" method="post" accept-charset="utf-8"
enctype="multipart/form-data">

<label for="mp3">Mp3</label>
<input id="mp3" name="mp3" type="file" value=""/>

<input type="submit" value="submit"/>
</form>

请求处理器中接受这个文件,它变成了一个FileField实例对象。

  • Reuest.post()会把所有数据读到内容
  • Request.multipart()返回的是multipart读取器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
async def store_mp3_handler(request):

# 注意,如果是个很大的文件不要用这种方法。
data = await request.post()

mp3 = data['mp3']

# .filename 包含该文件的名称,是个字符串。
filename = mp3.filename

# .file 包含该文件的内容。
mp3_file = data['mp3'].file

content = mp3_file.read()

return web.Response(body=content,
# mp3_file是一个二进制文件,使用MultiDict
headers=MultiDict(
{'CONTENT-DISPOSITION': mp3_file}))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
async def store_mp3_handler(request):

reader = await request.multipart()

# 激活生成器
mp3 = await reader.next()

filename = mp3.filename

# 如果是分块传输的,别用Content-Length做判断。
size = 0
with open(os.path.join('/spool/yarrr-media/mp3/', filename), 'wb') as f:
while True:
chunk = await mp3.read_chunk() # 默认是8192个字节。
if not chunk:
break
size += len(chunk)
f.write(chunk)

return web.Response(text='{} sized of {} successfully stored'
''.format(filename, size))

使用WebSockets

在处理器中创建一个WebSocketResponse对象即可设置WebSocket,之后即可进行通信:

websocket使用GET方法注册

1
2
# 用HTTP GET方法注册
app.router.add_get('/ws', websocket_handler)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
async def websocket_handler(request):

ws = web.WebSocketResponse()
await ws.prepare(request)

async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
if msg.data == 'close':
await ws.close()
else:
await ws.send_str(msg.data + '/answer')

elif msg.type == aiohttp.WSMsgType.ERROR:
print('ws connection closed with exception %s' %ws.exception())

print('websocket connection closed')

return ws
  • WebSocket中读取数据(await ws.receive())必须在请求处理器内部完成,
  • 不过写数据(ws.send_str(...)),关闭(await ws.close())和取消操作可以在其他任务中完成。

注意 :

不要从websocket中并行地读取数据,aiohttp.web.WebSocketResponse.receive()不能在分布在两个任务中同时调用。

异常

  • aiohttp.web定义了所有HTTP状态码的异常。
  • 每个异常都是HTTPException的子类,同样还都是Response的子类
  • 所以就允许你在请求处理器中返回或抛出它们。
1
2
async def handler(request):
return aiohttp.web.HTTPFound('/redirect')

异常等级图:

HTTPException
HTTPSuccessful
* 200 - HTTPOk
* 201 - HTTPCreated
* 202 - HTTPAccepted
* 203 - HTTPNonAuthoritativeInformation
* 204 - HTTPNoContent
* 205 - HTTPResetContent
* 206 - HTTPPartialContent
HTTPRedirection
* 300 - HTTPMultipleChoices
* 301 - HTTPMovedPermanently
* 302 - HTTPFound
* 303 - HTTPSeeOther
* 304 - HTTPNotModified
* 305 - HTTPUseProxy
* 307 - HTTPTemporaryRedirect
* 308 - HTTPPermanentRedirect
HTTPError
HTTPClientError
* 400 - HTTPBadRequest
* 401 - HTTPUnauthorized
* 402 - HTTPPaymentRequired
* 403 - HTTPForbidden
* 404 - HTTPNotFound
* 405 - HTTPMethodNotAllowed
* 406 - HTTPNotAcceptable
* 407 - HTTPProxyAuthenticationRequired
* 408 - HTTPRequestTimeout
* 409 - HTTPConflict
* 410 - HTTPGone
* 411 - HTTPLengthRequired
* 412 - HTTPPreconditionFailed
* 413 - HTTPRequestEntityTooLarge
* 414 - HTTPRequestURITooLong
* 415 - HTTPUnsupportedMediaType
* 416 - HTTPRequestRangeNotSatisfiable
* 417 - HTTPExpectationFailed
* 421 - HTTPMisdirectedRequest
* 422 - HTTPUnprocessableEntity
* 424 - HTTPFailedDependency
* 426 - HTTPUpgradeRequired
* 428 - HTTPPreconditionRequired
* 429 - HTTPTooManyRequests
* 431 - HTTPRequestHeaderFieldsTooLarge
* 451 - HTTPUnavailableForLegalReasons
HTTPServerError
* 500 - HTTPInternalServerError
* 501 - HTTPNotImplemented
* 502 - HTTPBadGateway
* 503 - HTTPServiceUnavailable
* 504 - HTTPGatewayTimeout
* 505 - HTTPVersionNotSupported
* 506 - HTTPVariantAlsoNegotiates
* 507 - HTTPInsufficientStorage
* 510 - HTTPNotExtended
* 511 - HTTPNetworkAuthenticationRequired

所有的异常都拥有相同的结构:

1
HTTPNotFound(*, headers=None, reason=None,body=None, text=None, content_type=None)
  • 如果没有指定headers,默认是响应中的headers。

  • 其中HTTPMultipleChoices, HTTPMovedPermanently, HTTPFound, HTTPSeeOther, HTTPUseProxy, HTTPTemporaryRedirect的结构是下面这样的:

    1
    HTTPFound(location, *, headers=None, reason=None,body=None, text=None, content_type=None)

    location参数的值会写入到HTTP头部的Location中。

  • HTTPMethodNotAllowed的结构是这样的:

    1
    HTTPMethodNotAllowed(method, allowed_methods, *,headers=None, reason=None,body=None, text=None, content_type=None)

    method是不支持的那个方法,allowed_methods是所支持的方法。

数据共享

  • aiohttp.web不推荐使用全局变量进行数据共享。每个变量应在自己的上下文中而不是全局可用的。

  • aiohttp.web.Applicationaiohttp.web.Request提供collections.abc.MutableMapping(类字典对象)来存储数据。

    类全局变量存储到Application实例对象中:

    1
    app['my_private_key'] = data

    之后就可以在web处理器中获取出来:

    1
    2
    async def handler(request):
    data = request.app['my_private_key']

    如果变量的生命周期是一次请求,可以存储在请求中。

    1
    2
    async def handler(request):
    request['my_private_key'] = "data"

中间件

中间件是一个协程程序帮助修正请求和响应

1
2
3
4
5
6
7
8
from aiohttp.web import middleware

# 在响应中添加'wink'字符串:
@middleware
async def middleware(request, handler):
resp = await handler(request)
resp.text = resp.text + ' wink'
return resp

对于流式响应和websockets该例子不起作用

每个中间件需要接受两个参数,一个是请求实例另一个是处理器,中间件需要返回响应内容。

创建Application时,可以通过middlewares参数传递中间件过去:

1
app = web.Application(middlewares=[middleware_1, middleware_2])
  • 中间件写法有点像python里的上下文管理器context manager装饰器的写法,
    一个函数同时处理__enter____exit__事件.
    web.middleware也一样,一个函数同时处理request和response.

    • contextlib.contextmanager使用yield分开两个事件
    • web.middleware使用await分开request和response的处理
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @contextlib.contextmanager
    def createContextManager(name):
    print '__enter__ %s' % name
    yield name
    print '__exit__ %s' % name


    @web.middleware
    async def middleware1(request, handler):
    print('Middleware 1 called')
    response = await handler(request)
    print('Middleware 1 finished')
    return response
  • 和Django,Scrapy的中间件一样,正序处理request,逆序处理response

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    from aiohttp import web

    def test(request):
    print('Handler function called')
    return web.Response(text="Hello")

    @web.middleware
    async def middleware1(request, handler):
    # 处理request
    print('Middleware 1 called')
    response = await handler(request)
    # 处理response
    print('Middleware 1 finished')
    return response

    @web.middleware
    async def middleware2(request, handler):
    # 处理request
    print('Middleware 2 called')
    response = await handler(request)
    # 处理response
    print('Middleware 2 finished')
    return response


    app = web.Application(middlewares=[middleware1, middleware2])
    app.router.add_get('/', test)
    web.run_app(app)


    # Middleware 1 called
    # Middleware 2 called
    # Handler function called
    # Middleware 2 finished
    # Middleware 1 finished

Django的中间件除了处理request和response,还可以处理error.
aiohttp也可以

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import json
from aiohttp import web

def json_error(message):
return web.Response(
body=json.dumps({'error': message}).encode('utf-8'),
content_type='application/json')

@web.middleware
async def error_middleware(request, handler):
try:
response = await handler(request)
if response.status == 404:
return json_error(response.message)
return response
except web.HTTPException as ex:
if ex.status == 404:
return json_error(ex.reason)
raise

app = web.Application(middlewares=[error_middleware])

信号

尽管中间件可以自定义之前和之后的处理行为,但并不能自定义响应中的行为。所以信号量由此而生。

比如,中间件只能改变没有预定义HTTP头的响应的HTTP 头(看on_prepare()),但有时我们需要一个可以改变流式响应和WebSockets HTTP头的钩子。所以我们可以用on_response_prepare信号来充当这个钩子:

1
2
3
4
async def on_prepare(request, response):
response.headers['My-Header'] = 'value'

app.on_response_prepare.append(on_prepare)

简单来说就是,每当response准备好的时候就会触发on_prepare

此外,你也可以用on_startupon_cleanup信号来捕获应用开启和释放时的状态。
解释说,每当app运行和结束前会执行create_aiopg和dispose_aiopg函数.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from aiopg.sa import create_engine

async def create_aiopg(app):
app['pg_engine'] = await create_engine(
user='postgre',
database='postgre',
host='localhost',
port=5432,
password=''
)

async def dispose_aiopg(app):
app['pg_engine'].close()
await app['pg_engine'].wait_closed()

app.on_startup.append(create_aiopg)
app.on_cleanup.append(dispose_aiopg)

嵌套应用

就像Django有很多app一样,aiohttp也支持多个app.

创建名为 admin 的子应用,我们可以用add_subapp()来完成:

1
2
3
4
admin = web.Application()
# setup admin routes, signals and middlewares

app.add_subapp('/admin/', admin)
  • 主应用和子应用间的中间件和信号是一个环一样的结构。

  • 也就是说如果请求'/admin/something'先调用主应用的中间件然后在调用子应用(admin.middlewares)的中间件。 信号也同样。

  • 所有注册的基础信号如on_startup,on_shutdown,on_cleanup都会给子应用也注册一份。只不过传递的参数是子应用。

  • 子应用也可以嵌套子应用。 子应用也可以使用Url反向引用,只不过会带上前缀:

    1
    2
    3
    4
    5
    6
    admin = web.Application()
    admin.router.add_get('/resource', handler, name='name')

    app.add_subapp('/admin/', admin)

    url = admin.router['name'].url_for() # url为'/admin/resource'
  • 如果主应用想得到子应用的Url反向引用,可以给主app注册一个变量指向子app:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    admin = web.Application()
    admin.router.add_get('/resource', handler, name='name')

    app.add_subapp('/admin/', admin)
    # 给主app注册一个变量指向子app
    app['admin'] = admin

    async def handler(request): # main application's handler
    admin = request.app['admin']
    url = admin.router['name'].url_for()

流控制

aiohttp.web有复杂的流控制机制来应对底层TCP套接字写入缓存。
问题是: 默认情况下TCP套接字使用Nagle算法来输出缓存,但这种算法对于流式数据协议如HTTP不是很理想。

Web服务器的响应是以下几种状态其中一个:

  1. CORK(tcp_cork设置为True)。这个选项不会发送一部分TCP/IP帧。当这个选项被(再次)清除时会发送所有已经在队列中的片段帧。因为会把各种小帧聚合起来发送,所以这种方式对发送大量片段数据非常理想。 如果操作系统不支持CORK模式(不管是socket.TCP_CORK还是socket.TCP_NOPUSH)那该模式与Nagle模式一样。一般来说是windows系统不支持此模式。
  2. NODELAY(tcp_nodelay设置为True)。这个选项会禁用Nagle算法。选用这个那么无论数据多小都会尽快发出去,即使是很小的数据。该模式对发送少量数据非常理想。
  3. Nagle算法(tcp_cork和tcp_nodelay都为False)。该模式会先缓存数据,直到达到预定的数据大小后再一起发送。如果要发送HTTP数据应该避免使用这个模式除非你确定要使用它。

默认情况下,流数据StreamResponse),标准响应Response和http异常及其派生类)和websocketsWebSocketResponse)使用NODELAY模式,静态文件处理器使用CORK模式。

可以使用set_tcp_cork()方法和set_tcp_nodelay()方法手动切换。

使用Expect请求头

Expect是一个请求消息头,包含一个期望条件,表示服务器只有在满足此期望条件的情况下才能妥善地处理请求。

  • aiohttp.web支持使用Expect头。默认是HTTP/1.1 100 Continue,如果Expect头不是"100-continue"则抛出HTTPExpectationFailed异常。
  • 你可以自定义Expect头处理器。如果Expect头存在的话则会调用Expect处理器Expect处理器会先于中间件和路由处理器被调用。
  • Expect处理器的返回值 :
    1. 可以返回None,返回None则会继续执行(调用中间件和路由处理器)。
    2. 如果返回的是StreamResponse实例对象,之后请求处理器则使用该返回对象作为响应内容。
    3. 也可以抛出HTTPException的子类对象。抛出错误的时候之后的处理将不会进行,客户端将会接受一个适当的http响应。

自定义expect请求处理器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
async def check_auth(request):
if request.version != aiohttp.HttpVersion11:
return

if request.headers.get('EXPECT') != '100-continue':
raise HTTPExpectationFailed(text="Unknown Expect: %s" % expect)

if request.headers.get('AUTHORIZATION') is None:
raise HTTPForbidden()

request.transport.write(b"HTTP/1.1 100 Continue\r\n\r\n")

async def hello(request):
return web.Response(body=b"Hello, world")

app = web.Application()
# 设置expect请求的处理器
app.router.add_get('/', hello, expect_handler=check_auth)

优雅地关闭

  • 停止aiohttp web服务器时只关闭打开的连接时不够的。 因为可能会有一些websockets或流,在服务器关闭时这些连接还是打开状态。
  • 开发者可以使用Applicaiton.on_shutdown信号来完善这一功能。

关闭websocket处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 关闭websocket处理器
app = web.Application()
app['websockets'] = []

async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)

request.app['websockets'].append(ws)
try:
async for msg in ws:
...
finally:
request.app['websockets'].remove(ws)

return ws


# 信号处理器:
async def on_shutdown(app):
for ws in app['websockets']:
await ws.close(code=WSCloseCode.GOING_AWAY,
message='Server shutdown')

app.on_shutdown.append(on_shutdown)

合适的关闭程序要注意以下:

  1. 不再接受新的连接。注意调用asyncio.Server.close()asyncio.Server.wait_closed()来关闭。
  2. 解除Application.shutdown()事件。
  3. 在一小段延迟后调用Server.shutdown()关闭已经开启的连接。
  4. 发出Application.cleanup()信号。

下列代码演示从开始到结束:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
loop = asyncio.get_event_loop()
handler = app.make_handler()

f = loop.create_server(handler, '0.0.0.0', 8080)
srv = loop.run_until_complete(f)
print('serving on', srv.sockets[0].getsockname())

try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
srv.close()
loop.run_until_complete(srv.wait_closed())
loop.run_until_complete(app.shutdown())
loop.run_until_complete(handler.shutdown(60.0))
loop.run_until_complete(app.cleanup())
loop.close()

后台任务

  • 例如创建一个后台任务,用于在zmq.SUB套接字上监听ZeroMQ,然后通过WebSocket(app['websockets'])处理并转发接收到的消息给客户端。
  • 使用Application.on_startup信号注册的后台任务可以让这些任务在应用的请求处理器执行时一并执行。 比如我们需要一个一次性的任务和两个常驻任务。最好的方法是通过信号注册:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
async def listen_to_redis(app):
try:
sub = await aioredis.create_redis(('localhost', 6379), loop=app.loop)
ch, *_ = await sub.subscribe('news')
async for msg in ch.iter(encoding='utf-8'):
# Forward message to all connected websockets:
for ws in app['websockets']:
ws.send_str('{}: {}'.format(ch.name, msg))
except asyncio.CancelledError:
pass
finally:
await sub.unsubscribe(ch.name)
await sub.quit()


async def start_background_tasks(app):
app['redis_listener'] = app.loop.create_task(listen_to_redis(app))


async def cleanup_background_tasks(app):
app['redis_listener'].cancel()
await app['redis_listener']


app = web.Application()
app.on_startup.append(start_background_tasks)
app.on_cleanup.append(cleanup_background_tasks)
web.run_app(app)

listen_to_redis()将会一直运行下去。当关闭时发出的on_cleanup信号会调用关闭处理器以关闭它。

底层服务器

  • 有时候用户不需要更高级的封装,像是 application,routers和signals。 只是需要一个支持异步调用并且是接受请求返回响应对象的东西。
  • 在aiohttp.web.Server类中有介绍过一个服务协议工厂——asyncio.AbstractEventLoop.create_server()
  • 他可以将数据流桥接到web处理器以及反馈结果。 底层web处理器应该接收单个BaseRequest参数并且执行下列中的其中一个:
    1. 返回一个包含HTTP响应体的响应对象。
    2. 创建一个StreamResponse对象,然后可以调用StreamResponse.prepare()发送头信息,调用StreamResponse.write() / StreamResponse.drain()发送数据块,最后结束响应。
    3. 抛出HTTPException派生的异常(看Exception部分)。
    4. 使用WebSocketResponse发起/处理Web-Socket连接。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import asyncio
from aiohttp import web


async def handler(request):
return web.Response(text="OK")


async def main(loop):
server = web.Server(handler)
await loop.create_server(server, "127.0.0.1", 8080)
print("======= Serving on http://127.0.0.1:8080/ ======")

# pause here for very long time by serving HTTP requests and
# waiting for keyboard interruption
# 通过提供HTTP请求并等待键盘中断,在此暂停很长时间
await asyncio.sleep(100*3600)


loop = asyncio.get_event_loop()

try:
loop.run_until_complete(main(loop))
except KeyboardInterrupt:
pass
loop.close()

这个处理器可以接受所有的请求: 不论GET, POST, Web-Socket都可以,无论哪一个路径的访问也都同样由其处理。 不过也同样很基础: 无论如何处理器都只返回200 OK。实际生活中所产生的状态要复杂的多。

APIs

请求类

基础请求(BaseRequest)

  • Request 对象中包含所有的HTTP请求信息。
    Request对象拥有Request.app和Request.match_info属性。
  • BaseRequest 用在底层服务器中(底层服务器没有应用,路由,信号和中间件)。
  • BaseRequest和Reuqest都是类字典对象,以便在中间件和信号处理器中共享数据。
1
class aiohttp.web.BaseRequest

属性:

  • version : 发起请求的HTTP版本,该属性只读。

  • method : 发起请求的HTTP方法,该属性只读。

  • url : 包含资源的绝对路径的URL实例对象。

  • rel_url : 包含资源的相对路径的URL实例对象。与.url.relative()相同。

  • host : 主机名

  • remote : 初始化HTTP请求的IP地址。

  • path_qs : 包含路径信息和查询字符串的URL(如/app/blog?id=10

  • path : 包含路径信息但不含有主机(host)或协议(scheme)的URL(/app/blog)。路径是URL解码(URL-unquoted)后的。

  • raw_path : 同 path ,路径是没有经过URL解码的。(如/my%2Fpath%7Cwith%21some%25strange%24characters)

  • query : 带有查询字符串的并联字典。

  • headers : 请求头

  • transport : 用于处理请求的传输端口,该属性只读。
    该属性可以被用在获取客户端 peer的IP地址时。

    1
    2
    3
    peername = request.transport.get_extra_info('peername')
    if peername is not None:
    host, port = peername
  • cookies

  • content : StreamReader实例对象,用于读取请求的主体(BODY)的输入流。

  • content_type : 返回Content-Type头信息的内容,该属性只读。类型是str,如’text/html’。
    同理还有charset,content_length,http_range

方法 :

  • clone(*, method=…, rel_url=…, headers=…)
    克隆自己,并将相应的值替换。

    创建并返回一个新Request实例对象。如果没有传递任何参数,将会复制一份一模一样的。如果没有传递某一个参数,那个值与原值一样。

  • coroutine json(*, loads=json.loads)
    读取请求主体,以JSON形式返回。

  • coroutine multipart(*, reader=aiohttp.multipart.MultipartReader)
    返回一个用于处理即将到来的multipart请求的aiohttp.multipart.MultipartReader对象。

请求(Request)

1
class aiohttp.web.Request

在web处理器中接受请求信息的Request类。
所有的处理器的第一个参数都要接受Request类的实例对象。
该类派生于BaseRequest,支持父类中所有的方法和属性。还有几个额外的:

  • match_info : 返回AbstractMatchInfo实例对象,内容是路由解析的结果,该属性只读。
  • app : 返回一个用于调用请求处理器的应用(Application)实例对象。

响应类

aiohttp.web提供三个不同的响应类: StreamResponse, ResponseFileResponse

StreamResponse用于流数据,Response的父类是StreamResponse

StreamResponse

1
class aiohttp.web.StreamResponse(*, header,status=200, reason=None)

参数 :

  • status (int)
  • reason (int)
  • task : 一个承载请求处理的任务。在关闭服务器时对于那些需要长时间运行的请求(流内容,长轮询或web-socket)非常有用。
  • chunked : 指代是否使用了分块编码,该属性只读。可以通过调用enable_chunked_encoding()开启分块编码。
  • cookies
  • content_length , 同理还有 content_type charset last_modified

方法 :

  • set_cookie(name, value, *, path=’/‘, expires=None, domain=None, max_age=None, secure=None, httponly=None, version=None)

    • name (str) - cookie名称。
    • value (str) - cookie值(如果是其他类型的话会尝试转换为str类型)
    • expires - 过期时间(可选)。
    • domain (str) - cookie主域(可选)。
    • max_age (int) - 定义cookie的生命时长,以秒为单位。该参数为非负整数。在经过这些秒后,客户端会抛弃该cookie。如果设置为0则表示立即抛弃。
    • path (str) - 设置该cookie应用在哪个路径上(可选,默认是’/‘)。
    • secure (bool) - 该属性(没有任何值)会让用户代理使用安全协议。用户代理(或许会在用户控制之下)需要决定安全等级,在适当的时候考虑使用“安全”cookie。是不是要使用“安全”要考虑从服务器到用户代理的这段历程,“安全”协议的目的是保证会话在安全情况下进行(可选)。
    • httponly (bool) - 如果要设置为HTTP only则为True(可选)。
    • version (int) - 一个十进制数,表示使用哪个版本的cookie管理(可选,默认为1)。

    expires 和 max_age 功能一致.但是建议使用max_age

  • del_cookie(name, *, path=’/‘. domain=None)
    删除某cookie。

  • corotine prepare(request)
    发送HTTP头信息出去。
    在调用了该方法之后你就不要在修改任何头信息的数据了。该方法同时会调用on_response_prepare信号所连接的处理器。

  • write(data)
    发送byte-ish数据,该数据作为响应主体的一部分。
    需要在此之前调用prepare()

  • coroutine write_eof()
    一个可以作为HTTP响应结束标志的协程方法。
    如果需要的话,内部会在完成请求处理后调用这个方法
    调用write_eof()后,任何对响应对象的操作都是禁止的。

Response

1
class aiohttp.web.Response(*, body=None, status=200, reason=None, text=None, headers=None, content_type=None, charset=None)

最常用的响应类,继承于StreamResponse。StreamResponse的参数和方法不赘述。

参数 :

  • body(bytes) : 响应主体。类型为bytes。
  • text(str) : 响应主体。类型为str。
  • status
  • headers
  • content_type(str) : 响应的内容类型。

WebSocketResponse

1
class aiohttp.web.WebSocketResponse(\, timeout=10.0, receive_timeout=None, autoclose=True, autoping=True, heartbeat=None, protocols=(), compress=True)
  • 用于进行服务端处理websockets的类。
  • 调用prepare()后你就不能在使用write()方法了,但你仍然可以与websocket客户端通过send_str(), receive()等方法沟通。

为什么websocket的相关方法必须是协程?

为了支持缓慢的websockets所带来的背压现象(back-pressure,服务器达到可连接的上限,而这时又有新的客户端请求连接就会出现背压)

参数:

  • autoping (bool) - 自动向发送了PING消息的客户端发送PONG消息,以及自动处理客户端的PONG响应。需要注意的是该方法不会自动向客户端发送PING消息,你需要自己调用ping()方法实现。

  • heartbeat (float) - 每一次心跳都发送ping消息并等待pong消息,如果没有接收到pong响应则关闭连接。

  • receive_timeout (float) - 接收操作的超时时间。默认是None也就是不限时间。

  • compress (bool) - 允许每一次消息都使用deflate扩展。传入False禁止此功能。默认是True。

    该类支持async for语句来对即将到来的消息进行迭代:

    1
    2
    3
    4
    5
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async for msg in ws:
    print(msg.data)

方法:

  • coroutine prepare(request)
    开启websocket。调用此方法后你就可以使用其他的websockets方法了。
  • coroutine send_str(data)
    向peer发送TEXT(文本)消息。
  • coroutine send_bytes(data)
    向peer发送BINARY(二进制)消息。
  • corotine close(*, code=1000, message=b’’)
    一个初始化关闭握手消息的协程方法。5不同任务中的close()调用会被保留。
  • coroutine receive(timeout=None)
    等待peer即将发来的数据消息并返回它的协程方法。
    该方法在PING,PONG和CLOSE处理时都有调用,但并不返回。
    该方法在内部会执行ping-pong游戏和closing握手。
  • coroutine receive_str(*, timeout=None)
    调用receive()方法,并判断其消息类型是否为TEXT(文本)。
  • coroutine receive_bytes(*, timeout=None)
    调用receive()方法,并判断其消息类型是否为BINARY(二进制)。
  • coroutine receive_json(*, loads=json.loads,timeout=None)
    调用receive_str()方法,并将JSON字符串转换为Python字典(dict)。
WebSocketReady
1
class aiohttp.web.WebSocketReady

WebSocketResponse.can_prepare()所返回的对象。可使用bool类型判断:

1
2
if not await ws.can_prepare(...):
cannot_start_websocket()

json_response

1
aiohttp.web.json_response([data, ]*, text=None, body=None, status=200, reason=None, headers=None, content_type='application/json', dumps=json.dumps)

返回内容为JSON数据(默认由json.dumps()转换),并带有’application/json’信息的响应对象。

应用类和路由器类

Application

应用(Application)是web服务器的代名词。

要得到完整地可工作例子,你必须创建应用(Application)和路由器(Router)并且使用Server创建服务器套接字作为协议工厂。Server可以使用Application.make_handler()来创建。

应用(Application)还是一个类字典对象,所以你可以用它作为全局共享数据容器,你可以在处理器中使用Request.app来访问它:

1
2
3
4
5
6
app = Application()
app['database'] = await aiopg.create_engine(**db_config)

async def handler(request):
with (await request.app['database']) as conn:
conn.execute("DELETE * FROM table")
1
class aiohttp.web.Application(*, logger= , router=None, middlewares=(), handler_args=None, client_max_size=1024**2, loop=None, debug=...) 

Application类继承于dict

  • logger - logging.Logger实例对象,用于存储应用程序的日志。默认值为logging.getLogger("aiohttp.web")
  • router - aiohttp.abc.AbstractRouter实例对象,如果是None则默认创建UrlDispatcher
  • middlewares - 存放中间件工厂的列表。
  • handler_args - 类字典对象,用于覆盖Application.make_handler()中的关键字参数。
  • client_max_size - 客户端请求中携带的数据的最大大小。如果POST请求超过这个值,将会抛出HTTPRequestEntityTooLarge异常。
  • loop - 事件循环。自2.0版本后不再赞成使用:在冻结阶段Loop会被自动设置。
  • debug - 调试组件。

信号:

  • on_response_prepare
    一个在StreamResponse.prepare()执行时发送的信号,触发信号时将请求(request)和响应(response)对象作为参数传递。在某些情况下很有用,比如说,你想为每个响应都添加一个自定义头信息。

    1
    2
    async def on_prepare(request, response):
    pass
  • on_startup
    一个在应用程序开启时触发的信号。我们可以捕获这个信号来做一些后台任务。

    1
    2
    async def on_startup(app):
    pass
  • on_shutdown
    一个在应用程序关闭时触发的信号。我们可以捕获这个信号来做一些对于需要长时间运行的连接的清理工作(websockets和数据流之类的)。

    1
    2
    async def on_shutdown(app):
    pass
  • on_cleanup
    一个在应用程序执行清理时发送的信号。我们可以捕获这个信号来优雅的关闭数据库服务器之类的连接。

    1
    2
    async def on_cleanup(app):
    pass

方法:

  • make_handler(loop=None, **kwargs)
    创建一个处理请求的HTTP协议工厂。

    1
    2
    3
    4
    5
    6
    7
    loop = asyncio.get_event_loop()

    app = Application()

    # setup route table
    # app.router.add_route(...)
    await loop.create_server(app.make_handler(), '0.0.0.0', 8080)
  • coroutine startup()
    一个会与应用程序的请求处理器一起调用的协程方法。
    该方法的目的是调用on_startup信号所连接的处理器。

  • coroutine shutdown()
    该方法应该在服务器正在停止且在(要)调用cleanup()前调用。
    该方法会调用on_shutdown信号所连接的处理器。

  • coroutine cleanup()
    该方法应该在服务器正在停止且在(要)调用shutdown()后调用。
    该方法会调用on_cleanup信号所连接的处理器。

Server

一个与create_server()兼容的协议工厂。

class aiohttp.web.Server
该类用于创建处理HTTP连接的HTTP协议对象。

Server.connections
一个包含当前已开启的连接的列表。

aiohttp.web.request_count
已处理请求的总数。

coroutine Server.shutdown(timeout)
一个用于关闭所有已开启连接的协程方法。

Router

用于将分发URL到特定的处理器,aiohttp.web使用路由来建立联系。
路由可以是任何部署了AbstractRouter接口的对象。
aiohttp.web提供的部署方式为UrlDispatcher
Application也使用UrlDispatcher作为router()的默认返回。

1
class aiohttp.web.UrlDispatcher

最直接地url匹配型路由,同时具有collections.abc.Mapping的功能,可以用于访问已命名的路由。

  • add_resource(path, *, name=None)
    添加一个资源到路由表尾部。
    path可以只是字符串’/a/b/c’,也可以带变量’/a/{var}’。

  • add_route(method, path, handler, *, name=None, expect_handler=None)
    添加一个处理器到路由表尾部。
    path可以只是字符串’/a/b/c’,也可以带变量’/a/{var}’。

    注意: 如果处理器是一个普通函数,aiohttp会在内部将其转换为协程函数。

  • add_routes(routes_table)
    从路由表(routes_table)中注册路由。
    路由表(routes_table)需要是包含RouteDef组件或RouteTableDef的列表。

  • add_get(path, handler, *, name=None, allow_head=True, **kwargs)
    添加GET方法路由的快捷方式。等价于调用add_route(method=’GET’)。

    同理:
    add_post(path, handler, **kwargs)

    add_put(path, handler, **kwargs)

    add_patch(path, handler, **kwargs)

    add_delete(path, handler, **kwargs)

  • add_static(prefix, path, , name=None, expect_handler=None, chunk_size=2561024, response_factory=StreamResponse, show_index=False, follow_symlinks=False, append_version=False)
    添加一个用于返回静态文件的路由和处理器。对于获取图片,js和css等文件非常有用。

    add_static()仅用于开发。生存环境中,静态文件功能应由wbe服务器(比如nginx,apache)提供。

  • add_subapp(prefix, subapp)
    在给定路径前缀下注册一个嵌套子应用。
    以定前缀开始的请求会交由subapp处理。

Resource

资源是路由表中的一个含有路径,独特的名字和至少有一条路由的组件。

1
class aiohttp.web.AbstractResource

所有资源的基类。继承自collections.abc.Sizedcollections.abc.Iterable
len(resource)会返回属于该资源的路由总数,也允许进行迭代for route in resource

  • get_info()
    返回资源的描述。如{'path': '/path/to'}, {'formatter': '/path/{to}', 'pattern': re.compile(r'^/path/(?P<to>[a-zA-Z][_a-zA-Z0-9]+)$)
  • url_for(*args, **kwargs)
    构建一个该路由的URL(可附带额外参数)。
    args和kwargs依赖继承于资源类的可接受参数列表。返回URL实例对象
1
class aiohttp.web.Resource

新类型资源的基类,继承于AbstractResource

add_route(method, handler, *, expect_handler=None)
添加一个web处理器到资源中。

Route

路由(Route)具有HTTP方法(通配符’*’是可以用的),web处理器和异常处理器(可选)。每个路由都可属于多个资源。

问题解决

路由斜杠问题

  1. app.router.add_get('/chat/', chat)app.router.add_get('/chat', chat)是不一样的.
    他们都是正确的,分别响应127.0.0.1:8080/chat/ 和127.0.0.1:8080/chat
  2. 这个问题很容易出现在websocket里.因为websocket默认就是ws://127.0.0.1:8080/chat/ 多了一个斜杠.我们在写路由的时候很可能会少写这个斜杠

其他工具包

参考网址

  1. aiohttp 中文文档
  2. sqlalchemy中多对多的关系