From 95eb0c9f653556dc2998cf64ab8ff475cbbede3c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 11 May 2022 15:47:05 +0800 Subject: [PATCH] refactor(query): do some internal refactor. --- include/libs/catalog/catalog.h | 18 ++++++++++++++++++ source/client/inc/clientInt.h | 4 +++- source/client/src/clientMain.c | 4 +++- source/client/src/clientMsgHandler.c | 8 ++++---- source/libs/catalog/src/catalog.c | 6 +++++- 5 files changed, 33 insertions(+), 7 deletions(-) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 04a24c4f32..8bf01f7903 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -122,6 +122,24 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers */ int32_t catalogGetDBVgInfo(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, SArray** pVgroupList); +typedef void *__async_cb_fn_t(void* pResult, void* param, int32_t code); + +typedef struct { + SCatalog* pCatalog; + void* pTransporter; + SEpSet* pMgmtEps; + char* pDbname; +} CatalogParamWrapper; + +/** + * + * @param pCatalogWrapper + * @param fp + * @param param + * @return + */ +int32_t catalogGetDBVgInfo_a(CatalogParamWrapper* pCatalogWrapper, __async_cb_fn_t fp, void* param); + int32_t catalogUpdateDBVgInfo(SCatalog* pCatalog, const char* dbName, uint64_t dbId, SDBVgInfo* dbInfo); int32_t catalogRemoveDB(SCatalog* pCatalog, const char* dbName, uint64_t dbId); diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index b021651c16..bdbd3e5cbd 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -121,7 +121,7 @@ struct SAppInstInfo { SCorEpSet mgmtEp; SInstanceSummary summary; SList* pConnList; // STscObj linked list - int64_t clusterId; + uint64_t clusterId; void* pTransporter; SAppHbMgr* pAppHbMgr; }; @@ -282,6 +282,8 @@ void initMsgHandleFp(); TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, uint16_t port, int connType); +SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen); + int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb); int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index e2d9b30d93..f5f2e5dd34 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -572,8 +572,10 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param int32_t retryNum = 0; int32_t code = 0; + size_t sqlLen = strlen(sql); + while (retryNum++ < REQUEST_MAX_TRY_TIMES) { -// pRequest = launchQuery(pTscObj, sql, sqlLen); + pRequest = launchQuery(taos, sql, sqlLen); if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) { break; } diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 8096ce395a..563438ea7f 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -125,10 +125,10 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { struct SCatalog* pCatalog = NULL; if (usedbRsp.vgVersion >= 0) { - int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); + uint64_t clusterId = pRequest->pTscObj->pAppInfo->clusterId; + int32_t code1 = catalogGetHandle(clusterId, &pCatalog); if (code1 != TSDB_CODE_SUCCESS) { - tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId, - tstrerror(code1)); + tscWarn("0x%" PRIx64 "catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->requestId, clusterId, tstrerror(code1)); } else { catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid); } @@ -158,7 +158,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash); taosMemoryFreeClear(output.dbVgroup); - tscError("failed to build use db output since %s", terrstr()); + tscError("0x%" PRIx64" failed to build use db output since %s", pRequest->requestId, terrstr()); } else if (output.dbVgroup) { struct SCatalog* pCatalog = NULL; diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index f485f85809..eba19fdf20 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -563,6 +563,10 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmt return TSDB_CODE_SUCCESS; } +//typedef void __taos_async_internal_fn_t(void* param, ) +void ctgGetDBVgInfoFromMnode_a(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SBuildUseDBInput *input, __taos_async_fn_t fp) { + +} int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SBuildUseDBInput *input, SUseDbOutput *out) { char *msg = NULL; @@ -1768,7 +1772,7 @@ int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst) { return TSDB_CODE_SUCCESS; } - +void ctgGetDBVgInfo_a(); int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SCtgDBCache** dbCache, SDBVgInfo **pInfo) { bool inCache = false; -- GitLab