Extending ABP's Webhook Feature to Push Data to Third-Party Endpoints (WeCom Groups, DingTalk Groups, etc.)

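Abstract

Building on the Webhook feature in ABP/ZERO, a few extensions make it possible to push business data according to each user's subscriptions, including to third-party endpoints (WeCom groups, DingTalk groups, etc.). This makes the business system more flexible and more practical.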


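Introduction

The previous article, "Developing a SaaS Supply Chain Management System Based on ASP.NET ZERO", mentioned that we extended the Webhook feature. This article describes how that was done.

For instructions on using the Webhook feature, see this document: Webhook Data Push

Release timeline of the Webhook feature:

  • ASP.NET Boilerplate (ABP below) released the Webhook feature in v5.2 (2020-02-18). For details, see the official documentation.
  • ASP.NET ZERO (ZERO below) released the Webhook feature in v8.2.0 (2020-02-20).
  • We finished our own rework of the Webhook feature in April 2021. It covers internal endpoints (the user sets the endpoint URL) and third-party endpoints (WeCom internal groups, DingTalk groups, the Jushuitan (聚水潭) API, etc.).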

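1. Webhook definitions

  • To tell internal endpoints from third-party endpoints, third-party webhook names always carry a fixed prefix, such as Third.WX.XXX or Third.DD.XXX.
  • When adding a definition, set its feature dependency (featureDependency). The feature system then shows or hides each definition per tenant.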
    public class AppWebhookDefinitionProvider : WebhookDefinitionProvider
    {
        public override void SetWebhooks(IWebhookDefinitionContext context)
        {
            // Material master data: visible to all tenants
            context.Manager.Add(new WebhookDefinition(name: AppWebHookNames.T11071001_Created));
            context.Manager.Add(new WebhookDefinition(name: AppWebHookNames.T11071001_Updated));
            context.Manager.Add(new WebhookDefinition(name: AppWebHookNames.T11071001_Deleted));

            // Production orders: visible only to tenants with the production-management feature
            var featureC = new SimpleFeatureDependency("SCM.C");
            context.Manager.Add(new WebhookDefinition(name: AppWebHookNames.T13041001_Created, featureDependency: featureC));
            context.Manager.Add(new WebhookDefinition(name: AppWebHookNames.T13041001_Updated, featureDependency: featureC));
            context.Manager.Add(new WebhookDefinition(name: AppWebHookNames.T13041001_Deleted, featureDependency: featureC));
            context.Manager.Add(new WebhookDefinition(name: AppWebHookNames.T13041001_MRP_Data, featureDependency: featureC));

            //...
        }
    }
  • Register the definition provider in the CoreModule and set the Webhook options:
    public class SCMCoreModule : AbpModule
    {
        public override void PreInitialize()
        {
            Configuration.Webhooks.Providers.Add<AppWebhookDefinitionProvider>();
            Configuration.Webhooks.TimeoutDuration = TimeSpan.FromMinutes(1);
            Configuration.Webhooks.IsAutomaticSubscriptionDeactivationEnabled = true;
            Configuration.Webhooks.MaxSendAttemptCount = 3;
            Configuration.Webhooks.MaxConsecutiveFailCountBeforeDeactivateSubscription = 10;

            //...
        }

        //...
    }
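
The prefix convention described above can be sketched as a small constants class with a helper that classifies a webhook name. This is only an illustration: the article does not show the real values in AppWebHookNames, so the constant values, the class name AppWebHookNamesSketch, and the helpers IsThirdParty and GetChannel are all assumptions.

```csharp
using System;

// Hypothetical constants: the real AppWebHookNames values are not shown in the article.
public static class AppWebHookNamesSketch
{
    public const string ThirdPartyPrefix = "Third.";

    public const string T11071001_Created = "T11071001.Created";
    public const string WX_T11071001_Created = "Third.WX.T11071001.Created";
    public const string DD_T11071001_Created = "Third.DD.T11071001.Created";

    // True when the name carries the third-party prefix.
    public static bool IsThirdParty(string webhookName) =>
        webhookName != null && webhookName.StartsWith(ThirdPartyPrefix, StringComparison.Ordinal);

    // Returns "WX", "DD", ... for a third-party name, or an empty string for an internal one.
    public static string GetChannel(string webhookName)
    {
        if (!IsThirdParty(webhookName)) { return ""; }
        var parts = webhookName.Split('.');
        return parts.Length > 1 ? parts[1] : "";
    }
}
```

With a fixed prefix, code that needs to treat third-party endpoints differently can decide from the name alone, without an extra lookup.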

2. Webhook subscriptions

  • In the front end, the user creates a Webhook subscription record (WebhookUri, Webhooks, Headers, etc.), which is then sent to the back-end API.
  • The back-end API saves the WebhookSubscription through WebhookSubscriptionManager:
    [AbpAuthorize(AppPermissions.Pages_Administration_WebhookSubscription)]
    public class WebhookSubscriptionAppService : SCMAppServiceBase, IWebhookSubscriptionAppService
    {
        //...

        [AbpAuthorize(AppPermissions.Pages_Administration_WebhookSubscription_Create)]
        public async Task AddSubscription(WebhookSubscription subscription)
        {
            subscription.TenantId = AbpSession.TenantId;

            await _webHookSubscriptionManager.AddOrUpdateSubscriptionAsync(subscription);
        }

        //...
    }
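
Before a subscription reaches the manager, it may be worth checking the fields the user typed in. The following is a minimal sketch under stated assumptions: SubscriptionInputSketch is a hypothetical stand-in that only mirrors the field names mentioned above (it is not ABP's WebhookSubscription class), and the validation rules are illustrative rather than anything the article prescribes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical input DTO mirroring the fields named above; not ABP's WebhookSubscription class.
public class SubscriptionInputSketch
{
    public string WebhookUri { get; set; } = "";
    public List<string> Webhooks { get; set; } = new List<string>();
    public Dictionary<string, string> Headers { get; set; } = new Dictionary<string, string>();
}

public static class SubscriptionValidatorSketch
{
    // Returns a list of problems; an empty list means the input looks usable.
    public static List<string> Validate(SubscriptionInputSketch input)
    {
        var errors = new List<string>();

        // Require an absolute URL and restrict the scheme to http or https.
        if (!Uri.TryCreate(input.WebhookUri, UriKind.Absolute, out var uri) ||
            (uri.Scheme != Uri.UriSchemeHttp && uri.Scheme != Uri.UriSchemeHttps))
        {
            errors.Add("WebhookUri must be an absolute http(s) URL.");
        }

        // A subscription without webhook names would never receive anything.
        if (input.Webhooks.Count == 0 || input.Webhooks.Any(string.IsNullOrWhiteSpace))
        {
            errors.Add("At least one non-empty webhook name is required.");
        }

        return errors;
    }
}
```

A caller could reject the request when the returned list is not empty and only then pass the data on to the subscription manager.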

3. Webhook publishing (data push)

Listen for entity events (CreatedEvent, UpdatedEvent, DeletedEvent) and push the data according to the Webhook subscriptions created by the tenant's users:

    public class T11071001Syncronizer :
        IEventHandler<EntityCreatedEventData<T11071001>>,
        IEventHandler<EntityUpdatedEventData<T11071001>>,
        IEventHandler<EntityDeletedEventData<T11071001>>,
        ITransientDependency
    {
        private readonly IAppWebhookPublisher _appWebhookPublisher;

        public T11071001Syncronizer(IAppWebhookPublisher appWebhookPublisher)
        {
            _appWebhookPublisher = appWebhookPublisher;
        }

        public void HandleEvent(EntityCreatedEventData<T11071001> eventData)
        {
            DoWebhook("N", eventData.Entity);
        }

        public void HandleEvent(EntityUpdatedEventData<T11071001> eventData)
        {
            DoWebhook("U", eventData.Entity);
        }

        public void HandleEvent(EntityDeletedEventData<T11071001> eventData)
        {
            int? tenantId = eventData.Entity.TenantId;

            string whName = AppWebHookNames.T11071001_Deleted;
            var subscriptions = _appWebhookPublisher.GetSubscriptions(tenantId, whName);

            if (subscriptions == null) { return; }

            _appWebhookPublisher.PublishWebhookUOW(whName, eventData.Entity, tenantId, subscriptions);
        }
    }
  • DoWebhook() method: pushes the matching content for each kind of subscription (internal endpoint, third-party endpoint, etc.):
        private void DoWebhook(string nu, T11071001 entity)
        {
            int? tenantId = entity.TenantId;
            var whCache = _appWebhookPublisher.GetWebhookCache(tenantId);
            if (whCache.Count == 0) { return; }

            string whName = nu == "N" ? AppWebHookNames.T11071001_Created : AppWebHookNames.T11071001_Updated;
            string whNameWX = AppWebHookNames.WX_T11071001_Created;
            string whNameDD = AppWebHookNames.DD_T11071001_Created;

            bool isWH = whCache.Names.ContainsKey(whName);
            bool isWX = whCache.Names.ContainsKey(whNameWX);
            bool isDD = whCache.Names.ContainsKey(whNameDD);

            if (!(isWH || isWX || isDD)) { return; }

            var data = ObjectMapper.Map<T11071001WebhookDto>(entity);

            // Internal endpoint
            if (isWH)
            {
                _appWebhookPublisher.PublishWebhookUOW(whName, data, tenantId, whCache.Names[whName], false);
            }

            // WeCom internal group
            if (isWX)
            {
                var wxData = new WxTCardWebhookDto { template_card = GetWxTCard(data, tenantId, nu) };
                _appWebhookPublisher.PublishWebhookUOW(whNameWX, wxData, tenantId, whCache.Names[whNameWX], true);
            }

            // DingTalk internal group
            if (isDD)
            {
                var title = GetNUTitle(nu, L(T));
                var mdText = GetNewMarkdown(data, title);
                var ddData = new DdMarkdownWebhookDto { markdown = new DdMarkdownContentDto { title = title, text = mdText } };
                _appWebhookPublisher.PublishWebhookUOW(whNameDD, ddData, tenantId, whCache.Names[whNameDD], true);
            }
        }
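
The third-party branches pass true as the last argument (sendExactSameData), so the DTO is presumably serialized and sent as-is, without ABP's usual payload wrapper. A group robot expects its own JSON shape, which is why the DTO property names are lower-case. The sketch below shows what a DingTalk markdown message could look like; the class names ending in Sketch are hypothetical, and the msgtype field is an assumption based on DingTalk's custom-robot message format, since the article does not show the full DdMarkdownWebhookDto.

```csharp
using System;
using System.Text.Json;

// Property names are lower-case on purpose so they serialize to the JSON keys the robot expects.
public class DdMarkdownContentSketch
{
    public string title { get; set; } = "";
    public string text { get; set; } = "";
}

// Hypothetical equivalent of the article's DdMarkdownWebhookDto; "msgtype" is an assumption.
public class DdMarkdownWebhookSketch
{
    public string msgtype { get; set; } = "markdown";
    public DdMarkdownContentSketch markdown { get; set; } = new DdMarkdownContentSketch();
}

public static class DdPayloadDemo
{
    // Builds the JSON body for a markdown message.
    public static string Build(string title, string text)
    {
        var payload = new DdMarkdownWebhookSketch
        {
            markdown = new DdMarkdownContentSketch { title = title, text = text }
        };
        return JsonSerializer.Serialize(payload);
    }
}
```

Calling DdPayloadDemo.Build("New material", "Code: M-001") yields a JSON object with a msgtype of "markdown" and a nested markdown object holding the title and text.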
  • GetWebhookCache() method: caches the Webhook subscription data per tenant:
        public SCMWebhookCacheItem GetWebhookCache(int? tenantId)
        {
            return SetAndGetCache(tenantId);
        }

        private SCMWebhookCacheItem SetAndGetCache(int? tenantId, string keyName = "SubscriptionCount")
        {
            int tid = tenantId ?? 0;
            var cacheKey = $"{keyName}-{tid}";

            return _cacheManager.GetSCMWebhookCache().Get(cacheKey, () =>
            {
                int count = 0;
                var names = new Dictionary<string, List<WebhookSubscription>>();

                UnitOfWorkManager.WithUnitOfWork(() =>
                {
                    using (UnitOfWorkManager.Current.SetTenantId(tenantId))
                    {
                        if (_featureChecker.IsEnabled(tid, "SCM.H")) // Feature check
                        {
                            var items = _webhookSubscriptionRepository.GetAllList(e => e.TenantId == tenantId && e.IsActive == true);
                            count = items.Count;

                            foreach (var item in items)
                            {
                                if (string.IsNullOrWhiteSpace(item.Webhooks)) { continue; }
                                var whNames = JsonHelper.DeserializeObject<string[]>(item.Webhooks);
                                if (whNames == null) { continue; }

                                foreach (string whName in whNames)
                                {
                                    if (names.ContainsKey(whName))
                                    {
                                        names[whName].Add(item.ToWebhookSubscription());
                                    }
                                    else
                                    {
                                        names.Add(whName, new List<WebhookSubscription> { item.ToWebhookSubscription() });
                                    }
                                }
                            }
                        }
                    }
                });

                return new SCMWebhookCacheItem(count, names);
            });
        }
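
The core of the cache factory is an index from webhook name to the subscriptions that listen for it. Stripped of ABP's cache, unit of work, and feature check, the grouping step can be sketched as follows. SubscriptionRowSketch is a hypothetical stand-in for the stored subscription row, and System.Text.Json replaces the article's JsonHelper.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical stand-in for a stored subscription row: Webhooks holds a JSON array of names.
public class SubscriptionRowSketch
{
    public Guid Id { get; set; }
    public string WebhookUri { get; set; } = "";
    public string Webhooks { get; set; } = "";
}

public static class SubscriptionIndexSketch
{
    // Builds a lookup from webhook name to every subscription that lists that name.
    public static Dictionary<string, List<SubscriptionRowSketch>> GroupByWebhookName(
        IEnumerable<SubscriptionRowSketch> rows)
    {
        var index = new Dictionary<string, List<SubscriptionRowSketch>>();

        foreach (var row in rows)
        {
            if (string.IsNullOrWhiteSpace(row.Webhooks)) { continue; }

            var names = JsonSerializer.Deserialize<string[]>(row.Webhooks);
            if (names == null) { continue; }

            foreach (var name in names)
            {
                // One dictionary lookup per name instead of ContainsKey followed by the indexer.
                if (!index.TryGetValue(name, out var list))
                {
                    list = new List<SubscriptionRowSketch>();
                    index[name] = list;
                }
                list.Add(row);
            }
        }

        return index;
    }
}
```

With such an index in the cache, an event handler can check whether anyone subscribed to a webhook name with a dictionary lookup and skip the database entirely when nobody did.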
  • PublishWebhookUOW() method: replaces the default WebHookPublisher implementation in ABP and pushes the data to the subscriptions passed in, through WebhookSenderJob:
        public void PublishWebhookUOW(string webHookName, object data, int? tenantId, List<WebhookSubscription> webhookSubscriptions = null, bool sendExactSameData = false)
        {
            UnitOfWorkManager.WithUnitOfWork(() =>
            {
                using (UnitOfWorkManager.Current.SetTenantId(tenantId))
                {
                    Publish(webHookName, data, tenantId, webhookSubscriptions, sendExactSameData);
                }
            });
        }

        private void Publish(string webhookName, object data, int? tenantId, List<WebhookSubscription> webhookSubscriptions, bool sendExactSameData = false)
        {
            if (string.IsNullOrWhiteSpace(webhookName)) { return; }

            // If no subscriptions were passed in, look them up by webhookName.
            // Webhooks is stored as a JSON array, so match the quoted name.
            webhookSubscriptions ??= _webhookSubscriptionRepository.GetAllList(subscriptionInfo =>
                    subscriptionInfo.TenantId == tenantId &&
                    subscriptionInfo.IsActive &&
                    subscriptionInfo.Webhooks.Contains("\"" + webhookName + "\"")
                ).Select(subscriptionInfo => subscriptionInfo.ToWebhookSubscription()).ToList();

            if (webhookSubscriptions.IsNullOrEmpty()) { return; }

            var webhookInfo = SaveAndGetWebhookEvent(tenantId, webhookName, data);

            foreach (var webhookSubscription in webhookSubscriptions)
            {
                var jobArgs = new WebhookSenderArgs
                {
                    TenantId = webhookSubscription.TenantId,
                    WebhookEventId = webhookInfo.Id,
                    Data = webhookInfo.Data,
                    WebhookName = webhookInfo.WebhookName,
                    WebhookSubscriptionId = webhookSubscription.Id,
                    Headers = webhookSubscription.Headers,
                    Secret = webhookSubscription.Secret,
                    WebhookUri = webhookSubscription.WebhookUri,
                    SendExactSameData = sendExactSameData
                };

                // Enqueue the job on a named queue so that the server that raised the event runs it
                IBackgroundJobClient hangFireClient = new BackgroundJobClient();
                hangFireClient.Create<WebhookSenderJob>(x => x.ExecuteAsync(jobArgs), new EnqueuedState(AppVersionHelper.MachineName));
            }
        }
  • WebhookSenderJob: override WebhookManager's SignWebhookRequest method so that no signature header is added for third-party endpoints:
        public override void SignWebhookRequest(HttpRequestMessage request, string serializedBody, string secret)
        {
            if (request == null)
            {
                throw new ArgumentNullException(nameof(request));
            }

            // Third-party endpoint: do not add the signature header
            if (IsThirdAPI(request))
            {
                return;
            }

            if (string.IsNullOrWhiteSpace(serializedBody))
            {
                throw new ArgumentNullException(nameof(serializedBody));
            }

            var secretBytes = Encoding.UTF8.GetBytes(secret);

            using (var hasher = new HMACSHA256(secretBytes))
            {
                request.Content = new StringContent(serializedBody, Encoding.UTF8, "application/json");

                var data = Encoding.UTF8.GetBytes(serializedBody);
                var sha256 = hasher.ComputeHash(data);

                var headerValue = string.Format(CultureInfo.InvariantCulture, SignatureHeaderValueTemplate, BitConverter.ToString(sha256));

                request.Headers.Add(SignatureHeaderName, headerValue);
            }
        }
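
For internal endpoints the signature header is still sent, so the receiving side can verify it by repeating the same computation. The sketch below shows one way a receiver might do that. The value template "sha256={0}" is an assumption (the article only references SignatureHeaderValueTemplate without showing it), and the class and method names are hypothetical.

```csharp
using System;
using System.Globalization;
using System.Security.Cryptography;
using System.Text;

public static class WebhookSignatureSketch
{
    // Mirrors the sender: HMAC-SHA256 over the UTF-8 body, rendered with BitConverter.ToString.
    // The "sha256={0}" template is an assumption about SignatureHeaderValueTemplate.
    public static string ComputeSignature(string body, string secret)
    {
        using var hasher = new HMACSHA256(Encoding.UTF8.GetBytes(secret));
        var hash = hasher.ComputeHash(Encoding.UTF8.GetBytes(body));
        return string.Format(CultureInfo.InvariantCulture, "sha256={0}", BitConverter.ToString(hash));
    }

    // Compares the received header value with the one computed from the raw request body.
    public static bool IsValid(string body, string secret, string receivedHeaderValue)
    {
        var expected = Encoding.UTF8.GetBytes(ComputeSignature(body, secret));
        var actual = Encoding.UTF8.GetBytes(receivedHeaderValue ?? "");

        // Constant-time comparison avoids leaking how many leading bytes matched.
        return CryptographicOperations.FixedTimeEquals(expected, actual);
    }
}
```

The receiver has to hash the raw request body exactly as received. Re-serializing the parsed JSON can change whitespace or key order and would make the signature check fail.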
  • WebhookSenderJob: override WebhookSender's CreateWebhookRequestMessage method to build special requests for third-party endpoints:
        protected override HttpRequestMessage CreateWebhookRequestMessage(WebhookSenderArgs webhookSenderArgs)
        {
            return webhookSenderArgs.WebhookName switch
            {
                AppWebHookNames.JST_supplier_upload => JSTHttpRequestMessage(webhookSenderArgs), // Jushuitan: supplier upload
                //...
                _ => new HttpRequestMessage(HttpMethod.Post, webhookSenderArgs.WebhookUri)
            };
        }
  • WebhookSenderJob: override WebhookSender's AddAdditionalHeaders method so that no extra headers are added for third-party endpoints:
        protected override void AddAdditionalHeaders(HttpRequestMessage request, WebhookSenderArgs webhookSenderArgs)
        {
            // Third-party endpoint: do not add headers
            if (IsThirdAPI(request))
            {
                return;
            }

            foreach (var header in webhookSenderArgs.Headers)
            {
                if (request.Headers.TryAddWithoutValidation(header.Key, header.Value))
                {
                    continue;
                }

                if (request.Content.Headers.TryAddWithoutValidation(header.Key, header.Value))
                {
                    continue;
                }

                throw new Exception($"Invalid Header. SubscriptionId:{webhookSenderArgs.WebhookSubscriptionId},Header: {header.Key}:{header.Value}");
            }
        }
  • WebhookSenderJob: override WebhookSender's SendHttpRequest method to process the data returned by third-party endpoints:
        protected override async Task<(bool isSucceed, HttpStatusCode statusCode, string content)> SendHttpRequest(HttpRequestMessage request)
        {
            using var client = _httpClientFactory.CreateClient(); // Avoid creating clients with new HttpClient()
            client.Timeout = _webhooksConfiguration.TimeoutDuration;

            var response = await client.SendAsync(request);

            var isSucceed = response.IsSuccessStatusCode;
            var statusCode = response.StatusCode;
            var content = await response.Content.ReadAsStringAsync();

            // Third-party endpoint: the returned data needs to be processed
            if (IsThirdAPI(request))
            {
                string method = TryGetHeader(request.Headers, "ThirdAPI1");
                int tenantId = Convert.ToInt32(TryGetHeader(request.Headers, "ThirdAPI2"));
                switch (method)
                {
                    case AppWebHookNames.JST_supplier_upload: await JSTSupplierUploadResponse(method, content, tenantId); break;
                    //...
                    default: break;
                }
            }

            return (isSucceed, statusCode, content);
        }
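
One reason to inspect the returned content is that a third-party endpoint can answer with HTTP 200 while still reporting a business-level failure in the body. Group robots such as those of DingTalk and WeCom typically reply with a JSON object containing errcode and errmsg, where errcode 0 means success. The helper below is a sketch of that check. It is not part of the article's code, the name ThirdPartyResponseSketch is hypothetical, and the response shape should be confirmed against each provider's documentation.

```csharp
using System;
using System.Text.Json;

public static class ThirdPartyResponseSketch
{
    // Returns true only when the body is a JSON object whose "errcode" is the number 0.
    public static bool IsBusinessSuccess(string content)
    {
        if (string.IsNullOrWhiteSpace(content)) { return false; }

        try
        {
            using var doc = JsonDocument.Parse(content);
            return doc.RootElement.ValueKind == JsonValueKind.Object
                && doc.RootElement.TryGetProperty("errcode", out var code)
                && code.ValueKind == JsonValueKind.Number
                && code.TryGetInt32(out var value)
                && value == 0;
        }
        catch (JsonException)
        {
            // Not JSON at all: treat it as a failure.
            return false;
        }
    }
}
```

A sender could combine this with the HTTP status, for example by reporting success only when both the status code and the errcode indicate success, so that ABP's retry and deactivation counters also react to business-level failures.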

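Summary

Building on the Webhook feature in ABP/ZERO, a few extensions make it possible to push business data according to each user's subscriptions, including to third-party endpoints (WeCom groups, DingTalk groups, etc.). This makes the business system more flexible and more practical.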