开发进阶Flask的基础

剧情大概:

一.SQLAlchemy介绍

SQLAlchemy是三个依照Python实现的O哈弗M框架。该框架建立在
DB
API之上,使用关系对象映射实行数据库操作,简言之正是:将类和目标转换到SQL,然后利用数据API执行SQL并获得执行结果。

?

1
pip3 install sqlalchemy

亚洲必赢手机入口 1

组成都部队分:

  • Engine,框架的引擎
  • Connection Pooling
    ,数据库连接池
  • Dialect,选用总是数据库的DB
    API连串
  • Schema/Types,架构和档次
  • SQL Exprression
    Language,SQL表明式语言

SQLAlchemy本人不能操作数据库,其必须以来pymsql等第一方插件,Dialect用于和数据API实行调换,依据安插文件的差异调用不一致的数据库API,从而实现对数据库的操作,如:

SQLAlchemy用三个字符串表示连接消息:

'数据库类型+数据库驱动名称://用户名:口令@机器地址:端口号/数据库名'

 

?

1
2
3
4
5
6
7
8
9
10
11
12
13
MySQL-Python
    mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
    
pymysql
    mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
    
MySQL-Connector
    mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
    
cx_Oracle
    oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
    
更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html

目录

一、pymysql

二、SQLAlchemy

亚洲必赢手机入口 ,目录

一、pymysql

二、SQLAlchemy

  1. SQLAlchemy
  2. flsak-sqlalchemy
  3. flask-script
  4. flask-migrate
  5. Flask的目录结构

二. 使用

一、pymysql

pymsql是Python中操作MySQL的模块,其应用办法和MySQLdb大致同样。

一、pymysql

pymsql是Python中操作MySQL的模块,其行使办法和MySQLdb差不离等同。

 

1. 进行原生SQL语句

import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine

engine = create_engine(
    "mysql+pymysql://root:123@127.0.0.1:3306/t1?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程时,最多等待的时间,超时报错,默认30秒
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置),-1代表永远不回收,即一直被重用
)


def task(arg):
    conn = engine.raw_connection()  #拿到的是一个原生的pymysql连接对象
    cursor = conn.cursor()
    cursor.execute(
        "select * from t1"
    )
    result = cursor.fetchall()
    cursor.close()
    conn.close()


for i in range(20):
    t = threading.Thread(target=task, args=(i,))
    t.start()

  

亚洲必赢手机入口 2亚洲必赢手机入口 3

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=0, pool_size=5)


def task(arg):
    conn = engine.contextual_connect()
    with conn:
        cur = conn.execute(
            "select * from t1"
        )
        result = cur.fetchall()
        print(result)


for i in range(20):
    t = threading.Thread(target=task, args=(i,))
    t.start()

View Code

亚洲必赢手机入口 4亚洲必赢手机入口 5

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine
from sqlalchemy.engine.result import ResultProxy
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=0, pool_size=5)


def task(arg):
    cur = engine.execute("select * from t1")
    result = cur.fetchall()
    cur.close()
    print(result)


for i in range(20):
    t = threading.Thread(target=task, args=(i,))
    t.start()

View Code

留意: 查看连接,进程cmd,mysql中>输入 
show status like ‘Threads%’;

1. 下载安装

#在终端直接运行
pip3 install pymysql

1. 下载安装

#在终端直接运行
pip3 install pymysql

一、SQLAlchemy

2. ORM

开发进阶Flask的基础。2. 运用操作

a. 执行SQL

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pymysql
  
# 创建连接
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1')
# 创建游标
cursor = conn.cursor()
  
# 执行SQL,并返回受影响行数
effect_row = cursor.execute("update hosts set host = '1.1.1.2'")
  
# 执行SQL,并返回受影响行数
#effect_row = cursor.execute("update hosts set host = '1.1.1.2' where nid > %s", (1,))
  
# 执行SQL,并返回受影响行数
#effect_row = cursor.executemany("insert into hosts(host,color_id)values(%s,%s)", [("1.1.1.11",1),("1.1.1.11",2)])
  
  
# 提交,不然无法保存新建或者修改的数据
conn.commit()
  
# 关闭游标
cursor.close()
# 关闭连接
conn.close()

b. 得到新创立数据自增ID

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pymysql
  
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1')
cursor = conn.cursor()
cursor.executemany("insert into hosts(host,color_id)values(%s,%s)", [("1.1.1.11",1),("1.1.1.11",2)])
conn.commit()

# 获取最新自增ID
new_id = cursor.lastrowid

cursor.close()
conn.close()

c. 得到查询数据

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pymysql
  
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1')
cursor = conn.cursor()
cursor.execute("select * from hosts")
  
# 获取第一行数据
row_1 = cursor.fetchone()
  
# 获取前n行数据
# row_2 = cursor.fetchmany(3)
# 获取所有数据
# row_3 = cursor.fetchall()
  
conn.commit()
cursor.close()
conn.close()

注:在fetch数据时遵从顺序进行,能够动用cursor.scroll(num,mode)来移动游标地方,如:

  • cursor.scroll(1,mode=’relative’)     # 相对当前岗位移动
  • cursor.scroll(2,mode=’absolute’)   # 相对相对地方移动

d. fetch数据类型

关于默许获取的多少是元组类型,如若想要得到字典类型的数据,即:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pymysql
  
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1')
  
# 游标设置为字典类型
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
r = cursor.execute("call p1()")
  
result = cursor.fetchone()
  
conn.commit()
cursor.close()
conn.close()

2. 选择操作

a. 执行SQL

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pymysql
  
# 创建连接
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1')
# 创建游标
cursor = conn.cursor()
  
# 执行SQL,并返回受影响行数
effect_row = cursor.execute("update hosts set host = '1.1.1.2'")
  
# 执行SQL,并返回受影响行数
#effect_row = cursor.execute("update hosts set host = '1.1.1.2' where nid > %s", (1,))
  
# 执行SQL,并返回受影响行数
#effect_row = cursor.executemany("insert into hosts(host,color_id)values(%s,%s)", [("1.1.1.11",1),("1.1.1.11",2)])
  
  
# 提交,不然无法保存新建或者修改的数据
conn.commit()
  
# 关闭游标
cursor.close()
# 关闭连接
conn.close()

b. 获得新成立数据自增ID

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pymysql
  
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1')
cursor = conn.cursor()
cursor.executemany("insert into hosts(host,color_id)values(%s,%s)", [("1.1.1.11",1),("1.1.1.11",2)])
conn.commit()

# 获取最新自增ID
new_id = cursor.lastrowid

cursor.close()
conn.close()

c. 得到查询数据

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pymysql
  
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1')
cursor = conn.cursor()
cursor.execute("select * from hosts")
  
# 获取第一行数据
row_1 = cursor.fetchone()
  
# 获取前n行数据
# row_2 = cursor.fetchmany(3)
# 获取所有数据
# row_3 = cursor.fetchall()
  
