提交 b2cbf4e8 编写于 作者: M Matteo Merli 提交者: GitHub

Fixed stats buffer rotation (#17)

上级 47a213c1
......@@ -15,7 +15,6 @@
*/
package com.yahoo.pulsar.broker.admin;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Map;
......@@ -33,13 +32,6 @@ import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.stats.AllocatorStats;
import com.yahoo.pulsar.broker.loadbalance.ResourceUnit;
import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import com.yahoo.pulsar.broker.stats.AllocatorStatsGenerator;
......@@ -47,9 +39,14 @@ import com.yahoo.pulsar.broker.stats.BookieClientStatsGenerator;
import com.yahoo.pulsar.broker.stats.MBeanStatsGenerator;
import com.yahoo.pulsar.broker.stats.Metrics;
import com.yahoo.pulsar.broker.web.RestException;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.stats.AllocatorStats;
import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
@Path("/broker-stats")
@Api(value = "/broker-stats", description = "Stats for broker", tags = "broker-stats")
......@@ -99,19 +96,13 @@ public class BrokerStats extends AdminResource {
public StreamingOutput getDestinations2() throws Exception {
// Ensure super user access only
validateSuperUserAccess();
return new StreamingOutput() {
public void write(OutputStream output) throws IOException, WebApplicationException {
ByteBuf statsBuf = null;
try {
statsBuf = pulsar().getBrokerService().getDimensionMetrics();
output.write(statsBuf.array(), statsBuf.arrayOffset(), statsBuf.readableBytes());
} catch (Exception e) {
throw new WebApplicationException(e);
} finally {
ReferenceCountUtil.release(statsBuf);
}
return output -> pulsar().getBrokerService().getDimensionMetrics(statsBuf -> {
try {
output.write(statsBuf.array(), statsBuf.arrayOffset(), statsBuf.readableBytes());
} catch (Exception e) {
throw new WebApplicationException(e);
}
};
});
}
@GET
......
......@@ -37,6 +37,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
......@@ -591,8 +592,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
}
}
public ByteBuf getDimensionMetrics() {
return pulsarStats.getDimensionMetrics();
public void getDimensionMetrics(Consumer<ByteBuf> consumer) {
pulsarStats.getDimensionMetrics(consumer);
}
public List<Metrics> getDestinationMetrics() {
......
......@@ -18,6 +18,8 @@ package com.yahoo.pulsar.broker.service;
import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -51,6 +53,8 @@ public class PulsarStats implements Closeable {
private List<Metrics> metricsCollection;
private final BrokerOperabilityMetrics brokerOperabilityMetrics;
private final ReentrantReadWriteLock bufferLock = new ReentrantReadWriteLock();
public PulsarStats(PulsarService pulsar) {
this.topicStatsBuf = PooledByteBufAllocator.DEFAULT.heapBuffer(16 * 1024);
this.tempTopicStatsBuf = PooledByteBufAllocator.DEFAULT.heapBuffer(16 * 1024);
......@@ -148,21 +152,23 @@ public class PulsarStats implements Closeable {
metricsCollection = tempMetricsCollection;
tempMetricsCollection = tempRefMetrics;
ByteBuf tmp = topicStatsBuf;
topicStatsBuf = tempTopicStatsBuf;
tempTopicStatsBuf = tmp;
bufferLock.writeLock().lock();
try {
ByteBuf tmp = topicStatsBuf;
topicStatsBuf = tempTopicStatsBuf;
tempTopicStatsBuf = tmp;
tempTopicStatsBuf.clear();
} finally {
bufferLock.writeLock().unlock();
}
}
public ByteBuf getDimensionMetrics() {
while (true) {
ByteBuf topicStatsBuf = this.topicStatsBuf;
try {
topicStatsBuf.retain();
return topicStatsBuf;
} catch (Exception e) {
// Re-fetch the buffer, since it have been swapped and release
continue;
}
public void getDimensionMetrics(Consumer<ByteBuf> consumer) {
bufferLock.readLock().lock();
try {
consumer.accept(topicStatsBuf);
} finally {
bufferLock.readLock().unlock();
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册