Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b14ce58b
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
b14ce58b
编写于
1月 19, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feature/qnode
上级
8b808fbc
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
206 addition
and
168 deletion
+206
-168
include/common/tmsgdef.h
include/common/tmsgdef.h
+0
-1
include/libs/executor/dataSinkMgt.h
include/libs/executor/dataSinkMgt.h
+0
-1
source/dnode/vnode/src/vnd/vnodeQuery.c
source/dnode/vnode/src/vnd/vnodeQuery.c
+0
-2
source/libs/executor/src/dataDispatcher.c
source/libs/executor/src/dataDispatcher.c
+0
-1
source/libs/qworker/inc/qworkerInt.h
source/libs/qworker/inc/qworkerInt.h
+10
-7
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+196
-101
source/libs/qworker/src/qworkerMsg.c
source/libs/qworker/src/qworkerMsg.c
+0
-55
未找到文件。
include/common/tmsgdef.h
浏览文件 @
b14ce58b
...
...
@@ -171,7 +171,6 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_VND_SHOW_TABLES
,
"vnode-show-tables"
,
SVShowTablesReq
,
SVShowTablesRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_SHOW_TABLES_FETCH
,
"vnode-show-tables-fetch"
,
SVShowTablesFetchReq
,
SVShowTablesFetchRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_QUERY_CONTINUE
,
"vnode-query-continue"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_SCHEDULE_DATA_SINK
,
"vnode-schedule-data-sink"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_SUBSCRIBE
,
"vnode-subscribe"
,
SMVSubscribeReq
,
SMVSubscribeRsp
)
...
...
include/libs/executor/dataSinkMgt.h
浏览文件 @
b14ce58b
...
...
@@ -48,7 +48,6 @@ typedef struct SOutputData {
int8_t
compressed
;
char
*
pData
;
bool
queryEnd
;
int32_t
scheduleJobNo
;
int32_t
bufStatus
;
int64_t
useconds
;
int8_t
precision
;
...
...
source/dnode/vnode/src/vnd/vnodeQuery.c
浏览文件 @
b14ce58b
...
...
@@ -29,8 +29,6 @@ int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
return
qWorkerProcessQueryMsg
(
pVnode
->
pTsdb
,
pVnode
->
pQuery
,
pMsg
);
case
TDMT_VND_QUERY_CONTINUE
:
return
qWorkerProcessCQueryMsg
(
pVnode
->
pTsdb
,
pVnode
->
pQuery
,
pMsg
);
case
TDMT_VND_SCHEDULE_DATA_SINK
:
return
qWorkerProcessDataSinkMsg
(
pVnode
->
pTsdb
,
pVnode
->
pQuery
,
pMsg
);
default:
vError
(
"unknown msg type:%d in query queue"
,
pMsg
->
msgType
);
return
TSDB_CODE_VND_APP_ERROR
;
...
...
source/libs/executor/src/dataDispatcher.c
浏览文件 @
b14ce58b
...
...
@@ -196,7 +196,6 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
pOutput
->
bufStatus
=
updateStatus
(
pDispatcher
);
pthread_mutex_lock
(
&
pDispatcher
->
mutex
);
pOutput
->
queryEnd
=
pDispatcher
->
queryEnd
;
pOutput
->
scheduleJobNo
=
0
;
pOutput
->
useconds
=
pDispatcher
->
useconds
;
pOutput
->
precision
=
pDispatcher
->
schema
.
precision
;
pthread_mutex_unlock
(
&
pDispatcher
->
mutex
);
...
...
source/libs/qworker/inc/qworkerInt.h
浏览文件 @
b14ce58b
...
...
@@ -31,8 +31,6 @@ enum {
QW_PHASE_POST_QUERY
,
QW_PHASE_PRE_CQUERY
,
QW_PHASE_POST_CQUERY
,
QW_PHASE_PRE_SINK
,
QW_PHASE_POST_SINK
,
QW_PHASE_PRE_FETCH
,
QW_PHASE_POST_FETCH
,
};
...
...
@@ -105,10 +103,11 @@ typedef struct SQWTaskStatus {
typedef
struct
SQWTaskCtx
{
SRWLatch
lock
;
int32_t
phase
;
int32_t
sinkId
;
int32_t
readyCode
;
int8_t
phase
;
int8_t
queryContinue
;
int8_t
inQueue
;
int32_t
rspCode
;
int8_t
events
[
QW_EVENT_MAX
];
...
...
@@ -144,7 +143,11 @@ typedef struct SQWorkerMgmt {
#define QW_SET_EVENT_RECEIVED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_RECEIVED)
#define QW_SET_EVENT_PROCESSED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_PROCESSED)
#define QW_IN_EXECUTOR(ctx) ((ctx)->phase == QW_PHASE_PRE_QUERY || (ctx)->phase == QW_PHASE_PRE_CQUERY || (ctx)->phase == QW_PHASE_PRE_FETCH || (ctx)->phase == QW_PHASE_PRE_SINK)
#define QW_GET_PHASE(ctx) atomic_load_8(&(ctx)->phase)
#define QW_SET_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code)
#define QW_IN_EXECUTOR(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_FETCH)
#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code))
#define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code))
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
b14ce58b
...
...
@@ -268,6 +268,8 @@ int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
if
(
HASH_NODE_EXIST
(
code
))
{
if
(
rwType
&&
ctx
)
{
QW_RET
(
qwAcquireTaskCtx
(
QW_FPARAMS
(),
rwType
,
ctx
));
}
else
if
(
ctx
)
{
QW_RET
(
qwGetTaskCtx
(
QW_FPARAMS
(),
ctx
));
}
else
{
QW_TASK_ELOG
(
"task ctx already exist, id:%s"
,
id
);
QW_ERR_RET
(
TSDB_CODE_QRY_TASK_ALREADY_EXIST
);
...
...
@@ -281,6 +283,8 @@ int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
if
(
rwType
&&
ctx
)
{
QW_RET
(
qwAcquireTaskCtx
(
QW_FPARAMS
(),
rwType
,
ctx
));
}
else
if
(
ctx
)
{
QW_RET
(
qwGetTaskCtx
(
QW_FPARAMS
(),
ctx
));
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -309,6 +313,11 @@ int32_t qwAddAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint
return
qwAddTaskCtxImpl
(
QW_FPARAMS
(),
rwType
,
0
,
ctx
);
}
int32_t
qwAddGetTaskCtx
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
SQWTaskCtx
**
ctx
)
{
return
qwAddTaskCtxImpl
(
QW_FPARAMS
(),
0
,
0
,
ctx
);
}
void
qwReleaseTaskCtx
(
int32_t
rwType
,
SQWorkerMgmt
*
mgmt
)
{
QW_UNLOCK
(
rwType
,
&
mgmt
->
ctxLock
);
}
...
...
@@ -416,7 +425,7 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId,
SQWTaskCtx
*
ctx
=
NULL
;
bool
locked
=
false
;
QW_ERR_JRET
(
qwAdd
Acquire
TaskCtx
(
QW_FPARAMS
(),
QW_READ
,
&
ctx
));
QW_ERR_JRET
(
qwAdd
Get
TaskCtx
(
QW_FPARAMS
(),
QW_READ
,
&
ctx
));
QW_LOCK
(
QW_WRITE
,
&
ctx
->
lock
);
...
...
@@ -447,12 +456,12 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId,
_return:
if
(
locked
)
{
QW_
UNLOCK
(
QW_WRITE
,
&
ctx
->
lock
);
if
(
code
)
{
QW_
SET_RSP_CODE
(
ctx
,
code
);
}
if
(
ctx
)
{
qwReleaseTaskCtx
(
QW_READ
,
mgmt
);
if
(
locked
)
{
QW_UNLOCK
(
QW_WRITE
,
&
ctx
->
lock
);
}
QW_RET
(
code
);
...
...
@@ -463,8 +472,11 @@ int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t taskHandle, DataSinkHandle sinkHa
bool
qcontinue
=
true
;
SSDataBlock
*
pRes
=
NULL
;
uint64_t
useconds
=
0
;
while
(
qcontinue
)
{
int32_t
i
=
0
;
while
(
true
)
{
QW_TASK_DLOG
(
"start to execTask in executor, loopIdx:%d"
,
i
++
);
code
=
qExecTask
(
taskHandle
,
&
pRes
,
&
useconds
);
if
(
code
)
{
QW_TASK_ELOG
(
"qExecTask failed, code:%x"
,
code
);
...
...
@@ -472,7 +484,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t taskHandle, DataSinkHandle sinkHa
}
if
(
NULL
==
pRes
)
{
QW_TASK_DLOG
(
"query done, useconds:%"
PRIu64
,
useconds
);
QW_TASK_DLOG
(
"
task
query done, useconds:%"
PRIu64
,
useconds
);
dsEndPut
(
sinkHandle
,
useconds
);
break
;
}
...
...
@@ -483,6 +495,12 @@ int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t taskHandle, DataSinkHandle sinkHa
QW_TASK_ELOG
(
"dsPutDataBlock failed, code:%x"
,
code
);
QW_ERR_JRET
(
code
);
}
QW_TASK_DLOG
(
"data put into sink, rows:%d, continueExecTask:%d"
,
pRes
->
info
.
rows
,
qcontinue
);
if
(
!
qcontinue
)
{
break
;
}
}
_return:
...
...
@@ -517,9 +535,7 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
QW_ERR_RET
(
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_SUCCEED
));
QW_ERR_RET
(
qwMallocFetchRsp
(
len
,
&
rsp
));
*
rspMsg
=
rsp
;
*
dataLen
=
0
;
return
TSDB_CODE_SUCCESS
;
...
...
@@ -538,15 +554,12 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
QW_TASK_DLOG
(
"task got data in sink, dataLength:%d"
,
len
);
QW_ERR_RET
(
qwMallocFetchRsp
(
len
,
&
rsp
));
*
rspMsg
=
rsp
;
pOutput
->
pData
=
rsp
->
data
;
code
=
dsGetDataBlock
(
ctx
->
sinkHandle
,
pOutput
);
if
(
code
)
{
QW_TASK_ELOG
(
"dsGetDataBlock failed, code:%x"
,
code
);
qwFreeFetchRsp
(
rsp
);
QW_ERR_RET
(
code
);
}
...
...
@@ -574,7 +587,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
switch
(
phase
)
{
case
QW_PHASE_PRE_QUERY
:
{
QW_ERR_JRET
(
qwAdd
Acquire
TaskCtx
(
QW_FPARAMS
(),
QW_READ
,
&
ctx
));
QW_ERR_JRET
(
qwAdd
Get
TaskCtx
(
QW_FPARAMS
(),
QW_READ
,
&
ctx
));
ctx
->
phase
=
phase
;
...
...
@@ -607,7 +620,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
break
;
}
case
QW_PHASE_POST_QUERY
:
{
QW_ERR_JRET
(
qw
AddAcquire
TaskCtx
(
QW_FPARAMS
(),
QW_READ
,
&
ctx
));
QW_ERR_JRET
(
qw
Get
TaskCtx
(
QW_FPARAMS
(),
QW_READ
,
&
ctx
));
QW_LOCK
(
QW_WRITE
,
&
ctx
->
lock
);
...
...
@@ -616,7 +629,9 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
ctx
->
taskHandle
=
input
->
taskHandle
;
ctx
->
sinkHandle
=
input
->
sinkHandle
;
ctx
->
readyCode
=
input
->
code
;
if
(
input
->
code
)
{
QW_SET_RSP_CODE
(
ctx
,
input
->
code
);
}
assert
(
!
QW_IS_EVENT_PROCESSED
(
ctx
,
QW_EVENT_CANCEL
));
...
...
@@ -652,8 +667,78 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
}
break
;
}
case
QW_PHASE_PRE_CQUERY
:
{
QW_ERR_JRET
(
qwGetTaskCtx
(
QW_FPARAMS
(),
&
ctx
));
QW_LOCK
(
QW_WRITE
,
&
ctx
->
lock
);
locked
=
true
;
ctx
->
phase
=
phase
;
if
(
QW_IS_EVENT_PROCESSED
(
ctx
,
QW_EVENT_CANCEL
))
{
QW_TASK_WLOG
(
"task already cancelled, phase:%d"
,
phase
);
output
->
needStop
=
true
;
output
->
rspCode
=
TSDB_CODE_QRY_TASK_CANCELLED
;
QW_ERR_JRET
(
TSDB_CODE_QRY_TASK_CANCELLED
);
}
if
(
QW_IS_EVENT_RECEIVED
(
ctx
,
QW_EVENT_DROP
))
{
QW_TASK_WLOG
(
"task is dropping, phase:%d"
,
phase
);
output
->
needStop
=
true
;
output
->
rspCode
=
TSDB_CODE_QRY_TASK_DROPPING
;
}
else
if
(
QW_IS_EVENT_RECEIVED
(
ctx
,
QW_EVENT_CANCEL
))
{
QW_TASK_WLOG
(
"task is cancelling, phase:%d"
,
phase
);
output
->
needStop
=
true
;
output
->
rspCode
=
TSDB_CODE_QRY_TASK_CANCELLING
;
}
if
(
ctx
->
rspCode
)
{
QW_TASK_ELOG
(
"task already failed, code:%x, phase:%d"
,
ctx
->
rspCode
,
phase
);
output
->
needStop
=
true
;
output
->
rspCode
=
ctx
->
rspCode
;
QW_ERR_JRET
(
output
->
rspCode
);
}
break
;
}
case
QW_PHASE_POST_CQUERY
:
{
QW_ERR_JRET
(
qwGetTaskCtx
(
QW_FPARAMS
(),
QW_READ
,
&
ctx
));
QW_LOCK
(
QW_WRITE
,
&
ctx
->
lock
);
locked
=
true
;
if
(
input
->
code
)
{
QW_SET_RSP_CODE
(
ctx
,
input
->
code
);
}
if
(
QW_IS_EVENT_PROCESSED
(
ctx
,
QW_EVENT_CANCEL
))
{
QW_TASK_WLOG
(
"task already cancelled, phase:%d"
,
phase
);
output
->
needStop
=
true
;
output
->
rspCode
=
TSDB_CODE_QRY_TASK_CANCELLED
;
QW_ERR_JRET
(
TSDB_CODE_QRY_TASK_CANCELLED
);
}
if
(
QW_IS_EVENT_RECEIVED
(
ctx
,
QW_EVENT_DROP
))
{
QW_TASK_WLOG
(
"task is dropping, phase:%d"
,
phase
);
output
->
needStop
=
true
;
output
->
rspCode
=
TSDB_CODE_QRY_TASK_DROPPING
;
}
else
if
(
QW_IS_EVENT_RECEIVED
(
ctx
,
QW_EVENT_CANCEL
))
{
QW_TASK_WLOG
(
"task is cancelling, phase:%d"
,
phase
);
output
->
needStop
=
true
;
output
->
rspCode
=
TSDB_CODE_QRY_TASK_CANCELLING
;
}
if
(
ctx
->
rspCode
)
{
QW_TASK_ELOG
(
"task failed, code:%x, phase:%d"
,
ctx
->
rspCode
,
phase
);
output
->
needStop
=
true
;
output
->
rspCode
=
ctx
->
rspCode
;
QW_ERR_JRET
(
output
->
rspCode
);
}
break
;
}
case
QW_PHASE_PRE_FETCH
:
{
QW_ERR_JRET
(
qw
AddAcquire
TaskCtx
(
QW_FPARAMS
(),
QW_READ
,
&
ctx
));
QW_ERR_JRET
(
qw
Get
TaskCtx
(
QW_FPARAMS
(),
QW_READ
,
&
ctx
));
QW_LOCK
(
QW_WRITE
,
&
ctx
->
lock
);
...
...
@@ -691,15 +776,26 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
output
->
rspCode
=
TSDB_CODE_QRY_TASK_MSG_ERROR
;
QW_ERR_JRET
(
TSDB_CODE_QRY_TASK_MSG_ERROR
);
}
if
(
ctx
->
rspCode
)
{
QW_TASK_ELOG
(
"task already failed, code:%x, phase:%d"
,
ctx
->
rspCode
,
phase
);
output
->
needStop
=
true
;
output
->
rspCode
=
ctx
->
rspCode
;
QW_ERR_JRET
(
output
->
rspCode
);
}
break
;
}
case
QW_PHASE_POST_FETCH
:
{
QW_ERR_JRET
(
qw
AddAcquire
TaskCtx
(
QW_FPARAMS
(),
QW_READ
,
&
ctx
));
QW_ERR_JRET
(
qw
Get
TaskCtx
(
QW_FPARAMS
(),
QW_READ
,
&
ctx
));
QW_LOCK
(
QW_WRITE
,
&
ctx
->
lock
);
locked
=
true
;
if
(
input
->
code
)
{
QW_SET_RSP_CODE
(
ctx
,
input
->
code
);
}
if
(
QW_IS_EVENT_PROCESSED
(
ctx
,
QW_EVENT_CANCEL
))
{
QW_TASK_WLOG
(
"task already cancelled, phase:%d"
,
phase
);
output
->
needStop
=
true
;
...
...
@@ -716,12 +812,17 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
output
->
needStop
=
true
;
output
->
rspCode
=
TSDB_CODE_QRY_TASK_CANCELLING
;
}
if
(
ctx
->
rspCode
)
{
QW_TASK_ELOG
(
"task failed, code:%x, phase:%d"
,
ctx
->
rspCode
,
phase
);
output
->
needStop
=
true
;
output
->
rspCode
=
ctx
->
rspCode
;
QW_ERR_JRET
(
output
->
rspCode
);
}
break
;
}
}
_return:
if
(
locked
)
{
...
...
@@ -730,10 +831,6 @@ _return:
QW_UNLOCK
(
QW_WRITE
,
&
ctx
->
lock
);
}
if
(
ctx
)
{
qwReleaseTaskCtx
(
QW_READ
,
mgmt
);
}
QW_RET
(
code
);
}
...
...
@@ -817,24 +914,32 @@ _return:
int32_t
qwProcessReady
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
SQWMsg
*
qwMsg
)
{
int32_t
code
=
0
;
SQWTaskCtx
*
ctx
=
NULL
;
QW_ERR_JRET
(
qwAddAcquireTaskCtx
(
QW_FPARAMS
(),
QW_READ
,
&
ctx
));
int8_t
phase
=
0
;
QW_ERR_JRET
(
qwGetTaskCtx
(
QW_FPARAMS
(),
&
ctx
));
QW_LOCK
(
QW_WRITE
,
&
ctx
->
lock
);
if
(
ctx
->
phase
==
QW_PHASE_PRE_QUERY
)
{
phase
=
QW_GET_PHASE
(
ctx
);
if
(
phase
==
QW_PHASE_PRE_QUERY
)
{
QW_SET_EVENT_RECEIVED
(
ctx
,
QW_EVENT_READY
);
}
else
if
(
ctx
->
phase
==
QW_PHASE_POST_QUERY
)
{
}
else
if
(
phase
==
QW_PHASE_POST_QUERY
)
{
QW_SET_EVENT_PROCESSED
(
ctx
,
QW_EVENT_READY
);
QW_ERR_JRET
(
qwBuildAndSendReadyRsp
(
qwMsg
->
connection
,
ctx
->
readyCode
));
QW_ERR_JRET
(
qwBuildAndSendReadyRsp
(
qwMsg
->
connection
,
ctx
->
rspCode
));
}
else
{
QW_TASK_ELOG
(
"invalid phase when got ready msg, phase:%d"
,
phase
);
assert
(
0
);
}
_return:
if
(
code
)
{
QW_SET_RSP_CODE
(
ctx
,
code
);
}
if
(
ctx
)
{
QW_UNLOCK
(
QW_WRITE
,
&
ctx
->
lock
);
qwReleaseTaskCtx
(
QW_READ
,
mgmt
);
}
QW_RET
(
code
);
...
...
@@ -842,84 +947,74 @@ _return:
int32_t
qwProcessCQuery
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
SQWMsg
*
qwMsg
)
{
SQWTaskCtx
*
ctx
=
NULL
;
int32_t
code
=
0
;
bool
queryRsped
=
false
;
bool
needStop
=
false
;
struct
SSubplan
*
plan
=
NULL
;
int32_t
rspCode
=
0
;
SQWPhaseInput
input
=
{
0
};
SQWPhaseOutput
output
=
{
0
};
SQWTaskCtx
*
ctx
=
NULL
;
void
*
rsp
=
NULL
;
int32_t
dataLen
=
0
;
do
{
QW_ERR_JRET
(
qwHandleTaskEvent
(
QW_FPARAMS
(),
QW_PHASE_PRE_CQUERY
,
&
input
,
&
output
));
QW_ERR_JRET
(
qwHandleTaskEvent
(
QW_FPARAMS
(),
QW_PHASE_PRE_CQUERY
,
&
input
,
&
output
));
needStop
=
output
.
needStop
;
code
=
output
.
rspCode
;
if
(
needStop
)
{
QW_TASK_DLOG
(
"task need stop, phase:%d"
,
QW_PHASE_PRE_CQUERY
);
QW_ERR_JRET
(
code
);
}
needStop
=
output
.
needStop
;
code
=
output
.
rspCode
;
if
(
needStop
)
{
QW_TASK_DLOG
(
"task need stop, phase:%d"
,
QW_PHASE_PRE_CQUERY
);
QW_ERR_JRET
(
code
);
}
QW_ERR_JRET
(
qwGetTaskCtx
(
QW_FPARAMS
(),
&
ctx
));
QW_ERR_JRET
(
qwGetTaskCtx
(
QW_FPARAMS
(),
&
ctx
));
qTaskInfo_t
taskHandle
=
ctx
->
taskHandle
;
DataSinkHandle
sinkHandle
=
ctx
->
sinkHandle
;
atomic_store_8
(
&
ctx
->
inQueue
,
0
);
QW_ERR_JRET
(
qwExecTask
(
QW_FPARAMS
(),
taskHandle
,
sinkHandle
));
qTaskInfo_t
taskHandle
=
ctx
->
taskHandle
;
DataSinkHandle
sinkHandle
=
ctx
->
sinkHandle
;
QW_SET_EVENT_PROCESSED
(
ctx
,
QW_EVENT_CQUERY
);
QW_ERR_JRET
(
qwExecTask
(
QW_FPARAMS
(),
taskHandle
,
sinkHandle
)
);
if
(
QW_IS_EVENT_RECEIVED
(
ctx
,
QW_EVENT_FETCH
))
{
SOutputData
sOutput
=
{
0
};
QW_ERR_JRET
(
qwGetResFromSink
(
QW_FPARAMS
(),
ctx
,
&
dataLen
,
&
rsp
,
&
sOutput
));
// Note: schedule data sink firstly and will schedule query after it's done
if
(
sOutput
.
scheduleJobNo
)
{
if
(
sOutput
.
scheduleJobNo
>
ctx
->
sinkId
)
{
QW_TASK_DLOG
(
"sink need schedule, scheduleJobNo:%d"
,
sOutput
.
scheduleJobNo
);
ctx
->
sinkId
=
sOutput
.
scheduleJobNo
;
QW_ERR_JRET
(
qwBuildAndSendSchSinkMsg
(
QW_FPARAMS
(),
qwMsg
->
connection
));
if
(
QW_IS_EVENT_RECEIVED
(
ctx
,
QW_EVENT_FETCH
))
{
SOutputData
sOutput
=
{
0
};
QW_ERR_JRET
(
qwGetResFromSink
(
QW_FPARAMS
(),
ctx
,
&
dataLen
,
&
rsp
,
&
sOutput
));
if
((
!
sOutput
.
queryEnd
)
&&
(
DS_BUF_LOW
==
sOutput
.
bufStatus
||
DS_BUF_EMPTY
==
sOutput
.
bufStatus
))
{
QW_TASK_DLOG
(
"task not end, need to continue, bufStatus:%d"
,
sOutput
.
bufStatus
);
atomic_store_8
(
&
ctx
->
queryContinue
,
1
);
}
}
else
if
((
!
sOutput
.
queryEnd
)
&&
(
DS_BUF_LOW
==
sOutput
.
bufStatus
||
DS_BUF_EMPTY
==
sOutput
.
bufStatus
))
{
QW_TASK_DLOG
(
"task not end, need to continue, bufStatus:%d"
,
sOutput
.
bufStatus
);
if
(
!
QW_IS_EVENT_RECEIVED
(
ctx
,
QW_EVENT_CQUERY
))
{
QW_SET_EVENT_RECEIVED
(
ctx
,
QW_EVENT_CQUERY
);
QW_ERR_JRET
(
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_EXECUTING
));
if
(
rsp
)
{
qwBuildFetchRsp
(
rsp
,
&
sOutput
,
dataLen
);
QW_SET_EVENT_PROCESSED
(
ctx
,
QW_EVENT_FETCH
);
QW_ERR_RET
(
qwBuildAndSendCQueryMsg
(
QW_FPARAMS
(),
qwMsg
->
connection
));
qwBuildAndSendFetchRsp
(
qwMsg
->
connection
,
rsp
,
dataLen
,
code
);
}
else
{
atomic_store_8
(
&
ctx
->
queryContinue
,
1
);
}
}
if
(
rsp
)
{
qwBuildFetchRsp
(
rsp
,
&
sOutput
,
dataLen
);
}
}
_return:
qwHandleTaskEvent
(
QW_FPARAMS
(),
QW_PHASE_POST_CQUERY
,
&
input
,
&
output
);
_return:
if
(
QW_IS_EVENT_RECEIVED
(
ctx
,
QW_EVENT_FETCH
))
{
if
(
code
)
{
if
(
code
&&
QW_IS_EVENT_RECEIVED
(
ctx
,
QW_EVENT_FETCH
))
{
QW_SET_EVENT_PROCESSED
(
ctx
,
QW_EVENT_FETCH
);
qwFreeFetchRsp
(
rsp
);
rsp
=
NULL
;
qwBuildAndSendFetchRsp
(
qwMsg
->
connection
,
rsp
,
0
,
code
);
}
else
if
(
rsp
)
{
QW_SET_EVENT_PROCESSED
(
ctx
,
QW_EVENT_FETCH
);
qwBuildAndSendFetchRsp
(
qwMsg
->
connection
,
rsp
,
dataLen
,
code
);
}
}
QW_RET
(
rspCode
);
input
.
code
=
code
;
qwHandleTaskEvent
(
QW_FPARAMS
(),
QW_PHASE_POST_CQUERY
,
&
input
,
&
output
);
needStop
=
output
.
needStop
;
code
=
output
.
rspCode
;
}
while
((
!
needStop
)
&&
(
0
==
code
)
&&
atomic_val_compare_exchange_8
(
&
ctx
->
queryContinue
,
1
,
0
));
QW_RET
(
code
);
}
...
...
@@ -956,35 +1051,35 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
if
(
NULL
==
rsp
)
{
QW_SET_EVENT_RECEIVED
(
ctx
,
QW_EVENT_FETCH
);
}
else
{
qwBuildFetchRsp
(
rsp
,
&
sOutput
,
dataLen
);
}
// Note: schedule data sink firstly and will schedule query after it's done
if
(
sOutput
.
scheduleJobNo
)
{
if
(
sOutput
.
scheduleJobNo
>
ctx
->
sinkId
)
{
QW_TASK_DLOG
(
"sink need schedule, scheduleJobNo:%d"
,
sOutput
.
scheduleJobNo
);
ctx
->
sinkId
=
sOutput
.
scheduleJobNo
;
QW_ERR_JRET
(
qwBuildAndSendSchSinkMsg
(
QW_FPARAMS
(),
qwMsg
->
connection
));
}
}
else
if
((
!
sOutput
.
queryEnd
)
&&
(
/* DS_BUF_LOW == sOutput.bufStatus || */
DS_BUF_EMPTY
==
sOutput
.
bufStatus
))
{
if
((
!
sOutput
.
queryEnd
)
&&
(
/* DS_BUF_LOW == sOutput.bufStatus || */
DS_BUF_EMPTY
==
sOutput
.
bufStatus
))
{
QW_TASK_DLOG
(
"task not end, need to continue, bufStatus:%d"
,
sOutput
.
bufStatus
);
if
(
!
QW_IS_EVENT_RECEIVED
(
ctx
,
QW_EVENT_CQUERY
))
{
QW_SET_EVENT_RECEIVED
(
ctx
,
QW_EVENT_CQUERY
)
;
QW_LOCK
(
QW_WRITE
,
&
ctx
->
lock
);
locked
=
true
;
if
(
QW_IN_EXECUTOR
(
ctx
))
{
atomic_store_8
(
&
ctx
->
queryContinue
,
1
);
}
else
if
(
0
==
atomic_load_8
(
&
ctx
->
inQueue
))
{
QW_ERR_JRET
(
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_EXECUTING
));
atomic_store_8
(
&
ctx
->
inQueue
,
1
);
QW_ERR_RET
(
qwBuildAndSendCQueryMsg
(
QW_FPARAMS
(),
qwMsg
->
connection
));
QW_ERR_
J
RET
(
qwBuildAndSendCQueryMsg
(
QW_FPARAMS
(),
qwMsg
->
connection
));
}
}
if
(
rsp
)
{
qwBuildFetchRsp
(
rsp
,
&
sOutput
,
dataLen
);
}
_return:
if
(
locked
)
{
QW_UNLOCK
(
QW_WRITE
,
&
ctx
->
lock
);
}
input
.
code
=
code
;
qwHandleTaskEvent
(
QW_FPARAMS
(),
QW_PHASE_POST_FETCH
,
&
input
,
&
output
);
if
(
code
)
{
...
...
source/libs/qworker/src/qworkerMsg.c
浏览文件 @
b14ce58b
...
...
@@ -229,42 +229,6 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchRe
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendSchSinkMsg
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
void
*
connection
)
{
SRpcMsg
*
pMsg
=
(
SRpcMsg
*
)
connection
;
SSinkDataReq
*
req
=
(
SSinkDataReq
*
)
rpcMallocCont
(
sizeof
(
SSinkDataReq
));
if
(
NULL
==
req
)
{
qError
(
"rpcMallocCont %d failed"
,
(
int32_t
)
sizeof
(
SSinkDataReq
));
QW_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
req
->
header
.
vgId
=
mgmt
->
nodeId
;
req
->
sId
=
sId
;
req
->
queryId
=
qId
;
req
->
taskId
=
tId
;
SRpcMsg
pNewMsg
=
{
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
msgType
=
TDMT_VND_SCHEDULE_DATA_SINK
,
.
pCont
=
req
,
.
contLen
=
sizeof
(
SSinkDataReq
),
.
code
=
0
,
};
int32_t
code
=
(
*
mgmt
->
putToQueueFp
)(
mgmt
->
nodeObj
,
&
pNewMsg
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
qError
(
"put data sink schedule msg to queue failed, code:%x"
,
code
);
rpcFreeCont
(
req
);
QW_ERR_RET
(
code
);
}
qDebug
(
"put data sink schedule msg to query queue"
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendCQueryMsg
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
void
*
connection
)
{
SRpcMsg
*
pMsg
=
(
SRpcMsg
*
)
connection
;
SQueryContinueReq
*
req
=
(
SQueryContinueReq
*
)
rpcMallocCont
(
sizeof
(
SQueryContinueReq
));
...
...
@@ -366,25 +330,6 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessDataSinkMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
){
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
SSinkDataReq
*
msg
=
pMsg
->
pCont
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
qError
(
"invalid sink data msg"
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
//dsScheduleProcess();
//TODO
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessReadyMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
){
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录