Python并发编程(一)


多任务

​ 今天我们使用的计算机早已进入多CPU或多核时代,而我们使用的操作系统都是支持“多任务”的操作系统

  • 我们可以同时运行多个程序
  • 可以将一个程序分解为若干个相对独立的子任务
  • 让多个子任务并发的执行,从而缩短程序的执行时间
  • 同时也让用户获得更好的体验。
  • 实现让程序同时执行多个任务也就是常说的“并发编程”

Python既支持多进程又支持多线程,因此使用Python实现并发编程主要有3种方式:多进程、多线程、多进程+多线程。

并行与并发的区别

  • 并行:同时做某些事,可以互相不干扰的同一个时刻做几件事。(例如:高速公路,双向八车道,所有车都可以互不干扰的运行。)

  • 并发:同一时刻,有很多事情要做。(例如:乡村小路,只有一个车道,一个方向同一时间只能过一辆车。发生多辆车同时同行的情况就是并发。)

    • 可以用并行、队列、缓存区解决并发问题。
    • 排队是解决并发的办法之一。

并发的解决

”食堂打饭模型“

中午12点,开饭了,所有学生都涌向了食堂。这就是并发,如果人特别多,就是高并发。

1.队列、缓冲区

假设只有一个窗口,就只能排队打菜。

排队就是把人排成队列,解决了资源使用的问题。

排成的队列,其实就是一个缓冲地带,就是缓冲区

假设女士优先,窗口就要有两队,只要有女生就优先打饭,男生就等着,女生队伍就是一个优先队列

2.争抢

只开一个窗口,没有秩序,谁能挤进去就给谁打饭。窗口不能给其他人打饭。这是一种锁机制.

争抢也是一种解决高并发的方式,但是不好,因为有可能会有人很长时间抢不到。

3.预处理

如果打饭慢的原因是因为要现做的话,可以提前统计大多数人喜欢吃的菜品,提前做好,保证供应,提高打饭效率。

预处理的思想是加快用户获取数据的速度,常用缓存解决。

4.并行

由于吃饭的人数过多,一个窗口忙不过来,食堂老板决定扩大食堂,多开几个窗口,但是没有窗口都要雇人,造成成本上升。

企业可以通过购买更多服务器,或开启更多进程的方式做并行处理,来解决问题。(水平扩展)

注意:线程如果只在单个CPU核心上运行就并不是并行。

5.提速

提高单个窗口的打饭速度,也是解决并发的方式。

提高打饭人员技能,或为单个窗口配置更多服务人员。

提高单机的性能,或者单个服务器安装更多CPU.

这是一种垂直提升的思路

6.多地就近原则

多开几个食堂,分流。可以把食堂设置在宿舍附近,来提升吃饭速度。

进程与线程

进程(Process)就是操作系统中执行的一个程序.进程是系统进行运算调度的最小单元

  • 操作系统以进程为单位分配存储空间
  • 每个进程都有自己的地址空间、数据栈以及其他用于跟踪进程执行的辅助数据
  • 操作系统管理所有进程的执行,为它们合理的分配资源。
  • 一个进程还可以拥有多个并发的执行线索,简单的说就是拥有多个可以获得CPU调度的执行单元,这就是所谓的线程。

线程(Thread)有时候也被称为轻量级进程,是程序的执行流的最小单元。

  • 一个标准的线程是由线程ID,当前指令指针(PC),寄存器集合和堆栈组成。
  • 由于线程在同一个进程下,它们可以共享相同的上下文(Context),因此相对于进程而言,线程间的信息共享和通信更加容易。
  • 当然在单核CPU系统中,真正的并发是不可能的,因为在某个时刻能够获得CPU的只有唯一的一个线程,多个线程共享了CPU的执行时间。
  • 使用多线程实现并发编程为程序带来的好处是不言而喻的,最主要的体现在提升程序的性能和改善用户体验
进程、线程的理解
  • 现代操作系统中提出的进程的概念,每个进程都认为自己独占所有的计算机硬件资源
  • 进程就是独立的王国,进程间不能直接随便贡献资源。
  • 线程是省份,同一进程内的线程可以共享进程的资源,每一个线程拥有自己独立的堆栈。

线程的状态

  1. 运行态-正在占用CPU
  2. 就绪态-随时可以转换为运行状态
  3. 阻塞态-除非某些外部事件发生,否则进程不能运行
状态 含义
就绪(Ready) 线程能够运行,但是在等待被调度。可能线程刚刚创建启动或刚从阻塞中恢复
运行(Running) 程序正在运行
阻塞(Blocked) 线程等待外部事件发生而无法运行,如I/O操作
终止(Terminated) 线程完成,或退出,或被取消

Python中的多线程

​ 在Python早期的版本中就引入了thread模块(现在名为_thread)来实现多线程编程,然而该模块过于底层,而且很多功能都没有提供,因此目前的多线程开发我们推荐使用threading模块,该模块对多线程编程提供了更好的面向对象的封装。

threading中的函数
名称 含义
current_thread() 返回当前线程对象
main_thread() 返回主线程对象
active_count() 当前处于alive状态的线程个数
enumerate() 返回所有活着的线程列表,不包括已经终止的线程和未开始的线程
get_ident() 返回当前线程的ID,非0整数
from random import randint
from threading import Thread
from time import time, sleep


