提交 f3c92f8e 编写于 作者: H hzcheng

refactor and add more code

上级 7e0ece01
...@@ -91,7 +91,7 @@ SDataRow tdNewDataRow(int32_t bytes); ...@@ -91,7 +91,7 @@ SDataRow tdNewDataRow(int32_t bytes);
int tdMaxRowBytesFromSchema(STSchema *pSchema); int tdMaxRowBytesFromSchema(STSchema *pSchema);
SDataRow tdNewDataRowFromSchema(STSchema *pSchema); SDataRow tdNewDataRowFromSchema(STSchema *pSchema);
void tdFreeDataRow(SDataRow row); void tdFreeDataRow(SDataRow row);
// int32_t tdAppendColVal(SDataRow row, void *value, SColumn *pCol, int32_t suffixOffset); int tdAppendColVal(SDataRow row, void *value, STColumn *pCol, int32_t suffixOffset);
void tdDataRowCpy(void *dst, SDataRow row); void tdDataRowCpy(void *dst, SDataRow row);
void tdDataRowReset(SDataRow row); void tdDataRowReset(SDataRow row);
SDataRow tdDataRowDup(SDataRow row); SDataRow tdDataRowDup(SDataRow row);
......
...@@ -91,14 +91,6 @@ int tsdbCreateTable(tsdb_repo_t *repo, STableCfg *pCfg); ...@@ -91,14 +91,6 @@ int tsdbCreateTable(tsdb_repo_t *repo, STableCfg *pCfg);
int tsdbDropTable(tsdb_repo_t *pRepo, STableId tableId); int tsdbDropTable(tsdb_repo_t *pRepo, STableId tableId);
int tsdbAlterTable(tsdb_repo_t *repo, STableCfg *pCfg); int tsdbAlterTable(tsdb_repo_t *repo, STableCfg *pCfg);
// Submit message for this TSDB
typedef struct {
int32_t numOfTables;
int32_t compressed;
char data[];
} SSubmitMsg;
// Submit message for one table // Submit message for one table
typedef struct { typedef struct {
STableId tableId; STableId tableId;
...@@ -106,8 +98,26 @@ typedef struct { ...@@ -106,8 +98,26 @@ typedef struct {
int32_t sversion; // data schema version int32_t sversion; // data schema version
int32_t len; // message length int32_t len; // message length
char data[]; char data[];
} SSubmitBlock; } SSubmitBlk;
// Submit message for this TSDB
typedef struct {
int32_t length;
int32_t compressed;
SSubmitBlk blocks[];
} SSubmitMsg;
#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg)
// SSubmitMsg Iterator
typedef struct {
int32_t totalLen;
int32_t len;
SSubmitBlk *pBlock;
} SSubmitMsgIter;
int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter);
SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter);
// the TSDB repository info // the TSDB repository info
typedef struct STsdbRepoInfo { typedef struct STsdbRepoInfo {
......
...@@ -78,7 +78,7 @@ static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo); ...@@ -78,7 +78,7 @@ static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo);
static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo); static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo);
static int tsdbOpenMetaFile(char *tsdbDir); static int tsdbOpenMetaFile(char *tsdbDir);
static int tsdbRecoverRepo(int fd, STsdbCfg *pCfg); static int tsdbRecoverRepo(int fd, STsdbCfg *pCfg);
static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlock *pBlock); static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock);
#define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid] #define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid]
#define TSDB_GET_TABLE_BY_NAME(pRepo, name) #define TSDB_GET_TABLE_BY_NAME(pRepo, name)
...@@ -322,14 +322,14 @@ STableInfo *tsdbGetTableInfo(tsdb_repo_t *pRepo, STableId tableId) { ...@@ -322,14 +322,14 @@ STableInfo *tsdbGetTableInfo(tsdb_repo_t *pRepo, STableId tableId) {
// TODO: need to return the number of data inserted // TODO: need to return the number of data inserted
int32_t tsdbInsertData(tsdb_repo_t *repo, SSubmitMsg *pMsg) { int32_t tsdbInsertData(tsdb_repo_t *repo, SSubmitMsg *pMsg) {
SSubmitBlock *pBlock = (SSubmitBlock *)pMsg->data; SSubmitBlk *pBlock = (SSubmitBlk *)pMsg->blocks;
for (int i = 0; i < pMsg->numOfTables; i++) { // Loop to deal with the submit message // for (int i = 0; i < pMsg->numOfTables; i++) { // Loop to deal with the submit message
if (tsdbInsertDataToTable(repo, pBlock) < 0) { // if (tsdbInsertDataToTable(repo, pBlock) < 0) {
return -1; // return -1;
} // }
pBlock = (SSubmitBlock *)(((char *)pBlock) + sizeof(SSubmitBlock) + pBlock->len); // pBlock = (SSubmitBlk *)(((char *)pBlock) + sizeof(SSubmitBlk) + pBlock->len);
} // }
return 0; return 0;
} }
...@@ -415,6 +415,34 @@ void tsdbClearTableCfg(STableCfg *config) { ...@@ -415,6 +415,34 @@ void tsdbClearTableCfg(STableCfg *config) {
if (config->tagValues) tdFreeDataRow(config->tagValues); if (config->tagValues) tdFreeDataRow(config->tagValues);
} }
int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) {
if (pMsg == NULL || pIter == NULL) return -1;
pIter->totalLen = pMsg->length;
pIter->len = TSDB_SUBMIT_MSG_HEAD_SIZE;
if (pMsg->length <= TSDB_SUBMIT_MSG_HEAD_SIZE) {
pIter->pBlock = NULL;
} else {
pIter->pBlock = pMsg->blocks;
}
return 0;
}
SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) {
SSubmitBlk *pBlock = pIter->pBlock;
if (pBlock == NULL) return NULL;
pIter->len += pBlock->len;
if (pIter->len >= pIter->totalLen) {
pIter->pBlock = NULL;
} else {
pIter->pBlock = (char *)pBlock + pBlock->len;
}
return pBlock;
}
// Check the configuration and set default options // Check the configuration and set default options
static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
// Check precision // Check precision
...@@ -601,7 +629,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable ...@@ -601,7 +629,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
return 0; return 0;
} }
static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlock *pBlock) { static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, pBlock->tableId); STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, pBlock->tableId);
......
...@@ -17,11 +17,10 @@ TEST(TsdbTest, createRepo) { ...@@ -17,11 +17,10 @@ TEST(TsdbTest, createRepo) {
ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_SUPER_TABLE, 987607499877672L, 0), -1); ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_SUPER_TABLE, 987607499877672L, 0), -1);
ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_NTABLE, 987607499877672L, 0), 0); ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_NTABLE, 987607499877672L, 0), 0);
int nCols = 5; int nCols = 5;
STSchema *schema = tdNewSchema(nCols); STSchema *schema = tdNewSchema(nCols);
for (int i = 0; i < nCols; i++) for (int i = 0; i < nCols; i++) {
{
if (i == 0) { if (i == 0) {
tdSchemaAppendCol(schema, TSDB_DATA_TYPE_TIMESTAMP, i, -1); tdSchemaAppendCol(schema, TSDB_DATA_TYPE_TIMESTAMP, i, -1);
} else { } else {
...@@ -34,8 +33,14 @@ TEST(TsdbTest, createRepo) { ...@@ -34,8 +33,14 @@ TEST(TsdbTest, createRepo) {
tsdbCreateTable(pRepo, &tCfg); tsdbCreateTable(pRepo, &tCfg);
// 3. Loop to write some simple data // 3. Loop to write some simple data
SDataRow row = tdNewDataRowFromSchema(schema); // int size = tdMaxRowBytesFromSchema(schema);
for (int i = 0; i < nCols; i++) { // int nrows = 100;
} // SSubmitMsg *pMsg = (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk+ size * nrows);
// {
// // TODO
// }
// tsdbInsertData(pRepo, pMsg);
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册