全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-13 9 分钟 ✍️ juanwangdev

Go常见并发模式详解

Go提供简洁的并发模式,实现高效并行处理。

Worker Pool工作池

基本实现

Go
func workerPool(tasks []Task, workers int) {
    taskCh := make(chan Task, len(tasks))
    resultCh := make(chan Result, len(tasks))

    // 启动worker
    for i := 0; i < workers; i++ {
        go worker(taskCh, resultCh)
    }

    // 分发任务
    for _, task := range tasks {
        taskCh <- task
    }
    close(taskCh)

    // 收集结果
    for i := 0; i < len(tasks); i++ {
        result := <-resultCh
        processResult(result)
    }
}

func worker(taskCh <-chan Task, resultCh chan<- Result) {
    for task := range taskCh {
        result := process(task)
        resultCh <- result
    }
}

控制并发数

Go
// 使用带缓冲channel限制并发
sem := make(chan struct{}, 10)  // 最大10并发

for _, task := range tasks {
    sem <- struct{}{}  // 获取令牌
    go func(t Task) {
        defer func() { <-sem }()  // 释放令牌
        process(t)
    }(task)
}

// 等待全部完成
for i := 0; i < cap(sem); i++ {
    sem <- struct{}{}
}

Pipeline流水线

串行处理

Go
func pipeline() {
    // 阶段1:生成数据
    gen := func(nums ...int) <-chan int {
        out := make(chan int)
        go func() {
            for _, n := range nums {
                out <- n
            }
            close(out)
        }()
        return out
    }

    // 阶段2:处理数据
    sq := func(in <-chan int) <-chan int {
        out := make(chan int)
        go func() {
            for n := range in {
                out <- n * n
            }
            close(out)
        }()
        return out
    }

    // 阶段3:消费数据
    for n := range sq(gen(1, 2, 3)) {
        fmt.Println(n)  // 1, 4, 9
    }
}

可取消的pipeline

Go
func pipelineWithContext(ctx context.Context) {
    in := gen(1, 2, 3)

    out := make(chan int)
    go func() {
        defer close(out)
        for {
            select {
            case n, ok := <-in:
                if !ok {
                    return
                }
                out <- n * n
            case <-ctx.Done():
                return  // 取消
            }
        }
    }()
}

Fan-in/Fan-out扇入扇出

Fan-out分发

Go
func fanOut(in <-chan int, n int) []<-chan int {
    outputs := make([]<-chan int, n)

    for i := 0; i < n; i++ {
        outputs[i] = process(in)  // 每个worker独立处理
    }

    return outputs
}

func process(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

Fan-in合并

Go
func fanIn(inputs ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup

    wg.Add(len(inputs))
    for _, in := range inputs {
        go func(ch <-chan int) {
            defer wg.Done()
            for n := range ch {
                out <- n
            }
        }(in)
    }

    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

// 使用
merged := fanIn(fanOut(source, 3)...)

errgroup并发错误处理

基本用法

Go
import "golang.org/x/sync/errgroup"

func processAll(tasks []Task) error {
    g, ctx := errgroup.WithContext(context.Background())

    for _, task := range tasks {
        g.Go(func() error {
            return processTask(ctx, task)
        })
    }

    return g.Wait()  // 任一错误即返回
}

限制并发数

Go
func processWithLimit(tasks []Task) error {
    g := &errgroup.Group{}
    sem := make(chan struct{}, 5)  // 最大5并发

    for _, task := range tasks {
        sem <- struct{}{}  // 获取令牌

        g.Go(func() error {
            defer func() { <-sem }()  // 释放
            return process(task)
        })
    }

    return g.Wait()
}

Graceful Shutdown优雅退出

基本实现

Go
func main() {
    stop := make(chan os.Signal, 1)
    signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)

    // 启动服务
    go serve()

    // 等待信号
    <-stop
    fmt.Println("收到退出信号")

    // 清理资源
    cleanup()
}

超时退出

Go
func main() {
    stop := make(chan os.Signal, 1)
    signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)

    go serve()

    select {
    case <-stop:
        fmt.Println("收到信号")
    case <-time.After(30 * time.Second):
        fmt.Println("超时")
    }

    // 限时清理
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    if err := shutdown(ctx); err != nil {
        fmt.Println("清理超时")
    }
}

并发模式对比

模式用途特点
Worker Pool任务并行控制并发数
Pipeline串行处理分阶段解耦
Fan-out分发处理多worker并行
Fan-in合合结果统一输出
errgroup错误处理任一错误即停
Semaphore并发限制令牌控制
Graceful优雅退出清理资源

模式选择指南

  • 需要控制并发数:Worker Pool或Semaphore
  • 数据分阶段处理:Pipeline
  • 多worker并行处理:Fan-out/Fan-in
  • 需要错误收集:errgroup
  • 服务需要退出:Graceful Shutdown

要点总结

  • Worker Pool控制并发worker数量
  • Pipeline分阶段串行处理数据
  • Fan-out分发任务到多worker
  • Fan-in合并多worker结果
  • errgroup统一处理并发错误
  • Semaphore用channel限制并发
  • Graceful Shutdown清理后退出
  • 模式组合实现复杂并发逻辑

📝 发现内容有误?点击此处直接编辑

← 上一篇 Go goroutine基本概念与创建
下一篇 → Go并发同步原语详解
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库