提交 0d2ac682 编写于 作者: D dapan1121

feature/qnode

上级 6adae940
......@@ -45,7 +45,6 @@ typedef struct SMetaData {
} SMetaData;
typedef struct SCatalogCfg {
bool enableVgroupCache;
uint32_t maxTblCacheNum;
uint32_t maxDBCacheNum;
} SCatalogCfg;
......@@ -61,8 +60,8 @@ int32_t catalogInit(SCatalogCfg *cfg);
int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle);
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version);
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo);
int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo);
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo);
/**
* Get a table's meta data.
......
......@@ -24,7 +24,6 @@ extern "C" {
#include "catalog.h"
typedef struct SSchedulerCfg {
int32_t clusterType;
int32_t maxJobNum;
} SSchedulerCfg;
......
......@@ -66,8 +66,6 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define ctgTrace(...) do { if (ctgDebugFlag & DEBUG_TRACE) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgDebugL(...) do { if (ctgDebugFlag & DEBUG_DEBUG) { taosPrintLongString("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define CTG_CACHE_ENABLED() (ctgMgmt.cfg.maxDBCacheNum > 0 || ctgMgmt.cfg.maxTblCacheNum > 0)
#define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define CTG_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { ctgError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
......
......@@ -370,6 +370,41 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo) {
if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
int32_t exist = 0;
if (0 == forceUpdate) {
CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &exist));
if (exist) {
return TSDB_CODE_SUCCESS;
}
}
SUseDbOutput DbOut = {0};
SBuildUseDBInput input = {0};
strncpy(input.db, dbName, sizeof(input.db));
input.db[sizeof(input.db) - 1] = 0;
input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut));
CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbName, &DbOut.dbVgroup));
if (dbInfo) {
*dbInfo = DbOut.dbVgroup;
}
return TSDB_CODE_SUCCESS;
}
int32_t catalogInit(SCatalogCfg *cfg) {
if (ctgMgmt.pCluster) {
ctgError("catalog already init");
......@@ -378,16 +413,22 @@ int32_t catalogInit(SCatalogCfg *cfg) {
if (cfg) {
memcpy(&ctgMgmt.cfg, cfg, sizeof(*cfg));
if (ctgMgmt.cfg.maxDBCacheNum == 0) {
ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
}
if (ctgMgmt.cfg.maxTblCacheNum == 0) {
ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER;
}
} else {
ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER;
}
if (CTG_CACHE_ENABLED()) {
ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == ctgMgmt.pCluster) {
CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_ERROR, "init %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER);
}
ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == ctgMgmt.pCluster) {
CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_ERROR, "init %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER);
}
return TSDB_CODE_SUCCESS;
......@@ -449,13 +490,19 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName,
return TSDB_CODE_SUCCESS;
}
int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
if (NULL == pCatalog || NULL == dbName || NULL == dbInfo) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
if (dbInfo->vgVersion < 0) {
if (pCatalog->dbCache.cache) {
SDBVgroupInfo *oldInfo = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName));
if (oldInfo && oldInfo->vgInfo) {
taosHashCleanup(oldInfo->vgInfo);
oldInfo->vgInfo = NULL;
}
taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName));
}
......@@ -485,42 +532,6 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName
return TSDB_CODE_SUCCESS;
}
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo) {
if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
int32_t exist = 0;
if (0 == forceUpdate) {
CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &exist));
if (exist) {
return TSDB_CODE_SUCCESS;
}
}
SUseDbOutput DbOut = {0};
SBuildUseDBInput input = {0};
strncpy(input.db, dbName, sizeof(input.db));
input.db[sizeof(input.db) - 1] = 0;
input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut));
CTG_ERR_RET(catalogUpdateDBVgroupCache(pCatalog, dbName, &DbOut.dbVgroup));
if (dbInfo) {
*dbInfo = DbOut.dbVgroup;
}
return TSDB_CODE_SUCCESS;
}
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pDBName, pTableName, false, pTableMeta);
}
......@@ -531,6 +542,7 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSe
}
SVgroupInfo vgroupInfo = {0};
int32_t code = 0;
CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo));
......@@ -540,11 +552,13 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSe
CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &output));
CTG_ERR_RET(ctgUpdateTableMetaCache(pCatalog, &output));
CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output));
_return:
tfree(output.tbMeta);
return TSDB_CODE_SUCCESS;
CTG_RET(code);
}
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
......@@ -563,7 +577,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &tbMeta));
CTG_ERR_JRET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbVgroup));
CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbVgroup));
if (tbMeta->tableType == TSDB_SUPER_TABLE) {
CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, &dbVgroup, pVgroupList));
......@@ -594,6 +608,7 @@ _return:
tfree(tbMeta);
taosArrayDestroy(*pVgroupList);
*pVgroupList = NULL;
CTG_RET(code);
}
......@@ -604,7 +619,7 @@ int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter,
int32_t code = 0;
int32_t vgId = 0;
CTG_ERR_RET(catalogGetDBVgroup(pCatalog, pTransporter, pMgmtEps, pDBName, false, &dbInfo));
CTG_ERR_RET(ctgGetDBVgroup(pCatalog, pTransporter, pMgmtEps, pDBName, false, &dbInfo));
if (dbInfo.vgVersion < 0 || NULL == dbInfo.vgInfo) {
ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p", pDBName, dbInfo.vgVersion, dbInfo.vgInfo);
......@@ -627,12 +642,15 @@ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* p
if (pReq->pTableName) {
char dbName[TSDB_DB_FNAME_LEN];
int32_t tbNum = (int32_t)taosArrayGetSize(pReq->pTableName);
if (tbNum > 0) {
pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES);
if (NULL == pRsp->pTableMeta) {
ctgError("taosArrayInit num[%d] failed", tbNum);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
if (tbNum <= 0) {
ctgError("empty table name list");
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES);
if (NULL == pRsp->pTableMeta) {
ctgError("taosArrayInit num[%d] failed", tbNum);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
for (int32_t i = 0; i < tbNum; ++i) {
......@@ -663,6 +681,7 @@ _return:
}
taosArrayDestroy(pRsp->pTableMeta);
pRsp->pTableMeta = NULL;
}
CTG_RET(code);
......
......@@ -791,9 +791,29 @@ void schDropJobAllTasks(SSchJob *job) {
}
}
uint64_t schGenSchId(void) {
uint64_t sId = 0;
// TODO
qDebug("Gen sId:0x%"PRIx64, sId);
return sId;
}
int32_t schedulerInit(SSchedulerCfg *cfg) {
if (schMgmt.jobs) {
qError("scheduler already init");
return TSDB_CODE_QRY_INVALID_INPUT;
}
if (cfg) {
schMgmt.cfg = *cfg;
if (schMgmt.cfg.maxJobNum <= 0) {
schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
}
} else {
schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
}
......@@ -803,18 +823,14 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", schMgmt.cfg.maxJobNum);
}
schMgmt.sId = 1; //TODO GENERATE A UUID
schMgmt.sId = schGenSchId();
return TSDB_CODE_SUCCESS;
}
int32_t scheduleExecJobImpl(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, bool syncSchedule) {
if (NULL == transport || NULL == transport ||NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (taosArrayGetSize(qnodeList) <= 0) {
if (qnodeList && taosArrayGetSize(qnodeList) <= 0) {
qInfo("qnodeList is empty");
}
......@@ -882,6 +898,10 @@ _return:
}
int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, uint64_t *numOfRows) {
if (NULL == transport || /* NULL == qnodeList || */ NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == numOfRows) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
*numOfRows = 0;
SCH_ERR_RET(scheduleExecJobImpl(transport, qnodeList, pDag, pJob, true));
......@@ -894,6 +914,10 @@ int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, voi
}
int32_t scheduleAsyncExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob) {
if (NULL == transport || NULL == qnodeList ||NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
return scheduleExecJobImpl(transport, qnodeList, pDag, pJob, false);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册