提交 7ea13edc 编写于 作者: D dapan1121

feature/scheduler

上级 13690803
......@@ -520,7 +520,6 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
while (true) {
QW_TASK_DLOG("start to execTask, loopIdx:%d", i++);
taosSsleep(20);
code = qExecTask(*taskHandle, &pRes, &useconds);
if (code) {
QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
......@@ -1196,8 +1195,8 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING);
} else if (ctx->phase > 0) {
qwBuildAndSendDropRsp(&ctx->connInfo, code);
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code));
qwBuildAndSendDropRsp(&qwMsg->connInfo, code);
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
rsped = true;
......@@ -1206,7 +1205,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
}
if (!rsped) {
ctx->connInfo.handle == qwMsg->connInfo.handle;
ctx->connInfo.handle = qwMsg->connInfo.handle;
ctx->connInfo.ahandle = qwMsg->connInfo.ahandle;
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
......
......@@ -103,6 +103,11 @@ typedef struct SSchFlowControl {
SArray *taskList; // Element is SSchTask*
} SSchFlowControl;
typedef struct SSchNodeInfo {
SQueryNodeAddr addr;
void *handle;
} SSchNodeInfo;
typedef struct SSchLevel {
int32_t level;
int8_t status;
......@@ -128,7 +133,7 @@ typedef struct SSchTask {
SQueryNodeAddr succeedAddr; // task executed success node address
int8_t candidateIdx; // current try condidation index
SArray *candidateAddrs; // condidate node addresses, element is SQueryNodeAddr
SArray *execAddrs; // all tried node for current task, element is SQueryNodeAddr
SArray *execNodes; // all tried node for current task, element is SSchNodeInfo
SQueryProfileSummary summary; // task execution summary
int32_t childReady; // child task ready number
SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask*
......@@ -190,6 +195,8 @@ extern SSchedulerMgmt schMgmt;
#define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status)
#define SCH_GET_TASK_STATUS_STR(task) jobTaskStatusStr(SCH_GET_TASK_STATUS(task))
#define SCH_GET_TASK_HANDLE(_task) ((_task) ? (_task)->handle : NULL)
#define SCH_SET_TASK_HANDLE(_task, _handle) ((_task)->handle = (_handle))
#define SCH_SET_JOB_STATUS(job, st) atomic_store_8(&(job)->status, st)
#define SCH_GET_JOB_STATUS(job) atomic_load_8(&(job)->status)
......
......@@ -57,9 +57,9 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
pTask->level = pLevel;
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START);
pTask->taskId = schGenTaskId();
pTask->execAddrs = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
if (NULL == pTask->execAddrs) {
SCH_TASK_ELOG("taosArrayInit %d exec addrs failed", SCH_MAX_CANDIDATE_EP_NUM);
pTask->execNodes = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SSchNodeInfo));
if (NULL == pTask->execNodes) {
SCH_TASK_ELOG("taosArrayInit %d execNodes failed", SCH_MAX_CANDIDATE_EP_NUM);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
......@@ -101,8 +101,8 @@ void schFreeTask(SSchTask* pTask) {
taosArrayDestroy(pTask->parents);
}
if (pTask->execAddrs) {
taosArrayDestroy(pTask->execAddrs);
if (pTask->execNodes) {
taosArrayDestroy(pTask->execNodes);
}
}
......@@ -355,12 +355,16 @@ int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr) {
if (NULL == taosArrayPush(pTask->execAddrs, addr)) {
SCH_TASK_ELOG("taosArrayPush addr to execAddr list failed, errno:%d", errno);
int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, void *handle) {
SSchNodeInfo nodeInfo = {.addr = *addr, .handle = handle};
if (NULL == taosArrayPush(pTask->execNodes, &nodeInfo)) {
SCH_TASK_ELOG("taosArrayPush nodeInfo to execNodes list failed, errno:%d", errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_TASK_DLOG("task execNode recorded, handle:%p", handle);
return TSDB_CODE_SUCCESS;
}
......@@ -1090,7 +1094,7 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in
SSchJob *pJob = schAcquireJob(pParam->refId);
if (NULL == pJob) {
qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "taosAcquireRef job failed, may be dropped, refId:%" PRIx64,
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "taosAcquireRef job failed, may be dropped, refId:%" PRIx64,
pParam->queryId, pParam->taskId, pParam->refId);
SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
}
......@@ -1110,7 +1114,7 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in
pTask = *task;
SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode));
pTask->handle = pMsg->handle;
SCH_SET_TASK_HANDLE(pTask, pMsg->handle);
SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
_return:
......@@ -1849,11 +1853,11 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
SCH_SET_TASK_LASTMSG_TYPE(pTask, msgType);
SSchTrans trans = {.transInst = pJob->transport, .transHandle = pTask ? pTask->handle : NULL};
SSchTrans trans = {.transInst = pJob->transport, .transHandle = SCH_GET_TASK_HANDLE(pTask)};
SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, &epSet, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL)));
if (isCandidateAddr) {
SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr));
if (msgType == TDMT_VND_QUERY) {
SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr, trans.transHandle));
}
return TSDB_CODE_SUCCESS;
......@@ -1935,6 +1939,8 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
bool enough = false;
int32_t code = 0;
SCH_SET_TASK_HANDLE(pTask, NULL);
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
SCH_ERR_JRET(schCheckIncTaskFlowQuota(pJob, pTask, &enough));
......@@ -1975,23 +1981,24 @@ int32_t schLaunchJob(SSchJob *pJob) {
}
void schDropTaskOnExecutedNode(SSchJob *pJob, SSchTask *pTask) {
if (NULL == pTask->execAddrs) {
if (NULL == pTask->execNodes) {
SCH_TASK_DLOG("no exec address, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
return;
}
int32_t size = (int32_t)taosArrayGetSize(pTask->execAddrs);
int32_t size = (int32_t)taosArrayGetSize(pTask->execNodes);
if (size <= 0) {
SCH_TASK_DLOG("task has no exec address, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
return;
}
SQueryNodeAddr *addr = NULL;
SSchNodeInfo *nodeInfo = NULL;
for (int32_t i = 0; i < size; ++i) {
addr = (SQueryNodeAddr *)taosArrayGet(pTask->execAddrs, i);
nodeInfo = (SSchNodeInfo *)taosArrayGet(pTask->execNodes, i);
SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle);
schBuildAndSendMsg(pJob, pTask, addr, TDMT_VND_DROP_TASK);
schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_VND_DROP_TASK);
}
SCH_TASK_DLOG("task has %d exec address", size);
......
......@@ -261,7 +261,7 @@ void cliHandleResp(SCliConn* conn) {
if (pMsg == NULL) {
transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType);
tDebug("cli conn %p construct ahandle %p by %d, persist: 1", conn, transMsg.ahandle, transMsg.msgType);
if (!CONN_RELEASE_BY_SERVER(conn)&& transMsg.ahandle = NULL) {
if (!CONN_RELEASE_BY_SERVER(conn)&& transMsg.ahandle == NULL) {
transMsg.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
tDebug("cli conn %p construct ahandle %p due brokenlink, persist: 1", conn, transMsg.ahandle);
}
......@@ -330,10 +330,12 @@ void cliHandleExcept(SCliConn* pConn) {
}
SCliThrdObj* pThrd = pConn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
while (!transQueueEmpty(&pConn->cliMsgs)) {
bool once = false;
do {
SCliMsg* pMsg = transQueuePop(&pConn->cliMsgs);
if (pMsg == NULL && once) {
break;
}
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
STransMsg transMsg = {0};
......@@ -356,6 +358,7 @@ void cliHandleExcept(SCliConn* pConn) {
if (pCtx == NULL || pCtx->pSem == NULL) {
tTrace("%s cli conn %p handle except", pTransInst->label, pConn);
if (transMsg.ahandle == NULL) {
once = true;
continue;
}
(pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
......@@ -366,7 +369,7 @@ void cliHandleExcept(SCliConn* pConn) {
}
destroyCmsg(pMsg);
tTrace("%s cli conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn);
};
} while (!transQueueEmpty(&pConn->cliMsgs));
transUnrefCliHandle(pConn);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册