diff --git a/src/common/inc/dataformat.h b/src/common/inc/dataformat.h index 08e49dd3c5c6045e074baeac38cac0c99e3b4ded..4589a0573aca0e9a8595cb9942f6ddfc9b6bc00b 100644 --- a/src/common/inc/dataformat.h +++ b/src/common/inc/dataformat.h @@ -69,31 +69,35 @@ void tdUpdateSchema(STSchema *pSchema); // ----------------- Data row structure /* A data row, the format is like below: - * +---------+---------------------------------+ - * | int32_t | | - * +---------+---------------------------------+ - * | len | row | - * +---------+---------------------------------+ + * +----------+---------+---------------------------------+---------------------------------+ + * | int32_t | int32_t | | | + * +----------+---------+---------------------------------+---------------------------------+ + * | len | flen | First part | Second part | + * +----------+---------+---------------------------------+---------------------------------+ + * plen: first part length * len: the length including sizeof(row) + sizeof(len) * row: actual row data encoding */ typedef void *SDataRow; -#define TD_DATA_ROW_HEAD_SIZE sizeof(int32_t) +#define TD_DATA_ROW_HEAD_SIZE (2 * sizeof(int32_t)) #define dataRowLen(r) (*(int32_t *)(r)) +#define dataRowFLen(r) (*(int32_t *)((char *)(r) + sizeof(int32_t))) #define dataRowTuple(r) ((char *)(r) + TD_DATA_ROW_HEAD_SIZE) #define dataRowSetLen(r, l) (dataRowLen(r) = (l)) +#define dataRowSetFLen(r, l) (dataRowFLen(r) = (l)) #define dataRowIdx(r, i) ((char *)(r) + i) #define dataRowCpy(dst, r) memcpy((dst), (r), dataRowLen(r)) +#define dataRowAt(r, idx) ((char *)(r) + (idx)) -SDataRow tdNewDataRow(int32_t bytes); +void tdInitDataRow(SDataRow row, STSchema *pSchema); int tdMaxRowBytesFromSchema(STSchema *pSchema); +SDataRow tdNewDataRow(int32_t bytes, STSchema *pSchema); SDataRow tdNewDataRowFromSchema(STSchema *pSchema); void tdFreeDataRow(SDataRow row); -int tdAppendColVal(SDataRow row, void *value, STColumn *pCol, int32_t suffixOffset); -void tdDataRowCpy(void *dst, SDataRow row); -void tdDataRowReset(SDataRow row); +int tdAppendColVal(SDataRow row, void *value, STColumn *pCol); +void tdDataRowReset(SDataRow row, STSchema *pSchema); SDataRow tdDataRowDup(SDataRow row); /* Data rows definition, the format of it is like below: diff --git a/src/common/src/dataformat.c b/src/common/src/dataformat.c index c6fd4af28428ac0d7264922be915f5ad3d0016c8..58530c5e3d54bdefe7a38b9e6c9c13112f0a2602 100644 --- a/src/common/src/dataformat.c +++ b/src/common/src/dataformat.c @@ -14,6 +14,8 @@ */ #include "dataformat.h" +static int tdFLenFromSchema(STSchema *pSchema); + /** * Create a new STColumn object * ASSUMPTIONS: VALID PARAMETERS @@ -157,6 +159,14 @@ void tdUpdateSchema(STSchema *pSchema) { } } +/** + * Initialize a data row + */ +void tdInitDataRow(SDataRow row, STSchema *pSchema) { + dataRowSetFLen(row, TD_DATA_ROW_HEAD_SIZE); + dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + tdFLenFromSchema(pSchema)); +} + /** * Create a data row with maximum row length bytes. * @@ -167,13 +177,13 @@ void tdUpdateSchema(STSchema *pSchema) { * @return SDataRow object for success * NULL for failure */ -SDataRow tdNewDataRow(int32_t bytes) { +SDataRow tdNewDataRow(int32_t bytes, STSchema *pSchema) { int32_t size = sizeof(int32_t) + bytes; SDataRow row = malloc(size); if (row == NULL) return NULL; - dataRowSetLen(row, sizeof(int32_t)); + tdInitDataRow(row, pSchema); return row; } @@ -197,14 +207,7 @@ int tdMaxRowBytesFromSchema(STSchema *pSchema) { return bytes; } -SDataRow tdNewDataRowFromSchema(STSchema *pSchema) { - int bytes = 0; - { - // TODO: estimiate size from schema - } - - return tdNewDataRow(bytes); -} +SDataRow tdNewDataRowFromSchema(STSchema *pSchema) { return tdNewDataRow(tdMaxRowBytesFromSchema(pSchema), pSchema); } /** * Free the SDataRow object @@ -214,62 +217,37 @@ void tdFreeDataRow(SDataRow row) { } /** - * Append a column value to a SDataRow object. - * NOTE: THE APPLICATION SHOULD MAKE SURE VALID PARAMETERS. THE FUNCTION ASSUMES - * THE ROW OBJECT HAS ENOUGH SPACE TO HOLD THE VALUE. - * - * @param row the row to append value to - * @param value value pointer to append - * @param pSchema schema - * @param colIdx column index - * - * @return 0 for success and -1 for failure + * Append a column value to the data row */ -// int32_t tdAppendColVal(SDataRow row, void *value, SColumn *pCol, int32_t suffixOffset) { -// int32_t offset; - -// switch (pCol->type) { -// case TD_DATATYPE_BOOL: -// case TD_DATATYPE_TINYINT: -// case TD_DATATYPE_SMALLINT: -// case TD_DATATYPE_INT: -// case TD_DATATYPE_BIGINT: -// case TD_DATATYPE_FLOAT: -// case TD_DATATYPE_DOUBLE: -// case TD_DATATYPE_TIMESTAMP: -// memcpy(dataRowIdx(row, pCol->offset + sizeof(int32_t)), value, rowDataLen[pCol->type]); -// if (dataRowLen(row) < suffixOffset + sizeof(int32_t)) -// dataRowSetLen(row, dataRowLen(row) + rowDataLen[pCol->type]); -// break; -// case TD_DATATYPE_VARCHAR: -// offset = dataRowLen(row) > suffixOffset ? dataRowLen(row) : suffixOffset; -// memcpy(dataRowIdx(row, pCol->offset+sizeof(int32_t)), (void *)(&offset), sizeof(offset)); -// case TD_DATATYPE_NCHAR: -// case TD_DATATYPE_BINARY: -// break; -// default: -// return -1; -// } - -// return 0; -// } +int tdAppendColVal(SDataRow row, void *value, STColumn *pCol) { + switch (colType(pCol)) + { + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + *(int32_t *)dataRowAt(row, dataRowFLen(row)) = dataRowLen(row); + dataRowFLen(row) += TYPE_BYTES[colType(pCol)]; + memcpy((void *)dataRowAt(row, dataRowLen(row)), value, strlen(value)); + dataRowLen(row) += strlen(value); + break; + default: + memcpy(dataRowAt(row, dataRowFLen(row)), value, TYPE_BYTES[colType(pCol)]); + dataRowFLen(row) += TYPE_BYTES[colType(pCol)]; + break; + } +} + +void tdDataRowReset(SDataRow row, STSchema *pSchema) { tdInitDataRow(row, pSchema); } -/** - * Copy a data row to a destination - * ASSUMPTIONS: dst has enough room for a copy of row - */ -void tdDataRowCpy(void *dst, SDataRow row) { memcpy(dst, row, dataRowLen(row)); } -void tdDataRowReset(SDataRow row) { dataRowSetLen(row, sizeof(int32_t)); } SDataRow tdDataRowDup(SDataRow row) { - SDataRow trow = tdNewDataRow(dataRowLen(row)); + SDataRow trow = malloc(dataRowLen(row)); if (trow == NULL) return NULL; dataRowCpy(trow, row); - return row; + return trow; } void tdDataRowsAppendRow(SDataRows rows, SDataRow row) { - tdDataRowCpy((void *)((char *)rows + dataRowsLen(rows)), row); + dataRowCpy((void *)((char *)rows + dataRowsLen(rows)), row); dataRowsSetLen(rows, dataRowsLen(rows) + dataRowLen(row)); } @@ -300,4 +278,17 @@ SDataRow tdDataRowsNext(SDataRowsIter *pIter) { } return row; +} + +/** + * Return the first part length of a data row for a schema + */ +static int tdFLenFromSchema(STSchema *pSchema) { + int ret = 0; + for (int i = 0; i < schemaNCols(pSchema); i++) { + STColumn *pCol = schemaColAt(pSchema, i); + ret += TYPE_BYTES[pCol->type]; + } + + return ret; } \ No newline at end of file diff --git a/src/vnode/tsdb/inc/tsdb.h b/src/vnode/tsdb/inc/tsdb.h index c41384114abd7eb039fd8c4ad0e442eaf92c8ec1..57798b6a091401d2eb644b4337612428032b7ea6 100644 --- a/src/vnode/tsdb/inc/tsdb.h +++ b/src/vnode/tsdb/inc/tsdb.h @@ -96,7 +96,7 @@ typedef struct { STableId tableId; int32_t padding; // TODO just for padding here int32_t sversion; // data schema version - int32_t len; // message length + int32_t len; // data part length, not including the SSubmitBlk head char data[]; } SSubmitBlk; diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 37901ba588b91b535b0ec1734e6f393f8afa20de..f3aed75e790c3aba4e3baf73bfece7eab5a97095 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -621,7 +621,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable } pNode->level = level; - tdDataRowCpy(SL_GET_NODE_DATA(pNode), row); + dataRowCpy(SL_GET_NODE_DATA(pNode), row); // Insert the skiplist node into the data tsdbInsertRowToTableImpl(pNode, pTable); diff --git a/src/vnode/tsdb/tests/tsdbTests.cpp b/src/vnode/tsdb/tests/tsdbTests.cpp index 1aa85c0808db26afb87a45536b3211365fe24ed0..eba7df3adb351adb137ef84576f8f2081a9b320c 100644 --- a/src/vnode/tsdb/tests/tsdbTests.cpp +++ b/src/vnode/tsdb/tests/tsdbTests.cpp @@ -32,15 +32,29 @@ TEST(TsdbTest, createRepo) { tsdbCreateTable(pRepo, &tCfg); - // 3. Loop to write some simple data - // int size = tdMaxRowBytesFromSchema(schema); - // int nrows = 100; - // SSubmitMsg *pMsg = (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk+ size * nrows); - - // { - // // TODO + // // 3. Loop to write some simple data + // int nRows = 10; + // SSubmitMsg *pMsg = (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(schema) * nRows); + + // SSubmitBlk *pBlock = pMsg->blocks; + // pBlock->tableId = {.uid = 987607499877672L, .tid = 0}; + // pBlock->sversion = 0; + // pBlock->len = 0; + // int64_t start_time = 1584081000000; + // for (int i = 0; i < nRows; i++) { + // int64_t ttime = start_time + 1000 * i; + // SDataRow row = (SDataRow)(pBlock->data + pBlock->len); + // dataRowInit(row); + + // for (int j; j < schemaNCols(schema); j++) { + // if (j == 0) { // Just for timestamp + // tdAppendColVal(row, (void *)(&time), schemaColAt(schema, i), ); + // } else { // For int + + // } + // } + + // pBlock->len += dataRowLen(row); // } - - // tsdbInsertData(pRepo, pMsg); }