diff --git a/src/vnode/tsdb/inc/tsdb.h b/src/vnode/tsdb/inc/tsdb.h index 8c3fadf486acb4be3997eed299f6aa528b9432f3..267b462b91dbd07ac512b02681fb3efc6fdeffab 100644 --- a/src/vnode/tsdb/inc/tsdb.h +++ b/src/vnode/tsdb/inc/tsdb.h @@ -101,6 +101,15 @@ typedef struct { char data[]; } SSubmitBlk; +typedef struct { + int32_t totalLen; + int32_t len; + SDataRow row; +} SSubmitBlkIter; + +int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter); +SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter); + // Submit message for this TSDB typedef struct { int32_t length; @@ -117,7 +126,7 @@ typedef struct { SSubmitBlk *pBlock; } SSubmitMsgIter; -int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter); +int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter); SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter); // the TSDB repository info diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index f3aed75e790c3aba4e3baf73bfece7eab5a97095..0f70875c635feb05f2bac8e0b7261cc74cf04b07 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -322,14 +322,15 @@ STableInfo *tsdbGetTableInfo(tsdb_repo_t *pRepo, STableId tableId) { // TODO: need to return the number of data inserted int32_t tsdbInsertData(tsdb_repo_t *repo, SSubmitMsg *pMsg) { - SSubmitBlk *pBlock = (SSubmitBlk *)pMsg->blocks; + SSubmitMsgIter msgIter; - // for (int i = 0; i < pMsg->numOfTables; i++) { // Loop to deal with the submit message - // if (tsdbInsertDataToTable(repo, pBlock) < 0) { - // return -1; - // } - // pBlock = (SSubmitBlk *)(((char *)pBlock) + sizeof(SSubmitBlk) + pBlock->len); - // } + tsdbInitSubmitMsgIter(pMsg, &msgIter); + SSubmitBlk *pBlock; + while ((pBlock = tsdbGetSubmitMsgNext(&msgIter)) != NULL) { + if (tsdbInsertDataToTable(repo, pBlock) < 0) { + return -1; + } + } return 0; } @@ -415,6 +416,28 @@ void tsdbClearTableCfg(STableCfg *config) { if (config->tagValues) tdFreeDataRow(config->tagValues); } +int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { + if (pBlock->len <= 0) return -1; + pIter->totalLen = pBlock->len; + pIter->len = 0; + pIter->row = (SDataRow)(pBlock->data); + return 0; +} + +SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) { + SDataRow row = pIter->row; + if (row == NULL) return NULL; + + pIter->len += dataRowLen(row); + if (pIter->len >= pIter->totalLen) { + pIter->row = NULL; + } else { + pIter->row = (char *)row + dataRowLen(row); + } + + return row; +} + int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) { if (pMsg == NULL || pIter == NULL) return -1; @@ -433,11 +456,11 @@ SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) { SSubmitBlk *pBlock = pIter->pBlock; if (pBlock == NULL) return NULL; - pIter->len += pBlock->len; + pIter->len = pIter->len + sizeof(SSubmitBlk) + pBlock->len; if (pIter->len >= pIter->totalLen) { pIter->pBlock = NULL; } else { - pIter->pBlock = (char *)pBlock + pBlock->len; + pIter->pBlock = (char *)pBlock + pBlock->len + sizeof(SSubmitBlk); } return pBlock; @@ -633,19 +656,15 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) { STsdbRepo *pRepo = (STsdbRepo *)repo; STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, pBlock->tableId); - if (pTable == NULL) { - return -1; - } + if (pTable == NULL) return -1; - SDataRows rows = pBlock->data; - SDataRowsIter rDataIter, *pIter; - pIter = &rDataIter; + SSubmitBlkIter blkIter; SDataRow row; - tdInitSDataRowsIter(rows, pIter); - while ((row = tdDataRowsNext(pIter)) != NULL) { + tsdbInitSubmitBlkIter(pBlock, &blkIter); + while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) { if (tdInsertRowToTable(pRepo, row, pTable) < 0) { - // TODO: deal with the error here + return -1; } } diff --git a/src/vnode/tsdb/tests/tsdbTests.cpp b/src/vnode/tsdb/tests/tsdbTests.cpp index 052ad38d01c73a0ec30ed835bb41c69547e6e4c2..7b09fdfcdebc0828b08e22f30157a413a553b3ab 100644 --- a/src/vnode/tsdb/tests/tsdbTests.cpp +++ b/src/vnode/tsdb/tests/tsdbTests.cpp @@ -54,11 +54,11 @@ TEST(TsdbTest, createRepo) { tdAppendColVal(row, (void *)(&val), schemaColAt(schema, j)); } - pBlock->len += dataRowLen(row); } + pBlock->len += dataRowLen(row); - pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len; } + pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len; tsdbInsertData(pRepo, pMsg); }