未验证 提交 6b09fef9 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #11178 from taosdata/feature/qnode

Feature/qnode
......@@ -60,8 +60,10 @@ int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* durati
int32_t taosParseTime(const char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth);
void deltaToUtcInitOnce();
char getPrecisionUnit(int32_t precision);
int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision);
int64_t convertTimeFromPrecisionToUnit(int64_t time, int32_t fromPrecision, char toUnit);
void taosFormatUtcTime(char *buf, int32_t bufLen, int64_t time, int32_t precision);
......
......@@ -15,5 +15,10 @@
#include "cmdnodes.h"
#include "tmsg.h"
#include "plannodes.h"
int32_t qExecCommand(SNode* pStmt, SRetrieveTableRsp** pRsp);
int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp);
......@@ -215,6 +215,8 @@ int32_t nodesStringToNode(const char* pStr, SNode** pNode);
int32_t nodesListToString(const SNodeList* pList, bool format, char** pStr, int32_t* pLen);
int32_t nodesStringToList(const char* pStr, SNodeList** pList);
int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len);
#ifdef __cplusplus
}
#endif
......
......@@ -253,7 +253,7 @@ typedef struct SIntervalPhysiNode {
int64_t sliding;
int8_t intervalUnit;
int8_t slidingUnit;
uint8_t precision;
uint8_t precision;
SFillNode* pFill;
} SIntervalPhysiNode;
......
......@@ -314,6 +314,7 @@ bool nodesIsTimeorderQuery(const SNode* pQuery);
bool nodesIsTimelineQuery(const SNode* pQuery);
void* nodesGetValueFromNode(SValueNode *pNode);
char* nodesGetStrValueFromNode(SValueNode *pNode);
#ifdef __cplusplus
}
......
......@@ -53,6 +53,7 @@ typedef struct SIndexMeta {
} SIndexMeta;
/*
* ASSERT(sizeof(SCTableMeta) == 24)
* ASSERT(tableType == TSDB_CHILD_TABLE)
......@@ -235,6 +236,11 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
} \
} while (0)
#define QRY_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define QRY_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define QRY_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#ifdef __cplusplus
}
#endif
......
......@@ -46,6 +46,14 @@ extern "C" {
#define MILLISECOND_PER_DAY (MILLISECOND_PER_HOUR * 24)
#define MILLISECOND_PER_WEEK (MILLISECOND_PER_DAY * 7)
#define NANOSECOND_PER_USEC (1000L)
#define NANOSECOND_PER_MSEC (1000000L)
#define NANOSECOND_PER_SEC (1000000000L)
#define NANOSECOND_PER_MINUTE (NANOSECOND_PER_SEC * 60)
#define NANOSECOND_PER_HOUR (NANOSECOND_PER_MINUTE * 60)
#define NANOSECOND_PER_DAY (NANOSECOND_PER_HOUR * 24)
#define NANOSECOND_PER_WEEK (NANOSECOND_PER_DAY * 7)
int32_t taosGetTimeOfDay(struct timeval *tv);
//@return timestamp in second
......
......@@ -224,6 +224,7 @@ typedef enum ELogicConditionType {
#define TSDB_APP_NAME_LEN TSDB_UNI_LEN
#define TSDB_STB_COMMENT_LEN 1024
/**
* In some scenarios uint16_t (0~65535) is used to store the row len.
* - Firstly, we use 65531(65535 - 4), as the SDataRow/SKVRow contains 4 bits header.
......@@ -388,6 +389,7 @@ typedef enum ELogicConditionType {
#define TSDB_DEFAULT_EXPLAIN_RATIO 0.001
#define TSDB_EXPLAIN_RESULT_ROW_SIZE 1024
#define TSDB_EXPLAIN_RESULT_COLUMN_NAME "QUERY PLAN"
#define TSDB_MAX_JOIN_TABLE_NUM 10
#define TSDB_MAX_UNION_CLAUSE 5
......@@ -479,6 +481,9 @@ enum {
#define QND_VGID 1
#define VND_VGID 0
#define MAX_NUM_STR_SIZE 40
#ifdef __cplusplus
}
#endif
......
......@@ -361,6 +361,18 @@ int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec) {
return 0;
}
char getPrecisionUnit(int32_t precision) {
static char units[3] = {TIME_UNIT_MILLISECOND, TIME_UNIT_MICROSECOND, TIME_UNIT_NANOSECOND};
switch (precision) {
case TSDB_TIME_PRECISION_MILLI:
case TSDB_TIME_PRECISION_MICRO:
case TSDB_TIME_PRECISION_NANO:
return units[precision];
default:
return 0;
}
}
int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision) {
assert(fromPrecision == TSDB_TIME_PRECISION_MILLI || fromPrecision == TSDB_TIME_PRECISION_MICRO ||
fromPrecision == TSDB_TIME_PRECISION_NANO);
......@@ -370,6 +382,33 @@ int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrec
return (int64_t)((double)time * factors[fromPrecision][toPrecision]);
}
int64_t convertTimeFromPrecisionToUnit(int64_t time, int32_t fromPrecision, char toUnit) {
assert(fromPrecision == TSDB_TIME_PRECISION_MILLI || fromPrecision == TSDB_TIME_PRECISION_MICRO ||
fromPrecision == TSDB_TIME_PRECISION_NANO);
static double factors[3] = {1000000., 1000., 1.};
switch (toUnit) {
case 's':
return time * factors[fromPrecision] / NANOSECOND_PER_SEC;
case 'm':
return time * factors[fromPrecision] / NANOSECOND_PER_MINUTE;
case 'h':
return time * factors[fromPrecision] / NANOSECOND_PER_HOUR;
case 'd':
return time * factors[fromPrecision] / NANOSECOND_PER_DAY;
case 'w':
return time * factors[fromPrecision] / NANOSECOND_PER_WEEK;
case 'a':
return time * factors[fromPrecision] / NANOSECOND_PER_MSEC;
case 'u':
return time * factors[fromPrecision] / NANOSECOND_PER_USEC;
case 'b':
return time * factors[fromPrecision];
default: {
return -1;
}
}
}
static int32_t getDuration(int64_t val, char unit, int64_t* result, int32_t timePrecision) {
switch (unit) {
case 's':
......@@ -688,4 +727,4 @@ void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t t, int32_t precision)
length += (int32_t)strftime(ts + length, 40 - length, "%z", ptm);
tstrncpy(buf, ts, bufLen);
}
\ No newline at end of file
}
......@@ -112,7 +112,14 @@ typedef struct SCtgRuntimeStat {
} SCtgRuntimeStat;
typedef struct SCtgCacheStat {
uint64_t clusterNum;
uint64_t dbNum;
uint64_t tblNum;
uint64_t stblNum;
uint64_t vgHitNum;
uint64_t vgMissNum;
uint64_t tblHitNum;
uint64_t tblMissNum;
} SCtgCacheStat;
typedef struct SCatalogStat {
......@@ -186,7 +193,7 @@ typedef struct SCatalogMgmt {
bool exit;
SRWLatch lock;
SCtgQueue queue;
TdThread updateThread;
TdThread updateThread;
SHashObj *pCluster; //key: clusterId, value: SCatalog*
SCatalogStat stat;
SCatalogCfg cfg;
......@@ -204,8 +211,13 @@ typedef struct SCtgAction {
#define CTG_QUEUE_ADD() atomic_add_fetch_64(&gCtgMgmt.queue.qRemainNum, 1)
#define CTG_QUEUE_SUB() atomic_sub_fetch_64(&gCtgMgmt.queue.qRemainNum, 1)
#define CTG_STAT_ADD(n) atomic_add_fetch_64(&(n), 1)
#define CTG_STAT_SUB(n) atomic_sub_fetch_64(&(n), 1)
#define CTG_STAT_ADD(_item, _n) atomic_add_fetch_64(&(_item), _n)
#define CTG_STAT_SUB(_item, _n) atomic_sub_fetch_64(&(_item), _n)
#define CTG_STAT_GET(_item) atomic_load_64(&(_item))
#define CTG_RUNTIME_STAT_ADD(item, n) (CTG_STAT_ADD(gCtgMgmt.stat.runtime.item, n))
#define CTG_CACHE_STAT_ADD(item, n) (CTG_STAT_ADD(gCtgMgmt.stat.cache.item, n))
#define CTG_CACHE_STAT_SUB(item, n) (CTG_STAT_SUB(gCtgMgmt.stat.cache.item, n))
#define CTG_IS_META_NULL(type) ((type) == META_TYPE_NULL_TABLE)
#define CTG_IS_META_CTABLE(type) ((type) == META_TYPE_CTABLE)
......@@ -291,6 +303,9 @@ typedef struct SCtgAction {
#define CTG_API_ENTER() do { CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); CTG_LOCK(CTG_READ, &gCtgMgmt.lock); if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { CTG_API_LEAVE(TSDB_CODE_CTG_OUT_OF_SERVICE); } } while (0)
extern void ctgdShowTableMeta(SCatalog* pCtg, const char *tbName, STableMeta* p);
extern void ctgdShowClusterCache(SCatalog* pCtg);
extern int32_t ctgdShowCacheInfo(void);
#ifdef __cplusplus
}
......
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "trpc.h"
#include "query.h"
#include "tname.h"
#include "catalogInt.h"
extern SCatalogMgmt gCtgMgmt;
SCtgDebug gCTGDebug = {0};
int32_t ctgdEnableDebug(char *option) {
if (0 == strcasecmp(option, "lock")) {
gCTGDebug.lockEnable = true;
qDebug("lock debug enabled");
return TSDB_CODE_SUCCESS;
}
if (0 == strcasecmp(option, "cache")) {
gCTGDebug.cacheEnable = true;
qDebug("cache debug enabled");
return TSDB_CODE_SUCCESS;
}
if (0 == strcasecmp(option, "api")) {
gCTGDebug.apiEnable = true;
qDebug("api debug enabled");
return TSDB_CODE_SUCCESS;
}
if (0 == strcasecmp(option, "meta")) {
gCTGDebug.metaEnable = true;
qDebug("api debug enabled");
return TSDB_CODE_SUCCESS;
}
qError("invalid debug option:%s", option);
return TSDB_CODE_CTG_INTERNAL_ERROR;
}
int32_t ctgdGetStatNum(char *option, void *res) {
if (0 == strcasecmp(option, "runtime.qDoneNum")) {
*(uint64_t *)res = atomic_load_64(&gCtgMgmt.stat.runtime.qDoneNum);
return TSDB_CODE_SUCCESS;
}
qError("invalid stat option:%s", option);
return TSDB_CODE_CTG_INTERNAL_ERROR;
}
int32_t ctgdGetTbMetaNum(SCtgDBCache *dbCache) {
return dbCache->tbCache.metaCache ? (int32_t)taosHashGetSize(dbCache->tbCache.metaCache) : 0;
}
int32_t ctgdGetStbNum(SCtgDBCache *dbCache) {
return dbCache->tbCache.stbCache ? (int32_t)taosHashGetSize(dbCache->tbCache.stbCache) : 0;
}
int32_t ctgdGetRentNum(SCtgRentMgmt *rent) {
int32_t num = 0;
for (uint16_t i = 0; i < rent->slotNum; ++i) {
SCtgRentSlot *slot = &rent->slots[i];
if (NULL == slot->meta) {
continue;
}
num += taosArrayGetSize(slot->meta);
}
return num;
}
int32_t ctgdGetClusterCacheNum(SCatalog* pCtg, int32_t type) {
if (NULL == pCtg || NULL == pCtg->dbCache) {
return 0;
}
switch (type) {
case CTG_DBG_DB_NUM:
return (int32_t)taosHashGetSize(pCtg->dbCache);
case CTG_DBG_DB_RENT_NUM:
return ctgdGetRentNum(&pCtg->dbRent);
case CTG_DBG_STB_RENT_NUM:
return ctgdGetRentNum(&pCtg->stbRent);
default:
break;
}
SCtgDBCache *dbCache = NULL;
int32_t num = 0;
void *pIter = taosHashIterate(pCtg->dbCache, NULL);
while (pIter) {
dbCache = (SCtgDBCache *)pIter;
switch (type) {
case CTG_DBG_META_NUM:
num += ctgdGetTbMetaNum(dbCache);
break;
case CTG_DBG_STB_NUM:
num += ctgdGetStbNum(dbCache);
break;
default:
ctgError("invalid type:%d", type);
break;
}
pIter = taosHashIterate(pCtg->dbCache, pIter);
}
return num;
}
void ctgdShowTableMeta(SCatalog* pCtg, const char *tbName, STableMeta* p) {
if (!gCTGDebug.metaEnable) {
return;
}
STableComInfo *c = &p->tableInfo;
if (TSDB_CHILD_TABLE == p->tableType) {
ctgDebug("table [%s] meta: type:%d, vgId:%d, uid:%" PRIx64 ",suid:%" PRIx64, tbName, p->tableType, p->vgId, p->uid, p->suid);
return;
} else {
ctgDebug("table [%s] meta: type:%d, vgId:%d, uid:%" PRIx64 ",suid:%" PRIx64 ",sv:%d, tv:%d, tagNum:%d, precision:%d, colNum:%d, rowSize:%d",
tbName, p->tableType, p->vgId, p->uid, p->suid, p->sversion, p->tversion, c->numOfTags, c->precision, c->numOfColumns, c->rowSize);
}
int32_t colNum = c->numOfColumns + c->numOfTags;
for (int32_t i = 0; i < colNum; ++i) {
SSchema *s = &p->schema[i];
ctgDebug("[%d] name:%s, type:%d, colId:%d, bytes:%d", i, s->name, s->type, s->colId, s->bytes);
}
}
void ctgdShowDBCache(SCatalog* pCtg, SHashObj *dbHash) {
if (NULL == dbHash || !gCTGDebug.cacheEnable) {
return;
}
int32_t i = 0;
SCtgDBCache *dbCache = NULL;
void *pIter = taosHashIterate(dbHash, NULL);
while (pIter) {
char *dbFName = NULL;
size_t len = 0;
dbCache = (SCtgDBCache *)pIter;
dbFName = taosHashGetKey(pIter, &len);
int32_t metaNum = dbCache->tbCache.metaCache ? taosHashGetSize(dbCache->tbCache.metaCache) : 0;
int32_t stbNum = dbCache->tbCache.stbCache ? taosHashGetSize(dbCache->tbCache.stbCache) : 0;
int32_t vgVersion = CTG_DEFAULT_INVALID_VERSION;
int32_t hashMethod = -1;
int32_t vgNum = 0;
if (dbCache->vgInfo) {
vgVersion = dbCache->vgInfo->vgVersion;
hashMethod = dbCache->vgInfo->hashMethod;
if (dbCache->vgInfo->vgHash) {
vgNum = taosHashGetSize(dbCache->vgInfo->vgHash);
}
}
ctgDebug("[%d] db [%.*s][%"PRIx64"] %s: metaNum:%d, stbNum:%d, vgVersion:%d, hashMethod:%d, vgNum:%d",
i, (int32_t)len, dbFName, dbCache->dbId, dbCache->deleted?"deleted":"", metaNum, stbNum, vgVersion, hashMethod, vgNum);
pIter = taosHashIterate(dbHash, pIter);
}
}
void ctgdShowClusterCache(SCatalog* pCtg) {
if (!gCTGDebug.cacheEnable || NULL == pCtg) {
return;
}
ctgDebug("## cluster %"PRIx64" %p cache Info BEGIN ##", pCtg->clusterId, pCtg);
ctgDebug("db:%d meta:%d stb:%d dbRent:%d stbRent:%d", ctgdGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM), ctgdGetClusterCacheNum(pCtg, CTG_DBG_META_NUM),
ctgdGetClusterCacheNum(pCtg, CTG_DBG_STB_NUM), ctgdGetClusterCacheNum(pCtg, CTG_DBG_DB_RENT_NUM), ctgdGetClusterCacheNum(pCtg, CTG_DBG_STB_RENT_NUM));
ctgdShowDBCache(pCtg, pCtg->dbCache);
ctgDebug("## cluster %"PRIx64" %p cache Info END ##", pCtg->clusterId, pCtg);
}
int32_t ctgdShowCacheInfo(void) {
if (!gCTGDebug.cacheEnable) {
return TSDB_CODE_CTG_OUT_OF_SERVICE;
}
CTG_API_ENTER();
SCatalog *pCtg = NULL;
void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
while (pIter) {
pCtg = *(SCatalog **)pIter;
if (pCtg) {
ctgdShowClusterCache(pCtg);
}
pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
}
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
......@@ -38,11 +38,11 @@
namespace {
extern "C" int32_t ctgGetTableMetaFromCache(struct SCatalog *pCatalog, const SName *pTableName, STableMeta **pTableMeta,
int32_t *exist, int32_t flag, uint64_t *dbId);
extern "C" int32_t ctgDbgGetClusterCacheNum(struct SCatalog* pCatalog, int32_t type);
bool *inCache, int32_t flag, uint64_t *dbId);
extern "C" int32_t ctgdGetClusterCacheNum(struct SCatalog* pCatalog, int32_t type);
extern "C" int32_t ctgActUpdateTbl(SCtgMetaAction *action);
extern "C" int32_t ctgDbgEnableDebug(char *option);
extern "C" int32_t ctgDbgGetStatNum(char *option, void *res);
extern "C" int32_t ctgdEnableDebug(char *option);
extern "C" int32_t ctgdGetStatNum(char *option, void *res);
void ctgTestSetRspTableMeta();
void ctgTestSetRspCTableMeta();
......@@ -140,9 +140,9 @@ void ctgTestInitLogFile() {
qDebugFlag = 159;
strcpy(tsLogDir, "/var/log/taos");
ctgDbgEnableDebug("api");
ctgDbgEnableDebug("meta");
ctgDbgEnableDebug("cache");
ctgdEnableDebug("api");
ctgdEnableDebug("meta");
ctgdEnableDebug("cache");
if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
printf("failed to open log file in directory:%s\n", tsLogDir);
......@@ -786,15 +786,15 @@ void *ctgTestGetCtableMetaThread(void *param) {
int32_t code = 0;
int32_t n = 0;
STableMeta *tbMeta = NULL;
int32_t exist = 0;
bool inCache = false;
SName cn = {.type = TSDB_TABLE_NAME_T, .acctId = 1};
strcpy(cn.dbname, "db1");
strcpy(cn.tname, ctgTestCTablename);
while (!ctgTestStop) {
code = ctgGetTableMetaFromCache(pCtg, &cn, &tbMeta, &exist, 0, NULL);
if (code || 0 == exist) {
code = ctgGetTableMetaFromCache(pCtg, &cn, &tbMeta, &inCache, 0, NULL);
if (code || !inCache) {
assert(0);
}
......@@ -879,7 +879,7 @@ TEST(tableMeta, normalTable) {
ASSERT_EQ(vgInfo.vgId, 8);
ASSERT_EQ(vgInfo.epSet.numOfEps, 3);
while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM)) {
while (0 == ctgdGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM)) {
taosMsleep(50);
}
......@@ -899,7 +899,7 @@ TEST(tableMeta, normalTable) {
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
while (true) {
uint32_t n = ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM);
uint32_t n = ctgdGetClusterCacheNum(pCtg, CTG_DBG_META_NUM);
if (0 == n) {
taosMsleep(50);
} else {
......@@ -994,7 +994,7 @@ TEST(tableMeta, childTableCase) {
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
while (true) {
uint32_t n = ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM);
uint32_t n = ctgdGetClusterCacheNum(pCtg, CTG_DBG_META_NUM);
if (0 == n) {
taosMsleep(50);
} else {
......@@ -1103,7 +1103,7 @@ TEST(tableMeta, superTableCase) {
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
while (true) {
uint32_t n = ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM);
uint32_t n = ctgdGetClusterCacheNum(pCtg, CTG_DBG_META_NUM);
if (0 == n) {
taosMsleep(50);
} else {
......@@ -1130,7 +1130,7 @@ TEST(tableMeta, superTableCase) {
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
while (true) {
uint32_t n = ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM);
uint32_t n = ctgdGetClusterCacheNum(pCtg, CTG_DBG_META_NUM);
if (2 != n) {
taosMsleep(50);
} else {
......@@ -1228,7 +1228,7 @@ TEST(tableMeta, rmStbMeta) {
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
while (true) {
uint32_t n = ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM);
uint32_t n = ctgdGetClusterCacheNum(pCtg, CTG_DBG_META_NUM);
if (0 == n) {
taosMsleep(50);
} else {
......@@ -1241,8 +1241,8 @@ TEST(tableMeta, rmStbMeta) {
ASSERT_EQ(code, 0);
while (true) {
int32_t n = ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM);
int32_t m = ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_RENT_NUM);
int32_t n = ctgdGetClusterCacheNum(pCtg, CTG_DBG_META_NUM);
int32_t m = ctgdGetClusterCacheNum(pCtg, CTG_DBG_STB_RENT_NUM);
if (n || m) {
taosMsleep(50);
} else {
......@@ -1251,11 +1251,11 @@ TEST(tableMeta, rmStbMeta) {
}
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM), 1);
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM), 0);
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_NUM), 0);
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_RENT_NUM), 1);
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_RENT_NUM), 0);
ASSERT_EQ(ctgdGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM), 1);
ASSERT_EQ(ctgdGetClusterCacheNum(pCtg, CTG_DBG_META_NUM), 0);
ASSERT_EQ(ctgdGetClusterCacheNum(pCtg, CTG_DBG_STB_NUM), 0);
ASSERT_EQ(ctgdGetClusterCacheNum(pCtg, CTG_DBG_DB_RENT_NUM), 1);
ASSERT_EQ(ctgdGetClusterCacheNum(pCtg, CTG_DBG_STB_RENT_NUM), 0);
catalogDestroy();
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
......@@ -1298,7 +1298,7 @@ TEST(tableMeta, updateStbMeta) {
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
while (true) {
uint32_t n = ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM);
uint32_t n = ctgdGetClusterCacheNum(pCtg, CTG_DBG_META_NUM);
if (0 == n) {
taosMsleep(50);
} else {
......@@ -1318,7 +1318,7 @@ TEST(tableMeta, updateStbMeta) {
while (true) {
uint64_t n = 0;
ctgDbgGetStatNum("runtime.qDoneNum", (void *)&n);
ctgdGetStatNum("runtime.qDoneNum", (void *)&n);
if (n != 3) {
taosMsleep(50);
} else {
......@@ -1326,11 +1326,11 @@ TEST(tableMeta, updateStbMeta) {
}
}
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM), 1);
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM), 1);
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_NUM), 1);
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_RENT_NUM), 1);
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_RENT_NUM), 1);
ASSERT_EQ(ctgdGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM), 1);
ASSERT_EQ(ctgdGetClusterCacheNum(pCtg, CTG_DBG_META_NUM), 1);
ASSERT_EQ(ctgdGetClusterCacheNum(pCtg, CTG_DBG_STB_NUM), 1);
ASSERT_EQ(ctgdGetClusterCacheNum(pCtg, CTG_DBG_DB_RENT_NUM), 1);
ASSERT_EQ(ctgdGetClusterCacheNum(pCtg, CTG_DBG_STB_RENT_NUM), 1);
code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta);
ASSERT_EQ(code, 0);
......@@ -1388,7 +1388,7 @@ TEST(refreshGetMeta, normal2normal) {
while (true) {
uint64_t n = 0;
ctgDbgGetStatNum("runtime.qDoneNum", (void *)&n);
ctgdGetStatNum("runtime.qDoneNum", (void *)&n);
if (n > 0) {
break;
}
......@@ -1409,7 +1409,7 @@ TEST(refreshGetMeta, normal2normal) {
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
taosMemoryFreeClear(tableMeta);
while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM)) {
while (0 == ctgdGetClusterCacheNum(pCtg, CTG_DBG_META_NUM)) {
taosMsleep(50);
}
......@@ -1467,7 +1467,7 @@ TEST(refreshGetMeta, normal2notexist) {
while (true) {
uint64_t n = 0;
ctgDbgGetStatNum("runtime.qDoneNum", (void *)&n);
ctgdGetStatNum("runtime.qDoneNum", (void *)&n);
if (n > 0) {
break;
}
......@@ -1488,7 +1488,7 @@ TEST(refreshGetMeta, normal2notexist) {
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
taosMemoryFreeClear(tableMeta);
while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM)) {
while (0 == ctgdGetClusterCacheNum(pCtg, CTG_DBG_META_NUM)) {
taosMsleep(50);
}
......@@ -1541,7 +1541,7 @@ TEST(refreshGetMeta, normal2child) {
while (true) {
uint64_t n = 0;
ctgDbgGetStatNum("runtime.qDoneNum", (void *)&n);
ctgdGetStatNum("runtime.qDoneNum", (void *)&n);
if (n > 0) {
break;
}
......@@ -1562,7 +1562,7 @@ TEST(refreshGetMeta, normal2child) {
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
taosMemoryFreeClear(tableMeta);
while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM)) {
while (0 == ctgdGetClusterCacheNum(pCtg, CTG_DBG_META_NUM)) {
taosMsleep(50);
}
......@@ -1625,7 +1625,7 @@ TEST(refreshGetMeta, stable2child) {
while (true) {
uint64_t n = 0;
ctgDbgGetStatNum("runtime.qDoneNum", (void *)&n);
ctgdGetStatNum("runtime.qDoneNum", (void *)&n);
if (n > 0) {
break;
}
......@@ -1647,7 +1647,7 @@ TEST(refreshGetMeta, stable2child) {
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
taosMemoryFreeClear(tableMeta);
while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM)) {
while (0 == ctgdGetClusterCacheNum(pCtg, CTG_DBG_META_NUM)) {
taosMsleep(50);
}
......@@ -1710,7 +1710,7 @@ TEST(refreshGetMeta, stable2stable) {
while (true) {
uint64_t n = 0;
ctgDbgGetStatNum("runtime.qDoneNum", (void *)&n);
ctgdGetStatNum("runtime.qDoneNum", (void *)&n);
if (n > 0) {
break;
}
......@@ -1732,7 +1732,7 @@ TEST(refreshGetMeta, stable2stable) {
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
taosMemoryFreeClear(tableMeta);
while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM)) {
while (0 == ctgdGetClusterCacheNum(pCtg, CTG_DBG_META_NUM)) {
taosMsleep(50);
}
......@@ -1798,7 +1798,7 @@ TEST(refreshGetMeta, child2stable) {
while (true) {
uint64_t n = 0;
ctgDbgGetStatNum("runtime.qDoneNum", (void *)&n);
ctgdGetStatNum("runtime.qDoneNum", (void *)&n);
if (n > 0) {
break;
}
......@@ -1818,7 +1818,7 @@ TEST(refreshGetMeta, child2stable) {
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
taosMemoryFreeClear(tableMeta);
while (2 != ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM)) {
while (2 != ctgdGetClusterCacheNum(pCtg, CTG_DBG_META_NUM)) {
taosMsleep(50);
}
......@@ -2015,7 +2015,7 @@ TEST(dbVgroup, getSetDbVgroupCase) {
while (true) {
uint64_t n = 0;
ctgDbgGetStatNum("runtime.qDoneNum", (void *)&n);
ctgdGetStatNum("runtime.qDoneNum", (void *)&n);
if (n > 0) {
break;
}
......@@ -2041,7 +2041,7 @@ TEST(dbVgroup, getSetDbVgroupCase) {
while (true) {
uint64_t n = 0;
ctgDbgGetStatNum("runtime.qDoneNum", (void *)&n);
ctgdGetStatNum("runtime.qDoneNum", (void *)&n);
if (n != 3) {
taosMsleep(50);
} else {
......@@ -2266,7 +2266,7 @@ TEST(rentTest, allRent) {
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
while (ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM) < i) {
while (ctgdGetClusterCacheNum(pCtg, CTG_DBG_META_NUM) < i) {
taosMsleep(50);
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_QUERY_INT_H_
#define _TD_QUERY_INT_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "nodes.h"
#include "plannodes.h"
#include "ttime.h"
#define EXPLAIN_MAX_GROUP_NUM 100
//newline area
#define EXPLAIN_TAG_SCAN_FORMAT "Tag Scan on %s columns=%d width=%d"
#define EXPLAIN_TBL_SCAN_FORMAT "Table Scan on %s columns=%d width=%d"
#define EXPLAIN_SYSTBL_SCAN_FORMAT "System Table Scan on %s columns=%d width=%d"
#define EXPLAIN_PROJECTION_FORMAT "Projection columns=%d width=%d"
#define EXPLAIN_JOIN_FORMAT "%s between %d tables width=%d"
#define EXPLAIN_AGG_FORMAT "Aggragate functions=%d"
#define EXPLAIN_EXCHANGE_FORMAT "Data Exchange %d:1 width=%d"
#define EXPLAIN_SORT_FORMAT "Sort on %d Column(s) width=%d"
#define EXPLAIN_INTERVAL_FORMAT "Interval on Column %s functions=%d interval=%" PRId64 "%c offset=%" PRId64 "%c sliding=%" PRId64 "%c width=%d"
#define EXPLAIN_SESSION_FORMAT "Session gap=%" PRId64 " functions=%d width=%d"
#define EXPLAIN_ORDER_FORMAT "Order: %s"
#define EXPLAIN_FILTER_FORMAT "Filter: "
#define EXPLAIN_FILL_FORMAT "Fill: %s"
#define EXPLAIN_ON_CONDITIONS_FORMAT "Join Cond: "
#define EXPLAIN_TIMERANGE_FORMAT "Time Range: [%" PRId64 ", %" PRId64 "]"
//append area
#define EXPLAIN_GROUPS_FORMAT " groups=%d"
#define EXPLAIN_WIDTH_FORMAT " width=%d"
#define EXPLAIN_LOOPS_FORMAT " loops=%d"
#define EXPLAIN_REVERSE_FORMAT " reverse=%d"
typedef struct SExplainGroup {
int32_t nodeNum;
SSubplan *plan;
void *execInfo; //TODO
} SExplainGroup;
typedef struct SExplainResNode {
SNodeList* pChildren;
SPhysiNode* pNode;
void* pExecInfo;
} SExplainResNode;
typedef struct SQueryExplainRowInfo {
int32_t level;
int32_t len;
char *buf;
} SQueryExplainRowInfo;
typedef struct SExplainCtx {
int32_t totalSize;
bool verbose;
char *tbuf;
SArray *rows;
SHashObj *groupHash;
} SExplainCtx;
#define EXPLAIN_ORDER_STRING(_order) ((TSDB_ORDER_ASC == _order) ? "Ascending" : "Descending")
#define EXPLAIN_JOIN_STRING(_type) ((JOIN_TYPE_INNER == _type) ? "Inner join" : "Join")
#define INVERAL_TIME_FROM_PRECISION_TO_UNIT(_t, _u, _p) (((_u) == 'n' || (_u) == 'y') ? (_t) : (convertTimeFromPrecisionToUnit(_t, _p, _u)))
#define EXPLAIN_ROW_NEW(level, ...) \
do { \
if (isVerboseLine) { \
tlen = snprintf(tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, "%*s", (level) * 2 + 3, ""); \
} else { \
tlen = snprintf(tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, "%*s%s", (level) * 2, "", "-> "); \
} \
tlen += snprintf(tbuf + VARSTR_HEADER_SIZE + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, __VA_ARGS__); \
} while (0)
#define EXPLAIN_ROW_APPEND(...) tlen += snprintf(tbuf + VARSTR_HEADER_SIZE + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, __VA_ARGS__)
#define EXPLAIN_ROW_END() do { varDataSetLen(tbuf, tlen); tlen += VARSTR_HEADER_SIZE; isVerboseLine = true; } while (0)
#ifdef __cplusplus
}
#endif
#endif /*_TD_QUERY_INT_H_*/
此差异已折叠。
......@@ -20,12 +20,16 @@
extern "C" {
#endif
#define nodesFatal(param, ...) qFatal("NODES: " param, __VA_ARGS__)
#define nodesError(param, ...) qError("NODES: " param, __VA_ARGS__)
#define nodesWarn(param, ...) qWarn("NODES: " param, __VA_ARGS__)
#define nodesInfo(param, ...) qInfo("NODES: " param, __VA_ARGS__)
#define nodesDebug(param, ...) qDebug("NODES: " param, __VA_ARGS__)
#define nodesTrace(param, ...) qTrace("NODES: " param, __VA_ARGS__)
#define nodesFatal(...) qFatal("NODES: " __VA_ARGS__)
#define nodesError(...) qError("NODES: " __VA_ARGS__)
#define nodesWarn(...) qWarn("NODES: " __VA_ARGS__)
#define nodesInfo(...) qInfo("NODES: " __VA_ARGS__)
#define nodesDebug(...) qDebug("NODES: " __VA_ARGS__)
#define nodesTrace(...) qTrace("NODES: " __VA_ARGS__)
#define NODES_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define NODES_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define NODES_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#ifdef __cplusplus
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "cmdnodes.h"
#include "nodesUtil.h"
#include "plannodes.h"
#include "querynodes.h"
#include "taos.h"
#include "taoserror.h"
#include "thash.h"
char *gOperatorStr[] = {NULL, "+", "-", "*", "/", "%", "&", "|", ">", ">=", "<", "<=", "=", "<>",
"IN", "NOT IN", "LIKE", "NOT LIKE", "MATCH", "NMATCH", "IS NULL", "IS NOT NULL",
"IS TRUE", "IS FALSE", "IS UNKNOWN", "IS NOT TRUE", "IS NOT FALSE", "IS NOT UNKNOWN"};
char *gLogicConditionStr[] = {"AND", "OR", "NOT"};
int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len) {
switch (pNode->type) {
case QUERY_NODE_COLUMN: {
SColumnNode *colNode = (SColumnNode *)pNode;
if (colNode->dbName[0]) {
*len += snprintf(buf + *len, bufSize - *len, "`%s`.", colNode->dbName);
}
if (colNode->tableAlias[0]) {
*len += snprintf(buf + *len, bufSize - *len, "`%s`.", colNode->tableAlias);
} else if (colNode->tableName[0]) {
*len += snprintf(buf + *len, bufSize - *len, "`%s`.", colNode->tableName);
}
*len += snprintf(buf + *len, bufSize - *len, "`%s`", colNode->colName);
return TSDB_CODE_SUCCESS;
}
case QUERY_NODE_VALUE:{
SValueNode *colNode = (SValueNode *)pNode;
char *t = nodesGetStrValueFromNode(colNode);
if (NULL == t) {
nodesError("fail to get str value from valueNode");
NODES_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
*len += snprintf(buf + *len, bufSize - *len, "%s", t);
taosMemoryFree(t);
return TSDB_CODE_SUCCESS;
}
case QUERY_NODE_OPERATOR: {
SOperatorNode* pOpNode = (SOperatorNode*)pNode;
*len += snprintf(buf + *len, bufSize - *len, "(");
if (pOpNode->pLeft) {
NODES_ERR_RET(nodesNodeToSQL(pOpNode->pLeft, buf, bufSize, len));
}
if (pOpNode->opType >= (sizeof(gOperatorStr) / sizeof(gOperatorStr[0]))) {
nodesError("unknown operation type:%d", pOpNode->opType);
NODES_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
*len += snprintf(buf + *len, bufSize - *len, " %s ", gOperatorStr[pOpNode->opType]);
if (pOpNode->pRight) {
NODES_ERR_RET(nodesNodeToSQL(pOpNode->pRight, buf, bufSize, len));
}
*len += snprintf(buf + *len, bufSize - *len, ")");
return TSDB_CODE_SUCCESS;
}
case QUERY_NODE_LOGIC_CONDITION:{
SLogicConditionNode* pLogicNode = (SLogicConditionNode*)pNode;
SNode* node = NULL;
bool first = true;
*len += snprintf(buf + *len, bufSize - *len, "(");
FOREACH(node, pLogicNode->pParameterList) {
if (!first) {
*len += snprintf(buf + *len, bufSize - *len, " %s ", gLogicConditionStr[pLogicNode->condType]);
}
NODES_ERR_RET(nodesNodeToSQL(node, buf, bufSize, len));
first = false;
}
*len += snprintf(buf + *len, bufSize - *len, ")");
return TSDB_CODE_SUCCESS;
}
case QUERY_NODE_FUNCTION:{
SFunctionNode* pFuncNode = (SFunctionNode*)pNode;
SNode* node = NULL;
bool first = true;
*len += snprintf(buf + *len, bufSize - *len, "%s(", pFuncNode->functionName);
FOREACH(node, pFuncNode->pParameterList) {
if (!first) {
*len += snprintf(buf + *len, bufSize - *len, ", ");
}
NODES_ERR_RET(nodesNodeToSQL(node, buf, bufSize, len));
first = false;
}
*len += snprintf(buf + *len, bufSize - *len, ")");
return TSDB_CODE_SUCCESS;
}
case QUERY_NODE_NODE_LIST:{
SNodeListNode* pListNode = (SNodeListNode *)pNode;
SNode* node = NULL;
bool first = true;
*len += snprintf(buf + *len, bufSize - *len, "(");
FOREACH(node, pListNode->pNodeList) {
if (!first) {
*len += snprintf(buf + *len, bufSize - *len, ", ");
}
NODES_ERR_RET(nodesNodeToSQL(node, buf, bufSize, len));
first = false;
}
*len += snprintf(buf + *len, bufSize - *len, ")");
return TSDB_CODE_SUCCESS;
}
default:
break;
}
nodesError("nodesNodeToSQL unknown node = %s", nodesNodeName(pNode->type));
NODES_RET(TSDB_CODE_QRY_APP_ERROR);
}
......@@ -17,7 +17,8 @@
typedef enum ETraversalOrder {
TRAVERSAL_PREORDER = 1,
TRAVERSAL_POSTORDER
TRAVERSAL_INORDER,
TRAVERSAL_POSTORDER,
} ETraversalOrder;
static EDealRes walkList(SNodeList* pNodeList, ETraversalOrder order, FNodeWalker walker, void* pContext);
......
......@@ -790,6 +790,71 @@ void* nodesGetValueFromNode(SValueNode *pNode) {
return NULL;
}
char* nodesGetStrValueFromNode(SValueNode *pNode) {
switch (pNode->node.resType.type) {
case TSDB_DATA_TYPE_BOOL: {
void *buf = taosMemoryMalloc(MAX_NUM_STR_SIZE);
if (NULL == buf) {
return NULL;
}
sprintf(buf, "%s", pNode->datum.b ? "true" : "false");
return buf;
}
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP: {
void *buf = taosMemoryMalloc(MAX_NUM_STR_SIZE);
if (NULL == buf) {
return NULL;
}
sprintf(buf, "%" PRId64, pNode->datum.i);
return buf;
}
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_UBIGINT: {
void *buf = taosMemoryMalloc(MAX_NUM_STR_SIZE);
if (NULL == buf) {
return NULL;
}
sprintf(buf, "%" PRIu64, pNode->datum.u);
return buf;
}
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE: {
void *buf = taosMemoryMalloc(MAX_NUM_STR_SIZE);
if (NULL == buf) {
return NULL;
}
sprintf(buf, "%e", pNode->datum.d);
return buf;
}
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY: {
int32_t bufSize = varDataLen(pNode->datum.p) + 2 + 1;
void *buf = taosMemoryMalloc(bufSize);
if (NULL == buf) {
return NULL;
}
snprintf(buf, bufSize, "'%s'", varDataVal(pNode->datum.p));
return buf;
}
default:
break;
}
return NULL;
}
bool nodesIsExprNode(const SNode* pNode) {
ENodeType type = nodeType(pNode);
return (QUERY_NODE_COLUMN == type || QUERY_NODE_VALUE == type || QUERY_NODE_OPERATOR == type || QUERY_NODE_FUNCTION == type);
......
......@@ -1952,6 +1952,7 @@ static int32_t extractExplainResultSchema(int32_t* numOfCols, SSchema** pSchema)
}
(*pSchema)[0].type = TSDB_DATA_TYPE_BINARY;
(*pSchema)[0].bytes = TSDB_EXPLAIN_RESULT_ROW_SIZE;
strcpy((*pSchema)[0].name, TSDB_EXPLAIN_RESULT_COLUMN_NAME);
return TSDB_CODE_SUCCESS;
}
......
......@@ -288,7 +288,7 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
// set the output
if (TSDB_CODE_SUCCESS == code) {
pJoin->node.pTargets = nodesCloneList(pLeft->pTargets);
if (NULL == pJoin->pOnConditions) {
if (NULL == pJoin->node.pTargets) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
if (TSDB_CODE_SUCCESS == code) {
......
......@@ -8,7 +8,7 @@ target_include_directories(
target_link_libraries(
qcom
PRIVATE os util transport
PRIVATE os util transport nodes
)
if(${BUILD_TEST})
......
......@@ -21,7 +21,6 @@ extern "C" {
#endif
#ifdef __cplusplus
}
#endif
......
......@@ -36,8 +36,6 @@ extern "C" {
#define FILTER_DUMMY_EMPTY_OPTR 127
#define MAX_NUM_STR_SIZE 40
#define FILTER_RM_UNIT_MIN_ROWS 100
enum {
......
......@@ -9,7 +9,7 @@ target_include_directories(
target_link_libraries(
scheduler
PUBLIC os util nodes planner qcom common catalog transport
PUBLIC os util nodes planner qcom common catalog transport command
)
if(${BUILD_TEST})
......
......@@ -142,10 +142,10 @@ typedef struct SSchTask {
} SSchTask;
typedef struct SSchJobAttr {
bool needFetch;
bool syncSchedule;
bool queryJob;
bool needFlowCtrl;
EExplainMode explainMode;
bool syncSchedule;
bool queryJob;
bool needFlowCtrl;
} SSchJobAttr;
typedef struct SSchJob {
......
......@@ -19,6 +19,7 @@
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
#include "command.h"
SSchedulerMgmt schMgmt = {0};
......@@ -2103,6 +2104,7 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pD
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pJob->attr.explainMode = pDag->explainInfo.mode;
pJob->attr.syncSchedule = syncSchedule;
pJob->transport = transport;
pJob->sql = sql;
......@@ -2136,19 +2138,24 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pD
tsem_init(&pJob->rspSem, 0, 0);
pJob->refId = taosAddRef(schMgmt.jobRef, pJob);
if (pJob->refId < 0) {
SCH_JOB_ELOG("taosHashPut job failed, error:%s", tstrerror(terrno));
int64_t refId = taosAddRef(schMgmt.jobRef, pJob);
if (refId < 0) {
SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
SCH_ERR_JRET(terrno);
}
if (NULL == schAcquireJob(refId)) {
SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, refId);
SCH_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
pJob->refId = refId;
SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId);
pJob->status = JOB_TASK_STATUS_NOT_START;
SCH_ERR_JRET(schLaunchJob(pJob));
schAcquireJob(pJob->refId);
*job = pJob->refId;
if (syncSchedule) {
......@@ -2168,6 +2175,54 @@ _return:
SCH_RET(code);
}
int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
bool syncSchedule) {
qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);
int32_t code = 0;
SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
if (NULL == pJob) {
qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pJob->sql = sql;
pJob->attr.queryJob = true;
pJob->attr.explainMode = pDag->explainInfo.mode;
pJob->queryId = pDag->queryId;
pJob->subPlans = pDag->pSubplans;
SCH_ERR_JRET(qExecStaticExplain(pDag, (SRetrieveTableRsp **)&pJob->resData));
int64_t refId = taosAddRef(schMgmt.jobRef, pJob);
if (refId < 0) {
SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
SCH_ERR_JRET(terrno);
}
if (NULL == schAcquireJob(refId)) {
SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, refId);
SCH_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
pJob->refId = refId;
SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId);
pJob->status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
*job = pJob->refId;
SCH_JOB_DLOG("job exec done, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
schReleaseJob(pJob->refId);
return TSDB_CODE_SUCCESS;
_return:
schFreeJobImpl(pJob);
SCH_RET(code);
}
int32_t schedulerInit(SSchedulerCfg *cfg) {
if (schMgmt.jobRef) {
qError("scheduler already initialized");
......@@ -2216,7 +2271,11 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, true));
if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
SCH_ERR_RET(schExecStaticExplain(transport, nodeList, pDag, pJob, sql, true));
} else {
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, true));
}
SSchJob *job = schAcquireJob(*pJob);
......@@ -2399,11 +2458,15 @@ int32_t schedulerFetchRows(int64_t job, void **pData) {
SCH_JOB_DLOG("job already succeed, status:%s", jobTaskStatusStr(status));
goto _return;
} else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
SCH_ERR_JRET(schFetchFromRemote(pJob));
if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC)) {
SCH_ERR_JRET(schFetchFromRemote(pJob));
tsem_wait(&pJob->rspSem);
}
} else {
SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
}
tsem_wait(&pJob->rspSem);
status = SCH_GET_JOB_STATUS(pJob);
if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册