第一章 初识FLASK

HTML , css , JavaScript 分别作为一个 Web 页面的结构层、表现层和行为层

pipenv

1
pip install pipenv

Pipenv 是基 pip Python 理工具,它和 pip 的用 非常相似,可以看作 pip 的加强版,它的出现解决了旧的 pip+virtualenv+requirements.txt 作方式的弊端 具体来说,它是 pip Pipfile Virtualenv 结合体,它让包安装、包依赖 理和虚拟环境管理更加方便,使用它可 高效 Python 目开发工作流

之后我们就可以使用

1
pipenv install
1
pipenv install XXX

这会为当前项目创建一个文件夹,其中包含隔离的 Python 解释器环境,并且安装 pip wheel setuptools 等基本的包

类似于requirements.txt , pipenv使用的文件为Pipfile 文件Pipfile.lock文件,前者用来记录项目依赖包 ,而后者记录了固 定版本的详细依赖包列表,当我们使用 Pipenv安装/删除/更新依赖包时,Pipile以及 Pipilelock会自动更新。

默认情况 下, Pipenv统一管理所有虚拟环境

Windows 系统中,虚拟环境文件夹 在 C: \U sers\Administrator. virtualenvs 目录下创建,而 Linux macOS 会在 ~/.local/share/virtualenvs 目录下创建

如采你想在项目目录内创建虚拟环珑文件夫,可以设置 环境变量 PIPENV_VENV_IN_PROJECT ,这时名为 .venv 的虚拟环 文件夹将在项目根 目录被创建

激活虚拟环境

1
pipenv shell

flask.Flask 和 创建实例

1
2
from flask import Flask
app = Flask(__name__)

用特殊变量__name__ : Python会根据所处的模块来赋予_name_变量相应的值,对于我们的程序来说(app.py),这个值为app。除此之外,这也会帮助 Flask在相应的文件夹里找到需要的资源,比如模板和静态文件。

1
class flask.Flask(import_name, static_url_path=None, static_folder='static', static_host=None, host_matching=False, subdomain_matching=False, template_folder='templates', instance_path=None, instance_relative_config=False, root_path=None)
  • static_url_path : 前端访问资源文件的前缀目录。
  • static_folder : 后端存储资源文件的目录。
  • template_folder : 后端存储模板文件的目录

URL参数设置默认值

如果用户访问的 URL 没有添加变量,Flask在匹配失败后会返回一个404错误响应。

1
2
3
4
@app.route('/greet',defaults={'name':'hyl'})
@app.route('/greet/<name>')
def greet(name):
return f'hello {name}'

flask routes命令

查看当前项目的所有route

1
2
3
4
5
6
Endpoint        Methods  Rule
-------------- ------- -----------------------------
index.any_test GET /colors/<any(blue,red):color>
index.index GET /
index.test GET /test/
static GET /static/<path:filename>

自动发现程序实例

在执行 flask run命令运行程序前,我们需要提供程序实例所在模块的位置。但是我们可以直接运行程序,是因为
Flask会自动探测程序实例,自动探测存在下面这些规则:

  • 从当前目录寻找 app.py和 wsgi.py模块,并从中寻找名为app或 application的程序实例。
  • 从环境变量 FLASK_APP对应的值寻找名为app或 application 的程序实例。

因为我们的程序主模块命名为app.py,所以 flask run命令会自动在其中寻找程序实例。如果你的程序主模块是其他名称,比如 hello.py,那么需要设置环境变量 FLASK APP,将包含程序实例的模块名赋值给这个变量。

1
set FLASK_APP=hello

使服务器外部可见与flask run命令

1
flask run -- host 0.0.0.0 

执行 flask run命令时的host和port选项也可以通过环境变量 FLASK_RUN_HOSTFLASK_RUN_PORT设置。

事实上,Flask内置的命令都可以使用这种模式定义默认选项值,即FLASK_< COMMAND>_<OPTION>,你可以使用flask-help命令查看所有可用的命令

开发环境

1
FLASK_ENV=development

在开发环境下,调试模式(Debug Mode)将被开启,这时执行 flask run启动程序会自动激活 Werkzeug内置的调试器(debugger)和重载器(reloader)

  • 默认会使用 Werkzeug内置的stat重载器,它的缺点是耗电较严重,而且准确性一般。
  • 为了获得更优秀的体验,我们可以安装另一个用于监测文件变动的 Python库 Watchdog,安装后Werkzeug会自动使用它来监测文件变动:pipenv install watchdog--dev

上下文

  • 上下文( context )可以理解为环境
  • 为了正常运行程序,一些操作相关的状态和数据要被临时保存下来,这些状态和数据被统称为上下文
  • Flask 中,上下文有两种,分别为程序上下文请求上下文

项目配置

配置的名称必须是全大写形式,小写的变量将不被读取

这些配置变量都通过 Flask对象的 app.config属性作为统一的接口来设置和获取,它指向的Config类实际上是字典的子类,所以你可以像操作其他字典一样操作它.

url_for 的相对于绝对

  • url_for函数生成的URL是相对URL ,
  • 如果想要生成供外部使用的绝对URL,可以在使用 url_for函数时,将_external参数设为True,这会生成完整的URL

current_app

  • current_app是一个表示当前程序实例的代理对象
  • 当某个程序实例被创建并运行时,它会自动指向当前运行的程序实例,并把所有操作都转发到当前的程序实例
  • 比如,当我们需要获取配置值时,会使用 current app.config,其他方法和属性亦同。

工厂

工厂( factory )是指创建其他对象的对象 ,通常是一个返回其他类的对象的函数或方法,

注册命令 与 click

1
2
3
4
5
6
7
8
import click
from flask import Flask

app = Flask(__name__)

@app.cli.command()
def hello():
click.echo('Hello, Human!')

一般我们就将click相关函数封装成一个类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def configure_click(app):
@app.cli.command()
def hello():
"""Just say hello."""
click.echo('Hello, Human!')

def create_app(config_name):
"""创建app"""
app = Flask(__name__)
config_class = load_config_class(config_name)
app.config.from_object(config_class)
configure_click(app)
configure_errorhandlers(app)
configure_extensions(app)
configure_blueprint(app)
return app

这样 , 我们就可以在flask --help中看到了

1
2
3
4
5
6
7
>flask --help
...
Commands:
hello Just say hello.
routes Show the routes for the app.
run Run a development server.
shell Run a shell in the app context.

注册filters

1
2
3
4
5
6
7
8
def configure_template_filters(app):

@app.template_filter()
def round_(v, precision):
try:
return round(float(v), precision)
except Exception,e:
return v

注册模板上下文(模板变量)

1
2
3
4
5
def configure_processor(app):
@app.context_processor
def inject_foo():
foo = 'I am foo'
return dict(foo=foo)
1
2
3
4
<body>
{{ foo }}
</body>
</html>

注册全局函数

