diff --git a/README.md b/README.md
index a5f47e597cee42340069a9bf63fd0e51eb6e3372..a6493b6f5073d554b003962a82b5f80705a64472 100644
--- a/README.md
+++ b/README.md
@@ -37,8 +37,7 @@ It offers a variety of features:
----------
## Apache RocketMQ Community
-* [RocketMQ Community Incubator Projects](https://github.com/apache/rocketmq-externals)
-
+[RocketMQ Community Incubator Projects](https://github.com/apache/rocketmq-externals)
----------
## Contributing
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/common/AclSignerTest.java b/acl/src/test/java/org/apache/rocketmq/acl/common/AclSignerTest.java
index 4169d88fe9fe62c2e3b71f83f96ad60978d5c6f6..eec626357fce92bfcdfe42da90a5bacf80b699ab 100644
--- a/acl/src/test/java/org/apache/rocketmq/acl/common/AclSignerTest.java
+++ b/acl/src/test/java/org/apache/rocketmq/acl/common/AclSignerTest.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.rocketmq.acl.common;
import org.junit.Test;
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/common/PermissionTest.java b/acl/src/test/java/org/apache/rocketmq/acl/common/PermissionTest.java
index 31820ad7d59850d3aea8e59746acf7a7de100bec..253b5b241e05a9e900f5c730e0cec8094fbe2d98 100644
--- a/acl/src/test/java/org/apache/rocketmq/acl/common/PermissionTest.java
+++ b/acl/src/test/java/org/apache/rocketmq/acl/common/PermissionTest.java
@@ -153,4 +153,16 @@ public class PermissionTest {
}
}
}
+
+ @Test
+ public void AclExceptionTest(){
+ AclException aclException = new AclException("CAL_SIGNATURE_FAILED",10015);
+ AclException aclExceptionWithMessage = new AclException("CAL_SIGNATURE_FAILED",10015,"CAL_SIGNATURE_FAILED Exception");
+ Assert.assertEquals(aclException.getCode(),10015);
+ Assert.assertEquals(aclExceptionWithMessage.getStatus(),"CAL_SIGNATURE_FAILED");
+ aclException.setCode(10016);
+ Assert.assertEquals(aclException.getCode(),10016);
+ aclException.setStatus("netaddress examine scope Exception netaddress");
+ Assert.assertEquals(aclException.getStatus(),"netaddress examine scope Exception netaddress");
+ }
}
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/common/SessionCredentialsTest.java b/acl/src/test/java/org/apache/rocketmq/acl/common/SessionCredentialsTest.java
index b6f9b8ce05a34574bcd67008ab4c314489678072..a1a4bde4f875808a7f4e2d012b5b68beb03d4be1 100644
--- a/acl/src/test/java/org/apache/rocketmq/acl/common/SessionCredentialsTest.java
+++ b/acl/src/test/java/org/apache/rocketmq/acl/common/SessionCredentialsTest.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.rocketmq.acl.common;
import org.junit.Assert;
@@ -25,5 +41,51 @@ public class SessionCredentialsTest {
sessionCredentials.updateContent(properties);
}
+ @Test
+ public void SessionCredentialHashCodeTest(){
+ SessionCredentials sessionCredentials=new SessionCredentials();
+ Properties properties=new Properties();
+ properties.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ");
+ properties.setProperty(SessionCredentials.SECRET_KEY,"12345678");
+ properties.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd");
+ sessionCredentials.updateContent(properties);
+ Assert.assertEquals(sessionCredentials.hashCode(),353652211);
+ }
+
+ @Test
+ public void SessionCredentialEqualsTest(){
+ SessionCredentials sessionCredential1 =new SessionCredentials();
+ Properties properties1=new Properties();
+ properties1.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ");
+ properties1.setProperty(SessionCredentials.SECRET_KEY,"12345678");
+ properties1.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd");
+ sessionCredential1.updateContent(properties1);
+
+ SessionCredentials sessionCredential2 =new SessionCredentials();
+ Properties properties2=new Properties();
+ properties2.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ");
+ properties2.setProperty(SessionCredentials.SECRET_KEY,"12345678");
+ properties2.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd");
+ sessionCredential2.updateContent(properties2);
+
+ Assert.assertTrue(sessionCredential2.equals(sessionCredential1));
+ sessionCredential2.setSecretKey("1234567899");
+ sessionCredential2.setSignature("1234567899");
+ Assert.assertFalse(sessionCredential2.equals(sessionCredential1));
+ }
+
+ @Test
+ public void SessionCredentialToStringTest(){
+ SessionCredentials sessionCredential1 =new SessionCredentials();
+ Properties properties1=new Properties();
+ properties1.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ");
+ properties1.setProperty(SessionCredentials.SECRET_KEY,"12345678");
+ properties1.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd");
+ sessionCredential1.updateContent(properties1);
+
+ Assert.assertEquals(sessionCredential1.toString(),
+ "SessionCredentials [accessKey=RocketMQ, secretKey=12345678, signature=null, SecurityToken=abcd]");
+ }
+
}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransferTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransferTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..508635c044cf38997168030a73a6650c96b7b0fb
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransferTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.pagecache;
+
+import java.nio.ByteBuffer;
+import org.apache.rocketmq.store.GetMessageResult;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ManyMessageTransferTest {
+
+ @Test
+ public void ManyMessageTransferBuilderTest(){
+ ByteBuffer byteBuffer = ByteBuffer.allocate(20);
+ byteBuffer.putInt(20);
+ GetMessageResult getMessageResult = new GetMessageResult();
+ ManyMessageTransfer manyMessageTransfer = new ManyMessageTransfer(byteBuffer,getMessageResult);
+ }
+
+ @Test
+ public void ManyMessageTransferPosTest(){
+ ByteBuffer byteBuffer = ByteBuffer.allocate(20);
+ byteBuffer.putInt(20);
+ GetMessageResult getMessageResult = new GetMessageResult();
+ ManyMessageTransfer manyMessageTransfer = new ManyMessageTransfer(byteBuffer,getMessageResult);
+ Assert.assertEquals(manyMessageTransfer.position(),4);
+ }
+
+ @Test
+ public void ManyMessageTransferCountTest(){
+ ByteBuffer byteBuffer = ByteBuffer.allocate(20);
+ byteBuffer.putInt(20);
+ GetMessageResult getMessageResult = new GetMessageResult();
+ ManyMessageTransfer manyMessageTransfer = new ManyMessageTransfer(byteBuffer,getMessageResult);
+
+ Assert.assertEquals(manyMessageTransfer.count(),20);
+
+ }
+
+ @Test
+ public void ManyMessageTransferCloseTest(){
+ ByteBuffer byteBuffer = ByteBuffer.allocate(20);
+ byteBuffer.putInt(20);
+ GetMessageResult getMessageResult = new GetMessageResult();
+ ManyMessageTransfer manyMessageTransfer = new ManyMessageTransfer(byteBuffer,getMessageResult);
+ manyMessageTransfer.close();
+ manyMessageTransfer.deallocate();
+ }
+}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pagecache/OneMessageTransferTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pagecache/OneMessageTransferTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..2cd4bdc1657a57650ec407ed89a3420b6eeeda52
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/pagecache/OneMessageTransferTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.pagecache;
+
+import java.nio.ByteBuffer;
+import org.apache.rocketmq.store.MappedFile;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class OneMessageTransferTest {
+
+ @Test
+ public void OneMessageTransferTest(){
+ ByteBuffer byteBuffer = ByteBuffer.allocate(20);
+ byteBuffer.putInt(20);
+ SelectMappedBufferResult selectMappedBufferResult = new SelectMappedBufferResult(0,byteBuffer,20,new MappedFile());
+ OneMessageTransfer manyMessageTransfer = new OneMessageTransfer(byteBuffer,selectMappedBufferResult);
+ }
+
+ @Test
+ public void OneMessageTransferCountTest(){
+ ByteBuffer byteBuffer = ByteBuffer.allocate(20);
+ byteBuffer.putInt(20);
+ SelectMappedBufferResult selectMappedBufferResult = new SelectMappedBufferResult(0,byteBuffer,20,new MappedFile());
+ OneMessageTransfer manyMessageTransfer = new OneMessageTransfer(byteBuffer,selectMappedBufferResult);
+ Assert.assertEquals(manyMessageTransfer.count(),40);
+ }
+
+ @Test
+ public void OneMessageTransferPosTest(){
+ ByteBuffer byteBuffer = ByteBuffer.allocate(20);
+ byteBuffer.putInt(20);
+ SelectMappedBufferResult selectMappedBufferResult = new SelectMappedBufferResult(0,byteBuffer,20,new MappedFile());
+ OneMessageTransfer manyMessageTransfer = new OneMessageTransfer(byteBuffer,selectMappedBufferResult);
+ Assert.assertEquals(manyMessageTransfer.position(),8);
+ }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 179a80daa42275257c2c4e80db426f54cb737580..2149d6756819bd9eb9ef957e948a46cfc4b65f87 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -309,7 +309,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
} else {
tempProperties.put(TrackTraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC);
}
- AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties);
+ AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, rpcHook);
dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
traceDispatcher = dispatcher;
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 3c33d2eed5352561fc67689bed6b3bc65c2fe549..22c676055fce1e76c0b237cecb41e1e7065677e0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -178,7 +178,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
} else {
tempProperties.put(TrackTraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC);
}
- AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties);
+ AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, rpcHook);
dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
traceDispatcher = dispatcher;
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java
index 90b00d414c9f53e7cb602fa6d788d48bf7790ea8..496d290f5cf70eac0b03cc4c29e640335e0f5914 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java
@@ -49,6 +49,7 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.remoting.RPCHook;
/**
* Created by zongtanghu on 2018/11/6.
@@ -74,7 +75,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
private String dispatcherId = UUID.randomUUID().toString();
private String traceTopicName;
- public AsyncArrayDispatcher(Properties properties) throws MQClientException {
+ public AsyncArrayDispatcher(Properties properties, RPCHook rpcHook) throws MQClientException {
dispatcherType = properties.getProperty(TrackTraceConstants.TRACE_DISPATCHER_TYPE);
int queueSize = Integer.parseInt(properties.getProperty(TrackTraceConstants.ASYNC_BUFFER_SIZE, "2048"));
// queueSize is greater than or equal to the n power of 2 of value
@@ -92,7 +93,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
TimeUnit.MILLISECONDS, //
this.appenderQueue, //
new ThreadFactoryImpl("MQTraceSendThread_"));
- traceProducer = TrackTraceProducerFactory.getTraceDispatcherProducer(properties);
+ traceProducer = TrackTraceProducerFactory.getTraceDispatcherProducer(properties, rpcHook);
}
public String getTraceTopicName() {
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/TrackTraceProducerFactory.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/TrackTraceProducerFactory.java
index 27447df7340841f35906c1659f8895d58785cd06..37c39a14e552a03412a9a9b36b0a274c4fcd0b9f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/TrackTraceProducerFactory.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/TrackTraceProducerFactory.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.rocketmq.remoting.RPCHook;
public class TrackTraceProducerFactory {
@@ -33,10 +34,10 @@ public class TrackTraceProducerFactory {
private static DefaultMQProducer traceProducer;
- public static DefaultMQProducer getTraceDispatcherProducer(Properties properties) {
+ public static DefaultMQProducer getTraceDispatcherProducer(Properties properties, RPCHook rpcHook) {
if (traceProducer == null) {
- traceProducer = new DefaultMQProducer();
+ traceProducer = new DefaultMQProducer(rpcHook);
traceProducer.setProducerGroup(TrackTraceConstants.GROUP_NAME);
traceProducer.setSendMsgTimeout(5000);
traceProducer.setInstanceName(properties.getProperty(TrackTraceConstants.INSTANCE_NAME, String.valueOf(System.currentTimeMillis())));
diff --git a/pom.xml b/pom.xml
index c20b04cb36679e4812665a0e44d62a7bf2b4a4b8..1e3c4bcc85970a374d3dc33ab399f8dbcb58933a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -259,6 +259,7 @@
src/test/resources/certs/*
src/test/**/*.log
src/test/resources/META-INF/service/*
+ src/main/resources/META-INF/service/*
*/target/**
*/*.iml