diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java index 6befbf3b5e92f4bc7531892a77e7cb60cc00a528..5ffb6ac2a3819770c6c783b48eb9a5e0d33dc8a3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java @@ -40,18 +40,16 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume protected final transient DefaultMQPullConsumerImpl defaultMQPullConsumerImpl; /** - * Do the same thing for the same Group, the application must be set,and - * guarantee Globally unique + * Do the same thing for the same Group, the application must be set,and guarantee Globally unique */ private String consumerGroup; /** - * Long polling mode, the Consumer connection max suspend time, it is not - * recommended to modify + * Long polling mode, the Consumer connection max suspend time, it is not recommended to modify */ private long brokerSuspendMaxTimeMillis = 1000 * 20; /** - * Long polling mode, the Consumer connection timeout(must greater than - * brokerSuspendMaxTimeMillis), it is not recommended to modify + * Long polling mode, the Consumer connection timeout(must greater than brokerSuspendMaxTimeMillis), it is not + * recommended to modify */ private long consumerTimeoutMillisWhenSuspend = 1000 * 30; /** @@ -102,42 +100,74 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook); } + /** + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + */ + @Deprecated @Override public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { createTopic(key, newTopic, queueNum, 0); } + /** + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + */ + @Deprecated @Override public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { this.defaultMQPullConsumerImpl.createTopic(key, newTopic, queueNum, topicSysFlag); } + /** + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + */ + @Deprecated @Override public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { return this.defaultMQPullConsumerImpl.searchOffset(mq, timestamp); } + /** + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + */ + @Deprecated @Override public long maxOffset(MessageQueue mq) throws MQClientException { return this.defaultMQPullConsumerImpl.maxOffset(mq); } + /** + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + */ + @Deprecated @Override public long minOffset(MessageQueue mq) throws MQClientException { return this.defaultMQPullConsumerImpl.minOffset(mq); } + /** + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + */ + @Deprecated @Override public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { return this.defaultMQPullConsumerImpl.earliestMsgStoreTime(mq); } + /** + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + */ + @Deprecated @Override public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { return this.defaultMQPullConsumerImpl.viewMessage(offsetMsgId); } + /** + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + */ + @Deprecated @Override public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException, InterruptedException { @@ -156,6 +186,11 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume return brokerSuspendMaxTimeMillis; } + + /** + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + */ + @Deprecated public void setBrokerSuspendMaxTimeMillis(long brokerSuspendMaxTimeMillis) { this.brokerSuspendMaxTimeMillis = brokerSuspendMaxTimeMillis; } @@ -342,14 +377,27 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, brokerName, consumerGroup); } + /** + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + */ + @Deprecated public OffsetStore getOffsetStore() { return offsetStore; } + + /** + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + */ + @Deprecated public void setOffsetStore(OffsetStore offsetStore) { this.offsetStore = offsetStore; } + /** + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + */ + @Deprecated public DefaultMQPullConsumerImpl getDefaultMQPullConsumerImpl() { return defaultMQPullConsumerImpl; } diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 6cd2ad18a5a3cdc736e6bd235910d338eaea9335..4ea741c9c05bb43dae52cb247f241f14b58fb08d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -160,7 +160,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume /** * Max consumer thread number */ - private int consumeThreadMax = 64; + private int consumeThreadMax = 20; /** * Threshold for dynamic adjustment of the number of thread pool @@ -346,48 +346,84 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume this(consumerGroup, null, new AllocateMessageQueueAveragely()); } + /** + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + */ + @Deprecated @Override public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { createTopic(key, newTopic, queueNum, 0); } + /** + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + */ + @Deprecated @Override public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { this.defaultMQPushConsumerImpl.createTopic(key, newTopic, queueNum, topicSysFlag); } + /** + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + */ + @Deprecated @Override public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { return this.defaultMQPushConsumerImpl.searchOffset(mq, timestamp); } + /** + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + */ + @Deprecated @Override public long maxOffset(MessageQueue mq) throws MQClientException { return this.defaultMQPushConsumerImpl.maxOffset(mq); } + /** + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + */ + @Deprecated @Override public long minOffset(MessageQueue mq) throws MQClientException { return this.defaultMQPushConsumerImpl.minOffset(mq); } + /** + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + */ + @Deprecated @Override public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { return this.defaultMQPushConsumerImpl.earliestMsgStoreTime(mq); } + /** + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + */ + @Deprecated @Override public MessageExt viewMessage( String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { return this.defaultMQPushConsumerImpl.viewMessage(offsetMsgId); } + /** + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + */ + @Deprecated @Override public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException, InterruptedException { return this.defaultMQPushConsumerImpl.queryMessage(topic, key, maxNum, begin, end); } + /** + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + */ + @Deprecated @Override public MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { @@ -456,6 +492,10 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume this.consumeThreadMin = consumeThreadMin; } + /** + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + */ + @Deprecated public DefaultMQPushConsumerImpl getDefaultMQPushConsumerImpl() { return defaultMQPushConsumerImpl; } @@ -528,6 +568,10 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume return subscription; } + /** + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + */ + @Deprecated public void setSubscription(Map subscription) { this.subscription = subscription; } @@ -702,10 +746,18 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume this.defaultMQPushConsumerImpl.resume(); } + /** + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + */ + @Deprecated public OffsetStore getOffsetStore() { return offsetStore; } + /** + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + */ + @Deprecated public void setOffsetStore(OffsetStore offsetStore) { this.offsetStore = offsetStore; } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 2339142616a1602e9c671d5a7a8080a2b51190bf..1f8729e5f9335daedf7115a19e739098139a5d7f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -155,18 +155,20 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * @param producerGroup Producer group, see the name-sake field. * @param rpcHook RPC hook to execute per each remoting command execution. * @param enableMsgTrace Switch flag instance for message trace. - * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name. + * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default + * trace topic name. */ - public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) { + public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, + final String customizedTraceTopic) { this.producerGroup = producerGroup; defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); //if client open the message trace feature if (enableMsgTrace) { try { AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook); - dispatcher.setHostProducer(this.getDefaultMQProducerImpl()); + dispatcher.setHostProducer(this.defaultMQProducerImpl); traceDispatcher = dispatcher; - this.getDefaultMQProducerImpl().registerSendMessageHook( + this.defaultMQProducerImpl.registerSendMessageHook( new SendMessageTraceHookImpl(traceDispatcher)); } catch (Throwable e) { log.error("system mqtrace hook init failed ,maybe can't send msg trace data"); @@ -193,13 +195,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { this(producerGroup, null, enableMsgTrace, null); } - /** * Constructor specifying producer group, enabled msgTrace flag and customized trace topic name. * * @param producerGroup Producer group, see the name-sake field. * @param enableMsgTrace Switch flag instance for message trace. - * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name. + * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default + * trace topic name. */ public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic) { this(producerGroup, null, enableMsgTrace, customizedTraceTopic); @@ -207,7 +209,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Constructor specifying the RPC hook. - * + * * @param rpcHook RPC hook to execute per each remoting command execution. */ public DefaultMQProducer(RPCHook rpcHook) { @@ -308,9 +310,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * This method returns immediately. On sending completion, sendCallback will be executed. *

