async_await 源码分析

  • async_await 源码分析已关闭评论
  • 83 次浏览
  • A+
所属分类:.NET技术
摘要

这篇文章主要是分析 async/await 这个语法糖,分析一下 async 和 await 是如何做到异步的。首先,我先抛出两个问题,各位可以先想一下。


async/await 源码解析

这篇文章主要是分析 async/await 这个语法糖,分析一下 async 和 await 是如何做到异步的。首先,我先抛出两个问题,各位可以先想一下。

  1. await 之后的方法是何时执行,如何执行的?
  2. 为什么 await 之后的代码会在不同的线程执行?

demo

要想知道 async/await 是怎么运行的,需要先写一个demo,然后进行一下反编译,就可以得到 async/await 编译后的代码,然后就可以开始分析了。
下面是简单使用 async/await 的demo:

static async Task Main(string[] args) {        Console.WriteLine("1"+Thread.CurrentThread.ManagedThreadId.ToString());        await GetThreadID2();        Console.WriteLine("2"+Thread.CurrentThread.ManagedThreadId.ToString()); } public async static Task GetThreadID2() {        Console.WriteLine("3"+Thread.CurrentThread.ManagedThreadId.ToString());        await Task.Run(() => Console.WriteLine("4"+Thread.CurrentThread.ManagedThreadId.ToString()));                  Console.WriteLine("5"+Thread.CurrentThread.ManagedThreadId.ToString()); } 

然后我们来反编译一下这段代码(可以使用 dnSpy 进行反编译):

namespace AsyncTest {     internal class Program     {         [DebuggerStepThrough]         private static Task Main(string[] args)         {             Program.<Main>d__0 <Main>d__ = new Program.<Main>d__0();             <Main>d__.<>t__builder = AsyncTaskMethodBuilder.Create();             <Main>d__.args = args;             <Main>d__.<>1__state = -1;             <Main>d__.<>t__builder.Start<Program.<Main>d__0>(ref <Main>d__);             return <Main>d__.<>t__builder.Task;         }         [DebuggerStepThrough]         public static Task GetThreadID2()         {             Program.<GetThreadID2>d__2 <GetThreadID2>d__ = new Program.<GetThreadID2>d__2();             <GetThreadID2>d__.<>t__builder = AsyncTaskMethodBuilder.Create();             <GetThreadID2>d__.<>1__state = -1;             <GetThreadID2>d__.<>t__builder.Start<Program.<GetThreadID2>d__2>(ref <GetThreadID2>d__);             return <GetThreadID2>d__.<>t__builder.Task;         }          [DebuggerStepThrough]         private static void <Main>(string[] args)         {             Program.Main(args).GetAwaiter().GetResult();         }         [CompilerGenerated]         private sealed class <Main>d__0 : IAsyncStateMachine         {             void IAsyncStateMachine.MoveNext()             {                 int num = this.<>1__state;                 try                 {                     TaskAwaiter awaiter;                     if (num != 0)                     {                         Console.WriteLine("1" + Thread.CurrentThread.ManagedThreadId.ToString());                         awaiter = Program.GetThreadID2().GetAwaiter();                         if (!awaiter.IsCompleted)                         {                             this.<>1__state = 0;                             this.<>u__1 = awaiter;                             Program.<Main>d__0 <Main>d__ = this;                             this.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter, Program.<Main>d__0>(ref awaiter, ref <Main>d__);                             return;                         }                     }                     else                     {                         awaiter = this.<>u__1;                         this.<>u__1 = default(TaskAwaiter);                         this.<>1__state = -1;                     }                     awaiter.GetResult();                     Console.WriteLine("2" + Thread.CurrentThread.ManagedThreadId.ToString());                     Console.ReadLine();                 }                 catch (Exception exception)                 {                     this.<>1__state = -2;                     this.<>t__builder.SetException(exception);                     return;                 }                 this.<>1__state = -2;                 this.<>t__builder.SetResult();             }         }         [CompilerGenerated]         [Serializable]         private sealed class <>c         {             internal void <GetThreadID2>b__2_0()             {                 Console.WriteLine("4" + Thread.CurrentThread.ManagedThreadId.ToString());             }             public static readonly Program.<>c <>9 = new Program.<>c();              public static Func<string> <>9__1_0;              public static Action <>9__2_0;         }         [CompilerGenerated]         private sealed class <GetThreadID2>d__2 : IAsyncStateMachine         {             void IAsyncStateMachine.MoveNext()             {                 int num = this.<>1__state;                 try                 {                     TaskAwaiter awaiter;                     if (num != 0)                     {                         Console.WriteLine("3" + Thread.CurrentThread.ManagedThreadId.ToString());                         awaiter = Task.Run(new Action(Program.<>c.<>9.<GetThreadID2>b__2_0)).GetAwaiter();                         if (!awaiter.IsCompleted)                         {                             this.<>1__state = 0;                             this.<>u__1 = awaiter;                             Program.<GetThreadID2>d__2 <GetThreadID2>d__ = this;                             this.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter, Program.<GetThreadID2>d__2>(ref awaiter, ref <GetThreadID2>d__);                             return;                         }                     }                     else                     {                         awaiter = this.<>u__1;                         this.<>u__1 = default(TaskAwaiter);                         this.<>1__state = -1;                     }                     awaiter.GetResult();                     Console.WriteLine("5" + Thread.CurrentThread.ManagedThreadId.ToString());                 }                 catch (Exception exception)                 {                     this.<>1__state = -2;                     this.<>t__builder.SetException(exception);                     return;                 }                 this.<>1__state = -2;                 this.<>t__builder.SetResult();             }             [DebuggerHidden]             void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine){}              public int <>1__state;              public AsyncTaskMethodBuilder <>t__builder;              private TaskAwaiter <>u__1;         }     } } 

