提交 2dda9707 编写于 作者: D dapan1121

fix catalog bug and modify scheduler api

上级 c6c97967
......@@ -32,8 +32,7 @@ extern "C" {
struct SCatalog;
typedef struct SCatalogReq {
char dbName[TSDB_DB_NAME_LEN];
SArray *pTableName; // table full name
SArray *pTableName; // element is SNAME
SArray *pUdf; // udf name
bool qNodeRequired; // valid qnode
} SCatalogReq;
......@@ -54,10 +53,10 @@ typedef struct SCatalogCfg {
int32_t catalogInit(SCatalogCfg *cfg);
/**
* Catalog service object, which is utilized to hold tableMeta (meta/vgroupInfo/udfInfo) at the client-side.
* There is ONLY one SCatalog object for one process space, and this function returns a singleton.
* @param clusterId
* @return
* Get a cluster's catalog handle for all later operations.
* @param clusterId (input, end with \0)
* @param catalogHandle (output, NO need to free it)
* @return error code
*/
int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle);
......@@ -65,36 +64,75 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName,
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo);
int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo);
/**
* Get a table's meta data.
* @param pCatalog (input, got with catalogGetHandle)
* @param pRpc (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pDBName (input, full db name)
* @param pTableName (input, table name, NOT including db name)
* @param pTableMeta(output, table meta data, NEED to free it by calller)
* @return error code
*/
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta);
/**
* Force renew a table's local cached meta data.
* @param pCatalog (input, got with catalogGetHandle)
* @param pRpc (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pDBName (input, full db name)
* @param pTableName (input, table name, NOT including db name)
* @return error code
*/
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName);
/**
* Force renew a table's local cached meta data and get the new one.
* @param pCatalog (input, got with catalogGetHandle)
* @param pRpc (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pDBName (input, full db name)
* @param pTableName (input, table name, NOT including db name)
* @param pTableMeta(output, table meta data, NEED to free it by calller)
* @return error code
*/
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta);
/**
* get table's vgroup list.
* @param clusterId
* @pVgroupList - array of SVgroupInfo
* @return
* Get a table's actual vgroup, for stable it's all possible vgroup list.
* @param pCatalog (input, got with catalogGetHandle)
* @param pRpc (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pDBName (input, full db name)
* @param pTableName (input, table name, NOT including db name)
* @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller)
* @return error code
*/
int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList);
/**
* get a table's dst vgroup from its name's hash value.
* @vgInfo - SVgroupInfo
* @return
* Get a table's vgroup from its name's hash value.
* @param pCatalog (input, got with catalogGetHandle)
* @param pRpc (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pDBName (input, full db name)
* @param pTableName (input, table name, NOT including db name)
* @param vgInfo (output, vgroup info)
* @return error code
*/
int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SVgroupInfo* vgInfo);
/**
* Get the required meta data from mnode.
* Note that this is a synchronized API and is also thread-safety.
* @param pCatalog
* @param pMgmtEps
* @param pMetaReq
* @param pMetaData
* @return
* Get all meta data required in pReq.
* @param pCatalog (input, got with catalogGetHandle)
* @param pRpc (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pReq (input, reqest info)
* @param pRsp (output, response data)
* @return error code
*/
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp);
......@@ -105,7 +143,6 @@ int32_t catalogGetQnodeList(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, S
/**
* Destroy catalog and relase all resources
* @param pCatalog
*/
void catalogDestroy(void);
......
......@@ -22,6 +22,7 @@ extern "C" {
#include "tarray.h"
#include "thash.h"
#include "tlog.h"
typedef SVgroupListRspMsg SVgroupListInfo;
......@@ -88,6 +89,17 @@ extern int32_t (*queryProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg,
extern void msgInit();
extern int32_t qDebugFlag;
#define qFatal(...) do { if (qDebugFlag & DEBUG_FATAL) { taosPrintLog("QRY FATAL ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qError(...) do { if (qDebugFlag & DEBUG_ERROR) { taosPrintLog("QRY ERROR ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qWarn(...) do { if (qDebugFlag & DEBUG_WARN) { taosPrintLog("QRY WARN ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qInfo(...) do { if (qDebugFlag & DEBUG_INFO) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qDebug(...) do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qTrace(...) do { if (qDebugFlag & DEBUG_TRACE) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qDebugL(...) do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLongString("QRY ", qDebugFlag, __VA_ARGS__); }} while(0)
#ifdef __cplusplus
}
#endif
......
......@@ -22,6 +22,10 @@ extern "C" {
#include "planner.h"
typedef struct SSchedulerCfg {
} SSchedulerCfg;
typedef struct SQueryProfileSummary {
int64_t startTs; // Object created and added into the message queue
int64_t endTs; // the timestamp when the task is completed
......
......@@ -324,6 +324,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_QRY_INCONSISTAN TAOS_DEF_ERROR_CODE(0, 0x070C) //"File inconsistency in replica")
#define TSDB_CODE_QRY_INVALID_TIME_CONDITION TAOS_DEF_ERROR_CODE(0, 0x070D) //"invalid time condition")
#define TSDB_CODE_QRY_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x070E) //"System error")
#define TSDB_CODE_QRY_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0x070F) //"invalid input")
// grant
......
......@@ -21,17 +21,6 @@ extern "C" {
#endif
#include "tlog.h"
extern int32_t qDebugFlag;
#define qFatal(...) do { if (qDebugFlag & DEBUG_FATAL) { taosPrintLog("QRY FATAL ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qError(...) do { if (qDebugFlag & DEBUG_ERROR) { taosPrintLog("QRY ERROR ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qWarn(...) do { if (qDebugFlag & DEBUG_WARN) { taosPrintLog("QRY WARN ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qInfo(...) do { if (qDebugFlag & DEBUG_INFO) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qDebug(...) do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qTrace(...) do { if (qDebugFlag & DEBUG_TRACE) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qDebugL(...) do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLongString("QRY ", qDebugFlag, __VA_ARGS__); }} while(0)
#ifdef __cplusplus
}
......
......@@ -120,7 +120,7 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
pRsp->vgroupInfo[i].hashEnd = htonl(pRsp->vgroupInfo[i].hashEnd);
for (int32_t n = 0; n < pRsp->vgroupInfo[i].numOfEps; ++n) {
pRsp->vgroupInfo[i].epAddr[n].port = htonl(pRsp->vgroupInfo[i].epAddr[n].port);
pRsp->vgroupInfo[i].epAddr[n].port = htons(pRsp->vgroupInfo[i].epAddr[n].port);
}
if (0 != taosHashPut(pOut->dbVgroup.vgInfo, &pRsp->vgroupInfo[i].vgId, sizeof(pRsp->vgroupInfo[i].vgId), &pRsp->vgroupInfo[i], sizeof(pRsp->vgroupInfo[i]))) {
......
......@@ -9,5 +9,5 @@ target_include_directories(
target_link_libraries(
scheduler
PRIVATE os util planner common
PRIVATE os util planner common query
)
\ No newline at end of file
......@@ -24,27 +24,55 @@ extern "C" {
#include "tarray.h"
#include "planner.h"
#include "scheduler.h"
#include "thash.h"
typedef struct SQuery {
SArray **pSubquery;
int32_t numOfLevels;
int32_t currentLevel;
} SQuery;
#define SCHEDULE_DEFAULT_JOB_NUMBER 1000
enum {
SCH_STATUS_NOT_START = 1,
SCH_STATUS_EXECUTING,
SCH_STATUS_SUCCEED,
SCH_STATUS_FAILED,
SCH_STATUS_CANCELLING,
SCH_STATUS_CANCELLED
};
typedef struct SSchedulerMgmt {
SHashObj *Jobs; // key: queryId, value: SQueryJob*
} SSchedulerMgmt;
typedef struct SQueryTask {
uint64_t queryId; // query id
uint64_t taskId; // task id
char *pSubplan; // operator tree
uint64_t status; // task status
uint64_t taskId; // task id
char *pSubplan; // operator tree
int8_t status; // task status
SQueryProfileSummary summary; // task execution summary
void *pOutputHandle; // result buffer handle, to temporarily keep the output result for next stage
} SQueryTask;
typedef struct SQueryLevel {
int8_t status;
int32_t taskNum;
SArray *subTasks; // Element is SQueryTask
SArray *subPlans; // Element is SSubplan
} SQueryLevel;
typedef struct SQueryJob {
SArray **pSubtasks;
// todo
uint64_t queryId;
int32_t levelNum;
int32_t levelIdx;
int8_t status;
SQueryProfileSummary summary;
SArray *levels; // Element is SQueryLevel, starting from 0.
SArray *subPlans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
} SQueryJob;
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { return _code; } } while (0)
#define SCH_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); return _code; } } while (0)
#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { goto _return; } } while (0)
#ifdef __cplusplus
}
#endif
......
......@@ -15,6 +15,9 @@
#include "schedulerInt.h"
#include "taosmsg.h"
#include "query.h"
SSchedulerMgmt schMgmt = {0};
int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_fn_t fp) {
......@@ -47,11 +50,95 @@ int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_
*/
}
int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
int32_t levelNum = (int32_t)taosArrayGetSize(dag->pSubplans);
if (levelNum <= 0) {
qError("invalid level num:%d", levelNum);
return TSDB_CODE_QRY_INVALID_INPUT;
}
job->levels = taosArrayInit(levelNum, sizeof(SQueryLevel));
if (NULL == job->levels) {
qError("taosArrayInit %d failed", levelNum);
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
job->levelNum = levelNum;
job->levelIdx = levelNum - 1;
job->status = SCH_STATUS_NOT_START;
job->subPlans = dag->pSubplans;
SQueryLevel level = {0};
SArray *levelPlans = NULL;
int32_t levelPlanNum = 0;
for (int32_t i = 0; i < levelNum; ++i) {
levelPlans = taosArrayGetP(dag->pSubplans, i);
if (NULL == levelPlans) {
qError("no level plans for level %d", i);
return TSDB_CODE_QRY_INVALID_INPUT;
}
levelPlanNum = (int32_t)taosArrayGetSize(levelPlans);
if (levelPlanNum <= 0) {
qError("invalid level plans number:%d, level:%d", levelPlanNum, i);
return TSDB_CODE_QRY_INVALID_INPUT;
}
for (int32_t n = 0; n < levelPlanNum; ++n) {
}
}
return TSDB_CODE_SUCCESS;
}
int32_t schedulerInit(SSchedulerCfg *cfg) {
schMgmt.Jobs = taosHashInit(SCHEDULE_DEFAULT_JOB_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == schMgmt.Jobs) {
SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", SCHEDULE_DEFAULT_JOB_NUMBER);
}
return TSDB_CODE_SUCCESS;
}
int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob) {
if (NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
SQueryJob *job = calloc(1, sizeof(SQueryJob));
if (NULL == job) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
schValidateAndBuildJob(pDag, job);
int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob);
*(SQueryJob **)pJob = job;
}
int32_t scheduleFetchRows(void *pJob, void *data);
int32_t scheduleCancelJob(void *pJob);
void schedulerDestroy(void) {
if (schMgmt.Jobs) {
taosHashCleanup(schMgmt.Jobs); //TBD
schMgmt.Jobs = NULL;
}
}
......@@ -322,6 +322,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_ENOUGH_BUFFER, "Query buffer limit ha
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INCONSISTAN, "File inconsistance in replica")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_TIME_CONDITION, "One valid time range condition expected")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SYS_ERROR, "System error")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_INPUT, "invalid input")
// grant
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册