Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Forever310
druid
提交
cd78b916
D
druid
项目概览
Forever310
/
druid
与 Fork 源项目一致
从无法访问的项目Fork
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
D
druid
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
cd78b916
编写于
5月 30, 2014
作者:
F
fjy
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
remove druid-hll and use hyperUnique
上级
4c133272
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
0 addition
and
1006 deletion
+0
-1006
hll/pom.xml
hll/pom.xml
+0
-81
hll/src/main/java/io/druid/query/aggregation/HyperloglogAggregator.java
...ava/io/druid/query/aggregation/HyperloglogAggregator.java
+0
-137
hll/src/main/java/io/druid/query/aggregation/HyperloglogAggregatorFactory.java
...druid/query/aggregation/HyperloglogAggregatorFactory.java
+0
-209
hll/src/main/java/io/druid/query/aggregation/HyperloglogBufferAggregator.java
.../druid/query/aggregation/HyperloglogBufferAggregator.java
+0
-94
hll/src/main/java/io/druid/query/aggregation/HyperloglogComplexMetricSerde.java
...ruid/query/aggregation/HyperloglogComplexMetricSerde.java
+0
-137
hll/src/main/java/io/druid/query/aggregation/HyperloglogDruidModule.java
...va/io/druid/query/aggregation/HyperloglogDruidModule.java
+0
-140
hll/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
...ces/META-INF/services/io.druid.initialization.DruidModule
+0
-1
hll/src/test/java/io/druid/query/aggregation/HyperloglogAggregatorTest.java
...io/druid/query/aggregation/HyperloglogAggregatorTest.java
+0
-162
hll/src/test/java/io/druid/query/aggregation/TestHllComplexMetricSelector.java
...druid/query/aggregation/TestHllComplexMetricSelector.java
+0
-45
未找到文件。
hll/pom.xml
已删除
100644 → 0
浏览文件 @
4c133272
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Druid - a distributed column store.
~ Copyright (C) 2012, 2013 Metamarkets Group Inc.
~
~ This program is free software; you can redistribute it and/or
~ modify it under the terms of the GNU General Public License
~ as published by the Free Software Foundation; either version 2
~ of the License, or (at your option) any later version.
~
~ This program is distributed in the hope that it will be useful,
~ but WITHOUT ANY WARRANTY; without even the implied warranty of
~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
~ GNU General Public License for more details.
~
~ You should have received a copy of the GNU General Public License
~ along with this program; if not, write to the Free Software
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"
>
<modelVersion>
4.0.0
</modelVersion>
<groupId>
io.druid.extensions
</groupId>
<artifactId>
druid-hll
</artifactId>
<name>
druid-hll
</name>
<description>
druid-hll
</description>
<parent>
<groupId>
io.druid
</groupId>
<artifactId>
druid
</artifactId>
<version>
0.6.116-SNAPSHOT
</version>
</parent>
<dependencies>
<dependency>
<groupId>
io.druid
</groupId>
<artifactId>
druid-api
</artifactId>
</dependency>
<dependency>
<groupId>
io.druid
</groupId>
<artifactId>
druid-processing
</artifactId>
<version>
${project.parent.version}
</version>
</dependency>
<dependency>
<groupId>
com.metamx
</groupId>
<artifactId>
emitter
</artifactId>
</dependency>
<dependency>
<groupId>
net.sf.trove4j
</groupId>
<artifactId>
trove4j
</artifactId>
<version>
3.0.3
</version>
</dependency>
<dependency>
<groupId>
commons-codec
</groupId>
<artifactId>
commons-codec
</artifactId>
</dependency>
<!-- Tests -->
<dependency>
<groupId>
junit
</groupId>
<artifactId>
junit
</artifactId>
<scope>
test
</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>
maven-jar-plugin
</artifactId>
<configuration>
<archive>
<manifest>
<addDefaultImplementationEntries>
true
</addDefaultImplementationEntries>
<addDefaultSpecificationEntries>
true
</addDefaultSpecificationEntries>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>
hll/src/main/java/io/druid/query/aggregation/HyperloglogAggregator.java
已删除
100755 → 0
浏览文件 @
4c133272
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package
io.druid.query.aggregation
;
import
com.google.common.hash.Hashing
;
import
com.metamx.common.ISE
;
import
com.metamx.common.logger.Logger
;
import
gnu.trove.map.TIntByteMap
;
import
gnu.trove.map.hash.TIntByteHashMap
;
import
io.druid.segment.ObjectColumnSelector
;
import
java.util.Comparator
;
public
class
HyperloglogAggregator
implements
Aggregator
{
private
static
final
Logger
log
=
new
Logger
(
HyperloglogAggregator
.
class
);
public
static
final
int
log2m
=
12
;
public
static
final
int
m
=
(
int
)
Math
.
pow
(
2
,
log2m
);
public
static
final
double
alphaMM
=
(
0.7213
/
(
1
+
1.079
/
m
))
*
m
*
m
;
private
final
String
name
;
private
final
ObjectColumnSelector
selector
;
private
TIntByteHashMap
ibMap
;
static
final
Comparator
COMPARATOR
=
new
Comparator
()
{
@Override
public
int
compare
(
Object
o
,
Object
o1
)
{
return
o
.
equals
(
o1
)
?
0
:
1
;
}
};
public
static
Object
combine
(
Object
lhs
,
Object
rhs
)
{
final
TIntByteMap
newIbMap
=
new
TIntByteHashMap
((
TIntByteMap
)
lhs
);
final
TIntByteMap
rightIbMap
=
(
TIntByteMap
)
rhs
;
final
int
[]
keys
=
rightIbMap
.
keys
();
for
(
int
key
:
keys
)
{
if
(
newIbMap
.
get
(
key
)
==
newIbMap
.
getNoEntryValue
()
||
rightIbMap
.
get
(
key
)
>
newIbMap
.
get
(
key
))
{
newIbMap
.
put
(
key
,
rightIbMap
.
get
(
key
));
}
}
return
newIbMap
;
}
public
HyperloglogAggregator
(
String
name
,
ObjectColumnSelector
selector
)
{
this
.
name
=
name
;
this
.
selector
=
selector
;
this
.
ibMap
=
new
TIntByteHashMap
();
}
@Override
public
void
aggregate
()
{
final
Object
value
=
selector
.
get
();
if
(
value
==
null
)
{
return
;
}
if
(
value
instanceof
TIntByteHashMap
)
{
final
TIntByteHashMap
newIbMap
=
(
TIntByteHashMap
)
value
;
final
int
[]
indexes
=
newIbMap
.
keys
();
for
(
int
index
:
indexes
)
{
if
(
ibMap
.
get
(
index
)
==
ibMap
.
getNoEntryValue
()
||
newIbMap
.
get
(
index
)
>
ibMap
.
get
(
index
))
{
ibMap
.
put
(
index
,
newIbMap
.
get
(
index
));
}
}
}
else
if
(
value
instanceof
String
)
{
log
.
debug
(
"value [%s]"
,
selector
.
get
());
final
long
id
=
Hashing
.
murmur3_128
().
hashString
((
String
)
(
value
)).
asLong
();
final
int
bucket
=
(
int
)
(
id
>>>
(
Long
.
SIZE
-
log2m
));
final
int
zerolength
=
Long
.
numberOfLeadingZeros
((
id
<<
log2m
)
|
(
1
<<
(
log2m
-
1
))
+
1
)
+
1
;
if
(
ibMap
.
get
(
bucket
)
==
ibMap
.
getNoEntryValue
()
||
ibMap
.
get
(
bucket
)
<
(
byte
)
zerolength
)
{
ibMap
.
put
(
bucket
,
(
byte
)
zerolength
);
}
}
else
{
throw
new
ISE
(
"Aggregate does not support values of type[%s]"
,
value
.
getClass
().
getName
());
}
}
@Override
public
void
reset
()
{
this
.
ibMap
=
new
TIntByteHashMap
();
}
@Override
public
Object
get
()
{
return
ibMap
;
}
@Override
public
float
getFloat
()
{
throw
new
UnsupportedOperationException
(
"HyperloglogAggregator does not support getFloat()"
);
}
@Override
public
String
getName
()
{
return
name
;
}
@Override
public
void
close
()
{
// do nothing
}
}
hll/src/main/java/io/druid/query/aggregation/HyperloglogAggregatorFactory.java
已删除
100755 → 0
浏览文件 @
4c133272
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package
io.druid.query.aggregation
;
import
com.fasterxml.jackson.annotation.JsonCreator
;
import
com.fasterxml.jackson.annotation.JsonProperty
;
import
com.google.common.base.Preconditions
;
import
com.metamx.common.logger.Logger
;
import
gnu.trove.map.hash.TIntByteHashMap
;
import
io.druid.segment.ColumnSelectorFactory
;
import
org.apache.commons.codec.binary.Base64
;
import
java.nio.ByteBuffer
;
import
java.util.Arrays
;
import
java.util.Comparator
;
import
java.util.List
;
public
class
HyperloglogAggregatorFactory
implements
AggregatorFactory
{
private
static
final
Logger
log
=
new
Logger
(
HyperloglogAggregatorFactory
.
class
);
private
static
final
byte
[]
CACHE_KEY
=
new
byte
[]{
0x37
};
private
final
String
name
;
private
final
String
fieldName
;
@JsonCreator
public
HyperloglogAggregatorFactory
(
@JsonProperty
(
"name"
)
final
String
name
,
@JsonProperty
(
"fieldName"
)
final
String
fieldName
)
{
Preconditions
.
checkNotNull
(
name
,
"Must have a valid, non-null aggregator name"
);
Preconditions
.
checkNotNull
(
fieldName
,
"Must have a valid, non-null fieldName"
);
this
.
name
=
name
;
this
.
fieldName
=
fieldName
;
}
@Override
public
Aggregator
factorize
(
ColumnSelectorFactory
metricFactory
)
{
return
new
HyperloglogAggregator
(
name
,
metricFactory
.
makeObjectColumnSelector
(
fieldName
)
);
}
@Override
public
BufferAggregator
factorizeBuffered
(
ColumnSelectorFactory
metricFactory
)
{
return
new
HyperloglogBufferAggregator
(
metricFactory
.
makeObjectColumnSelector
(
fieldName
)
);
}
@Override
public
Comparator
getComparator
()
{
return
HyperloglogAggregator
.
COMPARATOR
;
}
@Override
public
Object
combine
(
Object
lhs
,
Object
rhs
)
{
if
(
rhs
==
null
)
{
return
lhs
;
}
if
(
lhs
==
null
)
{
return
rhs
;
}
return
HyperloglogAggregator
.
combine
(
lhs
,
rhs
);
}
@Override
public
AggregatorFactory
getCombiningFactory
()
{
log
.
debug
(
"factory name: %s"
,
name
);
return
new
HyperloglogAggregatorFactory
(
name
,
fieldName
);
}
@Override
public
Object
deserialize
(
Object
object
)
{
log
.
debug
(
"class name: [%s]:value [%s]"
,
object
.
getClass
().
getName
(),
object
);
final
String
k
=
(
String
)
object
;
final
byte
[]
ibmapByte
=
Base64
.
decodeBase64
(
k
);
final
ByteBuffer
buffer
=
ByteBuffer
.
wrap
(
ibmapByte
);
final
int
keylength
=
buffer
.
getInt
();
final
int
valuelength
=
buffer
.
getInt
();
TIntByteHashMap
newIbMap
;
if
(
keylength
==
0
)
{
newIbMap
=
new
TIntByteHashMap
();
}
else
{
final
int
[]
keys
=
new
int
[
keylength
];
final
byte
[]
values
=
new
byte
[
valuelength
];
for
(
int
i
=
0
;
i
<
keylength
;
i
++)
{
keys
[
i
]
=
buffer
.
getInt
();
}
buffer
.
get
(
values
);
newIbMap
=
new
TIntByteHashMap
(
keys
,
values
);
}
return
newIbMap
;
}
@Override
public
Object
finalizeComputation
(
Object
object
)
{
final
TIntByteHashMap
ibMap
=
(
TIntByteHashMap
)
object
;
final
int
[]
keys
=
ibMap
.
keys
();
final
int
count
=
keys
.
length
;
double
registerSum
=
0
;
double
zeros
=
0.0
;
for
(
int
key
:
keys
)
{
int
val
=
ibMap
.
get
(
key
);
registerSum
+=
1.0
/
(
1
<<
val
);
if
(
val
==
0
)
{
zeros
++;
}
}
registerSum
+=
(
HyperloglogAggregator
.
m
-
count
);
zeros
+=
HyperloglogAggregator
.
m
-
count
;
double
estimate
=
HyperloglogAggregator
.
alphaMM
*
(
1.0
/
registerSum
);
if
(
estimate
<=
(
5.0
/
2.0
)
*
(
HyperloglogAggregator
.
m
))
{
// Small Range Estimate
return
Math
.
round
(
HyperloglogAggregator
.
m
*
Math
.
log
(
HyperloglogAggregator
.
m
/
zeros
));
}
else
{
return
Math
.
round
(
estimate
);
}
}
@JsonProperty
public
String
getFieldName
()
{
return
fieldName
;
}
@Override
@JsonProperty
public
String
getName
()
{
return
name
;
}
@Override
public
List
<
String
>
requiredFields
()
{
return
Arrays
.
asList
(
fieldName
);
}
@Override
public
byte
[]
getCacheKey
()
{
byte
[]
fieldNameBytes
=
fieldName
.
getBytes
();
return
ByteBuffer
.
allocate
(
1
+
fieldNameBytes
.
length
).
put
(
CACHE_KEY
)
.
put
(
fieldNameBytes
).
array
();
}
@Override
public
String
getTypeName
()
{
return
"hyperloglog"
;
}
@Override
public
int
getMaxIntermediateSize
()
{
return
HyperloglogAggregator
.
m
;
}
@Override
public
Object
getAggregatorStartValue
()
{
return
new
TIntByteHashMap
();
}
}
hll/src/main/java/io/druid/query/aggregation/HyperloglogBufferAggregator.java
已删除
100755 → 0
浏览文件 @
4c133272
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package
io.druid.query.aggregation
;
import
gnu.trove.map.hash.TIntByteHashMap
;
import
gnu.trove.procedure.TIntByteProcedure
;
import
io.druid.segment.ObjectColumnSelector
;
import
java.nio.ByteBuffer
;
public
class
HyperloglogBufferAggregator
implements
BufferAggregator
{
private
final
ObjectColumnSelector
selector
;
public
HyperloglogBufferAggregator
(
ObjectColumnSelector
selector
)
{
this
.
selector
=
selector
;
}
/*
* byte 1 key length byte 2 value length byte 3...n key array byte n+1....
* value array
*/
@Override
public
void
init
(
ByteBuffer
buf
,
int
position
)
{
for
(
int
i
=
0
;
i
<
HyperloglogAggregator
.
m
;
i
++)
{
buf
.
put
(
position
+
i
,
(
byte
)
0
);
}
}
@Override
public
void
aggregate
(
ByteBuffer
buf
,
int
position
)
{
final
ByteBuffer
fb
=
buf
;
final
int
fp
=
position
;
final
TIntByteHashMap
newObj
=
(
TIntByteHashMap
)
(
selector
.
get
());
newObj
.
forEachEntry
(
new
TIntByteProcedure
()
{
public
boolean
execute
(
int
a
,
byte
b
)
{
if
(
b
>
fb
.
get
(
fp
+
a
))
{
fb
.
put
(
fp
+
a
,
b
);
}
return
true
;
}
}
);
}
@Override
public
Object
get
(
ByteBuffer
buf
,
int
position
)
{
final
TIntByteHashMap
ret
=
new
TIntByteHashMap
();
for
(
int
i
=
0
;
i
<
HyperloglogAggregator
.
m
;
i
++)
{
if
(
buf
.
get
(
position
+
i
)
!=
0
)
{
ret
.
put
(
i
,
buf
.
get
(
position
+
i
));
}
}
return
ret
;
}
@Override
public
float
getFloat
(
ByteBuffer
buf
,
int
position
)
{
throw
new
UnsupportedOperationException
(
"HyperloglogAggregator does not support getFloat()"
);
}
@Override
public
void
close
()
{
// do nothing
}
}
hll/src/main/java/io/druid/query/aggregation/HyperloglogComplexMetricSerde.java
已删除
100755 → 0
浏览文件 @
4c133272
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package
io.druid.query.aggregation
;
import
gnu.trove.map.hash.TIntByteHashMap
;
import
io.druid.data.input.InputRow
;
import
io.druid.segment.column.ColumnBuilder
;
import
io.druid.segment.column.ValueType
;
import
io.druid.segment.data.GenericIndexed
;
import
io.druid.segment.data.ObjectStrategy
;
import
io.druid.segment.serde.ColumnPartSerde
;
import
io.druid.segment.serde.ComplexColumnPartSerde
;
import
io.druid.segment.serde.ComplexColumnPartSupplier
;
import
io.druid.segment.serde.ComplexMetricExtractor
;
import
io.druid.segment.serde.ComplexMetricSerde
;
import
java.nio.ByteBuffer
;
import
java.util.List
;
public
class
HyperloglogComplexMetricSerde
extends
ComplexMetricSerde
{
@Override
public
String
getTypeName
()
{
return
"hyperloglog"
;
}
@Override
public
ComplexMetricExtractor
getExtractor
()
{
return
new
HyperloglogComplexMetricExtractor
();
}
@Override
public
ColumnPartSerde
deserializeColumn
(
ByteBuffer
buffer
,
ColumnBuilder
builder
)
{
GenericIndexed
column
=
GenericIndexed
.
read
(
buffer
,
getObjectStrategy
());
builder
.
setType
(
ValueType
.
COMPLEX
);
builder
.
setComplexColumn
(
new
ComplexColumnPartSupplier
(
"hyperloglog"
,
column
));
return
new
ComplexColumnPartSerde
(
column
,
"hyperloglog"
);
}
@Override
public
ObjectStrategy
getObjectStrategy
()
{
return
new
HyperloglogObjectStrategy
();
}
public
static
class
HyperloglogObjectStrategy
implements
ObjectStrategy
<
TIntByteHashMap
>
{
@Override
public
Class
<?
extends
TIntByteHashMap
>
getClazz
()
{
return
TIntByteHashMap
.
class
;
}
@Override
public
TIntByteHashMap
fromByteBuffer
(
ByteBuffer
buffer
,
int
numBytes
)
{
int
keylength
=
buffer
.
getInt
();
int
valuelength
=
buffer
.
getInt
();
if
(
keylength
==
0
)
{
return
new
TIntByteHashMap
();
}
int
[]
keys
=
new
int
[
keylength
];
byte
[]
values
=
new
byte
[
valuelength
];
for
(
int
i
=
0
;
i
<
keylength
;
i
++)
{
keys
[
i
]
=
buffer
.
getInt
();
}
buffer
.
get
(
values
);
TIntByteHashMap
tib
=
new
TIntByteHashMap
(
keys
,
values
);
return
tib
;
}
@Override
public
byte
[]
toBytes
(
TIntByteHashMap
val
)
{
TIntByteHashMap
ibmap
=
val
;
int
[]
indexesResult
=
ibmap
.
keys
();
byte
[]
valueResult
=
ibmap
.
values
();
ByteBuffer
buffer
=
ByteBuffer
.
allocate
(
4
*
indexesResult
.
length
+
valueResult
.
length
+
8
);
byte
[]
result
=
new
byte
[
4
*
indexesResult
.
length
+
valueResult
.
length
+
8
];
buffer
.
putInt
((
int
)
indexesResult
.
length
);
buffer
.
putInt
((
int
)
valueResult
.
length
);
for
(
int
i
=
0
;
i
<
indexesResult
.
length
;
i
++)
{
buffer
.
putInt
(
indexesResult
[
i
]);
}
buffer
.
put
(
valueResult
);
buffer
.
flip
();
buffer
.
get
(
result
);
return
result
;
}
@Override
public
int
compare
(
TIntByteHashMap
o1
,
TIntByteHashMap
o2
)
{
return
o1
.
equals
(
o2
)
?
0
:
1
;
}
}
public
static
class
HyperloglogComplexMetricExtractor
implements
ComplexMetricExtractor
{
@Override
public
Class
<?>
extractedClass
()
{
return
List
.
class
;
}
@Override
public
Object
extractValue
(
InputRow
inputRow
,
String
metricName
)
{
return
inputRow
.
getRaw
(
metricName
);
}
}
}
hll/src/main/java/io/druid/query/aggregation/HyperloglogDruidModule.java
已删除
100644 → 0
浏览文件 @
4c133272
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package
io.druid.query.aggregation
;
import
com.fasterxml.jackson.core.JsonGenerator
;
import
com.fasterxml.jackson.core.JsonParser
;
import
com.fasterxml.jackson.core.JsonProcessingException
;
import
com.fasterxml.jackson.databind.DeserializationContext
;
import
com.fasterxml.jackson.databind.JsonDeserializer
;
import
com.fasterxml.jackson.databind.JsonSerializer
;
import
com.fasterxml.jackson.databind.Module
;
import
com.fasterxml.jackson.databind.SerializerProvider
;
import
com.fasterxml.jackson.databind.jsontype.NamedType
;
import
com.fasterxml.jackson.databind.module.SimpleModule
;
import
com.google.common.collect.ImmutableList
;
import
com.google.inject.Binder
;
import
gnu.trove.map.hash.TIntByteHashMap
;
import
io.druid.initialization.DruidModule
;
import
io.druid.segment.serde.ComplexMetrics
;
import
org.apache.commons.codec.binary.Base64
;
import
java.io.IOException
;
import
java.nio.ByteBuffer
;
import
java.util.List
;
/**
*/
public
class
HyperloglogDruidModule
implements
DruidModule
{
@Override
public
List
<?
extends
Module
>
getJacksonModules
()
{
return
ImmutableList
.
of
(
new
HyperloglogJacksonSerdeModule
().
registerSubtypes
(
new
NamedType
(
HyperloglogAggregatorFactory
.
class
,
"hyperloglog"
)
)
);
}
@Override
public
void
configure
(
Binder
binder
)
{
if
(
ComplexMetrics
.
getSerdeForType
(
"hyperloglog"
)
==
null
)
{
ComplexMetrics
.
registerSerde
(
"hyperloglog"
,
new
HyperloglogComplexMetricSerde
());
}
}
public
static
class
HyperloglogJacksonSerdeModule
extends
SimpleModule
{
public
HyperloglogJacksonSerdeModule
()
{
super
(
"Hyperloglog deserializers"
);
addDeserializer
(
TIntByteHashMap
.
class
,
new
JsonDeserializer
<
TIntByteHashMap
>()
{
@Override
public
TIntByteHashMap
deserialize
(
JsonParser
jp
,
DeserializationContext
ctxt
)
throws
IOException
{
byte
[]
ibmapByte
=
Base64
.
decodeBase64
(
jp
.
getText
());
ByteBuffer
buffer
=
ByteBuffer
.
wrap
(
ibmapByte
);
int
keylength
=
buffer
.
getInt
();
int
valuelength
=
buffer
.
getInt
();
if
(
keylength
==
0
)
{
return
(
new
TIntByteHashMap
());
}
int
[]
keys
=
new
int
[
keylength
];
byte
[]
values
=
new
byte
[
valuelength
];
for
(
int
i
=
0
;
i
<
keylength
;
i
++)
{
keys
[
i
]
=
buffer
.
getInt
();
}
buffer
.
get
(
values
);
return
(
new
TIntByteHashMap
(
keys
,
values
));
}
}
);
addSerializer
(
TIntByteHashMap
.
class
,
new
JsonSerializer
<
TIntByteHashMap
>()
{
@Override
public
void
serialize
(
TIntByteHashMap
ibmap
,
JsonGenerator
jsonGenerator
,
SerializerProvider
serializerProvider
)
throws
IOException
,
JsonProcessingException
{
int
[]
indexesResult
=
ibmap
.
keys
();
byte
[]
valueResult
=
ibmap
.
values
();
ByteBuffer
buffer
=
ByteBuffer
.
allocate
(
4
*
indexesResult
.
length
+
valueResult
.
length
+
8
);
byte
[]
result
=
new
byte
[
4
*
indexesResult
.
length
+
valueResult
.
length
+
8
];
buffer
.
putInt
((
int
)
indexesResult
.
length
);
buffer
.
putInt
((
int
)
valueResult
.
length
);
for
(
int
i
=
0
;
i
<
indexesResult
.
length
;
i
++)
{
buffer
.
putInt
(
indexesResult
[
i
]);
}
buffer
.
put
(
valueResult
);
buffer
.
flip
();
buffer
.
get
(
result
);
String
str
=
Base64
.
encodeBase64String
(
result
);
jsonGenerator
.
writeString
(
str
);
}
}
);
}
}
}
hll/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
已删除
100644 → 0
浏览文件 @
4c133272
io.druid.query.aggregation.HyperloglogDruidModule
\ No newline at end of file
hll/src/test/java/io/druid/query/aggregation/HyperloglogAggregatorTest.java
已删除
100644 → 0
浏览文件 @
4c133272
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package
io.druid.query.aggregation
;
import
gnu.trove.map.hash.TIntByteHashMap
;
import
org.junit.Assert
;
import
org.junit.Test
;
import
java.util.Comparator
;
public
class
HyperloglogAggregatorTest
{
@Test
public
void
testAggregate
()
{
final
TestHllComplexMetricSelector
selector
=
new
TestHllComplexMetricSelector
();
final
HyperloglogAggregatorFactory
aggFactory
=
new
HyperloglogAggregatorFactory
(
"billy"
,
"billyG"
);
final
HyperloglogAggregator
agg
=
new
HyperloglogAggregator
(
"billy"
,
selector
);
Assert
.
assertEquals
(
"billy"
,
agg
.
getName
());
Assert
.
assertEquals
(
0L
,
aggFactory
.
finalizeComputation
(
agg
.
get
()));
Assert
.
assertEquals
(
0L
,
aggFactory
.
finalizeComputation
(
agg
.
get
()));
Assert
.
assertEquals
(
0L
,
aggFactory
.
finalizeComputation
(
agg
.
get
()));
aggregate
(
selector
,
agg
);
aggregate
(
selector
,
agg
);
aggregate
(
selector
,
agg
);
Assert
.
assertEquals
(
3L
,
aggFactory
.
finalizeComputation
(
agg
.
get
()));
Assert
.
assertEquals
(
3L
,
aggFactory
.
finalizeComputation
(
agg
.
get
()));
Assert
.
assertEquals
(
3L
,
aggFactory
.
finalizeComputation
(
agg
.
get
()));
aggregate
(
selector
,
agg
);
aggregate
(
selector
,
agg
);
Assert
.
assertEquals
(
5L
,
aggFactory
.
finalizeComputation
(
agg
.
get
()));
Assert
.
assertEquals
(
5L
,
aggFactory
.
finalizeComputation
(
agg
.
get
()));
}
@Test
public
void
testComparator
()
{
final
TestHllComplexMetricSelector
selector
=
new
TestHllComplexMetricSelector
();
final
Comparator
comp
=
new
HyperloglogAggregatorFactory
(
"billy"
,
"billyG"
).
getComparator
();
final
HyperloglogAggregator
agg
=
new
HyperloglogAggregator
(
"billy"
,
selector
);
Object
first
=
new
TIntByteHashMap
((
TIntByteHashMap
)
agg
.
get
());
agg
.
aggregate
();
Assert
.
assertEquals
(
0
,
comp
.
compare
(
first
,
first
));
Assert
.
assertEquals
(
0
,
comp
.
compare
(
agg
.
get
(),
agg
.
get
()));
Assert
.
assertEquals
(
1
,
comp
.
compare
(
agg
.
get
(),
first
));
}
@Test
public
void
testHighCardinalityAggregate
()
{
final
TestHllComplexMetricSelector
selector
=
new
TestHllComplexMetricSelector
();
final
HyperloglogAggregatorFactory
aggFactory
=
new
HyperloglogAggregatorFactory
(
"billy"
,
"billyG"
);
final
HyperloglogAggregator
agg
=
new
HyperloglogAggregator
(
"billy"
,
selector
);
final
int
card
=
100000
;
for
(
int
i
=
0
;
i
<
card
;
i
++)
{
aggregate
(
selector
,
agg
);
}
Assert
.
assertEquals
(
99443L
,
aggFactory
.
finalizeComputation
(
agg
.
get
()));
}
// Provides a nice printout of error rates as a function of cardinality
//@Test
public
void
benchmarkAggregation
()
throws
Exception
{
final
TestHllComplexMetricSelector
selector
=
new
TestHllComplexMetricSelector
();
final
HyperloglogAggregatorFactory
aggFactory
=
new
HyperloglogAggregatorFactory
(
"billy"
,
"billyG"
);
double
error
=
0.0d
;
int
count
=
0
;
final
int
[]
valsToCheck
=
{
10
,
20
,
50
,
100
,
1000
,
2000
,
5000
,
10000
,
20000
,
50000
,
100000
,
1000000
,
2000000
,
10000000
,
Integer
.
MAX_VALUE
};
for
(
int
numThings
:
valsToCheck
)
{
long
startTime
=
System
.
currentTimeMillis
();
final
HyperloglogAggregator
agg
=
new
HyperloglogAggregator
(
"billy"
,
selector
);
for
(
int
i
=
0
;
i
<
numThings
;
++
i
)
{
if
(
i
!=
0
&&
i
%
100000000
==
0
)
{
++
count
;
error
=
computeError
(
error
,
count
,
i
,
(
Long
)
aggFactory
.
finalizeComputation
(
agg
.
get
()),
startTime
);
}
aggregate
(
selector
,
agg
);
}
++
count
;
error
=
computeError
(
error
,
count
,
numThings
,
(
Long
)
aggFactory
.
finalizeComputation
(
agg
.
get
()),
startTime
);
}
}
//@Test
public
void
benchmarkCombine
()
throws
Exception
{
int
count
;
long
totalTime
=
0
;
final
TestHllComplexMetricSelector
selector
=
new
TestHllComplexMetricSelector
();
TIntByteHashMap
combined
=
new
TIntByteHashMap
();
for
(
count
=
0
;
count
<
1000000
;
++
count
)
{
final
HyperloglogAggregator
agg
=
new
HyperloglogAggregator
(
"billy"
,
selector
);
aggregate
(
selector
,
agg
);
long
start
=
System
.
nanoTime
();
combined
=
(
TIntByteHashMap
)
HyperloglogAggregator
.
combine
(
agg
.
get
(),
combined
);
totalTime
+=
System
.
nanoTime
()
-
start
;
}
System
.
out
.
printf
(
"benchmarkCombine took %d ms%n"
,
totalTime
/
1000000
);
}
private
double
computeError
(
double
error
,
int
count
,
long
exactValue
,
long
estimatedValue
,
long
startTime
)
{
final
double
errorThisTime
=
Math
.
abs
((
double
)
exactValue
-
estimatedValue
)
/
exactValue
;
error
+=
errorThisTime
;
System
.
out
.
printf
(
"%,d ==? %,d in %,d millis. actual error[%,f%%], avg. error [%,f%%]%n"
,
exactValue
,
estimatedValue
,
System
.
currentTimeMillis
()
-
startTime
,
100
*
errorThisTime
,
(
error
/
count
)
*
100
);
return
error
;
}
private
void
aggregate
(
TestHllComplexMetricSelector
selector
,
HyperloglogAggregator
agg
)
{
agg
.
aggregate
();
selector
.
increment
();
}
}
hll/src/test/java/io/druid/query/aggregation/TestHllComplexMetricSelector.java
已删除
100644 → 0
浏览文件 @
4c133272
/*
* Druid - a distributed column store.
* Copyright (C) 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package
io.druid.query.aggregation
;
import
io.druid.segment.ObjectColumnSelector
;
public
class
TestHllComplexMetricSelector
implements
ObjectColumnSelector
<
String
>
{
private
int
index
=
0
;
@Override
public
Class
<
String
>
classOfObject
()
{
return
String
.
class
;
}
@Override
public
String
get
()
{
return
String
.
valueOf
(
index
);
}
public
void
increment
()
{
++
index
;
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录