From 2dda97074ab2ce3b81a6ac961987446af713af98 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 17 Dec 2021 09:49:25 +0800 Subject: [PATCH] fix catalog bug and modify scheduler api --- include/libs/catalog/catalog.h | 81 +++++++++++++++------ include/libs/query/query.h | 12 ++++ include/libs/scheduler/scheduler.h | 4 ++ include/util/taoserror.h | 1 + source/libs/query/inc/queryInt.h | 11 --- source/libs/query/src/querymsg.c | 2 +- source/libs/scheduler/CMakeLists.txt | 2 +- source/libs/scheduler/inc/schedulerInt.h | 52 ++++++++++---- source/libs/scheduler/src/scheduler.c | 89 +++++++++++++++++++++++- source/util/src/terror.c | 1 + 10 files changed, 207 insertions(+), 48 deletions(-) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 449064c8c6..ee626865fb 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -32,8 +32,7 @@ extern "C" { struct SCatalog; typedef struct SCatalogReq { - char dbName[TSDB_DB_NAME_LEN]; - SArray *pTableName; // table full name + SArray *pTableName; // element is SNAME SArray *pUdf; // udf name bool qNodeRequired; // valid qnode } SCatalogReq; @@ -54,10 +53,10 @@ typedef struct SCatalogCfg { int32_t catalogInit(SCatalogCfg *cfg); /** - * Catalog service object, which is utilized to hold tableMeta (meta/vgroupInfo/udfInfo) at the client-side. - * There is ONLY one SCatalog object for one process space, and this function returns a singleton. - * @param clusterId - * @return + * Get a cluster's catalog handle for all later operations. 
+ * @param clusterId (input, end with \0) + * @param catalogHandle (output, NO need to free it) + * @return error code */ int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle); @@ -65,36 +64,75 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo); int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); - +/** + * Get a table's meta data. + * @param pCatalog (input, got with catalogGetHandle) + * @param pRpc (input, rpc object) + * @param pMgmtEps (input, mnode EPs) + * @param pDBName (input, full db name) + * @param pTableName (input, table name, NOT including db name) + * @param pTableMeta (output, table meta data, NEED to free it by caller) + * @return error code + */ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta); + +/** + * Force renew a table's local cached meta data. + * @param pCatalog (input, got with catalogGetHandle) + * @param pRpc (input, rpc object) + * @param pMgmtEps (input, mnode EPs) + * @param pDBName (input, full db name) + * @param pTableName (input, table name, NOT including db name) + * @return error code + */ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName); + +/** + * Force renew a table's local cached meta data and get the new one. 
+ * @param pCatalog (input, got with catalogGetHandle) + * @param pRpc (input, rpc object) + * @param pMgmtEps (input, mnode EPs) + * @param pDBName (input, full db name) + * @param pTableName (input, table name, NOT including db name) + * @param pTableMeta (output, table meta data, NEED to free it by caller) + * @return error code + */ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta); /** - * get table's vgroup list. - * @param clusterId - * @pVgroupList - array of SVgroupInfo - * @return + * Get a table's actual vgroup, for stable it's all possible vgroup list. + * @param pCatalog (input, got with catalogGetHandle) + * @param pRpc (input, rpc object) + * @param pMgmtEps (input, mnode EPs) + * @param pDBName (input, full db name) + * @param pTableName (input, table name, NOT including db name) + * @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller) + * @return error code */ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList); /** - * get a table's dst vgroup from its name's hash value. - * @vgInfo - SVgroupInfo - * @return + * Get a table's vgroup from its name's hash value. + * @param pCatalog (input, got with catalogGetHandle) + * @param pRpc (input, rpc object) + * @param pMgmtEps (input, mnode EPs) + * @param pDBName (input, full db name) + * @param pTableName (input, table name, NOT including db name) + * @param vgInfo (output, vgroup info) + * @return error code */ int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SVgroupInfo* vgInfo); /** - * Get the required meta data from mnode. - * Note that this is a synchronized API and is also thread-safety. 
- * @param pCatalog - * @param pMgmtEps - * @param pMetaReq - * @param pMetaData - * @return + * Get all meta data required in pReq. + * @param pCatalog (input, got with catalogGetHandle) + * @param pRpc (input, rpc object) + * @param pMgmtEps (input, mnode EPs) + * @param pReq (input, request info) + * @param pRsp (output, response data) + * @return error code */ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp); @@ -105,7 +143,6 @@ int32_t catalogGetQnodeList(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, S /** * Destroy catalog and relase all resources - * @param pCatalog */ void catalogDestroy(void); diff --git a/include/libs/query/query.h b/include/libs/query/query.h index 060aef9d65..d92f7d4497 100644 --- a/include/libs/query/query.h +++ b/include/libs/query/query.h @@ -22,6 +22,7 @@ extern "C" { #include "tarray.h" #include "thash.h" +#include "tlog.h" typedef SVgroupListRspMsg SVgroupListInfo; @@ -88,6 +89,17 @@ extern int32_t (*queryProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, extern void msgInit(); +extern int32_t qDebugFlag; + +#define qFatal(...) do { if (qDebugFlag & DEBUG_FATAL) { taosPrintLog("QRY FATAL ", qDebugFlag, __VA_ARGS__); }} while(0) +#define qError(...) do { if (qDebugFlag & DEBUG_ERROR) { taosPrintLog("QRY ERROR ", qDebugFlag, __VA_ARGS__); }} while(0) +#define qWarn(...) do { if (qDebugFlag & DEBUG_WARN) { taosPrintLog("QRY WARN ", qDebugFlag, __VA_ARGS__); }} while(0) +#define qInfo(...) do { if (qDebugFlag & DEBUG_INFO) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0) +#define qDebug(...) do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0) +#define qTrace(...) do { if (qDebugFlag & DEBUG_TRACE) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0) +#define qDebugL(...) 
do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLongString("QRY ", qDebugFlag, __VA_ARGS__); }} while(0) + + #ifdef __cplusplus } #endif diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index fe22d33086..d73e388c20 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -22,6 +22,10 @@ extern "C" { #include "planner.h" +typedef struct SSchedulerCfg { + +} SSchedulerCfg; + typedef struct SQueryProfileSummary { int64_t startTs; // Object created and added into the message queue int64_t endTs; // the timestamp when the task is completed diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 4f1ef7da7b..689d2676d1 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -324,6 +324,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_QRY_INCONSISTAN TAOS_DEF_ERROR_CODE(0, 0x070C) //"File inconsistency in replica") #define TSDB_CODE_QRY_INVALID_TIME_CONDITION TAOS_DEF_ERROR_CODE(0, 0x070D) //"invalid time condition") #define TSDB_CODE_QRY_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x070E) //"System error") +#define TSDB_CODE_QRY_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0x070F) //"invalid input") // grant diff --git a/source/libs/query/inc/queryInt.h b/source/libs/query/inc/queryInt.h index f3204b3785..75c1e134cd 100644 --- a/source/libs/query/inc/queryInt.h +++ b/source/libs/query/inc/queryInt.h @@ -21,17 +21,6 @@ extern "C" { #endif -#include "tlog.h" - -extern int32_t qDebugFlag; - -#define qFatal(...) do { if (qDebugFlag & DEBUG_FATAL) { taosPrintLog("QRY FATAL ", qDebugFlag, __VA_ARGS__); }} while(0) -#define qError(...) do { if (qDebugFlag & DEBUG_ERROR) { taosPrintLog("QRY ERROR ", qDebugFlag, __VA_ARGS__); }} while(0) -#define qWarn(...) do { if (qDebugFlag & DEBUG_WARN) { taosPrintLog("QRY WARN ", qDebugFlag, __VA_ARGS__); }} while(0) -#define qInfo(...) do { if (qDebugFlag & DEBUG_INFO) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0) -#define qDebug(...) 
do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0) -#define qTrace(...) do { if (qDebugFlag & DEBUG_TRACE) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0) -#define qDebugL(...) do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLongString("QRY ", qDebugFlag, __VA_ARGS__); }} while(0) #ifdef __cplusplus } diff --git a/source/libs/query/src/querymsg.c b/source/libs/query/src/querymsg.c index 9d99b568a5..7f033c0fdf 100644 --- a/source/libs/query/src/querymsg.c +++ b/source/libs/query/src/querymsg.c @@ -120,7 +120,7 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) { pRsp->vgroupInfo[i].hashEnd = htonl(pRsp->vgroupInfo[i].hashEnd); for (int32_t n = 0; n < pRsp->vgroupInfo[i].numOfEps; ++n) { - pRsp->vgroupInfo[i].epAddr[n].port = htonl(pRsp->vgroupInfo[i].epAddr[n].port); + pRsp->vgroupInfo[i].epAddr[n].port = htons(pRsp->vgroupInfo[i].epAddr[n].port); } if (0 != taosHashPut(pOut->dbVgroup.vgInfo, &pRsp->vgroupInfo[i].vgId, sizeof(pRsp->vgroupInfo[i].vgId), &pRsp->vgroupInfo[i], sizeof(pRsp->vgroupInfo[i]))) { diff --git a/source/libs/scheduler/CMakeLists.txt b/source/libs/scheduler/CMakeLists.txt index fd00085381..6675b7f5ec 100644 --- a/source/libs/scheduler/CMakeLists.txt +++ b/source/libs/scheduler/CMakeLists.txt @@ -9,5 +9,5 @@ target_include_directories( target_link_libraries( scheduler - PRIVATE os util planner common + PRIVATE os util planner common query ) \ No newline at end of file diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 1648cbfc98..8e30ce1403 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -24,27 +24,55 @@ extern "C" { #include "tarray.h" #include "planner.h" #include "scheduler.h" +#include "thash.h" -typedef struct SQuery { - SArray **pSubquery; - int32_t numOfLevels; - int32_t currentLevel; -} SQuery; +#define SCHEDULE_DEFAULT_JOB_NUMBER 1000 + +enum { + 
SCH_STATUS_NOT_START = 1, + SCH_STATUS_EXECUTING, + SCH_STATUS_SUCCEED, + SCH_STATUS_FAILED, + SCH_STATUS_CANCELLING, + SCH_STATUS_CANCELLED +}; + +typedef struct SSchedulerMgmt { + SHashObj *Jobs; // key: queryId, value: SQueryJob* +} SSchedulerMgmt; typedef struct SQueryTask { - uint64_t queryId; // query id - uint64_t taskId; // task id - char *pSubplan; // operator tree - uint64_t status; // task status + uint64_t taskId; // task id + char *pSubplan; // operator tree + int8_t status; // task status SQueryProfileSummary summary; // task execution summary - void *pOutputHandle; // result buffer handle, to temporarily keep the output result for next stage } SQueryTask; +typedef struct SQueryLevel { + int8_t status; + int32_t taskNum; + + SArray *subTasks; // Element is SQueryTask + SArray *subPlans; // Element is SSubplan +} SQueryLevel; + typedef struct SQueryJob { - SArray **pSubtasks; - // todo + uint64_t queryId; + int32_t levelNum; + int32_t levelIdx; + int8_t status; + SQueryProfileSummary summary; + + SArray *levels; // Element is SQueryLevel, starting from 0. + SArray *subPlans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0. } SQueryJob; + +#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { return _code; } } while (0) +#define SCH_ERR_LRET(c,...) 
do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); return _code; } } while (0) +#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { goto _return; } } while (0) + + #ifdef __cplusplus } #endif diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 66fd0aa4f3..8d2e1ed916 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -15,6 +15,9 @@ #include "schedulerInt.h" #include "taosmsg.h" +#include "query.h" + +SSchedulerMgmt schMgmt = {0}; int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_fn_t fp) { @@ -47,11 +50,95 @@ int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_ */ } +int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { + int32_t levelNum = (int32_t)taosArrayGetSize(dag->pSubplans); + if (levelNum <= 0) { + qError("invalid level num:%d", levelNum); + return TSDB_CODE_QRY_INVALID_INPUT; + } + + job->levels = taosArrayInit(levelNum, sizeof(SQueryLevel)); + if (NULL == job->levels) { + qError("taosArrayInit %d failed", levelNum); + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + + job->levelNum = levelNum; + job->levelIdx = levelNum - 1; + job->status = SCH_STATUS_NOT_START; + + job->subPlans = dag->pSubplans; + + SQueryLevel level = {0}; + SArray *levelPlans = NULL; + int32_t levelPlanNum = 0; + + for (int32_t i = 0; i < levelNum; ++i) { + levelPlans = taosArrayGetP(dag->pSubplans, i); + if (NULL == levelPlans) { + qError("no level plans for level %d", i); + return TSDB_CODE_QRY_INVALID_INPUT; + } + + levelPlanNum = (int32_t)taosArrayGetSize(levelPlans); + if (levelPlanNum <= 0) { + qError("invalid level plans number:%d, level:%d", levelPlanNum, i); + return TSDB_CODE_QRY_INVALID_INPUT; + } + + for (int32_t n = 0; n < levelPlanNum; ++n) { + + } + } + + return TSDB_CODE_SUCCESS; +} + + + +int32_t schedulerInit(SSchedulerCfg *cfg) { + schMgmt.Jobs = 
taosHashInit(SCHEDULE_DEFAULT_JOB_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + if (NULL == schMgmt.Jobs) { + SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", SCHEDULE_DEFAULT_JOB_NUMBER); + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob) { + if (NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { + return TSDB_CODE_QRY_INVALID_INPUT; + } + + + SQueryJob *job = calloc(1, sizeof(SQueryJob)); + if (NULL == job) { + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + + schValidateAndBuildJob(pDag, job); + + + -int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob); + + *(SQueryJob **)pJob = job; + + + + +} int32_t scheduleFetchRows(void *pJob, void *data); int32_t scheduleCancelJob(void *pJob); +void schedulerDestroy(void) { + if (schMgmt.Jobs) { + taosHashCleanup(schMgmt.Jobs); //TBD + schMgmt.Jobs = NULL; + } +} + diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 70a3dc622f..5518ec2a31 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -322,6 +322,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_ENOUGH_BUFFER, "Query buffer limit ha TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INCONSISTAN, "File inconsistance in replica") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_TIME_CONDITION, "One valid time range condition expected") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SYS_ERROR, "System error") +TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_INPUT, "invalid input") // grant -- GitLab