之前执行异步任务,等待异步结果使用的是Executors.newFixedThreadPool初始化线程连接池。将业务逻辑封装到Callable接口中,使用Future.get()阻塞
主线程来等待所有线程执行完毕。jdk1.7之后已经可以使用另一种方式来优雅的处理这种情况。
ForkJoinPool
当一个任务 需要拆分成2个及2个以上时,可以通过使用ForkJoinPool的execute方法执行ForkJoinTask执行,同样也支持子任务拆分子任务,通fork一样无线fork下去,最后所有子任务向上join到开始任务上。
ForkJoinPool中pool的大小默认采用cpu的核数,相应的可以根据业务量来调整,建议使用cpu的核心数,这样可以充分利用多核性能。
参照并发编程网 http://ifeve.com/fork-join-2/
实现了一个简单任务拆分的例子
package cn.db.forkJoin;
/**
* @author dongbin.yu
* @from 2018-09-15
* @since V1.0
*/
public class Product {
private String name;
private double price;
public Product(String name, double price) {
this.name = name;
this.price = price;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {
this.price = price;
}
}
商品类
package cn.db.forkJoin;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
/**
* @author dongbin.yu
* @from 2018-09-15
* @since V1.0
*/
public class ProductTask extends RecursiveAction {
private int first;
private int last;
private List<Product> productList;
public ProductTask(int first, int last, List<Product> productList) {
this.first = first;
this.last = last;
this.productList = productList;
}
/**
* 编写业务逻辑
*/
@Override
protected void compute() {
if ((last - first) > 10) {
int middle = (last + first) / 2;
System.out.println("thread queue size: " + getQueuedTaskCount());
invokeAll(new ProductTask(first, middle + 1, productList),new ProductTask(middle + 1, last, productList));
} else {
updatePrice();
}
}
private void updatePrice() {
for (int i = first; i < last; i++) {
Product product = productList.get(i);
product.setPrice(2.5 * i);
}
}
public static void main(String[] args) throws InterruptedException {
long start = System.currentTimeMillis();
ForkJoinPool pool = new ForkJoinPool();
List<Product> productList = new ArrayList<>();
for (int i = 0; i < 2000; i++) {
productList.add(new Product("product" + i, 1.0));
}
ProductTask productTask = new ProductTask(0, 2000, productList);
pool.execute(productTask);
while (!productTask.isDone()) {
System.out.println("active count " + pool.getActiveThreadCount());
System.out.println("steal count " + pool.getStealCount());
System.out.println("parallelism count " + pool.getParallelism());
Thread.sleep(2);
}
pool.shutdown();
System.out.println("cost time: " + (System.currentTimeMillis() - start));
}
}
Task类,如果不需要汇总结果可以继承RecursiveAction类,需要汇总结果时请继承RecursiveTask类,compute方法中写入自己的业务逻辑代码,相应的也可以根据业务拆分子任务。