Before introducing the mqadmin management tool, the following points need to be declared:
1. The way of executing a command is:./mqadmin {command} {args}
2. Almost all commands need to attach the -n option to represent the nameServer address, formatted as ip:port;
3. Almost all commands can get help information with the -h option;
4. If the broker address -b option and clusterName -c option are both configured with specific values, the command execution will select the broker address specified by -b option. The value of the -b option can only be configured with a single address. The format is ip:port. The default port value is 10911. If the value of the -b option is not configured, the command will be applied to all brokers in the entire cluster.
5. You can see many commands under tools, but not all commands can be used, only the commands initialized in MQAdminStartup can be used, you can also modify this class, add or customize commands;
6. Due to the issue of version update, a small number of commands may not be updated in time, please read the related command source code to eliminate and resolve the error.
- The way of executing a command is:./mqadmin {command} {args}
- Almost all commands need to attach the -n option to represent the nameServer address, formatted as ip:port;
- Almost all commands can get help information with the -h option;
- If the broker address -b option and clusterName -c option are both configured with specific values, the command execution will select the broker address specified by -b option. The value of the -b option can only be configured with a single address. The format is ip:port. The default port value is 10911. If the value of the -b option is not configured, the command will be applied to all brokers in the entire cluster.
- You can see many commands under tools, but not all commands can be used, only the commands initialized in MQAdminStartup can be used, you can also modify this class, add or customize commands;
- Due to the issue of version update, a small number of commands may not be updated in time, please read the related command source code to eliminate and resolve the error.
## 1 Topic related command instructions
...
...
@@ -262,7 +262,7 @@ Before introducing the mqadmin management tool, the following points need to be
The RocketMQ message model is mainly composed of Producer, Broker and Consumer. The Producer is responsible for producing messages, the Consumer is responsible for consuming messages, and the Broker is responsible for storing messages.
The Broker corresponds to one server during actual deployment, and each Broker can store messages from multiple topics, and messages from each Topic can be stored in a different Broker by sharding strategy.
The Message Queue is used to store physical offsets of messages, and the Message addresses in multiple Topic are stored in multiple Message queues. The Consumer Group consists of multiple Consumer instances.
RocketMQ message model is mainly composed of Producer, Broker and Consumer. The producer is responsible for producing messages and the consumer is for consuming messages, while the broker stores messages.
The broker is an independent server during actual deployment, and each broker can store messages from multiple topics. Even messages from the same topic can be stored in the different brokers by sharding strategy.
The message queue is used to store physical offsets of messages, and the message addresses are stored in seperate queues. The consumer group consists of multiple consumer instances.
## 2 Producer
The Producer is responsible for producing messages, typically by business systems. It sends messages generated by the business application systems to brokers. The RocketMQ provides multiple paradigms of sending: synchronous, asynchronous,sequential and one-way. Both synchronous and asynchronous methods require the Broker to return confirmation information, while one-way sending is not required.
The Producer is responsible for producing messages, typically by business systems. It sends messages generated by the systems to brokers. RocketMQ provides multiple paradigms of sending: synchronous, asynchronous, sequential and one-way. Both synchronous and asynchronous methods require the confirmation information return from the Broker, but one-way method does not require it.
## 3 Consumer
The Consumer is responsible for consuming messages, typically the background system is responsible for asynchronous consumption. The Consumer pulls messages from brokers and feeds them into application. In perspective of user application, two types of consumers are provided:Pull Consumer and Push Consumer.
The Consumer is responsible for consuming messages, typically the background system is responsible for asynchronous consumption. The consumer pulls messages from brokers and feeds them into application. From the perspective of user, two types of consumers are provided: pull consumer and push consumer.
## 4 Topic
The Topic refers to a collection of one kind message. Each topic contains several messages and one message can only belong to one topic. The Topic is the basic unit of RocketMQ for message subscription.
The Topic refers to a collection of one kind of message. Each topic contains several messages and one message can only belong to one topic. The topic is the basic unit of RocketMQ for message subscription.
## 5 Broker Server
As the role of the transfer station, the Broker Server stores, and forwards messages. In the RocketMQ system, the Broker Server is responsible for receiving messages sent from producers, storing them and preparing to handle pull requests. It also stores message related meta data, including consumer groups, consuming progress, topics and queues info.
As the role of the transfer station, the Broker Server stores and forwards messages. In RocketMQ, the broker server is responsible for receiving messages sent from producers, storing them and preparing to handle pull requests. It also stores the related message meta data, including consumer groups, consuming progress, topics, queues info and so on.
## 6 Name Server
The Name Server serves as the provider of routing service. The Producer or Consumer can find the list of Broker IP addresses corresponding each topic through name server. Multiple Name Servers can be deployed as a cluster, but are independent of each other and do not exchange information.
The Name Server serves as the provider of routing service. The producer or the consumer can find the list of broker IP addresses for each topic through name server. Multiple name servers can be deployed in one cluster, but they are independent of each other and do not exchange information.
## 7 Pull Consumer
A type of Consumer. Applications are usually pulls messages from brokers by actively calling the Consumer's pull message method, and the application has the advantages of controlling the timing and frequency of pulling messages. Once batches of messages are pulled, user application initiates consuming process.
A type of Consumer, the application pulls messages from brokers by actively invoking the consumer pull message method, and the application has the advantages of controlling the timing and frequency of pulling messages. Once the batch of messages is pulled, user application will initiate consuming process.
## 8 Push Consumer
A type of Consumer. Under this mode, after the Broker receives the data, it will actively push it to the consumer, which is generally of high real-time performance.
A type of Consumer. Under this high real-time performance mode, it will push the message to the consumer actively when the Broker receives the data.
## 9 Producer Group
A collection of the same type of Producer, which sends the same type of messages with consistent logic. If a transaction message is sent and the original producer crashes after sending, the Broker Server will contacts other Producer in the same Producer group to commit or rollback the transactional message.
A collection of the same type of Producer, which sends the same type of messages with consistent logic. If a transaction message is sent and the original producer crashes after sending, the broker server will contact other producers in the same producer group to commit or rollback the transactional message.
## 10 Consumer Group
A collection of the same type of Consumer, which sends the same type of messages with consistent logic. The Consumer Group makes load-balance and fault-tolerance super easy in terms of message consuming.
Warning: consumer instances of one consumer group must have exactly the same topic subscription(s).
A collection of the same type of Consumer, which sends the same type of messages with consistent logic. The consumer group makes load-balance and fault-tolerance super easy in terms of message consuming.
Warning: consumer instances of one consumer group must have exactly the same topic subscription(s).
RocketMQ supports two type of consumption mode:Clustering and Broadcasting.
RocketMQ supports two types of consumption mode:Clustering and Broadcasting.
## 11 Consumption Mode - Clustering
Under the Clustering mode, all messages from one topic will be delivered to all consumer instances averagely as much as possible. That is, one message can be consumed by only one consumer instance.
Under the Clustering mode, all the messages from one topic will be delivered to all the consumers instances averagely as much as possible. That is, one message can be consumed by only one consumer instance.
## 12 Consumption Mode - Broadcasting
Under the Broadcasting mode, each Consumer instance of the same Consumer Group receives every message that published to the corresponding topic.
Under the Broadcasting mode, each consumer instance of the same consumer group receives every message published to the corresponding topic.
## 13 Normal Ordered Message
Under the Normal Ordered Message mode, the messages received by consumers from the same ConsumeQueue are sequential, while the messages received from different message queues may be non-sequential.
Under the Normal Ordered Message mode, the messages received by consumers from the same ConsumeQueue are sequential, but the messages received from the different message queues may be non-sequential.
## 14 Strictly Ordered Message
Under the Strictly Ordered Message mode, all messages received by the consumers from the same topic are sequential, as the order they were stored.
Under the Strictly Ordered Message mode, all messages received by the consumers from the same topic are sequential as the order they are stored.
## 15 Message
The physical carrier of information transmitted by a messaging system, the smallest unit of production and consumption data, each message must belong to a topic.
Each Message in RocketMQ has a unique Message ID and can carry a key, generally used to store business-related value. The system provides the function to query messages by Message ID and Key.
The physical carrier of information transmitted by a messaging system, the smallest unit of production and consumption data, each message must belong to one topic.
Each Message in RocketMQ has a unique message id and can carry a key used to store business-related value. The system has the function to query messages by its id or key.
## 16 Tag
Flags set for messages to distinguish different types of messages under the same topic, functioning as a "sub-topic". Messages from the same business unit can set different tags under the same topic in terms of different business purposes. The Tag can effectively maintain the clarity and consistency of the code and optimize the query system provided by RocketMQ. The Consumer can realize different "sub-topic" by using Tag, so as to achieve better expansibility.
Flags set for messages to distinguish different types of messages under the same topic, functioning as a "sub-topic". Messages from the same business unit can set different tags under the same topic in terms of different business purposes. The tag can effectively maintain the clarity and consistency of the code and optimize the query system provided by RocketMQ. The consumer can realize different "sub-topic" by using tag in order to achieve better expansibility.
Relative to RocketMQ's Broker cluster, producers and consumers are client. In this section, it mainly describes the common behavior configuration of producers and consumers.
### Client Addressing mode
### 1 Client Addressing mode
```RocketMQ``` can let client find the ```Name Server```, and then find the ```Broker```by the ```Name Server```. Followings show a variety of configurations, and priority level from highly to lower, the highly priority configurations can override the lower priority configurations.
...
...
@@ -36,11 +36,11 @@ By default, the client accesses the HTTP server every 2 minutes, and update the
```
HTTP static server addressing is recommended, because it is simple client deployment, and the Name Server cluster can be upgraded hot.
### Client Configuration
### 2 Client Configuration
```DefaultMQProducer```,```TransactionMQProducer```,```DefaultMQPushConsumer```,```DefaultMQPullConsumer``` all extends the ```ClientConfig``` Class, ```ClientConfig``` as the client common configuration class. Client configuration style like getXXX,setXXX, each of the parameters can config by spring and also config their in the code. Such as the ```namesrvAddr``` parameter: ```producer.setNamesrvAddr("192.168.0.1:9876")```, same with the other parameters.
@@ -70,7 +70,7 @@ HTTP static server addressing is recommended, because it is simple client deploy
| checkRequestHoldMax | 2000 | Producer local buffer request queue size when Broker look back Producer transaction status |
| RPCHook | null | This parameter is passed in when the Producer is creating, including the pre-processing before the message sending and the processing after the message response. The user can do some security control or other operations in the first interface.|
This section focuses on the configuration of the system (JVM/OS)
## **1 JVM Options** ##
The latest released version of JDK 1.8 is recommended. Set the same Xms and Xmx value to prevent the JVM from resizing the heap for better performance. A simple JVM configurations looks like this:
The latest released version of JDK 1.8 is recommended. Set the same Xms and Xmx value to prevent the JVM from resizing the heap for better performance. A simple JVM configuration is as follows:
This is the simplest, but also the riskiest, mode that makes the entire service unavailable once the broker restarts or goes down. Production environments are not recommended, but can be used for local testing and development. Here are the steps to build.
This is the simplest but also the riskiest mode, that makes the entire service unavailable once the broker restarts or goes down. Production environments are not recommended, but can be used for local testing and development. Here are the steps to build.
The boot command shown above is used in the case of a single NameServer.For clusters of multiple NameServer, the address list after the -n argument in the broker boot command is separated by semicolons, for example, 192.168.1.1: 9876;192.161.2: 9876.
The boot command shown above is used in the case of a single NameServer.For clusters of multiple NameServer, the address list after the -n argument in the broker boot command is separated by semicolons, for example, 192.168.1.1: 9876;192.161.2: 9876.
### 3 Multiple Master And Multiple Slave Mode-Asynchronous replication
@@ -5,6 +5,6 @@ RocketMQ - a distributed message queue, is different with all other MQ middlewar
The RocketMQ has two mainly filter types:
* Tag filtering: Consumer can specify not only the message topic but also the message tag values, when subscribing. Multiple tag values need to be separated by '||'. When consumer subscribing a message, it builds the subscription request into a `SubscriptionData` object and sends a Pull message request to the Broker side. Before the Broker side reads data from the RocketMQ file storage layer - Store, it will construct a `MessageFilter` using the `SubscriptionData` object and then pass it to the Store. Store get a record from `ConsumeQueue`, and it will filter the message by the saved tag hashcode, it is unable to filter the messages exactly in the server side because of only the hashcode will be used when filtering, Therefore, after the Consumer pulls the message, it also needs to compare the original tag string of the message. If the original tag string is not same with the expected, the message will be ignored.
* Tag filtering: Consumer can specify not only the message topic but also the message tag values, when subscribing. Multiple tag values need to be separated by '||'. When consumer subscribing a message, it builds the subscription request into a `SubscriptionData` object and sends a pull message request to the Broker side. Before the Broker side reads data from the RocketMQ file storage layer - Store, it will construct a `MessageFilter` using the `SubscriptionData` object and then pass it to the Store. Store get a record from `ConsumeQueue`, and it will filter the message by the saved tag hashcode, it is unable to filter the messages exactly in the server side because of only the hashcode will be used when filtering, Therefore, after the Consumer pulls the message, it also needs to compare the original tag string of the message. If the original tag string is not same with the expected, the message will be ignored.
* SQL92 filtering: This filter behavior is almost same with the above Tag filtering method. The only difference is on the way how Store works. The rocketmq-filter module is responsible for the construction and execution of the real SQL expression. Executing an SQL expression every time a filter is executed affects efficiency, so RocketMQ uses BloomFilter to avoid doing it every time. The expression context of SQL92 is a property of the message.
* SQL92 filtering: This filter behavior is almost same with the above `Tag filtering` method. The only difference is on the way how Store works. The rocketmq-filter module is responsible for the construction and execution of the real SQL expression. Executing an SQL expression every time a filter is executed affects efficiency, so RocketMQ uses BloomFilter to avoid doing it every time. The expression context of SQL92 is a property of the message.
RocketMQ message queue cluster mainly includes four roles: NameServer, Broker (Master/Slave), Producer and Consumer. The basic communication process is as follows:
(1) After Broker start-up, it needs to complete one operation: register itself to NameServer, and then report Topic routing information to NameServer at regular intervals of 30 seconds.
(2) When message producer Producer sends a message as a client, it needs to obtain routing information from the local cache TopicPublishInfoTable according to the Topic of the message. If not, it will be retrieved from NameServer and update to local cache, at the same time, Producer will retrieve routing information from NameServer every 30 seconds by default.
(2) When message Producer sends a message as a client, it needs to obtain routing information from the local cache TopicPublishInfoTable according to the Topic of the message. If not, it will be retrieved from NameServer and update to local cache, at the same time, Producer will retrieve routing information from NameServer every 30 seconds by default.
(3) Message Producer chooses a queue to send the message according to the routing information obtained in 2); Broker receives the message and records it in disk as the receiver of the message.
(4) After message Consumer gets the routing information according to 2) and complete the load balancing of the client, then select one or several message queues to pull messages and consume them.
Message storage is the most complicated and important part of RocketMQ. This section will describe the three aspects of RocketMQ: message storage architecture, PageCache and memory mapping, and RocketMQ's two different disk flushing methods.
Message storage is the most complicated and important part of RocketMQ. This section will describe the three aspects of RocketMQ:
* Message storage architecture
* PageCache and memory mapping
* RocketMQ's two different disk flushing methods.
## 1 Message Storage Architecture
...
...
@@ -18,7 +21,7 @@ The message storage architecture diagram consists of 3 files related to message
From the above architecture of the RocketMQ message storage, we can see RocketMQ uses a hybrid storage structure, that is, all the queues in an instance of the broker share a single log file `CommitLog` to store messages. RocketMQ's hybrid storage structure(messages of multiple topics are stored in one CommitLog) uses a separate storage structure for the data and index parts for Producer and Consumer respectively. The Producer sends the message to the Broker, then the Broker persists the message to the CommitLog file synchronously or asynchronously. As long as the message is persisted to the CommitLog on the disk, the message sent by the Producer will not be lost. Because of this, Consumer will definitely have the opportunity to consume this message. When no message can be pulled, the consumer can wait for the next pull. And the server also supports the long polling mode: if a pull request pulls no messages, the Broker can wait for 30 seconds, as long as new message arrives in this interval, it will be returned directly to the consumer. Here, RocketMQ's specific approach is using Broker's background service thread `ReputMessageService` to continuously dispatch requests and asynchronously build ConsumeQueue (Logical Queue) and IndexFile data.
## 2 PageCache and memory map
## 2 PageCache and Memory Map
PageCache is a cache of files by the operating system to speed up the reading and writing of files. In general, the speed of sequential read and write files is almost the same as the speed of read and write memory. The main reason is that the OS uses a portion of the memory as PageCache to optimize the performance of the read and write operations. For data writing, the OS will first write to the Cache, and then the `pdflush` kernel thread asynchronously flush the data in the Cache to the physical disk. For data reading, if it can not hit the page cache when reading a file at a time, the OS will read the file from the physical disk and prefetch the data files of other neighboring blocks sequentially.
@@ -64,15 +64,15 @@ public class ScheduledMessageProducer {
}
```
### 3.Verification
### 3Verification
You should see messages are consumed about 10 seconds later than their storing time.
### 4.Use scenarios for scheduled messages
### 4Use scenarios for scheduled messages
For example, in e-commerce, if an order is submitted, a delay message can be sent, and the status of the order can be checked after 1 hour. If the order is still unpaid, the order can be cancelled and the inventory released.
### 5.Restrictions on the use of scheduled messages
### 5Restrictions on the use of scheduled messages
Nowadays RocketMq does not support any time delay. It needs to set several fixed delay levels, which correspond to level 1 to 18 from 1s to 2h. Message consumption failure will enter the delay message queue. Message sending time is related to the set delay level and the number of retries.
The following questions are frequently asked with regard to the RocketMQ project in general.
## General
## 1 General
### 1. Why did we create rocketmq project instead of selecting other products?
1. Why did we create rocketmq project instead of selecting other products?
Please refer to [Why RocketMQ](http://rocketmq.apache.org/docs/motivation)
Please refer to [Why RocketMQ](http://rocketmq.apache.org/docs/motivation)
### 2. Do I have to install other softeware, such as zookeeper, to use RocketMQ?
2. Do I have to install other softeware, such as zookeeper, to use RocketMQ?
No. RocketMQ can run independently.
No. RocketMQ can run independently.
## Usage
## 2 Usage
### 1. Where does the newly created Consumer ID start consuming messages?
1. If the topic sends a message within three days, then the consumer start consuming messages from the first message saved in the server.
2. If the topic sends a message three days ago, the consumer starts to consume messages from the latest message in the server, in other words, starting from the tail of message queue.
3. If such consumer is rebooted, then it starts to consume messages from the last consumption location.
 1) If the topic sends a message within three days, then the consumer start consuming messages from the first message saved in the server.
 2) If the topic sends a message three days ago, the consumer starts to consume messages from the latest message in the server, in other words, starting from the tail of message queue.
 3) If such consumer is rebooted, then it starts to consume messages from the last consumption location.