conn.commit()
cursor.close()
conn.close()

注:在fetch数据时服从顺序举行,能够利用cursor.scroll(num,mode)来移动游标地方,如:

  • cursor.scroll(1,mode=’relative’)     # 绝对当前地点移动
  • cursor.scroll(2,mode=’absolute’)   # 相对相对地点移动

d. fetch数据类型

至于暗许获取的多少是元组类型,假若想要得到字典类型的多寡,即:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pymysql
  
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1')
  
# 游标设置为字典类型
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
r = cursor.execute("call p1()")
  
result = cursor.fetchone()
  
conn.commit()
cursor.close()
conn.close()

1、概述

SQLAlchemy是多个OOdysseyM的框架,O奥迪Q5M就是关联对象映射,具体能够参考Django中的O卡宴M。

职能:援救大家使用类和指标连忙达成数据库操作

 

数据库:

  -原生:MYSQLdb       pymysql

  差距正是 MYSQLdb 不支持python3   pymysql 都帮助

ORM框架

  SQLAlchemy

a. 创立数量库表

二、SQLAlchemy

SQLAlchemy是Python编制程序语言下的一款O奥迪Q5M框架,该框架建立在数据库API之上,使用关系对象映射举办数据库操作,简言之正是:将目的转换到SQL,然后选用数据API执行SQL并获取执行结果。

二、SQLAlchemy

SQLAlchemy是Python编制程序语言下的一款OCR-VM框架,该框架建立在数据库API之上,使用关系对象映射进行数据库操作,简言之正是:将对象转换来SQL,然后选拔数据API执行SQL并拿走执行结果。

2、SQLAlchemy用法  

1、安装

pip3 install sqlalchemy

2、配置

亚洲必赢手机入口 6亚洲必赢手机入口 7

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column
from sqlalchemy import Integer,String,Text,Date,DateTime
from sqlalchemy import create_engine


Base = declarative_base()

class Users(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)


def create_all():
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/duoduo123?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )

    Base.metadata.create_all(engine)

def drop_all():
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/duoduo123?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    Base.metadata.drop_all(engine)

if __name__ == '__main__':
    create_all()

配置

③ 、增加和删除改查的示范

亚洲必赢手机入口 8亚洲必赢手机入口 9

from duoduo.test import Users
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine


engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/duoduo123?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )



SessionFactory = sessionmaker(bind=engine)

#根据Users类对user表进行增删改查
session=SessionFactory()
#增加
#单个
# obj=Users(name='many qian')
# session.add(obj)
# session.commit()
#多个
# session.add_all([
#         Users(name='大娃'),
#         Users(name='二娃')
# ])
# session.commit()

#查  所有
# tes=session.query(Users).all()
# print(tes) #拿到的对象
# for row in tes:
#         print(row.id,row.name)

# tes=session.query(Users).filter(Users.id==3) #> < >= <=
#
# for row in tes:
#     print(row.id,row.name)

#
# tes=session.query(Users).filter(Users.id<=3).first()#> < >= <=
#
# print(tes.id,tes.name)

#删除
# tes=session.query(Users).filter(Users.id==3).delete() #> < >= <=
# session.commit()

#改
# session.query(Users).filter(Users.id ==4).update({Users.name:'三娃'})
# session.query(Users).filter(Users.id ==4).update({'name':'四娃'})
# session.query(Users).filter(Users.id ==4).update({'name':Users.name+'NICE'},synchronize_session=False)
# session.commit()
#
#
# session.close()

增加和删除改查

 肆 、单表常用操作

亚洲必赢手机入口 10亚洲必赢手机入口 11

# ############################## 其他常用 ###############################
# 1. 指定列
# select id,name as cname from users;
# result = session.query(Users.id,Users.name.label('cname')).all()
# for item in result:
#         print(item[0],item.id,item.cname)
# 2. 默认条件and
# session.query(Users).filter(Users.id > 1, Users.name == 'duoduo').all()
# 3. between
# session.query(Users).filter(Users.id.between(1, 3), Users.name == 'duoduo').all()
# 4. in
# session.query(Users).filter(Users.id.in_([1,3,4])).all()
# session.query(Users).filter(~Users.id.in_([1,3,4])).all()
# 5. 子查询
# session.query(Users).filter(Users.id.in_(session.query(Users.id).filter(Users.name=='duoduo'))).all()
# 6. and 和 or
# from sqlalchemy import and_, or_
# session.query(Users).filter(Users.id > 3, Users.name == 'duoduo').all()
# session.query(Users).filter(and_(Users.id > 3, Users.name == 'duoduo')).all()
# session.query(Users).filter(or_(Users.id < 2, Users.name == 'duoduo')).all()
# session.query(Users).filter(
#     or_(
#         Users.id < 2,
#         and_(Users.name == 'duoduo', Users.id > 3),
#         Users.extra != ""
#     )).all()

# 7. filter_by
# session.query(Users).filter_by(name='duoduo').all()

# 8. 通配符
# ret = session.query(Users).filter(Users.name.like('d%')).all()   #%任何的东西
# ret = session.query(Users).filter(~Users.name.like('d_')).all()   #_ 只有一个字符

# 9. 切片
# result = session.query(Users)[1:2]

# 10.排序
# ret = session.query(Users).order_by(Users.name.desc()).all()
# ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()  #desc从大到小  asc从小到大排

# 11. group by
from sqlalchemy.sql import func

# ret = session.query(
#         Users.depart_id,
#         func.count(Users.id),
# ).group_by(Users.depart_id).all()
# for item in ret:
#         print(item)
#
# from sqlalchemy.sql import func
#进行二次筛选,只能用having
# ret = session.query(
#         Users.depart_id,
#         func.count(Users.id),
# ).group_by(Users.depart_id).having(func.count(Users.id) >= 2).all()
# for item in ret:
#         print(item)

# 12.union 去重 和 union all 上下拼接不去重
"""
select id,name from users
UNION
select id,name from users;
"""
# q1 = session.query(Users.name).filter(Users.id > 2)
# q2 = session.query(Users.name).filter(Users.id > 2)
# ret = q1.union(q2).all()
#
# q1 = session.query(Users.name).filter(Users.id > 2)
# q2 = session.query(Users.name).filter(Users.id > 2)
# ret = q1.union_all(q2).all()


session.close()

单表常用操作

伍 、连表常用操作

在地点的布置里,重新搞两给表

from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
#按照上面配置的代码加上这些东西,配置的表删除 
class Depart(Base):
    __tablename__ = 'depart'
    id = Column(Integer, primary_key=True)
    title = Column(String(32), index=True, nullable=False)

class Users(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)
    depart_id = Column(Integer,ForeignKey("depart.id"))
    #跟表结构无关
    dp = relationship("Depart", backref='pers')

上边是实例演示:此前本人加上一点数量:

