Python系列(十四)复用、协程、asyncio

Posted by YaPi on December 24, 2018

IO复用方式

import socket
from urllib.parse import urlparse
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

selector = DefaultSelector()


# 使用select完成http请求
class Fetcher:
    # 连接成功
    # 取消注册当前事件,并且发送数据
    # 同时注册发送请求返回监听事件,并指定返回监听事件之后执行的函数
    def connect(self, key):
        selector.unregister(key.fd)
        self.client.send(
            "GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode("utf8"))
        selector.register(self.client.fileno(), EVENT_READ, self.readable)

    # 返回监听
    def readable(self, key):
        d = self.client.recv(1024)
        if d:
            self.data += d
        else:
            selector.unregister(key.fd)

        self.data = self.data.decode("utf8")
        print(self.data)
        self.client.close()

    def get_url(self, url):
        # 通过socket请求html
        url = urlparse(url)
        self.host = url.netloc
        self.path = url.path
        self.data = b""
        if self.path == "":
            self.path = "/"

        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client.setblocking(False)

        try:
            self.client.connect((self.host, 80))
        except BlockingIOError as e:
            pass

        # 注册一个select事件,并指定调用函数
        selector.register(self.client.fileno(),
                          EVENT_WRITE, self.connect)


def loop():
    # 事件循环,不停的请求socket状态调用对应的回调函数
    # select本身是不支持register模式
    # socket状态变化以后的回调是由程序员实现的
    while True:
        ready = selector.select()
        for key, mask in ready:
            call_bak = key.data
            call_bak(key)


if __name__ == "__main__":
    # socket + epoll + loop 实现循环
    fetcher = Fetcher()
    fetcher.get_url("http://www.baidu.com")
    loop()

协程

解决问题:

  1. 回调模式编码复杂
  2. 同步编程并发性不高
  3. 多线程编程需要线程间同步,lock

yield 相关

向内传值,yield.send() 关闭,yield.close() 传递一个异常,yield.throw()

def gen_func():
    # 1. 可以产出值 2,可以接受值(调用方传递进来值)
    html = yield "http://project.com"
    print(html)
    yield 2
    yield 3
    return "erg"


# 1. 生成器不只可以产出值,还可以接收值

if __name__ == "__main__":
    gen = gen_func()
    print(next(gen))
    gen.throw()
    gen.close()

    next(gen)
    # 在调用send发送非None之前,必须启动一次生成器,next 、send方法
    # gen.close()
    # 调用close方法后,若后续还有next(gen)等调用生成器的语句,则会抛出异常,若没有,就不会抛出异常
    # 若在yield内部进行了try catch
    # gen.throw方法会,会扔一个异常进入yield,若在yield内部没处理,则会报错
    # url = next(gen)
    # html = "ergou"
    # send方法可以传递值进入生成器内部,同时
    # 还可以重启生成器执行到下一个位置
    # print(gen.send(html))
    # 启动生成器方式有两种,next(),send

yield from 语法

# python3.3新加入yield from语法

# chain可以将多个可迭代对象连接起来
from itertools import chain

my_list = [1, 2, 3]
my_dict = {
    "a": "http://project.com",
    "b": "http://project2.com"
}

# 自定义chain
def my_chain(*args, **kwargs):
    for my_iter in args:
        # 使用yield from 简化语法
        yield from my_iter
        # for value in my_iter:
        #     yield value

for value in my_chain(my_list, my_dict, range(5, 10)):
    print(value)

# for value in chain(my_list, my_dict, range(5, 10)):
#     print(value)

def g1(gen):
    yield from gen


def main():
    g = g1()
    g.send(None)

# main 调用方 g1(委托生成器) gen 子生成器
# yield from 会在调用方和子生成器之间建立一个双向通道

asyncio

# 事件循环 + 回调(驱动生成器)+ epoll(IO多路复用)
# asyncio是python用于解决异步io编程的一整套解决方案
# tornado
import asyncio
import time


async def get_html(url):
    print("start get url")
    # 这个地方需要使用await修饰
    # 不能使用time.sleep(),因为它是一个阻塞io,会影响性能
    # loop循环是一个线程不断的执行,会调用注册事件的回调函数,然后执行
    # 调用阻塞式io就会有性能问题
    await asyncio.sleep(2)
    # time.sleep(2)
    print("end get url")


if __name__ == "__main__":
    start_time = time.time()
    # loop = asyncio.get_event_loop()
    # loop.run_until_complete(get_html("http://abc.com"))

    # 批量调用
    loop = asyncio.get_event_loop()
    tasks = [get_html("http://abc.com") for i in range(20)]
    # async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
    loop.run_until_complete(asyncio.wait(tasks))

    print(time.time() - start_time)
基本操作
# 事件循环 + 回调(驱动生成器)+ epoll(IO多路复用)
# asyncio是python用于解决异步io编程的一整套解决方案
# tornado
import asyncio
import time
# 将一个函数包装成另一个函数
from functools import partial


async def get_html(url):
    print("start : ", url)
    await asyncio.sleep(1)
    # time.sleep(2)
    print("end : ", url)
    return "abc"


def callback(future):
    print("callback")


def callback2(url, future):
    print("callback2 : ", url)


if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    
    
    # loop.call_later(等待时间段后执行,执行函数)
    # loop.call_soon() 立即执行
    
    # 指定时间后执行
    # now = loop.time()
    # loop.call_at(now +2)
    
    
    # 使用future的方式
    # get_future = asyncio.ensure_future(get_html("http://abc.com"))
    # 使用loop提供的task方式
    task = loop.create_task(get_html("http://abc.com"))
    # 指定回调函数
    # task.add_done_callback(callback)
    # 若指定的回调函数里有参数,引入包partial
    # task.add_done_callback(partial(callback2, "gggg"))

    # 完成后执行
    # loop.run_until_complete(asyncio.wait([task]))
    # 完成后执行还可以使用 asyncio.gather(), 可以用来分组等操作

    g1 = [get_html("http://abc.com/{id}".format(id="g1:" + str(i))) for i in range(2)]
    g2 = [get_html("http://abc.com/{id}".format(id="g2:" + str(i))) for i in range(2)]

    g1 = asyncio.gather(*g1)
    g2 = asyncio.gather(*g2)
    # 取消一组运行
    # g2.cancel()
    loop.run_until_complete(asyncio.gather(g1, g2))

    print(task.result())
    print(time.time() - start_time)

task 取消任务

try:
   loop.run_until_complete(asyncio.gather(g1, g2))
except KeyBoardInterrupt as e:
    all_tasks = asyncio.Task.all_tasks()
    for task in all_tasks:
        print("cancel task")
        print(task.cancel())
    loop.stop()
    loop.run_forever()
else:
    loop.close()

不建议在协中当中去执行阻塞IO

若是一定要执行,就可以使用多线程:在协程中集成阻塞IO

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def get_url():
    time.sleep(2)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor()

    tasks = loop.run_in_executor(executor, get_url, "http://abc.com")

    loop.run_until_complete(asyncio.wait([tasks]))

异步执行

import asyncio
import time
from urllib.parse import urlparse


async def get_url(url):
    # 通过socket请求html
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"

    reader, witer = await asyncio.open_connection(host, 80)
    witer.write("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
    all_lines = []
    async for raw_line in reader:
        data = raw_line.decode("utf8")
        all_lines.append(data)

    html = "\n".join(all_lines)
    print(html)
    return html


async def main():
    tasks = []
    for url in range(20):
        url = "http://shop.projectsedu.com/goods/{}/".format(url)
        tasks.append(asyncio.ensure_future(get_url(url)))
    for task in asyncio.as_completed(tasks):
        result = await task
        print(result)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    start_time = time.time()

    loop.run_until_complete(main())

    print(time.time() - start_time)