提交 d66243c0 编写于 作者: 程向往 提交者: Heng Du

[ISSUE #598] Enhance transaction by putting messages that exceed max check...

[ISSUE #598] Enhance transaction by putting messages that exceed max check times to system topic (#633)

* add logic of putting message that exceeds max-check-times to system topic TRANS_CHECK_MAXTIME_TOPIC

* add test case:testResolveDiscardMsg

* add @after logic to test case

* comment brokerController.shutdown and use mock

* add logic of resuming half message check

* add test case:resumeCheckHalfMessage

* delete commented codes
上级 ea88c4b7
......@@ -37,6 +37,7 @@ import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
......@@ -53,7 +54,10 @@ import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHead
import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
......@@ -100,6 +104,7 @@ import org.apache.rocketmq.common.protocol.header.QueryConsumeTimeSpanRequestHea
import org.apache.rocketmq.common.protocol.header.QueryCorrectionOffsetHeader;
import org.apache.rocketmq.common.protocol.header.QueryTopicConsumeByWhoRequestHeader;
import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.ResumeCheckHalfMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader;
......@@ -120,8 +125,11 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;
public class AdminBrokerProcessor implements NettyRequestProcessor {
......@@ -216,6 +224,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return getBrokerAclConfigVersion(ctx, request);
case RequestCode.UPDATE_GLOBAL_WHITE_ADDRS_CONFIG:
return updateGlobalWhiteAddrsConfig(ctx, request);
case RequestCode.RESUME_CHECK_HALF_MESSAGE:
return resumeCheckHalfMessage(ctx, request);
default:
break;
}
......@@ -262,7 +272,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
this.brokerController.registerIncrementBrokerData(topicConfig,this.brokerController.getTopicConfigManager().getDataVersion());
this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
return null;
}
......@@ -1518,4 +1528,62 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
private RemotingCommand resumeCheckHalfMessage(ChannelHandlerContext ctx,
RemotingCommand request)
throws RemotingCommandException {
final ResumeCheckHalfMessageRequestHeader requestHeader = (ResumeCheckHalfMessageRequestHeader) request
.decodeCommandCustomHeader(ResumeCheckHalfMessageRequestHeader.class);
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
SelectMappedBufferResult selectMappedBufferResult = null;
try {
MessageId messageId = MessageDecoder.decodeMessageId(requestHeader.getMsgId());
selectMappedBufferResult = this.brokerController.getMessageStore()
.selectOneMessageByOffset(messageId.getOffset());
MessageExt msg = MessageDecoder.decode(selectMappedBufferResult.getByteBuffer());
msg.putUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, String.valueOf(0));
PutMessageResult putMessageResult = this.brokerController.getMessageStore()
.putMessage(toMessageExtBrokerInner(msg));
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
log.info(
"Put message back to RMQ_SYS_TRANS_HALF_TOPIC. real topic={}",
msg.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
} else {
log.error("Put message back to RMQ_SYS_TRANS_HALF_TOPIC failed.");
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("Put message back to RMQ_SYS_TRANS_HALF_TOPIC failed.");
}
} catch (Exception e) {
log.error("Exception was thrown when putting message back to RMQ_SYS_TRANS_HALF_TOPIC.");
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("Exception was thrown when putting message back to RMQ_SYS_TRANS_HALF_TOPIC.");
} finally {
if (selectMappedBufferResult != null) {
selectMappedBufferResult.release();
}
}
return response;
}
private MessageExtBrokerInner toMessageExtBrokerInner(MessageExt msgExt) {
MessageExtBrokerInner inner = new MessageExtBrokerInner();
inner.setTopic(TransactionalMessageUtil.buildHalfTopic());
inner.setBody(msgExt.getBody());
inner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(inner, msgExt.getProperties());
inner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
inner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(msgExt.getTags()));
inner.setQueueId(0);
inner.setSysFlag(msgExt.getSysFlag());
inner.setBornHost(msgExt.getBornHost());
inner.setBornTimestamp(msgExt.getBornTimestamp());
inner.setStoreHost(msgExt.getStoreHost());
inner.setReconsumeTimes(msgExt.getReconsumeTimes());
inner.setMsgId(msgExt.getMsgId());
inner.setWaitStoreMsgOK(false);
return inner;
}
}
......@@ -220,7 +220,7 @@ public class TopicConfigManager extends ConfigManager {
}
if (createNew) {
this.brokerController.registerBrokerAll(false, true,true);
this.brokerController.registerBrokerAll(false, true, true);
}
return topicConfig;
......@@ -264,7 +264,47 @@ public class TopicConfigManager extends ConfigManager {
}
if (createNew) {
this.brokerController.registerBrokerAll(false, true,true);
this.brokerController.registerBrokerAll(false, true, true);
}
return topicConfig;
}
public TopicConfig createTopicOfTranCheckMaxTime(final int clientDefaultTopicQueueNums, final int perm) {
TopicConfig topicConfig = this.topicConfigTable.get(MixAll.TRANS_CHECK_MAX_TIME_TOPIC);
if (topicConfig != null)
return topicConfig;
boolean createNew = false;
try {
if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
topicConfig = this.topicConfigTable.get(MixAll.TRANS_CHECK_MAX_TIME_TOPIC);
if (topicConfig != null)
return topicConfig;
topicConfig = new TopicConfig(MixAll.TRANS_CHECK_MAX_TIME_TOPIC);
topicConfig.setReadQueueNums(clientDefaultTopicQueueNums);
topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums);
topicConfig.setPerm(perm);
topicConfig.setTopicSysFlag(0);
log.info("create new topic {}", topicConfig);
this.topicConfigTable.put(MixAll.TRANS_CHECK_MAX_TIME_TOPIC, topicConfig);
createNew = true;
this.dataVersion.nextVersion();
this.persist();
} finally {
this.lockTopicConfigTable.unlock();
}
}
} catch (InterruptedException e) {
log.error("create TRANS_CHECK_MAX_TIME_TOPIC exception", e);
}
if (createNew) {
this.brokerController.registerBrokerAll(false, true, true);
}
return topicConfig;
......@@ -289,7 +329,7 @@ public class TopicConfigManager extends ConfigManager {
this.dataVersion.nextVersion();
this.persist();
this.brokerController.registerBrokerAll(false, true,true);
this.brokerController.registerBrokerAll(false, true, true);
}
}
......@@ -309,7 +349,7 @@ public class TopicConfigManager extends ConfigManager {
this.dataVersion.nextVersion();
this.persist();
this.brokerController.registerBrokerAll(false, true,true);
this.brokerController.registerBrokerAll(false, true, true);
}
}
......
......@@ -17,6 +17,7 @@
package org.apache.rocketmq.broker.transaction;
import io.netty.channel.Channel;
import java.util.Random;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageConst;
......@@ -36,6 +37,10 @@ public abstract class AbstractTransactionalMessageCheckListener {
private BrokerController brokerController;
//queue nums of topic TRANS_CHECK_MAX_TIME_TOPIC
protected final static int TCMT_QUEUE_NUMS = 1;
protected final Random random = new Random(System.currentTimeMillis());
private static ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
......
......@@ -17,10 +17,18 @@
package org.apache.rocketmq.broker.transaction.queue;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
public class DefaultTransactionalMessageCheckListener extends AbstractTransactionalMessageCheckListener {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
......@@ -31,6 +39,41 @@ public class DefaultTransactionalMessageCheckListener extends AbstractTransactio
@Override
public void resolveDiscardMsg(MessageExt msgExt) {
log.error("MsgExt:{} has been checked too many times, so discard it", msgExt);
log.error("MsgExt:{} has been checked too many times, so discard it by moving it to system topic TRANS_CHECK_MAXTIME_TOPIC", msgExt);
try {
MessageExtBrokerInner brokerInner = toMessageExtBrokerInner(msgExt);
PutMessageResult putMessageResult = this.getBrokerController().getMessageStore().putMessage(brokerInner);
if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
log.info("Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC OK. Restored in queueOffset={}, " +
"commitLogOffset={}, real topic={}", msgExt.getQueueOffset(), msgExt.getCommitLogOffset(), msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
} else {
log.error("Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC failed, real topic={}, msgId={}", msgExt.getTopic(), msgExt.getMsgId());
}
} catch (Exception e) {
log.warn("Put checked-too-many-time message to TRANS_CHECK_MAXTIME_TOPIC error. {}", e);
}
}
private MessageExtBrokerInner toMessageExtBrokerInner(MessageExt msgExt) {
TopicConfig topicConfig = this.getBrokerController().getTopicConfigManager().createTopicOfTranCheckMaxTime(TCMT_QUEUE_NUMS, PermName.PERM_READ | PermName.PERM_WRITE);
int queueId = Math.abs(random.nextInt() % 99999999) % TCMT_QUEUE_NUMS;
MessageExtBrokerInner inner = new MessageExtBrokerInner();
inner.setTopic(topicConfig.getTopicName());
inner.setBody(msgExt.getBody());
inner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(inner, msgExt.getProperties());
inner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
inner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(msgExt.getTags()));
inner.setQueueId(queueId);
inner.setSysFlag(msgExt.getSysFlag());
inner.setBornHost(msgExt.getBornHost());
inner.setBornTimestamp(msgExt.getBornTimestamp());
inner.setStoreHost(msgExt.getStoreHost());
inner.setReconsumeTimes(msgExt.getReconsumeTimes());
inner.setMsgId(msgExt.getMsgId());
inner.setWaitStoreMsgOK(false);
return inner;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.processor;
import io.netty.channel.ChannelHandlerContext;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.ResumeCheckHalfMessageRequestHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.AppendMessageResult;
import org.apache.rocketmq.store.AppendMessageStatus;
import org.apache.rocketmq.store.MappedFile;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class AdminBrokerProcessorTest {
private AdminBrokerProcessor adminBrokerProcessor;
@Mock
private ChannelHandlerContext handlerContext;
@Spy
private BrokerController
brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(),
new MessageStoreConfig());
@Mock
private MessageStore messageStore;
@Before
public void init() {
brokerController.setMessageStore(messageStore);
adminBrokerProcessor = new AdminBrokerProcessor(brokerController);
}
@Test
public void testProcessRequest_success() throws RemotingCommandException, UnknownHostException {
RemotingCommand request = createResumeCheckHalfMessageCommand();
when(messageStore.selectOneMessageByOffset(any(Long.class))).thenReturn(createSelectMappedBufferResult());
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult
(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
@Test
public void testProcessRequest_fail() throws RemotingCommandException, UnknownHostException {
RemotingCommand request = createResumeCheckHalfMessageCommand();
when(messageStore.selectOneMessageByOffset(any(Long.class))).thenReturn(createSelectMappedBufferResult());
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult
(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
}
private MessageExt createDefaultMessageExt() {
MessageExt messageExt = new MessageExt();
messageExt.setMsgId("12345678");
messageExt.setQueueId(0);
messageExt.setCommitLogOffset(123456789L);
messageExt.setQueueOffset(1234);
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_REAL_QUEUE_ID, "0");
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_REAL_TOPIC, "testTopic");
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, "15");
return messageExt;
}
private SelectMappedBufferResult createSelectMappedBufferResult(){
SelectMappedBufferResult result = new SelectMappedBufferResult(0, ByteBuffer.allocate(1024) ,0, new MappedFile());
return result;
}
private ResumeCheckHalfMessageRequestHeader createResumeCheckHalfMessageRequestHeader() {
ResumeCheckHalfMessageRequestHeader header = new ResumeCheckHalfMessageRequestHeader();
header.setMsgId("C0A803CA00002A9F0000000000031367");
return header;
}
private RemotingCommand createResumeCheckHalfMessageCommand() {
ResumeCheckHalfMessageRequestHeader header = createResumeCheckHalfMessageRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.RESUME_CHECK_HALF_MESSAGE, header);
request.makeCustomHeaderToNet();
return request;
}
}
......@@ -16,18 +16,23 @@
*/
package org.apache.rocketmq.broker.transaction.queue;
import java.net.InetSocketAddress;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
......@@ -35,16 +40,25 @@ import org.mockito.junit.MockitoJUnitRunner;
public class DefaultTransactionalMessageCheckListenerTest {
private DefaultTransactionalMessageCheckListener listener;
@Mock
private MessageStore messageStore;
@Spy
private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(),
private BrokerController brokerController = new BrokerController(new BrokerConfig(),
new NettyServerConfig(),
new NettyClientConfig(), new MessageStoreConfig());
@Before
public void init() {
public void init() throws Exception {
listener = new DefaultTransactionalMessageCheckListener();
listener.setBrokerController(brokerController);
brokerController.setMessageStore(messageStore);
}
@After
public void destroy() {
// brokerController.shutdown();
}
@Test
......@@ -53,21 +67,21 @@ public class DefaultTransactionalMessageCheckListenerTest {
}
@Test
public void testSendCheckMessage() throws Exception{
public void testSendCheckMessage() throws Exception {
MessageExt messageExt = createMessageExt();
listener.sendCheckMessage(messageExt);
}
@Test
public void sendCheckMessage(){
public void sendCheckMessage() {
listener.resolveDiscardMsg(createMessageExt());
}
private MessageExtBrokerInner createMessageExt() {
MessageExtBrokerInner inner = new MessageExtBrokerInner();
MessageAccessor.putProperty(inner,MessageConst.PROPERTY_REAL_QUEUE_ID,"1");
MessageAccessor.putProperty(inner,MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,"1234255");
MessageAccessor.putProperty(inner,MessageConst.PROPERTY_REAL_TOPIC,"realTopic");
MessageAccessor.putProperty(inner, MessageConst.PROPERTY_REAL_QUEUE_ID, "1");
MessageAccessor.putProperty(inner, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, "1234255");
MessageAccessor.putProperty(inner, MessageConst.PROPERTY_REAL_TOPIC, "realTopic");
inner.setTransactionId(inner.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
inner.setBody("check".getBytes());
inner.setMsgId("12344567890");
......@@ -75,4 +89,22 @@ public class DefaultTransactionalMessageCheckListenerTest {
return inner;
}
@Test
public void testResolveDiscardMsg() {
MessageExt messageExt = new MessageExt();
messageExt.setTopic(MixAll.RMQ_SYS_TRANS_HALF_TOPIC);
messageExt.setQueueId(0);
messageExt.setBody("test resolve discard msg".getBytes());
messageExt.setStoreHost(new InetSocketAddress("127.0.0.1", 10911));
messageExt.setBornHost(new InetSocketAddress("127.0.0.1", 54270));
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_REAL_TOPIC, "test_topic");
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_PRODUCER_GROUP, "PID_TEST_DISCARD_MSG");
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, "15");
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_REAL_QUEUE_ID, "2");
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_TAGS, "test_discard_msg");
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, "AC14157E4F1C18B4AAC27EB1A0F30000");
listener.resolveDiscardMsg(messageExt);
}
}
......@@ -122,6 +122,7 @@ import org.apache.rocketmq.common.protocol.header.QueryCorrectionOffsetHeader;
import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.QueryTopicConsumeByWhoRequestHeader;
import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.ResumeCheckHalfMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
......@@ -2207,4 +2208,24 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
}
public boolean resumeCheckHalfMessage(final String addr, String msgId,
final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
ResumeCheckHalfMessageRequestHeader requestHeader = new ResumeCheckHalfMessageRequestHeader();
requestHeader.setMsgId(msgId);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.RESUME_CHECK_HALF_MESSAGE, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return true;
}
default:
log.error("Failed to resume half message check logic. Remark={}", response.getRemark());
return false;
}
}
}
......@@ -26,17 +26,12 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingException;
......@@ -294,6 +289,45 @@ public class MQClientAPIImplTest {
assertThat(ex.getErrorMessage()).isEqualTo("corresponding to accessConfig has been deleted failed");
}
}
@Test
public void testResumeCheckHalfMessage_WithException() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock mock) throws Throwable {
RemotingCommand request = mock.getArgument(1);
RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setOpaque(request.getOpaque());
response.setRemark("Put message back to RMQ_SYS_TRANS_HALF_TOPIC failed.");
return response;
}
}).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
boolean result = mqClientAPI.resumeCheckHalfMessage(brokerAddr, "test", 3000);
assertThat(result).isEqualTo(false);
}
@Test
public void testResumeCheckHalfMessage_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock mock) throws Throwable {
RemotingCommand request = mock.getArgument(1);
return createResumeSuccessResponse(request);
}
}).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
boolean result = mqClientAPI.resumeCheckHalfMessage(brokerAddr, "test", 3000);
assertThat(result).isEqualTo(true);
}
private RemotingCommand createResumeSuccessResponse(RemotingCommand request) {
RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setCode(ResponseCode.SUCCESS);
response.setOpaque(request.getOpaque());
return response;
}
private RemotingCommand createSuccessResponse(RemotingCommand request) {
RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
......
......@@ -92,6 +92,7 @@ public class MixAll {
public static final String RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
public static final String RMQ_SYS_TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC";
public static final String RMQ_SYS_TRANS_OP_HALF_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC";
public static final String TRANS_CHECK_MAX_TIME_TOPIC = "TRANS_CHECK_MAX_TIME_TOPIC";
public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS";
public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml";
......
......@@ -175,4 +175,9 @@ public class RequestCode {
public static final int QUERY_CONSUME_QUEUE = 321;
public static final int QUERY_DATA_VERSION = 322;
/**
* resume logic of checking half messages that have been put in TRANS_CHECK_MAXTIME_TOPIC before
*/
public static final int RESUME_CHECK_HALF_MESSAGE = 323;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class ResumeCheckHalfMessageRequestHeader implements CommandCustomHeader {
@CFNullable
private String msgId;
@Override
public void checkFields() throws RemotingCommandException {
}
public String getMsgId() {
return msgId;
}
public void setMsgId(String msgId) {
this.msgId = msgId;
}
@Override
public String toString() {
return "ResumeCheckHalfMessageRequestHeader [msgId=" + msgId + "]";
}
}
......@@ -455,7 +455,7 @@
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.6.3</version>
<version>2.23.0</version>
<scope>test</scope>
</dependency>
</dependencies>
......
......@@ -537,4 +537,17 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
brokerAddr, topic, queueId, index, count, consumerGroup
);
}
@Override
public boolean resumeCheckHalfMessage(String msgId)
throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
return this.defaultMQAdminExtImpl.resumeCheckHalfMessage(msgId);
}
@Override
public boolean resumeCheckHalfMessage(String topic,
String msgId)
throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
return this.defaultMQAdminExtImpl.resumeCheckHalfMessage(topic, msgId);
}
}
......@@ -1025,4 +1025,23 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
brokerAddr, topic, queueId, index, count, consumerGroup, timeoutMillis
);
}
@Override
public boolean resumeCheckHalfMessage(String msgId)
throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
MessageExt msg = this.viewMessage(msgId);
return this.mqClientInstance.getMQClientAPIImpl().resumeCheckHalfMessage(RemotingUtil.socketAddress2String(msg.getStoreHost()), msgId, timeoutMillis);
}
@Override
public boolean resumeCheckHalfMessage(final String topic, final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
MessageExt msg = this.viewMessage(topic, msgId);
if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
return this.mqClientInstance.getMQClientAPIImpl().resumeCheckHalfMessage(RemotingUtil.socketAddress2String(msg.getStoreHost()), msgId, timeoutMillis);
} else {
MessageClientExt msgClient = (MessageClientExt) msg;
return this.mqClientInstance.getMQClientAPIImpl().resumeCheckHalfMessage(RemotingUtil.socketAddress2String(msg.getStoreHost()), msgClient.getOffsetMsgId(), timeoutMillis);
}
}
}
......@@ -273,4 +273,9 @@ public interface MQAdminExt extends MQAdmin {
final String topic, final int queueId,
final long index, final int count, final String consumerGroup)
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException;
boolean resumeCheckHalfMessage(String msgId)
throws RemotingException, MQClientException, InterruptedException, MQBrokerException;
boolean resumeCheckHalfMessage(final String topic, final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException;
}
......@@ -226,5 +226,20 @@ public class QueryMsgByUniqueKeySubCommandTest {
}
@Test
public void testExecute() throws SubCommandException {
System.setProperty("rocketmq.namesrv.addr", "127.0.0.1:9876");
QueryMsgByUniqueKeySubCommand cmd = new QueryMsgByUniqueKeySubCommand();
String[] args = new String[]{"-t myTopicTest", "-i 0A3A54F7BF7D18B4AAC28A3FA2CF0000"};
Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin ", args, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
args = new String[]{"-t myTopicTest", "-i 0A3A54F7BF7D18B4AAC28A3FA2CF0000", "-g producerGroupName", "-d clientId"};
commandLine = ServerUtil.parseCmdLine("mqadmin ", args, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册