提交 86758572 编写于 作者: D dapan1121

enh: add client query policy

上级 a7131c8c
......@@ -52,6 +52,7 @@ typedef enum {
#define QUERY_POLICY_VNODE 1
#define QUERY_POLICY_HYBRID 2
#define QUERY_POLICY_QNODE 3
#define QUERY_POLICY_CLIENT 4
typedef struct STableComInfo {
uint8_t numOfTags; // the number of tags in schema
......@@ -267,43 +268,43 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define qFatal(...) \
do { \
if (qDebugFlag & DEBUG_FATAL) { \
taosPrintLog("QRY FATAL ", DEBUG_FATAL, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
taosPrintLog("QRY FATAL ", DEBUG_FATAL, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qError(...) \
do { \
if (qDebugFlag & DEBUG_ERROR) { \
taosPrintLog("QRY ERROR ", DEBUG_ERROR, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
taosPrintLog("QRY ERROR ", DEBUG_ERROR, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qWarn(...) \
do { \
if (qDebugFlag & DEBUG_WARN) { \
taosPrintLog("QRY WARN ", DEBUG_WARN, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
taosPrintLog("QRY WARN ", DEBUG_WARN, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qInfo(...) \
do { \
if (qDebugFlag & DEBUG_INFO) { \
taosPrintLog("QRY ", DEBUG_INFO, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
taosPrintLog("QRY ", DEBUG_INFO, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qDebug(...) \
do { \
if (qDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("QRY ", DEBUG_DEBUG, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
taosPrintLog("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qTrace(...) \
do { \
if (qDebugFlag & DEBUG_TRACE) { \
taosPrintLog("QRY ", DEBUG_TRACE, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
taosPrintLog("QRY ", DEBUG_TRACE, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qDebugL(...) \
do { \
if (qDebugFlag & DEBUG_DEBUG) { \
taosPrintLongString("QRY ", DEBUG_DEBUG, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
taosPrintLongString("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
......
......@@ -29,6 +29,7 @@ enum {
NODE_TYPE_QNODE,
NODE_TYPE_SNODE,
NODE_TYPE_MNODE,
NODE_TYPE_CLIENT,
};
typedef struct SQWorkerCfg {
......@@ -55,7 +56,7 @@ typedef struct {
uint64_t numOfErrors;
} SQWorkerStat;
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb);
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const SMsgCb *pMsgCb);
int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg);
......
......@@ -64,6 +64,7 @@ typedef bool (*schedulerChkKillFp)(void* param);
typedef struct SSchedulerReq {
bool syncReq;
bool localReq;
SRequestConnInfo *pConn;
SArray *pNodeList;
SQueryPlan *pDag;
......
......@@ -467,6 +467,7 @@ enum {
#define SNODE_HANDLE -2
#define VNODE_HANDLE -3
#define BNODE_HANDLE -4
#define CLIENT_HANDLE -5
#define TSDB_CONFIG_OPTION_LEN 32
#define TSDB_CONFIG_VALUE_LEN 64
......
......@@ -27,6 +27,7 @@
#include "trpc.h"
#include "tsched.h"
#include "ttime.h"
#include "qworker.h"
#define TSC_VAR_NOT_RELEASE 1
#define TSC_VAR_RELEASED 0
......
......@@ -540,6 +540,20 @@ _return:
return TSDB_CODE_SUCCESS;
}
int32_t buildClientPolicyNodeList(SRequestObj* pRequest, SArray** pNodeList) {
*pNodeList = taosArrayInit(1, sizeof(SQueryNodeLoad));
SQueryNodeLoad load = {0};
load.addr.nodeId = CLIENT_HANDLE;
taosArrayPush(*pNodeList, &load);
tscDebug("0x%" PRIx64 " client policy", pRequest->requestId);
return TSDB_CODE_SUCCESS;
}
int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData* pResultMeta) {
SArray* pDbVgList = NULL;
SArray* pQnodeList = NULL;
......@@ -585,6 +599,10 @@ int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray
code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
break;
}
case QUERY_POLICY_CLIENT: {
code = buildClientPolicyNodeList(pRequest, pNodeList);
break;
}
default:
tscError("unknown query policy: %d", tsQueryPolicy);
return TSDB_CODE_TSC_APP_ERROR;
......@@ -645,6 +663,10 @@ int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray*
code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
break;
}
case QUERY_POLICY_CLIENT: {
code = buildClientPolicyNodeList(pRequest, pNodeList);
break;
}
default:
tscError("unknown query policy: %d", tsQueryPolicy);
return TSDB_CODE_TSC_APP_ERROR;
......@@ -667,6 +689,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
.requestObjRefId = pRequest->self};
SSchedulerReq req = {
.syncReq = true,
.localReq = (tsQueryPolicy == CLIENT_HANDLE),
.pConn = &conn,
.pNodeList = pNodeList,
.pDag = pDag,
......@@ -1042,6 +1065,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM
.pTrans = pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self};
SSchedulerReq req = {
.syncReq = false,
.localReq = (tsQueryPolicy == CLIENT_HANDLE),
.pConn = &conn,
.pNodeList = pNodeList,
.pDag = pDag,
......
......@@ -26,6 +26,7 @@
#include "tref.h"
#include "trpc.h"
#include "version.h"
#include "qworker.h"
#define TSC_VAR_NOT_RELEASE 1
#define TSC_VAR_RELEASED 0
......@@ -75,6 +76,8 @@ void taos_cleanup(void) {
cleanupTaskQueue();
qWorkerDestroy(&tscQueryMgmt);
taosConvDestroy();
tscInfo("all local resources released");
......
......@@ -170,7 +170,7 @@ _exit:
}
int32_t mndInitQuery(SMnode *pMnode) {
if (qWorkerInit(NODE_TYPE_MNODE, MNODE_HANDLE, NULL, (void **)&pMnode->pQuery, &pMnode->msgCb) != 0) {
if (qWorkerInit(NODE_TYPE_MNODE, MNODE_HANDLE, (void **)&pMnode->pQuery, &pMnode->msgCb) != 0) {
mError("failed to init qworker in mnode since %s", terrstr());
return -1;
}
......
......@@ -26,7 +26,7 @@ SQnode *qndOpen(const SQnodeOpt *pOption) {
return NULL;
}
if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, NULL, (void **)&pQnode->pQuery, &pOption->msgCb)) {
if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, (void **)&pQnode->pQuery, &pOption->msgCb)) {
taosMemoryFreeClear(pQnode);
return NULL;
}
......
......@@ -16,7 +16,7 @@
#include "vnd.h"
int vnodeQueryOpen(SVnode *pVnode) {
return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), NULL, (void **)&pVnode->pQuery, &pVnode->msgCb);
return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), (void **)&pVnode->pQuery, &pVnode->msgCb);
}
void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); }
......
......@@ -29,7 +29,7 @@ extern "C" {
#include "executor.h"
#include "trpc.h"
#define QW_DEFAULT_SCHEDULER_NUMBER 10000
#define QW_DEFAULT_SCHEDULER_NUMBER 100
#define QW_DEFAULT_TASK_NUMBER 10000
#define QW_DEFAULT_SCH_TASK_NUMBER 10000
#define QW_DEFAULT_SHORT_RUN_TIMES 2
......@@ -93,7 +93,7 @@ typedef struct SQWMsg {
void *node;
int32_t code;
int32_t msgType;
char *msg;
void *msg;
int32_t msgLen;
SQWMsgInfo msgInfo;
SRpcHandleInfo connInfo;
......
......@@ -512,11 +512,6 @@ int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF) {
int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
int32_t code = 0;
bool queryRsped = false;
SSubplan *plan = NULL;
SQWPhaseInput input = {0};
qTaskInfo_t pTaskInfo = NULL;
DataSinkHandle sinkHandle = NULL;
SQWTaskCtx *ctx = NULL;
QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo));
......@@ -578,19 +573,12 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
// QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code));
// QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
// queryRsped = true;
ctx->level = plan->level;
atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
atomic_store_ptr(&ctx->sinkHandle, sinkHandle);
if (pTaskInfo && sinkHandle) {
qwSaveTbVersionInfo(pTaskInfo, ctx);
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));
}
_return:
......@@ -600,11 +588,6 @@ _return:
input.msgType = qwMsg->msgType;
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
// if (!queryRsped) {
// qwBuildAndSendQueryRsp(&qwMsg->connInfo, code);
// QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
//}
QW_RET(TSDB_CODE_SUCCESS);
}
......@@ -1000,8 +983,8 @@ _return:
QW_RET(TSDB_CODE_SUCCESS);
}
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
if (NULL == qWorkerMgmt || pMsgCb->mgmt == NULL) {
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
if (NULL == qWorkerMgmt || (pMsgCb && pMsgCb->mgmt == NULL)) {
qError("invalid param to init qworker");
QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
......@@ -1024,22 +1007,9 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (cfg) {
mgmt->cfg = *cfg;
if (0 == mgmt->cfg.maxSchedulerNum) {
mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER;
}
if (0 == mgmt->cfg.maxTaskNum) {
mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER;
}
if (0 == mgmt->cfg.maxSchTaskNum) {
mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER;
}
} else {
mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER;
mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER;
mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER;
}
mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
HASH_ENTRY_LOCK);
......@@ -1064,7 +1034,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
mgmt->nodeType = nodeType;
mgmt->nodeId = nodeId;
mgmt->msgCb = *pMsgCb;
mgmt->msgCb = pMsgCb ? *pMsgCb : NULL;
mgmt->refId = taosAddRef(gQwMgmt.qwRef, mgmt);
if (mgmt->refId < 0) {
......@@ -1140,3 +1110,51 @@ int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pSt
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessLocalQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
int32_t code = 0;
SQWTaskCtx *ctx = NULL;
SSubplan *plan = (SSubplan *)qwMsg->msg;
SQWPhaseInput input = {0};
qTaskInfo_t pTaskInfo = NULL;
DataSinkHandle sinkHandle = NULL;
QW_ERR_JRET(qwAddTaskCtx(QW_FPARAMS()));
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));
ctx->taskType = qwMsg->msgInfo.taskType;
ctx->explain = qwMsg->msgInfo.explain;
ctx->needFetch = qwMsg->msgInfo.needFetch;
ctx->msgType = qwMsg->msgType;
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH);
if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
QW_ERR_JRET(code);
}
if (NULL == sinkHandle || NULL == pTaskInfo) {
QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
ctx->level = plan->level;
atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
atomic_store_ptr(&ctx->sinkHandle, sinkHandle);
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));
_return:
if (ctx) {
QW_UPDATE_RSP_CODE(ctx, code);
qwReleaseTaskCtx(mgmt, ctx);
}
QW_RET(code);
}
......@@ -877,7 +877,7 @@ TEST(seqTest, normalCase) {
SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer;
msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
ASSERT_EQ(code, 0);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0);
......@@ -913,7 +913,7 @@ TEST(seqTest, cancelFirst) {
SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer;
msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
ASSERT_EQ(code, 0);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0);
......@@ -950,7 +950,7 @@ TEST(seqTest, randCase) {
SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer;
msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
ASSERT_EQ(code, 0);
int32_t t = 0;
......@@ -1021,7 +1021,7 @@ TEST(seqTest, multithreadRand) {
SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer;
msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
ASSERT_EQ(code, 0);
TdThreadAttr thattr;
......@@ -1084,7 +1084,7 @@ TEST(rcTest, shortExecshortDelay) {
SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer;
msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
ASSERT_EQ(code, 0);
qwtTestMaxExecTaskUsec = 0;
......@@ -1168,7 +1168,7 @@ TEST(rcTest, longExecshortDelay) {
SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer;
msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
ASSERT_EQ(code, 0);
qwtTestMaxExecTaskUsec = 1000000;
......@@ -1254,7 +1254,7 @@ TEST(rcTest, shortExeclongDelay) {
SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer;
msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
ASSERT_EQ(code, 0);
qwtTestMaxExecTaskUsec = 0;
......@@ -1338,7 +1338,7 @@ TEST(rcTest, dropTest) {
SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer;
msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
ASSERT_EQ(code, 0);
tsem_init(&qwtTestQuerySem, 0, 0);
......
......@@ -151,6 +151,7 @@ typedef struct SSchedulerMgmt {
SSchStat stat;
SRWLatch hbLock;
SHashObj *hbConnections;
void *queryMgmt;
} SSchedulerMgmt;
typedef struct SSchCallbackParamHeader {
......@@ -237,6 +238,7 @@ typedef struct SSchJobAttr {
bool queryJob;
bool needFetch;
bool needFlowCtrl;
bool localExec;
} SSchJobAttr;
typedef struct {
......@@ -302,6 +304,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_IS_DATA_BIND_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN)
#define SCH_IS_DATA_BIND_TASK(task) (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY))
#define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum)
#define SCH_IS_DATA_MERGE_TASK(task) (!SCH_IS_DATA_BIND_TASK(task))
#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
#define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status)
......
......@@ -719,6 +719,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
}
pJob->attr.explainMode = pReq->pDag->explainInfo.mode;
pJob->attr.localExec = pReq->localReq;
pJob->conn = *pReq->pConn;
if (pReq->sql) {
pJob->sql = strdup(pReq->sql);
......
......@@ -228,7 +228,6 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode)
SCH_RET(errCode);
}
// Note: no more task error processing, handled in function internal
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
bool moved = false;
int32_t code = 0;
......@@ -819,6 +818,48 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
return TSDB_CODE_SUCCESS;
}
int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
SSubplan *plan = pTask->plan;
int32_t code = 0;
if (NULL == pTask->msg) { // TODO add more detailed reason for failure
code = qSubPlanToString(plan, &pTask->msg, &pTask->msgLen);
if (TSDB_CODE_SUCCESS != code) {
SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg,
pTask->msgLen);
SCH_ERR_RET(code);
} else {
SCH_TASK_DLOGL("physical plan len:%d, %s", pTask->msgLen, pTask->msg);
}
}
SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));
if (SCH_IS_QUERY_JOB(pJob)) {
SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
}
SCH_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType));
}
int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
//SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));
if (NULL == schMgmt.queryMgmt) {
SCH_ERR_RET(qWorkerInit(NODE_TYPE_CLIENT, CLIENT_HANDLE, (void **)&schMgmt.queryMgmt, NULL));
}
SQWMsg qwMsg = {0};
qwMsg.msgInfo.taskType = TASK_TYPE_TEMP;
qwMsg.msgInfo.explain = SCH_IS_EXPLAIN_JOB(pJob);
qwMsg.msgInfo.needFetch = SCH_TASK_NEED_FETCH(pTask);
qwMsg.msg = pTask->plan;
qwMsg.msgType = pTask->plan->msgType;
SCH_ERR_RET(qWorkerProcessLocalQuery((SQWorker*)schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, pTask->execId, &qwMsg));
SCH_RET(schProcessOnTaskSuccess(pJob, pTask));
}
int32_t schLaunchTaskImpl(void *param) {
SSchTaskCtx *pCtx = (SSchTaskCtx *)param;
SSchJob *pJob = schAcquireJob(pCtx->jobRid);
......@@ -852,27 +893,12 @@ int32_t schLaunchTaskImpl(void *param) {
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);
}
SSubplan *plan = pTask->plan;
if (NULL == pTask->msg) { // TODO add more detailed reason for failure
code = qSubPlanToString(plan, &pTask->msg, &pTask->msgLen);
if (TSDB_CODE_SUCCESS != code) {
SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg,
pTask->msgLen);
SCH_ERR_JRET(code);
if (pJob->attr.localExec && SCH_IS_QUERY_JOB(pJob) && SCH_IS_DATA_MERGE_TASK(pTask)) {
SCH_ERR_JRET(schLaunchLocalTask(pJob, pTask));
} else {
SCH_TASK_DLOGL("physical plan len:%d, %s", pTask->msgLen, pTask->msg);
}
}
SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));
if (SCH_IS_QUERY_JOB(pJob)) {
SCH_ERR_JRET(schEnsureHbConnection(pJob, pTask));
SCH_ERR_JRET(schLaunchRemoteTask(pJob, pTask));
}
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType));
_return:
taosMemoryFree(param);
......@@ -892,7 +918,6 @@ _return:
}
int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
SSchTaskCtx *param = taosMemoryCalloc(1, sizeof(SSchTaskCtx));
if (NULL == param) {
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册