- A+
async/await 源码解析
这篇文章主要是分析 async/await 这个语法糖,分析一下 async 和 await 是如何做到异步的。首先,我先抛出两个问题,各位可以先想一下。
- await 之后的方法是何时执行,如何执行的?
- 为什么 await 之后的代码会在不同的线程执行?
demo
要想知道 async/await 是怎么运行的,需要先写一个demo,然后进行一下反编译,就可以得到 async/await 编译后的代码,然后就可以开始分析了。
下面是简单使用 async/await 的demo:
static async Task Main(string[] args) { Console.WriteLine("1"+Thread.CurrentThread.ManagedThreadId.ToString()); await GetThreadID2(); Console.WriteLine("2"+Thread.CurrentThread.ManagedThreadId.ToString()); } public async static Task GetThreadID2() { Console.WriteLine("3"+Thread.CurrentThread.ManagedThreadId.ToString()); await Task.Run(() => Console.WriteLine("4"+Thread.CurrentThread.ManagedThreadId.ToString())); Console.WriteLine("5"+Thread.CurrentThread.ManagedThreadId.ToString()); }
然后我们来反编译一下这段代码(可以使用 dnSpy 进行反编译):
namespace AsyncTest { internal class Program { [DebuggerStepThrough] private static Task Main(string[] args) { Program.<Main>d__0 <Main>d__ = new Program.<Main>d__0(); <Main>d__.<>t__builder = AsyncTaskMethodBuilder.Create(); <Main>d__.args = args; <Main>d__.<>1__state = -1; <Main>d__.<>t__builder.Start<Program.<Main>d__0>(ref <Main>d__); return <Main>d__.<>t__builder.Task; } [DebuggerStepThrough] public static Task GetThreadID2() { Program.<GetThreadID2>d__2 <GetThreadID2>d__ = new Program.<GetThreadID2>d__2(); <GetThreadID2>d__.<>t__builder = AsyncTaskMethodBuilder.Create(); <GetThreadID2>d__.<>1__state = -1; <GetThreadID2>d__.<>t__builder.Start<Program.<GetThreadID2>d__2>(ref <GetThreadID2>d__); return <GetThreadID2>d__.<>t__builder.Task; } [DebuggerStepThrough] private static void <Main>(string[] args) { Program.Main(args).GetAwaiter().GetResult(); } [CompilerGenerated] private sealed class <Main>d__0 : IAsyncStateMachine { void IAsyncStateMachine.MoveNext() { int num = this.<>1__state; try { TaskAwaiter awaiter; if (num != 0) { Console.WriteLine("1" + Thread.CurrentThread.ManagedThreadId.ToString()); awaiter = Program.GetThreadID2().GetAwaiter(); if (!awaiter.IsCompleted) { this.<>1__state = 0; this.<>u__1 = awaiter; Program.<Main>d__0 <Main>d__ = this; this.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter, Program.<Main>d__0>(ref awaiter, ref <Main>d__); return; } } else { awaiter = this.<>u__1; this.<>u__1 = default(TaskAwaiter); this.<>1__state = -1; } awaiter.GetResult(); Console.WriteLine("2" + Thread.CurrentThread.ManagedThreadId.ToString()); Console.ReadLine(); } catch (Exception exception) { this.<>1__state = -2; this.<>t__builder.SetException(exception); return; } this.<>1__state = -2; this.<>t__builder.SetResult(); } } [CompilerGenerated] [Serializable] private sealed class <>c { internal void <GetThreadID2>b__2_0() { Console.WriteLine("4" + Thread.CurrentThread.ManagedThreadId.ToString()); } public static readonly Program.<>c <>9 = new Program.<>c(); public static Func<string> <>9__1_0; public static Action <>9__2_0; } [CompilerGenerated] private sealed class <GetThreadID2>d__2 : IAsyncStateMachine { void IAsyncStateMachine.MoveNext() { int num = this.<>1__state; try { TaskAwaiter awaiter; if (num != 0) { Console.WriteLine("3" + Thread.CurrentThread.ManagedThreadId.ToString()); awaiter = Task.Run(new Action(Program.<>c.<>9.<GetThreadID2>b__2_0)).GetAwaiter(); if (!awaiter.IsCompleted) { this.<>1__state = 0; this.<>u__1 = awaiter; Program.<GetThreadID2>d__2 <GetThreadID2>d__ = this; this.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter, Program.<GetThreadID2>d__2>(ref awaiter, ref <GetThreadID2>d__); return; } } else { awaiter = this.<>u__1; this.<>u__1 = default(TaskAwaiter); this.<>1__state = -1; } awaiter.GetResult(); Console.WriteLine("5" + Thread.CurrentThread.ManagedThreadId.ToString()); } catch (Exception exception) { this.<>1__state = -2; this.<>t__builder.SetException(exception); return; } this.<>1__state = -2; this.<>t__builder.SetResult(); } [DebuggerHidden] void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine){} public int <>1__state; public AsyncTaskMethodBuilder <>t__builder; private TaskAwaiter <>u__1; } } }
上面的代码是反编译出来的所有代码,如果各位选择太长不看的话,这里还有个简化版本:
private static Task Main(string[] args) <Main>d__.<>t__builder.Start<Program.<Main>d__0>(ref <Main>d__); <Main>d__0.MoveNext() <GetThreadID2>d__.<>t__builder.Start<Program.<GetThreadID2>d__2>(ref <GetThreadID2>d__); <GetThreadID2>d__2.MoveNext() <GetThreadID2>d__2.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter, Program.<GetThreadID2>d__2>(ref awaiter, ref <GetThreadID2>d__); <GetThreadID2>d__2.MoveNext() <Main>d__0.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter, Program.<Main>d__0>(ref awaiter, ref <Main>d__); <Main>d__0.MoveNext()
入口
我们先来看一下 Main 方法。
private static Task Main(string[] args) { Program.<Main>d__0 <Main>d__ = new Program.<Main>d__0(); <Main>d__.<>t__builder = AsyncTaskMethodBuilder.Create(); <Main>d__.args = args; <Main>d__.<>1__state = -1; <Main>d__.<>t__builder.Start<Program.<Main>d__0>(ref <Main>d__); return <Main>d__.<>t__builder.Task; }
首先第一行,构建了一个 Program.<Main>d__0
类型实例,嗯? 这个类哪里来的,没写过呀。
哎嘿,这个类就是编译器帮我们实现的。每一个 async/await 方法,编译器都会帮我们实现一个这样的类。
简单分析一下这个方法,实例化一个 Program.<Main>d__0()
, 初始化 1__state
值为-1,调用 Program.<Main>d__0()
的 Start
方法。
Start
public void Start<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine { ExecutionContextSwitcher executionContextSwitcher = default(ExecutionContextSwitcher); RuntimeHelpers.PrepareConstrainedRegions(); try { ExecutionContext.EstablishCopyOnWriteScope(ref executionContextSwitcher); stateMachine.MoveNext(); } finally { executionContextSwitcher.Undo(); } }
Start 代码的主要作用就是启动 stateMachine,即 stateMachine.MoveNext()
。
MoveNext
MoveNext 是 async/await 里非常重要的一段代码,这个方法控制了整个异步任务的执行步骤。
我们可以将其分解成三个部分,即
- await 之前
- await 执行
- await 之后的代码
具体的分析各位请看注释。
void IAsyncStateMachine.MoveNext() { // 这里 <>1__state 之前初始化的时候是 -1,所以 num 就是-1 int num = this.<>1__state; try { TaskAwaiter awaiter; // 第一次进来,num 是 -1 所以接下来的逻辑 if (num != 0) { // 这里执行 await 之前的代码 Console.WriteLine("1" + Thread.CurrentThread.ManagedThreadId.ToString()); // 这里就是执行 await 方法 awaiter = Program.GetThreadID2().GetAwaiter(); // 判断是否执行完成,通常第一次进来 IsCompleted 都是 false if (!awaiter.IsCompleted) { // 修改状态,下次再执行 MoveNext 就不会继续走这段逻辑来了 this.<>1__state = 0; this.<>u__1 = awaiter; Program.<Main>d__0 <Main>d__ = this; // 这里其实是往 Task 里添加了一个 任务结束时的回调,在任务结束时会再次调用 MoveNext // 这就解释了为什么 await 之后的方法是另外一个线程,因为await 之后的方法是在 下一个 MoveNext 的里调用的 // 而那个 MoveNext 是由线程池挑选一个线程进行执行的 this.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter, Program.<Main>d__0>(ref awaiter, ref <Main>d__); return; } } else { awaiter = this.<>u__1; this.<>u__1 = default(TaskAwaiter); this.<>1__state = -1; } // GetResult 里有一个 死循环 等待结果 awaiter.GetResult(); // await 之后的方法 Console.WriteLine("2" + Thread.CurrentThread.ManagedThreadId.ToString()); Console.ReadLine(); } catch (Exception exception) { this.<>1__state = -2; this.<>t__builder.SetException(exception); return; } this.<>1__state = -2; this.<>t__builder.SetResult(); }
await 之前的代码
// 这里执行 await 之前的代码 Console.WriteLine("1" + Thread.CurrentThread.ManagedThreadId.ToString());
await 之前的代码很简单,就是正常依次执行。
await 的代码
await 的代码大都以 Task 形式执行,主要分为两步。
第一步,将 Task 推入线程池中。
awaiter = Task.Run(new Action(Program.<>c.<>9.<GetThreadID2>b__3_0)).GetAwaiter(); public static Task Run(Action action) { StackCrawlMark stackCrawlMark = StackCrawlMark.LookForMyCaller; return Task.InternalStartNew(null, action, null, default(CancellationToken), TaskScheduler.Default, TaskCreationOptions.DenyChildAttach, InternalTaskOptions.None, ref stackCrawlMark); } internal static Task InternalStartNew(Task creatingTask, Delegate action, object state, CancellationToken cancellationToken, TaskScheduler scheduler, TaskCreationOptions options, InternalTaskOptions internalOptions, ref StackCrawlMark stackMark) { Task task = new Task(action, state, creatingTask, cancellationToken, options, internalOptions | InternalTaskOptions.QueuedByRuntime, scheduler); task.ScheduleAndStart(false); return task; } internal void ScheduleAndStart(bool needsProtection) { try { this.m_taskScheduler.InternalQueueTask(this); } catch (ThreadAbortException exceptionObject){} } internal void InternalQueueTask(Task task) { this.QueueTask(task); }
第二步,线程池执行 Task。
void IThreadPoolWorkItem.ExecuteWorkItem() { this.ExecuteEntry(false); } internal bool ExecuteEntry(bool bPreventDoubleExecution) { if (!this.IsCancellationRequested && !this.IsCanceled) { this.ExecuteWithThreadLocal(ref Task.t_currentTask); } return true; } private void ExecuteWithThreadLocal(ref Task currentTaskSlot) { Task task = currentTaskSlot; TplEtwProvider log = TplEtwProvider.Log; Guid currentThreadActivityId = default(Guid); bool flag = log.IsEnabled(); try { currentTaskSlot = this; ExecutionContext capturedContext = this.CapturedContext; if (capturedContext == null) { this.Execute(); } else { if (this.IsSelfReplicatingRoot || this.IsChildReplica) { this.CapturedContext = Task.CopyExecutionContext(capturedContext); } ContextCallback contextCallback = Task.s_ecCallback; if (contextCallback == null) { contextCallback = (Task.s_ecCallback = new ContextCallback(Task.ExecutionContextCallback)); } // 这里是执行 Task 方法的地方 ExecutionContext.Run(capturedContext, contextCallback, this, true); } this.Finish(true); } finally { currentTaskSlot = task; } }
await 之后的代码
那么 await 之后的代码是如何执行的呢?
大家回头看一下 MoveNext
方法,可以看到首次执行 MoveNext
的时候,代码的逻辑是这样的
if(num!=0) -> if(!awaiter.IsCompleted) -> AwaitUnsafeOnCompleted;return;
关键就在于 AwaitUnsafeOnCompleted
这个方法,来看一下 AwaitUnsafeOnCompleted
方法。
public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine) where TAwaiter : ICriticalNotifyCompletion where TStateMachine : IAsyncStateMachine { try { AsyncMethodBuilderCore.MoveNextRunner runner = null; // 这里是准备一个 任务执行完的回调,简单理解成是将 MoveNext 包装成一个 action Action completionAction = this.m_coreState.GetCompletionAction(AsyncCausalityTracer.LoggingOn ? this.Task : null, ref runner); if (this.m_coreState.m_stateMachine == null) { Task<TResult> task = this.Task; this.m_coreState.PostBoxInitialization(stateMachine, runner, task); } // 将 completionAction 推到 task里,具体看一下接下来的代码 awaiter.UnsafeOnCompleted(completionAction); } catch (Exception exception) { AsyncMethodBuilderCore.ThrowAsync(exception, null); } } public void UnsafeOnCompleted(Action continuation) { TaskAwaiter.OnCompletedInternal(this.m_task, continuation, true, false); } internal static void OnCompletedInternal(Task task, Action continuation, bool continueOnCapturedContext, bool flowExecutionContext) { task.SetContinuationForAwait(continuation, continueOnCapturedContext, flowExecutionContext, ref stackCrawlMark); } internal void SetContinuationForAwait(Action continuationAction, bool continueOnCapturedContext, bool flowExecutionContext, ref StackCrawlMark stackMark) { if (!this.AddTaskContinuation(continuationAction, false)) { AwaitTaskContinuation.UnsafeScheduleAction(continuationAction, this); } } private bool AddTaskContinuation(object tc, bool addBeforeOthers) { //这里就把completionAction 放到了 Task 的 m_continuationObject 里 return !this.IsCompleted && ((this.m_continuationObject == null && Interlocked.CompareExchange(ref this.m_continuationObject, tc, null) == null) || this.AddTaskContinuationComplex(tc, addBeforeOthers)); }
至此为止,准备工作(将 MoveNext
放到 task 的回调里)已经做完了,接下来就是如何来触发这个方法。
简单分析一下,MoveNext
肯定是在 task 执行完触发,让我们回头看看 Task 执行的代码。会发现,哎嘿,在ExecutionContext.Run(capturedContext, contextCallback, this, true);
方法执行完后还有一句代码 this.Finish(true)
。
internal void Finish(bool bUserDelegateExecuted) { this.FinishStageTwo(); } internal void FinishStageTwo() { this.FinishStageThree(); } internal void FinishStageThree() { this.FinishContinuations(); } internal void FinishContinuations() { // 获取之前注册的所有回调 最简单的就可以理解成 MoveNext object obj = Interlocked.Exchange(ref this.m_continuationObject, Task.s_taskCompletionSentinel); if (obj != null) { bool flag = (this.m_stateFlags & 134217728) == 0 && Thread.CurrentThread.ThreadState != ThreadState.AbortRequested && (this.m_stateFlags & 64) == 0; Action action = obj as Action; if (action != null) { AwaitTaskContinuation.RunOrScheduleAction(action, flag, ref Task.t_currentTask); return; } } } internal static void RunOrScheduleAction(Action action, bool allowInlining, ref Task currentTask) { // 这里调用了 MoveNextRunner.Run ,简单理解 就是 MoveNext 方法 action(); } void IAsyncStateMachine.MoveNext() { // 此时 num 就是 0 了,就会执行 await 之后的代码了。 int num = this.<>1__state; try { TaskAwaiter awaiter; if (num != 0) {} else { awaiter = this.<>u__1; this.<>u__1 = default(TaskAwaiter); this.<>1__state = -1; } awaiter.GetResult(); Console.WriteLine("2" + Thread.CurrentThread.ManagedThreadId.ToString()); Console.ReadLine(); } catch (Exception exception) { } this.<>1__state = -2; this.<>t__builder.SetResult(); }
这一段代码我删掉了很多内容,但大致的执行顺序就是如此,如果想要了解更多细节,可以查看 Task 这个类的源码。
总结
回头看看开头的两个问题,现在就可以回答了。
- await 之后的方法是何时执行,如何执行的?
await 的方法在 Task 执行完成之后,通过调用Finish
方法执行的。
具体的执行步骤是先将MoveNext
方法注册到 Task 的回调里,然后在 Task 执行完后调用这个方法。 - 为什么 await 之后的代码会在不同的线程执行?
这个其实是因为 Task 的机制,Task 会被推到线程池里,由线程池挑选一个线程去执行,await 之后的代码其实是由这个线程去执行的,自然就跟 await 的之前的代码不是一个线程。
PS: 补一张图