亚洲必赢手机入口 12亚洲必赢手机入口 13

#django的manytomany  在flask中两个ForeignKey完成
from duoduo.test import Users,Depart,Student,Course,Student2Course
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine


engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/duoduo123?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )



SessionFactory = sessionmaker(bind=engine)

#根据Users类对user表进行增删改查
session=SessionFactory()

# #1、查询所有用户
# # ret =session.query(Users).all()
# #
# # for i in ret:
# #     print(i.id,i.name,i.depart_id)

#2、查询所有用户,所属部门名称
# ret=session.query(Users.id,Users.name,Depart.title).join(Depart,User.depart_id==Depart.id).all()
#  #这里有默认ForeignKey ,User.depart_id==Depart.id 可以不加也行
# for i in ret:
#     print(i.id ,i.name,i.title)

#SELECT users.id AS users_id, users.name AS users_name, depart.title AS depart_title
#FROM users INNER JOIN depart ON depart.id = users.depart_id
# q=session.query(Users.id,Users.name,Depart.title).join(Depart)
# print(q)

#3、relation字段:查询所有用户+所有部门名称
# ret=session.query(Users).all()
# for row in ret:
#     print(row.id,row.name,row.depart_id,row.dp.title)

#4、relation字段:查询销售部所有人员
# obj=session.query(Depart).filter(Depart.title =='大娃').first()
# # for i in obj.pers:
# #     print(i.id,i.name,obj.title)

#5、创建一个名称叫:IT部门,再在该部门中添加一个员工叫:多多
#方式一:
# d1=Depart(title='IT')
# session.add(d1)
# session.commit()
#
# u1=Users(name='duoduo',depart_id=d1.id)
# session.add(u1)
# session.commit()
#方式二:
# u1=Users(name='多多1',dp=Depart(title='IT'))
# session.add(u1)
# session.commit()


#6、创建一个部门叫王者荣耀,这个部门添加多个员工:亚瑟,后裔,貂蝉

# d1=Depart(title='王者荣耀')
# d1.pers=[Users(name='亚瑟'),Users(name='后裔'),Users(name='貂蝉')]
#
# session.add(d1)
# session.commit()


#1、录入数据
# session.add_all([
#     Student(name='大娃'),
#     Student(name='二娃'),
#     Course(title='物理'),
#     Course(title='化学'),
#
# ])
# session.add_all([
#     Student2Course(student_id=2,course_id=1),
#     # Student2Course(student_id=1,course_id=2)
# ]
# )

#2、查每个人选了课程的名称,三张表进行关联
# ret=session.query(Student2Course.id,Student.name,Course.title).join(Student,Student2Course.student_id==Student.id,isouter=True).join(Course,Student2Course.course_id==Course.id,isouter=True).order_by(Student2Course.id.asc())
# for i in ret:
#     print(i)

#3、'大娃'所选的所有课

# ret=session.query(Student2Course.id,Student.name,Course.title).join(Student,Student2Course.student_id==Student.id,isouter=True).join(Course,Student2Course.course_id==Course.id,isouter=True).filter(Student.name=='大娃').order_by(Student2Course.id.asc())
#
# for i in ret:
#     print(i)

# obj=session.query(Student).filter(Student.name=='大娃').first()
# for item in obj.course_list:
#     print(item.title)
#4选了'化学'课程的所有人的名字
# obj=session.query(Course).filter(Course.title=='化学').first()
# for item in obj.student_list:
#     print(item.name)


#创建一个课程,创建2个学,两个学生选新创建的课程
# obj=Course(title='体育')
# obj.student_list=[Student(name='五娃'),Student(name='六娃')]
#
# session.add(obj)
# session.commit()








# session.close()

ForeignKey

亚洲必赢手机入口 14亚洲必赢手机入口 15

class Student(Base):
    __tablename__ = 'student'
    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)

    course_list = relationship('Course', secondary='student2course', backref='student_list')

class Course(Base):
    __tablename__ = 'course'
    id = Column(Integer, primary_key=True)
    title = Column(String(32), index=True, nullable=False)

class Student2Course(Base):
    __tablename__ = 'student2course'
    id = Column(Integer, primary_key=True, autoincrement=True)
    student_id = Column(Integer, ForeignKey('student.id'))
    course_id = Column(Integer, ForeignKey('course.id'))

    __table_args__ = (
        UniqueConstraint('student_id', 'course_id', name='uix_stu_cou'), # 联合唯一索引
        # Index('ix_id_name', 'name', 'extra'),        # 联合索引
    )

末端新建的五个表

 连接的法子:

from duoduo.test import Users,Depart,Student,Course,Student2Course
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine


engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/duoduo123?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )



SessionFactory = sessionmaker(bind=engine)


#连接
#第一种方式
# #并发
# def task():
#     #去连接池获取一个连接
#     session=SessionFactory()
#     ret=session.query(Student).all()
#     print(ret)
#     #将连接交还给连接池
#     session.close()
#
#
# from threading import Thread
#
# for i in range(20):
#     t=Thread(target=task)
#     t.start()
#

#第二种方式

from sqlalchemy.orm import scoped_session
session=scoped_session(SessionFactory)


def task():
    ret=session.query(Student).all()
    print(ret)
    session.remove()  #连接断开


from threading import Thread
for i in range(20):
    t=Thread(target=task)
    t.start()

实施原生SQ:

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from models import Student,Course,Student2Course

engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/duoduo123?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
SessionFactory = sessionmaker(bind=engine)
session = scoped_session(SessionFactory)


def task():
    """"""
    # 方式一:
    """
    # 查询
    # cursor = session.execute('select * from users')
    # result = cursor.fetchall()

    # 添加
    cursor = session.execute('INSERT INTO users(name) VALUES(:value)', params={"value": 'duoduo'})
    session.commit()
    print(cursor.lastrowid)
    """
    # 方式二:
    """
    # conn = engine.raw_connection()
    # cursor = conn.cursor()
    # cursor.execute(
    #     "select * from t1"
    # )
    # result = cursor.fetchall()
    # cursor.close()
    # conn.close()
    """

    # 将连接交还给连接池
    session.remove()


from threading import Thread

for i in range(20):
    t = Thread(target=task)
    t.start()

 

创建单表

import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

Base = declarative_base()   # 创建对象的基类:

# 定义User对象:
class Users(Base):
    # 表的名字:
    __tablename__ = 'users'

    # 表的结构:
    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False,default='xx')   # index指定是否是索引,nullable是否能为空
    email = Column(String(32), unique=True)   # 指定唯一
    ctime = Column(DateTime, default=datetime.datetime.now) #注意,此处设置时datetime.datetime.now若加了括号,则时间永远是程序启动时的时间,后面创建数据时,不会变化
    extra = Column(Text, nullable=True)

    __table_args__ = (
        UniqueConstraint('id', 'name', name='uix_id_name'), # 联合唯一索引
        Index('ix_id_name', 'name', 'email'), #给name和email创建普通索引,索引名为ix_id_name
    )


