提交 5f664247 编写于 作者: H Hu Zongtang 提交者: Zhendong Liu

[ISSUE#525]add aclClient PRCHook for message track (#638)

* prepare to separate production-ready projects from the external projects

* Update fastjson to the latest stable version

* Clean code, don't list the default config in JVM

* Update README.md

* update the year info in NOTICE

* Release semaphore when timeout

* [ISSUE#525]add aclClient PRCHook for message track

* [ISSUE#525]add the apache license text,remove the merged from master branch

* [ISSUE#525]add aclClient PRCHook for message track,remove the merged content of notice and readme.md from master branch,add some unit test to increase the code coverage.
上级 4250af6e
...@@ -37,8 +37,7 @@ It offers a variety of features: ...@@ -37,8 +37,7 @@ It offers a variety of features:
---------- ----------
## Apache RocketMQ Community ## Apache RocketMQ Community
* [RocketMQ Community Incubator Projects](https://github.com/apache/rocketmq-externals) [RocketMQ Community Incubator Projects](https://github.com/apache/rocketmq-externals)
---------- ----------
## Contributing ## Contributing
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common; package org.apache.rocketmq.acl.common;
import org.junit.Test; import org.junit.Test;
......
...@@ -153,4 +153,16 @@ public class PermissionTest { ...@@ -153,4 +153,16 @@ public class PermissionTest {
} }
} }
} }
@Test
public void AclExceptionTest(){
AclException aclException = new AclException("CAL_SIGNATURE_FAILED",10015);
AclException aclExceptionWithMessage = new AclException("CAL_SIGNATURE_FAILED",10015,"CAL_SIGNATURE_FAILED Exception");
Assert.assertEquals(aclException.getCode(),10015);
Assert.assertEquals(aclExceptionWithMessage.getStatus(),"CAL_SIGNATURE_FAILED");
aclException.setCode(10016);
Assert.assertEquals(aclException.getCode(),10016);
aclException.setStatus("netaddress examine scope Exception netaddress");
Assert.assertEquals(aclException.getStatus(),"netaddress examine scope Exception netaddress");
}
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl.common; package org.apache.rocketmq.acl.common;
import org.junit.Assert; import org.junit.Assert;
...@@ -25,5 +41,51 @@ public class SessionCredentialsTest { ...@@ -25,5 +41,51 @@ public class SessionCredentialsTest {
sessionCredentials.updateContent(properties); sessionCredentials.updateContent(properties);
} }
@Test
public void SessionCredentialHashCodeTest(){
SessionCredentials sessionCredentials=new SessionCredentials();
Properties properties=new Properties();
properties.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ");
properties.setProperty(SessionCredentials.SECRET_KEY,"12345678");
properties.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd");
sessionCredentials.updateContent(properties);
Assert.assertEquals(sessionCredentials.hashCode(),353652211);
}
@Test
public void SessionCredentialEqualsTest(){
SessionCredentials sessionCredential1 =new SessionCredentials();
Properties properties1=new Properties();
properties1.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ");
properties1.setProperty(SessionCredentials.SECRET_KEY,"12345678");
properties1.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd");
sessionCredential1.updateContent(properties1);
SessionCredentials sessionCredential2 =new SessionCredentials();
Properties properties2=new Properties();
properties2.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ");
properties2.setProperty(SessionCredentials.SECRET_KEY,"12345678");
properties2.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd");
sessionCredential2.updateContent(properties2);
Assert.assertTrue(sessionCredential2.equals(sessionCredential1));
sessionCredential2.setSecretKey("1234567899");
sessionCredential2.setSignature("1234567899");
Assert.assertFalse(sessionCredential2.equals(sessionCredential1));
}
@Test
public void SessionCredentialToStringTest(){
SessionCredentials sessionCredential1 =new SessionCredentials();
Properties properties1=new Properties();
properties1.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ");
properties1.setProperty(SessionCredentials.SECRET_KEY,"12345678");
properties1.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd");
sessionCredential1.updateContent(properties1);
Assert.assertEquals(sessionCredential1.toString(),
"SessionCredentials [accessKey=RocketMQ, secretKey=12345678, signature=null, SecurityToken=abcd]");
}
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.pagecache;
import java.nio.ByteBuffer;
import org.apache.rocketmq.store.GetMessageResult;
import org.junit.Assert;
import org.junit.Test;
public class ManyMessageTransferTest {
@Test
public void ManyMessageTransferBuilderTest(){
ByteBuffer byteBuffer = ByteBuffer.allocate(20);
byteBuffer.putInt(20);
GetMessageResult getMessageResult = new GetMessageResult();
ManyMessageTransfer manyMessageTransfer = new ManyMessageTransfer(byteBuffer,getMessageResult);
}
@Test
public void ManyMessageTransferPosTest(){
ByteBuffer byteBuffer = ByteBuffer.allocate(20);
byteBuffer.putInt(20);
GetMessageResult getMessageResult = new GetMessageResult();
ManyMessageTransfer manyMessageTransfer = new ManyMessageTransfer(byteBuffer,getMessageResult);
Assert.assertEquals(manyMessageTransfer.position(),4);
}
@Test
public void ManyMessageTransferCountTest(){
ByteBuffer byteBuffer = ByteBuffer.allocate(20);
byteBuffer.putInt(20);
GetMessageResult getMessageResult = new GetMessageResult();
ManyMessageTransfer manyMessageTransfer = new ManyMessageTransfer(byteBuffer,getMessageResult);
Assert.assertEquals(manyMessageTransfer.count(),20);
}
@Test
public void ManyMessageTransferCloseTest(){
ByteBuffer byteBuffer = ByteBuffer.allocate(20);
byteBuffer.putInt(20);
GetMessageResult getMessageResult = new GetMessageResult();
ManyMessageTransfer manyMessageTransfer = new ManyMessageTransfer(byteBuffer,getMessageResult);
manyMessageTransfer.close();
manyMessageTransfer.deallocate();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.pagecache;
import java.nio.ByteBuffer;
import org.apache.rocketmq.store.MappedFile;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.junit.Assert;
import org.junit.Test;
public class OneMessageTransferTest {
@Test
public void OneMessageTransferTest(){
ByteBuffer byteBuffer = ByteBuffer.allocate(20);
byteBuffer.putInt(20);
SelectMappedBufferResult selectMappedBufferResult = new SelectMappedBufferResult(0,byteBuffer,20,new MappedFile());
OneMessageTransfer manyMessageTransfer = new OneMessageTransfer(byteBuffer,selectMappedBufferResult);
}
@Test
public void OneMessageTransferCountTest(){
ByteBuffer byteBuffer = ByteBuffer.allocate(20);
byteBuffer.putInt(20);
SelectMappedBufferResult selectMappedBufferResult = new SelectMappedBufferResult(0,byteBuffer,20,new MappedFile());
OneMessageTransfer manyMessageTransfer = new OneMessageTransfer(byteBuffer,selectMappedBufferResult);
Assert.assertEquals(manyMessageTransfer.count(),40);
}
@Test
public void OneMessageTransferPosTest(){
ByteBuffer byteBuffer = ByteBuffer.allocate(20);
byteBuffer.putInt(20);
SelectMappedBufferResult selectMappedBufferResult = new SelectMappedBufferResult(0,byteBuffer,20,new MappedFile());
OneMessageTransfer manyMessageTransfer = new OneMessageTransfer(byteBuffer,selectMappedBufferResult);
Assert.assertEquals(manyMessageTransfer.position(),8);
}
}
...@@ -309,7 +309,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -309,7 +309,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
} else { } else {
tempProperties.put(TrackTraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC); tempProperties.put(TrackTraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC);
} }
AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties); AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, rpcHook);
dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl()); dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
traceDispatcher = dispatcher; traceDispatcher = dispatcher;
......
...@@ -178,7 +178,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -178,7 +178,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
} else { } else {
tempProperties.put(TrackTraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC); tempProperties.put(TrackTraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC);
} }
AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties); AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, rpcHook);
dispatcher.setHostProducer(this.getDefaultMQProducerImpl()); dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
traceDispatcher = dispatcher; traceDispatcher = dispatcher;
......
...@@ -49,6 +49,7 @@ import java.util.concurrent.ArrayBlockingQueue; ...@@ -49,6 +49,7 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.remoting.RPCHook;
/** /**
* Created by zongtanghu on 2018/11/6. * Created by zongtanghu on 2018/11/6.
...@@ -74,7 +75,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { ...@@ -74,7 +75,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
private String dispatcherId = UUID.randomUUID().toString(); private String dispatcherId = UUID.randomUUID().toString();
private String traceTopicName; private String traceTopicName;
public AsyncArrayDispatcher(Properties properties) throws MQClientException { public AsyncArrayDispatcher(Properties properties, RPCHook rpcHook) throws MQClientException {
dispatcherType = properties.getProperty(TrackTraceConstants.TRACE_DISPATCHER_TYPE); dispatcherType = properties.getProperty(TrackTraceConstants.TRACE_DISPATCHER_TYPE);
int queueSize = Integer.parseInt(properties.getProperty(TrackTraceConstants.ASYNC_BUFFER_SIZE, "2048")); int queueSize = Integer.parseInt(properties.getProperty(TrackTraceConstants.ASYNC_BUFFER_SIZE, "2048"));
// queueSize is greater than or equal to the n power of 2 of value // queueSize is greater than or equal to the n power of 2 of value
...@@ -92,7 +93,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { ...@@ -92,7 +93,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
TimeUnit.MILLISECONDS, // TimeUnit.MILLISECONDS, //
this.appenderQueue, // this.appenderQueue, //
new ThreadFactoryImpl("MQTraceSendThread_")); new ThreadFactoryImpl("MQTraceSendThread_"));
traceProducer = TrackTraceProducerFactory.getTraceDispatcherProducer(properties); traceProducer = TrackTraceProducerFactory.getTraceDispatcherProducer(properties, rpcHook);
} }
public String getTraceTopicName() { public String getTraceTopicName() {
......
...@@ -25,6 +25,7 @@ import java.util.Map; ...@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.remoting.RPCHook;
public class TrackTraceProducerFactory { public class TrackTraceProducerFactory {
...@@ -33,10 +34,10 @@ public class TrackTraceProducerFactory { ...@@ -33,10 +34,10 @@ public class TrackTraceProducerFactory {
private static DefaultMQProducer traceProducer; private static DefaultMQProducer traceProducer;
public static DefaultMQProducer getTraceDispatcherProducer(Properties properties) { public static DefaultMQProducer getTraceDispatcherProducer(Properties properties, RPCHook rpcHook) {
if (traceProducer == null) { if (traceProducer == null) {
traceProducer = new DefaultMQProducer(); traceProducer = new DefaultMQProducer(rpcHook);
traceProducer.setProducerGroup(TrackTraceConstants.GROUP_NAME); traceProducer.setProducerGroup(TrackTraceConstants.GROUP_NAME);
traceProducer.setSendMsgTimeout(5000); traceProducer.setSendMsgTimeout(5000);
traceProducer.setInstanceName(properties.getProperty(TrackTraceConstants.INSTANCE_NAME, String.valueOf(System.currentTimeMillis()))); traceProducer.setInstanceName(properties.getProperty(TrackTraceConstants.INSTANCE_NAME, String.valueOf(System.currentTimeMillis())));
......
...@@ -259,6 +259,7 @@ ...@@ -259,6 +259,7 @@
<exclude>src/test/resources/certs/*</exclude> <exclude>src/test/resources/certs/*</exclude>
<exclude>src/test/**/*.log</exclude> <exclude>src/test/**/*.log</exclude>
<exclude>src/test/resources/META-INF/service/*</exclude> <exclude>src/test/resources/META-INF/service/*</exclude>
<exclude>src/main/resources/META-INF/service/*</exclude>
<exclude>*/target/**</exclude> <exclude>*/target/**</exclude>
<exclude>*/*.iml</exclude> <exclude>*/*.iml</exclude>
</excludes> </excludes>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册