Python数据库编程


Python数据库编程

Python 数据库编程是后端开发的核心技能之一,主要涉及通过 Python 代码与数据库交互(连接、增删改查等)。本文重点介绍 Python 操作 MySQL(关系型数据库)Redis(NoSQL 内存数据库) 的常用方法,包括核心库、基本操作及最佳实践。

一、Python 操作 MySQL

MySQL 是最流行的关系型数据库之一,适合存储结构化数据(如用户信息、订单记录),支持事务、复杂查询等特性。Python 操作 MySQL 主要依赖第三方库,常用的有 mysql-connector-python(Oracle 官方)和 PyMySQL(社区维护,兼容性好)。

1. 环境准备

(1)安装 MySQL 驱动

选择以下一种库安装:

# 安装 PyMySQL(推荐,轻量且兼容 MySQLdb)
pip install pymysql

# 或安装 mysql-connector-python(官方库)
pip install mysql-connector-python
(2)准备 MySQL 环境
  • 安装 MySQL 服务器(本地或远程),并创建测试数据库和表:
-- 创建数据库
CREATE DATABASE IF NOT EXISTS python_db;
USE python_db;

-- 创建测试表(用户表)
CREATE TABLE IF NOT EXISTS users (
   id INT PRIMARY KEY AUTO_INCREMENT,
   name VARCHAR(50) NOT NULL,
   age INT,
   email VARCHAR(100) UNIQUE
);

2. 核心操作:连接与 CRUD

PyMySQL 为例,基本流程为:连接数据库 → 获取游标 → 执行 SQL → 处理结果 → 关闭连接

(1)连接数据库
import pymysql
from pymysql.cursors import DictCursor  # 可选:让查询结果返回字典(默认元组)

# 数据库配置(实际开发中应放在配置文件,避免硬编码)
config = {
    "host": "localhost",    # 主机地址(远程数据库填 IP)
    "port": 3306,           # 端口(默认 3306)
    "user": "root",         # 用户名
    "password": "123456",   # 密码
    "database": "python_db",# 数据库名
    "charset": "utf8mb4"    # 字符集(支持中文)
}

# 建立连接
try:
    # 连接数据库(cursorclass=DictCursor 使结果为字典)
    conn = pymysql.connect(**config, cursorclass=DictCursor)
    print("数据库连接成功")
except pymysql.MySQLError as e:
    print(f"连接失败:{e}")
(2)插入数据(Create)
try:
    with conn.cursor() as cursor:  # 自动管理游标(离开 with 自动关闭)
        # SQL 插入语句(参数用 %s 占位,避免 SQL 注入)
        sql = "INSERT INTO users (name, age, email) VALUES (%s, %s, %s)"
        # 插入单条数据
        cursor.execute(sql, ("张三", 25, "zhangsan@example.com"))
        # 插入多条数据(批量插入,效率更高)
        users = [
            ("李四", 30, "lisi@example.com"),
            ("王五", 28, "wangwu@example.com")
        ]
        cursor.executemany(sql, users)  # 批量执行

    conn.commit()  # 提交事务(必须调用,否则数据不生效)
    print(f"插入成功,影响行数:{cursor.rowcount}")
except pymysql.MySQLError as e:
    conn.rollback()  # 出错时回滚事务
    print(f"插入失败:{e}")
(3)查询数据(Read)
try:
    with conn.cursor() as cursor:
        # 1. 简单查询(查询所有用户)
        sql = "SELECT * FROM users"
        cursor.execute(sql)
        all_users = cursor.fetchall()  # 获取所有结果(列表套字典)
        print("所有用户:", all_users)

        # 2. 条件查询(查询 age > 26 的用户)
        sql = "SELECT name, email FROM users WHERE age > %s"
        cursor.execute(sql, (26,))
        filtered_users = cursor.fetchmany(2)  # 获取前 2 条结果
        print("年龄 >26 的用户:", filtered_users)

        # 3. 单条查询(获取第一条)
        cursor.execute(sql, (26,))
        first_user = cursor.fetchone()  # 获取一条结果
        print("第一条符合条件的用户:", first_user)
except pymysql.MySQLError as e:
    print(f"查询失败:{e}")
(4)更新数据(Update)
try:
    with conn.cursor() as cursor:
        # 更新张三的年龄为 26
        sql = "UPDATE users SET age = %s WHERE name = %s"
        cursor.execute(sql, (26, "张三"))

    conn.commit()
    print(f"更新成功,影响行数:{cursor.rowcount}")