def download(filename):
    print('开始下载%s...' % filename)
    time_to_download = randint(5, 10)
    sleep(time_to_download)
    print('%s下载完成! 耗费了%d秒' % (filename, time_to_download))


def main():
    start = time()
    t1 = Thread(target=download, args=('Python从入门到住院.pdf',))
    t1.start()
    t2 = Thread(target=download, args=('Peking Hot.avi',))
    t2.start()
    t1.join()
    t2.join()
    end = time()
    print('总共耗费了%.3f秒' % (end - start))


if __name__ == '__main__':
    main()

​ 我们可以直接使用threading模块的Thread类来创建线程,但是我们之前讲过一个非常重要的概念叫“继承”,我们可以从已有的类创建新类,因此也可以通过继承Thread类的方式来创建自定义的线程类,然后再创建线程对象并启动线程。代码如下所示。

from random import randint
from threading import Thread
from time import time, sleep


class DownloadTask(Thread):

    def __init__(self, filename):
        super().__init__()
        self._filename = filename

    def run(self):
        print('开始下载%s...' % self._filename)
        time_to_download = randint(5, 10)
        sleep(time_to_download)
        print('%s下载完成! 耗费了%d秒' % (self._filename, time_to_download))


def main():
    start = time()
    t1 = DownloadTask('Python从入门到住院.pdf')
    t1.start()
    t2 = DownloadTask('Peking Hot.avi')
    t2.start()
    t1.join()
    t2.join()
    end = time()
    print('总共耗费了%.2f秒.' % (end - start))


if __name__ == '__main__':
    main()

Thread类

Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

参数 说明
group 为了日后扩展 ThreadGroup 类实现而保留
target 用于__run__ 方法调用的可调用对象。参数是函数。
name 线程名称。默认情况下,由 "Thread-N" 格式构成一个唯一的名称,其中 N 是小的十进制数。
args 是用于调用目标函数的参数元组。默认是 ().
kwargs 是用于调用目标函数的关键字参数字典。默认是 {}
daemon 不是 None,线程将被显式的设置为 守护模式,不管该线程是否是守护模式。如果是 None (默认值),线程将继承当前线程的守护模式属性。

Thread方法

方法 说明
start() 开始线程活动。它在一个线程里最多只能被调用一次。它安排对象的 run() 方法在一个独立的控制进程中调用。
run() 代表线程活动的方法。可以在子类型里重载这个方法。 标准的 run() 方法会对作为 target参数传递给该对象构造器的可调用对象(如果存在)发起调用,并附带从 argskwargs 参数分别获取的位置和关键字参数。
join(timeout=None) 等待,直到线程终结。这会阻塞调用这个方法的线程,直到被调用 join() 的线程终结 -- 不管是正常终结还是抛出未处理异常 -- 或者直到发生超时,超时选项是可选的。
getName() 旧的 name 取值API;
setName() 旧的 name 设值API;
ident 这个线程的 '线程标识符',如果线程尚未开始则为 None
is_alive() 返回线程是否存活。
daemon 一个表示这个线程是(True)否(False)守护线程的布尔值。一定要在调用 start() 前设置好,不然会抛出 RuntimeError
isDaemon()、setDaemon() 旧的 name 取值/设值 API;建议直接当做特征属性使用它

线程的退出

Python中没有提供线程退出的方法,线程在下面情况退出:

  • 线程函数内语句执行完毕
  • 线程函数中抛出未处理的异常

daemon的应用场景

daemon线程是为了让程序员不用去记录和管理那些后台线程。

当把一个线程设置为daemon,它会随着主线程的退出而退出。

主要的应用场景:

  • 1.后台任务、如监控、等待连接等。
  • 2.依赖主线程工作的线程。
  • 3.随时可以被终止的线程。

daemon简化程序员手动关闭线程的工作。

​ 因为多个线程可以共享进程的内存空间,因此要实现多个线程间的通信相对简单,大家能想到的最直接的办法就是设置一个全局变量,多个线程共享这个全局变量即可。

​ 但是当多个线程共享同一个变量(我们通常称之为“资源”)的时候,很有可能产生不可控的结果从而导致程序失效甚至崩溃。如果一个资源被多个线程竞争使用,那么我们通常称之为“临界资源”,对“临界资源”的访问需要加上保护,否则资源会处于“混乱”的状态。下面的例子演示了100个线程向同一个银行账户转账(转入1元钱)的场景,在这个例子中,银行账户就是一个临界资源,在没有保护的情况下我们很有可能会得到错误的结果。

from time import sleep
from threading import Thread


class Account(object):

    def __init__(self):
        self._balance = 0

    def deposit(self, money):
        # 计算存款后的余额
        new_balance = self._balance + money
        # 模拟受理存款业务需要0.01秒的时间
        sleep(0.01)
        # 修改账户余额
        self._balance = new_balance

    @property
    def balance(self):
        return self._balance


class AddMoneyThread(Thread):

    def __init__(self, account, money):
        super().__init__()
        self._account = account
        self._money = money

    def run(self):
        self._account.deposit(self._money)


def main():
    account = Account()
    threads = []
    # 创建100个存款的线程向同一个账户中存钱
    for _ in range(100):
        t = AddMoneyThread(account, 1)
        threads.append(t)
        t.start()
    # 等所有存款的线程都执行完毕
    for t in threads:
        t.join()
    print('账户余额为: ¥%d元' % account.balance)


if __name__ == '__main__':
    main()

