进程池与线程池:

1.socket服务端实现并发

服务端需满足需求:

​ 固定的ip 和 port

SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。

​ 24小时提供服务

​ 能够实现并发

示例:

客户端

import socket
client = socket.socket()
client.connect(('127.0.0.1',8081))

while True:
    msg = input('>>>:').encode('utf-8')
    if len(msg) == 0:continue
    client.send(msg)
    data = client.recv(1024)
    print(data.decode('utf-8'))

服务端

mport socket
from threading import Thread
server = socket.socket()
server.bind(('127.0.0.1', 8081))
server.listen(5)
def coummniucate(conn):
    while True:
        try:
            data =conn.recv(1024)
            if len(data) == 0:break
            conn.send(data.upper())
        except ConnectionResetError:
            break
    conn.close()
while True:
    conn, addr = server.accept()
    print(addr)
    t = Thread(target=coummniucate,args=(conn,))
    t.start()

2.进程池与线程池介绍:

线程进程不可能无限制的开下去,总要消耗和占用资源

进程池线程池概念:硬件有极限,为了减轻硬件压力,所有有了池的概念。

concurent.future模块需要了解的
1.concurent.future模块是用来创建并行的任务,提供了更高级别的接口,
为了异步执行调用
2.concurent.future这个模块用起来非常方便,它的接口也封装的非常简单
3.concurent.future模块既可以实现进程池,也可以实现线程池
4.模块导入进程池和线程池
from  concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
还可以导入一个Executor,但是你别这样导,这个类是一个抽象类
抽象类的目的是规范他的子类必须有某种方法(并且抽象类的方法必须实现),但是抽象类不能被实例化
5.
  p = ProcessPoolExecutor(max_works)对于进程池如果不写max_works:默认的是cpu的数目,默认是4个
  p = ThreadPoolExecutor(max_works)对于线程池如果不写max_works:默认的是cpu的数目*5
6.如果是进程池,得到的结果如果是一个对象。我们得用一个.get()方法得到结果
  但是现在用了concurent.future模块,我们可以用obj.result方法
  p.submit(task,i)  #相当于apply_async异步方法
  p.shutdown()  #默认有个参数wite=True (相当于close和join)
concurrent.futures模块导入
线程池创建(线程数=cpu核数*5左右)
submit提交任务(提交任务的两种方式)
异步提交的submit返回值对象
shutdown关闭池并等待所有任务运行结束
对象获取任务返回值
进程池的使用,验证进程池在创建的时候里面固定有指定的进 程数
异步提交回调函数的使用

线程池:

进程池:就是在一个进程内控制一定个数的线程

基于concurent.futures模块的进程池和线程池(他们的同步执行和异步执行是一样)

创建线程池:
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time
import os
# 示例化池对象
# 不知道参数的情况,默认是当前计算机cpu个数乘以5,也可以指定线程个数
pool = ProcessPoolExecutor(5)  # 创建了一个池子,池子里面有20个线程

def task(n):
    print(n,os.getpid())
    time.sleep(2)
    return n**2
def call_back(n):
    print('我拿到了结果:%s'%n.result())
"""
提交任务的方式
    同步:提交任务之后,原地等待任务的返回结果,再继续执行下一步代码
    异步:提交任务之后,不等待任务的返回结果(通过回调函数拿到返回结果并处理),直接执行下一步操作
"""


# 回调函数:异步提交之后一旦任务有返回结果,自动交给另外一个去执行
if __name__ == '__main__':
    # pool.submit(task,1)
    t_list = []
    for i in range(20):
        future = pool.submit(task,i).add_done_callback(call_back)  # 异步提交任务
        t_list.append(future)

    # pool.shutdown()  # 关闭池子并且等待池子中所有的任务运行完毕
    # for p in t_list:
    #     print('>>>:',p.result())
    print('主')

1.协程

  • 进程:资源单位
  • 线程:执行单位
  • 协程:单线程下实现并发(能够在多个任务之间切换和保存状态来节省IO),这里注意区分操作系统的切换+保存状态是针对多个线程而言,而我们现在是想在单个线程下自己手动实现操作系统的切换+保存状态的功能

