未验证 提交 6c5929f3 编写于 作者: H haojun Liao 提交者: GitHub

Merge pull request #1461 from taosdata/feature/2.0tsdb

Feature/2.0tsdb
...@@ -100,6 +100,8 @@ SListNode *tdListPopHead(SList *list) { ...@@ -100,6 +100,8 @@ SListNode *tdListPopHead(SList *list) {
list->head = node->next; list->head = node->next;
} }
list->numOfEles--; list->numOfEles--;
node->next = NULL;
node->prev = NULL;
return node; return node;
} }
...@@ -113,6 +115,7 @@ SListNode *tdListPopTail(SList *list) { ...@@ -113,6 +115,7 @@ SListNode *tdListPopTail(SList *list) {
list->tail = node->prev; list->tail = node->prev;
} }
list->numOfEles--; list->numOfEles--;
node->next = node->prev = NULL;
return node; return node;
} }
...@@ -131,6 +134,7 @@ SListNode *tdListPopNode(SList *list, SListNode *node) { ...@@ -131,6 +134,7 @@ SListNode *tdListPopNode(SList *list, SListNode *node) {
node->next->prev = node->prev; node->next->prev = node->prev;
} }
list->numOfEles--; list->numOfEles--;
node->next = node->prev = NULL;
return node; return node;
} }
......
...@@ -157,22 +157,22 @@ TEST(testCase, string_strnchr_test) { ...@@ -157,22 +157,22 @@ TEST(testCase, string_strnchr_test) {
EXPECT_TRUE(strnchr(a10, '.', strlen(a10), true) == NULL); EXPECT_TRUE(strnchr(a10, '.', strlen(a10), true) == NULL);
} }
TEST(testCase, cache_resize_test) { // TEST(testCase, cache_resize_test) {
char a11[] = "abc'.'"; // char a11[] = "abc'.'";
EXPECT_TRUE(strnchr(a11, '.', strlen(a11), false) != NULL); // EXPECT_TRUE(strnchr(a11, '.', strlen(a11), false) != NULL);
char a12[] = "abc'-'"; // char a12[] = "abc'-'";
EXPECT_TRUE(strnchr(a12, '-', strlen(a12), false) != NULL); // EXPECT_TRUE(strnchr(a12, '-', strlen(a12), false) != NULL);
char a15[] = "abc'-'"; // char a15[] = "abc'-'";
EXPECT_TRUE(strnchr(a15, '-', strlen(a15), true) == NULL); // EXPECT_TRUE(strnchr(a15, '-', strlen(a15), true) == NULL);
char a13[] = "'-'"; // char a13[] = "'-'";
EXPECT_TRUE(strnchr(a13, '-', strlen(a13), false) != NULL); // EXPECT_TRUE(strnchr(a13, '-', strlen(a13), false) != NULL);
char a14[] = "'-'"; // char a14[] = "'-'";
EXPECT_TRUE(strnchr(a14, '-', strlen(a14), true) == NULL); // EXPECT_TRUE(strnchr(a14, '-', strlen(a14), true) == NULL);
char a16[] = "'-'."; // char a16[] = "'-'.";
EXPECT_TRUE(strnchr(a16, '.', strlen(a16), true) != NULL); // EXPECT_TRUE(strnchr(a16, '.', strlen(a16), true) != NULL);
} // }
\ No newline at end of file \ No newline at end of file
...@@ -345,12 +345,12 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo) { ...@@ -345,12 +345,12 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo) {
int32_t tsdbLockRepo(tsdb_repo_t *repo) { int32_t tsdbLockRepo(tsdb_repo_t *repo) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
return pthread_mutex_lock(repo); return pthread_mutex_lock(&(pRepo->mutex));
} }
int32_t tsdbUnLockRepo(tsdb_repo_t *repo) { int32_t tsdbUnLockRepo(tsdb_repo_t *repo) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
return pthread_mutex_unlock(repo); return pthread_mutex_unlock(&(pRepo->mutex));
} }
/** /**
...@@ -900,6 +900,9 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters ...@@ -900,6 +900,9 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
int hasDataToCommit = tsdbHasDataToCommit(iters, pCfg->maxTables, minKey, maxKey); int hasDataToCommit = tsdbHasDataToCommit(iters, pCfg->maxTables, minKey, maxKey);
if (!hasDataToCommit) return 0; // No data to commit, just return if (!hasDataToCommit) return 0; // No data to commit, just return
// TODO: make it more flexible
pCompInfo = (SCompInfo *)malloc(sizeof(SCompInfo) + sizeof(SCompBlock) * 1000);
// Create and open files for commit // Create and open files for commit
tsdbGetDataDirName(pRepo, dataDir); tsdbGetDataDirName(pRepo, dataDir);
if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) { /* TODO */ if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) { /* TODO */
...@@ -908,7 +911,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters ...@@ -908,7 +911,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
if (pGroup == NULL) { /* TODO */ if (pGroup == NULL) { /* TODO */
} }
tsdbCreateFile(dataDir, fid, ".h", pCfg->maxTables, &hFile, 1, 0); tsdbCreateFile(dataDir, fid, ".h", pCfg->maxTables, &hFile, 1, 0);
if (1 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) { if (0 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) {
// TODO: make it not to write the last file every time // TODO: make it not to write the last file every time
tsdbCreateFile(dataDir, fid, ".l", pCfg->maxTables, &lFile, 0, 0); tsdbCreateFile(dataDir, fid, ".l", pCfg->maxTables, &lFile, 0, 0);
isNewLastFile = 1; isNewLastFile = 1;
...@@ -929,6 +932,8 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters ...@@ -929,6 +932,8 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
SSkipListIterator *pIter = iters[tid]; SSkipListIterator *pIter = iters[tid];
SCompIdx * pIdx = &pIndices[tid]; SCompIdx * pIdx = &pIndices[tid];
int nNewBlocks = 0;
if (pTable == NULL || pIter == NULL) continue; if (pTable == NULL || pIter == NULL) continue;
/* If no new data to write for this table, just write the old data to new file /* If no new data to write for this table, just write the old data to new file
...@@ -936,8 +941,10 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters ...@@ -936,8 +941,10 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
*/ */
if (!tsdbHasDataInRange(pIter, minKey, maxKey)) { if (!tsdbHasDataInRange(pIter, minKey, maxKey)) {
// has old data // has old data
if (pIdx->offset > 0) { if (pIdx->len > 0) {
if (isNewLastFile && pIdx->hasLast) { goto _table_over;
// if (isNewLastFile && pIdx->hasLast) {
if (0) {
// need to move the last block to new file // need to move the last block to new file
if ((pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len)) == NULL) { /* TODO */ if ((pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len)) == NULL) { /* TODO */
} }
...@@ -976,9 +983,14 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters ...@@ -976,9 +983,14 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
continue; continue;
} }
pCompInfo->delimiter = TSDB_FILE_DELIMITER;
pCompInfo->checksum = 0;
pCompInfo->uid = pTable->tableId.uid;
// Load SCompBlock part if neccessary // Load SCompBlock part if neccessary
int isCompBlockLoaded = 0; int isCompBlockLoaded = 0;
if (pIdx->offset > 0) { if (0) {
// if (pIdx->offset > 0) {
if (pIdx->hasLast || tsdbHasDataInRange(pIter, minKey, pIdx->maxKey)) { if (pIdx->hasLast || tsdbHasDataInRange(pIter, minKey, pIdx->maxKey)) {
// has last block || cache key overlap with commit key // has last block || cache key overlap with commit key
pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len + sizeof(SCompBlock) * 100); pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len + sizeof(SCompBlock) * 100);
...@@ -998,34 +1010,50 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters ...@@ -998,34 +1010,50 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pCols); tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pCols);
if (pCols->numOfPoints == 0) break; if (pCols->numOfPoints == 0) break;
int pointsWritten = 0; int pointsWritten = pCols->numOfPoints;
// { // TODO : try to write the block data to file // TODO: all write to the end of .data file
// if (!isCompBlockLoaded) { // Just append int64_t toffset = 0;
// if (pCols->numOfPoints > pCfg->minRowsPerFileBlock) { // write directly to .data file int32_t tlen = 0;
// lseek(pGroup->files[TSDB_FILE_TYPE_DATA], 0, SEEK_END); tsdbWriteBlockToFileImpl(&pGroup->files[TSDB_FILE_TYPE_DATA], pCols, pCols->numOfPoints, &toffset, &tlen, pTable->tableId.uid);
// } else { // Make the compBlock
// if (isNewLastFile) { // write directly to .l file SCompBlock *pTBlock = pCompInfo->blocks + nNewBlocks++;
pTBlock->offset = toffset;
// } else { // write directly to .last file pTBlock->len = tlen;
pTBlock->keyFirst = dataColsKeyFirst(pCols);
pTBlock->keyLast = dataColsKeyLast(pCols);
pTBlock->last = 0;
pTBlock->algorithm = 0;
pTBlock->numOfPoints = pCols->numOfPoints;
pTBlock->sversion = pTable->sversion;
pTBlock->numOfSubBlocks = 1;
if (dataColsKeyLast(pCols) > pIdx->maxKey) pIdx->maxKey = dataColsKeyLast(pCols);
// }
// }
// } else { // Need to append
// // SCompBlock *pTBlock = NULL;
// }
// }
// pointsWritten = pCols->numOfPoints;
tdPopDataColsPoints(pCols, pointsWritten); tdPopDataColsPoints(pCols, pointsWritten);
maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pCols->numOfPoints; maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pCols->numOfPoints;
} }
_table_over:
// Write the SCompBlock part // Write the SCompBlock part
if (isCompBlockLoaded) { pIdx->offset = lseek(hFile.fd, 0, SEEK_END);
// merge the block into old and update pIdx if (pIdx->len > 0) {
sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, hFile.fd, NULL, pIdx->len);
if (nNewBlocks > 0) {
write(hFile.fd, (void *)(pCompInfo->blocks), sizeof(SCompBlock) * nNewBlocks);
pIdx->len += (sizeof(SCompBlock) * nNewBlocks);
}
} else { } else {
// sendfile the SCompBlock part and update the pIdx if (nNewBlocks > 0) {
write(hFile.fd, (void *)pCompInfo, sizeof(SCompInfo) + sizeof(SCompBlock) * nNewBlocks);
pIdx->len += sizeof(SCompInfo) + sizeof(SCompBlock) * nNewBlocks;
}
} }
pIdx->checksum = 0;
pIdx->numOfSuperBlocks += nNewBlocks;
pIdx->hasLast = 0;
} }
// Write the SCompIdx part // Write the SCompIdx part
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册