### 2. How to reconsume message when consumption fails?
1. Cluster consumption pattern, The consumer business logic code returns Action.ReconsumerLater, NULL, or throws an exception, if a message failed to be consumed, it will retry for up to 16 times, after that, the message would be descarded.
2. Broadcast consumption patternThe broadcaset consumption still ensures that a message is consumered at least once, but no resend option is provided.
 1) Cluster consumption pattern, The consumer business logic code returns Action.ReconsumerLater, NULL, or throws an exception, if a message failed to be consumed, it will retry for up to 16 times, after that, the message would be descarded.
 2) Broadcast consumption patternThe broadcaset consumption still ensures that a message is consumered at least once, but no resend option is provided.
### 3. How to query the failed message if there is a consumption failure?
1. Using topic query by time, you can query messages within a period of time.
2. Using Topic and Message Id to accurately query the message.
3. Using Topic and Message Key accurately query a class of messages with the same Message Key.
 1) Using topic query by time, you can query messages within a period of time.
 2) Using Topic and Message Id to accurately query the message.
 3) Using Topic and Message Key accurately query a class of messages with the same Message Key.
### 4. Are messages delivered exactly once?
...
...
@@ -37,10 +42,11 @@ RocketMQ ensures that all messages are delivered at least once. In most cases, t
### 5. How to add a new broker?
1. Start up a new broker and register it to the same list of name servers.
2. By default, only internal system topics and consumer groups are created automatically. If you would like to have your business topic and consumer groups on the new node, please replicate them from the existing broker. Admin tool and command lines are provided to handle this.
 1) Start up a new broker and register it to the same list of name servers.
 2) By default, only internal system topics and consumer groups are created automatically. If you would like to have your business topic and consumer groups on the new node, please replicate them from the existing broker. Admin tool and command lines are provided to handle this.
