并发编程-线程
线程定义:顾名思义,就是一条流水线工作的过程,一条流水线属于一个车间,一个车间的工作过程就是一个进程。
进程:资源整合单位
线程:执行单位
补充:1 所有进程里面真正执行的是线程(进程里面都有默认线程)
2 进程知识用来把资源互相隔离开,而线程才是真正负责cpu来调动他的
线程与进程的区别:
1 创建线程比进程开销小(开一个进程,需要申请内存空间,而线程在进程里面,无需申请内存空 间)。
SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。 2 多线程一定是在一个进程里面开启的,共享进程里面资源
3 线程启动的速度快
4 同一个进程下的多个线程共享进程的资源,而多个进程之间内存空间是隔离的
5 线程可以跟它所在的进程之内的线程通信
为何要使用多线程:
多线程指的是在一个进程中开启多个线程,如果多个任务共用同一块地址空间,那么必须在一个进程内开启多个线程。
1 多线程共享一个进程的地址空间
2 线程比进程更轻量级,线程比进程更容易创建和撤销,在许多操作系统中创建一个线程比进程快10-100倍,在有大量线程需要动态和快速修改时,这一特性作用明显。
3 若多个线程都是cpu密集型的,那么并不能获得性能上的增强,但如果存在大量的计算和大量的io处理,拥有多个线程允许这些活动彼此重叠运行,从而会加快程序执行的速度。
4 在cpu系统中,为了最大限度的利用多核,可以开启多个线程,比开启进程开销小很多(这一条并不适用于python)
创建线程的两种方式:
开启线程的两种方式
一
from threading import Thread
import time
# 一
def task(name):
print('%s is running!'%name)
time.sleep(1)
print('%s is over!'%name)
if __name__ == '__main__':
t = Thread(target=task,args=('天上人间',))
t.start()
t.join()
print('落日黄昏!')
二
from threading import Thread
import time
class MyThread(Thread):
def __init__(self,name):
super().__init__()
self.name = name
def run(self):
print('%s is runing!'%self.name)
time.sleep(1)
print('%s is over!'%self.name)
if __name__ == '__main__':
t = MyThread('天上人间!')
t.start()
t.join()
print('人间清凉!')
线程中的互斥锁使用实例:
线程之间互斥锁的使用
from threading import Thread,Lock
import time
mutex = Lock()
n = 100
def run_a():
global n
mutex.acquire()
tem = n
time.sleep(0.1)
n = tem-1
mutex.release()
l1 = []
for i in range(100):
t = Thread(target=run_a)
t.start()
l1.append(t)
for i in l1:
i.join()
print(n)
守护线程:
守护线程与守护进程的区别
1.守护进程:主进程会等到所有的非守护进程结束,才销毁守护进程。也就是说(主进程运行完了被守护的那个就干掉了)
2.守护线程:主线程等非守护线程全都结束它才结束
守护线程
from multiprocessing import Process
from threading import Thread,currentThread
import time,os
def talk1():
time.sleep(1)
print('hello')
def talk2():
time.sleep(3)
print('you see see')
if __name__ == '__main__':
t1 = Thread(target=talk1)
t2 = Thread(target=talk2)
# t1 = Process(target=talk1)
# t2 = Process(target=talk2)
t1.daemon = True
t1.start()
t2.start()
print('主线程',os.getpid())
全局解释器锁GIL:
定义:在Cpython解释器中,同一个进程下开启的多线程,同一时刻只能有一个线程执行,无法利用多核又是
1.python GIL ==》全局的解释器锁
2.锁的目的:牺牲了效率,保证了数据的安全
- 保护不同的数据加不同的锁
- python自带垃圾回收机制
- 谁拿到GIL锁谁就获得Cpython解释器的执行权限
- GIL锁保护的是Cpython解释器数据的安全,而不会保护你自己程序的数据的安全
- GIL锁当遇到阻塞的时候,就被迫的把锁给释放了,那么其它的就开始抢锁了,抢到后把值修改了,但是第一个拿到的还在原本拿到的那个数据那停留着呢,当再次拿到锁的时候数据已经修改了,而你还拿原来的数据,这就导致了一个数据的混乱,所以也就保证不了数据的安全了,需要自己在给数据加把锁 :mutex = Lock()。
同步锁:GIL 与 lock 是两把锁,保护的数据不一样,前者是解释器级别的(保护的就是解释器级别的数据,比如垃圾回收机制),后者是保护用户自己开发的应用程序的数据,很明显GIL不负责这件事,只能用户自定义加锁处理,即Lock。
补充了解:因为python解释器会定期帮你进行内存回收,你可以理解为python解释器里面有一个独立的线程,每过一段时间它会扫描一遍全局看看哪些内存是可以被清空的,此时你自己程序里的线程和py解释器自己的线程是并发运行的,假设你的线程删除了一个变量,py解释器的垃圾回收机制在清空这个变量的过程中,可一个其他的线程正好又重新给这个还没来得及清空的内存空间赋值了,结果就有可能新赋值的数据被删除了,为了解决类似的为你,py解释器简单粗暴的加了锁,即当一个线程运行时,其他人都不能动,这样就解决了上述问题。
多线程应用场景性能测试:
1.多核也就是多个CPU
1.cpu越多,提高的是计算的性能
2.如果程序是IO操作的时候(多核和单核是一样的),再多的Cpu也没有意义。
2.实现并发:
1.一个进程下,开多个线程
2.开多个进程
3.多进程:
优点:可以利用多核
缺点:开销大
4.多线程:
优点:开销小
缺点:不可以利用多核
5.多进程和多线程的应用场景:
1.计算密集型:也就是计算多,IO少
如果是计算密集型,就用多进程(例如金融分析)
2.IO密集型:也就是IO多,计算少
如果是IO密集型,就用多线程(一般遇到的都是IO密集型)
示例:
# 计算密集型
from multiprocessing import Process
from threading import Thread
import os,time
def work():
res=0
for i in range(100000000):
res*=i
if __name__ == '__main__':
l=[]
print(os.cpu_count()) # 本机为12核
start=time.time()
for i in range(12):
# p=Process(target=work) #耗时8s多
p=Thread(target=work) #耗时44s多
l.append(p)
p.start()
for p in l:
p.join()
stop=time.time()
print('run time is %s' %(stop-start))
# IO密集型
from multiprocessing import Process
from threading import Thread
import threading
import os,time
def work():
time.sleep(2)
if __name__ == '__main__':
l=[]
print(os.cpu_count()) #本机为12核
start=time.time()
for i in range(400):
p=Process(target=work) #耗时12s多,大部分时间耗费在创建进程上
# p=Thread(target=work) #耗时2s多
l.append(p)
p.start()
for p in l:
p.join()
stop=time.time()
print('run time is %s' %(stop-start))
死锁与递归锁:
进程产生死锁现象,所谓死锁是指两个或两个以上的进程在执行过程中因争夺资源而造成的一种互相等待的现象,若无外力作用他们都将无法推进下去,此时称系统处于死锁状态或系统系统产生了死锁,这些永远在互相等待的进程称为死锁进程。
解决方法:递归锁
在py中为了支持在同一线程中多次请求统一资源,py提供了可重入锁RLock
这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。
直到一个线程所有的acquire都被release,其他的线程才能获得资源
示例:
from threading import Thread,Lock,RLock
import time
# mutexA=Lock()
# mutexB=Lock()
mutexB=mutexA=RLock() # 使用递归锁
class Mythead(Thread):
def run(self):
self.f1()
self.f2()
def f1(self):
mutexA.acquire()
print('%s 抢到A锁' %self.name)
mutexB.acquire()
print('%s 抢到B锁' %self.name)
mutexB.release()
mutexA.release()
def f2(self):
mutexB.acquire()
print('%s 抢到了B锁' %self.name)
time.sleep(2)
mutexA.acquire()
print('%s 抢到了A锁' %self.name)
mutexA.release()
mutexB.release()
if __name__ == '__main__':
for i in range(100):
t=Mythead()
t.start()
Event事件:
线程的一个关键特性是每个线程都是独立运行且状态不可预估。如果程序中的其它线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手,为了解决这些问题,我们需要使用threding库中的Event对象。对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生,在初始情况,Event对象中的信号i安置被设置为假。
如果有一个线程等待一个Event对象,而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象额信号标志设置为真,他将唤醒所有等待这个Even对象的线程。如果一个线程等待一个已经被设置为真Event对象,那么它会忽略这个事件,继续执行。
示例:
from threading import Thread,Event
import time
event = Event() # 造了一个红绿灯
def light():
print('红灯亮着的')
time.sleep(3)
print('绿灯亮了')
event.set()
def car(name):
print('%s 车正在等红灯'%name)
event.wait()
print('%s 车加油门飙车走了'%name)
if __name__ == '__main__':
t = Thread(target=light)
t.start()
for i in range(10):
t = Thread(target=car,args=('%s'%i,))
t.start()
线程queue:
queue对列:使用import queue,用法与进程Queue一样
queue.Queue 先进先出:
import queue
queue.Queue() #先进先出
q=queue.Queue(3)
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
queue.LifoQueue() 后进先出->堆栈:
queue.LifoQueue() #后进先出->堆栈
q=queue.LifoQueue(3)
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
queue.PriorityQueue() 优先级
q=queue.PriorityQueue(3) #优先级,优先级用数字表示,数字越小优先级越高
q.put((10,'a'))
q.put((-1,'b'))
q.put((100,'c'))
print(q.get())
print(q.get())
print(q.get())
信号量Semaphore(其实也是一把锁):
Semaphone管理一个内置的计数器
Semaphore与进程池看起来类似,但是是完全不同的概念
进程池:Pool(4),最大只能产生四个进程,而且从头到尾都只是这四个进程,不会产生新的。
信号量:信号量是产生的一堆进程/线程,即产生了多个任务都去抢那一把锁
补充:自定义的互斥锁如果是一个厕所,那么信号量就相当于公共厕所,门口挂着多个厕所的钥匙。抢和释放跟互斥锁一致
from threading import Thread,Semaphore
import time
import random
sm = Semaphore(5) # 公共厕所里面有五个坑位,在厕所外面放了五把钥匙
def task(name):
sm.acquire()
print('%s正在蹲坑'%name)
# 模拟蹲坑耗时
time.sleep(random.randint(1,5))
sm.release()
if __name__ == '__main__':
for i in range(20):
t = Thread(target=task,args=('伞兵%s号'%i,))
t.start()
