柚子快報(bào)激活碼778899分享:物聯(lián)網(wǎng) MQTT-Java
柚子快報(bào)激活碼778899分享:物聯(lián)網(wǎng) MQTT-Java
背景:最近接觸到MQTT協(xié)議用來和設(shè)備進(jìn)行數(shù)據(jù)對(duì)接。先將設(shè)備的信息接入網(wǎng)關(guān),網(wǎng)關(guān)配置采集的信息后發(fā)送到MQTT服務(wù)器中,我們?cè)趩?dòng)java程序來拉取MQTT topic中的數(shù)據(jù)
MQTT服務(wù)器使用 EMQX(Docker 部署指南 | EMQX 5.1 文檔) 文檔很全,我這里使用開源版本。
MQTT客戶端工具使用 MQTTX(MQTTX: Your All-in-one MQTT Client Toolbox)
使用Java語言,完成MQTT的學(xué)習(xí)和測(cè)試,
1.引入pom依賴
2.編寫測(cè)試代碼
@Test
void testSub() throws MqttException, InterruptedException {
String clientId = "client-testSub2";
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{"tcp://192.168.**.**:1883"});
options.setUserName("");
options.setPassword("".toCharArray());
options.setConnectionTimeout(10);
options.setAutomaticReconnect(true);
options.setCleanSession(false);
MqttClient mqttClient = new MqttClient(options.getServerURIs()[0], clientId, new MemoryPersistence());
mqttClient.connect(options);
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println(topic + "收到messageId=" + message.getId() + ",payload=" + new String(message.getPayload()));
String msg = new String(message.getPayload());
if ("1".equals(msg)) {
throw new RuntimeException("模擬消息處理失敗");
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
mqttClient.subscribe(new String[]{"topic/testPublish"}, new int[] {1});
while(true) {
Thread.sleep(30 * 1000);
}
}
3.假設(shè)MQTT服務(wù)端已安裝完畢,MQTTX已安裝完畢,使用MQTTX連接MQTT服務(wù)端
4.測(cè)試retain消息
mqtt協(xié)議設(shè)計(jì)retain消息是用來,當(dāng)一個(gè)訂閱者訂閱了一個(gè)topic后,這個(gè)topic的retain消息就可以接收到,如果訂閱者id不變且保留session,當(dāng)不重新訂閱topic時(shí)不會(huì)收到retain消息,如果重新訂閱則會(huì)再次收到retain消息。注意:一個(gè)topic只能有一條retain消息,新發(fā)的會(huì)替代舊的;當(dāng)不需要保留retain消息時(shí),發(fā)送一條payload為空的消息就可以了;這里無論qos是什么都一樣
retain消息:用來針對(duì)數(shù)據(jù)采集間隔時(shí)間長的情景,這樣新上線的訂閱者就可以得到上一次設(shè)備的數(shù)據(jù)。
4.1 發(fā)送retain消息
4.2 啟動(dòng)Java程序
當(dāng)關(guān)閉Java程序,再次啟動(dòng)時(shí),它會(huì)再次消費(fèi)
發(fā)送一個(gè)內(nèi)容為空的Retain消息
消費(fèi)者會(huì)再次接收一個(gè)內(nèi)容為空的Retain消息
再次啟動(dòng)就不會(huì)再收到Retain消息了
5.訂閱消息及指定等級(jí)
QOS=0? 至多一次,消息發(fā)送接收依賴TCP
QOS=1?承諾至少會(huì)有一次發(fā)送給接收者
QOS=2?保證消息僅僅傳送到目的地一次
結(jié)論:訂閱消息的處理邏輯要看發(fā)送消息的qos及訂閱消息的qos,取最下的那個(gè)執(zhí)行。
當(dāng)相同的clientId訂閱同一個(gè)主題時(shí),消息將會(huì)以輪詢的方式被每個(gè)消費(fèi)者消費(fèi)
5.1 發(fā)送qos=0 接收qos=0 1 2
5.1.1 發(fā)送消費(fèi)會(huì)出現(xiàn)異常的消息,也只會(huì)處理一次
5.2 發(fā)送qos=1 2 接收qos=0
效果等同 發(fā)送qos=0 接收qos=0 1 2
5.2.1 發(fā)送消費(fèi)會(huì)出現(xiàn)異常的消息,也只會(huì)處理一次
5.3 發(fā)送qos=1 接收qos=1 2
發(fā)送一次會(huì)導(dǎo)致消費(fèi)失敗的消息
5.3.1 消費(fèi)失敗后會(huì)再次收到失敗的消息,重復(fù)消費(fèi)
5.4 發(fā)送qos=2 接收 2
效果和發(fā)送qos=1 接收qos=1 2的一致
5.4.1 消費(fèi)失敗后會(huì)再次收到失敗的消息,重復(fù)消費(fèi)
6.cleanSession
結(jié)論:無論qos為多少,cleanSession為true時(shí),消費(fèi)端下線時(shí)的消息都不能被消費(fèi)。cleanSession為false時(shí),mqtt broker沒有自動(dòng)清除session時(shí),消費(fèi)端下線時(shí)的消息可以被消費(fèi)。
7. 異常
7.1 消費(fèi)能觸發(fā)消費(fèi)異常的消息
當(dāng)發(fā)送qos=0 接收qos=0,先發(fā)送一條會(huì)觸發(fā)消費(fèi)異常的消息,程序啟動(dòng)后會(huì)觸發(fā)異常,程序會(huì)自動(dòng)退出。如果程序先啟動(dòng)起來,再收到能觸發(fā)異常的消息,程序會(huì)正常執(zhí)行
柚子快報(bào)激活碼778899分享:物聯(lián)網(wǎng) MQTT-Java
相關(guān)鏈接
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。