未验证 提交 4097b5ae 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #14986 from taosdata/fix/fixmsgorder

fix: fix msg disorder and other issues
......@@ -135,7 +135,7 @@ void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg
int32_t rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
void rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn);
int64_t rpcAllocHandle();
void* rpcAllocHandle();
#ifdef __cplusplus
}
......
......@@ -171,6 +171,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
pTscObj->pAppInfo->totalDnodes = pRsp->query->totalDnodes;
pTscObj->pAppInfo->onlineDnodes = pRsp->query->onlineDnodes;
pTscObj->connId = pRsp->query->connId;
tscTrace("conn %p hb rsp, dnodes %d/%d", pTscObj->connId, pTscObj->pAppInfo->onlineDnodes, pTscObj->pAppInfo->totalDnodes);
if (pRsp->query->killRid) {
tscDebug("request rid %" PRIx64 " need to be killed now", pRsp->query->killRid);
......@@ -294,6 +295,7 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
if (code != 0) {
(*pInst)->onlineDnodes = ((*pInst)->totalDnodes ? 0 : -1);
tscDebug("hb rsp error %s, update server status %d/%d", tstrerror(code), (*pInst)->onlineDnodes, (*pInst)->totalDnodes);
}
if (rspNum) {
......
......@@ -1276,7 +1276,12 @@ int32_t doProcessMsgFromServer(void* param) {
assert(pMsg->info.ahandle != NULL);
STscObj* pTscObj = NULL;
tscDebug("processMsgFromServer message: %s, code: %s", TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code));
STraceId* trace = &pMsg->info.traceId;
char tbuf[40] = {0};
TRACE_TO_STR(trace, tbuf);
tscDebug("processMsgFromServer handle %p, message: %s, code: %s, gtid: %s", pMsg->info.handle, TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code),
tbuf);
if (pSendInfo->requestObjRefId != 0) {
SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
......
......@@ -81,6 +81,7 @@ int32_t qmPutRpcMsgToQueue(SQnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
taosWriteQitem(pMgmt->queryWorker.queue, pMsg);
return 0;
case READ_QUEUE:
case FETCH_QUEUE:
dTrace("msg:%p, is created and will put into qnode-fetch queue", pMsg);
taosWriteQitem(pMgmt->fetchWorker.queue, pMsg);
return 0;
......
......@@ -163,7 +163,7 @@ int32_t ctgInitGetQnodeTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type));
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type));
return TSDB_CODE_SUCCESS;
}
......@@ -178,7 +178,7 @@ int32_t ctgInitGetDnodeTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type));
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type));
return TSDB_CODE_SUCCESS;
}
......@@ -264,7 +264,7 @@ int32_t ctgInitGetSvrVerTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " [%dth] task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type));
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type));
return TSDB_CODE_SUCCESS;
}
......
......@@ -1980,6 +1980,7 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
qDebug("%s fetch rsp received, index:%d, rows:%d", pSourceDataInfo->taskId, index, pRsp->numOfRows);
} else {
pSourceDataInfo->code = code;
qDebug("%s fetch rsp received, index:%d, error:%d", pSourceDataInfo->taskId, index, tstrerror(code));
}
pSourceDataInfo->status = EX_SOURCE_DATA_READY;
......
......@@ -75,7 +75,9 @@ typedef struct SQWDebug {
bool lockEnable;
bool statusEnable;
bool dumpEnable;
bool tmp;
bool sleepSimulate;
bool deadSimulate;
bool redirectSimulate;
} SQWDebug;
extern SQWDebug gQWDebug;
......@@ -130,12 +132,11 @@ typedef struct SQWTaskCtx {
int8_t taskType;
int8_t explain;
int8_t needFetch;
int32_t queryType;
int32_t msgType;
int32_t fetchType;
int32_t execId;
bool queryRsped;
bool queryFetched;
bool queryEnd;
bool queryContinue;
bool queryInQueue;
......@@ -228,6 +229,7 @@ typedef struct SQWorkerMgmt {
#define QW_SET_EVENT_PROCESSED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_PROCESSED)
#define QW_GET_PHASE(ctx) atomic_load_8(&(ctx)->phase)
#define QW_SET_PHASE(ctx, _value) do { if ((_value) != QW_PHASE_PRE_FETCH && (_value) != QW_PHASE_POST_FETCH) { atomic_store_8(&(ctx)->phase, _value); } } while (0)
#define QW_SET_RSP_CODE(ctx, code) atomic_store_32(&(ctx)->rspCode, code)
#define QW_UPDATE_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code)
......@@ -362,7 +364,7 @@ int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx);
int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx);
int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx);
void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx);
int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx);
int32_t qwKillTaskHandle(SQWTaskCtx *ctx);
int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status);
int32_t qwDropTask(QW_FPARAMS_DEF);
void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx);
......@@ -372,13 +374,15 @@ int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type);
int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type);
void qwClearExpiredSch(SQWorker *mgmt, SArray* pExpiredSch);
int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch);
void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx);
void qwFreeTaskCtx(SQWTaskCtx *ctx);
void qwDbgDumpMgmtInfo(SQWorker *mgmt);
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore);
int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet);
int32_t qwAddTaskCtx(QW_FPARAMS_DEF);
int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx);
void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped);
void qwDbgSimulateSleep(void);
void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped);
#ifdef __cplusplus
......
......@@ -40,11 +40,13 @@ void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComple
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn);
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx);
int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num);
int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code);
void qwFreeFetchRsp(void *msg);
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp);
int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *rsp, int32_t code);
int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn);
int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *pConn);
int32_t qwBuildAndSendDropMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn);
#ifdef __cplusplus
}
......
......@@ -9,7 +9,7 @@
#include "tmsg.h"
#include "tname.h"
SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = false, .tmp = false};
SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = false, .redirectSimulate = false, .deadSimulate = false, .sleepSimulate = false};
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) {
if (!gQWDebug.statusEnable) {
......@@ -147,8 +147,17 @@ int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int
return TSDB_CODE_SUCCESS;
}
int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx) {
if (gQWDebug.tmp) {
void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) {
static int32_t ignoreTime = 0;
if (*rsped) {
return;
}
if (gQWDebug.redirectSimulate) {
if (++ignoreTime <= 10) {
return;
}
if (TDMT_SCH_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) {
SEpSet epSet = {0};
epSet.inUse = 1;
......@@ -162,42 +171,94 @@ int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx) {
ctx->phase = QW_PHASE_POST_QUERY;
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, &epSet);
return TSDB_CODE_SUCCESS;
*rsped = true;
return;
}
if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) {
ctx->phase = QW_PHASE_POST_QUERY;
QW_SET_PHASE(ctx, QW_PHASE_POST_QUERY);
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL);
return TSDB_CODE_SUCCESS;
*rsped = true;
return;
}
if ((TDMT_SCH_FETCH == qwMsg->msgType) && (0 == taosRand() % 9)) {
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL);
*rsped = true;
return;
}
}
}
return TSDB_CODE_SUCCESS;
void qwDbgSimulateSleep(void) {
if (!gQWDebug.sleepSimulate) {
return;
}
static int32_t ignoreTime = 0;
if (++ignoreTime > 10) {
taosSsleep(taosRand() % 20);
}
}
void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped) {
if (!gQWDebug.deadSimulate) {
return;
}
if (*rsped) {
return;
}
static int32_t ignoreTime = 0;
if (++ignoreTime > 10 && 0 == taosRand() % 9) {
SRpcHandleInfo *pConn = ((ctx->msgType == TDMT_SCH_FETCH || ctx->msgType == TDMT_SCH_MERGE_FETCH) ? &ctx->dataConnInfo : &ctx->ctrlConnInfo);
qwBuildAndSendErrorRsp(ctx->msgType + 1, pConn, TSDB_CODE_RPC_BROKEN_LINK);
qwBuildAndSendDropMsg(QW_FPARAMS(), pConn);
*rsped = true;
return;
}
}
int32_t qwDbgEnableDebug(char *option) {
if (0 == strcasecmp(option, "lock")) {
gQWDebug.lockEnable = true;
qDebug("qw lock debug enabled");
qError("qw lock debug enabled");
return TSDB_CODE_SUCCESS;
}
if (0 == strcasecmp(option, "status")) {
gQWDebug.statusEnable = true;
qDebug("qw status debug enabled");
qError("qw status debug enabled");
return TSDB_CODE_SUCCESS;
}
if (0 == strcasecmp(option, "dump")) {
gQWDebug.dumpEnable = true;
qDebug("qw dump debug enabled");
qError("qw dump debug enabled");
return TSDB_CODE_SUCCESS;
}
if (0 == strcasecmp(option, "sleep")) {
gQWDebug.sleepSimulate = true;
qError("qw sleep debug enabled");
return TSDB_CODE_SUCCESS;
}
if (0 == strcasecmp(option, "dead")) {
gQWDebug.sleepSimulate = true;
qError("qw dead debug enabled");
return TSDB_CODE_SUCCESS;
}
if (0 == strcasecmp(option, "tmp")) {
gQWDebug.tmp = true;
qDebug("qw tmp debug enabled");
if (0 == strcasecmp(option, "redirect")) {
gQWDebug.redirectSimulate = true;
qError("qw redirect debug enabled");
return TSDB_CODE_SUCCESS;
}
......
......@@ -43,6 +43,20 @@ void qwFreeFetchRsp(void *msg) {
}
}
int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code) {
SRpcMsg rpcRsp = {
.msgType = rspType,
.pCont = NULL,
.contLen = 0,
.code = code,
.info = *pConn,
};
tmsgSendRsp(&rpcRsp);
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx) {
STbVerInfo* tbInfo = ctx ? &ctx->tbInfo : NULL;
int64_t affectedRows = ctx ? ctx->affectedRows : 0;
......@@ -184,7 +198,6 @@ int32_t qwBuildAndSendDropMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
int32_t code = tmsgPutToQueue(&mgmt->msgCb, FETCH_QUEUE, &pNewMsg);
if (TSDB_CODE_SUCCESS != code) {
QW_SCH_TASK_ELOG("put drop task msg to queue failed, vgId:%d, code:%s", mgmt->nodeId, tstrerror(code));
rpcFreeCont(req);
QW_ERR_RET(code);
}
......@@ -374,8 +387,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
qwMsg.msgInfo.needFetch = msg->needFetch;
char * sql = strndup(msg->msg, msg->sqlLen);
QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, sql:%s", node, TMSG_INFO(pMsg->msgType), pMsg->info.handle, sql);
QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, SQL:%s", node, TMSG_INFO(pMsg->msgType), pMsg->info.handle, sql);
QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, sql));
QW_SCH_TASK_DLOG("processQuery end, node:%p", node);
......
......@@ -270,7 +270,7 @@ int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { return qwAddTask
void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx) { taosHashRelease(mgmt->ctxHash, ctx); }
void qwFreeTaskHandle(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle) {
void qwFreeTaskHandle(qTaskInfo_t *taskHandle) {
// Note: free/kill may in RC
qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) {
......@@ -278,7 +278,7 @@ void qwFreeTaskHandle(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle) {
}
}
int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
int32_t qwKillTaskHandle(SQWTaskCtx *ctx) {
int32_t code = 0;
// Note: free/kill may in RC
qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle);
......@@ -290,7 +290,7 @@ int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
QW_RET(code);
}
void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
void qwFreeTaskCtx(SQWTaskCtx *ctx) {
if (ctx->ctrlConnInfo.handle) {
tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER);
}
......@@ -300,7 +300,7 @@ void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
// NO need to release dataConnInfo
qwFreeTaskHandle(QW_FPARAMS(), &ctx->taskHandle);
qwFreeTaskHandle(&ctx->taskHandle);
if (ctx->sinkHandle) {
dsDestroyDataSinker(ctx->sinkHandle);
......@@ -336,7 +336,7 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
}
qwFreeTaskCtx(QW_FPARAMS(), &octx);
qwFreeTaskCtx(&octx);
QW_TASK_DLOG_E("task ctx dropped");
......@@ -463,13 +463,21 @@ void qwDestroyImpl(void *pMgmt) {
mgmt->hbTimer = NULL;
taosTmrCleanUp(mgmt->timer);
// TODO STOP ALL QUERY
// TODO FREE ALL
uint64_t qId, tId;
int32_t eId;
void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
while (pIter) {
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
void *key = taosHashGetKey(pIter, NULL);
QW_GET_QTID(key, qId, tId, eId);
qwFreeTaskCtx(ctx);
QW_TASK_DLOG_E("task ctx freed");
pIter = taosHashIterate(mgmt->ctxHash, pIter);
}
taosHashCleanup(mgmt->ctxHash);
void *pIter = taosHashIterate(mgmt->schHash, NULL);
pIter = taosHashIterate(mgmt->schHash, NULL);
while (pIter) {
SQWSchStatus *sch = (SQWSchStatus *)pIter;
qwDestroySchStatus(sch);
......
......@@ -83,6 +83,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
// if *taskHandle is NULL, it's killed right now
if (taskHandle) {
qwDbgSimulateSleep();
code = qExecTask(taskHandle, &pRes, &useconds);
if (code) {
if (code != TSDB_CODE_OPS_NOT_SUPPORT) {
......@@ -293,11 +294,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
QW_LOCK(QW_WRITE, &ctx->lock);
if (QW_PHASE_PRE_FETCH == phase) {
atomic_store_8((int8_t *)&ctx->queryFetched, true);
} else {
atomic_store_8(&ctx->phase, phase);
}
QW_SET_PHASE(ctx, phase);
if (atomic_load_8((int8_t *)&ctx->queryEnd)) {
QW_TASK_ELOG_E("query already end");
......@@ -370,6 +367,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
}
_return:
if (ctx) {
QW_UPDATE_RSP_CODE(ctx, code);
......@@ -390,7 +388,6 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
int32_t code = 0;
SQWTaskCtx *ctx = NULL;
SRpcHandleInfo connInfo = {0};
SRpcHandleInfo *rspConnection = NULL;
QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
......@@ -403,13 +400,6 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
}
if (QW_PHASE_POST_QUERY == phase) {
connInfo = ctx->ctrlConnInfo;
rspConnection = &connInfo;
ctx->queryRsped = true;
}
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
if (QW_PHASE_POST_FETCH == phase) {
QW_TASK_WLOG("drop received at wrong phase %s", qwPhaseStr(phase));
......@@ -437,17 +427,23 @@ _return:
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PART_SUCC);
}
if (rspConnection) {
qwBuildAndSendQueryRsp(input->msgType + 1, rspConnection, code, ctx);
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", rspConnection->handle, code, tstrerror(code));
if (QW_PHASE_POST_QUERY == phase && ctx) {
ctx->queryRsped = true;
bool rsped = false;
SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo};
qwDbgSimulateRedirect(&qwMsg, ctx, &rsped);
qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped);
if (!rsped) {
qwBuildAndSendQueryRsp(input->msgType + 1, &ctx->ctrlConnInfo, code, ctx);
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
}
}
if (ctx) {
QW_UPDATE_RSP_CODE(ctx, code);
if (QW_PHASE_POST_FETCH != phase) {
atomic_store_8(&ctx->phase, phase);
}
QW_SET_PHASE(ctx, phase);
QW_UNLOCK(QW_WRITE, &ctx->lock);
qwReleaseTaskCtx(mgmt, ctx);
......@@ -488,8 +484,6 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));
qwDbgResponseRedirect(qwMsg, ctx);
_return:
if (ctx) {
......@@ -517,7 +511,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql) {
ctx->taskType = qwMsg->msgInfo.taskType;
ctx->explain = qwMsg->msgInfo.explain;
ctx->needFetch = qwMsg->msgInfo.needFetch;
ctx->queryType = qwMsg->msgType;
ctx->msgType = qwMsg->msgType;
//QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
......@@ -636,8 +630,8 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_LOCK(QW_WRITE, &ctx->lock);
if (queryEnd || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) {
// Note: if necessary, fetch need to put cquery to queue again
atomic_store_8(&ctx->phase, 0);
// Note: query is not running anymore
QW_SET_PHASE(ctx, 0);
QW_UNLOCK(QW_WRITE, &ctx->lock);
break;
}
......@@ -662,14 +656,13 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
ctx->queryType = qwMsg->msgType;
ctx->msgType = qwMsg->msgType;
ctx->dataConnInfo = qwMsg->connInfo;
SOutputData sOutput = {0};
QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
if (NULL == rsp) {
ctx->dataConnInfo = qwMsg->connInfo;
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH);
} else {
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
......@@ -714,9 +707,16 @@ _return:
}
if (code || rsp) {
qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
QW_TASK_DLOG("%s send, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1), qwMsg->connInfo.handle, code, tstrerror(code),
dataLen);
bool rsped = false;
if (ctx) {
qwDbgSimulateRedirect(qwMsg, ctx, &rsped);
qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped);
}
if (!rsped) {
qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
QW_TASK_DLOG("%s send, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1), qwMsg->connInfo.handle, code, tstrerror(code),
dataLen);
}
}
QW_RET(TSDB_CODE_SUCCESS);
......@@ -724,7 +724,7 @@ _return:
int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
int32_t code = 0;
bool rsped = false;
bool dropped = false;
SQWTaskCtx *ctx = NULL;
bool locked = false;
......@@ -740,18 +740,14 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
}
if (QW_QUERY_RUNNING(ctx)) {
QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
QW_ERR_JRET(qwKillTaskHandle(ctx));
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP);
} else if (ctx->phase > 0) {
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
rsped = true;
} else {
// task not started
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
dropped = true;
}
if (!rsped) {
ctx->ctrlConnInfo = qwMsg->connInfo;
if (!dropped) {
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
}
......@@ -954,7 +950,7 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) {
_return:
qwFreeTaskCtx(QW_FPARAMS(), &ctx);
qwFreeTaskCtx(&ctx);
QW_RET(TSDB_CODE_SUCCESS);
}
......
......@@ -55,13 +55,11 @@ typedef enum {
#define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000
#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 200 // unit is TSDB_TABLE_NUM_UNIT
#define SCHEDULE_DEFAULT_POLICY SCH_LOAD_SEQ
#define SCHEDULE_DEFAULT_MAX_NODE_NUM 20
#define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000
#define SCH_MAX_TASK_TIMEOUT_USEC 60000000
#define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA
#define SCH_DEFAULT_MAX_RETRY_NUM 6
typedef struct SSchDebug {
bool lockEnable;
......@@ -211,6 +209,7 @@ typedef struct SSchTask {
int32_t maxExecTimes; // task max exec times
int32_t maxRetryTimes; // task max retry times
int32_t retryTimes; // task retry times
bool waitRetry; // wait for retry
int32_t execId; // task current execute index
SSchLevel *level; // level
SRWLatch planLock; // task update plan lock
......@@ -274,7 +273,8 @@ typedef struct SSchJob {
int32_t errCode;
SRWLatch resLock;
SExecResult execRes;
void *resData; //TODO free it or not
void *fetchRes; //TODO free it or not
bool fetched;
int32_t resNumOfRows;
SSchResInfo userRes;
const char *sql;
......@@ -326,7 +326,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define SCH_MERGE_TASK_NETWORK_ERR(_task, _code, _len) (SCH_NETWORK_ERR(_code) && (((_len) > 0) || (!SCH_IS_DATA_BIND_TASK(_task))))
#define SCH_REDIRECT_MSGTYPE(_msgType) ((_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || (_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH)
#define SCH_REDIRECT_MSGTYPE(_msgType) ((_msgType) == TDMT_SCH_LINK_BROKEN || (_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || (_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH)
#define SCH_TASK_NEED_REDIRECT(_task, _msgType, _code, _rspLen) (SCH_REDIRECT_MSGTYPE(_msgType) && (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_MERGE_TASK_NETWORK_ERR((_task), (_code), (_rspLen))))
#define SCH_NEED_RETRY(_msgType, _code) ((SCH_NETWORK_ERR(_code) && SCH_REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
......@@ -368,6 +368,8 @@ extern SSchedulerMgmt schMgmt;
qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__)
#define SCH_TASK_DLOG(param, ...) \
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__)
#define SCH_TASK_TLOG(param, ...) \
qTrace("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__)
#define SCH_TASK_DLOGL(param, ...) \
qDebugL("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__)
#define SCH_TASK_WLOG(param, ...) \
......@@ -441,7 +443,7 @@ void schFreeRpcCtx(SRpcCtx *pCtx);
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp);
bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus);
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask);
int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp);
int32_t schSaveJobExecRes(SSchJob *pJob, SQueryTableRsp *rsp);
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp);
void schProcessOnDataFetched(SSchJob *job);
int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask);
......@@ -492,7 +494,7 @@ int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask);
void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode);
int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode);
int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode);
bool schChkCurrentOp(SSchJob *pJob, int32_t op, bool sync);
bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync);
extern SSchDebug gSCHDebug;
......
......@@ -110,7 +110,7 @@ int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
break;
case JOB_TASK_STATUS_PART_SUCC:
if (newStatus != JOB_TASK_STATUS_FAIL && newStatus != JOB_TASK_STATUS_SUCC &&
newStatus != JOB_TASK_STATUS_DROP) {
newStatus != JOB_TASK_STATUS_DROP && newStatus != JOB_TASK_STATUS_EXEC) {
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
......@@ -389,13 +389,18 @@ int32_t schDumpJobExecRes(SSchJob* pJob, SExecResult* pRes) {
int32_t schDumpJobFetchRes(SSchJob* pJob, void** pData) {
int32_t code = 0;
if (pJob->resData && ((SRetrieveTableRsp *)pJob->resData)->completed) {
SCH_ERR_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_SUCC, NULL));
SCH_LOCK(SCH_WRITE, &pJob->resLock);
pJob->fetched = true;
if (pJob->fetchRes && ((SRetrieveTableRsp *)pJob->fetchRes)->completed) {
SCH_ERR_JRET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_SUCC, NULL));
}
while (true) {
*pData = atomic_load_ptr(&pJob->resData);
if (*pData != atomic_val_compare_exchange_ptr(&pJob->resData, *pData, NULL)) {
*pData = atomic_load_ptr(&pJob->fetchRes);
if (*pData != atomic_val_compare_exchange_ptr(&pJob->fetchRes, *pData, NULL)) {
continue;
}
......@@ -414,7 +419,11 @@ int32_t schDumpJobFetchRes(SSchJob* pJob, void** pData) {
SCH_JOB_DLOG("fetch done, totalRows:%d", pJob->resNumOfRows);
return TSDB_CODE_SUCCESS;
_return:
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
return code;
}
int32_t schNotifyUserExecRes(SSchJob* pJob) {
......@@ -512,8 +521,12 @@ int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode) {
}
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
schPostJobRes(pJob, SCH_OP_EXEC);
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
if (schChkCurrentOp(pJob, SCH_OP_FETCH, -1)) {
SCH_ERR_RET(schLaunchFetchTask(pJob));
} else {
schPostJobRes(pJob, 0);
}
return TSDB_CODE_SUCCESS;
}
......@@ -526,7 +539,7 @@ int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRs
SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed);
atomic_store_32(&pJob->resNumOfRows, htonl(pRsp->numOfRows));
atomic_store_ptr(&pJob->resData, pRsp);
atomic_store_ptr(&pJob->fetchRes, pRsp);
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);
......@@ -561,7 +574,7 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp) {
int32_t schSaveJobExecRes(SSchJob *pJob, SQueryTableRsp *rsp) {
if (rsp->tbFName[0]) {
SCH_LOCK(SCH_WRITE, &pJob->resLock);
......@@ -600,7 +613,7 @@ int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask) {
int32_t schLaunchJob(SSchJob *pJob) {
if (EXPLAIN_MODE_STATIC == pJob->attr.explainMode) {
SCH_ERR_RET(qExecStaticExplain(pJob->pDag, (SRetrieveTableRsp **)&pJob->resData));
SCH_ERR_RET(qExecStaticExplain(pJob->pDag, (SRetrieveTableRsp **)&pJob->fetchRes));
SCH_ERR_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
} else {
SSchLevel *level = taosArrayGet(pJob->levels, pJob->levelIdx);
......@@ -661,7 +674,7 @@ void schFreeJobImpl(void *job) {
qDestroyQueryPlan(pJob->pDag);
taosMemoryFreeClear(pJob->userRes.execRes);
taosMemoryFreeClear(pJob->resData);
taosMemoryFreeClear(pJob->fetchRes);
taosMemoryFree(pJob);
int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1);
......@@ -795,9 +808,14 @@ void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode) {
}
}
bool schChkCurrentOp(SSchJob *pJob, int32_t op, bool sync) {
bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync) {
bool r = false;
SCH_LOCK(SCH_READ, &pJob->opStatus.lock);
bool r = (pJob->opStatus.op == op) && (pJob->opStatus.syncReq == sync);
if (sync >= 0) {
r = (pJob->opStatus.op == op) && (pJob->opStatus.syncReq == sync);
} else {
r = (pJob->opStatus.op == op);
}
SCH_UNLOCK(SCH_READ, &pJob->opStatus.lock);
return r;
......
......@@ -256,7 +256,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
SCH_ERR_JRET(rsp->code);
SCH_ERR_JRET(schSaveJobQueryRes(pJob, rsp));
SCH_ERR_JRET(schSaveJobExecRes(pJob, rsp));
atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows);
......@@ -277,8 +277,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (pJob->resData) {
SCH_TASK_ELOG("explain result is already generated, res:%p", pJob->resData);
if (pJob->fetchRes) {
SCH_TASK_ELOG("explain result is already generated, res:%p", pJob->fetchRes);
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
}
......@@ -325,13 +325,13 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
return TSDB_CODE_SUCCESS;
}
if (pJob->resData) {
SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->resData);
if (pJob->fetchRes) {
SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->fetchRes);
taosMemoryFreeClear(rsp);
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
}
atomic_store_ptr(&pJob->resData, rsp);
atomic_store_ptr(&pJob->fetchRes, rsp);
atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows));
if (rsp->completed) {
......@@ -1010,6 +1010,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen);
persistHandle = true;
SCH_SET_TASK_HANDLE(pTask, rpcAllocHandle());
break;
}
case TDMT_SCH_FETCH:
......
......@@ -47,10 +47,10 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
void schInitTaskRetryTimes(SSchJob *pJob, SSchTask *pTask, SSchLevel *pLevel) {
if (SCH_IS_DATA_BIND_TASK(pTask) || (!SCH_IS_QUERY_JOB(pJob)) || (SCH_ALL != schMgmt.cfg.schPolicy)) {
pTask->maxRetryTimes = SCH_MAX_CANDIDATE_EP_NUM;
pTask->maxRetryTimes = SCH_DEFAULT_MAX_RETRY_NUM;
} else {
int32_t nodeNum = taosArrayGetSize(pJob->nodeList);
pTask->maxRetryTimes = TMAX(nodeNum, SCH_MAX_CANDIDATE_EP_NUM);
pTask->maxRetryTimes = TMAX(nodeNum, SCH_DEFAULT_MAX_RETRY_NUM);
}
pTask->maxExecTimes = pTask->maxRetryTimes * (pLevel->level + 1);
......@@ -64,11 +64,11 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
pTask->execId = -1;
pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC;
pTask->taskId = schGenTaskId();
pTask->execNodes =
taosHashInit(SCH_MAX_CANDIDATE_EP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
schInitTaskRetryTimes(pJob, pTask, pLevel);
pTask->execNodes =
taosHashInit(pTask->maxExecTimes, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
pTask->profile.execTime = taosArrayInit(pTask->maxExecTimes, sizeof(int64_t));
if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) {
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
......@@ -125,8 +125,8 @@ int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_
SCH_TASK_DLOG("execId %d removed from execNodeList", execId);
}
if (execId != pTask->execId) { // ignore it
SCH_TASK_DLOG("execId %d is not current execId %d", execId, pTask->execId);
if ((execId != pTask->execId) || pTask->waitRetry) { // ignore it
SCH_TASK_DLOG("execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry);
SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
}
......@@ -138,7 +138,17 @@ int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int3
return TSDB_CODE_SUCCESS;
}
if ((execId != pTask->execId) || pTask->waitRetry) { // ignore it
SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry);
return TSDB_CODE_SUCCESS;
}
SSchNodeInfo *nodeInfo = taosHashGet(pTask->execNodes, &execId, sizeof(execId));
if (NULL == nodeInfo) { // ignore it
SCH_TASK_DLOG("handle not updated since execId %d already not exist, current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry);
return TSDB_CODE_SUCCESS;
}
nodeInfo->handle = handle;
SCH_TASK_DLOG("handle updated to %p for execId %d", handle, execId);
......@@ -335,6 +345,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
return TSDB_CODE_SUCCESS;
}
pTask->waitRetry = true;
schDropTaskOnExecNode(pJob, pTask);
taosHashClear(pTask->execNodes);
SCH_ERR_JRET(schRemoveTaskFromExecList(pJob, pTask));
......@@ -394,6 +405,18 @@ _return:
int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
int32_t code = 0;
if (JOB_TASK_STATUS_PART_SUCC == pJob->status) {
SCH_LOCK(SCH_WRITE, &pJob->resLock);
if (pJob->fetched) {
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
SCH_TASK_ELOG("already fetched while got error %s", tstrerror(rspCode));
SCH_ERR_RET(rspCode);
}
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC);
}
if (SCH_IS_DATA_BIND_TASK(pTask)) {
if (NULL == pData->pEpSet) {
SCH_TASK_ELOG("no epset updated while got error %s", tstrerror(rspCode));
......@@ -591,7 +614,7 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
if (pJob->nodeList) {
nodeNum = taosArrayGetSize(pJob->nodeList);
for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
for (int32_t i = 0; i < nodeNum; ++i) {
SQueryNodeLoad *nload = taosArrayGet(pJob->nodeList, i);
SQueryNodeAddr *naddr = &nload->addr;
......@@ -600,8 +623,8 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_TASK_DLOG("set %dth candidate addr, id %d, fqdn:%s, port:%d", i, naddr->nodeId, SCH_GET_CUR_EP(naddr)->fqdn,
SCH_GET_CUR_EP(naddr)->port);
SCH_TASK_TLOG("set %dth candidate addr, id %d, inUse:%d/%d, fqdn:%s, port:%d", i, naddr->nodeId, naddr->epSet.inUse, naddr->epSet.numOfEps,
SCH_GET_CUR_EP(naddr)->fqdn, SCH_GET_CUR_EP(naddr)->port);
++addNum;
}
......@@ -621,9 +644,9 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
}
pTask->candidateIdx = 0;
pTask->candidateAddrs = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
pTask->candidateAddrs = taosArrayInit(SCHEDULE_DEFAULT_MAX_NODE_NUM, sizeof(SQueryNodeAddr));
if (NULL == pTask->candidateAddrs) {
SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCH_MAX_CANDIDATE_EP_NUM);
SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCHEDULE_DEFAULT_MAX_NODE_NUM);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
......@@ -790,6 +813,7 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
pTask->execId++;
pTask->retryTimes++;
pTask->waitRetry = false;
SCH_TASK_DLOG("start to launch task, execId %d, retry %d", pTask->execId, pTask->retryTimes);
......@@ -885,9 +909,9 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
int32_t schLaunchFetchTask(SSchJob *pJob) {
int32_t code = 0;
void *resData = atomic_load_ptr(&pJob->resData);
if (resData) {
SCH_JOB_DLOG("res already fetched, res:%p", resData);
void *fetchRes = atomic_load_ptr(&pJob->fetchRes);
if (fetchRes) {
SCH_JOB_DLOG("res already fetched, res:%p", fetchRes);
return TSDB_CODE_SUCCESS;
}
......
......@@ -170,7 +170,7 @@ void rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) {
transSetDefaultAddr(thandle, ip, fqdn);
}
int64_t rpcAllocHandle() { return transAllocHandle(); }
void* rpcAllocHandle() { return (void*)transAllocHandle(); }
int32_t rpcInit() {
transInit();
......
/** Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
......@@ -809,7 +810,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) {
conn = exh->handle;
if (conn == NULL) {
conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet));
*ignore = (conn && 0 == specifyConnRef(conn, true, refId)) ? false : true;
if (conn != NULL) specifyConnRef(conn, true, refId);
}
transReleaseExHandle(transGetRefMgt(), refId);
}
......@@ -849,14 +850,20 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
bool ignore = false;
SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore);
if (ignore == true) {
tError("ignore msg");
return;
}
if (conn != NULL) {
transCtxMerge(&conn->ctx, &pCtx->appCtx);
transQueuePush(&conn->cliMsgs, pMsg);
cliSend(conn);
} else {
conn = cliCreateConn(pThrd);
int64_t refId = (int64_t)pMsg->msg.info.handle;
if (refId != 0) specifyConnRef(conn, true, refId);
transCtxMerge(&conn->ctx, &pCtx->appCtx);
transQueuePush(&conn->cliMsgs, pMsg);
......@@ -1206,7 +1213,13 @@ SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle, bool* validHandle) {
if (idx < 0) return NULL;
return ((SCliObj*)trans->tcphandle)->pThreadObj[idx];
}
return transGetWorkThrdFromHandle(handle, validHandle);
SCliThrd* pThrd = transGetWorkThrdFromHandle(handle, validHandle);
if (*validHandle == true && pThrd == NULL) {
int idx = cliRBChoseIdx(trans);
if (idx < 0) return NULL;
pThrd = ((SCliObj*)trans->tcphandle)->pThreadObj[idx];
}
return pThrd;
}
void transReleaseCliHandle(void* handle) {
int idx = -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册