​ 运行上面的程序,结果让人大跌眼镜,100个线程分别向账户中转入1元钱,结果居然远远小于100元。之所以出现这种情况是因为我们没有对银行账户这个“临界资源”加以保护,多个线程同时向账户中存钱时,会一起执行到new_balance = self._balance + money这行代码,多个线程得到的账户余额都是初始状态下的0,所以都是0上面做了+1的操作,因此得到了错误的结果。在这种情况下,“锁”就可以派上用场了。我们可以通过“锁”来保护“临界资源”,只有获得“锁”的线程才能访问“临界资源”,而其他没有得到“锁”的线程只能被阻塞起来,直到获得“锁”的线程释放了“锁”,其他线程才有机会获得“锁”,进而访问被保护的“临界资源”。下面的代码演示了如何使用“锁”来保护对银行账户的操作,从而获得正确的结果。

Lock

锁,一旦线程获得锁,其他试图获取锁的线程将会被阻塞

名称 含义
acquire(blocking=True,timeout=-1) 默认阻塞,阻塞可以设置超时时间,非阻塞时,timeout禁止设置,成功获取锁,返回True否则返回False
release() 释放锁,可以从任何线程调用释放。已上锁的锁,会被重置为unlocked未上锁状态

锁的应用场景

锁适用于访问和修改同一个共享资源的时候,即读写同一个资源的时候,如果全部都是读取一个共享资源的时候需要加锁吗?

不需要,因为这时可以认为共享资源时不可改变的,每次读取它都是一样的值,所以不用加锁。

使用锁的注意事项:

  • 少用锁,必要时在用锁,多线程访问被锁的资源的时候就成串行,要么排队执行,要么争抢执行。
  • 加锁时间越短越好,不需要时立即释放锁
  • 一定要避免死锁。

不使用锁,有了效率,但是结果是错的。

使用了锁,效率低下,但是结果是对的。

所以,我们是为了效率要错误的结果呢?还是为了对的结果,让计算降低效率。

import threading
import time

num = 0  #全局变量多个线程可以读写,传递数据
lock =threading.Lock() #创建一个锁

class Mythread(threading.Thread):
    def run(self):
        global num
        with lock:   #with Lock的作用相当于自动获取和释放锁(资源)
            for i in range(1000000): #锁定期间,其他线程不可以干活
                num+=1
        print(num)

mythread=[]
for i  in range(5):
    t=Mythread()
    t.start()
    mythread.append(t)
for t in mythread:
    t.join()
print("game over")

递归锁对象threading.RLock

​ 重入锁必须由获取它的线程释放。一旦线程获得了重入锁,同一个线程再次获取它将不阻塞;线程必须在每次获取它时释放一次。

​ 递归锁和普通锁方法相同。

from time import sleep
from threading import Thread, RLock


class Account(object):

    def __init__(self):
        self._balance = 0
        self._lock = RLock()

    def deposit(self, money):
        # 先获取锁才能执行后续的代码
        self._lock.acquire()
        try:
            new_balance = self._balance + money
            sleep(0.01)
            self._balance = new_balance
        finally:
            # 在finally中执行释放锁的操作保证正常异常锁都能释放
            self._lock.release()

    @property
    def balance(self):
        return self._balance


class AddMoneyThread(Thread):

    def __init__(self, account, money):
        super().__init__()
        self._account = account
        self._money = money

    def run(self):
        self._account.deposit(self._money)


def main():
    account = Account()
    threads = []
    for _ in range(100):
        t = AddMoneyThread(account, 1)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    print('账户余额为: ¥%d元' % account.balance)


if __name__ == '__main__':
    main()

线程同步

​ 线程是共享全局变量。查询不用加锁,修改时需要加锁。

​ 线程同步,线程间协同,通过某种技术,让一个线程访问某些数据时,其他线程不能访问这些数据,直到该线程完成对数据的操作。

​ 临界区(Critical Section)、互斥量(Mutex) -->(锁)、信号量(Semaphore)、事件(Event)

临界区:指的是一个访问共用资源的程序片段,而这些共用资源又无法同时被多个线程访问的特性。当有线程进入临界区段时,其他线程或是进程必须等待,以确保这些共用资源是被互斥获得使用

Condition

线程间同步之条件变量

Python提供的Condition对象提供了对复杂线程同步问题的支持。Condition被称为条件变量,除了提供与Lock类似的acquirerelease方法外,还提供了waitnotify方法。

构造方法Condition(lock=None),可以传入一个Lock,或RLock对象,默认是RLock.

Condition用于生产者,消费者模型,为了解决生产者消费者速度匹配的问题。

名称 含义
acquire(*args) 获取锁
wait(self,timeout=None) 等待或超时
notify(n=1) 唤醒至多指定数目的等待线程,没有等待的线程就没有任何操作
notify_all() 唤醒所有的等待的线程
release() 释放锁
"""
实现场景:当a同学王火锅里面添加鱼丸加满后(最多5个,加满后通知b去吃掉),通知b同学去吃掉鱼丸(吃到0的时候通知a同学继续添加)
"""
import threading
import time

con = threading.Condition()
num = 0

