diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index fa076aa98ea4852e5c0b26419640443a27c363ec..e4b9654d60a0de0d3a1aeed77650e763d99ad9ba 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -275,6 +275,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement msgInner.setBody(body); msgInner.setFlag(requestHeader.getFlag()); + MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties())); msgInner.setPropertiesString(requestHeader.getProperties()); msgInner.setBornTimestamp(requestHeader.getBornTimestamp()); msgInner.setBornHost(ctx.channel().remoteAddress()); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java index b1c669c9933b1a0f2af5e0cee918c1eefb8a1717..9e88bd2e894fc42810b7af58f7797c69a6892f19 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java @@ -47,6 +47,7 @@ import org.mockito.junit.MockitoJUnitRunner; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; @@ -87,6 +88,14 @@ public class TransactionalMessageBridgeTest { assertThat(result.getPutMessageStatus()).isEqualTo(PutMessageStatus.PUT_OK); } + @Test + public void testAsyncPutHalfMessage() throws Exception { + when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class))) + .thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)))); + CompletableFuture result = transactionBridge.asyncPutHalfMessage(createMessageBrokerInner()); + assertThat(result.get().getPutMessageStatus()).isEqualTo(PutMessageStatus.PUT_OK); + } + @Test public void testFetchMessageQueues() { Set messageQueues = transactionBridge.fetchMessageQueues(MixAll.RMQ_SYS_TRANS_HALF_TOPIC); diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index be2e4d6e4d9820556193854891bcda28dad2d3c4..7b565ecdeedca35da170db996ff12054e7084441 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -24,7 +24,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -661,14 +660,14 @@ public class CommitLog { storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet(); storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes()); - CompletableFuture flushResultFuture = submitFlushRequest(result, putMessageResult, msg); - CompletableFuture replicaResultFuture = submitReplicaRequest(result, putMessageResult, msg); - return flushResultFuture.thenCombine(replicaResultFuture, (flushOK, replicaOK) -> { - if (!flushOK) { + CompletableFuture flushResultFuture = submitFlushRequest(result, putMessageResult, msg); + CompletableFuture replicaResultFuture = submitReplicaRequest(result, putMessageResult, msg); + return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> { + if (flushStatus != PutMessageStatus.PUT_OK) { putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); } - if (!replicaOK) { - putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT); + if (replicaStatus != PutMessageStatus.PUT_OK) { + putMessageResult.setPutMessageStatus(replicaStatus); } return putMessageResult; }); @@ -762,15 +761,15 @@ public class CommitLog { storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum()); storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes()); - CompletableFuture flushOKFuture = submitFlushRequest(result, putMessageResult, messageExtBatch); - CompletableFuture replicaOKFuture = submitReplicaRequest(result, putMessageResult, messageExtBatch); - return flushOKFuture.thenCombine(replicaOKFuture, (flushOK, replicaOK) -> { - if (!flushOK) { + CompletableFuture flushOKFuture = submitFlushRequest(result, putMessageResult, messageExtBatch); + CompletableFuture replicaOKFuture = submitReplicaRequest(result, putMessageResult, messageExtBatch); + return flushOKFuture.thenCombine(replicaOKFuture, (flushStatus, replicaStatus) -> { + if (flushStatus != PutMessageStatus.PUT_OK) { putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); } - if (!replicaOK) { - putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT); + if (replicaStatus != PutMessageStatus.PUT_OK) { + putMessageResult.setPutMessageStatus(replicaStatus); } return putMessageResult; }); @@ -900,7 +899,7 @@ public class CommitLog { return putMessageResult; } - public CompletableFuture submitFlushRequest(AppendMessageResult result, PutMessageResult putMessageResult, + public CompletableFuture submitFlushRequest(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { // Synchronization flush if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { @@ -912,7 +911,7 @@ public class CommitLog { return request.future(); } else { service.wakeup(); - return CompletableFuture.completedFuture(true); + return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); } } // Asynchronous flush @@ -922,11 +921,11 @@ public class CommitLog { } else { commitLogService.wakeup(); } - return CompletableFuture.completedFuture(true); + return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); } } - public CompletableFuture submitReplicaRequest(AppendMessageResult result, PutMessageResult putMessageResult, + public CompletableFuture submitReplicaRequest(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) { HAService service = this.defaultMessageStore.getHaService(); @@ -939,11 +938,11 @@ public class CommitLog { return request.future(); } else { - return CompletableFuture.completedFuture(false); + return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE); } } } - return CompletableFuture.completedFuture(true); + return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); } @@ -954,15 +953,15 @@ public class CommitLog { if (messageExt.isWaitStoreMsgOK()) { GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); - CompletableFuture flushOkFuture = request.future(); - boolean flushOK = false; + CompletableFuture flushOkFuture = request.future(); + PutMessageStatus flushStatus = null; try { - flushOK = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(), + flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(), TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { //flushOK=false; } - if (!flushOK) { + if (flushStatus != PutMessageStatus.PUT_OK) { log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); @@ -990,14 +989,13 @@ public class CommitLog { GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); service.getWaitNotifyObject().wakeupAll(); - boolean flushOK = false; + PutMessageStatus replicaStatus = null; try { - flushOK = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(), + replicaStatus = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(), TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { - //flushOK=false; } - if (!flushOK) { + if (replicaStatus != PutMessageStatus.PUT_OK) { log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT); @@ -1367,8 +1365,7 @@ public class CommitLog { public static class GroupCommitRequest { private final long nextOffset; - private final CountDownLatch countDownLatch = new CountDownLatch(1); - private CompletableFuture flushOk = new CompletableFuture<>(); + private CompletableFuture flushOKFuture = new CompletableFuture<>(); private final long startTimestamp = System.currentTimeMillis(); private long timeoutMillis = Long.MAX_VALUE; @@ -1388,23 +1385,15 @@ public class CommitLog { public void wakeupCustomer(final boolean flushOK) { long endTimestamp = System.currentTimeMillis(); - this.flushOk.complete(flushOK && ((endTimestamp - this.startTimestamp) <= this.timeoutMillis)); - this.countDownLatch.countDown(); + PutMessageStatus result = (flushOK && ((endTimestamp - this.startTimestamp) <= this.timeoutMillis)) ? + PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT; + this.flushOKFuture.complete(result); } - public CompletableFuture future() { - return flushOk; + public CompletableFuture future() { + return flushOKFuture; } - public boolean waitForFlush(long timeout) { - try { - this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS); - return flushOk.get(); - } catch (InterruptedException | ExecutionException e) { - log.error("Interrupted", e); - return false; - } - } } /** diff --git a/store/src/test/java/org/apache/rocketmq/store/HATest.java b/store/src/test/java/org/apache/rocketmq/store/HATest.java index cd23d8bb499dbdba9de2637a39c7a283d7699560..a2702a0f67afcda651198c9748ab9b40a04b6af7 100644 --- a/store/src/test/java/org/apache/rocketmq/store/HATest.java +++ b/store/src/test/java/org/apache/rocketmq/store/HATest.java @@ -144,10 +144,12 @@ public class HATest { //shutdown slave, putMessage should return FLUSH_SLAVE_TIMEOUT slaveMessageStore.shutdown(); + //wait to let master clean the slave's connection + Thread.sleep(masterMessageStoreConfig.getHaHousekeepingInterval() + 500); for (long i = 0; i < totalMsgs; i++) { CompletableFuture putResultFuture = messageStore.asyncPutMessage(buildMessage()); PutMessageResult result = putResultFuture.get(); - assertEquals(PutMessageStatus.FLUSH_SLAVE_TIMEOUT, result.getPutMessageStatus()); + assertEquals(PutMessageStatus.SLAVE_NOT_AVAILABLE, result.getPutMessageStatus()); } } diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQTransactionalProducer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQTransactionalProducer.java new file mode 100644 index 0000000000000000000000000000000000000000..dcc76b2d844e303096a25c5869f4ec9851e3eaa9 --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQTransactionalProducer.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.rmq; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.client.producer.TransactionListener; +import org.apache.rocketmq.client.producer.TransactionMQProducer; +import org.apache.rocketmq.common.Pair; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.test.clientinterface.AbstractMQProducer; +import org.apache.rocketmq.test.sendresult.ResultWrapper; + +public class RMQTransactionalProducer extends AbstractMQProducer { + private static Logger logger = Logger.getLogger(RMQTransactionalProducer.class); + private TransactionMQProducer producer = null; + private String nsAddr = null; + + public RMQTransactionalProducer(String nsAddr, String topic, TransactionListener transactionListener) { + this(nsAddr, topic, false, transactionListener); + } + + public RMQTransactionalProducer(String nsAddr, String topic, boolean useTLS, TransactionListener transactionListener) { + super(topic); + this.nsAddr = nsAddr; + create(useTLS, transactionListener); + start(); + } + + protected void create(boolean useTLS, TransactionListener transactionListener) { + producer = new TransactionMQProducer(); + producer.setProducerGroup(getProducerGroupName()); + producer.setInstanceName(getProducerInstanceName()); + producer.setTransactionListener(transactionListener); + producer.setUseTLS(useTLS); + + if (nsAddr != null) { + producer.setNamesrvAddr(nsAddr); + } + } + + public void start() { + try { + producer.start(); + super.setStartSuccess(true); + } catch (MQClientException e) { + super.setStartSuccess(false); + logger.error(e); + e.printStackTrace(); + } + } + + @Override + public ResultWrapper send(Object msg, Object arg) { + boolean commitMsg = ((Pair) arg).getObject2() == LocalTransactionState.COMMIT_MESSAGE; + org.apache.rocketmq.client.producer.SendResult metaqResult = null; + Message message = (Message) msg; + try { + long start = System.currentTimeMillis(); + metaqResult = producer.sendMessageInTransaction(message, arg); + this.msgRTs.addData(System.currentTimeMillis() - start); + if (isDebug) { + logger.info(metaqResult); + } + sendResult.setMsgId(metaqResult.getMsgId()); + sendResult.setSendResult(true); + sendResult.setBrokerIp(metaqResult.getMessageQueue().getBrokerName()); + if (commitMsg) { + msgBodys.addData(new String(message.getBody())); + } + originMsgs.addData(msg); + originMsgIndex.put(new String(message.getBody()), metaqResult); + } catch (MQClientException e) { + if (isDebug) { + e.printStackTrace(); + } + + sendResult.setSendResult(false); + sendResult.setSendException(e); + errorMsgs.addData(msg); + } + return sendResult; + } + + @Override + public void shutdown() { + producer.shutdown(); + } + +} diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java index 45c6750dfdf98a370913f910ca523ea02122b201..c6a835fd67f0bf990e64da2049f3c2468adc1ac5 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java @@ -22,12 +22,14 @@ import java.util.List; import org.apache.log4j.Logger; import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.namesrv.NamesrvController; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer; import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.client.rmq.RMQTransactionalProducer; import org.apache.rocketmq.test.clientinterface.AbstractMQConsumer; import org.apache.rocketmq.test.clientinterface.AbstractMQProducer; import org.apache.rocketmq.test.factory.ConsumerFactory; @@ -96,6 +98,15 @@ public class BaseConf { return producer; } + public static RMQTransactionalProducer getTransactionalProducer(String nsAddr, String topic, TransactionListener transactionListener) { + RMQTransactionalProducer producer = new RMQTransactionalProducer(nsAddr, topic, false, transactionListener); + if (debug) { + producer.setDebug(); + } + mqClients.add(producer); + return producer; + } + public static RMQNormalProducer getProducer(String nsAddr, String topic, String producerGoup, String instanceName) { RMQNormalProducer producer = new RMQNormalProducer(nsAddr, topic, producerGoup, diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/transaction/TransactionalMsgIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/transaction/TransactionalMsgIT.java new file mode 100644 index 0000000000000000000000000000000000000000..b5f46c298732e31ce1bee59b17e62a200f96eee9 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/transaction/TransactionalMsgIT.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.producer.transaction; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.client.producer.TransactionListener; +import org.apache.rocketmq.common.Pair; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; +import org.apache.rocketmq.test.client.rmq.RMQTransactionalProducer; +import org.apache.rocketmq.test.factory.MQMessageFactory; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; +import org.apache.rocketmq.test.util.MQWait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import static com.google.common.truth.Truth.assertThat; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +public class TransactionalMsgIT extends BaseConf { + private static Logger logger = Logger.getLogger(TransactionalMsgIT.class); + private RMQTransactionalProducer producer = null; + private RMQNormalConsumer consumer = null; + private String topic = null; + + @Before + public void setUp() { + topic = initTopic(); + logger.info(String.format("use topic: %s;", topic)); + producer = getTransactionalProducer(nsAddr, topic, new TransactionListenerImpl()); + consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); + } + + @After + public void tearDown() { + super.shutdown(); + } + + @Test + public void testMessageVisibility() throws Exception { + Thread.sleep(3000); + int msgSize = 120; + List msgs = MQMessageFactory.getMsg(topic, msgSize); + for (int i = 0; i < msgSize; i++) { + producer.send(msgs.get(i), getTransactionHandle(i)); + } + boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer.getListener()); + assertThat(recvAll).isEqualTo(true); + } + + static Pair getTransactionHandle(int msgIndex) { + switch (msgIndex % 5) { + case 0: + //commit immediately + return new Pair<>(true, LocalTransactionState.COMMIT_MESSAGE); + case 1: + //rollback immediately + return new Pair<>(true, LocalTransactionState.ROLLBACK_MESSAGE); + case 2: + //commit in check + return new Pair<>(false, LocalTransactionState.COMMIT_MESSAGE); + case 3: + //rollback in check + return new Pair<>(false, LocalTransactionState.ROLLBACK_MESSAGE); + case 4: + default: + return new Pair<>(false, LocalTransactionState.UNKNOW); + + } + } + + static private class TransactionListenerImpl implements TransactionListener { + ConcurrentHashMap checkStatus = new ConcurrentHashMap<>(); + + @Override + public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { + Pair transactionHandle = (Pair) arg; + if (transactionHandle.getObject1()) { + return transactionHandle.getObject2(); + } else { + checkStatus.put(msg.getTransactionId(), transactionHandle.getObject2()); + return LocalTransactionState.UNKNOW; + } + } + + @Override + public LocalTransactionState checkLocalTransaction(MessageExt msg) { + LocalTransactionState state = checkStatus.get(msg.getTransactionId()); + if (state == null) { + return LocalTransactionState.UNKNOW; + } else { + return state; + } + } + } +}