Fork/Join 框架
Fork/Join 是 Java 7 引入的并行计算框架,专为分治任务设计。
Fork/Join 原理
Java
大任务
↓ fork(拆分)
小任务1 小任务2 小任务3
↓ 并行执行
↓ join(合并)
结果合并 → 最终结果
核心思想
- Fork:将大任务拆分为小任务
- Join:等待小任务完成并合并结果
- Work-Stealing:空闲线程窃取其他线程任务
ForkJoinPool
Fork/Join 任务执行池:
Java
// 创建 ForkJoinPool
ForkJoinPool pool = new ForkJoinPool();
// 使用默认池(common pool)
ForkJoinPool commonPool = ForkJoinPool.commonPool();
RecursiveTask(有返回值)
Java
public class SumTask extends RecursiveTask<Long> {
private final long[] array;
private final int start;
private final int end;
private static final int THRESHOLD = 10000; // 阈值
public SumTask(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
// 小任务直接计算
if (end - start <= THRESHOLD) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
}
// 大任务拆分
int mid = (start + end) / 2;
SumTask left = new SumTask(array, start, mid);
SumTask right = new SumTask(array, mid, end);
// fork 拆分,join 合并
left.fork(); // 左任务异步执行
long rightResult = right.compute(); // 右任务当前线程执行
long leftResult = left.join(); // 等待左任务完成
return leftResult + rightResult;
}
}
// 使用
long[] array = new long[100000];
for (int i = 0; i < array.length; i++) {
array[i] = i;
}
ForkJoinPool pool = new ForkJoinPool();
SumTask task = new SumTask(array, 0, array.length);
Long result = pool.invoke(task); // 提交并等待结果
System.out.println("总和:" + result);
RecursiveAction(无返回值)
Java
public class PrintTask extends RecursiveAction {
private final int start;
private final int end;
private static final int THRESHOLD = 5;
public PrintTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if (end - start <= THRESHOLD) {
for (int i = start; i < end; i++) {
System.out.println(Thread.currentThread().getName() + ":" + i);
}
} else {
int mid = (start + end) / 2;
PrintTask left = new PrintTask(start, mid);
PrintTask right = new PrintTask(mid, end);
invokeAll(left, right); // 同时执行两个任务
}
}
}
// 使用
ForkJoinPool pool = new ForkJoinPool();
PrintTask task = new PrintTask(0, 20);
pool.invoke(task);
fork vs invokeAll
| 方法 | 说明 |
|---|---|
| fork() | 异步执行,返回 ForkJoinTask |
| join() | 等待任务完成,返回结果 |
| invoke() | 提交并等待完成,相当于 fork+join |
| invokeAll(task1, task2) | 同时执行多个任务并等待完成 |
推荐写法
Java
// 方式1:fork 一个,compute 一个
left.fork();
long rightResult = right.compute();
long leftResult = left.join();
// 方式2:invokeAll 同时执行
invokeAll(left, right);
long leftResult = left.join();
long rightResult = right.join();
阈值设置
阈值决定何时停止拆分:
- 太小:任务过多,调度开销大
- 太大:并行度不够,无法充分利用 CPU
- 一般:1000~10000,根据任务复杂度调整
Java
// 阈值判断
if (size <= THRESHOLD) {
// 直接计算
} else {
// 拆分任务
}
Work-Stealing(工作窃取)
空闲线程从其他线程的任务队列尾部窃取任务:
- 提高线程利用率
- 避免线程空闲等待
- 自动负载均衡
Fork/Join vs ThreadPoolExecutor
| 特性 | ForkJoinPool | ThreadPoolExecutor |
|---|---|---|
| 任务类型 | 分治任务 | 独立任务 |
| 负载均衡 | Work-Stealing | 无 |
| 任务拆分 | 自动拆分 | 不支持 |
| 适用场景 | 递归计算、大数据处理 | 独立任务并行 |
Java 8 parallelStream
底层使用 ForkJoinPool:
text
// 使用 parallelStream 并行处理
long sum = Arrays.stream(array)
.parallel() // 开启并行
.sum();
// 指定 ForkJoinPool
ForkJoinPool pool = new ForkJoinPool(4);
long sum = pool.submit(() ->
Arrays.stream(array).parallel().sum()
).get();
要点总结
- Fork/Join 专为分治任务设计
- RecursiveTask 有返回值,RecursiveAction 无返回值
- fork() 拆分任务,join() 合并结果
- invokeAll() 同时执行多个任务
- 阈值决定拆分粒度,一般 1000~10000
- Work-Stealing 自动负载均衡
- parallelStream 底层使用 ForkJoinPool
- 适用场景:大数组求和、递归计算、大数据处理
📝 发现内容有误?点击此处直接编辑