From 8fc86a31d0011761ddc7e365bda36ab62e2d63a3 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Thu, 4 Aug 2022 20:53:43 +0800 Subject: [PATCH] enh: asynchronous catalog interface optimization --- include/libs/catalog/catalog.h | 9 +- source/libs/parser/inc/parUtil.h | 7 +- source/libs/parser/src/parUtil.c | 102 +++++++++++++++--- source/libs/parser/src/parser.c | 8 +- .../libs/parser/test/mockCatalogService.cpp | 43 +++++--- source/libs/parser/test/mockCatalogService.h | 1 + source/libs/parser/test/parInsertTest.cpp | 13 ++- source/libs/parser/test/parTestUtil.cpp | 11 +- 8 files changed, 151 insertions(+), 43 deletions(-) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 1d1849f24a..ee566d759a 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -58,12 +58,17 @@ typedef struct SDbInfo { int64_t dbId; } SDbInfo; +typedef struct STablesReq { + char dbFName[TSDB_DB_FNAME_LEN]; + SArray* pTables; +} STablesReq; + typedef struct SCatalogReq { SArray* pDbVgroup; // element is db full name SArray* pDbCfg; // element is db full name SArray* pDbInfo; // element is db full name - SArray* pTableMeta; // element is SNAME - SArray* pTableHash; // element is SNAME + SArray* pTableMeta; // element is STablesReq + SArray* pTableHash; // element is STablesReq SArray* pUdf; // element is udf name SArray* pIndex; // element is index name SArray* pUser; // element is SUserAuthInfo diff --git a/source/libs/parser/inc/parUtil.h b/source/libs/parser/inc/parUtil.h index 896e2bc239..24f34637bf 100644 --- a/source/libs/parser/inc/parUtil.h +++ b/source/libs/parser/inc/parUtil.h @@ -38,6 +38,11 @@ typedef struct SMsgBuf { char* buf; } SMsgBuf; +typedef struct SParseTablesMetaReq { + char dbFName[TSDB_DB_FNAME_LEN]; + SHashObj* pTables; +} SParseTablesMetaReq; + typedef struct SParseMetaCache { SHashObj* pTableMeta; // key is tbFName, element is STableMeta* SHashObj* pDbVgroup; // key is dbFName, element is SArray* @@ -94,7 +99,7 @@ int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFun int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName, SArray** pIndexes); int32_t getTableCfgFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableCfg** pOutput); int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes); -void destoryParseMetaCache(SParseMetaCache* pMetaCache); +void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request); #ifdef __cplusplus } diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index e51800aece..ae5a281aab 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -474,6 +474,24 @@ static int32_t buildDbReq(SHashObj* pDbsHash, SArray** pDbs) { return TSDB_CODE_SUCCESS; } +static int32_t buildTableReqFromDb(SHashObj* pDbsHash, SArray** pDbs) { + if (NULL != pDbsHash) { + *pDbs = taosArrayInit(taosHashGetSize(pDbsHash), sizeof(STablesReq)); + if (NULL == *pDbs) { + return TSDB_CODE_OUT_OF_MEMORY; + } + SParseTablesMetaReq* p = taosHashIterate(pDbsHash, NULL); + while (NULL != p) { + STablesReq req = {0}; + strcpy(req.dbFName, p->dbFName); + buildTableReq(p->pTables, &req.pTables); + taosArrayPush(*pDbs, &req); + p = taosHashIterate(pDbsHash, p); + } + } + return TSDB_CODE_SUCCESS; +} + static int32_t buildUserAuthReq(SHashObj* pUserAuthHash, SArray** pUserAuth) { if (NULL != pUserAuthHash) { *pUserAuth = taosArrayInit(taosHashGetSize(pUserAuthHash), sizeof(SUserAuthInfo)); @@ -513,12 +531,12 @@ static int32_t buildUdfReq(SHashObj* pUdfHash, SArray** pUdf) { } int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) { - int32_t code = buildTableReq(pMetaCache->pTableMeta, &pCatalogReq->pTableMeta); + int32_t code = buildTableReqFromDb(pMetaCache->pTableMeta, &pCatalogReq->pTableMeta); if (TSDB_CODE_SUCCESS == code) { code = buildDbReq(pMetaCache->pDbVgroup, &pCatalogReq->pDbVgroup); } if (TSDB_CODE_SUCCESS == code) { - code = buildTableReq(pMetaCache->pTableVgroup, &pCatalogReq->pTableHash); + code = buildTableReqFromDb(pMetaCache->pTableVgroup, &pCatalogReq->pTableHash); } if (TSDB_CODE_SUCCESS == code) { code = buildDbReq(pMetaCache->pDbCfg, &pCatalogReq->pDbCfg); @@ -587,6 +605,24 @@ static int32_t putDbDataToCache(const SArray* pDbReq, const SArray* pDbData, SHa return TSDB_CODE_SUCCESS; } +static int32_t putDbTableDataToCache(const SArray* pDbReq, const SArray* pTableData, SHashObj** pTable) { + int32_t ndbs = taosArrayGetSize(pDbReq); + int32_t tableNo = 0; + for (int32_t i = 0; i < ndbs; ++i) { + STablesReq* pReq = taosArrayGet(pDbReq, i); + int32_t ntables = taosArrayGetSize(pReq->pTables); + for (int32_t j = 0; j < ntables; ++j) { + char fullName[TSDB_TABLE_FNAME_LEN]; + tNameExtractFullName(taosArrayGet(pReq->pTables, j), fullName); + if (TSDB_CODE_SUCCESS != putMetaDataToHash(fullName, strlen(fullName), pTableData, tableNo, pTable)) { + return TSDB_CODE_OUT_OF_MEMORY; + } + ++tableNo; + } + } + return TSDB_CODE_SUCCESS; +} + static int32_t putUserAuthToCache(const SArray* pUserAuthReq, const SArray* pUserAuthData, SHashObj** pUserAuth) { int32_t nvgs = taosArrayGetSize(pUserAuthReq); for (int32_t i = 0; i < nvgs; ++i) { @@ -612,12 +648,12 @@ static int32_t putUdfToCache(const SArray* pUdfReq, const SArray* pUdfData, SHas } int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache) { - int32_t code = putTableDataToCache(pCatalogReq->pTableMeta, pMetaData->pTableMeta, &pMetaCache->pTableMeta); + int32_t code = putDbTableDataToCache(pCatalogReq->pTableMeta, pMetaData->pTableMeta, &pMetaCache->pTableMeta); if (TSDB_CODE_SUCCESS == code) { code = putDbDataToCache(pCatalogReq->pDbVgroup, pMetaData->pDbVgroup, &pMetaCache->pDbVgroup); } if (TSDB_CODE_SUCCESS == code) { - code = putTableDataToCache(pCatalogReq->pTableHash, pMetaData->pTableHash, &pMetaCache->pTableVgroup); + code = putDbTableDataToCache(pCatalogReq->pTableHash, pMetaData->pTableHash, &pMetaCache->pTableVgroup); } if (TSDB_CODE_SUCCESS == code) { code = putDbDataToCache(pCatalogReq->pDbCfg, pMetaData->pDbCfg, &pMetaCache->pDbCfg); @@ -657,14 +693,38 @@ static int32_t reserveTableReqInCache(int32_t acctId, const char* pDb, const cha return reserveTableReqInCacheImpl(fullName, len, pTables); } +static int32_t reserveTableReqInDbCacheImpl(int32_t acctId, const char* pDb, const char* pTable, SHashObj* pDbs) { + SParseTablesMetaReq req = {0}; + int32_t len = snprintf(req.dbFName, sizeof(req.dbFName), "%d.%s", acctId, pDb); + int32_t code = reserveTableReqInCache(acctId, pDb, pTable, &req.pTables); + if (TSDB_CODE_SUCCESS == code) { + code = taosHashPut(pDbs, req.dbFName, len, &req, sizeof(SParseTablesMetaReq)); + } + return code; +} + +static int32_t reserveTableReqInDbCache(int32_t acctId, const char* pDb, const char* pTable, SHashObj** pDbs) { + if (NULL == *pDbs) { + *pDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + if (NULL == *pDbs) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + char fullName[TSDB_DB_FNAME_LEN]; + int32_t len = snprintf(fullName, sizeof(fullName), "%d.%s", acctId, pDb); + SParseTablesMetaReq* pReq = taosHashGet(*pDbs, fullName, len); + if (NULL == pReq) { + return reserveTableReqInDbCacheImpl(acctId, pDb, pTable, *pDbs); + } + return reserveTableReqInCache(acctId, pDb, pTable, &pReq->pTables); +} + int32_t reserveTableMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache) { - return reserveTableReqInCache(acctId, pDb, pTable, &pMetaCache->pTableMeta); + return reserveTableReqInDbCache(acctId, pDb, pTable, &pMetaCache->pTableMeta); } int32_t reserveTableMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCache) { - char fullName[TSDB_TABLE_FNAME_LEN]; - tNameExtractFullName(pName, fullName); - return reserveTableReqInCacheImpl(fullName, strlen(fullName), &pMetaCache->pTableMeta); + return reserveTableReqInDbCache(pName->acctId, pName->dbname, pName->tname, &pMetaCache->pTableMeta); } int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta) { @@ -711,13 +771,11 @@ int32_t getDbVgInfoFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, } int32_t reserveTableVgroupInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache) { - return reserveTableReqInCache(acctId, pDb, pTable, &pMetaCache->pTableVgroup); + return reserveTableReqInDbCache(acctId, pDb, pTable, &pMetaCache->pTableVgroup); } int32_t reserveTableVgroupInCacheExt(const SName* pName, SParseMetaCache* pMetaCache) { - char fullName[TSDB_TABLE_FNAME_LEN]; - tNameExtractFullName(pName, fullName); - return reserveTableReqInCacheImpl(fullName, strlen(fullName), &pMetaCache->pTableVgroup); + return reserveTableReqInDbCache(pName->acctId, pName->dbname, pName->tname, &pMetaCache->pTableVgroup); } int32_t getTableVgroupFromCache(SParseMetaCache* pMetaCache, const SName* pName, SVgroupInfo* pVgroup) { @@ -919,10 +977,24 @@ int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes) { return TSDB_CODE_SUCCESS; } -void destoryParseMetaCache(SParseMetaCache* pMetaCache) { - taosHashCleanup(pMetaCache->pTableMeta); +void destoryParseTablesMetaReqHash(SHashObj* pHash) { + SParseTablesMetaReq* p = taosHashIterate(pHash, NULL); + while (NULL != p) { + taosHashCleanup(p->pTables); + p = taosHashIterate(pHash, p); + } + taosHashCleanup(pHash); +} + +void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) { + if (request) { + destoryParseTablesMetaReqHash(pMetaCache->pTableMeta); + destoryParseTablesMetaReqHash(pMetaCache->pTableVgroup); + } else { + taosHashCleanup(pMetaCache->pTableMeta); + taosHashCleanup(pMetaCache->pTableVgroup); + } taosHashCleanup(pMetaCache->pDbVgroup); - taosHashCleanup(pMetaCache->pTableVgroup); taosHashCleanup(pMetaCache->pDbCfg); taosHashCleanup(pMetaCache->pDbInfo); taosHashCleanup(pMetaCache->pUserAuth); diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 53ee717af2..34cd783ace 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -85,13 +85,13 @@ static int32_t setValueByBindParam(SValueNode* pVal, TAOS_MULTI_BIND* pParam) { if (IS_VAR_DATA_TYPE(pVal->node.resType.type)) { taosMemoryFreeClear(pVal->datum.p); } - + if (pParam->is_null && 1 == *(pParam->is_null)) { pVal->node.resType.type = TSDB_DATA_TYPE_NULL; pVal->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_NULL].bytes; return TSDB_CODE_SUCCESS; } - + int32_t inputSize = (NULL != pParam->length ? *(pParam->length) : tDataTypes[pParam->buffer_type].bytes); pVal->node.resType.type = pParam->buffer_type; pVal->node.resType.bytes = inputSize; @@ -187,7 +187,7 @@ int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq if (TSDB_CODE_SUCCESS == code) { code = buildCatalogReq(&metaCache, pCatalogReq); } - destoryParseMetaCache(&metaCache); + destoryParseMetaCache(&metaCache, true); terrno = code; return code; } @@ -203,7 +203,7 @@ int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCata code = analyseSemantic(pCxt, pQuery, &metaCache); } } - destoryParseMetaCache(&metaCache); + destoryParseMetaCache(&metaCache, false); terrno = code; return code; } diff --git a/source/libs/parser/test/mockCatalogService.cpp b/source/libs/parser/test/mockCatalogService.cpp index 4158453110..6717a1fdf1 100644 --- a/source/libs/parser/test/mockCatalogService.cpp +++ b/source/libs/parser/test/mockCatalogService.cpp @@ -472,12 +472,16 @@ class MockCatalogServiceImpl { int32_t getAllTableMeta(SArray* pTableMetaReq, SArray** pTableMetaData) const { if (NULL != pTableMetaReq) { - int32_t ntables = taosArrayGetSize(pTableMetaReq); - *pTableMetaData = taosArrayInit(ntables, sizeof(SMetaRes)); - for (int32_t i = 0; i < ntables; ++i) { - SMetaRes res = {0}; - res.code = catalogGetTableMeta((const SName*)taosArrayGet(pTableMetaReq, i), (STableMeta**)&res.pRes); - taosArrayPush(*pTableMetaData, &res); + int32_t ndbs = taosArrayGetSize(pTableMetaReq); + *pTableMetaData = taosArrayInit(ndbs, sizeof(SMetaRes)); + for (int32_t i = 0; i < ndbs; ++i) { + STablesReq* pReq = (STablesReq*)taosArrayGet(pTableMetaReq, i); + int32_t ntables = taosArrayGetSize(pReq->pTables); + for (int32_t j = 0; j < ntables; ++j) { + SMetaRes res = {0}; + res.code = catalogGetTableMeta((const SName*)taosArrayGet(pReq->pTables, j), (STableMeta**)&res.pRes); + taosArrayPush(*pTableMetaData, &res); + } } } return TSDB_CODE_SUCCESS; @@ -485,13 +489,17 @@ class MockCatalogServiceImpl { int32_t getAllTableVgroup(SArray* pTableVgroupReq, SArray** pTableVgroupData) const { if (NULL != pTableVgroupReq) { - int32_t ntables = taosArrayGetSize(pTableVgroupReq); - *pTableVgroupData = taosArrayInit(ntables, sizeof(SMetaRes)); - for (int32_t i = 0; i < ntables; ++i) { - SMetaRes res = {0}; - res.pRes = taosMemoryCalloc(1, sizeof(SVgroupInfo)); - res.code = catalogGetTableHashVgroup((const SName*)taosArrayGet(pTableVgroupReq, i), (SVgroupInfo*)res.pRes); - taosArrayPush(*pTableVgroupData, &res); + int32_t ndbs = taosArrayGetSize(pTableVgroupReq); + *pTableVgroupData = taosArrayInit(ndbs, sizeof(SMetaRes)); + for (int32_t i = 0; i < ndbs; ++i) { + STablesReq* pReq = (STablesReq*)taosArrayGet(pTableVgroupReq, i); + int32_t ntables = taosArrayGetSize(pReq->pTables); + for (int32_t j = 0; j < ntables; ++j) { + SMetaRes res = {0}; + res.pRes = taosMemoryCalloc(1, sizeof(SVgroupInfo)); + res.code = catalogGetTableHashVgroup((const SName*)taosArrayGet(pReq->pTables, j), (SVgroupInfo*)res.pRes); + taosArrayPush(*pTableVgroupData, &res); + } } } return TSDB_CODE_SUCCESS; @@ -677,12 +685,17 @@ int32_t MockCatalogService::catalogGetAllMeta(const SCatalogReq* pCatalogReq, SM return impl_->catalogGetAllMeta(pCatalogReq, pMetaData); } +void MockCatalogService::destoryTablesReq(void* p) { + STablesReq* pRes = (STablesReq*)p; + taosArrayDestroy(pRes->pTables); +} + void MockCatalogService::destoryCatalogReq(SCatalogReq* pReq) { taosArrayDestroy(pReq->pDbVgroup); taosArrayDestroy(pReq->pDbCfg); taosArrayDestroy(pReq->pDbInfo); - taosArrayDestroy(pReq->pTableMeta); - taosArrayDestroy(pReq->pTableHash); + taosArrayDestroyEx(pReq->pTableMeta, destoryTablesReq); + taosArrayDestroyEx(pReq->pTableHash, destoryTablesReq); taosArrayDestroy(pReq->pUdf); taosArrayDestroy(pReq->pIndex); taosArrayDestroy(pReq->pUser); diff --git a/source/libs/parser/test/mockCatalogService.h b/source/libs/parser/test/mockCatalogService.h index d76a6abca8..c0330692ee 100644 --- a/source/libs/parser/test/mockCatalogService.h +++ b/source/libs/parser/test/mockCatalogService.h @@ -50,6 +50,7 @@ struct MockTableMeta { class MockCatalogServiceImpl; class MockCatalogService { public: + static void destoryTablesReq(void* p); static void destoryCatalogReq(SCatalogReq* pReq); static void destoryMetaRes(void* p); static void destoryMetaArrayRes(void* p); diff --git a/source/libs/parser/test/parInsertTest.cpp b/source/libs/parser/test/parInsertTest.cpp index 22a1be2579..6014a97efa 100644 --- a/source/libs/parser/test/parInsertTest.cpp +++ b/source/libs/parser/test/parInsertTest.cpp @@ -13,6 +13,8 @@ * along with this program. If not, see . */ +#include + #include #include "mockCatalogService.h" @@ -20,6 +22,7 @@ #include "parInt.h" using namespace std; +using namespace std::placeholders; using namespace testing; namespace { @@ -63,7 +66,9 @@ class InsertTest : public Test { int32_t runAsync() { cxt_.async = true; - unique_ptr metaCache(new SParseMetaCache(), _destoryParseMetaCache); + bool request = true; + unique_ptr > metaCache( + new SParseMetaCache(), std::bind(_destoryParseMetaCache, _1, cref(request))); code_ = parseInsertSyntax(&cxt_, &res_, metaCache.get()); if (code_ != TSDB_CODE_SUCCESS) { cout << "parseInsertSyntax code:" << toString(code_) << ", msg:" << errMagBuf_ << endl; @@ -81,6 +86,8 @@ class InsertTest : public Test { unique_ptr metaData(new SMetaData(), MockCatalogService::destoryMetaData); g_mockCatalogService->catalogGetAllMeta(catalogReq.get(), metaData.get()); + metaCache.reset(new SParseMetaCache()); + request = false; code_ = putMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get()); if (code_ != TSDB_CODE_SUCCESS) { cout << "putMetaDataToCache code:" << toString(code_) << ", msg:" << errMagBuf_ << endl; @@ -144,8 +151,8 @@ class InsertTest : public Test { static const int max_err_len = 1024; static const int max_sql_len = 1024 * 1024; - static void _destoryParseMetaCache(SParseMetaCache* pMetaCache) { - destoryParseMetaCache(pMetaCache); + static void _destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) { + destoryParseMetaCache(pMetaCache, request); delete pMetaCache; } diff --git a/source/libs/parser/test/parTestUtil.cpp b/source/libs/parser/test/parTestUtil.cpp index 235cc487fb..3fe4b533e4 100644 --- a/source/libs/parser/test/parTestUtil.cpp +++ b/source/libs/parser/test/parTestUtil.cpp @@ -24,6 +24,7 @@ #include "parInt.h" using namespace std; +using namespace std::placeholders; using namespace testing; namespace ParserTest { @@ -118,8 +119,8 @@ class ParserTestBaseImpl { TEST_INTERFACE_ASYNC_API }; - static void _destoryParseMetaCache(SParseMetaCache* pMetaCache) { - destoryParseMetaCache(pMetaCache); + static void _destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) { + destoryParseMetaCache(pMetaCache, request); delete pMetaCache; } @@ -340,7 +341,9 @@ class ParserTestBaseImpl { doParse(&cxt, query.get()); SQuery* pQuery = *(query.get()); - unique_ptr metaCache(new SParseMetaCache(), _destoryParseMetaCache); + bool request = true; + unique_ptr > metaCache( + new SParseMetaCache(), bind(_destoryParseMetaCache, _1, cref(request))); doCollectMetaKey(&cxt, pQuery, metaCache.get()); unique_ptr catalogReq(new SCatalogReq(), @@ -353,6 +356,8 @@ class ParserTestBaseImpl { unique_ptr metaData(new SMetaData(), MockCatalogService::destoryMetaData); doGetAllMeta(catalogReq.get(), metaData.get()); + metaCache.reset(new SParseMetaCache()); + request = false; doPutMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get()); doAuthenticate(&cxt, pQuery, metaCache.get()); -- GitLab