# 生产者
class Producer(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        with con:
            # 锁定线程
            global num
            while True:
                print("开始添加!!!")
                num += 1
                print("火锅里面鱼丸个数:%s" % str(num))
                time.sleep(1)
                if num >= 5:
                    print("火锅里面里面鱼丸数量已经到达5个,无法添加了!")
                    # 唤醒等待的线程
                    con.notify()  # 唤醒小伙伴开吃啦
                    # 等待通知
                    con.wait()


# 消费者
class Consumers(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        with con:
            global num
            while True:
                print("开始吃啦!!!")
                num -= 1
                print("火锅里面剩余鱼丸数量:%s" % str(num))
                time.sleep(2)
                if num <= 0:
                    print("锅底没货了,赶紧加鱼丸吧!")
                    con.notify()  # 唤醒其它线程
                    # 等待通知
                    con.wait()


p = Producer()
c = Consumers()
p.start()
c.start()

Condition 总结

condition用于生产者消费者模型中,解决生产值速度匹配的问题。

采用了通知机制,非常有效。

使用方式

使用Condition,必须先acqueire,用完后relsase,因为内部使用了锁,默认使用RLock,最好的方式是使用with上下文。

消费者wait,等待通知。

生产者生产好消息,对消费者发通知,可以使用notify或notify_all方法。

Event

​ Event事件,是线程间通信机制中最简单的实现,使用一个内部的标记flag,通过flag的True或False的变化来进行操作。

​ event = threading.Event() # 事件变量 (flag = False) 等待 , (flag = True) 解除等待

​ event.wait() --> flag = False # 在线程内部 设置为事件为等待状态,线程会暂停执行。

​ event.set() --> flag = True # 让线程从等待状态退出,继续执行

​ event.clear() --> flag = False # 清空状态。

名称 含义
set() flag标记设置为True
clear() flag标记设置为Fales
is_set() return flag 返回标记
wait(timeout=None) 设置标记为True的时长,None为无标记,等到返回True,超时返回False

需求:

老板雇佣了一个工人,让他生产杯子,老板一直等待工人,直到生产了10个杯子。

import time
import logging
from threading import Event, Thread

log_format = "%(asctime)s-%(levelname)s-%(module)s-%(funcName)s-%(threadName)s%(message)s"
logging.basicConfig(format=log_format, level=logging.INFO)


def boss(event: Event):
    logging.info("我是老板正在等待中....")
    event.wait()
    logging.info("完成的不错!")


def worker(event: Event, count: int = 10):
    logging.info("我正在工作中....")
    cups = []
    while True:
        logging.info("制作中....")
        time.sleep(0.5)
        cups.append("u")
        if len(cups) >= count:
            event.set()
            break

    logging.info("我制作的杯子.cups={}".format(cups))


event = Event()

w = Thread(target=worker, args=(event,))
b = Thread(target=boss, args=(event,))
b.start()
w.start()

Barrier

屏障,可以想象成路障、道闸。

Python3.2引入的新的功能。

Barrier类是设置了一个线程数量障碍,当等待的线程到达了这个数量就会唤醒所有的等待线程。

名称 含义
Barrier(parties,action=None,timeout=None) 构建Barrier对象,指定参与方数目,timeout是wait为指定超时时间时的默认值。
n_waition 当前在屏障中等待的线程数
parities 通过障碍所需的线程数。
wait(time=None) 等待通过屏障
#!/usr/bin/env python3 
# coding:utf-8 
# author:ZuoJie
# date:2019/5/15 9:32

import time
import threading
import logging
from threading import Barrier

log_format = "%(asctime)s-15s\t [%(threadName)s,%(thread)8d] %(message)s"
logging.basicConfig(level=logging.INFO, format=log_format)


def worker(barrier: threading.Barrier):
    logging.info("等待{}线程".format(barrier.n_waiting))
    try:
        barrier_id = barrier.wait()
        logging.info("下一个是barrier{}".format(barrier_id))
    except threading.BrokenBarrierError:
        logging.info("损毁的Barrier")


barrier = Barrier(3)

for _ in range(100):
    threading.Thread(target=worker, args=(barrier,)).start()
名称 作用
bronken 如果屏障处于打破状态,返回True
abort() 将瓶装置于broken状态,直到reset的方法来恢复屏障
reset() 恢复屏障,重新开始拦截

Barrier应用

并发初始化

​ 所有线程都必须完成初始化后,才能继续工作,例如运行前的加载数据,检查,如果这这些工作没有完成就开始工作,将不能正常工作。

例如:启动一个程序,需要先加载磁盘文件、缓存预热、初始化连接池等工作后才能正常使用。这些工作可以齐头并进的。不过只有都满足了,程序才能继续向后执行。假设数据库连接失败了,则初始化工作失败,就要about,屏障broken,所有线程收到异常退出。

Semaphore信号量

semaphore是一个内置的计数器.

​ 设置同一时刻允许运行的线程的数量。

​ 信号量,信号量对象内部维护一个倒计数器,每一次acquire都会减1,当acquire方法发现计数为0就阻塞请求的线程,直到其它线程对信号量release后,计数大于0,恢复阻塞的线程。

名称 含义
Semaphore(value=1) 构造方法,value小于0,抛出ValueError错误
acquire(blocking=True,time=None) 获取信号量,计数器减1,获取成功返回True
release() 释放信号量,计数器加1
#!/usr/bin/env python3 
# coding:utf-8 
# author:ZuoJie
# date:2019/5/15 10:26
import time
import threading
import logging
from threading import Semaphore

log_format = "%(asctime)s-15s\t [%(threadName)s,%(thread)8d] %(message)s"
logging.basicConfig(level=logging.INFO, format=log_format)

sem = Semaphore(3)


class MyThread(threading.Thread):
    def run(self):
        logging.info("{} 等待中的信号量".format(self.name))
        sem.acquire()
        logging.info("被锁定的信号量{}".format(self.name))
        time.sleep(5)
        logging.info("{} 解除锁定信号量".format(self.name))
        sem.release()


for i in range(10):
    t = MyThread(name="Thread-{}".format(i + 1))
    t.start()

queue--- 一个同步的队列类

queue 模块实现多生产者,多消费者队列。当信息必须安全的在多线程之间交换时,它在线程编程中是特别有用的。

Queue对象

​ 队列对象 (Queue, LifoQueue, 或者 PriorityQueue) 提供下列描述的公共方法。

方法 说明
qsize() 返回队列的大致大小。注意,qsize() > 0 不保证后续的 get() 不被阻塞,qsize() < maxsize 也不保证 put() 不被阻塞。
empty() 如果队列为空,返回 True ,否则返回 False
full() 如果队列是满的返回 True ,否则返回 False
put(item, block=True, timeout=None) item 放入队列。如果可选参数 block 是 true 并且 timeoutNone (默认),则在必要时阻塞至有空闲插槽可用。
put_nowait(item) 相当于 put(item, False)
get(block=True, timeout=None) 从队列中移除并返回一个项目。
Queue.get_nowait() 相当于 get(False)
join() 阻塞至队列中所有的元素都被接收和处理完毕。
task_done() 表示前面排队的任务已经被完成。被队列的消费者线程使用。每个 get() 被用于获取一个任务, 后续调用 task_done() 告诉队列,该任务的处理已经完成。
def worker():
    while True:
        item = q.get()
        if item is None:
            break
        do_work(item)
        q.task_done()

q = queue.Queue()
threads = []
for i in range(num_worker_threads):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

for item in source():
    q.put(item)

# block until all tasks are done
q.join()

# stop workers
for i in range(num_worker_threads):
    q.put(None)
for t in threads:
    t.join()

定时器 Timer/延时执行

threading.Timer继承自Thread。

Timer(interval, function, args=[], kwargs={})

参数 说明
interval 指定延时的时间
function 要执行的方法
args/kwargs 方法的参数
import threading

def func():
    print('hello timer!')

timer = threading.Timer(5, func)
timer.start()

GIL全局解释器锁

  • CPython在解释器进程级别有一把锁,就做GIL全局解释器锁。
  • GIL保证CPython进程中,在一个时刻只能有一个线程执行字节码,甚至在多核CPU下,也是如此。
  • CPython中IO密型操作,由于线程阻塞,就会调用其他线程。
  • IO密集型操作,使用多线程.CPU密集型使用多进程,绕开GIL.

Python中的多进程

​ Unix和Linux操作系统上提供了fork()系统调用来创建进程,调用fork()函数的是父进程,创建出的是子进程,子进程是父进程的一个拷贝,但是子进程拥有自己的PID。

fork()函数非常特殊它会返回两次,父进程中可以通过fork()函数的返回值得到子进程的PID,而子进程中的返回值永远都是0。Python的os模块提供了fork()函数。

由于Windows系统没有fork()调用,因此要实现跨平台的多进程编程,可以使用multiprocessing模块的Process类来创建子进程,而且该模块还提供了更高级的封装,例如批量启动进程的进程池(Pool)、用于进程间通信的队列(Queue)和管道(Pipe)等。

上下文和启动方法

根据不同的平台, multiprocessing 支持三种启动进程的方法。这些 启动方法

  • spawn

    父进程启动一个新的Python解释器进程。子进程只会继承那些运行进程对象的 run() 方法所需的资源。特别是父进程中非必须的文件描述符和句柄不会被继承。相对于使用 fork 或者 forkserver,使用这个方法启动进程相当慢。可在Unix和Windows上使用。 Windows上的默认设置。

  • fork

    父进程使用 os.fork() 来产生 Python 解释器分叉。子进程在开始时实际上与父进程相同。父进程的所有资源都由子进程继承。请注意,安全分叉多线程进程是棘手的。只存在于Unix。Unix中的默认值。

  • forkserver

    程序启动并选择* forkserver * 启动方法时,将启动服务器进程。从那时起,每当需要一个新进程时,父进程就会连接到服务器并请求它分叉一个新进程。分叉服务器进程是单线程的,因此使用 os.fork() 是安全的。没有不必要的资源被继承。可在Unix平台上使用,支持通过Unix管道传递文件描述符。

下面用一个下载文件的例子来说明使用多进程和不使用多进程到底有什么差别,先看看下面的代码。

from random import randint
from time import time, sleep


def download_task(filename):
    print('开始下载%s...' % filename)
    time_to_download = randint(5, 10)
    sleep(time_to_download)
    print('%s下载完成! 耗费了%d秒' % (filename, time_to_download))


def main():
    start = time()
    download_task('Python从入门到住院.pdf')
    download_task('Peking Hot.avi')
    end = time()
    print('总共耗费了%.2f秒.' % (end - start))


if __name__ == '__main__':
    main()

下面是运行程序得到的一次运行结果。

开始下载Python从入门到住院.pdf...
Python从入门到住院.pdf下载完成! 耗费了6秒
开始下载Peking Hot.avi...
Peking Hot.avi下载完成! 耗费了7秒
总共耗费了13.01秒.

从上面的例子可以看出,如果程序中的代码只能按顺序一点点的往下执行,那么即使执行两个毫不相关的下载任务,也需要先等待一个文件下载完成后才能开始下一个下载任务,很显然这并不合理也没有效率。接下来我们使用多进程的方式将两个下载任务放到不同的进程中,代码如下所示。

from multiprocessing import Process
from os import getpid
from random import randint
from time import time, sleep


def download_task(filename):
    print('启动下载进程,进程号[%d].' % getpid())
    print('开始下载%s...' % filename)
    time_to_download = randint(5, 10)
    sleep(time_to_download)
    print('%s下载完成! 耗费了%d秒' % (filename, time_to_download))


def main():
    start = time()
    p1 = Process(target=download_task, args=('Python从入门到住院.pdf', ))
    p1.start()
    p2 = Process(target=download_task, args=('Peking Hot.avi', ))
    p2.start()
    p1.join()
    p2.join()
    end = time()
    print('总共耗费了%.2f秒.' % (end - start))


if __name__ == '__main__':
    main()

​ 在上面的代码中,我们通过Process类创建了进程对象,通过target参数我们传入一个函数来表示进程启动后要执行的代码,后面的args是一个元组,它代表了传递给函数的参数。Process对象的start方法用来启动进程,而join方法表示等待进程执行结束。运行上面的代码可以明显发现两个下载任务“同时”启动了,而且程序的执行时间将大大缩短,不再是两个任务的时间总和。下面是程序的一次执行结果。

启动下载进程,进程号[1530].
开始下载Python从入门到住院.pdf...
启动下载进程,进程号[1531].
开始下载Peking Hot.avi...
Peking Hot.avi下载完成! 耗费了7秒
Python从入门到住院.pdf下载完成! 耗费了10秒
总共耗费了10.01秒.

我们也可以使用subprocess模块中的类和函数来创建和启动子进程,然后通过管道来和子进程通信,这些内容我们不在此进行讲解,有兴趣的读者可以自己了解这些知识。接下来我们将重点放在如何实现两个进程间的通信。我们启动两个进程,一个输出Ping,一个输出Pong,两个进程输出的Ping和Pong加起来一共10个。听起来很简单吧,但是如果这样写可是错的哦。

from multiprocessing import Process
from time import sleep

counter = 0


def sub_task(string):
    global counter
    while counter < 10:
        print(string, end='', flush=True)
        counter += 1
        sleep(0.01)


def main():
    Process(target=sub_task, args=('Ping', )).start()
    Process(target=sub_task, args=('Pong', )).start()


if __name__ == '__main__':
    main()

​ 看起来没毛病,但是最后的结果是Ping和Pong各输出了10个,Why?当我们在程序中创建进程的时候,子进程复制了父进程及其所有的数据结构,每个子进程有自己独立的内存空间,这也就意味着两个子进程中各有一个counter变量,所以结果也就可想而知了。要解决这个问题比较简单的办法是使用multiprocessing模块中的Queue类,它是可以被多个进程共享的队列,底层是通过管道和信号量(semaphore)机制来实现的,有兴趣的读者可以自己尝试一下。

由于Python的GIL,多进程未必是CPU密集型程序的好的选择

多进程可以完全独立的进程环境中运行程序,可以充分地利用多处理器。

但是进程本身的隔离带来的数据不共享也是一问题,而且线程比进程轻量。

multiprocessing

Process类

process类遵循了Thread类的API,减少了学习的难度。

#!/usr/bin/env python3 
# coding:utf-8 
# author:ZuoJie
# date:2019/5/16 9:48

import time
import logging
import multiprocessing
from functools import wraps

log_format = "%(asctime)-15s\t %(message)s"
logging.basicConfig(level=logging.INFO, format=log_format)


def run_time(func):
    @wraps(func)
    def func_fun_time(*args, **kwargs):
        start = time.time()
        func(*args, **kwargs)
        logging.info("运行了{}s".format(time.time() - start))

    return func_fun_time


@run_time
def calc(q: multiprocessing.Queue, n: int):
    num = 0
    for _ in range(n):
        num += 1
    q.put(num)


if __name__ == '__main__':
    start = time.time()
    lis = []
    q = multiprocessing.Queue()
    for i in range(2):
        multiprocessing.Process(target=calc, args=(q, 50000000)).start()
    for i in range(2):
        lis.append(q.get())
    print(sum(lis))
    print(time.time() - start)
参数 作用
pid 进程id
exitcode 进程的退出状态
terminate() 终止指定的进程

进程间同步

​ 进程提供了与线程一样的类,使用的方法一样,使用的效果也类似。

不过进程的代价要高于线程,而且底层实现是不同的,只不过Python屏蔽了这些,让用户简单使用。

multiprocessing还提供了共享内存、服务器进程来共享数据,还提供了Queue队列、Pipe管道用户进程通信。

进程间可以使用RPC(即Remote Procedure Call,远程过程调用)来进行数据通信

通信方式不同

1.多进程就是启动了多个解释器进程,进程间通信必须要序列化、反序列化

2.数据的线程安全问题

​ 由于每个进程中没有实现多线程,GIL可以说就没有什么用了。

进程池

池的概念:池是一组资源的集合,这组资源在服务器启动之初就完全被创建并初始化,这称为静态资源分配。

当服务器进入正式运行阶段,即开始处理客户请求的时候,如果它需要相关的资源,就可以直接从池中获取,无需动态分配。

直接从池中取得所需资源比动态分配资源的速度要快得多,因为分配系统资源的系统调用都是很耗时的。

当服务器处理完一个客户连接后,可以把相关的资源放回池中,无需执行系统调用来释放资源。

由于服务器的硬件资源“充裕”,那么提高服务器性能的一个很直接的方法就是以空间换时间,即“浪费”服务器的硬件资源,以换取其运行效率。

进程池是由服务器预先创建的一组子进程,这些子进程的数目在 3~10 个之间(当然这只是典型情况)。线程池中的线程数量应该和 CPU 数量差不多。

进程池中的所有子进程都运行着相同的代码,并具有相同的属性,比如优先级、 PGID 等。

当有新的任务来到时,主进程将通过某种方式选择进程池中的某一个子进程来为之服务。相比于动态创建子进程,选择一个已经存在的子进程的代价显得小得多。至于主进程选择哪个子进程来为新任务服务,则有两种方法:

1)主进程使用某种算法来主动选择子进程。最简单、最常用的算法是随机算法和 Round Robin (轮流算法)。

2)主进程和所有子进程通过一个共享的工作队列来同步,子进程都睡眠在该工作队列上。当有新的任务到来时,主进程将任务添加到工作队列中。这将唤醒正在等待任务的子进程,不过只有一个子进程将获得新任务的“接管权”,它可以从工作队列中取出任务并执行之,而其他子进程将继续睡眠在工作队列上。