1
2
3
4
def configure_template_function(app):
@app.template_global()
def bar():
return 'i am bar

@app.template_global()@app.context_processor 的区别:

前者专门用来创建模板函数 , 后者创建模板变量(自然也可以用来创建函数)

注册测试器

1
2
3
4
def configure_tests(app):
@app.template_test()
def is_heyingliang(string):
return string == 'heyingliang'

注册Shell上下文

1
2
3
4
5
6
7
def configure_shell(app):
@app.shell_context_processor
def make_shell_context():
return dict (
db=db,
Note=Note,
)

注册errorhandlers

1
2
3
4
def configure_errorhandlers(app):
@app.errorhandler(401)
def unauthorized(error):
return render_template("errors/401.html", error=error)

注册blueprint

1
2
3
def configure_blueprint(app):
for blueprint, url_prefix in routers:
app.register_blueprint(blueprint, url_prefix=url_prefix)

注册普通日志

1
2
3
4
5
6
7
8
9
10
11
12
import logging
from logging.handlers import RoptatingFileHander

def register_logger():
app.logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler = RotatingFileHandler('/logs/bulelog' , maxBytes=10*1024*1024,backupCount=10)
file_handler.setFormatter(formatter)
file_handler.setLevel(logging.INFO)

if not app.debug:
app.logger.addHandler(file_handler)

注册SMTP日志

当发生ERROR级异常时 , 发送邮件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def register_logging(app):
class RequestFormatter(logging.Formatter):
def format(self, record):
record.url = request.url
record.remote_addr = request.remote_addr
return super(RequestFormatter, self).format(record)

request_formatter = RequestFormatter(
'[%(asctime)s] %(remote_addr)s requested %(url)s\n'
'%(levelname)s in %(module)s: %(message)s'
)

formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

file_handler = RotatingFileHandler(os.path.join(basedir, 'logs/bluelog.log'),
maxBytes=10 * 1024 * 1024, backupCount=10)
file_handler.setFormatter(formatter)
file_handler.setLevel(logging.INFO)

mail_handler = SMTPHandler(
mailhost=app.config['MAIL_SERVER'],
fromaddr=app.config['MAIL_USERNAME'],
toaddrs=['ADMIN_EMAIL'],
subject='Bluelog Application Error',
credentials=(app.config['MAIL_USERNAME'], app.config['MAIL_PASSWORD']))
mail_handler.setLevel(logging.ERROR)
mail_handler.setFormatter(request_formatter)

if not app.debug:
app.logger.addHandler(mail_handler)
app.logger.addHandler(file_handler)

注册请求钩子

1
2
3
4
5
6
7
8
9
def configure_hook(app):
@app.before_request
def hello():
print(123465789)

@app.after_request
def bye(resp):
print(123465789)
return resp

注册程序上下文

1
2
3
4
5
6
7
8
9
def configure_context_processors(app):
@app.context_processor
def global_style():
sidebar_collapsed_key = "ace_settings.sidebar-collapsed"
if sidebar_collapsed_key in request.cookies:
sidebar_collapsed = request.cookies[sidebar_collapsed_key]
if sidebar_collapsed == "1":
return {"sidebar_collapsed": True}
return {}

创建注册外部库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import os
from flask import send_from_directory

class UploadManager(object):
def __init__(self, app=None):
if app is not None:
self.init_app(app)

def init_app(self, app):
config_name = os.getenv('GAMESDK_KFADMIN_CONFIG_NAME') or "local"
if config_name == 'local':
@app.route('/uploads/<filename>')
def uploaded_file(filename):
return send_from_directory(app.config["STATIC_FILE_UPLOAD_PATH"],
filename)

@app.route('/tmpls/<filename>')
def uploaded_tmpl(filename):
return send_from_directory(app.config["CREATIVE_TMPL_PATH"],
filename)

@app.route('/excels/<filename>')
def export_excel(filename):
return send_from_directory(app.config["TMP_FILE_PATH"],
filename)

upload_manager = UploadManager()
1
2
def configure_uploads(app):
upload_manager.init_app(app)

或者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import time
import simplejson as json

import redis
from flask import Blueprint, url_for
from flask_login import LoginManager, current_user

from .login import bp
from .models.user import User
from .cache import cache

class OALogin(object):
def __init__(self, app=None):
if app is not None:
self.init_app(app)

def init_app(self, app):
app.config.setdefault("ZK_LOGIN_SITE_ID", None)
app.config.setdefault("ZK_LOGIN_SECRET_KEY", None)
app.register_blueprint(bp, url_prefix="/zkoa")
self.configure_loginmanager(app)
cache["redis"] = redis.StrictRedis.from_url(app.config["ZK_REDIS_CACHE_URL"])

# 配置上下文
@app.context_processor
def project_name():
return dict(
site_name=app.config['SITE_NAME']
)

def configure_loginmanager(self, app):
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_message = u"请先登录"

@login_manager.user_loader
def load_user(userid):
return User._get_user_from_cache(userid)

login_manager.login_view = 'zkoa.login'

第二章 FLASK与HTTP

  • Web服务器接收请求,通过WSGI将HTTP格式的请求数据转换成我们的Flask程序能够使用的Python数据
  • 响应依次经过 WSGI 转换生成 HTTP 响应,

MutliDict与ImmutableMultiDict

  • Werkzeug的 MutliDict类是字典的子类,它主要实现了同一个键对应多个值的情况。比如一个文件上传字段可能会接收多个文件。这时就可以通过 getlist方法来获取文件对象列表。
  • 而 ImmutableMultiDict类继承了 MutliDict类,但其值不可更改

路由表

  • 程序实例中存储了一个路由表(app.url_map),其中定义了URL规则和视图函数的映射关系。
  • 使用 flask routes 命令可以查看程序中定义的所有路由,这个列表由app.url_map解析得到

app.route("/colors/<any(blue,red):color>")

any限定必须是blue或red

1
2
3
@index_router.route("/colors/<any(blue,red):color>")
def any_test(color):
return color

MIME 类型

  • MIME类型(又称为 media type或 content type)是一种用来标识文件类型的机制,它与文件扩展名相对应,可以让客户端区分不同的内容类型,并执行不同的操作
  • 一般的格式为“类型名/子类型名”,其中的子类型名一般为文件扩展名。比如,HTML的MIME类为“text/html”,png图片的MIME类型为“Image/png”。
  • 常用的数据格式有纯文本、HTML、XML和JSON,
1
2
3
4
5
@index_router.route("/test/")
def test():
res = make_response('hello')
res.mimetype = 'text/plain'
return res

上下文

  • Flask 通过本地线程( thread local )技术将请求对象在特定的线程和请求中全局可访问
  • 请求上下文被激活时,程序上下文也被自动激活 , 当请求处理完毕后,请求上下文和程序上下文也会自动销毁 , 也就是说,在请求处理时这两者拥有相同的生命周期。

