IO multiplexing
import socket
from urllib.parse import urlparse
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
selector = DefaultSelector()
# complete an HTTP request using select-style IO multiplexing
class Fetcher:
    # Connection established:
    # unregister the write event and send the request,
    # then register a read event plus the callback to run once the response is readable
    def connect(self, key):
        selector.unregister(key.fd)
        self.client.send(
            "GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode("utf8"))
        selector.register(self.client.fileno(), EVENT_READ, self.readable)
    # Read callback: keep collecting data until the server closes the connection
    def readable(self, key):
        d = self.client.recv(1024)
        if d:
            self.data += d
        else:
            selector.unregister(key.fd)
            self.data = self.data.decode("utf8")
            print(self.data)
            self.client.close()
    def get_url(self, url):
        # request the HTML over a raw non-blocking socket
        url = urlparse(url)
        self.host = url.netloc
        self.path = url.path
        self.data = b""
        if self.path == "":
            self.path = "/"
        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client.setblocking(False)
        try:
            self.client.connect((self.host, 80))
        except BlockingIOError:
            # expected: the non-blocking connect is still in progress
            pass
        # register a write event and the callback to invoke once connected
        selector.register(self.client.fileno(),
                          EVENT_WRITE, self.connect)
def loop():
    # Event loop: keep polling socket readiness and invoke the registered callbacks.
    # select itself has no register-a-callback model;
    # reacting to a socket state change is left to the programmer.
    while True:
        ready = selector.select()
        for key, mask in ready:
            call_bak = key.data
            call_bak(key)
if __name__ == "__main__":
    # socket + epoll + loop: an event-driven request
    fetcher = Fetcher()
    fetcher.get_url("http://www.baidu.com")
    loop()
Coroutines
Problems coroutines address:
- callback-style code is complex to write and maintain (see the sketch below)
- synchronous code offers little concurrency
- multithreaded code needs inter-thread synchronization (locks)
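A minimal sketch of that first point, assuming a toy fetch coroutine (the URL and helper names are illustrative, not from the course code): the callback version splits the flow across functions, while the coroutine version reads top to bottom.

import asyncio

async def fetch(url):
    await asyncio.sleep(0.1)  # stand-in for real non-blocking IO
    return "<html>{}</html>".format(url)

# callback style: the continuation lives in a separate function
def on_done(future):
    print("callback got:", future.result())

# coroutine style: the same logic reads straight down
async def main():
    html = await fetch("http://project.com")
    print("coroutine got:", html)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    task = asyncio.ensure_future(fetch("http://project.com"))
    task.add_done_callback(on_done)
    loop.run_until_complete(task)    # fires the callback on completion
    loop.run_until_complete(main())  # same flow, written as a coroutine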
yield basics
Passing a value in: gen.send(value). Closing the generator: gen.close(). Throwing an exception into it: gen.throw(exc).
def gen_func():
    # 1. can produce values  2. can receive values sent in by the caller
    html = yield "http://project.com"
    print(html)
    yield 2
    yield 3
    return "erg"
# 1. A generator can not only produce values, it can also receive them
if __name__ == "__main__":
    gen = gen_func()
    print(next(gen))
    gen.throw(Exception("download error"))  # throw() needs an exception instance
    # the two lines below only run if the generator caught the exception above:
    # gen.close()
    # next(gen)
    # Before send() can deliver a non-None value, the generator must be started
    # once with next() or send(None)
    # gen.close()
    # After close(), any later next(gen) or other resume call raises an exception;
    # if the generator is never resumed again, nothing is raised
    # The yield site can wrap itself in try/except to handle thrown exceptions
    # gen.throw() raises the given exception at the paused yield;
    # if it is not handled inside the generator, it propagates to the caller
    # url = next(gen)
    # html = "ergou"
    # send() passes a value into the generator at the paused yield,
    # and also resumes it so it runs to the next yield
    # print(gen.send(html))
    # There are two ways to start a generator: next() and send(None)
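For reference, the send path described in the comments above as a clean, self-contained run ("ergou" is the same placeholder value the notes use):

def gen_func():
    html = yield "http://project.com"
    print(html)  # prints the value passed in via send()
    yield 2

gen = gen_func()
print(next(gen))          # prime the generator; prints "http://project.com"
print(gen.send("ergou"))  # "ergou" becomes the value of the first yield;
                          # the generator prints it, then yields 2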
The yield from syntax
# yield from was added in Python 3.3
# itertools.chain links multiple iterables together
from itertools import chain
my_list = [1, 2, 3]
my_dict = {
    "a": "http://project.com",
    "b": "http://project2.com"
}
# a hand-rolled chain
def my_chain(*args, **kwargs):
    for my_iter in args:
        # yield from replaces the inner for loop
        yield from my_iter
        # for value in my_iter:
        #     yield value
for value in my_chain(my_list, my_dict, range(5, 10)):
    print(value)
# for value in chain(my_list, my_dict, range(5, 10)):
#     print(value)
def g1(gen):
    yield from gen

def main():
    g = g1(iter([1, 2, 3]))  # g1 must be handed a subgenerator/iterable
    g.send(None)
# main: the caller; g1: the delegating generator; gen: the subgenerator
# yield from opens a bidirectional channel between the caller and the subgenerator
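A runnable sketch of that two-way channel (an averager subgenerator, a common illustration rather than course code): values the caller sends pass through the delegating generator straight into the subgenerator, and the subgenerator's return value becomes the value of the yield from expression.

def averager():
    # subgenerator: receives numbers sent by the caller
    total, count = 0, 0
    while True:
        value = yield
        if value is None:
            break
        total += value
        count += 1
    return total / count

def delegator(results):
    # delegating generator: yield from wires the caller to averager()
    results.append((yield from averager()))

results = []
g = delegator(results)
g.send(None)        # prime the delegating generator
for n in [10, 20, 30]:
    g.send(n)       # each value goes straight through to averager()
try:
    g.send(None)    # tell averager() to finish; its return value lands in results
except StopIteration:
    pass            # delegator itself is exhausted once averager() returns
print(results)      # [20.0]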
asyncio
# event loop + callbacks (to drive generators) + epoll (IO multiplexing)
# asyncio is Python's all-in-one solution for asynchronous IO programming
# tornado
import asyncio
import time
async def get_html(url):
    print("start get url")
    # this point must be awaited with a non-blocking call:
    # time.sleep() is blocking IO and must not be used here, because the loop
    # is a single thread that keeps running registered callbacks in turn;
    # one blocking call stalls every coroutine on the loop
    await asyncio.sleep(2)
    # time.sleep(2)
    print("end get url")
if __name__ == "__main__":
    start_time = time.time()
    # loop = asyncio.get_event_loop()
    # loop.run_until_complete(get_html("http://abc.com"))
    # batch submission
    loop = asyncio.get_event_loop()
    tasks = [get_html("http://abc.com") for i in range(20)]
    # async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
    loop.run_until_complete(asyncio.wait(tasks))
    print(time.time() - start_time)
Basic operations
# event loop + callbacks (to drive generators) + epoll (IO multiplexing)
# asyncio is Python's all-in-one solution for asynchronous IO programming
# tornado
import asyncio
import time
# functools.partial wraps a function with some arguments pre-bound
from functools import partial
async def get_html(url):
    print("start : ", url)
    await asyncio.sleep(1)
    # time.sleep(2)
    print("end : ", url)
    return "abc"
def callback(future):
    print("callback")

def callback2(url, future):
    print("callback2 : ", url)
if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    # loop.call_later(delay, callback): run after the given delay in seconds
    # loop.call_soon(callback): run on the next loop iteration
    # run at an absolute loop time:
    # now = loop.time()
    # loop.call_at(now + 2, callback)
    # the future-based way:
    # get_future = asyncio.ensure_future(get_html("http://abc.com"))
    # the loop's own task-based way:
    task = loop.create_task(get_html("http://abc.com"))
    # attach a completion callback
    # task.add_done_callback(callback)
    # if the callback needs extra arguments, bind them with functools.partial
    # task.add_done_callback(partial(callback2, "gggg"))
    # run until complete
    # loop.run_until_complete(asyncio.wait([task]))
    # asyncio.gather() also works here and additionally supports grouping
    g1 = [get_html("http://abc.com/{id}".format(id="g1:" + str(i))) for i in range(2)]
    g2 = [get_html("http://abc.com/{id}".format(id="g2:" + str(i))) for i in range(2)]
    g1 = asyncio.gather(*g1)
    g2 = asyncio.gather(*g2)
    # cancel a whole group:
    # g2.cancel()
    loop.run_until_complete(asyncio.gather(g1, g2))
    print(task.result())
    print(time.time() - start_time)
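The call_soon / call_later / call_at helpers mentioned in the comments above, in runnable form (callback names and delays here are illustrative):

import asyncio
from functools import partial

def tick(name):
    print("tick:", name)

def stop_loop(loop):
    loop.stop()

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.call_soon(tick, "soon")                      # next loop iteration
    loop.call_later(1, tick, "later +1s")             # after a delay in seconds
    now = loop.time()                                 # the loop's internal clock
    loop.call_at(now + 2, tick, "at now+2s")          # at an absolute loop time
    loop.call_at(now + 2.1, partial(stop_loop, loop)) # partial pre-binds the argument
    loop.run_forever()
    loop.close()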
Cancelling tasks
try:
    loop.run_until_complete(asyncio.gather(g1, g2))
except KeyboardInterrupt as e:
    all_tasks = asyncio.Task.all_tasks()
    for task in all_tasks:
        print("cancel task")
        print(task.cancel())
    loop.stop()
    loop.run_forever()  # one more pass so the loop can deliver the cancellations
else:
    loop.close()
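A self-contained version of the same pattern (press Ctrl-C while it runs; the sleep times are illustrative, and asyncio.Task.all_tasks() is this era's spelling of the API, renamed asyncio.all_tasks() in newer Python):

import asyncio

async def get_html(sleep_time):
    try:
        print("waiting {}s".format(sleep_time))
        await asyncio.sleep(sleep_time)
        print("done after {}s".format(sleep_time))
    except asyncio.CancelledError:
        print("cancelled")
        raise

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    tasks = [get_html(2), get_html(3), get_html(3)]
    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt:
        for task in asyncio.Task.all_tasks():
            print("cancel task:", task.cancel())
        loop.stop()
        loop.run_forever()  # let the pending cancellations be processed
    finally:
        loop.close()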
Running blocking IO inside a coroutine is discouraged.
If it cannot be avoided, use a thread pool instead: integrating blocking IO with coroutines.
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
def get_url(url):
    time.sleep(2)  # blocking IO stand-in
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor()
    task = loop.run_in_executor(executor, get_url, "http://abc.com")
    loop.run_until_complete(asyncio.wait([task]))
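The same integration can also be awaited from inside a coroutine, which is the more common shape (the URLs and pool size are illustrative):

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def get_url(url):
    time.sleep(2)  # blocking IO stand-in
    return url

async def main():
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor(3)
    # run_in_executor returns awaitable futures backed by pool threads
    futures = [loop.run_in_executor(executor, get_url, "http://abc.com/{}".format(i))
               for i in range(3)]
    done, _ = await asyncio.wait(futures)
    print([f.result() for f in done])

if __name__ == "__main__":
    asyncio.get_event_loop().run_until_complete(main())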
Asynchronous execution
import asyncio
import time
from urllib.parse import urlparse
async def get_url(url):
    # request the HTML through asyncio's stream API
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"
    reader, writer = await asyncio.open_connection(host, 80)
    writer.write("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
    all_lines = []
    async for raw_line in reader:
        data = raw_line.decode("utf8")
        all_lines.append(data)
    html = "\n".join(all_lines)
    print(html)
    return html
async def main():
    tasks = []
    for i in range(20):
        url = "http://shop.projectsedu.com/goods/{}/".format(i)
        tasks.append(asyncio.ensure_future(get_url(url)))
    for task in asyncio.as_completed(tasks):
        result = await task
        print(result)
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    start_time = time.time()
    loop.run_until_complete(main())
    print(time.time() - start_time)