This article mainly introduces ” How to apply Redis special data type stream”. In daily operations, I believe many people have doubts about how to use apply Redis special data type stream. The editor has consulte various materials and sorted out simple and easy-to-use operation methods. I hope it will help you answer the doubts about “How to apply Redis special data type stream”! Next, please follow the editor to learn together!
Redis Stream is a new data type add in Redis 5.0. It is a data type specially designe for message queues.
Before Redis 5.0 Stream was release
The implementation methods of message queues all had their own flaws, such as:
-
The publish-subscribe model cannot persist messages and cannot reliably save messages, and the client that reconnects offline cannot read historical messages.
-
The way List implements message queues cannot be consume repeatedly. A message will be delete after it is consume, and the producer needs to implement a globally unique ID by itself.
Base on the above problems, Redis 5.0 launche the Stream type, which is also the most important feature of this version, for perfectly implementing message queues. It supports message persistence, automatic generation of globally unique IDs, ack message confirmation mode, consumer group mode, etc., making message queues more stable and reliable.
Common commands
Stream message queue operation commands:
-
XADD: insert messages, ensure order, and automatically generate a globally unique ID
-
XDEL: delete the message according to the message ID;
-
DEL : delete the entire Stream;
-
XLEN: query message length;
-
XREAD: use to read messages, data can be read by ID;
-
XRANGE: read range messages;
-
XTRIM: trim the number of queue messages;
-
XGROUP CREATE: Create a consumer group;
-
XREADGROUP: read messages by consumer group;
-
XPENDING and XACK:
The XPENDING command can be use to query the messages that have been “read but not yet confirme” by all consumers in each consumer group;
The XACK command is use to confirm to the overseas chinese in europe data message queue that the message processing has been complete;
Application Scenario
Message Queues
The producer inserts a message via the XADD command
When a consumer reads messages from a message queue using the XREAD command, it can specify a message ID and start reading from the next message of this message ID (note that it starts reading from the next message of the input message ID, not querying the message of the input ID).
If you want to implement a blocking read
You can set the BLOCK configuration item when calling XRAE to implement a blocking read operation similar to BRPOP.
For example what is employer branding and why is it important? The following command sets the configuration item of BLOCK 10000. The unit of 10000 is milliseconds, which means that when XREAD reads the latest message, if no message arrives, XREAD will block for 10000 milliseconds (ie 10 seconds) and then return.
The basic method of Stream is to use xadd to store messages and xread to read messages in a blocking loop to implement a simple message queue. The interaction process is shown in the following figure:
The operations introduce above are also support by List. Next, let’s look at the functions specific to Stream.
Stream can use XGROUP to create a cz leads consumer group. After creating a consumer group, Stream can use the XREADGROUP command to allow consumers in the consumer group to read messages.
Create two consumer groups
The message queue consume by these two consumer groups is mymq. Both groups specify to start reading from the first message:
The command for consumer 1 in consumer group group1 to read all messages from the mymq message queue is as follows:
Once a message in a message queue is read. By a consumer in a consumer group, it can no longer be read by other consumers in the consumer group. That is, consumers in the same consumer group cannot consume the same message.
For example, after executing the XREADGROUP
However, consumers in different consumer groups can consume. The same message (but there is a prerequisite. When creating the message group, different consumer groups specify the same position to start reading the message).
Because I create two consumer groups and both start reading from the first message. You can see that the second consumer group can still consume the message with id 1665058759764-0. Therefore, consumers in different consumer groups can consume the same message.
The purpose of using a consumer group is to allow multiple consumers in the group to share the burden of reading messages. Therefore, we usually let each consumer read part of the message so that the message reading load is evenly distribute among multiple consumers.
For example, we execute the following command to let consumer1, 2, and 3 in group2 read a message respectively.
For a message queue implemente base on Stream, how can we ensure that consumers can still read unprocesse messages after a failure or downtime and restart?
Streams will automatically use an internal queue
Consumption confirmation increases the reliability of the message. Generally, after the business processing is complete, you need to execute the XACK command to confirm that the message has been consume. The execution of the entire process is shown in the following figure:
If the consumer does not successfully process the message, it will not send the XACK command to Streams, and the message will still be retaine. At this time, after restarting, the consumer can use the XPENDING command to view the messages that have been read but not yet confirme to be process.
For example, let’s check the number of messages that each consumer in group2 has read but not yet confirmed. The command is as follows:
If you want to see what data a consumer has read, you can execute the following command:
You can see that the ID of the message that consumer
Once message 1665060633903-0 is process by consumer2, consumer2 can use the XACK command to notify Streams, and then the message will be delete.
When we use the XPENDING command again, we can see that consumer2 no longer has any messages that have been read but not yet confirme.
summary
Well, that’s all about the message queue base on Stream. Let’s summarize:
-
Message order preservation: XADD/XREAD
-
Blocking read: XREAD block
-
Duplicate message processing: When using the XADD command, Stream will automatically generate a globally unique ID;
-
Message reliability: PENDING List is use internally to automatically save messages. XPENDING command is use to view messages that have been read but not confirm by the consumer group. Consumers use XACK to confirm messages.
-
Support consumption data in the form of consumption groups
What are the differences between Redis-base Stream message queues and professional message queues?
A professional message queue must do two things:
-
The message cannot be lost.
-
Messages can pile up.
1. Will Redis Stream messages be lost?
Using a message queue is actually divide into three parts: producers, queue middleware, and consumers. Therefore, to ensure the message is to ensure that data is not lost in any of the three links.
Can the Redis Stream message queue ensure that no data is lost in the three links?
-
Will the Redis producer lose messages? Whether the producer will lose messages depends on whether the producer handles the exceptions reasonably. From the time the message is produced and then submitted to MQ, as long as the ack confirmation response (from the MQ middleware) is received normally, it means that the message is sent successfully. Therefore, as long as the return value and exception are handled properly, and if an exception is returned, the message is resent, then there will be no message loss at this stage.
-
Will Redis consumers lose messages? No, because Stream (MQ middleware) will automatically use the internal queue (also called PENDING List) to retain the messages read by each consumer in the consumer group, but not confirmed. After restarting, the consumer can use the XPENDING command to view the messages that have been read but not confirmed to be processed. After the consumer executes the business logic, send the consumption confirmation XACK command to ensure that the message is not lost.
-
Will Redis message middleware lose messages? Yes, Redis will cause data loss in the following two scenarios:
AOF persistence is configure to write to disk every second
But this writing process is asynchronous, and there is a possibility of data loss when Redis crashes;
Master-slave replication is also asynchronous, and there is a possibility of data loss when switching between master and slave(opens new window).
As you can see, Redis cannot guarantee that messages will not be lost in the queue middleware link. Professional queue middleware such as RabbitMQ or Kafka deploys a cluster when in use. When producers publish messages, the queue middleware usually writes “multiple nodes”, that is, there are multiple copies. In this way, even if one of the nodes fails, the cluster data can be guaranteed not to be lost.
2. Can Redis Stream messages be accumulated?
Redis data is stored in memory, which means that once message backlog occurs, Redis memory will continue to grow. If it exceeds the machine memory limit, it will face the risk of OOM.
Therefore, Redis Stream provides the function of specifying the maximum length of the queue to avoid this situation.
When the maximum queue length is specified, when the queue length exceeds the upper limit, old messages will be deleted and only new messages of a fixed length will be retained. From this point of view, if the maximum length is specified when the Stream is backlogged, messages may still be lost.
However, the data of professional message queues such as Kafka and RabbitMQ are all stored on disk. When messages are accumulated, they simply take up more disk space.
Therefore, when using Redis as a queue, you will face two problems:
-
Redis itself may lose data;
-
Faced with message squeeze, memory resources will be tight;
Therefore, whether or not you can use Redis as a message queue depends on your business scenario:
-
If your business scenario is simple enough, you are not sensitive to data loss, and the probability of message backlog is relatively small, it is completely acceptable to use Redis as a queue.
-
If your business has a large number of messages, the probability of message backlog is relatively high, and data loss is unacceptable, then it is better to use professional message queue middleware.
Supplement: Why can’t the Redis publish/subscribe mechanism be used as a message queue?
The publish-subscribe mechanism has the following disadvantages
-
The publish/subscribe mechanism is not implemented based on any data type, so it does not have the ability of “data persistence”, that is, the related operations of the publish/subscribe mechanism will not be written to RDB and AOF. When Redis crashes and restarts, all the data of the publish/subscribe mechanism will be lost.
-
The publish-subscribe mode is a “fire and forget” working mode. If a subscriber goes offline and reconnects, he cannot consume previous historical messages.
-
When there is a certain message backlog on the consumer side, that is, when the consumer cannot consume the messages sent by the producer, if it exceeds 32M or remains above 8M for 60 seconds, the consumer side will be forcibly disconnected. This parameter is set in the configuration file. The default value is client-output-buffer-limit pubsub 32mb 8mb 60.
Therefore, the publish/subscribe mechanism is only suitable for real-time communication scenarios, such as the scenario of building a Sentinel cluster(opens new window) which adopts the publish/subscribe mechanism.