current_app , g , request , session :

  • 四个变量都是代理对象( proxy ),即指向真实对象的代理 一般情况下,我们不需太关注其中的区别
  • 在某些特定的情况下,如你需要获取原始对象,可以对代理对象调用_get_current_ object ()方法获取被代理的真实对象

通常会使用g结合请求钩子来保存每个请求处理前所需要的全局变量,比如当前登人的用户对象,数据库连接等

1
2
3
@app.before_request
def hello():
g.name = request.args.get('name')

应用上下文和请求上下文

  • 实现线程隔离后,为了在一个线程中更加方便使用这些变量,flask中还有一种堆栈的数据结构,可以处理这些变量,但是并不直接处理这些变量。假如有一个程序得到一个请求,那么flask会将这个请求的所有相关信息进行打包,打包形成的东西就是处理请求的一个环境。flask将这种环境称为“请求上下文”(request context) , 这样,请求发生时,我们一般都会指向堆栈中的“请求上下文”对象,这样可以通过请求上下文获取相关对象并直接访问
  • 对于单应用单请求来说,使用“请求上下文”确实就可以了。然而,Flask的设计理念之一就是多应用的支持。当在一个应用的请求上下文环境中,需要嵌套处理另一个应用的相关操作时(这种情况更多的是用于测试或者在console中对多个应用进行相关处理),“请求上下文”显然就不能很好地解决问题了,因为魔法current_app无法确定当前处理的到底是哪个应用
  • 如何让请求找到“正确”的应用呢?我们可能会想到,可以再增加一个请求上下文环境,并将其推入栈中。由于两个上下文环境的运行是独立的,不会相互干扰,所以通过调用栈顶对象的app属性或者调用current_app(current_app一直指向栈顶的对象)也可以获得当前上下文环境正在处理哪个应用
  • 在创建“请求上下文”时一定要创建一个“应用上下文”对象。有了“应用上下文”对象,便可以很容易地确定当前处理哪个应用,这就是魔法current_app

激活上下文的四种情况

  1. flask run命令启动程序时.
  2. app.run()方法启动程序时.
  3. 执行使用@app.cli command()装饰器注册的flask命令时.
  4. 使用 flask shell 命令

激活请求上下文 和 程序上下文

激活程序上下文

1
2
3
4
5
6
7
8
9
10
11
12
# 方法一
from app import app
from flask import current_app

with app.app_context():
current_app.name

# 方法二
app_ctx = app.app_context()
app_ctx.push()
current_app.name
app_ctx.pop()

激活请求上下文

1
2
3
4
5
from app import app
from flask import request

with app.test_request_context('/hello'):
requese.method

注意 , 因为这单单创建请求上下文 , 没有创建应用上下文 , 因此不能使用current_app

上下文钩子

  • Flask 为上下文提供了teardown_appcontext 钩子 ,

  • 使用它注册的回调函数会在程序上下文被销毁时调用,而且通常也会在请求上下文被销毁时调用 ,

  • 比如,你需要在每个请求处理结束后销毁数据库连接:

    1
    2
    3
    @app.teardown_appcontext
    def teardown_db(exception):
    db.close()

获取上一个页面的URL

用户单击某个需要登录才能访问的链接,这时程序会重定向到登录页面,当用户登录后合理的行为是重定向到用户登录前浏览的页面,以便用户执行未完成的操作,而不是直接重定向到主页。

使用referer

  • referer是一个用来记录请求发源地址的HTTP首部字段(HttpreFerer),即访问来源。
  • 当用户在某个站点单击链接,浏览器向新链接所在的服务器发起请求,请求的数据中包含的Referer字段记录了用户所在的原站点URL。
  • 这个值通常会用来追踪用户,比如记录用户进入程序的外部站点,以此来更有针对性地进行营销。
  • 在 Flask中,referer的值可以通过请求对象的 referrer属性获取,即 request。referrer。
1
return redirect(request.referrer or url_for('index'))

使用query args

1
return redirect(request.args.get('next',url_for('index')))

综合上述 + url安全认证

1
2
3
4
5
6
7
8
9
10
11
12
13
def is_safe_url(target):
ref_url = urlparse(request.host_url)
test_url = urlparse(urljoin(request.host_url, target))
return test_url.scheme in ('http', 'https') and \
ref_url.netloc == test_url.netloc

def redirect_back(default='hello', **kwargs):
for target in request.args.get('next'), request.referrer:
if not target:
continue
if is_safe_url(target):
return redirect(target)
return redirect(url_for(default, **kwargs))
1
2
3
@app.route('/abc')
def do_something():
return rediect_back()

HTTP服务器端推送

  • 不论是传统的HTTP请求-响应式的通信模式,还是异步的AJAX式请求,服务器端始终处于被动的应答状态,只有在客户端发出请求的情况下,服务器端才会返回响应。这种通信模式被称为客户端拉取(client pul)。在这种模式下,用户只能通过刷新页面或主动单击加载按钮来拉取新数据。

  • 然而,在某些场景下,我们需要的通信模式是服务器端的主动推送(server push)。

  • 比如聊天室 , 实时显示新提醒和私信的数量 , 用户的在线状态更新,股价行情监控、显示商品库存信息、多人游戏、文档协作等

  • 实现服务器端推送的一系列技术被合称为Http Server Push(HTTP服务器端推送)

常用推迭技术

名称 说明
传统轮询 在特定的时间间隔内,客户端使用AJAX技术不断向服务器发起HTTP请求,然传统轮询后获取新的数据并更新页面
长轮询 和传统轮询类似,但是如果服务器端没有返回数据,那就保持连接一直开启,直到有数据时才返回。取回数据后再次发送另一个请求
Server-Sent Events(SSE) SSE通过HTML5中的 Event Source API实现. SSE会在客户端和服务器端建立一个单向的通道,客户端监听来自服务器端的数据,而服务器端可以在任意时间发送数据,两者建立类似订阅/发布的通信模式
  • 除了这些推送技术,在HTML5的API中还包含了一个WebSocket协议,
  • 和HTTP不同,它是一种基于TCP协议的全双工通信协议。和前面介绍的服务器端推送技术相比,WebSocket实时性更强,而且可以实现双向通信。另外,Web Socket的浏览器兼容性要强于SSE。

注入攻击

注入攻击包括

  • 注入攻击包括系统命令(OS Command)注入、
  • SQL注人(SQL Injection)、
  • NOSQL注入、
  • ORM注入等。

XSS攻击

XSS攻击主要分为反射型XSS攻击(Reflected XSS Attack)和存储型XSS攻击(Stored XSS Attack)两类

  • 反射型XSS攻击 又被称为非持久型XSS
  • 反射型XSS 和 存储型XSS 的区别就是会不会存入数据库 , 存入数据库就是永久性攻击了 , 任何用户访问包含攻击代码的页面都会被殃及。
