MQTTnet 2.8 及 3.0.16 的使用

  • MQTTnet 2.8 及 3.0.16 的使用已关闭评论
  • 135 次浏览
  • A+
所属分类:.NET技术
摘要

十年河东,十年河西,莫欺少年穷。学无止境,精益求精。netcore3.1 控制台应用程序,引入 MQTTnet 2.8 版本。

十年河东,十年河西,莫欺少年穷

学无止境,精益求精

netcore3.1控制台应用程序,引入MQTTnet 2.8版本

订阅端:

MQTTnet 2.8 及 3.0.16  的使用MQTTnet 2.8 及 3.0.16  的使用

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using MQTTnet;
using MQTTnet.Server;
using MQTTnet.Client;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using MQTTnet.Protocol;

namespace swapConsole
{
    /// <summary>
    /// MQTTnet 2.8 subscriber sample: connects to a broker, subscribes to
    /// <see cref="topic"/> once connected, prints every received payload and
    /// tries to reconnect whenever the connection is lost.
    /// </summary>
    class Program
    {
        private static MqttClient mqttClient = null;
        private static string topic = "test123ABC";

        // Created once for the whole process. The options request a persistent
        // session (CleanSession = false); generating a new id on every read of
        // Options would make each reconnect look like a brand-new client.
        private static readonly string clientId = Guid.NewGuid().ToString();

        // Pause before a reconnect attempt so an unreachable broker does not
        // cause a tight connect/fail loop.
        private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5);

        /// <summary>
        /// Builds the connection options: persistent session, empty
        /// credentials, the process-wide client id, broker at 127.0.0.1:1883.
        /// </summary>
        private static IMqttClientOptions Options
        {
            get
            {
                MqttClientOptionsBuilder builder = new MqttClientOptionsBuilder();
                builder.WithCleanSession(false);
                // User name / password.
                builder.WithCredentials("", "");
                builder.WithClientId(clientId);
                // The original host "1270.0.0.0" is not a valid IPv4 address;
                // use the loopback address like the publisher sample does.
                builder.WithTcpServer("127.0.0.1", 1883);
                return builder.Build();
            }
        }

        /// <summary>
        /// Creates the client, wires the event handlers, connects, then keeps
        /// the process alive until the user presses Enter.
        /// </summary>
        static async Task Main(string[] args)
        {
            MqttFactory factory = new MqttFactory();
            if (mqttClient == null)
            {
                mqttClient = (MqttClient)factory.CreateMqttClient();
                mqttClient.ApplicationMessageReceived += MqttClient_ApplicationMessageReceived;
                mqttClient.Connected += MqttClient_Connected;
                mqttClient.Disconnected += async (s, e) =>
                {
                    Console.WriteLine("尝试重连!" + Environment.NewLine);
                    await Task.Delay(ReconnectDelay);
                    // ConnectToServer catches its own exceptions, so this
                    // async lambda cannot leak one.
                    await ConnectToServer();
                };
            }

            await ConnectToServer();

            Console.ReadLine();
        }

        /// <summary>
        /// Connects to the MQTT broker. Failures are reported on the console
        /// and never thrown to the caller.
        /// </summary>
        private static async Task ConnectToServer()
        {
            try
            {
                await mqttClient.ConnectAsync(Options);
            }
            catch (Exception ex)
            {
                Console.WriteLine("连接到MQTT服务器失败!" + Environment.NewLine + ex.Message + Environment.NewLine);
            }
        }

        /// <summary>
        /// Raised when the client has connected; subscribes to the topic.
        /// </summary>
        /// <param name="sender">Event source.</param>
        /// <param name="e">Event data (unused).</param>
        private static async void MqttClient_Connected(object sender, EventArgs e)
        {
            Console.WriteLine("已连接到MQTT服务器!" + Environment.NewLine);
            // SubscribeInfoAsync handles its own exceptions, which keeps this
            // async void event handler from crashing the process.
            await SubscribeInfoAsync();
        }

        /// <summary>
        /// Raised for every received application message; prints the payload
        /// decoded as UTF-8.
        /// </summary>
        /// <param name="sender">Event source.</param>
        /// <param name="e">Carries the received message.</param>
        private static void MqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
        {
            // A message may carry no payload; decode it as an empty string
            // instead of letting GetString throw on null.
            byte[] payload = e.ApplicationMessage.Payload ?? Array.Empty<byte>();
            Console.WriteLine($">> {Encoding.UTF8.GetString(payload)}{Environment.NewLine}");
        }

        /// <summary>
        /// Subscribes to <see cref="topic"/>. Kept as a synchronous entry point
        /// for existing callers; the work runs in <see cref="SubscribeInfoAsync"/>,
        /// which reports its own failures.
        /// </summary>
        public static void SubscribeInfo()
        {
            _ = SubscribeInfoAsync();
        }

        /// <summary>
        /// Unsubscribes from <see cref="topic"/>. Kept as a synchronous entry
        /// point for existing callers; the work runs in
        /// <see cref="UnSubscribeInfoAsync"/>, which reports its own failures.
        /// </summary>
        public static void UnSubscribeInfo()
        {
            _ = UnSubscribeInfoAsync();
        }

        /// <summary>
        /// Sends the subscription (QoS 2) and prints the confirmation only
        /// after the call has completed.
        /// </summary>
        private static async Task SubscribeInfoAsync()
        {
            if (string.IsNullOrEmpty(topic))
            {
                Console.WriteLine("订阅主题不能为空!");
                return;
            }

            if (!mqttClient.IsConnected)
            {
                Console.WriteLine("MQTT客户端尚未连接!");
                return;
            }

            try
            {
                await mqttClient.SubscribeAsync(new List<TopicFilter>
                {
                    new TopicFilter(topic, MqttQualityOfServiceLevel.ExactlyOnce)
                });
                Console.WriteLine($"已订阅[{topic}]主题" + Environment.NewLine);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"订阅[{topic}]主题失败!" + Environment.NewLine + ex.Message + Environment.NewLine);
            }
        }

        /// <summary>
        /// Sends the unsubscribe request and prints the confirmation only
        /// after the call has completed.
        /// </summary>
        private static async Task UnSubscribeInfoAsync()
        {
            if (string.IsNullOrEmpty(topic))
            {
                Console.WriteLine("退订主题不能为空!");
                return;
            }

            if (!mqttClient.IsConnected)
            {
                Console.WriteLine("MQTT客户端尚未连接!");
                return;
            }

            try
            {
                await mqttClient.UnsubscribeAsync(topic);
                Console.WriteLine($"已退订[{topic}]主题" + Environment.NewLine);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"退订[{topic}]主题失败!" + Environment.NewLine + ex.Message + Environment.NewLine);
            }
        }
    }
}

View Code

发布端:

MQTTnet 2.8 及 3.0.16  的使用MQTTnet 2.8 及 3.0.16  的使用

