在协程里使用yield

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import asyncio

async def func():
await asyncio.sleep(1)
for x in range(12):
yield x**2


def func2(yy):
print(yy)

async def main():
async for each in func():
func2(each)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

抽离出每一个对象的设计代码结构

设计代码结构发生混乱的时候,使用功能来划分。就像Scrapy分成了url分发器,request,repose等等

这就是真正的面向对象编程,抽离出每一个对象

要注意还原这个对象最本质的功能 , 比如说一个downloader , 他最本质的功能就是接收一个url,然后去下载,返回response , 而不是从调度器里拿到url,然后去下载,返回response

  • 考虑三个方向 : 传入参数 , 功能 , 传出数据
  • 最好的方法就是考虑这个方法的方法名是什么 , 然后考虑这三点
  • 这种考虑的直接结果就是 :要使用单数 , 不要使用for

两种方案的区别:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Downloader:
@classmethod
async def run(cls):
# 从scheduler中取出Request对象
request = await scheduler.get_request()
async with aiohttp.ClientSession() as sesssion:
try:
async with sesssion.get(url=request.url, headers=request.headers,
cookies=request.cookies) as resp:
if resp.status == 200:
content = await resp.text()
# 返回Response对象
return Response(content=content, callback=request.callback,
cookies=request.cookies, meta=request.meta)
else:
print(f'--- request status error:{request.url} ---')
except aiohttp.ClientConnectionError:
print(f'--- requests connection error:{request.url} ---')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class Downloader:
@classmethod
async def run(cls,request):
# 下载
async with aiohttp.ClientSession() as sesssion:
try:
async with sesssion.get(url=request.url, headers=request.headers,
cookies=request.cookies) as resp:
if resp.status == 200:
content = await resp.text()
# 返回Response对象
return Response(content=content, callback=request.callback,
cookies=request.cookies, meta=request.meta)
else:
print(f'--- request status error:{request.url} ---')
except aiohttp.ClientConnectionError:
print(f'--- requests connection error:{request.url} ---')

async for

使用

1
2
3
4
async def func():
await asyncio.sleep(1)
for x in range(12):
yield x**2

之后必须使用async for each in func()接收

1
2
3
async def main():
async for each in func():
func2(each)

将协程视为单线程

遇到问题时,将协程视为单线程,这样有利于解决bug

调度器停止问题

  1. 使用while True:, 不要检测是否为空,以此作为退出的判断。
  2. 我们可以在需要停止的时候返回一个标志,
  3. 然后就可以break了

__init__里面获取实例方法

使用self.就可以在__init__里面获取实例方法了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class A:
def __init__(self):
self.parspe = self.func
self.parspe2 = __class__.func2

def func(self):
return 'a'

@classmethod
def func2(cls):
return 'b'

a = A()
print(a.parspe)
print(a.parspe2)

Flask中,Model的设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class CRUDMixin(object):
__table_args__ = {'extend_existing': True}

id = db.Column(db.Integer, primary_key=True)

@classmethod
def get_by_id(cls, id):
if any((isinstance(id, str) and id.isdigit(),
isinstance(id, (int, float))),):
return cls.query.get(int(id))
return None

@classmethod
def create(cls, **kwargs):
instance = cls(**kwargs)
return instance.save()

def update(self, commit=True, **kwargs):
for attr, value in kwargs.items():
setattr(self, attr, value)
return commit and self.save() or self

def save(self, commit=True):
db.session.add(self)
if commit:
db.session.commit()
return self

def delete(self, commit=True):
db.session.delete(self)
return commit and db.session.commit()

继承CRUDMixin :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class Cloud_Domains(CRUDMixin, db.Model):
""" 域名表 """
__tablename__ = "zms_cloud_domain"

id = db.Column(db.Integer, autoincrement=True, primary_key=True)
DomainID = db.Column(db.String(20), nullable=False, unique=True, comment="域名实例ID")
DomainName = db.Column(db.String(255), nullable=False, comment="域名")
DomainStatus = db.Column(db.String(20), nullable=False, comment="域名状态")
ServerName = db.Column(db.String(10), nullable=False, comment="域名所在云")
Created_at = db.Column(db.DateTime, default=datetime.datetime.now,
comment="注册日期")
End_at = db.Column(db.DateTime, nullable=False, comment="结束日期")
sid = db.Column(db.ForeignKey("zms_cloud_subject.sid", ondelete="SET NULL"))

@classmethod
def get_domain_list(cls, page):
domains = cls.query.order_by("id").paginate(page, per_page=10)
return domains

# 使用filter进行过滤 , 这样这个函数就有良好的拓展性
@property
def to_dict(self, filter=[]):
d = {}
for c in self.__table__.columns:
value = getattr(self, c.name, None)
if c.name in filter:
continue
elif isinstance(value, datetime.date):
d[c.name] = value.strftime('%Y-%m-%d')
elif isinstance(value, datetime.datetime):
d[c.name] = value.strftime('%Y-%m-%d %H:%M:%S')
else:
d[c.name] = value
return d

使用callbackdict

1
2
3
4
5
6
7
callback_dict = {
f'http://whois.chinaz.com/{domain}': parse_chinaz,
f'http://whois.xinnet.com/domains/{domain}': parse_xinnet,
}

for url, parser in callback_dict.items():
...

而不是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
domain_url = [
f'http://whois.chinaz.com/{domain}',
f'http://whois.xinnet.com/domains/{domain}'
]

parser = [
parse_chinaz,
parse_xinnet,
]

url_parse = [
(url,parse)
for url,parse in zip(domain_url,parser)
]

Flask的Model的写法推荐

要写comment

1
server_id = db.Column(db.Integer, primary_key=True, comment="服务器ID(内部)")

