ASP.NET Core 5-Kestrel源码解读

  • A+
所属分类:.NET技术
摘要

上节讲到了kestrel服务器的配置及使用,相信很多同学已经对kestrel服务器有了初步的了解,那么有的同学可能会想更加深入的了解一下Kestrel服务器的是怎么实现监听和接收http请求的,今天我们来看下Kestrel服务器的源码,相信看完这些,你一定会对Kestrel服务器的运行机制有更深入的了解。

ASP.NET Core 5-Kestrel源码解读

上节讲到了kestrel服务器的配置及使用,相信很多同学已经对kestrel服务器有了初步的了解,那么有的同学可能会想更加深入的了解一下Kestrel服务器的是怎么实现监听和接收http请求的,今天我们来看下Kestrel服务器的源码,相信看完这些,你一定会对Kestrel服务器的运行机制有更深入的了解。

首先,让我们从程序启动类Program.cs开始分析。

public class Program {     public static void Main(string[] args)     {         CreateHostBuilder(args).Build().Run();     }       public static IHostBuilder CreateHostBuilder(string[] args) =>         Host.CreateDefaultBuilder(args)             .ConfigureWebHostDefaults(webBuilder =>              {                  webBuilder.UseStartup<Startup>();              }); }

 

其中,Host类链式调用了两个方法:

  • CreateDefaultBuilder
  • ConfigureWebHostDefaults

首先我们来看下CreateDefaultBuidler方法:

public static IHostBuilder CreateDefaultBuilder(string[] args) {       HostBuilder hostBuilder = new HostBuilder();       hostBuilder.UseContentRoot(Directory.GetCurrentDirectory());       hostBuilder.ConfigureHostConfiguration((Action<IConfigurationBuilder>) (config =>       {         ...       }));       hostBuilder.ConfigureAppConfiguration((Action<HostBuilderContext, IConfigurationBuilder>) ((hostingContext, config) =>       {         ...       })).ConfigureLogging((Action<HostBuilderContext, ILoggingBuilder>) ((hostingContext, logging) =>       {         ...       })).UseDefaultServiceProvider((Action<HostBuilderContext, ServiceProviderOptions>) ((context, options) =>       {         ...       }));       return (IHostBuilder) hostBuilder;     }  }

 

从上述代码可以看出,CreateDefaultBuilder并未涉及Kestrel服务器相关代码,仅仅是进行一些应用的初始化配置,例如,设置应用程序目录,设置配置文件等操作。

我们再来看下ConfigureWebHostDefaults方法:

public static IHostBuilder ConfigureWebHostDefaults(       this IHostBuilder builder,       Action<IWebHostBuilder> configure) {       if (configure == null)         throw new ArgumentNullException(nameof (configure));       return builder.ConfigureWebHost((Action<IWebHostBuilder>) (webHostBuilder =>       {         Microsoft.AspNetCore.WebHost.ConfigureWebDefaults(webHostBuilder);         configure(webHostBuilder);       })); }

通过阅读源码可以发现: ConfigureWebHostDefaults方法中的Microsoft.AspNetCore.WebHost.ConfigureWebDefaults(IWebHostBuilder)为实际执行初始化Kestrel服务器的代码。

