进程池线程池、协程
一、socket服务端实现并发
服务端:
import socket from threading import Thread """ 服务端: 1、固定的ip和port 2、24小时不间断提供服务 3、支持高并发 """ server = socket.socket() server.bind(('127.0.0.1',8080)) server.listen(5) # 半连接池 def communicate(conn): while True: try: data = conn.recv(1024) if len(data) == 0:break print(data) conn.send(data.upper()) except ConnectionResetError: break conn.close() while True: conn,addr = server.accept() print(addr) t = Thread(target=communicate,args=(conn,)) t.start()
客户端:
import socket client = socket.socket() client.connect(('127.0.0.1',8080)) while True: msg = input('>>>:').encode('utf-8') if len(msg) == 0:continue client.send(msg) data = client.recv(1024) print(data)
二、进程池线程池介绍
池:
为了减缓计算机硬件的压力,避免计算机硬件设施崩溃
虽然减轻了计算机的压力,但一定程度上降低了持续的效率
进程池线程池:
为了限制开设的进程数和线程数,从而保证计算机硬件的安全
使用方法:
concurrent.futures模块导入
线程池创建(线程数=CPU核数*5左右)
submit提交任务(提交任务的两种方式)
异步提交的submit返回值对象
shutdown关闭池并等待所有任务运行结束
对象获取任务返回值
进程池的使用,验证进程池在创建的时候里面固定有指定的进程数
异步提交回调函数的使用
进程池:
from concurrent.futures import ProcessPoolExecutor import time import os pool = ProcessPoolExecutor(5) # 创建一个池子,池子里面有5个进程 def task(n): print(n,os.getpid()) time.sleep(2) return n**2 def call_back(n): print('拿到了结果:%s'%n.result()) """ 提交任务的方式 同步:提交任务之后,原地等待任务的返回结果,再继续执行下一步代码 异步:提交任务之后,不等待任务的返回结果(通过回调函数拿到返回结果并处理),直接执行下一步操作 """ if __name__ == '__main__': for i in range(20): future = pool.submit(task,i).add_done_callback(call_back) print('主')
线程池:
from concurrent.futures import ThreadPoolExecutor import time import os pool = ThreadPoolExecutor(5) # 创建一个池子,池子里面有5个线程 def task(n): print(n,os.getpid()) time.sleep(2) return n**2 def call_back(n): print('拿到了结果:%s'%n.result()) """ 提交任务的方式 同步:提交任务之后,原地等待任务的返回结果,再继续执行下一步代码 异步:提交任务之后,不等待任务的返回结果(通过回调函数拿到返回结果并处理),直接执行下一步操作 """ if __name__ == '__main__': for i in range(20): future = pool.submit(task,i).add_done_callback(call_back) print('主')
三、协程
进程:资源单位
线程:执行单位
协程:单线程下实现并发(能够在多个任务之间切换和保存状态来节省IO),
这里注意区分操作系统的切换+保存状态是针对多个线程而言,
而我们现在是想在单个线程下自己手动实现操作系统的切换+保存状态的功能
注意:协程这个概念完全是程序员自己想出来的东西,它对于操作系统来说根本不存在
操作系统只知道进程和线程,并不是单个线程下实现切换+保存状态就能提升效率,
如果没有遇到io操作反而会降低效率
高并发:
多进程下开多线程,多线程下用协程
实现并发的手段:
yield能够实现保存上次运行状态,但是无法识别遇到io才切
gevent模块:
一个spawn就是一个帮你管理任务的对象
from gevent import monkey;monkey.patch_all() # 检测所有的io行为 from gevent import spawn,joinall # joinall列表里放多个对象,实现join效果 import time def play(name): print('%s play 1' % name) time.sleep(5) print('%s play 2' % name) def eat(name): print('%s eat 1' %name) time.sleep(3) print('%s eat 2' % name) start = time.time() g1 = spawn(play,'lucas') g2 = spawn(eat,'lucas') joinall([g1,g2]) print('主',time.time()-start)
四、协程实现服务端客户端通信
链接和通信都是io密集型操作,我们只需要在这两者之间来回切换其实就能实现并发的效果
服务端监测链接和通信任务,客户端起多线程同时链接服务端
服务端:
from gevent import monkey;monkey.patch_all() from gevent import spawn import socket def communicate(conn): while True: while True: try: data = conn.recv(1024) if len(data) == 0:break print(data) conn.send(data.upper()) except ConnectionResetError: break conn.close() def sever(): server = socket.socket() server.bind(('127.0.0.1',8080)) server.listen(5) while True: conn,addr = server.accept() spawn(communicate,conn) if __name__ == '__main__': s1 = spawn(sever) s1.join()
客户端:
from threading import Thread,current_thread import socket def client(): client = socket.socket() client.connect(('127.0.0.1',8080)) n = 1 while True: data = '%s %s' % (current_thread().name,n) n += 1 client.send(data.encode('utf-8')) info = client.recv(1024) print(info) if __name__ == '__main__': for i in range(500): t = Thread(target=client) t.start()
五、IO模型
阻塞IO
非阻塞IO(服务端通信针对accept用s.setblocking(False)加异常捕获,cpu占用率过高)
IO多路复用
异步IO

更多精彩