时间管理

1
2
3
4
5
6
7
8
9
import datetime
from collections import namedtuple

_Time = namedtuple("Time", ("timestamp", "date", "datetime"))

def get_all_time():
now = datetime.datetime.now()
timestamp = int(now.strftime("%s"))
return _Time(timestamp=timestamp, date=now.date(), datetime=now)

这样, 就可以通过

1
2
3
4
t = get_all_time()
t.timestamp
t.data
t.datetime

来获取

Redis实现分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import time

import redis

class RedisLock:
"""redis dist lock
@param name 锁名称
@param client redis连接
@param timeout 超过多少ms锁不释放,则自动过期
@param retries 重试次数
"""
def __init__(self, name, client, timeout=2000, retries=0):
assert retries >= 0, 'retries must >= 0'
self.key = "distributed-redis-lock:%s" % name
self.client = client
self.timeout = timeout
self.retries = retries
self.has_lock = False

def __enter__(self):
while True:
is_ok = self.client.set(self.key, 1, ex=self.timeout, nx=True)
if is_ok:
self.has_lock = True
return self
if self.retries == 0:
break
self.retries -= 1
time.sleep(0.1)
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.client.delete(self.key)


# 锁调用示例
redis_url = "redis://@127.0.0.1:6379/0"
client = redis.StrictRedis.from_url(redis_url)

uname = "test001" # 注册用户名
with RedisLock("reg:uname:%s" % uname, client) as lock:
if lock.has_lock:
# 注册逻辑
pass

对于不敏感的数据 , 我们可以使用js写入cookies ,然后读取

这样就不需要经过后端

Flask视图函数使用装饰器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def cookies_hander(func):
@functools.wraps(func)
def inner(*args, **kwargs):
res = func(*args, **kwargs)

if not isinstance(res,werkzeug.wrappers.response.Response):
res = Response(res)

if not request.cookies.get('collapse'):
res.set_cookie('collapse', 'True')
return res
return inner

@index_router.route('/login', methods=["GET", "POST", ])
@cookies_hander
def login():
"""
登录页
"""
form = UserOauthForm()
if form.validate_on_submit():
user = UserOauth.get_user(form.data.get('eid'))
# 登录用户
if form.data.get('remember_me'):
login_user(user, remember=True)
else:
login_user(user)
flash(f"欢迎登陆: 工号{user.id}", category="success")
return redirect(request.args.get('next') or url_for("emp.emp_detail"))
return render_template('user/login.html', form=form)

LocalStorage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
export function handleLocalStorage(method, key, value) {
switch (method) {
case 'get' : {
let temp = window.localStorage.getItem(key);
if (temp) {
return temp
} else {
return false
}
}
case 'set' : {
window.localStorage.setItem(key, value);
break
}
case 'remove': {
window.localStorage.removeItem(key);
break
}
default : {
return false
}
}
}
1
2
3
4
5
6
7
8
// 存储
handleLocalStorage('set', 'userName', 'Tom');

// 获取
handleLocalStorage('get', 'userName');

// 删除
handleLocalStorage('remove', 'userName');

localStorage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<script>
var if_collapse = localStorage.getItem('if_collapse');
if (!if_collapse || if_collapse === "0") {
$(`body`).attr(`class`, `sidebar-mini sidebar-open`)
} else {
$(`body`).attr(`class`, `sidebar-mini sidebar-collapse`)
}
$(`#sidebar-collapse-ctrl`).click(function () {
if (!if_collapse || if_collapse === "0") {
localStorage.setItem("if_collapse", "1");
} else {
localStorage.setItem("if_collapse", "0");
}
})
</script>

要多多使用or来代替If

1
2
3
4
5
6
7
8
<script>
var collapse = localStoIfrage.getItem("collapse") || "sidebar-collapse";
$(`body`).attr(`class`, `sidebar-mini ${collapse}`);
$(`.nav-link[data-widget="pushmenu"]`).click(function () {
var t = collapse === "sidebar-collapse" ? "sidebar-open":"sidebar-collapse";
localStorage.setItem("collapse", t);
})
</script>

后台验证

1
2
3
4
5
6
7
8
9
10
11
12
def func(site_id,login_username,login_password)
req = {
'site_id': site_id,
'login_username': login_username,
'login_password': login_password,
'time': int(time.time())
}
req['sign'] = generate_signature(req)
res = requests.post(url, json=req)
if res.status_code != 200 or res.json()['state'] != 1:
return None
return res.json()['data']
1
2
3
4
5
6
7
8
9
10
11
12
13
14
def generate_signature(req):
'''生成签名

Args:
req (dict): 包含请求参数的字典

Returns:
str: 使用请求参数生成的签名
'''
req_str = '&'.join(['{}={}'.format(k, v) for k, v in sorted(req.items())])
secret_key = current_app.config['ZK_LOGIN_SECRET_KEY']
unsign_str = req_str + secret_key
sign = md5(unsign_str.encode()).hexdigest()
return sign
  • 传递的参数包含一个sign
  • sign是由site_id,login_username,login_password,time和screct_key组成

对于Flask_login , 如果PK不是ID ,需要重写get_id

1
2
3
def get_id(self):
'''重写了UserMixin的get_id()'''
return str(self.uid)

get_id()

  • 返回一个能唯一识别用户的,并能用于从 user_loader 回调中加载用户的 unicode 。
  • 注意着 必须 是一个 unicode , 如果 ID 原本是 一个 int 或其它类型,你需要把它转换为 unicode 。

Flask-login内部逻辑

首次登陆

img

Flask-Login在登录过程中主要负责:

  • 将用户对象存入request context中
  • 将用户ID,Session ID等信息存入Session中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def login_user(user, remember=False, force=False, fresh=True):
if not force and not user.is_active:
return False

user_id = getattr(user, current_app.login_manager.id_attribute)()
session['user_id'] = user_id
session['_fresh'] = fresh
session['_id'] = current_app.login_manager._session_identifier_generator()

if remember:
session['remember'] = 'set'

