1.Fork-Join

1.1 分而治之的设计思想

SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。

将一个难以直接解决的大问题,分割成一些规模较小的相同问题,以便各个击破,分而治之。

分治策略:

对于一个规模为n的问题,若该问题可以解决,则直接解决,否则将其分解为K个模块较小的子问题,这些子问题相互独立且与原问题形式相同(子问题相互之间有管理就变成了动态规范算法),递归的解决浙西子问题,然后将子问题的解 合并得到原问题的解。

1.2 Fork/Join框架:

并发编程(二)线程并发工具类 随笔 第1张

 

工作密取:

在“生产者—消费者”模式中,生产者和消费者共享一个队列,而在工作密取的情境中,每个消费者都有一个双端队列,在消费者完成了自己队列中的工作时,可以去其他消费者队列的队尾取来工作,而并不会干扰其他消费者的工作。在工作密取情境中,消费者从自己队列的队头取自己的工作,从其他消费者的队尾取别人的工作来完成。

工作密取非常适合于消费者同时也是生产者的情形,当消费者执行工作时发现有更多的工作要做,则可以将这些工作放到自己队列的末尾,也可以送到其他消费者队列的队尾;当自己队列没有工作要做时,可以去其他消费者队列取工作来完成,这样每个消费者都会保持忙碌的状态。

ForkJoinPool中维护着多个线程(一般为CPU核数)在不断地执行Task,每个线程除了执行自己职务内的Task之外,还会根据自己工作线程的闲置情况去获取其他繁忙的工作线程的Task,如此一来就能能够减少线程阻塞或是闲置的时间,提高CPU利用率。

1.3 Fork/Join使用的标准范式

并发编程(二)线程并发工具类 随笔 第2张

 

ForkJoinPool pool = new ForkJoinPool();

MyTask myTask = new MyTask();

pool.invoke(myTask);

result = myTask.join();

=================================================

//invoke方法:
public <T> T invoke(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
return task.join();
}

MyTask为我们自己的任务 可继承自

并发编程(二)线程并发工具类 随笔 第3张

其中常用的有:

1)RecursiveAction ,用于没有返回结果的任务

2)RecursiveTask<V> , 用于有返回结果的任务

Task要通过ForkJoinPool来执行,使用submit 或 invoke 提交,两者的区别是:invoke是同步执行,调用之后需要等待任务完成,才能执行后面的代码;submit(有返回值) ,execute(无返回值) 是异步执行。

并发编程(二)线程并发工具类 随笔 第4张

Task提交后完成后 可以通过task的join()和 get()方法获得返回结果

 

在我们自己实现的compute方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用invokeAll方法时,又会进入compute方法,看看当前子任务是否需要继续分割成孙任务,如果不需要继续分割,则执行当前子任务并返回结果。使用join方法会等待子任务执行完并得到其结果。

 

get():等待任务执行完成,并返回计算结果

如果当前线程是ForkJoinWorkerThread,调用doJoin方法获取结果

如果当前线程不是ForkerJoinWorkerThread,调用externalInterruptibleAwaitDone方法。

任务执行完成返回后,如果任务完成状态是CANCELLED,抛出CancellationException异常。如果任务完成状态是EXCEPTIONAL,将任务执行过程中抛出的异常包装成ExecutionExcepiton重新抛出。

ForkJoinTask的几种任务状态
volatile int status; // accessed directly by pool and workers
static final int DONE_MASK = 0xf0000000; // mask out non-completion bits
static final int NORMAL = 0xf0000000; // must be negative //已完成
static final int CANCELLED = 0xc0000000; // must be < NORMAL //被取消
static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED //出现异常
static final int SIGNAL = 0x00010000; // must be >= 1 << 16 //信号
static final int SMASK = 0x0000ffff; // short bits for tags

/**
* Waits if necessary for the computation to complete, and then * retrieves its result. * * @return the computed result * @throws CancellationException if the computation was cancelled * @throws ExecutionException if the computation threw an * exception * @throws InterruptedException if the current thread is not a * member of a ForkJoinPool and was interrupted while waiting */ public final V get() throws InterruptedException, ExecutionException { int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ? doJoin() : externalInterruptibleAwaitDone(); Throwable ex; if ((s &= DONE_MASK) == CANCELLED) throw new CancellationException(); if (s == EXCEPTIONAL && (ex = getThrowableException()) != null) throw new ExecutionException(ex); return getRawResult(); }

