第 8 章:Channel 与并发集合

jerry北京市2026年4月20日C# 8 次阅读 约 22 分钟
第 8 章:Channel 与并发集合

深入理解 System.Threading.Channels 的生产者-消费者模式、ConcurrentQueue/Stack/Bag/Dictionary 的实现原理,以及 BlockingCollection 的使用场景。


8.1 System.Threading.Channels

Channel 是 .NET 中实现生产者-消费者模式的首选方案,比 BlockingCollection 更现代,原生支持 async/await。

创建 Channel

// 无界 Channel(无容量限制,写入永远不会阻塞)
var unbounded = Channel.CreateUnbounded<int>();

// 有界 Channel(容量限制,写满后写入会等待)
var bounded = Channel.CreateBounded<int>(new BoundedChannelOptions(100)
{
    FullMode = BoundedChannelFullMode.Wait,       // 满了就等待(默认)
    SingleWriter = false,                          // 是否只有一个生产者
    SingleReader = false,                          // 是否只有一个消费者
    AllowSynchronousContinuations = false          // 是否允许同步延续
});

BoundedChannelFullMode 策略

模式 行为
Wait 等待直到有空间(默认)
DropNewest 丢弃最新的项
DropOldest 丢弃最旧的项
DropWrite 丢弃当前要写入的项

8.2 生产者-消费者模式

基本用法

var channel = Channel.CreateBounded<string>(10);

// 生产者
async Task ProduceAsync(ChannelWriter<string> writer)
{
    for (int i = 0; i < 100; i++)
    {
        await writer.WriteAsync($"消息 {i}");
        Console.WriteLine($"已写入: 消息 {i}");
    }
    writer.Complete(); // 通知消费者:不会再有新数据了
}

// 消费者
async Task ConsumeAsync(ChannelReader<string> reader)
{
    await foreach (var item in reader.ReadAllAsync())
    {
        Console.WriteLine($"已消费: {item}");
        await Task.Delay(50); // 模拟处理
    }
}

// 启动
var produceTask = ProduceAsync(channel.Writer);
var consumeTask = ConsumeAsync(channel.Reader);
await Task.WhenAll(produceTask, consumeTask);

多生产者-多消费者

var channel = Channel.CreateBounded<WorkItem>(new BoundedChannelOptions(200)
{
    SingleWriter = false,
    SingleReader = false
});

// 启动 3 个生产者
var producers = Enumerable.Range(0, 3).Select(id =>
    Task.Run(async () =>
    {
        for (int i = 0; i < 100; i++)
        {
            await channel.Writer.WriteAsync(new WorkItem(id, i));
        }
    }));

// 启动 2 个消费者
var consumers = Enumerable.Range(0, 2).Select(id =>
    Task.Run(async () =>
    {
        await foreach (var item in channel.Reader.ReadAllAsync())
        {
            Console.WriteLine($"消费者 {id} 处理: {item}");
        }
    }));

await Task.WhenAll(producers);
channel.Writer.Complete();
await Task.WhenAll(consumers);

record WorkItem(int ProducerId, int Index);

8.3 Channel 与 IAsyncEnumerable

Channel 天然适合与 IAsyncEnumerable<T> 配合,构建异步数据流管道:

// 数据源:从数据库分页读取
async IAsyncEnumerable<Order> GetOrdersAsync(
    [EnumeratorCancellation] CancellationToken ct = default)
{
    var channel = Channel.CreateBounded<Order>(50);

    _ = Task.Run(async () =>
    {
        int page = 0;
        while (true)
        {
            var orders = await db.Orders.Skip(page * 100).Take(100).ToListAsync(ct);
            if (orders.Count == 0) break;
            foreach (var order in orders)
                await channel.Writer.WriteAsync(order, ct);
            page++;
        }
        channel.Writer.Complete();
    }, ct);

    await foreach (var order in channel.Reader.ReadAllAsync(ct))
    {
        yield return order;
    }
}

// 使用
await foreach (var order in GetOrdersAsync())
{
    await ProcessOrderAsync(order);
}

管道模式(Pipeline)

// 阶段 1:读取原始数据
ChannelReader<string> ReadLines(string path)
{
    var channel = Channel.CreateUnbounded<string>();
    _ = Task.Run(async () =>
    {
        await foreach (var line in File.ReadLinesAsync(path))
            await channel.Writer.WriteAsync(line);
        channel.Writer.Complete();
    });
    return channel.Reader;
}

// 阶段 2:解析数据
ChannelReader<Record> Parse(ChannelReader<string> input)
{
    var channel = Channel.CreateUnbounded<Record>();
    _ = Task.Run(async () =>
    {
        await foreach (var line in input.ReadAllAsync())
            await channel.Writer.WriteAsync(Record.Parse(line));
        channel.Writer.Complete();
    });
    return channel.Reader;
}

// 阶段 3:写入数据库
async Task SaveAsync(ChannelReader<Record> input)
{
    await foreach (var record in input.ReadAllAsync())
        await db.InsertAsync(record);
}

// 组装管道
var lines = ReadLines("data.csv");
var records = Parse(lines);
await SaveAsync(records);

8.4 ConcurrentQueue<T>

线程安全的 FIFO 队列,基于链表 + 分段(Segment)实现:

var queue = new ConcurrentQueue<int>();

// 入队(永远成功)
queue.Enqueue(1);
queue.Enqueue(2);

// 出队(可能失败)
if (queue.TryDequeue(out int item))
    Console.WriteLine(item); // 1

