diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java index f566ed0fcca4781d0eb7cca0b62ad059d286cc0f..6c232a9590bc43398b163efebda89c9cb1707b1f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java @@ -16,19 +16,6 @@ */ package org.apache.rocketmq.client.impl.consumer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; @@ -48,6 +35,20 @@ import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.slf4j.Logger; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + public class ConsumeMessageConcurrentlyService implements ConsumeMessageService { private static final Logger log = ClientLogger.getLog(); private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; @@ -163,7 +164,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService final long beginTime = System.currentTimeMillis(); - log.info("consumeMessageDirectly receive new messge: {}", msg); + log.info("consumeMessageDirectly receive new message: {}", msg); try { ConsumeConcurrentlyStatus status = this.messageListener.consumeMessage(msgs, context); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java index 1fa474caa1d256920ee7aa2ff23ef87ef1a5990a..8c536c52c42508646561bf15ebbe578e683f7f31 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java @@ -16,16 +16,6 @@ */ package org.apache.rocketmq.client.impl.consumer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; @@ -48,6 +38,17 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.slf4j.Logger; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + public class ConsumeMessageOrderlyService implements ConsumeMessageService { private static final Logger log = ClientLogger.getLog(); private final static long MAX_TIME_CONSUME_CONTINUOUSLY = @@ -143,7 +144,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { final long beginTime = System.currentTimeMillis(); - log.info("consumeMessageDirectly receive new messge: {}", msg); + log.info("consumeMessageDirectly receive new message: {}", msg); try { ConsumeOrderlyStatus status = this.messageListener.consumeMessage(msgs, context); diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index b44211c5712cf5e0c2828212d0db5a267b221e9e..1b6af2643e029e36376841f4021d164b49912e34 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -16,13 +16,6 @@ */ package org.apache.rocketmq.store; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; @@ -39,6 +32,14 @@ import org.apache.rocketmq.store.schedule.ScheduleMessageService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + /** * Store all metadata downtime for recovery, data protection reliability */ @@ -183,7 +184,7 @@ public class CommitLog { index++; if (index >= mappedFiles.size()) { // Current branch can not happen - log.info("recover last 3 physics file over, last maped file " + mappedFile.getFileName()); + log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName()); break; } else { mappedFile = mappedFiles.get(index); @@ -417,7 +418,7 @@ public class CommitLog { for (; index >= 0; index--) { mappedFile = mappedFiles.get(index); if (this.isMappedFileMatchedRecover(mappedFile)) { - log.info("recover from this maped file " + mappedFile.getFileName()); + log.info("recover from this mapped file " + mappedFile.getFileName()); break; } } @@ -459,7 +460,7 @@ public class CommitLog { if (index >= mappedFiles.size()) { // The current branch under normal circumstances should // not happen - log.info("recover physics file over, last maped file " + mappedFile.getFileName()); + log.info("recover physics file over, last mapped file " + mappedFile.getFileName()); break; } else { mappedFile = mappedFiles.get(index); @@ -585,7 +586,7 @@ public class CommitLog { mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise } if (null == mappedFile) { - log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); + log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null); } @@ -600,7 +601,7 @@ public class CommitLog { mappedFile = this.mappedFileQueue.getLastMappedFile(0); if (null == mappedFile) { // XXX: warn and notify me - log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); + log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result); } @@ -735,7 +736,7 @@ public class CommitLog { mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise } if (null == mappedFile) { - log.error("Create maped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString()); + log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString()); beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null); } @@ -750,7 +751,7 @@ public class CommitLog { mappedFile = this.mappedFileQueue.getLastMappedFile(0); if (null == mappedFile) { // XXX: warn and notify me - log.error("Create maped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString()); + log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString()); beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result); } diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index d03ff0f32ebe790855665b244024b394e125eee3..2437c9c402bd98adf5517c3f7f692489e165e1de 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -16,14 +16,15 @@ */ package org.apache.rocketmq.store; -import java.io.File; -import java.nio.ByteBuffer; -import java.util.List; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.nio.ByteBuffer; +import java.util.List; + public class ConsumeQueue { public static final int CQ_STORE_UNIT_SIZE = 20; @@ -121,7 +122,7 @@ public class ConsumeQueue { index++; if (index >= mappedFiles.size()) { - log.info("recover last consume queue file over, last maped file " + log.info("recover last consume queue file over, last mapped file " + mappedFile.getFileName()); break; } else {