_request_ctx_stack.top.user = user
user_logged_in.send(current_app._get_current_object(), user=_get_user())
return True
  • getattr(user, current_app.login_manager.id_attribute)() 这里login_manager.id_attribute是一个字符串'get_id'。因此这句的意思是获取User对象的get_id method,然后执行,从而获取到用户的ID

  • 通过session['user_id'] = user_id来将用户的ID存储进Session当中,后面紧跟着将fresh信息,session id信息,remember信息存储进session。

  • _request_ctx_stack.top.user = user这里是将user对象存储进当前的request context中,_request_ctx_stack是一个LocalStack对象,top属性指向的就是当前的request context

  • user_logged_in.send(current_app._get_current_object(), user=_get_user()) 此句中user_logged_in是Flask-Login定义的signal,此处通过send来发射此signal,当注册监听此signal的回调函数收到此signal之后就会执行函数。

    这里send有两个参数,

    • 第一个参数是sender对象,此处通过current_app._get_current_object()来获取当前的app对象,即此signal的sender设为当前的应用;
    • 第二个参数是该signal携带的数据,此处将user对象做为signal的数据传递给相应的回调函数。

非首次登陆

img

在这个流程图中,Flask-Login主要起如下作用:

  1. 从session中获取用户ID
  2. 当用户的请求访问的是受登录保护的路由时,就要通过用户ID重新load user(重新写入session值,将user重新写入请求上下文),如果load user失败则进入鉴权失败处理流程,如果成功,则允许正常处理请求

@login_required装饰器是怎么做到保护路由函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# flask_login/utils.py
def login_required(func):
@wraps(func)
def decorated_view(*args, **kwargs):
# 如果request method为例外method,即在EXEMPT_METHODS中的method,可以不必鉴权
if request.method in EXEMPT_METHODS:
return func(*args, **kwargs)

# 如果_login_disabled为True则不必鉴权
elif current_app.login_manager._login_disabled:
return func(*args, **kwargs)

# 正常鉴权
elif not current_user.is_authenticated:
return current_app.login_manager.unauthorized()
return func(*args, **kwargs)
return decorated_view

默认情况下只有OPTIONS method在EXEMPT_METHODS set中,而GET、PUT、POST等常见的methods都需要鉴权

_login_disabled默认为False

正常鉴权的关键在于current_user.is_authenticated是否为True,为True则正常处理请求,为False则进入unauthorized处理流程。

这个current_user到底怎么就能鉴权了?它是怎么来的呢?来看下定义:

1
2
# flask_login/utils.py
current_user = LocalProxy(lambda: _get_user())
  • current_user是一个LocalProxy对象,其代理的对象需要通过_get_user()来获取,简单来说_get_user()会返回两种用户,一种是正常的用户对象(鉴权成功),一种是anonymous用户对象(鉴权失败)。而正常的用户对象其is_authenticated属性总是为True,相对的anonymous用户对象的is_authenticated属性总是为False

LocalProxy对象每次操作都会重新获取代理的对象从而实现动态更新

而要实现动态更新的关键就在于_get_user函数,接下来我们看下_get_user函数是如何获取user对象的:

1
2
3
4
5
6
# flask_login/utils.py
def _get_user():
if has_request_context() and not hasattr(_request_ctx_stack.top, 'user'):
current_app.login_manager._load_user()

return getattr(_request_ctx_stack.top, 'user', None)

在之前的首次登陆那小节中,我们已经知道用户鉴权成功后,会将User对象保存在当前的request context当中,这时我们调用_get_user函数时就会直接从request context中获取user对象return getattr(_request_ctx_stack.top, 'user', None)
但如果是非首次登陆,当前request context中并没有保存user对象,就需要调用current_app.login_manager._load_user()来去load user对象

接下来再看看如何去load:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# flask_login/login_manager.py
def _load_user(self):
'''Loads user from session or remember_me cookie as applicable'''
user_accessed.send(current_app._get_current_object())

# first check SESSION_PROTECTION
config = current_app.config
if config.get('SESSION_PROTECTION', self.session_protection):
deleted = self._session_protection()
if deleted:
return self.reload_user()

# If a remember cookie is set, and the session is not, move the
# cookie user ID to the session.
#
# However, the session may have been set if the user has been
# logged out on this request, 'remember' would be set to clear,
# so we should check for that and not restore the session.
is_missing_user_id = 'user_id' not in session
if is_missing_user_id:
cookie_name = config.get('REMEMBER_COOKIE_NAME', COOKIE_NAME)
header_name = config.get('AUTH_HEADER_NAME', AUTH_HEADER_NAME)
has_cookie = (cookie_name in request.cookies and
session.get('remember') != 'clear')
if has_cookie:
return self._load_from_cookie(request.cookies[cookie_name])
elif self.request_callback:
return self._load_from_request(request)
elif header_name in request.headers:
return self._load_from_header(request.headers[header_name])

return self.reload_user()

_load_user大体的过程是

  1. 首先检查SESSION_PROTECTION设置,如果SESSION_PROTECTION 为strong或者basic类型,那么就会执行_session_protection()动作,否则不执行此操作。
  2. _session_protection在session_id不一致的时候(比如IP变化会导致session id的变化)才真正有用,这时,如果为basic类型或者session permanent为True时,只标注session为非新鲜的(not fresh);而如果为strong,则会删除session中的用户信息,并重新load user,即调用reload_user
  3. 接下来的代码是说当session中没有用户信息时(这里通过是否能获取到user_id来判断),如果有则直接reload_user,如果没有,则有三种方式来load user,一种是通过remember cookie,一种通过request,一种是通过request header,依次尝试。

remember cookie是指,当用户勾选’remember me’复选框时,Flask-Login会将用户信息放入到指定的cookie当中,同样也是加密的。这就是为什么当session中没有携带用户信息时,我们可以通过remember cookie来获取用户的信息

reload_user是如何获取用户的呢,来看下源代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# flask_login/login_manager.py
def reload_user(self, user=None):
ctx = _request_ctx_stack.top