def init_db():
    """
    根据类创建数据库表
    :return: 
    """
    # 初始化数据库连接:
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )

    Base.metadata.create_all(engine) #找到所有继承了Base的类,按照其结构建表


def drop_db():
    """
    根据类删除数据库表
    :return: 
    """
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )

    Base.metadata.drop_all(engine)


if __name__ == '__main__':
    drop_db()
    init_db()

  

私下认可建的表的引擎是MyISAM,假设要设置成InnoDB(匡助理工科程师作),该怎么设置呢?

    __table_args__ = {
        'mysql_engine': 'InnoDB',   # 指定表的引擎
        'mysql_charset': 'utf8'     # 指定表的编码格式
    }

 

1. 下载安装

#在终端直接运行
pip3 install SQLAlchemy

1. 下载安装

#在终端直接运行
pip3 install SQLAlchemy

贰 、Flask的第①方组件

FK,M2M关系的创制

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime,UniqueConstraint, Index
from sqlalchemy.orm import relationship
Base = declarative_base()   # 创建对象的基类:


# ##################### 一对多示例 #########################
class Hobby(Base):
    __tablename__ = 'hobby'
    id = Column(Integer, primary_key=True)
    caption = Column(String(50), default='篮球')


class Person(Base):
    __tablename__ = 'person'
    nid = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    hobby_id = Column(Integer, ForeignKey("hobby.id"))  #建FK关系

    # 与生成表结构无关,仅用于查询方便
    hobby = relationship("Hobby", backref='pers')   #反向关联的名字


# ##################### 多对多示例 #########################
# 这里多对多需要自己建第三张表,并绑定关系
class Server2Group(Base):   
    __tablename__ = 'server2group'
    id = Column(Integer, primary_key=True, autoincrement=True)  #autoincrement 设置自增
    server_id = Column(Integer, ForeignKey('server.id'))
    group_id = Column(Integer, ForeignKey('group.id'))


class Group(Base):
    __tablename__ = 'group'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)

    # 与生成表结构无关,仅用于查询方便
    servers = relationship('Server', secondary='server2group', backref='groups')    #反向关联的名字


class Server(Base):
    __tablename__ = 'server'

    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)


def init_db():
    """
    根据类创建数据库表
    :return:
    """
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/userinfo?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )

    Base.metadata.create_all(engine)


def drop_db():
    """
    根据类删除数据库表
    :return:
    """
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/userinfo?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )

    Base.metadata.drop_all(engine)


if __name__ == '__main__':
    drop_db()
    init_db()

 

SQLALchemy分歧于Django的O普拉多M,当创建多对多涉及事,不会自动创造第③张表,要求大家友好定义关系表,进行关联

 

2. SQLAlchemy正视关系

SQLAlchemy本身不能够操作数据库,其必须信赖pymsql等第1方插件,Dialect用于和数量API进行交流,根据配置文件的两样调用分化的数据库API,从而达成对数据库的操作。

亚洲必赢手机入口 16

 

 

MySQL-Python
    mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
   
pymysql
    mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
   
MySQL-Connector
    mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
   
cx_Oracle
    oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]

更多详见:http://docs.sqlalchemy.org/en/latest/dialects/index``.html

2. SQLAlchemy依赖关系

SQLAlchemy本身不可能操作数据库,其必须重视pymsql等第2方插件,Dialect用于和数量API举行交换,根据配置文件的两样调用分化的数据库API,从而完成对数据库的操作。

亚洲必赢手机入口 17

 

 

MySQL-Python
    mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
   
pymysql
    mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
   
MySQL-Connector
    mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
   
cx_Oracle
    oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]

更多详见:http://docs.sqlalchemy.org/en/latest/dialects/index``.html

1、flask-sqlalchemy

a、先下载安装

pip3 install flask-sqlalchemy

b、项目下的__init__.py导入

#第一步  :导入并实例化SQLALchemy
from flask_sqlalchemy import SQLAlchemy
db=SQLAlchemy()
#一定要在蓝图导入的上面,是全局变量
#也要导入表的models.py的所有表

c、初始化

db.init_app(app)  
#在注册蓝图的地方的下面

d、在安顿文件中写入配置

# ##### SQLALchemy配置文件 #####
    SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:123@127.0.0.1:3306/duoduo?charset=utf8"
    SQLALCHEMY_POOL_SIZE = 10
    SQLALCHEMY_MAX_OVERFLOW = 5

e、制造models.py中的类(对应数据库中的表)

from sqlalchemy import Column
from sqlalchemy import Integer,String,Text,Date,DateTime
from (项目目录) import db

class Users(db.Model):  #继承的db.Model
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)
    # depart_id = Column(Integer)

f、生成表(使用app上下文)

from (项目目录) import db,create_app
app = create_app()
app_ctx = app.app_context() # app_ctx = app/g
with app_ctx: # __enter__,通过LocalStack放入Local中
    db.create_all() # 调用LocalStack放入Local中获取app,再去app中获取配置

补给二个知识点:

class  Foo(object):
    def __enter__(self):
        print('进入')

    def __exit__(self, exc_type, exc_val, exc_tb):
        print('出来')
obj =Foo()
with obj:  #with Foo()
    print('多多')

#结果 
#进入
#多多
#出来

g、基于O凯雷德M对数据库实行操作

from flask import Blueprint
from (项目目录) import db
from (项目目录) import models
us = Blueprint('us',__name__)

@us.route('/index')
def index():
    # 使用SQLAlchemy在数据库中插入一条数据
    # db.session.add(models.Users(name='多多',depart_id=1))     #插入数据
    # db.session.commit()
    # db.session.remove()
    result = db.session.query(models.Users).all()
    print(result)
    db.session.remove()
    return 'Index'

这里面db.session.add的源码:

亚洲必赢手机入口 18

 

亚洲必赢手机入口 19

 

b. 操作数据库表

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)    #创建Session类

# 每次执行数据库操作时,都需要创建一个session
session = Session()    # 创建session对象:

# ############# 执行ORM操作 #############
# 创建新User对象
obj1 = Users(name="alex1")    
# 添加到session:
session.add(obj1)
# 提交即保存到数据库:
session.commit()
# 关闭session
session.close()

 

3. O奥迪Q7M成效利用

运用 OWranglerM/Schema Type/SQL Expression
Language/Engine/ConnectionPooling/Dialect
全部组件对数据开始展览操作。根据类创设对象,对象转换来SQL,执行SQL。

a. 创建表

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

#表明依赖关系并创建连接,最大连接数为5 
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5)
 
Base = declarative_base()
 
