AQS:是AbstractQueuedSynchronizer的简称,JUC的核心

  底层是sync queue双向链表,还可能有condition queue单向链表,使用Node实现FIFO队列,可以用于构建同步队列或者其他同步装置的基础框架

SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。

  利用了int类型表示状态,在AQS中有个state的成员变量,基于AQS的ReentrantLock,state表示获取锁的线程数,等于0,没有,1有,大于1表示重入锁的数量。

基于模板方法,需要继承AQS,重写某些方法

可以实现排它锁和共享锁的模式(独占Reentrantlock、共享countdownlatch,同时实现一种)

  并发容器J.U.C 随笔

 

实现思路:

  首先AQS中维护了一个queue来管理锁,线程会尝试获取锁,如果失败,就将当前线程以及等待状态等信息封装成一个node节点,加入到sync queue的tail,

head node的线程释放锁的时候,会唤醒队列中的后继线程,而后续节点在获取锁成功的时候把自己设置为首节点

  就是因为这些设计,jdk有很多基于AQS的设计,一些常用的组件:

  countdownlatch、semaphore、CyclicBarrier、Reentrantlock、Condition、Futuretask等


同步组件:

CountDownLatch(闭锁):

  可以实现阻塞当前的线程,通过一个计数器进行初始化,这个计数器都是进行原子操作,只能同时有个线程操作这个计数器,调用CountDownLatch的await()

会处于阻塞状态,其他线程调用Countdown(),每次减一,直到计数器变成零

  这时候所有因为调用await()阻塞的线程才能继续往下执行,CountDownLatch只能执行一次,不能重置,想要使用重置的计数器,可以使用


CyclicBarrier

  await()需要等到countdown()将计数器减到0,才会执行后续的代码。await()可以有时间参数,选择等待多长时间过后就会执行await后续的代码。

countDown()尽量卸载finally内部

countDownLatch.await();

countDownLatch.await(10, TimeUnit.MILLISECONDS);

 

使用场景:

Semaphore:信号量

  控制并发访问的个数,用于只能提供有限访问的资源
  

semaphore.acquire(3); // 获取多个许可
  test(threadNum);
  semaphore.release(3); // 释放多个许可

  Semaphore semaphore = new Semaphore(2);允许线程数一定要大于等于acquire和release的个数

  semaphore.tryAcquire()尝试获取许可,没有获取许可的线程都会丢弃
  semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)在5000ms中尝试获取许可

 

 

CyclicBarrier

  用于多线程计算数据,最后合并计算结果,例如Excel很多页流水,通过多线程计算每一页流水,最后计算总的

通过调用await()方法,线程进入等待状态,计数器进行加一操作,当值等于设置的初始值时,所有阻塞的线程继续执行

private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
    log.info("callback is running");
});

通过使用lambda,当计数器满足条件优先执行lambda表达式里面的代码

 

CyclicBarrier和CountDownLatch区别

  CountDownLatch只能使用一次,而CyclicBarrier可以循环利用,使用reset进行重置

  CountDownLatch描述:1或N个线程需要等待其他线程完成某个操作,才能继续往下执行

  CyclicBarrier:多个线程之间相互等待,知道所有线程都满足某个条件才能继续执行后续操作,是各个线程直接相互等待的操作

countdown表现:    CountDownLatch表现:

1 is ready        1 is ready
2 is ready        2 is ready
3 is ready        3 is ready
1 continue        1 continue
2 continue        2 continue
3 continue        3 continue
4 is ready        4 is ready
4 continue        5 is ready
5 is ready        6 is ready
5 continue        4 continue
6 is ready        5 continue
6 continue        6 continue

 

LOCK

ReadWriteLock

ReentrantReadWriteLock:

  支持多线程读,如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁。

如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,则申请的线程会一直等待释放写锁。

 

一个是读操作相关的锁,称为共享锁;一个是写相关的锁,称为排他锁

