diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index d86a8dae6e5a682fd77422b77682ac1d82dd2137..0b767e96f64e7f4528b51480d907b21ac2d0c2ad 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -16,6 +16,7 @@ #ifndef _TD_QUERY_H_ #define _TD_QUERY_H_ +// clang-foramt off #ifdef __cplusplus extern "C" { #endif @@ -71,7 +72,7 @@ typedef struct SIndexMeta { } SIndexMeta; typedef struct STbVerInfo { - char tbFName[TSDB_TABLE_FNAME_LEN]; + char tbFName[TSDB_TABLE_FNAME_LEN]; int32_t sversion; int32_t tversion; } STbVerInfo; @@ -141,7 +142,7 @@ typedef struct SDataBuf { typedef struct STargetInfo { ETargetType type; - char* dbFName; // used to update db's vgroup epset + char* dbFName; // used to update db's vgroup epset int32_t vgId; } STargetInfo; @@ -149,15 +150,15 @@ typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32 typedef int32_t (*__async_exec_fn_t)(void* param); typedef struct SRequestConnInfo { - void* pTrans; - uint64_t requestId; - int64_t requestObjRefId; - SEpSet mgmtEps; + void* pTrans; + uint64_t requestId; + int64_t requestObjRefId; + SEpSet mgmtEps; } SRequestConnInfo; typedef struct SMsgSendInfo { - __async_send_cb_fn_t fp; // async callback function - STargetInfo target; // for update epset + __async_send_cb_fn_t fp; // async callback function + STargetInfo target; // for update epset void* param; uint64_t requestId; uint64_t requestObjRefId; @@ -206,13 +207,15 @@ int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STabl char* jobTaskStatusStr(int32_t status); SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name); -void destroyQueryExecRes(SQueryExecRes* pRes); -int32_t dataConverToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *len); -char* parseTagDatatoJson(void* p); + +void destroyQueryExecRes(SQueryExecRes* pRes); +int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t* len); +char* parseTagDatatoJson(void* p); int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst); int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst); -extern int32_t (*queryBuildMsg[TDMT_MAX])(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallocFp)(int32_t)); +extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char** msg, int32_t msgSize, int32_t* msgLen, + void* (*mallocFp)(int32_t)); extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t msgSize); #define SET_META_TYPE_NULL(t) (t) = META_TYPE_NULL_TABLE @@ -223,7 +226,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t #define NEED_CLIENT_RM_TBLMETA_ERROR(_code) \ ((_code) == TSDB_CODE_PAR_TABLE_NOT_EXIST || (_code) == TSDB_CODE_VND_TB_NOT_EXIST || \ (_code) == TSDB_CODE_PAR_INVALID_COLUMNS_NUM || (_code) == TSDB_CODE_PAR_INVALID_COLUMN || \ - (_code) == TSDB_CODE_PAR_TAGS_NOT_MATCHED || (_code) == TSDB_CODE_PAR_VALUE_TOO_LONG || \ + (_code) == TSDB_CODE_PAR_TAGS_NOT_MATCHED || (_code) == TSDB_CODE_PAR_VALUE_TOO_LONG || \ (_code) == TSDB_CODE_PAR_INVALID_DROP_COL || ((_code) == TSDB_CODE_TDB_INVALID_TABLE_ID)) #define NEED_CLIENT_REFRESH_VG_ERROR(_code) \ ((_code) == TSDB_CODE_VND_HASH_MISMATCH || (_code) == TSDB_CODE_VND_INVALID_VGROUP_ID) @@ -231,11 +234,13 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t #define NEED_CLIENT_HANDLE_ERROR(_code) \ (NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \ NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code)) -#define NEED_CLIENT_RM_TBLMETA_REQ(_type) ((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_VND_CREATE_STB \ - || (_type) == TDMT_VND_DROP_TABLE || (_type) == TDMT_VND_DROP_STB) +#define NEED_CLIENT_RM_TBLMETA_REQ(_type) \ + ((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_VND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \ + (_type) == TDMT_VND_DROP_STB) -#define NEED_SCHEDULER_RETRY_ERROR(_code) \ - ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR) +#define NEED_SCHEDULER_RETRY_ERROR(_code) \ + ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \ + (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR) #define REQUEST_TOTAL_EXEC_TIMES 2 @@ -312,3 +317,4 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t #endif #endif /*_TD_QUERY_H_*/ + // clang-foramt on diff --git a/include/os/osSocket.h b/include/os/osSocket.h index 213a6930eebb4fe7a97bf45f1f5db80a5d026a22..9dd5b972fa33b9fa47d7841ebdf0de22f683c81a 100644 --- a/include/os/osSocket.h +++ b/include/os/osSocket.h @@ -157,7 +157,10 @@ int32_t taosNonblockwrite(TdSocketPtr pSocket, char *ptr, int32_t nbytes); int64_t taosCopyFds(TdSocketPtr pSrcSocket, TdSocketPtr pDestSocket, int64_t len); void taosWinSocketInit(); -int taosCreateSocketWithTimeOutOpt(uint32_t conn_timeout_sec); +/* + * set timeout(ms) + */ +int32_t taosCreateSocketWithTimeout(uint32_t timeout); TdSocketPtr taosOpenUdpSocket(uint32_t localIp, uint16_t localPort); TdSocketPtr taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp); diff --git a/include/util/ttrace.h b/include/util/ttrace.h index 206cbbf28d9a785873333e3d32d7a67a0115d603..579768228a6086fda8656b0dca1430e7da06f9a6 100644 --- a/include/util/ttrace.h +++ b/include/util/ttrace.h @@ -45,9 +45,11 @@ typedef struct STraceId { #define TRACE_GET_MSGID(traceId) (traceId)->msgId -#define TRACE_TO_STR(traceId, buf) \ - do { \ - sprintf(buf, "0x%" PRIx64 ":0x%" PRIx64 "", traceId->rootId, traceId->msgId); \ +#define TRACE_TO_STR(traceId, buf) \ + do { \ + int64_t rootId = (traceId) != NULL ? (traceId)->rootId : 0; \ + int64_t msgId = (traceId) != NULL ? (traceId)->msgId : 0; \ + sprintf(buf, "0x%" PRIx64 ":0x%" PRIx64 "", rootId, msgId); \ } while (0) #ifdef __cplusplus diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 9f04e89694f77153b747892872d0cdb0c173ca6f..d7bf4b60f1fdaacb1cbb69fa591166cd0cf9886d 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -13,11 +13,11 @@ * along with this program. If not, see . */ -#include "os.h" #include "catalog.h" -#include "functionMgt.h" #include "clientInt.h" #include "clientLog.h" +#include "functionMgt.h" +#include "os.h" #include "query.h" #include "scheduler.h" #include "tcache.h" @@ -38,7 +38,7 @@ static TdThreadOnce tscinit = PTHREAD_ONCE_INIT; volatile int32_t tscInitRes = 0; static void registerRequest(SRequestObj *pRequest) { - STscObj *pTscObj = acquireTscObj(*(int64_t*)pRequest->pTscObj->id); + STscObj *pTscObj = acquireTscObj(*(int64_t *)pRequest->pTscObj->id); assert(pTscObj != NULL); @@ -54,14 +54,14 @@ static void registerRequest(SRequestObj *pRequest) { int32_t currentInst = atomic_add_fetch_64((int64_t *)&pSummary->currentRequests, 1); tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 ", current:%d, app current:%d, total:%d, reqId:0x%" PRIx64, - pRequest->self, *(int64_t*)pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId); + pRequest->self, *(int64_t *)pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId); } } static void deregisterRequest(SRequestObj *pRequest) { assert(pRequest != NULL); - STscObj *pTscObj = pRequest->pTscObj; + STscObj * pTscObj = pRequest->pTscObj; SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary; int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1); @@ -70,8 +70,8 @@ static void deregisterRequest(SRequestObj *pRequest) { int64_t duration = taosGetTimestampUs() - pRequest->metric.start; tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64 " ms, current:%d, app current:%d", - pRequest->self, *(int64_t*)pTscObj->id, pRequest->requestId, duration / 1000, num, currentInst); - releaseTscObj(*(int64_t*)pTscObj->id); + pRequest->self, *(int64_t *)pTscObj->id, pRequest->requestId, duration / 1000, num, currentInst); + releaseTscObj(*(int64_t *)pTscObj->id); } // todo close the transporter properly @@ -80,12 +80,13 @@ void closeTransporter(STscObj *pTscObj) { return; } - tscDebug("free transporter:%p in connObj: 0x%" PRIx64, pTscObj->pAppInfo->pTransporter, *(int64_t*)pTscObj->id); + tscDebug("free transporter:%p in connObj: 0x%" PRIx64, pTscObj->pAppInfo->pTransporter, *(int64_t *)pTscObj->id); rpcClose(pTscObj->pAppInfo->pTransporter); } static bool clientRpcRfp(int32_t code) { - if (code == TSDB_CODE_RPC_REDIRECT) { + if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED || + code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) { return true; } else { return false; @@ -128,16 +129,17 @@ void closeAllRequests(SHashObj *pRequests) { void destroyTscObj(void *pObj) { STscObj *pTscObj = pObj; - SClientHbKey connKey = {.tscRid = *(int64_t*)pTscObj->id, .connType = pTscObj->connType}; + SClientHbKey connKey = {.tscRid = *(int64_t *)pTscObj->id, .connType = pTscObj->connType}; hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey); int64_t connNum = atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); closeAllRequests(pTscObj->pRequests); schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter); if (0 == connNum) { - // TODO - //closeTransporter(pTscObj); + // TODO + // closeTransporter(pTscObj); } - tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, *(int64_t*)pTscObj->id, pTscObj->pAppInfo->numOfConns); + tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, *(int64_t *)pTscObj->id, + pTscObj->pAppInfo->numOfConns); taosThreadMutexDestroy(&pTscObj->mutex); taosMemoryFreeClear(pTscObj); } @@ -167,10 +169,10 @@ void *createTscObj(const char *user, const char *auth, const char *db, int32_t c taosThreadMutexInit(&pObj->mutex, NULL); pObj->id = taosMemoryMalloc(sizeof(int64_t)); - *(int64_t*)pObj->id = taosAddRef(clientConnRefPool, pObj); + *(int64_t *)pObj->id = taosAddRef(clientConnRefPool, pObj); pObj->schemalessType = 1; - tscDebug("connObj created, 0x%" PRIx64, *(int64_t*)pObj->id); + tscDebug("connObj created, 0x%" PRIx64, *(int64_t *)pObj->id); return pObj; } @@ -325,7 +327,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) { return 0; } - SConfig *pCfg = taosGetCfg(); + SConfig * pCfg = taosGetCfg(); SConfigItem *pItem = NULL; switch (option) { diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 489966b6369f6100f3867cb8f613e09f42062134..ac9daa5119d41d58c7533b81af45f74503feb013 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -617,12 +617,12 @@ int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNod .requestId = pRequest->requestId, .requestObjRefId = pRequest->self}; SSchedulerReq req = {.pConn = &conn, - .pNodeList = pNodeList, - .pDag = pDag, - .sql = pRequest->sqlstr, - .startTs = pRequest->metric.start, - .fp = schdExecCallback, - .cbParam = &res}; + .pNodeList = pNodeList, + .pDag = pDag, + .sql = pRequest->sqlstr, + .startTs = pRequest->metric.start, + .fp = schdExecCallback, + .cbParam = &res}; int32_t code = schedulerAsyncExecJob(&req, &pRequest->body.queryJob); @@ -669,13 +669,13 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList .requestId = pRequest->requestId, .requestObjRefId = pRequest->self}; SSchedulerReq req = {.pConn = &conn, - .pNodeList = pNodeList, - .pDag = pDag, - .sql = pRequest->sqlstr, - .startTs = pRequest->metric.start, - .fp = NULL, - .cbParam = NULL, - .reqKilled = &pRequest->killed}; + .pNodeList = pNodeList, + .pDag = pDag, + .sql = pRequest->sqlstr, + .startTs = pRequest->metric.start, + .fp = NULL, + .cbParam = NULL, + .reqKilled = &pRequest->killed}; int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob, &res); pRequest->body.resInfo.execRes = res.res; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 63d2a65df144340624acac260b50bdd1406d418b..a4745abd5b7361bcd0f743a0a2dc37f2dc6804f7 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -70,9 +70,9 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { } static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { - SDnodeTrans *pTrans = &pDnode->trans; + SDnodeTrans * pTrans = &pDnode->trans; int32_t code = -1; - SRpcMsg *pMsg = NULL; + SRpcMsg * pMsg = NULL; SMgmtWrapper *pWrapper = NULL; SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)]; @@ -194,11 +194,11 @@ int32_t dmInitMsgHandle(SDnode *pDnode) { for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) { SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; - SArray *pArray = (*pWrapper->func.getHandlesFp)(); + SArray * pArray = (*pWrapper->func.getHandlesFp)(); if (pArray == NULL) return -1; for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { - SMgmtHandle *pMgmt = taosArrayGet(pArray, i); + SMgmtHandle * pMgmt = taosArrayGet(pArray, i); SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)]; if (pMgmt->needCheckVgId) { pHandle->needCheckVgId = pMgmt->needCheckVgId; @@ -248,7 +248,14 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { } } -static bool rpcRfp(int32_t code) { return code == TSDB_CODE_RPC_REDIRECT; } +static bool rpcRfp(int32_t code) { + if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED || + code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) { + return true; + } else { + return false; + } +} int32_t dmInitClient(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 838071dbf1e7322c50d6cb62cacdfd49b41df07c..983cffe9dc4dca0aff5264e4991fd7e61ebb5789 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -12,6 +12,8 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ + +// clang-format off #include "uv.h" #include "os.h" #include "fnLog.h" @@ -25,6 +27,7 @@ #include "tglobal.h" #include "tmsg.h" #include "trpc.h" +// clang-foramt on typedef struct SUdfdContext { uv_loop_t * loop; @@ -103,12 +106,12 @@ typedef struct SUdfdRpcSendRecvInfo { uv_sem_t resultSem; } SUdfdRpcSendRecvInfo; -static void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); +static void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); static int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf); static int32_t udfdConnectToMnode(); static int32_t udfdLoadUdf(char *udfName, SUdf *udf); -static bool udfdRpcRfp(int32_t code); -static int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet); +static bool udfdRpcRfp(int32_t code); +static int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet); static int32_t udfdOpenClientRpc(); static int32_t udfdCloseClientRpc(); @@ -126,19 +129,19 @@ static void udfdUvHandleError(SUdfdUvConn *conn) { uv_close((uv_handle_t *)conn- static void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf); static void udfdOnNewConnection(uv_stream_t *server, int status); -static void udfdIntrSignalHandler(uv_signal_t *handle, int signum); +static void udfdIntrSignalHandler(uv_signal_t *handle, int signum); static int32_t removeListeningPipe(); -static void udfdPrintVersion(); +static void udfdPrintVersion(); static int32_t udfdParseArgs(int32_t argc, char *argv[]); static int32_t udfdInitLog(); -static void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf); -static void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf); +static void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf); +static void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf); static int32_t udfdUvInit(); -static void udfdCloseWalkCb(uv_handle_t *handle, void *arg); +static void udfdCloseWalkCb(uv_handle_t *handle, void *arg); static int32_t udfdRun(); -static void udfdConnectMnodeThreadFunc(void* args); +static void udfdConnectMnodeThreadFunc(void *args); void udfdProcessRequest(uv_work_t *req) { SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data); @@ -401,11 +404,11 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { udf->bufSize = pFuncInfo->bufSize; char path[PATH_MAX] = {0}; - #ifdef WINDOWS +#ifdef WINDOWS snprintf(path, sizeof(path), "%s%s.dll", TD_TMP_DIR_PATH, pFuncInfo->name); - #else +#else snprintf(path, sizeof(path), "%s/lib%s.so", TD_TMP_DIR_PATH, pFuncInfo->name); - #endif +#endif TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL); if (file == NULL) { @@ -544,7 +547,8 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) { return 0; } static bool udfdRpcRfp(int32_t code) { - if (code == TSDB_CODE_RPC_REDIRECT) { + if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED || + code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) { return true; } else { return false; @@ -652,8 +656,7 @@ void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) { buf->base = ctx->inputBuf; buf->len = ctx->inputCap; } else { - fnError("udfd can not allocate enough memory") - buf->base = NULL; + fnError("udfd can not allocate enough memory") buf->base = NULL; buf->len = 0; } } else { @@ -664,8 +667,7 @@ void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) { buf->base = ctx->inputBuf + ctx->inputLen; buf->len = ctx->inputCap - ctx->inputLen; } else { - fnError("udfd can not allocate enough memory") - buf->base = NULL; + fnError("udfd can not allocate enough memory") buf->base = NULL; buf->len = 0; } } @@ -881,7 +883,7 @@ static int32_t udfdRun() { return 0; } -void udfdConnectMnodeThreadFunc(void* args) { +void udfdConnectMnodeThreadFunc(void *args) { int32_t retryMnodeTimes = 0; int32_t code = 0; while (retryMnodeTimes++ <= TSDB_MAX_REPLICA) { @@ -939,7 +941,7 @@ int main(int argc, char *argv[]) { uv_thread_create(&mnodeConnectThread, udfdConnectMnodeThreadFunc, NULL); udfdRun(); - + removeListeningPipe(); udfdCloseClientRpc(); diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index c807c1c4cdaa0e958df0842358a741e5ef73ddba..bee0461e58d30ca5933c20642f0c2a727ceab900 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -19,6 +19,7 @@ #include "tmsg.h" #include "trpc.h" #include "tsched.h" +// clang-format off #include "cJSON.h" #define VALIDNUMOFCOLS(x) ((x) >= TSDB_MIN_COLUMNS && (x) <= TSDB_MAX_COLUMNS) @@ -147,13 +148,15 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra } memcpy(pMsg, pInfo->msgInfo.pData, pInfo->msgInfo.len); - SRpcMsg rpcMsg = {.msgType = pInfo->msgType, - .pCont = pMsg, - .contLen = pInfo->msgInfo.len, - .info.ahandle = (void*)pInfo, - .info.handle = pInfo->msgInfo.handle, - .info.persistHandle = persistHandle, - .code = 0}; + SRpcMsg rpcMsg = { + .msgType = pInfo->msgType, + .pCont = pMsg, + .contLen = pInfo->msgInfo.len, + .info.ahandle = (void*)pInfo, + .info.handle = pInfo->msgInfo.handle, + .info.persistHandle = persistHandle, + .code = 0 + }; assert(pInfo->fp != NULL); TRACE_SET_ROOTID(&rpcMsg.info.traceId, pInfo->requestId); rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx); @@ -221,8 +224,9 @@ void destroyQueryExecRes(SQueryExecRes* pRes) { qError("invalid exec result for request type %d", pRes->msgType); } } +// clang-format on -int32_t dataConverToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *len) { +int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t* len) { int32_t n = 0; switch (type) { @@ -262,7 +266,7 @@ int32_t dataConverToStr(char *str, int type, void *buf, int32_t bufSize, int32_t case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: if (bufSize < 0) { -// tscError("invalid buf size"); + // tscError("invalid buf size"); return TSDB_CODE_TSC_INVALID_VALUE; } @@ -289,7 +293,7 @@ int32_t dataConverToStr(char *str, int type, void *buf, int32_t bufSize, int32_t break; default: -// tscError("unsupported type:%d", type); + // tscError("unsupported type:%d", type); return TSDB_CODE_TSC_INVALID_VALUE; } @@ -332,7 +336,7 @@ char* parseTagDatatoJson(void* p) { int32_t length = taosUcs4ToMbs((TdUcs4*)pTagVal->pData, pTagVal->nData, tagJsonValue); if (length < 0) { qError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, - pTagVal->pData); + pTagVal->pData); taosMemoryFree(tagJsonValue); goto end; } @@ -372,7 +376,6 @@ end: return string; } - int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) { if (NULL == pSrc) { *pDst = NULL; @@ -393,37 +396,36 @@ int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) { *pDst = NULL; return TSDB_CODE_SUCCESS; } - + *pDst = taosMemoryMalloc(sizeof(*pSrc)); if (NULL == *pDst) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } memcpy(*pDst, pSrc, sizeof(*pSrc)); if (pSrc->vgHash) { - (*pDst)->vgHash = taosHashInit(taosHashGetSize(pSrc->vgHash), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + (*pDst)->vgHash = taosHashInit(taosHashGetSize(pSrc->vgHash), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, + HASH_ENTRY_LOCK); if (NULL == (*pDst)->vgHash) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } SVgroupInfo* vgInfo = NULL; - void *pIter = taosHashIterate(pSrc->vgHash, NULL); + void* pIter = taosHashIterate(pSrc->vgHash, NULL); while (pIter) { vgInfo = pIter; int32_t* vgId = taosHashGetKey(pIter, NULL); - + if (0 != taosHashPut((*pDst)->vgHash, vgId, sizeof(*vgId), vgInfo, sizeof(*vgInfo))) { qError("taosHashPut failed, vgId:%d", vgInfo->vgId); - taosHashCancelIterate(pSrc->vgHash, pIter); + taosHashCancelIterate(pSrc->vgHash, pIter); taosHashCleanup((*pDst)->vgHash); taosMemoryFreeClear(*pDst); return TSDB_CODE_CTG_MEM_ERROR; } - + pIter = taosHashIterate(pSrc->vgHash, pIter); } } - + return TSDB_CODE_SUCCESS; } - - diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index bbd9ce3a78801fd7cc5001e17bd66745c336d7b1..963a85922f340d86bea58b30a2dd5cabba17d8b5 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -105,6 +105,13 @@ typedef SRpcCtxVal STransCtxVal; typedef SRpcInfo STrans; typedef SRpcConnInfo STransHandleInfo; +// ref mgt +// handle +typedef struct SExHandle { + void* handle; + int64_t refId; + void* pThrd; +} SExHandle; /*convet from fqdn to ip */ typedef struct SCvtAddr { char ip[TSDB_FQDN_LEN]; @@ -113,14 +120,15 @@ typedef struct SCvtAddr { } SCvtAddr; typedef struct { - SEpSet epSet; // ip list provided by app - SEpSet origEpSet; - void* ahandle; // handle provided by app - tmsg_t msgType; // message type - int8_t connType; // connection type cli/srv - int64_t rid; // refId returned by taosAddRef - - int8_t retryCount; + SEpSet epSet; // ip list provided by app + SEpSet origEpSet; + void* ahandle; // handle provided by app + tmsg_t msgType; // message type + int8_t connType; // connection type cli/srv + + int8_t retryCnt; + int8_t retryLimit; + // bool setMaxRetry; STransCtx appCtx; // STransMsg* pRsp; // for synchronous API tsem_t* pSem; // for synchronous API @@ -239,6 +247,32 @@ int transSendAsync(SAsyncPool* pool, queue* mq); } \ } \ } while (0) + +#define ASYNC_CHECK_HANDLE(exh1, id) \ + do { \ + if (id > 0) { \ + tTrace("handle step1"); \ + SExHandle* exh2 = transAcquireExHandle(refMgt, id); \ + if (exh2 == NULL || id != exh2->refId) { \ + tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \ + exh2 ? exh2->refId : 0, id); \ + goto _return1; \ + } \ + } else if (id == 0) { \ + tTrace("handle step2"); \ + SExHandle* exh2 = transAcquireExHandle(refMgt, id); \ + if (exh2 == NULL || id == exh2->refId) { \ + tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, id, \ + exh2 ? exh2->refId : 0); \ + goto _return1; \ + } else { \ + id = exh1->refId; \ + } \ + } else if (id < 0) { \ + tTrace("handle step3"); \ + goto _return2; \ + } \ + } while (0) int transInitBuffer(SConnBuffer* buf); int transClearBuffer(SConnBuffer* buf); int transDestroyBuffer(SConnBuffer* buf); @@ -349,21 +383,13 @@ void transDQDestroy(SDelayQueue* queue); int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs); -void transPrintEpSet(SEpSet* pEpSet); +// void transPrintEpSet(SEpSet* pEpSet); bool transEpSetIsEqual(SEpSet* a, SEpSet* b); /* * init global func */ void transThreadOnce(); -// ref mgt -// handle -typedef struct SExHandle { - void* handle; - int64_t refId; - void* pThrd; -} SExHandle; - void transInitEnv(); int32_t transOpenExHandleMgt(int size); void transCloseExHandleMgt(int32_t mgt); diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 1ec96f4a7ae2710f11db44988f002569ac39e3bf..4f7b19b5391c35e723a97d5f331db19297c256ef 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -79,6 +79,7 @@ void* rpcOpen(const SRpcInit* pInit) { return pRpc; } void rpcClose(void* arg) { + tInfo("start to close rpc"); SRpcInfo* pRpc = (SRpcInfo*)arg; (*taosCloseHandle[pRpc->connType])(pRpc->tcphandle); transCloseExHandleMgt(pRpc->refMgt); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index e563bf0dddebb2e937c410b4de94bba281192320..7374d1fffc95ba06700a272a65826575d154ad17 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1,4 +1,4 @@ -/* * Copyright (c) 2019 TAOS Data, Inc. +/** Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 @@ -25,7 +25,6 @@ typedef struct SCliConn { uv_write_t writeReq; void* hostThrd; - int hThrdIdx; SConnBuffer readBuf; STransQueue cliMsgs; @@ -36,6 +35,7 @@ typedef struct SCliConn { bool broken; // link broken or not ConnStatus status; // + int64_t refId; char* ip; uint32_t port; @@ -54,7 +54,7 @@ typedef struct SCliMsg { int sent; //(0: no send, 1: alread sent) } SCliMsg; -typedef struct SCliThrdObj { +typedef struct SCliThrd { TdThread thread; // tid int64_t pid; // pid uv_loop_t* loop; @@ -72,13 +72,13 @@ typedef struct SCliThrdObj { SCvtAddr cvtAddr; bool quit; -} SCliThrdObj; +} SCliThrd; typedef struct SCliObj { - char label[TSDB_LABEL_LEN]; - int32_t index; - int numOfThreads; - SCliThrdObj** pThreadObj; + char label[TSDB_LABEL_LEN]; + int32_t index; + int numOfThreads; + SCliThrd** pThreadObj; } SCliObj; typedef struct SConnList { @@ -106,11 +106,18 @@ static void cliAsyncCb(uv_async_t* handle); static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg); -static SCliConn* cliCreateConn(SCliThrdObj* thrd); +static SCliConn* cliCreateConn(SCliThrd* thrd); static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); static void cliDestroy(uv_handle_t* handle); static void cliSend(SCliConn* pConn); +static bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) { + if (code != 0) return false; + if (pCtx->retryCnt == 0) return false; + if (transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) return false; + return true; +} + void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr); /* * set TCP connection timeout per-socket level @@ -122,14 +129,14 @@ static void cliHandleResp(SCliConn* conn); static void cliHandleExcept(SCliConn* conn); // handle req from app -static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); -static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd); -static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd); -static void cliHandleUpdate(SCliMsg* pMsg, SCliThrdObj* pThrd); -static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrdObj* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, - NULL, cliHandleUpdate}; - -static void cliSendQuit(SCliThrdObj* thrd); +static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd); +static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd); +static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd); +static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd); +static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, NULL, + cliHandleUpdate}; + +static void cliSendQuit(SCliThrd* thrd); static void destroyUserdata(STransMsg* userdata); static int cliRBChoseIdx(STrans* pTransInst); @@ -137,8 +144,8 @@ static int cliRBChoseIdx(STrans* pTransInst); static void destroyCmsg(SCliMsg* cmsg); static void transDestroyConnCtx(STransConnCtx* ctx); // thread obj -static SCliThrdObj* createThrdObj(); -static void destroyThrdObj(SCliThrdObj* pThrd); +static SCliThrd* createThrdObj(); +static void destroyThrdObj(SCliThrd* pThrd); static void cliWalkCb(uv_handle_t* handle, void* arg); @@ -154,7 +161,6 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { destroyCmsg(pMsg); } } - #define CLI_RELEASE_UV(loop) \ do { \ uv_walk(loop, cliWalkCb, NULL); \ @@ -168,17 +174,23 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { snprintf(key, sizeof(key), "%s:%d", ip, (int)port); \ } while (0) -#define CONN_HOST_THREAD_IDX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1) +#define CONN_HOST_THREAD_IDX1(idx, exh, refId, pThrd) \ + do { \ + if (exh == NULL) { \ + idx = -1; \ + } else { \ + ASYNC_CHECK_HANDLE((exh), refId); \ + pThrd = (SCliThrd*)(exh)->pThrd; \ + } \ + } while (0) #define CONN_PERSIST_TIME(para) (para * 1000 * 10) #define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) -#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label) +#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label) #define CONN_SHOULD_RELEASE(conn, head) \ do { \ if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ - int connStatus = conn->status; \ uint64_t ahandle = head->ahandle; \ CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); \ - conn->status = ConnRelease; \ transClearBuffer(&conn->readBuf); \ transFreeMsg(transContFromHead((char*)head)); \ tDebug("%s conn %p receive release request, ref: %d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn)); \ @@ -187,9 +199,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { } \ destroyCmsg(pMsg); \ cliReleaseUnfinishedMsg(conn); \ - if (connStatus != ConnInPool) { \ - addConnToPool(((SCliThrdObj*)conn->hostThrd)->pool, conn); \ - } \ + addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn); \ return; \ } \ } while (0) @@ -255,8 +265,25 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { #define REQUEST_PERSIS_HANDLE(msg) ((msg)->info.persistHandle == 1) #define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release) +#define EPSET_GET_SIZE(epSet) (epSet)->numOfEps #define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn) #define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port) +#define EPSET_FORWARD_INUSE(epSet) \ + do { \ + (epSet)->inUse = (++((epSet)->inUse)) % ((epSet)->numOfEps); \ + } while (0) +#define EPSET_DEBUG_STR(epSet, tbuf) \ + do { \ + int len = snprintf(tbuf, sizeof(tbuf), "epset:{"); \ + for (int i = 0; i < (epSet)->numOfEps; i++) { \ + if (i == (epSet)->numOfEps - 1) { \ + len += snprintf(tbuf + len, sizeof(tbuf) - len, "%d. %s:%d", i, (epSet)->eps[i].fqdn, (epSet)->eps[i].port); \ + } else { \ + len += snprintf(tbuf + len, sizeof(tbuf) - len, "%d. %s:%d, ", i, (epSet)->eps[i].fqdn, (epSet)->eps[i].port); \ + } \ + } \ + len += snprintf(tbuf + len, sizeof(tbuf) - len, "}, inUse:%d", (epSet)->inUse); \ + } while (0); static void* cliWorkThread(void* arg); @@ -271,8 +298,8 @@ _RETURN: return false; } void cliHandleResp(SCliConn* conn) { - SCliThrdObj* pThrd = conn->hostThrd; - STrans* pTransInst = pThrd->pTransInst; + SCliThrd* pThrd = conn->hostThrd; + STrans* pTransInst = pThrd->pTransInst; STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf); pHead->code = htonl(pHead->code); @@ -292,17 +319,9 @@ void cliHandleResp(SCliConn* conn) { if (CONN_NO_PERSIST_BY_APP(conn)) { pMsg = transQueuePop(&conn->cliMsgs); - pCtx = pMsg ? pMsg->ctx : NULL; - if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) { - transMsg.info.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); - if (transMsg.info.ahandle == NULL) { - transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType)); - } - tDebug("%s conn %p construct ahandle %p, persist: 0", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle); - } else { - transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL; - tDebug("%s conn %p get ahandle %p, persist: 0", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle); - } + pCtx = pMsg->ctx; + transMsg.info.ahandle = pCtx->ahandle; + tDebug("%s conn %p get ahandle %p, persist: 0", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle); } else { uint64_t ahandle = (uint64_t)pHead->ahandle; CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); @@ -324,26 +343,22 @@ void cliHandleResp(SCliConn* conn) { } // buf's mem alread translated to transMsg.pCont transClearBuffer(&conn->readBuf); - if (!CONN_NO_PERSIST_BY_APP(conn)) { - transMsg.info.handle = conn; + transMsg.info.handle = (void*)conn->refId; tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); } - // char buf[64] = {0}; - // TRACE_TO_STR(&transMsg.info.traceId, buf); + STraceId* trace = &transMsg.info.traceId; - tGTrace("conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, code: %d", conn, TMSG_INFO(pHead->msgType), - taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), taosInetNtoa(conn->localAddr.sin_addr), - ntohs(conn->localAddr.sin_port), transMsg.contLen, transMsg.code); + tGTrace("%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, code: %d", CONN_GET_INST_LABEL(conn), + conn, TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), + taosInetNtoa(conn->localAddr.sin_addr), ntohs(conn->localAddr.sin_port), transMsg.contLen, transMsg.code); if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) { - tDebug("%s except, server continue send while cli ignore it", CONN_GET_INST_LABEL(conn)); - // transUnrefCliHandle(conn); + tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn); return; } if (CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) { - tDebug("%s except, server continue send while cli ignore it", CONN_GET_INST_LABEL(conn)); - // transUnrefCliHandle(conn); + tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn); return; } @@ -375,9 +390,9 @@ void cliHandleExcept(SCliConn* pConn) { return; } } - SCliThrdObj* pThrd = pConn->hostThrd; - STrans* pTransInst = pThrd->pTransInst; - bool once = false; + SCliThrd* pThrd = pConn->hostThrd; + STrans* pTransInst = pThrd->pTransInst; + bool once = false; do { SCliMsg* pMsg = transQueuePop(&pConn->cliMsgs); if (pMsg == NULL && once) { @@ -389,7 +404,6 @@ void cliHandleExcept(SCliConn* pConn) { transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0; transMsg.info.ahandle = NULL; - transMsg.info.handle = pConn; if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); @@ -414,15 +428,15 @@ void cliHandleExcept(SCliConn* pConn) { return; } destroyCmsg(pMsg); - tTrace("%s conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn); + tTrace("%s conn %p start to destroy, ref:%d", CONN_GET_INST_LABEL(pConn), pConn, T_REF_VAL_GET(pConn)); } while (!transQueueEmpty(&pConn->cliMsgs)); transUnrefCliHandle(pConn); } void cliTimeoutCb(uv_timer_t* handle) { - SCliThrdObj* pThrd = handle->data; - STrans* pTransInst = pThrd->pTransInst; - int64_t currentTime = pThrd->nextTimeout; + SCliThrd* pThrd = handle->data; + STrans* pTransInst = pThrd->pTransInst; + int64_t currentTime = pThrd->nextTimeout; tTrace("%s conn timeout, try to remove expire conn from conn pool", pTransInst->label); SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL); @@ -487,10 +501,26 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { assert(h == &conn->conn); return conn; } +static void allocConnRef(SCliConn* conn, bool update) { + if (update) { + transRemoveExHandle(refMgt, conn->refId); + } + SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle)); + exh->handle = conn; + exh->pThrd = conn->hostThrd; + exh->refId = transAddExHandle(refMgt, exh); + conn->refId = exh->refId; +} static void addConnToPool(void* pool, SCliConn* conn) { - SCliThrdObj* thrd = conn->hostThrd; + if (conn->status == ConnInPool) { + // assert(0); + return; + } + SCliThrd* thrd = conn->hostThrd; CONN_HANDLE_THREAD_QUIT(thrd); + allocConnRef(conn, true); + STrans* pTransInst = thrd->pTransInst; conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); transQueueClear(&conn->cliMsgs); @@ -499,7 +529,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { char key[128] = {0}; CONN_CONSTRUCT_HASH_KEY(key, conn->ip, conn->port); - tTrace("%s conn %p added to conn pool, read buf cap: %d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap); + tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap); SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); // list already create before @@ -540,13 +570,14 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { return; } if (nread < 0) { - tError("%s conn %p read error: %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread)); + tError("%s conn %p read error: %s, ref: %d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread), + T_REF_VAL_GET(conn)); conn->broken = true; cliHandleExcept(conn); } } -static SCliConn* cliCreateConn(SCliThrdObj* pThrd) { +static SCliConn* cliCreateConn(SCliThrd* pThrd) { SCliConn* conn = taosMemoryCalloc(1, sizeof(SCliConn)); // read/write stream handle conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); @@ -562,11 +593,16 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) { conn->status = ConnNormal; conn->broken = 0; transRefCliHandle(conn); + + allocConnRef(conn, false); + return conn; } static void cliDestroyConn(SCliConn* conn, bool clear) { tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); QUEUE_REMOVE(&conn->conn); + QUEUE_INIT(&conn->conn); + transRemoveExHandle(refMgt, conn->refId); if (clear) { uv_close((uv_handle_t*)conn->stream, cliDestroy); } @@ -593,7 +629,7 @@ static bool cliHandleNoResp(SCliConn* conn) { } if (res == true) { if (cliMaySendCachedMsg(conn) == false) { - SCliThrdObj* thrd = conn->hostThrd; + SCliThrd* thrd = conn->hostThrd; addConnToPool(thrd->pool, conn); } } @@ -629,8 +665,8 @@ void cliSend(SCliConn* pConn) { STransConnCtx* pCtx = pCliMsg->ctx; - SCliThrdObj* pThrd = pConn->hostThrd; - STrans* pTransInst = pThrd->pTransInst; + SCliThrd* pThrd = pConn->hostThrd; + STrans* pTransInst = pThrd->pTransInst; STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg); if (pMsg->pCont == 0) { @@ -651,12 +687,10 @@ void cliSend(SCliConn* pConn) { uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); - // char buf[64] = {0}; - // TRACE_TO_STR(&pMsg->info.traceId, buf); STraceId* trace = &pMsg->info.traceId; - tGTrace("conn %p %s is sent to %s:%d, local info %s:%d", pConn, TMSG_INFO(pHead->msgType), - taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr), - ntohs(pConn->localAddr.sin_port)); + tGTrace("%s conn %p %s is sent to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, + TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), + taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port)); if (pHead->persist == 1) { CONN_SET_PERSIST_BY_APP(pConn); @@ -664,7 +698,6 @@ void cliSend(SCliConn* pConn) { pConn->writeReq.data = pConn; uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); - return; _RETURN: return; @@ -690,19 +723,24 @@ void cliConnCb(uv_connect_t* req, int status) { cliSend(pConn); } -static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { +static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) { + pThrd->quit = true; tDebug("cli work thread %p start to quit", pThrd); destroyCmsg(pMsg); destroyConnPool(pThrd->pool); uv_timer_stop(&pThrd->timer); uv_walk(pThrd->loop, cliWalkCb, NULL); - pThrd->quit = true; - // uv_stop(pThrd->loop); } -static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { - SCliConn* conn = pMsg->msg.info.handle; +static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) { + int64_t refId = (int64_t)(pMsg->msg.info.handle); + SExHandle* exh = transAcquireExHandle(refMgt, refId); + if (exh == NULL) { + tDebug("%" PRId64 " already release", refId); + } + + SCliConn* conn = exh->handle; tDebug("%s conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn); if (T_REF_VAL_GET(conn) == 2) { @@ -711,33 +749,37 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { return; } cliSend(conn); - } else { - // conn already broken down - transUnrefCliHandle(conn); } } -static void cliHandleUpdate(SCliMsg* pMsg, SCliThrdObj* pThrd) { +static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) { STransConnCtx* pCtx = pMsg->ctx; - pThrd->cvtAddr = pCtx->cvtAddr; destroyCmsg(pMsg); } -SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { +SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { SCliConn* conn = NULL; - if (pMsg->msg.info.handle != NULL) { - conn = (SCliConn*)(pMsg->msg.info.handle); - if (conn != NULL) { - tTrace("%s conn %p reused", CONN_GET_INST_LABEL(conn), conn); - } - } else { - STransConnCtx* pCtx = pMsg->ctx; - conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet)); - if (conn != NULL) { - tTrace("%s conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn); + int64_t refId = (int64_t)(pMsg->msg.info.handle); + if (refId != 0) { + SExHandle* exh = transAcquireExHandle(refMgt, refId); + if (exh == NULL) { + *ignore = true; + destroyCmsg(pMsg); + return NULL; + // assert(0); } else { - tTrace("%s not found conn in conn pool %p", ((STrans*)pThrd->pTransInst)->label, pThrd->pool); + conn = exh->handle; + transReleaseExHandle(refMgt, refId); } + return conn; + }; + + STransConnCtx* pCtx = pMsg->ctx; + conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet)); + if (conn != NULL) { + tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool); + } else { + tTrace("%s not found conn in conn pool:%p", ((STrans*)pThrd->pTransInst)->label, pThrd->pool); } return conn; } @@ -752,22 +794,19 @@ void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) { } } } -void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { - uint64_t et = taosGetTimestampUs(); - uint64_t el = et - pMsg->st; - // tTrace("%s cli msg tran time cost: %" PRIu64 "us", ((STrans*)pThrd->pTransInst)->label, el); - +void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { STransConnCtx* pCtx = pMsg->ctx; STrans* pTransInst = pThrd->pTransInst; cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr); - transPrintEpSet(&pCtx->epSet); - - SCliConn* conn = cliGetConn(pMsg, pThrd); + // transPrintEpSet(&pCtx->epSet); + bool ignore = false; + SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore); + if (ignore == true) { + return; + } if (conn != NULL) { - conn->hThrdIdx = pCtx->hThrdIdx; - transCtxMerge(&conn->ctx, &pCtx->appCtx); transQueuePush(&conn->cliMsgs, pMsg); cliSend(conn); @@ -776,7 +815,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { transCtxMerge(&conn->ctx, &pCtx->appCtx); transQueuePush(&conn->cliMsgs, pMsg); - conn->hThrdIdx = pCtx->hThrdIdx; conn->ip = strdup(EPSET_GET_INUSE_IP(&pCtx->epSet)); conn->port = EPSET_GET_INUSE_PORT(&pCtx->epSet); @@ -784,7 +822,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { if (ret) { tError("%s conn %p failed to set conn option, errmsg %s", transLabel(pTransInst), conn, uv_err_name(ret)); } - int fd = taosCreateSocketWithTimeOutOpt(TRANS_CONN_TIMEOUT); + int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT); if (fd == -1) { tTrace("%s conn %p failed to create socket", transLabel(pTransInst), conn); cliHandleExcept(conn); @@ -807,9 +845,9 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { } } static void cliAsyncCb(uv_async_t* handle) { - SAsyncItem* item = handle->data; - SCliThrdObj* pThrd = item->pThrd; - SCliMsg* pMsg = NULL; + SAsyncItem* item = handle->data; + SCliThrd* pThrd = item->pThrd; + SCliMsg* pMsg = NULL; // batch process to avoid to lock/unlock frequently queue wq; @@ -835,7 +873,7 @@ static void cliAsyncCb(uv_async_t* handle) { } static void* cliWorkThread(void* arg) { - SCliThrdObj* pThrd = (SCliThrdObj*)arg; + SCliThrd* pThrd = (SCliThrd*)arg; pThrd->pid = taosGetSelfPthreadId(); setThreadName("trans-cli-work"); uv_run(pThrd->loop, UV_RUN_DEFAULT); @@ -848,10 +886,10 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, STrans* pTransInst = shandle; memcpy(cli->label, label, strlen(label)); cli->numOfThreads = numOfThreads; - cli->pThreadObj = (SCliThrdObj**)taosMemoryCalloc(cli->numOfThreads, sizeof(SCliThrdObj*)); + cli->pThreadObj = (SCliThrd**)taosMemoryCalloc(cli->numOfThreads, sizeof(SCliThrd*)); for (int i = 0; i < cli->numOfThreads; i++) { - SCliThrdObj* pThrd = createThrdObj(); + SCliThrd* pThrd = createThrdObj(); pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); pThrd->pTransInst = shandle; @@ -885,8 +923,8 @@ static void destroyCmsg(SCliMsg* pMsg) { taosMemoryFree(pMsg); } -static SCliThrdObj* createThrdObj() { - SCliThrdObj* pThrd = (SCliThrdObj*)taosMemoryCalloc(1, sizeof(SCliThrdObj)); +static SCliThrd* createThrdObj() { + SCliThrd* pThrd = (SCliThrd*)taosMemoryCalloc(1, sizeof(SCliThrd)); QUEUE_INIT(&pThrd->msg); taosThreadMutexInit(&pThrd->msgMtx, NULL); @@ -904,7 +942,7 @@ static SCliThrdObj* createThrdObj() { pThrd->quit = false; return pThrd; } -static void destroyThrdObj(SCliThrdObj* pThrd) { +static void destroyThrdObj(SCliThrd* pThrd) { if (pThrd == NULL) { return; } @@ -925,7 +963,7 @@ static void transDestroyConnCtx(STransConnCtx* ctx) { taosMemoryFree(ctx); } -void cliSendQuit(SCliThrdObj* thrd) { +void cliSendQuit(SCliThrd* thrd) { // cli can stop gracefully SCliMsg* msg = taosMemoryCalloc(1, sizeof(SCliMsg)); msg->type = Quit; @@ -938,7 +976,10 @@ void cliWalkCb(uv_handle_t* handle, void* arg) { } int cliRBChoseIdx(STrans* pTransInst) { - int64_t index = pTransInst->index; + int8_t index = pTransInst->index; + if (pTransInst->numOfThreads == 0) { + return -1; + } if (pTransInst->index++ >= pTransInst->numOfThreads) { pTransInst->index = 0; } @@ -946,82 +987,71 @@ int cliRBChoseIdx(STrans* pTransInst) { } static void doDelayTask(void* param) { STaskArg* arg = param; + SCliMsg* pMsg = arg->param1; + SCliThrd* pThrd = arg->param2; + taosMemoryFree(arg); - SCliMsg* pMsg = arg->param1; - SCliThrdObj* pThrd = arg->param2; cliHandleReq(pMsg, pThrd); +} - taosMemoryFree(arg); +static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { + STransConnCtx* pCtx = pMsg->ctx; + + STraceId* trace = &pMsg->msg.info.traceId; + char tbuf[256] = {0}; + EPSET_DEBUG_STR(&pCtx->epSet, tbuf); + tGTrace("%s retry on next node, use %s, retryCnt:%d, limit:%d", transLabel(pThrd->pTransInst), tbuf, + pCtx->retryCnt + 1, pCtx->retryLimit); + + STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); + arg->param1 = pMsg; + arg->param2 = pThrd; + transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL); +} + +void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) { + if (*val != exp) { + *val = newVal; + } } int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { - SCliThrdObj* pThrd = pConn->hostThrd; - STrans* pTransInst = pThrd->pTransInst; + SCliThrd* pThrd = pConn->hostThrd; + STrans* pTransInst = pThrd->pTransInst; if (pMsg == NULL || pMsg->ctx == NULL) { tTrace("%s conn %p handle resp", pTransInst->label, pConn); pTransInst->cfp(pTransInst->parent, pResp, NULL); return 0; } - - STransConnCtx* pCtx = pMsg->ctx; - SEpSet* pEpSet = &pCtx->epSet; - - if (pCtx->retryCount == 0) { - pCtx->origEpSet = pCtx->epSet; - } /* - * upper layer handle retry if code equal TSDB_CODE_RPC_NETWORK_UNAVAIL + * no retry + * 1. query conn + * 2. rpc thread already receive quit msg */ - tmsg_t msgType = pCtx->msgType; - if ((pTransInst->retry != NULL && pEpSet->numOfEps > 1 && (pTransInst->retry(pResp->code))) || - (pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pResp->code == TSDB_CODE_APP_NOT_READY || - pResp->code == TSDB_CODE_NODE_NOT_DEPLOYED || pResp->code == TSDB_CODE_SYN_NOT_LEADER)) { + STransConnCtx* pCtx = pMsg->ctx; + int32_t code = pResp->code; + if (pTransInst->retry != NULL && pTransInst->retry(code)) { pMsg->sent = 0; - tTrace("try to send req to next node"); - pMsg->st = taosGetTimestampUs(); - pCtx->retryCount += 1; - if (pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL && pCtx->setMaxRetry == false) { - if (pCtx->retryCount < pEpSet->numOfEps * 3) { - pEpSet->inUse = (++pEpSet->inUse) % pEpSet->numOfEps; - if (pThrd->quit == false) { - STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); - arg->param1 = pMsg; - arg->param2 = pThrd; - transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL); - transPrintEpSet(pEpSet); - tTrace("%s use local epset, inUse: %d, retry count:%d, limit: %d", pTransInst->label, pEpSet->inUse, - pCtx->retryCount + 1, pEpSet->numOfEps * 3); - - transUnrefCliHandle(pConn); - return -1; - } - } - } else if (pCtx->retryCount < TRANS_RETRY_COUNT_LIMIT) { - pCtx->setMaxRetry = true; - if (pResp->contLen == 0) { - pEpSet->inUse = (++pEpSet->inUse) % pEpSet->numOfEps; - transPrintEpSet(&pCtx->epSet); - tTrace("%s use local epset, inUse: %d, retry count:%d, limit: %d", pTransInst->label, pEpSet->inUse, - pCtx->retryCount + 1, TRANS_RETRY_COUNT_LIMIT); - } else { - SEpSet epSet = {0}; - tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epSet); - pCtx->epSet = epSet; - - transPrintEpSet(&pCtx->epSet); - tTrace("%s use remote epset, inUse: %d, retry count:%d, limit: %d", pTransInst->label, pEpSet->inUse, - pCtx->retryCount + 1, TRANS_RETRY_COUNT_LIMIT); + pCtx->retryCnt += 1; + if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { + cliCompareAndSwap(&pCtx->retryLimit, TRANS_RETRY_COUNT_LIMIT, EPSET_GET_SIZE(&pCtx->epSet) * 3); + if (pCtx->retryCnt < pCtx->retryLimit) { + transUnrefCliHandle(pConn); + EPSET_FORWARD_INUSE(&pCtx->epSet); + cliSchedMsgToNextNode(pMsg, pThrd); + return -1; } - if (pThrd->quit == false) { - if (pResp->code != TSDB_CODE_RPC_NETWORK_UNAVAIL) { - if (pConn->status != ConnInPool) addConnToPool(pThrd->pool, pConn); + } else { + cliCompareAndSwap(&pCtx->retryLimit, TRANS_RETRY_COUNT_LIMIT, TRANS_RETRY_COUNT_LIMIT); + if (pCtx->retryCnt < pCtx->retryLimit) { + addConnToPool(pThrd->pool, pConn); + if (pResp->contLen == 0) { + EPSET_FORWARD_INUSE(&pCtx->epSet); } else { - transUnrefCliHandle(pConn); + tDeserializeSEpSet(pResp->pCont, pResp->contLen, &pCtx->epSet); } - STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); - arg->param1 = pMsg; - arg->param2 = pThrd; - transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL); + transFreeMsg(pResp->pCont); + cliSchedMsgToNextNode(pMsg, pThrd); return -1; } } @@ -1029,20 +1059,20 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { STraceId* trace = &pResp->info.traceId; if (pCtx->pSem != NULL) { - tGTrace("conn %p(sync) handle resp", pConn); + tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn); if (pCtx->pRsp == NULL) { - tGTrace("conn %p(sync) failed to resp, ignore", pConn); + tGTrace("%s conn %p(sync) failed to resp, ignore", CONN_GET_INST_LABEL(pConn), pConn); } else { memcpy((char*)pCtx->pRsp, (char*)pResp, sizeof(*pResp)); } tsem_post(pCtx->pSem); pCtx->pRsp = NULL; } else { - tGTrace("conn %p handle resp", pConn); - if (pResp->code != 0 || pCtx->retryCount == 0 || transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) { + tGTrace("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn); + if (!cliIsEpsetUpdated(code, pCtx)) { pTransInst->cfp(pTransInst->parent, pResp, NULL); } else { - pTransInst->cfp(pTransInst->parent, pResp, pEpSet); + pTransInst->cfp(pTransInst->parent, pResp, &pCtx->epSet); } } return 0; @@ -1074,38 +1104,58 @@ void transUnrefCliHandle(void* handle) { return; } int ref = T_REF_DEC((SCliConn*)handle); - tTrace("%s conn %p ref %d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref); + tTrace("%s conn %p ref:%d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref); if (ref == 0) { cliDestroyConn((SCliConn*)handle, true); } } +SCliThrd* transGetWorkThrdFromHandle(int64_t handle) { + SCliThrd* pThrd = NULL; + SExHandle* exh = transAcquireExHandle(refMgt, handle); + if (exh == NULL) { + return NULL; + } + pThrd = exh->pThrd; + transReleaseExHandle(refMgt, handle); + return pThrd; +} +SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) { + if (handle == 0) { + int idx = cliRBChoseIdx(trans); + if (idx < 0) return NULL; + return ((SCliObj*)trans->tcphandle)->pThreadObj[idx]; + } + return transGetWorkThrdFromHandle(handle); +} void transReleaseCliHandle(void* handle) { - SCliThrdObj* thrd = CONN_GET_HOST_THREAD(handle); - if (thrd == NULL) { + int idx = -1; + SCliThrd* pThrd = transGetWorkThrdFromHandle((int64_t)handle); + if (pThrd == NULL) { return; } - STransMsg tmsg = {.info.handle = handle}; SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg)); cmsg->msg = tmsg; cmsg->type = Release; - transSendAsync(thrd->asyncPool, &cmsg->q); + transSendAsync(pThrd->asyncPool, &cmsg->q); + return; } void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { - STrans* pTransInst = (STrans*)shandle; - int idx = CONN_HOST_THREAD_IDX((SCliConn*)pReq->info.handle); - if (idx == -1) { - idx = cliRBChoseIdx(pTransInst); + STrans* pTransInst = (STrans*)shandle; + SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); + if (pThrd == NULL) { + transFreeMsg(pReq->pCont); + return; } + TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); pCtx->epSet = *pEpSet; pCtx->ahandle = pReq->info.ahandle; pCtx->msgType = pReq->msgType; - pCtx->hThrdIdx = idx; if (ctx != NULL) { pCtx->appCtx = *ctx; @@ -1118,19 +1168,19 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra cliMsg->st = taosGetTimestampUs(); cliMsg->type = Normal; - SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[idx]; - STraceId* trace = &pReq->info.traceId; - tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), thrd->pid, + tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); - ASSERT(transSendAsync(thrd->asyncPool, &(cliMsg->q)) == 0); + ASSERT(transSendAsync(pThrd->asyncPool, &(cliMsg->q)) == 0); + return; } void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) { - STrans* pTransInst = (STrans*)shandle; - int idx = CONN_HOST_THREAD_IDX(pReq->info.handle); - if (idx == -1) { - idx = cliRBChoseIdx(pTransInst); + STrans* pTransInst = (STrans*)shandle; + SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); + if (pThrd == NULL) { + transFreeMsg(pReq->pCont); + return; } tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t)); tsem_init(sem, 0, 0); @@ -1139,9 +1189,9 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); pCtx->epSet = *pEpSet; + pCtx->origEpSet = *pEpSet; pCtx->ahandle = pReq->info.ahandle; pCtx->msgType = pReq->msgType; - pCtx->hThrdIdx = idx; pCtx->pSem = sem; pCtx->pRsp = pRsp; @@ -1151,22 +1201,21 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM cliMsg->st = taosGetTimestampUs(); cliMsg->type = Normal; - SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[idx]; - STraceId* trace = &pReq->info.traceId; - tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), thrd->pid, + tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); - transSendAsync(thrd->asyncPool, &(cliMsg->q)); + transSendAsync(pThrd->asyncPool, &(cliMsg->q)); tsem_wait(sem); tsem_destroy(sem); taosMemoryFree(sem); + return; } /* * **/ -void transSetDefaultAddr(void* ahandle, const char* ip, const char* fqdn) { - STrans* pTransInst = ahandle; +void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { + STrans* pTransInst = shandle; SCvtAddr cvtAddr = {0}; if (ip != NULL && fqdn != NULL) { @@ -1176,14 +1225,13 @@ void transSetDefaultAddr(void* ahandle, const char* ip, const char* fqdn) { } for (int i = 0; i < pTransInst->numOfThreads; i++) { STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); - pCtx->hThrdIdx = i; pCtx->cvtAddr = cvtAddr; SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); cliMsg->ctx = pCtx; cliMsg->type = Update; - SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i]; + SCliThrd* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i]; tDebug("%s update epset at thread:%08" PRId64 "", pTransInst->label, thrd->pid); transSendAsync(thrd->asyncPool, &(cliMsg->q)); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 8cd7f9d827396fbb52b7e4c926f77ada47429ad5..bff7d79bd38fd6cd8afabf6b005e17477090bcf8 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -455,16 +455,16 @@ void transPrintEpSet(SEpSet* pEpSet) { return; } char buf[512] = {0}; - int len = snprintf(buf, sizeof(buf), "epset { "); + int len = snprintf(buf, sizeof(buf), "epset:{"); for (int i = 0; i < pEpSet->numOfEps; i++) { if (i == pEpSet->numOfEps - 1) { - len += snprintf(buf + len, sizeof(buf) - len, "%d. %s:%d ", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); + len += snprintf(buf + len, sizeof(buf) - len, "%d. %s:%d", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); } else { len += snprintf(buf + len, sizeof(buf) - len, "%d. %s:%d, ", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); } } len += snprintf(buf + len, sizeof(buf) - len, "}"); - tTrace("%s, inUse: %d", buf, pEpSet->inUse); + tTrace("%s, inUse:%d", buf, pEpSet->inUse); } bool transEpSetIsEqual(SEpSet* a, SEpSet* b) { if (a->numOfEps != b->numOfEps || a->inUse != b->inUse) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 593a790a2121f12d30e17ee0adb1b5de9c21ffb7..892d32696eba25a909b60ba0f5a37368c2b7186c 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -65,7 +65,7 @@ typedef struct SSvrMsg { STransMsgType type; } SSvrMsg; -typedef struct SWorkThrdObj { +typedef struct SWorkThrd { TdThread thread; uv_connect_t connect_req; uv_pipe_t* pipe; @@ -78,7 +78,7 @@ typedef struct SWorkThrdObj { queue conn; void* pTransInst; bool quit; -} SWorkThrdObj; +} SWorkThrd; typedef struct SServerObj { TdThread thread; @@ -86,10 +86,10 @@ typedef struct SServerObj { uv_loop_t* loop; // work thread info - int workerIdx; - int numOfThreads; - int numOfWorkerReady; - SWorkThrdObj** pThreadObj; + int workerIdx; + int numOfThreads; + int numOfWorkerReady; + SWorkThrd** pThreadObj; uv_pipe_t pipeListen; uv_pipe_t** pipe; @@ -133,14 +133,14 @@ static SSvrConn* createConn(void* hThrd); static void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/); static void destroyConnRegArg(SSvrConn* conn); -static int reallocConnRefHandle(SSvrConn* conn); +static int reallocConnRef(SSvrConn* conn); -static void uvHandleQuit(SSvrMsg* msg, SWorkThrdObj* thrd); -static void uvHandleRelease(SSvrMsg* msg, SWorkThrdObj* thrd); -static void uvHandleResp(SSvrMsg* msg, SWorkThrdObj* thrd); -static void uvHandleRegister(SSvrMsg* msg, SWorkThrdObj* thrd); -static void (*transAsyncHandle[])(SSvrMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease, - uvHandleRegister, NULL}; +static void uvHandleQuit(SSvrMsg* msg, SWorkThrd* thrd); +static void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd); +static void uvHandleResp(SSvrMsg* msg, SWorkThrd* thrd); +static void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd); +static void (*transAsyncHandle[])(SSvrMsg* msg, SWorkThrd* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease, + uvHandleRegister, NULL}; static int32_t exHandlesMgt; @@ -160,7 +160,7 @@ static void* transWorkerThread(void* arg); static void* transAcceptThread(void* arg); // add handle loop -static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName); +static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName); static bool addHandleToAcceptloop(void* arg); #define CONN_SHOULD_RELEASE(conn, head) \ @@ -176,7 +176,7 @@ static bool addHandleToAcceptloop(void* arg); srvMsg->msg = tmsg; \ srvMsg->type = Release; \ srvMsg->pConn = conn; \ - reallocConnRefHandle(conn); \ + reallocConnRef(conn); \ if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ return; \ } \ @@ -206,32 +206,6 @@ static bool addHandleToAcceptloop(void* arg); } \ } while (0) -#define ASYNC_CHECK_HANDLE(exh1, refId) \ - do { \ - if (refId > 0) { \ - tTrace("handle step1"); \ - SExHandle* exh2 = transAcquireExHandle(refMgt, refId); \ - if (exh2 == NULL || refId != exh2->refId) { \ - tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \ - exh2 ? exh2->refId : 0, refId); \ - goto _return1; \ - } \ - } else if (refId == 0) { \ - tTrace("handle step2"); \ - SExHandle* exh2 = transAcquireExHandle(refMgt, refId); \ - if (exh2 == NULL || refId != exh2->refId) { \ - tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, refId, \ - exh2 ? exh2->refId : 0); \ - goto _return1; \ - } else { \ - refId = exh1->refId; \ - } \ - } else if (refId < 0) { \ - tTrace("handle step3"); \ - goto _return2; \ - } \ - } while (0) - void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { SSvrConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; @@ -259,7 +233,7 @@ static void uvHandleReq(SSvrConn* pConn) { // wreq->data = pConn; // uv_read_stop((uv_stream_t*)pConn->pTcp); // transRefSrvHandle(pConn); - // uv_queue_work(((SWorkThrdObj*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask); + // uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask); CONN_SHOULD_RELEASE(pConn, pHead); @@ -379,7 +353,7 @@ void uvOnSendCb(uv_write_t* req, int status) { // if (msg->type == Release && conn->status != ConnNormal) { // conn->status = ConnNormal; // transUnrefSrvHandle(conn); - // reallocConnRefHandle(conn); + // reallocConnRef(conn); // destroySmsg(msg); // transQueueClear(&conn->srvMsgs); // return; @@ -448,6 +422,8 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { transUnrefSrvHandle(pConn); } else { pHead->msgType = pMsg->msgType; + if (pHead->msgType == 0 && transMsgLenFromCont(pMsg->contLen) == sizeof(STransMsgHead)) + pHead->msgType = pConn->inType + 1; } } @@ -504,7 +480,7 @@ static void destroySmsg(SSvrMsg* smsg) { transFreeMsg(smsg->msg.pCont); taosMemoryFree(smsg); } -static void destroyAllConn(SWorkThrdObj* pThrd) { +static void destroyAllConn(SWorkThrd* pThrd) { tTrace("thread %p destroy all conn ", pThrd); while (!QUEUE_IS_EMPTY(&pThrd->conn)) { queue* h = QUEUE_HEAD(&pThrd->conn); @@ -519,10 +495,10 @@ static void destroyAllConn(SWorkThrdObj* pThrd) { } } void uvWorkerAsyncCb(uv_async_t* handle) { - SAsyncItem* item = handle->data; - SWorkThrdObj* pThrd = item->pThrd; - SSvrConn* conn = NULL; - queue wq; + SAsyncItem* item = handle->data; + SWorkThrd* pThrd = item->pThrd; + SSvrConn* conn = NULL; + queue wq; // batch process to avoid to lock/unlock frequently taosThreadMutexLock(&item->mtx); @@ -650,7 +626,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { assert(buf->base[0] == notify[0]); taosMemoryFree(buf->base); - SWorkThrdObj* pThrd = q->data; + SWorkThrd* pThrd = q->data; uv_pipe_t* pipe = (uv_pipe_t*)q; if (!uv_pipe_pending_count(pipe)) { @@ -718,10 +694,10 @@ void uvOnPipeConnectionCb(uv_connect_t* connect, int status) { if (status != 0) { return; } - SWorkThrdObj* pThrd = container_of(connect, SWorkThrdObj, connect_req); + SWorkThrd* pThrd = container_of(connect, SWorkThrd, connect_req); uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); } -static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName) { +static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); if (0 != uv_loop_init(pThrd->loop)) { return false; @@ -774,14 +750,14 @@ static bool addHandleToAcceptloop(void* arg) { } void* transWorkerThread(void* arg) { setThreadName("trans-worker"); - SWorkThrdObj* pThrd = (SWorkThrdObj*)arg; + SWorkThrd* pThrd = (SWorkThrd*)arg; uv_run(pThrd->loop, UV_RUN_DEFAULT); return NULL; } static SSvrConn* createConn(void* hThrd) { - SWorkThrdObj* pThrd = hThrd; + SWorkThrd* pThrd = hThrd; SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn)); QUEUE_INIT(&pConn->queue); @@ -826,7 +802,7 @@ static void destroyConnRegArg(SSvrConn* conn) { conn->regArg.init = 0; } } -static int reallocConnRefHandle(SSvrConn* conn) { +static int reallocConnRef(SSvrConn* conn) { transReleaseExHandle(refMgt, conn->refId); transRemoveExHandle(refMgt, conn->refId); // avoid app continue to send msg on invalid handle @@ -844,7 +820,7 @@ static void uvDestroyConn(uv_handle_t* handle) { if (conn == NULL) { return; } - SWorkThrdObj* thrd = conn->hostThrd; + SWorkThrd* thrd = conn->hostThrd; transReleaseExHandle(refMgt, conn->refId); transRemoveExHandle(refMgt, conn->refId); @@ -889,7 +865,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, srv->numOfThreads = numOfThreads; srv->workerIdx = 0; srv->numOfWorkerReady = 0; - srv->pThreadObj = (SWorkThrdObj**)taosMemoryCalloc(srv->numOfThreads, sizeof(SWorkThrdObj*)); + srv->pThreadObj = (SWorkThrd**)taosMemoryCalloc(srv->numOfThreads, sizeof(SWorkThrd*)); srv->pipe = (uv_pipe_t**)taosMemoryCalloc(srv->numOfThreads, sizeof(uv_pipe_t*)); srv->ip = ip; srv->port = port; @@ -914,7 +890,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, assert(0 == uv_listen((uv_stream_t*)&srv->pipeListen, SOMAXCONN, uvPipeListenCb)); for (int i = 0; i < srv->numOfThreads; i++) { - SWorkThrdObj* thrd = (SWorkThrdObj*)taosMemoryCalloc(1, sizeof(SWorkThrdObj)); + SWorkThrd* thrd = (SWorkThrd*)taosMemoryCalloc(1, sizeof(SWorkThrd)); thrd->pTransInst = shandle; thrd->quit = false; srv->pThreadObj[i] = thrd; @@ -959,7 +935,7 @@ End: return NULL; } -void uvHandleQuit(SSvrMsg* msg, SWorkThrdObj* thrd) { +void uvHandleQuit(SSvrMsg* msg, SWorkThrd* thrd) { thrd->quit = true; if (QUEUE_IS_EMPTY(&thrd->conn)) { uv_walk(thrd->loop, uvWalkCb, NULL); @@ -968,10 +944,10 @@ void uvHandleQuit(SSvrMsg* msg, SWorkThrdObj* thrd) { } taosMemoryFree(msg); } -void uvHandleRelease(SSvrMsg* msg, SWorkThrdObj* thrd) { +void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd) { SSvrConn* conn = msg->pConn; if (conn->status == ConnAcquire) { - reallocConnRefHandle(conn); + reallocConnRef(conn); if (!transQueuePush(&conn->srvMsgs, msg)) { return; } @@ -982,12 +958,12 @@ void uvHandleRelease(SSvrMsg* msg, SWorkThrdObj* thrd) { } destroySmsg(msg); } -void uvHandleResp(SSvrMsg* msg, SWorkThrdObj* thrd) { +void uvHandleResp(SSvrMsg* msg, SWorkThrd* thrd) { // send msg to client tDebug("%s conn %p start to send resp (2/2)", transLabel(thrd->pTransInst), msg->pConn); uvStartSendResp(msg); } -void uvHandleRegister(SSvrMsg* msg, SWorkThrdObj* thrd) { +void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd) { SSvrConn* conn = msg->pConn; tDebug("%s conn %p register brokenlink callback", transLabel(thrd->pTransInst), conn); if (conn->status == ConnAcquire) { @@ -1008,7 +984,7 @@ void uvHandleRegister(SSvrMsg* msg, SWorkThrdObj* thrd) { taosMemoryFree(msg); } } -void destroyWorkThrd(SWorkThrdObj* pThrd) { +void destroyWorkThrd(SWorkThrd* pThrd) { if (pThrd == NULL) { return; } @@ -1019,7 +995,7 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) { taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd); } -void sendQuitToWorkThrd(SWorkThrdObj* pThrd) { +void sendQuitToWorkThrd(SWorkThrd* pThrd) { SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg)); msg->type = Quit; tDebug("server send quit msg to work thread"); @@ -1055,8 +1031,6 @@ void transCloseServer(void* arg) { int ref = atomic_sub_fetch_32(&tranSSvrInst, 1); if (ref == 0) { - // TdThreadOnce tmpInit = PTHREAD_ONCE_INIT; - // memcpy(&transModuleInit, &tmpInit, sizeof(TdThreadOnce)); transCloseExHandleMgt(refMgt); } } @@ -1086,7 +1060,7 @@ void transReleaseSrvHandle(void* handle) { ASYNC_CHECK_HANDLE(exh, refId); - SWorkThrdObj* pThrd = exh->pThrd; + SWorkThrd* pThrd = exh->pThrd; ASYNC_ERR_JRET(pThrd); STransMsg tmsg = {.code = 0, .info.handle = exh, .info.ahandle = NULL, .info.refId = refId}; @@ -1116,7 +1090,7 @@ void transSendResponse(const STransMsg* msg) { STransMsg tmsg = *msg; tmsg.info.refId = refId; - SWorkThrdObj* pThrd = exh->pThrd; + SWorkThrd* pThrd = exh->pThrd; ASYNC_ERR_JRET(pThrd); SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg)); @@ -1146,7 +1120,7 @@ void transRegisterMsg(const STransMsg* msg) { STransMsg tmsg = *msg; tmsg.info.refId = refId; - SWorkThrdObj* pThrd = exh->pThrd; + SWorkThrd* pThrd = exh->pThrd; ASYNC_ERR_JRET(pThrd); SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg)); diff --git a/source/os/src/osSocket.c b/source/os/src/osSocket.c index 4d61e7036d695e4c0df34ef88d02a4609e6dcf1e..b0e07ff01031075179f11253f439fc3233fb571f 100644 --- a/source/os/src/osSocket.c +++ b/source/os/src/osSocket.c @@ -946,7 +946,7 @@ int32_t taosGetFqdn(char *fqdn) { #endif // __APPLE__ int32_t ret = getaddrinfo(hostname, NULL, &hints, &result); if (!result) { - fprintf(stderr,"failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret)); + fprintf(stderr, "failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret)); return -1; } @@ -1073,7 +1073,7 @@ int32_t taosCloseEpoll(TdEpollPtr *ppEpoll) { * Set TCP connection timeout per-socket level. * ref [https://github.com/libuv/help/issues/54] */ -int taosCreateSocketWithTimeOutOpt(uint32_t conn_timeout_sec) { +int32_t taosCreateSocketWithTimeout(uint32_t timeout) { #if defined(WINDOWS) SOCKET fd; #else @@ -1083,11 +1083,11 @@ int taosCreateSocketWithTimeOutOpt(uint32_t conn_timeout_sec) { return -1; } #if defined(WINDOWS) - if (0 != setsockopt(fd, IPPROTO_TCP, TCP_MAXRT, (char *)&conn_timeout_sec, sizeof(conn_timeout_sec))) { + if (0 != setsockopt(fd, IPPROTO_TCP, TCP_MAXRT, (char *)&timeout, sizeof(timeout))) { return -1; } #else // Linux like systems - uint32_t conn_timeout_ms = conn_timeout_sec * 1000; + uint32_t conn_timeout_ms = timeout * 1000; if (0 != setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) { return -1; }