diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java
index 968bcfb564f9fe67e48d84371a47089a4572783a..373a0a48b672002401603f03158e0f98224fe03d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java
@@ -53,6 +53,11 @@ public class ManyMessageTransfer extends AbstractReferenceCounted implements Fil
return transferred;
}
+ @Override
+ public long transferred() {
+ return transferred;
+ }
+
@Override
public long count() {
return byteBufferHeader.limit() + this.getMessageResult.getBufferTotalSize();
@@ -76,6 +81,28 @@ public class ManyMessageTransfer extends AbstractReferenceCounted implements Fil
return 0;
}
+ @Override
+ public FileRegion retain() {
+ super.retain();
+ return this;
+ }
+
+ @Override
+ public FileRegion retain(int increment) {
+ super.retain(increment);
+ return this;
+ }
+
+ @Override
+ public FileRegion touch() {
+ return this;
+ }
+
+ @Override
+ public FileRegion touch(Object hint) {
+ return this;
+ }
+
public void close() {
this.deallocate();
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java
index b795d2d253bd3ade2d7a102ecdbbe596321d8d15..558f091763e75bd0a3d0fb35b852530756b298d2 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java
@@ -47,6 +47,11 @@ public class OneMessageTransfer extends AbstractReferenceCounted implements File
return transferred;
}
+ @Override
+ public long transferred() {
+ return transferred;
+ }
+
@Override
public long count() {
return this.byteBufferHeader.limit() + this.selectMappedBufferResult.getSize();
@@ -65,6 +70,28 @@ public class OneMessageTransfer extends AbstractReferenceCounted implements File
return 0;
}
+ @Override
+ public FileRegion retain() {
+ super.retain();
+ return this;
+ }
+
+ @Override
+ public FileRegion retain(int increment) {
+ super.retain(increment);
+ return this;
+ }
+
+ @Override
+ public FileRegion touch() {
+ return this;
+ }
+
+ @Override
+ public FileRegion touch(Object hint) {
+ return this;
+ }
+
public void close() {
this.deallocate();
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java
index e8f30996677a72cd2159baec3375ad42ad9a6c1a..db47b9e50e346652052065b06f51412eec25cf1d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java
@@ -53,6 +53,11 @@ public class QueryMessageTransfer extends AbstractReferenceCounted implements Fi
return transferred;
}
+ @Override
+ public long transferred() {
+ return transferred;
+ }
+
@Override
public long count() {
return byteBufferHeader.limit() + this.queryMessageResult.getBufferTotalSize();
@@ -76,6 +81,28 @@ public class QueryMessageTransfer extends AbstractReferenceCounted implements Fi
return 0;
}
+ @Override
+ public FileRegion retain() {
+ super.retain();
+ return this;
+ }
+
+ @Override
+ public FileRegion retain(int increment) {
+ super.retain(increment);
+ return this;
+ }
+
+ @Override
+ public FileRegion touch() {
+ return this;
+ }
+
+ @Override
+ public FileRegion touch(Object hint) {
+ return this;
+ }
+
public void close() {
this.deallocate();
}
diff --git a/pom.xml b/pom.xml
index b6f1770e2d85c6295c02f5077926e158d1d8f872..59fe5c8072b2af583a0ec43d65ab268de58f79f9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -537,7 +537,7 @@
io.netty
netty-all
- 4.0.42.Final
+ 4.1.65.Final
com.alibaba
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyLogger.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyLogger.java
index 4b4e86e68715c986c6823a310406221b785c62ff..1a0c9b53e32424857fc6d575a0989ac12875a4fe 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyLogger.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyLogger.java
@@ -52,6 +52,8 @@ public class NettyLogger {
private InternalLogger logger = null;
+ private static final String EXCEPTION_MESSAGE = "Unexpected exception:";
+
public NettyBridgeLogger(String name) {
logger = InternalLoggerFactory.getLogger(name);
}
@@ -161,6 +163,25 @@ public class NettyLogger {
}
}
+ @Override
+ public void log(InternalLogLevel internalLogLevel, Throwable throwable) {
+ if (internalLogLevel.equals(InternalLogLevel.DEBUG)) {
+ logger.debug(EXCEPTION_MESSAGE, throwable);
+ }
+ if (internalLogLevel.equals(InternalLogLevel.TRACE)) {
+ logger.info(EXCEPTION_MESSAGE, throwable);
+ }
+ if (internalLogLevel.equals(InternalLogLevel.INFO)) {
+ logger.info(EXCEPTION_MESSAGE, throwable);
+ }
+ if (internalLogLevel.equals(InternalLogLevel.WARN)) {
+ logger.warn(EXCEPTION_MESSAGE, throwable);
+ }
+ if (internalLogLevel.equals(InternalLogLevel.ERROR)) {
+ logger.error(EXCEPTION_MESSAGE, throwable);
+ }
+ }
+
@Override
public boolean isTraceEnabled() {
return isEnabled(InternalLogLevel.TRACE);
@@ -191,6 +212,11 @@ public class NettyLogger {
logger.info(var1, var2);
}
+ @Override
+ public void trace(Throwable var1) {
+ logger.info(EXCEPTION_MESSAGE, var1);
+ }
+
@Override
public boolean isDebugEnabled() {
return isEnabled(InternalLogLevel.DEBUG);
@@ -221,6 +247,11 @@ public class NettyLogger {
logger.debug(var1, var2);
}
+ @Override
+ public void debug(Throwable var1) {
+ logger.debug(EXCEPTION_MESSAGE, var1);
+ }
+
@Override
public boolean isInfoEnabled() {
return isEnabled(InternalLogLevel.INFO);
@@ -251,6 +282,11 @@ public class NettyLogger {
logger.info(var1, var2);
}
+ @Override
+ public void info(Throwable var1) {
+ logger.info(EXCEPTION_MESSAGE, var1);
+ }
+
@Override
public boolean isWarnEnabled() {
return isEnabled(InternalLogLevel.WARN);
@@ -281,6 +317,11 @@ public class NettyLogger {
logger.warn(var1, var2);
}
+ @Override
+ public void warn(Throwable var1) {
+ logger.warn(EXCEPTION_MESSAGE, var1);
+ }
+
@Override
public boolean isErrorEnabled() {
return isEnabled(InternalLogLevel.ERROR);
@@ -310,6 +351,11 @@ public class NettyLogger {
public void error(String var1, Throwable var2) {
logger.error(var1, var2);
}
+
+ @Override
+ public void error(Throwable var1) {
+ logger.error(EXCEPTION_MESSAGE, var1);
+ }
}
}