未验证 提交 df9d086a 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1661 from taosdata/feature/2.0tsdb

Feature/2.0tsdb
...@@ -867,7 +867,7 @@ static void *tsdbCommitData(void *arg) { ...@@ -867,7 +867,7 @@ static void *tsdbCommitData(void *arg) {
SRWHelper whelper = {0}; SRWHelper whelper = {0};
if (pCache->imem == NULL) return NULL; if (pCache->imem == NULL) return NULL;
pRepo->appH.walCallBack(pRepo->appH.appH); if (pRepo->appH.walCallBack) pRepo->appH.walCallBack(pRepo->appH.appH);
// Create the iterator to read from cache // Create the iterator to read from cache
SSkipListIterator **iters = tsdbCreateTableIters(pMeta, pCfg->maxTables); SSkipListIterator **iters = tsdbCreateTableIters(pMeta, pCfg->maxTables);
......
...@@ -29,7 +29,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa ...@@ -29,7 +29,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx); static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx);
static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded); static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded);
static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx); static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx);
static int tsdbGetRowsInRange(SDataCols *pDataCols, int minKey, int maxKey); static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey);
static void tsdbResetHelperBlock(SRWHelper *pHelper); static void tsdbResetHelperBlock(SRWHelper *pHelper);
// ---------- Operations on Helper File part // ---------- Operations on Helper File part
...@@ -342,7 +342,7 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) { ...@@ -342,7 +342,7 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
rowsToWrite = tsdbGetRowsInRange(pDataCols, 0, pCompBlock->keyFirst-1); rowsToWrite = tsdbGetRowsInRange(pDataCols, 0, pCompBlock->keyFirst-1);
ASSERT(rowsToWrite > 0); ASSERT(rowsToWrite > 0);
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, rowsToWrite, &compBlock, false, true) < 0) goto _err; if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, rowsToWrite, &compBlock, false, true) < 0) goto _err;
if (tsdbInsertSuperBlock(pHelper, pCompBlock, blkIdx) < 0) goto _err; if (tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err;
} }
} }
} }
...@@ -934,7 +934,15 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa ...@@ -934,7 +934,15 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
return -1; return -1;
} }
static int compTSKEY(const void *key1, const void *key2) { return ((TSKEY *)key1 - (TSKEY *)key2); } static int compTSKEY(const void *key1, const void *key2) {
if (*(TSKEY *)key1 > *(TSKEY *)key2) {
return 1;
} else if (*(TSKEY *)key1 == *(TSKEY *)key2) {
return 0;
} else {
return -1;
}
}
static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize) { static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize) {
...@@ -979,6 +987,10 @@ static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int ...@@ -979,6 +987,10 @@ static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].keyLast; pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].keyLast;
pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].last; pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].last;
if (pIdx->numOfSuperBlocks > 1) {
ASSERT(pHelper->pCompInfo->blocks[0].keyLast < pHelper->pCompInfo->blocks[1].keyFirst);
}
return 0; return 0;
_err: _err:
...@@ -1104,7 +1116,7 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int ...@@ -1104,7 +1116,7 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
} }
// Get the number of rows in range [minKey, maxKey] // Get the number of rows in range [minKey, maxKey]
static int tsdbGetRowsInRange(SDataCols *pDataCols, int minKey, int maxKey) { static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey) {
if (pDataCols->numOfPoints == 0) return 0; if (pDataCols->numOfPoints == 0) return 0;
ASSERT(minKey <= maxKey); ASSERT(minKey <= maxKey);
......
...@@ -13,14 +13,15 @@ static double getCurTime() { ...@@ -13,14 +13,15 @@ static double getCurTime() {
typedef struct { typedef struct {
TsdbRepoT *pRepo; TsdbRepoT *pRepo;
int tid; bool isAscend;
int64_t uid; int tid;
int sversion; int64_t uid;
TSKEY startTime; int sversion;
TSKEY interval; TSKEY startTime;
int totalRows; TSKEY interval;
int rowsPerSubmit; int totalRows;
STSchema * pSchema; int rowsPerSubmit;
STSchema * pSchema;
} SInsertInfo; } SInsertInfo;
static int insertData(SInsertInfo *pInfo) { static int insertData(SInsertInfo *pInfo) {
...@@ -41,7 +42,11 @@ static int insertData(SInsertInfo *pInfo) { ...@@ -41,7 +42,11 @@ static int insertData(SInsertInfo *pInfo) {
pBlock->len = 0; pBlock->len = 0;
for (int i = 0; i < pInfo->rowsPerSubmit; i++) { for (int i = 0; i < pInfo->rowsPerSubmit; i++) {
// start_time += 1000; // start_time += 1000;
start_time += pInfo->interval; if (pInfo->isAscend) {
start_time += pInfo->interval;
} else {
start_time -= pInfo->interval;
}
SDataRow row = (SDataRow)(pBlock->data + pBlock->len); SDataRow row = (SDataRow)(pBlock->data + pBlock->len);
tdInitDataRow(row, pInfo->pSchema); tdInitDataRow(row, pInfo->pSchema);
...@@ -155,12 +160,14 @@ TEST(TsdbTest, createRepo) { ...@@ -155,12 +160,14 @@ TEST(TsdbTest, createRepo) {
// Insert Some Data // Insert Some Data
SInsertInfo iInfo = { SInsertInfo iInfo = {
.pRepo = pRepo, .pRepo = pRepo,
// .isAscend = true,
.isAscend = false,
.tid = tCfg.tableId.tid, .tid = tCfg.tableId.tid,
.uid = tCfg.tableId.uid, .uid = tCfg.tableId.uid,
.sversion = tCfg.sversion, .sversion = tCfg.sversion,
.startTime = 1584081000000, .startTime = 1584081000000,
.interval = 1000, .interval = 1000,
.totalRows = 50, .totalRows = 5000000,
.rowsPerSubmit = 1, .rowsPerSubmit = 1,
.pSchema = schema .pSchema = schema
}; };
...@@ -175,34 +182,34 @@ TEST(TsdbTest, createRepo) { ...@@ -175,34 +182,34 @@ TEST(TsdbTest, createRepo) {
repo = (STsdbRepo *)pRepo; repo = (STsdbRepo *)pRepo;
ASSERT_NE(pRepo, nullptr); ASSERT_NE(pRepo, nullptr);
// Insert more data // // Insert more data
iInfo.startTime = iInfo.startTime + iInfo.interval * iInfo.totalRows; // iInfo.startTime = iInfo.startTime + iInfo.interval * iInfo.totalRows;
iInfo.totalRows = 10; // iInfo.totalRows = 10;
iInfo.pRepo = pRepo; // iInfo.pRepo = pRepo;
ASSERT_EQ(insertData(&iInfo), 0); // ASSERT_EQ(insertData(&iInfo), 0);
// Close the repository // // Close the repository
tsdbCloseRepo(pRepo); // tsdbCloseRepo(pRepo);
// Open the repository again // // Open the repository again
pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL); // pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL);
repo = (STsdbRepo *)pRepo; // repo = (STsdbRepo *)pRepo;
ASSERT_NE(pRepo, nullptr); // ASSERT_NE(pRepo, nullptr);
// Read from file // // Read from file
SRWHelper rhelper; // SRWHelper rhelper;
tsdbInitReadHelper(&rhelper, repo); // tsdbInitReadHelper(&rhelper, repo);
SFileGroup *pFGroup = tsdbSearchFGroup(repo->tsdbFileH, 1833); // SFileGroup *pFGroup = tsdbSearchFGroup(repo->tsdbFileH, 1833);
ASSERT_NE(pFGroup, nullptr); // ASSERT_NE(pFGroup, nullptr);
ASSERT_GE(tsdbSetAndOpenHelperFile(&rhelper, pFGroup), 0); // ASSERT_GE(tsdbSetAndOpenHelperFile(&rhelper, pFGroup), 0);
STable *pTable = tsdbGetTableByUid(repo->tsdbMeta, tCfg.tableId.uid); // STable *pTable = tsdbGetTableByUid(repo->tsdbMeta, tCfg.tableId.uid);
ASSERT_NE(pTable, nullptr); // ASSERT_NE(pTable, nullptr);
tsdbSetHelperTable(&rhelper, pTable, repo); // tsdbSetHelperTable(&rhelper, pTable, repo);
ASSERT_EQ(tsdbLoadCompInfo(&rhelper, NULL), 0); // ASSERT_EQ(tsdbLoadCompInfo(&rhelper, NULL), 0);
ASSERT_EQ(tsdbLoadBlockData(&rhelper, blockAtIdx(&rhelper, 0), NULL), 0); // ASSERT_EQ(tsdbLoadBlockData(&rhelper, blockAtIdx(&rhelper, 0), NULL), 0);
int k = 0; int k = 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册