using MQTTnet;
using MQTTnet.Client;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace swapPublish
{
    /// <summary>
    /// MQTTnet 2.8 publisher sample: connects to a broker and, each time a
    /// connection is established, publishes ten timestamp messages two
    /// seconds apart. Pressing Enter disconnects and ends the program.
    /// </summary>
    class Program
    {
        private static MqttClient mqttClient = null;
        private static string topic = "test123ABC";

        // Created once for the whole process. The options request a persistent
        // session (CleanSession = false); generating a new id on every read of
        // Options would make each reconnect look like a brand-new client.
        private static readonly string clientId = Guid.NewGuid().ToString();

        // Pause before a reconnect attempt so an unreachable broker does not
        // cause a tight connect/fail loop.
        private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5);

        // Set when the user asks to exit, so the Disconnected handler does not
        // reconnect after the deliberate disconnect and the publish loop stops.
        private static volatile bool isShuttingDown;

        /// <summary>
        /// Builds the connection options: persistent session, empty
        /// credentials, the process-wide client id, broker at 127.0.0.1:1883.
        /// </summary>
        private static IMqttClientOptions Options
        {
            get
            {
                MqttClientOptionsBuilder builder = new MqttClientOptionsBuilder();
                builder.WithCleanSession(false);
                // User name / password.
                builder.WithCredentials("", "");
                builder.WithClientId(clientId);
                builder.WithTcpServer("127.0.0.1", 1883);
                return builder.Build();
            }
        }

        /// <summary>
        /// Creates the client, wires the event handlers and connects. After
        /// the user presses Enter the client is disconnected on purpose, and
        /// only then is the "disconnected" message printed.
        /// </summary>
        static async Task Main(string[] args)
        {
            MqttFactory factory = new MqttFactory();
            if (mqttClient == null)
            {
                mqttClient = (MqttClient)factory.CreateMqttClient();

                mqttClient.Connected += MqttClient_Connected;
                mqttClient.Disconnected += async (s, e) =>
                {
                    if (isShuttingDown)
                    {
                        return;
                    }

                    Console.WriteLine("尝试重连!" + Environment.NewLine);
                    await Task.Delay(ReconnectDelay);
                    // ConnectToServer catches its own exceptions, so this
                    // async lambda cannot leak one.
                    await ConnectToServer();
                };
            }

            await ConnectToServer();

            Console.ReadLine();

            isShuttingDown = true;
            try
            {
                if (mqttClient.IsConnected)
                {
                    await mqttClient.DisconnectAsync();
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message + Environment.NewLine);
            }

            Console.WriteLine("已断开MQTT连接!" + Environment.NewLine);
        }

        /// <summary>
        /// Connects to the MQTT broker. Failures are reported on the console
        /// and never thrown to the caller.
        /// </summary>
        private static async Task ConnectToServer()
        {
            try
            {
                await mqttClient.ConnectAsync(Options);
            }
            catch (Exception ex)
            {
                Console.WriteLine("连接到MQTT服务器失败!" + Environment.NewLine + ex.Message + Environment.NewLine);
            }
        }

        /// <summary>
        /// Raised when the client has connected; publishes ten messages, one
        /// every two seconds, without blocking the thread that raised the event.
        /// </summary>
        /// <param name="sender">Event source.</param>
        /// <param name="e">Event data (unused).</param>
        private static async void MqttClient_Connected(object sender, EventArgs e)
        {
            Console.WriteLine("已连接到MQTT服务器!" + Environment.NewLine);

            // async void: every exception must be handled here, otherwise it
            // would be rethrown where nobody can catch it.
            try
            {
                for (int i = 0; i < 10 && !isShuttingDown; i++)
                {
                    await PublishInfo();
                    await Task.Delay(2000);
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine("发布消息失败!" + Environment.NewLine + ex.Message + Environment.NewLine);
            }
        }

        /// <summary>
        /// Publishes the current local time ("yyyy-MM-dd HH:mm:ss", UTF-8) to
        /// <see cref="topic"/> with QoS 2 and the retain flag cleared.
        /// </summary>
        private static async Task PublishInfo()
        {
            if (string.IsNullOrEmpty(topic))
            {
                Console.WriteLine("发布主题不能为空!");
                return;
            }

            if (!mqttClient.IsConnected)
            {
                Console.WriteLine("MQTT客户端尚未连接!");
                return;
            }

            string inputString = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");
            MqttApplicationMessageBuilder builder = new MqttApplicationMessageBuilder();

            builder.WithPayload(Encoding.UTF8.GetBytes(inputString));
            builder.WithTopic(topic);
            builder.WithRetainFlag(false);
            builder.WithExactlyOnceQoS();
            await mqttClient.PublishAsync(builder.Build());
        }
    }
}

View Code

 如何只允许一个客户端消费同一个消息,暂时未解决!

大家有解决方法,请贴出评论。谢谢

MQTTnet  3.0.16 版本的使用

客户端:

MQTTnet 2.8 及 3.0.16  的使用MQTTnet 2.8 及 3.0.16  的使用

