sql 的 =:语法

  • 通过=:变量名的方式,在具体调用的时候传入参数,是防止sql注入的写法

user_id = 1

1
WHERE userv2_user.user_id = :user_id

快速创建namedtuple

1
user = namedtuple("User", user_dict.keys())(*user_dict.values())

Flask Schema

注意 可以自己创建使用DateRangeField,OrderField

1
2
3
4
5
6
7
"start_date": DateRangeField(label="date_range", first=True, missing=lambda: ""),
"end_date": DateRangeField(label="date_range", last=True, missing=lambda: ""),
"order": OrderField(
missing="user_all_recharge desc",
_in=["user_all_recharge", "recharge_money", "last_logindate",
"last_paydate", "role_money"]
)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# -*- coding: utf-8 -*-

import datetime

from flask import request
from marshmallow.fields import Field


class DateRangeField(Field):

def __init__(self, *args, **kwargs):
if "missing" not in kwargs:
kwargs.update({"missing": datetime.date.today})
super(DateRangeField, self).__init__(required=True, *args, **kwargs)
self.is_first = kwargs.get("first", False)
self.is_last = kwargs.get("last", False)
self.label = kwargs.get("label", "date_range")

def _deserialize(self, value, attr, data):
_data = request.args.to_dict()
_default = datetime.date.today()
v = _data.get(self.label, None)
if not v:
return self.missing()
if self.is_first:
_index = 0
elif self.is_last:
_index = -1

v = v.split(u"到")[_index].strip()
try:
v = datetime.datetime.strptime(v, "%Y-%m-%d").date()
except ValueError, e:
return _default

return v


class IntegerField(Field):
"""ID查询框
- 可以不填值, 不填值或者不合合法值一律None
"""
def __init__(self, *args, **kwargs):
super(IntegerField, self).__init__(missing=None, *args, **kwargs)

def _deserialize(self, value, attr, data):
if not value:
return None
if not value.isdigit():
return None
value = int(value)
return value


class OrderField(Field):
"""排序字段
- 可以不填值, 不填值或者不合合法值一律None
"""
def __init__(self, *args, **kwargs):
super(OrderField, self).__init__(*args, **kwargs)
self.columns = kwargs.get("_in", [])

def _deserialize(self, value, attr, data):
"""order=id desc,create_time asc"""
value = [ item.split() for item in value.split(",") ]
value = [
item for item in value
if len(item) < 3
and (
len(item) == 1 or (
len(item) == 2 and
item[-1] in ("asc", "desc")
)
)
and (
self.columns and item[0] in self.columns
)
]
value = ",".join([ " ".join(item) for item in value])
return None if value == "" else value


class ListField(Field):
"""多ID查询框
- 可以不填值, 不填值或者不合合法值一律None
"""
def __init__(self, *args, **kwargs):
super(ListField, self).__init__(missing="", *args, **kwargs)

def _deserialize(self, value, attr, data):
result = []
if not value: return []
for item in value.split(","):
if item.isdigit():
result.append(int(item))
return result

使用schema处理参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class OrderField(Field):
"""排序字段"""

def _deserialize(self, value, attr, data, **kwargs):
"""根据order , 生成sql中order的语句

:return: str "report_date desc,app_name,profit_bxm desc"
"""

order_by_li = []
for o in value.split(','):
field, mode = o.split('+')
if mode == 'desc':
order_by_li.append(f'{field} desc')
else:
order_by_li.append(field)
return ','.join(order_by_li)


class GroupField(Field):
"""组合字段"""

def _deserialize(self, value, attr, data, **kwargs):
"""根据group , 生成sql中group的字段

:return: str
"""

group_mapping = {
'all': 'all',
'user': 'user_name',
'app': 'app_name',
'media': 'media'
}
return group_mapping[value]


class FilterField(Field):
"""过滤字段"""

def _deserialize(self, value, attr, data, **kwargs):
"""根据filter_string , 过滤的字典

:return: dict {'app':'app1', 'user':'user1'}
"""
d = {}
for f in value.split(','):
field, val = f.split('+')
if not val == 'all':
d[field] = val
return d

线上线下不同配置

