From 43336c0a3928f1e3c43fceff7291f106776e9864 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 21 Mar 2022 16:43:27 +0800 Subject: [PATCH] feature/scheduler --- include/common/tmsg.h | 1 - include/libs/qcom/query.h | 2 +- source/common/src/tmsg.c | 2 - source/dnode/mnode/impl/src/mndDb.c | 4 +- source/dnode/vnode/src/vnd/vnodeQuery.c | 5 +- source/libs/qcom/src/queryUtil.c | 4 +- source/libs/qworker/inc/qworkerInt.h | 8 +- source/libs/qworker/inc/qworkerMsg.h | 14 ++-- source/libs/qworker/src/qworker.c | 43 +++++----- source/libs/qworker/src/qworkerMsg.c | 10 ++- source/libs/scheduler/inc/schedulerInt.h | 1 - source/libs/scheduler/src/scheduler.c | 80 +++++++++---------- source/libs/scheduler/test/schedulerTests.cpp | 12 +-- 13 files changed, 91 insertions(+), 95 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 9269858645..1bdc07ba6f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1112,7 +1112,6 @@ int32_t tDeserializeSSchedulerHbReq(void* buf, int32_t bufLen, SSchedulerHbReq* void tFreeSSchedulerHbReq(SSchedulerHbReq* pReq); typedef struct { - uint64_t seqId; SQueryNodeEpId epId; SArray* taskStatus; // SArray } SSchedulerHbRsp; diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index de01ae918f..9b500ff0db 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -150,7 +150,7 @@ int32_t cleanupTaskQueue(); */ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); -int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, bool persistHandle, SRpcCtx *ctx); +int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, bool persistHandle, void *ctx); /** * Asynchronously send message to server, after the response received, the callback will be incured. diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index d5909e2bb7..06d124cb28 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2567,7 +2567,6 @@ int32_t tSerializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *pR tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeU64(&encoder, pRsp->seqId) < 0) return -1; if (tEncodeI32(&encoder, pRsp->epId.nodeId) < 0) return -1; if (tEncodeU16(&encoder, pRsp->epId.ep.port) < 0) return -1; if (tEncodeCStr(&encoder, pRsp->epId.ep.fqdn) < 0) return -1; @@ -2596,7 +2595,6 @@ int32_t tDeserializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp * tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); if (tStartDecode(&decoder) < 0) return -1; - if (tDecodeU64(&decoder, &pRsp->seqId) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->epId.nodeId) < 0) return -1; if (tDecodeU16(&decoder, &pRsp->epId.ep.port) < 0) return -1; if (tDecodeCStrTo(&decoder, pRsp->epId.ep.fqdn) < 0) return -1; diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index ead56c1f8c..8f2a7ef2e5 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -986,7 +986,9 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) { usedbRsp.vgVersion = usedbReq.vgVersion; code = 0; } - usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos); + usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos); + + // no jump, need to construct rsp } else { pDb = mndAcquireDb(pMnode, usedbReq.db); if (pDb == NULL) { diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 0ac60ea72d..5762e81ba4 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -33,9 +33,8 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta, .config = &pVnode->config}; switch (pMsg->msgType) { - case TDMT_VND_QUERY: { + case TDMT_VND_QUERY: return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg); - } case TDMT_VND_QUERY_CONTINUE: return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg); default: @@ -205,7 +204,7 @@ _exit: rpcSendResponse(&rpcMsg); - return code; + return TSDB_CODE_SUCCESS; } static void freeItemHelper(void *pItem) { diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index b8162f596f..0cf46edf11 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -140,7 +140,7 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) return 0; } -int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, bool persistHandle, SRpcCtx *ctx) { +int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, bool persistHandle, void *rpcCtx) { char* pMsg = rpcMallocCont(pInfo->msgInfo.len); if (NULL == pMsg) { qError("0x%" PRIx64 " msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType)); @@ -163,7 +163,7 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra assert(pInfo->fp != NULL); - rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, ctx); + rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index 9d51e1ccbf..ab55b4b76d 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -84,7 +84,7 @@ typedef struct SQWMsg { typedef struct SQWHbInfo { SSchedulerHbRsp rsp; - void *connection; + SQWConnInfo connInfo; } SQWHbInfo; typedef struct SQWPhaseInput { @@ -122,8 +122,8 @@ typedef struct SQWTaskCtx { typedef struct SQWSchStatus { int32_t lastAccessTs; // timestamp in second - uint64_t hbSeqId; - SQWConnInfo *hbConnection; + SRWLatch connLock; + SQWConnInfo connInfo; SRWLatch tasksLock; SHashObj *tasksHash; // key:queryId+taskId, value: SQWTaskStatus } SQWSchStatus; @@ -225,8 +225,6 @@ typedef struct SQWorkerMgmt { } \ } while (0) -int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code); - #ifdef __cplusplus } #endif diff --git a/source/libs/qworker/inc/qworkerMsg.h b/source/libs/qworker/inc/qworkerMsg.h index 141e8f7916..be1d47a189 100644 --- a/source/libs/qworker/inc/qworkerMsg.h +++ b/source/libs/qworker/inc/qworkerMsg.h @@ -30,17 +30,17 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req); -int32_t qwBuildAndSendDropRsp(void *connection, int32_t code); -int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code); -int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code); +int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code); +int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code); +int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code); void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete); -int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, void *connection); -int32_t qwBuildAndSendReadyRsp(void *connection, int32_t code); -int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code); +int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn); +int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code); +int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code); void qwFreeFetchRsp(void *msg); int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp); -int32_t qwBuildAndSendHbRsp(SRpcMsg *pMsg, SSchedulerHbRsp *rsp, int32_t code); +int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *rsp, int32_t code); int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index c025567fb2..3b01c2c29e 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -402,7 +402,7 @@ int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { - rpcReleaseHandle(ctx->connInfo.handle, CONN_SERVER); + rpcReleaseHandle(ctx->connInfo.handle, TAOS_CONN_SERVER); ctx->connInfo.handle = NULL; qwFreeTaskHandle(QW_FPARAMS(), &ctx->taskHandle); @@ -591,8 +591,7 @@ int32_t qwGenerateSchHbRsp(SQWorkerMgmt *mgmt, SQWSchStatus *sch, SQWHbInfo *hbI return TSDB_CODE_QRY_OUT_OF_MEMORY; } - hbInfo->connection = sch->hbConnection; - hbInfo->rsp.seqId = -1; + hbInfo->connInfo = sch->connInfo; void *key = NULL; size_t keyLen = 0; @@ -947,7 +946,7 @@ _return: QW_TASK_DLOG("query msg rsped, code:%x - %s", code, tstrerror(code)); } - QW_RET(code); + QW_RET(TSDB_CODE_SUCCESS); } int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) { @@ -1010,7 +1009,7 @@ _return: QW_TASK_DLOG("ready msg rsped, code:%x - %s", code, tstrerror(code)); } - QW_RET(code); + QW_RET(TSDB_CODE_SUCCESS); } @@ -1081,7 +1080,9 @@ _return: } while (true); input.code = code; - QW_RET(qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, NULL)); + qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, NULL); + + QW_RET(TSDB_CODE_SUCCESS); } @@ -1148,7 +1149,7 @@ _return: QW_TASK_DLOG("fetch msg rsped, code:%x, dataLen:%d", code, dataLen); } - QW_RET(code); + QW_RET(TSDB_CODE_SUCCESS); } @@ -1207,12 +1208,12 @@ _return: } if (TSDB_CODE_SUCCESS != code || needRsp) { - QW_ERR_RET(qwBuildAndSendDropRsp(&qwMsg->connInfo, code)); + qwBuildAndSendDropRsp(&qwMsg->connInfo, code); QW_TASK_DLOG("drop msg rsped, code:%x", code); } - QW_RET(code); + QW_RET(TSDB_CODE_SUCCESS); } int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { @@ -1220,18 +1221,22 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { SSchedulerHbRsp rsp = {0}; SQWSchStatus *sch = NULL; uint64_t seqId = 0; + void *origHandle = NULL; memcpy(&rsp.epId, &req->epId, sizeof(req->epId)); QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch)); - atomic_store_ptr(&sch->hbConnection, qwMsg->connInfo); - ++sch->hbSeqId; - - rsp.seqId = sch->hbSeqId; - - QW_DLOG("hb connection updated, seqId:%" PRIx64 ", sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, handle:%p, ahandle:%p", - sch->hbSeqId, req->sId, req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connInfo.handle, qwMsg->connInfo.ahandle); + QW_LOCK(QW_WRITE, &sch->connLock); + + origHandle = sch->connInfo.handle; + + memcpy(&sch->connInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo)); + + QW_UNLOCK(QW_WRITE, &sch->connLock); + + QW_DLOG("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, handle:%p, ahandle:%p", + req->sId, req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connInfo.handle, qwMsg->connInfo.ahandle); qwReleaseScheduler(QW_READ, mgmt); @@ -1239,7 +1244,7 @@ _return: qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code); - QW_RET(code); + QW_RET(TSDB_CODE_SUCCESS); } @@ -1288,8 +1293,8 @@ _return: QW_UNLOCK(QW_READ, &mgmt->schLock); for (int32_t j = 0; j < i; ++j) { - QW_DLOG("hb on connection %p, taskNum:%d", rspList[j].connection, (rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0)); - qwBuildAndSendHbRsp(rspList[j].connection, &rspList[j].rsp, code); + QW_DLOG("hb on connection handle %p, taskNum:%d", rspList[j].connInfo.handle, (rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0)); + qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code); tFreeSSchedulerHbRsp(&rspList[j].rsp); } diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index b07ddb7196..42d5b94397 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -484,11 +484,15 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { msg->taskId = be64toh(msg->taskId); msg->refId = be64toh(msg->refId); + SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0}; + qwMsg.connInfo.handle = pMsg->handle; + qwMsg.connInfo.ahandle = pMsg->ahandle; + //QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId)); _return: - QW_ERR_RET(qwBuildAndSendCancelRsp(pMsg, code)); + QW_ERR_RET(qwBuildAndSendCancelRsp(&qwMsg.connInfo, code)); return TSDB_CODE_SUCCESS; } @@ -572,7 +576,7 @@ int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t code = 0; SVShowTablesReq *pReq = pMsg->pCont; - QW_ERR_RET(qwBuildAndSendShowRsp(pMsg, code)); + QW_RET(qwBuildAndSendShowRsp(pMsg, code)); } int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { @@ -581,7 +585,7 @@ int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) } SVShowTablesFetchReq *pFetchReq = pMsg->pCont; - QW_ERR_RET(qwBuildAndSendShowFetchRsp(pMsg, pFetchReq)); + QW_RET(qwBuildAndSendShowFetchRsp(pMsg, pFetchReq)); } diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 1c40f255cf..2c0311d593 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -44,7 +44,6 @@ typedef struct SSchTrans { typedef struct SSchHbTrans { SRWLatch lock; - uint64_t seqId; SSchTrans trans; } SSchHbTrans; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index fc28427c05..7bad1860a9 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -18,6 +18,7 @@ #include "schedulerInt.h" #include "tmsg.h" #include "tref.h" +#include "trpc.h" SSchedulerMgmt schMgmt = {0}; @@ -73,7 +74,7 @@ void schFreeRpcCtx(SRpcCtx *pCtx) { while (pIter) { SRpcCtxVal *ctxVal = (SRpcCtxVal *)pIter; - ctxVal->free(ctxVal->v); + ctxVal->free(ctxVal->val); pIter = taosHashIterate(pCtx->args, pIter); } @@ -127,7 +128,7 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m } SCH_SET_TASK_LASTMSG_TYPE(pTask, -1); - return; + return TSDB_CODE_SUCCESS; case TDMT_VND_RES_READY_RSP: reqMsgType = TDMT_VND_QUERY; break; @@ -658,52 +659,42 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } -int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchHbTrans *trans) { +int32_t schRegisterHbConnection(SQueryNodeEpId *epId, bool *exist) { int32_t code = 0; - SSchHbTrans *hb = NULL; - - while (true) { - hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId)); - if (NULL == hb) { - code = taosHashPut(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId), trans, sizeof(SSchHbTrans)); - if (code) { - if (HASH_NODE_EXIST(code)) { - continue; - } - - qError("taosHashPut hb trans failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port); - SCH_ERR_RET(code); - } - - qDebug("hb connection updated, seqId:%" PRIx64 ", sId:%" PRIx64 - ", nodeId:%d, fqdn:%s, port:%d, instance:%p, connection:%p", - trans->seqId, schMgmt.sId, epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->trans.transInst, - trans->trans.transHandle); + SSchHbTrans hb = {0}; + code = taosHashPut(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId), &hb, sizeof(SSchHbTrans)); + if (code) { + if (HASH_NODE_EXIST(code)) { + *exist = true; return TSDB_CODE_SUCCESS; } - break; + qError("taosHashPut hb trans failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port); + SCH_ERR_RET(code); } - SCH_LOCK(SCH_WRITE, &hb->lock); + return TSDB_CODE_SUCCESS; +} - if (hb->seqId >= trans->seqId) { - qDebug("hb trans seqId is old, seqId:%" PRId64 ", currentId:%" PRId64 ", nodeId:%d, fqdn:%s, port:%d", trans->seqId, - hb->seqId, epId->nodeId, epId->ep.fqdn, epId->ep.port); - SCH_UNLOCK(SCH_WRITE, &hb->lock); - return TSDB_CODE_SUCCESS; +int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchHbTrans *trans) { + int32_t code = 0; + SSchHbTrans *hb = NULL; + + hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId)); + if (NULL == hb) { + qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port); + SCH_ERR_RET(code); } - hb->seqId = trans->seqId; + SCH_LOCK(SCH_WRITE, &hb->lock); memcpy(&hb->trans, &trans->trans, sizeof(trans->trans)); - SCH_UNLOCK(SCH_WRITE, &hb->lock); - qDebug("hb connection updated, seqId:%" PRIx64 ", sId:%" PRIx64 + qDebug("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, instance:%p, connection:%p", - trans->seqId, schMgmt.sId, epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->trans.transInst, + schMgmt.sId, epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->trans.transInst, trans->trans.transHandle); return TSDB_CODE_SUCCESS; @@ -1159,14 +1150,11 @@ int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - if (rsp.seqId != (uint64_t)-1) { - SSchHbTrans trans = {0}; - trans.seqId = rsp.seqId; - trans.trans.transInst = pParam->transport; - trans.trans.transHandle = pMsg->handle; + SSchHbTrans trans = {0}; + trans.trans.transInst = pParam->transport; + trans.trans.transHandle = pMsg->handle; - SCH_RET(schUpdateHbConnection(&rsp.epId, &trans)); - } + SCH_RET(schUpdateHbConnection(&rsp.epId, &trans)); int32_t taskNum = (int32_t)taosArrayGetSize(rsp.taskStatus); for (int32_t i = 0; i < taskNum; ++i) { @@ -1232,7 +1220,7 @@ void schFreeRpcCtxVal(void *arg) { tfree(pMsgSendInfo->param); tfree(pMsgSendInfo); } - + int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { int32_t code = 0; SSchCallbackParam *param = NULL; @@ -1268,7 +1256,7 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { pMsgSendInfo->param = param; pMsgSendInfo->fp = fp; - SRpcCtxVal ctxVal = {.v = pMsgSendInfo, .len = sizeof(SMsgSendInfo), .free = schFreeRpcCtxVal}; + SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .len = sizeof(SMsgSendInfo), .free = schFreeRpcCtxVal}; if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) { SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1320,7 +1308,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { pMsgSendInfo->param = param; pMsgSendInfo->fp = fp; - SRpcCtxVal ctxVal = {.v = pMsgSendInfo, .len = sizeof(SMsgSendInfo), .free = schFreeRpcCtxVal}; + SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .len = sizeof(SMsgSendInfo), .free = schFreeRpcCtxVal}; if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) { SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1557,7 +1545,11 @@ int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) { SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId)); if (NULL == hb) { - SCH_ERR_RET(schBuildAndSendMsg(pJob, NULL, addr, TDMT_VND_QUERY_HEARTBEAT)); + bool exist = false; + SCH_ERR_RET(schRegisterHbConnection(&epId, &exist)); + if (!exist) { + SCH_ERR_RET(schBuildAndSendMsg(pJob, NULL, addr, TDMT_VND_QUERY_HEARTBEAT)); + } } return TSDB_CODE_SUCCESS; diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index bf73617d3e..503f5de5f8 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -746,7 +746,7 @@ TEST(queryTest, readyFirstCase) { SSchJob *pJob = schAcquireJob(job); - pIter = taosHashIterate(pJob->execTasks, NULL); + void *pIter = taosHashIterate(pJob->execTasks, NULL); while (pIter) { SSchTask *task = *(SSchTask **)pIter; @@ -757,7 +757,7 @@ TEST(queryTest, readyFirstCase) { pIter = taosHashIterate(pJob->execTasks, pIter); } - void *pIter = taosHashIterate(pJob->execTasks, NULL); + pIter = taosHashIterate(pJob->execTasks, NULL); while (pIter) { SSchTask *task = *(SSchTask **)pIter; @@ -792,11 +792,11 @@ TEST(queryTest, readyFirstCase) { - pthread_attr_t thattr; - pthread_attr_init(&thattr); + TdThreadAttr thattr; + taosThreadAttrInit(&thattr); - pthread_t thread1; - pthread_create(&(thread1), &thattr, schtCreateFetchRspThread, &job); + TdThread thread1; + taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job); void *data = NULL; code = schedulerFetchRows(job, &data); -- GitLab