# 创建单表
class Users(Base):
    __tablename__ = 'users'    # 表名
    id = Column(Integer, primary_key=True,autoincrement=True)    # id列,主键自增
    name = Column(String(32))    # name列
    extra = Column(String(16))    # extra列
 
    __table_args__ = (
    UniqueConstraint('id', 'name', name='uix_id_name'),    # 创建联合唯一索引
        Index('ix_id_name', 'name', 'extra'),    # 创建普通索引
    )
 
 
# 一对多
class Favor(Base):
    __tablename__ = 'favor'
    nid = Column(Integer, primary_key=True)
    caption = Column(String(50), default='red', unique=True)
 
 
class Person(Base):
    __tablename__ = 'person'
    nid = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    favor_id = Column(Integer, ForeignKey("favor.nid"))    # 创建外键
 
 
# 多对多
class Group(Base):
    __tablename__ = 'group'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)
    port = Column(Integer, default=22)
 
 
class Server(Base):
    __tablename__ = 'server'
    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)
 
 
class ServerToGroup(Base):
    __tablename__ = 'servertogroup'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    server_id = Column(Integer, ForeignKey('server.id'))    # 创建外键
    group_id = Column(Integer, ForeignKey('group.id'))    # 创建外键
 
 
def init_db():
    Base.metadata.create_all(engine)
 
 
def drop_db():
    Base.metadata.drop_all(engine)

注:设置外键的另一种办法 ForeignKeyConstraint([‘other_id’],
[‘othertable.other_id’])

b. 操作表

 

亚洲必赢手机入口 20亚洲必赢手机入口 21

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5)

Base = declarative_base()

# 创建单表
class Users(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    extra = Column(String(16))

    __table_args__ = (
    UniqueConstraint('id', 'name', name='uix_id_name'),
        Index('ix_id_name', 'name', 'extra'),
    )

    def __repr__(self):
        return "%s-%s" %(self.id, self.name)

# 一对多
class Favor(Base):
    __tablename__ = 'favor'
    nid = Column(Integer, primary_key=True)
    caption = Column(String(50), default='red', unique=True)

    def __repr__(self):
        return "%s-%s" %(self.nid, self.caption)

class Person(Base):
    __tablename__ = 'person'
    nid = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    favor_id = Column(Integer, ForeignKey("favor.nid"))
    # 与生成表结构无关,仅用于查询方便
    favor = relationship("Favor", backref='pers')

# 多对多
class ServerToGroup(Base):
    __tablename__ = 'servertogroup'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    server_id = Column(Integer, ForeignKey('server.id'))
    group_id = Column(Integer, ForeignKey('group.id'))
    group = relationship("Group", backref='s2g')
    server = relationship("Server", backref='s2g')

class Group(Base):
    __tablename__ = 'group'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)
    port = Column(Integer, default=22)
    # group = relationship('Group',secondary=ServerToGroup,backref='host_list')

class Server(Base):
    __tablename__ = 'server'

    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)

def init_db():
    Base.metadata.create_all(engine)

def drop_db():
    Base.metadata.drop_all(engine)

Session = sessionmaker(bind=engine)
session = Session()

表结构 + 数据库连接

b.1 增

#单条增加
obj = Users(name="alex0", extra='sb')
session.add(obj)

#多条增加
session.add_all([
    Users(name="alex1", extra='sb'),
    Users(name="alex2", extra='sb'),
])

#提交
session.commit()

 

b.2 删

#先查询到要删除的记录,再delete
session.query(Users).filter(Users.id > 2).delete()
session.commit()

 

b.3 改

#先查询,再更新
session.query(Users).filter(Users.id > 2).update({"name" : "099"})    # 直接更改
session.query(Users).filter(Users.id > 2).update({Users.name: Users.name + "099"}, synchronize_session=False)    # 字符串拼接
session.query(Users).filter(Users.id > 2).update({"num": Users.num + 1}, synchronize_session="evaluate")    # 数字相加
session.commit()

 

b.4 查

 

ret = session.query(Users).all()
ret = session.query(Users.name, Users.extra).all()
ret = session.query(Users).filter_by(name='alex').all()
ret = session.query(Users).filter_by(name='alex').first()

ret = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(User.id).all()

ret = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all()

 

b.5 其它

 

# 条件
ret = session.query(Users).filter_by(name='alex').all()    # 条件内为关键字表达式
ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()    # 条件内为SQL表达式
ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()    # between
ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()    # in
ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()    # not in
ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()    # 子查询条件

from sqlalchemy import and_, or_
ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()    # and
ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()    # or
ret = session.query(Users).filter(
    or_(
        Users.id < 2,
        and_(Users.name == 'eric', Users.id > 3),
        Users.extra != ""
    )).all()


# 通配符
ret = session.query(Users).filter(Users.name.like('e%')).all()    # e开头
ret = session.query(Users).filter(~Users.name.like('e%')).all()    # 非e开头

# 限制
ret = session.query(Users)[1:2]    # 相当于limit

# 排序
ret = session.query(Users).order_by(Users.name.desc()).all()
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()

# 分组
from sqlalchemy.sql import func

ret = session.query(Users).group_by(Users.extra).all()
ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).all()

ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()

# 连表

ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()    # 笛卡儿积连表
ret = session.query(Person).join(Favor).all()    # 默认内连 inner join
ret = session.query(Person).join(Favor, isouter=True).all()    # 左连


# 组合
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union(q2).all()

q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union_all(q2).all()

 

 

 

参考资料:

1. Python开发【第⑨九篇】:Python操作MySQL

 

3. OTucsonM效率使用

选拔 O宝马X5M/Schema Type/SQL Expression
Language/Engine/ConnectionPooling/Dialect
全体组件对数据开始展览操作。依照类创设对象,对象转换来SQL,执行SQL。

a. 创建表

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

#表明依赖关系并创建连接,最大连接数为5 
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5)
 
Base = declarative_base()
 
# 创建单表
class Users(Base):
    __tablename__ = 'users'    # 表名
    id = Column(Integer, primary_key=True,autoincrement=True)    # id列,主键自增
    name = Column(String(32))    # name列
    extra = Column(String(16))    # extra列
 
    __table_args__ = (
    UniqueConstraint('id', 'name', name='uix_id_name'),    # 创建联合唯一索引
        Index('ix_id_name', 'name', 'extra'),    # 创建普通索引
    )
 
 
# 一对多
class Favor(Base):
    __tablename__ = 'favor'
    nid = Column(Integer, primary_key=True)
    caption = Column(String(50), default='red', unique=True)
 
 
class Person(Base):
    __tablename__ = 'person'
    nid = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    favor_id = Column(Integer, ForeignKey("favor.nid"))    # 创建外键
 
 
# 多对多
class Group(Base):
    __tablename__ = 'group'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)
    port = Column(Integer, default=22)
 
 
class Server(Base):
    __tablename__ = 'server'
    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)
 
 
class ServerToGroup(Base):
    __tablename__ = 'servertogroup'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    server_id = Column(Integer, ForeignKey('server.id'))    # 创建外键
    group_id = Column(Integer, ForeignKey('group.id'))    # 创建外键
 
 