一个线程要想同时持有写锁和读锁,必须先获取写锁再获取读锁;写锁可以“降级”为读锁;读锁不能“升级”为写锁。

读读共享、其他都是互斥

 

ReentrantLock:

  注意不要把lock的实例化做成局部变量,每个线程执行该方法时都会保存一个副本,那么理所当然每个线程执行到lock.lock()处获取的是不同的锁,

这样lock就不能起作用了。

 

1、synchronized:可重入性,jvm实现,在之前和ReentrantLock性能差别很大,但是引入了偏向锁、轻量级锁,效率已经相差不大,只能使用非公平锁,

可以通过一些工具进行监控,jvm自动做加锁、解锁操作

2、ReentrantLock:可重入性,jdk实现,粒度更小,可以指定公平锁(先等待的线程先获得锁)或非公平锁,提供一个condition类,

可以实现分组唤醒需要唤醒的线程,而synchronized关键字要么唤醒一个线程,要么全部线程,可以通过lockInterruptibly()中断等待锁的线程机制,

一定要记得在finally释放锁

 

3、StampLock:对吞吐量有很大的改进,性能上有很大的提升,特别是适合读操作比较多的情况

 

ReentrantLock、ReentrantReadWriteLock、StampLock等lock都是对象层面的锁定

 

锁使用原则:

  当只有少量线程竞争的时候,可以使用synchronized,而且不会引发死锁

  线程竞争不少,线程增长能够预估,可以选择ReentrantLock

 

可重入锁:

  synchronized和ReentrantLock都是可重入锁

锁的分配机制是基于线程的分配,而不是基于方法的分配,在method1中已经获取了对象锁,在方法内部调用method2不用重新获取锁。

 

可中断锁:

  synchronized就不是可中断锁,而Lock是可中断锁。

  lockInterruptibly()的用法时已经体现了Lock的可中断性。

 

公平锁:

  公平锁是指当一个锁被释放的时候,等待时间最长的线程会获取该锁,非公平锁可能导致某些线程永远不会获取到锁

  synchronized不是公平锁,ReentrantLock和ReentrantReadWriteLock,它默认情况下是非公平锁,但是可以设置为公平锁。

  ReentrantLock在实例化的时候参数true表示公平锁,false表示非公平锁,而且有很多判断锁状态的方法。

 

读写锁:

  多线程读操作不会发生冲突

condition:await()、signal()可以实现多路通知功能,但是通知部分线程要使用多个condition类,否则会全部唤醒

 

J.U.C组件拓展:

Callable与Runnable、Thread接口对比:

  Future接口:可以得到线程任务方法的返回值

  FutureTask类:实现了Runnable、Future,使用场景:线程A做一件事,线程B做别的事,在需要的时候可以的到线程A的返回值

  Fork/Join(jdk1.7):就是把大任务拆分成若干小任务,放到双端队列,每个队列分配一个线程,先做完的线程帮助其他线程,一个从下面,一个从上面,

并行执行,最终汇总结果,但是某些情况下还是有线程竞争的情况

 

局限性:

  1、只能通过fork、join进行操作 2、不能有io操作 3、任务不能抛出检查异常

 

Queue:

  除了优先级队列和LIFO队列外,队列都是以FIFO(先进先出)的方式对各个元素进行排序的

add(E e):

  将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则会抛出异常

remove():

  移除队首元素,若移除成功,则返回true;如果移除失败(队列为空),则会抛出异常

offer(E e):

  将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则返回false

poll():

  移除并获取队首元素,若成功,则返回队首元素;否则返回null

peek():

  获取队首元素,若成功,则返回队首元素;否则返回null
 
  对于非阻塞队列,一般情况下建议使用offer、poll和peek三个方法,不建议使用add和remove方法。因为使用offer、poll和peek三个方法可以通过返回值

  判断操作成功与否,而使用add和remove方法却不能达到这样的效果。注意,非阻塞队列中的方法都没有进行同步措施。

  阻塞队列对于上面五个方法有做同步处理,而非阻塞队列没有同步

