Python 多线程是并发编程的基础手段之一,适用于处理 I/O 密集型任务(如网络请求、文件读写等)。它通过在同一进程内创建多个线程,共享进程资源,实现任务的“并发”执行(宏观上同时进行,微观上 CPU 快速切换)。本文将详细介绍 Python 多线程的核心概念、实现方式、同步机制及应用场景。
一、核心概念:线程与 GIL
1. 线程(Thread)
线程是进程内的最小执行单元,一个进程可以包含多个线程。线程共享进程的内存空间(如全局变量、文件句柄等),但拥有独立的栈空间(用于函数调用和局部变量)。
- 优势:创建和切换成本远低于进程,适合任务间需要频繁通信的场景。
- 适用场景:I/O 密集型任务(如爬虫、文件读写、数据库操作),这类任务大部分时间在等待(如等待网络响应),多线程可在等待时切换到其他线程,提高 CPU 利用率。
2. GIL(全局解释器锁)
Python 的 GIL(Global Interpreter Lock) 是一个关键特性,它限制了同一时刻只有一个线程能执行 Python 字节码。这导致:
- CPU 密集型任务(如大量计算)无法通过多线程实现“并行”(只能并发,即交替执行),因为 GIL 会在线程间频繁切换,但同一时刻只有一个线程在计算。
- I/O 密集型任务不受 GIL 显著影响,因为 I/O 操作时线程会释放 GIL(等待期间不占用 CPU),其他线程可获取 GIL 执行。
二、多线程实现:threading 模块
Python 提供 threading 模块(推荐)和 legacy 的 _thread 模块(低级接口),threading 封装了更易用的 API,支持线程创建、同步、管理等。
1. 创建线程的两种方式
(1)通过 threading.Thread 类传入目标函数(推荐)
将线程要执行的逻辑封装为函数,作为 Thread 的 target 参数传入。
import threading
import time
# 线程要执行的函数
def task(name, delay):
for i in range(3):
print(f"线程 {name} 执行第 {i+1} 次,当前时间:{time.ctime()}")
time.sleep(delay) # 模拟 I/O 等待(释放 GIL)
# 创建线程(target 为目标函数,args 为函数参数)
t1 = threading.Thread(target=task, args=("T1", 1)) # 线程1:间隔1秒
t2 = threading.Thread(target=task, args=("T2", 2)) # 线程2:间隔2秒
# 启动线程(进入就绪状态,等待 CPU 调度)
t1.start()
t2.start()
# 主线程等待子线程结束(避免主线程先退出)
t1.join()
t2.join()
print("所有线程执行完毕")
start():启动线程(调用线程的run()方法),而非直接调用task()(否则会变成单线程执行)。join(timeout=None):主线程阻塞,等待子线程执行完毕(timeout为最长等待时间,单位秒)。
(2)继承 threading.Thread 类并重写 run() 方法
将线程逻辑封装在 run() 方法中,适合复杂逻辑的封装。
import threading
import time
class MyThread(threading.Thread):
def __init__(self, name, delay):
super().__init__() # 调用父类初始化
self.name = name
self.delay = delay
# 重写 run() 方法:线程执行的逻辑
def run(self):
for i in range(3):
print(f"线程 {self.name} 执行第 {i+1} 次,当前时间:{time.ctime()}")
time.sleep(self.delay)
# 创建并启动线程
t1 = MyThread("T1", 1)
t2 = MyThread("T2", 2)
t1.start()
t2.start()
t1.join()
t2.join()
print("所有线程执行完毕")
2. 线程的常用属性和方法
| 方法/属性 | 功能描述 |
|---|---|
threading.current_thread() |
返回当前正在执行的线程对象。 |
threading.enumerate() |
返回所有存活的线程对象列表(包括主线程)。 |
threading.active_count() |
返回存活的线程数量(等价于 len(threading.enumerate()))。 |
thread.getName() / thread.setName() |
获取/设置线程名称。 |
thread.is_alive() |
判断线程是否存活(已启动且未结束)。 |
thread.daemon / thread.setDaemon(bool) |
设置线程是否为守护线程(见下文)。 |
3. 守护线程(Daemon Thread)
守护线程是“后台线程”,主线程退出时会自动终止(无论是否执行完毕),而非守护线程(默认)会阻止主线程退出(主线程需等待其完成)。
- 用途:用于后台支持任务(如日志记录、监控),无需等待其完成。
import threading
import time
def daemon_task():
while True: # 无限循环
print("守护线程运行中...")
time.sleep(1)
# 创建守护线程(必须在 start() 前设置)
t = threading.Thread(target=daemon_task)
t.daemon = True # 或 t.setDaemon(True)
t.start()
# 主线程执行 3 秒后退出
time.sleep(3)
print("主线程退出,守护线程将被终止")
- 输出:守护线程打印 3 次后,随主线程退出而终止。
三、线程同步:解决资源竞争
多线程共享进程资源(如全局变量),若多个线程同时修改共享资源,可能导致竞态条件(Race Condition)(结果依赖线程执行顺序,出现不可预期的错误)。需通过同步机制保证资源访问的原子性(同一时间只有一个线程操作)。
1. 锁(threading.Lock)
Lock 是最基础的同步工具,通过“获取锁-操作资源-释放锁”的流程,确保临界区(访问共享资源的代码)同一时间只有一个线程执行。
- 方法:
acquire(blocking=True, timeout=-1):获取锁(阻塞等待直到获取,blocking=False不阻塞,获取失败返回False)。release():释放锁(必须由持有锁的线程调用,否则报错)。
示例:用锁解决计数器竞争问题
import threading
count = 0 # 共享资源:计数器
lock = threading.Lock() # 创建锁
def increment():
global count
for _ in range(100000):
# 获取锁(进入临界区)
lock.acquire()
try:
count += 1 # 临界区操作(非原子操作,需保护)
finally:
# 确保锁释放(即使发生异常)
lock.release()
# 创建 10 个线程同时操作计数器
threads = [threading.Thread(target=increment) for _ in range(10)]
# 启动所有线程
for t in threads:
t.start()
# 等待所有线程结束
for t in threads:
t.join()
print(f"最终计数:{count}") # 若不加锁,结果可能小于 1000000;加锁后正确为 1000000
- 若无锁,
count += 1(等价于count = count + 1)可能被多个线程打断(如线程1读取count=100,线程2也读取count=100,两者都加1后写回,导致少加1)。 - 用
try...finally确保锁释放,避免死锁。
2. 上下文管理器简化锁操作
Lock 支持 with 语句,自动完成 acquire() 和 release(),更简洁安全:
def increment():
global count
for _ in range(100000):
with lock: # 自动获取锁,退出 with 时自动释放
count += 1
3. 其他同步工具
RLock(可重入锁):允许同一线程多次获取锁(避免自身死锁),需释放相同次数。适合嵌套访问临界区的场景。Condition(条件变量):在锁的基础上增加“等待-通知”机制,用于线程间协作(如生产者-消费者模型)。Semaphore(信号量):限制同时访问资源的线程数量(如控制并发连接数)。
示例:Condition 实现生产者-消费者模型
生产者生成数据,消费者消费数据,通过 Condition 协调两者(消费者等待数据,生产者生成后通知):
import threading
import time
from queue import Queue
# 共享队列(存储数据)
queue = Queue(maxsize=5) # 最大容量5
cond = threading.Condition() # 条件变量
# 生产者:生成数据放入队列
def producer():
for i in range(10):
data = f"数据{i}"
with cond:
# 若队列满,等待消费者消费
while queue.full():
cond.wait() # 释放锁并等待通知
queue.put(data)
print(f"生产者放入:{data}")
cond.notify() # 通知消费者有数据
time.sleep(0.5)
# 消费者:从队列取数据
def consumer():
for _ in range(10):
with cond:
# 若队列空,等待生产者生成
while queue.empty():
cond.wait() # 释放锁并等待通知
data = queue.get()
print(f"消费者取出:{data}")
cond.notify() # 通知生产者有空间
time.sleep(1)
# 启动线程
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()
四、线程局部变量(threading.local)
多线程共享全局变量,但有时需要线程私有数据(每个线程独立拥有一份),可通过 threading.local() 实现。
import threading
# 创建线程局部变量(每个线程有独立副本)
local_data = threading.local()
def task(name):
# 为当前线程设置局部变量
local_data.value = name
# 模拟处理
time.sleep(1)
# 访问当前线程的局部变量
print(f"线程 {name} 的局部变量:{local_data.value}")
# 创建线程
t1 = threading.Thread(target=task, args=("T1",))
t2 = threading.Thread(target=task, args=("T2",))
t1.start()
t2.start()
t1.join()
t2.join()
- 输出:
线程 T1 的局部变量:T1和线程 T2 的局部变量:T2,两者互不干扰。
五、多线程的局限性与适用场景
1. 局限性
- GIL 限制:CPU 密集型任务无法通过多线程实现并行(效率可能不如单线程,因线程切换有开销)。
- 线程安全:共享资源需同步,增加代码复杂度,可能引入死锁(如线程 A 持有锁1等待锁2,线程 B 持有锁2等待锁1)。
- 不能无限创建:每个线程占用内存,过多线程会导致内存消耗激增,且切换开销增大。
2. 适用场景
- I/O 密集型任务:如网络爬虫(等待网页响应)、文件读写(等待磁盘 I/O)、数据库查询(等待数据库返回)等。
- 任务间需频繁通信:因线程共享内存,通信成本低(无需进程间的 IPC 机制)。
总结
Python 多线程基于 threading 模块,核心特点:
- 适合 I/O 密集型任务,受 GIL 影响小;CPU 密集型任务不推荐。
- 线程共享资源,需通过
Lock、Condition等同步工具避免竞态条件。 - 守护线程用于后台任务,主线程退出时自动终止。
使用多线程时,需权衡同步成本和性能收益,避免滥用导致复杂的线程安全问题。对于 CPU 密集型任务,建议使用多进程(multiprocessing 模块)绕过 GIL 限制。
多进程和协程
Python 并发编程中,多进程和协程是与多线程并列的核心技术,分别解决不同场景的并发问题:多进程突破 GIL 限制,适合 CPU 密集型任务;协程通过轻量级调度,高效处理高并发 I/O 密集型任务。本文详细解析两者的原理、实现及应用。
一、多进程(Multiprocessing):突破 GIL 限制
进程是操作系统资源分配的最小单位,每个进程拥有独立的内存空间、Python 解释器和 GIL(全局解释器锁)。因此,多进程可实现真正的并行(利用多核 CPU 同时执行),适合 CPU 密集型任务(如大量计算)。
1. 多进程与多线程的核心区别
| 特性 | 多线程(Thread) | 多进程(Process) |
|---|---|---|
| 资源共享 | 共享进程内存空间(全局变量、文件句柄等) | 内存独立(不共享,需通过 IPC 机制通信) |
| GIL 影响 | 受 GIL 限制,同一时刻仅一个线程执行字节码 | 每个进程有独立 GIL,可并行利用多核 CPU |
| 开销 | 创建和切换成本低 | 创建和切换成本高(资源分配、内存复制) |
| 适用场景 | I/O 密集型任务(如网络请求、文件读写) | CPU 密集型任务(如数据计算、图像处理) |
2. 多进程实现:multiprocessing 模块
Python 的 multiprocessing 模块提供了类似 threading 的 API,支持进程创建、管理和通信,且跨平台(Windows/Linux/macOS)。
(1)创建进程的两种方式
① 通过 multiprocessing.Process 传入目标函数
import multiprocessing
import time
# 进程要执行的函数(计算密集型任务:累加)
def calculate(name, n):
total = 0
for i in range(n):
total += i
print(f"进程 {name} 计算完成,结果:{total}")
return total
if __name__ == "__main__": # Windows 必须在 main 模块中创建进程
# 创建进程(target 为目标函数,args 为参数)
p1 = multiprocessing.Process(target=calculate, args=("P1", 10**7))
p2 = multiprocessing.Process(target=calculate, args=("P2", 10**7))
# 启动进程
start = time.time()
p1.start()
p2.start()
# 等待进程结束
p1.join()
p2.join()
print(f"总耗时:{time.time() - start:.2f}秒") # 并行执行,耗时约为单进程的一半
- 注意:Windows 系统中,进程创建依赖
pickle序列化,因此必须将进程创建代码放在if __name__ == "__main__":块中,否则会无限递归创建进程。
② 继承 multiprocessing.Process 并重写 run() 方法
import multiprocessing
class MyProcess(multiprocessing.Process):
def __init__(self, name, n):
super().__init__()
self.name = name
self.n = n
def run(self): # 进程执行的逻辑
total = 0
for i in range(self.n):
total += i
print(f"进程 {self.name} 计算完成,结果:{total}")
if __name__ == "__main__":
p1 = MyProcess("P1", 10**7)
p2 = MyProcess("P2", 10**7)
p1.start()
p2.start()
p1.join()
p2.join()
3. 进程间通信(IPC):解决内存不共享问题
多进程内存独立,需通过进程间通信(IPC) 机制交换数据,multiprocessing 提供了多种方式:
(1)Queue:安全的跨进程队列(推荐)
Queue 基于管道(Pipe)实现,线程/进程安全,支持多生产者和多消费者。
import multiprocessing
def producer(queue):
"""生产者:向队列放入数据"""
for i in range(5):
data = f"数据{i}"
queue.put(data)
print(f"生产者放入:{data}")
def consumer(queue):
"""消费者:从队列取出数据"""
while True:
data = queue.get() # 阻塞等待数据
if data is None: # 终止信号
break
print(f"消费者取出:{data}")
if __name__ == "__main__":
queue = multiprocessing.Queue() # 创建进程安全的队列
# 创建生产者和消费者进程
p_prod = multiprocessing.Process(target=producer, args=(queue,))
p_cons = multiprocessing.Process(target=consumer, args=(queue,))
p_prod.start()
p_cons.start()
p_prod.join() # 等待生产者完成
queue.put(None) # 发送终止信号
p_cons.join()
(2)Pipe:双向管道(适合两个进程通信)
Pipe() 返回两个连接对象(conn1, conn2),进程通过 send() 发送数据,recv() 接收数据(阻塞)。
import multiprocessing
def send_data(conn):
conn.send("Hello from child process") # 发送数据
conn.close()
if __name__ == "__main__":
conn1, conn2 = multiprocessing.Pipe() # 创建管道
p = multiprocessing.Process(target=send_data, args=(conn2,))
p.start()
print(conn1.recv()) # 接收数据 → "Hello from child process"
p.join()
(3)Manager:共享复杂数据结构(如字典、列表)
Manager 通过服务器进程管理共享资源,支持字典、列表、锁等,适合多进程共享复杂数据。
import multiprocessing
def update_dict(shared_dict, key, value):
shared_dict[key] = value # 修改共享字典
if __name__ == "__main__":
with multiprocessing.Manager() as manager:
shared_dict = manager.dict() # 创建共享字典
# 创建两个进程修改字典
p1 = multiprocessing.Process(target=update_dict, args=(shared_dict, "a", 1))
p2 = multiprocessing.Process(target=update_dict, args=(shared_dict, "b", 2))
p1.start()
p2.start()
p1.join()
p2.join()
print(shared_dict) # → {'a': 1, 'b': 2}(共享修改结果)
4. 进程池(Pool):管理多个进程
频繁创建/销毁进程会消耗大量资源,Pool 可创建固定数量的进程(进程池),重复利用进程处理任务,适合批量任务处理。
(1)Pool 基本用法
import multiprocessing
import time
def task(n):
"""计算 n 的平方(模拟耗时任务)"""
time.sleep(1)
return n * n
if __name__ == "__main__":
# 创建包含 4 个进程的进程池
with multiprocessing.Pool(processes=4) as pool:
# 方法1:map(批量执行任务,返回结果列表)
inputs = [1, 2, 3, 4, 5]
results = pool.map(task, inputs) # 阻塞,直到所有任务完成
print("map 结果:", results) # → [1, 4, 9, 16, 25]
# 方法2:apply_async(异步执行单个任务,非阻塞)
async_result = pool.apply_async(task, args=(6,))
print("apply_async 结果:", async_result.get()) # → 36(阻塞获取结果)
(2)Pool 常用方法
| 方法 | 功能描述 |
|---|---|
map(func, iterable) |
批量执行 iterable 中的任务,返回结果列表(阻塞)。 |
map_async(func, iterable) |
异步版 map,返回 AsyncResult 对象,通过 get() 获取结果(非阻塞)。 |
apply(func, args) |
执行单个任务(阻塞)。 |
apply_async(func, args) |
异步执行单个任务,返回 AsyncResult 对象。 |
close() |
关闭进程池,不再接受新任务。 |
join() |
等待所有进程完成任务(需在 close() 后调用)。 |
5. 多进程的适用场景
- CPU 密集型任务:如数据计算(矩阵运算、数据分析)、图像处理、密码破解等(充分利用多核 CPU)。
- 需要隔离的任务:如运行不稳定的代码(一个进程崩溃不影响其他进程)。
二、协程(Coroutine):轻量级并发
协程是用户态的轻量级线程,由程序(而非操作系统)控制调度,切换成本极低(无需内核参与),适合高并发 I/O 密集型任务(如百万级网络请求)。
1. 协程的核心特点
- 协作式调度:协程主动让出 CPU(通过
await),而非操作系统强制切换,无上下文切换开销。 - 单线程内执行:所有协程在同一线程内运行,共享线程内存,无需处理复杂的同步问题(但需避免阻塞操作)。
- 高并发支持:单线程可支持数万甚至数十万协程(内存占用远低于线程/进程)。
2. 协程实现:asyncio 模块
Python 3.4+ 引入 asyncio 模块,标准化协程支持;Python 3.5+ 新增 async/await 语法,简化协程编写(推荐)。
(1)协程的定义与运行
- 用
async def定义协程函数(返回协程对象)。 - 通过事件循环(Event Loop) 运行协程(协程不能直接调用,需注册到事件循环)。
import asyncio
# 定义协程函数
async def hello(name):
print(f"Hello, {name}!开始")
await asyncio.sleep(1) # 模拟 I/O 等待(主动让出 CPU,不阻塞线程)
print(f"Hello, {name}!结束")
return f"结果:{name}"
# 运行协程的入口函数(需在 async 函数中)
async def main():
# 方式1:直接 await 单个协程
result = await hello("协程1")
print(result)
# 方式2:并发运行多个协程(用 asyncio.gather)
results = await asyncio.gather(
hello("协程2"),
hello("协程3")
)
print("并发结果:", results)
if __name__ == "__main__":
# 获取事件循环并运行
asyncio.run(main()) # Python 3.7+ 简化写法,自动管理事件循环
- 输出分析:
- 先执行
hello("协程1"),等待 1 秒后完成。 - 然后并发执行
协程2和协程3,总耗时约 1 秒(而非 2 秒),体现协程的并发效率。
(2)任务(Task):协程的调度单位
asyncio.Task 是对协程的封装,负责将协程注册到事件循环并调度执行。asyncio.create_task() 可创建任务(并发执行)。
async def main():
# 创建任务(立即加入事件循环,开始运行)
task1 = asyncio.create_task(hello("任务1"))
task2 = asyncio.create_task(hello("任务2"))
print("等待任务完成...")
# 等待所有任务完成
await task1
await task2
asyncio.run(main())
(3)异步 I/O:协程的核心价值
协程的高效依赖异步 I/O 操作(如异步网络请求、异步文件读写),这些操作在等待时会让出 CPU,让其他协程执行。
示例:用 aiohttp 异步爬取网页(需先安装:pip install aiohttp)
import asyncio
import aiohttp
async def fetch_url(session, url):
async with session.get(url) as response: # 异步请求
return await response.text() # 异步获取响应内容
async def main():
urls = [
"https://www.baidu.com",
"https://www.bing.com",
"https://www.google.com"
]
async with aiohttp.ClientSession() as session: # 异步会话
# 创建任务列表
tasks = [fetch_url(session, url) for url in urls]
# 并发执行所有任务
results = await asyncio.gather(*tasks)
print(f"爬取完成,共 {len(results)} 个网页")
asyncio.run(main())
- 相比多线程爬虫,异步爬虫通过单线程内的协程切换,避免了线程切换开销,支持更高并发(如同时爬取数千个 URL)。
(4)避免同步阻塞操作
协程中若调用同步阻塞函数(如 time.sleep()、requests.get()),会阻塞整个事件循环(所有协程暂停)。需用异步替代方案:
- 同步 time.sleep(1) → 异步 await asyncio.sleep(1)
- 同步 requests.get() → 异步 aiohttp.ClientSession().get()
3. 协程的适用场景
- 高并发 I/O 密集型任务:如网络爬虫、API 服务、WebSocket 服务、消息队列消费等(需配合异步 I/O 库)。
- 需要轻量级并发的场景:如单线程处理数万并发连接(内存和性能优势明显)。
三、多进程、多线程、协程的对比与选择
| 技术 | 并行性 | 开销 | 数据共享 | 适用场景 | 典型库/模块 |
|---|---|---|---|---|---|
| 多线程 | 并发(GIL限制) | 低 | 共享内存 | I/O 密集型(中等并发) | threading |
| 多进程 | 并行(多核) | 高 | 需 IPC 机制 | CPU 密集型、任务隔离 | multiprocessing |
| 协程 | 并发(单线程) | 极低 | 共享线程内存 | 高并发 I/O 密集型(如百万级连接) | asyncio、aiohttp |
选择建议:
- 若任务是 CPU 密集型(如计算)→ 多进程。
- 若任务是 I/O 密集型且并发量中等 → 多线程。
- 若任务是 I/O 密集型且并发量极高(如高并发网络服务)→ 协程。
总结
- 多进程通过独立内存和 GIL 实现并行,适合 CPU 密集型任务,需通过
Queue/Manager等进行进程间通信。 - 协程是单线程内的轻量级调度,通过
async/await实现高效并发,适合高并发 I/O 密集型任务,依赖异步 I/O 库。 - 实际开发中,可结合使用(如多进程 + 协程:每个进程运行多个协程,充分利用多核和高并发优势)。
掌握这三种并发模型,能根据任务特性选择最优方案,显著提升程序性能。
多任务
今天我们使用的计算机早已进入多CPU或多核时代,而我们使用的操作系统都是支持“多任务”的操作系统
- 我们可以
同时运行多个程序 - 可以将一个程序分解为若干个相对独立的子任务
- 让多个子任务并发的执行,从而缩短程序的执行时间
- 同时也让用户获得更好的体验。
- 实现让程序同时执行多个任务也就是常说的“并发编程”
Python既支持多进程又支持多线程,因此使用Python实现并发编程主要有3种方式:多进程、多线程、多进程+多线程。
并行与并发的区别
-
并行:同时做某些事,可以互相不干扰的同一个时刻做几件事。(例如:高速公路,双向八车道,所有车都可以互不干扰的运行。)
-
并发:同一时刻,有很多事情要做。(例如:乡村小路,只有一个车道,一个方向同一时间只能过一辆车。发生多辆车同时同行的情况就是并发。)
- 可以用并行、队列、缓存区解决并发问题。
- 排队是解决并发的办法之一。
并发的解决
”食堂打饭模型“
中午12点,开饭了,所有学生都涌向了食堂。这就是并发,如果人特别多,就是高并发。
1.队列、缓冲区
假设只有一个窗口,就只能排队打菜。
排队就是把人排成队列,解决了资源使用的问题。
排成的队列,其实就是一个缓冲地带,就是缓冲区
假设女士优先,窗口就要有两队,只要有女生就优先打饭,男生就等着,女生队伍就是一个优先队列。
2.争抢
只开一个窗口,没有秩序,谁能挤进去就给谁打饭。窗口不能给其他人打饭。这是一种锁机制.
争抢也是一种解决高并发的方式,但是不好,因为有可能会有人很长时间抢不到。
3.预处理
如果打饭慢的原因是因为要现做的话,可以提前统计大多数人喜欢吃的菜品,提前做好,保证供应,提高打饭效率。
预处理的思想是加快用户获取数据的速度,常用缓存解决。
4.并行
由于吃饭的人数过多,一个窗口忙不过来,食堂老板决定扩大食堂,多开几个窗口,但是没有窗口都要雇人,造成成本上升。
企业可以通过购买更多服务器,或开启更多进程的方式做并行处理,来解决问题。(水平扩展)
注意:线程如果只在单个CPU核心上运行就并不是并行。
5.提速
提高单个窗口的打饭速度,也是解决并发的方式。
提高打饭人员技能,或为单个窗口配置更多服务人员。
提高单机的性能,或者单个服务器安装更多CPU.
这是一种垂直提升的思路
6.多地就近原则
多开几个食堂,分流。可以把食堂设置在宿舍附近,来提升吃饭速度。
进程与线程
进程(Process)就是操作系统中执行的一个程序.进程是系统进行运算调度的最小单元。
- 操作系统以进程为单位分配存储空间
- 每个进程都有自己的地址空间、数据栈以及其他用于跟踪进程执行的辅助数据
- 操作系统管理所有进程的执行,为它们合理的分配资源。
- 一个进程还可以拥有多个并发的执行线索,简单的说就是拥有多个可以获得CPU调度的执行单元,这就是所谓的线程。
线程(Thread)有时候也被称为轻量级进程,是程序的执行流的最小单元。
- 一个标准的线程是由线程ID,当前指令指针(PC),寄存器集合和堆栈组成。
- 由于线程在同一个进程下,它们可以共享相同的上下文(Context),因此相对于进程而言,线程间的信息共享和通信更加容易。
- 当然在单核CPU系统中,真正的并发是不可能的,因为在某个时刻能够获得CPU的只有唯一的一个线程,多个线程共享了CPU的执行时间。
- 使用多线程实现并发编程为程序带来的好处是不言而喻的,最主要的体现在提升程序的性能和改善用户体验
进程、线程的理解
- 现代操作系统中提出的进程的概念,每个进程都认为自己独占所有的计算机硬件资源
- 进程就是独立的王国,进程间不能直接随便贡献资源。
- 线程是省份,同一进程内的线程可以共享进程的资源,每一个线程拥有自己独立的堆栈。
线程的状态
- 运行态-正在占用CPU
- 就绪态-随时可以转换为运行状态
- 阻塞态-除非某些外部事件发生,否则进程不能运行
| 状态 | 含义 |
|---|---|
| 就绪(Ready) | 线程能够运行,但是在等待被调度。可能线程刚刚创建启动或刚从阻塞中恢复 |
| 运行(Running) | 程序正在运行 |
| 阻塞(Blocked) | 线程等待外部事件发生而无法运行,如I/O操作 |
| 终止(Terminated) | 线程完成,或退出,或被取消 |
Python中的多线程
在Python早期的版本中就引入了thread模块(现在名为_thread)来实现多线程编程,然而该模块过于底层,而且很多功能都没有提供,因此目前的多线程开发我们推荐使用threading模块,该模块对多线程编程提供了更好的面向对象的封装。
threading中的函数
| 名称 | 含义 |
|---|---|
| current_thread() | 返回当前线程对象 |
| main_thread() | 返回主线程对象 |
| active_count() | 当前处于alive状态的线程个数 |
| enumerate() | 返回所有活着的线程列表,不包括已经终止的线程和未开始的线程 |
| get_ident() | 返回当前线程的ID,非0整数 |
from random import randint
from threading import Thread
from time import time, sleep
def download(filename):
print('开始下载%s...' % filename)
time_to_download = randint(5, 10)
sleep(time_to_download)
print('%s下载完成! 耗费了%d秒' % (filename, time_to_download))
def main():
start = time()
t1 = Thread(target=download, args=('Python从入门到住院.pdf',))
t1.start()
t2 = Thread(target=download, args=('Peking Hot.avi',))
t2.start()
t1.join()
t2.join()
end = time()
print('总共耗费了%.3f秒' % (end - start))
if __name__ == '__main__':
main()
我们可以直接使用threading模块的Thread类来创建线程,但是我们之前讲过一个非常重要的概念叫“继承”,我们可以从已有的类创建新类,因此也可以通过继承Thread类的方式来创建自定义的线程类,然后再创建线程对象并启动线程。代码如下所示。
from random import randint
from threading import Thread
from time import time, sleep
class DownloadTask(Thread):
def __init__(self, filename):
super().__init__()
self._filename = filename
def run(self):
print('开始下载%s...' % self._filename)
time_to_download = randint(5, 10)
sleep(time_to_download)
print('%s下载完成! 耗费了%d秒' % (self._filename, time_to_download))
def main():
start = time()
t1 = DownloadTask('Python从入门到住院.pdf')
t1.start()
t2 = DownloadTask('Peking Hot.avi')
t2.start()
t1.join()
t2.join()
end = time()
print('总共耗费了%.2f秒.' % (end - start))
if __name__ == '__main__':
main()
Thread类
Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
| 参数 | 说明 |
|---|---|
| group | 为了日后扩展 ThreadGroup 类实现而保留 |
| target | 用于__run__ 方法调用的可调用对象。参数是函数。 |
| name | 线程名称。默认情况下,由 "Thread-N" 格式构成一个唯一的名称,其中 N 是小的十进制数。 |
| args | 是用于调用目标函数的参数元组。默认是 (). |
| kwargs | 是用于调用目标函数的关键字参数字典。默认是 {}。 |
| daemon | 不是 None,线程将被显式的设置为 守护模式,不管该线程是否是守护模式。如果是 None (默认值),线程将继承当前线程的守护模式属性。 |
Thread方法
| 方法 | 说明 |
|---|---|
start() |
开始线程活动。它在一个线程里最多只能被调用一次。它安排对象的 run() 方法在一个独立的控制进程中调用。 |
run() |
代表线程活动的方法。可以在子类型里重载这个方法。 标准的 run() 方法会对作为 target参数传递给该对象构造器的可调用对象(如果存在)发起调用,并附带从 args 和 kwargs 参数分别获取的位置和关键字参数。 |
join(timeout=None) |
等待,直到线程终结。这会阻塞调用这个方法的线程,直到被调用 join() 的线程终结 -- 不管是正常终结还是抛出未处理异常 -- 或者直到发生超时,超时选项是可选的。 |
getName() |
旧的 name 取值API; |
setName() |
旧的 name 设值API; |
| ident | 这个线程的 '线程标识符',如果线程尚未开始则为 None 。 |
is_alive() |
返回线程是否存活。 |
| daemon | 一个表示这个线程是(True)否(False)守护线程的布尔值。一定要在调用 start() 前设置好,不然会抛出 RuntimeError 。 |
isDaemon()、setDaemon() |
旧的 name 取值/设值 API;建议直接当做特征属性使用它 |
线程的退出
Python中没有提供线程退出的方法,线程在下面情况退出:
- 线程函数内语句执行完毕
- 线程函数中抛出未处理的异常
daemon的应用场景
daemon线程是为了让程序员不用去记录和管理那些后台线程。
当把一个线程设置为daemon,它会随着主线程的退出而退出。
主要的应用场景:
- 1.后台任务、如监控、等待连接等。
- 2.依赖主线程工作的线程。
- 3.随时可以被终止的线程。
daemon简化程序员手动关闭线程的工作。
锁
因为多个线程可以共享进程的内存空间,因此要实现多个线程间的通信相对简单,大家能想到的最直接的办法就是设置一个全局变量,多个线程共享这个全局变量即可。
但是当多个线程共享同一个变量(我们通常称之为“资源”)的时候,很有可能产生不可控的结果从而导致程序失效甚至崩溃。如果一个资源被多个线程竞争使用,那么我们通常称之为“临界资源”,对“临界资源”的访问需要加上保护,否则资源会处于“混乱”的状态。下面的例子演示了100个线程向同一个银行账户转账(转入1元钱)的场景,在这个例子中,银行账户就是一个临界资源,在没有保护的情况下我们很有可能会得到错误的结果。
from time import sleep
from threading import Thread
class Account(object):
def __init__(self):
self._balance = 0
def deposit(self, money):
# 计算存款后的余额
new_balance = self._balance + money
# 模拟受理存款业务需要0.01秒的时间
sleep(0.01)
# 修改账户余额
self._balance = new_balance
@property
def balance(self):
return self._balance
class AddMoneyThread(Thread):
def __init__(self, account, money):
super().__init__()
self._account = account
self._money = money
def run(self):
self._account.deposit(self._money)
def main():
account = Account()
threads = []
# 创建100个存款的线程向同一个账户中存钱
for _ in range(100):
t = AddMoneyThread(account, 1)
threads.append(t)
t.start()
# 等所有存款的线程都执行完毕
for t in threads:
t.join()
print('账户余额为: ¥%d元' % account.balance)
if __name__ == '__main__':
main()
运行上面的程序,结果让人大跌眼镜,100个线程分别向账户中转入1元钱,结果居然远远小于100元。之所以出现这种情况是因为我们没有对银行账户这个“临界资源”加以保护,多个线程同时向账户中存钱时,会一起执行到new_balance = self._balance + money这行代码,多个线程得到的账户余额都是初始状态下的0,所以都是0上面做了+1的操作,因此得到了错误的结果。在这种情况下,“锁”就可以派上用场了。我们可以通过“锁”来保护“临界资源”,只有获得“锁”的线程才能访问“临界资源”,而其他没有得到“锁”的线程只能被阻塞起来,直到获得“锁”的线程释放了“锁”,其他线程才有机会获得“锁”,进而访问被保护的“临界资源”。下面的代码演示了如何使用“锁”来保护对银行账户的操作,从而获得正确的结果。
Lock
锁,一旦线程获得锁,其他试图获取锁的线程将会被阻塞
| 名称 | 含义 |
|---|---|
| acquire(blocking=True,timeout=-1) | 默认阻塞,阻塞可以设置超时时间,非阻塞时,timeout禁止设置,成功获取锁,返回True否则返回False |
| release() | 释放锁,可以从任何线程调用释放。已上锁的锁,会被重置为unlocked未上锁状态 |
锁的应用场景
锁适用于访问和修改同一个共享资源的时候,即读写同一个资源的时候,如果全部都是读取一个共享资源的时候需要加锁吗?
不需要,因为这时可以认为共享资源时不可改变的,每次读取它都是一样的值,所以不用加锁。
使用锁的注意事项:
- 少用锁,必要时在用锁,多线程访问被锁的资源的时候就成串行,要么排队执行,要么争抢执行。
- 加锁时间越短越好,不需要时立即释放锁
- 一定要避免死锁。
不使用锁,有了效率,但是结果是错的。
使用了锁,效率低下,但是结果是对的。
所以,我们是为了效率要错误的结果呢?还是为了对的结果,让计算降低效率。
import threading
import time
num = 0 #全局变量多个线程可以读写,传递数据
lock =threading.Lock() #创建一个锁
class Mythread(threading.Thread):
def run(self):
global num
with lock: #with Lock的作用相当于自动获取和释放锁(资源)
for i in range(1000000): #锁定期间,其他线程不可以干活
num+=1
print(num)
mythread=[]
for i in range(5):
t=Mythread()
t.start()
mythread.append(t)
for t in mythread:
t.join()
print("game over")
递归锁对象threading.RLock
重入锁必须由获取它的线程释放。一旦线程获得了重入锁,同一个线程再次获取它将不阻塞;线程必须在每次获取它时释放一次。
递归锁和普通锁方法相同。
from time import sleep
from threading import Thread, RLock
class Account(object):
def __init__(self):
self._balance = 0
self._lock = RLock()
def deposit(self, money):
# 先获取锁才能执行后续的代码
self._lock.acquire()
try:
new_balance = self._balance + money
sleep(0.01)
self._balance = new_balance
finally:
# 在finally中执行释放锁的操作保证正常异常锁都能释放
self._lock.release()
@property
def balance(self):
return self._balance
class AddMoneyThread(Thread):
def __init__(self, account, money):
super().__init__()
self._account = account
self._money = money
def run(self):
self._account.deposit(self._money)
def main():
account = Account()
threads = []
for _ in range(100):
t = AddMoneyThread(account, 1)
threads.append(t)
t.start()
for t in threads:
t.join()
print('账户余额为: ¥%d元' % account.balance)
if __name__ == '__main__':
main()
线程同步
线程是共享全局变量。查询不用加锁,修改时需要加锁。
线程同步,线程间协同,通过某种技术,让一个线程访问某些数据时,其他线程不能访问这些数据,直到该线程完成对数据的操作。
临界区(Critical Section)、互斥量(Mutex) -->(锁)、信号量(Semaphore)、事件(Event)
临界区:指的是一个访问共用资源的程序片段,而这些共用资源又无法同时被多个线程访问的特性。当有线程进入临界区段时,其他线程或是进程必须等待,以确保这些共用资源是被互斥获得使用
Condition
线程间同步之条件变量
Python提供的Condition对象提供了对复杂线程同步问题的支持。Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。
构造方法Condition(lock=None),可以传入一个Lock,或RLock对象,默认是RLock.
Condition用于生产者,消费者模型,为了解决生产者消费者速度匹配的问题。
| 名称 | 含义 |
|---|---|
| acquire(*args) | 获取锁 |
| wait(self,timeout=None) | 等待或超时 |
| notify(n=1) | 唤醒至多指定数目的等待线程,没有等待的线程就没有任何操作 |
| notify_all() | 唤醒所有的等待的线程 |
| release() | 释放锁 |
"""
实现场景:当a同学王火锅里面添加鱼丸加满后(最多5个,加满后通知b去吃掉),通知b同学去吃掉鱼丸(吃到0的时候通知a同学继续添加)
"""
import threading
import time
con = threading.Condition()
num = 0
# 生产者
class Producer(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
with con:
# 锁定线程
global num
while True:
print("开始添加!!!")
num += 1
print("火锅里面鱼丸个数:%s" % str(num))
time.sleep(1)
if num >= 5:
print("火锅里面里面鱼丸数量已经到达5个,无法添加了!")
# 唤醒等待的线程
con.notify() # 唤醒小伙伴开吃啦
# 等待通知
con.wait()
# 消费者
class Consumers(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
with con:
global num
while True:
print("开始吃啦!!!")
num -= 1
print("火锅里面剩余鱼丸数量:%s" % str(num))
time.sleep(2)
if num <= 0:
print("锅底没货了,赶紧加鱼丸吧!")
con.notify() # 唤醒其它线程
# 等待通知
con.wait()
p = Producer()
c = Consumers()
p.start()
c.start()
Condition 总结
condition用于生产者消费者模型中,解决生产值速度匹配的问题。
采用了通知机制,非常有效。
使用方式
使用Condition,必须先acqueire,用完后relsase,因为内部使用了锁,默认使用RLock,最好的方式是使用with上下文。
消费者wait,等待通知。
生产者生产好消息,对消费者发通知,可以使用notify或notify_all方法。
Event
Event事件,是线程间通信机制中最简单的实现,使用一个内部的标记flag,通过flag的True或False的变化来进行操作。
event = threading.Event() # 事件变量 (flag = False) 等待 , (flag = True) 解除等待
event.wait() --> flag = False # 在线程内部 设置为事件为等待状态,线程会暂停执行。
event.set() --> flag = True # 让线程从等待状态退出,继续执行
event.clear() --> flag = False # 清空状态。
| 名称 | 含义 |
|---|---|
| set() | flag标记设置为True |
| clear() | flag标记设置为Fales |
| is_set() | return flag 返回标记 |
| wait(timeout=None) | 设置标记为True的时长,None为无标记,等到返回True,超时返回False |
需求:
老板雇佣了一个工人,让他生产杯子,老板一直等待工人,直到生产了10个杯子。
import time
import logging
from threading import Event, Thread
log_format = "%(asctime)s-%(levelname)s-%(module)s-%(funcName)s-%(threadName)s%(message)s"
logging.basicConfig(format=log_format, level=logging.INFO)
def boss(event: Event):
logging.info("我是老板正在等待中....")
event.wait()
logging.info("完成的不错!")
def worker(event: Event, count: int = 10):
logging.info("我正在工作中....")
cups = []
while True:
logging.info("制作中....")
time.sleep(0.5)
cups.append("u")
if len(cups) >= count:
event.set()
break
logging.info("我制作的杯子.cups={}".format(cups))
event = Event()
w = Thread(target=worker, args=(event,))
b = Thread(target=boss, args=(event,))
b.start()
w.start()
Barrier
屏障,可以想象成路障、道闸。
Python3.2引入的新的功能。
Barrier类是设置了一个线程数量障碍,当等待的线程到达了这个数量就会唤醒所有的等待线程。
| 名称 | 含义 |
|---|---|
| Barrier(parties,action=None,timeout=None) | 构建Barrier对象,指定参与方数目,timeout是wait为指定超时时间时的默认值。 |
| n_waition | 当前在屏障中等待的线程数 |
| parities | 通过障碍所需的线程数。 |
| wait(time=None) | 等待通过屏障 |
#!/usr/bin/env python3
# coding:utf-8
# author:ZuoJie
# date:2019/5/15 9:32
import time
import threading
import logging
from threading import Barrier
log_format = "%(asctime)s-15s\t [%(threadName)s,%(thread)8d] %(message)s"
logging.basicConfig(level=logging.INFO, format=log_format)
def worker(barrier: threading.Barrier):
logging.info("等待{}线程".format(barrier.n_waiting))
try:
barrier_id = barrier.wait()
logging.info("下一个是barrier{}".format(barrier_id))
except threading.BrokenBarrierError:
logging.info("损毁的Barrier")
barrier = Barrier(3)
for _ in range(100):
threading.Thread(target=worker, args=(barrier,)).start()
| 名称 | 作用 |
|---|---|
| bronken | 如果屏障处于打破状态,返回True |
| abort() | 将瓶装置于broken状态,直到reset的方法来恢复屏障 |
| reset() | 恢复屏障,重新开始拦截 |
Barrier应用
并发初始化
所有线程都必须完成初始化后,才能继续工作,例如运行前的加载数据,检查,如果这这些工作没有完成就开始工作,将不能正常工作。
例如:启动一个程序,需要先加载磁盘文件、缓存预热、初始化连接池等工作后才能正常使用。这些工作可以齐头并进的。不过只有都满足了,程序才能继续向后执行。假设数据库连接失败了,则初始化工作失败,就要about,屏障broken,所有线程收到异常退出。
Semaphore信号量
semaphore是一个内置的计数器.
设置同一时刻允许运行的线程的数量。
信号量,信号量对象内部维护一个倒计数器,每一次acquire都会减1,当acquire方法发现计数为0就阻塞请求的线程,直到其它线程对信号量release后,计数大于0,恢复阻塞的线程。
| 名称 | 含义 |
|---|---|
| Semaphore(value=1) | 构造方法,value小于0,抛出ValueError错误 |
| acquire(blocking=True,time=None) | 获取信号量,计数器减1,获取成功返回True |
| release() | 释放信号量,计数器加1 |
#!/usr/bin/env python3
# coding:utf-8
# author:ZuoJie
# date:2019/5/15 10:26
import time
import threading
import logging
from threading import Semaphore
log_format = "%(asctime)s-15s\t [%(threadName)s,%(thread)8d] %(message)s"
logging.basicConfig(level=logging.INFO, format=log_format)
sem = Semaphore(3)
class MyThread(threading.Thread):
def run(self):
logging.info("{} 等待中的信号量".format(self.name))
sem.acquire()
logging.info("被锁定的信号量{}".format(self.name))
time.sleep(5)
logging.info("{} 解除锁定信号量".format(self.name))
sem.release()
for i in range(10):
t = MyThread(name="Thread-{}".format(i + 1))
t.start()
queue--- 一个同步的队列类
queue 模块实现多生产者,多消费者队列。当信息必须安全的在多线程之间交换时,它在线程编程中是特别有用的。
Queue对象
队列对象 (Queue, LifoQueue, 或者 PriorityQueue) 提供下列描述的公共方法。
| 方法 | 说明 |
|---|---|
| qsize() | 返回队列的大致大小。注意,qsize() > 0 不保证后续的 get() 不被阻塞,qsize() < maxsize 也不保证 put() 不被阻塞。 |
| empty() | 如果队列为空,返回 True ,否则返回 False |
| full() | 如果队列是满的返回 True ,否则返回 False |
put(item, block=True, timeout=None) |
将 item 放入队列。如果可选参数 block 是 true 并且 timeout 是 None (默认),则在必要时阻塞至有空闲插槽可用。 |
put_nowait(item) |
相当于 put(item, False) 。 |
get(block=True, timeout=None) |
从队列中移除并返回一个项目。 |
Queue.get_nowait() |
相当于 get(False) |
join() |
阻塞至队列中所有的元素都被接收和处理完毕。 |
task_done() |
表示前面排队的任务已经被完成。被队列的消费者线程使用。每个 get() 被用于获取一个任务, 后续调用 task_done() 告诉队列,该任务的处理已经完成。 |
def worker():
while True:
item = q.get()
if item is None:
break
do_work(item)
q.task_done()
q = queue.Queue()
threads = []
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
t.start()
threads.append(t)
for item in source():
q.put(item)
# block until all tasks are done
q.join()
# stop workers
for i in range(num_worker_threads):
q.put(None)
for t in threads:
t.join()
定时器 Timer/延时执行
threading.Timer继承自Thread。
Timer(interval, function, args=[], kwargs={})
| 参数 | 说明 |
|---|---|
| interval | 指定延时的时间 |
| function | 要执行的方法 |
| args/kwargs | 方法的参数 |
import threading
def func():
print('hello timer!')
timer = threading.Timer(5, func)
timer.start()
GIL全局解释器锁
- CPython在解释器进程级别有一把锁,就做GIL全局解释器锁。
- GIL保证CPython进程中,在一个时刻只能有一个线程执行字节码,甚至在多核CPU下,也是如此。
- CPython中IO密型操作,由于线程阻塞,就会调用其他线程。
- IO密集型操作,使用多线程.CPU密集型使用多进程,绕开GIL.
Python中的多进程
Unix和Linux操作系统上提供了fork()系统调用来创建进程,调用fork()函数的是父进程,创建出的是子进程,子进程是父进程的一个拷贝,但是子进程拥有自己的PID。
fork()函数非常特殊它会返回两次,父进程中可以通过fork()函数的返回值得到子进程的PID,而子进程中的返回值永远都是0。Python的os模块提供了fork()函数。
由于Windows系统没有fork()调用,因此要实现跨平台的多进程编程,可以使用multiprocessing模块的Process类来创建子进程,而且该模块还提供了更高级的封装,例如批量启动进程的进程池(Pool)、用于进程间通信的队列(Queue)和管道(Pipe)等。
上下文和启动方法
根据不同的平台, multiprocessing 支持三种启动进程的方法。这些 启动方法 有
spawn
父进程启动一个新的Python解释器进程。子进程只会继承那些运行进程对象的
run()方法所需的资源。特别是父进程中非必须的文件描述符和句柄不会被继承。相对于使用 fork 或者 forkserver,使用这个方法启动进程相当慢。可在Unix和Windows上使用。 Windows上的默认设置。fork
父进程使用
os.fork()来产生 Python 解释器分叉。子进程在开始时实际上与父进程相同。父进程的所有资源都由子进程继承。请注意,安全分叉多线程进程是棘手的。只存在于Unix。Unix中的默认值。forkserver
程序启动并选择* forkserver * 启动方法时,将启动服务器进程。从那时起,每当需要一个新进程时,父进程就会连接到服务器并请求它分叉一个新进程。分叉服务器进程是单线程的,因此使用
os.fork()是安全的。没有不必要的资源被继承。可在Unix平台上使用,支持通过Unix管道传递文件描述符。
下面用一个下载文件的例子来说明使用多进程和不使用多进程到底有什么差别,先看看下面的代码。
from random import randint
from time import time, sleep
def download_task(filename):
print('开始下载%s...' % filename)
time_to_download = randint(5, 10)
sleep(time_to_download)
print('%s下载完成! 耗费了%d秒' % (filename, time_to_download))
def main():
start = time()
download_task('Python从入门到住院.pdf')
download_task('Peking Hot.avi')
end = time()
print('总共耗费了%.2f秒.' % (end - start))
if __name__ == '__main__':
main()
下面是运行程序得到的一次运行结果。
开始下载Python从入门到住院.pdf...
Python从入门到住院.pdf下载完成! 耗费了6秒
开始下载Peking Hot.avi...
Peking Hot.avi下载完成! 耗费了7秒
总共耗费了13.01秒.
从上面的例子可以看出,如果程序中的代码只能按顺序一点点的往下执行,那么即使执行两个毫不相关的下载任务,也需要先等待一个文件下载完成后才能开始下一个下载任务,很显然这并不合理也没有效率。接下来我们使用多进程的方式将两个下载任务放到不同的进程中,代码如下所示。
from multiprocessing import Process
from os import getpid
from random import randint
from time import time, sleep
def download_task(filename):
print('启动下载进程,进程号[%d].' % getpid())
print('开始下载%s...' % filename)
time_to_download = randint(5, 10)
sleep(time_to_download)
print('%s下载完成! 耗费了%d秒' % (filename, time_to_download))
def main():
start = time()
p1 = Process(target=download_task, args=('Python从入门到住院.pdf', ))
p1.start()
p2 = Process(target=download_task, args=('Peking Hot.avi', ))
p2.start()
p1.join()
p2.join()
end = time()
print('总共耗费了%.2f秒.' % (end - start))
if __name__ == '__main__':
main()
在上面的代码中,我们通过Process类创建了进程对象,通过target参数我们传入一个函数来表示进程启动后要执行的代码,后面的args是一个元组,它代表了传递给函数的参数。Process对象的start方法用来启动进程,而join方法表示等待进程执行结束。运行上面的代码可以明显发现两个下载任务“同时”启动了,而且程序的执行时间将大大缩短,不再是两个任务的时间总和。下面是程序的一次执行结果。
启动下载进程,进程号[1530].
开始下载Python从入门到住院.pdf...
启动下载进程,进程号[1531].
开始下载Peking Hot.avi...
Peking Hot.avi下载完成! 耗费了7秒
Python从入门到住院.pdf下载完成! 耗费了10秒
总共耗费了10.01秒.
我们也可以使用subprocess模块中的类和函数来创建和启动子进程,然后通过管道来和子进程通信,这些内容我们不在此进行讲解,有兴趣的读者可以自己了解这些知识。接下来我们将重点放在如何实现两个进程间的通信。我们启动两个进程,一个输出Ping,一个输出Pong,两个进程输出的Ping和Pong加起来一共10个。听起来很简单吧,但是如果这样写可是错的哦。
from multiprocessing import Process
from time import sleep
counter = 0
def sub_task(string):
global counter
while counter < 10:
print(string, end='', flush=True)
counter += 1
sleep(0.01)
def main():
Process(target=sub_task, args=('Ping', )).start()
Process(target=sub_task, args=('Pong', )).start()
if __name__ == '__main__':
main()
看起来没毛病,但是最后的结果是Ping和Pong各输出了10个,Why?当我们在程序中创建进程的时候,子进程复制了父进程及其所有的数据结构,每个子进程有自己独立的内存空间,这也就意味着两个子进程中各有一个counter变量,所以结果也就可想而知了。要解决这个问题比较简单的办法是使用multiprocessing模块中的Queue类,它是可以被多个进程共享的队列,底层是通过管道和信号量(semaphore)机制来实现的,有兴趣的读者可以自己尝试一下。
由于Python的GIL,多进程未必是CPU密集型程序的好的选择
多进程可以完全独立的进程环境中运行程序,可以充分地利用多处理器。
但是进程本身的隔离带来的数据不共享也是一问题,而且线程比进程轻量。
multiprocessing
Process类
process类遵循了Thread类的API,减少了学习的难度。
#!/usr/bin/env python3
# coding:utf-8
# author:ZuoJie
# date:2019/5/16 9:48
import time
import logging
import multiprocessing
from functools import wraps
log_format = "%(asctime)-15s\t %(message)s"
logging.basicConfig(level=logging.INFO, format=log_format)
def run_time(func):
@wraps(func)
def func_fun_time(*args, **kwargs):
start = time.time()
func(*args, **kwargs)
logging.info("运行了{}s".format(time.time() - start))
return func_fun_time
@run_time
def calc(q: multiprocessing.Queue, n: int):
num = 0
for _ in range(n):
num += 1
q.put(num)
if __name__ == '__main__':
start = time.time()
lis = []
q = multiprocessing.Queue()
for i in range(2):
multiprocessing.Process(target=calc, args=(q, 50000000)).start()
for i in range(2):
lis.append(q.get())
print(sum(lis))
print(time.time() - start)
| 参数 | 作用 |
|---|---|
| pid | 进程id |
| exitcode | 进程的退出状态 |
| terminate() | 终止指定的进程 |
进程间同步
进程提供了与线程一样的类,使用的方法一样,使用的效果也类似。
不过进程的代价要高于线程,而且底层实现是不同的,只不过Python屏蔽了这些,让用户简单使用。
multiprocessing还提供了共享内存、服务器进程来共享数据,还提供了Queue队列、Pipe管道用户进程通信。
进程间可以使用RPC(即Remote Procedure Call,远程过程调用)来进行数据通信
通信方式不同
1.多进程就是启动了多个解释器进程,进程间通信必须要序列化、反序列化
2.数据的线程安全问题
由于每个进程中没有实现多线程,GIL可以说就没有什么用了。
进程池
池的概念:池是一组资源的集合,这组资源在服务器启动之初就完全被创建并初始化,这称为静态资源分配。
当服务器进入正式运行阶段,即开始处理客户请求的时候,如果它需要相关的资源,就可以直接从池中获取,无需动态分配。
直接从池中取得所需资源比动态分配资源的速度要快得多,因为分配系统资源的系统调用都是很耗时的。
当服务器处理完一个客户连接后,可以把相关的资源放回池中,无需执行系统调用来释放资源。
由于服务器的硬件资源“充裕”,那么提高服务器性能的一个很直接的方法就是以空间换时间,即“浪费”服务器的硬件资源,以换取其运行效率。
进程池是由服务器预先创建的一组子进程,这些子进程的数目在 3~10 个之间(当然这只是典型情况)。线程池中的线程数量应该和 CPU 数量差不多。
进程池中的所有子进程都运行着相同的代码,并具有相同的属性,比如优先级、 PGID 等。
当有新的任务来到时,主进程将通过某种方式选择进程池中的某一个子进程来为之服务。相比于动态创建子进程,选择一个已经存在的子进程的代价显得小得多。至于主进程选择哪个子进程来为新任务服务,则有两种方法:
1)主进程使用某种算法来主动选择子进程。最简单、最常用的算法是随机算法和 Round Robin (轮流算法)。
2)主进程和所有子进程通过一个共享的工作队列来同步,子进程都睡眠在该工作队列上。当有新的任务到来时,主进程将任务添加到工作队列中。这将唤醒正在等待任务的子进程,不过只有一个子进程将获得新任务的“接管权”,它可以从工作队列中取出任务并执行之,而其他子进程将继续睡眠在工作队列上。
当选择好子进程后,主进程还需要使用某种通知机制来告诉目标子进程有新任务需要处理,并传递必要的数据。最简单的方式是,在父进程和子进程之间预先建立好一条管道,然后通过管道来实现所有的进程间通信。在父线程和子线程之间传递数据就要简单得多,因为我们可以把这些数据定义为全局,那么它们本身就是被所有线程共享的。