使用"""文档注释 解释说明各个字段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Servers(db.Model):
"""
服务器管理
:server_id: int 服务器ID(内部)
:instance_id: string 实例ID
:host_name: string 实例名称
:cpu: int CPU 核数
"""
__tablename__ = "zms_cloud_servers"

server_id = db.Column(db.Integer, primary_key=True, comment="服务器ID(内部)")
instance_id = db.Column(db.String(50), unique=True, comment="实例ID")
instance_name = db.Column(db.String(100), comment="实例名称")
cpu = db.Column(db.SmallInteger, comment="CPU 核数")

Flask中不要使用app,而是使用current_app

1
2
3
4
5
6
# wrong
from app import myModel

# right
from flask import current_app
from current_app import myModel

postgresql 报错 列不存在

原因 : 字符串必须使用单引号 , 不能使用双引号

测试,调bug

  • 调bug的时候需要将全部无关的代码注释掉
  • 注意使用Pycharm的dubug功能

Flask_Model中使用Enum

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
status = db.Column(db.Enum(ServerStatusEnum), index=True, comment="实例状态")

class ServerStatusEnum(Enum):
"""
服务器状态枚举类型
"""
RUNNING = "运行中" # 运行中
STARTING = "启动中" # 启动中
STOPPING = "停止中" # 停止中
STOPPED = "已停止" # 已停止
PENDING = "创建中" # 创建中
LAUNCH_FAILED = "创建失败" # 创建失败
REBOOTING = "重启中" # 重启中
SHUTDOWN = "停止待销毁" # 停止待销毁
TERINATING = "销毁中" # 销毁中

Flask_Form中使用choice

1
2
3
4
5
6
7
8
9
10
class EmployeeForm(FlaskForm):
department = SelectField( '所在部门',choices=[DEPARTEMT_CHOICE])

DEPARTEMT_CHOICE = [
(1, '总经办'),
(2, '人力资源部'),
(3, '技术部'),
(4, '商务部'),
(5, '渠道产品部'),
]

Flask中使用Update

1
2
3
4
def update(self, commit=True, **kwargs):
for attr, value in kwargs.items():
setattr(self, attr, value)
return commit and self.save() or self

CURDMixin

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class CRUDMixin(object):
__table_args__ = {'extend_existing': True}

id = db.Column(db.Integer, primary_key=True)

@classmethod
def get_by_id(cls, id):
if any((isinstance(id, str) and id.isdigit(),
isinstance(id, (int, float))),):
return cls.query.get(int(id))
return None

@classmethod
def create(cls, **kwargs):
instance = cls(**kwargs)
return instance.save()

def update(self, commit=True, **kwargs):
for attr, value in kwargs.items():
setattr(self, attr, value)
return commit and self.save() or self

def save(self, commit=True):
db.session.add(self)
if commit:
try:
db.session.commit()
except Exception as e:
# 先回退,再抛出异常
db.session.rollback()
raise e
return self

def delete(self, commit=True):
if commit:
try:
db.session.add(self)
return db.session.commit()
except Exception as e:
db.session.rollback()
raise e

Flask一定要先创建表

1
db.create_all()

各种测试都应该在Flask shell中进行

数据库的创建和操作可以写在db.py以便之后使用

尤其是在shell中

.env文件

1
2
3
4
# .env
MAIL_SERVER=smtp.qq.com
MAIL_USERNAME=367224698@qq.com
MAIL_PASSWORD=ljfitqzfphlibjdj
1
2
3
4
5
6
7
8
app.config.update(
MAIL_SERVER = os.getenv('MAIL_SERVER'),
MAIL_PORT = 678,
MAIL_USE_TLS = True,
MAIL_USEERNAME = os.getenv('MAIL_USERNAME'),
MAIL_PASSWORD = os.getenv('MAIL_PASSWORD'),
MAIL_DEFAULT_SENDER = ('Grey Li', os.getenv('MAIL_USERNAME'))
)

Flask_email

默认发信人由一个两元素元祖组成,即(姓名,邮箱地址),比如:

1
MAIL_DEFAULT_SENDER = ('Your Name', 'your_name@example.com')

写数据库 查询语句 尽量返回对象

不要在函数里面也写一堆数据处理, 最好是这种返回形式

1
2
def get_model_data(cls):
return cls.query.all()

爬虫的时间戳参数

一般逻辑为 : 后台获取用户传入的时间戳参数A , 然后获取当前时间B ,如果B-A小于1000,说明此时请求是在1秒之前请求的,为正常请求

面向切面编程

有A,B,C三个方法,但是在调用每一个方法之前,要求打印一个日志:某一个方法被开始调用了!

在调用每个方法之后,也要求打印日志:某个方法被调用完了!

一般人会在每一个方法的开始和结尾部分都会添加一句日志打印吧,这样做如果方法多了,就会有很多重复的代码,显得很麻烦,这时候有人会想到,为什么不把打印日志这个功能封装一下,然后让它能在指定的地方(比如执行方法前,或者执行方法后)自动的去调用呢?如果可以的话,业务功能代码中就不会掺杂这一下其他的代码,所以AOP就是做了这一类的工作,比如,日志输出,事务控制,异常的处理等。。

如果把AOP当做成给我们写的“业务功能”增添一些特效,就会有这么几个问题:

  1. 我们要制作哪些特效

  2. 这些特效使用在什么地方

  3. 这些特效什么时候来使用

Flask出现关系不存在错误

1
2
sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedTable) 错误:
关系 "zms_cloud_domain" 不存在

很有可能是数据表不存在 , 此错误经常出现在测试中

测试数据库

1
SQLALCHEMY_DATABASE_URI = 'sqlite:///:memory:'

可以把navibar的[item]提取出来形成一个macro

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
{# navi.html #}
{% macro navbar(items, search=False) %}

{%- for item in items %}
<li class="nav-item d-none d-sm-inline-block">
<a href="{{ item['link'] }}" class="nav-link">{{ item['name'] }}</a>
</li>
{%- endfor %}

{%- if search -%}
<!-- SEARCH FORM -->
<form class="form-inline ml-3">
<div class="input-group input-group-sm">
<input class="form-control form-control-navbar" type="search" placeholder="Search" aria-label="Search">
<div class="input-group-append">
<button class="btn btn-navbar" type="submit">
<i class="fas fa-search"></i>
</button>
</div>
</div>
</form>
{% endif %}

{% endmacro %}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
{# base.html #}
{% import "navi.html" as navi %}

{% block sidebar %}
{# 调用navi.html文件你的navbar函数 #}
{{ navi.navbar([
{'name': '控制台', 'link': '#', 'children': [
{'name': '服务器管理', 'link': '/servers', 'children': [
{'name': '服务器展示', 'link': '/servers/index'},
{'name': '服务器主体', 'link': '/servers/subject_index'},
{'name': '服务器列表', 'link': '/servers/server_index'},
]},
{'name': '域名', 'link': '/domains', 'children': [
{'name': '腾讯云', 'link': '/domains/qcloud', 'children':[]},
{'name': '阿里云', 'link': '/domains/aliyun', 'children':[]},
]},
{'name': '紧急联系人', 'link': '/alarm', 'children': []},
]},
{'name': '账户', 'link': '#', 'children': [
{'name': '登出', 'link': 'logout', children: []}
]}
],
active=None,
brand={'name': 'ZKOP', 'link': '#', 'img': '/static/img/AdminLTELogo.png'},
user={'name': 'admin', 'link': '#', 'img': '/static/img/avatar5.png'})
}}
{% endblock %}

封装field

1
2
3
4
5
6
7
8
9
10
11
12
13
{% macro render_field(field) %}
{% with errors = field.errors %}
<div class="form-group{{ 'has-error' if errors }}">
{{ field.label(class="control-label") }}
{{ field(class='form-control', **kwargs)|safe }}
{% if errors %}
{% for error in errors %}
<span class="help-block" style="color: red">{{ error }}</span>
{% endfor %}
{% endif %}
</div>
{% endwith %}
{% endmacro %}

a标签href属性直接写入js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<a href="javascript:DeleteDomain({{ item.id }})">删除</a>

<script>
function DeleteDomain(domain_id) {
let data= {"domain_id": domain_id};
$.ajax({
type: "POST",
url: "{{ url_for('.domains_delete') }}",
data: data,
dataType: 'json',
success: function (data) {
if (data.code == 200) {
layer.msg("域名删除成功", {
time: 3000,
end: function () {
window.location.reload()
}
});
} else {
layer.msg(data.error_msg, {
time: 3000,
end: function () {
}
});
}
},
error: function (jqXHR, textStatus, errorThrown) {
/*错误信息处理*/
console.log(jqXHR, textStatus, errorThrown)
}
})
}
</script>

对于一个可以直接继承的,就算再少也要新建一个文件

1
{% extends "domains/index.html" %}

使用两个base.html

  • 我们在使用Admin-TLE的时候 , 可以设计两个base.html : admin-lte/base.htmlbase.html
  • 这样做就将admin-lte/base.html视为一个被调用的库 , 而base.html就是一个实例化的admin-lte/base.html.更加的清晰

复写extend的内容

1
2
3
4
{# base.html #}
{% block content_header %}
{{ content.header(title) }}
{% endblock content_header %}
1
2
3
4
5
6
{# index.html #}
{% extends "base.html" %}

{% block content_header %}
{{ content.header(title) }}
{% endblock content_header %}

这样的好处就是能提醒自己这个template中有这个block

FlaskForm的validate_name失效问题

原因 , Flaskform的validate_name全靠本身的validate()方法进行验证 , 如果Flaskform的validate()被复写,验证就会失效

所以需要在新的validate()调用FlaskForm的validate()

1
2
3
4
5
6
7
8
9
10
11
def validate(self):
rv = FlaskForm.validate(self)
if not rv: return False

self.domain = Domains.query.filter_by(name=self.name.data).first()

if self.domain:
self.name.errors.append(u'域名已存在,请重新输入')
return False

return True

在validate()里调用name属性的方式就是self.name.data

使用FlaskFORM表单的时候就可以自定义一个render_field

1
2
3
4
5
6
7
8
9
10
11
12
13
{% macro render_field(field) %}
{% with errors = field.errors %}
<div class="form-group{{ 'has-error' if errors }}">
{{ field.label(class="control-label") }}
{{ field(class='form-control', **kwargs)|safe }}
{% if errors %}
{% for error in errors %}
<span class="help-block" style="color: red">{{ error }}</span>
{% endfor %}
{% endif %}
</div>
{% endwith %}
{% endmacro %}
1
2
3
4
5
6
7
8
9
10
11
<div class="card-body">
<form method="post">
{{ form.csrf_token }}
{{ render_field(form.types) }}
{{ render_field(form.rr) }}
{{ render_field(form.line) }}
{{ render_field(form.value) }}
{{ render_field(form.ttl) }}
{{ form.submit_bt(class_="btn btn-primary") }}
</form>
</div>

FlaskForm的choices要干掉”=== 请选择 ===”

jinjia中,block不要分离

1
2
3
4
5
<head>
{%- block head %}
<link rel="stylesheet" href="https:xxx.css">
{%- endblock head %}
</head>

调用

1
2
3
4
{%- block head %}
{{ super() }}
<link rel="stylesheet" href="https:xxx.css">
{%- endblock head %}

不要写成

1
2
3
4
5
<head>
<link rel="stylesheet" href="https:xxx.css">
{%- block head %}
{%- endblock head %}
</head>

调用

1
2
3
{%- block head %}
<link rel="stylesheet" href="https:xxx.css">
{%- endblock head %}

另类使用post

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
var iframes = $(layero).find("iframe")[0].contentWindow;
var form = iframes.document.getElementById("AddSubject");
var app_count = $(form).find("#AppCount").val();
var app_id = $(form).find("input[data='app-id']").val();
var app_key = $(form).find("#AppKey").val();
var app_secret = $(form).find("#AppSecret").val();
var service_isp = $(form).find('#ServiceIsp').val();

var data = {
"app_count": app_count,
"app_id": app_id,
"app_key": app_key,
"app_secret": app_secret,
"service_isp": service_isp,
"sid": 0,
};
console.log(data);

$.ajax({
type: "POST",
url: $(form).attr("action"),
// 直接传入映射作为data
data: data,
dataType: 'json',
success: function (data) {}

重构代码的时候一定要考虑实现某个函数的需要什么参数,在此基础上考虑使用类还是函数

gitlab迁移项目

仓库 – settings – general – advanced – Transfer project

之后删除全部的远程连接

1
2
git remote rm origin
git remote rm old-origin

添加新的远程连接

1
git remote add origin https://gitlab-inet.zkyouxi.com/pydev/intern/operat-plat form-server/operat-platform_celery.git

提交

1
git push -u origin --all

__hash__ __eq__

可哈希的集合(hashed collections),需要集合的元素实现了__eq____hash__,而这两个方法可以作一个形象的比喻:

  • 哈希集合就是很多个桶,但每个桶里面只能放一个球。__hash__函数的作用就是找到桶的位置,到底是几号桶。
  • __eq__函数的作用就是当桶里面已经有一个球了,但又来了一个球,它声称它也应该装进这个桶里面(__hash__函数给它说了桶的位置),双方僵持不下,那就得用__eq__函数来判断这两个球是不是相等的(equal),如果是判断是相等的,那么后来那个球就不应该放进桶里,哈希集合维持现状。

为什么要这么设计 : 为了哈希从映射

  • 我们以整形作为键的时候 , 很容易哈希相同 , 也就是__hash__返回的值相同,这时需要使用__eq__进行判断
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class A:
def __init__(self,name):
self.name = name

def __hash__(self):
return hash(self.name)

def __eq__(self, other):
return self.name == other.name

def __str__(self):
return f'{self.name}'

__repr__ = __str__

a = A(name='hyl')
b = A(name='asd')
c = A(name='hyl')

d = {}

for each in ((a,1),(b,2),(c,3)):
d[each[0]] = each[1]

print(d)
# {hyl: 3, asd: 2}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class A:
def __init__(self,name):
self.name = name

def __hash__(self):
return hash(self.name)

# def __eq__(self, other):
# return self.name == other.name

def __str__(self):
return f'{self.name}'

__repr__ = __str__

a = A(name='hyl')
b = A(name='asd')
c = A(name='hyl')

d = {}

for each in ((a,1),(b,2),(c,3)):
d[each[0]] = each[1]

print(d)
# {hyl: 1, asd: 2, hyl: 3}

requests.post中json参数和data参数的区别

参数中明确的参数有datajson
datajson既可以是str,也可以是dict
区别如下:

  • 不管jsonstr还是dict,如果不指定headers中的content-type,默认为application/json
  • datadict时,如果不指定content-type,默认为application/x-www-form-urlencoded,相当于普通form表单提交的形式,此时数据可以从request.POST里面获取,而request.body的内容则为a=1&b=2的这种形式,注意,即使指定content-type=application/json,request.body的值也是类似于a=1&b=2,所以并不能用json.loads(request.body.decode())得到想要的值
  • datastr时,如果不指定content-type,默认为application/json

避免使用getattr

每当我们想用gettattr的时候

1
getattr(celery_logger, log_type)

可以看下源码 , 一般源码就会有有关汇总的函数

1
2
3
def debug(self, msg, *args, **kwargs):
if self.isEnabledFor(DEBUG):
self._log(DEBUG, msg, args, **kwargs)

此时的_log就是那个汇总的函数

连接数据库

Flask-SQLAlchemy 可以容易地连接到多个数据库。

为了实现 这个功能,预配置了 SQLAlchemy 来支持多个“binds”。

1
2
3
4
5
6
7
8
9
10
11
class DevConfig(object):

SQLALCHEMY_BINDS = {
"domain":
'postgresql://%s:%s@%s:%s/%s' %
(POSTGRES_USER, POSTGRES_PASS, POSTGRES_HOST, POSTGRES_PORT,
POSTGRES_DB),

"operat-platform":
'postgresql://intern:intern@148.70.220.176:5432/operat-platform'
}
1
2
3
4
5
class CloudManager(db.Model):
__tablename__ = "zms_cloud_subject"
__bind_key__ = 'operat-platform'

sid = db.Column(db.Integer,autoincrement=True,primary_key=True,comment="主体id")

cls.attrname的本质

cls.attrname 本质就是cls.__dict__[attrname]

CRUDMixin

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class CRUDMixin(object):

@classmethod
def get(cls, id):
if any((isinstance(id, str) and id.isdigit(),
isinstance(id, (int, float))),):
return cls.query.get(int(id))
return None

@classmethod
def get_by(cls, **kwargs):
return cls.query.filter_by(**kwargs).first()

@classmethod
def all(cls):
return cls.query.all()

@classmethod
def filter_by(cls, **kwargs):
return cls.query.filter_by(**kwargs).all()

@classmethod
def create(cls, **kwargs):
instance = cls(**kwargs)
return instance.save()

def update(self, commit=True, **kwargs):
for attr, value in kwargs.items():
setattr(self, attr, value)
return commit and self.save() or self

def save(self, commit=True):
db.session.add(self)
if commit:
db.session.commit()
return self

def delete(self, commit=True):
db.session.delete(self)
if commit:
return db.session.commit()
return self

def dump(self):
return {c.name: getattr(self, c.name, None)
for c in self.__table__.columns}

注意dump的写法

1
2
3
def dump(self):
return {c.name: getattr(self, c.name, None)
for c in self.__table__.columns}

不要写成

1
2
3
def dump(self):
return {c.name: getattr(self, c.name, None)
for c in self.__dict__}

之后就可以使用

1
2
3
4
@bp.route('/', methods=['POST'])
def create():
CloudManager.create(**request.json)
return make_response()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@domains.route('/records/<int:id>/update', methods=['GET', 'POST'])
def update_record(id):
res = DomainRecord.get(id)
print(res)
if not res or not res['state']:
flash('更新失败', 'danger')
return redirect(url_for('domains.list_domains'))
form = UpdateDomainRecordForm(**res['data']['record'])
if form.validate_on_submit():
res = DomainRecord.update(RecordID=id, **form.dump())
if not res or not res['state']:
flash('更新失败', 'danger')
return redirect(url_for('domains.update_record'))
flash('更新成功', 'success')
return redirect(url_for('domains.list_records',
DomainName=form.DomainName.data))
return render_template('domains/records/update.html', form=form)

手动封装一个返回JSON的方法

1
2
3
4
def make_response(state=1, code=200, msg='OK', data=None):
if data is None:
return jsonify({'state': state, 'code': code, 'msg': msg})
return jsonify({'state': state, 'code': code, 'msg': msg, 'data': data})

文档注释示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class BaseResource(object):
'''Base Resouce'''

@classmethod
def get(cls, id):
'''Get a resource item by id.

Args:
cls: Resource class
id: Resource id

Returns:
A dict containing resource data if the query succeeds,
None otherwise.

'''
raise NotImplementedError()

base类示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
class BaseResource(object):
'''Base Resouce'''

@classmethod
def get(cls, id):
'''Get a resource item by id.

Args:
cls: Resource class
id: Resource id

Returns:
A dict containing resource data if the query succeeds,
None otherwise.

'''
raise NotImplementedError()

@classmethod
def get_by(cls, **kwargs):
'''Get a resource item by some attributes.

Args:
cls: Resource class
**kwargs: Attributes

Returns:
A dict containing resource data if the query succeeds,
None otherwise.
'''
raise NotImplementedError()

@classmethod
def all(cls):
'''Get all resources.

Args:
cls: Resource class

Returns:
A list of dict containing resource data.

'''
raise NotImplementedError()

@classmethod
def filter_by(cls, **kwargs):
'''Get resource list by some attributes.

Args:
cls: Resource class
**kwargs: Attributes

Returns:
A list of dict containing resource data.

'''
raise NotImplementedError()

@classmethod
def create(cls, **kwargs):
'''Create a new resource.

Args:
cls: Resource class
**kwargs: Attributes

Returns:
A dict containing the new resource data.

'''
raise NotImplementedError()

@classmethod
def update(cls, id, **kwargs):
'''Update a resource.

Args:
cls: Resource class
id: The id of the resource to be updated
**kwargs: The update attributes

Returns:
True for success, and False for failure

'''
raise NotImplementedError()

@classmethod
def delete(cls, id):
'''Delete a resource.

Args:
cls: Resource class
id: The id of the resource to be deleted

Returns:
True for success, and False for failure

'''
raise NotImplementedError()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
from app.resource.base import BaseResource
from app.utils import requests


class Resource(BaseResource):
get_url = None
'''URL template for get(), accepts one variable: id'''

get_by_url = None
'''URL string for get_by()'''

all_url = None
'''URL string for all()'''

filter_by_url = None
'''URL string for filter_by()'''

create_url = None
'''URL string for create()'''

update_url = None
'''URL template for update(), accepts one variable: id'''

delete_url = None
'''URL template for update(), accepts one variable: id'''

@classmethod
def get(cls, id):
'''Get a resource item by id.

Args:
cls: Resource class
id: Resource id

Returns:
A dict containing resource data if the query succeeds,
None otherwise.

'''
return requests.get(cls.get_url.format(id))

@classmethod
def get_by(cls, **kwargs):
'''Get a resource item by some attributes.

Args:
cls: Resource class
**kwargs: Attributes

Returns:
A dict containing resource data if the query succeeds,
None otherwise.
'''
return requests.get(cls.get_by_url, params=kwargs)

@classmethod
def all(cls):
'''Get all resources.

Args:
cls: Resource class

Returns:
A list of dict containing resource data.

'''
return requests.get(cls.all_url)

@classmethod
def filter_by(cls, **kwargs):
'''Get resource list by some attributes.

Args:
cls: Resource class
**kwargs: Attributes

Returns:
A list of dict containing resource data.

'''
return requests.get(cls.filter_by_url, params=kwargs)

@classmethod
def create(cls, **kwargs):
'''Create a new resource.

Args:
cls: Resource class
**kwargs: Attributes

Returns:
A dict containing the new resource data.

'''
return requests.post(cls.create_url, json=kwargs)

@classmethod
def update(cls, id, **kwargs):
'''Update a resource.

Args:
cls: Resource class
id: The id of the resource to be updated
**kwargs: The update attributes

Returns:
True for success, and False for failure

'''
return requests.put(cls.update_url.format(id), json=kwargs)

@classmethod
def delete(cls, id):
'''Delete a resource.

Args:
cls: Resource class
id: The id of the resource to be deleted

Returns:
True for success, and False for failure

'''
return requests.delete(cls.delete_url.format(id))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from app.resource import Resource
from app.utils import requests
from app.config import BaseConfig


class AlarmPerson(Resource):
base_url = BaseConfig.RC_ALARM_PERSON_BASE_URL
all_url = base_url + '/show_alarm'
create_url = base_url + '/add_alarm'
delete_url = base_url + '/delete_alarm'

@classmethod
def delete(cls, id):
return requests.post(cls.delete_url, json={'id': id})


class Employee(Resource):
base_url = BaseConfig.RC_EMPLOYEE_BASE_URL
filter_by_url = base_url + '/show_employee'

所以总共设计三层

  1. base层 : 此层必须高度抽象,规定必须要实现的方法
  2. 中间层: 实现base层的所有方法
  3. 实现层 : 部分实现中间层的方法

Form的最佳实践

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class BaseDomainRecordForm(FlaskForm, FormDataMixin):
DomainName = SelectField('域名')
RR = StringField('主机记录')
Line = StringField('记录')
TTL = IntegerField('TTL')
Types = StringField('类型')
Value = StringField('值')
Priority = IntegerField('MX优先级')

def __init__(self, *args, **kwargs):
super(BaseDomainRecordForm, self).__init__(*args, **kwargs)
self.DomainName.choices = [(d['DomainName'], d['DomainName'])
for d in Domain.all()['data']['domains']]


class CreateDomainRecordForm(BaseDomainRecordForm):
submit = SubmitField('创建')


class UpdateDomainRecordForm(BaseDomainRecordForm):
submit = SubmitField('更新')

就像model有mixin一样 , Form也可以设计Mixin

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from wtforms.fields import HiddenField, SubmitField


def is_hidden_wtf_field(field):
return isinstance(field, HiddenField)


class FormDataMixin(object):
def dump(self):
data = {}
for k, v in self.data.items():
field = getattr(self, k)
if isinstance(field, HiddenField):
continue
if isinstance(field, SubmitField):
continue
data[k] = v
return data

还可以避免使用popular_obj方法

1
2
3
4
5
6
7
8
9
10
11
12
@domains.route('/records/create', methods=['GET', 'POST'])
def create_record():
form = CreateDomainRecordForm()
if form.validate_on_submit():
res = DomainRecord.create(**form.dump())
if not res or not res['state']:
flash('创建失败', 'danger')
return redirect(url_for('domains.create_record'))
flash('创建成功', 'success')
return redirect(url_for('domains.list_records',
DomainName=form.DomainName.data))
return render_template('domains/records/create.html', form=form)

注意 :BaseForm设计的时候不要有ID (主键) , 尤其是UpdateForm

这样的话 : add(插入数据库的时候)没有id,ORM 就会自动生成

update的时候一定要给视图函数传入id , 使用res = DomainRecord.update(RecordID=id, **form.dump())即可

设计函数的时候可以通过默认值来使这个函数更具拓展性

1
2
3
4
5
from datetime import datetime

def fromtimestamp(timestamp, format='%Y-%m-%d %H:%M:%S'):
print(timestamp)
return datetime.fromtimestamp(timestamp).strftime(format)

警告

1
2
3
4
5
<script>
$(".delete-link").click(function() {
return confirm("确认删除吗?");
})
</script>

Docker访问

Linux中可以直接使用127访问容器里面的IP
win中需要在192.168.99.100里访问(这个是容器的默认IP,有多个容器的话就使用其他的IP)

原因:docker是运行在Linux上的,在Windows中运行docker,实际上还是在Windows下先安装了一个Linux环境,然后在这个系统中运行的docker。
也就是说,服务中使用的localhost指的是这个Linux环境的地址,而不是我们的宿主环境Windows10。

coverage测试覆盖率

1
2
coverage run --source=celery_app  -m pytest
coverage report

注意--source=celery_app -m pytest 不能颠倒

生成覆盖报告(htmlcov是文件夹名,任取)

1
coverage html -d htmlcov

Docker网络

Docker 强大的原因之一在于多个 Docker 容器之间的互相连接。涉及到连接,就引出了网络通信的几种模式。Docker 默认提供了 5 种网络驱动模式。

  • bridge: 默认的网络驱动模式。如果不指定驱动程序,bridge 便会作为默认的网络驱动模式。当应用程序运行在需要通信的独立容器 (standalone containers) 中时,通常会选择 bridge 模式。
  • host:移除容器和 Docker 宿主机之间的网络隔离,并直接使用主机的网络。host 模式仅适用于 Docker 17.06+。
  • overlay:overlay 网络将多个 Docker 守护进程连接在一起,并使集群服务能够相互通信。您还可以使用 overlay 网络来实现 swarm 集群和独立容器之间的通信,或者不同 Docker 守护进程上的两个独立容器之间的通信。该策略实现了在这些容器之间进行操作系统级别路由的需求。
  • macvlan:Macvlan 网络允许为容器分配 MAC 地址,使其显示为网络上的物理设备。 Docker 守护进程通过其 MAC 地址将流量路由到容器。对于希望直连到物理网络的传统应用程序而言,使用 macvlan 模式一般是最佳选择,而不应该通过 Docker 宿主机的网络进行路由。
  • none:对于此容器,禁用所有联网。通常与自定义网络驱动程序一起使用。none 模式不适用于集群服务。
1
docker run -it --name box4 --rm --network my-net busybox sh
  • 默认 bridge 网络上的容器只能通过 IP 地址互相访问,除非使用在 docker run 时添加 —link 参数。

    但是--link参数已经被标记为过时 , 我们可以使用docker-compose的links标签

  • docker-compose的links其实就是将redis的内部IP写入flask的hosts文件中

  • 如果只是单纯修改docker-compose文件 ,就不需要重新build一遍

查看本机IP

1
2
ifconfig
# 之后看inet部分

或者

1
hostname -I

连接远程redis

必须要把conf里的bind 取消注释 , 改为bind 0.0.0.0

之后我们就可以在本机里连接远程redis了

1
redis-cli -h ip地址 -p 端口号

远程传输文件

1
2
3
scp my_local_file.zip root@192.168.1.104:/usr/local/nginx/html/webs

scp redis.conf root@172.16.1.13:/apps/operat-platform/op-tasks/

多文件传入

1
scp index.css json.js root@192.168.1.104:/usr/local/nginx/html/webs

复制整个文件夹

从本地文件复制整个文件夹到远程主机上(文件夹假如是diff)
先进入本地目录下,然后运行如下命令:

1
scp -v -r diff root@192.168.1.104:/usr/local/nginx/html/webs

从远程主机复制整个文件夹到本地目录下(文件夹假如是diff)
先进入本地目录下,然后运行如下命令:
scp -r root@192.168.1.104:/usr/local/nginx/html/webs/diff .

docker-compose的command执行多条命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
version: '3'
services:
op-tasks:
build: .
command:
- /bin/bash
- -c
- |
gunicorn -c gunicorn.py app:app &
python run_celery.py worker -l info --beat -c 1
container_name: flask-op-tasks
env_file:
- optasks-variables.env
ports:
- '50020:5000'
# volumes:
# - .:/var/www/op-tasks
depends_on:
- redis
links:
- redis
network_mode: bridge

redis:
image: redis
container_name: redis
network_mode: bridge

另类导入模块

1
2
import importlib
mod = importlib.import_module('hello')

相当于

1
import hello as mod

钩子函数

钩子函数、注册函数、回调函数,他们的概念其实是一样的。

钩子函数,顾名思义,就是把我们自己实现的hook函数在某一时刻挂接目标挂载点上。

钩子函数是指在执行函数和目标函数之间挂载的函数, 框架开发者给调用方提供一个point -挂载点, 至于挂载什么函数有我们调用方决定, 这样大大提高了灵活性

1. hook函数,就是我们自己实现的函数,函数类型与挂载点匹配(返回值,参数列表)
2. 挂接,也就是hook或者叫注册(register),使得hook函数对目标可用
3. 目标挂载点,也就是挂我们hook函数的地方(我们想在这个目标点实现我们自己的功能)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import time

class LazyPerson(object):
def __init__(self, name):
self.name = name
self.watch_tv_func = None
self.have_dinner_func = None

def get_up(self):
print("%s get up at:%s" % (self.name, time.time()))

def go_to_sleep(self):
print("%s go to sleep at:%s" % (self.name, time.time()))

def register_tv_hook(self, watch_tv_func):
self.watch_tv_func = watch_tv_func

def register_dinner_hook(self, have_dinner_func):
self.have_dinner_func = have_dinner_func

def enjoy_a_lazy_day(self):

# get up
self.get_up()
time.sleep(3)
# watch tv
# check the watch_tv_func(hooked or unhooked)
# hooked
if self.watch_tv_func is not None:
self.watch_tv_func(self.name)
# unhooked
else:
print("no tv to watch")
time.sleep(3)
# have dinner
# check the have_dinner_func(hooked or unhooked)
# hooked
if self.have_dinner_func is not None:
self.have_dinner_func(self.name)
# unhooked
else:
print("nothing to eat at dinner")
time.sleep(3)
self.go_to_sleep()

def watch_daydayup(name):
print("%s : The program ---day day up--- is funny!!!" % name)

def watch_happyfamily(name):
print("%s : The program ---happy family--- is boring!!!" % name)

def eat_meat(name):
print("%s : The meat is nice!!!" % name)


def eat_hamburger(name):
print("%s : The hamburger is not so bad!!!" % name)


if __name__ == "__main__":
lazy_tom = LazyPerson("Tom")
lazy_jerry = LazyPerson("Jerry")
# register hook
lazy_tom.register_tv_hook(watch_daydayup)
lazy_tom.register_dinner_hook(eat_meat)
lazy_jerry.register_tv_hook(watch_happyfamily)
lazy_jerry.register_dinner_hook(eat_hamburger)
# enjoy a day
lazy_tom.enjoy_a_lazy_day()
lazy_jerry.enjoy_a_lazy_day()

为什么需要钩子

大家思考一下上面这个例子,左键按下方法具体的逻辑是由框架自身去实现,还是由我们用户(调用者)去实现呢?显然应该由我们自己去实现。要提供通用的框架能力,框架自身去实现该方法功能,是没有意义的,所以框架给提供一个挂载的point,把具体逻辑的实现交给用户就好了,灵活可用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class APP:
def __init__(self):
self.before_request_list = []
self.after_request_list = []

def request(self):
for func in self.before_request_list:
func()

self._request()

for func in self.after_request_list:
func()

def _request(self):
print('requesting')

def before_request(self,func):
self.before_request_list.append(func)

def after_request(self,func):
self.after_request_list.append(func)

a = APP()

@a.before_request
def request_request_func():
print('request before1 ....')

@a.before_request
def request_request_func2():
print('request before2 ....')

@a.after_request
def request_request_func():
print('request before ....')


a.request()

任何不开放给客户端代码使用的方法或属性,应该有一个下划线前缀

gitlab-cd自动部署

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
services:
- redis:alpine

variables:
PIP_CACHE_DIR: "$CI_PROJECT_DIR/.cache/pip"

cache:
key: $CI_JOB_NAME-$CI_COMMIT_REF_SLUG
paths:
- ${PIP_CACHE_DIR}

stages:
- test
- deploy

test_job:
stage: test
image: python:3.7
before_script:
- pip install -r test-requirements.txt -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com
script:
- echo "Test started."
- coverage run --source app -m pytest
- coverage report
- echo "Test finished."

deploy_job:
stage: deploy
when: manual
only:
- master
image: "instrumentisto/rsync-ssh"
before_script:
- target_address=172.16.1.61
- ssh-keyscan ${target_address} > ~/.ssh/known_hosts
- chmod 644 ~/.ssh/known_hosts
script:
- echo "Deploy started."
- project_name=op-gateway
- stage_srv=root@${target_address}
- work_dir=/root/apps/operat-platform
- rsync -arzv --exclude='*.git' ./ ${stage_srv}:${work_dir}/${project_name}
- ssh ${stage_srv} "chmod 755 ${work_dir}/${project_name}"
- ssh ${stage_srv} "docker-compose -f ${work_dir}/${project_name}/docker-compose.yml up -d --build"
- ssh ${stage_srv} "docker-compose -f ${work_dir}/${project_name}/docker-compose.yml ps"
- echo "Deploy finished."

网关,网桥

  • 通过字面意思解释就是网络的关口。从技术角度来解释,就是连接两个不同网络的接口。

  • 比如局域网的共享上网服务器就是局域网和广域网的接口。

  • 网关工作在第三层,并且有不可逆性。也就是说,局域网用户可以通过网关直接访问广域网;而广域网用户却无法通过该网关来直接访问局域网用户

  • 网桥,即网络的桥接。也是用来连接两个网络的,但是网桥有一个特点,就是网桥有自己独立的IP地址。既然这样那么它就可以达到路由的作用。也就是说,通过网桥的连接(在设置正确的情况下),可是使两个网络的互相访问是对等的。

  • 网桥工作在数据链路层,将两个局域网(LAN)连起来,根据MAC地址(物理地址)来转发帧,可以看作一个低层的路由器(路由器工作在网络层,根据网络地址,如IP地址进行转发)。它可以有效地联接两个LAN,使本地通信限制在本网段内,并转发相应的信号至另一网段

  • 网桥通常用于联接数量不多的、同一类型的网段。网关可以使用不同的格式、通信协议或结构连接起两个系统。

    网桥连接网络是相似的 , 网关却不一定

装饰器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# print_msg是外围函数
def print_msg():
msg = "I'm closure"

# printer是嵌套函数
def printer():
print(msg)

return printer


# 这里获得的就是一个闭包
closure = print_msg()
# 输出 I'm closure
closure()

也就是说 , 只有在装饰函数执行的时候 , 装饰的闭包内容才会执行(只有执行了closure() , printer函数内容才会执行)

docker出现某个文件不存在的错误

去看看docker-compose , 看是否开启的volume

如果要下载自己git仓库的东西 , 不需要git remote , 直接git clone即可

docker-compose的command如果不使用多条语句 , 不能使用列表

1
2
ERROR: for hyl_zhangkun_oa  Cannot start service hyl_zhangkun_oa: OCI runtime create failed: container_linux.go:346: starting container process caused "exec: \"gunicorn -c gunicorn.py app:employee_oa_app\": executable file not found in $PATH": unknown
ERROR: Encountered errors while bringing up the project.

远程数据库连接数溢出

1
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) FATAL:  remaining connection slots are reserved for non-replication superuser connections

在测试的时候__bind_key__还是要绑定相同的键 , 但是连接的数据库不同

1
2
3
4
5
class EmployeeModel(db.Model):
"""员工模型"""

__tablename__ = "zkyouxi_employee"
__bind_key__ = 'hyl_zkyouxi_oa'
1
2
3
4
5
6
7
class TestConfig(object):
"test config class"
DEBUG = True
SQLALCHEMY_BINDS = {
"hyl_zkyouxi_oa":
'postgresql://intern:intern@172.16.0.5:54322/hyl_zkyouxi_oa_test'
}
1
2
3
4
5
6
7
8
class DevConfig(object):
"dev config class"
DEBUG = True

SQLALCHEMY_BINDS = {
"hyl_zkyouxi_oa": os.environ.get('POSTGRES_URL') or
'postgresql://intern:intern@172.16.0.5:54322/hyl_zkyouxi_oa'
}

要检测值

不要写成 :

1
2
3
@classmethod
def get_emp(cls, emp_id):
return cls.query.get_or_404(emp_id)

而是

1
2
3
4
5
6
@classmethod
def get_by_id(cls, id):
if any((isinstance(id, str) and id.isdigit(),
isinstance(id, (int, float))), ):
return cls.query.get(int(id))
return None

在gitlab.ci时 , SQLALCHEMY_DATABASE_URI的host修改改成非本地

1
2
3
4
5
# 本地
SQLALCHEMY_DATABASE_URI = "postgresql://postgres:liangbo4869@127.0.0.1:5432/zkyouxi_oa_test"

# 非本地
SQLALCHEMY_DATABASE_URI = "postgresql://postgres:liangbo4869@postgres/zkyouxi_oa_test"

我们在设计表的时候力争主键ID是不会用上的(不能将对象的属性设置为PK)

比如说员工的员工号 , 我们可以设计 :

1
2
3
4
5
6
7
class Employee(db.Model, CRUDMixin):
'''员工的模型类'''
__tablename__ = 'employees'

id = db.Column(db.Integer, primary_key=True)
# 员工号
eid = db.Column(db.String(50), unique=True, index=True)

不要让id和eid共用一个

也就是说 , 力求让主键唯一作用就是 : 唯一标识 , 他是不参与展示的(比如说不会在template中,只会出现在template的url中)

不能将对象的属性设置为PK

在包装类使用完整的文档注释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
class CRUDMixin(object):
'''CRUD Mixin

数据库CRUD操作的包装类

参考:https://github.com/cburmeister/flask-bones/blob/master/app/database.py

'''

__table_args__ = {'extend_existing': True}

@classmethod
def get_by_id(cls, id):
'''以id查询

Args:
cls: 模型类
id: 数据id,要求可转换为数字

Returns:
若id存在,返回找到的实例;否则返回None.

'''
if any((isinstance(id, str) and id.isdigit(),
isinstance(id, (int, float))),):
return cls.query.get(int(id))
return None

@classmethod
def get_by(cls, **kwargs):
'''以各属性值相等进行单项查询

Args:
cls: 模型类
**kwargs: 模型各属性值

Returns:
若存在符合查询的数据,返回首个实例;否则返回None.

'''
return cls.query.filter_by(**kwargs).first()

@classmethod
def all(cls):
'''返回模型所有数据

Args:
cls: 模型类

Returns:
list: 返回实例列表

'''
return cls.query.all()

@classmethod
def create(cls, **kwargs):
'''创建新的实例并插入

Args:
cls: 模型类
**kwargs: 模型各属性值

Returns:
返回创建得到的实例

'''
instance = cls(**kwargs)
return instance.save()

def update(self, commit=True, **kwargs):
'''更新实例各项属性

Args:
self: 实例
commit (bool): 是否提交到数据库
**kwargs: 更新的属性值

Returns:
返回更新后的实例

'''
for attr, value in kwargs.items():
setattr(self, attr, value)
return commit and self.save() or self

def save(self, commit=True):
'''保存实例

Args:
self: 实例
commit (bool): 是否提交到数据库

Returns:
返回实例

'''
db.session.add(self)
if commit:
db.session.commit()
return self

def delete(self, commit=True):
'''删除数据

Args:
self: 实例
commit (bool): 是否提交到数据库

Returns:
bool: 返回是否成功提交到数据库
'''
db.session.delete(self)
return commit and db.session.commit()

不要在template里手写submit标签

1
<button type="submit" class="btn btn-block btn-primary">添加</button>
1
2
3
4
5
6
class LoginForm(FlaskForm):
'''登录表单类'''
username = StringField('用户名', validators=[DataRequired()])
password = PasswordField('密码', validators=[DataRequired()])
remember_me = BooleanField('下次自动登录')
submit = SubmitField('登录')

在Model中写字段的文档注释

1
2
3
4
5
6
7
8
9
class Department(db.Model):
""" 部门模型
-- 部门id
-- 部门号
-- 外键
-- 所属部门(对应id),0為父部門
-- 是否有子部门
"""
__tablename__ = "zk_department"

macro中不能使用**kwargs参数和*args参数

1
2
3
4
5
6
7
8
9
10
11
12
13
{% macro render_field(field) %}
{% with errors = field.errors %}
<div class="form-group{{ 'has-error' if errors }}">
{{ field.label(class="control-label") }}
{{ field(class='form-control', **kwargs) }}
{% if errors %}
{% for error in errors %}
<span class="help-block" style="color: red">{{ error }}</span>
{% endfor %}
{% endif %}
</div>
{% endwith %}
{% endmacro %}

分页的做法

1
2
3
4
5
6
7
8
9
@user_router.route("/", methods=['GET'])
def user_index():
'''用户首页视图'''
page = request.args.get('page', 1, type=int)
pagination = User.users_pagination_display(page)
users = pagination.items
return render_template("user/user_index.html",
users=users,
pagination=pagination)
1
2
3
4
@classmethod
def users_pagination_display(cls, page):
users = cls.query.order_by("id").paginate(page, per_page=10)
return users
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{% for item in users %}
<tr>
<td>
<a href="{{ url_for('user.user_edit',user_id=item.id) }}" target="_blank">
{{ item.id }}
</a>
</td>
<td>{{ item.realname }}</td>
<td>{{ item.phone }}</td>
<td>{{ item.get_department_display() }}</td>
<td>{{ item.get_career_display() }}</td>
<td>{{ item.position }}</td>
<td>{{ item.get_status_display() }}</td>
</tr>
{% endfor %}
1
2
3
4
5
<div class="card-footer clearfix">
{% if pagination %}
{{ macros.pagination_widget(pagination, '.user_index') }}
{% endif %}
</div>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
{% macro pagination_widget(pagination, endpoint) %}
<ul class="pagination pagination-sm">
<li {% if not pagination.has_prev %} class="page-item"{% endif %}>
<a href="
{% if pagination.has_prev %}{{ url_for(endpoint, page=pagination.prev_num, **kwargs) }}{% else %}#{% endif %}"
class="page-link">«</a>
</li>

{% for p in pagination.iter_pages() %}
{% if p %}
{% if p == pagination.page %}
<li class="page-item active">
<a href="{{ url_for(endpoint, page = p, **kwargs) }}" class="page-link">{{ p }}</a>
</li>
{% else %}
<li class="page-item">
<a href="{{ url_for(endpoint, page = p, **kwargs) }}" class="page-link">{{ p }}</a>
</li>
{% endif %}
{% else %}
<li class="page-item disabled"><a href="#" class="page-link">…</a></li>
{% endif %}
{% endfor %}

<li{% if not pagination.has_next %} class="page-item" {% endif %}>
<a href="
{% if pagination.has_next %}{{ url_for(endpoint, page=pagination.next_num, **kwargs) }}{% else %}#{% endif %}"
class="page-link">»</a>
</li>
</ul>
{% endmacro %}

web_args插件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from flask import Flask

from webargs import fields
from webargs.flaskparser import use_args

app = Flask(__name__)

hello_args = {
'name': fields.Str(required=True)
}

@app.route('/')
@use_args(hello_args)
def index(args):
return 'Hello ' + args['name']
1
2
3
4
5
6
class UpdateDepartmentSchema(Schema):
'''更新部门数据的参数Schema'''
id = Integer(required=True, validate=lambda i: \
Department.get_by_id(i) is not None)
name = String(allow_none=True, validate=Length(
max=Department.name.type.length))

蓝图

作为初学flask的我们,可以将蓝图理解为没有run方法的Flask对象

多功能类的设计思路

1
2
3
4
5
6
7
8
class Response(
BaseResponse,
ETagResponseMixin,
ResponseStreamMixin,
CommonResponseDescriptorsMixin,
WWWAuthenticateMixin,
):
pass
  1. 设计多个Mixin类
  2. 然后让不同功能的类继承不同的Mixin类即可

所以像我们的爬虫 , 可以设计多个RequestMxin类(比如说post请求 , get请求),多个pareser类,(比如说re解析 , Xpath解析 , beatifulsoup解析等等,还可以让他们支持链式操作)

注意这些Mixin类不应该有__init__方法

类似于

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import re
from lxml import etree
from pyquery import PyQuery as pq

class ReParserMixin:
def _re_parse(self, func, wb_data, pattern):
return getattr(re.compile(pattern), func)(wb_data)

def re_iter(self, wb_data, pattern):
yield from self._re_parse('finditer', wb_data, pattern)

def re_all(self, wb_data, pattern):
return self._re_parse('findall', wb_data, pattern)

def re_match(self, wb_data, pattern):
return self._re_parse('match', wb_data, pattern)

def re_search(self, wb_data, pattern):
return self._re_parse('search', wb_data, pattern)


class XpathParserMixin:
def xpath_to_ele(self, wb_data, xpath):
yield from etree.HTML(wb_data).xpath(xpath)

def xpath_ele_to_text(self, ele):
return ele.text

def _xpath_to_text(self, wb_data, xpath):
for ele in self.xpath_to_ele(wb_data, xpath):
yield self.xpath_ele_to_text(ele)

def xpath_to_text(self, wb_data, xpath):
return ''.join(self._xpath_to_text(wb_data, xpath))


class CssParserMixin:
def css_to_ele(self, wb_data, css):
yield from pq(wb_data)(css).items()

def css_ele_to_text(self, ele):
return ele.text()

def _css_to_text(self, wb_data, css):
for ele in self.css_to_ele(wb_data, css):
yield self.css_ele_to_text(ele)

def css_to_text(self, wb_data, css):
return ''.join(self._css_to_text(wb_data, css))


class BaseParser(ReParserMixin, XpathParserMixin, CssParserMixin):
def __init__(self, wb_data):
self.wb_data = wb_data

def xpath(self, xpath):
return self.xpath_to_text(self.wb_data, xpath)

def css(self, css):
return self.css_to_text(self.wb_data, css)

def re(self, re_stirng):
return self.re_all(self.wb_data, re_stirng)

class Parser(BaseParser):
pass

win中启动celery

1
2
python run_celery.py worker -l info  --pool=solo
python run_celery.py beat -A app