Python系列(十三)多线程、多进程、线程池

Posted by YaPi on December 23, 2018

多线程

Python GIL

GIL全局解释器锁:global interpreter lock

python中(cpython解释器)的一个线程对应与c语言的一个线程。因为cpython解析只允许拥有GIL全局解析器锁才能运行程序 这样就保证了保证同一个时刻只允许一个线程可以使用cpu。多线程并不会充分调用两个CPU,而是会在一个CPU上充分运转。 而多进程则是会完全调用两个CPU

gil会更具执行的字节码行数以及时间片释放gil,也会在遇到io操作时释放全局锁

以下例子执行多次,结果并不相同

total = 0


def add():
    global total
    for i in range(1000000):
        total += 1


def add2():
    global total
    for i in range(1000000):
        total += 1


import threading

t = threading.Thread(target=add)
t2 = threading.Thread(target=add2)
t.start()
t2.start()
t.join()
t2.join()
print(total)

实现多线程

两种方法实现多线程

  1. 引入threading包,传入执行方法实现
  2. 继承threading.Thread实现
# 对于io操作来说,多线程和多进程差别不大
# 1. 通过Thread类实例化
import threading
import time


def get_detail_html(url):
    print("get started ", url)
    time.sleep(2)
    print("get end ", url)


def get_detail_url(url):
    print("get url started ", url)
    time.sleep(4)
    print("get url end ", url)


# 通过继承Thread来实现多线程