1
2
3
4
@app.route('index')
def index():
name = request.args.get('name')
response = '<h1>hello,%s!<h1>' % name

此时 , 访问http://example.com/index?name=<script>alert('bingo');</script> , 服务端将返回<h1>hello,<script>alert('bingo');</script>!<h1>

转义

jinja2自带转义函数

1
2
3
4
5
6
from jinja2 import escape

@app.route('index')
def index():
name = request.args.get('name')
return '<h1>hello,%s!<h1>' % escape(name)

转义无法防御的情况

1
2
3
4
5
6
from jinja2 import escape

@app.route('index')
def index():
name = request.args.get('url')
return '<a href="{{ url }}" >Website</a> '.format(url=url)

当传入的url为javascript:alert('bingo') , 因为不包含会被转义的< 和 > , 会返回

1
<a href="javascript:alert('bingo');">Website</a>

用户点击即可运行

将危险文本转为Markup对象进行防御

1
2
3
4
5
6
from flask import Markup

@index_router.route("/test/")
def test():
text = Markup('<h1>Hello Flask!<h1>')
return render_template('index.html', text=text)

这时在模板中可以直接使用{{ text } }

CSRF攻击

简单来讲 , 在我的网站放置一个

1
<img src= "http://example.com/account/delete”>
  • 用户一点击 , 就会访问example.com ,
  • 就算我们改用POST请求提交删除账户的请求。攻击者只需要在网站中内嵌一个隐藏表单,然后设置在页面加载后执行提交表单的 JavaScript函数,攻击仍然会在用户点击时发起。

CSRF攻击防御

  • 只使用GET和POST两种

  • GET方法属于安全方法,不会改变资源状态,仅用于获取资源,因此又被称为幂等方法(idempotent
    method)。页面中所有可以通过链接发起的请求都属于GET请求。

  • POST方法用于创建、修改和删除资源。在HTML中使用form标签创建表单并设置提交方法为POST,在提交时会创建POST请求。

  • CSRF令牌校验 :

    当处理非GET请求时,要想避免CSRF攻击,关键在于判断请求是否来自自己的网站。在前面我们曾经介绍过使用 Http referer获取请求来源,理论上说,通过 referer可以判断源站点从而避免CSRF攻击,但因为 referer很容易被修改和伪造,所以不能作为主要的防御措施。

    除了在表单中加入验证码外,一般的做法是通过在客户端页面中加入伪随机数来防御CSRF攻击,这个伪随机数通常被称为CSRF令牌(token)。

    POST方法的请求通过表单创建。我们把在服务器端创建的伪随机数(CSRF令牌)添加到表单中的隐藏字段里和 session变量(即签名 cookie)中,当用户提交表单时,这个令牌会和表单数据一起提交。在服务器端处理POST请求时,我们会对表单中的令牌值进行验证,如果表单中的令牌值和 session中的令牌值相同,那么就说明请求发自自己的网站。

    对于AJAX请求,我们可以在 XmlhTtpreoμuest请求首部添加一个自定义字段X-CSRFToken来保存CSRF令牌。

如果程序包含XSS漏洞,那么攻击者可以使用跨站脚本攻破CSRF防御机制,比如使用 JavaScript窃取 cookie内容,进而获取CSRF令牌

第三章 模板

注册模板上下文变量

如果多个模板都需要使用同一变量,那么比起在多个视图函数中重复传入,更好的方法是能够设置一个模板全局变量。Flask提供了一个 app.context_processor装饰器,可以用来注册模板上下文处理函数,它可以帮我们完成统一传入变量的工作。模板上下文处理函数需要返回个包含变量键值对的字典

1
2
3
4
5
def configure_processor(app):
@app.context_processor
def inject_foo():
foo = 'I am foo'
return dict(foo=foo)
1
2
3
4
<body>
{{ foo }}
</body>
</html>

和在 render template函数中传入变量类似,除了字符串、列表等数据结构,你也可以传入函数、类或类实例。

注册模板方法

1
2
3
4
def configure_template_function(app):
@app.template_global()
def bar():
return 'i am template function'

@app.template_global()@app.context_processor 的区别:

前者专门用来创建模板函数 , 后者创建模板变量(自然也可以用来创建函数)

Jinja2 内置模板全局函数

函数 说明
range([start,]stop,step]) 和 Python中的 range用法相同
lipsum(n=5,html=True,min=20,max=100) 生成随机文本(lorem ipsum),可以在测试时用来填充页面.默认生成5段HTML文本,每段包含20~100个单词
dict(**items) 和 Python中的dict用法相同
url_for() 用于生成URL的函数
get_ flashed_ messages() 用于获取 flash消息的函数

全部的全局函数请看 : https://jinja.palletsprojects.com/en/2.10.x/templates/#list-of-global-functions

filter的另一种使用方法

一般filter是用于变量的

1
{{ nameltitle }} 

但是 , 也可以用于文本 , 此时包含的所有文本当成第一参数

1
2
3
{% filter upper %}
i am a student
{% endfilter %}

自定义的filter也可以

1
2
3
@app.template_filter()
def my_upper2(v1, v2='asd'):
return f'-- {v1.upper()} -- {v2.upper()}'
1
2
3
{% filter my_upper2 %}
i am a student
{% endfilter %}

测试器

1
2
3
4
5
{% if age is number %}
{{ age * 365 }}
{% else %}
无效数字
{% endif %}

常用测试器

测试器 说明
callable(object) 判断对象是否可被调用
defined(value) 判断变量是否已定义
undefined(value) 判断变量是否未定义
none(value) 判断变量是否为None
number(value) 判断变量是否是数字
string(value) 判断变量是否是字符串
sequence(value) 判断变量是否是序列,比如字符串、列表、元组
iterable(value) 判断变量是否可迭代
mapping(value) 判断变量是否是匹配对象,比如字典
sameas(value,other) 判断变量与 other是否指向相同的内存地址

全部测试器 , 查看 : https://jinja.palletsprojects.com/en/2.10.x/templates/#list-of-builtin-tests

jinja_env

  • 在Jinja2中,渲染行为由Jinja2.Enviroment类控制 , 所有的配置选项、上下文变量、全局函数、过滤器和测试器都存储在 Enviroment实例上

  • 我们可以使用current_app.jinja_env 获取这个Enviroment类 , 可以通过current_app.jinja_env.__dict__.keys()查看该类的所有属性

  • 模板环境中的全局函数、过滤器和测试器分别存储在 Enviroment对象的 globals、filters和tests属性中

  • 注册装饰器其实本质就是往jinja_env属性添加变量

    1
    2
    3
    @app.template_global()
    def bar():
    return 'i am bar'

    等价于

    1
    2
    3
    4
    def bar():
    return 'i am bar'

    app.jinja_env.globals['bar'] = bar

    app.template_global()的源码:

    1
    2
    3
    @setupmethod
    def add_template_global(self, f, name=None):
    self.jinja_env.globals[name or f.__name__] = f

