多线程
Python GIL
GIL全局解释器锁:global interpreter lock
python中(cpython解释器)的一个线程对应与c语言的一个线程。因为cpython解析只允许拥有GIL全局解析器锁才能运行程序 这样就保证了保证同一个时刻只允许一个线程可以使用cpu。多线程并不会充分调用两个CPU,而是会在一个CPU上充分运转。 而多进程则是会完全调用两个CPU
gil会更具执行的字节码行数以及时间片释放gil,也会在遇到io操作时释放全局锁
以下例子执行多次,结果并不相同
total = 0
def add():
global total
for i in range(1000000):
total += 1
def add2():
global total
for i in range(1000000):
total += 1
import threading
t = threading.Thread(target=add)
t2 = threading.Thread(target=add2)
t.start()
t2.start()
t.join()
t2.join()
print(total)
实现多线程
两种方法实现多线程
- 引入threading包,传入执行方法实现
- 继承threading.Thread实现
# 对于io操作来说,多线程和多进程差别不大
# 1. 通过Thread类实例化
import threading
import time
def get_detail_html(url):
print("get started ", url)
time.sleep(2)
print("get end ", url)
def get_detail_url(url):
print("get url started ", url)
time.sleep(4)
print("get url end ", url)
# 通过继承Thread来实现多线程
class GetDetailHtml(threading.Thread):
def __init__(self, name):
super().__init__(name=name)
def run(self):
print("GetDetailHtml Started ")
time.sleep(2)
print("GetDetailHtml end ")
class GetDetailUrl(threading.Thread):
def __init__(self, name):
super().__init__(name=name)
def run(self):
print("GetDetailUrl Started ")
time.sleep(4)
print("GetDetailUrl end ")
if __name__ == '__main__':
# 使用方法实现多线程
# thread1 = threading.Thread(target=get_detail_html, args=("",))
# thread2 = threading.Thread(target=get_detail_url, args=("",))
# 使用继承实现多线程
thread1 = GetDetailHtml("123")
thread2 = GetDetailUrl("234")
start_time = time.time()
thread1.setDaemon(True)
thread2.setDaemon(True)
thread1.start()
thread2.start()
# 将两个线程阻塞到这儿,执行完后再运行后续逻辑
thread1.join()
thread2.join()
# 主线程推出的时候,子线程推出
print("last time: {}".format(time.time() - start_time))
线程间通信
常用两种方式
- 共享变量
- Queue
from queue import Queue
details_url_queue = Queue(maxsize=10000)
# 入对方法
# def put(self, item, block=True, timeout=None):
details_url_queue.put()
# 出对方法
# 无数据会阻塞
data = queue.get()
线程同步
Lock
Lock锁,不可冲入,会影响性能,可能会引起死锁
死锁对情况,对于Lock来说不是可重入的,获取到锁再次获取会造成死锁。同时,对于 多资源多锁获取时,也容易造成死锁
from threading import Lock
lock.acquire()
...业务逻辑
lock.release()
RLock
RLock,可重入锁,在同一个线程里面,可以连续调用多次acquire,但是需要相等数量的release操作
Condition
cond = threading.Condition()
# 启动顺序很重要
# 在调用 with cond之后才能调用 cond.wait()或cond.notify()方法
# condition 有两层锁,一把底层锁会在线程调用wait方法的时候释放
# 上面的锁会在每次调用wait的时候分配一把并放入到cond的等待队列中,等待notify方法的唤醒
semaphore
semaphore适用于控制进入数量的锁
sem = threading.Semaphore(3)
# 获取许可,最多3个,后续释放后可再次获取
sem.acquire()
# 释放一个许可
sem.release()
线程池
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
import time
from concurrent.futures._base import ALL_COMPLETED
# 线程池,为什么要用
# 主线程中可以获取某一个线程的状态或者某一个任务的状态,以及返回值
# 当一个线程完成时,我们主线程立即知道
def get_html(times):
time.sleep(times)
print("get page {} success".format(times))
return times
executor = ThreadPoolExecutor(max_workers=2)
# 获取已经成功的task的返回
urls = [3, 2, 4]
all_task = [executor.submit(get_html, url) for url in urls]
# wait() 等待某一个或者某一些任务完成后执行
# wait(fs, timeout=None, return_when=ALL_COMPLETED)
# return_when 指定什么时候就不阻塞来,比如所有的都执行完、第一个执行完等
# FIRST_COMPLETED = 'FIRST_COMPLETED'
# FIRST_EXCEPTION = 'FIRST_EXCEPTION'
# ALL_COMPLETED = 'ALL_COMPLETED'
# _AS_COMPLETED = '_AS_COMPLETED'
wait(all_task, return_when=ALL_COMPLETED)
print("main")
# 谁先完成就处理谁
# for future in as_completed(all_task):
# data = future.result()
# print("get {} page success".format(data))
# 通过executor获取已经完成的task
# 这种方式返回的顺序是urls的顺序
for data in executor.map(get_html, urls):
print("get {} map page success".format(data))
# 通过submit函数提交执行的函数到线程池中,submit是立即返回
# task1 = executor.submit(get_html, 3)
# task2 = executor.submit(get_html, 2)
#
# # done方法用于判断某个任务是否完成
# print(task1.done())
# # 取消某个未执行的任务
# print(task2.cancel())
# time.sleep(3)
# print(task2.done())
# # 获取返回的结果
# print(task1.result())
多进程
对于耗cpu的操作,多线程不能起到很高的效率。因为python中有一个gil锁。但是,进程切换的代价 要高于线程切换的代价。对于io操作来说,更适用多线程。
对于耗费cpu的操作,计算,多进程优于多线程
import time
from concurrent.futures._base import as_completed
from concurrent.futures import ThreadPoolExecutor
# 多进程模式
from concurrent.futures import ProcessPoolExecutor
def fib(n):
if n <= 2:
return 1
return fib(n - 1) + fib(n - 2)
if __name__ == '__main__':
# 多线程模式
# with ThreadPoolExecutor (3) as executor:
# 多进程模式
# 多进程模式需要在 if __name__下,linux系统下不需要
with ProcessPoolExecutor(3) as executor:
urls = [3, 2, 4]
start_time = time.time()
all_task = [executor.submit(fib, num) for num in range(25, 35)]
for future in as_completed(all_task):
data = future.result()
print("exe result: {}".format(data))
print("last time is: {}".format(time.time() - start_time))
多进程成功fork过后,子进程会继续执行fork代码之后的代码
import os
pid = os.fork()
多进程编程
import multiprocessing import Process, Queue, Pool, Manager, Pipe
# 直接创建进程执行
progress = multiprocessing.Process(target=get_html, args=(2,))
# 也可以继承执行
# class A(multiprocessing.Process):
# 创建进程池
pool = multiprocessing.Pool(multiprocessing.cpu_count())
result = pool.apply_async(get_html,args=(3,))
# 不再接受任务
pool.close()
# 等待任务完成 一定要先调用pool.close(),不再接受任务
pool.join()
print(result.get())
# imap
for result in pool.imap(get_html, [1,5,3]):
print(result)
# 谁先完成就执行
# for result in pool.unorderd(get_html, [1,5,3]):
multiprocessing中的Queue不能用于multiprocessing的Pool 需要使用Manager中的Queue
from multiprocessing import Manager
queue = Manager().Queue()
通过pipe实现进程间通信
# pipe只能用于两个进程间的通信
import multiprocessing
from multiprocessing import Pipe
# Pipe性能高于queue
def producer(pipe):
pipe.send("aaa")
def consumer(pipe):
print(pipe.recv())
if __name__ == "__main__":
r, s = Pipe()
s = multiprocessing.Process(target=producer, args=(s,))
r = multiprocessing.Process(target=consumer, args=(r,))
s.start()
r.start()
s.join()
r.join()