第 6 章:Task 与线程池深入

jerry北京市2026年4月12日C# 11 次阅读 约 13 分钟
第 6 章:Task 与线程池深入

理解 Task 的生命周期、TaskScheduler、线程池的工作窃取机制,以及 ValueTask 的使用场景。


6.1 Task 的本质

Task 是对异步操作的抽象,代表一个"将来会完成的操作":

// Task 的关键状态
public enum TaskStatus
{
    Created,              // 已创建,未启动
    WaitingForActivation, // 等待调度(async 方法返回的 Task)
    WaitingToRun,         // 已排队,等待线程池线程
    Running,              // 正在执行
    WaitingForChildrenToComplete,
    RanToCompletion,      // 成功完成
    Canceled,             // 被取消
    Faulted               // 出现异常
}

创建 Task 的方式

// 1. async 方法(最常用)
async Task<int> GetValueAsync() => await ...;

// 2. Task.Run(将 CPU 密集型工作放到线程池)
var task = Task.Run(() => HeavyComputation());

// 3. Task.FromResult(已完成的 Task,无分配开销)
Task<int> cached = Task.FromResult(42);

// 4. TaskCompletionSource(手动控制 Task 的完成)
var tcs = new TaskCompletionSource<int>();
// 某个时刻...
tcs.SetResult(42);       // 或 tcs.SetException(ex);
Task<int> task = tcs.Task;

// 5. Task.Factory.StartNew(高级场景,通常用 Task.Run 替代)
var task = Task.Factory.StartNew(() => Work(),
    CancellationToken.None,
    TaskCreationOptions.LongRunning,  // 提示线程池创建专用线程
    TaskScheduler.Default);

6.2 线程池(ThreadPool)

工作原理

┌─────────────────────────────────────────┐
│              ThreadPool                  │
│                                         │
│  全局队列:[Task] [Task] [Task]          │
│                                         │
│  Worker Thread 1: 本地队列 [Task][Task] │
│  Worker Thread 2: 本地队列 [Task]       │
│  Worker Thread 3: 本地队列 []           │ ← 空闲,从其他线程偷取
│  Worker Thread 4: 本地队列 [Task]       │
│                                         │
│  I/O 完成端口线程(处理异步 I/O 回调)   │
└─────────────────────────────────────────┘

工作窃取(Work Stealing):

  • 每个工作线程有自己的本地队列(LIFO)
  • 本地队列为空时,从全局队列取任务
  • 全局队列也为空时,从其他线程的本地队列偷取(FIFO)

线程池配置

// 查看线程池信息
ThreadPool.GetMinThreads(out int workerMin, out int ioMin);
ThreadPool.GetMaxThreads(out int workerMax, out int ioMax);
ThreadPool.GetAvailableThreads(out int workerAvail, out int ioAvail);

// 设置最小线程数(避免线程饥饿时的缓慢增长)
ThreadPool.SetMinThreads(100, 100);

// 设置最大线程数
ThreadPool.SetMaxThreads(200, 200);

线程饥饿(Thread Starvation)

// 危险:在线程池线程上同步等待
async Task BadPattern()
{
    // 所有线程池线程都在等待 → 没有线程处理新任务 → 死锁
    var result = SomeAsyncMethod().Result;  // 阻塞线程池线程
}

6.3 Task 组合

// 等待所有完成
Task[] tasks = Enumerable.Range(0, 10)
    .Select(i => ProcessAsync(i))
    .ToArray();
await Task.WhenAll(tasks);

// 等待任一完成
Task<int> first = await Task.WhenAny(task1, task2, task3);

// 超时控制
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
try
{
    await LongRunningAsync(cts.Token);
}
catch (OperationCanceledException)
{
    Console.WriteLine("超时");
}

// 并行限制(SemaphoreSlim)
var semaphore = new SemaphoreSlim(10);  // 最多 10 个并发
var tasks = urls.Select(async url =>
{
    await semaphore.WaitAsync();
    try { return await httpClient.GetStringAsync(url); }
    finally { semaphore.Release(); }
});
await Task.WhenAll(tasks);

6.4 ValueTask

ValueTask<T> 是为了减少异步方法的堆分配而设计的:

// Task<T>:每次调用都分配一个 Task 对象(堆分配)
async Task<int> GetValueAsync()
{
    if (_cache.TryGetValue(key, out var value))
        return value;  // 同步完成,但仍然分配了 Task<int>
    return await LoadFromDbAsync(key);
}

// ValueTask<T>:同步完成时零分配
async ValueTask<int> GetValueAsync()
{
    if (_cache.TryGetValue(key, out var value))
        return value;  // 同步完成,ValueTask 是 struct,无堆分配
    return await LoadFromDbAsync(key);
}

ValueTask 的限制:

  • 只能 await 一次
  • 不能并发 await
  • 不能调用 .Result 多次
  • 不能用 Task.WhenAll(需要先 .AsTask()
// 错误用法
var vt = GetValueAsync();
var result1 = await vt;
var result2 = await vt;  // 错误!不能 await 两次

// 正确:如果需要多次使用,转为 Task
var task = GetValueAsync().AsTask();

6.5 CancellationToken

// 创建取消令牌
using var cts = new CancellationTokenSource();

// 超时自动取消
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

// 手动取消
cts.Cancel();

// 在异步方法中检查取消
async Task ProcessAsync(CancellationToken ct)
{
    for (int i = 0; i < 1000; i++)
    {
        ct.ThrowIfCancellationRequested();  // 抛出 OperationCanceledException
        await Task.Delay(100, ct);          // 传递 token 给内部操作
    }
}

// 注册取消回调
ct.Register(() => Console.WriteLine("操作被取消"));

// 链接多个 CancellationToken
using var linked = CancellationTokenSource.CreateLinkedTokenSource(ct1, ct2);
// 任一 token 取消时,linked.Token 也会取消

6.6 面试要点

  1. Task.Run 和 Task.Factory.StartNew 的区别?

    • Task.Run 是简化版,默认使用线程池
    • StartNew 更灵活,支持 LongRunning 等选项
  2. ValueTask 和 Task 的区别?

    • ValueTask 是 struct,同步完成时零分配
    • 只能 await 一次,有使用限制
  3. 线程池的工作窃取机制?

    • 每个线程有本地队列,空闲时从其他线程偷取任务
  4. 如何避免线程饥饿?

    • 不要在线程池线程上同步等待(.Result/.Wait())
    • 使用 async/await 全链路
  5. CancellationToken 的作用?

    • 协作式取消,异步操作可以检查并响应取消请求

练习

  1. 使用 Task.WhenAll + SemaphoreSlim 实现并发限制的批量请求
  2. 对比 Task<int>ValueTask<int> 在缓存命中场景下的分配差异
  3. 实现一个带超时和取消的异步操作
  4. 使用 ThreadPool.GetAvailableThreads 监控线程池状态

← 上一章:async/await 的本质 | 下一章:并发编程与线程安全 →

评论

登录 后发表评论

暂无评论