为了便于管理,我们可以把宏存储在单独的文件中,这个文件通常命名为 macros.html_macors.html

{% %}{-% %}的区别

  • 前者不会移除空行 , 后者会移除空行

  • {-% %}-放置规则是 , end语句在左边 , 其他的语句都在右边

    1
    2
    3
    4
    5
    {% if string == 'heyingliang' -%}
    {{ string }}
    {% else -%}
    无效数字
    {%- endif %}
  • 也可以使用模板环境对象提供的 trim blocksIstrip blocks属性设置,前者用来删除 Jinja2语句后的第一个空行,后者则用来删除 Jinja2语句所在行之前的空格和制表符(tabs):

    1
    2
    app.jinja_env.trim_blocks = True
    app.jinja_env.lstrip_blocks = True
  • 事实上,我们没有必要严格控制HTML输出,因为多余的空白并不影响浏览器的解析。

设置favicon.ico

1
<link rel='icon' type='image/x-icon' href='{{ url_for('static',filename='favicon'.ico') }}'>

使用宏加载静态资源

1
2
3
4
5
6
7
8
9
10
11
12
13
{% macro static_file(type, filename_or_url, local=True) %}
{% if local -%}
{% set filename_or_url = url_for('static', filename=filename_or_url) %}
{%- endif %}

{% if type == 'css' -%}
<link rel="stylesheet" href="{{ filename_or_url }}" type="text/css">
{%- elif type == 'js' -%}
<script type="text/javascript" src="{{ filename_or_url }}"></script>
{%- elif type == 'icon' -%}
<link rel="icon" href="{{ filename_or_url }}">
{%- endif %}
{% endmacro %}
1
2
static_file('css','css/bootstrap.min.css')
static_file('css',' https //maxcdn.../css/bootstrap.min.css', l ocal=False)

消息闪现的固定jinja模板

1
2
3
4
5
6
7
8
9
10
11
{% if get_flashed_messages() %}
{% for category, message in get_flashed_messages(with_categories=true) %}
<script type="text/javascript">
{% if category=="success" %}
layer.msg("{{ message }}", {time: 3000, icon: 6});
{% else %}
layer.msg("{{ message }}", {time: 5000, icon: 5});
{% endif %}
</script>
{% endfor %}
{% endif %}

自定义JS或CSS变量

自定义JS或CSS变量时 , 一般都要加入data-前缀 以示区别:

1
2
3
4
5
6
7
8
<span data-id="{{ user.id }}" data-username="{{ user.username }}">{{ user.username }}</span>

<style>
:root {
--theme-color:{{ theme_color }};
--background-url: {{ url_for('static',filename='background.jpg') }}
}
</style>

第四章 表单

WTForms实例化字段常用参数

参数 说明
label 字段标签< label>的值,也就是渲染后显示在输入字段前的文字
render_kw 字典,用来设置对应的HTML< Input>标签的属性,比如传人{placeholder’:”YourName},渲染后的HTML代码会将< Input>标签的 placeholder属性设为 Your name
validator 列表,包含一系列验证器,会在表单提交后被逐一调用验证表单数据
default 字符串或可调用对象,用来为表单字段设置默认值

WTForms常用验证器