当选择好子进程后,主进程还需要使用某种通知机制来告诉目标子进程有新任务需要处理,并传递必要的数据。最简单的方式是,在父进程和子进程之间预先建立好一条管道,然后通过管道来实现所有的进程间通信。在父线程和子线程之间传递数据就要简单得多,因为我们可以把这些数据定义为全局,那么它们本身就是被所有线程共享的。

img

#!/usr/bin/env python3 
# coding:utf-8 
# author:ZuoJie
# date:2019/5/16 9:48

import time
import logging
import multiprocessing
from functools import wraps

log_format = "%(asctime)-15s\t %(message)s"
logging.basicConfig(level=logging.INFO, format=log_format)


def run_time(func):
    @wraps(func)
    def func_fun_time(*args, **kwargs):
        start = time.time()
        func(*args, **kwargs)
        logging.info("运行了{}s".format(time.time() - start))

    return func_fun_time


@run_time
def calc(n: int):
    num = 0
    for _ in range(n):
        num += 1


if __name__ == '__main__':
    start = time.time()
    pool = multiprocessing.Pool(4)
    for i in range(4):
        pool.apply_async(func=calc, args=(25000000,))
    pool.close()
    pool.join()
    print(time.time() - start)

多进程、多线程的选择

1.CPU密集型