对于线上线下不同的配置,我们可以使用current_app.config.get("DEBUG", False)来判别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def role_required(fail_msg,redirect_end_point):
def wrapped(func):
@wraps(func)
def decorated(*args, **kwargs):
if current_app.config.get("DEBUG", False):
return func(*args, **kwargs)
if not current_user.is_authenticated or current_user.uid not in ALLOWED_USERS:
flash(fail_msg, category='error')
return redirect(url_for(redirect_end_point))
return func(*args, **kwargs)

return decorated

return wrapped

coalesce语句

COALESCE是一个函数, (expression_1, expression_2, …,expression_n)依次参考各参数表达式,遇到非null值即停止并返回该值。如果所有的表达式都是空值,最终将返回一个空值。使用COALESCE在于大部分包含空值的表达式最终将返回空值。

1
select coalesce(success_cnt, 1) from tableA

sql的执行顺序

mysql中的SQL语句执行是有一定顺序的,如下:

  1. from
  2. on
  3. join
  4. where
  5. group by
  6. with
  7. having
  8. select
  9. distinct
  10. order by
  11. limit

一条SQL会经过这11步的,中间的每一步都会生成一张虚拟表,后面的步骤都是在上一张虚拟表中进行筛选与查询的

select的执行顺序是排在第8步的,而select是针对以上几步生成的虚拟表进行操作的,所以你所要使用的字段,如果虚拟表中不存在,那么则会报错

错 :

1
SELECT SUM(field1 + field2) AS col1, col1 + field3 AS col3 from core

对 :

1
2
3
4
5
SELECT col1,  col1 + field3 AS col3 
FROM (
SELECT field1 + field2 as col1 , field3
from core
) as SubQueryAlias

db.session.query(obj)的扁平化处理

1
2
3
4
5
6
7
appid_query = db.session.query(
func.distinct(VerifyEventLog.app_id),
).filter(
or_(VerifyEventLog.extra_hs['sdk_version'] == obj.version_code,
VerifyEventLog.extra_hs['sdk_id'] == str(obj.id))
)
appid_list = list(chain(*appid_query.all()))

资源上传的通用操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@index_router.route("/upload", methods=["POST",])
def upload():
"""资源上传"""
upload_file = request.files.get("upload_file")
fname_suffix = upload_file.filename.split(".")[-1].lower()

fname_prefix = "upload"
fname_md5 = hashlib.md5(upload_file.read()).hexdigest()
fname = ".".join([fname_prefix, fname_md5, fname_suffix])
fpath = current_app.config["STATIC_FILE_UPLOAD_PATH"] + fname
data = current_app.config["STATIC_FILE_HOST"] + fname
upload_file.seek(0)
upload_file.save(fpath)
if request.args.has_key("_is_wang_editor"):
return jsonify(
errno=0,
data=[data]
)
elif request.args.has_key("_jquery_file_upload"):
return jsonify(
errno=0,
data=data
)
else:
return jsonify(
state=1,
code="200",
data=data
)

sqlalchemy的复杂操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from sqlalchemy import func, text

query = db.session.query(
VerifyEventLog.idfa,
func.count(VerifyEventLog.id).label("count"),
func.max(VerifyEventLog.log_time).label("latest_time"),
).filter(
VerifyEventLog.idfa != '',
).group_by(
VerifyEventLog.idfa,
).order_by(
text("count desc")
)

query = db.session.query(
func.distinct(VerifyEventLog.log_time).label('log_time'),
VerifyEventLog.idfa,
VerifyApp.bundle_id,
VerifyApp.bundle_name
).join(
VerifyApp,
VerifyApp.app_id == VerifyEventLog.app_id
).filter(
VerifyEventLog.idfa == idfa
).order_by(
VerifyEventLog.log_time.desc(),
)

注意 上面两种order_by的方法

