提交 44d5644c 编写于 作者: H Hongze Cheng

TD-353

上级 d4e84b4f
...@@ -42,7 +42,7 @@ void *tdEncodeSchema(void *buf, STSchema *pSchema) { ...@@ -42,7 +42,7 @@ void *tdEncodeSchema(void *buf, STSchema *pSchema) {
STColumn *pCol = schemaColAt(pSchema, i); STColumn *pCol = schemaColAt(pSchema, i);
buf = taosEncodeFixedI8(buf, colType(pCol)); buf = taosEncodeFixedI8(buf, colType(pCol));
buf = taosEncodeFixedI16(buf, colColId(pCol)); buf = taosEncodeFixedI16(buf, colColId(pCol));
buf = taosEncodeFixedI32(buf, colBytes(pCol)) : buf = taosEncodeFixedI32(buf, colBytes(pCol));
} }
return buf; return buf;
...@@ -54,6 +54,7 @@ void *tdEncodeSchema(void *buf, STSchema *pSchema) { ...@@ -54,6 +54,7 @@ void *tdEncodeSchema(void *buf, STSchema *pSchema) {
void *tdDecodeSchema(void *buf, STSchema **pRSchema) { void *tdDecodeSchema(void *buf, STSchema **pRSchema) {
int version = 0; int version = 0;
int numOfCols = 0; int numOfCols = 0;
STSchemaBuilder schemaBuilder;
buf = taosDecodeFixedI32(buf, &version); buf = taosDecodeFixedI32(buf, &version);
buf = taosDecodeFixedI32(buf, &numOfCols); buf = taosDecodeFixedI32(buf, &numOfCols);
......
...@@ -362,7 +362,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); ...@@ -362,7 +362,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TAOS_QTYPE_WAL 2 #define TAOS_QTYPE_WAL 2
#define TAOS_QTYPE_CQ 3 #define TAOS_QTYPE_CQ 3
typedef enum : uint8_t{ typedef enum {
TSDB_SUPER_TABLE = 0, // super table TSDB_SUPER_TABLE = 0, // super table
TSDB_CHILD_TABLE = 1, // table created from super table TSDB_CHILD_TABLE = 1, // table created from super table
TSDB_NORMAL_TABLE = 2, // ordinary table TSDB_NORMAL_TABLE = 2, // ordinary table
......
...@@ -40,26 +40,25 @@ extern int tsdbDebugFlag; ...@@ -40,26 +40,25 @@ extern int tsdbDebugFlag;
#define TSDB_MAX_TABLE_SCHEMAS 16 #define TSDB_MAX_TABLE_SCHEMAS 16
#define TSDB_FILE_HEAD_SIZE 512 #define TSDB_FILE_HEAD_SIZE 512
#define TSDB_FILE_DELIMITER 0xF00AFA0F #define TSDB_FILE_DELIMITER 0xF00AFA0F
#define
// Definitions // Definitions
// ------------------ tsdbMeta.c // ------------------ tsdbMeta.c
typedef struct STable { typedef struct STable {
ETableType type; ETableType type;
tstr* name; // NOTE: there a flexible string here tstr* name; // NOTE: there a flexible string here
STableId tableId; STableId tableId;
uint64_t suid; uint64_t suid;
STable* pSuper; // super table pointer struct STable* pSuper; // super table pointer
uint8_t numOfSchemas; uint8_t numOfSchemas;
STSchema schema[TSDB_MAX_TABLE_SCHEMAS]; STSchema schema[TSDB_MAX_TABLE_SCHEMAS];
STSchema* tagSchema; STSchema* tagSchema;
SKVRow tagVal; SKVRow tagVal;
void* pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index void* pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index
void* eventHandler; // TODO void* eventHandler; // TODO
void* streamHandler; // TODO void* streamHandler; // TODO
TSKEY lastKey; // lastkey inserted in this table, initialized as 0, TODO: make a structure TSKEY lastKey; // lastkey inserted in this table, initialized as 0, TODO: make a structure
char* sql; char* sql;
void* cqhandle; void* cqhandle;
T_REF_DECLARE(); T_REF_DECLARE();
} STable; } STable;
...@@ -149,6 +148,25 @@ typedef struct { ...@@ -149,6 +148,25 @@ typedef struct {
int direction; int direction;
} SFileGroupIter; } SFileGroupIter;
// ------------------ tsdbMain.c
typedef struct {
int8_t state;
char* rootDir;
STsdbCfg config;
STsdbAppH appH;
STsdbStat stat;
STsdbMeta* tsdbMeta;
STsdbBufPool* pPool;
SMemTable* mem;
SMemTable* imem;
STsdbFileH* tsdbFileH;
int commit;
pthread_t commitThread;
pthread_mutex_t mutex;
bool repoLocked;
} STsdbRepo;
// ------------------ tsdbRWHelper.c // ------------------ tsdbRWHelper.c
typedef struct { typedef struct {
uint32_t len; uint32_t len;
...@@ -241,24 +259,6 @@ typedef struct { ...@@ -241,24 +259,6 @@ typedef struct {
void* compBuffer; // Buffer for temperary compress/decompress purpose void* compBuffer; // Buffer for temperary compress/decompress purpose
} SRWHelper; } SRWHelper;
// ------------------ tsdbMain.c
typedef struct {
int8_t state;
char* rootDir;
STsdbCfg config;
STsdbAppH appH;
STsdbStat stat;
STsdbMeta* tsdbMeta;
STsdbBufPool* pPool;
SMemTable* mem;
SMemTable* imem;
STsdbFileH* tsdbFileH;
int commit;
pthread_t commitThread;
pthread_mutex_t mutex;
bool repoLocked;
} STsdbRepo;
// Operations // Operations
// ------------------ tsdbMeta.c // ------------------ tsdbMeta.c
...@@ -289,7 +289,7 @@ void tsdbUnRefTable(STable* pTable); ...@@ -289,7 +289,7 @@ void tsdbUnRefTable(STable* pTable);
STsdbBufPool* tsdbNewBufPool(); STsdbBufPool* tsdbNewBufPool();
void tsdbFreeBufPool(STsdbBufPool* pBufPool); void tsdbFreeBufPool(STsdbBufPool* pBufPool);
int tsdbOpenBufPool(STsdbRepo* pRepo); int tsdbOpenBufPool(STsdbRepo* pRepo);
int tsdbOpenBufPool(STsdbRepo* pRepo); void tsdbCloseBufPool(STsdbRepo* pRepo);
SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo); SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo);
// ------------------ tsdbMemTable.c // ------------------ tsdbMemTable.c
...@@ -333,7 +333,6 @@ void tsdbFreeFileH(STsdbFileH* pFileH); ...@@ -333,7 +333,6 @@ void tsdbFreeFileH(STsdbFileH* pFileH);
char* tsdbGetMetaFileName(char* rootDir); char* tsdbGetMetaFileName(char* rootDir);
int tsdbLockRepo(STsdbRepo* pRepo); int tsdbLockRepo(STsdbRepo* pRepo);
int tsdbUnlockRepo(STsdbRepo* pRepo); int tsdbUnlockRepo(STsdbRepo* pRepo);
void* tsdbCommitData(void* arg);
#if 0 #if 0
......
...@@ -86,7 +86,7 @@ int tsdbOpenBufPool(STsdbRepo *pRepo) { ...@@ -86,7 +86,7 @@ int tsdbOpenBufPool(STsdbRepo *pRepo) {
} }
tsdbTrace("vgId:%d buffer pool is opened! bufBlockSize:%d tBufBlocks:%d nBufBlocks:%d", REPO_ID(pRepo), tsdbTrace("vgId:%d buffer pool is opened! bufBlockSize:%d tBufBlocks:%d nBufBlocks:%d", REPO_ID(pRepo),
pBufPool->bufBlockSize, pBufPool->tBufBlocks, pBufPool->nBufBlocks); pPool->bufBlockSize, pPool->tBufBlocks, pPool->nBufBlocks);
return 0; return 0;
...@@ -113,11 +113,10 @@ void tsdbCloseBufPool(STsdbRepo *pRepo) { ...@@ -113,11 +113,10 @@ void tsdbCloseBufPool(STsdbRepo *pRepo) {
} }
SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) { SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) {
ASSERT(pRepo != NULL && pRepo->pool != NULL); ASSERT(pRepo != NULL && pRepo->pPool != NULL);
ASSERT(IS_REPO_LOCKED(pRepo)); ASSERT(IS_REPO_LOCKED(pRepo));
STsdbCfg * pCfg = &pRepo->config; STsdbBufPool *pBufPool = pRepo->pPool;
STsdbBufPool *pBufPool = pRepo->pool;
while (POOL_IS_EMPTY(pBufPool)) { while (POOL_IS_EMPTY(pBufPool)) {
pthread_cond_wait(&(pBufPool->poolNotEmpty), &(pRepo->mutex)); pthread_cond_wait(&(pBufPool->poolNotEmpty), &(pRepo->mutex));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册