From 42d6ab3b00782d2896ed1f9c9b4746cab1c1fd6a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 12 Jul 2022 20:00:26 +0800 Subject: [PATCH] refactor client queue --- source/client/inc/clientInt.h | 10 ++++----- source/client/src/clientEnv.c | 24 ++++++--------------- source/client/src/clientImpl.c | 11 ++++------ source/client/src/clientMain.c | 36 ++++++++++++++++---------------- source/libs/qcom/src/queryUtil.c | 12 +++++------ 5 files changed, 38 insertions(+), 55 deletions(-) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 91f21f6e6a..d6f5001571 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -222,8 +222,8 @@ typedef struct SRequestObj { SArray* tableList; SQueryExecMetric metric; SRequestSendRecvBody body; - bool stableQuery; // todo refactor - bool validateOnly; // todo refactor + bool stableQuery; // todo refactor + bool validateOnly; // todo refactor bool killed; uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog @@ -324,7 +324,8 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList); -int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql, SRequestObj** pRequest); +int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql, + SRequestObj** pRequest); void taos_close_internal(void* taos); @@ -358,9 +359,6 @@ int32_t removeMeta(STscObj* pTscObj, SArray* tbList); // todo move to clie int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog); // todo move to xxx bool qnodeRequired(SRequestObj* pRequest); -void initTscQhandle(); -void cleanupTscQhandle(); - #ifdef __cplusplus } #endif diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index ba92ed238b..a65b803dbc 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -35,22 +35,10 @@ SAppInfo appInfo; int32_t clientReqRefPool = -1; int32_t clientConnRefPool = -1; -void *tscQhandle = NULL; - static TdThreadOnce tscinit = PTHREAD_ONCE_INIT; volatile int32_t tscInitRes = 0; -void initTscQhandle() { - // init handle - tscQhandle = taosInitScheduler(4096, 5, "tsc"); -} - -void cleanupTscQhandle() { - // destroy handle - taosCleanUpScheduler(tscQhandle); -} - -static int32_t registerRequest(SRequestObj *pRequest, STscObj* pTscObj) { +static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) { // connection has been released already, abort creating request. pRequest->self = taosAddRef(clientReqRefPool, pRequest); @@ -72,7 +60,7 @@ static int32_t registerRequest(SRequestObj *pRequest, STscObj* pTscObj) { static void deregisterRequest(SRequestObj *pRequest) { assert(pRequest != NULL); - STscObj * pTscObj = pRequest->pTscObj; + STscObj *pTscObj = pRequest->pTscObj; SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary; int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1); @@ -97,7 +85,8 @@ void closeTransporter(SAppInstInfo *pAppInfo) { static bool clientRpcRfp(int32_t code, tmsg_t msgType) { if (NEED_REDIRECT_ERROR(code)) { - if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) { + if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || + msgType == TDMT_SCH_MERGE_FETCH) { return false; } return true; @@ -248,7 +237,7 @@ void *createRequest(uint64_t connId, int32_t type) { return NULL; } - STscObj* pTscObj = acquireTscObj(connId); + STscObj *pTscObj = acquireTscObj(connId); if (pTscObj == NULL) { terrno = TSDB_CODE_TSC_DISCONNECTED; return NULL; @@ -345,7 +334,6 @@ void taos_init_imp(void) { // In the APIs of other program language, taos_cleanup is not available yet. // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning. atexit(taos_cleanup); - initTscQhandle(); errno = TSDB_CODE_SUCCESS; taosSeedRand(taosGetTimestampSec()); @@ -404,7 +392,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) { return 0; } - SConfig * pCfg = taosGetCfg(); + SConfig *pCfg = taosGetCfg(); SConfigItem *pItem = NULL; switch (option) { diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 1bb179b962..c844f8a966 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1274,8 +1274,8 @@ typedef struct SchedArg { SEpSet* pEpset; } SchedArg; -void doProcessMsgFromServer(SSchedMsg* schedMsg) { - SchedArg* arg = (SchedArg*)schedMsg->ahandle; +int32_t doProcessMsgFromServer(void* param) { + SchedArg* arg = (SchedArg*)param; SRpcMsg* pMsg = &arg->msg; SEpSet* pEpSet = arg->pEpset; @@ -1328,11 +1328,10 @@ void doProcessMsgFromServer(SSchedMsg* schedMsg) { rpcFreeCont(pMsg->pCont); destroySendMsgInfo(pSendInfo); taosMemoryFree(arg); + return TSDB_CODE_SUCCESS; } void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { - SSchedMsg schedMsg = {0}; - SEpSet* tEpSet = NULL; if (pEpSet != NULL) { tEpSet = taosMemoryCalloc(1, sizeof(SEpSet)); @@ -1343,9 +1342,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { arg->msg = *pMsg; arg->pEpset = tEpSet; - schedMsg.fp = doProcessMsgFromServer; - schedMsg.ahandle = arg; - taosScheduleTask(tscQhandle, &schedMsg); + taosAsyncExec(doProcessMsgFromServer, arg, NULL); } TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) { diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 12de522cbc..0b890b6749 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -72,7 +72,6 @@ void taos_cleanup(void) { catalogDestroy(); schedulerDestroy(); - cleanupTscQhandle(); rpcCleanup(); tscInfo("all local resources released"); taosCleanupCfg(); @@ -242,7 +241,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { #endif } else if (TD_RES_TMQ(res)) { - SMqRspObj * msg = ((SMqRspObj *)res); + SMqRspObj *msg = ((SMqRspObj *)res); SReqResultInfo *pResultInfo; if (msg->resIter == -1) { pResultInfo = tmqGetNextResInfo(res, true); @@ -418,7 +417,7 @@ int taos_affected_rows(TAOS_RES *res) { return 0; } - SRequestObj * pRequest = (SRequestObj *)res; + SRequestObj *pRequest = (SRequestObj *)res; SReqResultInfo *pResInfo = &pRequest->body.resInfo; return pResInfo->numOfRows; } @@ -601,7 +600,7 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { } SReqResultInfo *pResInfo = tscGetCurResInfo(res); - TAOS_FIELD * pField = &pResInfo->userFields[columnIndex]; + TAOS_FIELD *pField = &pResInfo->userFields[columnIndex]; if (!IS_VAR_DATA_TYPE(pField->type)) { return 0; } @@ -645,8 +644,8 @@ const char *taos_get_server_info(TAOS *taos) { typedef struct SqlParseWrapper { SParseContext *pCtx; SCatalogReq catalogReq; - SRequestObj * pRequest; - SQuery * pQuery; + SRequestObj *pRequest; + SQuery *pQuery; } SqlParseWrapper; static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) { @@ -665,8 +664,8 @@ static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) { void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { SqlParseWrapper *pWrapper = (SqlParseWrapper *)param; - SQuery * pQuery = pWrapper->pQuery; - SRequestObj * pRequest = pWrapper->pRequest; + SQuery *pQuery = pWrapper->pQuery; + SRequestObj *pRequest = pWrapper->pRequest; if (code == TSDB_CODE_SUCCESS) { code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery); @@ -684,7 +683,8 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { destorySqlParseWrapper(pWrapper); - tscDebug("0x%"PRIx64" analysis semantics completed, start async query, reqId:0x%"PRIx64, pRequest->self, pRequest->requestId); + tscDebug("0x%" PRIx64 " analysis semantics completed, start async query, reqId:0x%" PRIx64, pRequest->self, + pRequest->requestId); launchAsyncQuery(pRequest, pQuery, pResultMeta); } else { destorySqlParseWrapper(pWrapper); @@ -705,7 +705,7 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { } void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) { - int64_t connId = *(int64_t*)taos; + int64_t connId = *(int64_t *)taos; taosAsyncQueryImpl(connId, sql, fp, param, false); } @@ -739,7 +739,7 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) { void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { SParseContext *pCxt = NULL; - STscObj * pTscObj = pRequest->pTscObj; + STscObj *pTscObj = pRequest->pTscObj; int32_t code = 0; if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) { @@ -865,9 +865,9 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { } SSchedulerReq req = { - .syncReq = false, - .fetchFp = fetchCallback, - .cbParam = pRequest, + .syncReq = false, + .fetchFp = fetchCallback, + .cbParam = pRequest, }; schedulerFetchRows(pRequest->body.queryJob, &req); } @@ -876,7 +876,7 @@ void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { ASSERT(res != NULL && fp != NULL); ASSERT(TD_RES_QUERY(res)); - SRequestObj *pRequest = res; + SRequestObj *pRequest = res; SReqResultInfo *pResultInfo = &pRequest->body.resInfo; // set the current block is all consumed @@ -918,7 +918,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { int64_t connId = *(int64_t *)taos; const int32_t MAX_TABLE_NAME_LENGTH = 12 * 1024 * 1024; // 12MB list int32_t code = 0; - SRequestObj * pRequest = NULL; + SRequestObj *pRequest = NULL; SCatalogReq catalogReq = {0}; if (NULL == tableNameList) { @@ -940,7 +940,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { goto _return; } - STscObj* pTscObj = pRequest->pTscObj; + STscObj *pTscObj = pRequest->pTscObj; code = transferTableNameList(tableNameList, pTscObj->acctId, pTscObj->db, &catalogReq.pTableMeta); if (code) { goto _return; @@ -962,7 +962,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { goto _return; } - SSyncQueryParam* pParam = pRequest->body.param; + SSyncQueryParam *pParam = pRequest->body.param; tsem_wait(&pParam->sem); _return: diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index b4e217ef74..eeb44c4f82 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -282,7 +282,7 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t } *str = '"'; - int32_t length = taosUcs4ToMbs((TdUcs4 *)buf, bufSize, str + 1); + int32_t length = taosUcs4ToMbs((TdUcs4*)buf, bufSize, str + 1); if (length <= 0) { return TSDB_CODE_TSC_INVALID_VALUE; } @@ -310,15 +310,15 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t return TSDB_CODE_TSC_INVALID_VALUE; } - if(len) *len = n; + if (len) *len = n; return TSDB_CODE_SUCCESS; } char* parseTagDatatoJson(void* p) { - char* string = NULL; + char* string = NULL; SArray* pTagVals = NULL; - cJSON* json = NULL; + cJSON* json = NULL; if (tTagToValArray((const STag*)p, &pTagVals) != 0) { goto end; } @@ -327,7 +327,7 @@ char* parseTagDatatoJson(void* p) { if (nCols == 0) { goto end; } - char tagJsonKey[256] = {0}; + char tagJsonKey[256] = {0}; json = cJSON_CreateObject(); if (json == NULL) { goto end; @@ -390,7 +390,7 @@ char* parseTagDatatoJson(void* p) { end: cJSON_Delete(json); taosArrayDestroy(pTagVals); - if(string == NULL){ + if (string == NULL) { string = strdup(TSDB_DATA_NULL_STR_L); } return string; -- GitLab