原则

  1. 可读性第一
  2. 快速失败,灵活使用断言保护代码。契约式编程(先验条件和后置条件),越早失败,越容易排查错误。
  3. 隐藏复杂性。如果复杂性避免不了,应该尽让内部复杂,接口要保持简单易用
  4. 不要直接吞掉任何非预知错误和异常
  5. 命名尽量要可以看出类型,比如复数表示容器类型,nums,cnts等后缀表示数值(通过后缀和词性来使名称更容易被推断出来含义,比如是属性还是方法)
  6. 传入了一个嵌套字典需要注释
  7. 保持函数参数和返回值尽量使用简单数据类型
  8. 函数要么修改传入的可变参数,要么返回一个值。请不要两者同时做。尽量不要直接修改传入的可变参数而是返回一个新的结果。
  9. 返回多个值可以使用namedtuple封装,比用下标更直观。对于可能经常需要变动的返回值,返回字典或者对象要比返回tuple容易修改。
  10. 函数尽量返回相同的类型
  11. 参数过多的时候推荐调用的地方显示写出参数名 f(a=1,b=2)。建议一个函数传参 5 个以上以后参数就指定参数名进行传递,防止参数不匹配导致的 bug。
  12. 如果没有特殊的性能需求,函数返回值尽量使用marshmallow之类序列化,之前的很多项目直接搞一个 dict 各种往里边塞字段然后返回,很难看清楚返回的啥结构,维护起来很累。
  13. 保持类的继承层级简单,适当使用mixin。
  14. 注意尽量不要在非 __init__ 方法中给类赋值属性,笔者在维护别人代码的过程中,发现经常在一些非 init 方法种赋值新的属性,导致后期难以维护,根本不知道这个对象包含哪些属性,删除一个属性的时候坑也多。
  15. 基于代码行为测试,不要片面追求测试覆盖率。
  16. 获取对象的时候尽量传入需要的字段(数据表列),减少数据传输同时还能避免拼对象的时间消耗
  17. 数据库30条军规
  18. 一个类、函数等如果难以用一句话描述它的职责,很有可能就违背了SRP(单一职责原则)。
  19. 对于复杂的数据结构(比如嵌套类型),可以适当注释出类型,比如最新的 tornado 源码里出现了这种注释 __impl_kwargs = None # type: Dict[str, Any]
  20. 当一个函数是另一个函数的私有函数的时候 , 可以设计成闭包
  21. 每个 commit 都是独立的小功能,可以单独回滚。
  22. 基本上代码的耗时是遵守2/8定律的,集中优化最耗时的代码,衡量成本和收益。
  23. 优先从数据结构、算法、数据库、网络IO等层面优化
  24. 不要频繁使用诸如data,info,result,handle,process等概念太广泛的词汇给变量命名
  25. 给函数命名的一个好办法:首先考虑应该给这个函数写上一句怎样的注释,然后想办法将注释变成函数名称。
  26. 用下划线前缀定义一个临时使用的变量
  27. 使用下划线开头区分是内部函数还是提供给外部调用

拼接url参数的方法

1
2
3
4
5
6
7
8
9
10
nv = []
nv += [_from_value(n, q) for n, q in
[('p', page), ('n', per_page), ('b', int(banned)),
('v', int(suspected))] if q]
nv += [_from_dict(n, q) for n, q in
[('e', filters), ('k', keywords), ('l', lower), ('u', upper),
('s', orders)] if q]
nv = [t for t in nv if t]
# import
qs = '?' + '&'.join(nv)

对于request请求 , 不要在request函数里添加status的判断

因为如果在request方法里判断status , 就可能返回None , 然后在调用request的方法中还要判断是否返回的是None . 还不如如下使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def _request(method, url, **kwargs):
timeout = current_app.config['GAME_REQUEST_TIMEOUT']
try:
res = requests.request(method, url, timeout=timeout, **kwargs)
res.raise_for_status()
except:
return None
return res

def get(url, params=None):
return _request('GET', url, params=params)

def request_failed(res):
return res is None or not res.json()['state']

# 使用
@classmethod
def is_blocked(cls, id):
res = get(cls.is_blocked_url.format(id))
if request_failed(res):
return None
return res.json()['data']

返回空的对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class Pagination(object):
total = None
has_prev = False
has_next = False
page = 1
per_page = 20
items = []

def __init__(self, data=None):
if data is None:
return
self.items = data['data']
self.has_prev = data['prev'] is not None
self.has_next = data['next'] is not None
self.page = data['page']
self.per_page = data['per_page']
self.total = data['total']
1
2
3
4
5
6
7
@classmethod
def history(cls, **kwargs):
res = get(cls.history_url, params=kwargs)
if request_failed(res):
# 不是返回None
return Pagination()
return Pagination(res.json()['data'])

使用dict的copy函数

1
2
3
x = {'name':'hyl'}
y = x.copy()
print(y)

如何定义类变量和实例变量

