提交 c6c699eb 编写于 作者: Q qqeasonchen

[RIP-16]impl rpc support

上级 90ae1167
......@@ -62,6 +62,7 @@ import org.apache.rocketmq.broker.processor.EndTransactionProcessor;
import org.apache.rocketmq.broker.processor.PullMessageProcessor;
import org.apache.rocketmq.broker.processor.QueryMessageProcessor;
import org.apache.rocketmq.broker.processor.SendMessageProcessor;
import org.apache.rocketmq.broker.processor.ReplyMessageProcessor;
import org.apache.rocketmq.broker.slave.SlaveSynchronize;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
......@@ -132,6 +133,7 @@ public class BrokerController {
private final SlaveSynchronize slaveSynchronize;
private final BlockingQueue<Runnable> sendThreadPoolQueue;
private final BlockingQueue<Runnable> pullThreadPoolQueue;
private final BlockingQueue<Runnable> replyThreadPoolQueue;
private final BlockingQueue<Runnable> queryThreadPoolQueue;
private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
private final BlockingQueue<Runnable> heartbeatThreadPoolQueue;
......@@ -147,6 +149,7 @@ public class BrokerController {
private TopicConfigManager topicConfigManager;
private ExecutorService sendMessageExecutor;
private ExecutorService pullMessageExecutor;
private ExecutorService replyMessageExecutor;
private ExecutorService queryMessageExecutor;
private ExecutorService adminBrokerExecutor;
private ExecutorService clientManageExecutor;
......@@ -194,6 +197,7 @@ public class BrokerController {
this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
......@@ -277,6 +281,14 @@ public class BrokerController {
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_"));
this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.replyThreadPoolQueue,
new ThreadFactoryImpl("ProcessReplyMessageThread_"));
this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getQueryMessageThreadPoolNums(),
this.brokerConfig.getQueryMessageThreadPoolNums(),
......@@ -553,6 +565,18 @@ public class BrokerController {
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
/**
* ReplyMessageProcessor
*/
ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);
replyMessageProcessor.registerSendMessageHook(sendMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
/**
* QueryMessageProcessor
*/
......@@ -763,6 +787,10 @@ public class BrokerController {
this.pullMessageExecutor.shutdown();
}
if (this.replyMessageExecutor != null) {
this.replyMessageExecutor.shutdown();
}
if (this.adminBrokerExecutor != null) {
this.adminBrokerExecutor.shutdown();
}
......
......@@ -24,6 +24,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
......@@ -43,6 +44,7 @@ public class ProducerManager {
private final Lock groupChannelLock = new ReentrantLock();
private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable =
new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
private final ConcurrentHashMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>();
private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();
public ProducerManager() {
}
......@@ -82,6 +84,7 @@ public class ProducerManager {
long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();
if (diff > CHANNEL_EXPIRED_TIMEOUT) {
it.remove();
clientChannelTable.remove(info.getClientId());
log.warn(
"SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
......@@ -113,6 +116,7 @@ public class ProducerManager {
final ClientChannelInfo clientChannelInfo =
clientChannelInfoTable.remove(channel);
if (clientChannelInfo != null) {
clientChannelTable.remove(clientChannelInfo.getClientId());
log.info(
"NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",
clientChannelInfo.toString(), remoteAddr, group);
......@@ -146,6 +150,7 @@ public class ProducerManager {
clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
if (null == clientChannelInfoFound) {
channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel());
log.info("new producer connected, group: {} channel: {}", group,
clientChannelInfo.toString());
}
......@@ -171,6 +176,7 @@ public class ProducerManager {
HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
if (null != channelTable && !channelTable.isEmpty()) {
ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
clientChannelTable.remove(clientChannelInfo.getClientId());
if (old != null) {
log.info("unregister a producer[{}] from groupChannelTable {}", group,
clientChannelInfo.toString());
......@@ -223,4 +229,8 @@ public class ProducerManager {
}
return null;
}
public Channel findChannel(String clientId) {
return clientChannelTable.get(clientId);
}
}
package org.apache.rocketmq.broker.processor;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.common.*;
import org.apache.rocketmq.common.message.*;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.ReplyMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.*;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
public class ReplyMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
public ReplyMessageProcessor(final BrokerController brokerController) {
super(brokerController);
}
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
SendMessageContext mqtraceContext = null;
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
return null;
}
mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
RemotingCommand response = this.processReplyMessageRequest(ctx, request, mqtraceContext, requestHeader);
this.executeSendMessageHookAfter(response, mqtraceContext);
return response;
}
@Override
protected SendMessageRequestHeader parseRequestHeader(RemotingCommand request) throws RemotingCommandException {
SendMessageRequestHeaderV2 requestHeaderV2 = null;
SendMessageRequestHeader requestHeader = null;
switch (request.getCode()) {
case RequestCode.SEND_REPLY_MESSAGE_V2:
requestHeaderV2 =
(SendMessageRequestHeaderV2) request
.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
case RequestCode.SEND_REPLY_MESSAGE:
if (null == requestHeaderV2) {
requestHeader =
(SendMessageRequestHeader) request
.decodeCommandCustomHeader(SendMessageRequestHeader.class);
} else {
requestHeader = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2);
}
default:
break;
}
return requestHeader;
}
private RemotingCommand processReplyMessageRequest(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) {
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
response.setOpaque(request.getOpaque());
response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
log.debug("receive SendReplyMessage request command, {}", request);
final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
if (this.brokerController.getMessageStore().now() < startTimstamp) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
return response;
}
response.setCode(-1);
super.msgCheck(ctx, requestHeader, response);
if (response.getCode() != -1) {
return response;
}
final byte[] body = request.getBody();
int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (queueIdInt < 0) {
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
}
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setPropertiesString(requestHeader.getProperties());
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
boolean pushOk = this.pushReplyMessage(ctx, requestHeader, msgInner, response);
if (pushOk && this.brokerController.getBrokerConfig().isStoreReplyMessageEnable()) {
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
this.handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
} else {
responseHeader.setMsgId("0");
responseHeader.setQueueId(0);
responseHeader.setQueueOffset(0L);
}
return response;
}
private boolean pushReplyMessage(final ChannelHandlerContext ctx, final SendMessageRequestHeader requestHeader, final Message msg, final RemotingCommand response) {
ReplyMessageRequestHeader replyMessageRequestHeader = new ReplyMessageRequestHeader();
replyMessageRequestHeader.setBornHost(ctx.channel().remoteAddress().toString());
replyMessageRequestHeader.setStoreHost(this.getStoreHost().toString());
replyMessageRequestHeader.setStoreTimestamp(System.currentTimeMillis());
replyMessageRequestHeader.setProducerGroup(requestHeader.getProducerGroup());
replyMessageRequestHeader.setTopic(requestHeader.getTopic());
replyMessageRequestHeader.setDefaultTopic(requestHeader.getDefaultTopic());
replyMessageRequestHeader.setDefaultTopicQueueNums(requestHeader.getDefaultTopicQueueNums());
replyMessageRequestHeader.setQueueId(requestHeader.getQueueId());
replyMessageRequestHeader.setSysFlag(requestHeader.getSysFlag());
replyMessageRequestHeader.setBornTimestamp(requestHeader.getBornTimestamp());
replyMessageRequestHeader.setFlag(requestHeader.getFlag());
replyMessageRequestHeader.setProperties(requestHeader.getProperties());
replyMessageRequestHeader.setReconsumeTimes(requestHeader.getReconsumeTimes());
replyMessageRequestHeader.setUnitMode(requestHeader.isUnitMode());
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, replyMessageRequestHeader);
request.setBody(msg.getBody());
String senderId = msg.getProperties().get(MessageConst.PROPERTY_MESSAGE_REPLY_TO);
boolean pushOk = false;
if (senderId != null) {
Channel channel = this.brokerController.getProducerManager().findChannel(senderId);
if (channel != null) {
msg.getProperties().put(MessageConst.PROPERTY_PUSH_REPLY_TIME, String.valueOf(System.currentTimeMillis()));
replyMessageRequestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
try {
RemotingCommand pushResponse = this.brokerController.getBroker2Client().callClient(channel, request);
assert pushResponse != null;
switch (pushResponse.getCode()) {
case ResponseCode.SUCCESS: {
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
pushOk = true;
break;
}
default: {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("push reply message to requester fail");
log.warn("push reply message to <{}> return fail, remark: {}", senderId, response.getRemark());
}
}
} catch (InterruptedException e) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("push reply message to requester fail");
log.warn("push reply message to <{}> fail. {}", senderId, channel, e);
} catch (RemotingException e) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("push reply message to requester fail");
log.warn("push reply message to <{}> fail. {}", senderId, channel, e);
}
} else {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("channel of <" + senderId + "> not found");
log.warn("push reply message fial, channel of <{}> not found.", senderId);
}
return pushOk;
}
log.warn("REPLY_TO is null, can not reply message");
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("REPLY_TO is null");
return pushOk;
}
private void handlePutMessageResult(PutMessageResult putMessageResult, final RemotingCommand response,
final RemotingCommand request, final MessageExt msg,
final SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,
int queueIdInt) {
if (putMessageResult == null) {
response.setRemark("push reply to requester success, but store putMessage return null");
return ;
}
boolean sendOK = false;
switch (putMessageResult.getPutMessageStatus()) {
// Success
case PUT_OK:
sendOK = true;
response.setCode(ResponseCode.SUCCESS);
break;
case FLUSH_DISK_TIMEOUT:
response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
sendOK = true;
break;
case FLUSH_SLAVE_TIMEOUT:
response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
sendOK = true;
break;
case SLAVE_NOT_AVAILABLE:
response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
sendOK = true;
break;
// Failed
case CREATE_MAPEDFILE_FAILED:
// response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("create mapped file failed, server is busy or broken.");
break;
case MESSAGE_ILLEGAL:
case PROPERTIES_SIZE_EXCEEDED:
// response.setCode(ResponseCode.MESSAGE_ILLEGAL);
response.setRemark(
"the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
break;
case SERVICE_NOT_AVAILABLE:
// response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
response.setRemark(
"service not available now, maybe disk full, maybe your broker machine memory too small.");
break;
case OS_PAGECACHE_BUSY:
// response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
break;
case UNKNOWN_ERROR:
// response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("UNKNOWN_ERROR");
break;
default:
// response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("UNKNOWN_ERROR DEFAULT");
break;
}
String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
if (sendOK) {
this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),
putMessageResult.getAppendMessageResult().getWroteBytes());
this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
response.setRemark(null);
responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
responseHeader.setQueueId(queueIdInt);
responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
if (hasSendMessageHook()) {
sendMessageContext.setMsgId(responseHeader.getMsgId());
sendMessageContext.setQueueId(responseHeader.getQueueId());
sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
sendMessageContext.setCommercialSendTimes(incValue);
sendMessageContext.setCommercialSendSize(wroteSize);
sendMessageContext.setCommercialOwner(owner);
}
} else {
if (hasSendMessageHook()) {
int wroteSize = request.getBody().length;
int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
sendMessageContext.setCommercialSendTimes(incValue);
sendMessageContext.setCommercialSendSize(wroteSize);
sendMessageContext.setCommercialOwner(owner);
}
}
}
}
......@@ -343,11 +343,13 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setPropertiesString(requestHeader.getProperties());
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
PutMessageResult putMessageResult = null;
Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
......@@ -536,6 +538,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
messageExtBatch.setBornHost(ctx.channel().remoteAddress());
messageExtBatch.setStoreHost(this.getStoreHost());
messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
MessageAccessor.putProperty(messageExtBatch, MessageConst.PROPERTY_CLUSTER, clusterName);
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch);
......
......@@ -134,6 +134,14 @@ public class TopicConfigManager extends ConfigManager {
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
}
{
String topic = this.brokerController.getBrokerConfig().getBrokerClusterName() + "_" + MixAll.REPLY_TOPIC_POSTFIX;
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
topicConfig.setReadQueueNums(1);
topicConfig.setWriteQueueNums(1);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
}
public boolean isSystemTopic(final String topic) {
......
......@@ -23,4 +23,5 @@ public class ClientErrorCode {
public static final int BROKER_NOT_EXIST_EXCEPTION = 10003;
public static final int NO_NAME_SERVER_EXCEPTION = 10004;
public static final int NOT_FOUND_TOPIC_EXCEPTION = 10005;
public static final int REQUEST_TIMEOUT_EXCEPTION = 10006;
}
\ No newline at end of file
package org.apache.rocketmq.client.exception;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.help.FAQUrl;
public class RequestTimeoutException extends Exception {
private static final long serialVersionUID = -5758410930844185841L;
private int responseCode;
private String errorMessage;
public RequestTimeoutException(String errorMessage, Throwable cause) {
super(errorMessage, cause);
this.responseCode = -1;
this.errorMessage = errorMessage;
}
public RequestTimeoutException(int responseCode, String errorMessage) {
super("CODE: " + UtilAll.responseCode2String(responseCode) + " DESC: "
+ errorMessage);
this.responseCode = responseCode;
this.errorMessage = errorMessage;
}
public int getResponseCode() {
return responseCode;
}
public RequestTimeoutException setResponseCode(final int responseCode) {
this.responseCode = responseCode;
return this;
}
public String getErrorMessage() {
return errorMessage;
}
public void setErrorMessage(final String errorMessage) {
this.errorMessage = errorMessage;
}
}
......@@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.client.impl;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
......@@ -25,11 +27,10 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.MQProducerInner;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.RequestFutureTable;
import org.apache.rocketmq.client.producer.RequestResponseFuture;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.*;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
......@@ -43,6 +44,8 @@ import org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestH
import org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader;
import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.ReplyMessageRequestHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
......@@ -76,6 +79,9 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
case RequestCode.CONSUME_MESSAGE_DIRECTLY:
return this.consumeMessageDirectly(ctx, request);
case RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT:
return this.receiveReplyMssage(ctx, request);
default:
break;
}
......@@ -213,4 +219,78 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
return response;
}
private RemotingCommand receiveReplyMssage(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
long receiveTime = System.currentTimeMillis();
ReplyMessageRequestHeader requestHeader = (ReplyMessageRequestHeader) request.decodeCommandCustomHeader(ReplyMessageRequestHeader.class);
try {
MessageExt msg = new MessageExt();
msg.setTopic(requestHeader.getTopic());
msg.setQueueId(requestHeader.getQueueId());
msg.setStoreTimestamp(requestHeader.getStoreTimestamp());
if (requestHeader.getBornHost() != null) {
String[] bornHostArr = requestHeader.getBornHost().split("/");
String bornHost/*ip:port*/ = bornHostArr[bornHostArr.length - 1];
String[] host = bornHost.split(":");
if (host.length == 2)
msg.setBornHost(new InetSocketAddress(host[0], Integer.parseInt(host[1])));
}
if (requestHeader.getStoreHost() != null) {
String[] storeHostArr = requestHeader.getStoreHost().split("/");
String storeHost = storeHostArr[storeHostArr.length - 1];
String[] host = storeHost.split(":");
if (host.length == 2)
msg.setStoreHost(new InetSocketAddress(host[0], Integer.parseInt(host[1])));
}
byte[] body = request.getBody();
if ((requestHeader.getSysFlag() & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) {
try {
body = UtilAll.uncompress(body);
} catch (IOException e) {
log.warn("err when uncompress constant", e);
}
}
msg.setBody(body);
msg.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msg, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REPLY_MESSAGE_ARRIVE_TIME, String.valueOf(receiveTime));
msg.setBornTimestamp(requestHeader.getBornTimestamp());
msg.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
log.debug("receive reply message :{}", msg);
processReplyMessage(msg);
} catch (Exception e) {
log.warn("unknown err when receiveRRReplyMsg", e);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("process reply message fail");
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
private void processReplyMessage(MessageExt replyMsg) {
final String uniqueId = replyMsg.getUserProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
final RequestResponseFuture requestResponseFuture = RequestFutureTable.getRequestFutureTable().get(uniqueId);
if (requestResponseFuture != null) {
requestResponseFuture.putResponseMessage(replyMsg);
RequestFutureTable.getRequestFutureTable().remove(uniqueId);
if (requestResponseFuture.getRequestCallback() != null) {
requestResponseFuture.getRequestCallback().onSuccess(replyMsg);
} else {
requestResponseFuture.putResponseMessage(replyMsg);
}
} else {
log.warn(String.format("receive reply message, but not matched any request, REQUEST_UNIQ_ID: %s", uniqueId));
}
}
}
......@@ -199,6 +199,8 @@ public class MQClientAPIImpl {
this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, this.clientRemotingProcessor, null);
}
public List<String> getNameServerAddressList() {
......@@ -419,13 +421,23 @@ public class MQClientAPIImpl {
) throws RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
RemotingCommand request = null;
if (sendSmartMsg || msg instanceof MessageBatch) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
if (isReply) {
if (sendSmartMsg) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
}
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
if (sendSmartMsg || msg instanceof MessageBatch) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
}
}
request.setBody(msg.getBody());
switch (communicationMode) {
......
......@@ -24,6 +24,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
......@@ -39,6 +41,7 @@ import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.common.ClientErrorCode;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.client.hook.CheckForbiddenContext;
import org.apache.rocketmq.client.hook.CheckForbiddenHook;
import org.apache.rocketmq.client.hook.SendMessageContext;
......@@ -52,6 +55,9 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.RequestCallback;
import org.apache.rocketmq.client.producer.RequestFutureTable;
import org.apache.rocketmq.client.producer.RequestResponseFuture;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
......@@ -79,11 +85,13 @@ import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHe
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.utils.RequestIdUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
......@@ -107,6 +115,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
private final ExecutorService defaultAsyncSenderExecutor;
private ExecutorService asyncSenderExecutor;
private final Timer timer = new Timer("RequestHouseKeepingService", true);
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) {
this(defaultMQProducer, null);
......@@ -212,6 +221,17 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
RequestFutureTable.scanExpiredRequest();
} catch (Throwable e) {
log.error("scan RequestFutureTable exception", e);
}
}
}, 1000 * 3, 1000);
}
private void checkConfig() throws MQClientException {
......@@ -1310,6 +1330,244 @@ public class DefaultMQProducerImpl implements MQProducerInner {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
public Message request(Message msg, long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException {
prepareSendRequest(msg, timeout);
final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
try {
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, null);
RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture);
this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendReqeustOk(true);
}
@Override
public void onException(Throwable e) {
requestResponseFuture.setSendReqeustOk(false);
requestResponseFuture.putResponseMessage(null);
requestResponseFuture.setCause(e);
}
}, timeout);
Message responseMessage = requestResponseFuture.waitResponseMessage(timeout);
if (responseMessage == null) {
if (requestResponseFuture.isSendReqeustOk()) {
throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
"send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
} else {
throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
}
}
return responseMessage;
} finally {
RequestFutureTable.getRequestFutureTable().remove(requestUniqId);
}
}
public void request(Message msg, final RequestCallback requestCallback, long timeout) throws RemotingException {
prepareSendRequest(msg, timeout);
final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
try {
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback);
RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture);
this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendReqeustOk(true);
}
@Override
public void onException(Throwable e) {
requestResponseFuture.setCause(e);
requestFail(requestUniqId);
}
}, timeout);
} catch (Exception ex) {
log.warn("send request message to <{}> failed.", msg.getTopic(), ex);
throw new RemotingSendRequestException(msg.getTopic(), ex);
}
}
public Message request(final Message msg, final MessageQueueSelector selector, final Object arg,
final long timeout) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException, RequestTimeoutException {
prepareSendRequest(msg, timeout);
final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
try {
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, null);
RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture);
this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendReqeustOk(true);
}
@Override
public void onException(Throwable e) {
requestResponseFuture.setSendReqeustOk(false);
requestResponseFuture.putResponseMessage(null);
requestResponseFuture.setCause(e);
}
}, timeout);
Message responseMessage = requestResponseFuture.waitResponseMessage(timeout);
if (responseMessage == null) {
if (requestResponseFuture.isSendReqeustOk()) {
throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
"send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
} else {
throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
}
}
return responseMessage;
} finally {
RequestFutureTable.getRequestFutureTable().remove(requestUniqId);
}
}
public void request(final Message msg, final MessageQueueSelector selector, final Object arg,
final RequestCallback requestCallback, final long timeout) throws RemotingException {
prepareSendRequest(msg, timeout);
final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
try {
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback);
RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture);
this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendReqeustOk(true);
}
@Override
public void onException(Throwable e) {
requestResponseFuture.setCause(e);
requestFail(requestUniqId);
}
}, timeout);
} catch (Exception ex) {
log.warn("send request message to <{}> failed.", msg.getTopic(), ex);
throw new RemotingSendRequestException(msg.getTopic(), ex);
}
}
public Message request(final Message msg, final MessageQueue mq, final long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException, RequestTimeoutException {
prepareSendRequest(msg, timeout);
final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
try {
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, null);
RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture);
this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendReqeustOk(true);
}
@Override
public void onException(Throwable e) {
requestResponseFuture.setSendReqeustOk(false);
requestResponseFuture.putResponseMessage(null);
requestResponseFuture.setCause(e);
}
}, null, timeout);
Message responseMessage = requestResponseFuture.waitResponseMessage(timeout);
if (responseMessage == null) {
if (requestResponseFuture.isSendReqeustOk()) {
throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
"send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
} else {
throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
}
}
return responseMessage;
} finally {
RequestFutureTable.getRequestFutureTable().remove(requestUniqId);
}
}
public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
throws RemotingException {
prepareSendRequest(msg, timeout);
final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
try {
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback);
RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture);
this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendReqeustOk(true);
}
@Override
public void onException(Throwable e) {
requestResponseFuture.setCause(e);
requestFail(requestUniqId);
}
}, null, timeout);
} catch (Exception ex) {
log.warn("send request message to <{}> failed.", msg.getTopic(), ex);
throw new RemotingSendRequestException(msg.getTopic(), ex);
}
}
private void requestFail(final String requestUniqId) {
RequestResponseFuture responseFuture = RequestFutureTable.getRequestFutureTable().remove(requestUniqId);
if (responseFuture != null) {
responseFuture.setSendReqeustOk(false);
responseFuture.putResponseMessage(null);
try {
responseFuture.executeRequestCallback();
} catch (Exception e) {
log.warn("execute requestCallback in requestFail, and callback throw", e);
}
}
}
private void prepareSendRequest(final Message msg, long timeout){
String requestUniqId = RequestIdUtil.createUniqueRequestId();
String requestClientId = this.getmQClientFactory().getClientId();
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REQUEST_UNIQ_ID, requestUniqId);
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_REPLY_TO, requestClientId);
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_TTL, String.valueOf(timeout));
boolean hasRouteData = this.getmQClientFactory().getTopicRouteTable().containsKey(msg.getTopic());
if (!hasRouteData) {
long beginTimestamp = System.currentTimeMillis();
this.tryToFindTopicPublishInfo(msg.getTopic());
this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock();
long cost = System.currentTimeMillis() - beginTimestamp;
if (cost > 500) {
log.warn("prepare send request for <{}> cost {} ms", msg.getTopic(), cost);
}
}
}
private SendResult reply(final Message msg, long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeoutMillis);
}
private SendResult reply(final Message msg, final SendCallback sendCallback, long timeoutMillis) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
return this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeoutMillis);
}
public void replyOneway(final Message msg) throws RemotingException, MQClientException, InterruptedException {
this.sendOneway(msg);
}
public ConcurrentMap<String, TopicPublishInfo> getTopicPublishInfoTable() {
return topicPublishInfoTable;
}
......
......@@ -24,6 +24,7 @@ import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
......@@ -582,6 +583,111 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout);
}
/**
*
* @param msg
* @param timeout
* @return
* @throws MQClientException
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
*/
@Override
public Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException,
RemotingException, MQBrokerException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.request(msg, timeout);
}
/**
*
* @param msg
* @param requestCallback
* @param timeout
* @return
* @throws MQClientException
* @throws RemotingException
* @throws InterruptedException
*/
@Override
public void request(final Message msg, final RequestCallback requestCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException, MQBrokerException, RequestTimeoutException {
this.defaultMQProducerImpl.request(msg, requestCallback, timeout);
}
/**
*
* @param msg
* @param selector
* @param arg
* @param timeout
* @return
* @throws MQClientException
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
*/
@Override
public Message request(final Message msg, final MessageQueueSelector selector, final Object arg,
final long timeout) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException, RequestTimeoutException {
return this.defaultMQProducerImpl.request(msg, selector, arg, timeout);
}
/**
*
* @param msg
* @param selector
* @param arg
* @param requestCallback
* @param timeout
* @return
* @throws MQClientException
* @throws RemotingException
* @throws InterruptedException
*/
@Override
public void request(final Message msg, final MessageQueueSelector selector, final Object arg,
final RequestCallback requestCallback, final long timeout) throws MQClientException, RemotingException,
InterruptedException {
this.defaultMQProducerImpl.request(msg, selector, arg, requestCallback, timeout);
}
/**
*
* @param msg
* @param mq
* @param timeout
* @return
* @throws MQClientException
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
*/
@Override
public Message request(final Message msg, final MessageQueue mq, final long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException, RequestTimeoutException {
return this.defaultMQProducerImpl.request(msg, mq, timeout);
}
/**
*
* @param msg
* @param mq
* @param requestCallback
* @param timeout
* @return
* @throws MQClientException
* @throws RemotingException
* @throws InterruptedException
*/
@Override
public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.request(msg, mq, requestCallback, timeout);
}
/**
* Same to {@link #sendOneway(Message)} with message queue selector specified.
*
......
......@@ -21,6 +21,7 @@ import java.util.List;
import org.apache.rocketmq.client.MQAdmin;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
......@@ -98,4 +99,25 @@ public interface MQProducer extends MQAdmin {
SendResult send(final Collection<Message> msgs, final MessageQueue mq, final long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
//for rpc
Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException,
RemotingException, MQBrokerException, InterruptedException;
void request(final Message msg, final RequestCallback requestCallback, final long timeout)
throws RequestTimeoutException, MQClientException, RemotingException, InterruptedException, MQBrokerException;
Message request(final Message msg, final MessageQueueSelector selector, final Object arg,
final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException,
InterruptedException;
void request(final Message msg, final MessageQueueSelector selector, final Object arg,
final RequestCallback requestCallback, final long timeout) throws RequestTimeoutException, MQClientException, RemotingException,
InterruptedException;
Message request(final Message msg, final MessageQueue mq, final long timeout)
throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException;
void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
throws RequestTimeoutException, MQClientException, RemotingException, InterruptedException;
}
package org.apache.rocketmq.client.producer;
import org.apache.rocketmq.common.message.Message;
public interface RequestCallback {
void onSuccess(final Message message);
void onException(final Throwable e);
}
package org.apache.rocketmq.client.producer;
import org.apache.rocketmq.client.common.ClientErrorCode;
import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.logging.InternalLogger;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class RequestFutureTable {
private static InternalLogger log = ClientLogger.getLog();
private static ConcurrentHashMap<String, RequestResponseFuture> requestFutureTable = new ConcurrentHashMap<String, RequestResponseFuture>();
public static ConcurrentHashMap<String, RequestResponseFuture> getRequestFutureTable(){
return requestFutureTable;
}
public static void scanExpiredRequest(){
final List<RequestResponseFuture> rfList = new LinkedList<RequestResponseFuture>();
Iterator<Map.Entry<String, RequestResponseFuture>> it = requestFutureTable.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, RequestResponseFuture> next = it.next();
RequestResponseFuture rep = next.getValue();
if (rep.isTimeout()) {
it.remove();
rfList.add(rep);
log.warn("remove timeout request, REQUEST_UNIQ_ID={}" + rep.getRequestUniqId());
}
}
for (RequestResponseFuture rf : rfList) {
try {
Throwable cause = new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION, "request timeout, no reply message.");
rf.setCause(cause);
rf.executeRequestCallback();
} catch (Throwable e) {
log.warn("scanResponseTable, operationComplete Exception", e);
}
}
}
}
package org.apache.rocketmq.client.producer;
import org.apache.rocketmq.common.message.Message;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class RequestResponseFuture {
private final String requestUniqId;
private long timeoutMillis;
private final RequestCallback requestCallback;
private final long beginTimestamp = System.currentTimeMillis();
private CountDownLatch countDownLatch = new CountDownLatch(1);
private AtomicBoolean ececuteCallbackOnlyOnce = new AtomicBoolean(false);
private volatile Message responseMsg = null;
private volatile boolean sendReqeustOk = true;
private volatile Throwable cause = null;
private final Message requestMsg = null;
public RequestResponseFuture(String requestUniqId, long timeoutMillis, RequestCallback requestCallback){
this.requestUniqId = requestUniqId;
this.timeoutMillis = timeoutMillis;
this.requestCallback = requestCallback;
}
public void executeRequestCallback(){
if (requestCallback != null) {
if (sendReqeustOk && cause == null) {
requestCallback.onSuccess(responseMsg);
} else {
requestCallback.onException(cause);
}
}
}
public boolean isTimeout(){
long diff = System.currentTimeMillis() - this.beginTimestamp;
return diff > this.timeoutMillis;
}
public Message waitResponseMessage(final long timeout) throws InterruptedException {
this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
return this.responseMsg;
}
public void putResponseMessage(final Message responseMsg){
this.responseMsg = responseMsg;
this.countDownLatch.countDown();
}
public String getRequestUniqId() {
return requestUniqId;
}
public long getTimeoutMillis() {
return timeoutMillis;
}
public void setTimeoutMillis(long timeoutMillis) {
this.timeoutMillis = timeoutMillis;
}
public RequestCallback getRequestCallback() {
return requestCallback;
}
public long getBeginTimestamp() {
return beginTimestamp;
}
public CountDownLatch getCountDownLatch() {
return countDownLatch;
}
public void setCountDownLatch(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
public AtomicBoolean getEcecuteCallbackOnlyOnce() {
return ececuteCallbackOnlyOnce;
}
public void setEcecuteCallbackOnlyOnce(AtomicBoolean ececuteCallbackOnlyOnce) {
this.ececuteCallbackOnlyOnce = ececuteCallbackOnlyOnce;
}
public Message getResponseMsg() {
return responseMsg;
}
public void setResponseMsg(Message responseMsg) {
this.responseMsg = responseMsg;
}
public boolean isSendReqeustOk() {
return sendReqeustOk;
}
public void setSendReqeustOk(boolean sendReqeustOk) {
this.sendReqeustOk = sendReqeustOk;
}
public Message getRequestMsg() {
return requestMsg;
}
public Throwable getCause() {
return cause;
}
public void setCause(Throwable cause) {
this.cause = cause;
}
}
package org.apache.rocketmq.client.utils;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
public class MessageUtil {
public static Message createReplyMessage(final Message requestMessage) {
if (requestMessage != null) {
Message replyMessage = new Message();
String cluster = requestMessage.getProperty(MessageConst.PROPERTY_CLUSTER);
String replyTo = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO);
String requestUniqId = requestMessage.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
String ttl = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_TTL);
if (cluster == null) {
}
String replyTopic = MixAll.getReplyTopic(cluster);
replyMessage.setTopic(replyTopic);
MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TYPE, MixAll.REPLY_MESSAGE_FLAG);
MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_REQUEST_UNIQ_ID, requestUniqId);
MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_REPLY_TO, replyTo);
MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TTL, ttl);
return replyMessage;
}
return null;
}
}
......@@ -61,6 +61,7 @@ public class BrokerConfig {
*/
private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4;
private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
private int processReplyMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
private int queryMessageThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors();
private int adminBrokerThreadPoolNums = 16;
......@@ -83,6 +84,7 @@ public class BrokerConfig {
private boolean fetchNamesrvAddrByAddressServer = false;
private int sendThreadPoolQueueCapacity = 10000;
private int pullThreadPoolQueueCapacity = 100000;
private int replyThreadPoolQueueCapacity = 10000;
private int queryThreadPoolQueueCapacity = 20000;
private int clientManagerThreadPoolQueueCapacity = 1000000;
private int consumerManagerThreadPoolQueueCapacity = 1000000;
......@@ -180,6 +182,8 @@ public class BrokerConfig {
@ImportantField
private boolean aclEnable = false;
private boolean storeReplyMessageEnable = true;
public static String localHostName() {
try {
return InetAddress.getLocalHost().getHostName();
......@@ -374,6 +378,14 @@ public class BrokerConfig {
this.pullMessageThreadPoolNums = pullMessageThreadPoolNums;
}
public int getProcessReplyMessageThreadPoolNums() {
return processReplyMessageThreadPoolNums;
}
public void setProcessReplyMessageThreadPoolNums(int processReplyMessageThreadPoolNums) {
this.processReplyMessageThreadPoolNums = processReplyMessageThreadPoolNums;
}
public int getQueryMessageThreadPoolNums() {
return queryMessageThreadPoolNums;
}
......@@ -470,6 +482,14 @@ public class BrokerConfig {
this.pullThreadPoolQueueCapacity = pullThreadPoolQueueCapacity;
}
public int getReplyThreadPoolQueueCapacity() {
return replyThreadPoolQueueCapacity;
}
public void setReplyThreadPoolQueueCapacity(int replyThreadPoolQueueCapacity) {
this.replyThreadPoolQueueCapacity = replyThreadPoolQueueCapacity;
}
public int getQueryThreadPoolQueueCapacity() {
return queryThreadPoolQueueCapacity;
}
......@@ -765,4 +785,12 @@ public class BrokerConfig {
public void setAclEnable(boolean aclEnable) {
this.aclEnable = aclEnable;
}
public boolean isStoreReplyMessageEnable() {
return storeReplyMessageEnable;
}
public void setStoreReplyMessageEnable(boolean storeReplyMessageEnable) {
this.storeReplyMessageEnable = storeReplyMessageEnable;
}
}
......@@ -84,6 +84,7 @@ public class MixAll {
public static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";
public static final String DLQ_GROUP_TOPIC_PREFIX = "%DLQ%";
public static final String REPLY_TOPIC_POSTFIX = "REPLY_TOPIC";
public static final String SYSTEM_TOPIC_PREFIX = "rmq_sys_";
public static final String UNIQUE_MSG_QUERY_FLAG = "_UNIQUE_KEY_QUERY";
public static final String DEFAULT_TRACE_REGION_ID = "DefaultRegion";
......@@ -96,6 +97,8 @@ public class MixAll {
public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS";
public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml";
public static final String REPLY_MESSAGE_FLAG = "reply";
public static String getWSAddr() {
String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
String wsDomainSubgroup = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
......@@ -110,6 +113,10 @@ public class MixAll {
return RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
}
public static String getReplyTopic(final String clusterName) {
return clusterName + "_" + REPLY_TOPIC_POSTFIX;
}
public static boolean isSysConsumerGroup(final String consumerGroup) {
return consumerGroup.startsWith(CID_RMQ_SYS_PREFIX);
}
......
......@@ -45,6 +45,13 @@ public class MessageConst {
public static final String PROPERTY_TRANSACTION_CHECK_TIMES = "TRANSACTION_CHECK_TIMES";
public static final String PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS = "CHECK_IMMUNITY_TIME_IN_SECONDS";
public static final String PROPERTY_INSTANCE_ID = "INSTANCE_ID";
public static final String PROPERTY_REQUEST_UNIQ_ID = "REQUEST_UNIQ_ID";
public static final String PROPERTY_MESSAGE_REPLY_TO = "REPLY_TO";
public static final String PROPERTY_MESSAGE_TTL = "TTL";
public static final String PROPERTY_REPLY_MESSAGE_ARRIVE_TIME = "ARRIVE_TIME";
public static final String PROPERTY_PUSH_REPLY_TIME = "PUSH_REPLY_TIME";
public static final String PROPERTY_CLUSTER = "CLUSTER";
public static final String PROPERTY_MESSAGE_TYPE = "MSG_TYPE";
public static final String KEY_SEPARATOR = " ";
......@@ -74,5 +81,12 @@ public class MessageConst {
STRING_HASH_SET.add(PROPERTY_MAX_RECONSUME_TIMES);
STRING_HASH_SET.add(PROPERTY_CONSUME_START_TIMESTAMP);
STRING_HASH_SET.add(PROPERTY_INSTANCE_ID);
STRING_HASH_SET.add(PROPERTY_REQUEST_UNIQ_ID);
STRING_HASH_SET.add(PROPERTY_MESSAGE_REPLY_TO);
STRING_HASH_SET.add(PROPERTY_MESSAGE_TTL);
STRING_HASH_SET.add(PROPERTY_REPLY_MESSAGE_ARRIVE_TIME);
STRING_HASH_SET.add(PROPERTY_PUSH_REPLY_TIME);
STRING_HASH_SET.add(PROPERTY_CLUSTER);
STRING_HASH_SET.add(PROPERTY_MESSAGE_TYPE);
}
}
......@@ -180,4 +180,10 @@ public class RequestCode {
* resume logic of checking half messages that have been put in TRANS_CHECK_MAXTIME_TOPIC before
*/
public static final int RESUME_CHECK_HALF_MESSAGE = 323;
public static final int SEND_REPLY_MESSAGE = 324;
public static final int SEND_REPLY_MESSAGE_V2 = 325;
public static final int PUSH_REPLY_MESSAGE_TO_CLIENT = 326;
}
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class ReplyMessageRequestHeader implements CommandCustomHeader {
@CFNotNull
private String producerGroup;
@CFNotNull
private String topic;
@CFNotNull
private String defaultTopic;
@CFNotNull
private Integer defaultTopicQueueNums;
@CFNotNull
private Integer queueId;
@CFNotNull
private Integer sysFlag;
@CFNotNull
private Long bornTimestamp;
@CFNotNull
private Integer flag;
@CFNullable
private String properties;
@CFNullable
private Integer reconsumeTimes;
@CFNullable
private boolean unitMode = false;
@CFNotNull
private String bornHost;
@CFNotNull
private String storeHost;
@CFNotNull
private long storeTimestamp;
public void checkFields() throws RemotingCommandException {
}
public String getProducerGroup() {
return producerGroup;
}
public void setProducerGroup(String producerGroup) {
this.producerGroup = producerGroup;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getDefaultTopic() {
return defaultTopic;
}
public void setDefaultTopic(String defaultTopic) {
this.defaultTopic = defaultTopic;
}
public Integer getDefaultTopicQueueNums() {
return defaultTopicQueueNums;
}
public void setDefaultTopicQueueNums(Integer defaultTopicQueueNums) {
this.defaultTopicQueueNums = defaultTopicQueueNums;
}
public Integer getQueueId() {
return queueId;
}
public void setQueueId(Integer queueId) {
this.queueId = queueId;
}
public Integer getSysFlag() {
return sysFlag;
}
public void setSysFlag(Integer sysFlag) {
this.sysFlag = sysFlag;
}
public Long getBornTimestamp() {
return bornTimestamp;
}
public void setBornTimestamp(Long bornTimestamp) {
this.bornTimestamp = bornTimestamp;
}
public Integer getFlag() {
return flag;
}
public void setFlag(Integer flag) {
this.flag = flag;
}
public String getProperties() {
return properties;
}
public void setProperties(String properties) {
this.properties = properties;
}
public Integer getReconsumeTimes() {
return reconsumeTimes;
}
public void setReconsumeTimes(Integer reconsumeTimes) {
this.reconsumeTimes = reconsumeTimes;
}
public boolean isUnitMode() {
return unitMode;
}
public void setUnitMode(boolean unitMode) {
this.unitMode = unitMode;
}
public String getBornHost() {
return bornHost;
}
public void setBornHost(String bornHost) {
this.bornHost = bornHost;
}
public String getStoreHost() {
return storeHost;
}
public void setStoreHost(String storeHost) {
this.storeHost = storeHost;
}
public long getStoreTimestamp() {
return storeTimestamp;
}
public void setStoreTimestamp(long storeTimestamp) {
this.storeTimestamp = storeTimestamp;
}
}
package org.apache.rocketmq.common.utils;
import java.util.UUID;
public class RequestIdUtil {
public static String createUniqueRequestId(){
return UUID.randomUUID().toString();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册