class GetDetailHtml(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        print("GetDetailHtml Started ")
        time.sleep(2)
        print("GetDetailHtml end ")


class GetDetailUrl(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        print("GetDetailUrl Started ")
        time.sleep(4)
        print("GetDetailUrl end ")


if __name__ == '__main__':
    # 使用方法实现多线程
    # thread1 = threading.Thread(target=get_detail_html, args=("",))
    # thread2 = threading.Thread(target=get_detail_url, args=("",))

    # 使用继承实现多线程
    thread1 = GetDetailHtml("123")
    thread2 = GetDetailUrl("234")

    start_time = time.time()
    thread1.setDaemon(True)
    thread2.setDaemon(True)

    thread1.start()
    thread2.start()

    # 将两个线程阻塞到这儿,执行完后再运行后续逻辑
    thread1.join()
    thread2.join()

    # 主线程推出的时候,子线程推出
    print("last time: {}".format(time.time() - start_time))

线程间通信

常用两种方式

  1. 共享变量
  2. Queue
from queue import Queue

details_url_queue = Queue(maxsize=10000)

# 入对方法
# def put(self, item, block=True, timeout=None):
details_url_queue.put()

# 出对方法
# 无数据会阻塞
data = queue.get()

线程同步

Lock

Lock锁,不可冲入,会影响性能,可能会引起死锁

死锁对情况,对于Lock来说不是可重入的,获取到锁再次获取会造成死锁。同时,对于 多资源多锁获取时,也容易造成死锁

from threading import Lock

lock.acquire()
...业务逻辑
lock.release()
RLock

RLock,可重入锁,在同一个线程里面,可以连续调用多次acquire,但是需要相等数量的release操作

Condition
cond = threading.Condition()

# 启动顺序很重要
# 在调用 with cond之后才能调用 cond.wait()或cond.notify()方法
# condition 有两层锁,一把底层锁会在线程调用wait方法的时候释放
# 上面的锁会在每次调用wait的时候分配一把并放入到cond的等待队列中,等待notify方法的唤醒
semaphore

semaphore适用于控制进入数量的锁

sem = threading.Semaphore(3)

# 获取许可,最多3个,后续释放后可再次获取
sem.acquire()
# 释放一个许可
sem.release() 

线程池

from concurrent.futures import ThreadPoolExecutor, as_completed, wait
import time
from concurrent.futures._base import ALL_COMPLETED


# 线程池,为什么要用
# 主线程中可以获取某一个线程的状态或者某一个任务的状态,以及返回值
# 当一个线程完成时,我们主线程立即知道

def get_html(times):
    time.sleep(times)
    print("get page {} success".format(times))
    return times


executor = ThreadPoolExecutor(max_workers=2)

# 获取已经成功的task的返回
urls = [3, 2, 4]
all_task = [executor.submit(get_html, url) for url in urls]

# wait() 等待某一个或者某一些任务完成后执行
# wait(fs, timeout=None, return_when=ALL_COMPLETED)
# return_when 指定什么时候就不阻塞来,比如所有的都执行完、第一个执行完等
# FIRST_COMPLETED = 'FIRST_COMPLETED'
# FIRST_EXCEPTION = 'FIRST_EXCEPTION'
# ALL_COMPLETED = 'ALL_COMPLETED'
# _AS_COMPLETED = '_AS_COMPLETED'
wait(all_task, return_when=ALL_COMPLETED)
print("main")

# 谁先完成就处理谁
# for future in as_completed(all_task):
#     data = future.result()
#     print("get {} page success".format(data))

# 通过executor获取已经完成的task
# 这种方式返回的顺序是urls的顺序
for data in executor.map(get_html, urls):
    print("get {} map page success".format(data))

# 通过submit函数提交执行的函数到线程池中,submit是立即返回
# task1 = executor.submit(get_html, 3)
# task2 = executor.submit(get_html, 2)
#
# # done方法用于判断某个任务是否完成
# print(task1.done()) 
# # 取消某个未执行的任务
# print(task2.cancel())
# time.sleep(3)
# print(task2.done())
# # 获取返回的结果
# print(task1.result())

多进程

对于耗cpu的操作,多线程不能起到很高的效率。因为python中有一个gil锁。但是,进程切换的代价 要高于线程切换的代价。对于io操作来说,更适用多线程。

对于耗费cpu的操作,计算,多进程优于多线程

import time
from concurrent.futures._base import as_completed
from concurrent.futures import ThreadPoolExecutor
# 多进程模式
from concurrent.futures import ProcessPoolExecutor

def fib(n):
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)


if __name__ == '__main__':
    # 多线程模式
    # with ThreadPoolExecutor (3) as executor:
    # 多进程模式
    # 多进程模式需要在 if __name__下,linux系统下不需要
    with ProcessPoolExecutor(3) as executor:
        urls = [3, 2, 4]
        start_time = time.time()
        all_task = [executor.submit(fib, num) for num in range(25, 35)]
        for future in as_completed(all_task):
            data = future.result()
            print("exe result: {}".format(data))

        print("last time is: {}".format(time.time() - start_time))

多进程成功fork过后,子进程会继续执行fork代码之后的代码

import os 
pid = os.fork()

多进程编程

import multiprocessing import Process, Queue, Pool, Manager, Pipe
# 直接创建进程执行
progress = multiprocessing.Process(target=get_html, args=(2,))
# 也可以继承执行
# class A(multiprocessing.Process):

# 创建进程池
pool = multiprocessing.Pool(multiprocessing.cpu_count())
result = pool.apply_async(get_html,args=(3,))
# 不再接受任务
pool.close()
# 等待任务完成 一定要先调用pool.close(),不再接受任务
pool.join()
print(result.get())

# imap
for result in pool.imap(get_html, [1,5,3]):
    print(result)
# 谁先完成就执行    
#  for result in pool.unorderd(get_html, [1,5,3]):

multiprocessing中的Queue不能用于multiprocessing的Pool 需要使用Manager中的Queue

from multiprocessing import Manager

queue = Manager().Queue()
通过pipe实现进程间通信
# pipe只能用于两个进程间的通信 
import multiprocessing
from multiprocessing import Pipe


# Pipe性能高于queue

def producer(pipe):
    pipe.send("aaa")


def consumer(pipe):
    print(pipe.recv())


if __name__ == "__main__":
    r, s = Pipe()
    s = multiprocessing.Process(target=producer, args=(s,))
    r = multiprocessing.Process(target=consumer, args=(r,))
    s.start()
    r.start()
    s.join()
    r.join()