未验证 提交 c3d46410 编写于 作者: Y yuz10 提交者: GitHub

[ISSUE #2833] Support trace for TranscationProducer (#2834)

上级 bc4ecb3e
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.hook;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.Message;
public class EndTransactionContext {
private String producerGroup;
private Message message;
private String brokerAddr;
private String msgId;
private String transactionId;
private LocalTransactionState transactionState;
private boolean fromTransactionCheck;
public String getProducerGroup() {
return producerGroup;
}
public void setProducerGroup(String producerGroup) {
this.producerGroup = producerGroup;
}
public Message getMessage() {
return message;
}
public void setMessage(Message message) {
this.message = message;
}
public String getBrokerAddr() {
return brokerAddr;
}
public void setBrokerAddr(String brokerAddr) {
this.brokerAddr = brokerAddr;
}
public String getMsgId() {
return msgId;
}
public void setMsgId(String msgId) {
this.msgId = msgId;
}
public String getTransactionId() {
return transactionId;
}
public void setTransactionId(String transactionId) {
this.transactionId = transactionId;
}
public LocalTransactionState getTransactionState() {
return transactionState;
}
public void setTransactionState(LocalTransactionState transactionState) {
this.transactionState = transactionState;
}
public boolean isFromTransactionCheck() {
return fromTransactionCheck;
}
public void setFromTransactionCheck(boolean fromTransactionCheck) {
this.fromTransactionCheck = fromTransactionCheck;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.hook;
public interface EndTransactionHook {
String hookName();
void endTransaction(final EndTransactionContext context);
}
......@@ -44,6 +44,8 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.client.hook.CheckForbiddenContext;
import org.apache.rocketmq.client.hook.CheckForbiddenHook;
import org.apache.rocketmq.client.hook.EndTransactionContext;
import org.apache.rocketmq.client.hook.EndTransactionHook;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.impl.CommunicationMode;
......@@ -101,6 +103,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
new ConcurrentHashMap<String, TopicPublishInfo>();
private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
private final ArrayList<EndTransactionHook> endTransactionHookList = new ArrayList<EndTransactionHook>();
private final RPCHook rpcHook;
private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
private final ExecutorService defaultAsyncSenderExecutor;
......@@ -171,6 +174,11 @@ public class DefaultMQProducerImpl implements MQProducerInner {
log.info("register sendMessage Hook, {}", hook.hookName());
}
public void registerEndTransactionHook(final EndTransactionHook hook) {
this.endTransactionHookList.add(hook);
log.info("register endTransaction Hook, {}", hook.hookName());
}
public void start() throws MQClientException {
this.start(true);
}
......@@ -386,6 +394,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if (exception != null) {
remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
}
doExecuteEndTransactionHook(msg, uniqueKey, brokerAddr, localTransactionState, true);
try {
DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
......@@ -967,6 +976,36 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
}
public boolean hasEndTransactionHook() {
return !this.endTransactionHookList.isEmpty();
}
public void executeEndTransactionHook(final EndTransactionContext context) {
if (!this.endTransactionHookList.isEmpty()) {
for (EndTransactionHook hook : this.endTransactionHookList) {
try {
hook.endTransaction(context);
} catch (Throwable e) {
log.warn("failed to executeEndTransactionHook", e);
}
}
}
}
public void doExecuteEndTransactionHook(Message msg, String msgId, String brokerAddr, LocalTransactionState state,
boolean fromTransactionCheck) {
if (hasEndTransactionHook()) {
EndTransactionContext context = new EndTransactionContext();
context.setProducerGroup(defaultMQProducer.getProducerGroup());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMsgId(msgId);
context.setTransactionId(msg.getTransactionId());
context.setTransactionState(state);
context.setFromTransactionCheck(fromTransactionCheck);
executeEndTransactionHook(context);
}
}
/**
* DEFAULT ONEWAY -------------------------------------------------------
*/
......@@ -1266,7 +1305,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
try {
this.endTransaction(sendResult, localTransactionState, localException);
this.endTransaction(msg, sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
......@@ -1290,6 +1329,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
public void endTransaction(
final Message msg,
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
......@@ -1318,6 +1358,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
break;
}
doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
......
......@@ -29,6 +29,7 @@ import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.hook.EndTransactionTraceHookImpl;
import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
......@@ -167,6 +168,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
traceDispatcher = dispatcher;
this.defaultMQProducerImpl.registerSendMessageHook(
new SendMessageTraceHookImpl(traceDispatcher));
this.defaultMQProducerImpl.registerEndTransactionHook(
new EndTransactionTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
......@@ -252,6 +255,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
traceDispatcher = dispatcher;
this.getDefaultMQProducerImpl().registerSendMessageHook(
new SendMessageTraceHookImpl(traceDispatcher));
this.defaultMQProducerImpl.registerEndTransactionHook(
new EndTransactionTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
......@@ -916,24 +921,24 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(batch(msgs), messageQueue, timeout);
}
@Override
public void send(Collection<Message> msgs, SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.defaultMQProducerImpl.send(batch(msgs), sendCallback);
}
@Override
public void send(Collection<Message> msgs, SendCallback sendCallback,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.defaultMQProducerImpl.send(batch(msgs), sendCallback, timeout);
}
@Override
public void send(Collection<Message> msgs, MessageQueue mq,
SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.defaultMQProducerImpl.send(batch(msgs), queueWithNamespace(mq), sendCallback);
}
@Override
public void send(Collection<Message> msgs, MessageQueue mq,
SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
......
......@@ -51,6 +51,10 @@ public class TransactionMQProducer extends DefaultMQProducer {
super(namespace, producerGroup, rpcHook);
}
public TransactionMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) {
super(namespace, producerGroup, rpcHook, enableMsgTrace, customizedTraceTopic);
}
@Override
public void start() throws MQClientException {
this.defaultMQProducerImpl.initTransactionEnv();
......
......@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageType;
......@@ -32,7 +33,9 @@ public class TraceBean {
private int retryTimes;
private int bodyLength;
private MessageType msgType;
private LocalTransactionState transactionState;
private String transactionId;
private boolean fromTransactionCheck;
public MessageType getMsgType() {
return msgType;
......@@ -141,4 +144,28 @@ public class TraceBean {
public void setBodyLength(int bodyLength) {
this.bodyLength = bodyLength;
}
public LocalTransactionState getTransactionState() {
return transactionState;
}
public void setTransactionState(LocalTransactionState transactionState) {
this.transactionState = transactionState;
}
public String getTransactionId() {
return transactionId;
}
public void setTransactionId(String transactionId) {
this.transactionId = transactionId;
}
public boolean isFromTransactionCheck() {
return fromTransactionCheck;
}
public void setFromTransactionCheck(boolean fromTransactionCheck) {
this.fromTransactionCheck = fromTransactionCheck;
}
}
......@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.MessageType;
import java.util.ArrayList;
......@@ -109,6 +110,27 @@ public class TraceDataEncoder {
subAfterContext.setGroupName(line[8]);
}
resList.add(subAfterContext);
} else if (line[0].equals(TraceType.EndTransaction.name())) {
TraceContext endTransactionContext = new TraceContext();
endTransactionContext.setTraceType(TraceType.EndTransaction);
endTransactionContext.setTimeStamp(Long.parseLong(line[1]));
endTransactionContext.setRegionId(line[2]);
endTransactionContext.setGroupName(line[3]);
TraceBean bean = new TraceBean();
bean.setTopic(line[4]);
bean.setMsgId(line[5]);
bean.setTags(line[6]);
bean.setKeys(line[7]);
bean.setStoreHost(line[8]);
bean.setMsgType(MessageType.values()[Integer.parseInt(line[9])]);
bean.setClientHost(line[10]);
bean.setTransactionId(line[11]);
bean.setTransactionState(LocalTransactionState.valueOf(line[12]));
bean.setFromTransactionCheck(Boolean.parseBoolean(line[13]));
endTransactionContext.setTraceBeans(new ArrayList<TraceBean>(1));
endTransactionContext.getTraceBeans().add(bean);
resList.add(endTransactionContext);
}
}
return resList;
......@@ -173,9 +195,26 @@ public class TraceDataEncoder {
.append(ctx.getContextCode()).append(TraceConstants.CONTENT_SPLITOR)
.append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)
.append(ctx.getGroupName()).append(TraceConstants.FIELD_SPLITOR);
}
}
case EndTransaction: {
TraceBean bean = ctx.getTraceBeans().get(0);
sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getTopic()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getTags()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getStoreHost()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getClientHost()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getTransactionId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getTransactionState().name()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.isFromTransactionCheck()).append(TraceConstants.FIELD_SPLITOR);
}
break;
default:
}
......
......@@ -20,4 +20,5 @@ public enum TraceType {
Pub,
SubBefore,
SubAfter,
EndTransaction,
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace.hook;
import org.apache.rocketmq.client.hook.EndTransactionContext;
import org.apache.rocketmq.client.hook.EndTransactionHook;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceBean;
import org.apache.rocketmq.client.trace.TraceContext;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.TraceType;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageType;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import java.util.ArrayList;
public class EndTransactionTraceHookImpl implements EndTransactionHook {
private TraceDispatcher localDispatcher;
public EndTransactionTraceHookImpl(TraceDispatcher localDispatcher) {
this.localDispatcher = localDispatcher;
}
@Override
public String hookName() {
return "EndTransactionTraceHook";
}
@Override
public void endTransaction(EndTransactionContext context) {
//if it is message trace data,then it doesn't recorded
if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {
return;
}
Message msg = context.getMessage();
//build the context content of TuxeTraceContext
TraceContext tuxeContext = new TraceContext();
tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
tuxeContext.setTraceType(TraceType.EndTransaction);
tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup()));
//build the data bean object of message trace
TraceBean traceBean = new TraceBean();
traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic()));
traceBean.setTags(context.getMessage().getTags());
traceBean.setKeys(context.getMessage().getKeys());
traceBean.setStoreHost(context.getBrokerAddr());
traceBean.setMsgType(MessageType.Trans_msg_Commit);
traceBean.setClientHost(((AsyncTraceDispatcher)localDispatcher).getHostProducer().getmQClientFactory().getClientId());
traceBean.setMsgId(context.getMsgId());
traceBean.setTransactionState(context.getTransactionState());
traceBean.setTransactionId(context.getTransactionId());
traceBean.setFromTransactionCheck(context.isFromTransactionCheck());
String regionId = msg.getProperty(MessageConst.PROPERTY_MSG_REGION);
if (regionId == null || regionId.isEmpty()) {
regionId = MixAll.DEFAULT_TRACE_REGION_ID;
}
tuxeContext.setRegionId(regionId);
tuxeContext.getTraceBeans().add(traceBean);
tuxeContext.setTimeStamp(System.currentTimeMillis());
localDispatcher.append(tuxeContext);
}
}
......@@ -17,6 +17,7 @@
package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageType;
import org.junit.Assert;
......@@ -90,4 +91,46 @@ public class TraceDataEncoderTest {
Assert.assertEquals(traceTransferBean.getTransKey().size(), 2);
}
}
\ No newline at end of file
@Test
public void testEncoderFromContextBean_EndTransaction() {
TraceContext context = new TraceContext();
context.setTraceType(TraceType.EndTransaction);
context.setGroupName("PID-test");
context.setRegionId("DefaultRegion");
context.setTimeStamp(time);
TraceBean traceBean = new TraceBean();
traceBean.setTopic("topic-test");
traceBean.setKeys("Keys");
traceBean.setTags("Tags");
traceBean.setMsgId("AC1415116D1418B4AAC217FE1B4E0000");
traceBean.setStoreHost("127.0.0.1:10911");
traceBean.setClientHost("127.0.0.1@41700");
traceBean.setMsgType(MessageType.Trans_msg_Commit);
traceBean.setTransactionId("transactionId");
traceBean.setTransactionState(LocalTransactionState.COMMIT_MESSAGE);
traceBean.setFromTransactionCheck(false);
List<TraceBean> traceBeans = new ArrayList<TraceBean>();
traceBeans.add(traceBean);
context.setTraceBeans(traceBeans);
TraceTransferBean traceTransferBean = TraceDataEncoder.encoderFromContextBean(context);
Assert.assertEquals(traceTransferBean.getTransKey().size(), 2);
String traceData = traceTransferBean.getTransData();
TraceContext contextAfter = TraceDataEncoder.decoderFromTraceDataString(traceData).get(0);
Assert.assertEquals(context.getTraceType(), contextAfter.getTraceType());
Assert.assertEquals(context.getTimeStamp(), contextAfter.getTimeStamp());
Assert.assertEquals(context.getGroupName(), contextAfter.getGroupName());
TraceBean before = context.getTraceBeans().get(0);
TraceBean after = contextAfter.getTraceBeans().get(0);
Assert.assertEquals(before.getTopic(), after.getTopic());
Assert.assertEquals(before.getMsgId(), after.getMsgId());
Assert.assertEquals(before.getTags(), after.getTags());
Assert.assertEquals(before.getKeys(), after.getKeys());
Assert.assertEquals(before.getStoreHost(), after.getStoreHost());
Assert.assertEquals(before.getMsgType(), after.getMsgType());
Assert.assertEquals(before.getClientHost(), after.getClientHost());
Assert.assertEquals(before.getTransactionId(), after.getTransactionId());
Assert.assertEquals(before.getTransactionState(), after.getTransactionState());
Assert.assertEquals(before.isFromTransactionCheck(), after.isFromTransactionCheck());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.hook.EndTransactionContext;
import org.apache.rocketmq.client.hook.EndTransactionHook;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class TransactionMQProducerWithTraceTest {
@Spy
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
@Mock
private MQClientAPIImpl mQClientAPIImpl;
@Mock
private EndTransactionHook endTransactionHook;
private AsyncTraceDispatcher asyncTraceDispatcher;
private TransactionMQProducer producer;
private DefaultMQProducer traceProducer;
private Message message;
private String topic = "FooBar";
private String producerGroupPrefix = "FooBar_PID";
private String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
private String producerGroupTraceTemp = TopicValidator.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis();
private String customerTraceTopic = "rmq_trace_topic_12345";
@Before
public void init() throws Exception {
TransactionListener transactionListener = new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
return LocalTransactionState.COMMIT_MESSAGE;
}
};
producer = new TransactionMQProducer(null, producerGroupTemp, null, true, null);
producer.setTransactionListener(transactionListener);
producer.setNamesrvAddr("127.0.0.1:9876");
message = new Message(topic, new byte[] {'a', 'b', 'c'});
asyncTraceDispatcher = (AsyncTraceDispatcher) producer.getTraceDispatcher();
traceProducer = asyncTraceDispatcher.getTraceProducer();
producer.start();
Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(producer.getDefaultMQProducerImpl(), mQClientFactory);
Field fieldTrace = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
fieldTrace.setAccessible(true);
fieldTrace.set(traceProducer.getDefaultMQProducerImpl(), mQClientFactory);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);
producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
Field fieldHooks = DefaultMQProducerImpl.class.getDeclaredField("endTransactionHookList");
fieldHooks.setAccessible(true);
List<EndTransactionHook>hooks = new ArrayList<>();
hooks.add(endTransactionHook);
fieldHooks.set(producer.getDefaultMQProducerImpl(), hooks);
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
.thenReturn(createSendResult(SendStatus.SEND_OK));
}
@Test
public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
AtomicReference<EndTransactionContext> context = new AtomicReference<>();
doAnswer(mock -> {
context.set(mock.getArgument(0));
return null;
}).when(endTransactionHook).endTransaction(any());
producer.sendMessageInTransaction(message, null);
EndTransactionContext ctx = context.get();
assertThat(ctx.getProducerGroup()).isEqualTo(producerGroupTemp);
assertThat(ctx.getMsgId()).isEqualTo("123");
assertThat(ctx.isFromTransactionCheck()).isFalse();
assertThat(new String(ctx.getMessage().getBody())).isEqualTo(new String(message.getBody()));
assertThat(ctx.getMessage().getTopic()).isEqualTo(topic);
}
@After
public void terminate() {
producer.shutdown();
}
public static TopicRouteData createTopicRoute() {
TopicRouteData topicRouteData = new TopicRouteData();
topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
List<BrokerData> brokerDataList = new ArrayList<BrokerData>();
BrokerData brokerData = new BrokerData();
brokerData.setBrokerName("BrokerA");
brokerData.setCluster("DefaultCluster");
HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
brokerAddrs.put(0L, "127.0.0.1:10911");
brokerData.setBrokerAddrs(brokerAddrs);
brokerDataList.add(brokerData);
topicRouteData.setBrokerDatas(brokerDataList);
List<QueueData> queueDataList = new ArrayList<QueueData>();
QueueData queueData = new QueueData();
queueData.setBrokerName("BrokerA");
queueData.setPerm(6);
queueData.setReadQueueNums(3);
queueData.setWriteQueueNums(4);
queueData.setTopicSysFlag(0);
queueDataList.add(queueData);
topicRouteData.setQueueDatas(queueDataList);
return topicRouteData;
}
private SendResult createSendResult(SendStatus sendStatus) {
SendResult sendResult = new SendResult();
sendResult.setMsgId("123");
sendResult.setOffsetMsgId(MessageDecoder.createMessageId(new InetSocketAddress("127.0.0.1", 12), 1));
sendResult.setQueueOffset(456);
sendResult.setSendStatus(sendStatus);
sendResult.setRegionId("HZ");
sendResult.setMessageQueue(new MessageQueue(topic, "broker-trace", 0));
return sendResult;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册