提交 6d9e229a 编写于 作者: H Haojun Liao

refactor: fix the in-memory row merge.

上级 1cf9974b
...@@ -194,7 +194,7 @@ static void setComposedBlockFlag(STsdbReader* pReader, bool composed); ...@@ -194,7 +194,7 @@ static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order,
SVersionRange* pVerRange); SVersionRange* pVerRange);
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, SRow** pTSRow, static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, TSDBROW* pTSRow,
STsdbReader* pReader, bool* freeTSRow); STsdbReader* pReader, bool* freeTSRow);
static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo,
STsdbReader* pReader, SRow** pTSRow); STsdbReader* pReader, SRow** pTSRow);
...@@ -3358,7 +3358,7 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc ...@@ -3358,7 +3358,7 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, SRow** pTSRow, int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, TSDBROW* pResRow,
STsdbReader* pReader, bool* freeTSRow) { STsdbReader* pReader, bool* freeTSRow) {
TSDBROW* pNextRow = NULL; TSDBROW* pNextRow = NULL;
TSDBROW current = *pRow; TSDBROW current = *pRow;
...@@ -3367,25 +3367,27 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, ...@@ -3367,25 +3367,27 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
pIter->hasVal = tsdbTbDataIterNext(pIter->iter); pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
if (!pIter->hasVal) { if (!pIter->hasVal) {
*pTSRow = current.pTSRow; *pResRow = *pRow;
*freeTSRow = false; *freeTSRow = false;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { // has next point in mem/imem } else { // has next point in mem/imem
pNextRow = getValidMemRow(pIter, pDelList, pReader); pNextRow = getValidMemRow(pIter, pDelList, pReader);
if (pNextRow == NULL) { if (pNextRow == NULL) {
*pTSRow = current.pTSRow; *pResRow = *pRow;
*freeTSRow = false; *freeTSRow = false;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (TSDBROW_TS(&current) != TSDBROW_TS(pNextRow)) { if (TSDBROW_TS(&current) != TSDBROW_TS(pNextRow)) {
*pTSRow = current.pTSRow; *pResRow = *pRow;
*freeTSRow = false; *freeTSRow = false;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
} }
#if 0
// todo handle the data block merge
SRowMerger merge = {0}; SRowMerger merge = {0};
// get the correct schema for data in memory // get the correct schema for data in memory
...@@ -3423,6 +3425,8 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, ...@@ -3423,6 +3425,8 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
tsdbRowMergerClear(&merge); tsdbRowMergerClear(&merge);
*freeTSRow = true; *freeTSRow = true;
#endif
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -3480,7 +3484,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p ...@@ -3480,7 +3484,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
return code; return code;
} }
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, SRow** pTSRow, int64_t endKey, int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, TSDBROW* pResRow, int64_t endKey,
bool* freeTSRow) { bool* freeTSRow) {
TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
...@@ -3510,13 +3514,14 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR ...@@ -3510,13 +3514,14 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (ik.ts != k.ts) { if (ik.ts != k.ts) {
if (((ik.ts < k.ts) && asc) || ((ik.ts > k.ts) && (!asc))) { // ik.ts < k.ts if (((ik.ts < k.ts) && asc) || ((ik.ts > k.ts) && (!asc))) { // ik.ts < k.ts
code = doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow); code = doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pResRow, pReader, freeTSRow);
} else if (((k.ts < ik.ts) && asc) || ((k.ts > ik.ts) && (!asc))) { } else if (((k.ts < ik.ts) && asc) || ((k.ts > ik.ts) && (!asc))) {
code = doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow); code = doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pResRow, pReader, freeTSRow);
} }
} else { // ik.ts == k.ts } else { // ik.ts == k.ts
*freeTSRow = true; *freeTSRow = true;
code = doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow); pResRow->type = TSDBROW_ROW_FMT;
code = doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pResRow->pTSRow);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -3526,12 +3531,12 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR ...@@ -3526,12 +3531,12 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
} }
if (pBlockScanInfo->iter.hasVal && pRow != NULL) { if (pBlockScanInfo->iter.hasVal && pRow != NULL) {
return doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, return doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pResRow, pReader,
freeTSRow); freeTSRow);
} }
if (pBlockScanInfo->iiter.hasVal && piRow != NULL) { if (pBlockScanInfo->iiter.hasVal && piRow != NULL) {
return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow); return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pResRow, pReader, freeTSRow);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -3636,17 +3641,22 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e ...@@ -3636,17 +3641,22 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
SSDataBlock* pBlock = pReader->pResBlock; SSDataBlock* pBlock = pReader->pResBlock;
do { do {
SRow* pTSRow = NULL; // SRow* pTSRow = NULL;
TSDBROW row = {.type = -1};
bool freeTSRow = false; bool freeTSRow = false;
tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey, &freeTSRow); tsdbGetNextRowInMem(pBlockScanInfo, pReader, &row, endKey, &freeTSRow);
if (pTSRow == NULL) { if (row.type == -1) {
break; break;
} }
doAppendRowFromTSRow(pBlock, pReader, pTSRow, pBlockScanInfo); if (row.type == TSDBROW_ROW_FMT) {
doAppendRowFromTSRow(pBlock, pReader, row.pTSRow, pBlockScanInfo);
if (freeTSRow) { if (freeTSRow) {
taosMemoryFree(pTSRow); taosMemoryFree(row.pTSRow);
}
} else {
doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow);
} }
// no data in buffer, return immediately // no data in buffer, return immediately
......
...@@ -270,7 +270,7 @@ static void i64VectorSumAVX2(const int64_t* plist, int32_t numOfRows, SAvgRes* p ...@@ -270,7 +270,7 @@ static void i64VectorSumAVX2(const int64_t* plist, int32_t numOfRows, SAvgRes* p
#if __AVX2__ #if __AVX2__
// find the start position that are aligned to 32bytes address in memory // find the start position that are aligned to 32bytes address in memory
int32_t width = (bitWidth>>3u) / sizeof(int64_t); int32_t width = (bitWidth >> 3u) / sizeof(int64_t);
int32_t remainder = numOfRows % width; int32_t remainder = numOfRows % width;
int32_t rounds = numOfRows / width; int32_t rounds = numOfRows / width;
...@@ -286,21 +286,12 @@ static void i64VectorSumAVX2(const int64_t* plist, int32_t numOfRows, SAvgRes* p ...@@ -286,21 +286,12 @@ static void i64VectorSumAVX2(const int64_t* plist, int32_t numOfRows, SAvgRes* p
} }
// let sum up the final results // let sum up the final results
if (type == TSDB_DATA_TYPE_BIGINT) {
const int64_t* q = (const int64_t*)&sum; const int64_t* q = (const int64_t*)&sum;
pRes->sum.isum += q[0] + q[1] + q[2] + q[3]; pRes->sum.isum += q[0] + q[1] + q[2] + q[3];
for (int32_t j = 0; j < remainder; ++j) { for (int32_t j = 0; j < remainder; ++j) {
pRes->sum.isum += plist[j + rounds * width]; pRes->sum.isum += plist[j + rounds * width];
} }
} else {
const uint64_t* q = (const uint64_t*)&sum;
pRes->sum.usum += q[0] + q[1] + q[2] + q[3];
for (int32_t j = 0; j < remainder; ++j) {
pRes->sum.usum += (uint64_t)plist[j + rounds * width];
}
}
#endif #endif
} }
...@@ -588,7 +579,7 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) { ...@@ -588,7 +579,7 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
const int64_t* plist = (const int64_t*) pCol->pData; const int64_t* plist = (const int64_t*) pCol->pData;
// 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop // 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop
if (simdAvailable) { if (simdAvailable && type == TSDB_DATA_TYPE_BIGINT) {
i64VectorSumAVX2(plist, numOfRows, pAvgRes); i64VectorSumAVX2(plist, numOfRows, pAvgRes);
} else { } else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册