提交 18601ec8 编写于 作者: H huzongtang

Merge remote-tracking branch 'origin/develop' into enchanced_acl

...@@ -4,7 +4,7 @@ about: Describe this issue template's purpose here. ...@@ -4,7 +4,7 @@ about: Describe this issue template's purpose here.
--- ---
The issue tracker is **ONLY** used for bug report and feature request. Keep in mind, please check whether there is an existing same report before your raise a new one. The issue tracker is **ONLY** used for bug report(feature request need to follow [RIP process](https://github.com/apache/rocketmq/wiki/RocketMQ-Improvement-Proposal)). Keep in mind, please check whether there is an existing same report before your raise a new one.
Alternately (especially if your communication is not a bug report), you can send mail to our [mailing lists](http://rocketmq.apache.org/about/contact/). We welcome any friendly suggestions, bug fixes, collaboration and other improvements. Alternately (especially if your communication is not a bug report), you can send mail to our [mailing lists](http://rocketmq.apache.org/about/contact/). We welcome any friendly suggestions, bug fixes, collaboration and other improvements.
......
dist: trusty
notifications: notifications:
email: email:
recipients: recipients:
...@@ -7,6 +9,9 @@ notifications: ...@@ -7,6 +9,9 @@ notifications:
language: java language: java
jdk:
- oraclejdk8
matrix: matrix:
include: include:
# On OSX, run with default JDK only. # On OSX, run with default JDK only.
...@@ -18,8 +23,8 @@ matrix: ...@@ -18,8 +23,8 @@ matrix:
before_install: before_install:
- echo 'MAVEN_OPTS="$MAVEN_OPTS -Xmx1024m -XX:MaxPermSize=512m -XX:+BytecodeVerificationLocal"' >> ~/.mavenrc - echo 'MAVEN_OPTS="$MAVEN_OPTS -Xmx1024m -XX:MaxPermSize=512m -XX:+BytecodeVerificationLocal"' >> ~/.mavenrc
- cat ~/.mavenrc - cat ~/.mavenrc
- if [ "$TRAVIS_OS_NAME" == "osx" ]; then export JAVA_HOME=$(/usr/libexec/java_home); fi - if [[ "$TRAVIS_OS_NAME" == "osx" ]]; then export JAVA_HOME=$(/usr/libexec/java_home); fi
- if [ "$TRAVIS_OS_NAME" == "linux" ]; then jdk_switcher use "$CUSTOM_JDK"; fi - if [[ "$TRAVIS_OS_NAME" == "linux" ]]; then jdk_switcher use "$CUSTOM_JDK"; fi
script: script:
- travis_retry mvn -B clean apache-rat:check - travis_retry mvn -B clean apache-rat:check
......
Apache RocketMQ Apache RocketMQ
Copyright 2016-2018 The Apache Software Foundation Copyright 2016-2019 The Apache Software Foundation
This product includes software developed at This product includes software developed at
The Apache Software Foundation (http://www.apache.org/). The Apache Software Foundation (http://www.apache.org/).
\ No newline at end of file
...@@ -32,6 +32,7 @@ It offers a variety of features: ...@@ -32,6 +32,7 @@ It offers a variety of features:
* Home: <https://rocketmq.apache.org> * Home: <https://rocketmq.apache.org>
* Docs: <https://rocketmq.apache.org/docs/quick-start/> * Docs: <https://rocketmq.apache.org/docs/quick-start/>
* Issues: <https://github.com/apache/rocketmq/issues> * Issues: <https://github.com/apache/rocketmq/issues>
* Rips: <https://github.com/apache/rocketmq/wiki/RocketMQ-Improvement-Proposal>
* Ask: <https://stackoverflow.com/questions/tagged/rocketmq> * Ask: <https://stackoverflow.com/questions/tagged/rocketmq>
* Slack: <https://rocketmq-invite-automation.herokuapp.com/> * Slack: <https://rocketmq-invite-automation.herokuapp.com/>
...@@ -43,7 +44,7 @@ It offers a variety of features: ...@@ -43,7 +44,7 @@ It offers a variety of features:
---------- ----------
## Contributing ## Contributing
We always welcome new contributions, whether for trivial cleanups, big new features or other material rewards, more details see [here](http://rocketmq.apache.org/docs/how-to-contribute/). We always welcome new contributions, whether for trivial cleanups, [big new features](https://github.com/apache/rocketmq/wiki/RocketMQ-Improvement-Proposal) or other material rewards, more details see [here](http://rocketmq.apache.org/docs/how-to-contribute/).
---------- ----------
## License ## License
......
...@@ -86,6 +86,7 @@ public class TransactionMQProducer extends DefaultMQProducer { ...@@ -86,6 +86,7 @@ public class TransactionMQProducer extends DefaultMQProducer {
throw new MQClientException("TransactionListener is null", null); throw new MQClientException("TransactionListener is null", null);
} }
msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg); return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
} }
......
...@@ -46,7 +46,7 @@ public class MessageBatch extends Message implements Iterable<Message> { ...@@ -46,7 +46,7 @@ public class MessageBatch extends Message implements Iterable<Message> {
Message first = null; Message first = null;
for (Message message : messages) { for (Message message : messages) {
if (message.getDelayTimeLevel() > 0) { if (message.getDelayTimeLevel() > 0) {
throw new UnsupportedOperationException("TimeDelayLevel in not supported for batching"); throw new UnsupportedOperationException("TimeDelayLevel is not supported for batching");
} }
if (message.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { if (message.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
throw new UnsupportedOperationException("Retry Group is not supported for batching"); throw new UnsupportedOperationException("Retry Group is not supported for batching");
......
Apache RocketMQ Apache RocketMQ
Copyright 2016-2017 The Apache Software Foundation Copyright 2016-2019 The Apache Software Foundation
This product includes software developed at This product includes software developed at
The Apache Software Foundation (http://www.apache.org/). The Apache Software Foundation (http://www.apache.org/).
...@@ -33,4 +33,4 @@ components that this product depends on. ...@@ -33,4 +33,4 @@ components that this product depends on.
------ ------
This product has a bundle commons-lang, which includes software from the Spring Framework, This product has a bundle commons-lang, which includes software from the Spring Framework,
under the Apache License 2.0 (see: StringUtils.containsWhitespace()) under the Apache License 2.0 (see: StringUtils.containsWhitespace())
\ No newline at end of file
...@@ -29,7 +29,7 @@ rem ============================================================================ ...@@ -29,7 +29,7 @@ rem ============================================================================
rem JVM Configuration rem JVM Configuration
rem =========================================================================================== rem ===========================================================================================
set "JAVA_OPT=%JAVA_OPT% -server -Xms1g -Xmx1g -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=128m" set "JAVA_OPT=%JAVA_OPT% -server -Xms1g -Xmx1g -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=128m"
set "JAVA_OPT=%JAVA_OPT% -Djava.ext.dirs="%BASE_DIR%\lib";"%JAVA_HOME%\jre\lib\ext"" set "JAVA_OPT=%JAVA_OPT% -Djava.ext.dirs="%BASE_DIR%\lib";"%JAVA_HOME%\jre\lib\ext";"%JAVA_HOME%\lib\ext""
set "JAVA_OPT=%JAVA_OPT% -cp "%CLASSPATH%"" set "JAVA_OPT=%JAVA_OPT% -cp "%CLASSPATH%""
"%JAVA%" %JAVA_OPT% %* "%JAVA%" %JAVA_OPT% %*
\ No newline at end of file
...@@ -37,7 +37,7 @@ export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH} ...@@ -37,7 +37,7 @@ export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}
# JVM Configuration # JVM Configuration
#=========================================================================================== #===========================================================================================
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=128m" JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=128m"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext" JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext:${JAVA_HOME}/lib/ext"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}" JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
$JAVA ${JAVA_OPT} $@ $JAVA ${JAVA_OPT} $@
...@@ -190,7 +190,7 @@ msgId一定是全局唯一标识符,但是实际使用中,可能会存在相 ...@@ -190,7 +190,7 @@ msgId一定是全局唯一标识符,但是实际使用中,可能会存在相
| storePathConsumerQueue | $HOME/store/consumequeue/ | 存储 consume queue 的路径 | | storePathConsumerQueue | $HOME/store/consumequeue/ | 存储 consume queue 的路径 |
| mapedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | commit log 的映射文件大小 |​ | mapedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | commit log 的映射文件大小 |​
| deleteWhen | 04 | 在每天的什么时间删除已经超过文件保留时间的 commit log |​ | deleteWhen | 04 | 在每天的什么时间删除已经超过文件保留时间的 commit log |​
| fileReserverdTime | 72 | 以小时计算的文件保留时间 |​ | fileReservedTime | 72 | 以小时计算的文件保留时间 |​
| brokerRole | ASYNC_MASTER | SYNC_MASTER/ASYNC_MASTER/SLAVE |​ | brokerRole | ASYNC_MASTER | SYNC_MASTER/ASYNC_MASTER/SLAVE |​
| flushDiskType | ASYNC_FLUSH | SYNC_FLUSH/ASYNC_FLUSH SYNC_FLUSH 模式下的 broker 保证在收到确认生产者之前将消息刷盘。ASYNC_FLUSH 模式下的 broker 则利用刷盘一组消息的模式,可以取得更好的性能。 |​ | flushDiskType | ASYNC_FLUSH | SYNC_FLUSH/ASYNC_FLUSH SYNC_FLUSH 模式下的 broker 保证在收到确认生产者之前将消息刷盘。ASYNC_FLUSH 模式下的 broker 则利用刷盘一组消息的模式,可以取得更好的性能。 |​
......
...@@ -22,7 +22,7 @@ A type of Consumer, Under this high real-time performance mode, it will push the ...@@ -22,7 +22,7 @@ A type of Consumer, Under this high real-time performance mode, it will push the
## 9 Producer Group ## 9 Producer Group
A collection of the same type of Producer, which sends the same type of messages with consistent logic. If a transaction message is sent and the original producer crashes after sending, the broker server will contact other producers in the same producer group to commit or rollback the transactional message. A collection of the same type of Producer, which sends the same type of messages with consistent logic. If a transaction message is sent and the original producer crashes after sending, the broker server will contact other producers in the same producer group to commit or rollback the transactional message.
## 10 Consumer Group ## 10 Consumer Group
A collection of the same type of Consumer, which sends the same type of messages with consistent logic. The consumer group makes load-balance and fault-tolerance super easy in terms of message consuming. A collection of the same type of Consumer, which consume the same type of messages with consistent logic. The consumer group makes load-balance and fault-tolerance super easy in terms of message consuming.
Warning: consumer instances of one consumer group must have exactly the same topic subscription(s). Warning: consumer instances of one consumer group must have exactly the same topic subscription(s).
RocketMQ supports two types of consumption mode:Clustering and Broadcasting. RocketMQ supports two types of consumption mode:Clustering and Broadcasting.
......
# OpenMessaging Example
[OpenMessaging](https://openmessaging.github.io/), which includes the establishment of industry guidelines and messaging, streaming specifications to provide a common framework for finance, ecommerce, IoT and big-data area. The design principles are the cloud-oriented, simplicity, flexibility, and language independent in distributed heterogeneous environments. Conformance to these specifications will make it possible to develop a heterogeneous messaging applications across all major platforms and operating systems.
RocketMQ provides a partial implementation of OpenMessaging 0.1.0-alpha, the following examples demonstrate how to access RocketMQ based on OpenMessaging.
## OMSProducer
The following example shows how to send message to RocketMQ broker in synchronous, asynchronous, or one-way transmissions.
```
public class OMSProducer {
public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
final Producer producer = messagingAccessPoint.createProducer();
messagingAccessPoint.startup();
System.out.printf("MessagingAccessPoint startup OK%n");
producer.startup();
System.out.printf("Producer startup OK%n");
{
Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
SendResult sendResult = producer.send(message);
System.out.printf("Send sync message OK, msgId: %s%n", sendResult.messageId());
}
{
final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
result.addListener(new PromiseListener<SendResult>() {
@Override
public void operationCompleted(Promise<SendResult> promise) {
System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId());
}
@Override
public void operationFailed(Promise<SendResult> promise) {
System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage());
}
});
}
{
producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
System.out.printf("Send oneway message OK%n");
}
producer.shutdown();
messagingAccessPoint.shutdown();
}
}
```
## OMSPullConsumer
Use OMS PullConsumer to poll messages from a specified queue.
```
public class OMSPullConsumer {
public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
messagingAccessPoint.startup();
System.out.printf("MessagingAccessPoint startup OK%n");
consumer.startup();
System.out.printf("Consumer startup OK%n");
Message message = consumer.poll();
if (message != null) {
String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
System.out.printf("Received one message: %s%n", msgId);
consumer.ack(msgId);
}
consumer.shutdown();
messagingAccessPoint.shutdown();
}
}
```
## OMSPushConsumer
Attaches OMS PushConsumer to a specified queue and consumes messages by MessageListener
```
public class OMSPushConsumer {
public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
final PushConsumer consumer = messagingAccessPoint.
createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
messagingAccessPoint.startup();
System.out.printf("MessagingAccessPoint startup OK%n");
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
consumer.shutdown();
messagingAccessPoint.shutdown();
}
}));
consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
@Override
public void onMessage(final Message message, final ReceivedMessageContext context) {
System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID));
context.ack();
}
});
}
}
```
...@@ -10,7 +10,7 @@ The RocketMQ architecture is divided into four parts, as shown in the figure abo ...@@ -10,7 +10,7 @@ The RocketMQ architecture is divided into four parts, as shown in the figure abo
- Consumer:The role of message consumption supports distributed cluster deployment. Support push, pull two modes to consume messages. It also supports cluster mode and broadcast mode consumption, and it provides a real-time message subscription mechanism to meet the needs of most users. - Consumer:The role of message consumption supports distributed cluster deployment. Support push, pull two modes to consume messages. It also supports cluster mode and broadcast mode consumption, and it provides a real-time message subscription mechanism to meet the needs of most users.
- NameServer:NameServer is a very simple Topic routing registry with a role similar to ZooKeeper in Dubbo, which supports dynamic registration and discovery of Broker. It mainly includes two functions: Broker management, NameServer accepts the registration information of the Broker cluster and saves it as the basic data of the routing information. Then provide a heartbeat detection mechanism to check whether the broker is still alive; routing information management, each NameServer will save the entire routing information about the Broker cluster and the queue information for the client query. Then the Producer and Conumser can know the routing information of the entire Broker cluster through the NameServer, so as to deliver and consume the message. The NameServer is usually deployed in a cluster mode, and each instance does not communicate with each other. Broker registers its own routing information with each NameServer, so each NameServer instance stores a complete routing information. When a NameServer is offline for some reason, the Broker can still synchronize its routing information with other NameServers. The Producer and Consumer can still dynamically sense the information of the Broker's routing. - NameServer:NameServer is a very simple Topic routing registry with a role similar to ZooKeeper in Dubbo, which supports dynamic registration and discovery of Broker. It mainly includes two functions: Broker management, NameServer accepts the registration information of the Broker cluster and saves it as the basic data of the routing information. Then provide a heartbeat detection mechanism to check whether the broker is still alive; routing information management, each NameServer will save the entire routing information about the Broker cluster and the queue information for the client query. Then the Producer and Consumer can know the routing information of the entire Broker cluster through the NameServer, so as to deliver and consume the message. The NameServer is usually deployed in a cluster mode, and each instance does not communicate with each other. Broker registers its own routing information with each NameServer, so each NameServer instance stores a complete routing information. When a NameServer is offline for some reason, the Broker can still synchronize its routing information with other NameServers. The Producer and Consumer can still dynamically sense the information of the Broker's routing.
- BrokerServer:Broker is responsible for the storage, delivery and query of messages and high availability guarantees. In order to achieve these functions, Broker includes the following important sub-modules. - BrokerServer:Broker is responsible for the storage, delivery and query of messages and high availability guarantees. In order to achieve these functions, Broker includes the following important sub-modules.
1. Remoting Module:The entire broker entity handles requests from the clients side. 1. Remoting Module:The entire broker entity handles requests from the clients side.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册