提交 79394deb 编写于 作者: D dapan1121

update epset

上级 6e465ebd
......@@ -43,6 +43,12 @@ typedef enum {
TASK_TYPE_TEMP,
} ETaskType;
typedef enum {
TARGET_TYPE_MNODE = 1,
TARGET_TYPE_VNODE,
TARGET_TYPE_OTHER,
} ETargetType;
typedef struct STableComInfo {
uint8_t numOfTags; // the number of tags in schema
uint8_t precision; // the number of precision
......@@ -126,11 +132,18 @@ typedef struct SDataBuf {
void* handle;
} SDataBuf;
typedef struct STargetInfo {
ETargetType type;
char dbFName[TSDB_DB_FNAME_LEN]; // used to update db's vgroup epset
int32_t vgId;
} STargetInfo;
typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code);
typedef int32_t (*__async_exec_fn_t)(void* param);
typedef struct SMsgSendInfo {
__async_send_cb_fn_t fp; // async callback function
STargetInfo target; // for update epset
void* param;
uint64_t requestId;
uint64_t requestObjRefId;
......
......@@ -730,23 +730,55 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
taosMemoryFreeClear(pMsgBody);
}
void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, SEpSet* pEpSet) {
if (NULL == pEpSet) {
return;
}
switch (pSendInfo->target.type) {
case TARGET_TYPE_MNODE:
if (NULL == pTscObj) {
tscError("mnode epset changed but not able to update it, reqObjRefId:%" PRIx64, pSendInfo->requestObjRefId);
return;
}
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
break;
case TARGET_TYPE_VNODE: {
if (NULL == pTscObj) {
tscError("vnode epset changed but not able to update it, reqObjRefId:%" PRIx64, pSendInfo->requestObjRefId);
return;
}
SCatalog* pCatalog = NULL;
int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
tscError("fail to get catalog handle, clusterId:%" PRIx64 ", error %s", pTscObj->pAppInfo->clusterId, tstrerror(code));
return;
}
catalogUpdateVgEpSet(pCatalog, pSendInfo->target.dbFName, pSendInfo->target.vgId, pEpSet);
break;
}
default:
tscDebug("epset changed, not updated, msgType %s", TMSG_INFO(pMsg->msgType));
break;
}
}
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
assert(pMsg->info.ahandle != NULL);
SRequestObj* pRequest = NULL;
STscObj* pTscObj = NULL;
if (pSendInfo->requestObjRefId != 0) {
SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
assert(pRequest->self == pSendInfo->requestObjRefId);
pRequest->metric.rsp = taosGetTimestampUs();
//STscObj* pTscObj = pRequest->pTscObj;
//if (pEpSet) {
// if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) {
// updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
// }
//}
pTscObj = pRequest->pTscObj;
/*
* There is not response callback function for submit response.
* The actual inserted number of points is the first number.
......@@ -763,6 +795,8 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
}
updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet);
SDataBuf buf = {.len = pMsg->contLen, .pData = NULL, .handle = pMsg->info.handle};
if (pMsg->contLen > 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册