// 查看队首(不移除)
if (queue.TryPeek(out int front))
    Console.WriteLine(front);

Console.WriteLine(queue.Count); // 近似值,非精确

内部结构:

ConcurrentQueue<T> 内部结构:
┌──────────┐    ┌──────────┐    ┌──────────┐
│ Segment  │───→│ Segment  │───→│ Segment  │
│ [32个槽] │    │ [32个槽] │    │ [32个槽] │
│ 已消费   │    │ 部分消费  │    │ 写入中   │
└──────────┘    └──────────┘    └──────────┘
     ↑                               ↑
   _head                           _tail

8.5 ConcurrentStack<T> 与 ConcurrentBag<T>

ConcurrentStack(LIFO)

var stack = new ConcurrentStack<int>();

stack.Push(1);
stack.Push(2);
stack.Push(3);

// 弹出
if (stack.TryPop(out int top))
    Console.WriteLine(top); // 3

// 批量弹出
int[] items = new int[2];
int count = stack.TryPopRange(items); // 更高效

ConcurrentBag(无序)

每个线程有自己的本地列表,减少竞争:

var bag = new ConcurrentBag<int>();

// 并行添加
Parallel.For(0, 1000, i => bag.Add(i));

// 取出(优先从当前线程的本地列表取)
if (bag.TryTake(out int value))
    Console.WriteLine(value);

Console.WriteLine(bag.Count); // 1000(如果没有取出)

选择指南

集合 顺序 最佳场景
ConcurrentQueue<T> FIFO 任务队列、消息传递
ConcurrentStack<T> LIFO 撤销操作、DFS
ConcurrentBag<T> 无序 同一线程生产和消费
ConcurrentDictionary<K,V> 无序 并发缓存、查找表

8.6 BlockingCollection<T>

BlockingCollection<T> 是对并发集合的包装,提供阻塞式的生产者-消费者模式:

// 默认使用 ConcurrentQueue<T>,容量限制为 10
using var collection = new BlockingCollection<int>(10);

// 生产者线程
Task.Run(() =>
{
    for (int i = 0; i < 100; i++)
    {
        collection.Add(i);  // 满了会阻塞
        Console.WriteLine($"添加: {i}");
    }
    collection.CompleteAdding(); // 标记完成
});

// 消费者线程
Task.Run(() =>
{
    foreach (var item in collection.GetConsumingEnumerable())
    {
        Console.WriteLine($"消费: {item}");
        Thread.Sleep(100); // 模拟处理
    }
});

Channel vs BlockingCollection

特性 Channel BlockingCollection
async 支持 原生支持 不支持
性能 更高 较低
背压策略 多种(Wait/Drop) 只有阻塞
推荐程度 ✅ 新代码首选 遗留代码

8.7 实战:限流处理器

class RateLimitedProcessor<T>
{
    private readonly Channel<T> _channel;
    private readonly Func<T, Task> _handler;
    private readonly SemaphoreSlim _semaphore;

    public RateLimitedProcessor(int maxConcurrency, int bufferSize, Func<T, Task> handler)
    {
        _channel = Channel.CreateBounded<T>(bufferSize);
        _handler = handler;
        _semaphore = new SemaphoreSlim(maxConcurrency);
    }

    public async ValueTask EnqueueAsync(T item, CancellationToken ct = default)
        => await _channel.Writer.WriteAsync(item, ct);

    public void Complete() => _channel.Writer.Complete();

    public async Task ProcessAsync(CancellationToken ct = default)
    {
        await foreach (var item in _channel.Reader.ReadAllAsync(ct))
        {
            await _semaphore.WaitAsync(ct);
            _ = Task.Run(async () =>
            {
                try { await _handler(item); }
                finally { _semaphore.Release(); }
            }, ct);
        }
    }
}

// 使用
var processor = new RateLimitedProcessor<string>(
    maxConcurrency: 5, bufferSize: 100,
    handler: async msg => {
        await httpClient.PostAsync(url, new StringContent(msg));
    });

_ = processor.ProcessAsync();
foreach (var msg in messages)
    await processor.EnqueueAsync(msg);
processor.Complete();

8.8 面试要点

  1. Channel 和 BlockingCollection 的区别?

    • Channel 原生支持 async/await,性能更高,支持多种背压策略
    • BlockingCollection 是阻塞式的,适合遗留代码
  2. ConcurrentQueue 的内部实现?

    • 基于链表分段(Segment),每个 Segment 包含固定数量的槽位
    • 使用 CAS 操作实现无锁入队和出队
  3. ConcurrentBag 为什么适合同一线程生产和消费?

    • 每个线程有本地列表(ThreadLocal),优先操作本地数据,减少竞争
  4. 有界 Channel 满了怎么办?

    • 取决于 BoundedChannelFullMode:等待、丢弃最新、丢弃最旧、丢弃当前
  5. 如何用 Channel 实现管道模式?

    • 每个阶段接收 ChannelReader<T> 输入,返回 ChannelReader<T> 输出

练习

  1. 使用 Channel 实现一个日志聚合器:多个生产者写日志,单个消费者批量写入文件
  2. 使用 ConcurrentDictionary 实现一个线程安全的内存缓存(带过期时间)
  3. 对比 Channel 和 BlockingCollection 在高并发下的性能差异
  4. 使用 Channel 管道模式实现 ETL(Extract-Transform-Load)流程

← 上一章:并发编程与线程安全 | 下一章:委托、事件与表达式树 →

评论

登录 后发表评论

暂无评论