您现在的位置是:Instagram刷粉絲, Ins買粉絲自助下單平台, Ins買贊網站可微信支付寶付款 > 

01 redis發布訂閱模式是基于pull還是push(Spring Boot使用Redis進行消息的發布訂閱 原創)

Instagram刷粉絲, Ins買粉絲自助下單平台, Ins買贊網站可微信支付寶付款2024-05-15 09:50:42【】0人已围观

简介Redis發布訂閱和Stream發布訂單系統是日常開發中經常會用到的功能。簡單來說,就是發布者發布消息,訂閱者就會接受到消息并進行相應的處理,如下圖所示。Redis為我們提供了發布/訂閱的功能模塊Pu

Redis發布訂閱和Stream

發布訂單系統是日常開發中經常會用到的功能。簡單來說,就是發布者發布消息,訂閱者就會接受到消息并進行相應的處理,如下圖所示。

Redis為我們提供了發布/訂閱的功能模塊PubSub,可以用于消息傳遞。

其中發布者publisher、訂閱者subscriber都是redis客戶端,channel則是redis服務器。

發布者publisher向channel發送消息,訂閱該channel的subscriber就會接收到消息。

發布消息publish

訂閱test1、test2的客戶端會收到消息

按照上述這種方式,如果 訂閱者subscriber想要訂閱多個channel 則需要同時指定多個channel的名稱,redis為了解決這個問題提供 psubscribe模式匹配 這種訂閱方式,可以通過通配符的方式匹配頻道。

發布消息

之前訂閱ch*的客戶端就會收到cha頻道和買粉絲頻道的消息,這樣就一次性訂閱多個頻道

redis服務端存儲了訂閱頻道/模式的客戶端列表

相當于如果客戶端訂閱一個頻道 ,那么服務端的 pubsub_channels 就會存儲一條數據, pubsub_channels 其實是一個鏈表,key對應channel,value對應客戶端列表,根據key訂閱的頻道,就可以找到訂閱該頻道的所有客戶端。

同時如果客戶端訂閱一個模式 , pubsub_patterns 也會新增一條數據,記錄當前客戶端訂閱的模式, pubsub_patterns 也有自己的數據結構,其中就包含了客戶端以及模式。

當發布者向某個頻道發布消息時,就會遍歷 pubsub_channels 找到訂閱該頻道的客戶端列表,依次向這些客戶端發送消息。

然后遍歷 pubsub_patterns 找到符合當前頻道的模式,同時找到模式對應的客戶端,然后向客戶端發送消息。

雖然Redis提供了發布/訂閱的功能,但是并不完善,導致基本沒有合適的場景能夠使用。

PubSub缺點:

直到Redis5.0出現之后,出現了Stream這種數據結構,才終于完善了Redis的消息機制 。

Stream實際上就是一個消息列表,只是他幾乎實現了消息隊列所需要的所有功能,包括:

同時需要注意的是Stream只是一個數據結構,他不會主動把消息推送給消費者,需要消費者主動來消費數據 。

每個Stream都有唯一的名稱,它就是Redis的key,首次使用 xadd 指令追加消息時自動創建。

常見操作命令如下表:

如果客戶端希望知道自身消費到第幾條數據了,那么就需要記錄一下當前消費的消息ID,下次再次消費的時候就從上次消費的消息ID開始讀取數據即可。

消費組中多了一個游標 last_delivered_id ,表示當前消費到了哪一條數據。同時所有的數據都是待處理消息( PEL ),只有消費者處理完畢之后使用 ack 指令告知redis服務器,數據才會從 PEL 中移除,確認后的消息就無法再次消費。

如果接收到的消息比較多,為了避免Stream過長,可以選擇指定Stream的最大長度,一旦到達了最大長度,就會從最早的消息開始清除,保證Stream中最新的消息。

大數據核心技術有哪些

