提交 5d123801 编写于 作者: H Haojun Liao

[td-11818] fix bug in select * from super_table.

上级 23b39814
...@@ -95,6 +95,7 @@ typedef struct SScanPhyNode { ...@@ -95,6 +95,7 @@ typedef struct SScanPhyNode {
int8_t tableType; int8_t tableType;
int32_t order; // scan order: TSDB_ORDER_ASC|TSDB_ORDER_DESC int32_t order; // scan order: TSDB_ORDER_ASC|TSDB_ORDER_DESC
int32_t count; // repeat count int32_t count; // repeat count
int32_t reverse; // reverse scan count
} SScanPhyNode; } SScanPhyNode;
typedef SScanPhyNode SSystemTableScanPhyNode; typedef SScanPhyNode SSystemTableScanPhyNode;
......
此差异已折叠。
...@@ -542,7 +542,7 @@ typedef struct SOrderOperatorInfo { ...@@ -542,7 +542,7 @@ typedef struct SOrderOperatorInfo {
void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream); void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream);
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime); SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv);
......
...@@ -4844,7 +4844,16 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) { ...@@ -4844,7 +4844,16 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) {
SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo) {
assert(repeatTime > 0 && numOfOutput > 0); assert(repeatTime > 0 && numOfOutput > 0);
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
tfree(pInfo);
tfree(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
pInfo->pTsdbReadHandle = pTsdbQueryHandle; pInfo->pTsdbReadHandle = pTsdbQueryHandle;
pInfo->times = repeatTime; pInfo->times = repeatTime;
pInfo->reverseTimes = 0; pInfo->reverseTimes = 0;
...@@ -4852,7 +4861,6 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, int32_t order, in ...@@ -4852,7 +4861,6 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, int32_t order, in
pInfo->current = 0; pInfo->current = 0;
pInfo->scanFlag = MAIN_SCAN; pInfo->scanFlag = MAIN_SCAN;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableScanOperator"; pOperator->name = "TableScanOperator";
pOperator->operatorType = OP_TableScan; pOperator->operatorType = OP_TableScan;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
...@@ -4866,6 +4874,39 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, int32_t order, in ...@@ -4866,6 +4874,39 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, int32_t order, in
return pOperator; return pOperator;
} }
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo) {
assert(repeatTime > 0);
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
tfree(pInfo);
tfree(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
pInfo->pTsdbReadHandle = pTsdbQueryHandle;
pInfo->times = repeatTime;
pInfo->reverseTimes = reverseTime;
pInfo->order = order;
pInfo->current = 0;
pInfo->scanFlag = MAIN_SCAN;
pOperator->name = "DataBlocksOptimizedScanOperator";
pOperator->operatorType = OP_DataBlocksOptScan;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = NULL;
pOperator->exec = doTableScan;
pOperator->pTaskInfo = pTaskInfo;
return pOperator;
}
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv) { SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv) {
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
...@@ -4973,27 +5014,6 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf ...@@ -4973,27 +5014,6 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
} }
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) {
assert(repeatTime > 0);
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
pInfo->pTsdbReadHandle = pTsdbQueryHandle;
pInfo->times = repeatTime;
pInfo->reverseTimes = reverseTime;
pInfo->current = 0;
pInfo->order = pRuntimeEnv->pQueryAttr->order.order;
SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo));
pOptr->name = "DataBlocksOptimizedScanOperator";
// pOptr->operatorType = OP_DataBlocksOptScan;
pOptr->pRuntimeEnv = pRuntimeEnv;
pOptr->blockingOptr = false;
pOptr->info = pInfo;
pOptr->exec = doTableScan;
return pOptr;
}
SArray* getOrderCheckColumns(STaskAttr* pQuery) { SArray* getOrderCheckColumns(STaskAttr* pQuery) {
int32_t numOfCols = (pQuery->pGroupbyExpr == NULL)? 0: taosArrayGetSize(pQuery->pGroupbyExpr->columnInfo); int32_t numOfCols = (pQuery->pGroupbyExpr == NULL)? 0: taosArrayGetSize(pQuery->pGroupbyExpr->columnInfo);
...@@ -7187,10 +7207,13 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId) { ...@@ -7187,10 +7207,13 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId) {
SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTaskInfo, void* param) { SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTaskInfo, void* param) {
if (pPhyNode->pChildren == NULL || taosArrayGetSize(pPhyNode->pChildren) == 0) { if (pPhyNode->pChildren == NULL || taosArrayGetSize(pPhyNode->pChildren) == 0) {
if (pPhyNode->info.type == OP_TableScan) { if (pPhyNode->info.type == OP_TableScan) {
SScanPhyNode* pScanPhyNode = (SScanPhyNode*) pPhyNode; SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode;
size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets); size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets);
SOperatorInfo* pOperatorInfo = createTableScanOperator(param, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pTaskInfo); return createTableScanOperator(param, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pTaskInfo);
pTaskInfo->pRoot = pOperatorInfo; } else if (pPhyNode->info.type == OP_DataBlocksOptScan) {
SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode;
size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets);
return createDataBlocksOptScanInfo(param, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo);
} else { } else {
assert(0); assert(0);
} }
...@@ -7199,21 +7222,20 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask ...@@ -7199,21 +7222,20 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask
int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle) { int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle) {
STsdbQueryCond cond = {.loadExternalRows = false}; STsdbQueryCond cond = {.loadExternalRows = false};
cond.twindow.skey = INT64_MIN;
cond.twindow.ekey = INT64_MAX;
uint64_t uid = 0; uint64_t uid = 0;
SPhyNode* pPhyNode = pPlan->pNode; SPhyNode* pPhyNode = pPlan->pNode;
if (pPhyNode->info.type == OP_TableScan) { if (pPhyNode->info.type == OP_TableScan || pPhyNode->info.type == OP_DataBlocksOptScan) {
SScanPhyNode* pScanNode = (SScanPhyNode*) pPhyNode; STableScanPhyNode* pTableScanNode = (STableScanPhyNode*) pPhyNode;
uid = pScanNode->uid; uid = pTableScanNode->scan.uid;
cond.order = pScanNode->order; cond.order = pTableScanNode->scan.order;
cond.numOfCols = taosArrayGetSize(pScanNode->node.pTargets); cond.numOfCols = taosArrayGetSize(pTableScanNode->scan.node.pTargets);
cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo)); cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo));
cond.twindow = pTableScanNode->window;
for(int32_t i = 0; i < cond.numOfCols; ++i) { for(int32_t i = 0; i < cond.numOfCols; ++i) {
SExprInfo* pExprInfo = taosArrayGetP(pScanNode->node.pTargets, i); SExprInfo* pExprInfo = taosArrayGetP(pTableScanNode->scan.node.pTargets, i);
assert(pExprInfo->pExpr->nodeType == TEXPR_COL_NODE); assert(pExprInfo->pExpr->nodeType == TEXPR_COL_NODE);
SSchema* pSchema = pExprInfo->pExpr->pSchema; SSchema* pSchema = pExprInfo->pExpr->pSchema;
...@@ -7235,7 +7257,11 @@ int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* r ...@@ -7235,7 +7257,11 @@ int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* r
*pTaskInfo = createExecTaskInfo((uint64_t)pPlan->id.queryId); *pTaskInfo = createExecTaskInfo((uint64_t)pPlan->id.queryId);
tsdbReadHandleT tsdbReadHandle = tsdbQueryTables(readerHandle, &cond, &group, (*pTaskInfo)->id.queryId, NULL); tsdbReadHandleT tsdbReadHandle = tsdbQueryTables(readerHandle, &cond, &group, (*pTaskInfo)->id.queryId, NULL);
doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, tsdbReadHandle); (*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, tsdbReadHandle);
if ((*pTaskInfo)->pRoot == NULL) {
return terrno;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -917,6 +917,8 @@ int32_t validateLimitNode(SQueryStmtInfo *pQueryInfo, SSqlNode* pSqlNode, SMsgBu ...@@ -917,6 +917,8 @@ int32_t validateLimitNode(SQueryStmtInfo *pQueryInfo, SSqlNode* pSqlNode, SMsgBu
return buildInvalidOperationMsg(pMsgBuf, msg1); return buildInvalidOperationMsg(pMsgBuf, msg1);
} }
} }
return TSDB_CODE_SUCCESS;
} }
int32_t validateOrderbyNode(SQueryStmtInfo *pQueryInfo, SSqlNode* pSqlNode, SMsgBuf* pMsgBuf) { int32_t validateOrderbyNode(SQueryStmtInfo *pQueryInfo, SSqlNode* pSqlNode, SMsgBuf* pMsgBuf) {
......
...@@ -262,8 +262,8 @@ static void vgroupMsgToEpSet(const SVgroupMsg* vg, SQueryNodeAddr* execNode) { ...@@ -262,8 +262,8 @@ static void vgroupMsgToEpSet(const SVgroupMsg* vg, SQueryNodeAddr* execNode) {
} }
static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) { static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
SVgroupsInfo* vgroupList = pTable->pMeta->vgroupList; SVgroupsInfo* pVgroupList = pTable->pMeta->vgroupList;
for (int32_t i = 0; i < pTable->pMeta->vgroupList->numOfVgroups; ++i) { for (int32_t i = 0; i < pVgroupList->numOfVgroups; ++i) {
STORE_CURRENT_SUBPLAN(pCxt); STORE_CURRENT_SUBPLAN(pCxt);
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN); SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN);
subplan->msgType = TDMT_VND_QUERY; subplan->msgType = TDMT_VND_QUERY;
......
...@@ -530,32 +530,38 @@ static const char* jkScanNodeTableId = "TableId"; ...@@ -530,32 +530,38 @@ static const char* jkScanNodeTableId = "TableId";
static const char* jkScanNodeTableType = "TableType"; static const char* jkScanNodeTableType = "TableType";
static const char* jkScanNodeTableOrder = "Order"; static const char* jkScanNodeTableOrder = "Order";
static const char* jkScanNodeTableCount = "Count"; static const char* jkScanNodeTableCount = "Count";
static const char* jkScanNodeTableRevCount = "Reverse";
static bool scanNodeToJson(const void* obj, cJSON* json) { static bool scanNodeToJson(const void* obj, cJSON* json) {
const SScanPhyNode* scan = (const SScanPhyNode*)obj; const SScanPhyNode* pNode = (const SScanPhyNode*)obj;
bool res = cJSON_AddNumberToObject(json, jkScanNodeTableId, scan->uid); bool res = cJSON_AddNumberToObject(json, jkScanNodeTableId, pNode->uid);
if (res) { if (res) {
res = cJSON_AddNumberToObject(json, jkScanNodeTableType, scan->tableType); res = cJSON_AddNumberToObject(json, jkScanNodeTableType, pNode->tableType);
} }
if (res) { if (res) {
res = cJSON_AddNumberToObject(json, jkScanNodeTableOrder, scan->order); res = cJSON_AddNumberToObject(json, jkScanNodeTableOrder, pNode->order);
} }
if (res) { if (res) {
res = cJSON_AddNumberToObject(json, jkScanNodeTableCount, scan->count); res = cJSON_AddNumberToObject(json, jkScanNodeTableCount, pNode->count);
}
if (res) {
res = cJSON_AddNumberToObject(json, jkScanNodeTableRevCount, pNode->reverse);
} }
return res; return res;
} }
static bool scanNodeFromJson(const cJSON* json, void* obj) { static bool scanNodeFromJson(const cJSON* json, void* obj) {
SScanPhyNode* scan = (SScanPhyNode*)obj; SScanPhyNode* pNode = (SScanPhyNode*)obj;
scan->uid = getNumber(json, jkScanNodeTableId); pNode->uid = getNumber(json, jkScanNodeTableId);
scan->tableType = getNumber(json, jkScanNodeTableType); pNode->tableType = getNumber(json, jkScanNodeTableType);
scan->count = getNumber(json, jkScanNodeTableCount); pNode->count = getNumber(json, jkScanNodeTableCount);
scan->order = getNumber(json, jkScanNodeTableOrder); pNode->order = getNumber(json, jkScanNodeTableOrder);
pNode->reverse = getNumber(json, jkScanNodeTableRevCount);
return true; return true;
} }
...@@ -910,7 +916,7 @@ static SSubplan* subplanFromJson(const cJSON* json) { ...@@ -910,7 +916,7 @@ static SSubplan* subplanFromJson(const cJSON* json) {
} }
bool res = fromObject(json, jkSubplanId, subplanIdFromJson, &subplan->id, true); bool res = fromObject(json, jkSubplanId, subplanIdFromJson, &subplan->id, true);
if (res) { if (res) {
size_t size = MAX(sizeof(SPhyNode), sizeof(SScanPhyNode)); size_t size = MAX(sizeof(SPhyNode), sizeof(STableScanPhyNode));
res = fromObjectWithAlloc(json, jkSubplanNode, phyNodeFromJson, (void**)&subplan->pNode, size, false); res = fromObjectWithAlloc(json, jkSubplanNode, phyNodeFromJson, (void**)&subplan->pNode, size, false);
} }
if (res) { if (res) {
...@@ -940,8 +946,8 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) { ...@@ -940,8 +946,8 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) {
} }
*str = cJSON_Print(json); *str = cJSON_Print(json);
// printf("====Physical plan:====\n") printf("====Physical plan:====\n");
// printf("%s\n", *str); printf("%s\n", *str);
*len = strlen(*str) + 1; *len = strlen(*str) + 1;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -65,9 +65,9 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, ...@@ -65,9 +65,9 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag,
} }
if (pLogicPlan->info.type != QNODE_MODIFY) { if (pLogicPlan->info.type != QNODE_MODIFY) {
// char* str = NULL; char* str = NULL;
// queryPlanToString(pLogicPlan, &str); queryPlanToString(pLogicPlan, &str);
// printf("%s\n", str); printf("%s\n", str);
} }
code = optimizeQueryPlan(pLogicPlan); code = optimizeQueryPlan(pLogicPlan);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册