SQLalchemy
demo01_sqlalchemy.py
"""
数据库操作流程
- 1. 安装扩展
- pip install flask-sqlalchemy
- pip install flask-mysqldb / pymysql
- 2. 设置数据库的配置信息
- 3. 创建 sqlalchemy 对象 db,关联 app
- 4. 编写模型类,继承自 db.Model
- 5. 操作数据库
- 增删改
- 查询
"""
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
# 2. 设置数据库的配置信息
# mysql://[用户名]:[密码]@[主机地址][端口号]/[数据库名]
app.config["SQLALCHEMY_DATABASE_URI"] = "mysql://root:123456@127.0.0.1:3306/data36"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
# 3. 创建 sqlalchemy 对象 db,关联 app
db = SQLAlchemy(app)
# 4. 编写模型类,继承自 db.Model
class Student(db.Model): # 默认表名为 student
# 【主键】 参数1:表示 id 的类型;参数2:表示 id 的约束类型
__tablename__ = "person" # 手动设置表名
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(32))
@app.route("/")
def helloworld():
return "hello world Flask"
if __name__ == "__main__":
# 创建数据库的表,创建的是继承自 db.Model 的表
with app.app_context():
# 创建所有继承自db.Model的表
db.create_all()
# 删除所有继承自db.Model的表
# db.drop_all()
app.run(debug=True)
CRUD
demo02_CRUD.py
"""
增删改查
- 全部都是 db.session 操作
- 常见方法:
- db.session.add(obj) 添加单个对象
- db.session.add_all([obj1, obj2]) 添加多个对象
- db.session.delete(obj) 删除单个对象
- db.session.commit() 提交会话
- db.drop_all() 删除继承自 db.Model 的所有表
- db.create_all() 创建继承自 db.Model 的所有表
- db.session.rollback() 回滚
- db.session.remove() 移除会话
- 如果想要打印一个对象的时候想看到指定信息,那么请重写 __repr__ 方法
"""
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
# 2. 设置数据库的配置信息
# mysql://[用户名]:[密码]@[主机地址][端口号]/[数据库名]
app.config["SQLALCHEMY_DATABASE_URI"] = "mysql://root:123456@127.0.0.1:3306/data36"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
# 3. 创建 sqlalchemy 对象 db,关联 app
db = SQLAlchemy(app)
# 4. 编写模型类,继承自 db.Model
# 角色【一方】
class Role(db.Model): # 默认表名为 student
# 【主键】 参数1:表示 id 的类型;参数2:表示 id 的约束类型
__tablename__ = "roles" # 手动设置表名
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(32))
# 用户【多方】
class User(db.Model): # 默认表名为 student
# 【主键】 参数1:表示 id 的类型;参数2:表示 id 的约束类型
__tablename__ = "users" # 手动设置表名
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(32))
# 建立外键
role_id = db.Column(db.Integer, db.ForeignKey(Role.id))
@app.route("/")
def helloworld():
return "hello world Flask"
def add():
role = Role(name="role")
db.session.add(role)
db.session.commit()
user = User(name="user", role_id=role.id)
db.session.add(user)
db.session.commit()
if __name__ == "__main__":
# 创建数据库的表,创建的是继承自 db.Model 的表
with app.app_context():
# 创建所有继承自db.Model的表
db.create_all()
# 删除所有继承自db.Model的表
# db.drop_all()
add()
app.run(debug=True)
查询
demo03_select.py
"""
查询
xxx.query.[过滤器].[执行器]
- 查询过滤器【可选】
- filter()
- filter_by()
- limit
- offset()
- order_by()
- group_by()
- 查询执行器【必选】
- all()
- first()
- first_or_404()
- get()
- get_or_404()
- count()
# page: 表示要查询的页数;per_page: 表示每页有多少条数据;
# Error_out: False查不到不会报错;会返回 paginate 对象
- paginate(page, per_page, Error_out) # 分页
# paginate.pages 总页数
# paginate.page 当前页
# paginate.items 当前的对象列表
"""
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
# 2. 设置数据库的配置信息
# mysql://[用户名]:[密码]@[主机地址][端口号]/[数据库名]
app.config["SQLALCHEMY_DATABASE_URI"] = "mysql://root:123456@127.0.0.1:3306/data36"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
# 3. 创建 sqlalchemy 对象 db,关联 app
db = SQLAlchemy(app)
# 4. 编写模型类,继承自 db.Model
# 角色【一方】
class Role(db.Model): # 默认表名为 student
# 【主键】 参数1:表示 id 的类型;参数2:表示 id 的约束类型
__tablename__ = "roles" # 手动设置表名
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(32))
# 用户【多方】
class User(db.Model): # 默认表名为 student
# 【主键】 参数1:表示 id 的类型;参数2:表示 id 的约束类型
__tablename__ = "users" # 手动设置表名
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(32))
# 建立外键
role_id = db.Column(db.Integer, db.ForeignKey(Role.id))
@app.route("/")
def helloworld():
return "hello world Flask"
def add():
role = Role(name="role")
db.session.add(role)
db.session.commit()
user = User(name="user", role_id=role.id)
db.session.add(user)
db.session.commit()
if __name__ == "__main__":
# 创建数据库的表,创建的是继承自 db.Model 的表
with app.app_context():
# 创建所有继承自db.Model的表
# db.create_all()
# 删除所有继承自db.Model的表
# db.drop_all()
pass
add()
app.run(debug=True)
图书馆案例
demo04_library.py
"""
图书馆案例
- 1. 数据库配置
- 作者模型【一方】
- 书籍模型【多方】
- 2. 添加测试数据
- 3. 添加作者、书籍
- 4. 删除作者、删除书籍
"""
from flask import Flask, render_template, request, redirect, flash
from flask_sqlalchemy import SQLAlchemy
from flask_wtf.csrf import CSRFProtect
app = Flask(__name__)
# 设置密钥
app.config["SECRET_KEY"] = "dsfsdfasdfa"
# 使用 CSRFProtect 保护 app
CSRFProtect(app)
# 1. 设置数据库的配置信息
# mysql://[用户名]:[密码]@[主机地址][端口号]/[数据库名]
app.config["SQLALCHEMY_DATABASE_URI"] = "mysql://root:123456@127.0.0.1:3306/library"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
# 2. 创建 sqlalchemy 对象,关联app
db = SQLAlchemy(app)
# 3. 作者模型类
# 作者模型【一方】
class Author(db.Model): # 默认表名为 student
# 【主键】 参数1:表示 id 的类型;参数2:表示 id 的约束类型
__tablename__ = "authors" # 手动设置表名
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(32))
# 关系属性
# 查看作者名下的书籍:Author.books
# 查看书籍背后的作者:Book.author
books = db.relationship("Book", backref="author")
# 4. 书籍模型类
# 书籍模型【多方】
class Book(db.Model): # 默认表名为 student
# 【主键】 参数1:表示 id 的类型;参数2:表示 id 的约束类型
__tablename__ = "books" # 手动设置表名
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(32))
# 建立外键
author_id = db.Column(db.Integer, db.ForeignKey(Author.id))
# 等价语句:author_id = db.Column(db.Integer, db.ForeignKey("authors.id"))
@app.route("/")
def show_index():
# 1. 查询所有作者的信息
authors = Author.query.all()
# 2. 携带作者的信息,渲染页面
response = render_template("Cdemo01_library.html", authors=authors)
return response
"""
添加数据的逻辑
作者不存在,则添加数据
作者存在,书籍存在,不添加数据
作者存在,书籍不存在,添加数据
"""
# 添加书籍
@app.route("/add_data", methods=["POST", "GET"])
def add_data():
# 1. 获取提交的数据
author_name = request.form.get("author")
book_name = request.form.get("book")
# 判断输入的数据是否为空
if not all([author_name, book_name]):
flash("作者或书籍不能为空!")
return redirect("/")
# 2. 携带作者的信息,渲染页面
author = Author.query.filter(Author.name == author_name).first()
# 3.判断作者是否存在
if author:
# 4. 通过书籍名称查询书籍对象
book = Book.query.filter(
# 判断作者是否写过该书
Book.name == book_name,
Book.author_id == author.id,
).first()
# 5. 判断书籍对象是否存在
if book:
flash(f"{author_name}已经写了《{book_name}》")
else:
# 创建书籍对象,添加到数据库
book_adding = Book(name=book_name, author_id=author.id)
db.session.add(book_adding)
db.session.commit()
else:
# 创建作者对象,添加到数据库
author_adding = Author(name=author_name)
db.session.add(author_adding)
db.session.commit()
# 创建书籍对象,添加到数据库
book_adding = Book(name=book_name, author_id=author_adding.id)
db.session.add(book_adding)
db.session.commit()
return redirect("/")
# 删除书籍
@app.route("/del_data/<int:book_id>", methods=["POST", "GET"])
def del_data(book_id):
# 根据书籍编号取出书籍对象
book = Book.query.get(book_id)
# 删除该书籍对象
db.session.delete(book)
db.session.commit()
# 重定向到显示页面
return redirect("/")
# 删除书籍
@app.route("/del_author/<int:author_id>", methods=["POST", "GET"])
def del_author(author_id):
# 根据作者编号取出作者对象
author = Author.query.get(author_id)
# 删除作者名下所有书籍
for book in author.books:
db.session.delete(book)
# 删除该作者对象
db.session.delete(author)
db.session.commit()
# 重定向到显示页面
return redirect("/")
if __name__ == "__main__":
# 为了演示方便,先删除后创建
# 创建数据库的表,创建的是继承自 db.Model 的表
with app.app_context():
# 删除所有继承自db.Model的表
db.drop_all()
# 创建所有继承自db.Model的表
db.create_all()
# 添加测试数据
# 生成作者数据
au1 = Author(name="金庸")
au2 = Author(name="古龙")
au3 = Author(name="莫言")
# 把数据提交给用户会话
db.session.add_all([au1, au2, au3])
# 提交会话
db.session.commit()
# 生成书籍数据
bk1 = Book(name="笑傲江湖", author_id=au1.id)
bk2 = Book(name="神雕侠侣", author_id=au1.id)
bk3 = Book(name="欢乐英雄", author_id=au2.id)
bk4 = Book(name="白玉老虎", author_id=au2.id)
bk5 = Book(name="丰乳肥臀", author_id=au3.id)
bk6 = Book(name="红高粱家族", author_id=au3.id)
# 把数据提交给用户会话
db.session.add_all([bk1, bk2, bk3, bk4, bk5, bk6])
# 提交会话
db.session.commit()
app.run(debug=True)
templates/Cdemo01_library.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>library</title>
</head>
<body>
<form action="add_data" , method="post">
{# 设置隐藏字段csrf_token,
只要使用了CSRFProtect,然后使用模板渲染时可以直接使用csrf_token() 方法 #}
<input type="hidden" name="csrf_token" value="{{csrf_token()}}">
作者:<input type="text" name="author"><br>
书籍:<input type="text" name="book"><br>
<input type="submit" value="添加"><br>
{% for message in get_flashed_messages() %}
<span style="color: red;">{{message}}</span>
{% endfor %}
</form>
<hr>
{# 数据展示 #}
<ul>
{% for author in authors %}
<!-- <li>作者:{{ author.name }} <a href="/del_author/{{author.id}}">删除</a> </li> -->
<li>作者:{{ author.name }} <a href="{{url_for('del_author', author_id = author.id)}}">删除</a> </li>
<ul>
{% for book in author.books %}
<li>书籍:{{book.name}} <a href="/del_data/{{book.id}}">删除</a> </li>
{% endfor %}
</ul>
{% endfor %}
</ul>
</body>
</html>
多对多
demo05_many_to_many.py
"""
多对多
案例:学生和课程
"""
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
# 2. 设置数据库的配置信息
# mysql://[用户名]:[密码]@[主机地址][端口号]/[数据库名]
app.config["SQLALCHEMY_DATABASE_URI"] = "mysql://root:123456@127.0.0.1:3306/school"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
# 3. 创建 sqlalchemy 对象 db,关联 app
db = SQLAlchemy(app)
# 4. 编写模型类,继承自 db.Model
tb_student_course = db.Table(
"tb_student_course",
db.Column("student_id", db.Integer, db.ForeignKey("students.id")),
db.Column("course_id", db.Integer, db.ForeignKey("courses.id")),
)
class Student(db.Model):
# 【主键】 参数1:表示 id 的类型;参数2:表示 id 的约束类型
__tablename__ = "students" # 手动设置表名
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(32))
# 关系属性
# secondary:使用在多对多中,用来表示二次查询的
courses = db.relationship(
"Course", backref="students", secondary="tb_student_course"
)
class Course(db.Model):
# 【主键】 参数1:表示 id 的类型;参数2:表示 id 的约束类型
__tablename__ = "courses" # 手动设置表名
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(32))
@app.route("/")
def helloworld():
return "hello world Flask"
if __name__ == "__main__":
# 创建数据库的表,创建的是继承自 db.Model 的表
with app.app_context():
# 删除所有继承自db.Model的表
db.drop_all()
# 创建所有继承自db.Model的表
db.create_all()
# 添加测试数据
stu1 = Student(name="张三")
stu2 = Student(name="李四")
stu3 = Student(name="王五")
cou1 = Course(name="物理")
cou2 = Course(name="化学")
cou3 = Course(name="生物")
stu1.courses = [cou2, cou3]
stu2.courses = [cou2]
stu3.courses = [cou1, cou2, cou3]
db.session.add_all([stu1, stu2, stu3])
db.session.add_all([cou1, cou2, cou3])
db.session.commit()
app.run(debug=True)
数据库迁移
"""
数据库迁移
- 目的:当数据的表结构发生变化之后,如果直接删除原有的数据,再添加新的数据,有可能会导致数据丢失
- 注意点:
- 1. 是为了备份表结构,而不是数据
- 2. 如果想要备份数据,需要使用工具,navicat,mysqlworkbench……
- 操作流程
- 1. 安装扩展
- pip install flask_script
- pip install flask_migrate
- 2. 导入三个类
- from flask_script import Manager
- from install flask_migrate
- 3. 通过 Manager 类创建对象 manager,管理app
- manager = Manager(app)
- 4. 使用 Migrate,关联app,db
- Migrate(app, db)
- 5. 给manager添加一条操作命令
- manager.add_command("db", MigrateCommand)
- 相关迁移命令:
- 生成迁移文件夹:
- python xxx.py db init
- 将模型类生成迁移脚本
- python xxx.py db migrate -m '注释'
- 将迁移脚本更新到数据库中
- python xxx.py db upgrade
"""
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_script import Manager
# 导入数据迁移核心类
from flask_migrate import Migrate, MigrateCommand
app = Flask(__name__)
# 设置数据库的配置信息
# mysql://[用户名]:[密码]@[主机地址][端口号]/[数据库名]
app.config["SQLALCHEMY_DATABASE_URI"] = "mysql://root:123456@127.0.0.1:3306/flasktest"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
# 创建 sqlalchemy 对象 db,关联 app
db = SQLAlchemy(app)
manager = Manager(app)
# 初始化数据迁移
migrate = Migrate(app, db)
# 给manager添加一条操作命令
manager.add_command("db", MigrateCommand)
# 编写模型类
class Student(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(32))
@app.route("/")
def helloworld():
return "hello world Flask"
if __name__ == "__main__":
manager.run()