diff --git a/.github/ISSUE_TEMPLATE/issue_template.md b/.github/ISSUE_TEMPLATE/issue_template.md index a77fb61d06977796a3f4d465e3f1867c9338a4f4..1c8fa94aac20a1fb077162d331b5434542c8114d 100644 --- a/.github/ISSUE_TEMPLATE/issue_template.md +++ b/.github/ISSUE_TEMPLATE/issue_template.md @@ -4,7 +4,7 @@ about: Describe this issue template's purpose here. --- -The issue tracker is **ONLY** used for bug report and feature request. Keep in mind, please check whether there is an existing same report before your raise a new one. +The issue tracker is **ONLY** used for bug report(feature request need to follow [RIP process](https://github.com/apache/rocketmq/wiki/RocketMQ-Improvement-Proposal)). Keep in mind, please check whether there is an existing same report before your raise a new one. Alternately (especially if your communication is not a bug report), you can send mail to our [mailing lists](http://rocketmq.apache.org/about/contact/). We welcome any friendly suggestions, bug fixes, collaboration and other improvements. diff --git a/.travis.yml b/.travis.yml index 8f65c72f7d0717ec597b59cf48cc2aa45b458029..f3bcb1db58f25ce911dfbab66d38542ea6999f4b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,3 +1,5 @@ +dist: trusty + notifications: email: recipients: @@ -7,6 +9,9 @@ notifications: language: java +jdk: + - oraclejdk8 + matrix: include: # On OSX, run with default JDK only. @@ -18,8 +23,8 @@ matrix: before_install: - echo 'MAVEN_OPTS="$MAVEN_OPTS -Xmx1024m -XX:MaxPermSize=512m -XX:+BytecodeVerificationLocal"' >> ~/.mavenrc - cat ~/.mavenrc - - if [ "$TRAVIS_OS_NAME" == "osx" ]; then export JAVA_HOME=$(/usr/libexec/java_home); fi - - if [ "$TRAVIS_OS_NAME" == "linux" ]; then jdk_switcher use "$CUSTOM_JDK"; fi + - if [[ "$TRAVIS_OS_NAME" == "osx" ]]; then export JAVA_HOME=$(/usr/libexec/java_home); fi + - if [[ "$TRAVIS_OS_NAME" == "linux" ]]; then jdk_switcher use "$CUSTOM_JDK"; fi script: - travis_retry mvn -B clean apache-rat:check diff --git a/NOTICE b/NOTICE index 703c28b27c5cc6c509526a587a56d806b8afa79e..85e2dc3f51128f29710daaa4bf49138a96df37a1 100644 --- a/NOTICE +++ b/NOTICE @@ -1,5 +1,5 @@ Apache RocketMQ -Copyright 2016-2018 The Apache Software Foundation +Copyright 2016-2019 The Apache Software Foundation This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). \ No newline at end of file +The Apache Software Foundation (http://www.apache.org/). diff --git a/README.md b/README.md index f964f46fc99fcc2cb56e8121441e14d95c0edb99..98d97a00ffd2badb69c1f7130144c2e942fc08e2 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,7 @@ It offers a variety of features: * Home: * Docs: * Issues: +* Rips: * Ask: * Slack: @@ -43,7 +44,7 @@ It offers a variety of features: ---------- ## Contributing -We always welcome new contributions, whether for trivial cleanups, big new features or other material rewards, more details see [here](http://rocketmq.apache.org/docs/how-to-contribute/). +We always welcome new contributions, whether for trivial cleanups, [big new features](https://github.com/apache/rocketmq/wiki/RocketMQ-Improvement-Proposal) or other material rewards, more details see [here](http://rocketmq.apache.org/docs/how-to-contribute/). ---------- ## License diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java index 9873aca89dc637c10bab7c03cf254cae6fa103c3..63b512df7d5374857624d81892acdf2c61252814 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java @@ -86,6 +86,7 @@ public class TransactionMQProducer extends DefaultMQProducer { throw new MQClientException("TransactionListener is null", null); } + msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic())); return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg); } diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java index ca2ce88ec7c523aa26ae0459643913435abc6e25..a6b801edab59b1c5774aa9f8729d98af13ec53a8 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java @@ -46,7 +46,7 @@ public class MessageBatch extends Message implements Iterable { Message first = null; for (Message message : messages) { if (message.getDelayTimeLevel() > 0) { - throw new UnsupportedOperationException("TimeDelayLevel in not supported for batching"); + throw new UnsupportedOperationException("TimeDelayLevel is not supported for batching"); } if (message.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { throw new UnsupportedOperationException("Retry Group is not supported for batching"); diff --git a/distribution/NOTICE-BIN b/distribution/NOTICE-BIN index c91dc225fb2aacaefb00362b36a6413fbb0a4d17..c2f511fafe6decee5b827d47bbb52ac512fb8474 100644 --- a/distribution/NOTICE-BIN +++ b/distribution/NOTICE-BIN @@ -1,5 +1,5 @@ Apache RocketMQ -Copyright 2016-2017 The Apache Software Foundation +Copyright 2016-2019 The Apache Software Foundation This product includes software developed at The Apache Software Foundation (http://www.apache.org/). @@ -33,4 +33,4 @@ components that this product depends on. ------ This product has a bundle commons-lang, which includes software from the Spring Framework, -under the Apache License 2.0 (see: StringUtils.containsWhitespace()) \ No newline at end of file +under the Apache License 2.0 (see: StringUtils.containsWhitespace()) diff --git a/distribution/bin/tools.cmd b/distribution/bin/tools.cmd index 28ce765326724a21f9c9019d0607a14df14b8a47..0e25c39b1520334b1799159ee1330ed632ca805d 100644 --- a/distribution/bin/tools.cmd +++ b/distribution/bin/tools.cmd @@ -29,7 +29,7 @@ rem ============================================================================ rem JVM Configuration rem =========================================================================================== set "JAVA_OPT=%JAVA_OPT% -server -Xms1g -Xmx1g -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=128m" -set "JAVA_OPT=%JAVA_OPT% -Djava.ext.dirs="%BASE_DIR%\lib";"%JAVA_HOME%\jre\lib\ext"" +set "JAVA_OPT=%JAVA_OPT% -Djava.ext.dirs="%BASE_DIR%\lib";"%JAVA_HOME%\jre\lib\ext";"%JAVA_HOME%\lib\ext"" set "JAVA_OPT=%JAVA_OPT% -cp "%CLASSPATH%"" -"%JAVA%" %JAVA_OPT% %* \ No newline at end of file +"%JAVA%" %JAVA_OPT% %* diff --git a/distribution/bin/tools.sh b/distribution/bin/tools.sh index 66862cac5600efd890eb42021f73bbe0cb8023bf..645a5959bbc8b22eb9717c720897182381bb5a22 100644 --- a/distribution/bin/tools.sh +++ b/distribution/bin/tools.sh @@ -37,7 +37,7 @@ export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH} # JVM Configuration #=========================================================================================== JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=128m" -JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext" +JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext:${JAVA_HOME}/lib/ext" JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}" $JAVA ${JAVA_OPT} $@ diff --git a/docs/cn/best_practice.md b/docs/cn/best_practice.md index 731a24be267c8f3fcf38d33349c6306d900864c5..3b64ed66ec99c4c4a60645c2a199be21bf8ada4c 100755 --- a/docs/cn/best_practice.md +++ b/docs/cn/best_practice.md @@ -190,7 +190,7 @@ msgId一定是全局唯一标识符,但是实际使用中,可能会存在相 | storePathConsumerQueue | $HOME/store/consumequeue/ | 存储 consume queue 的路径 | | mapedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | commit log 的映射文件大小 |​ | deleteWhen | 04 | 在每天的什么时间删除已经超过文件保留时间的 commit log |​ -| fileReserverdTime | 72 | 以小时计算的文件保留时间 |​ +| fileReservedTime | 72 | 以小时计算的文件保留时间 |​ | brokerRole | ASYNC_MASTER | SYNC_MASTER/ASYNC_MASTER/SLAVE |​ | flushDiskType | ASYNC_FLUSH | SYNC_FLUSH/ASYNC_FLUSH SYNC_FLUSH 模式下的 broker 保证在收到确认生产者之前将消息刷盘。ASYNC_FLUSH 模式下的 broker 则利用刷盘一组消息的模式,可以取得更好的性能。 |​ diff --git a/docs/en/Concept.md b/docs/en/Concept.md index 76788b92196e5ec6fee29c97a05c838513e79b7b..75f084e1723465f7c1fec96642a648698673c2c3 100644 --- a/docs/en/Concept.md +++ b/docs/en/Concept.md @@ -22,7 +22,7 @@ A type of Consumer, Under this high real-time performance mode, it will push the ## 9 Producer Group A collection of the same type of Producer, which sends the same type of messages with consistent logic. If a transaction message is sent and the original producer crashes after sending, the broker server will contact other producers in the same producer group to commit or rollback the transactional message. ## 10 Consumer Group -A collection of the same type of Consumer, which sends the same type of messages with consistent logic. The consumer group makes load-balance and fault-tolerance super easy in terms of message consuming. +A collection of the same type of Consumer, which consume the same type of messages with consistent logic. The consumer group makes load-balance and fault-tolerance super easy in terms of message consuming. Warning: consumer instances of one consumer group must have exactly the same topic subscription(s). RocketMQ supports two types of consumption mode:Clustering and Broadcasting. diff --git a/docs/en/Example_OpenMessaging.md b/docs/en/Example_OpenMessaging.md new file mode 100644 index 0000000000000000000000000000000000000000..026e76e902d2f7abdeab4e95bb078fc4f15f6654 --- /dev/null +++ b/docs/en/Example_OpenMessaging.md @@ -0,0 +1,118 @@ +# OpenMessaging Example +[OpenMessaging](https://openmessaging.github.io/), which includes the establishment of industry guidelines and messaging, streaming specifications to provide a common framework for finance, ecommerce, IoT and big-data area. The design principles are the cloud-oriented, simplicity, flexibility, and language independent in distributed heterogeneous environments. Conformance to these specifications will make it possible to develop a heterogeneous messaging applications across all major platforms and operating systems. + +RocketMQ provides a partial implementation of OpenMessaging 0.1.0-alpha, the following examples demonstrate how to access RocketMQ based on OpenMessaging. + +## OMSProducer +The following example shows how to send message to RocketMQ broker in synchronous, asynchronous, or one-way transmissions. + +``` +public class OMSProducer { + public static void main(String[] args) { + final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + + final Producer producer = messagingAccessPoint.createProducer(); + + messagingAccessPoint.startup(); + System.out.printf("MessagingAccessPoint startup OK%n"); + + producer.startup(); + System.out.printf("Producer startup OK%n"); + + { + Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))); + SendResult sendResult = producer.send(message); + System.out.printf("Send sync message OK, msgId: %s%n", sendResult.messageId()); + } + + { + final Promise result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); + result.addListener(new PromiseListener() { + @Override + public void operationCompleted(Promise promise) { + System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId()); + } + + @Override + public void operationFailed(Promise promise) { + System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage()); + } + }); + } + + { + producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); + System.out.printf("Send oneway message OK%n"); + } + + producer.shutdown(); + messagingAccessPoint.shutdown(); + } +} +``` +## OMSPullConsumer +Use OMS PullConsumer to poll messages from a specified queue. + +``` +public class OMSPullConsumer { + public static void main(String[] args) { + final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + + final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC", + OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER")); + + messagingAccessPoint.startup(); + System.out.printf("MessagingAccessPoint startup OK%n"); + + consumer.startup(); + System.out.printf("Consumer startup OK%n"); + + Message message = consumer.poll(); + if (message != null) { + String msgId = message.headers().getString(MessageHeader.MESSAGE_ID); + System.out.printf("Received one message: %s%n", msgId); + consumer.ack(msgId); + } + + consumer.shutdown(); + messagingAccessPoint.shutdown(); + } +} + +``` +## OMSPushConsumer +Attaches OMS PushConsumer to a specified queue and consumes messages by MessageListener + +``` +public class OMSPushConsumer { + public static void main(String[] args) { + final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + + final PushConsumer consumer = messagingAccessPoint. + createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER")); + + messagingAccessPoint.startup(); + System.out.printf("MessagingAccessPoint startup OK%n"); + + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + consumer.shutdown(); + messagingAccessPoint.shutdown(); + } + })); + + consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() { + @Override + public void onMessage(final Message message, final ReceivedMessageContext context) { + System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID)); + context.ack(); + } + }); + + } +} +``` diff --git a/docs/en/architecture.md b/docs/en/architecture.md index 0e362663e8805e4ef4ca55edfd2b3db5c8bed031..18e1cc0023bf804b8bbd34013ec4b4cdfea26d30 100644 --- a/docs/en/architecture.md +++ b/docs/en/architecture.md @@ -10,7 +10,7 @@ The RocketMQ architecture is divided into four parts, as shown in the figure abo - Consumer:The role of message consumption supports distributed cluster deployment. Support push, pull two modes to consume messages. It also supports cluster mode and broadcast mode consumption, and it provides a real-time message subscription mechanism to meet the needs of most users. -- NameServer:NameServer is a very simple Topic routing registry with a role similar to ZooKeeper in Dubbo, which supports dynamic registration and discovery of Broker. It mainly includes two functions: Broker management, NameServer accepts the registration information of the Broker cluster and saves it as the basic data of the routing information. Then provide a heartbeat detection mechanism to check whether the broker is still alive; routing information management, each NameServer will save the entire routing information about the Broker cluster and the queue information for the client query. Then the Producer and Conumser can know the routing information of the entire Broker cluster through the NameServer, so as to deliver and consume the message. The NameServer is usually deployed in a cluster mode, and each instance does not communicate with each other. Broker registers its own routing information with each NameServer, so each NameServer instance stores a complete routing information. When a NameServer is offline for some reason, the Broker can still synchronize its routing information with other NameServers. The Producer and Consumer can still dynamically sense the information of the Broker's routing. +- NameServer:NameServer is a very simple Topic routing registry with a role similar to ZooKeeper in Dubbo, which supports dynamic registration and discovery of Broker. It mainly includes two functions: Broker management, NameServer accepts the registration information of the Broker cluster and saves it as the basic data of the routing information. Then provide a heartbeat detection mechanism to check whether the broker is still alive; routing information management, each NameServer will save the entire routing information about the Broker cluster and the queue information for the client query. Then the Producer and Consumer can know the routing information of the entire Broker cluster through the NameServer, so as to deliver and consume the message. The NameServer is usually deployed in a cluster mode, and each instance does not communicate with each other. Broker registers its own routing information with each NameServer, so each NameServer instance stores a complete routing information. When a NameServer is offline for some reason, the Broker can still synchronize its routing information with other NameServers. The Producer and Consumer can still dynamically sense the information of the Broker's routing. - BrokerServer:Broker is responsible for the storage, delivery and query of messages and high availability guarantees. In order to achieve these functions, Broker includes the following important sub-modules. 1. Remoting Module:The entire broker entity handles requests from the clients side.