精品国产一级在线观看,国产成人综合久久精品亚洲,免费一级欧美大片在线观看

當前位置:新聞中心行業相關 → 正文

開源項目:用環信MQTT實現"世界頻道"只需5分鐘【附源碼】

責任編輯:yang |來源:企業網D1Net  2022-04-27 13:20:36 本文摘自:中國IT產經新聞

說到“世界頻道”想必大家都不陌生,常見的如王者榮耀的世界廣播搖人組隊以及最近興起的Discord社區交友等等。究其目的就是在應用內讓海量用戶可以實時互動。有些開發者為了實現這種場景會選擇聊天室方案來實現,但是這種方式存在一定的局限性,比如聊天室人數上限、海量消息處理等各種情況。

當然如果有錢有顏,可以直接選擇云廠商產品(比如環信的聊天室方案和超級社區),如果有才有time,也可以選擇平替版MQTT實現方案。今天小猿將介紹用環信MQTT消息云實現應用內的世界頻道,滿滿干貨,不要錯過~~

 

使用MQTT實現世界頻道-Demo效果演示

 

 

協議優勢:

在介紹具體方案之前,我們先嘮一嘮為啥選擇MQTT協議。

輕量級:MQTT本身是物聯網的連接協議,專為受限設備和低帶寬場景使用。所以其代碼占用空間較小,同樣適用于注重SDK大小的移動應用領域(比如:游戲領域)。

易集成:MQTT作為標準開放的消息協議,經過多年演進,已支持30多種開發語言,10余種SDK,無論何種開發環境,都可以快速找到開源SDK。

高并發:MQTT是輕量級的消息傳輸協議,2字節心跳報文,最小化傳輸和連接成本,云廠商broker產品都可支持千萬級并發接入,適用于高并發連接場景。

低成本:MQTT是基于客戶端-服務器的訂閱/發布模型,通過服務器中間件實現消息分發,減少消息復制成本,快速實現一對多在線推送。

靈活性:MQTT協議支持多種消息特性,包括:topic主題層級、消息分級(QoS0,1,2)、遺囑消息、保留消息等,可以靈活實現多種業務場景。

衍生功能:隨著MQTT云服務的發展,部分服務器廠商已支持消息存儲、獲取在線設備列表、查看歷史消息等衍生功能,降低開發工作量與消息存儲成本。

 

實現方案:

言歸正傳,上干貨。本次技術實現方案包含:移動客戶端(Android)、后端服務(Java)以及MQTT服務器。這里提一下,MQTT服務器使用環信MQTT消息云,使用三方云服務比較省心,既節省開發時間,產品性能也不需要擔心,現在注冊可以直接使用環信MQTT消息云超高額度的免費版:每月100并發連接、300萬消息,完全滿足功能開發使用。

 

客戶端實現:

客戶端實現主要包含以下兩部分:

底層MQTT業務集成:包含引入SDK、MQTT方法封裝、業務交互(消息收發)。

APP上層交互:在APP首頁提供世界頻道入口,實現心情彈幕飄窗(接收)和發送。

接下來上底層MQTT業務集成代碼。

 

引入SDK:

這一步環信官方文檔比較明確,就是根據自己的平臺引入相應的mqtt客戶端sdk,這里簡單貼一下AndroidStudio的引入配置

 

1// 在根目錄 build.gradle repositories 下加入配置
2maven { url "https://repo.eclipse.org/content/repositories/paho-snapshots/" }
3...
4// 然后加入 MQTT 依賴
5// MQTT sdk https://docs-im.easemob.com/mqtt/qsandroidsdk
6implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
7implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

方法封裝

這里貼一下對mqtt相關方法的簡單封裝,代碼在vmmqtt模塊兒的MQTTHelper類下:

