提交 99fb3c67 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 98ed6ce3
......@@ -669,13 +669,13 @@ TEST(testCase, projection_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
// TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2");
// if (taos_errno(pRes) != 0) {
// printf("error in create db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes);
pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)");
......@@ -700,54 +700,55 @@ TEST(testCase, projection_query_tables) {
printf("create table :%d\n", i);
createNewTable(pConn, i);
}
// pRes = taos_query(pConn, "select * from tu");
// if (taos_errno(pRes) != 0) {
// printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// ASSERT_TRUE(false);
// }
// TAOS_ROW pRow = NULL;
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0};
// while ((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// }
pRes = taos_query(pConn, "select * from tu");
if (taos_errno(pRes) != 0) {
printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
ASSERT_TRUE(false);
}
// taos_free_result(pRes);
TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0};
while ((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str);
}
taos_free_result(pRes);
taos_close(pConn);
}
//TEST(testCase, projection_query_stables) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_NE(pConn, nullptr);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "select ts from st1");
// if (taos_errno(pRes) != 0) {
// printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// ASSERT_TRUE(false);
// }
//
// TAOS_ROW pRow = NULL;
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0};
// while ((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
TEST(testCase, projection_query_stables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes);
pRes = taos_query(pConn, "select ts from st1");
if (taos_errno(pRes) != 0) {
printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
ASSERT_TRUE(false);
}
TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0};
while ((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str);
}
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, agg_query_tables) {
......@@ -773,7 +774,7 @@ TEST(testCase, agg_query_tables) {
taos_free_result(pRes);
taos_close(pConn);
}
#endif
/*
--- copy the following script in the shell to setup the environment ---
......@@ -819,5 +820,27 @@ TEST(testCase, async_api_test) {
getchar();
taos_close(pConn);
}
#endif
TEST(testCase, update_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
taos_query(pConn, "use abc1");
TAOS_RES* pRes = taos_query(pConn, "create table tup (ts timestamp, k int);");
if (taos_errno(pRes) != 0) {
printf("failed to create table, reason:%s", taos_errstr(pRes));
}
taos_free_result(pRes);
char s[256] = {0};
for(int32_t i = 0; i < 7000; ++i) {
sprintf(s, "insert into tup values('2020-1-1 1:1:1', %d)", i);
pRes = taos_query(pConn, s);
taos_free_result(pRes);
}
}
#pragma GCC diagnostic pop
......@@ -15,12 +15,6 @@
#include "tsdb.h"
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
#define QH_GET_NUM_OF_COLS(handle) ((size_t)(taosArrayGetSize((handle)->pResBlock->pDataBlock)))
#define GET_FILE_DATA_BLOCK_INFO(_checkInfo, _block) \
((SDataBlockInfo){.window = {.skey = (_block)->minKey.ts, .ekey = (_block)->maxKey.ts}, \
.rows = (_block)->numOfRows, \
.uid = (_checkInfo)->tableId})
typedef struct SQueryFilePos {
int32_t fid;
......@@ -33,13 +27,6 @@ typedef struct SQueryFilePos {
STimeWindow win;
} SQueryFilePos;
typedef struct SDataBlockLoadInfo {
SDFileSet* fileGroup;
int32_t slot;
uint64_t uid;
SArray* pLoadedCols;
} SDataBlockLoadInfo;
typedef struct STableBlockScanInfo {
uint64_t uid;
TSKEY lastKey;
......@@ -88,8 +75,8 @@ typedef struct SFilesetIter {
} SFilesetIter;
typedef struct SFileDataBlockInfo {
int32_t tbBlockIdx; // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
int64_t uid;
int32_t tbBlockIdx; // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
uint64_t uid;
} SFileDataBlockInfo;
typedef struct SDataBlockIter {
......@@ -158,13 +145,17 @@ struct STsdbReader {
};
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
static int buildInmemDataBlockImpl(STableBlockScanInfo* pBlockScanInfo, TSDBKEY maxKey, int32_t capacity, STsdbReader* pReader);
static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, TSDBKEY maxKey, int32_t capacity, STsdbReader* pReader);
static TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader);
static int32_t doLoadRowsOfIdenticalTsInFileBlock(SFileDataBlockInfo* pFBlock, SBlock* pBlock, SBlockData* pBlockData,
STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger);
static int32_t doLoadRowsOfIdenticalTs(STbDataIter *pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger, STsdbReader* pReader);
static int32_t doLoadRowsOfIdenticalTsInBuf(STbDataIter *pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger, STsdbReader* pReader);
static int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow);
static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
static void checkUpdateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader);
static void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, STbDataIter* dIter, bool* hasVal, STSRow **pTSRow, STsdbReader* pReader);
static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo *pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow);
// static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
// pBlockLoadInfo->slot = -1;
......@@ -426,8 +417,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
pReader->order = pCond->order;
pReader->capacity = 4096;
pReader->idStr = strdup(idstr);
pReader->verRange.minVer= pCond->startVersion;
pReader->verRange.maxVer = 100000;//pCond->endVersion; // todo for test purpose
pReader->verRange = (SVersionRange) {.minVer = pCond->startVersion, .maxVer = 10000};
pReader->type = pCond->type;
pReader->window = *pCond->twindows;
......@@ -437,11 +427,6 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
}
#endif
if (pReader->suid != 0) {
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1);
ASSERT(pReader->pSchema);
}
// todo remove this
setQueryTimewindow(pReader, pCond, 0);
ASSERT (pCond->numOfCols > 0);
......@@ -1007,7 +992,7 @@ _error:
// TSKEY maxKey = ascScan ? (binfo.window.skey - step) : (binfo.window.ekey - step);
// cur->rows =
// buildInmemDataBlockImpl(pCheckInfo, maxKey, pTsdbReadHandle->outputCapacity, &cur->win, pTsdbReadHandle);
// buildDataBlockFromBufImpl(pCheckInfo, maxKey, pTsdbReadHandle->outputCapacity, &cur->win, pTsdbReadHandle);
// pTsdbReadHandle->realNumOfRows = cur->rows;
// // update the last key value
......@@ -2022,11 +2007,10 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
}
tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables, pReader->idStr);
// the pTableQueryInfo[j]->numOfBlocks may be 0
assert(cnt <= numOfBlocks && sup.numOfTables <= numOfTables);
SMultiwayMergeTreeInfo* pTree = NULL;
uint8_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
if (ret != TSDB_CODE_SUCCESS) {
cleanupBlockOrderSupporter(&sup);
......@@ -2178,14 +2162,64 @@ static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
return pFBlockInfo;
}
static bool overlapWithNeighborBlock(SFileDataBlockInfo *pFBlockInfo, SBlock* pBlock, STableBlockScanInfo* pTableBlockScanInfo) {
// it is the last block in current file, no chance to overlap with neighbor blocks.
if(pFBlockInfo->tbBlockIdx == taosArrayGetSize(pTableBlockScanInfo->pBlockList) - 1) { // last block in current file,
return false;
static SBlock* getNeighborBlockOfSameTable(SFileDataBlockInfo *pFBlockInfo, STableBlockScanInfo* pTableBlockScanInfo, int32_t* nextIndex, int32_t order) {
bool asc = ASCENDING_TRAVERSE(order);
if (asc && pFBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockList) - 1) {
return NULL;
}
SBlock* pNext = taosArrayGet(pTableBlockScanInfo->pBlockList, pFBlockInfo->tbBlockIdx + 1);
return (pNext->minKey.ts == pBlock->maxKey.ts);
if (!asc && pFBlockInfo->tbBlockIdx == 0) {
return NULL;
}
int32_t step = asc? 1:-1;
*nextIndex = pFBlockInfo->tbBlockIdx + step;
SBlock* pNext = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
return pNext;
}
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
ASSERT(pBlockIter != NULL && pFBlockInfo != NULL);
int32_t step = ASCENDING_TRAVERSE(pBlockIter->order)? 1:-1;
int32_t index = pBlockIter->index;
while(index < pBlockIter->numOfBlocks && index >= 0) {
SFileDataBlockInfo* pFBlock = taosArrayGet(pBlockIter->blockList, index);
if (pFBlock->uid == pFBlockInfo->uid && pFBlock->tbBlockIdx == pFBlockInfo->tbBlockIdx) {
return index;
}
index += step;
}
ASSERT(0);
return -1;
}
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index) {
if (index < 0 || index >= pBlockIter->numOfBlocks) {
return -1;
}
SFileDataBlockInfo fblock = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);
taosArrayRemove(pBlockIter->blockList, index);
taosArrayInsert(pBlockIter->blockList, pBlockIter->index, &fblock);
SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
ASSERT(pBlockInfo->uid == fblock.uid && pBlockInfo->tbBlockIdx == fblock.tbBlockIdx);
return TSDB_CODE_SUCCESS;
}
static bool overlapWithNeighborBlock(SBlock* pBlock, SBlock* pNeighbor, int32_t order) {
// it is the last block in current file, no chance to overlap with neighbor blocks.
if (ASCENDING_TRAVERSE(order)) {
return pBlock->maxKey.ts == pNeighbor->minKey.ts;
} else {
return pBlock->minKey.ts == pNeighbor->maxKey.ts;
}
}
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock) {
......@@ -2199,15 +2233,22 @@ static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVer
return (key.ts >= pBlock->minKey.ts && key.ts <= pBlock->maxKey.ts) && (pBlock->maxVersion >= pVerRange->minVer) && (pBlock->minVersion <= pVerRange->maxVer);
}
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo *pFBlock, SBlock* pBlock, STableBlockScanInfo *pScanInfo, TSDBKEY key) {
return (dataBlockPartialRequired(&pReader->window, &pReader->verRange, pBlock) ||
overlapWithNeighborBlock(pFBlock, pBlock, pScanInfo) ||
int32_t neighborIndex = 0;
SBlock* pNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &neighborIndex, pReader->order);
bool overlapWithNeighbor = false;
if (pNeighbor) {
overlapWithNeighbor = overlapWithNeighborBlock(pBlock, pNeighbor, pReader->order);
}
return (overlapWithNeighbor ||
dataBlockPartialRequired(&pReader->window, &pReader->verRange, pBlock) ||
keyOverlapFileBlock(key, pBlock, &pReader->verRange) ||
(pBlock->nRow > pReader->capacity));
}
static int32_t buildInmemDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBKEY *key) {
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBKEY *key) {
if (!(pBlockScanInfo->imemHasVal || pBlockScanInfo->memHasVal)) {
return TSDB_CODE_SUCCESS;
}
......@@ -2215,7 +2256,7 @@ static int32_t buildInmemDataBlock(STsdbReader* pReader, STableBlockScanInfo* pB
SSDataBlock* pBlock = pReader->pResBlock;
int64_t st = taosGetTimestampUs();
int32_t code = buildInmemDataBlockImpl(pBlockScanInfo, *key, pReader->capacity, pReader);
int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, *key, pReader->capacity, pReader);
int64_t elapsedTime = taosGetTimestampUs() - st;
......@@ -2227,15 +2268,42 @@ static int32_t buildInmemDataBlock(STsdbReader* pReader, STableBlockScanInfo* pB
return code;
}
static int32_t doMergeBufFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
STSRow* pTSRow, STbDataIter* pIter, bool* hasVal, int64_t key,
SFileDataBlockInfo* pFBlock, SBlock* pBlock) {
SRowMerger merge = {0};
SBlockData* pBlockData = &pReader->status.fileBlockData;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
TSDBKEY k = TSDBROW_KEY(pRow);
if (key <= k.ts) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doLoadRowsOfIdenticalTsInFileBlock(pFBlock, pBlock, pBlockData, pBlockScanInfo, pReader, &merge);
if (k.ts == key) {
tRowMerge(&merge, pRow);
doLoadRowsOfIdenticalTsInBuf(pIter, hasVal, k.ts, &merge, pReader);
}
tRowMergerGetRow(&merge, &pTSRow);
} else { // k.ts < key
doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, hasVal, &pTSRow, pReader);
}
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SBlock* pBlock, STableBlockScanInfo* pBlockScanInfo) {
SFileBlockDumpInfo *pDumpInfo = &pReader->status.fBlockDumpInfo;
SBlockData* pBlockData = &pReader->status.fileBlockData;
SRowMerger merge = {0};
STSRow* pTSRow = NULL;
TSKEY mergeTs = TSKEY_INITIAL_VAL;
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
TSDBROW* pRow = getValidRow(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader);
TSDBROW* piRow = getValidRow(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader);
......@@ -2243,122 +2311,60 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, SFileDataBlockIn
TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow);
// [1&2] key <= [k.ts|ik.ts]
if (key <= k.ts || key <= ik.ts) {
// [1&2] key <= [k.ts && ik.ts]
if (key <= k.ts && key <= ik.ts) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doLoadRowsOfIdenticalTsInFileBlock(pFBlock, pBlock, pBlockData, pBlockScanInfo, pReader, &merge);
if (ik.ts == mergeTs) {
doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader);
if (ik.ts == key) {
tRowMerge(&merge, piRow);
doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, key, &merge, pReader);
}
if (k.ts == mergeTs) {
doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
if (k.ts == key) {
tRowMerge(&merge, pRow);
doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, key, &merge, pReader);
}
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
} else {
} else { // key > ik.ts || key > k.ts
// [3] ik.ts < key <= k.ts
// [4] ik.ts < k.ts <= key
if (ik.ts < k.ts) {
doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader);
tRowMergerGetRow(&merge, &pTSRow);
doMergeMultiRows(piRow, pBlockScanInfo->uid, pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, &pTSRow,
pReader);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
// [4] k.ts < key <= ik.ts
// [5] k.ts < key <= ik.ts
// [6] k.ts < ik.ts <= key
if (k.ts < ik.ts) {
doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
tRowMergerGetRow(&merge, &pTSRow);
doMergeMultiRows(pRow, pBlockScanInfo->uid, pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, &pTSRow,
pReader);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
// [5] k.ts == ik.ts < key
// [7] k.ts == ik.ts < key
if (k.ts == ik.ts) {
doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader);
tRowMergerGetRow(&merge, &pTSRow);
if (k.ts == mergeTs) {
doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
}
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
// [6] k.ts < ik.ts < key
if (k.ts < ik.ts) {
doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
// [6] ik.ts < k.ts < key
if (ik.ts < k.ts) {
doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader);
tRowMergerGetRow(&merge, &pTSRow);
doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
}
} else {
if (pBlockScanInfo->imemHasVal) {
TSDBKEY ik = TSDBROW_KEY(piRow);
if (key <= ik.ts) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doLoadRowsOfIdenticalTsInFileBlock(pFBlock, pBlock, pBlockData, pBlockScanInfo, pReader, &merge);
if (ik.ts == mergeTs) {
doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader);
}
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
if (ik.ts < key) {
doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader);
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
}
return TSDB_CODE_SUCCESS;
return doMergeBufFileRows(pReader, pBlockScanInfo, piRow, pTSRow, pBlockScanInfo->iiter,
&pBlockScanInfo->imemHasVal, key, pFBlock, pBlock);
}
if (pBlockScanInfo->memHasVal) { // pBlockScanInfo->memHasVal != NULL
TSDBKEY k = TSDBROW_KEY(pRow);
if (key <= k.ts) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doLoadRowsOfIdenticalTsInFileBlock(pFBlock, pBlock, pBlockData, pBlockScanInfo, pReader, &merge);
if (k.ts == mergeTs) {
doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
}
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
if (k.ts < key) {
doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
}
return TSDB_CODE_SUCCESS;
return doMergeBufFileRows(pReader, pBlockScanInfo, pRow, pTSRow, pBlockScanInfo->iter, &pBlockScanInfo->memHasVal,
key, pFBlock, pBlock);
}
// imem & mem are all empty
......@@ -2372,7 +2378,8 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, SFileDataBlockIn
return TSDB_CODE_SUCCESS;
}
static int32_t buildComposedDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SBlock* pBlock, STableBlockScanInfo* pBlockScanInfo) {
static int32_t buildComposedDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SBlock* pBlock,
STableBlockScanInfo* pBlockScanInfo) {
SSDataBlock* pResBlock = pReader->pResBlock;
while(1) {
......@@ -2525,11 +2532,11 @@ static int32_t moveToNextFile(STsdbReader* pReader, int32_t* numOfBlocks) {
static int32_t doBuildDataBlock(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
SReaderStatus* pStatus = &pReader->status;
SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter;
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
STableBlockScanInfo* pScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
......@@ -2549,8 +2556,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
} else if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) {
// data in memory that are earlier than current file block
TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
code = buildInmemDataBlock(pReader, pScanInfo, &maxKey);
// build data block from in-memory buffer data completed.
code = buildDataBlockFromBuf(pReader, pScanInfo, &maxKey);
} else { // whole block is required, return it directly
// todo
// 1. the version of all rows should be less than the endVersion
......@@ -2582,7 +2588,7 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
initMemIterator(pBlockScanInfo, pReader);
TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
int32_t code = buildInmemDataBlock(pReader, pBlockScanInfo, &maxKey);
int32_t code = buildDataBlockFromBuf(pReader, pBlockScanInfo, &maxKey);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2611,7 +2617,7 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter)
pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order)? 0: pBlock->nRow - 1;
}
static int32_t initForFirstBlockOfFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
int32_t numOfBlocks = 0;
int32_t code = moveToNextFile(pReader, &numOfBlocks);
if (code != TSDB_CODE_SUCCESS) {
......@@ -2635,7 +2641,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
if (pReader->status.blockIter.index == -1) {
code = initForFirstBlockOfFile(pReader, pBlockIter);
code = initForFirstBlockInFile(pReader, pBlockIter);
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
return code;
}
......@@ -2645,19 +2651,18 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
return code;
}
} else {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
// current block are exhausted, try the next file block
if (pDumpInfo->allDumped) {
if (pReader->status.fBlockDumpInfo.allDumped) {
// try next data block in current file
bool hasNext = blockIteratorNext(&pReader->status.blockIter);
if (hasNext) { // current file is exhausted, let's try the next file
initBlockDumpInfo(pReader, pBlockIter);
} else {
code = initForFirstBlockOfFile(pReader, pBlockIter);
code = initForFirstBlockInFile(pReader, pBlockIter);
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
return code;
}
......@@ -2750,7 +2755,7 @@ TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader) {
}
}
int32_t doLoadRowsOfIdenticalTs(STbDataIter *pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger, STsdbReader* pReader) {
int32_t doLoadRowsOfIdenticalTsInBuf(STbDataIter *pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger, STsdbReader* pReader) {
while (1) {
*hasVal = tsdbTbDataIterNext(pIter);
if (!(*hasVal)) {
......@@ -2769,73 +2774,109 @@ int32_t doLoadRowsOfIdenticalTs(STbDataIter *pIter, bool* hasVal, int64_t ts, SR
return TSDB_CODE_SUCCESS;
}
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
SVersionRange* pVerRange, int32_t step) {
while (pBlockData->aTSKEY[rowIndex] == key && rowIndex < pBlockData->nRow && rowIndex >= 0) {
if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
continue;
}
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
tRowMerge(pMerger, &fRow);
rowIndex += step;
}
return rowIndex;
}
typedef enum {
CHECK_FILEBLOCK_CONT = 0x1,
CHECK_FILEBLOCK_QUIT = 0x2,
} CHECK_FILEBLOCK_STATE;
static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SBlock* pBlock,
SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key, CHECK_FILEBLOCK_STATE* state) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBlockData* pBlockData = &pReader->status.fileBlockData;
int32_t step = ASCENDING_TRAVERSE(pReader->order)? 1:-1;
int32_t nextIndex = -1;
SBlock* pNeighborBlock = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order);
if (pNeighborBlock == NULL) { // do nothing
*state = CHECK_FILEBLOCK_QUIT;
return 0;
}
bool overlap = overlapWithNeighborBlock(pBlock, pNeighborBlock, pReader->order);
if (overlap) { // load next block
SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter;
//1. find the next neighbor block in the scan block list
SFileDataBlockInfo fb = {.uid = pFBlock->uid, .tbBlockIdx = nextIndex};
int32_t neighborIndex = findFileBlockInfoIndex(&pStatus->blockIter, &fb);
//2. remove it from the scan block list
setFileBlockActiveInBlockIter(&pStatus->blockIter, neighborIndex);
//3. load the neighbor block, and set it to be the currently accessed file data block
int32_t code = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &pStatus->fileBlockData);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
//4. check the data values
initBlockDumpInfo(pReader, pBlockIter);
pDumpInfo->rowIndex =
doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
if (pDumpInfo->rowIndex >= pBlock->nRow) {
*state = CHECK_FILEBLOCK_CONT;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t doLoadRowsOfIdenticalTsInFileBlock(SFileDataBlockInfo* pFBlock, SBlock* pBlock, SBlockData* pBlockData,
STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
bool asc = ASCENDING_TRAVERSE(pReader->order);
int32_t step = asc? 1:-1;
bool asc = ASCENDING_TRAVERSE(pReader->order);
int32_t step = asc ? 1 : -1;
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
if (asc) { // todo refactor
if (asc) {
pDumpInfo->rowIndex += step;
if (pDumpInfo->rowIndex < pBlockData->nRow - 1) {
if (pBlockData->aTSKEY[pDumpInfo->rowIndex + step] == key) {
int32_t rowIndex = pDumpInfo->rowIndex + step;
while (pBlockData->aTSKEY[rowIndex] == key) {
if (pBlockData->aVersion[rowIndex] > pReader->verRange.maxVer || pBlockData->aVersion[rowIndex] < pReader->verRange.minVer) {
continue;
}
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
tRowMerge(pMerger, &fRow);
rowIndex += step;
}
pDumpInfo->rowIndex = rowIndex;
} else {
pDumpInfo->rowIndex += step;
}
} else { // last row of current block, check if current block is overlapped with neighbor block
pDumpInfo->rowIndex += step;
bool overlap = overlapWithNeighborBlock(pFBlock, pBlock, pScanInfo);
if (overlap) { // load next block
ASSERT(0);
}
pDumpInfo->rowIndex = doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
}
} else {
if (pDumpInfo->rowIndex > 0) {
if (pBlockData->aTSKEY[pDumpInfo->rowIndex + step] == key) {
int32_t rowIndex = pDumpInfo->rowIndex + step;
while (pBlockData->aTSKEY[rowIndex] == key) {
if (pBlockData->aVersion[rowIndex] > pReader->verRange.maxVer ||
pBlockData->aVersion[rowIndex] < pReader->verRange.minVer) {
continue;
}
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
tRowMerge(pMerger, &fRow);
rowIndex += step;
}
pDumpInfo->rowIndex = rowIndex;
} else {
pDumpInfo->rowIndex += step;
}
} else { // last row of current block, check if current block is overlapped with previous neighbor block
pDumpInfo->rowIndex += step;
bool overlap = overlapWithNeighborBlock(pFBlock, pBlock, pScanInfo);
if (overlap) { // load next block
ASSERT(0);
// all rows are consumed, let's try next file block
if (pDumpInfo->rowIndex >= pBlockData->nRow && asc) {
while (1) {
CHECK_FILEBLOCK_STATE st;
checkForNeighborFileBlock(pReader, pScanInfo, pBlock, pFBlock, pMerger, key, &st);
if (st == CHECK_FILEBLOCK_QUIT) {
break;
}
}
}
} else { // last row of current block, check if current block is overlapped with previous neighbor block
pDumpInfo->rowIndex += step;
// bool overlap = overlapWithNeighborBlock(pFBlock, pBlock, pScanInfo);
// if (overlap) { // load next block
// ASSERT(0);
// }
// }
}
return TSDB_CODE_SUCCESS;
}
static void checkUpdateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader) {
void checkUpdateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader) {
int32_t sversion = TSDBROW_SVERSION(pRow);
if (pReader->pSchema == NULL) {
......@@ -2846,11 +2887,36 @@ static void checkUpdateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader)
}
}
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow) {
TSKEY mergeTs = TSKEY_INITIAL_VAL;
void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, STbDataIter* dIter, bool* hasVal, STSRow **pTSRow, STsdbReader* pReader) {
SRowMerger merge = {0};
TSDBKEY k = TSDBROW_KEY(pRow);
checkUpdateSchema(pRow, uid, pReader);
tRowMergerInit(&merge, pRow, pReader->pSchema);
doLoadRowsOfIdenticalTsInBuf(dIter, hasVal, k.ts, &merge, pReader);
tRowMergerGetRow(&merge, pTSRow);
}
void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo *pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow) {
SRowMerger merge = {0};
TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow);
ASSERT(k.ts == ik.ts);
checkUpdateSchema(piRow, pBlockScanInfo->uid, pReader);
tRowMergerInit(&merge, piRow, pReader->pSchema);
doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader);
tRowMerge(&merge, pRow);
doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
tRowMergerGetRow(&merge, pTSRow);
}
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow) {
TSDBROW* pRow = getValidRow(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader);
TSDBROW* piRow = getValidRow(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader);
......@@ -2862,45 +2928,21 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
ik = TSDBROW_KEY(piRow);
if (ik.ts <= k.ts) {
checkUpdateSchema(piRow, pBlockScanInfo->uid, pReader);
tRowMergerInit(&merge, piRow, pReader->pSchema);
doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader);
if (k.ts == mergeTs) {
doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
}
tRowMergerGetRow(&merge, pTSRow);
doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
} else { // k.ts < ik.ts
checkUpdateSchema(pRow, pBlockScanInfo->uid, pReader);
tRowMergerInit(&merge, pRow, pReader->pSchema);
doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
tRowMergerGetRow(&merge, pTSRow);
doMergeMultiRows(piRow, pBlockScanInfo->uid, pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pTSRow, pReader);
return TSDB_CODE_SUCCESS;
}
}
if (pBlockScanInfo->memHasVal) {
k = TSDBROW_KEY(pRow);
checkUpdateSchema(pRow, pBlockScanInfo->uid, pReader);
tRowMergerInit(&merge, pRow, pReader->pSchema);
doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
tRowMergerGetRow(&merge, pTSRow);
doMergeMultiRows(pRow, pBlockScanInfo->uid, pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pTSRow, pReader);
return TSDB_CODE_SUCCESS;
}
if (pBlockScanInfo->imemHasVal) {
ik = TSDBROW_KEY(piRow);
checkUpdateSchema(piRow, pBlockScanInfo->uid, pReader);
tRowMergerInit(&merge, piRow, pReader->pSchema);
doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader);
tRowMergerGetRow(&merge, pTSRow);
doMergeMultiRows(piRow, pBlockScanInfo->uid, pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pTSRow, pReader);
return TSDB_CODE_SUCCESS;
}
......@@ -2951,7 +2993,7 @@ int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow
return TSDB_CODE_SUCCESS;
}
int32_t buildInmemDataBlockImpl(STableBlockScanInfo* pBlockScanInfo, TSDBKEY maxKey, int32_t capacity, STsdbReader* pReader) {
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, TSDBKEY maxKey, int32_t capacity, STsdbReader* pReader) {
SSDataBlock* pBlock = pReader->pResBlock;
do {
......@@ -3167,6 +3209,14 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
goto _err;
}
if (pCond->suid != 0) {
(*ppReader)->pSchema = metaGetTbTSchema((*ppReader)->pTsdb->pVnode->pMeta, (*ppReader)->suid, -1);
ASSERT((*ppReader)->pSchema);
} else if (taosArrayGetSize(pTableList) > 0) {
STableKeyInfo* pKey = taosArrayGet(pTableList, 0);
(*ppReader)->pSchema = metaGetTbTSchema((*ppReader)->pTsdb->pVnode->pMeta, pKey->uid, -1);
}
STsdbReader* pReader = *ppReader;
if (isEmptyQueryTimeWindow(&pReader->window, pReader->order)) {
tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册