internal static void ConfigureWebDefaults(IWebHostBuilder builder) {      ...      builder.UseKestrel((Action<WebHostBuilderContext, KestrelServerOptions>) ((builderContext, options) => options.Configure((IConfiguration) builderContext.Configuration.GetSection("Kestrel"), true))).ConfigureServices((Action<WebHostBuilderContext, IServiceCollection>) ((hostingContext, services) =>      {        services.PostConfigure<HostFilteringOptions>((Action<HostFilteringOptions>) (options =>        {         ...        }      })).UseIIS().UseIISIntegration(); }

看到这里,可能有的同学已经的迫不及待的想要看下Kestrel初始化流程相关的代码了。别着急,我们一步一步来。

首先我们查看一下上面的UseKestrel扩展方法:

public static IWebHostBuilder UseKestrel(       this IWebHostBuilder hostBuilder,       Action<WebHostBuilderContext, KestrelServerOptions> configureOptions)     {       return hostBuilder.UseKestrel().ConfigureKestrel(configureOptions);     }

发现该方法只是对传入的配置项KestrelServerOptions做了封装,最终是调用了IWebHostBuilder的扩展方法UseKestrel和ConfigureKestrel(Action<WebHostBuilderContext, KestrelServerOptions> configureOptions)扩展方法来初始化Kestrel服务器配置,同样是链式调用。

现在我们来看下UseKestrel()这个扩展方法:

public static IWebHostBuilder UseKestrel(this IWebHostBuilder hostBuilder) {     return hostBuilder.ConfigureServices((Action<IServiceCollection>) (services =>     {       services.TryAddSingleton<IConnectionListenerFactory, SocketTransportFactory>();       services.AddTransient<IConfigureOptions<KestrelServerOptions>, KestrelServerOptionsSetup>();       services.AddSingleton<IServer, KestrelServerImpl>();     })); }

细心的同学可能会发现,配置一个Kestrel服务器居然只需要仅仅三行代码?是不是感觉有些不可思议?Kestrel服务器这么简单?是的,Kestrel服务器就是这么简单。那么,Kestrel服务器是如何实现监听和接收请求的呢?

首先看下IConnectionListenerFactory接口类:

public interface IConnectionListenerFactory {     ValueTask<IConnectionListener> BindAsync(       EndPoint endpoint,       CancellationToken cancellationToken = default (CancellationToken)); }

这个接口职责只有一个,就是执行Sokcert的绑定EndPoint操作,然后返回一个IConnectionListener对象。EndPoint可以有三种实现:

  • FileHandleEndPoint
  • UnixDomainSocketEndPoint
  • IPEndPoint

我们再来看下实现类SocketTransportFactory:

public sealed class SocketTransportFactory : IConnectionListenerFactory {     private readonly SocketTransportOptions _options;     private readonly SocketsTrace _trace;     public SocketTransportFactory(       IOptions<SocketTransportOptions> options,       ILoggerFactory loggerFactory)     {       if (options == null)         throw new ArgumentNullException(nameof (options));       if (loggerFactory == null)         throw new ArgumentNullException(nameof (loggerFactory));       this._options = options.Value;       this._trace = new SocketsTrace(loggerFactory.CreateLogger("Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets"));     }     public ValueTask<IConnectionListener> BindAsync(       EndPoint endpoint,       CancellationToken cancellationToken = default (CancellationToken))     {       SocketConnectionListener connectionListener = new SocketConnectionListener(endpoint, this._options, (ISocketsTrace) this._trace);       connectionListener.Bind();       return new ValueTask<IConnectionListener>((IConnectionListener) connectionListener);     } }

代码非常简单,先实例化SocketConnectionListener对象,然后调用SocketConnectionListener的Bind方法并根据传入的EndPoint类型来创建Socket对象,来实现对EndPoint的监听和绑定操作。

internal void Bind() {     if (this._listenSocket != null)       throw new InvalidOperationException(SocketsStrings.TransportAlreadyBound);     Socket listenSocket;     switch (this.EndPoint)     {       case FileHandleEndPoint fileHandleEndPoint:         this._socketHandle = new SafeSocketHandle((IntPtr) (long) fileHandleEndPoint.FileHandle, true);         listenSocket = new Socket(this._socketHandle);         break;       case UnixDomainSocketEndPoint domainSocketEndPoint:         listenSocket = new Socket(domainSocketEndPoint.AddressFamily, SocketType.Stream, ProtocolType.IP);         BindSocket();         break;       case IPEndPoint ipEndPoint:         listenSocket = new Socket(ipEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);         if (ipEndPoint.Address == IPAddress.IPv6Any)           listenSocket.DualMode = true;         BindSocket();         break;       default:         listenSocket = new Socket(this.EndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);         BindSocket();         break;     }     this.EndPoint = listenSocket.LocalEndPoint;     listenSocket.Listen(this._options.Backlog);     this._listenSocket = listenSocket;     void BindSocket()     {       try       {         listenSocket.Bind(this.EndPoint);       }       catch (SocketException ex) when (ex.SocketErrorCode == SocketError.AddressAlreadyInUse)       {         throw new AddressInUseException(ex.Message, (Exception) ex);       }     } }

现在我们已经知道了Kestrel服务器内部是如何进行绑定和监听操作。那么Kestrel服务器是如何对http请求进行接收处理的呢?

接下来我们来看IServer接口:

public interface IServer : IDisposable {     IFeatureCollection Features { get; }     Task StartAsync<TContext>(IHttpApplication<TContext> application, CancellationToken cancellationToken) where TContext : notnull;     Task StopAsync(CancellationToken cancellationToken); }

IServer接口也非常简单,定义了一个Server最基本的有两个功能:启动和停止。那么Kestrel服务器是怎么实现的这个接口呢?

下面我们来看下微软官方为IServer注入的实现类KestrelServerImpl:

internal class KestrelServerImpl : IServer {     ...     public IFeatureCollection Features { get; }     public KestrelServerOptions Options => ServiceContext.ServerOptions;     private ServiceContext ServiceContext { get; }     private IKestrelTrace Trace => ServiceContext.Log;     private AddressBindContext AddressBindContext { get; set; }     public async Task StartAsync<TContext>(IHttpApplication<TContext> application, CancellationToken cancellationToken)     {         ...         async Task OnBind(ListenOptions options)         {             if (!BitConverter.IsLittleEndian)             {                 throw new PlatformNotSupportedException(CoreStrings.BigEndianNotSupported);             }             ValidateOptions();             if (_hasStarted)             {                     // The server has already started and/or has not been cleaned up yet                 throw new InvalidOperationException(CoreStrings.ServerAlreadyStarted);             }             _hasStarted = true;              ServiceContext.Heartbeat?.Start();             if ((options.Protocols & HttpProtocols.Http3) == HttpProtocols.Http3)             {                 if (_multiplexedTransportFactory is null)                 {                     throw new InvalidOperationException($"Cannot start HTTP/3 server if no {nameof(IMultiplexedConnectionListenerFactory)} is registered.");                 }                   options.UseHttp3Server(ServiceContext, application, options.Protocols);                 var multiplexedConnectionDelegate = ((IMultiplexedConnectionBuilder)options).Build();                   multiplexedConnectionDelegate = EnforceConnectionLimit(multiplexedConnectionDelegate, Options.Limits.MaxConcurrentConnections, Trace);                 options.EndPoint = await _transportManager.BindAsync(options.EndPoint, multiplexedConnectionDelegate, options.EndpointConfig).ConfigureAwait(false);             }               if ((options.Protocols & HttpProtocols.Http1) == HttpProtocols.Http1                 || (options.Protocols & HttpProtocols.Http2) == HttpProtocols.Http2                 || options.Protocols == HttpProtocols.None) // TODO a test fails because it doesn't throw an exception in the right place                                                                 // when there is no HttpProtocols in KestrelServer, can we remove/change the test?             {                if (_transportFactory is null)                 {                     throw new InvalidOperationException($"Cannot start HTTP/1.x or HTTP/2 server if no {nameof(IConnectionListenerFactory)} is registered.");                 }                 options.UseHttpServer(ServiceContext, application, options.Protocols);                 var connectionDelegate = options.Build();                 connectionDelegate = EnforceConnectionLimit(connectionDelegate, Options.Limits.MaxConcurrentConnections, Trace);                 options.EndPoint = await _transportManager.BindAsync(options.EndPoint, connectionDelegate, options.EndpointConfig).ConfigureAwait(false);             }          }           AddressBindContext = new AddressBindContext          {              ServerAddressesFeature = _serverAddresses,              ServerOptions = Options,              Logger = Trace,              CreateBinding = OnBind,          };          await BindAsync(cancellationToken).ConfigureAwait(false);          ...     }       public async Task StopAsync(CancellationToken cancellationToken)     {             ...     }     ...     private async Task BindAsync(CancellationToken cancellationToken)     {              ...          await AddressBinder.BindAsync(Options.ListenOptions, AddressBindContext).ConfigureAwait(false);              ...     }     ... }

我们来整理一下StartAsync方法的流程:

  1. 字节序校验:不支持BigEndian
  2. 请求参数长度校验,最大8kb
  3. 判断服务器是否已经启动过
  4. 启动心跳检测
  5. 实例化AddressBindContext用于BindAsync方法使用
  6. 执行BindAsync方法来绑定地址操作

BindAsync调用了AddressBindContext的OnBind方法。OnBind方法会根据使用的http协议类型创建不同的HttpConnectionMiddleware中间件并加入到connection管道中,用于处理Http请求。

具体规则如下:

  • 当协议是HttpProtocols.Http1/2时,创建HttpConnectionMiddleware中间件
  • 当协议是HttpProtocols.Http3时,创建Http3ConnectionMiddleware中间件

目前常用的是HttpConnectionMiddleware:

IConnectionBuilder UseHttpServer<TContext>(       this IConnectionBuilder builder,       ServiceContext serviceContext,       IHttpApplication<TContext> application,       HttpProtocols protocols)     {       HttpConnectionMiddleware<TContext> middleware = new HttpConnectionMiddleware<TContext>(serviceContext, application, protocols);       return builder.Use((Func<ConnectionDelegate, ConnectionDelegate>) (next => new ConnectionDelegate(middleware.OnConnectionAsync)));     }

 

UseHttpServer方法为connection管道(注意不是IApplicationBuilder中的请求管道)添加了一个HttpConnectionmiddleware中间件,当请求到达时,会执行OnConnectionAsync方法来创建HttpConnection对象,然后通过该对象处理http请求:

public Task OnConnectionAsync(ConnectionContext connectionContext) {      IMemoryPoolFeature memoryPoolFeature = connectionContext.Features.Get<IMemoryPoolFeature>();      HttpConnectionContext context = new HttpConnectionContext();      context.ConnectionId = connectionContext.ConnectionId;      context.ConnectionContext = connectionContext;      HttpProtocolsFeature protocolsFeature = connectionContext.Features.Get<HttpProtocolsFeature>();      context.Protocols = protocolsFeature != null ? protocolsFeature.HttpProtocols : this._endpointDefaultProtocols;      context.ServiceContext = this._serviceContext;      context.ConnectionFeatures = connectionContext.Features;      context.MemoryPool = memoryPoolFeature?.MemoryPool ?? MemoryPool<byte>.Shared;      context.Transport = connectionContext.Transport;      context.LocalEndPoint = connectionContext.LocalEndPoint as IPEndPoint;      context.RemoteEndPoint = connectionContext.RemoteEndPoint as IPEndPoint;      return new HttpConnection(context).ProcessRequestsAsync<TContext>(this._application); }

ProcessRequestsAsync为具体的处理请求的方法,此方法会根据使用的http协议版本来创建Http1Connection还是Http2Connection,然后使用此httpConnection来创建context对象(注意不是HttpContext对象)。

Kestrel服务器对请求的接收是通过OnBind里面的TransportManager.BindAsync来实现的。

public async Task<EndPoint> BindAsync(       EndPoint endPoint,       ConnectionDelegate connectionDelegate,       EndpointConfig? endpointConfig) {      if (this._transportFactory == null)        throw new InvalidOperationException("Cannot bind with ConnectionDelegate no IConnectionListenerFactory is registered.");      IConnectionListener connectionListener = await this._transportFactory.BindAsync(endPoint).ConfigureAwait(false);      this.StartAcceptLoop<ConnectionContext>((IConnectionListener<ConnectionContext>) new TransportManager.GenericConnectionListener(connectionListener), (Func<ConnectionContext, Task>) (c => connectionDelegate(c)), endpointConfig);      return connectionListener.EndPoint;
}

 

其中StartAcceptLoop方法为实际接收数据的方法,通过方法名“开始循环接收”,我们猜测,是不是Kestrel服务器是通过对Socket的Accept方法进行循环监听来接收数据的?那么到底是不是呢?让我们来继续跟踪一下connectionDispatcher.StartAcceptingConnections方法:

public Task StartAcceptingConnections(IConnectionListener<T> listener) {      ThreadPool.UnsafeQueueUserWorkItem<IConnectionListener<T>>(new Action<IConnectionListener<T>>(this.StartAcceptingConnectionsCore), listener, false);      return this._acceptLoopTcs.Task; } private void StartAcceptingConnectionsCore(IConnectionListener<T> listener) {      AcceptConnectionsAsync();        async Task AcceptConnectionsAsync()      {        try        {          while (true)          {            T connectionContext = await listener.AcceptAsync(new CancellationToken());            if ((object) connectionContext != null)            {              long id = Interlocked.Increment(ref ConnectionDispatcher<T>._lastConnectionId);              KestrelConnection<T> kestrelConnection = new KestrelConnection<T>(id, this._serviceContext, this._transportConnectionManager, this._connectionDelegate, connectionContext, this.Log);              this._transportConnectionManager.AddConnection(id, (KestrelConnection) kestrelConnection);              this.Log.ConnectionAccepted(connectionContext.ConnectionId);              KestrelEventSource.Log.ConnectionQueuedStart((BaseConnectionContext) connectionContext);              ThreadPool.UnsafeQueueUserWorkItem((IThreadPoolWorkItem) kestrelConnection, false);            }            else              break;          }        }        catch (Exception ex)        {          this.Log.LogCritical((EventId) 0, ex, "The connection listener failed to accept any new connections.");        }        finally        {          this._acceptLoopTcs.TrySetResult();        }   } }

相信现在大家已经了解是怎么回事了吧?原来Kestrel服务器是通过while(true)循环接收的方式接收用户请求数据,然后通过线程池的ThreadPool.UnsafeQueueUserWorkItem方法将请求分发到CLR线程池来处理的。换句话说,在请求到来时,TransportManager将OnConnectionAsync方法加入线程池并待CLR线程池调度。

那么回到开始的时候,Kestrel服务器是如何启动的呢?

让我们再回顾一下Program.cs中的方法

public static void Main(string[] args) {    CreateHostBuilder(args).Build().Run(); }

相信聪明的同学已经猜到了,是通过Run()方法来执行的,Run()方法做了些什么呢?

Run方法实际上是执行了Host类中的StartAsync方法,此方法通过获取预先注入的GenericeWebHostService类中注入的IServer类来最终调用到IServer实现类的StartAsnyc方法的。

internal class GenericWebHostService : IHostedService {   ...   public IServer Server { get; }   ...     public async Task StartAsync(CancellationToken cancellationToken)     {      ...      var httpApplication = new HostingApplication(application, Logger, DiagnosticListener, HttpContextFactory);      await Server.StartAsync(httpApplication, cancellationToken);      ...     } }

至此,Kestrel成功启动并开始监听用户请求。

一句话总结:其实ASP.NET Core 5中的Kestrel服务器只是对Socket的简单封装,简单到直接用socket通过while(true)的方式来循环接收socket请求,并直接放入clr线程池中来等待线程池调度处理。

原来,Kestrel服务器这么简单~

相信通过本文的介绍,大家已经对ASP.NET Core 5中的Kestrel服务器有了解了吧?