From fb035bb3316405d0e9bf4e6e30f6ca3b82c49b06 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 14 Dec 2021 17:13:31 +0800 Subject: [PATCH] add query lib --- include/libs/catalog/catalog.h | 2 +- .../{common/tmessage.h => libs/query/query.h} | 10 ++-- source/common/inc/commonInt.h | 10 ---- source/libs/CMakeLists.txt | 3 +- source/libs/catalog/CMakeLists.txt | 4 +- source/libs/catalog/src/catalog.c | 57 +++++++++++++++++-- source/libs/query/CMakeLists.txt | 12 ++++ source/libs/query/inc/queryInt.h | 40 +++++++++++++ .../tmessage.c => libs/query/src/querymsg.c} | 25 ++++---- 9 files changed, 125 insertions(+), 38 deletions(-) rename include/{common/tmessage.h => libs/query/query.h} (69%) create mode 100644 source/libs/query/CMakeLists.txt create mode 100644 source/libs/query/inc/queryInt.h rename source/{common/src/tmessage.c => libs/query/src/querymsg.c} (86%) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 5019191fd1..0b45f71557 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -108,7 +108,7 @@ int32_t catalogUpdateVgroup(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup) int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version); -int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo); +int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo** dbInfo); int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); diff --git a/include/common/tmessage.h b/include/libs/query/query.h similarity index 69% rename from include/common/tmessage.h rename to include/libs/query/query.h index c728ee026e..866a69ed8e 100644 --- a/include/common/tmessage.h +++ b/include/libs/query/query.h @@ -13,20 +13,20 @@ * along with this program. If not, see . */ -#ifndef _TD_COMMON_TMESSAGE_H_ -#define _TD_COMMON_TMESSAGE_H_ +#ifndef _TD_QUERY_H_ +#define _TD_QUERY_H_ #ifdef __cplusplus extern "C" { #endif -extern int32_t (*tscBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen); -extern int32_t (*tscProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize); +extern int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen); +extern int32_t (*queryProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize); #ifdef __cplusplus } #endif -#endif /*_TD_COMMON_TMESSAGE_H_*/ +#endif /*_TD_QUERY_H_*/ diff --git a/source/common/inc/commonInt.h b/source/common/inc/commonInt.h index 448600e22e..e7d2dba95c 100644 --- a/source/common/inc/commonInt.h +++ b/source/common/inc/commonInt.h @@ -21,17 +21,7 @@ extern "C" { #endif -#include "tlog.h" -extern int32_t cDebugFlag; - -#define tscFatal(...) do { if (cDebugFlag & DEBUG_FATAL) { taosPrintLog("TSC FATAL ", cDebugFlag, __VA_ARGS__); }} while(0) -#define tscError(...) do { if (cDebugFlag & DEBUG_ERROR) { taosPrintLog("TSC ERROR ", cDebugFlag, __VA_ARGS__); }} while(0) -#define tscWarn(...) do { if (cDebugFlag & DEBUG_WARN) { taosPrintLog("TSC WARN ", cDebugFlag, __VA_ARGS__); }} while(0) -#define tscInfo(...) do { if (cDebugFlag & DEBUG_INFO) { taosPrintLog("TSC ", cDebugFlag, __VA_ARGS__); }} while(0) -#define tscDebug(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSC ", cDebugFlag, __VA_ARGS__); }} while(0) -#define tscTrace(...) do { if (cDebugFlag & DEBUG_TRACE) { taosPrintLog("TSC ", cDebugFlag, __VA_ARGS__); }} while(0) -#define tscDebugL(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLongString("TSC ", cDebugFlag, __VA_ARGS__); }} while(0) #ifdef __cplusplus } diff --git a/source/libs/CMakeLists.txt b/source/libs/CMakeLists.txt index 007bb1e967..57a5023807 100644 --- a/source/libs/CMakeLists.txt +++ b/source/libs/CMakeLists.txt @@ -9,4 +9,5 @@ add_subdirectory(cache) add_subdirectory(catalog) add_subdirectory(executor) add_subdirectory(planner) -add_subdirectory(function) \ No newline at end of file +add_subdirectory(function) +add_subdirectory(query) diff --git a/source/libs/catalog/CMakeLists.txt b/source/libs/catalog/CMakeLists.txt index ff3e62700a..25c80d502a 100644 --- a/source/libs/catalog/CMakeLists.txt +++ b/source/libs/catalog/CMakeLists.txt @@ -8,5 +8,5 @@ target_include_directories( target_link_libraries( catalog - PRIVATE os util common transport -) \ No newline at end of file + PRIVATE os util common transport query +) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index f4a4b01dab..b488ab8101 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -15,7 +15,7 @@ #include "catalogInt.h" #include "trpc.h" -#include "tmessage.h" +#include "query.h" SCatalogMgmt ctgMgmt = {0}; @@ -24,7 +24,7 @@ int32_t ctgGetVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSe SEpSet *pVnodeEpSet = NULL; int32_t msgLen = 0; - int32_t code = tscBuildMsg[TSDB_MSG_TYPE_VGROUP_LIST](NULL, &msg, 0, &msgLen); + int32_t code = queryBuildMsg[TSDB_MSG_TYPE_VGROUP_LIST](NULL, &msg, 0, &msgLen); if (code) { return code; } @@ -39,7 +39,7 @@ int32_t ctgGetVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSe rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); - code = tscProcessMsgRsp[TSDB_MSG_TYPE_VGROUP_LIST](pVgroup, rpcRsp.pCont, rpcRsp.contLen); + code = queryProcessMsgRsp[TSDB_MSG_TYPE_VGROUP_LIST](pVgroup, rpcRsp.pCont, rpcRsp.contLen); if (code) { return code; } @@ -63,7 +63,23 @@ int32_t ctgGetVgroupFromCache(SCatalog* pCatalog, SArray** pVgroupList, int32_t* } +int32_t ctgGetDBVgroupFromCache(SCatalog* pCatalog, char *dbName, SDBVgroupInfo **dbInfo, int32_t *exist) { +/* + if (NULL == pCatalog->dbCache.cache) { + *exist = 0; + return TSDB_CODE_SUCCESS; + } + + taosHashGet(SHashObj * pHashObj, const void * key, size_t keyLen) + + if (dbInfo) { + *pVgroupList = taosArrayDup(pCatalog->vgroupCache.arrayCache); + } + *exist = 1; +*/ + return TSDB_CODE_SUCCESS; +} int32_t catalogInit(SCatalogCfg *cfg) { @@ -245,12 +261,41 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, return TSDB_CODE_SUCCESS; } -int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo) { +int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) { } -int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) { + + +int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo** dbInfo) { + if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps) { + return TSDB_CODE_CTG_INVALID_INPUT; + } + +/* + int32_t exist = 0; + + if (0 == forceUpdate) { + CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &exist)); + + if (exist) { + return TSDB_CODE_SUCCESS; + } + } + + SDBVgroupInfo* newDbInfo = NULL; + + CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, dbName, &newDbInfo)); + + CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbName, newDbInfo)); + + if (dbInfo) { + *dbInfo = newDbInfo; + } +*/ + + return TSDB_CODE_SUCCESS; } @@ -265,7 +310,7 @@ int32_t catalogGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, cons SEpSet *pVnodeEpSet = NULL; int32_t msgLen = 0; - int32_t code = tscBuildMsg[TSDB_MSG_TYPE_TABLE_META](&bInput, &msg, 0, &msgLen); + int32_t code = queryBuildMsg[TSDB_MSG_TYPE_TABLE_META](&bInput, &msg, 0, &msgLen); if (code) { return code; } diff --git a/source/libs/query/CMakeLists.txt b/source/libs/query/CMakeLists.txt new file mode 100644 index 0000000000..579a4b279c --- /dev/null +++ b/source/libs/query/CMakeLists.txt @@ -0,0 +1,12 @@ +aux_source_directory(src QUERY_SRC) +add_library(query ${QUERY_SRC}) +target_include_directories( + query + PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/query" + PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" +) + +target_link_libraries( + query + PRIVATE os util common transport +) diff --git a/source/libs/query/inc/queryInt.h b/source/libs/query/inc/queryInt.h new file mode 100644 index 0000000000..f3204b3785 --- /dev/null +++ b/source/libs/query/inc/queryInt.h @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_QUERY_INT_H_ +#define _TD_QUERY_INT_H_ + +#ifdef __cplusplus +extern "C" { +#endif + + +#include "tlog.h" + +extern int32_t qDebugFlag; + +#define qFatal(...) do { if (qDebugFlag & DEBUG_FATAL) { taosPrintLog("QRY FATAL ", qDebugFlag, __VA_ARGS__); }} while(0) +#define qError(...) do { if (qDebugFlag & DEBUG_ERROR) { taosPrintLog("QRY ERROR ", qDebugFlag, __VA_ARGS__); }} while(0) +#define qWarn(...) do { if (qDebugFlag & DEBUG_WARN) { taosPrintLog("QRY WARN ", qDebugFlag, __VA_ARGS__); }} while(0) +#define qInfo(...) do { if (qDebugFlag & DEBUG_INFO) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0) +#define qDebug(...) do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0) +#define qTrace(...) do { if (qDebugFlag & DEBUG_TRACE) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0) +#define qDebugL(...) do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLongString("QRY ", qDebugFlag, __VA_ARGS__); }} while(0) + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_QUERY_INT_H_*/ diff --git a/source/common/src/tmessage.c b/source/libs/query/src/querymsg.c similarity index 86% rename from source/common/src/tmessage.c rename to source/libs/query/src/querymsg.c index 07adf1d599..924878c872 100644 --- a/source/common/src/tmessage.c +++ b/source/libs/query/src/querymsg.c @@ -14,15 +14,15 @@ */ #include "taosmsg.h" -#include "commonint.h" +#include "queryInt.h" -int32_t (*tscBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen) = {0}; +int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen) = {0}; -int32_t (*tscProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize) = {0}; +int32_t (*queryProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize) = {0}; -int32_t tscBuildVgroupListReqMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) { +int32_t queryBuildVgroupListReqMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) { if (NULL == msg || NULL == msgLen) { return TSDB_CODE_TSC_INVALID_INPUT; } @@ -32,7 +32,7 @@ int32_t tscBuildVgroupListReqMsg(void* input, char **msg, int32_t msgSize, int32 return TSDB_CODE_SUCCESS; } -int32_t tscBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) { +int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) { if (NULL == input || NULL == msg || NULL == msgLen) { return TSDB_CODE_TSC_INVALID_INPUT; } @@ -61,7 +61,7 @@ int32_t tscBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int32_ } -int32_t tscProcessVgroupListRsp(void* output, char *msg, int32_t msgSize) { +int32_t queryProcessVgroupListRsp(void* output, char *msg, int32_t msgSize) { if (NULL == output || NULL == msg || msgSize <= 0) { return TSDB_CODE_TSC_INVALID_INPUT; } @@ -72,17 +72,17 @@ int32_t tscProcessVgroupListRsp(void* output, char *msg, int32_t msgSize) { pRsp->vgroupVersion = htonl(pRsp->vgroupVersion); if (pRsp->vgroupNum < 0) { - tscError("vgroup number[%d] in rsp is invalid", pRsp->vgroupNum); + qError("vgroup number[%d] in rsp is invalid", pRsp->vgroupNum); return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; } if (pRsp->vgroupVersion < 0) { - tscError("vgroup vgroupVersion[%d] in rsp is invalid", pRsp->vgroupVersion); + qError("vgroup vgroupVersion[%d] in rsp is invalid", pRsp->vgroupVersion); return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; } if (msgSize != (pRsp->vgroupNum * sizeof(pRsp->vgroupInfo[0]) + sizeof(*pRsp))) { - tscError("vgroup list msg size mis-match, msgSize:%d, vgroup number:%d", msgSize, pRsp->vgroupNum); + qError("vgroup list msg size mis-match, msgSize:%d, vgroup number:%d", msgSize, pRsp->vgroupNum); return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; } @@ -104,12 +104,11 @@ int32_t tscProcessVgroupListRsp(void* output, char *msg, int32_t msgSize) { } void msgInit() { - tscBuildMsg[TSDB_MSG_TYPE_TABLE_META] = tscBuildTableMetaReqMsg; - tscBuildMsg[TSDB_MSG_TYPE_VGROUP_LIST] = tscBuildVgroupListReqMsg; - + queryBuildMsg[TSDB_MSG_TYPE_TABLE_META] = queryBuildTableMetaReqMsg; + queryBuildMsg[TSDB_MSG_TYPE_VGROUP_LIST] = queryBuildVgroupListReqMsg; //tscProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = tscProcessTableMetaRsp; - tscProcessMsgRsp[TSDB_MSG_TYPE_VGROUP_LIST] = tscProcessVgroupListRsp; + queryProcessMsgRsp[TSDB_MSG_TYPE_VGROUP_LIST] = queryProcessVgroupListRsp; /* tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg; -- GitLab