提交 5476ccaa 编写于 作者: H hjxilinx

[td-225] use the dnodeConn instead of mgmtConn and vnodeConn

上级 b4d549eb
...@@ -304,6 +304,7 @@ typedef struct STscObj { ...@@ -304,6 +304,7 @@ typedef struct STscObj {
struct SSqlObj * pHb; struct SSqlObj * pHb;
struct SSqlObj * sqlList; struct SSqlObj * sqlList;
struct SSqlStream *streamList; struct SSqlStream *streamList;
void* pDnodeConn;
pthread_mutex_t mutex; pthread_mutex_t mutex;
} STscObj; } STscObj;
...@@ -359,7 +360,7 @@ typedef struct SSqlStream { ...@@ -359,7 +360,7 @@ typedef struct SSqlStream {
struct SSqlStream *prev, *next; struct SSqlStream *prev, *next;
} SSqlStream; } SSqlStream;
int32_t tscInitRpc(const char *user, const char *secret); int32_t tscInitRpc(const char *user, const char *secret, void** pDnodeConn);
void tscInitMsgsFp(); void tscInitMsgsFp();
int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion); int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion);
...@@ -425,7 +426,6 @@ void tscQueueAsyncFreeResult(SSqlObj *pSql); ...@@ -425,7 +426,6 @@ void tscQueueAsyncFreeResult(SSqlObj *pSql);
int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo); int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
void tscGetResultColumnChr(SSqlRes *pRes, SFieldInfo* pFieldInfo, int32_t column); void tscGetResultColumnChr(SSqlRes *pRes, SFieldInfo* pFieldInfo, int32_t column);
extern void * pDnodeConn;
extern void * tscCacheHandle; extern void * tscCacheHandle;
extern void * tscTmr; extern void * tscTmr;
extern void * tscQhandle; extern void * tscQhandle;
......
...@@ -191,6 +191,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { ...@@ -191,6 +191,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
} }
int tscSendMsgToServer(SSqlObj *pSql) { int tscSendMsgToServer(SSqlObj *pSql) {
STscObj* pObj = pSql->pTscObj;
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
char *pMsg = rpcMallocCont(pCmd->payloadLen); char *pMsg = rpcMallocCont(pCmd->payloadLen);
...@@ -215,7 +216,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -215,7 +216,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.handle = pSql, .handle = pSql,
.code = 0 .code = 0
}; };
rpcSendRequest(pDnodeConn, &pSql->ipList, &rpcMsg); rpcSendRequest(pObj->pDnodeConn, &pSql->ipList, &rpcMsg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -65,8 +65,9 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con ...@@ -65,8 +65,9 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
terrno = TSDB_CODE_INVALID_PASS; terrno = TSDB_CODE_INVALID_PASS;
return NULL; return NULL;
} }
if (tscInitRpc(user, pass) != 0) { void *pDnodeConn = NULL;
if (tscInitRpc(user, pass, &pDnodeConn) != 0) {
terrno = TSDB_CODE_NETWORK_UNAVAIL; terrno = TSDB_CODE_NETWORK_UNAVAIL;
return NULL; return NULL;
} }
...@@ -93,6 +94,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con ...@@ -93,6 +94,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
STscObj *pObj = (STscObj *)calloc(1, sizeof(STscObj)); STscObj *pObj = (STscObj *)calloc(1, sizeof(STscObj));
if (NULL == pObj) { if (NULL == pObj) {
terrno = TSDB_CODE_CLI_OUT_OF_MEMORY; terrno = TSDB_CODE_CLI_OUT_OF_MEMORY;
rpcClose(pDnodeConn);
return NULL; return NULL;
} }
...@@ -106,8 +108,9 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con ...@@ -106,8 +108,9 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
int32_t len = strlen(db); int32_t len = strlen(db);
/* db name is too long */ /* db name is too long */
if (len > TSDB_DB_NAME_LEN) { if (len > TSDB_DB_NAME_LEN) {
free(pObj);
terrno = TSDB_CODE_INVALID_DB; terrno = TSDB_CODE_INVALID_DB;
rpcClose(pDnodeConn);
free(pObj);
return NULL; return NULL;
} }
...@@ -123,6 +126,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con ...@@ -123,6 +126,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
if (NULL == pSql) { if (NULL == pSql) {
terrno = TSDB_CODE_CLI_OUT_OF_MEMORY; terrno = TSDB_CODE_CLI_OUT_OF_MEMORY;
rpcClose(pDnodeConn);
free(pObj); free(pObj);
return NULL; return NULL;
} }
...@@ -134,6 +138,8 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con ...@@ -134,6 +138,8 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
tsem_init(&pSql->rspSem, 0, 0); tsem_init(&pSql->rspSem, 0, 0);
pObj->pSql = pSql; pObj->pSql = pSql;
pObj->pDnodeConn = pDnodeConn;
pSql->fp = fp; pSql->fp = fp;
pSql->param = param; pSql->param = param;
if (taos != NULL) { if (taos != NULL) {
...@@ -143,6 +149,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con ...@@ -143,6 +149,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
pSql->cmd.command = TSDB_SQL_CONNECT; pSql->cmd.command = TSDB_SQL_CONNECT;
if (TSDB_CODE_SUCCESS != tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
terrno = TSDB_CODE_CLI_OUT_OF_MEMORY; terrno = TSDB_CODE_CLI_OUT_OF_MEMORY;
rpcClose(pDnodeConn);
free(pSql); free(pSql);
free(pObj); free(pObj);
return NULL; return NULL;
......
...@@ -30,7 +30,6 @@ ...@@ -30,7 +30,6 @@
#include "tlocale.h" #include "tlocale.h"
// global, not configurable // global, not configurable
void * pDnodeConn;
void * tscCacheHandle; void * tscCacheHandle;
void * tscTmr; void * tscTmr;
void * tscQhandle; void * tscQhandle;
...@@ -48,12 +47,12 @@ void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) { ...@@ -48,12 +47,12 @@ void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) {
taosTmrReset(tscCheckDiskUsage, 1000, NULL, tscTmr, &tscCheckDiskUsageTmr); taosTmrReset(tscCheckDiskUsage, 1000, NULL, tscTmr, &tscCheckDiskUsageTmr);
} }
int32_t tscInitRpc(const char *user, const char *secret) { int32_t tscInitRpc(const char *user, const char *secret, void** pDnodeConn) {
SRpcInit rpcInit; SRpcInit rpcInit;
char secretEncrypt[32] = {0}; char secretEncrypt[32] = {0};
taosEncryptPass((uint8_t *)secret, strlen(secret), secretEncrypt); taosEncryptPass((uint8_t *)secret, strlen(secret), secretEncrypt);
if (pDnodeConn == NULL) { if (*pDnodeConn == NULL) {
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 0; rpcInit.localPort = 0;
rpcInit.label = "TSC"; rpcInit.label = "TSC";
...@@ -66,9 +65,9 @@ int32_t tscInitRpc(const char *user, const char *secret) { ...@@ -66,9 +65,9 @@ int32_t tscInitRpc(const char *user, const char *secret) {
rpcInit.ckey = "key"; rpcInit.ckey = "key";
rpcInit.secret = secretEncrypt; rpcInit.secret = secretEncrypt;
pDnodeConn = rpcOpen(&rpcInit); *pDnodeConn = rpcOpen(&rpcInit);
if (pDnodeConn == NULL) { if (*pDnodeConn == NULL) {
tscError("failed to init connection to vnode"); tscError("failed to init connection to TDengine");
return -1; return -1;
} }
} }
...@@ -168,11 +167,6 @@ void taos_cleanup() { ...@@ -168,11 +167,6 @@ void taos_cleanup() {
taosCloseLog(); taosCloseLog();
if (pDnodeConn != NULL) {
rpcClose(pDnodeConn);
pDnodeConn = NULL;
}
taosTmrCleanUp(tscTmr); taosTmrCleanUp(tscTmr);
} }
......
...@@ -762,6 +762,10 @@ void tscCloseTscObj(STscObj* pObj) { ...@@ -762,6 +762,10 @@ void tscCloseTscObj(STscObj* pObj) {
pthread_mutex_destroy(&pObj->mutex); pthread_mutex_destroy(&pObj->mutex);
if (pObj->pDnodeConn != NULL) {
rpcClose(pObj->pDnodeConn);
}
tscTrace("%p DB connection is closed", pObj); tscTrace("%p DB connection is closed", pObj);
tfree(pObj); tfree(pObj);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册