2019-04-17-day034-线程与数据共享
内容回顾
锁
- 互斥锁
- 能够保护数据的安全性
- 保证对于数据的修改操作同一时刻多个进程只有一个进程执行
- 进程数据不安全 : 同时修改文件/数据库/其他共享资源的数据
###队列 -- 实现了进程之间的通信(IPC) - 进程队列 -- 进程安全
- 从multiprocessing导入Queue
- q = Queue()
- put/get/put_nowait(丢数据)/get_nowait
- 基于 管道 + 锁,管道进程不安全
- 管道 基于文件级别的socket实现的
import queue
from multiprocessing import Queue
q = Queue(5)
q.put(1)
q.put(1)
q.put(1)
q.put(1)
q.put(1)
print('________')
try:
q.put_nowait(1)
except queue.Full:
pass
print('********')
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
try:
print(q.get_nowait())
except queue.Empty:
pass
生产者消费者模型
- 一个生产数据和消费数据的完整的流程解耦成为两个部分 : 生产 和 消费
- 由于生产速度和消费速度不一致,所以需要我们调整生产者和消费者的个数来达到效率平衡
互斥锁
from multiprocessing import Lock
# 互斥锁:在同一个进程内,也有锁的竞争关系
# 在同一个进程中连续acquire多次会产生死锁
lock = Lock()
lock.acquire() * 拿走钥匙
print(123)
lock.acquire() * 又想要钥匙 卡住
print(456)
lock.release()
lock.release()
数据共享
from multiprocessing import Process,Manager,Lock
def func(dic,lock):
with lock:
dic['count'] -= 1
if __name__ == '__main__':
lock = Lock()
m = Manager()
dic = m.dict({'count':100})
p_l = []
for i in range(100):
p = Process(target=func,args=(dic,lock))
p.start()
p_l.append(p)
for p in p_l:p.join()
print(dic)
- 100减100次1这么慢? 不是减操作造成的 而是开启进程 管理进程 销毁进程拖慢了程序的执行速度
- 为什么在这里出现了数据不安全的现象?
- 什么情况下会出现数据不安全 : Manager类当中对字典\列表 += -= *= /=
- 如何解决 : 加锁
线程
- 线程是进程中的一个单位
- 进程是计算机中最小的资源分配单位
- 线程是计算机中被CPU调度的最小单位
开启进程 关闭进程 切换进程都需要时间
- 你的电脑的资源还是有限的
- 开启过多的进程会导致你的计算机崩溃
- count_cpu*2 = 8
- 同时对8个客户端服务
- qps 每秒的请求数 2w
- 2000/8 = 250台机器
数据隔离的
- ftp server端
- qq server端
###并发 :同一时刻能同时接收多个客户端的请求
###线程 - 轻型进程 轻量级的进程
- 在同一个进程中的多个线程是可以共享一部分数据的
- 线程的开启\销毁\切换都比进程要高效很多
多核利用
- 多个进程可不可以利用多核(多个CPU) 可以
- 多个线程可不可以利用多核(多个CPU) 可以
多进程和多线程之间的区别
- 进程 数据隔离 开销大
- 线程 数据共享 开销小
python当中的多线程
- 不能访问多个cpu
- 是Cpython解释器导致的,GIL锁
- GIL锁 = 全局解释器锁,导致了同一时刻只能有一个线程访问CPU
- jpython pypy解释器中的多线程是可以访问多核的
from dis import dis
def func():
a =[]
a.append(1)
dis(func)
python --> 字节码 --> 机器码
python代码机器语言
利用多核 意味着 多个CPU可以同时计算线程中的代码
IO操作
- accept负责从网络上读取数据
- open() 调用操作系统的指令开启一个文件
分布式\多进程
web框架
- django flask tornado twistwed scrapy sanic
threading模块
import threading
threading 和 multiprocessing
先有的threading模块
- 没有池的功能
- multiprocessing完全模仿threading模块完成的
- 实现了池的功能
- concurrent.futures
- 实现了线程池\进程池
import os
import time
from threading import Thread
def func():
time.sleep(1)
print('in func',os.getpid())
print('in main',os.getpid())
for i in range(20):
* func()
Thread(target=func).start()
传参数
import os
import time
from threading import Thread
def func(i):
time.sleep(1)
print('in func',i,os.getpid())
print('in main',os.getpid())
for i in range(20):
* func()
Thread(target=func,args=(i,)).start()
if name == '__main__'在开启线程的时候可以不加
- 在线程部分不需要通过import来为新的线程获取代码
- 因为新的线程和之前的主线程共享同一段代码
- 不需要import 也就不存在在子线程中又重复了一次创建线程的操作
- 所以就不必要if name == '__main__'
开销
import time
from multiprocessing import Process
from threading import Thread
def func(a):
a += 1
if __name__ == '__main__':
start = time.time()
t_l = []
for i in range(100):
t = Thread(target=func,args=(i,))
t.start()
t_l.append(t)
for t in t_l:t.join()
print('thread :',time.time() - start)
start = time.time()
t_l = []
for i in range(100):
t = Process(target=func, args=(i,))
t.start()
t_l.append(t)
for t in t_l: t.join()
print('Process',time.time() - start)
多个线程之间的全局变量是共享的
from threading import Thread
tn = 0
def func():
global tn
tn += 1
t_l = []
for i in range(100):
t = Thread(target=func)
t.start()
t_l.append(t)
for t in t_l:t.join()
print(tn)
进程之间数据隔离
from multiprocessing import Process
pn = 0
def func():
global pn
pn += 1
if __name__ == '__main__':
p_l = []
for i in range(100):
p = Process(target=func)
p.start()
p_l.append(p)
for p in p_l:p.join()
print(pn)
线程中的几个其他方法
import os
from threading import Thread,currentThread
def func():
t = currentThread()
print(t.name,t.ident,os.getpid())
tobj = Thread(target=func)
tobj.start()
print('tobj :',tobj)
t2 = currentThread()
print(t2.name,t2.ident,os.getpid())
lst = [1,2,3,4,5,6,7,8,9,10]
按照顺序把列表中的每一个元素都计算一个平方
使用多线程的方式
并且将结果按照顺序返回
import time
import random
from threading import Thread,currentThread
dic = {}
def func(i):
t = currentThread()
time.sleep(random.random())
dic[t.ident] = i**2
t_lst = []
for i in range(1,11):
t = Thread(target=func,args=(i,))
t.start()
t_lst.append(t)
for t in t_lst:
t.join()
print(dic[t.ident])
from threading import active_count # 返回当前有多少个正在工作的线程
import time
import random
from threading import Thread,currentThread
dic = {}
def func(i):
t = currentThread()
time.sleep(random.random())
dic[t.ident] = i**2
for i in range(10):
Thread(target=func,args=(i,)).start()
print(active_count()) * ???
线程有terminate么?
- 没有terminate 不能强制结束
- 所有的子线程都会在执行完所有的任务之后自动结束
作业
- 通读博客 :操作系统 进程 线程
- 把课上的代码都搞明白
- 多线程实现一个并发的tcp协议的socket server
- 明天默写和线程

更多精彩