提交 b9a47162 编写于 作者: H Haojun Liao

fix(query): handle multi group last_row query.

上级 fd2dec28
...@@ -317,12 +317,16 @@ typedef struct STagScanInfo { ...@@ -317,12 +317,16 @@ typedef struct STagScanInfo {
typedef struct SLastrowScanInfo { typedef struct SLastrowScanInfo {
SSDataBlock *pRes; SSDataBlock *pRes;
SArray *pTableList;
SReadHandle readHandle; SReadHandle readHandle;
void *pLastrowReader; void *pLastrowReader;
SArray *pColMatchInfo; SArray *pColMatchInfo;
int32_t *pSlotIds; int32_t *pSlotIds;
SExprSupp pseudoExprSup; SExprSupp pseudoExprSup;
int32_t retrieveType;
int32_t currentGroupIndex;
SSDataBlock *pBufferredRes;
SArray *pUidList;
int32_t indexOfBufferedRes;
} SLastrowScanInfo; } SLastrowScanInfo;
typedef enum EStreamScanMode { typedef enum EStreamScanMode {
...@@ -826,8 +830,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys ...@@ -826,8 +830,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo);
SArray* pTableList, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
...@@ -945,8 +948,9 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi ...@@ -945,8 +948,9 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset,
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, int32_t createScanTableListInfo(SScanPhysiNode *pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId); STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId);
SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode,
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo *pTableListInfo, SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo *pTableListInfo,
......
...@@ -30,15 +30,13 @@ static SSDataBlock* doScanLastrow(SOperatorInfo* pOperator); ...@@ -30,15 +30,13 @@ static SSDataBlock* doScanLastrow(SOperatorInfo* pOperator);
static void destroyLastrowScanOperator(void* param, int32_t numOfOutput); static void destroyLastrowScanOperator(void* param, int32_t numOfOutput);
static int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds); static int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds);
SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, SArray* pTableList, SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) {
SExecTaskInfo* pTaskInfo) {
SLastrowScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SLastrowScanInfo)); SLastrowScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SLastrowScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
goto _error; goto _error;
} }
pInfo->pTableList = pTableList;
pInfo->readHandle = *readHandle; pInfo->readHandle = *readHandle;
pInfo->pRes = createResDataBlock(pScanNode->scan.node.pOutputDataBlockDesc); pInfo->pRes = createResDataBlock(pScanNode->scan.node.pOutputDataBlockDesc);
...@@ -50,8 +48,22 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead ...@@ -50,8 +48,22 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead
goto _error; goto _error;
} }
tsdbLastRowReaderOpen(readHandle->vnode, LASTROW_RETRIEVE_TYPE_SINGLE, pTableList, taosArrayGetSize(pInfo->pColMatchInfo), STableListInfo* pTableList = &pTaskInfo->tableqinfoList;
&pInfo->pLastrowReader);
initResultSizeInfo(pOperator, 1024);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
pInfo->pUidList = taosArrayInit(4, sizeof(int64_t));
// partition by tbname
if (taosArrayGetSize(pTableList->pGroupList) == taosArrayGetSize(pTableList->pTableList)) {
pInfo->retrieveType = LASTROW_RETRIEVE_TYPE_ALL;
tsdbLastRowReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList,
taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader);
pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false);
blockDataEnsureCapacity(pInfo->pBufferredRes, pOperator->resultInfo.capacity);
} else { // by tags
pInfo->retrieveType = LASTROW_RETRIEVE_TYPE_SINGLE;
}
if (pScanNode->scan.pScanPseudoCols != NULL) { if (pScanNode->scan.pScanPseudoCols != NULL) {
SExprSupp* pPseudoExpr = &pInfo->pseudoExprSup; SExprSupp* pPseudoExpr = &pInfo->pseudoExprSup;
...@@ -60,19 +72,17 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead ...@@ -60,19 +72,17 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead
pPseudoExpr->pCtx = createSqlFunctionCtx(pPseudoExpr->pExprInfo, pPseudoExpr->numOfExprs, &pPseudoExpr->rowEntryInfoOffset); pPseudoExpr->pCtx = createSqlFunctionCtx(pPseudoExpr->pExprInfo, pPseudoExpr->numOfExprs, &pPseudoExpr->rowEntryInfoOffset);
} }
pOperator->name = "LastrowScanOperator"; pOperator->name = "LastrowScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
initResultSizeInfo(pOperator, 1024);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doScanLastrow, NULL, NULL, destroyLastrowScanOperator, NULL, NULL, NULL); createOperatorFpSet(operatorDummyOpenFn, doScanLastrow, NULL, NULL, destroyLastrowScanOperator, NULL, NULL, NULL);
pOperator->cost.openCost = 0; pOperator->cost.openCost = 0;
return pOperator; return pOperator;
...@@ -90,43 +100,105 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) { ...@@ -90,43 +100,105 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) {
SLastrowScanInfo* pInfo = pOperator->info; SLastrowScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableListInfo* pTableList = &pTaskInfo->tableqinfoList;
int32_t size = taosArrayGetSize(pInfo->pTableList); int32_t size = taosArrayGetSize(pTableList->pTableList);
if (size == 0) { if (size == 0) {
setTaskStatus(pTaskInfo, TASK_COMPLETED); doSetOperatorCompleted(pOperator);
return NULL; return NULL;
} }
blockDataCleanup(pInfo->pRes);
// check if it is a group by tbname // check if it is a group by tbname
if (size == taosArrayGetSize(pInfo->pTableList)) { if (pInfo->retrieveType == LASTROW_RETRIEVE_TYPE_ALL) {
blockDataCleanup(pInfo->pRes); if (pInfo->indexOfBufferedRes >= pInfo->pBufferredRes->info.rows) {
SArray* pUidList = taosArrayInit(1, sizeof(tb_uid_t)); blockDataCleanup(pInfo->pBufferredRes);
int32_t code = tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pUidList); taosArrayClear(pInfo->pUidList);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code); int32_t code = tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pBufferredRes, pInfo->pSlotIds, pInfo->pUidList);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
// check for tag values
int32_t resultRows = pInfo->pBufferredRes->info.rows;
ASSERT(resultRows == taosArrayGetSize(pInfo->pUidList));
pInfo->indexOfBufferedRes = 0;
}
if (pInfo->indexOfBufferedRes < pInfo->pBufferredRes->info.rows) {
for(int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) {
SColMatchInfo* pMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i);
int32_t slotId = pMatchInfo->targetSlotId;
SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferredRes->pDataBlock, slotId);
SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, slotId);
char* p = colDataGetData(pSrc, pInfo->indexOfBufferedRes);
bool isNull = colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes);
colDataAppend(pDst, 0, p, isNull);
}
if (pInfo->pseudoExprSup.numOfExprs > 0) {
SExprSupp* pSup = &pInfo->pseudoExprSup;
addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
GET_TASKID(pTaskInfo));
}
pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes);
int64_t* groupId = taosHashGet(pTableList->map, &pInfo->pRes->info.uid, sizeof(int64_t));
pInfo->pRes->info.groupId = *groupId;
pInfo->indexOfBufferedRes += 1;
pInfo->pRes->info.rows = 1;
return pInfo->pRes;
} else {
doSetOperatorCompleted(pOperator);
return NULL;
} }
} else {
size_t totalGroups = taosArrayGetSize(pTableList->pGroupList);
while (pInfo->currentGroupIndex < totalGroups) {
SArray* pGroupTableList = taosArrayGetP(pTableList->pGroupList, pInfo->currentGroupIndex);
tsdbLastRowReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pGroupTableList,
taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader);
taosArrayClear(pInfo->pUidList);
int32_t code = tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
// check for tag values pInfo->currentGroupIndex += 1;
if (pInfo->pRes->info.rows > 0 && pInfo->pseudoExprSup.numOfExprs > 0) {
SExprSupp* pSup = &pInfo->pseudoExprSup; // check for tag values
pInfo->pRes->info.uid = *(tb_uid_t*) taosArrayGet(pUidList, 0); if (pInfo->pRes->info.rows > 0) {
addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, GET_TASKID(pTaskInfo)); if (pInfo->pseudoExprSup.numOfExprs > 0) {
SExprSupp* pSup = &pInfo->pseudoExprSup;
pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0);
STableKeyInfo* pKeyInfo = taosArrayGet(pGroupTableList, 0);
pInfo->pRes->info.groupId = pKeyInfo->groupId;
addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
GET_TASKID(pTaskInfo));
}
tsdbLastrowReaderClose(pInfo->pLastrowReader);
return pInfo->pRes;
}
} }
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; return NULL;
} else {
// todo fetch the result for each group
} }
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
} }
void destroyLastrowScanOperator(void* param, int32_t numOfOutput) { void destroyLastrowScanOperator(void* param, int32_t numOfOutput) {
SLastrowScanInfo* pInfo = (SLastrowScanInfo*)param; SLastrowScanInfo* pInfo = (SLastrowScanInfo*)param;
blockDataDestroy(pInfo->pRes); blockDataDestroy(pInfo->pRes);
tsdbLastrowReaderClose(pInfo->pLastrowReader);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
......
...@@ -516,6 +516,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt ...@@ -516,6 +516,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
// NOTE: the last parameter is the primary timestamp column // NOTE: the last parameter is the primary timestamp column
if (fmIsTimelineFunc(pCtx[i].functionId) && (j == pOneExpr->base.numOfParams - 1)) { if (fmIsTimelineFunc(pCtx[i].functionId) && (j == pOneExpr->base.numOfParams - 1)) {
pInput->pPTS = pInput->pData[j]; pInput->pPTS = pInput->pData[j];
ASSERT(pInput->pPTS->info.type == TSDB_DATA_TYPE_TIMESTAMP);
} }
ASSERT(pInput->pData[j] != NULL); ASSERT(pInput->pData[j] != NULL);
} else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) { } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
...@@ -4291,6 +4292,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, ...@@ -4291,6 +4292,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
} }
} }
} }
int32_t len = (int32_t)(pStart - (char*)keyBuf); int32_t len = (int32_t)(pStart - (char*)keyBuf);
uint64_t groupId = calcGroupId(keyBuf, len); uint64_t groupId = calcGroupId(keyBuf, len);
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &groupId, sizeof(uint64_t)); taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &groupId, sizeof(uint64_t));
...@@ -4309,6 +4311,30 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, ...@@ -4309,6 +4311,30 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
return TDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;
} }
static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) {
memset(pCond, 0, sizeof(SQueryTableDataCond));
pCond->order = TSDB_ORDER_ASC;
pCond->numOfCols = 1;
pCond->colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
if (pCond->colList == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return terrno;
}
pCond->colList->colId = 1;
pCond->colList->type = TSDB_DATA_TYPE_TIMESTAMP;
pCond->colList->bytes = sizeof(TSKEY);
pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
pCond->suid = uid;
pCond->type = BLOCK_LOAD_OFFSET_ORDER;
pCond->startVersion = -1;
pCond->endVersion = -1;
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
uint64_t queryId, uint64_t taskId, STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, STableListInfo* pTableListInfo,
const char* pUser) { const char* pUser) {
...@@ -4318,7 +4344,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4318,7 +4344,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId); int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags,
pTableScanNode->groupSort, pHandle, pTableListInfo, queryId, taskId);
if (code) { if (code) {
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
...@@ -4337,7 +4364,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4337,7 +4364,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId); int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags,
pTableScanNode->groupSort, pHandle, pTableListInfo, queryId, taskId);
if (code) { if (code) {
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
...@@ -4366,7 +4394,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4366,7 +4394,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
.maxTs = INT64_MIN, .maxTs = INT64_MIN,
}; };
if (pHandle) { if (pHandle) {
int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId); int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags,
pTableScanNode->groupSort, pHandle, pTableListInfo, queryId, taskId);
if (code) { if (code) {
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
...@@ -4406,25 +4435,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4406,25 +4435,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} }
SQueryTableDataCond cond = {0}; SQueryTableDataCond cond = {0};
int32_t code = initTableblockDistQueryCond(pBlockNode->suid, &cond);
{ if (code != TSDB_CODE_SUCCESS) {
cond.order = TSDB_ORDER_ASC; return NULL;
cond.numOfCols = 1;
cond.colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
if (cond.colList == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
cond.colList->colId = 1;
cond.colList->type = TSDB_DATA_TYPE_TIMESTAMP;
cond.colList->bytes = sizeof(TSKEY);
cond.twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
cond.suid = pBlockNode->suid;
cond.type = BLOCK_LOAD_OFFSET_ORDER;
cond.startVersion = -1;
cond.endVersion = -1;
} }
STsdbReader* pReader = NULL; STsdbReader* pReader = NULL;
...@@ -4435,31 +4448,20 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4435,31 +4448,20 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode; SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;
// int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId); int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo,
// if (code) { queryId, taskId);
// pTaskInfo->code = code;
// return NULL;
// }
int32_t code = extractTableSchemaInfo(pHandle, pScanNode->scan.uid, pTaskInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
} }
pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); code = extractTableSchemaInfo(pHandle, pScanNode->scan.uid, pTaskInfo);
if (pScanNode->scan.tableType == TSDB_SUPER_TABLE) { if (code != TSDB_CODE_SUCCESS) {
code = vnodeGetAllTableList(pHandle->vnode, pScanNode->scan.uid, pTableListInfo->pTableList); pTaskInfo->code = code;
if (code != TSDB_CODE_SUCCESS) { return NULL;
pTaskInfo->code = terrno;
return NULL;
}
} else { // Create one table group.
STableKeyInfo info = {.lastKey = 0, .uid = pScanNode->scan.uid, .groupId = 0};
taosArrayPush(pTableListInfo->pTableList, &info);
} }
return createLastrowScanOperator(pScanNode, pHandle, pTableListInfo->pTableList, pTaskInfo); return createLastrowScanOperator(pScanNode, pHandle, pTaskInfo);
} else { } else {
ASSERT(0); ASSERT(0);
} }
...@@ -4928,6 +4930,9 @@ static void doDestroyTableList(STableListInfo* pTableqinfoList) { ...@@ -4928,6 +4930,9 @@ static void doDestroyTableList(STableListInfo* pTableqinfoList) {
if (pTableqinfoList->needSortTableByGroupId) { if (pTableqinfoList->needSortTableByGroupId) {
for (int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++) { for (int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++) {
SArray* tmp = taosArrayGetP(pTableqinfoList->pGroupList, i); SArray* tmp = taosArrayGetP(pTableqinfoList->pGroupList, i);
if (tmp == pTableqinfoList->pTableList) {
continue;
}
taosArrayDestroy(tmp); taosArrayDestroy(tmp);
} }
} }
......
...@@ -2222,7 +2222,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -2222,7 +2222,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "_cache_last_row", .name = "_cache_last_row",
.type = FUNCTION_TYPE_CACHE_LAST_ROW, .type = FUNCTION_TYPE_CACHE_LAST_ROW,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.translateFunc = translateFirstLast, .translateFunc = translateFirstLast,
.getEnvFunc = getFirstLastFuncEnv, .getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup, .initFunc = functionSetup,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册