if user is None:
user_id = session.get('user_id')
if user_id is None:
# 当无法获取到有效的用户id时,就认为是anonymous user
ctx.user = self.anonymous_user()
else:
# user callback就是我们通过@login_manager.user_loader装饰的函数,用于获取user object
if self.user_callback is None:
raise Exception(
"No user_loader has been installed for this "
"LoginManager. Add one with the "
"'LoginManager.user_loader' decorator.")
user = self.user_callback(user_id)
if user is None:
ctx.user = self.anonymous_user()
else:
ctx.user = user
else:
ctx.user = user
  1. 首先获取user id,如果获取不到有效的id,就将user设为anonymous user
  2. 获取到id后,再通过@login_manager.user_loader装饰的函数获取到user对象,如果没有获取到有效的user对象,就认为是anonymous user
  3. 最后将user保存于request context中(无论是正常的用户还是anonymous用户)

https://www.jianshu.com/p/5ba6a956d504

  1. load_user(),被reload_user()调用,将load_user返回的Model实例写入上下文的user属性
  2. reload_user(),每次请求被保护的视图函数时都会被调用,更新请求上下文的user属性 , 和session
  3. current_user : 从请求上下文获取user属性
  4. login_required : 判断current_user.is_authenticated,是否返回current_app.login_manager.unauthorized()

Debug方法

  • 就像要请求别人一样,跟自己讲述自己的操作 , 然后审视这些操作真的没有错误吗
  • 别人发出的提问 ,说自己的错误操作 ,不要总想着反驳 , 审视自己真的没有这么做吗

flask的自增列千万不能自己添入,必须交给sql自己自增填入

flask绝对不能自己手动创建表

必须使用create_all()创建

flask的Form中参数不要使用default

1
2
3
4
5
6
status = SelectField(
'状态',
coerce=int,
choices=[],
#default=1 不要使用default
)

他会自动按照从零到大排序

代码复用的方法

  • 装饰器
  • 钩子函数
  • 封装函数
  • 创建父类

api文档

  • 接口功能
  • 接口地址
  • 接口所需参数
    • 必须参数 ,
    • 非必须参数
    • 通用参数
  • 接口返回数据
    • 数据字段
    • 状态码
    • 错误码

python修改环境变量

  • 若没有特别设定,环境变量继承自父进程
  • 因此,你在 python 里面修改了环境变量,只能影响自身,及由它创建的子进程(若没有显式设定)。
  • 要影响当前登录用户下的所有进程,你得从 “系统设置” - “高级” - “环境变量” 中设置,并重新登录(或重启)。

pytest以类的形式编写测试用例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# pytest以类形式的测试用例
class TestAPI:
@classmethod
def setup_class(cls):
print('\nsetup_class()')

@classmethod
def teardown_class(cls):
print('teardown_class()')

def setup_method(self, method):
print('\nsetup_method()')

def teardown_method(self, method):
print('\nteardown_method()')

def test_1(self):
print('- test_1()')

def test_2(self):
print('- test_2()')

Flask使用redis作为缓存装饰器

1
2
3
4
5
from flask_redis import FlaskRedis
cache_client = FlaskRedis(config_prefix="cache")
REDIS_HOST = os.environ.get('REDIS_HOST', '127.0.0.1')

cache_client.init(app)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import json
import pickle

from oa_app.extensions import cache_client


def cache_table_data(cache_time=60):
def decorator(func):
def warpper(cls, *args, **kwargs):
""" redis 缓存装饰器 """
args_key = ".".join([str(k) for k in args])
redis_key = "table:{}:{}:{}-{}".format(
cls.__name__,
func.__name__,
args_key,
json.dumps(kwargs)
)
# 获取缓存
cache_data = cache_client.get(redis_key)
if cache_data:
data = pickle.loads(cache_data)
else:
data = func(cls, *args, **kwargs)
cache_data = pickle.dumps(data)
cache_client.set(redis_key, cache_data, cache_time)
return data
return warpper
return decorator
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class CRUDMixin(object):
__table_args__ = {'extend_existing': True}

id = db.Column(db.Integer, autoincrement=True, primary_key=True)

@classmethod
@cache_table_data()
def get_by_id(cls, id):
if any((isinstance(id, str) and id.isdigit(),
isinstance(id, (int, float))), ):
return cls.query.get(int(id))
return None

@classmethod
@cache_table_data()
def get_all(cls):
return cls.query.all()

@classmethod
def get_with_condition(cls, condition):
return cls.query.filter(condition).all()

@classmethod
def create(cls, **kwargs):
instance = cls(**kwargs)
return instance.save()

def update(self, commit=True, **kwargs):
for attr, value in kwargs.items():
setattr(self, attr, value)
return commit and self.save() or self

def save(self, commit=True):
if commit:
db.session.add(self)
db.session.commit()
return self

def delete(self, commit=True):
if commit:
db.session.delete(self)
return db.session.commit()

def rollback(self):
db.session.rollback(self)

def dump(self, filter=[]):
d = {}
for c in self.__table__.columns:
value = getattr(self, c.name, None)
if c.name in filter:
continue
elif isinstance(value, datetime.date):
d[c.name] = value.strftime('%Y-%m-%d')
elif isinstance(value, datetime.datetime):
d[c.name] = value.strftime('%Y-%m-%d %H:%M:%S')
else:
d[c.name] = value
return d

切换redis的时候报UnicodeDecodeError 错误

将原生的redis库改为使用flask_redis插件时 报UnicodeDecodeError . 此时重启即可

  • 有可能是redis中存在相关持久化文件,记住了前面的任务和配置信息,使得redis在进行get和set的时候出现混乱,造成错误
  • 同理使用celery的时候也可能与此错误

如果还是不行 , 那就将

1
cache_client = FlaskRedis(config_prefix="cache", decode_responses=True)

改为

1
cache_client = FlaskRedis(config_prefix="cache")

解决updtae时的问题

  1. 工号eid必须保持唯一(但是可以修改 , 也就是说必能和别人一样)
  2. update的时候 , 要想修改eid,就必须检测emp = Employee.query.filter(Employee.eid == eid).filter(Employee.id != id).first()不存在 ,
  3. 但是form里拿不到view里中的id , 这时就可以使用g
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# view.py
@emp_router.route('/editEmp/<int:id>', methods=["GET", "POST", ])
@login_required
def emp_edit(id):
"""
员工编辑页
"""
g.id = id
emp = Employee.get_by_id(id)
form = EmployeeEditForm(request.form, obj=emp)

