sql 的 =:
语法
通过=:变量名
的方式,在具体调用的时候传入参数,是防止sql注入的写法
user_id = 1
1 WHERE userv2_user.user_id = :user_id
快速创建namedtuple 1 user = namedtuple("User" , user_dict.keys())(*user_dict.values())
Flask Schema 注意 可以自己创建使用DateRangeField
,OrderField
1 2 3 4 5 6 7 "start_date" : DateRangeField(label="date_range" , first=True , missing=lambda : "" ),"end_date" : DateRangeField(label="date_range" , last=True , missing=lambda : "" ), "order" : OrderField( missing="user_all_recharge desc" , _in =["user_all_recharge" , "recharge_money" , "last_logindate" , "last_paydate" , "role_money" ] )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 import datetimefrom flask import requestfrom marshmallow.fields import Fieldclass DateRangeField (Field ): def __init__ (self, *args, **kwargs ): if "missing" not in kwargs: kwargs.update({"missing" : datetime.date.today}) super (DateRangeField, self).__init__(required=True , *args, **kwargs) self.is_first = kwargs.get("first" , False ) self.is_last = kwargs.get("last" , False ) self.label = kwargs.get("label" , "date_range" ) def _deserialize (self, value, attr, data ): _data = request.args.to_dict() _default = datetime.date.today() v = _data.get(self.label, None ) if not v: return self.missing() if self.is_first: _index = 0 elif self.is_last: _index = -1 v = v.split(u"到" )[_index].strip() try : v = datetime.datetime.strptime(v, "%Y-%m-%d" ).date() except ValueError, e: return _default return v class IntegerField (Field ): """ID查询框 - 可以不填值, 不填值或者不合合法值一律None """ def __init__ (self, *args, **kwargs ): super (IntegerField, self).__init__(missing=None , *args, **kwargs) def _deserialize (self, value, attr, data ): if not value: return None if not value.isdigit(): return None value = int (value) return value class OrderField (Field ): """排序字段 - 可以不填值, 不填值或者不合合法值一律None """ def __init__ (self, *args, **kwargs ): super (OrderField, self).__init__(*args, **kwargs) self.columns = kwargs.get("_in" , []) def _deserialize (self, value, attr, data ): """order=id desc,create_time asc""" value = [ item.split() for item in value.split("," ) ] value = [ item for item in value if len (item) < 3 and ( len (item) == 1 or ( len (item) == 2 and item[-1 ] in ("asc" , "desc" ) ) ) and ( self.columns and item[0 ] in self.columns ) ] value = "," .join([ " " .join(item) for item in value]) return None if value == "" else value class ListField (Field ): """多ID查询框 - 可以不填值, 不填值或者不合合法值一律None """ def __init__ (self, *args, **kwargs ): super (ListField, self).__init__(missing="" , *args, **kwargs) def _deserialize (self, value, attr, data ): result = [] if not value: return [] for item in value.split("," ): if item.isdigit(): result.append(int (item)) return result
使用schema处理参数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 class OrderField (Field ): """排序字段""" def _deserialize (self, value, attr, data, **kwargs ): """根据order , 生成sql中order的语句 :return: str "report_date desc,app_name,profit_bxm desc" """ order_by_li = [] for o in value.split(',' ): field, mode = o.split('+' ) if mode == 'desc' : order_by_li.append(f'{field} desc' ) else : order_by_li.append(field) return ',' .join(order_by_li) class GroupField (Field ): """组合字段""" def _deserialize (self, value, attr, data, **kwargs ): """根据group , 生成sql中group的字段 :return: str """ group_mapping = { 'all' : 'all' , 'user' : 'user_name' , 'app' : 'app_name' , 'media' : 'media' } return group_mapping[value] class FilterField (Field ): """过滤字段""" def _deserialize (self, value, attr, data, **kwargs ): """根据filter_string , 过滤的字典 :return: dict {'app':'app1', 'user':'user1'} """ d = {} for f in value.split(',' ): field, val = f.split('+' ) if not val == 'all' : d[field] = val return d
线上线下不同配置 对于线上线下不同的配置,我们可以使用current_app.config.get("DEBUG", False)
来判别
1 2 3 4 5 6 7 8 9 10 11 12 13 14 def role_required (fail_msg,redirect_end_point ): def wrapped (func ): @wraps(func ) def decorated (*args, **kwargs ): if current_app.config.get("DEBUG" , False ): return func(*args, **kwargs) if not current_user.is_authenticated or current_user.uid not in ALLOWED_USERS: flash(fail_msg, category='error' ) return redirect(url_for(redirect_end_point)) return func(*args, **kwargs) return decorated return wrapped
coalesce语句 COALESCE是一个函数, (expression_1, expression_2, …,expression_n)依次参考各参数表达式,遇到非null值即停止并返回该值。如果所有的表达式都是空值,最终将返回一个空值。使用COALESCE在于大部分包含空值的表达式最终将返回空值。
1 select coalesce (success_cnt, 1 ) from tableA
sql的执行顺序 mysql中的SQL语句执行是有一定顺序的,如下:
from
on
join
where
group by
with
having
select
distinct
order by
limit
一条SQL会经过这11步的,中间的每一步都会生成一张虚拟表,后面的步骤都是在上一张虚拟表中进行筛选与查询的
select的执行顺序是排在第8步的,而select是针对以上几步生成的虚拟表进行操作的,所以你所要使用的字段,如果虚拟表中不存在,那么则会报错
错 :
1 SELECT SUM (field1 + field2) AS col1, col1 + field3 AS col3 from core
对 :
1 2 3 4 5 SELECT col1, col1 + field3 AS col3 FROM ( SELECT field1 + field2 as col1 , field3 from core ) as SubQueryAlias
db.session.query(obj)的扁平化处理 1 2 3 4 5 6 7 appid_query = db.session.query( func.distinct(VerifyEventLog.app_id), ).filter ( or_(VerifyEventLog.extra_hs['sdk_version' ] == obj.version_code, VerifyEventLog.extra_hs['sdk_id' ] == str (obj.id )) ) appid_list = list (chain(*appid_query.all ()))
资源上传的通用操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @index_router.route("/upload" , methods=["POST" ,] ) def upload (): """资源上传""" upload_file = request.files.get("upload_file" ) fname_suffix = upload_file.filename.split("." )[-1 ].lower() fname_prefix = "upload" fname_md5 = hashlib.md5(upload_file.read()).hexdigest() fname = "." .join([fname_prefix, fname_md5, fname_suffix]) fpath = current_app.config["STATIC_FILE_UPLOAD_PATH" ] + fname data = current_app.config["STATIC_FILE_HOST" ] + fname upload_file.seek(0 ) upload_file.save(fpath) if request.args.has_key("_is_wang_editor" ): return jsonify( errno=0 , data=[data] ) elif request.args.has_key("_jquery_file_upload" ): return jsonify( errno=0 , data=data ) else : return jsonify( state=1 , code="200" , data=data )
sqlalchemy的复杂操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 from sqlalchemy import func, textquery = db.session.query( VerifyEventLog.idfa, func.count(VerifyEventLog.id ).label("count" ), func.max (VerifyEventLog.log_time).label("latest_time" ), ).filter ( VerifyEventLog.idfa != '' , ).group_by( VerifyEventLog.idfa, ).order_by( text("count desc" ) ) query = db.session.query( func.distinct(VerifyEventLog.log_time).label('log_time' ), VerifyEventLog.idfa, VerifyApp.bundle_id, VerifyApp.bundle_name ).join( VerifyApp, VerifyApp.app_id == VerifyEventLog.app_id ).filter ( VerifyEventLog.idfa == idfa ).order_by( VerifyEventLog.log_time.desc(), )
注意 上面两种order_by
的方法
原则
可读性第一
快速失败,灵活使用断言保护代码。契约式编程(先验条件和后置条件),越早失败,越容易排查错误。
隐藏复杂性。如果复杂性避免不了,应该尽让内部复杂,接口要保持简单易用
不要直接吞掉任何非预知错误和异常
命名尽量要可以看出类型,比如复数表示容器类型,nums,cnts等后缀表示数值(通过后缀和词性来使名称更容易被推断出来含义,比如是属性还是方法)
传入了一个嵌套字典需要注释
保持函数参数和返回值尽量使用简单数据类型
函数要么修改传入的可变参数,要么返回一个值。请不要两者同时做。尽量不要直接修改传入的可变参数而是返回一个新的结果。
返回多个值可以使用namedtuple封装,比用下标更直观。对于可能经常需要变动的返回值,返回字典或者对象要比返回tuple容易修改。
函数尽量返回相同的类型
参数过多的时候推荐调用的地方显示写出参数名 f(a=1,b=2)。建议一个函数传参 5 个以上以后参数就指定参数名进行传递,防止参数不匹配导致的 bug。
如果没有特殊的性能需求,函数返回值尽量使用marshmallow之类序列化,之前的很多项目直接搞一个 dict 各种往里边塞字段然后返回,很难看清楚返回的啥结构,维护起来很累。
保持类的继承层级简单,适当使用mixin。
注意尽量不要在非 __init__
方法中给类赋值属性,笔者在维护别人代码的过程中,发现经常在一些非 init 方法种赋值新的属性,导致后期难以维护,根本不知道这个对象包含哪些属性,删除一个属性的时候坑也多。
基于代码行为测试,不要片面追求测试覆盖率。
获取对象的时候尽量传入需要的字段(数据表列),减少数据传输同时还能避免拼对象的时间消耗
数据库30条军规
一个类、函数等如果难以用一句话描述它的职责,很有可能就违背了SRP(单一职责原则)。
对于复杂的数据结构(比如嵌套类型),可以适当注释出类型,比如最新的 tornado 源码里出现了这种注释 __impl_kwargs = None # type: Dict[str, Any]
当一个函数是另一个函数的私有函数的时候 , 可以设计成闭包
每个 commit 都是独立的小功能,可以单独回滚。
基本上代码的耗时是遵守2/8定律的,集中优化最耗时的代码,衡量成本和收益。
优先从数据结构、算法、数据库、网络IO等层面优化
不要频繁使用诸如data,info,result,handle,process等概念太广泛的词汇给变量命名
给函数命名的一个好办法:首先考虑应该给这个函数写上一句怎样的注释,然后想办法将注释变成函数名称。
用下划线前缀定义一个临时使用的变量
使用下划线开头区分是内部函数还是提供给外部调用
拼接url参数的方法 1 2 3 4 5 6 7 8 9 10 nv = [] nv += [_from_value(n, q) for n, q in [('p' , page), ('n' , per_page), ('b' , int (banned)), ('v' , int (suspected))] if q] nv += [_from_dict(n, q) for n, q in [('e' , filters), ('k' , keywords), ('l' , lower), ('u' , upper), ('s' , orders)] if q] nv = [t for t in nv if t] qs = '?' + '&' .join(nv)
对于request请求 , 不要在request函数里添加status的判断 因为如果在request方法里判断status , 就可能返回None , 然后在调用request的方法中还要判断是否返回的是None . 还不如如下使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 def _request (method, url, **kwargs ): timeout = current_app.config['GAME_REQUEST_TIMEOUT' ] try : res = requests.request(method, url, timeout=timeout, **kwargs) res.raise_for_status() except : return None return res def get (url, params=None ): return _request('GET' , url, params=params) def request_failed (res ): return res is None or not res.json()['state' ] @classmethod def is_blocked (cls, id ): res = get(cls.is_blocked_url.format (id )) if request_failed(res): return None return res.json()['data' ]
返回空的对象 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 class Pagination (object ): total = None has_prev = False has_next = False page = 1 per_page = 20 items = [] def __init__ (self, data=None ): if data is None : return self.items = data['data' ] self.has_prev = data['prev' ] is not None self.has_next = data['next' ] is not None self.page = data['page' ] self.per_page = data['per_page' ] self.total = data['total' ]
1 2 3 4 5 6 7 @classmethod def history (cls, **kwargs ): res = get(cls.history_url, params=kwargs) if request_failed(res): return Pagination() return Pagination(res.json()['data' ])
使用dict的copy函数 1 2 3 x = {'name' :'hyl' } y = x.copy() print (y)
如何定义类变量和实例变量 1 2 3 4 5 6 7 8 9 10 11 12 class BlockWord (object ): base_url = BaseConfig.GAME_CHAT_API search_url = base_url + '/words/block' add_url = base_url + '/words/block' remove_url = base_url + '/words/block' @classmethod def search (cls, **kwargs ): res = get(cls.search_url, params=kwargs) if request_failed(res): return None return Pagination(res.json()['data' ])
定义变量的时候应该考虑 , 这个变量是属于类的 , 还是属于实例的 .
如果是属于类的 , 就应该定义为类变量 , 如果是属于实例的 , 就应该定义在__init__
里面
如果在定义时就确定下来的变量, 就可以写成类变量 , 运行时才确定下来的 , 写成实例变量
1 2 3 4 5 6 class RefreshApp : __REDIS_KEY__ = "app_data_game" def __init__ (self ): self.config = current_app.config self.client = self._init_redis()
redis_key , 就是好存储的键 , 这是已经确定下来的键 , 写成类变量
config要从current_app里拿 , client需要连接 , 都是在运行时才确定下来的 , 写成实例变量
只使用一次的函数就没有必要封装 注意divmod的使用 1 2 3 4 5 6 7 8 9 10 11 12 13 def to_natural_time (seconds ): result = '' units = [('天' , 24 *60 *60 ), ('小时' , 60 *60 ), ('分钟' , 60 ), ('秒' , 1 )] r = seconds for s, d in units: q, r = divmod (r, d) if q > 0 : result += '{}{}' .format (q, s) return result x = to_natural_time(111111 ) print (x)
Sqlalchemy的复杂使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 User.query.filter (User.username.endswith('@example.com' )).all () User.query.filter (User.name.like('xxx%' )) User.query.filter (and_(User.name == 'Python' ,User.id ==2 )) User.query.filter (User.name == 'Python' , User.id == 2 ) from sqlalchemy import textUser.query.filter (text('id>=:value1 and id <:value2' )).params(value1=2 ,value2=5 ) User.query.from_statement(text("select * from tags where id=:value" )).params(value=1 ) db.session.execute("SELECT COUNT(*) FROM table" ).scalar() from sqlalchemy import funcdb.session.query(Model).with_entities(Model.field, func.count(Model.field)).group_by(field).all ()
Marshmallow
Marshmallow 很好的实现了 object -> dict
, objects -> list
, string -> dict
和 string -> list
。
要对一个类(记为Class_A
,以便表达)进行序列化和反序列化,首先要创建一个与之对应的类(记Class_A'
),负责实现Class_A
的序列化、序列化和数据校验等,**Class_A'
就是schema**
Schema是序列化功能的载体,每个需要被序列化或反序列化的类,都要设计一个相应的Schema,以实现相关功能。
1 2 3 4 class UserSchema (Schema ): name = fields.Str() email = fields.Email() created_at = fields.DateTime()
序列化使用schema中的dump()
或dumps()
方法,
dump()
方法实现obj -> dict
,
dumps()
方法实现 obj -> string
,
反序列化基于schema中的load()
或loads()
方法
load()
方法将一个传入的dict
,结合schema的约定,再转换为一个dict
,而loads()
方法的传入参数是json格式的string
,同样将传入数据转换为符合规范的dict
。
判断是linux的方法 1 sys.platform.startswith("linux" )
__enter__
和__exit__
__enter(self)__
: 负责返回一个值,该返回值将赋值给as子句后面的var_name,通常返回对象自己,即“self”。函数优先于with后面的“代码块”(statements1,statements2,……)被执行。
__exit__(self, exc_type, exc_val, exc_tb)
: 执行完with后面的代码块后自动调用该函数。with语句后面的“代码块”中有异常(不包括因调用某函数,由被调用函数内部抛出的异常) ,会把异常类型,异常值,异常跟踪信息分别赋值给函数参数exc_type, exc_val, exc_tb ,没有异常的情况下,exc_type, exc_val, exc_tb值都为None。
如果该函数返回True、1类值的Boolean真值,那么将忽略“代码块”中的异常,停止执行“代码块”中剩余语句,但是会继续执行“代码块”后面的语句;
如果函数返回类似0,False类的Boolean假值、或者没返回值,将抛出“代码块”中的异常,那么在没有捕获异常的情况下,中断“代码块”及“代码块”之后语句的执行
js判断是否包含的函数 查看item这个字符串中 abcd
这个子串的索引 , 如果索引不为-1 , 说明包含此子串
1 myarr2.filter(item => item.indexOf('abcd' )>-1 );
posgresql插入数组 1 2 3 class GameGame (db.Model ): __tablename__ = "game_game" os_type = db.Column(db.ARRAY(db.SmallInteger), nullable=False , default=[])
1 2 3 4 5 CREATE TABLE home_game_group ( os_type smallint [] NOT NULL DEFAULT '{}' , ); INSERT INTO home_game_group(os_type) VALUES ('{1,2}' );
flask_wtf避免choices被无限append的方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class ActicleForm (FlaskForm ): def __init__ (self, *args, **kwargs ): super (XgNewsTitleForm, self).__init__(*args, **kwargs) news_title_choices = [] news_title_choices.extend([ (i.news_id, i.title) for i in XgNews.query.all ()] ) self.news_title.choices = news_title_choices news_title = SelectField( label=u"新闻标题" , coerce=int , choices=[], )
flask选择性filter的最佳实践 1 2 3 4 5 6 7 8 filters = [] query = XgKf_issue.query if issue_type: filters.append(XgKf_issue.issue_type == issue_type) if title: filters.append(XgKf_issue.title.like("%" + title + "%" )) if filters: query = query.filter (*filters)
flask的model使用property模仿外键 1 2 3 4 @property def config (self ): config = GameConfigInfo.query.filter ( GameConfigInfo.game_id == self.game_id).first() return config
__bind_key__
缺失问题在连接不到测试服线上的__bind_key__
的时候 , 可以在本地创建
一个数据库连接到__bind_key__
对应的键上
Flask 报错 422 错误 出现这段原因一般是在 web_args 获取时类型转化错误 , 如
example.com/?args=2 , 后台使用 args:fields.Boolean()获取 , 这种错误的异常栈不会打
印本行 , 很难 debug
html 使用假表单来提交 在 html 使用假表单来提交 , 不需要在 jinja2 前端里设置没有表单元素为 display:none , 而
是在 form 中直接将其设置为 HiddenField
Form 表单在 submit ,会验证各个字段,其中包括 required . 但是如果
使用 Ajax 发送 $(form).serialize()的 data ,就不会触发字段的验证机制
Form 字段 , form 在提交的时候 , input 其实就是将 name:value 构建成一个键值
对 , 所以 input 标签中必须要用 name 属性 . 如果没有 name 属性 , 那么提交的表单将没
有这个字段
sql的写法 如果在join的基础上使用where , 很可能会产生歧义 , 必须使用table.filed方法 不过可以使用 join ( select * from … where ) on … 这样就将where放在了一张表里 , 就不会产生歧义了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 SELECT home_game_server.ogame_id, home_game_server.open_server_time, home_game_server.system_os, server_name FROM home_game_server JOIN ( SELECT home_game_server.ogame_id as ogame_id, max (open_server_time) as open_server_time, system_os FROM home_game_server JOIN home_game_group ON home_game_group.game_group_id = home_game_server.ogame_id AND home_game_group.status = true WHERE open_server_date >= CURRENT_DATE - 7 AND open_server_date <= CURRENT_DATE %s GROUP BY ogame_id, system_os ) t1 ON home_game_server.ogame_id = t1.ogame_id AND home_game_server.open_server_time = t1.open_server_time AND home_game_server.system_os = t1.system_os LIMIT %s OFFSET %s; """ % ( " AND system_os = '%s'" % os_type if os_type else "", page_size, (page - 1) * page_size )
python3使用use_kwargs 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from webargs import fields, missing, validatefrom webargs.flaskparser import use_kwargs@domain_router.route("/" , methods=["DELETE" ] ) @use_kwargs({"DomainName" : fields.Str( required=True , error_messages={"required" : "缺少域名" }, ),}, location="query" )def delete_domain (DomainName ): pass @domain_router.route("/" , methods=["GET" ] ) @use_kwargs({"cloud" : fields.Str(required=False , missing='' )}, location="query" ) def get_domains (cloud ): """domains列表""" print ('ccccccccc' , cloud) if cloud: domains = Domains.get_domain_list(cloud) else : domains = Domains.query.all () data = [domain.to_dict() for domain in domains] return JsonpResp().success(data)
python2使用use_kwargs 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from webargs.flaskparser import use_kwargsfrom webargs import fields, missing, validate@news_router.route("/" ) @use_kwargs({ "page" : fields.Integer(missing=1 ), "title" : fields.Str(missing="" ), } )def news_index (page, title ): """新闻公告列表""" query = XgNews.query if title: query = query.filter (XgNews.title.like('%' + title + '%' )) page_obj = SqlalchemyPages(query, page, 100 ) return render_template( "news/news_index.html" , page_obj=page_obj, title=title )
使用SqlalchemyPages 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 class BasePages (object ): def __init__ (self, Query, page, per_page ): self.page = page self.per_page = per_page self.Query = Query self.total_count = self._load_count() def _load_count (self ): raise NotImplementedError() @property def pages (self ): return int (ceil(self.total_count / float (self.per_page))) @property def has_prev (self ): return self.page > 1 @property def has_next (self ): return self.page < self.pages def iter_pages (self, left_edge=2 , left_current=2 , right_current=5 , right_edge=2 ): last = 0 for num in range (1 , self.pages + 1 ): if num <= left_edge or \ (num > self.page - left_current - 1 and \ num < self.page + right_current) or \ num > self.pages - right_edge: if last + 1 != num: yield None yield num last = num def list (self ): raise NotImplementedError() class SqlalchemyPages (BasePages ): def _load_count (self ): return self.Query.count() def list (self ): return self.Query.offset((self.page-1 )*self.per_page).limit(self.per_page).all ()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 def home_picture_unchecked (page, ogame_id ): """玩家晒图列表(未审核)""" query = HomeGamePicture.query.filter ( HomeGamePicture.status == False ) filters = [] if ogame_id: filters.append(HomeGamePicture.ogame_id == ogame_id) if filters: query = query.filter (*filters) page_obj = SqlalchemyPages(query, page, 100 ) return render_template( "home/home_strategy/home_picture_unchecked.html" , page_obj=page_obj)
1 {{ render_pagination(page_obj) }}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 {% macro render_pagination(pagination) %} <nav> <ul class="pagination"> {% if pagination.has_prev %} <li><a href="?page={{ pagination.page - 1 }}&{{ request.args|page_args }}">«</a></li> {% else %} <li class="disabled"><a href="#">«</a></li> {% endif %} {%- for page in pagination.iter_pages() %} {% if page %} {% if page != pagination.page %} <li class=""> <a href="?page={{ page }}&{{ request.args|page_args }}">{{ page }}</a> </li> {% else %} <li class="active"> <a href="?page={{ page }}&{{ request.args|page_args }}">{{ page }}</a> </li> {% endif %} {% else %} <li class="disabled"> <a>......</a> </li> {% endif %} {%- endfor %} {% if pagination.has_next %} <li><a href="?page={{ pagination.page + 1 }}&{{ request.args|page_args }}">»</a></li> {% else %} <li class="disabled"><a href="#">»</a></li> {% endif %} <li class="disabled"> <a style="color:black;">共{{ pagination.total_count }}条记录</a> </li> </ul> </nav> {% endmacro %}
1 2 3 4 from wtforms import TextAreaField, HiddenField, BooleanField, \ RadioField, PasswordField, SubmitField, TextField, \ DateField, IntegerField, SelectField, SelectMultipleField, \ FileField, DateTimeField, DecimalField, StringField, FloatField
FlaskForm.validate_on_submit()验证问题 : FlaskForm使用validate()函数的时候FlaskForm.validators属性可能会失效
通过阅读源码得知 : FlaskForm.validate_on_submit()后台调用Field的validate() , 将extra_validators和FlaskForm的validators属性合成一个chain , 再调用_run_validation_chain()进行验证 . 所以当我们复写validate()的时候需要在函数里调用父类的validate(),保证FlaskForm.validators属性有效