注意协程这个概念完全是程序员自己想出来的东西,它对于操作系统来说根本不存在。操作系统只知道进程和线程。并且需要注意的是并不是单个线程下实现切换+保存状态就能提升效率,因为你可能是没有遇到io也切,那反而会降低效率

再回过头来想上面的socket服务端实现并发的例子,单个线程服务端在建立连接的时候无法去干通信的活,在干通信的时候也无法去干连接的活。这两者肯定都会有IO,如果能够实现通信io了我就去干建连接,建连接io了我就去干通信,那其实我们就可以实现单线程下实现并发

将单个线程的效率提升到最高,多进程下开多线程,多线程下用协程>>> 实现高并发!!!

三者都是实现并发的手段

# yield能够实现保存上次运行状态,但是无法识别遇到io才切换

#串行执行
import time

def func1():
    for i in range(10000000):
        i+1

def func2():
    for i in range(10000000):
        i+1

start = time.time()
func1()
func2()
stop = time.time()
print(stop - start)


#基于yield并发执行
import time
def func1():
    while True:
        10000000+1
        yield

def func2():
    g=func1()
    for i in range(10000000):
        # time.sleep(100)  # 模拟IO,yield并不会捕捉到并自动切换
        i+1
        next(g)

start=time.time()
func2()
stop=time.time()
print(stop-start)

yield并不能帮我们自动捕获到io行为才切换,那什么模块可以呢?

3.gevent模块

一个spawn就是一个帮你管理任务的对象

gevent模块不能识别它本身以外的所有的IO行为,但是它内部封装了一个模块,能够帮助我们识别所有的IO行为

from gevent import monkey;monkey.patch_all()  # 检测所有的IO行为
from gevent import spawn,joinall  # joinall列表里面放多个对象,实现join效果
import time

def play(name):
    print('%s play 1' %name)
    time.sleep(5)
    print('%s play 2' %name)

def eat(name):
    print('%s eat 1' %name)
    time.sleep(3)
    print('%s eat 2' %name)


start=time.time()
g1=spawn(play,'刘清正')
g2=spawn(eat,'刘清正')

# g1.join()
# g2.join()
joinall([g1,g2])
print('主',time.time()-start)  # 单线程下实现并发,提升效率

4.协程实现服务端客户端通信

链接和通信都是io密集型操作,我们只需要在这两者之间来回切换其实就能实现并发的效果

服务端监测链接和通信任务,客户端起多线程同时链接服务端

# 服务端
from gevent import monkey;monkey.patch_all()
from socket import *
from gevent import spawn


def communicate(conn):
    while True:
        try:
            data = conn.recv(1024)
            if len(data) == 0: break
            conn.send(data.upper())
        except ConnectionResetError:
            break
    conn.close()
    

def server(ip, port, backlog=5):
    server = socket(AF_INET, SOCK_STREAM)
    server.bind((ip, port))
    server.listen(backlog)

    while True:  # 链接循环
        conn, client_addr = server.accept()
        print(client_addr)

        # 通信
        spawn(comunicate,conn)


if __name__ == '__main__':
    g1=spawn(server,'127.0.0.1',8080)
    g1.join()

    
# 客户端
from threading import Thread, current_thread
from socket import *


def client():
    client = socket(AF_INET, SOCK_STREAM)
    client.connect(('127.0.0.1', 8080))

    n = 0
    while True:
        msg = '%s say hello %s' % (current_thread().name, n)
        n += 1
        client.send(msg.encode('utf-8'))
        data = client.recv(1024)
        print(data.decode('utf-8'))


if __name__ == '__main__':
    for i in range(500):
        t = Thread(target=client)
        t.start()
# 原本服务端需要开启500个线程才能跟500个客户端通信,现在只需要一个线程就可以扛住500客户端
# 进程下面开多个线程,线程下面再开多个协程,最大化提升软件运行效率

IO模型

  • 阻塞IO

  • 非阻塞IO(服务端通信针对accept用s.setblocking(False)加异常捕获,cpu占用率过高)

  • IO多路复用

    在只检测一个套接字的情况下,他的效率连阻塞IO都比不上。因为select这个中间人增加了环节。

    但是在检测多个套接字的情况下,就能省去wait for data过程

  • 异步IO

扫码关注我们
微信号:SRE实战
拒绝背锅 运筹帷幄