def init_db():
    Base.metadata.create_all(engine)
 
 
def drop_db():
    Base.metadata.drop_all(engine)

注:设置外键的另一种方法 ForeignKeyConstraint([‘other_id’],
[‘othertable.other_id’])

b. 操作表

 

亚洲必赢手机入口 22亚洲必赢手机入口 23

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5)

Base = declarative_base()

# 创建单表
class Users(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    extra = Column(String(16))

    __table_args__ = (
    UniqueConstraint('id', 'name', name='uix_id_name'),
        Index('ix_id_name', 'name', 'extra'),
    )

    def __repr__(self):
        return "%s-%s" %(self.id, self.name)

# 一对多
class Favor(Base):
    __tablename__ = 'favor'
    nid = Column(Integer, primary_key=True)
    caption = Column(String(50), default='red', unique=True)

    def __repr__(self):
        return "%s-%s" %(self.nid, self.caption)

class Person(Base):
    __tablename__ = 'person'
    nid = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    favor_id = Column(Integer, ForeignKey("favor.nid"))
    # 与生成表结构无关,仅用于查询方便
    favor = relationship("Favor", backref='pers')

# 多对多
class ServerToGroup(Base):
    __tablename__ = 'servertogroup'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    server_id = Column(Integer, ForeignKey('server.id'))
    group_id = Column(Integer, ForeignKey('group.id'))
    group = relationship("Group", backref='s2g')
    server = relationship("Server", backref='s2g')

class Group(Base):
    __tablename__ = 'group'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)
    port = Column(Integer, default=22)
    # group = relationship('Group',secondary=ServerToGroup,backref='host_list')

class Server(Base):
    __tablename__ = 'server'

    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)

def init_db():
    Base.metadata.create_all(engine)

def drop_db():
    Base.metadata.drop_all(engine)

Session = sessionmaker(bind=engine)
session = Session()

表结构 + 数据库连接

b.1 增

#单条增加
obj = Users(name="alex0", extra='sb')
session.add(obj)

#多条增加
session.add_all([
    Users(name="alex1", extra='sb'),
    Users(name="alex2", extra='sb'),
])

#提交
session.commit()

 

b.2 删

#先查询到要删除的记录,再delete
session.query(Users).filter(Users.id > 2).delete()
session.commit()

 

b.3 改

#先查询,再更新
session.query(Users).filter(Users.id > 2).update({"name" : "099"})    # 直接更改
session.query(Users).filter(Users.id > 2).update({Users.name: Users.name + "099"}, synchronize_session=False)    # 字符串拼接
session.query(Users).filter(Users.id > 2).update({"num": Users.num + 1}, synchronize_session="evaluate")    # 数字相加
session.commit()

 

b.4 查

 

ret = session.query(Users).all()
ret = session.query(Users.name, Users.extra).all()
ret = session.query(Users).filter_by(name='alex').all()
ret = session.query(Users).filter_by(name='alex').first()

ret = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(User.id).all()

ret = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all()

 

b.5 其它

 

# 条件
ret = session.query(Users).filter_by(name='alex').all()    # 条件内为关键字表达式
ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()    # 条件内为SQL表达式
ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()    # between
ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()    # in
ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()    # not in
ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()    # 子查询条件

from sqlalchemy import and_, or_
ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()    # and
ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()    # or
ret = session.query(Users).filter(
    or_(
        Users.id < 2,
        and_(Users.name == 'eric', Users.id > 3),
        Users.extra != ""
    )).all()


# 通配符
ret = session.query(Users).filter(Users.name.like('e%')).all()    # e开头
ret = session.query(Users).filter(~Users.name.like('e%')).all()    # 非e开头

# 限制
ret = session.query(Users)[1:2]    # 相当于limit

# 排序
ret = session.query(Users).order_by(Users.name.desc()).all()
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()

# 分组
from sqlalchemy.sql import func

ret = session.query(Users).group_by(Users.extra).all()
ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).all()

ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()

# 连表

ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()    # 笛卡儿积连表
ret = session.query(Person).join(Favor).all()    # 默认内连 inner join
ret = session.query(Person).join(Favor, isouter=True).all()    # 左连


# 组合
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union(q2).all()

q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union_all(q2).all()

 

 

 

参考资料:

1. Python开发【第七九篇】:Python操作MySQL

 

2、 flask-script

下载安装

pip3 install flask-script

功能:

  a、增加runsever

from (项目目录) import create_app
from flask_script import Manager

app = create_app()
manager = Manager(app)

if __name__ == '__main__':
    # app.run()
    manager.run()

  b、地点传参

from (项目目录) import create_app
from flask_script import Manager

app = create_app()
manager = Manager(app)

@manager.command
def custom(arg):
    """
    自定义命令
    python manage.py custom 123
    :param arg:
    :return:
    """
    print(arg)

if __name__ == '__main__':
    # app.run()
    manager.run()

  c、关键字传参  

from flask_script import Manager
from (项目目录) import create_app

app = create_app()
manager = Manager(app)

@manager.option('-n', '--name', dest='name')
@manager.option('-u', '--url', dest='url')
def cmd(name, url):
    """
    自定义命令
    执行: python manage.py  cmd -n qianduoduo -u http://www.baidu.com
    :param name:
    :param url:
    :return:
    """
    print(name, url)

if __name__ == '__main__':
    # app.run()
    manager.run()

 

c.通过原生SQL语句执行

亚洲必赢手机入口 24亚洲必赢手机入口 25

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)

session = Session()

# 查询
# cursor = session.execute('select * from users')
# result = cursor.fetchall()

# 添加
cursor = session.execute('insert into users(name) values(:value)',params={"value":'hc'})
# 注意占位符和传参的形式
session.commit()
print(cursor.lastrowid)

session.close()

原生SQL语句

View Code

 

 

 

3、flask-migrate

安装

pip3 install flask-migrate

配备是借助 flask-script

from flask_script import Manager
from flask_migrate import Migrate, MigrateCommand
from (项目目录) import create_app
from (项目目录) import db

app = create_app()
manager = Manager(app)
Migrate(app, db)
"""
# 数据库迁移命名
    python manage.py db init   #只执行一次
    python manage.py db migrate # makemirations
    python manage.py db upgrade # migrate
"""
manager.add_command('db', MigrateCommand)

if __name__ == '__main__':
    # app.run()
    manager.run()

d.基本增加和删除改查示例

import time
import threading

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text

from db import Users, Hosts

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)

session = Session()

# ################ 添加 ################

obj1 = Users(name="hc")
session.add(obj1)   #添加一个对象

session.add_all([
    Users(name="hc"),
    Users(name="alex"),
    Hosts(name="c1.com"),
])      #添加多个对象
session.commit()


# ################ 删除 ################

