在Biwen.QuickApi中整合一个极简的发布订阅(事件总线)

  • 在Biwen.QuickApi中整合一个极简的发布订阅(事件总线)已关闭评论
  • 62 次浏览
  • A+
所属分类:.NET技术
摘要

闲来无聊在我的Biwen.QuickApi中实现一下极简的事件总线,其实代码还是蛮简单的,对于初学者可能有些帮助 就贴出来,有什么不足的地方也欢迎板砖交流~

闲来无聊在我的Biwen.QuickApi中实现一下极简的事件总线,其实代码还是蛮简单的,对于初学者可能有些帮助 就贴出来,有什么不足的地方也欢迎板砖交流~

首先定义一个事件约定的空接口

    public interface IEvent{} 

然后定义事件订阅者接口

public interface IEventSubscriber<T> where T : IEvent     {         Task HandleAsync(T @event, CancellationToken ct);         /// <summary>         /// 执行排序         /// </summary>         int Order { get; }          /// <summary>         /// 如果发生错误是否抛出异常,将阻塞后续Handler         /// </summary>         bool ThrowIfError { get; }     }     public abstract class EventSubscriber<T> : IEventSubscriber<T> where T : IEvent     {         public abstract Task HandleAsync(T @event, CancellationToken ct);         public virtual int Order => 0;         /// <summary>         /// 默认不抛出异常         /// </summary>         public virtual bool ThrowIfError => false;     } 

接着就是发布者

 internal class Publisher(IServiceProvider serviceProvider) { 	public async Task PublishAsync<T>(T @event, CancellationToken ct) where T : IEvent 	{ 		var handlers = serviceProvider.GetServices<IEventSubscriber<T>>(); 		if (handlers is null) return; 		foreach (var handler in handlers.OrderBy(x => x.Order)) 		{ 			try 			{ 				await handler.HandleAsync(@event, ct); 			} 			catch 			{ 				if (handler.ThrowIfError) 				{ 					throw; 				} 				//todo: 			} 		} 	} } 

到此发布订阅的基本代码也就写完了.接下来就是注册发布者和所有的订阅者了

核心代码如下:

        static readonly Type InterfaceEventSubscriber = typeof(IEventSubscriber<>);         static readonly object _lock = new();//锁         static bool IsToGenericInterface(this Type type, Type baseInterface)         {             if (type == null) return false;             if (baseInterface == null) return false;             return type.GetInterfaces().Any(x => x.IsGenericType && x.GetGenericTypeDefinition() == baseInterface);         }         static IEnumerable<Type> _eventHanlers = null!;         static IEnumerable<Type> EventHandlers         {             get             {                 lock (_lock)                     return _eventHanlers ??= ASS.InAllRequiredAssemblies.Where(x =>                     !x.IsAbstract && x.IsPublic && x.IsClass && x.IsToGenericInterface(InterfaceEventSubscriber));             }         } 		    //注册EventSubscribers             foreach (var subscriberType in EventSubscribers)             {                 //存在一个订阅者订阅多个事件的情况:                 var baseTypes = subscriberType.GetInterfaces().Where(x => x.IsGenericType && x.GetGenericTypeDefinition() == InterfaceEventSubscriber).ToArray();                 foreach (var baseType in baseTypes)                 {                     services.AddScoped(baseType, subscriberType);                 }             }             //注册Publisher             services.AddScoped<Publisher>(); 		 

至此发布订阅的代码也就完成了!
现在我们将发布订阅封装到QuickApi中使用:

 internal interface IPublisher { 	/// <summary> 	/// Event Publish 	/// </summary> 	/// <typeparam name="T"></typeparam> 	/// <param name="event">Event</param> 	/// <returns></returns> 	Task PublishAsync<T>(T @event, CancellationToken cancellationToken) where T : IEvent; }  

然后BaseQuickApi实现IPublisher接口

 internal interface IQuickApi<Req, Rsp> : IHandlerBuilder, IQuickApiMiddlewareHandler, IAntiforgeryApi, IPublisher {     ValueTask<Rsp> ExecuteAsync(Req request); }  // BaseQuickApi.PublishAsync public virtual async Task PublishAsync<T>(T @event, CancellationToken cancellationToken = default) where T : IEvent {     using var scope = ServiceRegistration.ServiceProvider.CreateScope();     var publisher = scope.ServiceProvider.GetRequiredService<Publisher>();     await publisher.PublishAsync(@event, cancellationToken); } 

至此功能完成,接下来我们测试一下:

 using Biwen.QuickApi.Events; using Microsoft.AspNetCore.Mvc;  namespace Biwen.QuickApi.DemoWeb.Apis {     public class MyEvent : BaseRequest<MyEvent>,IEvent     {         [FromQuery]         public string? Message { get; set; }     }      public class MyEventHandler : EventSubscriber<MyEvent>     {         private readonly ILogger<MyEventHandler> _logger;         public MyEventHandler(ILogger<MyEventHandler> logger)         {             _logger = logger;         }          public override Task HandleAsync(MyEvent @event, CancellationToken ct)         {             _logger.LogInformation($"msg 2 : {@event.Message}");             return Task.CompletedTask;         }     }      /// <summary>     /// 更早执行的Handler     /// </summary>     public class MyEventHandler2 : EventSubscriber<MyEvent>     {         private readonly ILogger<MyEventHandler2> _logger;         public MyEventHandler2(ILogger<MyEventHandler2> logger)         {             _logger = logger;         }          public override Task HandleAsync(MyEvent @event, CancellationToken ct)         {             _logger.LogInformation($"msg 1 : {@event.Message}");             return Task.CompletedTask;         }          public override int Order => -1;      }      /// <summary>     /// 抛出异常的Handler     /// </summary>     public class MyEventHandler3 : EventSubscriber<MyEvent>     {         private readonly ILogger<MyEventHandler3> _logger;         public MyEventHandler3(ILogger<MyEventHandler3> logger)         {             _logger = logger;         }          public override Task HandleAsync(MyEvent @event, CancellationToken ct)         {             throw new Exception("error");         }          public override int Order => -2;          public override bool ThrowIfError => false;      }      [QuickApi("event")]     public class EventApi : BaseQuickApi<MyEvent>     {         public override async ValueTask<IResultResponse> ExecuteAsync(MyEvent request)         {             //publish             await PublishAsync(request);             return IResultResponse.Content("send event");         }     } }  

最后我们运行项目测试一下功能:

curl -X 'GET'    'http://localhost:5101/quick/event?Message=hello%20world'    -H 'accept: */*' 

在Biwen.QuickApi中整合一个极简的发布订阅(事件总线)

源代码我发布到了GitHub,欢迎star! https://github.com/vipwan/Biwen.QuickApi