CPython中使用了GIL,多线程的时候锁相互竞争,且多核优秀不能发挥,Python多进程效率更高。

2.IO密集型

适合是多线程,减少IO序列化开销,且在等待的时候,切换到其他线程执行,效率不错。

应用

请求/应答模型:WEB应用中常见的处理模式。

master启动多个worker工作进程,一般与CPU数量一致

worker工作进程中启动多进程,提高并发处理能力,worker处理用户的请求,往往需要等待数据。

这就是nginx的工作模式。

concurrent包

concurrent.futures

Python 3.2版本中引入的模块

from concurrent import futures

futures.ThreadPoolExecutor()  # 异步调用线程池
futures.ProcessPoolExecutor()  # 异步调用进程池

使用的时候首先要定义一个池的执行器对象,Executor类子类对象。

方法 作用
ThreadPoolExecutor(max_worker=1) 池中至多创建max_workers个线程池来同时异步执行,返回Executor实例
submit(fn,args,*kwargs) 提交执行的函数及其参数,返回Future实例
shutdown(wait=True) 清理池

Futuer类

方法 作用
result() 可以常看代用的返回结果
done() 如果调用成功的取消或者执行完成,返回True
cancelled() 如果调用被成功取消,返回True
running() 如果正在运行且不能被取消,返回True
cancal() 尝试取消调用,如果已经执行且不能取消的返回False,否则返回True
result(timeout=None) 出concurrent.futures.TimeoutError异常
exception(timeout=None) 取返回值异常,超时为None,一直等待返回,超时时间设置到期,抛出concurrent.futures.TimeoutError异常
#!/usr/bin/env python3 
# coding:utf-8 
# author:ZuoJie
# date:2019/5/16 11:53
import time
import threading
import logging
from concurrent import futures