# filter是where条件,最后调用one()或first()返回唯一行,如果调用all()则返回所有行
session.query(Users).filter(Users.id > 2).delete()  #删除Users表中id大于2的数据
session.commit()

# ################ 修改 ################

session.query(Users).filter(Users.id > 0).update({"name" : "099"})  # 将Users表中id>0的数据,把name字段改为099
session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False)  
#synchronize_session设置为False即执行字符串拼接
session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate")    
#synchronize_session设置为evaluate即执行四则运算
session.commit()

# ################ 查询 ################

r1 = session.query(Users).all()
r2 = session.query(Users.name.label('xx'), Users.age).all()     #label 取别名的,即在查询结果中,显示name的别名'xx'
r3 = session.query(Users).filter(Users.name == "alex").one()    # one()返回唯一行,类似于django的get,如果返回数据为多个则报错
r3 = session.query(Users).filter(Users.name == "alex").all()    # all()获取所有数据
r4 = session.query(Users).filter_by(name='alex').all()          # 注意filter和filter_by后面括号内条件的写法
r5 = session.query(Users).filter_by(name='alex').first()        # first()获取返回数据的第一行
r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(Users.id).all()  
#order_by后面还可以.desc()降序排列,默认为.asc()升序排列
# text(自定义条件,:的功能类似%s占位),params中进行传参
r7 = session.query(Users).from_statement(text("SELECT * FROM Hosts where name=:name")).params(name='ed').all()
# text中还能从另一个表中查询,前面要用from_statement,而不是filter


session.close()

  

补给工具:(协同开发的有限支撑支付的条件一致性)

1、pipreqs

找到项目利用的富有组件的版本

pip3 install  pipreqs

在档次中的命令行   

pipreqs ./ --encoding=utf-8

在项目中找了文件requirements.txt

什么装这一个模块?  

pip  install 的时候能够钦点3个文件,他会协调读每一行,然后安装

pycharm 打开文件的时候也会提醒下载

 

贰 、虚拟环境 (开发环境版本无法存活的)

安装

pip3 install virtualenv

创立虚拟环境

#找一个存放虚拟环境的文件夹
virtualenv env1 --no-site-packages  创建环境

activate  #激活环境

deactivate # 退出环境


#也可以用pycharm鼠标点点就好了,最新的版本就有虚拟环境goon功能
#当我们需要特定的环境,可以在电脑上有多高开发环境

 

e.基于relationship操作ForeignKey

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts, Hobby, Person

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()
# 添加
"""
session.add_all([
    Hobby(caption='乒乓球'),
    Hobby(caption='羽毛球'),
    Person(name='张三', hobby_id=3),
    Person(name='李四', hobby_id=4),
])

person = Person(name='张九', hobby=Hobby(caption='姑娘'))
session.add(person)

hb = Hobby(caption='人妖')
hb.pers = [Person(name='文飞'), Person(name='博雅')]
session.add(hb)     #  会同时创建3条数据(1条hobby的数据,2条person的数据)

session.commit()
"""

# 使用relationship正向查询
"""
v = session.query(Person).first()
print(v.name)
print(v.hobby.caption)
"""

# 使用relationship反向查询
"""
v = session.query(Hobby).first()
print(v.caption)
print(v.pers)
"""

session.close()

 

f.基于relationship操作m2m

亚洲必赢手机入口 26亚洲必赢手机入口 27

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()
# 添加
"""
session.add_all([
    Server(hostname='c1.com'),
    Server(hostname='c2.com'),
    Group(name='A组'),
    Group(name='B组'),
])
session.commit()

s2g = Server2Group(server_id=1, group_id=1)
session.add(s2g)
session.commit()


gp = Group(name='C组')
gp.servers = [Server(hostname='c3.com'),Server(hostname='c4.com')]
session.add(gp)
session.commit()


ser = Server(hostname='c6.com')
ser.groups = [Group(name='F组'),Group(name='G组')]
session.add(ser)
session.commit()
"""


# 使用relationship正向查询
"""
v = session.query(Group).first()
print(v.name)
print(v.servers)
"""

# 使用relationship反向查询
"""
v = session.query(Server).first()
print(v.hostname)
print(v.groups)
"""


session.close()

基于relationship操作m2m

View Code

 

g.进阶操作

and、or、like、limit、排序、分组、连表、组合

亚洲必赢手机入口 28亚洲必赢手机入口 29

# 条件
ret = session.query(Users).filter_by(name='alex').all()
ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()
ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()

from sqlalchemy import and_, or_    #需要导入

ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
ret = session.query(Users).filter(
    or_(
        Users.id < 2,
        and_(Users.name == 'eric', Users.id > 3),
        Users.extra != ""
    )).all()


# 通配符
ret = session.query(Users).filter(Users.name.like('e%')).all()
ret = session.query(Users).filter(~Users.name.like('e%')).all()

# 限制
ret = session.query(Users)[1:2]

# 排序
ret = session.query(Users).order_by(Users.name.desc()).all()  # 倒序排列
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()    

# 分组
from sqlalchemy.sql import func   #需要导入

ret = session.query(Users).group_by(Users.extra).all()
ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).all()

ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()

# 连表

ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()

ret = session.query(Person).join(Favor).all()

ret = session.query(Person).join(Favor, isouter=True).all()


# 组合
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union(q2).all()

q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union_all(q2).all()

View Code

 

 关联子查询

亚洲必赢手机入口 30亚洲必赢手机入口 31

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text, func
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()

# 关联子查询
subqry = session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id).correlate(Group).as_scalar()
result = session.query(Group.name, subqry)
"""
SELECT `group`.name AS group_name, (SELECT count(server.id) AS sid 
FROM server 
WHERE server.id = `group`.id) AS anon_1 
FROM `group`
"""


# 原生SQL
"""
# 查询
cursor = session.execute('select * from users')
result = cursor.fetchall()

# 添加
cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'})
session.commit()
print(cursor.lastrowid)
"""

session.close()

View Code

 

        子查询:
            session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()
            """
            select * from users where id in (select id from xxx)
            """


            subqry = session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id).correlate(Group).as_scalar()
            #第一步:  session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id)
            #这句的sql语句为 select count(id) as sid from server where server.id = group.id      如果直接运行,则会报错
            # 第二步:.correlate(Group).as_scalar() ==> 代表此时不执行查询操作,将其当作条件,在group表中查询时,才执行查询


            result = session.query(Group.name, subqry)
            # sql语句为:select group.name  subqry  from group
            #第三步:将subqry替换为上面的条件,则此句的SQL为:
            #    select group.name,(select count(id) as sid from server where server.id = group.id) as xx  from group

 

点击

 

 h.session对象怎么着兑现线程安全?

session有两种创设格局

方式一:

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine

engine = create_engine(
    "mysql+pymysql://root:123@47.93.4.198:3306/s6?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
Session = sessionmaker(bind=engine)

# 方式一:
# 由于无法提供线程共享功能,所有在开发时要注意,在每个线程中自己创建 session。
#  from sqlalchemy.orm.session import Session
#         具有操作数据库的:'close', 'commit', 'connection', 'delete', 'execute', 'expire',.....
session = Session()     # 创建普通的session
print('原生session',session)
# 操作数据库
session.close()

由于无法提供线程共享功能,所有在开发时要注意,在每个线程中自己创建 session
解决办法如下:

亚洲必赢手机入口 32亚洲必赢手机入口 33

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from db import Users

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)


def task(arg):
    session = Session()

    obj1 = Users(name="alex1")
    session.add(obj1)

    session.commit()


for i in range(10):
    t = threading.Thread(target=task, args=(i,))
    t.start()

多线程执行示例

 

方式二(推荐):

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session

engine = create_engine(
    "mysql+pymysql://root:123@47.93.4.198:3306/s6?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
Session = sessionmaker(bind=engine)

# 方式二:支持线程安全,自动为每个线程创建一个session,单线程时,只创建一个
#               - threading.Local
#               - 唯一标识
# ScopedSession对象
#       self.registry(), 加括号 创建session
#       self.registry(), 加括号 创建session
#       self.registry(), 加括号 创建session
from greenlet import getcurrent as get_ident #本地线程的唯一标识的函数,加括号则执行函数
session = scoped_session(Session,get_ident)
# session.add
# 操作数据库
session.remove()

支持线程安全,自动为每个线程创建一个session,单线程时,只创建一个

 

 

I.sqlalchemy-utils给SqlAlchemy提供choice功能

SqlAlchemy本人没有chocie,需求安装这一个才能提供choice作用

pip install sqlalchemy-utils

 

亚洲必赢手机入口 34亚洲必赢手机入口 35

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column,Integer,String
from sqlalchemy_utils import ChoiceType
from sqlalchemy import create_engine

Base = declarative_base()
class User(Base):
    __tablename__ = 'users'
    type_choices=(
        (1,'北京'),
        (2,'上海'),
        )
    id = Column(Integer, primary_key=True)  #必须要有主键
    name =Column(String(64))
    types=Column(ChoiceType(type_choices,Integer()))    # 注意:Integer后面要有括号

    __table_args__ = {
        'mysql_engine': 'InnoDB',
        'mysql_charset': 'utf8'
    }

def init_db():
    """
    根据类创建数据库表
    :return:
    """
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/db1?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )

    Base.metadata.create_all(engine)


def drop_db():
    """
    根据类删除数据库表
    :return:
    """
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/db1?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )

    Base.metadata.drop_all(engine)


if __name__ == '__main__':
    drop_db()
    init_db()

建表

亚洲必赢手机入口 36亚洲必赢手机入口 37

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = "HuChong"
# Date: 2018/1/12

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from ru import User

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/db1", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)

session = Session()

obj1 = User(name="xz",types=1)
obj2 = User(name="zz",types=2)
session.add_all([obj1,obj2])
session.commit()
session.close()

计划数据

亚洲必赢手机入口 38亚洲必赢手机入口 39

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = "HuChong"
# Date: 2018/1/12

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from ru import User

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/db1", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)

session = Session()

result_list=session.query(User).all()
print(result_list)
for item in result_list:
    print(item.types)
    print(item.types.code,item.types.value)

session.close()


#######打印结果如下########
'''
[<ru.User object at 0x0386D770>, <ru.User object at 0x0386D7D0>]
Choice(code=1, value=北京)
1 北京
Choice(code=2, value=上海)
2 上海
'''

获取值

 

 

三、Flask-SQLAlchemy及Flask-Migrate组件

1.Flask-SQLAlchemy

  用于将Flask和SQLAlchemy联系起来,使用在此以前供给装上面那个模块

pip install flask-sqlalchemy

假使选用Flask-sqlalchemy组件,则在动用时有一点扭转

# 1. 引入Flask-SQLAlchemy
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()    #实例化SQLAlchemy对象

# 2. 注册 Flask-SQLAlchemy
    # SQLAlchemy(app)
    # 由于这个对象在其他地方想要使用,所有用以下方式注册 
    db.init_app(app) #读取配置文件,配置文件中写以前在create_engine里面的链接数据

#settings.py中,加上配置

SQLALCHEMY_DATABASE_URI =
“mysql+pymysql://root:123@47.93.4.198:3306/s6?charset=utf8”
SQLALCHEMY_POOL_SIZE = 2
SQLALCHEMY_POOL_TIMEOUT = 30
SQLALCHEMY_POOL_RECYCLE = -1

# 追踪对象的改动并且发送信号
SQLALCHEMY_TRACK_MODIFICATIONS = False

# 3. 导入models中的表
from .models import *

from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from app import db

# 4. 写类继承db.Model
class Users(db.Model):  #再不是继承Base,而且继承db.Model
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)
    pwd = Column(String(32))

    __table_args__ = {
        'mysql_engine': 'InnoDB',   # 指定表的引擎
        'mysql_charset': 'utf8'     # 指定表的编码格式
    }


class Group(db.Model):
    __tablename__ = 'group'

    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)

    __table_args__ = {
        'mysql_engine': 'InnoDB',
        'mysql_charset': 'utf8'
    }

# 5. 创建和删除表
  #  以后执行db.create_all()
  #  以后执行db.drop_all()
但是这样不好,我们引入 Flask-Migrate

2.Flask-Migrate

可以由此类似Django里的命令,进行多少迁移,创立表,删除表,更新表

安装  pip install Flask-Migrate

# 5.1 导入
from flask_migrate import Migrate, MigrateCommand
from app import create_app, db

app = create_app()
manager = Manager(app)
# 5.2 创建migrate实例
migrate = Migrate(app, db)

 

#执行命令:
    初次:python manage.py db init

    python manage.py db migrate
    python manage.py db upgrade

 

以后执行SQL时:
    方式一:
        result = db.session.query(models.User.id,models.User.name).all()
        db.session.remove()
    方式二:
        result = models.Users.query.all()

 

 3.代码规范之生成requestments.txt文件

pip  freeze  # 获取环境中所有安装的模块以及其对应的版本

pip  freeze > requirements.txt  # 生成对应的文本文件

亚洲必赢手机入口 40

 

出于获得的是有所,我们还得和谐手动在文书里删除一些不必要的,全部这些措施倒霉,大家利用上面包车型客车法子

 pip install pipreqs

率先安装模块,安装到位未来,大家就能够在极限,执行pipreqs命令

# 获取当前所在程序目录中涉及到的所有模块,并自动生成 requirements.txt 且写入内容。
 pipreqs ./

提出在Linux系统下利用,windows环境下会报错

亚洲必赢手机入口 41

 

尔后使用别人的次序,进入程序目录:

安装requirements.txt依赖
pip install -r requirements.txt

会自动安装文件里,全部对应版本模块

 

 

 

 

 

 

 

 

 

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图