提交 b67c6531 编写于 作者: S slzhou

fix: add group to multi-merge operator

上级 732a91c9
......@@ -236,7 +236,7 @@ void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t*
int8_t needCompress);
const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData);
void blockDebugShowData(const SArray* dataBlocks, const char* flag);
void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag);
// for debug
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf);
......
......@@ -1613,7 +1613,7 @@ void blockDebugShowData(const SArray* dataBlocks, const char* flag) {
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows;
printf("%s |block type %d |child id %d|\n", flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId);
printf("%s |block type %d |child id %d|group id %zX\n", flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId);
for (int32_t j = 0; j < rows; j++) {
printf("%s |", flag);
for (int32_t k = 0; k < numOfCols; k++) {
......
......@@ -462,7 +462,7 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
#if 1
char flag[10] = {0};
snprintf(flag, 10, "level %" PRIi8, pItem->level);
blockDebugShowData(pResult, flag);
blockDebugShowDataBlocks(pResult, flag);
#endif
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2);
SSubmitReq *pReq = NULL;
......
......@@ -299,7 +299,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRp
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
// TODO
blockDebugShowData(data, __func__);
blockDebugShowDataBlocks(data, __func__);
tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
}
......
......@@ -136,6 +136,13 @@ bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colId);
*/
void* tsortGetValue(STupleHandle* pVHandle, int32_t colId);
/**
*
* @param pVHandle
* @return
*/
uint64_t tsortGetGroupId(STupleHandle* pVHandle);
/**
*
* @param pSortHandle
......
......@@ -142,7 +142,8 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
SSDataBlock* loadNextDataBlock(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
return pOperator->fpSet.getNextFn(pOperator);
SSDataBlock *pBlock = pOperator->fpSet.getNextFn(pOperator);
return pBlock;
}
// todo refactor: merged with fetch fp
......@@ -505,7 +506,9 @@ typedef struct SMultiwaySortMergeOperatorInfo {
SSDataBlock* pInputBlock;
int64_t startTs; // sort start time
uint64_t groupId;
bool hasGroupId;
uint64_t groupId;
STupleHandle *prefetchedTuple;
} SMultiwaySortMergeOperatorInfo;
int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
......@@ -560,12 +563,30 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
blockDataEnsureCapacity(p, capacity);
while (1) {
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
STupleHandle* pTupleHandle = NULL;
if (pInfo->prefetchedTuple == NULL) {
pTupleHandle = tsortNextTuple(pHandle);
} else {
pTupleHandle = pInfo->prefetchedTuple;
pInfo->groupId = tsortGetGroupId(pTupleHandle);
pInfo->prefetchedTuple = NULL;
}
if (pTupleHandle == NULL) {
break;
}
appendOneRowToDataBlock(p, pTupleHandle);
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle);
if (!pInfo->hasGroupId) {
pInfo->groupId = tupleGroupId;
pInfo->hasGroupId = true;
appendOneRowToDataBlock(p, pTupleHandle);
} else if (pInfo->groupId == tupleGroupId) {
appendOneRowToDataBlock(p, pTupleHandle);
} else {
pInfo->prefetchedTuple = pTupleHandle;
break;
}
if (p->info.rows >= capacity) {
break;
}
......@@ -608,7 +629,6 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) {
SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes,
pOperator->resultInfo.capacity, pInfo->pColMatchInfo, pOperator);
if (pBlock != NULL) {
pOperator->resultInfo.totalRows += pBlock->info.rows;
} else {
......
......@@ -752,6 +752,10 @@ void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) {
}
}
uint64_t tsortGetGroupId(STupleHandle* pVHandle) {
return pVHandle->pBlock->info.groupId;
}
SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) {
SSortExecInfo info = {0};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册