不像ArrayBlockingQueue、LinkedBlockingDeque之类的阻塞队列依赖AQS实现并发操作,SynchronousQueue直接使用CAS实现线程的安全访问。由于源码中充斥着大量的CAS代码,不易于理解,所以按照笔者的风格,接下来会使用简单的示例来描述背后的实现模型。

队列的实现策略通常分为公平模式非公平模式,接下来将分别进行说明。

SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。

3.2.1、公平模式下的模型:

  公平模式下,底层实现使用的是TransferQueue这个内部队列,它有一个head和tail指针,用于指向当前正在等待匹配的线程节点。
初始化时,TransferQueue的状态如下:SynchronousQueue源码分析 随笔 第1张接着我们进行一些操作:1、线程put1执行 put(1)操作,由于当前没有配对的消费线程,所以put1线程入队列,自旋一小会后睡眠等待,这时队列状态如下:SynchronousQueue源码分析 随笔 第2张2、接着,线程put2执行了put(2)操作,跟前面一样,put2线程入队列,自旋一小会后睡眠等待,这时队列状态如下:SynchronousQueue源码分析 随笔 第3张3、这时候,来了一个线程take1,执行了 take操作,take1线程一定可以和次首节点(head.next)也是匹配的呢?其实大家可以拿个纸画一画,就会发现真的就是这样的。
公平策略总结下来就是:队尾匹配队头出队。
执行后put1线程被唤醒,take1线程的 take()方法返回了1(put1线程的数据),这样就实现了线程间的一对一通信,这时候内部状态如下:SynchronousQueue源码分析 随笔 第4张4、最后,再来一个线程take2,执行take操作,这时候只有put2线程在等候,而且两个线程匹配上了,线程put2被唤醒,
take2线程take操作返回了2(线程put2的数据),这时候队列又回到了起点,如下所示:SynchronousQueue源码分析 随笔 第5张

 

一个节点加进去后,设置这个节点的waiter=这个线程,并且这个线程park阻塞,消费者生产者来了之后,从节点中获取waiter线程唤醒线程线程继续执行,然后设置节点waiter=null。线程中断时候,设置这个线程对应的节点的item是节点自己,来标记这个线程中断了或者超时了

SynchronousQueue源码分析 随笔 第6张

put方法调用,生产者或者消费者进去。增加一个节点进去要2步:尾节点next新增元素,尾节点指向新元素。CAS就是 需要加锁的地方用cas,不需要的地方不用CAS。就是这2步用CAS单线程操作成功

 

第一个节点进来,旋转后设置线程,然后阻塞,后面节点进来,加到队列设置节点的线程,然后阻塞。队列只可能全部是生产者节点或者消费者节点,尾进头出。头没有元素尾有元素。多个生产者或者消费者,入队的时候是多线程同时操作一个共享变量TransferQueue。

void advanceTail(QNode t, QNode nt) {   //TransferQueue的方法
            if (tail == t)  //准备把队列的尾节点从t(局部变量的尾节点)变为tn,tail !=t,说明队列的尾节点已经改变了,别的线程帮助推进了,就不用动了。
                UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);  //设置这个queue的尾节点,从t变成nt(不是这样的),原来的值是t就成功。CAS的时候一定要把原来的值传进去,看原来的值是不是这个,然后更新。
        }
//TransferQueue的方法
void advanceHead(QNode h, QNode nh)  {  //   将head节点从h(原来头结点)变为nh(原来头结点的第一个next节点)
            if (h == head &&   //  原来的head节点地址暂存h,
                UNSAFE.compareAndSwapObject(this, headOffset, h, nh))   //改变head的地址值指向
                h.next = h; // forget old next,原来头结点next指针改变
        }

SynchronousQueue源码分析 随笔 第7张

SynchronousQueue源码分析 随笔 第8张

 

需要进行原子操作的方法有:队列改变头结点,队列改变尾节点,队列改变cleanMe节点,节点修改next值节点修改item值,都用CAS操作,就相当于加锁了,多个线程同时修改共享变量,只有一个成功。
CAS的时候一定要把原来的值传进去,看原来的值是不是这个,然后更新。
一个节点加进去后,设置这个节点的waiter=这个线程,并且这个线程park阻塞,消费者生产者来了之后,从节点中获取waiter线程唤醒线程,线程继续执行完,然后设置节点的waiter=null。线程中断时候,设置这个线程对应的节点的item是节点自己,来标记这个线程中断了。

SynchronousQueue源码分析 随笔 第9张

SynchronousQueue源码分析 随笔 第10张

节点超时或者中断时候,删除节点

SynchronousQueue源码分析 随笔 第11张

扫码关注我们
微信号:SRE实战
拒绝背锅 运筹帷幄