内容回顾

  • 互斥锁
  • 能够保护数据的安全性
    • 保证对于数据的修改操作同一时刻多个进程只有一个进程执行
  • 进程数据不安全 : 同时修改文件/数据库/其他共享资源的数据
    ###队列 -- 实现了进程之间的通信(IPC)
  • 进程队列 -- 进程安全
    • 从multiprocessing导入Queue
    • q = Queue()
    • put/get/put_nowait(丢数据)/get_nowait
  • 基于 管道 + 锁,管道进程不安全
  • 管道 基于文件级别的socket实现的
import queue
from multiprocessing import Queue

q = Queue(5)
q.put(1)
q.put(1)
q.put(1)
q.put(1)
q.put(1)
print('________')
try:
    q.put_nowait(1)
except queue.Full:
    pass
print('********')
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
try:
    print(q.get_nowait())
except queue.Empty:
    pass

生产者消费者模型

  • 一个生产数据和消费数据的完整的流程解耦成为两个部分 : 生产 和 消费
  • 由于生产速度和消费速度不一致,所以需要我们调整生产者和消费者的个数来达到效率平衡

互斥锁

from multiprocessing import Lock
# 互斥锁:在同一个进程内,也有锁的竞争关系
# 在同一个进程中连续acquire多次会产生死锁
lock = Lock()
lock.acquire()    * 拿走钥匙
print(123)
lock.acquire()   * 又想要钥匙 卡住
print(456)
lock.release()
lock.release()

数据共享

from multiprocessing import Process,Manager,Lock

def func(dic,lock):
   with lock:
       dic['count'] -= 1

if __name__ == '__main__':
   lock = Lock()
   m = Manager()
   dic = m.dict({'count':100})
   p_l = []
   for i in range(100):
       p = Process(target=func,args=(dic,lock))
       p.start()
       p_l.append(p)
   for p in p_l:p.join()
   print(dic)
  • 100减100次1这么慢? 不是减操作造成的 而是开启进程 管理进程 销毁进程拖慢了程序的执行速度
  • 为什么在这里出现了数据不安全的现象?
  • 什么情况下会出现数据不安全 : Manager类当中对字典\列表 += -= *= /=
  • 如何解决 : 加锁

线程

  1. 线程是进程中的一个单位
  2. 进程是计算机中最小的资源分配单位
  3. 线程是计算机中被CPU调度的最小单位

开启进程 关闭进程 切换进程都需要时间

  • 你的电脑的资源还是有限的
  • 开启过多的进程会导致你的计算机崩溃
  • count_cpu*2 = 8
  • 同时对8个客户端服务
  • qps 每秒的请求数 2w
  • 2000/8 = 250台机器

数据隔离的

  • ftp server端
  • qq server端
    ###并发 :同一时刻能同时接收多个客户端的请求
    ###线程
  • 轻型进程 轻量级的进程
  • 在同一个进程中的多个线程是可以共享一部分数据的
  • 线程的开启\销毁\切换都比进程要高效很多

多核利用

  1. 多个进程可不可以利用多核(多个CPU) 可以
  2. 多个线程可不可以利用多核(多个CPU) 可以

多进程和多线程之间的区别

  • 进程 数据隔离 开销大
  • 线程 数据共享 开销小

python当中的多线程

  • 不能访问多个cpu
  • 是Cpython解释器导致的,GIL锁
  • GIL锁 = 全局解释器锁,导致了同一时刻只能有一个线程访问CPU
  • jpython pypy解释器中的多线程是可以访问多核的
from dis import dis
def func():
    a =[]
    a.append(1)

dis(func)

python --> 字节码 --> 机器码
python代码机器语言

利用多核 意味着 多个CPU可以同时计算线程中的代码

IO操作

  • accept负责从网络上读取数据
  • open() 调用操作系统的指令开启一个文件

分布式\多进程

web框架

  • django flask tornado twistwed scrapy sanic

threading模块

import threading

threading 和 multiprocessing

先有的threading模块
  • 没有池的功能
  • multiprocessing完全模仿threading模块完成的
    • 实现了池的功能
  • concurrent.futures
    • 实现了线程池\进程池
import os
import time
from threading import Thread

def func():
    time.sleep(1)
    print('in func',os.getpid())

print('in main',os.getpid())
for i in range(20):
    * func()
    Thread(target=func).start()

传参数

import os
import time
from threading import Thread

def func(i):
    time.sleep(1)
    print('in func',i,os.getpid())

print('in main',os.getpid())
for i in range(20):
    * func()
    Thread(target=func,args=(i,)).start()

if name == '__main__'在开启线程的时候可以不加

  • 在线程部分不需要通过import来为新的线程获取代码
  • 因为新的线程和之前的主线程共享同一段代码
  • 不需要import 也就不存在在子线程中又重复了一次创建线程的操作
  • 所以就不必要if name == '__main__'

开销

import time
from multiprocessing import Process
from threading import Thread

def func(a):
    a += 1

if __name__ == '__main__':
    start = time.time()
    t_l = []
    for i in range(100):
        t = Thread(target=func,args=(i,))
        t.start()
        t_l.append(t)
    for t in t_l:t.join()
    print('thread :',time.time() - start)

    start = time.time()
    t_l = []
    for i in range(100):
        t = Process(target=func, args=(i,))
        t.start()
        t_l.append(t)
    for t in t_l: t.join()
    print('Process',time.time() - start)

多个线程之间的全局变量是共享的

from threading import Thread
tn = 0
def func():
    global tn
    tn += 1
t_l = []
for i in range(100):
    t = Thread(target=func)
    t.start()
    t_l.append(t)
for t in t_l:t.join()
print(tn)

进程之间数据隔离

from multiprocessing import Process
pn = 0
def func():
    global pn
    pn += 1
if __name__ == '__main__':
    p_l = []
    for i in range(100):
        p = Process(target=func)
        p.start()
        p_l.append(p)
    for p in p_l:p.join()
    print(pn)

线程中的几个其他方法

import os
from threading import Thread,currentThread
def func():
    t = currentThread()
    print(t.name,t.ident,os.getpid())

tobj = Thread(target=func)
tobj.start()
print('tobj :',tobj)
t2 = currentThread()
print(t2.name,t2.ident,os.getpid())


lst = [1,2,3,4,5,6,7,8,9,10]
按照顺序把列表中的每一个元素都计算一个平方
使用多线程的方式
并且将结果按照顺序返回
import time
import random
from threading import Thread,currentThread
dic = {}
def func(i):
    t = currentThread()
    time.sleep(random.random())
    dic[t.ident] = i**2
t_lst = []
for i in range(1,11):
    t = Thread(target=func,args=(i,))
    t.start()
    t_lst.append(t)
for t in t_lst:
    t.join()
    print(dic[t.ident])
from threading import active_count   # 返回当前有多少个正在工作的线程
import time
import random
from threading import Thread,currentThread
dic = {}
def func(i):
    t = currentThread()
    time.sleep(random.random())
    dic[t.ident] = i**2

for i in range(10):
    Thread(target=func,args=(i,)).start()
print(active_count())    * ???

线程有terminate么?

  • 没有terminate 不能强制结束
  • 所有的子线程都会在执行完所有的任务之后自动结束

作业

  1. 通读博客 :操作系统 进程 线程
  2. 把课上的代码都搞明白
  3. 多线程实现一个并发的tcp协议的socket server
  4. 明天默写和线程
扫码关注我们
微信号:SRE实战
拒绝背锅 运筹帷幄