提交 914e06e8 编写于 作者: D dapan1121

feat: query redirect

上级 c6440a7a
......@@ -48,7 +48,7 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
// @pSource one execution location of this group of datasource subplans
int32_t qSetSubplanExecutionNode(SSubplan* pSubplan, int32_t groupId, SDownstreamSourceNode* pSource);
int32_t qClearSubplanExecutionNode(SSubplan* pSubplan);
void qClearSubplanExecutionNode(SSubplan* pSubplan);
// Convert to subplan to string for the scheduler to send to the executor
int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen);
......
......@@ -135,6 +135,7 @@ typedef struct STableMetaOutput {
} STableMetaOutput;
typedef struct SDataBuf {
int32_t msgType;
void* pData;
uint32_t len;
void* handle;
......@@ -147,7 +148,7 @@ typedef struct STargetInfo {
int32_t vgId;
} STargetInfo;
typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code);
typedef int32_t (*__async_send_cb_fn_t)(void* param, SDataBuf* pMsg, int32_t code);
typedef int32_t (*__async_exec_fn_t)(void* param);
typedef struct SRequestConnInfo {
......
......@@ -262,7 +262,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
return TSDB_CODE_SUCCESS;
}
static int32_t hbAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code) {
static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
static int32_t emptyRspNum = 0;
if (code != 0) {
taosMemoryFreeClear(param);
......
......@@ -1307,7 +1307,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet);
SDataBuf buf = {.len = pMsg->contLen, .pData = NULL, .handle = pMsg->info.handle, .pEpSet = pEpSet};
SDataBuf buf = {.msgType = pMsg->msgType, .len = pMsg->contLen, .pData = NULL, .handle = pMsg->info.handle, .pEpSet = pEpSet};
if (pMsg->contLen > 0) {
buf.pData = taosMemoryCalloc(1, pMsg->contLen);
......
......@@ -26,7 +26,7 @@ static void setErrno(SRequestObj* pRequest, int32_t code) {
terrno = code;
}
int32_t genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
setErrno(pRequest, code);
......@@ -39,7 +39,7 @@ int32_t genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return code;
}
int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pMsg->pData);
......@@ -116,7 +116,7 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pRequest) {
return pMsgSendInfo;
}
int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t processCreateDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
// todo rsp with the vnode id list
SRequestObj* pRequest = param;
taosMemoryFree(pMsg->pData);
......@@ -132,7 +132,7 @@ int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
return code;
}
int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
if (TSDB_CODE_MND_DB_NOT_EXIST == code) {
......@@ -211,7 +211,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
return 0;
}
int32_t processCreateSTableRsp(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) {
assert(pMsg != NULL && param != NULL);
SRequestObj* pRequest = param;
......@@ -229,7 +229,7 @@ int32_t processCreateSTableRsp(void* param, const SDataBuf* pMsg, int32_t code)
return code;
}
int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t processDropDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);
......@@ -250,7 +250,7 @@ int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
return code;
}
int32_t processAlterStbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);
......@@ -357,7 +357,7 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
return TSDB_CODE_SUCCESS;
}
int32_t processShowVariablesRsp(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);
......
......@@ -372,7 +372,7 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
return 0;
}
int32_t tmqCommitCb2(void* param, const SDataBuf* pBuf, int32_t code) {
int32_t tmqCommitCb2(void* param, SDataBuf* pBuf, int32_t code) {
SMqCommitCbParam2* pParam = (SMqCommitCbParam2*)param;
SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
// push into array
......@@ -862,7 +862,7 @@ void tmqClearUnhandleMsg(tmq_t* tmq) {
}
}
int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
pParam->rspErr = code;
/*tmq_t* tmq = pParam->tmq;*/
......@@ -1116,7 +1116,7 @@ int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
}
#endif
int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
SMqPollCbParam* pParam = (SMqPollCbParam*)param;
SMqClientVg* pVg = pParam->pVg;
SMqClientTopic* pTopic = pParam->pTopic;
......@@ -1368,7 +1368,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
}
#endif
int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
tmq_t* tmq = pParam->tmq;
int8_t async = pParam->async;
......
......@@ -111,7 +111,7 @@ SArray *qmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSP, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH_RSP, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
......
......@@ -86,7 +86,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
return;
case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
case TDMT_VND_FETCH_RSP:
case TDMT_SCH_FETCH_RSP:
qWorkerProcessFetchRsp(NULL, NULL, pRpc, 0);
return;
case TDMT_MND_STATUS_RSP:
......
......@@ -88,7 +88,7 @@ int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) {
case TDMT_SCH_FETCH:
code = qWorkerProcessFetchMsg(pQnode, pQnode->pQuery, pMsg, ts);
break;
case TDMT_VND_FETCH_RSP:
case TDMT_SCH_FETCH_RSP:
code = qWorkerProcessFetchRsp(pQnode, pQnode->pQuery, pMsg, ts);
break;
case TDMT_SCH_CANCEL_TASK:
......
......@@ -246,7 +246,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
switch (pMsg->msgType) {
case TDMT_SCH_FETCH:
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
case TDMT_VND_FETCH_RSP:
case TDMT_SCH_FETCH_RSP:
return qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg, 0);
case TDMT_SCH_CANCEL_TASK:
return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg, 0);
......
......@@ -241,7 +241,7 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
}
int32_t ctgHandleMsgCallback(void *param, const SDataBuf *pMsg, int32_t rspCode) {
int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
SCtgTaskCallbackParam* cbParam = (SCtgTaskCallbackParam*)param;
int32_t code = 0;
......
......@@ -1933,7 +1933,7 @@ typedef struct SFetchRspHandleWrapper {
int32_t sourceIndex;
} SFetchRspHandleWrapper;
int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;
SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
......
......@@ -1322,7 +1322,7 @@ static void getDBNameFromCondition(SNode* pCondition, const char* dbName) {
nodesWalkExpr(pCondition, getDBNameFromConditionWalker, (char*)dbName);
}
static int32_t loadSysTableCallback(void* param, const SDataBuf* pMsg, int32_t code) {
static int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code) {
SOperatorInfo* operator=(SOperatorInfo*) param;
SSysTableScanInfo* pScanResInfo = (SSysTableScanInfo*)operator->info;
if (TSDB_CODE_SUCCESS == code) {
......
......@@ -82,6 +82,7 @@ extern SQWDebug gQWDebug;
typedef struct SQWMsg {
void *node;
int32_t code;
int32_t msgType;
char *msg;
int32_t msgLen;
SRpcHandleInfo connInfo;
......@@ -100,6 +101,7 @@ typedef struct SQWHbInfo {
typedef struct SQWPhaseInput {
int32_t code;
int32_t msgType;
} SQWPhaseInput;
typedef struct SQWPhaseOutput {
......@@ -119,6 +121,7 @@ typedef struct SQWTaskCtx {
int8_t phase;
int8_t taskType;
int8_t explain;
int32_t queryType;
bool queryFetched;
bool queryEnd;
......
......@@ -39,7 +39,7 @@ int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, i
int32_t code);
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete);
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn);
int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo);
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo);
int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num);
void qwFreeFetchRsp(void *msg);
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp);
......
......@@ -43,7 +43,7 @@ void qwFreeFetchRsp(void *msg) {
}
}
int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo) {
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo) {
SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->code = code;
if (tbInfo) {
......@@ -53,7 +53,7 @@ int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code, STbVerInfo*
}
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_QUERY_RSP,
.msgType = rspType,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
......@@ -73,7 +73,7 @@ int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execIn
tSerializeSExplainRsp(pRsp, contLen, &rsp);
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_EXPLAIN_RSP,
.msgType = TDMT_SCH_EXPLAIN_RSP,
.pCont = pRsp,
.contLen = contLen,
.code = 0,
......@@ -92,7 +92,7 @@ int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int
tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus);
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_QUERY_HEARTBEAT_RSP,
.msgType = TDMT_SCH_QUERY_HEARTBEAT_RSP,
.contLen = contLen,
.pCont = pRsp,
.code = code,
......@@ -112,7 +112,7 @@ int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, i
}
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_FETCH_RSP,
.msgType = TDMT_SCH_FETCH_RSP,
.pCont = pRsp,
.contLen = sizeof(*pRsp) + dataLength,
.code = code,
......@@ -129,7 +129,7 @@ int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) {
pRsp->code = code;
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_CANCEL_TASK_RSP,
.msgType = TDMT_SCH_CANCEL_TASK_RSP,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
......@@ -145,7 +145,7 @@ int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) {
pRsp->code = code;
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_DROP_TASK_RSP,
.msgType = TDMT_SCH_DROP_TASK_RSP,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
......@@ -325,9 +325,9 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
uint64_t tId = msg->taskId;
int64_t rId = msg->refId;
SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info};
SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info, .msgType = pMsg->msgType};
char * sql = strndup(msg->msg, msg->sqlLen);
QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, sql);
QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, sql:%s", node, TMSG_INFO(pMsg->msgType), pMsg->info.handle, sql);
QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType, msg->explain, sql));
QW_SCH_TASK_DLOG("processQuery end, node:%p", node);
......
......@@ -417,17 +417,10 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
}
if (QW_PHASE_POST_QUERY == phase) {
#if 0
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY)) {
readyConnection = &ctx->connInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
}
#else
connInfo = ctx->ctrlConnInfo;
rspConnection = &connInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
#endif
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
......@@ -458,8 +451,8 @@ _return:
}
if (rspConnection) {
qwBuildAndSendQueryRsp(rspConnection, code, ctx ? &ctx->tbInfo : NULL);
QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", rspConnection->handle, code, tstrerror(code));
qwBuildAndSendQueryRsp(input->msgType + 1, rspConnection, code, ctx ? &ctx->tbInfo : NULL);
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", rspConnection->handle, code, tstrerror(code));
}
if (ctx) {
......@@ -530,8 +523,9 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
atomic_store_8(&ctx->taskType, taskType);
atomic_store_8(&ctx->explain, explain);
ctx->taskType = taskType;
ctx->explain = explain;
ctx->queryType = qwMsg->msgType;
QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
......@@ -571,6 +565,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
_return:
input.code = code;
input.msgType = qwMsg->msgType;
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
// if (!queryRsped) {
......
......@@ -202,7 +202,8 @@ void qwtSendReqToDnode(void* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq)
void qwtRpcSendResponse(const SRpcMsg *pRsp) {
switch (pRsp->msgType) {
case TDMT_VND_QUERY_RSP: {
case TDMT_SCH_QUERY_RSP:
case TDMT_SCH_MERGE_QUERY_RSP: {
SQueryTableRsp *rsp = (SQueryTableRsp *)pRsp->pCont;
if (pRsp->code) {
......@@ -213,7 +214,7 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
rpcFreeCont(rsp);
break;
}
case TDMT_VND_FETCH_RSP: {
case TDMT_SCH_FETCH_RSP: {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)pRsp->pCont;
if (0 == pRsp->code && 0 == rsp->completed) {
......@@ -229,7 +230,7 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
break;
}
case TDMT_VND_DROP_TASK_RSP: {
case TDMT_SCH_DROP_TASK_RSP: {
STaskDropRsp *rsp = (STaskDropRsp *)pRsp->pCont;
rpcFreeCont(rsp);
......
......@@ -353,7 +353,7 @@ void schFreeJobImpl(void *job);
int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx);
int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask);
int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans);
int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code);
int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code);
void schFreeRpcCtx(SRpcCtx *pCtx);
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp);
bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus);
......@@ -383,7 +383,8 @@ char* schGetOpStr(SCH_OP_TYPE type);
int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync);
int32_t schInitJob(SSchedulerReq *pReq, SSchJob **pSchJob);
int32_t schSetJobQueryRes(SSchJob* pJob, SQueryResult* pRes);
int32_t schUpdateTaskCandidateAddr(SSchTask *pTask, SEpSet* pEpSet);
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet* pEpSet);
int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode);
#ifdef __cplusplus
......
......@@ -682,9 +682,9 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t schUpdateTaskCandidateAddr(SSchTask *pTask, SEpSet* pEpSet) {
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet* pEpSet) {
if (NULL == pTask->candidateAddrs || 1 != taosArrayGetSize(pTask->candidateAddrs)) {
SCH_TASK_ELOG("not able to update cndidate addr, addr num %d", (pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs): 0));
SCH_TASK_ELOG("not able to update cndidate addr, addr num %d", (int32_t)(pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs): 0));
SCH_ERR_RET(TSDB_CODE_APP_ERROR);
}
......@@ -1685,7 +1685,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, int32_t rspCode) {
return TSDB_CODE_SUCCESS;
}
SCH_TASK_DLOG("task will be redirected now, status:%d", SCH_GET_TASK_STATUS_STR(pTask));
SCH_TASK_DLOG("task will be redirected now, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
schDropTaskOnExecNode(pJob, pTask);
taosHashClear(pTask->execNodes);
......@@ -1735,7 +1735,7 @@ _return:
SCH_RET(code);
}
int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, int32_t msgType, SDataBuf* pData, int32_t rspCode) {
int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode) {
int32_t code = 0;
if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) {
......@@ -1744,7 +1744,7 @@ int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, int32_t msgType, SData
SCH_ERR_JRET(rspCode);
}
SCH_ERR_JRET(schUpdateTaskCandidateAddr(pTask, pData->pEpSet));
SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet));
}
schDoTaskRedirect(pJob, pTask, rspCode);
......
......@@ -28,9 +28,10 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
int32_t reqMsgType = msgType - 1;
switch (msgType) {
case TDMT_SCH_LINK_BROKEN:
case TDMT_VND_EXPLAIN_RSP:
case TDMT_SCH_EXPLAIN_RSP:
return TSDB_CODE_SUCCESS;
case TDMT_VND_QUERY_RSP: // query_rsp may be processed later than ready_rsp
case TDMT_SCH_MERGE_QUERY_RSP:
case TDMT_SCH_QUERY_RSP: // query_rsp may be processed later than ready_rsp
if (lastMsgType != reqMsgType && -1 != lastMsgType) {
SCH_TASK_DLOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
TMSG_INFO(msgType));
......@@ -43,7 +44,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
return TSDB_CODE_SUCCESS;
case TDMT_VND_FETCH_RSP:
case TDMT_SCH_FETCH_RSP:
if (lastMsgType != reqMsgType && -1 != lastMsgType) {
SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
TMSG_INFO(msgType));
......@@ -238,7 +239,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
break;
}
case TDMT_VND_QUERY_RSP: {
case TDMT_SCH_QUERY_RSP:
case TDMT_SCH_MERGE_QUERY_RSP: {
SQueryTableRsp *rsp = (SQueryTableRsp *)msg;
SCH_ERR_JRET(rspCode);
......@@ -255,7 +257,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
break;
}
case TDMT_VND_EXPLAIN_RSP: {
case TDMT_SCH_EXPLAIN_RSP: {
SCH_ERR_JRET(rspCode);
if (NULL == msg) {
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
......@@ -285,7 +287,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
}
break;
}
case TDMT_VND_FETCH_RSP: {
case TDMT_SCH_FETCH_RSP: {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
SCH_ERR_JRET(rspCode);
......@@ -333,8 +335,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
schProcessOnDataFetched(pJob);
break;
}
case TDMT_VND_DROP_TASK_RSP: {
// SHOULD NEVER REACH HERE
case TDMT_SCH_DROP_TASK_RSP: {
// NEVER REACH HERE
SCH_TASK_ELOG("invalid status to handle drop task rsp, refId:0x%" PRIx64, pJob->refId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
break;
......@@ -358,8 +360,9 @@ _return:
}
int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode) {
int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
int32_t msgType = pMsg->msgType;
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
SSchTask *pTask = NULL;
......@@ -393,8 +396,8 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in
SCH_ERR_JRET(schValidateReceivedMsgType(pJob, pTask, msgType));
if (NEED_SCHEDULER_REDIRECT_ERROR(rspCode) || ((rspCode == TSDB_CODE_RPC_NETWORK_UNAVAIL) && msgSize > 0)) {
code = schHandleRedirect(pJob, pTask, msgType, pMsg, rspCode);
if (NEED_SCHEDULER_REDIRECT_ERROR(rspCode) || ((rspCode == TSDB_CODE_RPC_NETWORK_UNAVAIL) && pMsg->len > 0)) {
code = schHandleRedirect(pJob, pTask, (SDataBuf *)pMsg, rspCode);
goto _return;
}
......@@ -416,46 +419,14 @@ _return:
SCH_RET(code);
}
int32_t schHandleSubmitCallback(void *param, const SDataBuf *pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_SUBMIT_RSP, code);
}
int32_t schHandleCreateTbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_CREATE_TABLE_RSP, code);
}
int32_t schHandleDropTbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_DROP_TABLE_RSP, code);
}
int32_t schHandleAlterTbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_ALTER_TABLE_RSP, code);
}
int32_t schHandleQueryCallback(void *param, const SDataBuf *pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code);
}
int32_t schHandleDeleteCallback(void *param, const SDataBuf *pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_DELETE_RSP, code);
}
int32_t schHandleFetchCallback(void *param, const SDataBuf *pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code);
}
int32_t schHandleExplainCallback(void *param, const SDataBuf *pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_EXPLAIN_RSP, code);
}
int32_t schHandleDropCallback(void *param, const SDataBuf *pMsg, int32_t code) {
int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) {
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " drop task rsp received, code:0x%x", pParam->queryId, pParam->taskId, code);
taosMemoryFreeClear(param);
return TSDB_CODE_SUCCESS;
}
int32_t schHandleLinkBrokenCallback(void *param, const SDataBuf *pMsg, int32_t code) {
int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) {
SSchCallbackParamHeader *head = (SSchCallbackParamHeader *)param;
rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT);
......@@ -468,7 +439,7 @@ int32_t schHandleLinkBrokenCallback(void *param, const SDataBuf *pMsg, int32_t c
SCH_ERR_RET(schBuildAndSendHbMsg(&hbParam->nodeEpId, NULL));
} else {
SCH_ERR_RET(schHandleCallback(param, pMsg, TDMT_SCH_LINK_BROKEN, code));
SCH_ERR_RET(schHandleCallback(param, pMsg, code));
}
return TSDB_CODE_SUCCESS;
......@@ -564,29 +535,15 @@ _return:
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
switch (msgType) {
case TDMT_VND_CREATE_TABLE:
*fp = schHandleCreateTbCallback;
break;
case TDMT_VND_DROP_TABLE:
*fp = schHandleDropTbCallback;
break;
case TDMT_VND_ALTER_TABLE:
*fp = schHandleAlterTbCallback;
break;
case TDMT_VND_SUBMIT:
*fp = schHandleSubmitCallback;
break;
case TDMT_SCH_QUERY:
case TDMT_SCH_MERGE_QUERY:
*fp = schHandleQueryCallback;
break;
case TDMT_VND_DELETE:
*fp = schHandleDeleteCallback;
break;
case TDMT_SCH_EXPLAIN:
*fp = schHandleExplainCallback;
break;
case TDMT_SCH_FETCH:
*fp = schHandleFetchCallback;
*fp = schHandleCallback;
break;
case TDMT_SCH_DROP_TASK:
*fp = schHandleDropCallback;
......@@ -700,7 +657,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
int32_t msgType = TDMT_VND_QUERY_HEARTBEAT_RSP;
int32_t msgType = TDMT_SCH_QUERY_HEARTBEAT_RSP;
__async_send_cb_fn_t fp = NULL;
SCH_ERR_JRET(schGetCallbackFp(TDMT_SCH_QUERY_HEARTBEAT, &fp));
......@@ -730,7 +687,7 @@ _return:
SCH_RET(code);
}
int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) {
SSchedulerHbRsp rsp = {0};
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
......@@ -794,7 +751,7 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, NULL, 0, TDMT_SCH_EXPLAIN, &trans, false, &pExplainMsgSendInfo));
int32_t msgType = TDMT_VND_EXPLAIN_RSP;
int32_t msgType = TDMT_SCH_EXPLAIN_RSP;
SRpcCtxVal ctxVal = {.val = pExplainMsgSendInfo, .clone = schCloneSMsgSendInfo};
if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
......
......@@ -412,7 +412,7 @@ void *schtCreateFetchRspThread(void *param) {
rsp->completed = 1;
rsp->numOfRows = 10;
code = schHandleResponseMsg(pJob, pJob->fetchTask, TDMT_VND_FETCH_RSP, (char *)rsp, sizeof(*rsp), 0);
code = schHandleResponseMsg(pJob, pJob->fetchTask, TDMT_SCH_FETCH_RSP, (char *)rsp, sizeof(*rsp), 0);
schReleaseJob(job);
......@@ -445,7 +445,7 @@ void *schtFetchRspThread(void *aa) {
dataBuf.pData = rsp;
dataBuf.len = sizeof(*rsp);
code = schHandleCallback(param, &dataBuf, TDMT_VND_FETCH_RSP, 0);
code = schHandleCallback(param, &dataBuf, TDMT_SCH_FETCH_RSP, 0);
assert(code == 0 || code);
}
......@@ -547,7 +547,7 @@ void* schtRunJobThread(void *aa) {
dataBuf.pData = &rsp;
dataBuf.len = sizeof(rsp);
code = schHandleCallback(param, &dataBuf, TDMT_VND_QUERY_RSP, 0);
code = schHandleCallback(param, &dataBuf, TDMT_SCH_QUERY_RSP, 0);
assert(code == 0 || code);
pIter = taosHashIterate(execTasks, pIter);
......@@ -566,7 +566,7 @@ void* schtRunJobThread(void *aa) {
dataBuf.pData = &rsp;
dataBuf.len = sizeof(rsp);
code = schHandleCallback(param, &dataBuf, TDMT_VND_QUERY_RSP, 0);
code = schHandleCallback(param, &dataBuf, TDMT_SCH_QUERY_RSP, 0);
assert(code == 0 || code);
pIter = taosHashIterate(execTasks, pIter);
......@@ -677,7 +677,7 @@ TEST(queryTest, normalCase) {
SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(pJob->execTasks, pIter);
......@@ -688,7 +688,7 @@ TEST(queryTest, normalCase) {
SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(pJob->execTasks, pIter);
......@@ -780,7 +780,7 @@ TEST(queryTest, readyFirstCase) {
SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(pJob->execTasks, pIter);
......@@ -791,7 +791,7 @@ TEST(queryTest, readyFirstCase) {
SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(pJob->execTasks, pIter);
......@@ -898,7 +898,7 @@ TEST(queryTest, flowCtrlCase) {
if (task->lastMsgType == TDMT_SCH_QUERY) {
SQueryTableRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
} else {
......
......@@ -1114,4 +1114,6 @@ _return2:
rpcFreeCont(msg->pCont);
}
int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; }
#endif
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册