if form.validate_on_submit():
emp.update(id=id, **form.dump())
flash("修改成功", category="success")
return redirect(url_for("emp.emp_detail"))
return render_template('emp/emp_edit.html', form=form, id=id)
1
2
3
4
5
6
7
8
9
10
# form.py
class EmployeeEditForm(EmployeeForm, PhoneValidateMixin):
submit = SubmitField('修改')

def validate_eid(self, field):
id = g.id
eid = field.data
emp = Employee.query.filter(Employee.eid == eid).filter(Employee.id != id).first()
if emp:
raise ValidationError('工号已存在')

当if想着要嵌套的时候可以考虑例外

考虑例外 , 直接让他return , 其他的继续运行

Serialize的优秀实践

1
2
3
4
5
6
7
8
9
10
11
12
13
class Serialize(object):
__SERIAL_INCLUDE__ = ()

def serialize(self):
result = {}
for k in self.__SERIAL_INCLUDE__:
if hasattr(self, k):
result[k] = getattr(self, k)
return result

def set_property(self, **kwargs):
for k, v in kwargs.items():
setattr(self, k, v)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class UserUser(db.Model, Serialize):
__tablename__ = "user_user"
__bind_key__ = "userdb"
__SERIAL_INCLUDE__ = ("user_id", "game_id", "package_id",
"chl_id", "raw_user_id",)
query_class = UserUserQuery

user_id = db.Column(db.String(60), primary_key=True)

game_id = db.Column(db.SmallInteger, nullable=False)
package_id = db.Column(db.Integer, nullable=False)
chl_id = db.Column(db.Integer, nullable=False)

chl_user_id = db.Column(db.String(50), nullable=False)
reg_time = db.Column(db.Integer, nullable=False)
reg_date = db.Column(db.Date, nullable=False)

@classmethod
def update_or_add_user(cls, data):
insert_stmt = insert(cls).values(**data)
do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=[cls.user_id])
db.session.execute(do_nothing_stmt)
db.session.commit()

Mixin类的常用设计总结

所以 , 我们可以总结Mixin类的常用设计

  1. Mixin类不能用__init__ , 配置写成类属性 . 如__SERIAL_INCLUDE__
  2. 方法可以是类方法,也可以是实例方法
  3. 继承时,覆盖原先的类属性__SERIAL_INCLUDE__

Sqlalchemy的remote_side

remote_side : 表中的外键引用的是自身时,如Node类,如果想表示多对一的关系,那么就可以使用remote_side

1
2
3
4
5
6
class Node(Base):
__tablename__ = 'node'
id = Column(Integer, primary_key=True)
parent_id = Column(Integer, ForeignKey('node.id'))
data = Column(String(50))
parent = relationship("Node", remote_side=[id])

如果是想建立一种双向的关系,那么还是结合backref:

1
2
3
4
5
6
7
8
9
class Node(Base):
__tablename__ = 'node'
id = Column(Integer, primary_key=True)
parent_id = Column(Integer, ForeignKey('node.id'))
data = Column(String(50))
children = relationship(
"Node",
backref=backref('parent', remote_side=[id])
)

多级部门

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class Department(db.Model):
__tablename__ = "oa_department"

dept_id = db.Column(db.Integer, autoincrement=True, primary_key=True, comment="部门id")
dept_name = db.Column(db.VARCHAR(20), nullable=False, comment="部门名称")
dept_pid = db.Column(db.Integer, db.ForeignKey(dept_id, ondelete="SET NULL"), comment="上级部门id")
children = db.relationship("Department", backref=db.backref("parent", remote_side=dept_id))

@classmethod
def hier_dept_down(cls):
"""部门取最后一层级,用于员工所属部门。"""
d = {"dept_list": []}
_pids = cls.query.with_entities(cls.dept_pid).all()
pids = set([pid[0] for pid in _pids])
pids.remove(None)
depts = cls.query.filter(~cls.dept_id.in_(pids)).all()
for dept in depts:
d["dept_list"].append(dept.to_dict)
return d

上面的函数看着这张图想

1
2
3
4
5
6
 dept_id | dept_name | dept_pid
---------+-----------+----------
1 | 技术部 |
2 | 市场部 |
3 | 市场一部 | 2
4 | 人事部 |

db.create_all()是建表 , 不是建库

所以我们必须要有一个库才能执行成功

命令行登录psql

1
psql -h 172.16.35.179 -U username -d dbname
  • username为数据库用户名,
  • dbname为要连接的数据库名
1
psql -U postgres

pytest匹配部分函数

运行所有名字中含有的answer1的方法

1
pytest -k answer1
  • py.test -k method -v : 将会运行所有的方法(四个)
  • py.test -k methods -v : 不会运行任何的方法

运行同一个marked的用例:

1
py.test -m case1
1
2
3
4
5
6
7
8
9
10
11
12
import pytest

def func(x):
return x + 1

@pytest.mark.case1
def test_file2_answer1():
assert func(4) == 5

@pytest.mark.case1
def test_file2_answer2():
assert func(6) == 5

pytest常用命令

命令行 解释
pytest –fixtures 显示可用的内置函数
pytest –lf 运行上一次运行失败的用例
pytest -x | –exitfirst 第一次失败后停止
pytest –maxfail=2 第二(n)次失败后停止
pytest test_mod.py 运行单个文件中的用例
pytest testing/ 运行文件夹下的用例
pytest test_mod.py::test_func 运行模块内特指的方法
pytest test_mod.py::TestClass::test_method 运行模块下类内特指的方法
pytest -m slow 运行所有被装饰器标记@pytest.mark.slow的用例
pytest -v pytest1.py -v 用于显示每个测试函数的执行结果
pytest -s pytest1.py -s 用于显示测试函数中print()函数输出

正则获取表格的最佳实践

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
x='''UID        PID  PPID  C STIME TTY          TIME CMD
root 2 0 0 Nov30 ? 00:00:00 [kthreadd]
root 3 2 0 Nov30 ? 00:00:14 [ksoftirqd/0]
root 5 2 0 Nov30 ? 00:00:00 [kworker/0:0H]
root 7 2 0 Nov30 ? 00:00:00 [migration/0]
root 8 2 0 Nov30 ? 00:00:00 [rcu_bh]
root 9 2 0 Nov30 ? 00:00:57 [rcu_sched]'''


import re

