Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
67d99d47
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
67d99d47
编写于
9月 19, 2011
作者:
A
Alexey Milovidov
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
dbms: development.
上级
c675a0fa
变更
18
隐藏空白更改
内联
并排
Showing
18 changed file
with
548 addition
and
63 deletion
+548
-63
dbms/include/DB/Columns/ColumnAggregateFunction.h
dbms/include/DB/Columns/ColumnAggregateFunction.h
+58
-0
dbms/include/DB/Columns/ColumnVector.h
dbms/include/DB/Columns/ColumnVector.h
+1
-4
dbms/include/DB/Core/ErrorCodes.h
dbms/include/DB/Core/ErrorCodes.h
+3
-0
dbms/include/DB/Core/Field.h
dbms/include/DB/Core/Field.h
+15
-6
dbms/include/DB/Core/Row.h
dbms/include/DB/Core/Row.h
+1
-4
dbms/include/DB/DataStreams/IRowInputStream.h
dbms/include/DB/DataStreams/IRowInputStream.h
+1
-4
dbms/include/DB/DataTypes/DataTypeAggregateFunction.h
dbms/include/DB/DataTypes/DataTypeAggregateFunction.h
+45
-0
dbms/include/DB/DataTypes/DataTypeFactory.h
dbms/include/DB/DataTypes/DataTypeFactory.h
+5
-1
dbms/include/DB/DataTypes/FieldToDataType.h
dbms/include/DB/DataTypes/FieldToDataType.h
+7
-1
dbms/include/DB/Interpreters/Aggregate.h
dbms/include/DB/Interpreters/Aggregate.h
+38
-0
dbms/include/DB/Interpreters/Context.h
dbms/include/DB/Interpreters/Context.h
+8
-6
dbms/include/DB/Interpreters/InterpreterSelectQuery.h
dbms/include/DB/Interpreters/InterpreterSelectQuery.h
+2
-0
dbms/include/DB/Parsers/ASTFunction.h
dbms/include/DB/Parsers/ASTFunction.h
+4
-4
dbms/src/DataTypes/DataTypeAggregateFunction.cpp
dbms/src/DataTypes/DataTypeAggregateFunction.cpp
+110
-0
dbms/src/DataTypes/DataTypeFactory.cpp
dbms/src/DataTypes/DataTypeFactory.cpp
+2
-2
dbms/src/Interpreters/Aggregate.cpp
dbms/src/Interpreters/Aggregate.cpp
+78
-0
dbms/src/Interpreters/Expression.cpp
dbms/src/Interpreters/Expression.cpp
+38
-31
dbms/src/Interpreters/tests/aggregate.cpp
dbms/src/Interpreters/tests/aggregate.cpp
+132
-0
未找到文件。
dbms/include/DB/Columns/ColumnAggregateFunction.h
0 → 100644
浏览文件 @
67d99d47
#pragma once
#include <DB/AggregateFunctions/IAggregateFunction.h>
#include <DB/Columns/ColumnVector.h>
namespace
DB
{
/** Столбец, хранящий состояния агрегатных функций.
*/
class
ColumnAggregateFunction
:
public
ColumnVector
<
AggregateFunctionPtr
>
{
public:
std
::
string
getName
()
const
{
return
"ColumnAggregateFunction"
;
}
bool
isNumeric
()
const
{
return
false
;
}
Field
operator
[](
size_t
n
)
const
{
return
data
[
n
];
}
void
cut
(
size_t
start
,
size_t
length
)
{
if
(
length
==
0
||
start
+
length
>
data
.
size
())
throw
Exception
(
"Parameters start = "
+
Poco
::
NumberFormatter
::
format
(
start
)
+
", length = "
+
Poco
::
NumberFormatter
::
format
(
length
)
+
" are out of bound in IColumnVector<T>::cut() method"
" (data.size() = "
+
Poco
::
NumberFormatter
::
format
(
data
.
size
())
+
")."
,
ErrorCodes
::
PARAMETER_OUT_OF_BOUND
);
if
(
start
==
0
)
data
.
resize
(
length
);
else
{
Container_t
tmp
(
data
.
begin
()
+
start
,
data
.
begin
()
+
start
+
length
);
tmp
.
swap
(
data
);
}
}
void
insert
(
const
Field
&
x
)
{
data
.
push_back
(
boost
::
get
<
AggregateFunctionPtr
>
(
x
));
}
int
compareAt
(
size_t
n
,
size_t
m
,
const
IColumn
&
rhs_
)
const
{
return
0
;
}
private:
Container_t
data
;
};
}
dbms/include/DB/Columns/ColumnVector.h
浏览文件 @
67d99d47
#ifndef DBMS_CORE_COLUMN_VECTOR_H
#define DBMS_CORE_COLUMN_VECTOR_H
#pragma once
#include <string.h>
...
...
@@ -142,5 +141,3 @@ private:
}
#endif
dbms/include/DB/Core/ErrorCodes.h
浏览文件 @
67d99d47
...
...
@@ -68,6 +68,9 @@ namespace ErrorCodes
UNKNOWN_TABLE
,
ONLY_FILTER_COLUMN_IN_BLOCK
,
SYNTAX_ERROR
,
UNKNOWN_AGGREGATE_FUNCTION
,
CANNOT_READ_AGGREGATE_FUNCTION_FROM_TEXT
,
CANNOT_WRITE_AGGREGATE_FUNCTION_AS_TEXT
,
};
}
...
...
dbms/include/DB/Core/Field.h
浏览文件 @
67d99d47
...
...
@@ -17,6 +17,10 @@
namespace
DB
{
using
Poco
::
SharedPtr
;
class
IAggregateFunction
;
/** Типы данных для представления единичного значения произвольного типа в оперативке.
* Внимание! Предпочтительно вместо единичных значений хранить кусочки столбцов. См. Column.h
*/
...
...
@@ -27,6 +31,7 @@ typedef boost::make_recursive_variant<
Int64
,
Float64
,
String
,
SharedPtr
<
IAggregateFunction
>
,
std
::
vector
<
boost
::
recursive_variant_
>
/// Array, Tuple
>::
type
Field
;
...
...
@@ -43,6 +48,7 @@ namespace FieldType
Int64
,
Float64
,
String
,
AggregateFunction
,
Array
};
}
...
...
@@ -65,6 +71,7 @@ public:
FieldType
::
Enum
operator
()
(
const
Int64
&
x
)
const
{
return
FieldType
::
Int64
;
}
FieldType
::
Enum
operator
()
(
const
Float64
&
x
)
const
{
return
FieldType
::
Float64
;
}
FieldType
::
Enum
operator
()
(
const
String
&
x
)
const
{
return
FieldType
::
String
;
}
FieldType
::
Enum
operator
()
(
const
SharedPtr
<
IAggregateFunction
>
&
x
)
const
{
return
FieldType
::
AggregateFunction
;
}
FieldType
::
Enum
operator
()
(
const
Array
&
x
)
const
{
return
FieldType
::
Array
;
}
};
...
...
@@ -72,19 +79,20 @@ public:
class
FieldVisitorDump
:
public
boost
::
static_visitor
<
std
::
string
>
{
public:
std
::
string
operator
()
(
const
Null
&
x
)
const
{
return
"NULL"
;
}
std
::
string
operator
()
(
const
UInt64
&
x
)
const
{
return
"UInt64_"
+
Poco
::
NumberFormatter
::
format
(
x
);
}
std
::
string
operator
()
(
const
Int64
&
x
)
const
{
return
"Int64_"
+
Poco
::
NumberFormatter
::
format
(
x
);
}
std
::
string
operator
()
(
const
Float64
&
x
)
const
{
return
"Float64_"
+
Poco
::
NumberFormatter
::
format
(
x
);
}
String
operator
()
(
const
Null
&
x
)
const
{
return
"NULL"
;
}
String
operator
()
(
const
UInt64
&
x
)
const
{
return
"UInt64_"
+
Poco
::
NumberFormatter
::
format
(
x
);
}
String
operator
()
(
const
Int64
&
x
)
const
{
return
"Int64_"
+
Poco
::
NumberFormatter
::
format
(
x
);
}
String
operator
()
(
const
Float64
&
x
)
const
{
return
"Float64_"
+
Poco
::
NumberFormatter
::
format
(
x
);
}
String
operator
()
(
const
SharedPtr
<
IAggregateFunction
>
&
x
)
const
{
return
"AggregateFunction"
;
}
std
::
s
tring
operator
()
(
const
String
&
x
)
const
S
tring
operator
()
(
const
String
&
x
)
const
{
std
::
stringstream
s
;
s
<<
mysqlxx
::
quote
<<
x
;
return
s
.
str
();
}
std
::
s
tring
operator
()
(
const
Array
&
x
)
const
S
tring
operator
()
(
const
Array
&
x
)
const
{
std
::
stringstream
s
;
...
...
@@ -109,6 +117,7 @@ public:
String
operator
()
(
const
UInt64
&
x
)
const
{
return
Poco
::
NumberFormatter
::
format
(
x
);
}
String
operator
()
(
const
Int64
&
x
)
const
{
return
Poco
::
NumberFormatter
::
format
(
x
);
}
String
operator
()
(
const
Float64
&
x
)
const
{
return
Poco
::
NumberFormatter
::
format
(
x
);
}
String
operator
()
(
const
SharedPtr
<
IAggregateFunction
>
&
x
)
const
{
return
"AggregateFunction"
;
}
String
operator
()
(
const
String
&
x
)
const
{
...
...
dbms/include/DB/Core/Row.h
浏览文件 @
67d99d47
#ifndef DBMS_CORE_ROW_H
#define DBMS_CORE_ROW_H
#pragma once
#include <vector>
...
...
@@ -16,5 +15,3 @@ namespace DB
typedef
std
::
vector
<
Field
>
Row
;
}
#endif
dbms/include/DB/DataStreams/IRowInputStream.h
浏览文件 @
67d99d47
#ifndef DBMS_DATA_STREAMS_IROWINPUTSTREAM_H
#define DBMS_DATA_STREAMS_IROWINPUTSTREAM_H
#pragma once
#include <DB/Core/Row.h>
...
...
@@ -22,5 +21,3 @@ public:
};
}
#endif
dbms/include/DB/DataTypes/DataTypeAggregateFunction.h
0 → 100644
浏览文件 @
67d99d47
#pragma once
#include <DB/AggregateFunctions/IAggregateFunction.h>
#include <DB/AggregateFunctions/AggregateFunctionFactory.h>
#include <DB/DataTypes/IDataType.h>
namespace
DB
{
using
Poco
::
SharedPtr
;
/** Тип - состояние агрегатной функции.
*/
class
DataTypeAggregateFunction
:
public
IDataType
{
private:
const
AggregateFunctionFactoryPtr
factory
;
public:
DataTypeAggregateFunction
()
:
factory
(
new
AggregateFunctionFactory
)
{}
DataTypeAggregateFunction
(
const
AggregateFunctionFactoryPtr
&
factory_
)
:
factory
(
factory_
)
{}
std
::
string
getName
()
const
{
return
"AggregateFunction"
;
}
DataTypePtr
clone
()
const
{
return
new
DataTypeAggregateFunction
(
factory
);
}
void
serializeBinary
(
const
Field
&
field
,
WriteBuffer
&
ostr
)
const
;
void
deserializeBinary
(
Field
&
field
,
ReadBuffer
&
istr
)
const
;
void
serializeBinary
(
const
IColumn
&
column
,
WriteBuffer
&
ostr
)
const
;
void
deserializeBinary
(
IColumn
&
column
,
ReadBuffer
&
istr
,
size_t
limit
)
const
;
void
serializeText
(
const
Field
&
field
,
WriteBuffer
&
ostr
)
const
;
void
deserializeText
(
Field
&
field
,
ReadBuffer
&
istr
)
const
;
void
serializeTextEscaped
(
const
Field
&
field
,
WriteBuffer
&
ostr
)
const
;
void
deserializeTextEscaped
(
Field
&
field
,
ReadBuffer
&
istr
)
const
;
void
serializeTextQuoted
(
const
Field
&
field
,
WriteBuffer
&
ostr
,
bool
compatible
=
false
)
const
;
void
deserializeTextQuoted
(
Field
&
field
,
ReadBuffer
&
istr
,
bool
compatible
=
false
)
const
;
ColumnPtr
createColumn
()
const
;
ColumnPtr
createConstColumn
(
size_t
size
,
const
Field
&
field
)
const
;
};
}
dbms/include/DB/DataTypes/DataTypeFactory.h
浏览文件 @
67d99d47
...
...
@@ -8,6 +8,8 @@
namespace
DB
{
using
Poco
::
SharedPtr
;
/** Позволяет создать тип данных по его имени.
*/
...
...
@@ -15,7 +17,7 @@ class DataTypeFactory
{
public:
DataTypeFactory
();
DataTypePtr
get
(
const
String
&
name
);
DataTypePtr
get
(
const
String
&
name
)
const
;
private:
typedef
std
::
map
<
String
,
DataTypePtr
>
NonParametricDataTypes
;
...
...
@@ -24,5 +26,7 @@ private:
Poco
::
RegularExpression
fixed_string_regexp
;
};
typedef
SharedPtr
<
DataTypeFactory
>
DataTypeFactoryPtr
;
}
dbms/include/DB/DataTypes/FieldToDataType.h
浏览文件 @
67d99d47
...
...
@@ -2,6 +2,7 @@
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeString.h>
#include <DB/DataTypes/DataTypeAggregateFunction.h>
namespace
DB
...
...
@@ -39,7 +40,12 @@ public:
DataTypePtr
operator
()
(
const
String
&
x
)
const
{
return
new
DataTypeString
;
return
new
DataTypeString
;
}
DataTypePtr
operator
()
(
const
AggregateFunctionPtr
&
x
)
const
{
return
new
DataTypeAggregateFunction
;
}
DataTypePtr
operator
()
(
const
Array
&
x
)
const
...
...
dbms/include/DB/Interpreters/Aggregate.h
0 → 100644
浏览文件 @
67d99d47
#pragma once
#include <DB/Core/ColumnNumbers.h>
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/AggregateFunctions/IAggregateFunction.h>
namespace
DB
{
struct
AggregateDescription
{
AggregateFunctionPtr
function
;
ColumnNumbers
arguments
;
};
typedef
std
::
vector
<
AggregateDescription
>
AggregateDescriptions
;
typedef
std
::
map
<
Row
,
AggregateFunctions
>
AggregatedData
;
/** Агрегирует поток блоков.
*/
class
Aggregate
{
public:
Aggregate
(
const
ColumnNumbers
&
keys_
,
AggregateDescriptions
&
aggregates_
)
:
keys
(
keys_
),
aggregates
(
aggregates_
)
{};
AggregatedData
execute
(
BlockInputStreamPtr
stream
);
private:
ColumnNumbers
keys
;
AggregateDescriptions
aggregates
;
};
}
dbms/include/DB/Interpreters/Context.h
浏览文件 @
67d99d47
...
...
@@ -9,6 +9,7 @@
#include <DB/Core/NamesAndTypes.h>
#include <DB/Storages/IStorage.h>
#include <DB/Functions/IFunction.h>
#include <DB/AggregateFunctions/AggregateFunctionFactory.h>
#include <DB/DataTypes/DataTypeFactory.h>
...
...
@@ -31,12 +32,13 @@ typedef std::map<String, Tables> Databases;
*/
struct
Context
{
String
path
;
/// Путь к директории с данными, со слешем на конце.
SharedPtr
<
Databases
>
databases
;
/// Список БД и таблиц в них.
String
current_database
;
/// Текущая БД.
SharedPtr
<
Functions
>
functions
;
/// Обычные функции.
SharedPtr
<
DataTypeFactory
>
data_type_factory
;
/// Типы данных.
NamesAndTypes
columns
;
/// Столбцы текущей обрабатываемой таблицы.
String
path
;
/// Путь к директории с данными, со слешем на конце.
SharedPtr
<
Databases
>
databases
;
/// Список БД и таблиц в них.
String
current_database
;
/// Текущая БД.
SharedPtr
<
Functions
>
functions
;
/// Обычные функции.
AggregateFunctionFactoryPtr
aggregate_function_factory
;
/// Агрегатные функции.
DataTypeFactoryPtr
data_type_factory
;
/// Типы данных.
NamesAndTypes
columns
;
/// Столбцы текущей обрабатываемой таблицы.
SharedPtr
<
Poco
::
FastMutex
>
mutex
;
/// Для доступа и модификации разделяемых объектов.
...
...
dbms/include/DB/Interpreters/InterpreterSelectQuery.h
浏览文件 @
67d99d47
...
...
@@ -34,6 +34,8 @@ private:
PART_WHERE
=
4
,
PART_HAVING
=
8
,
PART_ORDER
=
16
,
PART_BELOW_AGGREGATE_FUNCTIONS
=
32
,
PART_ABOVE_AGGREGATE_FUNCTIONS
=
64
,
};
...
...
dbms/include/DB/Parsers/ASTFunction.h
浏览文件 @
67d99d47
#ifndef DBMS_PARSERS_ASTFUNCTION_H
#define DBMS_PARSERS_ASTFUNCTION_H
#pragma once
#include <DB/Parsers/IAST.h>
#include <DB/Functions/IFunction.h>
#include <DB/AggregateFunctions/IAggregateFunction.h>
namespace
DB
...
...
@@ -20,6 +20,8 @@ public:
/// сама функция
FunctionPtr
function
;
/// или агрегатная функция
AggregateFunctionPtr
aggregate_function
;
/// типы возвращаемых значений
DataTypes
return_types
;
/// номера столбцов возвращаемых значений
...
...
@@ -33,5 +35,3 @@ public:
};
}
#endif
dbms/src/DataTypes/DataTypeAggregateFunction.cpp
0 → 100644
浏览文件 @
67d99d47
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/Columns/ColumnConst.h>
#include <DB/Columns/ColumnAggregateFunction.h>
#include <DB/DataTypes/DataTypeAggregateFunction.h>
namespace
DB
{
using
Poco
::
SharedPtr
;
void
DataTypeAggregateFunction
::
serializeBinary
(
const
Field
&
field
,
WriteBuffer
&
ostr
)
const
{
const
AggregateFunctionPtr
&
value
=
boost
::
get
<
const
AggregateFunctionPtr
&>
(
field
);
writeStringBinary
(
value
->
getName
(),
ostr
);
value
->
serialize
(
ostr
);
}
void
DataTypeAggregateFunction
::
deserializeBinary
(
Field
&
field
,
ReadBuffer
&
istr
)
const
{
String
name
;
readStringBinary
(
name
,
istr
);
AggregateFunctionPtr
value
=
factory
->
get
(
name
);
value
->
deserializeMerge
(
istr
);
field
=
value
;
}
void
DataTypeAggregateFunction
::
serializeBinary
(
const
IColumn
&
column
,
WriteBuffer
&
ostr
)
const
{
const
ColumnAggregateFunction
&
real_column
=
dynamic_cast
<
const
ColumnAggregateFunction
&>
(
column
);
const
ColumnAggregateFunction
::
Container_t
&
vec
=
real_column
.
getData
();
String
name
;
if
(
!
vec
.
empty
())
name
=
vec
[
0
]
->
getName
();
for
(
ColumnAggregateFunction
::
Container_t
::
const_iterator
it
=
vec
.
begin
();
it
!=
vec
.
end
();
++
it
)
{
writeStringBinary
(
name
,
ostr
);
(
*
it
)
->
serialize
(
ostr
);
}
}
void
DataTypeAggregateFunction
::
deserializeBinary
(
IColumn
&
column
,
ReadBuffer
&
istr
,
size_t
limit
)
const
{
ColumnAggregateFunction
&
real_column
=
dynamic_cast
<
ColumnAggregateFunction
&>
(
column
);
ColumnAggregateFunction
::
Container_t
&
vec
=
real_column
.
getData
();
vec
.
reserve
(
limit
);
for
(
size_t
i
=
0
;
i
<
limit
;
++
i
)
{
if
(
istr
.
eof
())
break
;
String
name
;
readStringBinary
(
name
,
istr
);
AggregateFunctionPtr
value
=
factory
->
get
(
name
);
value
->
deserializeMerge
(
istr
);
vec
.
push_back
(
value
);
}
}
void
DataTypeAggregateFunction
::
serializeText
(
const
Field
&
field
,
WriteBuffer
&
ostr
)
const
{
throw
Exception
(
"Cannot write aggregate function as text."
,
ErrorCodes
::
CANNOT_WRITE_AGGREGATE_FUNCTION_AS_TEXT
);
}
void
DataTypeAggregateFunction
::
deserializeText
(
Field
&
field
,
ReadBuffer
&
istr
)
const
{
throw
Exception
(
"Cannot read aggregate function from text."
,
ErrorCodes
::
CANNOT_READ_AGGREGATE_FUNCTION_FROM_TEXT
);
}
void
DataTypeAggregateFunction
::
serializeTextEscaped
(
const
Field
&
field
,
WriteBuffer
&
ostr
)
const
{
throw
Exception
(
"Cannot write aggregate function as text."
,
ErrorCodes
::
CANNOT_WRITE_AGGREGATE_FUNCTION_AS_TEXT
);
}
void
DataTypeAggregateFunction
::
deserializeTextEscaped
(
Field
&
field
,
ReadBuffer
&
istr
)
const
{
throw
Exception
(
"Cannot read aggregate function from text."
,
ErrorCodes
::
CANNOT_READ_AGGREGATE_FUNCTION_FROM_TEXT
);
}
void
DataTypeAggregateFunction
::
serializeTextQuoted
(
const
Field
&
field
,
WriteBuffer
&
ostr
,
bool
compatible
)
const
{
throw
Exception
(
"Cannot write aggregate function as text."
,
ErrorCodes
::
CANNOT_WRITE_AGGREGATE_FUNCTION_AS_TEXT
);
}
void
DataTypeAggregateFunction
::
deserializeTextQuoted
(
Field
&
field
,
ReadBuffer
&
istr
,
bool
compatible
)
const
{
throw
Exception
(
"Cannot read aggregate function from text."
,
ErrorCodes
::
CANNOT_READ_AGGREGATE_FUNCTION_FROM_TEXT
);
}
ColumnPtr
DataTypeAggregateFunction
::
createColumn
()
const
{
return
new
ColumnAggregateFunction
;
}
ColumnPtr
DataTypeAggregateFunction
::
createConstColumn
(
size_t
size
,
const
Field
&
field
)
const
{
return
new
ColumnConst
<
AggregateFunctionPtr
>
(
size
,
boost
::
get
<
AggregateFunctionPtr
>
(
field
));
}
}
dbms/src/DataTypes/DataTypeFactory.cpp
浏览文件 @
67d99d47
...
...
@@ -38,9 +38,9 @@ DataTypeFactory::DataTypeFactory()
}
DataTypePtr
DataTypeFactory
::
get
(
const
String
&
name
)
DataTypePtr
DataTypeFactory
::
get
(
const
String
&
name
)
const
{
NonParametricDataTypes
::
iterator
it
=
non_parametric_data_types
.
find
(
name
);
NonParametricDataTypes
::
const_
iterator
it
=
non_parametric_data_types
.
find
(
name
);
if
(
it
!=
non_parametric_data_types
.
end
())
return
it
->
second
;
...
...
dbms/src/Interpreters/Aggregate.cpp
0 → 100644
浏览文件 @
67d99d47
#include <DB/Interpreters/Aggregate.h>
namespace
DB
{
/** Простой алгоритм (агрегация с помощью std::map).
* Без оптимизации для агрегатных функций, принимающих не более одного значения.
* Без оптимизации по количеству ключей.
* Результат хранится в оперативке и должен полностью помещаться в оперативку.
*/
AggregatedData
Aggregate
::
execute
(
BlockInputStreamPtr
stream
)
{
AggregatedData
res
;
size_t
keys_size
=
keys
.
size
();
size_t
aggregates_size
=
aggregates
.
size
();
Row
key
(
keys_size
);
Columns
key_columns
(
keys_size
);
typedef
std
::
vector
<
Columns
>
AggregateColumns
;
AggregateColumns
aggregate_columns
(
aggregates_size
);
typedef
std
::
vector
<
Row
>
Rows
;
Rows
aggregate_arguments
(
aggregates_size
);
for
(
size_t
i
=
0
;
i
<
aggregates_size
;
++
i
)
{
aggregate_arguments
[
i
].
resize
(
aggregates
[
i
].
arguments
.
size
());
aggregate_columns
[
i
].
resize
(
aggregates
[
i
].
arguments
.
size
());
}
/// Читаем все данные
while
(
Block
block
=
stream
->
read
())
{
/// Запоминаем столбцы, с которыми будем работать
for
(
size_t
i
=
0
,
size
=
keys_size
;
i
<
size
;
++
i
)
key_columns
[
i
]
=
block
.
getByPosition
(
keys
[
i
]).
column
;
for
(
size_t
i
=
0
;
i
<
aggregates_size
;
++
i
)
for
(
size_t
j
=
0
;
j
<
aggregate_columns
[
i
].
size
();
++
j
)
aggregate_columns
[
i
][
j
]
=
block
.
getByPosition
(
aggregates
[
i
].
arguments
[
j
]).
column
;
size_t
rows
=
block
.
rows
();
/// Для всех строчек
for
(
size_t
i
=
0
;
i
<
rows
;
++
i
)
{
/// Строим ключ
for
(
size_t
j
=
0
;
j
<
keys_size
;
++
j
)
key
[
j
]
=
(
*
key_columns
[
j
])[
i
];
AggregatedData
::
iterator
it
=
res
.
find
(
key
);
if
(
it
==
res
.
end
())
{
it
=
res
.
insert
(
std
::
make_pair
(
key
,
AggregateFunctions
(
aggregates_size
))).
first
;
for
(
size_t
j
=
0
;
j
<
aggregates_size
;
++
j
)
it
->
second
[
j
]
=
aggregates
[
j
].
function
->
cloneEmpty
();
}
/// Добавляем значения
for
(
size_t
j
=
0
;
j
<
aggregates_size
;
++
j
)
{
for
(
size_t
k
=
0
,
size
=
aggregate_arguments
[
j
].
size
();
k
<
size
;
++
k
)
aggregate_arguments
[
j
][
k
]
=
(
*
aggregate_columns
[
j
][
k
])[
i
];
it
->
second
[
j
]
->
add
(
aggregate_arguments
[
j
]);
}
}
}
return
res
;
}
}
dbms/src/Interpreters/Expression.cpp
浏览文件 @
67d99d47
...
...
@@ -34,10 +34,11 @@ void Expression::addSemantic(ASTPtr & ast)
else
if
(
ASTFunction
*
node
=
dynamic_cast
<
ASTFunction
*>
(
&*
ast
))
{
Functions
::
const_iterator
it
=
context
.
functions
->
find
(
node
->
name
);
if
(
it
==
context
.
functions
->
end
())
node
->
aggregate_function
=
context
.
aggregate_function_factory
->
tryGet
(
node
->
name
);
if
(
it
==
context
.
functions
->
end
()
&&
node
->
aggregate_function
.
isNull
())
throw
Exception
(
"Unknown function "
+
node
->
name
,
ErrorCodes
::
UNKNOWN_FUNCTION
);
node
->
function
=
it
->
second
;
if
(
it
!=
context
.
functions
->
end
())
node
->
function
=
it
->
second
;
}
else
if
(
ASTIdentifier
*
node
=
dynamic_cast
<
ASTIdentifier
*>
(
&*
ast
))
{
...
...
@@ -85,7 +86,10 @@ void Expression::checkTypes(ASTPtr ast)
}
/// Получаем типы результата
node
->
return_types
=
node
->
function
->
getReturnTypes
(
argument_types
);
if
(
node
->
aggregate_function
)
node
->
return_types
.
push_back
(
node
->
aggregate_function
->
getReturnType
(
argument_types
));
else
node
->
return_types
=
node
->
function
->
getReturnTypes
(
argument_types
);
}
}
...
...
@@ -167,35 +171,38 @@ void Expression::executeImpl(ASTPtr ast, Block & block, unsigned part_id)
*/
if
(
ASTFunction
*
node
=
dynamic_cast
<
ASTFunction
*>
(
&*
ast
))
{
/// Вставляем в блок столбцы - результаты вычисления функции
ColumnNumbers
argument_numbers
;
ColumnNumbers
&
result_numbers
=
node
->
return_column_numbers
;
result_numbers
.
clear
();
size_t
res_num
=
0
;
for
(
DataTypes
::
const_iterator
it
=
node
->
return_types
.
begin
();
it
!=
node
->
return_types
.
end
();
++
it
)
if
(
node
->
function
)
{
ColumnWithNameAndType
column
;
column
.
type
=
*
it
;
column
.
name
=
node
->
getTreeID
()
+
functionReturnValueSuffix
(
res_num
);
result_numbers
.
push_back
(
block
.
columns
());
block
.
insert
(
column
);
++
res_num
;
/// Вставляем в блок столбцы - результаты вычисления функции
ColumnNumbers
argument_numbers
;
ColumnNumbers
&
result_numbers
=
node
->
return_column_numbers
;
result_numbers
.
clear
();
size_t
res_num
=
0
;
for
(
DataTypes
::
const_iterator
it
=
node
->
return_types
.
begin
();
it
!=
node
->
return_types
.
end
();
++
it
)
{
ColumnWithNameAndType
column
;
column
.
type
=
*
it
;
column
.
name
=
node
->
getTreeID
()
+
functionReturnValueSuffix
(
res_num
);
result_numbers
.
push_back
(
block
.
columns
());
block
.
insert
(
column
);
++
res_num
;
}
ASTs
arguments
=
node
->
arguments
->
children
;
for
(
ASTs
::
iterator
it
=
arguments
.
begin
();
it
!=
arguments
.
end
();
++
it
)
{
if
(
ASTIdentifier
*
ident
=
dynamic_cast
<
ASTIdentifier
*>
(
&**
it
))
argument_numbers
.
push_back
(
block
.
getPositionByName
(
ident
->
name
));
else
if
(
ASTFunction
*
func
=
dynamic_cast
<
ASTFunction
*>
(
&**
it
))
argument_numbers
.
insert
(
argument_numbers
.
end
(),
func
->
return_column_numbers
.
begin
(),
func
->
return_column_numbers
.
end
());
else
argument_numbers
.
push_back
(
block
.
getPositionByName
((
*
it
)
->
getTreeID
()));
}
node
->
function
->
execute
(
block
,
argument_numbers
,
result_numbers
);
}
ASTs
arguments
=
node
->
arguments
->
children
;
for
(
ASTs
::
iterator
it
=
arguments
.
begin
();
it
!=
arguments
.
end
();
++
it
)
{
if
(
ASTIdentifier
*
ident
=
dynamic_cast
<
ASTIdentifier
*>
(
&**
it
))
argument_numbers
.
push_back
(
block
.
getPositionByName
(
ident
->
name
));
else
if
(
ASTFunction
*
func
=
dynamic_cast
<
ASTFunction
*>
(
&**
it
))
argument_numbers
.
insert
(
argument_numbers
.
end
(),
func
->
return_column_numbers
.
begin
(),
func
->
return_column_numbers
.
end
());
else
argument_numbers
.
push_back
(
block
.
getPositionByName
((
*
it
)
->
getTreeID
()));
}
node
->
function
->
execute
(
block
,
argument_numbers
,
result_numbers
);
}
else
if
(
ASTLiteral
*
node
=
dynamic_cast
<
ASTLiteral
*>
(
&*
ast
))
{
...
...
dbms/src/Interpreters/tests/aggregate.cpp
0 → 100644
浏览文件 @
67d99d47
#include <iostream>
#include <iomanip>
#include <Poco/Stopwatch.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeString.h>
#include <DB/Columns/ColumnsNumber.h>
#include <DB/Columns/ColumnString.h>
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/Interpreters/Aggregate.h>
#include <DB/AggregateFunctions/AggregateFunctionFactory.h>
class
OneBlockInputStream
:
public
DB
::
IBlockInputStream
{
private:
const
DB
::
Block
&
block
;
bool
has_been_read
;
public:
OneBlockInputStream
(
const
DB
::
Block
&
block_
)
:
block
(
block_
),
has_been_read
(
false
)
{}
DB
::
Block
read
()
{
if
(
!
has_been_read
)
{
has_been_read
=
true
;
return
block
;
}
else
return
DB
::
Block
();
}
DB
::
String
getName
()
const
{
return
"OneBlockInputStream"
;
}
};
int
main
(
int
argc
,
char
**
argv
)
{
try
{
size_t
n
=
argc
==
2
?
atoi
(
argv
[
1
])
:
10
;
DB
::
Block
block
;
DB
::
ColumnWithNameAndType
column_x
;
column_x
.
name
=
"x"
;
column_x
.
type
=
new
DB
::
DataTypeInt16
;
DB
::
ColumnInt16
*
x
=
new
DB
::
ColumnInt16
;
column_x
.
column
=
x
;
std
::
vector
<
Int16
>
&
vec_x
=
x
->
getData
();
vec_x
.
resize
(
n
);
for
(
size_t
i
=
0
;
i
<
n
;
++
i
)
vec_x
[
i
]
=
i
%
9
;
block
.
insert
(
column_x
);
const
char
*
strings
[]
=
{
"abc"
,
"def"
,
"abcd"
,
"defg"
,
"ac"
};
DB
::
ColumnWithNameAndType
column_s1
;
column_s1
.
name
=
"s1"
;
column_s1
.
type
=
new
DB
::
DataTypeString
;
column_s1
.
column
=
new
DB
::
ColumnString
;
for
(
size_t
i
=
0
;
i
<
n
;
++
i
)
column_s1
.
column
->
insert
(
strings
[
i
%
5
]);
block
.
insert
(
column_s1
);
DB
::
ColumnWithNameAndType
column_s2
;
column_s2
.
name
=
"s2"
;
column_s2
.
type
=
new
DB
::
DataTypeString
;
column_s2
.
column
=
new
DB
::
ColumnString
;
for
(
size_t
i
=
0
;
i
<
n
;
++
i
)
column_s2
.
column
->
insert
(
strings
[
i
%
3
]);
block
.
insert
(
column_s2
);
DB
::
BlockInputStreamPtr
stream
=
new
OneBlockInputStream
(
block
);
DB
::
AggregatedData
aggregated_data
;
DB
::
ColumnNumbers
key_column_numbers
;
key_column_numbers
.
push_back
(
0
);
key_column_numbers
.
push_back
(
1
);
DB
::
AggregateFunctionFactory
factory
;
DB
::
AggregateDescriptions
aggregate_descriptions
(
1
);
aggregate_descriptions
[
0
].
function
=
factory
.
get
(
"count"
);
DB
::
Aggregate
aggregator
(
key_column_numbers
,
aggregate_descriptions
);
{
Poco
::
Stopwatch
stopwatch
;
stopwatch
.
start
();
aggregated_data
=
aggregator
.
execute
(
stream
);
stopwatch
.
stop
();
std
::
cout
<<
std
::
fixed
<<
std
::
setprecision
(
2
)
<<
"Elapsed "
<<
stopwatch
.
elapsed
()
/
1000000.0
<<
" sec."
<<
", "
<<
n
*
1000000
/
stopwatch
.
elapsed
()
<<
" rows/sec."
<<
std
::
endl
;
}
for
(
DB
::
AggregatedData
::
const_iterator
it
=
aggregated_data
.
begin
();
it
!=
aggregated_data
.
end
();
++
it
)
{
for
(
DB
::
Row
::
const_iterator
jt
=
it
->
first
.
begin
();
jt
!=
it
->
first
.
end
();
++
jt
)
std
::
cout
<<
boost
::
apply_visitor
(
DB
::
FieldVisitorToString
(),
*
jt
)
<<
'\t'
;
for
(
DB
::
AggregateFunctions
::
const_iterator
jt
=
it
->
second
.
begin
();
jt
!=
it
->
second
.
end
();
++
jt
)
{
DB
::
Field
result
=
(
*
jt
)
->
getResult
();
std
::
cout
<<
boost
::
apply_visitor
(
DB
::
FieldVisitorToString
(),
result
)
<<
'\t'
;
}
std
::
cout
<<
'\n'
;
}
}
catch
(
const
DB
::
Exception
&
e
)
{
std
::
cerr
<<
e
.
message
()
<<
std
::
endl
;
}
return
0
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录