Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c4f813af
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
c4f813af
编写于
3月 30, 2022
作者:
H
Haojun Liao
提交者:
cpwu
3月 30, 2022
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[td-13039] support stream execution interval query.
上级
b63e26ff
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
33 addition
and
24 deletion
+33
-24
include/libs/executor/executor.h
include/libs/executor/executor.h
+6
-1
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+7
-11
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+1
-1
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+3
-2
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+15
-8
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+1
-1
未找到文件。
include/libs/executor/executor.h
浏览文件 @
c4f813af
...
@@ -37,6 +37,11 @@ typedef struct SReadHandle {
...
@@ -37,6 +37,11 @@ typedef struct SReadHandle {
#define STREAM_DATA_TYPE_SUBMIT_BLOCK 0x1
#define STREAM_DATA_TYPE_SUBMIT_BLOCK 0x1
#define STREAM_DATA_TYPE_SSDATA_BLOCK 0x2
#define STREAM_DATA_TYPE_SSDATA_BLOCK 0x2
typedef
enum
{
OPTR_EXEC_MODEL_BATCH
=
0x1
,
OPTR_EXEC_MODEL_STREAM
=
0x2
,
}
EOPTR_EXEC_MODEL
;
/**
/**
* Create the exec task for streaming mode
* Create the exec task for streaming mode
* @param pMsg
* @param pMsg
...
@@ -84,7 +89,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA
...
@@ -84,7 +89,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA
* @return
* @return
*/
*/
int32_t
qCreateExecTask
(
SReadHandle
*
readHandle
,
int32_t
vgId
,
uint64_t
taskId
,
struct
SSubplan
*
pPlan
,
int32_t
qCreateExecTask
(
SReadHandle
*
readHandle
,
int32_t
vgId
,
uint64_t
taskId
,
struct
SSubplan
*
pPlan
,
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
);
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
,
EOPTR_EXEC_MODEL
model
);
/**
/**
* The main task execution function, including query on both table and multiple tables,
* The main task execution function, including query on both table and multiple tables,
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
c4f813af
...
@@ -233,8 +233,8 @@ struct SOperatorInfo;
...
@@ -233,8 +233,8 @@ struct SOperatorInfo;
typedef
void
(
*
__optr_encode_fn_t
)(
struct
SOperatorInfo
*
pOperator
,
char
**
result
,
int32_t
*
length
);
typedef
void
(
*
__optr_encode_fn_t
)(
struct
SOperatorInfo
*
pOperator
,
char
**
result
,
int32_t
*
length
);
typedef
bool
(
*
__optr_decode_fn_t
)(
struct
SOperatorInfo
*
pOperator
,
char
*
result
,
int32_t
length
);
typedef
bool
(
*
__optr_decode_fn_t
)(
struct
SOperatorInfo
*
pOperator
,
char
*
result
,
int32_t
length
);
typedef
int32_t
(
*
__optr_open_fn_t
)(
struct
SOperatorInfo
*
p
aram
);
typedef
int32_t
(
*
__optr_open_fn_t
)(
struct
SOperatorInfo
*
p
Optr
);
typedef
SSDataBlock
*
(
*
__optr_fn_t
)(
struct
SOperatorInfo
*
p
aram
,
bool
*
newgroup
);
typedef
SSDataBlock
*
(
*
__optr_fn_t
)(
struct
SOperatorInfo
*
p
Optr
,
bool
*
newgroup
);
typedef
void
(
*
__optr_close_fn_t
)(
void
*
param
,
int32_t
num
);
typedef
void
(
*
__optr_close_fn_t
)(
void
*
param
,
int32_t
num
);
typedef
struct
STaskIdInfo
{
typedef
struct
STaskIdInfo
{
...
@@ -255,7 +255,8 @@ typedef struct SExecTaskInfo {
...
@@ -255,7 +255,8 @@ typedef struct SExecTaskInfo {
uint64_t
totalRows
;
// total number of rows
uint64_t
totalRows
;
// total number of rows
STableGroupInfo
tableqinfoGroupInfo
;
// this is a group array list, including SArray<STableQueryInfo*> structure
STableGroupInfo
tableqinfoGroupInfo
;
// this is a group array list, including SArray<STableQueryInfo*> structure
char
*
sql
;
// query sql string
char
*
sql
;
// query sql string
jmp_buf
env
;
//
jmp_buf
env
;
// jump to this position when error happens.
EOPTR_EXEC_MODEL
execModel
;
// operator execution model [batch model|stream model]
struct
SOperatorInfo
*
pRoot
;
struct
SOperatorInfo
*
pRoot
;
}
SExecTaskInfo
;
}
SExecTaskInfo
;
...
@@ -466,11 +467,6 @@ typedef struct SAggSupporter {
...
@@ -466,11 +467,6 @@ typedef struct SAggSupporter {
int32_t
resultRowSize
;
// the result buffer size for each result row, with the meta data size for each row
int32_t
resultRowSize
;
// the result buffer size for each result row, with the meta data size for each row
}
SAggSupporter
;
}
SAggSupporter
;
typedef
enum
{
OPTR_EXEC_MODEL_BATCH
=
0x1
,
OPTR_EXEC_MODEL_STREAM
=
0x2
,
}
OPTR_EXEC_MODEL
;
typedef
struct
STableIntervalOperatorInfo
{
typedef
struct
STableIntervalOperatorInfo
{
SOptrBasicInfo
binfo
;
// basic info
SOptrBasicInfo
binfo
;
// basic info
SGroupResInfo
groupResInfo
;
// multiple results build supporter
SGroupResInfo
groupResInfo
;
// multiple results build supporter
...
@@ -481,9 +477,9 @@ typedef struct STableIntervalOperatorInfo {
...
@@ -481,9 +477,9 @@ typedef struct STableIntervalOperatorInfo {
SAggSupporter
aggSup
;
// aggregate supporter
SAggSupporter
aggSup
;
// aggregate supporter
STableQueryInfo
*
pCurrent
;
// current tableQueryInfo struct
STableQueryInfo
*
pCurrent
;
// current tableQueryInfo struct
int32_t
order
;
// current SSDataBlock scan order
int32_t
order
;
// current SSDataBlock scan order
OPTR_EXEC_MODEL
execModel
;
// operator execution model [batch model|stream model]
E
OPTR_EXEC_MODEL
execModel
;
// operator execution model [batch model|stream model]
SArray
*
pUpdatedWindow
;
// updated time window due to the input data block from the downstream operator.
SArray
*
pUpdatedWindow
;
// updated time window due to the input data block from the downstream operator.
SColumnInfoData
timeWindowData
;
// query time window info for scalar function execution.
SColumnInfoData
timeWindowData
;
// query time window info for scalar function execution.
}
STableIntervalOperatorInfo
;
}
STableIntervalOperatorInfo
;
typedef
struct
SAggOperatorInfo
{
typedef
struct
SAggOperatorInfo
{
...
@@ -718,7 +714,7 @@ int32_t getMaximumIdleDurationSec();
...
@@ -718,7 +714,7 @@ int32_t getMaximumIdleDurationSec();
void
doInvokeUdf
(
struct
SUdfInfo
*
pUdfInfo
,
SqlFunctionCtx
*
pCtx
,
int32_t
idx
,
int32_t
type
);
void
doInvokeUdf
(
struct
SUdfInfo
*
pUdfInfo
,
SqlFunctionCtx
*
pCtx
,
int32_t
idx
,
int32_t
type
);
void
setTaskStatus
(
SExecTaskInfo
*
pTaskInfo
,
int8_t
status
);
void
setTaskStatus
(
SExecTaskInfo
*
pTaskInfo
,
int8_t
status
);
int32_t
createExecTaskInfoImpl
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
SReadHandle
*
pHandle
,
uint64_t
taskId
);
int32_t
createExecTaskInfoImpl
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
SReadHandle
*
pHandle
,
uint64_t
taskId
,
EOPTR_EXEC_MODEL
model
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
source/libs/executor/src/executor.c
浏览文件 @
c4f813af
...
@@ -113,7 +113,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
...
@@ -113,7 +113,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
}
}
qTaskInfo_t
pTaskInfo
=
NULL
;
qTaskInfo_t
pTaskInfo
=
NULL
;
code
=
qCreateExecTask
(
streamReadHandle
,
0
,
0
,
plan
,
&
pTaskInfo
,
NULL
);
code
=
qCreateExecTask
(
streamReadHandle
,
0
,
0
,
plan
,
&
pTaskInfo
,
NULL
,
OPTR_EXEC_MODEL_STREAM
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
// TODO: destroy SSubplan & pTaskInfo
// TODO: destroy SSubplan & pTaskInfo
terrno
=
code
;
terrno
=
code
;
...
...
source/libs/executor/src/executorMain.c
浏览文件 @
c4f813af
...
@@ -51,11 +51,12 @@ static void freeqinfoFn(void *qhandle) {
...
@@ -51,11 +51,12 @@ static void freeqinfoFn(void *qhandle) {
qDestroyTask
(
*
handle
);
qDestroyTask
(
*
handle
);
}
}
int32_t
qCreateExecTask
(
SReadHandle
*
readHandle
,
int32_t
vgId
,
uint64_t
taskId
,
SSubplan
*
pSubplan
,
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
)
{
int32_t
qCreateExecTask
(
SReadHandle
*
readHandle
,
int32_t
vgId
,
uint64_t
taskId
,
SSubplan
*
pSubplan
,
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
,
EOPTR_EXEC_MODEL
model
)
{
assert
(
readHandle
!=
NULL
&&
pSubplan
!=
NULL
);
assert
(
readHandle
!=
NULL
&&
pSubplan
!=
NULL
);
SExecTaskInfo
**
pTask
=
(
SExecTaskInfo
**
)
pTaskInfo
;
SExecTaskInfo
**
pTask
=
(
SExecTaskInfo
**
)
pTaskInfo
;
int32_t
code
=
createExecTaskInfoImpl
(
pSubplan
,
pTask
,
readHandle
,
taskId
);
int32_t
code
=
createExecTaskInfoImpl
(
pSubplan
,
pTask
,
readHandle
,
taskId
,
model
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
goto
_error
;
}
}
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
c4f813af
...
@@ -6930,6 +6930,10 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro
...
@@ -6930,6 +6930,10 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro
return NULL;
return NULL;
}
}
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
return pOperator->getStreamResFn(pOperator, newgroup);
}
pTaskInfo->code = pOperator->_openFn(pOperator);
pTaskInfo->code = pOperator->_openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
return NULL;
return NULL;
...
@@ -6946,7 +6950,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro
...
@@ -6946,7 +6950,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
}
}
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator) {
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator
, bool* newgroup
) {
STableIntervalOperatorInfo* pInfo = pOperator->info;
STableIntervalOperatorInfo* pInfo = pOperator->info;
int32_t order = TSDB_ORDER_ASC;
int32_t order = TSDB_ORDER_ASC;
...
@@ -6964,14 +6968,14 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator) {
...
@@ -6964,14 +6968,14 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator) {
}
}
// STimeWindow win = {0};
// STimeWindow win = {0};
bool
newgroup = false;
*
newgroup = false;
SOperatorInfo* downstream = pOperator->pDownstream[0];
SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = NULL;
SArray* pUpdated = NULL;
while (1) {
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->getNextFn(downstream,
&
newgroup);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
if (pBlock == NULL) {
...
@@ -7882,9 +7886,10 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
...
@@ -7882,9 +7886,10 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
}
}
pInfo->order = TSDB_ORDER_ASC;
pInfo->order = TSDB_ORDER_ASC;
pInfo->win = pTaskInfo->window;
pInfo->interval = *pInterval;
pInfo->interval = *pInterval;
pInfo->execModel = OPTR_EXEC_MODEL_BATCH;
pInfo->execModel = pTaskInfo->execModel;
pInfo->win = pTaskInfo->window;
pInfo->win.skey = 0;
pInfo->win.skey = 0;
pInfo->win.ekey = INT64_MAX;
pInfo->win.ekey = INT64_MAX;
...
@@ -7909,6 +7914,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
...
@@ -7909,6 +7914,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->info = pInfo;
pOperator->info = pInfo;
pOperator->_openFn = doOpenIntervalAgg;
pOperator->_openFn = doOpenIntervalAgg;
pOperator->getNextFn = doBuildIntervalResult;
pOperator->getNextFn = doBuildIntervalResult;
pOperator->getStreamResFn= doStreamIntervalAgg;
pOperator->closeFn = destroyIntervalOperatorInfo;
pOperator->closeFn = destroyIntervalOperatorInfo;
code = appendDownstream(pOperator, &downstream, 1);
code = appendDownstream(pOperator, &downstream, 1);
...
@@ -8736,12 +8742,13 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
...
@@ -8736,12 +8742,13 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
return pExprs;
return pExprs;
}
}
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId) {
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId
, EOPTR_EXEC_MODEL model
) {
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
pTaskInfo->cost.created = taosGetTimestampMs();
pTaskInfo->cost.created = taosGetTimestampMs();
pTaskInfo->id.queryId = queryId;
pTaskInfo->id.queryId = queryId;
pTaskInfo->execModel = model;
char* p = taosMemoryCalloc(1, 128);
char* p = taosMemoryCalloc(1, 128);
snprintf(p, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId);
snprintf(p, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId);
...
@@ -9102,11 +9109,11 @@ _error:
...
@@ -9102,11 +9109,11 @@ _error:
return NULL;
return NULL;
}
}
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId) {
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId
, EOPTR_EXEC_MODEL model
) {
uint64_t queryId = pPlan->id.queryId;
uint64_t queryId = pPlan->id.queryId;
int32_t code = TSDB_CODE_SUCCESS;
int32_t code = TSDB_CODE_SUCCESS;
*pTaskInfo = createExecTaskInfo(queryId, taskId);
*pTaskInfo = createExecTaskInfo(queryId, taskId
, model
);
if (*pTaskInfo == NULL) {
if (*pTaskInfo == NULL) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _complete;
goto _complete;
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
c4f813af
...
@@ -959,7 +959,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
...
@@ -959,7 +959,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
QW_ERR_JRET
(
code
);
QW_ERR_JRET
(
code
);
}
}
code
=
qCreateExecTask
(
qwMsg
->
node
,
mgmt
->
nodeId
,
tId
,
plan
,
&
pTaskInfo
,
&
sinkHandle
);
code
=
qCreateExecTask
(
qwMsg
->
node
,
mgmt
->
nodeId
,
tId
,
plan
,
&
pTaskInfo
,
&
sinkHandle
,
OPTR_EXEC_MODEL_BATCH
);
if
(
code
)
{
if
(
code
)
{
QW_TASK_ELOG
(
"qCreateExecTask failed, code:%x - %s"
,
code
,
tstrerror
(
code
));
QW_TASK_ELOG
(
"qCreateExecTask failed, code:%x - %s"
,
code
,
tstrerror
(
code
));
QW_ERR_JRET
(
code
);
QW_ERR_JRET
(
code
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录