全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-10 10 分钟 ✍️ juanwangdev

Fork/Join 框架

Fork/Join 是 Java 7 引入的并行计算框架,专为分治任务设计。

Fork/Join 原理

Java
大任务
  ↓ fork(拆分)
小任务1  小任务2  小任务3
  ↓ 并行执行
  ↓ join(合并)
结果合并 → 最终结果

核心思想

  • Fork:将大任务拆分为小任务
  • Join:等待小任务完成并合并结果
  • Work-Stealing:空闲线程窃取其他线程任务

ForkJoinPool

Fork/Join 任务执行池:

Java
// 创建 ForkJoinPool
ForkJoinPool pool = new ForkJoinPool();

// 使用默认池(common pool)
ForkJoinPool commonPool = ForkJoinPool.commonPool();

RecursiveTask(有返回值)

Java
public class SumTask extends RecursiveTask<Long> {
    private final long[] array;
    private final int start;
    private final int end;
    private static final int THRESHOLD = 10000;  // 阈值

    public SumTask(long[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        // 小任务直接计算
        if (end - start <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            return sum;
        }

        // 大任务拆分
        int mid = (start + end) / 2;
        SumTask left = new SumTask(array, start, mid);
        SumTask right = new SumTask(array, mid, end);

        // fork 拆分,join 合并
        left.fork();          // 左任务异步执行
        long rightResult = right.compute();  // 右任务当前线程执行
        long leftResult = left.join();       // 等待左任务完成

        return leftResult + rightResult;
    }
}

// 使用
long[] array = new long[100000];
for (int i = 0; i < array.length; i++) {
    array[i] = i;
}

ForkJoinPool pool = new ForkJoinPool();
SumTask task = new SumTask(array, 0, array.length);
Long result = pool.invoke(task);  // 提交并等待结果
System.out.println("总和:" + result);

RecursiveAction(无返回值)

Java
public class PrintTask extends RecursiveAction {
    private final int start;
    private final int end;
    private static final int THRESHOLD = 5;

    public PrintTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if (end - start <= THRESHOLD) {
            for (int i = start; i < end; i++) {
                System.out.println(Thread.currentThread().getName() + ":" + i);
            }
        } else {
            int mid = (start + end) / 2;
            PrintTask left = new PrintTask(start, mid);
            PrintTask right = new PrintTask(mid, end);
            invokeAll(left, right);  // 同时执行两个任务
        }
    }
}

// 使用
ForkJoinPool pool = new ForkJoinPool();
PrintTask task = new PrintTask(0, 20);
pool.invoke(task);

fork vs invokeAll

方法说明
fork()异步执行,返回 ForkJoinTask
join()等待任务完成,返回结果
invoke()提交并等待完成,相当于 fork+join
invokeAll(task1, task2)同时执行多个任务并等待完成

推荐写法

Java
// 方式1:fork 一个,compute 一个
left.fork();
long rightResult = right.compute();
long leftResult = left.join();

// 方式2:invokeAll 同时执行
invokeAll(left, right);
long leftResult = left.join();
long rightResult = right.join();

阈值设置

阈值决定何时停止拆分:

  • 太小:任务过多,调度开销大
  • 太大:并行度不够,无法充分利用 CPU
  • 一般:1000~10000,根据任务复杂度调整
Java
// 阈值判断
if (size <= THRESHOLD) {
    // 直接计算
} else {
    // 拆分任务
}

Work-Stealing(工作窃取)

空闲线程从其他线程的任务队列尾部窃取任务:

  • 提高线程利用率
  • 避免线程空闲等待
  • 自动负载均衡

Fork/Join vs ThreadPoolExecutor

特性ForkJoinPoolThreadPoolExecutor
任务类型分治任务独立任务
负载均衡Work-Stealing
任务拆分自动拆分不支持
适用场景递归计算、大数据处理独立任务并行

Java 8 parallelStream

底层使用 ForkJoinPool:

text
// 使用 parallelStream 并行处理
long sum = Arrays.stream(array)
    .parallel()  // 开启并行
    .sum();

// 指定 ForkJoinPool
ForkJoinPool pool = new ForkJoinPool(4);
long sum = pool.submit(() ->
    Arrays.stream(array).parallel().sum()
).get();

要点总结

  • Fork/Join 专为分治任务设计
  • RecursiveTask 有返回值,RecursiveAction 无返回值
  • fork() 拆分任务,join() 合并结果
  • invokeAll() 同时执行多个任务
  • 阈值决定拆分粒度,一般 1000~10000
  • Work-Stealing 自动负载均衡
  • parallelStream 底层使用 ForkJoinPool
  • 适用场景:大数组求和、递归计算、大数据处理

📝 发现内容有误?点击此处直接编辑

← 上一篇 Java WebSocket
下一篇 → 原子类与 CAS
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库