## Configuration related
## 3 Configuration related
The following answers are all default values and can be modified by configuration.
### 1. If you start a producer or consumer failed and the error message is producer group or consumer repeat.
...
...
@@ -76,15 +82,15 @@ Solution: Fastjson version has to be upgraded to rocketmq client dependent versi
### 3. What is the impact of a broker crash.
1. Master crashes
 1) Master crashes
Messages can no longer be sent to this broker set, but if you have another broker set available, messages can still be sent given the topic is present. Messages can still be consumed from slaves.
2. Some slave crash
 2) Some slave crash
As long as there is another working slave, there will be no impact on sending messages. There will also be no impact on consuming messages except when the consumer group is set to consume from this slave preferably. By default, comsumer group consumes from master.
3. All slaves crash
 3) All slaves crash
There will be no impact on sending messages to master, but, if the master is SYNC_MASTER, producer will get a SLAVE_NOT_AVAILABLE indicating that the message is not sent to any slaves. There will also be no impact on consuming messages except that if the consumer group is set to consume from slave preferably. By default, comsumer group consumes from master.
...
...
@@ -92,9 +98,12 @@ There will be no impact on sending messages to master, but, if the master is SYN
This happens when you are trying to send messages to a topic whose routing info is not available to the producer.
1. Make sure that the producer can connect to a name server and is capable of fetching routing meta info from it.
2. Make sure that name servers do contain routing meta info of the topic. You may query the routing meta info from name server through topicRoute using admin tools or web console.
3. Make sure that your brokers are sending heartbeats to the same list of name servers your producer is connecting to.
4. Make sure that the topic’s permssion is 6(rw-), or at least 2(-w-).
 1) Make sure that the producer can connect to a name server and is capable of fetching routing meta info from it.
 2) Make sure that name servers do contain routing meta info of the topic. You may query the routing meta info from name server through topicRoute using admin tools or web console.
 3) Make sure that your brokers are sending heartbeats to the same list of name servers your producer is connecting to.
 4) Make sure that the topic’s permssion is 6(rw-), or at least 2(-w-).
