提交 72491dc5 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

return reference ID via parameter instead of return value

上级 45039b1c
...@@ -241,11 +241,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -241,11 +241,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.code = 0 .code = 0
}; };
// NOTE: the rpc context should be acquired before sending data to server. rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid);
// Otherwise, the pSql object may have been released already during the response function, which is
// processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely
// cause crash.
pSql->rpcRid = rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -105,6 +105,7 @@ void taos_init_imp(void) { ...@@ -105,6 +105,7 @@ void taos_init_imp(void) {
taosReadGlobalCfg(); taosReadGlobalCfg();
taosCheckGlobalCfg(); taosCheckGlobalCfg();
rpcInit();
tscDebug("starting to initialize TAOS client ..."); tscDebug("starting to initialize TAOS client ...");
tscDebug("Local End Point is:%s", tsLocalEp); tscDebug("Local End Point is:%s", tsLocalEp);
} }
...@@ -179,6 +180,7 @@ void taos_cleanup(void) { ...@@ -179,6 +180,7 @@ void taos_cleanup(void) {
taosCloseRef(tscRefId); taosCloseRef(tscRefId);
taosCleanupKeywordsTable(); taosCleanupKeywordsTable();
taosCloseLog(); taosCloseLog();
if (tscEmbedded == 0) rpcCleanup();
m = tscTmr; m = tscTmr;
if (m != NULL && atomic_val_compare_exchange_ptr(&tscTmr, m, 0) == m) { if (m != NULL && atomic_val_compare_exchange_ptr(&tscTmr, m, 0) == m) {
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "tconfig.h" #include "tconfig.h"
#include "tglobal.h" #include "tglobal.h"
#include "twal.h" #include "twal.h"
#include "trpc.h"
#include "dnode.h" #include "dnode.h"
#include "dnodeInt.h" #include "dnodeInt.h"
#include "dnodeMgmt.h" #include "dnodeMgmt.h"
...@@ -54,6 +55,7 @@ typedef struct { ...@@ -54,6 +55,7 @@ typedef struct {
} SDnodeComponent; } SDnodeComponent;
static const SDnodeComponent tsDnodeComponents[] = { static const SDnodeComponent tsDnodeComponents[] = {
{"rpc", rpcInit, rpcCleanup},
{"storage", dnodeInitStorage, dnodeCleanupStorage}, {"storage", dnodeInitStorage, dnodeCleanupStorage},
{"dnodecfg", dnodeInitCfg, dnodeCleanupCfg}, {"dnodecfg", dnodeInitCfg, dnodeCleanupCfg},
{"dnodeeps", dnodeInitEps, dnodeCleanupEps}, {"dnodeeps", dnodeInitEps, dnodeCleanupEps},
......
...@@ -169,7 +169,7 @@ void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { ...@@ -169,7 +169,7 @@ void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) {
} }
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) {
rpcSendRequest(tsClientRpc, epSet, rpcMsg); rpcSendRequest(tsClientRpc, epSet, rpcMsg, NULL);
} }
void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
...@@ -180,4 +180,4 @@ void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { ...@@ -180,4 +180,4 @@ void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet) { void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet) {
rpcSendRecv(tsClientRpc, epSet, rpcMsg, rpcRsp); rpcSendRecv(tsClientRpc, epSet, rpcMsg, rpcRsp);
} }
\ No newline at end of file
...@@ -78,12 +78,14 @@ typedef struct SRpcInit { ...@@ -78,12 +78,14 @@ typedef struct SRpcInit {
int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey); int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
} SRpcInit; } SRpcInit;
int32_t rpcInit();
void rpcCleanup();
void *rpcOpen(const SRpcInit *pRpc); void *rpcOpen(const SRpcInit *pRpc);
void rpcClose(void *); void rpcClose(void *);
void *rpcMallocCont(int contLen); void *rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont); void rpcFreeCont(void *pCont);
void *rpcReallocCont(void *ptr, int contLen); void *rpcReallocCont(void *ptr, int contLen);
int64_t rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg); void rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
void rpcSendResponse(const SRpcMsg *pMsg); void rpcSendResponse(const SRpcMsg *pMsg);
void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet); void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
......
...@@ -135,7 +135,7 @@ int tsRpcOverhead; ...@@ -135,7 +135,7 @@ int tsRpcOverhead;
static int tsRpcRefId = -1; static int tsRpcRefId = -1;
static int32_t tsRpcNum = 0; static int32_t tsRpcNum = 0;
static pthread_once_t tsRpcInit = PTHREAD_ONCE_INIT; //static pthread_once_t tsRpcInit = PTHREAD_ONCE_INIT;
// server:0 client:1 tcp:2 udp:0 // server:0 client:1 tcp:2 udp:0
#define RPC_CONN_UDPS 0 #define RPC_CONN_UDPS 0
...@@ -221,13 +221,15 @@ static void rpcFree(void *p) { ...@@ -221,13 +221,15 @@ static void rpcFree(void *p) {
free(p); free(p);
} }
void rpcInit(void) { int32_t rpcInit(void) {
tsProgressTimer = tsRpcTimer/2; tsProgressTimer = tsRpcTimer/2;
tsRpcMaxRetry = tsRpcMaxTime * 1000/tsProgressTimer; tsRpcMaxRetry = tsRpcMaxTime * 1000/tsProgressTimer;
tsRpcHeadSize = RPC_MSG_OVERHEAD; tsRpcHeadSize = RPC_MSG_OVERHEAD;
tsRpcOverhead = sizeof(SRpcReqContext); tsRpcOverhead = sizeof(SRpcReqContext);
tsRpcRefId = taosOpenRef(200, rpcFree); tsRpcRefId = taosOpenRef(200, rpcFree);
return 0;
} }
void rpcCleanup(void) { void rpcCleanup(void) {
...@@ -238,7 +240,7 @@ void rpcCleanup(void) { ...@@ -238,7 +240,7 @@ void rpcCleanup(void) {
void *rpcOpen(const SRpcInit *pInit) { void *rpcOpen(const SRpcInit *pInit) {
SRpcInfo *pRpc; SRpcInfo *pRpc;
pthread_once(&tsRpcInit, rpcInit); //pthread_once(&tsRpcInit, rpcInit);
pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo)); pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo));
if (pRpc == NULL) return NULL; if (pRpc == NULL) return NULL;
...@@ -379,7 +381,7 @@ void *rpcReallocCont(void *ptr, int contLen) { ...@@ -379,7 +381,7 @@ void *rpcReallocCont(void *ptr, int contLen) {
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
} }
int64_t rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) { void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {
SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcReqContext *pContext; SRpcReqContext *pContext;
...@@ -405,14 +407,10 @@ int64_t rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) { ...@@ -405,14 +407,10 @@ int64_t rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) {
|| type == TSDB_MSG_TYPE_CM_SHOW ) || type == TSDB_MSG_TYPE_CM_SHOW )
pContext->connType = RPC_CONN_TCPC; pContext->connType = RPC_CONN_TCPC;
// set the handle to pContext, so app can cancel the request
if (pMsg->handle) *((void **)pMsg->handle) = pContext;
pContext->rid = taosAddRef(tsRpcRefId, pContext); pContext->rid = taosAddRef(tsRpcRefId, pContext);
if (pRid) *pRid = pContext->rid;
rpcSendReqToServer(pRpc, pContext); rpcSendReqToServer(pRpc, pContext);
return pContext->rid;
} }
void rpcSendResponse(const SRpcMsg *pRsp) { void rpcSendResponse(const SRpcMsg *pRsp) {
...@@ -528,7 +526,7 @@ void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) ...@@ -528,7 +526,7 @@ void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp)
pContext->pRsp = pRsp; pContext->pRsp = pRsp;
pContext->pSet = pEpSet; pContext->pSet = pEpSet;
rpcSendRequest(shandle, pEpSet, pMsg); rpcSendRequest(shandle, pEpSet, pMsg, NULL);
tsem_wait(&sem); tsem_wait(&sem);
tsem_destroy(&sem); tsem_destroy(&sem);
......
...@@ -57,7 +57,7 @@ static void *sendRequest(void *param) { ...@@ -57,7 +57,7 @@ static void *sendRequest(void *param) {
rpcMsg.ahandle = pInfo; rpcMsg.ahandle = pInfo;
rpcMsg.msgType = 1; rpcMsg.msgType = 1;
tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg); rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
if ( pInfo->num % 20000 == 0 ) if ( pInfo->num % 20000 == 0 )
tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
tsem_wait(&pInfo->rspSem); tsem_wait(&pInfo->rspSem);
......
...@@ -57,7 +57,7 @@ void *sendRequest(void *param) { ...@@ -57,7 +57,7 @@ void *sendRequest(void *param) {
rpcMsg.ahandle = pInfo; rpcMsg.ahandle = pInfo;
rpcMsg.msgType = 1; rpcMsg.msgType = 1;
uDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); uDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg); rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
if (pInfo->num % 20000 == 0) { if (pInfo->num % 20000 == 0) {
uInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); uInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册