提交 367fac8b 编写于 作者: dengyihao's avatar dengyihao

set mnode list

上级 0081f2e7
...@@ -277,7 +277,7 @@ bool hasMoreClauseToTry(SSqlObj* pSql); ...@@ -277,7 +277,7 @@ bool hasMoreClauseToTry(SSqlObj* pSql);
void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp); void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows); void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp); void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp);
int tscSetMgmtEpSetFromCfg(const char *first, const char *second); int tscSetMgmtEpSetFromCfg(const char *first, const char *second, SRpcCorEpSet *corEpSet);
bool tscSetSqlOwner(SSqlObj* pSql); bool tscSetSqlOwner(SSqlObj* pSql);
void tscClearSqlOwner(SSqlObj* pSql); void tscClearSqlOwner(SSqlObj* pSql);
......
...@@ -334,6 +334,7 @@ typedef struct STscObj { ...@@ -334,6 +334,7 @@ typedef struct STscObj {
int64_t hbrid; int64_t hbrid;
struct SSqlObj * sqlList; struct SSqlObj * sqlList;
struct SSqlStream *streamList; struct SSqlStream *streamList;
SRpcCorEpSet *tscCorMgmtEpSet;
void* pDnodeConn; void* pDnodeConn;
pthread_mutex_t mutex; pthread_mutex_t mutex;
T_REF_DECLARE() T_REF_DECLARE()
...@@ -512,8 +513,6 @@ extern int tsInsertHeadSize; ...@@ -512,8 +513,6 @@ extern int tsInsertHeadSize;
extern int tscNumOfThreads; extern int tscNumOfThreads;
extern int tscRefId; extern int tscRefId;
extern SRpcCorEpSet tscMgmtEpSet;
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables); void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables);
......
...@@ -26,8 +26,6 @@ ...@@ -26,8 +26,6 @@
#include "ttimer.h" #include "ttimer.h"
#include "tlockfree.h" #include "tlockfree.h"
SRpcCorEpSet tscMgmtEpSet;
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0}; int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql); int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
...@@ -73,10 +71,11 @@ static void tscSetDnodeEpSet(SSqlObj* pSql, SVgroupInfo* pVgroupInfo) { ...@@ -73,10 +71,11 @@ static void tscSetDnodeEpSet(SSqlObj* pSql, SVgroupInfo* pVgroupInfo) {
assert(hasFqdn); assert(hasFqdn);
} }
static void tscDumpMgmtEpSet(SRpcEpSet *epSet) { static void tscDumpMgmtEpSet(SSqlObj *pSql) {
taosCorBeginRead(&tscMgmtEpSet.version); SRpcCorEpSet *pCorEpSet = pSql->pTscObj->tscCorMgmtEpSet;
*epSet = tscMgmtEpSet.epSet; taosCorBeginRead(&pCorEpSet->version);
taosCorEndRead(&tscMgmtEpSet.version); pSql->epSet = pCorEpSet->epSet;
taosCorEndRead(&pCorEpSet->version);
} }
static void tscEpSetHtons(SRpcEpSet *s) { static void tscEpSetHtons(SRpcEpSet *s) {
for (int32_t i = 0; i < s->numOfEps; i++) { for (int32_t i = 0; i < s->numOfEps; i++) {
...@@ -94,12 +93,13 @@ bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) { ...@@ -94,12 +93,13 @@ bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) {
} }
return true; return true;
} }
void tscUpdateMgmtEpSet(SRpcEpSet *pEpSet) { void tscUpdateMgmtEpSet(SSqlObj *pSql, SRpcEpSet *pEpSet) {
// no need to update if equal SRpcCorEpSet *pCorEpSet = pSql->pTscObj->tscCorMgmtEpSet;
taosCorBeginWrite(&tscMgmtEpSet.version); taosCorBeginWrite(&pCorEpSet->version);
tscMgmtEpSet.epSet = *pEpSet; pCorEpSet->epSet = *pEpSet;
taosCorEndWrite(&tscMgmtEpSet.version); taosCorEndWrite(&pCorEpSet->version);
} }
static void tscDumpEpSetFromVgroupInfo(SCorVgroupInfo *pVgroupInfo, SRpcEpSet *pEpSet) { static void tscDumpEpSetFromVgroupInfo(SCorVgroupInfo *pVgroupInfo, SRpcEpSet *pEpSet) {
if (pVgroupInfo == NULL) { return;} if (pVgroupInfo == NULL) { return;}
taosCorBeginRead(&pVgroupInfo->version); taosCorBeginRead(&pVgroupInfo->version);
...@@ -133,18 +133,6 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) { ...@@ -133,18 +133,6 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
taosCorEndWrite(&pVgroupInfo->version); taosCorEndWrite(&pVgroupInfo->version);
} }
void tscPrintMgmtEp() {
SRpcEpSet dump;
tscDumpMgmtEpSet(&dump);
if (dump.numOfEps <= 0) {
tscError("invalid mnode EP list:%d", dump.numOfEps);
} else {
for (int i = 0; i < dump.numOfEps; ++i) {
tscDebug("mnode index:%d %s:%d", i, dump.fqdn[i], dump.port[i]);
}
}
}
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
STscObj *pObj = (STscObj *)param; STscObj *pObj = (STscObj *)param;
if (pObj == NULL) return; if (pObj == NULL) return;
...@@ -162,7 +150,7 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { ...@@ -162,7 +150,7 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
SRpcEpSet * epSet = &pRsp->epSet; SRpcEpSet * epSet = &pRsp->epSet;
if (epSet->numOfEps > 0) { if (epSet->numOfEps > 0) {
tscEpSetHtons(epSet); tscEpSetHtons(epSet);
tscUpdateMgmtEpSet(epSet); tscUpdateMgmtEpSet(pSql, epSet);
} }
pSql->pTscObj->connId = htonl(pRsp->connId); pSql->pTscObj->connId = htonl(pRsp->connId);
...@@ -217,7 +205,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -217,7 +205,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
// set the mgmt ip list // set the mgmt ip list
if (pSql->cmd.command >= TSDB_SQL_MGMT) { if (pSql->cmd.command >= TSDB_SQL_MGMT) {
tscDumpMgmtEpSet(&pSql->epSet); tscDumpMgmtEpSet(pSql);
} }
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
...@@ -280,7 +268,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -280,7 +268,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
if (pCmd->command < TSDB_SQL_MGMT) { if (pCmd->command < TSDB_SQL_MGMT) {
tscUpdateVgroupInfo(pSql, pEpSet); tscUpdateVgroupInfo(pSql, pEpSet);
} else { } else {
tscUpdateMgmtEpSet(pEpSet); tscUpdateMgmtEpSet(pSql, pEpSet);
} }
} }
} }
...@@ -1996,7 +1984,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { ...@@ -1996,7 +1984,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
if (pConnect->epSet.numOfEps > 0) { if (pConnect->epSet.numOfEps > 0) {
tscEpSetHtons(&pConnect->epSet); tscEpSetHtons(&pConnect->epSet);
tscUpdateMgmtEpSet(&pConnect->epSet); tscUpdateMgmtEpSet(pSql, &pConnect->epSet);
} }
strcpy(pObj->sversion, pConnect->serverVersion); strcpy(pObj->sversion, pConnect->serverVersion);
......
...@@ -60,6 +60,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa ...@@ -60,6 +60,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
return NULL; return NULL;
} }
SRpcCorEpSet corMgmtEpSet;
char secretEncrypt[32] = {0}; char secretEncrypt[32] = {0};
int secretEncryptLen = 0; int secretEncryptLen = 0;
if (auth == NULL) { if (auth == NULL) {
...@@ -83,10 +84,11 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa ...@@ -83,10 +84,11 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
} }
secretEncryptLen = outlen; secretEncryptLen = outlen;
} }
if (ip) { if (ip) {
if (tscSetMgmtEpSetFromCfg(ip, NULL) < 0) return NULL; if (tscSetMgmtEpSetFromCfg(ip, NULL, &corMgmtEpSet) < 0) return NULL;
if (port) tscMgmtEpSet.epSet.port[0] = port; if (port) corMgmtEpSet.epSet.port[0] = port;
} else {
if (tscSetMgmtEpSetFromCfg(tsFirst, tsSecond, &corMgmtEpSet) < 0) return NULL;
} }
void *pDnodeConn = NULL; void *pDnodeConn = NULL;
...@@ -101,6 +103,16 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa ...@@ -101,6 +103,16 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
rpcClose(pDnodeConn); rpcClose(pDnodeConn);
return NULL; return NULL;
} }
// set up tscObj's mgmtEpSet
pObj->tscCorMgmtEpSet = (SRpcCorEpSet *)malloc(sizeof(SRpcCorEpSet));
if (NULL == pObj->tscCorMgmtEpSet) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
rpcClose(pDnodeConn);
free(pObj->tscCorMgmtEpSet);
free(pObj);
}
memcpy(pObj->tscCorMgmtEpSet, &corMgmtEpSet, sizeof(SRpcCorEpSet));
pObj->signature = pObj; pObj->signature = pObj;
pObj->pDnodeConn = pDnodeConn; pObj->pDnodeConn = pDnodeConn;
...@@ -116,6 +128,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa ...@@ -116,6 +128,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
if (len >= TSDB_DB_NAME_LEN) { if (len >= TSDB_DB_NAME_LEN) {
terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH; terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
rpcClose(pDnodeConn); rpcClose(pDnodeConn);
free(pObj->tscCorMgmtEpSet);
free(pObj); free(pObj);
return NULL; return NULL;
} }
...@@ -133,6 +146,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa ...@@ -133,6 +146,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
if (NULL == pSql) { if (NULL == pSql) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
rpcClose(pDnodeConn); rpcClose(pDnodeConn);
free(pObj->tscCorMgmtEpSet);
free(pObj); free(pObj);
return NULL; return NULL;
} }
...@@ -150,6 +164,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa ...@@ -150,6 +164,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
rpcClose(pDnodeConn); rpcClose(pDnodeConn);
free(pSql); free(pSql);
free(pObj->tscCorMgmtEpSet);
free(pObj); free(pObj);
return NULL; return NULL;
} }
......
...@@ -116,11 +116,6 @@ void taos_init_imp(void) { ...@@ -116,11 +116,6 @@ void taos_init_imp(void) {
taosInitNote(tsNumOfLogLines / 10, 1, (char*)"tsc_note"); taosInitNote(tsNumOfLogLines / 10, 1, (char*)"tsc_note");
} }
if (tscSetMgmtEpSetFromCfg(tsFirst, tsSecond) < 0) {
tscError("failed to init mnode EP list");
return;
}
tscInitMsgsFp(); tscInitMsgsFp();
int queueSize = tsMaxConnections*2; int queueSize = tsMaxConnections*2;
......
...@@ -783,7 +783,7 @@ void tscCloseTscObj(void *param) { ...@@ -783,7 +783,7 @@ void tscCloseTscObj(void *param) {
rpcClose(pObj->pDnodeConn); rpcClose(pObj->pDnodeConn);
pObj->pDnodeConn = NULL; pObj->pDnodeConn = NULL;
} }
tfree(pObj->tscCorMgmtEpSet);
pthread_mutex_destroy(&pObj->mutex); pthread_mutex_destroy(&pObj->mutex);
tscDebug("%p DB connection is closed, dnodeConn:%p", pObj, p); tscDebug("%p DB connection is closed, dnodeConn:%p", pObj, p);
...@@ -2378,10 +2378,10 @@ char* strdup_throw(const char* str) { ...@@ -2378,10 +2378,10 @@ char* strdup_throw(const char* str) {
return p; return p;
} }
int tscSetMgmtEpSetFromCfg(const char *first, const char *second) { int tscSetMgmtEpSetFromCfg(const char *first, const char *second, SRpcCorEpSet *corMgmtEpSet) {
// init mgmt ip set // init mgmt ip set
tscMgmtEpSet.version = 0; corMgmtEpSet->version = 0;
SRpcEpSet *mgmtEpSet = &(tscMgmtEpSet.epSet); SRpcEpSet *mgmtEpSet = &(corMgmtEpSet->epSet);
mgmtEpSet->numOfEps = 0; mgmtEpSet->numOfEps = 0;
mgmtEpSet->inUse = 0; mgmtEpSet->inUse = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册