Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
6e465ebd
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
6e465ebd
编写于
5月 27, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
optimize query msg
上级
f6e6554b
变更
20
展开全部
显示空白变更内容
内联
并排
Showing
20 changed file
with
698 addition
and
1367 deletion
+698
-1367
include/common/tmsg.h
include/common/tmsg.h
+3
-0
include/common/tmsgdef.h
include/common/tmsgdef.h
+0
-2
include/libs/qworker/qworker.h
include/libs/qworker/qworker.h
+0
-10
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+0
-5
source/common/src/tmsg.c
source/common/src/tmsg.c
+0
-25
source/dnode/mgmt/mgmt_qnode/src/qmHandle.c
source/dnode/mgmt/mgmt_qnode/src/qmHandle.c
+0
-2
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
+0
-2
source/dnode/qnode/src/qnode.c
source/dnode/qnode/src/qnode.c
+0
-9
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+0
-4
source/libs/qworker/inc/qwInt.h
source/libs/qworker/inc/qwInt.h
+22
-2
source/libs/qworker/inc/qwMsg.h
source/libs/qworker/inc/qwMsg.h
+2
-4
source/libs/qworker/src/qwDbg.c
source/libs/qworker/src/qwDbg.c
+128
-0
source/libs/qworker/src/qwMsg.c
source/libs/qworker/src/qwMsg.c
+5
-174
source/libs/qworker/src/qwUtil.c
source/libs/qworker/src/qwUtil.c
+502
-0
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+28
-852
source/libs/qworker/test/qworkerTests.cpp
source/libs/qworker/test/qworkerTests.cpp
+1
-109
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+1
-1
source/libs/scheduler/src/schJob.c
source/libs/scheduler/src/schJob.c
+1
-1
source/libs/scheduler/src/schRemote.c
source/libs/scheduler/src/schRemote.c
+4
-76
source/libs/scheduler/test/schedulerTests.cpp
source/libs/scheduler/test/schedulerTests.cpp
+1
-89
未找到文件。
include/common/tmsg.h
浏览文件 @
6e465ebd
...
...
@@ -656,6 +656,9 @@ typedef struct {
typedef
struct
{
int32_t
code
;
char
tbFName
[
TSDB_TABLE_FNAME_LEN
];
int32_t
sversion
;
int32_t
tversion
;
}
SQueryTableRsp
;
int32_t
tSerializeSQueryTableRsp
(
void
*
buf
,
int32_t
bufLen
,
SQueryTableRsp
*
pRsp
);
...
...
include/common/tmsgdef.h
浏览文件 @
6e465ebd
...
...
@@ -181,8 +181,6 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_DISCONNECT
,
"vnode-mq-disconnect"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_VG_CHANGE
,
"vnode-mq-vg-change"
,
SMqRebVgReq
,
SMqRebVgRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_VG_DELETE
,
"vnode-mq-vg-delete"
,
SMqVDeleteReq
,
SMqVDeleteRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_RES_READY
,
"vnode-res-ready"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TASKS_STATUS
,
"vnode-tasks-status"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_CANCEL_TASK
,
"vnode-cancel-task"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_DROP_TASK
,
"vnode-drop-task"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_CREATE_TOPIC
,
"vnode-create-topic"
,
NULL
,
NULL
)
...
...
include/libs/qworker/qworker.h
浏览文件 @
6e465ebd
...
...
@@ -56,12 +56,6 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t
qWorkerProcessCQueryMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
);
int32_t
qWorkerProcessDataSinkMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
);
int32_t
qWorkerProcessReadyMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
);
int32_t
qWorkerProcessStatusMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
);
int32_t
qWorkerProcessFetchMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
);
int32_t
qWorkerProcessFetchRsp
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
);
...
...
@@ -72,10 +66,6 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t
qWorkerProcessHbMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
);
int32_t
qWorkerProcessShowMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
);
int32_t
qWorkerProcessShowFetchMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
);
void
qWorkerDestroy
(
void
**
qWorkerMgmt
);
#ifdef __cplusplus
...
...
source/client/src/clientImpl.c
浏览文件 @
6e465ebd
...
...
@@ -730,11 +730,6 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
taosMemoryFreeClear
(
pMsgBody
);
}
bool
persistConnForSpecificMsg
(
void
*
parenct
,
tmsg_t
msgType
)
{
return
msgType
==
TDMT_VND_QUERY_RSP
||
msgType
==
TDMT_VND_FETCH_RSP
||
msgType
==
TDMT_VND_RES_READY_RSP
||
msgType
==
TDMT_VND_QUERY_HEARTBEAT_RSP
;
}
void
processMsgFromServer
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SMsgSendInfo
*
pSendInfo
=
(
SMsgSendInfo
*
)
pMsg
->
info
.
ahandle
;
assert
(
pMsg
->
info
.
ahandle
!=
NULL
);
...
...
source/common/src/tmsg.c
浏览文件 @
6e465ebd
...
...
@@ -3507,31 +3507,6 @@ int32_t tDeserializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *
void
tFreeSSchedulerHbRsp
(
SSchedulerHbRsp
*
pRsp
)
{
taosArrayDestroy
(
pRsp
->
taskStatus
);
}
int32_t
tSerializeSQueryTableRsp
(
void
*
buf
,
int32_t
bufLen
,
SQueryTableRsp
*
pRsp
)
{
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
buf
,
bufLen
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pRsp
->
code
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tEncoderClear
(
&
encoder
);
return
tlen
;
}
int32_t
tDeserializeSQueryTableRsp
(
void
*
buf
,
int32_t
bufLen
,
SQueryTableRsp
*
pRsp
)
{
SDecoder
decoder
=
{
0
};
tDecoderInit
(
&
decoder
,
buf
,
bufLen
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pRsp
->
code
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
tDecoderClear
(
&
decoder
);
return
0
;
}
int32_t
tSerializeSVCreateTbBatchRsp
(
void
*
buf
,
int32_t
bufLen
,
SVCreateTbBatchRsp
*
pRsp
)
{
// SEncoder encoder = {0};
// tEncoderInit(&encoder, buf, bufLen);
...
...
source/dnode/mgmt/mgmt_qnode/src/qmHandle.c
浏览文件 @
6e465ebd
...
...
@@ -101,8 +101,6 @@ SArray *qmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_FETCH_RSP
,
qmPutNodeMsgToFetchQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_QUERY_HEARTBEAT
,
qmPutNodeMsgToFetchQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_RES_READY
,
qmPutNodeMsgToFetchQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_TASKS_STATUS
,
qmPutNodeMsgToFetchQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_CANCEL_TASK
,
qmPutNodeMsgToFetchQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_DROP_TASK
,
qmPutNodeMsgToFetchQueue
,
1
)
==
NULL
)
goto
_OVER
;
...
...
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
浏览文件 @
6e465ebd
...
...
@@ -292,8 +292,6 @@ SArray *vmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_MQ_CONNECT
,
vmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_MQ_DISCONNECT
,
vmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
// if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_SET_CUR, vmPutNodeMsgToWriteQueue, 0)== NULL) goto _OVER;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_RES_READY
,
vmPutNodeMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_TASKS_STATUS
,
vmPutNodeMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_CANCEL_TASK
,
vmPutNodeMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_DROP_TASK
,
vmPutNodeMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_CREATE_STB
,
vmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
...
...
source/dnode/qnode/src/qnode.c
浏览文件 @
6e465ebd
...
...
@@ -60,21 +60,12 @@ int32_t qndProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) {
case
TDMT_VND_FETCH_RSP
:
code
=
qWorkerProcessFetchRsp
(
pQnode
,
pQnode
->
pQuery
,
pMsg
);
break
;
case
TDMT_VND_RES_READY
:
code
=
qWorkerProcessReadyMsg
(
pQnode
,
pQnode
->
pQuery
,
pMsg
);
break
;
case
TDMT_VND_TASKS_STATUS
:
code
=
qWorkerProcessStatusMsg
(
pQnode
,
pQnode
->
pQuery
,
pMsg
);
break
;
case
TDMT_VND_CANCEL_TASK
:
code
=
qWorkerProcessCancelMsg
(
pQnode
,
pQnode
->
pQuery
,
pMsg
);
break
;
case
TDMT_VND_DROP_TASK
:
code
=
qWorkerProcessDropMsg
(
pQnode
,
pQnode
->
pQuery
,
pMsg
);
break
;
case
TDMT_VND_TABLE_META
:
// code = vnodeGetTableMeta(pQnode, pMsg);
// break;
case
TDMT_VND_CONSUME
:
// code = tqProcessConsumeReq(pQnode->pTq, pMsg);
// break;
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
6e465ebd
...
...
@@ -201,10 +201,6 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return
qWorkerProcessFetchMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
case
TDMT_VND_FETCH_RSP
:
return
qWorkerProcessFetchRsp
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
case
TDMT_VND_RES_READY
:
return
qWorkerProcessReadyMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
case
TDMT_VND_TASKS_STATUS
:
return
qWorkerProcessStatusMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
case
TDMT_VND_CANCEL_TASK
:
return
qWorkerProcessCancelMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
case
TDMT_VND_DROP_TASK
:
...
...
source/libs/qworker/inc/qw
orker
Int.h
→
source/libs/qworker/inc/qwInt.h
浏览文件 @
6e465ebd
...
...
@@ -26,7 +26,7 @@ extern "C" {
#include "ttimer.h"
#include "tref.h"
#include "plannodes.h"
#include "executor.h"
#include "trpc.h"
#define QW_DEFAULT_SCHEDULER_NUMBER 10000
...
...
@@ -76,6 +76,8 @@ typedef struct SQWDebug {
bool
dumpEnable
;
}
SQWDebug
;
extern
SQWDebug
gQWDebug
;
typedef
struct
SQWMsg
{
void
*
node
;
int32_t
code
;
...
...
@@ -303,9 +305,27 @@ typedef struct SQWorkerMgmt {
extern
SQWorkerMgmt
gQwMgmt
;
static
FORCE_INLINE
SQWorker
*
qwAcquire
(
int64_t
refId
)
{
return
(
SQWorker
*
)
taosAcquireRef
(
atomic_load_32
(
&
gQwMgmt
.
qwRef
),
refId
);
}
static
FORCE_INLINE
int32_t
qwRelease
(
int64_t
refId
)
{
return
taosReleaseRef
(
gQwMgmt
.
qwRef
,
refId
);
}
char
*
qwPhaseStr
(
int32_t
phase
);
char
*
qwBufStatusStr
(
int32_t
bufStatus
);
int32_t
qwAcquireAddScheduler
(
SQWorker
*
mgmt
,
uint64_t
sId
,
int32_t
rwType
,
SQWSchStatus
**
sch
);
void
qwReleaseScheduler
(
int32_t
rwType
,
SQWorker
*
mgmt
);
int32_t
qwAddTaskStatus
(
QW_FPARAMS_DEF
,
int32_t
status
);
int32_t
qwAcquireTaskCtx
(
QW_FPARAMS_DEF
,
SQWTaskCtx
**
ctx
);
int32_t
qwGetTaskCtx
(
QW_FPARAMS_DEF
,
SQWTaskCtx
**
ctx
);
int32_t
qwAddAcquireTaskCtx
(
QW_FPARAMS_DEF
,
SQWTaskCtx
**
ctx
);
void
qwReleaseTaskCtx
(
SQWorker
*
mgmt
,
void
*
ctx
);
int32_t
qwKillTaskHandle
(
QW_FPARAMS_DEF
,
SQWTaskCtx
*
ctx
);
int32_t
qwUpdateTaskStatus
(
QW_FPARAMS_DEF
,
int8_t
status
);
int32_t
qwDropTask
(
QW_FPARAMS_DEF
);
void
qwSaveTbVersionInfo
(
qTaskInfo_t
pTaskInfo
,
SQWTaskCtx
*
ctx
);
int32_t
qwOpenRef
(
void
);
void
qwSetHbParam
(
int64_t
refId
,
SQWHbParam
**
pParam
);
void
qwDbgDumpMgmtInfo
(
SQWorker
*
mgmt
);
int32_t
qwDbgValidateStatus
(
QW_FPARAMS_DEF
,
int8_t
oriStatus
,
int8_t
newStatus
,
bool
*
ignore
);
#ifdef __cplusplus
}
...
...
source/libs/qworker/inc/qw
orker
Msg.h
→
source/libs/qworker/inc/qwMsg.h
浏览文件 @
6e465ebd
...
...
@@ -20,7 +20,7 @@
extern
"C"
{
#endif
#include "qw
orker
Int.h"
#include "qwInt.h"
#include "dataSinkMgt.h"
int32_t
qwProcessQuery
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
,
int8_t
taskType
,
int8_t
explain
);
...
...
@@ -36,12 +36,10 @@ int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, i
int32_t
code
);
void
qwBuildFetchRsp
(
void
*
msg
,
SOutputData
*
input
,
int32_t
len
,
bool
qComplete
);
int32_t
qwBuildAndSendCQueryMsg
(
QW_FPARAMS_DEF
,
SRpcHandleInfo
*
pConn
);
int32_t
qwBuildAndSendReadyRsp
(
SRpcHandleInfo
*
pConn
,
int32_t
code
,
STbVerInfo
*
tbInfo
);
int32_t
qwBuildAndSendQueryRsp
(
SRpcHandleInfo
*
pConn
,
int32_t
code
);
int32_t
qwBuildAndSendQueryRsp
(
SRpcHandleInfo
*
pConn
,
int32_t
code
,
STbVerInfo
*
tbInfo
);
int32_t
qwBuildAndSendExplainRsp
(
SRpcHandleInfo
*
pConn
,
SExplainExecInfo
*
execInfo
,
int32_t
num
);
void
qwFreeFetchRsp
(
void
*
msg
);
int32_t
qwMallocFetchRsp
(
int32_t
length
,
SRetrieveTableRsp
**
rsp
);
int32_t
qwGetSchTasksStatus
(
SQWorker
*
mgmt
,
uint64_t
sId
,
SSchedulerStatusRsp
**
rsp
);
int32_t
qwBuildAndSendHbRsp
(
SRpcHandleInfo
*
pConn
,
SSchedulerHbRsp
*
rsp
,
int32_t
code
);
int32_t
qwRegisterQueryBrokenLinkArg
(
QW_FPARAMS_DEF
,
SRpcHandleInfo
*
pConn
);
int32_t
qwRegisterHbBrokenLinkArg
(
SQWorker
*
mgmt
,
uint64_t
sId
,
SRpcHandleInfo
*
pConn
);
...
...
source/libs/qworker/src/qwDbg.c
0 → 100644
浏览文件 @
6e465ebd
#include "qworker.h"
#include "dataSinkMgt.h"
#include "executor.h"
#include "planner.h"
#include "query.h"
#include "qwInt.h"
#include "qwMsg.h"
#include "tcommon.h"
#include "tmsg.h"
#include "tname.h"
SQWDebug
gQWDebug
=
{.
statusEnable
=
true
,
.
dumpEnable
=
true
};
int32_t
qwDbgValidateStatus
(
QW_FPARAMS_DEF
,
int8_t
oriStatus
,
int8_t
newStatus
,
bool
*
ignore
)
{
if
(
!
gQWDebug
.
statusEnable
)
{
return
TSDB_CODE_SUCCESS
;
}
int32_t
code
=
0
;
if
(
oriStatus
==
newStatus
)
{
if
(
newStatus
==
JOB_TASK_STATUS_EXECUTING
||
newStatus
==
JOB_TASK_STATUS_FAILED
)
{
*
ignore
=
true
;
return
TSDB_CODE_SUCCESS
;
}
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
switch
(
oriStatus
)
{
case
JOB_TASK_STATUS_NULL
:
if
(
newStatus
!=
JOB_TASK_STATUS_EXECUTING
&&
newStatus
!=
JOB_TASK_STATUS_FAILED
&&
newStatus
!=
JOB_TASK_STATUS_NOT_START
)
{
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_NOT_START
:
if
(
newStatus
!=
JOB_TASK_STATUS_CANCELLED
)
{
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_EXECUTING
:
if
(
newStatus
!=
JOB_TASK_STATUS_PARTIAL_SUCCEED
&&
newStatus
!=
JOB_TASK_STATUS_SUCCEED
&&
newStatus
!=
JOB_TASK_STATUS_FAILED
&&
newStatus
!=
JOB_TASK_STATUS_CANCELLING
&&
newStatus
!=
JOB_TASK_STATUS_CANCELLED
&&
newStatus
!=
JOB_TASK_STATUS_DROPPING
)
{
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_PARTIAL_SUCCEED
:
if
(
newStatus
!=
JOB_TASK_STATUS_EXECUTING
&&
newStatus
!=
JOB_TASK_STATUS_SUCCEED
&&
newStatus
!=
JOB_TASK_STATUS_CANCELLED
&&
newStatus
!=
JOB_TASK_STATUS_FAILED
&&
newStatus
!=
JOB_TASK_STATUS_DROPPING
)
{
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_SUCCEED
:
if
(
newStatus
!=
JOB_TASK_STATUS_CANCELLED
&&
newStatus
!=
JOB_TASK_STATUS_DROPPING
&&
newStatus
!=
JOB_TASK_STATUS_FAILED
)
{
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_FAILED
:
if
(
newStatus
!=
JOB_TASK_STATUS_CANCELLED
&&
newStatus
!=
JOB_TASK_STATUS_DROPPING
)
{
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_CANCELLING
:
if
(
newStatus
!=
JOB_TASK_STATUS_CANCELLED
)
{
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_CANCELLED
:
case
JOB_TASK_STATUS_DROPPING
:
if
(
newStatus
!=
JOB_TASK_STATUS_FAILED
&&
newStatus
!=
JOB_TASK_STATUS_PARTIAL_SUCCEED
)
{
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
break
;
default:
QW_TASK_ELOG
(
"invalid task origStatus:%s"
,
jobTaskStatusStr
(
oriStatus
));
return
TSDB_CODE_QRY_APP_ERROR
;
}
return
TSDB_CODE_SUCCESS
;
_return:
QW_TASK_ELOG
(
"invalid task status update from %s to %s"
,
jobTaskStatusStr
(
oriStatus
),
jobTaskStatusStr
(
newStatus
));
QW_RET
(
code
);
}
void
qwDbgDumpSchInfo
(
SQWSchStatus
*
sch
,
int32_t
i
)
{}
void
qwDbgDumpMgmtInfo
(
SQWorker
*
mgmt
)
{
if
(
!
gQWDebug
.
dumpEnable
)
{
return
;
}
QW_LOCK
(
QW_READ
,
&
mgmt
->
schLock
);
/*QW_DUMP("total remain schduler num:%d", taosHashGetSize(mgmt->schHash));*/
void
*
key
=
NULL
;
size_t
keyLen
=
0
;
int32_t
i
=
0
;
SQWSchStatus
*
sch
=
NULL
;
void
*
pIter
=
taosHashIterate
(
mgmt
->
schHash
,
NULL
);
while
(
pIter
)
{
sch
=
(
SQWSchStatus
*
)
pIter
;
qwDbgDumpSchInfo
(
sch
,
i
);
++
i
;
pIter
=
taosHashIterate
(
mgmt
->
schHash
,
pIter
);
}
QW_UNLOCK
(
QW_READ
,
&
mgmt
->
schLock
);
/*QW_DUMP("total remain ctx num:%d", taosHashGetSize(mgmt->ctxHash));*/
}
source/libs/qworker/src/qw
orker
Msg.c
→
source/libs/qworker/src/qwMsg.c
浏览文件 @
6e465ebd
#include "qw
orker
Msg.h"
#include "qwMsg.h"
#include "dataSinkMgt.h"
#include "executor.h"
#include "planner.h"
#include "query.h"
#include "qworker.h"
#include "qw
orker
Int.h"
#include "qwInt.h"
#include "tcommon.h"
#include "tmsg.h"
#include "tname.h"
...
...
@@ -43,28 +43,8 @@ void qwFreeFetchRsp(void *msg) {
}
}
int32_t
qwBuildAndSendQueryRsp
(
SRpcHandleInfo
*
pConn
,
int32_t
code
)
{
SQueryTableRsp
rsp
=
{.
code
=
code
};
int32_t
contLen
=
tSerializeSQueryTableRsp
(
NULL
,
0
,
&
rsp
);
void
*
msg
=
rpcMallocCont
(
contLen
);
tSerializeSQueryTableRsp
(
msg
,
contLen
,
&
rsp
);
SRpcMsg
rpcRsp
=
{
.
msgType
=
TDMT_VND_QUERY_RSP
,
.
pCont
=
msg
,
.
contLen
=
contLen
,
.
code
=
code
,
.
info
=
*
pConn
,
};
tmsgSendRsp
(
&
rpcRsp
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendReadyRsp
(
SRpcHandleInfo
*
pConn
,
int32_t
code
,
STbVerInfo
*
tbInfo
)
{
SResReadyRsp
*
pRsp
=
(
SResReadyRsp
*
)
rpcMallocCont
(
sizeof
(
SResReadyRsp
));
int32_t
qwBuildAndSendQueryRsp
(
SRpcHandleInfo
*
pConn
,
int32_t
code
,
STbVerInfo
*
tbInfo
)
{
SQueryTableRsp
*
pRsp
=
(
SQueryTableRsp
*
)
rpcMallocCont
(
sizeof
(
SQueryTableRsp
));
pRsp
->
code
=
code
;
if
(
tbInfo
)
{
strcpy
(
pRsp
->
tbFName
,
tbInfo
->
tbFName
);
...
...
@@ -73,13 +53,12 @@ int32_t qwBuildAndSendReadyRsp(SRpcHandleInfo *pConn, int32_t code, STbVerInfo*
}
SRpcMsg
rpcRsp
=
{
.
msgType
=
TDMT_VND_
RES_READ
Y_RSP
,
.
msgType
=
TDMT_VND_
QUER
Y_RSP
,
.
pCont
=
pRsp
,
.
contLen
=
sizeof
(
*
pRsp
),
.
code
=
code
,
.
info
=
*
pConn
,
};
rpcRsp
.
info
.
ahandle
=
NULL
;
tmsgSendRsp
(
&
rpcRsp
);
...
...
@@ -177,76 +156,6 @@ int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendShowRsp
(
SRpcMsg
*
pMsg
,
int32_t
code
)
{
int32_t
numOfCols
=
6
;
SVShowTablesRsp
showRsp
=
{
0
};
// showRsp.showId = 1;
showRsp
.
tableMeta
.
pSchemas
=
taosMemoryCalloc
(
numOfCols
,
sizeof
(
SSchema
));
if
(
showRsp
.
tableMeta
.
pSchemas
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
col_id_t
cols
=
0
;
SSchema
*
pSchema
=
showRsp
.
tableMeta
.
pSchemas
;
const
SSchema
*
s
=
tGetTbnameColumnSchema
();
*
pSchema
=
createSchema
(
s
->
type
,
s
->
bytes
,
++
cols
,
"name"
);
pSchema
++
;
int32_t
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
*
pSchema
=
createSchema
(
type
,
tDataTypes
[
type
].
bytes
,
++
cols
,
"created"
);
pSchema
++
;
type
=
TSDB_DATA_TYPE_SMALLINT
;
*
pSchema
=
createSchema
(
type
,
tDataTypes
[
type
].
bytes
,
++
cols
,
"columns"
);
pSchema
++
;
*
pSchema
=
createSchema
(
s
->
type
,
s
->
bytes
,
++
cols
,
"stable"
);
pSchema
++
;
type
=
TSDB_DATA_TYPE_BIGINT
;
*
pSchema
=
createSchema
(
type
,
tDataTypes
[
type
].
bytes
,
++
cols
,
"uid"
);
pSchema
++
;
type
=
TSDB_DATA_TYPE_INT
;
*
pSchema
=
createSchema
(
type
,
tDataTypes
[
type
].
bytes
,
++
cols
,
"vgId"
);
assert
(
cols
==
numOfCols
);
showRsp
.
tableMeta
.
numOfColumns
=
cols
;
int32_t
bufLen
=
tSerializeSShowRsp
(
NULL
,
0
,
&
showRsp
);
void
*
pBuf
=
rpcMallocCont
(
bufLen
);
tSerializeSShowRsp
(
pBuf
,
bufLen
,
&
showRsp
);
SRpcMsg
rpcMsg
=
{
.
info
=
pMsg
->
info
,
.
pCont
=
pBuf
,
.
contLen
=
bufLen
,
.
code
=
code
,
};
tmsgSendRsp
(
&
rpcMsg
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendShowFetchRsp
(
SRpcMsg
*
pMsg
,
SVShowTablesFetchReq
*
pFetchReq
)
{
SVShowTablesFetchRsp
*
pRsp
=
(
SVShowTablesFetchRsp
*
)
rpcMallocCont
(
sizeof
(
SVShowTablesFetchRsp
));
int32_t
handle
=
htonl
(
pFetchReq
->
id
);
pRsp
->
numOfRows
=
0
;
SRpcMsg
rpcMsg
=
{
.
info
=
pMsg
->
info
,
.
pCont
=
pRsp
,
.
contLen
=
sizeof
(
*
pRsp
),
.
code
=
0
,
};
tmsgSendRsp
(
&
rpcMsg
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendCQueryMsg
(
QW_FPARAMS_DEF
,
SRpcHandleInfo
*
pConn
)
{
SQueryContinueReq
*
req
=
(
SQueryContinueReq
*
)
rpcMallocCont
(
sizeof
(
SQueryContinueReq
));
if
(
NULL
==
req
)
{
...
...
@@ -407,65 +316,6 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessReadyMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
SResReadyReq
*
msg
=
pMsg
->
pCont
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid task ready msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
msg
->
sId
=
be64toh
(
msg
->
sId
);
msg
->
queryId
=
be64toh
(
msg
->
queryId
);
msg
->
taskId
=
be64toh
(
msg
->
taskId
);
uint64_t
sId
=
msg
->
sId
;
uint64_t
qId
=
msg
->
queryId
;
uint64_t
tId
=
msg
->
taskId
;
int64_t
rId
=
0
;
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
NULL
,
.
msgLen
=
0
,
.
connInfo
=
pMsg
->
info
};
QW_SCH_TASK_DLOG
(
"processReady start, node:%p, handle:%p"
,
node
,
pMsg
->
info
.
handle
);
QW_ERR_RET
(
qwProcessReady
(
QW_FPARAMS
(),
&
qwMsg
));
QW_SCH_TASK_DLOG
(
"processReady end, node:%p"
,
node
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessStatusMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
int32_t
code
=
0
;
SSchTasksStatusReq
*
msg
=
pMsg
->
pCont
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
qError
(
"invalid task status msg"
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
msg
->
sId
=
htobe64
(
msg
->
sId
);
uint64_t
sId
=
msg
->
sId
;
SSchedulerStatusRsp
*
sStatus
=
NULL
;
// QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus));
_return:
// QW_ERR_RET(qwBuildAndSendStatusRsp(pMsg, sStatus));
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessFetchMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
...
...
@@ -613,22 +463,3 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessShowMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
int32_t
code
=
0
;
SVShowTablesReq
*
pReq
=
pMsg
->
pCont
;
QW_RET
(
qwBuildAndSendShowRsp
(
pMsg
,
code
));
}
int32_t
qWorkerProcessShowFetchMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
SVShowTablesFetchReq
*
pFetchReq
=
pMsg
->
pCont
;
QW_RET
(
qwBuildAndSendShowFetchRsp
(
pMsg
,
pFetchReq
));
}
source/libs/qworker/src/qwUtil.c
0 → 100644
浏览文件 @
6e465ebd
#include "qworker.h"
#include "dataSinkMgt.h"
#include "executor.h"
#include "planner.h"
#include "query.h"
#include "qwInt.h"
#include "qwMsg.h"
#include "tcommon.h"
#include "tmsg.h"
#include "tname.h"
char
*
qwPhaseStr
(
int32_t
phase
)
{
switch
(
phase
)
{
case
QW_PHASE_PRE_QUERY
:
return
"PRE_QUERY"
;
case
QW_PHASE_POST_QUERY
:
return
"POST_QUERY"
;
case
QW_PHASE_PRE_FETCH
:
return
"PRE_FETCH"
;
case
QW_PHASE_POST_FETCH
:
return
"POST_FETCH"
;
case
QW_PHASE_PRE_CQUERY
:
return
"PRE_CQUERY"
;
case
QW_PHASE_POST_CQUERY
:
return
"POST_CQUERY"
;
default:
break
;
}
return
"UNKNOWN"
;
}
char
*
qwBufStatusStr
(
int32_t
bufStatus
)
{
switch
(
bufStatus
)
{
case
DS_BUF_LOW
:
return
"LOW"
;
case
DS_BUF_FULL
:
return
"FULL"
;
case
DS_BUF_EMPTY
:
return
"EMPTY"
;
default:
break
;
}
return
"UNKNOWN"
;
}
int32_t
qwSetTaskStatus
(
QW_FPARAMS_DEF
,
SQWTaskStatus
*
task
,
int8_t
status
)
{
int32_t
code
=
0
;
int8_t
origStatus
=
0
;
bool
ignore
=
false
;
while
(
true
)
{
origStatus
=
atomic_load_8
(
&
task
->
status
);
QW_ERR_RET
(
qwDbgValidateStatus
(
QW_FPARAMS
(),
origStatus
,
status
,
&
ignore
));
if
(
ignore
)
{
break
;
}
if
(
origStatus
!=
atomic_val_compare_exchange_8
(
&
task
->
status
,
origStatus
,
status
))
{
continue
;
}
QW_TASK_DLOG
(
"task status updated from %s to %s"
,
jobTaskStatusStr
(
origStatus
),
jobTaskStatusStr
(
status
));
break
;
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwAddSchedulerImpl
(
SQWorker
*
mgmt
,
uint64_t
sId
,
int32_t
rwType
)
{
SQWSchStatus
newSch
=
{
0
};
newSch
.
tasksHash
=
taosHashInit
(
mgmt
->
cfg
.
maxSchTaskNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
if
(
NULL
==
newSch
.
tasksHash
)
{
QW_SCH_ELOG
(
"taosHashInit %d failed"
,
mgmt
->
cfg
.
maxSchTaskNum
);
QW_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
QW_LOCK
(
QW_WRITE
,
&
mgmt
->
schLock
);
int32_t
code
=
taosHashPut
(
mgmt
->
schHash
,
&
sId
,
sizeof
(
sId
),
&
newSch
,
sizeof
(
newSch
));
if
(
0
!=
code
)
{
if
(
!
HASH_NODE_EXIST
(
code
))
{
QW_UNLOCK
(
QW_WRITE
,
&
mgmt
->
schLock
);
QW_SCH_ELOG
(
"taosHashPut new sch to scheduleHash failed, errno:%d"
,
errno
);
taosHashCleanup
(
newSch
.
tasksHash
);
QW_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
taosHashCleanup
(
newSch
.
tasksHash
);
}
QW_UNLOCK
(
QW_WRITE
,
&
mgmt
->
schLock
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwAcquireSchedulerImpl
(
SQWorker
*
mgmt
,
uint64_t
sId
,
int32_t
rwType
,
SQWSchStatus
**
sch
,
int32_t
nOpt
)
{
while
(
true
)
{
QW_LOCK
(
rwType
,
&
mgmt
->
schLock
);
*
sch
=
taosHashGet
(
mgmt
->
schHash
,
&
sId
,
sizeof
(
sId
));
if
(
NULL
==
(
*
sch
))
{
QW_UNLOCK
(
rwType
,
&
mgmt
->
schLock
);
if
(
QW_NOT_EXIST_ADD
==
nOpt
)
{
QW_ERR_RET
(
qwAddSchedulerImpl
(
mgmt
,
sId
,
rwType
));
nOpt
=
QW_NOT_EXIST_RET_ERR
;
continue
;
}
else
if
(
QW_NOT_EXIST_RET_ERR
==
nOpt
)
{
QW_RET
(
TSDB_CODE_QRY_SCH_NOT_EXIST
);
}
else
{
QW_SCH_ELOG
(
"unknown notExistOpt:%d"
,
nOpt
);
QW_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
}
break
;
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwAcquireAddScheduler
(
SQWorker
*
mgmt
,
uint64_t
sId
,
int32_t
rwType
,
SQWSchStatus
**
sch
)
{
return
qwAcquireSchedulerImpl
(
mgmt
,
sId
,
rwType
,
sch
,
QW_NOT_EXIST_ADD
);
}
int32_t
qwAcquireScheduler
(
SQWorker
*
mgmt
,
uint64_t
sId
,
int32_t
rwType
,
SQWSchStatus
**
sch
)
{
return
qwAcquireSchedulerImpl
(
mgmt
,
sId
,
rwType
,
sch
,
QW_NOT_EXIST_RET_ERR
);
}
void
qwReleaseScheduler
(
int32_t
rwType
,
SQWorker
*
mgmt
)
{
QW_UNLOCK
(
rwType
,
&
mgmt
->
schLock
);
}
int32_t
qwAcquireTaskStatus
(
QW_FPARAMS_DEF
,
int32_t
rwType
,
SQWSchStatus
*
sch
,
SQWTaskStatus
**
task
)
{
char
id
[
sizeof
(
qId
)
+
sizeof
(
tId
)]
=
{
0
};
QW_SET_QTID
(
id
,
qId
,
tId
);
QW_LOCK
(
rwType
,
&
sch
->
tasksLock
);
*
task
=
taosHashGet
(
sch
->
tasksHash
,
id
,
sizeof
(
id
));
if
(
NULL
==
(
*
task
))
{
QW_UNLOCK
(
rwType
,
&
sch
->
tasksLock
);
QW_ERR_RET
(
TSDB_CODE_QRY_TASK_NOT_EXIST
);
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwAddTaskStatusImpl
(
QW_FPARAMS_DEF
,
SQWSchStatus
*
sch
,
int32_t
rwType
,
int32_t
status
,
SQWTaskStatus
**
task
)
{
int32_t
code
=
0
;
char
id
[
sizeof
(
qId
)
+
sizeof
(
tId
)]
=
{
0
};
QW_SET_QTID
(
id
,
qId
,
tId
);
SQWTaskStatus
ntask
=
{
0
};
ntask
.
status
=
status
;
ntask
.
refId
=
rId
;
QW_LOCK
(
QW_WRITE
,
&
sch
->
tasksLock
);
code
=
taosHashPut
(
sch
->
tasksHash
,
id
,
sizeof
(
id
),
&
ntask
,
sizeof
(
ntask
));
if
(
0
!=
code
)
{
QW_UNLOCK
(
QW_WRITE
,
&
sch
->
tasksLock
);
if
(
HASH_NODE_EXIST
(
code
))
{
if
(
rwType
&&
task
)
{
QW_RET
(
qwAcquireTaskStatus
(
QW_FPARAMS
(),
rwType
,
sch
,
task
));
}
else
{
QW_TASK_ELOG
(
"task status already exist, newStatus:%s"
,
jobTaskStatusStr
(
status
));
QW_ERR_RET
(
TSDB_CODE_QRY_TASK_ALREADY_EXIST
);
}
}
else
{
QW_TASK_ELOG
(
"taosHashPut to tasksHash failed, error:%x - %s"
,
code
,
tstrerror
(
code
));
QW_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
QW_UNLOCK
(
QW_WRITE
,
&
sch
->
tasksLock
);
QW_TASK_DLOG
(
"task status added, newStatus:%s"
,
jobTaskStatusStr
(
status
));
if
(
rwType
&&
task
)
{
QW_ERR_RET
(
qwAcquireTaskStatus
(
QW_FPARAMS
(),
rwType
,
sch
,
task
));
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwAddTaskStatus
(
QW_FPARAMS_DEF
,
int32_t
status
)
{
SQWSchStatus
*
tsch
=
NULL
;
int32_t
code
=
0
;
QW_ERR_RET
(
qwAcquireAddScheduler
(
mgmt
,
sId
,
QW_READ
,
&
tsch
));
QW_ERR_JRET
(
qwAddTaskStatusImpl
(
QW_FPARAMS
(),
tsch
,
0
,
status
,
NULL
));
_return:
qwReleaseScheduler
(
QW_READ
,
mgmt
);
QW_RET
(
code
);
}
int32_t
qwAddAcquireTaskStatus
(
QW_FPARAMS_DEF
,
int32_t
rwType
,
SQWSchStatus
*
sch
,
int32_t
status
,
SQWTaskStatus
**
task
)
{
return
qwAddTaskStatusImpl
(
QW_FPARAMS
(),
sch
,
rwType
,
status
,
task
);
}
void
qwReleaseTaskStatus
(
int32_t
rwType
,
SQWSchStatus
*
sch
)
{
QW_UNLOCK
(
rwType
,
&
sch
->
tasksLock
);
}
int32_t
qwAcquireTaskCtx
(
QW_FPARAMS_DEF
,
SQWTaskCtx
**
ctx
)
{
char
id
[
sizeof
(
qId
)
+
sizeof
(
tId
)]
=
{
0
};
QW_SET_QTID
(
id
,
qId
,
tId
);
*
ctx
=
taosHashAcquire
(
mgmt
->
ctxHash
,
id
,
sizeof
(
id
));
if
(
NULL
==
(
*
ctx
))
{
QW_TASK_DLOG_E
(
"task ctx not exist, may be dropped"
);
QW_ERR_RET
(
TSDB_CODE_QRY_TASK_CTX_NOT_EXIST
);
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwGetTaskCtx
(
QW_FPARAMS_DEF
,
SQWTaskCtx
**
ctx
)
{
char
id
[
sizeof
(
qId
)
+
sizeof
(
tId
)]
=
{
0
};
QW_SET_QTID
(
id
,
qId
,
tId
);
*
ctx
=
taosHashGet
(
mgmt
->
ctxHash
,
id
,
sizeof
(
id
));
if
(
NULL
==
(
*
ctx
))
{
QW_TASK_DLOG_E
(
"task ctx not exist, may be dropped"
);
QW_ERR_RET
(
TSDB_CODE_QRY_TASK_CTX_NOT_EXIST
);
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwAddTaskCtxImpl
(
QW_FPARAMS_DEF
,
bool
acquire
,
SQWTaskCtx
**
ctx
)
{
char
id
[
sizeof
(
qId
)
+
sizeof
(
tId
)]
=
{
0
};
QW_SET_QTID
(
id
,
qId
,
tId
);
SQWTaskCtx
nctx
=
{
0
};
int32_t
code
=
taosHashPut
(
mgmt
->
ctxHash
,
id
,
sizeof
(
id
),
&
nctx
,
sizeof
(
SQWTaskCtx
));
if
(
0
!=
code
)
{
if
(
HASH_NODE_EXIST
(
code
))
{
if
(
acquire
&&
ctx
)
{
QW_RET
(
qwAcquireTaskCtx
(
QW_FPARAMS
(),
ctx
));
}
else
if
(
ctx
)
{
QW_RET
(
qwGetTaskCtx
(
QW_FPARAMS
(),
ctx
));
}
else
{
QW_TASK_ELOG_E
(
"task ctx already exist"
);
QW_ERR_RET
(
TSDB_CODE_QRY_TASK_ALREADY_EXIST
);
}
}
else
{
QW_TASK_ELOG
(
"taosHashPut to ctxHash failed, error:%x"
,
code
);
QW_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
if
(
acquire
&&
ctx
)
{
QW_RET
(
qwAcquireTaskCtx
(
QW_FPARAMS
(),
ctx
));
}
else
if
(
ctx
)
{
QW_RET
(
qwGetTaskCtx
(
QW_FPARAMS
(),
ctx
));
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwAddTaskCtx
(
QW_FPARAMS_DEF
)
{
QW_RET
(
qwAddTaskCtxImpl
(
QW_FPARAMS
(),
false
,
NULL
));
}
int32_t
qwAddAcquireTaskCtx
(
QW_FPARAMS_DEF
,
SQWTaskCtx
**
ctx
)
{
return
qwAddTaskCtxImpl
(
QW_FPARAMS
(),
true
,
ctx
);
}
void
qwReleaseTaskCtx
(
SQWorker
*
mgmt
,
void
*
ctx
)
{
taosHashRelease
(
mgmt
->
ctxHash
,
ctx
);
}
void
qwFreeTaskHandle
(
QW_FPARAMS_DEF
,
qTaskInfo_t
*
taskHandle
)
{
// Note: free/kill may in RC
qTaskInfo_t
otaskHandle
=
atomic_load_ptr
(
taskHandle
);
if
(
otaskHandle
&&
atomic_val_compare_exchange_ptr
(
taskHandle
,
otaskHandle
,
NULL
))
{
qDestroyTask
(
otaskHandle
);
}
}
int32_t
qwKillTaskHandle
(
QW_FPARAMS_DEF
,
SQWTaskCtx
*
ctx
)
{
int32_t
code
=
0
;
// Note: free/kill may in RC
qTaskInfo_t
taskHandle
=
atomic_load_ptr
(
&
ctx
->
taskHandle
);
if
(
taskHandle
&&
atomic_val_compare_exchange_ptr
(
&
ctx
->
taskHandle
,
taskHandle
,
NULL
))
{
code
=
qAsyncKillTask
(
taskHandle
);
atomic_store_ptr
(
&
ctx
->
taskHandle
,
taskHandle
);
}
QW_RET
(
code
);
}
void
qwFreeTask
(
QW_FPARAMS_DEF
,
SQWTaskCtx
*
ctx
)
{
tmsgReleaseHandle
(
&
ctx
->
ctrlConnInfo
,
TAOS_CONN_SERVER
);
ctx
->
ctrlConnInfo
.
handle
=
NULL
;
ctx
->
ctrlConnInfo
.
refId
=
-
1
;
// NO need to release dataConnInfo
qwFreeTaskHandle
(
QW_FPARAMS
(),
&
ctx
->
taskHandle
);
if
(
ctx
->
sinkHandle
)
{
dsDestroyDataSinker
(
ctx
->
sinkHandle
);
ctx
->
sinkHandle
=
NULL
;
}
if
(
ctx
->
plan
)
{
nodesDestroyNode
(
ctx
->
plan
);
ctx
->
plan
=
NULL
;
}
}
int32_t
qwDropTaskCtx
(
QW_FPARAMS_DEF
)
{
char
id
[
sizeof
(
qId
)
+
sizeof
(
tId
)]
=
{
0
};
QW_SET_QTID
(
id
,
qId
,
tId
);
SQWTaskCtx
octx
;
SQWTaskCtx
*
ctx
=
taosHashGet
(
mgmt
->
ctxHash
,
id
,
sizeof
(
id
));
if
(
NULL
==
ctx
)
{
QW_ERR_RET
(
TSDB_CODE_QRY_TASK_CTX_NOT_EXIST
);
}
octx
=
*
ctx
;
atomic_store_ptr
(
&
ctx
->
taskHandle
,
NULL
);
atomic_store_ptr
(
&
ctx
->
sinkHandle
,
NULL
);
atomic_store_ptr
(
&
ctx
->
plan
,
NULL
);
QW_SET_EVENT_PROCESSED
(
ctx
,
QW_EVENT_DROP
);
if
(
taosHashRemove
(
mgmt
->
ctxHash
,
id
,
sizeof
(
id
)))
{
QW_TASK_ELOG_E
(
"taosHashRemove from ctx hash failed"
);
QW_ERR_RET
(
TSDB_CODE_QRY_TASK_CTX_NOT_EXIST
);
}
qwFreeTask
(
QW_FPARAMS
(),
&
octx
);
QW_TASK_DLOG_E
(
"task ctx dropped"
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwDropTaskStatus
(
QW_FPARAMS_DEF
)
{
SQWSchStatus
*
sch
=
NULL
;
SQWTaskStatus
*
task
=
NULL
;
int32_t
code
=
0
;
char
id
[
sizeof
(
qId
)
+
sizeof
(
tId
)]
=
{
0
};
QW_SET_QTID
(
id
,
qId
,
tId
);
if
(
qwAcquireScheduler
(
mgmt
,
sId
,
QW_WRITE
,
&
sch
))
{
QW_TASK_WLOG_E
(
"scheduler does not exist"
);
return
TSDB_CODE_SUCCESS
;
}
if
(
qwAcquireTaskStatus
(
QW_FPARAMS
(),
QW_WRITE
,
sch
,
&
task
))
{
qwReleaseScheduler
(
QW_WRITE
,
mgmt
);
QW_TASK_WLOG_E
(
"task does not exist"
);
return
TSDB_CODE_SUCCESS
;
}
if
(
taosHashRemove
(
sch
->
tasksHash
,
id
,
sizeof
(
id
)))
{
QW_TASK_ELOG_E
(
"taosHashRemove task from hash failed"
);
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
QW_TASK_DLOG_E
(
"task status dropped"
);
_return:
if
(
task
)
{
qwReleaseTaskStatus
(
QW_WRITE
,
sch
);
}
qwReleaseScheduler
(
QW_WRITE
,
mgmt
);
QW_RET
(
code
);
}
int32_t
qwUpdateTaskStatus
(
QW_FPARAMS_DEF
,
int8_t
status
)
{
SQWSchStatus
*
sch
=
NULL
;
SQWTaskStatus
*
task
=
NULL
;
int32_t
code
=
0
;
QW_ERR_RET
(
qwAcquireScheduler
(
mgmt
,
sId
,
QW_READ
,
&
sch
));
QW_ERR_JRET
(
qwAcquireTaskStatus
(
QW_FPARAMS
(),
QW_READ
,
sch
,
&
task
));
QW_ERR_JRET
(
qwSetTaskStatus
(
QW_FPARAMS
(),
task
,
status
));
_return:
if
(
task
)
{
qwReleaseTaskStatus
(
QW_READ
,
sch
);
}
qwReleaseScheduler
(
QW_READ
,
mgmt
);
QW_RET
(
code
);
}
int32_t
qwDropTask
(
QW_FPARAMS_DEF
)
{
QW_ERR_RET
(
qwDropTaskStatus
(
QW_FPARAMS
()));
QW_ERR_RET
(
qwDropTaskCtx
(
QW_FPARAMS
()));
QW_TASK_DLOG_E
(
"task is dropped"
);
return
TSDB_CODE_SUCCESS
;
}
void
qwSetHbParam
(
int64_t
refId
,
SQWHbParam
**
pParam
)
{
int32_t
paramIdx
=
0
;
int32_t
newParamIdx
=
0
;
while
(
true
)
{
paramIdx
=
atomic_load_32
(
&
gQwMgmt
.
paramIdx
);
if
(
paramIdx
==
tListLen
(
gQwMgmt
.
param
))
{
newParamIdx
=
0
;
}
else
{
newParamIdx
=
paramIdx
+
1
;
}
if
(
paramIdx
==
atomic_val_compare_exchange_32
(
&
gQwMgmt
.
paramIdx
,
paramIdx
,
newParamIdx
))
{
break
;
}
}
gQwMgmt
.
param
[
paramIdx
].
qwrId
=
gQwMgmt
.
qwRef
;
gQwMgmt
.
param
[
paramIdx
].
refId
=
refId
;
*
pParam
=
&
gQwMgmt
.
param
[
paramIdx
];
}
void
qwSaveTbVersionInfo
(
qTaskInfo_t
pTaskInfo
,
SQWTaskCtx
*
ctx
)
{
char
dbFName
[
TSDB_DB_FNAME_LEN
];
char
tbName
[
TSDB_TABLE_NAME_LEN
];
qGetQueriedTableSchemaVersion
(
pTaskInfo
,
dbFName
,
tbName
,
&
ctx
->
tbInfo
.
sversion
,
&
ctx
->
tbInfo
.
tversion
);
if
(
dbFName
[
0
]
&&
tbName
[
0
])
{
sprintf
(
ctx
->
tbInfo
.
tbFName
,
"%s.%s"
,
dbFName
,
tbName
);
}
else
{
ctx
->
tbInfo
.
tbFName
[
0
]
=
0
;
}
}
void
qwCloseRef
(
void
)
{
taosWLockLatch
(
&
gQwMgmt
.
lock
);
if
(
atomic_load_32
(
&
gQwMgmt
.
qwNum
)
<=
0
&&
gQwMgmt
.
qwRef
>=
0
)
{
taosCloseRef
(
gQwMgmt
.
qwRef
);
gQwMgmt
.
qwRef
=
-
1
;
}
taosWUnLockLatch
(
&
gQwMgmt
.
lock
);
}
void
qwDestroySchStatus
(
SQWSchStatus
*
pStatus
)
{
taosHashCleanup
(
pStatus
->
tasksHash
);
}
void
qwDestroyImpl
(
void
*
pMgmt
)
{
SQWorker
*
mgmt
=
(
SQWorker
*
)
pMgmt
;
taosTmrStopA
(
&
mgmt
->
hbTimer
);
taosTmrCleanUp
(
mgmt
->
timer
);
// TODO STOP ALL QUERY
// TODO FREE ALL
taosHashCleanup
(
mgmt
->
ctxHash
);
void
*
pIter
=
taosHashIterate
(
mgmt
->
schHash
,
NULL
);
while
(
pIter
)
{
SQWSchStatus
*
sch
=
(
SQWSchStatus
*
)
pIter
;
qwDestroySchStatus
(
sch
);
pIter
=
taosHashIterate
(
mgmt
->
schHash
,
pIter
);
}
taosHashCleanup
(
mgmt
->
schHash
);
taosMemoryFree
(
mgmt
);
atomic_sub_fetch_32
(
&
gQwMgmt
.
qwNum
,
1
);
qwCloseRef
();
}
int32_t
qwOpenRef
(
void
)
{
taosWLockLatch
(
&
gQwMgmt
.
lock
);
if
(
gQwMgmt
.
qwRef
<
0
)
{
gQwMgmt
.
qwRef
=
taosOpenRef
(
100
,
qwDestroyImpl
);
if
(
gQwMgmt
.
qwRef
<
0
)
{
taosWUnLockLatch
(
&
gQwMgmt
.
lock
);
qError
(
"init qworker ref failed"
);
QW_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
taosWUnLockLatch
(
&
gQwMgmt
.
lock
);
return
TSDB_CODE_SUCCESS
;
}
source/libs/qworker/src/qworker.c
浏览文件 @
6e465ebd
此差异已折叠。
点击以展开。
source/libs/qworker/test/qworkerTests.cpp
浏览文件 @
6e465ebd
...
...
@@ -127,15 +127,6 @@ void qwtBuildQueryReqMsg(SRpcMsg *queryRpc) {
queryRpc
->
contLen
=
sizeof
(
SSubQueryMsg
)
+
100
;
}
void
qwtBuildReadyReqMsg
(
SResReadyReq
*
readyMsg
,
SRpcMsg
*
readyRpc
)
{
readyMsg
->
sId
=
htobe64
(
1
);
readyMsg
->
queryId
=
htobe64
(
atomic_load_64
(
&
qwtTestQueryId
));
readyMsg
->
taskId
=
htobe64
(
1
);
readyRpc
->
msgType
=
TDMT_VND_RES_READY
;
readyRpc
->
pCont
=
readyMsg
;
readyRpc
->
contLen
=
sizeof
(
SResReadyReq
);
}
void
qwtBuildFetchReqMsg
(
SResFetchReq
*
fetchMsg
,
SRpcMsg
*
fetchRpc
)
{
fetchMsg
->
sId
=
htobe64
(
1
);
fetchMsg
->
queryId
=
htobe64
(
atomic_load_64
(
&
qwtTestQueryId
));
...
...
@@ -154,13 +145,6 @@ void qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) {
dropRpc
->
contLen
=
sizeof
(
STaskDropReq
);
}
void
qwtBuildStatusReqMsg
(
SSchTasksStatusReq
*
statusMsg
,
SRpcMsg
*
statusRpc
)
{
statusMsg
->
sId
=
htobe64
(
1
);
statusRpc
->
pCont
=
statusMsg
;
statusRpc
->
contLen
=
sizeof
(
SSchTasksStatusReq
);
statusRpc
->
msgType
=
TDMT_VND_TASKS_STATUS
;
}
int32_t
qwtStringToPlan
(
const
char
*
str
,
SSubplan
**
subplan
)
{
*
subplan
=
(
SSubplan
*
)
0x1
;
return
0
;
...
...
@@ -222,10 +206,7 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
case
TDMT_VND_QUERY_RSP
:
{
SQueryTableRsp
*
rsp
=
(
SQueryTableRsp
*
)
pRsp
->
pCont
;
if
(
0
==
pRsp
->
code
)
{
qwtBuildReadyReqMsg
(
&
qwtreadyMsg
,
&
qwtreadyRpc
);
qwtPutReqToFetchQueue
((
void
*
)
0x1
,
&
qwtreadyRpc
);
}
else
{
if
(
pRsp
->
code
)
{
qwtBuildDropReqMsg
(
&
qwtdropMsg
,
&
qwtdropRpc
);
qwtPutReqToFetchQueue
((
void
*
)
0x1
,
&
qwtdropRpc
);
}
...
...
@@ -233,19 +214,6 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
rpcFreeCont
(
rsp
);
break
;
}
case
TDMT_VND_RES_READY_RSP
:
{
SResReadyRsp
*
rsp
=
(
SResReadyRsp
*
)
pRsp
->
pCont
;
if
(
0
==
pRsp
->
code
)
{
qwtBuildFetchReqMsg
(
&
qwtfetchMsg
,
&
qwtfetchRpc
);
qwtPutReqToFetchQueue
((
void
*
)
0x1
,
&
qwtfetchRpc
);
}
else
{
qwtBuildDropReqMsg
(
&
qwtdropMsg
,
&
qwtdropRpc
);
qwtPutReqToFetchQueue
((
void
*
)
0x1
,
&
qwtdropRpc
);
}
rpcFreeCont
(
rsp
);
break
;
}
case
TDMT_VND_FETCH_RSP
:
{
SRetrieveTableRsp
*
rsp
=
(
SRetrieveTableRsp
*
)
pRsp
->
pCont
;
...
...
@@ -679,28 +647,6 @@ void *queryThread(void *param) {
return
NULL
;
}
void
*
readyThread
(
void
*
param
)
{
SRpcMsg
readyRpc
=
{
0
};
int32_t
code
=
0
;
uint32_t
n
=
0
;
void
*
mockPointer
=
(
void
*
)
0x1
;
void
*
mgmt
=
param
;
SResReadyReq
readyMsg
=
{
0
};
while
(
!
qwtTestStop
)
{
qwtBuildReadyReqMsg
(
&
readyMsg
,
&
readyRpc
);
code
=
qWorkerProcessReadyMsg
(
mockPointer
,
mgmt
,
&
readyRpc
);
if
(
qwtTestEnableSleep
)
{
taosUsleep
(
taosRand
()
%
5
);
}
if
(
++
n
%
qwtTestPrintNum
==
0
)
{
printf
(
"ready:%d
\n
"
,
n
);
}
}
return
NULL
;
}
void
*
fetchThread
(
void
*
param
)
{
SRpcMsg
fetchRpc
=
{
0
};
int32_t
code
=
0
;
...
...
@@ -745,29 +691,6 @@ void *dropThread(void *param) {
return
NULL
;
}
void
*
statusThread
(
void
*
param
)
{
SRpcMsg
statusRpc
=
{
0
};
int32_t
code
=
0
;
uint32_t
n
=
0
;
void
*
mockPointer
=
(
void
*
)
0x1
;
void
*
mgmt
=
param
;
SSchTasksStatusReq
statusMsg
=
{
0
};
while
(
!
qwtTestStop
)
{
qwtBuildStatusReqMsg
(
&
statusMsg
,
&
statusRpc
);
code
=
qWorkerProcessStatusMsg
(
mockPointer
,
mgmt
,
&
statusRpc
);
if
(
qwtTestEnableSleep
)
{
taosUsleep
(
taosRand
()
%
5
);
}
if
(
++
n
%
qwtTestPrintNum
==
0
)
{
printf
(
"status:%d
\n
"
,
n
);
}
}
return
NULL
;
}
void
*
qwtclientThread
(
void
*
param
)
{
int32_t
code
=
0
;
uint32_t
n
=
0
;
...
...
@@ -894,12 +817,6 @@ void *fetchQueueThread(void *param) {
case
TDMT_VND_FETCH
:
qWorkerProcessFetchMsg
(
mockPointer
,
mgmt
,
fetchRpc
);
break
;
case
TDMT_VND_RES_READY
:
qWorkerProcessReadyMsg
(
mockPointer
,
mgmt
,
fetchRpc
);
break
;
case
TDMT_VND_TASKS_STATUS
:
qWorkerProcessStatusMsg
(
mockPointer
,
mgmt
,
fetchRpc
);
break
;
case
TDMT_VND_CANCEL_TASK
:
qWorkerProcessCancelMsg
(
mockPointer
,
mgmt
,
fetchRpc
);
break
;
...
...
@@ -934,15 +851,12 @@ TEST(seqTest, normalCase) {
int32_t
code
=
0
;
void
*
mockPointer
=
(
void
*
)
0x1
;
SRpcMsg
queryRpc
=
{
0
};
SRpcMsg
readyRpc
=
{
0
};
SRpcMsg
fetchRpc
=
{
0
};
SRpcMsg
dropRpc
=
{
0
};
SRpcMsg
statusRpc
=
{
0
};
qwtInitLogFile
();
qwtBuildQueryReqMsg
(
&
queryRpc
);
qwtBuildReadyReqMsg
(
&
qwtreadyMsg
,
&
readyRpc
);
qwtBuildFetchReqMsg
(
&
qwtfetchMsg
,
&
fetchRpc
);
qwtBuildDropReqMsg
(
&
qwtdropMsg
,
&
dropRpc
);
...
...
@@ -976,10 +890,6 @@ TEST(seqTest, normalCase) {
code
=
qWorkerProcessDropMsg
(
mockPointer
,
mgmt
,
&
dropRpc
);
ASSERT_EQ
(
code
,
0
);
qwtBuildStatusReqMsg
(
&
qwtstatusMsg
,
&
statusRpc
);
code
=
qWorkerProcessStatusMsg
(
mockPointer
,
mgmt
,
&
statusRpc
);
ASSERT_EQ
(
code
,
0
);
qWorkerDestroy
(
&
mgmt
);
}
...
...
@@ -989,13 +899,11 @@ TEST(seqTest, cancelFirst) {
void
*
mockPointer
=
(
void
*
)
0x1
;
SRpcMsg
queryRpc
=
{
0
};
SRpcMsg
dropRpc
=
{
0
};
SRpcMsg
statusRpc
=
{
0
};
qwtInitLogFile
();
qwtBuildQueryReqMsg
(
&
queryRpc
);
qwtBuildDropReqMsg
(
&
qwtdropMsg
,
&
dropRpc
);
qwtBuildStatusReqMsg
(
&
qwtstatusMsg
,
&
statusRpc
);
stubSetStringToPlan
();
stubSetRpcSendResponse
();
...
...
@@ -1006,24 +914,12 @@ TEST(seqTest, cancelFirst) {
code
=
qWorkerInit
(
NODE_TYPE_VNODE
,
1
,
NULL
,
&
mgmt
,
&
msgCb
);
ASSERT_EQ
(
code
,
0
);
qwtBuildStatusReqMsg
(
&
qwtstatusMsg
,
&
statusRpc
);
code
=
qWorkerProcessStatusMsg
(
mockPointer
,
mgmt
,
&
statusRpc
);
ASSERT_EQ
(
code
,
0
);
code
=
qWorkerProcessDropMsg
(
mockPointer
,
mgmt
,
&
dropRpc
);
ASSERT_EQ
(
code
,
0
);
qwtBuildStatusReqMsg
(
&
qwtstatusMsg
,
&
statusRpc
);
code
=
qWorkerProcessStatusMsg
(
mockPointer
,
mgmt
,
&
statusRpc
);
ASSERT_EQ
(
code
,
0
);
code
=
qWorkerProcessQueryMsg
(
mockPointer
,
mgmt
,
&
queryRpc
);
ASSERT_TRUE
(
0
!=
code
);
qwtBuildStatusReqMsg
(
&
qwtstatusMsg
,
&
statusRpc
);
code
=
qWorkerProcessStatusMsg
(
mockPointer
,
mgmt
,
&
statusRpc
);
ASSERT_EQ
(
code
,
0
);
qWorkerDestroy
(
&
mgmt
);
}
...
...
@@ -1087,9 +983,6 @@ TEST(seqTest, randCase) {
}
}
else
if
(
r
>=
maxr
*
4
/
5
&&
r
<
maxr
-
1
)
{
printf
(
"Status,%d
\n
"
,
t
++
);
qwtBuildStatusReqMsg
(
&
statusMsg
,
&
statusRpc
);
code
=
qWorkerProcessStatusMsg
(
mockPointer
,
mgmt
,
&
statusRpc
);
ASSERT_EQ
(
code
,
0
);
if
(
qwtTestEnableSleep
)
{
taosUsleep
(
1
);
}
...
...
@@ -1137,7 +1030,6 @@ TEST(seqTest, multithreadRand) {
//taosThreadCreate(&(t2), &thattr, readyThread, NULL);
taosThreadCreate
(
&
(
t3
),
&
thattr
,
fetchThread
,
NULL
);
taosThreadCreate
(
&
(
t4
),
&
thattr
,
dropThread
,
NULL
);
taosThreadCreate
(
&
(
t5
),
&
thattr
,
statusThread
,
NULL
);
taosThreadCreate
(
&
(
t6
),
&
thattr
,
fetchQueueThread
,
mgmt
);
while
(
true
)
{
...
...
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
6e465ebd
...
...
@@ -297,7 +297,7 @@ void schFreeRpcCtx(SRpcCtx *pCtx);
int32_t
schGetCallbackFp
(
int32_t
msgType
,
__async_send_cb_fn_t
*
fp
);
bool
schJobNeedToStop
(
SSchJob
*
pJob
,
int8_t
*
pStatus
);
int32_t
schProcessOnTaskSuccess
(
SSchJob
*
pJob
,
SSchTask
*
pTask
);
int32_t
schSaveJobQueryRes
(
SSchJob
*
pJob
,
S
ResReady
Rsp
*
rsp
);
int32_t
schSaveJobQueryRes
(
SSchJob
*
pJob
,
S
QueryTable
Rsp
*
rsp
);
int32_t
schProcessOnExplainDone
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SRetrieveTableRsp
*
pRsp
);
void
schProcessOnDataFetched
(
SSchJob
*
job
);
int32_t
schGetTaskFromTaskList
(
SHashObj
*
pTaskList
,
uint64_t
taskId
,
SSchTask
**
pTask
);
...
...
source/libs/scheduler/src/schJob.c
浏览文件 @
6e465ebd
...
...
@@ -1067,7 +1067,7 @@ int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRs
return
TSDB_CODE_SUCCESS
;
}
int32_t
schSaveJobQueryRes
(
SSchJob
*
pJob
,
S
ResReady
Rsp
*
rsp
)
{
int32_t
schSaveJobQueryRes
(
SSchJob
*
pJob
,
S
QueryTable
Rsp
*
rsp
)
{
if
(
rsp
->
tbFName
[
0
])
{
if
(
NULL
==
pJob
->
queryRes
)
{
pJob
->
queryRes
=
taosArrayInit
(
pJob
->
taskNum
,
sizeof
(
STbVerInfo
));
...
...
source/libs/scheduler/src/schRemote.c
浏览文件 @
6e465ebd
...
...
@@ -31,7 +31,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
case
TDMT_VND_EXPLAIN_RSP
:
return
TSDB_CODE_SUCCESS
;
case
TDMT_VND_QUERY_RSP
:
// query_rsp may be processed later than ready_rsp
if
(
lastMsgType
!=
reqMsgType
&&
-
1
!=
lastMsgType
&&
TDMT_VND_FETCH
!=
lastMsgType
)
{
if
(
lastMsgType
!=
reqMsgType
&&
-
1
!=
lastMsgType
)
{
SCH_TASK_DLOG
(
"rsp msg type mis-match, last sent msgType:%s, rspType:%s"
,
TMSG_INFO
(
lastMsgType
),
TMSG_INFO
(
msgType
));
}
...
...
@@ -41,22 +41,6 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
TMSG_INFO
(
msgType
));
}
SCH_SET_TASK_LASTMSG_TYPE
(
pTask
,
-
1
);
return
TSDB_CODE_SUCCESS
;
case
TDMT_VND_RES_READY_RSP
:
reqMsgType
=
TDMT_VND_QUERY
;
if
(
lastMsgType
!=
reqMsgType
&&
-
1
!=
lastMsgType
)
{
SCH_TASK_ELOG
(
"rsp msg type mis-match, last sent msgType:%s, rspType:%s"
,
(
lastMsgType
>
0
?
TMSG_INFO
(
lastMsgType
)
:
"null"
),
TMSG_INFO
(
msgType
));
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
if
(
taskStatus
!=
JOB_TASK_STATUS_EXECUTING
&&
taskStatus
!=
JOB_TASK_STATUS_PARTIAL_SUCCEED
)
{
SCH_TASK_ELOG
(
"rsp msg conflicted with task status, status:%s, rspType:%s"
,
jobTaskStatusStr
(
taskStatus
),
TMSG_INFO
(
msgType
));
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
SCH_SET_TASK_LASTMSG_TYPE
(
pTask
,
-
1
);
return
TSDB_CODE_SUCCESS
;
case
TDMT_VND_FETCH_RSP
:
...
...
@@ -231,24 +215,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
break
;
}
case
TDMT_VND_QUERY_RSP
:
{
SQueryTableRsp
rsp
=
{
0
};
if
(
msg
)
{
SCH_ERR_JRET
(
tDeserializeSQueryTableRsp
(
msg
,
msgSize
,
&
rsp
));
SCH_ERR_JRET
(
rsp
.
code
);
}
SCH_ERR_JRET
(
rspCode
);
if
(
NULL
==
msg
)
{
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
// SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, TDMT_VND_RES_READY));
break
;
}
case
TDMT_VND_RES_READY_RSP
:
{
SResReadyRsp
*
rsp
=
(
SResReadyRsp
*
)
msg
;
SQueryTableRsp
*
rsp
=
(
SQueryTableRsp
*
)
msg
;
SCH_ERR_JRET
(
rspCode
);
if
(
NULL
==
msg
)
{
...
...
@@ -429,10 +396,6 @@ int32_t schHandleFetchCallback(void *param, const SDataBuf *pMsg, int32_t code)
return
schHandleCallback
(
param
,
pMsg
,
TDMT_VND_FETCH_RSP
,
code
);
}
int32_t
schHandleReadyCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
return
schHandleCallback
(
param
,
pMsg
,
TDMT_VND_RES_READY_RSP
,
code
);
}
int32_t
schHandleExplainCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
return
schHandleCallback
(
param
,
pMsg
,
TDMT_VND_EXPLAIN_RSP
,
code
);
}
...
...
@@ -518,9 +481,6 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
case
TDMT_VND_QUERY
:
*
fp
=
schHandleQueryCallback
;
break
;
case
TDMT_VND_RES_READY
:
*
fp
=
schHandleReadyCallback
;
break
;
case
TDMT_VND_EXPLAIN
:
*
fp
=
schHandleExplainCallback
;
break
;
...
...
@@ -933,7 +893,6 @@ _return:
int32_t
schMakeQueryRpcCtx
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SRpcCtx
*
pCtx
)
{
int32_t
code
=
0
;
SMsgSendInfo
*
pReadyMsgSendInfo
=
NULL
;
SMsgSendInfo
*
pExplainMsgSendInfo
=
NULL
;
pCtx
->
args
=
taosHashInit
(
1
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
false
,
HASH_ENTRY_LOCK
);
...
...
@@ -942,18 +901,10 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SCH_ERR_JRET
(
schGenerateCallBackInfo
(
pJob
,
pTask
,
TDMT_VND_RES_READY
,
&
pReadyMsgSendInfo
));
SCH_ERR_JRET
(
schGenerateCallBackInfo
(
pJob
,
pTask
,
TDMT_VND_EXPLAIN
,
&
pExplainMsgSendInfo
));
int32_t
msgType
=
TDMT_VND_RES_READY_RSP
;
SRpcCtxVal
ctxVal
=
{.
val
=
pReadyMsgSendInfo
,
.
clone
=
schCloneSMsgSendInfo
};
if
(
taosHashPut
(
pCtx
->
args
,
&
msgType
,
sizeof
(
msgType
),
&
ctxVal
,
sizeof
(
ctxVal
)))
{
SCH_TASK_ELOG
(
"taosHashPut msg %d to rpcCtx failed"
,
msgType
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
msgType
=
TDMT_VND_EXPLAIN_RSP
;
ctxVal
.
val
=
pExplainMsgSendInfo
;
int32_t
msgType
=
TDMT_VND_EXPLAIN_RSP
;
SRpcCtxVal
ctxVal
=
{.
val
=
pExplainMsgSendInfo
,
.
clone
=
schCloneSMsgSendInfo
};
if
(
taosHashPut
(
pCtx
->
args
,
&
msgType
,
sizeof
(
msgType
),
&
ctxVal
,
sizeof
(
ctxVal
)))
{
SCH_TASK_ELOG
(
"taosHashPut msg %d to rpcCtx failed"
,
msgType
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
...
...
@@ -968,11 +919,6 @@ _return:
taosHashCleanup
(
pCtx
->
args
);
if
(
pReadyMsgSendInfo
)
{
taosMemoryFreeClear
(
pReadyMsgSendInfo
->
param
);
taosMemoryFreeClear
(
pReadyMsgSendInfo
);
}
if
(
pExplainMsgSendInfo
)
{
taosMemoryFreeClear
(
pExplainMsgSendInfo
->
param
);
taosMemoryFreeClear
(
pExplainMsgSendInfo
);
...
...
@@ -1128,24 +1074,6 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
persistHandle
=
true
;
break
;
}
case
TDMT_VND_RES_READY
:
{
msgSize
=
sizeof
(
SResReadyReq
);
msg
=
taosMemoryCalloc
(
1
,
msgSize
);
if
(
NULL
==
msg
)
{
SCH_TASK_ELOG
(
"calloc %d failed"
,
msgSize
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SResReadyReq
*
pMsg
=
msg
;
pMsg
->
header
.
vgId
=
htonl
(
addr
->
nodeId
);
pMsg
->
sId
=
htobe64
(
schMgmt
.
sId
);
pMsg
->
queryId
=
htobe64
(
pJob
->
queryId
);
pMsg
->
taskId
=
htobe64
(
pTask
->
taskId
);
break
;
}
case
TDMT_VND_FETCH
:
{
msgSize
=
sizeof
(
SResFetchReq
);
msg
=
taosMemoryCalloc
(
1
,
msgSize
);
...
...
source/libs/scheduler/test/schedulerTests.cpp
浏览文件 @
6e465ebd
...
...
@@ -542,27 +542,6 @@ void* schtRunJobThread(void *aa) {
pIter
=
taosHashIterate
(
execTasks
,
pIter
);
}
param
=
(
SSchTaskCallbackParam
*
)
taosMemoryCalloc
(
1
,
sizeof
(
*
param
));
param
->
refId
=
queryJobRefId
;
param
->
queryId
=
pJob
->
queryId
;
pIter
=
taosHashIterate
(
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
(
SSchTask
*
)
pIter
;
param
->
taskId
=
task
->
taskId
;
SResReadyRsp
rsp
=
{
0
};
dataBuf
.
pData
=
&
rsp
;
dataBuf
.
len
=
sizeof
(
rsp
);
code
=
schHandleCallback
(
param
,
&
dataBuf
,
TDMT_VND_RES_READY_RSP
,
0
);
assert
(
code
==
0
||
code
);
pIter
=
taosHashIterate
(
execTasks
,
pIter
);
}
param
=
(
SSchTaskCallbackParam
*
)
taosMemoryCalloc
(
1
,
sizeof
(
*
param
));
param
->
refId
=
queryJobRefId
;
param
->
queryId
=
pJob
->
queryId
;
...
...
@@ -583,25 +562,6 @@ void* schtRunJobThread(void *aa) {
}
param
=
(
SSchTaskCallbackParam
*
)
taosMemoryCalloc
(
1
,
sizeof
(
*
param
));
param
->
refId
=
queryJobRefId
;
param
->
queryId
=
pJob
->
queryId
;
pIter
=
taosHashIterate
(
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
(
SSchTask
*
)
pIter
;
param
->
taskId
=
task
->
taskId
-
1
;
SResReadyRsp
rsp
=
{
0
};
dataBuf
.
pData
=
&
rsp
;
dataBuf
.
len
=
sizeof
(
rsp
);
code
=
schHandleCallback
(
param
,
&
dataBuf
,
TDMT_VND_RES_READY_RSP
,
0
);
assert
(
code
==
0
||
code
);
pIter
=
taosHashIterate
(
execTasks
,
pIter
);
}
while
(
true
)
{
if
(
queryDone
)
{
break
;
...
...
@@ -701,17 +661,6 @@ TEST(queryTest, normalCase) {
pIter
=
taosHashIterate
(
pJob
->
execTasks
,
pIter
);
}
pIter
=
taosHashIterate
(
pJob
->
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SResReadyRsp
rsp
=
{
0
};
code
=
schHandleResponseMsg
(
pJob
,
task
,
TDMT_VND_RES_READY_RSP
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
printf
(
"code:%d"
,
code
);
ASSERT_EQ
(
code
,
0
);
pIter
=
taosHashIterate
(
pJob
->
execTasks
,
pIter
);
}
pIter
=
taosHashIterate
(
pJob
->
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
...
...
@@ -723,17 +672,6 @@ TEST(queryTest, normalCase) {
pIter
=
taosHashIterate
(
pJob
->
execTasks
,
pIter
);
}
pIter
=
taosHashIterate
(
pJob
->
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SResReadyRsp
rsp
=
{
0
};
code
=
schHandleResponseMsg
(
pJob
,
task
,
TDMT_VND_RES_READY_RSP
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
pIter
=
taosHashIterate
(
pJob
->
execTasks
,
pIter
);
}
while
(
true
)
{
if
(
queryDone
)
{
break
;
...
...
@@ -809,17 +747,6 @@ TEST(queryTest, readyFirstCase) {
while
(
pIter
)
{
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SResReadyRsp
rsp
=
{
0
};
code
=
schHandleResponseMsg
(
pJob
,
task
,
TDMT_VND_RES_READY_RSP
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
printf
(
"code:%d"
,
code
);
ASSERT_EQ
(
code
,
0
);
pIter
=
taosHashIterate
(
pJob
->
execTasks
,
pIter
);
}
pIter
=
taosHashIterate
(
pJob
->
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SQueryTableRsp
rsp
=
{
0
};
code
=
schHandleResponseMsg
(
pJob
,
task
,
TDMT_VND_QUERY_RSP
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
...
...
@@ -827,17 +754,6 @@ TEST(queryTest, readyFirstCase) {
pIter
=
taosHashIterate
(
pJob
->
execTasks
,
pIter
);
}
pIter
=
taosHashIterate
(
pJob
->
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SResReadyRsp
rsp
=
{
0
};
code
=
schHandleResponseMsg
(
pJob
,
task
,
TDMT_VND_RES_READY_RSP
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
pIter
=
taosHashIterate
(
pJob
->
execTasks
,
pIter
);
}
pIter
=
taosHashIterate
(
pJob
->
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
...
...
@@ -942,10 +858,6 @@ TEST(queryTest, flowCtrlCase) {
SQueryTableRsp
rsp
=
{
0
};
code
=
schHandleResponseMsg
(
pJob
,
task
,
TDMT_VND_QUERY_RSP
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
}
else
if
(
task
->
lastMsgType
==
TDMT_VND_RES_READY
)
{
SResReadyRsp
rsp
=
{
0
};
code
=
schHandleResponseMsg
(
pJob
,
task
,
TDMT_VND_RES_READY_RSP
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
}
else
{
qDone
=
true
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录