未验证 提交 4868c62c 编写于 作者: H haojun Liao 提交者: GitHub

Merge pull request #4225 from taosdata/patch/TD-2012

return reference ID via parameter instead of return value
......@@ -241,11 +241,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.code = 0
};
// NOTE: the rpc context should be acquired before sending data to server.
// Otherwise, the pSql object may have been released already during the response function, which is
// processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely
// cause crash.
pSql->rpcRid = rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg);
rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid);
return TSDB_CODE_SUCCESS;
}
......
......@@ -105,6 +105,7 @@ void taos_init_imp(void) {
taosReadGlobalCfg();
taosCheckGlobalCfg();
rpcInit();
tscDebug("starting to initialize TAOS client ...");
tscDebug("Local End Point is:%s", tsLocalEp);
}
......@@ -179,6 +180,7 @@ void taos_cleanup(void) {
taosCloseRef(tscRefId);
taosCleanupKeywordsTable();
taosCloseLog();
if (tscEmbedded == 0) rpcCleanup();
m = tscTmr;
if (m != NULL && atomic_val_compare_exchange_ptr(&tscTmr, m, 0) == m) {
......
......@@ -20,6 +20,7 @@
#include "tconfig.h"
#include "tglobal.h"
#include "twal.h"
#include "trpc.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeMgmt.h"
......@@ -54,6 +55,7 @@ typedef struct {
} SDnodeComponent;
static const SDnodeComponent tsDnodeComponents[] = {
{"rpc", rpcInit, rpcCleanup},
{"storage", dnodeInitStorage, dnodeCleanupStorage},
{"dnodecfg", dnodeInitCfg, dnodeCleanupCfg},
{"dnodeeps", dnodeInitEps, dnodeCleanupEps},
......
......@@ -169,7 +169,7 @@ void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) {
}
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) {
rpcSendRequest(tsClientRpc, epSet, rpcMsg);
rpcSendRequest(tsClientRpc, epSet, rpcMsg, NULL);
}
void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
......
......@@ -78,12 +78,14 @@ typedef struct SRpcInit {
int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
} SRpcInit;
int32_t rpcInit();
void rpcCleanup();
void *rpcOpen(const SRpcInit *pRpc);
void rpcClose(void *);
void *rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont);
void *rpcReallocCont(void *ptr, int contLen);
int64_t rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg);
void rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
void rpcSendResponse(const SRpcMsg *pMsg);
void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
......
......@@ -135,7 +135,7 @@ int tsRpcOverhead;
static int tsRpcRefId = -1;
static int32_t tsRpcNum = 0;
static pthread_once_t tsRpcInit = PTHREAD_ONCE_INIT;
//static pthread_once_t tsRpcInit = PTHREAD_ONCE_INIT;
// server:0 client:1 tcp:2 udp:0
#define RPC_CONN_UDPS 0
......@@ -221,13 +221,15 @@ static void rpcFree(void *p) {
free(p);
}
void rpcInit(void) {
int32_t rpcInit(void) {
tsProgressTimer = tsRpcTimer/2;
tsRpcMaxRetry = tsRpcMaxTime * 1000/tsProgressTimer;
tsRpcHeadSize = RPC_MSG_OVERHEAD;
tsRpcOverhead = sizeof(SRpcReqContext);
tsRpcRefId = taosOpenRef(200, rpcFree);
return 0;
}
void rpcCleanup(void) {
......@@ -238,7 +240,7 @@ void rpcCleanup(void) {
void *rpcOpen(const SRpcInit *pInit) {
SRpcInfo *pRpc;
pthread_once(&tsRpcInit, rpcInit);
//pthread_once(&tsRpcInit, rpcInit);
pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo));
if (pRpc == NULL) return NULL;
......@@ -379,7 +381,7 @@ void *rpcReallocCont(void *ptr, int contLen) {
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
}
int64_t rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) {
void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {
SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcReqContext *pContext;
......@@ -405,14 +407,10 @@ int64_t rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) {
|| type == TSDB_MSG_TYPE_CM_SHOW )
pContext->connType = RPC_CONN_TCPC;
// set the handle to pContext, so app can cancel the request
if (pMsg->handle) *((void **)pMsg->handle) = pContext;
pContext->rid = taosAddRef(tsRpcRefId, pContext);
if (pRid) *pRid = pContext->rid;
rpcSendReqToServer(pRpc, pContext);
return pContext->rid;
}
void rpcSendResponse(const SRpcMsg *pRsp) {
......@@ -528,7 +526,7 @@ void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp)
pContext->pRsp = pRsp;
pContext->pSet = pEpSet;
rpcSendRequest(shandle, pEpSet, pMsg);
rpcSendRequest(shandle, pEpSet, pMsg, NULL);
tsem_wait(&sem);
tsem_destroy(&sem);
......
......@@ -57,7 +57,7 @@ static void *sendRequest(void *param) {
rpcMsg.ahandle = pInfo;
rpcMsg.msgType = 1;
tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg);
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
if ( pInfo->num % 20000 == 0 )
tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
tsem_wait(&pInfo->rspSem);
......
......@@ -57,7 +57,7 @@ void *sendRequest(void *param) {
rpcMsg.ahandle = pInfo;
rpcMsg.msgType = 1;
uDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg);
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
if (pInfo->num % 20000 == 0) {
uInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册