提交 d200fe51 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 8bf01cde
......@@ -137,6 +137,9 @@ static FORCE_INLINE void colDataAppendNNULL(SColumnInfoData* pColumnInfoData, ui
for (int32_t i = start; i < start + nRows; ++i) {
colDataSetNull_f(pColumnInfoData->nullbitmap, i);
}
int32_t bytes = pColumnInfoData->info.bytes;
memset(pColumnInfoData->pData + start * bytes, 0, nRows * bytes);
}
pColumnInfoData->hasNull = true;
......
......@@ -1126,26 +1126,27 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF
}
void blockDataCleanup(SSDataBlock* pDataBlock) {
pDataBlock->info.rows = 0;
pDataBlock->info.groupId = 0;
SDataBlockInfo* pInfo = &pDataBlock->info;
pDataBlock->info.window.ekey = 0;
pDataBlock->info.window.skey = 0;
pInfo->rows = 0;
pInfo->groupId = 0;
pInfo->window.ekey = 0;
pInfo->window.skey = 0;
if (pDataBlock->info.capacity == 0) {
if (pInfo->capacity == 0) {
return;
}
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
colInfoDataCleanup(p, pDataBlock->info.capacity);
colInfoDataCleanup(p, pInfo->capacity);
}
}
static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows, bool clearPayload) {
ASSERT(numOfRows > 0 && pBlockInfo->capacity >= pBlockInfo->rows);
if (numOfRows < pBlockInfo->capacity) {
if (numOfRows <= pBlockInfo->capacity) {
return TSDB_CODE_SUCCESS;
}
......@@ -1193,6 +1194,7 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo*
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) {
pColumn->hasNull = false;
if (IS_VAR_DATA_TYPE(pColumn->info.type)) {
pColumn->varmeta.length = 0;
if (pColumn->varmeta.offset != NULL) {
......@@ -1212,23 +1214,20 @@ int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows,
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
int32_t code = 0;
if (numOfRows == 0) {
if (numOfRows == 0 || numOfRows <= pDataBlock->info.capacity) {
return TSDB_CODE_SUCCESS;
}
if (pDataBlock->info.capacity < numOfRows) {
pDataBlock->info.capacity = numOfRows;
}
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
code = doEnsureCapacity(p, &pDataBlock->info, numOfRows, false);
code = doEnsureCapacity(p, &pDataBlock->info, numOfRows, true);
if (code) {
return code;
}
}
pDataBlock->info.capacity = numOfRows;
return TSDB_CODE_SUCCESS;
}
......
......@@ -1065,8 +1065,8 @@ void tdSTSRowIterInit(STSRowIter *pIter, STSchema *pSchema) {
void tTSRowGetVal(STSRow *pRow, STSchema *pTSchema, int16_t iCol, SColVal *pColVal) {
STColumn *pTColumn = &pTSchema->columns[iCol];
SCellVal cv;
SValue value;
SCellVal cv = {0};
SValue value = {0};
ASSERT((pTColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) || (iCol > 0));
......
......@@ -240,25 +240,22 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
return -1;
}
int32_t numOfCols = pShow->pMeta->numOfColumns;
SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
int32_t numOfCols = pShow->pMeta->numOfColumns;
SSDataBlock *pBlock = createDataBlock();
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData idata = {0};
SSchema *p = &pShow->pMeta->pSchemas[i];
SSchema *p = &pShow->pMeta->pSchemas[i];
idata.info.bytes = p->bytes;
idata.info.type = p->type;
idata.info.colId = p->colId;
taosArrayPush(pBlock->pDataBlock, &idata);
if (IS_VAR_DATA_TYPE(p->type)) {
pBlock->info.hasVarCol = true;
}
blockDataAppendColInfo(pBlock, &idata);
}
blockDataEnsureCapacity(pBlock, rowsToRead);
if (mndCheckRetrieveFinished(pShow)) {
mDebug("show:0x%" PRIx64 ", read finished, numOfRows:%d", pShow->id, pShow->numOfRows);
rowsRead = 0;
......
......@@ -544,7 +544,6 @@ static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity)
taosMemoryFree(pResBlock);
return NULL;
}
return pResBlock;
}
......
......@@ -336,8 +336,6 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SGroupbyOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pRes = pInfo->binfo.pRes;
if (pOperator->status == OP_RES_TO_RETURN) {
return buildGroupResultDataBlock(pOperator);
}
......@@ -390,7 +388,6 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
}
}
#endif
blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, 0);
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
......@@ -422,6 +419,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode*
}
initResultSizeInfo(&pOperator->resultInfo, 4096);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
......
......@@ -373,17 +373,6 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo
}
}
static void ensureBlockCapacity(SSDataBlock* pBlock, int32_t capacity) {
// keep the value of rows temporarily
int32_t rows = pBlock->info.rows;
pBlock->info.rows = 0;
blockDataEnsureCapacity(pBlock, capacity);
// restore the rows number
pBlock->info.rows = rows;
}
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
uint32_t* status) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
......@@ -414,11 +403,6 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
} else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
ensureBlockCapacity(pBlock, pBlock->info.rows);
}
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
pCost->skipBlocks += 1;
return TSDB_CODE_SUCCESS;
......@@ -429,10 +413,6 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead
qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
ensureBlockCapacity(pBlock, pBlock->info.rows);
}
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
return TSDB_CODE_SUCCESS;
} else {
......@@ -482,7 +462,6 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
return terrno;
}
ensureBlockCapacity(pBlock, pBlock->info.rows);
relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true);
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
......@@ -636,9 +615,13 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
}
blockDataCleanup(pBlock);
SDataBlockInfo* pBInfo = &pBlock->info;
tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBInfo->rows, &pBInfo->uid, &pBInfo->window);
int32_t rows = 0;
tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &rows, &pBInfo->uid, &pBInfo->window);
blockDataEnsureCapacity(pBlock, rows);
pBInfo->rows = rows;
ASSERT(pBInfo->uid != 0);
pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.uid);
......@@ -686,7 +669,6 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
SSDataBlock* p = doTableScanImpl(pOperator);
if (p != NULL) {
ASSERT(p->info.uid != 0);
return p;
}
......@@ -874,9 +856,12 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pInfo->sample.seed = taosGetTimestampSec();
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pResBlock = createResDataBlock(pDescNode);
pInfo->pFilterNode = pScanNode->node.pConditions;
blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
pInfo->pFilterNode = pScanNode->node.pConditions;
if (pInfo->pFilterNode != NULL) {
code = filterInitFromNode((SNode*)pInfo->pFilterNode, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
......@@ -1002,12 +987,10 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo);
varDataSetLen(p, len);
blockDataEnsureCapacity(pBlock, 1);
colDataAppend(pColInfo, 0, p, false);
taosMemoryFree(p);
pBlock->info.rows = 1;
pOperator->status = OP_EXEC_DONE;
return pBlock;
}
......@@ -1073,7 +1056,9 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
pInfo->readHandle = *readHandle;
pInfo->uid = pBlockScanNode->suid;
pInfo->pResBlock = createResDataBlock(pBlockScanNode->node.pOutputDataBlockDesc);
blockDataEnsureCapacity(pInfo->pResBlock, 1);
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols);
......@@ -4601,7 +4586,6 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
blockDataCleanup(pResBlock);
blockDataEnsureCapacity(pResBlock, capacity);
while (1) {
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
......@@ -4761,7 +4745,10 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->tableListInfo = pTableListInfo;
pInfo->scanFlag = MAIN_SCAN;
initResultSizeInfo(&pOperator->resultInfo, 1024);
pInfo->pResBlock = createResDataBlock(pDescNode);
blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
pInfo->pSortInfo = generateSortByTsInfo(pInfo->matchInfo.pList, pInfo->cond.order);
......@@ -4778,7 +4765,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pOperator->info = pInfo;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
initResultSizeInfo(&pOperator->resultInfo, 1024);
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, NULL,
destroyTableMergeScanOperatorInfo, getTableMergeScanExplainExecInfo);
......
......@@ -579,12 +579,13 @@ int32_t doOpenMultiwayMergeOperator(SOperatorInfo* pOperator) {
return TSDB_CODE_SUCCESS;
}
SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
SArray* pColMatchInfo, SOperatorInfo* pOperator) {
SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, SArray* pColMatchInfo,
SOperatorInfo* pOperator) {
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
blockDataCleanup(pDataBlock);
int32_t capacity = pDataBlock->info.capacity;
SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
if (p == NULL) {
......@@ -679,8 +680,7 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, code);
}
SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes,
pOperator->resultInfo.capacity, pInfo->matchInfo.pList, pOperator);
SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->matchInfo.pList, pOperator);
if (pBlock != NULL) {
pOperator->resultInfo.totalRows += pBlock->info.rows;
} else {
......@@ -742,7 +742,9 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
SSDataBlock* pInputBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc);
initResultSizeInfo(&pOperator->resultInfo, 1024);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
pInfo->groupSort = pMergePhyNode->groupSort;
pInfo->pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册