1 /**
  2 * Create by lzan13 on 2022/3/22
  3 * 描述:MQTT 幫助類
  4 */
  5 object MQTTHelper {
  6
  7    private var mqttClient: MqttAndroidClient? = null
  8
  9    // 緩存主題集合
 10    private val topicList = mutableListOf<String>()
 11
 12    /**
 13     * 鏈接MQTT
 14     * @param id 用戶 Id
 15     * @param token 用戶鏈接 MQTT 的 Token
 16     * @param topic 需要訂閱的主題,不為空就會在連接成功后進行訂閱
 17     */
 18    fun connect(id: String, token: String, topic: String = "") {
 19        // 處理訂閱主題
 20        if (topic.isNotEmpty()) topicList.add(topic)
 21
 22        // 拼接鏈接地址
 23        val url = "tcp://${MQTTConstants.mqttHost()}:${MQTTConstants.mqttPort()}"
 24        // 拼接 clientId
 25        val clientId = "${id}@${MQTTConstants.mqttAppId()}"
 26        mqttClient = MqttAndroidClient(VMTools.context, url, clientId)
 27
 28        //連接參數
 29        val options = MqttConnectOptions()
 30        options.isAutomaticReconnect = true //設置自動重連
 31        options.isCleanSession = true // 緩存
 32        options.connectionTimeout = CConstants.timeMinute.toInt() // 設置超時時間,單位:秒
 33        options.keepAliveInterval = CConstants.timeMinute.toInt() // 心跳包發送間隔,單位:秒
 34        options.userName = id // 用戶名
 35        options.password = token.toCharArray() // 密碼
 36        options.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1;
 37        // 設置MQTT監聽
 38        mqttClient?.setCallback(object : MqttCallback {
 39            override fun connectionLost(t: Throwable) {
 40                // 通知鏈接斷開
 41                VMLog.d("MQTT 鏈接斷開 $t")
 42            }
 43
 44            @Throws(Exception::class)
 45            override fun messageArrived(topic: String, message: MqttMessage) {
 46                // 通知收到消息
 47                VMLog.d("MQTT 收到消息:$message")
 48                // 如果未訂閱則直接丟棄
 49                if (!topicList.contains(topic)) return
 50                notifyEvent(topic, String(message.payload))
 51            }
 52
 53            override fun deliveryComplete(token: IMqttDeliveryToken) {}
 54        })
 55        //進行連接
 56        mqttClient?.connect(options, null, object : IMqttActionListener {
 57            override fun onSuccess(token: IMqttToken) {
 58                VMLog.d("MQTT 鏈接成功")
 59                // 鏈接成功,循環訂閱緩存的主題
 60                topicList.forEach { subscribe(it) }
 61            }
 62
 63            override fun onFailure(token: IMqttToken, t: Throwable) {
 64                VMLog.d("MQTT 鏈接失敗 $t")
 65            }
 66        })
 67    }
 68
 69    /**
 70     * 訂閱主題
 71     * @param topic 主題
 72     */
 73    fun subscribe(topic: String) {
 74        if (!topicList.contains(topic)) {
 75            topicList.add(topic)
 76        }
 77        try {
 78            //連接成功后訂閱主題
 79            mqttClient?.subscribe(topic, 0, null, object : IMqttActionListener {
 80                override fun onSuccess(token: IMqttToken) {
 81                    VMLog.d("MQTT 訂閱成功 $topic")
 82                }
 83
 84                override fun onFailure(token: IMqttToken, t: Throwable) {
 85                    VMLog.d("MQTT 訂閱失敗 $topic $t")
 86                }
 87            })
 88        } catch (e: MqttException) {
 89            e.printStackTrace()
 90        }
 91    }
 92
 93    /**
 94     * 取消訂閱
 95     * @param topic 主題
 96     */
 97    fun unsubscribe(topic: String) {
 98        if (topicList.contains(topic)) {
 99            topicList.remove(topic)
100        }
101        try {
102            mqttClient?.unsubscribe(topic)
103        } catch (e: MqttException) {
104            e.printStackTrace()
105        }
106    }
107
108    /**
109     * 發送 MQTT 消息
110     * @param topic 主題
111     * @param content 內容
112     */
113    fun sendMsg(topic: String, content: String) {
114        val msg = MqttMessage()
115        msg.payload = content.encodeToByteArray() // 設置消息內容
116        msg.qos = 0 //設置消息發送質量,可為0,1,2.
117        // 設置消息的topic,并發送。
118        mqttClient?.publish(topic, msg, null, object : IMqttActionListener {
119            override fun onSuccess(asyncActionToken: IMqttToken) {
120                VMLog.d("MQTT 消息發送成功")
121            }
122
123            override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
124                VMLog.d("MQTT 消息發送失敗 ${exception.message}")
125            }
126        })
127    }
128
129    /**
130     * 通知 MQTT 事件
131     */
132    private fun notifyEvent(topic: String, data: String) {
133        LDEventBus.post(topic, data)
134    }
135 }

業務交互

和業務相關的就是在啟動APP后,使用后端服務器返回的鑒權token信息及連接封裝接口登錄環信通MQTT服務器,登錄成功后訂閱主題并監聽消息。