log_format = "%(asctime)-15s\t %(threadName)s-%(thread)s-%(message)s"
logging.basicConfig(level=logging.INFO, format=log_format)


# futures.ThreadPoolExecutor()  # 异步调用线程池
# futures.ProcessPoolExecutor()  # 异步调用进程池
# f = futures.Future()

def worker(n):
    logging.info("working-{}".format(n))
    time.sleep(5)
    logging.info("end work-{}".format(n))


if __name__ == '__main__':
    # tpe = futures.ThreadPoolExecutor(3)
    tpe = futures.ProcessPoolExecutor(3)
    fs = []

    for i in range(3):
        f = tpe.submit(worker, i)
        fs.append(f)

    for i in range(3, 6):
        f = tpe.submit(worker, i)
        fs.append(f)

    while True:
        time.sleep(2)
        logging.info(threading.enumerate())

        flag = True

        for f in fs:
            flag = flag and f.done()
        if flag:
            tpe.shutdown()
            logging.info(threading.enumerate())
            break

上下文支持管理

concurrent.futures.ProcessPoolExecutor 继承concurrent.futures.base.Executor.而父类有__enter____exit__方法,支持上下文管理,可以使用with语句。

with futures.ProcessPoolExecutor(3) as tpe:
    pass

总结

concurrennt 统一了线程池、进程池调用,简化了编程

