提交 d9aa35ec 编写于 作者: H Haojun Liao

fix(query): revise the output data column info in the exchange operator to be...

fix(query): revise the output data column info in the exchange operator to be adaptable for the case of extensive datablocks.
上级 8f3ecd52
......@@ -1039,6 +1039,7 @@ typedef struct {
int8_t compressed;
int32_t compLen;
int32_t numOfRows;
int32_t numOfCols;
char data[];
} SRetrieveTableRsp;
......
......@@ -45,6 +45,7 @@ typedef struct SInputData {
typedef struct SOutputData {
int32_t numOfRows;
int32_t numOfCols;
int8_t compressed;
char* pData;
bool queryEnd;
......
......@@ -14,7 +14,6 @@
*/
#define _DEFAULT_SOURCE
#include "mndPerfSchema.h"
#include "mndInt.h"
#include "systable.h"
......
......@@ -695,7 +695,7 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim
SExprInfo* pExpr, int32_t numOfOutput);
#endif
void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
int32_t numOfOutput, SArray* pPseudoList);
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, bool createDummyCol);
......
......@@ -31,6 +31,7 @@ typedef struct SDataDispatchBuf {
typedef struct SDataCacheEntry {
int32_t dataLen;
int32_t numOfRows;
int32_t numOfCols;
int8_t compressed;
char data[];
} SDataCacheEntry;
......@@ -76,6 +77,7 @@ static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputDat
SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
pEntry->compressed = (int8_t)needCompress(pInput->pData, numOfCols);
pEntry->numOfRows = pInput->pData->info.rows;
pEntry->numOfCols = pInput->pData->info.numOfCols;
pEntry->dataLen = 0;
pBuf->useSize = sizeof(SRetrieveTableRsp);
......@@ -169,6 +171,7 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData);
memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
pOutput->numOfRows = pEntry->numOfRows;
pOutput->numOfCols = pEntry->numOfCols;
pOutput->compressed = pEntry->compressed;
taosMemoryFreeClear(pDispatcher->nextOutput.pData); // todo persistent
pOutput->bufStatus = updateStatus(pDispatcher);
......
......@@ -1177,7 +1177,7 @@ static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, S
}
}
void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
int32_t numOfOutput, SArray* pPseudoList) {
setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
pResult->info.groupId = pSrcBlock->info.groupId;
......@@ -1217,7 +1217,7 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock*
taosArrayPush(pBlockList, &pSrcBlock);
SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
SColumnInfoData idata = {.info = pResColData->info};
SColumnInfoData idata = {.info = pResColData->info, .hasNull = true};
SScalarParam dest = {.columnData = &idata};
scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
......@@ -1254,10 +1254,14 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock*
taosArrayPush(pBlockList, &pSrcBlock);
SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
SColumnInfoData idata = {.info = pResColData->info};
SColumnInfoData idata = {.info = pResColData->info, .hasNull = true};
SScalarParam dest = {.columnData = &idata};
scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
int32_t code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pBlockList);
return code;
}
int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
colDataMergeCol(pResColData, startOffset, &idata, dest.numOfRows);
......@@ -1273,6 +1277,8 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock*
if (!createNewColModel) {
pResult->info.rows += numOfRows;
}
return TSDB_CODE_SUCCESS;
}
void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SArray* pDataBlock, TSKEY prevTs,
......@@ -3668,6 +3674,7 @@ int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code)
SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
pRsp->numOfRows = htonl(pRsp->numOfRows);
pRsp->compLen = htonl(pRsp->compLen);
pRsp->numOfCols = htonl(pRsp->numOfCols);
pRsp->useconds = htobe64(pRsp->useconds);
} else {
pSourceDataInfo->code = code;
......@@ -3951,7 +3958,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp;
code =
setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data,
pTableRsp->compLen, pOperator->numOfOutput, startTs, &pDataInfo->totalRows, NULL);
pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL);
if (code != 0) {
goto _error;
}
......@@ -4086,7 +4093,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp;
int32_t code =
setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data,
pTableRsp->compLen, pOperator->numOfOutput, startTs, &pDataInfo->totalRows, NULL);
pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL);
if (pRsp->completed == 1) {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64
......@@ -4771,8 +4778,12 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
// there is an scalar expression that needs to be calculated before apply the group aggregation.
if (pAggInfo->pScalarExprInfo != NULL) {
projectApplyFunctions(pAggInfo->pScalarExprInfo, pBlock, pBlock, pAggInfo->pScalarCtx, pAggInfo->numOfScalarExpr,
int32_t code = projectApplyFunctions(pAggInfo->pScalarExprInfo, pBlock, pBlock, pAggInfo->pScalarCtx, pAggInfo->numOfScalarExpr,
NULL);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
longjmp(pTaskInfo->env, pTaskInfo->code);
}
}
// the pDataBlock are always the same one, no need to call this again
......
......@@ -184,6 +184,8 @@ int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo,
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->skipBlocks += 1;
pBlock->info.blockId = 0;
return TSDB_CODE_SUCCESS;
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
pCost->loadBlockStatis += 1;
......@@ -204,6 +206,7 @@ int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo,
pBlock->pBlockAgg[pColMatchInfo->targetSlotId] = pColAgg[i];
}
pBlock->info.blockId = 0;
return TSDB_CODE_SUCCESS;
} else { // failed to load the block sma data, data block statistics does not exist, load data block instead
*status = FUNC_DATA_REQUIRED_DATA_LOAD;
......
......@@ -34,6 +34,7 @@ void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete)
rsp->compressed = input->compressed;
rsp->compLen = htonl(len);
rsp->numOfRows = htonl(input->numOfRows);
rsp->numOfCols = htonl(input->numOfCols);
}
void qwFreeFetchRsp(void *msg) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册