未验证 提交 919d98d3 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #21066 from taosdata/szhou/tag-scan-group-slimit

feature: tag scan group and slimit
......@@ -366,6 +366,7 @@ typedef struct STagScanInfo {
int32_t curPos;
SReadHandle readHandle;
STableListInfo* pTableListInfo;
SLimitNode* pSlimit;
} STagScanInfo;
typedef enum EStreamScanMode {
......
......@@ -2512,51 +2512,33 @@ _error:
return NULL;
}
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, int32_t count, SMetaReader* mr) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STagScanInfo* pInfo = pOperator->info;
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
SSDataBlock* pRes = pInfo->pRes;
blockDataCleanup(pRes);
int32_t size = tableListGetSize(pInfo->pTableListInfo);
if (size == 0) {
setTaskStatus(pTaskInfo, TASK_COMPLETED);
return NULL;
}
char str[512] = {0};
int32_t count = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, pInfo->readHandle.meta, 0);
while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos);
int32_t code = metaGetTableEntryByUid(&mr, item->uid);
tDecoderClear(&mr.coder);
int32_t code = metaGetTableEntryByUid(mr, item->uid);
tDecoderClear(&(*mr).coder);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno),
GET_TASKID(pTaskInfo));
metaReaderClear(&mr);
metaReaderClear(mr);
T_LONG_JMP(pTaskInfo->env, terrno);
}
char str[512];
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
// refactor later
if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) {
STR_TO_VARSTR(str, mr.me.name);
colDataSetVal(pDst, count, str, false);
STR_TO_VARSTR(str, (*mr).me.name);
colDataSetVal(pDst, (count), str, false);
} else { // it is a tag value
STagVal val = {0};
val.cid = pExprInfo[j].base.pParam[0].pCol->colId;
const char* p = metaGetTableTagVal(mr.me.ctbEntry.pTags, pDst->info.type, &val);
const char* p = metaGetTableTagVal((*mr).me.ctbEntry.pTags, pDst->info.type, &val);
char* data = NULL;
if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
......@@ -2564,7 +2546,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
} else {
data = (char*)p;
}
colDataSetVal(pDst, count, data,
colDataSetVal(pDst, (count), data,
(data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));
if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
......@@ -2573,11 +2555,48 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
}
}
}
}
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STagScanInfo* pInfo = pOperator->info;
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
SSDataBlock* pRes = pInfo->pRes;
blockDataCleanup(pRes);
int32_t size = tableListGetSize(pInfo->pTableListInfo);
if (size == 0) {
setTaskStatus(pTaskInfo, TASK_COMPLETED);
return NULL;
}
count += 1;
char str[512] = {0};
int32_t count = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, pInfo->readHandle.meta, 0);
while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
doTagScanOneTable(pOperator, pRes, count, &mr);
++count;
if (++pInfo->curPos >= size) {
setOperatorCompleted(pOperator);
}
// each table with tbname is a group, hence its own block, but only group when slimit exists for performance reason.
if (pInfo->pSlimit != NULL) {
if (pInfo->curPos < pInfo->pSlimit->offset) {
continue;
}
pInfo->pRes->info.id.groupId = calcGroupId(mr.me.name, strlen(mr.me.name));
if (pInfo->curPos >= (pInfo->pSlimit->offset + pInfo->pSlimit->limit) - 1) {
setOperatorCompleted(pOperator);
}
break;
}
}
metaReaderClear(&mr);
......@@ -2628,6 +2647,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
pInfo->readHandle = *pReadHandle;
pInfo->curPos = 0;
pInfo->pSlimit = (SLimitNode*)pPhyNode->node.pSlimit; //TODO: slimit now only indicate group
setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
......
......@@ -2418,6 +2418,36 @@ static bool tagScanOptShouldBeOptimized(SLogicNode* pNode) {
return true;
}
static SLogicNode* tagScanOptFindAncestorWithSlimit(SLogicNode* pTableScanNode) {
SLogicNode* pNode = pTableScanNode->pParent;
while (NULL != pNode) {
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode) || QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) ||
QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode) || QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pNode)) {
return NULL;
}
if (NULL != pNode->pSlimit) {
return pNode;
}
pNode = pNode->pParent;
}
return NULL;
}
static void tagScanOptCloneAncestorSlimit(SLogicNode* pTableScanNode) {
if (NULL != pTableScanNode->pSlimit) {
return;
}
SLogicNode* pNode = tagScanOptFindAncestorWithSlimit(pTableScanNode);
if (NULL != pNode) {
//TODO: only set the slimit now. push down slimit later
pTableScanNode->pSlimit = nodesCloneNode(pNode->pSlimit);
((SLimitNode*)pTableScanNode->pSlimit)->limit += ((SLimitNode*)pTableScanNode->pSlimit)->offset;
((SLimitNode*)pTableScanNode->pSlimit)->offset = 0;
}
return;
}
static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
SScanLogicNode* pScanNode = (SScanLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, tagScanOptShouldBeOptimized);
if (NULL == pScanNode) {
......@@ -2458,6 +2488,7 @@ static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp
NODES_CLEAR_LIST(pAgg->pChildren);
}
nodesDestroyNode((SNode*)pAgg);
tagScanOptCloneAncestorSlimit((SLogicNode*)pScanNode);
pCxt->optimized = true;
return TSDB_CODE_SUCCESS;
}
......
......@@ -887,6 +887,7 @@
,,y,script,./test.sh -f tsim/query/emptyTsRange.sim
,,y,script,./test.sh -f tsim/query/partitionby.sim
,,y,script,./test.sh -f tsim/query/tableCount.sim
,,y,script,./test.sh -f tsim/query/tag_scan.sim
,,y,script,./test.sh -f tsim/query/nullColSma.sim
,,y,script,./test.sh -f tsim/qnode/basic1.sim
,,y,script,./test.sh -f tsim/snode/basic1.sim
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
sql drop database if exists test
sql create database test;
sql use test;
sql create table st(ts timestamp, f int) tags (t int);
sql insert into ct1 using st tags(1) values(now, 1);
sql insert into ct2 using st tags(2) values(now, 2);
sql insert into ct3 using st tags(3) values(now, 3);
sql insert into ct4 using st tags(4) values(now, 4);
sql create table st2(ts timestamp, f int) tags (t int);
sql insert into ct21 using st2 tags(1) values(now, 1);
sql insert into ct22 using st2 tags(2) values(now, 2);
sql insert into ct23 using st2 tags(3) values(now, 3);
sql insert into ct24 using st2 tags(4) values(now, 4);
sql select tbname, 1 from st group by tbname order by tbname;
print $rows $data00 $data10 $data20
if $rows != 4 then
return -1
endi
if $data00 != @ct1@ then
return -1
endi
if $data10 != @ct2@ then
return -1
endi
sql select tbname, 1 from st group by tbname slimit 0, 1;
print $rows
if $rows != 1 then
return -1
endi
sql select tbname, 1 from st group by tbname slimit 2, 2;
print $rows $data00 $data10
if $rows != 2 then
return -1
endi
sql select tbname, 1 from st group by tbname order by tbname slimit 0, 1;
print $rows $data00 $data10 $data20
if $rows != 4 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册