Python数据库编程


Python数据库编程

MySQL

python可以通过pymysql模块连接使用MySQL

PyMySQL

# 安装pymysql
pip install pymysql

连接数据库

import pymysql

# 创建连接  
conn = pymysql.Connection(
    host = '127.0.0.1',
    user = 'root',
    password = 'root123',
    database = 'school',
    charset = 'utf8mb4'
)


# 创建游标(查询数据返回为元组格式)  
# cursor = conn.cursor()  

# 创建游标(查询数据返回为字典格式)  
cursor = conn.cursor(pymysql.cursors.DictCursor)  

# 1. 执行SQL,返回受影响的行数  
effect_row1 = cursor.execute("select * from USER")  

# 2. 执行SQL,返回受影响的行数,一次插入多行数据  
effect_row2 = cursor.executemany("insert into USER (NAME) values(%s)", [("jack"), ("boom"), ("lucy")])  # 3 

# 获取最新创建的数据自增ID
new_id = cursor.lastrowid  
print(new_id)   

# 查询所有数据,默认返回数据为元组格式  
result = cursor.fetchall()  

# 增/删/改均需要进行commit提交,进行保存  
conn.commit()  

# 关闭游标  
cursor.close()  

# 关闭连接  
conn.close()  

print(result)  
"""  
[{'id': 6, 'name': 'boom'}, {'id': 5, 'name': 'jack'}, {'id': 7, 'name': 'lucy'}, {'id': 4, 'name': 'tome'}, {'id': 3, 'name': 'zff'}, {'id': 1, 'name': 'zhaofengfeng'}, {'id': 2, 'name': 'zhaofengfeng02'}]  
"""  

查询操作

# 获取第一行数据  
row_1 = cursor.fetchone()  

# 获取前n行数据  
row_2 = cursor.fetchmany(3)  

#  获取所有数据  
row_3 = cursor.fetchall()  

Redis

A. 安装 Redis 客户端库

首先,我们需要安装 Redis 客户端库。使用以下命令通过 pip 进行安装:

pip install redis
B. 导入 Redis 模块

在 Python 脚本中,导入 Redis 模块以使用 Redis 客户端库的功能:

import redis
C. 创建 Redis 客户端实例

使用 Redis 模块创建 Redis 客户端实例,用于与 Redis 服务器进行通信:

r = redis.Redis(host='localhost', port=6379, db=0)

# 指定密码
r = redis.Redis(host='localhost', port=6379, db=0, password='your_password')

三. 数据操作

A. 键值对操作
1. 设置键值对

可以使用 set 方法设置键值对:

r.set('mykey', 'myvalue')
2. 获取键值对

使用 get 方法获取键的值:

value = r.get('mykey')
print(value)  # 输出 b'myvalue'
3. 检查键是否存在

使用 exists 方法检查键是否存在:

exists = r.exists('mykey')
print(exists)  # 输出 True
4. 删除键

使用 delete 方法删除键:

deleted = r.delete('mykey')
print(deleted)  # 输出 1
5. 批量设置多个键值对

mset命令用于同时设置多个键值对。

# 批量写入数据
data = {
    'key1': 'value1',
    'key2': 'value2',
    'key3': 'value3'
}
r.mset(data)
6.批量获取多个键的值

mget命令用于同时获取多个键的值。

values = r.mget('key1', 'key2', 'key3')

#或者
keys = ['key1', 'key2', 'key3']
values = r.mget(keys)
7.批量删除多个键

delete命令用于同时删除多个键。

r.delete('key1', 'key2', 'key3')
B. 哈希表操作
1. 存储哈希表

使用 hset 方法存储哈希表:

r.hset('myhash', 'field1', 'value1')
r.hset('myhash', 'field2', 'value2')
2. 获取哈表中指定字段的值

使用 hget 方法获取哈希表中指定字段的值:

field_value = r.hget('myhash', 'field1')
print(field_value)  # 输出 b'value1'
3.删除哈希表
# 删除整个哈希表
r.delete('myhash')
4.获取哈希表的所有字段和值

使用 hgetall 方法获取哈希表的所有字段和值:

hash_data = r.hgetall('myhash')
print(hash_data)  # 输出 {b'field1': b'value1', b'field2': b'value2'}
5.批量设置哈希表字段

hmset命令用于同时设置多个哈希表字段的值。

r.hmset('myhash', {'field1': 'value1', 'field2': 'value2', 'field3': 'value3'})
6.批量获取哈希表字段的值

hmget 命令用于同时获取多个哈希表字段的值。

values = r.hmget('myhash', 'field1', 'field2', 'field3')
7.批量删除哈希表字段

