未验证 提交 64e4ca72 编写于 作者: H Heng Du 提交者: GitHub

Merge pull request #1516 from shenhui0509/sync_pipeline

[ISSUE #1515] SYNC_MASTER could be change into pipeline manner 
...@@ -20,7 +20,9 @@ package org.apache.rocketmq.broker.plugin; ...@@ -20,7 +20,9 @@ package org.apache.rocketmq.broker.plugin;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.store.CommitLogDispatcher; import org.apache.rocketmq.store.CommitLogDispatcher;
import org.apache.rocketmq.store.ConsumeQueue; import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.GetMessageResult;
...@@ -86,6 +88,16 @@ public abstract class AbstractPluginMessageStore implements MessageStore { ...@@ -86,6 +88,16 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
return next.putMessage(msg); return next.putMessage(msg);
} }
@Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
return next.asyncPutMessage(msg);
}
@Override
public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
return next.asyncPutMessages(messageExtBatch);
}
@Override @Override
public GetMessageResult getMessage(String group, String topic, int queueId, long offset, public GetMessageResult getMessage(String group, String topic, int queueId, long offset,
int maxMsgNums, final MessageFilter messageFilter) { int maxMsgNums, final MessageFilter messageFilter) {
......
...@@ -49,11 +49,12 @@ import org.apache.rocketmq.logging.InternalLogger; ...@@ -49,11 +49,12 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.MessageExtBrokerInner;
public abstract class AbstractSendMessageProcessor implements NettyRequestProcessor { public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
protected final static int DLQ_NUMS_PER_GROUP = 1; protected final static int DLQ_NUMS_PER_GROUP = 1;
......
...@@ -122,6 +122,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; ...@@ -122,6 +122,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
...@@ -136,7 +137,7 @@ import org.apache.rocketmq.store.PutMessageResult; ...@@ -136,7 +137,7 @@ import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.SelectMappedBufferResult;
public class AdminBrokerProcessor implements NettyRequestProcessor { public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController; private final BrokerController brokerController;
......
...@@ -40,9 +40,10 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; ...@@ -40,9 +40,10 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class ClientManageProcessor implements NettyRequestProcessor { public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController; private final BrokerController brokerController;
......
...@@ -34,10 +34,11 @@ import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHe ...@@ -34,10 +34,11 @@ import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHe
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class ConsumerManageProcessor implements NettyRequestProcessor { public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController; private final BrokerController brokerController;
......
...@@ -32,6 +32,7 @@ import org.apache.rocketmq.logging.InternalLogger; ...@@ -32,6 +32,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.MessageExtBrokerInner;
...@@ -41,7 +42,7 @@ import org.apache.rocketmq.store.config.BrokerRole; ...@@ -41,7 +42,7 @@ import org.apache.rocketmq.store.config.BrokerRole;
/** /**
* EndTransaction processor: process commit and rollback message * EndTransaction processor: process commit and rollback message
*/ */
public class EndTransactionProcessor implements NettyRequestProcessor { public class EndTransactionProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME); private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
private final BrokerController brokerController; private final BrokerController brokerController;
......
...@@ -21,10 +21,11 @@ import org.apache.rocketmq.broker.BrokerController; ...@@ -21,10 +21,11 @@ import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class ForwardRequestProcessor implements NettyRequestProcessor { public class ForwardRequestProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController; private final BrokerController brokerController;
......
...@@ -57,6 +57,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; ...@@ -57,6 +57,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RequestTask; import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
...@@ -67,7 +68,7 @@ import org.apache.rocketmq.store.PutMessageResult; ...@@ -67,7 +68,7 @@ import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.apache.rocketmq.store.stats.BrokerStatsManager;
public class PullMessageProcessor implements NettyRequestProcessor { public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController; private final BrokerController brokerController;
private List<ConsumeMessageHook> consumeMessageHookList; private List<ConsumeMessageHook> consumeMessageHookList;
......
...@@ -33,12 +33,13 @@ import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader; ...@@ -33,12 +33,13 @@ import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.QueryMessageResponseHeader; import org.apache.rocketmq.common.protocol.header.QueryMessageResponseHeader;
import org.apache.rocketmq.common.protocol.header.ViewMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.ViewMessageRequestHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.QueryMessageResult; import org.apache.rocketmq.store.QueryMessageResult;
import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.SelectMappedBufferResult;
public class QueryMessageProcessor implements NettyRequestProcessor { public class QueryMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController; private final BrokerController brokerController;
......
...@@ -20,6 +20,7 @@ import org.apache.rocketmq.common.message.MessageExt; ...@@ -20,6 +20,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader; import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageResult;
import java.util.concurrent.CompletableFuture;
public interface TransactionalMessageService { public interface TransactionalMessageService {
...@@ -31,6 +32,14 @@ public interface TransactionalMessageService { ...@@ -31,6 +32,14 @@ public interface TransactionalMessageService {
*/ */
PutMessageResult prepareMessage(MessageExtBrokerInner messageInner); PutMessageResult prepareMessage(MessageExtBrokerInner messageInner);
/**
* Process prepare message in async manner, we should put this message to storage service
*
* @param messageInner Prepare(Half) message.
* @return CompletableFuture of put result, will be completed at put success(flush and replica done)
*/
CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner);
/** /**
* Delete prepare message when this message has been committed or rolled back. * Delete prepare message when this message has been committed or rolled back.
* *
......
...@@ -47,6 +47,7 @@ import java.util.ArrayList; ...@@ -47,6 +47,7 @@ import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
public class TransactionalMessageBridge { public class TransactionalMessageBridge {
...@@ -195,6 +196,10 @@ public class TransactionalMessageBridge { ...@@ -195,6 +196,10 @@ public class TransactionalMessageBridge {
return store.putMessage(parseHalfMessageInner(messageInner)); return store.putMessage(parseHalfMessageInner(messageInner));
} }
public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
return store.asyncPutMessage(parseHalfMessageInner(messageInner));
}
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) { private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic()); MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
......
...@@ -40,6 +40,7 @@ import java.util.Date; ...@@ -40,6 +40,7 @@ import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
public class TransactionalMessageServiceImpl implements TransactionalMessageService { public class TransactionalMessageServiceImpl implements TransactionalMessageService {
...@@ -59,6 +60,11 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ ...@@ -59,6 +60,11 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
private ConcurrentHashMap<MessageQueue, MessageQueue> opQueueMap = new ConcurrentHashMap<>(); private ConcurrentHashMap<MessageQueue, MessageQueue> opQueueMap = new ConcurrentHashMap<>();
@Override
public CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner) {
return transactionalMessageBridge.asyncPutHalfMessage(messageInner);
}
@Override @Override
public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) { public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {
return transactionalMessageBridge.putHalfMessage(messageInner); return transactionalMessageBridge.putHalfMessage(messageInner);
......
...@@ -56,6 +56,7 @@ import java.net.InetSocketAddress; ...@@ -56,6 +56,7 @@ import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
...@@ -93,13 +94,15 @@ public class SendMessageProcessorTest { ...@@ -93,13 +94,15 @@ public class SendMessageProcessorTest {
@Test @Test
public void testProcessRequest() throws RemotingCommandException { public void testProcessRequest() throws RemotingCommandException {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class)))
.thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))));
assertPutResult(ResponseCode.SUCCESS); assertPutResult(ResponseCode.SUCCESS);
} }
@Test @Test
public void testProcessRequest_WithHook() throws RemotingCommandException { public void testProcessRequest_WithHook() throws RemotingCommandException {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class)))
.thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))));
List<SendMessageHook> sendMessageHookList = new ArrayList<>(); List<SendMessageHook> sendMessageHookList = new ArrayList<>();
final SendMessageContext[] sendMessageContext = new SendMessageContext[1]; final SendMessageContext[] sendMessageContext = new SendMessageContext[1];
SendMessageHook sendMessageHook = new SendMessageHook() { SendMessageHook sendMessageHook = new SendMessageHook() {
...@@ -129,55 +132,64 @@ public class SendMessageProcessorTest { ...@@ -129,55 +132,64 @@ public class SendMessageProcessorTest {
@Test @Test
public void testProcessRequest_FlushTimeOut() throws RemotingCommandException { public void testProcessRequest_FlushTimeOut() throws RemotingCommandException {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.FLUSH_DISK_TIMEOUT, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class)))
.thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.FLUSH_DISK_TIMEOUT, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))));
assertPutResult(ResponseCode.FLUSH_DISK_TIMEOUT); assertPutResult(ResponseCode.FLUSH_DISK_TIMEOUT);
} }
@Test @Test
public void testProcessRequest_MessageIllegal() throws RemotingCommandException { public void testProcessRequest_MessageIllegal() throws RemotingCommandException {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class)))
.thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))));
assertPutResult(ResponseCode.MESSAGE_ILLEGAL); assertPutResult(ResponseCode.MESSAGE_ILLEGAL);
} }
@Test @Test
public void testProcessRequest_CreateMappedFileFailed() throws RemotingCommandException { public void testProcessRequest_CreateMappedFileFailed() throws RemotingCommandException {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class)))
.thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))));
assertPutResult(ResponseCode.SYSTEM_ERROR); assertPutResult(ResponseCode.SYSTEM_ERROR);
} }
@Test @Test
public void testProcessRequest_FlushSlaveTimeout() throws RemotingCommandException { public void testProcessRequest_FlushSlaveTimeout() throws RemotingCommandException {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.FLUSH_SLAVE_TIMEOUT, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class)))
.thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.FLUSH_SLAVE_TIMEOUT, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))));
assertPutResult(ResponseCode.FLUSH_SLAVE_TIMEOUT); assertPutResult(ResponseCode.FLUSH_SLAVE_TIMEOUT);
} }
@Test @Test
public void testProcessRequest_PageCacheBusy() throws RemotingCommandException { public void testProcessRequest_PageCacheBusy() throws RemotingCommandException {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class)))
.thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))));
assertPutResult(ResponseCode.SYSTEM_ERROR); assertPutResult(ResponseCode.SYSTEM_ERROR);
} }
@Test @Test
public void testProcessRequest_PropertiesTooLong() throws RemotingCommandException { public void testProcessRequest_PropertiesTooLong() throws RemotingCommandException {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class)))
.thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))));
assertPutResult(ResponseCode.MESSAGE_ILLEGAL); assertPutResult(ResponseCode.MESSAGE_ILLEGAL);
} }
@Test @Test
public void testProcessRequest_ServiceNotAvailable() throws RemotingCommandException { public void testProcessRequest_ServiceNotAvailable() throws RemotingCommandException {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class)))
.thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))));
assertPutResult(ResponseCode.SERVICE_NOT_AVAILABLE); assertPutResult(ResponseCode.SERVICE_NOT_AVAILABLE);
} }
@Test @Test
public void testProcessRequest_SlaveNotAvailable() throws RemotingCommandException { public void testProcessRequest_SlaveNotAvailable() throws RemotingCommandException {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.SLAVE_NOT_AVAILABLE, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class)))
.thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.SLAVE_NOT_AVAILABLE, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))));
assertPutResult(ResponseCode.SLAVE_NOT_AVAILABLE); assertPutResult(ResponseCode.SLAVE_NOT_AVAILABLE);
} }
@Test @Test
public void testProcessRequest_WithMsgBack() throws RemotingCommandException { public void testProcessRequest_WithMsgBack() throws RemotingCommandException {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class)))
.thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))));
final RemotingCommand request = createSendMsgBackCommand(RequestCode.CONSUMER_SEND_MSG_BACK); final RemotingCommand request = createSendMsgBackCommand(RequestCode.CONSUMER_SEND_MSG_BACK);
sendMessageProcessor = new SendMessageProcessor(brokerController); sendMessageProcessor = new SendMessageProcessor(brokerController);
...@@ -189,7 +201,8 @@ public class SendMessageProcessorTest { ...@@ -189,7 +201,8 @@ public class SendMessageProcessorTest {
@Test @Test
public void testProcessRequest_Transaction() throws RemotingCommandException { public void testProcessRequest_Transaction() throws RemotingCommandException {
brokerController.setTransactionalMessageService(transactionMsgService); brokerController.setTransactionalMessageService(transactionMsgService);
when(brokerController.getTransactionalMessageService().prepareMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); when(brokerController.getTransactionalMessageService().asyncPrepareMessage(any(MessageExtBrokerInner.class)))
.thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))));
RemotingCommand request = createSendTransactionMsgCommand(RequestCode.SEND_MESSAGE); RemotingCommand request = createSendTransactionMsgCommand(RequestCode.SEND_MESSAGE);
final RemotingCommand[] response = new RemotingCommand[1]; final RemotingCommand[] response = new RemotingCommand[1];
doAnswer(new Answer() { doAnswer(new Answer() {
......
...@@ -47,6 +47,7 @@ import org.mockito.junit.MockitoJUnitRunner; ...@@ -47,6 +47,7 @@ import org.mockito.junit.MockitoJUnitRunner;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
...@@ -87,6 +88,14 @@ public class TransactionalMessageBridgeTest { ...@@ -87,6 +88,14 @@ public class TransactionalMessageBridgeTest {
assertThat(result.getPutMessageStatus()).isEqualTo(PutMessageStatus.PUT_OK); assertThat(result.getPutMessageStatus()).isEqualTo(PutMessageStatus.PUT_OK);
} }
@Test
public void testAsyncPutHalfMessage() throws Exception {
when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class)))
.thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))));
CompletableFuture<PutMessageResult> result = transactionBridge.asyncPutHalfMessage(createMessageBrokerInner());
assertThat(result.get().getPutMessageStatus()).isEqualTo(PutMessageStatus.PUT_OK);
}
@Test @Test
public void testFetchMessageQueues() { public void testFetchMessageQueues() {
Set<MessageQueue> messageQueues = transactionBridge.fetchMessageQueues(MixAll.RMQ_SYS_TRANS_HALF_TOPIC); Set<MessageQueue> messageQueues = transactionBridge.fetchMessageQueues(MixAll.RMQ_SYS_TRANS_HALF_TOPIC);
......
...@@ -27,6 +27,8 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; ...@@ -27,6 +27,8 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageResult;
import java.util.concurrent.CompletableFuture;
public class TransactionalMessageServiceImpl implements TransactionalMessageService { public class TransactionalMessageServiceImpl implements TransactionalMessageService {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
...@@ -35,6 +37,11 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ ...@@ -35,6 +37,11 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
return null; return null;
} }
@Override
public CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner) {
return null;
}
@Override @Override
public boolean deletePrepareMessage(MessageExt messageExt) { public boolean deletePrepareMessage(MessageExt messageExt) {
return false; return false;
......
...@@ -52,10 +52,11 @@ import org.apache.rocketmq.logging.InternalLogger; ...@@ -52,10 +52,11 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class ClientRemotingProcessor implements NettyRequestProcessor { public class ClientRemotingProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
private final InternalLogger log = ClientLogger.getLog(); private final InternalLogger log = ClientLogger.getLog();
private final MQClientInstance mqClientFactory; private final MQClientInstance mqClientFactory;
......
...@@ -54,10 +54,11 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData; ...@@ -54,10 +54,11 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.namesrv.NamesrvController; import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class DefaultRequestProcessor implements NettyRequestProcessor { public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
private static InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); private static InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
protected final NamesrvController namesrvController; protected final NamesrvController namesrvController;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.netty;
import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public abstract class AsyncNettyRequestProcessor implements NettyRequestProcessor {
public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception {
RemotingCommand response = processRequest(ctx, request);
responseCallback.callback(response);
}
}
...@@ -36,6 +36,7 @@ import java.util.concurrent.LinkedBlockingQueue; ...@@ -36,6 +36,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.ChannelEventListener;
...@@ -102,7 +103,6 @@ public abstract class NettyRemotingAbstract { ...@@ -102,7 +103,6 @@ public abstract class NettyRemotingAbstract {
protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>(); protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>();
static { static {
NettyLogger.initNettyLogger(); NettyLogger.initNettyLogger();
} }
...@@ -200,23 +200,34 @@ public abstract class NettyRemotingAbstract { ...@@ -200,23 +200,34 @@ public abstract class NettyRemotingAbstract {
public void run() { public void run() {
try { try {
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd); final RemotingResponseCallback callback = new RemotingResponseCallback() {
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); @Override
public void callback(RemotingCommand response) {
if (!cmd.isOnewayRPC()) { doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
if (response != null) { if (!cmd.isOnewayRPC()) {
response.setOpaque(opaque); if (response != null) {
response.markResponseType(); response.setOpaque(opaque);
try { response.markResponseType();
ctx.writeAndFlush(response); try {
} catch (Throwable e) { ctx.writeAndFlush(response);
log.error("process request over, but response failed", e); } catch (Throwable e) {
log.error(cmd.toString()); log.error("process request over, but response failed", e);
log.error(response.toString()); log.error(cmd.toString());
log.error(response.toString());
}
} else {
}
} }
} else {
} }
};
if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
processor.asyncProcessRequest(ctx, cmd, callback);
} else {
NettyRequestProcessor processor = pair.getObject1();
RemotingCommand response = processor.processRequest(ctx, cmd);
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
callback.callback(response);
} }
} catch (Throwable e) { } catch (Throwable e) {
log.error("process request exception", e); log.error("process request exception", e);
......
...@@ -27,4 +27,5 @@ public interface NettyRequestProcessor { ...@@ -27,4 +27,5 @@ public interface NettyRequestProcessor {
throws Exception; throws Exception;
boolean rejectRequest(); boolean rejectRequest();
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.netty;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface RemotingResponseCallback {
void callback(RemotingCommand response);
}
...@@ -26,12 +26,7 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException; ...@@ -26,12 +26,7 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.*;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.junit.AfterClass; import org.junit.AfterClass;
...@@ -48,7 +43,7 @@ public class RemotingServerTest { ...@@ -48,7 +43,7 @@ public class RemotingServerTest {
public static RemotingServer createRemotingServer() throws InterruptedException { public static RemotingServer createRemotingServer() throws InterruptedException {
NettyServerConfig config = new NettyServerConfig(); NettyServerConfig config = new NettyServerConfig();
RemotingServer remotingServer = new NettyRemotingServer(config); RemotingServer remotingServer = new NettyRemotingServer(config);
remotingServer.registerProcessor(0, new NettyRequestProcessor() { remotingServer.registerProcessor(0, new AsyncNettyRequestProcessor() {
@Override @Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) { public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
request.setRemark("Hi " + ctx.channel().remoteAddress()); request.setRemark("Hi " + ctx.channel().remoteAddress());
......
...@@ -23,8 +23,11 @@ import java.util.ArrayList; ...@@ -23,8 +23,11 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
...@@ -63,6 +66,7 @@ public class CommitLog { ...@@ -63,6 +66,7 @@ public class CommitLog {
protected volatile long confirmOffset = -1L; protected volatile long confirmOffset = -1L;
private volatile long beginTimeInLock = 0; private volatile long beginTimeInLock = 0;
protected final PutMessageLock putMessageLock; protected final PutMessageLock putMessageLock;
public CommitLog(final DefaultMessageStore defaultMessageStore) { public CommitLog(final DefaultMessageStore defaultMessageStore) {
...@@ -550,6 +554,228 @@ public class CommitLog { ...@@ -550,6 +554,228 @@ public class CommitLog {
return beginTimeInLock; return beginTimeInLock;
} }
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
int queueId = msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = ScheduleMessageService.SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
long elapsedTimeInLock = 0;
MappedFile unlockMappedFile = null;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
msg.setStoreTimestamp(beginLockTimestamp);
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
default:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
}
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
putMessageLock.unlock();
}
if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
}
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, putMessageResult, msg);
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, putMessageResult, msg);
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
if (flushStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);
}
return putMessageResult;
});
}
public CompletableFuture<PutMessageResult> asyncPutMessages(final MessageExtBatch messageExtBatch) {
messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
AppendMessageResult result;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
}
if (messageExtBatch.getDelayTimeLevel() > 0) {
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
}
long elapsedTimeInLock = 0;
MappedFile unlockMappedFile = null;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
//fine-grained lock instead of the coarse-grained
MessageExtBatchEncoder batchEncoder = batchEncoderThreadLocal.get();
messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
putMessageLock.lock();
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
messageExtBatch.setStoreTimestamp(beginLockTimestamp);
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
}
result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
}
result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
case UNKNOWN_ERROR:
default:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
}
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
putMessageLock.unlock();
}
if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, messageExtBatch.getBody().length, result);
}
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum());
storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes());
CompletableFuture<PutMessageStatus> flushOKFuture = submitFlushRequest(result, putMessageResult, messageExtBatch);
CompletableFuture<PutMessageStatus> replicaOKFuture = submitReplicaRequest(result, putMessageResult, messageExtBatch);
return flushOKFuture.thenCombine(replicaOKFuture, (flushStatus, replicaStatus) -> {
if (flushStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);
}
return putMessageResult;
});
}
public PutMessageResult putMessage(final MessageExtBrokerInner msg) { public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// Set the storage time // Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis()); msg.setStoreTimestamp(System.currentTimeMillis());
...@@ -673,6 +899,53 @@ public class CommitLog { ...@@ -673,6 +899,53 @@ public class CommitLog {
return putMessageResult; return putMessageResult;
} }
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, PutMessageResult putMessageResult,
MessageExt messageExt) {
// Synchronization flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
service.putRequest(request);
return request.future();
} else {
service.wakeup();
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
// Asynchronous flush
else {
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
public CompletableFuture<PutMessageStatus> submitReplicaRequest(AppendMessageResult result, PutMessageResult putMessageResult,
MessageExt messageExt) {
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
if (messageExt.isWaitStoreMsgOK()) {
if (service.isSlaveOK(result.getWroteBytes() + result.getWroteOffset())) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
service.putRequest(request);
service.getWaitNotifyObject().wakeupAll();
return request.future();
}
else {
return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// Synchronization flush // Synchronization flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
...@@ -680,8 +953,15 @@ public class CommitLog { ...@@ -680,8 +953,15 @@ public class CommitLog {
if (messageExt.isWaitStoreMsgOK()) { if (messageExt.isWaitStoreMsgOK()) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request); service.putRequest(request);
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
if (!flushOK) { PutMessageStatus flushStatus = null;
try {
flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
//flushOK=false;
}
if (flushStatus != PutMessageStatus.PUT_OK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString()); + " client address: " + messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
...@@ -709,9 +989,13 @@ public class CommitLog { ...@@ -709,9 +989,13 @@ public class CommitLog {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request); service.putRequest(request);
service.getWaitNotifyObject().wakeupAll(); service.getWaitNotifyObject().wakeupAll();
boolean flushOK = PutMessageStatus replicaStatus = null;
request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); try {
if (!flushOK) { replicaStatus = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
}
if (replicaStatus != PutMessageStatus.PUT_OK) {
log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: " log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
+ messageExt.getTags() + " client address: " + messageExt.getBornHostNameString()); + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
...@@ -1081,31 +1365,35 @@ public class CommitLog { ...@@ -1081,31 +1365,35 @@ public class CommitLog {
public static class GroupCommitRequest { public static class GroupCommitRequest {
private final long nextOffset; private final long nextOffset;
private final CountDownLatch countDownLatch = new CountDownLatch(1); private CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture<>();
private volatile boolean flushOK = false; private final long startTimestamp = System.currentTimeMillis();
private long timeoutMillis = Long.MAX_VALUE;
public GroupCommitRequest(long nextOffset, long timeoutMillis) {
this.nextOffset = nextOffset;
this.timeoutMillis = timeoutMillis;
}
public GroupCommitRequest(long nextOffset) { public GroupCommitRequest(long nextOffset) {
this.nextOffset = nextOffset; this.nextOffset = nextOffset;
} }
public long getNextOffset() { public long getNextOffset() {
return nextOffset; return nextOffset;
} }
public void wakeupCustomer(final boolean flushOK) { public void wakeupCustomer(final boolean flushOK) {
this.flushOK = flushOK; long endTimestamp = System.currentTimeMillis();
this.countDownLatch.countDown(); PutMessageStatus result = (flushOK && ((endTimestamp - this.startTimestamp) <= this.timeoutMillis)) ?
PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT;
this.flushOKFuture.complete(result);
} }
public boolean waitForFlush(long timeout) { public CompletableFuture<PutMessageStatus> future() {
try { return flushOKFuture;
this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
return this.flushOK;
} catch (InterruptedException e) {
log.error("Interrupted", e);
return false;
}
} }
} }
/** /**
......
...@@ -31,6 +31,7 @@ import java.util.LinkedList; ...@@ -31,6 +31,7 @@ import java.util.LinkedList;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
...@@ -351,109 +352,170 @@ public class DefaultMessageStore implements MessageStore { ...@@ -351,109 +352,170 @@ public class DefaultMessageStore implements MessageStore {
} }
} }
public PutMessageResult putMessage(MessageExtBrokerInner msg) { private PutMessageStatus checkMessage(MessageExtBrokerInner msg) {
if (msg.getTopic().length() > Byte.MAX_VALUE) {
log.warn("putMessage message topic length too long " + msg.getTopic().length());
return PutMessageStatus.MESSAGE_ILLEGAL;
}
if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
return PutMessageStatus.MESSAGE_ILLEGAL;
}
return PutMessageStatus.PUT_OK;
}
private PutMessageStatus checkMessages(MessageExtBatch messageExtBatch) {
if (messageExtBatch.getTopic().length() > Byte.MAX_VALUE) {
log.warn("putMessage message topic length too long " + messageExtBatch.getTopic().length());
return PutMessageStatus.MESSAGE_ILLEGAL;
}
if (messageExtBatch.getBody().length > messageStoreConfig.getMaxMessageSize()) {
log.warn("PutMessages body length too long " + messageExtBatch.getBody().length);
return PutMessageStatus.MESSAGE_ILLEGAL;
}
return PutMessageStatus.PUT_OK;
}
private PutMessageStatus checkStoreStatus() {
if (this.shutdown) { if (this.shutdown) {
log.warn("message store has shutdown, so putMessage is forbidden"); log.warn("message store has shutdown, so putMessage is forbidden");
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); return PutMessageStatus.SERVICE_NOT_AVAILABLE;
} }
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
long value = this.printTimes.getAndIncrement(); long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) { if ((value % 50000) == 0) {
log.warn("message store is slave mode, so putMessage is forbidden "); log.warn("message store has shutdown, so putMessage is forbidden");
} }
return PutMessageStatus.SERVICE_NOT_AVAILABLE;
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
} }
if (!this.runningFlags.isWriteable()) { if (!this.runningFlags.isWriteable()) {
long value = this.printTimes.getAndIncrement(); long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) { if ((value % 50000) == 0) {
log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits()); log.warn("message store has shutdown, so putMessage is forbidden");
} }
return PutMessageStatus.SERVICE_NOT_AVAILABLE;
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
} else { } else {
this.printTimes.set(0); this.printTimes.set(0);
} }
if (msg.getTopic().length() > Byte.MAX_VALUE) { if (this.isOSPageCacheBusy()) {
log.warn("putMessage message topic length too long " + msg.getTopic().length()); return PutMessageStatus.OS_PAGECACHE_BUSY;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
} }
return PutMessageStatus.PUT_OK;
}
if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) { @Override
log.warn("putMessage message properties length too long " + msg.getPropertiesString().length()); public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null); PutMessageStatus checkStoreStatus = this.checkStoreStatus();
if (checkStoreStatus != PutMessageStatus.PUT_OK) {
return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
} }
if (this.isOSPageCacheBusy()) { PutMessageStatus msgCheckStatus = this.checkMessage(msg);
return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null); if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
} }
long beginTime = this.getSystemClock().now(); long beginTime = this.getSystemClock().now();
PutMessageResult result = this.commitLog.putMessage(msg); CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
long elapsedTime = this.getSystemClock().now() - beginTime; putResultFuture.thenAccept((result) -> {
if (elapsedTime > 500) { long elapsedTime = this.getSystemClock().now() - beginTime;
log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length); if (elapsedTime > 500) {
} log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); }
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
if (null == result || !result.isOk()) { if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
} }
});
return result; return putResultFuture;
} }
public PutMessageResult putMessages(MessageExtBatch messageExtBatch) { public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
if (this.shutdown) { PutMessageStatus checkStoreStatus = this.checkStoreStatus();
log.warn("DefaultMessageStore has shutdown, so putMessages is forbidden"); if (checkStoreStatus != PutMessageStatus.PUT_OK) {
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
} }
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch);
long value = this.printTimes.getAndIncrement(); if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
if ((value % 50000) == 0) { return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
log.warn("DefaultMessageStore is in slave mode, so putMessages is forbidden "); }
long beginTime = this.getSystemClock().now();
CompletableFuture<PutMessageResult> resultFuture = this.commitLog.asyncPutMessages(messageExtBatch);
resultFuture.thenAccept((result) -> {
long elapsedTime = this.getSystemClock().now() - beginTime;
if (elapsedTime > 500) {
log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length);
} }
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
}
if (!this.runningFlags.isWriteable()) { if (null == result || !result.isOk()) {
long value = this.printTimes.getAndIncrement(); this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
if ((value % 50000) == 0) {
log.warn("DefaultMessageStore is not writable, so putMessages is forbidden " + this.runningFlags.getFlagBits());
} }
});
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); return resultFuture;
} else { }
this.printTimes.set(0);
@Override
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
PutMessageStatus checkStoreStatus = this.checkStoreStatus();
if (checkStoreStatus != PutMessageStatus.PUT_OK) {
return new PutMessageResult(checkStoreStatus, null);
} }
if (messageExtBatch.getTopic().length() > Byte.MAX_VALUE) { PutMessageStatus msgCheckStatus = this.checkMessage(msg);
log.warn("PutMessages topic length too long " + messageExtBatch.getTopic().length()); if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); return new PutMessageResult(msgCheckStatus, null);
} }
if (messageExtBatch.getBody().length > messageStoreConfig.getMaxMessageSize()) { long beginTime = this.getSystemClock().now();
log.warn("PutMessages body length too long " + messageExtBatch.getBody().length); PutMessageResult result = this.commitLog.putMessage(msg);
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); long elapsedTime = this.getSystemClock().now() - beginTime;
if (elapsedTime > 500) {
log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
} }
if (this.isOSPageCacheBusy()) { this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
}
return result;
}
@Override
public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
PutMessageStatus checkStoreStatus = this.checkStoreStatus();
if (checkStoreStatus != PutMessageStatus.PUT_OK) {
return new PutMessageResult(checkStoreStatus, null);
}
PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch);
if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
return new PutMessageResult(msgCheckStatus, null);
} }
long beginTime = this.getSystemClock().now(); long beginTime = this.getSystemClock().now();
PutMessageResult result = this.commitLog.putMessages(messageExtBatch); PutMessageResult result = this.commitLog.putMessages(messageExtBatch);
long elapsedTime = this.getSystemClock().now() - beginTime; long elapsedTime = this.getSystemClock().now() - beginTime;
if (elapsedTime > 500) { if (elapsedTime > 500) {
log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length); log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length);
} }
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
if (null == result || !result.isOk()) { if (null == result || !result.isOk()) {
......
...@@ -19,6 +19,8 @@ package org.apache.rocketmq.store; ...@@ -19,6 +19,8 @@ package org.apache.rocketmq.store;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.BrokerRole;
...@@ -53,6 +55,26 @@ public interface MessageStore { ...@@ -53,6 +55,26 @@ public interface MessageStore {
*/ */
void destroy(); void destroy();
/** Store a message into store in async manner, the processor can process the next request
* rather than wait for result
* when result is completed, notify the client in async manner
*
* @param msg MessageInstance to store
* @return a CompletableFuture for the result of store operation
*/
default CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
return CompletableFuture.completedFuture(putMessage(msg));
}
/**
* Store a batch of messages in async manner
* @param messageExtBatch the message batch
* @return a CompletableFuture for the result of store operation
*/
default CompletableFuture<PutMessageResult> asyncPutMessages(final MessageExtBatch messageExtBatch) {
return CompletableFuture.completedFuture(putMessages(messageExtBatch));
}
/** /**
* Store a message into store. * Store a message into store.
* *
......
...@@ -30,6 +30,7 @@ import io.openmessaging.storage.dledger.store.file.SelectMmapBufferResult; ...@@ -30,6 +30,7 @@ import io.openmessaging.storage.dledger.store.file.SelectMmapBufferResult;
import io.openmessaging.storage.dledger.utils.DLedgerUtils; import io.openmessaging.storage.dledger.utils.DLedgerUtils;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.HashMap; import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageAccessor;
...@@ -489,6 +490,16 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -489,6 +490,16 @@ public class DLedgerCommitLog extends CommitLog {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
} }
@Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
return CompletableFuture.completedFuture(this.putMessage(msg));
}
@Override
public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
return CompletableFuture.completedFuture(putMessages(messageExtBatch));
}
@Override @Override
public SelectMappedBufferResult getMessage(final long offset, final int size) { public SelectMappedBufferResult getMessage(final long offset, final int size) {
if (offset < dividedCommitlogOffset) { if (offset < dividedCommitlogOffset) {
......
...@@ -19,6 +19,8 @@ package org.apache.rocketmq.store; ...@@ -19,6 +19,8 @@ package org.apache.rocketmq.store;
import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.MessageStoreConfig;
...@@ -34,12 +36,13 @@ import java.lang.reflect.Method; ...@@ -34,12 +36,13 @@ import java.lang.reflect.Method;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.Arrays;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.*;
import static org.junit.Assert.assertTrue;
/** /**
* HATest * HATest
...@@ -119,6 +122,37 @@ public class HATest { ...@@ -119,6 +122,37 @@ public class HATest {
} }
} }
@Test
public void testSemiSyncReplica() throws Exception {
long totalMsgs = 5;
QUEUE_TOTAL = 1;
MessageBody = StoreMessage.getBytes();
for (long i = 0; i < totalMsgs; i++) {
MessageExtBrokerInner msg = buildMessage();
CompletableFuture<PutMessageResult> putResultFuture = messageStore.asyncPutMessage(msg);
PutMessageResult result = putResultFuture.get();
assertEquals(PutMessageStatus.PUT_OK, result.getPutMessageStatus());
//message has been replicated to slave's commitLog, but maybe not dispatch to ConsumeQueue yet
//so direct read from commitLog by physical offset
MessageExt slaveMsg = slaveMessageStore.lookMessageByOffset(result.getAppendMessageResult().getWroteOffset());
assertNotNull(slaveMsg);
assertTrue(Arrays.equals(msg.getBody(), slaveMsg.getBody()));
assertEquals(msg.getTopic(), slaveMsg.getTopic());
assertEquals(msg.getTags(), slaveMsg.getTags());
assertEquals(msg.getKeys(), slaveMsg.getKeys());
}
//shutdown slave, putMessage should return FLUSH_SLAVE_TIMEOUT
slaveMessageStore.shutdown();
//wait to let master clean the slave's connection
Thread.sleep(masterMessageStoreConfig.getHaHousekeepingInterval() + 500);
for (long i = 0; i < totalMsgs; i++) {
CompletableFuture<PutMessageResult> putResultFuture = messageStore.asyncPutMessage(buildMessage());
PutMessageResult result = putResultFuture.get();
assertEquals(PutMessageStatus.SLAVE_NOT_AVAILABLE, result.getPutMessageStatus());
}
}
@After @After
public void destroy() throws Exception{ public void destroy() throws Exception{
Thread.sleep(5000L); Thread.sleep(5000L);
...@@ -156,6 +190,7 @@ public class HATest { ...@@ -156,6 +190,7 @@ public class HATest {
msg.setBornTimestamp(System.currentTimeMillis()); msg.setBornTimestamp(System.currentTimeMillis());
msg.setStoreHost(StoreHost); msg.setStoreHost(StoreHost);
msg.setBornHost(BornHost); msg.setBornHost(BornHost);
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
return msg; return msg;
} }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.client.rmq;
import org.apache.log4j.Logger;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.test.clientinterface.AbstractMQProducer;
import org.apache.rocketmq.test.sendresult.ResultWrapper;
public class RMQTransactionalProducer extends AbstractMQProducer {
private static Logger logger = Logger.getLogger(RMQTransactionalProducer.class);
private TransactionMQProducer producer = null;
private String nsAddr = null;
public RMQTransactionalProducer(String nsAddr, String topic, TransactionListener transactionListener) {
this(nsAddr, topic, false, transactionListener);
}
public RMQTransactionalProducer(String nsAddr, String topic, boolean useTLS, TransactionListener transactionListener) {
super(topic);
this.nsAddr = nsAddr;
create(useTLS, transactionListener);
start();
}
protected void create(boolean useTLS, TransactionListener transactionListener) {
producer = new TransactionMQProducer();
producer.setProducerGroup(getProducerGroupName());
producer.setInstanceName(getProducerInstanceName());
producer.setTransactionListener(transactionListener);
producer.setUseTLS(useTLS);
if (nsAddr != null) {
producer.setNamesrvAddr(nsAddr);
}
}
public void start() {
try {
producer.start();
super.setStartSuccess(true);
} catch (MQClientException e) {
super.setStartSuccess(false);
logger.error(e);
e.printStackTrace();
}
}
@Override
public ResultWrapper send(Object msg, Object arg) {
boolean commitMsg = ((Pair<Boolean, LocalTransactionState>) arg).getObject2() == LocalTransactionState.COMMIT_MESSAGE;
org.apache.rocketmq.client.producer.SendResult metaqResult = null;
Message message = (Message) msg;
try {
long start = System.currentTimeMillis();
metaqResult = producer.sendMessageInTransaction(message, arg);
this.msgRTs.addData(System.currentTimeMillis() - start);
if (isDebug) {
logger.info(metaqResult);
}
sendResult.setMsgId(metaqResult.getMsgId());
sendResult.setSendResult(true);
sendResult.setBrokerIp(metaqResult.getMessageQueue().getBrokerName());
if (commitMsg) {
msgBodys.addData(new String(message.getBody()));
}
originMsgs.addData(msg);
originMsgIndex.put(new String(message.getBody()), metaqResult);
} catch (MQClientException e) {
if (isDebug) {
e.printStackTrace();
}
sendResult.setSendResult(false);
sendResult.setSendException(e);
errorMsgs.addData(msg);
}
return sendResult;
}
@Override
public void shutdown() {
producer.shutdown();
}
}
...@@ -22,12 +22,14 @@ import java.util.List; ...@@ -22,12 +22,14 @@ import java.util.List;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.namesrv.NamesrvController; import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer; import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.client.rmq.RMQTransactionalProducer;
import org.apache.rocketmq.test.clientinterface.AbstractMQConsumer; import org.apache.rocketmq.test.clientinterface.AbstractMQConsumer;
import org.apache.rocketmq.test.clientinterface.AbstractMQProducer; import org.apache.rocketmq.test.clientinterface.AbstractMQProducer;
import org.apache.rocketmq.test.factory.ConsumerFactory; import org.apache.rocketmq.test.factory.ConsumerFactory;
...@@ -96,6 +98,15 @@ public class BaseConf { ...@@ -96,6 +98,15 @@ public class BaseConf {
return producer; return producer;
} }
public static RMQTransactionalProducer getTransactionalProducer(String nsAddr, String topic, TransactionListener transactionListener) {
RMQTransactionalProducer producer = new RMQTransactionalProducer(nsAddr, topic, false, transactionListener);
if (debug) {
producer.setDebug();
}
mqClients.add(producer);
return producer;
}
public static RMQNormalProducer getProducer(String nsAddr, String topic, String producerGoup, public static RMQNormalProducer getProducer(String nsAddr, String topic, String producerGoup,
String instanceName) { String instanceName) {
RMQNormalProducer producer = new RMQNormalProducer(nsAddr, topic, producerGoup, RMQNormalProducer producer = new RMQNormalProducer(nsAddr, topic, producerGoup,
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.client.producer.transaction;
import org.apache.log4j.Logger;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQTransactionalProducer;
import org.apache.rocketmq.test.factory.MQMessageFactory;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQWait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static com.google.common.truth.Truth.assertThat;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
public class TransactionalMsgIT extends BaseConf {
private static Logger logger = Logger.getLogger(TransactionalMsgIT.class);
private RMQTransactionalProducer producer = null;
private RMQNormalConsumer consumer = null;
private String topic = null;
@Before
public void setUp() {
topic = initTopic();
logger.info(String.format("use topic: %s;", topic));
producer = getTransactionalProducer(nsAddr, topic, new TransactionListenerImpl());
consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
}
@After
public void tearDown() {
super.shutdown();
}
@Test
public void testMessageVisibility() throws Exception {
Thread.sleep(3000);
int msgSize = 120;
List<Object> msgs = MQMessageFactory.getMsg(topic, msgSize);
for (int i = 0; i < msgSize; i++) {
producer.send(msgs.get(i), getTransactionHandle(i));
}
boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer.getListener());
assertThat(recvAll).isEqualTo(true);
}
static Pair<Boolean, LocalTransactionState> getTransactionHandle(int msgIndex) {
switch (msgIndex % 5) {
case 0:
//commit immediately
return new Pair<>(true, LocalTransactionState.COMMIT_MESSAGE);
case 1:
//rollback immediately
return new Pair<>(true, LocalTransactionState.ROLLBACK_MESSAGE);
case 2:
//commit in check
return new Pair<>(false, LocalTransactionState.COMMIT_MESSAGE);
case 3:
//rollback in check
return new Pair<>(false, LocalTransactionState.ROLLBACK_MESSAGE);
case 4:
default:
return new Pair<>(false, LocalTransactionState.UNKNOW);
}
}
static private class TransactionListenerImpl implements TransactionListener {
ConcurrentHashMap<String, LocalTransactionState> checkStatus = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Pair<Boolean, LocalTransactionState> transactionHandle = (Pair<Boolean,LocalTransactionState>) arg;
if (transactionHandle.getObject1()) {
return transactionHandle.getObject2();
} else {
checkStatus.put(msg.getTransactionId(), transactionHandle.getObject2());
return LocalTransactionState.UNKNOW;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
LocalTransactionState state = checkStatus.get(msg.getTransactionId());
if (state == null) {
return LocalTransactionState.UNKNOW;
} else {
return state;
}
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册