* - * Similar to {@link #send(Message)}, internal implementation would potentially retry up to - * {@link #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication - * and application developers are the one to resolve this potential issue. + * Similar to {@link #send(Message)}, internal implementation would potentially retry up to {@link + * #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication and + * application developers are the one to resolve this potential issue. * * @param msg Message to send. * @param sendCallback Callback to execute on sending completed, either successful or unsuccessful. @@ -547,6 +549,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * This method is used to send transactional messages. + * * @param msg Transactional message to send. * @param arg Argument used along with local transaction executor. * @return Transaction result. @@ -559,20 +562,22 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { } /** - * Create a topic on broker. + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. * * @param key accesskey * @param newTopic topic name * @param queueNum topic's queue number * @throws MQClientException if there is any client error. */ + @Deprecated @Override public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { createTopic(key, newTopic, queueNum, 0); } /** - * Create a topic on broker. + * Create a topic on broker. This method will be removed in a certain version after April 5, 2020, so please do not + * use this method. * * @param key accesskey * @param newTopic topic name @@ -580,6 +585,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * @param topicSysFlag topic system flag * @throws MQClientException if there is any client error. */ + @Deprecated @Override public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { this.defaultMQProducerImpl.createTopic(key, newTopic, queueNum, topicSysFlag); @@ -601,10 +607,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Query maximum offset of the given message queue. * + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + * * @param mq Instance of MessageQueue * @return maximum offset of the given consume queue. * @throws MQClientException if there is any client error. */ + @Deprecated @Override public long maxOffset(MessageQueue mq) throws MQClientException { return this.defaultMQProducerImpl.maxOffset(mq); @@ -613,10 +622,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Query minimum offset of the given message queue. * + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + * * @param mq Instance of MessageQueue * @return minimum offset of the given message queue. * @throws MQClientException if there is any client error. */ + @Deprecated @Override public long minOffset(MessageQueue mq) throws MQClientException { return this.defaultMQProducerImpl.minOffset(mq); @@ -625,10 +637,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Query earliest message store time. * + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + * * @param mq Instance of MessageQueue * @return earliest message store time. * @throws MQClientException if there is any client error. */ + @Deprecated @Override public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { return this.defaultMQProducerImpl.earliestMsgStoreTime(mq); @@ -637,6 +652,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Query message of the given offset message ID. * + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + * * @param offsetMsgId message id * @return Message specified. * @throws MQBrokerException if there is any broker error. @@ -644,6 +661,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * @throws RemotingException if there is any network-tier error. * @throws InterruptedException if the sending thread is interrupted. */ + @Deprecated @Override public MessageExt viewMessage( String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { @@ -653,6 +671,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Query message by key. * + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + * * @param topic message topic * @param key message key index word * @param maxNum max message number @@ -662,6 +682,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * @throws MQClientException if there is any client error. * @throws InterruptedException if the thread is interrupted. */ + @Deprecated @Override public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException, InterruptedException { @@ -671,6 +692,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Query message of the given message ID. * + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + * * @param topic Topic * @param msgId Message ID * @return Message specified. @@ -679,6 +702,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * @throws RemotingException if there is any network-tier error. * @throws InterruptedException if the sending thread is interrupted. */ + @Deprecated @Override public MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { @@ -715,8 +739,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { } /** - * Sets an Executor to be used for executing callback methods. - * If the Executor is not set, {@link NettyRemotingClient#publicExecutor} will be used. + * Sets an Executor to be used for executing callback methods. If the Executor is not set, {@link + * NettyRemotingClient#publicExecutor} will be used. * * @param callbackExecutor the instance of Executor */ @@ -781,6 +805,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { this.compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch; } + @Deprecated public DefaultMQProducerImpl getDefaultMQProducerImpl() { return defaultMQProducerImpl; }