except pymysql.MySQLError as e:
    conn.rollback()
    print(f"更新失败:{e}")
(5)删除数据(Delete)
try:
    with conn.cursor() as cursor:
        # 删除邮箱为 lisi@example.com 的用户
        sql = "DELETE FROM users WHERE email = %s"
        cursor.execute(sql, ("lisi@example.com",))

    conn.commit()
    print(f"删除成功,影响行数:{cursor.rowcount}")
except pymysql.MySQLError as e:
    conn.rollback()
    print(f"删除失败:{e}")
finally:
    # 关闭连接(操作完成后必须关闭)
    conn.close()
    print("数据库连接已关闭")

3. 关键注意事项

  • SQL 注入防护:必须用 %s 作为参数占位符(而非字符串拼接),execute 方法会自动转义参数。
  • 事务管理INSERT/UPDATE/DELETE 需调用 conn.commit() 提交,出错时用 conn.rollback() 回滚。
  • 连接关闭:使用 with 语句或 finally 块确保连接关闭,避免资源泄露。
  • 游标类型DictCursor 使查询结果为字典(键为列名),更易处理;默认游标返回元组(需按索引取数据)。

4. ORM 框架(进阶)

原生 SQL 操作繁琐,实际开发中常用 ORM(对象关系映射)框架(如 SQLAlchemy),将数据库表映射为 Python 类,通过类方法操作数据(无需写 SQL):

pip install sqlalchemy

示例(SQLAlchemy 操作 users 表):

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# 初始化引擎(连接数据库)
engine = create_engine("mysql+pymysql://root:123456@localhost:3306/python_db?charset=utf8mb4")
Base = declarative_base()  # 基类

# 定义 User 类(映射 users 表)
class User(Base):
    __tablename__ = "users"  # 表名
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(50), nullable=False)
    age = Column(Integer)
    email = Column(String(100), unique=True)

# 创建表(若不存在)
Base.metadata.create_all(engine)

# 创建会话(类似游标)
Session = sessionmaker(bind=engine)
session = Session()

# 插入数据
new_user = User(name="赵六", age=32, email="zhaoliu@example.com")
session.add(new_user)
session.commit()

# 查询数据
user = session.query(User).filter(User.age > 30).first()
print(f"查询结果:{user.name}, {user.email}")

session.close()

二、Python 操作 Redis

Redis 是高性能的 NoSQL 内存数据库,基于键值对存储,支持多种数据类型(字符串、哈希、列表等),适合缓存、会话存储、计数器等场景(速度远快于 MySQL)。

1. 环境准备

(1)安装 Redis 客户端

Python 操作 Redis 主要用 redis-py 库:

pip install redis
(2)准备 Redis 环境
  • 安装 Redis 服务器(本地或远程),默认端口 6379,无密码(生产环境需配置密码)。

2. 核心操作:连接与数据类型

基本流程:连接 Redis → 操作数据(基于数据类型) → 关闭连接(通常用连接池管理连接)。

(1)连接 Redis(使用连接池)

Redis 连接池可复用连接,减少创建/关闭连接的开销(推荐使用):

import redis

# 创建连接池(参数与 redis 配置一致)
pool = redis.ConnectionPool(
    host="localhost",  # 主机
    port=6379,         # 端口
    password="",       # 密码(无密码则为空)
    db=0,              # 数据库编号(默认 0,Redis 支持 16 个库)
    decode_responses=True  # 自动将返回的字节解码为字符串(否则返回 bytes)
)

# 从连接池获取连接
r = redis.Redis(connection_pool=pool)
print("Redis 连接成功")
(2)字符串(String):最基础的键值对

字符串是 Redis 最常用的类型,可存储文本、数字等(最大 512MB)。

# 设置键值(set key value,默认永久有效)
r.set("name", "Redis")  # → True

# 设置键值并指定过期时间(30 秒后过期)
r.setex("temp_key", 30, "30秒后消失")  # ex=30 表示过期时间(秒)

# 获取值
print(r.get("name"))  # → "Redis"

# 自增/自减(适合计数器)
r.set("count", 10)
r.incr("count")  # 自增 1 → 11
r.decr("count", 2)  # 自减 2 → 9
print(r.get("count"))  # → "9"

# 删除键
r.delete("temp_key")  # → 1(删除成功的数量)
(3)哈希(Hash):适合存储对象(如用户信息)

哈希是键值对的集合(类似 Python 字典),适合存储结构化对象(如用户的姓名、年龄、邮箱)。

