From fdfa789cbcffb9f621badec2d09e6c8ca3ebda10 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 18 Jul 2020 12:13:01 +0000 Subject: [PATCH] replace ipList and ipSet to epSet --- src/client/inc/tscUtil.h | 2 +- src/client/inc/tsclient.h | 6 +- src/client/src/tscSQLParser.c | 4 +- src/client/src/tscServer.c | 102 ++++++++++----------- src/client/src/tscSql.c | 4 +- src/client/src/tscSubquery.c | 14 +-- src/client/src/tscSystem.c | 6 +- src/client/src/tscUtil.c | 16 ++-- src/dnode/inc/dnodeMgmt.h | 4 +- src/dnode/src/dnodeMgmt.c | 104 +++++++++++----------- src/dnode/src/dnodePeer.c | 24 ++--- src/dnode/src/dnodeShell.c | 4 +- src/inc/dnode.h | 6 +- src/inc/taosmsg.h | 10 +-- src/inc/trpc.h | 14 +-- src/kit/taosmigrate/taosmigrate.c | 8 +- src/kit/taosmigrate/taosmigrate.h | 2 +- src/kit/taosmigrate/taosmigrateDnodeCfg.c | 36 ++++---- src/mnode/inc/mnodeMnode.h | 6 +- src/mnode/inc/mnodeVgroup.h | 6 +- src/mnode/src/mnodeDnode.c | 10 +-- src/mnode/src/mnodeMnode.c | 48 +++++----- src/mnode/src/mnodePeer.c | 14 +-- src/mnode/src/mnodeRead.c | 14 +-- src/mnode/src/mnodeSdb.c | 2 +- src/mnode/src/mnodeShow.c | 4 +- src/mnode/src/mnodeTable.c | 28 +++--- src/mnode/src/mnodeVgroup.c | 58 ++++++------ src/mnode/src/mnodeWrite.c | 16 ++-- src/rpc/src/rpcMain.c | 60 ++++++------- src/rpc/test/rclient.c | 30 +++---- src/rpc/test/rsclient.c | 26 +++--- src/rpc/test/rserver.c | 2 +- 33 files changed, 345 insertions(+), 345 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 6a5c2d0099..786133a8f3 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -268,7 +268,7 @@ bool hasMoreClauseToTry(SSqlObj* pSql); void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp); void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows); void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp); -int tscSetMgmtIpListFromCfg(const char *first, const char *second); +int tscSetMgmtEpSetFromCfg(const char *first, const char *second); void* malloc_throw(size_t size); void* calloc_throw(size_t nmemb, size_t size); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 1b30c4ffca..ef8184990d 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -306,7 +306,7 @@ typedef struct SSqlObj { char * sqlstr; char retry; char maxRetry; - SRpcIpSet ipList; + SRpcEpSet epSet; char listed; tsem_t rspSem; SSqlCmd cmd; @@ -350,7 +350,7 @@ void tscInitMsgsFp(); int tsParseSql(SSqlObj *pSql, bool initial); -void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet); +void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet); int tscProcessSql(SSqlObj *pSql); int tscRenewTableMeta(SSqlObj *pSql, char *tableId); @@ -456,7 +456,7 @@ extern void * tscQhandle; extern int tscKeepConn[]; extern int tsInsertHeadSize; extern int tscNumOfThreads; -extern SRpcIpSet tscMgmtIpSet; +extern SRpcEpSet tscMgmtEpSet; extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index f9b12f3f6d..b97e486449 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -2355,9 +2355,9 @@ bool validateIpAddress(const char* ip, size_t size) { strncpy(tmp, ip, size); - in_addr_t ipAddr = inet_addr(tmp); + in_addr_t epAddr = inet_addr(tmp); - return ipAddr != INADDR_NONE; + return epAddr != INADDR_NONE; } int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 99ac2249e0..398dea9d09 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -29,8 +29,8 @@ #define TSC_MGMT_VNODE 999 -SRpcIpSet tscMgmtIpSet; -SRpcIpSet tscDnodeIpSet; +SRpcEpSet tscMgmtEpSet; +SRpcEpSet tscDnodeEpSet; int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0}; @@ -44,44 +44,44 @@ void tscSaveSubscriptionProgress(void* sub); static int32_t minMsgSize() { return tsRpcHeadSize + 100; } -static void tscSetDnodeIpList(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) { - SRpcIpSet* pIpList = &pSql->ipList; - pIpList->inUse = 0; +static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) { + SRpcEpSet* pEpSet = &pSql->epSet; + pEpSet->inUse = 0; if (pVgroupInfo == NULL) { - pIpList->numOfIps = 0; + pEpSet->numOfEps = 0; return; } - pIpList->numOfIps = pVgroupInfo->numOfIps; - for(int32_t i = 0; i < pVgroupInfo->numOfIps; ++i) { - strcpy(pIpList->fqdn[i], pVgroupInfo->ipAddr[i].fqdn); - pIpList->port[i] = pVgroupInfo->ipAddr[i].port; + pEpSet->numOfEps = pVgroupInfo->numOfEps; + for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) { + strcpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn); + pEpSet->port[i] = pVgroupInfo->epAddr[i].port; } } -void tscPrintMgmtIp() { - if (tscMgmtIpSet.numOfIps <= 0) { - tscError("invalid mnode IP list:%d", tscMgmtIpSet.numOfIps); +void tscPrintMgmtEp() { + if (tscMgmtEpSet.numOfEps <= 0) { + tscError("invalid mnode EP list:%d", tscMgmtEpSet.numOfEps); } else { - for (int i = 0; i < tscMgmtIpSet.numOfIps; ++i) { - tscDebug("mnode index:%d %s:%d", i, tscMgmtIpSet.fqdn[i], tscMgmtIpSet.port[i]); + for (int i = 0; i < tscMgmtEpSet.numOfEps; ++i) { + tscDebug("mnode index:%d %s:%d", i, tscMgmtEpSet.fqdn[i], tscMgmtEpSet.port[i]); } } } -void tscSetMgmtIpList(SRpcIpSet *pIpList) { - tscMgmtIpSet.numOfIps = pIpList->numOfIps; - tscMgmtIpSet.inUse = pIpList->inUse; - for (int32_t i = 0; i < tscMgmtIpSet.numOfIps; ++i) { - tscMgmtIpSet.port[i] = htons(pIpList->port[i]); +void tscSetMgmtEpSet(SRpcEpSet *pEpSet) { + tscMgmtEpSet.numOfEps = pEpSet->numOfEps; + tscMgmtEpSet.inUse = pEpSet->inUse; + for (int32_t i = 0; i < tscMgmtEpSet.numOfEps; ++i) { + tscMgmtEpSet.port[i] = htons(pEpSet->port[i]); } } -void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) { - tscMgmtIpSet = *pIpSet; - tscDebug("mnode IP list is changed for ufp is called, numOfIps:%d inUse:%d", tscMgmtIpSet.numOfIps, tscMgmtIpSet.inUse); - for (int32_t i = 0; i < tscMgmtIpSet.numOfIps; ++i) { - tscDebug("index:%d fqdn:%s port:%d", i, tscMgmtIpSet.fqdn[i], tscMgmtIpSet.port[i]); +void tscUpdateEpSet(void *ahandle, SRpcEpSet *pEpSet) { + tscMgmtEpSet = *pEpSet; + tscDebug("mnode EP list is changed for ufp is called, numOfEps:%d inUse:%d", tscMgmtEpSet.numOfEps, tscMgmtEpSet.inUse); + for (int32_t i = 0; i < tscMgmtEpSet.numOfEps; ++i) { + tscDebug("index:%d fqdn:%s port:%d", i, tscMgmtEpSet.fqdn[i], tscMgmtEpSet.port[i]); } } @@ -95,7 +95,7 @@ void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) { UNUSED_FUNC static int32_t tscGetMgmtConnMaxRetryTimes() { int32_t factor = 2; - return tscMgmtIpSet.numOfIps * factor; + return tscMgmtEpSet.numOfEps * factor; } void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { @@ -111,9 +111,9 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { if (code == 0) { SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp; - SRpcIpSet * pIpList = &pRsp->ipList; - if (pIpList->numOfIps > 0) - tscSetMgmtIpList(pIpList); + SRpcEpSet * pEpSet = &pRsp->epSet; + if (pEpSet->numOfEps > 0) + tscSetMgmtEpSet(pEpSet); pSql->pTscObj->connId = htonl(pRsp->connId); @@ -185,7 +185,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { // set the mgmt ip list if (pSql->cmd.command >= TSDB_SQL_MGMT) { - pSql->ipList = tscMgmtIpSet; + pSql->epSet = tscMgmtEpSet; } memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); @@ -203,11 +203,11 @@ int tscSendMsgToServer(SSqlObj *pSql) { // Otherwise, the pSql object may have been released already during the response function, which is // processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely // cause crash. - rpcSendRequest(pObj->pDnodeConn, &pSql->ipList, &rpcMsg); + rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg); return TSDB_CODE_SUCCESS; } -void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { +void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { SSqlObj *pSql = (SSqlObj *)rpcMsg->ahandle; if (pSql == NULL || pSql->signature != pSql) { tscError("%p sql is already released", pSql); @@ -237,9 +237,9 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { } if (pCmd->command < TSDB_SQL_MGMT) { - if (pIpSet) pSql->ipList = *pIpSet; + if (pEpSet) pSql->epSet = *pEpSet; } else { - if (pIpSet) tscMgmtIpSet = *pIpSet; + if (pEpSet) tscMgmtEpSet = *pEpSet; } if (rpcMsg->pCont == NULL) { @@ -421,7 +421,7 @@ int tscProcessSql(SSqlObj *pSql) { return pSql->res.code; } } else if (pCmd->command < TSDB_SQL_LOCAL) { - pSql->ipList = tscMgmtIpSet; + pSql->epSet = tscMgmtEpSet; } else { // local handler return (*tscProcessMsgRsp[pCmd->command])(pSql); } @@ -525,10 +525,10 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { // pSql->cmd.payloadLen is set during copying data into payload pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; - tscSetDnodeIpList(pSql, &pTableMeta->vgroupInfo); + tscSetDnodeEpSet(pSql, &pTableMeta->vgroupInfo); - tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfIP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit, - pSql->ipList.numOfIps); + tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit, + pSql->epSet.numOfEps); return TSDB_CODE_SUCCESS; } @@ -568,7 +568,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char pVgroupInfo = &pTableMeta->vgroupInfo; } - tscSetDnodeIpList(pSql, pVgroupInfo); + tscSetDnodeEpSet(pSql, pVgroupInfo); if (pVgroupInfo != NULL) { pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId); } @@ -580,7 +580,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char pQueryMsg->numOfTables = htonl(1); // set the number of tables pMsg += sizeof(STableIdInfo); - } else { // it is a subquery of the super table query, this IP info is acquired from vgroupInfo + } else { // it is a subquery of the super table query, this EP info is acquired from vgroupInfo int32_t index = pTableMetaInfo->vgroupIndex; int32_t numOfVgroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables); assert(index >= 0 && index < numOfVgroups); @@ -590,7 +590,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index); // set the vgroup info - tscSetDnodeIpList(pSql, &pTableIdList->vgInfo); + tscSetDnodeEpSet(pSql, &pTableIdList->vgInfo); pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId); int32_t numOfTables = taosArrayGetSize(pTableIdList->itemList); @@ -1323,7 +1323,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) { SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - tscSetDnodeIpList(pSql, &pTableMetaInfo->pTableMeta->vgroupInfo); + tscSetDnodeEpSet(pSql, &pTableMetaInfo->pTableMeta->vgroupInfo); return TSDB_CODE_SUCCESS; } @@ -1658,8 +1658,8 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { pMetaMsg->contLen = htons(pMetaMsg->contLen); pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns); - if (pMetaMsg->sid < 0 || pMetaMsg->vgroup.numOfIps < 0) { - tscError("invalid meter vgId:%d, sid%d", pMetaMsg->vgroup.numOfIps, pMetaMsg->sid); + if (pMetaMsg->sid < 0 || pMetaMsg->vgroup.numOfEps < 0) { + tscError("invalid meter vgId:%d, sid%d", pMetaMsg->vgroup.numOfEps, pMetaMsg->sid); return TSDB_CODE_TSC_INVALID_VALUE; } @@ -1673,8 +1673,8 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { return TSDB_CODE_TSC_INVALID_VALUE; } - for (int i = 0; i < pMetaMsg->vgroup.numOfIps; ++i) { - pMetaMsg->vgroup.ipAddr[i].port = htons(pMetaMsg->vgroup.ipAddr[i].port); + for (int i = 0; i < pMetaMsg->vgroup.numOfEps; ++i) { + pMetaMsg->vgroup.epAddr[i].port = htons(pMetaMsg->vgroup.epAddr[i].port); } SSchema* pSchema = pMetaMsg->schema; @@ -1850,10 +1850,10 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j]; pVgroups->vgId = htonl(pVgroups->vgId); - assert(pVgroups->numOfIps >= 1); + assert(pVgroups->numOfEps >= 1); - for (int32_t k = 0; k < pVgroups->numOfIps; ++k) { - pVgroups->ipAddr[k].port = htons(pVgroups->ipAddr[k].port); + for (int32_t k = 0; k < pVgroups->numOfEps; ++k) { + pVgroups->epAddr[k].port = htons(pVgroups->epAddr[k].port); } pMsg += size; @@ -1946,8 +1946,8 @@ int tscProcessConnectRsp(SSqlObj *pSql) { assert(len <= sizeof(pObj->db)); tstrncpy(pObj->db, temp, sizeof(pObj->db)); - if (pConnect->ipList.numOfIps > 0) - tscSetMgmtIpList(&pConnect->ipList); + if (pConnect->epSet.numOfEps > 0) + tscSetMgmtEpSet(&pConnect->epSet); strcpy(pObj->sversion, pConnect->serverVersion); pObj->writeAuth = pConnect->writeAuth; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index d5160cd3c6..6a14d3a65e 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -62,8 +62,8 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con } if (ip) { - if (tscSetMgmtIpListFromCfg(ip, NULL) < 0) return NULL; - if (port) tscMgmtIpSet.port[0] = port; + if (tscSetMgmtEpSetFromCfg(ip, NULL) < 0) return NULL; + if (port) tscMgmtEpSet.port[0] = port; } void *pDnodeConn = NULL; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index faa4a2488a..1dbc52efb0 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -458,7 +458,7 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr break; } } - assert(info.vgInfo.numOfIps != 0); + assert(info.vgInfo.numOfEps != 0); vgTables = taosArrayInit(4, sizeof(STableIdInfo)); info.itemList = vgTables; @@ -1600,8 +1600,8 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p // data in from current vnode is stored in cache and disk uint32_t numOfRowsFromSubquery = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->num; - tscDebug("%p sub:%p all data retrieved from ip:%s, vgId:%d, numOfRows:%d, orderOfSub:%d", pParentSql, pSql, - pTableMetaInfo->vgroupList->vgroups[0].ipAddr[0].fqdn, pTableMetaInfo->vgroupList->vgroups[0].vgId, + tscDebug("%p sub:%p all data retrieved from ep:%s, vgId:%d, numOfRows:%d, orderOfSub:%d", pParentSql, pSql, + pTableMetaInfo->vgroupList->vgroups[0].epAddr[0].fqdn, pTableMetaInfo->vgroupList->vgroups[0].vgId, numOfRowsFromSubquery, idx); tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity); @@ -1719,8 +1719,8 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR assert(pRes->numOfRows == numOfRows); int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows); - tscDebug("%p sub:%p retrieve numOfRows:%" PRId64 " totalNumOfRows:%" PRIu64 " from ip:%s, orderOfSub:%d", pParentSql, pSql, - pRes->numOfRows, pState->numOfRetrievedRows, pSql->ipList.fqdn[pSql->ipList.inUse], idx); + tscDebug("%p sub:%p retrieve numOfRows:%" PRId64 " totalNumOfRows:%" PRIu64 " from ep:%s, orderOfSub:%d", pParentSql, pSql, + pRes->numOfRows, pState->numOfRetrievedRows, pSql->epSet.fqdn[pSql->epSet.inUse], idx); if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) { tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId32 " , current:%" PRId64, @@ -1828,8 +1828,8 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { return; } - tscTrace("%p sub:%p query complete, ip:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSql, pSql, - pVgroup->ipAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex); + tscTrace("%p sub:%p query complete, ep:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSql, pSql, + pVgroup->epAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex); if (pSql->res.qhandle == 0) { // qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode tscRetrieveFromDnodeCallBack(param, pSql, 0); diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 82cc8cc225..7722acf0d0 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -41,7 +41,7 @@ int tscNumOfThreads; static pthread_once_t tscinit = PTHREAD_ONCE_INIT; void taosInitNote(int numOfNoteLines, int maxNotes, char* lable); -void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet); +void tscUpdateEpSet(void *ahandle, SRpcEpSet *pEpSet); void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) { taosGetDisk(); @@ -116,8 +116,8 @@ void taos_init_imp() { taosInitNote(tsNumOfLogLines / 10, 1, (char*)"tsc_note"); } - if (tscSetMgmtIpListFromCfg(tsFirst, tsSecond) < 0) { - tscError("failed to init mnode IP list"); + if (tscSetMgmtEpSetFromCfg(tsFirst, tsSecond) < 0) { + tscError("failed to init mnode EP list"); return; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 49d15f6499..4e6133663e 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2145,17 +2145,17 @@ char* strdup_throw(const char* str) { return p; } -int tscSetMgmtIpListFromCfg(const char *first, const char *second) { - tscMgmtIpSet.numOfIps = 0; - tscMgmtIpSet.inUse = 0; +int tscSetMgmtEpSetFromCfg(const char *first, const char *second) { + tscMgmtEpSet.numOfEps = 0; + tscMgmtEpSet.inUse = 0; if (first && first[0] != 0) { if (strlen(first) >= TSDB_EP_LEN) { terrno = TSDB_CODE_TSC_INVALID_FQDN; return -1; } - taosGetFqdnPortFromEp(first, tscMgmtIpSet.fqdn[tscMgmtIpSet.numOfIps], &tscMgmtIpSet.port[tscMgmtIpSet.numOfIps]); - tscMgmtIpSet.numOfIps++; + taosGetFqdnPortFromEp(first, tscMgmtEpSet.fqdn[tscMgmtEpSet.numOfEps], &tscMgmtEpSet.port[tscMgmtEpSet.numOfEps]); + tscMgmtEpSet.numOfEps++; } if (second && second[0] != 0) { @@ -2163,11 +2163,11 @@ int tscSetMgmtIpListFromCfg(const char *first, const char *second) { terrno = TSDB_CODE_TSC_INVALID_FQDN; return -1; } - taosGetFqdnPortFromEp(second, tscMgmtIpSet.fqdn[tscMgmtIpSet.numOfIps], &tscMgmtIpSet.port[tscMgmtIpSet.numOfIps]); - tscMgmtIpSet.numOfIps++; + taosGetFqdnPortFromEp(second, tscMgmtEpSet.fqdn[tscMgmtEpSet.numOfEps], &tscMgmtEpSet.port[tscMgmtEpSet.numOfEps]); + tscMgmtEpSet.numOfEps++; } - if ( tscMgmtIpSet.numOfIps == 0) { + if ( tscMgmtEpSet.numOfEps == 0) { terrno = TSDB_CODE_TSC_INVALID_FQDN; return -1; } diff --git a/src/dnode/inc/dnodeMgmt.h b/src/dnode/inc/dnodeMgmt.h index 092c06d84b..e8f4a0823f 100644 --- a/src/dnode/inc/dnodeMgmt.h +++ b/src/dnode/inc/dnodeMgmt.h @@ -35,8 +35,8 @@ void* dnodeGetVnodeTsdb(void *pVnode); void dnodeReleaseVnode(void *pVnode); void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell); -void dnodeGetMnodeIpSetForPeer(void *ipSet); -void dnodeGetMnodeIpSetForShell(void *ipSet); +void dnodeGetMnodeEpSetForPeer(void *epSet); +void dnodeGetMnodeEpSetForShell(void *epSet); #ifdef __cplusplus } diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 7d3ef9926d..9050b9c582 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -52,7 +52,7 @@ void * tsDnodeTmr = NULL; static void * tsStatusTimer = NULL; static uint32_t tsRebootTime; -static SRpcIpSet tsDMnodeIpSet = {0}; +static SRpcEpSet tsDMnodeEpSet = {0}; static SDMMnodeInfos tsDMnodeInfos = {0}; static SDMDnodeCfg tsDnodeCfg = {0}; static taos_qset tsMgmtQset = NULL; @@ -90,21 +90,21 @@ int32_t dnodeInitMgmt() { tsRebootTime = taosGetTimestampSec(); if (!dnodeReadMnodeInfos()) { - memset(&tsDMnodeIpSet, 0, sizeof(SRpcIpSet)); + memset(&tsDMnodeEpSet, 0, sizeof(SRpcEpSet)); memset(&tsDMnodeInfos, 0, sizeof(SDMMnodeInfos)); - tsDMnodeIpSet.numOfIps = 1; - taosGetFqdnPortFromEp(tsFirst, tsDMnodeIpSet.fqdn[0], &tsDMnodeIpSet.port[0]); + tsDMnodeEpSet.numOfEps = 1; + taosGetFqdnPortFromEp(tsFirst, tsDMnodeEpSet.fqdn[0], &tsDMnodeEpSet.port[0]); if (strcmp(tsSecond, tsFirst) != 0) { - tsDMnodeIpSet.numOfIps = 2; - taosGetFqdnPortFromEp(tsSecond, tsDMnodeIpSet.fqdn[1], &tsDMnodeIpSet.port[1]); + tsDMnodeEpSet.numOfEps = 2; + taosGetFqdnPortFromEp(tsSecond, tsDMnodeEpSet.fqdn[1], &tsDMnodeEpSet.port[1]); } } else { - tsDMnodeIpSet.inUse = tsDMnodeInfos.inUse; - tsDMnodeIpSet.numOfIps = tsDMnodeInfos.nodeNum; + tsDMnodeEpSet.inUse = tsDMnodeInfos.inUse; + tsDMnodeEpSet.numOfEps = tsDMnodeInfos.nodeNum; for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) { - taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeIpSet.fqdn[i], &tsDMnodeIpSet.port[i]); + taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeEpSet.fqdn[i], &tsDMnodeEpSet.port[i]); } } @@ -450,27 +450,27 @@ static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) { return taosCfgDynamicOptions(pCfg->config); } -void dnodeUpdateMnodeIpSetForPeer(SRpcIpSet *pIpSet) { - dInfo("mnode IP list for is changed, numOfIps:%d inUse:%d", pIpSet->numOfIps, pIpSet->inUse); - for (int i = 0; i < pIpSet->numOfIps; ++i) { - pIpSet->port[i] -= TSDB_PORT_DNODEDNODE; - dInfo("mnode index:%d %s:%u", i, pIpSet->fqdn[i], pIpSet->port[i]) +void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) { + dInfo("mnode EP list for is changed, numOfEps:%d inUse:%d", pEpSet->numOfEps, pEpSet->inUse); + for (int i = 0; i < pEpSet->numOfEps; ++i) { + pEpSet->port[i] -= TSDB_PORT_DNODEDNODE; + dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]) } - tsDMnodeIpSet = *pIpSet; + tsDMnodeEpSet = *pEpSet; } -void dnodeGetMnodeIpSetForPeer(void *ipSetRaw) { - SRpcIpSet *ipSet = ipSetRaw; - *ipSet = tsDMnodeIpSet; +void dnodeGetMnodeEpSetForPeer(void *epSetRaw) { + SRpcEpSet *epSet = epSetRaw; + *epSet = tsDMnodeEpSet; - for (int i=0; inumOfIps; ++i) - ipSet->port[i] += TSDB_PORT_DNODEDNODE; + for (int i=0; inumOfEps; ++i) + epSet->port[i] += TSDB_PORT_DNODEDNODE; } -void dnodeGetMnodeIpSetForShell(void *ipSetRaw) { - SRpcIpSet *ipSet = ipSetRaw; - *ipSet = tsDMnodeIpSet; +void dnodeGetMnodeEpSetForShell(void *epSetRaw) { + SRpcEpSet *epSet = epSetRaw; + *epSet = tsDMnodeEpSet; } static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { @@ -536,10 +536,10 @@ static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes) { dInfo("mnode index:%d, %s", tsDMnodeInfos.nodeInfos[i].nodeId, tsDMnodeInfos.nodeInfos[i].nodeEp); } - tsDMnodeIpSet.inUse = tsDMnodeInfos.inUse; - tsDMnodeIpSet.numOfIps = tsDMnodeInfos.nodeNum; + tsDMnodeEpSet.inUse = tsDMnodeInfos.inUse; + tsDMnodeEpSet.numOfEps = tsDMnodeInfos.nodeNum; for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) { - taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeIpSet.fqdn[i], &tsDMnodeIpSet.port[i]); + taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeEpSet.fqdn[i], &tsDMnodeEpSet.port[i]); } dnodeSaveMnodeInfos(); @@ -549,10 +549,10 @@ static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes) { static bool dnodeReadMnodeInfos() { char ipFile[TSDB_FILENAME_LEN*2] = {0}; - sprintf(ipFile, "%s/mnodeIpList.json", tsDnodeDir); + sprintf(ipFile, "%s/mnodeEpSet.json", tsDnodeDir); FILE *fp = fopen(ipFile, "r"); if (!fp) { - dDebug("failed to read mnodeIpList.json, file not exist"); + dDebug("failed to read mnodeEpSet.json, file not exist"); return false; } @@ -563,40 +563,40 @@ static bool dnodeReadMnodeInfos() { if (len <= 0) { free(content); fclose(fp); - dError("failed to read mnodeIpList.json, content is null"); + dError("failed to read mnodeEpSet.json, content is null"); return false; } content[len] = 0; cJSON* root = cJSON_Parse(content); if (root == NULL) { - dError("failed to read mnodeIpList.json, invalid json format"); + dError("failed to read mnodeEpSet.json, invalid json format"); goto PARSE_OVER; } cJSON* inUse = cJSON_GetObjectItem(root, "inUse"); if (!inUse || inUse->type != cJSON_Number) { - dError("failed to read mnodeIpList.json, inUse not found"); + dError("failed to read mnodeEpSet.json, inUse not found"); goto PARSE_OVER; } tsDMnodeInfos.inUse = inUse->valueint; cJSON* nodeNum = cJSON_GetObjectItem(root, "nodeNum"); if (!nodeNum || nodeNum->type != cJSON_Number) { - dError("failed to read mnodeIpList.json, nodeNum not found"); + dError("failed to read mnodeEpSet.json, nodeNum not found"); goto PARSE_OVER; } tsDMnodeInfos.nodeNum = nodeNum->valueint; cJSON* nodeInfos = cJSON_GetObjectItem(root, "nodeInfos"); if (!nodeInfos || nodeInfos->type != cJSON_Array) { - dError("failed to read mnodeIpList.json, nodeInfos not found"); + dError("failed to read mnodeEpSet.json, nodeInfos not found"); goto PARSE_OVER; } int size = cJSON_GetArraySize(nodeInfos); if (size != tsDMnodeInfos.nodeNum) { - dError("failed to read mnodeIpList.json, nodeInfos size not matched"); + dError("failed to read mnodeEpSet.json, nodeInfos size not matched"); goto PARSE_OVER; } @@ -606,14 +606,14 @@ static bool dnodeReadMnodeInfos() { cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId"); if (!nodeId || nodeId->type != cJSON_Number) { - dError("failed to read mnodeIpList.json, nodeId not found"); + dError("failed to read mnodeEpSet.json, nodeId not found"); goto PARSE_OVER; } tsDMnodeInfos.nodeInfos[i].nodeId = nodeId->valueint; cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp"); if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) { - dError("failed to read mnodeIpList.json, nodeName not found"); + dError("failed to read mnodeEpSet.json, nodeName not found"); goto PARSE_OVER; } strncpy(tsDMnodeInfos.nodeInfos[i].nodeEp, nodeEp->valuestring, TSDB_EP_LEN); @@ -621,7 +621,7 @@ static bool dnodeReadMnodeInfos() { ret = true; - dInfo("read mnode iplist successed, numOfIps:%d inUse:%d", tsDMnodeInfos.nodeNum, tsDMnodeInfos.inUse); + dInfo("read mnode epSet successed, numOfEps:%d inUse:%d", tsDMnodeInfos.nodeNum, tsDMnodeInfos.inUse); for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) { dInfo("mnode:%d, %s", tsDMnodeInfos.nodeInfos[i].nodeId, tsDMnodeInfos.nodeInfos[i].nodeEp); } @@ -635,7 +635,7 @@ PARSE_OVER: static void dnodeSaveMnodeInfos() { char ipFile[TSDB_FILENAME_LEN] = {0}; - sprintf(ipFile, "%s/mnodeIpList.json", tsDnodeDir); + sprintf(ipFile, "%s/mnodeEpSet.json", tsDnodeDir); FILE *fp = fopen(ipFile, "w"); if (!fp) return; @@ -663,11 +663,11 @@ static void dnodeSaveMnodeInfos() { fclose(fp); free(content); - dInfo("save mnode iplist successed"); + dInfo("save mnode epSet successed"); } char *dnodeGetMnodeMasterEp() { - return tsDMnodeInfos.nodeInfos[tsDMnodeIpSet.inUse].nodeEp; + return tsDMnodeInfos.nodeInfos[tsDMnodeEpSet.inUse].nodeEp; } void* dnodeGetMnodeInfos() { @@ -726,9 +726,9 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) { .msgType = TSDB_MSG_TYPE_DM_STATUS }; - SRpcIpSet ipSet; - dnodeGetMnodeIpSetForPeer(&ipSet); - dnodeSendMsgToDnode(&ipSet, &rpcMsg); + SRpcEpSet epSet; + dnodeGetMnodeEpSetForPeer(&epSet); + dnodeSendMsgToDnode(&epSet, &rpcMsg); } static bool dnodeReadDnodeCfg() { @@ -817,20 +817,20 @@ void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) { SRpcConnInfo connInfo = {0}; rpcGetConnInfo(rpcMsg->handle, &connInfo); - SRpcIpSet ipSet = {0}; + SRpcEpSet epSet = {0}; if (forShell) { - dnodeGetMnodeIpSetForShell(&ipSet); + dnodeGetMnodeEpSetForShell(&epSet); } else { - dnodeGetMnodeIpSetForPeer(&ipSet); + dnodeGetMnodeEpSetForPeer(&epSet); } - dDebug("msg:%s will be redirected, dnodeIp:%s user:%s, numOfIps:%d inUse:%d", taosMsg[rpcMsg->msgType], - taosIpStr(connInfo.clientIp), connInfo.user, ipSet.numOfIps, ipSet.inUse); + dDebug("msg:%s will be redirected, dnodeIp:%s user:%s, numOfEps:%d inUse:%d", taosMsg[rpcMsg->msgType], + taosIpStr(connInfo.clientIp), connInfo.user, epSet.numOfEps, epSet.inUse); - for (int i = 0; i < ipSet.numOfIps; ++i) { - dDebug("mnode index:%d %s:%d", i, ipSet.fqdn[i], ipSet.port[i]); - ipSet.port[i] = htons(ipSet.port[i]); + for (int i = 0; i < epSet.numOfEps; ++i) { + dDebug("mnode index:%d %s:%d", i, epSet.fqdn[i], epSet.port[i]); + epSet.port[i] = htons(epSet.port[i]); } - rpcSendRedirectRsp(rpcMsg->handle, &ipSet); + rpcSendRedirectRsp(rpcMsg->handle, &epSet); } diff --git a/src/dnode/src/dnodePeer.c b/src/dnode/src/dnodePeer.c index 2a3436583f..b27f56a871 100644 --- a/src/dnode/src/dnodePeer.c +++ b/src/dnode/src/dnodePeer.c @@ -29,11 +29,11 @@ #include "dnodeVWrite.h" #include "dnodeMPeer.h" -extern void dnodeUpdateMnodeIpSetForPeer(SRpcIpSet *pIpSet); +extern void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet); static void (*dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); -static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcIpSet *); +static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *); static void (*dnodeProcessRspMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg); -static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet); +static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet); static void *tsDnodeServerRpc = NULL; static void *tsDnodeClientRpc = NULL; @@ -83,7 +83,7 @@ void dnodeCleanupServer() { } } -static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { +static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { SRpcMsg rspMsg = { .handle = pMsg->handle, .pCont = NULL, @@ -148,9 +148,9 @@ void dnodeCleanupClient() { } } -static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { - if (pMsg->msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pIpSet) { - dnodeUpdateMnodeIpSetForPeer(pIpSet); +static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { + if (pMsg->msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) { + dnodeUpdateMnodeEpSetForPeer(pEpSet); } if (dnodeProcessRspMsgFp[pMsg->msgType]) { @@ -166,12 +166,12 @@ void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { dnodeProcessRspMsgFp[msgType] = fp; } -void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg) { - rpcSendRequest(tsDnodeClientRpc, ipSet, rpcMsg); +void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { + rpcSendRequest(tsDnodeClientRpc, epSet, rpcMsg); } void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { - SRpcIpSet ipSet = {0}; - dnodeGetMnodeIpSetForPeer(&ipSet); - rpcSendRecv(tsDnodeClientRpc, &ipSet, rpcMsg, rpcRsp); + SRpcEpSet epSet = {0}; + dnodeGetMnodeEpSetForPeer(&epSet); + rpcSendRecv(tsDnodeClientRpc, &epSet, rpcMsg, rpcRsp); } diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index d089495079..f9d137bb99 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -31,7 +31,7 @@ #include "dnodeShell.h" static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); -static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcIpSet *); +static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *); static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); static void * tsDnodeShellRpc = NULL; static int32_t tsDnodeQueryReqNum = 0; @@ -108,7 +108,7 @@ void dnodeCleanupShell() { } } -void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { +void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { SRpcMsg rpcMsg = { .handle = pMsg->handle, .pCont = NULL, diff --git a/src/inc/dnode.h b/src/inc/dnode.h index b561c407a3..5a059c93a6 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -39,13 +39,13 @@ SDnodeStatisInfo dnodeGetStatisInfo(); bool dnodeIsFirstDeploy(); char * dnodeGetMnodeMasterEp(); -void dnodeGetMnodeIpSetForPeer(void *ipSet); -void dnodeGetMnodeIpSetForShell(void *ipSet); +void dnodeGetMnodeEpSetForPeer(void *epSet); +void dnodeGetMnodeEpSetForShell(void *epSet); void * dnodeGetMnodeInfos(); int32_t dnodeGetDnodeId(); void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); -void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg); +void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg); void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp); void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index a367d6b93b..96386aab65 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -176,7 +176,7 @@ extern char *taosMsg[]; typedef struct { char fqdn[TSDB_FQDN_LEN]; uint16_t port; -} SIpAddr; +} SEpAddr; typedef struct { int32_t numOfVnodes; @@ -306,7 +306,7 @@ typedef struct { int8_t reserved1; int8_t reserved2; int32_t connId; - SRpcIpSet ipList; + SRpcEpSet epSet; } SCMConnectRsp; typedef struct { @@ -648,8 +648,8 @@ typedef struct SCMSTableVgroupMsg { typedef struct { int32_t vgId; - int8_t numOfIps; - SIpAddr ipAddr[TSDB_MAX_REPLICA]; + int8_t numOfEps; + SEpAddr epAddr[TSDB_MAX_REPLICA]; } SCMVgroupInfo; typedef struct { @@ -753,7 +753,7 @@ typedef struct { uint32_t onlineDnodes; uint32_t connId; int8_t killConnection; - SRpcIpSet ipList; + SRpcEpSet epSet; } SCMHeartBeatRsp; typedef struct { diff --git a/src/inc/trpc.h b/src/inc/trpc.h index d1adfb7494..1af6a5eb0f 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -28,12 +28,12 @@ extern "C" { extern int tsRpcHeadSize; -typedef struct SRpcIpSet { +typedef struct SRpcEpSet { int8_t inUse; - int8_t numOfIps; + int8_t numOfEps; uint16_t port[TSDB_MAX_REPLICA]; char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN]; -} SRpcIpSet; +} SRpcEpSet; typedef struct SRpcConnInfo { uint32_t clientIp; @@ -67,7 +67,7 @@ typedef struct SRpcInit { char *ckey; // ciphering key // call back to process incoming msg, code shall be ignored by server app - void (*cfp)(SRpcMsg *, SRpcIpSet *); + void (*cfp)(SRpcMsg *, SRpcEpSet *); // call back to retrieve the client auth info, for server app only int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey); @@ -78,11 +78,11 @@ void rpcClose(void *); void *rpcMallocCont(int contLen); void rpcFreeCont(void *pCont); void *rpcReallocCont(void *ptr, int contLen); -void rpcSendRequest(void *thandle, const SRpcIpSet *pIpSet, SRpcMsg *pMsg); +void rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg); void rpcSendResponse(const SRpcMsg *pMsg); -void rpcSendRedirectRsp(void *pConn, const SRpcIpSet *pIpSet); +void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); -void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pReq, SRpcMsg *pRsp); +void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); int rpcReportProgress(void *pConn, char *pCont, int contLen); void rpcCancelRequest(void *pContext); diff --git a/src/kit/taosmigrate/taosmigrate.c b/src/kit/taosmigrate/taosmigrate.c index b80ca44a10..3ef73ef169 100644 --- a/src/kit/taosmigrate/taosmigrate.c +++ b/src/kit/taosmigrate/taosmigrate.c @@ -210,10 +210,10 @@ int32_t main(int32_t argc, char *argv[]) { (void)snprintf(mnodeWal, TSDB_FILENAME_LEN*2, "%s/mnode/wal/wal0", arguments.dataDir); walModWalFile(mnodeWal); - // 2. modfiy dnode config: mnodeIpList.json - char dnodeIpList[TSDB_FILENAME_LEN*2] = {0}; - (void)snprintf(dnodeIpList, TSDB_FILENAME_LEN*2, "%s/dnode/mnodeIpList.json", arguments.dataDir); - modDnodeIpList(dnodeIpList); + // 2. modfiy dnode config: mnodeEpSet.json + char dnodeEpSet[TSDB_FILENAME_LEN*2] = {0}; + (void)snprintf(dnodeEpSet, TSDB_FILENAME_LEN*2, "%s/dnode/mnodeEpSet.json", arguments.dataDir); + modDnodeEpSet(dnodeEpSet); // 3. modify vnode config: config.json char vnodeDir[TSDB_FILENAME_LEN*2] = {0}; diff --git a/src/kit/taosmigrate/taosmigrate.h b/src/kit/taosmigrate/taosmigrate.h index a0a02e651c..9fb3c92db2 100644 --- a/src/kit/taosmigrate/taosmigrate.h +++ b/src/kit/taosmigrate/taosmigrate.h @@ -71,7 +71,7 @@ int tSystemShell(const char * cmd); void taosMvFile(char* destFile, char *srcFile) ; void walModWalFile(char* walfile); SdnodeIfo* getDnodeInfo(int32_t dnodeId); -void modDnodeIpList(char* dnodeIpList); +void modDnodeEpSet(char* dnodeEpSet); void modAllVnode(char *vnodeDir); #endif diff --git a/src/kit/taosmigrate/taosmigrateDnodeCfg.c b/src/kit/taosmigrate/taosmigrateDnodeCfg.c index 263d5521e9..7f6fd03fea 100644 --- a/src/kit/taosmigrate/taosmigrateDnodeCfg.c +++ b/src/kit/taosmigrate/taosmigrateDnodeCfg.c @@ -23,10 +23,10 @@ static SDMMnodeInfos tsDnodeIpInfos = {0}; -static bool dnodeReadMnodeInfos(char* dnodeIpList) { - FILE *fp = fopen(dnodeIpList, "r"); +static bool dnodeReadMnodeInfos(char* dnodeEpSet) { + FILE *fp = fopen(dnodeEpSet, "r"); if (!fp) { - printf("failed to read mnodeIpList.json, file not exist\n"); + printf("failed to read mnodeEpSet.json, file not exist\n"); return false; } @@ -37,40 +37,40 @@ static bool dnodeReadMnodeInfos(char* dnodeIpList) { if (len <= 0) { free(content); fclose(fp); - printf("failed to read mnodeIpList.json, content is null\n"); + printf("failed to read mnodeEpSet.json, content is null\n"); return false; } content[len] = 0; cJSON* root = cJSON_Parse(content); if (root == NULL) { - printf("failed to read mnodeIpList.json, invalid json format\n"); + printf("failed to read mnodeEpSet.json, invalid json format\n"); goto PARSE_OVER; } cJSON* inUse = cJSON_GetObjectItem(root, "inUse"); if (!inUse || inUse->type != cJSON_Number) { - printf("failed to read mnodeIpList.json, inUse not found\n"); + printf("failed to read mnodeEpSet.json, inUse not found\n"); goto PARSE_OVER; } tsDnodeIpInfos.inUse = inUse->valueint; cJSON* nodeNum = cJSON_GetObjectItem(root, "nodeNum"); if (!nodeNum || nodeNum->type != cJSON_Number) { - printf("failed to read mnodeIpList.json, nodeNum not found\n"); + printf("failed to read mnodeEpSet.json, nodeNum not found\n"); goto PARSE_OVER; } tsDnodeIpInfos.nodeNum = nodeNum->valueint; cJSON* nodeInfos = cJSON_GetObjectItem(root, "nodeInfos"); if (!nodeInfos || nodeInfos->type != cJSON_Array) { - printf("failed to read mnodeIpList.json, nodeInfos not found\n"); + printf("failed to read mnodeEpSet.json, nodeInfos not found\n"); goto PARSE_OVER; } int size = cJSON_GetArraySize(nodeInfos); if (size != tsDnodeIpInfos.nodeNum) { - printf("failed to read mnodeIpList.json, nodeInfos size not matched\n"); + printf("failed to read mnodeEpSet.json, nodeInfos size not matched\n"); goto PARSE_OVER; } @@ -80,14 +80,14 @@ static bool dnodeReadMnodeInfos(char* dnodeIpList) { cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId"); if (!nodeId || nodeId->type != cJSON_Number) { - printf("failed to read mnodeIpList.json, nodeId not found\n"); + printf("failed to read mnodeEpSet.json, nodeId not found\n"); goto PARSE_OVER; } tsDnodeIpInfos.nodeInfos[i].nodeId = nodeId->valueint; cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp"); if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) { - printf("failed to read mnodeIpList.json, nodeName not found\n"); + printf("failed to read mnodeEpSet.json, nodeName not found\n"); goto PARSE_OVER; } strncpy(tsDnodeIpInfos.nodeInfos[i].nodeEp, nodeEp->valuestring, TSDB_EP_LEN); @@ -102,7 +102,7 @@ static bool dnodeReadMnodeInfos(char* dnodeIpList) { ret = true; - //printf("read mnode iplist successed, numOfIps:%d inUse:%d\n", tsDnodeIpInfos.nodeNum, tsDnodeIpInfos.inUse); + //printf("read mnode epSet successed, numOfEps:%d inUse:%d\n", tsDnodeIpInfos.nodeNum, tsDnodeIpInfos.inUse); //for (int32_t i = 0; i < tsDnodeIpInfos.nodeNum; i++) { // printf("mnode:%d, %s\n", tsDnodeIpInfos.nodeInfos[i].nodeId, tsDnodeIpInfos.nodeInfos[i].nodeEp); //} @@ -115,8 +115,8 @@ PARSE_OVER: } -static void dnodeSaveMnodeInfos(char* dnodeIpList) { - FILE *fp = fopen(dnodeIpList, "w"); +static void dnodeSaveMnodeInfos(char* dnodeEpSet) { + FILE *fp = fopen(dnodeEpSet, "w"); if (!fp) return; int32_t len = 0; @@ -143,13 +143,13 @@ static void dnodeSaveMnodeInfos(char* dnodeIpList) { fclose(fp); free(content); - printf("mod mnode iplist successed\n"); + printf("mod mnode epSet successed\n"); } -void modDnodeIpList(char* dnodeIpList) +void modDnodeEpSet(char* dnodeEpSet) { - (void)dnodeReadMnodeInfos(dnodeIpList); - dnodeSaveMnodeInfos(dnodeIpList); + (void)dnodeReadMnodeInfos(dnodeEpSet); + dnodeSaveMnodeInfos(dnodeEpSet); return; } diff --git a/src/mnode/inc/mnodeMnode.h b/src/mnode/inc/mnodeMnode.h index 1060907234..0976ea8acd 100644 --- a/src/mnode/inc/mnodeMnode.h +++ b/src/mnode/inc/mnodeMnode.h @@ -42,12 +42,12 @@ void mnodeIncMnodeRef(struct SMnodeObj *pMnode); void mnodeDecMnodeRef(struct SMnodeObj *pMnode); char * mnodeGetMnodeRoleStr(); -void mnodeGetMnodeIpSetForPeer(SRpcIpSet *ipSet); -void mnodeGetMnodeIpSetForShell(SRpcIpSet *ipSet); +void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet); +void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet); char* mnodeGetMnodeMasterEp(); void mnodeGetMnodeInfos(void *mnodes); -void mnodeUpdateMnodeIpSet(); +void mnodeUpdateMnodeEpSet(); #ifdef __cplusplus } diff --git a/src/mnode/inc/mnodeVgroup.h b/src/mnode/inc/mnodeVgroup.h index 3f1da89605..9c5b201e93 100644 --- a/src/mnode/inc/mnodeVgroup.h +++ b/src/mnode/inc/mnodeVgroup.h @@ -44,12 +44,12 @@ int32_t mnodeGetAvailableVgroup(struct SMnodeMsg *pMsg, SVgObj **pVgroup, int32_ void mnodeAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable); void mnodeRemoveTableFromVgroup(SVgObj *pVgroup, SChildTableObj *pTable); -void mnodeSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle); +void mnodeSendDropVnodeMsg(int32_t vgId, SRpcEpSet *epSet, void *ahandle); void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); void mnodeSendAlterVgroupMsg(SVgObj *pVgroup); -SRpcIpSet mnodeGetIpSetFromVgroup(SVgObj *pVgroup); -SRpcIpSet mnodeGetIpSetFromIp(char *ep); +SRpcEpSet mnodeGetEpSetFromVgroup(SVgObj *pVgroup); +SRpcEpSet mnodeGetEpSetFromIp(char *ep); #ifdef __cplusplus } diff --git a/src/mnode/src/mnodeDnode.c b/src/mnode/src/mnodeDnode.c index 9085f1a9f7..7edba8662e 100644 --- a/src/mnode/src/mnodeDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -289,14 +289,14 @@ static int32_t mnodeProcessCfgDnodeMsg(SMnodeMsg *pMsg) { } } - SRpcIpSet ipSet = mnodeGetIpSetFromIp(pCmCfgDnode->ep); + SRpcEpSet epSet = mnodeGetEpSetFromIp(pCmCfgDnode->ep); if (dnodeId != 0) { SDnodeObj *pDnode = mnodeGetDnode(dnodeId); if (pDnode == NULL) { mError("failed to cfg dnode, invalid dnodeId:%d", dnodeId); return TSDB_CODE_MND_DNODE_NOT_EXIST; } - ipSet = mnodeGetIpSetFromIp(pDnode->dnodeEp); + epSet = mnodeGetEpSetFromIp(pDnode->dnodeEp); mnodeDecDnodeRef(pDnode); } @@ -313,7 +313,7 @@ static int32_t mnodeProcessCfgDnodeMsg(SMnodeMsg *pMsg) { }; mInfo("dnode:%s, is configured by %s", pCmCfgDnode->ep, pMsg->pUser->user); - dnodeSendMsgToDnode(&ipSet, &rpcMdCfgDnodeMsg); + dnodeSendMsgToDnode(&epSet, &rpcMdCfgDnodeMsg); return TSDB_CODE_SUCCESS; } @@ -399,9 +399,9 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) { SVgObj *pVgroup = mnodeGetVgroup(pVload->vgId); if (pVgroup == NULL) { - SRpcIpSet ipSet = mnodeGetIpSetFromIp(pDnode->dnodeEp); + SRpcEpSet epSet = mnodeGetEpSetFromIp(pDnode->dnodeEp); mInfo("dnode:%d, vgId:%d not exist in mnode, drop it", pDnode->dnodeId, pVload->vgId); - mnodeSendDropVnodeMsg(pVload->vgId, &ipSet, NULL); + mnodeSendDropVnodeMsg(pVload->vgId, &epSet, NULL); } else { mnodeUpdateVgroupStatus(pVgroup, pDnode, pVload); pAccess->vgId = htonl(pVload->vgId); diff --git a/src/mnode/src/mnodeMnode.c b/src/mnode/src/mnodeMnode.c index f74de2b325..d2f389ca0b 100644 --- a/src/mnode/src/mnodeMnode.c +++ b/src/mnode/src/mnodeMnode.c @@ -35,8 +35,8 @@ static void * tsMnodeSdb = NULL; static int32_t tsMnodeUpdateSize = 0; -static SRpcIpSet tsMnodeIpSetForShell; -static SRpcIpSet tsMnodeIpSetForPeer; +static SRpcEpSet tsMnodeEpSetForShell; +static SRpcEpSet tsMnodeEpSetForPeer; static SDMMnodeInfos tsMnodeInfos; static int32_t mnodeGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); @@ -123,7 +123,7 @@ static int32_t mnodeMnodeActionRestored() { sdbFreeIter(pIter); } - mnodeUpdateMnodeIpSet(); + mnodeUpdateMnodeEpSet(); return TSDB_CODE_SUCCESS; } @@ -204,13 +204,13 @@ char *mnodeGetMnodeRoleStr(int32_t role) { } } -void mnodeUpdateMnodeIpSet() { - mInfo("update mnodes ipset, numOfIps:%d ", mnodeGetMnodesNum()); +void mnodeUpdateMnodeEpSet() { + mInfo("update mnodes epSet, numOfEps:%d ", mnodeGetMnodesNum()); mnodeMnodeWrLock(); - memset(&tsMnodeIpSetForShell, 0, sizeof(SRpcIpSet)); - memset(&tsMnodeIpSetForPeer, 0, sizeof(SRpcIpSet)); + memset(&tsMnodeEpSetForShell, 0, sizeof(SRpcEpSet)); + memset(&tsMnodeEpSetForPeer, 0, sizeof(SRpcEpSet)); memset(&tsMnodeInfos, 0, sizeof(SDMMnodeInfos)); int32_t index = 0; @@ -222,20 +222,20 @@ void mnodeUpdateMnodeIpSet() { SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId); if (pDnode != NULL) { - strcpy(tsMnodeIpSetForShell.fqdn[index], pDnode->dnodeFqdn); - tsMnodeIpSetForShell.port[index] = htons(pDnode->dnodePort); - mDebug("mnode:%d, for shell fqdn:%s %d", pDnode->dnodeId, tsMnodeIpSetForShell.fqdn[index], htons(tsMnodeIpSetForShell.port[index])); + strcpy(tsMnodeEpSetForShell.fqdn[index], pDnode->dnodeFqdn); + tsMnodeEpSetForShell.port[index] = htons(pDnode->dnodePort); + mDebug("mnode:%d, for shell fqdn:%s %d", pDnode->dnodeId, tsMnodeEpSetForShell.fqdn[index], htons(tsMnodeEpSetForShell.port[index])); - strcpy(tsMnodeIpSetForPeer.fqdn[index], pDnode->dnodeFqdn); - tsMnodeIpSetForPeer.port[index] = htons(pDnode->dnodePort + TSDB_PORT_DNODEDNODE); - mDebug("mnode:%d, for peer fqdn:%s %d", pDnode->dnodeId, tsMnodeIpSetForPeer.fqdn[index], htons(tsMnodeIpSetForPeer.port[index])); + strcpy(tsMnodeEpSetForPeer.fqdn[index], pDnode->dnodeFqdn); + tsMnodeEpSetForPeer.port[index] = htons(pDnode->dnodePort + TSDB_PORT_DNODEDNODE); + mDebug("mnode:%d, for peer fqdn:%s %d", pDnode->dnodeId, tsMnodeEpSetForPeer.fqdn[index], htons(tsMnodeEpSetForPeer.port[index])); tsMnodeInfos.nodeInfos[index].nodeId = htonl(pMnode->mnodeId); strcpy(tsMnodeInfos.nodeInfos[index].nodeEp, pDnode->dnodeEp); if (pMnode->role == TAOS_SYNC_ROLE_MASTER) { - tsMnodeIpSetForShell.inUse = index; - tsMnodeIpSetForPeer.inUse = index; + tsMnodeEpSetForShell.inUse = index; + tsMnodeEpSetForPeer.inUse = index; tsMnodeInfos.inUse = index; } @@ -248,23 +248,23 @@ void mnodeUpdateMnodeIpSet() { } tsMnodeInfos.nodeNum = index; - tsMnodeIpSetForShell.numOfIps = index; - tsMnodeIpSetForPeer.numOfIps = index; + tsMnodeEpSetForShell.numOfEps = index; + tsMnodeEpSetForPeer.numOfEps = index; sdbFreeIter(pIter); mnodeMnodeUnLock(); } -void mnodeGetMnodeIpSetForPeer(SRpcIpSet *ipSet) { +void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet) { mnodeMnodeRdLock(); - *ipSet = tsMnodeIpSetForPeer; + *epSet = tsMnodeEpSetForPeer; mnodeMnodeUnLock(); } -void mnodeGetMnodeIpSetForShell(SRpcIpSet *ipSet) { +void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet) { mnodeMnodeRdLock(); - *ipSet = tsMnodeIpSetForShell; + *epSet = tsMnodeEpSetForShell; mnodeMnodeUnLock(); } @@ -295,7 +295,7 @@ int32_t mnodeAddMnode(int32_t dnodeId) { code = TSDB_CODE_MND_SDB_ERROR; } - mnodeUpdateMnodeIpSet(); + mnodeUpdateMnodeEpSet(); return code; } @@ -308,7 +308,7 @@ void mnodeDropMnodeLocal(int32_t dnodeId) { mnodeDecMnodeRef(pMnode); } - mnodeUpdateMnodeIpSet(); + mnodeUpdateMnodeEpSet(); } int32_t mnodeDropMnode(int32_t dnodeId) { @@ -330,7 +330,7 @@ int32_t mnodeDropMnode(int32_t dnodeId) { sdbDecRef(tsMnodeSdb, pMnode); - mnodeUpdateMnodeIpSet(); + mnodeUpdateMnodeEpSet(); return code; } diff --git a/src/mnode/src/mnodePeer.c b/src/mnode/src/mnodePeer.c index 04f2889607..71b8b1ea84 100644 --- a/src/mnode/src/mnodePeer.c +++ b/src/mnode/src/mnodePeer.c @@ -53,14 +53,14 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { if (!sdbIsMaster()) { SMnodeRsp *rpcRsp = &pMsg->rpcRsp; - SRpcIpSet *ipSet = rpcMallocCont(sizeof(SRpcIpSet)); - mnodeGetMnodeIpSetForPeer(ipSet); - rpcRsp->rsp = ipSet; - rpcRsp->len = sizeof(SRpcIpSet); + SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet)); + mnodeGetMnodeEpSetForPeer(epSet); + rpcRsp->rsp = epSet; + rpcRsp->len = sizeof(SRpcEpSet); - mDebug("%p, msg:%s in mpeer queue, will be redireced inUse:%d", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], ipSet->inUse); - for (int32_t i = 0; i < ipSet->numOfIps; ++i) { - mDebug("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i])); + mDebug("%p, msg:%s in mpeer queue, will be redireced inUse:%d", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], epSet->inUse); + for (int32_t i = 0; i < epSet->numOfEps; ++i) { + mDebug("mnode index:%d ep:%s:%d", i, epSet->fqdn[i], htons(epSet->port[i])); } return TSDB_CODE_RPC_REDIRECT; diff --git a/src/mnode/src/mnodeRead.c b/src/mnode/src/mnodeRead.c index dfecf1f693..af2fed3408 100644 --- a/src/mnode/src/mnodeRead.c +++ b/src/mnode/src/mnodeRead.c @@ -49,14 +49,14 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) { if (!sdbIsMaster()) { SMnodeRsp *rpcRsp = &pMsg->rpcRsp; - SRpcIpSet *ipSet = rpcMallocCont(sizeof(SRpcIpSet)); - mnodeGetMnodeIpSetForShell(ipSet); - rpcRsp->rsp = ipSet; - rpcRsp->len = sizeof(SRpcIpSet); + SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet)); + mnodeGetMnodeEpSetForShell(epSet); + rpcRsp->rsp = epSet; + rpcRsp->len = sizeof(SRpcEpSet); - mDebug("%p, msg:%s in mread queue, will be redireced, inUse:%d", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], ipSet->inUse); - for (int32_t i = 0; i < ipSet->numOfIps; ++i) { - mDebug("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i])); + mDebug("%p, msg:%s in mread queue, will be redireced, inUse:%d", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], epSet->inUse); + for (int32_t i = 0; i < epSet->numOfEps; ++i) { + mDebug("mnode index:%d ep:%s:%d", i, epSet->fqdn[i], htons(epSet->port[i])); } return TSDB_CODE_RPC_REDIRECT; diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index e08f8ca0db..0f72dbdec4 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -219,7 +219,7 @@ void sdbUpdateMnodeRoles() { } } - mnodeUpdateMnodeIpSet(); + mnodeUpdateMnodeEpSet(); } static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion) { diff --git a/src/mnode/src/mnodeShow.c b/src/mnode/src/mnodeShow.c index a56ad34a25..e3d5b41be3 100644 --- a/src/mnode/src/mnodeShow.c +++ b/src/mnode/src/mnodeShow.c @@ -270,7 +270,7 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) { pHBRsp->onlineDnodes = htonl(mnodeGetOnlinDnodesNum()); pHBRsp->totalDnodes = htonl(mnodeGetDnodesNum()); - mnodeGetMnodeIpSetForShell(&pHBRsp->ipList); + mnodeGetMnodeEpSetForShell(&pHBRsp->epSet); pMsg->rpcRsp.rsp = pHBRsp; pMsg->rpcRsp.len = sizeof(SCMHeartBeatRsp); @@ -335,7 +335,7 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) { pConnectRsp->writeAuth = pUser->writeAuth; pConnectRsp->superAuth = pUser->superAuth; - mnodeGetMnodeIpSetForShell(&pConnectRsp->ipList); + mnodeGetMnodeEpSetForShell(&pConnectRsp->epSet); connect_over: if (code != TSDB_CODE_SUCCESS) { diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 80c4fc95ab..75ed442cd4 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -910,9 +910,9 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) { mInfo("app:%p:%p, stable:%s, send drop stable msg to vgId:%d", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, pVgroup->vgId); - SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pVgroup); + SRpcEpSet epSet = mnodeGetEpSetFromVgroup(pVgroup); SRpcMsg rpcMsg = {.pCont = pDrop, .contLen = sizeof(SMDDropSTableMsg), .msgType = TSDB_MSG_TYPE_MD_DROP_STABLE}; - dnodeSendMsgToDnode(&ipSet, &rpcMsg); + dnodeSendMsgToDnode(&epSet, &rpcMsg); mnodeDecVgroupRef(pVgroup); } taosHashDestroyIter(pIter); @@ -1484,10 +1484,10 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { SDnodeObj *pDnode = pVgroup->vnodeGid[vn].pDnode; if (pDnode == NULL) break; - tstrncpy(pVgroupInfo->vgroups[vgSize].ipAddr[vn].fqdn, pDnode->dnodeFqdn, TSDB_FQDN_LEN); - pVgroupInfo->vgroups[vgSize].ipAddr[vn].port = htons(pDnode->dnodePort); + tstrncpy(pVgroupInfo->vgroups[vgSize].epAddr[vn].fqdn, pDnode->dnodeFqdn, TSDB_FQDN_LEN); + pVgroupInfo->vgroups[vgSize].epAddr[vn].port = htons(pDnode->dnodePort); - pVgroupInfo->vgroups[vgSize].numOfIps++; + pVgroupInfo->vgroups[vgSize].numOfEps++; } vgSize++; @@ -1615,7 +1615,7 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) { return terrno; } - SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pMsg->pVgroup); + SRpcEpSet epSet = mnodeGetEpSetFromVgroup(pMsg->pVgroup); SRpcMsg rpcMsg = { .ahandle = pMsg, .pCont = pMDCreate, @@ -1624,7 +1624,7 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) { .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE }; - dnodeSendMsgToDnode(&ipSet, &rpcMsg); + dnodeSendMsgToDnode(&epSet, &rpcMsg); return TSDB_CODE_MND_ACTION_IN_PROGRESS; } @@ -1788,7 +1788,7 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg, bool needReturn) { pDrop->sid = htonl(pTable->sid); pDrop->uid = htobe64(pTable->uid); - SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pMsg->pVgroup); + SRpcEpSet epSet = mnodeGetEpSetFromVgroup(pMsg->pVgroup); mInfo("app:%p:%p, table:%s, send drop ctable msg, vgId:%d sid:%d uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId, pTable->vgId, pTable->sid, pTable->uid); @@ -1803,7 +1803,7 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg, bool needReturn) { if (!needReturn) rpcMsg.ahandle = NULL; - dnodeSendMsgToDnode(&ipSet, &rpcMsg); + dnodeSendMsgToDnode(&epSet, &rpcMsg); return TSDB_CODE_MND_ACTION_IN_PROGRESS; } @@ -1842,7 +1842,7 @@ static int32_t mnodeAlterNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) { } } - SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pMsg->pVgroup); + SRpcEpSet epSet = mnodeGetEpSetFromVgroup(pMsg->pVgroup); SRpcMsg rpcMsg = { .ahandle = pMsg, .pCont = pMDCreate, @@ -1854,7 +1854,7 @@ static int32_t mnodeAlterNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) { mDebug("app:%p:%p, ctable %s, send alter column msg to vgId:%d", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, pMsg->pVgroup->vgId); - dnodeSendMsgToDnode(&ipSet, &rpcMsg); + dnodeSendMsgToDnode(&epSet, &rpcMsg); return TSDB_CODE_MND_ACTION_IN_PROGRESS; } @@ -1996,9 +1996,9 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) { for (int32_t i = 0; i < pMsg->pVgroup->numOfVnodes; ++i) { SDnodeObj *pDnode = mnodeGetDnode(pMsg->pVgroup->vnodeGid[i].dnodeId); if (pDnode == NULL) break; - strcpy(pMeta->vgroup.ipAddr[i].fqdn, pDnode->dnodeFqdn); - pMeta->vgroup.ipAddr[i].port = htons(pDnode->dnodePort + TSDB_PORT_DNODESHELL); - pMeta->vgroup.numOfIps++; + strcpy(pMeta->vgroup.epAddr[i].fqdn, pDnode->dnodeFqdn); + pMeta->vgroup.epAddr[i].port = htons(pDnode->dnodePort + TSDB_PORT_DNODESHELL); + pMeta->vgroup.numOfEps++; mnodeDecDnodeRef(pDnode); } pMeta->vgroup.vgId = htonl(pMsg->pVgroup->vgId); diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index 5f24981b50..1de591df7c 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -317,9 +317,9 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl } if (!dnodeExist) { - SRpcIpSet ipSet = mnodeGetIpSetFromIp(pDnode->dnodeEp); + SRpcEpSet epSet = mnodeGetEpSetFromIp(pDnode->dnodeEp); mError("vgId:%d, dnode:%d not exist in mnode, drop it", pVload->vgId, pDnode->dnodeId); - mnodeSendDropVnodeMsg(pVload->vgId, &ipSet, NULL); + mnodeSendDropVnodeMsg(pVload->vgId, &epSet, NULL); return; } @@ -809,29 +809,29 @@ static SMDCreateVnodeMsg *mnodeBuildVnodeMsg(SVgObj *pVgroup) { return pVnode; } -SRpcIpSet mnodeGetIpSetFromVgroup(SVgObj *pVgroup) { - SRpcIpSet ipSet = { - .numOfIps = pVgroup->numOfVnodes, +SRpcEpSet mnodeGetEpSetFromVgroup(SVgObj *pVgroup) { + SRpcEpSet epSet = { + .numOfEps = pVgroup->numOfVnodes, .inUse = 0, }; for (int i = 0; i < pVgroup->numOfVnodes; ++i) { - strcpy(ipSet.fqdn[i], pVgroup->vnodeGid[i].pDnode->dnodeFqdn); - ipSet.port[i] = pVgroup->vnodeGid[i].pDnode->dnodePort + TSDB_PORT_DNODEDNODE; + strcpy(epSet.fqdn[i], pVgroup->vnodeGid[i].pDnode->dnodeFqdn); + epSet.port[i] = pVgroup->vnodeGid[i].pDnode->dnodePort + TSDB_PORT_DNODEDNODE; } - return ipSet; + return epSet; } -SRpcIpSet mnodeGetIpSetFromIp(char *ep) { - SRpcIpSet ipSet; +SRpcEpSet mnodeGetEpSetFromIp(char *ep) { + SRpcEpSet epSet; - ipSet.numOfIps = 1; - ipSet.inUse = 0; - taosGetFqdnPortFromEp(ep, ipSet.fqdn[0], &ipSet.port[0]); - ipSet.port[0] += TSDB_PORT_DNODEDNODE; - return ipSet; + epSet.numOfEps = 1; + epSet.inUse = 0; + taosGetFqdnPortFromEp(ep, epSet.fqdn[0], &epSet.port[0]); + epSet.port[0] += TSDB_PORT_DNODEDNODE; + return epSet; } -static void mnodeSendAlterVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet) { +static void mnodeSendAlterVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet) { SMDAlterVnodeMsg *pAlter = mnodeBuildVnodeMsg(pVgroup); SRpcMsg rpcMsg = { .ahandle = NULL, @@ -840,21 +840,21 @@ static void mnodeSendAlterVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet) { .code = 0, .msgType = TSDB_MSG_TYPE_MD_ALTER_VNODE }; - dnodeSendMsgToDnode(ipSet, &rpcMsg); + dnodeSendMsgToDnode(epSet, &rpcMsg); } void mnodeSendAlterVgroupMsg(SVgObj *pVgroup) { mDebug("vgId:%d, send alter all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes, pVgroup->dbName); for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { - SRpcIpSet ipSet = mnodeGetIpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp); + SRpcEpSet epSet = mnodeGetEpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp); mDebug("vgId:%d, index:%d, send alter vnode msg to dnode %s", pVgroup->vgId, i, pVgroup->vnodeGid[i].pDnode->dnodeEp); - mnodeSendAlterVnodeMsg(pVgroup, &ipSet); + mnodeSendAlterVnodeMsg(pVgroup, &epSet); } } -static void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) { +static void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet, void *ahandle) { SMDCreateVnodeMsg *pCreate = mnodeBuildVnodeMsg(pVgroup); SRpcMsg rpcMsg = { .ahandle = ahandle, @@ -863,17 +863,17 @@ static void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *aha .code = 0, .msgType = TSDB_MSG_TYPE_MD_CREATE_VNODE }; - dnodeSendMsgToDnode(ipSet, &rpcMsg); + dnodeSendMsgToDnode(epSet, &rpcMsg); } void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) { mDebug("vgId:%d, send create all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes, pVgroup->dbName); for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { - SRpcIpSet ipSet = mnodeGetIpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp); + SRpcEpSet epSet = mnodeGetEpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp); mDebug("vgId:%d, index:%d, send create vnode msg to dnode %s, ahandle:%p", pVgroup->vgId, i, pVgroup->vnodeGid[i].pDnode->dnodeEp, ahandle); - mnodeSendCreateVnodeMsg(pVgroup, &ipSet, ahandle); + mnodeSendCreateVnodeMsg(pVgroup, &epSet, ahandle); } } @@ -926,7 +926,7 @@ static SMDDropVnodeMsg *mnodeBuildDropVnodeMsg(int32_t vgId) { return pDrop; } -void mnodeSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle) { +void mnodeSendDropVnodeMsg(int32_t vgId, SRpcEpSet *epSet, void *ahandle) { SMDDropVnodeMsg *pDrop = mnodeBuildDropVnodeMsg(vgId); SRpcMsg rpcMsg = { .ahandle = ahandle, @@ -935,16 +935,16 @@ void mnodeSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle) { .code = 0, .msgType = TSDB_MSG_TYPE_MD_DROP_VNODE }; - dnodeSendMsgToDnode(ipSet, &rpcMsg); + dnodeSendMsgToDnode(epSet, &rpcMsg); } static void mnodeSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) { pVgroup->status = TAOS_VG_STATUS_DROPPING; // deleting mDebug("vgId:%d, send drop all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle); for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { - SRpcIpSet ipSet = mnodeGetIpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp); + SRpcEpSet epSet = mnodeGetEpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp); mDebug("vgId:%d, send drop vnode msg to dnode:%d, ahandle:%p", pVgroup->vgId, pVgroup->vnodeGid[i].dnodeId, ahandle); - mnodeSendDropVnodeMsg(pVgroup->vgId, &ipSet, ahandle); + mnodeSendDropVnodeMsg(pVgroup->vgId, &epSet, ahandle); } } @@ -998,8 +998,8 @@ static int32_t mnodeProcessVnodeCfgMsg(SMnodeMsg *pMsg) { } mDebug("vgId:%d, send create vnode msg to dnode %s for vnode cfg msg", pVgroup->vgId, pDnode->dnodeEp); - SRpcIpSet ipSet = mnodeGetIpSetFromIp(pDnode->dnodeEp); - mnodeSendCreateVnodeMsg(pVgroup, &ipSet, NULL); + SRpcEpSet epSet = mnodeGetEpSetFromIp(pDnode->dnodeEp); + mnodeSendCreateVnodeMsg(pVgroup, &epSet, NULL); mnodeDecDnodeRef(pDnode); mnodeDecVgroupRef(pVgroup); diff --git a/src/mnode/src/mnodeWrite.c b/src/mnode/src/mnodeWrite.c index 326ed50981..ab3cfa2dad 100644 --- a/src/mnode/src/mnodeWrite.c +++ b/src/mnode/src/mnodeWrite.c @@ -49,16 +49,16 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) { if (!sdbIsMaster()) { SMnodeRsp *rpcRsp = &pMsg->rpcRsp; - SRpcIpSet *ipSet = rpcMallocCont(sizeof(SRpcIpSet)); - mnodeGetMnodeIpSetForShell(ipSet); - rpcRsp->rsp = ipSet; - rpcRsp->len = sizeof(SRpcIpSet); + SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet)); + mnodeGetMnodeEpSetForShell(epSet); + rpcRsp->rsp = epSet; + rpcRsp->len = sizeof(SRpcEpSet); mDebug("app:%p:%p, msg:%s will be redireced inUse:%d", pMsg->rpcMsg.ahandle, pMsg, taosMsg[pMsg->rpcMsg.msgType], - ipSet->inUse); - for (int32_t i = 0; i < ipSet->numOfIps; ++i) { - mDebug("app:%p:%p, mnode index:%d ip:%s:%d", pMsg->rpcMsg.ahandle, pMsg, i, ipSet->fqdn[i], - htons(ipSet->port[i])); + epSet->inUse); + for (int32_t i = 0; i < epSet->numOfEps; ++i) { + mDebug("app:%p:%p, mnode index:%d ep:%s:%d", pMsg->rpcMsg.ahandle, pMsg, i, epSet->fqdn[i], + htons(epSet->port[i])); } return TSDB_CODE_RPC_REDIRECT; diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 5d67d5e615..2325d12d92 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -55,7 +55,7 @@ typedef struct { char secret[TSDB_KEY_LEN]; // secret for the link char ckey[TSDB_KEY_LEN]; // ciphering key - void (*cfp)(SRpcMsg *, SRpcIpSet *); + void (*cfp)(SRpcMsg *, SRpcEpSet *); int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey); int32_t refCount; @@ -71,7 +71,7 @@ typedef struct { typedef struct { SRpcInfo *pRpc; // associated SRpcInfo - SRpcIpSet ipSet; // ip list provided by app + SRpcEpSet epSet; // ip list provided by app void *ahandle; // handle provided by app void *signature; // for validation struct SRpcConn *pConn; // pConn allocated @@ -80,12 +80,12 @@ typedef struct { int32_t contLen; // content length int32_t code; // error code int16_t numOfTry; // number of try for different servers - int8_t oldInUse; // server IP inUse passed by app + int8_t oldInUse; // server EP inUse passed by app int8_t redirect; // flag to indicate redirect int8_t connType; // connection type SRpcMsg *pRsp; // for synchronous API tsem_t *pSem; // for synchronous API - SRpcIpSet *pSet; // for synchronous API + SRpcEpSet *pSet; // for synchronous API char msg[0]; // RpcHead starts from here } SRpcReqContext; @@ -355,7 +355,7 @@ void *rpcReallocCont(void *ptr, int contLen) { return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); } -void rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, SRpcMsg *pMsg) { +void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) { SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcReqContext *pContext; @@ -364,11 +364,11 @@ void rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, SRpcMsg *pMsg) { pContext->ahandle = pMsg->ahandle; pContext->signature = pContext; pContext->pRpc = (SRpcInfo *)shandle; - pContext->ipSet = *pIpSet; + pContext->epSet = *pEpSet; pContext->contLen = contLen; pContext->pCont = pMsg->pCont; pContext->msgType = pMsg->msgType; - pContext->oldInUse = pIpSet->inUse; + pContext->oldInUse = pEpSet->inUse; pContext->connType = RPC_CONN_UDPC; if (contLen > tsRpcMaxUdpSize) pContext->connType = RPC_CONN_TCPC; @@ -458,15 +458,15 @@ void rpcSendResponse(const SRpcMsg *pRsp) { return; } -void rpcSendRedirectRsp(void *thandle, const SRpcIpSet *pIpSet) { +void rpcSendRedirectRsp(void *thandle, const SRpcEpSet *pEpSet) { SRpcMsg rpcMsg; memset(&rpcMsg, 0, sizeof(rpcMsg)); - rpcMsg.contLen = sizeof(SRpcIpSet); + rpcMsg.contLen = sizeof(SRpcEpSet); rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); if (rpcMsg.pCont == NULL) return; - memcpy(rpcMsg.pCont, pIpSet, sizeof(SRpcIpSet)); + memcpy(rpcMsg.pCont, pEpSet, sizeof(SRpcEpSet)); rpcMsg.code = TSDB_CODE_RPC_REDIRECT; rpcMsg.handle = thandle; @@ -488,7 +488,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { return 0; } -void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { +void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { SRpcReqContext *pContext; pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); @@ -498,9 +498,9 @@ void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) tsem_init(&sem, 0, 0); pContext->pSem = &sem; pContext->pRsp = pRsp; - pContext->pSet = pIpSet; + pContext->pSet = pEpSet; - rpcSendRequest(shandle, pIpSet, pMsg); + rpcSendRequest(shandle, pEpSet, pMsg); tsem_wait(&sem); tsem_destroy(&sem); @@ -755,11 +755,11 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) { static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) { SRpcConn *pConn; SRpcInfo *pRpc = pContext->pRpc; - SRpcIpSet *pIpSet = &pContext->ipSet; + SRpcEpSet *pEpSet = &pContext->epSet; - pConn = rpcGetConnFromCache(pRpc->pCache, pIpSet->fqdn[pIpSet->inUse], pIpSet->port[pIpSet->inUse], pContext->connType); + pConn = rpcGetConnFromCache(pRpc->pCache, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType); if ( pConn == NULL || pConn->user[0] == 0) { - pConn = rpcOpenConn(pRpc, pIpSet->fqdn[pIpSet->inUse], pIpSet->port[pIpSet->inUse], pContext->connType); + pConn = rpcOpenConn(pRpc, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType); } if (pConn) { @@ -1020,16 +1020,16 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { pContext->pConn = NULL; if (pContext->pRsp) { // for synchronous API - memcpy(pContext->pSet, &pContext->ipSet, sizeof(SRpcIpSet)); + memcpy(pContext->pSet, &pContext->epSet, sizeof(SRpcEpSet)); memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg)); tsem_post(pContext->pSem); } else { // for asynchronous API - SRpcIpSet *pIpSet = NULL; - if (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect) - pIpSet = &pContext->ipSet; + SRpcEpSet *pEpSet = NULL; + if (pContext->epSet.inUse != pContext->oldInUse || pContext->redirect) + pEpSet = &pContext->epSet; - (*pRpc->cfp)(pMsg, pIpSet); + (*pRpc->cfp)(pMsg, pEpSet); } // free the request message @@ -1070,9 +1070,9 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { pConn->pContext = NULL; pConn->pReqMsg = NULL; - // for UDP, port may be changed by server, the port in ipSet shall be used for cache + // for UDP, port may be changed by server, the port in epSet shall be used for cache if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { - rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->ipSet.port[pContext->ipSet.inUse], pConn->connType); + rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType); } else { rpcCloseConn(pConn); } @@ -1087,10 +1087,10 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { if (pHead->code == TSDB_CODE_RPC_REDIRECT) { pContext->numOfTry = 0; - memcpy(&pContext->ipSet, pHead->content, sizeof(pContext->ipSet)); - tDebug("%s, redirect is received, numOfIps:%d", pConn->info, pContext->ipSet.numOfIps); - for (int i=0; iipSet.numOfIps; ++i) - pContext->ipSet.port[i] = htons(pContext->ipSet.port[i]); + memcpy(&pContext->epSet, pHead->content, sizeof(pContext->epSet)); + tDebug("%s, redirect is received, numOfEps:%d", pConn->info, pContext->epSet.numOfEps); + for (int i=0; iepSet.numOfEps; ++i) + pContext->epSet.port[i] = htons(pContext->epSet.port[i]); rpcSendReqToServer(pRpc, pContext); rpcFreeCont(rpcMsg.pCont); } else if (pHead->code == TSDB_CODE_RPC_NOT_READY) { @@ -1269,7 +1269,7 @@ static void rpcProcessConnError(void *param, void *id) { tDebug("%s %p, connection error happens", pRpc->label, pContext->ahandle); - if (pContext->numOfTry >= pContext->ipSet.numOfIps) { + if (pContext->numOfTry >= pContext->epSet.numOfEps) { rpcMsg.msgType = pContext->msgType+1; rpcMsg.ahandle = pContext->ahandle; rpcMsg.code = pContext->code; @@ -1279,8 +1279,8 @@ static void rpcProcessConnError(void *param, void *id) { rpcNotifyClient(pContext, &rpcMsg); } else { // move to next IP - pContext->ipSet.inUse++; - pContext->ipSet.inUse = pContext->ipSet.inUse % pContext->ipSet.numOfIps; + pContext->epSet.inUse++; + pContext->epSet.inUse = pContext->epSet.inUse % pContext->epSet.numOfEps; rpcSendReqToServer(pRpc, pContext); } } diff --git a/src/rpc/test/rclient.c b/src/rpc/test/rclient.c index f8dbbedb11..e2d9d388f9 100644 --- a/src/rpc/test/rclient.c +++ b/src/rpc/test/rclient.c @@ -22,7 +22,7 @@ typedef struct { int index; - SRpcIpSet ipSet; + SRpcEpSet epSet; int num; int numOfReqs; int msgSize; @@ -32,11 +32,11 @@ typedef struct { void *pRpc; } SInfo; -static void processResponse(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { +static void processResponse(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { SInfo *pInfo = (SInfo *)pMsg->ahandle; tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code); - if (pIpSet) pInfo->ipSet = *pIpSet; + if (pEpSet) pInfo->epSet = *pEpSet; rpcFreeCont(pMsg->pCont); sem_post(&pInfo->rspSem); @@ -57,7 +57,7 @@ static void *sendRequest(void *param) { rpcMsg.ahandle = pInfo; rpcMsg.msgType = 1; tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); - rpcSendRequest(pInfo->pRpc, &pInfo->ipSet, &rpcMsg); + rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg); if ( pInfo->num % 20000 == 0 ) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); sem_wait(&pInfo->rspSem); @@ -71,7 +71,7 @@ static void *sendRequest(void *param) { int main(int argc, char *argv[]) { SRpcInit rpcInit; - SRpcIpSet ipSet; + SRpcEpSet epSet; int msgSize = 128; int numOfReqs = 0; int appThreads = 1; @@ -82,12 +82,12 @@ int main(int argc, char *argv[]) { pthread_attr_t thattr; // server info - ipSet.numOfIps = 1; - ipSet.inUse = 0; - ipSet.port[0] = 7000; - ipSet.port[1] = 7000; - strcpy(ipSet.fqdn[0], serverIp); - strcpy(ipSet.fqdn[1], "192.168.0.1"); + epSet.numOfEps = 1; + epSet.inUse = 0; + epSet.port[0] = 7000; + epSet.port[1] = 7000; + strcpy(epSet.fqdn[0], serverIp); + strcpy(epSet.fqdn[1], "192.168.0.1"); // client info memset(&rpcInit, 0, sizeof(rpcInit)); @@ -105,9 +105,9 @@ int main(int argc, char *argv[]) { for (int i=1; iindex = i; - pInfo->ipSet = ipSet; + pInfo->epSet = epSet; pInfo->numOfReqs = numOfReqs; pInfo->msgSize = msgSize; sem_init(&pInfo->rspSem, 0, 0); diff --git a/src/rpc/test/rsclient.c b/src/rpc/test/rsclient.c index 266697d555..6e6961784b 100644 --- a/src/rpc/test/rsclient.c +++ b/src/rpc/test/rsclient.c @@ -23,7 +23,7 @@ typedef struct { int index; - SRpcIpSet ipSet; + SRpcEpSet epSet; int num; int numOfReqs; int msgSize; @@ -51,7 +51,7 @@ static void *sendRequest(void *param) { rpcMsg.msgType = 1; tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); - rpcSendRecv(pInfo->pRpc, &pInfo->ipSet, &rpcMsg, &rspMsg); + rpcSendRecv(pInfo->pRpc, &pInfo->epSet, &rpcMsg, &rspMsg); // handle response if (rspMsg.code != 0) terror++; @@ -72,7 +72,7 @@ static void *sendRequest(void *param) { int main(int argc, char *argv[]) { SRpcInit rpcInit; - SRpcIpSet ipSet; + SRpcEpSet epSet; int msgSize = 128; int numOfReqs = 0; int appThreads = 1; @@ -83,12 +83,12 @@ int main(int argc, char *argv[]) { pthread_attr_t thattr; // server info - ipSet.numOfIps = 1; - ipSet.inUse = 0; - ipSet.port[0] = 7000; - ipSet.port[1] = 7000; - strcpy(ipSet.fqdn[0], serverIp); - strcpy(ipSet.fqdn[1], "192.168.0.1"); + epSet.numOfEps = 1; + epSet.inUse = 0; + epSet.port[0] = 7000; + epSet.port[1] = 7000; + strcpy(epSet.fqdn[0], serverIp); + strcpy(epSet.fqdn[1], "192.168.0.1"); // client info memset(&rpcInit, 0, sizeof(rpcInit)); @@ -106,9 +106,9 @@ int main(int argc, char *argv[]) { for (int i=1; iindex = i; - pInfo->ipSet = ipSet; + pInfo->epSet = epSet; pInfo->numOfReqs = numOfReqs; pInfo->msgSize = msgSize; sem_init(&pInfo->rspSem, 0, 0); diff --git a/src/rpc/test/rserver.c b/src/rpc/test/rserver.c index d06e9df64b..44c5cd6ab4 100644 --- a/src/rpc/test/rserver.c +++ b/src/rpc/test/rserver.c @@ -103,7 +103,7 @@ int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char return ret; } -void processRequestMsg(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { +void processRequestMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { SRpcMsg *pTemp; pTemp = taosAllocateQitem(sizeof(SRpcMsg)); -- GitLab