diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index f23cca62d9e7b093a281b8b63728bfa5183bd706..76a051b924c01bcdafd039d34c6c06f844f7063f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -37,6 +37,7 @@ import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.broker.filter.ConsumerFilterData; import org.apache.rocketmq.broker.filter.ExpressionMessageFilter; +import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.PlainAccessConfig; @@ -53,7 +54,10 @@ import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHead import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageId; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.RequestCode; @@ -100,6 +104,7 @@ import org.apache.rocketmq.common.protocol.header.QueryConsumeTimeSpanRequestHea import org.apache.rocketmq.common.protocol.header.QueryCorrectionOffsetHeader; import org.apache.rocketmq.common.protocol.header.QueryTopicConsumeByWhoRequestHeader; import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.ResumeCheckHalfMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader; @@ -120,8 +125,11 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.store.ConsumeQueue; import org.apache.rocketmq.store.ConsumeQueueExt; import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.MessageFilter; import org.apache.rocketmq.store.MessageStore; +import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.SelectMappedBufferResult; public class AdminBrokerProcessor implements NettyRequestProcessor { @@ -216,6 +224,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return getBrokerAclConfigVersion(ctx, request); case RequestCode.UPDATE_GLOBAL_WHITE_ADDRS_CONFIG: return updateGlobalWhiteAddrsConfig(ctx, request); + case RequestCode.RESUME_CHECK_HALF_MESSAGE: + return resumeCheckHalfMessage(ctx, request); default: break; } @@ -262,7 +272,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); - this.brokerController.registerIncrementBrokerData(topicConfig,this.brokerController.getTopicConfigManager().getDataVersion()); + this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion()); return null; } @@ -1518,4 +1528,62 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } + + private RemotingCommand resumeCheckHalfMessage(ChannelHandlerContext ctx, + RemotingCommand request) + throws RemotingCommandException { + final ResumeCheckHalfMessageRequestHeader requestHeader = (ResumeCheckHalfMessageRequestHeader) request + .decodeCommandCustomHeader(ResumeCheckHalfMessageRequestHeader.class); + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + SelectMappedBufferResult selectMappedBufferResult = null; + try { + MessageId messageId = MessageDecoder.decodeMessageId(requestHeader.getMsgId()); + selectMappedBufferResult = this.brokerController.getMessageStore() + .selectOneMessageByOffset(messageId.getOffset()); + MessageExt msg = MessageDecoder.decode(selectMappedBufferResult.getByteBuffer()); + msg.putUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, String.valueOf(0)); + PutMessageResult putMessageResult = this.brokerController.getMessageStore() + .putMessage(toMessageExtBrokerInner(msg)); + if (putMessageResult != null + && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { + log.info( + "Put message back to RMQ_SYS_TRANS_HALF_TOPIC. real topic={}", + msg.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC)); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + } else { + log.error("Put message back to RMQ_SYS_TRANS_HALF_TOPIC failed."); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("Put message back to RMQ_SYS_TRANS_HALF_TOPIC failed."); + } + } catch (Exception e) { + log.error("Exception was thrown when putting message back to RMQ_SYS_TRANS_HALF_TOPIC."); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("Exception was thrown when putting message back to RMQ_SYS_TRANS_HALF_TOPIC."); + } finally { + if (selectMappedBufferResult != null) { + selectMappedBufferResult.release(); + } + } + return response; + } + + private MessageExtBrokerInner toMessageExtBrokerInner(MessageExt msgExt) { + MessageExtBrokerInner inner = new MessageExtBrokerInner(); + inner.setTopic(TransactionalMessageUtil.buildHalfTopic()); + inner.setBody(msgExt.getBody()); + inner.setFlag(msgExt.getFlag()); + MessageAccessor.setProperties(inner, msgExt.getProperties()); + inner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties())); + inner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(msgExt.getTags())); + inner.setQueueId(0); + inner.setSysFlag(msgExt.getSysFlag()); + inner.setBornHost(msgExt.getBornHost()); + inner.setBornTimestamp(msgExt.getBornTimestamp()); + inner.setStoreHost(msgExt.getStoreHost()); + inner.setReconsumeTimes(msgExt.getReconsumeTimes()); + inner.setMsgId(msgExt.getMsgId()); + inner.setWaitStoreMsgOK(false); + return inner; + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index ed353da4bb90ad6fa806dd07a8c3b2e9016e77cd..8f215cdcb9cde3934b225f1e36e2d638d48dc4fb 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -220,7 +220,7 @@ public class TopicConfigManager extends ConfigManager { } if (createNew) { - this.brokerController.registerBrokerAll(false, true,true); + this.brokerController.registerBrokerAll(false, true, true); } return topicConfig; @@ -264,7 +264,47 @@ public class TopicConfigManager extends ConfigManager { } if (createNew) { - this.brokerController.registerBrokerAll(false, true,true); + this.brokerController.registerBrokerAll(false, true, true); + } + + return topicConfig; + } + + public TopicConfig createTopicOfTranCheckMaxTime(final int clientDefaultTopicQueueNums, final int perm) { + TopicConfig topicConfig = this.topicConfigTable.get(MixAll.TRANS_CHECK_MAX_TIME_TOPIC); + if (topicConfig != null) + return topicConfig; + + boolean createNew = false; + + try { + if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + try { + topicConfig = this.topicConfigTable.get(MixAll.TRANS_CHECK_MAX_TIME_TOPIC); + if (topicConfig != null) + return topicConfig; + + topicConfig = new TopicConfig(MixAll.TRANS_CHECK_MAX_TIME_TOPIC); + topicConfig.setReadQueueNums(clientDefaultTopicQueueNums); + topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums); + topicConfig.setPerm(perm); + topicConfig.setTopicSysFlag(0); + + log.info("create new topic {}", topicConfig); + this.topicConfigTable.put(MixAll.TRANS_CHECK_MAX_TIME_TOPIC, topicConfig); + createNew = true; + this.dataVersion.nextVersion(); + this.persist(); + } finally { + this.lockTopicConfigTable.unlock(); + } + } + } catch (InterruptedException e) { + log.error("create TRANS_CHECK_MAX_TIME_TOPIC exception", e); + } + + if (createNew) { + this.brokerController.registerBrokerAll(false, true, true); } return topicConfig; @@ -289,7 +329,7 @@ public class TopicConfigManager extends ConfigManager { this.dataVersion.nextVersion(); this.persist(); - this.brokerController.registerBrokerAll(false, true,true); + this.brokerController.registerBrokerAll(false, true, true); } } @@ -309,7 +349,7 @@ public class TopicConfigManager extends ConfigManager { this.dataVersion.nextVersion(); this.persist(); - this.brokerController.registerBrokerAll(false, true,true); + this.brokerController.registerBrokerAll(false, true, true); } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java index 659c6af16f075eb0f17373e32c15fe04c755c51d..62507cdfdba6ba5c75dd5001d00b8b68f2b9e926 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.broker.transaction; import io.netty.channel.Channel; +import java.util.Random; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageConst; @@ -36,6 +37,10 @@ public abstract class AbstractTransactionalMessageCheckListener { private BrokerController brokerController; + //queue nums of topic TRANS_CHECK_MAX_TIME_TOPIC + protected final static int TCMT_QUEUE_NUMS = 1; + protected final Random random = new Random(System.currentTimeMillis()); + private static ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java index 529bfe4f3491ade9c983b01286d20f70afe6b0cf..ee87bd375300ea0fb80c4315ddaaffd78463250d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java @@ -17,10 +17,18 @@ package org.apache.rocketmq.broker.transaction.queue; import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener; +import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.constant.PermName; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.store.MessageExtBrokerInner; +import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.PutMessageStatus; public class DefaultTransactionalMessageCheckListener extends AbstractTransactionalMessageCheckListener { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME); @@ -31,6 +39,41 @@ public class DefaultTransactionalMessageCheckListener extends AbstractTransactio @Override public void resolveDiscardMsg(MessageExt msgExt) { - log.error("MsgExt:{} has been checked too many times, so discard it", msgExt); + log.error("MsgExt:{} has been checked too many times, so discard it by moving it to system topic TRANS_CHECK_MAXTIME_TOPIC", msgExt); + + try { + MessageExtBrokerInner brokerInner = toMessageExtBrokerInner(msgExt); + PutMessageResult putMessageResult = this.getBrokerController().getMessageStore().putMessage(brokerInner); + if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { + log.info("Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC OK. Restored in queueOffset={}, " + + "commitLogOffset={}, real topic={}", msgExt.getQueueOffset(), msgExt.getCommitLogOffset(), msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC)); + } else { + log.error("Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC failed, real topic={}, msgId={}", msgExt.getTopic(), msgExt.getMsgId()); + } + } catch (Exception e) { + log.warn("Put checked-too-many-time message to TRANS_CHECK_MAXTIME_TOPIC error. {}", e); + } + + } + + private MessageExtBrokerInner toMessageExtBrokerInner(MessageExt msgExt) { + TopicConfig topicConfig = this.getBrokerController().getTopicConfigManager().createTopicOfTranCheckMaxTime(TCMT_QUEUE_NUMS, PermName.PERM_READ | PermName.PERM_WRITE); + int queueId = Math.abs(random.nextInt() % 99999999) % TCMT_QUEUE_NUMS; + MessageExtBrokerInner inner = new MessageExtBrokerInner(); + inner.setTopic(topicConfig.getTopicName()); + inner.setBody(msgExt.getBody()); + inner.setFlag(msgExt.getFlag()); + MessageAccessor.setProperties(inner, msgExt.getProperties()); + inner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties())); + inner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(msgExt.getTags())); + inner.setQueueId(queueId); + inner.setSysFlag(msgExt.getSysFlag()); + inner.setBornHost(msgExt.getBornHost()); + inner.setBornTimestamp(msgExt.getBornTimestamp()); + inner.setStoreHost(msgExt.getStoreHost()); + inner.setReconsumeTimes(msgExt.getReconsumeTimes()); + inner.setMsgId(msgExt.getMsgId()); + inner.setWaitStoreMsgOK(false); + return inner; } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java new file mode 100644 index 0000000000000000000000000000000000000000..ec0a879c209c97d7c8ab2f63f254e343b0b62fba --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.processor; + +import io.netty.channel.ChannelHandlerContext; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.header.ResumeCheckHalfMessageRequestHeader; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyServerConfig; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.store.AppendMessageResult; +import org.apache.rocketmq.store.AppendMessageStatus; +import org.apache.rocketmq.store.MappedFile; +import org.apache.rocketmq.store.MessageExtBrokerInner; +import org.apache.rocketmq.store.MessageStore; +import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.PutMessageStatus; +import org.apache.rocketmq.store.SelectMappedBufferResult; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class AdminBrokerProcessorTest { + + private AdminBrokerProcessor adminBrokerProcessor; + + @Mock + private ChannelHandlerContext handlerContext; + + @Spy + private BrokerController + brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), + new MessageStoreConfig()); + + @Mock + private MessageStore messageStore; + + + @Before + public void init() { + brokerController.setMessageStore(messageStore); + adminBrokerProcessor = new AdminBrokerProcessor(brokerController); + } + + @Test + public void testProcessRequest_success() throws RemotingCommandException, UnknownHostException { + RemotingCommand request = createResumeCheckHalfMessageCommand(); + when(messageStore.selectOneMessageByOffset(any(Long.class))).thenReturn(createSelectMappedBufferResult()); + when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult + (PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + @Test + public void testProcessRequest_fail() throws RemotingCommandException, UnknownHostException { + RemotingCommand request = createResumeCheckHalfMessageCommand(); + when(messageStore.selectOneMessageByOffset(any(Long.class))).thenReturn(createSelectMappedBufferResult()); + when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult + (PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + } + + private MessageExt createDefaultMessageExt() { + MessageExt messageExt = new MessageExt(); + messageExt.setMsgId("12345678"); + messageExt.setQueueId(0); + messageExt.setCommitLogOffset(123456789L); + messageExt.setQueueOffset(1234); + MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_REAL_QUEUE_ID, "0"); + MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_REAL_TOPIC, "testTopic"); + MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, "15"); + return messageExt; + } + + private SelectMappedBufferResult createSelectMappedBufferResult(){ + SelectMappedBufferResult result = new SelectMappedBufferResult(0, ByteBuffer.allocate(1024) ,0, new MappedFile()); + return result; + } + private ResumeCheckHalfMessageRequestHeader createResumeCheckHalfMessageRequestHeader() { + ResumeCheckHalfMessageRequestHeader header = new ResumeCheckHalfMessageRequestHeader(); + header.setMsgId("C0A803CA00002A9F0000000000031367"); + return header; + } + + private RemotingCommand createResumeCheckHalfMessageCommand() { + ResumeCheckHalfMessageRequestHeader header = createResumeCheckHalfMessageRequestHeader(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.RESUME_CHECK_HALF_MESSAGE, header); + request.makeCustomHeaderToNet(); + return request; + } +} diff --git a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java index 17bf00b431091eef91b7c186a6069088beb0e266..653a96933ed5decb6b19a0086008c96d90ad7c77 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java @@ -16,18 +16,23 @@ */ package org.apache.rocketmq.broker.transaction.queue; +import java.net.InetSocketAddress; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.store.MessageExtBrokerInner; +import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.Mock; import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; @@ -35,16 +40,25 @@ import org.mockito.junit.MockitoJUnitRunner; public class DefaultTransactionalMessageCheckListenerTest { private DefaultTransactionalMessageCheckListener listener; + @Mock + private MessageStore messageStore; @Spy - private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), + private BrokerController brokerController = new BrokerController(new BrokerConfig(), + new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig()); - @Before - public void init() { + public void init() throws Exception { listener = new DefaultTransactionalMessageCheckListener(); listener.setBrokerController(brokerController); + brokerController.setMessageStore(messageStore); + + } + + @After + public void destroy() { +// brokerController.shutdown(); } @Test @@ -53,21 +67,21 @@ public class DefaultTransactionalMessageCheckListenerTest { } @Test - public void testSendCheckMessage() throws Exception{ + public void testSendCheckMessage() throws Exception { MessageExt messageExt = createMessageExt(); listener.sendCheckMessage(messageExt); } @Test - public void sendCheckMessage(){ + public void sendCheckMessage() { listener.resolveDiscardMsg(createMessageExt()); } private MessageExtBrokerInner createMessageExt() { MessageExtBrokerInner inner = new MessageExtBrokerInner(); - MessageAccessor.putProperty(inner,MessageConst.PROPERTY_REAL_QUEUE_ID,"1"); - MessageAccessor.putProperty(inner,MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,"1234255"); - MessageAccessor.putProperty(inner,MessageConst.PROPERTY_REAL_TOPIC,"realTopic"); + MessageAccessor.putProperty(inner, MessageConst.PROPERTY_REAL_QUEUE_ID, "1"); + MessageAccessor.putProperty(inner, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, "1234255"); + MessageAccessor.putProperty(inner, MessageConst.PROPERTY_REAL_TOPIC, "realTopic"); inner.setTransactionId(inner.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX)); inner.setBody("check".getBytes()); inner.setMsgId("12344567890"); @@ -75,4 +89,22 @@ public class DefaultTransactionalMessageCheckListenerTest { return inner; } + @Test + public void testResolveDiscardMsg() { + MessageExt messageExt = new MessageExt(); + messageExt.setTopic(MixAll.RMQ_SYS_TRANS_HALF_TOPIC); + messageExt.setQueueId(0); + messageExt.setBody("test resolve discard msg".getBytes()); + messageExt.setStoreHost(new InetSocketAddress("127.0.0.1", 10911)); + messageExt.setBornHost(new InetSocketAddress("127.0.0.1", 54270)); + MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_REAL_TOPIC, "test_topic"); + MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_PRODUCER_GROUP, "PID_TEST_DISCARD_MSG"); + MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); + MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, "15"); + MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_REAL_QUEUE_ID, "2"); + MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_TAGS, "test_discard_msg"); + MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, "AC14157E4F1C18B4AAC27EB1A0F30000"); + listener.resolveDiscardMsg(messageExt); + } + } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index c3382caa280aaac1baa7bb429d228884195403a2..b743af93d469f283a5158e43142bb137c87cebad 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -122,6 +122,7 @@ import org.apache.rocketmq.common.protocol.header.QueryCorrectionOffsetHeader; import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.QueryTopicConsumeByWhoRequestHeader; import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.ResumeCheckHalfMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; @@ -2207,4 +2208,24 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } } + + public boolean resumeCheckHalfMessage(final String addr, String msgId, + final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { + ResumeCheckHalfMessageRequestHeader requestHeader = new ResumeCheckHalfMessageRequestHeader(); + requestHeader.setMsgId(msgId); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.RESUME_CHECK_HALF_MESSAGE, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return true; + } + default: + log.error("Failed to resume half message check logic. Remark={}", response.getRemark()); + return false; + } + } } diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java index 28555959ba9ba268ec1ded6ae24faae23837c15e..84af63235570bcfbe9d9919a7b39cfe0283e91a5 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java @@ -26,17 +26,12 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; -import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo; -import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeader; -import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader; -import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.exception.RemotingException; @@ -294,6 +289,45 @@ public class MQClientAPIImplTest { assertThat(ex.getErrorMessage()).isEqualTo("corresponding to accessConfig has been deleted failed"); } } + @Test + public void testResumeCheckHalfMessage_WithException() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock mock) throws Throwable { + RemotingCommand request = mock.getArgument(1); + RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setOpaque(request.getOpaque()); + response.setRemark("Put message back to RMQ_SYS_TRANS_HALF_TOPIC failed."); + return response; + } + }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong()); + + boolean result = mqClientAPI.resumeCheckHalfMessage(brokerAddr, "test", 3000); + assertThat(result).isEqualTo(false); + } + + @Test + public void testResumeCheckHalfMessage_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException { + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock mock) throws Throwable { + RemotingCommand request = mock.getArgument(1); + return createResumeSuccessResponse(request); + } + }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong()); + + boolean result = mqClientAPI.resumeCheckHalfMessage(brokerAddr, "test", 3000); + + assertThat(result).isEqualTo(true); + } + + private RemotingCommand createResumeSuccessResponse(RemotingCommand request) { + RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setCode(ResponseCode.SUCCESS); + response.setOpaque(request.getOpaque()); + return response; + } private RemotingCommand createSuccessResponse(RemotingCommand request) { RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); @@ -329,7 +363,7 @@ public class MQClientAPIImplTest { response.setOpaque(request.getOpaque()); response.markResponseType(); response.setRemark(null); - + return response; } diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java index 550b0b6a556d9ab0aa0046d4872108b926a10ba1..0af65dff0f11b43e31ae73a8747adf1f72156a0d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java @@ -92,6 +92,7 @@ public class MixAll { public static final String RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC"; public static final String RMQ_SYS_TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC"; public static final String RMQ_SYS_TRANS_OP_HALF_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC"; + public static final String TRANS_CHECK_MAX_TIME_TOPIC = "TRANS_CHECK_MAX_TIME_TOPIC"; public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS"; public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml"; diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java index b771b77b659949ba8564bd474031b4c79386bf18..58c4b9fe90f2d2ab4a63961deacf4b2ebffd775a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java @@ -175,4 +175,9 @@ public class RequestCode { public static final int QUERY_CONSUME_QUEUE = 321; public static final int QUERY_DATA_VERSION = 322; + + /** + * resume logic of checking half messages that have been put in TRANS_CHECK_MAXTIME_TOPIC before + */ + public static final int RESUME_CHECK_HALF_MESSAGE = 323; } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ResumeCheckHalfMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ResumeCheckHalfMessageRequestHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..14dacd5e8dc17cf502a00a59b5d634ed380fa710 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ResumeCheckHalfMessageRequestHeader.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNullable; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class ResumeCheckHalfMessageRequestHeader implements CommandCustomHeader { + @CFNullable + private String msgId; + + @Override + public void checkFields() throws RemotingCommandException { + + } + + public String getMsgId() { + return msgId; + } + + public void setMsgId(String msgId) { + this.msgId = msgId; + } + + @Override + public String toString() { + return "ResumeCheckHalfMessageRequestHeader [msgId=" + msgId + "]"; + } +} diff --git a/pom.xml b/pom.xml index 7ab89910ca2b1c31df5dc638716843fb3a8f0a46..7d183e748848b315647415642370f062a1ea9e1c 100644 --- a/pom.xml +++ b/pom.xml @@ -455,7 +455,7 @@ org.mockito mockito-core - 2.6.3 + 2.23.0 test diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index f00dcefa4321820ba752af74c156763622e11245..92371f1e2c3350ae42ec06b46a6d035be6241b73 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -537,4 +537,17 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { brokerAddr, topic, queueId, index, count, consumerGroup ); } + + @Override + public boolean resumeCheckHalfMessage(String msgId) + throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + return this.defaultMQAdminExtImpl.resumeCheckHalfMessage(msgId); + } + + @Override + public boolean resumeCheckHalfMessage(String topic, + String msgId) + throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + return this.defaultMQAdminExtImpl.resumeCheckHalfMessage(topic, msgId); + } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 502e9daa33b65e89ab311386ed4037c182f3a4c5..210d5a997d4d5d4c7171d71a8e1af91b47b12e99 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -1025,4 +1025,23 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { brokerAddr, topic, queueId, index, count, consumerGroup, timeoutMillis ); } + + @Override + public boolean resumeCheckHalfMessage(String msgId) + throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + MessageExt msg = this.viewMessage(msgId); + + return this.mqClientInstance.getMQClientAPIImpl().resumeCheckHalfMessage(RemotingUtil.socketAddress2String(msg.getStoreHost()), msgId, timeoutMillis); + } + + @Override + public boolean resumeCheckHalfMessage(final String topic, final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + MessageExt msg = this.viewMessage(topic, msgId); + if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) { + return this.mqClientInstance.getMQClientAPIImpl().resumeCheckHalfMessage(RemotingUtil.socketAddress2String(msg.getStoreHost()), msgId, timeoutMillis); + } else { + MessageClientExt msgClient = (MessageClientExt) msg; + return this.mqClientInstance.getMQClientAPIImpl().resumeCheckHalfMessage(RemotingUtil.socketAddress2String(msg.getStoreHost()), msgClient.getOffsetMsgId(), timeoutMillis); + } + } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 930785ec1e77337802ad080818ebca84eece2e32..d5c75f0e82bfb6420cab03edb4253d2db232bef2 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -273,4 +273,9 @@ public interface MQAdminExt extends MQAdmin { final String topic, final int queueId, final long index, final int count, final String consumerGroup) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException; + + boolean resumeCheckHalfMessage(String msgId) + throws RemotingException, MQClientException, InterruptedException, MQBrokerException; + + boolean resumeCheckHalfMessage(final String topic, final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException; } diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java index be6b6369de016ef24e53a3f341fc5e366f96b323..e757608280ff8f912f95563e72a6734422712060 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java @@ -226,5 +226,20 @@ public class QueryMsgByUniqueKeySubCommandTest { } + @Test + public void testExecute() throws SubCommandException { + + System.setProperty("rocketmq.namesrv.addr", "127.0.0.1:9876"); + QueryMsgByUniqueKeySubCommand cmd = new QueryMsgByUniqueKeySubCommand(); + String[] args = new String[]{"-t myTopicTest", "-i 0A3A54F7BF7D18B4AAC28A3FA2CF0000"}; + Options options = ServerUtil.buildCommandlineOptions(new Options()); + CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin ", args, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + + args = new String[]{"-t myTopicTest", "-i 0A3A54F7BF7D18B4AAC28A3FA2CF0000", "-g producerGroupName", "-d clientId"}; + commandLine = ServerUtil.parseCmdLine("mqadmin ", args, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + + } }