提交 649cf7e5 编写于 作者: H Haojun Liao

fix(query): support last_row(tags) for super table query.

上级 2c1efc39
......@@ -138,7 +138,7 @@ void *tsdbGetIdx(SMeta *pMeta);
void *tsdbGetIvtIdx(SMeta *pMeta);
int32_t tsdbLastRowReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader);
int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds);
int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray* pTableUids);
int32_t tsdbLastrowReaderClose(void *pReader);
int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
......
......@@ -104,7 +104,7 @@ int32_t tsdbLastrowReaderClose(void* pReader) {
return TSDB_CODE_SUCCESS;
}
int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds) {
int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) {
if (pReader == NULL || pResBlock == NULL) {
return TSDB_CODE_INVALID_PARA;
}
......@@ -141,14 +141,15 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t
// appended or not.
if (internalResult) {
pResBlock->info.rows -= 1;
taosArrayClear(pTableUidList);
}
saveOneRow(pRow, pResBlock, pr, slotIds);
taosArrayPush(pTableUidList, &pKeyInfo->uid);
internalResult = true;
lastKey = pRow->ts;
}
// taosMemoryFree(pRow);
tsdbCacheRelease(lruCache, h);
}
} else if (pr->type == LASTROW_RETRIEVE_TYPE_ALL) {
......@@ -171,6 +172,7 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t
// tsdbCacheLastArray2Row(pLast, &pRow, pr->pSchema);
saveOneRow(pRow, pResBlock, pr, slotIds);
taosArrayPush(pTableUidList, &pKeyInfo->uid);
// taosMemoryFree(pRow);
tsdbCacheRelease(lruCache, h);
......
......@@ -319,6 +319,7 @@ typedef struct SLastrowScanInfo {
void *pLastrowReader;
SArray *pColMatchInfo;
int32_t *pSlotIds;
SExprSupp pseudoExprSup;
} SLastrowScanInfo;
typedef enum EStreamScanMode {
......@@ -787,6 +788,8 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul
void doSetOperatorCompleted(SOperatorInfo* pOperator);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
SSDataBlock* pBlock, const char* idStr);
void cleanupAggSup(SAggSupporter* pAggSup);
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
......
......@@ -45,20 +45,20 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead
int32_t numOfCols = 0;
pInfo->pColMatchInfo = extractColMatchInfo(pScanNode->pScanCols, pScanNode->node.pOutputDataBlockDesc, &numOfCols,
COL_MATCH_FROM_COL_ID);
int32_t* pCols = taosMemoryMalloc(numOfCols * sizeof(int32_t));
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) {
SColMatchInfo* pColMatch = taosArrayGet(pInfo->pColMatchInfo, i);
pCols[i] = pColMatch->colId;
}
int32_t code = extractTargetSlotId(pInfo->pColMatchInfo, pTaskInfo, &pInfo->pSlotIds);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
tsdbLastRowReaderOpen(readHandle->vnode, LASTROW_RETRIEVE_TYPE_ALL, pTableList, taosArrayGetSize(pInfo->pColMatchInfo),
tsdbLastRowReaderOpen(readHandle->vnode, LASTROW_RETRIEVE_TYPE_SINGLE, pTableList, taosArrayGetSize(pInfo->pColMatchInfo),
&pInfo->pLastrowReader);
taosMemoryFree(pCols);
if (pScanNode->pScanPseudoCols != NULL) {
SExprSupp* pPseudoExpr = &pInfo->pseudoExprSup;
pPseudoExpr->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pPseudoExpr->numOfExprs);
pPseudoExpr->pCtx = createSqlFunctionCtx(pPseudoExpr->pExprInfo, pPseudoExpr->numOfExprs, &pPseudoExpr->rowEntryInfoOffset);
}
pOperator->name = "LastrowScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN;
......@@ -100,7 +100,20 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) {
// check if it is a group by tbname
if (size == taosArrayGetSize(pInfo->pTableList)) {
blockDataCleanup(pInfo->pRes);
tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds);
SArray* pUidList = taosArrayInit(1, sizeof(tb_uid_t));
int32_t code = tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pUidList);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
// check for tag values
if (pInfo->pRes->info.rows > 0 && pInfo->pseudoExprSup.numOfExprs > 0) {
SExprSupp* pSup = &pInfo->pseudoExprSup;
pInfo->pRes->info.uid = *(tb_uid_t*) taosArrayGet(pUidList, 0);
addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, GET_TASKID(pTaskInfo));
}
doSetOperatorCompleted(pOperator);
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
} else {
// todo fetch the result for each group
......
......@@ -39,8 +39,6 @@ static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capac
static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
const char* dbName);
static int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
SSDataBlock* pBlock, const char* idStr);
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
......@@ -320,8 +318,6 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int
int32_t dstSlotId = pExpr->base.resSchema.slotId;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
colInfoDataEnsureCapacity(pColInfoData, pBlock->info.rows);
colInfoDataCleanup(pColInfoData, pBlock->info.rows);
int32_t functionId = pExpr->pExpr->_function.functionId;
......
......@@ -80,11 +80,12 @@ typedef struct STopBotRes {
} STopBotRes;
typedef struct SFirstLastRes {
bool hasResult;
bool hasResult;
// used for last_row function only, isNullRes in SResultRowEntry can not be passed to downstream.So,
// this attribute is required
bool isNull;
bool isNull;
int32_t bytes;
int64_t ts;
char buf[];
} SFirstLastRes;
......@@ -2951,6 +2952,7 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pResInfo);
colDataAppend(pCol, pBlock->info.rows, pRes->buf, pRes->isNull||pResInfo->isNullRes);
// handle selectivity
STuplePos* pTuplePos = (STuplePos*)(pRes->buf + pRes->bytes + sizeof(TSKEY));
setSelectivityValue(pCtx, pBlock, pTuplePos, pBlock->info.rows);
......@@ -5988,7 +5990,7 @@ int32_t lastrowFunction(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
int32_t type = pInputCol->info.type;
int32_t type = pInputCol->info.type;
int32_t bytes = pInputCol->info.bytes;
pInfo->bytes = bytes;
......@@ -5999,7 +6001,7 @@ int32_t lastrowFunction(SqlFunctionCtx* pCtx) {
char* data = colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i);
if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) < cts) {
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
if (colDataIsNull_s(pInputCol, i)) {
pInfo->isNull = true;
......@@ -6012,8 +6014,7 @@ int32_t lastrowFunction(SqlFunctionCtx* pCtx) {
memcpy(pInfo->buf, data, bytes);
}
*(TSKEY*)(pInfo->buf + bytes) = cts;
pInfo->ts = cts;
pInfo->hasResult = true;
pResInfo->numOfRes = 1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册