您现在的位置是:Instagram刷粉絲, Ins買粉絲自助下單平台, Ins買贊網站可微信支付寶付款 > 

04 發布訂閱消息傳遞模式(如何采用MQTT協議實現android消息推送)

Instagram刷粉絲, Ins買粉絲自助下單平台, Ins買贊網站可微信支付寶付款2024-05-07 15:12:58【】2人已围观

简介3publicSubscribeClient(Stringi){024try{025mqttClient=newMqttClient(CONNECTION_STRING);026SimpleCallb

3 public SubscribeClient(String i){ 024 try { 025 mqttClient = new MqttClient(CONNECTION_STRING);026 SimpleCallbackHandler simpleCallbackHandler = new SimpleCallbackHandler();027 mqttClient

registerSimpleHandler(simpleCallbackHandler);//注冊接收消息方法028 mqttClient

買粉絲nnect(CLIENT_ID+i, CLEAN_START, KEEP_ALIVE);029 mqttClient

subscribe(TOPICS, QOS_VALUES);//訂閱接主題030031 /**032 * 完成訂閱后,可以增加心跳,保持網絡通暢,也可以發布自己的消息033 */034035 mqttClient

publish(PUBLISH_TOPICS, "keepalive"

getBytes(), QOS_VALUES[0], true);036037 } catch (MqttException e) { 038 // TODO Auto-generated catch block039 e

printStackTrace();040 }041 }042043 /**044 * 簡單回調函數,處理client接收到的主題消息045 * @author pig046 *047 */048 class SimpleCallbackHandler implements MqttSimpleCallback{ 049050 /**051 * 當客戶機和broker意外斷開時觸發052 * 可以再此處理重新訂閱053 */054 @Override055 public void 買粉絲nnectionLost() throws Exception { 056 // TODO Auto-generated method stub057 System

out

println("客戶機和broker已經斷開");058 }059060 /**061 * 客戶端訂閱消息后,該方法負責回調接收處理消息062 */063 @Override064 public void publishArrived(String topicName, byte[] payload, int Qos, booleanretained) throws Exception { 065 // TODO Auto-generated method stub066 System

out

println("訂閱主題: " + topicName);067 System

out

println("消息數據: " + new String(payload));068 System

out

println("消息級別(0,1,2): " + Qos);069 System

out

println("是否是實時發送的消息(false=實時,true=服務器上保留的最后消息): " + retained);070 }071072 }073074 /**075 * 高級回調076 * @author pig077 *078 */079 class AdvancedCallbackHandler implements MqttSimpleCallback{ 080081 @Override082 public void 買粉絲nnectionLost() throws Exception { 083 // TODO Auto-generated method stub084085 }086087 @Override088 public void publishArrived(String arg0, byte[] arg1, int arg2,089 boolean arg3) throws Exception { 090 // TODO Auto-generated method stub091092 }093094 }095096 /**097 * @param args098 */099 public static void main(String[] args) { 100 // TODO Auto-generated method stub101 new SubscribeClient("" + i);102103 }104105 }

Redis發布訂閱和Stream

發布訂單系統是日常開發中經常會用到的功能。簡單來說,就是發布者發布消息,訂閱者就會接受到消息并進行相應的處理,如下圖所示。

Redis為我們提供了發布/訂閱的功能模塊PubSub,可以用于消息傳遞。

其中發布者publisher、訂閱者subscriber都是redis客戶端,channel則是redis服務器。

發布者publisher向channel發送消息,訂閱該channel的subscriber就會接收到消息。

發布消息publish

訂閱test1、test2的客戶端會收到消息

按照上述這種方式,如果 訂閱者subscriber想要訂閱多個channel 則需要同時指定多個channel的名稱,redis為了解決這個問題提供 psubscribe模式匹配 這種訂閱方式,可以通過通配符的方式匹配頻道。

發布消息

之前訂閱ch*的客戶端就會收到cha頻道和買粉絲頻道的消息,這樣就一次性訂閱多個頻道

redis服務端存儲了訂閱頻道/模式的客戶端列表

相當于如果客戶端訂閱一個頻道 ,那么服務端的 pubsub_channels 就會存儲一條數據, pubsub_channels 其實是一個鏈表,key對應channel,value對應客戶端列表,根據key訂閱的頻道,就可以找到訂閱該頻道的所有客戶端。

同時如果客戶端訂閱一個模式 , pubsub_patterns 也會新增一條數據,記錄當前客戶端訂閱的模式, pubsub_patterns 也有自己的數據結構,其中就包含了客戶端以及模式。

當發布者向某個頻道發布消息時,就會遍歷 pubsub_channels 找到訂閱該頻道的客戶端列表,依次向這些客戶端發送消息。

然后遍歷 pubsub_patterns 找到符合當前頻道的模式,同時找到模式對應的客戶端,然后向客戶端發送消息。

雖然Redis提供了發布/訂閱的功能,但是并不完善,導致基本沒有合適的場景能夠使用。

PubSub缺點:

直到Redis5.0出現之后,出現了Stream這種數據結構,才終于完善了Redis的消息機制 。

Stream實際上就是一個消息列表,只是他幾乎實現了消息隊列所需要的所有功能,包括:

同時需要注意的是Stream只是一個數據結構,他不會主動把消息推送給消費者,需要消費者主動來消費數據 。

每個Stream都有唯一的名稱,它就是Redis的key,首次使用 xadd 指令追加消息時自動創建。

常見操作命令如下表:

如果客戶端希望知道自身消費到第幾條數據了,那么就需要記錄一下當前消費的消息ID,下次再次消費的時候就從上次消費的消息ID開始讀取數據即可。

消費組中多了一個游標 last_delivered_id ,表示當前消費到了哪一條數據。同時所有的數據都是待處理消息( PEL ),只有消費者處理完畢之后使用 ack 指令告知redis服務器,數據才會從 PEL 中移除,確認后的消息就無法再次消費。

如果接收到的消息比較多,為了避免Stream過長,可以選擇指定Stream的最大長度,一旦到達了最大長度,就會從最早的消息開始清除,保證Stream中最新的消息。

很赞哦!(85)

Instagram刷粉絲, Ins買粉絲自助下單平台, Ins買贊網站可微信支付寶付款的名片

职业:程序员,设计师

现居:江苏盐城阜宁县

工作室:小组

Email:[email protected]