提交 3b1664d9 编写于 作者: dengyihao's avatar dengyihao

feat: refactor rpc code

上级 963a72ee
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#ifndef _TD_QUERY_H_ #ifndef _TD_QUERY_H_
#define _TD_QUERY_H_ #define _TD_QUERY_H_
// clang-foramt off
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
...@@ -208,7 +209,8 @@ char* jobTaskStatusStr(int32_t status); ...@@ -208,7 +209,8 @@ char* jobTaskStatusStr(int32_t status);
SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name); SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name);
void destroyQueryExecRes(SQueryExecRes* pRes); void destroyQueryExecRes(SQueryExecRes* pRes);
extern int32_t (*queryBuildMsg[TDMT_MAX])(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallocFp)(int32_t)); extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char** msg, int32_t msgSize, int32_t* msgLen,
void* (*mallocFp)(int32_t));
extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t msgSize); extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t msgSize);
#define SET_META_TYPE_NULL(t) (t) = META_TYPE_NULL_TABLE #define SET_META_TYPE_NULL(t) (t) = META_TYPE_NULL_TABLE
...@@ -227,11 +229,13 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t ...@@ -227,11 +229,13 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define NEED_CLIENT_HANDLE_ERROR(_code) \ #define NEED_CLIENT_HANDLE_ERROR(_code) \
(NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \ (NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \
NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code)) NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code))
#define NEED_CLIENT_RM_TBLMETA_REQ(_type) ((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_VND_CREATE_STB \ #define NEED_CLIENT_RM_TBLMETA_REQ(_type) \
|| (_type) == TDMT_VND_DROP_TABLE || (_type) == TDMT_VND_DROP_STB) ((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_VND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \
(_type) == TDMT_VND_DROP_STB)
#define NEED_SCHEDULER_RETRY_ERROR(_code) \ #define NEED_SCHEDULER_RETRY_ERROR(_code) \
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR) ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \
(_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
#define REQUEST_TOTAL_EXEC_TIMES 2 #define REQUEST_TOTAL_EXEC_TIMES 2
...@@ -308,3 +312,4 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t ...@@ -308,3 +312,4 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#endif #endif
#endif /*_TD_QUERY_H_*/ #endif /*_TD_QUERY_H_*/
// clang-foramt on
...@@ -157,7 +157,10 @@ int32_t taosNonblockwrite(TdSocketPtr pSocket, char *ptr, int32_t nbytes); ...@@ -157,7 +157,10 @@ int32_t taosNonblockwrite(TdSocketPtr pSocket, char *ptr, int32_t nbytes);
int64_t taosCopyFds(TdSocketPtr pSrcSocket, TdSocketPtr pDestSocket, int64_t len); int64_t taosCopyFds(TdSocketPtr pSrcSocket, TdSocketPtr pDestSocket, int64_t len);
void taosWinSocketInit(); void taosWinSocketInit();
int taosCreateSocketWithTimeOutOpt(uint32_t conn_timeout_sec); /*
* set timeout(ms)
*/
int32_t taosCreateSocketWithTimeout(uint32_t timeout);
TdSocketPtr taosOpenUdpSocket(uint32_t localIp, uint16_t localPort); TdSocketPtr taosOpenUdpSocket(uint32_t localIp, uint16_t localPort);
TdSocketPtr taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp); TdSocketPtr taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp);
......
...@@ -481,8 +481,7 @@ _return: ...@@ -481,8 +481,7 @@ _return:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData* pResultMeta) {
int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData *pResultMeta) {
SArray* pDbVgList = NULL; SArray* pDbVgList = NULL;
SArray* pQnodeList = NULL; SArray* pQnodeList = NULL;
int32_t code = 0; int32_t code = 0;
...@@ -595,7 +594,6 @@ _return: ...@@ -595,7 +594,6 @@ _return:
return code; return code;
} }
int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) { int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
tsem_init(&schdRspSem, 0, 0); tsem_init(&schdRspSem, 0, 0);
...@@ -819,8 +817,8 @@ void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) { ...@@ -819,8 +817,8 @@ void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) {
} }
} }
tscDebug("0x%" PRIx64 " enter scheduler exec cb, code:%d - %s, reqId:0x%" PRIx64, tscDebug("0x%" PRIx64 " enter scheduler exec cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code,
pRequest->self, code, tstrerror(code), pRequest->requestId); tstrerror(code), pRequest->requestId);
STscObj* pTscObj = pRequest->pTscObj; STscObj* pTscObj = pRequest->pTscObj;
if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code)) { if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code)) {
...@@ -915,7 +913,7 @@ SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) { ...@@ -915,7 +913,7 @@ SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
return launchQueryImpl(pRequest, pQuery, false, NULL); return launchQueryImpl(pRequest, pQuery, false, NULL);
} }
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData *pResultMeta) { void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta) {
int32_t code = 0; int32_t code = 0;
switch (pQuery->execMode) { switch (pQuery->execMode) {
...@@ -1554,7 +1552,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int ...@@ -1554,7 +1552,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, int32_t numOfRows){ static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, int32_t numOfRows) {
char* p = (char*)pResultInfo->pData; char* p = (char*)pResultInfo->pData;
int32_t len = sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t)); int32_t len = sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t));
...@@ -1592,7 +1590,6 @@ static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, i ...@@ -1592,7 +1590,6 @@ static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, i
} else { } else {
ASSERT(0); ASSERT(0);
} }
} }
} else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) { } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
int32_t lenTmp = numOfRows * sizeof(int32_t); int32_t lenTmp = numOfRows * sizeof(int32_t);
...@@ -1616,13 +1613,13 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int ...@@ -1616,13 +1613,13 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
break; break;
} }
} }
if(!needConvert) return TSDB_CODE_SUCCESS; if (!needConvert) return TSDB_CODE_SUCCESS;
char* p = (char*)pResultInfo->pData; char* p = (char*)pResultInfo->pData;
int32_t dataLen = estimateJsonLen(pResultInfo, numOfCols, numOfRows); int32_t dataLen = estimateJsonLen(pResultInfo, numOfCols, numOfRows);
pResultInfo->convertJson = taosMemoryCalloc(1, dataLen); pResultInfo->convertJson = taosMemoryCalloc(1, dataLen);
if(pResultInfo->convertJson == NULL) return TSDB_CODE_OUT_OF_MEMORY; if (pResultInfo->convertJson == NULL) return TSDB_CODE_OUT_OF_MEMORY;
char* p1 = pResultInfo->convertJson; char* p1 = pResultInfo->convertJson;
int32_t len = sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t)); int32_t len = sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t));
...@@ -1691,7 +1688,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int ...@@ -1691,7 +1688,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
ASSERT(0); ASSERT(0);
} }
offset1[j]= len; offset1[j] = len;
memcpy(pStart1 + len, dst, varDataTLen(dst)); memcpy(pStart1 + len, dst, varDataTLen(dst));
len += varDataTLen(dst); len += varDataTLen(dst);
} }
...@@ -1709,7 +1706,6 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int ...@@ -1709,7 +1706,6 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
pStart += len; pStart += len;
pStart1 += len; pStart1 += len;
memcpy(pStart1, pStart, colLen); memcpy(pStart1, pStart, colLen);
} }
pStart += colLen; pStart += colLen;
pStart1 += colLen1; pStart1 += colLen1;
...@@ -1777,7 +1773,7 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32 ...@@ -1777,7 +1773,7 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
pStart += colLength[i]; pStart += colLength[i];
} }
if(convertUcs4){ if (convertUcs4) {
code = doConvertUCS4(pResultInfo, numOfRows, numOfCols, colLength); code = doConvertUCS4(pResultInfo, numOfRows, numOfCols, colLength);
} }
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#include "tmsg.h" #include "tmsg.h"
#include "trpc.h" #include "trpc.h"
#include "tsched.h" #include "tsched.h"
// clang-format off
#define VALIDNUMOFCOLS(x) ((x) >= TSDB_MIN_COLUMNS && (x) <= TSDB_MAX_COLUMNS) #define VALIDNUMOFCOLS(x) ((x) >= TSDB_MIN_COLUMNS && (x) <= TSDB_MAX_COLUMNS)
#define VALIDNUMOFTAGS(x) ((x) >= 0 && (x) <= TSDB_MAX_TAGS) #define VALIDNUMOFTAGS(x) ((x) >= 0 && (x) <= TSDB_MAX_TAGS)
...@@ -146,13 +146,15 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra ...@@ -146,13 +146,15 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra
} }
memcpy(pMsg, pInfo->msgInfo.pData, pInfo->msgInfo.len); memcpy(pMsg, pInfo->msgInfo.pData, pInfo->msgInfo.len);
SRpcMsg rpcMsg = {.msgType = pInfo->msgType, SRpcMsg rpcMsg = {
.msgType = pInfo->msgType,
.pCont = pMsg, .pCont = pMsg,
.contLen = pInfo->msgInfo.len, .contLen = pInfo->msgInfo.len,
.info.ahandle = (void*)pInfo, .info.ahandle = (void*)pInfo,
.info.handle = pInfo->msgInfo.handle, .info.handle = pInfo->msgInfo.handle,
.info.persistHandle = persistHandle, .info.persistHandle = persistHandle,
.code = 0}; .code = 0
};
assert(pInfo->fp != NULL); assert(pInfo->fp != NULL);
TRACE_SET_ROOTID(&rpcMsg.info.traceId, pInfo->requestId); TRACE_SET_ROOTID(&rpcMsg.info.traceId, pInfo->requestId);
rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx); rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx);
...@@ -220,3 +222,4 @@ void destroyQueryExecRes(SQueryExecRes* pRes) { ...@@ -220,3 +222,4 @@ void destroyQueryExecRes(SQueryExecRes* pRes) {
qError("invalid exec result for request type %d", pRes->msgType); qError("invalid exec result for request type %d", pRes->msgType);
} }
} }
// clang-format on
...@@ -238,6 +238,32 @@ int transSendAsync(SAsyncPool* pool, queue* mq); ...@@ -238,6 +238,32 @@ int transSendAsync(SAsyncPool* pool, queue* mq);
} \ } \
} \ } \
} while (0) } while (0)
#define ASYNC_CHECK_HANDLE(exh1, refId) \
do { \
if (refId > 0) { \
tTrace("handle step1"); \
SExHandle* exh2 = transAcquireExHandle(refMgt, refId); \
if (exh2 == NULL || refId != exh2->refId) { \
tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \
exh2 ? exh2->refId : 0, refId); \
goto _return1; \
} \
} else if (refId == 0) { \
tTrace("handle step2"); \
SExHandle* exh2 = transAcquireExHandle(refMgt, refId); \
if (exh2 == NULL || refId != exh2->refId) { \
tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, refId, \
exh2 ? exh2->refId : 0); \
goto _return1; \
} else { \
refId = exh1->refId; \
} \
} else if (refId < 0) { \
tTrace("handle step3"); \
goto _return2; \
} \
} while (0)
int transInitBuffer(SConnBuffer* buf); int transInitBuffer(SConnBuffer* buf);
int transClearBuffer(SConnBuffer* buf); int transClearBuffer(SConnBuffer* buf);
int transDestroyBuffer(SConnBuffer* buf); int transDestroyBuffer(SConnBuffer* buf);
......
...@@ -25,7 +25,6 @@ typedef struct SCliConn { ...@@ -25,7 +25,6 @@ typedef struct SCliConn {
uv_write_t writeReq; uv_write_t writeReq;
void* hostThrd; void* hostThrd;
int hThrdIdx;
SConnBuffer readBuf; SConnBuffer readBuf;
STransQueue cliMsgs; STransQueue cliMsgs;
...@@ -36,6 +35,7 @@ typedef struct SCliConn { ...@@ -36,6 +35,7 @@ typedef struct SCliConn {
bool broken; // link broken or not bool broken; // link broken or not
ConnStatus status; // ConnStatus status; //
int64_t refId;
char* ip; char* ip;
uint32_t port; uint32_t port;
...@@ -168,16 +168,24 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { ...@@ -168,16 +168,24 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
snprintf(key, sizeof(key), "%s:%d", ip, (int)port); \ snprintf(key, sizeof(key), "%s:%d", ip, (int)port); \
} while (0) } while (0)
#define CONN_HOST_THREAD_IDX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1) #define CONN_HOST_THREAD_IDX1(idx, exh, refId, pThrd) \
do { \
if (exh == NULL) { \
idx = -1; \
} else { \
ASYNC_CHECK_HANDLE(exh, refId); \
pThrd = (SCliThrdObj*)exh->pThrd; \
} \
} while (0)
#define CONN_PERSIST_TIME(para) (para * 1000 * 10) #define CONN_PERSIST_TIME(para) (para * 1000 * 10)
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) #define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label)
#define CONN_SHOULD_RELEASE(conn, head) \ #define CONN_SHOULD_RELEASE(conn, head) \
do { \ do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
int status = conn->status; \
uint64_t ahandle = head->ahandle; \ uint64_t ahandle = head->ahandle; \
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); \ CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); \
conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \ transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \ transFreeMsg(transContFromHead((char*)head)); \
tDebug("%s conn %p receive release request, ref: %d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn)); \ tDebug("%s conn %p receive release request, ref: %d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn)); \
...@@ -186,7 +194,9 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { ...@@ -186,7 +194,9 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
} \ } \
destroyCmsg(pMsg); \ destroyCmsg(pMsg); \
cliReleaseUnfinishedMsg(conn); \ cliReleaseUnfinishedMsg(conn); \
if (status != ConnInPool) { \
addConnToPool(((SCliThrdObj*)conn->hostThrd)->pool, conn); \ addConnToPool(((SCliThrdObj*)conn->hostThrd)->pool, conn); \
} \
return; \ return; \
} \ } \
} while (0) } while (0)
...@@ -323,23 +333,29 @@ void cliHandleResp(SCliConn* conn) { ...@@ -323,23 +333,29 @@ void cliHandleResp(SCliConn* conn) {
transClearBuffer(&conn->readBuf); transClearBuffer(&conn->readBuf);
if (!CONN_NO_PERSIST_BY_APP(conn)) { if (!CONN_NO_PERSIST_BY_APP(conn)) {
transMsg.info.handle = conn; SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
exh->handle = conn;
exh->pThrd = pThrd;
exh->refId = transAddExHandle(refMgt, exh);
transMsg.info.handle = exh;
transMsg.info.refId = exh->refId;
conn->refId = exh->refId;
tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
} }
// char buf[64] = {0};
// TRACE_TO_STR(&transMsg.info.traceId, buf);
STraceId* trace = &transMsg.info.traceId; STraceId* trace = &transMsg.info.traceId;
tGTrace("conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, code: %d", conn, TMSG_INFO(pHead->msgType), tGTrace("%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, code: %d", CONN_GET_INST_LABEL(conn),
taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), taosInetNtoa(conn->localAddr.sin_addr), conn, TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
ntohs(conn->localAddr.sin_port), transMsg.contLen, transMsg.code); taosInetNtoa(conn->localAddr.sin_addr), ntohs(conn->localAddr.sin_port), transMsg.contLen, transMsg.code);
if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) { if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
tDebug("%s except, server continue send while cli ignore it", CONN_GET_INST_LABEL(conn)); tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
// transUnrefCliHandle(conn); // transUnrefCliHandle(conn);
return; return;
} }
if (CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) { if (CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
tDebug("%s except, server continue send while cli ignore it", CONN_GET_INST_LABEL(conn)); tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
// transUnrefCliHandle(conn); // transUnrefCliHandle(conn);
return; return;
} }
...@@ -477,9 +493,8 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { ...@@ -477,9 +493,8 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
return NULL; return NULL;
} }
queue* h = QUEUE_HEAD(&plist->conn); queue* h = QUEUE_HEAD(&plist->conn);
// //QUEUE_REMOVE(h);
SCliConn* conn = QUEUE_DATA(h, SCliConn, conn); SCliConn* conn = QUEUE_DATA(h, SCliConn, conn);
// conn->status = ConnNormal; conn->status = ConnNormal;
QUEUE_REMOVE(&conn->conn); QUEUE_REMOVE(&conn->conn);
QUEUE_INIT(&conn->conn); QUEUE_INIT(&conn->conn);
return conn; return conn;
...@@ -559,6 +574,13 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) { ...@@ -559,6 +574,13 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) {
conn->status = ConnNormal; conn->status = ConnNormal;
conn->broken = 0; conn->broken = 0;
transRefCliHandle(conn); transRefCliHandle(conn);
SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
exh->handle = conn;
exh->pThrd = pThrd;
exh->refId = transAddExHandle(refMgt, exh);
conn->refId = exh->refId;
return conn; return conn;
} }
static void cliDestroyConn(SCliConn* conn, bool clear) { static void cliDestroyConn(SCliConn* conn, bool clear) {
...@@ -566,6 +588,7 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { ...@@ -566,6 +588,7 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
QUEUE_REMOVE(&conn->conn); QUEUE_REMOVE(&conn->conn);
QUEUE_INIT(&conn->conn); QUEUE_INIT(&conn->conn);
transRemoveExHandle(refMgt, conn->refId);
if (clear) { if (clear) {
uv_close((uv_handle_t*)conn->stream, cliDestroy); uv_close((uv_handle_t*)conn->stream, cliDestroy);
} }
...@@ -650,12 +673,10 @@ void cliSend(SCliConn* pConn) { ...@@ -650,12 +673,10 @@ void cliSend(SCliConn* pConn) {
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
// char buf[64] = {0};
// TRACE_TO_STR(&pMsg->info.traceId, buf);
STraceId* trace = &pMsg->info.traceId; STraceId* trace = &pMsg->info.traceId;
tGTrace("conn %p %s is sent to %s:%d, local info %s:%d", pConn, TMSG_INFO(pHead->msgType), tGTrace("%s conn %p %s is sent to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn,
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr), TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
ntohs(pConn->localAddr.sin_port)); taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port));
if (pHead->persist == 1) { if (pHead->persist == 1) {
CONN_SET_PERSIST_BY_APP(pConn); CONN_SET_PERSIST_BY_APP(pConn);
...@@ -663,7 +684,6 @@ void cliSend(SCliConn* pConn) { ...@@ -663,7 +684,6 @@ void cliSend(SCliConn* pConn) {
pConn->writeReq.data = pConn; pConn->writeReq.data = pConn;
uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
return; return;
_RETURN: _RETURN:
return; return;
...@@ -724,20 +744,32 @@ static void cliHandleUpdate(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -724,20 +744,32 @@ static void cliHandleUpdate(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn* conn = NULL; SCliConn* conn = NULL;
if (pMsg->msg.info.handle != NULL) { SRpcHandleInfo* pInfo = &pMsg->msg.info;
conn = (SCliConn*)(pMsg->msg.info.handle);
if (conn != NULL) { SExHandle* exh = transAcquireExHandle(refMgt, pInfo->refId);
tTrace("%s conn %p reused", CONN_GET_INST_LABEL(conn), conn); if (exh == NULL) {
if (pInfo->refId != 0) {
tTrace("%s conn %p ignore msg", CONN_GET_INST_LABEL(conn), conn);
assert(0);
return NULL;
} }
} else { } else {
transReleaseExHandle(refMgt, pInfo->refId);
return exh->handle;
}
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet)); conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet));
if (conn != NULL) { if (conn != NULL) {
exh = taosMemoryCalloc(1, sizeof(SExHandle));
exh->handle = conn;
exh->pThrd = pThrd;
exh->refId = transAddExHandle(refMgt, exh);
conn->refId = exh->refId;
tTrace("%s conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn); tTrace("%s conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn);
} else { } else {
tTrace("%s not found conn in conn pool %p", ((STrans*)pThrd->pTransInst)->label, pThrd->pool); tTrace("%s not found conn in conn pool %p", ((STrans*)pThrd->pTransInst)->label, pThrd->pool);
} }
}
return conn; return conn;
} }
void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) { void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) {
...@@ -765,8 +797,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -765,8 +797,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn* conn = cliGetConn(pMsg, pThrd); SCliConn* conn = cliGetConn(pMsg, pThrd);
if (conn != NULL) { if (conn != NULL) {
conn->hThrdIdx = pCtx->hThrdIdx;
transCtxMerge(&conn->ctx, &pCtx->appCtx); transCtxMerge(&conn->ctx, &pCtx->appCtx);
transQueuePush(&conn->cliMsgs, pMsg); transQueuePush(&conn->cliMsgs, pMsg);
cliSend(conn); cliSend(conn);
...@@ -775,7 +805,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -775,7 +805,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
transCtxMerge(&conn->ctx, &pCtx->appCtx); transCtxMerge(&conn->ctx, &pCtx->appCtx);
transQueuePush(&conn->cliMsgs, pMsg); transQueuePush(&conn->cliMsgs, pMsg);
conn->hThrdIdx = pCtx->hThrdIdx;
conn->ip = strdup(EPSET_GET_INUSE_IP(&pCtx->epSet)); conn->ip = strdup(EPSET_GET_INUSE_IP(&pCtx->epSet));
conn->port = EPSET_GET_INUSE_PORT(&pCtx->epSet); conn->port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
...@@ -783,7 +812,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -783,7 +812,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
if (ret) { if (ret) {
tError("%s conn %p failed to set conn option, errmsg %s", transLabel(pTransInst), conn, uv_err_name(ret)); tError("%s conn %p failed to set conn option, errmsg %s", transLabel(pTransInst), conn, uv_err_name(ret));
} }
int fd = taosCreateSocketWithTimeOutOpt(TRANS_CONN_TIMEOUT); int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT);
if (fd == -1) { if (fd == -1) {
tTrace("%s conn %p failed to create socket", transLabel(pTransInst), conn); tTrace("%s conn %p failed to create socket", transLabel(pTransInst), conn);
cliHandleExcept(conn); cliHandleExcept(conn);
...@@ -1009,7 +1038,9 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { ...@@ -1009,7 +1038,9 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
tTrace("%s use remote epset, inUse: %d, retry count:%d, limit: %d", pTransInst->label, pEpSet->inUse, tTrace("%s use remote epset, inUse: %d, retry count:%d, limit: %d", pTransInst->label, pEpSet->inUse,
pCtx->retryCount + 1, TRANS_RETRY_COUNT_LIMIT); pCtx->retryCount + 1, TRANS_RETRY_COUNT_LIMIT);
} }
if (pConn->status != ConnInPool) {
addConnToPool(pThrd->pool, pConn); addConnToPool(pThrd->pool, pConn);
}
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
arg->param1 = pMsg; arg->param1 = pMsg;
...@@ -1087,9 +1118,20 @@ void transReleaseCliHandle(void* handle) { ...@@ -1087,9 +1118,20 @@ void transReleaseCliHandle(void* handle) {
void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
STrans* pTransInst = (STrans*)shandle; STrans* pTransInst = (STrans*)shandle;
int idx = CONN_HOST_THREAD_IDX((SCliConn*)pReq->info.handle); SRpcHandleInfo* info = &pReq->info;
int idx = -1;
SCliThrdObj* pThrd = NULL;
SExHandle* exh = info->handle;
int64_t refId = -1;
if (exh != NULL) {
refId = exh->refId;
}
CONN_HOST_THREAD_IDX1(idx, exh, refId, pThrd);
if (idx == -1) { if (idx == -1) {
idx = cliRBChoseIdx(pTransInst); idx = cliRBChoseIdx(pTransInst);
pThrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[idx];
} }
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
...@@ -1097,7 +1139,6 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra ...@@ -1097,7 +1139,6 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra
pCtx->epSet = *pEpSet; pCtx->epSet = *pEpSet;
pCtx->ahandle = pReq->info.ahandle; pCtx->ahandle = pReq->info.ahandle;
pCtx->msgType = pReq->msgType; pCtx->msgType = pReq->msgType;
pCtx->hThrdIdx = idx;
if (ctx != NULL) { if (ctx != NULL) {
pCtx->appCtx = *ctx; pCtx->appCtx = *ctx;
...@@ -1110,19 +1151,31 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra ...@@ -1110,19 +1151,31 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra
cliMsg->st = taosGetTimestampUs(); cliMsg->st = taosGetTimestampUs();
cliMsg->type = Normal; cliMsg->type = Normal;
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[idx];
STraceId* trace = &pReq->info.traceId; STraceId* trace = &pReq->info.traceId;
tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), thrd->pid, tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
ASSERT(transSendAsync(thrd->asyncPool, &(cliMsg->q)) == 0); ASSERT(transSendAsync(pThrd->asyncPool, &(cliMsg->q)) == 0);
_return1:
return;
_return2:
return;
} }
void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) { void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
STrans* pTransInst = (STrans*)shandle; STrans* pTransInst = (STrans*)shandle;
int idx = CONN_HOST_THREAD_IDX(pReq->info.handle); SRpcHandleInfo* info = &pReq->info;
SCliThrdObj* pThrd = NULL;
int idx = -1;
SExHandle* exh = info->handle;
int64_t refId = -1;
if (exh != NULL) {
refId = exh->refId;
}
CONN_HOST_THREAD_IDX1(idx, exh, refId, pThrd);
if (idx == -1) { if (idx == -1) {
idx = cliRBChoseIdx(pTransInst); idx = cliRBChoseIdx(pTransInst);
pThrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[idx];
} }
tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t)); tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t));
tsem_init(sem, 0, 0); tsem_init(sem, 0, 0);
...@@ -1133,7 +1186,6 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM ...@@ -1133,7 +1186,6 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
pCtx->epSet = *pEpSet; pCtx->epSet = *pEpSet;
pCtx->ahandle = pReq->info.ahandle; pCtx->ahandle = pReq->info.ahandle;
pCtx->msgType = pReq->msgType; pCtx->msgType = pReq->msgType;
pCtx->hThrdIdx = idx;
pCtx->pSem = sem; pCtx->pSem = sem;
pCtx->pRsp = pRsp; pCtx->pRsp = pRsp;
...@@ -1143,16 +1195,18 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM ...@@ -1143,16 +1195,18 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
cliMsg->st = taosGetTimestampUs(); cliMsg->st = taosGetTimestampUs();
cliMsg->type = Normal; cliMsg->type = Normal;
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[idx];
STraceId* trace = &pReq->info.traceId; STraceId* trace = &pReq->info.traceId;
tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), thrd->pid, tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
transSendAsync(thrd->asyncPool, &(cliMsg->q)); transSendAsync(pThrd->asyncPool, &(cliMsg->q));
tsem_wait(sem); tsem_wait(sem);
tsem_destroy(sem); tsem_destroy(sem);
taosMemoryFree(sem); taosMemoryFree(sem);
_return1:
return;
_return2:
return;
} }
/* /*
* *
...@@ -1168,7 +1222,6 @@ void transSetDefaultAddr(void* ahandle, const char* ip, const char* fqdn) { ...@@ -1168,7 +1222,6 @@ void transSetDefaultAddr(void* ahandle, const char* ip, const char* fqdn) {
} }
for (int i = 0; i < pTransInst->numOfThreads; i++) { for (int i = 0; i < pTransInst->numOfThreads; i++) {
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
pCtx->hThrdIdx = i;
pCtx->cvtAddr = cvtAddr; pCtx->cvtAddr = cvtAddr;
SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
......
...@@ -455,7 +455,7 @@ void transPrintEpSet(SEpSet* pEpSet) { ...@@ -455,7 +455,7 @@ void transPrintEpSet(SEpSet* pEpSet) {
return; return;
} }
char buf[512] = {0}; char buf[512] = {0};
int len = snprintf(buf, sizeof(buf), "epset { "); int len = snprintf(buf, sizeof(buf), "epset:{ ");
for (int i = 0; i < pEpSet->numOfEps; i++) { for (int i = 0; i < pEpSet->numOfEps; i++) {
if (i == pEpSet->numOfEps - 1) { if (i == pEpSet->numOfEps - 1) {
len += snprintf(buf + len, sizeof(buf) - len, "%d. %s:%d ", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); len += snprintf(buf + len, sizeof(buf) - len, "%d. %s:%d ", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
......
...@@ -206,32 +206,6 @@ static bool addHandleToAcceptloop(void* arg); ...@@ -206,32 +206,6 @@ static bool addHandleToAcceptloop(void* arg);
} \ } \
} while (0) } while (0)
#define ASYNC_CHECK_HANDLE(exh1, refId) \
do { \
if (refId > 0) { \
tTrace("handle step1"); \
SExHandle* exh2 = transAcquireExHandle(refMgt, refId); \
if (exh2 == NULL || refId != exh2->refId) { \
tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \
exh2 ? exh2->refId : 0, refId); \
goto _return1; \
} \
} else if (refId == 0) { \
tTrace("handle step2"); \
SExHandle* exh2 = transAcquireExHandle(refMgt, refId); \
if (exh2 == NULL || refId != exh2->refId) { \
tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, refId, \
exh2 ? exh2->refId : 0); \
goto _return1; \
} else { \
refId = exh1->refId; \
} \
} else if (refId < 0) { \
tTrace("handle step3"); \
goto _return2; \
} \
} while (0)
void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
SSvrConn* conn = handle->data; SSvrConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf; SConnBuffer* pBuf = &conn->readBuf;
......
...@@ -946,7 +946,7 @@ int32_t taosGetFqdn(char *fqdn) { ...@@ -946,7 +946,7 @@ int32_t taosGetFqdn(char *fqdn) {
#endif // __APPLE__ #endif // __APPLE__
int32_t ret = getaddrinfo(hostname, NULL, &hints, &result); int32_t ret = getaddrinfo(hostname, NULL, &hints, &result);
if (!result) { if (!result) {
fprintf(stderr,"failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret)); fprintf(stderr, "failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret));
return -1; return -1;
} }
...@@ -1073,7 +1073,7 @@ int32_t taosCloseEpoll(TdEpollPtr *ppEpoll) { ...@@ -1073,7 +1073,7 @@ int32_t taosCloseEpoll(TdEpollPtr *ppEpoll) {
* Set TCP connection timeout per-socket level. * Set TCP connection timeout per-socket level.
* ref [https://github.com/libuv/help/issues/54] * ref [https://github.com/libuv/help/issues/54]
*/ */
int taosCreateSocketWithTimeOutOpt(uint32_t conn_timeout_sec) { int32_t taosCreateSocketWithTimeout(uint32_t timeout) {
#if defined(WINDOWS) #if defined(WINDOWS)
SOCKET fd; SOCKET fd;
#else #else
...@@ -1083,11 +1083,11 @@ int taosCreateSocketWithTimeOutOpt(uint32_t conn_timeout_sec) { ...@@ -1083,11 +1083,11 @@ int taosCreateSocketWithTimeOutOpt(uint32_t conn_timeout_sec) {
return -1; return -1;
} }
#if defined(WINDOWS) #if defined(WINDOWS)
if (0 != setsockopt(fd, IPPROTO_TCP, TCP_MAXRT, (char *)&conn_timeout_sec, sizeof(conn_timeout_sec))) { if (0 != setsockopt(fd, IPPROTO_TCP, TCP_MAXRT, (char *)&timeout, sizeof(timeout))) {
return -1; return -1;
} }
#else // Linux like systems #else // Linux like systems
uint32_t conn_timeout_ms = conn_timeout_sec * 1000; uint32_t conn_timeout_ms = timeout * 1000;
if (0 != setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) { if (0 != setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) {
return -1; return -1;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册