第 8 章:Channel 与并发集合
jerry北京市2026年4月20日C# 8 次阅读 约 22 分钟

深入理解 System.Threading.Channels 的生产者-消费者模式、ConcurrentQueue/Stack/Bag/Dictionary 的实现原理,以及 BlockingCollection 的使用场景。
8.1 System.Threading.Channels
Channel 是 .NET 中实现生产者-消费者模式的首选方案,比 BlockingCollection 更现代,原生支持 async/await。
创建 Channel
// 无界 Channel(无容量限制,写入永远不会阻塞)
var unbounded = Channel.CreateUnbounded<int>();
// 有界 Channel(容量限制,写满后写入会等待)
var bounded = Channel.CreateBounded<int>(new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.Wait, // 满了就等待(默认)
SingleWriter = false, // 是否只有一个生产者
SingleReader = false, // 是否只有一个消费者
AllowSynchronousContinuations = false // 是否允许同步延续
});
BoundedChannelFullMode 策略
| 模式 | 行为 |
|---|---|
Wait |
等待直到有空间(默认) |
DropNewest |
丢弃最新的项 |
DropOldest |
丢弃最旧的项 |
DropWrite |
丢弃当前要写入的项 |
8.2 生产者-消费者模式
基本用法
var channel = Channel.CreateBounded<string>(10);
// 生产者
async Task ProduceAsync(ChannelWriter<string> writer)
{
for (int i = 0; i < 100; i++)
{
await writer.WriteAsync($"消息 {i}");
Console.WriteLine($"已写入: 消息 {i}");
}
writer.Complete(); // 通知消费者:不会再有新数据了
}
// 消费者
async Task ConsumeAsync(ChannelReader<string> reader)
{
await foreach (var item in reader.ReadAllAsync())
{
Console.WriteLine($"已消费: {item}");
await Task.Delay(50); // 模拟处理
}
}
// 启动
var produceTask = ProduceAsync(channel.Writer);
var consumeTask = ConsumeAsync(channel.Reader);
await Task.WhenAll(produceTask, consumeTask);
多生产者-多消费者
var channel = Channel.CreateBounded<WorkItem>(new BoundedChannelOptions(200)
{
SingleWriter = false,
SingleReader = false
});
// 启动 3 个生产者
var producers = Enumerable.Range(0, 3).Select(id =>
Task.Run(async () =>
{
for (int i = 0; i < 100; i++)
{
await channel.Writer.WriteAsync(new WorkItem(id, i));
}
}));
// 启动 2 个消费者
var consumers = Enumerable.Range(0, 2).Select(id =>
Task.Run(async () =>
{
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"消费者 {id} 处理: {item}");
}
}));
await Task.WhenAll(producers);
channel.Writer.Complete();
await Task.WhenAll(consumers);
record WorkItem(int ProducerId, int Index);
8.3 Channel 与 IAsyncEnumerable
Channel 天然适合与 IAsyncEnumerable<T> 配合,构建异步数据流管道:
// 数据源:从数据库分页读取
async IAsyncEnumerable<Order> GetOrdersAsync(
[EnumeratorCancellation] CancellationToken ct = default)
{
var channel = Channel.CreateBounded<Order>(50);
_ = Task.Run(async () =>
{
int page = 0;
while (true)
{
var orders = await db.Orders.Skip(page * 100).Take(100).ToListAsync(ct);
if (orders.Count == 0) break;
foreach (var order in orders)
await channel.Writer.WriteAsync(order, ct);
page++;
}
channel.Writer.Complete();
}, ct);
await foreach (var order in channel.Reader.ReadAllAsync(ct))
{
yield return order;
}
}
// 使用
await foreach (var order in GetOrdersAsync())
{
await ProcessOrderAsync(order);
}
管道模式(Pipeline)
// 阶段 1:读取原始数据
ChannelReader<string> ReadLines(string path)
{
var channel = Channel.CreateUnbounded<string>();
_ = Task.Run(async () =>
{
await foreach (var line in File.ReadLinesAsync(path))
await channel.Writer.WriteAsync(line);
channel.Writer.Complete();
});
return channel.Reader;
}
// 阶段 2:解析数据
ChannelReader<Record> Parse(ChannelReader<string> input)
{
var channel = Channel.CreateUnbounded<Record>();
_ = Task.Run(async () =>
{
await foreach (var line in input.ReadAllAsync())
await channel.Writer.WriteAsync(Record.Parse(line));
channel.Writer.Complete();
});
return channel.Reader;
}
// 阶段 3:写入数据库
async Task SaveAsync(ChannelReader<Record> input)
{
await foreach (var record in input.ReadAllAsync())
await db.InsertAsync(record);
}
// 组装管道
var lines = ReadLines("data.csv");
var records = Parse(lines);
await SaveAsync(records);
8.4 ConcurrentQueue<T>
线程安全的 FIFO 队列,基于链表 + 分段(Segment)实现:
var queue = new ConcurrentQueue<int>();
// 入队(永远成功)
queue.Enqueue(1);
queue.Enqueue(2);
// 出队(可能失败)
if (queue.TryDequeue(out int item))
Console.WriteLine(item); // 1
// 查看队首(不移除)
if (queue.TryPeek(out int front))
Console.WriteLine(front);
Console.WriteLine(queue.Count); // 近似值,非精确
内部结构:
ConcurrentQueue<T> 内部结构:
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Segment │───→│ Segment │───→│ Segment │
│ [32个槽] │ │ [32个槽] │ │ [32个槽] │
│ 已消费 │ │ 部分消费 │ │ 写入中 │
└──────────┘ └──────────┘ └──────────┘
↑ ↑
_head _tail
8.5 ConcurrentStack<T> 与 ConcurrentBag<T>
ConcurrentStack(LIFO)
var stack = new ConcurrentStack<int>();
stack.Push(1);
stack.Push(2);
stack.Push(3);
// 弹出
if (stack.TryPop(out int top))
Console.WriteLine(top); // 3
// 批量弹出
int[] items = new int[2];
int count = stack.TryPopRange(items); // 更高效
ConcurrentBag(无序)
每个线程有自己的本地列表,减少竞争:
var bag = new ConcurrentBag<int>();
// 并行添加
Parallel.For(0, 1000, i => bag.Add(i));
// 取出(优先从当前线程的本地列表取)
if (bag.TryTake(out int value))
Console.WriteLine(value);
Console.WriteLine(bag.Count); // 1000(如果没有取出)
选择指南
| 集合 | 顺序 | 最佳场景 |
|---|---|---|
ConcurrentQueue<T> |
FIFO | 任务队列、消息传递 |
ConcurrentStack<T> |
LIFO | 撤销操作、DFS |
ConcurrentBag<T> |
无序 | 同一线程生产和消费 |
ConcurrentDictionary<K,V> |
无序 | 并发缓存、查找表 |
8.6 BlockingCollection<T>
BlockingCollection<T> 是对并发集合的包装,提供阻塞式的生产者-消费者模式:
// 默认使用 ConcurrentQueue<T>,容量限制为 10
using var collection = new BlockingCollection<int>(10);
// 生产者线程
Task.Run(() =>
{
for (int i = 0; i < 100; i++)
{
collection.Add(i); // 满了会阻塞
Console.WriteLine($"添加: {i}");
}
collection.CompleteAdding(); // 标记完成
});
// 消费者线程
Task.Run(() =>
{
foreach (var item in collection.GetConsumingEnumerable())
{
Console.WriteLine($"消费: {item}");
Thread.Sleep(100); // 模拟处理
}
});
Channel vs BlockingCollection
| 特性 | Channel | BlockingCollection |
|---|---|---|
| async 支持 | 原生支持 | 不支持 |
| 性能 | 更高 | 较低 |
| 背压策略 | 多种(Wait/Drop) | 只有阻塞 |
| 推荐程度 | ✅ 新代码首选 | 遗留代码 |
8.7 实战:限流处理器
class RateLimitedProcessor<T>
{
private readonly Channel<T> _channel;
private readonly Func<T, Task> _handler;
private readonly SemaphoreSlim _semaphore;
public RateLimitedProcessor(int maxConcurrency, int bufferSize, Func<T, Task> handler)
{
_channel = Channel.CreateBounded<T>(bufferSize);
_handler = handler;
_semaphore = new SemaphoreSlim(maxConcurrency);
}
public async ValueTask EnqueueAsync(T item, CancellationToken ct = default)
=> await _channel.Writer.WriteAsync(item, ct);
public void Complete() => _channel.Writer.Complete();
public async Task ProcessAsync(CancellationToken ct = default)
{
await foreach (var item in _channel.Reader.ReadAllAsync(ct))
{
await _semaphore.WaitAsync(ct);
_ = Task.Run(async () =>
{
try { await _handler(item); }
finally { _semaphore.Release(); }
}, ct);
}
}
}
// 使用
var processor = new RateLimitedProcessor<string>(
maxConcurrency: 5, bufferSize: 100,
handler: async msg => {
await httpClient.PostAsync(url, new StringContent(msg));
});
_ = processor.ProcessAsync();
foreach (var msg in messages)
await processor.EnqueueAsync(msg);
processor.Complete();
8.8 面试要点
-
Channel 和 BlockingCollection 的区别?
- Channel 原生支持 async/await,性能更高,支持多种背压策略
- BlockingCollection 是阻塞式的,适合遗留代码
-
ConcurrentQueue 的内部实现?
- 基于链表分段(Segment),每个 Segment 包含固定数量的槽位
- 使用 CAS 操作实现无锁入队和出队
-
ConcurrentBag 为什么适合同一线程生产和消费?
- 每个线程有本地列表(ThreadLocal),优先操作本地数据,减少竞争
-
有界 Channel 满了怎么办?
- 取决于
BoundedChannelFullMode:等待、丢弃最新、丢弃最旧、丢弃当前
- 取决于
-
如何用 Channel 实现管道模式?
- 每个阶段接收
ChannelReader<T>输入,返回ChannelReader<T>输出
- 每个阶段接收
练习
- 使用 Channel 实现一个日志聚合器:多个生产者写日志,单个消费者批量写入文件
- 使用 ConcurrentDictionary 实现一个线程安全的内存缓存(带过期时间)
- 对比 Channel 和 BlockingCollection 在高并发下的性能差异
- 使用 Channel 管道模式实现 ETL(Extract-Transform-Load)流程
评论
登录 后发表评论
暂无评论