From 5f664247214696e5a5ffb8a8600dc9dba901eee9 Mon Sep 17 00:00:00 2001 From: Hu Zongtang Date: Fri, 28 Dec 2018 10:56:48 +0800 Subject: [PATCH] [ISSUE#525]add aclClient PRCHook for message track (#638) * prepare to separate production-ready projects from the external projects * Update fastjson to the latest stable version * Clean code, don't list the default config in JVM * Update README.md * update the year info in NOTICE * Release semaphore when timeout * [ISSUE#525]add aclClient PRCHook for message track * [ISSUE#525]add the apache license text,remove the merged from master branch * [ISSUE#525]add aclClient PRCHook for message track,remove the merged content of notice and readme.md from master branch,add some unit test to increase the code coverage. --- README.md | 3 +- .../rocketmq/acl/common/AclSignerTest.java | 16 +++++ .../rocketmq/acl/common/PermissionTest.java | 12 ++++ .../acl/common/SessionCredentialsTest.java | 62 ++++++++++++++++++ .../pagecache/ManyMessageTransferTest.java | 64 +++++++++++++++++++ .../pagecache/OneMessageTransferTest.java | 53 +++++++++++++++ .../consumer/DefaultMQPushConsumer.java | 2 +- .../client/producer/DefaultMQProducer.java | 2 +- .../dispatch/impl/AsyncArrayDispatcher.java | 5 +- .../impl/TrackTraceProducerFactory.java | 5 +- pom.xml | 1 + 11 files changed, 217 insertions(+), 8 deletions(-) create mode 100644 broker/src/test/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransferTest.java create mode 100644 broker/src/test/java/org/apache/rocketmq/broker/pagecache/OneMessageTransferTest.java diff --git a/README.md b/README.md index a5f47e59..a6493b6f 100644 --- a/README.md +++ b/README.md @@ -37,8 +37,7 @@ It offers a variety of features: ---------- ## Apache RocketMQ Community -* [RocketMQ Community Incubator Projects](https://github.com/apache/rocketmq-externals) - +[RocketMQ Community Incubator Projects](https://github.com/apache/rocketmq-externals) ---------- ## Contributing diff --git a/acl/src/test/java/org/apache/rocketmq/acl/common/AclSignerTest.java b/acl/src/test/java/org/apache/rocketmq/acl/common/AclSignerTest.java index 4169d88f..eec62635 100644 --- a/acl/src/test/java/org/apache/rocketmq/acl/common/AclSignerTest.java +++ b/acl/src/test/java/org/apache/rocketmq/acl/common/AclSignerTest.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.rocketmq.acl.common; import org.junit.Test; diff --git a/acl/src/test/java/org/apache/rocketmq/acl/common/PermissionTest.java b/acl/src/test/java/org/apache/rocketmq/acl/common/PermissionTest.java index 31820ad7..253b5b24 100644 --- a/acl/src/test/java/org/apache/rocketmq/acl/common/PermissionTest.java +++ b/acl/src/test/java/org/apache/rocketmq/acl/common/PermissionTest.java @@ -153,4 +153,16 @@ public class PermissionTest { } } } + + @Test + public void AclExceptionTest(){ + AclException aclException = new AclException("CAL_SIGNATURE_FAILED",10015); + AclException aclExceptionWithMessage = new AclException("CAL_SIGNATURE_FAILED",10015,"CAL_SIGNATURE_FAILED Exception"); + Assert.assertEquals(aclException.getCode(),10015); + Assert.assertEquals(aclExceptionWithMessage.getStatus(),"CAL_SIGNATURE_FAILED"); + aclException.setCode(10016); + Assert.assertEquals(aclException.getCode(),10016); + aclException.setStatus("netaddress examine scope Exception netaddress"); + Assert.assertEquals(aclException.getStatus(),"netaddress examine scope Exception netaddress"); + } } diff --git a/acl/src/test/java/org/apache/rocketmq/acl/common/SessionCredentialsTest.java b/acl/src/test/java/org/apache/rocketmq/acl/common/SessionCredentialsTest.java index b6f9b8ce..a1a4bde4 100644 --- a/acl/src/test/java/org/apache/rocketmq/acl/common/SessionCredentialsTest.java +++ b/acl/src/test/java/org/apache/rocketmq/acl/common/SessionCredentialsTest.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.rocketmq.acl.common; import org.junit.Assert; @@ -25,5 +41,51 @@ public class SessionCredentialsTest { sessionCredentials.updateContent(properties); } + @Test + public void SessionCredentialHashCodeTest(){ + SessionCredentials sessionCredentials=new SessionCredentials(); + Properties properties=new Properties(); + properties.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ"); + properties.setProperty(SessionCredentials.SECRET_KEY,"12345678"); + properties.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd"); + sessionCredentials.updateContent(properties); + Assert.assertEquals(sessionCredentials.hashCode(),353652211); + } + + @Test + public void SessionCredentialEqualsTest(){ + SessionCredentials sessionCredential1 =new SessionCredentials(); + Properties properties1=new Properties(); + properties1.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ"); + properties1.setProperty(SessionCredentials.SECRET_KEY,"12345678"); + properties1.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd"); + sessionCredential1.updateContent(properties1); + + SessionCredentials sessionCredential2 =new SessionCredentials(); + Properties properties2=new Properties(); + properties2.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ"); + properties2.setProperty(SessionCredentials.SECRET_KEY,"12345678"); + properties2.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd"); + sessionCredential2.updateContent(properties2); + + Assert.assertTrue(sessionCredential2.equals(sessionCredential1)); + sessionCredential2.setSecretKey("1234567899"); + sessionCredential2.setSignature("1234567899"); + Assert.assertFalse(sessionCredential2.equals(sessionCredential1)); + } + + @Test + public void SessionCredentialToStringTest(){ + SessionCredentials sessionCredential1 =new SessionCredentials(); + Properties properties1=new Properties(); + properties1.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ"); + properties1.setProperty(SessionCredentials.SECRET_KEY,"12345678"); + properties1.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd"); + sessionCredential1.updateContent(properties1); + + Assert.assertEquals(sessionCredential1.toString(), + "SessionCredentials [accessKey=RocketMQ, secretKey=12345678, signature=null, SecurityToken=abcd]"); + } + } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransferTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransferTest.java new file mode 100644 index 00000000..508635c0 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransferTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker.pagecache; + +import java.nio.ByteBuffer; +import org.apache.rocketmq.store.GetMessageResult; +import org.junit.Assert; +import org.junit.Test; + +public class ManyMessageTransferTest { + + @Test + public void ManyMessageTransferBuilderTest(){ + ByteBuffer byteBuffer = ByteBuffer.allocate(20); + byteBuffer.putInt(20); + GetMessageResult getMessageResult = new GetMessageResult(); + ManyMessageTransfer manyMessageTransfer = new ManyMessageTransfer(byteBuffer,getMessageResult); + } + + @Test + public void ManyMessageTransferPosTest(){ + ByteBuffer byteBuffer = ByteBuffer.allocate(20); + byteBuffer.putInt(20); + GetMessageResult getMessageResult = new GetMessageResult(); + ManyMessageTransfer manyMessageTransfer = new ManyMessageTransfer(byteBuffer,getMessageResult); + Assert.assertEquals(manyMessageTransfer.position(),4); + } + + @Test + public void ManyMessageTransferCountTest(){ + ByteBuffer byteBuffer = ByteBuffer.allocate(20); + byteBuffer.putInt(20); + GetMessageResult getMessageResult = new GetMessageResult(); + ManyMessageTransfer manyMessageTransfer = new ManyMessageTransfer(byteBuffer,getMessageResult); + + Assert.assertEquals(manyMessageTransfer.count(),20); + + } + + @Test + public void ManyMessageTransferCloseTest(){ + ByteBuffer byteBuffer = ByteBuffer.allocate(20); + byteBuffer.putInt(20); + GetMessageResult getMessageResult = new GetMessageResult(); + ManyMessageTransfer manyMessageTransfer = new ManyMessageTransfer(byteBuffer,getMessageResult); + manyMessageTransfer.close(); + manyMessageTransfer.deallocate(); + } +} diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pagecache/OneMessageTransferTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pagecache/OneMessageTransferTest.java new file mode 100644 index 00000000..2cd4bdc1 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/pagecache/OneMessageTransferTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker.pagecache; + +import java.nio.ByteBuffer; +import org.apache.rocketmq.store.MappedFile; +import org.apache.rocketmq.store.SelectMappedBufferResult; +import org.junit.Assert; +import org.junit.Test; + +public class OneMessageTransferTest { + + @Test + public void OneMessageTransferTest(){ + ByteBuffer byteBuffer = ByteBuffer.allocate(20); + byteBuffer.putInt(20); + SelectMappedBufferResult selectMappedBufferResult = new SelectMappedBufferResult(0,byteBuffer,20,new MappedFile()); + OneMessageTransfer manyMessageTransfer = new OneMessageTransfer(byteBuffer,selectMappedBufferResult); + } + + @Test + public void OneMessageTransferCountTest(){ + ByteBuffer byteBuffer = ByteBuffer.allocate(20); + byteBuffer.putInt(20); + SelectMappedBufferResult selectMappedBufferResult = new SelectMappedBufferResult(0,byteBuffer,20,new MappedFile()); + OneMessageTransfer manyMessageTransfer = new OneMessageTransfer(byteBuffer,selectMappedBufferResult); + Assert.assertEquals(manyMessageTransfer.count(),40); + } + + @Test + public void OneMessageTransferPosTest(){ + ByteBuffer byteBuffer = ByteBuffer.allocate(20); + byteBuffer.putInt(20); + SelectMappedBufferResult selectMappedBufferResult = new SelectMappedBufferResult(0,byteBuffer,20,new MappedFile()); + OneMessageTransfer manyMessageTransfer = new OneMessageTransfer(byteBuffer,selectMappedBufferResult); + Assert.assertEquals(manyMessageTransfer.position(),8); + } +} diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 179a80da..2149d675 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -309,7 +309,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume } else { tempProperties.put(TrackTraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC); } - AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties); + AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, rpcHook); dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl()); traceDispatcher = dispatcher; diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 3c33d2ee..22c67605 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -178,7 +178,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { } else { tempProperties.put(TrackTraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC); } - AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties); + AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, rpcHook); dispatcher.setHostProducer(this.getDefaultMQProducerImpl()); traceDispatcher = dispatcher; diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java index 90b00d41..496d290f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java @@ -49,6 +49,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import org.apache.rocketmq.remoting.RPCHook; /** * Created by zongtanghu on 2018/11/6. @@ -74,7 +75,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { private String dispatcherId = UUID.randomUUID().toString(); private String traceTopicName; - public AsyncArrayDispatcher(Properties properties) throws MQClientException { + public AsyncArrayDispatcher(Properties properties, RPCHook rpcHook) throws MQClientException { dispatcherType = properties.getProperty(TrackTraceConstants.TRACE_DISPATCHER_TYPE); int queueSize = Integer.parseInt(properties.getProperty(TrackTraceConstants.ASYNC_BUFFER_SIZE, "2048")); // queueSize is greater than or equal to the n power of 2 of value @@ -92,7 +93,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { TimeUnit.MILLISECONDS, // this.appenderQueue, // new ThreadFactoryImpl("MQTraceSendThread_")); - traceProducer = TrackTraceProducerFactory.getTraceDispatcherProducer(properties); + traceProducer = TrackTraceProducerFactory.getTraceDispatcherProducer(properties, rpcHook); } public String getTraceTopicName() { diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/TrackTraceProducerFactory.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/TrackTraceProducerFactory.java index 27447df7..37c39a14 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/TrackTraceProducerFactory.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/TrackTraceProducerFactory.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.rocketmq.remoting.RPCHook; public class TrackTraceProducerFactory { @@ -33,10 +34,10 @@ public class TrackTraceProducerFactory { private static DefaultMQProducer traceProducer; - public static DefaultMQProducer getTraceDispatcherProducer(Properties properties) { + public static DefaultMQProducer getTraceDispatcherProducer(Properties properties, RPCHook rpcHook) { if (traceProducer == null) { - traceProducer = new DefaultMQProducer(); + traceProducer = new DefaultMQProducer(rpcHook); traceProducer.setProducerGroup(TrackTraceConstants.GROUP_NAME); traceProducer.setSendMsgTimeout(5000); traceProducer.setInstanceName(properties.getProperty(TrackTraceConstants.INSTANCE_NAME, String.valueOf(System.currentTimeMillis()))); diff --git a/pom.xml b/pom.xml index c20b04cb..1e3c4bcc 100644 --- a/pom.xml +++ b/pom.xml @@ -259,6 +259,7 @@ src/test/resources/certs/* src/test/**/*.log src/test/resources/META-INF/service/* + src/main/resources/META-INF/service/* */target/** */*.iml -- GitLab