本地事件总线和事务

  • 本地事件总线和事务已关闭评论
  • 76 次浏览
  • A+
所属分类:.NET技术
摘要

通过重写Ef Core的SaveChanges/SaveChangesAsync来实现事务。当然,如果您愿意实现仓储层,可以在仓储层实现展开对应实体包含的事件,并且调整事件的处理顺序。


本地事件总线和事务

通过重写Ef CoreSaveChanges/SaveChangesAsync来实现事务。当然,如果您愿意实现仓储层,可以在仓储层实现展开对应实体包含的事件,并且调整事件的处理顺序。

Github仓库地址:soda-event-bus

实现AggregateRoot

AggregateRoot类主要通过一个集合来记录本次事务的所有事件,到保存前再展开读取,在Abp中采用的ICollection记录的本地事件,通过实现一个排序器来保证顺序问题,我这里直接采用了ConcurrentQueue,保证原子操作的同时保证了顺序性,实现更简单一些。

public abstract class AggregateRoot {     public ConcurrentQueue<object> LocalEvents { get; } = new();      public void AddLocalEvent<TEvent>(TEvent eventData) where TEvent : IEvent     {         LocalEvents.Enqueue(eventData);     }      public bool GetLocalEvent(out object? @event)     {         LocalEvents.TryDequeue(out var eventData);          @event = eventData;         return @event is not null;     }      public void ClearLocalEvents()     {         LocalEvents.Clear();     } } 

重写DbContext

主要是从ServiceProvider中获取对应实体类包含的事件,并且找到对应的Handler进行处理,然后再当作一个事务提交。

public class EventBusDbContext<TDbContext> : DbContext     where TDbContext : DbContext {     private readonly IServiceProvider _serviceProvider;      public EventBusDbContext(DbContextOptions<TDbContext> options, IServiceProvider serviceProvider) : base(options)     {         _serviceProvider = serviceProvider;     }      public override int SaveChanges()     {         return base.SaveChanges();     }      public override int SaveChanges(bool acceptAllChangesOnSuccess)     {         return base.SaveChanges(acceptAllChangesOnSuccess);     }      public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)     {         await HandleEventsAsync();          return await base.SaveChangesAsync(cancellationToken);     }      public override async Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess, CancellationToken cancellationToken = default)     {         await HandleEventsAsync();          return await base.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);     }      private async Task HandleEventsAsync()     {         foreach (var entityEntry in ChangeTracker.Entries<AggregateRoot>())         {             while (entityEntry.Entity.GetLocalEvent(out var @event))             {                 if (@event is null) break;                  await HandleEventAsync(@event);             }              entityEntry.Entity.ClearLocalEvents();         }     }      private async Task HandleEventAsync(object @event)     {         var eventHandlerType = typeof(IAsyncEventHandler<>).MakeGenericType(@event.GetType());         var eventHandler = _serviceProvider.GetRequiredService(eventHandlerType);          var method = eventHandler.GetType().GetMethod(nameof(IAsyncEventHandler<IEvent>.HandleAsync));         var exceptionHandleMethod = eventHandlerType.GetMethod(nameof(IAsyncEventHandler<IEvent>.HandleException));          try         {             await (Task)method!.Invoke(eventHandler, new[] { @event })!;         }         catch (Exception ex)         {             exceptionHandleMethod!.Invoke(eventHandler, new[] { @event, ex });         }     } } 

分布式事件总线和事务

根据需要扩展即可,基本逻辑相同,但可能需要增加确认机制等。