未验证 提交 f28c39da 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2708 from taosdata/feature/vnode

replace ipList and ipSet to epSet
...@@ -268,7 +268,7 @@ bool hasMoreClauseToTry(SSqlObj* pSql); ...@@ -268,7 +268,7 @@ bool hasMoreClauseToTry(SSqlObj* pSql);
void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp); void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows); void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp); void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp);
int tscSetMgmtIpListFromCfg(const char *first, const char *second); int tscSetMgmtEpSetFromCfg(const char *first, const char *second);
void* malloc_throw(size_t size); void* malloc_throw(size_t size);
void* calloc_throw(size_t nmemb, size_t size); void* calloc_throw(size_t nmemb, size_t size);
......
...@@ -306,7 +306,7 @@ typedef struct SSqlObj { ...@@ -306,7 +306,7 @@ typedef struct SSqlObj {
char * sqlstr; char * sqlstr;
char retry; char retry;
char maxRetry; char maxRetry;
SRpcIpSet ipList; SRpcEpSet epSet;
char listed; char listed;
tsem_t rspSem; tsem_t rspSem;
SSqlCmd cmd; SSqlCmd cmd;
...@@ -350,7 +350,7 @@ void tscInitMsgsFp(); ...@@ -350,7 +350,7 @@ void tscInitMsgsFp();
int tsParseSql(SSqlObj *pSql, bool initial); int tsParseSql(SSqlObj *pSql, bool initial);
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet); void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet);
int tscProcessSql(SSqlObj *pSql); int tscProcessSql(SSqlObj *pSql);
int tscRenewTableMeta(SSqlObj *pSql, char *tableId); int tscRenewTableMeta(SSqlObj *pSql, char *tableId);
...@@ -456,7 +456,7 @@ extern void * tscQhandle; ...@@ -456,7 +456,7 @@ extern void * tscQhandle;
extern int tscKeepConn[]; extern int tscKeepConn[];
extern int tsInsertHeadSize; extern int tsInsertHeadSize;
extern int tscNumOfThreads; extern int tscNumOfThreads;
extern SRpcIpSet tscMgmtIpSet; extern SRpcEpSet tscMgmtEpSet;
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
......
...@@ -2355,9 +2355,9 @@ bool validateIpAddress(const char* ip, size_t size) { ...@@ -2355,9 +2355,9 @@ bool validateIpAddress(const char* ip, size_t size) {
strncpy(tmp, ip, size); strncpy(tmp, ip, size);
in_addr_t ipAddr = inet_addr(tmp); in_addr_t epAddr = inet_addr(tmp);
return ipAddr != INADDR_NONE; return epAddr != INADDR_NONE;
} }
int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) { int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) {
......
...@@ -29,8 +29,8 @@ ...@@ -29,8 +29,8 @@
#define TSC_MGMT_VNODE 999 #define TSC_MGMT_VNODE 999
SRpcIpSet tscMgmtIpSet; SRpcEpSet tscMgmtEpSet;
SRpcIpSet tscDnodeIpSet; SRpcEpSet tscDnodeEpSet;
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0}; int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};
...@@ -44,44 +44,44 @@ void tscSaveSubscriptionProgress(void* sub); ...@@ -44,44 +44,44 @@ void tscSaveSubscriptionProgress(void* sub);
static int32_t minMsgSize() { return tsRpcHeadSize + 100; } static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
static void tscSetDnodeIpList(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) { static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
SRpcIpSet* pIpList = &pSql->ipList; SRpcEpSet* pEpSet = &pSql->epSet;
pIpList->inUse = 0; pEpSet->inUse = 0;
if (pVgroupInfo == NULL) { if (pVgroupInfo == NULL) {
pIpList->numOfIps = 0; pEpSet->numOfEps = 0;
return; return;
} }
pIpList->numOfIps = pVgroupInfo->numOfIps; pEpSet->numOfEps = pVgroupInfo->numOfEps;
for(int32_t i = 0; i < pVgroupInfo->numOfIps; ++i) { for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
strcpy(pIpList->fqdn[i], pVgroupInfo->ipAddr[i].fqdn); strcpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn);
pIpList->port[i] = pVgroupInfo->ipAddr[i].port; pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
} }
} }
void tscPrintMgmtIp() { void tscPrintMgmtEp() {
if (tscMgmtIpSet.numOfIps <= 0) { if (tscMgmtEpSet.numOfEps <= 0) {
tscError("invalid mnode IP list:%d", tscMgmtIpSet.numOfIps); tscError("invalid mnode EP list:%d", tscMgmtEpSet.numOfEps);
} else { } else {
for (int i = 0; i < tscMgmtIpSet.numOfIps; ++i) { for (int i = 0; i < tscMgmtEpSet.numOfEps; ++i) {
tscDebug("mnode index:%d %s:%d", i, tscMgmtIpSet.fqdn[i], tscMgmtIpSet.port[i]); tscDebug("mnode index:%d %s:%d", i, tscMgmtEpSet.fqdn[i], tscMgmtEpSet.port[i]);
} }
} }
} }
void tscSetMgmtIpList(SRpcIpSet *pIpList) { void tscSetMgmtEpSet(SRpcEpSet *pEpSet) {
tscMgmtIpSet.numOfIps = pIpList->numOfIps; tscMgmtEpSet.numOfEps = pEpSet->numOfEps;
tscMgmtIpSet.inUse = pIpList->inUse; tscMgmtEpSet.inUse = pEpSet->inUse;
for (int32_t i = 0; i < tscMgmtIpSet.numOfIps; ++i) { for (int32_t i = 0; i < tscMgmtEpSet.numOfEps; ++i) {
tscMgmtIpSet.port[i] = htons(pIpList->port[i]); tscMgmtEpSet.port[i] = htons(pEpSet->port[i]);
} }
} }
void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) { void tscUpdateEpSet(void *ahandle, SRpcEpSet *pEpSet) {
tscMgmtIpSet = *pIpSet; tscMgmtEpSet = *pEpSet;
tscDebug("mnode IP list is changed for ufp is called, numOfIps:%d inUse:%d", tscMgmtIpSet.numOfIps, tscMgmtIpSet.inUse); tscDebug("mnode EP list is changed for ufp is called, numOfEps:%d inUse:%d", tscMgmtEpSet.numOfEps, tscMgmtEpSet.inUse);
for (int32_t i = 0; i < tscMgmtIpSet.numOfIps; ++i) { for (int32_t i = 0; i < tscMgmtEpSet.numOfEps; ++i) {
tscDebug("index:%d fqdn:%s port:%d", i, tscMgmtIpSet.fqdn[i], tscMgmtIpSet.port[i]); tscDebug("index:%d fqdn:%s port:%d", i, tscMgmtEpSet.fqdn[i], tscMgmtEpSet.port[i]);
} }
} }
...@@ -95,7 +95,7 @@ void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) { ...@@ -95,7 +95,7 @@ void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) {
UNUSED_FUNC UNUSED_FUNC
static int32_t tscGetMgmtConnMaxRetryTimes() { static int32_t tscGetMgmtConnMaxRetryTimes() {
int32_t factor = 2; int32_t factor = 2;
return tscMgmtIpSet.numOfIps * factor; return tscMgmtEpSet.numOfEps * factor;
} }
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
...@@ -111,9 +111,9 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { ...@@ -111,9 +111,9 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
if (code == 0) { if (code == 0) {
SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp; SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp;
SRpcIpSet * pIpList = &pRsp->ipList; SRpcEpSet * pEpSet = &pRsp->epSet;
if (pIpList->numOfIps > 0) if (pEpSet->numOfEps > 0)
tscSetMgmtIpList(pIpList); tscSetMgmtEpSet(pEpSet);
pSql->pTscObj->connId = htonl(pRsp->connId); pSql->pTscObj->connId = htonl(pRsp->connId);
...@@ -185,7 +185,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -185,7 +185,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
// set the mgmt ip list // set the mgmt ip list
if (pSql->cmd.command >= TSDB_SQL_MGMT) { if (pSql->cmd.command >= TSDB_SQL_MGMT) {
pSql->ipList = tscMgmtIpSet; pSql->epSet = tscMgmtEpSet;
} }
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
...@@ -203,11 +203,11 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -203,11 +203,11 @@ int tscSendMsgToServer(SSqlObj *pSql) {
// Otherwise, the pSql object may have been released already during the response function, which is // Otherwise, the pSql object may have been released already during the response function, which is
// processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely // processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely
// cause crash. // cause crash.
rpcSendRequest(pObj->pDnodeConn, &pSql->ipList, &rpcMsg); rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
SSqlObj *pSql = (SSqlObj *)rpcMsg->ahandle; SSqlObj *pSql = (SSqlObj *)rpcMsg->ahandle;
if (pSql == NULL || pSql->signature != pSql) { if (pSql == NULL || pSql->signature != pSql) {
tscError("%p sql is already released", pSql); tscError("%p sql is already released", pSql);
...@@ -237,9 +237,9 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { ...@@ -237,9 +237,9 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
} }
if (pCmd->command < TSDB_SQL_MGMT) { if (pCmd->command < TSDB_SQL_MGMT) {
if (pIpSet) pSql->ipList = *pIpSet; if (pEpSet) pSql->epSet = *pEpSet;
} else { } else {
if (pIpSet) tscMgmtIpSet = *pIpSet; if (pEpSet) tscMgmtEpSet = *pEpSet;
} }
if (rpcMsg->pCont == NULL) { if (rpcMsg->pCont == NULL) {
...@@ -421,7 +421,7 @@ int tscProcessSql(SSqlObj *pSql) { ...@@ -421,7 +421,7 @@ int tscProcessSql(SSqlObj *pSql) {
return pSql->res.code; return pSql->res.code;
} }
} else if (pCmd->command < TSDB_SQL_LOCAL) { } else if (pCmd->command < TSDB_SQL_LOCAL) {
pSql->ipList = tscMgmtIpSet; pSql->epSet = tscMgmtEpSet;
} else { // local handler } else { // local handler
return (*tscProcessMsgRsp[pCmd->command])(pSql); return (*tscProcessMsgRsp[pCmd->command])(pSql);
} }
...@@ -525,10 +525,10 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -525,10 +525,10 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// pSql->cmd.payloadLen is set during copying data into payload // pSql->cmd.payloadLen is set during copying data into payload
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
tscSetDnodeIpList(pSql, &pTableMeta->vgroupInfo); tscSetDnodeEpSet(pSql, &pTableMeta->vgroupInfo);
tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfIP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit, tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit,
pSql->ipList.numOfIps); pSql->epSet.numOfEps);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -568,7 +568,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char ...@@ -568,7 +568,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
pVgroupInfo = &pTableMeta->vgroupInfo; pVgroupInfo = &pTableMeta->vgroupInfo;
} }
tscSetDnodeIpList(pSql, pVgroupInfo); tscSetDnodeEpSet(pSql, pVgroupInfo);
if (pVgroupInfo != NULL) { if (pVgroupInfo != NULL) {
pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId); pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
} }
...@@ -580,7 +580,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char ...@@ -580,7 +580,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
pQueryMsg->numOfTables = htonl(1); // set the number of tables pQueryMsg->numOfTables = htonl(1); // set the number of tables
pMsg += sizeof(STableIdInfo); pMsg += sizeof(STableIdInfo);
} else { // it is a subquery of the super table query, this IP info is acquired from vgroupInfo } else { // it is a subquery of the super table query, this EP info is acquired from vgroupInfo
int32_t index = pTableMetaInfo->vgroupIndex; int32_t index = pTableMetaInfo->vgroupIndex;
int32_t numOfVgroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables); int32_t numOfVgroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables);
assert(index >= 0 && index < numOfVgroups); assert(index >= 0 && index < numOfVgroups);
...@@ -590,7 +590,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char ...@@ -590,7 +590,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index); SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index);
// set the vgroup info // set the vgroup info
tscSetDnodeIpList(pSql, &pTableIdList->vgInfo); tscSetDnodeEpSet(pSql, &pTableIdList->vgInfo);
pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId); pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId);
int32_t numOfTables = taosArrayGetSize(pTableIdList->itemList); int32_t numOfTables = taosArrayGetSize(pTableIdList->itemList);
...@@ -1323,7 +1323,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) { ...@@ -1323,7 +1323,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
tscSetDnodeIpList(pSql, &pTableMetaInfo->pTableMeta->vgroupInfo); tscSetDnodeEpSet(pSql, &pTableMetaInfo->pTableMeta->vgroupInfo);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1658,8 +1658,8 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { ...@@ -1658,8 +1658,8 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
pMetaMsg->contLen = htons(pMetaMsg->contLen); pMetaMsg->contLen = htons(pMetaMsg->contLen);
pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns); pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);
if (pMetaMsg->sid < 0 || pMetaMsg->vgroup.numOfIps < 0) { if (pMetaMsg->sid < 0 || pMetaMsg->vgroup.numOfEps < 0) {
tscError("invalid meter vgId:%d, sid%d", pMetaMsg->vgroup.numOfIps, pMetaMsg->sid); tscError("invalid meter vgId:%d, sid%d", pMetaMsg->vgroup.numOfEps, pMetaMsg->sid);
return TSDB_CODE_TSC_INVALID_VALUE; return TSDB_CODE_TSC_INVALID_VALUE;
} }
...@@ -1673,8 +1673,8 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { ...@@ -1673,8 +1673,8 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
return TSDB_CODE_TSC_INVALID_VALUE; return TSDB_CODE_TSC_INVALID_VALUE;
} }
for (int i = 0; i < pMetaMsg->vgroup.numOfIps; ++i) { for (int i = 0; i < pMetaMsg->vgroup.numOfEps; ++i) {
pMetaMsg->vgroup.ipAddr[i].port = htons(pMetaMsg->vgroup.ipAddr[i].port); pMetaMsg->vgroup.epAddr[i].port = htons(pMetaMsg->vgroup.epAddr[i].port);
} }
SSchema* pSchema = pMetaMsg->schema; SSchema* pSchema = pMetaMsg->schema;
...@@ -1850,10 +1850,10 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { ...@@ -1850,10 +1850,10 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j]; SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];
pVgroups->vgId = htonl(pVgroups->vgId); pVgroups->vgId = htonl(pVgroups->vgId);
assert(pVgroups->numOfIps >= 1); assert(pVgroups->numOfEps >= 1);
for (int32_t k = 0; k < pVgroups->numOfIps; ++k) { for (int32_t k = 0; k < pVgroups->numOfEps; ++k) {
pVgroups->ipAddr[k].port = htons(pVgroups->ipAddr[k].port); pVgroups->epAddr[k].port = htons(pVgroups->epAddr[k].port);
} }
pMsg += size; pMsg += size;
...@@ -1946,8 +1946,8 @@ int tscProcessConnectRsp(SSqlObj *pSql) { ...@@ -1946,8 +1946,8 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
assert(len <= sizeof(pObj->db)); assert(len <= sizeof(pObj->db));
tstrncpy(pObj->db, temp, sizeof(pObj->db)); tstrncpy(pObj->db, temp, sizeof(pObj->db));
if (pConnect->ipList.numOfIps > 0) if (pConnect->epSet.numOfEps > 0)
tscSetMgmtIpList(&pConnect->ipList); tscSetMgmtEpSet(&pConnect->epSet);
strcpy(pObj->sversion, pConnect->serverVersion); strcpy(pObj->sversion, pConnect->serverVersion);
pObj->writeAuth = pConnect->writeAuth; pObj->writeAuth = pConnect->writeAuth;
......
...@@ -62,8 +62,8 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con ...@@ -62,8 +62,8 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
} }
if (ip) { if (ip) {
if (tscSetMgmtIpListFromCfg(ip, NULL) < 0) return NULL; if (tscSetMgmtEpSetFromCfg(ip, NULL) < 0) return NULL;
if (port) tscMgmtIpSet.port[0] = port; if (port) tscMgmtEpSet.port[0] = port;
} }
void *pDnodeConn = NULL; void *pDnodeConn = NULL;
......
...@@ -458,7 +458,7 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr ...@@ -458,7 +458,7 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr
break; break;
} }
} }
assert(info.vgInfo.numOfIps != 0); assert(info.vgInfo.numOfEps != 0);
vgTables = taosArrayInit(4, sizeof(STableIdInfo)); vgTables = taosArrayInit(4, sizeof(STableIdInfo));
info.itemList = vgTables; info.itemList = vgTables;
...@@ -1600,8 +1600,8 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p ...@@ -1600,8 +1600,8 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
// data in from current vnode is stored in cache and disk // data in from current vnode is stored in cache and disk
uint32_t numOfRowsFromSubquery = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->num; uint32_t numOfRowsFromSubquery = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->num;
tscDebug("%p sub:%p all data retrieved from ip:%s, vgId:%d, numOfRows:%d, orderOfSub:%d", pParentSql, pSql, tscDebug("%p sub:%p all data retrieved from ep:%s, vgId:%d, numOfRows:%d, orderOfSub:%d", pParentSql, pSql,
pTableMetaInfo->vgroupList->vgroups[0].ipAddr[0].fqdn, pTableMetaInfo->vgroupList->vgroups[0].vgId, pTableMetaInfo->vgroupList->vgroups[0].epAddr[0].fqdn, pTableMetaInfo->vgroupList->vgroups[0].vgId,
numOfRowsFromSubquery, idx); numOfRowsFromSubquery, idx);
tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity); tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity);
...@@ -1719,8 +1719,8 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR ...@@ -1719,8 +1719,8 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
assert(pRes->numOfRows == numOfRows); assert(pRes->numOfRows == numOfRows);
int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows); int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows);
tscDebug("%p sub:%p retrieve numOfRows:%" PRId64 " totalNumOfRows:%" PRIu64 " from ip:%s, orderOfSub:%d", pParentSql, pSql, tscDebug("%p sub:%p retrieve numOfRows:%" PRId64 " totalNumOfRows:%" PRIu64 " from ep:%s, orderOfSub:%d", pParentSql, pSql,
pRes->numOfRows, pState->numOfRetrievedRows, pSql->ipList.fqdn[pSql->ipList.inUse], idx); pRes->numOfRows, pState->numOfRetrievedRows, pSql->epSet.fqdn[pSql->epSet.inUse], idx);
if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) { if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId32 " , current:%" PRId64, tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId32 " , current:%" PRId64,
...@@ -1828,8 +1828,8 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { ...@@ -1828,8 +1828,8 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
return; return;
} }
tscTrace("%p sub:%p query complete, ip:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSql, pSql, tscTrace("%p sub:%p query complete, ep:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSql, pSql,
pVgroup->ipAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex); pVgroup->epAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex);
if (pSql->res.qhandle == 0) { // qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode if (pSql->res.qhandle == 0) { // qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode
tscRetrieveFromDnodeCallBack(param, pSql, 0); tscRetrieveFromDnodeCallBack(param, pSql, 0);
......
...@@ -41,7 +41,7 @@ int tscNumOfThreads; ...@@ -41,7 +41,7 @@ int tscNumOfThreads;
static pthread_once_t tscinit = PTHREAD_ONCE_INIT; static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
void taosInitNote(int numOfNoteLines, int maxNotes, char* lable); void taosInitNote(int numOfNoteLines, int maxNotes, char* lable);
void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet); void tscUpdateEpSet(void *ahandle, SRpcEpSet *pEpSet);
void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) { void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) {
taosGetDisk(); taosGetDisk();
...@@ -116,8 +116,8 @@ void taos_init_imp() { ...@@ -116,8 +116,8 @@ void taos_init_imp() {
taosInitNote(tsNumOfLogLines / 10, 1, (char*)"tsc_note"); taosInitNote(tsNumOfLogLines / 10, 1, (char*)"tsc_note");
} }
if (tscSetMgmtIpListFromCfg(tsFirst, tsSecond) < 0) { if (tscSetMgmtEpSetFromCfg(tsFirst, tsSecond) < 0) {
tscError("failed to init mnode IP list"); tscError("failed to init mnode EP list");
return; return;
} }
......
...@@ -2145,17 +2145,17 @@ char* strdup_throw(const char* str) { ...@@ -2145,17 +2145,17 @@ char* strdup_throw(const char* str) {
return p; return p;
} }
int tscSetMgmtIpListFromCfg(const char *first, const char *second) { int tscSetMgmtEpSetFromCfg(const char *first, const char *second) {
tscMgmtIpSet.numOfIps = 0; tscMgmtEpSet.numOfEps = 0;
tscMgmtIpSet.inUse = 0; tscMgmtEpSet.inUse = 0;
if (first && first[0] != 0) { if (first && first[0] != 0) {
if (strlen(first) >= TSDB_EP_LEN) { if (strlen(first) >= TSDB_EP_LEN) {
terrno = TSDB_CODE_TSC_INVALID_FQDN; terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1; return -1;
} }
taosGetFqdnPortFromEp(first, tscMgmtIpSet.fqdn[tscMgmtIpSet.numOfIps], &tscMgmtIpSet.port[tscMgmtIpSet.numOfIps]); taosGetFqdnPortFromEp(first, tscMgmtEpSet.fqdn[tscMgmtEpSet.numOfEps], &tscMgmtEpSet.port[tscMgmtEpSet.numOfEps]);
tscMgmtIpSet.numOfIps++; tscMgmtEpSet.numOfEps++;
} }
if (second && second[0] != 0) { if (second && second[0] != 0) {
...@@ -2163,11 +2163,11 @@ int tscSetMgmtIpListFromCfg(const char *first, const char *second) { ...@@ -2163,11 +2163,11 @@ int tscSetMgmtIpListFromCfg(const char *first, const char *second) {
terrno = TSDB_CODE_TSC_INVALID_FQDN; terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1; return -1;
} }
taosGetFqdnPortFromEp(second, tscMgmtIpSet.fqdn[tscMgmtIpSet.numOfIps], &tscMgmtIpSet.port[tscMgmtIpSet.numOfIps]); taosGetFqdnPortFromEp(second, tscMgmtEpSet.fqdn[tscMgmtEpSet.numOfEps], &tscMgmtEpSet.port[tscMgmtEpSet.numOfEps]);
tscMgmtIpSet.numOfIps++; tscMgmtEpSet.numOfEps++;
} }
if ( tscMgmtIpSet.numOfIps == 0) { if ( tscMgmtEpSet.numOfEps == 0) {
terrno = TSDB_CODE_TSC_INVALID_FQDN; terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1; return -1;
} }
......
...@@ -35,8 +35,8 @@ void* dnodeGetVnodeTsdb(void *pVnode); ...@@ -35,8 +35,8 @@ void* dnodeGetVnodeTsdb(void *pVnode);
void dnodeReleaseVnode(void *pVnode); void dnodeReleaseVnode(void *pVnode);
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell); void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell);
void dnodeGetMnodeIpSetForPeer(void *ipSet); void dnodeGetMnodeEpSetForPeer(void *epSet);
void dnodeGetMnodeIpSetForShell(void *ipSet); void dnodeGetMnodeEpSetForShell(void *epSet);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -52,7 +52,7 @@ void * tsDnodeTmr = NULL; ...@@ -52,7 +52,7 @@ void * tsDnodeTmr = NULL;
static void * tsStatusTimer = NULL; static void * tsStatusTimer = NULL;
static uint32_t tsRebootTime; static uint32_t tsRebootTime;
static SRpcIpSet tsDMnodeIpSet = {0}; static SRpcEpSet tsDMnodeEpSet = {0};
static SDMMnodeInfos tsDMnodeInfos = {0}; static SDMMnodeInfos tsDMnodeInfos = {0};
static SDMDnodeCfg tsDnodeCfg = {0}; static SDMDnodeCfg tsDnodeCfg = {0};
static taos_qset tsMgmtQset = NULL; static taos_qset tsMgmtQset = NULL;
...@@ -90,21 +90,21 @@ int32_t dnodeInitMgmt() { ...@@ -90,21 +90,21 @@ int32_t dnodeInitMgmt() {
tsRebootTime = taosGetTimestampSec(); tsRebootTime = taosGetTimestampSec();
if (!dnodeReadMnodeInfos()) { if (!dnodeReadMnodeInfos()) {
memset(&tsDMnodeIpSet, 0, sizeof(SRpcIpSet)); memset(&tsDMnodeEpSet, 0, sizeof(SRpcEpSet));
memset(&tsDMnodeInfos, 0, sizeof(SDMMnodeInfos)); memset(&tsDMnodeInfos, 0, sizeof(SDMMnodeInfos));
tsDMnodeIpSet.numOfIps = 1; tsDMnodeEpSet.numOfEps = 1;
taosGetFqdnPortFromEp(tsFirst, tsDMnodeIpSet.fqdn[0], &tsDMnodeIpSet.port[0]); taosGetFqdnPortFromEp(tsFirst, tsDMnodeEpSet.fqdn[0], &tsDMnodeEpSet.port[0]);
if (strcmp(tsSecond, tsFirst) != 0) { if (strcmp(tsSecond, tsFirst) != 0) {
tsDMnodeIpSet.numOfIps = 2; tsDMnodeEpSet.numOfEps = 2;
taosGetFqdnPortFromEp(tsSecond, tsDMnodeIpSet.fqdn[1], &tsDMnodeIpSet.port[1]); taosGetFqdnPortFromEp(tsSecond, tsDMnodeEpSet.fqdn[1], &tsDMnodeEpSet.port[1]);
} }
} else { } else {
tsDMnodeIpSet.inUse = tsDMnodeInfos.inUse; tsDMnodeEpSet.inUse = tsDMnodeInfos.inUse;
tsDMnodeIpSet.numOfIps = tsDMnodeInfos.nodeNum; tsDMnodeEpSet.numOfEps = tsDMnodeInfos.nodeNum;
for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) { for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) {
taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeIpSet.fqdn[i], &tsDMnodeIpSet.port[i]); taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeEpSet.fqdn[i], &tsDMnodeEpSet.port[i]);
} }
} }
...@@ -450,27 +450,27 @@ static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) { ...@@ -450,27 +450,27 @@ static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
return taosCfgDynamicOptions(pCfg->config); return taosCfgDynamicOptions(pCfg->config);
} }
void dnodeUpdateMnodeIpSetForPeer(SRpcIpSet *pIpSet) { void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) {
dInfo("mnode IP list for is changed, numOfIps:%d inUse:%d", pIpSet->numOfIps, pIpSet->inUse); dInfo("mnode EP list for is changed, numOfEps:%d inUse:%d", pEpSet->numOfEps, pEpSet->inUse);
for (int i = 0; i < pIpSet->numOfIps; ++i) { for (int i = 0; i < pEpSet->numOfEps; ++i) {
pIpSet->port[i] -= TSDB_PORT_DNODEDNODE; pEpSet->port[i] -= TSDB_PORT_DNODEDNODE;
dInfo("mnode index:%d %s:%u", i, pIpSet->fqdn[i], pIpSet->port[i]) dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i])
} }
tsDMnodeIpSet = *pIpSet; tsDMnodeEpSet = *pEpSet;
} }
void dnodeGetMnodeIpSetForPeer(void *ipSetRaw) { void dnodeGetMnodeEpSetForPeer(void *epSetRaw) {
SRpcIpSet *ipSet = ipSetRaw; SRpcEpSet *epSet = epSetRaw;
*ipSet = tsDMnodeIpSet; *epSet = tsDMnodeEpSet;
for (int i=0; i<ipSet->numOfIps; ++i) for (int i=0; i<epSet->numOfEps; ++i)
ipSet->port[i] += TSDB_PORT_DNODEDNODE; epSet->port[i] += TSDB_PORT_DNODEDNODE;
} }
void dnodeGetMnodeIpSetForShell(void *ipSetRaw) { void dnodeGetMnodeEpSetForShell(void *epSetRaw) {
SRpcIpSet *ipSet = ipSetRaw; SRpcEpSet *epSet = epSetRaw;
*ipSet = tsDMnodeIpSet; *epSet = tsDMnodeEpSet;
} }
static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
...@@ -536,10 +536,10 @@ static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes) { ...@@ -536,10 +536,10 @@ static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes) {
dInfo("mnode index:%d, %s", tsDMnodeInfos.nodeInfos[i].nodeId, tsDMnodeInfos.nodeInfos[i].nodeEp); dInfo("mnode index:%d, %s", tsDMnodeInfos.nodeInfos[i].nodeId, tsDMnodeInfos.nodeInfos[i].nodeEp);
} }
tsDMnodeIpSet.inUse = tsDMnodeInfos.inUse; tsDMnodeEpSet.inUse = tsDMnodeInfos.inUse;
tsDMnodeIpSet.numOfIps = tsDMnodeInfos.nodeNum; tsDMnodeEpSet.numOfEps = tsDMnodeInfos.nodeNum;
for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) { for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) {
taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeIpSet.fqdn[i], &tsDMnodeIpSet.port[i]); taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeEpSet.fqdn[i], &tsDMnodeEpSet.port[i]);
} }
dnodeSaveMnodeInfos(); dnodeSaveMnodeInfos();
...@@ -549,10 +549,10 @@ static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes) { ...@@ -549,10 +549,10 @@ static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes) {
static bool dnodeReadMnodeInfos() { static bool dnodeReadMnodeInfos() {
char ipFile[TSDB_FILENAME_LEN*2] = {0}; char ipFile[TSDB_FILENAME_LEN*2] = {0};
sprintf(ipFile, "%s/mnodeIpList.json", tsDnodeDir); sprintf(ipFile, "%s/mnodeEpSet.json", tsDnodeDir);
FILE *fp = fopen(ipFile, "r"); FILE *fp = fopen(ipFile, "r");
if (!fp) { if (!fp) {
dDebug("failed to read mnodeIpList.json, file not exist"); dDebug("failed to read mnodeEpSet.json, file not exist");
return false; return false;
} }
...@@ -563,40 +563,40 @@ static bool dnodeReadMnodeInfos() { ...@@ -563,40 +563,40 @@ static bool dnodeReadMnodeInfos() {
if (len <= 0) { if (len <= 0) {
free(content); free(content);
fclose(fp); fclose(fp);
dError("failed to read mnodeIpList.json, content is null"); dError("failed to read mnodeEpSet.json, content is null");
return false; return false;
} }
content[len] = 0; content[len] = 0;
cJSON* root = cJSON_Parse(content); cJSON* root = cJSON_Parse(content);
if (root == NULL) { if (root == NULL) {
dError("failed to read mnodeIpList.json, invalid json format"); dError("failed to read mnodeEpSet.json, invalid json format");
goto PARSE_OVER; goto PARSE_OVER;
} }
cJSON* inUse = cJSON_GetObjectItem(root, "inUse"); cJSON* inUse = cJSON_GetObjectItem(root, "inUse");
if (!inUse || inUse->type != cJSON_Number) { if (!inUse || inUse->type != cJSON_Number) {
dError("failed to read mnodeIpList.json, inUse not found"); dError("failed to read mnodeEpSet.json, inUse not found");
goto PARSE_OVER; goto PARSE_OVER;
} }
tsDMnodeInfos.inUse = inUse->valueint; tsDMnodeInfos.inUse = inUse->valueint;
cJSON* nodeNum = cJSON_GetObjectItem(root, "nodeNum"); cJSON* nodeNum = cJSON_GetObjectItem(root, "nodeNum");
if (!nodeNum || nodeNum->type != cJSON_Number) { if (!nodeNum || nodeNum->type != cJSON_Number) {
dError("failed to read mnodeIpList.json, nodeNum not found"); dError("failed to read mnodeEpSet.json, nodeNum not found");
goto PARSE_OVER; goto PARSE_OVER;
} }
tsDMnodeInfos.nodeNum = nodeNum->valueint; tsDMnodeInfos.nodeNum = nodeNum->valueint;
cJSON* nodeInfos = cJSON_GetObjectItem(root, "nodeInfos"); cJSON* nodeInfos = cJSON_GetObjectItem(root, "nodeInfos");
if (!nodeInfos || nodeInfos->type != cJSON_Array) { if (!nodeInfos || nodeInfos->type != cJSON_Array) {
dError("failed to read mnodeIpList.json, nodeInfos not found"); dError("failed to read mnodeEpSet.json, nodeInfos not found");
goto PARSE_OVER; goto PARSE_OVER;
} }
int size = cJSON_GetArraySize(nodeInfos); int size = cJSON_GetArraySize(nodeInfos);
if (size != tsDMnodeInfos.nodeNum) { if (size != tsDMnodeInfos.nodeNum) {
dError("failed to read mnodeIpList.json, nodeInfos size not matched"); dError("failed to read mnodeEpSet.json, nodeInfos size not matched");
goto PARSE_OVER; goto PARSE_OVER;
} }
...@@ -606,14 +606,14 @@ static bool dnodeReadMnodeInfos() { ...@@ -606,14 +606,14 @@ static bool dnodeReadMnodeInfos() {
cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId"); cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId");
if (!nodeId || nodeId->type != cJSON_Number) { if (!nodeId || nodeId->type != cJSON_Number) {
dError("failed to read mnodeIpList.json, nodeId not found"); dError("failed to read mnodeEpSet.json, nodeId not found");
goto PARSE_OVER; goto PARSE_OVER;
} }
tsDMnodeInfos.nodeInfos[i].nodeId = nodeId->valueint; tsDMnodeInfos.nodeInfos[i].nodeId = nodeId->valueint;
cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp"); cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp");
if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) { if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) {
dError("failed to read mnodeIpList.json, nodeName not found"); dError("failed to read mnodeEpSet.json, nodeName not found");
goto PARSE_OVER; goto PARSE_OVER;
} }
strncpy(tsDMnodeInfos.nodeInfos[i].nodeEp, nodeEp->valuestring, TSDB_EP_LEN); strncpy(tsDMnodeInfos.nodeInfos[i].nodeEp, nodeEp->valuestring, TSDB_EP_LEN);
...@@ -621,7 +621,7 @@ static bool dnodeReadMnodeInfos() { ...@@ -621,7 +621,7 @@ static bool dnodeReadMnodeInfos() {
ret = true; ret = true;
dInfo("read mnode iplist successed, numOfIps:%d inUse:%d", tsDMnodeInfos.nodeNum, tsDMnodeInfos.inUse); dInfo("read mnode epSet successed, numOfEps:%d inUse:%d", tsDMnodeInfos.nodeNum, tsDMnodeInfos.inUse);
for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) { for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) {
dInfo("mnode:%d, %s", tsDMnodeInfos.nodeInfos[i].nodeId, tsDMnodeInfos.nodeInfos[i].nodeEp); dInfo("mnode:%d, %s", tsDMnodeInfos.nodeInfos[i].nodeId, tsDMnodeInfos.nodeInfos[i].nodeEp);
} }
...@@ -635,7 +635,7 @@ PARSE_OVER: ...@@ -635,7 +635,7 @@ PARSE_OVER:
static void dnodeSaveMnodeInfos() { static void dnodeSaveMnodeInfos() {
char ipFile[TSDB_FILENAME_LEN] = {0}; char ipFile[TSDB_FILENAME_LEN] = {0};
sprintf(ipFile, "%s/mnodeIpList.json", tsDnodeDir); sprintf(ipFile, "%s/mnodeEpSet.json", tsDnodeDir);
FILE *fp = fopen(ipFile, "w"); FILE *fp = fopen(ipFile, "w");
if (!fp) return; if (!fp) return;
...@@ -663,11 +663,11 @@ static void dnodeSaveMnodeInfos() { ...@@ -663,11 +663,11 @@ static void dnodeSaveMnodeInfos() {
fclose(fp); fclose(fp);
free(content); free(content);
dInfo("save mnode iplist successed"); dInfo("save mnode epSet successed");
} }
char *dnodeGetMnodeMasterEp() { char *dnodeGetMnodeMasterEp() {
return tsDMnodeInfos.nodeInfos[tsDMnodeIpSet.inUse].nodeEp; return tsDMnodeInfos.nodeInfos[tsDMnodeEpSet.inUse].nodeEp;
} }
void* dnodeGetMnodeInfos() { void* dnodeGetMnodeInfos() {
...@@ -726,9 +726,9 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) { ...@@ -726,9 +726,9 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
.msgType = TSDB_MSG_TYPE_DM_STATUS .msgType = TSDB_MSG_TYPE_DM_STATUS
}; };
SRpcIpSet ipSet; SRpcEpSet epSet;
dnodeGetMnodeIpSetForPeer(&ipSet); dnodeGetMnodeEpSetForPeer(&epSet);
dnodeSendMsgToDnode(&ipSet, &rpcMsg); dnodeSendMsgToDnode(&epSet, &rpcMsg);
} }
static bool dnodeReadDnodeCfg() { static bool dnodeReadDnodeCfg() {
...@@ -817,20 +817,20 @@ void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) { ...@@ -817,20 +817,20 @@ void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) {
SRpcConnInfo connInfo = {0}; SRpcConnInfo connInfo = {0};
rpcGetConnInfo(rpcMsg->handle, &connInfo); rpcGetConnInfo(rpcMsg->handle, &connInfo);
SRpcIpSet ipSet = {0}; SRpcEpSet epSet = {0};
if (forShell) { if (forShell) {
dnodeGetMnodeIpSetForShell(&ipSet); dnodeGetMnodeEpSetForShell(&epSet);
} else { } else {
dnodeGetMnodeIpSetForPeer(&ipSet); dnodeGetMnodeEpSetForPeer(&epSet);
} }
dDebug("msg:%s will be redirected, dnodeIp:%s user:%s, numOfIps:%d inUse:%d", taosMsg[rpcMsg->msgType], dDebug("msg:%s will be redirected, dnodeIp:%s user:%s, numOfEps:%d inUse:%d", taosMsg[rpcMsg->msgType],
taosIpStr(connInfo.clientIp), connInfo.user, ipSet.numOfIps, ipSet.inUse); taosIpStr(connInfo.clientIp), connInfo.user, epSet.numOfEps, epSet.inUse);
for (int i = 0; i < ipSet.numOfIps; ++i) { for (int i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%d", i, ipSet.fqdn[i], ipSet.port[i]); dDebug("mnode index:%d %s:%d", i, epSet.fqdn[i], epSet.port[i]);
ipSet.port[i] = htons(ipSet.port[i]); epSet.port[i] = htons(epSet.port[i]);
} }
rpcSendRedirectRsp(rpcMsg->handle, &ipSet); rpcSendRedirectRsp(rpcMsg->handle, &epSet);
} }
...@@ -29,11 +29,11 @@ ...@@ -29,11 +29,11 @@
#include "dnodeVWrite.h" #include "dnodeVWrite.h"
#include "dnodeMPeer.h" #include "dnodeMPeer.h"
extern void dnodeUpdateMnodeIpSetForPeer(SRpcIpSet *pIpSet); extern void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet);
static void (*dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); static void (*dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcIpSet *); static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *);
static void (*dnodeProcessRspMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg); static void (*dnodeProcessRspMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg);
static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet); static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet);
static void *tsDnodeServerRpc = NULL; static void *tsDnodeServerRpc = NULL;
static void *tsDnodeClientRpc = NULL; static void *tsDnodeClientRpc = NULL;
...@@ -83,7 +83,7 @@ void dnodeCleanupServer() { ...@@ -83,7 +83,7 @@ void dnodeCleanupServer() {
} }
} }
static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
SRpcMsg rspMsg = { SRpcMsg rspMsg = {
.handle = pMsg->handle, .handle = pMsg->handle,
.pCont = NULL, .pCont = NULL,
...@@ -148,9 +148,9 @@ void dnodeCleanupClient() { ...@@ -148,9 +148,9 @@ void dnodeCleanupClient() {
} }
} }
static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
if (pMsg->msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pIpSet) { if (pMsg->msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) {
dnodeUpdateMnodeIpSetForPeer(pIpSet); dnodeUpdateMnodeEpSetForPeer(pEpSet);
} }
if (dnodeProcessRspMsgFp[pMsg->msgType]) { if (dnodeProcessRspMsgFp[pMsg->msgType]) {
...@@ -166,12 +166,12 @@ void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { ...@@ -166,12 +166,12 @@ void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) {
dnodeProcessRspMsgFp[msgType] = fp; dnodeProcessRspMsgFp[msgType] = fp;
} }
void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg) { void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) {
rpcSendRequest(tsDnodeClientRpc, ipSet, rpcMsg); rpcSendRequest(tsDnodeClientRpc, epSet, rpcMsg);
} }
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
SRpcIpSet ipSet = {0}; SRpcEpSet epSet = {0};
dnodeGetMnodeIpSetForPeer(&ipSet); dnodeGetMnodeEpSetForPeer(&epSet);
rpcSendRecv(tsDnodeClientRpc, &ipSet, rpcMsg, rpcRsp); rpcSendRecv(tsDnodeClientRpc, &epSet, rpcMsg, rpcRsp);
} }
...@@ -31,7 +31,7 @@ ...@@ -31,7 +31,7 @@
#include "dnodeShell.h" #include "dnodeShell.h"
static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcIpSet *); static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *);
static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey);
static void * tsDnodeShellRpc = NULL; static void * tsDnodeShellRpc = NULL;
static int32_t tsDnodeQueryReqNum = 0; static int32_t tsDnodeQueryReqNum = 0;
...@@ -108,7 +108,7 @@ void dnodeCleanupShell() { ...@@ -108,7 +108,7 @@ void dnodeCleanupShell() {
} }
} }
void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.handle = pMsg->handle, .handle = pMsg->handle,
.pCont = NULL, .pCont = NULL,
......
...@@ -39,13 +39,13 @@ SDnodeStatisInfo dnodeGetStatisInfo(); ...@@ -39,13 +39,13 @@ SDnodeStatisInfo dnodeGetStatisInfo();
bool dnodeIsFirstDeploy(); bool dnodeIsFirstDeploy();
char * dnodeGetMnodeMasterEp(); char * dnodeGetMnodeMasterEp();
void dnodeGetMnodeIpSetForPeer(void *ipSet); void dnodeGetMnodeEpSetForPeer(void *epSet);
void dnodeGetMnodeIpSetForShell(void *ipSet); void dnodeGetMnodeEpSetForShell(void *epSet);
void * dnodeGetMnodeInfos(); void * dnodeGetMnodeInfos();
int32_t dnodeGetDnodeId(); int32_t dnodeGetDnodeId();
void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg); void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg);
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp); void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp);
void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid); void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid);
......
...@@ -176,7 +176,7 @@ extern char *taosMsg[]; ...@@ -176,7 +176,7 @@ extern char *taosMsg[];
typedef struct { typedef struct {
char fqdn[TSDB_FQDN_LEN]; char fqdn[TSDB_FQDN_LEN];
uint16_t port; uint16_t port;
} SIpAddr; } SEpAddr;
typedef struct { typedef struct {
int32_t numOfVnodes; int32_t numOfVnodes;
...@@ -306,7 +306,7 @@ typedef struct { ...@@ -306,7 +306,7 @@ typedef struct {
int8_t reserved1; int8_t reserved1;
int8_t reserved2; int8_t reserved2;
int32_t connId; int32_t connId;
SRpcIpSet ipList; SRpcEpSet epSet;
} SCMConnectRsp; } SCMConnectRsp;
typedef struct { typedef struct {
...@@ -648,8 +648,8 @@ typedef struct SCMSTableVgroupMsg { ...@@ -648,8 +648,8 @@ typedef struct SCMSTableVgroupMsg {
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
int8_t numOfIps; int8_t numOfEps;
SIpAddr ipAddr[TSDB_MAX_REPLICA]; SEpAddr epAddr[TSDB_MAX_REPLICA];
} SCMVgroupInfo; } SCMVgroupInfo;
typedef struct { typedef struct {
...@@ -753,7 +753,7 @@ typedef struct { ...@@ -753,7 +753,7 @@ typedef struct {
uint32_t onlineDnodes; uint32_t onlineDnodes;
uint32_t connId; uint32_t connId;
int8_t killConnection; int8_t killConnection;
SRpcIpSet ipList; SRpcEpSet epSet;
} SCMHeartBeatRsp; } SCMHeartBeatRsp;
typedef struct { typedef struct {
......
...@@ -28,12 +28,12 @@ extern "C" { ...@@ -28,12 +28,12 @@ extern "C" {
extern int tsRpcHeadSize; extern int tsRpcHeadSize;
typedef struct SRpcIpSet { typedef struct SRpcEpSet {
int8_t inUse; int8_t inUse;
int8_t numOfIps; int8_t numOfEps;
uint16_t port[TSDB_MAX_REPLICA]; uint16_t port[TSDB_MAX_REPLICA];
char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN]; char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN];
} SRpcIpSet; } SRpcEpSet;
typedef struct SRpcConnInfo { typedef struct SRpcConnInfo {
uint32_t clientIp; uint32_t clientIp;
...@@ -67,7 +67,7 @@ typedef struct SRpcInit { ...@@ -67,7 +67,7 @@ typedef struct SRpcInit {
char *ckey; // ciphering key char *ckey; // ciphering key
// call back to process incoming msg, code shall be ignored by server app // call back to process incoming msg, code shall be ignored by server app
void (*cfp)(SRpcMsg *, SRpcIpSet *); void (*cfp)(SRpcMsg *, SRpcEpSet *);
// call back to retrieve the client auth info, for server app only // call back to retrieve the client auth info, for server app only
int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey); int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
...@@ -78,11 +78,11 @@ void rpcClose(void *); ...@@ -78,11 +78,11 @@ void rpcClose(void *);
void *rpcMallocCont(int contLen); void *rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont); void rpcFreeCont(void *pCont);
void *rpcReallocCont(void *ptr, int contLen); void *rpcReallocCont(void *ptr, int contLen);
void rpcSendRequest(void *thandle, const SRpcIpSet *pIpSet, SRpcMsg *pMsg); void rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg);
void rpcSendResponse(const SRpcMsg *pMsg); void rpcSendResponse(const SRpcMsg *pMsg);
void rpcSendRedirectRsp(void *pConn, const SRpcIpSet *pIpSet); void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pReq, SRpcMsg *pRsp); void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcReportProgress(void *pConn, char *pCont, int contLen); int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(void *pContext); void rpcCancelRequest(void *pContext);
......
...@@ -210,10 +210,10 @@ int32_t main(int32_t argc, char *argv[]) { ...@@ -210,10 +210,10 @@ int32_t main(int32_t argc, char *argv[]) {
(void)snprintf(mnodeWal, TSDB_FILENAME_LEN*2, "%s/mnode/wal/wal0", arguments.dataDir); (void)snprintf(mnodeWal, TSDB_FILENAME_LEN*2, "%s/mnode/wal/wal0", arguments.dataDir);
walModWalFile(mnodeWal); walModWalFile(mnodeWal);
// 2. modfiy dnode config: mnodeIpList.json // 2. modfiy dnode config: mnodeEpSet.json
char dnodeIpList[TSDB_FILENAME_LEN*2] = {0}; char dnodeEpSet[TSDB_FILENAME_LEN*2] = {0};
(void)snprintf(dnodeIpList, TSDB_FILENAME_LEN*2, "%s/dnode/mnodeIpList.json", arguments.dataDir); (void)snprintf(dnodeEpSet, TSDB_FILENAME_LEN*2, "%s/dnode/mnodeEpSet.json", arguments.dataDir);
modDnodeIpList(dnodeIpList); modDnodeEpSet(dnodeEpSet);
// 3. modify vnode config: config.json // 3. modify vnode config: config.json
char vnodeDir[TSDB_FILENAME_LEN*2] = {0}; char vnodeDir[TSDB_FILENAME_LEN*2] = {0};
......
...@@ -71,7 +71,7 @@ int tSystemShell(const char * cmd); ...@@ -71,7 +71,7 @@ int tSystemShell(const char * cmd);
void taosMvFile(char* destFile, char *srcFile) ; void taosMvFile(char* destFile, char *srcFile) ;
void walModWalFile(char* walfile); void walModWalFile(char* walfile);
SdnodeIfo* getDnodeInfo(int32_t dnodeId); SdnodeIfo* getDnodeInfo(int32_t dnodeId);
void modDnodeIpList(char* dnodeIpList); void modDnodeEpSet(char* dnodeEpSet);
void modAllVnode(char *vnodeDir); void modAllVnode(char *vnodeDir);
#endif #endif
...@@ -23,10 +23,10 @@ ...@@ -23,10 +23,10 @@
static SDMMnodeInfos tsDnodeIpInfos = {0}; static SDMMnodeInfos tsDnodeIpInfos = {0};
static bool dnodeReadMnodeInfos(char* dnodeIpList) { static bool dnodeReadMnodeInfos(char* dnodeEpSet) {
FILE *fp = fopen(dnodeIpList, "r"); FILE *fp = fopen(dnodeEpSet, "r");
if (!fp) { if (!fp) {
printf("failed to read mnodeIpList.json, file not exist\n"); printf("failed to read mnodeEpSet.json, file not exist\n");
return false; return false;
} }
...@@ -37,40 +37,40 @@ static bool dnodeReadMnodeInfos(char* dnodeIpList) { ...@@ -37,40 +37,40 @@ static bool dnodeReadMnodeInfos(char* dnodeIpList) {
if (len <= 0) { if (len <= 0) {
free(content); free(content);
fclose(fp); fclose(fp);
printf("failed to read mnodeIpList.json, content is null\n"); printf("failed to read mnodeEpSet.json, content is null\n");
return false; return false;
} }
content[len] = 0; content[len] = 0;
cJSON* root = cJSON_Parse(content); cJSON* root = cJSON_Parse(content);
if (root == NULL) { if (root == NULL) {
printf("failed to read mnodeIpList.json, invalid json format\n"); printf("failed to read mnodeEpSet.json, invalid json format\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
cJSON* inUse = cJSON_GetObjectItem(root, "inUse"); cJSON* inUse = cJSON_GetObjectItem(root, "inUse");
if (!inUse || inUse->type != cJSON_Number) { if (!inUse || inUse->type != cJSON_Number) {
printf("failed to read mnodeIpList.json, inUse not found\n"); printf("failed to read mnodeEpSet.json, inUse not found\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
tsDnodeIpInfos.inUse = inUse->valueint; tsDnodeIpInfos.inUse = inUse->valueint;
cJSON* nodeNum = cJSON_GetObjectItem(root, "nodeNum"); cJSON* nodeNum = cJSON_GetObjectItem(root, "nodeNum");
if (!nodeNum || nodeNum->type != cJSON_Number) { if (!nodeNum || nodeNum->type != cJSON_Number) {
printf("failed to read mnodeIpList.json, nodeNum not found\n"); printf("failed to read mnodeEpSet.json, nodeNum not found\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
tsDnodeIpInfos.nodeNum = nodeNum->valueint; tsDnodeIpInfos.nodeNum = nodeNum->valueint;
cJSON* nodeInfos = cJSON_GetObjectItem(root, "nodeInfos"); cJSON* nodeInfos = cJSON_GetObjectItem(root, "nodeInfos");
if (!nodeInfos || nodeInfos->type != cJSON_Array) { if (!nodeInfos || nodeInfos->type != cJSON_Array) {
printf("failed to read mnodeIpList.json, nodeInfos not found\n"); printf("failed to read mnodeEpSet.json, nodeInfos not found\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
int size = cJSON_GetArraySize(nodeInfos); int size = cJSON_GetArraySize(nodeInfos);
if (size != tsDnodeIpInfos.nodeNum) { if (size != tsDnodeIpInfos.nodeNum) {
printf("failed to read mnodeIpList.json, nodeInfos size not matched\n"); printf("failed to read mnodeEpSet.json, nodeInfos size not matched\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
...@@ -80,14 +80,14 @@ static bool dnodeReadMnodeInfos(char* dnodeIpList) { ...@@ -80,14 +80,14 @@ static bool dnodeReadMnodeInfos(char* dnodeIpList) {
cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId"); cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId");
if (!nodeId || nodeId->type != cJSON_Number) { if (!nodeId || nodeId->type != cJSON_Number) {
printf("failed to read mnodeIpList.json, nodeId not found\n"); printf("failed to read mnodeEpSet.json, nodeId not found\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
tsDnodeIpInfos.nodeInfos[i].nodeId = nodeId->valueint; tsDnodeIpInfos.nodeInfos[i].nodeId = nodeId->valueint;
cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp"); cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp");
if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) { if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) {
printf("failed to read mnodeIpList.json, nodeName not found\n"); printf("failed to read mnodeEpSet.json, nodeName not found\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
strncpy(tsDnodeIpInfos.nodeInfos[i].nodeEp, nodeEp->valuestring, TSDB_EP_LEN); strncpy(tsDnodeIpInfos.nodeInfos[i].nodeEp, nodeEp->valuestring, TSDB_EP_LEN);
...@@ -102,7 +102,7 @@ static bool dnodeReadMnodeInfos(char* dnodeIpList) { ...@@ -102,7 +102,7 @@ static bool dnodeReadMnodeInfos(char* dnodeIpList) {
ret = true; ret = true;
//printf("read mnode iplist successed, numOfIps:%d inUse:%d\n", tsDnodeIpInfos.nodeNum, tsDnodeIpInfos.inUse); //printf("read mnode epSet successed, numOfEps:%d inUse:%d\n", tsDnodeIpInfos.nodeNum, tsDnodeIpInfos.inUse);
//for (int32_t i = 0; i < tsDnodeIpInfos.nodeNum; i++) { //for (int32_t i = 0; i < tsDnodeIpInfos.nodeNum; i++) {
// printf("mnode:%d, %s\n", tsDnodeIpInfos.nodeInfos[i].nodeId, tsDnodeIpInfos.nodeInfos[i].nodeEp); // printf("mnode:%d, %s\n", tsDnodeIpInfos.nodeInfos[i].nodeId, tsDnodeIpInfos.nodeInfos[i].nodeEp);
//} //}
...@@ -115,8 +115,8 @@ PARSE_OVER: ...@@ -115,8 +115,8 @@ PARSE_OVER:
} }
static void dnodeSaveMnodeInfos(char* dnodeIpList) { static void dnodeSaveMnodeInfos(char* dnodeEpSet) {
FILE *fp = fopen(dnodeIpList, "w"); FILE *fp = fopen(dnodeEpSet, "w");
if (!fp) return; if (!fp) return;
int32_t len = 0; int32_t len = 0;
...@@ -143,13 +143,13 @@ static void dnodeSaveMnodeInfos(char* dnodeIpList) { ...@@ -143,13 +143,13 @@ static void dnodeSaveMnodeInfos(char* dnodeIpList) {
fclose(fp); fclose(fp);
free(content); free(content);
printf("mod mnode iplist successed\n"); printf("mod mnode epSet successed\n");
} }
void modDnodeIpList(char* dnodeIpList) void modDnodeEpSet(char* dnodeEpSet)
{ {
(void)dnodeReadMnodeInfos(dnodeIpList); (void)dnodeReadMnodeInfos(dnodeEpSet);
dnodeSaveMnodeInfos(dnodeIpList); dnodeSaveMnodeInfos(dnodeEpSet);
return; return;
} }
......
...@@ -42,12 +42,12 @@ void mnodeIncMnodeRef(struct SMnodeObj *pMnode); ...@@ -42,12 +42,12 @@ void mnodeIncMnodeRef(struct SMnodeObj *pMnode);
void mnodeDecMnodeRef(struct SMnodeObj *pMnode); void mnodeDecMnodeRef(struct SMnodeObj *pMnode);
char * mnodeGetMnodeRoleStr(); char * mnodeGetMnodeRoleStr();
void mnodeGetMnodeIpSetForPeer(SRpcIpSet *ipSet); void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet);
void mnodeGetMnodeIpSetForShell(SRpcIpSet *ipSet); void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet);
char* mnodeGetMnodeMasterEp(); char* mnodeGetMnodeMasterEp();
void mnodeGetMnodeInfos(void *mnodes); void mnodeGetMnodeInfos(void *mnodes);
void mnodeUpdateMnodeIpSet(); void mnodeUpdateMnodeEpSet();
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -44,12 +44,12 @@ int32_t mnodeGetAvailableVgroup(struct SMnodeMsg *pMsg, SVgObj **pVgroup, int32_ ...@@ -44,12 +44,12 @@ int32_t mnodeGetAvailableVgroup(struct SMnodeMsg *pMsg, SVgObj **pVgroup, int32_
void mnodeAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable); void mnodeAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable);
void mnodeRemoveTableFromVgroup(SVgObj *pVgroup, SChildTableObj *pTable); void mnodeRemoveTableFromVgroup(SVgObj *pVgroup, SChildTableObj *pTable);
void mnodeSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle); void mnodeSendDropVnodeMsg(int32_t vgId, SRpcEpSet *epSet, void *ahandle);
void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle);
void mnodeSendAlterVgroupMsg(SVgObj *pVgroup); void mnodeSendAlterVgroupMsg(SVgObj *pVgroup);
SRpcIpSet mnodeGetIpSetFromVgroup(SVgObj *pVgroup); SRpcEpSet mnodeGetEpSetFromVgroup(SVgObj *pVgroup);
SRpcIpSet mnodeGetIpSetFromIp(char *ep); SRpcEpSet mnodeGetEpSetFromIp(char *ep);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -289,14 +289,14 @@ static int32_t mnodeProcessCfgDnodeMsg(SMnodeMsg *pMsg) { ...@@ -289,14 +289,14 @@ static int32_t mnodeProcessCfgDnodeMsg(SMnodeMsg *pMsg) {
} }
} }
SRpcIpSet ipSet = mnodeGetIpSetFromIp(pCmCfgDnode->ep); SRpcEpSet epSet = mnodeGetEpSetFromIp(pCmCfgDnode->ep);
if (dnodeId != 0) { if (dnodeId != 0) {
SDnodeObj *pDnode = mnodeGetDnode(dnodeId); SDnodeObj *pDnode = mnodeGetDnode(dnodeId);
if (pDnode == NULL) { if (pDnode == NULL) {
mError("failed to cfg dnode, invalid dnodeId:%d", dnodeId); mError("failed to cfg dnode, invalid dnodeId:%d", dnodeId);
return TSDB_CODE_MND_DNODE_NOT_EXIST; return TSDB_CODE_MND_DNODE_NOT_EXIST;
} }
ipSet = mnodeGetIpSetFromIp(pDnode->dnodeEp); epSet = mnodeGetEpSetFromIp(pDnode->dnodeEp);
mnodeDecDnodeRef(pDnode); mnodeDecDnodeRef(pDnode);
} }
...@@ -313,7 +313,7 @@ static int32_t mnodeProcessCfgDnodeMsg(SMnodeMsg *pMsg) { ...@@ -313,7 +313,7 @@ static int32_t mnodeProcessCfgDnodeMsg(SMnodeMsg *pMsg) {
}; };
mInfo("dnode:%s, is configured by %s", pCmCfgDnode->ep, pMsg->pUser->user); mInfo("dnode:%s, is configured by %s", pCmCfgDnode->ep, pMsg->pUser->user);
dnodeSendMsgToDnode(&ipSet, &rpcMdCfgDnodeMsg); dnodeSendMsgToDnode(&epSet, &rpcMdCfgDnodeMsg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -399,9 +399,9 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) { ...@@ -399,9 +399,9 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) {
SVgObj *pVgroup = mnodeGetVgroup(pVload->vgId); SVgObj *pVgroup = mnodeGetVgroup(pVload->vgId);
if (pVgroup == NULL) { if (pVgroup == NULL) {
SRpcIpSet ipSet = mnodeGetIpSetFromIp(pDnode->dnodeEp); SRpcEpSet epSet = mnodeGetEpSetFromIp(pDnode->dnodeEp);
mInfo("dnode:%d, vgId:%d not exist in mnode, drop it", pDnode->dnodeId, pVload->vgId); mInfo("dnode:%d, vgId:%d not exist in mnode, drop it", pDnode->dnodeId, pVload->vgId);
mnodeSendDropVnodeMsg(pVload->vgId, &ipSet, NULL); mnodeSendDropVnodeMsg(pVload->vgId, &epSet, NULL);
} else { } else {
mnodeUpdateVgroupStatus(pVgroup, pDnode, pVload); mnodeUpdateVgroupStatus(pVgroup, pDnode, pVload);
pAccess->vgId = htonl(pVload->vgId); pAccess->vgId = htonl(pVload->vgId);
......
...@@ -35,8 +35,8 @@ ...@@ -35,8 +35,8 @@
static void * tsMnodeSdb = NULL; static void * tsMnodeSdb = NULL;
static int32_t tsMnodeUpdateSize = 0; static int32_t tsMnodeUpdateSize = 0;
static SRpcIpSet tsMnodeIpSetForShell; static SRpcEpSet tsMnodeEpSetForShell;
static SRpcIpSet tsMnodeIpSetForPeer; static SRpcEpSet tsMnodeEpSetForPeer;
static SDMMnodeInfos tsMnodeInfos; static SDMMnodeInfos tsMnodeInfos;
static int32_t mnodeGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
...@@ -123,7 +123,7 @@ static int32_t mnodeMnodeActionRestored() { ...@@ -123,7 +123,7 @@ static int32_t mnodeMnodeActionRestored() {
sdbFreeIter(pIter); sdbFreeIter(pIter);
} }
mnodeUpdateMnodeIpSet(); mnodeUpdateMnodeEpSet();
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -204,13 +204,13 @@ char *mnodeGetMnodeRoleStr(int32_t role) { ...@@ -204,13 +204,13 @@ char *mnodeGetMnodeRoleStr(int32_t role) {
} }
} }
void mnodeUpdateMnodeIpSet() { void mnodeUpdateMnodeEpSet() {
mInfo("update mnodes ipset, numOfIps:%d ", mnodeGetMnodesNum()); mInfo("update mnodes epSet, numOfEps:%d ", mnodeGetMnodesNum());
mnodeMnodeWrLock(); mnodeMnodeWrLock();
memset(&tsMnodeIpSetForShell, 0, sizeof(SRpcIpSet)); memset(&tsMnodeEpSetForShell, 0, sizeof(SRpcEpSet));
memset(&tsMnodeIpSetForPeer, 0, sizeof(SRpcIpSet)); memset(&tsMnodeEpSetForPeer, 0, sizeof(SRpcEpSet));
memset(&tsMnodeInfos, 0, sizeof(SDMMnodeInfos)); memset(&tsMnodeInfos, 0, sizeof(SDMMnodeInfos));
int32_t index = 0; int32_t index = 0;
...@@ -222,20 +222,20 @@ void mnodeUpdateMnodeIpSet() { ...@@ -222,20 +222,20 @@ void mnodeUpdateMnodeIpSet() {
SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId); SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId);
if (pDnode != NULL) { if (pDnode != NULL) {
strcpy(tsMnodeIpSetForShell.fqdn[index], pDnode->dnodeFqdn); strcpy(tsMnodeEpSetForShell.fqdn[index], pDnode->dnodeFqdn);
tsMnodeIpSetForShell.port[index] = htons(pDnode->dnodePort); tsMnodeEpSetForShell.port[index] = htons(pDnode->dnodePort);
mDebug("mnode:%d, for shell fqdn:%s %d", pDnode->dnodeId, tsMnodeIpSetForShell.fqdn[index], htons(tsMnodeIpSetForShell.port[index])); mDebug("mnode:%d, for shell fqdn:%s %d", pDnode->dnodeId, tsMnodeEpSetForShell.fqdn[index], htons(tsMnodeEpSetForShell.port[index]));
strcpy(tsMnodeIpSetForPeer.fqdn[index], pDnode->dnodeFqdn); strcpy(tsMnodeEpSetForPeer.fqdn[index], pDnode->dnodeFqdn);
tsMnodeIpSetForPeer.port[index] = htons(pDnode->dnodePort + TSDB_PORT_DNODEDNODE); tsMnodeEpSetForPeer.port[index] = htons(pDnode->dnodePort + TSDB_PORT_DNODEDNODE);
mDebug("mnode:%d, for peer fqdn:%s %d", pDnode->dnodeId, tsMnodeIpSetForPeer.fqdn[index], htons(tsMnodeIpSetForPeer.port[index])); mDebug("mnode:%d, for peer fqdn:%s %d", pDnode->dnodeId, tsMnodeEpSetForPeer.fqdn[index], htons(tsMnodeEpSetForPeer.port[index]));
tsMnodeInfos.nodeInfos[index].nodeId = htonl(pMnode->mnodeId); tsMnodeInfos.nodeInfos[index].nodeId = htonl(pMnode->mnodeId);
strcpy(tsMnodeInfos.nodeInfos[index].nodeEp, pDnode->dnodeEp); strcpy(tsMnodeInfos.nodeInfos[index].nodeEp, pDnode->dnodeEp);
if (pMnode->role == TAOS_SYNC_ROLE_MASTER) { if (pMnode->role == TAOS_SYNC_ROLE_MASTER) {
tsMnodeIpSetForShell.inUse = index; tsMnodeEpSetForShell.inUse = index;
tsMnodeIpSetForPeer.inUse = index; tsMnodeEpSetForPeer.inUse = index;
tsMnodeInfos.inUse = index; tsMnodeInfos.inUse = index;
} }
...@@ -248,23 +248,23 @@ void mnodeUpdateMnodeIpSet() { ...@@ -248,23 +248,23 @@ void mnodeUpdateMnodeIpSet() {
} }
tsMnodeInfos.nodeNum = index; tsMnodeInfos.nodeNum = index;
tsMnodeIpSetForShell.numOfIps = index; tsMnodeEpSetForShell.numOfEps = index;
tsMnodeIpSetForPeer.numOfIps = index; tsMnodeEpSetForPeer.numOfEps = index;
sdbFreeIter(pIter); sdbFreeIter(pIter);
mnodeMnodeUnLock(); mnodeMnodeUnLock();
} }
void mnodeGetMnodeIpSetForPeer(SRpcIpSet *ipSet) { void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet) {
mnodeMnodeRdLock(); mnodeMnodeRdLock();
*ipSet = tsMnodeIpSetForPeer; *epSet = tsMnodeEpSetForPeer;
mnodeMnodeUnLock(); mnodeMnodeUnLock();
} }
void mnodeGetMnodeIpSetForShell(SRpcIpSet *ipSet) { void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet) {
mnodeMnodeRdLock(); mnodeMnodeRdLock();
*ipSet = tsMnodeIpSetForShell; *epSet = tsMnodeEpSetForShell;
mnodeMnodeUnLock(); mnodeMnodeUnLock();
} }
...@@ -295,7 +295,7 @@ int32_t mnodeAddMnode(int32_t dnodeId) { ...@@ -295,7 +295,7 @@ int32_t mnodeAddMnode(int32_t dnodeId) {
code = TSDB_CODE_MND_SDB_ERROR; code = TSDB_CODE_MND_SDB_ERROR;
} }
mnodeUpdateMnodeIpSet(); mnodeUpdateMnodeEpSet();
return code; return code;
} }
...@@ -308,7 +308,7 @@ void mnodeDropMnodeLocal(int32_t dnodeId) { ...@@ -308,7 +308,7 @@ void mnodeDropMnodeLocal(int32_t dnodeId) {
mnodeDecMnodeRef(pMnode); mnodeDecMnodeRef(pMnode);
} }
mnodeUpdateMnodeIpSet(); mnodeUpdateMnodeEpSet();
} }
int32_t mnodeDropMnode(int32_t dnodeId) { int32_t mnodeDropMnode(int32_t dnodeId) {
...@@ -330,7 +330,7 @@ int32_t mnodeDropMnode(int32_t dnodeId) { ...@@ -330,7 +330,7 @@ int32_t mnodeDropMnode(int32_t dnodeId) {
sdbDecRef(tsMnodeSdb, pMnode); sdbDecRef(tsMnodeSdb, pMnode);
mnodeUpdateMnodeIpSet(); mnodeUpdateMnodeEpSet();
return code; return code;
} }
......
...@@ -53,14 +53,14 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { ...@@ -53,14 +53,14 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) {
if (!sdbIsMaster()) { if (!sdbIsMaster()) {
SMnodeRsp *rpcRsp = &pMsg->rpcRsp; SMnodeRsp *rpcRsp = &pMsg->rpcRsp;
SRpcIpSet *ipSet = rpcMallocCont(sizeof(SRpcIpSet)); SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet));
mnodeGetMnodeIpSetForPeer(ipSet); mnodeGetMnodeEpSetForPeer(epSet);
rpcRsp->rsp = ipSet; rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcIpSet); rpcRsp->len = sizeof(SRpcEpSet);
mDebug("%p, msg:%s in mpeer queue, will be redireced inUse:%d", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], ipSet->inUse); mDebug("%p, msg:%s in mpeer queue, will be redireced inUse:%d", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], epSet->inUse);
for (int32_t i = 0; i < ipSet->numOfIps; ++i) { for (int32_t i = 0; i < epSet->numOfEps; ++i) {
mDebug("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i])); mDebug("mnode index:%d ep:%s:%d", i, epSet->fqdn[i], htons(epSet->port[i]));
} }
return TSDB_CODE_RPC_REDIRECT; return TSDB_CODE_RPC_REDIRECT;
......
...@@ -49,14 +49,14 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) { ...@@ -49,14 +49,14 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) {
if (!sdbIsMaster()) { if (!sdbIsMaster()) {
SMnodeRsp *rpcRsp = &pMsg->rpcRsp; SMnodeRsp *rpcRsp = &pMsg->rpcRsp;
SRpcIpSet *ipSet = rpcMallocCont(sizeof(SRpcIpSet)); SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet));
mnodeGetMnodeIpSetForShell(ipSet); mnodeGetMnodeEpSetForShell(epSet);
rpcRsp->rsp = ipSet; rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcIpSet); rpcRsp->len = sizeof(SRpcEpSet);
mDebug("%p, msg:%s in mread queue, will be redireced, inUse:%d", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], ipSet->inUse); mDebug("%p, msg:%s in mread queue, will be redireced, inUse:%d", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], epSet->inUse);
for (int32_t i = 0; i < ipSet->numOfIps; ++i) { for (int32_t i = 0; i < epSet->numOfEps; ++i) {
mDebug("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i])); mDebug("mnode index:%d ep:%s:%d", i, epSet->fqdn[i], htons(epSet->port[i]));
} }
return TSDB_CODE_RPC_REDIRECT; return TSDB_CODE_RPC_REDIRECT;
......
...@@ -219,7 +219,7 @@ void sdbUpdateMnodeRoles() { ...@@ -219,7 +219,7 @@ void sdbUpdateMnodeRoles() {
} }
} }
mnodeUpdateMnodeIpSet(); mnodeUpdateMnodeEpSet();
} }
static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion) { static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion) {
......
...@@ -270,7 +270,7 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) { ...@@ -270,7 +270,7 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
pHBRsp->onlineDnodes = htonl(mnodeGetOnlinDnodesNum()); pHBRsp->onlineDnodes = htonl(mnodeGetOnlinDnodesNum());
pHBRsp->totalDnodes = htonl(mnodeGetDnodesNum()); pHBRsp->totalDnodes = htonl(mnodeGetDnodesNum());
mnodeGetMnodeIpSetForShell(&pHBRsp->ipList); mnodeGetMnodeEpSetForShell(&pHBRsp->epSet);
pMsg->rpcRsp.rsp = pHBRsp; pMsg->rpcRsp.rsp = pHBRsp;
pMsg->rpcRsp.len = sizeof(SCMHeartBeatRsp); pMsg->rpcRsp.len = sizeof(SCMHeartBeatRsp);
...@@ -335,7 +335,7 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) { ...@@ -335,7 +335,7 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) {
pConnectRsp->writeAuth = pUser->writeAuth; pConnectRsp->writeAuth = pUser->writeAuth;
pConnectRsp->superAuth = pUser->superAuth; pConnectRsp->superAuth = pUser->superAuth;
mnodeGetMnodeIpSetForShell(&pConnectRsp->ipList); mnodeGetMnodeEpSetForShell(&pConnectRsp->epSet);
connect_over: connect_over:
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
......
...@@ -910,9 +910,9 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) { ...@@ -910,9 +910,9 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
mInfo("app:%p:%p, stable:%s, send drop stable msg to vgId:%d", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, mInfo("app:%p:%p, stable:%s, send drop stable msg to vgId:%d", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId,
pVgroup->vgId); pVgroup->vgId);
SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pVgroup); SRpcEpSet epSet = mnodeGetEpSetFromVgroup(pVgroup);
SRpcMsg rpcMsg = {.pCont = pDrop, .contLen = sizeof(SMDDropSTableMsg), .msgType = TSDB_MSG_TYPE_MD_DROP_STABLE}; SRpcMsg rpcMsg = {.pCont = pDrop, .contLen = sizeof(SMDDropSTableMsg), .msgType = TSDB_MSG_TYPE_MD_DROP_STABLE};
dnodeSendMsgToDnode(&ipSet, &rpcMsg); dnodeSendMsgToDnode(&epSet, &rpcMsg);
mnodeDecVgroupRef(pVgroup); mnodeDecVgroupRef(pVgroup);
} }
taosHashDestroyIter(pIter); taosHashDestroyIter(pIter);
...@@ -1484,10 +1484,10 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { ...@@ -1484,10 +1484,10 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
SDnodeObj *pDnode = pVgroup->vnodeGid[vn].pDnode; SDnodeObj *pDnode = pVgroup->vnodeGid[vn].pDnode;
if (pDnode == NULL) break; if (pDnode == NULL) break;
tstrncpy(pVgroupInfo->vgroups[vgSize].ipAddr[vn].fqdn, pDnode->dnodeFqdn, TSDB_FQDN_LEN); tstrncpy(pVgroupInfo->vgroups[vgSize].epAddr[vn].fqdn, pDnode->dnodeFqdn, TSDB_FQDN_LEN);
pVgroupInfo->vgroups[vgSize].ipAddr[vn].port = htons(pDnode->dnodePort); pVgroupInfo->vgroups[vgSize].epAddr[vn].port = htons(pDnode->dnodePort);
pVgroupInfo->vgroups[vgSize].numOfIps++; pVgroupInfo->vgroups[vgSize].numOfEps++;
} }
vgSize++; vgSize++;
...@@ -1615,7 +1615,7 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) { ...@@ -1615,7 +1615,7 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
return terrno; return terrno;
} }
SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pMsg->pVgroup); SRpcEpSet epSet = mnodeGetEpSetFromVgroup(pMsg->pVgroup);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.ahandle = pMsg, .ahandle = pMsg,
.pCont = pMDCreate, .pCont = pMDCreate,
...@@ -1624,7 +1624,7 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) { ...@@ -1624,7 +1624,7 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
.msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE
}; };
dnodeSendMsgToDnode(&ipSet, &rpcMsg); dnodeSendMsgToDnode(&epSet, &rpcMsg);
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
...@@ -1788,7 +1788,7 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg, bool needReturn) { ...@@ -1788,7 +1788,7 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg, bool needReturn) {
pDrop->sid = htonl(pTable->sid); pDrop->sid = htonl(pTable->sid);
pDrop->uid = htobe64(pTable->uid); pDrop->uid = htobe64(pTable->uid);
SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pMsg->pVgroup); SRpcEpSet epSet = mnodeGetEpSetFromVgroup(pMsg->pVgroup);
mInfo("app:%p:%p, table:%s, send drop ctable msg, vgId:%d sid:%d uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg, mInfo("app:%p:%p, table:%s, send drop ctable msg, vgId:%d sid:%d uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg,
pDrop->tableId, pTable->vgId, pTable->sid, pTable->uid); pDrop->tableId, pTable->vgId, pTable->sid, pTable->uid);
...@@ -1803,7 +1803,7 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg, bool needReturn) { ...@@ -1803,7 +1803,7 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg, bool needReturn) {
if (!needReturn) rpcMsg.ahandle = NULL; if (!needReturn) rpcMsg.ahandle = NULL;
dnodeSendMsgToDnode(&ipSet, &rpcMsg); dnodeSendMsgToDnode(&epSet, &rpcMsg);
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
...@@ -1842,7 +1842,7 @@ static int32_t mnodeAlterNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) { ...@@ -1842,7 +1842,7 @@ static int32_t mnodeAlterNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
} }
} }
SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pMsg->pVgroup); SRpcEpSet epSet = mnodeGetEpSetFromVgroup(pMsg->pVgroup);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.ahandle = pMsg, .ahandle = pMsg,
.pCont = pMDCreate, .pCont = pMDCreate,
...@@ -1854,7 +1854,7 @@ static int32_t mnodeAlterNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) { ...@@ -1854,7 +1854,7 @@ static int32_t mnodeAlterNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
mDebug("app:%p:%p, ctable %s, send alter column msg to vgId:%d", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, mDebug("app:%p:%p, ctable %s, send alter column msg to vgId:%d", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
pMsg->pVgroup->vgId); pMsg->pVgroup->vgId);
dnodeSendMsgToDnode(&ipSet, &rpcMsg); dnodeSendMsgToDnode(&epSet, &rpcMsg);
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
...@@ -1996,9 +1996,9 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) { ...@@ -1996,9 +1996,9 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) {
for (int32_t i = 0; i < pMsg->pVgroup->numOfVnodes; ++i) { for (int32_t i = 0; i < pMsg->pVgroup->numOfVnodes; ++i) {
SDnodeObj *pDnode = mnodeGetDnode(pMsg->pVgroup->vnodeGid[i].dnodeId); SDnodeObj *pDnode = mnodeGetDnode(pMsg->pVgroup->vnodeGid[i].dnodeId);
if (pDnode == NULL) break; if (pDnode == NULL) break;
strcpy(pMeta->vgroup.ipAddr[i].fqdn, pDnode->dnodeFqdn); strcpy(pMeta->vgroup.epAddr[i].fqdn, pDnode->dnodeFqdn);
pMeta->vgroup.ipAddr[i].port = htons(pDnode->dnodePort + TSDB_PORT_DNODESHELL); pMeta->vgroup.epAddr[i].port = htons(pDnode->dnodePort + TSDB_PORT_DNODESHELL);
pMeta->vgroup.numOfIps++; pMeta->vgroup.numOfEps++;
mnodeDecDnodeRef(pDnode); mnodeDecDnodeRef(pDnode);
} }
pMeta->vgroup.vgId = htonl(pMsg->pVgroup->vgId); pMeta->vgroup.vgId = htonl(pMsg->pVgroup->vgId);
......
...@@ -317,9 +317,9 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl ...@@ -317,9 +317,9 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl
} }
if (!dnodeExist) { if (!dnodeExist) {
SRpcIpSet ipSet = mnodeGetIpSetFromIp(pDnode->dnodeEp); SRpcEpSet epSet = mnodeGetEpSetFromIp(pDnode->dnodeEp);
mError("vgId:%d, dnode:%d not exist in mnode, drop it", pVload->vgId, pDnode->dnodeId); mError("vgId:%d, dnode:%d not exist in mnode, drop it", pVload->vgId, pDnode->dnodeId);
mnodeSendDropVnodeMsg(pVload->vgId, &ipSet, NULL); mnodeSendDropVnodeMsg(pVload->vgId, &epSet, NULL);
return; return;
} }
...@@ -809,29 +809,29 @@ static SMDCreateVnodeMsg *mnodeBuildVnodeMsg(SVgObj *pVgroup) { ...@@ -809,29 +809,29 @@ static SMDCreateVnodeMsg *mnodeBuildVnodeMsg(SVgObj *pVgroup) {
return pVnode; return pVnode;
} }
SRpcIpSet mnodeGetIpSetFromVgroup(SVgObj *pVgroup) { SRpcEpSet mnodeGetEpSetFromVgroup(SVgObj *pVgroup) {
SRpcIpSet ipSet = { SRpcEpSet epSet = {
.numOfIps = pVgroup->numOfVnodes, .numOfEps = pVgroup->numOfVnodes,
.inUse = 0, .inUse = 0,
}; };
for (int i = 0; i < pVgroup->numOfVnodes; ++i) { for (int i = 0; i < pVgroup->numOfVnodes; ++i) {
strcpy(ipSet.fqdn[i], pVgroup->vnodeGid[i].pDnode->dnodeFqdn); strcpy(epSet.fqdn[i], pVgroup->vnodeGid[i].pDnode->dnodeFqdn);
ipSet.port[i] = pVgroup->vnodeGid[i].pDnode->dnodePort + TSDB_PORT_DNODEDNODE; epSet.port[i] = pVgroup->vnodeGid[i].pDnode->dnodePort + TSDB_PORT_DNODEDNODE;
} }
return ipSet; return epSet;
} }
SRpcIpSet mnodeGetIpSetFromIp(char *ep) { SRpcEpSet mnodeGetEpSetFromIp(char *ep) {
SRpcIpSet ipSet; SRpcEpSet epSet;
ipSet.numOfIps = 1; epSet.numOfEps = 1;
ipSet.inUse = 0; epSet.inUse = 0;
taosGetFqdnPortFromEp(ep, ipSet.fqdn[0], &ipSet.port[0]); taosGetFqdnPortFromEp(ep, epSet.fqdn[0], &epSet.port[0]);
ipSet.port[0] += TSDB_PORT_DNODEDNODE; epSet.port[0] += TSDB_PORT_DNODEDNODE;
return ipSet; return epSet;
} }
static void mnodeSendAlterVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet) { static void mnodeSendAlterVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet) {
SMDAlterVnodeMsg *pAlter = mnodeBuildVnodeMsg(pVgroup); SMDAlterVnodeMsg *pAlter = mnodeBuildVnodeMsg(pVgroup);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.ahandle = NULL, .ahandle = NULL,
...@@ -840,21 +840,21 @@ static void mnodeSendAlterVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet) { ...@@ -840,21 +840,21 @@ static void mnodeSendAlterVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet) {
.code = 0, .code = 0,
.msgType = TSDB_MSG_TYPE_MD_ALTER_VNODE .msgType = TSDB_MSG_TYPE_MD_ALTER_VNODE
}; };
dnodeSendMsgToDnode(ipSet, &rpcMsg); dnodeSendMsgToDnode(epSet, &rpcMsg);
} }
void mnodeSendAlterVgroupMsg(SVgObj *pVgroup) { void mnodeSendAlterVgroupMsg(SVgObj *pVgroup) {
mDebug("vgId:%d, send alter all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes, mDebug("vgId:%d, send alter all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes,
pVgroup->dbName); pVgroup->dbName);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SRpcIpSet ipSet = mnodeGetIpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp); SRpcEpSet epSet = mnodeGetEpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp);
mDebug("vgId:%d, index:%d, send alter vnode msg to dnode %s", pVgroup->vgId, i, mDebug("vgId:%d, index:%d, send alter vnode msg to dnode %s", pVgroup->vgId, i,
pVgroup->vnodeGid[i].pDnode->dnodeEp); pVgroup->vnodeGid[i].pDnode->dnodeEp);
mnodeSendAlterVnodeMsg(pVgroup, &ipSet); mnodeSendAlterVnodeMsg(pVgroup, &epSet);
} }
} }
static void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) { static void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet, void *ahandle) {
SMDCreateVnodeMsg *pCreate = mnodeBuildVnodeMsg(pVgroup); SMDCreateVnodeMsg *pCreate = mnodeBuildVnodeMsg(pVgroup);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.ahandle = ahandle, .ahandle = ahandle,
...@@ -863,17 +863,17 @@ static void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *aha ...@@ -863,17 +863,17 @@ static void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *aha
.code = 0, .code = 0,
.msgType = TSDB_MSG_TYPE_MD_CREATE_VNODE .msgType = TSDB_MSG_TYPE_MD_CREATE_VNODE
}; };
dnodeSendMsgToDnode(ipSet, &rpcMsg); dnodeSendMsgToDnode(epSet, &rpcMsg);
} }
void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) { void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) {
mDebug("vgId:%d, send create all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes, mDebug("vgId:%d, send create all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes,
pVgroup->dbName); pVgroup->dbName);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SRpcIpSet ipSet = mnodeGetIpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp); SRpcEpSet epSet = mnodeGetEpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp);
mDebug("vgId:%d, index:%d, send create vnode msg to dnode %s, ahandle:%p", pVgroup->vgId, mDebug("vgId:%d, index:%d, send create vnode msg to dnode %s, ahandle:%p", pVgroup->vgId,
i, pVgroup->vnodeGid[i].pDnode->dnodeEp, ahandle); i, pVgroup->vnodeGid[i].pDnode->dnodeEp, ahandle);
mnodeSendCreateVnodeMsg(pVgroup, &ipSet, ahandle); mnodeSendCreateVnodeMsg(pVgroup, &epSet, ahandle);
} }
} }
...@@ -926,7 +926,7 @@ static SMDDropVnodeMsg *mnodeBuildDropVnodeMsg(int32_t vgId) { ...@@ -926,7 +926,7 @@ static SMDDropVnodeMsg *mnodeBuildDropVnodeMsg(int32_t vgId) {
return pDrop; return pDrop;
} }
void mnodeSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle) { void mnodeSendDropVnodeMsg(int32_t vgId, SRpcEpSet *epSet, void *ahandle) {
SMDDropVnodeMsg *pDrop = mnodeBuildDropVnodeMsg(vgId); SMDDropVnodeMsg *pDrop = mnodeBuildDropVnodeMsg(vgId);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.ahandle = ahandle, .ahandle = ahandle,
...@@ -935,16 +935,16 @@ void mnodeSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle) { ...@@ -935,16 +935,16 @@ void mnodeSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle) {
.code = 0, .code = 0,
.msgType = TSDB_MSG_TYPE_MD_DROP_VNODE .msgType = TSDB_MSG_TYPE_MD_DROP_VNODE
}; };
dnodeSendMsgToDnode(ipSet, &rpcMsg); dnodeSendMsgToDnode(epSet, &rpcMsg);
} }
static void mnodeSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) { static void mnodeSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) {
pVgroup->status = TAOS_VG_STATUS_DROPPING; // deleting pVgroup->status = TAOS_VG_STATUS_DROPPING; // deleting
mDebug("vgId:%d, send drop all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle); mDebug("vgId:%d, send drop all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SRpcIpSet ipSet = mnodeGetIpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp); SRpcEpSet epSet = mnodeGetEpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp);
mDebug("vgId:%d, send drop vnode msg to dnode:%d, ahandle:%p", pVgroup->vgId, pVgroup->vnodeGid[i].dnodeId, ahandle); mDebug("vgId:%d, send drop vnode msg to dnode:%d, ahandle:%p", pVgroup->vgId, pVgroup->vnodeGid[i].dnodeId, ahandle);
mnodeSendDropVnodeMsg(pVgroup->vgId, &ipSet, ahandle); mnodeSendDropVnodeMsg(pVgroup->vgId, &epSet, ahandle);
} }
} }
...@@ -998,8 +998,8 @@ static int32_t mnodeProcessVnodeCfgMsg(SMnodeMsg *pMsg) { ...@@ -998,8 +998,8 @@ static int32_t mnodeProcessVnodeCfgMsg(SMnodeMsg *pMsg) {
} }
mDebug("vgId:%d, send create vnode msg to dnode %s for vnode cfg msg", pVgroup->vgId, pDnode->dnodeEp); mDebug("vgId:%d, send create vnode msg to dnode %s for vnode cfg msg", pVgroup->vgId, pDnode->dnodeEp);
SRpcIpSet ipSet = mnodeGetIpSetFromIp(pDnode->dnodeEp); SRpcEpSet epSet = mnodeGetEpSetFromIp(pDnode->dnodeEp);
mnodeSendCreateVnodeMsg(pVgroup, &ipSet, NULL); mnodeSendCreateVnodeMsg(pVgroup, &epSet, NULL);
mnodeDecDnodeRef(pDnode); mnodeDecDnodeRef(pDnode);
mnodeDecVgroupRef(pVgroup); mnodeDecVgroupRef(pVgroup);
......
...@@ -49,16 +49,16 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) { ...@@ -49,16 +49,16 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
if (!sdbIsMaster()) { if (!sdbIsMaster()) {
SMnodeRsp *rpcRsp = &pMsg->rpcRsp; SMnodeRsp *rpcRsp = &pMsg->rpcRsp;
SRpcIpSet *ipSet = rpcMallocCont(sizeof(SRpcIpSet)); SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet));
mnodeGetMnodeIpSetForShell(ipSet); mnodeGetMnodeEpSetForShell(epSet);
rpcRsp->rsp = ipSet; rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcIpSet); rpcRsp->len = sizeof(SRpcEpSet);
mDebug("app:%p:%p, msg:%s will be redireced inUse:%d", pMsg->rpcMsg.ahandle, pMsg, taosMsg[pMsg->rpcMsg.msgType], mDebug("app:%p:%p, msg:%s will be redireced inUse:%d", pMsg->rpcMsg.ahandle, pMsg, taosMsg[pMsg->rpcMsg.msgType],
ipSet->inUse); epSet->inUse);
for (int32_t i = 0; i < ipSet->numOfIps; ++i) { for (int32_t i = 0; i < epSet->numOfEps; ++i) {
mDebug("app:%p:%p, mnode index:%d ip:%s:%d", pMsg->rpcMsg.ahandle, pMsg, i, ipSet->fqdn[i], mDebug("app:%p:%p, mnode index:%d ep:%s:%d", pMsg->rpcMsg.ahandle, pMsg, i, epSet->fqdn[i],
htons(ipSet->port[i])); htons(epSet->port[i]));
} }
return TSDB_CODE_RPC_REDIRECT; return TSDB_CODE_RPC_REDIRECT;
......
...@@ -55,7 +55,7 @@ typedef struct { ...@@ -55,7 +55,7 @@ typedef struct {
char secret[TSDB_KEY_LEN]; // secret for the link char secret[TSDB_KEY_LEN]; // secret for the link
char ckey[TSDB_KEY_LEN]; // ciphering key char ckey[TSDB_KEY_LEN]; // ciphering key
void (*cfp)(SRpcMsg *, SRpcIpSet *); void (*cfp)(SRpcMsg *, SRpcEpSet *);
int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey); int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey);
int32_t refCount; int32_t refCount;
...@@ -71,7 +71,7 @@ typedef struct { ...@@ -71,7 +71,7 @@ typedef struct {
typedef struct { typedef struct {
SRpcInfo *pRpc; // associated SRpcInfo SRpcInfo *pRpc; // associated SRpcInfo
SRpcIpSet ipSet; // ip list provided by app SRpcEpSet epSet; // ip list provided by app
void *ahandle; // handle provided by app void *ahandle; // handle provided by app
void *signature; // for validation void *signature; // for validation
struct SRpcConn *pConn; // pConn allocated struct SRpcConn *pConn; // pConn allocated
...@@ -80,12 +80,12 @@ typedef struct { ...@@ -80,12 +80,12 @@ typedef struct {
int32_t contLen; // content length int32_t contLen; // content length
int32_t code; // error code int32_t code; // error code
int16_t numOfTry; // number of try for different servers int16_t numOfTry; // number of try for different servers
int8_t oldInUse; // server IP inUse passed by app int8_t oldInUse; // server EP inUse passed by app
int8_t redirect; // flag to indicate redirect int8_t redirect; // flag to indicate redirect
int8_t connType; // connection type int8_t connType; // connection type
SRpcMsg *pRsp; // for synchronous API SRpcMsg *pRsp; // for synchronous API
tsem_t *pSem; // for synchronous API tsem_t *pSem; // for synchronous API
SRpcIpSet *pSet; // for synchronous API SRpcEpSet *pSet; // for synchronous API
char msg[0]; // RpcHead starts from here char msg[0]; // RpcHead starts from here
} SRpcReqContext; } SRpcReqContext;
...@@ -355,7 +355,7 @@ void *rpcReallocCont(void *ptr, int contLen) { ...@@ -355,7 +355,7 @@ void *rpcReallocCont(void *ptr, int contLen) {
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
} }
void rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, SRpcMsg *pMsg) { void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) {
SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcReqContext *pContext; SRpcReqContext *pContext;
...@@ -364,11 +364,11 @@ void rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, SRpcMsg *pMsg) { ...@@ -364,11 +364,11 @@ void rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, SRpcMsg *pMsg) {
pContext->ahandle = pMsg->ahandle; pContext->ahandle = pMsg->ahandle;
pContext->signature = pContext; pContext->signature = pContext;
pContext->pRpc = (SRpcInfo *)shandle; pContext->pRpc = (SRpcInfo *)shandle;
pContext->ipSet = *pIpSet; pContext->epSet = *pEpSet;
pContext->contLen = contLen; pContext->contLen = contLen;
pContext->pCont = pMsg->pCont; pContext->pCont = pMsg->pCont;
pContext->msgType = pMsg->msgType; pContext->msgType = pMsg->msgType;
pContext->oldInUse = pIpSet->inUse; pContext->oldInUse = pEpSet->inUse;
pContext->connType = RPC_CONN_UDPC; pContext->connType = RPC_CONN_UDPC;
if (contLen > tsRpcMaxUdpSize) pContext->connType = RPC_CONN_TCPC; if (contLen > tsRpcMaxUdpSize) pContext->connType = RPC_CONN_TCPC;
...@@ -458,15 +458,15 @@ void rpcSendResponse(const SRpcMsg *pRsp) { ...@@ -458,15 +458,15 @@ void rpcSendResponse(const SRpcMsg *pRsp) {
return; return;
} }
void rpcSendRedirectRsp(void *thandle, const SRpcIpSet *pIpSet) { void rpcSendRedirectRsp(void *thandle, const SRpcEpSet *pEpSet) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
memset(&rpcMsg, 0, sizeof(rpcMsg)); memset(&rpcMsg, 0, sizeof(rpcMsg));
rpcMsg.contLen = sizeof(SRpcIpSet); rpcMsg.contLen = sizeof(SRpcEpSet);
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
if (rpcMsg.pCont == NULL) return; if (rpcMsg.pCont == NULL) return;
memcpy(rpcMsg.pCont, pIpSet, sizeof(SRpcIpSet)); memcpy(rpcMsg.pCont, pEpSet, sizeof(SRpcEpSet));
rpcMsg.code = TSDB_CODE_RPC_REDIRECT; rpcMsg.code = TSDB_CODE_RPC_REDIRECT;
rpcMsg.handle = thandle; rpcMsg.handle = thandle;
...@@ -488,7 +488,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { ...@@ -488,7 +488,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
return 0; return 0;
} }
void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
SRpcReqContext *pContext; SRpcReqContext *pContext;
pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext));
...@@ -498,9 +498,9 @@ void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) ...@@ -498,9 +498,9 @@ void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg, SRpcMsg *pRsp)
tsem_init(&sem, 0, 0); tsem_init(&sem, 0, 0);
pContext->pSem = &sem; pContext->pSem = &sem;
pContext->pRsp = pRsp; pContext->pRsp = pRsp;
pContext->pSet = pIpSet; pContext->pSet = pEpSet;
rpcSendRequest(shandle, pIpSet, pMsg); rpcSendRequest(shandle, pEpSet, pMsg);
tsem_wait(&sem); tsem_wait(&sem);
tsem_destroy(&sem); tsem_destroy(&sem);
...@@ -755,11 +755,11 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) { ...@@ -755,11 +755,11 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) {
static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) { static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) {
SRpcConn *pConn; SRpcConn *pConn;
SRpcInfo *pRpc = pContext->pRpc; SRpcInfo *pRpc = pContext->pRpc;
SRpcIpSet *pIpSet = &pContext->ipSet; SRpcEpSet *pEpSet = &pContext->epSet;
pConn = rpcGetConnFromCache(pRpc->pCache, pIpSet->fqdn[pIpSet->inUse], pIpSet->port[pIpSet->inUse], pContext->connType); pConn = rpcGetConnFromCache(pRpc->pCache, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType);
if ( pConn == NULL || pConn->user[0] == 0) { if ( pConn == NULL || pConn->user[0] == 0) {
pConn = rpcOpenConn(pRpc, pIpSet->fqdn[pIpSet->inUse], pIpSet->port[pIpSet->inUse], pContext->connType); pConn = rpcOpenConn(pRpc, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType);
} }
if (pConn) { if (pConn) {
...@@ -1020,16 +1020,16 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { ...@@ -1020,16 +1020,16 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
pContext->pConn = NULL; pContext->pConn = NULL;
if (pContext->pRsp) { if (pContext->pRsp) {
// for synchronous API // for synchronous API
memcpy(pContext->pSet, &pContext->ipSet, sizeof(SRpcIpSet)); memcpy(pContext->pSet, &pContext->epSet, sizeof(SRpcEpSet));
memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg)); memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg));
tsem_post(pContext->pSem); tsem_post(pContext->pSem);
} else { } else {
// for asynchronous API // for asynchronous API
SRpcIpSet *pIpSet = NULL; SRpcEpSet *pEpSet = NULL;
if (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect) if (pContext->epSet.inUse != pContext->oldInUse || pContext->redirect)
pIpSet = &pContext->ipSet; pEpSet = &pContext->epSet;
(*pRpc->cfp)(pMsg, pIpSet); (*pRpc->cfp)(pMsg, pEpSet);
} }
// free the request message // free the request message
...@@ -1070,9 +1070,9 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -1070,9 +1070,9 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
pConn->pContext = NULL; pConn->pContext = NULL;
pConn->pReqMsg = NULL; pConn->pReqMsg = NULL;
// for UDP, port may be changed by server, the port in ipSet shall be used for cache // for UDP, port may be changed by server, the port in epSet shall be used for cache
if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) {
rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->ipSet.port[pContext->ipSet.inUse], pConn->connType); rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType);
} else { } else {
rpcCloseConn(pConn); rpcCloseConn(pConn);
} }
...@@ -1087,10 +1087,10 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -1087,10 +1087,10 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
if (pHead->code == TSDB_CODE_RPC_REDIRECT) { if (pHead->code == TSDB_CODE_RPC_REDIRECT) {
pContext->numOfTry = 0; pContext->numOfTry = 0;
memcpy(&pContext->ipSet, pHead->content, sizeof(pContext->ipSet)); memcpy(&pContext->epSet, pHead->content, sizeof(pContext->epSet));
tDebug("%s, redirect is received, numOfIps:%d", pConn->info, pContext->ipSet.numOfIps); tDebug("%s, redirect is received, numOfEps:%d", pConn->info, pContext->epSet.numOfEps);
for (int i=0; i<pContext->ipSet.numOfIps; ++i) for (int i=0; i<pContext->epSet.numOfEps; ++i)
pContext->ipSet.port[i] = htons(pContext->ipSet.port[i]); pContext->epSet.port[i] = htons(pContext->epSet.port[i]);
rpcSendReqToServer(pRpc, pContext); rpcSendReqToServer(pRpc, pContext);
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
} else if (pHead->code == TSDB_CODE_RPC_NOT_READY) { } else if (pHead->code == TSDB_CODE_RPC_NOT_READY) {
...@@ -1269,7 +1269,7 @@ static void rpcProcessConnError(void *param, void *id) { ...@@ -1269,7 +1269,7 @@ static void rpcProcessConnError(void *param, void *id) {
tDebug("%s %p, connection error happens", pRpc->label, pContext->ahandle); tDebug("%s %p, connection error happens", pRpc->label, pContext->ahandle);
if (pContext->numOfTry >= pContext->ipSet.numOfIps) { if (pContext->numOfTry >= pContext->epSet.numOfEps) {
rpcMsg.msgType = pContext->msgType+1; rpcMsg.msgType = pContext->msgType+1;
rpcMsg.ahandle = pContext->ahandle; rpcMsg.ahandle = pContext->ahandle;
rpcMsg.code = pContext->code; rpcMsg.code = pContext->code;
...@@ -1279,8 +1279,8 @@ static void rpcProcessConnError(void *param, void *id) { ...@@ -1279,8 +1279,8 @@ static void rpcProcessConnError(void *param, void *id) {
rpcNotifyClient(pContext, &rpcMsg); rpcNotifyClient(pContext, &rpcMsg);
} else { } else {
// move to next IP // move to next IP
pContext->ipSet.inUse++; pContext->epSet.inUse++;
pContext->ipSet.inUse = pContext->ipSet.inUse % pContext->ipSet.numOfIps; pContext->epSet.inUse = pContext->epSet.inUse % pContext->epSet.numOfEps;
rpcSendReqToServer(pRpc, pContext); rpcSendReqToServer(pRpc, pContext);
} }
} }
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
typedef struct { typedef struct {
int index; int index;
SRpcIpSet ipSet; SRpcEpSet epSet;
int num; int num;
int numOfReqs; int numOfReqs;
int msgSize; int msgSize;
...@@ -32,11 +32,11 @@ typedef struct { ...@@ -32,11 +32,11 @@ typedef struct {
void *pRpc; void *pRpc;
} SInfo; } SInfo;
static void processResponse(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { static void processResponse(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
SInfo *pInfo = (SInfo *)pMsg->ahandle; SInfo *pInfo = (SInfo *)pMsg->ahandle;
tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code); tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code);
if (pIpSet) pInfo->ipSet = *pIpSet; if (pEpSet) pInfo->epSet = *pEpSet;
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
sem_post(&pInfo->rspSem); sem_post(&pInfo->rspSem);
...@@ -57,7 +57,7 @@ static void *sendRequest(void *param) { ...@@ -57,7 +57,7 @@ static void *sendRequest(void *param) {
rpcMsg.ahandle = pInfo; rpcMsg.ahandle = pInfo;
rpcMsg.msgType = 1; rpcMsg.msgType = 1;
tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
rpcSendRequest(pInfo->pRpc, &pInfo->ipSet, &rpcMsg); rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg);
if ( pInfo->num % 20000 == 0 ) if ( pInfo->num % 20000 == 0 )
tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
sem_wait(&pInfo->rspSem); sem_wait(&pInfo->rspSem);
...@@ -71,7 +71,7 @@ static void *sendRequest(void *param) { ...@@ -71,7 +71,7 @@ static void *sendRequest(void *param) {
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
SRpcInit rpcInit; SRpcInit rpcInit;
SRpcIpSet ipSet; SRpcEpSet epSet;
int msgSize = 128; int msgSize = 128;
int numOfReqs = 0; int numOfReqs = 0;
int appThreads = 1; int appThreads = 1;
...@@ -82,12 +82,12 @@ int main(int argc, char *argv[]) { ...@@ -82,12 +82,12 @@ int main(int argc, char *argv[]) {
pthread_attr_t thattr; pthread_attr_t thattr;
// server info // server info
ipSet.numOfIps = 1; epSet.numOfEps = 1;
ipSet.inUse = 0; epSet.inUse = 0;
ipSet.port[0] = 7000; epSet.port[0] = 7000;
ipSet.port[1] = 7000; epSet.port[1] = 7000;
strcpy(ipSet.fqdn[0], serverIp); strcpy(epSet.fqdn[0], serverIp);
strcpy(ipSet.fqdn[1], "192.168.0.1"); strcpy(epSet.fqdn[1], "192.168.0.1");
// client info // client info
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
...@@ -105,9 +105,9 @@ int main(int argc, char *argv[]) { ...@@ -105,9 +105,9 @@ int main(int argc, char *argv[]) {
for (int i=1; i<argc; ++i) { for (int i=1; i<argc; ++i) {
if (strcmp(argv[i], "-p")==0 && i < argc-1) { if (strcmp(argv[i], "-p")==0 && i < argc-1) {
ipSet.port[0] = atoi(argv[++i]); epSet.port[0] = atoi(argv[++i]);
} else if (strcmp(argv[i], "-i") ==0 && i < argc-1) { } else if (strcmp(argv[i], "-i") ==0 && i < argc-1) {
tstrncpy(ipSet.fqdn[0], argv[++i], sizeof(ipSet.fqdn[0])); tstrncpy(epSet.fqdn[0], argv[++i], sizeof(epSet.fqdn[0]));
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) { } else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
rpcInit.numOfThreads = atoi(argv[++i]); rpcInit.numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) { } else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
...@@ -131,7 +131,7 @@ int main(int argc, char *argv[]) { ...@@ -131,7 +131,7 @@ int main(int argc, char *argv[]) {
} else { } else {
printf("\nusage: %s [options] \n", argv[0]); printf("\nusage: %s [options] \n", argv[0]);
printf(" [-i ip]: first server IP address, default is:%s\n", serverIp); printf(" [-i ip]: first server IP address, default is:%s\n", serverIp);
printf(" [-p port]: server port number, default is:%d\n", ipSet.port[0]); printf(" [-p port]: server port number, default is:%d\n", epSet.port[0]);
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads); printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
printf(" [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions); printf(" [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions);
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize); printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
...@@ -168,7 +168,7 @@ int main(int argc, char *argv[]) { ...@@ -168,7 +168,7 @@ int main(int argc, char *argv[]) {
for (int i=0; i<appThreads; ++i) { for (int i=0; i<appThreads; ++i) {
pInfo->index = i; pInfo->index = i;
pInfo->ipSet = ipSet; pInfo->epSet = epSet;
pInfo->numOfReqs = numOfReqs; pInfo->numOfReqs = numOfReqs;
pInfo->msgSize = msgSize; pInfo->msgSize = msgSize;
sem_init(&pInfo->rspSem, 0, 0); sem_init(&pInfo->rspSem, 0, 0);
......
...@@ -23,7 +23,7 @@ ...@@ -23,7 +23,7 @@
typedef struct { typedef struct {
int index; int index;
SRpcIpSet ipSet; SRpcEpSet epSet;
int num; int num;
int numOfReqs; int numOfReqs;
int msgSize; int msgSize;
...@@ -51,7 +51,7 @@ static void *sendRequest(void *param) { ...@@ -51,7 +51,7 @@ static void *sendRequest(void *param) {
rpcMsg.msgType = 1; rpcMsg.msgType = 1;
tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
rpcSendRecv(pInfo->pRpc, &pInfo->ipSet, &rpcMsg, &rspMsg); rpcSendRecv(pInfo->pRpc, &pInfo->epSet, &rpcMsg, &rspMsg);
// handle response // handle response
if (rspMsg.code != 0) terror++; if (rspMsg.code != 0) terror++;
...@@ -72,7 +72,7 @@ static void *sendRequest(void *param) { ...@@ -72,7 +72,7 @@ static void *sendRequest(void *param) {
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
SRpcInit rpcInit; SRpcInit rpcInit;
SRpcIpSet ipSet; SRpcEpSet epSet;
int msgSize = 128; int msgSize = 128;
int numOfReqs = 0; int numOfReqs = 0;
int appThreads = 1; int appThreads = 1;
...@@ -83,12 +83,12 @@ int main(int argc, char *argv[]) { ...@@ -83,12 +83,12 @@ int main(int argc, char *argv[]) {
pthread_attr_t thattr; pthread_attr_t thattr;
// server info // server info
ipSet.numOfIps = 1; epSet.numOfEps = 1;
ipSet.inUse = 0; epSet.inUse = 0;
ipSet.port[0] = 7000; epSet.port[0] = 7000;
ipSet.port[1] = 7000; epSet.port[1] = 7000;
strcpy(ipSet.fqdn[0], serverIp); strcpy(epSet.fqdn[0], serverIp);
strcpy(ipSet.fqdn[1], "192.168.0.1"); strcpy(epSet.fqdn[1], "192.168.0.1");
// client info // client info
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
...@@ -106,9 +106,9 @@ int main(int argc, char *argv[]) { ...@@ -106,9 +106,9 @@ int main(int argc, char *argv[]) {
for (int i=1; i<argc; ++i) { for (int i=1; i<argc; ++i) {
if (strcmp(argv[i], "-p")==0 && i < argc-1) { if (strcmp(argv[i], "-p")==0 && i < argc-1) {
ipSet.port[0] = atoi(argv[++i]); epSet.port[0] = atoi(argv[++i]);
} else if (strcmp(argv[i], "-i") ==0 && i < argc-1) { } else if (strcmp(argv[i], "-i") ==0 && i < argc-1) {
tstrncpy(ipSet.fqdn[0], argv[++i], sizeof(ipSet.fqdn[0])); tstrncpy(epSet.fqdn[0], argv[++i], sizeof(epSet.fqdn[0]));
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) { } else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
rpcInit.numOfThreads = atoi(argv[++i]); rpcInit.numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) { } else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
...@@ -132,7 +132,7 @@ int main(int argc, char *argv[]) { ...@@ -132,7 +132,7 @@ int main(int argc, char *argv[]) {
} else { } else {
printf("\nusage: %s [options] \n", argv[0]); printf("\nusage: %s [options] \n", argv[0]);
printf(" [-i ip]: first server IP address, default is:%s\n", serverIp); printf(" [-i ip]: first server IP address, default is:%s\n", serverIp);
printf(" [-p port]: server port number, default is:%d\n", ipSet.port[0]); printf(" [-p port]: server port number, default is:%d\n", epSet.port[0]);
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads); printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
printf(" [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions); printf(" [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions);
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize); printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
...@@ -168,7 +168,7 @@ int main(int argc, char *argv[]) { ...@@ -168,7 +168,7 @@ int main(int argc, char *argv[]) {
for (int i=0; i<appThreads; ++i) { for (int i=0; i<appThreads; ++i) {
pInfo->index = i; pInfo->index = i;
pInfo->ipSet = ipSet; pInfo->epSet = epSet;
pInfo->numOfReqs = numOfReqs; pInfo->numOfReqs = numOfReqs;
pInfo->msgSize = msgSize; pInfo->msgSize = msgSize;
sem_init(&pInfo->rspSem, 0, 0); sem_init(&pInfo->rspSem, 0, 0);
......
...@@ -103,7 +103,7 @@ int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char ...@@ -103,7 +103,7 @@ int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char
return ret; return ret;
} }
void processRequestMsg(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { void processRequestMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
SRpcMsg *pTemp; SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg)); pTemp = taosAllocateQitem(sizeof(SRpcMsg));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册