提交 246ea1df 编写于 作者: dengyihao's avatar dengyihao

modify from ip to ep

上级 599df650
...@@ -55,8 +55,8 @@ typedef struct STableComInfo { ...@@ -55,8 +55,8 @@ typedef struct STableComInfo {
typedef struct SCMCorVgroupInfo { typedef struct SCMCorVgroupInfo {
int32_t version; int32_t version;
int8_t inUse; int8_t inUse;
int8_t numOfIps; int8_t numOfEps;
SIpAddr ipAddr[TSDB_MAX_REPLICA]; SEpAddr epAddr[TSDB_MAX_REPLICA];
} SCMCorVgroupInfo; } SCMCorVgroupInfo;
typedef struct STableMeta { typedef struct STableMeta {
......
...@@ -143,10 +143,10 @@ struct SSchema tscGetTbnameColumnSchema() { ...@@ -143,10 +143,10 @@ struct SSchema tscGetTbnameColumnSchema() {
static void tscInitCorVgroupInfo(SCMCorVgroupInfo *corVgroupInfo, SCMVgroupInfo *vgroupInfo) { static void tscInitCorVgroupInfo(SCMCorVgroupInfo *corVgroupInfo, SCMVgroupInfo *vgroupInfo) {
corVgroupInfo->version = 0; corVgroupInfo->version = 0;
corVgroupInfo->inUse = 0; corVgroupInfo->inUse = 0;
corVgroupInfo->numOfIps = vgroupInfo->numOfIps; corVgroupInfo->numOfEps = vgroupInfo->numOfEps;
for (int32_t i = 0; i < corVgroupInfo->numOfIps; i++) { for (int32_t i = 0; i < corVgroupInfo->numOfEps; i++) {
strncpy(corVgroupInfo->ipAddr[i].fqdn, vgroupInfo->ipAddr[i].fqdn, TSDB_FQDN_LEN); strncpy(corVgroupInfo->epAddr[i].fqdn, vgroupInfo->epAddr[i].fqdn, TSDB_FQDN_LEN);
corVgroupInfo->ipAddr[i].port = vgroupInfo->ipAddr[i].port; corVgroupInfo->epAddr[i].port = vgroupInfo->epAddr[i].port;
} }
} }
STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size) { STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size) {
......
...@@ -53,64 +53,53 @@ static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) { ...@@ -53,64 +53,53 @@ static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
return; return;
} }
pEpList->numOfEps = pVgroupInfo->numOfEps; pEpSet->numOfEps = pVgroupInfo->numOfEps;
for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) { for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
strcpy(pEpList->fqdn[i], pVgroupInfo->epAddr[i].fqdn); strcpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn);
pEpList->port[i] = pVgroupInfo->epAddr[i].port; pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
} }
} }
void tscIpSetCopy(SRpcIpSet *dst, SRpcIpSet *src) { static void tscDumpMgmtEpSet(SRpcEpSet *epSet) {
dst->numOfIps = src->numOfIps; taosCorBeginRead(&tscMgmtEpSet.version);
dst->inUse = src->inUse; *epSet = tscMgmtEpSet.epSet;
for (int32_t i = 0; i < src->numOfIps; ++i) { taosCorEndRead(&tscMgmtEpSet.version);
dst->port[i] = src->port[i];
strncpy(dst->fqdn[i], src->fqdn[i], TSDB_FQDN_LEN);
}
}
static void tscDumpMgmtIpSet(SRpcIpSet *ipSet) {
taosCorBeginRead(&tscMgmtIpSet.version);
SRpcIpSet* src = &tscMgmtIpSet.ipSet;
tscIpSetCopy(ipSet, src);
taosCorEndRead(&tscMgmtIpSet.version);
} }
static void tscIpSetHtons(SRpcIpSet *s) { static void tscEpSetHtons(SRpcEpSet *s) {
for (int32_t i = 0; i < s->numOfIps; i++) { for (int32_t i = 0; i < s->numOfEps; i++) {
s->port[i] = htons(s->port[i]); s->port[i] = htons(s->port[i]);
} }
} }
bool tscIpSetIsEqual(SRpcIpSet *s1, SRpcIpSet *s2) { bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) {
if (s1->numOfIps != s2->numOfIps || s1->inUse != s2->inUse) { if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) {
return false; return false;
} }
for (int32_t i = 0; i < s1->numOfIps; i++) { for (int32_t i = 0; i < s1->numOfEps; i++) {
if (s1->port[i] != s2->port[i] if (s1->port[i] != s2->port[i]
|| strncmp(s1->fqdn[i], s2->fqdn[i], TSDB_FQDN_LEN) != 0) || strncmp(s1->fqdn[i], s2->fqdn[i], TSDB_FQDN_LEN) != 0)
return false; return false;
} }
return true; return true;
} }
void tscUpdateMgmtIpList(SRpcIpSet *pIpSet) { void tscUpdateMgmtEpSet(SRpcEpSet *pEpSet) {
// no need to update if equal // no need to update if equal
taosCorBeginWrite(&tscMgmtIpSet.version); taosCorBeginWrite(&tscMgmtEpSet.version);
// or copy directly, tscMgmtIpSet.ipSet = *pIpSet tscMgmtEpSet.epSet = *pEpSet;
SRpcIpSet *mgmtIpSet = &tscMgmtIpSet.ipSet; taosCorEndWrite(&tscMgmtEpSet.version);
tscIpSetCopy(mgmtIpSet, pIpSet);
taosCorEndWrite(&tscMgmtIpSet.version);
} }
static void tscDumpIpSetFromVgroupInfo(SCMCorVgroupInfo *pVgroupInfo, SRpcIpSet *pIpSet) { static void tscDumpEpSetFromVgroupInfo(SCMCorVgroupInfo *pVgroupInfo, SRpcEpSet *pEpSet) {
if (pVgroupInfo == NULL) { return;} if (pVgroupInfo == NULL) { return;}
taosCorBeginRead(&pVgroupInfo->version); taosCorBeginRead(&pVgroupInfo->version);
int8_t inUse = pVgroupInfo->inUse; int8_t inUse = pVgroupInfo->inUse;
pIpSet->inUse = (inUse >= 0 && inUse < TSDB_MAX_REPLICA) ? inUse: 0; pEpSet->inUse = (inUse >= 0 && inUse < TSDB_MAX_REPLICA) ? inUse: 0;
pIpSet->numOfIps = pVgroupInfo->numOfIps; pEpSet->numOfEps = pVgroupInfo->numOfEps;
for (int32_t i = 0; i < pVgroupInfo->numOfIps; ++i) { for (int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
strncpy(pIpSet->fqdn[i], pVgroupInfo->ipAddr[i].fqdn, TSDB_FQDN_LEN); strncpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn, TSDB_FQDN_LEN);
pIpSet->port[i] = pVgroupInfo->ipAddr[i].port; pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
} }
taosCorEndRead(&pVgroupInfo->version); taosCorEndRead(&pVgroupInfo->version);
} }
static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcIpSet *pIpSet) { static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
SSqlCmd *pCmd = &pObj->cmd; SSqlCmd *pCmd = &pObj->cmd;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) { return;} if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) { return;}
...@@ -130,7 +119,7 @@ void tscPrintMgmtEp() { ...@@ -130,7 +119,7 @@ void tscPrintMgmtEp() {
SRpcEpSet dump; SRpcEpSet dump;
tscDumpMgmtEpSet(&dump); tscDumpMgmtEpSet(&dump);
if (dump.numOfEps <= 0) { if (dump.numOfEps <= 0) {
tscError("invalid mnode EP list:%d", dump.numOfEPs); tscError("invalid mnode EP list:%d", dump.numOfEps);
} else { } else {
for (int i = 0; i < dump.numOfEps; ++i) { for (int i = 0; i < dump.numOfEps; ++i) {
tscDebug("mnode index:%d %s:%d", i, dump.fqdn[i], dump.port[i]); tscDebug("mnode index:%d %s:%d", i, dump.fqdn[i], dump.port[i]);
...@@ -166,10 +155,10 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { ...@@ -166,10 +155,10 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
if (code == 0) { if (code == 0) {
SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp; SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp;
SRpcEpSet * pEpList = &pRsp->epList; SRpcEpSet * epSet = &pRsp->epSet;
if (pEpList->numOfEps > 0) { if (epSet->numOfEps > 0) {
tscEpSetHtons(pEpList); tscEpSetHtons(epSet);
tscUpdateMgmtEpList(pEpList); tscUpdateMgmtEpSet(epSet);
} }
pSql->pTscObj->connId = htonl(pRsp->connId); pSql->pTscObj->connId = htonl(pRsp->connId);
...@@ -242,7 +231,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -242,7 +231,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
// set the mgmt ip list // set the mgmt ip list
if (pSql->cmd.command >= TSDB_SQL_MGMT) { if (pSql->cmd.command >= TSDB_SQL_MGMT) {
tscDumpMgmtEpSet(&pSql->epList); tscDumpMgmtEpSet(&pSql->epSet);
} }
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
...@@ -296,11 +285,11 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -296,11 +285,11 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
if (pEpSet) { if (pEpSet) {
//SRpcEpSet dump; //SRpcEpSet dump;
tscEpSetHtons(pEpSet); tscEpSetHtons(pEpSet);
if (tscEpSetIsEqual(&pSql->epList, pEpSet)) { if (tscEpSetIsEqual(&pSql->epSet, pEpSet)) {
if(pCmd->command < TSDB_SQL_MGMT) { if(pCmd->command < TSDB_SQL_MGMT) {
tscUpdateVgroupInfo(pSql, pEpSet); tscUpdateVgroupInfo(pSql, pEpSet);
} else { } else {
tscUpdateMgmtEpList(pEpSet); tscUpdateMgmtEpSet(pEpSet);
} }
} }
} }
...@@ -589,7 +578,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -589,7 +578,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// pSql->cmd.payloadLen is set during copying data into payload // pSql->cmd.payloadLen is set during copying data into payload
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
tscDumpEpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->epList); tscDumpEpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->epSet);
tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit, tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit,
pSql->epSet.numOfEps); pSql->epSet.numOfEps);
...@@ -631,7 +620,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char ...@@ -631,7 +620,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
} else { } else {
pVgroupInfo = &pTableMeta->vgroupInfo; pVgroupInfo = &pTableMeta->vgroupInfo;
} }
tscSetDnodeEpList(pSql, pVgroupInfo); tscSetDnodeEpSet(pSql, pVgroupInfo);
if (pVgroupInfo != NULL) { if (pVgroupInfo != NULL) {
pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId); pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
...@@ -654,7 +643,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char ...@@ -654,7 +643,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index); SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index);
// set the vgroup info // set the vgroup info
tscSetDnodeEpList(pSql, &pTableIdList->vgInfo); tscSetDnodeEpSet(pSql, &pTableIdList->vgInfo);
pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId); pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId);
int32_t numOfTables = taosArrayGetSize(pTableIdList->itemList); int32_t numOfTables = taosArrayGetSize(pTableIdList->itemList);
...@@ -1200,11 +1189,11 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1200,11 +1189,11 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pShowMsg->payloadLen = htons(pPattern->n); pShowMsg->payloadLen = htons(pPattern->n);
} }
} else { } else {
SSQLToken *pIpAddr = &pShowInfo->prefix; SSQLToken *pEpAddr = &pShowInfo->prefix;
assert(pIpAddr->n > 0 && pIpAddr->type > 0); assert(pEpAddr->n > 0 && pEpAddr->type > 0);
strncpy(pShowMsg->payload, pIpAddr->z, pIpAddr->n); strncpy(pShowMsg->payload, pEpAddr->z, pEpAddr->n);
pShowMsg->payloadLen = htons(pIpAddr->n); pShowMsg->payloadLen = htons(pEpAddr->n);
} }
pCmd->payloadLen = sizeof(SCMShowMsg) + pShowMsg->payloadLen; pCmd->payloadLen = sizeof(SCMShowMsg) + pShowMsg->payloadLen;
...@@ -1387,7 +1376,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) { ...@@ -1387,7 +1376,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
tscDumpEpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->corVgroupInfo, &pSql->epList); tscDumpEpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->corVgroupInfo, &pSql->epSet);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2011,9 +2000,9 @@ int tscProcessConnectRsp(SSqlObj *pSql) { ...@@ -2011,9 +2000,9 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
assert(len <= sizeof(pObj->db)); assert(len <= sizeof(pObj->db));
tstrncpy(pObj->db, temp, sizeof(pObj->db)); tstrncpy(pObj->db, temp, sizeof(pObj->db));
if (pConnect->epList.numOfEps > 0) { if (pConnect->epSet.numOfEps > 0) {
tscEpSetHtons(&pConnect->epList); tscEpSetHtons(&pConnect->epSet);
tscUpdateMgmtEpList(&pConnect->epList); tscUpdateMgmtEpSet(&pConnect->epSet);
} }
strcpy(pObj->sversion, pConnect->serverVersion); strcpy(pObj->sversion, pConnect->serverVersion);
......
...@@ -62,7 +62,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con ...@@ -62,7 +62,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
} }
if (ip) { if (ip) {
if (tscSetMgmtEpListFromCfg(ip, NULL) < 0) return NULL; if (tscSetMgmtEpSetFromCfg(ip, NULL) < 0) return NULL;
if (port) tscMgmtEpSet.epSet.port[0] = port; if (port) tscMgmtEpSet.epSet.port[0] = port;
} }
......
...@@ -2145,7 +2145,7 @@ char* strdup_throw(const char* str) { ...@@ -2145,7 +2145,7 @@ char* strdup_throw(const char* str) {
return p; return p;
} }
int tscSetMgmtEpListFromCfg(const char *first, const char *second) { int tscSetMgmtEpSetFromCfg(const char *first, const char *second) {
// init mgmt ip set // init mgmt ip set
tscMgmtEpSet.version = 0; tscMgmtEpSet.version = 0;
SRpcEpSet *mgmtEpSet = &(tscMgmtEpSet.epSet); SRpcEpSet *mgmtEpSet = &(tscMgmtEpSet.epSet);
......
...@@ -35,10 +35,10 @@ typedef struct SRpcEpSet { ...@@ -35,10 +35,10 @@ typedef struct SRpcEpSet {
char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN]; char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN];
} SRpcEpSet; } SRpcEpSet;
typedef struct SRpcCorIpSet { typedef struct SRpcCorEpSet {
int32_t version; int32_t version;
SRpcIpSet ipSet; SRpcEpSet epSet;
} SRpcCorIpSet; } SRpcCorEpSet;
typedef struct SRpcConnInfo { typedef struct SRpcConnInfo {
uint32_t clientIp; uint32_t clientIp;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册