1
2
3
4
5
6
7
8
9
10
11
12
class BlockWord(object):
base_url = BaseConfig.GAME_CHAT_API
search_url = base_url + '/words/block'
add_url = base_url + '/words/block'
remove_url = base_url + '/words/block'

@classmethod
def search(cls, **kwargs):
res = get(cls.search_url, params=kwargs)
if request_failed(res):
return None
return Pagination(res.json()['data'])

定义变量的时候应该考虑 , 这个变量是属于类的 , 还是属于实例的 .

  • 如果是属于类的 , 就应该定义为类变量 , 如果是属于实例的 , 就应该定义在__init__里面
  • 如果在定义时就确定下来的变量, 就可以写成类变量 , 运行时才确定下来的 , 写成实例变量
1
2
3
4
5
6
class RefreshApp:
__REDIS_KEY__ = "app_data_game"

def __init__(self):
self.config = current_app.config
self.client = self._init_redis()
  • redis_key , 就是好存储的键 , 这是已经确定下来的键 , 写成类变量
  • config要从current_app里拿 , client需要连接 , 都是在运行时才确定下来的 , 写成实例变量

只使用一次的函数就没有必要封装

注意divmod的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
def to_natural_time(seconds):
result = ''
units = [('天', 24*60*60), ('小时', 60*60), ('分钟', 60), ('秒', 1)]
r = seconds
for s, d in units:
q, r = divmod(r, d)
if q > 0:
result += '{}{}'.format(q, s)
return result


x = to_natural_time(111111)
print(x) # 1天6小时51分钟51秒

Sqlalchemy的复杂使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
User.query.filter(User.username.endswith('@example.com')).all()

# 模糊查询
User.query.filter(User.name.like('xxx%'))

# and 查询
User.query.filter(and_(User.name == 'Python',User.id==2))
#或者
User.query.filter(User.name == 'Python', User.id == 2)

# 原生查询
from sqlalchemy import text
User.query.filter(text('id>=:value1 and id <:value2')).params(value1=2,value2=5)

# 完整 sql 语句查询
User.query.from_statement(text("select * from tags where id=:value")).params(value=1)

# 原生查询数据数量
db.session.execute("SELECT COUNT(*) FROM table").scalar()

# group by 统计数据
from sqlalchemy import func
db.session.query(Model).with_entities(Model.field, func.count(Model.field)).group_by(field).all()