# 存储用户信息(hset key field value)
r.hset("user:1001", "name", "张三")  # 单个字段
r.hset("user:1001", mapping={        # 多个字段(mapping 参数)
    "age": 25,
    "email": "zhangsan@example.com"
})

# 获取字段值
print(r.hget("user:1001", "name"))  # → "张三"

# 获取所有字段和值
print(r.hgetall("user:1001"))  # → {'name': '张三', 'age': '25', 'email': 'zhangsan@example.com'}

# 获取所有字段名/值
print(r.hkeys("user:1001"))  # → ['name', 'age', 'email']
print(r.hvals("user:1001"))  # → ['张三', '25', 'zhangsan@example.com']

# 删除字段
r.hdel("user:1001", "age")  # → 1(删除的字段数)
(4)列表(List):有序可重复,适合队列/栈

列表是有序的字符串集合,支持两端插入/删除,可实现队列(FIFO)或栈(LIFO)。

# 从右侧插入元素(rpush:右侧 push)
r.rpush("fruits", "apple", "banana", "cherry")  # → 3(列表长度)

# 从左侧插入元素(lpush)
r.lpush("fruits", "orange")  # → 4

# 获取指定范围元素(lrange key start end,-1 表示最后一个)
print(r.lrange("fruits", 0, -1))  # → ['orange', 'apple', 'banana', 'cherry']

# 弹出元素(右侧弹出 rpop,左侧弹出 lpop)
print(r.rpop("fruits"))  # → 'cherry'(右侧弹出)
print(r.lpop("fruits"))  # → 'orange'(左侧弹出)
print(r.lrange("fruits", 0, -1))  # → ['apple', 'banana']
(5)集合(Set):无序不可重复,适合去重/交集

集合是无序的字符串集合,元素唯一,支持交集、并集等操作(如共同关注的用户)。

# 添加元素(sadd key member...)
r.sadd("set1", "a", "b", "c")  # → 3
r.sadd("set2", "b", "c", "d")  # → 3

# 判断元素是否在集合中
print(r.sismember("set1", "a"))  # → True

# 获取所有元素
print(r.smembers("set1"))  # → {'a', 'b', 'c'}(无序)

# 交集(两个集合的共同元素)
print(r.sinter("set1", "set2"))  # → {'b', 'c'}

# 并集(合并两个集合,去重)
print(r.sunion("set1", "set2"))  # → {'a', 'b', 'c', 'd'}
(6)其他常用操作
# 查看所有键
print(r.keys("*"))  # → 所有键(如 ['name', 'user:1001', 'fruits'])

# 检查键是否存在
print(r.exists("name"))  # → 1(存在)

# 设置键的过期时间(单位:秒)
r.expire("name", 60)  # 60 秒后过期

# 查看键的剩余过期时间(-1 表示永久,-2 表示已过期)
print(r.ttl("name"))  # → 剩余秒数

3. 关键注意事项

  • 连接池:必须使用连接池管理连接,避免频繁创建连接(Redis 性能虽高,但连接开销仍存在)。
  • 数据类型选择:根据场景选择合适的类型(如缓存用户信息用 Hash,计数器用 String,去重用 Set)。
  • 过期时间:合理设置 expiresetex,避免内存溢出(Redis 是内存数据库,内存有限)。
  • 序列化:若存储 Python 对象(如字典、列表),需先序列化(如 json.dumps()),读取时反序列化(json.loads())。

三、MySQL 与 Redis 的适用场景对比

特性 MySQL(关系型) Redis(NoSQL 内存型)
数据存储 磁盘(持久化,可存大量数据) 内存(速度快,适合小数据量/热点数据)
数据结构 二维表(结构化,支持 SQL 复杂查询) 键值对(字符串、Hash、List 等简单结构)
事务支持 强事务(ACID 特性) 简单事务(部分命令支持原子性)
适用场景 持久化存储(用户数据、订单记录)、复杂查询 缓存(减轻 MySQL 压力)、会话存储、计数器、实时排行榜

总结

  • MySQL:通过 PyMySQLmysql-connector-python 连接,核心是执行 SQL 语句,需注意事务和 SQL 注入防护;进阶用 ORM 框架(如 SQLAlchemy)简化开发。
  • Redis:通过 redis-py 连接,基于数据类型操作,需用连接池管理连接,适合缓存和高频读写场景。

实际开发中,两者常结合使用(如 Redis 缓存 MySQL 中的热点数据,提高访问速度),需根据数据特性和业务需求选择合适的操作方式。

连接数据库

import pymysql