验证器 说明
DataRequired(message=None) 验证数据是否有效
Email(message None) 验证 Email地址
EqualTo(fieldname,message=None) 验证两个字段值是否相同
InputRequired(message=None) 验证是否有数据
Length(min=-1,max=-1,message=None) 验证输入值长度是否在给定范围内
Number Range(min=None,max=None,message=None) 验证输入数字是否在给定范围内
Optional(strip whitespace=True) 允许输入值为空,并跳过其他验证
Regexp(regex,flags=0,message=None 使用正则表达式验证输入
URL(require tld=True,message=None) 验证URL
Anyof(values,message=None,values formatter=None) 确保输人值在可选值列表中
Noneof(values,message=None,values formatter=None 确保输入值不在可选值列表中
1
2
3
4
5
6
7
8
from flask_wtf import FlaskForm
from wtforms import StringField, IntegerField, SelectField,SubmitField

class Act2ndAnniversaryForm(FlaskForm):
start_date = TextField(
label=u"开始日期",
validators=[validators.DataRequired(u"开始日期是必选项")]
)

InputRequired验证器和 DataRequired很相似,但

  • InputRequired仅验证用户是否有输入,而不管输入的值是否有效。例如,由空格组成的数据也会通过验证。
  • 当使用DataRequired时,如果用户输入的数据不符合字段要求,比如在 IntegerField输入非数字时会视为未输入,而不是类型错误。

Field实例化就是HTML代码

1
2
3
4
5
6
form = LoginForm()
print(form.username())
# "<input id='username' name='username' type='text' value=''>"

print(form.username.label())
# "<label for='username'>Username</label>"

默认情况下,WTForms输出的字段HTML代码只会包含id和name属性,属性值均为表单类中对应的字段属性名称。如果要添加额外的属性,通常有两种方法。

  1. 使用render_kw : username = StringField(Username ’, render_kw={ ’ placeholder’:’Your Username ’ })
  2. 在调用宇段时传入 : form.username(style=’ width: 200px;’ class_= ’ bar ’ )

通过上面的方法也可以修改id和name属性,但表单被提交后,WTForms需要通过name属性来获取对应的数据,所以不能修改name属性值。

form.csrf_token字段 和form.hidden_tag()方法的区别

1
2
{{ form.csrf_token }}
{{ form.hidden_tag() }}

form.hidden tag()方法会依次渲染表单中所有的隐藏字段。因为 csrf_token字段也是隐藏字段,所以当这个方法被调用时也会渲染 csrf_token字段。

PRG(Post/Redirect/Get)模式

  • 在浏览器中,当单击F5刷新/重载时的默认行为是发送上一个请求。如果上一个请求是POST请求,那么就会弹出一个确认窗口,询问用户是否再次提交表单。
  • 为了避免出现这个容易让人产生困惑的提示,我们尽量不要让提交表单的POST请求作为最后一个请求。这就是为什么我们在处理表单后返回一个重定向响应,这会让浏览器重新发送一个新的GET请求到重定向的目标URL。最终,最后一个请求就变成了GET请求。
1
2
3
4
5
6
7
8
9
10
11
12
13
@dept_router.route('/editDept/<int:id>', methods=["GET", "POST", ])
@login_required
def dept_edit(id):
g.id = id
dept = Department.get_by_id(id)
form = DeparmentEditForm(request.form, obj=dept)
if form.validate_on_submit():
form.populate_obj(dept)
db.session.commit()
flash("修改成功", category="success")
# 返回redirect
return redirect(url_for("dept.dept_detail"))
return render_template('dept/dept_edit.html', form=form, id=id)

简单来说 , 每次处理POST请求后 , 服务端都要返回redirect

WTF本地化

内置错误消息语言设为简体中文 :

1
2
3
4
5
6
7
8
9
10
11
12
from flask_wtf import FlaskForm

app = Flask(__name__)
app.config['WTF_I18N_ENABLED'] = False

class MyBaseForm(FlaskForm):
class Meta:
locales = ['zh']

class DomainForm(MyBaseForm):
name = StringField('域名', render_kw={'readonly': 'readonly'})
rr = StringField('主机记录')

form_field的宏

1
2
3
4
5
6
7
8
9
{% macro form_field(field) %}
{{ field.label }}<br>
{{ field(**kwargs) }}<br>
{% if field.errors -%}
{% for error in field.errors -%}
<small class="error">{{ error }}</small><br>
{%- endfor %}
{%- endif %}
{% endmacro %}

WTForm的行内和全局validator

最常用的 , 被称为行内验证器:

1
2
3
4
5
6
7
8
9
from wtforms.validators import ValidationError

class DeparmentAddForm(DeparmentForm):
submit = SubmitField('添加')

def validate_id(self, field):
did = field.data
if Department.query.filter_by(did=did).first():
raise ValidationError('部门号已存在')

全局验证器 , 就是使用field的validators属性

1
2
3
4
5
6
7
8
9
10
def is_22(message=None):
if message is None:
message = 'Must be 22'
def _is_22(form,filed):
if int(filed.data) != 22:
raise ValidationError('不是22')
return _is_22

class DeparmentAddForm(DeparmentForm):
submit = SubmitField('添加',validators=[is_22(message='必须是22')])

文件上传

单文件 与 多文件上传通用代码

1
2
3
4
5
6
7
8
9
10
11
12
13
def allowed_file(filename):
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in app.config['ALLOWED_EXTENSIONS']

# upload form
class UploadForm(FlaskForm):
photo = FileField('Upload Image', validators=[FileRequired(), FileAllowed(['jpg', 'jpeg', 'png', 'gif'])])
submit = SubmitField()

# multiple files upload form
class MultiUploadForm(FlaskForm):
photo = MultipleFileField('Upload Image', validators=[DataRequired()])
submit = SubmitField()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def random_filename(filename):
ext = os.path.splitext(filename)[1]
new_filename = uuid.uuid4().hex + ext
return new_filename


@app.route('/upload', methods=['GET', 'POST'])
def upload():
form = UploadForm()
if form.validate_on_submit():
f = form.photo.data
filename = random_filename(f.filename)
f.save(os.path.join(app.config['UPLOAD_PATH'], filename))
flash('Upload success.')
session['filenames'] = [filename]
return redirect(url_for('show_images'))
return render_template('upload.html', form=form)


@app.route('/multi-upload', methods=['GET', 'POST'])
def multi_upload():
form = MultiUploadForm()
if request.method == 'POST':
filenames = []

# check csrf token
try:
validate_csrf(form.csrf_token.data)
except ValidationError:
flash('CSRF token error.')
return redirect(url_for('multi_upload'))

# check if the post request has the file part
if 'photo' not in request.files:
flash('This field is required.')
return redirect(url_for('multi_upload'))

for f in request.files.getlist('photo'):
# if user does not select file, browser also
# submit a empty part without filename
# if f.filename == '':
# flash('No selected file.')
# return redirect(url_for('multi_upload'))
# check the file extension
if f and allowed_file(f.filename):
filename = random_filename(f.filename)
f.save(os.path.join(
app.config['UPLOAD_PATH'], filename
))
filenames.append(filename)
else:
flash('Invalid file type.')
return redirect(url_for('multi_upload'))
flash('Upload success.')
session['filenames'] = filenames
return redirect(url_for('show_images'))
return render_template('upload.html', form=form)

MAX_CONTENT_LENGTH : 限制文件上传大小

secure_filename函数

1
2
from werkzeug import secure_filename
filename = secure_filename('avatar!&^%$.jpg')

使用Flask-CKEditor 集成富文本编辑器

1
pip install flask-ckeditor

详见 : https://zhuanlan.zhihu.com/p/38643309

单个表单多个提交按钮

  • 在某些情况下,我们可能需要为一个表单添加多个提交按钮。比如在创建文章的表单中添加发布新文章和保存草稿的按钮。
  • 当用户提交表单时,我们需要在视图函数中根据按下的按钮来做出不同的处理。
1
2
3
4
5
6
# multiple submit button
class NewPostForm(FlaskForm):
title = StringField('Title', validators=[DataRequired(), Length(1, 50)])
body = TextAreaField('Body', validators=[DataRequired()])
save = SubmitField('Save') # 保存按钮
publish = SubmitField('Publish') # 发布按钮
  • 当表单数据通过POST请求提交时,Flask会把表单数据解析到 request.form字典。
  • 如果表单中有两个提交字段,那么只有被单击的提交字段才会出现在这个字典中。当我们对表单类实例或特定的字段属性调用data属性时,WTForms会对数据做进一步处理。对于提交字段的值,会将其转换为布尔值:被单击的提交字段的值将是True,未被单击的值则是 False
1
2
3
4
5
6
7
8
9
10
11
12
@app.route('/two-submits', methods=['GET', 'POST'])
def two_submits():
form = NewPostForm()
if form.validate_on_submit():
if form.save.data:
# save it...
flash('You click the "Save" button.')
elif form.publish.data:
# publish it...
flash('You click the "Publish" button.')
return redirect(url_for('index'))
return render_template('2submit.html', form=form)

单个页面多个表单

  • 在程序的主页上同时添加登录和注册表单。
  • 当在同一个页面上添加多个表单时,我们要解决的个问题就是在视图函数中判断当前被提交的是哪个表单。

单视图处理

  • 被单击的提交字段最终的data属性值是布尔值即True或 False。
  • 为了区分两个submit , 为两个表单的提交字段设置不同的名称submit1 , submit2
1
2
3
4
5
6
7
8
9
10
class SigninForm(FlaskForm):
username = StringField('Username', validators=[DataRequired(), Length(1, 20)])
password = PasswordField('Password', validators=[DataRequired(), Length(8, 128)])
submit1 = SubmitField('Sign in')

class RegisterForm(FlaskForm):
username = StringField('Username', validators=[DataRequired(), Length(1, 20)])
email = StringField('Email', validators=[DataRequired(), Email(), Length(1, 254)])
password = PasswordField('Password', validators=[DataRequired(), Length(8, 128)])
submit2 = SubmitField('Register')

视图函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@app.route('/multi-form', methods=['GET', 'POST'])
def multi_form():
signin_form = SigninForm()
register_form = RegisterForm()

if signin_form.submit1.data and signin_form.validate():
username = signin_form.username.data
flash('%s, you just submit the Signin Form.' % username)
return redirect(url_for('index'))

if register_form.submit2.data and register_form.validate():
username = register_form.username.data
flash('%s, you just submit the Register Form.' % username)
return redirect(url_for('index'))

return render_template('2form.html', signin_form=signin_form, register_form=register_form)

多视图处理

更简洁的方法是通过分离表单的渲染和验证实现。

1
2
3
4
5
@app.route('/multi-form-multi-view')
def multi_form_multi_view():
signin_form = SigninForm2()
register_form = RegisterForm2()
return render_template('2form2view.html', signin_form=signin_form, register_form=register_form)

视图函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@app.route('/handle-signin', methods=['POST'])
def handle_signin():
signin_form = SigninForm2()
register_form = RegisterForm2()

if signin_form.validate_on_submit():
username = signin_form.username.data
flash('%s, you just submit the Signin Form.' % username)
return redirect(url_for('index'))

return render_template('2form2view.html', signin_form=signin_form, register_form=register_form)


@app.route('/handle-register', methods=['POST'])
def handle_register():
signin_form = SigninForm2()
register_form = RegisterForm2()

if register_form.validate_on_submit():
username = register_form.username.data
flash('%s, you just submit the Register Form.' % username)
return redirect(url_for('index'))
return render_template('2form2view.html', signin_form=signin_form, register_form=register_form)

第五章 数据库

  • SQLALCHEMY_TRACK_ MODIFICATIONS配置 : 是否追踪对象的修改,这用于Flask-SQLAlchemy的事件通知系统。一般设置为False就行

默认情况下,Flask-SQLAlchemy会根据 模型类的名称 生成 表名称 , 可以通过__tablename__属性自定义

查看对应SQL语句

1
2
3
4
5
6
7
from sqlalchemy.schema import CreateTable
print(CreateTable(Note.__table__))
# CREATE TABLE note (
# id INTEGER NOT NULL ,
# body TEXT,
# PRIMARY KEY (id)
# )

将数据库的重建定义成click

1
2
3
4
5
6
7
import click

@app.cli.command()
def initdb():
db.drop_all()
db.create_all()
click.echo('inited db')

之后只要指向flask initdb即可

删除数据不能使用get , 必须改成post

为了防御CSRF攻击

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class DeleteNoteForm(FlaskForm):
submit = SubmitField('Delete')

@app.route('/delete/<int:note_id>', methods=['POST'])
def delete_note(note_id):
form = DeleteNoteForm()
if form.validate_on_submit():
note = Note.query.get(note_id)
db.session.delete(note)
db.session.commit()
flash('Your note is deleted.')
else:
abort(400)
return redirect(url_for('index'))
1
2
3
4
5
6
7
8
9
10
{% for note in notes %}
<div class="note">
<p>{{ note.body }}</p>
<a class='btn' href="{{ url_for('edit_note', note_id=note.id) }}">Edit</a>
<form method="post" action="{{ url_for('delete_note', note_id=note.id) }}">
{{ form.csrf_token }}
{{ form.submit(class='btn') }}
</form>
</div>
{% endfor %}

配置 Python Shell 上下文

  • 每次使用 flask shell命令启动 Python Shell后都要从app模块里导人db对象和相应的模型类。为什么不把它们自动集成到 Python Shell上下文里呢?
  • 使用app.shell_context_processor装饰器注册一个shell上下文处理函数
1
2
3
4
5
6
@app.shell_context_processor
def make_shell_context():
return dict (
db=db,
Note=Note,
)

一对多

1
2
3
4
5
6
7
8
9
10
11
class Author(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(20), unique=True)
phone = db.Column(db.String(20))
articles = db.relationship('Article') # collection

class Article(db.Model):
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(50), index=True)
body = db.Column(db.Text)
author_id = db.Column(db.Integer, db.ForeignKey('author.id'))
  • 使用关系函数定义的属性不是数据库字段,而是类似于特定的查询函数。
  • 当某个 Aritcle对象被删除时,在对应 Author对象的 aritcles属性调用时返回的列表也不会包含该对象

简单来说 , relationship就像一个函数 , 而不是一个数据库字段

一对多的双向关系

一个作者有很多本书

1
2
3
4
5
6
7
8
9
10
class Writer(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), unique=True)
books = db.relationship('Book', back_populates='writer')

