當然如果有錢有顏,可以直接選擇云廠商產品(比如環信的聊天室方案和超級社區),如果有才有time,也可以選擇平替版MQTT實現方案。今天小猿將介紹用環信MQTT消息云實現應用內的世界頻道,滿滿干貨,不要錯過~~
使用MQTT實現世界頻道-Demo效果演示
協議優勢:
在介紹具體方案之前,我們先嘮一嘮為啥選擇MQTT協議。
輕量級:MQTT本身是物聯網的連接協議,專為受限設備和低帶寬場景使用。所以其代碼占用空間較小,同樣適用于注重SDK大小的移動應用領域(比如:游戲領域)。
易集成:MQTT作為標準開放的消息協議,經過多年演進,已支持30多種開發語言,10余種SDK,無論何種開發環境,都可以快速找到開源SDK。
高并發:MQTT是輕量級的消息傳輸協議,2字節心跳報文,最小化傳輸和連接成本,云廠商broker產品都可支持千萬級并發接入,適用于高并發連接場景。
低成本:MQTT是基于客戶端-服務器的訂閱/發布模型,通過服務器中間件實現消息分發,減少消息復制成本,快速實現一對多在線推送。
靈活性:MQTT協議支持多種消息特性,包括:topic主題層級、消息分級(QoS0,1,2)、遺囑消息、保留消息等,可以靈活實現多種業務場景。
衍生功能:隨著MQTT云服務的發展,部分服務器廠商已支持消息存儲、獲取在線設備列表、查看歷史消息等衍生功能,降低開發工作量與消息存儲成本。
實現方案:
言歸正傳,上干貨。本次技術實現方案包含:移動客戶端(Android)、后端服務(Java)以及MQTT服務器。這里提一下,MQTT服務器使用環信MQTT消息云,使用三方云服務比較省心,既節省開發時間,產品性能也不需要擔心,現在注冊可以直接使用環信MQTT消息云超高額度的免費版:每月100并發連接、300萬消息,完全滿足功能開發使用。
客戶端實現:
客戶端實現主要包含以下兩部分:
底層MQTT業務集成:包含引入SDK、MQTT方法封裝、業務交互(消息收發)。
APP上層交互:在APP首頁提供世界頻道入口,實現心情彈幕飄窗(接收)和發送。
接下來上底層MQTT業務集成代碼。
引入SDK:
這一步環信官方文檔比較明確,就是根據自己的平臺引入相應的mqtt客戶端sdk,這里簡單貼一下AndroidStudio的引入配置
1// 在根目錄 build.gradle repositories 下加入配置 2maven { url "https://repo.eclipse.org/content/repositories/paho-snapshots/" } 3... 4// 然后加入 MQTT 依賴 5// MQTT sdk https://docs-im.easemob.com/mqtt/qsandroidsdk 6implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0' 7implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
方法封裝
這里貼一下對mqtt相關方法的簡單封裝,代碼在vmmqtt模塊兒的MQTTHelper類下:
1 /** 2 * Create by lzan13 on 2022/3/22 3 * 描述:MQTT 幫助類 4 */ 5 object MQTTHelper { 6 7 private var mqttClient: MqttAndroidClient? = null 8 9 // 緩存主題集合 10 private val topicList = mutableListOf<String>() 11 12 /** 13 * 鏈接MQTT 14 * @param id 用戶 Id 15 * @param token 用戶鏈接 MQTT 的 Token 16 * @param topic 需要訂閱的主題,不為空就會在連接成功后進行訂閱 17 */ 18 fun connect(id: String, token: String, topic: String = "") { 19 // 處理訂閱主題 20 if (topic.isNotEmpty()) topicList.add(topic) 21 22 // 拼接鏈接地址 23 val url = "tcp://${MQTTConstants.mqttHost()}:${MQTTConstants.mqttPort()}" 24 // 拼接 clientId 25 val clientId = "${id}@${MQTTConstants.mqttAppId()}" 26 mqttClient = MqttAndroidClient(VMTools.context, url, clientId) 27 28 //連接參數 29 val options = MqttConnectOptions() 30 options.isAutomaticReconnect = true //設置自動重連 31 options.isCleanSession = true // 緩存 32 options.connectionTimeout = CConstants.timeMinute.toInt() // 設置超時時間,單位:秒 33 options.keepAliveInterval = CConstants.timeMinute.toInt() // 心跳包發送間隔,單位:秒 34 options.userName = id // 用戶名 35 options.password = token.toCharArray() // 密碼 36 options.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1; 37 // 設置MQTT監聽 38 mqttClient?.setCallback(object : MqttCallback { 39 override fun connectionLost(t: Throwable) { 40 // 通知鏈接斷開 41 VMLog.d("MQTT 鏈接斷開 $t") 42 } 43 44 @Throws(Exception::class) 45 override fun messageArrived(topic: String, message: MqttMessage) { 46 // 通知收到消息 47 VMLog.d("MQTT 收到消息:$message") 48 // 如果未訂閱則直接丟棄 49 if (!topicList.contains(topic)) return 50 notifyEvent(topic, String(message.payload)) 51 } 52 53 override fun deliveryComplete(token: IMqttDeliveryToken) {} 54 }) 55 //進行連接 56 mqttClient?.connect(options, null, object : IMqttActionListener { 57 override fun onSuccess(token: IMqttToken) { 58 VMLog.d("MQTT 鏈接成功") 59 // 鏈接成功,循環訂閱緩存的主題 60 topicList.forEach { subscribe(it) } 61 } 62 63 override fun onFailure(token: IMqttToken, t: Throwable) { 64 VMLog.d("MQTT 鏈接失敗 $t") 65 } 66 }) 67 } 68 69 /** 70 * 訂閱主題 71 * @param topic 主題 72 */ 73 fun subscribe(topic: String) { 74 if (!topicList.contains(topic)) { 75 topicList.add(topic) 76 } 77 try { 78 //連接成功后訂閱主題 79 mqttClient?.subscribe(topic, 0, null, object : IMqttActionListener { 80 override fun onSuccess(token: IMqttToken) { 81 VMLog.d("MQTT 訂閱成功 $topic") 82 } 83 84 override fun onFailure(token: IMqttToken, t: Throwable) { 85 VMLog.d("MQTT 訂閱失敗 $topic $t") 86 } 87 }) 88 } catch (e: MqttException) { 89 e.printStackTrace() 90 } 91 } 92 93 /** 94 * 取消訂閱 95 * @param topic 主題 96 */ 97 fun unsubscribe(topic: String) { 98 if (topicList.contains(topic)) { 99 topicList.remove(topic) 100 } 101 try { 102 mqttClient?.unsubscribe(topic) 103 } catch (e: MqttException) { 104 e.printStackTrace() 105 } 106 } 107 108 /** 109 * 發送 MQTT 消息 110 * @param topic 主題 111 * @param content 內容 112 */ 113 fun sendMsg(topic: String, content: String) { 114 val msg = MqttMessage() 115 msg.payload = content.encodeToByteArray() // 設置消息內容 116 msg.qos = 0 //設置消息發送質量,可為0,1,2. 117 // 設置消息的topic,并發送。 118 mqttClient?.publish(topic, msg, null, object : IMqttActionListener { 119 override fun onSuccess(asyncActionToken: IMqttToken) { 120 VMLog.d("MQTT 消息發送成功") 121 } 122 123 override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) { 124 VMLog.d("MQTT 消息發送失敗 ${exception.message}") 125 } 126 }) 127 } 128 129 /** 130 * 通知 MQTT 事件 131 */ 132 private fun notifyEvent(topic: String, data: String) { 133 LDEventBus.post(topic, data) 134 } 135 }
業務交互
和業務相關的就是在啟動APP后,使用后端服務器返回的鑒權token信息及連接封裝接口登錄環信通MQTT服務器,登錄成功后訂閱主題并監聽消息。
1// 請求 token 成功后,調用MQTTHelper.connect()鏈接 MQTT 服務器,這里會同時傳遞監聽的主題 2MQTTHelper.connect(mUser.id, token, MQTTConstants.Topic.newMatchInfo) 3 4/** 5 * 發送匹配信息 6 */ 7private fun sendMatchInfo() { 8 if (selfMatch.user.nickname.isEmpty()) return 9 // 提交自己的匹配信息到服務器 10 mViewModel.submitMatch(selfMatch) 11 val json = JSONObject() 12 json.put("content", selfMatch.content) 13 json.put("emotion", selfMatch.emotion) 14 json.put("gender", selfMatch.gender) 15 json.put("type", selfMatch.type) 16 val jsonUser = JSONObject() 17 jsonUser.put("avatar", mUser.avatar) 18 jsonUser.put("id", mUser.id) 19 jsonUser.put("nickname", mUser.nickname) 20 jsonUser.put("username", mUser.username) 21 json.put("user", jsonUser) 22 MQTTHelper.sendMsg(MQTTConstants.Topic.newMatchInfo, json.toString()) 23} 24 25// 監聽消息這里使用了一個事件總線進行通知,在上邊封裝 MQTTHelper 發送消息也使用了這個, 26// 訂閱 MQTT 事件 27LDEventBus.observe(this, MQTTConstants.Topic.newMatchInfo, String::class.java) { 28 val match = JsonUtils.fromJson<Match>(it, Match::class.java) 29 // 這里收到匹配信息之后就增加一條彈幕 30 addBarrage(match) 31}
后端服務實現
接下來介紹后端服務實現,主要包含以下兩部分:
配置連接信息:配置環信MQTT消息云連接信息。
獲取鑒權信息:獲取客戶端連接需要的鑒權信息。
配置連接信息
配置部分只需要按照環信后臺配置信息進行替換就好,配置在config目錄下的config.xxx.json文件內
1/** 2 * Easemob MQTT 配置 https://console.easemob.com/app/generalizeMsg/overviewService 3 */ 4config.mqtt = { 5 host: 'mqtt host', // MQTT 鏈接地址 6 appId: 'appId', // MQTT AppId 7 port: [ 1883, 1884, 80, 443 ], // MQTT 端口 1883(mqtt),1884(mqtts),80(ws),443(wss) 8 restHost: 'https://api.cn1.mqtt.chat/app/8igtc0', // MQTT 服務 API 地址 9 clientId: 'client id', // 替換環信后臺 clientId 10 clientSecret: 'client secret', // 替換環信后臺 clientSecret 11};
獲取鑒權信息
這里主要是獲取客戶端連接所需要的鑒權信息token,為了安全token肯定是要放在服務器端生成的,廢話不多說,上代碼:
1/** 2 * Create by lzan13 on 2022/3/22 3 * 描述:MQTT 幫助類 4 */ 5object MQTTHelper { 6 7 private var mqttClient: MqttAndroidClient? = null 8 9 // 緩存主題集合 10 private val topicList = mutableListOf<String>() 11 12 /** 13 * 鏈接MQTT 14 * @param id 用戶 Id 15 * @param token 用戶鏈接 MQTT 的 Token 16 * @param topic 需要訂閱的主題,不為空就會在連接成功后進行訂閱 17 */ 18 fun connect(id: String, token: String, topic: String = "") { 19 // 處理訂閱主題 20 if (topic.isNotEmpty()) topicList.add(topic) 21 22 // 拼接鏈接地址 23 val url = "tcp://${MQTTConstants.mqttHost()}:${MQTTConstants.mqttPort()}" 24 // 拼接 clientId 25 val clientId = "${id}@${MQTTConstants.mqttAppId()}" 26 mqttClient = MqttAndroidClient(VMTools.context, url, clientId) 27 28 //連接參數 29 val options = MqttConnectOptions() 30 options.isAutomaticReconnect = true //設置自動重連 31 options.isCleanSession = true // 緩存 32 options.connectionTimeout = CConstants.timeMinute.toInt() // 設置超時時間,單位:秒 33 options.keepAliveInterval = CConstants.timeMinute.toInt() // 心跳包發送間隔,單位:秒 34 options.userName = id // 用戶名 35 options.password = token.toCharArray() // 密碼 36 options.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1; 37 // 設置MQTT監聽 38 mqttClient?.setCallback(object : MqttCallback { 39 override fun connectionLost(t: Throwable) { 40 // 通知鏈接斷開 41 VMLog.d("MQTT 鏈接斷開 $t") 42 } 43 44 @Throws(Exception::class) 45 override fun messageArrived(topic: String, message: MqttMessage) { 46 // 通知收到消息 47 VMLog.d("MQTT 收到消息:$message") 48 // 如果未訂閱則直接丟棄 49 if (!topicList.contains(topic)) return 50 notifyEvent(topic, String(message.payload)) 51 } 52 53 override fun deliveryComplete(token: IMqttDeliveryToken) {} 54 }) 55 //進行連接 56 mqttClient?.connect(options, null, object : IMqttActionListener { 57 override fun onSuccess(token: IMqttToken) { 58 VMLog.d("MQTT 鏈接成功") 59 // 鏈接成功,循環訂閱緩存的主題 60 topicList.forEach { subscribe(it) } 61 } 62 63 override fun onFailure(token: IMqttToken, t: Throwable) { 64 VMLog.d("MQTT 鏈接失敗 $t") 65 } 66 }) 67 } 68 69 /** 70 * 訂閱主題 71 * @param topic 主題 72 */ 73 fun subscribe(topic: String) { 74 if (!topicList.contains(topic)) { 75 topicList.add(topic) 76 } 77 try { 78 //連接成功后訂閱主題 79 mqttClient?.subscribe(topic, 0, null, object : IMqttActionListener { 80 override fun onSuccess(token: IMqttToken) { 81 VMLog.d("MQTT 訂閱成功 $topic") 82 } 83 84 override fun onFailure(token: IMqttToken, t: Throwable) { 85 VMLog.d("MQTT 訂閱失敗 $topic $t") 86 } 87 }) 88 } catch (e: MqttException) { 89 e.printStackTrace() 90 } 91 } 92 93 /** 94 * 取消訂閱 95 * @param topic 主題 96 */ 97 fun unsubscribe(topic: String) { 98 if (topicList.contains(topic)) { 99 topicList.remove(topic) 100 } 101 try { 102 mqttClient?.unsubscribe(topic) 103 } catch (e: MqttException) { 104 e.printStackTrace() 105 } 106 } 107 108 /** 109 * 發送 MQTT 消息 110 * @param topic 主題 111 * @param content 內容 112 */ 113 fun sendMsg(topic: String, content: String) { 114 val msg = MqttMessage() 115 msg.payload = content.encodeToByteArray() // 設置消息內容 116 msg.qos = 0 //設置消息發送質量,可為0,1,2. 117 // 設置消息的topic,并發送。 118 mqttClient?.publish(topic, msg, null, object : IMqttActionListener { 119 override fun onSuccess(asyncActionToken: IMqttToken) { 120 VMLog.d("MQTT 消息發送成功") 121 } 122 123 override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) { 124 VMLog.d("MQTT 消息發送失敗 ${exception.message}") 125 } 126 }) 127 } 128 129 /** 130 * 通知 MQTT 事件 131 */ 132 private fun notifyEvent(topic: String, data: String) { 133 LDEventBus.post(topic, data) 134 }
135}
源碼地址
核心代碼就這么多,不超過500行,這里沒有直接調用環信歷史消息接口獲取消息存儲記錄,后續可以在進行改良,簡化實現流程。源碼鏈接附上,配合使用效果更佳。
服務端github源碼:
https://github.com/lzan13/vmtemplateserver
客戶端github源碼:
https://gitee.com/lzan13/VMTemplateAndroid
寫在最后
MQTT協議資源占用小,并發連接高,集成簡單,特別適用于高頻數據交互場景,比如:游戲的世界廣場、視頻平臺彈幕等等等等,歡迎各位小伙伴集思廣益,基于MQTT服務實現更多的業務場景,享受技術帶來的便利與快樂。