在协程里使用yield 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import asyncioasync def func (): await asyncio.sleep(1 ) for x in range (12 ): yield x**2 def func2 (yy ): print (yy) async def main (): async for each in func(): func2(each) loop = asyncio.get_event_loop() loop.run_until_complete(main())
抽离出每一个对象的设计代码结构 设计代码结构发生混乱的时候,使用功能来划分。就像Scrapy分成了url分发器,request,repose等等
这就是真正的面向对象
编程,抽离出每一个对象
要注意还原这个对象最本质的功能 , 比如说一个downloader , 他最本质的功能就是接收一个url,然后去下载,返回response , 而不是从调度器里拿到url,然后去下载,返回response
考虑三个方向 : 传入参数 , 功能 , 传出数据
最好的方法就是考虑这个方法的方法名是什么 , 然后考虑这三点
这种考虑的直接结果就是 :要使用单数 , 不要使用for
两种方案的区别:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class Downloader : @classmethod async def run (cls ): request = await scheduler.get_request() async with aiohttp.ClientSession() as sesssion: try : async with sesssion.get(url=request.url, headers=request.headers, cookies=request.cookies) as resp: if resp.status == 200 : content = await resp.text() return Response(content=content, callback=request.callback, cookies=request.cookies, meta=request.meta) else : print (f'--- request status error:{request.url} ---' ) except aiohttp.ClientConnectionError: print (f'--- requests connection error:{request.url} ---' )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 class Downloader : @classmethod async def run (cls,request ): async with aiohttp.ClientSession() as sesssion: try : async with sesssion.get(url=request.url, headers=request.headers, cookies=request.cookies) as resp: if resp.status == 200 : content = await resp.text() return Response(content=content, callback=request.callback, cookies=request.cookies, meta=request.meta) else : print (f'--- request status error:{request.url} ---' ) except aiohttp.ClientConnectionError: print (f'--- requests connection error:{request.url} ---' )
async for 使用
1 2 3 4 async def func (): await asyncio.sleep(1 ) for x in range (12 ): yield x**2
之后必须使用async for each in func()接收
1 2 3 async def main (): async for each in func(): func2(each)
将协程视为单线程 遇到问题时,将协程视为单线程,这样有利于解决bug
调度器停止问题
使用while True:, 不要检测是否为空,以此作为退出的判断。
我们可以在需要停止的时候返回一个标志,
然后就可以break了
在__init__
里面获取实例方法 使用self.
就可以在__init__
里面获取实例方法了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 class A : def __init__ (self ): self.parspe = self.func self.parspe2 = __class__.func2 def func (self ): return 'a' @classmethod def func2 (cls ): return 'b' a = A() print (a.parspe)print (a.parspe2)
Flask中,Model的设计 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 class CRUDMixin (object ): __table_args__ = {'extend_existing' : True } id = db.Column(db.Integer, primary_key=True ) @classmethod def get_by_id (cls, id ): if any ((isinstance (id , str ) and id .isdigit(), isinstance (id , (int , float ))),): return cls.query.get(int (id )) return None @classmethod def create (cls, **kwargs ): instance = cls(**kwargs) return instance.save() def update (self, commit=True , **kwargs ): for attr, value in kwargs.items(): setattr (self, attr, value) return commit and self.save() or self def save (self, commit=True ): db.session.add(self) if commit: db.session.commit() return self def delete (self, commit=True ): db.session.delete(self) return commit and db.session.commit()
继承CRUDMixin :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 class Cloud_Domains (CRUDMixin, db.Model ): """ 域名表 """ __tablename__ = "zms_cloud_domain" id = db.Column(db.Integer, autoincrement=True , primary_key=True ) DomainID = db.Column(db.String(20 ), nullable=False , unique=True , comment="域名实例ID" ) DomainName = db.Column(db.String(255 ), nullable=False , comment="域名" ) DomainStatus = db.Column(db.String(20 ), nullable=False , comment="域名状态" ) ServerName = db.Column(db.String(10 ), nullable=False , comment="域名所在云" ) Created_at = db.Column(db.DateTime, default=datetime.datetime.now, comment="注册日期" ) End_at = db.Column(db.DateTime, nullable=False , comment="结束日期" ) sid = db.Column(db.ForeignKey("zms_cloud_subject.sid" , ondelete="SET NULL" )) @classmethod def get_domain_list (cls, page ): domains = cls.query.order_by("id" ).paginate(page, per_page=10 ) return domains @property def to_dict (self, filter =[] ): d = {} for c in self.__table__.columns: value = getattr (self, c.name, None ) if c.name in filter : continue elif isinstance (value, datetime.date): d[c.name] = value.strftime('%Y-%m-%d' ) elif isinstance (value, datetime.datetime): d[c.name] = value.strftime('%Y-%m-%d %H:%M:%S' ) else : d[c.name] = value return d
使用callbackdict 1 2 3 4 5 6 7 callback_dict = { f'http://whois.chinaz.com/{domain} ' : parse_chinaz, f'http://whois.xinnet.com/domains/{domain} ' : parse_xinnet, } for url, parser in callback_dict.items(): ...
而不是
1 2 3 4 5 6 7 8 9 10 11 12 13 14 domain_url = [ f'http://whois.chinaz.com/{domain} ' , f'http://whois.xinnet.com/domains/{domain} ' ] parser = [ parse_chinaz, parse_xinnet, ] url_parse = [ (url,parse) for url,parse in zip (domain_url,parser) ]
Flask的Model的写法推荐 要写comment
1 server_id = db.Column(db.Integer, primary_key=True , comment="服务器ID(内部)" )
使用"""
文档注释 解释说明各个字段
1 2 3 4 5 6 7 8 9 10 11 12 13 14 class Servers (db.Model ): """ 服务器管理 :server_id: int 服务器ID(内部) :instance_id: string 实例ID :host_name: string 实例名称 :cpu: int CPU 核数 """ __tablename__ = "zms_cloud_servers" server_id = db.Column(db.Integer, primary_key=True , comment="服务器ID(内部)" ) instance_id = db.Column(db.String(50 ), unique=True , comment="实例ID" ) instance_name = db.Column(db.String(100 ), comment="实例名称" ) cpu = db.Column(db.SmallInteger, comment="CPU 核数" )
Flask中不要使用app,而是使用current_app 1 2 3 4 5 6 from app import myModelfrom flask import current_appfrom current_app import myModel
postgresql 报错 列不存在 原因 : 字符串必须使用单引号 , 不能使用双引号
测试,调bug
调bug的时候需要将全部无关的代码注释掉
注意使用Pycharm的dubug功能
Flask_Model中使用Enum 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 status = db.Column(db.Enum(ServerStatusEnum), index=True , comment="实例状态" ) class ServerStatusEnum (Enum ): """ 服务器状态枚举类型 """ RUNNING = "运行中" STARTING = "启动中" STOPPING = "停止中" STOPPED = "已停止" PENDING = "创建中" LAUNCH_FAILED = "创建失败" REBOOTING = "重启中" SHUTDOWN = "停止待销毁" TERINATING = "销毁中"
1 2 3 4 5 6 7 8 9 10 class EmployeeForm (FlaskForm ): department = SelectField( '所在部门' ,choices=[DEPARTEMT_CHOICE]) DEPARTEMT_CHOICE = [ (1 , '总经办' ), (2 , '人力资源部' ), (3 , '技术部' ), (4 , '商务部' ), (5 , '渠道产品部' ), ]
Flask中使用Update 1 2 3 4 def update (self, commit=True , **kwargs ): for attr, value in kwargs.items(): setattr (self, attr, value) return commit and self.save() or self
CURDMixin 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 class CRUDMixin (object ): __table_args__ = {'extend_existing' : True } id = db.Column(db.Integer, primary_key=True ) @classmethod def get_by_id (cls, id ): if any ((isinstance (id , str ) and id .isdigit(), isinstance (id , (int , float ))),): return cls.query.get(int (id )) return None @classmethod def create (cls, **kwargs ): instance = cls(**kwargs) return instance.save() def update (self, commit=True , **kwargs ): for attr, value in kwargs.items(): setattr (self, attr, value) return commit and self.save() or self def save (self, commit=True ): db.session.add(self) if commit: try : db.session.commit() except Exception as e: db.session.rollback() raise e return self def delete (self, commit=True ): if commit: try : db.session.add(self) return db.session.commit() except Exception as e: db.session.rollback() raise e
Flask一定要先创建表
各种测试都应该在Flask shell中进行 数据库的创建和操作可以写在db.py以便之后使用 尤其是在shell中
.env文件 1 2 3 4 # .env MAIL_SERVER=smtp.qq.com MAIL_USERNAME=367224698@qq.com MAIL_PASSWORD=ljfitqzfphlibjdj
1 2 3 4 5 6 7 8 app.config.update( MAIL_SERVER = os.getenv('MAIL_SERVER' ), MAIL_PORT = 678 , MAIL_USE_TLS = True , MAIL_USEERNAME = os.getenv('MAIL_USERNAME' ), MAIL_PASSWORD = os.getenv('MAIL_PASSWORD' ), MAIL_DEFAULT_SENDER = ('Grey Li' , os.getenv('MAIL_USERNAME' )) )
Flask_email 默认发信人由一个两元素元祖组成,即(姓名,邮箱地址),比如:
1 MAIL_DEFAULT_SENDER = ('Your Name' , 'your_name@example.com' )
写数据库 查询语句 尽量返回对象 不要在函数里面也写一堆数据处理, 最好是这种返回形式
1 2 def get_model_data (cls ): return cls.query.all ()
爬虫的时间戳参数 一般逻辑为 : 后台获取用户传入的时间戳参数A , 然后获取当前时间B ,如果B-A小于1000,说明此时请求是在1秒之前请求的,为正常请求
面向切面编程 有A,B,C三个方法,但是在调用每一个方法之前,要求打印一个日志:某一个方法被开始调用了!
在调用每个方法之后,也要求打印日志:某个方法被调用完了!
一般人会在每一个方法的开始和结尾部分都会添加一句日志打印吧,这样做如果方法多了,就会有很多重复的代码,显得很麻烦,这时候有人会想到,为什么不把打印日志这个功能封装一下,然后让它能在指定的地方(比如执行方法前,或者执行方法后)自动的去调用呢?如果可以的话,业务功能代码中就不会掺杂这一下其他的代码,所以AOP就是做了这一类的工作,比如,日志输出,事务控制,异常的处理等。。
如果把AOP当做成给我们写的“业务功能”增添一些特效,就会有这么几个问题:
我们要制作哪些特效
这些特效使用在什么地方
这些特效什么时候来使用
Flask出现关系不存在错误 1 2 sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedTable) 错误: 关系 "zms_cloud_domain" 不存在
很有可能是数据表不存在 , 此错误经常出现在测试中
测试数据库 1 SQLALCHEMY_DATABASE_URI = 'sqlite:///:memory:'
可以把navibar的[item]提取出来形成一个macro 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 {# navi.html #} {% macro navbar(items, search=False) %} {%- for item in items %} <li class="nav-item d-none d-sm-inline-block"> <a href="{{ item['link'] }}" class="nav-link">{{ item['name'] }}</a> </li> {%- endfor %} {%- if search -%} <!-- SEARCH FORM --> <form class="form-inline ml-3"> <div class="input-group input-group-sm"> <input class="form-control form-control-navbar" type="search" placeholder="Search" aria-label="Search"> <div class="input-group-append"> <button class="btn btn-navbar" type="submit"> <i class="fas fa-search"></i> </button> </div> </div> </form> {% endif %} {% endmacro %}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 {# base.html #} {% import "navi.html" as navi %} {% block sidebar %} {# 调用navi.html文件你的navbar函数 #} {{ navi.navbar([ {'name': '控制台', 'link': '#', 'children': [ {'name': '服务器管理', 'link': '/servers', 'children': [ {'name': '服务器展示', 'link': '/servers/index'}, {'name': '服务器主体', 'link': '/servers/subject_index'}, {'name': '服务器列表', 'link': '/servers/server_index'}, ]}, {'name': '域名', 'link': '/domains', 'children': [ {'name': '腾讯云', 'link': '/domains/qcloud', 'children':[]}, {'name': '阿里云', 'link': '/domains/aliyun', 'children':[]}, ]}, {'name': '紧急联系人', 'link': '/alarm', 'children': []}, ]}, {'name': '账户', 'link': '#', 'children': [ {'name': '登出', 'link': 'logout', children: []} ]} ], active=None, brand={'name': 'ZKOP', 'link': '#', 'img': '/static/img/AdminLTELogo.png'}, user={'name': 'admin', 'link': '#', 'img': '/static/img/avatar5.png'}) }} {% endblock %}
封装field 1 2 3 4 5 6 7 8 9 10 11 12 13 {% macro render_field(field) %} {% with errors = field.errors %} <div class="form-group{{ 'has-error' if errors }}"> {{ field.label(class="control-label") }} {{ field(class='form-control', **kwargs)|safe }} {% if errors %} {% for error in errors %} <span class="help-block" style="color: red">{{ error }}</span> {% endfor %} {% endif %} </div> {% endwith %} {% endmacro %}
a标签href属性直接写入js 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 <a href="javascript:DeleteDomain({{ item.id }})">删除</a> <script> function DeleteDomain(domain_id) { let data= {"domain_id": domain_id}; $.ajax({ type: "POST", url: "{{ url_for('.domains_delete') }}", data: data, dataType: 'json', success: function (data) { if (data.code == 200) { layer.msg("域名删除成功", { time: 3000, end: function () { window.location.reload() } }); } else { layer.msg(data.error_msg, { time: 3000, end: function () { } }); } }, error: function (jqXHR, textStatus, errorThrown) { /*错误信息处理*/ console.log(jqXHR, textStatus, errorThrown) } }) } </script>
对于一个可以直接继承的,就算再少也要新建一个文件 1 {% extends "domains/index.html" %}
使用两个base.html
我们在使用Admin-TLE的时候 , 可以设计两个base.html : admin-lte/base.html
和base.html
这样做就将admin-lte/base.html
视为一个被调用的库 , 而base.html
就是一个实例化的admin-lte/base.html
.更加的清晰
复写extend的内容 1 2 3 4 {# base.html #} {% block content_header %} {{ content.header(title) }} {% endblock content_header %}
1 2 3 4 5 6 {# index.html #} {% extends "base.html" %} {% block content_header %} {{ content.header(title) }} {% endblock content_header %}
这样的好处就是能提醒自己这个template中有这个block
原因 , Flaskform的validate_name全靠本身的validate()方法进行验证 , 如果Flaskform的validate()被复写,验证就会失效
所以需要在新的validate()调用FlaskForm的validate()
1 2 3 4 5 6 7 8 9 10 11 def validate (self ): rv = FlaskForm.validate(self) if not rv: return False self.domain = Domains.query.filter_by(name=self.name.data).first() if self.domain: self.name.errors.append(u'域名已存在,请重新输入' ) return False return True
在validate()里调用name属性的方式就是self.name.data
1 2 3 4 5 6 7 8 9 10 11 12 13 {% macro render_field(field) %} {% with errors = field.errors %} <div class="form-group{{ 'has-error' if errors }}"> {{ field.label(class="control-label") }} {{ field(class='form-control', **kwargs)|safe }} {% if errors %} {% for error in errors %} <span class="help-block" style="color: red">{{ error }}</span> {% endfor %} {% endif %} </div> {% endwith %} {% endmacro %}
1 2 3 4 5 6 7 8 9 10 11 <div class="card-body"> <form method="post"> {{ form.csrf_token }} {{ render_field(form.types) }} {{ render_field(form.rr) }} {{ render_field(form.line) }} {{ render_field(form.value) }} {{ render_field(form.ttl) }} {{ form.submit_bt(class_="btn btn-primary") }} </form> </div>
jinjia中,block不要分离 1 2 3 4 5 <head> {%- block head %} <link rel="stylesheet" href="https:xxx.css"> {%- endblock head %} </head>
调用
1 2 3 4 {%- block head %} {{ super() }} <link rel="stylesheet" href="https:xxx.css"> {%- endblock head %}
不要写成
1 2 3 4 5 <head> <link rel="stylesheet" href="https:xxx.css"> {%- block head %} {%- endblock head %} </head>
调用
1 2 3 {%- block head %} <link rel="stylesheet" href="https:xxx.css"> {%- endblock head %}
另类使用post 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 var iframes = $(layero).find("iframe" )[0 ].contentWindow;var form = iframes.document.getElementById("AddSubject" );var app_count = $(form).find("#AppCount" ).val();var app_id = $(form).find("input[data='app-id']" ).val();var app_key = $(form).find("#AppKey" ).val();var app_secret = $(form).find("#AppSecret" ).val();var service_isp = $(form).find('#ServiceIsp' ).val();var data = {"app_count" : app_count,"app_id" : app_id,"app_key" : app_key,"app_secret" : app_secret,"service_isp" : service_isp,"sid" : 0 ,}; console .log(data);$.ajax({ type : "POST" , url : $(form).attr("action" ), data : data, dataType : 'json' , success : function (data ) {}
重构代码的时候一定要考虑实现某个函数的需要什么参数,在此基础上考虑使用类还是函数 gitlab迁移项目 仓库 – settings – general – advanced – Transfer project
之后删除全部的远程连接
1 2 git remote rm origin git remote rm old-origin
添加新的远程连接
1 git remote add origin https://gitlab-inet.zkyouxi.com/pydev/intern/operat-plat form-server/operat-platform_celery.git
提交
1 git push -u origin --all
__hash__
与 __eq__
可哈希的集合(hashed collections),需要集合的元素实现了__eq__
和__hash__
,而这两个方法可以作一个形象的比喻:
哈希集合就是很多个桶,但每个桶里面只能放一个球。__hash__
函数的作用就是找到桶的位置,到底是几号桶。
__eq__
函数的作用就是当桶里面已经有一个球了,但又来了一个球,它声称它也应该装进这个桶里面(__hash__
函数给它说了桶的位置),双方僵持不下,那就得用__eq__
函数来判断这两个球是不是相等的(equal),如果是判断是相等的,那么后来那个球就不应该放进桶里,哈希集合维持现状。
为什么要这么设计 : 为了哈希从映射
我们以整形作为键的时候 , 很容易哈希相同 , 也就是__hash__
返回的值相同,这时需要使用__eq__
进行判断
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 class A : def __init__ (self,name ): self.name = name def __hash__ (self ): return hash (self.name) def __eq__ (self, other ): return self.name == other.name def __str__ (self ): return f'{self.name} ' __repr__ = __str__ a = A(name='hyl' ) b = A(name='asd' ) c = A(name='hyl' ) d = {} for each in ((a,1 ),(b,2 ),(c,3 )): d[each[0 ]] = each[1 ] print (d)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 class A : def __init__ (self,name ): self.name = name def __hash__ (self ): return hash (self.name) def __str__ (self ): return f'{self.name} ' __repr__ = __str__ a = A(name='hyl' ) b = A(name='asd' ) c = A(name='hyl' ) d = {} for each in ((a,1 ),(b,2 ),(c,3 )): d[each[0 ]] = each[1 ] print (d)
requests.post中json参数和data参数的区别 参数中明确的参数有data
与json
data
与json
既可以是str
,也可以是dict
区别如下:
不管json
是str
还是dict
,如果不指定headers
中的content-type
,默认为application/json
data
为dict
时,如果不指定content-type
,默认为application/x-www-form-urlencoded
,相当于普通form表单提交的形式,此时数据可以从request.POST
里面获取,而request.body
的内容则为a=1&b=2
的这种形式,注意,即使指定content-type=application/json
,request.body的值也是类似于a=1&b=2
,所以并不能用json.loads(request.body.decode())
得到想要的值
data
为str
时,如果不指定content-type
,默认为application/json
避免使用getattr 每当我们想用gettattr的时候
1 getattr (celery_logger, log_type)
可以看下源码 , 一般源码就会有有关汇总的函数
1 2 3 def debug (self, msg, *args, **kwargs ): if self.isEnabledFor(DEBUG): self._log(DEBUG, msg, args, **kwargs)
此时的_log就是那个汇总的函数
连接数据库
Flask-SQLAlchemy 可以容易地连接到多个数据库。
为了实现 这个功能,预配置了 SQLAlchemy 来支持多个“binds”。
1 2 3 4 5 6 7 8 9 10 11 class DevConfig (object ): SQLALCHEMY_BINDS = { "domain" : 'postgresql://%s:%s@%s:%s/%s' % (POSTGRES_USER, POSTGRES_PASS, POSTGRES_HOST, POSTGRES_PORT, POSTGRES_DB), "operat-platform" : 'postgresql://intern:intern@148.70.220.176:5432/operat-platform' }
1 2 3 4 5 class CloudManager (db.Model ): __tablename__ = "zms_cloud_subject" __bind_key__ = 'operat-platform' sid = db.Column(db.Integer,autoincrement=True ,primary_key=True ,comment="主体id" )
cls.attrname的本质 cls.attrname 本质就是cls.__dict__[attrname]
CRUDMixin 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 class CRUDMixin (object ): @classmethod def get (cls, id ): if any ((isinstance (id , str ) and id .isdigit(), isinstance (id , (int , float ))),): return cls.query.get(int (id )) return None @classmethod def get_by (cls, **kwargs ): return cls.query.filter_by(**kwargs).first() @classmethod def all (cls ): return cls.query.all () @classmethod def filter_by (cls, **kwargs ): return cls.query.filter_by(**kwargs).all () @classmethod def create (cls, **kwargs ): instance = cls(**kwargs) return instance.save() def update (self, commit=True , **kwargs ): for attr, value in kwargs.items(): setattr (self, attr, value) return commit and self.save() or self def save (self, commit=True ): db.session.add(self) if commit: db.session.commit() return self def delete (self, commit=True ): db.session.delete(self) if commit: return db.session.commit() return self def dump (self ): return {c.name: getattr (self, c.name, None ) for c in self.__table__.columns}
注意dump的写法
1 2 3 def dump (self ): return {c.name: getattr (self, c.name, None ) for c in self.__table__.columns}
不要写成
1 2 3 def dump (self ): return {c.name: getattr (self, c.name, None ) for c in self.__dict__}
之后就可以使用
1 2 3 4 @bp.route('/' , methods=['POST' ] ) def create (): CloudManager.create(**request.json) return make_response()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @domains.route('/records/<int:id>/update' , methods=['GET' , 'POST' ] ) def update_record (id ): res = DomainRecord.get(id ) print (res) if not res or not res['state' ]: flash('更新失败' , 'danger' ) return redirect(url_for('domains.list_domains' )) form = UpdateDomainRecordForm(**res['data' ]['record' ]) if form.validate_on_submit(): res = DomainRecord.update(RecordID=id , **form.dump()) if not res or not res['state' ]: flash('更新失败' , 'danger' ) return redirect(url_for('domains.update_record' )) flash('更新成功' , 'success' ) return redirect(url_for('domains.list_records' , DomainName=form.DomainName.data)) return render_template('domains/records/update.html' , form=form)
手动封装一个返回JSON的方法 1 2 3 4 def make_response (state=1 , code=200 , msg='OK' , data=None ): if data is None : return jsonify({'state' : state, 'code' : code, 'msg' : msg}) return jsonify({'state' : state, 'code' : code, 'msg' : msg, 'data' : data})
文档注释示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 class BaseResource (object ): '''Base Resouce''' @classmethod def get (cls, id ): '''Get a resource item by id. Args: cls: Resource class id: Resource id Returns: A dict containing resource data if the query succeeds, None otherwise. ''' raise NotImplementedError()
base类示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 class BaseResource (object ): '''Base Resouce''' @classmethod def get (cls, id ): '''Get a resource item by id. Args: cls: Resource class id: Resource id Returns: A dict containing resource data if the query succeeds, None otherwise. ''' raise NotImplementedError() @classmethod def get_by (cls, **kwargs ): '''Get a resource item by some attributes. Args: cls: Resource class **kwargs: Attributes Returns: A dict containing resource data if the query succeeds, None otherwise. ''' raise NotImplementedError() @classmethod def all (cls ): '''Get all resources. Args: cls: Resource class Returns: A list of dict containing resource data. ''' raise NotImplementedError() @classmethod def filter_by (cls, **kwargs ): '''Get resource list by some attributes. Args: cls: Resource class **kwargs: Attributes Returns: A list of dict containing resource data. ''' raise NotImplementedError() @classmethod def create (cls, **kwargs ): '''Create a new resource. Args: cls: Resource class **kwargs: Attributes Returns: A dict containing the new resource data. ''' raise NotImplementedError() @classmethod def update (cls, id , **kwargs ): '''Update a resource. Args: cls: Resource class id: The id of the resource to be updated **kwargs: The update attributes Returns: True for success, and False for failure ''' raise NotImplementedError() @classmethod def delete (cls, id ): '''Delete a resource. Args: cls: Resource class id: The id of the resource to be deleted Returns: True for success, and False for failure ''' raise NotImplementedError()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 from app.resource.base import BaseResourcefrom app.utils import requestsclass Resource (BaseResource ): get_url = None '''URL template for get(), accepts one variable: id''' get_by_url = None '''URL string for get_by()''' all_url = None '''URL string for all()''' filter_by_url = None '''URL string for filter_by()''' create_url = None '''URL string for create()''' update_url = None '''URL template for update(), accepts one variable: id''' delete_url = None '''URL template for update(), accepts one variable: id''' @classmethod def get (cls, id ): '''Get a resource item by id. Args: cls: Resource class id: Resource id Returns: A dict containing resource data if the query succeeds, None otherwise. ''' return requests.get(cls.get_url.format (id )) @classmethod def get_by (cls, **kwargs ): '''Get a resource item by some attributes. Args: cls: Resource class **kwargs: Attributes Returns: A dict containing resource data if the query succeeds, None otherwise. ''' return requests.get(cls.get_by_url, params=kwargs) @classmethod def all (cls ): '''Get all resources. Args: cls: Resource class Returns: A list of dict containing resource data. ''' return requests.get(cls.all_url) @classmethod def filter_by (cls, **kwargs ): '''Get resource list by some attributes. Args: cls: Resource class **kwargs: Attributes Returns: A list of dict containing resource data. ''' return requests.get(cls.filter_by_url, params=kwargs) @classmethod def create (cls, **kwargs ): '''Create a new resource. Args: cls: Resource class **kwargs: Attributes Returns: A dict containing the new resource data. ''' return requests.post(cls.create_url, json=kwargs) @classmethod def update (cls, id , **kwargs ): '''Update a resource. Args: cls: Resource class id: The id of the resource to be updated **kwargs: The update attributes Returns: True for success, and False for failure ''' return requests.put(cls.update_url.format (id ), json=kwargs) @classmethod def delete (cls, id ): '''Delete a resource. Args: cls: Resource class id: The id of the resource to be deleted Returns: True for success, and False for failure ''' return requests.delete(cls.delete_url.format (id ))
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from app.resource import Resourcefrom app.utils import requestsfrom app.config import BaseConfigclass AlarmPerson (Resource ): base_url = BaseConfig.RC_ALARM_PERSON_BASE_URL all_url = base_url + '/show_alarm' create_url = base_url + '/add_alarm' delete_url = base_url + '/delete_alarm' @classmethod def delete (cls, id ): return requests.post(cls.delete_url, json={'id' : id }) class Employee (Resource ): base_url = BaseConfig.RC_EMPLOYEE_BASE_URL filter_by_url = base_url + '/show_employee'
所以总共设计三层
base层 : 此层必须高度抽象,规定必须要实现的方法
中间层: 实现base层的所有方法
实现层 : 部分实现中间层的方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 class BaseDomainRecordForm (FlaskForm, FormDataMixin ): DomainName = SelectField('域名' ) RR = StringField('主机记录' ) Line = StringField('记录' ) TTL = IntegerField('TTL' ) Types = StringField('类型' ) Value = StringField('值' ) Priority = IntegerField('MX优先级' ) def __init__ (self, *args, **kwargs ): super (BaseDomainRecordForm, self).__init__(*args, **kwargs) self.DomainName.choices = [(d['DomainName' ], d['DomainName' ]) for d in Domain.all ()['data' ]['domains' ]] class CreateDomainRecordForm (BaseDomainRecordForm ): submit = SubmitField('创建' ) class UpdateDomainRecordForm (BaseDomainRecordForm ): submit = SubmitField('更新' )
就像model有mixin一样 , Form也可以设计Mixin
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 from wtforms.fields import HiddenField, SubmitFielddef is_hidden_wtf_field (field ): return isinstance (field, HiddenField) class FormDataMixin (object ): def dump (self ): data = {} for k, v in self.data.items(): field = getattr (self, k) if isinstance (field, HiddenField): continue if isinstance (field, SubmitField): continue data[k] = v return data
还可以避免使用popular_obj方法
1 2 3 4 5 6 7 8 9 10 11 12 @domains.route('/records/create' , methods=['GET' , 'POST' ] ) def create_record (): form = CreateDomainRecordForm() if form.validate_on_submit(): res = DomainRecord.create(**form.dump()) if not res or not res['state' ]: flash('创建失败' , 'danger' ) return redirect(url_for('domains.create_record' )) flash('创建成功' , 'success' ) return redirect(url_for('domains.list_records' , DomainName=form.DomainName.data)) return render_template('domains/records/create.html' , form=form)
注意 :BaseForm设计的时候不要有ID (主键) , 尤其是UpdateForm
这样的话 : add(插入数据库的时候)没有id,ORM 就会自动生成
update的时候一定要给视图函数传入id , 使用res = DomainRecord.update(RecordID=id, **form.dump())
即可
设计函数的时候可以通过默认值来使这个函数更具拓展性 1 2 3 4 5 from datetime import datetimedef fromtimestamp (timestamp, format ='%Y-%m-%d %H:%M:%S' ): print (timestamp) return datetime.fromtimestamp(timestamp).strftime(format )
警告 1 2 3 4 5 <script > $(".delete-link" ).click(function ( ) { return confirm("确认删除吗?" ); }) </script >
Docker访问 Linux中可以直接使用127访问容器里面的IP win中需要在192.168.99.100里访问(这个是容器的默认IP,有多个容器的话就使用其他的IP)
原因:docker是运行在Linux上的,在Windows中运行docker,实际上还是在Windows下先安装了一个Linux环境,然后在这个系统中运行的docker。 也就是说,服务中使用的localhost指的是这个Linux环境的地址,而不是我们的宿主环境Windows10。
coverage测试覆盖率 1 2 coverage run --source =celery_app -m pytest coverage report
注意--source=celery_app
和 -m pytest
不能颠倒
生成覆盖报告(htmlcov是文件夹名,任取)
1 coverage html -d htmlcov
Docker网络 Docker 强大的原因之一在于多个 Docker 容器之间的互相连接。涉及到连接,就引出了网络通信的几种模式。Docker 默认提供了 5 种网络驱动模式。
bridge: 默认的网络驱动模式。如果不指定驱动程序,bridge 便会作为默认的网络驱动模式。当应用程序运行在需要通信的独立容器 (standalone containers) 中时,通常会选择 bridge 模式。
host:移除容器和 Docker 宿主机之间的网络隔离,并直接使用主机的网络。host 模式仅适用于 Docker 17.06+。
overlay:overlay 网络将多个 Docker 守护进程连接在一起,并使集群服务能够相互通信。您还可以使用 overlay 网络来实现 swarm 集群和独立容器之间的通信,或者不同 Docker 守护进程上的两个独立容器之间的通信。该策略实现了在这些容器之间进行操作系统级别路由的需求。
macvlan:Macvlan 网络允许为容器分配 MAC 地址,使其显示为网络上的物理设备。 Docker 守护进程通过其 MAC 地址将流量路由到容器。对于希望直连到物理网络的传统应用程序而言,使用 macvlan 模式一般是最佳选择,而不应该通过 Docker 宿主机的网络进行路由。
none:对于此容器,禁用所有联网。通常与自定义网络驱动程序一起使用。none 模式不适用于集群服务。
1 docker run -it --name box4 --rm --network my-net busybox sh
docker links
默认 bridge 网络上的容器只能通过 IP 地址互相访问,除非使用在 docker run 时添加 —link
参数。
但是--link
参数已经被标记为过时 , 我们可以使用docker-compose的links标签
docker-compose的links其实就是将redis的内部IP写入flask的hosts文件中
如果只是单纯修改docker-compose文件 ,就不需要重新build一遍
查看本机IP
或者
连接远程redis 必须要把conf里的bind
取消注释 , 改为bind 0.0.0.0
之后我们就可以在本机里连接远程redis了
1 redis-cli -h ip地址 -p 端口号
远程传输文件 1 2 3 scp my_local_file.zip root@192.168.1.104:/usr/local /nginx/html/webs scp redis.conf root@172.16.1.13:/apps/operat-platform/op-tasks/
多文件传入
1 scp index.css json.js root@192.168.1.104:/usr/local /nginx/html/webs
复制整个文件夹
从本地文件复制整个文件夹到远程主机上(文件夹假如是diff) 先进入本地目录下,然后运行如下命令:
1 scp -v -r diff root@192.168.1.104:/usr/local /nginx/html/webs
从远程主机复制整个文件夹到本地目录下(文件夹假如是diff) 先进入本地目录下,然后运行如下命令:scp -r root@192.168.1.104 :/usr/local/nginx/html/webs/diff .
docker-compose的command执行多条命令 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 version: '3' services: op-tasks: build: . command: - /bin/bash - -c - | gunicorn -c gunicorn.py app:app & python run_celery.py worker -l info --beat -c 1 container_name: flask-op-tasks env_file: - optasks-variables.env ports: - '50020:5000' depends_on: - redis links: - redis network_mode: bridge redis: image: redis container_name: redis network_mode: bridge
另类导入模块 1 2 import importlibmod = importlib.import_module('hello' )
相当于
钩子函数 钩子函数、注册函数、回调函数,他们的概念其实是一样的。
钩子函数,顾名思义,就是把我们自己实现的hook函数 在某一时刻挂接 到目标挂载点 上。
钩子函数是指在执行函数和目标函数之间挂载的函数, 框架开发者给调用方提供一个point -挂载点, 至于挂载什么函数有我们调用方决定, 这样大大提高了灵活性
1. hook函数 ,就是我们自己实现的函数,函数类型与挂载点匹配(返回值,参数列表)2. 挂接 ,也就是hook或者叫注册(register),使得hook函数对目标可用3. 目标挂载点 ,也就是挂我们hook函数的地方(我们想在这个目标点实现我们自己的功能)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 import time class LazyPerson (object ): def __init__ (self, name ): self.name = name self.watch_tv_func = None self.have_dinner_func = None def get_up (self ): print ("%s get up at:%s" % (self.name, time.time())) def go_to_sleep (self ): print ("%s go to sleep at:%s" % (self.name, time.time())) def register_tv_hook (self, watch_tv_func ): self.watch_tv_func = watch_tv_func def register_dinner_hook (self, have_dinner_func ): self.have_dinner_func = have_dinner_func def enjoy_a_lazy_day (self ): self.get_up() time.sleep(3 ) if self.watch_tv_func is not None : self.watch_tv_func(self.name) else : print ("no tv to watch" ) time.sleep(3 ) if self.have_dinner_func is not None : self.have_dinner_func(self.name) else : print ("nothing to eat at dinner" ) time.sleep(3 ) self.go_to_sleep() def watch_daydayup (name ): print ("%s : The program ---day day up--- is funny!!!" % name) def watch_happyfamily (name ): print ("%s : The program ---happy family--- is boring!!!" % name) def eat_meat (name ): print ("%s : The meat is nice!!!" % name) def eat_hamburger (name ): print ("%s : The hamburger is not so bad!!!" % name) if __name__ == "__main__" : lazy_tom = LazyPerson("Tom" ) lazy_jerry = LazyPerson("Jerry" ) lazy_tom.register_tv_hook(watch_daydayup) lazy_tom.register_dinner_hook(eat_meat) lazy_jerry.register_tv_hook(watch_happyfamily) lazy_jerry.register_dinner_hook(eat_hamburger) lazy_tom.enjoy_a_lazy_day() lazy_jerry.enjoy_a_lazy_day()
为什么需要钩子
大家思考一下上面这个例子,左键按下方法具体的逻辑是由框架自身去实现,还是由我们用户(调用者)去实现呢?显然应该由我们自己去实现。要提供通用的框架能力,框架自身去实现该方法功能,是没有意义的,所以框架给提供一个挂载的point,把具体逻辑的实现交给用户就好了,灵活可用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 class APP : def __init__ (self ): self.before_request_list = [] self.after_request_list = [] def request (self ): for func in self.before_request_list: func() self._request() for func in self.after_request_list: func() def _request (self ): print ('requesting' ) def before_request (self,func ): self.before_request_list.append(func) def after_request (self,func ): self.after_request_list.append(func) a = APP() @a.before_request def request_request_func (): print ('request before1 ....' ) @a.before_request def request_request_func2 (): print ('request before2 ....' ) @a.after_request def request_request_func (): print ('request before ....' ) a.request()
任何不开放给客户端代码使用的方法或属性,应该有一个下划线前缀 。gitlab-cd自动部署 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 services: - redis:alpine variables: PIP_CACHE_DIR: "$CI_PROJECT_DIR/.cache/pip" cache: key: $CI_JOB_NAME-$CI_COMMIT_REF_SLUG paths: - ${PIP_CACHE_DIR} stages: - test - deploy test_job: stage: test image: python:3.7 before_script: - pip install -r test-requirements.txt -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com script: - echo "Test started." - coverage run --source app -m pytest - coverage report - echo "Test finished." deploy_job: stage: deploy when: manual only: - master image: "instrumentisto/rsync-ssh" before_script: - target_address=172.16.1.61 - ssh-keyscan ${target_address} > ~/.ssh/known_hosts - chmod 644 ~/.ssh/known_hosts script: - echo "Deploy started." - project_name=op-gateway - stage_srv=root@${target_address} - work_dir=/root/apps/operat-platform - rsync -arzv --exclude='*.git' ./ ${stage_srv}:${work_dir}/${project_name} - ssh ${stage_srv} "chmod 755 ${work_dir}/${project_name}" - ssh ${stage_srv} "docker-compose -f ${work_dir}/${project_name}/docker-compose.yml up -d --build" - ssh ${stage_srv} "docker-compose -f ${work_dir}/${project_name}/docker-compose.yml ps" - echo "Deploy finished."
网关,网桥
通过字面意思解释就是网络的关口。从技术角度来解释,就是连接两个不同网络的接口。
比如局域网的共享上网服务器就是局域网和广域网的接口。
网关工作在第三层,并且有不可逆性。也就是说,局域网用户可以通过网关直接访问广域网;而广域网用户却无法通过该网关来直接访问局域网用户
网桥,即网络的桥接
。也是用来连接两个网络
的,但是网桥有一个特点,就是网桥有自己独立的IP地址 。既然这样那么它就可以达到路由的作用。也就是说,通过网桥的连接(在设置正确的情况下),可是使两个网络的互相访问是对等的。
网桥工作在数据链路层,将两个局域网(LAN)连起来,根据MAC地址(物理地址)来转发帧,可以看作一个低层的路由器
(路由器工作在网络层,根据网络地址,如IP地址进行转发)。它可以有效地联接两个LAN,使本地通信限制在本网段内,并转发相应的信号至另一网段
网桥通常用于联接数量不多的、同一类型 的网段。网关可以使用不同的格式、通信协议或结构连接起两个系统。
网桥连接网络是相似的 , 网关却不一定
装饰器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 def print_msg (): msg = "I'm closure" def printer (): print (msg) return printer closure = print_msg() closure()
也就是说 , 只有在装饰函数执行的时候 , 装饰的闭包内容才会执行(只有执行了closure() , printer函数内容才会执行)
docker出现某个文件不存在的错误 去看看docker-compose , 看是否开启的volume
如果要下载自己git仓库的东西 , 不需要git remote
, 直接git clone
即可 docker-compose的command如果不使用多条语句 , 不能使用列表 1 2 ERROR: for hyl_zhangkun_oa Cannot start service hyl_zhangkun_oa: OCI runtime create failed: container_linux.go:346: starting container process caused "exec: \"gunicorn -c gunicorn.py app:employee_oa_app\": executable file not found in $PATH": unknown ERROR: Encountered errors while bringing up the project.
远程数据库连接数溢出 1 sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) FATAL: remaining connection slots are reserved for non-replication superuser connections
在测试的时候__bind_key__
还是要绑定相同的键 , 但是连接的数据库不同 1 2 3 4 5 class EmployeeModel (db.Model ): """员工模型""" __tablename__ = "zkyouxi_employee" __bind_key__ = 'hyl_zkyouxi_oa'
1 2 3 4 5 6 7 class TestConfig (object ): "test config class" DEBUG = True SQLALCHEMY_BINDS = { "hyl_zkyouxi_oa" : 'postgresql://intern:intern@172.16.0.5:54322/hyl_zkyouxi_oa_test' }
1 2 3 4 5 6 7 8 class DevConfig (object ): "dev config class" DEBUG = True SQLALCHEMY_BINDS = { "hyl_zkyouxi_oa" : os.environ.get('POSTGRES_URL' ) or 'postgresql://intern:intern@172.16.0.5:54322/hyl_zkyouxi_oa' }
要检测值 不要写成 :
1 2 3 @classmethod def get_emp (cls, emp_id ): return cls.query.get_or_404(emp_id)
而是
1 2 3 4 5 6 @classmethod def get_by_id (cls, id ): if any ((isinstance (id , str ) and id .isdigit(), isinstance (id , (int , float ))), ): return cls.query.get(int (id )) return None
在gitlab.ci时 , SQLALCHEMY_DATABASE_URI的host修改改成非本地 1 2 3 4 5 SQLALCHEMY_DATABASE_URI = "postgresql://postgres:liangbo4869@127.0.0.1:5432/zkyouxi_oa_test" SQLALCHEMY_DATABASE_URI = "postgresql://postgres:liangbo4869@postgres/zkyouxi_oa_test"
我们在设计表的时候力争主键ID是不会用上的(不能将对象的属性设置为PK) 比如说员工的员工号 , 我们可以设计 :
1 2 3 4 5 6 7 class Employee (db.Model, CRUDMixin ): '''员工的模型类''' __tablename__ = 'employees' id = db.Column(db.Integer, primary_key=True ) eid = db.Column(db.String(50 ), unique=True , index=True )
不要让id和eid共用一个
也就是说 , 力求让主键
的唯一作用就是 : 唯一标识 , 他是不参与展示的 (比如说不会在template中,只会出现在template的url中)
不能将对象的属性设置为PK
在包装类使用完整的文档注释 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 class CRUDMixin (object ): '''CRUD Mixin 数据库CRUD操作的包装类 参考:https://github.com/cburmeister/flask-bones/blob/master/app/database.py ''' __table_args__ = {'extend_existing' : True } @classmethod def get_by_id (cls, id ): '''以id查询 Args: cls: 模型类 id: 数据id,要求可转换为数字 Returns: 若id存在,返回找到的实例;否则返回None. ''' if any ((isinstance (id , str ) and id .isdigit(), isinstance (id , (int , float ))),): return cls.query.get(int (id )) return None @classmethod def get_by (cls, **kwargs ): '''以各属性值相等进行单项查询 Args: cls: 模型类 **kwargs: 模型各属性值 Returns: 若存在符合查询的数据,返回首个实例;否则返回None. ''' return cls.query.filter_by(**kwargs).first() @classmethod def all (cls ): '''返回模型所有数据 Args: cls: 模型类 Returns: list: 返回实例列表 ''' return cls.query.all () @classmethod def create (cls, **kwargs ): '''创建新的实例并插入 Args: cls: 模型类 **kwargs: 模型各属性值 Returns: 返回创建得到的实例 ''' instance = cls(**kwargs) return instance.save() def update (self, commit=True , **kwargs ): '''更新实例各项属性 Args: self: 实例 commit (bool): 是否提交到数据库 **kwargs: 更新的属性值 Returns: 返回更新后的实例 ''' for attr, value in kwargs.items(): setattr (self, attr, value) return commit and self.save() or self def save (self, commit=True ): '''保存实例 Args: self: 实例 commit (bool): 是否提交到数据库 Returns: 返回实例 ''' db.session.add(self) if commit: db.session.commit() return self def delete (self, commit=True ): '''删除数据 Args: self: 实例 commit (bool): 是否提交到数据库 Returns: bool: 返回是否成功提交到数据库 ''' db.session.delete(self) return commit and db.session.commit()
不要在template里手写submit标签 1 <button type ="submit" class ="btn btn-block btn-primary" > 添加</button >
1 2 3 4 5 6 class LoginForm (FlaskForm ): '''登录表单类''' username = StringField('用户名' , validators=[DataRequired()]) password = PasswordField('密码' , validators=[DataRequired()]) remember_me = BooleanField('下次自动登录' ) submit = SubmitField('登录' )
在Model中写字段的文档注释 1 2 3 4 5 6 7 8 9 class Department (db.Model ): """ 部门模型 -- 部门id -- 部门号 -- 外键 -- 所属部门(对应id),0為父部門 -- 是否有子部门 """ __tablename__ = "zk_department"
macro中不能使用**kwargs
参数和*args
参数 1 2 3 4 5 6 7 8 9 10 11 12 13 {% macro render_field(field) %} {% with errors = field.errors %} <div class="form-group{{ 'has-error' if errors }}"> {{ field.label(class="control-label") }} {{ field(class='form-control', **kwargs) }} {% if errors %} {% for error in errors %} <span class="help-block" style="color: red">{{ error }}</span> {% endfor %} {% endif %} </div> {% endwith %} {% endmacro %}
分页的做法 1 2 3 4 5 6 7 8 9 @user_router.route("/" , methods=['GET' ] ) def user_index (): '''用户首页视图''' page = request.args.get('page' , 1 , type =int ) pagination = User.users_pagination_display(page) users = pagination.items return render_template("user/user_index.html" , users=users, pagination=pagination)
1 2 3 4 @classmethod def users_pagination_display (cls, page ): users = cls.query.order_by("id" ).paginate(page, per_page=10 ) return users
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 {% for item in users %} <tr> <td> <a href="{{ url_for('user.user_edit',user_id=item.id) }}" target="_blank"> {{ item.id }} </a> </td> <td>{{ item.realname }}</td> <td>{{ item.phone }}</td> <td>{{ item.get_department_display() }}</td> <td>{{ item.get_career_display() }}</td> <td>{{ item.position }}</td> <td>{{ item.get_status_display() }}</td> </tr> {% endfor %}
1 2 3 4 5 <div class="card-footer clearfix"> {% if pagination %} {{ macros.pagination_widget(pagination, '.user_index') }} {% endif %} </div>
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 {% macro pagination_widget(pagination, endpoint) %} <ul class="pagination pagination-sm"> <li {% if not pagination.has_prev %} class="page-item"{% endif %}> <a href=" {% if pagination.has_prev %}{{ url_for(endpoint, page=pagination.prev_num, **kwargs) }}{% else %}#{% endif %}" class="page-link">«</a> </li> {% for p in pagination.iter_pages() %} {% if p %} {% if p == pagination.page %} <li class="page-item active"> <a href="{{ url_for(endpoint, page = p, **kwargs) }}" class="page-link">{{ p }}</a> </li> {% else %} <li class="page-item"> <a href="{{ url_for(endpoint, page = p, **kwargs) }}" class="page-link">{{ p }}</a> </li> {% endif %} {% else %} <li class="page-item disabled"><a href="#" class="page-link">…</a></li> {% endif %} {% endfor %} <li{% if not pagination.has_next %} class="page-item" {% endif %}> <a href=" {% if pagination.has_next %}{{ url_for(endpoint, page=pagination.next_num, **kwargs) }}{% else %}#{% endif %}" class="page-link">»</a> </li> </ul> {% endmacro %}
web_args插件 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from flask import Flaskfrom webargs import fieldsfrom webargs.flaskparser import use_argsapp = Flask(__name__) hello_args = { 'name' : fields.Str(required=True ) } @app.route('/' ) @use_args(hello_args ) def index (args ): return 'Hello ' + args['name' ]
1 2 3 4 5 6 class UpdateDepartmentSchema (Schema ): '''更新部门数据的参数Schema''' id = Integer(required=True , validate=lambda i: \ Department.get_by_id(i) is not None ) name = String(allow_none=True , validate=Length( max =Department.name.type .length))
蓝图 作为初学flask的我们,可以将蓝图理解为没有run方法的Flask对象
多功能类的设计思路 1 2 3 4 5 6 7 8 class Response ( BaseResponse, ETagResponseMixin, ResponseStreamMixin, CommonResponseDescriptorsMixin, WWWAuthenticateMixin, ): pass
设计多个Mixin类
然后让不同功能的类继承不同的Mixin类即可
所以像我们的爬虫 , 可以设计多个RequestMxin类(比如说post请求 , get请求),多个pareser类,(比如说re解析 , Xpath解析 , beatifulsoup解析等等,还可以让他们支持链式操作)
注意这些Mixin
类不应该有__init__
方法
类似于
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 import refrom lxml import etreefrom pyquery import PyQuery as pqclass ReParserMixin : def _re_parse (self, func, wb_data, pattern ): return getattr (re.compile (pattern), func)(wb_data) def re_iter (self, wb_data, pattern ): yield from self._re_parse('finditer' , wb_data, pattern) def re_all (self, wb_data, pattern ): return self._re_parse('findall' , wb_data, pattern) def re_match (self, wb_data, pattern ): return self._re_parse('match' , wb_data, pattern) def re_search (self, wb_data, pattern ): return self._re_parse('search' , wb_data, pattern) class XpathParserMixin : def xpath_to_ele (self, wb_data, xpath ): yield from etree.HTML(wb_data).xpath(xpath) def xpath_ele_to_text (self, ele ): return ele.text def _xpath_to_text (self, wb_data, xpath ): for ele in self.xpath_to_ele(wb_data, xpath): yield self.xpath_ele_to_text(ele) def xpath_to_text (self, wb_data, xpath ): return '' .join(self._xpath_to_text(wb_data, xpath)) class CssParserMixin : def css_to_ele (self, wb_data, css ): yield from pq(wb_data)(css).items() def css_ele_to_text (self, ele ): return ele.text() def _css_to_text (self, wb_data, css ): for ele in self.css_to_ele(wb_data, css): yield self.css_ele_to_text(ele) def css_to_text (self, wb_data, css ): return '' .join(self._css_to_text(wb_data, css)) class BaseParser (ReParserMixin, XpathParserMixin, CssParserMixin ): def __init__ (self, wb_data ): self.wb_data = wb_data def xpath (self, xpath ): return self.xpath_to_text(self.wb_data, xpath) def css (self, css ): return self.css_to_text(self.wb_data, css) def re (self, re_stirng ): return self.re_all(self.wb_data, re_stirng) class Parser (BaseParser ): pass
win中启动celery 1 2 python run_celery.py worker -l info --pool=solo python run_celery.py beat -A app