Python数据库编程
MySQL
python可以通过pymysql模块连接使用MySQL
PyMySQL
# 安装pymysql
pip install pymysql
连接数据库
import pymysql
# 创建连接
conn = pymysql.Connection(
host = '127.0.0.1',
user = 'root',
password = 'root123',
database = 'school',
charset = 'utf8mb4'
)
# 创建游标(查询数据返回为元组格式)
# cursor = conn.cursor()
# 创建游标(查询数据返回为字典格式)
cursor = conn.cursor(pymysql.cursors.DictCursor)
# 1. 执行SQL,返回受影响的行数
effect_row1 = cursor.execute("select * from USER")
# 2. 执行SQL,返回受影响的行数,一次插入多行数据
effect_row2 = cursor.executemany("insert into USER (NAME) values(%s)", [("jack"), ("boom"), ("lucy")]) # 3
# 获取最新创建的数据自增ID
new_id = cursor.lastrowid
print(new_id)
# 查询所有数据,默认返回数据为元组格式
result = cursor.fetchall()
# 增/删/改均需要进行commit提交,进行保存
conn.commit()
# 关闭游标
cursor.close()
# 关闭连接
conn.close()
print(result)
"""
[{'id': 6, 'name': 'boom'}, {'id': 5, 'name': 'jack'}, {'id': 7, 'name': 'lucy'}, {'id': 4, 'name': 'tome'}, {'id': 3, 'name': 'zff'}, {'id': 1, 'name': 'zhaofengfeng'}, {'id': 2, 'name': 'zhaofengfeng02'}]
"""
查询操作
# 获取第一行数据
row_1 = cursor.fetchone()
# 获取前n行数据
row_2 = cursor.fetchmany(3)
# 获取所有数据
row_3 = cursor.fetchall()
Redis
A. 安装 Redis 客户端库
首先,我们需要安装 Redis 客户端库。使用以下命令通过 pip 进行安装:
pip install redis
B. 导入 Redis 模块
在 Python 脚本中,导入 Redis 模块以使用 Redis 客户端库的功能:
import redis
C. 创建 Redis 客户端实例
使用 Redis 模块创建 Redis 客户端实例,用于与 Redis 服务器进行通信:
r = redis.Redis(host='localhost', port=6379, db=0)
# 指定密码
r = redis.Redis(host='localhost', port=6379, db=0, password='your_password')
三. 数据操作
A. 键值对操作
1. 设置键值对
可以使用 set
方法设置键值对:
r.set('mykey', 'myvalue')
2. 获取键值对
使用 get
方法获取键的值:
value = r.get('mykey')
print(value) # 输出 b'myvalue'
3. 检查键是否存在
使用 exists
方法检查键是否存在:
exists = r.exists('mykey')
print(exists) # 输出 True
4. 删除键
使用 delete
方法删除键:
deleted = r.delete('mykey')
print(deleted) # 输出 1
5. 批量设置多个键值对
mset
命令用于同时设置多个键值对。
# 批量写入数据
data = {
'key1': 'value1',
'key2': 'value2',
'key3': 'value3'
}
r.mset(data)
6.批量获取多个键的值
mget
命令用于同时获取多个键的值。
values = r.mget('key1', 'key2', 'key3')
#或者
keys = ['key1', 'key2', 'key3']
values = r.mget(keys)
7.批量删除多个键
delete
命令用于同时删除多个键。
r.delete('key1', 'key2', 'key3')
B. 哈希表操作
1. 存储哈希表
使用 hset
方法存储哈希表:
r.hset('myhash', 'field1', 'value1')
r.hset('myhash', 'field2', 'value2')
2. 获取哈表中指定字段的值
使用 hget
方法获取哈希表中指定字段的值:
field_value = r.hget('myhash', 'field1')
print(field_value) # 输出 b'value1'
3.删除哈希表
# 删除整个哈希表
r.delete('myhash')
4.获取哈希表的所有字段和值
使用 hgetall 方法获取哈希表的所有字段和值:
hash_data = r.hgetall('myhash')
print(hash_data) # 输出 {b'field1': b'value1', b'field2': b'value2'}
5.批量设置哈希表字段
hmset
命令用于同时设置多个哈希表字段的值。
r.hmset('myhash', {'field1': 'value1', 'field2': 'value2', 'field3': 'value3'})
6.批量获取哈希表字段的值
hmget
命令用于同时获取多个哈希表字段的值。
values = r.hmget('myhash', 'field1', 'field2', 'field3')
7.批量删除哈希表字段
hdel
命令用于同时删除多个哈希表字段。
r.hdel('myhash', 'field1', 'field2', 'field3')
C. 列表操作
1. 添加元素到列表
使用 lpush
或 rpush
方法向列表的左侧或右侧添加元素:
r.lpush('mylist', 'value1')
r.rpush('mylist', 'value2')
# 批量添加元素到列表的右侧
r.rpush('mylist', 'element1', 'element2', 'element3')
# 批量添加元素到列表的左侧
r.lpush('mylist', 'element0', 'element-1', 'element-2')
2. 获取列表元素
使用 lrange
方法获取列表的指定范围元素:
list_data = r.lrange('mylist', 0, -1)
print(list_data) # 输出 [b'value1', b'value2']
3. 获取列表长度
使用 llen
方法获取列表的长度:
list_length = r.llen('mylist')
print(list_length) # 输出 2
D. 集合操作
1. 添加元素到集合
使用 sadd
方法向集合中添加元素:
r.sadd('myset', 'value1')
r.sadd('myset', 'value2')
# 批量添加元素到合集
r.sadd('myset', 'element1', 'element2', 'element3')
2. 检查元素是否存在于集合中
使用 sismember
方法检查元素是否存在于集合中:
is_member = r.sismember('myset', 'value1')
print(is_member) # 输出 True
3. 获取集合的所有元素
使用 smembers
方法获取集合的所有元素:
set_data = r.smembers('myset')
print(set_data) # 输出 {b'value1', b'value2'}
4.删除合集元素
# 删除单个元素
r.srem('myset', 'element3')
# 批量删除多个元素
r.srem('myset', 'element1', 'element5')
E. 有序集合操作
1. 添加元素到有序集合
使用 zadd
方法向有序集合中添加元素:
r.zadd('myzset', {'value1': 1, 'value2': 2})
2. 获取有序集合的元素
使用 zrange
方法获取有序集合的指定范围元素:
zset_data = r.zrange('myzset', 0, -1)
print(zset_data) # 输出 [b'value1', b'value2']
3. 获取有序集合的长度
使用 zcard
方法获取有序集合的长度:
zset_length = r.zcard('myzset')
print(zset_length) # 输出 2
F. 发布/订阅操作
1. 发布消息
使用 publish
方法发布消息到指定频道:
r.publish('mychannel', 'Hello, Redis!')
2. 订阅消息
使用 Redis 模块的 pubsub
类进行消息订阅:
pubsub = r.pubsub()
pubsub.subscribe('mychannel')
for message in pubsub.listen():
print(message)
四. 高级功能和用例
A. 事务操作
Redis 支持事务操作,可以一次性执行多个命令,并保证这些命令的原子性。
# 开启事务
pipe = r.pipeline()
# 执行事务操作
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.get('key1')
pipe.get('key2')
# 提交事务
result = pipe.execute()
print(result) # 输出 [True, True, b'value1', b'value2']
B. 过期时间和持久化
Redis 支持设置键的过期时间,以及将数据持久化到磁盘。
# 设置键的过期时间(单位为秒)
r.setex('mykey', 60, 'myvalue')
# 获取键的剩余生存时间
ttl = r.ttl('key')
print(ttl) # 输出: 57,表示剩余的生存时间为 57 秒
# 持久化数据到磁盘
r.save()
C. 分布式锁
Redis 可以用作分布式锁的实现,确保在分布式环境下对共享资源的访问安全。
# 获取分布式锁
lock_acquired = r.set('mylock', 'locked', nx=True, ex=10)
if lock_acquired:
# 执行需要加锁的操作
print('Lock acquired. Performing critical section.')
# 释放锁
r.delete('mylock')
else:
print('Failed to acquire lock. Another process holds the lock.')
我们使用 Redis 的 set 方法来设置一个键值对作为分布式锁。参数nx=True
表示只有当键不存在时才设置该键,即实现了原子性的加锁操作。参数ex=10
设置了该键的过期时间为 10 秒,以防止锁被长时间占用。如果 lock_acquired
为 True,表示成功获取到了锁。在这种情况下,我们可以执行需要加锁的操作,然后使用 r.delete('mylock')
释放锁,让其他进程有机会获取锁。如果 lock_acquired
为 False,表示获取锁失败,说明另一个进程已经持有了该锁。在这种情况下,我们可以执行相应的逻辑,比如等待一段时间后再尝试获取锁或执行备选方案。
需要注意的是,在释放锁之前,确保只有获取锁的进程能够删除该键。这可以通过在设置锁时为其设置一个唯一的标识符来实现,以便在释放锁时进行验证。