提交 9800afb0 编写于 作者: Q qqeasonchen

rename REQUEST_UNIQ_ID to CORRELATION_ID

上级 ebc0ede8
......@@ -280,12 +280,12 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
}
private void processReplyMessage(MessageExt replyMsg) {
final String uniqueId = replyMsg.getUserProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
final RequestResponseFuture requestResponseFuture = RequestFutureTable.getRequestFutureTable().get(uniqueId);
final String correlationId = replyMsg.getUserProperty(MessageConst.PROPERTY_CORRELATION_ID);
final RequestResponseFuture requestResponseFuture = RequestFutureTable.getRequestFutureTable().get(correlationId);
if (requestResponseFuture != null) {
requestResponseFuture.putResponseMessage(replyMsg);
RequestFutureTable.getRequestFutureTable().remove(uniqueId);
RequestFutureTable.getRequestFutureTable().remove(correlationId);
if (requestResponseFuture.getRequestCallback() != null) {
requestResponseFuture.getRequestCallback().onSuccess(replyMsg);
......@@ -293,7 +293,7 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
requestResponseFuture.putResponseMessage(replyMsg);
}
} else {
log.warn(String.format("receive reply message, but not matched any request, REQUEST_UNIQ_ID: %s", uniqueId));
log.warn(String.format("receive reply message, but not matched any request, CorrelationId: %s", correlationId));
}
}
}
......@@ -85,13 +85,12 @@ import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHe
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.utils.RequestIdUtil;
import org.apache.rocketmq.common.utils.CorrelationIdUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
......@@ -1341,11 +1340,11 @@ public class DefaultMQProducerImpl implements MQProducerInner {
long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginTimestamp = System.currentTimeMillis();
prepareSendRequest(msg, timeout);
final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
try {
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, null);
RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture);
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null);
RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
long cost = System.currentTimeMillis() - beginTimestamp;
this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {
......@@ -1373,7 +1372,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
return responseMessage;
} finally {
RequestFutureTable.getRequestFutureTable().remove(requestUniqId);
RequestFutureTable.getRequestFutureTable().remove(correlationId);
}
}
......@@ -1381,10 +1380,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
long beginTimestamp = System.currentTimeMillis();
prepareSendRequest(msg, timeout);
final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback);
RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture);
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, requestCallback);
RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
long cost = System.currentTimeMillis() - beginTimestamp;
this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {
......@@ -1396,7 +1395,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
@Override
public void onException(Throwable e) {
requestResponseFuture.setCause(e);
requestFail(requestUniqId);
requestFail(correlationId);
}
}, timeout - cost);
}
......@@ -1406,11 +1405,11 @@ public class DefaultMQProducerImpl implements MQProducerInner {
InterruptedException, RequestTimeoutException {
long beginTimestamp = System.currentTimeMillis();
prepareSendRequest(msg, timeout);
final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
try {
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, null);
RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture);
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null);
RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
long cost = System.currentTimeMillis() - beginTimestamp;
this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() {
......@@ -1438,7 +1437,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
return responseMessage;
} finally {
RequestFutureTable.getRequestFutureTable().remove(requestUniqId);
RequestFutureTable.getRequestFutureTable().remove(correlationId);
}
}
......@@ -1447,10 +1446,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
long beginTimestamp = System.currentTimeMillis();
prepareSendRequest(msg, timeout);
final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback);
RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture);
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, requestCallback);
RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
long cost = System.currentTimeMillis() - beginTimestamp;
this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() {
......@@ -1462,7 +1461,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
@Override
public void onException(Throwable e) {
requestResponseFuture.setCause(e);
requestFail(requestUniqId);
requestFail(correlationId);
}
}, timeout - cost);
......@@ -1472,11 +1471,11 @@ public class DefaultMQProducerImpl implements MQProducerInner {
throws MQClientException, RemotingException, MQBrokerException, InterruptedException, RequestTimeoutException {
long beginTimestamp = System.currentTimeMillis();
prepareSendRequest(msg, timeout);
final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
try {
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, null);
RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture);
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null);
RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
long cost = System.currentTimeMillis() - beginTimestamp;
this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() {
......@@ -1504,7 +1503,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
return responseMessage;
} finally {
RequestFutureTable.getRequestFutureTable().remove(requestUniqId);
RequestFutureTable.getRequestFutureTable().remove(correlationId);
}
}
......@@ -1512,10 +1511,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
long beginTimestamp = System.currentTimeMillis();
prepareSendRequest(msg, timeout);
final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback);
RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture);
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, requestCallback);
RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
long cost = System.currentTimeMillis() - beginTimestamp;
this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() {
......@@ -1527,13 +1526,13 @@ public class DefaultMQProducerImpl implements MQProducerInner {
@Override
public void onException(Throwable e) {
requestResponseFuture.setCause(e);
requestFail(requestUniqId);
requestFail(correlationId);
}
}, null, timeout - cost);
}
private void requestFail(final String requestUniqId) {
RequestResponseFuture responseFuture = RequestFutureTable.getRequestFutureTable().remove(requestUniqId);
private void requestFail(final String correlationId) {
RequestResponseFuture responseFuture = RequestFutureTable.getRequestFutureTable().remove(correlationId);
if (responseFuture != null) {
responseFuture.setSendReqeustOk(false);
responseFuture.putResponseMessage(null);
......@@ -1546,9 +1545,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
private void prepareSendRequest(final Message msg, long timeout) {
String requestUniqId = RequestIdUtil.createUniqueRequestId();
String correlationId = CorrelationIdUtil.createCorrelationId();
String requestClientId = this.getmQClientFactory().getClientId();
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REQUEST_UNIQ_ID, requestUniqId);
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_CORRELATION_ID, correlationId);
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_REPLY_TO, requestClientId);
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_TTL, String.valueOf(timeout));
......
......@@ -45,7 +45,7 @@ public class RequestFutureTable {
if (rep.isTimeout()) {
it.remove();
rfList.add(rep);
log.warn("remove timeout request, REQUEST_UNIQ_ID={}" + rep.getRequestUniqId());
log.warn("remove timeout request, CorrelationId={}" + rep.getCorrelationId());
}
}
......
......@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.message.Message;
public class RequestResponseFuture {
private final String requestUniqId;
private final String correlationId;
private final RequestCallback requestCallback;
private final long beginTimestamp = System.currentTimeMillis();
private final Message requestMsg = null;
......@@ -32,8 +32,8 @@ public class RequestResponseFuture {
private volatile boolean sendRequestOk = true;
private volatile Throwable cause = null;
public RequestResponseFuture(String requestUniqId, long timeoutMillis, RequestCallback requestCallback) {
this.requestUniqId = requestUniqId;
public RequestResponseFuture(String correlationId, long timeoutMillis, RequestCallback requestCallback) {
this.correlationId = correlationId;
this.timeoutMillis = timeoutMillis;
this.requestCallback = requestCallback;
}
......@@ -63,8 +63,8 @@ public class RequestResponseFuture {
this.countDownLatch.countDown();
}
public String getRequestUniqId() {
return requestUniqId;
public String getCorrelationId() {
return correlationId;
}
public long getTimeoutMillis() {
......
......@@ -30,14 +30,14 @@ public class MessageUtil {
Message replyMessage = new Message();
String cluster = requestMessage.getProperty(MessageConst.PROPERTY_CLUSTER);
String replyTo = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO);
String requestUniqId = requestMessage.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
String correlationId = requestMessage.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
String ttl = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_TTL);
replyMessage.setBody(body);
if (cluster != null) {
String replyTopic = MixAll.getReplyTopic(cluster);
replyMessage.setTopic(replyTopic);
MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TYPE, MixAll.REPLY_MESSAGE_FLAG);
MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_REQUEST_UNIQ_ID, requestUniqId);
MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_CORRELATION_ID, correlationId);
MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_REPLY_TO, replyTo);
MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TTL, ttl);
......
......@@ -45,7 +45,7 @@ public class MessageConst {
public static final String PROPERTY_TRANSACTION_CHECK_TIMES = "TRANSACTION_CHECK_TIMES";
public static final String PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS = "CHECK_IMMUNITY_TIME_IN_SECONDS";
public static final String PROPERTY_INSTANCE_ID = "INSTANCE_ID";
public static final String PROPERTY_REQUEST_UNIQ_ID = "REQUEST_UNIQ_ID";
public static final String PROPERTY_CORRELATION_ID = "CORRELATION_ID";
public static final String PROPERTY_MESSAGE_REPLY_TO = "REPLY_TO";
public static final String PROPERTY_MESSAGE_TTL = "TTL";
public static final String PROPERTY_REPLY_MESSAGE_ARRIVE_TIME = "ARRIVE_TIME";
......@@ -81,7 +81,7 @@ public class MessageConst {
STRING_HASH_SET.add(PROPERTY_MAX_RECONSUME_TIMES);
STRING_HASH_SET.add(PROPERTY_CONSUME_START_TIMESTAMP);
STRING_HASH_SET.add(PROPERTY_INSTANCE_ID);
STRING_HASH_SET.add(PROPERTY_REQUEST_UNIQ_ID);
STRING_HASH_SET.add(PROPERTY_CORRELATION_ID);
STRING_HASH_SET.add(PROPERTY_MESSAGE_REPLY_TO);
STRING_HASH_SET.add(PROPERTY_MESSAGE_TTL);
STRING_HASH_SET.add(PROPERTY_REPLY_MESSAGE_ARRIVE_TIME);
......
......@@ -19,8 +19,8 @@ package org.apache.rocketmq.common.utils;
import java.util.UUID;
public class RequestIdUtil {
public static String createUniqueRequestId() {
public class CorrelationIdUtil {
public static String createCorrelationId() {
return UUID.randomUUID().toString();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册