diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index f61073c9a5c31ba92752dcec8071a50c78b6999d..b88afcb39df49c518b41968eee74638ca2143ae1 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -213,7 +213,6 @@ int32_t catalogGetTableHashVgroup(SCatalog* pCatalog, void * pTransporter, const */ int32_t catalogGetAllMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp); - int32_t catalogGetQnodeList(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, SArray* pQnodeList); int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableMetaVersion **stables, uint32_t *num); diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index cdccf0b1a17a15bb132ba4d53c2c6fda5bba9878..99c849634d841993b5abac91dc59d9d0d79d1d89 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -1325,7 +1325,6 @@ int32_t qBindStmtColsValue(void *pBlock, TAOS_BIND_v2 *bind, char *msgBuf, int32 STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header tdSRowResetBuf(pBuilder, row); - // 1. set the parsed value from sql string for (int c = 0; c < spd->numOfBound; ++c) { SSchema* pColSchema = &pSchema[spd->boundColumns[c] - 1]; diff --git a/source/libs/qworker/inc/qworkerMsg.h b/source/libs/qworker/inc/qworkerMsg.h index c0f4d6d1574c0e9b843d600ac81c84a2a6264f50..fb1950d16bb6dafcbf8c44537e94a33d51b58cf3 100644 --- a/source/libs/qworker/inc/qworkerMsg.h +++ b/source/libs/qworker/inc/qworkerMsg.h @@ -43,7 +43,8 @@ void qwFreeFetchRsp(void *msg); int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp); int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *rsp, int32_t code); -int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn); +int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn); +int32_t qwRegisterHbBrokenLinkArg(SQWorkerMgmt *mgmt, uint64_t sId, SQWConnInfo *pConn); #ifdef __cplusplus } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index f97d7ffae0b344716f4947f436bb89e0ad665fd0..a74c28cf5395c7e37b382654e89234d5e8f93ba1 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -948,7 +948,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex DataSinkHandle sinkHandle = NULL; SQWTaskCtx *ctx = NULL; - QW_ERR_JRET(qwRegisterBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo)); + QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo)); QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL)); @@ -1285,23 +1285,51 @@ _return: QW_RET(TSDB_CODE_SUCCESS); } +int32_t qwProcessHbLinkBroken(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { + int32_t code = 0; + SSchedulerHbRsp rsp = {0}; + SQWSchStatus *sch = NULL; + + QW_ERR_RET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch)); + + QW_LOCK(QW_WRITE, &sch->hbConnLock); + + if (qwMsg->connInfo.handle == sch->hbConnInfo.handle) { + tmsgReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER); + sch->hbConnInfo.handle = NULL; + sch->hbConnInfo.ahandle = NULL; + + QW_DLOG("release hb handle due to connection broken, handle:%p", qwMsg->connInfo.handle); + } else { + QW_DLOG("ignore hb connection broken, handle:%p, currentHandle:%p", qwMsg->connInfo.handle, sch->hbConnInfo.handle); + } + + QW_UNLOCK(QW_WRITE, &sch->hbConnLock); + + qwReleaseScheduler(QW_READ, mgmt); + + QW_RET(TSDB_CODE_SUCCESS); +} + int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { int32_t code = 0; SSchedulerHbRsp rsp = {0}; SQWSchStatus *sch = NULL; - uint64_t seqId = 0; - void *origHandle = NULL; - memcpy(&rsp.epId, &req->epId, sizeof(req->epId)); + if (qwMsg->code) { + QW_RET(qwProcessHbLinkBroken(mgmt, qwMsg, req)); + } QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch)); + QW_ERR_JRET(qwRegisterHbBrokenLinkArg(mgmt, req->sId, &qwMsg->connInfo)); + QW_LOCK(QW_WRITE, &sch->hbConnLock); if (sch->hbConnInfo.handle) { tmsgReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER); } - + memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo)); memcpy(&sch->hbEpId, &req->epId, sizeof(req->epId)); @@ -1314,7 +1342,14 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { _return: + memcpy(&rsp.epId, &req->epId, sizeof(req->epId)); + qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code); + + if (code) { + tmsgReleaseHandle(qwMsg->connInfo.handle, TAOS_CONN_SERVER); + } + QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); QW_RET(TSDB_CODE_SUCCESS); diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index 15a42d3a3157411ba3a9bbbdaa6d0576d41ce7d4..6723eb21c1c6ad96db086fc7e08c9826249fc5c7 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -286,7 +286,7 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { } -int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { +int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { STaskDropReq * req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq)); if (NULL == req) { QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq)); @@ -313,6 +313,42 @@ int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { return TSDB_CODE_SUCCESS; } +int32_t qwRegisterHbBrokenLinkArg(SQWorkerMgmt *mgmt, uint64_t sId, SQWConnInfo *pConn) { + SSchedulerHbReq req = {0}; + req.header.vgId = mgmt->nodeId; + req.sId = sId; + + int32_t msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req); + if (msgSize < 0) { + QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize); + QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + void *msg = rpcMallocCont(msgSize); + if (NULL == msg) { + QW_SCH_ELOG("calloc %d failed", msgSize); + QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) { + QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize); + taosMemoryFree(msg); + QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + SRpcMsg pMsg = { + .handle = pConn->handle, + .ahandle = pConn->ahandle, + .msgType = TDMT_VND_QUERY_HEARTBEAT, + .pCont = msg, + .contLen = sizeof(SSchedulerHbReq), + .code = TSDB_CODE_RPC_NETWORK_UNAVAIL, + }; + + tmsgRegisterBrokenLinkArg(&mgmt->msgCb, &pMsg); + + return TSDB_CODE_SUCCESS; +} + + int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { @@ -587,10 +623,14 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { } uint64_t sId = req.sId; - SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0}; + SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code}; qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.ahandle = pMsg->ahandle; + if (TSDB_CODE_RPC_NETWORK_UNAVAIL == pMsg->code) { + QW_SCH_DLOG("receive Hb msg due to network broken, error:%s", tstrerror(pMsg->code)); + } + QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->handle); QW_ERR_RET(qwProcessHb(mgmt, &qwMsg, &req));