提交 ed7d5fe8 编写于 作者: H hzcheng

TD-34

上级 987708ac
...@@ -59,6 +59,8 @@ tsdb_repo_t * tsdbOpenRepo(char *tsdbDir); ...@@ -59,6 +59,8 @@ tsdb_repo_t * tsdbOpenRepo(char *tsdbDir);
int32_t tsdbCloseRepo(tsdb_repo_t *repo); int32_t tsdbCloseRepo(tsdb_repo_t *repo);
int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg); int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg);
int32_t tsdbTriggerCommit(tsdb_repo_t *repo); int32_t tsdbTriggerCommit(tsdb_repo_t *repo);
int32_t tsdbLockRepo(tsdb_repo_t *repo);
int32_t tsdbUnLockRepo(tsdb_repo_t *repo);
// --------- TSDB TABLE DEFINITION // --------- TSDB TABLE DEFINITION
typedef struct { typedef struct {
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "taosdef.h" #include "taosdef.h"
#include "tlist.h" #include "tlist.h"
#include "tsdb.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -49,13 +50,15 @@ typedef struct { ...@@ -49,13 +50,15 @@ typedef struct {
typedef struct { typedef struct {
int maxBytes; int maxBytes;
int cacheBlockSize; int cacheBlockSize;
int totalCacheBlocks;
STsdbCachePool pool; STsdbCachePool pool;
STsdbCacheBlock *curBlock; STsdbCacheBlock *curBlock;
SCacheMem * mem; SCacheMem * mem;
SCacheMem * imem; SCacheMem * imem;
tsdb_repo_t * pRepo;
} STsdbCache; } STsdbCache;
STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize); STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize, tsdb_repo_t *pRepo);
void tsdbFreeCache(STsdbCache *pCache); void tsdbFreeCache(STsdbCache *pCache);
void * tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key); void * tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key);
......
...@@ -14,12 +14,13 @@ ...@@ -14,12 +14,13 @@
*/ */
#include <stdlib.h> #include <stdlib.h>
#include "tsdb.h"
#include "tsdbCache.h" #include "tsdbCache.h"
static int tsdbAllocBlockFromPool(STsdbCache *pCache); static int tsdbAllocBlockFromPool(STsdbCache *pCache);
static void tsdbFreeBlockList(SCacheMem *mem); static void tsdbFreeBlockList(SCacheMem *mem);
STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize) { STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize, tsdb_repo_t *pRepo) {
STsdbCache *pCache = (STsdbCache *)calloc(1, sizeof(STsdbCache)); STsdbCache *pCache = (STsdbCache *)calloc(1, sizeof(STsdbCache));
if (pCache == NULL) return NULL; if (pCache == NULL) return NULL;
...@@ -27,9 +28,11 @@ STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize) { ...@@ -27,9 +28,11 @@ STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize) {
pCache->maxBytes = maxBytes; pCache->maxBytes = maxBytes;
pCache->cacheBlockSize = cacheBlockSize; pCache->cacheBlockSize = cacheBlockSize;
pCache->pRepo = pRepo;
int nBlocks = maxBytes / cacheBlockSize + 1; int nBlocks = maxBytes / cacheBlockSize + 1;
if (nBlocks <= 1) nBlocks = 2; if (nBlocks <= 1) nBlocks = 2;
pCache->totalCacheBlocks = nBlocks;
STsdbCachePool *pPool = &(pCache->pool); STsdbCachePool *pPool = &(pCache->pool);
pPool->index = 0; pPool->index = 0;
...@@ -67,22 +70,10 @@ void *tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key) { ...@@ -67,22 +70,10 @@ void *tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key) {
if (pCache == NULL) return NULL; if (pCache == NULL) return NULL;
if (bytes > pCache->cacheBlockSize) return NULL; if (bytes > pCache->cacheBlockSize) return NULL;
if (pCache->mem == NULL) { // Create a new one if (pCache->curBlock == NULL || pCache->curBlock->remain < bytes) {
pCache->mem = (SCacheMem *)malloc(sizeof(SCacheMem)); if (pCache->curBlock !=NULL && (pCache->mem->list) >= pCache->totalCacheBlocks/2) {
if (pCache->mem == NULL) return NULL; tsdbTriggerCommit(pCache->pRepo);
pCache->mem->keyFirst = INT64_MAX;
pCache->mem->keyLast = 0;
pCache->mem->numOfPoints = 0;
pCache->mem->list = tdListNew(sizeof(STsdbCacheBlock *));
}
if (isListEmpty(pCache->mem->list)) {
if (tsdbAllocBlockFromPool(pCache) < 0) {
// TODO: deal with the error
} }
}
if (pCache->curBlock->remain < bytes) {
if (tsdbAllocBlockFromPool(pCache) < 0) { if (tsdbAllocBlockFromPool(pCache) < 0) {
// TODO: deal with the error // TODO: deal with the error
} }
...@@ -115,7 +106,12 @@ static void tsdbFreeBlockList(SCacheMem *mem) { ...@@ -115,7 +106,12 @@ static void tsdbFreeBlockList(SCacheMem *mem) {
static int tsdbAllocBlockFromPool(STsdbCache *pCache) { static int tsdbAllocBlockFromPool(STsdbCache *pCache) {
STsdbCachePool *pPool = &(pCache->pool); STsdbCachePool *pPool = &(pCache->pool);
if (listNEles(pPool->memPool) == 0) return -1;
tsdbLockRepo(pCache->pRepo);
if (listNEles(pPool->memPool) == 0) {
tsdbUnLockRepo(pCache->pRepo);
return -1;
}
SListNode *node = tdListPopHead(pPool->memPool); SListNode *node = tdListPopHead(pPool->memPool);
...@@ -125,8 +121,19 @@ static int tsdbAllocBlockFromPool(STsdbCache *pCache) { ...@@ -125,8 +121,19 @@ static int tsdbAllocBlockFromPool(STsdbCache *pCache) {
pBlock->offset = 0; pBlock->offset = 0;
pBlock->remain = pCache->cacheBlockSize; pBlock->remain = pCache->cacheBlockSize;
if (pCache->mem == NULL) { // Create a new one
pCache->mem = (SCacheMem *)malloc(sizeof(SCacheMem));
if (pCache->mem == NULL) return NULL;
pCache->mem->keyFirst = INT64_MAX;
pCache->mem->keyLast = 0;
pCache->mem->numOfPoints = 0;
pCache->mem->list = tdListNew(sizeof(STsdbCacheBlock *));
}
tdListAppendNode(pCache->mem->list, node); tdListAppendNode(pCache->mem->list, node);
pCache->curBlock = pBlock; pCache->curBlock = pBlock;
tsdbUnLockRepo(pCache->pRepo);
return 0; return 0;
} }
\ No newline at end of file
...@@ -150,6 +150,7 @@ tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO ...@@ -150,6 +150,7 @@ tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO
pRepo->rootDir = strdup(rootDir); pRepo->rootDir = strdup(rootDir);
pRepo->config = *pCfg; pRepo->config = *pCfg;
pRepo->limiter = limiter; pRepo->limiter = limiter;
pthread_mutex_init(&pRepo->mutex, NULL);
// Create the environment files and directories // Create the environment files and directories
if (tsdbSetRepoEnv(pRepo) < 0) { if (tsdbSetRepoEnv(pRepo) < 0) {
...@@ -168,7 +169,7 @@ tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO ...@@ -168,7 +169,7 @@ tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO
pRepo->tsdbMeta = pMeta; pRepo->tsdbMeta = pMeta;
// Initialize cache // Initialize cache
STsdbCache *pCache = tsdbInitCache(pCfg->maxCacheSize, -1); STsdbCache *pCache = tsdbInitCache(pCfg->maxCacheSize, -1, (tsdb_repo_t *)pRepo);
if (pCache == NULL) { if (pCache == NULL) {
free(pRepo->rootDir); free(pRepo->rootDir);
tsdbFreeMeta(pRepo->tsdbMeta); tsdbFreeMeta(pRepo->tsdbMeta);
...@@ -249,7 +250,7 @@ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) { ...@@ -249,7 +250,7 @@ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) {
return NULL; return NULL;
} }
pRepo->tsdbCache = tsdbInitCache(pRepo->config.maxCacheSize, -1); pRepo->tsdbCache = tsdbInitCache(pRepo->config.maxCacheSize, -1, (tsdb_repo_t *)pRepo);
if (pRepo->tsdbCache == NULL) { if (pRepo->tsdbCache == NULL) {
tsdbFreeMeta(pRepo->tsdbMeta); tsdbFreeMeta(pRepo->tsdbMeta);
free(pRepo->rootDir); free(pRepo->rootDir);
...@@ -305,9 +306,12 @@ int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg) { ...@@ -305,9 +306,12 @@ int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg) {
int32_t tsdbTriggerCommit(tsdb_repo_t *repo) { int32_t tsdbTriggerCommit(tsdb_repo_t *repo) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
if (pthread_mutex_lock(&(pRepo->mutex)) < 0) return -1; tsdbLockRepo(repo);
if (pRepo->commit) return 0; if (pRepo->commit) {
tsdbUnLockRepo(repo);
return -1;
}
pRepo->commit = 1; pRepo->commit = 1;
// Loop to move pData to iData // Loop to move pData to iData
for (int i = 0; i < pRepo->config.maxTables; i++) { for (int i = 0; i < pRepo->config.maxTables; i++) {
...@@ -320,15 +324,25 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo) { ...@@ -320,15 +324,25 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo) {
// TODO: Loop to move mem to imem // TODO: Loop to move mem to imem
pRepo->tsdbCache->imem = pRepo->tsdbCache->mem; pRepo->tsdbCache->imem = pRepo->tsdbCache->mem;
pRepo->tsdbCache->mem = NULL; pRepo->tsdbCache->mem = NULL;
pRepo->tsdbCache->curBlock = NULL;
// TODO: here should set as detached or use join for memory leak
pthread_create(&(pRepo->commitThread), NULL, tsdbCommitToFile, (void *)repo); pthread_create(&(pRepo->commitThread), NULL, tsdbCommitToFile, (void *)repo);
pthread_mutex_unlock(&(pRepo->mutex)); tsdbUnLockRepo(repo);
pthread_join(pRepo->commitThread, NULL);
return 0; return 0;
} }
int32_t tsdbLockRepo(tsdb_repo_t *repo) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
return pthread_mutex_lock(repo);
}
int32_t tsdbUnLockRepo(tsdb_repo_t *repo) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
return pthread_mutex_unlock(repo);
}
/** /**
* Get the TSDB repository information, including some statistics * Get the TSDB repository information, including some statistics
* @param pRepo the TSDB repository handle * @param pRepo the TSDB repository handle
...@@ -691,6 +705,13 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable ...@@ -691,6 +705,13 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
dataRowCpy(SL_GET_NODE_DATA(pNode), row); dataRowCpy(SL_GET_NODE_DATA(pNode), row);
// Insert the skiplist node into the data // Insert the skiplist node into the data
if (pTable->mem == NULL) {
pTable->mem = (SMemTable *)calloc(1, sizeof(SMemTable));
if (pTable->mem == NULL) return -1;
pTable->mem->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, getTupleKey);
pTable->mem->keyFirst = INT64_MAX;
pTable->mem->keyLast = 0;
}
tSkipListPut(pTable->mem->pData, pNode); tSkipListPut(pTable->mem->pData, pNode);
if (key > pTable->mem->keyLast) pTable->mem->keyLast = key; if (key > pTable->mem->keyLast) pTable->mem->keyLast = key;
if (key < pTable->mem->keyFirst) pTable->mem->keyFirst = key; if (key < pTable->mem->keyFirst) pTable->mem->keyFirst = key;
...@@ -788,6 +809,7 @@ static void *tsdbCommitToFile(void *arg) { ...@@ -788,6 +809,7 @@ static void *tsdbCommitToFile(void *arg) {
int rowsRead = 0; int rowsRead = 0;
while ((rowsRead = tsdbReadRowsFromCache(iters[tid], maxKey, pCfg->maxRowsPerFileBlock, cols, pTable->schema)) > while ((rowsRead = tsdbReadRowsFromCache(iters[tid], maxKey, pCfg->maxRowsPerFileBlock, cols, pTable->schema)) >
0) { 0) {
// printf("rowsRead:%d-----------\n", rowsRead);
int k = 0; int k = 0;
} }
} }
...@@ -802,5 +824,19 @@ static void *tsdbCommitToFile(void *arg) { ...@@ -802,5 +824,19 @@ static void *tsdbCommitToFile(void *arg) {
free(cols); free(cols);
free(iters); free(iters);
tsdbLockRepo(arg);
tdListMove(pCache->imem->list, pCache->pool.memPool);
free(pCache->imem);
pCache->imem = NULL;
pRepo->commit = 0;
// TODO: free the skiplist
for (int i = 0; i < pCfg->maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->imem) { // Here has memory leak
pTable->imem = NULL;
}
}
tsdbUnLockRepo(arg);
return NULL; return NULL;
} }
\ No newline at end of file
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <stdlib.h> #include <stdlib.h>
#include <sys/time.h>
#include "tsdb.h" #include "tsdb.h"
#include "dataformat.h" #include "dataformat.h"
#include "tsdbFile.h" #include "tsdbFile.h"
#include "tsdbMeta.h" #include "tsdbMeta.h"
double getCurTime() {
struct timeval tv;
gettimeofday(&tv, NULL);
return tv.tv_sec + tv.tv_usec * 1E-6;
}
TEST(TsdbTest, DISABLED_tableEncodeDecode) { TEST(TsdbTest, DISABLED_tableEncodeDecode) {
STable *pTable = (STable *)malloc(sizeof(STable)); STable *pTable = (STable *)malloc(sizeof(STable));
...@@ -71,19 +78,22 @@ TEST(TsdbTest, createRepo) { ...@@ -71,19 +78,22 @@ TEST(TsdbTest, createRepo) {
tsdbCreateTable(pRepo, &tCfg); tsdbCreateTable(pRepo, &tCfg);
// // 3. Loop to write some simple data // // 3. Loop to write some simple data
int nRows = 1000; int nRows = 10000000;
int rowsPerSubmit = 10; int rowsPerSubmit = 100;
int64_t start_time = 1584081000000; int64_t start_time = 1584081000000;
SSubmitMsg *pMsg = (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(schema) * rowsPerSubmit); SSubmitMsg *pMsg = (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(schema) * rowsPerSubmit);
double stime = getCurTime();
for (int k = 0; k < nRows/rowsPerSubmit; k++) { for (int k = 0; k < nRows/rowsPerSubmit; k++) {
SSubmitBlk *pBlock = pMsg->blocks; SSubmitBlk *pBlock = pMsg->blocks;
pBlock->tableId = {.uid = 987607499877672L, .tid = 0}; pBlock->tableId = {.uid = 987607499877672L, .tid = 0};
pBlock->sversion = 0; pBlock->sversion = 0;
pBlock->len = 0; pBlock->len = 0;
for (int i = 0; i < rowsPerSubmit; i++) { for (int i = 0; i < rowsPerSubmit; i++) {
start_time += 1000; // start_time += 1000;
start_time -= 1000;
SDataRow row = (SDataRow)(pBlock->data + pBlock->len); SDataRow row = (SDataRow)(pBlock->data + pBlock->len);
tdInitDataRow(row, schema); tdInitDataRow(row, schema);
...@@ -102,7 +112,13 @@ TEST(TsdbTest, createRepo) { ...@@ -102,7 +112,13 @@ TEST(TsdbTest, createRepo) {
tsdbInsertData(pRepo, pMsg); tsdbInsertData(pRepo, pMsg);
} }
tsdbTriggerCommit(pRepo); double etime = getCurTime();
printf("Spent %f seconds to write %d records\n", etime - stime, nRows);
// tsdbTriggerCommit(pRepo);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册