MassTransit 入门(一)

  • MassTransit 入门(一)已关闭评论
  • 317 次浏览
  • A+
所属分类:.NET技术
摘要

本文地址源码

本文地址源码

  • MassTransit是一个面向.net的免费开源分布式应用程序框架。
  • MassTransit使得创建应用程序和服务变得很容易,这些应用程序和服务利用基于消息的、松散耦合的异步通信来获得更高的可用性、可靠性和可伸缩性。
  • MassTransit 8.x版本。

实现简单发布订阅

  • 添加Nuget包引用:
    • MassTransit
    • MassTransit.RabbitMQ(演示也可基于内存)

生产端

  • 配置MassTransit
builder.Services.AddMassTransit(x => {         // 使用内存     //x.UsingInMemory();     // 使用RabbitMq     x.UsingRabbitMq((context, config) =>     {         config.Host("rabbitmq://localhost:5672", host =>         {             host.Username("admin");             host.Password("admin");         });     }); }); 
  • 定义消息体
public class OrderEto {     public Guid Id { get; init; }      public string Name { get; set; }          public DateTime CreationTime { get; set; } } 
  • 发布消息
[ApiController] [Route("[controller]")] public class PublishController : ControllerBase {     private readonly ILogger<PublishController> _logger;     private readonly IPublishEndpoint _publishEndpoint;      public PublishController(ILogger<PublishController> logger, IPublishEndpoint publishEndpoint)     {         _logger = logger;         _publishEndpoint = publishEndpoint;     }      [HttpGet]     public async Task Get()     {         await _publishEndpoint.Publish<OrderEto>(new OrderEto()         {             Id = Guid.NewGuid(),             Name = "Phone",             CreationTime = DateTime.Now         });     } } 

消费者端

builder.Services.AddMassTransit(x => {          // 通过扫描程序集注册消费者     x.AddConsumers(typeof(Program).Assembly);         // 通过类型单个注册消费者     // x.AddConsumer<OrderEtoConsumer>(typeof(OrderEtoConsumerDefinition));          // x.SetKebabCaseEndpointNameFormatter();          // 通过泛型单个注册消费者     //x.AddConsumer<OrderEtoConsumer, OrderEtoConsumerDefinition>();          // 通过指定命名空间注册消费者     // x.AddConsumersFromNamespaceContaining<OrderEtoConsumer>();          // 使用内存队列     // x.UsingInMemory();     x.UsingRabbitMq((context, config) =>     {                config.Host("rabbitmq://localhost:5672", hostconfig =>         {             hostconfig.Username("admin");             hostconfig.Password("admin");         });                  config.ConfigureEndpoints(context);             }); }); 
  • 消费者定义
public class OrderEtoConsumer : IConsumer<OrderEto> {     private readonly ILogger<OrderEtoConsumer> _logger;      public OrderEtoConsumer(ILogger<OrderEtoConsumer> logger)     {         _logger = logger;     }      public Task Consume(ConsumeContext<OrderEto> context)     {         _logger.LogInformation($"MassTransit.Consumer.One 收到消息:{JsonSerializer.Serialize(context.Message)}");         return Task.CompletedTask;     } }  public class OrderEtoConsumerDefinition : ConsumerDefinition<OrderEtoConsumer> {     protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<OrderEtoConsumer> consumerConfigurator)     {         endpointConfigurator.UseMessageRetry(r => r.Intervals(500, 1000));     } } 

MassTransit 入门(一)

Abp Vnext Vue实现