diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java index 3bea2d8e5dd9ea8337c8ebc98c1910381d9cda30..5cac5e832a87fc63ecb55185603ca8665dce2b56 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java @@ -280,12 +280,12 @@ public class ClientRemotingProcessor implements NettyRequestProcessor { } private void processReplyMessage(MessageExt replyMsg) { - final String uniqueId = replyMsg.getUserProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); - final RequestResponseFuture requestResponseFuture = RequestFutureTable.getRequestFutureTable().get(uniqueId); + final String correlationId = replyMsg.getUserProperty(MessageConst.PROPERTY_CORRELATION_ID); + final RequestResponseFuture requestResponseFuture = RequestFutureTable.getRequestFutureTable().get(correlationId); if (requestResponseFuture != null) { requestResponseFuture.putResponseMessage(replyMsg); - RequestFutureTable.getRequestFutureTable().remove(uniqueId); + RequestFutureTable.getRequestFutureTable().remove(correlationId); if (requestResponseFuture.getRequestCallback() != null) { requestResponseFuture.getRequestCallback().onSuccess(replyMsg); @@ -293,7 +293,7 @@ public class ClientRemotingProcessor implements NettyRequestProcessor { requestResponseFuture.putResponseMessage(replyMsg); } } else { - log.warn(String.format("receive reply message, but not matched any request, REQUEST_UNIQ_ID: %s", uniqueId)); + log.warn(String.format("receive reply message, but not matched any request, CorrelationId: %s", correlationId)); } } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index c2736bc0cb54caa7bf3b73b7b6f216f43f3cc754..c1c37737d8bff2593d5c7cbc9a742e3e4725b0b6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -85,13 +85,12 @@ import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHe import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.common.sysflag.MessageSysFlag; -import org.apache.rocketmq.common.utils.RequestIdUtil; +import org.apache.rocketmq.common.utils.CorrelationIdUtil; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingException; -import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; @@ -1341,11 +1340,11 @@ public class DefaultMQProducerImpl implements MQProducerInner { long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException { long beginTimestamp = System.currentTimeMillis(); prepareSendRequest(msg, timeout); - final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); + final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID); try { - final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, null); - RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture); + final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null); + RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture); long cost = System.currentTimeMillis() - beginTimestamp; this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() { @@ -1373,7 +1372,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { } return responseMessage; } finally { - RequestFutureTable.getRequestFutureTable().remove(requestUniqId); + RequestFutureTable.getRequestFutureTable().remove(correlationId); } } @@ -1381,10 +1380,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { throws RemotingException, InterruptedException, MQClientException, MQBrokerException { long beginTimestamp = System.currentTimeMillis(); prepareSendRequest(msg, timeout); - final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); + final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID); - final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback); - RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture); + final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, requestCallback); + RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture); long cost = System.currentTimeMillis() - beginTimestamp; this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() { @@ -1396,7 +1395,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { @Override public void onException(Throwable e) { requestResponseFuture.setCause(e); - requestFail(requestUniqId); + requestFail(correlationId); } }, timeout - cost); } @@ -1406,11 +1405,11 @@ public class DefaultMQProducerImpl implements MQProducerInner { InterruptedException, RequestTimeoutException { long beginTimestamp = System.currentTimeMillis(); prepareSendRequest(msg, timeout); - final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); + final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID); try { - final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, null); - RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture); + final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null); + RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture); long cost = System.currentTimeMillis() - beginTimestamp; this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() { @@ -1438,7 +1437,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { } return responseMessage; } finally { - RequestFutureTable.getRequestFutureTable().remove(requestUniqId); + RequestFutureTable.getRequestFutureTable().remove(correlationId); } } @@ -1447,10 +1446,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { throws RemotingException, InterruptedException, MQClientException, MQBrokerException { long beginTimestamp = System.currentTimeMillis(); prepareSendRequest(msg, timeout); - final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); + final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID); - final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback); - RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture); + final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, requestCallback); + RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture); long cost = System.currentTimeMillis() - beginTimestamp; this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() { @@ -1462,7 +1461,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { @Override public void onException(Throwable e) { requestResponseFuture.setCause(e); - requestFail(requestUniqId); + requestFail(correlationId); } }, timeout - cost); @@ -1472,11 +1471,11 @@ public class DefaultMQProducerImpl implements MQProducerInner { throws MQClientException, RemotingException, MQBrokerException, InterruptedException, RequestTimeoutException { long beginTimestamp = System.currentTimeMillis(); prepareSendRequest(msg, timeout); - final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); + final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID); try { - final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, null); - RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture); + final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null); + RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture); long cost = System.currentTimeMillis() - beginTimestamp; this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() { @@ -1504,7 +1503,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { } return responseMessage; } finally { - RequestFutureTable.getRequestFutureTable().remove(requestUniqId); + RequestFutureTable.getRequestFutureTable().remove(correlationId); } } @@ -1512,10 +1511,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { throws RemotingException, InterruptedException, MQClientException, MQBrokerException { long beginTimestamp = System.currentTimeMillis(); prepareSendRequest(msg, timeout); - final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); + final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID); - final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback); - RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture); + final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, requestCallback); + RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture); long cost = System.currentTimeMillis() - beginTimestamp; this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() { @@ -1527,13 +1526,13 @@ public class DefaultMQProducerImpl implements MQProducerInner { @Override public void onException(Throwable e) { requestResponseFuture.setCause(e); - requestFail(requestUniqId); + requestFail(correlationId); } }, null, timeout - cost); } - private void requestFail(final String requestUniqId) { - RequestResponseFuture responseFuture = RequestFutureTable.getRequestFutureTable().remove(requestUniqId); + private void requestFail(final String correlationId) { + RequestResponseFuture responseFuture = RequestFutureTable.getRequestFutureTable().remove(correlationId); if (responseFuture != null) { responseFuture.setSendReqeustOk(false); responseFuture.putResponseMessage(null); @@ -1546,9 +1545,9 @@ public class DefaultMQProducerImpl implements MQProducerInner { } private void prepareSendRequest(final Message msg, long timeout) { - String requestUniqId = RequestIdUtil.createUniqueRequestId(); + String correlationId = CorrelationIdUtil.createCorrelationId(); String requestClientId = this.getmQClientFactory().getClientId(); - MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REQUEST_UNIQ_ID, requestUniqId); + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_CORRELATION_ID, correlationId); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_REPLY_TO, requestClientId); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_TTL, String.valueOf(timeout)); diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java b/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java index 6ce53119ea8f187ee85da5f3d9ae16d29c8f93ca..3d4caa208bd203c8842c389f763181f20f36a0b0 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java @@ -45,7 +45,7 @@ public class RequestFutureTable { if (rep.isTimeout()) { it.remove(); rfList.add(rep); - log.warn("remove timeout request, REQUEST_UNIQ_ID={}" + rep.getRequestUniqId()); + log.warn("remove timeout request, CorrelationId={}" + rep.getCorrelationId()); } } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java b/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java index c476ba30f1f7256b137cdb48f7494098826ceed8..c54b236b1616fcf9d4f586af8ac2f1e52d09c85f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.message.Message; public class RequestResponseFuture { - private final String requestUniqId; + private final String correlationId; private final RequestCallback requestCallback; private final long beginTimestamp = System.currentTimeMillis(); private final Message requestMsg = null; @@ -32,8 +32,8 @@ public class RequestResponseFuture { private volatile boolean sendRequestOk = true; private volatile Throwable cause = null; - public RequestResponseFuture(String requestUniqId, long timeoutMillis, RequestCallback requestCallback) { - this.requestUniqId = requestUniqId; + public RequestResponseFuture(String correlationId, long timeoutMillis, RequestCallback requestCallback) { + this.correlationId = correlationId; this.timeoutMillis = timeoutMillis; this.requestCallback = requestCallback; } @@ -63,8 +63,8 @@ public class RequestResponseFuture { this.countDownLatch.countDown(); } - public String getRequestUniqId() { - return requestUniqId; + public String getCorrelationId() { + return correlationId; } public long getTimeoutMillis() { diff --git a/client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java b/client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java index 6db470fd5d1f33e8b11f16d3c524c3f0662c35e8..626b4525bb2adf230d01da0e864822628bfc34ee 100644 --- a/client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java +++ b/client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java @@ -30,14 +30,14 @@ public class MessageUtil { Message replyMessage = new Message(); String cluster = requestMessage.getProperty(MessageConst.PROPERTY_CLUSTER); String replyTo = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO); - String requestUniqId = requestMessage.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); + String correlationId = requestMessage.getProperty(MessageConst.PROPERTY_CORRELATION_ID); String ttl = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_TTL); replyMessage.setBody(body); if (cluster != null) { String replyTopic = MixAll.getReplyTopic(cluster); replyMessage.setTopic(replyTopic); MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TYPE, MixAll.REPLY_MESSAGE_FLAG); - MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_REQUEST_UNIQ_ID, requestUniqId); + MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_CORRELATION_ID, correlationId); MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_REPLY_TO, replyTo); MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TTL, ttl); diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java index 178e417c7a111716eba9ed7010fd048ecc835fe5..f773c2b945fe8ad6baa4379ef42fbce7ffcee0aa 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java @@ -45,7 +45,7 @@ public class MessageConst { public static final String PROPERTY_TRANSACTION_CHECK_TIMES = "TRANSACTION_CHECK_TIMES"; public static final String PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS = "CHECK_IMMUNITY_TIME_IN_SECONDS"; public static final String PROPERTY_INSTANCE_ID = "INSTANCE_ID"; - public static final String PROPERTY_REQUEST_UNIQ_ID = "REQUEST_UNIQ_ID"; + public static final String PROPERTY_CORRELATION_ID = "CORRELATION_ID"; public static final String PROPERTY_MESSAGE_REPLY_TO = "REPLY_TO"; public static final String PROPERTY_MESSAGE_TTL = "TTL"; public static final String PROPERTY_REPLY_MESSAGE_ARRIVE_TIME = "ARRIVE_TIME"; @@ -81,7 +81,7 @@ public class MessageConst { STRING_HASH_SET.add(PROPERTY_MAX_RECONSUME_TIMES); STRING_HASH_SET.add(PROPERTY_CONSUME_START_TIMESTAMP); STRING_HASH_SET.add(PROPERTY_INSTANCE_ID); - STRING_HASH_SET.add(PROPERTY_REQUEST_UNIQ_ID); + STRING_HASH_SET.add(PROPERTY_CORRELATION_ID); STRING_HASH_SET.add(PROPERTY_MESSAGE_REPLY_TO); STRING_HASH_SET.add(PROPERTY_MESSAGE_TTL); STRING_HASH_SET.add(PROPERTY_REPLY_MESSAGE_ARRIVE_TIME); diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/RequestIdUtil.java b/common/src/main/java/org/apache/rocketmq/common/utils/CorrelationIdUtil.java similarity index 91% rename from common/src/main/java/org/apache/rocketmq/common/utils/RequestIdUtil.java rename to common/src/main/java/org/apache/rocketmq/common/utils/CorrelationIdUtil.java index 86c5096df5d8e66d3c30beb0bb085c443ee4048a..86d1fd4d42dbf21db700df4ce84667c2d4980287 100644 --- a/common/src/main/java/org/apache/rocketmq/common/utils/RequestIdUtil.java +++ b/common/src/main/java/org/apache/rocketmq/common/utils/CorrelationIdUtil.java @@ -19,8 +19,8 @@ package org.apache.rocketmq.common.utils; import java.util.UUID; -public class RequestIdUtil { - public static String createUniqueRequestId() { +public class CorrelationIdUtil { + public static String createCorrelationId() { return UUID.randomUUID().toString(); } }