put(E e)
take()
offer(E e,long timeout, TimeUnit unit)
poll(long timeout, TimeUnit unit)

put方法用来向队尾存入元素,如果队列满,则等待;
take方法用来从队首取元素,如果队列为空,则等待;
offer方法用来向队尾存入元素,如果队列满,则等待一定的时间,当时间期限达到时,如果还没有插入成功,则返回false;否则返回true;
poll方法用来从队首取元素,如果队列空,则等待一定的时间,当时间期限达到时,如果取到,则返回null;否则返回取得的元素;

 

BlockingQueue:

  主要用在生产者消费者场景,不需要关注什么时候阻塞和唤醒

实现类:

ArrayBlockingQueue:

  有界的阻塞队列,就是容量是有限的,初始化指定容量大小,FIFO,内部是由数组实现

DelayQueue:

  必须实现Delay接口,它的元素要进行排序,应用场景:定时关闭连接、缓存对象,超时处理等

LinkedBlockingQueue:

  内部是链表,和ArrayBlockingQueue相似,FIFO

priorityBlockingQueue:

  允许插入null

SynchronousQueue:

  只能插入一个值,插入一个元素就会阻塞,也叫同步队列


线程池:

优点:

  1、减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务

  2、可以根据系统的承受能力,调整线程池中工作线程的数据,防止因为消耗过多的内存导致服务器崩溃

 

ThreadPoolExecutor:

  注意corePoolSize、maximumPoolSize、workQueue等参数之间的关系,详情见9-1

1、corePoolSize

  核心池的大小。在创建了线程池之后,默认情况下,线程池中没有任何线程,而是等待有任务到来才创建线程去执行任务。默认情况下,在创建了线程池之后,

  线程池中的线程数为0,当有任务到来后就会创建一个线程去执行任务

2、maximumPoolSize

  池中允许的最大线程数,这个参数表示了线程池中最多能创建的线程数量,当任务数量比corePoolSize大时,任务添加到workQueue,当workQueue满了,

将继续创建线程以处理任务,maximumPoolSize表示的就是wordQueue满了,线程池中最多可以创建的线程数量


3、keepAliveTime

  只有当线程池中的线程数大于corePoolSize时,这个参数才会起作用。当线程数大于corePoolSize时,终止前多余的空闲线程等待新任务的最长时间


4、unit

  keepAliveTime时间单位


5、workQueue

  存储还没来得及执行的任务


6、threadFactory

  执行程序创建新线程时使用的工厂

 

7、handler

  由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序

 

1、池中线程数小于corePoolSize,新任务都不排队而是直接添加新线程

2、池中线程数大于等于corePoolSize,workQueue未满,首选将新任务加入workQueue而不是添加新线程

3、池中线程数大于等于corePoolSize,workQueue已满,但是线程数小于maximumPoolSize,添加新的线程来处理被添加的任务

4、池中线程数大于大于corePoolSize,workQueue已满,并且线程数大于等于maximumPoolSize,新任务被拒绝,使用handler处理被拒绝的任务

强烈建议程序员使用较为方便的Executors工厂方法Executors.newCachedThreadPool()(无界线程池,可以进行线程自动回收)、

Executors.newFixedThreadPool(int)(固定大小线程池)和Executors.newSingleThreadExecutor()(单个后台线程),它们均为大多数使用场景预定义了设置。

所以重点关注一下JDK推荐的Executors


并发最佳实践:

  1、使用本地变量

  2、使用不可变类

  3、最小化锁的作用范围:S=1/(1-a+a/n)

  4、宁可使用同步也不使用线程的wait和notify

  5、使用BlockingQueue实现生产-消费模式

  6、使用并发集合而不是加了锁的同步集合

  7、使用Semaphore创建有界的访问

  8、在使用synchronized时,宁可使用同步代码块,也不使用同步方法

  9、避免使用静态变量

 

扫码关注我们
微信号:SRE实战
拒绝背锅 运筹帷幄