摘要:前言前些日子了解到這樣一個協(xié)議,可以在上達到即時通訊的效果,但網(wǎng)上并不能很方便地找到一篇目前版本的在下正確實現(xiàn)這個協(xié)議的博客。
前言
前些日子了解到mqtt這樣一個協(xié)議,可以在web上達到即時通訊的效果,但網(wǎng)上并不能很方便地找到一篇目前版本的在node下正確實現(xiàn)這個協(xié)議的博客。
自己搗鼓了一段時間,理解不深刻,但也算是基本能夠達到使用目的。
本文目的為對MQTT有需求的學(xué)習(xí)者提供一定程度上的便利,省去了查閱官方文檔及源碼的功夫,但尚未對離線消息的接收順序進行處理。
代碼服務(wù)端: server.js
//服務(wù)端引入中間件mosca let mosca = require("mosca") let settings = { port: 5112 } let server = new mosca.Server(settings) server.on("ready", function(){ console.log("Mosca server is up and running at port 5112"); }) server.on("published", function(packet, client) { console.log("Published", packet.payload) }) server.on("clientDisconnected", function(client){ console.log("disconnected: ", client.id) })
推送端: pub.js
//客戶端引入mqtt let mqtt = require("mqtt"); let client = mqtt.connect("mqtt://localhost", { port: 5112, clientId: "cli_pub", }) let num = 0; setInterval(function (){ client.publish("test", "Hello mqtt " + (++num), {qos:1}, () => console.log(num)); }, 1000)
訂閱端: sub.js
let mqtt = require("mqtt") let client = mqtt.connect("mqtt://localhost", { port: 5112, clientId: "cli_sub", }) client.subscribe("test",{qos:1}) client.on("message", function (topic, message) { console.log("received message: ", message.toString()) })
server運行后,先啟動sub,再啟動pub,即可在sub中接收到推送過來的消息序列
至此實現(xiàn)了簡單的即時推送
要實現(xiàn)消息的離線推送,必然需要一個存儲臨時數(shù)據(jù)的部件
此處用到的是mongo,當(dāng)然可以根據(jù)需要選擇其他的存儲工具
server.js中的settings需更改為:
let settings = { port: 5112, persistence:{ //增加了此項 factory: mosca.persistence.Mongo, url: "mongodb://localhost:27017/mosca" } }
factory: 引入mosca對特定存儲工具的一些處理方法
url: 其中的 27017 為mongo所監(jiān)聽的端口號,mosca為存儲相關(guān)數(shù)據(jù)的數(shù)據(jù)庫
值得一提的是:配置好mongo的環(huán)境后,不需要提前在mongo中手動創(chuàng)建,若數(shù)據(jù)庫不存在會自動生成,而且mosca會為你作好其他一切基本事項 (即:若只想臨時體驗下效果,甚至可以暫時把mongo放一邊 )
在mongo中,可以看到自動新添了db: mosca
及其下的collection(相當(dāng)于關(guān)系型數(shù)據(jù)庫中的表/關(guān)系)
pub.js和sub.js中的client中都可以改為:
let client = mqtt.connect("mqtt://localhost", { port: 5112, clientId: "cli_**", clean: false//增加了此項 })
clientId: 區(qū)分客戶端的識別碼
clean: 此處決定了客戶端在服務(wù)端的session是否會被清除,默認為true,為實現(xiàn)離線推送,我們需要將其保留
clean及上文中的persistence為實現(xiàn)離線推送的關(guān)鍵配置
mqtt.connect()會返回一個mqttClient對象,包含了:reconnect(), subscribe(), publish()等一系列方法。
本文中發(fā)送端接收端被分為了pub.js和sub.js兩個獨立文件,僅僅為了方便在不同控制臺中觀察效果
一個client可以既為推送端,又為訂閱端
至此,所有代碼已完成
其他介紹:client.subscribe():
為本客戶端訂閱一個話題,所有訂閱此話題的用戶都會收到在此話題下推送的信息
//client.subscribe(topic,opts) client.subscribe("test",{qos:1})
opts中的qos為通信機制,控制發(fā)送端與接收端的互鎖程度
上文中的其中一個collection: subscriptions即記錄各用戶話題訂閱情況
用戶cli_sub及cli2_sub訂閱了話題test:
(新增一個cli2_pub,下文有用)
注:
重復(fù)執(zhí)行腳本sub.js實際上對topic進行了重復(fù)訂閱
實際編碼時,應(yīng)避免topic的重復(fù)訂閱,即使重復(fù)訂閱并不影響實現(xiàn)效果
client.publish():
向指定topic發(fā)送數(shù)據(jù)
message為Buffer或String格式,可以通過序列化或轉(zhuǎn)json實現(xiàn)對復(fù)雜數(shù)據(jù)對象的傳送
//client.publish(topic, message, opts, callback) let num = 0; setInterval(function (){ client.publish("test", "Hello mqtt " + (++num), {qos:1}, () => console.log(num)); }, 1000)
參數(shù)不再贅述
此處用一個定時器定時在 topic: test 下發(fā)送"Hello mqtt 1,2,3.."
用回調(diào)函數(shù)實時打印一下發(fā)送的num:
當(dāng)訂閱者處于離線狀態(tài)時,可以在collection: packets中查看到臨時數(shù)據(jù)的存儲情況:
mosca把每一條推送消息為所有訂閱用戶都生成了獨立的記錄,用同一個messageId進行關(guān)聯(lián)
當(dāng)其中一個用戶(cli2_sub)上線時,獲取到其對應(yīng)的數(shù)據(jù),
而后數(shù)據(jù)庫中相應(yīng)記錄便會被刪除
此時僅有cli_sub用戶的數(shù)據(jù)
當(dāng)cli2_sub上線接收消息后,packets中記錄將被清空
client.on():
即在client上觸發(fā)的事件,此處只列舉消息接收事件
//client.on(event, callback) client.on("message", function (topic, message) { console.log("received message: ", message.toString()) })
處理為簡單地打印到控制臺
附mosca.js文檔:
https://www.npmjs.com/package...
mqtt.js文檔:
https://www.npmjs.com/package...
windows環(huán)境下mongo的配置:
https://jingyan.baidu.com/art...
及一位前輩的文章:
https://www.jianshu.com/p/831...
轉(zhuǎn)載請注明出處 ; )
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://www.ezyhdfw.cn/yun/19456.html
摘要:現(xiàn)在很多網(wǎng)站都通過服務(wù)來實現(xiàn)消息推送及數(shù)據(jù)即時同步功能,即時通訊組件逐漸成為產(chǎn)品的標(biāo)配。目前國內(nèi)有很多成熟穩(wěn)定的第三方即時通訊服務(wù)廠家,比如融云。 現(xiàn)在很多網(wǎng)站、APP都通過IM服務(wù)來實現(xiàn)消息推送及數(shù)據(jù)即時同步功能,即時通訊組件逐漸成為產(chǎn)品的標(biāo)配。目前國內(nèi)有很多成熟穩(wěn)定的第三方即時通訊服務(wù)廠家,比如:融云。使用這些專業(yè)的服務(wù)可以提高開發(fā)效率而且服務(wù)穩(wěn)定有保障。 如果自己DIY或者需要在...
摘要:,消息隊列遙測傳輸是開發(fā)的一個即時通訊協(xié)議,有可能成為物聯(lián)網(wǎng)的重要組成部分。會發(fā)生消息丟失或重復(fù)。只有一次,確保消息到達一次。此外,國內(nèi)很多企業(yè)都廣泛使用作為手機客戶端與服務(wù)器端推送消息的協(xié)議。 前幾天寫了一下MQTT協(xié)議實現(xiàn)推送數(shù)據(jù)傳輸,所以我會不定期的更新一下關(guān)注MQTT的知識。 MQTT: MQTT(Message Queuing Telemetry Transport,消息隊列...
閱讀 696·2021-09-22 10:02
閱讀 6652·2021-09-03 10:49
閱讀 622·2021-09-02 09:47
閱讀 2210·2019-08-30 15:53
閱讀 2981·2019-08-30 15:44
閱讀 979·2019-08-30 13:20
閱讀 1872·2019-08-29 16:32
閱讀 940·2019-08-29 12:46