diff --git a/src/vnode/common/inc/dataformat.h b/src/vnode/common/inc/dataformat.h index 7fd8f4e1063c8bce572a3b9426555d06a13fc216..63e54c9a57493a710c02664b0c140bbcd538a0f4 100644 --- a/src/vnode/common/inc/dataformat.h +++ b/src/vnode/common/inc/dataformat.h @@ -58,10 +58,24 @@ void tdDataRowReset(SDataRow row); */ typedef void *SDataRows; +#define TD_DATA_ROWS_HEAD_LEN sizeof(int32_t) + #define dataRowsLen(rs) (*(int32_t *)(rs)) #define dataRowsSetLen(rs, l) (dataRowsLen(rs) = (l)) #define dataRowsInit(rs) dataRowsSetLen(rs, sizeof(int32_t)) +void tdDataRowsAppendRow(SDataRows rows, SDataRow row); + +// Data rows iterator +typedef struct { + int32_t totalLen; + int32_t len; + SDataRow row; +} SDataRowsIter; + +void tdInitSDataRowsIter(SDataRows rows, SDataRowsIter *pIter); +SDataRow tdDataRowsNext(SDataRowsIter *pIter); + /* Data column definition * +---------+---------+-----------------------+ * | int32_t | int32_t | | @@ -80,12 +94,6 @@ typedef char *SDataCol; */ typedef char *SDataCols; -typedef struct { - int32_t rowCounter; - int32_t totalRows; - SDataRow row; -} SDataRowsIter; - // ----------------- Data column structure // ---- operation on SDataRow; @@ -110,11 +118,6 @@ void tdFreeSDataRow(SDataRow rdata); #define TD_DATACOLS_LEN(pDataCols) (*(int32_t *)(pDataCols)) #define TD_DATACOLS_NPOINTS(pDataCols) (*(int32_t *)(pDataCols + sizeof(int32_t))) -// ---- operation on SDataRowIter -void tdInitSDataRowsIter(SDataRows rows, SDataRowsIter *pIter); -int32_t tdRdataIterEnd(SDataRowsIter *pIter); -void tdRdataIterNext(SDataRowsIter *pIter); - #ifdef __cplusplus } #endif diff --git a/src/vnode/common/src/dataformat.c b/src/vnode/common/src/dataformat.c index ed03df6df27c632521b78f16d692ac330bceb719..707f7a8eb2d58688827d84e2d36d5ec18f73a362 100644 --- a/src/vnode/common/src/dataformat.c +++ b/src/vnode/common/src/dataformat.c @@ -58,7 +58,7 @@ int32_t tdAppendColVal(SDataRow row, void *value, SColumn *pCol, int32_t suffixO case TD_DATATYPE_DOUBLE: case TD_DATATYPE_TIMESTAMP: memcpy(dataRowIdx(row, pCol->offset + sizeof(int32_t)), value, rowDataLen[pCol->type]); - if (dataRowLen(row) > suffixOffset + sizeof(int32_t)) + if (dataRowLen(row) < suffixOffset + sizeof(int32_t)) dataRowSetLen(row, dataRowLen(row) + rowDataLen[pCol->type]); break; case TD_DATATYPE_VARCHAR: @@ -81,26 +81,47 @@ int32_t tdAppendColVal(SDataRow row, void *value, SColumn *pCol, int32_t suffixO void tdDataRowCpy(void *dst, SDataRow row) { memcpy(dst, row, dataRowLen(row)); } void tdDataRowReset(SDataRow row) { dataRowSetLen(row, sizeof(int32_t)); } -// ------ Codes below should be refactored - -SDataRow tdSDataRowDup(SDataRow rdata) { return NULL; } -void tdFreeSDataRow(SDataRow rdata) { - if (rdata == NULL) return; - free(rdata); +void tdDataRowsAppendRow(SDataRows rows, SDataRow row) { + tdDataRowCpy((void *)((char *)rows + dataRowsLen(rows)), row); + dataRowsSetLen(rows, dataRowsLen(rows) + dataRowLen(row)); } +// Initialize the iterator void tdInitSDataRowsIter(SDataRows rows, SDataRowsIter *pIter) { - pIter->totalRows = TD_DATAROWS_ROWS(rows); - pIter->rowCounter = 1; - pIter->row = TD_DATAROWS_DATA(rows); + if (pIter == NULL) return; + pIter->totalLen = dataRowsLen(rows); + + if (pIter->totalLen == TD_DATA_ROWS_HEAD_LEN) { + pIter->row = NULL; + return; + } + + pIter->row = (SDataRow)((char *)rows + TD_DATA_ROWS_HEAD_LEN); + pIter->len = TD_DATA_ROWS_HEAD_LEN + dataRowLen(pIter->row); } -void tdRdataIterNext(SDataRowsIter *pIter) { - pIter->rowCounter++; - pIter->row = pIter->row + TD_DATAROW_LEN(pIter->row); +// Get the next row in Rows +SDataRow tdDataRowsNext(SDataRowsIter *pIter) { + SDataRow row = pIter->row; + if (row == NULL) return NULL; + + if (pIter->len >= pIter->totalLen) { + pIter->row = NULL; + } else { + pIter->row = (char *)row + dataRowLen(row); + pIter->len += dataRowLen(row); + } + + return row; } -int32_t tdRdataIterEnd(SDataRowsIter *pIter) { return pIter->rowCounter >= pIter->totalRows; } +// ------ Codes below should be refactored + +SDataRow tdSDataRowDup(SDataRow rdata) { return NULL; } +void tdFreeSDataRow(SDataRow rdata) { + if (rdata == NULL) return; + free(rdata); +} /** * Copy it diff --git a/src/vnode/tsdb/src/tsdbCache.c b/src/vnode/tsdb/src/tsdbCache.c index dfcafc0c3203e53553cb545e1d424c5eed2cf459..dacb36025370a27267bdc72557408c3f8db93974 100644 --- a/src/vnode/tsdb/src/tsdbCache.c +++ b/src/vnode/tsdb/src/tsdbCache.c @@ -30,5 +30,8 @@ int32_t tsdbFreeCache(STsdbCache *pHandle) { return 0; } void *tsdbAllocFromCache(STsdbCache *pCache, int64_t bytes) { // TODO: implement here - return NULL; + void *ptr = malloc(bytes); + if (ptr == NULL) return NULL; + + return ptr; } \ No newline at end of file diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index c6e696e7652dfc5bdd3a10e08599f1afeab69d69..155ad192064d74d329bb5605788a7fc81e71ffcc 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -409,14 +409,16 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable int32_t level = 0; int32_t headSize = 0; + tSkipListRandNodeInfo(pTable->content.pData, &level, &headSize); + // Copy row into the memory - SSkipListNode *pNode = tsdbAllocFromCache(pRepo->tsdbCache, headSize + TD_DATAROW_LEN(row)); + SSkipListNode *pNode = tsdbAllocFromCache(pRepo->tsdbCache, headSize + dataRowLen(row)); if (pNode == NULL) { // TODO: deal with allocate failure } pNode->level = level; - tdSDataRowCpy(row, SL_GET_NODE_DATA(pNode)); + tdDataRowCpy(SL_GET_NODE_DATA(pNode), row); // Insert the skiplist node into the data tsdbInsertRowToTableImpl(pNode, pTable); @@ -434,14 +436,14 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlock *pBlock) { SDataRows rows = pBlock->data; SDataRowsIter rDataIter, *pIter; + pIter = &rDataIter; + SDataRow row; tdInitSDataRowsIter(rows, pIter); - while (!tdRdataIterEnd(pIter)) { - if (tdInsertRowToTable(pRepo, pIter->row, pTable) < 0) { + while ((row = tdDataRowsNext(pIter)) != NULL) { + if (tdInsertRowToTable(pRepo, row, pTable) < 0) { // TODO: deal with the error here } - - tdRdataIterNext(pIter); } return 0; diff --git a/src/vnode/tsdb/tests/tsdbTests.cpp b/src/vnode/tsdb/tests/tsdbTests.cpp index c60fd57b07ee471861d295ba1661732ffd6f32f2..534b75bfb664bd5a3d3ab739f94d538a72ae6bf5 100644 --- a/src/vnode/tsdb/tests/tsdbTests.cpp +++ b/src/vnode/tsdb/tests/tsdbTests.cpp @@ -4,7 +4,7 @@ #include "tsdb.h" #include "tsdbMeta.h" -TEST(TsdbTest, createTable) { +TEST(TsdbTest, DISABLED_createTable) { STsdbMeta *pMeta = tsdbCreateMeta(100); ASSERT_NE(pMeta, nullptr); @@ -54,6 +54,8 @@ TEST(TsdbTest, createRepo) { int32_t size = sizeof(SSubmitMsg) + sizeof(SSubmitBlock) + tdMaxRowDataBytes(config.schema) * 10 + sizeof(int32_t); + tdUpdateSchema(config.schema); + SSubmitMsg *pMsg = (SSubmitMsg *)malloc(size); pMsg->numOfTables = 1; // TODO: use api @@ -67,21 +69,19 @@ TEST(TsdbTest, createRepo) { SDataRow row = tdNewDataRow(tdMaxRowDataBytes(config.schema)); int64_t ttime = 1583508800000; - void *pDst = pBlock->data; for (int i = 0; i < 10; i++) { // loop over rows ttime += (10000 * i); tdDataRowReset(row); for (int j = 0; j < schemaNCols(config.schema); j++) { if (j == 0) { // set time stamp - tdAppendColVal(row, (void *)(&ttime), schemaColAt(config.schema, j), 24); + tdAppendColVal(row, (void *)(&ttime), schemaColAt(config.schema, j), 40); } else { // set other fields - int val = 10; - tdAppendColVal(row, (void *)(&val), schemaColAt(config.schema, j), 24); + int32_t val = 10; + tdAppendColVal(row, (void *)(&val), schemaColAt(config.schema, j), 40); } } - dataRowCpy((void *)pDst, row); - pDst += dataRowLen(row); + tdDataRowsAppendRow(rows, row); } tsdbInsertData(pRepo, pMsg);