Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
9f7a5a03
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
9f7a5a03
编写于
8月 26, 2015
作者:
A
Alexey Arno
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
dbms: Server: Removed modification of runningAccumulate since it is no longer needed. [#METR-17276]
上级
b25b8661
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
18 addition
and
118 deletion
+18
-118
dbms/include/DB/Functions/FunctionsMiscellaneous.h
dbms/include/DB/Functions/FunctionsMiscellaneous.h
+18
-118
未找到文件。
dbms/include/DB/Functions/FunctionsMiscellaneous.h
浏览文件 @
9f7a5a03
...
...
@@ -964,14 +964,26 @@ private:
*/
class
FunctionRunningAccumulate
:
public
IFunction
{
private:
template
<
typename
T
>
bool
checkType
(
const
IDataType
*
type
)
const
public:
static
constexpr
auto
name
=
"runningAccumulate"
;
static
IFunction
*
create
(
const
Context
&
context
)
{
return
new
FunctionRunningAccumulate
;
}
String
getName
()
const
override
{
return
name
;
}
DataTypePtr
getReturnType
(
const
DataTypes
&
arguments
)
const
override
{
return
typeid_cast
<
const
T
*>
(
type
)
!=
nullptr
;
if
(
arguments
.
size
()
!=
1
)
throw
Exception
(
"Function "
+
getName
()
+
" requires exactly one argument."
,
ErrorCodes
::
NUMBER_OF_ARGUMENTS_DOESNT_MATCH
);
const
DataTypeAggregateFunction
*
type
=
typeid_cast
<
const
DataTypeAggregateFunction
*>
(
&*
arguments
[
0
]);
if
(
!
type
)
throw
Exception
(
"Argument for function "
+
getName
()
+
" must have type AggregateFunction - state of aggregate function."
,
ErrorCodes
::
ILLEGAL_TYPE_OF_ARGUMENT
);
return
type
->
getReturnType
()
->
clone
();
}
void
execute
WithoutKey
(
Block
&
block
,
const
ColumnNumbers
&
arguments
,
size_t
result
)
void
execute
(
Block
&
block
,
const
ColumnNumbers
&
arguments
,
size_t
result
)
override
{
const
ColumnAggregateFunction
*
column_with_states
=
typeid_cast
<
const
ColumnAggregateFunction
*>
(
&*
block
.
getByPosition
(
arguments
.
at
(
0
)).
column
);
if
(
!
column_with_states
)
...
...
@@ -999,121 +1011,9 @@ private:
agg_func
.
insertResultInto
(
place
.
get
(),
result_column
);
}
}
template
<
typename
T
>
bool
executeForType
(
Block
&
block
,
const
ColumnNumbers
&
arguments
,
size_t
result
)
{
using
ColumnType
=
ColumnVector
<
T
>
;
if
(
const
auto
*
column_keys
=
typeid_cast
<
const
ColumnType
*>
(
&*
block
.
getByPosition
(
arguments
[
1
]).
column
))
{
const
ColumnAggregateFunction
*
column_with_states
=
typeid_cast
<
const
ColumnAggregateFunction
*>
(
&*
block
.
getByPosition
(
arguments
.
at
(
0
)).
column
);
if
(
!
column_with_states
)
throw
Exception
(
"Illegal column "
+
block
.
getByPosition
(
arguments
.
at
(
0
)).
column
->
getName
()
+
" of first argument of function "
+
getName
(),
ErrorCodes
::
ILLEGAL_COLUMN
);
AggregateFunctionPtr
aggregate_function_ptr
=
column_with_states
->
getAggregateFunction
();
const
IAggregateFunction
&
agg_func
=
*
aggregate_function_ptr
;
auto
deleter
=
[
&
agg_func
]
(
char
*
ptr
)
{
agg_func
.
destroy
(
ptr
);
free
(
ptr
);
};
std
::
unique_ptr
<
char
,
decltype
(
deleter
)
>
place
{
reinterpret_cast
<
char
*>
(
malloc
(
agg_func
.
sizeOfData
())),
deleter
};
agg_func
.
create
(
place
.
get
());
/// Немного не exception-safe. Если здесь выкинется исключение, то зря вызовется destroy.
ColumnPtr
result_column_ptr
=
agg_func
.
getReturnType
()
->
createColumn
();
block
.
getByPosition
(
result
).
column
=
result_column_ptr
;
IColumn
&
result_column
=
*
result_column_ptr
;
result_column
.
reserve
(
column_with_states
->
size
());
const
auto
&
states
=
column_with_states
->
getData
();
const
auto
&
keys
=
column_keys
->
getData
();
size_t
size
=
states
.
size
();
using
Position
=
typename
std
::
pair
<
T
,
size_t
>
;
std
::
vector
<
Position
>
positions
;
positions
.
reserve
(
size
);
for
(
size_t
i
=
0
;
i
<
size
;
++
i
)
positions
.
push_back
(
std
::
make_pair
(
keys
[
i
],
i
));
std
::
sort
(
positions
.
begin
(),
positions
.
end
(),
[](
const
Position
&
lhs
,
const
Position
&
rhs
)
{
return
lhs
.
first
<
rhs
.
first
;
});
for
(
const
auto
&
pos
:
positions
)
{
size_t
index
=
pos
.
second
;
agg_func
.
merge
(
place
.
get
(),
states
[
index
]);
agg_func
.
insertResultInto
(
place
.
get
(),
result_column
);
}
return
true
;
}
else
return
false
;
}
public:
static
constexpr
auto
name
=
"runningAccumulate"
;
static
IFunction
*
create
(
const
Context
&
context
)
{
return
new
FunctionRunningAccumulate
;
}
String
getName
()
const
override
{
return
name
;
}
DataTypePtr
getReturnType
(
const
DataTypes
&
arguments
)
const
override
{
if
((
arguments
.
size
()
<
1
)
||
(
arguments
.
size
()
>
2
))
throw
Exception
(
"Function "
+
getName
()
+
" requires 1 or 2 arguments."
,
ErrorCodes
::
NUMBER_OF_ARGUMENTS_DOESNT_MATCH
);
if
(
arguments
.
size
()
==
2
)
{
const
IDataType
*
type
=
&*
arguments
[
1
];
if
(
!
(
checkType
<
DataTypeUInt8
>
(
type
)
||
checkType
<
DataTypeUInt16
>
(
type
)
||
checkType
<
DataTypeUInt32
>
(
type
)
||
checkType
<
DataTypeUInt64
>
(
type
)
||
checkType
<
DataTypeInt8
>
(
type
)
||
checkType
<
DataTypeInt16
>
(
type
)
||
checkType
<
DataTypeInt32
>
(
type
)
||
checkType
<
DataTypeInt64
>
(
type
)))
{
throw
Exception
(
"Illegal type in second argument of function "
+
getName
(),
ErrorCodes
::
ILLEGAL_TYPE_OF_ARGUMENT
);
}
}
const
DataTypeAggregateFunction
*
type
=
typeid_cast
<
const
DataTypeAggregateFunction
*>
(
&*
arguments
[
0
]);
if
(
!
type
)
throw
Exception
(
"Argument for function "
+
getName
()
+
" must have type AggregateFunction - state of aggregate function."
,
ErrorCodes
::
ILLEGAL_TYPE_OF_ARGUMENT
);
return
type
->
getReturnType
()
->
clone
();
}
void
execute
(
Block
&
block
,
const
ColumnNumbers
&
arguments
,
size_t
result
)
override
{
if
(
arguments
.
size
()
==
1
)
executeWithoutKey
(
block
,
arguments
,
result
);
else
if
(
arguments
.
size
()
==
2
)
{
if
(
!
(
executeForType
<
UInt8
>
(
block
,
arguments
,
result
)
||
executeForType
<
UInt16
>
(
block
,
arguments
,
result
)
||
executeForType
<
UInt32
>
(
block
,
arguments
,
result
)
||
executeForType
<
UInt64
>
(
block
,
arguments
,
result
)
||
executeForType
<
Int8
>
(
block
,
arguments
,
result
)
||
executeForType
<
Int16
>
(
block
,
arguments
,
result
)
||
executeForType
<
Int32
>
(
block
,
arguments
,
result
)
||
executeForType
<
Int64
>
(
block
,
arguments
,
result
)))
{
throw
Exception
(
"Illegal column "
+
block
.
getByPosition
(
arguments
[
0
]).
column
->
getName
()
+
" of argument of function "
+
getName
(),
ErrorCodes
::
ILLEGAL_COLUMN
);
}
}
else
throw
Exception
(
"Function "
+
getName
()
+
" requires 1 or 2 arguments."
,
ErrorCodes
::
NUMBER_OF_ARGUMENTS_DOESNT_MATCH
);
}
};
/** Принимает состояние агрегатной функции. Возвращает результат агрегации.
*/
class
FunctionFinalizeAggregation
:
public
IFunction
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录