hdel 命令用于同时删除多个哈希表字段。

r.hdel('myhash', 'field1', 'field2', 'field3')
C. 列表操作
1. 添加元素到列表

使用 lpushrpush 方法向列表的左侧或右侧添加元素:

r.lpush('mylist', 'value1')
r.rpush('mylist', 'value2')
# 批量添加元素到列表的右侧
r.rpush('mylist', 'element1', 'element2', 'element3')
# 批量添加元素到列表的左侧
r.lpush('mylist', 'element0', 'element-1', 'element-2')
2. 获取列表元素

使用 lrange 方法获取列表的指定范围元素:

list_data = r.lrange('mylist', 0, -1)
print(list_data)  # 输出 [b'value1', b'value2']
3. 获取列表长度

使用 llen 方法获取列表的长度:

list_length = r.llen('mylist')
print(list_length)  # 输出 2
D. 集合操作
1. 添加元素到集合

使用 sadd 方法向集合中添加元素:

r.sadd('myset', 'value1')
r.sadd('myset', 'value2')
# 批量添加元素到合集
r.sadd('myset', 'element1', 'element2', 'element3')
2. 检查元素是否存在于集合中

使用 sismember 方法检查元素是否存在于集合中:

is_member = r.sismember('myset', 'value1')
print(is_member)  # 输出 True
3. 获取集合的所有元素

使用 smembers 方法获取集合的所有元素:

set_data = r.smembers('myset')
print(set_data)  # 输出 {b'value1', b'value2'}
4.删除合集元素
# 删除单个元素
r.srem('myset', 'element3')

# 批量删除多个元素
r.srem('myset', 'element1', 'element5')
E. 有序集合操作
1. 添加元素到有序集合

使用 zadd 方法向有序集合中添加元素:

r.zadd('myzset', {'value1': 1, 'value2': 2})
2. 获取有序集合的元素

使用 zrange 方法获取有序集合的指定范围元素:

zset_data = r.zrange('myzset', 0, -1)
print(zset_data)  # 输出 [b'value1', b'value2']
3. 获取有序集合的长度

使用 zcard 方法获取有序集合的长度:

zset_length = r.zcard('myzset')
print(zset_length)  # 输出 2
F. 发布/订阅操作
1. 发布消息

使用 publish 方法发布消息到指定频道:

r.publish('mychannel', 'Hello, Redis!')
2. 订阅消息

使用 Redis 模块的 pubsub 类进行消息订阅:

pubsub = r.pubsub()
pubsub.subscribe('mychannel')

for message in pubsub.listen():
    print(message)

四. 高级功能和用例

A. 事务操作

Redis 支持事务操作,可以一次性执行多个命令,并保证这些命令的原子性。

# 开启事务
pipe = r.pipeline()

# 执行事务操作
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.get('key1')
pipe.get('key2')

# 提交事务
result = pipe.execute()

print(result)  # 输出 [True, True, b'value1', b'value2']
B. 过期时间和持久化

Redis 支持设置键的过期时间,以及将数据持久化到磁盘。

# 设置键的过期时间(单位为秒)
r.setex('mykey', 60, 'myvalue')

# 获取键的剩余生存时间
ttl = r.ttl('key')
print(ttl)  # 输出: 57,表示剩余的生存时间为 57 秒

# 持久化数据到磁盘
r.save()
C. 分布式锁

Redis 可以用作分布式锁的实现,确保在分布式环境下对共享资源的访问安全。

# 获取分布式锁
lock_acquired = r.set('mylock', 'locked', nx=True, ex=10)

if lock_acquired:
    # 执行需要加锁的操作
    print('Lock acquired. Performing critical section.')

    # 释放锁
    r.delete('mylock')
else:
    print('Failed to acquire lock. Another process holds the lock.')

我们使用 Redis 的 set 方法来设置一个键值对作为分布式锁。参数nx=True表示只有当键不存在时才设置该键,即实现了原子性的加锁操作。参数ex=10设置了该键的过期时间为 10 秒,以防止锁被长时间占用。如果 lock_acquired 为 True,表示成功获取到了锁。在这种情况下,我们可以执行需要加锁的操作,然后使用 r.delete('mylock') 释放锁,让其他进程有机会获取锁。如果 lock_acquired 为 False,表示获取锁失败,说明另一个进程已经持有了该锁。在这种情况下,我们可以执行相应的逻辑,比如等待一段时间后再尝试获取锁或执行备选方案。 需要注意的是,在释放锁之前,确保只有获取锁的进程能够删除该键。这可以通过在设置锁时为其设置一个唯一的标识符来实现,以便在释放锁时进行验证。