1// 請求 token 成功后,調用MQTTHelper.connect()鏈接 MQTT 服務器,這里會同時傳遞監聽的主題
 2MQTTHelper.connect(mUser.id, token, MQTTConstants.Topic.newMatchInfo)
 3
 4/**
 5 * 發送匹配信息
 6 */
 7private fun sendMatchInfo() {
 8    if (selfMatch.user.nickname.isEmpty()) return
 9    // 提交自己的匹配信息到服務器
10    mViewModel.submitMatch(selfMatch)
11    val json = JSONObject()
12    json.put("content", selfMatch.content)
13    json.put("emotion", selfMatch.emotion)
14    json.put("gender", selfMatch.gender)
15    json.put("type", selfMatch.type)
16    val jsonUser = JSONObject()
17    jsonUser.put("avatar", mUser.avatar)
18    jsonUser.put("id", mUser.id)
19    jsonUser.put("nickname", mUser.nickname)
20    jsonUser.put("username", mUser.username)
21    json.put("user", jsonUser)
22    MQTTHelper.sendMsg(MQTTConstants.Topic.newMatchInfo, json.toString())
23}
24
25// 監聽消息這里使用了一個事件總線進行通知,在上邊封裝 MQTTHelper 發送消息也使用了這個,
26// 訂閱 MQTT 事件
27LDEventBus.observe(this, MQTTConstants.Topic.newMatchInfo, String::class.java) {
28        val match = JsonUtils.fromJson<Match>(it, Match::class.java)
29        // 這里收到匹配信息之后就增加一條彈幕
30    addBarrage(match)
31}

后端服務實現

接下來介紹后端服務實現,主要包含以下兩部分:

配置連接信息:配置環信MQTT消息云連接信息。

獲取鑒權信息:獲取客戶端連接需要的鑒權信息。

 

配置連接信息

配置部分只需要按照環信后臺配置信息進行替換就好,配置在config目錄下的config.xxx.json文件內

1/**
 2 * Easemob MQTT 配置 https://console.easemob.com/app/generalizeMsg/overviewService
 3 */
 4config.mqtt = {
 5    host: 'mqtt host', // MQTT 鏈接地址
 6  appId: 'appId', // MQTT AppId
 7  port: [ 1883, 1884, 80, 443 ], // MQTT 端口 1883(mqtt),1884(mqtts),80(ws),443(wss)
 8  restHost: 'https://api.cn1.mqtt.chat/app/8igtc0', // MQTT 服務 API 地址
 9  clientId: 'client id', // 替換環信后臺 clientId
10  clientSecret: 'client secret', // 替換環信后臺 clientSecret
11};

獲取鑒權信息

這里主要是獲取客戶端連接所需要的鑒權信息token,為了安全token肯定是要放在服務器端生成的,廢話不多說,上代碼:

1/**
  2 * Create by lzan13 on 2022/3/22
  3 * 描述:MQTT 幫助類
  4 */
  5object MQTTHelper {
  6
  7    private var mqttClient: MqttAndroidClient? = null
  8
  9    // 緩存主題集合
 10    private val topicList = mutableListOf<String>()
 11
 12    /**
 13     * 鏈接MQTT
 14     * @param id 用戶 Id
 15     * @param token 用戶鏈接 MQTT 的 Token
 16     * @param topic 需要訂閱的主題,不為空就會在連接成功后進行訂閱
 17     */
 18    fun connect(id: String, token: String, topic: String = "") {
 19        // 處理訂閱主題
 20        if (topic.isNotEmpty()) topicList.add(topic)
 21
 22        // 拼接鏈接地址
 23        val url = "tcp://${MQTTConstants.mqttHost()}:${MQTTConstants.mqttPort()}"
 24        // 拼接 clientId
 25        val clientId = "${id}@${MQTTConstants.mqttAppId()}"
 26        mqttClient = MqttAndroidClient(VMTools.context, url, clientId)
 27
 28        //連接參數
 29        val options = MqttConnectOptions()
 30        options.isAutomaticReconnect = true //設置自動重連
 31        options.isCleanSession = true // 緩存
 32        options.connectionTimeout = CConstants.timeMinute.toInt() // 設置超時時間,單位:秒
 33        options.keepAliveInterval = CConstants.timeMinute.toInt() // 心跳包發送間隔,單位:秒
 34        options.userName = id // 用戶名
 35        options.password = token.toCharArray() // 密碼
 36        options.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1;
 37        // 設置MQTT監聽
 38        mqttClient?.setCallback(object : MqttCallback {
 39            override fun connectionLost(t: Throwable) {
 40                // 通知鏈接斷開
 41                VMLog.d("MQTT 鏈接斷開 $t")
 42            }
 43
 44            @Throws(Exception::class)
 45            override fun messageArrived(topic: String, message: MqttMessage) {
 46                // 通知收到消息
 47                VMLog.d("MQTT 收到消息:$message")
 48                // 如果未訂閱則直接丟棄
 49                if (!topicList.contains(topic)) return
 50                notifyEvent(topic, String(message.payload))
 51            }
 52
 53            override fun deliveryComplete(token: IMqttDeliveryToken) {}
 54        })
 55        //進行連接
 56        mqttClient?.connect(options, null, object : IMqttActionListener {
 57            override fun onSuccess(token: IMqttToken) {
 58                VMLog.d("MQTT 鏈接成功")
 59                // 鏈接成功,循環訂閱緩存的主題
 60                topicList.forEach { subscribe(it) }
 61            }
 62
 63            override fun onFailure(token: IMqttToken, t: Throwable) {
 64                VMLog.d("MQTT 鏈接失敗 $t")
 65            }
 66        })
 67    }
 68
 69    /**
 70     * 訂閱主題
 71     * @param topic 主題
 72     */
 73    fun subscribe(topic: String) {
 74        if (!topicList.contains(topic)) {
 75            topicList.add(topic)
 76        }
 77        try {
 78            //連接成功后訂閱主題
 79            mqttClient?.subscribe(topic, 0, null, object : IMqttActionListener {
 80                override fun onSuccess(token: IMqttToken) {
 81                    VMLog.d("MQTT 訂閱成功 $topic")
 82                }
 83
 84                override fun onFailure(token: IMqttToken, t: Throwable) {
 85                    VMLog.d("MQTT 訂閱失敗 $topic $t")
 86                }
 87            })
 88        } catch (e: MqttException) {
 89            e.printStackTrace()
 90        }
 91    }
 92
 93    /**
 94     * 取消訂閱
 95     * @param topic 主題
 96     */
 97    fun unsubscribe(topic: String) {
 98        if (topicList.contains(topic)) {
 99            topicList.remove(topic)
100        }
101        try {
102            mqttClient?.unsubscribe(topic)
103        } catch (e: MqttException) {
104            e.printStackTrace()
105        }
106    }
107
108    /**
109     * 發送 MQTT 消息
110     * @param topic 主題
111     * @param content 內容
112     */
113    fun sendMsg(topic: String, content: String) {
114        val msg = MqttMessage()
115        msg.payload = content.encodeToByteArray() // 設置消息內容
116        msg.qos = 0 //設置消息發送質量,可為0,1,2.
117        // 設置消息的topic,并發送。
118        mqttClient?.publish(topic, msg, null, object : IMqttActionListener {
119            override fun onSuccess(asyncActionToken: IMqttToken) {
120                VMLog.d("MQTT 消息發送成功")
121            }
122
123            override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
124                VMLog.d("MQTT 消息發送失敗 ${exception.message}")
125            }
126        })
127    }
128
129    /**
130     * 通知 MQTT 事件
131     */
132    private fun notifyEvent(topic: String, data: String) {
133        LDEventBus.post(topic, data)
134    }
135}

源碼地址

核心代碼就這么多,不超過500行,這里沒有直接調用環信歷史消息接口獲取消息存儲記錄,后續可以在進行改良,簡化實現流程。源碼鏈接附上,配合使用效果更佳。

服務端github源碼:

https://github.com/lzan13/vmtemplateserver

客戶端github源碼:

https://gitee.com/lzan13/VMTemplateAndroid

 

寫在最后

 

MQTT協議資源占用小,并發連接高,集成簡單,特別適用于高頻數據交互場景,比如:游戲的世界廣場、視頻平臺彈幕等等等等,歡迎各位小伙伴集思廣益,基于MQTT服務實現更多的業務場景,享受技術帶來的便利與快樂。

關鍵字:頻道世界開源

本文摘自:中國IT產經新聞

x 開源項目:用環信MQTT實現"世界頻道"只需5分鐘【附源碼】 掃一掃
分享本文到朋友圈
當前位置:新聞中心行業相關 → 正文

開源項目:用環信MQTT實現"世界頻道"只需5分鐘【附源碼】

責任編輯:yang |來源:企業網D1Net  2022-04-27 13:20:36 本文摘自:中國IT產經新聞

