提交 6669c3a5 编写于 作者: X Xiaoyu Wang

feat: last queries with tags output can be read from cache

上级 1d2764eb
...@@ -362,15 +362,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -362,15 +362,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
p->ts = pCol->ts; p->ts = pCol->ts;
p->colVal = pCol->colVal; p->colVal = pCol->colVal;
singleTableLastTs = pCol->ts; singleTableLastTs = pCol->ts;
// only set value for last row query
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
if (taosArrayGetSize(pTableUidList) == 0) {
taosArrayPush(pTableUidList, &pKeyInfo->uid);
} else {
taosArraySet(pTableUidList, 0, &pKeyInfo->uid);
}
}
} }
} else { } else {
SLastCol* p = taosArrayGet(pLastCols, slotId); SLastCol* p = taosArrayGet(pLastCols, slotId);
...@@ -417,6 +408,14 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -417,6 +408,14 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
} }
} }
if (0 == i) {
if (taosArrayGetSize(pTableUidList) == 0) {
taosArrayPush(pTableUidList, &pKeyInfo->uid);
} else {
taosArraySet(pTableUidList, 0, &pKeyInfo->uid);
}
}
tsdbCacheRelease(lruCache, h); tsdbCacheRelease(lruCache, h);
} }
......
...@@ -45,13 +45,13 @@ static void destroyCacheScanOperator(void* param); ...@@ -45,13 +45,13 @@ static void destroyCacheScanOperator(void* param);
static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds); static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds);
static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo); static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo);
#define SCAN_ROW_TYPE(_t) ((_t)? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW) #define SCAN_ROW_TYPE(_t) ((_t) ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW)
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SCacheRowsScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SCacheRowsScanInfo)); SCacheRowsScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SCacheRowsScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
tableListDestroy(pTableListInfo); tableListDestroy(pTableListInfo);
...@@ -91,7 +91,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe ...@@ -91,7 +91,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
uint64_t suid = tableListGetSuid(pTableListInfo); uint64_t suid = tableListGetSuid(pTableListInfo);
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables, code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables,
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, pTaskInfo->id.str); taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader,
pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -114,7 +115,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe ...@@ -114,7 +115,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
p->pCtx = createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset); p->pCtx = createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset);
} }
setOperatorInfo(pOperator, "CachedRowScanOperator", QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "CachedRowScanOperator", QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, false, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
pOperator->fpSet = pOperator->fpSet =
...@@ -123,7 +125,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe ...@@ -123,7 +125,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
pOperator->cost.openCost = 0; pOperator->cost.openCost = 0;
return pOperator; return pOperator;
_error: _error:
pTaskInfo->code = code; pTaskInfo->code = code;
destroyCacheScanOperator(pInfo); destroyCacheScanOperator(pInfo);
taosMemoryFree(pOperator); taosMemoryFree(pOperator);
...@@ -136,8 +138,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ...@@ -136,8 +138,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
} }
SCacheRowsScanInfo* pInfo = pOperator->info; SCacheRowsScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableListInfo* pTableList = pInfo->pTableList; STableListInfo* pTableList = pInfo->pTableList;
uint64_t suid = tableListGetSuid(pTableList); uint64_t suid = tableListGetSuid(pTableList);
int32_t size = tableListGetSize(pTableList); int32_t size = tableListGetSize(pTableList);
...@@ -194,8 +196,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ...@@ -194,8 +196,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
pRes->info.rows = 1; pRes->info.rows = 1;
SExprSupp* pSup = &pInfo->pseudoExprSup; SExprSupp* pSup = &pInfo->pseudoExprSup;
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes, int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes,
pRes->info.rows, GET_TASKID(pTaskInfo), NULL); pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
...@@ -217,7 +219,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ...@@ -217,7 +219,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
} }
STableKeyInfo* pList = NULL; STableKeyInfo* pList = NULL;
int32_t num = 0; int32_t num = 0;
int32_t code = tableListGetGroupList(pTableList, pInfo->currentGroupIndex, &pList, &num); int32_t code = tableListGetGroupList(pTableList, pInfo->currentGroupIndex, &pList, &num);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -251,11 +253,9 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ...@@ -251,11 +253,9 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
pInfo->pRes->info.id.groupId = pKeyInfo->groupId; pInfo->pRes->info.id.groupId = pKeyInfo->groupId;
if (taosArrayGetSize(pInfo->pUidList) > 0) { if (taosArrayGetSize(pInfo->pUidList) > 0) {
ASSERT((pInfo->retrieveType & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW);
pInfo->pRes->info.id.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0); pInfo->pRes->info.id.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0);
code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, pInfo->pRes->info.rows, code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
GET_TASKID(pTaskInfo), NULL); pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
...@@ -325,7 +325,7 @@ int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pC ...@@ -325,7 +325,7 @@ int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pC
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
size_t size = taosArrayGetSize(pColMatchInfo->pList); size_t size = taosArrayGetSize(pColMatchInfo->pList);
SArray* pMatchInfo = taosArrayInit(size, sizeof(SColMatchItem)); SArray* pMatchInfo = taosArrayInit(size, sizeof(SColMatchItem));
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
......
...@@ -1095,7 +1095,7 @@ static int32_t sortPriKeyOptGetSequencingNodesImpl(SLogicNode* pNode, bool group ...@@ -1095,7 +1095,7 @@ static int32_t sortPriKeyOptGetSequencingNodesImpl(SLogicNode* pNode, bool group
*pNotOptimize = false; *pNotOptimize = false;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
switch (nodeType(pNode)) { switch (nodeType(pNode)) {
case QUERY_NODE_LOGIC_PLAN_SCAN: { case QUERY_NODE_LOGIC_PLAN_SCAN: {
SScanLogicNode* pScan = (SScanLogicNode*)pNode; SScanLogicNode* pScan = (SScanLogicNode*)pNode;
...@@ -2139,7 +2139,7 @@ typedef struct SLastRowScanOptLastParaCkCxt { ...@@ -2139,7 +2139,7 @@ typedef struct SLastRowScanOptLastParaCkCxt {
bool hasCol; bool hasCol;
} SLastRowScanOptLastParaCkCxt; } SLastRowScanOptLastParaCkCxt;
static EDealRes lastRowScanOptLastParaCheckImpl(SNode* pNode, void* pContext) { static EDealRes lastRowScanOptLastParaIsTagImpl(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) { if (QUERY_NODE_COLUMN == nodeType(pNode)) {
SLastRowScanOptLastParaCkCxt* pCxt = pContext; SLastRowScanOptLastParaCkCxt* pCxt = pContext;
if (COLUMN_TYPE_TAG == ((SColumnNode*)pNode)->colType || COLUMN_TYPE_TBNAME == ((SColumnNode*)pNode)->colType) { if (COLUMN_TYPE_TAG == ((SColumnNode*)pNode)->colType || COLUMN_TYPE_TBNAME == ((SColumnNode*)pNode)->colType) {
...@@ -2152,10 +2152,10 @@ static EDealRes lastRowScanOptLastParaCheckImpl(SNode* pNode, void* pContext) { ...@@ -2152,10 +2152,10 @@ static EDealRes lastRowScanOptLastParaCheckImpl(SNode* pNode, void* pContext) {
return DEAL_RES_CONTINUE; return DEAL_RES_CONTINUE;
} }
static bool lastRowScanOptLastParaCheck(SNode* pExpr) { static bool lastRowScanOptLastParaIsTag(SNode* pExpr) {
SLastRowScanOptLastParaCkCxt cxt = {.hasTag = false, .hasCol = false}; SLastRowScanOptLastParaCkCxt cxt = {.hasTag = false, .hasCol = false};
nodesWalkExpr(pExpr, lastRowScanOptLastParaCheckImpl, &cxt); nodesWalkExpr(pExpr, lastRowScanOptLastParaIsTagImpl, &cxt);
return !cxt.hasTag && cxt.hasCol; return cxt.hasTag && !cxt.hasCol;
} }
static bool hasSuitableCache(int8_t cacheLastMode, bool hasLastRow, bool hasLast) { static bool hasSuitableCache(int8_t cacheLastMode, bool hasLastRow, bool hasLast) {
...@@ -2195,15 +2195,19 @@ static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) { ...@@ -2195,15 +2195,19 @@ static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) {
FOREACH(pFunc, ((SAggLogicNode*)pNode)->pAggFuncs) { FOREACH(pFunc, ((SAggLogicNode*)pNode)->pAggFuncs) {
SFunctionNode* pAggFunc = (SFunctionNode*)pFunc; SFunctionNode* pAggFunc = (SFunctionNode*)pFunc;
if (FUNCTION_TYPE_LAST == pAggFunc->funcType) { if (FUNCTION_TYPE_LAST == pAggFunc->funcType) {
if (hasSelectFunc || !lastRowScanOptLastParaCheck(nodesListGetNode(pAggFunc->pParameterList, 0))) { if (hasSelectFunc) {
return false; return false;
} }
hasLastFunc = true; hasLastFunc = true;
} else if (FUNCTION_TYPE_SELECT_VALUE == pAggFunc->funcType || FUNCTION_TYPE_GROUP_KEY == pAggFunc->funcType) { } else if (FUNCTION_TYPE_SELECT_VALUE == pAggFunc->funcType) {
if (hasLastFunc) { if (hasLastFunc) {
return false; return false;
} }
hasSelectFunc = true; hasSelectFunc = true;
} else if (FUNCTION_TYPE_GROUP_KEY == pAggFunc->funcType) {
if (!lastRowScanOptLastParaIsTag(nodesListGetNode(pAggFunc->pParameterList, 0))) {
return false;
}
} else if (FUNCTION_TYPE_LAST_ROW != pAggFunc->funcType) { } else if (FUNCTION_TYPE_LAST_ROW != pAggFunc->funcType) {
return false; return false;
} }
...@@ -2282,6 +2286,8 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic ...@@ -2282,6 +2286,8 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
nodesWalkExpr(nodesListGetNode(pFunc->pParameterList, 0), lastRowScanOptSetColDataType, &cxt); nodesWalkExpr(nodesListGetNode(pFunc->pParameterList, 0), lastRowScanOptSetColDataType, &cxt);
nodesListErase(pFunc->pParameterList, nodesListGetCell(pFunc->pParameterList, 1)); nodesListErase(pFunc->pParameterList, nodesListGetCell(pFunc->pParameterList, 1));
} }
} else if (FUNCTION_TYPE_GROUP_KEY == funcType) {
nodesListMakeAppend(&cxt.pLastCols, nodesListGetNode(pFunc->pParameterList, 0));
} }
} }
...@@ -2291,7 +2297,6 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic ...@@ -2291,7 +2297,6 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
if (NULL != cxt.pLastCols) { if (NULL != cxt.pLastCols) {
cxt.doAgg = false; cxt.doAgg = false;
lastRowScanOptSetLastTargets(pScan->pScanCols, cxt.pLastCols); lastRowScanOptSetLastTargets(pScan->pScanCols, cxt.pLastCols);
NODES_DESTORY_LIST(pScan->pScanPseudoCols);
lastRowScanOptSetLastTargets(pScan->node.pTargets, cxt.pLastCols); lastRowScanOptSetLastTargets(pScan->node.pTargets, cxt.pLastCols);
nodesClearList(cxt.pLastCols); nodesClearList(cxt.pLastCols);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册