未验证 提交 8686b228 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #16582 from taosdata/feature/3_liaohj

fix(query): check node type when extract column info.
......@@ -184,9 +184,9 @@ static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pR
static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order);
static void doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
STsdbReader* pReader, bool* freeTSRow);
static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
STSRow** pTSRow);
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, STsdbReader* pReader);
......@@ -1395,7 +1395,11 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
}
}
tRowMergerGetRow(&merge, &pTSRow);
int32_t code = tRowMergerGetRow(&merge, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
taosMemoryFree(pTSRow);
......@@ -1422,7 +1426,11 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
}
tRowMergerGetRow(&merge, &pTSRow);
int32_t code = tRowMergerGetRow(&merge, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
taosMemoryFree(pTSRow);
......@@ -1456,12 +1464,16 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge);
tRowMergerGetRow(&merge, &pTSRow);
int32_t code = tRowMergerGetRow(&merge, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
taosMemoryFree(pTSRow);
tRowMergerClear(&merge);
return TSDB_CODE_SUCCESS;
return code;
} else {
ASSERT(0);
return TSDB_CODE_SUCCESS;
......@@ -1618,12 +1630,16 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
}
}
tRowMergerGetRow(&merge, &pTSRow);
int32_t code = tRowMergerGetRow(&merge, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
taosMemoryFree(pTSRow);
tRowMergerClear(&merge);
return TSDB_CODE_SUCCESS;
return code;
}
#if 0
......@@ -1907,7 +1923,11 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
tRowMergerGetRow(&merge, &pTSRow);
int32_t code = tRowMergerGetRow(&merge, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
taosMemoryFree(pTSRow);
......@@ -3026,8 +3046,8 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc
return TSDB_CODE_SUCCESS;
}
void doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
STsdbReader* pReader, bool* freeTSRow) {
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
STsdbReader* pReader, bool* freeTSRow) {
TSDBROW* pNextRow = NULL;
TSDBROW current = *pRow;
......@@ -3037,19 +3057,19 @@ void doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SAr
if (!pIter->hasVal) {
*pTSRow = current.pTSRow;
*freeTSRow = false;
return;
return TSDB_CODE_SUCCESS;
} else { // has next point in mem/imem
pNextRow = getValidMemRow(pIter, pDelList, pReader);
if (pNextRow == NULL) {
*pTSRow = current.pTSRow;
*freeTSRow = false;
return;
return TSDB_CODE_SUCCESS;
}
if (current.pTSRow->ts != pNextRow->pTSRow->ts) {
*pTSRow = current.pTSRow;
*freeTSRow = false;
return;
return TSDB_CODE_SUCCESS;
}
}
}
......@@ -3069,13 +3089,17 @@ void doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SAr
tRowMergerAdd(&merge, pNextRow, pTSchema1);
doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader);
tRowMergerGetRow(&merge, pTSRow);
tRowMergerClear(&merge);
int32_t code = tRowMergerGetRow(&merge, pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
tRowMergerClear(&merge);
*freeTSRow = true;
return TSDB_CODE_SUCCESS;
}
void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
STSRow** pTSRow) {
SRowMerger merge = {0};
......@@ -3100,7 +3124,8 @@ void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlo
doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
}
tRowMergerGetRow(&merge, pTSRow);
int32_t code = tRowMergerGetRow(&merge, pTSRow);
return code;
}
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey,
......@@ -3130,28 +3155,30 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow);
int32_t code = TSDB_CODE_SUCCESS;
if (ik.ts != k.ts) {
if (((ik.ts < k.ts) && asc) || ((ik.ts > k.ts) && (!asc))) { // ik.ts < k.ts
doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
code = doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
} else if (((k.ts < ik.ts) && asc) || ((k.ts > ik.ts) && (!asc))) {
doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow);
code = doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow);
}
} else { // ik.ts == k.ts
doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
*freeTSRow = true;
code = doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
return TSDB_CODE_SUCCESS;
return code;
}
if (pBlockScanInfo->iter.hasVal && pRow != NULL) {
doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow);
return TSDB_CODE_SUCCESS;
return doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow);
}
if (pBlockScanInfo->iiter.hasVal && piRow != NULL) {
doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
return TSDB_CODE_SUCCESS;
return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
}
return TSDB_CODE_SUCCESS;
......
......@@ -938,15 +938,17 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
for (int32_t i = 0; i < numOfCols; ++i) {
STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
SColMatchInfo c = {0};
c.output = true;
c.colId = pColNode->colId;
c.srcSlotId = pColNode->slotId;
c.matchType = type;
c.targetSlotId = pNode->slotId;
taosArrayPush(pList, &c);
if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
SColMatchInfo c = {0};
c.output = true;
c.colId = pColNode->colId;
c.srcSlotId = pColNode->slotId;
c.matchType = type;
c.targetSlotId = pNode->slotId;
taosArrayPush(pList, &c);
}
}
*numOfOutputCols = 0;
......
......@@ -263,8 +263,6 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
ASSERT(pSup->resultRowSize > 0);
pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
initResultRow(pResult);
// add a new result set for a new group
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
......@@ -817,13 +815,6 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB
} else {
pInput->colDataAggIsSet = false;
}
// set the statistics data for primary time stamp column
// if (pCtx->functionId == FUNCTION_SPREAD && pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
// pCtx->isAggSet = true;
// pCtx->agg.min = pBlock->info.window.skey;
// pCtx->agg.max = pBlock->info.window.ekey;
// }
}
bool isTaskKilled(SExecTaskInfo* pTaskInfo) {
......@@ -860,146 +851,6 @@ STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int
return win;
}
#if 0
static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
bool hasFirstLastFunc = false;
bool hasOtherFunc = false;
if (status == BLK_DATA_DATA_LOAD || status == BLK_DATA_FILTEROUT) {
return status;
}
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = getExprFunctionId(&pQuery->pExpr1[i]);
if (functionId == FUNCTION_TS || functionId == FUNCTION_TS_DUMMY || functionId == FUNCTION_TAG ||
functionId == FUNCTION_TAG_DUMMY) {
continue;
}
if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_LAST_DST) {
hasFirstLastFunc = true;
} else {
hasOtherFunc = true;
}
}
if (hasFirstLastFunc && status == BLK_DATA_NOT_LOAD) {
if (!hasOtherFunc) {
return BLK_DATA_FILTEROUT;
} else {
return BLK_DATA_DATA_LOAD;
}
}
return status;
}
#endif
// static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableReq* pQueryMsg, bool stableQuery) {
// STaskAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
//
// // in case of point-interpolation query, use asc order scan
// char msg[] = "QInfo:0x%"PRIx64" scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%"
// PRId64
// "-%" PRId64 ", new qrange:%" PRId64 "-%" PRId64;
//
// // todo handle the case the the order irrelevant query type mixed up with order critical query type
// // descending order query for last_row query
// if (isFirstLastRowQuery(pQueryAttr)) {
// //qDebug("QInfo:0x%"PRIx64" scan order changed for last_row query, old:%d, new:%d", pQInfo->qId,
// pQueryAttr->order.order, TSDB_ORDER_ASC);
//
// pQueryAttr->order.order = TSDB_ORDER_ASC;
// if (pQueryAttr->window.skey > pQueryAttr->window.ekey) {
// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
// }
//
// pQueryAttr->needReverseScan = false;
// return;
// }
//
// if (pQueryAttr->groupbyColumn && pQueryAttr->order.order == TSDB_ORDER_DESC) {
// pQueryAttr->order.order = TSDB_ORDER_ASC;
// if (pQueryAttr->window.skey > pQueryAttr->window.ekey) {
// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
// }
//
// pQueryAttr->needReverseScan = false;
// doUpdateLastKey(pQueryAttr);
// return;
// }
//
// if (pQueryAttr->pointInterpQuery && pQueryAttr->interval.interval == 0) {
// if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
// //qDebug(msg, pQInfo->qId, "interp", pQueryAttr->order.order, TSDB_ORDER_ASC, pQueryAttr->window.skey,
// pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey); TSWAP(pQueryAttr->window.skey,
// pQueryAttr->window.ekey, TSKEY);
// }
//
// pQueryAttr->order.order = TSDB_ORDER_ASC;
// return;
// }
//
// if (pQueryAttr->interval.interval == 0) {
// if (onlyFirstQuery(pQueryAttr)) {
// if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
// //qDebug(msg, pQInfo->qId, "only-first", pQueryAttr->order.order, TSDB_ORDER_ASC, pQueryAttr->window.skey,
//// pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
//
// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
// doUpdateLastKey(pQueryAttr);
// }
//
// pQueryAttr->order.order = TSDB_ORDER_ASC;
// pQueryAttr->needReverseScan = false;
// } else if (onlyLastQuery(pQueryAttr) && notContainSessionOrStateWindow(pQueryAttr)) {
// if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
// //qDebug(msg, pQInfo->qId, "only-last", pQueryAttr->order.order, TSDB_ORDER_DESC, pQueryAttr->window.skey,
//// pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
//
// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
// doUpdateLastKey(pQueryAttr);
// }
//
// pQueryAttr->order.order = TSDB_ORDER_DESC;
// pQueryAttr->needReverseScan = false;
// }
//
// } else { // interval query
// if (stableQuery) {
// if (onlyFirstQuery(pQueryAttr)) {
// if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
// //qDebug(msg, pQInfo->qId, "only-first stable", pQueryAttr->order.order, TSDB_ORDER_ASC,
//// pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey,
/// pQueryAttr->window.skey);
//
// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
// doUpdateLastKey(pQueryAttr);
// }
//
// pQueryAttr->order.order = TSDB_ORDER_ASC;
// pQueryAttr->needReverseScan = false;
// } else if (onlyLastQuery(pQueryAttr)) {
// if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
// //qDebug(msg, pQInfo->qId, "only-last stable", pQueryAttr->order.order, TSDB_ORDER_DESC,
//// pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey,
/// pQueryAttr->window.skey);
//
// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
// doUpdateLastKey(pQueryAttr);
// }
//
// pQueryAttr->order.order = TSDB_ORDER_DESC;
// pQueryAttr->needReverseScan = false;
// }
// }
// }
//}
#if 0
static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockInfo) {
STimeWindow w = {0};
......@@ -1225,24 +1076,6 @@ static void updateTableQueryInfoForReverseScan(STableQueryInfo* pTableQueryInfo)
if (pTableQueryInfo == NULL) {
return;
}
// TSWAP(pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey);
// pTableQueryInfo->lastKey = pTableQueryInfo->win.skey;
// SWITCH_ORDER(pTableQueryInfo->cur.order);
// pTableQueryInfo->cur.vgroupIndex = -1;
// set the index to be the end slot of result rows array
// SResultRowInfo* pResultRowInfo = &pTableQueryInfo->resInfo;
// if (pResultRowInfo->size > 0) {
// pResultRowInfo->curPos = pResultRowInfo->size - 1;
// } else {
// pResultRowInfo->curPos = -1;
// }
}
void initResultRow(SResultRow* pResultRow) {
// pResultRow->pEntryInfo = (struct SResultRowEntryInfo*)((char*)pResultRow + sizeof(SResultRow));
}
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
......@@ -1255,15 +1088,6 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
}
}
void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) {
if (pTableQueryInfo == NULL) {
return;
}
// taosVariantDestroy(&pTableQueryInfo->tag);
// cleanupResultRowInfo(&pTableQueryInfo->resInfo);
}
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
bool init = false;
for (int32_t i = 0; i < numOfOutput; ++i) {
......@@ -3022,7 +2846,6 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) {
resultRow->offset = pOffset;
offset += valueLen;
initResultRow(resultRow);
pInfo->resultRowInfo.cur = (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset};
// releaseBufPage(pSup->pResultBuf, getBufPage(pSup->pResultBuf, pageId));
}
......
......@@ -3873,7 +3873,6 @@ static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pRes
if (*pResult == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
initResultRow(*pResult);
// add a new result set for a new group
pWinInfo->pos.pageId = (*pResult)->pageId;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册