說到“世界頻道”想必大家都不陌生,常見的如王者榮耀的世界廣播搖人組隊以及最近興起的Discord社區交友等等。究其目的就是在應用內讓海量用戶可以實時互動。有些開發者為了實現這種場景會選擇聊天室方案來實現,但是這種方式存在一定的局限性,比如聊天室人數上限、海量消息處理等各種情況。

當然如果有錢有顏,可以直接選擇云廠商產品(比如環信的聊天室方案和超級社區),如果有才有time,也可以選擇平替版MQTT實現方案。今天小猿將介紹用環信MQTT消息云實現應用內的世界頻道,滿滿干貨,不要錯過~~

 

使用MQTT實現世界頻道-Demo效果演示

 

 

協議優勢:

在介紹具體方案之前,我們先嘮一嘮為啥選擇MQTT協議。

輕量級:MQTT本身是物聯網的連接協議,專為受限設備和低帶寬場景使用。所以其代碼占用空間較小,同樣適用于注重SDK大小的移動應用領域(比如:游戲領域)。

易集成:MQTT作為標準開放的消息協議,經過多年演進,已支持30多種開發語言,10余種SDK,無論何種開發環境,都可以快速找到開源SDK。

高并發:MQTT是輕量級的消息傳輸協議,2字節心跳報文,最小化傳輸和連接成本,云廠商broker產品都可支持千萬級并發接入,適用于高并發連接場景。

低成本:MQTT是基于客戶端-服務器的訂閱/發布模型,通過服務器中間件實現消息分發,減少消息復制成本,快速實現一對多在線推送。

靈活性:MQTT協議支持多種消息特性,包括:topic主題層級、消息分級(QoS0,1,2)、遺囑消息、保留消息等,可以靈活實現多種業務場景。

衍生功能:隨著MQTT云服務的發展,部分服務器廠商已支持消息存儲、獲取在線設備列表、查看歷史消息等衍生功能,降低開發工作量與消息存儲成本。

 

實現方案:

言歸正傳,上干貨。本次技術實現方案包含:移動客戶端(Android)、后端服務(Java)以及MQTT服務器。這里提一下,MQTT服務器使用環信MQTT消息云,使用三方云服務比較省心,既節省開發時間,產品性能也不需要擔心,現在注冊可以直接使用環信MQTT消息云超高額度的免費版:每月100并發連接、300萬消息,完全滿足功能開發使用。

 

客戶端實現:

客戶端實現主要包含以下兩部分:

底層MQTT業務集成:包含引入SDK、MQTT方法封裝、業務交互(消息收發)。

APP上層交互:在APP首頁提供世界頻道入口,實現心情彈幕飄窗(接收)和發送。

接下來上底層MQTT業務集成代碼。

 

引入SDK:

這一步環信官方文檔比較明確,就是根據自己的平臺引入相應的mqtt客戶端sdk,這里簡單貼一下AndroidStudio的引入配置

 

1// 在根目錄 build.gradle repositories 下加入配置
2maven { url "https://repo.eclipse.org/content/repositories/paho-snapshots/" }
3...
4// 然后加入 MQTT 依賴
5// MQTT sdk https://docs-im.easemob.com/mqtt/qsandroidsdk
6implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
7implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

方法封裝

這里貼一下對mqtt相關方法的簡單封裝,代碼在vmmqtt模塊兒的MQTTHelper類下:

