diff --git a/src/common/inc/dataformat.h b/src/common/inc/dataformat.h index 76b5317b3553db797efd34c74a5c640ae1af521d..08e49dd3c5c6045e074baeac38cac0c99e3b4ded 100644 --- a/src/common/inc/dataformat.h +++ b/src/common/inc/dataformat.h @@ -91,7 +91,7 @@ SDataRow tdNewDataRow(int32_t bytes); int tdMaxRowBytesFromSchema(STSchema *pSchema); SDataRow tdNewDataRowFromSchema(STSchema *pSchema); void tdFreeDataRow(SDataRow row); -// int32_t tdAppendColVal(SDataRow row, void *value, SColumn *pCol, int32_t suffixOffset); +int tdAppendColVal(SDataRow row, void *value, STColumn *pCol, int32_t suffixOffset); void tdDataRowCpy(void *dst, SDataRow row); void tdDataRowReset(SDataRow row); SDataRow tdDataRowDup(SDataRow row); diff --git a/src/vnode/tsdb/inc/tsdb.h b/src/vnode/tsdb/inc/tsdb.h index f7c37009e37aec4163fab6c52454e2530ec7b52a..c41384114abd7eb039fd8c4ad0e442eaf92c8ec1 100644 --- a/src/vnode/tsdb/inc/tsdb.h +++ b/src/vnode/tsdb/inc/tsdb.h @@ -91,14 +91,6 @@ int tsdbCreateTable(tsdb_repo_t *repo, STableCfg *pCfg); int tsdbDropTable(tsdb_repo_t *pRepo, STableId tableId); int tsdbAlterTable(tsdb_repo_t *repo, STableCfg *pCfg); - -// Submit message for this TSDB -typedef struct { - int32_t numOfTables; - int32_t compressed; - char data[]; -} SSubmitMsg; - // Submit message for one table typedef struct { STableId tableId; @@ -106,8 +98,26 @@ typedef struct { int32_t sversion; // data schema version int32_t len; // message length char data[]; -} SSubmitBlock; +} SSubmitBlk; + +// Submit message for this TSDB +typedef struct { + int32_t length; + int32_t compressed; + SSubmitBlk blocks[]; +} SSubmitMsg; + +#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg) + +// SSubmitMsg Iterator +typedef struct { + int32_t totalLen; + int32_t len; + SSubmitBlk *pBlock; +} SSubmitMsgIter; +int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter); +SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter); // the TSDB repository info typedef struct STsdbRepoInfo { diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 147dcff2a5a24eaf59ceacdc1ece069c566a39aa..37901ba588b91b535b0ec1734e6f393f8afa20de 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -78,7 +78,7 @@ static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo); static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo); static int tsdbOpenMetaFile(char *tsdbDir); static int tsdbRecoverRepo(int fd, STsdbCfg *pCfg); -static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlock *pBlock); +static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock); #define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid] #define TSDB_GET_TABLE_BY_NAME(pRepo, name) @@ -322,14 +322,14 @@ STableInfo *tsdbGetTableInfo(tsdb_repo_t *pRepo, STableId tableId) { // TODO: need to return the number of data inserted int32_t tsdbInsertData(tsdb_repo_t *repo, SSubmitMsg *pMsg) { - SSubmitBlock *pBlock = (SSubmitBlock *)pMsg->data; + SSubmitBlk *pBlock = (SSubmitBlk *)pMsg->blocks; - for (int i = 0; i < pMsg->numOfTables; i++) { // Loop to deal with the submit message - if (tsdbInsertDataToTable(repo, pBlock) < 0) { - return -1; - } - pBlock = (SSubmitBlock *)(((char *)pBlock) + sizeof(SSubmitBlock) + pBlock->len); - } + // for (int i = 0; i < pMsg->numOfTables; i++) { // Loop to deal with the submit message + // if (tsdbInsertDataToTable(repo, pBlock) < 0) { + // return -1; + // } + // pBlock = (SSubmitBlk *)(((char *)pBlock) + sizeof(SSubmitBlk) + pBlock->len); + // } return 0; } @@ -415,6 +415,34 @@ void tsdbClearTableCfg(STableCfg *config) { if (config->tagValues) tdFreeDataRow(config->tagValues); } +int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) { + if (pMsg == NULL || pIter == NULL) return -1; + + pIter->totalLen = pMsg->length; + pIter->len = TSDB_SUBMIT_MSG_HEAD_SIZE; + if (pMsg->length <= TSDB_SUBMIT_MSG_HEAD_SIZE) { + pIter->pBlock = NULL; + } else { + pIter->pBlock = pMsg->blocks; + } + + return 0; +} + +SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) { + SSubmitBlk *pBlock = pIter->pBlock; + if (pBlock == NULL) return NULL; + + pIter->len += pBlock->len; + if (pIter->len >= pIter->totalLen) { + pIter->pBlock = NULL; + } else { + pIter->pBlock = (char *)pBlock + pBlock->len; + } + + return pBlock; +} + // Check the configuration and set default options static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { // Check precision @@ -601,7 +629,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable return 0; } -static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlock *pBlock) { +static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) { STsdbRepo *pRepo = (STsdbRepo *)repo; STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, pBlock->tableId); diff --git a/src/vnode/tsdb/tests/tsdbTests.cpp b/src/vnode/tsdb/tests/tsdbTests.cpp index 34a712dc0fb21bf73b9a3cdff721dadd154713b7..1aa85c0808db26afb87a45536b3211365fe24ed0 100644 --- a/src/vnode/tsdb/tests/tsdbTests.cpp +++ b/src/vnode/tsdb/tests/tsdbTests.cpp @@ -17,11 +17,10 @@ TEST(TsdbTest, createRepo) { ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_SUPER_TABLE, 987607499877672L, 0), -1); ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_NTABLE, 987607499877672L, 0), 0); - int nCols = 5; + int nCols = 5; STSchema *schema = tdNewSchema(nCols); - for (int i = 0; i < nCols; i++) - { + for (int i = 0; i < nCols; i++) { if (i == 0) { tdSchemaAppendCol(schema, TSDB_DATA_TYPE_TIMESTAMP, i, -1); } else { @@ -34,8 +33,14 @@ TEST(TsdbTest, createRepo) { tsdbCreateTable(pRepo, &tCfg); // 3. Loop to write some simple data - SDataRow row = tdNewDataRowFromSchema(schema); - for (int i = 0; i < nCols; i++) { - } + // int size = tdMaxRowBytesFromSchema(schema); + // int nrows = 100; + // SSubmitMsg *pMsg = (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk+ size * nrows); + + // { + // // TODO + // } + + // tsdbInsertData(pRepo, pMsg); }