Fork And Join Task

之前执行异步任务,等待异步结果使用的是Executors.newFixedThreadPool初始化线程连接池。将业务逻辑封装到Callable接口中,使用Future.get()阻塞
主线程来等待所有线程执行完毕。jdk1.7之后已经可以使用另一种方式来优雅的处理这种情况。

ForkJoinPool

当一个任务 需要拆分成2个及2个以上时,可以通过使用ForkJoinPool的execute方法执行ForkJoinTask执行,同样也支持子任务拆分子任务,通fork一样无线fork下去,最后所有子任务向上join到开始任务上。

ForkJoinPool中pool的大小默认采用cpu的核数,相应的可以根据业务量来调整,建议使用cpu的核心数,这样可以充分利用多核性能。

参照并发编程网 http://ifeve.com/fork-join-2/

实现了一个简单任务拆分的例子

    package cn.db.forkJoin;

    /**
     * @author dongbin.yu
     * @from 2018-09-15
     * @since V1.0
     */
    public class Product {

        private String name;

        private double price;

        public Product(String name, double price) {
            this.name = name;
            this.price = price;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public double getPrice() {
            return price;
        }

        public void setPrice(double price) {
            this.price = price;
        }
    }


商品类

    package cn.db.forkJoin;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveAction;

    /**
     * @author dongbin.yu
     * @from 2018-09-15
     * @since V1.0
     */
    public class ProductTask extends RecursiveAction {

        private int first;

        private int last;

        private List<Product> productList;

        public ProductTask(int first, int last, List<Product> productList) {
            this.first = first;
            this.last = last;
            this.productList = productList;
        }

        /**
         * 编写业务逻辑
         */
        @Override
        protected void compute() {

            if ((last - first) > 10) {
                int middle = (last + first) / 2;
                System.out.println("thread queue size: " + getQueuedTaskCount());
                invokeAll(new ProductTask(first, middle + 1, productList),new ProductTask(middle + 1, last, productList));
            } else {
                updatePrice();
            }

        }

        private void updatePrice() {
            for (int i = first; i < last; i++) {
                Product product = productList.get(i);
                product.setPrice(2.5 * i);
            }
        }

        public static void main(String[] args) throws InterruptedException {

            long start = System.currentTimeMillis();
            ForkJoinPool pool = new ForkJoinPool();
            List<Product> productList = new ArrayList<>();
            for (int i = 0; i < 2000; i++) {
                productList.add(new Product("product" + i, 1.0));
            }
            ProductTask productTask = new ProductTask(0, 2000, productList);
            pool.execute(productTask);

            while (!productTask.isDone()) {
                System.out.println("active count " + pool.getActiveThreadCount());
                System.out.println("steal count " + pool.getStealCount());
                System.out.println("parallelism count " + pool.getParallelism());

                Thread.sleep(2);
            }

            pool.shutdown();
            System.out.println("cost time: " + (System.currentTimeMillis() - start));

        }

    }


Task类,如果不需要汇总结果可以继承RecursiveAction类,需要汇总结果时请继承RecursiveTask类,compute方法中写入自己的业务逻辑代码,相应的也可以根据业务拆分子任务。