未验证 提交 deed895c 编写于 作者: H haojun Liao 提交者: GitHub

Merge pull request #4372 from taosdata/hotfix/TD-2257

TD-2257
...@@ -278,7 +278,7 @@ bool hasMoreClauseToTry(SSqlObj* pSql); ...@@ -278,7 +278,7 @@ bool hasMoreClauseToTry(SSqlObj* pSql);
void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp); void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows); void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp); void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp);
int tscSetMgmtEpSetFromCfg(const char *first, const char *second); int tscSetMgmtEpSetFromCfg(const char *first, const char *second, SRpcCorEpSet *corEpSet);
bool tscSetSqlOwner(SSqlObj* pSql); bool tscSetSqlOwner(SSqlObj* pSql);
void tscClearSqlOwner(SSqlObj* pSql); void tscClearSqlOwner(SSqlObj* pSql);
......
...@@ -339,6 +339,7 @@ typedef struct STscObj { ...@@ -339,6 +339,7 @@ typedef struct STscObj {
int64_t hbrid; int64_t hbrid;
struct SSqlObj * sqlList; struct SSqlObj * sqlList;
struct SSqlStream *streamList; struct SSqlStream *streamList;
SRpcCorEpSet *tscCorMgmtEpSet;
void* pDnodeConn; void* pDnodeConn;
pthread_mutex_t mutex; pthread_mutex_t mutex;
T_REF_DECLARE() T_REF_DECLARE()
...@@ -518,7 +519,6 @@ extern int tsInsertHeadSize; ...@@ -518,7 +519,6 @@ extern int tsInsertHeadSize;
extern int tscNumOfThreads; extern int tscNumOfThreads;
extern int tscRefId; extern int tscRefId;
extern SRpcCorEpSet tscMgmtEpSet;
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
......
...@@ -26,7 +26,7 @@ ...@@ -26,7 +26,7 @@
#include "ttimer.h" #include "ttimer.h"
#include "tlockfree.h" #include "tlockfree.h"
SRpcCorEpSet tscMgmtEpSet; ///SRpcCorEpSet tscMgmtEpSet;
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0}; int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};
...@@ -73,10 +73,11 @@ static void tscSetDnodeEpSet(SSqlObj* pSql, SVgroupInfo* pVgroupInfo) { ...@@ -73,10 +73,11 @@ static void tscSetDnodeEpSet(SSqlObj* pSql, SVgroupInfo* pVgroupInfo) {
assert(hasFqdn); assert(hasFqdn);
} }
static void tscDumpMgmtEpSet(SRpcEpSet *epSet) { static void tscDumpMgmtEpSet(SSqlObj *pSql) {
taosCorBeginRead(&tscMgmtEpSet.version); SRpcCorEpSet *pCorEpSet = pSql->pTscObj->tscCorMgmtEpSet;
*epSet = tscMgmtEpSet.epSet; taosCorBeginRead(&pCorEpSet->version);
taosCorEndRead(&tscMgmtEpSet.version); pSql->epSet = pCorEpSet->epSet;
taosCorEndRead(&pCorEpSet->version);
} }
static void tscEpSetHtons(SRpcEpSet *s) { static void tscEpSetHtons(SRpcEpSet *s) {
for (int32_t i = 0; i < s->numOfEps; i++) { for (int32_t i = 0; i < s->numOfEps; i++) {
...@@ -94,11 +95,12 @@ bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) { ...@@ -94,11 +95,12 @@ bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) {
} }
return true; return true;
} }
void tscUpdateMgmtEpSet(SRpcEpSet *pEpSet) { void tscUpdateMgmtEpSet(SSqlObj *pSql, SRpcEpSet *pEpSet) {
// no need to update if equal // no need to update if equal
taosCorBeginWrite(&tscMgmtEpSet.version); SRpcCorEpSet *pCorEpSet = pSql->pTscObj->tscCorMgmtEpSet;
tscMgmtEpSet.epSet = *pEpSet; taosCorBeginWrite(&pCorEpSet->version);
taosCorEndWrite(&tscMgmtEpSet.version); pCorEpSet->epSet = *pEpSet;
taosCorEndWrite(&pCorEpSet->version);
} }
static void tscDumpEpSetFromVgroupInfo(SCorVgroupInfo *pVgroupInfo, SRpcEpSet *pEpSet) { static void tscDumpEpSetFromVgroupInfo(SCorVgroupInfo *pVgroupInfo, SRpcEpSet *pEpSet) {
if (pVgroupInfo == NULL) { return;} if (pVgroupInfo == NULL) { return;}
...@@ -133,18 +135,6 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) { ...@@ -133,18 +135,6 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
taosCorEndWrite(&pVgroupInfo->version); taosCorEndWrite(&pVgroupInfo->version);
} }
void tscPrintMgmtEp() {
SRpcEpSet dump;
tscDumpMgmtEpSet(&dump);
if (dump.numOfEps <= 0) {
tscError("invalid mnode EP list:%d", dump.numOfEps);
} else {
for (int i = 0; i < dump.numOfEps; ++i) {
tscDebug("mnode index:%d %s:%d", i, dump.fqdn[i], dump.port[i]);
}
}
}
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
STscObj *pObj = (STscObj *)param; STscObj *pObj = (STscObj *)param;
if (pObj == NULL) return; if (pObj == NULL) return;
...@@ -162,7 +152,7 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { ...@@ -162,7 +152,7 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
SRpcEpSet * epSet = &pRsp->epSet; SRpcEpSet * epSet = &pRsp->epSet;
if (epSet->numOfEps > 0) { if (epSet->numOfEps > 0) {
tscEpSetHtons(epSet); tscEpSetHtons(epSet);
tscUpdateMgmtEpSet(epSet); tscUpdateMgmtEpSet(pSql, epSet);
} }
pSql->pTscObj->connId = htonl(pRsp->connId); pSql->pTscObj->connId = htonl(pRsp->connId);
...@@ -219,7 +209,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -219,7 +209,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
// set the mgmt ip list // set the mgmt ip list
if (pSql->cmd.command >= TSDB_SQL_MGMT) { if (pSql->cmd.command >= TSDB_SQL_MGMT) {
tscDumpMgmtEpSet(&pSql->epSet); tscDumpMgmtEpSet(pSql);
} }
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
...@@ -277,7 +267,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -277,7 +267,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
if (pCmd->command < TSDB_SQL_MGMT) { if (pCmd->command < TSDB_SQL_MGMT) {
tscUpdateVgroupInfo(pSql, pEpSet); tscUpdateVgroupInfo(pSql, pEpSet);
} else { } else {
tscUpdateMgmtEpSet(pEpSet); tscUpdateMgmtEpSet(pSql, pEpSet);
} }
} }
} }
...@@ -2058,11 +2048,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { ...@@ -2058,11 +2048,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
if (pConnect->epSet.numOfEps > 0) { if (pConnect->epSet.numOfEps > 0) {
tscEpSetHtons(&pConnect->epSet); tscEpSetHtons(&pConnect->epSet);
tscUpdateMgmtEpSet(&pConnect->epSet); tscUpdateMgmtEpSet(pSql, &pConnect->epSet);
for (int i = 0; i < pConnect->epSet.numOfEps; ++i) {
tscDebug("%p epSet.fqdn[%d]: %s, pObj:%p", pSql, i, pConnect->epSet.fqdn[i], pObj);
}
} }
strcpy(pObj->sversion, pConnect->serverVersion); strcpy(pObj->sversion, pConnect->serverVersion);
......
...@@ -58,6 +58,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa ...@@ -58,6 +58,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH; terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH;
return NULL; return NULL;
} }
SRpcCorEpSet corMgmtEpSet;
char secretEncrypt[32] = {0}; char secretEncrypt[32] = {0};
int secretEncryptLen = 0; int secretEncryptLen = 0;
...@@ -82,11 +83,13 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa ...@@ -82,11 +83,13 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
} }
secretEncryptLen = outlen; secretEncryptLen = outlen;
} }
if (ip) { if (ip) {
if (tscSetMgmtEpSetFromCfg(ip, NULL) < 0) return NULL; if (tscSetMgmtEpSetFromCfg(ip, NULL, &corMgmtEpSet) < 0) return NULL;
if (port) tscMgmtEpSet.epSet.port[0] = port; if (port) corMgmtEpSet.epSet.port[0] = port;
} } else {
if (tscSetMgmtEpSetFromCfg(tsFirst, tsSecond, &corMgmtEpSet) < 0) return NULL;
}
void *pDnodeConn = NULL; void *pDnodeConn = NULL;
if (tscInitRpc(user, secretEncrypt, &pDnodeConn) != 0) { if (tscInitRpc(user, secretEncrypt, &pDnodeConn) != 0) {
...@@ -100,10 +103,20 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa ...@@ -100,10 +103,20 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
rpcClose(pDnodeConn); rpcClose(pDnodeConn);
return NULL; return NULL;
} }
// set up tscObj's mgmtEpSet
pObj->tscCorMgmtEpSet = (SRpcCorEpSet *)malloc(sizeof(SRpcCorEpSet));
if (NULL == pObj->tscCorMgmtEpSet) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
rpcClose(pDnodeConn);
free(pObj->tscCorMgmtEpSet);
free(pObj);
}
memcpy(pObj->tscCorMgmtEpSet, &corMgmtEpSet, sizeof(SRpcCorEpSet));
pObj->signature = pObj; pObj->signature = pObj;
pObj->pDnodeConn = pDnodeConn; pObj->pDnodeConn = pDnodeConn;
T_REF_INIT_VAL(pObj, 1); T_REF_INIT_VAL(pObj, 1);
tstrncpy(pObj->user, user, sizeof(pObj->user)); tstrncpy(pObj->user, user, sizeof(pObj->user));
secretEncryptLen = MIN(secretEncryptLen, sizeof(pObj->pass)); secretEncryptLen = MIN(secretEncryptLen, sizeof(pObj->pass));
...@@ -115,6 +128,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa ...@@ -115,6 +128,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
if (len >= TSDB_DB_NAME_LEN) { if (len >= TSDB_DB_NAME_LEN) {
terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH; terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
rpcClose(pDnodeConn); rpcClose(pDnodeConn);
free(pObj->tscCorMgmtEpSet);
free(pObj); free(pObj);
return NULL; return NULL;
} }
...@@ -132,6 +146,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa ...@@ -132,6 +146,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
if (NULL == pSql) { if (NULL == pSql) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
rpcClose(pDnodeConn); rpcClose(pDnodeConn);
free(pObj->tscCorMgmtEpSet);
free(pObj); free(pObj);
return NULL; return NULL;
} }
...@@ -149,6 +164,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa ...@@ -149,6 +164,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
rpcClose(pDnodeConn); rpcClose(pDnodeConn);
free(pSql); free(pSql);
free(pObj->tscCorMgmtEpSet);
free(pObj); free(pObj);
return NULL; return NULL;
} }
......
...@@ -116,11 +116,6 @@ void taos_init_imp(void) { ...@@ -116,11 +116,6 @@ void taos_init_imp(void) {
taosInitNote(tsNumOfLogLines / 10, 1, (char*)"tsc_note"); taosInitNote(tsNumOfLogLines / 10, 1, (char*)"tsc_note");
} }
if (tscSetMgmtEpSetFromCfg(tsFirst, tsSecond) < 0) {
tscError("failed to init mnode EP list");
return;
}
tscInitMsgsFp(); tscInitMsgsFp();
int queueSize = tsMaxConnections*2; int queueSize = tsMaxConnections*2;
......
...@@ -864,7 +864,7 @@ void tscCloseTscObj(void *param) { ...@@ -864,7 +864,7 @@ void tscCloseTscObj(void *param) {
rpcClose(pObj->pDnodeConn); rpcClose(pObj->pDnodeConn);
pObj->pDnodeConn = NULL; pObj->pDnodeConn = NULL;
} }
tfree(pObj->tscCorMgmtEpSet);
pthread_mutex_destroy(&pObj->mutex); pthread_mutex_destroy(&pObj->mutex);
tscDebug("%p DB connection is closed, dnodeConn:%p", pObj, p); tscDebug("%p DB connection is closed, dnodeConn:%p", pObj, p);
...@@ -2440,10 +2440,10 @@ char* strdup_throw(const char* str) { ...@@ -2440,10 +2440,10 @@ char* strdup_throw(const char* str) {
return p; return p;
} }
int tscSetMgmtEpSetFromCfg(const char *first, const char *second) { int tscSetMgmtEpSetFromCfg(const char *first, const char *second, SRpcCorEpSet *corMgmtEpSet) {
corMgmtEpSet->version = 0;
// init mgmt ip set // init mgmt ip set
tscMgmtEpSet.version = 0; SRpcEpSet *mgmtEpSet = &(corMgmtEpSet->epSet);
SRpcEpSet *mgmtEpSet = &(tscMgmtEpSet.epSet);
mgmtEpSet->numOfEps = 0; mgmtEpSet->numOfEps = 0;
mgmtEpSet->inUse = 0; mgmtEpSet->inUse = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册