上面的代码是反编译出来的所有代码,如果各位选择太长不看的话,这里还有个简化版本:

private static Task Main(string[] args)  <Main>d__.<>t__builder.Start<Program.<Main>d__0>(ref <Main>d__);  <Main>d__0.MoveNext()      <GetThreadID2>d__.<>t__builder.Start<Program.<GetThreadID2>d__2>(ref <GetThreadID2>d__);     <GetThreadID2>d__2.MoveNext()     <GetThreadID2>d__2.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter, Program.<GetThreadID2>d__2>(ref awaiter, ref <GetThreadID2>d__);     <GetThreadID2>d__2.MoveNext() <Main>d__0.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter, Program.<Main>d__0>(ref awaiter, ref <Main>d__);  <Main>d__0.MoveNext()  

入口

我们先来看一下 Main 方法。

private static Task Main(string[] args)         {     Program.<Main>d__0 <Main>d__ = new Program.<Main>d__0();     <Main>d__.<>t__builder = AsyncTaskMethodBuilder.Create();     <Main>d__.args = args;     <Main>d__.<>1__state = -1;     <Main>d__.<>t__builder.Start<Program.<Main>d__0>(ref <Main>d__);     return <Main>d__.<>t__builder.Task; } 

首先第一行,构建了一个 Program.<Main>d__0 类型实例,嗯? 这个类哪里来的,没写过呀。
哎嘿,这个类就是编译器帮我们实现的。每一个 async/await 方法,编译器都会帮我们实现一个这样的类。
简单分析一下这个方法,实例化一个 Program.<Main>d__0(), 初始化 1__state 值为-1,调用 Program.<Main>d__0()Start 方法。

Start

public void Start<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine {     ExecutionContextSwitcher executionContextSwitcher = default(ExecutionContextSwitcher);     RuntimeHelpers.PrepareConstrainedRegions();     try     {         ExecutionContext.EstablishCopyOnWriteScope(ref executionContextSwitcher);         stateMachine.MoveNext();     }     finally     {         executionContextSwitcher.Undo();     } } 

Start 代码的主要作用就是启动 stateMachine,即 stateMachine.MoveNext()

MoveNext

MoveNext 是 async/await 里非常重要的一段代码,这个方法控制了整个异步任务的执行步骤。
我们可以将其分解成三个部分,即

  1. await 之前
  2. await 执行
  3. await 之后的代码
    具体的分析各位请看注释。
