第 10 章:Channel 与并发模式
jerry北京市2026年5月9日Go 22 次阅读 约 15 分钟

深入理解 channel 的类型和用法、select 多路复用,以及常见的并发设计模式。
10.1 Channel 基础
Channel 是 goroutine 之间通信的管道:
// 创建
ch := make(chan int) // 无缓冲 channel
ch := make(chan int, 10) // 缓冲 channel,容量 10
// 发送
ch <- 42
// 接收
v := <-ch
// 关闭
close(ch)
无缓冲 vs 缓冲 Channel
// 无缓冲:发送和接收必须同时就绪(同步)
ch := make(chan int)
go func() { ch <- 1 }() // 发送方阻塞,直到有人接收
v := <-ch // 接收方阻塞,直到有人发送
// 缓冲:缓冲区满时发送阻塞,空时接收阻塞
ch := make(chan int, 3)
ch <- 1 // 不阻塞
ch <- 2 // 不阻塞
ch <- 3 // 不阻塞
ch <- 4 // 阻塞!缓冲区已满
Channel 方向
// 只发送
func producer(ch chan<- int) {
ch <- 42
}
// 只接收
func consumer(ch <-chan int) {
v := <-ch
fmt.Println(v)
}
// 双向 channel 可以隐式转换为单向
ch := make(chan int)
go producer(ch)
go consumer(ch)
10.2 遍历和关闭 Channel
ch := make(chan int, 5)
// 生产者
go func() {
for i := 0; i < 5; i++ {
ch <- i
}
close(ch) // 发送完毕后关闭
}()
// 消费者:使用 range 遍历(channel 关闭后自动退出)
for v := range ch {
fmt.Println(v)
}
// 检测 channel 是否关闭
v, ok := <-ch
if !ok {
fmt.Println("channel 已关闭")
}
关闭 channel 的规则:
- 只有发送方应该关闭 channel
- 关闭已关闭的 channel 会 panic
- 向已关闭的 channel 发送会 panic
- 从已关闭的 channel 接收会立即返回零值
10.3 select 多路复用
select 同时监听多个 channel 操作:
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
ch1 <- "来自 ch1"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "来自 ch2"
}()
// 哪个先就绪就执行哪个
select {
case msg := <-ch1:
fmt.Println(msg)
case msg := <-ch2:
fmt.Println(msg)
}
超时控制
select {
case result := <-ch:
fmt.Println(result)
case <-time.After(3 * time.Second):
fmt.Println("超时了")
}
非阻塞操作
select {
case v := <-ch:
fmt.Println(v)
default:
fmt.Println("channel 没有数据,不阻塞")
}
10.4 常见并发模式
模式一:Fan-Out / Fan-In
多个 goroutine 从同一个 channel 读取(Fan-Out),多个 channel 的结果汇聚到一个 channel(Fan-In):
// Fan-Out:一个任务分发给多个 worker
func fanOut(input <-chan int, workers int) []<-chan int {
channels := make([]<-chan int, workers)
for i := 0; i < workers; i++ {
channels[i] = worker(input)
}
return channels
}
func worker(input <-chan int) <-chan int {
output := make(chan int)
go func() {
defer close(output)
for n := range input {
output <- n * n // 处理任务
}
}()
return output
}
// Fan-In:多个 channel 合并为一个
func fanIn(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
merged := make(chan int)
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for v := range c {
merged <- v
}
}(ch)
}
go func() {
wg.Wait()
close(merged)
}()
return merged
}
模式二:Pipeline(管道)
// 阶段 1:生成数据
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
// 阶段 2:平方
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
// 阶段 3:过滤
func filter(in <-chan int, predicate func(int) bool) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
if predicate(n) {
out <- n
}
}
}()
return out
}
// 组合管道
nums := generate(1, 2, 3, 4, 5)
squared := square(nums)
even := filter(squared, func(n int) bool { return n%2 == 0 })
for v := range even {
fmt.Println(v) // 4, 16
}
模式三:Worker Pool
func workerPool(jobs <-chan int, results chan<- int, numWorkers int) {
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d 处理任务 %d\n", id, job)
time.Sleep(time.Second) // 模拟耗时操作
results <- job * 2
}
}(i)
}
go func() {
wg.Wait()
close(results)
}()
}
// 使用
jobs := make(chan int, 100)
results := make(chan int, 100)
workerPool(jobs, results, 3)
// 发送任务
for i := 1; i <= 10; i++ {
jobs <- i
}
close(jobs)
// 收集结果
for r := range results {
fmt.Println("结果:", r)
}
模式四:Context 取消
func longRunningTask(ctx context.Context) error {
for {
select {
case <-ctx.Done():
fmt.Println("任务被取消:", ctx.Err())
return ctx.Err()
default:
fmt.Println("工作中...")
time.Sleep(500 * time.Millisecond)
}
}
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
go longRunningTask(ctx)
time.Sleep(3 * time.Second)
10.5 面试要点
-
无缓冲和缓冲 channel 的区别?
- 无缓冲:同步通信,发送和接收必须同时就绪
- 缓冲:异步通信,缓冲区满/空时才阻塞
-
向已关闭的 channel 发送/接收会怎样?
- 发送:panic
- 接收:返回零值和 false
-
select 的执行规则?
- 多个 case 就绪时随机选择一个
- 没有 case 就绪且有 default 时执行 default
- 没有 case 就绪且无 default 时阻塞
-
如何优雅地关闭 channel?
- 只由发送方关闭
- 多个发送方时,使用 sync.Once 或额外的 done channel
-
context 的作用?
- 控制 goroutine 的生命周期
- 传递取消信号、超时、截止时间
- 传递请求范围的值
练习
- 实现一个生产者-消费者模型
- 使用 pipeline 模式处理数据流
- 实现一个带超时的 HTTP 请求(使用 select + time.After)
- 实现一个 worker pool,处理 100 个任务
评论
登录 后发表评论
暂无评论