提交 b054a9d4 编写于 作者: D duheng 提交者: von gosling

[ISSUE #292] Add support of transactional message feature (#358)

上级 3ccd6f97
......@@ -16,21 +16,6 @@
*/
package org.apache.rocketmq.broker;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ConsumerManager;
......@@ -61,6 +46,13 @@ import org.apache.rocketmq.broker.processor.SendMessageProcessor;
import org.apache.rocketmq.broker.slave.SlaveSynchronize;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl;
import org.apache.rocketmq.broker.util.ServiceProvider;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.Configuration;
import org.apache.rocketmq.common.DataVersion;
......@@ -69,12 +61,12 @@ import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.stats.MomentStatsItem;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.common.TlsMode;
......@@ -93,6 +85,22 @@ import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStats;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class BrokerController {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger LOG_PROTECTION = InternalLoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
......@@ -142,6 +150,9 @@ public class BrokerController {
private BrokerFastFailure brokerFastFailure;
private Configuration configuration;
private FileWatchService fileWatchService;
private TransactionalMessageCheckService transactionalMessageCheckService;
private TransactionalMessageService transactionalMessageService;
private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener;
public BrokerController(
final BrokerConfig brokerConfig,
......@@ -405,6 +416,7 @@ public class BrokerController {
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
......@@ -423,6 +435,7 @@ public class BrokerController {
reloadServerSslContext();
}
}
private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
((NettyRemotingServer) fastRemotingServer).loadSslContext();
......@@ -432,11 +445,26 @@ public class BrokerController {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}
initialTransaction();
}
return result;
}
private void initialTransaction() {
this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
if (null == this.transactionalMessageService) {
this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
}
this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
if (null == this.transactionalMessageCheckListener) {
this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
}
this.transactionalMessageCheckListener.setBrokerController(this);
this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
}
public void registerProcessor() {
/**
* SendMessageProcessor
......@@ -539,8 +567,9 @@ public class BrokerController {
slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();
}
if (slowTimeMills < 0)
if (slowTimeMills < 0) {
slowTimeMills = 0;
}
return slowTimeMills;
}
......@@ -700,6 +729,10 @@ public class BrokerController {
if (this.fileWatchService != null) {
this.fileWatchService.shutdown();
}
if (this.transactionalMessageCheckService != null) {
this.transactionalMessageCheckService.shutdown(false);
}
}
private void unregisterBrokerAll() {
......@@ -768,6 +801,13 @@ public class BrokerController {
if (this.brokerFastFailure != null) {
this.brokerFastFailure.start();
}
if (BrokerRole.SLAVE != messageStoreConfig.getBrokerRole()) {
if (this.transactionalMessageCheckService != null) {
log.info("Start transaction service!");
this.transactionalMessageCheckService.start();
}
}
}
public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
......@@ -949,4 +989,30 @@ public class BrokerController {
public Configuration getConfiguration() {
return this.configuration;
}
public TransactionalMessageCheckService getTransactionalMessageCheckService() {
return transactionalMessageCheckService;
}
public void setTransactionalMessageCheckService(
TransactionalMessageCheckService transactionalMessageCheckService) {
this.transactionalMessageCheckService = transactionalMessageCheckService;
}
public TransactionalMessageService getTransactionalMessageService() {
return transactionalMessageService;
}
public void setTransactionalMessageService(TransactionalMessageService transactionalMessageService) {
this.transactionalMessageService = transactionalMessageService;
}
public AbstractTransactionalMessageCheckListener getTransactionalMessageCheckListener() {
return transactionalMessageCheckListener;
}
public void setTransactionalMessageCheckListener(
AbstractTransactionalMessageCheckListener transactionalMessageCheckListener) {
this.transactionalMessageCheckListener = transactionalMessageCheckListener;
}
}
......@@ -18,11 +18,6 @@ package org.apache.rocketmq.broker;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
......@@ -45,6 +40,12 @@ import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_ENABLE;
public class BrokerStartup {
......
......@@ -17,13 +17,18 @@
package org.apache.rocketmq.broker.client;
import io.netty.channel.Channel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.broker.util.PositiveAtomicCounter;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
......@@ -34,10 +39,11 @@ public class ProducerManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final long LOCK_TIMEOUT_MILLIS = 3000;
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
private static final int GET_AVALIABLE_CHANNEL_RETRY_COUNT = 3;
private final Lock groupChannelLock = new ReentrantLock();
private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable =
new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();
public ProducerManager() {
}
......@@ -185,4 +191,33 @@ public class ProducerManager {
log.error("", e);
}
}
public Channel getAvaliableChannel(String groupId) {
HashMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap = groupChannelTable.get(groupId);
List<Channel> channelList = new ArrayList<Channel>();
if (channelClientChannelInfoHashMap != null) {
for (Channel channel : channelClientChannelInfoHashMap.keySet()) {
channelList.add(channel);
}
int size = channelList.size();
if (0 == size) {
log.warn("Channel list is empty. groupId={}", groupId);
return null;
}
int index = positiveAtomicCounter.incrementAndGet() % size;
Channel channel = channelList.get(index);
int count = 0;
boolean isOk = channel.isActive() && channel.isWritable();
while (isOk && count++ < GET_AVALIABLE_CHANNEL_RETRY_COUNT) {
index = (++index) % size;
channel = channelList.get(index);
return channel;
}
} else {
log.warn("Check transaction failed, channel table is empty. groupId={}", groupId);
return null;
}
return null;
}
}
......@@ -17,25 +17,15 @@
package org.apache.rocketmq.broker.client.net;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.FileRegion;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.pagecache.OneMessageTransfer;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageQueueForC;
import org.apache.rocketmq.common.protocol.RequestCode;
......@@ -47,11 +37,19 @@ import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHe
import org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader;
import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
public class Broker2Client {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
......@@ -62,34 +60,22 @@ public class Broker2Client {
}
public void checkProducerTransactionState(
final String group,
final Channel channel,
final CheckTransactionStateRequestHeader requestHeader,
final SelectMappedBufferResult selectMappedBufferResult) {
final MessageExt messageExt) throws Exception {
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
request.markOnewayRPC();
request.setBody(MessageDecoder.encode(messageExt, false));
try {
FileRegion fileRegion =
new OneMessageTransfer(request.encodeHeader(selectMappedBufferResult.getSize()),
selectMappedBufferResult);
channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
selectMappedBufferResult.release();
if (!future.isSuccess()) {
log.error("invokeProducer failed,", future.cause());
}
}
});
} catch (Throwable e) {
log.error("invokeProducer exception", e);
selectMappedBufferResult.release();
this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
} catch (Exception e) {
log.error("Check transaction failed because invoke producer exception. group={}, msgId={}", group, messageExt.getMsgId(), e.getMessage());
}
}
public RemotingCommand callClient(final Channel channel,
final RemotingCommand request
final RemotingCommand request
) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
return this.brokerController.getRemotingServer().invokeSync(channel, request, 10000);
}
......@@ -119,7 +105,7 @@ public class Broker2Client {
}
public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce,
boolean isC) {
boolean isC) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
......
......@@ -18,10 +18,9 @@ package org.apache.rocketmq.broker.processor;
import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.transaction.OperationResult;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
......@@ -29,14 +28,19 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.BrokerRole;
/**
* EndTransaction processor: process commit and rollback message
*/
public class EndTransactionProcessor implements NettyRequestProcessor {
private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
private final BrokerController brokerController;
......@@ -46,16 +50,22 @@ public class EndTransactionProcessor implements NettyRequestProcessor {
}
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final EndTransactionRequestHeader requestHeader =
(EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
(EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
LOGGER.info("Transaction request:{}", requestHeader);
if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
return response;
}
if (requestHeader.getFromTransactionCheck()) {
switch (requestHeader.getCommitOrRollback()) {
case MessageSysFlag.TRANSACTION_NOT_TYPE: {
LOGGER.warn("check producer[{}] transaction state, but it's pending status."
LOGGER.warn("Check producer[{}] transaction state, but it's pending status."
+ "RequestHeader: {} Remark: {}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.toString(),
......@@ -64,7 +74,7 @@ public class EndTransactionProcessor implements NettyRequestProcessor {
}
case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
LOGGER.warn("check producer[{}] transaction state, the producer commit the message."
LOGGER.warn("Check producer[{}] transaction state, the producer commit the message."
+ "RequestHeader: {} Remark: {}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.toString(),
......@@ -74,7 +84,7 @@ public class EndTransactionProcessor implements NettyRequestProcessor {
}
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
LOGGER.warn("check producer[{}] transaction state, the producer rollback the message."
LOGGER.warn("Check producer[{}] transaction state, the producer rollback the message."
+ "RequestHeader: {} Remark: {}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.toString(),
......@@ -87,7 +97,7 @@ public class EndTransactionProcessor implements NettyRequestProcessor {
} else {
switch (requestHeader.getCommitOrRollback()) {
case MessageSysFlag.TRANSACTION_NOT_TYPE: {
LOGGER.warn("the producer[{}] end transaction in sending message, and it's pending status."
LOGGER.warn("The producer[{}] end transaction in sending message, and it's pending status."
+ "RequestHeader: {} Remark: {}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.toString(),
......@@ -100,7 +110,7 @@ public class EndTransactionProcessor implements NettyRequestProcessor {
}
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
LOGGER.warn("the producer[{}] end transaction in sending message, rollback the message."
LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."
+ "RequestHeader: {} Remark: {}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.toString(),
......@@ -111,122 +121,145 @@ public class EndTransactionProcessor implements NettyRequestProcessor {
return null;
}
}
OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
response.setCode(result.getResponseCode());
response.setRemark(result.getResponseRemark());
return response;
}
@Override
public boolean rejectRequest() {
return false;
}
final MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getCommitLogOffset());
private RemotingCommand checkPrepareMessage(MessageExt msgExt, EndTransactionRequestHeader requestHeader) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
if (msgExt != null) {
final String pgroupRead = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
if (!pgroupRead.equals(requestHeader.getProducerGroup())) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("the producer group wrong");
response.setRemark("The producer group wrong");
return response;
}
if (msgExt.getQueueOffset() != requestHeader.getTranStateTableOffset()) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("the transaction state table offset wrong");
response.setRemark("The transaction state table offset wrong");
return response;
}
if (msgExt.getCommitLogOffset() != requestHeader.getCommitLogOffset()) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("the commit log offset wrong");
return response;
}
MessageExtBrokerInner msgInner = this.endMessageTransaction(msgExt);
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(msgExt.getStoreTimestamp());
if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
msgInner.setBody(null);
}
final MessageStore messageStore = this.brokerController.getMessageStore();
final PutMessageResult putMessageResult = messageStore.putMessage(msgInner);
if (putMessageResult != null) {
switch (putMessageResult.getPutMessageStatus()) {
// Success
case PUT_OK:
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
break;
// Failed
case CREATE_MAPEDFILE_FAILED:
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("create mapped file failed.");
break;
case MESSAGE_ILLEGAL:
case PROPERTIES_SIZE_EXCEEDED:
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
response.setRemark("the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
break;
case SERVICE_NOT_AVAILABLE:
response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
response.setRemark("service not available now.");
break;
case OS_PAGECACHE_BUSY:
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("OS page cache busy, please try another machine");
break;
case UNKNOWN_ERROR:
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("UNKNOWN_ERROR");
break;
default:
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("UNKNOWN_ERROR DEFAULT");
break;
}
response.setRemark("The commit log offset wrong");
return response;
} else {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("store putMessage return null");
}
} else {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("find prepared transaction message failed");
response.setRemark("Find prepared transaction message failed");
return response;
}
response.setCode(ResponseCode.SUCCESS);
return response;
}
@Override
public boolean rejectRequest() {
return false;
}
private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
msgInner.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(msgExt.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
msgInner.setWaitStoreMsgOK(false);
msgInner.setTransactionId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
msgInner.setSysFlag(msgExt.getSysFlag());
TopicFilterType topicFilterType =
(msgInner.getSysFlag() & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG ? TopicFilterType.MULTI_TAG
: TopicFilterType.SINGLE_TAG;
long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
msgInner.setTagsCode(tagsCodeValue);
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(msgExt.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
msgInner.setWaitStoreMsgOK(false);
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
msgInner.setTopic(msgExt.getTopic());
msgInner.setQueueId(msgExt.getQueueId());
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC);
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID);
return msgInner;
}
private RemotingCommand sendFinalMessage(MessageExtBrokerInner msgInner) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
if (putMessageResult != null) {
switch (putMessageResult.getPutMessageStatus()) {
// Success
case PUT_OK:
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
break;
// Failed
case CREATE_MAPEDFILE_FAILED:
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("Create mapped file failed.");
break;
case MESSAGE_ILLEGAL:
case PROPERTIES_SIZE_EXCEEDED:
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
response.setRemark("The message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
break;
case SERVICE_NOT_AVAILABLE:
response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
response.setRemark("Service not available now.");
break;
case OS_PAGECACHE_BUSY:
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("OS page cache busy, please try another machine");
break;
case UNKNOWN_ERROR:
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("UNKNOWN_ERROR");
break;
default:
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("UNKNOWN_ERROR DEFAULT");
break;
}
return response;
} else {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("store putMessage return null");
}
return response;
}
}
......@@ -17,8 +17,6 @@
package org.apache.rocketmq.broker.processor;
import io.netty.channel.ChannelHandlerContext;
import java.net.SocketAddress;
import java.util.List;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
......@@ -51,6 +49,10 @@ import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
private List<ConsumeMessageHook> consumeMessageHookList;
......@@ -61,7 +63,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
RemotingCommand request) throws RemotingCommandException {
SendMessageContext mqtraceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
......@@ -97,7 +99,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final ConsumerSendMsgBackRequestHeader requestHeader =
(ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
(ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
......@@ -247,8 +249,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
}
private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response,
RemotingCommand request,
MessageExt msg, TopicConfig topicConfig) {
RemotingCommand request,
MessageExt msg, TopicConfig topicConfig) {
String newTopic = requestHeader.getTopic();
if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
......@@ -291,12 +293,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
}
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
response.setOpaque(request.getOpaque());
......@@ -343,27 +345,30 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
String traFlag = msgInner.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null) {
PutMessageResult putMessageResult = null;
Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
}
private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,
RemotingCommand request, MessageExt msg,
SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,
int queueIdInt) {
RemotingCommand request, MessageExt msg,
SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,
int queueIdInt) {
if (putMessageResult == null) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("store putMessage return null");
......@@ -443,7 +448,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
sendMessageContext.setCommercialSendTimes(incValue);
......@@ -454,7 +459,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
} else {
if (hasSendMessageHook()) {
int wroteSize = request.getBody().length;
int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
sendMessageContext.setCommercialSendTimes(incValue);
......@@ -466,12 +471,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
}
private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
response.setOpaque(request.getOpaque());
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.transaction;
import io.netty.channel.Channel;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public abstract class AbstractTransactionalMessageCheckListener {
private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
private BrokerController brokerController;
private static ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("Transaction-msg-check-thread");
return thread;
}
});
public AbstractTransactionalMessageCheckListener() {
}
public AbstractTransactionalMessageCheckListener(BrokerController brokerController) {
this.brokerController = brokerController;
}
public void sendCheckMessage(MessageExt msgExt) throws Exception {
CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
msgExt.setStoreSize(0);
String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
Channel channel = brokerController.getProducerManager().getAvaliableChannel(groupId);
if (channel != null) {
brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
} else {
LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
}
}
public void resolveHalfMsg(final MessageExt msgExt) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
sendCheckMessage(msgExt);
} catch (Exception e) {
LOGGER.error("Send check message error!", e);
}
}
});
}
public BrokerController getBrokerController() {
return brokerController;
}
public void shutDown() {
executorService.shutdown();
}
/**
* Inject brokerController for this listener
*
* @param brokerController
*/
public void setBrokerController(BrokerController brokerController) {
this.brokerController = brokerController;
}
/**
* In order to avoid check back unlimited, we will discard the message that have been checked more than a certain
* number of times.
*
* @param msgExt Message to be discarded.
*/
public abstract void resolveDiscardMsg(MessageExt msgExt);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.transaction;
import org.apache.rocketmq.common.message.MessageExt;
public class OperationResult {
private MessageExt prepareMessage;
private int responseCode;
private String responseRemark;
public MessageExt getPrepareMessage() {
return prepareMessage;
}
public void setPrepareMessage(MessageExt prepareMessage) {
this.prepareMessage = prepareMessage;
}
public int getResponseCode() {
return responseCode;
}
public void setResponseCode(int responseCode) {
this.responseCode = responseCode;
}
public String getResponseRemark() {
return responseRemark;
}
public void setResponseRemark(String responseRemark) {
this.responseRemark = responseRemark;
}
}
......@@ -14,9 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.transaction;
/**
* This class will be removed in the version 4.4.0 and {@link OperationResult} class is recommended.
*/
@Deprecated
public class TransactionRecord {
// Commit Log Offset
private long offset;
......
......@@ -19,6 +19,10 @@ package org.apache.rocketmq.broker.transaction;
import java.util.List;
/**
* This class will be removed in ther version 4.4.0, and {@link TransactionalMessageService} class is recommended.
*/
@Deprecated
public interface TransactionStore {
boolean open();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.transaction;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import java.util.concurrent.atomic.AtomicBoolean;
public class TransactionalMessageCheckService extends ServiceThread {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
private BrokerController brokerController;
private final AtomicBoolean started = new AtomicBoolean(false);
public TransactionalMessageCheckService(BrokerController brokerController) {
this.brokerController = brokerController;
}
@Override
public void start() {
if (started.compareAndSet(false, true)) {
super.start();
this.brokerController.getTransactionalMessageService().open();
}
}
@Override
public void shutdown(boolean interrupt) {
if (started.compareAndSet(true, false)) {
super.shutdown(interrupt);
this.brokerController.getTransactionalMessageService().close();
this.brokerController.getTransactionalMessageCheckListener().shutDown();
}
}
@Override
public String getServiceName() {
return TransactionalMessageCheckService.class.getSimpleName();
}
@Override
public void run() {
log.info("Start transaction check service thread!");
long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
while (!this.isStopped()) {
this.waitForRunning(checkInterval);
}
log.info("End transaction check service thread!");
}
@Override
protected void onWaitEnd() {
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.transaction;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
public interface TransactionalMessageService {
/**
* Process prepare message, in common, we should put this message to storage service.
*
* @param messageInner Prepare(Half) message.
* @return Prepare message storage result.
*/
PutMessageResult prepareMessage(MessageExtBrokerInner messageInner);
/**
* Delete prepare message when this message has been committed or rolled back.
*
* @param messageExt
*/
boolean deletePrepareMessage(MessageExt messageExt);
/**
* Invoked to process commit prepare message.
*
* @param requestHeader Commit message request header.
* @return Operate result contains prepare message and relative error code.
*/
OperationResult commitMessage(EndTransactionRequestHeader requestHeader);
/**
* Invoked to roll back prepare message.
*
* @param requestHeader Prepare message request header.
* @return Operate result contains prepare message and relative error code.
*/
OperationResult rollbackMessage(EndTransactionRequestHeader requestHeader);
/**
* Traverse uncommitted/unroll back half message and send check back request to producer to obtain transaction
* status.
*
* @param transactionTimeout The minimum time of the transactional message to be checked firstly, one message only
* exceed this time interval that can be checked.
* @param transactionCheckMax The maximum number of times the message was checked, if exceed this value, this
* message will be discarded.
* @param listener When the message is considered to be checked or discarded, the relative method of this class will
* be invoked.
*/
void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener);
/**
* Open transaction service.
*
* @return If open success, return true.
*/
boolean open();
/**
* Close transaction service.
*/
void close();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.transaction.queue;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
public class DefaultTransactionalMessageCheckListener extends AbstractTransactionalMessageCheckListener {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
public DefaultTransactionalMessageCheckListener() {
super();
}
@Override
public void resolveDiscardMsg(MessageExt msgExt) {
log.error("MsgExt:{} has been checked too many times, so discard it", msgExt);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.transaction.queue;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.message.MessageExt;
public class GetResult {
private MessageExt msg;
private PullResult pullResult;
public MessageExt getMsg() {
return msg;
}
public void setMsg(MessageExt msg) {
this.msg = msg;
}
public PullResult getPullResult() {
return pullResult;
}
public void setPullResult(PullResult pullResult) {
this.pullResult = pullResult;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.transaction.queue;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.logging.InnerLoggerFactory;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
public class TransactionalMessageBridge {
private static final InternalLogger LOGGER = InnerLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
private final ConcurrentHashMap<MessageQueue, MessageQueue> opQueueMap = new ConcurrentHashMap<>();
private final BrokerController brokerController;
private final MessageStore store;
private final SocketAddress storeHost;
public TransactionalMessageBridge(BrokerController brokerController, MessageStore store) {
try {
this.brokerController = brokerController;
this.store = store;
this.storeHost =
new InetSocketAddress(brokerController.getBrokerConfig().getBrokerIP1(),
brokerController.getNettyServerConfig().getListenPort());
} catch (Exception e) {
LOGGER.error("Init TransactionBridge error", e);
throw new RuntimeException(e);
}
}
public long fetchConsumeOffset(MessageQueue mq) {
long offset = brokerController.getConsumerOffsetManager().queryOffset(TransactionalMessageUtil.buildConsumerGroup(),
mq.getTopic(), mq.getQueueId());
if (offset == -1) {
offset = store.getMinOffsetInQueue(mq.getTopic(), mq.getQueueId());
}
return offset;
}
public Set<MessageQueue> fetchMessageQueues(String topic) {
Set<MessageQueue> mqSet = new HashSet<>();
TopicConfig topicConfig = selectTopicConfig(topic);
if (topicConfig != null && topicConfig.getReadQueueNums() > 0) {
for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
MessageQueue mq = new MessageQueue();
mq.setTopic(topic);
mq.setBrokerName(brokerController.getBrokerConfig().getBrokerName());
mq.setQueueId(i);
mqSet.add(mq);
}
}
return mqSet;
}
public void updateConsumeOffset(MessageQueue mq, long offset) {
this.brokerController.getConsumerOffsetManager().commitOffset(
RemotingHelper.parseSocketAddressAddr(this.storeHost), TransactionalMessageUtil.buildConsumerGroup(), mq.getTopic(),
mq.getQueueId(), offset);
}
public PullResult getHalfMessage(int queueId, long offset, int nums) {
String group = TransactionalMessageUtil.buildConsumerGroup();
String topic = TransactionalMessageUtil.buildHalfTopic();
SubscriptionData sub = new SubscriptionData(topic, "*");
return getMessage(group, topic, queueId, offset, nums, sub);
}
public PullResult getOpMessage(int queueId, long offset, int nums) {
String group = TransactionalMessageUtil.buildConsumerGroup();
String topic = TransactionalMessageUtil.buildOpTopic();
SubscriptionData sub = new SubscriptionData(topic, "*");
return getMessage(group, topic, queueId, offset, nums, sub);
}
private PullResult getMessage(String group, String topic, int queueId, long offset, int nums,
SubscriptionData sub) {
GetMessageResult getMessageResult = store.getMessage(group, topic, queueId, offset, nums, null);
if (getMessageResult != null) {
PullStatus pullStatus = PullStatus.NO_NEW_MSG;
List<MessageExt> foundList = null;
switch (getMessageResult.getStatus()) {
case FOUND:
pullStatus = PullStatus.FOUND;
foundList = decodeMsgList(getMessageResult);
this.brokerController.getBrokerStatsManager().incGroupGetNums(group, topic,
getMessageResult.getMessageCount());
this.brokerController.getBrokerStatsManager().incGroupGetSize(group, topic,
getMessageResult.getBufferTotalSize());
this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
this.brokerController.getBrokerStatsManager().recordDiskFallBehindTime(group, topic, queueId,
this.brokerController.getMessageStore().now() - foundList.get(foundList.size() - 1)
.getStoreTimestamp());
break;
case NO_MATCHED_MESSAGE:
pullStatus = PullStatus.NO_MATCHED_MSG;
LOGGER.warn("No matched message. GetMessageStatus={}, topic={}, groupId={}, requestOffset={}",
getMessageResult.getStatus(), topic, group, offset);
break;
case NO_MESSAGE_IN_QUEUE:
pullStatus = PullStatus.NO_NEW_MSG;
LOGGER.warn("No new message. GetMessageStatus={}, topic={}, groupId={}, requestOffset={}",
getMessageResult.getStatus(), topic, group, offset);
break;
case MESSAGE_WAS_REMOVING:
case NO_MATCHED_LOGIC_QUEUE:
case OFFSET_FOUND_NULL:
case OFFSET_OVERFLOW_BADLY:
case OFFSET_OVERFLOW_ONE:
case OFFSET_TOO_SMALL:
pullStatus = PullStatus.OFFSET_ILLEGAL;
LOGGER.warn("Offset illegal. GetMessageStatus={}, topic={}, groupId={}, requestOffset={}",
getMessageResult.getStatus(), topic, group, offset);
break;
default:
assert false;
break;
}
return new PullResult(pullStatus, getMessageResult.getNextBeginOffset(), getMessageResult.getMinOffset(),
getMessageResult.getMaxOffset(), foundList);
} else {
LOGGER.error("Get message from store return null. topic={}, groupId={}, requestOffset={}", topic, group,
offset);
return null;
}
}
private List<MessageExt> decodeMsgList(GetMessageResult getMessageResult) {
List<MessageExt> foundList = new ArrayList<>();
try {
List<ByteBuffer> messageBufferList = getMessageResult.getMessageBufferList();
for (ByteBuffer bb : messageBufferList) {
MessageExt msgExt = MessageDecoder.decode(bb);
foundList.add(msgExt);
}
} finally {
getMessageResult.release();
}
return foundList;
}
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
return store.putMessage(parseHalfMessageInner(messageInner));
}
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
public boolean putOpMessage(MessageExt messageExt, String opType) {
MessageQueue messageQueue = new MessageQueue(messageExt.getTopic(),
this.brokerController.getBrokerConfig().getBrokerName(), messageExt.getQueueId());
if (TransactionalMessageUtil.REMOVETAG.equals(opType)) {
return addRemoveTagInTransactionOp(messageExt, messageQueue);
}
return true;
}
public PutMessageResult putMessageReturnResult(MessageExtBrokerInner messageInner) {
LOGGER.debug("[BUG-TO-FIX] Thread:{} msgID:{}", Thread.currentThread().getName(), messageInner.getMsgId());
return store.putMessage(messageInner);
}
public boolean putMessage(MessageExtBrokerInner messageInner) {
PutMessageResult putMessageResult = store.putMessage(messageInner);
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
return true;
} else {
LOGGER.error("Put message failed, topic: {}, queueId: {}, msgId: {}",
messageInner.getTopic(), messageInner.getQueueId(), messageInner.getMsgId());
return false;
}
}
public MessageExtBrokerInner renewImmunityHalfMessageInner(MessageExt msgExt) {
MessageExtBrokerInner msgInner = renewHalfMessageInner(msgExt);
String queueOffsetFromPrepare = msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET);
if (null != queueOffsetFromPrepare) {
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET,
String.valueOf(queueOffsetFromPrepare));
} else {
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET,
String.valueOf(msgExt.getQueueOffset()));
}
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
public MessageExtBrokerInner renewHalfMessageInner(MessageExt msgExt) {
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(msgExt.getTopic());
msgInner.setBody(msgExt.getBody());
msgInner.setQueueId(msgExt.getQueueId());
msgInner.setMsgId(msgExt.getMsgId());
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setTags(msgExt.getTags());
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(msgInner.getTags()));
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(msgExt.getStoreHost());
msgInner.setWaitStoreMsgOK(false);
return msgInner;
}
private MessageExtBrokerInner makeOpMessageInner(Message message, MessageQueue messageQueue) {
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(message.getTopic());
msgInner.setBody(message.getBody());
msgInner.setQueueId(messageQueue.getQueueId());
msgInner.setTags(message.getTags());
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(msgInner.getTags()));
msgInner.setSysFlag(0);
MessageAccessor.setProperties(msgInner, message.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(message.getProperties()));
msgInner.setBornTimestamp(System.currentTimeMillis());
msgInner.setBornHost(this.storeHost);
msgInner.setStoreHost(this.storeHost);
msgInner.setWaitStoreMsgOK(false);
MessageClientIDSetter.setUniqID(msgInner);
return msgInner;
}
private TopicConfig selectTopicConfig(String topic) {
TopicConfig topicConfig = brokerController.getTopicConfigManager().selectTopicConfig(topic);
if (topicConfig == null) {
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
topic, 1, PermName.PERM_WRITE | PermName.PERM_READ, 0);
}
return topicConfig;
}
/**
* Use this function while transaction msg is committed or rollback write a flag 'd' to operation queue for the
* msg's offset
*
* @param messageExt Op message
* @param messageQueue Op message queue
* @return This method will always return true.
*/
private boolean addRemoveTagInTransactionOp(MessageExt messageExt, MessageQueue messageQueue) {
Message message = new Message(TransactionalMessageUtil.buildOpTopic(), TransactionalMessageUtil.REMOVETAG,
String.valueOf(messageExt.getQueueOffset()).getBytes(TransactionalMessageUtil.charset));
writeOp(message, messageQueue);
return true;
}
private void writeOp(Message message, MessageQueue mq) {
MessageQueue opQueue;
if (opQueueMap.containsKey(mq)) {
opQueue = opQueueMap.get(mq);
} else {
opQueue = getOpQueueByHalf(mq);
MessageQueue oldQueue = opQueueMap.putIfAbsent(mq, opQueue);
if (oldQueue != null) {
opQueue = oldQueue;
}
}
if (opQueue == null) {
opQueue = new MessageQueue(TransactionalMessageUtil.buildOpTopic(), mq.getBrokerName(), mq.getQueueId());
}
putMessage(makeOpMessageInner(message, opQueue));
}
private MessageQueue getOpQueueByHalf(MessageQueue halfMQ) {
MessageQueue opQueue = new MessageQueue();
opQueue.setTopic(TransactionalMessageUtil.buildOpTopic());
opQueue.setBrokerName(halfMQ.getBrokerName());
opQueue.setQueueId(halfMQ.getQueueId());
return opQueue;
}
public MessageExt lookMessageByOffset(final long commitLogOffset) {
return this.store.lookMessageByOffset(commitLogOffset);
}
public BrokerController getBrokerController() {
return brokerController;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.transaction.queue;
import org.apache.rocketmq.common.MixAll;
import java.nio.charset.Charset;
public class TransactionalMessageUtil {
public static final String REMOVETAG = "d";
public static Charset charset = Charset.forName("utf-8");
public static String buildOpTopic() {
return MixAll.RMQ_SYS_TRANS_OP_HALF_TOPIC;
}
public static String buildHalfTopic() {
return MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
}
public static String buildConsumerGroup() {
return MixAll.CID_SYS_RMQ_TRANS;
}
}
......@@ -14,10 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.producer;
package org.apache.rocketmq.broker.util;
import org.apache.rocketmq.common.message.Message;
import java.util.concurrent.atomic.AtomicInteger;
public interface LocalTransactionExecuter {
LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg);
public class PositiveAtomicCounter {
private static final int MASK = 0x7FFFFFFF;
private final AtomicInteger atom;
public PositiveAtomicCounter() {
atom = new AtomicInteger(0);
}
public final int incrementAndGet() {
final int rt = atom.incrementAndGet();
return rt & MASK;
}
public int intValue() {
return atom.intValue();
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to
* You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.rocketmq.broker.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
public class ServiceProvider {
private final static Logger LOG = LoggerFactory
.getLogger(ServiceProvider.class);
/**
* A reference to the classloader that loaded this class. It's more efficient to compute it once and cache it here.
*/
private static ClassLoader thisClassLoader;
/**
* JDK1.3+ <a href= "http://java.sun.com/j2se/1.3/docs/guide/jar/jar.html#Service%20Provider" > 'Service Provider' specification</a>.
*/
public static final String TRANSACTION_SERVICE_ID = "META-INF/service/org.apache.rocketmq.broker.transaction.TransactionalMessageService";
public static final String TRANSACTION_LISTENER_ID = "META-INF/service/org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener";
static {
thisClassLoader = getClassLoader(ServiceProvider.class);
}
/**
* Returns a string that uniquely identifies the specified object, including its class.
* <p>
* The returned string is of form "classname@hashcode", ie is the same as the return value of the Object.toString() method, but works even when the specified object's class has overidden the toString method.
*
* @param o may be null.
* @return a string of form classname@hashcode, or "null" if param o is null.
*/
protected static String objectId(Object o) {
if (o == null) {
return "null";
} else {
return o.getClass().getName() + "@" + System.identityHashCode(o);
}
}
protected static ClassLoader getClassLoader(Class<?> clazz) {
try {
return clazz.getClassLoader();
} catch (SecurityException e) {
LOG.error("Unable to get classloader for class {} due to security restrictions !",
clazz, e.getMessage());
throw e;
}
}
protected static ClassLoader getContextClassLoader() {
ClassLoader classLoader = null;
try {
classLoader = Thread.currentThread().getContextClassLoader();
} catch (SecurityException ex) {
/**
* The getContextClassLoader() method throws SecurityException when the context
* class loader isn't an ancestor of the calling class's class
* loader, or if security permissions are restricted.
*/
}
return classLoader;
}
protected static InputStream getResourceAsStream(ClassLoader loader, String name) {
if (loader != null) {
return loader.getResourceAsStream(name);
} else {
return ClassLoader.getSystemResourceAsStream(name);
}
}
public static <T> List<T> load(String name, Class<?> clazz) {
LOG.info("Looking for a resource file of name [{}] ...", name);
List<T> services = new ArrayList<T>();
try {
ArrayList<String> names = new ArrayList<String>();
final InputStream is = getResourceAsStream(getContextClassLoader(), name);
if (is != null) {
BufferedReader reader;
try {
reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
} catch (java.io.UnsupportedEncodingException e) {
reader = new BufferedReader(new InputStreamReader(is));
}
String serviceName = reader.readLine();
while (serviceName != null && !"".equals(serviceName)) {
LOG.info(
"Creating an instance as specified by file {} which was present in the path of the context classloader.",
name);
if (!names.contains(serviceName)) {
names.add(serviceName);
}
services.add((T)initService(getContextClassLoader(), serviceName, clazz));
serviceName = reader.readLine();
}
reader.close();
} else {
// is == null
LOG.warn("No resource file with name [{}] found.", name);
}
} catch (Exception e) {
LOG.error("Error occured when looking for resource file " + name, e);
}
return services;
}
public static <T> T loadClass(String name, Class<?> clazz) {
final InputStream is = getResourceAsStream(getContextClassLoader(), name);
BufferedReader reader;
try {
try {
reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
} catch (java.io.UnsupportedEncodingException e) {
reader = new BufferedReader(new InputStreamReader(is));
}
String serviceName = reader.readLine();
reader.close();
if (serviceName != null && !"".equals(serviceName)) {
return initService(getContextClassLoader(), serviceName, clazz);
} else {
LOG.warn("ServiceName is empty!");
return null;
}
} catch (Exception e) {
LOG.error("Error occured when looking for resource file " + name, e);
}
return null;
}
protected static <T> T initService(ClassLoader classLoader, String serviceName, Class<?> clazz) {
Class<?> serviceClazz = null;
try {
if (classLoader != null) {
try {
// Warning: must typecast here & allow exception to be generated/caught & recast properly
serviceClazz = classLoader.loadClass(serviceName);
if (clazz.isAssignableFrom(serviceClazz)) {
LOG.info("Loaded class {} from classloader {}", serviceClazz.getName(),
objectId(classLoader));
} else {
// This indicates a problem with the ClassLoader tree. An incompatible ClassLoader was used to load the implementation.
LOG.error(
"Class {} loaded from classloader {} does not extend {} as loaded by this classloader.",
new Object[] {serviceClazz.getName(),
objectId(serviceClazz.getClassLoader()), clazz.getName()});
}
return (T)serviceClazz.newInstance();
} catch (ClassNotFoundException ex) {
if (classLoader == thisClassLoader) {
// Nothing more to try, onwards.
LOG.warn("Unable to locate any class {} via classloader", serviceName,
objectId(classLoader));
throw ex;
}
// Ignore exception, continue
} catch (NoClassDefFoundError e) {
if (classLoader == thisClassLoader) {
// Nothing more to try, onwards.
LOG.warn(
"Class {} cannot be loaded via classloader {}.it depends on some other class that cannot be found.",
serviceClazz, objectId(classLoader));
throw e;
}
// Ignore exception, continue
}
}
} catch (Exception e) {
LOG.error("Unable to init service.", e);
}
return (T)serviceClazz;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.processor;
import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.transaction.OperationResult;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.AppendMessageResult;
import org.apache.rocketmq.store.AppendMessageStatus;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class EndTransactionProcessorTest {
private EndTransactionProcessor endTransactionProcessor;
@Mock
private ChannelHandlerContext handlerContext;
@Spy
private BrokerController
brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(),
new MessageStoreConfig());
@Mock
private MessageStore messageStore;
@Mock
private TransactionalMessageService transactionMsgService;
@Before
public void init() {
brokerController.setMessageStore(messageStore);
brokerController.setTransactionalMessageService(transactionMsgService);
endTransactionProcessor = new EndTransactionProcessor(brokerController);
}
private OperationResult createResponse(int status){
OperationResult response = new OperationResult();
response.setPrepareMessage(createDefaultMessageExt());
response.setResponseCode(status);
response.setResponseRemark(null);
return response;
}
@Test
public void testProcessRequest() throws RemotingCommandException {
when(transactionMsgService.commitMessage(any(EndTransactionRequestHeader.class))).thenReturn(createResponse(ResponseCode.SUCCESS));
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult
(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
RemotingCommand request = createEndTransactionMsgCommand(MessageSysFlag.TRANSACTION_COMMIT_TYPE, false);
RemotingCommand response = endTransactionProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
@Test
public void testProcessRequest_CheckMessage() throws RemotingCommandException {
when(transactionMsgService.commitMessage(any(EndTransactionRequestHeader.class))).thenReturn(createResponse(ResponseCode.SUCCESS));
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult
(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
RemotingCommand request = createEndTransactionMsgCommand(MessageSysFlag.TRANSACTION_COMMIT_TYPE, true);
RemotingCommand response = endTransactionProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
@Test
public void testProcessRequest_NotType() throws RemotingCommandException {
RemotingCommand request = createEndTransactionMsgCommand(MessageSysFlag.TRANSACTION_NOT_TYPE, true);
RemotingCommand response = endTransactionProcessor.processRequest(handlerContext, request);
assertThat(response).isNull();
}
@Test
public void testProcessRequest_RollBack() throws RemotingCommandException {
when(transactionMsgService.rollbackMessage(any(EndTransactionRequestHeader.class))).thenReturn(createResponse(ResponseCode.SUCCESS));
RemotingCommand request = createEndTransactionMsgCommand(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE, true);
RemotingCommand response = endTransactionProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
private MessageExt createDefaultMessageExt() {
MessageExt messageExt = new MessageExt();
messageExt.setMsgId("12345678");
messageExt.setQueueId(0);
messageExt.setCommitLogOffset(123456789L);
messageExt.setQueueOffset(1234);
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_REAL_QUEUE_ID, "0");
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_PRODUCER_GROUP, "testTransactionGroup");
return messageExt;
}
private EndTransactionRequestHeader createEndTransactionRequestHeader(int status, boolean isCheckMsg) {
EndTransactionRequestHeader header = new EndTransactionRequestHeader();
header.setCommitLogOffset(123456789L);
header.setFromTransactionCheck(isCheckMsg);
header.setCommitOrRollback(status);
header.setMsgId("12345678");
header.setTransactionId("123");
header.setProducerGroup("testTransactionGroup");
header.setTranStateTableOffset(1234L);
return header;
}
private RemotingCommand createEndTransactionMsgCommand(int status, boolean isCheckMsg) {
EndTransactionRequestHeader header = createEndTransactionRequestHeader(status, isCheckMsg);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, header);
request.makeCustomHeaderToNet();
return request;
}
}
......@@ -18,19 +18,20 @@ package org.apache.rocketmq.broker.processor;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
......@@ -51,6 +52,11 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
......@@ -68,6 +74,9 @@ public class SendMessageProcessorTest {
@Mock
private MessageStore messageStore;
@Mock
private TransactionalMessageService transactionMsgService;
private String topic = "FooBar";
private String group = "FooBarGroup";
......@@ -177,7 +186,42 @@ public class SendMessageProcessorTest {
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
private RemotingCommand createSendMsgCommand(int requestCode) {
@Test
public void testProcessRequest_Transaction() throws RemotingCommandException {
brokerController.setTransactionalMessageService(transactionMsgService);
when(brokerController.getTransactionalMessageService().prepareMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
RemotingCommand request = createSendTransactionMsgCommand(RequestCode.SEND_MESSAGE);
final RemotingCommand[] response = new RemotingCommand[1];
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
response[0] = invocation.getArgument(0);
return null;
}
}).when(handlerContext).writeAndFlush(any(Object.class));
RemotingCommand responseToReturn = sendMessageProcessor.processRequest(handlerContext, request);
if (responseToReturn != null) {
assertThat(response[0]).isNull();
response[0] = responseToReturn;
}
assertThat(response[0].getCode()).isEqualTo(ResponseCode.SUCCESS);
}
private RemotingCommand createSendTransactionMsgCommand(int requestCode) {
SendMessageRequestHeader header = createSendMsgRequestHeader();
int sysFlag = header.getSysFlag();
Map<String, String> oriProps = MessageDecoder.string2messageProperties(header.getProperties());
oriProps.put(MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
header.setProperties(MessageDecoder.messageProperties2String(oriProps));
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
header.setSysFlag(sysFlag);
RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, header);
request.setBody(new byte[] {'a'});
request.makeCustomHeaderToNet();
return request;
}
private SendMessageRequestHeader createSendMsgRequestHeader() {
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(group);
requestHeader.setTopic(topic);
......@@ -188,6 +232,11 @@ public class SendMessageProcessorTest {
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(124);
requestHeader.setReconsumeTimes(0);
return requestHeader;
}
private RemotingCommand createSendMsgCommand(int requestCode) {
SendMessageRequestHeader requestHeader = createSendMsgRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader);
request.setBody(new byte[] {'a'});
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.transaction.queue;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class DefaultTransactionalMessageCheckListenerTest {
private DefaultTransactionalMessageCheckListener listener;
@Spy
private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(),
new NettyClientConfig(), new MessageStoreConfig());
@Before
public void init() {
listener = new DefaultTransactionalMessageCheckListener();
listener.setBrokerController(brokerController);
}
@Test
public void testResolveHalfMsg() {
listener.resolveHalfMsg(createMessageExt());
}
@Test
public void testSendCheckMessage() throws Exception{
MessageExt messageExt = createMessageExt();
listener.sendCheckMessage(messageExt);
}
@Test
public void sendCheckMessage(){
listener.resolveDiscardMsg(createMessageExt());
}
private MessageExtBrokerInner createMessageExt() {
MessageExtBrokerInner inner = new MessageExtBrokerInner();
MessageAccessor.putProperty(inner,MessageConst.PROPERTY_REAL_QUEUE_ID,"1");
MessageAccessor.putProperty(inner,MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,"1234255");
MessageAccessor.putProperty(inner,MessageConst.PROPERTY_REAL_TOPIC,"realTopic");
inner.setTransactionId(inner.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
inner.setBody("check".getBytes());
inner.setMsgId("12344567890");
inner.setQueueId(0);
return inner;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.transaction.queue;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.store.AppendMessageResult;
import org.apache.rocketmq.store.AppendMessageStatus;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.Map;
import java.util.Set;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class TransactionalMessageBridgeTest {
private TransactionalMessageBridge transactionBridge;
@Spy
private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(),
new NettyClientConfig(), new MessageStoreConfig());
@Mock
private MessageStore messageStore;
@Before
public void init() {
brokerController.setMessageStore(messageStore);
transactionBridge = new TransactionalMessageBridge(brokerController, messageStore);
}
@Test
public void testPutOpMessage() {
boolean isSuccess = transactionBridge.putOpMessage(createMessageBrokerInner(), TransactionalMessageUtil.REMOVETAG);
assertThat(isSuccess).isTrue();
}
@Test
public void testPutHalfMessage() {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult
(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
PutMessageResult result = transactionBridge.putHalfMessage(createMessageBrokerInner());
assertThat(result.getPutMessageStatus()).isEqualTo(PutMessageStatus.PUT_OK);
}
@Test
public void testFetchMessageQueues() {
Set<MessageQueue> messageQueues = transactionBridge.fetchMessageQueues(MixAll.RMQ_SYS_TRANS_HALF_TOPIC);
assertThat(messageQueues.size()).isEqualTo(1);
}
@Test
public void testFetchConsumeOffset() {
MessageQueue mq = new MessageQueue(TransactionalMessageUtil.buildOpTopic(), this.brokerController.getBrokerConfig().getBrokerName(),
0);
long offset = transactionBridge.fetchConsumeOffset(mq);
assertThat(offset).isGreaterThan(-1);
}
@Test
public void updateConsumeOffset() {
MessageQueue mq = new MessageQueue(TransactionalMessageUtil.buildOpTopic(), this.brokerController.getBrokerConfig().getBrokerName(),
0);
transactionBridge.updateConsumeOffset(mq, 0);
}
@Test
public void testGetHalfMessage() {
when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), ArgumentMatchers.nullable(MessageFilter.class))).thenReturn(createGetMessageResult(GetMessageStatus.NO_MESSAGE_IN_QUEUE));
PullResult result = transactionBridge.getHalfMessage(0, 0, 1);
assertThat(result.getPullStatus()).isEqualTo(PullStatus.NO_NEW_MSG);
}
@Test
public void testGetOpMessage() {
when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), ArgumentMatchers.nullable(MessageFilter.class))).thenReturn(createGetMessageResult(GetMessageStatus.NO_MESSAGE_IN_QUEUE));
PullResult result = transactionBridge.getOpMessage(0, 0, 1);
assertThat(result.getPullStatus()).isEqualTo(PullStatus.NO_NEW_MSG);
}
@Test
public void testPutMessageReturnResult() {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult
(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
PutMessageResult result = transactionBridge.putMessageReturnResult(createMessageBrokerInner());
assertThat(result.getPutMessageStatus()).isEqualTo(PutMessageStatus.PUT_OK);
}
@Test
public void testPutMessage() {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult
(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
Boolean success = transactionBridge.putMessage(createMessageBrokerInner());
assertThat(success).isEqualTo(true);
}
@Test
public void testRenewImmunityHalfMessageInner() {
MessageExt messageExt = createMessageBrokerInner();
final String offset = "123456789";
MessageExtBrokerInner msgInner = transactionBridge.renewImmunityHalfMessageInner(messageExt);
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET,offset);
assertThat(msgInner).isNotNull();
Map<String,String> properties = msgInner.getProperties();
assertThat(properties).isNotNull();
String resOffset = properties.get(MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET);
assertThat(resOffset).isEqualTo(offset);
}
@Test
public void testRenewHalfMessageInner() {
MessageExt messageExt = new MessageExt();
long bornTimeStamp = messageExt.getBornTimestamp();
MessageExt messageExtRes = transactionBridge.renewHalfMessageInner(messageExt);
assertThat( messageExtRes.getBornTimestamp()).isEqualTo(bornTimeStamp);
}
@Test
public void testLookMessageByOffset(){
when(messageStore.lookMessageByOffset(anyLong())).thenReturn(new MessageExt());
MessageExt messageExt = transactionBridge.lookMessageByOffset(123);
assertThat(messageExt).isNotNull();
}
private GetMessageResult createGetMessageResult(GetMessageStatus status) {
GetMessageResult getMessageResult = new GetMessageResult();
getMessageResult.setStatus(status);
getMessageResult.setMinOffset(100);
getMessageResult.setMaxOffset(1024);
getMessageResult.setNextBeginOffset(516);
return getMessageResult;
}
private MessageExtBrokerInner createMessageBrokerInner() {
MessageExtBrokerInner inner = new MessageExtBrokerInner();
inner.setTransactionId("12342123444");
inner.setBornTimestamp(System.currentTimeMillis());
inner.setBody("prepare".getBytes());
inner.setMsgId("123456-123");
inner.setQueueId(0);
inner.setTopic("hello");
return inner;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.transaction.queue;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.OperationResult;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.store.AppendMessageResult;
import org.apache.rocketmq.store.AppendMessageStatus;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class TransactionalMessageServiceImplTest {
private TransactionalMessageService queueTransactionMsgService;
@Mock
private TransactionalMessageBridge bridge;
@Spy
private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(),
new NettyClientConfig(), new MessageStoreConfig());
@Mock
private AbstractTransactionalMessageCheckListener listener;
@Before
public void init() {
listener.setBrokerController(brokerController);
queueTransactionMsgService = new TransactionalMessageServiceImpl(bridge);
brokerController.getMessageStoreConfig().setFileReservedTime(3);
}
@Test
public void testPrepareMessage() {
MessageExtBrokerInner inner = createMessageBrokerInner();
when(bridge.putHalfMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult
(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
PutMessageResult result = queueTransactionMsgService.prepareMessage(inner);
assert result.isOk();
}
@Test
public void testCommitMessage() {
when(bridge.lookMessageByOffset(anyLong())).thenReturn(createMessageBrokerInner());
OperationResult result = queueTransactionMsgService.commitMessage(createEndTransactionRequestHeader(MessageSysFlag.TRANSACTION_COMMIT_TYPE));
assertThat(result.getResponseCode()).isEqualTo(ResponseCode.SUCCESS);
}
@Test
public void testRollbackMessage() {
when(bridge.lookMessageByOffset(anyLong())).thenReturn(createMessageBrokerInner());
OperationResult result = queueTransactionMsgService.commitMessage(createEndTransactionRequestHeader(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE));
assertThat(result.getResponseCode()).isEqualTo(ResponseCode.SUCCESS);
}
@Test
public void testCheck_withDiscard() {
when(bridge.fetchMessageQueues(MixAll.RMQ_SYS_TRANS_HALF_TOPIC)).thenReturn(createMessageQueueSet(MixAll.RMQ_SYS_TRANS_HALF_TOPIC));
when(bridge.getHalfMessage(0, 0, 1)).thenReturn(createDiscardPullResult(MixAll.RMQ_SYS_TRANS_HALF_TOPIC, 5, "hellp", 1));
when(bridge.getHalfMessage(0, 1, 1)).thenReturn(createPullResult(MixAll.RMQ_SYS_TRANS_HALF_TOPIC, 6, "hellp", 0));
when(bridge.getOpMessage(anyInt(), anyLong(), anyInt())).thenReturn(createOpPulResult(MixAll.RMQ_SYS_TRANS_OP_HALF_TOPIC, 1, "10", 1));
long timeOut = this.brokerController.getBrokerConfig().getTransactionTimeOut();
int checkMax = this.brokerController.getBrokerConfig().getTransactionCheckMax();
final AtomicInteger checkMessage = new AtomicInteger(0);
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) {
checkMessage.addAndGet(1);
return null;
}
}).when(listener).resolveDiscardMsg(any(MessageExt.class));
queueTransactionMsgService.check(timeOut, checkMax, listener);
assertThat(checkMessage.get()).isEqualTo(1);
}
@Test
public void testCheck_withCheck() {
when(bridge.fetchMessageQueues(MixAll.RMQ_SYS_TRANS_HALF_TOPIC)).thenReturn(createMessageQueueSet(MixAll.RMQ_SYS_TRANS_HALF_TOPIC));
when(bridge.getHalfMessage(0, 0, 1)).thenReturn(createPullResult(MixAll.RMQ_SYS_TRANS_HALF_TOPIC, 5, "hello", 1));
when(bridge.getHalfMessage(0, 1, 1)).thenReturn(createPullResult(MixAll.RMQ_SYS_TRANS_HALF_TOPIC, 6, "hellp", 0));
when(bridge.getOpMessage(anyInt(), anyLong(), anyInt())).thenReturn(createPullResult(MixAll.RMQ_SYS_TRANS_OP_HALF_TOPIC, 1, "5", 0));
when(bridge.getBrokerController()).thenReturn(this.brokerController);
when(bridge.renewHalfMessageInner(any(MessageExtBrokerInner.class))).thenReturn(createMessageBrokerInner());
when(bridge.putMessageReturnResult(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult
(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
long timeOut = this.brokerController.getBrokerConfig().getTransactionTimeOut();
final int checkMax = this.brokerController.getBrokerConfig().getTransactionCheckMax();
final AtomicInteger checkMessage = new AtomicInteger(0);
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) {
checkMessage.addAndGet(1);
return checkMessage;
}
}).when(listener).resolveHalfMsg(any(MessageExt.class));
queueTransactionMsgService.check(timeOut, checkMax, listener);
assertThat(checkMessage.get()).isEqualTo(1);
}
@Test
public void testDeletePrepareMessage() {
when(bridge.putOpMessage(any(MessageExt.class), anyString())).thenReturn(true);
boolean res = queueTransactionMsgService.deletePrepareMessage(createMessageBrokerInner());
assertThat(res).isTrue();
}
@Test
public void testOpen() {
boolean isOpen = queueTransactionMsgService.open();
assertThat(isOpen).isTrue();
}
private PullResult createDiscardPullResult(String topic, long queueOffset, String body, int size) {
PullResult result = createPullResult(topic, queueOffset, body, size);
List<MessageExt> msgs = result.getMsgFoundList();
for (MessageExt msg : msgs) {
msg.putUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, "100000");
}
return result;
}
private PullResult createPullResult(String topic, long queueOffset, String body, int size) {
PullResult result = null;
if (0 == size) {
result = new PullResult(PullStatus.NO_NEW_MSG, 1, 0, 1,
null);
} else {
result = new PullResult(PullStatus.FOUND, 1, 0, 1,
getMessageList(queueOffset, topic, body, 1));
return result;
}
return result;
}
private PullResult createOpPulResult(String topic, long queueOffset, String body, int size) {
PullResult result = createPullResult(topic, queueOffset, body, size);
List<MessageExt> msgs = result.getMsgFoundList();
for (MessageExt msg : msgs) {
msg.setTags(TransactionalMessageUtil.REMOVETAG);
}
return result;
}
private PullResult createImmunityPulResult(String topic, long queueOffset, String body, int size) {
PullResult result = createPullResult(topic, queueOffset, body, size);
List<MessageExt> msgs = result.getMsgFoundList();
for (MessageExt msg : msgs) {
msg.putUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, "0");
}
return result;
}
private List<MessageExt> getMessageList(long queueOffset, String topic, String body, int size) {
List<MessageExt> msgs = new ArrayList<>();
for (int i = 0; i < size; i++) {
MessageExt messageExt = createMessageBrokerInner(queueOffset, topic, body);
msgs.add(messageExt);
}
return msgs;
}
private Set<MessageQueue> createMessageQueueSet(String topic) {
Set<MessageQueue> messageQueues = new HashSet<>();
MessageQueue messageQueue = new MessageQueue(topic, "DefaultCluster", 0);
messageQueues.add(messageQueue);
return messageQueues;
}
private EndTransactionRequestHeader createEndTransactionRequestHeader(int status) {
EndTransactionRequestHeader header = new EndTransactionRequestHeader();
header.setCommitLogOffset(123456789L);
header.setCommitOrRollback(status);
header.setMsgId("12345678");
header.setTransactionId("123");
header.setProducerGroup("testTransactionGroup");
header.setTranStateTableOffset(1234L);
return header;
}
private MessageExtBrokerInner createMessageBrokerInner(long queueOffset, String topic, String body) {
MessageExtBrokerInner inner = new MessageExtBrokerInner();
inner.setBornTimestamp(System.currentTimeMillis() - 80000);
inner.setTransactionId("123456123");
inner.setTopic(topic);
inner.setQueueOffset(queueOffset);
inner.setBody(body.getBytes());
inner.setMsgId("123456123");
inner.setQueueId(0);
inner.setTopic("hello");
return inner;
}
private MessageExtBrokerInner createMessageBrokerInner() {
return createMessageBrokerInner(1, "testTopic", "hello world");
}
}
......@@ -14,10 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.producer;
package org.apache.rocketmq.broker.util;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.common.message.MessageExt;
public interface TransactionCheckListener {
LocalTransactionState checkLocalTransactionState(final MessageExt msg);
public class LogTransactionalMessageCheckListener extends AbstractTransactionalMessageCheckListener {
@Override
public void resolveDiscardMsg(MessageExt msgExt) {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.util;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class ServiceProviderTest {
@Test
public void loadTransactionMsgServiceTest() {
TransactionalMessageService transactionService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID,
TransactionalMessageService.class);
assertThat(transactionService).isNotNull();
}
@Test
public void loadAbstractTransactionListenerTest() {
AbstractTransactionalMessageCheckListener listener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID,
AbstractTransactionalMessageCheckListener.class);
assertThat(listener).isNotNull();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.util;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.OperationResult;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
public class TransactionalMessageServiceImpl implements TransactionalMessageService {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
@Override
public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {
return null;
}
@Override
public boolean deletePrepareMessage(MessageExt messageExt) {
return false;
}
@Override
public OperationResult commitMessage(EndTransactionRequestHeader requestHeader) {
return null;
}
@Override
public OperationResult rollbackMessage(EndTransactionRequestHeader requestHeader) {
return null;
}
@Override
public void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener) {
log.warn("check check!");
}
@Override
public boolean open() {
return true;
}
@Override
public void close() {
}
}
org.apache.rocketmq.broker.util.LogTransactionalMessageCheckListener
\ No newline at end of file
org.apache.rocketmq.broker.util.TransactionalMessageServiceImpl
\ No newline at end of file
......@@ -91,6 +91,10 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
if (messageExt != null) {
String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
messageExt.setTransactionId(transactionId);
}
final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
if (group != null) {
MQProducerInner producer = this.mqClientFactory.selectProducer(group);
......
......@@ -96,6 +96,10 @@ public class PullAPIWrapper {
}
for (MessageExt msg : msgListFilterAgain) {
String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
}
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
Long.toString(pullResult.getMinOffset()));
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
......
......@@ -16,33 +16,6 @@
*/
package org.apache.rocketmq.client.impl.producer;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageType;
import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.common.ClientErrorCode;
......@@ -58,30 +31,56 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.latency.MQFaultStrategy;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageType;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class DefaultMQProducerImpl implements MQProducerInner {
private final InternalLogger log = ClientLogger.getLog();
private final Random random = new Random();
......@@ -116,13 +115,17 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public void initTransactionEnv() {
TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
this.checkExecutor = new ThreadPoolExecutor(
producer.getCheckThreadPoolMinSize(),
producer.getCheckThreadPoolMaxSize(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.checkRequestQueue);
if (producer.getExecutorService() != null) {
this.checkExecutor = producer.getExecutorService();
} else {
this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(2000);
this.checkExecutor = new ThreadPoolExecutor(
1,
1,
1000 * 60,
TimeUnit.MILLISECONDS,
this.checkRequestQueue);
}
}
public void destroyTransactionEnv() {
......@@ -239,10 +242,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
@Override
public TransactionCheckListener checkListener() {
public TransactionListener checkListener() {
if (this.defaultMQProducer instanceof TransactionMQProducer) {
TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer;
return producer.getTransactionCheckListener();
return producer.getTransactionListener();
}
return null;
......@@ -259,12 +262,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
@Override
public void run() {
TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
TransactionListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
if (transactionCheckListener != null) {
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable exception = null;
try {
localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
localTransactionState = transactionCheckListener.checkLocalTransaction(message);
} catch (Throwable e) {
log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
exception = e;
......@@ -962,7 +965,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter tranExecuter, final Object arg)
final TransactionListener tranExecuter, final Object arg)
throws MQClientException {
if (null == tranExecuter) {
throw new MQClientException("tranExecutor is null", null);
......@@ -986,7 +989,11 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
localTransactionState = tranExecuter.executeLocalTransaction(msg, arg);
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
......
......@@ -16,17 +16,18 @@
*/
package org.apache.rocketmq.client.impl.producer;
import java.util.Set;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
import java.util.Set;
public interface MQProducerInner {
Set<String> getPublishTopicList();
boolean isPublishTopicNeedUpdate(final String topic);
TransactionCheckListener checkListener();
TransactionListener checkListener();
void checkTransactionState(
final String addr,
......
......@@ -464,14 +464,12 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* This method is to send transactional messages.
*
* @param msg Transactional message to send.
* @param tranExecuter local transaction executor.
* @param arg Argument used along with local transaction executor.
* @return Transaction result.
* @throws MQClientException if there is any client error.
*/
@Override
public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter,
final Object arg)
public TransactionSendResult sendMessageInTransaction(Message msg, final Object arg)
throws MQClientException {
throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class");
}
......
......@@ -80,8 +80,7 @@ public interface MQProducer extends MQAdmin {
void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg)
throws MQClientException, RemotingException, InterruptedException;
TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;
TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException;
//for batch
SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,
......
......@@ -14,28 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.transaction;
package org.apache.rocketmq.client.producer;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
public class TransactionExecuterImpl implements LocalTransactionExecuter {
private AtomicInteger transactionIndex = new AtomicInteger(1);
public interface TransactionListener {
/**
* When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
*
* @param msg Half(prepare) message
* @param arg Custom business parameter
* @return Transaction state
*/
LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
@Override
public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
int value = transactionIndex.getAndIncrement();
if (value == 0) {
throw new RuntimeException("Could not find db");
} else if ((value % 5) == 0) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if ((value % 4) == 0) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
/**
* When no response to prepare(half) message. broker will send check message to check the transaction status, and this
* method will be invoked to get local transaction status.
*
* @param msg Check message
* @return Transaction state
*/
LocalTransactionState checkLocalTransaction(final MessageExt msg);
}
......@@ -20,11 +20,12 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.concurrent.ExecutorService;
public class TransactionMQProducer extends DefaultMQProducer {
private TransactionCheckListener transactionCheckListener;
private int checkThreadPoolMinSize = 1;
private int checkThreadPoolMaxSize = 1;
private int checkRequestHoldMax = 2000;
private TransactionListener transactionListener;
private ExecutorService executorService;
public TransactionMQProducer() {
}
......@@ -50,44 +51,27 @@ public class TransactionMQProducer extends DefaultMQProducer {
}
@Override
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException {
if (null == this.transactionCheckListener) {
throw new MQClientException("localTransactionBranchCheckListener is null", null);
public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException {
if (null == this.transactionListener) {
throw new MQClientException("TransactionListener is null", null);
}
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg);
}
public TransactionCheckListener getTransactionCheckListener() {
return transactionCheckListener;
}
public void setTransactionCheckListener(TransactionCheckListener transactionCheckListener) {
this.transactionCheckListener = transactionCheckListener;
}
public int getCheckThreadPoolMinSize() {
return checkThreadPoolMinSize;
}
public void setCheckThreadPoolMinSize(int checkThreadPoolMinSize) {
this.checkThreadPoolMinSize = checkThreadPoolMinSize;
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, transactionListener, arg);
}
public int getCheckThreadPoolMaxSize() {
return checkThreadPoolMaxSize;
public TransactionListener getTransactionListener() {
return transactionListener;
}
public void setCheckThreadPoolMaxSize(int checkThreadPoolMaxSize) {
this.checkThreadPoolMaxSize = checkThreadPoolMaxSize;
public void setTransactionListener(TransactionListener transactionListener) {
this.transactionListener = transactionListener;
}
public int getCheckRequestHoldMax() {
return checkRequestHoldMax;
public ExecutorService getExecutorService() {
return executorService;
}
public void setCheckRequestHoldMax(int checkRequestHoldMax) {
this.checkRequestHoldMax = checkRequestHoldMax;
public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
}
......@@ -137,13 +137,30 @@ public class BrokerConfig {
private boolean forceRegister = true;
/**
*
* This configurable item defines interval of topics registration of broker to name server. Allowing values are
* between 10, 000 and 60, 000 milliseconds.
*
*/
private int registerNameServerPeriod = 1000 * 30;
/**
* The minimum time of the transactional message to be checked firstly, one message only exceed this time interval
* that can be checked.
*/
@ImportantField
private long transactionTimeOut = 3 * 1000;
/**
* The maximum number of times the message was checked, if exceed this value, this message will be discarded.
*/
@ImportantField
private int transactionCheckMax = 5;
/**
* Transaction message check interval.
*/
@ImportantField
private long transactionCheckInterval = 60 * 1000;
public boolean isTraceOn() {
return traceOn;
}
......@@ -633,4 +650,28 @@ public class BrokerConfig {
public void setRegisterNameServerPeriod(int registerNameServerPeriod) {
this.registerNameServerPeriod = registerNameServerPeriod;
}
public long getTransactionTimeOut() {
return transactionTimeOut;
}
public void setTransactionTimeOut(long transactionTimeOut) {
this.transactionTimeOut = transactionTimeOut;
}
public int getTransactionCheckMax() {
return transactionCheckMax;
}
public void setTransactionCheckMax(int transactionCheckMax) {
this.transactionCheckMax = transactionCheckMax;
}
public long getTransactionCheckInterval() {
return transactionCheckInterval;
}
public void setTransactionCheckInterval(long transactionCheckInterval) {
this.transactionCheckInterval = transactionCheckInterval;
}
}
......@@ -89,6 +89,10 @@ public class MixAll {
public static final String DEFAULT_TRACE_REGION_ID = "DefaultRegion";
public static final String CONSUME_CONTEXT_TYPE = "ConsumeContextType";
public static final String RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
public static final String RMQ_SYS_TRANS_OP_HALF_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC";
public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS";
public static String getWSAddr() {
String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
String wsDomainSubgroup = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
......
......@@ -17,6 +17,7 @@
package org.apache.rocketmq.common.message;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
......@@ -28,6 +29,7 @@ public class Message implements Serializable {
private int flag;
private Map<String, String> properties;
private byte[] body;
private String transactionId;
public Message() {
}
......@@ -191,9 +193,22 @@ public class Message implements Serializable {
putProperty(MessageConst.PROPERTY_BUYER_ID, buyerId);
}
public String getTransactionId() {
return transactionId;
}
public void setTransactionId(String transactionId) {
this.transactionId = transactionId;
}
@Override
public String toString() {
return "Message [topic=" + topic + ", flag=" + flag + ", properties=" + properties + ", body="
+ (body != null ? body.length : 0) + "]";
return "Message{" +
"topic='" + topic + '\'' +
", flag=" + flag +
", properties=" + properties +
", body=" + Arrays.toString(body) +
", transactionId='" + transactionId + '\'' +
'}';
}
}
......@@ -41,6 +41,9 @@ public class MessageConst {
public static final String PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY";
public static final String PROPERTY_MAX_RECONSUME_TIMES = "MAX_RECONSUME_TIMES";
public static final String PROPERTY_CONSUME_START_TIMESTAMP = "CONSUME_START_TIME";
public static final String PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET = "TRAN_PREPARED_QUEUE_OFFSET";
public static final String PROPERTY_TRANSACTION_CHECK_TIMES = "TRANSACTION_CHECK_TIMES";
public static final String PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS = "CHECK_IMMUNITY_TIME_IN_SECONDS";
public static final String KEY_SEPARATOR = " ";
......
......@@ -31,6 +31,7 @@ public class CheckTransactionStateRequestHeader implements CommandCustomHeader {
private Long commitLogOffset;
private String msgId;
private String transactionId;
private String offsetMsgId;
@Override
public void checkFields() throws RemotingCommandException {
......@@ -67,4 +68,12 @@ public class CheckTransactionStateRequestHeader implements CommandCustomHeader {
public void setTransactionId(String transactionId) {
this.transactionId = transactionId;
}
public String getOffsetMsgId() {
return offsetMsgId;
}
public void setOffsetMsgId(String offsetMsgId) {
this.offsetMsgId = offsetMsgId;
}
}
......@@ -15,9 +15,6 @@
* limitations under the License.
*/
/**
* $Id: EndTransactionRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
......@@ -121,9 +118,14 @@ public class EndTransactionRequestHeader implements CommandCustomHeader {
@Override
public String toString() {
return "EndTransactionRequestHeader [producerGroup=" + producerGroup + ", tranStateTableOffset="
+ tranStateTableOffset + ", commitLogOffset=" + commitLogOffset + ", commitOrRollback="
+ commitOrRollback + ", fromTransactionCheck=" + fromTransactionCheck + ", msgId=" + msgId
+ "]";
return "EndTransactionRequestHeader{" +
"producerGroup='" + producerGroup + '\'' +
", tranStateTableOffset=" + tranStateTableOffset +
", commitLogOffset=" + commitLogOffset +
", commitOrRollback=" + commitOrRollback +
", fromTransactionCheck=" + fromTransactionCheck +
", msgId='" + msgId + '\'' +
", transactionId='" + transactionId + '\'' +
'}';
}
}
......@@ -17,34 +17,34 @@
package org.apache.rocketmq.example.benchmark;
import java.io.UnsupportedEncodingException;
import java.util.LinkedList;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.LinkedList;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
public class TransactionProducer {
private static int threadCount;
private static int messageSize;
private static boolean ischeck;
private static boolean ischeckffalse;
private static boolean isCheck;
private static boolean isCheckFalse;
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
threadCount = args.length >= 1 ? Integer.parseInt(args[0]) : 32;
messageSize = args.length >= 2 ? Integer.parseInt(args[1]) : 1024 * 2;
ischeck = args.length >= 3 && Boolean.parseBoolean(args[2]);
ischeckffalse = args.length >= 4 && Boolean.parseBoolean(args[3]);
isCheck = args.length >= 3 && Boolean.parseBoolean(args[2]);
isCheckFalse = args.length >= 4 && Boolean.parseBoolean(args[3]);
final Message msg = buildMessage(messageSize);
......@@ -73,8 +73,8 @@ public class TransactionProducer {
Long[] end = snapshotList.getLast();
final long sendTps =
(long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L);
final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);
(long)(((end[3] - begin[3]) / (double)(end[0] - begin[0])) * 1000L);
final double averageRT = (end[5] - begin[5]) / (double)(end[3] - begin[3]);
System.out.printf(
"Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d transaction checkCount: %d %n",
......@@ -92,16 +92,14 @@ public class TransactionProducer {
}
}, 10000, 10000);
final TransactionCheckListener transactionCheckListener =
new TransactionCheckListenerBImpl(ischeckffalse, statsBenchmark);
final TransactionListener transactionListener =
new TransactionListenerImpl(isCheckFalse, isCheck, statsBenchmark);
final TransactionMQProducer producer = new TransactionMQProducer("benchmark_transaction_producer");
producer.setInstanceName(Long.toString(System.currentTimeMillis()));
producer.setTransactionCheckListener(transactionCheckListener);
producer.setTransactionListener(transactionListener);
producer.setDefaultTopicQueueNums(1000);
producer.start();
final TransactionExecuterBImpl tranExecuter = new TransactionExecuterBImpl(ischeck);
for (int i = 0; i < threadCount; i++) {
sendThreadPool.execute(new Runnable() {
@Override
......@@ -111,7 +109,7 @@ public class TransactionProducer {
// Thread.sleep(1000);
final long beginTimestamp = System.currentTimeMillis();
SendResult sendResult =
producer.sendMessageInTransaction(msg, tranExecuter, null);
producer.sendMessageInTransaction(msg, null);
if (sendResult != null) {
statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet();
......@@ -124,8 +122,7 @@ public class TransactionProducer {
boolean updated =
statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT,
currentRT);
if (updated)
break;
if (updated) { break; }
prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
}
......@@ -153,43 +150,37 @@ public class TransactionProducer {
}
}
class TransactionExecuterBImpl implements LocalTransactionExecuter {
private boolean ischeck;
public TransactionExecuterBImpl(boolean ischeck) {
this.ischeck = ischeck;
}
@Override
public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
if (ischeck) {
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
class TransactionCheckListenerBImpl implements TransactionCheckListener {
private boolean ischeckffalse;
class TransactionListenerImpl implements TransactionListener {
private boolean isCheckFalse;
private StatsBenchmarkTProducer statsBenchmarkTProducer;
private boolean isCheckLocal;
public TransactionCheckListenerBImpl(boolean ischeckffalse,
StatsBenchmarkTProducer statsBenchmarkTProducer) {
this.ischeckffalse = ischeckffalse;
public TransactionListenerImpl(boolean isCheckFalse, boolean isCheckLocal,
StatsBenchmarkTProducer statsBenchmarkTProducer) {
this.isCheckFalse = isCheckFalse;
this.isCheckLocal = isCheckLocal;
this.statsBenchmarkTProducer = statsBenchmarkTProducer;
}
@Override
public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
statsBenchmarkTProducer.getCheckRequestSuccessCount().incrementAndGet();
if (ischeckffalse) {
if (isCheckFalse) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState executeLocalTransaction(final Message msg, final Object arg) {
if (isCheckLocal) {
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
class StatsBenchmarkTProducer {
......
......@@ -16,27 +16,40 @@
*/
package org.apache.rocketmq.example.transaction;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
public class TransactionCheckListenerImpl implements TransactionCheckListener {
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
@Override
public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
System.out.printf("server checking TrMsg %s%n", msg);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
if ((value % 6) == 0) {
throw new RuntimeException("Could not find db");
} else if ((value % 5) == 0) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if ((value % 4) == 0) {
return LocalTransactionState.COMMIT_MESSAGE;
}
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
......@@ -16,32 +16,44 @@
*/
package org.apache.rocketmq.example.transaction;
import java.io.UnsupportedEncodingException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
producer.setCheckThreadPoolMinSize(2);
producer.setCheckThreadPoolMaxSize(2);
producer.setCheckRequestHoldMax(2000);
producer.setTransactionCheckListener(transactionCheckListener);
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
for (int i = 0; i < 100; i++) {
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest", tags[i % tags.length], "KEY" + i,
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
......
......@@ -88,7 +88,7 @@ public class LoggingBuilderTest extends BasicLoggerTest {
Assert.assertTrue(cc >= 2);
}
@Test
//@Test
public void testDailyRollingFileAppender() throws InterruptedException {
String rollingFile = loggingDir + "/daily-rolling--222.log";
Appender rollingFileAppender = LoggingBuilder.newAppenderBuilder().withAsync(false, 1024)
......
......@@ -254,6 +254,7 @@
<exclude>bin/README.md</exclude>
<exclude>.github/*</exclude>
<exclude>src/test/resources/certs/*</exclude>
<exclude>src/test/resources/META-INF/service/*</exclude>
<exclude>*/target/**</exclude>
<exclude>*/*.iml</exclude>
</excludes>
......
......@@ -25,12 +25,15 @@ public class MessageExtBrokerInner extends MessageExt {
private long tagsCode;
public static long tagsString2tagsCode(final TopicFilterType filter, final String tags) {
if (null == tags || tags.length() == 0)
return 0;
if (null == tags || tags.length() == 0) { return 0; }
return tags.hashCode();
}
public static long tagsString2tagsCode(final String tags) {
return tagsString2tagsCode(null, tags);
}
public String getPropertiesString() {
return propertiesString;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册