using MQTTnet;
using MQTTnet.Adapter;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace mqttsub
{
    /// <summary>
    /// Entry point of the MQTTnet 3.0.16 client sample.
    /// </summary>
    class Program
    {
        static async Task Main(string[] args)
        {
            MqttClient mqtt = new MqttClient();
            await mqtt.StartAsync();
            Console.ReadKey();
        }
    }

    /// <summary>
    /// Sample subscriber: connects with the settings held in a
    /// <see cref="MqttClientDto"/>, subscribes to its topic filter once
    /// connected, prints received messages and reconnects after a disconnect.
    /// </summary>
    public class MqttClient
    {
        // Pause before a reconnect attempt so an unreachable broker does not
        // cause a tight connect/fail loop.
        private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5);

        private IMqttClient client;

        private IMqttClientOptions options;
        MqttClientDto model = null;

        /// <summary>
        /// Initializes the connection settings. IP, Account and PassWord are
        /// left empty in this sample and must be filled in for a real broker.
        /// </summary>
        public MqttClient()
        {
            model = new MqttClientDto
            {
                Account = "",
                PassWord = "",
                ClientId = Guid.NewGuid().ToString(),
                IP = "",
                Port = 1883,
                // Wildcard filter: "+" stands for exactly one topic level, so
                // this matches test/123/ABC and test/DDDDD/ABC, but not testABC.
                Topic = "test/+/ABC"
            };
        }

        /// <summary>
        /// Creates the client, registers the handlers and connects.
        /// Connection failures are reported on the console, not thrown.
        /// </summary>
        public async Task StartAsync()
        {
            try
            {
                client = new MqttFactory().CreateMqttClient();
                var build = new MqttClientOptionsBuilder()
                    // Use the id stored in the model so the id printed in
                    // ConnectedHandler is the one actually sent to the broker.
                    .WithClientId(model.ClientId)
                    // Login account.
                    .WithCredentials(model.Account, model.PassWord)
                    // Broker address; the port argument is optional in the API,
                    // here the configured port is used instead of a literal.
                    .WithTcpServer(model.IP, model.Port)
                    .WithCleanSession();

                options = build.Build();

                // Message received from the broker.
                // Equivalent lambda form: client.UseApplicationMessageReceivedHandler(args => { ... });
                client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(MessageReceivedHandler);

                // Connected: the subscription is done in ConnectedHandler.
                // Equivalent lambda form: client.UseConnectedHandler(args => { ... });
                client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(ConnectedHandler);

                // Disconnected: the reconnect logic lives in DisconnectedHandler.
                // Equivalent lambda form: client.UseDisconnectedHandler(args => { ... });
                client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(DisconnectedHandler);

                // Publishing from this client would look like:
                //   await client.PublishAsync("your/topic", "your payload");

                // Connect.
                await client.ConnectAsync(options);
            }
            catch (MqttConnectingFailedException)
            {
                Console.WriteLine("身份校验失败");
            }
            catch (Exception ex)
            {
                Console.WriteLine("出现异常");
                Console.WriteLine(ex.Message);
            }
        }

        /// <summary>
        /// Runs after the client lost its connection: waits
        /// <see cref="ReconnectDelay"/> and then tries to connect again.
        /// Returns a Task (not async void) so the library can observe it.
        /// </summary>
        /// <param name="obj">Disconnect details (unused).</param>
        private async Task DisconnectedHandler(MqttClientDisconnectedEventArgs obj)
        {
            Console.WriteLine("本客户端已经断开连接");
            Console.WriteLine();

            await Task.Delay(ReconnectDelay);

            try
            {
                await client.ConnectAsync(options);
            }
            catch (Exception ex)
            {
                Console.WriteLine("重连失败");
                Console.WriteLine(ex.Message);
            }
        }

        /// <summary>
        /// Runs after a successful connect: prints the connection settings and
        /// subscribes to the configured topic filter with QoS 2.
        /// Returns a Task (not async void) so a subscribe failure surfaces to
        /// the library instead of being rethrown where nothing can catch it.
        /// </summary>
        /// <param name="obj">Connect result (unused).</param>
        private async Task ConnectedHandler(MqttClientConnectedEventArgs obj)
        {
            Console.WriteLine("本客户端已连接成功");
            Console.WriteLine($"地址:{model.IP}");
            Console.WriteLine($"端口:{model.Port}");
            Console.WriteLine($"客户端:{model.ClientId}");
            Console.WriteLine($"账号:{model.Account}");
            Console.WriteLine();

            // Subscription style 1 (single topic name):
            //   await client.SubscribeAsync("topic name");

            // Subscription style 2 (list of filters) - the one used here.
            // More filters can be added to the list before subscribing.
            List<MqttTopicFilter> topics = new List<MqttTopicFilter>
            {
                new MqttTopicFilter
                {
                    Topic = model.Topic,
                    QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce
                }
            };
            await client.SubscribeAsync(topics.ToArray());

            // Subscription style 3 (options builder):
            //   var builder = new MqttClientSubscribeOptionsBuilder();
            //   builder.WithTopicFilter("AAA");
            //   await client.SubscribeAsync(builder.Build());
        }

        /// <summary>
        /// Prints the topic and the UTF-8 decoded payload of a received message.
        /// </summary>
        /// <param name="obj">Carries the received message.</param>
        private void MessageReceivedHandler(MqttApplicationMessageReceivedEventArgs obj)
        {
            // A message may carry no payload; decode it as an empty string
            // instead of letting GetString throw on null.
            byte[] payload = obj.ApplicationMessage.Payload ?? Array.Empty<byte>();

            Console.WriteLine("===================================================");
            Console.WriteLine("收到消息:");
            Console.WriteLine($"主题:{obj.ApplicationMessage.Topic}");
            Console.WriteLine($"消息:{Encoding.UTF8.GetString(payload)}");
            Console.WriteLine("+++++++++++++++++++++++++++++++++++++++++++++++++++");
            Console.WriteLine();
        }
    }

    /// <summary>
    /// Connection settings used by the sample.
    /// </summary>
    public class MqttClientDto
    {
        /// <summary>
        /// Broker address.
        /// </summary>
        public string IP { get; set; }

        /// <summary>
        /// Account (user name).
        /// </summary>
        public string Account { get; set; }

        /// <summary>
        /// Password.
        /// </summary>
        public string PassWord { get; set; }

        /// <summary>
        /// Client id.
        /// </summary>
        public string ClientId { get; set; }

        /// <summary>
        /// Broker TCP port.
        /// </summary>
        public int Port { get; set; }

        /// <summary>
        /// Topic (or topic filter) to use.
        /// </summary>
        public string Topic { get; set; }
    }
}