1 /**
  2 * Create by lzan13 on 2022/3/22
  3 * 描述:MQTT 幫助類
  4 */
  5 object MQTTHelper {
  6
  7    private var mqttClient: MqttAndroidClient? = null
  8
  9    // 緩存主題集合
 10    private val topicList = mutableListOf<String>()
 11
 12    /**
 13     * 鏈接MQTT
 14     * @param id 用戶 Id
 15     * @param token 用戶鏈接 MQTT 的 Token
 16     * @param topic 需要訂閱的主題,不為空就會在連接成功后進行訂閱
 17     */
 18    fun connect(id: String, token: String, topic: String = "") {
 19        // 處理訂閱主題
 20        if (topic.isNotEmpty()) topicList.add(topic)
 21
 22        // 拼接鏈接地址
 23        val url = "tcp://${MQTTConstants.mqttHost()}:${MQTTConstants.mqttPort()}"
 24        // 拼接 clientId
 25        val clientId = "${id}@${MQTTConstants.mqttAppId()}"
 26        mqttClient = MqttAndroidClient(VMTools.context, url, clientId)
 27
 28        //連接參數
 29        val options = MqttConnectOptions()
 30        options.isAutomaticReconnect = true //設置自動重連
 31        options.isCleanSession = true // 緩存
 32        options.connectionTimeout = CConstants.timeMinute.toInt() // 設置超時時間,單位:秒
 33        options.keepAliveInterval = CConstants.timeMinute.toInt() // 心跳包發送間隔,單位:秒
 34        options.userName = id // 用戶名
 35        options.password = token.toCharArray() // 密碼
 36        options.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1;
 37        // 設置MQTT監聽
 38        mqttClient?.setCallback(object : MqttCallback {
 39            override fun connectionLost(t: Throwable) {
 40                // 通知鏈接斷開
 41                VMLog.d("MQTT 鏈接斷開 $t")
 42            }
 43
 44            @Throws(Exception::class)
 45            override fun messageArrived(topic: String, message: MqttMessage) {
 46                // 通知收到消息
 47                VMLog.d("MQTT 收到消息:$message")
 48                // 如果未訂閱則直接丟棄
 49                if (!topicList.contains(topic)) return
 50                notifyEvent(topic, String(message.payload))
 51            }
 52
 53            override fun deliveryComplete(token: IMqttDeliveryToken) {}
 54        })
 55        //進行連接
 56        mqttClient?.connect(options, null, object : IMqttActionListener {
 57            override fun onSuccess(token: IMqttToken) {
 58                VMLog.d("MQTT 鏈接成功")
 59                // 鏈接成功,循環訂閱緩存的主題
 60                topicList.forEach { subscribe(it) }
 61            }
 62
 63            override fun onFailure(token: IMqttToken, t: Throwable) {
 64                VMLog.d("MQTT 鏈接失敗 $t")
 65            }
 66        })
 67    }
 68
 69    /**
 70     * 訂閱主題
 71     * @param topic 主題
 72     */
 73    fun subscribe(topic: String) {
 74        if (!topicList.contains(topic)) {
 75            topicList.add(topic)
 76        }
 77        try {
 78            //連接成功后訂閱主題
 79            mqttClient?.subscribe(topic, 0, null, object : IMqttActionListener {
 80                override fun onSuccess(token: IMqttToken) {
 81                    VMLog.d("MQTT 訂閱成功 $topic")
 82                }
 83
 84                override fun onFailure(token: IMqttToken, t: Throwable) {
 85                    VMLog.d("MQTT 訂閱失敗 $topic $t")
 86                }
 87            })
 88        } catch (e: MqttException) {
 89            e.printStackTrace()
 90        }
 91    }
 92
 93    /**
 94     * 取消訂閱
 95     * @param topic 主題
 96     */
 97    fun unsubscribe(topic: String) {
 98        if (topicList.contains(topic)) {
 99            topicList.remove(topic)
100        }
101        try {
102            mqttClient?.unsubscribe(topic)
103        } catch (e: MqttException) {
104            e.printStackTrace()
105        }
106    }
107
108    /**
109     * 發送 MQTT 消息
110     * @param topic 主題
111     * @param content 內容
112     */
113    fun sendMsg(topic: String, content: String) {
114        val msg = MqttMessage()
115        msg.payload = content.encodeToByteArray() // 設置消息內容
116        msg.qos = 0 //設置消息發送質量,可為0,1,2.
117        // 設置消息的topic,并發送。
118        mqttClient?.publish(topic, msg, null, object : IMqttActionListener {
119            override fun onSuccess(asyncActionToken: IMqttToken) {
120                VMLog.d("MQTT 消息發送成功")
121            }
122
123            override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
124                VMLog.d("MQTT 消息發送失敗 ${exception.message}")
125            }
126        })
127    }
128
129    /**
130     * 通知 MQTT 事件
131     */
132    private fun notifyEvent(topic: String, data: String) {
133        LDEventBus.post(topic, data)
134    }
135 }

業務交互

和業務相關的就是在啟動APP后,使用后端服務器返回的鑒權token信息及連接封裝接口登錄環信通MQTT服務器,登錄成功后訂閱主題并監聽消息。