大數據技術的體系龐大且復雜,基礎的技術包含數據的采集、數據預處理、分布式存儲、數據庫、數據倉庫、機器學習、并行計算、可視化等。

1、數據采集與預處理:FlumeNG實時日志收集系統,支持在日志系統中定制各類數據發送方,用于收集數據;Zookeeper是一個分布式的,開放源碼的分布式應用程序協調服務,提供數據同步服務。

2、數據存儲:Hadoop作為一個開源的框架,專為離線和大規模數據分析而設計,HDFS作為其核心的存儲引擎,已被廣泛用于數據存儲。HBase,是一個分布式的、面向列的開源數據庫,可以認為是hdfs的封裝,本質是數據存儲、NoSQL數據庫。

3、數據清洗:MapRece作為Hadoop的查詢引擎,用于大規模數據集的并行計算。

4、數據查詢分析:Hive的核心工作就是把SQL語句翻譯成MR程序,可以將結構化的數據映射為一張數據庫表,并提供HQL(HiveSQL)查詢功能。Spark啟用了內存分布數據集,除了能夠提供交互式查詢外,它還可以優化迭代工作負載。

5、數據可視化:對接一些BI平臺,將分析得到的數據進行可視化,用于指導決策服務。

Spring Boot使用Redis進行消息的發布訂閱 原創

/

**

* Redis 相關的配置,包含推送,以及對象編碼的定義

*/

@Configuration

public class RedisConfig {

public @Bean

RedisTemplate redisTemplate(RedisConnectionFactory 買粉絲nnectionFactory) {

RedisTemplate template = new RedisTemplate>();

template.setConnectionFactory(買粉絲nnectionFactory);

template.setDefaultSerializer(new GenericJackson2JsonRedisSerializer());

template.setKeySerializer(new StringRedisSerializer());

template.setHashKeySerializer(new GenericJackson2JsonRedisSerializer());

template.setValueSerializer(new GenericJackson2JsonRedisSerializer());

return template;

}

@Bean

ChannelTopic topic() {

return new ChannelTopic("messageQueue");

}

@Bean

RedisMessageListenerContainer 買粉絲ntainer(RedisConnectionFactory 買粉絲nnectionFactory,

MessageListenerAdapter listenerAdapter) {

RedisMessageListenerContainer 買粉絲ntainer = new RedisMessageListenerContainer();

買粉絲ntainer.setConnectionFactory(買粉絲nnectionFactory);

買粉絲ntainer.addMessageListener(listenerAdapter, new PatternTopic("messageQueue"));

return 買粉絲ntainer;

}

@Bean

MessageListenerAdapter listenerAdapter(Receiver receiver) {

return new MessageListenerAdapter(receiver, "receiveMessage");

}

}

public enum BusinessTypeEnum {

//修改SystemConfig緩存

UPDATE_SYSTEM_CONFIG;

}

//發布者接口

public interface MessagePublisher {

void publish(String message);

}

@Component

public class RedisMessagePublisher implements MessagePublisher{

private final static Logger logger = LoggerFactory.getLogger(RedisMessagePublisher.class);

@Autowired

private RedisTemplate redisTemplate;

@Autowired

private ChannelTopic topic;

@Override

public void publish(String message) {

logger.info("推送信息:"+message);

redisTemplate.買粉絲nvertAndSend(topic.getTopic(), message);

}

}

public interface Receiver { void receiveMessage(String message); }

@Component

public class MessageReceiver implements Receiver {

private final static Logger logger = LoggerFactory.getLogger(MessageReceiver.class);

@Autowired

private ISystemConfigService systemConfigService;

@Override

public void receiveMessage(String message) {

logger.info("消息接收:"+message);

JSONObject object = JSONObject.parseObject(message);

String bus

很赞哦!(41176)

Instagram刷粉絲, Ins買粉絲自助下單平台, Ins買贊網站可微信支付寶付款的名片

职业:程序员,设计师

现居:广东珠海香洲区

工作室:小组

Email:[email protected]