Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9c1b1b45
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
9c1b1b45
编写于
6月 10, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into feat/row_refact
上级
a4638486
c55c3499
变更
20
隐藏空白更改
内联
并排
Showing
20 changed file
with
348 addition
and
142 deletion
+348
-142
include/client/taos.h
include/client/taos.h
+48
-49
include/common/tcommon.h
include/common/tcommon.h
+9
-3
include/libs/function/function.h
include/libs/function/function.h
+0
-1
include/libs/function/functionMgt.h
include/libs/function/functionMgt.h
+1
-0
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+2
-3
source/client/src/clientMain.c
source/client/src/clientMain.c
+8
-0
source/dnode/mnode/impl/src/mndScheduler.c
source/dnode/mnode/impl/src/mndScheduler.c
+9
-10
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+1
-0
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+20
-8
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+15
-11
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+2
-2
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+1
-1
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+31
-42
source/libs/function/inc/builtinsimpl.h
source/libs/function/inc/builtinsimpl.h
+2
-3
source/libs/function/src/builtins.c
source/libs/function/src/builtins.c
+20
-0
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+172
-1
source/libs/function/src/taggfunction.c
source/libs/function/src/taggfunction.c
+3
-3
source/libs/stream/src/streamDispatch.c
source/libs/stream/src/streamDispatch.c
+1
-0
source/libs/stream/src/streamTask.c
source/libs/stream/src/streamTask.c
+1
-4
tests/script/tsim/valgrind/checkError.sim
tests/script/tsim/valgrind/checkError.sim
+2
-1
未找到文件。
include/client/taos.h
浏览文件 @
9c1b1b45
...
...
@@ -99,7 +99,7 @@ typedef struct TAOS_FIELD_E {
#define DLL_EXPORT
#endif
typedef
void
(
*
__taos_async_fn_t
)(
void
*
param
,
TAOS_RES
*
,
int
code
);
typedef
void
(
*
__taos_async_fn_t
)(
void
*
param
,
TAOS_RES
*
res
,
int
code
);
typedef
struct
TAOS_MULTI_BIND
{
int
buffer_type
;
...
...
@@ -126,49 +126,47 @@ typedef struct setConfRet {
char
retMsg
[
RET_MSG_LENGTH
];
}
setConfRet
;
DLL_EXPORT
void
taos_cleanup
(
void
);
DLL_EXPORT
int
taos_options
(
TSDB_OPTION
option
,
const
void
*
arg
,
...);
DLL_EXPORT
setConfRet
taos_set_config
(
const
char
*
config
);
DLL_EXPORT
int
taos_init
(
void
);
DLL_EXPORT
TAOS
*
taos_connect
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
db
,
uint16_t
port
);
DLL_EXPORT
TAOS
*
taos_connect_l
(
const
char
*
ip
,
int
ipLen
,
const
char
*
user
,
int
userLen
,
const
char
*
pass
,
int
passLen
,
const
char
*
db
,
int
dbLen
,
uint16_t
port
);
DLL_EXPORT
TAOS
*
taos_connect_auth
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
);
DLL_EXPORT
void
taos_close
(
TAOS
*
taos
);
const
char
*
taos_data_type
(
int
type
);
DLL_EXPORT
TAOS_STMT
*
taos_stmt_init
(
TAOS
*
taos
);
DLL_EXPORT
int
taos_stmt_prepare
(
TAOS_STMT
*
stmt
,
const
char
*
sql
,
unsigned
long
length
);
DLL_EXPORT
int
taos_stmt_set_tbname_tags
(
TAOS_STMT
*
stmt
,
const
char
*
name
,
TAOS_MULTI_BIND
*
tags
);
DLL_EXPORT
int
taos_stmt_set_tbname
(
TAOS_STMT
*
stmt
,
const
char
*
name
);
DLL_EXPORT
int
taos_stmt_set_tags
(
TAOS_STMT
*
stmt
,
TAOS_MULTI_BIND
*
tags
);
DLL_EXPORT
int
taos_stmt_set_sub_tbname
(
TAOS_STMT
*
stmt
,
const
char
*
name
);
DLL_EXPORT
int
taos_stmt_get_tag_fields
(
TAOS_STMT
*
stmt
,
int
*
fieldNum
,
TAOS_FIELD_E
**
fields
);
DLL_EXPORT
int
taos_stmt_get_col_fields
(
TAOS_STMT
*
stmt
,
int
*
fieldNum
,
TAOS_FIELD_E
**
fields
);
DLL_EXPORT
int
taos_stmt_is_insert
(
TAOS_STMT
*
stmt
,
int
*
insert
);
DLL_EXPORT
int
taos_stmt_num_params
(
TAOS_STMT
*
stmt
,
int
*
nums
);
DLL_EXPORT
int
taos_stmt_get_param
(
TAOS_STMT
*
stmt
,
int
idx
,
int
*
type
,
int
*
bytes
);
DLL_EXPORT
int
taos_stmt_bind_param
(
TAOS_STMT
*
stmt
,
TAOS_MULTI_BIND
*
bind
);
DLL_EXPORT
int
taos_stmt_bind_param_batch
(
TAOS_STMT
*
stmt
,
TAOS_MULTI_BIND
*
bind
);
DLL_EXPORT
int
taos_stmt_bind_single_param_batch
(
TAOS_STMT
*
stmt
,
TAOS_MULTI_BIND
*
bind
,
int
colIdx
);
DLL_EXPORT
int
taos_stmt_add_batch
(
TAOS_STMT
*
stmt
);
DLL_EXPORT
int
taos_stmt_execute
(
TAOS_STMT
*
stmt
);
DLL_EXPORT
TAOS_RES
*
taos_stmt_use_result
(
TAOS_STMT
*
stmt
);
DLL_EXPORT
int
taos_stmt_close
(
TAOS_STMT
*
stmt
);
DLL_EXPORT
char
*
taos_stmt_errstr
(
TAOS_STMT
*
stmt
);
DLL_EXPORT
int
taos_stmt_affected_rows
(
TAOS_STMT
*
stmt
);
DLL_EXPORT
int
taos_stmt_affected_rows_once
(
TAOS_STMT
*
stmt
);
DLL_EXPORT
TAOS_RES
*
taos_query
(
TAOS
*
taos
,
const
char
*
sql
);
DLL_EXPORT
TAOS_ROW
taos_fetch_row
(
TAOS_RES
*
res
);
DLL_EXPORT
int
taos_result_precision
(
TAOS_RES
*
res
);
// get the time precision of result
DLL_EXPORT
void
taos_free_result
(
TAOS_RES
*
res
);
DLL_EXPORT
int
taos_field_count
(
TAOS_RES
*
res
);
DLL_EXPORT
int
taos_num_fields
(
TAOS_RES
*
res
);
DLL_EXPORT
int
taos_affected_rows
(
TAOS_RES
*
res
);
DLL_EXPORT
void
taos_cleanup
(
void
);
DLL_EXPORT
int
taos_options
(
TSDB_OPTION
option
,
const
void
*
arg
,
...);
DLL_EXPORT
setConfRet
taos_set_config
(
const
char
*
config
);
DLL_EXPORT
int
taos_init
(
void
);
DLL_EXPORT
TAOS
*
taos_connect
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
db
,
uint16_t
port
);
DLL_EXPORT
TAOS
*
taos_connect_auth
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
);
DLL_EXPORT
void
taos_close
(
TAOS
*
taos
);
const
char
*
taos_data_type
(
int
type
);
DLL_EXPORT
TAOS_STMT
*
taos_stmt_init
(
TAOS
*
taos
);
DLL_EXPORT
int
taos_stmt_prepare
(
TAOS_STMT
*
stmt
,
const
char
*
sql
,
unsigned
long
length
);
DLL_EXPORT
int
taos_stmt_set_tbname_tags
(
TAOS_STMT
*
stmt
,
const
char
*
name
,
TAOS_MULTI_BIND
*
tags
);
DLL_EXPORT
int
taos_stmt_set_tbname
(
TAOS_STMT
*
stmt
,
const
char
*
name
);
DLL_EXPORT
int
taos_stmt_set_tags
(
TAOS_STMT
*
stmt
,
TAOS_MULTI_BIND
*
tags
);
DLL_EXPORT
int
taos_stmt_set_sub_tbname
(
TAOS_STMT
*
stmt
,
const
char
*
name
);
DLL_EXPORT
int
taos_stmt_get_tag_fields
(
TAOS_STMT
*
stmt
,
int
*
fieldNum
,
TAOS_FIELD_E
**
fields
);
DLL_EXPORT
int
taos_stmt_get_col_fields
(
TAOS_STMT
*
stmt
,
int
*
fieldNum
,
TAOS_FIELD_E
**
fields
);
DLL_EXPORT
int
taos_stmt_is_insert
(
TAOS_STMT
*
stmt
,
int
*
insert
);
DLL_EXPORT
int
taos_stmt_num_params
(
TAOS_STMT
*
stmt
,
int
*
nums
);
DLL_EXPORT
int
taos_stmt_get_param
(
TAOS_STMT
*
stmt
,
int
idx
,
int
*
type
,
int
*
bytes
);
DLL_EXPORT
int
taos_stmt_bind_param
(
TAOS_STMT
*
stmt
,
TAOS_MULTI_BIND
*
bind
);
DLL_EXPORT
int
taos_stmt_bind_param_batch
(
TAOS_STMT
*
stmt
,
TAOS_MULTI_BIND
*
bind
);
DLL_EXPORT
int
taos_stmt_bind_single_param_batch
(
TAOS_STMT
*
stmt
,
TAOS_MULTI_BIND
*
bind
,
int
colIdx
);
DLL_EXPORT
int
taos_stmt_add_batch
(
TAOS_STMT
*
stmt
);
DLL_EXPORT
int
taos_stmt_execute
(
TAOS_STMT
*
stmt
);
DLL_EXPORT
TAOS_RES
*
taos_stmt_use_result
(
TAOS_STMT
*
stmt
);
DLL_EXPORT
int
taos_stmt_close
(
TAOS_STMT
*
stmt
);
DLL_EXPORT
char
*
taos_stmt_errstr
(
TAOS_STMT
*
stmt
);
DLL_EXPORT
int
taos_stmt_affected_rows
(
TAOS_STMT
*
stmt
);
DLL_EXPORT
int
taos_stmt_affected_rows_once
(
TAOS_STMT
*
stmt
);
DLL_EXPORT
TAOS_RES
*
taos_query
(
TAOS
*
taos
,
const
char
*
sql
);
DLL_EXPORT
TAOS_ROW
taos_fetch_row
(
TAOS_RES
*
res
);
DLL_EXPORT
int
taos_result_precision
(
TAOS_RES
*
res
);
// get the time precision of result
DLL_EXPORT
void
taos_free_result
(
TAOS_RES
*
res
);
DLL_EXPORT
int
taos_field_count
(
TAOS_RES
*
res
);
DLL_EXPORT
int
taos_num_fields
(
TAOS_RES
*
res
);
DLL_EXPORT
int
taos_affected_rows
(
TAOS_RES
*
res
);
DLL_EXPORT
TAOS_FIELD
*
taos_fetch_fields
(
TAOS_RES
*
res
);
DLL_EXPORT
int
taos_select_db
(
TAOS
*
taos
,
const
char
*
db
);
...
...
@@ -183,8 +181,8 @@ DLL_EXPORT int *taos_get_column_data_offset(TAOS_RES *res, int columnInde
DLL_EXPORT
int
taos_validate_sql
(
TAOS
*
taos
,
const
char
*
sql
);
DLL_EXPORT
void
taos_reset_current_db
(
TAOS
*
taos
);
DLL_EXPORT
int
*
taos_fetch_lengths
(
TAOS_RES
*
res
);
DLL_EXPORT
TAOS_ROW
*
taos_result_block
(
TAOS_RES
*
res
);
DLL_EXPORT
int
*
taos_fetch_lengths
(
TAOS_RES
*
res
);
DLL_EXPORT
TAOS_ROW
*
taos_result_block
(
TAOS_RES
*
res
);
DLL_EXPORT
const
char
*
taos_get_server_info
(
TAOS
*
taos
);
DLL_EXPORT
const
char
*
taos_get_client_info
();
...
...
@@ -192,9 +190,10 @@ DLL_EXPORT const char *taos_get_client_info();
DLL_EXPORT
const
char
*
taos_errstr
(
TAOS_RES
*
tres
);
DLL_EXPORT
int
taos_errno
(
TAOS_RES
*
tres
);
DLL_EXPORT
void
taos_query_a
(
TAOS
*
taos
,
const
char
*
sql
,
__taos_async_fn_t
fp
,
void
*
param
);
DLL_EXPORT
void
taos_fetch_rows_a
(
TAOS_RES
*
res
,
__taos_async_fn_t
fp
,
void
*
param
);
DLL_EXPORT
void
taos_fetch_raw_block_a
(
TAOS_RES
*
res
,
__taos_async_fn_t
fp
,
void
*
param
);
DLL_EXPORT
void
taos_query_a
(
TAOS
*
taos
,
const
char
*
sql
,
__taos_async_fn_t
fp
,
void
*
param
);
DLL_EXPORT
void
taos_fetch_rows_a
(
TAOS_RES
*
res
,
__taos_async_fn_t
fp
,
void
*
param
);
DLL_EXPORT
void
taos_fetch_raw_block_a
(
TAOS_RES
*
res
,
__taos_async_fn_t
fp
,
void
*
param
);
DLL_EXPORT
const
void
*
taos_get_raw_block
(
TAOS_RES
*
res
);
// Shuduo: temporary enable for app build
#if 1
...
...
include/common/tcommon.h
浏览文件 @
9c1b1b45
...
...
@@ -159,19 +159,25 @@ typedef struct SColumn {
}
SColumn
;
typedef
struct
STableBlockDistInfo
{
uint
16
_t
rowSize
;
uint
32
_t
rowSize
;
uint16_t
numOfFiles
;
uint32_t
numOfTables
;
uint32_t
numOfBlocks
;
uint64_t
totalSize
;
uint64_t
totalRows
;
int32_t
maxRows
;
int32_t
minRows
;
int32_t
defMinRows
;
int32_t
defMaxRows
;
int32_t
firstSeekTimeUs
;
uint32_t
numOf
RowsInMemTable
;
uint32_t
numOf
InmemRows
;
uint32_t
numOfSmallBlocks
;
SArray
*
dataBlockInfos
;
int32_t
blockRowsHisto
[
20
]
;
}
STableBlockDistInfo
;
int32_t
tSerializeBlockDistInfo
(
void
*
buf
,
int32_t
bufLen
,
const
STableBlockDistInfo
*
pInfo
);
int32_t
tDeserializeBlockDistInfo
(
void
*
buf
,
int32_t
bufLen
,
STableBlockDistInfo
*
pInfo
);
enum
{
FUNC_PARAM_TYPE_VALUE
=
0x1
,
FUNC_PARAM_TYPE_COLUMN
=
0x2
,
...
...
include/libs/function/function.h
浏览文件 @
9c1b1b45
...
...
@@ -58,7 +58,6 @@ typedef struct SFileBlockInfo {
int32_t
numBlocksOfStep
;
}
SFileBlockInfo
;
#define TSDB_BLOCK_DIST_STEP_ROWS 8
#define MAX_INTERVAL_TIME_WINDOW 1000000 // maximum allowed time windows in final results
#define TOP_BOTTOM_QUERY_LIMIT 100
...
...
include/libs/function/functionMgt.h
浏览文件 @
9c1b1b45
...
...
@@ -121,6 +121,7 @@ typedef enum EFunctionType {
// internal function
FUNCTION_TYPE_SELECT_VALUE
,
FUNCTION_TYPE_BLOCK_DIST
,
// block distribution aggregate function
// distributed splitting functions
FUNCTION_TYPE_APERCENTILE_PARTIAL
,
...
...
include/libs/stream/tstream.h
浏览文件 @
9c1b1b45
...
...
@@ -141,8 +141,7 @@ void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput);
#endif
typedef
struct
{
int8_t
parallelizable
;
char
*
qmsg
;
char
*
qmsg
;
// followings are not applicable to encoder and decoder
void
*
inputHandle
;
void
*
executor
;
...
...
@@ -267,7 +266,7 @@ struct SStreamTask {
// void* ahandle;
};
SStreamTask
*
tNewSStreamTask
(
int64_t
streamId
,
int32_t
childId
);
SStreamTask
*
tNewSStreamTask
(
int64_t
streamId
);
int32_t
tEncodeSStreamTask
(
SEncoder
*
pEncoder
,
const
SStreamTask
*
pTask
);
int32_t
tDecodeSStreamTask
(
SDecoder
*
pDecoder
,
SStreamTask
*
pTask
);
void
tFreeSStreamTask
(
SStreamTask
*
pTask
);
...
...
source/client/src/clientMain.c
浏览文件 @
9c1b1b45
...
...
@@ -822,10 +822,18 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
void
taos_fetch_raw_block_a
(
TAOS_RES
*
res
,
__taos_async_fn_t
fp
,
void
*
param
)
{
ASSERT
(
res
!=
NULL
&&
fp
!=
NULL
);
SRequestObj
*
pRequest
=
res
;
pRequest
->
body
.
resInfo
.
convertUcs4
=
false
;
taos_fetch_rows_a
(
res
,
fp
,
param
);
}
const
void
*
taos_get_raw_block
(
TAOS_RES
*
res
)
{
ASSERT
(
res
!=
NULL
);
SRequestObj
*
pRequest
=
res
;
return
pRequest
->
body
.
resInfo
.
pData
;
}
TAOS_SUB
*
taos_subscribe
(
TAOS
*
taos
,
int
restart
,
const
char
*
topic
,
const
char
*
sql
,
TAOS_SUBSCRIBE_CALLBACK
fp
,
void
*
param
,
int
interval
)
{
// TODO
...
...
source/dnode/mnode/impl/src/mndScheduler.c
浏览文件 @
9c1b1b45
...
...
@@ -197,7 +197,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
sdbRelease
(
pSdb
,
pVgroup
);
continue
;
}
SStreamTask
*
pTask
=
tNewSStreamTask
(
pStream
->
uid
,
0
);
SStreamTask
*
pTask
=
tNewSStreamTask
(
pStream
->
uid
);
if
(
pTask
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
...
...
@@ -237,7 +237,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
int32_t
mndAddFixedSinkToStream
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SStreamObj
*
pStream
)
{
ASSERT
(
pStream
->
fixedSinkVgId
!=
0
);
SArray
*
tasks
=
taosArrayGetP
(
pStream
->
tasks
,
0
);
SStreamTask
*
pTask
=
tNewSStreamTask
(
pStream
->
uid
,
0
);
SStreamTask
*
pTask
=
tNewSStreamTask
(
pStream
->
uid
);
if
(
pTask
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
...
...
@@ -329,7 +329,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
sdbRelease
(
pSdb
,
pVgroup
);
continue
;
}
SStreamTask
*
pTask
=
tNewSStreamTask
(
pStream
->
uid
,
0
);
SStreamTask
*
pTask
=
tNewSStreamTask
(
pStream
->
uid
);
mndAddTaskToTaskSet
(
taskOneLevel
,
pTask
);
// source part
pTask
->
sourceType
=
TASK_SOURCE__SCAN
;
pTask
->
inputType
=
TASK_INPUT_TYPE__SUMBIT_BLOCK
;
...
...
@@ -378,14 +379,12 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// exec part
pTask
->
execType
=
TASK_EXEC__PIPE
;
pTask
->
exec
.
parallelizable
=
1
;
if
(
mndAssignTaskToVg
(
pMnode
,
pTrans
,
pTask
,
plan
,
pVgroup
)
<
0
)
{
sdbRelease
(
pSdb
,
pVgroup
);
qDestroyQueryPlan
(
pPlan
);
return
-
1
;
}
sdbRelease
(
pSdb
,
pVgroup
);
mndAddTaskToTaskSet
(
taskOneLevel
,
pTask
);
}
}
else
{
// merge plan
...
...
@@ -394,7 +393,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// else, assign to vnode
ASSERT
(
plan
->
subplanType
==
SUBPLAN_TYPE_MERGE
);
SStreamTask
*
pTask
=
tNewSStreamTask
(
pStream
->
uid
,
0
);
SStreamTask
*
pTask
=
tNewSStreamTask
(
pStream
->
uid
);
mndAddTaskToTaskSet
(
taskOneLevel
,
pTask
);
// source part, currently only support multi source
pTask
->
sourceType
=
TASK_SOURCE__PIPE
;
...
...
@@ -456,7 +456,6 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// exec part
pTask
->
execType
=
TASK_EXEC__MERGE
;
pTask
->
exec
.
parallelizable
=
0
;
SVgObj
*
pVgroup
=
mndSchedFetchOneVg
(
pMnode
,
pStream
->
dbUid
);
ASSERT
(
pVgroup
);
if
(
mndAssignTaskToVg
(
pMnode
,
pTrans
,
pTask
,
plan
,
pVgroup
)
<
0
)
{
...
...
@@ -465,12 +464,12 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
return
-
1
;
}
sdbRelease
(
pSdb
,
pVgroup
);
taosArrayPush
(
taskOneLevel
,
&
pTask
);
}
taosArrayPush
(
pStream
->
tasks
,
&
taskOneLevel
);
}
#if 0
if (totLevel == 2) {
void* pIter = NULL;
while (1) {
...
...
@@ -481,7 +480,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
sdbRelease(pSdb, pVgroup);
continue;
}
SStreamTask
*
pTask
=
tNewSStreamTask
(
pStream
->
uid
,
0
);
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
// source part
pTask->sourceType = TASK_SOURCE__MERGE;
...
...
@@ -495,9 +494,9 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// exec part
pTask->execType = TASK_EXEC__NONE;
pTask
->
exec
.
parallelizable
=
0
;
}
}
#endif
// free memory
qDestroyQueryPlan
(
pPlan
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
9c1b1b45
...
...
@@ -364,6 +364,7 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
tdGetSTSChemaFromSSChema
(
&
pTask
->
tbSink
.
pSchemaWrapper
->
pSchema
,
pTask
->
tbSink
.
pSchemaWrapper
->
nCols
);
ASSERT
(
pTask
->
tbSink
.
pTSchema
);
}
tqInfo
(
"deploy stream task id %d child id %d on vg %d"
,
pTask
->
taskId
,
pTask
->
childId
,
pTq
->
pVnode
->
config
.
vgId
);
taosHashPut
(
pTq
->
pStreamTasks
,
&
pTask
->
taskId
,
sizeof
(
int32_t
),
pTask
,
sizeof
(
SStreamTask
));
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
9c1b1b45
...
...
@@ -2557,6 +2557,10 @@ static void moveToNextDataBlockInCurrentFile(STsdbReadHandle* pTsdbReadHandle) {
cur
->
blockCompleted
=
false
;
}
static
int32_t
getBucketIndex
(
int32_t
startRow
,
int32_t
bucketRange
,
int32_t
numOfRows
)
{
return
(
numOfRows
-
startRow
)
/
bucketRange
;
}
int32_t
tsdbGetFileBlocksDistInfo
(
tsdbReaderT
*
queryHandle
,
STableBlockDistInfo
*
pTableBlockInfo
)
{
STsdbReadHandle
*
pTsdbReadHandle
=
(
STsdbReadHandle
*
)
queryHandle
;
...
...
@@ -2575,16 +2579,20 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo*
tsdbFSIterSeek
(
&
pTsdbReadHandle
->
fileIter
,
fid
);
tsdbUnLockFS
(
pFileHandle
);
STsdbCfg
*
pc
=
REPO_CFG
(
pTsdbReadHandle
->
pTsdb
);
pTableBlockInfo
->
defMinRows
=
pc
->
minRows
;
pTableBlockInfo
->
defMaxRows
=
pc
->
maxRows
;
int32_t
bucketRange
=
ceil
((
pc
->
maxRows
-
pc
->
minRows
)
/
20
.
0
);
pTableBlockInfo
->
numOfFiles
+=
1
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
int32_t
numOfBlocks
=
0
;
int32_t
numOfTables
=
(
int32_t
)
taosArrayGetSize
(
pTsdbReadHandle
->
pTableCheckInfo
);
int
defaultRows
=
4096
;
// TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
int
defaultRows
=
4096
;
STimeWindow
win
=
TSWINDOW_INITIALIZER
;
bool
ascTraverse
=
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
);
while
(
true
)
{
numOfBlocks
=
0
;
tsdbRLockFS
(
REPO_FS
(
pTsdbReadHandle
->
pTsdb
));
...
...
@@ -2597,8 +2605,7 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo*
tsdbGetFidKeyRange
(
pCfg
->
days
,
pCfg
->
precision
,
pTsdbReadHandle
->
pFileGroup
->
fid
,
&
win
.
skey
,
&
win
.
ekey
);
// current file are not overlapped with query time window, ignore remain files
if
((
ascTraverse
&&
win
.
skey
>
pTsdbReadHandle
->
window
.
ekey
)
||
(
!
ascTraverse
&&
win
.
ekey
<
pTsdbReadHandle
->
window
.
ekey
))
{
if
((
win
.
skey
>
pTsdbReadHandle
->
window
.
ekey
)
/* || (!ascTraverse && win.ekey < pTsdbReadHandle->window.ekey)*/
)
{
tsdbUnLockFS
(
REPO_FS
(
pTsdbReadHandle
->
pTsdb
));
tsdbDebug
(
"%p remain files are not qualified for qrange:%"
PRId64
"-%"
PRId64
", ignore, %s"
,
pTsdbReadHandle
,
pTsdbReadHandle
->
window
.
skey
,
pTsdbReadHandle
->
window
.
ekey
,
pTsdbReadHandle
->
idStr
);
...
...
@@ -2631,15 +2638,19 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo*
continue
;
}
pTableBlockInfo
->
numOfBlocks
+=
numOfBlocks
;
for
(
int32_t
i
=
0
;
i
<
numOfTables
;
++
i
)
{
STableCheckInfo
*
pCheckInfo
=
taosArrayGet
(
pTsdbReadHandle
->
pTableCheckInfo
,
i
);
SBlock
*
pBlock
=
pCheckInfo
->
pCompInfo
->
blocks
;
for
(
int32_t
j
=
0
;
j
<
pCheckInfo
->
numOfBlocks
;
++
j
)
{
pTableBlockInfo
->
totalSize
+=
pBlock
[
j
].
len
;
int32_t
numOfRows
=
pBlock
[
j
].
numOfRows
;
pTableBlockInfo
->
totalRows
+=
numOfRows
;
if
(
numOfRows
>
pTableBlockInfo
->
maxRows
)
{
pTableBlockInfo
->
maxRows
=
numOfRows
;
}
...
...
@@ -2651,13 +2662,14 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo*
if
(
numOfRows
<
defaultRows
)
{
pTableBlockInfo
->
numOfSmallBlocks
+=
1
;
}
// int32_t stepIndex = (numOfRows-1)/TSDB_BLOCK_DIST_STEP_ROWS;
// SFileBlockInfo *blockInfo = (SFileBlockInfo*)taosArrayGet(pTableBlockInfo->dataBlockInfos, stepIndex
);
// blockInfo->numBlocksOfStep
++;
int32_t
bucketIndex
=
getBucketIndex
(
pTableBlockInfo
->
defMinRows
,
bucketRange
,
numOfRows
);
pTableBlockInfo
->
blockRowsHisto
[
bucketIndex
]
++
;
}
}
}
pTableBlockInfo
->
numOfTables
=
numOfTables
;
return
code
;
}
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
9c1b1b45
...
...
@@ -421,19 +421,23 @@ typedef struct SSysTableScanInfo {
SRetrieveTableReq
req
;
SEpSet
epSet
;
tsem_t
ready
;
SReadHandle
readHandle
;
int32_t
accountId
;
bool
showRewrite
;
SNode
*
pCondition
;
// db_name filter condition, to discard data that are not in current database
SMTbCursor
*
pCur
;
// cursor for iterate the local table meta store.
SArray
*
scanCols
;
// SArray<int16_t> scan column id list
SName
name
;
SSDataBlock
*
pRes
;
int64_t
numOfBlocks
;
// extract basic running information.
SLoadRemoteDataInfo
loadInfo
;
SReadHandle
readHandle
;
int32_t
accountId
;
bool
showRewrite
;
SNode
*
pCondition
;
// db_name filter condition, to discard data that are not in current database
SMTbCursor
*
pCur
;
// cursor for iterate the local table meta store.
SArray
*
scanCols
;
// SArray<int16_t> scan column id list
SName
name
;
SSDataBlock
*
pRes
;
int64_t
numOfBlocks
;
// extract basic running information.
SLoadRemoteDataInfo
loadInfo
;
}
SSysTableScanInfo
;
typedef
struct
SBlockDistInfo
{
SSDataBlock
*
pResBlock
;
void
*
pHandle
;
}
SBlockDistInfo
;
typedef
struct
SOptrBasicInfo
{
SResultRowInfo
resultRowInfo
;
int32_t
*
rowCellInfoOffset
;
// offset value for each row result cell info
...
...
source/libs/executor/src/executil.c
浏览文件 @
9c1b1b45
...
...
@@ -449,7 +449,7 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, STaskRuntimeEnv* pRun
// tbufWriteUint64(bw, pDist->totalRows);
// tbufWriteInt32(bw, pDist->maxRows);
// tbufWriteInt32(bw, pDist->minRows);
// tbufWriteUint32(bw, pDist->numOf
RowsInMemTable
);
// tbufWriteUint32(bw, pDist->numOf
InmemRows
);
// tbufWriteUint32(bw, pDist->numOfSmallBlocks);
// tbufWriteUint64(bw, taosArrayGetSize(pDist->dataBlockInfos));
//
...
...
@@ -488,7 +488,7 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, STaskRuntimeEnv* pRun
// pDist->totalRows = tbufReadUint64(&br);
// pDist->maxRows = tbufReadInt32(&br);
// pDist->minRows = tbufReadInt32(&br);
// pDist->numOf
RowsInMemTable
= tbufReadUint32(&br);
// pDist->numOf
InmemRows
= tbufReadUint32(&br);
// pDist->numOfSmallBlocks = tbufReadUint32(&br);
// int64_t numSteps = tbufReadUint64(&br);
//
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
9c1b1b45
...
...
@@ -4602,7 +4602,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN
==
type
)
{
STagScanPhysiNode
*
pScanPhyNode
=
(
STagScanPhysiNode
*
)
pPhyNode
;
int32_t
code
=
getTableList
(
pHandle
->
meta
,
pScanPhyNode
->
tableType
,
pScanPhyNode
->
uid
,
pTableListInfo
,
p
TagCond
);
int32_t
code
=
getTableList
(
pHandle
->
meta
,
pScanPhyNode
->
tableType
,
pScanPhyNode
->
uid
,
pTableListInfo
,
p
ScanPhyNode
->
node
.
pConditions
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
NULL
;
}
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
9c1b1b45
...
...
@@ -641,72 +641,61 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
STableScanInfo
*
pTableScanInfo
=
pOperator
->
info
;
STableBlockDistInfo
tableBlockDist
=
{
0
};
tableBlockDist
.
numOfTables
=
1
;
// TODO set the correct number of tables
STableBlockDistInfo
blockDistInfo
=
{
0
};
blockDistInfo
.
maxRows
=
INT_MIN
;
blockDistInfo
.
minRows
=
INT_MAX
;
int32_t
numRowSteps
=
TSDB_DEFAULT_MAXROWS_FBLOCK
/
TSDB_BLOCK_DIST_STEP_ROWS
;
if
(
TSDB_DEFAULT_MAXROWS_FBLOCK
%
TSDB_BLOCK_DIST_STEP_ROWS
!=
0
)
{
++
numRowSteps
;
}
tableBlockDist
.
dataBlockInfos
=
taosArrayInit
(
numRowSteps
,
sizeof
(
SFileBlockInfo
));
taosArraySetSize
(
tableBlockDist
.
dataBlockInfos
,
numRowSteps
);
tableBlockDist
.
maxRows
=
INT_MIN
;
tableBlockDist
.
minRows
=
INT_MAX
;
tsdbGetFileBlocksDistInfo
(
pTableScanInfo
->
dataReader
,
&
tableBlockDist
);
tableBlockDist
.
numOfRowsInMemTable
=
(
int32_t
)
tsdbGetNumOfRowsInMemTable
(
pTableScanInfo
->
dataReader
);
tsdbGetFileBlocksDistInfo
(
pTableScanInfo
->
dataReader
,
&
blockDistInfo
);
blockDistInfo
.
numOfInmemRows
=
(
int32_t
)
tsdbGetNumOfRowsInMemTable
(
pTableScanInfo
->
dataReader
);
SSDataBlock
*
pBlock
=
pTableScanInfo
->
pResBlock
;
pBlock
->
info
.
rows
=
1
;
pBlock
->
info
.
numOfCols
=
1
;
// SBufferWriter bw = tbufInitWriter(NULL, false);
// blockDistInfoToBinary(&tableBlockDist, &bw);
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
0
);
// int32_t len = (int32_t) tbufTell(&bw);
// pColInfo->pData = taosMemoryMalloc(len + sizeof(int32_t));
// *(int32_t*) pColInfo->pData = len;
// memcpy(pColInfo->pData + sizeof(int32_t), tbufGetData(&bw, false), len);
//
// tbufCloseWriter(&bw);
int32_t
len
=
tSerializeBlockDistInfo
(
NULL
,
0
,
&
blockDistInfo
);
char
*
p
=
taosMemoryCalloc
(
1
,
len
+
VARSTR_HEADER_SIZE
);
tSerializeBlockDistInfo
(
varDataVal
(
p
),
len
,
&
blockDistInfo
);
varDataSetLen
(
p
,
len
);
// SArray* g = GET_TABLEGROUP(pOperator->, 0
);
// pOperator->pRuntimeEnv->current = taosArrayGetP(g, 0
);
colDataAppend
(
pColInfo
,
0
,
p
,
false
);
taosMemoryFree
(
p
);
pOperator
->
status
=
OP_EXEC_DONE
;
return
pBlock
;
}
static
void
destroyBlockDistScanOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
SBlockDistInfo
*
pDistInfo
=
(
SBlockDistInfo
*
)
param
;
blockDataDestroy
(
pDistInfo
->
pResBlock
);
}
SOperatorInfo
*
createDataBlockInfoScanOperator
(
void
*
dataReader
,
SExecTaskInfo
*
pTaskInfo
)
{
S
TableScanInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
STableScan
Info
));
S
BlockDistInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SBlockDist
Info
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
pTaskInfo
->
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_error
;
}
pInfo
->
dataReader
=
dataReader
;
// pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
pInfo
->
pHandle
=
dataReader
;
SColumnInfoData
infoData
=
{
0
};
infoData
.
info
.
type
=
TSDB_DATA_TYPE_BINARY
;
infoData
.
info
.
bytes
=
1024
;
infoData
.
info
.
colId
=
0
;
// taosArrayPush(pInfo->block.pDataBlock, &infoData);
pInfo
->
pResBlock
=
taosMemoryCalloc
(
1
,
sizeof
(
SSDataBlock
));
pOperator
->
name
=
"DataBlockInfoScanOperator"
;
// pOperator->operatorType = OP_TableBlockInfoScan;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
fpSet
.
_openFn
=
operatorDummyOpenFn
;
pOperator
->
fpSet
.
getNextFn
=
doBlockInfoScan
;
SColumnInfoData
infoData
=
{
0
};
infoData
.
info
.
type
=
TSDB_DATA_TYPE_VARCHAR
;
infoData
.
info
.
bytes
=
1024
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
taosArrayPush
(
pInfo
->
pResBlock
->
pDataBlock
,
&
infoData
);
pOperator
->
name
=
"DataBlockInfoScanOperator"
;
// pOperator->operatorType = OP_TableBlockInfoScan;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doBlockInfoScan
,
NULL
,
NULL
,
destroyBlockDistScanOperatorInfo
,
NULL
,
NULL
,
NULL
);
return
pOperator
;
_error:
...
...
source/libs/function/inc/builtinsimpl.h
浏览文件 @
9c1b1b45
...
...
@@ -151,17 +151,14 @@ int32_t mavgFunction(SqlFunctionCtx* pCtx);
bool
getSampleFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
sampleFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
sampleFunction
(
SqlFunctionCtx
*
pCtx
);
//int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool
getTailFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
tailFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
tailFunction
(
SqlFunctionCtx
*
pCtx
);
//int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool
getUniqueFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
uniqueFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
uniqueFunction
(
SqlFunctionCtx
*
pCtx
);
//int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool
getTwaFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
twaFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
...
...
@@ -169,6 +166,8 @@ int32_t twaFunction(SqlFunctionCtx *pCtx);
int32_t
twaFinalize
(
struct
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
bool
getSelectivityFuncEnv
(
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
int32_t
blockDistFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
blockDistFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
#ifdef __cplusplus
}
...
...
source/libs/function/src/builtins.c
浏览文件 @
9c1b1b45
...
...
@@ -1319,6 +1319,17 @@ static int32_t translateSelectValue(SFunctionNode* pFunc, char* pErrBuf, int32_t
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
translateBlockDistFunc
(
SFunctionNode
*
pFunc
,
char
*
pErrBuf
,
int32_t
len
)
{
pFunc
->
node
.
resType
=
(
SDataType
)
{.
bytes
=
128
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
};
return
TSDB_CODE_SUCCESS
;
}
static
bool
getBlockDistFuncEnv
(
SFunctionNode
*
UNUSED_PARAM
(
pFunc
),
SFuncExecEnv
*
pEnv
)
{
pEnv
->
calcMemSize
=
sizeof
(
STableBlockDistInfo
);
return
true
;
}
// clang-format off
const
SBuiltinFuncDefinition
funcMgtBuiltins
[]
=
{
{
...
...
@@ -2117,6 +2128,15 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.
initFunc
=
functionSetup
,
.
processFunc
=
NULL
,
.
finalizeFunc
=
NULL
},
{
.
name
=
"_block_dist"
,
.
type
=
FUNCTION_TYPE_BLOCK_DIST
,
.
classification
=
FUNC_MGT_AGG_FUNC
,
.
translateFunc
=
translateBlockDistFunc
,
.
getEnvFunc
=
getBlockDistFuncEnv
,
.
processFunc
=
blockDistFunction
,
.
finalizeFunc
=
blockDistFinalize
}
};
// clang-format on
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
9c1b1b45
...
...
@@ -4627,7 +4627,6 @@ int32_t twaFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
if
(
pResInfo
->
numOfRes
==
0
)
{
pResInfo
->
isNullRes
=
1
;
}
else
{
// assert(pInfo->win.ekey == pInfo->p.key && pInfo->hasResult == pResInfo->hasResult);
if
(
pInfo
->
win
.
ekey
==
pInfo
->
win
.
skey
)
{
pInfo
->
dOutput
=
pInfo
->
p
.
val
;
}
else
{
...
...
@@ -4640,3 +4639,175 @@ int32_t twaFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
return
functionFinalize
(
pCtx
,
pBlock
);
}
int32_t
blockDistFunction
(
SqlFunctionCtx
*
pCtx
)
{
SInputColumnInfoData
*
pInput
=
&
pCtx
->
input
;
SColumnInfoData
*
pInputCol
=
pInput
->
pData
[
0
];
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
STableBlockDistInfo
*
pDistInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
STableBlockDistInfo
p1
=
{
0
};
tDeserializeBlockDistInfo
(
varDataVal
(
pInputCol
->
pData
),
varDataLen
(
pInputCol
->
pData
),
&
p1
);
pDistInfo
->
numOfBlocks
+=
p1
.
numOfBlocks
;
pDistInfo
->
numOfTables
+=
p1
.
numOfTables
;
pDistInfo
->
numOfInmemRows
+=
p1
.
numOfInmemRows
;
pDistInfo
->
totalSize
+=
p1
.
totalSize
;
pDistInfo
->
totalRows
+=
p1
.
totalRows
;
pDistInfo
->
numOfFiles
+=
p1
.
numOfFiles
;
if
(
pDistInfo
->
minRows
>
p1
.
minRows
)
{
pDistInfo
->
minRows
=
p1
.
minRows
;
}
if
(
pDistInfo
->
maxRows
<
p1
.
maxRows
)
{
pDistInfo
->
maxRows
=
p1
.
maxRows
;
}
for
(
int32_t
i
=
0
;
i
<
tListLen
(
pDistInfo
->
blockRowsHisto
);
++
i
)
{
pDistInfo
->
blockRowsHisto
[
i
]
+=
p1
.
blockRowsHisto
[
i
];
}
pResInfo
->
numOfRes
=
1
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
tSerializeBlockDistInfo
(
void
*
buf
,
int32_t
bufLen
,
const
STableBlockDistInfo
*
pInfo
)
{
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
buf
,
bufLen
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeU32
(
&
encoder
,
pInfo
->
rowSize
)
<
0
)
return
-
1
;
if
(
tEncodeU16
(
&
encoder
,
pInfo
->
numOfFiles
)
<
0
)
return
-
1
;
if
(
tEncodeU32
(
&
encoder
,
pInfo
->
rowSize
)
<
0
)
return
-
1
;
if
(
tEncodeU32
(
&
encoder
,
pInfo
->
numOfTables
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
&
encoder
,
pInfo
->
totalSize
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
&
encoder
,
pInfo
->
totalRows
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pInfo
->
maxRows
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pInfo
->
minRows
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pInfo
->
defMaxRows
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pInfo
->
defMinRows
)
<
0
)
return
-
1
;
if
(
tEncodeU32
(
&
encoder
,
pInfo
->
numOfInmemRows
)
<
0
)
return
-
1
;
if
(
tEncodeU32
(
&
encoder
,
pInfo
->
numOfSmallBlocks
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
tListLen
(
pInfo
->
blockRowsHisto
);
++
i
)
{
if
(
tEncodeI32
(
&
encoder
,
pInfo
->
blockRowsHisto
[
i
])
<
0
)
return
-
1
;
}
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tEncoderClear
(
&
encoder
);
return
tlen
;
}
int32_t
tDeserializeBlockDistInfo
(
void
*
buf
,
int32_t
bufLen
,
STableBlockDistInfo
*
pInfo
)
{
SDecoder
decoder
=
{
0
};
tDecoderInit
(
&
decoder
,
buf
,
bufLen
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeU32
(
&
decoder
,
&
pInfo
->
rowSize
)
<
0
)
return
-
1
;
if
(
tDecodeU16
(
&
decoder
,
&
pInfo
->
numOfFiles
)
<
0
)
return
-
1
;
if
(
tDecodeU32
(
&
decoder
,
&
pInfo
->
rowSize
)
<
0
)
return
-
1
;
if
(
tDecodeU32
(
&
decoder
,
&
pInfo
->
numOfTables
)
<
0
)
return
-
1
;
if
(
tDecodeU64
(
&
decoder
,
&
pInfo
->
totalSize
)
<
0
)
return
-
1
;
if
(
tDecodeU64
(
&
decoder
,
&
pInfo
->
totalRows
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pInfo
->
maxRows
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pInfo
->
minRows
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pInfo
->
defMaxRows
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pInfo
->
defMinRows
)
<
0
)
return
-
1
;
if
(
tDecodeU32
(
&
decoder
,
&
pInfo
->
numOfInmemRows
)
<
0
)
return
-
1
;
if
(
tDecodeU32
(
&
decoder
,
&
pInfo
->
numOfSmallBlocks
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
tListLen
(
pInfo
->
blockRowsHisto
);
++
i
)
{
if
(
tDecodeI32
(
&
decoder
,
&
pInfo
->
blockRowsHisto
[
i
])
<
0
)
return
-
1
;
}
tDecoderClear
(
&
decoder
);
return
0
;
}
int32_t
blockDistFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
)
{
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
char
*
pData
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
0
);
int32_t
row
=
0
;
STableBlockDistInfo
info
=
{
0
};
tDeserializeBlockDistInfo
(
varDataVal
(
pData
),
varDataLen
(
pData
),
&
info
);
char
st
[
256
]
=
{
0
};
int32_t
len
=
sprintf
(
st
+
VARSTR_HEADER_SIZE
,
"Blocks=[%d] Size=[%.3fKb] Average_Block_size=[%.3fKb] Compression_Ratio=[%.3f]"
,
info
.
numOfBlocks
,
info
.
totalSize
/
1024
.
0
,
info
.
totalSize
/
(
info
.
numOfBlocks
*
1024
.
0
),
info
.
totalSize
/
(
info
.
totalRows
*
info
.
rowSize
*
1
.
0
)
);
varDataSetLen
(
st
,
len
);
colDataAppend
(
pColInfo
,
row
++
,
st
,
false
);
len
=
sprintf
(
st
+
VARSTR_HEADER_SIZE
,
"Total_Rows=[%ld] MinRows=[%d] MaxRows=[%d] Averge_Rows=[%ld] Inmem_Rows=[%d]"
,
info
.
totalRows
,
info
.
minRows
,
info
.
maxRows
,
info
.
totalRows
/
info
.
numOfBlocks
,
info
.
numOfInmemRows
);
varDataSetLen
(
st
,
len
);
colDataAppend
(
pColInfo
,
row
++
,
st
,
false
);
len
=
sprintf
(
st
+
VARSTR_HEADER_SIZE
,
"Total_Tables=[%d] Total_Files=[%d] Total_Vgroups=[%d]"
,
info
.
numOfTables
,
info
.
numOfFiles
,
0
);
varDataSetLen
(
st
,
len
);
colDataAppend
(
pColInfo
,
row
++
,
st
,
false
);
len
=
sprintf
(
st
+
VARSTR_HEADER_SIZE
,
"--------------------------------------------------------------------------------"
);
varDataSetLen
(
st
,
len
);
colDataAppend
(
pColInfo
,
row
++
,
st
,
false
);
int32_t
maxVal
=
0
;
int32_t
minVal
=
INT32_MAX
;
for
(
int32_t
i
=
0
;
i
<
sizeof
(
info
.
blockRowsHisto
)
/
sizeof
(
info
.
blockRowsHisto
[
0
]);
++
i
)
{
if
(
maxVal
<
info
.
blockRowsHisto
[
i
])
{
maxVal
=
info
.
blockRowsHisto
[
i
];
}
if
(
minVal
>
info
.
blockRowsHisto
[
i
])
{
minVal
=
info
.
blockRowsHisto
[
i
];
}
}
int32_t
delta
=
maxVal
-
minVal
;
int32_t
step
=
delta
/
50
;
int32_t
numOfBuckets
=
sizeof
(
info
.
blockRowsHisto
)
/
sizeof
(
info
.
blockRowsHisto
[
0
]);
int32_t
bucketRange
=
(
info
.
maxRows
-
info
.
minRows
)
/
numOfBuckets
;
for
(
int32_t
i
=
0
;
i
<
20
;
++
i
)
{
len
+=
sprintf
(
st
+
VARSTR_HEADER_SIZE
,
"%04d |"
,
info
.
defMinRows
+
bucketRange
*
(
i
+
1
));
int32_t
num
=
(
info
.
blockRowsHisto
[
i
]
+
step
-
1
)
/
step
;
for
(
int32_t
j
=
0
;
j
<
num
;
++
j
)
{
int32_t
x
=
sprintf
(
st
+
VARSTR_HEADER_SIZE
+
len
,
"%c"
,
'|'
);
len
+=
x
;
}
double
v
=
info
.
blockRowsHisto
[
i
]
*
100
.
0
/
info
.
numOfBlocks
;
len
+=
sprintf
(
st
+
VARSTR_HEADER_SIZE
+
len
,
" %d (%.3f%c)"
,
info
.
blockRowsHisto
[
i
],
v
,
'%'
);
printf
(
"%s
\n
"
,
st
);
varDataSetLen
(
st
,
len
);
colDataAppend
(
pColInfo
,
row
++
,
st
,
false
);
}
return
row
;
}
source/libs/function/src/taggfunction.c
浏览文件 @
9c1b1b45
...
...
@@ -3684,7 +3684,7 @@ static void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDi
pDist
->
totalRows
=
tbufReadUint64
(
&
br
);
pDist
->
maxRows
=
tbufReadInt32
(
&
br
);
pDist
->
minRows
=
tbufReadInt32
(
&
br
);
pDist
->
numOf
RowsInMemTable
=
tbufReadUint32
(
&
br
);
pDist
->
numOf
InmemRows
=
tbufReadUint32
(
&
br
);
pDist
->
numOfSmallBlocks
=
tbufReadUint32
(
&
br
);
int64_t
numSteps
=
tbufReadUint64
(
&
br
);
...
...
@@ -3732,7 +3732,7 @@ static void mergeTableBlockDist(SResultRowEntryInfo* pResInfo, const STableBlock
assert
(
pDist
!=
NULL
&&
pSrc
!=
NULL
);
pDist
->
numOfTables
+=
pSrc
->
numOfTables
;
pDist
->
numOf
RowsInMemTable
+=
pSrc
->
numOfRowsInMemTable
;
pDist
->
numOf
InmemRows
+=
pSrc
->
numOfInmemRows
;
pDist
->
numOfSmallBlocks
+=
pSrc
->
numOfSmallBlocks
;
pDist
->
numOfFiles
+=
pSrc
->
numOfFiles
;
pDist
->
totalSize
+=
pSrc
->
totalSize
;
...
...
@@ -3862,7 +3862,7 @@ void generateBlockDistResult(STableBlockDistInfo *pTableBlockDist, char* result)
percentiles
[
6
],
percentiles
[
7
],
percentiles
[
8
],
percentiles
[
9
],
percentiles
[
10
],
percentiles
[
11
],
min
,
max
,
avg
,
stdDev
,
totalRows
,
totalBlocks
,
smallBlocks
,
totalLen
/
1024
.
0
,
compRatio
,
pTableBlockDist
->
numOf
RowsInMemTable
);
pTableBlockDist
->
numOf
InmemRows
);
varDataSetLen
(
result
,
sz
);
UNUSED
(
sz
);
}
...
...
source/libs/stream/src/streamDispatch.c
浏览文件 @
9c1b1b45
...
...
@@ -100,6 +100,7 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM
.
upstreamNodeId
=
pTask
->
nodeId
,
.
blockNum
=
blockNum
,
};
qInfo
(
"dispatch from task %d (child id %d)"
,
pTask
->
taskId
,
pTask
->
childId
);
req
.
data
=
taosArrayInit
(
blockNum
,
sizeof
(
void
*
));
req
.
dataLen
=
taosArrayInit
(
blockNum
,
sizeof
(
int32_t
));
...
...
source/libs/stream/src/streamTask.c
浏览文件 @
9c1b1b45
...
...
@@ -16,14 +16,13 @@
#include "executor.h"
#include "tstream.h"
SStreamTask
*
tNewSStreamTask
(
int64_t
streamId
,
int32_t
childId
)
{
SStreamTask
*
tNewSStreamTask
(
int64_t
streamId
)
{
SStreamTask
*
pTask
=
(
SStreamTask
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SStreamTask
));
if
(
pTask
==
NULL
)
{
return
NULL
;
}
pTask
->
taskId
=
tGenIdPI32
();
pTask
->
streamId
=
streamId
;
pTask
->
childId
=
childId
;
pTask
->
status
=
TASK_STATUS__IDLE
;
pTask
->
inputStatus
=
TASK_INPUT_STATUS__NORMAL
;
pTask
->
outputStatus
=
TASK_OUTPUT_STATUS__NORMAL
;
...
...
@@ -48,7 +47,6 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if
(
tEncodeSEpSet
(
pEncoder
,
&
pTask
->
epSet
)
<
0
)
return
-
1
;
if
(
pTask
->
execType
!=
TASK_EXEC__NONE
)
{
if
(
tEncodeI8
(
pEncoder
,
pTask
->
exec
.
parallelizable
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pTask
->
exec
.
qmsg
)
<
0
)
return
-
1
;
}
...
...
@@ -96,7 +94,6 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if
(
tDecodeSEpSet
(
pDecoder
,
&
pTask
->
epSet
)
<
0
)
return
-
1
;
if
(
pTask
->
execType
!=
TASK_EXEC__NONE
)
{
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
exec
.
parallelizable
)
<
0
)
return
-
1
;
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pTask
->
exec
.
qmsg
)
<
0
)
return
-
1
;
}
...
...
tests/script/tsim/valgrind/checkError.sim
浏览文件 @
9c1b1b45
...
...
@@ -71,7 +71,8 @@ print ====> start to check if there are ERRORS in vagrind log file for each dnod
# -n : dnode[x] be check
system_content sh/checkValgrind.sh -n dnode1
print cmd return result----> [ $system_content ]
if $system_content <= 3 then
# temporarily expand the threshold, since no time to fix the memory leaks.
if $system_content <= 5 then
return 0
endi
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录