# 创建连接  
conn = pymysql.Connection(
    host = '127.0.0.1',
    user = 'root',
    password = 'root123',
    database = 'school',
    charset = 'utf8mb4'
)


# 创建游标(查询数据返回为元组格式)  
# cursor = conn.cursor()  

# 创建游标(查询数据返回为字典格式)  
cursor = conn.cursor(pymysql.cursors.DictCursor)  

# 1. 执行SQL,返回受影响的行数  
effect_row1 = cursor.execute("select * from USER")  

# 2. 执行SQL,返回受影响的行数,一次插入多行数据  
effect_row2 = cursor.executemany("insert into USER (NAME) values(%s)", [("jack"), ("boom"), ("lucy")])  # 3 

# 获取最新创建的数据自增ID
new_id = cursor.lastrowid  
print(new_id)   

# 查询所有数据,默认返回数据为元组格式  
result = cursor.fetchall()  

# 增/删/改均需要进行commit提交,进行保存  
conn.commit()  

# 关闭游标  
cursor.close()  

# 关闭连接  
conn.close()  

print(result)  
"""  
[{'id': 6, 'name': 'boom'}, {'id': 5, 'name': 'jack'}, {'id': 7, 'name': 'lucy'}, {'id': 4, 'name': 'tome'}, {'id': 3, 'name': 'zff'}, {'id': 1, 'name': 'zhaofengfeng'}, {'id': 2, 'name': 'zhaofengfeng02'}]  
"""  

查询操作

# 获取第一行数据  
row_1 = cursor.fetchone()  

# 获取前n行数据  
row_2 = cursor.fetchmany(3)  

#  获取所有数据  
row_3 = cursor.fetchall()  

Redis

A. 安装 Redis 客户端库

首先,我们需要安装 Redis 客户端库。使用以下命令通过 pip 进行安装:

pip install redis
B. 导入 Redis 模块

在 Python 脚本中,导入 Redis 模块以使用 Redis 客户端库的功能:

import redis
C. 创建 Redis 客户端实例

使用 Redis 模块创建 Redis 客户端实例,用于与 Redis 服务器进行通信:

r = redis.Redis(host='localhost', port=6379, db=0)

# 指定密码
r = redis.Redis(host='localhost', port=6379, db=0, password='your_password')

三. 数据操作

A. 键值对操作
1. 设置键值对

可以使用 set 方法设置键值对:

r.set('mykey', 'myvalue')
2. 获取键值对

使用 get 方法获取键的值:

value = r.get('mykey')
print(value)  # 输出 b'myvalue'
3. 检查键是否存在

使用 exists 方法检查键是否存在:

exists = r.exists('mykey')
print(exists)  # 输出 True
4. 删除键

使用 delete 方法删除键:

deleted = r.delete('mykey')
print(deleted)  # 输出 1
5. 批量设置多个键值对

mset命令用于同时设置多个键值对。

# 批量写入数据
data = {
    'key1': 'value1',
    'key2': 'value2',
    'key3': 'value3'
}
r.mset(data)
6.批量获取多个键的值

mget命令用于同时获取多个键的值。

values = r.mget('key1', 'key2', 'key3')

#或者
keys = ['key1', 'key2', 'key3']
values = r.mget(keys)
7.批量删除多个键

delete命令用于同时删除多个键。

r.delete('key1', 'key2', 'key3')
B. 哈希表操作
1. 存储哈希表

使用 hset 方法存储哈希表:

r.hset('myhash', 'field1', 'value1')
r.hset('myhash', 'field2', 'value2')
2. 获取哈表中指定字段的值

使用 hget 方法获取哈希表中指定字段的值:

field_value = r.hget('myhash', 'field1')
print(field_value)  # 输出 b'value1'
3.删除哈希表
# 删除整个哈希表
r.delete('myhash')
4.获取哈希表的所有字段和值

使用 hgetall 方法获取哈希表的所有字段和值:

hash_data = r.hgetall('myhash')
print(hash_data)  # 输出 {b'field1': b'value1', b'field2': b'value2'}
5.批量设置哈希表字段

hmset命令用于同时设置多个哈希表字段的值。

r.hmset('myhash', {'field1': 'value1', 'field2': 'value2', 'field3': 'value3'})
6.批量获取哈希表字段的值

hmget 命令用于同时获取多个哈希表字段的值。

values = r.hmget('myhash', 'field1', 'field2', 'field3')
7.批量删除哈希表字段

hdel 命令用于同时删除多个哈希表字段。

