提交 f6c6083a 编写于 作者: D dapan1121

reschedule timeout task

上级 99be3934
...@@ -58,6 +58,8 @@ typedef struct { ...@@ -58,6 +58,8 @@ typedef struct {
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb); int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb);
int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
......
...@@ -563,7 +563,8 @@ int32_t* taosGetErrno(); ...@@ -563,7 +563,8 @@ int32_t* taosGetErrno();
//scheduler&qworker //scheduler&qworker
#define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501) #define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501)
#define TSDB_CODE_SCH_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2502) #define TSDB_CODE_SCH_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2502)
#define TSDB_CODE_QW_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x2503) #define TSDB_CODE_SCH_IGNORE_ERROR TAOS_DEF_ERROR_CODE(0, 0x2503)
#define TSDB_CODE_QW_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x2504)
//parser //parser
#define TSDB_CODE_PAR_SYNTAX_ERROR TAOS_DEF_ERROR_CODE(0, 0x2600) #define TSDB_CODE_PAR_SYNTAX_ERROR TAOS_DEF_ERROR_CODE(0, 0x2600)
......
...@@ -131,6 +131,7 @@ void destroyTscObj(void *pObj) { ...@@ -131,6 +131,7 @@ void destroyTscObj(void *pObj) {
hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey); hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey);
atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
closeAllRequests(pTscObj->pRequests); closeAllRequests(pTscObj->pRequests);
schedulerStopTransport(pTscObj->pAppInfo->pTransporter);
tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns); tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns);
taosThreadMutexDestroy(&pTscObj->mutex); taosThreadMutexDestroy(&pTscObj->mutex);
taosMemoryFreeClear(pTscObj); taosMemoryFreeClear(pTscObj);
......
...@@ -238,6 +238,8 @@ static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType ...@@ -238,6 +238,8 @@ static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType
switch (qtype) { switch (qtype) {
case QUERY_QUEUE: case QUERY_QUEUE:
vnodePreprocessQueryMsg(pVnode->pImpl, pMsg);
dTrace("msg:%p, put into vnode-query worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); dTrace("msg:%p, put into vnode-query worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
taosWriteQitem(pVnode->pQueryQ, pMsg); taosWriteQitem(pVnode->pQueryQ, pMsg);
break; break;
......
...@@ -56,6 +56,7 @@ int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg); ...@@ -56,6 +56,7 @@ int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp); int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
int32_t vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int32_t vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int32_t vnodePreprocessQueryMsg(SVnode * pVnode, SRpcMsg * pMsg);
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
......
...@@ -189,7 +189,15 @@ _err: ...@@ -189,7 +189,15 @@ _err:
return -1; return -1;
} }
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodePreprocessQueryMsg(SVnode * pVnode, SRpcMsg * pMsg) {
if (TDMT_VND_QUERY != pMsg->msgType) {
return 0;
}
return qWorkerPreprocessQueryMsg(pVnode->pQuery, pMsg);
}
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
vTrace("message in vnode query queue is processing"); vTrace("message in vnode query queue is processing");
SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
switch (pMsg->msgType) { switch (pMsg->msgType) {
......
...@@ -33,7 +33,7 @@ extern "C" { ...@@ -33,7 +33,7 @@ extern "C" {
#define QW_DEFAULT_TASK_NUMBER 10000 #define QW_DEFAULT_TASK_NUMBER 10000
#define QW_DEFAULT_SCH_TASK_NUMBER 10000 #define QW_DEFAULT_SCH_TASK_NUMBER 10000
#define QW_DEFAULT_SHORT_RUN_TIMES 2 #define QW_DEFAULT_SHORT_RUN_TIMES 2
#define QW_DEFAULT_HEARTBEAT_MSEC 3000 #define QW_DEFAULT_HEARTBEAT_MSEC 5000
enum { enum {
QW_PHASE_PRE_QUERY = 1, QW_PHASE_PRE_QUERY = 1,
......
...@@ -23,6 +23,7 @@ extern "C" { ...@@ -23,6 +23,7 @@ extern "C" {
#include "qwInt.h" #include "qwInt.h"
#include "dataSinkMgt.h" #include "dataSinkMgt.h"
int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain); int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain);
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg);
......
...@@ -248,6 +248,41 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo * ...@@ -248,6 +248,41 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == qWorkerMgmt || NULL == pMsg) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
int32_t code = 0;
SSubQueryMsg *msg = pMsg->pCont;
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
msg->sId = be64toh(msg->sId);
msg->queryId = be64toh(msg->queryId);
msg->taskId = be64toh(msg->taskId);
msg->refId = be64toh(msg->refId);
msg->phyLen = ntohl(msg->phyLen);
msg->sqlLen = ntohl(msg->sqlLen);
uint64_t sId = msg->sId;
uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId;
int64_t rId = msg->refId;
SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info};
QW_SCH_TASK_DLOG("prerocessQuery start, handle:%p", pMsg->info.handle);
QW_ERR_RET(qwPrerocessQuery(QW_FPARAMS(), &qwMsg));
QW_SCH_TASK_DLOG("prerocessQuery end, handle:%p", pMsg->info.handle);
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) { int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
......
...@@ -248,11 +248,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu ...@@ -248,11 +248,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase)); QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
if (QW_PHASE_PRE_QUERY == phase) { QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx));
} else {
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
}
QW_LOCK(QW_WRITE, &ctx->lock); QW_LOCK(QW_WRITE, &ctx->lock);
...@@ -285,7 +281,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu ...@@ -285,7 +281,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
break; break;
} }
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING)); QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING));
break; break;
} }
case QW_PHASE_PRE_FETCH: { case QW_PHASE_PRE_FETCH: {
...@@ -437,7 +433,7 @@ _return: ...@@ -437,7 +433,7 @@ _return:
QW_RET(code); QW_RET(code);
} }
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain) { int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
int32_t code = 0; int32_t code = 0;
bool queryRsped = false; bool queryRsped = false;
SSubplan *plan = NULL; SSubplan *plan = NULL;
...@@ -448,6 +444,30 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex ...@@ -448,6 +444,30 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo)); QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo));
QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx));
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_NOT_START));
_return:
if (ctx) {
QW_UPDATE_RSP_CODE(ctx, code);
qwReleaseTaskCtx(mgmt, ctx);
}
QW_RET(TSDB_CODE_SUCCESS);
}
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain) {
int32_t code = 0;
bool queryRsped = false;
SSubplan *plan = NULL;
SQWPhaseInput input = {0};
qTaskInfo_t pTaskInfo = NULL;
DataSinkHandle sinkHandle = NULL;
SQWTaskCtx *ctx = NULL;
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL)); QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL));
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
...@@ -663,7 +683,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -663,7 +683,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
// TODO : TASK ALREADY REMOVED AND A NEW DROP MSG RECEIVED // TODO : TASK ALREADY REMOVED AND A NEW DROP MSG RECEIVED
QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx)); QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
QW_LOCK(QW_WRITE, &ctx->lock); QW_LOCK(QW_WRITE, &ctx->lock);
......
...@@ -32,6 +32,10 @@ extern "C" { ...@@ -32,6 +32,10 @@ extern "C" {
#define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000 #define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000
#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 200 // unit is TSDB_TABLE_NUM_UNIT #define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 200 // unit is TSDB_TABLE_NUM_UNIT
#define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000
#define SCH_MAX_TASK_TIMEOUT_USEC 60000000
#define SCH_TASK_MAX_EXEC_TIMES 5
#define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA #define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA
enum { enum {
...@@ -51,6 +55,7 @@ typedef struct SSchTrans { ...@@ -51,6 +55,7 @@ typedef struct SSchTrans {
typedef struct SSchHbTrans { typedef struct SSchHbTrans {
SRWLatch lock; SRWLatch lock;
int64_t taskNum;
SRpcCtx rpcCtx; SRpcCtx rpcCtx;
SSchTrans trans; SSchTrans trans;
} SSchHbTrans; } SSchHbTrans;
...@@ -114,7 +119,8 @@ typedef struct SSchTaskCallbackParam { ...@@ -114,7 +119,8 @@ typedef struct SSchTaskCallbackParam {
uint64_t queryId; uint64_t queryId;
int64_t refId; int64_t refId;
uint64_t taskId; uint64_t taskId;
void *transport; int32_t execIdx;
void *pTrans;
} SSchTaskCallbackParam; } SSchTaskCallbackParam;
typedef struct SSchHbCallbackParam { typedef struct SSchHbCallbackParam {
...@@ -148,8 +154,16 @@ typedef struct SSchLevel { ...@@ -148,8 +154,16 @@ typedef struct SSchLevel {
SArray *subTasks; // Element is SQueryTask SArray *subTasks; // Element is SQueryTask
} SSchLevel; } SSchLevel;
typedef struct SSchTaskProfile {
int64_t startTs;
int64_t execUseTime[SCH_TASK_MAX_EXEC_TIMES];
int64_t waitTime;
int64_t endTs;
} SSchTaskProfile;
typedef struct SSchTask { typedef struct SSchTask {
uint64_t taskId; // task id uint64_t taskId; // task id
int32_t execIdx; // task current execute try index
SRWLatch lock; // task lock SRWLatch lock; // task lock
SSchLevel *level; // level SSchLevel *level; // level
SSubplan *plan; // subplan SSubplan *plan; // subplan
...@@ -157,16 +171,17 @@ typedef struct SSchTask { ...@@ -157,16 +171,17 @@ typedef struct SSchTask {
int32_t msgLen; // msg length int32_t msgLen; // msg length
int8_t status; // task status int8_t status; // task status
int32_t lastMsgType; // last sent msg type int32_t lastMsgType; // last sent msg type
int32_t tryTimes; // task already tried times int64_t timeoutUsec; // taks timeout useconds before reschedule
SQueryNodeAddr succeedAddr; // task executed success node address SQueryNodeAddr succeedAddr; // task executed success node address
int8_t candidateIdx; // current try condidation index int8_t candidateIdx; // current try condidation index
SArray *candidateAddrs; // condidate node addresses, element is SQueryNodeAddr SArray *candidateAddrs; // condidate node addresses, element is SQueryNodeAddr
SArray *execNodes; // all tried node for current task, element is SSchNodeInfo SHashObj *execNodes; // all tried node for current task, element is SSchNodeInfo
SQueryProfileSummary summary; // task execution summary SSchTaskProfile profile; // task execution profile
int32_t childReady; // child task ready number int32_t childReady; // child task ready number
SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask* SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask*
SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask*
void* handle; // task send handle void* handle; // task send handle
bool registerdHb; // registered in hb
} SSchTask; } SSchTask;
typedef struct SSchJobAttr { typedef struct SSchJobAttr {
...@@ -215,6 +230,24 @@ typedef struct SSchJob { ...@@ -215,6 +230,24 @@ typedef struct SSchJob {
extern SSchedulerMgmt schMgmt; extern SSchedulerMgmt schMgmt;
#define SCH_LOG_TASK_START_TS(_task) \
do { \
int64_t us = taosGetTimestampUs(); \
(_task)->profile.tryUseTime[(_task)->execIdx] = us; \
if (0 == (_task)->execIdx) { \
(_task)->profile.startTs = us; \
} \
} while (0)
#define SCH_LOG_TASK_END_TS(_task) \
do { \
int64_t us = taosGetTimestampUs(); \
(_task)->profile.tryUseTime[(_task)->execIdx] = us - (_task)->profile.tryUseTime[(_task)->execIdx]; \
(_task)->profile.endTs = us; \
} while (0)
#define SCH_TASK_TIMEOUT(_task) ((taosGetTimestampUs() - (_task)->profile.tryUseTime[(_task)->execIdx]) > (_taks)->timeoutUsec)
#define SCH_TASK_READY_FOR_LAUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children)) #define SCH_TASK_READY_FOR_LAUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
#define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1) #define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1)
...@@ -284,7 +317,7 @@ int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask); ...@@ -284,7 +317,7 @@ int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask);
int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask); int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask);
int32_t schFetchFromRemote(SSchJob *pJob); int32_t schFetchFromRemote(SSchJob *pJob);
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode); int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode);
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId); int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray* taskAction);
int32_t schCloneSMsgSendInfo(void *src, void **dst); int32_t schCloneSMsgSendInfo(void *src, void **dst);
int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob); int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob);
void schFreeJobImpl(void *job); void schFreeJobImpl(void *job);
...@@ -301,10 +334,9 @@ int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp); ...@@ -301,10 +334,9 @@ int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp);
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp); int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp);
void schProcessOnDataFetched(SSchJob *job); void schProcessOnDataFetched(SSchJob *job);
int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask); int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask);
int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCode);
void schFreeRpcCtxVal(const void *arg); void schFreeRpcCtxVal(const void *arg);
int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb); int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb);
int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, void *handle); int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execIdx);
int32_t schExecStaticExplainJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, int32_t schExecStaticExplainJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
SSchResInfo *pRes, bool sync); SSchResInfo *pRes, bool sync);
int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
...@@ -318,7 +350,7 @@ int32_t schExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *p ...@@ -318,7 +350,7 @@ int32_t schExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *p
int32_t schAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, SSchResInfo *pRes); int32_t schAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, SSchResInfo *pRes);
int32_t schFetchRows(SSchJob *pJob); int32_t schFetchRows(SSchJob *pJob);
int32_t schAsyncFetchRows(SSchJob *pJob); int32_t schAsyncFetchRows(SSchJob *pJob);
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, void *handle, int32_t rspCode); int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, void *handle, int32_t execIdx);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -28,11 +28,13 @@ FORCE_INLINE int32_t schReleaseJob(int64_t refId) { return taosReleaseRef(schMgm ...@@ -28,11 +28,13 @@ FORCE_INLINE int32_t schReleaseJob(int64_t refId) { return taosReleaseRef(schMgm
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) { int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) {
pTask->plan = pPlan; pTask->plan = pPlan;
pTask->level = pLevel; pTask->level = pLevel;
pTask->execIdx = -1;
pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC;
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START);
pTask->taskId = schGenTaskId(); pTask->taskId = schGenTaskId();
pTask->execNodes = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SSchNodeInfo)); pTask->execNodes = taosHashInit(SCH_MAX_CANDIDATE_EP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
if (NULL == pTask->execNodes) { if (NULL == pTask->execNodes) {
SCH_TASK_ELOG("taosArrayInit %d execNodes failed", SCH_MAX_CANDIDATE_EP_NUM); SCH_TASK_ELOG("taosHashInit %d execNodes failed", SCH_MAX_CANDIDATE_EP_NUM);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
...@@ -123,7 +125,33 @@ _return: ...@@ -123,7 +125,33 @@ _return:
SCH_RET(code); SCH_RET(code);
} }
void schFreeTask(SSchTask *pTask) { void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) {
if (!pTask->registerdHb) {
return;
}
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SQueryNodeEpId epId = {0};
epId.nodeId = addr->nodeId;
SEp* pEp = SCH_GET_CUR_EP(addr);
strcpy(epId.ep.fqdn, pEp->fqdn);
epId.ep.port = pEp->port;
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId));
if (NULL == hb) {
SCH_TASK_ELOG("nodeId %d fqdn %s port %d not in hb connections", epId.nodeId, epId.ep.fqdn, epId.ep.port);
}
atomic_sub_fetch_64(&hb->taskNum, 1);
pTask->registerdHb = false;
}
void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
schDeregisterTaskHb(pJob, pTask);
if (pTask->candidateAddrs) { if (pTask->candidateAddrs) {
taosArrayDestroy(pTask->candidateAddrs); taosArrayDestroy(pTask->candidateAddrs);
} }
...@@ -139,7 +167,7 @@ void schFreeTask(SSchTask *pTask) { ...@@ -139,7 +167,7 @@ void schFreeTask(SSchTask *pTask) {
} }
if (pTask->execNodes) { if (pTask->execNodes) {
taosArrayDestroy(pTask->execNodes); taosHashCleanup(pTask->execNodes);
} }
} }
...@@ -329,45 +357,52 @@ int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) { ...@@ -329,45 +357,52 @@ int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, void *handle) { int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execIdx) {
SSchNodeInfo nodeInfo = {.addr = *addr, .handle = handle}; SSchNodeInfo nodeInfo = {.addr = *addr, .handle = NULL};
if (NULL == taosArrayPush(pTask->execNodes, &nodeInfo)) { if (NULL == taosHashPut(pTask->execNodes, &execIdx, sizeof(execIdx), &nodeInfo, sizeof(nodeInfo))) {
SCH_TASK_ELOG("taosArrayPush nodeInfo to execNodes list failed, errno:%d", errno); SCH_TASK_ELOG("taosHashPut nodeInfo to execNodes failed, errno:%d", errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
SCH_TASK_DLOG("task execNode recorded, handle:%p", handle); SCH_TASK_DLOG("task execNode added, execIdx:%d", execIdx);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle) { int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execIdx) {
if (NULL == pTask->execNodes) { if (NULL == pTask->execNodes) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t num = taosArrayGetSize(pTask->execNodes); taosHashRemove(pTask->execNodes, &execIdx, sizeof(execIdx));
for (int32_t i = 0; i < num; ++i) { if (execIdx != pTask->execIdx) { // ignore it
SSchNodeInfo* pNode = taosArrayGet(pTask->execNodes, i); SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
if (pNode->handle == handle) {
taosArrayRemove(pTask->execNodes, i);
break;
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, void *handle, int32_t rspCode) { int32_t schUpdateTaskExecNode(SSchTask *pTask, void *handle, int32_t execIdx) {
SCH_SET_TASK_HANDLE(pTask, handle); if (taosArrayGetSize(pTask->execNodes) <= 0) {
return TSDB_CODE_SUCCESS;
}
schUpdateTaskExecNodeHandle(pTask, handle, rspCode); SSchNodeInfo *nodeInfo = taosHashGet(pTask->execNodes, &execIdx, sizeof(execIdx));
nodeInfo->handle = handle;
return TSDB_CODE_SUCCESS;
}
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, void *handle, int32_t execIdx) {
if (msgType == TDMT_SCH_LINK_BROKEN) { if (msgType == TDMT_SCH_LINK_BROKEN) {
schDropTaskExecNode(pJob, pTask, handle); SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execIdx));
} }
SCH_SET_TASK_HANDLE(pTask, handle);
schUpdateTaskExecNode(pTask, handle, execIdx);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -672,7 +707,6 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) { ...@@ -672,7 +707,6 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) { int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) {
int8_t status = 0; int8_t status = 0;
++pTask->tryTimes;
if (schJobNeedToStop(pJob, &status)) { if (schJobNeedToStop(pJob, &status)) {
*needRetry = false; *needRetry = false;
...@@ -680,9 +714,9 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo ...@@ -680,9 +714,9 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (pTask->tryTimes >= REQUEST_MAX_TRY_TIMES) { if ((pTask->execIdx + 1) >= SCH_TASK_MAX_EXEC_TIMES) {
*needRetry = false; *needRetry = false;
SCH_TASK_DLOG("task no more retry since reach max try times, tryTimes:%d", pTask->tryTimes); SCH_TASK_DLOG("task no more retry since reach max try times, execIdx:%d", pTask->execIdx);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -694,9 +728,9 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo ...@@ -694,9 +728,9 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
// TODO CHECK epList/condidateList // TODO CHECK epList/condidateList
if (SCH_IS_DATA_SRC_TASK(pTask)) { if (SCH_IS_DATA_SRC_TASK(pTask)) {
if (pTask->tryTimes >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) { if ((pTask->execIdx + 1) >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) {
*needRetry = false; *needRetry = false;
SCH_TASK_DLOG("task no more retry since all ep tried, tryTimes:%d, epNum:%d", pTask->tryTimes, SCH_TASK_DLOG("task no more retry since all ep tried, execIdx:%d, epNum:%d", pTask->execIdx,
SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)); SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -712,7 +746,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo ...@@ -712,7 +746,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
} }
*needRetry = true; *needRetry = true;
SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->tryTimes, errCode, tstrerror(errCode)); SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->execIdx + 1, errCode, tstrerror(errCode));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -728,6 +762,8 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { ...@@ -728,6 +762,8 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask)); SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
} }
schDeregisterTaskHb(pJob, pTask);
if (SCH_IS_DATA_SRC_TASK(pTask)) { if (SCH_IS_DATA_SRC_TASK(pTask)) {
SCH_SWITCH_EPSET(&pTask->plan->execNode); SCH_SWITCH_EPSET(&pTask->plan->execNode);
} else { } else {
...@@ -906,6 +942,8 @@ void schProcessOnDataFetched(SSchJob *job) { ...@@ -906,6 +942,8 @@ void schProcessOnDataFetched(SSchJob *job) {
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) { int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
int8_t status = 0; int8_t status = 0;
SCH_LOG_TASK_END_TS(pTask);
if (schJobNeedToStop(pJob, &status)) { if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_DLOG("task failed not processed cause of job status, job status:%s", jobTaskStatusStr(status)); SCH_TASK_DLOG("task failed not processed cause of job status, job status:%s", jobTaskStatusStr(status));
SCH_RET(atomic_load_32(&pJob->errCode)); SCH_RET(atomic_load_32(&pJob->errCode));
...@@ -989,6 +1027,8 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { ...@@ -989,6 +1027,8 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
SCH_TASK_DLOG("taskOnSuccess, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); SCH_TASK_DLOG("taskOnSuccess, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
SCH_LOG_TASK_END_TS(pTask);
SCH_ERR_JRET(schMoveTaskToSuccList(pJob, pTask, &moved)); SCH_ERR_JRET(schMoveTaskToSuccList(pJob, pTask, &moved));
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED);
...@@ -1105,6 +1145,57 @@ int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRs ...@@ -1105,6 +1145,57 @@ int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRs
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) {
return TSDB_CODE_SUCCESS;
}
}
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList) {
int32_t taskNum = (int32_t)taosArrayGetSize(pStatusList);
SSchTask *pTask = NULL;
qDebug("%d task status in hb rsp from nodeId:%d, fqdn:%s, port:%d", taskNum, pEpId->nodeId, pEpId->ep.fqdn, pEpId->ep.port);
for (int32_t i = 0; i < taskNum; ++i) {
STaskStatus *taskStatus = taosArrayGet(pStatusList, i);
SSchJob *pJob = schAcquireJob(taskStatus->refId);
if (NULL == pJob) {
qWarn("job not found, refId:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64, taskStatus->refId,
taskStatus->queryId, taskStatus->taskId);
// TODO DROP TASK FROM SERVER!!!!
continue;
}
SCH_JOB_DLOG("TID:0x%" PRIx64 " task status in server: %s", taskStatus->taskId, jobTaskStatusStr(taskStatus->status));
pTask = NULL;
schGetTaskInJob(pJob, taskStatus->taskId, &pTask);
if (NULL == pTask) {
// TODO DROP TASK FROM SERVER!!!!
schReleaseJob(taskStatus->refId);
continue;
}
if (taskStatus->status == JOB_TASK_STATUS_FAILED) {
// RECORD AND HANDLE ERROR!!!!
schReleaseJob(taskStatus->refId);
continue;
}
if (taskStatus->status == JOB_TASK_STATUS_NOT_START && SCH_TASK_TIMEOUT(pTask)) {
schRescheduleTask(pJob, pTask);
}
schReleaseJob(taskStatus->refId);
}
}
int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp) { int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp) {
if (rsp->tbFName[0]) { if (rsp->tbFName[0]) {
if (NULL == pJob->execRes.res) { if (NULL == pJob->execRes.res) {
...@@ -1156,25 +1247,16 @@ int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask) { ...@@ -1156,25 +1247,16 @@ int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCode) {
if (rspCode || NULL == pTask->execNodes || taosArrayGetSize(pTask->execNodes) > 1 ||
taosArrayGetSize(pTask->execNodes) <= 0) {
return TSDB_CODE_SUCCESS;
}
SSchNodeInfo *nodeInfo = taosArrayGet(pTask->execNodes, 0);
nodeInfo->handle = handle;
return TSDB_CODE_SUCCESS;
}
int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
int8_t status = 0; int8_t status = 0;
int32_t code = 0; int32_t code = 0;
atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1); atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
pTask->execIdx++;
SCH_LOG_TASK_START_TS(pTask);
if (schJobNeedToStop(pJob, &status)) { if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_DLOG("no need to launch task cause of job status, job status:%s", jobTaskStatusStr(status)); SCH_TASK_DLOG("no need to launch task cause of job status, job status:%s", jobTaskStatusStr(status));
...@@ -1263,19 +1345,20 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) { ...@@ -1263,19 +1345,20 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
return; return;
} }
int32_t size = (int32_t)taosArrayGetSize(pTask->execNodes); int32_t size = (int32_t)taosHashGetSize(pTask->execNodes);
if (size <= 0) { if (size <= 0) {
SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
return; return;
} }
SSchNodeInfo *nodeInfo = NULL; SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL);
for (int32_t i = 0; i < size; ++i) { while (nodeInfo) {
nodeInfo = (SSchNodeInfo *)taosArrayGet(pTask->execNodes, i);
SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle); SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle);
schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_VND_DROP_TASK); schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_VND_DROP_TASK);
nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo);
} }
SCH_TASK_DLOG("task has %d exec address", size); SCH_TASK_DLOG("task has %d exec address", size);
...@@ -1332,7 +1415,7 @@ void schFreeJobImpl(void *job) { ...@@ -1332,7 +1415,7 @@ void schFreeJobImpl(void *job) {
int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks); int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
for (int32_t j = 0; j < numOfTasks; ++j) { for (int32_t j = 0; j < numOfTasks; ++j) {
SSchTask *pTask = taosArrayGet(pLevel->subTasks, j); SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
schFreeTask(pTask); schFreeTask(pJob, pTask);
} }
taosArrayDestroy(pLevel->subTasks); taosArrayDestroy(pLevel->subTasks);
......
...@@ -366,7 +366,12 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in ...@@ -366,7 +366,12 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in
SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode)); SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode));
SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, msgType, pMsg->handle, rspCode)); if (pParam->execIdx != pTask->execIdx) {
SCH_TASK_DLOG("execIdx %d mis-match current execIdx %d", pParam->execIdx, pTask->execIdx);
goto _return;
}
SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, msgType, pMsg->handle, pParam->execIdx));
SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode)); SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
...@@ -426,7 +431,7 @@ int32_t schHandleLinkBrokenCallback(void *param, const SDataBuf *pMsg, int32_t c ...@@ -426,7 +431,7 @@ int32_t schHandleLinkBrokenCallback(void *param, const SDataBuf *pMsg, int32_t c
SSchTrans trans = {.pTrans = hbParam->pTrans, .pHandle = NULL}; SSchTrans trans = {.pTrans = hbParam->pTrans, .pHandle = NULL};
SCH_ERR_RET(schUpdateHbConnection(&hbParam->nodeEpId, &trans)); SCH_ERR_RET(schUpdateHbConnection(&hbParam->nodeEpId, &trans));
SCH_ERR_RET(schBuildAndSendHbMsg(&hbParam->nodeEpId)); SCH_ERR_RET(schBuildAndSendHbMsg(&hbParam->nodeEpId, NULL));
} else { } else {
SCH_ERR_RET(schHandleCallback(param, pMsg, TDMT_SCH_LINK_BROKEN, code)); SCH_ERR_RET(schHandleCallback(param, pMsg, TDMT_SCH_LINK_BROKEN, code));
} }
...@@ -454,7 +459,8 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ...@@ -454,7 +459,8 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, int32_t msgType,
param->queryId = pJob->queryId; param->queryId = pJob->queryId;
param->refId = pJob->refId; param->refId = pJob->refId;
param->taskId = SCH_TASK_ID(pTask); param->taskId = SCH_TASK_ID(pTask);
param->transport = pJob->pTrans; param->pTrans = pJob->pTrans;
param->execIdx = pTask->execIdx;
msgSendInfo->param = param; msgSendInfo->param = param;
msgSendInfo->fp = fp; msgSendInfo->fp = fp;
...@@ -717,7 +723,7 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) { ...@@ -717,7 +723,7 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) {
__async_send_cb_fn_t fp = NULL; __async_send_cb_fn_t fp = NULL;
SCH_ERR_JRET(schGetCallbackFp(msgType, &fp)); SCH_ERR_JRET(schGetCallbackFp(msgType, &fp));
param->transport = trans.pTrans; param->pTrans = trans.pTrans;
pMsgSendInfo->param = param; pMsgSendInfo->param = param;
pMsgSendInfo->msgInfo.pData = msg; pMsgSendInfo->msgInfo.pData = msg;
...@@ -768,10 +774,14 @@ int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) { ...@@ -768,10 +774,14 @@ int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
bool exist = false; bool exist = false;
SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &epId, &exist)); SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &epId, &exist));
if (!exist) { if (!exist) {
SCH_ERR_RET(schBuildAndSendHbMsg(&epId)); SCH_ERR_RET(schBuildAndSendHbMsg(&epId, NULL));
} }
} }
atomic_add_fetch_64(&hb->taskNum, 1);
pTask->registerdHb = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -810,33 +820,12 @@ int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) { ...@@ -810,33 +820,12 @@ int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
} }
SSchTrans trans = {0}; SSchTrans trans = {0};
trans.pTrans = pParam->transport; trans.pTrans = pParam->pTrans;
trans.pHandle = pMsg->handle; trans.pHandle = pMsg->handle;
SCH_ERR_JRET(schUpdateHbConnection(&rsp.epId, &trans)); SCH_ERR_JRET(schUpdateHbConnection(&rsp.epId, &trans));
int32_t taskNum = (int32_t)taosArrayGetSize(rsp.taskStatus); SCH_ERR_JRET(schProcessOnTaskStatusRsp(&rsp.epId, rsp.taskStatus));
qDebug("%d task status in hb rsp, nodeId:%d, fqdn:%s, port:%d", taskNum, rsp.epId.nodeId, rsp.epId.ep.fqdn,
rsp.epId.ep.port);
for (int32_t i = 0; i < taskNum; ++i) {
STaskStatus *taskStatus = taosArrayGet(rsp.taskStatus, i);
SSchJob *pJob = schAcquireJob(taskStatus->refId);
if (NULL == pJob) {
qWarn("job not found, refId:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64, taskStatus->refId,
taskStatus->queryId, taskStatus->taskId);
// TODO DROP TASK FROM SERVER!!!!
continue;
}
// TODO
SCH_JOB_DLOG("TID:0x%" PRIx64 " task status in server: %s", taskStatus->taskId,
jobTaskStatusStr(taskStatus->status));
schReleaseJob(taskStatus->refId);
}
_return: _return:
...@@ -856,7 +845,8 @@ int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) { ...@@ -856,7 +845,8 @@ int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
param->queryId = pJob->queryId; param->queryId = pJob->queryId;
param->refId = pJob->refId; param->refId = pJob->refId;
param->taskId = SCH_TASK_ID(pTask); param->taskId = SCH_TASK_ID(pTask);
param->transport = pJob->pTrans; param->pTrans = pJob->pTrans;
param->taskId = pTask->taskId;
*pParam = param; *pParam = param;
...@@ -1158,7 +1148,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, ...@@ -1158,7 +1148,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
(rpcCtx.args ? &rpcCtx : NULL))); (rpcCtx.args ? &rpcCtx : NULL)));
if (msgType == TDMT_VND_QUERY) { if (msgType == TDMT_VND_QUERY) {
SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr, trans.pHandle)); SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execIdx));
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -176,6 +176,10 @@ int32_t scheduleCancelJob(int64_t job) { ...@@ -176,6 +176,10 @@ int32_t scheduleCancelJob(int64_t job) {
SCH_RET(code); SCH_RET(code);
} }
void schedulerStopTransport(void *pTrans) {
// CLOSE && REMOVE RELATED HB CONNECTIONS
}
void schedulerFreeJob(int64_t job) { void schedulerFreeJob(int64_t job) {
SSchJob *pJob = schAcquireJob(job); SSchJob *pJob = schAcquireJob(job);
if (NULL == pJob) { if (NULL == pJob) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册