Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c1b8c597
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
c1b8c597
编写于
1月 20, 2022
作者:
D
dapan1121
提交者:
GitHub
1月 20, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #9914 from taosdata/feature/qnode
Feature/qnode
上级
8adf6d9b
a818c130
变更
7
显示空白变更内容
内联
并排
Showing
7 changed file
with
283 addition
and
197 deletion
+283
-197
include/common/tmsgdef.h
include/common/tmsgdef.h
+0
-1
include/libs/executor/dataSinkMgt.h
include/libs/executor/dataSinkMgt.h
+0
-1
source/dnode/vnode/src/vnd/vnodeQuery.c
source/dnode/vnode/src/vnd/vnodeQuery.c
+0
-2
source/libs/executor/src/dataDispatcher.c
source/libs/executor/src/dataDispatcher.c
+0
-1
source/libs/qworker/inc/qworkerInt.h
source/libs/qworker/inc/qworkerInt.h
+11
-7
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+272
-130
source/libs/qworker/src/qworkerMsg.c
source/libs/qworker/src/qworkerMsg.c
+0
-55
未找到文件。
include/common/tmsgdef.h
浏览文件 @
c1b8c597
...
@@ -172,7 +172,6 @@ enum {
...
@@ -172,7 +172,6 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_VND_SHOW_TABLES
,
"vnode-show-tables"
,
SVShowTablesReq
,
SVShowTablesRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_SHOW_TABLES
,
"vnode-show-tables"
,
SVShowTablesReq
,
SVShowTablesRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_SHOW_TABLES_FETCH
,
"vnode-show-tables-fetch"
,
SVShowTablesFetchReq
,
SVShowTablesFetchRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_SHOW_TABLES_FETCH
,
"vnode-show-tables-fetch"
,
SVShowTablesFetchReq
,
SVShowTablesFetchRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_QUERY_CONTINUE
,
"vnode-query-continue"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_QUERY_CONTINUE
,
"vnode-query-continue"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_SCHEDULE_DATA_SINK
,
"vnode-schedule-data-sink"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_SUBSCRIBE
,
"vnode-subscribe"
,
SMVSubscribeReq
,
SMVSubscribeRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_SUBSCRIBE
,
"vnode-subscribe"
,
SMVSubscribeReq
,
SMVSubscribeRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_CONSUME
,
"vnode-consume"
,
SMqCVConsumeReq
,
SMqCVConsumeRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_CONSUME
,
"vnode-consume"
,
SMqCVConsumeReq
,
SMqCVConsumeRsp
)
...
...
include/libs/executor/dataSinkMgt.h
浏览文件 @
c1b8c597
...
@@ -48,7 +48,6 @@ typedef struct SOutputData {
...
@@ -48,7 +48,6 @@ typedef struct SOutputData {
int8_t
compressed
;
int8_t
compressed
;
char
*
pData
;
char
*
pData
;
bool
queryEnd
;
bool
queryEnd
;
int32_t
scheduleJobNo
;
int32_t
bufStatus
;
int32_t
bufStatus
;
int64_t
useconds
;
int64_t
useconds
;
int8_t
precision
;
int8_t
precision
;
...
...
source/dnode/vnode/src/vnd/vnodeQuery.c
浏览文件 @
c1b8c597
...
@@ -29,8 +29,6 @@ int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
...
@@ -29,8 +29,6 @@ int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
return
qWorkerProcessQueryMsg
(
pVnode
->
pTsdb
,
pVnode
->
pQuery
,
pMsg
);
return
qWorkerProcessQueryMsg
(
pVnode
->
pTsdb
,
pVnode
->
pQuery
,
pMsg
);
case
TDMT_VND_QUERY_CONTINUE
:
case
TDMT_VND_QUERY_CONTINUE
:
return
qWorkerProcessCQueryMsg
(
pVnode
->
pTsdb
,
pVnode
->
pQuery
,
pMsg
);
return
qWorkerProcessCQueryMsg
(
pVnode
->
pTsdb
,
pVnode
->
pQuery
,
pMsg
);
case
TDMT_VND_SCHEDULE_DATA_SINK
:
return
qWorkerProcessDataSinkMsg
(
pVnode
->
pTsdb
,
pVnode
->
pQuery
,
pMsg
);
default:
default:
vError
(
"unknown msg type:%d in query queue"
,
pMsg
->
msgType
);
vError
(
"unknown msg type:%d in query queue"
,
pMsg
->
msgType
);
return
TSDB_CODE_VND_APP_ERROR
;
return
TSDB_CODE_VND_APP_ERROR
;
...
...
source/libs/executor/src/dataDispatcher.c
浏览文件 @
c1b8c597
...
@@ -196,7 +196,6 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
...
@@ -196,7 +196,6 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
pOutput
->
bufStatus
=
updateStatus
(
pDispatcher
);
pOutput
->
bufStatus
=
updateStatus
(
pDispatcher
);
pthread_mutex_lock
(
&
pDispatcher
->
mutex
);
pthread_mutex_lock
(
&
pDispatcher
->
mutex
);
pOutput
->
queryEnd
=
pDispatcher
->
queryEnd
;
pOutput
->
queryEnd
=
pDispatcher
->
queryEnd
;
pOutput
->
scheduleJobNo
=
0
;
pOutput
->
useconds
=
pDispatcher
->
useconds
;
pOutput
->
useconds
=
pDispatcher
->
useconds
;
pOutput
->
precision
=
pDispatcher
->
schema
.
precision
;
pOutput
->
precision
=
pDispatcher
->
schema
.
precision
;
pthread_mutex_unlock
(
&
pDispatcher
->
mutex
);
pthread_mutex_unlock
(
&
pDispatcher
->
mutex
);
...
...
source/libs/qworker/inc/qworkerInt.h
浏览文件 @
c1b8c597
...
@@ -31,8 +31,6 @@ enum {
...
@@ -31,8 +31,6 @@ enum {
QW_PHASE_POST_QUERY
,
QW_PHASE_POST_QUERY
,
QW_PHASE_PRE_CQUERY
,
QW_PHASE_PRE_CQUERY
,
QW_PHASE_POST_CQUERY
,
QW_PHASE_POST_CQUERY
,
QW_PHASE_PRE_SINK
,
QW_PHASE_POST_SINK
,
QW_PHASE_PRE_FETCH
,
QW_PHASE_PRE_FETCH
,
QW_PHASE_POST_FETCH
,
QW_PHASE_POST_FETCH
,
};
};
...
@@ -105,10 +103,12 @@ typedef struct SQWTaskStatus {
...
@@ -105,10 +103,12 @@ typedef struct SQWTaskStatus {
typedef
struct
SQWTaskCtx
{
typedef
struct
SQWTaskCtx
{
SRWLatch
lock
;
SRWLatch
lock
;
int
32_t
phase
;
int
8_t
phase
;
int32_t
sinkId
;
bool
emptyRes
;
int32_t
readyCode
;
int8_t
queryContinue
;
int8_t
inQueue
;
int32_t
rspCode
;
int8_t
events
[
QW_EVENT_MAX
];
int8_t
events
[
QW_EVENT_MAX
];
...
@@ -144,7 +144,11 @@ typedef struct SQWorkerMgmt {
...
@@ -144,7 +144,11 @@ typedef struct SQWorkerMgmt {
#define QW_SET_EVENT_RECEIVED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_RECEIVED)
#define QW_SET_EVENT_RECEIVED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_RECEIVED)
#define QW_SET_EVENT_PROCESSED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_PROCESSED)
#define QW_SET_EVENT_PROCESSED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_PROCESSED)
#define QW_IN_EXECUTOR(ctx) ((ctx)->phase == QW_PHASE_PRE_QUERY || (ctx)->phase == QW_PHASE_PRE_CQUERY || (ctx)->phase == QW_PHASE_PRE_FETCH || (ctx)->phase == QW_PHASE_PRE_SINK)
#define QW_GET_PHASE(ctx) atomic_load_8(&(ctx)->phase)
#define QW_SET_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code)
#define QW_IN_EXECUTOR(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_FETCH)
#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code))
#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code))
#define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code))
#define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code))
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
c1b8c597
...
@@ -254,6 +254,19 @@ int32_t qwAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
...
@@ -254,6 +254,19 @@ int32_t qwAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
qwGetTaskCtx
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
SQWTaskCtx
**
ctx
)
{
char
id
[
sizeof
(
qId
)
+
sizeof
(
tId
)]
=
{
0
};
QW_SET_QTID
(
id
,
qId
,
tId
);
*
ctx
=
taosHashGet
(
mgmt
->
ctxHash
,
id
,
sizeof
(
id
));
if
(
NULL
==
(
*
ctx
))
{
QW_TASK_ELOG
(
"ctx not in ctxHash, id:%s"
,
id
);
QW_ERR_RET
(
TSDB_CODE_QRY_RES_CACHE_NOT_EXIST
);
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwAddTaskCtxImpl
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
int32_t
rwType
,
int32_t
status
,
SQWTaskCtx
**
ctx
)
{
int32_t
qwAddTaskCtxImpl
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
int32_t
rwType
,
int32_t
status
,
SQWTaskCtx
**
ctx
)
{
char
id
[
sizeof
(
qId
)
+
sizeof
(
tId
)]
=
{
0
};
char
id
[
sizeof
(
qId
)
+
sizeof
(
tId
)]
=
{
0
};
QW_SET_QTID
(
id
,
qId
,
tId
);
QW_SET_QTID
(
id
,
qId
,
tId
);
...
@@ -270,6 +283,8 @@ int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
...
@@ -270,6 +283,8 @@ int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
if
(
HASH_NODE_EXIST
(
code
))
{
if
(
HASH_NODE_EXIST
(
code
))
{
if
(
rwType
&&
ctx
)
{
if
(
rwType
&&
ctx
)
{
QW_RET
(
qwAcquireTaskCtx
(
QW_FPARAMS
(),
rwType
,
ctx
));
QW_RET
(
qwAcquireTaskCtx
(
QW_FPARAMS
(),
rwType
,
ctx
));
}
else
if
(
ctx
)
{
QW_RET
(
qwGetTaskCtx
(
QW_FPARAMS
(),
ctx
));
}
else
{
}
else
{
QW_TASK_ELOG
(
"task ctx already exist, id:%s"
,
id
);
QW_TASK_ELOG
(
"task ctx already exist, id:%s"
,
id
);
QW_ERR_RET
(
TSDB_CODE_QRY_TASK_ALREADY_EXIST
);
QW_ERR_RET
(
TSDB_CODE_QRY_TASK_ALREADY_EXIST
);
...
@@ -283,6 +298,8 @@ int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
...
@@ -283,6 +298,8 @@ int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
if
(
rwType
&&
ctx
)
{
if
(
rwType
&&
ctx
)
{
QW_RET
(
qwAcquireTaskCtx
(
QW_FPARAMS
(),
rwType
,
ctx
));
QW_RET
(
qwAcquireTaskCtx
(
QW_FPARAMS
(),
rwType
,
ctx
));
}
else
if
(
ctx
)
{
QW_RET
(
qwGetTaskCtx
(
QW_FPARAMS
(),
ctx
));
}
}
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
...
@@ -292,39 +309,48 @@ int32_t qwAddTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tI
...
@@ -292,39 +309,48 @@ int32_t qwAddTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tI
QW_RET
(
qwAddTaskCtxImpl
(
QW_FPARAMS
(),
0
,
0
,
NULL
));
QW_RET
(
qwAddTaskCtxImpl
(
QW_FPARAMS
(),
0
,
0
,
NULL
));
}
}
int32_t
qwGetTaskCtx
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
SQWTaskCtx
**
ctx
)
{
char
id
[
sizeof
(
qId
)
+
sizeof
(
tId
)]
=
{
0
};
QW_SET_QTID
(
id
,
qId
,
tId
);
*
ctx
=
taosHashGet
(
mgmt
->
ctxHash
,
id
,
sizeof
(
id
));
if
(
NULL
==
(
*
ctx
))
{
QW_TASK_ELOG
(
"ctx not in ctxHash, id:%s"
,
id
);
QW_ERR_RET
(
TSDB_CODE_QRY_RES_CACHE_NOT_EXIST
);
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwAddAcquireTaskCtx
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
int32_t
rwType
,
SQWTaskCtx
**
ctx
)
{
int32_t
qwAddAcquireTaskCtx
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
int32_t
rwType
,
SQWTaskCtx
**
ctx
)
{
return
qwAddTaskCtxImpl
(
QW_FPARAMS
(),
rwType
,
0
,
ctx
);
return
qwAddTaskCtxImpl
(
QW_FPARAMS
(),
rwType
,
0
,
ctx
);
}
}
int32_t
qwAddGetTaskCtx
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
SQWTaskCtx
**
ctx
)
{
return
qwAddTaskCtxImpl
(
QW_FPARAMS
(),
0
,
0
,
ctx
);
}
void
qwReleaseTaskCtx
(
int32_t
rwType
,
SQWorkerMgmt
*
mgmt
)
{
void
qwReleaseTaskCtx
(
int32_t
rwType
,
SQWorkerMgmt
*
mgmt
)
{
QW_UNLOCK
(
rwType
,
&
mgmt
->
ctxLock
);
QW_UNLOCK
(
rwType
,
&
mgmt
->
ctxLock
);
}
}
void
qwFreeTaskHandle
(
QW_FPARAMS_DEF
,
SQWTaskCtx
*
ctx
)
{
// RC WARNING
qTaskInfo_t
taskHandle
=
atomic_load_ptr
(
&
ctx
->
taskHandle
);
if
(
taskHandle
&&
atomic_val_compare_exchange_ptr
(
&
ctx
->
taskHandle
,
taskHandle
,
NULL
))
{
qDestroyTask
(
taskHandle
);
}
}
void
qwFreeTask
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
SQWTaskCtx
*
ctx
)
{
int32_t
qwKillTaskHandle
(
QW_FPARAMS_DEF
,
SQWTaskCtx
*
ctx
)
{
if
(
ctx
->
taskHandle
)
{
int32_t
code
=
0
;
qDestroyTask
(
ctx
->
taskHandle
);
// RC WARNING
ctx
->
taskHandle
=
NULL
;
qTaskInfo_t
taskHandle
=
atomic_load_ptr
(
&
ctx
->
taskHandle
);
if
(
taskHandle
&&
atomic_val_compare_exchange_ptr
(
&
ctx
->
taskHandle
,
taskHandle
,
NULL
))
{
code
=
qKillTask
(
taskHandle
);
atomic_store_ptr
(
&
ctx
->
taskHandle
,
taskHandle
);
}
}
// TODO
QW_RET
(
code
);
if
(
ctx
->
sinkHandle
)
{
}
void
qwFreeTask
(
QW_FPARAMS_DEF
,
SQWTaskCtx
*
ctx
)
{
qwFreeTaskHandle
(
QW_FPARAMS
(),
ctx
);
if
(
ctx
->
sinkHandle
)
{
dsDestroyDataSinker
(
ctx
->
sinkHandle
);
ctx
->
sinkHandle
=
NULL
;
}
}
}
}
...
@@ -418,7 +444,7 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId,
...
@@ -418,7 +444,7 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId,
SQWTaskCtx
*
ctx
=
NULL
;
SQWTaskCtx
*
ctx
=
NULL
;
bool
locked
=
false
;
bool
locked
=
false
;
QW_ERR_JRET
(
qwAdd
AcquireTaskCtx
(
QW_FPARAMS
(),
QW_READ
,
&
ctx
));
QW_ERR_JRET
(
qwAdd
GetTaskCtx
(
QW_FPARAMS
()
,
&
ctx
));
QW_LOCK
(
QW_WRITE
,
&
ctx
->
lock
);
QW_LOCK
(
QW_WRITE
,
&
ctx
->
lock
);
...
@@ -430,9 +456,7 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId,
...
@@ -430,9 +456,7 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId,
}
}
if
(
QW_IN_EXECUTOR
(
ctx
))
{
if
(
QW_IN_EXECUTOR
(
ctx
))
{
if
(
ctx
->
taskHandle
)
{
QW_ERR_JRET
(
qwKillTaskHandle
(
QW_FPARAMS
(),
ctx
));
QW_ERR_JRET
(
qKillTask
(
ctx
->
taskHandle
));
}
QW_ERR_JRET
(
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_DROPPING
));
QW_ERR_JRET
(
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_DROPPING
));
}
else
if
(
ctx
->
phase
>
0
)
{
}
else
if
(
ctx
->
phase
>
0
)
{
...
@@ -449,12 +473,12 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId,
...
@@ -449,12 +473,12 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId,
_return:
_return:
if
(
locked
)
{
if
(
code
)
{
QW_
UNLOCK
(
QW_WRITE
,
&
ctx
->
lock
);
QW_
SET_RSP_CODE
(
ctx
,
code
);
}
}
if
(
ctx
)
{
if
(
locked
)
{
qwReleaseTaskCtx
(
QW_READ
,
mgmt
);
QW_UNLOCK
(
QW_WRITE
,
&
ctx
->
lock
);
}
}
QW_RET
(
code
);
QW_RET
(
code
);
...
@@ -465,8 +489,11 @@ int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t taskHandle, DataSinkHandle sinkHa
...
@@ -465,8 +489,11 @@ int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t taskHandle, DataSinkHandle sinkHa
bool
qcontinue
=
true
;
bool
qcontinue
=
true
;
SSDataBlock
*
pRes
=
NULL
;
SSDataBlock
*
pRes
=
NULL
;
uint64_t
useconds
=
0
;
uint64_t
useconds
=
0
;
int32_t
i
=
0
;
while
(
true
)
{
QW_TASK_DLOG
(
"start to execTask in executor, loopIdx:%d"
,
i
++
);
while
(
qcontinue
)
{
code
=
qExecTask
(
taskHandle
,
&
pRes
,
&
useconds
);
code
=
qExecTask
(
taskHandle
,
&
pRes
,
&
useconds
);
if
(
code
)
{
if
(
code
)
{
QW_TASK_ELOG
(
"qExecTask failed, code:%x"
,
code
);
QW_TASK_ELOG
(
"qExecTask failed, code:%x"
,
code
);
...
@@ -474,7 +501,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t taskHandle, DataSinkHandle sinkHa
...
@@ -474,7 +501,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t taskHandle, DataSinkHandle sinkHa
}
}
if
(
NULL
==
pRes
)
{
if
(
NULL
==
pRes
)
{
QW_TASK_DLOG
(
"query done, useconds:%"
PRIu64
,
useconds
);
QW_TASK_DLOG
(
"
task
query done, useconds:%"
PRIu64
,
useconds
);
dsEndPut
(
sinkHandle
,
useconds
);
dsEndPut
(
sinkHandle
,
useconds
);
break
;
break
;
}
}
...
@@ -485,6 +512,12 @@ int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t taskHandle, DataSinkHandle sinkHa
...
@@ -485,6 +512,12 @@ int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t taskHandle, DataSinkHandle sinkHa
QW_TASK_ELOG
(
"dsPutDataBlock failed, code:%x"
,
code
);
QW_TASK_ELOG
(
"dsPutDataBlock failed, code:%x"
,
code
);
QW_ERR_JRET
(
code
);
QW_ERR_JRET
(
code
);
}
}
QW_TASK_DLOG
(
"data put into sink, rows:%d, continueExecTask:%d"
,
pRes
->
info
.
rows
,
qcontinue
);
if
(
!
qcontinue
)
{
break
;
}
}
}
_return:
_return:
...
@@ -499,6 +532,20 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
...
@@ -499,6 +532,20 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
bool
queryEnd
=
false
;
bool
queryEnd
=
false
;
int32_t
code
=
0
;
int32_t
code
=
0
;
if
(
ctx
->
emptyRes
)
{
QW_TASK_DLOG
(
"query empty result, query end, phase:%d"
,
ctx
->
phase
);
QW_ERR_RET
(
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_SUCCEED
));
QW_ERR_RET
(
qwMallocFetchRsp
(
len
,
&
rsp
));
*
rspMsg
=
rsp
;
*
dataLen
=
0
;
pOutput
->
queryEnd
=
true
;
return
TSDB_CODE_SUCCESS
;
}
dsGetDataLength
(
ctx
->
sinkHandle
,
&
len
,
&
queryEnd
);
dsGetDataLength
(
ctx
->
sinkHandle
,
&
len
,
&
queryEnd
);
if
(
len
<
0
)
{
if
(
len
<
0
)
{
...
@@ -519,9 +566,7 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
...
@@ -519,9 +566,7 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
QW_ERR_RET
(
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_SUCCEED
));
QW_ERR_RET
(
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_SUCCEED
));
QW_ERR_RET
(
qwMallocFetchRsp
(
len
,
&
rsp
));
QW_ERR_RET
(
qwMallocFetchRsp
(
len
,
&
rsp
));
*
rspMsg
=
rsp
;
*
rspMsg
=
rsp
;
*
dataLen
=
0
;
*
dataLen
=
0
;
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
...
@@ -540,15 +585,12 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
...
@@ -540,15 +585,12 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
QW_TASK_DLOG
(
"task got data in sink, dataLength:%d"
,
len
);
QW_TASK_DLOG
(
"task got data in sink, dataLength:%d"
,
len
);
QW_ERR_RET
(
qwMallocFetchRsp
(
len
,
&
rsp
));
QW_ERR_RET
(
qwMallocFetchRsp
(
len
,
&
rsp
));
*
rspMsg
=
rsp
;
*
rspMsg
=
rsp
;
pOutput
->
pData
=
rsp
->
data
;
pOutput
->
pData
=
rsp
->
data
;
code
=
dsGetDataBlock
(
ctx
->
sinkHandle
,
pOutput
);
code
=
dsGetDataBlock
(
ctx
->
sinkHandle
,
pOutput
);
if
(
code
)
{
if
(
code
)
{
QW_TASK_ELOG
(
"dsGetDataBlock failed, code:%x"
,
code
);
QW_TASK_ELOG
(
"dsGetDataBlock failed, code:%x"
,
code
);
qwFreeFetchRsp
(
rsp
);
QW_ERR_RET
(
code
);
QW_ERR_RET
(
code
);
}
}
...
@@ -576,7 +618,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
...
@@ -576,7 +618,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
switch
(
phase
)
{
switch
(
phase
)
{
case
QW_PHASE_PRE_QUERY
:
{
case
QW_PHASE_PRE_QUERY
:
{
QW_ERR_JRET
(
qwAdd
AcquireTaskCtx
(
QW_FPARAMS
(),
QW_READ
,
&
ctx
));
QW_ERR_JRET
(
qwAdd
GetTaskCtx
(
QW_FPARAMS
()
,
&
ctx
));
ctx
->
phase
=
phase
;
ctx
->
phase
=
phase
;
...
@@ -596,7 +638,6 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
...
@@ -596,7 +638,6 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
output
->
needStop
=
true
;
output
->
needStop
=
true
;
QW_ERR_JRET
(
qwAddTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_CANCELLED
));
QW_ERR_JRET
(
qwAddTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_CANCELLED
));
qwFreeTask
(
QW_FPARAMS
(),
ctx
);
QW_SET_EVENT_PROCESSED
(
ctx
,
QW_EVENT_CANCEL
);
QW_SET_EVENT_PROCESSED
(
ctx
,
QW_EVENT_CANCEL
);
...
@@ -609,7 +650,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
...
@@ -609,7 +650,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
break
;
break
;
}
}
case
QW_PHASE_POST_QUERY
:
{
case
QW_PHASE_POST_QUERY
:
{
QW_ERR_JRET
(
qw
AddAcquireTaskCtx
(
QW_FPARAMS
(),
QW_READ
,
&
ctx
));
QW_ERR_JRET
(
qw
GetTaskCtx
(
QW_FPARAMS
()
,
&
ctx
));
QW_LOCK
(
QW_WRITE
,
&
ctx
->
lock
);
QW_LOCK
(
QW_WRITE
,
&
ctx
->
lock
);
...
@@ -618,7 +659,13 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
...
@@ -618,7 +659,13 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
ctx
->
taskHandle
=
input
->
taskHandle
;
ctx
->
taskHandle
=
input
->
taskHandle
;
ctx
->
sinkHandle
=
input
->
sinkHandle
;
ctx
->
sinkHandle
=
input
->
sinkHandle
;
ctx
->
readyCode
=
input
->
code
;
if
(
NULL
==
ctx
->
taskHandle
&&
NULL
==
ctx
->
sinkHandle
)
{
ctx
->
emptyRes
=
true
;
}
if
(
input
->
code
)
{
QW_SET_RSP_CODE
(
ctx
,
input
->
code
);
}
assert
(
!
QW_IS_EVENT_PROCESSED
(
ctx
,
QW_EVENT_CANCEL
));
assert
(
!
QW_IS_EVENT_PROCESSED
(
ctx
,
QW_EVENT_CANCEL
));
...
@@ -654,8 +701,78 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
...
@@ -654,8 +701,78 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
}
}
break
;
break
;
}
}
case
QW_PHASE_PRE_CQUERY
:
{
QW_ERR_JRET
(
qwGetTaskCtx
(
QW_FPARAMS
(),
&
ctx
));
QW_LOCK
(
QW_WRITE
,
&
ctx
->
lock
);
locked
=
true
;
ctx
->
phase
=
phase
;
if
(
QW_IS_EVENT_PROCESSED
(
ctx
,
QW_EVENT_CANCEL
))
{
QW_TASK_WLOG
(
"task already cancelled, phase:%d"
,
phase
);
output
->
needStop
=
true
;
output
->
rspCode
=
TSDB_CODE_QRY_TASK_CANCELLED
;
QW_ERR_JRET
(
TSDB_CODE_QRY_TASK_CANCELLED
);
}
if
(
QW_IS_EVENT_RECEIVED
(
ctx
,
QW_EVENT_DROP
))
{
QW_TASK_WLOG
(
"task is dropping, phase:%d"
,
phase
);
output
->
needStop
=
true
;
output
->
rspCode
=
TSDB_CODE_QRY_TASK_DROPPING
;
}
else
if
(
QW_IS_EVENT_RECEIVED
(
ctx
,
QW_EVENT_CANCEL
))
{
QW_TASK_WLOG
(
"task is cancelling, phase:%d"
,
phase
);
output
->
needStop
=
true
;
output
->
rspCode
=
TSDB_CODE_QRY_TASK_CANCELLING
;
}
if
(
ctx
->
rspCode
)
{
QW_TASK_ELOG
(
"task already failed, code:%x, phase:%d"
,
ctx
->
rspCode
,
phase
);
output
->
needStop
=
true
;
output
->
rspCode
=
ctx
->
rspCode
;
QW_ERR_JRET
(
output
->
rspCode
);
}
break
;
}
case
QW_PHASE_POST_CQUERY
:
{
QW_ERR_JRET
(
qwGetTaskCtx
(
QW_FPARAMS
(),
&
ctx
));
QW_LOCK
(
QW_WRITE
,
&
ctx
->
lock
);
locked
=
true
;
if
(
input
->
code
)
{
QW_SET_RSP_CODE
(
ctx
,
input
->
code
);
}
if
(
QW_IS_EVENT_PROCESSED
(
ctx
,
QW_EVENT_CANCEL
))
{
QW_TASK_WLOG
(
"task already cancelled, phase:%d"
,
phase
);
output
->
needStop
=
true
;
output
->
rspCode
=
TSDB_CODE_QRY_TASK_CANCELLED
;
QW_ERR_JRET
(
TSDB_CODE_QRY_TASK_CANCELLED
);
}
if
(
QW_IS_EVENT_RECEIVED
(
ctx
,
QW_EVENT_DROP
))
{
QW_TASK_WLOG
(
"task is dropping, phase:%d"
,
phase
);
output
->
needStop
=
true
;
output
->
rspCode
=
TSDB_CODE_QRY_TASK_DROPPING
;
}
else
if
(
QW_IS_EVENT_RECEIVED
(
ctx
,
QW_EVENT_CANCEL
))
{
QW_TASK_WLOG
(
"task is cancelling, phase:%d"
,
phase
);
output
->
needStop
=
true
;
output
->
rspCode
=
TSDB_CODE_QRY_TASK_CANCELLING
;
}
if
(
ctx
->
rspCode
)
{
QW_TASK_ELOG
(
"task failed, code:%x, phase:%d"
,
ctx
->
rspCode
,
phase
);
output
->
needStop
=
true
;
output
->
rspCode
=
ctx
->
rspCode
;
QW_ERR_JRET
(
output
->
rspCode
);
}
break
;
}
case
QW_PHASE_PRE_FETCH
:
{
case
QW_PHASE_PRE_FETCH
:
{
QW_ERR_JRET
(
qw
AddAcquireTaskCtx
(
QW_FPARAMS
(),
QW_READ
,
&
ctx
));
QW_ERR_JRET
(
qw
GetTaskCtx
(
QW_FPARAMS
()
,
&
ctx
));
QW_LOCK
(
QW_WRITE
,
&
ctx
->
lock
);
QW_LOCK
(
QW_WRITE
,
&
ctx
->
lock
);
...
@@ -693,15 +810,26 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
...
@@ -693,15 +810,26 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
output
->
rspCode
=
TSDB_CODE_QRY_TASK_MSG_ERROR
;
output
->
rspCode
=
TSDB_CODE_QRY_TASK_MSG_ERROR
;
QW_ERR_JRET
(
TSDB_CODE_QRY_TASK_MSG_ERROR
);
QW_ERR_JRET
(
TSDB_CODE_QRY_TASK_MSG_ERROR
);
}
}
if
(
ctx
->
rspCode
)
{
QW_TASK_ELOG
(
"task already failed, code:%x, phase:%d"
,
ctx
->
rspCode
,
phase
);
output
->
needStop
=
true
;
output
->
rspCode
=
ctx
->
rspCode
;
QW_ERR_JRET
(
output
->
rspCode
);
}
break
;
break
;
}
}
case
QW_PHASE_POST_FETCH
:
{
case
QW_PHASE_POST_FETCH
:
{
QW_ERR_JRET
(
qw
AddAcquireTaskCtx
(
QW_FPARAMS
(),
QW_READ
,
&
ctx
));
QW_ERR_JRET
(
qw
GetTaskCtx
(
QW_FPARAMS
()
,
&
ctx
));
QW_LOCK
(
QW_WRITE
,
&
ctx
->
lock
);
QW_LOCK
(
QW_WRITE
,
&
ctx
->
lock
);
locked
=
true
;
locked
=
true
;
if
(
input
->
code
)
{
QW_SET_RSP_CODE
(
ctx
,
input
->
code
);
}
if
(
QW_IS_EVENT_PROCESSED
(
ctx
,
QW_EVENT_CANCEL
))
{
if
(
QW_IS_EVENT_PROCESSED
(
ctx
,
QW_EVENT_CANCEL
))
{
QW_TASK_WLOG
(
"task already cancelled, phase:%d"
,
phase
);
QW_TASK_WLOG
(
"task already cancelled, phase:%d"
,
phase
);
output
->
needStop
=
true
;
output
->
needStop
=
true
;
...
@@ -718,12 +846,17 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
...
@@ -718,12 +846,17 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
output
->
needStop
=
true
;
output
->
needStop
=
true
;
output
->
rspCode
=
TSDB_CODE_QRY_TASK_CANCELLING
;
output
->
rspCode
=
TSDB_CODE_QRY_TASK_CANCELLING
;
}
}
if
(
ctx
->
rspCode
)
{
QW_TASK_ELOG
(
"task failed, code:%x, phase:%d"
,
ctx
->
rspCode
,
phase
);
output
->
needStop
=
true
;
output
->
rspCode
=
ctx
->
rspCode
;
QW_ERR_JRET
(
output
->
rspCode
);
}
break
;
break
;
}
}
}
}
_return:
_return:
if
(
locked
)
{
if
(
locked
)
{
...
@@ -732,10 +865,6 @@ _return:
...
@@ -732,10 +865,6 @@ _return:
QW_UNLOCK
(
QW_WRITE
,
&
ctx
->
lock
);
QW_UNLOCK
(
QW_WRITE
,
&
ctx
->
lock
);
}
}
if
(
ctx
)
{
qwReleaseTaskCtx
(
QW_READ
,
mgmt
);
}
QW_RET
(
code
);
QW_RET
(
code
);
}
}
...
@@ -774,12 +903,14 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
...
@@ -774,12 +903,14 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
QW_ERR_JRET
(
code
);
QW_ERR_JRET
(
code
);
}
}
QW_ERR_JRET
(
qwBuildAndSendQueryRsp
(
qwMsg
->
connection
,
TSDB_CODE_SUCCESS
));
QW_ERR_JRET
(
qwBuildAndSendQueryRsp
(
qwMsg
->
connection
,
code
));
QW_TASK_DLOG
(
"query msg rsped, code:%d"
,
code
);
queryRsped
=
true
;
queryRsped
=
true
;
if
(
pTaskInfo
&&
sinkHandle
)
{
QW_ERR_JRET
(
qwExecTask
(
QW_FPARAMS
(),
pTaskInfo
,
sinkHandle
));
QW_ERR_JRET
(
qwExecTask
(
QW_FPARAMS
(),
pTaskInfo
,
sinkHandle
));
}
_return:
_return:
if
(
code
)
{
if
(
code
)
{
...
@@ -788,6 +919,10 @@ _return:
...
@@ -788,6 +919,10 @@ _return:
if
(
!
queryRsped
)
{
if
(
!
queryRsped
)
{
code
=
qwBuildAndSendQueryRsp
(
qwMsg
->
connection
,
rspCode
);
code
=
qwBuildAndSendQueryRsp
(
qwMsg
->
connection
,
rspCode
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
QW_TASK_DLOG
(
"query msg rsped, code:%d"
,
rspCode
);
}
if
(
TSDB_CODE_SUCCESS
==
rspCode
&&
code
)
{
if
(
TSDB_CODE_SUCCESS
==
rspCode
&&
code
)
{
rspCode
=
code
;
rspCode
=
code
;
}
}
...
@@ -811,6 +946,7 @@ _return:
...
@@ -811,6 +946,7 @@ _return:
if
(
queryRsped
&&
output
.
needRsp
)
{
if
(
queryRsped
&&
output
.
needRsp
)
{
qwBuildAndSendReadyRsp
(
qwMsg
->
connection
,
output
.
rspCode
);
qwBuildAndSendReadyRsp
(
qwMsg
->
connection
,
output
.
rspCode
);
QW_TASK_DLOG
(
"ready msg rsped, code:%x"
,
output
.
rspCode
);
}
}
QW_RET
(
rspCode
);
QW_RET
(
rspCode
);
...
@@ -819,24 +955,34 @@ _return:
...
@@ -819,24 +955,34 @@ _return:
int32_t
qwProcessReady
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
SQWMsg
*
qwMsg
)
{
int32_t
qwProcessReady
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
SQWMsg
*
qwMsg
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
SQWTaskCtx
*
ctx
=
NULL
;
SQWTaskCtx
*
ctx
=
NULL
;
int8_t
phase
=
0
;
QW_ERR_JRET
(
qw
AddAcquireTaskCtx
(
QW_FPARAMS
(),
QW_READ
,
&
ctx
));
QW_ERR_JRET
(
qw
GetTaskCtx
(
QW_FPARAMS
()
,
&
ctx
));
QW_LOCK
(
QW_WRITE
,
&
ctx
->
lock
);
QW_LOCK
(
QW_WRITE
,
&
ctx
->
lock
);
if
(
ctx
->
phase
==
QW_PHASE_PRE_QUERY
)
{
phase
=
QW_GET_PHASE
(
ctx
);
if
(
phase
==
QW_PHASE_PRE_QUERY
)
{
QW_SET_EVENT_RECEIVED
(
ctx
,
QW_EVENT_READY
);
QW_SET_EVENT_RECEIVED
(
ctx
,
QW_EVENT_READY
);
}
else
if
(
ctx
->
phase
==
QW_PHASE_POST_QUERY
)
{
QW_TASK_DLOG
(
"ready msg not rsped, phase:%d"
,
phase
);
}
else
if
(
phase
==
QW_PHASE_POST_QUERY
)
{
QW_SET_EVENT_PROCESSED
(
ctx
,
QW_EVENT_READY
);
QW_SET_EVENT_PROCESSED
(
ctx
,
QW_EVENT_READY
);
QW_ERR_JRET
(
qwBuildAndSendReadyRsp
(
qwMsg
->
connection
,
ctx
->
readyCode
));
QW_ERR_JRET
(
qwBuildAndSendReadyRsp
(
qwMsg
->
connection
,
ctx
->
rspCode
));
QW_TASK_DLOG
(
"ready msg rsped, code:%x"
,
ctx
->
rspCode
);
}
else
{
QW_TASK_ELOG
(
"invalid phase when got ready msg, phase:%d"
,
phase
);
assert
(
0
);
}
}
_return:
_return:
if
(
code
)
{
QW_SET_RSP_CODE
(
ctx
,
code
);
}
if
(
ctx
)
{
if
(
ctx
)
{
QW_UNLOCK
(
QW_WRITE
,
&
ctx
->
lock
);
QW_UNLOCK
(
QW_WRITE
,
&
ctx
->
lock
);
qwReleaseTaskCtx
(
QW_READ
,
mgmt
);
}
}
QW_RET
(
code
);
QW_RET
(
code
);
...
@@ -844,17 +990,17 @@ _return:
...
@@ -844,17 +990,17 @@ _return:
int32_t
qwProcessCQuery
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
SQWMsg
*
qwMsg
)
{
int32_t
qwProcessCQuery
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
SQWMsg
*
qwMsg
)
{
SQWTaskCtx
*
ctx
=
NULL
;
int32_t
code
=
0
;
int32_t
code
=
0
;
bool
queryRsped
=
false
;
bool
queryRsped
=
false
;
bool
needStop
=
false
;
bool
needStop
=
false
;
struct
SSubplan
*
plan
=
NULL
;
struct
SSubplan
*
plan
=
NULL
;
int32_t
rspCode
=
0
;
SQWPhaseInput
input
=
{
0
};
SQWPhaseInput
input
=
{
0
};
SQWPhaseOutput
output
=
{
0
};
SQWPhaseOutput
output
=
{
0
};
SQWTaskCtx
*
ctx
=
NULL
;
void
*
rsp
=
NULL
;
void
*
rsp
=
NULL
;
int32_t
dataLen
=
0
;
int32_t
dataLen
=
0
;
do
{
QW_ERR_JRET
(
qwHandleTaskEvent
(
QW_FPARAMS
(),
QW_PHASE_PRE_CQUERY
,
&
input
,
&
output
));
QW_ERR_JRET
(
qwHandleTaskEvent
(
QW_FPARAMS
(),
QW_PHASE_PRE_CQUERY
,
&
input
,
&
output
));
needStop
=
output
.
needStop
;
needStop
=
output
.
needStop
;
...
@@ -867,61 +1013,54 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
...
@@ -867,61 +1013,54 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
QW_ERR_JRET
(
qwGetTaskCtx
(
QW_FPARAMS
(),
&
ctx
));
QW_ERR_JRET
(
qwGetTaskCtx
(
QW_FPARAMS
(),
&
ctx
));
atomic_store_8
(
&
ctx
->
inQueue
,
0
);
qTaskInfo_t
taskHandle
=
ctx
->
taskHandle
;
qTaskInfo_t
taskHandle
=
ctx
->
taskHandle
;
DataSinkHandle
sinkHandle
=
ctx
->
sinkHandle
;
DataSinkHandle
sinkHandle
=
ctx
->
sinkHandle
;
QW_ERR_JRET
(
qwExecTask
(
QW_FPARAMS
(),
taskHandle
,
sinkHandle
));
QW_ERR_JRET
(
qwExecTask
(
QW_FPARAMS
(),
taskHandle
,
sinkHandle
));
QW_SET_EVENT_PROCESSED
(
ctx
,
QW_EVENT_CQUERY
);
if
(
QW_IS_EVENT_RECEIVED
(
ctx
,
QW_EVENT_FETCH
))
{
if
(
QW_IS_EVENT_RECEIVED
(
ctx
,
QW_EVENT_FETCH
))
{
SOutputData
sOutput
=
{
0
};
SOutputData
sOutput
=
{
0
};
QW_ERR_JRET
(
qwGetResFromSink
(
QW_FPARAMS
(),
ctx
,
&
dataLen
,
&
rsp
,
&
sOutput
));
QW_ERR_JRET
(
qwGetResFromSink
(
QW_FPARAMS
(),
ctx
,
&
dataLen
,
&
rsp
,
&
sOutput
));
// Note: schedule data sink firstly and will schedule query after it's done
if
((
!
sOutput
.
queryEnd
)
&&
(
DS_BUF_LOW
==
sOutput
.
bufStatus
||
DS_BUF_EMPTY
==
sOutput
.
bufStatus
))
{
if
(
sOutput
.
scheduleJobNo
)
{
if
(
sOutput
.
scheduleJobNo
>
ctx
->
sinkId
)
{
QW_TASK_DLOG
(
"sink need schedule, scheduleJobNo:%d"
,
sOutput
.
scheduleJobNo
);
ctx
->
sinkId
=
sOutput
.
scheduleJobNo
;
QW_ERR_JRET
(
qwBuildAndSendSchSinkMsg
(
QW_FPARAMS
(),
qwMsg
->
connection
));
}
}
else
if
((
!
sOutput
.
queryEnd
)
&&
(
DS_BUF_LOW
==
sOutput
.
bufStatus
||
DS_BUF_EMPTY
==
sOutput
.
bufStatus
))
{
QW_TASK_DLOG
(
"task not end, need to continue, bufStatus:%d"
,
sOutput
.
bufStatus
);
QW_TASK_DLOG
(
"task not end, need to continue, bufStatus:%d"
,
sOutput
.
bufStatus
);
if
(
!
QW_IS_EVENT_RECEIVED
(
ctx
,
QW_EVENT_CQUERY
))
{
// RC WARNING
QW_SET_EVENT_RECEIVED
(
ctx
,
QW_EVENT_CQUERY
);
atomic_store_8
(
&
ctx
->
queryContinue
,
1
);
QW_ERR_JRET
(
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_EXECUTING
));
QW_ERR_RET
(
qwBuildAndSendCQueryMsg
(
QW_FPARAMS
(),
qwMsg
->
connection
));
}
}
}
if
(
rsp
)
{
if
(
rsp
)
{
qwBuildFetchRsp
(
rsp
,
&
sOutput
,
dataLen
);
qwBuildFetchRsp
(
rsp
,
&
sOutput
,
dataLen
);
}
}
QW_SET_EVENT_PROCESSED
(
ctx
,
QW_EVENT_FETCH
);
_return:
qwBuildAndSendFetchRsp
(
qwMsg
->
connection
,
rsp
,
dataLen
,
code
);
QW_TASK_DLOG
(
"fetch msg rsped, code:%x, dataLen:%d"
,
code
,
dataLen
);
}
else
{
atomic_store_8
(
&
ctx
->
queryContinue
,
1
);
}
}
qwHandleTaskEvent
(
QW_FPARAMS
(),
QW_PHASE_POST_CQUERY
,
&
input
,
&
output
);
_return:
if
(
QW_IS_EVENT_RECEIVED
(
ctx
,
QW_EVENT_FETCH
))
{
if
(
code
&&
QW_IS_EVENT_RECEIVED
(
ctx
,
QW_EVENT_FETCH
))
{
if
(
code
)
{
QW_SET_EVENT_PROCESSED
(
ctx
,
QW_EVENT_FETCH
);
QW_SET_EVENT_PROCESSED
(
ctx
,
QW_EVENT_FETCH
);
qwFreeFetchRsp
(
rsp
);
qwFreeFetchRsp
(
rsp
);
rsp
=
NULL
;
rsp
=
NULL
;
qwBuildAndSendFetchRsp
(
qwMsg
->
connection
,
rsp
,
0
,
code
);
qwBuildAndSendFetchRsp
(
qwMsg
->
connection
,
rsp
,
0
,
code
);
}
else
if
(
rsp
)
{
QW_TASK_DLOG
(
"fetch msg rsped, code:%x, dataLen:%d"
,
code
,
0
);
QW_SET_EVENT_PROCESSED
(
ctx
,
QW_EVENT_FETCH
);
qwBuildAndSendFetchRsp
(
qwMsg
->
connection
,
rsp
,
dataLen
,
code
);
}
}
}
QW_RET
(
rspCode
);
input
.
code
=
code
;
qwHandleTaskEvent
(
QW_FPARAMS
(),
QW_PHASE_POST_CQUERY
,
&
input
,
&
output
);
needStop
=
output
.
needStop
;
code
=
output
.
rspCode
;
}
while
((
!
needStop
)
&&
(
0
==
code
)
&&
atomic_val_compare_exchange_8
(
&
ctx
->
queryContinue
,
1
,
0
));
QW_RET
(
code
);
}
}
...
@@ -958,46 +1097,49 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
...
@@ -958,46 +1097,49 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
if
(
NULL
==
rsp
)
{
if
(
NULL
==
rsp
)
{
QW_SET_EVENT_RECEIVED
(
ctx
,
QW_EVENT_FETCH
);
QW_SET_EVENT_RECEIVED
(
ctx
,
QW_EVENT_FETCH
);
}
else
{
qwBuildFetchRsp
(
rsp
,
&
sOutput
,
dataLen
);
}
}
// Note: schedule data sink firstly and will schedule query after it's done
if
((
!
sOutput
.
queryEnd
)
&&
(
DS_BUF_LOW
==
sOutput
.
bufStatus
||
DS_BUF_EMPTY
==
sOutput
.
bufStatus
))
{
if
(
sOutput
.
scheduleJobNo
)
{
if
(
sOutput
.
scheduleJobNo
>
ctx
->
sinkId
)
{
QW_TASK_DLOG
(
"sink need schedule, scheduleJobNo:%d"
,
sOutput
.
scheduleJobNo
);
ctx
->
sinkId
=
sOutput
.
scheduleJobNo
;
QW_ERR_JRET
(
qwBuildAndSendSchSinkMsg
(
QW_FPARAMS
(),
qwMsg
->
connection
));
}
}
else
if
((
!
sOutput
.
queryEnd
)
&&
(
/* DS_BUF_LOW == sOutput.bufStatus || */
DS_BUF_EMPTY
==
sOutput
.
bufStatus
))
{
QW_TASK_DLOG
(
"task not end, need to continue, bufStatus:%d"
,
sOutput
.
bufStatus
);
QW_TASK_DLOG
(
"task not end, need to continue, bufStatus:%d"
,
sOutput
.
bufStatus
);
if
(
!
QW_IS_EVENT_RECEIVED
(
ctx
,
QW_EVENT_CQUERY
))
{
QW_LOCK
(
QW_WRITE
,
&
ctx
->
lock
);
QW_SET_EVENT_RECEIVED
(
ctx
,
QW_EVENT_CQUERY
)
;
locked
=
true
;
// RC WARNING
if
(
QW_IN_EXECUTOR
(
ctx
))
{
atomic_store_8
(
&
ctx
->
queryContinue
,
1
);
}
else
if
(
0
==
atomic_load_8
(
&
ctx
->
inQueue
))
{
QW_ERR_JRET
(
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_EXECUTING
));
QW_ERR_JRET
(
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_EXECUTING
));
QW_ERR_RET
(
qwBuildAndSendCQueryMsg
(
QW_FPARAMS
(),
qwMsg
->
connection
));
atomic_store_8
(
&
ctx
->
inQueue
,
1
);
QW_ERR_JRET
(
qwBuildAndSendCQueryMsg
(
QW_FPARAMS
(),
qwMsg
->
connection
));
}
}
}
}
if
(
rsp
)
{
_return:
qwBuildFetchRsp
(
rsp
,
&
sOutput
,
dataLen
);
if
(
locked
)
{
QW_UNLOCK
(
QW_WRITE
,
&
ctx
->
lock
);
}
}
_return:
input
.
code
=
code
;
qwHandleTaskEvent
(
QW_FPARAMS
(),
QW_PHASE_POST_FETCH
,
&
input
,
&
output
);
qwHandleTaskEvent
(
QW_FPARAMS
(),
QW_PHASE_POST_FETCH
,
&
input
,
&
output
);
if
(
code
)
{
if
(
code
)
{
qwFreeFetchRsp
(
rsp
);
qwFreeFetchRsp
(
rsp
);
rsp
=
NULL
;
rsp
=
NULL
;
qwBuildAndSendFetchRsp
(
qwMsg
->
connection
,
rsp
,
0
,
code
);
dataLen
=
0
;
qwBuildAndSendFetchRsp
(
qwMsg
->
connection
,
rsp
,
dataLen
,
code
);
QW_TASK_DLOG
(
"fetch msg rsped, code:%x, dataLen:%d"
,
code
,
dataLen
);
}
else
if
(
rsp
)
{
}
else
if
(
rsp
)
{
qwBuildAndSendFetchRsp
(
qwMsg
->
connection
,
rsp
,
dataLen
,
code
);
qwBuildAndSendFetchRsp
(
qwMsg
->
connection
,
rsp
,
dataLen
,
code
);
QW_TASK_DLOG
(
"fetch msg rsped, code:%x, dataLen:%d"
,
code
,
dataLen
);
}
}
QW_RET
(
code
);
QW_RET
(
code
);
}
}
...
...
source/libs/qworker/src/qworkerMsg.c
浏览文件 @
c1b8c597
...
@@ -229,42 +229,6 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchRe
...
@@ -229,42 +229,6 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchRe
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
qwBuildAndSendSchSinkMsg
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
void
*
connection
)
{
SRpcMsg
*
pMsg
=
(
SRpcMsg
*
)
connection
;
SSinkDataReq
*
req
=
(
SSinkDataReq
*
)
rpcMallocCont
(
sizeof
(
SSinkDataReq
));
if
(
NULL
==
req
)
{
qError
(
"rpcMallocCont %d failed"
,
(
int32_t
)
sizeof
(
SSinkDataReq
));
QW_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
req
->
header
.
vgId
=
mgmt
->
nodeId
;
req
->
sId
=
sId
;
req
->
queryId
=
qId
;
req
->
taskId
=
tId
;
SRpcMsg
pNewMsg
=
{
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
msgType
=
TDMT_VND_SCHEDULE_DATA_SINK
,
.
pCont
=
req
,
.
contLen
=
sizeof
(
SSinkDataReq
),
.
code
=
0
,
};
int32_t
code
=
(
*
mgmt
->
putToQueueFp
)(
mgmt
->
nodeObj
,
&
pNewMsg
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
qError
(
"put data sink schedule msg to queue failed, code:%x"
,
code
);
rpcFreeCont
(
req
);
QW_ERR_RET
(
code
);
}
qDebug
(
"put data sink schedule msg to query queue"
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendCQueryMsg
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
void
*
connection
)
{
int32_t
qwBuildAndSendCQueryMsg
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
void
*
connection
)
{
SRpcMsg
*
pMsg
=
(
SRpcMsg
*
)
connection
;
SRpcMsg
*
pMsg
=
(
SRpcMsg
*
)
connection
;
SQueryContinueReq
*
req
=
(
SQueryContinueReq
*
)
rpcMallocCont
(
sizeof
(
SQueryContinueReq
));
SQueryContinueReq
*
req
=
(
SQueryContinueReq
*
)
rpcMallocCont
(
sizeof
(
SQueryContinueReq
));
...
@@ -366,25 +330,6 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
...
@@ -366,25 +330,6 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
qWorkerProcessDataSinkMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
){
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
SSinkDataReq
*
msg
=
pMsg
->
pCont
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
qError
(
"invalid sink data msg"
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
//dsScheduleProcess();
//TODO
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessReadyMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
){
int32_t
qWorkerProcessReadyMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
){
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
return
TSDB_CODE_QRY_INVALID_INPUT
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录