C# 异步编程async/await

  • C# 异步编程async/await已关闭评论
  • 138 次浏览
  • A+
所属分类:.NET技术
摘要

异步这个概念在不同语境下有不同的解释,比如在一个单核CPU里开启两个线程执行两个函数,通常认为这种执行方式是异步的,但对于CPU来说它是单核不可能同时运行两个函数,不过是由系统调度在不同的时间分片中执行。一般来说,如果两个工作能同时进行,就认为是异步的。在编程中,它通常代表函数的调用可以在不执行完的情况下返回,必要时在完成时回调。


概述

异步这个概念在不同语境下有不同的解释,比如在一个单核CPU里开启两个线程执行两个函数,通常认为这种执行方式是异步的,但对于CPU来说它是单核不可能同时运行两个函数,不过是由系统调度在不同的时间分片中执行。一般来说,如果两个工作能同时进行,就认为是异步的。在编程中,它通常代表函数的调用可以在不执行完的情况下返回,必要时在完成时回调。

有一个概念常常被混淆,多线程和异步。很多人认为异步就是多线程的,但是多线程只是实现异步的其中一种方式,除此之外还有系统中断,协程,定时器,甚至可以自己写一个状态机实现异步(C# 的异步实现类似状态机)。

不同的编程语言有不同异步编程方法,在C#语言中,常常使用async/await等关键字,和Task等类来实现异步编程。

C#异步编程用法

class Program {     static void Main(string[] args)     {         var task =  IntTask(); 		Console.WriteLine("等待中...");         Console.WriteLine($"算完了? 让我康康! result = {task.Result}");     }      static async Task<int> IntTask()     {         Console.WriteLine("等3秒吧");         await Task.Delay(3000);         return 1;     } } 

Main函数异步调用IntTask,打印"等三秒吧",随后返回到Main函数打印“等待中”,在task.Result取值时阻塞,三秒后IntTask返回(此时Task.Result被赋值)打印“result = 1”。看一下用法:

  • async: 异步函数使用async关键字修饰
  • await: 等待异步函数返回
  • Task:异步函数有返回值,且返回值为int类型

上述只是一个极简的用法,忽略了大量的细节,可以建立一个初步的印象。

async/await和Task简介

async

用async修饰一个方法,表明这个方法可以异步执行,其返回值必须是void/Task/Task<T>(T是返回值类型)其中一个,方法内的语句至少包含一个await关键字,否则会被同步的方式执行。

await

await只能修饰(返回值是)Task类型变量,此时会返回Task.Result或void而不是Task本身,在上述示例中,Main没有被async修饰,不能使用await,其返回值就是Task<int>, 而IntTask调用Task.Delay就是直接返回void。await也只能在被async修饰的函数的语句中使用。

Task

源于基于任务的异步模式(Task-based Asynchronous Pattern,TAP),被作为异步函数的返回值。异步函数的返回值有三种:

  • void:"fire and forget"(触发并忘记)不需要知道状态(是否完成),比如抛出异常、打印日志时可以使用
  • Task:需要知道是否完成(或失败)等状态,但是不需要返回值
  • Task<T>:在Task的基础上还想要返回值

其他

  • 异步函数不能使用ref/out修饰参数

实现原理剖析

如果使用反汇编等手段,可以看到上述示例代码的编译:
C# 异步编程async/await
在返回1之前,好像有什么“奇怪的东西”被调用,编译器又背着开发者偷偷干了什么呢?

实现原理示例

在微软的开发博客里有一个叫谢尔盖·杰普利亚科夫(Sergey Tepliakov)的毛子曾提到这部分,来看一下他的示例:

源码

class StockPrices {     private Dictionary<string, decimal> _stockPrices;     public async Task<decimal> GetStockPriceForAsync(string companyId)     {         await InitializeMapIfNeededAsync();         _stockPrices.TryGetValue(companyId, out var result);         return result;     }       private async Task InitializeMapIfNeededAsync()     {         if (_stockPrices != null)             return;           await Task.Delay(42);         // Getting the stock prices from the external source and cache in memory.         _stockPrices = new Dictionary<string, decimal> { { "MSFT", 42 } };     } } 

这是他的源代码,这个类叫做StockPrices(股票价格),其核心业务是根据公司ID查询股票价格GetStockPriceForAsync,这是一个异步调用,首先它先异步调用InitializeMapIfNeededAsync对数据库进行初始化,初始化完成尝试从数据库中获取该公司的股票价格返回。
上述提到编译器偷偷自己生成了代码,如果手动实现大概是怎样的呢?来看谢尔盖给出的解:

手动实现

class GetStockPriceForAsync_StateMachine {     enum State { Start, Step1, }     private readonly StockPrices @this;     private readonly string _companyId;     private readonly TaskCompletionSource<decimal> _tcs;     private Task _initializeMapIfNeededTask;     private State _state = State.Start;      public GetStockPriceForAsync_StateMachine(StockPrices @this, string companyId)     {         this.@this = @this;         _companyId = companyId;     }      public void Start()     {         try         {             if (_state == State.Start)             {                 // The code from the start of the method to the first 'await'.                  if (string.IsNullOrEmpty(_companyId))                     throw new ArgumentNullException();                  _initializeMapIfNeededTask = @this.InitializeMapIfNeeded();                  // Update state and schedule continuation                 _state = State.Step1;                 _initializeMapIfNeededTask.ContinueWith(_ => Start());             }             else if (_state == State.Step1)             {                 // Need to check the error and the cancel case first                 if (_initializeMapIfNeededTask.Status == TaskStatus.Canceled)                     _tcs.SetCanceled();                 else if (_initializeMapIfNeededTask.Status == TaskStatus.Faulted)                     _tcs.SetException(_initializeMapIfNeededTask.Exception.InnerException);                 else                 {                     // The code between first await and the rest of the method                      @this._store.TryGetValue(_companyId, out var result);                     _tcs.SetResult(result);                 }             }         }         catch (Exception e)         {             _tcs.SetException(e);         }     }      public Task<decimal> Task => _tcs.Task; }  public Task<decimal> GetStockPriceForAsync(string companyId) {     var stateMachine = new GetStockPriceForAsync_StateMachine(this, companyId);     stateMachine.Start();     return stateMachine.Task; } 

从类名GetStockPriceForAsync_StateMachine可以看到,他为这个异步调用生成了一个状态机来实现异步,先来看下成员变量:

  • StockPrices: 原来那个“股票价格”类的引用
  • _companyId: 调用方法时的参数公司ID
  • _tcs:TaskCompletionSource 创建并完成该任务的来源。
  • _initializeMapIfNeededTask:调用初始化数据的异步任务
  • _state:状态枚举
  • Task:直接就是_tcs.Task,即该任务创建并完成的来源

现在看来这段代码的逻辑就比较清楚了,在调用异步查询股票的接口时,创建了一个状态机并调用状态机的Start函数,第一次进入start函数时状态机的状态是Start状态,它给_initializeMapIfNeededTask赋值,把状态机状态流转到Step1,并让_initializeMapIfNeededTask执行结束末尾再次调用Start函数(ContinueWith)。

_initializeMapIfNeededTask任务在等待了42毫秒后(Task.Delay(42)),末尾时再次调用了Start函数,此时状态为Step1。首先检查了Task状态,符合要求调用_tcs.SetResult(其实是给Task的Result赋值),此时异步任务完成。

TaskCompletionSource

看官方文档给的定义:

表示未绑定到委托的 Task<TResult> 的制造者方,并通过 Task 属性提供对使用者方的访问

简单的示例:

static void Main() { 	TaskCompletionSource<int> tcs1 = new TaskCompletionSource<int>(); 	Task<int> t1 = tcs1.Task;  	// Start a background task that will complete tcs1.Task 	Task.Factory.StartNew(() => 	{ 		Thread.Sleep(1000); 		tcs1.SetResult(15); 	}); } 

看的出来这个类就是对Task的包装,方便创建分发给使用者的任务。其核心就是包装Task并方便外面设置其属性和状态

Task.ContinueWith

创建一个在目标 Task 完成时异步执行的延续任务

可以传入一个委托,在Task完成的末尾调用。这是一个典型的续体传递风格(continuation-pass style)。

续体传递风格

续体传递风格(continuation-pass style, CPS),来看维基百科的描述:

A function written in continuation-passing style takes an extra argument: an explicit "continuation"; i.e., a function of one argument. When the CPS function has computed its result value, it "returns" it by calling the continuation function with this value as the argument. That means that when invoking a CPS function, the calling function is required to supply a procedure to be invoked with the subroutine's "return" value. Expressing code in this form makes a number of things explicit which are implicit in direct style. These include: procedure returns, which become apparent as calls to a continuation; intermediate values, which are all given names; order of argument evaluation, which is made explicit; and tail calls, which simply call a procedure with the same continuation, unmodified, that was passed to the caller

大概的意思是,这种风格的函数比起普通的有一个额外的函数指针参数,调用结束(即将return)调用或者函数参数(替代直接return到调用者Caller)。还有一些其他细节,就不多说了,感兴趣自行翻译查看。

来看一个极简的例子:
int a = b + c + d;
这是一个链式运算,是有顺序的,在C++中,上述运算其实是:
int a = (b + c) + d; 先计算tmp = b + c(tmp是寄存器上一个临时的值,也称将亡值),然后计算 int a = tmp + c
使用链式传递来模拟这一过程:

class Program {     public class Result<T>     {         public T V;     }      static void Main(string[] args)     {         int a = 1;         int b = 2;         int c = 3;         int d = 4;          Result<int> ar = new() { V = a};         Calc3(ar, b, c, d, Calc2);          Console.WriteLine($"a = {ar.V}");     }      static void Calc3(Result<int> ar, int b, int c, int d, Action<Result<int>, int, int> continues)     {         int tmp = b + c;         continues(ar, tmp, d);     }      static void Calc2(Result<int> ar, int tmp, int d)     {         ar.V = tmp + d;     } } 

上述代码应该很清楚了,稍微看下应该能看明白。

C#编译器的实现

struct _GetStockPriceForAsync_d__1 : IAsyncStateMachine {     public StockPrices __this;     public string companyId;     public AsyncTaskMethodBuilder<decimal> __builder;     public int __state;     private TaskAwaiter __task1Awaiter;      public void MoveNext()     {         decimal result;         try         {             TaskAwaiter awaiter;             if (__state != 0)             {                 // State 1 of the generated state machine:                 if (string.IsNullOrEmpty(companyId))                     throw new ArgumentNullException();                  awaiter = __this.InitializeLocalStoreIfNeededAsync().GetAwaiter();                  // Hot path optimization: if the task is completed,                 // the state machine automatically moves to the next step                 if (!awaiter.IsCompleted)                 {                     __state = 0;                     __task1Awaiter = awaiter;                      // The following call will eventually cause boxing of the state machine.                     __builder.AwaitUnsafeOnCompleted(ref awaiter, ref this);                     return;                 }             }             else             {                 awaiter = __task1Awaiter;                 __task1Awaiter = default(TaskAwaiter);                 __state = -1;             }              // GetResult returns void, but it'll throw if the awaited task failed.             // This exception is catched later and changes the resulting task.             awaiter.GetResult();             __this._stocks.TryGetValue(companyId, out result);         }         catch (Exception exception)         {             // Final state: failure             __state = -2;             __builder.SetException(exception);             return;         }          // Final state: success         __state = -2;         __builder.SetResult(result);     }      void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine)     {         __builder.SetStateMachine(stateMachine);     } }  [AsyncStateMachine(typeof(_GetStockPriceForAsync_d__1))] public Task<decimal> GetStockPriceFor(string companyId) {     _GetStockPriceForAsync_d__1 _GetStockPriceFor_d__;     _GetStockPriceFor_d__.__this = this;     _GetStockPriceFor_d__.companyId = companyId;     _GetStockPriceFor_d__.__builder = AsyncTaskMethodBuilder<decimal>.Create();     _GetStockPriceFor_d__.__state = -1;     var __t__builder = _GetStockPriceFor_d__.__builder;     __t__builder.Start<_GetStockPriceForAsync_d__1>(ref _GetStockPriceFor_d__);     return _GetStockPriceFor_d__.__builder.Task; } 

比较一下C#编译器生成的状态机:

  • __this:StockPrices“股票价格”类的引用
  • companyId:公司ID参数
  • __builder:AsyncTaskMethodBuilder类型的表示返回任务的异步方法生成器
  • __state:状态索引
  • __task1Awaiter:TaskAwaiter类型,提供等待异步任务完成的对象

上述成员有一些和之前手撸的状态机不太一样,等下面会介绍,先来这一套的逻辑:
首先创建了一个_GetStockPriceForAsync_d__1状态机_GetStockPriceFor_d__并初始化赋值,随后调用了这个状态机的__builder的Start函数并把该状态机作为引用参数传入。__builder.Start函数会调到该状态机的MoveNext函数(下面会介绍),这和手撸代码状态机Start函数调用类似。

MoveNext与Start函数的处理过程也类似:第一次进来__state == -1,__builder.AwaitUnsafeOnCompleted切换上下文执行InitializeLocalStoreIfNeededAsync异步任务,并指定在完成后切换到当前上下文调用该状态机的MoveNext函数,类似手撸代码的Task.ContinueWith。第二次进入时,执行到__builder.SetResult(result),异步任务基本完成。

上述描述也是忽略了一些细节,下面是调用的时序图,会更清楚些,有些不太清楚的点后面会详细介绍。
C# 异步编程async/await

TaskAwaiter

来看下官方定义:

public readonly struct TaskAwaiter : System.Runtime.CompilerServices.ICriticalNotifyCompletion 提供等待异步任务完成的对象

结构:
C# 异步编程async/await
可以看到,这个所谓“等待异步任务完成的对象”,主要是保证实现ICriticalNotifyCompletion的接口OnCompleted等。

AsyncTaskMethodBuilder<TAwaiter,TStateMachine>(TAwaiter, TStateMachine)

官方定义:
C# 异步编程async/await
个人认为可以视为异步任务的“门面”,它负责启动状态机,传递一些中间状态,并在最终SetResult时表示它和其子例程的异步任务结束。其中有一个方法AwaitUnsafeOnCompleted,值得研究一下。

AsyncTaskMethodBuilder.AwaitUnsafeOnCompleted

这个方法在上述中一笔带过,被描述为类似Task.ContinueWith,确实如此,但执行过程相当复杂,在这里也只是简单介绍下过程。

AwaitUnsafeOnCompleted首先会调用GetCompletionAction,GetCompletionAction创建了一个保存了上下文 context = ExecuteContext.FastCapture()的MoveNextRunner,并返回了指向的MoveNextRunner.Run函数的委托。

接着调用参数awaiter的UnsafeOnCompleted(completionAction)函数,这里completionAction就是上述的那个委托,内部调用了成员Task.SetContinuationForAwait函数来初始化续体,SetContinuationForAwait又调用AddTaskContinuation把延续方法添加到Task中,当上述示例源码中的InitializeMapIfNeededAsync函数执行完调用Runner.Run:

[SecuritySafeCritical] internal void Run() {     if (this.m_context != null)     {         try         {             // 我们并未给 s_invokeMoveNext 赋值,所以 callback == null             ContextCallback callback = s_invokeMoveNext;             if (callback == null)             {                 // 将回调设置为下方的 InvokeMoveNext 方法                 s_invokeMoveNext = callback = new                 ContextCallback(AsyncMethodBuilderCore.MoveNextRunner.InvokeMoveNext);             }             ExecutionContext.Run(this.m_context, callback, this.m_stateMachine, true);             return;         }         finally         {             this.m_context.Dispose();         }     }     this.m_stateMachine.MoveNext(); }  [SecurityCritical] private static void InvokeMoveNext(object stateMachine) {     ((IAsyncStateMachine) stateMachine).MoveNext(); } 

((IAsyncStateMachine) stateMachine).MoveNext() 重新调用了状态机的MoveNext()

参考链接