提交 9422bdb3 编写于 作者: 王松松

correct spelling mistake

上级 bfa7f1ac
......@@ -16,19 +16,6 @@
*/
package org.apache.rocketmq.client.impl.consumer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
......@@ -48,6 +35,20 @@ import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
private static final Logger log = ClientLogger.getLog();
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
......@@ -163,7 +164,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
final long beginTime = System.currentTimeMillis();
log.info("consumeMessageDirectly receive new messge: {}", msg);
log.info("consumeMessageDirectly receive new message: {}", msg);
try {
ConsumeConcurrentlyStatus status = this.messageListener.consumeMessage(msgs, context);
......
......@@ -16,16 +16,6 @@
*/
package org.apache.rocketmq.client.impl.consumer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
......@@ -48,6 +38,17 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
private static final Logger log = ClientLogger.getLog();
private final static long MAX_TIME_CONSUME_CONTINUOUSLY =
......@@ -143,7 +144,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
final long beginTime = System.currentTimeMillis();
log.info("consumeMessageDirectly receive new messge: {}", msg);
log.info("consumeMessageDirectly receive new message: {}", msg);
try {
ConsumeOrderlyStatus status = this.messageListener.consumeMessage(msgs, context);
......
......@@ -16,13 +16,6 @@
*/
package org.apache.rocketmq.store;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
......@@ -39,6 +32,14 @@ import org.apache.rocketmq.store.schedule.ScheduleMessageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* Store all metadata downtime for recovery, data protection reliability
*/
......@@ -183,7 +184,7 @@ public class CommitLog {
index++;
if (index >= mappedFiles.size()) {
// Current branch can not happen
log.info("recover last 3 physics file over, last maped file " + mappedFile.getFileName());
log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
......@@ -417,7 +418,7 @@ public class CommitLog {
for (; index >= 0; index--) {
mappedFile = mappedFiles.get(index);
if (this.isMappedFileMatchedRecover(mappedFile)) {
log.info("recover from this maped file " + mappedFile.getFileName());
log.info("recover from this mapped file " + mappedFile.getFileName());
break;
}
}
......@@ -459,7 +460,7 @@ public class CommitLog {
if (index >= mappedFiles.size()) {
// The current branch under normal circumstances should
// not happen
log.info("recover physics file over, last maped file " + mappedFile.getFileName());
log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
......@@ -585,7 +586,7 @@ public class CommitLog {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
......@@ -600,7 +601,7 @@ public class CommitLog {
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
......@@ -735,7 +736,7 @@ public class CommitLog {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("Create maped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
......@@ -750,7 +751,7 @@ public class CommitLog {
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("Create maped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
......
......@@ -16,14 +16,15 @@
*/
package org.apache.rocketmq.store;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.List;
public class ConsumeQueue {
public static final int CQ_STORE_UNIT_SIZE = 20;
......@@ -121,7 +122,7 @@ public class ConsumeQueue {
index++;
if (index >= mappedFiles.size()) {
log.info("recover last consume queue file over, last maped file "
log.info("recover last consume queue file over, last mapped file "
+ mappedFile.getFileName());
break;
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册