class Book(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(50), index=True)
writer_id = db.Column(db.Integer, db.ForeignKey('writer.id'))
writer = db.relationship('Writer', back_populates='books')

或者使用backref进行简化

1
2
3
4
5
6
7
8
9
class Singer(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(70), unique=True)
songs = db.relationship('Song', backref='singer')

class Song(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(50), index=True)
singer_id = db.Column(db.Integer, db.ForeignKey('singer.id'))

多对一

1
2
3
4
5
6
7
8
9
class Citizen(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(70), unique=True)
city_id = db.Column(db.Integer, db.ForeignKey('city.id'))
city = db.relationship('City') # scalar

class City(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(30), unique=True)

一对一

1
2
3
4
5
6
7
8
9
10
class Country(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(30), unique=True)
capital = db.relationship('Capital', uselist=False) # collection -> scalar

class Capital(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(30), unique=True)
country_id = db.Column(db.Integer, db.ForeignKey('country.id'))
country = db.relationship('Country') # scalar

多对多1:虚拟表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# many to many with association table
association_table = db.Table('association',
db.Column('student_id', db.Integer, db.ForeignKey('student.id')),
db.Column('teacher_id', db.Integer, db.ForeignKey('teacher.id'))
)

class Student(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(70), unique=True)
grade = db.Column(db.String(20))
teachers = db.relationship('Teacher',
secondary=association_table,
back_populates='students') # collection

class Teacher(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(70), unique=True)
office = db.Column(db.String(20))
students = db.relationship('Student',
secondary=association_table,
back_populates='teachers') # collection

多对多2:实体表

上面的操作是使用sqlalchemy的特性创建了一张虚拟表 , 实际上我们也可以将这张表实体化

用户和收藏的图片是多对多关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class User(db.Model, UserMixin):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(20), unique=True, index=True)
collections = db.relationship('Collect', back_populates='collector', cascade='all')

class Photo(db.Model):
id = db.Column(db.Integer, primary_key=True)
filename = db.Column(db.String(64))
collectors = db.relationship('Collect', back_populates='collected', cascade='all')

class Collect(db.Model):
# 收藏者
collector_id = db.Column(db.Integer, db.ForeignKey('user.id'),primary_key=True)
# 被收藏的图片
collected_id = db.Column(db.Integer, db.ForeignKey('photo.id'),primary_key=True)
timestamp = db.Column(db.DateTime, default=datetime.utcnow)
collector = db.relationship('User', back_populates='collections', lazy='joined')
collected = db.relationship('Photo', back_populates='collectors', lazy='joined')
  • 我们在 Photo和User模型中定义的关系属性返回的不再是关系另一侧的记录,而是存储对应关系的中间人—Collect记录。
  • 在 Collect记录中添加的标量关系属性 collector和 collected,分别表示收藏者和被收藏图片,指向对应的User和
    Photo记录,我们需要进一步调用这两个关系属性,才可以获取关系另一侧的记录。
  • 举例来说,对于使用关联表的标(Tag)和图片(Photo)来说,对于使用关联模型的用户(User)和图片(Photo),当我们调用 photo.collectors 时,获得的只是一堆包含当前图片和对应收藏者关系的 Collect记录。我们需要对记录进一步调用 collector属性才会获得对应的User记录。
  • 用户a收藏一张图片,仅使用 user_a.collections.append(photo_a)就可以了