r.hdel('myhash', 'field1', 'field2', 'field3')
C. 列表操作
1. 添加元素到列表

使用 lpushrpush 方法向列表的左侧或右侧添加元素:

r.lpush('mylist', 'value1')
r.rpush('mylist', 'value2')
# 批量添加元素到列表的右侧
r.rpush('mylist', 'element1', 'element2', 'element3')
# 批量添加元素到列表的左侧
r.lpush('mylist', 'element0', 'element-1', 'element-2')
2. 获取列表元素

使用 lrange 方法获取列表的指定范围元素:

list_data = r.lrange('mylist', 0, -1)
print(list_data)  # 输出 [b'value1', b'value2']
3. 获取列表长度

使用 llen 方法获取列表的长度:

list_length = r.llen('mylist')
print(list_length)  # 输出 2
D. 集合操作
1. 添加元素到集合

使用 sadd 方法向集合中添加元素:

r.sadd('myset', 'value1')
r.sadd('myset', 'value2')
# 批量添加元素到合集
r.sadd('myset', 'element1', 'element2', 'element3')
2. 检查元素是否存在于集合中

使用 sismember 方法检查元素是否存在于集合中:

is_member = r.sismember('myset', 'value1')
print(is_member)  # 输出 True
3. 获取集合的所有元素

使用 smembers 方法获取集合的所有元素:

set_data = r.smembers('myset')
print(set_data)  # 输出 {b'value1', b'value2'}
4.删除合集元素
# 删除单个元素
r.srem('myset', 'element3')

# 批量删除多个元素
r.srem('myset', 'element1', 'element5')
E. 有序集合操作
1. 添加元素到有序集合

使用 zadd 方法向有序集合中添加元素:

r.zadd('myzset', {'value1': 1, 'value2': 2})
2. 获取有序集合的元素

使用 zrange 方法获取有序集合的指定范围元素:

zset_data = r.zrange('myzset', 0, -1)
print(zset_data)  # 输出 [b'value1', b'value2']
3. 获取有序集合的长度

使用 zcard 方法获取有序集合的长度:

zset_length = r.zcard('myzset')
print(zset_length)  # 输出 2
F. 发布/订阅操作
1. 发布消息

使用 publish 方法发布消息到指定频道:

r.publish('mychannel', 'Hello, Redis!')
2. 订阅消息

使用 Redis 模块的 pubsub 类进行消息订阅:

pubsub = r.pubsub()
pubsub.subscribe('mychannel')

for message in pubsub.listen():
    print(message)

四. 高级功能和用例

A. 事务操作

Redis 支持事务操作,可以一次性执行多个命令,并保证这些命令的原子性。

# 开启事务
pipe = r.pipeline()

# 执行事务操作
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.get('key1')
pipe.get('key2')

# 提交事务
result = pipe.execute()

print(result)  # 输出 [True, True, b'value1', b'value2']
B. 过期时间和持久化

Redis 支持设置键的过期时间,以及将数据持久化到磁盘。

# 设置键的过期时间(单位为秒)
r.setex('mykey', 60, 'myvalue')

# 获取键的剩余生存时间
ttl = r.ttl('key')
print(ttl)  # 输出: 57,表示剩余的生存时间为 57 秒

# 持久化数据到磁盘
r.save()
C. 分布式锁

Redis 可以用作分布式锁的实现,确保在分布式环境下对共享资源的访问安全。

# 获取分布式锁
lock_acquired = r.set('mylock', 'locked', nx=True, ex=10)

if lock_acquired:
    # 执行需要加锁的操作
    print('Lock acquired. Performing critical section.')

    # 释放锁
    r.delete('mylock')
else:
    print('Failed to acquire lock. Another process holds the lock.')

我们使用 Redis 的 set 方法来设置一个键值对作为分布式锁。参数nx=True表示只有当键不存在时才设置该键,即实现了原子性的加锁操作。参数ex=10设置了该键的过期时间为 10 秒,以防止锁被长时间占用。如果 lock_acquired 为 True,表示成功获取到了锁。在这种情况下,我们可以执行需要加锁的操作,然后使用 r.delete('mylock') 释放锁,让其他进程有机会获取锁。如果 lock_acquired 为 False,表示获取锁失败,说明另一个进程已经持有了该锁。在这种情况下,我们可以执行相应的逻辑,比如等待一段时间后再尝试获取锁或执行备选方案。 需要注意的是,在释放锁之前,确保只有获取锁的进程能够删除该键。这可以通过在设置锁时为其设置一个唯一的标识符来实现,以便在释放锁时进行验证。