提交 c7851754 编写于 作者: S Shengliang Guan

refactor: adjust SRpcMsg

上级 893fb646
...@@ -26,39 +26,47 @@ extern "C" { ...@@ -26,39 +26,47 @@ extern "C" {
#define TAOS_CONN_SERVER 0 #define TAOS_CONN_SERVER 0
#define TAOS_CONN_CLIENT 1 #define TAOS_CONN_CLIENT 1
#define IsReq(pMsg) (pMsg->msgType & 1U)
extern int tsRpcHeadSize; extern int tsRpcHeadSize;
typedef struct SRpcConnInfo { typedef struct {
uint32_t clientIp; uint32_t clientIp;
uint16_t clientPort; uint16_t clientPort;
uint32_t serverIp;
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
} SRpcConnInfo; } SRpcConnInfo;
typedef struct SRpcMsg { typedef struct {
tmsg_t msgType; // rpc info
void * pCont; struct {
int contLen; void *handle; // rpc handle returned to app
int32_t code; int64_t refId; // refid, used by server
void * handle; // rpc handle returned to app int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp);
void * ahandle; // app handle set by client int32_t persistHandle; // persist handle or not
int64_t refId; // refid, used by server };
int noResp; // has response or not(default 0, 0: resp, 1: no resp); // app info
int persistHandle; // persist handle or not struct {
void *ahandle; // app handle set by client
void *proc; // proc handle
void *wrapper; // wrapper handle
void *node; // node mgmt handle
};
// resp info
struct {
void *rsp;
int32_t rspLen;
};
} SRpcHandleInfo;
typedef struct SRpcMsg {
tmsg_t msgType;
void *pCont;
int32_t contLen;
int32_t code;
SRpcHandleInfo info;
SRpcConnInfo conn;
} SRpcMsg; } SRpcMsg;
typedef struct {
char user[TSDB_USER_LEN];
uint32_t clientIp;
uint16_t clientPort;
SRpcMsg rpcMsg;
int32_t rspLen;
void * pRsp;
void * pNode;
} SNodeMsg;
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *rf); typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *rf);
typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey); typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
/// ///
......
...@@ -223,8 +223,8 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); ...@@ -223,8 +223,8 @@ static void cliWalkCb(uv_handle_t* handle, void* arg);
#define CONN_RELEASE_BY_SERVER(conn) \ #define CONN_RELEASE_BY_SERVER(conn) \
(((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1) (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1) #define REQUEST_NO_RESP(msg) ((msg)->info.noResp == 1)
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1) #define REQUEST_PERSIS_HANDLE(msg) ((msg)->info.persistHandle == 1)
#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release) #define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release)
#define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn) #define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn)
...@@ -255,7 +255,7 @@ void cliHandleResp(SCliConn* conn) { ...@@ -255,7 +255,7 @@ void cliHandleResp(SCliConn* conn) {
transMsg.pCont = transContFromHead((char*)pHead); transMsg.pCont = transContFromHead((char*)pHead);
transMsg.code = pHead->code; transMsg.code = pHead->code;
transMsg.msgType = pHead->msgType; transMsg.msgType = pHead->msgType;
transMsg.ahandle = NULL; transMsg.info.ahandle = NULL;
SCliMsg* pMsg = NULL; SCliMsg* pMsg = NULL;
STransConnCtx* pCtx = NULL; STransConnCtx* pCtx = NULL;
...@@ -265,37 +265,38 @@ void cliHandleResp(SCliConn* conn) { ...@@ -265,37 +265,38 @@ void cliHandleResp(SCliConn* conn) {
pMsg = transQueuePop(&conn->cliMsgs); pMsg = transQueuePop(&conn->cliMsgs);
pCtx = pMsg ? pMsg->ctx : NULL; pCtx = pMsg ? pMsg->ctx : NULL;
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) { if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) {
transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); transMsg.info.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType);
if (transMsg.ahandle == NULL) { if (transMsg.info.ahandle == NULL) {
transMsg.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType)); transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
} }
tDebug("cli conn %p construct ahandle %p, persist: 0", conn, transMsg.ahandle); tDebug("cli conn %p construct ahandle %p, persist: 0", conn, transMsg.info.ahandle);
} else { } else {
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
tDebug("cli conn %p get ahandle %p, persist: 0", conn, transMsg.ahandle); tDebug("cli conn %p get ahandle %p, persist: 0", conn, transMsg.info.ahandle);
} }
} else { } else {
uint64_t ahandle = (uint64_t)pHead->ahandle; uint64_t ahandle = (uint64_t)pHead->ahandle;
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
if (pMsg == NULL) { if (pMsg == NULL) {
transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); transMsg.info.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType);
tDebug("cli conn %p construct ahandle %p by %s, persist: 1", conn, transMsg.ahandle, TMSG_INFO(transMsg.msgType)); tDebug("cli conn %p construct ahandle %p by %s, persist: 1", conn, transMsg.info.ahandle,
if (!CONN_RELEASE_BY_SERVER(conn) && transMsg.ahandle == NULL) { TMSG_INFO(transMsg.msgType));
if (!CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
transMsg.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType)); transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
tDebug("cli conn %p construct ahandle %p due brokenlink, persist: 1", conn, transMsg.ahandle); tDebug("cli conn %p construct ahandle %p due brokenlink, persist: 1", conn, transMsg.info.ahandle);
} }
} else { } else {
pCtx = pMsg ? pMsg->ctx : NULL; pCtx = pMsg ? pMsg->ctx : NULL;
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
tDebug("cli conn %p get ahandle %p, persist: 1", conn, transMsg.ahandle); tDebug("cli conn %p get ahandle %p, persist: 1", conn, transMsg.info.ahandle);
} }
} }
// buf's mem alread translated to transMsg.pCont // buf's mem alread translated to transMsg.pCont
transClearBuffer(&conn->readBuf); transClearBuffer(&conn->readBuf);
if (!CONN_NO_PERSIST_BY_APP(conn)) { if (!CONN_NO_PERSIST_BY_APP(conn)) {
transMsg.handle = conn; transMsg.info.handle = conn;
tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
} }
...@@ -308,7 +309,7 @@ void cliHandleResp(SCliConn* conn) { ...@@ -308,7 +309,7 @@ void cliHandleResp(SCliConn* conn) {
// transUnrefCliHandle(conn); // transUnrefCliHandle(conn);
return; return;
} }
if (CONN_RELEASE_BY_SERVER(conn) && transMsg.ahandle == NULL) { if (CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
tTrace("except, server continue send while cli ignore it"); tTrace("except, server continue send while cli ignore it");
// transUnrefCliHandle(conn); // transUnrefCliHandle(conn);
return; return;
...@@ -357,24 +358,24 @@ void cliHandleExcept(SCliConn* pConn) { ...@@ -357,24 +358,24 @@ void cliHandleExcept(SCliConn* pConn) {
STransMsg transMsg = {0}; STransMsg transMsg = {0};
transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0; transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0;
transMsg.ahandle = NULL; transMsg.info.ahandle = NULL;
transMsg.handle = pConn; transMsg.info.handle = pConn;
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle, tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.info.ahandle,
TMSG_INFO(transMsg.msgType)); TMSG_INFO(transMsg.msgType));
if (transMsg.ahandle == NULL) { if (transMsg.info.ahandle == NULL) {
transMsg.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType)); transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType));
tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn, tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn,
transMsg.ahandle); transMsg.info.ahandle);
} }
} else { } else {
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
} }
if (pCtx == NULL || pCtx->pSem == NULL) { if (pCtx == NULL || pCtx->pSem == NULL) {
if (transMsg.ahandle == NULL) { if (transMsg.info.ahandle == NULL) {
once = true; once = true;
continue; continue;
} }
...@@ -668,7 +669,7 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -668,7 +669,7 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
// uv_stop(pThrd->loop); // uv_stop(pThrd->loop);
} }
static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn* conn = pMsg->msg.handle; SCliConn* conn = pMsg->msg.info.handle;
tDebug("%s cli conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn); tDebug("%s cli conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn);
if (T_REF_VAL_GET(conn) == 2) { if (T_REF_VAL_GET(conn) == 2) {
...@@ -685,8 +686,8 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -685,8 +686,8 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn* conn = NULL; SCliConn* conn = NULL;
if (pMsg->msg.handle != NULL) { if (pMsg->msg.info.handle != NULL) {
conn = (SCliConn*)(pMsg->msg.handle); conn = (SCliConn*)(pMsg->msg.info.handle);
if (conn != NULL) { if (conn != NULL) {
tTrace("%s cli conn %p reused", CONN_GET_INST_LABEL(conn), conn); tTrace("%s cli conn %p reused", CONN_GET_INST_LABEL(conn), conn);
} }
...@@ -995,7 +996,7 @@ void transReleaseCliHandle(void* handle) { ...@@ -995,7 +996,7 @@ void transReleaseCliHandle(void* handle) {
return; return;
} }
STransMsg tmsg = {.handle = handle}; STransMsg tmsg = {.info.handle = handle};
SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg)); SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg));
cmsg->msg = tmsg; cmsg->msg = tmsg;
cmsg->type = Release; cmsg->type = Release;
...@@ -1005,14 +1006,14 @@ void transReleaseCliHandle(void* handle) { ...@@ -1005,14 +1006,14 @@ void transReleaseCliHandle(void* handle) {
void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
STrans* pTransInst = (STrans*)shandle; STrans* pTransInst = (STrans*)shandle;
int index = CONN_HOST_THREAD_INDEX((SCliConn*)pReq->handle); int index = CONN_HOST_THREAD_INDEX((SCliConn*)pReq->info.handle);
if (index == -1) { if (index == -1) {
index = cliRBChoseIdx(pTransInst); index = cliRBChoseIdx(pTransInst);
} }
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
pCtx->epSet = *pEpSet; pCtx->epSet = *pEpSet;
pCtx->ahandle = pReq->ahandle; pCtx->ahandle = pReq->info.ahandle;
pCtx->msgType = pReq->msgType; pCtx->msgType = pReq->msgType;
pCtx->hThrdIdx = index; pCtx->hThrdIdx = index;
...@@ -1030,20 +1031,20 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra ...@@ -1030,20 +1031,20 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
tDebug("send request at thread:%d, threadID: %" PRId64 ", msg: %p, dst: %s:%d, app:%p", index, thrd->thread, pReq, tDebug("send request at thread:%d, threadID: %" PRId64 ", msg: %p, dst: %s:%d, app:%p", index, thrd->thread, pReq,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->ahandle); EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
ASSERT(transSendAsync(thrd->asyncPool, &(cliMsg->q)) == 0); ASSERT(transSendAsync(thrd->asyncPool, &(cliMsg->q)) == 0);
} }
void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) { void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
STrans* pTransInst = (STrans*)shandle; STrans* pTransInst = (STrans*)shandle;
int index = CONN_HOST_THREAD_INDEX(pReq->handle); int index = CONN_HOST_THREAD_INDEX(pReq->info.handle);
if (index == -1) { if (index == -1) {
index = cliRBChoseIdx(pTransInst); index = cliRBChoseIdx(pTransInst);
} }
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
pCtx->epSet = *pEpSet; pCtx->epSet = *pEpSet;
pCtx->ahandle = pReq->ahandle; pCtx->ahandle = pReq->info.ahandle;
pCtx->msgType = pReq->msgType; pCtx->msgType = pReq->msgType;
pCtx->hThrdIdx = index; pCtx->hThrdIdx = index;
pCtx->pSem = taosMemoryCalloc(1, sizeof(tsem_t)); pCtx->pSem = taosMemoryCalloc(1, sizeof(tsem_t));
...@@ -1058,7 +1059,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM ...@@ -1058,7 +1059,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
tDebug("send request at thread:%d, threadID:%" PRId64 ", msg: %p, dst: %s:%d, app:%p", index, thrd->thread, pReq, tDebug("send request at thread:%d, threadID:%" PRId64 ", msg: %p, dst: %s:%d, app:%p", index, thrd->thread, pReq,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->ahandle); EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
transSendAsync(thrd->asyncPool, &(cliMsg->q)); transSendAsync(thrd->asyncPool, &(cliMsg->q));
tsem_t* pSem = pCtx->pSem; tsem_t* pSem = pCtx->pSem;
......
...@@ -167,26 +167,26 @@ static void* transAcceptThread(void* arg); ...@@ -167,26 +167,26 @@ static void* transAcceptThread(void* arg);
static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName); static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName);
static bool addHandleToAcceptloop(void* arg); static bool addHandleToAcceptloop(void* arg);
#define CONN_SHOULD_RELEASE(conn, head) \ #define CONN_SHOULD_RELEASE(conn, head) \
do { \ do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
conn->status = ConnRelease; \ conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \ transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \ transFreeMsg(transContFromHead((char*)head)); \
tTrace("server conn %p received release request", conn); \ tTrace("server conn %p received release request", conn); \
\ \
STransMsg tmsg = {.code = 0, .handle = (void*)conn, .ahandle = NULL}; \ STransMsg tmsg = {.code = 0, .info.handle = (void*)conn, .info.ahandle = NULL}; \
SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); \ SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); \
srvMsg->msg = tmsg; \ srvMsg->msg = tmsg; \
srvMsg->type = Release; \ srvMsg->type = Release; \
srvMsg->pConn = conn; \ srvMsg->pConn = conn; \
reallocConnRefHandle(conn); \ reallocConnRefHandle(conn); \
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
return; \ return; \
} \ } \
uvStartSendRespInternal(srvMsg); \ uvStartSendRespInternal(srvMsg); \
return; \ return; \
} \ } \
} while (0) } while (0)
#define SRV_RELEASE_UV(loop) \ #define SRV_RELEASE_UV(loop) \
...@@ -266,8 +266,8 @@ static void uvHandleReq(SSrvConn* pConn) { ...@@ -266,8 +266,8 @@ static void uvHandleReq(SSrvConn* pConn) {
transMsg.pCont = pHead->content; transMsg.pCont = pHead->content;
transMsg.msgType = pHead->msgType; transMsg.msgType = pHead->msgType;
transMsg.code = pHead->code; transMsg.code = pHead->code;
transMsg.ahandle = (void*)pHead->ahandle; transMsg.info.ahandle = (void*)pHead->ahandle;
transMsg.handle = NULL; transMsg.info.handle = NULL;
// transDestroyBuffer(&pConn->readBuf); // transDestroyBuffer(&pConn->readBuf);
transClearBuffer(&pConn->readBuf); transClearBuffer(&pConn->readBuf);
...@@ -296,12 +296,12 @@ static void uvHandleReq(SSrvConn* pConn) { ...@@ -296,12 +296,12 @@ static void uvHandleReq(SSrvConn* pConn) {
// 2. once send out data, cli conn released to conn pool immediately // 2. once send out data, cli conn released to conn pool immediately
// 3. not mixed with persist // 3. not mixed with persist
transMsg.handle = (void*)uvAcquireExHandle(pConn->refId); transMsg.info.handle = (void*)uvAcquireExHandle(pConn->refId);
tTrace("server handle %p conn: %p translated to app, refId: %" PRIu64 "", transMsg.handle, pConn, pConn->refId); tTrace("server handle %p conn: %p translated to app, refId: %" PRIu64 "", transMsg.info.handle, pConn, pConn->refId);
transMsg.refId = pConn->refId; transMsg.info.refId = pConn->refId;
assert(transMsg.handle != NULL); assert(transMsg.info.handle != NULL);
if (pHead->noResp == 1) { if (pHead->noResp == 1) {
transMsg.refId = -1; transMsg.info.refId = -1;
} }
uvReleaseExHandle(pConn->refId); uvReleaseExHandle(pConn->refId);
...@@ -421,7 +421,7 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { ...@@ -421,7 +421,7 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
pMsg->contLen = 0; pMsg->contLen = 0;
} }
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
pHead->ahandle = (uint64_t)pMsg->ahandle; pHead->ahandle = (uint64_t)pMsg->info.ahandle;
if (pConn->status == ConnNormal) { if (pConn->status == ConnNormal) {
pHead->msgType = pConn->inType + 1; pHead->msgType = pConn->inType + 1;
...@@ -525,8 +525,8 @@ void uvWorkerAsyncCb(uv_async_t* handle) { ...@@ -525,8 +525,8 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
} else { } else {
STransMsg transMsg = msg->msg; STransMsg transMsg = msg->msg;
SExHandle* exh1 = transMsg.handle; SExHandle* exh1 = transMsg.info.handle;
int64_t refId = transMsg.refId; int64_t refId = transMsg.info.refId;
SExHandle* exh2 = uvAcquireExHandle(refId); SExHandle* exh2 = uvAcquireExHandle(refId);
if (exh2 == NULL || exh1 != exh2) { if (exh2 == NULL || exh1 != exh2) {
tTrace("server handle except msg %p, ignore it", exh1); tTrace("server handle except msg %p, ignore it", exh1);
...@@ -1103,7 +1103,7 @@ void transReleaseSrvHandle(void* handle) { ...@@ -1103,7 +1103,7 @@ void transReleaseSrvHandle(void* handle) {
SWorkThrdObj* pThrd = exh->pThrd; SWorkThrdObj* pThrd = exh->pThrd;
ASYNC_ERR_JRET(pThrd); ASYNC_ERR_JRET(pThrd);
STransMsg tmsg = {.code = 0, .handle = exh, .ahandle = NULL, .refId = refId}; STransMsg tmsg = {.code = 0, .info.handle = exh, .info.ahandle = NULL, .info.refId = refId};
SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
srvMsg->msg = tmsg; srvMsg->msg = tmsg;
...@@ -1122,13 +1122,13 @@ _return2: ...@@ -1122,13 +1122,13 @@ _return2:
return; return;
} }
void transSendResponse(const STransMsg* msg) { void transSendResponse(const STransMsg* msg) {
SExHandle* exh = msg->handle; SExHandle* exh = msg->info.handle;
int64_t refId = msg->refId; int64_t refId = msg->info.refId;
ASYNC_CHECK_HANDLE(exh, refId); ASYNC_CHECK_HANDLE(exh, refId);
assert(refId != 0); assert(refId != 0);
STransMsg tmsg = *msg; STransMsg tmsg = *msg;
tmsg.refId = refId; tmsg.info.refId = refId;
SWorkThrdObj* pThrd = exh->pThrd; SWorkThrdObj* pThrd = exh->pThrd;
ASYNC_ERR_JRET(pThrd); ASYNC_ERR_JRET(pThrd);
...@@ -1151,12 +1151,12 @@ _return2: ...@@ -1151,12 +1151,12 @@ _return2:
return; return;
} }
void transRegisterMsg(const STransMsg* msg) { void transRegisterMsg(const STransMsg* msg) {
SExHandle* exh = msg->handle; SExHandle* exh = msg->info.handle;
int64_t refId = msg->refId; int64_t refId = msg->info.refId;
ASYNC_CHECK_HANDLE(exh, refId); ASYNC_CHECK_HANDLE(exh, refId);
STransMsg tmsg = *msg; STransMsg tmsg = *msg;
tmsg.refId = refId; tmsg.info.refId = refId;
SWorkThrdObj* pThrd = exh->pThrd; SWorkThrdObj* pThrd = exh->pThrd;
ASYNC_ERR_JRET(pThrd); ASYNC_ERR_JRET(pThrd);
......
...@@ -69,11 +69,11 @@ void processShellMsg() { ...@@ -69,11 +69,11 @@ void processShellMsg() {
memset(&rpcMsg, 0, sizeof(rpcMsg)); memset(&rpcMsg, 0, sizeof(rpcMsg));
rpcMsg.pCont = rpcMallocCont(msgSize); rpcMsg.pCont = rpcMallocCont(msgSize);
rpcMsg.contLen = msgSize; rpcMsg.contLen = msgSize;
rpcMsg.handle = pRpcMsg->handle; rpcMsg.info = pRpcMsg->info;
rpcMsg.code = 0; rpcMsg.code = 0;
rpcSendResponse(&rpcMsg); rpcSendResponse(&rpcMsg);
void *handle = pRpcMsg->handle; void *handle = pRpcMsg->info.handle;
taosFreeQitem(pRpcMsg); taosFreeQitem(pRpcMsg);
{ {
...@@ -81,7 +81,7 @@ void processShellMsg() { ...@@ -81,7 +81,7 @@ void processShellMsg() {
SRpcMsg nRpcMsg = {0}; SRpcMsg nRpcMsg = {0};
nRpcMsg.pCont = rpcMallocCont(msgSize); nRpcMsg.pCont = rpcMallocCont(msgSize);
nRpcMsg.contLen = msgSize; nRpcMsg.contLen = msgSize;
nRpcMsg.handle = handle; nRpcMsg.info.handle = handle;
nRpcMsg.code = TSDB_CODE_CTG_NOT_READY; nRpcMsg.code = TSDB_CODE_CTG_NOT_READY;
rpcSendResponse(&nRpcMsg); rpcSendResponse(&nRpcMsg);
} }
......
...@@ -32,7 +32,7 @@ typedef struct { ...@@ -32,7 +32,7 @@ typedef struct {
void * pRpc; void * pRpc;
} SInfo; } SInfo;
static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SInfo *pInfo = (SInfo *)pMsg->ahandle; SInfo *pInfo = (SInfo *)pMsg->info.ahandle;
// tError("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, // tError("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
// pMsg->code); // pMsg->code);
...@@ -61,7 +61,7 @@ static void *sendRequest(void *param) { ...@@ -61,7 +61,7 @@ static void *sendRequest(void *param) {
pInfo->num++; pInfo->num++;
rpcMsg.pCont = rpcMallocCont(pInfo->msgSize); rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
rpcMsg.contLen = pInfo->msgSize; rpcMsg.contLen = pInfo->msgSize;
rpcMsg.ahandle = pInfo; rpcMsg.info.ahandle = pInfo;
rpcMsg.msgType = 1; rpcMsg.msgType = 1;
// tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); // tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
int64_t start = taosGetTimestampUs(); int64_t start = taosGetTimestampUs();
......
...@@ -69,7 +69,7 @@ void processShellMsg() { ...@@ -69,7 +69,7 @@ void processShellMsg() {
memset(&rpcMsg, 0, sizeof(rpcMsg)); memset(&rpcMsg, 0, sizeof(rpcMsg));
rpcMsg.pCont = rpcMallocCont(msgSize); rpcMsg.pCont = rpcMallocCont(msgSize);
rpcMsg.contLen = msgSize; rpcMsg.contLen = msgSize;
rpcMsg.handle = pRpcMsg->handle; rpcMsg.info = pRpcMsg->info;
rpcMsg.code = 0; rpcMsg.code = 0;
rpcSendResponse(&rpcMsg); rpcSendResponse(&rpcMsg);
......
...@@ -83,9 +83,9 @@ class Client { ...@@ -83,9 +83,9 @@ class Client {
*resp = this->resp; *resp = this->resp;
} }
void SendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) { void SendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) {
if (req->handle != NULL) { if (req->info.handle != NULL) {
rpcReleaseHandle(req->handle, TAOS_CONN_CLIENT); rpcReleaseHandle(req->info.handle, TAOS_CONN_CLIENT);
req->handle = NULL; req->info.handle = NULL;
} }
SendAndRecv(req, resp); SendAndRecv(req, resp);
} }
...@@ -154,7 +154,7 @@ static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -154,7 +154,7 @@ static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
rpcMsg.pCont = rpcMallocCont(100); rpcMsg.pCont = rpcMallocCont(100);
rpcMsg.contLen = 100; rpcMsg.contLen = 100;
rpcMsg.handle = pMsg->handle; rpcMsg.info = pMsg->info;
rpcMsg.code = 0; rpcMsg.code = 0;
rpcSendResponse(&rpcMsg); rpcSendResponse(&rpcMsg);
} }
...@@ -164,7 +164,7 @@ static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -164,7 +164,7 @@ static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
rpcMsg.pCont = rpcMallocCont(100); rpcMsg.pCont = rpcMallocCont(100);
rpcMsg.contLen = 100; rpcMsg.contLen = 100;
rpcMsg.handle = pMsg->handle; rpcMsg.info = pMsg->info;
rpcMsg.code = 0; rpcMsg.code = 0;
rpcSendResponse(&rpcMsg); rpcSendResponse(&rpcMsg);
} }
...@@ -173,19 +173,18 @@ static void processReleaseHandleCb(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) ...@@ -173,19 +173,18 @@ static void processReleaseHandleCb(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
rpcMsg.pCont = rpcMallocCont(100); rpcMsg.pCont = rpcMallocCont(100);
rpcMsg.contLen = 100; rpcMsg.contLen = 100;
rpcMsg.handle = pMsg->handle; rpcMsg.info = pMsg->info;
rpcMsg.code = 0; rpcMsg.code = 0;
rpcSendResponse(&rpcMsg); rpcSendResponse(&rpcMsg);
rpcReleaseHandle(pMsg->handle, TAOS_CONN_SERVER); rpcReleaseHandle(pMsg->info.handle, TAOS_CONN_SERVER);
} }
static void processRegisterFailure(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { static void processRegisterFailure(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
void *handle = pMsg->handle;
{ {
SRpcMsg rpcMsg1 = {0}; SRpcMsg rpcMsg1 = {0};
rpcMsg1.pCont = rpcMallocCont(100); rpcMsg1.pCont = rpcMallocCont(100);
rpcMsg1.contLen = 100; rpcMsg1.contLen = 100;
rpcMsg1.handle = handle; rpcMsg1.info = pMsg->info;
rpcMsg1.code = 0; rpcMsg1.code = 0;
rpcRegisterBrokenLinkArg(&rpcMsg1); rpcRegisterBrokenLinkArg(&rpcMsg1);
} }
...@@ -194,7 +193,7 @@ static void processRegisterFailure(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) ...@@ -194,7 +193,7 @@ static void processRegisterFailure(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
rpcMsg.pCont = rpcMallocCont(100); rpcMsg.pCont = rpcMallocCont(100);
rpcMsg.contLen = 100; rpcMsg.contLen = 100;
rpcMsg.handle = pMsg->handle; rpcMsg.info = pMsg->info;
rpcMsg.code = 0; rpcMsg.code = 0;
rpcSendResponse(&rpcMsg); rpcSendResponse(&rpcMsg);
} }
...@@ -334,8 +333,8 @@ TEST_F(TransEnv, cliPersistHandle) { ...@@ -334,8 +333,8 @@ TEST_F(TransEnv, cliPersistHandle) {
void * handle = NULL; void * handle = NULL;
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
SRpcMsg req = {0}; SRpcMsg req = {0};
req.handle = resp.handle; req.info = resp.info;
req.persistHandle = 1; req.info.persistHandle = 1;
req.msgType = 1; req.msgType = 1;
req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
...@@ -348,7 +347,7 @@ TEST_F(TransEnv, cliPersistHandle) { ...@@ -348,7 +347,7 @@ TEST_F(TransEnv, cliPersistHandle) {
// if (i >= 6) { // if (i >= 6) {
// EXPECT_TRUE(resp.code != 0); // EXPECT_TRUE(resp.code != 0);
//} //}
handle = resp.handle; handle = resp.info.handle;
} }
rpcReleaseHandle(handle, TAOS_CONN_CLIENT); rpcReleaseHandle(handle, TAOS_CONN_CLIENT);
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
...@@ -371,8 +370,8 @@ TEST_F(TransEnv, srvReleaseHandle) { ...@@ -371,8 +370,8 @@ TEST_F(TransEnv, srvReleaseHandle) {
SRpcMsg req = {0}; SRpcMsg req = {0};
for (int i = 0; i < 1; i++) { for (int i = 0; i < 1; i++) {
memset(&req, 0, sizeof(req)); memset(&req, 0, sizeof(req));
req.handle = resp.handle; req.info = resp.info;
req.persistHandle = 1; req.info.persistHandle = 1;
req.msgType = 1; req.msgType = 1;
req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
req.contLen = 10; req.contLen = 10;
...@@ -387,8 +386,8 @@ TEST_F(TransEnv, cliReleaseHandleExcept) { ...@@ -387,8 +386,8 @@ TEST_F(TransEnv, cliReleaseHandleExcept) {
SRpcMsg req = {0}; SRpcMsg req = {0};
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
memset(&req, 0, sizeof(req)); memset(&req, 0, sizeof(req));
req.handle = resp.handle; req.info = resp.info;
req.persistHandle = 1; req.info.persistHandle = 1;
req.msgType = 1; req.msgType = 1;
req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
req.contLen = 10; req.contLen = 10;
...@@ -424,7 +423,7 @@ TEST_F(TransEnv, srvPersistHandleExcept) { ...@@ -424,7 +423,7 @@ TEST_F(TransEnv, srvPersistHandleExcept) {
SRpcMsg req = {0}; SRpcMsg req = {0};
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
memset(&req, 0, sizeof(req)); memset(&req, 0, sizeof(req));
req.handle = resp.handle; req.info = resp.info;
req.msgType = 1; req.msgType = 1;
req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
req.contLen = 10; req.contLen = 10;
...@@ -444,7 +443,7 @@ TEST_F(TransEnv, cliPersistHandleExcept) { ...@@ -444,7 +443,7 @@ TEST_F(TransEnv, cliPersistHandleExcept) {
SRpcMsg req = {0}; SRpcMsg req = {0};
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
memset(&req, 0, sizeof(req)); memset(&req, 0, sizeof(req));
req.handle = resp.handle; req.info = resp.info;
req.msgType = 1; req.msgType = 1;
req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
req.contLen = 10; req.contLen = 10;
...@@ -468,14 +467,14 @@ TEST_F(TransEnv, queryExcept) { ...@@ -468,14 +467,14 @@ TEST_F(TransEnv, queryExcept) {
SRpcMsg req = {0}; SRpcMsg req = {0};
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
memset(&req, 0, sizeof(req)); memset(&req, 0, sizeof(req));
req.handle = resp.handle; req.info = resp.info;
req.persistHandle = 1; req.info.persistHandle = 1;
req.msgType = 1; req.msgType = 1;
req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
req.contLen = 10; req.contLen = 10;
tr->cliSendAndRecv(&req, &resp); tr->cliSendAndRecv(&req, &resp);
if (i == 2) { if (i == 2) {
rpcReleaseHandle(resp.handle, TAOS_CONN_CLIENT); rpcReleaseHandle(resp.info.handle, TAOS_CONN_CLIENT);
tr->StopCli(); tr->StopCli();
break; break;
} }
...@@ -487,7 +486,7 @@ TEST_F(TransEnv, noResp) { ...@@ -487,7 +486,7 @@ TEST_F(TransEnv, noResp) {
SRpcMsg req = {0}; SRpcMsg req = {0};
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
memset(&req, 0, sizeof(req)); memset(&req, 0, sizeof(req));
req.noResp = 1; req.info.noResp = 1;
req.msgType = 1; req.msgType = 1;
req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
req.contLen = 10; req.contLen = 10;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册