If you can’t find this topic, create it on a broker via admin tools command updateTopic or web console.
Message publication refers to that a producer sends messages to a topic; Message subscription means a consumer follows a topic with certain tags and then consumes data from that topic.
## Message Ordering
## 2 Message Ordering
Message ordering refers to that a group of messages can be consumed orderly as they are published. For example, an order generates three messages: order creation, order payment, and order completion. It only makes sense to consume them in their generated order, but orders can be consumed in parallel at the same time. RocketMQ can strictly guarantee these messages are in order.
Orderly message are divided into global orderly message and partitioned orderly message. Global order means that all messages under a certain topic must be in order, partitioned order only requires each group of messages are consumed orderly.
...
...
@@ -13,32 +13,32 @@ Applicable scenario: the performance requirement is not high, and all messages a
For a given Topic, all messages are partitioned according to sharding key. Messages within the same partition are published and consumed in strict FIFO order. Sharding key is the key field to distinguish message's partition, which is a completely different concept from the key of ordinary messages.
Applicable scenario: high performance requirement, with sharding key as the partition field, messages within the same partition are published and consumed according to FIFO principle strictly.
## Message Filter
## 3 Message Filter
Consumers of RocketMQ can filter messages based on tags as well as support for user-defined attribute filtering. Message filter is currently implemented on the Broker side, with the advantage of reducing the network transmission of useless messages for Consumer and the disadvantage of increasing the burden on the Broker and relatively complex implementation.
## Message Reliability
## 4 Message Reliability
RocketMQ supports high reliability of messages in several situations:
1) Broker shutdown normally
2) Broker abnormal crash
3) OS Crash
4) The machine is out of power, but it can be recovered immediately
5) The machine cannot be started up (the CPU, motherboard, memory and other key equipment may be damaged)
6) Disk equipment damaged
1 Broker shutdown normally
2 Broker abnormal crash
3 OS Crash
4 The machine is out of power, but it can be recovered immediately
5 The machine cannot be started up (the CPU, motherboard, memory and other key equipment may be damaged)
6 Disk equipment damaged
In the four cases of 1), 2), 3), and 4) where the hardware resource can be recovered immediately, RocketMQ guarantees that the message will not be lost or a small amount of data will be lost (depending on whether the flush disk type is synchronous or asynchronous)
5) and 6) are single point of failure and cannot be recovered. Once it happens, all messages on the single point will be lost. In both cases, RocketMQ ensures that 99% of the messages are not lost through asynchronous replication, but a very few number of messages may still be lost. Synchronous double write mode can completely avoid single point of failure, which will surely affect the performance and suitable for the occasion of high demand for message reliability, such as money related applications. Note: RocketMQ supports synchronous double writes since version 3.0.
## At Least Once
## 5 At Least Once
At least Once refers to that every message will be delivered at least once. RocketMQ supports this feature because the Consumer pulls the message locally and does not send an ack back to the server until it has consumed it.
## Backtracking Consumption
## 6 Backtracking Consumption
Backtracking consumption refers to that the Consumer has consumed the message successfully, but the business needs to consume again. To support this function, the message still needs to be retained after the Broker sends the message to the Consumer successfully. The re-consumption is normally based on time dimension. For example, after the recovery of the Consumer system failure, the data one hour ago needs to be re-consumed, then the Broker needs to provide a mechanism to reverse the consumption progress according to the time dimension. RocketMQ supports backtracking consumption by time trace, with the time dimension down to milliseconds.
## Transactional Message
## 7 Transactional Message
RocketMQ transactional message refers to the fact that the application of a local transaction and the sending of a Message operation can be defined in a global transaction which means both succeed or fail simultaneously. RocketMQ transactional message provides distributed transaction functionality similar to X/Open XA, enabling the ultimate consistency of distributed transactions through transactional message.
## Scheduled Message
## 8 Scheduled Message
Scheduled message(delay queue) refers to that messages are not consumed immediately after they are sent to the broker, but waiting to be delivered to the real topic after a specific time.
The broker has a configuration item, messageDelayLevel, with default values “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”, 18 levels. Users can configure a custom messageDelayLevel. Note that messageDelayLevel is a broker's property rather than a topic's. When sending a message, just set the delayLevel level: msg.setDelayLevel(level). There are three types of levels:
...
...
@@ -50,21 +50,21 @@ Scheduled messages are temporarily saved in a topic named SCHEDULE_TOPIC_XXXX, a
Note that Scheduled messages are counted both the first time they are written and the time they are scheduled to be written to the real topic, so both the send number and the TPS are increased.
## Message Retry
## 9 Message Retry
When the Consumer fails to consume the message, a retry mechanism is needed to make the message to be consumed again. Consumer's consume failure can usually be classified as follows:
- due to the reasons of the message itself, such as deserialization failure, the message data itself cannot be processed (for example, the phone number of the current message is cancelled and cannot be charged), etc. This kind of error usually requires skipping this message and consuming others since immediately retry would be failed 99%, so it is better to provide a timed retry mechanism that retries after 10 seconds.
- due to the reasons of dependent downstream application services are not available, such as db connection is not usable, perimeter network is not unreachable, etc. When this kind of error is encountered, consuming other messages will also result in an error even if the current failed message is skipped. In this case, it is recommended to sleep for 30s before consuming the next message, which will reduce the pressure on the broker to retry the message.
RocketMQ will set up a retry queue named “%RETRY%+consumerGroup” for each consumer group(Note that the retry queue for this topic is for consumer groups, not for each topic) to temporarily save messages cannot be consumed by customer due to all kinds of reasons. Considering that it takes some time for the exception to recover, multiple retry levels are set for the retry queue, and each retry level has a corresponding re-deliver delay. The more retries, the greater the deliver delay. RocketMQ first save retry messages to the delay queue which topic is named “SCHEDULE_TOPIC_XXXX”, then background schedule task will save the messages to “%RETRY%+consumerGroup” retry queue according to their corresponding delay.
## Message Resend
## 10 Message Resend
When a producer sends a message, the synchronous message will be resent if fails, the asynchronous message will retry and oneway message is without any guarantee. Message resend ensures that messages are sent successfully and without lost as much as possible, but it can lead to message duplication, which is an unavoidable problem in RocketMQ. Under normal circumstances, message duplication will not occur, but when there is a large number of messages and network jitter, message duplication will be a high-probability event. In addition, producer initiative messages resend and the consumer load changes will also result in duplicate messages. The message retry policy can be set as follows:
- retryTimesWhenSendFailed: Synchronous message retry times when send failed, default value is 2, so the producer will try to send retryTimesWhenSendFailed + 1 times at most. To ensure that the message is not lost, producer will try sending the message to another broker instead of selecting the broker that failed last time. An exception will be thrown if it reaches the retry limit, and the client should guarantee that the message will not be lost. Messages will resend when RemotingException, MQClientException, and partial MQBrokerException occur.
- retryTimesWhenSendAsyncFailed: Asynchronous message retry times when send failed, asynchronous retry sends message to the same broker instead of selecting another one and does not guarantee that the message wont lost.
- retryAnotherBrokerWhenNotStoreOK: Message flush disk (master or slave) timeout or slave not available (return status is not SEND_OK), whether to try to send to another broker, default value is false. Very important messages can set to true.
## Flow Control
## 11 Flow Control
Producer flow control, because broker processing capacity reaches a bottleneck; Consumer flow control, because the consumption capacity reaches a bottleneck.
Producer flow control:
...
...
@@ -80,7 +80,7 @@ Consumer flow control:
The result of consumer flow control is to reduce the pull frequency.
## Dead Letter Queue
## 12 Dead Letter Queue
Dead letter queue is used to deal messages that cannot be consumed normally. When a message is consumed failed at first time, the message queue will automatically resend the message. If the consumption still fails after the maximum number retry, it indicates that the consumer cannot properly consume the message under normal circumstances. At this time, the message queue will not immediately abandon the message, but send it to the special queue corresponding to the consumer.
RocketMQ defines the messages that could not be consumed under normal circumstances as Dead-Letter Messages, and the special queue in which the Dead-Letter Messages are saved as Dead-Letter Queues. In RocketMQ, the consumer instance can consume again by resending messages in the Dead-Letter Queue using console.
RocketMQ cannot avoid Exactly-Once, so if the business is very sensitive to consumption repetition, it is important to perform deduplication at the business level. Deduplication can be done with a relational database. First, you need to determine the unique key of the message, which can be either msgId or a unique identifier field in the message content, such as the order Id. Determine if a unique key exists in the relational database before consumption. If it does not exist, insert it and consume it, otherwise skip it. (The actual process should consider the atomic problem, determine whether there is an attempt to insert, if the primary key conflicts, the insertion fails, skip directly)
### 2.2 Slow message processing
### 2 Slow message processing
#### 2.2.1 Increase consumption parallelism
#### 2.1 Increase consumption parallelism
Most messages consumption behaviors are IO-intensive, That is, it may be to operate the database, or call RPC. The consumption speed of such consumption behavior lies in the throughput of the back-end database or the external system. By increasing the consumption parallelism, the total consumption throughput can be increased, but the degree of parallelism is increased to a certain extent. Instead it will fall.Therefore, the application must set a reasonable degree of parallelism. There are several ways to modify the degree of parallelism of consumption as follows:
* Under the same ConsumerGroup, increase the degree of parallelism by increasing the number of Consumer instances (note that the Consumer instance that exceeds the number of subscription queues is invalid). Can be done by adding machines, or by starting multiple processes on an existing machine.
* Improve the consumption parallel thread of a single Consumer by modifying the parameters consumeThreadMin and consumeThreadMax.
#### 2.2.2 Batch mode consumption
#### 2.2 Batch mode consumption
Some business processes can increase consumption throughput to a large extent if they support batch mode consumption. For example, order deduction application, it takes 1s to process one order at a time, and it takes only 2s to process 10 orders at a time. In this way, the throughput of consumption can be greatly improved. By setting the consumer's consumeMessageBatchMaxSize to return a parameter, the default is 1, that is, only one message is consumed at a time, for example, set to N, then the number of messages consumed each time is less than or equal to N.
#### 2.2.3 Skip non-critical messages
#### 2.3 Skip non-critical messages
When a message is accumulated, if the consumption speed cannot keep up with the transmission speed, if the service does not require high data, you can choose to discard the unimportant message. For example, when the number of messages in a queue is more than 100,000 , try to discard some or all of the messages, so that you can quickly catch up with the speed of sending messages. The sample code is as follows:
...
...
@@ -40,7 +40,7 @@ public ConsumeConcurrentlyStatus consumeMessage(
}
```
#### 2.2.4 Optimize each message consumption process
#### 2.4 Optimize each message consumption process
For example, the consumption process of a message is as follows:
...
...
@@ -52,7 +52,7 @@ For example, the consumption process of a message is as follows:
There are 4 interactions with the DB in the consumption process of this message. If it is calculated by 5ms each time, it takes a total of 20ms. If the business calculation takes 5ms, then the total time is 25ms, So if you can optimize 4 DB interactions to 2 times, the total time can be optimized to 15ms, which means the overall performance is increased by 40%. Therefore, if the application is sensitive to delay, the DB can be deployed on the SSD hard disk. Compared with the SCSI disk, the former RT will be much smaller.
### 2.3 Print Log
### 3 Print Log
If the amount of messages is small, it is recommended to print the message in the consumption entry method, consume time, etc., to facilitate subsequent troubleshooting.
...
...
@@ -68,33 +68,33 @@ public ConsumeConcurrentlyStatus consumeMessage(
If you can print the time spent on each message, it will be more convenient when troubleshooting online problems such as slow consumption.
### 2.4 Other consumption suggestions
### 4 Other consumption suggestions
#### 2.4.1、Consumer Group and Subscriptions
#### 4.1、Consumer Group and Subscriptions
The first thing you should be aware of is that different Consumer Group can consume the same topic independently, and each of them will have their own consuming offsets. Please make sure each Consumer within the same Group to subscribe the same topics.
#### 2.4.2、Orderly
#### 4.2、Orderly
The Consumer will lock each MessageQueue to make sure it is consumed one by one in order. This will cause a performance loss, but it is useful when you care about the order of the messages. It is not recommended to throw exceptions, you can return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT instead.
#### 2.4.3、Concurrently
#### 4.3、Concurrently
As the name tells, the Consumer will consume the messages concurrently. It is recommended to use this for good performance. It is not recommended to throw exceptions, you can return ConsumeConcurrentlyStatus.RECONSUME_LATER instead.
#### 2.4.4、Consume Status
#### 4.4、Consume Status
For MessageListenerConcurrently, you can return RECONSUME_LATER to tell the consumer that you can not consume it right now and want to reconsume it later. Then you can continue to consume other messages. For MessageListenerOrderly, because you care about the order, you can not jump over the message, but you can return SUSPEND_CURRENT_QUEUE_A_MOMENT to tell the consumer to wait for a moment.
#### 2.4.5、Blocking
#### 4.5、Blocking
It is not recommend to block the Listener, because it will block the thread pool, and eventually may stop the consuming process.
#### 2.4.6、Thread Number
#### 4.6、Thread Number
The consumer use a ThreadPoolExecutor to process consuming internally, so you can change it by setting setConsumeThreadMin or setConsumeThreadMax.
#### 2.4.7、ConsumeFromWhere
#### 4.7、ConsumeFromWhere
When a new Consumer Group is established, it will need to decide whether it needs to consume the historical messages which had already existed in the Broker. CONSUME_FROM_LAST_OFFSET will ignore the historical messages, and consume anything produced after that. CONSUME_FROM_FIRST_OFFSET will consume every message existed in the Broker. You can also use CONSUME_FROM_TIMESTAMP to consume messages produced after the specified timestamp.
One application instance should use one topic as much as possible and the subtype of messages can be marked by tags. Tag provides extra flexibility to users. In the consume subscribing process, the messages filtering can only be handled by using tags when the tags are specified in the message sending process: `message.setTags("TagA")`
One application instance should use one topic as much as possible and the subtype of messages can be marked by tags. Tag provides extra flexibility to users. In the consume subscribing process, the messages filtering can only be handled by using tags when the tags are specified in the message sending process: `message.setTags("TagA")`.
###### 1.2 The Use of Keys
A business key can be set in one message and it will be easier to look up the message on a broker server to diagnose issues during development. Each message will be created index(hash index) by server, instance can query the content of this message by topic and key and who consumes the message.Because of the hash index, make sure that the key should be unique in order to avoid potential hash index conflict.
``` java
...
...
@@ -41,4 +41,4 @@ The message sending is usually a process like below:
* Sever handles request
* Sever returns response to client
The total costing time of sending one message is the sum of costing time of three steps above. Some situations demand that total costing time must be in a quite low level, however, do not take reliable performance into consideration, such as log collection. This kind of application could be called in one-way mode, which means client sends request but not wait for response. In this kind of mode, the cost of sending request is only a call of system operation which means one operation writing data to client socket buffer. Generally, the time cost of this process will be controlled n microseconds level.
\ No newline at end of file
The total costing time of sending one message is the sum of costing time of three steps above. Some situations demand that total costing time must be in a quite low level, however, do not take reliable performance into consideration, such as log collection. This kind of application could be called in one-way mode, which means client sends request but not wait for response. In this kind of mode, the cost of sending request is only a call of system operation which means one operation writing data to client socket buffer. Generally, the time cost of this process will be controlled n microseconds level.
@@ -13,7 +9,7 @@ This document focuses on how to quickly deploy and use RocketMQ clusters that su
| whether the message was sent successfully | Whether the message was successfully consumed | The Key of the message |
| Time spent sending | Time spent consuming | Tag of the message |
## 2. Support for Message Trace Cluster Deployment
## 2 Support for Message Trace Cluster Deployment
### 2.1 Broker Configuration Fille
...
...
@@ -46,7 +42,7 @@ For scenarios with large amount of trace message data , one of the Broker nodes
### 2.4 Start the Broker that Starts the MessageTrace
`nohup sh mqbroker -c ../conf/2m-noslave/broker-a.properties &`
## 3. Save the Topic Definition of Message Trace
## 3 Save the Topic Definition of Message Trace
RocketMQ's message trace feature supports two ways to store trace data:
### 3.1 System-level TraceTopic
...
...
@@ -55,7 +51,7 @@ By default, message track data is stored in the system-level TraceTopic(names:
### 3.2 Custom TraceTopic
If the user is not prepared to store the message track data in the system-level default TraceTopic, you can also define and create a user-level Topic to save the track (that is, to create a regular Topic to save the message track data)。The following section introduces how the Client interface supports the user-defined TraceTopic.
## 4. Client Practices that Support Message Trace
## 4 Client Practices that Support Message Trace
In order to reduce as much as possible the transformation work of RocketMQ message trace feature used in the user service system, the author added a switch parameter (**enableMsgTrace**) to the original interface in the design to realize whether the message trace is opened or not.
### 4.1 Opening the Message Trace when Sending the Message
> Problem: Sometimes after deploying the RocketMQ cluster, when you try to execute some commands of "mqadmin", the following exception will appear:
>
...
...
@@ -10,7 +10,7 @@
Solution: Execute `export NAMESRV_ADDR=ip:9876` (ip refers to the address of NameServer deployed in the cluster) on the VM that deploys the RocketMQ cluster.Then you will execute commands of "mqadmin" successfully.
## 2. The inconsistent version of RocketMQ between the producer and consumer leads to the problem that message can't be consumed normally.
## 2 The inconsistent version of RocketMQ between the producer and consumer leads to the problem that message can't be consumed normally.
> Problem: The same producer sends a message, consumer A can consume, but consumer B can't consume, and the RocketMQ Console appears:
>
...
...
@@ -20,7 +20,7 @@ Solution: Execute `export NAMESRV_ADDR=ip:9876` (ip refers to the address of Na
Solution: The jar package of RocketMQ, such as rocketmq-client, should be the same version on the consumer and producer.
## 3. When adding a new topic consumer group, historical messages can't be consumed.
## 3 When adding a new topic consumer group, historical messages can't be consumed.
> Problem: When a new consumer group of the same topic is started, the consumed message is the current offset message, and the historical message is not obtained.
In some cases, the Consumer needs to reset the consume position to 1-2 days ago. At this time, on the Master Broker with limited memory, the CommitLog will carry a relatively heavy IO pressure, affecting the reading and writing of other messages on that Broker. You can enable `slaveReadEnable=true`. When Master Broker finds that the difference between the Consumer's consume position and the latest value of CommitLog exceeds the percentage of machine's memory (`accessMessageInMemoryMaxRatio=40%`), it will recommend Consumer to read from Slave Broker and relieve Master Broker's IO.
## 5. Performance
## 5 Performance
Asynchronous flush disk is recommended to use spin lock.
...
...
@@ -64,7 +64,7 @@ Asynchronous flush disk is recommended to open `TransientStorePoolEnable` and cl
Synchronous flush disk is recommended to increase the `sendMessageThreadPoolNums` appropriately. The specific configuration needs to be tested.
## 6. The meaning and difference between msgId and offsetMsgId in RocketMQ
## 6 The meaning and difference between msgId and offsetMsgId in RocketMQ
After sending message with RocketMQ, you will usually see the following log:
* 1. Finish dispatching the messages fall behind, then to start other services.
* 2. DLedger committedPos may be missing, so here just require dispatchBehindBytes <= 0
*/
while(true){
if(dispatchBehindBytes()<=0){
break;
}
Thread.sleep(1000);
log.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}",this.reputMessageService.getReputFromOffset(),this.getMaxPhyOffset(),this.dispatchBehindBytes());
log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",