2019-04-19-day036-协程与进程池
内容回顾
- 11:30
- 码云 :王老师检查作业+定期抽查
- 注册账号
- 考试的时间
threading.enumerate(),能够获取到当前正在运行的所有线程对象列表
守护线程
- 守护线程会等待所有的非守护线程结束之后结束
- 守护线程的结束是因为主进程的结束
- 在start之前设置daemon = True
####锁 - GIL锁 : 全局解释器锁,在Cpython解释器中,同一时刻同一个进程内只能有一个线程访问CPU
- 互斥锁
- 锁的是代码,一段代码被多个线程执行,并且要对全局的变量进行非原子性操作
- 互斥锁 : 在同一个线程中,不能连续acquire多次,并且可以做到多个线程中被锁的代码同时只有一个线程执行
- 递归锁 : 在同一个线程中,能连续acquire多次,并且可以做到多个线程中被锁的代码同时只有一个线程执行
* 从一定程度上可以避免死锁现象
* 使用递归锁也会产生死锁现象 - 死锁现象 : 只要实例化多把锁,并交替使用,都有可能产生死锁现象
* 只要是1把锁,递归锁永远不死锁
* 只要是2以及以上,交替使用,
* 只要是2以及以上,交替使用,递归锁互斥锁都可能死锁
####队列 - from queue import Queue,LifoQueue,PriorityQueue
- queue这个模块提供的所有队列都是线程安全的
- 先进先出 Queue
- 后进先出 LifoQueue
- 优先级队列 PriorityQueue
####进程池 - call_back回调函数
concurrent.futures
ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor
pool = ProcessPoolExecutor(5)
pool进程池对象
ret = pool.map(func,iter)
#ret是一个可迭代对象,迭代器(惰性运算),可以直接通过for循环从ret中获取返回值
submit
- 1. for + submit(func,arg1,arg2) = map
- 任务对象task
- 2. task = submit(func)
- 3. task.result() 获取任务函数的返回值 (阻塞方法)
- 4. task.add_done_callback(回调函数)
####shutdown - pool.shutdown() 等待进程池中所有的任务都执行完毕之后结束阻塞
import time
from concurrent.futures import ProcessPoolExecutor
def func():
time.sleep(1)
return '*'*20
if __name__ == '__main__':
p = ProcessPoolExecutor(4)
task = p.submit(func)
time.sleep(2)
print('---->')
print(task.result()) * 阻塞方法
池
- time/random 在任务中睡一会儿,在主线程可以睡一会儿
- threading.current_thread().ident
线程池
from urllib.request import urlopen
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
def get_html(name,addr):
ret = urlopen(addr)
return {'name':name,'content':ret.read()}
def parser_page(ret_obj):
dic = ret_obj.result()
with open(dic['name']+'.html','wb') as f:
f.write(dic['content'])
url_lst = {
'协程':'http://www.cnblogs.com/Eva-J/articles/8324673.html',
'线程':'http://www.cnblogs.com/Eva-J/articles/8306047.html',
'目录':'https://www.cnblogs.com/Eva-J/p/7277026.html',
'百度':'http://www.baidu.com',
'sogou':'http://www.sogou.com'
}
t = ThreadPoolExecutor(20)
for url in url_lst:
task = t.submit(get_html,url,url_lst[url])
task.add_done_callback(parser_page)
task.result()
t.map()
t.shutdown()
使用多线程去执行get_html获取网页对应的内容
一旦get_html执行结束之后,立即使用parser_page来分析获取的页面结果
进程池 除非高计算型的场景否则几乎不用 CPU的个数*2 线程池 cpu的个数*5
4 cpu 4*20 = 80
协程
- 进程 计算机中最小的资源分配单位
- 线程 计算机中被CPU调度的最小单位
操作系统 负责调度 线程
- 对于操作系统来说 可见的最小单位就是线程
- 线程的开销比进程虽然小很多,但是开启线程\关闭线程仍然需要开销
协程(本质是一条线程,操作系统不可见)
- 是有程序员操作的,而不是由操作系统调度的
- 多个协程的本质是一条线程,所以多个协程不能利用多核
出现的意义 : 多个任务中的IO时间可以共享,当执行一个任务遇到IO操作的时候,
- 可以将程序切换到另一个任务中继续执行
- 在有限的线程中,实现任务的并发,节省了调用操作系统创建\销毁线程的时间
- 并且协程的切换效率比线程的切换效率要高
- 协程执行多个任务能够让线程少陷入阻塞,让线程看起来很忙
线程陷入阻塞的次数越少,那么能够抢占CPU资源就越多,你的程序效率看起来就越高
SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。1.开销变小了
2.效率变高了
两个任务,一个任务执行过程中遇到io操作就切到另一个任务执行
直到所有的任务都遇到IO操作了,才进入阻塞,其中有任何一个任务结束阻塞,都会回到就绪队列里
协程的本质 - 就是在多个任务之间能够来回切换
做切换
def consumer():
n = yield
n = yield
n = yield
n = yield
n = yield
n = yield
n = yield
n = yield
def producer():
g = consumer()
next(g)
for i in range(2000000):
g.send(i)
tornado python的web的异步框架 yield --> asyncio
协程的切换 会不会占用时间
- 仍然是占用时间的
- 基本在函数调用的时间级别
- 不依赖操作系统
扩展模块\第三方模块
- python语言 写了一段代码\一个功能
- c语言 写了一段代码\编译之后作为一个python模块为你提供功能
yield这个切换的功能是C语言能写的
能不能用C语言写一个第三方模块也能实现yield的效果
协程模块 帮助我们更加简单的进行函数之间的切换
import time
from greenlet import greenlet * 协程模块
def eat(): * 协程任务 协程函数
print('start eating')
g2.switch()
time.sleep(1)
print('end eating')
g2.switch()
def sleep(): * 协程任务 协程函数
print('start sleeping')
g1.switch()
time.sleep(1)
print('end sleeping')
g1 = greenlet(eat)
g2 = greenlet(sleep)
g1.switch()
gevent模块
import time
import gevent
def eat(): * 协程任务 协程函数
print('start eating')
gevent.sleep(1)
print('end eating')
def sleep(): * 协程任务 协程函数
print('start sleeping')
gevent.sleep(1)
print('end sleeping')
g1 = gevent.spawn(eat)
g2 = gevent.spawn(sleep)
* g1.join() * 阻塞,直到g1任务执行完毕
* g2.join() * 阻塞,直到g2任务执行完毕
gevent.joinall([g1,g2])
from gevent import monkey
monkey.patch_all()
import time
import gevent
def eat(): * 协程任务 协程函数
print('start eating')
time.sleep(1)
print('end eating')
def sleep(): * 协程任务 协程函数
print('start sleeping')
time.sleep(1)
print('end sleeping')
g1 = gevent.spawn(eat) * 创建协程
g2 = gevent.spawn(sleep)
gevent.joinall([g1,g2]) * 阻塞 直到协程任务结束
请求网页
url_dic = {
'协程':'http://www.cnblogs.com/Eva-J/articles/8324673.html',
'线程':'http://www.cnblogs.com/Eva-J/articles/8306047.html',
'目录':'https://www.cnblogs.com/Eva-J/p/7277026.html',
'百度':'http://www.baidu.com',
'sogou':'http://www.sogou.com',
'4399':'http://www.4399.com',
'豆瓣':'http://www.douban.com',
'sina':'http://www.sina.com.cn',
'淘宝':'http://www.taobao.com',
'JD':'http://www.JD.com'
}
import time
from gevent import monkey;monkey.patch_all()
from urllib.request import urlopen
import gevent
def get_html(name,url):
ret = urlopen(url)
content = ret.read()
with open(name,'wb') as f:
f.write(content)
start = time.time()
for name in url_dic:
get_html(name+'_sync.html',url_dic[name])
ret = time.time() - start
print('同步时间 :',ret)
start = time.time()
g_l = []
for name in url_dic:
g = gevent.spawn(get_html,name+'_async.html',url_dic[name])
g_l.append(g)
gevent.joinall(g_l)
ret = time.time() - start
print('异步时间 :',ret)
高并发socket
import socket
import gevent
from gevent import monkey
monkey.patch_all()
sk = socket.socket()
sk.bind(('127.0.0.1',9001))
sk.listen()
def talk(conn):
while True:
msg = conn.recv(1024).decode('utf-8')
conn.send(msg.upper().encode('utf-8'))
while True:
conn,_ = sk.accept()
gevent.spawn(talk,conn)
一条线程抗500个并发
进程 高计算型的场景下
线程 对于IO操作的检测是更加全面且灵敏的
协程 能够检测到的io操作是有限的
#20 * 500 = 10000
#5个进程 * 20 * 500 = 50000
Client
import socket
from threading import Thread
def client(i):
sk = socket.socket()
sk.connect(('127.0.0.1',9001))
while True:
sk.send(b'hello')
print(sk.recv(1024))
for i in range(500):
Thread(target=client,args=(i,)).start()
ClientII
import socket
sk = socket.socket()
sk.connect(('127.0.0.1',9001))
while True:
sk.send(b'beybey')
print(sk.recv(1024))

更多精彩