提交 7546da5e 编写于 作者: H hzcheng

TD-34

上级 7a562813
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include <stdint.h> #include <stdint.h>
#include "taosdef.h"
#include "tlist.h" #include "tlist.h"
#ifdef __cplusplus #ifdef __cplusplus
...@@ -38,18 +39,25 @@ typedef struct { ...@@ -38,18 +39,25 @@ typedef struct {
SList * memPool; SList * memPool;
} STsdbCachePool; } STsdbCachePool;
typedef struct {
TSKEY keyFirst;
TSKEY keyLast;
int64_t numOfPoints;
SList * list;
} SCacheMem;
typedef struct { typedef struct {
int maxBytes; int maxBytes;
int cacheBlockSize; int cacheBlockSize;
STsdbCachePool pool; STsdbCachePool pool;
STsdbCacheBlock *curBlock; STsdbCacheBlock *curBlock;
SList * mem; SCacheMem * mem;
SList * imem; SCacheMem * imem;
} STsdbCache; } STsdbCache;
STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize); STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize);
void tsdbFreeCache(STsdbCache *pCache); void tsdbFreeCache(STsdbCache *pCache);
void * tsdbAllocFromCache(STsdbCache *pCache, int bytes); void * tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -18,11 +18,15 @@ ...@@ -18,11 +18,15 @@
#include <stdint.h> #include <stdint.h>
#include "taosdef.h" #include "taosdef.h"
#include "tglobalcfg.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#define tsdbGetKeyFileId(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile))
#define tsdbGetMaxNumOfFiles(keep, daysPerFile) ((keep) / (daysPerFile) + 3)
typedef enum { typedef enum {
TSDB_FILE_TYPE_HEAD = 0, // .head file type TSDB_FILE_TYPE_HEAD = 0, // .head file type
TSDB_FILE_TYPE_DATA, // .data file type TSDB_FILE_TYPE_DATA, // .data file type
...@@ -66,6 +70,7 @@ STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32 ...@@ -66,6 +70,7 @@ STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32
void tsdbCloseFile(STsdbFileH *pFileH); void tsdbCloseFile(STsdbFileH *pFileH);
int tsdbCreateFileGroup(char *dataDir, int fileId, SFileGroup *pFGroup, int maxTables); int tsdbCreateFileGroup(char *dataDir, int fileId, SFileGroup *pFGroup, int maxTables);
void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
#include "tsdbCache.h" #include "tsdbCache.h"
static int tsdbAllocBlockFromPool(STsdbCache *pCache); static int tsdbAllocBlockFromPool(STsdbCache *pCache);
static void tsdbFreeBlockList(SList *list); static void tsdbFreeBlockList(SCacheMem *mem);
STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize) { STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize) {
STsdbCache *pCache = (STsdbCache *)calloc(1, sizeof(STsdbCache)); STsdbCache *pCache = (STsdbCache *)calloc(1, sizeof(STsdbCache));
...@@ -46,11 +46,8 @@ STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize) { ...@@ -46,11 +46,8 @@ STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize) {
tdListAppend(pPool->memPool, (void *)(&pBlock)); tdListAppend(pPool->memPool, (void *)(&pBlock));
} }
pCache->mem = tdListNew(sizeof(STsdbCacheBlock *)); pCache->mem = NULL;
if (pCache->mem == NULL) goto _err; pCache->imem = NULL;
pCache->imem = tdListNew(sizeof(STsdbCacheBlock *));
if (pCache->imem == NULL) goto _err;
return pCache; return pCache;
...@@ -66,11 +63,20 @@ void tsdbFreeCache(STsdbCache *pCache) { ...@@ -66,11 +63,20 @@ void tsdbFreeCache(STsdbCache *pCache) {
free(pCache); free(pCache);
} }
void *tsdbAllocFromCache(STsdbCache *pCache, int bytes) { void *tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key) {
if (pCache == NULL) return NULL; if (pCache == NULL) return NULL;
if (bytes > pCache->cacheBlockSize) return NULL; if (bytes > pCache->cacheBlockSize) return NULL;
if (isListEmpty(pCache->mem)) { if (pCache->mem == NULL) { // Create a new one
pCache->mem = (SCacheMem *)malloc(sizeof(SCacheMem));
if (pCache->mem == NULL) return NULL;
pCache->mem->keyFirst = INT64_MAX;
pCache->mem->keyLast = 0;
pCache->mem->numOfPoints = 0;
pCache->mem->list = tdListNew(sizeof(STsdbCacheBlock *));
}
if (isListEmpty(pCache->mem->list)) {
if (tsdbAllocBlockFromPool(pCache) < 0) { if (tsdbAllocBlockFromPool(pCache) < 0) {
// TODO: deal with the error // TODO: deal with the error
} }
...@@ -86,12 +92,15 @@ void *tsdbAllocFromCache(STsdbCache *pCache, int bytes) { ...@@ -86,12 +92,15 @@ void *tsdbAllocFromCache(STsdbCache *pCache, int bytes) {
pCache->curBlock->offset += bytes; pCache->curBlock->offset += bytes;
pCache->curBlock->remain -= bytes; pCache->curBlock->remain -= bytes;
memset(ptr, 0, bytes); memset(ptr, 0, bytes);
if (key < pCache->mem->keyFirst) pCache->mem->keyFirst = key;
if (key > pCache->mem->keyLast) pCache->mem->keyLast = key;
return ptr; return ptr;
} }
static void tsdbFreeBlockList(SList *list) { static void tsdbFreeBlockList(SCacheMem *mem) {
if (list == NULL) return; if (mem == NULL) return;
SList * list = mem->list;
SListNode * node = NULL; SListNode * node = NULL;
STsdbCacheBlock *pBlock = NULL; STsdbCacheBlock *pBlock = NULL;
while ((node = tdListPopHead(list)) != NULL) { while ((node = tdListPopHead(list)) != NULL) {
...@@ -100,6 +109,7 @@ static void tsdbFreeBlockList(SList *list) { ...@@ -100,6 +109,7 @@ static void tsdbFreeBlockList(SList *list) {
listNodeFree(node); listNodeFree(node);
} }
tdListFree(list); tdListFree(list);
free(mem);
} }
static int tsdbAllocBlockFromPool(STsdbCache *pCache) { static int tsdbAllocBlockFromPool(STsdbCache *pCache) {
...@@ -114,7 +124,7 @@ static int tsdbAllocBlockFromPool(STsdbCache *pCache) { ...@@ -114,7 +124,7 @@ static int tsdbAllocBlockFromPool(STsdbCache *pCache) {
pBlock->offset = 0; pBlock->offset = 0;
pBlock->remain = pCache->cacheBlockSize; pBlock->remain = pCache->cacheBlockSize;
tdListAppendNode(pCache->mem, node); tdListAppendNode(pCache->mem->list, node);
pCache->curBlock = pBlock; pCache->curBlock = pBlock;
return 0; return 0;
......
...@@ -22,15 +22,11 @@ ...@@ -22,15 +22,11 @@
#include <sys/types.h> #include <sys/types.h>
#include <unistd.h> #include <unistd.h>
#include "tglobalcfg.h"
#include "tsdbFile.h" #include "tsdbFile.h"
#define TSDB_FILE_HEAD_SIZE 512 #define TSDB_FILE_HEAD_SIZE 512
#define TSDB_FILE_DELIMITER 0xF00AFA0F #define TSDB_FILE_DELIMITER 0xF00AFA0F
#define tsdbGetKeyFileId(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile))
#define tsdbGetMaxNumOfFiles(keep, daysPerFile) ((keep) / (daysPerFile) + 3)
typedef struct { typedef struct {
int32_t len; int32_t len;
int32_t padding; // For padding purpose int32_t padding; // For padding purpose
...@@ -228,7 +224,7 @@ STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32 ...@@ -228,7 +224,7 @@ STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32
return pTsdbFileH; return pTsdbFileH;
} }
static void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey,
TSKEY *maxKey) { TSKEY *maxKey) {
*minKey = fileId * daysPerFile * tsMsPerDay[precision]; *minKey = fileId * daysPerFile * tsMsPerDay[precision];
*maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1; *maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1;
......
...@@ -316,8 +316,9 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo) { ...@@ -316,8 +316,9 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo) {
pTable->mem = NULL; pTable->mem = NULL;
} }
} }
// Loop to move mem to imem // TODO: Loop to move mem to imem
tdListMove(pRepo->tsdbCache->mem, pRepo->tsdbCache->imem); pRepo->tsdbCache->imem = pRepo->tsdbCache->mem;
pRepo->tsdbCache->mem = NULL;
pthread_create(&(pRepo->commitThread), NULL, tsdbCommitToFile, (void *)repo); pthread_create(&(pRepo->commitThread), NULL, tsdbCommitToFile, (void *)repo);
pthread_mutex_unlock(&(pRepo->mutex)); pthread_mutex_unlock(&(pRepo->mutex));
...@@ -678,8 +679,9 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable ...@@ -678,8 +679,9 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
tSkipListRandNodeInfo(pTable->mem->pData, &level, &headSize); tSkipListRandNodeInfo(pTable->mem->pData, &level, &headSize);
TSKEY key = dataRowKey(row);
// Copy row into the memory // Copy row into the memory
SSkipListNode *pNode = tsdbAllocFromCache(pRepo->tsdbCache, headSize + dataRowLen(row)); SSkipListNode *pNode = tsdbAllocFromCache(pRepo->tsdbCache, headSize + dataRowLen(row), key);
if (pNode == NULL) { if (pNode == NULL) {
// TODO: deal with allocate failure // TODO: deal with allocate failure
} }
...@@ -689,7 +691,6 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable ...@@ -689,7 +691,6 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
// Insert the skiplist node into the data // Insert the skiplist node into the data
tSkipListPut(pTable->mem->pData, pNode); tSkipListPut(pTable->mem->pData, pNode);
TSKEY key = dataRowKey(row);
if (key > pTable->mem->keyLast) pTable->mem->keyLast = key; if (key > pTable->mem->keyLast) pTable->mem->keyLast = key;
if (key < pTable->mem->keyFirst) pTable->mem->keyFirst = key; if (key < pTable->mem->keyFirst) pTable->mem->keyFirst = key;
pTable->mem->numOfPoints++; pTable->mem->numOfPoints++;
...@@ -716,20 +717,24 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) { ...@@ -716,20 +717,24 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) {
return 0; return 0;
} }
// Commit to file
static void *tsdbCommitToFile(void *arg) { static void *tsdbCommitToFile(void *arg) {
// TODO // TODO
STsdbRepo *pRepo = (STsdbRepo *)arg; STsdbRepo *pRepo = (STsdbRepo *)arg;
STsdbMeta *pMeta = pRepo->tsdbMeta; STsdbMeta *pMeta = pRepo->tsdbMeta;
for (int i = 0; i < pRepo->config.maxTables; i++) { for (int i = 0; i < pRepo->config.maxTables; i++) { // Loop over table
STable *pTable = pMeta->tables[i]; STable *pTable = pMeta->tables[i];
if (pTable == NULL) continue; if (pTable == NULL || pTable->imem == NULL) continue;
SSkipListIterator *pIter = tSkipListCreateIter(pTable->imem->pData);
SMemTable *pMem = pTable->imem;
SSkipListIterator *pIter = tSkipListCreateIter(pMem->pData);
// Loop to commit to file
while (tSkipListIterNext(pIter)) { while (tSkipListIterNext(pIter)) {
SSkipListNode *node = tSkipListIterGet(pIter); SSkipListNode *node = tSkipListIterGet(pIter);
SDataRow row = SL_GET_NODE_DATA(node); SDataRow row = SL_GET_NODE_DATA(node);
int k = 0; int k = 0;
} }
tSkipListDestroyIter(pIter);
} }
return NULL; return NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册