#!/usr/bin/env python3
# coding:utf-8
# author:ZuoJie
# date:2019/5/16 9:48
import time
import logging
import multiprocessing
from functools import wraps
log_format = "%(asctime)-15s\t %(message)s"
logging.basicConfig(level=logging.INFO, format=log_format)
def run_time(func):
@wraps(func)
def func_fun_time(*args, **kwargs):
start = time.time()
func(*args, **kwargs)
logging.info("运行了{}s".format(time.time() - start))
return func_fun_time
@run_time
def calc(n: int):
num = 0
for _ in range(n):
num += 1
if __name__ == '__main__':
start = time.time()
pool = multiprocessing.Pool(4)
for i in range(4):
pool.apply_async(func=calc, args=(25000000,))
pool.close()
pool.join()
print(time.time() - start)
多进程、多线程的选择
1.CPU密集型
CPython中使用了GIL,多线程的时候锁相互竞争,且多核优秀不能发挥,Python多进程效率更高。
2.IO密集型
适合是多线程,减少IO序列化开销,且在等待的时候,切换到其他线程执行,效率不错。
应用
请求/应答模型:WEB应用中常见的处理模式。
master启动多个worker工作进程,一般与CPU数量一致
worker工作进程中启动多进程,提高并发处理能力,worker处理用户的请求,往往需要等待数据。
这就是nginx的工作模式。
concurrent包
concurrent.futures
Python 3.2版本中引入的模块
from concurrent import futures
futures.ThreadPoolExecutor() # 异步调用线程池
futures.ProcessPoolExecutor() # 异步调用进程池
使用的时候首先要定义一个池的执行器对象,Executor类子类对象。
| 方法 | 作用 |
|---|---|
| ThreadPoolExecutor(max_worker=1) | 池中至多创建max_workers个线程池来同时异步执行,返回Executor实例 |
| submit(fn,args,*kwargs) | 提交执行的函数及其参数,返回Future实例 |
| shutdown(wait=True) | 清理池 |
Futuer类
| 方法 | 作用 |
|---|---|
| result() | 可以常看代用的返回结果 |
| done() | 如果调用成功的取消或者执行完成,返回True |
| cancelled() | 如果调用被成功取消,返回True |
| running() | 如果正在运行且不能被取消,返回True |
| cancal() | 尝试取消调用,如果已经执行且不能取消的返回False,否则返回True |
| result(timeout=None) | 出concurrent.futures.TimeoutError异常 |
| exception(timeout=None) | 取返回值异常,超时为None,一直等待返回,超时时间设置到期,抛出concurrent.futures.TimeoutError异常 |
#!/usr/bin/env python3
# coding:utf-8
# author:ZuoJie
# date:2019/5/16 11:53
import time
import threading
import logging
from concurrent import futures
log_format = "%(asctime)-15s\t %(threadName)s-%(thread)s-%(message)s"
logging.basicConfig(level=logging.INFO, format=log_format)
# futures.ThreadPoolExecutor() # 异步调用线程池
# futures.ProcessPoolExecutor() # 异步调用进程池
# f = futures.Future()
def worker(n):
logging.info("working-{}".format(n))
time.sleep(5)
logging.info("end work-{}".format(n))
if __name__ == '__main__':
# tpe = futures.ThreadPoolExecutor(3)
tpe = futures.ProcessPoolExecutor(3)
fs = []
for i in range(3):
f = tpe.submit(worker, i)
fs.append(f)
for i in range(3, 6):
f = tpe.submit(worker, i)
fs.append(f)
while True:
time.sleep(2)
logging.info(threading.enumerate())
flag = True
for f in fs:
flag = flag and f.done()
if flag:
tpe.shutdown()
logging.info(threading.enumerate())
break
上下文支持管理
concurrent.futures.ProcessPoolExecutor 继承concurrent.futures.base.Executor.而父类有__enter__、__exit__方法,支持上下文管理,可以使用with语句。
with futures.ProcessPoolExecutor(3) as tpe:
pass
总结
concurrennt 统一了线程池、进程池调用,简化了编程
使用Python简单的思想哲学的体现。
唯一的缺点是:无法设置线程名称。
应用案例
例子1:将耗时间的任务放到线程中以获得更好的用户体验。
如下所示的界面中,有“下载”和“关于”两个按钮,用休眠的方式模拟点击“下载”按钮会联网下载文件需要耗费10秒的时间,如果不使用“多线程”,我们会发现,当点击“下载”按钮后整个程序的其他部分都被这个耗时间的任务阻塞而无法执行了,这显然是非常糟糕的用户体验,代码如下所示。
import time
import tkinter
import tkinter.messagebox
def download():
# 模拟下载任务需要花费10秒钟时间
time.sleep(10)
tkinter.messagebox.showinfo('提示', '下载完成!')
def show_about():
tkinter.messagebox.showinfo('关于', '作者: 老吴(v1.0)')
def main():
top = tkinter.Tk()
top.title('单线程')
top.geometry('200x150')
top.wm_attributes('-topmost', True)
panel = tkinter.Frame(top)
button1 = tkinter.Button(panel, text='下载', command=download)
button1.pack(side='left')
button2 = tkinter.Button(panel, text='关于', command=show_about)
button2.pack(side='right')
panel.pack(side='bottom')
tkinter.mainloop()
if __name__ == '__main__':
main()
如果使用多线程将耗时间的任务放到一个独立的线程中执行,这样就不会因为执行耗时间的任务而阻塞了主线程,修改后的代码如下所示。
import time
import tkinter
import tkinter.messagebox
from threading import Thread
def main():
class DownloadTaskHandler(Thread):
def run(self):
time.sleep(10)
tkinter.messagebox.showinfo('提示', '下载完成!')
# 启用下载按钮
button1.config(state=tkinter.NORMAL)
def download():
# 禁用下载按钮
button1.config(state=tkinter.DISABLED)
# 通过daemon参数将线程设置为守护线程(主程序退出就不再保留执行)
# 在线程中处理耗时间的下载任务
DownloadTaskHandler(daemon=True).start()
def show_about():
tkinter.messagebox.showinfo('关于', '作者: 老吴(v1.0)')
top = tkinter.Tk()
top.title('单线程')
top.geometry('200x150')
top.wm_attributes('-topmost', 1)
panel = tkinter.Frame(top)
button1 = tkinter.Button(panel, text='下载', command=download)
button1.pack(side='left')
button2 = tkinter.Button(panel, text='关于', command=show_about)
button2.pack(side='right')
panel.pack(side='bottom')
tkinter.mainloop()
if __name__ == '__main__':
main()
例子2:使用多进程对复杂任务进行“分而治之”。
我们来完成1~100000000求和的计算密集型任务,这个问题本身非常简单,有点循环的知识就能解决,代码如下所示。
from time import time
def main():
total = 0
number_list = [x for x in range(1, 100000001)]
start = time()
for number in number_list:
total += number
print(total)
end = time()
print('Execution time: %.3fs' % (end - start))
if __name__ == '__main__':
main()
在上面的代码中,我故意先去创建了一个列表容器然后填入了100000000个数,这一步其实是比较耗时间的,所以为了公平起见,当我们将这个任务分解到8个进程中去执行的时候,我们暂时也不考虑列表切片操作花费的时间,只是把做运算和合并运算结果的时间统计出来,代码如下所示。
from multiprocessing import Process, Queue
from random import randint
from time import time
def task_handler(curr_list, result_queue):
total = 0
for number in curr_list:
total += number
result_queue.put(total)
def main():
processes = []
number_list = [x for x in range(1, 100000001)]
result_queue = Queue()
index = 0
# 启动8个进程将数据切片后进行运算
for _ in range(8):
p = Process(target=task_handler,
args=(number_list[index:index + 12500000], result_queue))
index += 12500000
processes.append(p)
p.start()
# 开始记录所有进程执行完成花费的时间
start = time()
for p in processes:
p.join()
# 合并执行结果
total = 0
while not result_queue.empty():
total += result_queue.get()
print(total)
end = time()
print('Execution time: ', (end - start), 's', sep='')
if __name__ == '__main__':
main()
比较两段代码的执行结果,使用多进程后由于获得了更多的CPU执行时间以及更好的利用了CPU的多核特性,明显的减少了程序的执行时间,而且计算量越大效果越明显。当然,如果愿意还可以将多个进程部署在不同的计算机上,做成分布式进程,具体的做法就是通过multiprocessing.managers模块中提供的管理器将Queue对象通过网络共享出来(注册到网络上让其他计算机可以访问),这部分内容也留到爬虫的专题再进行讲解。