View Code

服务端:

MQTTnet 2.8 及 3.0.16  的使用MQTTnet 2.8 及 3.0.16  的使用

using MQTTnet;
using MQTTnet.Client.Receiving;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Net;
using System.Text;
using System.Threading.Tasks;

namespace MqttPub
{
    /// <summary>
    /// Entry point of the MQTTnet 3.0.16 server sample.
    /// </summary>
    class Program
    {
        static async Task Main(string[] args)
        {
            await new ServerDome().StartAsync();
            Console.Read();
        }
    }

    /// <summary>
    /// Sample MQTT server: listens on the configured port, validates client
    /// credentials against the configured account/password and logs
    /// connection, subscription and message events to the console.
    /// </summary>
    public class ServerDome
    {
        private IMqttServer server;
        MqttClientDto model = null;

        /// <summary>
        /// Initializes the server settings (port 1883, empty account/password).
        /// </summary>
        public ServerDome()
        {
            model = new MqttClientDto
            {
                Account = "",
                PassWord = "",
                ClientId = Guid.NewGuid().ToString(),
                IP = "",
                Port = 1883,
                Topic = "test"
            };
        }

        /// <summary>
        /// Creates, configures and starts the server unless one is already running.
        /// </summary>
        public async Task StartAsync()
        {
            if (server != null && server.IsStarted)
            {
                return;
            }

            server = new MqttFactory().CreateMqttServer();
            MqttServerOptionsBuilder serverOptions = new MqttServerOptionsBuilder();

            // Default listening port.
            serverOptions.WithDefaultEndpointPort(model.Port);

            // Validate connecting clients against the configured credentials
            // (the sample configuration uses empty strings for both).
            serverOptions.WithConnectionValidator(client =>
            {
                if (client.Username == model.Account && client.Password == model.PassWord)
                {
                    client.ReasonCode = MqttConnectReasonCode.Success;
                    Console.WriteLine("校验成功");
                }
                else
                {
                    client.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                    Console.WriteLine("校验失败");
                }
            });

            // Message published by a client.
            // Equivalent lambda form: server.UseApplicationMessageReceivedHandler(args => { ... });
            server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(MessageReceivedHandler);

            // Client connected.
            // Equivalent lambda form: server.UseClientConnectedHandler(args => { ... });
            server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(ClientConnectedHandler);

            // Client disconnected.
            // Equivalent lambda form: server.UseClientDisconnectedHandler(args => { ... });
            server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(ClientDisconnectedHandler);

            // Client subscribed to a topic.
            server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(ClientSubscribedTopicHandler);
            // Client unsubscribed from a topic.
            server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(ClientUnsubscribedTopicHandler);
            // Server started.
            server.StartedHandler = new MqttServerStartedHandlerDelegate(StartedHandler);
            // Server stopped.
            server.StoppedHandler = new MqttServerStoppedHandlerDelegate(StoppedHandler);

            // Publishing from the server would look like:
            //   await server.PublishAsync("your/topic", "your payload");
            // or, with an explicit message object:
            //   var message = new MqttApplicationMessage { Topic = "your/topic", Payload = Encoding.UTF8.GetBytes("your payload") };
            //   await server.PublishAsync(message);

            // Start the server.
            await server.StartAsync(serverOptions.Build());
        }

        /// <summary>
        /// Stops and disposes the server if it is currently running.
        /// </summary>
        public async Task StopAsync()
        {
            if (server == null || !server.IsStarted)
            {
                return;
            }

            await server.StopAsync();
            server.Dispose();
        }

        /// <summary>
        /// Logs a client unsubscribing from a topic.
        /// </summary>
        /// <param name="obj">Client id and topic filter of the unsubscription.</param>
        private void ClientUnsubscribedTopicHandler(MqttServerClientUnsubscribedTopicEventArgs obj)
        {
            Console.WriteLine($"客户端:{obj.ClientId}");
            Console.WriteLine($"取消订阅主题:{obj.TopicFilter}");
        }

