diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 53292ed46a32bc270fb7a07a7d45353528ffde6a..82a9d0f4892c1d0a404739bd84d5d76febf31da1 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -65,7 +65,7 @@ enum { typedef struct SAppInstInfo SAppInstInfo; typedef struct { - char* key; + char* key; // statistics int32_t reportCnt; int32_t connKeyCnt; @@ -177,14 +177,14 @@ typedef struct SReqResultInfo { } SReqResultInfo; typedef struct SRequestSendRecvBody { - tsem_t rspSem; // not used now - __taos_async_fn_t queryFp; - __taos_async_fn_t fetchFp; - void* param; - SDataBuf requestMsg; - int64_t queryJob; // query job, created according to sql query DAG. - int32_t subplanNum; - SReqResultInfo resInfo; + tsem_t rspSem; // not used now + __taos_async_fn_t queryFp; + __taos_async_fn_t fetchFp; + void* param; + SDataBuf requestMsg; + int64_t queryJob; // query job, created according to sql query DAG. + int32_t subplanNum; + SReqResultInfo resInfo; } SRequestSendRecvBody; typedef struct { @@ -284,6 +284,7 @@ static FORCE_INLINE SReqResultInfo* tscGetCurResInfo(TAOS_RES* res) { extern SAppInfo appInfo; extern int32_t clientReqRefPool; extern int32_t clientConnRefPool; +extern void* tscQhandle; __async_send_cb_fn_t getMsgRspHandle(int32_t msgType); @@ -301,7 +302,7 @@ void destroyRequest(SRequestObj* pRequest); SRequestObj* acquireRequest(int64_t rid); int32_t releaseRequest(int64_t rid); int32_t removeRequest(int64_t rid); -void doDestroyRequest(void *p); +void doDestroyRequest(void* p); char* getDbOfConnection(STscObj* pObj); void setConnectionDB(STscObj* pTscObj, const char* db); @@ -336,8 +337,8 @@ int hbHandleRsp(SClientHbBatchRsp* hbRsp); // cluster level SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key); void appHbMgrCleanup(void); -void hbRemoveAppHbMrg(SAppHbMgr **pAppHbMgr); -void closeAllRequests(SHashObj *pRequests); +void hbRemoveAppHbMrg(SAppHbMgr** pAppHbMgr); +void closeAllRequests(SHashObj* pRequests); // conn level int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType); @@ -356,6 +357,9 @@ int32_t removeMeta(STscObj* pTscObj, SArray* tbList); // todo move to clie int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog); // todo move to xxx bool qnodeRequired(SRequestObj* pRequest); +void initTscQhandle(); +void cleanupTscQhandle(); + #ifdef __cplusplus } #endif diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index fefabf65394b817f6bf32744693b4e660d6b882e..f58b942fad6e124eb44a4d7beafd3b6ae44c7232 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -25,6 +25,7 @@ #include "tmsg.h" #include "tref.h" #include "trpc.h" +#include "tsched.h" #include "ttime.h" #define TSC_VAR_NOT_RELEASE 1 @@ -34,9 +35,20 @@ SAppInfo appInfo; int32_t clientReqRefPool = -1; int32_t clientConnRefPool = -1; +void *tscQhandle = NULL; + static TdThreadOnce tscinit = PTHREAD_ONCE_INIT; volatile int32_t tscInitRes = 0; +void initTscQhandle() { + // init handle + tscQhandle = taosInitScheduler(4096, 5, "tsc"); +} + +void cleanupTscQhandle() { + // destroy handle + taosCleanUpScheduler(tscQhandle); +} static int32_t registerRequest(SRequestObj *pRequest) { STscObj *pTscObj = acquireTscObj(pRequest->pTscObj->id); if (NULL == pTscObj) { @@ -156,9 +168,9 @@ void destroyTscObj(void *pObj) { if (NULL == pObj) { return; } - + STscObj *pTscObj = pObj; - int64_t tscId = pTscObj->id; + int64_t tscId = pTscObj->id; tscDebug("begin to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj); SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType}; @@ -272,11 +284,11 @@ void doDestroyRequest(void *p) { if (NULL == p) { return; } - + SRequestObj *pRequest = (SRequestObj *)p; - int64_t reqId = pRequest->self; + int64_t reqId = pRequest->self; tscDebug("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest); - + taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self)); if (pRequest->body.queryJob != 0) { @@ -314,7 +326,7 @@ void taos_init_imp(void) { // In the APIs of other program language, taos_cleanup is not available yet. // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning. atexit(taos_cleanup); - + initTscQhandle(); errno = TSDB_CODE_SUCCESS; taosSeedRand(taosGetTimestampSec()); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index b6009ebbda83b9c22248059f22273ed6b5c2e88e..178ed63fb406e56f30f991741eccd576cb27e2d7 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -25,6 +25,7 @@ #include "tmsgtype.h" #include "tpagedbuf.h" #include "tref.h" +#include "tsched.h" static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet); static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest); @@ -56,14 +57,14 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i } bool chkRequestKilled(void* param) { - bool killed = false; + bool killed = false; SRequestObj* pRequest = acquireRequest((int64_t)param); if (NULL == pRequest || pRequest->killed) { killed = true; } releaseRequest((int64_t)param); - + return killed; } @@ -769,7 +770,7 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) { code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset); break; } - case TDMT_SCH_QUERY: + case TDMT_SCH_QUERY: case TDMT_SCH_MERGE_QUERY: { code = handleQueryExecRes(pRequest, pRes->res, pCatalog, &epset); break; @@ -1236,7 +1237,16 @@ void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, } } -void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { +typedef struct SchedArg { + SRpcMsg msg; + SEpSet* pEpset; +} SchedArg; + +void doProcessMsgFromServer(SSchedMsg* schedMsg) { + SchedArg* arg = (SchedArg*)schedMsg->ahandle; + SRpcMsg* pMsg = &arg->msg; + SEpSet* pEpSet = arg->pEpset; + SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle; assert(pMsg->info.ahandle != NULL); STscObj* pTscObj = NULL; @@ -1269,7 +1279,8 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet); - SDataBuf buf = {.msgType = pMsg->msgType, .len = pMsg->contLen, .pData = NULL, .handle = pMsg->info.handle, .pEpSet = pEpSet}; + SDataBuf buf = { + .msgType = pMsg->msgType, .len = pMsg->contLen, .pData = NULL, .handle = pMsg->info.handle, .pEpSet = pEpSet}; if (pMsg->contLen > 0) { buf.pData = taosMemoryCalloc(1, pMsg->contLen); @@ -1284,6 +1295,25 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { pSendInfo->fp(pSendInfo->param, &buf, pMsg->code); rpcFreeCont(pMsg->pCont); destroySendMsgInfo(pSendInfo); + + taosMemoryFree(arg); +} + +void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { + SSchedMsg schedMsg = {0}; + + SEpSet* tEpSet = pEpSet != NULL ? taosMemoryCalloc(1, sizeof(SEpSet)) : NULL; + if (tEpSet != NULL) { + *tEpSet = *pEpSet; + } + + SchedArg* arg = taosMemoryCalloc(1, sizeof(SchedArg)); + arg->msg = *pMsg; + arg->pEpset = tEpSet; + + schedMsg.fp = doProcessMsgFromServer; + schedMsg.ahandle = arg; + taosScheduleTask(tscQhandle, &schedMsg); } TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) { @@ -1412,7 +1442,7 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU pParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam)); tsem_init(&pParam->sem, 0, 0); } - + // convert ucs4 to native multi-bytes string pResultInfo->convertUcs4 = convertUcs4; diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index d8a9ce581a656934d50359ce9c19f82ddbeb6fdf..dcfe65f6beb9a7e39d5abe0abdb4897f7526d9cd 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -47,11 +47,9 @@ int taos_options(TSDB_OPTION option, const void *arg, ...) { atomic_store_32(&lock, 0); return ret; } - // this function may be called by user or system, or by both simultaneously. void taos_cleanup(void) { - tscInfo("start to cleanup client environment"); - + tscInfo("start to cleanup client environment"); if (atomic_val_compare_exchange_32(&sentinel, TSC_VAR_NOT_RELEASE, TSC_VAR_RELEASED) != TSC_VAR_NOT_RELEASE) { return; } @@ -74,8 +72,8 @@ void taos_cleanup(void) { catalogDestroy(); schedulerDestroy(); + cleanupTscQhandle(); rpcCleanup(); - tscInfo("all local resources released"); taosCleanupCfg(); taosCloseLog(); @@ -108,7 +106,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha if (pObj) { int64_t *rid = taosMemoryCalloc(1, sizeof(int64_t)); *rid = pObj->id; - return (TAOS*)rid; + return (TAOS *)rid; } return NULL; @@ -196,9 +194,9 @@ void taos_kill_query(TAOS *taos) { if (NULL == taos) { return; } - int64_t rid = *(int64_t*)taos; - - STscObj* pTscObj = acquireTscObj(rid); + int64_t rid = *(int64_t *)taos; + + STscObj *pTscObj = acquireTscObj(rid); closeAllRequests(pTscObj->pRequests); releaseTscObj(rid); } @@ -244,7 +242,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { #endif } else if (TD_RES_TMQ(res)) { - SMqRspObj *msg = ((SMqRspObj *)res); + SMqRspObj * msg = ((SMqRspObj *)res); SReqResultInfo *pResultInfo; if (msg->resIter == -1) { pResultInfo = tmqGetNextResInfo(res, true); @@ -420,7 +418,7 @@ int taos_affected_rows(TAOS_RES *res) { return 0; } - SRequestObj *pRequest = (SRequestObj *)res; + SRequestObj * pRequest = (SRequestObj *)res; SReqResultInfo *pResInfo = &pRequest->body.resInfo; return pResInfo->numOfRows; } @@ -606,7 +604,7 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { } SReqResultInfo *pResInfo = tscGetCurResInfo(res); - TAOS_FIELD *pField = &pResInfo->userFields[columnIndex]; + TAOS_FIELD * pField = &pResInfo->userFields[columnIndex]; if (!IS_VAR_DATA_TYPE(pField->type)) { return 0; } @@ -650,8 +648,8 @@ const char *taos_get_server_info(TAOS *taos) { typedef struct SqlParseWrapper { SParseContext *pCtx; SCatalogReq catalogReq; - SRequestObj *pRequest; - SQuery *pQuery; + SRequestObj * pRequest; + SQuery * pQuery; } SqlParseWrapper; static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) { @@ -672,8 +670,8 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { tscDebug("enter meta callback, code %s", tstrerror(code)); SqlParseWrapper *pWrapper = (SqlParseWrapper *)param; - SQuery *pQuery = pWrapper->pQuery; - SRequestObj *pRequest = pWrapper->pRequest; + SQuery * pQuery = pWrapper->pQuery; + SRequestObj * pRequest = pWrapper->pRequest; if (code == TSDB_CODE_SUCCESS) { code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery); @@ -722,31 +720,29 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) { return TSDB_CODE_OUT_OF_MEMORY; } - **pCxt = (SParseContext){ - .requestId = pRequest->requestId, - .requestRid = pRequest->self, - .acctId = pTscObj->acctId, - .db = pRequest->pDb, - .topicQuery = false, - .pSql = pRequest->sqlstr, - .sqlLen = pRequest->sqlLen, - .pMsg = pRequest->msgBuf, - .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE, - .pTransporter = pTscObj->pAppInfo->pTransporter, - .pStmtCb = NULL, - .pUser = pTscObj->user, - .schemalessType = pTscObj->schemalessType, - .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)), - .async = true, - .svrVer = pTscObj->sVer, - .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes) - }; + **pCxt = (SParseContext){.requestId = pRequest->requestId, + .requestRid = pRequest->self, + .acctId = pTscObj->acctId, + .db = pRequest->pDb, + .topicQuery = false, + .pSql = pRequest->sqlstr, + .sqlLen = pRequest->sqlLen, + .pMsg = pRequest->msgBuf, + .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE, + .pTransporter = pTscObj->pAppInfo->pTransporter, + .pStmtCb = NULL, + .pUser = pTscObj->user, + .schemalessType = pTscObj->schemalessType, + .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)), + .async = true, + .svrVer = pTscObj->sVer, + .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes)}; return TSDB_CODE_SUCCESS; } void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { SParseContext *pCxt = NULL; - STscObj *pTscObj = pRequest->pTscObj; + STscObj * pTscObj = pRequest->pTscObj; int32_t code = 0; if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) { @@ -911,10 +907,10 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { return terrno; } - int64_t rid = *(int64_t*)taos; + int64_t rid = *(int64_t *)taos; const int32_t MAX_TABLE_NAME_LENGTH = 12 * 1024 * 1024; // 12MB list int32_t code = 0; - SRequestObj *pRequest = NULL; + SRequestObj * pRequest = NULL; SCatalogReq catalogReq = {0}; if (NULL == tableNameList) {