diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 643b287303cb1aaea678ea0be8eb6e084fc15a8b..073e3679be0462ed657a942c1a63ba0698f04931 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -643,72 +643,64 @@ public class MQClientAPIImpl { final Message msg, final RemotingCommand response ) throws MQBrokerException, RemotingCommandException { + SendStatus sendStatus; switch (response.getCode()) { - case ResponseCode.FLUSH_DISK_TIMEOUT: - case ResponseCode.FLUSH_SLAVE_TIMEOUT: + case ResponseCode.FLUSH_DISK_TIMEOUT: { + sendStatus = SendStatus.FLUSH_DISK_TIMEOUT; + break; + } + case ResponseCode.FLUSH_SLAVE_TIMEOUT: { + sendStatus = SendStatus.FLUSH_SLAVE_TIMEOUT; + break; + } case ResponseCode.SLAVE_NOT_AVAILABLE: { + sendStatus = SendStatus.SLAVE_NOT_AVAILABLE; + break; } case ResponseCode.SUCCESS: { - SendStatus sendStatus = SendStatus.SEND_OK; - switch (response.getCode()) { - case ResponseCode.FLUSH_DISK_TIMEOUT: - sendStatus = SendStatus.FLUSH_DISK_TIMEOUT; - break; - case ResponseCode.FLUSH_SLAVE_TIMEOUT: - sendStatus = SendStatus.FLUSH_SLAVE_TIMEOUT; - break; - case ResponseCode.SLAVE_NOT_AVAILABLE: - sendStatus = SendStatus.SLAVE_NOT_AVAILABLE; - break; - case ResponseCode.SUCCESS: - sendStatus = SendStatus.SEND_OK; - break; - default: - assert false; - break; - } + sendStatus = SendStatus.SEND_OK; + break; + } + default: { + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + } - SendMessageResponseHeader responseHeader = - (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class); + SendMessageResponseHeader responseHeader = + (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class); - //If namespace not null , reset Topic without namespace. - String topic = msg.getTopic(); - if (StringUtils.isNotEmpty(this.clientConfig.getNamespace())) { - topic = NamespaceUtil.withoutNamespace(topic, this.clientConfig.getNamespace()); - } + //If namespace not null , reset Topic without namespace. + String topic = msg.getTopic(); + if (StringUtils.isNotEmpty(this.clientConfig.getNamespace())) { + topic = NamespaceUtil.withoutNamespace(topic, this.clientConfig.getNamespace()); + } - MessageQueue messageQueue = new MessageQueue(topic, brokerName, responseHeader.getQueueId()); + MessageQueue messageQueue = new MessageQueue(topic, brokerName, responseHeader.getQueueId()); - String uniqMsgId = MessageClientIDSetter.getUniqID(msg); - if (msg instanceof MessageBatch) { - StringBuilder sb = new StringBuilder(); - for (Message message : (MessageBatch) msg) { - sb.append(sb.length() == 0 ? "" : ",").append(MessageClientIDSetter.getUniqID(message)); - } - uniqMsgId = sb.toString(); - } - SendResult sendResult = new SendResult(sendStatus, - uniqMsgId, - responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset()); - sendResult.setTransactionId(responseHeader.getTransactionId()); - String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION); - String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH); - if (regionId == null || regionId.isEmpty()) { - regionId = MixAll.DEFAULT_TRACE_REGION_ID; - } - if (traceOn != null && traceOn.equals("false")) { - sendResult.setTraceOn(false); - } else { - sendResult.setTraceOn(true); - } - sendResult.setRegionId(regionId); - return sendResult; - } - default: - break; + String uniqMsgId = MessageClientIDSetter.getUniqID(msg); + if (msg instanceof MessageBatch) { + StringBuilder sb = new StringBuilder(); + for (Message message : (MessageBatch) msg) { + sb.append(sb.length() == 0 ? "" : ",").append(MessageClientIDSetter.getUniqID(message)); + } + uniqMsgId = sb.toString(); + } + SendResult sendResult = new SendResult(sendStatus, + uniqMsgId, + responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset()); + sendResult.setTransactionId(responseHeader.getTransactionId()); + String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION); + String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH); + if (regionId == null || regionId.isEmpty()) { + regionId = MixAll.DEFAULT_TRACE_REGION_ID; + } + if (traceOn != null && traceOn.equals("false")) { + sendResult.setTraceOn(false); + } else { + sendResult.setTraceOn(true); } - - throw new MQBrokerException(response.getCode(), response.getRemark()); + sendResult.setRegionId(regionId); + return sendResult; } public PullResult pullMessage(