        /// <summary>
        /// Logs a client subscribing to a topic.
        /// </summary>
        /// <param name="obj">Client id and topic filter of the subscription.</param>
        private void ClientSubscribedTopicHandler(MqttServerClientSubscribedTopicEventArgs obj)
        {
            Console.WriteLine($"客户端:{obj.ClientId}");
            Console.WriteLine($"订阅主题:{obj.TopicFilter.Topic}");
        }

        /// <summary>
        /// Logs a client disconnecting, including the disconnect type.
        /// </summary>
        /// <param name="obj">Client id and disconnect type.</param>
        private void ClientDisconnectedHandler(MqttServerClientDisconnectedEventArgs obj)
        {
            Console.WriteLine($"断开连接的客户端:{obj.ClientId}");
            Console.WriteLine($"断开连接类型:{obj.DisconnectType.ToString()}");
        }

        /// <summary>
        /// Logs a client connecting to the server. The original stub threw
        /// NotImplementedException, i.e. on every single client connection.
        /// </summary>
        /// <param name="obj">Carries the id of the connected client.</param>
        private void ClientConnectedHandler(MqttServerClientConnectedEventArgs obj)
        {
            Console.WriteLine($"{obj.ClientId}此客户端已经连接到服务器");
        }

        /// <summary>
        /// Logs a message published by any client: sender id, topic and the
        /// UTF-8 decoded payload.
        /// </summary>
        /// <param name="obj">Carries the received message and the sender id.</param>
        private void MessageReceivedHandler(MqttApplicationMessageReceivedEventArgs obj)
        {
            // A message may carry no payload; decode it as an empty string
            // instead of letting GetString throw on null.
            byte[] payload = obj.ApplicationMessage.Payload ?? Array.Empty<byte>();

            Console.WriteLine("===================================================");
            Console.WriteLine("收到消息:");
            Console.WriteLine($"客户端:{obj.ClientId}");
            Console.WriteLine($"主题:{obj.ApplicationMessage.Topic}");
            Console.WriteLine($"消息:{Encoding.UTF8.GetString(payload)}");
            Console.WriteLine("+++++++++++++++++++++++++++++++++++++++++++++++++++");
            Console.WriteLine();
        }

        /// <summary>
        /// Logs that the server has started and on which port it listens.
        /// </summary>
        /// <param name="obj">Event data (unused).</param>
        private void StartedHandler(EventArgs obj)
        {
            Console.WriteLine($"程序已经启动!监听端口为:{model.Port}");
        }

        /// <summary>
        /// Logs that the server has stopped.
        /// </summary>
        /// <param name="obj">Event data (unused).</param>
        private void StoppedHandler(EventArgs obj)
        {
            Console.WriteLine("程序已经关闭");
        }
    }

    /// <summary>
    /// Settings used by the sample.
    /// </summary>
    public class MqttClientDto
    {
        /// <summary>
        /// Address.
        /// </summary>
        public string IP { get; set; }

        /// <summary>
        /// Account (user name).
        /// </summary>
        public string Account { get; set; }

        /// <summary>
        /// Password.
        /// </summary>
        public string PassWord { get; set; }

        /// <summary>
        /// Client id.
        /// </summary>
        public string ClientId { get; set; }

        /// <summary>
        /// TCP port.
        /// </summary>
        public int Port { get; set; }

        /// <summary>
        /// Topic.
        /// </summary>
        public string Topic { get; set; }
    }
}

View Code

这里说明下如何使用通配符

例如,发送 topic 主题为:test/123/ABC 或者 test/234/ABC ,消费者在订阅时,可以使用:test/+/ABC  来订阅该类消息。

通配符的作用为分组订阅。

MQTTnet 2.8 及 3.0.16  的使用

 

 发布者发布内容为: test//status ,订阅者订阅的为:test/+/status

MQTTnet 2.8 及 3.0.16  的使用

 

 当然,发布者也可以在 / / 之间增加内容,例如设备号:

MQTTnet 2.8 及 3.0.16  的使用

 

  • 主题名不能使用通配符,但是主题过滤器中可以使用通配符。因此,订阅者可以通过过滤器结合通配符订阅一类消息。

MQTTnet 2.8 及 3.0.16  的使用

 以MQTTnet  3.0.16 为例,开启自动确认,开启不保留最后一条消息。

MQTTnet 2.8 及 3.0.16  的使用