未验证 提交 ba285620 编写于 作者: L Li Minghao 提交者: GitHub

Merge pull request #14797 from taosdata/feature/3.0_mhli

refactor(sync): add test case
...@@ -35,7 +35,7 @@ int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { ...@@ -35,7 +35,7 @@ int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType), SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
TMSG_INFO(msgType)); TMSG_INFO(msgType));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
} }
if (taskStatus != JOB_TASK_STATUS_PART_SUCC) { if (taskStatus != JOB_TASK_STATUS_PART_SUCC) {
SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus), SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
TMSG_INFO(msgType)); TMSG_INFO(msgType));
...@@ -75,7 +75,7 @@ int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { ...@@ -75,7 +75,7 @@ int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
// Note: no more task error processing, handled in function internal // Note: no more task error processing, handled in function internal
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) { int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
char *msg = pMsg->pData; char *msg = pMsg->pData;
int32_t msgSize = pMsg->len; int32_t msgSize = pMsg->len;
int32_t msgType = pMsg->msgType; int32_t msgType = pMsg->msgType;
...@@ -253,15 +253,15 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa ...@@ -253,15 +253,15 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
rsp->sversion = ntohl(rsp->sversion); rsp->sversion = ntohl(rsp->sversion);
rsp->tversion = ntohl(rsp->tversion); rsp->tversion = ntohl(rsp->tversion);
rsp->affectedRows = be64toh(rsp->affectedRows); rsp->affectedRows = be64toh(rsp->affectedRows);
SCH_ERR_JRET(rsp->code); SCH_ERR_JRET(rsp->code);
SCH_ERR_JRET(schSaveJobQueryRes(pJob, rsp)); SCH_ERR_JRET(schSaveJobQueryRes(pJob, rsp));
atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows); atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows);
taosMemoryFreeClear(msg); taosMemoryFreeClear(msg);
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
break; break;
...@@ -375,7 +375,8 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { ...@@ -375,7 +375,8 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
SSchTask *pTask = NULL; SSchTask *pTask = NULL;
SSchJob *pJob = NULL; SSchJob *pJob = NULL;
qDebug("begin to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle, tstrerror(rspCode)); qDebug("begin to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle,
tstrerror(rspCode));
SCH_ERR_RET(schProcessOnCbBegin(&pJob, &pTask, pParam->queryId, pParam->refId, pParam->taskId)); SCH_ERR_RET(schProcessOnCbBegin(&pJob, &pTask, pParam->queryId, pParam->refId, pParam->taskId));
...@@ -387,7 +388,8 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { ...@@ -387,7 +388,8 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
taosMemoryFreeClear(pMsg->pData); taosMemoryFreeClear(pMsg->pData);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
qDebug("end to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle, tstrerror(rspCode)); qDebug("end to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle,
tstrerror(rspCode));
SCH_RET(code); SCH_RET(code);
} }
...@@ -424,7 +426,7 @@ int32_t schHandleCommitCallback(void *param, SDataBuf *pMsg, int32_t code) { ...@@ -424,7 +426,7 @@ int32_t schHandleCommitCallback(void *param, SDataBuf *pMsg, int32_t code) {
} }
int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) { int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) {
SSchedulerHbRsp rsp = {0}; SSchedulerHbRsp rsp = {0};
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
if (code) { if (code) {
...@@ -453,8 +455,8 @@ _return: ...@@ -453,8 +455,8 @@ _return:
SCH_RET(code); SCH_RET(code);
} }
int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, int32_t msgType, bool isHb, SSchTrans *trans,
int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, int32_t msgType, bool isHb, SSchTrans *trans, void **pParam) { void **pParam) {
if (!isHb) { if (!isHb) {
SSchTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam)); SSchTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
if (NULL == param) { if (NULL == param) {
...@@ -940,7 +942,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, ...@@ -940,7 +942,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
if (NULL == addr) { if (NULL == addr) {
addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
isCandidateAddr = true; isCandidateAddr = true;
SCH_TASK_DLOG("target candidateIdx %d", pTask->candidateIdx); SCH_TASK_DLOG("target candidateIdx %d, epInUse %d/%d", pTask->candidateIdx, addr->epSet.inUse,
addr->epSet.numOfEps);
} }
switch (msgType) { switch (msgType) {
......
...@@ -21,11 +21,9 @@ ...@@ -21,11 +21,9 @@
#include "tref.h" #include "tref.h"
#include "trpc.h" #include "trpc.h"
void schFreeTask(SSchJob *pJob, SSchTask *pTask) { void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
schDeregisterTaskHb(pJob, pTask); schDeregisterTaskHb(pJob, pTask);
if (pTask->candidateAddrs) { if (pTask->candidateAddrs) {
taosArrayDestroy(pTask->candidateAddrs); taosArrayDestroy(pTask->candidateAddrs);
} }
...@@ -45,17 +43,17 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask) { ...@@ -45,17 +43,17 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
} }
} }
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum) { int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum) {
int32_t code = 0; int32_t code = 0;
pTask->plan = pPlan; pTask->plan = pPlan;
pTask->level = pLevel; pTask->level = pLevel;
pTask->execId = -1; pTask->execId = -1;
pTask->maxExecTimes = SCH_TASK_MAX_EXEC_TIMES(pLevel->level, levelNum); pTask->maxExecTimes = SCH_TASK_MAX_EXEC_TIMES(pLevel->level, levelNum);
pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC; pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC;
pTask->taskId = schGenTaskId(); pTask->taskId = schGenTaskId();
pTask->execNodes = taosHashInit(SCH_MAX_CANDIDATE_EP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); pTask->execNodes =
taosHashInit(SCH_MAX_CANDIDATE_EP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
pTask->profile.execTime = taosMemoryCalloc(pTask->maxExecTimes, sizeof(int64_t)); pTask->profile.execTime = taosMemoryCalloc(pTask->maxExecTimes, sizeof(int64_t));
if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) { if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) {
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
...@@ -110,8 +108,8 @@ int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_ ...@@ -110,8 +108,8 @@ int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_
} else { } else {
SCH_TASK_DLOG("execId %d removed from execNodeList", execId); SCH_TASK_DLOG("execId %d removed from execNodeList", execId);
} }
if (execId != pTask->execId) { // ignore it if (execId != pTask->execId) { // ignore it
SCH_TASK_DLOG("execId %d is not current execId %d", execId, pTask->execId); SCH_TASK_DLOG("execId %d is not current execId %d", execId, pTask->execId);
SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
} }
...@@ -149,13 +147,13 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) ...@@ -149,13 +147,13 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode)
if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) { if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
return TSDB_CODE_SCH_IGNORE_ERROR; return TSDB_CODE_SCH_IGNORE_ERROR;
} }
int8_t status = 0; int8_t status = 0;
if (schJobNeedToStop(pJob, &status)) { if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_DLOG("no more task failure processing cause of job status %s", jobTaskStatusStr(status)); SCH_TASK_DLOG("no more task failure processing cause of job status %s", jobTaskStatusStr(status));
SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
} }
if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) { if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) {
SCH_TASK_ELOG("task already not in EXEC status, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); SCH_TASK_ELOG("task already not in EXEC status, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
...@@ -204,8 +202,6 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) ...@@ -204,8 +202,6 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode)
SCH_RET(errCode); SCH_RET(errCode);
} }
// Note: no more task error processing, handled in function internal // Note: no more task error processing, handled in function internal
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
bool moved = false; bool moved = false;
...@@ -265,13 +261,14 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { ...@@ -265,13 +261,14 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1); int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1);
SCH_LOCK_TASK(parent); SCH_LOCK_TASK(parent);
SDownstreamSourceNode source = {.type = QUERY_NODE_DOWNSTREAM_SOURCE, SDownstreamSourceNode source = {
.taskId = pTask->taskId, .type = QUERY_NODE_DOWNSTREAM_SOURCE,
.schedId = schMgmt.sId, .taskId = pTask->taskId,
.execId = pTask->execId, .schedId = schMgmt.sId,
.addr = pTask->succeedAddr, .execId = pTask->execId,
.fetchMsgType = SCH_FETCH_TYPE(pTask), .addr = pTask->succeedAddr,
}; .fetchMsgType = SCH_FETCH_TYPE(pTask),
};
qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source); qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source);
SCH_UNLOCK_TASK(parent); SCH_UNLOCK_TASK(parent);
...@@ -291,29 +288,29 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) { ...@@ -291,29 +288,29 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (SCH_TASK_TIMEOUT(pTask) && JOB_TASK_STATUS_EXEC == pTask->status && if (SCH_TASK_TIMEOUT(pTask) && JOB_TASK_STATUS_EXEC == pTask->status && pJob->fetchTask != pTask &&
pJob->fetchTask != pTask && taosArrayGetSize(pTask->candidateAddrs) > 1) { taosArrayGetSize(pTask->candidateAddrs) > 1) {
SCH_TASK_DLOG("task execId %d will be rescheduled now", pTask->execId); SCH_TASK_DLOG("task execId %d will be rescheduled now", pTask->execId);
schDropTaskOnExecNode(pJob, pTask); schDropTaskOnExecNode(pJob, pTask);
taosHashClear(pTask->execNodes); taosHashClear(pTask->execNodes);
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR)); SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR));
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode) { int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
if ((pTask->execId + 1) >= pTask->maxExecTimes) { if ((pTask->execId + 1) >= pTask->maxExecTimes) {
SCH_TASK_DLOG("task no more retry since reach max try times, execId:%d", pTask->execId); SCH_TASK_DLOG("task no more retry since reach max try times, execId:%d", pTask->execId);
schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, (void*)&rspCode); schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, (void *)&rspCode);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SCH_TASK_DLOG("task will be redirected now, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); SCH_TASK_DLOG("task will be redirected now, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
schDropTaskOnExecNode(pJob, pTask); schDropTaskOnExecNode(pJob, pTask);
taosHashClear(pTask->execNodes); taosHashClear(pTask->execNodes);
SCH_ERR_JRET(schRemoveTaskFromExecList(pJob, pTask)); SCH_ERR_JRET(schRemoveTaskFromExecList(pJob, pTask));
...@@ -328,25 +325,24 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32 ...@@ -328,25 +325,24 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32
if (pData) { if (pData) {
SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet)); SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet));
} }
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
if (JOB_TASK_STATUS_EXEC == SCH_GET_TASK_STATUS(pTask)) { if (JOB_TASK_STATUS_EXEC == SCH_GET_TASK_STATUS(pTask)) {
SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask)); SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask));
} }
} }
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
SCH_ERR_JRET(schLaunchTask(pJob, pTask)); SCH_ERR_JRET(schLaunchTask(pJob, pTask));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// merge plan // merge plan
pTask->childReady = 0; pTask->childReady = 0;
qClearSubplanExecutionNode(pTask->plan); qClearSubplanExecutionNode(pTask->plan);
// Note: current error task and upper level merge task // Note: current error task and upper level merge task
...@@ -355,10 +351,10 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32 ...@@ -355,10 +351,10 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32
} }
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
int32_t childrenNum = taosArrayGetSize(pTask->children); int32_t childrenNum = taosArrayGetSize(pTask->children);
for (int32_t i = 0; i < childrenNum; ++i) { for (int32_t i = 0; i < childrenNum; ++i) {
SSchTask* pChild = taosArrayGetP(pTask->children, i); SSchTask *pChild = taosArrayGetP(pTask->children, i);
SCH_LOCK_TASK(pChild); SCH_LOCK_TASK(pChild);
schDoTaskRedirect(pJob, pChild, NULL, rspCode); schDoTaskRedirect(pJob, pChild, NULL, rspCode);
SCH_UNLOCK_TASK(pChild); SCH_UNLOCK_TASK(pChild);
...@@ -371,7 +367,7 @@ _return: ...@@ -371,7 +367,7 @@ _return:
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
} }
int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode) { int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
if (SCH_IS_DATA_BIND_TASK(pTask)) { if (SCH_IS_DATA_BIND_TASK(pTask)) {
...@@ -537,7 +533,7 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { ...@@ -537,7 +533,7 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET(schRemoveTaskFromExecList(pJob, pTask)); SCH_ERR_RET(schRemoveTaskFromExecList(pJob, pTask));
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask)); SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
} }
...@@ -545,7 +541,8 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { ...@@ -545,7 +541,8 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
schDeregisterTaskHb(pJob, pTask); schDeregisterTaskHb(pJob, pTask);
if (SCH_IS_DATA_BIND_TASK(pTask)) { if (SCH_IS_DATA_BIND_TASK(pTask)) {
SCH_SWITCH_EPSET(&pTask->plan->execNode); SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SCH_SWITCH_EPSET(addr);
} else { } else {
SCH_ERR_RET(schSwitchTaskCandidateAddr(pJob, pTask)); SCH_ERR_RET(schSwitchTaskCandidateAddr(pJob, pTask));
} }
...@@ -558,20 +555,21 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { ...@@ -558,20 +555,21 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) { int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
int32_t addNum = 0; int32_t addNum = 0;
int32_t nodeNum = 0; int32_t nodeNum = 0;
if (pJob->nodeList) { if (pJob->nodeList) {
nodeNum = taosArrayGetSize(pJob->nodeList); nodeNum = taosArrayGetSize(pJob->nodeList);
for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) { for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
SQueryNodeLoad *nload = taosArrayGet(pJob->nodeList, i); SQueryNodeLoad *nload = taosArrayGet(pJob->nodeList, i);
SQueryNodeAddr *naddr = &nload->addr; SQueryNodeAddr *naddr = &nload->addr;
if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) { if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) {
SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno); SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
SCH_TASK_DLOG("set %dth candidate addr, id %d, fqdn:%s, port:%d", i, naddr->nodeId, SCH_GET_CUR_EP(naddr)->fqdn, SCH_GET_CUR_EP(naddr)->port); SCH_TASK_DLOG("set %dth candidate addr, id %d, fqdn:%s, port:%d", i, naddr->nodeId, SCH_GET_CUR_EP(naddr)->fqdn,
SCH_GET_CUR_EP(naddr)->port);
++addNum; ++addNum;
} }
...@@ -585,7 +583,6 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) { ...@@ -585,7 +583,6 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
if (NULL != pTask->candidateAddrs) { if (NULL != pTask->candidateAddrs) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -628,16 +625,17 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { ...@@ -628,16 +625,17 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet* pEpSet) { int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) {
if (NULL == pTask->candidateAddrs || 1 != taosArrayGetSize(pTask->candidateAddrs)) { if (NULL == pTask->candidateAddrs || 1 != taosArrayGetSize(pTask->candidateAddrs)) {
SCH_TASK_ELOG("not able to update cndidate addr, addr num %d", (int32_t)(pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs): 0)); SCH_TASK_ELOG("not able to update cndidate addr, addr num %d",
(int32_t)(pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs) : 0));
SCH_ERR_RET(TSDB_CODE_APP_ERROR); SCH_ERR_RET(TSDB_CODE_APP_ERROR);
} }
SQueryNodeAddr* pAddr = taosArrayGet(pTask->candidateAddrs, 0); SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0);
SEp* pOld = &pAddr->epSet.eps[pAddr->epSet.inUse]; SEp *pOld = &pAddr->epSet.eps[pAddr->epSet.inUse];
SEp* pNew = &pEpSet->eps[pEpSet->inUse]; SEp *pNew = &pEpSet->eps[pEpSet->inUse];
SCH_TASK_DLOG("update task ep from %s:%d to %s:%d", pOld->fqdn, pOld->port, pNew->fqdn, pNew->port); SCH_TASK_DLOG("update task ep from %s:%d to %s:%d", pOld->fqdn, pOld->port, pNew->fqdn, pNew->port);
...@@ -647,7 +645,7 @@ int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet* pEpSe ...@@ -647,7 +645,7 @@ int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet* pEpSe
} }
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) { int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) {
int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs); int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);
if (++pTask->candidateIdx >= candidateNum) { if (++pTask->candidateIdx >= candidateNum) {
pTask->candidateIdx = 0; pTask->candidateIdx = 0;
} }
...@@ -655,8 +653,6 @@ int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) { ...@@ -655,8 +653,6 @@ int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) { int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) {
int32_t code = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId)); int32_t code = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId));
if (code) { if (code) {
...@@ -692,33 +688,32 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) { ...@@ -692,33 +688,32 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
SCH_TASK_DLOG("task has been dropped on %d exec nodes", size); SCH_TASK_DLOG("task has been dropped on %d exec nodes", size);
} }
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
int32_t taskNum = (int32_t)taosArrayGetSize(pStatusList);
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList) {
int32_t taskNum = (int32_t)taosArrayGetSize(pStatusList);
SSchTask *pTask = NULL; SSchTask *pTask = NULL;
SSchJob *pJob = NULL; SSchJob *pJob = NULL;
qDebug("%d task status in hb rsp from nodeId:%d, fqdn:%s, port:%d", taskNum, pEpId->nodeId, pEpId->ep.fqdn, pEpId->ep.port); qDebug("%d task status in hb rsp from nodeId:%d, fqdn:%s, port:%d", taskNum, pEpId->nodeId, pEpId->ep.fqdn,
pEpId->ep.port);
for (int32_t i = 0; i < taskNum; ++i) { for (int32_t i = 0; i < taskNum; ++i) {
STaskStatus *pStatus = taosArrayGet(pStatusList, i); STaskStatus *pStatus = taosArrayGet(pStatusList, i);
int32_t code = 0; int32_t code = 0;
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s", qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s", pStatus->queryId, pStatus->taskId,
pStatus->queryId, pStatus->taskId, pStatus->execId, jobTaskStatusStr(pStatus->status)); pStatus->execId, jobTaskStatusStr(pStatus->status));
if (schProcessOnCbBegin(&pJob, &pTask, pStatus->queryId, pStatus->refId, pStatus->taskId)) { if (schProcessOnCbBegin(&pJob, &pTask, pStatus->queryId, pStatus->refId, pStatus->taskId)) {
continue; continue;
} }
if (pStatus->execId != pTask->execId) { if (pStatus->execId != pTask->execId) {
//TODO // TODO
SCH_TASK_DLOG("execId %d mis-match current execId %d", pStatus->execId, pTask->execId); SCH_TASK_DLOG("execId %d mis-match current execId %d", pStatus->execId, pTask->execId);
schProcessOnCbEnd(pJob, pTask, 0); schProcessOnCbEnd(pJob, pTask, 0);
continue; continue;
} }
if (pStatus->status == JOB_TASK_STATUS_FAIL) { if (pStatus->status == JOB_TASK_STATUS_FAIL) {
// RECORD AND HANDLE ERROR!!!! // RECORD AND HANDLE ERROR!!!!
schProcessOnCbEnd(pJob, pTask, 0); schProcessOnCbEnd(pJob, pTask, 0);
...@@ -832,7 +827,6 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) { ...@@ -832,7 +827,6 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
} }
} }
// Note: no more error processing, handled in function internal // Note: no more error processing, handled in function internal
int32_t schLaunchFetchTask(SSchJob *pJob) { int32_t schLaunchFetchTask(SSchJob *pJob) {
int32_t code = 0; int32_t code = 0;
...@@ -851,5 +845,3 @@ _return: ...@@ -851,5 +845,3 @@ _return:
SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code)); SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code));
} }
...@@ -168,11 +168,103 @@ system sh/exec.sh -n dnode3 -s stop -x SIGINT ...@@ -168,11 +168,103 @@ system sh/exec.sh -n dnode3 -s stop -x SIGINT
########################################################
print ===> start dnode1 dnode2 dnode3 dnode4 print ===> start dnode1 dnode2 dnode3 dnode4
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start system sh/exec.sh -n dnode4 -s start
sleep 3000
print =============== query data
sql connect
sql use db
sql select * from ct1
print rows: $rows
print $data00 $data01 $data02
if $rows != 100 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
########################################################
########################################################
print ===> start dnode1 dnode3 dnode4
system sh/exec.sh -n dnode1 -s start
#system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
sleep 7000
print =============== query data
sql connect
sql use db
sql select * from ct1
print rows: $rows
print $data00 $data01 $data02
if $rows != 100 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
#system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
########################################################
########################################################
print ===> start dnode1 dnode2 dnode4
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
#system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
sleep 3000
print =============== query data
sql select * from ct1
print rows: $rows
print $data00 $data01 $data02
if $rows != 100 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
#system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
########################################################
########################################################
print ===> start dnode1 dnode2 dnode3
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
#system sh/exec.sh -n dnode4 -s start
sleep 3000
print =============== query data
sql select * from ct1
print rows: $rows
print $data00 $data01 $data02
if $rows != 100 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
#system sh/exec.sh -n dnode4 -s stop -x SIGINT
########################################################
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册