void IAsyncStateMachine.MoveNext() {     // 这里 <>1__state 之前初始化的时候是 -1,所以 num 就是-1      int num = this.<>1__state;     try     {         TaskAwaiter awaiter;         // 第一次进来,num 是 -1 所以接下来的逻辑         if (num != 0)         {             // 这里执行 await 之前的代码             Console.WriteLine("1" + Thread.CurrentThread.ManagedThreadId.ToString());             // 这里就是执行  await 方法             awaiter = Program.GetThreadID2().GetAwaiter();             // 判断是否执行完成,通常第一次进来 IsCompleted 都是 false             if (!awaiter.IsCompleted)             {                 // 修改状态,下次再执行 MoveNext 就不会继续走这段逻辑来了                 this.<>1__state = 0;                 this.<>u__1 = awaiter;                 Program.<Main>d__0 <Main>d__ = this;                 // 这里其实是往 Task 里添加了一个 任务结束时的回调,在任务结束时会再次调用 MoveNext                   // 这就解释了为什么 await 之后的方法是另外一个线程,因为await 之后的方法是在 下一个 MoveNext 的里调用的                 // 而那个 MoveNext 是由线程池挑选一个线程进行执行的                               this.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter, Program.<Main>d__0>(ref awaiter, ref <Main>d__);                 return;             }         }         else         {         awaiter = this.<>u__1;             this.<>u__1 = default(TaskAwaiter);             this.<>1__state = -1;         }         // GetResult 里有一个 死循环 等待结果         awaiter.GetResult();         // await 之后的方法          Console.WriteLine("2" + Thread.CurrentThread.ManagedThreadId.ToString());         Console.ReadLine();     }     catch (Exception exception)     {         this.<>1__state = -2;         this.<>t__builder.SetException(exception);         return;     }     this.<>1__state = -2;     this.<>t__builder.SetResult(); } 

await 之前的代码

// 这里执行 await 之前的代码 Console.WriteLine("1" + Thread.CurrentThread.ManagedThreadId.ToString()); 

await 之前的代码很简单,就是正常依次执行。

await 的代码

await 的代码大都以 Task 形式执行,主要分为两步。
第一步,将 Task 推入线程池中。

awaiter = Task.Run(new Action(Program.<>c.<>9.<GetThreadID2>b__3_0)).GetAwaiter(); public static Task Run(Action action)         {                 StackCrawlMark stackCrawlMark = StackCrawlMark.LookForMyCaller;                 return Task.InternalStartNew(null, action, null, default(CancellationToken), TaskScheduler.Default, TaskCreationOptions.DenyChildAttach, InternalTaskOptions.None, ref stackCrawlMark);        } internal static Task InternalStartNew(Task creatingTask, Delegate action, object state, CancellationToken cancellationToken, TaskScheduler scheduler, TaskCreationOptions options, InternalTaskOptions internalOptions, ref StackCrawlMark stackMark)         {                      Task task = new Task(action, state, creatingTask, cancellationToken, options, internalOptions | InternalTaskOptions.QueuedByRuntime, scheduler);                           task.ScheduleAndStart(false);                 return task;         } internal void ScheduleAndStart(bool needsProtection) {     try     {      this.m_taskScheduler.InternalQueueTask(this);     }     catch (ThreadAbortException exceptionObject){} } internal void InternalQueueTask(Task task)         {                         this.QueueTask(task);        } 

第二步,线程池执行 Task。

void IThreadPoolWorkItem.ExecuteWorkItem()         {                 this.ExecuteEntry(false);         } internal bool ExecuteEntry(bool bPreventDoubleExecution) {      if (!this.IsCancellationRequested && !this.IsCanceled)     {         this.ExecuteWithThreadLocal(ref Task.t_currentTask);     }     return true; } private void ExecuteWithThreadLocal(ref Task currentTaskSlot) {     Task task = currentTaskSlot;     TplEtwProvider log = TplEtwProvider.Log;     Guid currentThreadActivityId = default(Guid);     bool flag = log.IsEnabled();     try     {         currentTaskSlot = this;         ExecutionContext capturedContext = this.CapturedContext;         if (capturedContext == null)         {             this.Execute();         }         else         {             if (this.IsSelfReplicatingRoot || this.IsChildReplica)             {                 this.CapturedContext = Task.CopyExecutionContext(capturedContext);             }             ContextCallback contextCallback = Task.s_ecCallback;             if (contextCallback == null)             {                 contextCallback = (Task.s_ecCallback = new ContextCallback(Task.ExecutionContextCallback));             }             // 这里是执行 Task 方法的地方             ExecutionContext.Run(capturedContext, contextCallback, this, true);         }         this.Finish(true);     }     finally     {         currentTaskSlot = task;     } } 

await 之后的代码

那么 await 之后的代码是如何执行的呢?
大家回头看一下 MoveNext 方法,可以看到首次执行 MoveNext 的时候,代码的逻辑是这样的

if(num!=0) -> if(!awaiter.IsCompleted) -> AwaitUnsafeOnCompleted;return; 

关键就在于 AwaitUnsafeOnCompleted 这个方法,来看一下 AwaitUnsafeOnCompleted 方法。

public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine) where TAwaiter : ICriticalNotifyCompletion where TStateMachine : IAsyncStateMachine {     try     {         AsyncMethodBuilderCore.MoveNextRunner runner = null;         // 这里是准备一个 任务执行完的回调,简单理解成是将 MoveNext 包装成一个 action         Action completionAction = this.m_coreState.GetCompletionAction(AsyncCausalityTracer.LoggingOn ? this.Task : null, ref runner);         if (this.m_coreState.m_stateMachine == null)         {             Task<TResult> task = this.Task;             this.m_coreState.PostBoxInitialization(stateMachine, runner, task);         }                 // 将 completionAction 推到 task里,具体看一下接下来的代码         awaiter.UnsafeOnCompleted(completionAction);     }     catch (Exception exception)     {         AsyncMethodBuilderCore.ThrowAsync(exception, null);     } } public void UnsafeOnCompleted(Action continuation)         {                 TaskAwaiter.OnCompletedInternal(this.m_task, continuation, true, false);         } internal static void OnCompletedInternal(Task task, Action continuation, bool continueOnCapturedContext, bool flowExecutionContext) {     task.SetContinuationForAwait(continuation, continueOnCapturedContext, flowExecutionContext, ref stackCrawlMark); } internal void SetContinuationForAwait(Action continuationAction, bool continueOnCapturedContext, bool flowExecutionContext, ref StackCrawlMark stackMark) {      if (!this.AddTaskContinuation(continuationAction, false))      {             AwaitTaskContinuation.UnsafeScheduleAction(continuationAction, this);      } } private bool AddTaskContinuation(object tc, bool addBeforeOthers) {     //这里就把completionAction 放到了 Task 的 m_continuationObject 里     return !this.IsCompleted && ((this.m_continuationObject == null && Interlocked.CompareExchange(ref this.m_continuationObject, tc, null) == null) || this.AddTaskContinuationComplex(tc, addBeforeOthers)); } 

至此为止,准备工作(将 MoveNext 放到 task 的回调里)已经做完了,接下来就是如何来触发这个方法。
简单分析一下,MoveNext 肯定是在 task 执行完触发,让我们回头看看 Task 执行的代码。会发现,哎嘿,在ExecutionContext.Run(capturedContext, contextCallback, this, true); 方法执行完后还有一句代码 this.Finish(true)

internal void Finish(bool bUserDelegateExecuted) {     this.FinishStageTwo(); } internal void FinishStageTwo() {     this.FinishStageThree(); } internal void FinishStageThree() {     this.FinishContinuations(); } internal void FinishContinuations() {     // 获取之前注册的所有回调  最简单的就可以理解成 MoveNext     object obj = Interlocked.Exchange(ref this.m_continuationObject, Task.s_taskCompletionSentinel);     if (obj != null)     {         bool flag = (this.m_stateFlags & 134217728) == 0 && Thread.CurrentThread.ThreadState != ThreadState.AbortRequested && (this.m_stateFlags & 64) == 0;         Action action = obj as Action;         if (action != null)         {             AwaitTaskContinuation.RunOrScheduleAction(action, flag, ref Task.t_currentTask);             return;         }     } } internal static void RunOrScheduleAction(Action action, bool allowInlining, ref Task currentTask) {     // 这里调用了  MoveNextRunner.Run ,简单理解 就是  MoveNext 方法     action(); } void IAsyncStateMachine.MoveNext() {         // 此时 num 就是 0 了,就会执行 await 之后的代码了。     int num = this.<>1__state;     try     {         TaskAwaiter awaiter;         if (num != 0)         {}         else         {         awaiter = this.<>u__1;             this.<>u__1 = default(TaskAwaiter);             this.<>1__state = -1;         }         awaiter.GetResult();         Console.WriteLine("2" + Thread.CurrentThread.ManagedThreadId.ToString());         Console.ReadLine();     }     catch (Exception exception)     {     }     this.<>1__state = -2;     this.<>t__builder.SetResult(); } 

这一段代码我删掉了很多内容,但大致的执行顺序就是如此,如果想要了解更多细节,可以查看 Task 这个类的源码。

总结

回头看看开头的两个问题,现在就可以回答了。

  1. await 之后的方法是何时执行,如何执行的?
    await 的方法在 Task 执行完成之后,通过调用 Finish 方法执行的。
    具体的执行步骤是先将 MoveNext 方法注册到 Task 的回调里,然后在 Task 执行完后调用这个方法。
  2. 为什么 await 之后的代码会在不同的线程执行?
    这个其实是因为 Task 的机制,Task 会被推到线程池里,由线程池挑选一个线程去执行,await 之后的代码其实是由这个线程去执行的,自然就跟 await 的之前的代码不是一个线程。

PS: 补一张图
async_await 源码分析