TS MQTT封装

  • TS MQTT封装已关闭评论
  • 77 次浏览
  • A+
所属分类:Web前端

TS MQTT封装

导入相关包

npm i mqtt npm i lodash  
  • guid 随机生成就行,具体可以参考百度或者随便生成一个随机数*

代码封装

import mqtt from 'mqtt' import type { MqttClient, OnMessageCallback, IClientOptions, IClientPublishOptions, IPublishPacket } from 'mqtt' import { getGuid } from '@/common/basic' import { without, uniq } from 'lodash'  export type TPublishFormat = {   topic: string   payload: string | Buffer   opts?: IClientPublishOptions } export type TMessageCallback<T> = (topic: string, payload: T) => void  export interface IMqClientOptions extends IClientOptions {   connectCb?: () => void   errorCb?: (e: Event) => void   reconnectCb?: () => void }  export default class MQTT {   private _type: string   private _url: string   private _opt: IMqClientOptions // mqtt配置   public client!: MqttClient   public topicArr: Array<string> = []    constructor(url: string, opt?: IMqClientOptions, type: string = 'Web') {     this._type = type     this._url = url     this._opt = {       clean: true,       clientId: this._type + '_' + getGuid(), // 客户端分类唯一       connectTimeout: 3000, // 超时时间       reconnectPeriod: 1000, //重连超时       ...(opt && opt),     }     this._init()   }    private _init() {     this.destroy()     this.client = mqtt.connect(this._url, this._opt)     this.client.on('connect', () => {       this._opt.connectCb && this._opt.connectCb()       console.log(this._url + '连接成功...')     })     this.client.on('error', (error: any) => {       this._opt.errorCb && this._opt.errorCb(error)       console.log(this._url + '异常中断...')     })     this.client.on('reconnect', () => {       this._opt.reconnectCb && this._opt.reconnectCb()       console.log(this._url + '重新连接...')     })   }    /**    * 函数“unSubscribe”是一个 TypeScript 函数,用于取消订阅一个或多个主题,并返回一个 Promise,该 Promise 解析为一个布尔值,指示取消订阅是否成功。    * @param {string | string[]} topic - topic 参数可以是字符串或字符串数组。它代表客户端想要取消订阅的主题。    * @returns 正在返回 Promise。    */   public unSubscribe(topic: string | string[]) {     return new Promise((resolve: (isOk: boolean) => void) => {       this.client &&         !this.client.disconnected &&         this.client           .unsubscribeAsync(topic)           .then((result) => {             if (typeof topic === 'string') {               topic = [topic]             }             //去重             this.topicArr = without(this.topicArr, ...topic)             console.log(topic, this.topicArr, '取消订阅成功...')             resolve(true)           })           .catch((err) => {             console.log(topic, '取消订阅失败...')             resolve(false)           })     })   }    /**    * 函数“onSubscribe”是一个 TypeScript 函数,它订阅一个或多个主题并返回一个 Promise,该 Promise 解析为一个布尔值,指示订阅是否成功。    * @param {string | string[]} topic - topic 参数可以是字符串或字符串数组。它代表您要订阅的主题。    * @returns 一个 Promise,解析为布尔值,指示订阅是否成功。    */   public onSubscribe(topic: string | string[]) {     if (typeof topic === 'string') {       topic = [topic]     }     const topicOk: Array<string> = without(topic, ...this.topicArr)     return new Promise((resolve: (isOk: boolean) => void) => {       this.client &&         !this.client.disconnected &&         topicOk.length > 0 &&         this.client           .subscribeAsync(topic)           .then((result) => {             this.topicArr.push(...topicOk)             this.topicArr = uniq(this.topicArr)             console.log(topicOk, this.topicArr, '订阅成功...')             resolve(true)           })           .catch((err) => {             console.log(topicOk, '订阅失败...')             resolve(false)           })     })   }    /**    * 函数“onPublish”使用客户端向主题发布消息,并返回一个解析为布尔值的 Promise,指示发布是否成功。    * @param {TPublishFormat} format - format参数的类型为TPublishFormat,它是一个包含两个属性的对象:topic和message。 topic    * 属性表示消息将发布到的主题,message 属性表示将发布的实际消息。    * @returns 正在返回 Promise。    */   public onPublish(format: TPublishFormat) {     return new Promise((resolve: (isOk: boolean) => void) => {       this.client &&         !this.client.disconnected &&         this.client           .publishAsync(format.topic, format.payload, format.opts)           .then((result) => {             console.log('发布消息成功...')             resolve(true)           })           .catch((err) => {             console.log('发布消息失败...')             resolve(false)           })     })   }    //收到的消息   public onMessage<T = any>(callback: TMessageCallback<T>) {     this.client &&       !this.client.disconnected &&       this.client.on('message', (topic: string, payload: Buffer) => {         try {           callback && callback(topic, JSON.parse(payload.toString()))         } catch (err) {           console.log('无法执行JSON.parse...')           callback && callback(topic, payload.toString() as T)         }       })   }    //销毁   public destroy() {     console.log('销毁...')     this.client && this.client.end()     this.topicArr = []   } }  

使用

//通过开源公共服务器测试,切换成自家服务器就行了 const mqtt = new MQTT('mqtt://broker.emqx.io:8083/mqtt', { username: 'emqx_test', password: 'emqx_test' }) mqtt.onSubscribe('/test/ss') mqtt.onMessage((topic, message) => {   console.log(topic, message) })  setTimeout(() => {   mqtt.onPublish({ topic: '/test/ss', payload: '测试1111' }) }, 3000);