Marshmallow

  • Marshmallow 很好的实现了 object -> dictobjects -> liststring -> dictstring -> list

  • 要对一个类(记为Class_A,以便表达)进行序列化和反序列化,首先要创建一个与之对应的类(记Class_A'),负责实现Class_A的序列化、序列化和数据校验等,**Class_A'就是schema**

  • Schema是序列化功能的载体,每个需要被序列化或反序列化的类,都要设计一个相应的Schema,以实现相关功能。

    1
    2
    3
    4
    class UserSchema(Schema):
    name = fields.Str()
    email = fields.Email()
    created_at = fields.DateTime()
  • 序列化使用schema中的dump()dumps()方法,

    • dump() 方法实现obj -> dict
    • dumps()方法实现 obj -> string
  • 反序列化基于schema中的load()loads()方法

    load()方法将一个传入的dict,结合schema的约定,再转换为一个dict,而loads()方法的传入参数是json格式的string,同样将传入数据转换为符合规范的dict

判断是linux的方法

1
sys.platform.startswith("linux")

__enter____exit__

  • __enter(self)__ : 负责返回一个值,该返回值将赋值给as子句后面的var_name,通常返回对象自己,即“self”。函数优先于with后面的“代码块”(statements1,statements2,……)被执行。
  • __exit__(self, exc_type, exc_val, exc_tb) : 执行完with后面的代码块后自动调用该函数。with语句后面的“代码块”中有异常(不包括因调用某函数,由被调用函数内部抛出的异常) ,会把异常类型,异常值,异常跟踪信息分别赋值给函数参数exc_type, exc_val, exc_tb,没有异常的情况下,exc_type, exc_val, exc_tb值都为None。
    • 如果该函数返回True、1类值的Boolean真值,那么将忽略“代码块”中的异常,停止执行“代码块”中剩余语句,但是会继续执行“代码块”后面的语句;
    • 如果函数返回类似0,False类的Boolean假值、或者没返回值,将抛出“代码块”中的异常,那么在没有捕获异常的情况下,中断“代码块”及“代码块”之后语句的执行

js判断是否包含的函数

查看item这个字符串中 abcd这个子串的索引 , 如果索引不为-1 , 说明包含此子串

1
myarr2.filter(item => item.indexOf('abcd')>-1);

posgresql插入数组

1
2
3
class GameGame(db.Model):
__tablename__ = "game_game"
os_type = db.Column(db.ARRAY(db.SmallInteger), nullable=False, default=[])
1
2
3
4
5
CREATE TABLE home_game_group (
os_type smallint[] NOT NULL DEFAULT '{}',
);

INSERT INTO home_game_group(os_type) VALUES ('{1,2}');

flask_wtf避免choices被无限append的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class ActicleForm(FlaskForm):

def __init__(self, *args, **kwargs):
super(XgNewsTitleForm, self).__init__(*args, **kwargs)

news_title_choices = []
news_title_choices.extend([
(i.news_id, i.title)
for i in XgNews.query.all()]
)
self.news_title.choices = news_title_choices

news_title = SelectField(
label=u"新闻标题",
coerce=int,
choices=[],
)

flask选择性filter的最佳实践

1
2
3
4
5
6
7
8
filters = []
query = XgKf_issue.query
if issue_type:
filters.append(XgKf_issue.issue_type == issue_type)
if title:
filters.append(XgKf_issue.title.like("%" + title + "%"))
if filters:
query = query.filter(*filters)

flask的model使用property模仿外键

1
2
3
4
@property 
def config(self):
config = GameConfigInfo.query.filter( GameConfigInfo.game_id == self.game_id).first()
return config

__bind_key__缺失问题

在连接不到测试服线上的__bind_key__的时候 , 可以在本地创建

一个数据库连接到__bind_key__对应的键上

Flask 报错 422 错误

出现这段原因一般是在 web_args 获取时类型转化错误 , 如

example.com/?args=2 , 后台使用 args:fields.Boolean()获取 , 这种错误的异常栈不会打

印本行 , 很难 debug

html 使用假表单来提交

在 html 使用假表单来提交 , 不需要在 jinja2 前端里设置没有表单元素为 display:none , 而

是在 form 中直接将其设置为 HiddenField

input 的 required 属性失效

Form 表单在 submit ,会验证各个字段,其中包括 required . 但是如果

使用 Ajax 发送 $(form).serialize()的 data ,就不会触发字段的验证机制

Form 的 name 属性

Form 字段 , form 在提交的时候 , input 其实就是将 name:value 构建成一个键值

对 , 所以 input 标签中必须要用 name 属性 . 如果没有 name 属性 , 那么提交的表单将没

有这个字段

sql的写法

如果在join的基础上使用where , 很可能会产生歧义 , 必须使用table.filed方法
不过可以使用 join ( select * from … where ) on …
这样就将where放在了一张表里 , 就不会产生歧义了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
SELECT home_game_server.ogame_id,
home_game_server.open_server_time,
home_game_server.system_os,
server_name
FROM home_game_server
JOIN (
SELECT home_game_server.ogame_id as ogame_id,
max(open_server_time) as open_server_time, system_os
FROM home_game_server
JOIN home_game_group
ON home_game_group.game_group_id = home_game_server.ogame_id
AND home_game_group.status = true
WHERE open_server_date >= CURRENT_DATE - 7
AND open_server_date <= CURRENT_DATE
%s
GROUP BY ogame_id, system_os
) t1 ON home_game_server.ogame_id = t1.ogame_id
AND home_game_server.open_server_time = t1.open_server_time
AND home_game_server.system_os = t1.system_os
LIMIT %s OFFSET %s;
""" % (
" AND system_os = '%s'" % os_type if os_type else "",
page_size,
(page - 1) * page_size
)

python3使用use_kwargs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from webargs import fields, missing, validate
from webargs.flaskparser import use_kwargs


@domain_router.route("/", methods=["DELETE"])
@use_kwargs({"DomainName": fields.Str(
required=True,
error_messages={"required": "缺少域名"}, ),
}, location="query")
def delete_domain(DomainName):
pass

@domain_router.route("/", methods=["GET"])
@use_kwargs({"cloud": fields.Str(required=False, missing='')}, location="query")
def get_domains(cloud):
"""domains列表"""
print('ccccccccc', cloud)
if cloud:
domains = Domains.get_domain_list(cloud)
else:
domains = Domains.query.all()
data = [domain.to_dict() for domain in domains]
return JsonpResp().success(data)

python2使用use_kwargs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from webargs.flaskparser import use_kwargs
from webargs import fields, missing, validate

@news_router.route("/")
@use_kwargs({
"page": fields.Integer(missing=1),
"title": fields.Str(missing=""),
})
def news_index(page, title):
"""新闻公告列表"""
query = XgNews.query
if title:
query = query.filter(XgNews.title.like('%' + title + '%'))
page_obj = SqlalchemyPages(query, page, 100)
return render_template(
"news/news_index.html",
page_obj=page_obj,
title=title
)

使用SqlalchemyPages

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class BasePages(object):

def __init__(self, Query, page, per_page):
self.page = page
self.per_page = per_page
self.Query = Query
self.total_count = self._load_count()

def _load_count(self):
raise NotImplementedError()

@property
def pages(self):
return int(ceil(self.total_count / float(self.per_page)))

@property
def has_prev(self):
return self.page > 1

@property
def has_next(self):
return self.page < self.pages

def iter_pages(self, left_edge=2, left_current=2, right_current=5, right_edge=2):
last = 0
for num in range(1, self.pages + 1):
if num <= left_edge or \
(num > self.page - left_current - 1 and \
num < self.page + right_current) or \
num > self.pages - right_edge:
if last + 1 != num:
yield None
yield num
last = num

def list(self):
raise NotImplementedError()

class SqlalchemyPages(BasePages):
def _load_count(self):
return self.Query.count()

def list(self):
return self.Query.offset((self.page-1)*self.per_page).limit(self.per_page).all()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
def home_picture_unchecked(page, ogame_id):
"""玩家晒图列表(未审核)"""
query = HomeGamePicture.query.filter(
HomeGamePicture.status == False
)
filters = []
if ogame_id:
filters.append(HomeGamePicture.ogame_id == ogame_id)
if filters:
query = query.filter(*filters)

page_obj = SqlalchemyPages(query, page, 100)
return render_template(
"home/home_strategy/home_picture_unchecked.html", page_obj=page_obj)
1
{{ render_pagination(page_obj) }}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
{% macro render_pagination(pagination) %}
<nav>
<ul class="pagination">
{% if pagination.has_prev %}
<li><a href="?page={{ pagination.page - 1 }}&{{ request.args|page_args }}">«</a></li>
{% else %}
<li class="disabled"><a href="#">«</a></li>
{% endif %}
{%- for page in pagination.iter_pages() %}
{% if page %}
{% if page != pagination.page %}
<li class="">
<a href="?page={{ page }}&{{ request.args|page_args }}">{{ page }}</a>
</li>
{% else %}
<li class="active">
<a href="?page={{ page }}&{{ request.args|page_args }}">{{ page }}</a>
</li>
{% endif %}
{% else %}
<li class="disabled">
<a>......</a>
</li>
{% endif %}
{%- endfor %}
{% if pagination.has_next %}
<li><a href="?page={{ pagination.page + 1 }}&{{ request.args|page_args }}">»</a></li>
{% else %}
<li class="disabled"><a href="#">»</a></li>
{% endif %}
<li class="disabled">
<a style="color:black;">共{{ pagination.total_count }}条记录</a>
</li>
</ul>
</nav>
{% endmacro %}

wtform的所有field

1
2
3
4
from wtforms import TextAreaField, HiddenField, BooleanField, \
RadioField, PasswordField, SubmitField, TextField, \
DateField, IntegerField, SelectField, SelectMultipleField, \
FileField, DateTimeField, DecimalField, StringField, FloatField

Wtform的validate_on_submit()验证问题

  • FlaskForm.validate_on_submit()验证问题 : FlaskForm使用validate()函数的时候FlaskForm.validators属性可能会失效
  • 通过阅读源码得知 : FlaskForm.validate_on_submit()后台调用Field的validate() , 将extra_validators和FlaskForm的validators属性合成一个chain , 再调用_run_validation_chain()进行验证 . 所以当我们复写validate()的时候需要在函数里调用父类的validate(),保证FlaskForm.validators属性有效