diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index a30f3be7a17398b91db04678d14c3648dcab38ed..296b18e8dea7524244dcd8ade1a1149bfe97533d 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -43,6 +43,12 @@ typedef enum { TASK_TYPE_TEMP, } ETaskType; +typedef enum { + TARGET_TYPE_MNODE = 1, + TARGET_TYPE_VNODE, + TARGET_TYPE_OTHER, +} ETargetType; + typedef struct STableComInfo { uint8_t numOfTags; // the number of tags in schema uint8_t precision; // the number of precision @@ -126,11 +132,18 @@ typedef struct SDataBuf { void* handle; } SDataBuf; +typedef struct STargetInfo { + ETargetType type; + char dbFName[TSDB_DB_FNAME_LEN]; // used to update db's vgroup epset + int32_t vgId; +} STargetInfo; + typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code); typedef int32_t (*__async_exec_fn_t)(void* param); typedef struct SMsgSendInfo { __async_send_cb_fn_t fp; // async callback function + STargetInfo target; // for update epset void* param; uint64_t requestId; uint64_t requestObjRefId; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 91e0d7623736b9f526c5a152fb2a5076517f0db5..ebbeadccb390a55556527e4b2f4ef1c96d04e76a 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -730,23 +730,55 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { taosMemoryFreeClear(pMsgBody); } +void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, SEpSet* pEpSet) { + if (NULL == pEpSet) { + return; + } + + switch (pSendInfo->target.type) { + case TARGET_TYPE_MNODE: + if (NULL == pTscObj) { + tscError("mnode epset changed but not able to update it, reqObjRefId:%" PRIx64, pSendInfo->requestObjRefId); + return; + } + + updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet); + break; + case TARGET_TYPE_VNODE: { + if (NULL == pTscObj) { + tscError("vnode epset changed but not able to update it, reqObjRefId:%" PRIx64, pSendInfo->requestObjRefId); + return; + } + + SCatalog* pCatalog = NULL; + int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); + if (code != TSDB_CODE_SUCCESS) { + tscError("fail to get catalog handle, clusterId:%" PRIx64 ", error %s", pTscObj->pAppInfo->clusterId, tstrerror(code)); + return; + } + + catalogUpdateVgEpSet(pCatalog, pSendInfo->target.dbFName, pSendInfo->target.vgId, pEpSet); + break; + } + default: + tscDebug("epset changed, not updated, msgType %s", TMSG_INFO(pMsg->msgType)); + break; + } +} + + void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle; assert(pMsg->info.ahandle != NULL); + SRequestObj* pRequest = NULL; + STscObj* pTscObj = NULL; if (pSendInfo->requestObjRefId != 0) { SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId); assert(pRequest->self == pSendInfo->requestObjRefId); pRequest->metric.rsp = taosGetTimestampUs(); - - //STscObj* pTscObj = pRequest->pTscObj; - //if (pEpSet) { - // if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) { - // updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet); - // } - //} - + pTscObj = pRequest->pTscObj; /* * There is not response callback function for submit response. * The actual inserted number of points is the first number. @@ -763,6 +795,8 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId); } + updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet); + SDataBuf buf = {.len = pMsg->contLen, .pData = NULL, .handle = pMsg->info.handle}; if (pMsg->contLen > 0) {