/**
* Implementation for join, get, quietlyJoin. Directly handles
* only cases of already-completed, external wait, and
* unfork+exec. Others are relayed to ForkJoinPool.awaitJoin.
*
* @return status upon completion
*/
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
 
/**
* Implementation for invoke, quietlyInvoke.
*
* @return status upon completion
*/
private int doInvoke() {
int s; Thread t; ForkJoinWorkerThread wt;
return (s = doExec()) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(wt = (ForkJoinWorkerThread)t).pool.
awaitJoin(wt.workQueue, this, 0L) :
externalAwaitDone();
}
  

join():阻塞当前线程并等待获取结果,得到结果正常则返回值,得到结果异常 则抛出异常!

/**
     * Returns the result of the computation when it {@link #isDone is
     * done}.  This method differs from {@link #get()} in that
     * abnormal completion results in {@code RuntimeException} or
     * {@code Error}, not {@code ExecutionException}, and that
     * interrupts of the calling thread do <em>not</em> cause the
     * method to abruptly return by throwing {@code
     * InterruptedException}.
     *
     * @return the computed result
     */
    public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }

pool.invoke() 本身也是调用了task.join() 可以返回结果

 task.invok() 与task.join()类似 只不过调用的是doInvoke() 而非doJoin()

public class UseForkJoin {

    public static final int MAX = 100;

    private static class SumTask extends RecursiveTask<Integer> {

        /**
         * 自定义的任务大小
         */
        private int perSize = MAX / 10;
        /**
         * 起始数
         */
        private int fromIndex;
        /**
         * 结尾数
         */
        private int toIndex;

        public SumTask(int fromIndex, int toIndex) {
            this.fromIndex = fromIndex;
            this.toIndex = toIndex;
        }

        @Override
        protected Integer compute() {
            if (toIndex - fromIndex < perSize) {
                int count = 0;
                for (int i = fromIndex; i <= toIndex; i++) {
                    count = count + i;
                }
                return count;
            } else {
                int mid = (fromIndex + toIndex) / 2;
                SumTask left = new SumTask(fromIndex, mid);
                SumTask right = new SumTask(mid + 1, toIndex);
                invokeAll(left, right);//此处invokeAll会不断的调用执行compute方法 直到满足toIndex-fromIndex<perSize (递归)
                return left.join() + right.join();
            }
        }
    }

    public static void main(String[] args) {
        //范式
        ForkJoinPool pool = new ForkJoinPool();
        SumTask task = new SumTask(0, 100);
//        Integer invoke = pool.invoke(task);
        pool.invoke(task);
        Integer join = task.join();
        System.out.println(join);

    }
}

2.CountDownLatch

CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。如主线程在其他初始化线程启动完成后再执行。 闭锁:闭锁是一种同步工具,可以延迟线程直到其达到其终止状态。

CountDowmLatch是通过一个计数器来实现的,计数器的初始值为初始任务量。每当完成一个任务后,计数器的值就会减1(countDown()方法).,当计数器的值达到0时,表示所有任务已完成 然后 闭锁上 等待 await() 方法的线程就可以恢复执行任务。

public class UserCountDownLatch {

    static CountDownLatch latch = new CountDownLatch(6);

    private static class CountThread extends Thread {
        @Override
        public void run() {
            System.out.println("CountThread running...");
            SleepTools.second(1);
            System.out.println("CountThread end");
            latch.countDown();
        }
    }

    private static class BusThread extends Thread {
        @Override
        public void run() {
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            for (int i = 0; i < 4; i++) {
                System.out.println("BusThread" + Thread.currentThread().getId()
                        + " do business-----");
            }
        }
    }

    private static class afterThread implements Runnable {

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " init something...");
            latch.countDown();
            System.out.println("init complete!");

            System.out.println("after init do something...");
            SleepTools.second(2);
            System.out.println("after end...");
            latch.countDown();

        }
    }

    public static void main(String[] args) {
        new BusThread().start();
        for (int i = 0; i < 3; i++) {
            new CountThread().start();
        }
        new Thread(new afterThread()).start();
        System.out.println("main is running...");
        SleepTools.second(3);
        System.out.println("main end!");
        latch.countDown();
    }
}

 参考:http://enjoy.ke.qq.com

扫码关注我们
微信号:SRE实战
拒绝背锅 运筹帷幄