1// 請求 token 成功后,調用MQTTHelper.connect()鏈接 MQTT 服務器,這里會同時傳遞監聽的主題
 2MQTTHelper.connect(mUser.id, token, MQTTConstants.Topic.newMatchInfo)
 3
 4/**
 5 * 發送匹配信息
 6 */
 7private fun sendMatchInfo() {
 8    if (selfMatch.user.nickname.isEmpty()) return
 9    // 提交自己的匹配信息到服務器
10    mViewModel.submitMatch(selfMatch)
11    val json = JSONObject()
12    json.put("content", selfMatch.content)
13    json.put("emotion", selfMatch.emotion)
14    json.put("gender", selfMatch.gender)
15    json.put("type", selfMatch.type)
16    val jsonUser = JSONObject()
17    jsonUser.put("avatar", mUser.avatar)
18    jsonUser.put("id", mUser.id)
19    jsonUser.put("nickname", mUser.nickname)
20    jsonUser.put("username", mUser.username)
21    json.put("user", jsonUser)
22    MQTTHelper.sendMsg(MQTTConstants.Topic.newMatchInfo, json.toString())
23}
24
25// 監聽消息這里使用了一個事件總線進行通知,在上邊封裝 MQTTHelper 發送消息也使用了這個,
26// 訂閱 MQTT 事件
27LDEventBus.observe(this, MQTTConstants.Topic.newMatchInfo, String::class.java) {
28        val match = JsonUtils.fromJson<Match>(it, Match::class.java)
29        // 這里收到匹配信息之后就增加一條彈幕
30    addBarrage(match)
31}

后端服務實現

接下來介紹后端服務實現,主要包含以下兩部分:

配置連接信息:配置環信MQTT消息云連接信息。

獲取鑒權信息:獲取客戶端連接需要的鑒權信息。

 

配置連接信息

配置部分只需要按照環信后臺配置信息進行替換就好,配置在config目錄下的config.xxx.json文件內

1/**
 2 * Easemob MQTT 配置 https://console.easemob.com/app/generalizeMsg/overviewService
 3 */
 4config.mqtt = {
 5    host: 'mqtt host', // MQTT 鏈接地址
 6  appId: 'appId', // MQTT AppId
 7  port: [ 1883, 1884, 80, 443 ], // MQTT 端口 1883(mqtt),1884(mqtts),80(ws),443(wss)
 8  restHost: 'https://api.cn1.mqtt.chat/app/8igtc0', // MQTT 服務 API 地址
 9  clientId: 'client id', // 替換環信后臺 clientId
10  clientSecret: 'client secret', // 替換環信后臺 clientSecret
11};

獲取鑒權信息

這里主要是獲取客戶端連接所需要的鑒權信息token,為了安全token肯定是要放在服務器端生成的,廢話不多說,上代碼:

1/**
  2 * Create by lzan13 on 2022/3/22
  3 * 描述:MQTT 幫助類
  4 */
  5object MQTTHelper {
  6
  7    private var mqttClient: MqttAndroidClient? = null
  8
  9    // 緩存主題集合
 10    private val topicList = mutableListOf<String>()
 11
 12    /**
 13     * 鏈接MQTT
 14     * @param id 用戶 Id
 15     * @param token 用戶鏈接 MQTT 的 Token
 16     * @param topic 需要訂閱的主題,不為空就會在連接成功后進行訂閱
 17     */
 18    fun connect(id: String, token: String, topic: String = "") {
 19        // 處理訂閱主題
 20        if (topic.isNotEmpty()) topicList.add(topic)
 21
 22        // 拼接鏈接地址
 23        val url = "tcp://${MQTTConstants.mqttHost()}:${MQTTConstants.mqttPort()}"
 24        // 拼接 clientId
 25        val clientId = "${id}@${MQTTConstants.mqttAppId()}"
 26        mqttClient = MqttAndroidClient(VMTools.context, url, clientId)
 27
 28        //連接參數
 29        val options = MqttConnectOptions()
 30        options.isAutomaticReconnect = true //設置自動重連
 31        options.isCleanSession = true // 緩存
 32        options.connectionTimeout = CConstants.timeMinute.toInt() // 設置超時時間,單位:秒
 33        options.keepAliveInterval = CConstants.timeMinute.toInt() // 心跳包發送間隔,單位:秒
 34        options.userName = id // 用戶名
 35        options.password = token.toCharArray() // 密碼
 36        options.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1;
 37        // 設置MQTT監聽
 38        mqttClient?.setCallback(object : MqttCallback {
 39            override fun connectionLost(t: Throwable) {
 40                // 通知鏈接斷開
 41                VMLog.d("MQTT 鏈接斷開 $t")
 42            }
 43
 44            @Throws(Exception::class)
 45            override fun messageArrived(topic: String, message: MqttMessage) {
 46                // 通知收到消息
 47                VMLog.d("MQTT 收到消息:$message")
 48                // 如果未訂閱則直接丟棄
 49                if (!topicList.contains(topic)) return
 50                notifyEvent(topic, String(message.payload))
 51            }
 52
 53            override fun deliveryComplete(token: IMqttDeliveryToken) {}
 54        })
 55        //進行連接
 56        mqttClient?.connect(options, null, object : IMqttActionListener {
 57            override fun onSuccess(token: IMqttToken) {
 58                VMLog.d("MQTT 鏈接成功")
 59                // 鏈接成功,循環訂閱緩存的主題
 60                topicList.forEach { subscribe(it) }
 61            }
 62
 63            override fun onFailure(token: IMqttToken, t: Throwable) {
 64                VMLog.d("MQTT 鏈接失敗 $t")
 65            }
 66        })
 67    }
 68
 69    /**
 70     * 訂閱主題
 71     * @param topic 主題
 72     */
 73    fun subscribe(topic: String) {
 74        if (!topicList.contains(topic)) {
 75            topicList.add(topic)
 76        }
 77        try {
 78            //連接成功后訂閱主題
 79            mqttClient?.subscribe(topic, 0, null, object : IMqttActionListener {
 80                override fun onSuccess(token: IMqttToken) {
 81                    VMLog.d("MQTT 訂閱成功 $topic")
 82                }
 83
 84                override fun onFailure(token: IMqttToken, t: Throwable) {
 85                    VMLog.d("MQTT 訂閱失敗 $topic $t")
 86                }
 87            })
 88        } catch (e: MqttException) {
 89            e.printStackTrace()
 90        }
 91    }
 92
 93    /**
 94     * 取消訂閱
 95     * @param topic 主題
 96     */
 97    fun unsubscribe(topic: String) {
 98        if (topicList.contains(topic)) {
 99            topicList.remove(topic)
100        }
101        try {
102            mqttClient?.unsubscribe(topic)
103        } catch (e: MqttException) {
104            e.printStackTrace()
105        }
106    }
107
108    /**
109     * 發送 MQTT 消息
110     * @param topic 主題
111     * @param content 內容
112     */
113    fun sendMsg(topic: String, content: String) {
114        val msg = MqttMessage()
115        msg.payload = content.encodeToByteArray() // 設置消息內容
116        msg.qos = 0 //設置消息發送質量,可為0,1,2.
117        // 設置消息的topic,并發送。
118        mqttClient?.publish(topic, msg, null, object : IMqttActionListener {
119            override fun onSuccess(asyncActionToken: IMqttToken) {
120                VMLog.d("MQTT 消息發送成功")
121            }
122
123            override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
124                VMLog.d("MQTT 消息發送失敗 ${exception.message}")
125            }
126        })
127    }
128
129    /**
130     * 通知 MQTT 事件
131     */
132    private fun notifyEvent(topic: String, data: String) {
133        LDEventBus.post(topic, data)
134    }
135}

源碼地址

核心代碼就這么多,不超過500行,這里沒有直接調用環信歷史消息接口獲取消息存儲記錄,后續可以在進行改良,簡化實現流程。源碼鏈接附上,配合使用效果更佳。

服務端github源碼:

https://github.com/lzan13/vmtemplateserver

客戶端github源碼:

https://gitee.com/lzan13/VMTemplateAndroid

 

寫在最后

 

MQTT協議資源占用小,并發連接高,集成簡單,特別適用于高頻數據交互場景,比如:游戲的世界廣場、視頻平臺彈幕等等等等,歡迎各位小伙伴集思廣益,基于MQTT服務實現更多的業務場景,享受技術帶來的便利與快樂。

關鍵字:頻道世界開源

本文摘自:中國IT產經新聞

電子周刊
回到頂部

關于我們聯系我們版權聲明隱私條款廣告服務友情鏈接投稿中心招賢納士

企業網版權所有 ©2010-2024 京ICP備09108050號-6 京公網安備 11010502049343號

^
  • <menuitem id="jw4sk"></menuitem>

    1. <form id="jw4sk"><tbody id="jw4sk"><dfn id="jw4sk"></dfn></tbody></form>
      主站蜘蛛池模板: 云林县| 冀州市| 乌什县| 姜堰市| 阿城市| 米林县| 晋中市| 沂南县| 崇义县| 平江县| 武义县| 潞城市| 河北省| 通化市| 普兰县| 郧西县| 沂源县| 恭城| 惠州市| 徐州市| 潞西市| 武陟县| 英吉沙县| 大邑县| 启东市| 安国市| 榆树市| 延津县| 会同县| 纳雍县| 盘山县| 子长县| 平昌县| 兴国县| 怀柔区| 嘉义县| 石城县| 崇信县| 江陵县| 合水县| 米泉市|