提交 90e600ba 编写于 作者: F fjy

Merge pull request #511 from metamx/fix-hll-buffer-and-swap

HLL: avoid direct buffers + fix swap target buffer
......@@ -326,27 +326,26 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
convertToMutableByteBuffer();
}
if (storageBuffer.remaining() != getNumBytesForDenseStorage()) {
convertToDenseStorage();
}
estimatedCardinality = null;
if (getRegisterOffset() < other.getRegisterOffset()) {
// "Swap" the buffers so that we are folding into the one with the higher offset
ByteBuffer newStorage = ByteBuffer.allocateDirect(other.storageBuffer.remaining());
newStorage.put(other.storageBuffer.asReadOnlyBuffer());
newStorage.clear();
other.storageBuffer = storageBuffer;
other.initPosition = initPosition;
storageBuffer = newStorage;
initPosition = 0;
final ByteBuffer tmpBuffer = ByteBuffer.allocate(storageBuffer.remaining());
tmpBuffer.put(storageBuffer.asReadOnlyBuffer());
tmpBuffer.clear();
storageBuffer.duplicate().put(other.storageBuffer.asReadOnlyBuffer());
other = HyperLogLogCollector.makeCollector(tmpBuffer);
}
final ByteBuffer otherBuffer = other.storageBuffer.asReadOnlyBuffer();
final byte otherOffset = other.getRegisterOffset();
if (storageBuffer.remaining() != getNumBytesForDenseStorage()) {
convertToDenseStorage();
}
byte myOffset = getRegisterOffset();
short numNonZero = getNumNonZeroRegisters();
......@@ -540,7 +539,7 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
private void convertToMutableByteBuffer()
{
ByteBuffer tmpBuffer = ByteBuffer.allocateDirect(storageBuffer.remaining());
ByteBuffer tmpBuffer = ByteBuffer.allocate(storageBuffer.remaining());
tmpBuffer.put(storageBuffer.asReadOnlyBuffer());
tmpBuffer.position(0);
storageBuffer = tmpBuffer;
......@@ -549,7 +548,7 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
private void convertToDenseStorage()
{
ByteBuffer tmpBuffer = ByteBuffer.allocateDirect(getNumBytesForDenseStorage());
ByteBuffer tmpBuffer = ByteBuffer.allocate(getNumBytesForDenseStorage());
// put header
setVersion(tmpBuffer);
setRegisterOffset(tmpBuffer, getRegisterOffset());
......
......@@ -67,7 +67,7 @@ public class HyperUniquesBufferAggregator implements BufferAggregator
public Object get(ByteBuffer buf, int position)
{
final int size = HyperLogLogCollector.getLatestNumBytesForDenseStorage();
ByteBuffer dataCopyBuffer = ByteBuffer.allocateDirect(size);
ByteBuffer dataCopyBuffer = ByteBuffer.allocate(size);
ByteBuffer mutationBuffer = buf.duplicate();
mutationBuffer.position(position);
mutationBuffer.limit(position + size);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册