多对多3:自引用

用户的关注就是自引用的多对多

  • 因为在Follow模型中 , 两个字段定义的外键是指向同一个表的同一个字段(user.id)的 , 而当我们需要在Follow模型上建立反向属性的时候 , Sqlalchemy没法知道那个外键对应哪个反向属性 , 所以我们需要在关系函数中使用foreign_keys参数来明确对应的字段
  • 同样因为同一个外键值包含歧义,对于集合关系属性 followers和 followings来说,我们无法通过 with_parent()查询方法筛选子对象,所以关系定义时使用 dynamic方式加载关系记录,这样就可以直接调用关系属性附加的查询方法
1
2
3
4
5
6
7
8
9
10
11
12
class Follow(db.Model):
follower_id = db.Column(db.Integer, db.ForeignKey('user.id'),primary_key=True)
followed_id = db.Column(db.Integer, db.ForeignKey('user.id'),primary_key=True)
timestamp = db.Column(db.DateTime, default=datetime.utcnow)
follower = db.relationship('User', foreign_keys=[follower_id], back_populates='following', lazy='joined')
followed = db.relationship('User', foreign_keys=[followed_id], back_populates='followers', lazy='joined')

class User(db.Model, UserMixin):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(20), unique=True, index=True)
following = db.relationship('Follow', foreign_keys=[Follow.follower_id],back_populates='follower',lazy='dynamic', cascade='all')
followers = db.relationship('Follow', foreign_keys=[Follow.followed_id],back_populates='followed',lazy='dynamic', cascade='all')

级联操作

1
2
3
4
5
6
7
8
9
10
11
12
# cascade
class Post(db.Model):
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(50))
body = db.Column(db.Text)
comments = db.relationship('Comment', back_populates='post', cascade='all, delete-orphan') # collection

class Comment(db.Model):
id = db.Column(db.Integer, primary_key=True)
body = db.Column(db.Text)
post_id = db.Column(db.Integer, db.ForeignKey('post.id'))
post = db.relationship('Post', back_populates='comments') # scalar

常用的配置组合如下所示:

  • save-update , merge
  • save-update , merge , delete
  • all
  • all , delete-orphan

with_parent()

根据父对象找出所有的子对象 (使用前提是foreignKey)

1
2
3
4
5
6
7
# 文章
class Post(db.Model):
pass

# 评论
class Comment(db.Model):
pass
  • 文章和评论是一对多
  • 所以文章是父对象 , 评论是子对象
  • 根据父对象post , 找到该文章下的所有评论
1
2
post = Post.query.filter_by(id=23).first()
comments = Comment.query. with_parent(post).all()

SQLalchemy事件监听

  • SQLAlchemy提供了一个 listen_for装饰器,它可以用来注册事件回调函数。

  • listen_for装饰器主要接收两个参数,

    • target表示监听的对象,这个对象可以是模型类、类实例或类属性等。
    • identifier参数表示被监听事件的标识符,比如,用于监听属性的事件标识符有set、append、remove、init
      scalar、init collection等。
  • 示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    # event listening
    class Draft(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    body = db.Column(db.Text)
    edit_time = db.Column(db.Integer, default=0) # 草稿被修改次数


    @db.event.listens_for(Draft.body, 'set')
    def increment_edit_time(target, value, oldvalue, initiator):
    if target.edit_time is not None:
    target.edit_time += 1

    # same with:
    @db.event.listens_for(Draft.body, 'set', named=True)
    def increment_edit_time(**kwargs):
    if kwargs['target'].edit_time is not None:
    kwargs['target'].edit_time += 1

第六章 电子邮件

简单使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from smtplib import SMTPSenderRefused, SMTPAuthenticationError
from flask_mail import Message

def send_email(Type):
from app import app
sender = (app.config.get('MAIL_NAME'), app.config.get('MAIL_USERNAME'))

if Type == 'server':
days = 15
subject = '服务器过期提醒'
template = 'servers_expires.html'
elif Type == 'domain':
days = 15
subject = '域名过期提醒'
template = 'domains_expires.html'
# 获取预警联系人的字典
contact_dict = _get_contact_cloud_dict(Type=Type, days=days)

if not contact_dict:
return

with app.app_context():
for contact, clouds in contact_dict.items():
msg = Message(subject, sender=sender, recipients=[contact.email])
msg.html = render_template(template, clouds=clouds, contact=contact)
try:
mail.send(msg)
except (SMTPSenderRefused,SMTPAuthenticationError,ConnectionRefusedError) as e:
send_email_error_logger(cloud_type=Type, clouds=clouds, contact=contact)
else:
send_email_info_logger(cloud_type=Type, clouds=clouds, contact=contact)

简化版本

1
2
3
4
5
6
7
from flask_mail import Mail, Message

def send_subscribe_mail(subject, to, **kwargs):
message = Message(subject, recipients=[to], sender='Flask Weekly <%s>' % os.getenv('MAIL_USERNAME'))
message.body = render_template('emails/subscribe.txt', **kwargs)
message.html = render_template('emails/subscribe.html', **kwargs)
mail.send(message)

异步发送

1
2
3
4
5
6
7
8
9
10
11
12
# send email asynchronously
def _send_async_mail(app, message):
with app.app_context():
mail.send(message)


def send_async_mail(subject, to, body):
app = current_app._get_current_object() # 获取被代理的真实对象
message = Message(subject, recipients=[to], body=body)
thr = Thread(target=_send_async_mail, args=[app, message])
thr.start()
return thr
1
2
3
4
5
6
7
8
9
10
11
12
13
@app.route('/', methods=['GET', 'POST'])
def index():
form = EmailForm()
if form.validate_on_submit():
to = form.to.data
subject = form.subject.data
body = form.body.data
send_async_mail(subject, to, body)
flash('Email sent ! Check your inbox.' )
return redirect(url_for('index'))
form.subject.data = 'Hello, World!'
form.body.data = 'Across the Great Wall we can reach every corner in the world.'
return render_template('index.html', form=form)
  • 因为我们的程序实例是通过工厂函数构建的,所以实例化 Thread类时,我们使用代理对象 current
    app作为args参数列表中ap的值。
  • 另外,因为在新建的线程时需要真正的程序对象来创建上下文,所以我们不能直接传人 current app,而是传入对 current app调用get current objecto方法获取到的被代理的程序实例。