diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java index c102881c1ca5e5c2f772b21ddf2175579553da92..ce1b50b67a863a1c297136e7e303001e8fefddba 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java @@ -83,7 +83,7 @@ public class HAConnection { private final Selector selector; private final SocketChannel socketChannel; private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE); - private int processPostion = 0; + private int processPosition = 0; private volatile long lastReadTimestamp = System.currentTimeMillis(); public ReadSocketService(final SocketChannel socketChannel) throws IOException { @@ -150,7 +150,7 @@ public class HAConnection { if (!this.byteBufferRead.hasRemaining()) { this.byteBufferRead.flip(); - this.processPostion = 0; + this.processPosition = 0; } while (this.byteBufferRead.hasRemaining()) { @@ -159,10 +159,10 @@ public class HAConnection { if (readSize > 0) { readSizeZeroTimes = 0; this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now(); - if ((this.byteBufferRead.position() - this.processPostion) >= 8) { + if ((this.byteBufferRead.position() - this.processPosition) >= 8) { int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8); long readOffset = this.byteBufferRead.getLong(pos - 8); - this.processPostion = pos; + this.processPosition = pos; HAConnection.this.slaveAckOffset = readOffset; if (HAConnection.this.slaveRequestOffset < 0) { diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java index 55bea0b2d7dd16080e355f8b8438faa412827794..bbadd1bcfe8ef991f017eeb23145b204ce4567e4 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java @@ -332,7 +332,7 @@ public class HAService { private long lastWriteTimestamp = System.currentTimeMillis(); private long currentReportedOffset = 0; - private int dispatchPostion = 0; + private int dispatchPosition = 0; private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE); private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE); @@ -378,9 +378,9 @@ public class HAService { } private void reallocateByteBuffer() { - int remain = READ_MAX_BUFFER_SIZE - this.dispatchPostion; + int remain = READ_MAX_BUFFER_SIZE - this.dispatchPosition; if (remain > 0) { - this.byteBufferRead.position(this.dispatchPostion); + this.byteBufferRead.position(this.dispatchPosition); this.byteBufferBackup.position(0); this.byteBufferBackup.limit(READ_MAX_BUFFER_SIZE); @@ -391,7 +391,7 @@ public class HAService { this.byteBufferRead.position(remain); this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE); - this.dispatchPostion = 0; + this.dispatchPosition = 0; } private void swapByteBuffer() { @@ -435,10 +435,10 @@ public class HAService { int readSocketPos = this.byteBufferRead.position(); while (true) { - int diff = this.byteBufferRead.position() - this.dispatchPostion; + int diff = this.byteBufferRead.position() - this.dispatchPosition; if (diff >= msgHeaderSize) { - long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion); - int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8); + long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition); + int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8); long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset(); @@ -452,13 +452,13 @@ public class HAService { if (diff >= (msgHeaderSize + bodySize)) { byte[] bodyData = new byte[bodySize]; - this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize); + this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize); this.byteBufferRead.get(bodyData); HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData); this.byteBufferRead.position(readSocketPos); - this.dispatchPostion += msgHeaderSize + bodySize; + this.dispatchPosition += msgHeaderSize + bodySize; if (!reportSlaveMaxOffsetPlus()) { return false; @@ -532,7 +532,7 @@ public class HAService { } this.lastWriteTimestamp = 0; - this.dispatchPostion = 0; + this.dispatchPosition = 0; this.byteBufferBackup.position(0); this.byteBufferBackup.limit(READ_MAX_BUFFER_SIZE);