for line in re.split(r'\n',x):
res = re.split(r'\s\s+',line)
print(res)
# ['UID', 'PID', 'PPID', 'C STIME TTY', 'TIME CMD']
# ['root', '2', '0', '0 Nov30 ?', '00:00:00 [kthreadd]']
# ['root', '3', '2', '0 Nov30 ?', '00:00:14 [ksoftirqd/0]']
# ['root', '5', '2', '0 Nov30 ?', '00:00:00 [kworker/0:0H]']
# ['root', '7', '2', '0 Nov30 ?', '00:00:00 [migration/0]']
# ['root', '8', '2', '0 Nov30 ?', '00:00:00 [rcu_bh]']
# ['root', '9', '2', '0 Nov30 ?', '00:00:57 [rcu_sched]']
  1. 先使用\n分行
  2. 再使用\s\s+分列

re.findall

注意 : re.findall本质就是非贪婪匹配

match 和 search 方法都是一次匹配,只要找到了一个匹配的结果就返回

boostrap响应click事件

1
2
$('#dataTable tbody').on('click', 'button[action="UpdateUser"]', function () {
$('#update-user').modal('show');

其实不需要使用debugtool , 直接在network里查看返回值

注意jquery的trigger函数 , 直接触发某个事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<body>
<input type="text" id="username" />
<input type="button" value="触发text的focus" id="btn1" />
</body>

<script src="https://cdn.bootcss.com/jquery/3.4.1/jquery.js"></script>
<script>
$(function() {
$("#username").focus(function() {
console.log("text focus 被触发了")
})

$("#bn1").click(function() {
console.log('1111');
// 触发text的focus
$("#username").trigger("focus");
})
})
</script>

trigger : 根据绑定到匹配元素的给定的事件类型执行所有的处理程序和行为。

  • trigger : 触发事件 , 但是会执行类似浏览将光标移到输入框内的这种浏览器默认行为
  • triggerHandler : 仅仅只会触发事件所对应的函数

trigger 经常用于处理相同逻辑的时候 :
例如监控密码框必须长度大于6 , 监控的时候有键入 , 失去焦点 , 那么我们只需要做一个逻辑给失去焦点 , 对键入的判断逻辑 我们只要使用trigger就好了

在前端自己写一个flash

1
2
3
4
5
6
7
function flash(msg, category) {
var alert = '<div class="alert alert-' + category + ' alert-dismissible" role="alert">' +
'<button class="close" aria-label="Close" type="button" data-dismiss="alert"><span aria-hidden="true">×</span></button>' +
msg +
'</div>';
$("section.content").prepend(alert);
}

模态框的显示与消失

1
2
$("#ban-modal").modal();
$("#ban-modal").modal("hide");

对于像前端不能设置传递变量的问题

我们可以将两个事件(如click事件)设置成函数 , 然后只要trigger并且传值就可以了

对于wtform

我们可以使用form.errors获取全部的验证错误

1
2
3
4
5
6
7
8
9
10
11
12
13
@emp_router.route('/addEmp/', methods=["GET", "POST", ])
@login_required
def emp_add():
"""
员工添加页
"""
form = EmployeeAddForm(request.form)
if form.validate_on_submit():
Employee.create(**form.dump())
flash("添加成功", category="success")
return redirect(url_for("emp.emp_detail"))
print(form.errors)
return render_template('emp/emp_add.html', form=form)

提示框的简单实用

1
2
3
4
5
6
7
<script src="{{ url_for('static', filename='js/toastr.min.js') }}"></script>
<link href="{{ url_for('static', filename='css/toastr.min.css') }}" rel="stylesheet"/>

{# 初始化信息提示框 #}
<script type="text/javascript">
toastr.options.positionClass = 'toast-top-right';
</script>

触发

1
2
toastr.success('提交数据成功');
toastr.error('提交数据成功');

使用boostrap的图标

1
2
3
4
5
6
7
8
<a href="#" class="btn btn-info btn-lg">
<span class="glyphicon glyphicon-asterisk"></span> Asterisk
</a>


<button type="button" class="btn btn-default btn-sm">
<span class="glyphicon glyphicon-asterisk"></span> Asterisk
</button>

marshmallow验证

1
2
3
4
5
6
7
8
9
10
from marshmallow import fields, Schema, validates, ValidationError
class ItemSchema(Schema):
quantity = fields.Integer()

@validates('quantity')
def validate_quantity(self, value):
if value < 0:
raise ValidationError('Quantity must be greater than 0.')
if value > 30:
raise ValidationError('Quantity must not be greater than 30.')

如果将strict=True传入Schema构造器或者classMeta参数里,则仅会在传入无效数据是报错。可以使用ValidationError.messages变量来获取验证错误的dictionary

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class UserSchema(Schema):
name = fields.Str()
email = fields.Email()
created_at = fields.DateTime()

@post_load
def make_user(self, data):
return User(**data)

class User:
def __init__(name, email, created_at):
self.name = name
self.email = email
self.created_at = created_at
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# coding=utf-8

from flask import Flask
from webargs import fields
from webargs.flaskparser import parser

app = Flask(__name__)


@app.route('/')
def index():
argmap = {"age": fields.Int(), "years_employed": fields.Int()}
result = parser.parse(
argmap,
# 配置全局验证,返回 False 或抛出 ValidationError 异常表示验证失败
validate=lambda args: args["years_employed"] < args["age"]
)
print(result)
return ''


if __name__ == "__main__":
# app.run()
with app.test_client() as client:
response = client.get('/?age=1&years_employed=2')
print(response) # <Response streamed [422 UNPROCESSABLE ENTITY]>
assert response.status_code == 422

response2 = client.get('/?age=2&years_employed=1')
print(response2) # <Response streamed [200 OK]>
assert response2.status_code == 200

序列化与反序列化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class ReportSchema(ma.Schema):
'''Report数据Schema'''
page = fields.Integer(allow_none=True, missing=1)
pn = fields.Integer(allow_none=True, missing=10)
filtertime = fields.String(allow_none=True)
emp = fields.String(allow_none=True)
app = fields.String(allow_none=True)

begin_time = fields.Method('get_begin_time',deserialize='load_begin_time')

def get_begin_time(self,obj):
return obj.filtertime

def load_begin_time(self,obj):
return obj.value
  • get_begin_time功能就是 , dump到外面的时候多生成一个begin_time字段
  • load_begin_time功能就是 , 从外面load进来的时候就begin_time字段的进行修改(也就是说, 外面传进来的时候就有了begin_time这个字段 , 我们只是修改它的值而已)

falsk使用join

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
query = db.session.query(
ReportPersonConsumeTsf, ReportSummaryConsumeTsf,
ReportProfitDaily, ReportSummaryProfit
).filter(
ReportProfitDaily.report_date == ReportPersonConsumeTsf.report_date
).filter(
ReportProfitDaily.report_date == ReportSummaryConsumeTsf.report_date
).filter(
ReportProfitDaily.report_date == ReportSummaryProfit.report_date
).filter(
ReportProfitDaily.app_id == ReportPersonConsumeTsf.app_id
).filter(
ReportProfitDaily.app_id == ReportSummaryConsumeTsf.app_id
).filter(
ReportProfitDaily.app_id == ReportSummaryProfit.app_id
).order_by(ReportPersonConsumeTsf.report_date.desc())
1
2
3
db.session.query(ReportProfitDaily, ReportSummaryConsumeTsf).join(
ReportSummaryConsumeTsf,ReportSummaryConsumeTsf.app_id==ReportProfitDaily.app_id,
).all()
1
2
3
4
5
6
7
x = db.session.query(ReportProfitDaily, ReportSummaryConsumeTsf).join(
ReportSummaryConsumeTsf,
and_(
ReportSummaryConsumeTsf.app_id == ReportProfitDaily.app_id,
ReportSummaryConsumeTsf.report_date == ReportProfitDaily.report_date
)
).all()

group_by 和 join一起使用

1
2
3
select * fromselect sum(qty) from a1 group by a1.name) as A
left Join (select sum(qty) from b1 group by b1.name) as B
on A.id=B.id

flask-query

1
2
3
4
5
6
7
8
9
10
query = (session
.query(User)
.filter(User.username == "asd")
.filter_by(username="asd")
#上面两个都是添加where
.join(Addreess)#使用ForeignKey
.join(Addreess,Addreess.user_id==User.id)#使用显式声明
.limit(10)
.offset(0)
)

常用WTF

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
{% macro form_errors(form, hiddens=True) %}
{%- if form.errors %}
{%- for fieldname, errors in form.errors.items() %}
{%- if is_hidden_wtf_field(form[fieldname]) and hiddens or
not is_hidden_wtf_field(form[fieldname]) and hiddens != 'only' %}
{%- for error in errors %}
<div class="invalid-feedback">{{ error }}</div>
{%- endfor %}
{%- endif %}
{%- endfor %}
{%- endif %}
{%- endmacro %}

{% macro _hz_form_wrap(horizontal_columns, form_type, add_group=False, required=False) %}
{% if form_type == "horizontal" %}
{% if add_group %}
<div class="form-group row{% if required %} required{% endif %}">{% endif %}
<div class="offset-{{ horizontal_columns[0] }}-{{ horizontal_columns[1] }}
col-{{ horizontal_columns[0] }}-{{ horizontal_columns[2] }}
">
{% endif %}
{{ caller() }}

{% if form_type == "horizontal" %}
{% if add_group %}</div>{% endif %}
</div>
{% endif %}
{% endmacro %}

{% macro form_field(field,
form_type="basic",
horizontal_columns=('lg', 2, 10),
button_map={}) %}

{# this is a workaround hack for the more straightforward-code of just passing required=required parameter. older versions of wtforms do not have
the necessary fix for required=False attributes, but will also not set the required flag in the first place. we skirt the issue using the code below #}
{% if field.flags.required and not required in kwargs %}
{% set kwargs = dict(required=True, **kwargs) %}
{% endif %}

{% if field.widget.input_type == 'checkbox' %}
{% call _hz_form_wrap(horizontal_columns, form_type, True, required=required) %}
<div class="form-check{% if form_type == "inline" %} form-check-inline{% endif %}">
<label class="form-check-label">
{{ field(class_="form-check-input")|safe }} {{ field.label.text|safe }}
</label>
</div>
{% endcall %}
{%- elif field.type == 'RadioField' -%}
{# note: A cleaner solution would be rendering depending on the widget,
this is just a hack for now, until I can think of something better #}
{% call _hz_form_wrap(horizontal_columns, form_type, True, required=required) %}
{% for item in field -%}
<div class="form-check{% if form_type == "inline" %} form-check-inline{% endif %}">
<label class="form-check-label">
{{ item(class_="form-check-input")|safe }} {{ item.label.text|safe }}
</label>
</div>
{% endfor %}
{% endcall %}
{%- elif field.type == 'SubmitField' -%}
{# deal with jinja scoping issues? #}
{% set field_kwargs = kwargs %}

{# note: same issue as above - should check widget, not field type #}
{% call _hz_form_wrap(horizontal_columns, form_type, True, required=required) %}
{{ field(class='btn btn-%s' % button_map.get(field.name, 'primary'),
**field_kwargs) }}
{% endcall %}
{%- elif field.type == 'FormField' -%}
{# note: FormFields are tricky to get right and complex setups requiring
these are probably beyond the scope of what this macro tries to do.
the code below ensures that things don't break horribly if we run into
one, but does not try too hard to get things pretty. #}
<fieldset>
<legend>{{ field.label }}</legend>
{%- for subfield in field %}
{% if not is_hidden_wtf_field(subfield) -%}
{{ form_field(subfield,
form_type=form_type,
horizontal_columns=horizontal_columns,
button_map=button_map) }}
{%- endif %}
{%- endfor %}
</fieldset>
{% else -%}
{# Note: Bootstrap 4 no longer supports has-danger. Flask-Bootstrap4 has an issue. #}
<div class="form-group {%- if form_type == "horizontal" %} row{% endif -%}
{%- if field.flags.required %} required{% endif -%}
">
{%- if form_type == "inline" %}
{{ field.label(class="sr-only")|safe }}
{% if field.type == 'FileField' %}
{{ field(class="form-control-file", **kwargs)|safe }}
{% else %}
{{ field(class="form-control mb-2 mr-sm-2 mb-sm-0" + (" is-invalid" if field.errors else ""), **kwargs)|safe }}
{% endif %}
{% elif form_type == "horizontal" %}
{{ field.label(class="form-control-label " + (
" col-%s-%s" % horizontal_columns[0:2]
))|safe }}
<div class=" col-{{ horizontal_columns[0] }}-{{ horizontal_columns[2] }}">
{% if field.type == 'FileField' %}
{{ field(class="form-control-file", **kwargs)|safe }}
{% else %}
{{ field(class="form-control" + (" is-invalid" if field.errors else ""), **kwargs)|safe }}
{% endif %}
</div>
{%- if field.errors %}
{%- for error in field.errors %}
{% call _hz_form_wrap(horizontal_columns, form_type, required=required) %}
<div class="invalid-feedback">{{ error }}</div>
{% endcall %}
{%- endfor %}
{%- elif field.description -%}
{% call _hz_form_wrap(horizontal_columns, form_type, required=required) %}
<small class="form-text text-muted">{{ field.description|safe }}</small>
{% endcall %}
{%- endif %}
{%- else -%}
{{ field.label(class="form-control-label")|safe }}
{% if field.type == 'FileField' %}
{{ field(class="form-control-file", **kwargs)|safe }}
{% else %}
{{ field(class="form-control" + (" is-invalid" if field.errors else ""), **kwargs)|safe }}
{% endif %}

{%- if field.errors %}
{%- for error in field.errors %}
<div class="invalid-feedback">{{ error }}</div>
{%- endfor %}
{%- elif field.description -%}
<small class="form-text text-muted">{{ field.description|safe }}</small>
{%- endif %}
{%- endif %}
</div>
{% endif %}
{% endmacro %}

{# valid form types are "basic", "inline" and "horizontal" #}
{% macro quick_form(form,
action="",
method="post",
extra_classes=None,
role="form",
form_type="basic",
horizontal_columns=('lg', 2, 10),
enctype=None,
button_map={},
id="",
novalidate=False,
render_kw={}) %}
{#-
action="" is what we want, from http://www.ietf.org/rfc/rfc2396.txt:

4.2. Same-document References

A URI reference that does not contain a URI is a reference to the
current document. In other words, an empty URI reference within a
document is interpreted as a reference to the start of that document,
and a reference containing only a fragment identifier is a reference
to the identified fragment of that document. Traversal of such a
reference should not result in an additional retrieval action.
However, if the URI reference occurs in a context that is always
intended to result in a new request, as in the case of HTML's FORM
element, then an empty URI reference represents the base URI of the
current document and should be replaced by that URI when transformed
into a request.

-#}
{#- if any file fields are inside the form and enctype is automatic, adjust
if file fields are found. could really use the equalto test of jinja2
here, but latter is not available until 2.8

warning: the code below is guaranteed to make you cry =(
#}
{%- set _enctype = [] %}
{%- if enctype is none -%}
{%- for field in form %}
{%- if field.type == 'FileField' %}
{#- for loops come with a fairly watertight scope, so this list-hack is
used to be able to set values outside of it #}
{%- set _ = _enctype.append('multipart/form-data') -%}
{%- endif %}
{%- endfor %}
{%- else %}
{% set _ = _enctype.append(enctype) %}
{%- endif %}
<form
{%- if action != None %} action="{{ action }}"{% endif -%}
{%- if id %} id="{{ id }}"{% endif -%}
{%- if method %} method="{{ method }}"{% endif %}
class="form
{%- if extra_classes %} {{ extra_classes }}{% endif -%}
{%- if form_type == "inline" %} form-inline{% endif -%}
"
{%- if _enctype[0] %} enctype="{{ _enctype[0] }}"{% endif -%}
{%- if role %} role="{{ role }}"{% endif -%}
{%- if novalidate %} novalidate{% endif -%}
{%- if render_kw %} {{ render_kw|xmlattr }}{% endif -%}
>
{{ form.hidden_tag() }}
{{ form_errors(form, hiddens='only') }}

{%- for field in form %}
{% if not is_hidden_wtf_field(field) -%}
{{ form_field(field,
form_type=form_type,
horizontal_columns=horizontal_columns,
button_map=button_map) }}
{%- endif %}
{%- endfor %}

</form>
{%- endmacro %}

常用Modal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
{% macro modal(id, title, form, action) %}
<div class="modal fade" id="{{ id }}" style="display: none;"
aria-hidden="true">
<div class="modal-dialog">
<div class="modal-content">
<div class="modal-header">
<h4 class="modal-title">{{ title }}</h4>
<button type="button" class="close" data-dismiss="modal" aria-label="Close">
<span aria-hidden="true">×</span>
</button>
</div>
<div class="modal-body">
{{ wtf.quick_form(form, action=action) }}
</div>
</div>
<!-- /.modal-content -->
</div>
<!-- /.modal-dialog -->
</div>
{% endmacro %}


{% macro modal_ajax(id, title, form, btn) %}
<div class="modal fade" id="{{ id }}" style="display: none;"
aria-hidden="true">
<div class="modal-dialog">
<div class="modal-content">
<div class="modal-header">
<h4 class="modal-title">{{ title }}</h4>
<button type="button" class="close" data-dismiss="modal" aria-label="Close">
<span aria-hidden="true">×</span>
</button>
</div>
<div class="modal-body">
<form id="{{ id }}">
{{ form.csrf_token() }}
{{ wtf.form_field(form['app_id']) }}
{{ wtf.form_field(form['channel_num']) }}
{{ wtf.form_field(form['remark']) }}
{{ wtf.form_field(form['upload_file']) }}
</form>
</div>
<div class="modal-footer justify-content-between">
<button type="button" class="btn btn-default" data-dismiss="modal">取消</button>
<button type="button" class="btn btn-primary" id="save-btn">保存</button>
</div>
</div>
<!-- /.modal-content -->
</div>
<!-- /.modal-dialog -->
</div>
{% endmacro %}

group_by

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
field = [
ReportPersonConsumeTsf.user_name,
ReportPersonConsumeTsf.report_date,
ReportPersonConsumeTsf.app_name
]

table_fields = {
ReportPersonConsumeTsf: [
'consume_jrtt', 'consume_gdt', 'consume_ks', 'consume_qtt',
'consume_bd','consume_aqy','consume_xl','transformation_jrtt',
'transformation_gdt', 'transformation_ks', 'transformation_qtt', 'transformation_bd',
'transformation_aqy','transformation_xl'
],
}

_sum_fields = [
getattr(table, field)
for table, fields in table_fields.items()
for field in fields
]

q = (
db.session.query(*field, *[func.sum(f) for f in _sum_fields])
.filter(ReportPersonConsumeTsf.app_name == ReportProfitDaily.app_name)
.filter(ReportPersonConsumeTsf.report_date == ReportProfitDaily.report_date)
)

if self.starttime and self.endtime:
q = (
q.filter(ReportPersonConsumeTsf.report_date <= self.endtime)
.filter(ReportPersonConsumeTsf.report_date >= self.starttime)
)
return q.group_by(*field)

html复制功能

1
2
<button class="btn btn-sm btn-secondary copy_btn" data-clipboard-text="http://127.0.0.1:50080\junhaiyouxi_kdfasdf_01.png">复制
</button>

新增wtform不存在的字段

1
2
3
4
5
6
7
8
9
{% macro make_stringfield(field) %}
<div class="input-group">
<span class="input-group-text col-3">{{ field.label.text }}</span>
<input class="col-9" type="text" id="{{ field.id }}" name="{{ field.name }}"
value="{{ field.data }}">
</div>
{% endmacro %}

<div class="col">{{ make_stringfield(form.app_name) }}</div>

使用html的layer

1
2
3
4
5
6
7
8
9
<script type="text/javascript">
new Clipboard('.copy_btn').on(
'success',
function (e) {
layer.msg(
"复制成功!",
);
});
</script>

select 实现placeholder 效果

1
2
3
4
5
6
7
<select>
<option value="1" selected disabled style="display: none;">--请选择--</option>
<option value="2">可乐</option>
<option value="3">橙汁</option>
<option value="4">雪碧</option>
<option value="5">牛奶</option>
</select>
1
2
3
4
5
6
7
<select>
<option value="1" hidden>--请选择--</option>
<option value="2">可乐</option>
<option value="3">橙汁</option>
<option value="4">雪碧</option>
<option value="5">牛奶</option>
</select>

解决app_choices不断被append的问题

1
2
3
4
5
6
class AndroidPackageAddForm(FlaskForm):
def __init__(self, *args, **kwargs):
super(AndroidPackageAddForm, self).__init__(*args, **kwargs)
app_choices = [(0, u"--请选择应用--")]
app_choices.extend([(x.id, x.app_name) for x in Application.query.all()])
self.app_id.choices = app_choices

使用QueryClass

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
class AndroidPackageQuery(BaseQuery):
''' 查询类 '''

@classmethod
def get_data(cls):
data = AndroidPackage.query.order_by(AndroidPackage.id.desc()).all()
return data

@classmethod
def fuzzy_channel_num(cls, _list, channel_num):
data = AndroidPackage.query.filter(
AndroidPackage.channel_num.like('%' + channel_num + '%')).all()
return data

@classmethod
def filter_app_name(cls, _list, app_name):
app = Application.get_by(app_name=app_name)
if app is None:
return None
data = [item for item in _list if item.app_id == app.id]
return data


class AndroidPackage(db.Model, DBOperationMixin):
''' 安卓包模型 '''
__tablename__ = "android_package"
query_class = AndroidPackageQuery

id = db.Column(db.Integer, autoincrement=True, primary_key=True)
app_id = db.Column(db.Integer, nullable=False, comment="应用id")
link = db.Column(db.String(200), nullable=False, comment="文件名")
channel_num = db.Column(db.String(200), nullable=False, comment="渠道号")
status = db.Column(db.SMALLINT, nullable=False, comment="上传状态")
remark = db.Column(db.Text, nullable=False, comment='备注', default='')

def __repr__(self):
return '<AndroidPackage %s>:' % self.id

@property
def get_app_name(self):
data = Application.query.get(self.app_id)
return data.app_name

@property
def get_status(self):
STATUS_FORMAT = {1: '上传', 2: '未更新', 3: '正常', 4: '更新中', 5: '更新失败'}
data = STATUS_FORMAT[self.status]
return data

@property
def get_link(self):
new_link = os.path.join(current_app.config['CDN_HOST'], self.link)
return new_link

@classmethod
def get_update_or_create(cls, app_id, file):
# 应用和渠道号相同情况下更新
package = AndroidPackage.query.\
filter(AndroidPackage.channel_num == file['channel_num'],
AndroidPackage.app_id == app_id).first()
if package:
return package, 'update'
return cls(), 'create'

@classmethod
def check_file_name(cls, app_id, file):
app = Application.get_by_id(app_id)
file_name = file['file_name']
_list = file_name.split('-')
if len(_list) < 2:
return "文件名%s格式错误,请按标准化名称输入" % file_name

# 对比文件名的应用包名是否相同
if _list[0] != app.app_package_name:
return "文件%s,应用包名缩写错误,正确应用包名应为%s" % (file_name, app.cp_check_name)

return

def create_db(self, **kwargs):
app = Application.query.get(kwargs['app_id'])
self.app_id = kwargs['app_id'],
self.link = '/'.join([app.app_package_name, kwargs['file']['file_name']]),
self.channel_num = kwargs['file']['channel_num'],
self.remark = '',
self.status = STATUS_LEVEL['NORMAL'],
return self.save()

def update_db(self, file_name):
app = Application.query.get(self.app_id)
self.link = '/'.join([app.app_package_name, file_name]),
self.status = STATUS_LEVEL['UPGRADE']
db.session.commit()

查询的使用使用的filter

1
2
3
4
5
6
7
filters = []
if form.app_name.data:
filters.append(Application.app_name.like('%' + form.app_name.data + '%'))
if form.channel_num.data:
filters.append(AndroidPackage.channel_num.like('%' + form.channel_num.data + '%'))
if filters:
query = query.filter(*filters)
1
2
3
4
5
6
7
8
9
query = BlockClient.query.filter()
filters = [
BlockClient.create_date >= start_date,
BlockClient.create_date <= end_date
]
if client:
filters.append(BlockClient.block_value == client)

query = query.filter(*filters)