并发编程.md
操作系统基础
-
人机矛盾: CPU利用率低
-
磁带存储+批处理:降低数据的读取时间,提高CPU的利用率
SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。 -
多道操作系统------在一个任务遇到IO的时候主动让出CPU,给其他任务使用
- 由操作系统完成
- 切换要需要时间
多道技术: 1.产生背景:针对单核,实现并发 ps: 现在的主机一般是多核,那么每个核都会利用多道技术 有4个cpu,运行于cpu1的某个程序遇到io阻塞,会等到io结束再重新调度,会被调度到4个 cpu中的任意一个,具体由操作系统调度算法决定。 2.空间上的复用:如内存中同时有多道程序 3.时间上的复用:复用一个cpu的时间片 强调:遇到io切,占用cpu时间过长也切,核心在于切之前将进程的状态保存下来,这样 才能保证下次切换回来时,能基于上次切走的位置继续运行
-
分时操作系统-------给时间分片,让多个任务轮流使用CPU
- 短作业优先算法
- 先来先服务算法
每个程序分配一个时间片,轮转使用CPU,切换需要时间,降低CPU利用率,提高用户体验
-
通用操作系统-------分时操作系统 + 多道操作系统 + 实时操作系统
- 多个程序一起在计算机中执行
- 一个程序如果遇到IO操作,切出去让出CPU
- 一个程序没有遇到IO,但是时间片到时了,切出去让出CPU
-
操作系统负责什么?
调度进程先后执行的顺序 控制执行的时间等等
资源的分配
进程的概念
进程:
- 运行中的程序
- 是计算机中最小的资源分配单位
- 在操作系统中唯一标识符:PID
进程和程序的区别:
程序只是一个文件
进程是这个文件被CPU运行起来了
操作系统调度进程的算法:
- 短作业优先
- 先来先服务
- 时间片轮转
- 多级反馈算法
并行与并发:
- 并行:并行是指两者同时执行,比如赛跑,两个人都在不停的往前跑;(资源够用,比如三个线程,四核的CPU )
- 并发:并发是指资源有限的情况下,两者交替轮流使用资源,比如一段路(单核CPU资源)同时只能过一个人,A走一段后,让给B,B用完继续给A ,交替使用,目的是提高效率。
-
就绪(Ready)状态
当进程已分配到除CPU以外的所有必要的资源,只要获得处理机便可立即执行,这时的进程状态称为就绪状态。
-
执行/运行(Running)
状态当进程已获得处理机,其程序正在处理机上执行,此时的进程状态称为执行状态。
-
阻塞(Blocked)状态
正在执行的进程,由于等待某个事件发生而无法执行时,便放弃处理机而处于阻塞状态。引起进程阻塞的事件可有多种,例如,等待I/O完成、申请缓冲区不能满足、等待信件(信号)等。
同步异步:
所谓同步就是一个任务的完成需要依赖另外一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列。要么成功都成功,失败都失败,两个任务的状态可以保持一致。
所谓异步是不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了。至于被依赖的任务最终是否真正完成,依赖它的任务无法确定,所以它是不可靠的任务序列。
在python程序中的进程操作
进程:
# 创建进程 时间开销大
# 销毁进程 时间开销大
# 进程之间切换 时间开销大
线程:
线程是进程的一部分,每个进程中至少有一个线程
能被CPU调度的最小单位
一个进程中的多个线程是可以共享这个进程的数据的 —— 数据共享
线程的创建、销毁、切换 开销远远小于进程 —— 开销小
进程:是计算机中最小的资源分配单位(进程是负责圈资源)
线程:是计算机中能被CPU调度的最小单位 (线程是负责执行具体代码的)
os.getpid()
:获取当前进程pid
os.getppid()
:获取父级进程pid,可以创建子进程,在pycharm中启动的所有py程序都是pycharm的子进程
import os
import time
from multiprocessing import Process
# multiprocessing多进程模块Process类
def func():
print('start',os.getpid())
time.sleep(1)
print('end',os.getpid())
if __name__ == '__main__':
p = Process(target=func) # 将函数封装到类,创建一个要开启func进程的对象
p.start() # 异步 调用开启进程的方法 但是并不等待这个进程真的开启
print('main :',os.getpid())
#main : 11436
#start 9860
#end 9860
操作系统创建进程的方式不同
windows操作系统执行开启进程的代码
实际上新的子进程需要通过import父进程的代码来完成数据的导入工作
所以有一些内容我们只希望在父进程中完成,就写在if __name__ == '__main__':下面
ios linux操作系统创建进程 fork,拷贝的方式
-
主进程和子进程之间的关系
父进程会等待着所有的子进程结束之后才结束,为了回收资源
主进程代码执行完毕:
# 主进程负责回收子进程的资源
# 如果子进程执行结束,父进程没有回收资源,那么这个子进程会变成一个僵尸进程
# 主进程的结束逻辑
# 主进程的代码结束
# 所有的子进程结束
# 给子进程回收资源
# 主进程结束
# 主进程怎么知道子进程结束了的呢?
# 基于网络、文件
- join方法 :阻塞父进程,直到对应子进程结束就结束
import time
from multiprocessing import Process
def send_mail():
time.sleep(3)
print('发送了一封邮件')
if __name__ == '__main__':
p = Process(target=send_mail)
p.start() # 异步 非阻塞
# time.sleep(5)
print('join start')
p.join() # 同步 阻塞 直到p对应的进程结束之后才结束阻塞
print('5000封邮件已发送完毕')
#join start
#发送了一封邮件
#5000封邮件已发送完毕
import time
import random
from multiprocessing import Process
def send_mail(a):
time.sleep(random.random())
print('发送了一封邮件',a)
if __name__ == '__main__':
l = []
for i in range(10):
p = Process(target=send_mail,args=(i,))#向子进程传参数,用元组
p.start()
l.append(p) #回收多个子进程资源,先添指列表,最后统一处理
for p in l:p.join()
# 阻塞 直到上面的十个进程都结束
print('5000封邮件已发送完毕')
发送了一封邮件 5
发送了一封邮件 4
发送了一封邮件 3
......
5000封邮件已发送完毕
补充:Windows开启进程,由于创建机制,必须采用此方式.
print([__name__])
if __name__ == '__main__':
# 控制当这个py文件被当作脚本直接执行的时候,就执行这里面的代码
# 当这个py文件被当作模块导入的时候,就不执行这里面的代码
print('hello hello')
# __name__ == '__main__'
# 执行的文件就是__name__所在的文件
# __name__ == '文件名'
# __name__所在的文件被导入执行的时候
-
守护进程
随着主进程的代码结束而结束的,所有的子进程都必须在主进程结束之前结束,由主进程来负责回收资源
p.daemon = True
其他方法:
p.is_alive() 判断进程是否活着
p.terminate() # 可以解释异步非阻塞, 关闭需要时间,并不等到返回结束进程结果,会变僵尸
def son1():
while True:
print('is alive')
time.sleep(0.5)
if __name__ == '__main__':
p = Process(target=son1)
p.start() # 异步 非阻塞
print(p.is_alive())
time.sleep(1)
p.terminate() # 异步的 非阻塞
print(p.is_alive()) # 进程还活着 因为操作系统还没来得及关闭进程
time.sleep(0.01)
print(p.is_alive()) # 操作系统已经响应了我们要关闭进程的需求,再去检测的时候,得到的结果是进程已经结束了
使用面向对象方式开启进程
import os
import time
from multiprocessing import Process
class MyProcecss2(Process): #必须继承Process
def run(self): #必须要有run方法,重写process的run,start自动调用run
while True:
print('is alive')
time.sleep(0.5)
class MyProcecss1(Process):
def __init__(self,x,y): #传参数要定义init函数
self.x = x
self.y = y
super().__init__() #要导入父类的初始化参数
def run(self):
print(self.x,self.y,os.getpid())
for i in range(5):
print('in son2')
time.sleep(1)
if __name__ == '__main__':
mp = MyProcecss1(1,2)
mp.daemon = True
mp.start()
print(mp.is_alive())
mp.terminate()
# mp2 = MyProcecss2()
# mp2.start()
# print('main :',os.getpid())
# time.sleep(1)
Process操作进程的方法
# p.start() 开启进程 异步非阻塞
# p.terminate() 结束进程 异步非阻塞
# p.join() 同步阻塞
# p.isalive() 获取当前进程的状态
# daemon = True 设置为守护进程,守护进程永远在主进程的代码结束之后自动结束
锁
# 1.如果在一个并发的场景下,涉及到某部分内容
# 是需要修改一些所有进程共享数据资源
# 需要加锁来维护数据的安全
# 2.在数据安全的基础上,才考虑效率问题
# 3.同步存在的意义
# 数据的安全性
# 在主进程中实例化 lock = Lock()
# 把这把锁传递给子进程
# 在子进程中 对需要加锁的代码 进行 with lock:
# with lock相当于lock.acquire()和lock.release()
# 在进程中需要加锁的场景
# 共享的数据资源(文件、数据库)
# 对资源进行修改、删除操作
# 加锁之后能够保证数据的安全性 但是也降低了程序的执行效率
mport time
import json
from multiprocessing import Process,Lock
def search_ticket(user):
with open('ticket_count') as f:
dic = json.load(f)
print('%s查询结果 : %s张余票'%(user,dic['count']))
def buy_ticket(user,lock):
# with lock:
# lock.acquire() # 给这段代码加上一把锁
time.sleep(0.02)
with open('ticket_count') as f:
dic = json.load(f)
if dic['count'] > 0:
print('%s买到票了'%(user))
dic['count'] -= 1
else:
print('%s没买到票' % (user))
time.sleep(0.02)
with open('ticket_count','w') as f:
json.dump(dic,f)
# lock.release() # 给这段代码解锁
def task(user, lock):
search_ticket(user)
with lock:
buy_ticket(user, lock)
if __name__ == '__main__':
lock = Lock()
for i in range(10):
p = Process(target=task,args=('user%s'%i,lock))
p.start()
进程之间通信IPC
进程之间的通信 - IPC(inter process communication)
第三方:redis,memcache,kafka,rabbitmq
特点:并发需求,高可用,断电保存数据,解耦
from multiprocessing import Queue,Process
# 先进先出
def func(exp,q):
ret = eval(exp)
q.put({ret,2,3})
q.put(ret*2)
q.put(ret*4)
if __name__ == '__main__':
q = Queue()
Process(target=func,args=('1+2+3',q)).start()
print(q.get())
print(q.get())
print(q.get())
# Queue基于 天生就是数据安全的
# 文件家族的socket pickle lock
# pipe 管道(不安全的) = 文件家族的socket pickle
# 队列 = 管道 + 锁
# from multiprocessing import Pipe
# pip = Pipe()
# pip.send()
# pip.recv()
import queue
# from multiprocessing import Queue
# q = Queue(5)
# q.put(1)
# q.put(2)
# q.put(3)
# q.put(4)
# q.put(5) # 当队列为满的时候再向队列中放数据 队列会阻塞
# print('5555555')
# try:
# q.put_nowait(6) # 当队列为满的时候再向队列中放数据 会报错并且会丢失数据
# except queue.Full:
# pass
# print('6666666')
#
# print(q.get())
# print(q.get())
# print(q.get()) # 在队列为空的时候会发生阻塞
# print(q.get()) # 在队列为空的时候会发生阻塞
# print(q.get()) # 在队列为空的时候会发生阻塞
# try:
# print(q.get_nowait()) # 在队列为空的时候 直接报错
# except queue.Empty:pass
生产者消费者模型
第一种方式:
import time
import random
from multiprocessing import Process,Queue
def producer(q,name,food):
for i in range(10):
time.sleep(random.random())
fd = '%s%s'%(food,i)
q.put(fd)
print('%s生产了一个%s'%(name,food))
def consumer(q,name):
while True:
food = q.get()
if not food:break
time.sleep(random.randint(1,3))
print('%s吃了%s'%(name,food))
def cp(c_count,p_count):
q = Queue(10)
for i in range(c_count):
Process(target=consumer, args=(q, 'alex')).start()
p_l = []
for i in range(p_count):
p1 = Process(target=producer, args=(q, 'wusir', '泔水'))
p1.start()
p_l.append(p1)
for p in p_l:p.join()
for i in range(c_count):
q.put(None)
if __name__ == '__main__':
cp(2,3)
流程:消费者开启进程get,生产者开启进程put,加入队列,全部结束后(jion),队列put(None),消费者get到空终止
第二种方式:
import time
import random
from multiprocessing import JoinableQueue,Process
def producer(q,name,food):
for i in range(10):
time.sleep(random.random())
fd = '%s%s'%(food,i)
q.put(fd)
print('%s生产了一个%s'%(name,food))
q.join()
def consumer(q,name):
while True:
food = q.get()
time.sleep(random.random())
print('%s吃了%s'%(name,food))
q.task_done()
if __name__ == '__main__':
jq = JoinableQueue()
p =Process(target=producer,args=(jq,'wusir','泔水'))
p.start()
c = Process(target=consumer,args=(jq,'alex'))
c.daemon = True
c.start()
p.join()
JoinableQueue同样通过multiprocessing使用。
创建队列的另外一个类:
JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
参数介绍:
maxsize是队列中允许最大项数,省略则无大小限制。
方法介绍:
JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
