提交 59725d1d 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 8e2edd6e
......@@ -123,15 +123,13 @@ int32_t tsdbRowCmprFn(const void *p1, const void *p2);
int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);
void tsdbRowClose(STSDBRowIter *pIter);
SColVal *tsdbRowIterNext(STSDBRowIter *pIter);
// SRowMerger
int32_t tsdbRowMergerInit(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema);
int32_t tsdbRowMergerInit(SRowMerger *pMerger, STSchema *pSchema);
int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
void tsdbRowMergerClear(SRowMerger *pMerger);
int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow);
int32_t tsdbRowMergerInit_rv(SRowMerger* pMerger, STSchema *pSchema);
void tsdbRowMergerClear_rv(SRowMerger* pMerger);
void tsdbRowMergerCleanup_rv(SRowMerger* pMerger);
void tsdbRowMergerClear(SRowMerger *pMerger);
void tsdbRowMergerCleanup(SRowMerger *pMerger);
// TABLEID
int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
......
......@@ -1464,7 +1464,6 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
pBlockIter->pTableMap = pReader->status.pTableMap;
// access data blocks according to the offset of each block in asc/desc order.
// int32_t numOfTables = (int32_t)tSimpleHashGetSize(pReader->status.pTableMap);
int32_t numOfTables = taosArrayGetSize(pTableList);
int64_t st = taosGetTimestampUs();
......@@ -1474,21 +1473,10 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
}
int32_t cnt = 0;
// void* ptr = NULL;
// int32_t iter = 0;
// while (1) {
// ptr = tSimpleHashIterate(pReader->status.pTableMap, ptr, &iter);
// if (ptr == NULL) {
// break;
// }
for(int32_t i = 0; i < numOfTables; ++i) {
for (int32_t i = 0; i < numOfTables; ++i) {
STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, i);
ASSERT(pTableScanInfo->pBlockList != NULL && taosArrayGetSize(pTableScanInfo->pBlockList) > 0);
// if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
// continue;
// }
size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
sup.numOfBlocksPerTable[sup.numOfTables] = num;
......@@ -1876,8 +1864,6 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
pScanInfo->lastKeyInStt = key;
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, key, ver, pLastBlockReader->order, pVerRange)) {
// the qualifed ts may equal to k.ts, only a greater version one.
// here we need to fallback one step.
return true;
}
}
......@@ -1939,7 +1925,7 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
return NULL;
}
code = tsdbRowMergerInit_rv(&pReader->status.merger, pReader->pSchema);
code = tsdbRowMergerInit(&pReader->status.merger, pReader->pSchema);
if (code != 0) {
terrno = code;
return NULL;
......@@ -2021,7 +2007,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (pReader->order == TSDB_ORDER_ASC) {
if (minKey == key) {
init = true; // todo check if pReader->pSchema is null or not
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema);
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2031,15 +2017,15 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == tsLast) {
TSDBROW* fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
if (init) {
tsdbRowMergerAdd(&pReader->status.merger, fRow1, NULL);
tsdbRowMergerAdd(pMerger, fRow1, NULL);
} else {
init = true;
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, fRow1, pReader->pSchema);
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &pReader->status.merger, &pReader->verRange, pReader->idStr);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->verRange, pReader->idStr);
}
if (minKey == k.ts) {
......@@ -2051,7 +2037,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
tsdbRowMergerAdd(pMerger, pRow, pSchema);
} else {
init = true;
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema);
int32_t code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2069,7 +2055,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return terrno;
}
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema);
int32_t code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2086,7 +2072,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
tsdbRowMergerAdd(pMerger, fRow1, NULL);
} else {
init = true;
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, fRow1, pReader->pSchema);
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2099,7 +2085,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
tsdbRowMergerAdd(pMerger, &fRow, NULL);
} else {
init = true;
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema);
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2116,7 +2102,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear_rv(pMerger);
tsdbRowMergerClear(pMerger);
return code;
}
......@@ -2149,7 +2135,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
pBlockScanInfo->lastKey = tsLastBlock;
return TSDB_CODE_SUCCESS;
} else {
code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema);
code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2166,14 +2152,14 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear_rv(pMerger);
tsdbRowMergerClear(pMerger);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
} else { // not merge block data
code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema);
code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2193,7 +2179,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear_rv(pMerger);
tsdbRowMergerClear(pMerger);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -2223,7 +2209,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
} else if (key == ts) {
SRow* pTSRow = NULL;
SRowMerger* pMerger = &pReader->status.merger;
SRowMerger* pMerger = pMerger;
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
......@@ -2245,7 +2231,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear_rv(pMerger);
tsdbRowMergerClear(pMerger);
return code;
} else {
return TSDB_CODE_SUCCESS;
......@@ -2454,7 +2440,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear_rv(pMerger);
tsdbRowMergerClear(pMerger);
return code;
}
......@@ -2627,7 +2613,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear_rv(&pReader->status.merger);
tsdbRowMergerClear(&pReader->status.merger);
return code;
}
}
......@@ -3913,7 +3899,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
}
pResRow->type = TSDBROW_ROW_FMT;
tsdbRowMergerClear_rv(&pReader->status.merger);
tsdbRowMergerClear(&pReader->status.merger);
*freeTSRow = true;
return TSDB_CODE_SUCCESS;
......@@ -3972,7 +3958,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
}
int32_t code = tsdbRowMergerGetRow(pMerger, pTSRow);
tsdbRowMergerClear_rv(pMerger);
tsdbRowMergerClear(pMerger);
return code;
}
......@@ -4097,12 +4083,8 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S
int32_t code = TSDB_CODE_SUCCESS;
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
// ASSERT (pReader->suppInfo.colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID);// {
// SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[i]);
// ((int64_t*)pColData->pData)[outputRowIndex] = pBlockData->aTSKEY[rowIndex];
((int64_t*)pReader->status.pPrimaryTsCol->pData)[outputRowIndex] = pBlockData->aTSKEY[rowIndex];
i += 1;
// }
i += 1;
SColVal cv = {0};
int32_t numOfInputCols = pBlockData->nColData;
......@@ -4355,7 +4337,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
}
if (pReader->pSchema != NULL) {
tsdbRowMergerInit_rv(&pReader->status.merger, pReader->pSchema);
tsdbRowMergerInit(&pReader->status.merger, pReader->pSchema);
}
pReader->pSchemaMap = tSimpleHashInit(8, taosFastHash);
......@@ -4515,7 +4497,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
taosMemoryFree(pReader->idStr);
tsdbRowMergerCleanup_rv(&pReader->status.merger);
tsdbRowMergerCleanup(&pReader->status.merger);
taosMemoryFree(pReader->pSchema);
tSimpleHashCleanup(pReader->pSchemaMap);
......@@ -4562,7 +4544,6 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) {
}
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
// pInfo->lastKey = ts;
}
} else {
// resetDataBlockScanInfo excluding lastKey
......@@ -4585,7 +4566,6 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) {
}
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
// pInfo->lastKey = ts;
}
pBlockScanInfo = pStatus->pTableIter == NULL ? NULL : *pStatus->pTableIter;
......@@ -5134,8 +5114,6 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
const int32_t numOfBuckets = 20.0;
// find the start data block in file
// find the start data block in file
tsdbAcquireReader(pReader);
if (pReader->suspended) {
......
......@@ -637,78 +637,6 @@ SColVal *tsdbRowIterNext(STSDBRowIter *pIter) {
}
// SRowMerger ======================================================
int32_t tsdbRowMergerInit(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema) {
int32_t code = 0;
TSDBKEY key = TSDBROW_KEY(pRow);
SColVal *pColVal = &(SColVal){0};
STColumn *pTColumn;
int32_t iCol, jCol = 0;
if (NULL == pResTSchema) {
pResTSchema = pTSchema;
}
pMerger->pTSchema = pResTSchema;
pMerger->version = key.version;
pMerger->pArray = taosArrayInit(pResTSchema->numOfCols, sizeof(SColVal));
if (pMerger->pArray == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
// ts
pTColumn = &pTSchema->columns[jCol++];
ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP);
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = key.ts});
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
// other
for (iCol = 1; jCol < pTSchema->numOfCols && iCol < pResTSchema->numOfCols; ++iCol) {
pTColumn = &pResTSchema->columns[iCol];
if (pTSchema->columns[jCol].colId < pTColumn->colId) {
++jCol;
--iCol;
continue;
} else if (pTSchema->columns[jCol].colId > pTColumn->colId) {
taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type));
continue;
}
tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal);
if ((!COL_VAL_IS_NONE(pColVal)) && (!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) {
uint8_t *pVal = pColVal->value.pData;
pColVal->value.pData = NULL;
code = tRealloc(&pColVal->value.pData, pColVal->value.nData);
if (code) goto _exit;
if (pColVal->value.nData) {
memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
}
}
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
}
for (; iCol < pResTSchema->numOfCols; ++iCol) {
pTColumn = &pResTSchema->columns[iCol];
taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type));
}
_exit:
return code;
}
int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
int32_t code = 0;
TSDBKEY key = TSDBROW_KEY(pRow);
......@@ -836,7 +764,7 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema)
}
}
int32_t tsdbRowMergerInit_rv(SRowMerger* pMerger, STSchema *pSchema) {
int32_t tsdbRowMergerInit(SRowMerger* pMerger, STSchema *pSchema) {
pMerger->pTSchema = pSchema;
pMerger->pArray = taosArrayInit(pSchema->numOfCols, sizeof(SColVal));
if (pMerger->pArray == NULL) {
......@@ -846,7 +774,7 @@ int32_t tsdbRowMergerInit_rv(SRowMerger* pMerger, STSchema *pSchema) {
}
}
void tsdbRowMergerClear_rv(SRowMerger* pMerger) {
void tsdbRowMergerClear(SRowMerger* pMerger) {
for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) {
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
if (IS_VAR_DATA_TYPE(pTColVal->type)) {
......@@ -857,7 +785,7 @@ void tsdbRowMergerClear_rv(SRowMerger* pMerger) {
taosArrayClear(pMerger->pArray);
}
void tsdbRowMergerCleanup_rv(SRowMerger* pMerger) {
void tsdbRowMergerCleanup(SRowMerger* pMerger) {
int32_t numOfCols = taosArrayGetSize(pMerger->pArray);
for (int32_t iCol = 1; iCol < numOfCols; iCol++) {
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
......@@ -869,82 +797,6 @@ void tsdbRowMergerCleanup_rv(SRowMerger* pMerger) {
taosArrayDestroy(pMerger->pArray);
}
void tsdbRowMergerClear(SRowMerger *pMerger) {
for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) {
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
if (IS_VAR_DATA_TYPE(pTColVal->type)) {
tFree(pTColVal->value.pData);
}
}
taosArrayDestroy(pMerger->pArray);
}
/*
int32_t tsdbRowMerge(SRowMerger *pMerger, TSDBROW *pRow) {
int32_t code = 0;
TSDBKEY key = TSDBROW_KEY(pRow);
SColVal *pColVal = &(SColVal){0};
ASSERT(((SColVal *)pMerger->pArray->pData)->value.val == key.ts);
for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) {
tsdbRowGetColVal(pRow, pMerger->pTSchema, iCol, pColVal);
if (key.version > pMerger->version) {
if (!COL_VAL_IS_NONE(pColVal)) {
if (IS_VAR_DATA_TYPE(pColVal->type)) {
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
if (!COL_VAL_IS_NULL(pColVal)) {
code = tRealloc(&pTColVal->value.pData, pColVal->value.nData);
if (code) goto _exit;
pTColVal->value.nData = pColVal->value.nData;
if (pTColVal->value.nData) {
memcpy(pTColVal->value.pData, pColVal->value.pData, pTColVal->value.nData);
}
pTColVal->flag = 0;
} else {
tFree(pTColVal->value.pData);
pTColVal->value.pData = NULL;
taosArraySet(pMerger->pArray, iCol, pColVal);
}
} else {
taosArraySet(pMerger->pArray, iCol, pColVal);
}
}
} else if (key.version < pMerger->version) {
SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol);
if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) {
if (IS_VAR_DATA_TYPE(pColVal->type)) {
if (!COL_VAL_IS_NULL(pColVal)) {
code = tRealloc(&tColVal->value.pData, pColVal->value.nData);
if (code) goto _exit;
tColVal->value.nData = pColVal->value.nData;
if (tColVal->value.nData) {
memcpy(tColVal->value.pData, pColVal->value.pData, tColVal->value.nData);
}
tColVal->flag = 0;
} else {
tFree(tColVal->value.pData);
tColVal->value.pData = NULL;
taosArraySet(pMerger->pArray, iCol, pColVal);
}
} else {
taosArraySet(pMerger->pArray, iCol, pColVal);
}
}
} else {
ASSERT(0);
}
}
pMerger->version = key.version;
_exit:
return code;
}
*/
int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow) {
return tRowBuild(pMerger->pArray, pMerger->pTSchema, ppRow);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册