使用Python简单的思想哲学的体现。

唯一的缺点是:无法设置线程名称。

应用案例

例子1:将耗时间的任务放到线程中以获得更好的用户体验。

​ 如下所示的界面中,有“下载”和“关于”两个按钮,用休眠的方式模拟点击“下载”按钮会联网下载文件需要耗费10秒的时间,如果不使用“多线程”,我们会发现,当点击“下载”按钮后整个程序的其他部分都被这个耗时间的任务阻塞而无法执行了,这显然是非常糟糕的用户体验,代码如下所示。

import time
import tkinter
import tkinter.messagebox


def download():
    # 模拟下载任务需要花费10秒钟时间
    time.sleep(10)
    tkinter.messagebox.showinfo('提示', '下载完成!')


def show_about():
    tkinter.messagebox.showinfo('关于', '作者: 老吴(v1.0)')


def main():
    top = tkinter.Tk()
    top.title('单线程')
    top.geometry('200x150')
    top.wm_attributes('-topmost', True)

    panel = tkinter.Frame(top)
    button1 = tkinter.Button(panel, text='下载', command=download)
    button1.pack(side='left')
    button2 = tkinter.Button(panel, text='关于', command=show_about)
    button2.pack(side='right')
    panel.pack(side='bottom')

    tkinter.mainloop()


if __name__ == '__main__':
    main()

如果使用多线程将耗时间的任务放到一个独立的线程中执行,这样就不会因为执行耗时间的任务而阻塞了主线程,修改后的代码如下所示。

import time
import tkinter
import tkinter.messagebox
from threading import Thread


def main():

    class DownloadTaskHandler(Thread):

        def run(self):
            time.sleep(10)
            tkinter.messagebox.showinfo('提示', '下载完成!')
            # 启用下载按钮
            button1.config(state=tkinter.NORMAL)

    def download():
        # 禁用下载按钮
        button1.config(state=tkinter.DISABLED)
        # 通过daemon参数将线程设置为守护线程(主程序退出就不再保留执行)
        # 在线程中处理耗时间的下载任务
        DownloadTaskHandler(daemon=True).start()

    def show_about():
        tkinter.messagebox.showinfo('关于', '作者: 老吴(v1.0)')

    top = tkinter.Tk()
    top.title('单线程')
    top.geometry('200x150')
    top.wm_attributes('-topmost', 1)

    panel = tkinter.Frame(top)
    button1 = tkinter.Button(panel, text='下载', command=download)
    button1.pack(side='left')
    button2 = tkinter.Button(panel, text='关于', command=show_about)
    button2.pack(side='right')
    panel.pack(side='bottom')

    tkinter.mainloop()


if __name__ == '__main__':
    main()

例子2:使用多进程对复杂任务进行“分而治之”。

我们来完成1~100000000求和的计算密集型任务,这个问题本身非常简单,有点循环的知识就能解决,代码如下所示。

from time import time


def main():
    total = 0
    number_list = [x for x in range(1, 100000001)]
    start = time()
    for number in number_list:
        total += number
    print(total)
    end = time()
    print('Execution time: %.3fs' % (end - start))


if __name__ == '__main__':
    main()

在上面的代码中,我故意先去创建了一个列表容器然后填入了100000000个数,这一步其实是比较耗时间的,所以为了公平起见,当我们将这个任务分解到8个进程中去执行的时候,我们暂时也不考虑列表切片操作花费的时间,只是把做运算和合并运算结果的时间统计出来,代码如下所示。

from multiprocessing import Process, Queue
from random import randint
from time import time


def task_handler(curr_list, result_queue):
    total = 0
    for number in curr_list:
        total += number
    result_queue.put(total)


def main():
    processes = []
    number_list = [x for x in range(1, 100000001)]
    result_queue = Queue()
    index = 0
    # 启动8个进程将数据切片后进行运算
    for _ in range(8):
        p = Process(target=task_handler,
                    args=(number_list[index:index + 12500000], result_queue))
        index += 12500000
        processes.append(p)
        p.start()
    # 开始记录所有进程执行完成花费的时间
    start = time()
    for p in processes:
        p.join()
    # 合并执行结果
    total = 0
    while not result_queue.empty():
        total += result_queue.get()
    print(total)
    end = time()
    print('Execution time: ', (end - start), 's', sep='')


if __name__ == '__main__':
    main()

​ 比较两段代码的执行结果,使用多进程后由于获得了更多的CPU执行时间以及更好的利用了CPU的多核特性,明显的减少了程序的执行时间,而且计算量越大效果越明显。当然,如果愿意还可以将多个进程部署在不同的计算机上,做成分布式进程,具体的做法就是通过multiprocessing.managers模块中提供的管理器将Queue对象通过网络共享出来(注册到网络上让其他计算机可以访问),这部分内容也留到爬虫的专题再进行讲解。