From 85467dfd34d8ff379d2ddfec0489d78dcba20c27 Mon Sep 17 00:00:00 2001 From: qinliujie <765152203@qq.com> Date: Mon, 9 Jan 2017 12:06:23 +0800 Subject: [PATCH] Fix-35 [ROCKETMQ-35] Reslove underlying NPE in ConsumeRequest, closes apache/incubator-rocketmq#32 --- .../impl/consumer/ConsumeMessageConcurrentlyService.java | 6 +++++- .../client/impl/consumer/ConsumeMessageOrderlyService.java | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java index 5440522f..f566ed0f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java @@ -439,7 +439,11 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) { returnType = ConsumeReturnType.SUCCESS; } - consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name()); + + if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { + consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name()); + } + if (null == status) { log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}", ConsumeMessageConcurrentlyService.this.consumerGroup, diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java index 6c92315f..1fa474ca 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java @@ -501,7 +501,11 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { } else if (ConsumeOrderlyStatus.SUCCESS == status) { returnType = ConsumeReturnType.SUCCESS; } - consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name()); + + if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) { + consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name()); + } + if (null == status) { status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } -- GitLab