diff --git a/acl/src/test/java/org/apache/rocketmq/acl/common/AclSignerTest.java b/acl/src/test/java/org/apache/rocketmq/acl/common/AclSignerTest.java new file mode 100644 index 0000000000000000000000000000000000000000..4169d88fe9fe62c2e3b71f83f96ad60978d5c6f6 --- /dev/null +++ b/acl/src/test/java/org/apache/rocketmq/acl/common/AclSignerTest.java @@ -0,0 +1,18 @@ +package org.apache.rocketmq.acl.common; + +import org.junit.Test; + +public class AclSignerTest { + + @Test(expected = Exception.class) + public void calSignatureExceptionTest(){ + AclSigner.calSignature(new byte[]{},""); + } + + @Test + public void calSignatureTest(){ + AclSigner.calSignature("RocketMQ","12345678"); + AclSigner.calSignature("RocketMQ".getBytes(),"12345678"); + } + +} diff --git a/acl/src/test/java/org/apache/rocketmq/acl/common/SessionCredentialsTest.java b/acl/src/test/java/org/apache/rocketmq/acl/common/SessionCredentialsTest.java new file mode 100644 index 0000000000000000000000000000000000000000..b6f9b8ce05a34574bcd67008ab4c314489678072 --- /dev/null +++ b/acl/src/test/java/org/apache/rocketmq/acl/common/SessionCredentialsTest.java @@ -0,0 +1,29 @@ +package org.apache.rocketmq.acl.common; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Properties; + +public class SessionCredentialsTest { + + @Test + public void equalsTest(){ + SessionCredentials sessionCredentials=new SessionCredentials("RocketMQ","12345678"); + sessionCredentials.setSecurityToken("abcd"); + SessionCredentials other=new SessionCredentials("RocketMQ","12345678","abcd"); + Assert.assertTrue(sessionCredentials.equals(other)); + } + + @Test + public void updateContentTest(){ + SessionCredentials sessionCredentials=new SessionCredentials(); + Properties properties=new Properties(); + properties.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ"); + properties.setProperty(SessionCredentials.SECRET_KEY,"12345678"); + properties.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd"); + sessionCredentials.updateContent(properties); + } + + +} diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java index 12e47afdcd8ff3c580afc5ca90488ef768d96c5e..77bbb1193f11652544a4e1c984761a4a30ccf86d 100644 --- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java +++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java @@ -17,11 +17,18 @@ package org.apache.rocketmq.acl.plain; import java.nio.ByteBuffer; +import java.util.HashSet; +import java.util.Set; + import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.AclUtils; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.common.protocol.RequestCode; -import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; +import org.apache.rocketmq.common.protocol.header.*; +import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData; +import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData; +import org.apache.rocketmq.common.protocol.heartbeat.ProducerData; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.junit.Assert; import org.junit.Before; @@ -29,22 +36,22 @@ import org.junit.Test; public class PlainAccessValidatorTest { - PlainAccessValidator plainAccessValidator; - + private PlainAccessValidator plainAccessValidator; + private AclClientRPCHook aclClient; + private SessionCredentials sessionCredentials; @Before public void init() { System.setProperty("rocketmq.home.dir", "src/test/resources"); plainAccessValidator = new PlainAccessValidator(); - } - - @Test - public void contentTest() { - SessionCredentials sessionCredentials = new SessionCredentials(); + sessionCredentials = new SessionCredentials(); sessionCredentials.setAccessKey("RocketMQ"); sessionCredentials.setSecretKey("12345678"); sessionCredentials.setSecurityToken("87654321"); - AclClientRPCHook aclClient = new AclClientRPCHook(sessionCredentials); + aclClient = new AclClientRPCHook(sessionCredentials); + } + @Test + public void contentTest() { SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader(); messageRequestHeader.setTopic("topicA"); RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader); @@ -63,12 +70,22 @@ public class PlainAccessValidatorTest { @Test public void validateTest() { - SessionCredentials sessionCredentials = new SessionCredentials(); - sessionCredentials.setAccessKey("RocketMQ"); - sessionCredentials.setSecretKey("12345678"); - sessionCredentials.setSecurityToken("87654321"); - AclClientRPCHook aclClient = new AclClientRPCHook(sessionCredentials); + SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader(); + messageRequestHeader.setTopic("topicB"); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader); + aclClient.doBeforeRequest("", remotingCommand); + ByteBuffer buf = remotingCommand.encodeHeader(); + buf.getInt(); + buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); + buf.position(0); + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1"); + plainAccessValidator.validate(accessResource); + + } + + @Test + public void validateSendMessageTest() { SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader(); messageRequestHeader.setTopic("topicB"); RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader); @@ -81,4 +98,136 @@ public class PlainAccessValidatorTest { PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1"); plainAccessValidator.validate(accessResource); } + + @Test + public void validateSendMessageV2Test() { + SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader(); + messageRequestHeader.setTopic("topicC"); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(messageRequestHeader)); + aclClient.doBeforeRequest("", remotingCommand); + + ByteBuffer buf = remotingCommand.encodeHeader(); + buf.getInt(); + buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); + buf.position(0); + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876"); + plainAccessValidator.validate(accessResource); + } + + @Test + public void validatePullMessageTest() { + PullMessageRequestHeader pullMessageRequestHeader=new PullMessageRequestHeader(); + pullMessageRequestHeader.setTopic("topicC"); + pullMessageRequestHeader.setConsumerGroup("consumerGroupA"); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE,pullMessageRequestHeader); + aclClient.doBeforeRequest("", remotingCommand); + ByteBuffer buf = remotingCommand.encodeHeader(); + buf.getInt(); + buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); + buf.position(0); + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876"); + plainAccessValidator.validate(accessResource); + } + + @Test + public void validateConsumeMessageBackTest() { + ConsumerSendMsgBackRequestHeader consumerSendMsgBackRequestHeader=new ConsumerSendMsgBackRequestHeader(); + consumerSendMsgBackRequestHeader.setOriginTopic("topicC"); + consumerSendMsgBackRequestHeader.setGroup("consumerGroupA"); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK,consumerSendMsgBackRequestHeader); + aclClient.doBeforeRequest("", remotingCommand); + ByteBuffer buf = remotingCommand.encodeHeader(); + buf.getInt(); + buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); + buf.position(0); + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876"); + plainAccessValidator.validate(accessResource); + } + + @Test + public void validateQueryMessageTest() { + QueryMessageRequestHeader queryMessageRequestHeader=new QueryMessageRequestHeader(); + queryMessageRequestHeader.setTopic("topicC"); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.QUERY_MESSAGE,queryMessageRequestHeader); + aclClient.doBeforeRequest("", remotingCommand); + ByteBuffer buf = remotingCommand.encodeHeader(); + buf.getInt(); + buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); + buf.position(0); + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876"); + plainAccessValidator.validate(accessResource); + } + + @Test + public void validateHeartBeatTest() { + HeartbeatData heartbeatData=new HeartbeatData(); + Set producerDataSet=new HashSet<>(); + Set consumerDataSet=new HashSet<>(); + Set subscriptionDataSet=new HashSet<>(); + ProducerData producerData=new ProducerData(); + producerData.setGroupName("producerGroupA"); + ConsumerData consumerData=new ConsumerData(); + consumerData.setGroupName("consumerGroupA"); + SubscriptionData subscriptionData=new SubscriptionData(); + subscriptionData.setTopic("topicC"); + producerDataSet.add(producerData); + consumerDataSet.add(consumerData); + subscriptionDataSet.add(subscriptionData); + consumerData.setSubscriptionDataSet(subscriptionDataSet); + heartbeatData.setProducerDataSet(producerDataSet); + heartbeatData.setConsumerDataSet(consumerDataSet); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT,null); + remotingCommand.setBody(heartbeatData.encode()); + aclClient.doBeforeRequest("", remotingCommand); + ByteBuffer buf = remotingCommand.encode(); + buf.getInt(); + buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); + buf.position(0); + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876"); + plainAccessValidator.validate(accessResource); + } + + @Test + public void validateUnRegisterClientTest() { + UnregisterClientRequestHeader unregisterClientRequestHeader=new UnregisterClientRequestHeader(); + unregisterClientRequestHeader.setConsumerGroup("consumerGroupA"); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT,unregisterClientRequestHeader); + aclClient.doBeforeRequest("", remotingCommand); + ByteBuffer buf = remotingCommand.encodeHeader(); + buf.getInt(); + buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); + buf.position(0); + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876"); + plainAccessValidator.validate(accessResource); + } + + @Test + public void validateGetConsumerListByGroupTest() { + GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader=new GetConsumerListByGroupRequestHeader(); + getConsumerListByGroupRequestHeader.setConsumerGroup("consumerGroupA"); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP,getConsumerListByGroupRequestHeader); + aclClient.doBeforeRequest("", remotingCommand); + ByteBuffer buf = remotingCommand.encodeHeader(); + buf.getInt(); + buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); + buf.position(0); + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876"); + plainAccessValidator.validate(accessResource); + } + + @Test + public void validateUpdateConsumerOffSetTest() { + UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader=new UpdateConsumerOffsetRequestHeader(); + updateConsumerOffsetRequestHeader.setConsumerGroup("consumerGroupA"); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET,updateConsumerOffsetRequestHeader); + aclClient.doBeforeRequest("", remotingCommand); + ByteBuffer buf = remotingCommand.encodeHeader(); + buf.getInt(); + buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); + buf.position(0); + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876"); + plainAccessValidator.validate(accessResource); + } + + }