第 10 章:Channel 与并发模式

jerry北京市2026年5月9日Go 22 次阅读 约 15 分钟
第 10 章:Channel 与并发模式

深入理解 channel 的类型和用法、select 多路复用,以及常见的并发设计模式。


10.1 Channel 基础

Channel 是 goroutine 之间通信的管道:

// 创建
ch := make(chan int)       // 无缓冲 channel
ch := make(chan int, 10)   // 缓冲 channel,容量 10

// 发送
ch <- 42

// 接收
v := <-ch

// 关闭
close(ch)

无缓冲 vs 缓冲 Channel

// 无缓冲:发送和接收必须同时就绪(同步)
ch := make(chan int)
go func() { ch <- 1 }()  // 发送方阻塞,直到有人接收
v := <-ch                  // 接收方阻塞,直到有人发送

// 缓冲:缓冲区满时发送阻塞,空时接收阻塞
ch := make(chan int, 3)
ch <- 1  // 不阻塞
ch <- 2  // 不阻塞
ch <- 3  // 不阻塞
ch <- 4  // 阻塞!缓冲区已满

Channel 方向

// 只发送
func producer(ch chan<- int) {
    ch <- 42
}

// 只接收
func consumer(ch <-chan int) {
    v := <-ch
    fmt.Println(v)
}

// 双向 channel 可以隐式转换为单向
ch := make(chan int)
go producer(ch)
go consumer(ch)

10.2 遍历和关闭 Channel

ch := make(chan int, 5)

// 生产者
go func() {
    for i := 0; i < 5; i++ {
        ch <- i
    }
    close(ch)  // 发送完毕后关闭
}()

// 消费者:使用 range 遍历(channel 关闭后自动退出)
for v := range ch {
    fmt.Println(v)
}

// 检测 channel 是否关闭
v, ok := <-ch
if !ok {
    fmt.Println("channel 已关闭")
}

关闭 channel 的规则:

  • 只有发送方应该关闭 channel
  • 关闭已关闭的 channel 会 panic
  • 向已关闭的 channel 发送会 panic
  • 从已关闭的 channel 接收会立即返回零值

10.3 select 多路复用

select 同时监听多个 channel 操作:

ch1 := make(chan string)
ch2 := make(chan string)

go func() {
    time.Sleep(1 * time.Second)
    ch1 <- "来自 ch1"
}()

go func() {
    time.Sleep(2 * time.Second)
    ch2 <- "来自 ch2"
}()

// 哪个先就绪就执行哪个
select {
case msg := <-ch1:
    fmt.Println(msg)
case msg := <-ch2:
    fmt.Println(msg)
}

超时控制

select {
case result := <-ch:
    fmt.Println(result)
case <-time.After(3 * time.Second):
    fmt.Println("超时了")
}

非阻塞操作

select {
case v := <-ch:
    fmt.Println(v)
default:
    fmt.Println("channel 没有数据,不阻塞")
}

10.4 常见并发模式

模式一:Fan-Out / Fan-In

多个 goroutine 从同一个 channel 读取(Fan-Out),多个 channel 的结果汇聚到一个 channel(Fan-In):

// Fan-Out:一个任务分发给多个 worker
func fanOut(input <-chan int, workers int) []<-chan int {
    channels := make([]<-chan int, workers)
    for i := 0; i < workers; i++ {
        channels[i] = worker(input)
    }
    return channels
}

func worker(input <-chan int) <-chan int {
    output := make(chan int)
    go func() {
        defer close(output)
        for n := range input {
            output <- n * n  // 处理任务
        }
    }()
    return output
}

// Fan-In:多个 channel 合并为一个
func fanIn(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    merged := make(chan int)

    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                merged <- v
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

模式二:Pipeline(管道)

// 阶段 1:生成数据
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

// 阶段 2:平方
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

// 阶段 3:过滤
func filter(in <-chan int, predicate func(int) bool) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            if predicate(n) {
                out <- n
            }
        }
    }()
    return out
}

// 组合管道
nums := generate(1, 2, 3, 4, 5)
squared := square(nums)
even := filter(squared, func(n int) bool { return n%2 == 0 })

for v := range even {
    fmt.Println(v)  // 4, 16
}

模式三:Worker Pool

func workerPool(jobs <-chan int, results chan<- int, numWorkers int) {
    var wg sync.WaitGroup
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for job := range jobs {
                fmt.Printf("Worker %d 处理任务 %d\n", id, job)
                time.Sleep(time.Second)  // 模拟耗时操作
                results <- job * 2
            }
        }(i)
    }

    go func() {
        wg.Wait()
        close(results)
    }()
}

// 使用
jobs := make(chan int, 100)
results := make(chan int, 100)

workerPool(jobs, results, 3)

// 发送任务
for i := 1; i <= 10; i++ {
    jobs <- i
}
close(jobs)

// 收集结果
for r := range results {
    fmt.Println("结果:", r)
}

模式四:Context 取消

func longRunningTask(ctx context.Context) error {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("任务被取消:", ctx.Err())
            return ctx.Err()
        default:
            fmt.Println("工作中...")
            time.Sleep(500 * time.Millisecond)
        }
    }
}

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

go longRunningTask(ctx)
time.Sleep(3 * time.Second)

10.5 面试要点

  1. 无缓冲和缓冲 channel 的区别?

    • 无缓冲:同步通信,发送和接收必须同时就绪
    • 缓冲:异步通信,缓冲区满/空时才阻塞
  2. 向已关闭的 channel 发送/接收会怎样?

    • 发送:panic
    • 接收:返回零值和 false
  3. select 的执行规则?

    • 多个 case 就绪时随机选择一个
    • 没有 case 就绪且有 default 时执行 default
    • 没有 case 就绪且无 default 时阻塞
  4. 如何优雅地关闭 channel?

    • 只由发送方关闭
    • 多个发送方时,使用 sync.Once 或额外的 done channel
  5. context 的作用?

    • 控制 goroutine 的生命周期
    • 传递取消信号、超时、截止时间
    • 传递请求范围的值

练习

  1. 实现一个生产者-消费者模型
  2. 使用 pipeline 模式处理数据流
  3. 实现一个带超时的 HTTP 请求(使用 select + time.After)
  4. 实现一个 worker pool,处理 100 个任务

← 上一章:Goroutine 与并发基础 | 下一章:sync 包与并发安全 →

评论

登录 后发表评论

暂无评论