提交 b4493692 编写于 作者: D dapan1121

feature/qnode

上级 06088241
......@@ -2011,7 +2011,9 @@ void* ctgUpdateThreadFunc(void* param) {
CTG_LOCK(CTG_READ, &gCtgMgmt.lock);
while (true) {
tsem_wait(&gCtgMgmt.queue.reqSem);
if (tsem_wait(&gCtgMgmt.queue.reqSem)) {
qError("ctg tsem_wait failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
}
if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) {
tsem_post(&gCtgMgmt.queue.rspSem);
......@@ -2175,10 +2177,15 @@ int32_t catalogInit(SCatalogCfg *cfg) {
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
CTG_ERR_RET(ctgStartUpdateThread());
tsem_init(&gCtgMgmt.queue.reqSem, 0, 0);
tsem_init(&gCtgMgmt.queue.rspSem, 0, 0);
if (tsem_init(&gCtgMgmt.queue.reqSem, 0, 0)) {
qError("tsem_init failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
CTG_ERR_RET(TSDB_CODE_CTG_SYS_ERROR);
}
if (tsem_init(&gCtgMgmt.queue.rspSem, 0, 0)) {
qError("tsem_init failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
CTG_ERR_RET(TSDB_CODE_CTG_SYS_ERROR);
}
gCtgMgmt.queue.head = taosMemoryCalloc(1, sizeof(SCtgQNode));
if (NULL == gCtgMgmt.queue.head) {
......@@ -2187,6 +2194,8 @@ int32_t catalogInit(SCatalogCfg *cfg) {
}
gCtgMgmt.queue.tail = gCtgMgmt.queue.head;
CTG_ERR_RET(ctgStartUpdateThread());
qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u", gCtgMgmt.cfg.maxDBCacheNum, gCtgMgmt.cfg.maxTblCacheNum, gCtgMgmt.cfg.dbRentSec, gCtgMgmt.cfg.stbRentSec);
return TSDB_CODE_SUCCESS;
......@@ -2718,8 +2727,13 @@ void catalogDestroy(void) {
atomic_store_8((int8_t*)&gCtgMgmt.exit, true);
tsem_post(&gCtgMgmt.queue.reqSem);
tsem_post(&gCtgMgmt.queue.rspSem);
if (tsem_post(&gCtgMgmt.queue.reqSem)) {
qError("tsem_post failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
}
if (tsem_post(&gCtgMgmt.queue.rspSem)) {
qError("tsem_post failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
}
while (CTG_IS_LOCKED(&gCtgMgmt.lock)) {
taosUsleep(1);
......
......@@ -26,9 +26,9 @@ extern "C" {
#define EXPLAIN_MAX_GROUP_NUM 100
//newline area
#define EXPLAIN_TAG_SCAN_FORMAT "Tag Scan on %s columns=%d"
#define EXPLAIN_TBL_SCAN_FORMAT "Table Scan on %s columns=%d"
#define EXPLAIN_SYSTBL_SCAN_FORMAT "System Table Scan on %s columns=%d"
#define EXPLAIN_TAG_SCAN_FORMAT "Tag Scan on %s columns=%d width=%d"
#define EXPLAIN_TBL_SCAN_FORMAT "Table Scan on %s columns=%d width=%d"
#define EXPLAIN_SYSTBL_SCAN_FORMAT "System Table Scan on %s columns=%d width=%d"
#define EXPLAIN_PROJECTION_FORMAT "Projection columns=%d width=%d"
#define EXPLAIN_JOIN_FORMAT "%s between %d tables width=%d"
#define EXPLAIN_AGG_FORMAT "Aggragate functions=%d"
......
......@@ -266,7 +266,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
bool isVerboseLine = false;
char *tbuf = ctx->tbuf;
bool verbose = ctx->verbose;
int32_t filterLen = 0;
SPhysiNode* pNode = pResNode->pNode;
if (NULL == pNode) {
qError("pyhsical node in explain res node is NULL");
......@@ -276,7 +275,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
switch (pNode->type) {
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: {
STagScanPhysiNode *pTagScanNode = (STagScanPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_TAG_SCAN_FORMAT, pTagScanNode->tableName.tname, pTagScanNode->pScanCols->length);
EXPLAIN_ROW_NEW(level, EXPLAIN_TAG_SCAN_FORMAT, pTagScanNode->tableName.tname, pTagScanNode->pScanCols->length, pTagScanNode->node.pOutputDataBlockDesc->outputRowSize);
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
}
......@@ -297,7 +296,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:{
STableScanPhysiNode *pTblScanNode = (STableScanPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_TBL_SCAN_FORMAT, pTblScanNode->scan.tableName.tname, pTblScanNode->scan.pScanCols->length);
EXPLAIN_ROW_NEW(level, EXPLAIN_TBL_SCAN_FORMAT, pTblScanNode->scan.tableName.tname, pTblScanNode->scan.pScanCols->length, pTblScanNode->scan.node.pOutputDataBlockDesc->outputRowSize);
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
}
......@@ -319,8 +318,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
if (pTblScanNode->scan.node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pTblScanNode->scan.node.pConditions, tbuf + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, &filterLen));
tlen += filterLen;
QRY_ERR_RET(nodesNodeToSQL(pTblScanNode->scan.node.pConditions, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
......@@ -329,7 +327,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
}
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:{
SSystemTableScanPhysiNode *pSTblScanNode = (SSystemTableScanPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_SYSTBL_SCAN_FORMAT, pSTblScanNode->scan.tableName.tname, pSTblScanNode->scan.pScanCols->length);
EXPLAIN_ROW_NEW(level, EXPLAIN_SYSTBL_SCAN_FORMAT, pSTblScanNode->scan.tableName.tname, pSTblScanNode->scan.pScanCols->length, pSTblScanNode->scan.node.pOutputDataBlockDesc->outputRowSize);
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
}
......@@ -343,7 +341,15 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
if (verbose) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pSTblScanNode->scan.order));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pSTblScanNode->scan.node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pSTblScanNode->scan.node.pConditions, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
}
break;
}
......@@ -359,8 +365,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
if (verbose) {
if (pPrjNode->node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pPrjNode->node.pConditions, tbuf + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, &filterLen));
tlen += filterLen;
QRY_ERR_RET(nodesNodeToSQL(pPrjNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
......@@ -379,15 +384,13 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
if (verbose) {
if (pJoinNode->node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pJoinNode->node.pConditions, tbuf + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, &filterLen));
tlen += filterLen;
QRY_ERR_RET(nodesNodeToSQL(pJoinNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ON_CONDITIONS_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pJoinNode->pOnConditions, tbuf + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, &filterLen));
tlen += filterLen;
QRY_ERR_RET(nodesNodeToSQL(pJoinNode->pOnConditions, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
......@@ -410,8 +413,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
if (verbose) {
if (pAggNode->node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pAggNode->node.pConditions, tbuf + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, &filterLen));
tlen += filterLen;
QRY_ERR_RET(nodesNodeToSQL(pAggNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
......@@ -436,8 +438,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
if (verbose) {
if (pExchNode->node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pExchNode->node.pConditions, tbuf + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, &filterLen));
tlen += filterLen;
QRY_ERR_RET(nodesNodeToSQL(pExchNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
......@@ -458,8 +459,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
if (verbose) {
if (pSortNode->node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pSortNode->node.pConditions, tbuf + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, &filterLen));
tlen += filterLen;
QRY_ERR_RET(nodesNodeToSQL(pSortNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
......@@ -488,8 +488,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
if (pIntNode->window.node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pIntNode->window.node.pConditions, tbuf + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, &filterLen));
tlen += filterLen;
QRY_ERR_RET(nodesNodeToSQL(pIntNode->window.node.pConditions, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
......@@ -508,8 +507,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
if (verbose) {
if (pIntNode->window.node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pIntNode->window.node.pConditions, tbuf + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, &filterLen));
tlen += filterLen;
QRY_ERR_RET(nodesNodeToSQL(pIntNode->window.node.pConditions, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
......
......@@ -30,18 +30,17 @@ int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len) {
switch (pNode->type) {
case QUERY_NODE_COLUMN: {
SColumnNode *colNode = (SColumnNode *)pNode;
*len = 0;
if (colNode->dbName[0]) {
*len += snprintf(buf, bufSize - *len, "`%s`.", colNode->dbName);
*len += snprintf(buf + *len, bufSize - *len, "`%s`.", colNode->dbName);
}
if (colNode->tableAlias[0]) {
*len += snprintf(buf, bufSize - *len, "`%s`.", colNode->tableAlias);
*len += snprintf(buf + *len, bufSize - *len, "`%s`.", colNode->tableAlias);
} else if (colNode->tableName[0]) {
*len += snprintf(buf, bufSize - *len, "`%s`.", colNode->tableName);
*len += snprintf(buf + *len, bufSize - *len, "`%s`.", colNode->tableName);
}
*len += snprintf(buf, bufSize - *len, "`%s`", colNode->colName);
*len += snprintf(buf + *len, bufSize - *len, "`%s`", colNode->colName);
return TSDB_CODE_SUCCESS;
}
......@@ -53,14 +52,14 @@ int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len) {
NODES_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
*len += snprintf(buf, bufSize - *len, "%s", t);
*len += snprintf(buf + *len, bufSize - *len, "%s", t);
taosMemoryFree(t);
return TSDB_CODE_SUCCESS;
}
case QUERY_NODE_OPERATOR: {
SOperatorNode* pOpNode = (SOperatorNode*)pNode;
*len += snprintf(buf, bufSize - *len, "(");
*len += snprintf(buf + *len, bufSize - *len, "(");
if (pOpNode->pLeft) {
NODES_ERR_RET(nodesNodeToSQL(pOpNode->pLeft, buf, bufSize, len));
}
......@@ -70,13 +69,13 @@ int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len) {
NODES_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
*len += snprintf(buf, bufSize - *len, " %s ", gOperatorStr[pOpNode->opType]);
*len += snprintf(buf + *len, bufSize - *len, " %s ", gOperatorStr[pOpNode->opType]);
if (pOpNode->pRight) {
NODES_ERR_RET(nodesNodeToSQL(pOpNode->pRight, buf, bufSize, len));
}
*len += snprintf(buf, bufSize - *len, ")");
*len += snprintf(buf + *len, bufSize - *len, ")");
return TSDB_CODE_SUCCESS;
}
......@@ -85,17 +84,17 @@ int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len) {
SNode* node = NULL;
bool first = true;
*len += snprintf(buf, bufSize - *len, "(");
*len += snprintf(buf + *len, bufSize - *len, "(");
FOREACH(node, pLogicNode->pParameterList) {
if (!first) {
*len += snprintf(buf, bufSize - *len, " %s ", gLogicConditionStr[pLogicNode->condType]);
*len += snprintf(buf + *len, bufSize - *len, " %s ", gLogicConditionStr[pLogicNode->condType]);
}
NODES_ERR_RET(nodesNodeToSQL(node, buf, bufSize, len));
first = false;
}
*len += snprintf(buf, bufSize - *len, ")");
*len += snprintf(buf + *len, bufSize - *len, ")");
return TSDB_CODE_SUCCESS;
}
......@@ -104,17 +103,17 @@ int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len) {
SNode* node = NULL;
bool first = true;
*len += snprintf(buf, bufSize - *len, "%s(", pFuncNode->functionName);
*len += snprintf(buf + *len, bufSize - *len, "%s(", pFuncNode->functionName);
FOREACH(node, pFuncNode->pParameterList) {
if (!first) {
*len += snprintf(buf, bufSize - *len, ", ");
*len += snprintf(buf + *len, bufSize - *len, ", ");
}
NODES_ERR_RET(nodesNodeToSQL(node, buf, bufSize, len));
first = false;
}
*len += snprintf(buf, bufSize - *len, ")");
*len += snprintf(buf + *len, bufSize - *len, ")");
return TSDB_CODE_SUCCESS;
}
......@@ -123,17 +122,17 @@ int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len) {
SNode* node = NULL;
bool first = true;
*len += snprintf(buf, bufSize - *len, "(");
*len += snprintf(buf + *len, bufSize - *len, "(");
FOREACH(node, pListNode->pNodeList) {
if (!first) {
*len += snprintf(buf, bufSize - *len, ", ");
*len += snprintf(buf + *len, bufSize - *len, ", ");
}
NODES_ERR_RET(nodesNodeToSQL(node, buf, bufSize, len));
first = false;
}
*len += snprintf(buf, bufSize - *len, ")");
*len += snprintf(buf + *len, bufSize - *len, ")");
return TSDB_CODE_SUCCESS;
}
......
......@@ -839,12 +839,13 @@ char* nodesGetStrValueFromNode(SValueNode *pNode) {
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY: {
void *buf = taosMemoryMalloc(varDataLen(pNode->datum.p) + 1);
int32_t bufSize = varDataLen(pNode->datum.p) + 2 + 1;
void *buf = taosMemoryMalloc(bufSize);
if (NULL == buf) {
return NULL;
}
strncpy(buf, varDataVal(pNode->datum.p), varDataLen(pNode->datum.p) + 1);
snprintf(buf, bufSize, "'%s'", varDataVal(pNode->datum.p));
return buf;
}
default:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册