提交 5262b4ad 编写于 作者: dengyihao's avatar dengyihao

modify transport

上级 5f51fb3a
...@@ -64,7 +64,6 @@ typedef struct SRpcInit { ...@@ -64,7 +64,6 @@ typedef struct SRpcInit {
int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
int idleTime; // milliseconds, 0 means idle timer is disabled int idleTime; // milliseconds, 0 means idle timer is disabled
bool noPool; // create conn pool or not
// the following is for client app ecurity only // the following is for client app ecurity only
char *user; // user name char *user; // user name
char spi; // security parameter index char spi; // security parameter index
......
...@@ -72,8 +72,6 @@ static void deregisterRequest(SRequestObj *pRequest) { ...@@ -72,8 +72,6 @@ static void deregisterRequest(SRequestObj *pRequest) {
taosReleaseRef(clientConnRefPool, pTscObj->id); taosReleaseRef(clientConnRefPool, pTscObj->id);
} }
// todo close the transporter properly // todo close the transporter properly
void closeTransporter(STscObj *pTscObj) { void closeTransporter(STscObj *pTscObj) {
if (pTscObj == NULL || pTscObj->pAppInfo->pTransporter == NULL) { if (pTscObj == NULL || pTscObj->pAppInfo->pTransporter == NULL) {
...@@ -241,6 +239,7 @@ void taos_init_imp(void) { ...@@ -241,6 +239,7 @@ void taos_init_imp(void) {
clientConnRefPool = taosOpenRef(200, destroyTscObj); clientConnRefPool = taosOpenRef(200, destroyTscObj);
clientReqRefPool = taosOpenRef(40960, doDestroyRequest); clientReqRefPool = taosOpenRef(40960, doDestroyRequest);
// transDestroyBuffer(&conn->readBuf);
taosGetAppName(appInfo.appName, NULL); taosGetAppName(appInfo.appName, NULL);
pthread_mutex_init(&appInfo.mutex, NULL); pthread_mutex_init(&appInfo.mutex, NULL);
......
...@@ -25,8 +25,8 @@ ...@@ -25,8 +25,8 @@
#include "dndMnode.h" #include "dndMnode.h"
#include "dndVnodes.h" #include "dndVnodes.h"
#define INTERNAL_USER "_dnd" #define INTERNAL_USER "_dnd"
#define INTERNAL_CKEY "_key" #define INTERNAL_CKEY "_key"
#define INTERNAL_SECRET "_pwd" #define INTERNAL_SECRET "_pwd"
static void dndInitMsgFp(STransMgmt *pMgmt) { static void dndInitMsgFp(STransMgmt *pMgmt) {
...@@ -155,7 +155,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { ...@@ -155,7 +155,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
} }
static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
SDnode *pDnode = parent; SDnode * pDnode = parent;
STransMgmt *pMgmt = &pDnode->tmgmt; STransMgmt *pMgmt = &pDnode->tmgmt;
tmsg_t msgType = pRsp->msgType; tmsg_t msgType = pRsp->msgType;
...@@ -193,7 +193,6 @@ static int32_t dndInitClient(SDnode *pDnode) { ...@@ -193,7 +193,6 @@ static int32_t dndInitClient(SDnode *pDnode) {
rpcInit.ckey = INTERNAL_CKEY; rpcInit.ckey = INTERNAL_CKEY;
rpcInit.spi = 1; rpcInit.spi = 1;
rpcInit.parent = pDnode; rpcInit.parent = pDnode;
rpcInit.noPool = true;
char pass[TSDB_PASSWORD_LEN + 1] = {0}; char pass[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass); taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass);
...@@ -219,7 +218,7 @@ static void dndCleanupClient(SDnode *pDnode) { ...@@ -219,7 +218,7 @@ static void dndCleanupClient(SDnode *pDnode) {
} }
static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) { static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
SDnode *pDnode = param; SDnode * pDnode = param;
STransMgmt *pMgmt = &pDnode->tmgmt; STransMgmt *pMgmt = &pDnode->tmgmt;
tmsg_t msgType = pReq->msgType; tmsg_t msgType = pReq->msgType;
...@@ -313,7 +312,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char ...@@ -313,7 +312,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
SAuthReq authReq = {0}; SAuthReq authReq = {0};
tstrncpy(authReq.user, user, TSDB_USER_LEN); tstrncpy(authReq.user, user, TSDB_USER_LEN);
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq); int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
void *pReq = rpcMallocCont(contLen); void * pReq = rpcMallocCont(contLen);
tSerializeSAuthReq(pReq, contLen, &authReq); tSerializeSAuthReq(pReq, contLen, &authReq);
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528}; SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
......
...@@ -54,7 +54,6 @@ typedef struct { ...@@ -54,7 +54,6 @@ typedef struct {
int8_t connType; int8_t connType;
int64_t index; int64_t index;
char label[TSDB_LABEL_LEN]; char label[TSDB_LABEL_LEN];
bool noPool; // pool or not
char user[TSDB_UNI_LEN]; // meter ID char user[TSDB_UNI_LEN]; // meter ID
char spi; // security parameter index char spi; // security parameter index
......
...@@ -64,7 +64,6 @@ typedef struct { ...@@ -64,7 +64,6 @@ typedef struct {
void (*cfp)(void *parent, SRpcMsg *, SEpSet *); void (*cfp)(void *parent, SRpcMsg *, SEpSet *);
int (*afp)(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey); int (*afp)(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey);
bool noPool;
int32_t refCount; int32_t refCount;
void * parent; void * parent;
void * idPool; // handle to ID pool void * idPool; // handle to ID pool
......
...@@ -41,7 +41,6 @@ void* rpcOpen(const SRpcInit* pInit) { ...@@ -41,7 +41,6 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->numOfThreads = pInit->numOfThreads; pRpc->numOfThreads = pInit->numOfThreads;
} }
pRpc->noPool = pInit->noPool;
pRpc->connType = pInit->connType; pRpc->connType = pInit->connType;
pRpc->idleTime = pInit->idleTime; pRpc->idleTime = pInit->idleTime;
pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
......
...@@ -170,13 +170,7 @@ static void clientHandleResp(SCliConn* conn) { ...@@ -170,13 +170,7 @@ static void clientHandleResp(SCliConn* conn) {
// user owns conn->persist = 1 // user owns conn->persist = 1
if (conn->persist == 0) { if (conn->persist == 0) {
if (pTransInst->noPool == true) { addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
destroyCmsg(conn->data);
clientConnDestroy(conn, true);
return;
} else {
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
}
} }
destroyCmsg(conn->data); destroyCmsg(conn->data);
conn->data = NULL; conn->data = NULL;
...@@ -463,10 +457,8 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -463,10 +457,8 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
tTrace("%s client conn %p reused", CONN_GET_INST_LABEL(conn), conn); tTrace("%s client conn %p reused", CONN_GET_INST_LABEL(conn), conn);
} }
} else { } else {
if (pTransInst->noPool == false) { conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); if (conn != NULL) tTrace("%s client conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn);
if (conn != NULL) tTrace("%s client conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn);
}
} }
if (conn != NULL) { if (conn != NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册