From 246ea1df248e8721bad64f40fd57ed0ec0db263c Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 18 Jul 2020 10:31:31 +0000 Subject: [PATCH] modify from ip to ep --- src/client/inc/tsclient.h | 4 +- src/client/src/tscSchemaUtil.c | 8 +-- src/client/src/tscServer.c | 95 +++++++++++++++------------------- src/client/src/tscSql.c | 2 +- src/client/src/tscUtil.c | 2 +- src/inc/trpc.h | 6 +-- 6 files changed, 53 insertions(+), 64 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 83f9165d35..b5455ed1fb 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -55,8 +55,8 @@ typedef struct STableComInfo { typedef struct SCMCorVgroupInfo { int32_t version; int8_t inUse; - int8_t numOfIps; - SIpAddr ipAddr[TSDB_MAX_REPLICA]; + int8_t numOfEps; + SEpAddr epAddr[TSDB_MAX_REPLICA]; } SCMCorVgroupInfo; typedef struct STableMeta { diff --git a/src/client/src/tscSchemaUtil.c b/src/client/src/tscSchemaUtil.c index 52342b3650..9b8f48b109 100644 --- a/src/client/src/tscSchemaUtil.c +++ b/src/client/src/tscSchemaUtil.c @@ -143,10 +143,10 @@ struct SSchema tscGetTbnameColumnSchema() { static void tscInitCorVgroupInfo(SCMCorVgroupInfo *corVgroupInfo, SCMVgroupInfo *vgroupInfo) { corVgroupInfo->version = 0; corVgroupInfo->inUse = 0; - corVgroupInfo->numOfIps = vgroupInfo->numOfIps; - for (int32_t i = 0; i < corVgroupInfo->numOfIps; i++) { - strncpy(corVgroupInfo->ipAddr[i].fqdn, vgroupInfo->ipAddr[i].fqdn, TSDB_FQDN_LEN); - corVgroupInfo->ipAddr[i].port = vgroupInfo->ipAddr[i].port; + corVgroupInfo->numOfEps = vgroupInfo->numOfEps; + for (int32_t i = 0; i < corVgroupInfo->numOfEps; i++) { + strncpy(corVgroupInfo->epAddr[i].fqdn, vgroupInfo->epAddr[i].fqdn, TSDB_FQDN_LEN); + corVgroupInfo->epAddr[i].port = vgroupInfo->epAddr[i].port; } } STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 3d78063bed..d9922b8718 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -53,64 +53,53 @@ static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) { return; } - pEpList->numOfEps = pVgroupInfo->numOfEps; + pEpSet->numOfEps = pVgroupInfo->numOfEps; for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) { - strcpy(pEpList->fqdn[i], pVgroupInfo->epAddr[i].fqdn); - pEpList->port[i] = pVgroupInfo->epAddr[i].port; + strcpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn); + pEpSet->port[i] = pVgroupInfo->epAddr[i].port; } } -void tscIpSetCopy(SRpcIpSet *dst, SRpcIpSet *src) { - dst->numOfIps = src->numOfIps; - dst->inUse = src->inUse; - for (int32_t i = 0; i < src->numOfIps; ++i) { - dst->port[i] = src->port[i]; - strncpy(dst->fqdn[i], src->fqdn[i], TSDB_FQDN_LEN); - } -} -static void tscDumpMgmtIpSet(SRpcIpSet *ipSet) { - taosCorBeginRead(&tscMgmtIpSet.version); - SRpcIpSet* src = &tscMgmtIpSet.ipSet; - tscIpSetCopy(ipSet, src); - taosCorEndRead(&tscMgmtIpSet.version); +static void tscDumpMgmtEpSet(SRpcEpSet *epSet) { + taosCorBeginRead(&tscMgmtEpSet.version); + *epSet = tscMgmtEpSet.epSet; + taosCorEndRead(&tscMgmtEpSet.version); } -static void tscIpSetHtons(SRpcIpSet *s) { - for (int32_t i = 0; i < s->numOfIps; i++) { +static void tscEpSetHtons(SRpcEpSet *s) { + for (int32_t i = 0; i < s->numOfEps; i++) { s->port[i] = htons(s->port[i]); } } -bool tscIpSetIsEqual(SRpcIpSet *s1, SRpcIpSet *s2) { - if (s1->numOfIps != s2->numOfIps || s1->inUse != s2->inUse) { +bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) { + if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) { return false; } - for (int32_t i = 0; i < s1->numOfIps; i++) { + for (int32_t i = 0; i < s1->numOfEps; i++) { if (s1->port[i] != s2->port[i] || strncmp(s1->fqdn[i], s2->fqdn[i], TSDB_FQDN_LEN) != 0) return false; } return true; } -void tscUpdateMgmtIpList(SRpcIpSet *pIpSet) { +void tscUpdateMgmtEpSet(SRpcEpSet *pEpSet) { // no need to update if equal - taosCorBeginWrite(&tscMgmtIpSet.version); - // or copy directly, tscMgmtIpSet.ipSet = *pIpSet - SRpcIpSet *mgmtIpSet = &tscMgmtIpSet.ipSet; - tscIpSetCopy(mgmtIpSet, pIpSet); - taosCorEndWrite(&tscMgmtIpSet.version); + taosCorBeginWrite(&tscMgmtEpSet.version); + tscMgmtEpSet.epSet = *pEpSet; + taosCorEndWrite(&tscMgmtEpSet.version); } -static void tscDumpIpSetFromVgroupInfo(SCMCorVgroupInfo *pVgroupInfo, SRpcIpSet *pIpSet) { +static void tscDumpEpSetFromVgroupInfo(SCMCorVgroupInfo *pVgroupInfo, SRpcEpSet *pEpSet) { if (pVgroupInfo == NULL) { return;} taosCorBeginRead(&pVgroupInfo->version); int8_t inUse = pVgroupInfo->inUse; - pIpSet->inUse = (inUse >= 0 && inUse < TSDB_MAX_REPLICA) ? inUse: 0; - pIpSet->numOfIps = pVgroupInfo->numOfIps; - for (int32_t i = 0; i < pVgroupInfo->numOfIps; ++i) { - strncpy(pIpSet->fqdn[i], pVgroupInfo->ipAddr[i].fqdn, TSDB_FQDN_LEN); - pIpSet->port[i] = pVgroupInfo->ipAddr[i].port; + pEpSet->inUse = (inUse >= 0 && inUse < TSDB_MAX_REPLICA) ? inUse: 0; + pEpSet->numOfEps = pVgroupInfo->numOfEps; + for (int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) { + strncpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn, TSDB_FQDN_LEN); + pEpSet->port[i] = pVgroupInfo->epAddr[i].port; } taosCorEndRead(&pVgroupInfo->version); } -static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcIpSet *pIpSet) { +static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) { SSqlCmd *pCmd = &pObj->cmd; STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) { return;} @@ -130,7 +119,7 @@ void tscPrintMgmtEp() { SRpcEpSet dump; tscDumpMgmtEpSet(&dump); if (dump.numOfEps <= 0) { - tscError("invalid mnode EP list:%d", dump.numOfEPs); + tscError("invalid mnode EP list:%d", dump.numOfEps); } else { for (int i = 0; i < dump.numOfEps; ++i) { tscDebug("mnode index:%d %s:%d", i, dump.fqdn[i], dump.port[i]); @@ -166,10 +155,10 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { if (code == 0) { SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp; - SRpcEpSet * pEpList = &pRsp->epList; - if (pEpList->numOfEps > 0) { - tscEpSetHtons(pEpList); - tscUpdateMgmtEpList(pEpList); + SRpcEpSet * epSet = &pRsp->epSet; + if (epSet->numOfEps > 0) { + tscEpSetHtons(epSet); + tscUpdateMgmtEpSet(epSet); } pSql->pTscObj->connId = htonl(pRsp->connId); @@ -242,7 +231,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { // set the mgmt ip list if (pSql->cmd.command >= TSDB_SQL_MGMT) { - tscDumpMgmtEpSet(&pSql->epList); + tscDumpMgmtEpSet(&pSql->epSet); } memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); @@ -296,11 +285,11 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { if (pEpSet) { //SRpcEpSet dump; tscEpSetHtons(pEpSet); - if (tscEpSetIsEqual(&pSql->epList, pEpSet)) { + if (tscEpSetIsEqual(&pSql->epSet, pEpSet)) { if(pCmd->command < TSDB_SQL_MGMT) { tscUpdateVgroupInfo(pSql, pEpSet); } else { - tscUpdateMgmtEpList(pEpSet); + tscUpdateMgmtEpSet(pEpSet); } } } @@ -589,7 +578,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { // pSql->cmd.payloadLen is set during copying data into payload pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; - tscDumpEpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->epList); + tscDumpEpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->epSet); tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit, pSql->epSet.numOfEps); @@ -631,7 +620,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char } else { pVgroupInfo = &pTableMeta->vgroupInfo; } - tscSetDnodeEpList(pSql, pVgroupInfo); + tscSetDnodeEpSet(pSql, pVgroupInfo); if (pVgroupInfo != NULL) { pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId); @@ -654,7 +643,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index); // set the vgroup info - tscSetDnodeEpList(pSql, &pTableIdList->vgInfo); + tscSetDnodeEpSet(pSql, &pTableIdList->vgInfo); pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId); int32_t numOfTables = taosArrayGetSize(pTableIdList->itemList); @@ -1200,11 +1189,11 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pShowMsg->payloadLen = htons(pPattern->n); } } else { - SSQLToken *pIpAddr = &pShowInfo->prefix; - assert(pIpAddr->n > 0 && pIpAddr->type > 0); + SSQLToken *pEpAddr = &pShowInfo->prefix; + assert(pEpAddr->n > 0 && pEpAddr->type > 0); - strncpy(pShowMsg->payload, pIpAddr->z, pIpAddr->n); - pShowMsg->payloadLen = htons(pIpAddr->n); + strncpy(pShowMsg->payload, pEpAddr->z, pEpAddr->n); + pShowMsg->payloadLen = htons(pEpAddr->n); } pCmd->payloadLen = sizeof(SCMShowMsg) + pShowMsg->payloadLen; @@ -1387,7 +1376,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) { SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - tscDumpEpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->corVgroupInfo, &pSql->epList); + tscDumpEpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->corVgroupInfo, &pSql->epSet); return TSDB_CODE_SUCCESS; } @@ -2011,9 +2000,9 @@ int tscProcessConnectRsp(SSqlObj *pSql) { assert(len <= sizeof(pObj->db)); tstrncpy(pObj->db, temp, sizeof(pObj->db)); - if (pConnect->epList.numOfEps > 0) { - tscEpSetHtons(&pConnect->epList); - tscUpdateMgmtEpList(&pConnect->epList); + if (pConnect->epSet.numOfEps > 0) { + tscEpSetHtons(&pConnect->epSet); + tscUpdateMgmtEpSet(&pConnect->epSet); } strcpy(pObj->sversion, pConnect->serverVersion); diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 59b506a454..5848b7b82f 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -62,7 +62,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con } if (ip) { - if (tscSetMgmtEpListFromCfg(ip, NULL) < 0) return NULL; + if (tscSetMgmtEpSetFromCfg(ip, NULL) < 0) return NULL; if (port) tscMgmtEpSet.epSet.port[0] = port; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index c3cae58d8a..957bdeeb7f 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2145,7 +2145,7 @@ char* strdup_throw(const char* str) { return p; } -int tscSetMgmtEpListFromCfg(const char *first, const char *second) { +int tscSetMgmtEpSetFromCfg(const char *first, const char *second) { // init mgmt ip set tscMgmtEpSet.version = 0; SRpcEpSet *mgmtEpSet = &(tscMgmtEpSet.epSet); diff --git a/src/inc/trpc.h b/src/inc/trpc.h index 41ed98fd70..bdee917b5e 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -35,10 +35,10 @@ typedef struct SRpcEpSet { char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN]; } SRpcEpSet; -typedef struct SRpcCorIpSet { +typedef struct SRpcCorEpSet { int32_t version; - SRpcIpSet ipSet; -} SRpcCorIpSet; + SRpcEpSet epSet; +} SRpcCorEpSet; typedef struct SRpcConnInfo { uint32_t clientIp; -- GitLab