提交 758427b3 编写于 作者: S shenglian zhou

fix: use slimit to indicate group for tag scan

上级 057cc2d3
...@@ -366,6 +366,7 @@ typedef struct STagScanInfo { ...@@ -366,6 +366,7 @@ typedef struct STagScanInfo {
int32_t curPos; int32_t curPos;
SReadHandle readHandle; SReadHandle readHandle;
STableListInfo* pTableListInfo; STableListInfo* pTableListInfo;
SLimitNode* pSlimit;
} STagScanInfo; } STagScanInfo;
typedef enum EStreamScanMode { typedef enum EStreamScanMode {
......
...@@ -2512,37 +2512,16 @@ _error: ...@@ -2512,37 +2512,16 @@ _error:
return NULL; return NULL;
} }
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { static void doTagScanOneTable(SOperatorInfo* pOperator, const SExecTaskInfo* pTaskInfo, STagScanInfo* pInfo,
if (pOperator->status == OP_EXEC_DONE) { const SExprInfo* pExprInfo, const SSDataBlock* pRes, int32_t size, const char* str,
return NULL; int32_t* count, SMetaReader* mr) {
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STagScanInfo* pInfo = pOperator->info;
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
SSDataBlock* pRes = pInfo->pRes;
blockDataCleanup(pRes);
int32_t size = tableListGetSize(pInfo->pTableListInfo);
if (size == 0) {
setTaskStatus(pTaskInfo, TASK_COMPLETED);
return NULL;
}
char str[512] = {0};
int32_t count = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, pInfo->readHandle.meta, 0);
while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos); STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos);
int32_t code = metaGetTableEntryByUid(&mr, item->uid); int32_t code = metaGetTableEntryByUid(mr, item->uid);
tDecoderClear(&mr.coder); tDecoderClear(&(*mr).coder);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno), qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno),
GET_TASKID(pTaskInfo)); GET_TASKID(pTaskInfo));
metaReaderClear(&mr); metaReaderClear(mr);
T_LONG_JMP(pTaskInfo->env, terrno); T_LONG_JMP(pTaskInfo->env, terrno);
} }
...@@ -2551,12 +2530,12 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { ...@@ -2551,12 +2530,12 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
// refactor later // refactor later
if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) { if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) {
STR_TO_VARSTR(str, mr.me.name); STR_TO_VARSTR(str, (*mr).me.name);
colDataSetVal(pDst, count, str, false); colDataSetVal(pDst, (*count), str, false);
} else { // it is a tag value } else { // it is a tag value
STagVal val = {0}; STagVal val = {0};
val.cid = pExprInfo[j].base.pParam[0].pCol->colId; val.cid = pExprInfo[j].base.pParam[0].pCol->colId;
const char* p = metaGetTableTagVal(mr.me.ctbEntry.pTags, pDst->info.type, &val); const char* p = metaGetTableTagVal((*mr).me.ctbEntry.pTags, pDst->info.type, &val);
char* data = NULL; char* data = NULL;
if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) { if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
...@@ -2564,7 +2543,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { ...@@ -2564,7 +2543,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
} else { } else {
data = (char*)p; data = (char*)p;
} }
colDataSetVal(pDst, count, data, colDataSetVal(pDst, (*count), data,
(data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))); (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));
if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) && if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
...@@ -2574,10 +2553,41 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { ...@@ -2574,10 +2553,41 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
} }
} }
count += 1; (*count) += 1;
if (++pInfo->curPos >= size) { if (++pInfo->curPos >= size) {
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
} }
}
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STagScanInfo* pInfo = pOperator->info;
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
SSDataBlock* pRes = pInfo->pRes;
blockDataCleanup(pRes);
int32_t size = tableListGetSize(pInfo->pTableListInfo);
if (size == 0) {
setTaskStatus(pTaskInfo, TASK_COMPLETED);
return NULL;
}
char str[512] = {0};
int32_t count = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, pInfo->readHandle.meta, 0);
while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
doTagScanOneTable(pOperator, pTaskInfo, pInfo, pExprInfo, pRes, size, str, &count, &mr);
if (pInfo->pSlimit != NULL) {
pInfo->pRes->info.id.groupId = calcGroupId(mr.me.name, strlen(mr.me.name));
break;
}
} }
metaReaderClear(&mr); metaReaderClear(&mr);
...@@ -2628,6 +2638,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi ...@@ -2628,6 +2638,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
pInfo->pRes = createDataBlockFromDescNode(pDescNode); pInfo->pRes = createDataBlockFromDescNode(pDescNode);
pInfo->readHandle = *pReadHandle; pInfo->readHandle = *pReadHandle;
pInfo->curPos = 0; pInfo->curPos = 0;
pInfo->pSlimit = (SLimitNode*)pPhyNode->node.pSlimit; //TODO: slimit now only indicate group
setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo, setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo,
pTaskInfo); pTaskInfo);
......
...@@ -2418,6 +2418,34 @@ static bool tagScanOptShouldBeOptimized(SLogicNode* pNode) { ...@@ -2418,6 +2418,34 @@ static bool tagScanOptShouldBeOptimized(SLogicNode* pNode) {
return true; return true;
} }
static SLogicNode* tagScanOptFindAncestorWithSlimit(SLogicNode* pTableScanNode) {
SLogicNode* pNode = pTableScanNode->pParent;
while (NULL != pNode) {
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode) || QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) ||
QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode) || QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pNode)) {
return NULL;
}
if (NULL != pNode->pSlimit) {
return pNode;
}
pNode = pNode->pParent;
}
return NULL;
}
static void tagScanOptCloneAncestorSlimit(SLogicNode* pTableScanNode) {
if (NULL != pTableScanNode->pSlimit) {
return;
}
SLogicNode* pNode = tagScanOptFindAncestorWithSlimit(pTableScanNode);
if (NULL != pNode) {
//TODO: only set the slimit now. push down slimit later
pTableScanNode->pSlimit = nodesCloneNode(pNode->pSlimit);
}
return;
}
static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
SScanLogicNode* pScanNode = (SScanLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, tagScanOptShouldBeOptimized); SScanLogicNode* pScanNode = (SScanLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, tagScanOptShouldBeOptimized);
if (NULL == pScanNode) { if (NULL == pScanNode) {
...@@ -2458,6 +2486,7 @@ static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp ...@@ -2458,6 +2486,7 @@ static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp
NODES_CLEAR_LIST(pAgg->pChildren); NODES_CLEAR_LIST(pAgg->pChildren);
} }
nodesDestroyNode((SNode*)pAgg); nodesDestroyNode((SNode*)pAgg);
tagScanOptCloneAncestorSlimit((SLogicNode*)pScanNode);
pCxt->optimized = true; pCxt->optimized = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册