提交 1f897336 编写于 作者: D dongeforever

Polish compile errrors

上级 61338871
......@@ -691,7 +691,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
public void sendMessageBack(MessageExt msg, int delayLevel)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, null);
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, (String) null);
}
/**
......
......@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.client.impl;
import com.google.common.base.Function;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
......@@ -180,9 +179,7 @@ import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.protocol.route.TopicRouteDataNameSrv;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.InvokeCallback;
......@@ -201,8 +198,6 @@ import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import static com.google.common.base.Optional.fromNullable;
public class MQClientAPIImpl {
private final static InternalLogger log = ClientLogger.getLog();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQRedirectException;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.assertj.core.util.Lists;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class DefaultMQPullConsumerLogicalQueueTest {
private MQClientInstance mQClientFactory;
@Mock
private MQClientAPIImpl mQClientAPIImpl;
private DefaultMQPullConsumer pullConsumer;
private String topic;
private static final String cluster = "DefaultCluster";
private static final String broker1Name = "BrokerA";
private static final String broker1Addr = "127.0.0.2:10911";
private static final String broker2Name = "BrokerB";
private static final String broker2Addr = "127.0.0.3:10911";
@Before
public void init() throws Exception {
topic = "FooBar" + System.nanoTime();
mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()));
FieldUtils.writeField(mQClientFactory, "mQClientAPIImpl", mQClientAPIImpl, true);
pullConsumer = new DefaultMQPullConsumer("FooBarGroup" + System.nanoTime());
pullConsumer.setNamesrvAddr("127.0.0.1:9876");
pullConsumer.start();
PullAPIWrapper pullAPIWrapper = pullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper();
FieldUtils.writeDeclaredField(pullAPIWrapper, "mQClientFactory", mQClientFactory, true);
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRouteData());
doReturn(new FindBrokerResult(broker1Addr, false)).when(mQClientFactory).findBrokerAddressInSubscribe(eq(broker1Name), anyLong(), anyBoolean());
doReturn(new FindBrokerResult(broker2Addr, false)).when(mQClientFactory).findBrokerAddressInSubscribe(eq(broker2Name), anyLong(), anyBoolean());
}
@After
public void terminate() {
pullConsumer.shutdown();
}
@Test
public void testStart_OffsetShouldNotNUllAfterStart() {
Assert.assertNotNull(pullConsumer.getOffsetStore());
}
@Test
public void testPullMessage_Success() throws Exception {
doAnswer(new Answer<PullResultExt>() {
@Override public PullResultExt answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
return DefaultMQPullConsumerLogicalQueueTest.this.createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(new MessageExt()));
}
}).when(mQClientAPIImpl).pullMessage(eq(broker1Addr), any(PullMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC), (PullCallback) isNull());
MessageQueue messageQueue = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, 0);
PullResult pullResult = pullConsumer.pull(messageQueue, "*", 1024, 3);
assertThat(pullResult).isNotNull();
assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
assertThat(pullResult.getNextBeginOffset()).isEqualTo(1024 + 1);
assertThat(pullResult.getMinOffset()).isEqualTo(123);
assertThat(pullResult.getMaxOffset()).isEqualTo(2048);
assertThat(pullResult.getMsgFoundList()).isEqualTo(Collections.emptyList());
}
@Test
public void testPullMessage_NotFound() throws Exception {
doAnswer(new Answer<PullResult>() {
@Override public PullResult answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
return DefaultMQPullConsumerLogicalQueueTest.this.createPullResult(requestHeader, PullStatus.NO_NEW_MSG, new ArrayList<MessageExt>());
}
}).when(mQClientAPIImpl).pullMessage(eq(broker1Addr), any(PullMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC), (PullCallback) isNull());
MessageQueue messageQueue = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, 0);
PullResult pullResult = pullConsumer.pull(messageQueue, "*", 1024, 3);
assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.NO_NEW_MSG);
}
@Test
public void testPullMessageAsync_Success() throws Exception {
doAnswer(new Answer<PullResult>() {
@Override public PullResult answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
PullResult pullResult = DefaultMQPullConsumerLogicalQueueTest.this.createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(new MessageExt()));
PullCallback pullCallback = mock.getArgument(4);
pullCallback.onSuccess(pullResult);
return null;
}
}).when(mQClientAPIImpl).pullMessage(eq(broker1Addr), any(PullMessageRequestHeader.class), anyLong(), eq(CommunicationMode.ASYNC), any(PullCallback.class));
final SettableFuture<PullResult> future = SettableFuture.create();
MessageQueue messageQueue = new MessageQueue(topic, broker1Name, 0);
pullConsumer.pull(messageQueue, "*", 1024, 3, new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
future.set(pullResult);
}
@Override
public void onException(Throwable e) {
future.setException(e);
}
});
PullResult pullResult = future.get(3, TimeUnit.SECONDS);
assertThat(pullResult).isNotNull();
assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
assertThat(pullResult.getNextBeginOffset()).isEqualTo(1024 + 1);
assertThat(pullResult.getMinOffset()).isEqualTo(123);
assertThat(pullResult.getMaxOffset()).isEqualTo(2048);
assertThat(pullResult.getMsgFoundList()).isEqualTo(Collections.emptyList());
}
@Test
public void testPullMessageSync_Redirect() throws Exception {
doAnswer(new Answer<PullResult>() {
@Override public PullResult answer(InvocationOnMock mock) throws Throwable {
throw new MQRedirectException(JSON.toJSONBytes(ImmutableList.of(
new LogicalQueueRouteData(0, 0, new MessageQueue(topic, broker1Name, 0), MessageQueueRouteState.Expired, 0, 0, 0, 0, broker1Addr),
new LogicalQueueRouteData(0, 10, new MessageQueue(topic, broker2Name, 0), MessageQueueRouteState.Normal, 0, -1, -1, -1, broker2Addr)
)));
}
}).when(mQClientAPIImpl).pullMessage(eq(broker1Addr), any(PullMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC), (PullCallback) isNull());
doAnswer(new Answer<PullResult>() {
@Override public PullResult answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
return DefaultMQPullConsumerLogicalQueueTest.this.createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(new MessageExt()));
}
}).when(mQClientAPIImpl).pullMessage(eq(broker2Addr), any(PullMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC), (PullCallback) isNull());
MessageQueue messageQueue = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, 0);
PullResult pullResult = pullConsumer.pull(messageQueue, "*", 1024, 3);
assertThat(pullResult).isNotNull();
assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
assertThat(pullResult.getNextBeginOffset()).isEqualTo(1024 + 1);
assertThat(pullResult.getMinOffset()).isEqualTo(123 + 10);
assertThat(pullResult.getMaxOffset()).isEqualTo(2048 + 10);
assertThat(pullResult.getMsgFoundList()).isEqualTo(Collections.emptyList());
}
private TopicRouteData createTopicRouteData() {
TopicRouteData topicRouteData = new TopicRouteData();
topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
topicRouteData.setBrokerDatas(ImmutableList.of(
new BrokerData(cluster, broker1Name, new HashMap<Long, String>(Collections.singletonMap(MixAll.MASTER_ID, broker1Addr))),
new BrokerData(cluster, broker2Name, new HashMap<Long, String>(Collections.singletonMap(MixAll.MASTER_ID, broker2Addr)))
));
List<QueueData> queueDataList = new ArrayList<QueueData>();
QueueData queueData;
queueData = new QueueData();
queueData.setBrokerName(broker1Name);
queueData.setPerm(6);
queueData.setReadQueueNums(3);
queueData.setWriteQueueNums(4);
queueData.setTopicSysFlag(0);
queueDataList.add(queueData);
queueData = new QueueData();
queueData.setBrokerName(broker2Name);
queueData.setPerm(6);
queueData.setReadQueueNums(3);
queueData.setWriteQueueNums(4);
queueData.setTopicSysFlag(0);
queueDataList.add(queueData);
topicRouteData.setQueueDatas(queueDataList);
LogicalQueuesInfo info = new LogicalQueuesInfo();
info.put(0, Lists.newArrayList(new LogicalQueueRouteData(0, 0, new MessageQueue(topic, broker1Name, 0), MessageQueueRouteState.Normal, 0, 0, 0, 0, broker1Addr)));
topicRouteData.setLogicalQueuesInfo(info);
return topicRouteData;
}
private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus,
List<MessageExt> messageExtList) throws Exception {
return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, new byte[] {});
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.producer;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.exception.MQRedirectException;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.assertj.core.api.ThrowableAssert;
import org.assertj.core.util.Lists;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class DefaultMQProducerLogicalQueueTest {
private MQClientInstance mQClientFactory;
@Mock
private MQClientAPIImpl mQClientAPIImpl;
private DefaultMQProducer producer;
private Message message;
private String topic;
private MessageQueue messageQueue;
private static final String cluster = "DefaultCluster";
private static final String broker1Name = "broker1";
private static final String broker2Name = "broker2";
private static final String broker1Addr = "127.0.0.2:10911";
private static final String broker2Addr = "127.0.0.3:10911";
@Before
public void init() throws Exception {
topic = "Foobar" + System.nanoTime();
messageQueue = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, 0);
ConcurrentMap<String, MQClientInstance> factoryTable = (ConcurrentMap<String/* clientId */, MQClientInstance>) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
for (MQClientInstance instance : factoryTable.values()) {
instance.shutdown();
}
factoryTable.clear();
mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()));
factoryTable.put(new ClientConfig().buildMQClientId(), mQClientFactory);
String producerGroupTemp = "FooBar_PID" + System.nanoTime();
producer = new DefaultMQProducer(producerGroupTemp);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);
message = new Message(topic, new byte[] {'a'});
mQClientFactory.registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
producer.start();
FieldUtils.writeDeclaredField(producer.getDefaultMQProducerImpl(), "mQClientFactory", mQClientFactory, true);
FieldUtils.writeField(mQClientFactory, "mQClientAPIImpl", mQClientAPIImpl, true);
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
(SendCallback) isNull(), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
.thenReturn(createSendResult(SendStatus.SEND_OK));
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
any(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
.thenAnswer(new Answer<SendResult>() {
@Override public SendResult answer(InvocationOnMock invocation) throws Throwable {
SendCallback sendCallback = invocation.getArgument(6);
sendCallback.onSuccess(DefaultMQProducerLogicalQueueTest.this.createSendResult(SendStatus.SEND_OK));
return null;
}
});
}
@After
public void terminate() {
producer.shutdown();
}
@Test
public void testSendMessageSync_Success() throws Exception {
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
SendResult sendResult = producer.send(message, messageQueue);
assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
}
@Test
public void testSendMessageSync_Redirect() throws Exception {
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
when(mQClientAPIImpl.sendMessage(eq(broker1Addr), eq(broker1Name), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC),
(SendCallback) isNull(), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
.thenThrow(new MQRedirectException(null));
assertThatThrownBy(new ThrowableAssert.ThrowingCallable() {
@Override public void call() throws Throwable {
producer.send(message, messageQueue);
}
}).isInstanceOf(MQBrokerException.class).hasMessageContaining("redirect");
when(mQClientAPIImpl.sendMessage(eq(broker1Addr), eq(broker1Name), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC),
(SendCallback) isNull(), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
.thenThrow(new MQRedirectException(JSON.toJSONBytes(ImmutableList.of(
new LogicalQueueRouteData(0, 0, new MessageQueue(topic, broker1Name, 0), MessageQueueRouteState.Expired, 0, 0, 0, 0, broker1Addr),
new LogicalQueueRouteData(0, 10, new MessageQueue(topic, broker2Name, 0), MessageQueueRouteState.Normal, 0, -1, -1, -1, broker2Addr)))));
when(mQClientAPIImpl.sendMessage(eq(broker2Addr), eq(broker2Name), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC),
(SendCallback) isNull(), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
.thenReturn(createSendResult(SendStatus.SEND_OK));
SendResult sendResult = producer.send(message, messageQueue);
assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
assertThat(sendResult.getQueueOffset()).isEqualTo(466L);
}
@Test
public void testSendMessageSync_RemotingException() throws Exception {
TopicRouteData topicRouteData = createTopicRoute();
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(topicRouteData);
when(mQClientAPIImpl.sendMessage(eq(broker1Addr), eq(broker1Name), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC),
(SendCallback) isNull(), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
.thenThrow(new RemotingConnectException(broker1Addr));
SendResult returnSendResult = createSendResult(SendStatus.SEND_OK);
when(mQClientAPIImpl.sendMessage(eq(broker2Addr), eq(broker2Name), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC),
(SendCallback) isNull(), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
.thenReturn(returnSendResult);
assertThatThrownBy(new ThrowableAssert.ThrowingCallable() {
@Override public void call() throws Throwable {
producer.send(message, messageQueue);
}
}).isInstanceOf(RemotingConnectException.class).hasMessageContaining(broker1Addr);
topicRouteData.getLogicalQueuesInfo().get(0).add(new LogicalQueueRouteData(0, -1, new MessageQueue(topic, broker2Name, 1), MessageQueueRouteState.WriteOnly, 0, -1, -1, -1, broker2Addr));
SendResult sendResult = producer.send(message, messageQueue);
assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
assertThat(sendResult.getQueueOffset()).isEqualTo(-1L);
}
@Test
public void testSendMessageAsync_Success() throws Exception {
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
final SettableFuture<SendResult> future = SettableFuture.create();
producer.send(message, messageQueue, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
future.set(sendResult);
}
@Override
public void onException(Throwable e) {
future.setException(e);
}
});
SendResult sendResult = future.get(3, TimeUnit.SECONDS);
assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
}
@Test
public void testSendMessageAsync() throws Exception {
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
final AtomicReference<SettableFuture<SendResult>> future = new AtomicReference<SettableFuture<SendResult>>();
SendCallback sendCallback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
future.get().set(sendResult);
}
@Override
public void onException(Throwable e) {
future.get().setException(e);
}
};
Message message = new Message();
message.setTopic("test");
message.setBody("hello world".getBytes());
future.set(SettableFuture.<SendResult>create());
producer.send(new Message(), messageQueue, sendCallback);
assertThatThrownBy(new ThrowableAssert.ThrowingCallable() {
@Override public void call() throws Throwable {
future.get().get(3, TimeUnit.SECONDS);
}
}).hasCauseInstanceOf(MQClientException.class).hasMessageContaining("The specified topic is blank");
//this message is send success
message.setTopic(topic);
future.set(SettableFuture.<SendResult>create());
producer.send(message, messageQueue, sendCallback, 1000);
future.get().get(3, TimeUnit.SECONDS);
}
public TopicRouteData createTopicRoute() {
TopicRouteData topicRouteData = new TopicRouteData();
topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
topicRouteData.setBrokerDatas(ImmutableList.of(
new BrokerData(cluster, broker1Name, new HashMap<Long, String>(Collections.singletonMap(MixAll.MASTER_ID, broker1Addr))),
new BrokerData(cluster, broker2Name, new HashMap<Long, String>(Collections.singletonMap(MixAll.MASTER_ID, broker2Addr)))
));
List<QueueData> queueDataList = new ArrayList<QueueData>();
QueueData queueData;
queueData = new QueueData();
queueData.setBrokerName(broker1Name);
queueData.setPerm(6);
queueData.setReadQueueNums(3);
queueData.setWriteQueueNums(4);
queueData.setTopicSysFlag(0);
queueDataList.add(queueData);
queueData = new QueueData();
queueData.setBrokerName(broker2Name);
queueData.setPerm(6);
queueData.setReadQueueNums(3);
queueData.setWriteQueueNums(4);
queueData.setTopicSysFlag(0);
queueDataList.add(queueData);
topicRouteData.setQueueDatas(queueDataList);
LogicalQueuesInfo info = new LogicalQueuesInfo();
info.put(0, Lists.newArrayList(new LogicalQueueRouteData(0, 0, new MessageQueue(topic, broker1Name, 0), MessageQueueRouteState.Normal, 0, 0, 0, 0, broker1Addr)));
topicRouteData.setLogicalQueuesInfo(info);
return topicRouteData;
}
private SendResult createSendResult(SendStatus sendStatus) {
SendResult sendResult = new SendResult();
sendResult.setMsgId("123");
sendResult.setOffsetMsgId("123");
sendResult.setQueueOffset(456);
sendResult.setSendStatus(sendStatus);
sendResult.setRegionId("HZ");
sendResult.setMessageQueue(new MessageQueue(topic, broker1Name, 0));
return sendResult;
}
}
......@@ -22,7 +22,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
......@@ -49,20 +48,21 @@ import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
......@@ -153,7 +153,7 @@ public class DefaultMQProducerTest {
@Test
public void testSendMessageSync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
SendResult sendResult = producer.send(message);
assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
......@@ -163,7 +163,7 @@ public class DefaultMQProducerTest {
@Test
public void testSendMessageSync_WithBodyCompressed() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
SendResult sendResult = producer.send(bigMessage);
assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
......@@ -174,7 +174,7 @@ public class DefaultMQProducerTest {
@Test
public void testSendMessageAsync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
......@@ -197,7 +197,7 @@ public class DefaultMQProducerTest {
final AtomicInteger cc = new AtomicInteger(0);
final CountDownLatch countDownLatch = new CountDownLatch(6);
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
SendCallback sendCallback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
......@@ -239,7 +239,7 @@ public class DefaultMQProducerTest {
final AtomicInteger cc = new AtomicInteger(0);
final CountDownLatch countDownLatch = new CountDownLatch(4);
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
SendCallback sendCallback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
......@@ -260,7 +260,7 @@ public class DefaultMQProducerTest {
}
};
List<Message> msgs = new ArrayList<Message>();
List<Message> msgs = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Message message = new Message();
message.setTopic("test");
......@@ -281,7 +281,7 @@ public class DefaultMQProducerTest {
@Test
public void testSendMessageAsync_BodyCompressed() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
producer.send(bigMessage, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
......@@ -300,7 +300,7 @@ public class DefaultMQProducerTest {
@Test
public void testSendMessageSync_SuccessWithHook() throws Throwable {
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
final Throwable[] assertionErrors = new Throwable[1];
final CountDownLatch countDownLatch = new CountDownLatch(2);
producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageHook() {
......@@ -368,7 +368,7 @@ public class DefaultMQProducerTest {
@Test
public void testRequestMessage() throws RemotingException, RequestTimeoutException, MQClientException, InterruptedException, MQBrokerException {
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
final AtomicBoolean finish = new AtomicBoolean(false);
new Thread(new Runnable() {
@Override public void run() {
......@@ -394,13 +394,13 @@ public class DefaultMQProducerTest {
@Test(expected = RequestTimeoutException.class)
public void testRequestMessage_RequestTimeoutException() throws RemotingException, RequestTimeoutException, MQClientException, InterruptedException, MQBrokerException {
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
Message result = producer.request(message, 3 * 1000L);
}
@Test
public void testAsyncRequest_OnSuccess() throws Exception {
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
final CountDownLatch countDownLatch = new CountDownLatch(1);
RequestCallback requestCallback = new RequestCallback() {
@Override public void onSuccess(Message message) {
......
......@@ -20,11 +20,6 @@ package org.apache.rocketmq.client.trace;
import io.opentracing.mock.MockSpan;
import io.opentracing.mock.MockTracer;
import io.opentracing.tag.Tags;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
......@@ -52,14 +47,17 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
......@@ -115,7 +113,7 @@ public class DefaultMQProducerWithOpenTracingTest {
@Test
public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, producer.getDefaultMQProducerImpl());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
producer.send(message);
assertThat(tracer.finishedSpans().size()).isEqualTo(1);
MockSpan span = tracer.finishedSpans().get(0);
......
......@@ -17,13 +17,6 @@
package org.apache.rocketmq.client.trace;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
......@@ -50,17 +43,18 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
......@@ -128,7 +122,7 @@ public class DefaultMQProducerWithTraceTest {
@Test
public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
final CountDownLatch countDownLatch = new CountDownLatch(1);
try {
producer.send(message);
......@@ -140,7 +134,7 @@ public class DefaultMQProducerWithTraceTest {
@Test
public void testSendMessageSync_WithTrace_NoBrokerSet_Exception() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
final CountDownLatch countDownLatch = new CountDownLatch(1);
try {
producer.send(message);
......
......@@ -20,12 +20,6 @@ package org.apache.rocketmq.client.trace;
import io.opentracing.mock.MockSpan;
import io.opentracing.mock.MockTracer;
import io.opentracing.tag.Tags;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
......@@ -59,14 +53,18 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
......@@ -133,7 +131,7 @@ public class TransactionMQProducerWithOpenTracingTest {
@Test
public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, producer.getDefaultMQProducerImpl());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
producer.sendMessageInTransaction(message, null);
assertThat(tracer.finishedSpans().size()).isEqualTo(2);
......
......@@ -17,13 +17,6 @@
package org.apache.rocketmq.client.trace;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
......@@ -57,20 +50,19 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
......@@ -135,7 +127,7 @@ public class TransactionMQProducerWithTraceTest {
Field fieldHooks = DefaultMQProducerImpl.class.getDeclaredField("endTransactionHookList");
fieldHooks.setAccessible(true);
List<EndTransactionHook>hooks = new ArrayList<EndTransactionHook>();
List<EndTransactionHook>hooks = new ArrayList<>();
hooks.add(endTransactionHook);
fieldHooks.set(producer.getDefaultMQProducerImpl(), hooks);
......@@ -150,14 +142,12 @@ public class TransactionMQProducerWithTraceTest {
@Test
public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
final AtomicReference<EndTransactionContext> context = new AtomicReference<EndTransactionContext>();
doAnswer(new Answer() {
@Override public Object answer(InvocationOnMock mock) throws Throwable {
context.set(mock.<EndTransactionContext>getArgument(0));
return null;
}
}).when(endTransactionHook).endTransaction(ArgumentMatchers.<EndTransactionContext>any());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
AtomicReference<EndTransactionContext> context = new AtomicReference<>();
doAnswer(mock -> {
context.set(mock.getArgument(0));
return null;
}).when(endTransactionHook).endTransaction(any());
producer.sendMessageInTransaction(message, null);
EndTransactionContext ctx = context.get();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.route;
import com.google.common.base.Objects;
public class TopicRouteDataNameSrv extends TopicRouteData {
private LogicalQueuesInfoUnordered logicalQueuesInfoUnordered;
public TopicRouteDataNameSrv() {
}
public LogicalQueuesInfoUnordered getLogicalQueuesInfoUnordered() {
return logicalQueuesInfoUnordered;
}
public void setLogicalQueuesInfoUnordered(
LogicalQueuesInfoUnordered logicalQueuesInfoUnordered) {
this.logicalQueuesInfoUnordered = logicalQueuesInfoUnordered;
}
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
if (!super.equals(o))
return false;
TopicRouteDataNameSrv srv = (TopicRouteDataNameSrv) o;
return Objects.equal(logicalQueuesInfoUnordered, srv.logicalQueuesInfoUnordered);
}
@Override public int hashCode() {
return Objects.hashCode(super.hashCode(), logicalQueuesInfoUnordered);
}
@Override public String toString() {
return "TopicRouteDataNameSrv{" +
"logicalQueuesInfoUnordered=" + logicalQueuesInfoUnordered +
"} " + super.toString();
}
public TopicRouteData toTopicRouteData() {
TopicRouteData topicRouteData = new TopicRouteData(this);
if (this.logicalQueuesInfoUnordered != null) {
topicRouteData.setLogicalQueuesInfo(this.logicalQueuesInfoUnordered.toLogicalQueuesInfoOrdered());
}
return topicRouteData;
}
}
......@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
......@@ -29,7 +30,7 @@ public class RegisterBrokerBodyTest {
@Test
public void test_encode_decode() throws IOException {
RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
TopicConfigAndMappingSerializeWrapper topicConfigSerializeWrapper = new TopicConfigAndMappingSerializeWrapper();
registerBrokerBody.setTopicConfigSerializeWrapper(topicConfigSerializeWrapper);
ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
......
......@@ -16,42 +16,27 @@
*/
package org.apache.rocketmq.namesrv.processor;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfoUnordered;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.protocol.route.TopicRouteDataNameSrv;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册