提交 7a6f0ac0 编写于 作者: S shenhui.backend

1. eliminate redundant CountDownLaunch in GroupCommitRequest

2. fix properties when store message
3. add IT for transaction
4. correct resonse code when slave down
上级 f6c05db9
......@@ -275,6 +275,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setPropertiesString(requestHeader.getProperties());
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
......
......@@ -47,6 +47,7 @@ import org.mockito.junit.MockitoJUnitRunner;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
......@@ -87,6 +88,14 @@ public class TransactionalMessageBridgeTest {
assertThat(result.getPutMessageStatus()).isEqualTo(PutMessageStatus.PUT_OK);
}
@Test
public void testAsyncPutHalfMessage() throws Exception {
when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class)))
.thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))));
CompletableFuture<PutMessageResult> result = transactionBridge.asyncPutHalfMessage(createMessageBrokerInner());
assertThat(result.get().getPutMessageStatus()).isEqualTo(PutMessageStatus.PUT_OK);
}
@Test
public void testFetchMessageQueues() {
Set<MessageQueue> messageQueues = transactionBridge.fetchMessageQueues(MixAll.RMQ_SYS_TRANS_HALF_TOPIC);
......
......@@ -24,7 +24,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
......@@ -661,14 +660,14 @@ public class CommitLog {
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
CompletableFuture<Boolean> flushResultFuture = submitFlushRequest(result, putMessageResult, msg);
CompletableFuture<Boolean> replicaResultFuture = submitReplicaRequest(result, putMessageResult, msg);
return flushResultFuture.thenCombine(replicaResultFuture, (flushOK, replicaOK) -> {
if (!flushOK) {
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, putMessageResult, msg);
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, putMessageResult, msg);
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
if (flushStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
if (!replicaOK) {
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);
}
return putMessageResult;
});
......@@ -762,15 +761,15 @@ public class CommitLog {
storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum());
storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes());
CompletableFuture<Boolean> flushOKFuture = submitFlushRequest(result, putMessageResult, messageExtBatch);
CompletableFuture<Boolean> replicaOKFuture = submitReplicaRequest(result, putMessageResult, messageExtBatch);
return flushOKFuture.thenCombine(replicaOKFuture, (flushOK, replicaOK) -> {
if (!flushOK) {
CompletableFuture<PutMessageStatus> flushOKFuture = submitFlushRequest(result, putMessageResult, messageExtBatch);
CompletableFuture<PutMessageStatus> replicaOKFuture = submitReplicaRequest(result, putMessageResult, messageExtBatch);
return flushOKFuture.thenCombine(replicaOKFuture, (flushStatus, replicaStatus) -> {
if (flushStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
if (!replicaOK) {
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);
}
return putMessageResult;
});
......@@ -900,7 +899,7 @@ public class CommitLog {
return putMessageResult;
}
public CompletableFuture<Boolean> submitFlushRequest(AppendMessageResult result, PutMessageResult putMessageResult,
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, PutMessageResult putMessageResult,
MessageExt messageExt) {
// Synchronization flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
......@@ -912,7 +911,7 @@ public class CommitLog {
return request.future();
} else {
service.wakeup();
return CompletableFuture.completedFuture(true);
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
// Asynchronous flush
......@@ -922,11 +921,11 @@ public class CommitLog {
} else {
commitLogService.wakeup();
}
return CompletableFuture.completedFuture(true);
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
public CompletableFuture<Boolean> submitReplicaRequest(AppendMessageResult result, PutMessageResult putMessageResult,
public CompletableFuture<PutMessageStatus> submitReplicaRequest(AppendMessageResult result, PutMessageResult putMessageResult,
MessageExt messageExt) {
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
......@@ -939,11 +938,11 @@ public class CommitLog {
return request.future();
}
else {
return CompletableFuture.completedFuture(false);
return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}
return CompletableFuture.completedFuture(true);
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
......@@ -954,15 +953,15 @@ public class CommitLog {
if (messageExt.isWaitStoreMsgOK()) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
CompletableFuture<Boolean> flushOkFuture = request.future();
boolean flushOK = false;
CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
PutMessageStatus flushStatus = null;
try {
flushOK = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
//flushOK=false;
}
if (!flushOK) {
if (flushStatus != PutMessageStatus.PUT_OK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
......@@ -990,14 +989,13 @@ public class CommitLog {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
service.getWaitNotifyObject().wakeupAll();
boolean flushOK = false;
PutMessageStatus replicaStatus = null;
try {
flushOK = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
replicaStatus = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
//flushOK=false;
}
if (!flushOK) {
if (replicaStatus != PutMessageStatus.PUT_OK) {
log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
+ messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
......@@ -1367,8 +1365,7 @@ public class CommitLog {
public static class GroupCommitRequest {
private final long nextOffset;
private final CountDownLatch countDownLatch = new CountDownLatch(1);
private CompletableFuture<Boolean> flushOk = new CompletableFuture<>();
private CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture<>();
private final long startTimestamp = System.currentTimeMillis();
private long timeoutMillis = Long.MAX_VALUE;
......@@ -1388,23 +1385,15 @@ public class CommitLog {
public void wakeupCustomer(final boolean flushOK) {
long endTimestamp = System.currentTimeMillis();
this.flushOk.complete(flushOK && ((endTimestamp - this.startTimestamp) <= this.timeoutMillis));
this.countDownLatch.countDown();
PutMessageStatus result = (flushOK && ((endTimestamp - this.startTimestamp) <= this.timeoutMillis)) ?
PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT;
this.flushOKFuture.complete(result);
}
public CompletableFuture<Boolean> future() {
return flushOk;
public CompletableFuture<PutMessageStatus> future() {
return flushOKFuture;
}
public boolean waitForFlush(long timeout) {
try {
this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
return flushOk.get();
} catch (InterruptedException | ExecutionException e) {
log.error("Interrupted", e);
return false;
}
}
}
/**
......
......@@ -144,10 +144,12 @@ public class HATest {
//shutdown slave, putMessage should return FLUSH_SLAVE_TIMEOUT
slaveMessageStore.shutdown();
//wait to let master clean the slave's connection
Thread.sleep(masterMessageStoreConfig.getHaHousekeepingInterval() + 500);
for (long i = 0; i < totalMsgs; i++) {
CompletableFuture<PutMessageResult> putResultFuture = messageStore.asyncPutMessage(buildMessage());
PutMessageResult result = putResultFuture.get();
assertEquals(PutMessageStatus.FLUSH_SLAVE_TIMEOUT, result.getPutMessageStatus());
assertEquals(PutMessageStatus.SLAVE_NOT_AVAILABLE, result.getPutMessageStatus());
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.client.rmq;
import org.apache.log4j.Logger;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.test.clientinterface.AbstractMQProducer;
import org.apache.rocketmq.test.sendresult.ResultWrapper;
public class RMQTransactionalProducer extends AbstractMQProducer {
private static Logger logger = Logger.getLogger(RMQTransactionalProducer.class);
private TransactionMQProducer producer = null;
private String nsAddr = null;
public RMQTransactionalProducer(String nsAddr, String topic, TransactionListener transactionListener) {
this(nsAddr, topic, false, transactionListener);
}
public RMQTransactionalProducer(String nsAddr, String topic, boolean useTLS, TransactionListener transactionListener) {
super(topic);
this.nsAddr = nsAddr;
create(useTLS, transactionListener);
start();
}
protected void create(boolean useTLS, TransactionListener transactionListener) {
producer = new TransactionMQProducer();
producer.setProducerGroup(getProducerGroupName());
producer.setInstanceName(getProducerInstanceName());
producer.setTransactionListener(transactionListener);
producer.setUseTLS(useTLS);
if (nsAddr != null) {
producer.setNamesrvAddr(nsAddr);
}
}
public void start() {
try {
producer.start();
super.setStartSuccess(true);
} catch (MQClientException e) {
super.setStartSuccess(false);
logger.error(e);
e.printStackTrace();
}
}
@Override
public ResultWrapper send(Object msg, Object arg) {
boolean commitMsg = ((Pair<Boolean, LocalTransactionState>) arg).getObject2() == LocalTransactionState.COMMIT_MESSAGE;
org.apache.rocketmq.client.producer.SendResult metaqResult = null;
Message message = (Message) msg;
try {
long start = System.currentTimeMillis();
metaqResult = producer.sendMessageInTransaction(message, arg);
this.msgRTs.addData(System.currentTimeMillis() - start);
if (isDebug) {
logger.info(metaqResult);
}
sendResult.setMsgId(metaqResult.getMsgId());
sendResult.setSendResult(true);
sendResult.setBrokerIp(metaqResult.getMessageQueue().getBrokerName());
if (commitMsg) {
msgBodys.addData(new String(message.getBody()));
}
originMsgs.addData(msg);
originMsgIndex.put(new String(message.getBody()), metaqResult);
} catch (MQClientException e) {
if (isDebug) {
e.printStackTrace();
}
sendResult.setSendResult(false);
sendResult.setSendException(e);
errorMsgs.addData(msg);
}
return sendResult;
}
@Override
public void shutdown() {
producer.shutdown();
}
}
......@@ -22,12 +22,14 @@ import java.util.List;
import org.apache.log4j.Logger;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.client.rmq.RMQTransactionalProducer;
import org.apache.rocketmq.test.clientinterface.AbstractMQConsumer;
import org.apache.rocketmq.test.clientinterface.AbstractMQProducer;
import org.apache.rocketmq.test.factory.ConsumerFactory;
......@@ -96,6 +98,15 @@ public class BaseConf {
return producer;
}
public static RMQTransactionalProducer getTransactionalProducer(String nsAddr, String topic, TransactionListener transactionListener) {
RMQTransactionalProducer producer = new RMQTransactionalProducer(nsAddr, topic, false, transactionListener);
if (debug) {
producer.setDebug();
}
mqClients.add(producer);
return producer;
}
public static RMQNormalProducer getProducer(String nsAddr, String topic, String producerGoup,
String instanceName) {
RMQNormalProducer producer = new RMQNormalProducer(nsAddr, topic, producerGoup,
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.client.producer.transaction;
import org.apache.log4j.Logger;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQTransactionalProducer;
import org.apache.rocketmq.test.factory.MQMessageFactory;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQWait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static com.google.common.truth.Truth.assertThat;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
public class TransactionalMsgIT extends BaseConf {
private static Logger logger = Logger.getLogger(TransactionalMsgIT.class);
private RMQTransactionalProducer producer = null;
private RMQNormalConsumer consumer = null;
private String topic = null;
@Before
public void setUp() {
topic = initTopic();
logger.info(String.format("use topic: %s;", topic));
producer = getTransactionalProducer(nsAddr, topic, new TransactionListenerImpl());
consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
}
@After
public void tearDown() {
super.shutdown();
}
@Test
public void testMessageVisibility() throws Exception {
Thread.sleep(3000);
int msgSize = 120;
List<Object> msgs = MQMessageFactory.getMsg(topic, msgSize);
for (int i = 0; i < msgSize; i++) {
producer.send(msgs.get(i), getTransactionHandle(i));
}
boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer.getListener());
assertThat(recvAll).isEqualTo(true);
}
static Pair<Boolean, LocalTransactionState> getTransactionHandle(int msgIndex) {
switch (msgIndex % 5) {
case 0:
//commit immediately
return new Pair<>(true, LocalTransactionState.COMMIT_MESSAGE);
case 1:
//rollback immediately
return new Pair<>(true, LocalTransactionState.ROLLBACK_MESSAGE);
case 2:
//commit in check
return new Pair<>(false, LocalTransactionState.COMMIT_MESSAGE);
case 3:
//rollback in check
return new Pair<>(false, LocalTransactionState.ROLLBACK_MESSAGE);
case 4:
default:
return new Pair<>(false, LocalTransactionState.UNKNOW);
}
}
static private class TransactionListenerImpl implements TransactionListener {
ConcurrentHashMap<String, LocalTransactionState> checkStatus = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Pair<Boolean, LocalTransactionState> transactionHandle = (Pair<Boolean,LocalTransactionState>) arg;
if (transactionHandle.getObject1()) {
return transactionHandle.getObject2();
} else {
checkStatus.put(msg.getTransactionId(), transactionHandle.getObject2());
return LocalTransactionState.UNKNOW;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
LocalTransactionState state = checkStatus.get(msg.getTransactionId());
if (state == null) {
return LocalTransactionState.UNKNOW;
} else {
return state;
}
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册