提交 f349c8af 编写于 作者: dengyihao's avatar dengyihao

TD-2257

上级 07bf2b3b
......@@ -278,7 +278,7 @@ bool hasMoreClauseToTry(SSqlObj* pSql);
void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp);
int tscSetMgmtEpSetFromCfg(const char *first, const char *second);
int tscSetMgmtEpSetFromCfg(const char *first, const char *second, SRpcCorEpSet *corEpSet);
bool tscSetSqlOwner(SSqlObj* pSql);
void tscClearSqlOwner(SSqlObj* pSql);
......
......@@ -336,6 +336,7 @@ typedef struct STscObj {
struct SSqlObj * pHb;
struct SSqlObj * sqlList;
struct SSqlStream *streamList;
SRpcCorEpSet *tscCorMgmtEpSet;
void* pDnodeConn;
pthread_mutex_t mutex;
T_REF_DECLARE()
......@@ -515,7 +516,6 @@ extern int tsInsertHeadSize;
extern int tscNumOfThreads;
extern int tscRefId;
extern SRpcCorEpSet tscMgmtEpSet;
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
......
......@@ -26,7 +26,7 @@
#include "ttimer.h"
#include "tlockfree.h"
SRpcCorEpSet tscMgmtEpSet;
///SRpcCorEpSet tscMgmtEpSet;
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};
......@@ -73,10 +73,11 @@ static void tscSetDnodeEpSet(SSqlObj* pSql, SVgroupInfo* pVgroupInfo) {
assert(hasFqdn);
}
static void tscDumpMgmtEpSet(SRpcEpSet *epSet) {
taosCorBeginRead(&tscMgmtEpSet.version);
*epSet = tscMgmtEpSet.epSet;
taosCorEndRead(&tscMgmtEpSet.version);
static void tscDumpMgmtEpSet(SSqlObj *pSql) {
SRpcCorEpSet *pCorEpSet = pSql->pTscObj->tscCorMgmtEpSet;
taosCorBeginRead(&pCorEpSet->version);
pSql->epSet = pCorEpSet->epSet;
taosCorEndRead(&pCorEpSet->version);
}
static void tscEpSetHtons(SRpcEpSet *s) {
for (int32_t i = 0; i < s->numOfEps; i++) {
......@@ -94,11 +95,12 @@ bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) {
}
return true;
}
void tscUpdateMgmtEpSet(SRpcEpSet *pEpSet) {
void tscUpdateMgmtEpSet(SSqlObj *pSql, SRpcEpSet *pEpSet) {
// no need to update if equal
taosCorBeginWrite(&tscMgmtEpSet.version);
tscMgmtEpSet.epSet = *pEpSet;
taosCorEndWrite(&tscMgmtEpSet.version);
SRpcCorEpSet *pCorEpSet = pSql->pTscObj->tscCorMgmtEpSet;
taosCorBeginWrite(&pCorEpSet->version);
pCorEpSet->epSet = *pEpSet;
taosCorEndWrite(&pCorEpSet->version);
}
static void tscDumpEpSetFromVgroupInfo(SCorVgroupInfo *pVgroupInfo, SRpcEpSet *pEpSet) {
if (pVgroupInfo == NULL) { return;}
......@@ -133,18 +135,6 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
taosCorEndWrite(&pVgroupInfo->version);
}
void tscPrintMgmtEp() {
SRpcEpSet dump;
tscDumpMgmtEpSet(&dump);
if (dump.numOfEps <= 0) {
tscError("invalid mnode EP list:%d", dump.numOfEps);
} else {
for (int i = 0; i < dump.numOfEps; ++i) {
tscDebug("mnode index:%d %s:%d", i, dump.fqdn[i], dump.port[i]);
}
}
}
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
STscObj *pObj = (STscObj *)param;
if (pObj == NULL) return;
......@@ -162,7 +152,7 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
SRpcEpSet * epSet = &pRsp->epSet;
if (epSet->numOfEps > 0) {
tscEpSetHtons(epSet);
tscUpdateMgmtEpSet(epSet);
tscUpdateMgmtEpSet(pSql, epSet);
}
pSql->pTscObj->connId = htonl(pRsp->connId);
......@@ -227,7 +217,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
// set the mgmt ip list
if (pSql->cmd.command >= TSDB_SQL_MGMT) {
tscDumpMgmtEpSet(&pSql->epSet);
tscDumpMgmtEpSet(pSql);
}
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
......@@ -289,7 +279,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
if (pCmd->command < TSDB_SQL_MGMT) {
tscUpdateVgroupInfo(pSql, pEpSet);
} else {
tscUpdateMgmtEpSet(pEpSet);
tscUpdateMgmtEpSet(pSql, pEpSet);
}
}
}
......@@ -2070,7 +2060,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
if (pConnect->epSet.numOfEps > 0) {
tscEpSetHtons(&pConnect->epSet);
tscUpdateMgmtEpSet(&pConnect->epSet);
tscUpdateMgmtEpSet(pSql, &pConnect->epSet);
}
strcpy(pObj->sversion, pConnect->serverVersion);
......
......@@ -58,6 +58,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH;
return NULL;
}
SRpcCorEpSet corMgmtEpSet;
char secretEncrypt[32] = {0};
int secretEncryptLen = 0;
......@@ -82,11 +83,13 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
}
secretEncryptLen = outlen;
}
if (ip) {
if (tscSetMgmtEpSetFromCfg(ip, NULL) < 0) return NULL;
if (port) tscMgmtEpSet.epSet.port[0] = port;
}
if (tscSetMgmtEpSetFromCfg(ip, NULL, &corMgmtEpSet) < 0) return NULL;
if (port) corMgmtEpSet.epSet.port[0] = port;
} else {
if (tscSetMgmtEpSetFromCfg(tsFirst, tsSecond, &corMgmtEpSet) < 0) return NULL;
}
void *pDnodeConn = NULL;
if (tscInitRpc(user, secretEncrypt, &pDnodeConn) != 0) {
......@@ -100,10 +103,20 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
rpcClose(pDnodeConn);
return NULL;
}
// set up tscObj's mgmtEpSet
pObj->tscCorMgmtEpSet = (SRpcCorEpSet *)malloc(sizeof(SRpcCorEpSet));
if (NULL == pObj->tscCorMgmtEpSet) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
rpcClose(pDnodeConn);
free(pObj->tscCorMgmtEpSet);
free(pObj);
}
memcpy(pObj->tscCorMgmtEpSet, &corMgmtEpSet, sizeof(SRpcCorEpSet));
pObj->signature = pObj;
pObj->pDnodeConn = pDnodeConn;
T_REF_INIT_VAL(pObj, 1);
tstrncpy(pObj->user, user, sizeof(pObj->user));
secretEncryptLen = MIN(secretEncryptLen, sizeof(pObj->pass));
......@@ -115,6 +128,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
if (len >= TSDB_DB_NAME_LEN) {
terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
rpcClose(pDnodeConn);
free(pObj->tscCorMgmtEpSet);
free(pObj);
return NULL;
}
......@@ -132,6 +146,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
if (NULL == pSql) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
rpcClose(pDnodeConn);
free(pObj->tscCorMgmtEpSet);
free(pObj);
return NULL;
}
......@@ -149,6 +164,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
rpcClose(pDnodeConn);
free(pSql);
free(pObj->tscCorMgmtEpSet);
free(pObj);
return NULL;
}
......
......@@ -116,11 +116,6 @@ void taos_init_imp(void) {
taosInitNote(tsNumOfLogLines / 10, 1, (char*)"tsc_note");
}
if (tscSetMgmtEpSetFromCfg(tsFirst, tsSecond) < 0) {
tscError("failed to init mnode EP list");
return;
}
tscInitMsgsFp();
int queueSize = tsMaxConnections*2;
......
......@@ -867,7 +867,7 @@ void tscCloseTscObj(void *param) {
rpcClose(pObj->pDnodeConn);
pObj->pDnodeConn = NULL;
}
tfree(pObj->tscCorMgmtEpSet);
pthread_mutex_destroy(&pObj->mutex);
tscDebug("%p DB connection is closed, dnodeConn:%p", pObj, p);
......@@ -2459,10 +2459,10 @@ char* strdup_throw(const char* str) {
return p;
}
int tscSetMgmtEpSetFromCfg(const char *first, const char *second) {
int tscSetMgmtEpSetFromCfg(const char *first, const char *second, SRpcCorEpSet *corMgmtEpSet) {
corMgmtEpSet->version = 0;
// init mgmt ip set
tscMgmtEpSet.version = 0;
SRpcEpSet *mgmtEpSet = &(tscMgmtEpSet.epSet);
SRpcEpSet *mgmtEpSet = &(corMgmtEpSet->epSet);
mgmtEpSet->numOfEps = 0;
mgmtEpSet->inUse = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册