SynchronousQueue源码分析
不像ArrayBlockingQueue、LinkedBlockingDeque之类的阻塞队列依赖AQS实现并发操作,SynchronousQueue直接使用CAS实现线程的安全访问。由于源码中充斥着大量的CAS代码,不易于理解,所以按照笔者的风格,接下来会使用简单的示例来描述背后的实现模型。
队列的实现策略通常分为公平模式和非公平模式,接下来将分别进行说明。
SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。3.2.1、公平模式下的模型:
公平模式下,底层实现使用的是TransferQueue这个内部队列,它有一个head和tail指针,用于指向当前正在等待匹配的线程节点。
初始化时,TransferQueue的状态如下:接着我们进行一些操作:1、线程put1执行 put(1)操作,由于当前没有配对的消费线程,所以put1线程入队列,自旋一小会后睡眠等待,这时队列状态如下:
2、接着,线程put2执行了put(2)操作,跟前面一样,put2线程入队列,自旋一小会后睡眠等待,这时队列状态如下:
3、这时候,来了一个线程take1,执行了 take操作,take1线程一定可以和次首节点(head.next)也是匹配的呢?其实大家可以拿个纸画一画,就会发现真的就是这样的。
公平策略总结下来就是:队尾匹配队头出队。
执行后put1线程被唤醒,take1线程的 take()方法返回了1(put1线程的数据),这样就实现了线程间的一对一通信,这时候内部状态如下:4、最后,再来一个线程take2,执行take操作,这时候只有put2线程在等候,而且两个线程匹配上了,线程put2被唤醒,
take2线程take操作返回了2(线程put2的数据),这时候队列又回到了起点,如下所示:
一个节点加进去后,设置这个节点的waiter=这个线程,并且这个线程park阻塞,消费者生产者来了之后,从节点中获取waiter线程,唤醒线程,线程继续执行完,然后设置节点的waiter=null。线程中断时候,设置这个线程对应的节点的item是节点自己,来标记这个线程中断了或者超时了。
put方法调用,生产者或者消费者进去。增加一个节点进去要2步:尾节点next新增元素,尾节点指向新元素。CAS就是 需要加锁的地方用cas,不需要的地方不用CAS。就是这2步用CAS单线程操作成功。
第一个节点进来,旋转后设置线程,然后阻塞,后面节点进来,加到队列,设置节点的线程,然后阻塞。队列只可能全部是生产者节点或者消费者节点,尾进头出。头没有元素尾有元素。多个生产者或者消费者,入队的时候是多线程同时操作一个共享变量TransferQueue。
void advanceTail(QNode t, QNode nt) { //TransferQueue的方法 if (tail == t) //准备把队列的尾节点从t(局部变量的尾节点)变为tn,tail !=t,说明队列的尾节点已经改变了,别的线程帮助推进了,就不用动了。 UNSAFE.compareAndSwapObject(this, tailOffset, t, nt); //设置这个queue的尾节点,从t变成nt(不是这样的),原来的值是t就成功。CAS的时候一定要把原来的值传进去,看原来的值是不是这个,然后更新。 } //TransferQueue的方法 void advanceHead(QNode h, QNode nh) { // 将head节点从h(原来头结点)变为nh(原来头结点的第一个next节点) if (h == head && // 原来的head节点地址暂存h, UNSAFE.compareAndSwapObject(this, headOffset, h, nh)) //改变head的地址值指向 h.next = h; // forget old next,原来头结点next指针改变 }
需要进行原子操作的方法有:队列改变头结点,队列改变尾节点,队列改变cleanMe节点,节点修改next值,节点修改item值,都用CAS操作,就相当于加锁了,多个线程同时修改共享变量,只有一个成功。
CAS的时候一定要把原来的值传进去,看原来的值是不是这个,然后更新。
一个节点加进去后,设置这个节点的waiter=这个线程,并且这个线程park阻塞,消费者生产者来了之后,从节点中获取waiter线程,唤醒线程,线程继续执行完,然后设置节点的waiter=null。线程中断时候,设置这个线程对应的节点的item是节点自己,来标记这个线程中断了。
节点超时或者中断时候,删除节点:
