diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 538aeb1a0e7927469cf90721dbbc16140c181e93..f1bab81573bd555963edf524ecfb7171ebdd44bd 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -78,6 +78,9 @@ typedef struct SRpcInit { // call back to retrieve the client auth info, for server app only int (*afp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey); + // call back to keep conn or not + bool (*pfp)(void *parent, tmsg_t msgType); + void *parent; } SRpcInit; diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index d7574466cdfbe15ea38145c7dd3a9d0a3895e9bb..944da5a30737800513d13fb4cecf2b4eece72575 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -20,17 +20,17 @@ extern "C" { #endif -#include "taos.h" #include "common.h" -#include "tmsg.h" +#include "parser.h" +#include "query.h" +#include "taos.h" #include "tdef.h" #include "tep.h" #include "thash.h" #include "tlist.h" +#include "tmsg.h" #include "tmsgtype.h" #include "trpc.h" -#include "query.h" -#include "parser.h" #define CHECK_CODE_GOTO(expr, label) \ do { \ @@ -46,12 +46,12 @@ extern "C" { typedef struct SAppInstInfo SAppInstInfo; typedef struct SHbConnInfo { - void *param; - SClientHbReq *req; + void* param; + SClientHbReq* req; } SHbConnInfo; typedef struct SAppHbMgr { - char *key; + char* key; // statistics int32_t reportCnt; int32_t connKeyCnt; @@ -62,15 +62,13 @@ typedef struct SAppHbMgr { // connection SAppInstInfo* pAppInstInfo; // info - SHashObj* activeInfo; // hash - SHashObj* connInfo; // hash + SHashObj* activeInfo; // hash + SHashObj* connInfo; // hash } SAppHbMgr; +typedef int32_t (*FHbRspHandle)(struct SAppHbMgr* pAppHbMgr, SClientHbRsp* pRsp); -typedef int32_t (*FHbRspHandle)(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp); - -typedef int32_t (*FHbReqHandle)(SClientHbKey *connKey, void* param, SClientHbReq *req); - +typedef int32_t (*FHbReqHandle)(SClientHbKey* connKey, void* param, SClientHbReq* req); typedef struct SClientHbMgr { int8_t inited; @@ -83,63 +81,62 @@ typedef struct SClientHbMgr { FHbRspHandle rspHandle[HEARTBEAT_TYPE_MAX]; } SClientHbMgr; - typedef struct SQueryExecMetric { - int64_t start; // start timestamp - int64_t parsed; // start to parse - int64_t send; // start to send to server - int64_t rsp; // receive response from server + int64_t start; // start timestamp + int64_t parsed; // start to parse + int64_t send; // start to send to server + int64_t rsp; // receive response from server } SQueryExecMetric; typedef struct SInstanceSummary { - uint64_t numOfInsertsReq; - uint64_t numOfInsertRows; - uint64_t insertElapsedTime; - uint64_t insertBytes; // submit to tsdb since launched. - - uint64_t fetchBytes; - uint64_t queryElapsedTime; - uint64_t numOfSlowQueries; - uint64_t totalRequests; - uint64_t currentRequests; // the number of SRequestObj + uint64_t numOfInsertsReq; + uint64_t numOfInsertRows; + uint64_t insertElapsedTime; + uint64_t insertBytes; // submit to tsdb since launched. + + uint64_t fetchBytes; + uint64_t queryElapsedTime; + uint64_t numOfSlowQueries; + uint64_t totalRequests; + uint64_t currentRequests; // the number of SRequestObj } SInstanceSummary; typedef struct SHeartBeatInfo { - void *pTimer; // timer, used to send request msg to mnode + void* pTimer; // timer, used to send request msg to mnode } SHeartBeatInfo; struct SAppInstInfo { - int64_t numOfConns; - SCorEpSet mgmtEp; - SInstanceSummary summary; - SList *pConnList; // STscObj linked list - int64_t clusterId; - void *pTransporter; - struct SAppHbMgr *pAppHbMgr; + int64_t numOfConns; + SCorEpSet mgmtEp; + SInstanceSummary summary; + SList* pConnList; // STscObj linked list + int64_t clusterId; + void* pTransporter; + struct SAppHbMgr* pAppHbMgr; }; typedef struct SAppInfo { int64_t startTime; char appName[TSDB_APP_NAME_LEN]; - char *ep; + char* ep; int32_t pid; int32_t numOfThreads; - SHashObj *pInstMap; + SHashObj* pInstMap; pthread_mutex_t mutex; } SAppInfo; typedef struct STscObj { - char user[TSDB_USER_LEN]; - char pass[TSDB_PASSWORD_LEN]; - char db[TSDB_DB_FNAME_LEN]; - char ver[128]; - int32_t acctId; - uint32_t connId; - int32_t connType; - uint64_t id; // ref ID returned by taosAddRef - pthread_mutex_t mutex; // used to protect the operation on db - int32_t numOfReqs; // number of sqlObj bound to this connection - SAppInstInfo *pAppInfo; + char user[TSDB_USER_LEN]; + char pass[TSDB_PASSWORD_LEN]; + char db[TSDB_DB_FNAME_LEN]; + char ver[128]; + int32_t acctId; + uint32_t connId; + int32_t connType; + uint64_t id; // ref ID returned by taosAddRef + pthread_mutex_t mutex; // used to protect the operation on db + int32_t numOfReqs; // number of sqlObj bound to this connection + SAppInstInfo* pAppInfo; } STscObj; typedef struct SMqConsumer { @@ -147,49 +144,49 @@ typedef struct SMqConsumer { } SMqConsumer; typedef struct SReqResultInfo { - const char *pRspMsg; - const char *pData; - TAOS_FIELD *fields; - uint32_t numOfCols; - int32_t *length; - TAOS_ROW row; - char **pCol; - uint32_t numOfRows; - uint64_t totalRows; - uint32_t current; - bool completed; + const char* pRspMsg; + const char* pData; + TAOS_FIELD* fields; + uint32_t numOfCols; + int32_t* length; + TAOS_ROW row; + char** pCol; + uint32_t numOfRows; + uint64_t totalRows; + uint32_t current; + bool completed; } SReqResultInfo; typedef struct SShowReqInfo { - int64_t execId; // showId/queryId - int32_t vgId; - SArray *pArray; // SArray - int32_t currentIndex; // current accessed vgroup index. + int64_t execId; // showId/queryId + int32_t vgId; + SArray* pArray; // SArray + int32_t currentIndex; // current accessed vgroup index. } SShowReqInfo; typedef struct SRequestSendRecvBody { - tsem_t rspSem; // not used now + tsem_t rspSem; // not used now void* fp; - SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed. + SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed. SDataBuf requestMsg; - struct SSchJob *pQueryJob; // query job, created according to sql query DAG. - struct SQueryDag *pDag; // the query dag, generated according to the sql statement. + struct SSchJob* pQueryJob; // query job, created according to sql query DAG. + struct SQueryDag* pDag; // the query dag, generated according to the sql statement. SReqResultInfo resInfo; } SRequestSendRecvBody; -#define ERROR_MSG_BUF_DEFAULT_SIZE 512 +#define ERROR_MSG_BUF_DEFAULT_SIZE 512 typedef struct SRequestObj { - uint64_t requestId; - int32_t type; // request type - STscObj *pTscObj; - char *sqlstr; // sql string - int32_t sqlLen; - int64_t self; - char *msgBuf; - void *pInfo; // sql parse info, generated by parser module - int32_t code; - SQueryExecMetric metric; + uint64_t requestId; + int32_t type; // request type + STscObj* pTscObj; + char* sqlstr; // sql string + int32_t sqlLen; + int64_t self; + char* msgBuf; + void* pInfo; // sql parse info, generated by parser module + int32_t code; + SQueryExecMetric metric; SRequestSendRecvBody body; } SRequestObj; @@ -198,51 +195,52 @@ extern int32_t clientReqRefPool; extern int32_t clientConnRefPool; extern int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code); -int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code); +int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code); SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj); -int taos_init(); +int taos_init(); -void* createTscObj(const char* user, const char* auth, const char *db, SAppInstInfo* pAppInfo); -void destroyTscObj(void*pObj); +void* createTscObj(const char* user, const char* auth, const char* db, SAppInstInfo* pAppInfo); +void destroyTscObj(void* pObj); uint64_t generateRequestId(); -void *createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type); +void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type); void destroyRequest(SRequestObj* pRequest); -char *getDbOfConnection(STscObj* pObj); +char* getDbOfConnection(STscObj* pObj); void setConnectionDB(STscObj* pTscObj, const char* db); void taos_init_imp(void); -int taos_options_imp(TSDB_OPTION option, const char *str); +int taos_options_imp(TSDB_OPTION option, const char* str); -void* openTransporter(const char *user, const char *auth, int32_t numOfThreads); +void* openTransporter(const char* user, const char* auth, int32_t numOfThreads); +bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType); void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet); void initMsgHandleFp(); -TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port); - -void *doFetchRow(SRequestObj* pRequest); +TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, + uint16_t port); -void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows); +void* doFetchRow(SRequestObj* pRequest); +void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows); -int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj** pRequest); +int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest); int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery); -// --- heartbeat +// --- heartbeat // global, called by mgmt int hbMgrInit(); void hbMgrCleanUp(); int hbHandleRsp(SClientHbBatchRsp* hbRsp); // cluster level -SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key); -void appHbMgrCleanup(void); +SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key); +void appHbMgrCleanup(void); // conn level int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType); @@ -253,7 +251,6 @@ int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* v // --- mq void hbMgrInitMqHbRspHandle(); - #ifdef __cplusplus } #endif diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index cb73701bfa389ede7ed8073728acd7d418a369c5..af131f7624384581550802ad029878caa11be167 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -13,16 +13,16 @@ * along with this program. If not, see . */ -#include "os.h" #include "catalog.h" #include "clientInt.h" #include "clientLog.h" +#include "os.h" #include "query.h" #include "scheduler.h" -#include "tmsg.h" #include "tcache.h" #include "tconfig.h" #include "tglobal.h" +#include "tmsg.h" #include "tnote.h" #include "tref.h" #include "trpc.h" @@ -30,16 +30,16 @@ #include "ttimezone.h" #define TSC_VAR_NOT_RELEASE 1 -#define TSC_VAR_RELEASED 0 +#define TSC_VAR_RELEASED 0 -SAppInfo appInfo; -int32_t clientReqRefPool = -1; -int32_t clientConnRefPool = -1; +SAppInfo appInfo; +int32_t clientReqRefPool = -1; +int32_t clientConnRefPool = -1; static pthread_once_t tscinit = PTHREAD_ONCE_INIT; -volatile int32_t tscInitRes = 0; +volatile int32_t tscInitRes = 0; -static void registerRequest(SRequestObj* pRequest) { +static void registerRequest(SRequestObj *pRequest) { STscObj *pTscObj = (STscObj *)taosAcquireRef(clientConnRefPool, pRequest->pTscObj->id); assert(pTscObj != NULL); @@ -53,23 +53,25 @@ static void registerRequest(SRequestObj* pRequest) { int32_t total = atomic_add_fetch_32(&pSummary->totalRequests, 1); int32_t currentInst = atomic_add_fetch_32(&pSummary->currentRequests, 1); - tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 ", current:%d, app current:%d, total:%d, reqId:0x%"PRIx64, pRequest->self, - pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId); + tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 + ", current:%d, app current:%d, total:%d, reqId:0x%" PRIx64, + pRequest->self, pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId); } } -static void deregisterRequest(SRequestObj* pRequest) { +static void deregisterRequest(SRequestObj *pRequest) { assert(pRequest != NULL); - STscObj* pTscObj = pRequest->pTscObj; - SInstanceSummary* pActivity = &pTscObj->pAppInfo->summary; + STscObj * pTscObj = pRequest->pTscObj; + SInstanceSummary *pActivity = &pTscObj->pAppInfo->summary; int32_t currentInst = atomic_sub_fetch_32(&pActivity->currentRequests, 1); int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1); int64_t duration = taosGetTimestampMs() - pRequest->metric.start; - tscDebug("0x%"PRIx64" free Request from connObj: 0x%"PRIx64", reqId:0x%"PRIx64" elapsed:%"PRIu64" ms, current:%d, app current:%d", pRequest->self, pTscObj->id, - pRequest->requestId, duration, num, currentInst); + tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64 + " ms, current:%d, app current:%d", + pRequest->self, pTscObj->id, pRequest->requestId, duration, num, currentInst); taosReleaseRef(clientConnRefPool, pTscObj->id); } @@ -79,8 +81,8 @@ static void tscInitLogFile() { printf("failed to create log dir:%s\n", tsLogDir); } - const char *defaultLogFileNamePrefix = "taoslog"; - const int32_t maxLogFileNum = 10; + const char * defaultLogFileNamePrefix = "taoslog"; + const int32_t maxLogFileNum = 10; char temp[128] = {0}; sprintf(temp, "%s/%s", tsLogDir, defaultLogFileNamePrefix); @@ -90,23 +92,24 @@ static void tscInitLogFile() { } // todo close the transporter properly -void closeTransporter(STscObj* pTscObj) { +void closeTransporter(STscObj *pTscObj) { if (pTscObj == NULL || pTscObj->pAppInfo->pTransporter == NULL) { return; } - tscDebug("free transporter:%p in connObj: 0x%"PRIx64, pTscObj->pAppInfo->pTransporter, pTscObj->id); + tscDebug("free transporter:%p in connObj: 0x%" PRIx64, pTscObj->pAppInfo->pTransporter, pTscObj->id); rpcClose(pTscObj->pAppInfo->pTransporter); } // TODO refactor -void* openTransporter(const char *user, const char *auth, int32_t numOfThread) { +void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localPort = 0; rpcInit.label = "TSC"; rpcInit.numOfThreads = numOfThread; rpcInit.cfp = processMsgFromServer; + rpcInit.pfp = persistConnForSpecificMsg; rpcInit.sessions = tsMaxConnections; rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.user = (char *)user; @@ -115,7 +118,7 @@ void* openTransporter(const char *user, const char *auth, int32_t numOfThread) { rpcInit.spi = 1; rpcInit.secret = (char *)auth; - void* pDnodeConn = rpcOpen(&rpcInit); + void *pDnodeConn = rpcOpen(&rpcInit); if (pDnodeConn == NULL) { tscError("failed to init connection to server"); return NULL; @@ -130,12 +133,12 @@ void destroyTscObj(void *pObj) { SClientHbKey connKey = {.connId = pTscObj->connId, .hbType = pTscObj->connType}; hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey); atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); - tscDebug("connObj 0x%"PRIx64" destroyed, totalConn:%"PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns); + tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns); pthread_mutex_destroy(&pTscObj->mutex); tfree(pTscObj); } -void* createTscObj(const char* user, const char* auth, const char *db, SAppInstInfo* pAppInfo) { +void *createTscObj(const char *user, const char *auth, const char *db, SAppInstInfo *pAppInfo) { STscObj *pObj = (STscObj *)calloc(1, sizeof(STscObj)); if (NULL == pObj) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -153,11 +156,11 @@ void* createTscObj(const char* user, const char* auth, const char *db, SAppInstI pthread_mutex_init(&pObj->mutex, NULL); pObj->id = taosAddRef(clientConnRefPool, pObj); - tscDebug("connObj created, 0x%"PRIx64, pObj->id); + tscDebug("connObj created, 0x%" PRIx64, pObj->id); return pObj; } -void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type) { +void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t type) { assert(pObj != NULL); SRequestObj *pRequest = (SRequestObj *)calloc(1, sizeof(SRequestObj)); @@ -166,20 +169,20 @@ void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t ty return NULL; } - pRequest->requestId = generateRequestId(); + pRequest->requestId = generateRequestId(); pRequest->metric.start = taosGetTimestampMs(); - pRequest->type = type; - pRequest->pTscObj = pObj; - pRequest->body.fp = fp; // not used it yet - pRequest->msgBuf = calloc(1, ERROR_MSG_BUF_DEFAULT_SIZE); + pRequest->type = type; + pRequest->pTscObj = pObj; + pRequest->body.fp = fp; // not used it yet + pRequest->msgBuf = calloc(1, ERROR_MSG_BUF_DEFAULT_SIZE); tsem_init(&pRequest->body.rspSem, 0, 0); registerRequest(pRequest); return pRequest; } -static void doFreeReqResultInfo(SReqResultInfo* pResInfo) { +static void doFreeReqResultInfo(SReqResultInfo *pResInfo) { tfree(pResInfo->pRspMsg); tfree(pResInfo->length); tfree(pResInfo->row); @@ -187,9 +190,9 @@ static void doFreeReqResultInfo(SReqResultInfo* pResInfo) { tfree(pResInfo->fields); } -static void doDestroyRequest(void* p) { +static void doDestroyRequest(void *p) { assert(p != NULL); - SRequestObj* pRequest = (SRequestObj*)p; + SRequestObj *pRequest = (SRequestObj *)p; assert(RID_VALID(pRequest->self)); @@ -208,7 +211,7 @@ static void doDestroyRequest(void* p) { tfree(pRequest); } -void destroyRequest(SRequestObj* pRequest) { +void destroyRequest(SRequestObj *pRequest) { if (pRequest == NULL) { return; } @@ -252,14 +255,14 @@ void taos_init_imp(void) { initTaskQueue(); clientConnRefPool = taosOpenRef(200, destroyTscObj); - clientReqRefPool = taosOpenRef(40960, doDestroyRequest); + clientReqRefPool = taosOpenRef(40960, doDestroyRequest); taosGetAppName(appInfo.appName, NULL); pthread_mutex_init(&appInfo.mutex, NULL); - appInfo.pid = taosGetPId(); + appInfo.pid = taosGetPId(); appInfo.startTime = taosGetTimestampMs(); - appInfo.pInstMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + appInfo.pInstMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); tscDebug("client is initialized successfully"); } @@ -281,7 +284,8 @@ int taos_options_imp(TSDB_OPTION option, const char *str) { cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION; tscInfo("set config file directory:%s", str); } else { - tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str, tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr); + tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str, + tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr); } break; @@ -296,7 +300,8 @@ int taos_options_imp(TSDB_OPTION option, const char *str) { cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION; tscInfo("set shellActivityTimer:%d", tsShellActivityTimer); } else { - tscWarn("config option:%s, input value:%s, is configured by %s, use %d", cfg->option, str, tsCfgStatusStr[cfg->cfgStatus], *(int32_t *)cfg->ptr); + tscWarn("config option:%s, input value:%s, is configured by %s, use %d", cfg->option, str, + tsCfgStatusStr[cfg->cfgStatus], *(int32_t *)cfg->ptr); } break; @@ -313,8 +318,8 @@ int taos_options_imp(TSDB_OPTION option, const char *str) { if (cfg->cfgStatus <= TAOS_CFG_CSTATUS_OPTION) { char sep = '.'; - if (strlen(tsLocale) == 0) { // locale does not set yet - char* defaultLocale = setlocale(LC_CTYPE, ""); + if (strlen(tsLocale) == 0) { // locale does not set yet + char *defaultLocale = setlocale(LC_CTYPE, ""); // The locale of the current OS does not be set correctly, so the default locale cannot be acquired. // The launch of current system will abort soon. @@ -329,10 +334,10 @@ int taos_options_imp(TSDB_OPTION option, const char *str) { // set the user specified locale char *locale = setlocale(LC_CTYPE, str); - if (locale != NULL) { // failed to set the user specified locale + if (locale != NULL) { // failed to set the user specified locale tscInfo("locale set, prev locale:%s, new locale:%s", tsLocale, locale); cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION; - } else { // set the user specified locale failed, use default LC_CTYPE as current locale + } else { // set the user specified locale failed, use default LC_CTYPE as current locale locale = setlocale(LC_CTYPE, tsLocale); tscInfo("failed to set locale:%s, current locale:%s", str, tsLocale); } @@ -360,11 +365,12 @@ int taos_options_imp(TSDB_OPTION option, const char *str) { } free(charset); - } else { // it may be windows system + } else { // it may be windows system tscInfo("charset remains:%s", tsCharset); } } else { - tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str, tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr); + tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str, + tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr); } break; } @@ -394,7 +400,8 @@ int taos_options_imp(TSDB_OPTION option, const char *str) { tscInfo("charset:%s not valid", str); } } else { - tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str, tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr); + tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str, + tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr); } break; @@ -410,7 +417,8 @@ int taos_options_imp(TSDB_OPTION option, const char *str) { cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION; tscDebug("timezone set:%s, input:%s by taos_options", tsTimezone, str); } else { - tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str, tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr); + tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str, + tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr); } break; @@ -434,7 +442,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) { */ uint64_t generateRequestId() { static uint64_t hashId = 0; - static int32_t requestSerialId = 0; + static int32_t requestSerialId = 0; if (hashId == 0) { char uid[64] = {0}; @@ -448,9 +456,9 @@ uint64_t generateRequestId() { } } - int64_t ts = taosGetTimestampMs(); - uint64_t pid = taosGetPId(); - int32_t val = atomic_add_fetch_32(&requestSerialId, 1); + int64_t ts = taosGetTimestampMs(); + uint64_t pid = taosGetPId(); + int32_t val = atomic_add_fetch_32(&requestSerialId, 1); uint64_t id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF); return id; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 2eff353c3947d2fa7f908300b79feadd0c75f93c..3b35b6bdd4bbdb6bc483f14dfc19d6cc0098dc5b 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -370,7 +370,6 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) { pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)]; pMsgSendInfo->param = pRequest; - SConnectReq connectReq = {0}; STscObj* pObj = pRequest->pTscObj; @@ -398,7 +397,9 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { tfree(pMsgBody->msgInfo.pData); tfree(pMsgBody); } - +bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType) { + return msgType == TDMT_VND_QUERY_RSP || msgType == TDMT_VND_FETCH_RSP || msgType == TDMT_VND_RES_READY_RSP; +} void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->ahandle; assert(pMsg->ahandle != NULL); diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index a36b671eb459268c1fcf51ca80ac21a88d050e37..d080db753d525362ecd8cca464f17f6d80fbd19e 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -66,6 +66,7 @@ typedef struct { void (*cfp)(void* parent, SRpcMsg*, SEpSet*); int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); + bool (*pfp)(void* parent, tmsg_t msgType); int32_t refCount; void* parent; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 48c15ca286e6202be5d71ca7791272b0a07ce3f3..453b5184d2c3d19ac388dd971af8894a9ed01135 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -29,7 +29,12 @@ void* rpcOpen(const SRpcInit* pInit) { if (pInit->label) { tstrncpy(pRpc->label, pInit->label, strlen(pInit->label) + 1); } + + // register callback handle pRpc->cfp = pInit->cfp; + pRpc->afp = pInit->afp; + pRpc->pfp = pInit->pfp; + if (pInit->connType == TAOS_CONN_SERVER) { pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; } else { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 8312c0217c13d40a43b57c82dc22ff04d5ddcd65..d5fa4c8c479494b82d97224af013f9b30cf0e961 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -134,8 +134,7 @@ static void clientHandleResp(SCliConn* conn) { rpcMsg.msgType = pHead->msgType; rpcMsg.ahandle = pCtx->ahandle; - if (rpcMsg.msgType == TDMT_VND_QUERY_RSP || rpcMsg.msgType == TDMT_VND_FETCH_RSP || - rpcMsg.msgType == TDMT_VND_RES_READY_RSP) { + if (pRpc->pfp != NULL && (pRpc->pfp)(pRpc->parent, rpcMsg.msgType)) { rpcMsg.handle = conn; conn->persist = 1; tDebug("client conn %p persist by app", conn); @@ -185,18 +184,13 @@ static void clientHandleExcept(SCliConn* pConn) { clientConnDestroy(pConn, true); return; } - SCliMsg* pMsg = pConn->data; - - tmsg_t msgType = TDMT_MND_CONNECT; - if (pMsg != NULL) { - msgType = pMsg->msg.msgType; - } + SCliMsg* pMsg = pConn->data; STransConnCtx* pCtx = pMsg->ctx; SRpcMsg rpcMsg = {0}; rpcMsg.ahandle = pCtx->ahandle; rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - rpcMsg.msgType = msgType + 1; + rpcMsg.msgType = pMsg->msg.msgType + 1; if (pConn->push != NULL && pConn->ctnRdCnt != 0) { (*pConn->push->callback)(pConn->push->arg, &rpcMsg); @@ -445,7 +439,7 @@ static void clientConnCb(uv_connect_t* req, int status) { addrlen = sizeof(pConn->locaddr); uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->locaddr, &addrlen); - tTrace("client conn %p create", pConn); + tTrace("client conn %p connect to server successfully", pConn); assert(pConn->stream == req->handle); clientWrite(pConn); @@ -524,6 +518,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { struct sockaddr_in addr; uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr); // handle error in callback if fail to connect + tTrace("client conn %p try to connect to %s:%d", conn, pMsg->ctx->ip, pMsg->ctx->port); uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb); } diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 9fca371bf30588f7bdd50f5a3cc452e000bd03eb..f0db054797b936b8fde9fa3b241c8dbff909bb90 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -413,11 +413,6 @@ void uvWorkerAsyncCb(uv_async_t* handle) { } else { uvStartSendResp(msg); } - // uv_buf_t wb; - // uvPrepareSendData(msg, &wb); - // uv_timer_stop(conn->pTimer); - - // uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb); } } static void uvAcceptAsyncCb(uv_async_t* async) { @@ -490,7 +485,6 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { pConn->pTimer->data = pConn; pConn->hostThrd = pThrd; - // pConn->pWorkerAsync = pThrd->workerAsync; // thread safty // init client handle pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); @@ -730,14 +724,9 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) { } void sendQuitToWorkThrd(SWorkThrdObj* pThrd) { SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); - - // pthread_mutex_lock(&pThrd->msgMtx); - // QUEUE_PUSH(&pThrd->msg, &srvMsg->q); - // pthread_mutex_unlock(&pThrd->msgMtx); tDebug("send quit msg to work thread"); transSendAsync(pThrd->asyncPool, &srvMsg->q); - // uv_async_send(pThrd->workerAsync); } void taosCloseServer(void* arg) { @@ -774,19 +763,12 @@ void rpcSendResponse(const SRpcMsg* pMsg) { SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); srvMsg->pConn = pConn; srvMsg->msg = *pMsg; - - // pthread_mutex_lock(&pThrd->msgMtx); - // QUEUE_PUSH(&pThrd->msg, &srvMsg->q); - // pthread_mutex_unlock(&pThrd->msgMtx); - tTrace("server conn %p start to send resp", pConn); transSendAsync(pThrd->asyncPool, &srvMsg->q); - // uv_async_send(pThrd->workerAsync); } int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { SSrvConn* pConn = thandle; - // struct sockaddr* pPeerName = &pConn->peername; struct sockaddr_in addr = pConn->addr; pInfo->clientIp = (uint32_t)(addr.sin_addr.s_addr);